1use bytes::Bytes;
29use ember_protocol::types::Frame;
30
31pub struct Pipeline {
36 pub(crate) cmds: Vec<Frame>,
37}
38
39impl Pipeline {
40 pub fn new() -> Self {
42 Self { cmds: Vec::new() }
43 }
44
45 pub fn len(&self) -> usize {
47 self.cmds.len()
48 }
49
50 pub fn is_empty(&self) -> bool {
52 self.cmds.is_empty()
53 }
54
55 pub fn send(self, args: &[&str]) -> Self {
59 let parts = args
60 .iter()
61 .map(|s| Frame::Bulk(Bytes::copy_from_slice(s.as_bytes())))
62 .collect();
63 self.push(Frame::Array(parts))
64 }
65
66 pub fn get(self, key: &str) -> Self {
70 self.push(array2(b"GET", key.as_bytes()))
71 }
72
73 pub fn set(self, key: &str, value: impl AsRef<[u8]>) -> Self {
75 self.push(array3(b"SET", key.as_bytes(), value.as_ref()))
76 }
77
78 pub fn del(self, keys: &[&str]) -> Self {
80 self.push(array_with_keys(b"DEL", keys))
81 }
82
83 pub fn expire(self, key: &str, seconds: u64) -> Self {
85 let secs = seconds.to_string();
86 self.push(array3(b"EXPIRE", key.as_bytes(), secs.as_bytes()))
87 }
88
89 pub fn incr(self, key: &str) -> Self {
91 self.push(array2(b"INCR", key.as_bytes()))
92 }
93
94 pub fn decr(self, key: &str) -> Self {
96 self.push(array2(b"DECR", key.as_bytes()))
97 }
98
99 pub fn incrby(self, key: &str, delta: i64) -> Self {
101 let d = delta.to_string();
102 self.push(array3(b"INCRBY", key.as_bytes(), d.as_bytes()))
103 }
104
105 pub fn ping(self) -> Self {
107 self.push(Frame::Array(vec![Frame::Bulk(Bytes::from_static(b"PING"))]))
108 }
109
110 pub fn exists(self, keys: &[&str]) -> Self {
112 self.push(array_with_keys(b"EXISTS", keys))
113 }
114
115 pub fn ttl(self, key: &str) -> Self {
117 self.push(array2(b"TTL", key.as_bytes()))
118 }
119
120 pub fn lpush<V: AsRef<[u8]>>(self, key: &str, values: &[V]) -> Self {
124 self.push(array_with_cmd_key_values(b"LPUSH", key, values))
125 }
126
127 pub fn rpush<V: AsRef<[u8]>>(self, key: &str, values: &[V]) -> Self {
129 self.push(array_with_cmd_key_values(b"RPUSH", key, values))
130 }
131
132 pub fn lpop(self, key: &str) -> Self {
134 self.push(array2(b"LPOP", key.as_bytes()))
135 }
136
137 pub fn rpop(self, key: &str) -> Self {
139 self.push(array2(b"RPOP", key.as_bytes()))
140 }
141
142 pub fn llen(self, key: &str) -> Self {
144 self.push(array2(b"LLEN", key.as_bytes()))
145 }
146
147 pub fn hget(self, key: &str, field: &str) -> Self {
151 self.push(array3(b"HGET", key.as_bytes(), field.as_bytes()))
152 }
153
154 pub fn hset<V: AsRef<[u8]>>(self, key: &str, pairs: &[(&str, V)]) -> Self {
156 let mut parts = Vec::with_capacity(2 + pairs.len() * 2);
157 parts.push(Frame::Bulk(Bytes::from_static(b"HSET")));
158 parts.push(Frame::Bulk(Bytes::copy_from_slice(key.as_bytes())));
159 for (field, val) in pairs {
160 parts.push(Frame::Bulk(Bytes::copy_from_slice(field.as_bytes())));
161 parts.push(Frame::Bulk(Bytes::copy_from_slice(val.as_ref())));
162 }
163 self.push(Frame::Array(parts))
164 }
165
166 pub fn hdel(self, key: &str, fields: &[&str]) -> Self {
168 self.push(array_with_key_and_keys(b"HDEL", key, fields))
169 }
170
171 pub fn hincrby(self, key: &str, field: &str, delta: i64) -> Self {
173 let d = delta.to_string();
174 self.push(Frame::Array(vec![
175 Frame::Bulk(Bytes::from_static(b"HINCRBY")),
176 Frame::Bulk(Bytes::copy_from_slice(key.as_bytes())),
177 Frame::Bulk(Bytes::copy_from_slice(field.as_bytes())),
178 Frame::Bulk(Bytes::copy_from_slice(d.as_bytes())),
179 ]))
180 }
181
182 pub fn sadd(self, key: &str, members: &[&str]) -> Self {
186 self.push(array_with_key_and_keys(b"SADD", key, members))
187 }
188
189 pub fn srem(self, key: &str, members: &[&str]) -> Self {
191 self.push(array_with_key_and_keys(b"SREM", key, members))
192 }
193
194 pub fn scard(self, key: &str) -> Self {
196 self.push(array2(b"SCARD", key.as_bytes()))
197 }
198
199 pub fn zadd(self, key: &str, members: &[(f64, &str)]) -> Self {
203 let mut parts = Vec::with_capacity(2 + members.len() * 2);
204 parts.push(Frame::Bulk(Bytes::from_static(b"ZADD")));
205 parts.push(Frame::Bulk(Bytes::copy_from_slice(key.as_bytes())));
206 for (score, member) in members {
207 let s = score.to_string();
208 parts.push(Frame::Bulk(Bytes::copy_from_slice(s.as_bytes())));
209 parts.push(Frame::Bulk(Bytes::copy_from_slice(member.as_bytes())));
210 }
211 self.push(Frame::Array(parts))
212 }
213
214 pub fn zcard(self, key: &str) -> Self {
216 self.push(array2(b"ZCARD", key.as_bytes()))
217 }
218
219 pub fn zscore(self, key: &str, member: &str) -> Self {
221 self.push(array3(b"ZSCORE", key.as_bytes(), member.as_bytes()))
222 }
223
224 pub fn strlen(self, key: &str) -> Self {
228 self.push(array2(b"STRLEN", key.as_bytes()))
229 }
230
231 pub fn incr_by_float(self, key: &str, delta: f64) -> Self {
233 let d = delta.to_string();
234 self.push(array3(b"INCRBYFLOAT", key.as_bytes(), d.as_bytes()))
235 }
236
237 pub fn key_type(self, key: &str) -> Self {
241 self.push(array2(b"TYPE", key.as_bytes()))
242 }
243
244 pub fn keys(self, pattern: &str) -> Self {
246 self.push(array2(b"KEYS", pattern.as_bytes()))
247 }
248
249 pub fn rename(self, key: &str, newkey: &str) -> Self {
251 self.push(array3(b"RENAME", key.as_bytes(), newkey.as_bytes()))
252 }
253
254 pub fn pexpire(self, key: &str, millis: u64) -> Self {
256 let ms = millis.to_string();
257 self.push(array3(b"PEXPIRE", key.as_bytes(), ms.as_bytes()))
258 }
259
260 pub fn unlink(self, keys: &[&str]) -> Self {
262 self.push(array_with_keys(b"UNLINK", keys))
263 }
264
265 pub fn hmget(self, key: &str, fields: &[&str]) -> Self {
269 self.push(array_with_key_and_keys(b"HMGET", key, fields))
270 }
271
272 pub fn echo(self, message: &str) -> Self {
276 self.push(array2(b"ECHO", message.as_bytes()))
277 }
278
279 pub fn publish(self, channel: &str, message: impl AsRef<[u8]>) -> Self {
281 self.push(array3(b"PUBLISH", channel.as_bytes(), message.as_ref()))
282 }
283
284 fn push(mut self, frame: Frame) -> Self {
287 self.cmds.push(frame);
288 self
289 }
290}
291
292impl Default for Pipeline {
293 fn default() -> Self {
294 Self::new()
295 }
296}
297
298fn array2(cmd: &'static [u8], a: &[u8]) -> Frame {
302 Frame::Array(vec![
303 Frame::Bulk(Bytes::from_static(cmd)),
304 Frame::Bulk(Bytes::copy_from_slice(a)),
305 ])
306}
307
308fn array3(cmd: &'static [u8], a: &[u8], b: &[u8]) -> Frame {
309 Frame::Array(vec![
310 Frame::Bulk(Bytes::from_static(cmd)),
311 Frame::Bulk(Bytes::copy_from_slice(a)),
312 Frame::Bulk(Bytes::copy_from_slice(b)),
313 ])
314}
315
316fn array_with_keys(cmd: &'static [u8], keys: &[&str]) -> Frame {
318 let mut parts = Vec::with_capacity(1 + keys.len());
319 parts.push(Frame::Bulk(Bytes::from_static(cmd)));
320 for k in keys {
321 parts.push(Frame::Bulk(Bytes::copy_from_slice(k.as_bytes())));
322 }
323 Frame::Array(parts)
324}
325
326fn array_with_key_and_keys(cmd: &'static [u8], key: &str, rest: &[&str]) -> Frame {
328 let mut parts = Vec::with_capacity(2 + rest.len());
329 parts.push(Frame::Bulk(Bytes::from_static(cmd)));
330 parts.push(Frame::Bulk(Bytes::copy_from_slice(key.as_bytes())));
331 for s in rest {
332 parts.push(Frame::Bulk(Bytes::copy_from_slice(s.as_bytes())));
333 }
334 Frame::Array(parts)
335}
336
337fn array_with_cmd_key_values<V: AsRef<[u8]>>(cmd: &'static [u8], key: &str, values: &[V]) -> Frame {
339 let mut parts = Vec::with_capacity(2 + values.len());
340 parts.push(Frame::Bulk(Bytes::from_static(cmd)));
341 parts.push(Frame::Bulk(Bytes::copy_from_slice(key.as_bytes())));
342 for v in values {
343 parts.push(Frame::Bulk(Bytes::copy_from_slice(v.as_ref())));
344 }
345 Frame::Array(parts)
346}
347
#[cfg(test)]
mod tests {
    use super::*;

