kevy_client_async/
pipeline.rs1use std::io;
46use std::time::Duration;
47
48use kevy_resp::Reply;
49
50use crate::conn::AsyncConnection;
51use crate::reply::{vec2, vec3};
52
/// Builder that accumulates commands so they can be sent together.
///
/// Every builder method takes the pipeline by value and returns it, so calls
/// are meant to be chained. Nothing is transmitted while building; the queued
/// commands only leave the process when the pipeline is handed to `run`.
///
/// `Debug` is derived so a pipeline can be logged or inspected in test
/// failures. `#[must_use]` flags chains whose result is dropped, which would
/// otherwise silently discard the queued commands.
#[must_use = "a pipeline only queues commands; pass it to `run` to send them"]
#[derive(Debug, Default, Clone)]
pub struct Pipeline {
    /// Queued commands in insertion order. Each command is an argv: a list of
    /// raw byte-string arguments.
    cmds: Vec<Vec<Vec<u8>>>,
}
60
61impl Pipeline {
62 pub fn new() -> Self {
64 Self::default()
65 }
66
67 pub fn len(&self) -> usize {
69 self.cmds.len()
70 }
71
72 pub fn is_empty(&self) -> bool {
74 self.cmds.is_empty()
75 }
76
77 pub fn push_raw(mut self, argv: Vec<Vec<u8>>) -> Self {
80 self.cmds.push(argv);
81 self
82 }
83
84 pub fn get(mut self, key: &[u8]) -> Self {
88 self.cmds.push(vec2(b"GET", key));
89 self
90 }
91
92 pub fn set(mut self, key: &[u8], value: &[u8]) -> Self {
94 self.cmds.push(vec3(b"SET", key, value));
95 self
96 }
97
98 pub fn set_with_ttl(mut self, key: &[u8], value: &[u8], ttl: Duration) -> Self {
100 let ms = ttl.as_millis().min(i64::MAX as u128) as i64;
101 self.cmds.push(vec![
102 b"SET".to_vec(),
103 key.to_vec(),
104 value.to_vec(),
105 b"PX".to_vec(),
106 ms.to_string().into_bytes(),
107 ]);
108 self
109 }
110
111 pub fn del(mut self, keys: &[&[u8]]) -> Self {
113 let mut argv = Vec::with_capacity(keys.len() + 1);
114 argv.push(b"DEL".to_vec());
115 argv.extend(keys.iter().map(|k| k.to_vec()));
116 self.cmds.push(argv);
117 self
118 }
119
120 pub fn exists(mut self, keys: &[&[u8]]) -> Self {
122 let mut argv = Vec::with_capacity(keys.len() + 1);
123 argv.push(b"EXISTS".to_vec());
124 argv.extend(keys.iter().map(|k| k.to_vec()));
125 self.cmds.push(argv);
126 self
127 }
128
129 pub fn incr(mut self, key: &[u8]) -> Self {
131 self.cmds.push(vec2(b"INCR", key));
132 self
133 }
134
135 pub fn incr_by(mut self, key: &[u8], delta: i64) -> Self {
137 self.cmds.push(vec![
138 b"INCRBY".to_vec(),
139 key.to_vec(),
140 delta.to_string().into_bytes(),
141 ]);
142 self
143 }
144
145 pub fn expire(mut self, key: &[u8], ttl: Duration) -> Self {
147 let ms = ttl.as_millis().min(i64::MAX as u128) as i64;
148 self.cmds.push(vec![
149 b"PEXPIRE".to_vec(),
150 key.to_vec(),
151 ms.to_string().into_bytes(),
152 ]);
153 self
154 }
155
156 pub fn publish(mut self, channel: &[u8], message: &[u8]) -> Self {
158 self.cmds.push(vec3(b"PUBLISH", channel, message));
159 self
160 }
161
162 pub fn hget(mut self, key: &[u8], field: &[u8]) -> Self {
164 self.cmds.push(vec3(b"HGET", key, field));
165 self
166 }
167
168 pub fn hset(mut self, key: &[u8], pairs: &[(&[u8], &[u8])]) -> Self {
170 let mut argv = Vec::with_capacity(2 + pairs.len() * 2);
171 argv.push(b"HSET".to_vec());
172 argv.push(key.to_vec());
173 for (f, v) in pairs {
174 argv.push(f.to_vec());
175 argv.push(v.to_vec());
176 }
177 self.cmds.push(argv);
178 self
179 }
180
181 pub fn lpush(mut self, key: &[u8], values: &[&[u8]]) -> Self {
183 let mut argv = Vec::with_capacity(values.len() + 2);
184 argv.push(b"LPUSH".to_vec());
185 argv.push(key.to_vec());
186 argv.extend(values.iter().map(|v| v.to_vec()));
187 self.cmds.push(argv);
188 self
189 }
190
191 pub fn rpush(mut self, key: &[u8], values: &[&[u8]]) -> Self {
193 let mut argv = Vec::with_capacity(values.len() + 2);
194 argv.push(b"RPUSH".to_vec());
195 argv.push(key.to_vec());
196 argv.extend(values.iter().map(|v| v.to_vec()));
197 self.cmds.push(argv);
198 self
199 }
200
201 pub fn sadd(mut self, key: &[u8], members: &[&[u8]]) -> Self {
203 let mut argv = Vec::with_capacity(members.len() + 2);
204 argv.push(b"SADD".to_vec());
205 argv.push(key.to_vec());
206 argv.extend(members.iter().map(|m| m.to_vec()));
207 self.cmds.push(argv);
208 self
209 }
210
211 pub async fn run(self, conn: &mut AsyncConnection) -> io::Result<Vec<Reply>> {
219 if self.cmds.is_empty() {
220 return Ok(Vec::new());
221 }
222 conn.codec_mut().pipeline(&self.cmds).await
223 }
224
225 pub fn into_cmds(self) -> Vec<Vec<Vec<u8>>> {
229 self.cmds
230 }
231}
232
233impl AsyncConnection {
236 pub fn pipeline(&mut self) -> Pipeline {
240 Pipeline::new()
241 }
242}
243
#[cfg(test)]
mod tests {
    use super::*;

