pallas_chainsync/
clients.rs

1use std::fmt::Debug;
2
3use log::{debug, log_enabled, trace};
4
5use pallas_machines::{
6    primitives::Point, Agent, DecodePayload, EncodePayload, MachineError, MachineOutput, Transition,
7};
8
9use crate::{Message, State, Tip};
10
11/// A trait to deal with polymorphic payloads in the ChainSync protocol
12/// (WrappedHeader vs BlockBody)
13pub trait BlockLike: EncodePayload + DecodePayload + Debug {
14    fn block_point(&self) -> Result<Point, Box<dyn std::error::Error>>;
15}
16
17/// An observer of chain-sync events sent by the state-machine
18pub trait Observer<C>
19where
20    C: Debug,
21{
22    fn on_block(
23        &self,
24        cursor: &Option<Point>,
25        content: &C,
26    ) -> Result<(), Box<dyn std::error::Error>> {
27        log::debug!(
28            "asked to save block content {:?} at cursor {:?}",
29            content,
30            cursor
31        );
32        Ok(())
33    }
34
35    fn on_intersect_found(
36        &self,
37        point: &Point,
38        tip: &Tip,
39    ) -> Result<(), Box<dyn std::error::Error>> {
40        log::debug!("intersect was found {:?} (tip: {:?})", point, tip);
41        Ok(())
42    }
43
44    fn on_rollback(&self, point: &Point) -> Result<(), Box<dyn std::error::Error>> {
45        log::debug!("asked to roll back {:?}", point);
46        Ok(())
47    }
48    fn on_tip_reached(&self) -> Result<(), Box<dyn std::error::Error>> {
49        log::debug!("tip was reached");
50        Ok(())
51    }
52}
53
/// An observer that carries no state and overrides none of the [`Observer`]
/// hooks, so only the default (log-only) behavior applies.
#[derive(Debug)]
pub struct NoopObserver {}
56
57impl<C> Observer<C> for NoopObserver where C: Debug {}
58
59#[derive(Debug)]
60pub struct Consumer<C, O>
61where
62    O: Observer<C>,
63    C: Debug,
64{
65    pub state: State,
66    pub known_points: Vec<Point>,
67    pub cursor: Option<Point>,
68    pub tip: Option<Tip>,
69
70    observer: O,
71
72    // as recommended here: https://doc.rust-lang.org/error-index.html#E0207
73    _phantom: Option<C>,
74}
75
76impl<C, O> Consumer<C, O>
77where
78    C: BlockLike + EncodePayload + DecodePayload + Debug,
79    O: Observer<C>,
80{
81    pub fn initial(known_points: Vec<Point>, observer: O) -> Self {
82        Self {
83            state: State::Idle,
84            cursor: None,
85            tip: None,
86            known_points,
87            observer,
88
89            _phantom: None,
90        }
91    }
92
93    fn send_find_intersect(self, tx: &impl MachineOutput) -> Transition<Self> {
94        let msg = Message::<C>::FindIntersect(self.known_points.clone());
95
96        tx.send_msg(&msg)?;
97
98        Ok(Self {
99            state: State::Intersect,
100            ..self
101        })
102    }
103
104    fn send_request_next(self, tx: &impl MachineOutput) -> Transition<Self> {
105        let msg = Message::<C>::RequestNext;
106
107        tx.send_msg(&msg)?;
108
109        Ok(Self {
110            state: State::CanAwait,
111            ..self
112        })
113    }
114
115    fn on_intersect_found(self, point: Point, tip: Tip) -> Transition<Self> {
116        debug!("intersect found: {:?} (tip: {:?})", point, tip);
117
118        self.observer.on_intersect_found(&point, &tip)?;
119
120        Ok(Self {
121            tip: Some(tip),
122            cursor: Some(point),
123            state: State::Idle,
124            ..self
125        })
126    }
127
128    fn on_intersect_not_found(self, tip: Tip) -> Transition<Self> {
129        debug!("intersect not found (tip: {:?})", tip);
130
131        Ok(Self {
132            tip: Some(tip),
133            cursor: None,
134            state: State::Done,
135            ..self
136        })
137    }
138
139    fn on_roll_forward(self, content: C, tip: Tip) -> Transition<Self> {
140        debug!("rolling forward");
141
142        let point = content.block_point()?;
143
144        if log_enabled!(log::Level::Trace) {
145            trace!("content: {:?}", content);
146        }
147
148        debug!("reporting block to observer");
149        self.observer.on_block(&self.cursor, &content)?;
150
151        Ok(Self {
152            cursor: Some(point),
153            tip: Some(tip),
154            state: State::Idle,
155            ..self
156        })
157    }
158
159    fn on_roll_backward(self, point: Point, tip: Tip) -> Transition<Self> {
160        debug!("rolling backward to point: {:?}", point);
161
162        debug!("reporting rollback to observer");
163        self.observer.on_rollback(&point)?;
164
165        Ok(Self {
166            tip: Some(tip),
167            cursor: Some(point),
168            state: State::Idle,
169            ..self
170        })
171    }
172
173    fn on_await_reply(self) -> Transition<Self> {
174        debug!("reached tip, await reply");
175
176        debug!("reporting tip to observer");
177        self.observer.on_tip_reached()?;
178
179        Ok(Self {
180            state: State::MustReply,
181            ..self
182        })
183    }
184}
185
186impl<C, O> Agent for Consumer<C, O>
187where
188    C: BlockLike + EncodePayload + DecodePayload + Debug + 'static,
189    O: Observer<C>,
190{
191    type Message = Message<C>;
192
193    fn is_done(&self) -> bool {
194        self.state == State::Done
195    }
196
197    fn has_agency(&self) -> bool {
198        match self.state {
199            State::Idle => true,
200            State::CanAwait => false,
201            State::MustReply => false,
202            State::Intersect => false,
203            State::Done => false,
204        }
205    }
206
207    fn send_next(self, tx: &impl MachineOutput) -> Transition<Self> {
208        match self.state {
209            State::Idle => match self.cursor {
210                Some(_) => self.send_request_next(tx),
211                None => self.send_find_intersect(tx),
212            },
213            _ => panic!("I don't have agency, don't know what to do"),
214        }
215    }
216
217    fn receive_next(self, msg: Self::Message) -> Transition<Self> {
218        match (&self.state, msg) {
219            (State::CanAwait, Message::RollForward(header, tip)) => {
220                self.on_roll_forward(header, tip)
221            }
222            (State::CanAwait, Message::RollBackward(point, tip)) => {
223                self.on_roll_backward(point, tip)
224            }
225            (State::CanAwait, Message::AwaitReply) => self.on_await_reply(),
226            (State::MustReply, Message::RollForward(header, tip)) => {
227                self.on_roll_forward(header, tip)
228            }
229            (State::MustReply, Message::RollBackward(point, tip)) => {
230                self.on_roll_backward(point, tip)
231            }
232            (State::Intersect, Message::IntersectFound(point, tip)) => {
233                self.on_intersect_found(point, tip)
234            }
235            (State::Intersect, Message::IntersectNotFound(tip)) => self.on_intersect_not_found(tip),
236            (_, msg) => Err(MachineError::InvalidMsgForState(self.state, msg).into()),
237        }
238    }
239}
240
241#[derive(Debug)]
242pub struct TipFinder {
243    pub state: State,
244    pub wellknown_point: Point,
245    pub output: Option<Tip>,
246}
247
248impl TipFinder {
249    pub fn initial(wellknown_point: Point) -> Self {
250        TipFinder {
251            wellknown_point,
252            output: None,
253            state: State::Idle,
254        }
255    }
256
257    fn send_find_intersect(self, tx: &impl MachineOutput) -> Transition<Self> {
258        let msg = Message::<NoopContent>::FindIntersect(vec![self.wellknown_point.clone()]);
259
260        tx.send_msg(&msg)?;
261
262        Ok(Self {
263            state: State::Intersect,
264            ..self
265        })
266    }
267
268    fn on_intersect_found(self, tip: Tip) -> Transition<Self> {
269        debug!("intersect found with tip: {:?}", tip);
270
271        Ok(Self {
272            state: State::Done,
273            output: Some(tip),
274            ..self
275        })
276    }
277
278    fn on_intersect_not_found(self, tip: Tip) -> Transition<Self> {
279        debug!("intersect not found but still have a tip: {:?}", tip);
280
281        Ok(Self {
282            state: State::Done,
283            output: Some(tip),
284            ..self
285        })
286    }
287}
288
/// Empty placeholder payload, used by `TipFinder` as the content parameter of
/// its messages. None of its payload-related trait methods has a working
/// implementation.
#[derive(Debug)]
pub struct NoopContent {}
291
292impl EncodePayload for NoopContent {
293    fn encode_payload(
294        &self,
295        _e: &mut pallas_machines::PayloadEncoder,
296    ) -> Result<(), Box<dyn std::error::Error>> {
297        todo!()
298    }
299}
300
301impl DecodePayload for NoopContent {
302    fn decode_payload(
303        _d: &mut pallas_machines::PayloadDecoder,
304    ) -> Result<Self, Box<dyn std::error::Error>> {
305        todo!()
306    }
307}
308
309impl BlockLike for NoopContent {
310    fn block_point(&self) -> Result<Point, Box<dyn std::error::Error>> {
311        todo!()
312    }
313}
314
315impl Agent for TipFinder {
316    type Message = Message<NoopContent>;
317
318    fn is_done(&self) -> bool {
319        self.state == State::Done
320    }
321
322    fn has_agency(&self) -> bool {
323        match self.state {
324            State::Idle => true,
325            State::CanAwait => false,
326            State::MustReply => false,
327            State::Intersect => false,
328            State::Done => false,
329        }
330    }
331
332    fn send_next(self, tx: &impl MachineOutput) -> Transition<Self> {
333        match self.state {
334            State::Idle => self.send_find_intersect(tx),
335            _ => panic!("I don't have agency, don't know what to do"),
336        }
337    }
338
339    fn receive_next(self, msg: Self::Message) -> Transition<Self> {
340        match (&self.state, msg) {
341            (State::Intersect, Message::IntersectFound(_point, tip)) => {
342                self.on_intersect_found(tip)
343            }
344            (State::Intersect, Message::IntersectNotFound(tip)) => self.on_intersect_not_found(tip),
345            (_, msg) => Err(MachineError::InvalidMsgForState(self.state, msg).into()),
346        }
347    }
348}