layer_client/
transport_intermediate.rs1use tokio::io::{AsyncReadExt, AsyncWriteExt};
13use tokio::net::TcpStream;
14use crate::InvocationError;
15
16pub struct IntermediateTransport {
25 stream: TcpStream,
26 init_sent: bool,
27}
28
29impl IntermediateTransport {
30 pub async fn connect(addr: &str) -> Result<Self, InvocationError> {
32 let stream = TcpStream::connect(addr).await?;
33 Ok(Self { stream, init_sent: false })
34 }
35
36 pub fn from_stream(stream: TcpStream) -> Self {
38 Self { stream, init_sent: false }
39 }
40
41 pub async fn send(&mut self, data: &[u8]) -> Result<(), InvocationError> {
43 if !self.init_sent {
44 self.stream.write_all(&[0xee, 0xee, 0xee, 0xee]).await?;
45 self.init_sent = true;
46 }
47 let len = (data.len() as u32).to_le_bytes();
48 self.stream.write_all(&len).await?;
49 self.stream.write_all(data).await?;
50 Ok(())
51 }
52
53 pub async fn recv(&mut self) -> Result<Vec<u8>, InvocationError> {
55 let mut len_buf = [0u8; 4];
56 self.stream.read_exact(&mut len_buf).await?;
57 let len = u32::from_le_bytes(len_buf) as usize;
58 let mut buf = vec![0u8; len];
59 self.stream.read_exact(&mut buf).await?;
60 Ok(buf)
61 }
62
63 pub fn into_inner(self) -> TcpStream { self.stream }
64}
65
66pub struct FullTransport {
80 stream: TcpStream,
81 send_seqno: u32,
82 recv_seqno: u32,
83}
84
85impl FullTransport {
86 pub async fn connect(addr: &str) -> Result<Self, InvocationError> {
87 let stream = TcpStream::connect(addr).await?;
88 Ok(Self { stream, send_seqno: 0, recv_seqno: 0 })
89 }
90
91 pub fn from_stream(stream: TcpStream) -> Self {
92 Self { stream, send_seqno: 0, recv_seqno: 0 }
93 }
94
95 pub async fn send(&mut self, data: &[u8]) -> Result<(), InvocationError> {
97 let total_len = (data.len() + 12) as u32; let seq = self.send_seqno;
99 self.send_seqno = self.send_seqno.wrapping_add(1);
100
101 let mut packet = Vec::with_capacity(total_len as usize);
102 packet.extend_from_slice(&total_len.to_le_bytes());
103 packet.extend_from_slice(&seq.to_le_bytes());
104 packet.extend_from_slice(data);
105
106 let crc = crc32_ieee(&packet);
107 packet.extend_from_slice(&crc.to_le_bytes());
108
109 self.stream.write_all(&packet).await?;
110 Ok(())
111 }
112
113 pub async fn recv(&mut self) -> Result<Vec<u8>, InvocationError> {
115 let mut len_buf = [0u8; 4];
116 self.stream.read_exact(&mut len_buf).await?;
117 let total_len = u32::from_le_bytes(len_buf) as usize;
118 if total_len < 12 {
119 return Err(InvocationError::Deserialize("Full transport: packet too short".into()));
120 }
121 let mut rest = vec![0u8; total_len - 4];
122 self.stream.read_exact(&mut rest).await?;
123
124 let (body, crc_bytes) = rest.split_at(rest.len() - 4);
126 let expected_crc = u32::from_le_bytes(crc_bytes.try_into().unwrap());
127 let mut check_input = len_buf.to_vec();
128 check_input.extend_from_slice(body);
129 let actual_crc = crc32_ieee(&check_input);
130 if actual_crc != expected_crc {
131 return Err(InvocationError::Deserialize(format!(
132 "Full transport: CRC mismatch (got {actual_crc:#010x}, expected {expected_crc:#010x})"
133 )));
134 }
135
136 let _recv_seq = u32::from_le_bytes(body[..4].try_into().unwrap());
138 self.recv_seqno = self.recv_seqno.wrapping_add(1);
139
140 Ok(body[4..].to_vec())
141 }
142
143 pub fn into_inner(self) -> TcpStream { self.stream }
144}
145
146fn crc32_ieee(data: &[u8]) -> u32 {
150 const POLY: u32 = 0xedb88320;
151 let mut crc: u32 = 0xffffffff;
152 for &byte in data {
153 let mut b = byte as u32;
154 for _ in 0..8 {
155 let mix = (crc ^ b) & 1;
156 crc >>= 1;
157 if mix != 0 { crc ^= POLY; }
158 b >>= 1;
159 }
160 }
161 crc ^ 0xffffffff
162}