// ceres_core/worker.rs

//! Worker service for processing harvest jobs from the queue.
//!
//! This module provides the [`WorkerService`] that polls for pending jobs
//! and processes them using the [`crate::HarvestPipeline`].
//!
//! # Architecture
//!
//! The worker follows a poll-based model:
//! ```text
//! loop {
//!     1. Check for cancellation
//!     2. Claim next available job (SELECT FOR UPDATE SKIP LOCKED)
//!     3. Process job using HarvestPipeline
//!     4. Update job status (completed/failed/cancelled)
//!     5. If no jobs available, sleep for poll_interval
//! }
//! ```
//!
//! # Graceful Shutdown
//!
//! On cancellation token trigger:
//! - Stops claiming new jobs
//! - Allows current job to complete or cancel gracefully
//! - Releases any claimed jobs back to the queue
//!
//! # Example
//!
//! ```ignore
//! use ceres_core::worker::{WorkerService, WorkerConfig, TracingWorkerReporter};
//! use ceres_core::progress::TracingReporter;
//! use tokio_util::sync::CancellationToken;
//!
//! let worker = WorkerService::new(job_queue, pipeline, WorkerConfig::default());
//! let cancel = CancellationToken::new();
//!
//! worker.run(cancel, &TracingWorkerReporter, &TracingReporter).await?;
//! ```

39use tokio_util::sync::CancellationToken;
40use tracing::{error, info, warn};
41use uuid::Uuid;
42
43use crate::SyncStats;
44use crate::error::AppError;
45use crate::job::{HarvestJob, WorkerConfig};
46use crate::job_queue::JobQueue;
47use crate::pipeline::HarvestPipeline;
48use crate::progress::ProgressReporter;
49use crate::traits::{DatasetStore, EmbeddingProvider, PortalClientFactory};

// =============================================================================
// Worker Events
// =============================================================================

