redis_queue/
queue.rs

1use core::{fmt, time};
2use core::future::Future;
3use std::borrow::Cow;
4
5use redis::{Cmd, ToRedisArgs, RedisError, FromRedisValue};
6use redis::aio::ConnectionManager;
7
8use crate::types::{idents, RedisType, TimestampId, StreamId, TrimMethod, GroupInfo, PendingStats, EntryValue, PendingEntry, PendingParams, PendingParamsConfig, FetchParams, FetchParamsConfig, FetchResult, FetchEntries};
9use crate::iters::{FetchIter, PendingIter};
10
///Settings that tell a `Queue` which Redis key it operates on
#[derive(Clone)]
pub struct QueueConfig {
    ///Name of the stream key that every `Queue` command targets
    pub stream: Cow<'static, str>,
}
17
18#[derive(Clone)]
19///Queue
20pub struct Queue {
21    config: QueueConfig,
22    conn: ConnectionManager,
23}
24
25impl Queue {
26    ///Creates new instance from existing connection
27    pub fn new(config: QueueConfig, conn: ConnectionManager) -> Self {
28        Self {
29            config,
30            conn,
31        }
32    }
33
34    ///Creates command with specified `name` targeting configured `stream`
35    fn cmd(&self, name: &str) -> Cmd {
36        let mut cmd = Cmd::new();
37        cmd.arg(name).arg(self.config.stream.as_ref());
38        cmd
39    }
40
41    #[inline(always)]
42    ///Gets underlying connection
43    pub fn connection(&self) -> ConnectionManager {
44        self.conn.clone()
45    }
46
47    ///Creates group within queue where pending messages are stored.
48    ///
49    ///User MUST create group before using it.
50    ///
51    ///If config's `stream` doesn't exist yet, creates it
52    pub async fn create_group(&self, group: &str) -> Result<(), RedisError> {
53        let mut conn = self.connection();
54        let mut cmd = self.cmd(idents::TYPE);
55        let typ: RedisType = cmd.query_async(&mut conn).await?;
56        match typ {
57            //Make sure if key doesn't exist or it is stream, otherwise we should fail immediately
58            RedisType::Stream | RedisType::None => (),
59            _ => return Err((redis::ErrorKind::ClientError, "key is already used by non-stream type").into()),
60        }
61
62        cmd = Cmd::new();
63        cmd.arg(idents::XGROUP)
64           .arg(idents::CREATE)
65           .arg(self.config.stream.as_ref())
66           .arg(group)
67           .arg("$")
68           .arg(idents::MKSTREAM);
69
70        match cmd.query_async(&mut conn).await {
71            Ok(()) => Ok(()),
72            Err(error) => match error.code() {
73                //Group already exists
74                Some(idents::BUSYGROUP) => Ok(()),
75                _ => Err(error),
76            },
77        }
78    }
79
80    async fn inner_time_ref(conn: &mut ConnectionManager) -> Result<time::Duration, RedisError> {
81        let mut cmd = Cmd::new();
82        //https://redis.io/commands/time/
83        //The TIME command returns the current server time as a two items lists: a Unix timestamp
84        //and the amount of microseconds already elapsed in the current second
85        cmd.arg(idents::TIME);
86        let result: (u64, u64) = cmd.query_async(conn).await?;
87        let secs = time::Duration::from_secs(result.0);
88        let micros = time::Duration::from_micros(result.1);
89        Ok(secs + micros)
90    }
91
92    #[inline(always)]
93    async fn inner_time(mut conn: ConnectionManager) -> Result<time::Duration, RedisError> {
94        Self::inner_time_ref(&mut conn).await
95    }
96
97    #[inline(always)]
98    ///Returns redis's current time
99    pub fn time(&self) -> impl Future<Output = Result<time::Duration, RedisError>> + Send {
100        Self::inner_time(self.connection())
101    }
102
103    ///Returns number of elements within queue
104    pub async fn len(&self) -> Result<usize, RedisError> {
105        let mut conn = self.connection();
106
107        let cmd = self.cmd(idents::XLEN);
108        cmd.query_async(&mut conn).await
109    }
110
111    ///Marks specified `StreamId` as successfully consumed, resulting in corresponding messages' deletion.
112    pub async fn consume(&self, group: &str, ids: &[StreamId]) -> Result<usize, RedisError> {
113        let mut conn = self.connection();
114
115        let mut cmd = self.cmd(idents::XACK);
116        cmd.arg(group).arg(ids);
117        cmd.query_async(&mut conn).await
118    }
119
120    ///Requests to delete message from the stream.
121    pub async fn delete(&self, ids: &[StreamId]) -> Result<usize, RedisError> {
122        let mut conn = self.connection();
123
124        let mut cmd = self.cmd(idents::XDEL);
125        cmd.arg(ids);
126        cmd.query_async(&mut conn).await
127    }
128
129    ///Trims elements according to specified `method`
130    pub async fn trim(&self, method: TrimMethod) -> Result<u64, RedisError> {
131        let mut conn = self.connection();
132
133        let mut cmd = self.cmd(idents::XTRIM);
134        cmd.arg(method);
135        cmd.query_async(&mut conn).await
136    }
137
138    ///Purges whole message stream
139    pub async fn purge(&self) -> Result<(), RedisError> {
140        let mut conn = self.connection();
141        self.cmd("DEL").query_async(&mut conn).await
142    }
143
144    ///Retrieves summary of every group existing within stream
145    pub async fn groups_info(&self) -> Result<Vec<GroupInfo>, RedisError> {
146        let mut conn = self.connection();
147
148        let mut cmd = Cmd::new();
149        cmd.arg(idents::XINFO)
150           .arg(idents::GROUPS)
151           .arg(self.config.stream.as_ref());
152        cmd.query_async(&mut conn).await
153    }
154
155    ///Retrieves pending messages statistics for `group`
156    pub async fn pending_stats(&self, group: &str) -> Result<PendingStats, RedisError> {
157        let mut conn = self.connection();
158
159        let mut cmd = self.cmd(idents::XPENDING);
160        cmd.arg(&group);
161
162        cmd.query_async(&mut conn).await
163    }
164
165    ///Adds item to the queue at the end of queue.
166    ///
167    ///Returns `StreamId` of newly created item
168    pub async fn append<T: ToRedisArgs>(&self, item: &EntryValue<T>) -> Result<StreamId, RedisError> {
169        let mut conn = self.connection();
170
171        let mut cmd = self.cmd(idents::XADD);
172        cmd.arg("*").arg(item);
173        cmd.query_async(&mut conn).await
174    }
175
176    ///Adds item to the queue with ID generated from current time plus provided `delay`
177    ///
178    ///Returns `StreamId` of newly created item
179    pub async fn append_delayed<T: ToRedisArgs>(&self, item: &EntryValue<T>, delay: time::Duration) -> Result<StreamId, RedisError> {
180        let mut conn = self.connection();
181        let now = Self::inner_time_ref(&mut conn).await?.saturating_add(delay);
182        let id = TimestampId::new(now);
183
184        let mut cmd = self.cmd(idents::XADD);
185        cmd.arg(id).arg(item);
186        cmd.query_async(&mut conn).await
187    }
188
189    ///Retrieves pending messages within stream.
190    pub async fn pending(&self, params: &PendingParams<'_>) -> Result<Vec<PendingEntry>, RedisError> {
191        let mut conn = self.connection();
192
193        //Despite its name, `Cmd::arg` can be one or multiple arguments
194        //So we can encode everything using ToRedisArgs
195        let args = PendingParamsConfig {
196            params,
197            config: &self.config,
198        };
199        let mut cmd = Cmd::new();
200        cmd.arg(idents::XPENDING).arg(&args);
201
202        cmd.query_async(&mut conn).await
203    }
204
205    ///Attempts to fetch message from within queue.
206    ///
207    ///By new it means messages that are not read yet.
208    ///
209    ///Once message is read, it is added as pending to group, according to configuration.
210    ///
211    ///When processing is finished, user must acknowledge ids to remove them from pending group.
212    ///Until then these messages can be always re-fetched.
213    pub async fn fetch<T: FromRedisValue>(&self, params: &FetchParams<'_>) -> Result<FetchResult<T>, redis::RedisError> {
214        let mut conn = self.connection();
215
216        //Despite its name, `Cmd::arg` can be one or multiple arguments
217        //So we can encode everything using ToRedisArgs
218        let args = FetchParamsConfig {
219            params,
220            config: &self.config,
221        };
222        let mut cmd = Cmd::new();
223        cmd.arg(idents::XREADGROUP).arg(&args);
224
225        //If there are no messages, it returns NIL so handle it by returning result with empty entires to avoid extra Option indirection
226        match cmd.query_async::<_, Option<(FetchResult<T>,)>>(&mut conn).await? {
227            Some((res,)) => Ok(res),
228            None => Ok(FetchResult {
229                stream: args.config.stream.clone().into_owned(),
230                entries: Vec::new(),
231            }),
232        }
233    }
234
235    ///Attempts to fetch message from within queue.
236    ///
237    ///By new it means messages that are not read yet.
238    ///
239    ///Once message is read, it is added as pending to group, according to configuration.
240    ///
241    ///When processing is finished, user must acknowledge ids to remove them from pending group.
242    ///Until then these messages can be always re-fetched.
243    pub async fn fetch_entries<T: FromRedisValue>(&self, params: &FetchParams<'_>) -> Result<FetchEntries<T>, redis::RedisError> {
244        let mut conn = self.connection();
245
246        //Despite its name, `Cmd::arg` can be one or multiple arguments
247        //So we can encode everything using ToRedisArgs
248        let args = FetchParamsConfig {
249            params,
250            config: &self.config,
251        };
252        let mut cmd = Cmd::new();
253        cmd.arg(idents::XREADGROUP).arg(&args);
254
255        //If there are no messages, it returns NIL so handle it by returning result with empty entires to avoid extra Option indirection
256        match cmd.query_async::<_, Option<(FetchEntries<T>,)>>(&mut conn).await? {
257            Some((res,)) => Ok(res),
258            None => Ok(FetchEntries {
259                entries: Vec::new(),
260            }),
261        }
262    }
263
264    #[inline(always)]
265    ///Creates new fetch iterator.
266    ///
267    ///This is just useful utility when there is no need to change `params` at runtime.
268    pub fn fetch_iter<'a>(&self, params: FetchParams<'a>) -> FetchIter<'a> {
269        FetchIter::new(params, self.clone())
270    }
271
272    #[inline(always)]
273    ///Creates new pending info iterator.
274    ///
275    ///This is just useful utility when there is no need to change `params` at runtime.
276    pub fn pending_iter<'a>(&self, params: PendingParams<'a>) -> PendingIter<'a> {
277        PendingIter::new(params, self.clone())
278    }
279}
280
281impl fmt::Debug for Queue {
282    #[inline]
283    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
284        fmt.debug_struct("Queue").field("name", &self.config.stream).finish()
285    }
286}