Skip to main content

kevy_client_async/
pipeline.rs

1//! Pipeline-first sugar — RFC Q4 part b. Where async actually pays
2//! off: one TCP round-trip per batch instead of per command.
3//!
//! ```ignore
//! let replies = conn.pipeline()
//!     .set(b"k1", b"v1")
//!     .get(b"k2")
//!     .incr(b"counter")
//!     .run(&mut conn).await?;
//! ```
11//!
12//! The builder owns no connection — it just accumulates argv vectors.
13//! `run(&mut conn)` writes all commands as one buffered `write_all`
14//! and then drains N replies in declaration order via the same codec
15//! the rest of the crate uses.
16//!
17//! # Partial-failure semantics (T4.17)
18//!
19//! `run()` returns `Result<Vec<Reply>, io::Error>`. The outer `Err` is
20//! reserved for connection-level failures (transport error, malformed
21//! frame). **Per-command** errors surface inside the `Vec<Reply>` as
22//! `Reply::Error(_)` entries — the rest of the batch is unaffected.
23//! Callers iterate and decide which to ignore vs propagate:
24//!
25//! ```ignore
26//! for (i, r) in replies.iter().enumerate() {
27//!     if let Reply::Error(msg) = r {
28//!         eprintln!("cmd {i}: {}", String::from_utf8_lossy(msg));
29//!     }
30//! }
31//! ```
32//!
33//! # Degrade path (T4.18)
34//!
35//! [`Pipeline::into_cmds`] hands back the raw argv vectors so callers
36//! can feed them into a blocking client one at a time. Same builder,
37//! same accumulated state — only the executor changes.
38//!
39//! ```ignore
40//! let cmds = conn.pipeline().get(b"a").set(b"b", b"v").into_cmds();
41//! // On blocking kevy_client::Connection:
42//! // for cmd in &cmds { blocking_resp.request(cmd)?; }
43//! ```
44
45use std::io;
46use std::time::Duration;
47
48use kevy_resp::Reply;
49
50use crate::conn::AsyncConnection;
51use crate::reply::{vec2, vec3};
52
/// Accumulating command builder.
///
/// Created via [`AsyncConnection::pipeline`] or [`Pipeline::new`]. The
/// builder owns no connection — one is only supplied when
/// [`Pipeline::run`] is called.
///
/// Every queueing method consumes the builder and returns the updated
/// one, so discarding a returned value silently drops the queued
/// commands. The type is therefore marked `#[must_use]`.
#[derive(Debug, Default, Clone)]
#[must_use = "a Pipeline does nothing until `run` or `into_cmds` is called"]
pub struct Pipeline {
    /// Queued commands in declaration order. Each entry is one argv:
    /// the command name followed by its arguments, all as raw bytes.
    cmds: Vec<Vec<Vec<u8>>>,
}
60
61impl Pipeline {
62    /// Empty pipeline.
63    pub fn new() -> Self {
64        Self::default()
65    }
66
67    /// Number of queued commands.
68    pub fn len(&self) -> usize {
69        self.cmds.len()
70    }
71
72    /// True if no commands are queued yet.
73    pub fn is_empty(&self) -> bool {
74        self.cmds.is_empty()
75    }
76
77    /// Append a pre-built RESP argv. Escape hatch for commands the
78    /// typed builder methods don't cover.
79    pub fn push_raw(mut self, argv: Vec<Vec<u8>>) -> Self {
80        self.cmds.push(argv);
81        self
82    }
83
84    // ── String + generic key commands ─────────────────────────────
85
86    /// Queue `GET key`.
87    pub fn get(mut self, key: &[u8]) -> Self {
88        self.cmds.push(vec2(b"GET", key));
89        self
90    }
91
92    /// Queue `SET key value`.
93    pub fn set(mut self, key: &[u8], value: &[u8]) -> Self {
94        self.cmds.push(vec3(b"SET", key, value));
95        self
96    }
97
98    /// Queue `SET key value PX ttl_ms`.
99    pub fn set_with_ttl(mut self, key: &[u8], value: &[u8], ttl: Duration) -> Self {
100        let ms = ttl.as_millis().min(i64::MAX as u128) as i64;
101        self.cmds.push(vec![
102            b"SET".to_vec(),
103            key.to_vec(),
104            value.to_vec(),
105            b"PX".to_vec(),
106            ms.to_string().into_bytes(),
107        ]);
108        self
109    }
110
111    /// Queue `DEL key [key ...]`.
112    pub fn del(mut self, keys: &[&[u8]]) -> Self {
113        let mut argv = Vec::with_capacity(keys.len() + 1);
114        argv.push(b"DEL".to_vec());
115        argv.extend(keys.iter().map(|k| k.to_vec()));
116        self.cmds.push(argv);
117        self
118    }
119
120    /// Queue `EXISTS key [key ...]`.
121    pub fn exists(mut self, keys: &[&[u8]]) -> Self {
122        let mut argv = Vec::with_capacity(keys.len() + 1);
123        argv.push(b"EXISTS".to_vec());
124        argv.extend(keys.iter().map(|k| k.to_vec()));
125        self.cmds.push(argv);
126        self
127    }
128
129    /// Queue `INCR key`.
130    pub fn incr(mut self, key: &[u8]) -> Self {
131        self.cmds.push(vec2(b"INCR", key));
132        self
133    }
134
135    /// Queue `INCRBY key delta`.
136    pub fn incr_by(mut self, key: &[u8], delta: i64) -> Self {
137        self.cmds.push(vec![
138            b"INCRBY".to_vec(),
139            key.to_vec(),
140            delta.to_string().into_bytes(),
141        ]);
142        self
143    }
144
145    /// Queue `PEXPIRE key ttl_ms`.
146    pub fn expire(mut self, key: &[u8], ttl: Duration) -> Self {
147        let ms = ttl.as_millis().min(i64::MAX as u128) as i64;
148        self.cmds.push(vec![
149            b"PEXPIRE".to_vec(),
150            key.to_vec(),
151            ms.to_string().into_bytes(),
152        ]);
153        self
154    }
155
156    /// Queue `PUBLISH channel message`.
157    pub fn publish(mut self, channel: &[u8], message: &[u8]) -> Self {
158        self.cmds.push(vec3(b"PUBLISH", channel, message));
159        self
160    }
161
162    /// Queue `HGET key field`.
163    pub fn hget(mut self, key: &[u8], field: &[u8]) -> Self {
164        self.cmds.push(vec3(b"HGET", key, field));
165        self
166    }
167
168    /// Queue `HSET key field value [field value ...]`.
169    pub fn hset(mut self, key: &[u8], pairs: &[(&[u8], &[u8])]) -> Self {
170        let mut argv = Vec::with_capacity(2 + pairs.len() * 2);
171        argv.push(b"HSET".to_vec());
172        argv.push(key.to_vec());
173        for (f, v) in pairs {
174            argv.push(f.to_vec());
175            argv.push(v.to_vec());
176        }
177        self.cmds.push(argv);
178        self
179    }
180
181    /// Queue `LPUSH key value [value ...]`.
182    pub fn lpush(mut self, key: &[u8], values: &[&[u8]]) -> Self {
183        let mut argv = Vec::with_capacity(values.len() + 2);
184        argv.push(b"LPUSH".to_vec());
185        argv.push(key.to_vec());
186        argv.extend(values.iter().map(|v| v.to_vec()));
187        self.cmds.push(argv);
188        self
189    }
190
191    /// Queue `RPUSH key value [value ...]`.
192    pub fn rpush(mut self, key: &[u8], values: &[&[u8]]) -> Self {
193        let mut argv = Vec::with_capacity(values.len() + 2);
194        argv.push(b"RPUSH".to_vec());
195        argv.push(key.to_vec());
196        argv.extend(values.iter().map(|v| v.to_vec()));
197        self.cmds.push(argv);
198        self
199    }
200
201    /// Queue `SADD key member [member ...]`.
202    pub fn sadd(mut self, key: &[u8], members: &[&[u8]]) -> Self {
203        let mut argv = Vec::with_capacity(members.len() + 2);
204        argv.push(b"SADD".to_vec());
205        argv.push(key.to_vec());
206        argv.extend(members.iter().map(|m| m.to_vec()));
207        self.cmds.push(argv);
208        self
209    }
210
211    // ── Execution + escape hatch ──────────────────────────────────
212
213    /// Send the batched commands as one write and drain one reply per
214    /// command in declaration order. Single network round-trip.
215    ///
216    /// Per-command errors land as `Reply::Error(_)` entries in the
217    /// returned vec; outer `Err` means a connection-level failure.
218    pub async fn run(self, conn: &mut AsyncConnection) -> io::Result<Vec<Reply>> {
219        if self.cmds.is_empty() {
220            return Ok(Vec::new());
221        }
222        conn.codec_mut().pipeline(&self.cmds).await
223    }
224
225    /// Hand back the raw argv vectors so the same builder can drive a
226    /// blocking client (or any other RESP transport). See module doc
227    /// for the degrade-path pattern.
228    pub fn into_cmds(self) -> Vec<Vec<Vec<u8>>> {
229        self.cmds
230    }
231}
232
233// ── AsyncConnection entry point ───────────────────────────────────
234
235impl AsyncConnection {
236    /// Open a [`Pipeline`] builder bound to this connection. Chain
237    /// command-queue methods on the returned [`Pipeline`] then call
238    /// `.run(&mut conn).await`.
239    pub fn pipeline(&mut self) -> Pipeline {
240        Pipeline::new()
241    }
242}
243
#[cfg(test)]
mod tests {
    use super::*;

