cardano_net/packet/
rawchan.rs

1use cardano_sdk::protocol::{Protocol, Time};
2use tokio::sync::mpsc;
3use tracing::{debug, error};
4
5use super::frame::{IdAndResponder, PacketBytes};
6use super::ProtocolError;
7
8use cbored::validate::{CborDataMissing, ValidateError, Validator};
9
10pub struct RawChannel {
11    id: u16,
12    writer: mpsc::Sender<PacketBytes>,
13    reader: mpsc::Receiver<Vec<u8>>,
14    buf: Buf,
15    /*
16    buf: Vec<u8>,
17    read: usize,
18    */
19}
20
21#[derive(Clone)]
22pub struct ChannelWriter(pub(crate) mpsc::Sender<Vec<u8>>);
23
24impl ChannelWriter {
25    pub async fn append(&self, inner: Vec<u8>) {
26        self.0.send(inner).await.unwrap();
27    }
28}
29
/// Byte accumulator with a pre-allocated backing store.
///
/// Only the first `pos` bytes of `buf` are meaningful; everything after that
/// is scratch space waiting to be overwritten by `append`.
struct Buf {
    /// Backing storage. Its length is the current capacity, not the amount of
    /// buffered data.
    buf: Vec<u8>,
    /// Number of valid bytes at the front of `buf`.
    pos: usize,
}

impl Buf {
    /// Size of the backing store allocated by `new` (1 MiB).
    const INITIAL_CAPACITY: usize = 1024 * 1024;

    /// Creates an empty buffer with `INITIAL_CAPACITY` bytes pre-allocated.
    pub fn new() -> Self {
        Self {
            buf: vec![0_u8; Self::INITIAL_CAPACITY],
            pos: 0,
        }
    }

    /// Returns the number of buffered (not yet consumed) bytes.
    pub fn len(&self) -> usize {
        self.pos
    }

    /// Copies `data` after the currently buffered bytes.
    ///
    /// The backing store grows (at least doubling) when `data` does not fit,
    /// instead of panicking on an out-of-range slice.
    ///
    /// NOTE(review): growth is unbounded; if the data comes from an untrusted
    /// peer, the caller should enforce a maximum message size.
    pub fn append(&mut self, data: &[u8]) {
        let end = self.pos + data.len();
        if end > self.buf.len() {
            let grown = end.max(self.buf.len().saturating_mul(2));
            self.buf.resize(grown, 0);
        }
        self.buf[self.pos..end].copy_from_slice(data);
        self.pos = end;
    }

    /// Returns the buffered bytes.
    pub fn slice(&self) -> &[u8] {
        &self.buf[..self.pos]
    }

    /// Drops the first `bytes` buffered bytes, shifting the rest to the front.
    ///
    /// # Panics
    ///
    /// Panics if `bytes` exceeds the number of buffered bytes.
    pub fn consume(&mut self, bytes: usize) {
        // Compare against the amount of valid data, not the capacity of the
        // backing store, so over-consuming is caught here rather than
        // underflowing `pos`.
        assert!(
            bytes <= self.pos,
            "cannot consume {} bytes, only {} buffered",
            bytes,
            self.pos
        );
        // Only the live region needs to move; a no-op when everything is consumed.
        self.buf.copy_within(bytes..self.pos, 0);
        self.pos -= bytes;
    }
}
68
/// Smoke test: appended bytes are visible through `slice`, and `consume`
/// removes them from the front.
#[test]
fn buf_works() {
    let payload = &[1, 2, 3, 4, 5, 6];
    let mut scratch = Buf::new();
    assert_eq!(scratch.len(), 0);

    scratch.append(payload);
    assert_eq!(scratch.slice(), payload);

    scratch.consume(3);
    assert_eq!(scratch.slice(), &payload[3..]);
}
79
80impl RawChannel {
81    pub fn new(
82        id: u16,
83        writer: mpsc::Sender<PacketBytes>,
84        reader: mpsc::Receiver<Vec<u8>>,
85    ) -> Self {
86        RawChannel {
87            id,
88            writer,
89            reader,
90            buf: Buf::new(),
91        }
92    }
93
94    pub async fn rx_cbor<T: cbored::Decode>(&mut self) -> Result<T, ProtocolError> {
95        match self.rx_cbor_loop().await {
96            Err(e) => {
97                error!("rx_cbor failed {}: buf={}", e, self.buf.len(),);
98                Err(e)
99            }
100            Ok(o) => Ok(o),
101        }
102    }
103
104    pub async fn fill_buf(&mut self, expected_bytes: usize) -> Result<(), ProtocolError> {
105        let mut need = expected_bytes;
106        while need > 0 {
107            let data = self.reader.recv().await;
108            match data {
109                None => {
110                    return Err(ProtocolError::StreamError {
111                        expecting: expected_bytes,
112                        got: expected_bytes - need,
113                    })
114                }
115                Some(mut dat) => {
116                    self.buf.append(&mut dat);
117                    need = need.saturating_sub(dat.len())
118                }
119            };
120        }
121        Ok(())
122    }
123
124    pub async fn rx_cbor_loop<T: cbored::Decode>(&mut self) -> Result<T, ProtocolError> {
125        if self.buf.len() == 0 {
126            self.fill_buf(1).await?;
127        }
128
129        loop {
130            let mut validator = Validator::new(self.buf.slice());
131            match validator.next() {
132                Err(ValidateError::DataMissing(datamissing)) => {
133                    let CborDataMissing {
134                        expecting_bytes,
135                        got_bytes,
136                        context: _,
137                    } = datamissing;
138                    debug!(
139                        "data missing trying to read: {} but only: {}",
140                        expecting_bytes, got_bytes
141                    );
142                    self.fill_buf(expecting_bytes - got_bytes).await?;
143                }
144                Err(ValidateError::LeadError(e)) => {
145                    return Err(ProtocolError::CBORError(
146                        cbored::DecodeErrorKind::Custom(format!(
147                            "validation error: lead : {:?}",
148                            e
149                        ))
150                        .context::<Self>(),
151                    ));
152                }
153                Err(ValidateError::StateError(e)) => {
154                    return Err(ProtocolError::CBORError(
155                        cbored::DecodeErrorKind::Custom(format!(
156                            "validation error: state : {:?}",
157                            e
158                        ))
159                        .context::<Self>(),
160                    ));
161                }
162                Ok((slice, validate_processed)) => {
163                    let mut reader = cbored::Reader::new(slice.as_ref());
164                    let t = reader.decode::<T>()?;
165                    self.buf.consume(validate_processed);
166                    return Ok(t);
167                }
168            }
169        }
170    }
171
172    pub async fn tx_proto<T: Protocol>(
173        &self,
174        time: Time,
175        respond: bool,
176        t: T,
177    ) -> Result<(), ProtocolError> {
178        assert_eq!(self.id, T::NUMBER);
179        let mut writer = cbored::Writer::new();
180        let idf = IdAndResponder::new(self.id, respond);
181        t.encode(&mut writer);
182        let bytes = writer.finalize();
183        self.writer
184            .send(PacketBytes {
185                time,
186                idf,
187                inner: bytes,
188            })
189            .await
190            .map_err(|_e| ProtocolError::TxSendFailed(T::NUMBER))?;
191        Ok(())
192    }
193}