use tokio_util::sync::CancellationToken;
use tracing::{error, info, warn};
use uuid::Uuid;
use crate::SyncStats;
use crate::error::AppError;
use crate::job::{HarvestJob, WorkerConfig};
use crate::job_queue::JobQueue;
use crate::pipeline::HarvestPipeline;
use crate::progress::ProgressReporter;
use crate::traits::{DatasetStore, EmbeddingProvider, PortalClientFactory};
/// A lifecycle notification emitted by the worker loop.
///
/// Every payload borrows from the worker or the job being processed, so an
/// event is only valid for the duration of a single `report` call.
#[derive(Clone, Debug)]
pub enum WorkerEvent<'a> {
    /// The worker loop has begun.
    Started { worker_id: &'a str },
    /// The worker is about to ask the queue for a job.
    Polling,
    /// The queue handed this job to the worker.
    JobClaimed { job: &'a HarvestJob },
    /// Processing of a job is starting for the given portal.
    JobStarted { job_id: Uuid, portal_url: &'a str },
    /// The job's sync finished without being cancelled.
    JobCompleted { job_id: Uuid, stats: &'a SyncStats },
    /// The job's sync returned an error; `will_retry` says whether a retry is scheduled.
    JobFailed {
        job_id: Uuid,
        error: &'a str,
        will_retry: bool,
    },
    /// The job's sync stopped because cancellation was requested.
    JobCancelled { job_id: Uuid, stats: &'a SyncStats },
    /// The worker is exiting; `jobs_released` is the count returned by the queue.
    ShuttingDown {
        worker_id: &'a str,
        jobs_released: u64,
    },
    /// The worker loop has finished.
    Stopped { worker_id: &'a str },
}
/// Receiver for [`WorkerEvent`]s.
///
/// The provided `report` discards every event, so an implementor only needs
/// to override it when it actually wants to observe the worker.
pub trait WorkerReporter: Send + Sync {
    /// Handles a single event. The default implementation does nothing.
    fn report(&self, _event: WorkerEvent<'_>) {}
}
/// A [`WorkerReporter`] that ignores every event.
///
/// It relies entirely on the trait's default no-op `report`.
#[derive(Clone, Copy, Debug, Default)]
pub struct SilentWorkerReporter;

impl WorkerReporter for SilentWorkerReporter {}
#[derive(Debug, Default, Clone, Copy)]
pub struct TracingWorkerReporter;
impl WorkerReporter for TracingWorkerReporter {
fn report(&self, event: WorkerEvent<'_>) {
match event {
WorkerEvent::Started { worker_id } => {
info!(worker_id, "Worker started");
}
WorkerEvent::Polling => {
tracing::debug!("Polling for jobs...");
}
WorkerEvent::JobClaimed { job } => {
info!(job_id = %job.id, portal = %job.portal_url, "Job claimed");
}
WorkerEvent::JobStarted { job_id, portal_url } => {
info!(%job_id, portal = portal_url, "Processing job");
}
WorkerEvent::JobCompleted { job_id, stats } => {
info!(
%job_id,
created = stats.created,
updated = stats.updated,
unchanged = stats.unchanged,
failed = stats.failed,
"Job completed"
);
}
WorkerEvent::JobFailed {
job_id,
error,
will_retry,
} => {
if will_retry {
warn!(%job_id, %error, "Job failed, will retry");
} else {
error!(%job_id, %error, "Job permanently failed");
}
}
WorkerEvent::JobCancelled { job_id, stats } => {
info!(%job_id, processed = stats.total(), "Job cancelled");
}
WorkerEvent::ShuttingDown {
worker_id,
jobs_released,
} => {
info!(worker_id, jobs_released, "Worker shutting down");
}
WorkerEvent::Stopped { worker_id } => {
info!(worker_id, "Worker stopped");
}
}
}
}
/// Background worker that claims harvest jobs from a [`JobQueue`] and runs
/// each one through a [`HarvestPipeline`].
///
/// Its identity, poll interval and retry policy come from [`WorkerConfig`].
pub struct WorkerService<
    Q: JobQueue,
    S: DatasetStore,
    E: EmbeddingProvider,
    F: PortalClientFactory,
> {
    /// Source of jobs; also records each job's final status.
    queue: Q,
    /// Performs the actual portal sync for a job.
    pipeline: HarvestPipeline<S, E, F>,
    /// Worker id, polling cadence and retry settings.
    config: WorkerConfig,
}
impl<Q, S, E, F> WorkerService<Q, S, E, F>
where
Q: JobQueue,
S: DatasetStore + Clone,
E: EmbeddingProvider,
F: PortalClientFactory,
{
pub fn new(queue: Q, pipeline: HarvestPipeline<S, E, F>, config: WorkerConfig) -> Self {
Self {
queue,
pipeline,
config,
}
}
pub async fn run<WR, HR>(
&self,
cancel_token: CancellationToken,
worker_reporter: &WR,
harvest_reporter: &HR,
) -> Result<(), AppError>
where
WR: WorkerReporter,
HR: ProgressReporter,
{
worker_reporter.report(WorkerEvent::Started {
worker_id: &self.config.worker_id,
});
loop {
if cancel_token.is_cancelled() {
break;
}
worker_reporter.report(WorkerEvent::Polling);
match self.queue.claim_job(&self.config.worker_id).await {
Ok(Some(job)) => {
worker_reporter.report(WorkerEvent::JobClaimed { job: &job });
self.process_job(&job, &cancel_token, worker_reporter, harvest_reporter)
.await;
}
Ok(None) => {
tokio::select! {
_ = tokio::time::sleep(self.config.poll_interval) => {}
_ = cancel_token.cancelled() => break,
}
}
Err(e) => {
error!(error = %e, "Failed to claim job");
tokio::time::sleep(self.config.poll_interval * 2).await;
}
}
}
let released = self
.queue
.release_worker_jobs(&self.config.worker_id)
.await
.unwrap_or_else(|e| {
tracing::error!("Failed to release worker jobs on shutdown: {}", e);
0
});
worker_reporter.report(WorkerEvent::ShuttingDown {
worker_id: &self.config.worker_id,
jobs_released: released,
});
worker_reporter.report(WorkerEvent::Stopped {
worker_id: &self.config.worker_id,
});
Ok(())
}
async fn process_job<WR, HR>(
&self,
job: &HarvestJob,
cancel_token: &CancellationToken,
worker_reporter: &WR,
harvest_reporter: &HR,
) where
WR: WorkerReporter,
HR: ProgressReporter,
{
worker_reporter.report(WorkerEvent::JobStarted {
job_id: job.id,
portal_url: &job.portal_url,
});
let job_cancel = cancel_token.child_token();
let language = job.language.as_deref().unwrap_or("en");
let result = self
.pipeline
.sync_portal_with_progress_cancellable_with_options(
&job.portal_url,
job.url_template.as_deref(),
language,
harvest_reporter,
job_cancel.clone(),
job.force_full_sync,
job.portal_type,
job.profile.as_deref(),
job.sparql_endpoint.as_deref(),
)
.await;
match result {
Ok((sync_result, _embed_stats)) => {
if sync_result.is_cancelled() {
worker_reporter.report(WorkerEvent::JobCancelled {
job_id: job.id,
stats: &sync_result.stats,
});
if let Err(e) = self.queue.cancel_job(job.id, Some(sync_result.stats)).await {
error!(job_id = %job.id, error = %e, "Failed to mark job as cancelled");
}
} else {
worker_reporter.report(WorkerEvent::JobCompleted {
job_id: job.id,
stats: &sync_result.stats,
});
if let Err(e) = self.queue.complete_job(job.id, sync_result.stats).await {
error!(job_id = %job.id, error = %e, "Failed to mark job as completed");
}
}
}
Err(e) => {
let error_msg = e.to_string();
let can_retry = job.can_retry() && e.is_retryable();
worker_reporter.report(WorkerEvent::JobFailed {
job_id: job.id,
error: &error_msg,
will_retry: can_retry,
});
let next_retry = if can_retry {
Some(job.calculate_next_retry(&self.config.retry_config))
} else {
None
};
if let Err(e) = self.queue.fail_job(job.id, &error_msg, next_retry).await {
error!(job_id = %job.id, error = %e, "Failed to mark job as failed");
}
}
}
}
pub async fn process_single_job<WR, HR>(
&self,
job_id: Uuid,
cancel_token: CancellationToken,
worker_reporter: &WR,
harvest_reporter: &HR,
) -> Result<(), AppError>
where
WR: WorkerReporter,
HR: ProgressReporter,
{
let job = self
.queue
.get_job(job_id)
.await?
.ok_or_else(|| AppError::Generic(format!("Job not found: {}", job_id)))?;
self.process_job(&job, &cancel_token, worker_reporter, harvest_reporter)
.await;
Ok(())
}
}
#[cfg(test)]
mod tests {
    use super::*;

    const WORKER_ID: &str = "test-worker";

    /// Smoke test: the silent reporter accepts events without panicking.
    #[test]
    fn test_silent_worker_reporter() {
        let reporter = SilentWorkerReporter;
        let events = vec![
            WorkerEvent::Started {
                worker_id: WORKER_ID,
            },
            WorkerEvent::Polling,
            WorkerEvent::Stopped {
                worker_id: WORKER_ID,
            },
        ];
        for event in events {
            reporter.report(event);
        }
    }

    /// Smoke test: the tracing reporter handles each kind of event without panicking.
    #[test]
    fn test_tracing_worker_reporter() {
        let stats = SyncStats {
            created: 5,
            updated: 3,
            unchanged: 10,
            failed: 1,
            skipped: 0,
        };
        let reporter = TracingWorkerReporter;
        let events = vec![
            WorkerEvent::Started {
                worker_id: WORKER_ID,
            },
            WorkerEvent::Polling,
            WorkerEvent::JobCompleted {
                job_id: Uuid::new_v4(),
                stats: &stats,
            },
            WorkerEvent::JobFailed {
                job_id: Uuid::new_v4(),
                error: "test error",
                will_retry: true,
            },
            WorkerEvent::JobFailed {
                job_id: Uuid::new_v4(),
                error: "fatal error",
                will_retry: false,
            },
            WorkerEvent::JobCancelled {
                job_id: Uuid::new_v4(),
                stats: &stats,
            },
            WorkerEvent::ShuttingDown {
                worker_id: WORKER_ID,
                jobs_released: 2,
            },
            WorkerEvent::Stopped {
                worker_id: WORKER_ID,
            },
        ];
        for event in events {
            reporter.report(event);
        }
    }
}