// ceres_core/worker.rs
//! Worker service for processing harvest jobs from the queue.
//!
//! This module provides the [`WorkerService`] that polls for pending jobs
//! and processes them using the [`HarvestService`].
//!
//! # Architecture
//!
//! The worker follows a poll-based model:
//! ```text
//! loop {
//!     1. Check for cancellation
//!     2. Claim next available job (SELECT FOR UPDATE SKIP LOCKED)
//!     3. Process job using HarvestService
//!     4. Update job status (completed/failed/cancelled)
//!     5. If no jobs available, sleep for poll_interval
//! }
//! ```
//!
//! # Graceful Shutdown
//!
//! On cancellation token trigger:
//! - Stops claiming new jobs
//! - Allows current job to complete or cancel gracefully
//! - Releases any claimed jobs back to the queue
//!
//! # Example
//!
//! ```ignore
//! use ceres_core::worker::{WorkerService, WorkerConfig, TracingWorkerReporter};
//! use ceres_core::progress::TracingReporter;
//! use tokio_util::sync::CancellationToken;
//!
//! let worker = WorkerService::new(job_queue, harvest_service, WorkerConfig::default());
//! let cancel = CancellationToken::new();
//!
//! worker.run(cancel, &TracingWorkerReporter, &TracingReporter).await?;
//! ```

