oxur_repl/transport/
traits.rs1use crate::protocol::{PostcardCodec, Request, Response};
10use std::io;
11use thiserror::Error;
12use tokio::io::{AsyncRead, AsyncWrite};
13
14#[derive(Debug, Error)]
16pub enum TransportError {
17 #[error("I/O error: {0}")]
19 Io(#[from] io::Error),
20
21 #[error("Codec error: {0}")]
23 Codec(#[from] crate::protocol::CodecError),
24
25 #[error("Connection closed")]
27 ConnectionClosed,
28
29 #[error("Connection failed: {0}")]
31 ConnectionFailed(String),
32
33 #[error("Operation timed out")]
35 Timeout,
36}
37
38pub type Result<T> = std::result::Result<T, TransportError>;
39
40#[async_trait::async_trait]
46pub trait Transport: Send + Sync {
47 async fn send_request(&mut self, request: &Request) -> Result<()>;
53
54 async fn send_response(&mut self, response: &Response) -> Result<()>;
60
61 async fn recv_request(&mut self) -> Result<Request>;
67
68 async fn recv_response(&mut self) -> Result<Response>;
74
75 async fn close(&mut self) -> Result<()>;
77}
78
79pub trait SplitTransport: Transport {
84 type Reader: TransportReader;
86
87 type Writer: TransportWriter;
89
90 fn split(self) -> (Self::Reader, Self::Writer);
94}
95
96#[async_trait::async_trait]
98pub trait TransportReader: Send {
99 async fn recv_request(&mut self) -> Result<Request>;
101
102 async fn recv_response(&mut self) -> Result<Response>;
104}
105
106#[async_trait::async_trait]
108pub trait TransportWriter: Send {
109 async fn send_request(&mut self, request: &Request) -> Result<()>;
111
112 async fn send_response(&mut self, response: &Response) -> Result<()>;
114
115 async fn flush(&mut self) -> Result<()>;
117}
118
119pub(crate) mod helpers {
121 use super::*;
122 use tokio::io::{AsyncReadExt, AsyncWriteExt};
123
124 pub async fn read_framed<R: AsyncRead + Unpin>(reader: &mut R) -> Result<Vec<u8>> {
128 let mut len_bytes = [0u8; 4];
130 reader.read_exact(&mut len_bytes).await.map_err(|e| match e.kind() {
131 io::ErrorKind::UnexpectedEof => TransportError::ConnectionClosed,
132 _ => TransportError::Io(e),
133 })?;
134
135 let len = u32::from_le_bytes(len_bytes);
136
137 const MAX_MESSAGE_SIZE: u32 = 10 * 1024 * 1024; if len > MAX_MESSAGE_SIZE {
140 return Err(TransportError::Codec(crate::protocol::CodecError::MessageTooLarge(len)));
141 }
142
143 let mut payload = vec![0u8; len as usize];
145 reader.read_exact(&mut payload).await.map_err(|e| match e.kind() {
146 io::ErrorKind::UnexpectedEof => TransportError::ConnectionClosed,
147 _ => TransportError::Io(e),
148 })?;
149
150 Ok(payload)
151 }
152
153 pub async fn write_framed<W: AsyncWrite + Unpin>(writer: &mut W, payload: &[u8]) -> Result<()> {
157 let len = payload.len() as u32;
158
159 writer.write_all(&len.to_le_bytes()).await?;
161
162 writer.write_all(payload).await?;
164
165 writer.flush().await?;
167
168 Ok(())
169 }
170
171 pub async fn send_request<W: AsyncWrite + Unpin>(
173 writer: &mut W,
174 request: &Request,
175 ) -> Result<()> {
176 let payload = PostcardCodec::encode_request(request)?;
177 write_framed(writer, &payload).await
178 }
179
180 pub async fn send_response<W: AsyncWrite + Unpin>(
182 writer: &mut W,
183 response: &Response,
184 ) -> Result<()> {
185 let payload = PostcardCodec::encode_response(response)?;
186 write_framed(writer, &payload).await
187 }
188
189 pub async fn recv_request<R: AsyncRead + Unpin>(reader: &mut R) -> Result<Request> {
191 let payload = read_framed(reader).await?;
192 Ok(PostcardCodec::decode_request(&payload)?)
193 }
194
195 pub async fn recv_response<R: AsyncRead + Unpin>(reader: &mut R) -> Result<Response> {
197 let payload = read_framed(reader).await?;
198 Ok(PostcardCodec::decode_response(&payload)?)
199 }
200}
201
#[cfg(test)]
mod tests {
    use super::*;
    use crate::protocol::{MessageId, Operation, ReplMode, SessionId};
    use tokio::io::AsyncWriteExt;

    /// A request written to one end of a duplex pipe is decoded intact at the other end.
    #[tokio::test]
    async fn test_send_recv_request() {
        let (mut client, mut server) = tokio::io::duplex(1024);

        let request = Request {
            id: MessageId::new(1),
            session_id: SessionId::new("test-session"),
            operation: Operation::Eval { code: "(+ 1 2)".to_string(), mode: ReplMode::Lisp },
        };

        let send_handle = tokio::spawn(async move {
            helpers::send_request(&mut client, &request).await.unwrap();
        });

        let recv_handle = tokio::spawn(async move {
            let received = helpers::recv_request(&mut server).await.unwrap();
            assert_eq!(received.id, MessageId::new(1));
            assert_eq!(received.session_id, SessionId::new("test-session"));
        });

        send_handle.await.unwrap();
        recv_handle.await.unwrap();
    }

    /// A response written to one end of a duplex pipe is decoded intact at the other end.
    #[tokio::test]
    async fn test_send_recv_response() {
        let (mut client, mut server) = tokio::io::duplex(1024);

        let response = Response {
            request_id: MessageId::new(42),
            session_id: SessionId::new("test-session"),
            result: crate::protocol::OperationResult::Success {
                status: crate::protocol::Status { tier: 1, cached: false, duration_ms: 5 },
                value: Some("3".to_string()),
                stdout: None,
                stderr: None,
            },
        };

        let send_handle = tokio::spawn(async move {
            helpers::send_response(&mut server, &response).await.unwrap();
        });

        let recv_handle = tokio::spawn(async move {
            let received = helpers::recv_response(&mut client).await.unwrap();
            assert_eq!(received.request_id, MessageId::new(42));
        });

        send_handle.await.unwrap();
        recv_handle.await.unwrap();
    }

    /// Reading from a pipe whose peer was dropped before sending anything reports
    /// `ConnectionClosed`.
    #[tokio::test]
    async fn test_connection_closed() {
        let (mut client, server) = tokio::io::duplex(64);

        drop(server);

        let result = helpers::recv_request(&mut client).await;
        assert!(matches!(result, Err(TransportError::ConnectionClosed)));
    }

    /// A peer that disappears partway through a payload is also reported as
    /// `ConnectionClosed`.
    #[tokio::test]
    async fn test_connection_closed_mid_payload() {
        let (mut client, mut server) = tokio::io::duplex(64);

        // Declare an 8-byte payload but deliver only 3 bytes before closing.
        server.write_all(&8u32.to_le_bytes()).await.unwrap();
        server.write_all(&[1, 2, 3]).await.unwrap();
        drop(server);

        let result = helpers::recv_request(&mut client).await;
        assert!(matches!(result, Err(TransportError::ConnectionClosed)));
    }

    /// A length prefix above the 10 MiB frame limit is rejected by the framing layer
    /// without waiting for the payload.
    #[tokio::test]
    async fn test_oversized_frame_rejected() {
        let (mut client, mut server) = tokio::io::duplex(64);

        let declared_len: u32 = 10 * 1024 * 1024 + 1;
        server.write_all(&declared_len.to_le_bytes()).await.unwrap();

        let result = helpers::recv_request(&mut client).await;
        assert!(matches!(result, Err(TransportError::Codec(_))));
    }

    /// Encoding a request that carries an 11 MiB code string is expected to fail.
    /// This exercises the codec, not the framing helpers.
    #[tokio::test]
    async fn test_message_too_large() {
        let huge_code = "x".repeat(11 * 1024 * 1024);
        let request = Request {
            id: MessageId::new(1),
            session_id: SessionId::new("test"),
            operation: Operation::Eval { code: huge_code, mode: ReplMode::Lisp },
        };

        let result = PostcardCodec::encode_request(&request);
        assert!(result.is_err());
    }
}