55/// Events emitted by the worker during operation.
56#[derive(Debug, Clone)]
57pub enum WorkerEvent<'a> {
58    /// Worker started and is ready to process jobs.
59    Started { worker_id: &'a str },
60    /// Worker is polling for new jobs.
61    Polling,
62    /// Worker claimed a job.
63    JobClaimed { job: &'a HarvestJob },
64    /// Job processing started.
65    JobStarted { job_id: Uuid, portal_url: &'a str },
66    /// Job completed successfully.
67    JobCompleted { job_id: Uuid, stats: &'a SyncStats },
68    /// Job failed with error.
69    JobFailed {
70        job_id: Uuid,
71        error: &'a str,
72        will_retry: bool,
73    },
74    /// Job was cancelled.
75    JobCancelled { job_id: Uuid, stats: &'a SyncStats },
76    /// Worker is shutting down.
77    ShuttingDown {
78        worker_id: &'a str,
79        jobs_released: u64,
80    },
81    /// Worker stopped.
82    Stopped { worker_id: &'a str },
83}

// =============================================================================
// Worker Reporter Trait
// =============================================================================

89/// Trait for reporting worker events.
90///
91/// Similar to [`ProgressReporter`] but for worker-level events.
92pub trait WorkerReporter: Send + Sync {
93    /// Called when a worker event occurs.
94    ///
95    /// The default implementation does nothing (silent mode).
96    fn report(&self, event: WorkerEvent<'_>) {
97        let _ = event;
98    }
99}
100
101/// Silent worker reporter that ignores all events.
102#[derive(Debug, Default, Clone, Copy)]
103pub struct SilentWorkerReporter;
104
105impl WorkerReporter for SilentWorkerReporter {}
106
107/// Tracing-based worker reporter for CLI/server logging.
108#[derive(Debug, Default, Clone, Copy)]
109pub struct TracingWorkerReporter;
110
111impl WorkerReporter for TracingWorkerReporter {
112    fn report(&self, event: WorkerEvent<'_>) {
113        match event {
114            WorkerEvent::Started { worker_id } => {
115                info!(worker_id, "Worker started");
116            }
117            WorkerEvent::Polling => {
118                // Debug level to avoid spam
119                tracing::debug!("Polling for jobs...");
120            }
121            WorkerEvent::JobClaimed { job } => {
122                info!(job_id = %job.id, portal = %job.portal_url, "Job claimed");
123            }
124            WorkerEvent::JobStarted { job_id, portal_url } => {
125                info!(%job_id, portal = portal_url, "Processing job");
126            }
127            WorkerEvent::JobCompleted { job_id, stats } => {
128                info!(
129                    %job_id,
130                    created = stats.created,
131                    updated = stats.updated,
132                    unchanged = stats.unchanged,
133                    failed = stats.failed,
134                    "Job completed"
135                );
136            }
137            WorkerEvent::JobFailed {
138                job_id,
139                error,
140                will_retry,
141            } => {
142                if will_retry {
143                    warn!(%job_id, %error, "Job failed, will retry");
144                } else {
145                    error!(%job_id, %error, "Job permanently failed");
146                }
147            }
148            WorkerEvent::JobCancelled { job_id, stats } => {
149                info!(%job_id, processed = stats.total(), "Job cancelled");
150            }
151            WorkerEvent::ShuttingDown {
152                worker_id,
153                jobs_released,
154            } => {
155                info!(worker_id, jobs_released, "Worker shutting down");
156            }
157            WorkerEvent::Stopped { worker_id } => {
158                info!(worker_id, "Worker stopped");
159            }
160        }
161    }
162}

// =============================================================================
// Worker Service
// =============================================================================

168/// Worker service that processes jobs from the queue.
169///
170/// This service polls for pending jobs and processes them using the
171/// [`HarvestPipeline`]. It supports both full harvest+embed and metadata-only
172/// modes, and graceful shutdown via cancellation tokens.
173pub struct WorkerService<Q, S, E, F>
174where
175    Q: JobQueue,
176    S: DatasetStore,
177    E: EmbeddingProvider,
178    F: PortalClientFactory,
179{
180    queue: Q,
181    pipeline: HarvestPipeline<S, E, F>,
182    config: WorkerConfig,
183}
184
185impl<Q, S, E, F> WorkerService<Q, S, E, F>
186where
187    Q: JobQueue,
188    S: DatasetStore + Clone,
189    E: EmbeddingProvider,
190    F: PortalClientFactory,
191{
192    /// Create a new worker service.
193    pub fn new(queue: Q, pipeline: HarvestPipeline<S, E, F>, config: WorkerConfig) -> Self {
194        Self {
195            queue,
196            pipeline,
197            config,
198        }
199    }
200
201    /// Run the worker until cancelled.
202    ///
203    /// The worker will:
204    /// 1. Poll for available jobs
205    /// 2. Claim and process jobs
206    /// 3. Handle retries on failure
207    /// 4. Release jobs on graceful shutdown
208    pub async fn run<WR, HR>(
209        &self,
210        cancel_token: CancellationToken,
211        worker_reporter: &WR,
212        harvest_reporter: &HR,
213    ) -> Result<(), AppError>
214    where
215        WR: WorkerReporter,
216        HR: ProgressReporter,
217    {
218        worker_reporter.report(WorkerEvent::Started {
219            worker_id: &self.config.worker_id,
220        });
221
222        loop {
223            // Check for shutdown
224            if cancel_token.is_cancelled() {
225                break;
226            }
227
228            // Poll for jobs
229            worker_reporter.report(WorkerEvent::Polling);
230
231            match self.queue.claim_job(&self.config.worker_id).await {
232                Ok(Some(job)) => {
233                    worker_reporter.report(WorkerEvent::JobClaimed { job: &job });
234
235                    // Process the job
236                    self.process_job(&job, &cancel_token, worker_reporter, harvest_reporter)
237                        .await;
238                }
239                Ok(None) => {
240                    // No jobs available, wait before polling again
241                    tokio::select! {
242                        _ = tokio::time::sleep(self.config.poll_interval) => {}
243                        _ = cancel_token.cancelled() => break,
244                    }
245                }
246                Err(e) => {
247                    error!(error = %e, "Failed to claim job");
248                    // Back off on errors
249                    tokio::time::sleep(self.config.poll_interval * 2).await;
250                }
251            }
252        }
253
254        // Graceful shutdown: release any jobs this worker claimed
255        let released = self
256            .queue
257            .release_worker_jobs(&self.config.worker_id)
258            .await
259            .unwrap_or_else(|e| {
260                tracing::error!("Failed to release worker jobs on shutdown: {}", e);
261                0
262            });
263
264        worker_reporter.report(WorkerEvent::ShuttingDown {
265            worker_id: &self.config.worker_id,
266            jobs_released: released,
267        });
268
269        worker_reporter.report(WorkerEvent::Stopped {
270            worker_id: &self.config.worker_id,
271        });
272
273        Ok(())
274    }
275
276    /// Process a single job.
277    async fn process_job<WR, HR>(
278        &self,
279        job: &HarvestJob,
280        cancel_token: &CancellationToken,
281        worker_reporter: &WR,
282        harvest_reporter: &HR,
283    ) where
284        WR: WorkerReporter,
285        HR: ProgressReporter,
286    {
287        worker_reporter.report(WorkerEvent::JobStarted {
288            job_id: job.id,
289            portal_url: &job.portal_url,
290        });
291
292        // Create job-specific cancellation token
293        let job_cancel = cancel_token.child_token();
294
295        // Execute the harvest + embed pipeline with cancellation support.
296        // The pipeline first harvests metadata, then embeds pending datasets.
297        let language = job.language.as_deref().unwrap_or("en");
298        let result = self
299            .pipeline
300            .sync_portal_with_progress_cancellable_with_options(
301                &job.portal_url,
302                job.url_template.as_deref(),
303                language,
304                harvest_reporter,
305                job_cancel.clone(),
306                job.force_full_sync,
307                job.portal_type,
308                job.profile.as_deref(),
309                job.sparql_endpoint.as_deref(),
310            )
311            .await;
312
313        match result {
314            Ok((sync_result, _embed_stats)) => {
315                if sync_result.is_cancelled() {
316                    // Job was cancelled
317                    worker_reporter.report(WorkerEvent::JobCancelled {
318                        job_id: job.id,
319                        stats: &sync_result.stats,
320                    });
321
322                    if let Err(e) = self.queue.cancel_job(job.id, Some(sync_result.stats)).await {
323                        error!(job_id = %job.id, error = %e, "Failed to mark job as cancelled");
324                    }
325                } else {
326                    // Job completed successfully
327                    worker_reporter.report(WorkerEvent::JobCompleted {
328                        job_id: job.id,
329                        stats: &sync_result.stats,
330                    });
331
332                    if let Err(e) = self.queue.complete_job(job.id, sync_result.stats).await {
333                        error!(job_id = %job.id, error = %e, "Failed to mark job as completed");
334                    }
335                }
336            }
337            Err(e) => {
338                let error_msg = e.to_string();
339                let can_retry = job.can_retry() && e.is_retryable();
340
341                worker_reporter.report(WorkerEvent::JobFailed {
342                    job_id: job.id,
343                    error: &error_msg,
344                    will_retry: can_retry,
345                });
346
347                let next_retry = if can_retry {
348                    Some(job.calculate_next_retry(&self.config.retry_config))
349                } else {
350                    None
351                };
352
353                if let Err(e) = self.queue.fail_job(job.id, &error_msg, next_retry).await {
354                    error!(job_id = %job.id, error = %e, "Failed to mark job as failed");
355                }
356            }
357        }
358    }
359
360    /// Process a single job by ID (for one-off execution, e.g., CLI).
361    ///
362    /// This method is useful for processing a specific job without running
363    /// the full polling loop.
364    pub async fn process_single_job<WR, HR>(
365        &self,
366        job_id: Uuid,
367        cancel_token: CancellationToken,
368        worker_reporter: &WR,
369        harvest_reporter: &HR,
370    ) -> Result<(), AppError>
371    where
372        WR: WorkerReporter,
373        HR: ProgressReporter,
374    {
375        let job = self
376            .queue
377            .get_job(job_id)
378            .await?
379            .ok_or_else(|| AppError::Generic(format!("Job not found: {}", job_id)))?;
380
381        self.process_job(&job, &cancel_token, worker_reporter, harvest_reporter)
382            .await;
383
384        Ok(())
385    }
386}
387
#[cfg(test)]
mod tests {
    use super::*;

    /// Representative stats shared by the reporter tests.
    fn sample_stats() -> SyncStats {
        SyncStats {
            created: 5,
            updated: 3,
            unchanged: 10,
            failed: 1,
            skipped: 0,
        }
    }

    #[test]
    fn test_silent_worker_reporter() {
        let reporter = SilentWorkerReporter;
        let stats = sample_stats();

        // Every event that can be built without a `HarvestJob` must be
        // accepted without panicking.
        reporter.report(WorkerEvent::Started {
            worker_id: "test-worker",
        });
        reporter.report(WorkerEvent::Polling);
        reporter.report(WorkerEvent::JobStarted {
            job_id: Uuid::new_v4(),
            portal_url: "https://example.org",
        });
        reporter.report(WorkerEvent::JobCompleted {
            job_id: Uuid::new_v4(),
            stats: &stats,
        });
        reporter.report(WorkerEvent::JobFailed {
            job_id: Uuid::new_v4(),
            error: "test error",
            will_retry: true,
        });
        reporter.report(WorkerEvent::JobCancelled {
            job_id: Uuid::new_v4(),
            stats: &stats,
        });
        reporter.report(WorkerEvent::ShuttingDown {
            worker_id: "test-worker",
            jobs_released: 0,
        });
        reporter.report(WorkerEvent::Stopped {
            worker_id: "test-worker",
        });
    }

    #[test]
    fn test_tracing_worker_reporter() {
        let reporter = TracingWorkerReporter;
        let stats = sample_stats();

        // All event variants must log without panicking. `JobClaimed` is not
        // exercised here because it needs a fully built `HarvestJob`.
        reporter.report(WorkerEvent::Started {
            worker_id: "test-worker",
        });
        reporter.report(WorkerEvent::Polling);
        reporter.report(WorkerEvent::JobStarted {
            job_id: Uuid::new_v4(),
            portal_url: "https://example.org",
        });
        reporter.report(WorkerEvent::JobCompleted {
            job_id: Uuid::new_v4(),
            stats: &stats,
        });
        reporter.report(WorkerEvent::JobFailed {
            job_id: Uuid::new_v4(),
            error: "test error",
            will_retry: true,
        });
        reporter.report(WorkerEvent::JobFailed {
            job_id: Uuid::new_v4(),
            error: "fatal error",
            will_retry: false,
        });
        reporter.report(WorkerEvent::JobCancelled {
            job_id: Uuid::new_v4(),
            stats: &stats,
        });
        reporter.report(WorkerEvent::ShuttingDown {
            worker_id: "test-worker",
            jobs_released: 2,
        });
        reporter.report(WorkerEvent::Stopped {
            worker_id: "test-worker",
        });
    }
}