1#[cfg(feature = "aio")]
2use futures_util::{
3 future::BoxFuture,
4 task::{Context, Poll},
5 Stream, StreamExt,
6};
7#[cfg(feature = "aio")]
8use std::pin::Pin;
9#[cfg(feature = "cache-aio")]
10use std::time::Duration;
11use std::{fmt, io};
12
13use crate::connection::ConnectionLike;
14use crate::pipeline::Pipeline;
15use crate::types::{from_owned_redis_value, FromRedisValue, RedisResult, RedisWrite, ToRedisArgs};
16
/// A single argument of a redis command.
///
/// `D` is the representation of a literal argument: [`Cmd`] stores
/// `Arg<usize>` (an end offset into its data buffer), while
/// [`Cmd::args_iter`] yields `Arg<&[u8]>` borrowing the argument bytes.
#[derive(Clone)]
pub enum Arg<D> {
    /// A literal argument carrying its payload `D`.
    Simple(D),
    /// A placeholder that is replaced by the decimal cursor value when the
    /// command is encoded.
    Cursor,
}
25
/// Per-command caching options, attached to a [`Cmd`] through
/// [`Cmd::set_cache_config`].
///
/// NOTE(review): the fields are only stored in this file; how the caching
/// layer interprets them should be confirmed against that code.
#[cfg(feature = "cache-aio")]
#[cfg_attr(docsrs, doc(cfg(feature = "cache-aio")))]
#[derive(Clone)]
pub struct CommandCacheConfig {
    /// Whether caching is enabled for the command;
    /// [`CommandCacheConfig::new`] initializes it to `true`.
    pub(crate) enable_cache: bool,
    /// Optional client-side time-to-live; `None` until
    /// [`CommandCacheConfig::set_client_side_ttl`] is called.
    pub(crate) client_side_ttl: Option<Duration>,
}
45
#[cfg(feature = "cache-aio")]
impl CommandCacheConfig {
    /// Creates a configuration with caching enabled and no client-side TTL.
    pub fn new() -> Self {
        let enable_cache = true;
        let client_side_ttl = None;
        Self {
            enable_cache,
            client_side_ttl,
        }
    }

    /// Returns the configuration with its `enable_cache` flag replaced by
    /// the given value; every other setting is carried over unchanged.
    pub fn set_enable_cache(self, enable_cache: bool) -> Self {
        Self {
            enable_cache,
            ..self
        }
    }

