use crate::{
    AppTasks, TaskResult, TaskStatus,
    types::{MAX_RETRIES, QueuedTask},
};
use flume::Receiver;
use std::time::Duration;
use tokio_util::sync::CancellationToken;

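/// Spawn a pool of background workers that drain the shared task queue until
/// the channel closes or `shutdown` is cancelled.
///
/// When `worker_count` is `None`, the pool size defaults to half the available
/// CPU cores, with a floor of four workers.
///
/// Minimal usage sketch (the `AppTasks` constructor shown here is assumed for
/// illustration only and may differ from the real API):
///
/// ```ignore
/// let app_tasks = AppTasks::new(); // hypothetical constructor
/// let shutdown = CancellationToken::new();
/// spawn_task_workers(app_tasks, shutdown.clone(), None);
/// // ...later, signal the workers to stop:
/// shutdown.cancel();
/// ```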
pub fn spawn_task_workers(
    app_tasks: AppTasks,
    shutdown: CancellationToken,
    worker_count: Option<usize>,
) {
    let num_workers = worker_count.unwrap_or_else(|| std::cmp::max(4, num_cpus::get() / 2));
    tracing::info!("Starting {} task workers", num_workers);
    for worker_id in 0..num_workers {
        let receiver = app_tasks.receiver().clone();
        let app_tasks = app_tasks.clone();
        let shutdown = shutdown.clone();
        tokio::spawn(async move {
            worker_loop(worker_id, receiver, app_tasks, shutdown).await;
        });
    }
}

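/// A single worker: pull tasks off the queue until the channel closes or the
/// shutdown token is cancelled, whichever comes first.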
async fn worker_loop(
    worker_id: usize,
    receiver: Receiver<QueuedTask>,
    app_tasks: AppTasks,
    shutdown: CancellationToken,
) {
    tracing::debug!("Worker {} started", worker_id);
    loop {
        tokio::select! {
            task_result = receiver.recv_async() => {
                match task_result {
                    Ok(task) => process_task(worker_id, task, &app_tasks).await,
                    Err(_) => break,
                }
            }
            _ = shutdown.cancelled() => break,
        }
    }
    tracing::debug!("Worker {} stopped", worker_id);
}

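/// Execute one queued task: record its status transitions, run the registered
/// handler, and dispatch on the result (success, retryable error, or permanent
/// error).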
async fn process_task(worker_id: usize, task: QueuedTask, app_tasks: &AppTasks) {
    let start_time = std::time::Instant::now();

    tracing::debug!(
        task_id = %task.id,
        task_name = %task.task_name,
        worker_id = worker_id,
        retry_count = task.retry_count,
        "Processing task"
    );

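    // Flag tasks that sat in the queue for more than five minutes; a long wait
    // usually means the workers are falling behind.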
    let queue_wait_time = task.created_at.elapsed();
    if queue_wait_time > Duration::from_secs(300) {
        tracing::warn!(
            task_id = %task.id,
            wait_time_ms = queue_wait_time.as_millis(),
            "Task waited unusually long in queue - system may be overloaded"
        );
    }

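    // Mark the task as in progress before handing it to its handler.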
    app_tasks
        .update_task_status(
            &task.id,
            TaskStatus::InProgress,
            Some(worker_id),
            None,
            None,
        )
        .await;

    let result =
        execute_task_from_registry(&task.task_name, &task.task_data, app_tasks, &task.id).await;
    let duration = start_time.elapsed();
    let duration_ms = duration.as_millis() as u64;

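    // Record the outcome: successes complete immediately, retryable errors are
    // re-queued with backoff until MAX_RETRIES is reached, and permanent errors
    // fail the task outright.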
    match result {
        TaskResult::Success => {
            app_tasks
                .update_task_status(
                    &task.id,
                    TaskStatus::Completed,
                    Some(worker_id),
                    Some(duration_ms),
                    None,
                )
                .await;

            app_tasks.metrics_ref().record_completed();

            tracing::info!(
                task_id = %task.id,
                duration_ms = duration_ms,
                "Task completed successfully"
            );
        }

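        // Transient failures: retry with exponential backoff until the retry
        // budget is exhausted.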
        TaskResult::RetryableError(error) => {
            if task.retry_count < MAX_RETRIES {
                let delay = calculate_retry_delay(task.retry_count);

                tracing::warn!(
                    task_id = %task.id,
                    error = %error,
                    retry_count = task.retry_count,
                    delay_ms = delay.as_millis(),
                    "Task failed, scheduling retry"
                );

                app_tasks
                    .update_task_status(
                        &task.id,
                        TaskStatus::Retrying,
                        Some(worker_id),
                        Some(duration_ms),
                        Some(error.clone()),
                    )
                    .await;

                schedule_retry(task, delay, app_tasks).await;
            } else {
                app_tasks
                    .update_task_status(
                        &task.id,
                        TaskStatus::Failed,
                        Some(worker_id),
                        Some(duration_ms),
                        Some(format!("Max retries exceeded: {}", error)),
                    )
                    .await;

                app_tasks.metrics_ref().record_failed();

                tracing::error!(
                    task_id = %task.id,
                    error = %error,
                    retry_count = task.retry_count,
                    "Task failed permanently after max retries"
                );
            }
        }

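        // Permanent failures are never retried; record the failure and move on.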
        TaskResult::PermanentError(error) => {
            app_tasks
                .update_task_status(
                    &task.id,
                    TaskStatus::Failed,
                    Some(worker_id),
                    Some(duration_ms),
                    Some(error.clone()),
                )
                .await;

            app_tasks.metrics_ref().record_failed();

            tracing::error!(
                task_id = %task.id,
                error = %error,
                "Task failed permanently"
            );
        }
    }
}

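/// Look up the task's handler in the `inventory`-based registry and run it.
/// Tasks with no registered handler fail permanently.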
async fn execute_task_from_registry(
    task_name: &str,
    task_data: &[u8],
    app_tasks: &AppTasks,
    job_id: &str,
) -> TaskResult {
    for registration in inventory::iter::<crate::TaskRegistration> {
        if registration.name == task_name {
            match (registration.handler)(task_data, app_tasks, job_id).await {
                Ok(result) => return result,
                Err(error_result) => return error_result,
            }
        }
    }

    TaskResult::PermanentError(format!("Unknown task type: {}", task_name))
}

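/// Bump the task's retry count and re-queue it after `delay` on a detached
/// Tokio task, so the worker is free to pick up other work in the meantime.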
async fn schedule_retry(mut task: QueuedTask, delay: Duration, app_tasks: &AppTasks) {
    task.retry_count += 1;

    let sender = app_tasks.sender().clone();

    tokio::spawn(async move {
        tokio::time::sleep(delay).await;

        if let Err(e) = sender.send_async(task.clone()).await {
            tracing::error!(
                task_id = %task.id,
                error = %e,
                "Failed to requeue task for retry"
            );
        }
    });
}

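/// Exponential backoff: 2^retry_count seconds, capped at five minutes.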
fn calculate_retry_delay(retry_count: u32) -> Duration {
    let base_delay = 2_u64.pow(retry_count);
    let delay_seconds = std::cmp::min(base_delay, 300);
    Duration::from_secs(delay_seconds)
}