Skip to main content

ember_client/
pipeline.rs

1//! Command pipeline builder.
2//!
3//! A [`Pipeline`] queues multiple commands and sends them in a single network
4//! round-trip. This dramatically reduces latency when you need to issue many
5//! independent commands in sequence.
6//!
7//! # Example
8//!
9//! ```no_run
10//! use ember_client::{Client, Pipeline};
11//!
12//! #[tokio::main]
13//! async fn main() -> Result<(), ember_client::ClientError> {
14//!     let mut client = Client::connect("127.0.0.1", 6379).await?;
15//!
16//!     let results = client.execute_pipeline(
17//!         Pipeline::new()
18//!             .set("greeting", "hello")
19//!             .get("greeting")
20//!             .incr("visits"),
21//!     ).await?;
22//!
23//!     println!("{} responses", results.len());
24//!     Ok(())
25//! }
26//! ```
27
28use bytes::Bytes;
29use ember_protocol::types::Frame;
30
31/// A batch of commands to be sent in a single network round-trip.
32///
33/// Build the pipeline with the typed builder methods, then execute it
34/// with [`Client::execute_pipeline`].
35pub struct Pipeline {
36    pub(crate) cmds: Vec<Frame>,
37}
38
39impl Pipeline {
40    /// Creates an empty pipeline.
41    pub fn new() -> Self {
42        Self { cmds: Vec::new() }
43    }
44
45    /// Returns the number of queued commands.
46    pub fn len(&self) -> usize {
47        self.cmds.len()
48    }
49
50    /// Returns `true` if no commands have been queued.
51    pub fn is_empty(&self) -> bool {
52        self.cmds.is_empty()
53    }
54
55    /// Queues a raw command from string slices.
56    ///
57    /// Useful for commands not covered by the typed builders.
58    pub fn send(self, args: &[&str]) -> Self {
59        let parts = args
60            .iter()
61            .map(|s| Frame::Bulk(Bytes::copy_from_slice(s.as_bytes())))
62            .collect();
63        self.push(Frame::Array(parts))
64    }
65
66    // --- string commands ---
67
68    /// Queues a `GET key` command.
69    pub fn get(self, key: &str) -> Self {
70        self.push(array2(b"GET", key.as_bytes()))
71    }
72
73    /// Queues a `SET key value` command.
74    pub fn set(self, key: &str, value: impl AsRef<[u8]>) -> Self {
75        self.push(array3(b"SET", key.as_bytes(), value.as_ref()))
76    }
77
78    /// Queues a `DEL key [key ...]` command.
79    pub fn del(self, keys: &[&str]) -> Self {
80        self.push(array_with_keys(b"DEL", keys))
81    }
82
83    /// Queues an `EXPIRE key seconds` command.
84    pub fn expire(self, key: &str, seconds: u64) -> Self {
85        let secs = seconds.to_string();
86        self.push(array3(b"EXPIRE", key.as_bytes(), secs.as_bytes()))
87    }
88
89    /// Queues an `INCR key` command.
90    pub fn incr(self, key: &str) -> Self {
91        self.push(array2(b"INCR", key.as_bytes()))
92    }
93
94    /// Queues a `DECR key` command.
95    pub fn decr(self, key: &str) -> Self {
96        self.push(array2(b"DECR", key.as_bytes()))
97    }
98
99    /// Queues an `INCRBY key delta` command.
100    pub fn incrby(self, key: &str, delta: i64) -> Self {
101        let d = delta.to_string();
102        self.push(array3(b"INCRBY", key.as_bytes(), d.as_bytes()))
103    }
104
105    /// Queues a `PING` command.
106    pub fn ping(self) -> Self {
107        self.push(Frame::Array(vec![Frame::Bulk(Bytes::from_static(b"PING"))]))
108    }
109
110    /// Queues an `EXISTS key [key ...]` command.
111    pub fn exists(self, keys: &[&str]) -> Self {
112        self.push(array_with_keys(b"EXISTS", keys))
113    }
114
115    /// Queues a `TTL key` command.
116    pub fn ttl(self, key: &str) -> Self {
117        self.push(array2(b"TTL", key.as_bytes()))
118    }
119
120    // --- list commands ---
121
122    /// Queues an `LPUSH key value [value ...]` command.
123    pub fn lpush<V: AsRef<[u8]>>(self, key: &str, values: &[V]) -> Self {
124        self.push(array_with_cmd_key_values(b"LPUSH", key, values))
125    }
126
127    /// Queues an `RPUSH key value [value ...]` command.
128    pub fn rpush<V: AsRef<[u8]>>(self, key: &str, values: &[V]) -> Self {
129        self.push(array_with_cmd_key_values(b"RPUSH", key, values))
130    }
131
132    /// Queues an `LPOP key` command.
133    pub fn lpop(self, key: &str) -> Self {
134        self.push(array2(b"LPOP", key.as_bytes()))
135    }
136
137    /// Queues an `RPOP key` command.
138    pub fn rpop(self, key: &str) -> Self {
139        self.push(array2(b"RPOP", key.as_bytes()))
140    }
141
142    /// Queues an `LLEN key` command.
143    pub fn llen(self, key: &str) -> Self {
144        self.push(array2(b"LLEN", key.as_bytes()))
145    }
146
147    // --- hash commands ---
148
149    /// Queues an `HGET key field` command.
150    pub fn hget(self, key: &str, field: &str) -> Self {
151        self.push(array3(b"HGET", key.as_bytes(), field.as_bytes()))
152    }
153
154    /// Queues an `HSET key field value [field value ...]` command.
155    pub fn hset<V: AsRef<[u8]>>(self, key: &str, pairs: &[(&str, V)]) -> Self {
156        let mut parts = Vec::with_capacity(2 + pairs.len() * 2);
157        parts.push(Frame::Bulk(Bytes::from_static(b"HSET")));
158        parts.push(Frame::Bulk(Bytes::copy_from_slice(key.as_bytes())));
159        for (field, val) in pairs {
160            parts.push(Frame::Bulk(Bytes::copy_from_slice(field.as_bytes())));
161            parts.push(Frame::Bulk(Bytes::copy_from_slice(val.as_ref())));
162        }
163        self.push(Frame::Array(parts))
164    }
165
166    /// Queues an `HDEL key field [field ...]` command.
167    pub fn hdel(self, key: &str, fields: &[&str]) -> Self {
168        self.push(array_with_key_and_keys(b"HDEL", key, fields))
169    }
170
171    /// Queues an `HINCRBY key field delta` command.
172    pub fn hincrby(self, key: &str, field: &str, delta: i64) -> Self {
173        let d = delta.to_string();
174        self.push(Frame::Array(vec![
175            Frame::Bulk(Bytes::from_static(b"HINCRBY")),
176            Frame::Bulk(Bytes::copy_from_slice(key.as_bytes())),
177            Frame::Bulk(Bytes::copy_from_slice(field.as_bytes())),
178            Frame::Bulk(Bytes::copy_from_slice(d.as_bytes())),
179        ]))
180    }
181
182    // --- set commands ---
183
184    /// Queues an `SADD key member [member ...]` command.
185    pub fn sadd(self, key: &str, members: &[&str]) -> Self {
186        self.push(array_with_key_and_keys(b"SADD", key, members))
187    }
188
189    /// Queues an `SREM key member [member ...]` command.
190    pub fn srem(self, key: &str, members: &[&str]) -> Self {
191        self.push(array_with_key_and_keys(b"SREM", key, members))
192    }
193
194    /// Queues an `SCARD key` command.
195    pub fn scard(self, key: &str) -> Self {
196        self.push(array2(b"SCARD", key.as_bytes()))
197    }
198
199    // --- sorted set commands ---
200
201    /// Queues a `ZADD key score member [score member ...]` command.
202    pub fn zadd(self, key: &str, members: &[(f64, &str)]) -> Self {
203        let mut parts = Vec::with_capacity(2 + members.len() * 2);
204        parts.push(Frame::Bulk(Bytes::from_static(b"ZADD")));
205        parts.push(Frame::Bulk(Bytes::copy_from_slice(key.as_bytes())));
206        for (score, member) in members {
207            let s = score.to_string();
208            parts.push(Frame::Bulk(Bytes::copy_from_slice(s.as_bytes())));
209            parts.push(Frame::Bulk(Bytes::copy_from_slice(member.as_bytes())));
210        }
211        self.push(Frame::Array(parts))
212    }
213
214    /// Queues a `ZCARD key` command.
215    pub fn zcard(self, key: &str) -> Self {
216        self.push(array2(b"ZCARD", key.as_bytes()))
217    }
218
219    /// Queues a `ZSCORE key member` command.
220    pub fn zscore(self, key: &str, member: &str) -> Self {
221        self.push(array3(b"ZSCORE", key.as_bytes(), member.as_bytes()))
222    }
223
224    // --- more string commands ---
225
226    /// Queues a `STRLEN key` command.
227    pub fn strlen(self, key: &str) -> Self {
228        self.push(array2(b"STRLEN", key.as_bytes()))
229    }
230
231    /// Queues an `INCRBYFLOAT key delta` command.
232    pub fn incr_by_float(self, key: &str, delta: f64) -> Self {
233        let d = delta.to_string();
234        self.push(array3(b"INCRBYFLOAT", key.as_bytes(), d.as_bytes()))
235    }
236
237    // --- key commands ---
238
239    /// Queues a `TYPE key` command.
240    pub fn key_type(self, key: &str) -> Self {
241        self.push(array2(b"TYPE", key.as_bytes()))
242    }
243
244    /// Queues a `KEYS pattern` command.
245    pub fn keys(self, pattern: &str) -> Self {
246        self.push(array2(b"KEYS", pattern.as_bytes()))
247    }
248
249    /// Queues a `RENAME key newkey` command.
250    pub fn rename(self, key: &str, newkey: &str) -> Self {
251        self.push(array3(b"RENAME", key.as_bytes(), newkey.as_bytes()))
252    }
253
254    /// Queues a `PEXPIRE key millis` command.
255    pub fn pexpire(self, key: &str, millis: u64) -> Self {
256        let ms = millis.to_string();
257        self.push(array3(b"PEXPIRE", key.as_bytes(), ms.as_bytes()))
258    }
259
260    /// Queues an `UNLINK key [key ...]` command.
261    pub fn unlink(self, keys: &[&str]) -> Self {
262        self.push(array_with_keys(b"UNLINK", keys))
263    }
264
265    // --- more hash commands ---
266
267    /// Queues an `HMGET key field [field ...]` command.
268    pub fn hmget(self, key: &str, fields: &[&str]) -> Self {
269        self.push(array_with_key_and_keys(b"HMGET", key, fields))
270    }
271
272    // --- server commands ---
273
274    /// Queues an `ECHO message` command.
275    pub fn echo(self, message: &str) -> Self {
276        self.push(array2(b"ECHO", message.as_bytes()))
277    }
278
279    /// Queues a `PUBLISH channel message` command.
280    pub fn publish(self, channel: &str, message: impl AsRef<[u8]>) -> Self {
281        self.push(array3(b"PUBLISH", channel.as_bytes(), message.as_ref()))
282    }
283
284    // --- internal ---
285
286    fn push(mut self, frame: Frame) -> Self {
287        self.cmds.push(frame);
288        self
289    }
290}
291
292impl Default for Pipeline {
293    fn default() -> Self {
294        Self::new()
295    }
296}
297
298// --- frame construction helpers ---
299// These exist so each builder method stays a one-liner.
300
301fn array2(cmd: &'static [u8], a: &[u8]) -> Frame {
302    Frame::Array(vec![
303        Frame::Bulk(Bytes::from_static(cmd)),
304        Frame::Bulk(Bytes::copy_from_slice(a)),
305    ])
306}
307
308fn array3(cmd: &'static [u8], a: &[u8], b: &[u8]) -> Frame {
309    Frame::Array(vec![
310        Frame::Bulk(Bytes::from_static(cmd)),
311        Frame::Bulk(Bytes::copy_from_slice(a)),
312        Frame::Bulk(Bytes::copy_from_slice(b)),
313    ])
314}
315
316/// `CMD key1 key2 ...`
317fn array_with_keys(cmd: &'static [u8], keys: &[&str]) -> Frame {
318    let mut parts = Vec::with_capacity(1 + keys.len());
319    parts.push(Frame::Bulk(Bytes::from_static(cmd)));
320    for k in keys {
321        parts.push(Frame::Bulk(Bytes::copy_from_slice(k.as_bytes())));
322    }
323    Frame::Array(parts)
324}
325
326/// `CMD key field1 field2 ...`
327fn array_with_key_and_keys(cmd: &'static [u8], key: &str, rest: &[&str]) -> Frame {
328    let mut parts = Vec::with_capacity(2 + rest.len());
329    parts.push(Frame::Bulk(Bytes::from_static(cmd)));
330    parts.push(Frame::Bulk(Bytes::copy_from_slice(key.as_bytes())));
331    for s in rest {
332        parts.push(Frame::Bulk(Bytes::copy_from_slice(s.as_bytes())));
333    }
334    Frame::Array(parts)
335}
336
337/// `CMD key value1 value2 ...`
338fn array_with_cmd_key_values<V: AsRef<[u8]>>(cmd: &'static [u8], key: &str, values: &[V]) -> Frame {
339    let mut parts = Vec::with_capacity(2 + values.len());
340    parts.push(Frame::Bulk(Bytes::from_static(cmd)));
341    parts.push(Frame::Bulk(Bytes::copy_from_slice(key.as_bytes())));
342    for v in values {
343        parts.push(Frame::Bulk(Bytes::copy_from_slice(v.as_ref())));
344    }
345    Frame::Array(parts)
346}
347
348#[cfg(test)]
349mod tests {
350    use super::*;
351
352    fn bulk(b: &[u8]) -> Frame {
353        Frame::Bulk(Bytes::copy_from_slice(b))
354    }
355
356    #[test]
357    fn get_produces_correct_frame() {
358        let pipe = Pipeline::new().get("mykey");
359        assert_eq!(pipe.len(), 1);
360        assert_eq!(
361            pipe.cmds[0],
362            Frame::Array(vec![bulk(b"GET"), bulk(b"mykey")])
363        );
364    }
365
366    #[test]
367    fn del_multiple_keys() {
368        let pipe = Pipeline::new().del(&["a", "b"]);
369        assert_eq!(pipe.len(), 1);
370        match &pipe.cmds[0] {
371            Frame::Array(parts) => {
372                assert_eq!(parts.len(), 3); // DEL + 2 keys
373                assert_eq!(parts[0], bulk(b"DEL"));
374                assert_eq!(parts[1], bulk(b"a"));
375                assert_eq!(parts[2], bulk(b"b"));
376            }
377            other => panic!("expected Array, got {other:?}"),
378        }
379    }
380
381    #[test]
382    fn chaining_accumulates_commands() {
383        let pipe = Pipeline::new()
384            .ping()
385            .get("k1")
386            .set("k2", "v2")
387            .incr("counter");
388        assert_eq!(pipe.len(), 4);
389    }
390
391    #[test]
392    fn empty_pipeline() {
393        let pipe = Pipeline::new();
394        assert!(pipe.is_empty());
395        assert_eq!(pipe.len(), 0);
396    }
397
398    #[test]
399    fn hset_pairs_layout() {
400        let pipe = Pipeline::new().hset("myhash", &[("field1", "val1"), ("field2", "val2")]);
401        match &pipe.cmds[0] {
402            Frame::Array(parts) => {
403                assert_eq!(parts.len(), 6); // HSET + key + 2*(field+val)
404                assert_eq!(parts[0], bulk(b"HSET"));
405                assert_eq!(parts[1], bulk(b"myhash"));
406                assert_eq!(parts[2], bulk(b"field1"));
407                assert_eq!(parts[3], bulk(b"val1"));
408            }
409            other => panic!("expected Array, got {other:?}"),
410        }
411    }
412
413    #[test]
414    fn zadd_score_member_layout() {
415        let pipe = Pipeline::new().zadd("leaderboard", &[(1.5, "alice"), (2.0, "bob")]);
416        match &pipe.cmds[0] {
417            Frame::Array(parts) => {
418                assert_eq!(parts.len(), 6); // ZADD + key + 2*(score+member)
419                assert_eq!(parts[0], bulk(b"ZADD"));
420                assert_eq!(parts[2], bulk(b"1.5"));
421                assert_eq!(parts[3], bulk(b"alice"));
422            }
423            other => panic!("expected Array, got {other:?}"),
424        }
425    }
426}