mco_redis_rs/cmd.rs
1#[cfg(feature = "aio")]
2use futures_util::{
3 task::{Context, Poll},
4 FutureExt, Stream,
5};
6#[cfg(feature = "aio")]
7use std::pin::Pin;
8use std::{fmt, io};
9
10use crate::connection::ConnectionLike;
11use crate::pipeline::Pipeline;
12use crate::types::{from_redis_value, FromRedisValue, RedisResult, RedisWrite, ToRedisArgs};
13
14/// An argument to a redis command
15#[derive(Clone)]
16pub enum Arg<D> {
17 /// A normal argument
18 Simple(D),
19 /// A cursor argument created from `cursor_arg()`
20 Cursor,
21}
22
23/// Represents redis commands.
24#[derive(Clone)]
25pub struct Cmd {
26 data: Vec<u8>,
27 // Arg::Simple contains the offset that marks the end of the argument
28 args: Vec<Arg<usize>>,
29 cursor: Option<u64>,
30}
31
32/// Represents a redis iterator.
33pub struct Iter<'a, T: FromRedisValue> {
34 batch: std::vec::IntoIter<T>,
35 cursor: u64,
36 con: &'a mut (dyn ConnectionLike + 'a),
37 cmd: Cmd,
38}
39
40impl<'a, T: FromRedisValue> Iterator for Iter<'a, T> {
41 type Item = T;
42
43 #[inline]
44 fn next(&mut self) -> Option<T> {
45 // we need to do this in a loop until we produce at least one item
46 // or we find the actual end of the iteration. This is necessary
47 // because with filtering an iterator it is possible that a whole
48 // chunk is not matching the pattern and thus yielding empty results.
49 loop {
50 if let Some(v) = self.batch.next() {
51 return Some(v);
52 };
53 if self.cursor == 0 {
54 return None;
55 }
56
57 let pcmd = unwrap_or!(
58 self.cmd.get_packed_command_with_cursor(self.cursor),
59 return None
60 );
61 let rv = unwrap_or!(self.con.req_packed_command(&pcmd).ok(), return None);
62 let (cur, batch): (u64, Vec<T>) = unwrap_or!(from_redis_value(&rv).ok(), return None);
63
64 self.cursor = cur;
65 self.batch = batch.into_iter();
66 }
67 }
68}
69
70#[cfg(feature = "aio")]
71use crate::aio::ConnectionLike as AsyncConnection;
72
73/// Represents a redis iterator that can be used with async connections.
74#[cfg(feature = "aio")]
75pub struct AsyncIter<'a, T: FromRedisValue + 'a> {
76 batch: std::vec::IntoIter<T>,
77 con: &'a mut (dyn AsyncConnection + Send + 'a),
78 cmd: Cmd,
79}
80
81#[cfg(feature = "aio")]
82impl<'a, T: FromRedisValue + 'a> AsyncIter<'a, T> {
83 /// ```rust,no_run
84 /// # use redis::AsyncCommands;
85 /// # async fn scan_set() -> redis::RedisResult<()> {
86 /// # let client = redis::Client::open("redis://127.0.0.1/")?;
87 /// # let mut con = client.get_async_connection().await?;
88 /// con.sadd("my_set", 42i32).await?;
89 /// con.sadd("my_set", 43i32).await?;
90 /// let mut iter: redis::AsyncIter<i32> = con.sscan("my_set").await?;
91 /// while let Some(element) = iter.next_item().await {
92 /// assert!(element == 42 || element == 43);
93 /// }
94 /// # Ok(())
95 /// # }
96 /// ```
97 #[inline]
98 pub async fn next_item(&mut self) -> Option<T> {
99 // we need to do this in a loop until we produce at least one item
100 // or we find the actual end of the iteration. This is necessary
101 // because with filtering an iterator it is possible that a whole
102 // chunk is not matching the pattern and thus yielding empty results.
103 loop {
104 if let Some(v) = self.batch.next() {
105 return Some(v);
106 };
107 if let Some(cursor) = self.cmd.cursor {
108 if cursor == 0 {
109 return None;
110 }
111 } else {
112 return None;
113 }
114
115 let rv = unwrap_or!(
116 self.con.req_packed_command(&self.cmd).await.ok(),
117 return None
118 );
119 let (cur, batch): (u64, Vec<T>) = unwrap_or!(from_redis_value(&rv).ok(), return None);
120
121 self.cmd.cursor = Some(cur);
122 self.batch = batch.into_iter();
123 }
124 }
125}
126
127#[cfg(feature = "aio")]
128impl<'a, T: FromRedisValue + Unpin + 'a> Stream for AsyncIter<'a, T> {
129 type Item = T;
130
131 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
132 let this = self.get_mut();
133 let mut future = Box::pin(this.next_item());
134 future.poll_unpin(cx)
135 }
136}
137
138fn countdigits(mut v: usize) -> usize {
139 let mut result = 1;
140 loop {
141 if v < 10 {
142 return result;
143 }
144 if v < 100 {
145 return result + 1;
146 }
147 if v < 1000 {
148 return result + 2;
149 }
150 if v < 10000 {
151 return result + 3;
152 }
153
154 v /= 10000;
155 result += 4;
156 }
157}
158
159#[inline]
160fn bulklen(len: usize) -> usize {
161 1 + countdigits(len) + 2 + len + 2
162}
163
164fn args_len<'a, I>(args: I, cursor: u64) -> usize
165where
166 I: IntoIterator<Item = Arg<&'a [u8]>> + ExactSizeIterator,
167{
168 let mut totlen = 1 + countdigits(args.len()) + 2;
169 for item in args {
170 totlen += bulklen(match item {
171 Arg::Cursor => countdigits(cursor as usize),
172 Arg::Simple(val) => val.len(),
173 });
174 }
175 totlen
176}
177
178pub(crate) fn cmd_len(cmd: &Cmd) -> usize {
179 args_len(cmd.args_iter(), cmd.cursor.unwrap_or(0))
180}
181
182fn encode_command<'a, I>(args: I, cursor: u64) -> Vec<u8>
183where
184 I: IntoIterator<Item = Arg<&'a [u8]>> + Clone + ExactSizeIterator,
185{
186 let mut cmd = Vec::new();
187 write_command_to_vec(&mut cmd, args, cursor);
188 cmd
189}
190
191fn write_command_to_vec<'a, I>(cmd: &mut Vec<u8>, args: I, cursor: u64)
192where
193 I: IntoIterator<Item = Arg<&'a [u8]>> + Clone + ExactSizeIterator,
194{
195 let totlen = args_len(args.clone(), cursor);
196
197 cmd.reserve(totlen);
198
199 write_command(cmd, args, cursor).unwrap()
200}
201
202fn write_command<'a, I>(cmd: &mut (impl ?Sized + io::Write), args: I, cursor: u64) -> io::Result<()>
203where
204 I: IntoIterator<Item = Arg<&'a [u8]>> + Clone + ExactSizeIterator,
205{
206 let mut buf = ::itoa::Buffer::new();
207
208 cmd.write_all(b"*")?;
209 let s = buf.format(args.len());
210 cmd.write_all(s.as_bytes())?;
211 cmd.write_all(b"\r\n")?;
212
213 let mut cursor_bytes = itoa::Buffer::new();
214 for item in args {
215 let bytes = match item {
216 Arg::Cursor => cursor_bytes.format(cursor).as_bytes(),
217 Arg::Simple(val) => val,
218 };
219
220 cmd.write_all(b"$")?;
221 let s = buf.format(bytes.len());
222 cmd.write_all(s.as_bytes())?;
223 cmd.write_all(b"\r\n")?;
224
225 cmd.write_all(bytes)?;
226 cmd.write_all(b"\r\n")?;
227 }
228 Ok(())
229}
230
231impl RedisWrite for Cmd {
232 fn write_arg(&mut self, arg: &[u8]) {
233 self.data.extend_from_slice(arg);
234 self.args.push(Arg::Simple(self.data.len()));
235 }
236
237 fn write_arg_fmt(&mut self, arg: impl fmt::Display) {
238 use std::io::Write;
239 write!(self.data, "{}", arg).unwrap();
240 self.args.push(Arg::Simple(self.data.len()));
241 }
242}
243
244impl Default for Cmd {
245 fn default() -> Cmd {
246 Cmd::new()
247 }
248}
249
250/// A command acts as a builder interface to creating encoded redis
251/// requests. This allows you to easiy assemble a packed command
252/// by chaining arguments together.
253///
254/// Basic example:
255///
256/// ```rust
257/// redis::Cmd::new().arg("SET").arg("my_key").arg(42);
258/// ```
259///
260/// There is also a helper function called `cmd` which makes it a
261/// tiny bit shorter:
262///
263/// ```rust
264/// redis::cmd("SET").arg("my_key").arg(42);
265/// ```
266///
267/// Because Rust currently does not have an ideal system
268/// for lifetimes of temporaries, sometimes you need to hold on to
269/// the initially generated command:
270///
271/// ```rust,no_run
272/// # let client = redis::Client::open("redis://127.0.0.1/").unwrap();
273/// # let mut con = client.get_connection().unwrap();
274/// let mut cmd = redis::cmd("SMEMBERS");
275/// let mut iter : redis::Iter<i32> = cmd.arg("my_set").clone().iter(&mut con).unwrap();
276/// ```
277impl Cmd {
278 /// Creates a new empty command.
279 pub fn new() -> Cmd {
280 Cmd {
281 data: vec![],
282 args: vec![],
283 cursor: None,
284 }
285 }
286
287 /// Appends an argument to the command. The argument passed must
288 /// be a type that implements `ToRedisArgs`. Most primitive types as
289 /// well as vectors of primitive types implement it.
290 ///
291 /// For instance all of the following are valid:
292 ///
293 /// ```rust,no_run
294 /// # let client = redis::Client::open("redis://127.0.0.1/").unwrap();
295 /// # let mut con = client.get_connection().unwrap();
296 /// redis::cmd("SET").arg(&["my_key", "my_value"]);
297 /// redis::cmd("SET").arg("my_key").arg(42);
298 /// redis::cmd("SET").arg("my_key").arg(b"my_value");
299 /// ```
300 #[inline]
301 pub fn arg<T: ToRedisArgs>(&mut self, arg: T) -> &mut Cmd {
302 arg.write_redis_args(self);
303 self
304 }
305
306 /// Works similar to `arg` but adds a cursor argument. This is always
307 /// an integer and also flips the command implementation to support a
308 /// different mode for the iterators where the iterator will ask for
309 /// another batch of items when the local data is exhausted.
310 ///
311 /// ```rust,no_run
312 /// # let client = redis::Client::open("redis://127.0.0.1/").unwrap();
313 /// # let mut con = client.get_connection().unwrap();
314 /// let mut cmd = redis::cmd("SSCAN");
315 /// let mut iter : redis::Iter<isize> =
316 /// cmd.arg("my_set").cursor_arg(0).clone().iter(&mut con).unwrap();
317 /// for x in iter {
318 /// // do something with the item
319 /// }
320 /// ```
321 #[inline]
322 pub fn cursor_arg(&mut self, cursor: u64) -> &mut Cmd {
323 assert!(!self.in_scan_mode());
324 self.cursor = Some(cursor);
325 self.args.push(Arg::Cursor);
326 self
327 }
328
329 /// Returns the packed command as a byte vector.
330 #[inline]
331 pub fn get_packed_command(&self) -> Vec<u8> {
332 let mut cmd = Vec::new();
333 self.write_packed_command(&mut cmd);
334 cmd
335 }
336
337 pub(crate) fn write_packed_command(&self, cmd: &mut Vec<u8>) {
338 write_command_to_vec(cmd, self.args_iter(), self.cursor.unwrap_or(0))
339 }
340
341 pub(crate) fn write_packed_command_preallocated(&self, cmd: &mut Vec<u8>) {
342 write_command(cmd, self.args_iter(), self.cursor.unwrap_or(0)).unwrap()
343 }
344
345 /// Like `get_packed_command` but replaces the cursor with the
346 /// provided value. If the command is not in scan mode, `None`
347 /// is returned.
348 #[inline]
349 fn get_packed_command_with_cursor(&self, cursor: u64) -> Option<Vec<u8>> {
350 if !self.in_scan_mode() {
351 None
352 } else {
353 Some(encode_command(self.args_iter(), cursor))
354 }
355 }
356
357 /// Returns true if the command is in scan mode.
358 #[inline]
359 pub fn in_scan_mode(&self) -> bool {
360 self.cursor.is_some()
361 }
362
363 /// Sends the command as query to the connection and converts the
364 /// result to the target redis value. This is the general way how
365 /// you can retrieve data.
366 #[inline]
367 pub fn query<T: FromRedisValue>(&self, con: &mut dyn ConnectionLike) -> RedisResult<T> {
368 match con.req_command(self) {
369 Ok(val) => from_redis_value(&val),
370 Err(e) => Err(e),
371 }
372 }
373
374 /// Async version of `query`.
375 #[inline]
376 #[cfg(feature = "aio")]
377 pub async fn query_async<C, T: FromRedisValue>(&self, con: &mut C) -> RedisResult<T>
378 where
379 C: crate::aio::ConnectionLike,
380 {
381 let val = con.req_packed_command(self).await?;
382 from_redis_value(&val)
383 }
384
385 /// Similar to `query()` but returns an iterator over the items of the
386 /// bulk result or iterator. In normal mode this is not in any way more
387 /// efficient than just querying into a `Vec<T>` as it's internally
388 /// implemented as buffering into a vector. This however is useful when
389 /// `cursor_arg` was used in which case the iterator will query for more
390 /// items until the server side cursor is exhausted.
391 ///
392 /// This is useful for commands such as `SSCAN`, `SCAN` and others.
393 ///
394 /// One speciality of this function is that it will check if the response
395 /// looks like a cursor or not and always just looks at the payload.
396 /// This way you can use the function the same for responses in the
397 /// format of `KEYS` (just a list) as well as `SSCAN` (which returns a
398 /// tuple of cursor and list).
399 #[inline]
400 pub fn iter<T: FromRedisValue>(self, con: &mut dyn ConnectionLike) -> RedisResult<Iter<'_, T>> {
401 let rv = con.req_command(&self)?;
402
403 let (cursor, batch) = if rv.looks_like_cursor() {
404 from_redis_value::<(u64, Vec<T>)>(&rv)?
405 } else {
406 (0, from_redis_value(&rv)?)
407 };
408
409 Ok(Iter {
410 batch: batch.into_iter(),
411 cursor,
412 con,
413 cmd: self,
414 })
415 }
416
417 /// Similar to `iter()` but returns an AsyncIter over the items of the
418 /// bulk result or iterator. A [futures::Stream](https://docs.rs/futures/0.3.3/futures/stream/trait.Stream.html)
419 /// can be obtained by calling `stream()` on the AsyncIter. In normal mode this is not in any way more
420 /// efficient than just querying into a `Vec<T>` as it's internally
421 /// implemented as buffering into a vector. This however is useful when
422 /// `cursor_arg` was used in which case the stream will query for more
423 /// items until the server side cursor is exhausted.
424 ///
425 /// This is useful for commands such as `SSCAN`, `SCAN` and others in async contexts.
426 ///
427 /// One speciality of this function is that it will check if the response
428 /// looks like a cursor or not and always just looks at the payload.
429 /// This way you can use the function the same for responses in the
430 /// format of `KEYS` (just a list) as well as `SSCAN` (which returns a
431 /// tuple of cursor and list).
432 #[cfg(feature = "aio")]
433 #[inline]
434 pub async fn iter_async<'a, T: FromRedisValue + 'a>(
435 mut self,
436 con: &'a mut (dyn AsyncConnection + Send),
437 ) -> RedisResult<AsyncIter<'a, T>> {
438 let rv = con.req_packed_command(&self).await?;
439
440 let (cursor, batch) = if rv.looks_like_cursor() {
441 from_redis_value::<(u64, Vec<T>)>(&rv)?
442 } else {
443 (0, from_redis_value(&rv)?)
444 };
445 if cursor == 0 {
446 self.cursor = None;
447 } else {
448 self.cursor = Some(cursor);
449 }
450
451 Ok(AsyncIter {
452 batch: batch.into_iter(),
453 con,
454 cmd: self,
455 })
456 }
457
458 /// This is a shortcut to `query()` that does not return a value and
459 /// will fail the task if the query fails because of an error. This is
460 /// mainly useful in examples and for simple commands like setting
461 /// keys.
462 ///
463 /// This is equivalent to a call of query like this:
464 ///
465 /// ```rust,no_run
466 /// # let client = redis::Client::open("redis://127.0.0.1/").unwrap();
467 /// # let mut con = client.get_connection().unwrap();
468 /// let _ : () = redis::cmd("PING").query(&mut con).unwrap();
469 /// ```
470 #[inline]
471 pub fn execute(&self, con: &mut dyn ConnectionLike) {
472 self.query::<()>(con).unwrap();
473 }
474
475 /// Returns an iterator over the arguments in this command (including the command name itself)
476 pub fn args_iter(&self) -> impl Iterator<Item = Arg<&[u8]>> + Clone + ExactSizeIterator {
477 let mut prev = 0;
478 self.args.iter().map(move |arg| match *arg {
479 Arg::Simple(i) => {
480 let arg = Arg::Simple(&self.data[prev..i]);
481 prev = i;
482 arg
483 }
484
485 Arg::Cursor => Arg::Cursor,
486 })
487 }
488
489 // Get a reference to the argument at `idx`
490 #[cfg(feature = "cluster")]
491 pub(crate) fn arg_idx(&self, idx: usize) -> Option<&[u8]> {
492 if idx >= self.args.len() {
493 return None;
494 }
495
496 let start = if idx == 0 {
497 0
498 } else {
499 match self.args[idx - 1] {
500 Arg::Simple(n) => n,
501 _ => 0,
502 }
503 };
504 let end = match self.args[idx] {
505 Arg::Simple(n) => n,
506 _ => 0,
507 };
508 if start == 0 && end == 0 {
509 return None;
510 }
511 Some(&self.data[start..end])
512 }
513}
514
515/// Shortcut function to creating a command with a single argument.
516///
517/// The first argument of a redis command is always the name of the command
518/// which needs to be a string. This is the recommended way to start a
519/// command pipe.
520///
521/// ```rust
522/// redis::cmd("PING");
523/// ```
524pub fn cmd(name: &str) -> Cmd {
525 let mut rv = Cmd::new();
526 rv.arg(name);
527 rv
528}
529
530/// Packs a bunch of commands into a request. This is generally a quite
531/// useless function as this functionality is nicely wrapped through the
532/// `Cmd` object, but in some cases it can be useful. The return value
533/// of this can then be send to the low level `ConnectionLike` methods.
534///
535/// Example:
536///
537/// ```rust
538/// # use redis::ToRedisArgs;
539/// let mut args = vec![];
540/// args.extend("SET".to_redis_args());
541/// args.extend("my_key".to_redis_args());
542/// args.extend(42.to_redis_args());
543/// let cmd = redis::pack_command(&args);
544/// assert_eq!(cmd, b"*3\r\n$3\r\nSET\r\n$6\r\nmy_key\r\n$2\r\n42\r\n".to_vec());
545/// ```
546pub fn pack_command(args: &[Vec<u8>]) -> Vec<u8> {
547 encode_command(args.iter().map(|x| Arg::Simple(&x[..])), 0)
548}
549
550/// Shortcut for creating a new pipeline.
551pub fn pipe() -> Pipeline {
552 Pipeline::new()
553}
554
555#[cfg(test)]
556#[cfg(feature = "cluster")]
557mod tests {
558 use super::Cmd;
559
560 #[test]
561 fn test_cmd_arg_idx() {
562 let mut c = Cmd::new();
563 assert_eq!(c.arg_idx(0), None);
564
565 c.arg("SET");
566 assert_eq!(c.arg_idx(0), Some(&b"SET"[..]));
567 assert_eq!(c.arg_idx(1), None);
568
569 c.arg("foo").arg("42");
570 assert_eq!(c.arg_idx(1), Some(&b"foo"[..]));
571 assert_eq!(c.arg_idx(2), Some(&b"42"[..]));
572 assert_eq!(c.arg_idx(3), None);
573 assert_eq!(c.arg_idx(4), None);
574 }
575}