Skip to main content

ceres_core/
worker.rs

1//! Worker service for processing harvest jobs from the queue.
2//!
3//! This module provides the [`WorkerService`] that polls for pending jobs
4//! and processes them using the [`crate::HarvestPipeline`].
5//!
6//! # Architecture
7//!
8//! The worker follows a poll-based model:
9//! ```text
10//! loop {
11//!     1. Check for cancellation
12//!     2. Claim next available job (SELECT FOR UPDATE SKIP LOCKED)
13//!     3. Process job using HarvestPipeline
14//!     4. Update job status (completed/failed/cancelled)
15//!     5. If no jobs available, sleep for poll_interval
16//! }
17//! ```
18//!
19//! # Graceful Shutdown
20//!
21//! On cancellation token trigger:
22//! - Stops claiming new jobs
23//! - Allows current job to complete or cancel gracefully
24//! - Releases any claimed jobs back to the queue
25//!
26//! # Example
27//!
28//! ```ignore
29//! use ceres_core::worker::{WorkerService, WorkerConfig, TracingWorkerReporter};
30//! use ceres_core::progress::TracingReporter;
31//! use tokio_util::sync::CancellationToken;
32//!
33//! let worker = WorkerService::new(job_queue, harvest_service, WorkerConfig::default());
34//! let cancel = CancellationToken::new();
35//!
36//! worker.run(cancel, &TracingWorkerReporter, &TracingReporter).await?;
37//! ```
38
39use tokio_util::sync::CancellationToken;
40use tracing::{error, info, warn};
41use uuid::Uuid;
42
43use crate::SyncStats;
44use crate::config::PortalType;
45use crate::error::AppError;
46use crate::job::{HarvestJob, WorkerConfig};
47use crate::job_queue::JobQueue;
48use crate::pipeline::HarvestPipeline;
49use crate::progress::ProgressReporter;
50use crate::traits::{DatasetStore, EmbeddingProvider, PortalClientFactory};
51
52// =============================================================================
53// Worker Events
54// =============================================================================
55
/// Events emitted by the worker during operation.
///
/// Variants borrow (`'a`) from worker/job state (`worker_id`, the claimed
/// [`HarvestJob`], error text, [`SyncStats`]) rather than owning copies.
#[derive(Debug, Clone)]
pub enum WorkerEvent<'a> {
    /// Worker started and is ready to process jobs.
    Started { worker_id: &'a str },
    /// Worker is polling for new jobs.
    Polling,
    /// Worker claimed a job.
    JobClaimed { job: &'a HarvestJob },
    /// Job processing started.
    JobStarted { job_id: Uuid, portal_url: &'a str },
    /// Job completed successfully.
    JobCompleted { job_id: Uuid, stats: &'a SyncStats },
    /// Job failed with error.
    JobFailed {
        job_id: Uuid,
        /// Human-readable error message from the pipeline.
        error: &'a str,
        /// Whether the job will be re-queued for another attempt.
        will_retry: bool,
    },
    /// Job was cancelled.
    JobCancelled { job_id: Uuid, stats: &'a SyncStats },
    /// Worker is shutting down.
    ShuttingDown {
        worker_id: &'a str,
        /// Number of claimed jobs released back to the queue on shutdown.
        jobs_released: u64,
    },
    /// Worker stopped.
    Stopped { worker_id: &'a str },
}
85
86// =============================================================================
87// Worker Reporter Trait
88// =============================================================================
89
/// Trait for reporting worker events.
///
/// Similar to [`ProgressReporter`] but for worker-level events.
/// Implementors must be `Send + Sync` so a reporter can be shared with the
/// async worker loop.
pub trait WorkerReporter: Send + Sync {
    /// Called when a worker event occurs.
    ///
    /// The default implementation does nothing (silent mode).
    fn report(&self, event: WorkerEvent<'_>) {
        // Explicitly discard the event so the default impl is a no-op
        // without an unused-variable warning.
        let _ = event;
    }
}
101
/// Silent worker reporter that ignores all events.
///
/// Relies entirely on the trait's default no-op [`WorkerReporter::report`].
#[derive(Debug, Default, Clone, Copy)]
pub struct SilentWorkerReporter;

impl WorkerReporter for SilentWorkerReporter {}
107
108/// Tracing-based worker reporter for CLI/server logging.
109#[derive(Debug, Default, Clone, Copy)]
110pub struct TracingWorkerReporter;
111
112impl WorkerReporter for TracingWorkerReporter {
113    fn report(&self, event: WorkerEvent<'_>) {
114        match event {
115            WorkerEvent::Started { worker_id } => {
116                info!(worker_id, "Worker started");
117            }
118            WorkerEvent::Polling => {
119                // Debug level to avoid spam
120                tracing::debug!("Polling for jobs...");
121            }
122            WorkerEvent::JobClaimed { job } => {
123                info!(job_id = %job.id, portal = %job.portal_url, "Job claimed");
124            }
125            WorkerEvent::JobStarted { job_id, portal_url } => {
126                info!(%job_id, portal = portal_url, "Processing job");
127            }
128            WorkerEvent::JobCompleted { job_id, stats } => {
129                info!(
130                    %job_id,
131                    created = stats.created,
132                    updated = stats.updated,
133                    unchanged = stats.unchanged,
134                    failed = stats.failed,
135                    "Job completed"
136                );
137            }
138            WorkerEvent::JobFailed {
139                job_id,
140                error,
141                will_retry,
142            } => {
143                if will_retry {
144                    warn!(%job_id, %error, "Job failed, will retry");
145                } else {
146                    error!(%job_id, %error, "Job permanently failed");
147                }
148            }
149            WorkerEvent::JobCancelled { job_id, stats } => {
150                info!(%job_id, processed = stats.total(), "Job cancelled");
151            }
152            WorkerEvent::ShuttingDown {
153                worker_id,
154                jobs_released,
155            } => {
156                info!(worker_id, jobs_released, "Worker shutting down");
157            }
158            WorkerEvent::Stopped { worker_id } => {
159                info!(worker_id, "Worker stopped");
160            }
161        }
162    }
163}
164
165// =============================================================================
166// Worker Service
167// =============================================================================
168
/// Worker service that processes jobs from the queue.
///
/// This service polls for pending jobs and processes them using the
/// [`HarvestPipeline`]. It supports both full harvest+embed and metadata-only
/// modes, and graceful shutdown via cancellation tokens.
pub struct WorkerService<Q, S, E, F>
where
    Q: JobQueue,
    S: DatasetStore,
    E: EmbeddingProvider,
    F: PortalClientFactory,
{
    /// Queue used to claim, complete, fail, cancel, and release jobs.
    queue: Q,
    /// Pipeline that performs the harvest + embed work for a claimed job.
    pipeline: HarvestPipeline<S, E, F>,
    /// Worker identity, poll interval, and retry configuration.
    config: WorkerConfig,
}
185
186impl<Q, S, E, F> WorkerService<Q, S, E, F>
187where
188    Q: JobQueue,
189    S: DatasetStore + Clone,
190    E: EmbeddingProvider,
191    F: PortalClientFactory,
192{
193    /// Create a new worker service.
194    pub fn new(queue: Q, pipeline: HarvestPipeline<S, E, F>, config: WorkerConfig) -> Self {
195        Self {
196            queue,
197            pipeline,
198            config,
199        }
200    }
201
202    /// Run the worker until cancelled.
203    ///
204    /// The worker will:
205    /// 1. Poll for available jobs
206    /// 2. Claim and process jobs
207    /// 3. Handle retries on failure
208    /// 4. Release jobs on graceful shutdown
209    pub async fn run<WR, HR>(
210        &self,
211        cancel_token: CancellationToken,
212        worker_reporter: &WR,
213        harvest_reporter: &HR,
214    ) -> Result<(), AppError>
215    where
216        WR: WorkerReporter,
217        HR: ProgressReporter,
218    {
219        worker_reporter.report(WorkerEvent::Started {
220            worker_id: &self.config.worker_id,
221        });
222
223        loop {
224            // Check for shutdown
225            if cancel_token.is_cancelled() {
226                break;
227            }
228
229            // Poll for jobs
230            worker_reporter.report(WorkerEvent::Polling);
231
232            match self.queue.claim_job(&self.config.worker_id).await {
233                Ok(Some(job)) => {
234                    worker_reporter.report(WorkerEvent::JobClaimed { job: &job });
235
236                    // Process the job
237                    self.process_job(&job, &cancel_token, worker_reporter, harvest_reporter)
238                        .await;
239                }
240                Ok(None) => {
241                    // No jobs available, wait before polling again
242                    tokio::select! {
243                        _ = tokio::time::sleep(self.config.poll_interval) => {}
244                        _ = cancel_token.cancelled() => break,
245                    }
246                }
247                Err(e) => {
248                    error!(error = %e, "Failed to claim job");
249                    // Back off on errors
250                    tokio::time::sleep(self.config.poll_interval * 2).await;
251                }
252            }
253        }
254
255        // Graceful shutdown: release any jobs this worker claimed
256        let released = self
257            .queue
258            .release_worker_jobs(&self.config.worker_id)
259            .await
260            .unwrap_or_else(|e| {
261                tracing::error!("Failed to release worker jobs on shutdown: {}", e);
262                0
263            });
264
265        worker_reporter.report(WorkerEvent::ShuttingDown {
266            worker_id: &self.config.worker_id,
267            jobs_released: released,
268        });
269
270        worker_reporter.report(WorkerEvent::Stopped {
271            worker_id: &self.config.worker_id,
272        });
273
274        Ok(())
275    }
276
277    /// Process a single job.
278    async fn process_job<WR, HR>(
279        &self,
280        job: &HarvestJob,
281        cancel_token: &CancellationToken,
282        worker_reporter: &WR,
283        harvest_reporter: &HR,
284    ) where
285        WR: WorkerReporter,
286        HR: ProgressReporter,
287    {
288        worker_reporter.report(WorkerEvent::JobStarted {
289            job_id: job.id,
290            portal_url: &job.portal_url,
291        });
292
293        // Create job-specific cancellation token
294        let job_cancel = cancel_token.child_token();
295
296        // Execute the harvest + embed pipeline with cancellation support.
297        // The pipeline first harvests metadata, then embeds pending datasets.
298        // TODO: Add portal_type to HarvestJob when Socrata/DCAT support is added
299        let language = job.language.as_deref().unwrap_or("en");
300        let result = self
301            .pipeline
302            .sync_portal_with_progress_cancellable_with_options(
303                &job.portal_url,
304                job.url_template.as_deref(),
305                language,
306                harvest_reporter,
307                job_cancel.clone(),
308                job.force_full_sync,
309                PortalType::default(),
310            )
311            .await;
312
313        match result {
314            Ok((sync_result, _embed_stats)) => {
315                if sync_result.is_cancelled() {
316                    // Job was cancelled
317                    worker_reporter.report(WorkerEvent::JobCancelled {
318                        job_id: job.id,
319                        stats: &sync_result.stats,
320                    });
321
322                    if let Err(e) = self.queue.cancel_job(job.id, Some(sync_result.stats)).await {
323                        error!(job_id = %job.id, error = %e, "Failed to mark job as cancelled");
324                    }
325                } else {
326                    // Job completed successfully
327                    worker_reporter.report(WorkerEvent::JobCompleted {
328                        job_id: job.id,
329                        stats: &sync_result.stats,
330                    });
331
332                    if let Err(e) = self.queue.complete_job(job.id, sync_result.stats).await {
333                        error!(job_id = %job.id, error = %e, "Failed to mark job as completed");
334                    }
335                }
336            }
337            Err(e) => {
338                let error_msg = e.to_string();
339                let can_retry = job.can_retry() && e.is_retryable();
340
341                worker_reporter.report(WorkerEvent::JobFailed {
342                    job_id: job.id,
343                    error: &error_msg,
344                    will_retry: can_retry,
345                });
346
347                let next_retry = if can_retry {
348                    Some(job.calculate_next_retry(&self.config.retry_config))
349                } else {
350                    None
351                };
352
353                if let Err(e) = self.queue.fail_job(job.id, &error_msg, next_retry).await {
354                    error!(job_id = %job.id, error = %e, "Failed to mark job as failed");
355                }
356            }
357        }
358    }
359
360    /// Process a single job by ID (for one-off execution, e.g., CLI).
361    ///
362    /// This method is useful for processing a specific job without running
363    /// the full polling loop.
364    pub async fn process_single_job<WR, HR>(
365        &self,
366        job_id: Uuid,
367        cancel_token: CancellationToken,
368        worker_reporter: &WR,
369        harvest_reporter: &HR,
370    ) -> Result<(), AppError>
371    where
372        WR: WorkerReporter,
373        HR: ProgressReporter,
374    {
375        let job = self
376            .queue
377            .get_job(job_id)
378            .await?
379            .ok_or_else(|| AppError::Generic(format!("Job not found: {}", job_id)))?;
380
381        self.process_job(&job, &cancel_token, worker_reporter, harvest_reporter)
382            .await;
383
384        Ok(())
385    }
386}
387
#[cfg(test)]
mod tests {
    use super::*;

    /// Sample stats shared by the reporter smoke tests.
    fn sample_stats() -> SyncStats {
        SyncStats {
            created: 5,
            updated: 3,
            unchanged: 10,
            failed: 1,
            skipped: 0,
        }
    }

    #[test]
    fn test_silent_worker_reporter() {
        // The silent reporter must accept events without panicking.
        let silent = SilentWorkerReporter;
        silent.report(WorkerEvent::Started {
            worker_id: "test-worker",
        });
        silent.report(WorkerEvent::Polling);
        silent.report(WorkerEvent::Stopped {
            worker_id: "test-worker",
        });
    }

    #[test]
    fn test_tracing_worker_reporter() {
        // Every event variant must be loggable without panicking.
        let tracer = TracingWorkerReporter;

        tracer.report(WorkerEvent::Started {
            worker_id: "test-worker",
        });
        tracer.report(WorkerEvent::Polling);

        let stats = sample_stats();
        tracer.report(WorkerEvent::JobCompleted {
            job_id: Uuid::new_v4(),
            stats: &stats,
        });
        tracer.report(WorkerEvent::JobFailed {
            job_id: Uuid::new_v4(),
            error: "test error",
            will_retry: true,
        });
        tracer.report(WorkerEvent::JobFailed {
            job_id: Uuid::new_v4(),
            error: "fatal error",
            will_retry: false,
        });
        tracer.report(WorkerEvent::JobCancelled {
            job_id: Uuid::new_v4(),
            stats: &stats,
        });
        tracer.report(WorkerEvent::ShuttingDown {
            worker_id: "test-worker",
            jobs_released: 2,
        });
        tracer.report(WorkerEvent::Stopped {
            worker_id: "test-worker",
        });
    }
}