1use core::{fmt, time};
2use core::future::Future;
3use std::borrow::Cow;
4
5use redis::{Cmd, ToRedisArgs, RedisError, FromRedisValue};
6use redis::aio::ConnectionManager;
7
8use crate::types::{idents, RedisType, TimestampId, StreamId, TrimMethod, GroupInfo, PendingStats, EntryValue, PendingEntry, PendingParams, PendingParamsConfig, FetchParams, FetchParamsConfig, FetchResult, FetchEntries};
9use crate::iters::{FetchIter, PendingIter};
10
11#[derive(Clone)]
12pub struct QueueConfig {
14 pub stream: Cow<'static, str>,
16}
17
18#[derive(Clone)]
19pub struct Queue {
21 config: QueueConfig,
22 conn: ConnectionManager,
23}
24
25impl Queue {
26 pub fn new(config: QueueConfig, conn: ConnectionManager) -> Self {
28 Self {
29 config,
30 conn,
31 }
32 }
33
34 fn cmd(&self, name: &str) -> Cmd {
36 let mut cmd = Cmd::new();
37 cmd.arg(name).arg(self.config.stream.as_ref());
38 cmd
39 }
40
41 #[inline(always)]
42 pub fn connection(&self) -> ConnectionManager {
44 self.conn.clone()
45 }
46
47 pub async fn create_group(&self, group: &str) -> Result<(), RedisError> {
53 let mut conn = self.connection();
54 let mut cmd = self.cmd(idents::TYPE);
55 let typ: RedisType = cmd.query_async(&mut conn).await?;
56 match typ {
57 RedisType::Stream | RedisType::None => (),
59 _ => return Err((redis::ErrorKind::ClientError, "key is already used by non-stream type").into()),
60 }
61
62 cmd = Cmd::new();
63 cmd.arg(idents::XGROUP)
64 .arg(idents::CREATE)
65 .arg(self.config.stream.as_ref())
66 .arg(group)
67 .arg("$")
68 .arg(idents::MKSTREAM);
69
70 match cmd.query_async(&mut conn).await {
71 Ok(()) => Ok(()),
72 Err(error) => match error.code() {
73 Some(idents::BUSYGROUP) => Ok(()),
75 _ => Err(error),
76 },
77 }
78 }
79
80 async fn inner_time_ref(conn: &mut ConnectionManager) -> Result<time::Duration, RedisError> {
81 let mut cmd = Cmd::new();
82 cmd.arg(idents::TIME);
86 let result: (u64, u64) = cmd.query_async(conn).await?;
87 let secs = time::Duration::from_secs(result.0);
88 let micros = time::Duration::from_micros(result.1);
89 Ok(secs + micros)
90 }
91
92 #[inline(always)]
93 async fn inner_time(mut conn: ConnectionManager) -> Result<time::Duration, RedisError> {
94 Self::inner_time_ref(&mut conn).await
95 }
96
97 #[inline(always)]
98 pub fn time(&self) -> impl Future<Output = Result<time::Duration, RedisError>> + Send {
100 Self::inner_time(self.connection())
101 }
102
103 pub async fn len(&self) -> Result<usize, RedisError> {
105 let mut conn = self.connection();
106
107 let cmd = self.cmd(idents::XLEN);
108 cmd.query_async(&mut conn).await
109 }
110
111 pub async fn consume(&self, group: &str, ids: &[StreamId]) -> Result<usize, RedisError> {
113 let mut conn = self.connection();
114
115 let mut cmd = self.cmd(idents::XACK);
116 cmd.arg(group).arg(ids);
117 cmd.query_async(&mut conn).await
118 }
119
120 pub async fn delete(&self, ids: &[StreamId]) -> Result<usize, RedisError> {
122 let mut conn = self.connection();
123
124 let mut cmd = self.cmd(idents::XDEL);
125 cmd.arg(ids);
126 cmd.query_async(&mut conn).await
127 }
128
129 pub async fn trim(&self, method: TrimMethod) -> Result<u64, RedisError> {
131 let mut conn = self.connection();
132
133 let mut cmd = self.cmd(idents::XTRIM);
134 cmd.arg(method);
135 cmd.query_async(&mut conn).await
136 }
137
138 pub async fn purge(&self) -> Result<(), RedisError> {
140 let mut conn = self.connection();
141 self.cmd("DEL").query_async(&mut conn).await
142 }
143
144 pub async fn groups_info(&self) -> Result<Vec<GroupInfo>, RedisError> {
146 let mut conn = self.connection();
147
148 let mut cmd = Cmd::new();
149 cmd.arg(idents::XINFO)
150 .arg(idents::GROUPS)
151 .arg(self.config.stream.as_ref());
152 cmd.query_async(&mut conn).await
153 }
154
155 pub async fn pending_stats(&self, group: &str) -> Result<PendingStats, RedisError> {
157 let mut conn = self.connection();
158
159 let mut cmd = self.cmd(idents::XPENDING);
160 cmd.arg(&group);
161
162 cmd.query_async(&mut conn).await
163 }
164
165 pub async fn append<T: ToRedisArgs>(&self, item: &EntryValue<T>) -> Result<StreamId, RedisError> {
169 let mut conn = self.connection();
170
171 let mut cmd = self.cmd(idents::XADD);
172 cmd.arg("*").arg(item);
173 cmd.query_async(&mut conn).await
174 }
175
176 pub async fn append_delayed<T: ToRedisArgs>(&self, item: &EntryValue<T>, delay: time::Duration) -> Result<StreamId, RedisError> {
180 let mut conn = self.connection();
181 let now = Self::inner_time_ref(&mut conn).await?.saturating_add(delay);
182 let id = TimestampId::new(now);
183
184 let mut cmd = self.cmd(idents::XADD);
185 cmd.arg(id).arg(item);
186 cmd.query_async(&mut conn).await
187 }
188
189 pub async fn pending(&self, params: &PendingParams<'_>) -> Result<Vec<PendingEntry>, RedisError> {
191 let mut conn = self.connection();
192
193 let args = PendingParamsConfig {
196 params,
197 config: &self.config,
198 };
199 let mut cmd = Cmd::new();
200 cmd.arg(idents::XPENDING).arg(&args);
201
202 cmd.query_async(&mut conn).await
203 }
204
205 pub async fn fetch<T: FromRedisValue>(&self, params: &FetchParams<'_>) -> Result<FetchResult<T>, redis::RedisError> {
214 let mut conn = self.connection();
215
216 let args = FetchParamsConfig {
219 params,
220 config: &self.config,
221 };
222 let mut cmd = Cmd::new();
223 cmd.arg(idents::XREADGROUP).arg(&args);
224
225 match cmd.query_async::<_, Option<(FetchResult<T>,)>>(&mut conn).await? {
227 Some((res,)) => Ok(res),
228 None => Ok(FetchResult {
229 stream: args.config.stream.clone().into_owned(),
230 entries: Vec::new(),
231 }),
232 }
233 }
234
235 pub async fn fetch_entries<T: FromRedisValue>(&self, params: &FetchParams<'_>) -> Result<FetchEntries<T>, redis::RedisError> {
244 let mut conn = self.connection();
245
246 let args = FetchParamsConfig {
249 params,
250 config: &self.config,
251 };
252 let mut cmd = Cmd::new();
253 cmd.arg(idents::XREADGROUP).arg(&args);
254
255 match cmd.query_async::<_, Option<(FetchEntries<T>,)>>(&mut conn).await? {
257 Some((res,)) => Ok(res),
258 None => Ok(FetchEntries {
259 entries: Vec::new(),
260 }),
261 }
262 }
263
264 #[inline(always)]
265 pub fn fetch_iter<'a>(&self, params: FetchParams<'a>) -> FetchIter<'a> {
269 FetchIter::new(params, self.clone())
270 }
271
272 #[inline(always)]
273 pub fn pending_iter<'a>(&self, params: PendingParams<'a>) -> PendingIter<'a> {
277 PendingIter::new(params, self.clone())
278 }
279}
280
281impl fmt::Debug for Queue {
282 #[inline]
283 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
284 fmt.debug_struct("Queue").field("name", &self.config.stream).finish()
285 }
286}