    /// Build an owned argv from borrowed byte-string parts.
    fn argv(parts: &[&[u8]]) -> Vec<Vec<u8>> {
        parts.iter().map(|part| part.to_vec()).collect()
    }

    /// Commands come back from `into_cmds` in the order they were queued.
    #[test]
    fn builder_accumulates_in_order() {
        let pipeline = Pipeline::new()
            .set(b"k", b"v")
            .get(b"k")
            .incr(b"counter")
            .del(&[&b"a"[..], &b"b"[..]]);
        assert_eq!(pipeline.len(), 4);

        let queued = pipeline.into_cmds();
        let expected_names: [&[u8]; 3] = [b"SET", b"GET", b"INCR"];
        for (cmd, name) in queued.iter().zip(expected_names) {
            assert_eq!(cmd[0], name);
        }
        assert_eq!(queued[3], argv(&[&b"DEL"[..], &b"a"[..], &b"b"[..]]));
    }

    /// A brand-new pipeline reports empty and yields no commands.
    #[test]
    fn empty_pipeline_yields_empty_cmds() {
        let pipeline = Pipeline::new();
        assert!(pipeline.is_empty());
        assert!(pipeline.into_cmds().is_empty());
    }

    /// `push_raw` queues the supplied argv untouched.
    #[test]
    fn push_raw_escape_hatch() {
        let custom = argv(&[&b"CUSTOM"[..], &b"arg"[..]]);
        let queued = Pipeline::new().push_raw(custom.clone()).into_cmds();
        assert_eq!(queued, vec![custom]);
    }
}