    /// Returns the configuration with its client-side TTL set to
    /// `client_side_ttl`; every other setting is carried over unchanged.
    pub fn set_client_side_ttl(self, client_side_ttl: Duration) -> Self {
        Self {
            client_side_ttl: Some(client_side_ttl),
            ..self
        }
    }
}
#[cfg(feature = "cache-aio")]
impl Default for CommandCacheConfig {
    /// Equivalent to [`CommandCacheConfig::new`].
    fn default() -> Self {
        CommandCacheConfig::new()
    }
}
75
/// A redis command under construction.
///
/// The bytes of all literal arguments are stored back to back in `data`;
/// `args` records where each argument ends, so the arguments can be
/// recovered without one allocation per argument.
#[derive(Clone)]
pub struct Cmd {
    /// Concatenated bytes of every literal argument, in insertion order.
    pub(crate) data: Vec<u8>,
    /// One entry per argument. `Arg::Simple(end)` holds the exclusive end
    /// offset of that argument inside `data`; `Arg::Cursor` marks the
    /// position where the cursor value is inserted during encoding.
    args: Vec<Arg<usize>>,
    /// Cursor value substituted for `Arg::Cursor`. `Some` once
    /// `cursor_arg` was called, which is what `in_scan_mode` reports.
    cursor: Option<u64>,
    /// Flag exposed through `set_no_response` / `is_no_response`.
    /// NOTE(review): presumably tells the connection not to wait for a
    /// reply — it is only stored in this file, confirm against the callers.
    no_response: bool,
    /// Optional per-command cache configuration.
    #[cfg(feature = "cache-aio")]
    cache: Option<CommandCacheConfig>,
}
88
/// Blocking iterator over the items produced by a cursor-based command;
/// created by [`Cmd::iter`].
pub struct Iter<'a, T: FromRedisValue> {
    /// Items of the most recently fetched batch that were not yielded yet.
    batch: std::vec::IntoIter<T>,
    /// Cursor returned together with the most recent batch; `0` means that
    /// no further batch is requested.
    cursor: u64,
    /// Connection used to request follow-up batches.
    con: &'a mut (dyn ConnectionLike + 'a),
    /// The command that is re-encoded with the current cursor for every
    /// follow-up batch.
    cmd: Cmd,
}
96
97impl<T: FromRedisValue> Iterator for Iter<'_, T> {
98 type Item = T;
99
100 #[inline]
101 fn next(&mut self) -> Option<T> {
102 loop {
107 if let Some(v) = self.batch.next() {
108 return Some(v);
109 };
110 if self.cursor == 0 {
111 return None;
112 }
113
114 let pcmd = self.cmd.get_packed_command_with_cursor(self.cursor)?;
115 let rv = self.con.req_packed_command(&pcmd).ok()?;
116 let (cur, batch): (u64, Vec<T>) = from_owned_redis_value(rv).ok()?;
117
118 self.cursor = cur;
119 self.batch = batch.into_iter();
120 }
121 }
122}
123
124#[cfg(feature = "aio")]
125use crate::aio::ConnectionLike as AsyncConnection;
126
/// State shared by the asynchronous iterator between two polls: the
/// unconsumed part of the current batch plus everything needed to request
/// the next one.
#[cfg(feature = "aio")]
struct AsyncIterInner<'a, T: FromRedisValue + 'a> {
    /// Items of the most recently fetched batch that were not yielded yet.
    batch: std::vec::IntoIter<T>,
    /// Connection used to request follow-up batches.
    con: &'a mut (dyn AsyncConnection + Send + 'a),
    /// The command that is re-sent for follow-up batches; its `cursor`
    /// field tracks the cursor returned by the server.
    cmd: Cmd,
}
134
/// Internal state machine of [`AsyncIter`].
#[cfg(feature = "aio")]
enum IterOrFuture<'a, T: FromRedisValue + 'a> {
    /// Idle: the inner iterator is available and no request is in flight.
    Iter(AsyncIterInner<'a, T>),
    /// A `next_item` call is in flight; on completion the future hands back
    /// the inner iterator together with the produced item.
    Future(BoxFuture<'a, (AsyncIterInner<'a, T>, Option<T>)>),
    /// Transient placeholder used while `poll_next` moves the state out;
    /// never expected to be observed at the start of a poll.
    Empty,
}
142
/// Asynchronous counterpart of [`Iter`]; created by [`Cmd::iter_async`].
///
/// Items can be pulled with `next_item` or through the `Stream`
/// implementation.
#[cfg(feature = "aio")]
pub struct AsyncIter<'a, T: FromRedisValue + 'a> {
    /// Current state: either the idle inner iterator or an in-flight future.
    inner: IterOrFuture<'a, T>,
}
148
#[cfg(feature = "aio")]
impl<'a, T: FromRedisValue + 'a> AsyncIterInner<'a, T> {
    /// Returns the next item, requesting further batches from the server
    /// when the current one is exhausted.
    ///
    /// Yields `None` once the batch is drained and the command's cursor is
    /// unset or `0`, or when a follow-up request or its decoding fails —
    /// such errors are not reported to the caller.
    #[inline]
    pub async fn next_item(&mut self) -> Option<T> {
        // A batch may be empty while the cursor is still non-zero, so keep
        // fetching until an item shows up or the iteration is really over.
        loop {
            if let Some(item) = self.batch.next() {
                return Some(item);
            }
            if matches!(self.cmd.cursor, None | Some(0)) {
                return None;
            }

            let reply = self.con.req_packed_command(&self.cmd).await.ok()?;
            let (next_cursor, items) = from_owned_redis_value::<(u64, Vec<T>)>(reply).ok()?;

            self.cmd.cursor = Some(next_cursor);
            self.batch = items.into_iter();
        }
    }
}
177
#[cfg(feature = "aio")]
impl<'a, T: FromRedisValue + 'a + Unpin + Send> AsyncIter<'a, T> {
    /// Waits for the next item of the iteration.
    ///
    /// This drives the [`Stream`] implementation of the iterator and
    /// resolves to `None` once the stream is finished.
    #[inline]
    pub async fn next_item(&mut self) -> Option<T> {
        self.next().await
    }
}
199
#[cfg(feature = "aio")]
impl<'a, T: FromRedisValue + Unpin + Send + 'a> Stream for AsyncIter<'a, T> {
    type Item = T;

    /// Polls for the next item.
    ///
    /// When idle, a boxed future wrapping `AsyncIterInner::next_item` is
    /// created and polled right away; when a future is already in flight it
    /// is polled again. A pending future is stored back so that the next
    /// poll resumes it, a finished one returns the inner iterator to the
    /// idle state.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
        let this = self.get_mut();

        // Move the state out (leaving `Empty` behind) and turn it into the
        // future that has to be polled.
        let mut pending: BoxFuture<'a, (AsyncIterInner<'a, T>, Option<T>)> =
            match std::mem::replace(&mut this.inner, IterOrFuture::Empty) {
                IterOrFuture::Future(in_flight) => in_flight,
                IterOrFuture::Iter(mut iter) => Box::pin(async move {
                    let item = iter.next_item().await;
                    (iter, item)
                }),
                IterOrFuture::Empty => unreachable!(),
            };

        match pending.as_mut().poll(cx) {
            Poll::Ready((iter, item)) => {
                this.inner = IterOrFuture::Iter(iter);
                Poll::Ready(item)
            }
            Poll::Pending => {
                this.inner = IterOrFuture::Future(pending);
                Poll::Pending
            }
        }
    }
}
230
/// Returns the number of decimal digits needed to print `v`
/// (`0` counts as one digit).
fn countdigits(mut v: usize) -> usize {
    let mut digits = 1;
    // Strip four digits per division until at most four digits remain.
    while v >= 10_000 {
        v /= 10_000;
        digits += 4;
    }
    digits
        + match v {
            0..=9 => 0,
            10..=99 => 1,
            100..=999 => 2,
            _ => 3,
        }
}
251
252#[inline]
253fn bulklen(len: usize) -> usize {
254 1 + countdigits(len) + 2 + len + 2
255}
256
257fn args_len<'a, I>(args: I, cursor: u64) -> usize
258where
259 I: IntoIterator<Item = Arg<&'a [u8]>> + ExactSizeIterator,
260{
261 let mut totlen = 1 + countdigits(args.len()) + 2;
262 for item in args {
263 totlen += bulklen(match item {
264 Arg::Cursor => countdigits(cursor as usize),
265 Arg::Simple(val) => val.len(),
266 });
267 }
268 totlen
269}
270
271pub(crate) fn cmd_len(cmd: &Cmd) -> usize {
272 args_len(cmd.args_iter(), cmd.cursor.unwrap_or(0))
273}
274
275fn encode_command<'a, I>(args: I, cursor: u64) -> Vec<u8>
276where
277 I: IntoIterator<Item = Arg<&'a [u8]>> + Clone + ExactSizeIterator,
278{
279 let mut cmd = Vec::new();
280 write_command_to_vec(&mut cmd, args, cursor);
281 cmd
282}
283
284fn write_command_to_vec<'a, I>(cmd: &mut Vec<u8>, args: I, cursor: u64)
285where
286 I: IntoIterator<Item = Arg<&'a [u8]>> + Clone + ExactSizeIterator,
287{
288 let totlen = args_len(args.clone(), cursor);
289
290 cmd.reserve(totlen);
291
292 write_command(cmd, args, cursor).unwrap()
293}
294
295fn write_command<'a, I>(cmd: &mut (impl ?Sized + io::Write), args: I, cursor: u64) -> io::Result<()>
296where
297 I: IntoIterator<Item = Arg<&'a [u8]>> + Clone + ExactSizeIterator,
298{
299 let mut buf = ::itoa::Buffer::new();
300
301 cmd.write_all(b"*")?;
302 let s = buf.format(args.len());
303 cmd.write_all(s.as_bytes())?;
304 cmd.write_all(b"\r\n")?;
305
306 let mut cursor_bytes = itoa::Buffer::new();
307 for item in args {
308 let bytes = match item {
309 Arg::Cursor => cursor_bytes.format(cursor).as_bytes(),
310 Arg::Simple(val) => val,
311 };
312
313 cmd.write_all(b"$")?;
314 let s = buf.format(bytes.len());
315 cmd.write_all(s.as_bytes())?;
316 cmd.write_all(b"\r\n")?;
317
318 cmd.write_all(bytes)?;
319 cmd.write_all(b"\r\n")?;
320 }
321 Ok(())
322}
323
324impl RedisWrite for Cmd {
325 fn write_arg(&mut self, arg: &[u8]) {
326 self.data.extend_from_slice(arg);
327 self.args.push(Arg::Simple(self.data.len()));
328 }
329
330 fn write_arg_fmt(&mut self, arg: impl fmt::Display) {
331 use std::io::Write;
332 write!(self.data, "{arg}").unwrap();
333 self.args.push(Arg::Simple(self.data.len()));
334 }
335
336 fn writer_for_next_arg(&mut self) -> impl std::io::Write + '_ {
337 struct CmdBufferedArgGuard<'a>(&'a mut Cmd);
338 impl Drop for CmdBufferedArgGuard<'_> {
339 fn drop(&mut self) {
340 self.0.args.push(Arg::Simple(self.0.data.len()));
341 }
342 }
343 impl std::io::Write for CmdBufferedArgGuard<'_> {
344 fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
345 self.0.data.extend_from_slice(buf);
346 Ok(buf.len())
347 }
348
349 fn flush(&mut self) -> std::io::Result<()> {
350 Ok(())
351 }
352 }
353
354 CmdBufferedArgGuard(self)
355 }
356
357 fn reserve_space_for_args(&mut self, additional: impl IntoIterator<Item = usize>) {
358 let mut capacity = 0;
359 let mut args = 0;
360 for add in additional {
361 capacity += add;
362 args += 1;
363 }
364 self.data.reserve(capacity);
365 self.args.reserve(args);
366 }
367
368 #[cfg(feature = "bytes")]
369 fn bufmut_for_next_arg(&mut self, capacity: usize) -> impl bytes::BufMut + '_ {
370 self.data.reserve(capacity);
371 struct CmdBufferedArgGuard<'a>(&'a mut Cmd);
372 impl Drop for CmdBufferedArgGuard<'_> {
373 fn drop(&mut self) {
374 self.0.args.push(Arg::Simple(self.0.data.len()));
375 }
376 }
377 unsafe impl bytes::BufMut for CmdBufferedArgGuard<'_> {
378 fn remaining_mut(&self) -> usize {
379 self.0.data.remaining_mut()
380 }
381
382 unsafe fn advance_mut(&mut self, cnt: usize) {
383 self.0.data.advance_mut(cnt);
384 }
385
386 fn chunk_mut(&mut self) -> &mut bytes::buf::UninitSlice {
387 self.0.data.chunk_mut()
388 }
389
390 fn put<T: bytes::buf::Buf>(&mut self, src: T)
392 where
393 Self: Sized,
394 {
395 self.0.data.put(src);
396 }
397
398 fn put_slice(&mut self, src: &[u8]) {
399 self.0.data.put_slice(src);
400 }
401
402 fn put_bytes(&mut self, val: u8, cnt: usize) {
403 self.0.data.put_bytes(val, cnt);
404 }
405 }
406
407 CmdBufferedArgGuard(self)
408 }
409}
410
411impl Default for Cmd {
412 fn default() -> Cmd {
413 Cmd::new()
414 }
415}
416
417impl Cmd {
445 pub fn new() -> Cmd {
447 Cmd {
448 data: vec![],
449 args: vec![],
450 cursor: None,
451 no_response: false,
452 #[cfg(feature = "cache-aio")]
453 cache: None,
454 }
455 }
456
457 pub fn with_capacity(arg_count: usize, size_of_data: usize) -> Cmd {
459 Cmd {
460 data: Vec::with_capacity(size_of_data),
461 args: Vec::with_capacity(arg_count),
462 cursor: None,
463 no_response: false,
464 #[cfg(feature = "cache-aio")]
465 cache: None,
466 }
467 }
468
469 #[cfg(test)]
471 #[allow(dead_code)]
472 pub(crate) fn capacity(&self) -> (usize, usize) {
473 (self.args.capacity(), self.data.capacity())
474 }
475
476 #[inline]
490 pub fn arg<T: ToRedisArgs>(&mut self, arg: T) -> &mut Cmd {
491 arg.write_redis_args(self);
492 self
493 }
494
495 #[inline]
511 pub fn cursor_arg(&mut self, cursor: u64) -> &mut Cmd {
512 assert!(!self.in_scan_mode());
513 self.cursor = Some(cursor);
514 self.args.push(Arg::Cursor);
515 self
516 }
517
518 #[inline]
520 pub fn get_packed_command(&self) -> Vec<u8> {
521 let mut cmd = Vec::new();
522 self.write_packed_command(&mut cmd);
523 cmd
524 }
525
526 pub(crate) fn write_packed_command(&self, cmd: &mut Vec<u8>) {
527 write_command_to_vec(cmd, self.args_iter(), self.cursor.unwrap_or(0))
528 }
529
530 pub(crate) fn write_packed_command_preallocated(&self, cmd: &mut Vec<u8>) {
531 write_command(cmd, self.args_iter(), self.cursor.unwrap_or(0)).unwrap()
532 }
533
534 #[inline]
538 fn get_packed_command_with_cursor(&self, cursor: u64) -> Option<Vec<u8>> {
539 if !self.in_scan_mode() {
540 None
541 } else {
542 Some(encode_command(self.args_iter(), cursor))
543 }
544 }
545
546 #[inline]
548 pub fn in_scan_mode(&self) -> bool {
549 self.cursor.is_some()
550 }
551
552 #[inline]
556 pub fn query<T: FromRedisValue>(&self, con: &mut dyn ConnectionLike) -> RedisResult<T> {
557 match con.req_command(self) {
558 Ok(val) => from_owned_redis_value(val.extract_error()?),
559 Err(e) => Err(e),
560 }
561 }
562
563 #[inline]
565 #[cfg(feature = "aio")]
566 pub async fn query_async<T: FromRedisValue>(
567 &self,
568 con: &mut impl crate::aio::ConnectionLike,
569 ) -> RedisResult<T> {
570 let val = con.req_packed_command(self).await?;
571 from_owned_redis_value(val.extract_error()?)
572 }
573
574 #[inline]
589 pub fn iter<T: FromRedisValue>(self, con: &mut dyn ConnectionLike) -> RedisResult<Iter<'_, T>> {
590 let rv = con.req_command(&self)?;
591
592 let (cursor, batch) = if rv.looks_like_cursor() {
593 from_owned_redis_value::<(u64, Vec<T>)>(rv)?
594 } else {
595 (0, from_owned_redis_value(rv)?)
596 };
597
598 Ok(Iter {
599 batch: batch.into_iter(),
600 cursor,
601 con,
602 cmd: self,
603 })
604 }
605
606 #[cfg(feature = "aio")]
622 #[inline]
623 pub async fn iter_async<'a, T: FromRedisValue + 'a>(
624 mut self,
625 con: &'a mut (dyn AsyncConnection + Send),
626 ) -> RedisResult<AsyncIter<'a, T>> {
627 let rv = con.req_packed_command(&self).await?;
628
629 let (cursor, batch) = if rv.looks_like_cursor() {
630 from_owned_redis_value::<(u64, Vec<T>)>(rv)?
631 } else {
632 (0, from_owned_redis_value(rv)?)
633 };
634 if cursor == 0 {
635 self.cursor = None;
636 } else {
637 self.cursor = Some(cursor);
638 }
639
640 Ok(AsyncIter {
641 inner: IterOrFuture::Iter(AsyncIterInner {
642 batch: batch.into_iter(),
643 con,
644 cmd: self,
645 }),
646 })
647 }
648
649 #[inline]
662 #[deprecated(note = "Use Cmd::exec + unwrap, instead")]
663 pub fn execute(&self, con: &mut dyn ConnectionLike) {
664 self.exec(con).unwrap();
665 }
666
667 #[inline]
672 pub fn exec(&self, con: &mut dyn ConnectionLike) -> RedisResult<()> {
673 self.query::<()>(con)
674 }
675
676 #[cfg(feature = "aio")]
681 pub async fn exec_async(&self, con: &mut impl crate::aio::ConnectionLike) -> RedisResult<()> {
682 self.query_async::<()>(con).await
683 }
684
685 pub fn args_iter(&self) -> impl Clone + ExactSizeIterator<Item = Arg<&[u8]>> {
687 let mut prev = 0;
688 self.args.iter().map(move |arg| match *arg {
689 Arg::Simple(i) => {
690 let arg = Arg::Simple(&self.data[prev..i]);
691 prev = i;
692 arg
693 }
694
695 Arg::Cursor => Arg::Cursor,
696 })
697 }
698
699 #[cfg(any(feature = "cluster", feature = "cache-aio"))]
701 pub(crate) fn arg_idx(&self, idx: usize) -> Option<&[u8]> {
702 if idx >= self.args.len() {
703 return None;
704 }
705
706 let start = if idx == 0 {
707 0
708 } else {
709 match self.args[idx - 1] {
710 Arg::Simple(n) => n,
711 _ => 0,
712 }
713 };
714 let end = match self.args[idx] {
715 Arg::Simple(n) => n,
716 _ => 0,
717 };
718 if start == 0 && end == 0 {
719 return None;
720 }
721 Some(&self.data[start..end])
722 }
723
724 #[inline]
726 pub fn set_no_response(&mut self, nr: bool) -> &mut Cmd {
727 self.no_response = nr;
728 self
729 }
730
731 #[inline]
733 pub fn is_no_response(&self) -> bool {
734 self.no_response
735 }
736
737 #[cfg(feature = "cache-aio")]
739 #[cfg_attr(docsrs, doc(cfg(feature = "cache-aio")))]
740 pub fn set_cache_config(&mut self, command_cache_config: CommandCacheConfig) -> &mut Cmd {
741 self.cache = Some(command_cache_config);
742 self
743 }
744
745 #[cfg(feature = "cache-aio")]
746 #[inline]
747 pub(crate) fn get_cache_config(&self) -> &Option<CommandCacheConfig> {
748 &self.cache
749 }
750}
751
752pub fn cmd(name: &str) -> Cmd {
762 let mut rv = Cmd::new();
763 rv.arg(name);
764 rv
765}
766
767pub fn pack_command(args: &[Vec<u8>]) -> Vec<u8> {
786 encode_command(args.iter().map(|x| Arg::Simple(&x[..])), 0)
787}
788
/// Shortcut for creating a new pipeline; equivalent to `Pipeline::new()`.
pub fn pipe() -> Pipeline {
    Pipeline::new()
}
793
#[cfg(test)]
mod tests {
    use super::Cmd;
    #[cfg(feature = "bytes")]
    use bytes::BufMut;

