// redis_queue/manager/run.rs

use crate::manager::Manager;
use crate::manager::dispatch::{Dispatch, TaskResult};

use core::future::Future;
use core::{cmp, time};
use std::time::Instant;

use tokio::sync::oneshot;
use redis::RedisError;

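/// Base retry delay, and the per-error increment used by `TimeScheduler`.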
const TIMEOUT_INTERVAL: time::Duration = time::Duration::from_secs(5);

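/// Backoff scheduler for transient Redis errors: each consecutive error
/// lengthens the delay by `timeout_interval`, capped at `timeout_limit`;
/// a successful call resets the delay via [`Self::on_redis_recovery`].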
struct TimeScheduler {
    timeout_interval: time::Duration,
    timeout_limit: time::Duration,
    on_error_timeout: time::Duration,
}

impl TimeScheduler {
    #[inline(always)]
    pub fn new(timeout_limit: time::Duration) -> Self {
        Self {
            timeout_interval: TIMEOUT_INTERVAL,
            on_error_timeout: TIMEOUT_INTERVAL,
            timeout_limit,
        }
    }

    #[inline(always)]
    pub fn on_redis_recovery(&mut self) {
        self.on_error_timeout = self.timeout_interval;
    }

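    /// For transient errors (timeout, connection refused or dropped) returns
    /// the next backoff delay to sleep; any other error is returned to the
    /// caller as fatal.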
    #[inline(always)]
    pub fn next_redis_error(&mut self, error: RedisError) -> Result<time::Duration, RedisError> {
        if error.is_timeout() || error.is_connection_refusal() || error.is_connection_dropped() {
            tracing::info!("Redis temporarily unavailable: {error}");
            self.on_error_timeout =
                cmp::min(self.timeout_limit, self.on_error_timeout + self.timeout_interval);
            Ok(self.on_error_timeout)
        } else {
            Err(error)
        }
    }
}

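/// Parameters for the [`manage`] loop.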
pub struct RunParams<T> {
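    /// Queue manager used to fetch, consume and trim tasks.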
    pub manager: Manager,
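    /// Signals graceful shutdown; a closed channel is treated the same way.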
    pub shutdown_recv: oneshot::Receiver<()>,
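    /// Upper bound on how many tasks are fetched and dispatched per batch.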
    pub max_task_count: usize,
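    /// Executes each dispatched task; its future resolves to a [`TaskResult`].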
    pub dispatcher: T,
}

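/// Main consumer loop. Each iteration: (1) consumes expired pending tasks
/// that ran out of retries, (2) re-dispatches pending tasks idle past the
/// poll interval, (3) polls for new tasks and dispatches those that are due,
/// awaiting and consuming completed ones, then (4) checks for shutdown.
///
/// A minimal wiring sketch; `MyDispatcher` is illustrative, not part of this
/// crate:
///
/// ```ignore
/// let (shutdown_send, shutdown_recv) = tokio::sync::oneshot::channel();
/// tokio::spawn(manage(RunParams {
///     manager,
///     shutdown_recv,
///     max_task_count: 64,
///     dispatcher: MyDispatcher,
/// }));
/// // Later, request a graceful stop:
/// let _ = shutdown_send.send(());
/// ```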
#[tracing::instrument(skip(params), fields(consumer = params.manager.config().consumer.as_ref()))]
pub async fn manage<T: Dispatch>(params: RunParams<T>)
where
    T::Future: Future<Output = TaskResult<T::PayloadType>>,
{
    let RunParams { manager, mut shutdown_recv, max_task_count, dispatcher } = params;

    let mut scheduler = TimeScheduler::new(manager.config().poll_time);

    let max_retry = manager.max_pending_retry_count();

    let mut expired_tasks = manager.expired_pending_tasks(max_task_count, None);
    let mut fetch_new_tasks = manager.fetch_new_tasks(max_task_count);

    let mut ongoing_tasks = Vec::new();
    let mut completed_tasks = Vec::new();
    let mut consumed_tasks_number = 0usize;

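    // Trim the queue once up front; inside the loop it is trimmed again
    // after batches of tasks are consumed.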
    manager.trim_queue(10).await;

    'main: loop {
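        // On a transient Redis error: back off, then `continue` at the given
        // label (empty tokens resume the innermost loop). Any other error is
        // fatal and breaks out of 'main.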
        macro_rules! on_redis_error {
            ($error:ident where OK=$($ok:tt)*) => {
                match scheduler.next_redis_error($error) {
                    Ok(sleep) => {
                        tracing::info!("Retry in {}s", sleep.as_secs());
                        tokio::time::sleep(sleep).await;
                        continue $($ok)*;
                    }
                    Err(error) => {
                        tracing::error!("Redis queue cannot be processed: {error}");
                        break 'main;
                    }
                }
            }
        }

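        // Phase 1: scan expired pending tasks and permanently consume those
        // whose retry count exceeded the limit.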
        if let Err(error) = expired_tasks.set_range_until_now().await {
            on_redis_error!(error where OK='main);
        } else {
            scheduler.on_redis_recovery();
        }

        'expired_tasks: loop {
            match expired_tasks.next().await {
                Ok(tasks) if tasks.is_empty() => {
                    break 'expired_tasks;
                }
                Ok(tasks) => {
                    let tasks = tasks
                        .iter()
                        .filter(|entry| entry.count > max_retry)
                        .map(|entry| entry.id)
                        .collect::<Vec<_>>();
                    if tasks.is_empty() {
                        break 'expired_tasks;
                    }

                    if let Err(error) = manager.consume_tasks(&tasks).await {
                        on_redis_error!(error where OK=);
                    } else {
                        break 'expired_tasks;
                    }
                }
                Err(error) => on_redis_error!(error where OK=),
            }
        }
        scheduler.on_redis_recovery();

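        // Phase 2: re-dispatch pending tasks that have sat idle longer than
        // the poll interval.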
        #[allow(clippy::never_loop)]
        'failed_tasks: loop {
            let mut pending = manager.pending_tasks(max_task_count, None);
            pending.set_idle(manager.config().poll_time);
            'failed_tasks_end_range: loop {
                if let Err(error) = pending.set_range_until_now().await {
                    on_redis_error!(error where OK='failed_tasks_end_range);
                } else {
                    scheduler.on_redis_recovery();
                    break 'failed_tasks_end_range;
                }
            }

            'failed_tasks_fetch: loop {
                match pending.next().await {
                    Ok(tasks) if tasks.is_empty() => break 'failed_tasks,
                    Ok(tasks) => {
                        for task in tasks {
                            match manager.get_pending_by_id(task.id).await {
                                Ok(Some(task)) => ongoing_tasks.push(dispatcher.send(task)),
                                Ok(None) => (),
                                Err(error) => on_redis_error!(error where OK=),
                            }
                        }
                    }
                    Err(error) => on_redis_error!(error where OK='failed_tasks_fetch),
                }
            }
        }
        scheduler.on_redis_recovery();

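        // Phase 3: poll for new tasks for up to `poll_time`, dispatching only
        // those whose scheduled timestamp is already due.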
        let new_tasks_started = Instant::now();
        fetch_new_tasks.set_timeout(manager.config().poll_time);
        fetch_new_tasks.set_count(max_task_count);
        let mut new_tasks_cap = max_task_count;

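        // Awaits all dispatched tasks; those not flagged for retry are
        // consumed from the queue in one batch.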
        macro_rules! process_tasks {
            () => {
                for ongoing in ongoing_tasks.drain(..) {
                    let result = ongoing.await;
                    tracing::debug!("task(redis={}, user_id={}): {:?}", result.data.id, result.data.value.id, result.kind);
                    if !result.kind.is_need_retry() {
                        completed_tasks.push(result.data.id);
                    }
                }
                if !completed_tasks.is_empty() {
                    'completed_tasks: loop {
                        match manager.consume_tasks(&completed_tasks).await {
                            Ok(_) => {
                                tracing::info!("Completed {} tasks", completed_tasks.len());
                                consumed_tasks_number = consumed_tasks_number.saturating_add(completed_tasks.len());
                                completed_tasks.clear();
                                break 'completed_tasks;
                            }
                            Err(error) => on_redis_error!(error where OK='completed_tasks),
                        }
                    }
                    scheduler.on_redis_recovery();
                }
            };
        }

        'new_tasks: loop {
            match fetch_new_tasks.next_entries().await {
                Ok(tasks) if tasks.is_empty() => {
                    break 'new_tasks;
                }
                Ok(tasks) => {
                    tracing::info!("Fetched {} tasks", tasks.len());
                    let timestamp = 'new_tasks_now: loop {
                        match manager.queue().time().await {
                            Ok(timestamp) => break timestamp,
                            Err(error) => on_redis_error!(error where OK='new_tasks_now),
                        }
                    };

                    for task in tasks.into_iter() {
                        if task.id.as_timestamp() <= timestamp {
                            new_tasks_cap = new_tasks_cap.saturating_sub(1);
                            ongoing_tasks.push(dispatcher.send(task));
                        } else {
                            tracing::debug!("task(id={}) is scheduled in the future. Current time={}", task.id, timestamp.as_millis());
                        }
                    }

                    if new_tasks_cap == 0 {
                        process_tasks!();
                        new_tasks_cap = max_task_count;
                    }

                    let elapsed = new_tasks_started.elapsed();
                    if let Some(new_timeout) = manager.config().poll_time.checked_sub(elapsed) {
                        if new_timeout.as_secs() == 0 {
                            break 'new_tasks;
                        }

                        fetch_new_tasks.set_timeout(new_timeout);
                        fetch_new_tasks.set_count(new_tasks_cap);
                    } else {
                        break 'new_tasks;
                    }
                }
                Err(error) => on_redis_error!(error where OK='new_tasks),
            }
        }
        process_tasks!();

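        // Phase 4: between iterations, honour shutdown requests; a dropped
        // sender is treated like an explicit shutdown.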
        match shutdown_recv.try_recv() {
            Ok(_) => {
                tracing::info!("Shutdown requested");
                if consumed_tasks_number > 0 {
                    manager.trim_queue(1).await;
                }
                break 'main;
            }
            Err(oneshot::error::TryRecvError::Closed) => {
                tracing::info!("Unexpected termination");
                if consumed_tasks_number > 0 {
                    manager.trim_queue(1).await;
                }
                break 'main;
            }
            Err(oneshot::error::TryRecvError::Empty) => {
                if consumed_tasks_number > 0 {
                    manager.trim_queue(10).await;
                    consumed_tasks_number = 0;
                }
                continue 'main;
            }
        }
    }
}