1use std::fmt::Debug;
2
3use itertools::Itertools;
4use log::debug;
5
6use pallas_machines::{
7 Agent, CodecError, DecodePayload, EncodePayload, MachineError, MachineOutput, PayloadDecoder,
8 PayloadEncoder, Transition,
9};
10
/// States of the tx-submission state machine driven by the agent in this file.
///
/// Which side may send in each state is decided by `Agent::has_agency`; for
/// `NaiveProvider` that is `TxIdsNonBlocking`, `TxIdsBlocking` and `Txs`.
#[derive(Clone, Debug, PartialEq)]
pub enum State {
    /// Waiting for the next request from the peer.
    Idle,
    /// A non-blocking request for transaction ids is pending a reply.
    TxIdsNonBlocking,
    /// A blocking request for transaction ids is pending a reply.
    TxIdsBlocking,
    /// A request for transaction bodies is pending a reply.
    Txs,
    /// Terminal state; `NaiveProvider::is_done` reports `true` here.
    Done,
}
19
/// Flag carried by `Message::RequestTxIds`; `true` selects the blocking variant.
pub type Blocking = bool;

/// Count of transactions / transaction ids carried by `Message::RequestTxIds`.
pub type TxCount = u16;

/// Size of a transaction body, in bytes.
pub type TxSizeInBytes = u32;

