Skip to main content

ceres_core/
worker.rs

1//! Worker service for processing harvest jobs from the queue.
2//!
3//! This module provides the [`WorkerService`] that polls for pending jobs
4//! and processes them using the [`HarvestService`].
5//!
6//! # Architecture
7//!
8//! The worker follows a poll-based model:
9//! ```text
10//! loop {
11//!     1. Check for cancellation
12//!     2. Claim next available job (SELECT FOR UPDATE SKIP LOCKED)
13//!     3. Process job using HarvestService
14//!     4. Update job status (completed/failed/cancelled)
15//!     5. If no jobs available, sleep for poll_interval
16//! }
17//! ```
18//!
19//! # Graceful Shutdown
20//!
21//! On cancellation token trigger:
22//! - Stops claiming new jobs
23//! - Allows current job to complete or cancel gracefully
24//! - Releases any claimed jobs back to the queue
25//!
26//! # Example
27//!
28//! ```ignore
29//! use ceres_core::worker::{WorkerService, WorkerConfig, TracingWorkerReporter};
30//! use ceres_core::progress::TracingReporter;
31//! use tokio_util::sync::CancellationToken;
32//!
33//! let worker = WorkerService::new(job_queue, harvest_service, WorkerConfig::default());
34//! let cancel = CancellationToken::new();
35//!
36//! worker.run(cancel, &TracingWorkerReporter, &TracingReporter).await?;
37//! ```
38
39use tokio_util::sync::CancellationToken;
40use tracing::{error, info, warn};
41use uuid::Uuid;
42
43use crate::SyncStats;
44use crate::error::AppError;
45use crate::harvest::HarvestService;
46use crate::job::{HarvestJob, WorkerConfig};
47use crate::job_queue::JobQueue;
48use crate::progress::ProgressReporter;
49use crate::traits::{DatasetStore, EmbeddingProvider, PortalClientFactory};
50
51// =============================================================================
52// Worker Events
53// =============================================================================
54
55/// Events emitted by the worker during operation.
56#[derive(Debug, Clone)]
57pub enum WorkerEvent<'a> {
58    /// Worker started and is ready to process jobs.
59    Started { worker_id: &'a str },
60    /// Worker is polling for new jobs.
61    Polling,
62    /// Worker claimed a job.
63    JobClaimed { job: &'a HarvestJob },
64    /// Job processing started.
65    JobStarted { job_id: Uuid, portal_url: &'a str },
66    /// Job completed successfully.
67    JobCompleted { job_id: Uuid, stats: &'a SyncStats },
68    /// Job failed with error.
69    JobFailed {
70        job_id: Uuid,
71        error: &'a str,
72        will_retry: bool,
73    },
74    /// Job was cancelled.
75    JobCancelled { job_id: Uuid, stats: &'a SyncStats },
76    /// Worker is shutting down.
77    ShuttingDown {
78        worker_id: &'a str,
79        jobs_released: u64,
80    },
81    /// Worker stopped.
82    Stopped { worker_id: &'a str },
83}
84
85// =============================================================================
86// Worker Reporter Trait
87// =============================================================================
88
89/// Trait for reporting worker events.
90///
91/// Similar to [`ProgressReporter`] but for worker-level events.
92pub trait WorkerReporter: Send + Sync {
93    /// Called when a worker event occurs.
94    ///
95    /// The default implementation does nothing (silent mode).
96    fn report(&self, event: WorkerEvent<'_>) {
97        let _ = event;
98    }
99}
100
101/// Silent worker reporter that ignores all events.
102#[derive(Debug, Default, Clone, Copy)]
103pub struct SilentWorkerReporter;
104
105impl WorkerReporter for SilentWorkerReporter {}
106
107/// Tracing-based worker reporter for CLI/server logging.
108#[derive(Debug, Default, Clone, Copy)]
109pub struct TracingWorkerReporter;
110
111impl WorkerReporter for TracingWorkerReporter {
112    fn report(&self, event: WorkerEvent<'_>) {
113        match event {
114            WorkerEvent::Started { worker_id } => {
115                info!(worker_id, "Worker started");
116            }
117            WorkerEvent::Polling => {
118                // Debug level to avoid spam
119                tracing::debug!("Polling for jobs...");
120            }
121            WorkerEvent::JobClaimed { job } => {
122                info!(job_id = %job.id, portal = %job.portal_url, "Job claimed");
123            }
124            WorkerEvent::JobStarted { job_id, portal_url } => {
125                info!(%job_id, portal = portal_url, "Processing job");
126            }
127            WorkerEvent::JobCompleted { job_id, stats } => {
128                info!(
129                    %job_id,
130                    created = stats.created,
131                    updated = stats.updated,
132                    unchanged = stats.unchanged,
133                    failed = stats.failed,
134                    "Job completed"
135                );
136            }
137            WorkerEvent::JobFailed {
138                job_id,
139                error,
140                will_retry,
141            } => {
142                if will_retry {
143                    warn!(%job_id, %error, "Job failed, will retry");
144                } else {
145                    error!(%job_id, %error, "Job permanently failed");
146                }
147            }
148            WorkerEvent::JobCancelled { job_id, stats } => {
149                info!(%job_id, processed = stats.total(), "Job cancelled");
150            }
151            WorkerEvent::ShuttingDown {
152                worker_id,
153                jobs_released,
154            } => {
155                info!(worker_id, jobs_released, "Worker shutting down");
156            }
157            WorkerEvent::Stopped { worker_id } => {
158                info!(worker_id, "Worker stopped");
159            }
160        }
161    }
162}
163
164// =============================================================================
165// Worker Service
166// =============================================================================
167
168/// Worker service that processes jobs from the queue.
169///
170/// This service polls for pending jobs and processes them using the
171/// [`HarvestService`]. It supports graceful shutdown via cancellation tokens.
172pub struct WorkerService<Q, S, E, F>
173where
174    Q: JobQueue,
175    S: DatasetStore,
176    E: EmbeddingProvider,
177    F: PortalClientFactory,
178{
179    queue: Q,
180    harvest_service: HarvestService<S, E, F>,
181    config: WorkerConfig,
182}
183
184impl<Q, S, E, F> WorkerService<Q, S, E, F>
185where
186    Q: JobQueue,
187    S: DatasetStore,
188    E: EmbeddingProvider,
189    F: PortalClientFactory,
190{
191    /// Create a new worker service.
192    pub fn new(queue: Q, harvest_service: HarvestService<S, E, F>, config: WorkerConfig) -> Self {
193        Self {
194            queue,
195            harvest_service,
196            config,
197        }
198    }
199
200    /// Run the worker until cancelled.
201    ///
202    /// The worker will:
203    /// 1. Poll for available jobs
204    /// 2. Claim and process jobs
205    /// 3. Handle retries on failure
206    /// 4. Release jobs on graceful shutdown
207    pub async fn run<WR, HR>(
208        &self,
209        cancel_token: CancellationToken,
210        worker_reporter: &WR,
211        harvest_reporter: &HR,
212    ) -> Result<(), AppError>
213    where
214        WR: WorkerReporter,
215        HR: ProgressReporter,
216    {
217        worker_reporter.report(WorkerEvent::Started {
218            worker_id: &self.config.worker_id,
219        });
220
221        loop {
222            // Check for shutdown
223            if cancel_token.is_cancelled() {
224                break;
225            }
226
227            // Poll for jobs
228            worker_reporter.report(WorkerEvent::Polling);
229
230            match self.queue.claim_job(&self.config.worker_id).await {
231                Ok(Some(job)) => {
232                    worker_reporter.report(WorkerEvent::JobClaimed { job: &job });
233
234                    // Process the job
235                    self.process_job(&job, &cancel_token, worker_reporter, harvest_reporter)
236                        .await;
237                }
238                Ok(None) => {
239                    // No jobs available, wait before polling again
240                    tokio::select! {
241                        _ = tokio::time::sleep(self.config.poll_interval) => {}
242                        _ = cancel_token.cancelled() => break,
243                    }
244                }
245                Err(e) => {
246                    error!(error = %e, "Failed to claim job");
247                    // Back off on errors
248                    tokio::time::sleep(self.config.poll_interval * 2).await;
249                }
250            }
251        }
252
253        // Graceful shutdown: release any jobs this worker claimed
254        let released = self
255            .queue
256            .release_worker_jobs(&self.config.worker_id)
257            .await
258            .unwrap_or(0);
259
260        worker_reporter.report(WorkerEvent::ShuttingDown {
261            worker_id: &self.config.worker_id,
262            jobs_released: released,
263        });
264
265        worker_reporter.report(WorkerEvent::Stopped {
266            worker_id: &self.config.worker_id,
267        });
268
269        Ok(())
270    }
271
272    /// Process a single job.
273    async fn process_job<WR, HR>(
274        &self,
275        job: &HarvestJob,
276        cancel_token: &CancellationToken,
277        worker_reporter: &WR,
278        harvest_reporter: &HR,
279    ) where
280        WR: WorkerReporter,
281        HR: ProgressReporter,
282    {
283        worker_reporter.report(WorkerEvent::JobStarted {
284            job_id: job.id,
285            portal_url: &job.portal_url,
286        });
287
288        // Create job-specific cancellation token
289        let job_cancel = cancel_token.child_token();
290
291        // Execute the harvest with cancellation support
292        let result = self
293            .harvest_service
294            .sync_portal_with_progress_cancellable_with_options(
295                &job.portal_url,
296                harvest_reporter,
297                job_cancel.clone(),
298                job.force_full_sync,
299            )
300            .await;
301
302        match result {
303            Ok(sync_result) => {
304                if sync_result.is_cancelled() {
305                    // Job was cancelled
306                    worker_reporter.report(WorkerEvent::JobCancelled {
307                        job_id: job.id,
308                        stats: &sync_result.stats,
309                    });
310
311                    if let Err(e) = self.queue.cancel_job(job.id, Some(sync_result.stats)).await {
312                        error!(job_id = %job.id, error = %e, "Failed to mark job as cancelled");
313                    }
314                } else {
315                    // Job completed successfully
316                    worker_reporter.report(WorkerEvent::JobCompleted {
317                        job_id: job.id,
318                        stats: &sync_result.stats,
319                    });
320
321                    if let Err(e) = self.queue.complete_job(job.id, sync_result.stats).await {
322                        error!(job_id = %job.id, error = %e, "Failed to mark job as completed");
323                    }
324                }
325            }
326            Err(e) => {
327                let error_msg = e.to_string();
328                let can_retry = job.can_retry() && e.is_retryable();
329
330                worker_reporter.report(WorkerEvent::JobFailed {
331                    job_id: job.id,
332                    error: &error_msg,
333                    will_retry: can_retry,
334                });
335
336                let next_retry = if can_retry {
337                    Some(job.calculate_next_retry(&self.config.retry_config))
338                } else {
339                    None
340                };
341
342                if let Err(e) = self.queue.fail_job(job.id, &error_msg, next_retry).await {
343                    error!(job_id = %job.id, error = %e, "Failed to mark job as failed");
344                }
345            }
346        }
347    }
348
349    /// Process a single job by ID (for one-off execution, e.g., CLI).
350    ///
351    /// This method is useful for processing a specific job without running
352    /// the full polling loop.
353    pub async fn process_single_job<WR, HR>(
354        &self,
355        job_id: Uuid,
356        cancel_token: CancellationToken,
357        worker_reporter: &WR,
358        harvest_reporter: &HR,
359    ) -> Result<(), AppError>
360    where
361        WR: WorkerReporter,
362        HR: ProgressReporter,
363    {
364        let job = self
365            .queue
366            .get_job(job_id)
367            .await?
368            .ok_or_else(|| AppError::Generic(format!("Job not found: {}", job_id)))?;
369
370        self.process_job(&job, &cancel_token, worker_reporter, harvest_reporter)
371            .await;
372
373        Ok(())
374    }
375}
376
#[cfg(test)]
mod tests {
    use super::*;

    /// Worker id shared by the reporter smoke tests.
    const WORKER_ID: &str = "test-worker";

    /// The silent reporter must accept events without panicking.
    #[test]
    fn test_silent_worker_reporter() {
        let reporter = SilentWorkerReporter;

        let events = [
            WorkerEvent::Started {
                worker_id: WORKER_ID,
            },
            WorkerEvent::Polling,
            WorkerEvent::Stopped {
                worker_id: WORKER_ID,
            },
        ];

        for event in events {
            reporter.report(event);
        }
    }

    /// The tracing reporter must handle a representative set of events
    /// without panicking.
    #[test]
    fn test_tracing_worker_reporter() {
        let reporter = TracingWorkerReporter;

        let stats = SyncStats {
            created: 5,
            updated: 3,
            unchanged: 10,
            failed: 1,
            skipped: 0,
        };

        let events = [
            WorkerEvent::Started {
                worker_id: WORKER_ID,
            },
            WorkerEvent::Polling,
            WorkerEvent::JobCompleted {
                job_id: Uuid::new_v4(),
                stats: &stats,
            },
            // Retryable and permanent failures take different logging paths.
            WorkerEvent::JobFailed {
                job_id: Uuid::new_v4(),
                error: "test error",
                will_retry: true,
            },
            WorkerEvent::JobFailed {
                job_id: Uuid::new_v4(),
                error: "fatal error",
                will_retry: false,
            },
            WorkerEvent::JobCancelled {
                job_id: Uuid::new_v4(),
                stats: &stats,
            },
            WorkerEvent::ShuttingDown {
                worker_id: WORKER_ID,
                jobs_released: 2,
            },
            WorkerEvent::Stopped {
                worker_id: WORKER_ID,
            },
        ];

        for event in events {
            reporter.report(event);
        }
    }
}
437}