1use tokio_util::sync::CancellationToken;
40use tracing::{error, info, warn};
41use uuid::Uuid;
42
43use crate::SyncStats;
44use crate::config::PortalType;
45use crate::error::AppError;
46use crate::harvest::HarvestService;
47use crate::job::{HarvestJob, WorkerConfig};
48use crate::job_queue::JobQueue;
49use crate::progress::ProgressReporter;
50use crate::traits::{DatasetStore, EmbeddingProvider, PortalClientFactory};
51
52#[derive(Debug, Clone)]
58pub enum WorkerEvent<'a> {
59 Started { worker_id: &'a str },
61 Polling,
63 JobClaimed { job: &'a HarvestJob },
65 JobStarted { job_id: Uuid, portal_url: &'a str },
67 JobCompleted { job_id: Uuid, stats: &'a SyncStats },
69 JobFailed {
71 job_id: Uuid,
72 error: &'a str,
73 will_retry: bool,
74 },
75 JobCancelled { job_id: Uuid, stats: &'a SyncStats },
77 ShuttingDown {
79 worker_id: &'a str,
80 jobs_released: u64,
81 },
82 Stopped { worker_id: &'a str },
84}
85
86pub trait WorkerReporter: Send + Sync {
94 fn report(&self, event: WorkerEvent<'_>) {
98 let _ = event;
99 }
100}
101
102#[derive(Debug, Default, Clone, Copy)]
104pub struct SilentWorkerReporter;
105
106impl WorkerReporter for SilentWorkerReporter {}
107
108#[derive(Debug, Default, Clone, Copy)]
110pub struct TracingWorkerReporter;
111
112impl WorkerReporter for TracingWorkerReporter {
113 fn report(&self, event: WorkerEvent<'_>) {
114 match event {
115 WorkerEvent::Started { worker_id } => {
116 info!(worker_id, "Worker started");
117 }
118 WorkerEvent::Polling => {
119 tracing::debug!("Polling for jobs...");
121 }
122 WorkerEvent::JobClaimed { job } => {
123 info!(job_id = %job.id, portal = %job.portal_url, "Job claimed");
124 }
125 WorkerEvent::JobStarted { job_id, portal_url } => {
126 info!(%job_id, portal = portal_url, "Processing job");
127 }
128 WorkerEvent::JobCompleted { job_id, stats } => {
129 info!(
130 %job_id,
131 created = stats.created,
132 updated = stats.updated,
133 unchanged = stats.unchanged,
134 failed = stats.failed,
135 "Job completed"
136 );
137 }
138 WorkerEvent::JobFailed {
139 job_id,
140 error,
141 will_retry,
142 } => {
143 if will_retry {
144 warn!(%job_id, %error, "Job failed, will retry");
145 } else {
146 error!(%job_id, %error, "Job permanently failed");
147 }
148 }
149 WorkerEvent::JobCancelled { job_id, stats } => {
150 info!(%job_id, processed = stats.total(), "Job cancelled");
151 }
152 WorkerEvent::ShuttingDown {
153 worker_id,
154 jobs_released,
155 } => {
156 info!(worker_id, jobs_released, "Worker shutting down");
157 }
158 WorkerEvent::Stopped { worker_id } => {
159 info!(worker_id, "Worker stopped");
160 }
161 }
162 }
163}
164
165pub struct WorkerService<Q, S, E, F>
174where
175 Q: JobQueue,
176 S: DatasetStore,
177 E: EmbeddingProvider,
178 F: PortalClientFactory,
179{
180 queue: Q,
181 harvest_service: HarvestService<S, E, F>,
182 config: WorkerConfig,
183}
184
185impl<Q, S, E, F> WorkerService<Q, S, E, F>
186where
187 Q: JobQueue,
188 S: DatasetStore,
189 E: EmbeddingProvider,
190 F: PortalClientFactory,
191{
192 pub fn new(queue: Q, harvest_service: HarvestService<S, E, F>, config: WorkerConfig) -> Self {
194 Self {
195 queue,
196 harvest_service,
197 config,
198 }
199 }
200
201 pub async fn run<WR, HR>(
209 &self,
210 cancel_token: CancellationToken,
211 worker_reporter: &WR,
212 harvest_reporter: &HR,
213 ) -> Result<(), AppError>
214 where
215 WR: WorkerReporter,
216 HR: ProgressReporter,
217 {
218 worker_reporter.report(WorkerEvent::Started {
219 worker_id: &self.config.worker_id,
220 });
221
222 loop {
223 if cancel_token.is_cancelled() {
225 break;
226 }
227
228 worker_reporter.report(WorkerEvent::Polling);
230
231 match self.queue.claim_job(&self.config.worker_id).await {
232 Ok(Some(job)) => {
233 worker_reporter.report(WorkerEvent::JobClaimed { job: &job });
234
235 self.process_job(&job, &cancel_token, worker_reporter, harvest_reporter)
237 .await;
238 }
239 Ok(None) => {
240 tokio::select! {
242 _ = tokio::time::sleep(self.config.poll_interval) => {}
243 _ = cancel_token.cancelled() => break,
244 }
245 }
246 Err(e) => {
247 error!(error = %e, "Failed to claim job");
248 tokio::time::sleep(self.config.poll_interval * 2).await;
250 }
251 }
252 }
253
254 let released = self
256 .queue
257 .release_worker_jobs(&self.config.worker_id)
258 .await
259 .unwrap_or_else(|e| {
260 tracing::error!("Failed to release worker jobs on shutdown: {}", e);
261 0
262 });
263
264 worker_reporter.report(WorkerEvent::ShuttingDown {
265 worker_id: &self.config.worker_id,
266 jobs_released: released,
267 });
268
269 worker_reporter.report(WorkerEvent::Stopped {
270 worker_id: &self.config.worker_id,
271 });
272
273 Ok(())
274 }
275
276 async fn process_job<WR, HR>(
278 &self,
279 job: &HarvestJob,
280 cancel_token: &CancellationToken,
281 worker_reporter: &WR,
282 harvest_reporter: &HR,
283 ) where
284 WR: WorkerReporter,
285 HR: ProgressReporter,
286 {
287 worker_reporter.report(WorkerEvent::JobStarted {
288 job_id: job.id,
289 portal_url: &job.portal_url,
290 });
291
292 let job_cancel = cancel_token.child_token();
294
295 let language = job.language.as_deref().unwrap_or("en");
298 let result = self
299 .harvest_service
300 .sync_portal_with_progress_cancellable_with_options(
301 &job.portal_url,
302 job.url_template.as_deref(),
303 language,
304 harvest_reporter,
305 job_cancel.clone(),
306 job.force_full_sync,
307 PortalType::default(),
308 )
309 .await;
310
311 match result {
312 Ok(sync_result) => {
313 if sync_result.is_cancelled() {
314 worker_reporter.report(WorkerEvent::JobCancelled {
316 job_id: job.id,
317 stats: &sync_result.stats,
318 });
319
320 if let Err(e) = self.queue.cancel_job(job.id, Some(sync_result.stats)).await {
321 error!(job_id = %job.id, error = %e, "Failed to mark job as cancelled");
322 }
323 } else {
324 worker_reporter.report(WorkerEvent::JobCompleted {
326 job_id: job.id,
327 stats: &sync_result.stats,
328 });
329
330 if let Err(e) = self.queue.complete_job(job.id, sync_result.stats).await {
331 error!(job_id = %job.id, error = %e, "Failed to mark job as completed");
332 }
333 }
334 }
335 Err(e) => {
336 let error_msg = e.to_string();
337 let can_retry = job.can_retry() && e.is_retryable();
338
339 worker_reporter.report(WorkerEvent::JobFailed {
340 job_id: job.id,
341 error: &error_msg,
342 will_retry: can_retry,
343 });
344
345 let next_retry = if can_retry {
346 Some(job.calculate_next_retry(&self.config.retry_config))
347 } else {
348 None
349 };
350
351 if let Err(e) = self.queue.fail_job(job.id, &error_msg, next_retry).await {
352 error!(job_id = %job.id, error = %e, "Failed to mark job as failed");
353 }
354 }
355 }
356 }
357
358 pub async fn process_single_job<WR, HR>(
363 &self,
364 job_id: Uuid,
365 cancel_token: CancellationToken,
366 worker_reporter: &WR,
367 harvest_reporter: &HR,
368 ) -> Result<(), AppError>
369 where
370 WR: WorkerReporter,
371 HR: ProgressReporter,
372 {
373 let job = self
374 .queue
375 .get_job(job_id)
376 .await?
377 .ok_or_else(|| AppError::Generic(format!("Job not found: {}", job_id)))?;
378
379 self.process_job(&job, &cancel_token, worker_reporter, harvest_reporter)
380 .await;
381
382 Ok(())
383 }
384}
385
#[cfg(test)]
mod tests {
    use std::sync::atomic::{AtomicUsize, Ordering};

    use super::*;

    /// Non-trivial stats shared by the sample events.
    fn sample_stats() -> SyncStats {
        SyncStats {
            created: 5,
            updated: 3,
            unchanged: 10,
            failed: 1,
            skipped: 0,
        }
    }

    /// Sends one of every `WorkerEvent` variant that can be built without a
    /// real `HarvestJob` (all but `JobClaimed`), covering both `JobFailed`
    /// severities. Returns how many events were sent.
    fn emit_sample_events<R: WorkerReporter>(reporter: &R) -> usize {
        let stats = sample_stats();
        let events = [
            WorkerEvent::Started {
                worker_id: "test-worker",
            },
            WorkerEvent::Polling,
            WorkerEvent::JobStarted {
                job_id: Uuid::new_v4(),
                portal_url: "https://example.com",
            },
            WorkerEvent::JobCompleted {
                job_id: Uuid::new_v4(),
                stats: &stats,
            },
            WorkerEvent::JobFailed {
                job_id: Uuid::new_v4(),
                error: "test error",
                will_retry: true,
            },
            WorkerEvent::JobFailed {
                job_id: Uuid::new_v4(),
                error: "fatal error",
                will_retry: false,
            },
            WorkerEvent::JobCancelled {
                job_id: Uuid::new_v4(),
                stats: &stats,
            },
            WorkerEvent::ShuttingDown {
                worker_id: "test-worker",
                jobs_released: 2,
            },
            WorkerEvent::Stopped {
                worker_id: "test-worker",
            },
        ];

        let sent = events.len();
        for event in events {
            reporter.report(event);
        }
        sent
    }

    /// Reporter that only counts how many events it received.
    #[derive(Default)]
    struct CountingReporter {
        seen: AtomicUsize,
    }

    impl WorkerReporter for CountingReporter {
        fn report(&self, _event: WorkerEvent<'_>) {
            self.seen.fetch_add(1, Ordering::SeqCst);
        }
    }

    #[test]
    fn test_silent_worker_reporter() {
        emit_sample_events(&SilentWorkerReporter);
    }

    #[test]
    fn test_tracing_worker_reporter() {
        emit_sample_events(&TracingWorkerReporter);
    }

    #[test]
    fn test_custom_reporter_receives_every_event() {
        let reporter = CountingReporter::default();
        let sent = emit_sample_events(&reporter);
        assert_eq!(reporter.seen.load(Ordering::SeqCst), sent);
    }
}