pallas_localstate/
lib.rs

1mod codec;
2pub mod queries;
3
4use std::fmt::Debug;
5
6use log::debug;
7
8use pallas_machines::{
9    primitives::Point, Agent, DecodePayload, EncodePayload, MachineError, MachineOutput, Transition,
10};
11
12#[derive(Debug, PartialEq, Clone)]
13pub enum State {
14    Idle,
15    Acquiring,
16    Acquired,
17    Querying,
18    Done,
19}
20
21#[derive(Debug)]
22pub enum AcquireFailure {
23    PointTooOld,
24    PointNotInChain,
25}
26pub trait Query: Debug {
27    type Request: EncodePayload + DecodePayload + Clone + Debug;
28    type Response: EncodePayload + DecodePayload + Clone + Debug;
29}
30
31#[derive(Debug)]
32pub enum Message<Q: Query> {
33    Acquire(Option<Point>),
34    Failure(AcquireFailure),
35    Acquired,
36    Query(Q::Request),
37    Result(Q::Response),
38    ReAcquire(Option<Point>),
39    Release,
40    Done,
41}
42
43pub type Output<QR> = Result<QR, AcquireFailure>;
44
45#[derive(Debug)]
46pub struct OneShotClient<Q: Query> {
47    pub state: State,
48    pub check_point: Option<Point>,
49    pub request: Q::Request,
50    pub output: Option<Output<Q::Response>>,
51}
52
53impl<Q: Query> OneShotClient<Q> {
54    pub fn initial(check_point: Option<Point>, request: Q::Request) -> Self {
55        Self {
56            state: State::Idle,
57            output: None,
58            check_point,
59            request,
60        }
61    }
62
63    fn send_acquire(self, tx: &impl MachineOutput) -> Transition<Self> {
64        let msg = Message::<Q>::Acquire(self.check_point.clone());
65
66        tx.send_msg(&msg)?;
67
68        Ok(Self {
69            state: State::Acquiring,
70            ..self
71        })
72    }
73
74    fn send_query(self, tx: &impl MachineOutput) -> Transition<Self> {
75        let msg = Message::<Q>::Query(self.request.clone());
76
77        tx.send_msg(&msg)?;
78
79        Ok(Self {
80            state: State::Querying,
81            ..self
82        })
83    }
84
85    fn send_release(self, tx: &impl MachineOutput) -> Transition<Self> {
86        let msg = Message::<Q>::Release;
87
88        tx.send_msg(&msg)?;
89
90        Ok(Self {
91            state: State::Idle,
92            ..self
93        })
94    }
95
96    fn on_acquired(self) -> Transition<Self> {
97        debug!("acquired check point for chain state");
98
99        Ok(Self {
100            state: State::Acquired,
101            ..self
102        })
103    }
104
105    fn on_result(self, response: Q::Response) -> Transition<Self> {
106        debug!("query result received: {:?}", response);
107
108        Ok(Self {
109            state: State::Acquired,
110            output: Some(Ok(response)),
111            ..self
112        })
113    }
114
115    fn on_failure(self, failure: AcquireFailure) -> Transition<Self> {
116        debug!("acquire failure: {:?}", failure);
117
118        Ok(Self {
119            state: State::Idle,
120            output: Some(Err(failure)),
121            ..self
122        })
123    }
124
125    fn done(self) -> Transition<Self> {
126        Ok(Self {
127            state: State::Done,
128            ..self
129        })
130    }
131}
132
133impl<Q: Query + 'static> Agent for OneShotClient<Q> {
134    type Message = Message<Q>;
135
136    fn is_done(&self) -> bool {
137        self.state == State::Done
138    }
139
140    #[allow(clippy::match_like_matches_macro)]
141    fn has_agency(&self) -> bool {
142        match self.state {
143            State::Idle => true,
144            State::Acquired => true,
145            _ => false,
146        }
147    }
148
149    fn send_next(self, tx: &impl MachineOutput) -> Transition<Self> {
150        match (&self.state, &self.output) {
151            // if we're idle and without a result, assume start of flow
152            (State::Idle, None) => self.send_acquire(tx),
153            // if we're idle and with a result, assume end of flow
154            (State::Idle, Some(_)) => self.done(),
155            // if we don't have an output, assume start of query
156            (State::Acquired, None) => self.send_query(tx),
157            // if we have an output but still acquired, release the server
158            (State::Acquired, Some(_)) => self.send_release(tx),
159            _ => panic!("I don't have agency, don't know what to do"),
160        }
161    }
162
163    fn receive_next(self, msg: Self::Message) -> Transition<Self> {
164        match (&self.state, msg) {
165            (State::Acquiring, Message::Acquired) => self.on_acquired(),
166            (State::Acquiring, Message::Failure(failure)) => self.on_failure(failure),
167            (State::Querying, Message::Result(result)) => self.on_result(result),
168            (_, msg) => Err(MachineError::InvalidMsgForState(self.state, msg).into()),
169        }
170    }
171}