redis_queue/manager/mod.rs
1//! Queue processing and management
2//!
3//! ## Usage
4//!
5//! This module provides utilities to manage and process queue tasks
6//!
//! The example below briefly shows how to use it.
8//!
9//! ```rust,no_run
10//! use redis_queue::redis;
11//! use redis_queue::{Queue, QueueConfig};
12//! use redis_queue::types::Entry;
13//! use redis_queue::manager::{Manager, ManagerConfig, ConsumerKind, RunParams, manage};
14//! use redis_queue::manager::dispatch::{Dispatch, TaskResult, TaskResultKind};
15//!
16//! use core::future::Future;
17//! use core::pin::Pin;
18//! use core::time;
19//!
20//! use tokio::sync::oneshot;
21//!
//! ///This is the dispatcher that processes your tasks
23//! struct TaskDispatcher {
24//! }
25//!
26//! impl Dispatch for TaskDispatcher {
27//! type PayloadType = String;
28//! type Future = Pin<Box<dyn Future<Output = TaskResult<String>> + Send + Sync + 'static>>;
29//! fn send(&self, entry: Entry<Self::PayloadType>) -> Self::Future {
30//! //TaskResultKind enum determines how queue manage loop will handle result
31//! //For now only `TempFail` has special handling to retry task,
32//! //while other variants will just result in log entry
33//! todo!();
34//! }
35//! }
36//!
37//! async fn example() {
38//! //Limit on concurrent task number
39//! const MAX_CONCURRENT_TASK: usize = 1000;
40//! //User of your queue, will also connect to the same redis instance and use the same stream name.
41//! let config = QueueConfig {
42//! stream: "stream_name".into()
43//! };
44//! let client = redis::Client::open("redis://127.0.0.1/").expect("to create redis client");
45//! let conn = client.get_tokio_connection_manager().await.expect("to get connection");
46//! let queue = Queue::new(config, conn);
47//!
48//! let config = ManagerConfig {
49//! //Group is shared by all consumers
50//! group: "group".into(),
51//! //Use Single if you only need 1 manager.
52//! //Use 1 Main and Extra when deploying multiple.
53//! //Only Main/Single trims queue
54//! kind: ConsumerKind::Single,
55//! //This is unique consumer name.
56//! //Every instance of manager should have own name to avoid clashes
57//! consumer: "consumer-name".into(),
58//! //This is maximum time manager is allowed to block waiting for new messages in queue
59//! poll_time: time::Duration::from_secs(10),
//! //This is the maximum time a temporarily failed task will remain in the queue.
//! //Note that a task may remain longer if, due to concurrency starvation,
//! //it cannot complete at least max_pending_time / poll_time retries
//! max_pending_time: time::Duration::from_secs(60),
64//! };
65//! let manager = Manager::new(queue, config).await.expect("to create manager");
66//!
67//! let (shutdown, shutdown_recv) = oneshot::channel();
68//! let params = RunParams {
69//! manager,
70//! shutdown_recv,
71//! max_task_count: MAX_CONCURRENT_TASK,
72//! dispatcher: TaskDispatcher {
73//! }
74//! };
75//! let handle = tokio::spawn(manage(params));
76//! //Do whatever you want (like wait some signal to shutdown yourself)
77//!
78//! //then if you want to shutdown gracefully:
79//! shutdown.send(()).expect("manager lives");
80//! handle.await.expect("finish successfully");
81//! }
82//! ```
83
84use core::{fmt, time, cmp};
85use core::future::Future;
86use std::borrow::Cow;
87
88use redis::{RedisError, FromRedisValue};
89
90use crate::Queue;
91use crate::iters::{PendingIter, FetchIter};
92use crate::types::{StreamId, Range, RangeIdx, FetchParams, PendingParams, FetchType, Entry};
93
94pub mod dispatch;
95mod run;
96pub use run::{RunParams, manage};
97mod utils;
98pub use utils::*;
99
100#[derive(Debug)]
101///Possible error creating manager
102pub enum ConfigError {
103 ///Queue group name is not specified
104 QueueGroupNameMissing,
105 ///Queue group name is invalid
106 QueueGroupNameInvalid,
107 ///Queue manager name is not specified
108 QueueManagerNameMissing,
109 ///Queue manager name is invalid
110 QueueManagerNameInvalid,
111 ///Queue manager poll time is invalid
112 QueueManagerPollTimeInvalid,
113 ///Queue manager max pending time is invalid
114 QueueManagerMaxPendingTimeInvalid,
115 ///Redis error happened
116 Redis(RedisError),
117}
118
119impl fmt::Display for ConfigError {
120 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
121 match self {
122 Self::QueueGroupNameMissing => fmt.write_str("Queue group name is empty"),
123 Self::QueueGroupNameInvalid => fmt.write_str("Queue group name is not valid ASCII string."),
124 Self::QueueManagerNameMissing => fmt.write_str("Queue manager name is empty."),
125 Self::QueueManagerNameInvalid => fmt.write_str("Queue manager name is not valid ASCII string."),
126 Self::QueueManagerPollTimeInvalid => fmt.write_str("Queue manager poll time is not valid positive integer."),
127 Self::QueueManagerMaxPendingTimeInvalid => fmt.write_str("Queue manager max pending time is not valid positive integer."),
128 Self::Redis(error) => fmt.write_fmt(format_args!("Redis error: {error}")),
129 }
130 }
131}
132
133impl From<RedisError> for ConfigError {
134 #[inline(always)]
135 fn from(value: RedisError) -> Self {
136 Self::Redis(value)
137 }
138}
139
140impl std::error::Error for ConfigError {
141 #[inline]
142 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
143 match self {
144 Self::Redis(error) => Some(error),
145 _ => None,
146 }
147 }
148}
149
///Role of a consumer among manager instances.
///
///Can be derived from the name of the consumer.
///
///When you want to scale up manager instances, name them using scheme `<name>-<pod idx>`
///and call `ConsumerKind::determine` to infer the type of consumer.
///
///Otherwise introduce your own algorithm for selecting the main node.
#[derive(Clone, Copy, PartialEq, Eq)]
pub enum ConsumerKind {
    ///Single instance configuration.
    ///
    ///This is equivalent to `Main`.
    Single,
    ///Main instance, i.e. the one configured with pod index 0.
    ///
    ///This instance is responsible for maintenance of the queue in addition to serving tasks
    ///like `Extra`.
    Main,
    ///Additional instance added for scaling purposes.
    ///
    ///This instance is only responsible for processing the queue.
    Extra,
}