/// Identifier of a transaction as used by this module.
pub type TxId = u64;
27
28#[derive(Debug)]
29pub struct TxIdAndSize(TxId, TxSizeInBytes);
30
31impl EncodePayload for TxIdAndSize {
32 fn encode_payload(&self, e: &mut PayloadEncoder) -> Result<(), Box<dyn std::error::Error>> {
33 e.array(2)?;
34 e.u64(self.0)?;
35 e.u32(self.1)?;
36
37 Ok(())
38 }
39}
40
41impl DecodePayload for TxIdAndSize {
42 fn decode_payload(d: &mut PayloadDecoder) -> Result<Self, Box<dyn std::error::Error>> {
43 d.array()?;
44 let id = d.u64()?;
45 let size = d.u32()?;
46
47 Ok(Self(id, size))
48 }
49}
50
51pub type TxBody = Vec<u8>;
52
53#[derive(Debug, Clone)]
54pub struct Tx(TxId, TxBody);
55
56impl From<&Tx> for TxIdAndSize {
57 fn from(other: &Tx) -> Self {
58 TxIdAndSize(other.0, other.1.len() as u32)
59 }
60}
61
62#[derive(Debug)]
63pub enum Message {
64 RequestTxIds(Blocking, TxCount, TxCount),
65 ReplyTxIds(Vec<TxIdAndSize>),
66 RequestTxs(Vec<TxId>),
67 ReplyTxs(Vec<TxBody>),
68 Done,
69}
70
71impl EncodePayload for Message {
72 fn encode_payload(&self, e: &mut PayloadEncoder) -> Result<(), Box<dyn std::error::Error>> {
73 match self {
74 Message::RequestTxIds(blocking, ack, req) => {
75 e.array(4)?.u16(0)?;
76 e.bool(*blocking)?;
77 e.u16(*ack)?;
78 e.u16(*req)?;
79 Ok(())
80 }
81 Message::ReplyTxIds(ids) => {
82 e.array(2)?.u16(1)?;
83 e.array(ids.len() as u64)?;
84 for id in ids {
85 id.encode_payload(e)?;
86 }
87 Ok(())
88 }
89 Message::RequestTxs(ids) => {
90 e.array(2)?.u16(2)?;
91 e.array(ids.len() as u64)?;
92 for id in ids {
93 e.u64(*id)?;
94 }
95 Ok(())
96 }
97 Message::ReplyTxs(txs) => {
98 e.array(2)?.u16(3)?;
99 e.array(txs.len() as u64)?;
100 for tx in txs {
101 e.bytes(tx)?;
102 }
103 Ok(())
104 }
105 Message::Done => {
106 e.array(1)?.u16(4)?;
107 Ok(())
108 }
109 }
110 }
111}
112
113impl DecodePayload for Message {
114 fn decode_payload(d: &mut PayloadDecoder) -> Result<Self, Box<dyn std::error::Error>> {
115 d.array()?;
116 let label = d.u16()?;
117
118 match label {
119 0 => {
120 let blocking = d.bool()?;
121 let ack = d.u16()?;
122 let req = d.u16()?;
123 Ok(Message::RequestTxIds(blocking, ack, req))
124 }
125 1 => {
126 let items = Vec::<TxIdAndSize>::decode_payload(d)?;
127 Ok(Message::ReplyTxIds(items))
128 }
129 2 => {
130 let ids = d.array_iter::<TxId>()?.try_collect()?;
131 Ok(Message::RequestTxs(ids))
132 }
133 3 => {
134 todo!()
135 }
136 4 => Ok(Message::Done),
137 x => Err(Box::new(CodecError::BadLabel(x))),
138 }
139 }
140}
141
142#[derive(Debug)]
151pub struct NaiveProvider {
152 pub state: State,
153 pub fifo_txs: Vec<Tx>,
154 pub acknowledged_count: usize,
155 pub requested_ids_count: usize,
156 pub requested_txs: Option<Vec<TxId>>,
157}
158
159impl NaiveProvider {
160 pub fn initial(fifo_txs: Vec<Tx>) -> Self {
161 Self {
162 state: State::Idle,
163 acknowledged_count: 0,
164 requested_ids_count: 0,
165 requested_txs: None,
166 fifo_txs,
167 }
168 }
169
170 fn send_done(self, tx: &impl MachineOutput) -> Transition<Self> {
171 let msg = Message::Done;
172
173 tx.send_msg(&msg)?;
174
175 Ok(Self {
176 state: State::Done,
177 ..self
178 })
179 }
180
181 fn send_tx_ids(mut self, tx: &impl MachineOutput) -> Transition<Self> {
182 debug!("draining {} from tx fifo queue", self.acknowledged_count);
183 self.fifo_txs.drain(0..(self.acknowledged_count - 1));
184
185 debug!(
186 "sending next {} tx ids from fifo queue",
187 self.requested_ids_count
188 );
189 let to_send = self.fifo_txs[0..self.requested_ids_count]
190 .iter()
191 .map_into()
192 .collect_vec();
193
194 let msg = Message::ReplyTxIds(to_send);
195 tx.send_msg(&msg)?;
196
197 Ok(Self {
198 state: State::Idle,
199 acknowledged_count: 0,
200 requested_ids_count: 0,
201 ..self
202 })
203 }
204
205 fn send_txs(self, tx: &impl MachineOutput) -> Transition<Self> {
206 let matches = self
207 .fifo_txs
208 .iter()
209 .filter(|Tx(candidate_id, _)| match &self.requested_txs {
210 Some(requested) => requested.iter().contains(candidate_id),
211 None => false,
212 })
213 .map(|Tx(_, body)| body.clone())
214 .collect_vec();
215
216 let msg = Message::ReplyTxs(matches);
217 tx.send_msg(&msg)?;
218
219 Ok(Self {
220 state: State::Idle,
221 requested_txs: None,
222 ..self
223 })
224 }
225
226 fn on_tx_ids_request(
227 self,
228 acknowledged_count: usize,
229 requested_ids_count: usize,
230 ) -> Transition<Self> {
231 debug!(
232 "new tx id request {} (ack: {})",
233 requested_ids_count, acknowledged_count
234 );
235
236 Ok(Self {
237 state: State::Idle,
238 requested_ids_count,
239 acknowledged_count,
240 ..self
241 })
242 }
243
244 fn on_txs_request(self, requested_txs: Vec<TxId>) -> Transition<Self> {
245 debug!("new txs request {:?}", requested_txs,);
246
247 Ok(Self {
248 state: State::Idle,
249 requested_txs: Some(requested_txs),
250 ..self
251 })
252 }
253}
254
255impl Agent for NaiveProvider {
256 type Message = Message;
257
258 fn is_done(&self) -> bool {
259 self.state == State::Done
260 }
261
262 fn has_agency(&self) -> bool {
263 match self.state {
264 State::Idle => false,
265 State::TxIdsNonBlocking => true,
266 State::TxIdsBlocking => true,
267 State::Txs => true,
268 State::Done => false,
269 }
270 }
271
272 fn send_next(self, tx: &impl MachineOutput) -> Transition<Self> {
273 match self.state {
274 State::TxIdsBlocking => self.send_done(tx),
275 State::TxIdsNonBlocking => self.send_tx_ids(tx),
276 State::Txs => self.send_txs(tx),
277 _ => panic!("I don't have agency, don't know what to do"),
278 }
279 }
280
281 fn receive_next(self, msg: Self::Message) -> Transition<Self> {
282 match (&self.state, msg) {
283 (State::Idle, Message::RequestTxIds(block, ack, req)) if !block => {
284 self.on_tx_ids_request(ack as usize, req as usize)
285 }
286 (State::Idle, Message::RequestTxIds(block, _, _)) if block => Ok(Self {
287 state: State::TxIdsBlocking,
288 ..self
289 }),
290 (State::Idle, Message::RequestTxs(ids)) => self.on_txs_request(ids),
291 (_, msg) => Err(MachineError::InvalidMsgForState(self.state, msg).into()),
292 }
293 }
294}