1use std::sync::mpsc::Receiver;
2
3use log::debug;
4use pallas_machines::{
5 primitives::Point, Agent, CodecError, DecodePayload, EncodePayload, MachineOutput,
6 PayloadDecoder, PayloadEncoder, Transition,
7};
8
9#[derive(Debug, PartialEq, Clone)]
10pub enum State {
11 Idle,
12 Busy,
13 Streaming,
14 Done,
15}
16
17#[derive(Debug)]
18pub enum Message {
19 RequestRange { range: (Point, Point) },
20 ClientDone,
21 StartBatch,
22 NoBlocks,
23 Block { body: Vec<u8> },
24 BatchDone,
25}
26
27impl EncodePayload for Message {
28 fn encode_payload(&self, e: &mut PayloadEncoder) -> Result<(), Box<dyn std::error::Error>> {
29 match self {
30 Message::RequestRange { range } => {
31 e.array(3)?.u16(0)?;
32 range.0.encode_payload(e)?;
33 range.1.encode_payload(e)?;
34 Ok(())
35 }
36 Message::ClientDone => {
37 e.array(1)?.u16(1)?;
38 Ok(())
39 }
40 Message::StartBatch => {
41 e.array(1)?.u16(2)?;
42 Ok(())
43 }
44 Message::NoBlocks => {
45 e.array(1)?.u16(3)?;
46 Ok(())
47 }
48 Message::Block { body } => {
49 e.array(2)?.u16(4)?;
50 e.bytes(body)?;
51 Ok(())
52 }
53 Message::BatchDone => {
54 e.array(1)?.u16(5)?;
55 Ok(())
56 }
57 }
58 }
59}
60
61impl DecodePayload for Message {
62 fn decode_payload(d: &mut PayloadDecoder) -> Result<Self, Box<dyn std::error::Error>> {
63 d.array()?;
64 let label = d.u16()?;
65
66 match label {
67 0 => {
68 let point1 = Point::decode_payload(d)?;
69 let point2 = Point::decode_payload(d)?;
70 Ok(Message::RequestRange {
71 range: (point1, point2),
72 })
73 }
74 1 => Ok(Message::ClientDone),
75 2 => Ok(Message::StartBatch),
76 3 => Ok(Message::NoBlocks),
77 4 => {
78 d.tag()?;
79 let body = d.bytes()?;
80 Ok(Message::Block {
81 body: Vec::from(body),
82 })
83 }
84 5 => Ok(Message::BatchDone),
85 x => Err(Box::new(CodecError::BadLabel(x))),
86 }
87 }
88}
89
90pub trait Observer {
91 fn on_block_received(&self, body: Vec<u8>) -> Result<(), Box<dyn std::error::Error>> {
92 log::debug!("block received, sice: {}", body.len());
93 Ok(())
94 }
95
96 fn on_block_range_requested(
97 &self,
98 range: &(Point, Point),
99 ) -> Result<(), Box<dyn std::error::Error>> {
100 log::debug!(
101 "block range requested, from: {:?}, to: {:?}",
102 range.0,
103 range.1
104 );
105 Ok(())
106 }
107}
108
109#[derive(Debug)]
110pub struct NoopObserver {}
111
112impl Observer for NoopObserver {}
113
114#[derive(Debug)]
115pub struct BatchClient<O>
116where
117 O: Observer,
118{
119 pub state: State,
120 pub range: (Point, Point),
121 pub observer: O,
122}
123
124impl<O> BatchClient<O>
125where
126 O: Observer,
127{
128 pub fn initial(range: (Point, Point), observer: O) -> Self {
129 Self {
130 state: State::Idle,
131 range,
132 observer,
133 }
134 }
135
136 fn send_request_range(self, tx: &impl MachineOutput) -> Transition<Self> {
137 let msg = Message::RequestRange {
138 range: self.range.clone(),
139 };
140
141 tx.send_msg(&msg)?;
142
143 self.observer.on_block_range_requested(&self.range)?;
144
145 Ok(Self {
146 state: State::Busy,
147 ..self
148 })
149 }
150
151 fn on_block(self, body: Vec<u8>) -> Transition<Self> {
152 debug!("received block body, size {}", body.len());
153
154 self.observer.on_block_received(body)?;
155
156 Ok(self)
157 }
158}
159
160impl<O> Agent for BatchClient<O>
161where
162 O: Observer,
163{
164 type Message = Message;
165
166 fn is_done(&self) -> bool {
167 self.state == State::Done
168 }
169
170 fn has_agency(&self) -> bool {
171 match self.state {
172 State::Idle => true,
173 State::Busy => false,
174 State::Streaming => false,
175 State::Done => false,
176 }
177 }
178
179 fn send_next(self, tx: &impl MachineOutput) -> Transition<Self> {
180 match self.state {
181 State::Idle => self.send_request_range(tx),
182 _ => panic!("I don't have agency, don't know what to do"),
183 }
184 }
185
186 fn receive_next(self, msg: Self::Message) -> Transition<Self> {
187 match (&self.state, msg) {
188 (State::Busy, Message::StartBatch) => Ok(Self {
189 state: State::Streaming,
190 ..self
191 }),
192 (State::Busy, Message::NoBlocks) => Ok(Self {
193 state: State::Done,
194 ..self
195 }),
196 (State::Streaming, Message::Block { body }) => self.on_block(body),
197 (State::Streaming, Message::BatchDone) => Ok(Self {
198 state: State::Done,
199 ..self
200 }),
201 _ => panic!("I have agency, I don't expect messages"),
202 }
203 }
204}
205
206#[derive(Debug)]
207pub struct OnDemandClient<O>
208where
209 O: Observer,
210{
211 pub state: State,
212 pub requests: Receiver<Point>,
213 pub observer: O,
214}
215
216impl<O> OnDemandClient<O>
217where
218 O: Observer,
219{
220 pub fn initial(requests: Receiver<Point>, observer: O) -> Self {
221 Self {
222 state: State::Idle,
223 requests,
224 observer,
225 }
226 }
227
228 fn wait_for_request_and_send(self, tx: &impl MachineOutput) -> Transition<Self> {
229 let point = self.requests.recv()?;
230
231 let msg = Message::RequestRange {
232 range: (point.clone(), point),
233 };
234
235 tx.send_msg(&msg)?;
236
237 Ok(Self {
238 state: State::Busy,
239 ..self
240 })
241 }
242
243 fn on_block(self, body: Vec<u8>) -> Transition<Self> {
244 debug!("received block body, size {}", body.len());
245
246 self.observer.on_block_received(body)?;
247
248 Ok(self)
249 }
250}
251
252impl<O> Agent for OnDemandClient<O>
253where
254 O: Observer,
255{
256 type Message = Message;
257
258 fn is_done(&self) -> bool {
261 false
262 }
263
264 fn has_agency(&self) -> bool {
265 match self.state {
266 State::Idle => true,
267 State::Busy => false,
268 State::Streaming => false,
269 State::Done => false,
270 }
271 }
272
273 fn send_next(self, tx: &impl MachineOutput) -> Transition<Self> {
274 match self.state {
275 State::Idle => self.wait_for_request_and_send(tx),
276 _ => panic!("I don't have agency, don't know what to do"),
277 }
278 }
279
280 fn receive_next(self, msg: Self::Message) -> Transition<Self> {
281 match (&self.state, msg) {
282 (State::Busy, Message::StartBatch) => Ok(Self {
283 state: State::Streaming,
284 ..self
285 }),
286 (State::Busy, Message::NoBlocks) => Ok(Self {
287 state: State::Idle,
288 ..self
289 }),
290 (State::Streaming, Message::Block { body }) => self.on_block(body),
291 (State::Streaming, Message::BatchDone) => Ok(Self {
292 state: State::Idle,
293 ..self
294 }),
295 _ => panic!("I have agency, I don't expect messages"),
296 }
297 }
298}