axum_tasks/worker.rs

use crate::{
    AppTasks, TaskResult, TaskStatus,
    types::{MAX_RETRIES, QueuedTask},
};
use flume::Receiver;
use std::time::Duration;
use tokio_util::sync::CancellationToken;

/// Spawns background task workers. Returns immediately.
/// Workers process tasks until shutdown, completing their current task before exiting.
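///
/// # Example
///
/// A minimal wiring sketch; `AppTasks::new()` is an assumed constructor here and
/// should be replaced with however the crate actually builds its task state.
///
/// ```rust,ignore
/// use tokio_util::sync::CancellationToken;
///
/// let app_tasks = AppTasks::new(); // assumed constructor, for illustration only
/// let shutdown = CancellationToken::new();
///
/// // Spawn the default number of workers (half the CPUs, but at least 4).
/// spawn_task_workers(app_tasks, shutdown.clone(), None);
///
/// // During graceful shutdown: workers finish their current task, then exit.
/// shutdown.cancel();
/// ```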
pub fn spawn_task_workers(
    app_tasks: AppTasks,
    shutdown: CancellationToken,
    worker_count: Option<usize>,
) {
    // Default to half the available CPUs, but never fewer than 4 workers.
    let num_workers = worker_count.unwrap_or_else(|| std::cmp::max(4, num_cpus::get() / 2));
    tracing::info!("Starting {} task workers", num_workers);
    for worker_id in 0..num_workers {
        let receiver = app_tasks.receiver().clone();
        let app_tasks = app_tasks.clone();
        let shutdown = shutdown.clone();
        tokio::spawn(async move {
            worker_loop(worker_id, receiver, app_tasks, shutdown).await;
        });
    }
}

/// Pulls tasks off the shared queue and processes them one at a time until the
/// channel closes or shutdown is signalled.
async fn worker_loop(
    worker_id: usize,
    receiver: Receiver<QueuedTask>,
    app_tasks: AppTasks,
    shutdown: CancellationToken,
) {
    tracing::debug!("Worker {} started", worker_id);
    loop {
        tokio::select! {
            task_result = receiver.recv_async() => {
                match task_result {
                    Ok(task) => process_task(worker_id, task, &app_tasks).await,
                    // All senders have been dropped, so the queue is closed.
                    Err(_) => break,
                }
            }
            _ = shutdown.cancelled() => break,
        }
    }
    tracing::debug!("Worker {} stopped", worker_id);
}

/// Runs a single task end to end: marks it in progress, executes its handler, and
/// records the outcome (completed, retrying, or failed).
async fn process_task(worker_id: usize, task: QueuedTask, app_tasks: &AppTasks) {
    let start_time = std::time::Instant::now();

    tracing::debug!(
        task_id = %task.id,
        task_name = %task.task_name,
        worker_id = worker_id,
        retry_count = task.retry_count,
        "Processing task"
    );

    let queue_wait_time = task.created_at.elapsed();
    if queue_wait_time > Duration::from_secs(300) {
        tracing::warn!(
            task_id = %task.id,
            wait_time_ms = queue_wait_time.as_millis(),
            "Task waited unusually long in queue - system may be overloaded"
        );
    }

    app_tasks
        .update_task_status(
            &task.id,
            TaskStatus::InProgress,
            Some(worker_id),
            None,
            None,
        )
        .await;

    let result =
        execute_task_from_registry(&task.task_name, &task.task_data, app_tasks, &task.id).await;
    let duration = start_time.elapsed();
    let duration_ms = duration.as_millis() as u64;

    match result {
        TaskResult::Success => {
            app_tasks
                .update_task_status(
                    &task.id,
                    TaskStatus::Completed,
                    Some(worker_id),
                    Some(duration_ms),
                    None,
                )
                .await;

            app_tasks.metrics_ref().record_completed();

            tracing::info!(
                task_id = %task.id,
                duration_ms = duration_ms,
                "Task completed successfully"
            );
        }

        TaskResult::RetryableError(error) => {
            if task.retry_count < MAX_RETRIES {
                let delay = calculate_retry_delay(task.retry_count);

                tracing::warn!(
                    task_id = %task.id,
                    error = %error,
                    retry_count = task.retry_count,
                    delay_ms = delay.as_millis(),
                    "Task failed, scheduling retry"
                );

                app_tasks
                    .update_task_status(
                        &task.id,
                        TaskStatus::Retrying,
                        Some(worker_id),
                        Some(duration_ms),
                        Some(error.clone()),
                    )
                    .await;

                schedule_retry(task, delay, app_tasks).await;
            } else {
                app_tasks
                    .update_task_status(
                        &task.id,
                        TaskStatus::Failed,
                        Some(worker_id),
                        Some(duration_ms),
                        Some(format!("Max retries exceeded: {}", error)),
                    )
                    .await;

                app_tasks.metrics_ref().record_failed();

                tracing::error!(
                    task_id = %task.id,
                    error = %error,
                    retry_count = task.retry_count,
                    "Task failed permanently after max retries"
                );
            }
        }

        TaskResult::PermanentError(error) => {
            app_tasks
                .update_task_status(
                    &task.id,
                    TaskStatus::Failed,
                    Some(worker_id),
                    Some(duration_ms),
                    Some(error.clone()),
                )
                .await;

            app_tasks.metrics_ref().record_failed();

            tracing::error!(
                task_id = %task.id,
                error = %error,
                "Task failed permanently"
            );
        }
    }
}

async fn execute_task_from_registry(
    task_name: &str,
    task_data: &[u8],
    app_tasks: &AppTasks,
    job_id: &str,
) -> TaskResult {
    // Use the global task registry to find and execute the task
    for registration in inventory::iter::<crate::TaskRegistration> {
        if registration.name == task_name {
            match (registration.handler)(task_data, app_tasks, job_id).await {
                Ok(result) => return result,
                Err(error_result) => return error_result,
            }
        }
    }

    TaskResult::PermanentError(format!("Unknown task type: {}", task_name))
}
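
// How handlers end up in this registry, sketched for orientation only: the real
// `crate::TaskRegistration` definition and handler signature live elsewhere in the
// crate, so the field names below are inferred from the lookup above and the task
// name is purely illustrative.
//
// inventory::submit! {
//     crate::TaskRegistration {
//         name: "example_task",
//         handler: |task_data, app_tasks, job_id| {
//             Box::pin(async move {
//                 // Deserialize `task_data`, do the work, then report the outcome.
//                 Ok(TaskResult::Success)
//             })
//         },
//     }
// }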

/// Re-enqueues the task after `delay`, bumping its retry count first. The delayed
/// send runs on a detached Tokio task so the worker is free to pick up other work.
async fn schedule_retry(mut task: QueuedTask, delay: Duration, app_tasks: &AppTasks) {
    task.retry_count += 1;

    let sender = app_tasks.sender().clone();

    tokio::spawn(async move {
        tokio::time::sleep(delay).await;

        if let Err(e) = sender.send_async(task.clone()).await {
            tracing::error!(
                task_id = %task.id,
                error = %e,
                "Failed to requeue task for retry"
            );
        }
    });
}

/// Exponential backoff: 2^retry_count seconds, capped at 5 minutes.
fn calculate_retry_delay(retry_count: u32) -> Duration {
    // Saturate instead of panicking if retry_count ever exceeds 63.
    let base_delay = 2_u64.saturating_pow(retry_count);
    let delay_seconds = std::cmp::min(base_delay, 300); // Cap at 5 minutes
    Duration::from_secs(delay_seconds)
}
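
// Illustrative sanity check of the backoff schedule; a sketch that assumes the delay
// formula (2^retry_count seconds) and the 300-second cap stay as written above.
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    #[test]
    fn retry_delay_backs_off_exponentially_and_caps_at_five_minutes() {
        assert_eq!(calculate_retry_delay(0), Duration::from_secs(1));
        assert_eq!(calculate_retry_delay(3), Duration::from_secs(8));
        assert_eq!(calculate_retry_delay(8), Duration::from_secs(256));
        // 2^9 = 512 seconds, clamped to the 300-second cap.
        assert_eq!(calculate_retry_delay(9), Duration::from_secs(300));
        assert_eq!(calculate_retry_delay(20), Duration::from_secs(300));
    }
}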