cardano_net/packet/
rawchan.rs1use cardano_sdk::protocol::{Protocol, Time};
2use tokio::sync::mpsc;
3use tracing::{debug, error};
4
5use super::frame::{IdAndResponder, PacketBytes};
6use super::ProtocolError;
7
8use cbored::validate::{CborDataMissing, ValidateError, Validator};
9
10pub struct RawChannel {
11 id: u16,
12 writer: mpsc::Sender<PacketBytes>,
13 reader: mpsc::Receiver<Vec<u8>>,
14 buf: Buf,
15 }
20
21#[derive(Clone)]
22pub struct ChannelWriter(pub(crate) mpsc::Sender<Vec<u8>>);
23
24impl ChannelWriter {
25 pub async fn append(&self, inner: Vec<u8>) {
26 self.0.send(inner).await.unwrap();
27 }
28}
29
30struct Buf {
31 buf: Vec<u8>,
32 pos: usize,
33}
34
35impl Buf {
36 pub fn new() -> Self {
37 Self {
38 buf: vec![0_u8; 1024 * 1024],
39 pos: 0,
40 }
41 }
42
43 pub fn len(&self) -> usize {
44 self.pos
45 }
46
47 pub fn append(&mut self, data: &[u8]) {
48 self.buf[self.pos..self.pos + data.len()].copy_from_slice(data);
49 self.pos += data.len()
50 }
51
52 pub fn slice(&self) -> &[u8] {
53 &self.buf[0..self.pos]
54 }
55
56 pub fn consume(&mut self, bytes: usize) {
57 let current = self.buf.len();
58 assert!(current >= bytes);
59 if current == bytes {
60 } else {
62 self.buf.copy_within(bytes..current, 0);
63 }
65 self.pos -= bytes;
66 }
67}
68
69#[test]
70fn buf_works() {
71 let mut b = Buf::new();
72 assert_eq!(b.len(), 0);
73 let data = &[1, 2, 3, 4, 5, 6];
74 b.append(data);
75 assert_eq!(b.slice(), data);
76 b.consume(3);
77 assert_eq!(b.slice(), &data[3..]);
78}
79
80impl RawChannel {
81 pub fn new(
82 id: u16,
83 writer: mpsc::Sender<PacketBytes>,
84 reader: mpsc::Receiver<Vec<u8>>,
85 ) -> Self {
86 RawChannel {
87 id,
88 writer,
89 reader,
90 buf: Buf::new(),
91 }
92 }
93
94 pub async fn rx_cbor<T: cbored::Decode>(&mut self) -> Result<T, ProtocolError> {
95 match self.rx_cbor_loop().await {
96 Err(e) => {
97 error!("rx_cbor failed {}: buf={}", e, self.buf.len(),);
98 Err(e)
99 }
100 Ok(o) => Ok(o),
101 }
102 }
103
104 pub async fn fill_buf(&mut self, expected_bytes: usize) -> Result<(), ProtocolError> {
105 let mut need = expected_bytes;
106 while need > 0 {
107 let data = self.reader.recv().await;
108 match data {
109 None => {
110 return Err(ProtocolError::StreamError {
111 expecting: expected_bytes,
112 got: expected_bytes - need,
113 })
114 }
115 Some(mut dat) => {
116 self.buf.append(&mut dat);
117 need = need.saturating_sub(dat.len())
118 }
119 };
120 }
121 Ok(())
122 }
123
124 pub async fn rx_cbor_loop<T: cbored::Decode>(&mut self) -> Result<T, ProtocolError> {
125 if self.buf.len() == 0 {
126 self.fill_buf(1).await?;
127 }
128
129 loop {
130 let mut validator = Validator::new(self.buf.slice());
131 match validator.next() {
132 Err(ValidateError::DataMissing(datamissing)) => {
133 let CborDataMissing {
134 expecting_bytes,
135 got_bytes,
136 context: _,
137 } = datamissing;
138 debug!(
139 "data missing trying to read: {} but only: {}",
140 expecting_bytes, got_bytes
141 );
142 self.fill_buf(expecting_bytes - got_bytes).await?;
143 }
144 Err(ValidateError::LeadError(e)) => {
145 return Err(ProtocolError::CBORError(
146 cbored::DecodeErrorKind::Custom(format!(
147 "validation error: lead : {:?}",
148 e
149 ))
150 .context::<Self>(),
151 ));
152 }
153 Err(ValidateError::StateError(e)) => {
154 return Err(ProtocolError::CBORError(
155 cbored::DecodeErrorKind::Custom(format!(
156 "validation error: state : {:?}",
157 e
158 ))
159 .context::<Self>(),
160 ));
161 }
162 Ok((slice, validate_processed)) => {
163 let mut reader = cbored::Reader::new(slice.as_ref());
164 let t = reader.decode::<T>()?;
165 self.buf.consume(validate_processed);
166 return Ok(t);
167 }
168 }
169 }
170 }
171
172 pub async fn tx_proto<T: Protocol>(
173 &self,
174 time: Time,
175 respond: bool,
176 t: T,
177 ) -> Result<(), ProtocolError> {
178 assert_eq!(self.id, T::NUMBER);
179 let mut writer = cbored::Writer::new();
180 let idf = IdAndResponder::new(self.id, respond);
181 t.encode(&mut writer);
182 let bytes = writer.finalize();
183 self.writer
184 .send(PacketBytes {
185 time,
186 idf,
187 inner: bytes,
188 })
189 .await
190 .map_err(|_e| ProtocolError::TxSendFailed(T::NUMBER))?;
191 Ok(())
192 }
193}