layer_client/
transport_intermediate.rs1use crate::InvocationError;
30use tokio::io::{AsyncReadExt, AsyncWriteExt};
31use tokio::net::TcpStream;
32
/// Length-prefixed framing over a [`TcpStream`].
///
/// Every frame is a 4-byte little-endian payload length followed by the
/// payload. The first `send` on a transport additionally writes a one-time
/// 4-byte `0xee 0xee 0xee 0xee` tag before the first frame.
pub struct IntermediateTransport {
    /// Underlying TCP connection that frames are written to and read from.
    stream: TcpStream,
    /// `true` once the one-time `0xeeeeeeee` tag has been written by `send`.
    init_sent: bool,
}
45
46impl IntermediateTransport {
47 pub async fn connect(addr: &str) -> Result<Self, InvocationError> {
49 let stream = TcpStream::connect(addr).await?;
50 Ok(Self {
51 stream,
52 init_sent: false,
53 })
54 }
55
56 pub fn from_stream(stream: TcpStream) -> Self {
58 Self {
59 stream,
60 init_sent: false,
61 }
62 }
63
64 pub async fn send(&mut self, data: &[u8]) -> Result<(), InvocationError> {
66 if !self.init_sent {
67 self.stream.write_all(&[0xee, 0xee, 0xee, 0xee]).await?;
68 self.init_sent = true;
69 }
70 let len = (data.len() as u32).to_le_bytes();
71 self.stream.write_all(&len).await?;
72 self.stream.write_all(data).await?;
73 Ok(())
74 }
75
76 pub async fn recv(&mut self) -> Result<Vec<u8>, InvocationError> {
78 let mut len_buf = [0u8; 4];
79 self.stream.read_exact(&mut len_buf).await?;
80 let len = u32::from_le_bytes(len_buf) as usize;
81 let mut buf = vec![0u8; len];
82 self.stream.read_exact(&mut buf).await?;
83 Ok(buf)
84 }
85
86 pub fn into_inner(self) -> TcpStream {
87 self.stream
88 }
89}
90
/// Checksummed, sequence-numbered framing over a [`TcpStream`].
///
/// Every frame is laid out as
/// `[total_len: u32 LE][seqno: u32 LE][payload][crc32: u32 LE]`, where
/// `total_len` counts all four parts and the CRC covers everything before it.
pub struct FullTransport {
    /// Underlying TCP connection that frames are written to and read from.
    stream: TcpStream,
    /// Sequence number stamped into the next outgoing frame; wraps on overflow.
    send_seqno: u32,
    /// Number of frames accepted by `recv` so far (wrapping). It is only
    /// counted; `recv` does not compare it against the sequence number on the wire.
    recv_seqno: u32,
}
109
110impl FullTransport {
111 pub async fn connect(addr: &str) -> Result<Self, InvocationError> {
112 let stream = TcpStream::connect(addr).await?;
113 Ok(Self {
114 stream,
115 send_seqno: 0,
116 recv_seqno: 0,
117 })
118 }
119
120 pub fn from_stream(stream: TcpStream) -> Self {
121 Self {
122 stream,
123 send_seqno: 0,
124 recv_seqno: 0,
125 }
126 }
127
128 pub async fn send(&mut self, data: &[u8]) -> Result<(), InvocationError> {
130 let total_len = (data.len() + 12) as u32; let seq = self.send_seqno;
132 self.send_seqno = self.send_seqno.wrapping_add(1);
133
134 let mut packet = Vec::with_capacity(total_len as usize);
135 packet.extend_from_slice(&total_len.to_le_bytes());
136 packet.extend_from_slice(&seq.to_le_bytes());
137 packet.extend_from_slice(data);
138
139 let crc = crc32_ieee(&packet);
140 packet.extend_from_slice(&crc.to_le_bytes());
141
142 self.stream.write_all(&packet).await?;
143 Ok(())
144 }
145
146 pub async fn recv(&mut self) -> Result<Vec<u8>, InvocationError> {
148 let mut len_buf = [0u8; 4];
149 self.stream.read_exact(&mut len_buf).await?;
150 let total_len = u32::from_le_bytes(len_buf) as usize;
151 if total_len < 12 {
152 return Err(InvocationError::Deserialize(
153 "Full transport: packet too short".into(),
154 ));
155 }
156 let mut rest = vec![0u8; total_len - 4];
157 self.stream.read_exact(&mut rest).await?;
158
159 let (body, crc_bytes) = rest.split_at(rest.len() - 4);
161 let expected_crc = u32::from_le_bytes(crc_bytes.try_into().unwrap());
162 let mut check_input = len_buf.to_vec();
163 check_input.extend_from_slice(body);
164 let actual_crc = crc32_ieee(&check_input);
165 if actual_crc != expected_crc {
166 return Err(InvocationError::Deserialize(format!(
167 "Full transport: CRC mismatch (got {actual_crc:#010x}, expected {expected_crc:#010x})"
168 )));
169 }
170
171 let _recv_seq = u32::from_le_bytes(body[..4].try_into().unwrap());
173 self.recv_seqno = self.recv_seqno.wrapping_add(1);
174
175 Ok(body[4..].to_vec())
176 }
177
178 pub fn into_inner(self) -> TcpStream {
179 self.stream
180 }
181}
182
/// Computes the CRC-32 checksum of `data` bit by bit.
///
/// Uses the reflected polynomial `0xEDB88320` with an all-ones initial
/// register and a final bitwise complement. Empty input yields `0`.
pub(crate) fn crc32_ieee(data: &[u8]) -> u32 {
    const REFLECTED_POLY: u32 = 0xedb8_8320;

    let register = data.iter().fold(u32::MAX, |state, &octet| {
        // Mix the whole byte into the low bits, then clock it out one bit at a time.
        (0..8).fold(state ^ u32::from(octet), |acc, _| {
            let shifted = acc >> 1;
            if acc & 1 == 1 {
                shifted ^ REFLECTED_POLY
            } else {
                shifted
            }
        })
    });

    !register
}