    use crate::RedisWrite;
    use std::io::Write;

    /// Builds the reference encoding: a command whose arguments were added
    /// one by one through `write_arg`.
    fn packed_via_write_arg(args: &[&[u8]]) -> Vec<u8> {
        let mut reference = Cmd::new();
        for arg in args {
            reference.write_arg(arg);
        }
        reference.get_packed_command()
    }

    /// Several writes through one writer form a single argument.
    #[test]
    fn test_cmd_writer_for_next_arg() {
        let mut cmd = Cmd::new();
        {
            let mut writer = cmd.writer_for_next_arg();
            writer.write_all(b"foo").unwrap();
            writer.write_all(b"bar").unwrap();
            writer.flush().unwrap();
        }

        assert_eq!(cmd.get_packed_command(), packed_via_write_arg(&[b"foobar"]));
    }

    /// Every writer produces its own argument.
    #[test]
    fn test_cmd_writer_for_next_arg_multiple() {
        let mut cmd = Cmd::new();
        {
            let mut writer = cmd.writer_for_next_arg();
            writer.write_all(b"foo").unwrap();
            writer.write_all(b"bar").unwrap();
            writer.flush().unwrap();
        }
        {
            let mut writer = cmd.writer_for_next_arg();
            writer.write_all(b"baz").unwrap();
            writer.write_all(b"qux").unwrap();
            writer.flush().unwrap();
        }

        assert_eq!(
            cmd.get_packed_command(),
            packed_via_write_arg(&[b"foobar", b"bazqux"])
        );
    }

