1use futures::{prelude::*, try_ready};
2use redis::{aio::ConnectionLike, Cmd, FromRedisValue, RedisError, RedisFuture};
3use std::collections::VecDeque;
4
5pub struct RedisScanStream<C, RV> {
7 cursor: u64,
8 con: Option<C>,
9 factory: Box<dyn Fn(u64) -> Cmd + Send>,
10 pending: Option<RedisFuture<(C, (u64, Vec<RV>))>>,
11 queue: VecDeque<RV>,
12}
13
14pub fn stream<F, C, RV>(con: C, factory: F) -> RedisScanStream<C, RV>
15where
16 C: ConnectionLike + Send + 'static,
17 RV: FromRedisValue + Send + 'static,
18 F: Fn(u64) -> Cmd + Send + 'static,
19{
20 RedisScanStream::new(con, factory)
21}
22
23impl<C, RV> RedisScanStream<C, RV>
24where
25 C: ConnectionLike + Send + 'static,
26 RV: FromRedisValue + Send + 'static,
27{
28 pub(crate) fn new<F: Fn(u64) -> Cmd + Send + 'static>(con: C, factory: F) -> Self {
29 let pending = factory(0).query_async(con);
31
32 Self {
33 cursor: 0,
34 con: None,
35 factory: Box::new(factory),
36 pending: Some(pending),
37 queue: VecDeque::new(),
38 }
39 }
40
41 fn poll_query(&mut self) -> Poll<Option<(Option<C>, Option<RV>)>, RedisError> {
43 loop {
44 let p = self.pending.as_mut().map(|p| p.poll());
46
47 if let Some(p) = p {
48 let (con, (cursor, rvs)) = try_ready!(p);
49 self.cursor = cursor;
50 self.queue.extend(rvs);
51 self.con = Some(con);
52
53 if self.cursor != 0 {
54 self.pending =
56 Some((self.factory)(self.cursor).query_async(self.con.take().unwrap()));
57 } else {
58 self.pending = None;
59 }
60 } else {
61 return Ok(Async::Ready(None));
63 }
64 }
65 }
66
67 pub fn all(self) -> RedisScanAll<C, RV> {
90 RedisScanAll::new(self)
91 }
92}
93
94impl<C, RV> Stream for RedisScanStream<C, RV>
95where
96 C: ConnectionLike + Send + 'static,
97 RV: FromRedisValue + Send + 'static,
98{
99 type Item = (Option<C>, Option<RV>);
100 type Error = RedisError;
101
102 fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
103 let ready = self.poll_query();
104
105 if let Some(item) = self.queue.pop_front() {
106 let con = if self.queue.is_empty() {
107 self.con.take()
109 } else {
110 None
111 };
112
113 Ok(Async::Ready(Some((con, Some(item)))))
114 } else {
115 match ready {
116 Ok(Async::Ready(None)) => {
117 Ok(Async::Ready(self.con.take().map(|con| (Some(con), None))))
119 }
120 ready => ready,
121 }
122 }
123 }
124}
125
126pub struct RedisScanAll<C, RV> {
128 items: Vec<RV>,
129 inner: RedisScanStream<C, RV>,
130}
131
132impl<C, RV> RedisScanAll<C, RV> {
133 fn new(inner: RedisScanStream<C, RV>) -> Self {
134 Self {
135 items: Vec::new(),
136 inner,
137 }
138 }
139}
140
141impl<C, RV> Future for RedisScanAll<C, RV>
142where
143 C: ConnectionLike + Send + 'static,
144 RV: FromRedisValue + Send + 'static,
145{
146 type Item = (C, Vec<RV>);
147 type Error = RedisError;
148
149 fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
150 loop {
151 match try_ready!(self.inner.poll()) {
152 Some((con, item)) => {
153 if let Some(item) = item {
154 self.items.push(item);
155 }
156 if let Some(con) = con {
157 return Ok(Async::Ready((con, self.items.split_off(0))));
159 }
160 }
161 None => unreachable!("RedisScanStream didn't return connection"),
162 }
163 }
164 }
165}