pallas_machines/
payloads.rs

1use super::*;
2
3use log::debug;
4use minicbor::{Decoder, Encoder};
5use pallas_multiplexer::Payload;
6use std::{
7    ops::{Deref, DerefMut},
8    sync::mpsc::Receiver,
9};
10
11pub struct PayloadEncoder<'a>(Encoder<&'a mut Vec<u8>>);
12
13impl<'a> Deref for PayloadEncoder<'a> {
14    type Target = Encoder<&'a mut Vec<u8>>;
15
16    fn deref(&self) -> &Self::Target {
17        &self.0
18    }
19}
20
21impl<'a> DerefMut for PayloadEncoder<'a> {
22    fn deref_mut(&mut self) -> &mut Self::Target {
23        &mut self.0
24    }
25}
26
27impl<'a> PayloadEncoder<'a> {
28    pub fn encode_payload<T: EncodePayload>(
29        &mut self,
30        t: &T,
31    ) -> Result<(), Box<dyn std::error::Error>> {
32        t.encode_payload(self)
33    }
34}
35
36pub struct PayloadDecoder<'a>(pub Decoder<'a>);
37
38impl<'a> Deref for PayloadDecoder<'a> {
39    type Target = Decoder<'a>;
40
41    fn deref(&self) -> &Self::Target {
42        &self.0
43    }
44}
45
46impl<'a> DerefMut for PayloadDecoder<'a> {
47    fn deref_mut(&mut self) -> &mut Self::Target {
48        &mut self.0
49    }
50}
51
52impl<'a> PayloadDecoder<'a> {
53    pub fn decode_payload<T: DecodePayload>(&mut self) -> Result<T, Box<dyn std::error::Error>> {
54        T::decode_payload(self)
55    }
56}
57
58pub trait EncodePayload {
59    fn encode_payload(&self, e: &mut PayloadEncoder) -> Result<(), Box<dyn std::error::Error>>;
60}
61
62pub fn to_payload(data: &dyn EncodePayload) -> Result<Payload, Box<dyn std::error::Error>> {
63    let mut payload = Vec::new();
64    let mut encoder = PayloadEncoder(minicbor::encode::Encoder::new(&mut payload));
65    data.encode_payload(&mut encoder)?;
66
67    Ok(payload)
68}
69
70impl<D> EncodePayload for Vec<D>
71where
72    D: EncodePayload,
73{
74    fn encode_payload(&self, e: &mut PayloadEncoder) -> Result<(), Box<dyn std::error::Error>> {
75        e.array(self.len() as u64)?;
76
77        for item in self {
78            item.encode_payload(e)?;
79        }
80
81        Ok(())
82    }
83}
84
85impl<D> DecodePayload for Vec<D>
86where
87    D: DecodePayload,
88{
89    fn decode_payload(d: &mut PayloadDecoder) -> Result<Self, Box<dyn std::error::Error>> {
90        let len = d.array()?.ok_or(CodecError::UnexpectedCbor(
91            "expecting definite-length array",
92        ))? as usize;
93
94        let mut output = Vec::<D>::with_capacity(len);
95
96        #[allow(clippy::needless_range_loop)]
97        for i in 0..(len - 1) {
98            output[i] = D::decode_payload(d)?;
99        }
100
101        Ok(output)
102    }
103}
104
105pub trait DecodePayload: Sized {
106    fn decode_payload(d: &mut PayloadDecoder) -> Result<Self, Box<dyn std::error::Error>>;
107}
108
109impl<T: DecodePayload> DecodePayload for Option<T> {
110    fn decode_payload(d: &mut PayloadDecoder) -> Result<Self, Box<dyn std::error::Error>> {
111        match d.datatype()? {
112            minicbor::data::Type::Undefined => Ok(None),
113            _ => {
114                let value = d.decode_payload()?;
115                Ok(Some(value))
116            }
117        }
118    }
119}
120
121pub struct PayloadDeconstructor<'a> {
122    pub(crate) rx: &'a mut Receiver<Payload>,
123    pub(crate) remaining: Vec<u8>,
124}
125
126impl<'a> PayloadDeconstructor<'a> {
127    pub fn consume_next_message<T: DecodePayload>(
128        &mut self,
129    ) -> Result<T, Box<dyn std::error::Error>> {
130        if self.remaining.is_empty() {
131            debug!("no remaining payload, fetching next segment");
132            let payload = self.rx.recv()?;
133            self.remaining.extend(payload);
134        }
135
136        let mut decoder = PayloadDecoder(minicbor::Decoder::new(&self.remaining));
137
138        match T::decode_payload(&mut decoder) {
139            Ok(t) => {
140                let new_pos = decoder.position();
141                self.remaining.drain(0..new_pos);
142                debug!("consumed {} from payload buffer", new_pos);
143                Ok(t)
144            }
145            Err(_err) => {
146                //TODO: we need to match EndOfInput kind of errors
147
148                debug!("payload incomplete, fetching next segment");
149                let payload = self.rx.recv()?;
150                self.remaining.extend(payload);
151
152                self.consume_next_message::<T>()
153            }
154        }
155    }
156}