impl ConsumerKind {
    ///Infers consumer type from its name.
    ///
    ///Only the text following the last `-` is inspected (the whole name when it has no `-`):
    ///
    ///- it parses as `u32` equal to `0` => `Main`;
    ///- it parses as any other `u32` => `Extra`;
    ///- it is not a valid `u32` => `Single`.
    #[inline]
    pub fn determine(name: &str) -> Self {
        //Reverse split always yields at least one item, hence fallback is never actually used.
        let suffix = name.rsplit('-').next().unwrap_or(name);

        match suffix.parse::<u32>() {
            Ok(0) => Self::Main,
            Ok(_) => Self::Extra,
            Err(_) => Self::Single,
        }
    }
}
187
188#[derive(Clone)]
189///Manager configuration.
190pub struct ManagerConfig {
191 ///Group name
192 ///
193 ///All tasks will belong to this group
194 pub group: Cow<'static, str>,
195 ///Consumer name
196 ///
197 ///All tasks managed will be claimed by this consumer
198 pub consumer: Cow<'static, str>,
199 ///Consumer kind
200 pub kind: ConsumerKind,
201 ///Blocking time while awaiting for new messages.
202 pub poll_time: time::Duration,
203 ///Duration for which tasks are allowed to remain in queue in order to retry it later.
204 ///
205 ///Once tasks expires over this time, it shall be deleted from queue.
206 pub max_pending_time: time::Duration,
207}
208
209#[derive(Clone)]
210///Task Queue manager.
211pub struct Manager {
212 queue: Queue,
213 config: ManagerConfig,
214}
215
216impl Manager {
217 ///Creates new manager from configuration.
218 pub async fn new(queue: Queue, config: ManagerConfig) -> Result<Self, ConfigError> {
219 if config.poll_time.is_zero() {
220 Err(ConfigError::QueueManagerPollTimeInvalid)
221 } else if config.max_pending_time.is_zero() {
222 Err(ConfigError::QueueManagerMaxPendingTimeInvalid)
223 } else if config.group.as_ref().is_empty() {
224 Err(ConfigError::QueueGroupNameMissing)
225 } else if !config.group.as_ref().is_ascii() {
226 Err(ConfigError::QueueGroupNameInvalid)
227 } else if config.consumer.as_ref().is_empty() {
228 Err(ConfigError::QueueManagerNameMissing)
229 } else if !config.consumer.as_ref().is_ascii() {
230 Err(ConfigError::QueueManagerNameInvalid)
231 } else {
232 //Create group on start to make sure Redis config is valid for use.
233 queue.create_group(&config.group).await?;
234
235 Ok(Self {
236 queue,
237 config
238 })
239 }
240 }
241
242 #[inline(always)]
243 ///Access underlying queue
244 pub fn queue(&self) -> &Queue {
245 &self.queue
246 }
247
248 #[inline(always)]
249 ///Access manager's configuration
250 pub fn config(&self) -> &ManagerConfig {
251 &self.config
252 }
253
254 ///Returns number of re-tries current configuration should allow.
255 ///
256 ///Generally it is just `min(self.config.max_pending_time / self.config.poll_time, 1)`
257 pub fn max_pending_retry_count(&self) -> u64 {
258 if self.config.max_pending_time > self.config.poll_time {
259 let result =
260 self.config.max_pending_time.as_secs_f64() / self.config.poll_time.as_secs_f64();
261 //We always have minimum of 1 retry
262 cmp::min(result.round() as u64, 1)
263 } else {
264 //This is generally invalid configuration, but just for the sake being safe
265 1
266 }
267 }
268
269 ///Creates iterator of pending entries in queue.
270 ///
271 ///`last_id` can be used to specify from where to continue for iteration purpose.
272 pub fn pending_tasks(&self, count: usize, last_id: Option<StreamId>) -> PendingIter<'_> {
273 let range = Range {
274 start: match last_id {
275 Some(last_id) => RangeIdx::ExcludeId(last_id),
276 None => RangeIdx::Any,
277 },
278 end: RangeIdx::Any,
279 };
280 let params = PendingParams {
281 group: self.config.group.as_ref(),
282 consumer: Some(self.config.consumer.as_ref()),
283 range,
284 idle: None,
285 count,
286 };
287
288 PendingIter::new(params, self.queue.clone())
289 }
290
291 ///Creates iterator of expired entries in queue
292 ///
293 ///`last_id` can be used to specify from where to continue for iteration purpose.
294 pub fn expired_pending_tasks(&self, count: usize, last_id: Option<StreamId>) -> PendingIter<'_> {
295 let range = Range {
296 start: match last_id {
297 Some(last_id) => RangeIdx::ExcludeId(last_id),
298 None => RangeIdx::Any,
299 },
300 end: RangeIdx::Any,
301 };
302 let params = PendingParams {
303 group: self.config.group.as_ref(),
304 consumer: Some(self.config.consumer.as_ref()),
305 range,
306 idle: Some(self.config.max_pending_time),
307 count,
308 };
309
310 PendingIter::new(params, self.queue.clone())
311 }
312
313 ///Creates iterator over new tasks within queue
314 pub fn fetch_new_tasks(&self, count: usize) -> FetchIter {
315 let params = FetchParams {
316 group: self.config.group.as_ref(),
317 consumer: self.config.consumer.as_ref(),
318 typ: FetchType::New,
319 count,
320 timeout: Some(self.config.poll_time),
321 };
322
323 FetchIter::new(params, self.queue.clone())
324 }
325
326 ///Retrieves task entry by `id`
327 ///
328 ///## Implementation
329 ///
330 ///Due to Redis not providing any method to get message by id, we have to emulate it by doing
331 ///query for message after `id - 1` to fetch message by `id`.
332 ///
333 ///If message is no longer exist, we return `None`.
334 ///
335 ///Note that when reading pending message data, there is no timeout possible
336 ///If there is no message, it will return `None`
337 pub async fn get_pending_by_id<T: FromRedisValue>(&self, id: StreamId) -> Result<Option<Entry<T>>, RedisError> {
338 let mut iter = self.fetch_new_tasks(1);
339 iter.set_cursor(FetchType::After(id.prev()));
340 let mut result = iter.next_entries().await?;
341 if let Some(item) = result.pop() {
342 if item.id != id {
343 Ok(None)
344 } else {
345 Ok(Some(item))
346 }
347 } else {
348 Ok(None)
349 }
350 }
351
352 #[inline]
353 ///Consumes tasks by specified IDs.
354 ///
355 ///If error is returned, `tasks` modified with cleaned IDs removed.
356 pub async fn consume_tasks(&self, tasks: &[StreamId]) -> Result<usize, RedisError> {
357 self.queue.consume(&self.config.group, tasks).await
358 }
359
360 #[inline(always)]
361 ///Performs queue trimming removing all tasks that are consumed
362 fn trim_queue(&self, retry_num: u32) -> impl Future<Output = ()> + Send + 'static {
363 //This gives this future priority by disable scheduler within tokio.
364 //
365 //We should finish it as soon as possible and only call on demand
366 tokio::task::unconstrained(trim_queue_task(
367 self.queue().clone(),
368 self.config().kind,
369 retry_num,
370 ))
371 }
372}