oxur_repl/transport/
traits.rs

// Transport abstraction for REPL communication
//
// Provides a trait-based abstraction over different connection types:
// - TCP sockets (network communication)
// - Unix domain sockets (local IPC)
// - Named pipes (Windows IPC)
// - In-process channels (same-process communication)
8
9use crate::protocol::{PostcardCodec, Request, Response};
10use std::io;
11use thiserror::Error;
12use tokio::io::{AsyncRead, AsyncWrite};
13
14/// Transport-level errors
15#[derive(Debug, Error)]
16pub enum TransportError {
17    /// I/O error during transport operation
18    #[error("I/O error: {0}")]
19    Io(#[from] io::Error),
20
21    /// Codec error during message encoding/decoding
22    #[error("Codec error: {0}")]
23    Codec(#[from] crate::protocol::CodecError),
24
25    /// Connection closed by peer
26    #[error("Connection closed")]
27    ConnectionClosed,
28
29    /// Connection failed
30    #[error("Connection failed: {0}")]
31    ConnectionFailed(String),
32
33    /// Timeout waiting for operation
34    #[error("Operation timed out")]
35    Timeout,
36}
37
38pub type Result<T> = std::result::Result<T, TransportError>;
39
40/// Transport abstraction for bidirectional communication
41///
42/// This trait abstracts over different connection types (TCP, Unix sockets,
43/// named pipes, in-process channels) to provide a uniform API for sending
44/// and receiving REPL messages.
45#[async_trait::async_trait]
46pub trait Transport: Send + Sync {
47    /// Send a Request to the remote peer
48    ///
49    /// # Errors
50    ///
51    /// Returns error if serialization fails or connection is broken.
52    async fn send_request(&mut self, request: &Request) -> Result<()>;
53
54    /// Send a Response to the remote peer
55    ///
56    /// # Errors
57    ///
58    /// Returns error if serialization fails or connection is broken.
59    async fn send_response(&mut self, response: &Response) -> Result<()>;
60
61    /// Receive a Request from the remote peer
62    ///
63    /// # Errors
64    ///
65    /// Returns error if deserialization fails or connection is broken.
66    async fn recv_request(&mut self) -> Result<Request>;
67
68    /// Receive a Response from the remote peer
69    ///
70    /// # Errors
71    ///
72    /// Returns error if deserialization fails or connection is broken.
73    async fn recv_response(&mut self) -> Result<Response>;
74
75    /// Close the transport connection gracefully
76    async fn close(&mut self) -> Result<()>;
77}
78
79/// Split transport into separate reader and writer halves
80///
81/// Enables full-duplex communication where reading and writing can happen
82/// concurrently in separate tasks.
83pub trait SplitTransport: Transport {
84    /// Reader half of the transport
85    type Reader: TransportReader;
86
87    /// Writer half of the transport
88    type Writer: TransportWriter;
89
90    /// Split transport into independent reader and writer
91    ///
92    /// After splitting, the original transport cannot be used.
93    fn split(self) -> (Self::Reader, Self::Writer);
94}
95
96/// Reader half of a split transport
97#[async_trait::async_trait]
98pub trait TransportReader: Send {
99    /// Receive a Request from the remote peer
100    async fn recv_request(&mut self) -> Result<Request>;
101
102    /// Receive a Response from the remote peer
103    async fn recv_response(&mut self) -> Result<Response>;
104}
105
106/// Writer half of a split transport
107#[async_trait::async_trait]
108pub trait TransportWriter: Send {
109    /// Send a Request to the remote peer
110    async fn send_request(&mut self, request: &Request) -> Result<()>;
111
112    /// Send a Response to the remote peer
113    async fn send_response(&mut self, response: &Response) -> Result<()>;
114
115    /// Flush any buffered data
116    async fn flush(&mut self) -> Result<()>;
117}
118
119/// Helper functions for working with AsyncRead/AsyncWrite streams
120pub(crate) mod helpers {
121    use super::*;
122    use tokio::io::{AsyncReadExt, AsyncWriteExt};
123
124    /// Read a framed message from an async reader
125    ///
126    /// Reads length prefix (4 bytes) then payload.
127    pub async fn read_framed<R: AsyncRead + Unpin>(reader: &mut R) -> Result<Vec<u8>> {
128        // Read 4-byte length prefix
129        let mut len_bytes = [0u8; 4];
130        reader.read_exact(&mut len_bytes).await.map_err(|e| match e.kind() {
131            io::ErrorKind::UnexpectedEof => TransportError::ConnectionClosed,
132            _ => TransportError::Io(e),
133        })?;
134
135        let len = u32::from_le_bytes(len_bytes);
136
137        // Validate length
138        const MAX_MESSAGE_SIZE: u32 = 10 * 1024 * 1024; // 10 MB
139        if len > MAX_MESSAGE_SIZE {
140            return Err(TransportError::Codec(crate::protocol::CodecError::MessageTooLarge(len)));
141        }
142
143        // Read payload
144        let mut payload = vec![0u8; len as usize];
145        reader.read_exact(&mut payload).await.map_err(|e| match e.kind() {
146            io::ErrorKind::UnexpectedEof => TransportError::ConnectionClosed,
147            _ => TransportError::Io(e),
148        })?;
149
150        Ok(payload)
151    }
152
153    /// Write a framed message to an async writer
154    ///
155    /// Writes length prefix (4 bytes) then payload, then flushes.
156    pub async fn write_framed<W: AsyncWrite + Unpin>(writer: &mut W, payload: &[u8]) -> Result<()> {
157        let len = payload.len() as u32;
158
159        // Write length prefix
160        writer.write_all(&len.to_le_bytes()).await?;
161
162        // Write payload
163        writer.write_all(payload).await?;
164
165        // Flush to ensure data is sent
166        writer.flush().await?;
167
168        Ok(())
169    }
170
171    /// Send a Request over an async writer
172    pub async fn send_request<W: AsyncWrite + Unpin>(
173        writer: &mut W,
174        request: &Request,
175    ) -> Result<()> {
176        let payload = PostcardCodec::encode_request(request)?;
177        write_framed(writer, &payload).await
178    }
179
180    /// Send a Response over an async writer
181    pub async fn send_response<W: AsyncWrite + Unpin>(
182        writer: &mut W,
183        response: &Response,
184    ) -> Result<()> {
185        let payload = PostcardCodec::encode_response(response)?;
186        write_framed(writer, &payload).await
187    }
188
189    /// Receive a Request from an async reader
190    pub async fn recv_request<R: AsyncRead + Unpin>(reader: &mut R) -> Result<Request> {
191        let payload = read_framed(reader).await?;
192        Ok(PostcardCodec::decode_request(&payload)?)
193    }
194
195    /// Receive a Response from an async reader
196    pub async fn recv_response<R: AsyncRead + Unpin>(reader: &mut R) -> Result<Response> {
197        let payload = read_framed(reader).await?;
198        Ok(PostcardCodec::decode_response(&payload)?)
199    }
200}
201
#[cfg(test)]
mod tests {
    use super::*;
    use crate::protocol::{MessageId, Operation, ReplMode, SessionId};

    /// A request written on one end of a duplex pipe arrives intact on the other.
    #[tokio::test]
    async fn test_send_recv_request() {
        let (mut client, mut server) = tokio::io::duplex(1024);

        let request = Request {
            id: MessageId::new(1),
            session_id: SessionId::new("test-session"),
            operation: Operation::Eval { code: "(+ 1 2)".to_string(), mode: ReplMode::Lisp },
        };

        // Send in one task, receive in another
        let send_handle = tokio::spawn(async move {
            helpers::send_request(&mut client, &request).await.unwrap();
        });

        let recv_handle = tokio::spawn(async move {
            let received = helpers::recv_request(&mut server).await.unwrap();
            assert_eq!(received.id, MessageId::new(1));
            assert_eq!(received.session_id, SessionId::new("test-session"));
        });

        send_handle.await.unwrap();
        recv_handle.await.unwrap();
    }

    /// A response written on one end of a duplex pipe arrives intact on the other.
    #[tokio::test]
    async fn test_send_recv_response() {
        let (mut client, mut server) = tokio::io::duplex(1024);

        let response = Response {
            request_id: MessageId::new(42),
            session_id: SessionId::new("test-session"),
            result: crate::protocol::OperationResult::Success {
                status: crate::protocol::Status { tier: 1, cached: false, duration_ms: 5 },
                value: Some("3".to_string()),
                stdout: None,
                stderr: None,
            },
        };

        let send_handle = tokio::spawn(async move {
            helpers::send_response(&mut server, &response).await.unwrap();
        });

        let recv_handle = tokio::spawn(async move {
            let received = helpers::recv_response(&mut client).await.unwrap();
            assert_eq!(received.request_id, MessageId::new(42));
        });

        send_handle.await.unwrap();
        recv_handle.await.unwrap();
    }

    /// Reading from a pipe whose other end is gone reports `ConnectionClosed`.
    #[tokio::test]
    async fn test_connection_closed() {
        let (mut client, server) = tokio::io::duplex(64);

        // Close server side
        drop(server);

        // Try to read from client should fail with ConnectionClosed
        let result = helpers::recv_request(&mut client).await;
        assert!(matches!(result, Err(TransportError::ConnectionClosed)));
    }

    /// Encoding an 11 MB request is rejected.
    ///
    /// This exercises the codec only; the framing-level limit is covered by
    /// `test_read_framed_rejects_oversized_length_prefix`.
    #[tokio::test]
    async fn test_message_too_large() {
        // Create a request with huge payload
        let huge_code = "x".repeat(11 * 1024 * 1024); // 11 MB
        let request = Request {
            id: MessageId::new(1),
            session_id: SessionId::new("test"),
            operation: Operation::Eval { code: huge_code, mode: ReplMode::Lisp },
        };

        // Encoding should fail
        let result = PostcardCodec::encode_request(&request);
        assert!(result.is_err());
    }

    /// Raw frames round-trip byte-for-byte, including an empty payload.
    #[tokio::test]
    async fn test_framed_roundtrip() {
        let (mut client, mut server) = tokio::io::duplex(64);

        // Both frames together (4 + 5 + 4 + 0 bytes) fit in the pipe buffer,
        // so the writes complete without a concurrent reader.
        helpers::write_framed(&mut client, b"hello").await.unwrap();
        helpers::write_framed(&mut client, b"").await.unwrap();

        assert_eq!(helpers::read_framed(&mut server).await.unwrap(), b"hello");
        assert!(helpers::read_framed(&mut server).await.unwrap().is_empty());
    }

    /// A length prefix above the 10 * 1024 * 1024 byte limit is rejected
    /// before any payload is read.
    #[tokio::test]
    async fn test_read_framed_rejects_oversized_length_prefix() {
        use tokio::io::AsyncWriteExt;

        let (mut client, mut server) = tokio::io::duplex(64);

        // Announce one byte more than the limit; deliberately send no payload.
        let announced: u32 = 10 * 1024 * 1024 + 1;
        client.write_all(&announced.to_le_bytes()).await.unwrap();

        let result = helpers::read_framed(&mut server).await;
        assert!(matches!(
            result,
            Err(TransportError::Codec(crate::protocol::CodecError::MessageTooLarge(len)))
                if len == announced
        ));
    }

    /// A stream that ends in the middle of a payload reports `ConnectionClosed`.
    #[tokio::test]
    async fn test_truncated_frame_reports_connection_closed() {
        use tokio::io::AsyncWriteExt;

        let (mut client, mut server) = tokio::io::duplex(64);

        // Promise 8 payload bytes, deliver only 3, then hang up.
        client.write_all(&8u32.to_le_bytes()).await.unwrap();
        client.write_all(b"abc").await.unwrap();
        drop(client);

        let result = helpers::read_framed(&mut server).await;
        assert!(matches!(result, Err(TransportError::ConnectionClosed)));
    }
}