redis_queue/manager/
run.rs

use crate::manager::Manager;
use crate::manager::dispatch::{Dispatch, TaskResult};

use core::future::Future;
use core::{cmp, time};
use std::time::Instant;

use tokio::sync::oneshot;
use redis::RedisError;

const TIMEOUT_INTERVAL: time::Duration = time::Duration::from_secs(5);

///Scheduler to control the manager loop
struct TimeScheduler {
    timeout_interval: time::Duration,
    timeout_limit: time::Duration,
    on_error_timeout: time::Duration,
}

impl TimeScheduler {
    #[inline(always)]
    ///Creates a new instance with the specified `timeout_limit`
    ///
    ///All timeouts returned by this scheduler are capped at this value
    pub fn new(timeout_limit: time::Duration) -> Self {
        Self {
            timeout_interval: TIMEOUT_INTERVAL,
            on_error_timeout: TIMEOUT_INTERVAL,
            timeout_limit,
        }
    }

    #[inline(always)]
    ///Reports that Redis is working again, resetting the error timeout
    pub fn on_redis_recovery(&mut self) {
        self.on_error_timeout = self.timeout_interval;
    }

    #[inline(always)]
    ///Reports a new Redis error, returning the timeout to sleep for before the next retry, if a retry is possible.
    pub fn next_redis_error(&mut self, error: RedisError) -> Result<time::Duration, RedisError> {
        if error.is_timeout() || error.is_connection_refusal() || error.is_connection_dropped() {
            tracing::info!("Redis temporarily unavailable: {error}");
            //Increase by `timeout_interval` and cap at `timeout_limit`
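            //e.g. with the default 5s TIMEOUT_INTERVAL and a hypothetical 30s limit, successive errors sleep for 10s, 15s, 20s, 25s, 30s, 30s, ...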
            self.on_error_timeout = cmp::min(self.timeout_limit, self.on_error_timeout + self.timeout_interval);
            Ok(self.on_error_timeout)
        } else {
            Err(error)
        }
    }
}

///Parameters for the `manage` function
pub struct RunParams<T> {
    ///Manager
    pub manager: Manager,
    ///Shutdown channel
    pub shutdown_recv: oneshot::Receiver<()>,
    ///Maximum number of new tasks to fetch for execution at once.
    ///
    ///If the queue has more tasks than this number, the loop tries to complete the fetched tasks
    ///first before fetching again.
    ///Once `poll_time` is exceeded, it stops fetching and goes on to the next iteration.
    pub max_task_count: usize,
    ///Dispatcher for incoming messages
    pub dispatcher: T,
}

#[tracing::instrument(skip(params), fields(consumer = params.manager.config().consumer.as_ref()))]
///Starts the main loop using the provided parameters.
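///
///A minimal usage sketch (not taken from this crate's tests), assuming a constructed `Manager`
///and a hypothetical `MyDispatcher` type implementing `Dispatch`:
///
///```ignore
///let (shutdown_send, shutdown_recv) = tokio::sync::oneshot::channel();
///let params = RunParams {
///    manager,
///    shutdown_recv,
///    max_task_count: 100,
///    dispatcher: MyDispatcher::default(),
///};
///let handle = tokio::spawn(manage(params));
/////Request a graceful shutdown and wait for the loop to finish.
///let _ = shutdown_send.send(());
///handle.await.expect("manage loop panicked");
///```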
pub async fn manage<T: Dispatch>(params: RunParams<T>) where T::Future: Future<Output = TaskResult<T::PayloadType>> {
    let RunParams { manager, mut shutdown_recv, max_task_count, dispatcher } = params;

    let mut scheduler = TimeScheduler::new(manager.config().poll_time);

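    //Pending tasks that have been delivered more than this many times are consumed below instead of being retried again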
    let max_retry = manager.max_pending_retry_count();

    let mut expired_tasks = manager.expired_pending_tasks(max_task_count, None);
    let mut fetch_new_tasks = manager.fetch_new_tasks(max_task_count);

    let mut ongoing_tasks = Vec::new();
    let mut completed_tasks = Vec::new();
    let mut consumed_tasks_number = 0usize;

    //Do initial cleanup before starting to process tasks
    manager.trim_queue(10).await;

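    //Each iteration: consume expired pending tasks, re-dispatch tasks that are still pending,
    //fetch and dispatch new tasks within the poll window, then consume completed ones and
    //check for shutdown.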
    'main: loop {
        ///Generates error handling code which uses `TimeScheduler` to decide the next timeout if the Redis
        ///error indicates that a retry is possible.
        ///
        ///If the error cannot be handled, it breaks the 'main loop, exiting this function
        ///
        ///- `error` is the identifier of the variable holding the error
        ///- `ok` is an optional loop label to `continue` after the sleep.
        macro_rules! on_redis_error {
            ($error:ident where OK=$($ok:tt)*) => {
                match scheduler.next_redis_error($error) {
                    Ok(sleep) => {
                        tracing::info!("Retry in {}s", sleep.as_secs());
                        tokio::time::sleep(sleep).await;
                        continue $($ok)*;
                    }
                    Err(error) => {
                        tracing::error!("Redis queue cannot be processed: {error}");
                        //We always exit the loop on a fatal error as it means Redis is not usable
                        break 'main;
                    }
                }
            }
        }

        if let Err(error) = expired_tasks.set_range_until_now().await {
            on_redis_error!(error where OK='main);
        } else {
            scheduler.on_redis_recovery();
        }

        //Consume expired tasks, if any
        'expired_tasks: loop {
            match expired_tasks.next().await {
                Ok(tasks) if tasks.is_empty() => {
                    break 'expired_tasks;
                }
                Ok(tasks) => {
                    //Filter out all tasks that have not yet been tried enough times, for whatever reason.
                    //Generally we should allow this many tries: max_pending_time / poll_time, or at least one attempt
                    let tasks = tasks.iter().filter(|entry| entry.count > max_retry).map(|entry| entry.id).collect::<Vec<_>>();
                    if tasks.is_empty() {
                        break 'expired_tasks;
                    }

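                    //These tasks have exceeded the retry limit, so consume them (remove them from the pending queue) rather than retry again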
                    if let Err(error) = manager.consume_tasks(&tasks).await {
                        on_redis_error!(error where OK=);
                    } else {
                        break 'expired_tasks;
                    }
                }
                Err(error) => on_redis_error!(error where OK=),
            }
        } //'expired_tasks

        scheduler.on_redis_recovery();

        #[allow(clippy::never_loop)]
        //Re-visit failed tasks to see if any of them should be retried
        'failed_tasks: loop {
            let mut pending = manager.pending_tasks(max_task_count, None);
            //IDLE time should be limited to avoid retrying too often
            pending.set_idle(manager.config().poll_time);
            'failed_tasks_end_range: loop {
                if let Err(error) = pending.set_range_until_now().await {
                    on_redis_error!(error where OK='failed_tasks_end_range);
                } else {
                    scheduler.on_redis_recovery();
                    break 'failed_tasks_end_range;
                }
            }

            'failed_tasks_fetch: loop {
                match pending.next().await {
                    Ok(tasks) if tasks.is_empty() => break 'failed_tasks,
                    Ok(tasks) => {
                        for task in tasks {
                            match manager.get_pending_by_id(task.id).await {
                                Ok(Some(task)) => ongoing_tasks.push(dispatcher.send(task)),
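                                //`None` means the task is no longer in the pending list (e.g. it was consumed in the meantime), so there is nothing to re-dispatch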
                                Ok(None) => (),
                                Err(error) => on_redis_error!(error where OK=),
                            }
                        }
                    }
                    Err(error) => on_redis_error!(error where OK='failed_tasks_fetch),
                }
            }
        } //'failed_tasks

        scheduler.on_redis_recovery();

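        //Fetching new tasks is budgeted by `poll_time`: record the start time so the blocking timeout and batch size can shrink as the window is used up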
        let new_tasks_started = Instant::now();
        fetch_new_tasks.set_timeout(manager.config().poll_time);
        fetch_new_tasks.set_count(max_task_count);
        let mut new_tasks_cap = max_task_count;

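        ///Awaits every dispatched task, then consumes the ids of tasks whose result does not require a retry.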
        macro_rules! process_tasks {
            () => {
                for ongoing in ongoing_tasks.drain(..) {
                    let result = ongoing.await;
                    tracing::debug!("task(redis={}, user_id={}): {:?}", result.data.id, result.data.value.id, result.kind);
                    if !result.kind.is_need_retry() {
                        completed_tasks.push(result.data.id);
                    }
                }
                //Clean up all completed tasks
                if !completed_tasks.is_empty() {
                    'completed_tasks: loop {
                        match manager.consume_tasks(&completed_tasks).await {
                            Ok(_) => {
                                tracing::info!("Completed {} tasks", completed_tasks.len());
                                consumed_tasks_number = consumed_tasks_number.saturating_add(completed_tasks.len());
                                completed_tasks.clear();
                                break 'completed_tasks;
                            }
                            Err(error) => on_redis_error!(error where OK='completed_tasks),
                        }
                    }
                    scheduler.on_redis_recovery();
                }
            };
        }

        //Fetch all new tasks available
        'new_tasks: loop {
            match fetch_new_tasks.next_entries().await {
                Ok(tasks) if tasks.is_empty() => {
                    break;
                }
                Ok(tasks) => {
                    tracing::info!("Fetched {} tasks", tasks.len());
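                    //Ask Redis for its current time so the scheduling check below uses the server clock rather than the local one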
                    let timestamp = 'new_tasks_now: loop {
                        match manager.queue().time().await {
                            Ok(timestamp) => break timestamp,
                            Err(error) => on_redis_error!(error where OK='new_tasks_now),
                        }
                    };

                    for task in tasks.into_iter() {
                        //If the task is scheduled in the future, the timestamp of its id will be greater than
                        //the current Redis time
                        if task.id.as_timestamp() <= timestamp {
                            new_tasks_cap = new_tasks_cap.saturating_sub(1);
                            ongoing_tasks.push(dispatcher.send(task));
                        } else {
                            tracing::debug!("task(id={}) scheduled in the future. Current time={}", task.id, timestamp.as_millis());
                        }
                    }

                    if new_tasks_cap == 0 {
                        //Having hit the cap, we should start executing tasks immediately
                        process_tasks!();
                        new_tasks_cap = max_task_count;
                    }

                    //After that, check whether there is still some time left within the poll interval to do more work
                    let elapsed = new_tasks_started.elapsed();
                    if let Some(new_timeout) = manager.config().poll_time.checked_sub(elapsed) {
                        //Once the leftover time is below 1 second, there is no need to poll further; wait for the next iteration
                        if new_timeout.as_secs() == 0 {
                            break 'new_tasks;
                        }

                        fetch_new_tasks.set_timeout(new_timeout);
                        fetch_new_tasks.set_count(new_tasks_cap);
                    } else {
                        break 'new_tasks;
                    }
                }
                Err(error) => on_redis_error!(error where OK='new_tasks),
            }
        } //'new_tasks

        //Consume leftovers
        process_tasks!();

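        //Between iterations, check whether a shutdown has been requested, trimming the queue if any tasks were consumed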
        match shutdown_recv.try_recv() {
            Ok(_) => {
                tracing::info!("Shutdown requested");
                if consumed_tasks_number > 0 {
                    manager.trim_queue(1).await;
                }
                break 'main;
            }
            Err(oneshot::error::TryRecvError::Closed) => {
                tracing::info!("Unexpected termination");
                if consumed_tasks_number > 0 {
                    manager.trim_queue(1).await;
                }
                break 'main;
            }
            Err(oneshot::error::TryRecvError::Empty) => {
                if consumed_tasks_number > 0 {
                    manager.trim_queue(10).await;
                    consumed_tasks_number = 0;
                }
                continue 'main;
            }
        }
    } //'main
}