//! Queue processing and management
//!
//! ## Usage
//!
//! This module provides utilities to manage and process queue tasks.
//!
//! The example below gives a brief overview of how to use it.
//!
//! ```rust,no_run
//! use redis_queue::redis;
//! use redis_queue::{Queue, QueueConfig};
//! use redis_queue::types::Entry;
//! use redis_queue::manager::{Manager, ManagerConfig, ConsumerKind, RunParams, manage};
//! use redis_queue::manager::dispatch::{Dispatch, TaskResult, TaskResultKind};
//!
//! use core::future::Future;
//! use core::pin::Pin;
//! use core::time;
//!
//! use tokio::sync::oneshot;
//!
//! ///This is the dispatcher that processes your tasks
//! struct TaskDispatcher {
//! }
//!
//! impl Dispatch for TaskDispatcher {
//!     type PayloadType = String;
//!     type Future = Pin<Box<dyn Future<Output = TaskResult<String>> + Send + Sync + 'static>>;
//!     fn send(&self, entry: Entry<Self::PayloadType>) -> Self::Future {
//!         //The TaskResultKind enum determines how the queue management loop will handle the result.
//!         //Currently only `TempFail` has special handling (the task is retried),
//!         //while other variants just result in a log entry.
//!         todo!();
//!     }
//! }
//!
//! async fn example() {
//!     //Limit on the number of concurrent tasks
//!     const MAX_CONCURRENT_TASK: usize = 1000;
//!     //Users of your queue must connect to the same Redis instance and use the same stream name.
//!     let config = QueueConfig {
//!         stream: "stream_name".into()
//!     };
//!     let client = redis::Client::open("redis://127.0.0.1/").expect("to create redis client");
//!     let conn = client.get_tokio_connection_manager().await.expect("to get connection");
//!     let queue = Queue::new(config, conn);
//!
//!     let config = ManagerConfig {
//!         //Group is shared by all consumers
//!         group: "group".into(),
//!         //Use Single if you only need one manager.
//!         //Use one Main and the rest as Extra when deploying multiple.
//!         //Only Main/Single trims the queue.
//!         kind: ConsumerKind::Single,
//!         //This is the unique consumer name.
//!         //Every manager instance should have its own name to avoid clashes.
//!         consumer: "consumer-name".into(),
//!         //This is the maximum time the manager is allowed to block while waiting for new messages in the queue.
//!         poll_time: time::Duration::from_secs(10),
//!         //This is the maximum time a task that temporarily failed will remain in the queue.
//!         //Note that it will remain longer if, due to concurrency starvation,
//!         //it cannot complete at least max_pending_time / poll_time retries.
//!         max_pending_time: time::Duration::from_secs(60),
//!     };
//!     let manager = Manager::new(queue, config).await.expect("to create manager");
//!
//!     let (shutdown, shutdown_recv) = oneshot::channel();
//!     let params = RunParams {
//!         manager,
//!         shutdown_recv,
//!         max_task_count: MAX_CONCURRENT_TASK,
//!         dispatcher: TaskDispatcher {
//!         }
//!     };
//!     let handle = tokio::spawn(manage(params));
//!     //Do whatever you want (e.g. wait for a signal to shut down)
//!
//!     //Then, if you want to shut down gracefully:
//!     shutdown.send(()).expect("manager lives");
//!     handle.await.expect("finish successfully");
//! }
//! ```

use core::{fmt, time, cmp};
use core::future::Future;
use std::borrow::Cow;

use redis::{RedisError, FromRedisValue};

use crate::Queue;
use crate::iters::{PendingIter, FetchIter};
use crate::types::{StreamId, Range, RangeIdx, FetchParams, PendingParams, FetchType, Entry};

pub mod dispatch;
mod run;
pub use run::{RunParams, manage};
mod utils;
pub use utils::*;

#[derive(Debug)]
///Possible error creating manager
pub enum ConfigError {
    ///Queue group name is not specified
    QueueGroupNameMissing,
    ///Queue group name is invalid
    QueueGroupNameInvalid,
    ///Queue manager name is not specified
    QueueManagerNameMissing,
    ///Queue manager name is invalid
    QueueManagerNameInvalid,
    ///Queue manager poll time is invalid
    QueueManagerPollTimeInvalid,
    ///Queue manager max pending time is invalid
    QueueManagerMaxPendingTimeInvalid,
    ///Redis error happened
    Redis(RedisError),
}

impl fmt::Display for ConfigError {
    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::QueueGroupNameMissing => fmt.write_str("Queue group name is empty"),
            Self::QueueGroupNameInvalid => fmt.write_str("Queue group name is not valid ASCII string."),
            Self::QueueManagerNameMissing => fmt.write_str("Queue manager name is empty."),
            Self::QueueManagerNameInvalid => fmt.write_str("Queue manager name is not valid ASCII string."),
            Self::QueueManagerPollTimeInvalid => fmt.write_str("Queue manager poll time is not valid positive integer."),
            Self::QueueManagerMaxPendingTimeInvalid => fmt.write_str("Queue manager max pending time is not valid positive integer."),
            Self::Redis(error) => fmt.write_fmt(format_args!("Redis error: {error}")),
        }
    }
}

impl From<RedisError> for ConfigError {
    #[inline(always)]
    fn from(value: RedisError) -> Self {
        Self::Redis(value)
    }
}

impl std::error::Error for ConfigError {
    #[inline]
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        match self {
            Self::Redis(error) => Some(error),
            _ => None,
        }
    }
}

#[derive(Clone, Copy, PartialEq, Eq)]
///Describes type of consumer configured.
///
///Derived from name of consumer.
///
///When you want to scale up manager instances, you should use the naming scheme `<name>-<pod idx>`.
///Then just use `ConsumerKind::determine` to infer the type of consumer.
///
///Otherwise, introduce your own algorithm for master node selection.
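///
///A minimal sketch of this mapping (the `worker-*` names below are just illustrative):
///
///```rust
///use redis_queue::manager::ConsumerKind;
///
///assert!(ConsumerKind::determine("worker-0") == ConsumerKind::Main);
///assert!(ConsumerKind::determine("worker-3") == ConsumerKind::Extra);
///assert!(ConsumerKind::determine("worker") == ConsumerKind::Single);
///```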
pub enum ConsumerKind {
    ///Single instance configuration
    ///
    ///This is equivalent to `Main`
    Single,
    ///Main instance which is configured with pod index 0.
    ///
    ///This instance is responsible for maintenance of the queue, in addition to serving tasks as an
    ///`Extra`
    Main,
    ///Additional instance added for scaling purposes.
    ///
    ///This instance is only responsible for processing queue.
    Extra,
}

impl ConsumerKind {
    #[inline]
    ///Determines consumer type from its name
    pub fn determine(name: &str) -> Self {
        let mut split = name.rsplitn(2, '-');
        match split.next().and_then(|idx| idx.parse::<u32>().ok()) {
            Some(0) => Self::Main,
            Some(_) => Self::Extra,
            None => Self::Single,
        }
    }
}

#[derive(Clone)]
///Manager configuration.
pub struct ManagerConfig {
    ///Group name
    ///
    ///All tasks will belong to this group
    pub group: Cow<'static, str>,
    ///Consumer name
    ///
    ///All tasks managed will be claimed by this consumer
    pub consumer: Cow<'static, str>,
    ///Consumer kind
    pub kind: ConsumerKind,
    ///Blocking time while waiting for new messages.
    pub poll_time: time::Duration,
    ///Duration for which a task is allowed to remain in the queue so that it can be retried later.
    ///
    ///Once a task has been pending longer than this, it shall be deleted from the queue.
    pub max_pending_time: time::Duration,
}

#[derive(Clone)]
///Task Queue manager.
pub struct Manager {
    queue: Queue,
    config: ManagerConfig,
}

impl Manager {
    ///Creates new manager from configuration.
    pub async fn new(queue: Queue, config: ManagerConfig) -> Result<Self, ConfigError> {
        if config.poll_time.is_zero() {
            Err(ConfigError::QueueManagerPollTimeInvalid)
        } else if config.max_pending_time.is_zero() {
            Err(ConfigError::QueueManagerMaxPendingTimeInvalid)
        } else if config.group.as_ref().is_empty() {
            Err(ConfigError::QueueGroupNameMissing)
        } else if !config.group.as_ref().is_ascii() {
            Err(ConfigError::QueueGroupNameInvalid)
        } else if config.consumer.as_ref().is_empty() {
            Err(ConfigError::QueueManagerNameMissing)
        } else if !config.consumer.as_ref().is_ascii() {
            Err(ConfigError::QueueManagerNameInvalid)
        } else {
            //Create group on start to make sure Redis config is valid for use.
            queue.create_group(&config.group).await?;

            Ok(Self {
                queue,
                config
            })
        }
    }

    #[inline(always)]
    ///Access underlying queue
    pub fn queue(&self) -> &Queue {
        &self.queue
    }

    #[inline(always)]
    ///Access manager's configuration
    pub fn config(&self) -> &ManagerConfig {
        &self.config
    }

    ///Returns the number of retries the current configuration should allow.
    ///
    ///Generally it is just `max(self.config.max_pending_time / self.config.poll_time, 1)`
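    ///
    ///For example (illustrative arithmetic only): with `poll_time = 10s` and
    ///`max_pending_time = 60s` this returns `max(60 / 10, 1) = 6` retries.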
    pub fn max_pending_retry_count(&self) -> u64 {
        if self.config.max_pending_time > self.config.poll_time {
            let result =
                self.config.max_pending_time.as_secs_f64() / self.config.poll_time.as_secs_f64();
            //We always have minimum of 1 retry
            cmp::max(result.round() as u64, 1)
        } else {
            //This is generally an invalid configuration, but return 1 just for the sake of being safe
            1
        }
    }

    ///Creates iterator of pending entries in queue.
    ///
    ///`last_id` can be used to specify where to continue iteration from.
    pub fn pending_tasks(&self, count: usize, last_id: Option<StreamId>) -> PendingIter<'_> {
        let range = Range {
            start: match last_id {
                Some(last_id) => RangeIdx::ExcludeId(last_id),
                None => RangeIdx::Any,
            },
            end: RangeIdx::Any,
        };
        let params = PendingParams {
            group: self.config.group.as_ref(),
            consumer: Some(self.config.consumer.as_ref()),
            range,
            idle: None,
            count,
        };

        PendingIter::new(params, self.queue.clone())
    }

    ///Creates iterator of expired entries in queue
    ///
    ///`last_id` can be used to specify where to continue iteration from.
    pub fn expired_pending_tasks(&self, count: usize, last_id: Option<StreamId>) -> PendingIter<'_> {
        let range = Range {
            start: match last_id {
                Some(last_id) => RangeIdx::ExcludeId(last_id),
                None => RangeIdx::Any,
            },
            end: RangeIdx::Any,
        };
        let params = PendingParams {
            group: self.config.group.as_ref(),
            consumer: Some(self.config.consumer.as_ref()),
            range,
            idle: Some(self.config.max_pending_time),
            count,
        };

        PendingIter::new(params, self.queue.clone())
    }

    ///Creates iterator over new tasks within queue
    pub fn fetch_new_tasks(&self, count: usize) -> FetchIter {
        let params = FetchParams {
            group: self.config.group.as_ref(),
            consumer: self.config.consumer.as_ref(),
            typ: FetchType::New,
            count,
            timeout: Some(self.config.poll_time),
        };

        FetchIter::new(params, self.queue.clone())
    }

    ///Retrieves task entry by `id`
    ///
    ///## Implementation
    ///
    ///Since Redis does not provide a method to get a message by id, we have to emulate it by
    ///querying for the message after `id - 1` in order to fetch the message with `id`.
    ///
    ///If the message no longer exists, we return `None`.
    ///
    ///Note that when reading pending message data, no timeout is possible.
    ///If there is no message, it will return `None`.
    pub async fn get_pending_by_id<T: FromRedisValue>(&self, id: StreamId) -> Result<Option<Entry<T>>, RedisError> {
        let mut iter = self.fetch_new_tasks(1);
        iter.set_cursor(FetchType::After(id.prev()));
        let mut result = iter.next_entries().await?;
        if let Some(item) = result.pop() {
            if item.id != id {
                Ok(None)
            } else {
                Ok(Some(item))
            }
        } else {
            Ok(None)
        }
    }

    #[inline]
    ///Consumes tasks with the specified IDs.
    ///
    ///If an error is returned, `tasks` is modified with the cleaned IDs removed.
    pub async fn consume_tasks(&self, tasks: &[StreamId]) -> Result<usize, RedisError> {
        self.queue.consume(&self.config.group, tasks).await
    }

    #[inline(always)]
    ///Performs queue trimming, removing all tasks that have been consumed
    fn trim_queue(&self, retry_num: u32) -> impl Future<Output = ()> + Send + 'static {
        //This gives this future priority by disabling tokio's cooperative scheduling.
        //
        //We should finish it as soon as possible and only call it on demand
        tokio::task::unconstrained(trim_queue_task(
            self.queue().clone(),
            self.config().kind,
            retry_num,
        ))
    }
}