1use tokio_util::sync::CancellationToken;
40use tracing::{error, info, warn};
41use uuid::Uuid;
42
43use crate::SyncStats;
44use crate::config::PortalType;
45use crate::error::AppError;
46use crate::job::{HarvestJob, WorkerConfig};
47use crate::job_queue::JobQueue;
48use crate::pipeline::HarvestPipeline;
49use crate::progress::ProgressReporter;
50use crate::traits::{DatasetStore, EmbeddingProvider, PortalClientFactory};
51
/// Lifecycle events emitted by a [`WorkerService`] and delivered to a
/// [`WorkerReporter`]. Borrowed fields (`&'a …`) reference data owned by the
/// worker for the duration of the `report` call only.
#[derive(Debug, Clone)]
pub enum WorkerEvent<'a> {
    /// The worker loop has started.
    Started { worker_id: &'a str },
    /// The worker is about to poll the queue for a claimable job.
    Polling,
    /// A job was claimed from the queue but processing has not begun yet.
    JobClaimed { job: &'a HarvestJob },
    /// Processing of a claimed job has begun.
    JobStarted { job_id: Uuid, portal_url: &'a str },
    /// A job finished successfully; `stats` summarizes the sync results.
    JobCompleted { job_id: Uuid, stats: &'a SyncStats },
    /// A job failed with `error`; `will_retry` is true when the job has
    /// retries left and the error is retryable.
    JobFailed {
        job_id: Uuid,
        error: &'a str,
        will_retry: bool,
    },
    /// A job was cancelled mid-run; `stats` covers work done before the stop.
    JobCancelled { job_id: Uuid, stats: &'a SyncStats },
    /// The worker is shutting down; `jobs_released` is the number of jobs
    /// returned to the queue for other workers to claim.
    ShuttingDown {
        worker_id: &'a str,
        jobs_released: u64,
    },
    /// The worker loop has fully stopped.
    Stopped { worker_id: &'a str },
}
85
/// Observer for [`WorkerEvent`]s produced by a [`WorkerService`].
///
/// The default `report` implementation discards every event, so implementors
/// only override it when they actually want to observe worker activity.
/// `Send + Sync` is required because the worker runs on an async executor.
pub trait WorkerReporter: Send + Sync {
    /// Handles one worker event. Default: no-op.
    fn report(&self, event: WorkerEvent<'_>) {
        let _ = event;
    }
}
101
/// A [`WorkerReporter`] that ignores all events, relying entirely on the
/// trait's default no-op `report`. Useful for tests and embedded use.
#[derive(Debug, Default, Clone, Copy)]
pub struct SilentWorkerReporter;

impl WorkerReporter for SilentWorkerReporter {}
107
/// A [`WorkerReporter`] that forwards every event to the `tracing` macros,
/// choosing the level per event (debug for polling, warn/error for failures).
#[derive(Debug, Default, Clone, Copy)]
pub struct TracingWorkerReporter;
111
112impl WorkerReporter for TracingWorkerReporter {
113 fn report(&self, event: WorkerEvent<'_>) {
114 match event {
115 WorkerEvent::Started { worker_id } => {
116 info!(worker_id, "Worker started");
117 }
118 WorkerEvent::Polling => {
119 tracing::debug!("Polling for jobs...");
121 }
122 WorkerEvent::JobClaimed { job } => {
123 info!(job_id = %job.id, portal = %job.portal_url, "Job claimed");
124 }
125 WorkerEvent::JobStarted { job_id, portal_url } => {
126 info!(%job_id, portal = portal_url, "Processing job");
127 }
128 WorkerEvent::JobCompleted { job_id, stats } => {
129 info!(
130 %job_id,
131 created = stats.created,
132 updated = stats.updated,
133 unchanged = stats.unchanged,
134 failed = stats.failed,
135 "Job completed"
136 );
137 }
138 WorkerEvent::JobFailed {
139 job_id,
140 error,
141 will_retry,
142 } => {
143 if will_retry {
144 warn!(%job_id, %error, "Job failed, will retry");
145 } else {
146 error!(%job_id, %error, "Job permanently failed");
147 }
148 }
149 WorkerEvent::JobCancelled { job_id, stats } => {
150 info!(%job_id, processed = stats.total(), "Job cancelled");
151 }
152 WorkerEvent::ShuttingDown {
153 worker_id,
154 jobs_released,
155 } => {
156 info!(worker_id, jobs_released, "Worker shutting down");
157 }
158 WorkerEvent::Stopped { worker_id } => {
159 info!(worker_id, "Worker stopped");
160 }
161 }
162 }
163}
164
/// A queue-driven worker that claims [`HarvestJob`]s and executes them
/// through a [`HarvestPipeline`], reporting lifecycle events to a
/// [`WorkerReporter`] and harvest progress to a [`ProgressReporter`].
pub struct WorkerService<Q, S, E, F>
where
    Q: JobQueue,
    S: DatasetStore,
    E: EmbeddingProvider,
    F: PortalClientFactory,
{
    // Source of claimable jobs; also receives completion/failure updates.
    queue: Q,
    // Executes the actual portal sync for each claimed job.
    pipeline: HarvestPipeline<S, E, F>,
    // Worker identity, poll interval, and retry policy.
    config: WorkerConfig,
}
185
186impl<Q, S, E, F> WorkerService<Q, S, E, F>
187where
188 Q: JobQueue,
189 S: DatasetStore + Clone,
190 E: EmbeddingProvider,
191 F: PortalClientFactory,
192{
193 pub fn new(queue: Q, pipeline: HarvestPipeline<S, E, F>, config: WorkerConfig) -> Self {
195 Self {
196 queue,
197 pipeline,
198 config,
199 }
200 }
201
202 pub async fn run<WR, HR>(
210 &self,
211 cancel_token: CancellationToken,
212 worker_reporter: &WR,
213 harvest_reporter: &HR,
214 ) -> Result<(), AppError>
215 where
216 WR: WorkerReporter,
217 HR: ProgressReporter,
218 {
219 worker_reporter.report(WorkerEvent::Started {
220 worker_id: &self.config.worker_id,
221 });
222
223 loop {
224 if cancel_token.is_cancelled() {
226 break;
227 }
228
229 worker_reporter.report(WorkerEvent::Polling);
231
232 match self.queue.claim_job(&self.config.worker_id).await {
233 Ok(Some(job)) => {
234 worker_reporter.report(WorkerEvent::JobClaimed { job: &job });
235
236 self.process_job(&job, &cancel_token, worker_reporter, harvest_reporter)
238 .await;
239 }
240 Ok(None) => {
241 tokio::select! {
243 _ = tokio::time::sleep(self.config.poll_interval) => {}
244 _ = cancel_token.cancelled() => break,
245 }
246 }
247 Err(e) => {
248 error!(error = %e, "Failed to claim job");
249 tokio::time::sleep(self.config.poll_interval * 2).await;
251 }
252 }
253 }
254
255 let released = self
257 .queue
258 .release_worker_jobs(&self.config.worker_id)
259 .await
260 .unwrap_or_else(|e| {
261 tracing::error!("Failed to release worker jobs on shutdown: {}", e);
262 0
263 });
264
265 worker_reporter.report(WorkerEvent::ShuttingDown {
266 worker_id: &self.config.worker_id,
267 jobs_released: released,
268 });
269
270 worker_reporter.report(WorkerEvent::Stopped {
271 worker_id: &self.config.worker_id,
272 });
273
274 Ok(())
275 }
276
277 async fn process_job<WR, HR>(
279 &self,
280 job: &HarvestJob,
281 cancel_token: &CancellationToken,
282 worker_reporter: &WR,
283 harvest_reporter: &HR,
284 ) where
285 WR: WorkerReporter,
286 HR: ProgressReporter,
287 {
288 worker_reporter.report(WorkerEvent::JobStarted {
289 job_id: job.id,
290 portal_url: &job.portal_url,
291 });
292
293 let job_cancel = cancel_token.child_token();
295
296 let language = job.language.as_deref().unwrap_or("en");
300 let result = self
301 .pipeline
302 .sync_portal_with_progress_cancellable_with_options(
303 &job.portal_url,
304 job.url_template.as_deref(),
305 language,
306 harvest_reporter,
307 job_cancel.clone(),
308 job.force_full_sync,
309 PortalType::default(),
310 )
311 .await;
312
313 match result {
314 Ok((sync_result, _embed_stats)) => {
315 if sync_result.is_cancelled() {
316 worker_reporter.report(WorkerEvent::JobCancelled {
318 job_id: job.id,
319 stats: &sync_result.stats,
320 });
321
322 if let Err(e) = self.queue.cancel_job(job.id, Some(sync_result.stats)).await {
323 error!(job_id = %job.id, error = %e, "Failed to mark job as cancelled");
324 }
325 } else {
326 worker_reporter.report(WorkerEvent::JobCompleted {
328 job_id: job.id,
329 stats: &sync_result.stats,
330 });
331
332 if let Err(e) = self.queue.complete_job(job.id, sync_result.stats).await {
333 error!(job_id = %job.id, error = %e, "Failed to mark job as completed");
334 }
335 }
336 }
337 Err(e) => {
338 let error_msg = e.to_string();
339 let can_retry = job.can_retry() && e.is_retryable();
340
341 worker_reporter.report(WorkerEvent::JobFailed {
342 job_id: job.id,
343 error: &error_msg,
344 will_retry: can_retry,
345 });
346
347 let next_retry = if can_retry {
348 Some(job.calculate_next_retry(&self.config.retry_config))
349 } else {
350 None
351 };
352
353 if let Err(e) = self.queue.fail_job(job.id, &error_msg, next_retry).await {
354 error!(job_id = %job.id, error = %e, "Failed to mark job as failed");
355 }
356 }
357 }
358 }
359
360 pub async fn process_single_job<WR, HR>(
365 &self,
366 job_id: Uuid,
367 cancel_token: CancellationToken,
368 worker_reporter: &WR,
369 harvest_reporter: &HR,
370 ) -> Result<(), AppError>
371 where
372 WR: WorkerReporter,
373 HR: ProgressReporter,
374 {
375 let job = self
376 .queue
377 .get_job(job_id)
378 .await?
379 .ok_or_else(|| AppError::Generic(format!("Job not found: {}", job_id)))?;
380
381 self.process_job(&job, &cancel_token, worker_reporter, harvest_reporter)
382 .await;
383
384 Ok(())
385 }
386}
387
#[cfg(test)]
mod tests {
    use super::*;

    /// The silent reporter must accept events without panicking.
    #[test]
    fn test_silent_worker_reporter() {
        let silent = SilentWorkerReporter;

        silent.report(WorkerEvent::Started {
            worker_id: "test-worker",
        });
        silent.report(WorkerEvent::Polling);
        silent.report(WorkerEvent::Stopped {
            worker_id: "test-worker",
        });
    }

    /// The tracing reporter must handle every event variant without panicking.
    #[test]
    fn test_tracing_worker_reporter() {
        let tracing_reporter = TracingWorkerReporter;
        let stats = SyncStats {
            created: 5,
            updated: 3,
            unchanged: 10,
            failed: 1,
            skipped: 0,
        };

        tracing_reporter.report(WorkerEvent::Started {
            worker_id: "test-worker",
        });
        tracing_reporter.report(WorkerEvent::Polling);
        tracing_reporter.report(WorkerEvent::JobCompleted {
            job_id: Uuid::new_v4(),
            stats: &stats,
        });
        tracing_reporter.report(WorkerEvent::JobFailed {
            job_id: Uuid::new_v4(),
            error: "test error",
            will_retry: true,
        });
        tracing_reporter.report(WorkerEvent::JobFailed {
            job_id: Uuid::new_v4(),
            error: "fatal error",
            will_retry: false,
        });
        tracing_reporter.report(WorkerEvent::JobCancelled {
            job_id: Uuid::new_v4(),
            stats: &stats,
        });
        tracing_reporter.report(WorkerEvent::ShuttingDown {
            worker_id: "test-worker",
            jobs_released: 2,
        });
        tracing_reporter.report(WorkerEvent::Stopped {
            worker_id: "test-worker",
        });
    }
}