1use tokio_util::sync::CancellationToken;
40use tracing::{error, info, warn};
41use uuid::Uuid;
42
43use crate::SyncStats;
44use crate::error::AppError;
45use crate::harvest::HarvestService;
46use crate::job::{HarvestJob, WorkerConfig};
47use crate::job_queue::JobQueue;
48use crate::progress::ProgressReporter;
49use crate::traits::{DatasetStore, EmbeddingProvider, PortalClientFactory};
50
51#[derive(Debug, Clone)]
57pub enum WorkerEvent<'a> {
58 Started { worker_id: &'a str },
60 Polling,
62 JobClaimed { job: &'a HarvestJob },
64 JobStarted { job_id: Uuid, portal_url: &'a str },
66 JobCompleted { job_id: Uuid, stats: &'a SyncStats },
68 JobFailed {
70 job_id: Uuid,
71 error: &'a str,
72 will_retry: bool,
73 },
74 JobCancelled { job_id: Uuid, stats: &'a SyncStats },
76 ShuttingDown {
78 worker_id: &'a str,
79 jobs_released: u64,
80 },
81 Stopped { worker_id: &'a str },
83}
84
85pub trait WorkerReporter: Send + Sync {
93 fn report(&self, event: WorkerEvent<'_>) {
97 let _ = event;
98 }
99}
100
101#[derive(Debug, Default, Clone, Copy)]
103pub struct SilentWorkerReporter;
104
105impl WorkerReporter for SilentWorkerReporter {}
106
107#[derive(Debug, Default, Clone, Copy)]
109pub struct TracingWorkerReporter;
110
111impl WorkerReporter for TracingWorkerReporter {
112 fn report(&self, event: WorkerEvent<'_>) {
113 match event {
114 WorkerEvent::Started { worker_id } => {
115 info!(worker_id, "Worker started");
116 }
117 WorkerEvent::Polling => {
118 tracing::debug!("Polling for jobs...");
120 }
121 WorkerEvent::JobClaimed { job } => {
122 info!(job_id = %job.id, portal = %job.portal_url, "Job claimed");
123 }
124 WorkerEvent::JobStarted { job_id, portal_url } => {
125 info!(%job_id, portal = portal_url, "Processing job");
126 }
127 WorkerEvent::JobCompleted { job_id, stats } => {
128 info!(
129 %job_id,
130 created = stats.created,
131 updated = stats.updated,
132 unchanged = stats.unchanged,
133 failed = stats.failed,
134 "Job completed"
135 );
136 }
137 WorkerEvent::JobFailed {
138 job_id,
139 error,
140 will_retry,
141 } => {
142 if will_retry {
143 warn!(%job_id, %error, "Job failed, will retry");
144 } else {
145 error!(%job_id, %error, "Job permanently failed");
146 }
147 }
148 WorkerEvent::JobCancelled { job_id, stats } => {
149 info!(%job_id, processed = stats.total(), "Job cancelled");
150 }
151 WorkerEvent::ShuttingDown {
152 worker_id,
153 jobs_released,
154 } => {
155 info!(worker_id, jobs_released, "Worker shutting down");
156 }
157 WorkerEvent::Stopped { worker_id } => {
158 info!(worker_id, "Worker stopped");
159 }
160 }
161 }
162}
163
164pub struct WorkerService<Q, S, E, F>
173where
174 Q: JobQueue,
175 S: DatasetStore,
176 E: EmbeddingProvider,
177 F: PortalClientFactory,
178{
179 queue: Q,
180 harvest_service: HarvestService<S, E, F>,
181 config: WorkerConfig,
182}
183
184impl<Q, S, E, F> WorkerService<Q, S, E, F>
185where
186 Q: JobQueue,
187 S: DatasetStore,
188 E: EmbeddingProvider,
189 F: PortalClientFactory,
190{
191 pub fn new(queue: Q, harvest_service: HarvestService<S, E, F>, config: WorkerConfig) -> Self {
193 Self {
194 queue,
195 harvest_service,
196 config,
197 }
198 }
199
200 pub async fn run<WR, HR>(
208 &self,
209 cancel_token: CancellationToken,
210 worker_reporter: &WR,
211 harvest_reporter: &HR,
212 ) -> Result<(), AppError>
213 where
214 WR: WorkerReporter,
215 HR: ProgressReporter,
216 {
217 worker_reporter.report(WorkerEvent::Started {
218 worker_id: &self.config.worker_id,
219 });
220
221 loop {
222 if cancel_token.is_cancelled() {
224 break;
225 }
226
227 worker_reporter.report(WorkerEvent::Polling);
229
230 match self.queue.claim_job(&self.config.worker_id).await {
231 Ok(Some(job)) => {
232 worker_reporter.report(WorkerEvent::JobClaimed { job: &job });
233
234 self.process_job(&job, &cancel_token, worker_reporter, harvest_reporter)
236 .await;
237 }
238 Ok(None) => {
239 tokio::select! {
241 _ = tokio::time::sleep(self.config.poll_interval) => {}
242 _ = cancel_token.cancelled() => break,
243 }
244 }
245 Err(e) => {
246 error!(error = %e, "Failed to claim job");
247 tokio::time::sleep(self.config.poll_interval * 2).await;
249 }
250 }
251 }
252
253 let released = self
255 .queue
256 .release_worker_jobs(&self.config.worker_id)
257 .await
258 .unwrap_or(0);
259
260 worker_reporter.report(WorkerEvent::ShuttingDown {
261 worker_id: &self.config.worker_id,
262 jobs_released: released,
263 });
264
265 worker_reporter.report(WorkerEvent::Stopped {
266 worker_id: &self.config.worker_id,
267 });
268
269 Ok(())
270 }
271
272 async fn process_job<WR, HR>(
274 &self,
275 job: &HarvestJob,
276 cancel_token: &CancellationToken,
277 worker_reporter: &WR,
278 harvest_reporter: &HR,
279 ) where
280 WR: WorkerReporter,
281 HR: ProgressReporter,
282 {
283 worker_reporter.report(WorkerEvent::JobStarted {
284 job_id: job.id,
285 portal_url: &job.portal_url,
286 });
287
288 let job_cancel = cancel_token.child_token();
290
291 let result = self
293 .harvest_service
294 .sync_portal_with_progress_cancellable_with_options(
295 &job.portal_url,
296 harvest_reporter,
297 job_cancel.clone(),
298 job.force_full_sync,
299 )
300 .await;
301
302 match result {
303 Ok(sync_result) => {
304 if sync_result.is_cancelled() {
305 worker_reporter.report(WorkerEvent::JobCancelled {
307 job_id: job.id,
308 stats: &sync_result.stats,
309 });
310
311 if let Err(e) = self.queue.cancel_job(job.id, Some(sync_result.stats)).await {
312 error!(job_id = %job.id, error = %e, "Failed to mark job as cancelled");
313 }
314 } else {
315 worker_reporter.report(WorkerEvent::JobCompleted {
317 job_id: job.id,
318 stats: &sync_result.stats,
319 });
320
321 if let Err(e) = self.queue.complete_job(job.id, sync_result.stats).await {
322 error!(job_id = %job.id, error = %e, "Failed to mark job as completed");
323 }
324 }
325 }
326 Err(e) => {
327 let error_msg = e.to_string();
328 let can_retry = job.can_retry() && e.is_retryable();
329
330 worker_reporter.report(WorkerEvent::JobFailed {
331 job_id: job.id,
332 error: &error_msg,
333 will_retry: can_retry,
334 });
335
336 let next_retry = if can_retry {
337 Some(job.calculate_next_retry(&self.config.retry_config))
338 } else {
339 None
340 };
341
342 if let Err(e) = self.queue.fail_job(job.id, &error_msg, next_retry).await {
343 error!(job_id = %job.id, error = %e, "Failed to mark job as failed");
344 }
345 }
346 }
347 }
348
349 pub async fn process_single_job<WR, HR>(
354 &self,
355 job_id: Uuid,
356 cancel_token: CancellationToken,
357 worker_reporter: &WR,
358 harvest_reporter: &HR,
359 ) -> Result<(), AppError>
360 where
361 WR: WorkerReporter,
362 HR: ProgressReporter,
363 {
364 let job = self
365 .queue
366 .get_job(job_id)
367 .await?
368 .ok_or_else(|| AppError::Generic(format!("Job not found: {}", job_id)))?;
369
370 self.process_job(&job, &cancel_token, worker_reporter, harvest_reporter)
371 .await;
372
373 Ok(())
374 }
375}
376
#[cfg(test)]
mod tests {
    use super::*;

    /// Fixed statistics shared by the smoke tests below.
    fn sample_stats() -> SyncStats {
        SyncStats {
            created: 5,
            updated: 3,
            unchanged: 10,
            failed: 1,
            skipped: 0,
        }
    }

    /// Smoke test: the silent reporter accepts events without panicking.
    #[test]
    fn test_silent_worker_reporter() {
        let reporter = SilentWorkerReporter;
        reporter.report(WorkerEvent::Started {
            worker_id: "test-worker",
        });
        reporter.report(WorkerEvent::Polling);
        reporter.report(WorkerEvent::JobStarted {
            job_id: Uuid::new_v4(),
            portal_url: "https://portal.example",
        });
        reporter.report(WorkerEvent::Stopped {
            worker_id: "test-worker",
        });
    }

    /// Smoke test: the tracing reporter handles each event variant that can
    /// be built here without panicking. `JobClaimed` is not covered because
    /// it needs a `HarvestJob` value.
    #[test]
    fn test_tracing_worker_reporter() {
        let reporter = TracingWorkerReporter;
        let stats = sample_stats();

        reporter.report(WorkerEvent::Started {
            worker_id: "test-worker",
        });
        reporter.report(WorkerEvent::Polling);
        reporter.report(WorkerEvent::JobStarted {
            job_id: Uuid::new_v4(),
            portal_url: "https://portal.example",
        });
        reporter.report(WorkerEvent::JobCompleted {
            job_id: Uuid::new_v4(),
            stats: &stats,
        });
        // Both severities of the failure path.
        reporter.report(WorkerEvent::JobFailed {
            job_id: Uuid::new_v4(),
            error: "test error",
            will_retry: true,
        });
        reporter.report(WorkerEvent::JobFailed {
            job_id: Uuid::new_v4(),
            error: "fatal error",
            will_retry: false,
        });
        reporter.report(WorkerEvent::JobCancelled {
            job_id: Uuid::new_v4(),
            stats: &stats,
        });
        reporter.report(WorkerEvent::ShuttingDown {
            worker_id: "test-worker",
            jobs_released: 2,
        });
        reporter.report(WorkerEvent::Stopped {
            worker_id: "test-worker",
        });
    }
}