pallas_txsubmission/
lib.rs

1use std::fmt::Debug;
2
3use itertools::Itertools;
4use log::debug;
5
6use pallas_machines::{
7    Agent, CodecError, DecodePayload, EncodePayload, MachineError, MachineOutput, PayloadDecoder,
8    PayloadEncoder, Transition,
9};
10
/// States of the tx-submission state machine as tracked by the agents in this
/// file.
///
/// For [`NaiveProvider`], agency (the right to send the next message) is held
/// in `TxIdsNonBlocking`, `TxIdsBlocking` and `Txs`; in `Idle` and `Done` it
/// does not send.
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub enum State {
    /// Waiting for the next request from the remote peer.
    Idle,
    /// A non-blocking request for tx ids is pending a reply.
    TxIdsNonBlocking,
    /// A blocking request for tx ids is pending a reply.
    TxIdsBlocking,
    /// A request for tx bodies is pending a reply.
    Txs,
    /// Terminal state; `is_done` reports `true` here.
    Done,
}
19
/// First field of [`Message::RequestTxIds`]: `true` marks a blocking request.
pub type Blocking = bool;

/// Counter used for both the acknowledged and the requested number of tx ids
/// in [`Message::RequestTxIds`].
pub type TxCount = u16;

/// Size of a tx body, in bytes.
pub type TxSizeInBytes = u32;

