Skip to main content

kevy_client_async/
codec.rs

1//! Async RESP3 codec — state machine mirroring the blocking
2//! [`kevy_resp_client::RespClient`] but with async IO.
3//!
4//! Reuses `kevy_resp::{encode_command, parse_reply}` for the pure parse
5//! / encode logic so the wire format never has two implementations.
6//! What's different here is the IO loop: instead of blocking
7//! `Read::read` we drive the codec on top of any
8//! [`AsyncTransport`][crate::AsyncTransport].
9//!
10//! The state machine is identical to blocking:
11//!
12//! 1. `encode_command(args)` → `write_all` over the transport.
13//! 2. Loop: `parse_reply(&buf)`; on `None` await one `read` chunk and
14//!    extend `buf`; on `Some((reply, used))` drain `used` and return.
15//!
16//! No buffered allocations beyond what blocking does (one growable
17//! `buf` for partial replies + one boxed chunk for reads). Pipelining
18//! reuses this same codec — `run_pipeline` writes N commands in one
19//! batch and reads N replies in sequence (T4.16).
20
21use std::io;
22
23use kevy_resp::{Reply, encode_command, parse_reply};
24
25use crate::transport::{AsyncTransport, read, write_all};
26
27/// Buffered RESP3 codec over an [`AsyncTransport`].
28pub struct AsyncRespCodec<T> {
29    transport: T,
30    buf: Vec<u8>,
31    chunk: Box<[u8]>,
32}
33
34impl<T: AsyncTransport> AsyncRespCodec<T> {
35    /// Wrap a transport. Matches the blocking client's 8 KiB initial
36    /// buffer capacity + 8 KiB read chunk — same memory footprint per
37    /// connection as `RespClient`.
38    pub fn new(transport: T) -> Self {
39        Self {
40            transport,
41            buf: Vec::with_capacity(8192),
42            chunk: vec![0u8; 8192].into_boxed_slice(),
43        }
44    }
45
46    /// Get the underlying transport back (e.g. to swap it or close it
47    /// explicitly).
48    pub fn into_inner(self) -> T {
49        self.transport
50    }
51
52    /// Send one command (`args` = multibulk argv) and await exactly one
53    /// reply. Direct async mirror of `RespClient::request`.
54    pub async fn request(&mut self, args: &[Vec<u8>]) -> io::Result<Reply> {
55        self.send(args).await?;
56        self.read_reply().await
57    }
58
59    /// Encode + write a single command without waiting for a reply.
60    /// Used by [`crate::AsyncSubscriber`]: SUBSCRIBE / PSUBSCRIBE etc.
61    /// don't return replies in the conventional sense — the server
62    /// pushes ack frames that are drained later by `read_reply`.
63    pub async fn send(&mut self, args: &[Vec<u8>]) -> io::Result<()> {
64        let mut out = Vec::new();
65        encode_command(&mut out, args);
66        write_all(&mut self.transport, &out).await?;
67        Ok(())
68    }
69
70    /// Drain one parsed reply from the read buffer, reading more bytes
71    /// from the transport as needed. Pipelining (T4.16) calls this N
72    /// times after a single batched write.
73    pub async fn read_reply(&mut self) -> io::Result<Reply> {
74        // Destructure so the loop can borrow `transport` and `chunk`
75        // disjointly from `buf`.
76        let Self { transport, buf, chunk } = self;
77        loop {
78            match parse_reply(buf) {
79                Ok(Some((reply, used))) => {
80                    buf.drain(..used);
81                    return Ok(reply);
82                }
83                Ok(None) => {}
84                Err(_) => {
85                    return Err(io::Error::new(
86                        io::ErrorKind::InvalidData,
87                        "malformed reply",
88                    ));
89                }
90            }
91            let n = read(transport, &mut chunk[..]).await?;
92            if n == 0 {
93                return Err(io::Error::new(
94                    io::ErrorKind::UnexpectedEof,
95                    "server closed connection",
96                ));
97            }
98            buf.extend_from_slice(&chunk[..n]);
99        }
100    }
101
102    /// Send N commands as one write batch (pipelining), then read N
103    /// replies in declaration order. Single network round-trip if the
104    /// transport supports it.
105    pub async fn pipeline(&mut self, batch: &[Vec<Vec<u8>>]) -> io::Result<Vec<Reply>> {
106        let mut out = Vec::new();
107        for args in batch {
108            encode_command(&mut out, args);
109        }
110        write_all(&mut self.transport, &out).await?;
111
112        let mut replies = Vec::with_capacity(batch.len());
113        for _ in batch {
114            replies.push(self.read_reply().await?);
115        }
116        Ok(replies)
117    }
118}
119
120// ─── Tests ────────────────────────────────────────────────────────────
121//
122// Mock transport + minimal executor (no runtime dep) to verify
123// round-trip without pulling tokio etc. into the test surface.
124
125#[cfg(test)]
126mod tests {
127    use super::*;
128    use crate::transport::{AsyncRead, AsyncWrite};
129    use core::future::Future;
130    use core::pin::Pin;
131    use core::task::{Context, Poll, Waker};
132
133    /// In-memory transport: pre-loaded `incoming` bytes feed `poll_read`;
134    /// `poll_write` appends to `outgoing`.
135    struct MockTransport {
136        incoming: Vec<u8>,
137        in_cursor: usize,
138        outgoing: Vec<u8>,
139    }
140
141    impl MockTransport {
142        fn new(server_reply: Vec<u8>) -> Self {
143            Self {
144                incoming: server_reply,
145                in_cursor: 0,
146                outgoing: Vec::new(),
147            }
148        }
149    }
150
151    impl AsyncRead for MockTransport {
152        fn poll_read(
153            mut self: Pin<&mut Self>,
154            _cx: &mut Context<'_>,
155            buf: &mut [u8],
156        ) -> Poll<io::Result<usize>> {
157            let remaining = self.incoming.len() - self.in_cursor;
158            let n = remaining.min(buf.len());
159            buf[..n].copy_from_slice(&self.incoming[self.in_cursor..self.in_cursor + n]);
160            self.in_cursor += n;
161            Poll::Ready(Ok(n))
162        }
163    }
164
165    impl AsyncWrite for MockTransport {
166        fn poll_write(
167            mut self: Pin<&mut Self>,
168            _cx: &mut Context<'_>,
169            buf: &[u8],
170        ) -> Poll<io::Result<usize>> {
171            self.outgoing.extend_from_slice(buf);
172            Poll::Ready(Ok(buf.len()))
173        }
174        fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
175            Poll::Ready(Ok(()))
176        }
177        fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
178            Poll::Ready(Ok(()))
179        }
180    }
181
182    /// Minimal blocking executor — polls once; works for futures that
183    /// never park (our mock returns `Poll::Ready` synchronously). Uses
184    /// `Box::pin` so the test stays safe-only under `forbid(unsafe_code)`.
185    fn block_on<F: Future>(fut: F) -> F::Output {
186        let waker = Waker::noop();
187        let mut cx = Context::from_waker(waker);
188        let mut pinned = Box::pin(fut);
189        match pinned.as_mut().poll(&mut cx) {
190            Poll::Ready(v) => v,
191            Poll::Pending => panic!("mock transport must never park"),
192        }
193    }
194
195    #[test]
196    fn request_sends_command_and_parses_reply() {
197        // Server response: +OK\r\n (simple string)
198        let mock = MockTransport::new(b"+OK\r\n".to_vec());
199        let mut codec = AsyncRespCodec::new(mock);
200        let reply = block_on(codec.request(&[b"PING".to_vec()])).unwrap();
201        match reply {
202            Reply::Simple(s) => assert_eq!(s, b"OK"),
203            other => panic!("expected Simple, got {other:?}"),
204        }
205        // Outgoing wire is RESP multibulk [PING].
206        let mock = codec.into_inner();
207        assert_eq!(mock.outgoing, b"*1\r\n$4\r\nPING\r\n");
208    }
209
210    #[test]
211    fn pipeline_batches_three_commands() {
212        // Three replies concatenated: +A\r\n +B\r\n +C\r\n
213        let mock = MockTransport::new(b"+A\r\n+B\r\n+C\r\n".to_vec());
214        let mut codec = AsyncRespCodec::new(mock);
215        let batch = vec![
216            vec![b"PING".to_vec()],
217            vec![b"PING".to_vec()],
218            vec![b"PING".to_vec()],
219        ];
220        let replies = block_on(codec.pipeline(&batch)).unwrap();
221        assert_eq!(replies.len(), 3);
222        let mock = codec.into_inner();
223        // Single write contained all three encoded commands.
224        let expected: Vec<u8> = b"*1\r\n$4\r\nPING\r\n*1\r\n$4\r\nPING\r\n*1\r\n$4\r\nPING\r\n"
225            .to_vec();
226        assert_eq!(mock.outgoing, expected);
227    }
228}