layer_client/
transport_intermediate.rs1use crate::InvocationError;
12use tokio::io::{AsyncReadExt, AsyncWriteExt};
13use tokio::net::TcpStream;
14
15pub struct IntermediateTransport {
24 stream: TcpStream,
25 init_sent: bool,
26}
27
28impl IntermediateTransport {
29 pub async fn connect(addr: &str) -> Result<Self, InvocationError> {
31 let stream = TcpStream::connect(addr).await?;
32 Ok(Self {
33 stream,
34 init_sent: false,
35 })
36 }
37
38 pub fn from_stream(stream: TcpStream) -> Self {
40 Self {
41 stream,
42 init_sent: false,
43 }
44 }
45
46 pub async fn send(&mut self, data: &[u8]) -> Result<(), InvocationError> {
48 if !self.init_sent {
49 self.stream.write_all(&[0xee, 0xee, 0xee, 0xee]).await?;
50 self.init_sent = true;
51 }
52 let len = (data.len() as u32).to_le_bytes();
53 self.stream.write_all(&len).await?;
54 self.stream.write_all(data).await?;
55 Ok(())
56 }
57
58 pub async fn recv(&mut self) -> Result<Vec<u8>, InvocationError> {
60 let mut len_buf = [0u8; 4];
61 self.stream.read_exact(&mut len_buf).await?;
62 let len = u32::from_le_bytes(len_buf) as usize;
63 let mut buf = vec![0u8; len];
64 self.stream.read_exact(&mut buf).await?;
65 Ok(buf)
66 }
67
68 pub fn into_inner(self) -> TcpStream {
69 self.stream
70 }
71}
72
73pub struct FullTransport {
87 stream: TcpStream,
88 send_seqno: u32,
89 recv_seqno: u32,
90}
91
92impl FullTransport {
93 pub async fn connect(addr: &str) -> Result<Self, InvocationError> {
94 let stream = TcpStream::connect(addr).await?;
95 Ok(Self {
96 stream,
97 send_seqno: 0,
98 recv_seqno: 0,
99 })
100 }
101
102 pub fn from_stream(stream: TcpStream) -> Self {
103 Self {
104 stream,
105 send_seqno: 0,
106 recv_seqno: 0,
107 }
108 }
109
110 pub async fn send(&mut self, data: &[u8]) -> Result<(), InvocationError> {
112 let total_len = (data.len() + 12) as u32; let seq = self.send_seqno;
114 self.send_seqno = self.send_seqno.wrapping_add(1);
115
116 let mut packet = Vec::with_capacity(total_len as usize);
117 packet.extend_from_slice(&total_len.to_le_bytes());
118 packet.extend_from_slice(&seq.to_le_bytes());
119 packet.extend_from_slice(data);
120
121 let crc = crc32_ieee(&packet);
122 packet.extend_from_slice(&crc.to_le_bytes());
123
124 self.stream.write_all(&packet).await?;
125 Ok(())
126 }
127
128 pub async fn recv(&mut self) -> Result<Vec<u8>, InvocationError> {
130 let mut len_buf = [0u8; 4];
131 self.stream.read_exact(&mut len_buf).await?;
132 let total_len = u32::from_le_bytes(len_buf) as usize;
133 if total_len < 12 {
134 return Err(InvocationError::Deserialize(
135 "Full transport: packet too short".into(),
136 ));
137 }
138 let mut rest = vec![0u8; total_len - 4];
139 self.stream.read_exact(&mut rest).await?;
140
141 let (body, crc_bytes) = rest.split_at(rest.len() - 4);
143 let expected_crc = u32::from_le_bytes(crc_bytes.try_into().unwrap());
144 let mut check_input = len_buf.to_vec();
145 check_input.extend_from_slice(body);
146 let actual_crc = crc32_ieee(&check_input);
147 if actual_crc != expected_crc {
148 return Err(InvocationError::Deserialize(format!(
149 "Full transport: CRC mismatch (got {actual_crc:#010x}, expected {expected_crc:#010x})"
150 )));
151 }
152
153 let _recv_seq = u32::from_le_bytes(body[..4].try_into().unwrap());
155 self.recv_seqno = self.recv_seqno.wrapping_add(1);
156
157 Ok(body[4..].to_vec())
158 }
159
160 pub fn into_inner(self) -> TcpStream {
161 self.stream
162 }
163}
164
/// Computes a CRC-32 checksum of `data`, one bit at a time.
///
/// Parameters, as used below: reflected polynomial `0xEDB88320`, initial
/// register value `0xFFFFFFFF`, and the final register is bitwise inverted.
/// An empty input yields `0`.
pub(crate) fn crc32_ieee(data: &[u8]) -> u32 {
    const REFLECTED_POLY: u32 = 0xedb8_8320;

    let register = data.iter().fold(u32::MAX, |acc, &octet| {
        // Fold the byte into the low bits, then shift out eight bits,
        // applying the polynomial whenever a set bit falls off the end.
        (0..8).fold(acc ^ u32::from(octet), |state, _| {
            let feedback = if state & 1 == 1 { REFLECTED_POLY } else { 0 };
            (state >> 1) ^ feedback
        })
    });

    !register
}