1use std::fmt::Debug;
2
3use log::{debug, log_enabled, trace};
4
5use pallas_machines::{
6 primitives::Point, Agent, DecodePayload, EncodePayload, MachineError, MachineOutput, Transition,
7};
8
9use crate::{Message, State, Tip};
10
11pub trait BlockLike: EncodePayload + DecodePayload + Debug {
14 fn block_point(&self) -> Result<Point, Box<dyn std::error::Error>>;
15}
16
17pub trait Observer<C>
19where
20 C: Debug,
21{
22 fn on_block(
23 &self,
24 cursor: &Option<Point>,
25 content: &C,
26 ) -> Result<(), Box<dyn std::error::Error>> {
27 log::debug!(
28 "asked to save block content {:?} at cursor {:?}",
29 content,
30 cursor
31 );
32 Ok(())
33 }
34
35 fn on_intersect_found(
36 &self,
37 point: &Point,
38 tip: &Tip,
39 ) -> Result<(), Box<dyn std::error::Error>> {
40 log::debug!("intersect was found {:?} (tip: {:?})", point, tip);
41 Ok(())
42 }
43
44 fn on_rollback(&self, point: &Point) -> Result<(), Box<dyn std::error::Error>> {
45 log::debug!("asked to roll back {:?}", point);
46 Ok(())
47 }
48 fn on_tip_reached(&self) -> Result<(), Box<dyn std::error::Error>> {
49 log::debug!("tip was reached");
50 Ok(())
51 }
52}
53
54#[derive(Debug)]
55pub struct NoopObserver {}
56
57impl<C> Observer<C> for NoopObserver where C: Debug {}
58
59#[derive(Debug)]
60pub struct Consumer<C, O>
61where
62 O: Observer<C>,
63 C: Debug,
64{
65 pub state: State,
66 pub known_points: Vec<Point>,
67 pub cursor: Option<Point>,
68 pub tip: Option<Tip>,
69
70 observer: O,
71
72 _phantom: Option<C>,
74}
75
76impl<C, O> Consumer<C, O>
77where
78 C: BlockLike + EncodePayload + DecodePayload + Debug,
79 O: Observer<C>,
80{
81 pub fn initial(known_points: Vec<Point>, observer: O) -> Self {
82 Self {
83 state: State::Idle,
84 cursor: None,
85 tip: None,
86 known_points,
87 observer,
88
89 _phantom: None,
90 }
91 }
92
93 fn send_find_intersect(self, tx: &impl MachineOutput) -> Transition<Self> {
94 let msg = Message::<C>::FindIntersect(self.known_points.clone());
95
96 tx.send_msg(&msg)?;
97
98 Ok(Self {
99 state: State::Intersect,
100 ..self
101 })
102 }
103
104 fn send_request_next(self, tx: &impl MachineOutput) -> Transition<Self> {
105 let msg = Message::<C>::RequestNext;
106
107 tx.send_msg(&msg)?;
108
109 Ok(Self {
110 state: State::CanAwait,
111 ..self
112 })
113 }
114
115 fn on_intersect_found(self, point: Point, tip: Tip) -> Transition<Self> {
116 debug!("intersect found: {:?} (tip: {:?})", point, tip);
117
118 self.observer.on_intersect_found(&point, &tip)?;
119
120 Ok(Self {
121 tip: Some(tip),
122 cursor: Some(point),
123 state: State::Idle,
124 ..self
125 })
126 }
127
128 fn on_intersect_not_found(self, tip: Tip) -> Transition<Self> {
129 debug!("intersect not found (tip: {:?})", tip);
130
131 Ok(Self {
132 tip: Some(tip),
133 cursor: None,
134 state: State::Done,
135 ..self
136 })
137 }
138
139 fn on_roll_forward(self, content: C, tip: Tip) -> Transition<Self> {
140 debug!("rolling forward");
141
142 let point = content.block_point()?;
143
144 if log_enabled!(log::Level::Trace) {
145 trace!("content: {:?}", content);
146 }
147
148 debug!("reporting block to observer");
149 self.observer.on_block(&self.cursor, &content)?;
150
151 Ok(Self {
152 cursor: Some(point),
153 tip: Some(tip),
154 state: State::Idle,
155 ..self
156 })
157 }
158
159 fn on_roll_backward(self, point: Point, tip: Tip) -> Transition<Self> {
160 debug!("rolling backward to point: {:?}", point);
161
162 debug!("reporting rollback to observer");
163 self.observer.on_rollback(&point)?;
164
165 Ok(Self {
166 tip: Some(tip),
167 cursor: Some(point),
168 state: State::Idle,
169 ..self
170 })
171 }
172
173 fn on_await_reply(self) -> Transition<Self> {
174 debug!("reached tip, await reply");
175
176 debug!("reporting tip to observer");
177 self.observer.on_tip_reached()?;
178
179 Ok(Self {
180 state: State::MustReply,
181 ..self
182 })
183 }
184}
185
186impl<C, O> Agent for Consumer<C, O>
187where
188 C: BlockLike + EncodePayload + DecodePayload + Debug + 'static,
189 O: Observer<C>,
190{
191 type Message = Message<C>;
192
193 fn is_done(&self) -> bool {
194 self.state == State::Done
195 }
196
197 fn has_agency(&self) -> bool {
198 match self.state {
199 State::Idle => true,
200 State::CanAwait => false,
201 State::MustReply => false,
202 State::Intersect => false,
203 State::Done => false,
204 }
205 }
206
207 fn send_next(self, tx: &impl MachineOutput) -> Transition<Self> {
208 match self.state {
209 State::Idle => match self.cursor {
210 Some(_) => self.send_request_next(tx),
211 None => self.send_find_intersect(tx),
212 },
213 _ => panic!("I don't have agency, don't know what to do"),
214 }
215 }
216
217 fn receive_next(self, msg: Self::Message) -> Transition<Self> {
218 match (&self.state, msg) {
219 (State::CanAwait, Message::RollForward(header, tip)) => {
220 self.on_roll_forward(header, tip)
221 }
222 (State::CanAwait, Message::RollBackward(point, tip)) => {
223 self.on_roll_backward(point, tip)
224 }
225 (State::CanAwait, Message::AwaitReply) => self.on_await_reply(),
226 (State::MustReply, Message::RollForward(header, tip)) => {
227 self.on_roll_forward(header, tip)
228 }
229 (State::MustReply, Message::RollBackward(point, tip)) => {
230 self.on_roll_backward(point, tip)
231 }
232 (State::Intersect, Message::IntersectFound(point, tip)) => {
233 self.on_intersect_found(point, tip)
234 }
235 (State::Intersect, Message::IntersectNotFound(tip)) => self.on_intersect_not_found(tip),
236 (_, msg) => Err(MachineError::InvalidMsgForState(self.state, msg).into()),
237 }
238 }
239}
240
241#[derive(Debug)]
242pub struct TipFinder {
243 pub state: State,
244 pub wellknown_point: Point,
245 pub output: Option<Tip>,
246}
247
248impl TipFinder {
249 pub fn initial(wellknown_point: Point) -> Self {
250 TipFinder {
251 wellknown_point,
252 output: None,
253 state: State::Idle,
254 }
255 }
256
257 fn send_find_intersect(self, tx: &impl MachineOutput) -> Transition<Self> {
258 let msg = Message::<NoopContent>::FindIntersect(vec![self.wellknown_point.clone()]);
259
260 tx.send_msg(&msg)?;
261
262 Ok(Self {
263 state: State::Intersect,
264 ..self
265 })
266 }
267
268 fn on_intersect_found(self, tip: Tip) -> Transition<Self> {
269 debug!("intersect found with tip: {:?}", tip);
270
271 Ok(Self {
272 state: State::Done,
273 output: Some(tip),
274 ..self
275 })
276 }
277
278 fn on_intersect_not_found(self, tip: Tip) -> Transition<Self> {
279 debug!("intersect not found but still have a tip: {:?}", tip);
280
281 Ok(Self {
282 state: State::Done,
283 output: Some(tip),
284 ..self
285 })
286 }
287}
288
289#[derive(Debug)]
290pub struct NoopContent {}
291
292impl EncodePayload for NoopContent {
293 fn encode_payload(
294 &self,
295 _e: &mut pallas_machines::PayloadEncoder,
296 ) -> Result<(), Box<dyn std::error::Error>> {
297 todo!()
298 }
299}
300
301impl DecodePayload for NoopContent {
302 fn decode_payload(
303 _d: &mut pallas_machines::PayloadDecoder,
304 ) -> Result<Self, Box<dyn std::error::Error>> {
305 todo!()
306 }
307}
308
309impl BlockLike for NoopContent {
310 fn block_point(&self) -> Result<Point, Box<dyn std::error::Error>> {
311 todo!()
312 }
313}
314
315impl Agent for TipFinder {
316 type Message = Message<NoopContent>;
317
318 fn is_done(&self) -> bool {
319 self.state == State::Done
320 }
321
322 fn has_agency(&self) -> bool {
323 match self.state {
324 State::Idle => true,
325 State::CanAwait => false,
326 State::MustReply => false,
327 State::Intersect => false,
328 State::Done => false,
329 }
330 }
331
332 fn send_next(self, tx: &impl MachineOutput) -> Transition<Self> {
333 match self.state {
334 State::Idle => self.send_find_intersect(tx),
335 _ => panic!("I don't have agency, don't know what to do"),
336 }
337 }
338
339 fn receive_next(self, msg: Self::Message) -> Transition<Self> {
340 match (&self.state, msg) {
341 (State::Intersect, Message::IntersectFound(_point, tip)) => {
342 self.on_intersect_found(tip)
343 }
344 (State::Intersect, Message::IntersectNotFound(tip)) => self.on_intersect_not_found(tip),
345 (_, msg) => Err(MachineError::InvalidMsgForState(self.state, msg).into()),
346 }
347 }
348}