pallas_blockfetch/
lib.rs

1use std::sync::mpsc::Receiver;
2
3use log::debug;
4use pallas_machines::{
5    primitives::Point, Agent, CodecError, DecodePayload, EncodePayload, MachineOutput,
6    PayloadDecoder, PayloadEncoder, Transition,
7};
8
/// Protocol state tracked by the block-fetch clients in this module.
///
/// The clients only hold agency (i.e. are the side expected to send) while
/// in [`State::Idle`]; in every other state they wait for the peer.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum State {
    /// Nothing is in flight; the client may send a range request.
    Idle,
    /// A range request has been sent and the peer's answer is pending.
    Busy,
    /// The peer announced a batch and blocks are being delivered.
    Streaming,
    /// Terminal state; a `BatchClient` in this state reports itself as done.
    Done,
}
16
17#[derive(Debug)]
18pub enum Message {
19    RequestRange { range: (Point, Point) },
20    ClientDone,
21    StartBatch,
22    NoBlocks,
23    Block { body: Vec<u8> },
24    BatchDone,
25}
26
27impl EncodePayload for Message {
28    fn encode_payload(&self, e: &mut PayloadEncoder) -> Result<(), Box<dyn std::error::Error>> {
29        match self {
30            Message::RequestRange { range } => {
31                e.array(3)?.u16(0)?;
32                range.0.encode_payload(e)?;
33                range.1.encode_payload(e)?;
34                Ok(())
35            }
36            Message::ClientDone => {
37                e.array(1)?.u16(1)?;
38                Ok(())
39            }
40            Message::StartBatch => {
41                e.array(1)?.u16(2)?;
42                Ok(())
43            }
44            Message::NoBlocks => {
45                e.array(1)?.u16(3)?;
46                Ok(())
47            }
48            Message::Block { body } => {
49                e.array(2)?.u16(4)?;
50                e.bytes(body)?;
51                Ok(())
52            }
53            Message::BatchDone => {
54                e.array(1)?.u16(5)?;
55                Ok(())
56            }
57        }
58    }
59}
60
61impl DecodePayload for Message {
62    fn decode_payload(d: &mut PayloadDecoder) -> Result<Self, Box<dyn std::error::Error>> {
63        d.array()?;
64        let label = d.u16()?;
65
66        match label {
67            0 => {
68                let point1 = Point::decode_payload(d)?;
69                let point2 = Point::decode_payload(d)?;
70                Ok(Message::RequestRange {
71                    range: (point1, point2),
72                })
73            }
74            1 => Ok(Message::ClientDone),
75            2 => Ok(Message::StartBatch),
76            3 => Ok(Message::NoBlocks),
77            4 => {
78                d.tag()?;
79                let body = d.bytes()?;
80                Ok(Message::Block {
81                    body: Vec::from(body),
82                })
83            }
84            5 => Ok(Message::BatchDone),
85            x => Err(Box::new(CodecError::BadLabel(x))),
86        }
87    }
88}
89
90pub trait Observer {
91    fn on_block_received(&self, body: Vec<u8>) -> Result<(), Box<dyn std::error::Error>> {
92        log::debug!("block received, sice: {}", body.len());
93        Ok(())
94    }
95
96    fn on_block_range_requested(
97        &self,
98        range: &(Point, Point),
99    ) -> Result<(), Box<dyn std::error::Error>> {
100        log::debug!(
101            "block range requested, from: {:?}, to: {:?}",
102            range.0,
103            range.1
104        );
105        Ok(())
106    }
107}
108
109#[derive(Debug)]
110pub struct NoopObserver {}
111
112impl Observer for NoopObserver {}
113
114#[derive(Debug)]
115pub struct BatchClient<O>
116where
117    O: Observer,
118{
119    pub state: State,
120    pub range: (Point, Point),
121    pub observer: O,
122}
123
124impl<O> BatchClient<O>
125where
126    O: Observer,
127{
128    pub fn initial(range: (Point, Point), observer: O) -> Self {
129        Self {
130            state: State::Idle,
131            range,
132            observer,
133        }
134    }
135
136    fn send_request_range(self, tx: &impl MachineOutput) -> Transition<Self> {
137        let msg = Message::RequestRange {
138            range: self.range.clone(),
139        };
140
141        tx.send_msg(&msg)?;
142
143        self.observer.on_block_range_requested(&self.range)?;
144
145        Ok(Self {
146            state: State::Busy,
147            ..self
148        })
149    }
150
151    fn on_block(self, body: Vec<u8>) -> Transition<Self> {
152        debug!("received block body, size {}", body.len());
153
154        self.observer.on_block_received(body)?;
155
156        Ok(self)
157    }
158}
159
160impl<O> Agent for BatchClient<O>
161where
162    O: Observer,
163{
164    type Message = Message;
165
166    fn is_done(&self) -> bool {
167        self.state == State::Done
168    }
169
170    fn has_agency(&self) -> bool {
171        match self.state {
172            State::Idle => true,
173            State::Busy => false,
174            State::Streaming => false,
175            State::Done => false,
176        }
177    }
178
179    fn send_next(self, tx: &impl MachineOutput) -> Transition<Self> {
180        match self.state {
181            State::Idle => self.send_request_range(tx),
182            _ => panic!("I don't have agency, don't know what to do"),
183        }
184    }
185
186    fn receive_next(self, msg: Self::Message) -> Transition<Self> {
187        match (&self.state, msg) {
188            (State::Busy, Message::StartBatch) => Ok(Self {
189                state: State::Streaming,
190                ..self
191            }),
192            (State::Busy, Message::NoBlocks) => Ok(Self {
193                state: State::Done,
194                ..self
195            }),
196            (State::Streaming, Message::Block { body }) => self.on_block(body),
197            (State::Streaming, Message::BatchDone) => Ok(Self {
198                state: State::Done,
199                ..self
200            }),
201            _ => panic!("I have agency, I don't expect messages"),
202        }
203    }
204}
205
206#[derive(Debug)]
207pub struct OnDemandClient<O>
208where
209    O: Observer,
210{
211    pub state: State,
212    pub requests: Receiver<Point>,
213    pub observer: O,
214}
215
216impl<O> OnDemandClient<O>
217where
218    O: Observer,
219{
220    pub fn initial(requests: Receiver<Point>, observer: O) -> Self {
221        Self {
222            state: State::Idle,
223            requests,
224            observer,
225        }
226    }
227
228    fn wait_for_request_and_send(self, tx: &impl MachineOutput) -> Transition<Self> {
229        let point = self.requests.recv()?;
230
231        let msg = Message::RequestRange {
232            range: (point.clone(), point),
233        };
234
235        tx.send_msg(&msg)?;
236
237        Ok(Self {
238            state: State::Busy,
239            ..self
240        })
241    }
242
243    fn on_block(self, body: Vec<u8>) -> Transition<Self> {
244        debug!("received block body, size {}", body.len());
245
246        self.observer.on_block_received(body)?;
247
248        Ok(self)
249    }
250}
251
252impl<O> Agent for OnDemandClient<O>
253where
254    O: Observer,
255{
256    type Message = Message;
257
258    // we're never done because we react to external work requests.
259    // TODO: see if we can inspect mpsc channel status and stop if disconnected
260    fn is_done(&self) -> bool {
261        false
262    }
263
264    fn has_agency(&self) -> bool {
265        match self.state {
266            State::Idle => true,
267            State::Busy => false,
268            State::Streaming => false,
269            State::Done => false,
270        }
271    }
272
273    fn send_next(self, tx: &impl MachineOutput) -> Transition<Self> {
274        match self.state {
275            State::Idle => self.wait_for_request_and_send(tx),
276            _ => panic!("I don't have agency, don't know what to do"),
277        }
278    }
279
280    fn receive_next(self, msg: Self::Message) -> Transition<Self> {
281        match (&self.state, msg) {
282            (State::Busy, Message::StartBatch) => Ok(Self {
283                state: State::Streaming,
284                ..self
285            }),
286            (State::Busy, Message::NoBlocks) => Ok(Self {
287                state: State::Idle,
288                ..self
289            }),
290            (State::Streaming, Message::Block { body }) => self.on_block(body),
291            (State::Streaming, Message::BatchDone) => Ok(Self {
292                state: State::Idle,
293                ..self
294            }),
295            _ => panic!("I have agency, I don't expect messages"),
296        }
297    }
298}