    /// Shorthand for a bulk-string frame holding a copy of `bytes`.
    fn bulk(bytes: &[u8]) -> Frame {
        Frame::Bulk(Bytes::copy_from_slice(bytes))
    }

    /// Returns the elements of an array frame; panics on any other variant.
    fn array_parts(frame: &Frame) -> &[Frame] {
        match frame {
            Frame::Array(parts) => parts.as_slice(),
            other => panic!("expected Array, got {other:?}"),
        }
    }

    #[test]
    fn get_produces_correct_frame() {
        let pipe = Pipeline::new().get("mykey");
        let expected = Frame::Array(vec![bulk(b"GET"), bulk(b"mykey")]);

        assert_eq!(pipe.len(), 1);
        assert_eq!(pipe.cmds[0], expected);
    }

    #[test]
    fn del_multiple_keys() {
        let pipe = Pipeline::new().del(&["a", "b"]);
        assert_eq!(pipe.len(), 1);

        let parts = array_parts(&pipe.cmds[0]);
        assert_eq!(parts.len(), 3);
        assert_eq!(parts[0], bulk(b"DEL"));
        assert_eq!(parts[1], bulk(b"a"));
        assert_eq!(parts[2], bulk(b"b"));
    }

    #[test]
    fn chaining_accumulates_commands() {
        let pipe = Pipeline::new()
            .ping()
            .get("k1")
            .set("k2", "v2")
            .incr("counter");

        assert_eq!(pipe.len(), 4);
    }

    #[test]
    fn empty_pipeline() {
        let pipe = Pipeline::new();

        assert!(pipe.is_empty());
        assert_eq!(pipe.len(), 0);
    }

    #[test]
    fn hset_pairs_layout() {
        let pipe = Pipeline::new().hset("myhash", &[("field1", "val1"), ("field2", "val2")]);

        let parts = array_parts(&pipe.cmds[0]);
        assert_eq!(parts.len(), 6);
        assert_eq!(parts[0], bulk(b"HSET"));
        assert_eq!(parts[1], bulk(b"myhash"));
        assert_eq!(parts[2], bulk(b"field1"));
        assert_eq!(parts[3], bulk(b"val1"));
    }

    #[test]
    fn zadd_score_member_layout() {
        let pipe = Pipeline::new().zadd("leaderboard", &[(1.5, "alice"), (2.0, "bob")]);

        let parts = array_parts(&pipe.cmds[0]);
        assert_eq!(parts.len(), 6);
        assert_eq!(parts[0], bulk(b"ZADD"));
        assert_eq!(parts[2], bulk(b"1.5"));
        assert_eq!(parts[3], bulk(b"alice"));
    }
}