/// Identifier of a transaction as used by this module.
pub type TxId = u64;
27
28#[derive(Debug)]
29pub struct TxIdAndSize(TxId, TxSizeInBytes);
30
31impl EncodePayload for TxIdAndSize {
32    fn encode_payload(&self, e: &mut PayloadEncoder) -> Result<(), Box<dyn std::error::Error>> {
33        e.array(2)?;
34        e.u64(self.0)?;
35        e.u32(self.1)?;
36
37        Ok(())
38    }
39}
40
41impl DecodePayload for TxIdAndSize {
42    fn decode_payload(d: &mut PayloadDecoder) -> Result<Self, Box<dyn std::error::Error>> {
43        d.array()?;
44        let id = d.u64()?;
45        let size = d.u32()?;
46
47        Ok(Self(id, size))
48    }
49}
50
51pub type TxBody = Vec<u8>;
52
53#[derive(Debug, Clone)]
54pub struct Tx(TxId, TxBody);
55
56impl From<&Tx> for TxIdAndSize {
57    fn from(other: &Tx) -> Self {
58        TxIdAndSize(other.0, other.1.len() as u32)
59    }
60}
61
62#[derive(Debug)]
63pub enum Message {
64    RequestTxIds(Blocking, TxCount, TxCount),
65    ReplyTxIds(Vec<TxIdAndSize>),
66    RequestTxs(Vec<TxId>),
67    ReplyTxs(Vec<TxBody>),
68    Done,
69}
70
71impl EncodePayload for Message {
72    fn encode_payload(&self, e: &mut PayloadEncoder) -> Result<(), Box<dyn std::error::Error>> {
73        match self {
74            Message::RequestTxIds(blocking, ack, req) => {
75                e.array(4)?.u16(0)?;
76                e.bool(*blocking)?;
77                e.u16(*ack)?;
78                e.u16(*req)?;
79                Ok(())
80            }
81            Message::ReplyTxIds(ids) => {
82                e.array(2)?.u16(1)?;
83                e.array(ids.len() as u64)?;
84                for id in ids {
85                    id.encode_payload(e)?;
86                }
87                Ok(())
88            }
89            Message::RequestTxs(ids) => {
90                e.array(2)?.u16(2)?;
91                e.array(ids.len() as u64)?;
92                for id in ids {
93                    e.u64(*id)?;
94                }
95                Ok(())
96            }
97            Message::ReplyTxs(txs) => {
98                e.array(2)?.u16(3)?;
99                e.array(txs.len() as u64)?;
100                for tx in txs {
101                    e.bytes(tx)?;
102                }
103                Ok(())
104            }
105            Message::Done => {
106                e.array(1)?.u16(4)?;
107                Ok(())
108            }
109        }
110    }
111}
112
113impl DecodePayload for Message {
114    fn decode_payload(d: &mut PayloadDecoder) -> Result<Self, Box<dyn std::error::Error>> {
115        d.array()?;
116        let label = d.u16()?;
117
118        match label {
119            0 => {
120                let blocking = d.bool()?;
121                let ack = d.u16()?;
122                let req = d.u16()?;
123                Ok(Message::RequestTxIds(blocking, ack, req))
124            }
125            1 => {
126                let items = Vec::<TxIdAndSize>::decode_payload(d)?;
127                Ok(Message::ReplyTxIds(items))
128            }
129            2 => {
130                let ids = d.array_iter::<TxId>()?.try_collect()?;
131                Ok(Message::RequestTxs(ids))
132            }
133            3 => {
134                todo!()
135            }
136            4 => Ok(Message::Done),
137            x => Err(Box::new(CodecError::BadLabel(x))),
138        }
139    }
140}
141
142/// A very basic tx provider agent with a fixed set of tx to submit
143///
144/// This provider takes a set of tx from a vec as the single, static source of
145/// data to transfer to the consumer. It's main use is for implementing peers
146/// that need to answer to v1 implementations of the Tx-Submission
147/// mini-protocol. Since v1 nodes dont' wait for a 'Hello' message, the peer
148/// needs to be prepared to receive Tx requests. This naive provider serves as a
149/// good placeholder for those scenarios.
150#[derive(Debug)]
151pub struct NaiveProvider {
152    pub state: State,
153    pub fifo_txs: Vec<Tx>,
154    pub acknowledged_count: usize,
155    pub requested_ids_count: usize,
156    pub requested_txs: Option<Vec<TxId>>,
157}
158
159impl NaiveProvider {
160    pub fn initial(fifo_txs: Vec<Tx>) -> Self {
161        Self {
162            state: State::Idle,
163            acknowledged_count: 0,
164            requested_ids_count: 0,
165            requested_txs: None,
166            fifo_txs,
167        }
168    }
169
170    fn send_done(self, tx: &impl MachineOutput) -> Transition<Self> {
171        let msg = Message::Done;
172
173        tx.send_msg(&msg)?;
174
175        Ok(Self {
176            state: State::Done,
177            ..self
178        })
179    }
180
181    fn send_tx_ids(mut self, tx: &impl MachineOutput) -> Transition<Self> {
182        debug!("draining {} from tx fifo queue", self.acknowledged_count);
183        self.fifo_txs.drain(0..(self.acknowledged_count - 1));
184
185        debug!(
186            "sending next {} tx ids from fifo queue",
187            self.requested_ids_count
188        );
189        let to_send = self.fifo_txs[0..self.requested_ids_count]
190            .iter()
191            .map_into()
192            .collect_vec();
193
194        let msg = Message::ReplyTxIds(to_send);
195        tx.send_msg(&msg)?;
196
197        Ok(Self {
198            state: State::Idle,
199            acknowledged_count: 0,
200            requested_ids_count: 0,
201            ..self
202        })
203    }
204
205    fn send_txs(self, tx: &impl MachineOutput) -> Transition<Self> {
206        let matches = self
207            .fifo_txs
208            .iter()
209            .filter(|Tx(candidate_id, _)| match &self.requested_txs {
210                Some(requested) => requested.iter().contains(candidate_id),
211                None => false,
212            })
213            .map(|Tx(_, body)| body.clone())
214            .collect_vec();
215
216        let msg = Message::ReplyTxs(matches);
217        tx.send_msg(&msg)?;
218
219        Ok(Self {
220            state: State::Idle,
221            requested_txs: None,
222            ..self
223        })
224    }
225
226    fn on_tx_ids_request(
227        self,
228        acknowledged_count: usize,
229        requested_ids_count: usize,
230    ) -> Transition<Self> {
231        debug!(
232            "new tx id request {} (ack: {})",
233            requested_ids_count, acknowledged_count
234        );
235
236        Ok(Self {
237            state: State::Idle,
238            requested_ids_count,
239            acknowledged_count,
240            ..self
241        })
242    }
243
244    fn on_txs_request(self, requested_txs: Vec<TxId>) -> Transition<Self> {
245        debug!("new txs request {:?}", requested_txs,);
246
247        Ok(Self {
248            state: State::Idle,
249            requested_txs: Some(requested_txs),
250            ..self
251        })
252    }
253}
254
255impl Agent for NaiveProvider {
256    type Message = Message;
257
258    fn is_done(&self) -> bool {
259        self.state == State::Done
260    }
261
262    fn has_agency(&self) -> bool {
263        match self.state {
264            State::Idle => false,
265            State::TxIdsNonBlocking => true,
266            State::TxIdsBlocking => true,
267            State::Txs => true,
268            State::Done => false,
269        }
270    }
271
272    fn send_next(self, tx: &impl MachineOutput) -> Transition<Self> {
273        match self.state {
274            State::TxIdsBlocking => self.send_done(tx),
275            State::TxIdsNonBlocking => self.send_tx_ids(tx),
276            State::Txs => self.send_txs(tx),
277            _ => panic!("I don't have agency, don't know what to do"),
278        }
279    }
280
281    fn receive_next(self, msg: Self::Message) -> Transition<Self> {
282        match (&self.state, msg) {
283            (State::Idle, Message::RequestTxIds(block, ack, req)) if !block => {
284                self.on_tx_ids_request(ack as usize, req as usize)
285            }
286            (State::Idle, Message::RequestTxIds(block, _, _)) if block => Ok(Self {
287                state: State::TxIdsBlocking,
288                ..self
289            }),
290            (State::Idle, Message::RequestTxs(ids)) => self.on_txs_request(ids),
291            (_, msg) => Err(MachineError::InvalidMsgForState(self.state, msg).into()),
292        }
293    }
294}