redis_queue/manager/
mod.rs

1//! Queue processing and management
2//!
3//! ## Usage
4//!
5//! This module provides utilities to manage and process queue tasks
6//!
7//! Example below provides brief explanation on how to use it
8//!
9//! ```rust,no_run
10//! use redis_queue::redis;
11//! use redis_queue::{Queue, QueueConfig};
12//! use redis_queue::types::Entry;
13//! use redis_queue::manager::{Manager, ManagerConfig, ConsumerKind, RunParams, manage};
14//! use redis_queue::manager::dispatch::{Dispatch, TaskResult, TaskResultKind};
15//!
16//! use core::future::Future;
17//! use core::pin::Pin;
18//! use core::time;
19//!
20//! use tokio::sync::oneshot;
21//!
22//! ///This is dispatcher to process your tasks
23//! struct TaskDispatcher {
24//! }
25//!
26//! impl Dispatch for TaskDispatcher {
27//!     type PayloadType = String;
28//!     type Future = Pin<Box<dyn Future<Output = TaskResult<String>> + Send + Sync + 'static>>;
29//!     fn send(&self, entry: Entry<Self::PayloadType>) -> Self::Future {
30//!         //TaskResultKind enum determines how queue manage loop will handle result
31//!         //For now only `TempFail` has special handling to retry task,
32//!         //while other variants will just result in log entry
33//!         todo!();
34//!     }
35//! }
36//!
37//! async fn example() {
38//!     //Limit on concurrent task number
39//!     const MAX_CONCURRENT_TASK: usize = 1000;
40//!     //User of your queue, will also connect to the same redis instance and use the same stream name.
41//!     let config = QueueConfig {
42//!         stream: "stream_name".into()
43//!     };
44//!     let client = redis::Client::open("redis://127.0.0.1/").expect("to create redis client");
45//!     let conn = client.get_tokio_connection_manager().await.expect("to get connection");
46//!     let queue = Queue::new(config, conn);
47//!
48//!     let config = ManagerConfig {
49//!         //Group is shared by all consumers
50//!         group: "group".into(),
51//!         //Use Single if you only need 1 manager.
52//!         //Use 1 Main and Extra when deploying multiple.
53//!         //Only Main/Single trims queue
54//!         kind: ConsumerKind::Single,
55//!         //This is unique consumer name.
56//!         //Every instance of manager should have own name to avoid clashes
57//!         consumer: "consumer-name".into(),
58//!         //This is maximum time manager is allowed to block waiting for new messages in queue
59//!         poll_time: time::Duration::from_secs(10),
60//!         //This is maximum time task that temporary failed will remain in queue.
61//!         //Note that it will remain longer if due to concurrency starvation
62//!         //it cannot complete at least max_pending_time / poll_time retries
63//!         max_pending_time: time::Duration::from_secs(60),
64//!     };
65//!     let manager = Manager::new(queue, config).await.expect("to create manager");
66//!
67//!     let (shutdown, shutdown_recv) = oneshot::channel();
68//!     let params = RunParams {
69//!         manager,
70//!         shutdown_recv,
71//!         max_task_count: MAX_CONCURRENT_TASK,
72//!         dispatcher: TaskDispatcher {
73//!         }
74//!     };
75//!     let handle = tokio::spawn(manage(params));
76//!     //Do whatever you want (like wait some signal to shutdown yourself)
77//!
78//!     //then if you want to shutdown gracefully:
79//!     shutdown.send(()).expect("manager lives");
80//!     handle.await.expect("finish successfully");
81//! }
82//! ```
83
84use core::{fmt, time, cmp};
85use core::future::Future;
86use std::borrow::Cow;
87
88use redis::{RedisError, FromRedisValue};
89
90use crate::Queue;
91use crate::iters::{PendingIter, FetchIter};
92use crate::types::{StreamId, Range, RangeIdx, FetchParams, PendingParams, FetchType, Entry};
93
94pub mod dispatch;
95mod run;
96pub use run::{RunParams, manage};
97mod utils;
98pub use utils::*;
99
100#[derive(Debug)]
101///Possible error creating manager
102pub enum ConfigError {
103    ///Queue group name is not specified
104    QueueGroupNameMissing,
105    ///Queue group name is invalid
106    QueueGroupNameInvalid,
107    ///Queue manager name is not specified
108    QueueManagerNameMissing,
109    ///Queue manager name is invalid
110    QueueManagerNameInvalid,
111    ///Queue manager poll time is invalid
112    QueueManagerPollTimeInvalid,
113    ///Queue manager max pending time is invalid
114    QueueManagerMaxPendingTimeInvalid,
115    ///Redis error happened
116    Redis(RedisError),
117}
118
119impl fmt::Display for ConfigError {
120    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
121        match self {
122            Self::QueueGroupNameMissing => fmt.write_str("Queue group name is empty"),
123            Self::QueueGroupNameInvalid => fmt.write_str("Queue group name is not valid ASCII string."),
124            Self::QueueManagerNameMissing => fmt.write_str("Queue manager name is empty."),
125            Self::QueueManagerNameInvalid => fmt.write_str("Queue manager name is not valid ASCII string."),
126            Self::QueueManagerPollTimeInvalid => fmt.write_str("Queue manager poll time is not valid positive integer."),
127            Self::QueueManagerMaxPendingTimeInvalid => fmt.write_str("Queue manager max pending time is not valid positive integer."),
128            Self::Redis(error) => fmt.write_fmt(format_args!("Redis error: {error}")),
129        }
130    }
131}
132
133impl From<RedisError> for ConfigError {
134    #[inline(always)]
135    fn from(value: RedisError) -> Self {
136        Self::Redis(value)
137    }
138}
139
140impl std::error::Error for ConfigError {
141    #[inline]
142    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
143        match self {
144            Self::Redis(error) => Some(error),
145            _ => None,
146        }
147    }
148}
149
150#[derive(Clone, Copy, PartialEq, Eq)]
151///Describes type of consumer configured.
152///
153///Derived from name of consumer.
154///
155///When you want to scale up manager instances you should use naming scheme `<name>-<pod idx>`.
156///Then just use `ConsumerKind::determine` to infer type of consumer.
157///
158///Otherwise introduce own algorithm to master node selection
159pub enum ConsumerKind {
160    ///Single instance configuration
161    ///
162    ///This is equivalent to `Main`
163    Single,
164    ///Main instance which is configured with pod index 0.
165    ///
166    ///This instance is responsible for maintenance of queue in additional to serving tasks as
167    ///`Extra`
168    Main,
169    ///Additional instance added for scaling purposes.
170    ///
171    ///This instance is only responsible for processing queue.
172    Extra,
173}
174
175impl ConsumerKind {
176    #[inline]
177    ///Determines consumer type from its name
178    pub fn determine(name: &str) -> Self {
179        let mut split = name.rsplitn(2, '-');
180        match split.next().and_then(|idx| idx.parse::<u32>().ok()) {
181            Some(0) => Self::Main,
182            Some(_) => Self::Extra,
183            None => Self::Single,
184        }
185    }
186}
187
188#[derive(Clone)]
189///Manager configuration.
190pub struct ManagerConfig {
191    ///Group name
192    ///
193    ///All tasks will belong to this group
194    pub group: Cow<'static, str>,
195    ///Consumer name
196    ///
197    ///All tasks managed will be claimed by this consumer
198    pub consumer: Cow<'static, str>,
199    ///Consumer kind
200    pub kind: ConsumerKind,
201    ///Blocking time while awaiting for new messages.
202    pub poll_time: time::Duration,
203    ///Duration for which tasks are allowed to remain in queue in order to retry it later.
204    ///
205    ///Once tasks expires over this time, it shall be deleted from queue.
206    pub max_pending_time: time::Duration,
207}
208
209#[derive(Clone)]
210///Task Queue manager.
211pub struct Manager {
212    queue: Queue,
213    config: ManagerConfig,
214}
215
216impl Manager {
217    ///Creates new manager from configuration.
218    pub async fn new(queue: Queue, config: ManagerConfig) -> Result<Self, ConfigError> {
219        if config.poll_time.is_zero() {
220            Err(ConfigError::QueueManagerPollTimeInvalid)
221        } else if config.max_pending_time.is_zero() {
222            Err(ConfigError::QueueManagerMaxPendingTimeInvalid)
223        } else if config.group.as_ref().is_empty() {
224            Err(ConfigError::QueueGroupNameMissing)
225        } else if !config.group.as_ref().is_ascii() {
226            Err(ConfigError::QueueGroupNameInvalid)
227        } else if config.consumer.as_ref().is_empty() {
228            Err(ConfigError::QueueManagerNameMissing)
229        } else if !config.consumer.as_ref().is_ascii() {
230            Err(ConfigError::QueueManagerNameInvalid)
231        } else {
232            //Create group on start to make sure Redis config is valid for use.
233            queue.create_group(&config.group).await?;
234
235            Ok(Self {
236                queue,
237                config
238            })
239        }
240    }
241
242    #[inline(always)]
243    ///Access underlying queue
244    pub fn queue(&self) -> &Queue {
245        &self.queue
246    }
247
248    #[inline(always)]
249    ///Access manager's configuration
250    pub fn config(&self) -> &ManagerConfig {
251        &self.config
252    }
253
254    ///Returns number of re-tries current configuration should allow.
255    ///
256    ///Generally it is just `min(self.config.max_pending_time / self.config.poll_time, 1)`
257    pub fn max_pending_retry_count(&self) -> u64 {
258        if self.config.max_pending_time > self.config.poll_time {
259            let result =
260                self.config.max_pending_time.as_secs_f64() / self.config.poll_time.as_secs_f64();
261            //We always have minimum of 1 retry
262            cmp::min(result.round() as u64, 1)
263        } else {
264            //This is generally invalid configuration, but just for the sake being safe
265            1
266        }
267    }
268
269    ///Creates iterator of pending entries in queue.
270    ///
271    ///`last_id` can be used to specify from where to continue for iteration purpose.
272    pub fn pending_tasks(&self, count: usize, last_id: Option<StreamId>) -> PendingIter<'_> {
273        let range = Range {
274            start: match last_id {
275                Some(last_id) => RangeIdx::ExcludeId(last_id),
276                None => RangeIdx::Any,
277            },
278            end: RangeIdx::Any,
279        };
280        let params = PendingParams {
281            group: self.config.group.as_ref(),
282            consumer: Some(self.config.consumer.as_ref()),
283            range,
284            idle: None,
285            count,
286        };
287
288        PendingIter::new(params, self.queue.clone())
289    }
290
291    ///Creates iterator of expired entries in queue
292    ///
293    ///`last_id` can be used to specify from where to continue for iteration purpose.
294    pub fn expired_pending_tasks(&self, count: usize, last_id: Option<StreamId>) -> PendingIter<'_> {
295        let range = Range {
296            start: match last_id {
297                Some(last_id) => RangeIdx::ExcludeId(last_id),
298                None => RangeIdx::Any,
299            },
300            end: RangeIdx::Any,
301        };
302        let params = PendingParams {
303            group: self.config.group.as_ref(),
304            consumer: Some(self.config.consumer.as_ref()),
305            range,
306            idle: Some(self.config.max_pending_time),
307            count,
308        };
309
310        PendingIter::new(params, self.queue.clone())
311    }
312
313    ///Creates iterator over new tasks within queue
314    pub fn fetch_new_tasks(&self, count: usize) -> FetchIter {
315        let params = FetchParams {
316            group: self.config.group.as_ref(),
317            consumer: self.config.consumer.as_ref(),
318            typ: FetchType::New,
319            count,
320            timeout: Some(self.config.poll_time),
321        };
322
323        FetchIter::new(params, self.queue.clone())
324    }
325
326    ///Retrieves task entry by `id`
327    ///
328    ///## Implementation
329    ///
330    ///Due to Redis not providing any method to get message by id, we have to emulate it by doing
331    ///query for message after `id - 1` to fetch message by `id`.
332    ///
333    ///If message is no longer exist, we return `None`.
334    ///
335    ///Note that when reading pending message data, there is no timeout possible
336    ///If there is no message, it will return `None`
337    pub async fn get_pending_by_id<T: FromRedisValue>(&self, id: StreamId) -> Result<Option<Entry<T>>, RedisError> {
338        let mut iter = self.fetch_new_tasks(1);
339        iter.set_cursor(FetchType::After(id.prev()));
340        let mut result = iter.next_entries().await?;
341        if let Some(item) = result.pop() {
342            if item.id != id {
343                Ok(None)
344            } else {
345                Ok(Some(item))
346            }
347        } else {
348            Ok(None)
349        }
350    }
351
352    #[inline]
353    ///Consumes tasks by specified IDs.
354    ///
355    ///If error is returned, `tasks` modified with cleaned IDs removed.
356    pub async fn consume_tasks(&self, tasks: &[StreamId]) -> Result<usize, RedisError> {
357        self.queue.consume(&self.config.group, tasks).await
358    }
359
360    #[inline(always)]
361    ///Performs queue trimming removing all tasks that are consumed
362    fn trim_queue(&self, retry_num: u32) -> impl Future<Output = ()> + Send + 'static {
363        //This gives this future priority by disable scheduler within tokio.
364        //
365        //We should finish it as soon as possible and only call on demand
366        tokio::task::unconstrained(trim_queue_task(
367            self.queue().clone(),
368            self.config().kind,
369            retry_num,
370        ))
371    }
372}