pallas_machines/
payloads.rs1use super::*;
2
3use log::debug;
4use minicbor::{Decoder, Encoder};
5use pallas_multiplexer::Payload;
6use std::{
7 ops::{Deref, DerefMut},
8 sync::mpsc::Receiver,
9};
10
11pub struct PayloadEncoder<'a>(Encoder<&'a mut Vec<u8>>);
12
13impl<'a> Deref for PayloadEncoder<'a> {
14 type Target = Encoder<&'a mut Vec<u8>>;
15
16 fn deref(&self) -> &Self::Target {
17 &self.0
18 }
19}
20
21impl<'a> DerefMut for PayloadEncoder<'a> {
22 fn deref_mut(&mut self) -> &mut Self::Target {
23 &mut self.0
24 }
25}
26
27impl<'a> PayloadEncoder<'a> {
28 pub fn encode_payload<T: EncodePayload>(
29 &mut self,
30 t: &T,
31 ) -> Result<(), Box<dyn std::error::Error>> {
32 t.encode_payload(self)
33 }
34}
35
36pub struct PayloadDecoder<'a>(pub Decoder<'a>);
37
38impl<'a> Deref for PayloadDecoder<'a> {
39 type Target = Decoder<'a>;
40
41 fn deref(&self) -> &Self::Target {
42 &self.0
43 }
44}
45
46impl<'a> DerefMut for PayloadDecoder<'a> {
47 fn deref_mut(&mut self) -> &mut Self::Target {
48 &mut self.0
49 }
50}
51
52impl<'a> PayloadDecoder<'a> {
53 pub fn decode_payload<T: DecodePayload>(&mut self) -> Result<T, Box<dyn std::error::Error>> {
54 T::decode_payload(self)
55 }
56}
57
58pub trait EncodePayload {
59 fn encode_payload(&self, e: &mut PayloadEncoder) -> Result<(), Box<dyn std::error::Error>>;
60}
61
62pub fn to_payload(data: &dyn EncodePayload) -> Result<Payload, Box<dyn std::error::Error>> {
63 let mut payload = Vec::new();
64 let mut encoder = PayloadEncoder(minicbor::encode::Encoder::new(&mut payload));
65 data.encode_payload(&mut encoder)?;
66
67 Ok(payload)
68}
69
70impl<D> EncodePayload for Vec<D>
71where
72 D: EncodePayload,
73{
74 fn encode_payload(&self, e: &mut PayloadEncoder) -> Result<(), Box<dyn std::error::Error>> {
75 e.array(self.len() as u64)?;
76
77 for item in self {
78 item.encode_payload(e)?;
79 }
80
81 Ok(())
82 }
83}
84
85impl<D> DecodePayload for Vec<D>
86where
87 D: DecodePayload,
88{
89 fn decode_payload(d: &mut PayloadDecoder) -> Result<Self, Box<dyn std::error::Error>> {
90 let len = d.array()?.ok_or(CodecError::UnexpectedCbor(
91 "expecting definite-length array",
92 ))? as usize;
93
94 let mut output = Vec::<D>::with_capacity(len);
95
96 #[allow(clippy::needless_range_loop)]
97 for i in 0..(len - 1) {
98 output[i] = D::decode_payload(d)?;
99 }
100
101 Ok(output)
102 }
103}
104
105pub trait DecodePayload: Sized {
106 fn decode_payload(d: &mut PayloadDecoder) -> Result<Self, Box<dyn std::error::Error>>;
107}
108
109impl<T: DecodePayload> DecodePayload for Option<T> {
110 fn decode_payload(d: &mut PayloadDecoder) -> Result<Self, Box<dyn std::error::Error>> {
111 match d.datatype()? {
112 minicbor::data::Type::Undefined => Ok(None),
113 _ => {
114 let value = d.decode_payload()?;
115 Ok(Some(value))
116 }
117 }
118 }
119}
120
121pub struct PayloadDeconstructor<'a> {
122 pub(crate) rx: &'a mut Receiver<Payload>,
123 pub(crate) remaining: Vec<u8>,
124}
125
126impl<'a> PayloadDeconstructor<'a> {
127 pub fn consume_next_message<T: DecodePayload>(
128 &mut self,
129 ) -> Result<T, Box<dyn std::error::Error>> {
130 if self.remaining.is_empty() {
131 debug!("no remaining payload, fetching next segment");
132 let payload = self.rx.recv()?;
133 self.remaining.extend(payload);
134 }
135
136 let mut decoder = PayloadDecoder(minicbor::Decoder::new(&self.remaining));
137
138 match T::decode_payload(&mut decoder) {
139 Ok(t) => {
140 let new_pos = decoder.position();
141 self.remaining.drain(0..new_pos);
142 debug!("consumed {} from payload buffer", new_pos);
143 Ok(t)
144 }
145 Err(_err) => {
146 debug!("payload incomplete, fetching next segment");
149 let payload = self.rx.recv()?;
150 self.remaining.extend(payload);
151
152 self.consume_next_message::<T>()
153 }
154 }
155 }
156}