    /// Turns borrowed argument slices into the owned argv shape the pipeline stores.
    fn argv(parts: &[&[u8]]) -> Vec<Vec<u8>> {
        parts.iter().map(|part| part.to_vec()).collect()
    }

    #[test]
    fn builder_accumulates_in_order() {
        let pipeline = Pipeline::new()
            .set(b"k", b"v")
            .get(b"k")
            .incr(b"counter")
            .del(&[&b"a"[..], &b"b"[..]]);
        assert_eq!(pipeline.len(), 4);

        let queued = pipeline.into_cmds();

        // The first three commands are only checked by their command name.
        let expected_names = [&b"SET"[..], &b"GET"[..], &b"INCR"[..]];
        for (command, name) in queued.iter().zip(expected_names.iter()) {
            assert_eq!(command[0].as_slice(), *name);
        }

        // The variadic command is checked in full.
        assert_eq!(queued[3], argv(&[&b"DEL"[..], &b"a"[..], &b"b"[..]]));
    }

    #[test]
    fn empty_pipeline_yields_empty_cmds() {
        let pipeline = Pipeline::new();
        assert!(pipeline.is_empty());

        let nothing: Vec<Vec<Vec<u8>>> = Vec::new();
        assert_eq!(pipeline.into_cmds(), nothing);
    }

    #[test]
    fn push_raw_escape_hatch() {
        let custom = argv(&[&b"CUSTOM"[..], &b"arg"[..]]);
        let queued = Pipeline::new().push_raw(custom.clone()).into_cmds();
        assert_eq!(queued, vec![custom]);
    }
}