39use tokio_util::sync::CancellationToken;
40use tracing::{error, info, warn};
41use uuid::Uuid;
42
43use crate::SyncStats;
44use crate::config::PortalType;
45use crate::error::AppError;
46use crate::harvest::HarvestService;
47use crate::job::{HarvestJob, WorkerConfig};
48use crate::job_queue::JobQueue;
49use crate::progress::ProgressReporter;
50use crate::traits::{DatasetStore, EmbeddingProvider, PortalClientFactory};
51
52// =============================================================================
53// Worker Events
54// =============================================================================
55
/// Events emitted by the worker during operation.
///
/// Variants borrow from the worker's state (`'a`), so events are cheap to
/// construct and are intended to be consumed synchronously by a
/// [`WorkerReporter`] rather than stored.
#[derive(Debug, Clone)]
pub enum WorkerEvent<'a> {
    /// Worker started and is ready to process jobs.
    Started { worker_id: &'a str },
    /// Worker is polling for new jobs.
    Polling,
    /// Worker claimed a job.
    JobClaimed { job: &'a HarvestJob },
    /// Job processing started.
    JobStarted { job_id: Uuid, portal_url: &'a str },
    /// Job completed successfully.
    JobCompleted { job_id: Uuid, stats: &'a SyncStats },
    /// Job failed with error.
    JobFailed {
        job_id: Uuid,
        /// Human-readable error message (already stringified).
        error: &'a str,
        /// Whether the job will be re-queued for another attempt.
        will_retry: bool,
    },
    /// Job was cancelled; `stats` reflects partial progress made before cancel.
    JobCancelled { job_id: Uuid, stats: &'a SyncStats },
    /// Worker is shutting down.
    ShuttingDown {
        worker_id: &'a str,
        /// Number of claimed jobs released back to the queue on shutdown.
        jobs_released: u64,
    },
    /// Worker stopped.
    Stopped { worker_id: &'a str },
}
85
86// =============================================================================
87// Worker Reporter Trait
88// =============================================================================
89
/// Trait for reporting worker events.
///
/// Similar to [`ProgressReporter`] but for worker-level events
/// (lifecycle, job claim/complete/fail) rather than per-dataset progress.
///
/// `Send + Sync` is required because reporters are shared across the async
/// worker loop.
pub trait WorkerReporter: Send + Sync {
    /// Called when a worker event occurs.
    ///
    /// The default implementation does nothing (silent mode), so implementors
    /// only need to override this when they want output.
    fn report(&self, event: WorkerEvent<'_>) {
        // Explicitly discard to document that ignoring events is intentional.
        let _ = event;
    }
}
101
/// Silent worker reporter that ignores all events.
///
/// Useful for tests and embedded use where worker logging is unwanted.
#[derive(Debug, Default, Clone, Copy)]
pub struct SilentWorkerReporter;

// Relies entirely on the trait's default no-op `report`.
impl WorkerReporter for SilentWorkerReporter {}
107
/// Tracing-based worker reporter for CLI/server logging.
///
/// Emits structured `tracing` events (info/warn/error/debug) for each
/// [`WorkerEvent`].
#[derive(Debug, Default, Clone, Copy)]
pub struct TracingWorkerReporter;
111
impl WorkerReporter for TracingWorkerReporter {
    /// Map each worker event to a `tracing` log line at an appropriate level.
    fn report(&self, event: WorkerEvent<'_>) {
        match event {
            WorkerEvent::Started { worker_id } => {
                info!(worker_id, "Worker started");
            }
            WorkerEvent::Polling => {
                // Debug level to avoid spam: this fires every poll interval.
                tracing::debug!("Polling for jobs...");
            }
            WorkerEvent::JobClaimed { job } => {
                info!(job_id = %job.id, portal = %job.portal_url, "Job claimed");
            }
            WorkerEvent::JobStarted { job_id, portal_url } => {
                info!(%job_id, portal = portal_url, "Processing job");
            }
            WorkerEvent::JobCompleted { job_id, stats } => {
                // Break out the sync counters as structured fields.
                info!(
                    %job_id,
                    created = stats.created,
                    updated = stats.updated,
                    unchanged = stats.unchanged,
                    failed = stats.failed,
                    "Job completed"
                );
            }
            WorkerEvent::JobFailed {
                job_id,
                error,
                will_retry,
            } => {
                // Transient failures log at warn; permanent ones at error.
                if will_retry {
                    warn!(%job_id, %error, "Job failed, will retry");
                } else {
                    error!(%job_id, %error, "Job permanently failed");
                }
            }
            WorkerEvent::JobCancelled { job_id, stats } => {
                info!(%job_id, processed = stats.total(), "Job cancelled");
            }
            WorkerEvent::ShuttingDown {
                worker_id,
                jobs_released,
            } => {
                info!(worker_id, jobs_released, "Worker shutting down");
            }
            WorkerEvent::Stopped { worker_id } => {
                info!(worker_id, "Worker stopped");
            }
        }
    }
}
164
165// =============================================================================
166// Worker Service
167// =============================================================================
168
/// Worker service that processes jobs from the queue.
///
/// This service polls for pending jobs and processes them using the
/// [`HarvestService`]. It supports graceful shutdown via cancellation tokens.
///
/// NOTE(review): trait bounds on the struct itself are usually unnecessary
/// (bounds on the `impl` suffice) — but `HarvestService<S, E, F>` may require
/// them on its own definition; confirm before removing.
pub struct WorkerService<Q, S, E, F>
where
    Q: JobQueue,
    S: DatasetStore,
    E: EmbeddingProvider,
    F: PortalClientFactory,
{
    // Queue the worker claims jobs from.
    queue: Q,
    // Service that performs the actual portal harvest.
    harvest_service: HarvestService<S, E, F>,
    // Worker identity, poll interval, and retry policy.
    config: WorkerConfig,
}
184
185impl<Q, S, E, F> WorkerService<Q, S, E, F>
186where
187    Q: JobQueue,
188    S: DatasetStore,
189    E: EmbeddingProvider,
190    F: PortalClientFactory,
191{
192    /// Create a new worker service.
193    pub fn new(queue: Q, harvest_service: HarvestService<S, E, F>, config: WorkerConfig) -> Self {
194        Self {
195            queue,
196            harvest_service,
197            config,
198        }
199    }
200
201    /// Run the worker until cancelled.
202    ///
203    /// The worker will:
204    /// 1. Poll for available jobs
205    /// 2. Claim and process jobs
206    /// 3. Handle retries on failure
207    /// 4. Release jobs on graceful shutdown
208    pub async fn run<WR, HR>(
209        &self,
210        cancel_token: CancellationToken,
211        worker_reporter: &WR,
212        harvest_reporter: &HR,
213    ) -> Result<(), AppError>
214    where
215        WR: WorkerReporter,
216        HR: ProgressReporter,
217    {
218        worker_reporter.report(WorkerEvent::Started {
219            worker_id: &self.config.worker_id,
220        });
221
222        loop {
223            // Check for shutdown
224            if cancel_token.is_cancelled() {
225                break;
226            }
227
228            // Poll for jobs
229            worker_reporter.report(WorkerEvent::Polling);
230
231            match self.queue.claim_job(&self.config.worker_id).await {
232                Ok(Some(job)) => {
233                    worker_reporter.report(WorkerEvent::JobClaimed { job: &job });
234
235                    // Process the job
236                    self.process_job(&job, &cancel_token, worker_reporter, harvest_reporter)
237                        .await;
238                }
239                Ok(None) => {
240                    // No jobs available, wait before polling again
241                    tokio::select! {
242                        _ = tokio::time::sleep(self.config.poll_interval) => {}
243                        _ = cancel_token.cancelled() => break,
244                    }
245                }
246                Err(e) => {
247                    error!(error = %e, "Failed to claim job");
248                    // Back off on errors
249                    tokio::time::sleep(self.config.poll_interval * 2).await;
250                }
251            }
252        }
253
254        // Graceful shutdown: release any jobs this worker claimed
255        let released = self
256            .queue
257            .release_worker_jobs(&self.config.worker_id)
258            .await
259            .unwrap_or_else(|e| {
260                tracing::error!("Failed to release worker jobs on shutdown: {}", e);
261                0
262            });
263
264        worker_reporter.report(WorkerEvent::ShuttingDown {
265            worker_id: &self.config.worker_id,
266            jobs_released: released,
267        });
268
269        worker_reporter.report(WorkerEvent::Stopped {
270            worker_id: &self.config.worker_id,
271        });
272
273        Ok(())
274    }
275
276    /// Process a single job.
277    async fn process_job<WR, HR>(
278        &self,
279        job: &HarvestJob,
280        cancel_token: &CancellationToken,
281        worker_reporter: &WR,
282        harvest_reporter: &HR,
283    ) where
284        WR: WorkerReporter,
285        HR: ProgressReporter,
286    {
287        worker_reporter.report(WorkerEvent::JobStarted {
288            job_id: job.id,
289            portal_url: &job.portal_url,
290        });
291
292        // Create job-specific cancellation token
293        let job_cancel = cancel_token.child_token();
294
295        // Execute the harvest with cancellation support
296        // TODO: Add portal_type to HarvestJob when Socrata/DCAT support is added
297        let language = job.language.as_deref().unwrap_or("en");
298        let result = self
299            .harvest_service
300            .sync_portal_with_progress_cancellable_with_options(
301                &job.portal_url,
302                job.url_template.as_deref(),
303                language,
304                harvest_reporter,
305                job_cancel.clone(),
306                job.force_full_sync,
307                PortalType::default(),
308            )
309            .await;
310
311        match result {
312            Ok(sync_result) => {
313                if sync_result.is_cancelled() {
314                    // Job was cancelled
315                    worker_reporter.report(WorkerEvent::JobCancelled {
316                        job_id: job.id,
317                        stats: &sync_result.stats,
318                    });
319
320                    if let Err(e) = self.queue.cancel_job(job.id, Some(sync_result.stats)).await {
321                        error!(job_id = %job.id, error = %e, "Failed to mark job as cancelled");
322                    }
323                } else {
324                    // Job completed successfully
325                    worker_reporter.report(WorkerEvent::JobCompleted {
326                        job_id: job.id,
327                        stats: &sync_result.stats,
328                    });
329
330                    if let Err(e) = self.queue.complete_job(job.id, sync_result.stats).await {
331                        error!(job_id = %job.id, error = %e, "Failed to mark job as completed");
332                    }
333                }
334            }
335            Err(e) => {
336                let error_msg = e.to_string();
337                let can_retry = job.can_retry() && e.is_retryable();
338
339                worker_reporter.report(WorkerEvent::JobFailed {
340                    job_id: job.id,
341                    error: &error_msg,
342                    will_retry: can_retry,
343                });
344
345                let next_retry = if can_retry {
346                    Some(job.calculate_next_retry(&self.config.retry_config))
347                } else {
348                    None
349                };
350
351                if let Err(e) = self.queue.fail_job(job.id, &error_msg, next_retry).await {
352                    error!(job_id = %job.id, error = %e, "Failed to mark job as failed");
353                }
354            }
355        }
356    }
357
358    /// Process a single job by ID (for one-off execution, e.g., CLI).
359    ///
360    /// This method is useful for processing a specific job without running
361    /// the full polling loop.
362    pub async fn process_single_job<WR, HR>(
363        &self,
364        job_id: Uuid,
365        cancel_token: CancellationToken,
366        worker_reporter: &WR,
367        harvest_reporter: &HR,
368    ) -> Result<(), AppError>
369    where
370        WR: WorkerReporter,
371        HR: ProgressReporter,
372    {
373        let job = self
374            .queue
375            .get_job(job_id)
376            .await?
377            .ok_or_else(|| AppError::Generic(format!("Job not found: {}", job_id)))?;
378
379        self.process_job(&job, &cancel_token, worker_reporter, harvest_reporter)
380            .await;
381
382        Ok(())
383    }
384}
385
#[cfg(test)]
mod tests {
    use super::*;

    /// The silent reporter must accept any event without side effects.
    #[test]
    fn test_silent_worker_reporter() {
        let reporter = SilentWorkerReporter;
        // Should not panic
        reporter.report(WorkerEvent::Started {
            worker_id: "test-worker",
        });
        reporter.report(WorkerEvent::Polling);
        reporter.report(WorkerEvent::Stopped {
            worker_id: "test-worker",
        });
    }

    /// Smoke test: every event variant routes through the tracing reporter
    /// without panicking (output itself is not asserted).
    #[test]
    fn test_tracing_worker_reporter() {
        let reporter = TracingWorkerReporter;

        // Test all event variants don't panic
        reporter.report(WorkerEvent::Started {
            worker_id: "test-worker",
        });
        reporter.report(WorkerEvent::Polling);

        // Representative stats used by the completed/cancelled variants.
        let stats = SyncStats {
            created: 5,
            updated: 3,
            unchanged: 10,
            failed: 1,
            skipped: 0,
        };
        reporter.report(WorkerEvent::JobCompleted {
            job_id: Uuid::new_v4(),
            stats: &stats,
        });
        // Exercise both the warn (retry) and error (permanent) branches.
        reporter.report(WorkerEvent::JobFailed {
            job_id: Uuid::new_v4(),
            error: "test error",
            will_retry: true,
        });
        reporter.report(WorkerEvent::JobFailed {
            job_id: Uuid::new_v4(),
            error: "fatal error",
            will_retry: false,
        });
        reporter.report(WorkerEvent::JobCancelled {
            job_id: Uuid::new_v4(),
            stats: &stats,
        });
        reporter.report(WorkerEvent::ShuttingDown {
            worker_id: "test-worker",
            jobs_released: 2,
        });
        reporter.report(WorkerEvent::Stopped {
            worker_id: "test-worker",
        });
    }
}
446}