kevy_client_async/
codec.rs1use std::io;
22
23use kevy_resp::{Reply, encode_command, parse_reply};
24
25use crate::transport::{AsyncTransport, read, write_all};
26
27pub struct AsyncRespCodec<T> {
29 transport: T,
30 buf: Vec<u8>,
31 chunk: Box<[u8]>,
32}
33
34impl<T: AsyncTransport> AsyncRespCodec<T> {
35 pub fn new(transport: T) -> Self {
39 Self {
40 transport,
41 buf: Vec::with_capacity(8192),
42 chunk: vec![0u8; 8192].into_boxed_slice(),
43 }
44 }
45
46 pub fn into_inner(self) -> T {
49 self.transport
50 }
51
52 pub async fn request(&mut self, args: &[Vec<u8>]) -> io::Result<Reply> {
55 self.send(args).await?;
56 self.read_reply().await
57 }
58
59 pub async fn send(&mut self, args: &[Vec<u8>]) -> io::Result<()> {
64 let mut out = Vec::new();
65 encode_command(&mut out, args);
66 write_all(&mut self.transport, &out).await?;
67 Ok(())
68 }
69
70 pub async fn read_reply(&mut self) -> io::Result<Reply> {
74 let Self { transport, buf, chunk } = self;
77 loop {
78 match parse_reply(buf) {
79 Ok(Some((reply, used))) => {
80 buf.drain(..used);
81 return Ok(reply);
82 }
83 Ok(None) => {}
84 Err(_) => {
85 return Err(io::Error::new(
86 io::ErrorKind::InvalidData,
87 "malformed reply",
88 ));
89 }
90 }
91 let n = read(transport, &mut chunk[..]).await?;
92 if n == 0 {
93 return Err(io::Error::new(
94 io::ErrorKind::UnexpectedEof,
95 "server closed connection",
96 ));
97 }
98 buf.extend_from_slice(&chunk[..n]);
99 }
100 }
101
102 pub async fn pipeline(&mut self, batch: &[Vec<Vec<u8>>]) -> io::Result<Vec<Reply>> {
106 let mut out = Vec::new();
107 for args in batch {
108 encode_command(&mut out, args);
109 }
110 write_all(&mut self.transport, &out).await?;
111
112 let mut replies = Vec::with_capacity(batch.len());
113 for _ in batch {
114 replies.push(self.read_reply().await?);
115 }
116 Ok(replies)
117 }
118}
119
120#[cfg(test)]
126mod tests {
127 use super::*;
128 use crate::transport::{AsyncRead, AsyncWrite};
129 use core::future::Future;
130 use core::pin::Pin;
131 use core::task::{Context, Poll, Waker};
132
133 struct MockTransport {
136 incoming: Vec<u8>,
137 in_cursor: usize,
138 outgoing: Vec<u8>,
139 }
140
141 impl MockTransport {
142 fn new(server_reply: Vec<u8>) -> Self {
143 Self {
144 incoming: server_reply,
145 in_cursor: 0,
146 outgoing: Vec::new(),
147 }
148 }
149 }
150
151 impl AsyncRead for MockTransport {
152 fn poll_read(
153 mut self: Pin<&mut Self>,
154 _cx: &mut Context<'_>,
155 buf: &mut [u8],
156 ) -> Poll<io::Result<usize>> {
157 let remaining = self.incoming.len() - self.in_cursor;
158 let n = remaining.min(buf.len());
159 buf[..n].copy_from_slice(&self.incoming[self.in_cursor..self.in_cursor + n]);
160 self.in_cursor += n;
161 Poll::Ready(Ok(n))
162 }
163 }
164
165 impl AsyncWrite for MockTransport {
166 fn poll_write(
167 mut self: Pin<&mut Self>,
168 _cx: &mut Context<'_>,
169 buf: &[u8],
170 ) -> Poll<io::Result<usize>> {
171 self.outgoing.extend_from_slice(buf);
172 Poll::Ready(Ok(buf.len()))
173 }
174 fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
175 Poll::Ready(Ok(()))
176 }
177 fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
178 Poll::Ready(Ok(()))
179 }
180 }
181
182 fn block_on<F: Future>(fut: F) -> F::Output {
186 let waker = Waker::noop();
187 let mut cx = Context::from_waker(waker);
188 let mut pinned = Box::pin(fut);
189 match pinned.as_mut().poll(&mut cx) {
190 Poll::Ready(v) => v,
191 Poll::Pending => panic!("mock transport must never park"),
192 }
193 }
194
195 #[test]
196 fn request_sends_command_and_parses_reply() {
197 let mock = MockTransport::new(b"+OK\r\n".to_vec());
199 let mut codec = AsyncRespCodec::new(mock);
200 let reply = block_on(codec.request(&[b"PING".to_vec()])).unwrap();
201 match reply {
202 Reply::Simple(s) => assert_eq!(s, b"OK"),
203 other => panic!("expected Simple, got {other:?}"),
204 }
205 let mock = codec.into_inner();
207 assert_eq!(mock.outgoing, b"*1\r\n$4\r\nPING\r\n");
208 }
209
210 #[test]
211 fn pipeline_batches_three_commands() {
212 let mock = MockTransport::new(b"+A\r\n+B\r\n+C\r\n".to_vec());
214 let mut codec = AsyncRespCodec::new(mock);
215 let batch = vec![
216 vec![b"PING".to_vec()],
217 vec![b"PING".to_vec()],
218 vec![b"PING".to_vec()],
219 ];
220 let replies = block_on(codec.pipeline(&batch)).unwrap();
221 assert_eq!(replies.len(), 3);
222 let mock = codec.into_inner();
223 let expected: Vec<u8> = b"*1\r\n$4\r\nPING\r\n*1\r\n$4\r\nPING\r\n*1\r\n$4\r\nPING\r\n"
225 .to_vec();
226 assert_eq!(mock.outgoing, expected);
227 }
228}