    /// A writer that receives no data still produces an (empty) argument.
    #[test]
    fn test_cmd_writer_for_next_arg_empty() {
        let mut cmd = Cmd::new();
        {
            let mut writer = cmd.writer_for_next_arg();
            writer.flush().unwrap();
        }

        assert_eq!(cmd.get_packed_command(), packed_via_write_arg(&[b""]));
    }

    /// Several puts through one `BufMut` form a single argument.
    #[cfg(feature = "bytes")]
    #[test]
    fn test_cmd_bufmut_for_next_arg() {
        let mut cmd = Cmd::new();
        {
            let mut buf = cmd.bufmut_for_next_arg(6);
            buf.put_slice(b"foo");
            buf.put_slice(b"bar");
        }

        assert_eq!(cmd.get_packed_command(), packed_via_write_arg(&[b"foobar"]));
    }

    /// Every `BufMut` produces its own argument.
    #[cfg(feature = "bytes")]
    #[test]
    fn test_cmd_bufmut_for_next_arg_multiple() {
        let mut cmd = Cmd::new();
        {
            let mut buf = cmd.bufmut_for_next_arg(6);
            buf.put_slice(b"foo");
            buf.put_slice(b"bar");
        }
        {
            let mut buf = cmd.bufmut_for_next_arg(6);
            buf.put_slice(b"baz");
            buf.put_slice(b"qux");
        }

        assert_eq!(
            cmd.get_packed_command(),
            packed_via_write_arg(&[b"foobar", b"bazqux"])
        );
    }

    /// A `BufMut` that receives no data still produces an (empty) argument.
    #[cfg(feature = "bytes")]
    #[test]
    fn test_cmd_bufmut_for_next_arg_empty() {
        let mut cmd = Cmd::new();
        drop(cmd.bufmut_for_next_arg(0));

        assert_eq!(cmd.get_packed_command(), packed_via_write_arg(&[b""]));
    }

    /// `arg_idx` returns each argument's bytes and `None` past the end.
    #[test]
    #[cfg(feature = "cluster")]
    fn test_cmd_arg_idx() {
        let mut cmd = Cmd::new();
        assert_eq!(cmd.arg_idx(0), None);

        cmd.arg("SET");
        assert_eq!(cmd.arg_idx(0), Some(&b"SET"[..]));
        assert_eq!(cmd.arg_idx(1), None);

        cmd.arg("foo").arg("42");
        assert_eq!(cmd.arg_idx(1), Some(&b"foo"[..]));
        assert_eq!(cmd.arg_idx(2), Some(&b"42"[..]));
        assert_eq!(cmd.arg_idx(3), None);
        assert_eq!(cmd.arg_idx(4), None);
    }
}