1use tokio_util::sync::CancellationToken;
40use tracing::{error, info, warn};
41use uuid::Uuid;
42
43use crate::SyncStats;
44use crate::error::AppError;
45use crate::job::{HarvestJob, WorkerConfig};
46use crate::job_queue::JobQueue;
47use crate::pipeline::HarvestPipeline;
48use crate::progress::ProgressReporter;
49use crate::traits::{DatasetStore, EmbeddingProvider, PortalClientFactory};
50
51#[derive(Debug, Clone)]
57pub enum WorkerEvent<'a> {
58 Started { worker_id: &'a str },
60 Polling,
62 JobClaimed { job: &'a HarvestJob },
64 JobStarted { job_id: Uuid, portal_url: &'a str },
66 JobCompleted { job_id: Uuid, stats: &'a SyncStats },
68 JobFailed {
70 job_id: Uuid,
71 error: &'a str,
72 will_retry: bool,
73 },
74 JobCancelled { job_id: Uuid, stats: &'a SyncStats },
76 ShuttingDown {
78 worker_id: &'a str,
79 jobs_released: u64,
80 },
81 Stopped { worker_id: &'a str },
83}
84
85pub trait WorkerReporter: Send + Sync {
93 fn report(&self, event: WorkerEvent<'_>) {
97 let _ = event;
98 }
99}
100
101#[derive(Debug, Default, Clone, Copy)]
103pub struct SilentWorkerReporter;
104
105impl WorkerReporter for SilentWorkerReporter {}
106
107#[derive(Debug, Default, Clone, Copy)]
109pub struct TracingWorkerReporter;
110
111impl WorkerReporter for TracingWorkerReporter {
112 fn report(&self, event: WorkerEvent<'_>) {
113 match event {
114 WorkerEvent::Started { worker_id } => {
115 info!(worker_id, "Worker started");
116 }
117 WorkerEvent::Polling => {
118 tracing::debug!("Polling for jobs...");
120 }
121 WorkerEvent::JobClaimed { job } => {
122 info!(job_id = %job.id, portal = %job.portal_url, "Job claimed");
123 }
124 WorkerEvent::JobStarted { job_id, portal_url } => {
125 info!(%job_id, portal = portal_url, "Processing job");
126 }
127 WorkerEvent::JobCompleted { job_id, stats } => {
128 info!(
129 %job_id,
130 created = stats.created,
131 updated = stats.updated,
132 unchanged = stats.unchanged,
133 failed = stats.failed,
134 "Job completed"
135 );
136 }
137 WorkerEvent::JobFailed {
138 job_id,
139 error,
140 will_retry,
141 } => {
142 if will_retry {
143 warn!(%job_id, %error, "Job failed, will retry");
144 } else {
145 error!(%job_id, %error, "Job permanently failed");
146 }
147 }
148 WorkerEvent::JobCancelled { job_id, stats } => {
149 info!(%job_id, processed = stats.total(), "Job cancelled");
150 }
151 WorkerEvent::ShuttingDown {
152 worker_id,
153 jobs_released,
154 } => {
155 info!(worker_id, jobs_released, "Worker shutting down");
156 }
157 WorkerEvent::Stopped { worker_id } => {
158 info!(worker_id, "Worker stopped");
159 }
160 }
161 }
162}
163
164pub struct WorkerService<Q, S, E, F>
174where
175 Q: JobQueue,
176 S: DatasetStore,
177 E: EmbeddingProvider,
178 F: PortalClientFactory,
179{
180 queue: Q,
181 pipeline: HarvestPipeline<S, E, F>,
182 config: WorkerConfig,
183}
184
185impl<Q, S, E, F> WorkerService<Q, S, E, F>
186where
187 Q: JobQueue,
188 S: DatasetStore + Clone,
189 E: EmbeddingProvider,
190 F: PortalClientFactory,
191{
192 pub fn new(queue: Q, pipeline: HarvestPipeline<S, E, F>, config: WorkerConfig) -> Self {
194 Self {
195 queue,
196 pipeline,
197 config,
198 }
199 }
200
201 pub async fn run<WR, HR>(
209 &self,
210 cancel_token: CancellationToken,
211 worker_reporter: &WR,
212 harvest_reporter: &HR,
213 ) -> Result<(), AppError>
214 where
215 WR: WorkerReporter,
216 HR: ProgressReporter,
217 {
218 worker_reporter.report(WorkerEvent::Started {
219 worker_id: &self.config.worker_id,
220 });
221
222 loop {
223 if cancel_token.is_cancelled() {
225 break;
226 }
227
228 worker_reporter.report(WorkerEvent::Polling);
230
231 match self.queue.claim_job(&self.config.worker_id).await {
232 Ok(Some(job)) => {
233 worker_reporter.report(WorkerEvent::JobClaimed { job: &job });
234
235 self.process_job(&job, &cancel_token, worker_reporter, harvest_reporter)
237 .await;
238 }
239 Ok(None) => {
240 tokio::select! {
242 _ = tokio::time::sleep(self.config.poll_interval) => {}
243 _ = cancel_token.cancelled() => break,
244 }
245 }
246 Err(e) => {
247 error!(error = %e, "Failed to claim job");
248 tokio::time::sleep(self.config.poll_interval * 2).await;
250 }
251 }
252 }
253
254 let released = self
256 .queue
257 .release_worker_jobs(&self.config.worker_id)
258 .await
259 .unwrap_or_else(|e| {
260 tracing::error!("Failed to release worker jobs on shutdown: {}", e);
261 0
262 });
263
264 worker_reporter.report(WorkerEvent::ShuttingDown {
265 worker_id: &self.config.worker_id,
266 jobs_released: released,
267 });
268
269 worker_reporter.report(WorkerEvent::Stopped {
270 worker_id: &self.config.worker_id,
271 });
272
273 Ok(())
274 }
275
276 async fn process_job<WR, HR>(
278 &self,
279 job: &HarvestJob,
280 cancel_token: &CancellationToken,
281 worker_reporter: &WR,
282 harvest_reporter: &HR,
283 ) where
284 WR: WorkerReporter,
285 HR: ProgressReporter,
286 {
287 worker_reporter.report(WorkerEvent::JobStarted {
288 job_id: job.id,
289 portal_url: &job.portal_url,
290 });
291
292 let job_cancel = cancel_token.child_token();
294
295 let language = job.language.as_deref().unwrap_or("en");
298 let result = self
299 .pipeline
300 .sync_portal_with_progress_cancellable_with_options(
301 &job.portal_url,
302 job.url_template.as_deref(),
303 language,
304 harvest_reporter,
305 job_cancel.clone(),
306 job.force_full_sync,
307 job.portal_type,
308 job.profile.as_deref(),
309 job.sparql_endpoint.as_deref(),
310 )
311 .await;
312
313 match result {
314 Ok((sync_result, _embed_stats)) => {
315 if sync_result.is_cancelled() {
316 worker_reporter.report(WorkerEvent::JobCancelled {
318 job_id: job.id,
319 stats: &sync_result.stats,
320 });
321
322 if let Err(e) = self.queue.cancel_job(job.id, Some(sync_result.stats)).await {
323 error!(job_id = %job.id, error = %e, "Failed to mark job as cancelled");
324 }
325 } else {
326 worker_reporter.report(WorkerEvent::JobCompleted {
328 job_id: job.id,
329 stats: &sync_result.stats,
330 });
331
332 if let Err(e) = self.queue.complete_job(job.id, sync_result.stats).await {
333 error!(job_id = %job.id, error = %e, "Failed to mark job as completed");
334 }
335 }
336 }
337 Err(e) => {
338 let error_msg = e.to_string();
339 let can_retry = job.can_retry() && e.is_retryable();
340
341 worker_reporter.report(WorkerEvent::JobFailed {
342 job_id: job.id,
343 error: &error_msg,
344 will_retry: can_retry,
345 });
346
347 let next_retry = if can_retry {
348 Some(job.calculate_next_retry(&self.config.retry_config))
349 } else {
350 None
351 };
352
353 if let Err(e) = self.queue.fail_job(job.id, &error_msg, next_retry).await {
354 error!(job_id = %job.id, error = %e, "Failed to mark job as failed");
355 }
356 }
357 }
358 }
359
360 pub async fn process_single_job<WR, HR>(
365 &self,
366 job_id: Uuid,
367 cancel_token: CancellationToken,
368 worker_reporter: &WR,
369 harvest_reporter: &HR,
370 ) -> Result<(), AppError>
371 where
372 WR: WorkerReporter,
373 HR: ProgressReporter,
374 {
375 let job = self
376 .queue
377 .get_job(job_id)
378 .await?
379 .ok_or_else(|| AppError::Generic(format!("Job not found: {}", job_id)))?;
380
381 self.process_job(&job, &cancel_token, worker_reporter, harvest_reporter)
382 .await;
383
384 Ok(())
385 }
386}
387
#[cfg(test)]
mod tests {
    use super::*;

    /// Statistics shared by the events that carry a `SyncStats`.
    fn sample_stats() -> SyncStats {
        SyncStats {
            created: 5,
            updated: 3,
            unchanged: 10,
            failed: 1,
            skipped: 0,
        }
    }

    /// One instance of every event variant that can be built without a
    /// `HarvestJob` (`JobClaimed` needs a full job and is not covered here).
    /// `JobFailed` appears twice to hit both the retry and permanent branches.
    fn sample_events(stats: &SyncStats) -> Vec<WorkerEvent<'_>> {
        vec![
            WorkerEvent::Started {
                worker_id: "test-worker",
            },
            WorkerEvent::Polling,
            WorkerEvent::JobStarted {
                job_id: Uuid::new_v4(),
                portal_url: "https://example.org",
            },
            WorkerEvent::JobCompleted {
                job_id: Uuid::new_v4(),
                stats,
            },
            WorkerEvent::JobFailed {
                job_id: Uuid::new_v4(),
                error: "test error",
                will_retry: true,
            },
            WorkerEvent::JobFailed {
                job_id: Uuid::new_v4(),
                error: "fatal error",
                will_retry: false,
            },
            WorkerEvent::JobCancelled {
                job_id: Uuid::new_v4(),
                stats,
            },
            WorkerEvent::ShuttingDown {
                worker_id: "test-worker",
                jobs_released: 2,
            },
            WorkerEvent::Stopped {
                worker_id: "test-worker",
            },
        ]
    }

    #[test]
    fn test_silent_worker_reporter() {
        let stats = sample_stats();
        let reporter = SilentWorkerReporter;
        for event in sample_events(&stats) {
            reporter.report(event);
        }
    }

    #[test]
    fn test_tracing_worker_reporter() {
        let stats = sample_stats();
        let reporter = TracingWorkerReporter;
        for event in sample_events(&stats) {
            reporter.report(event);
        }
    }
}