redis_ac/
stream.rs

1use futures::{prelude::*, try_ready};
2use redis::{aio::ConnectionLike, Cmd, FromRedisValue, RedisError, RedisFuture};
3use std::collections::VecDeque;
4
5/// Stream over items of scan commands.
6pub struct RedisScanStream<C, RV> {
7    cursor: u64,
8    con: Option<C>,
9    factory: Box<dyn Fn(u64) -> Cmd + Send>,
10    pending: Option<RedisFuture<(C, (u64, Vec<RV>))>>,
11    queue: VecDeque<RV>,
12}
13
14pub fn stream<F, C, RV>(con: C, factory: F) -> RedisScanStream<C, RV>
15where
16    C: ConnectionLike + Send + 'static,
17    RV: FromRedisValue + Send + 'static,
18    F: Fn(u64) -> Cmd + Send + 'static,
19{
20    RedisScanStream::new(con, factory)
21}
22
23impl<C, RV> RedisScanStream<C, RV>
24where
25    C: ConnectionLike + Send + 'static,
26    RV: FromRedisValue + Send + 'static,
27{
28    pub(crate) fn new<F: Fn(u64) -> Cmd + Send + 'static>(con: C, factory: F) -> Self {
29        // Create initial query
30        let pending = factory(0).query_async(con);
31
32        Self {
33            cursor: 0,
34            con: None,
35            factory: Box::new(factory),
36            pending: Some(pending),
37            queue: VecDeque::new(),
38        }
39    }
40
41    // This function actually never return Ok(Async::Ready(Some(_)))
42    fn poll_query(&mut self) -> Poll<Option<(Option<C>, Option<RV>)>, RedisError> {
43        loop {
44            // Try polling
45            let p = self.pending.as_mut().map(|p| p.poll());
46
47            if let Some(p) = p {
48                let (con, (cursor, rvs)) = try_ready!(p);
49                self.cursor = cursor;
50                self.queue.extend(rvs);
51                self.con = Some(con);
52
53                if self.cursor != 0 {
54                    // Query again
55                    self.pending =
56                        Some((self.factory)(self.cursor).query_async(self.con.take().unwrap()));
57                } else {
58                    self.pending = None;
59                }
60            } else {
61                // No need to query anymore
62                return Ok(Async::Ready(None));
63            }
64        }
65    }
66
67    /// Collects all the results of scanning.
68    ///
69    /// ```rust,no_run
70    /// use futures::prelude::*;
71    /// use redis_ac::Commands;
72    ///
73    /// # fn main() {
74    /// let client = redis::Client::open("redis://127.0.0.1").unwrap();
75    /// let connect = client.get_async_connection();
76    ///
77    /// let f = connect.and_then(|con|{
78    ///     con.scan_match("key*")
79    ///         .all()
80    ///         .map(|(_, items): (_, Vec<String>)| {
81    ///             // All items retrieved by `scan_match`.
82    ///             println!("{:?}", items)
83    ///         })
84    /// }).map_err(|e| eprintln!("{}", e));
85    ///
86    /// tokio::run(f);
87    /// # }
88    /// ```
89    pub fn all(self) -> RedisScanAll<C, RV> {
90        RedisScanAll::new(self)
91    }
92}
93
94impl<C, RV> Stream for RedisScanStream<C, RV>
95where
96    C: ConnectionLike + Send + 'static,
97    RV: FromRedisValue + Send + 'static,
98{
99    type Item = (Option<C>, Option<RV>);
100    type Error = RedisError;
101
102    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
103        let ready = self.poll_query();
104
105        if let Some(item) = self.queue.pop_front() {
106            let con = if self.queue.is_empty() {
107                // `self.con` becomes `Some(con)` only after all the query is done.
108                self.con.take()
109            } else {
110                None
111            };
112
113            Ok(Async::Ready(Some((con, Some(item)))))
114        } else {
115            match ready {
116                Ok(Async::Ready(None)) => {
117                    // At the end, try to return the connection if it's not yet returned.
118                    Ok(Async::Ready(self.con.take().map(|con| (Some(con), None))))
119                }
120                ready => ready,
121            }
122        }
123    }
124}
125
126/// Collects all the results from a scan command.
127pub struct RedisScanAll<C, RV> {
128    items: Vec<RV>,
129    inner: RedisScanStream<C, RV>,
130}
131
132impl<C, RV> RedisScanAll<C, RV> {
133    fn new(inner: RedisScanStream<C, RV>) -> Self {
134        Self {
135            items: Vec::new(),
136            inner,
137        }
138    }
139}
140
141impl<C, RV> Future for RedisScanAll<C, RV>
142where
143    C: ConnectionLike + Send + 'static,
144    RV: FromRedisValue + Send + 'static,
145{
146    type Item = (C, Vec<RV>);
147    type Error = RedisError;
148
149    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
150        loop {
151            match try_ready!(self.inner.poll()) {
152                Some((con, item)) => {
153                    if let Some(item) = item {
154                        self.items.push(item);
155                    }
156                    if let Some(con) = con {
157                        // RedisScanStream guarantees it returns `Some(con)` with last item.
158                        return Ok(Async::Ready((con, self.items.split_off(0))));
159                    }
160                }
161                None => unreachable!("RedisScanStream didn't return connection"),
162            }
163        }
164    }
165}