use std::time::Duration;
use ceres_core::config::PortalType;
use ceres_core::error::AppError;
use ceres_core::job::{JobStatus, WorkerConfig};
use ceres_core::pipeline::HarvestPipeline;
use ceres_core::progress::SilentReporter;
use ceres_core::traits::PortalClientFactory;
use ceres_core::worker::{SilentWorkerReporter, WorkerService};
use tokio_util::sync::CancellationToken;
use uuid::Uuid;
use super::common::{
MockDatasetStore, MockEmbeddingProvider, MockJobQueue, MockPortalClientFactory, MockPortalData,
};
const TEST_PORTAL: &str = "https://test.example.com";
fn test_worker_config() -> WorkerConfig {
WorkerConfig::default()
.with_worker_id("test-worker")
.with_poll_interval(Duration::from_millis(10))
}
fn make_worker(
queue: MockJobQueue,
datasets: Vec<MockPortalData>,
) -> WorkerService<MockJobQueue, MockDatasetStore, MockEmbeddingProvider, MockPortalClientFactory> {
let store = MockDatasetStore::new();
let embedding = MockEmbeddingProvider::new();
let factory = MockPortalClientFactory::new(datasets);
let pipeline = HarvestPipeline::new(store, embedding, factory);
WorkerService::new(queue, pipeline, test_worker_config())
}
#[derive(Clone)]
struct FailingPortalClientFactory;
impl PortalClientFactory for FailingPortalClientFactory {
type Client = super::common::MockPortalClient;
fn create(
&self,
_portal_url: &str,
_portal_type: PortalType,
_language: &str,
_profile: Option<&str>,
_sparql_endpoint: Option<&str>,
) -> Result<Self::Client, AppError> {
Err(AppError::NetworkError(
"simulated network failure".to_string(),
))
}
}
fn make_failing_worker(
queue: MockJobQueue,
) -> WorkerService<MockJobQueue, MockDatasetStore, MockEmbeddingProvider, FailingPortalClientFactory>
{
let store = MockDatasetStore::new();
let embedding = MockEmbeddingProvider::new();
let pipeline = HarvestPipeline::new(store, embedding, FailingPortalClientFactory);
WorkerService::new(queue, pipeline, test_worker_config())
}
async fn wait_for_terminal(queue: &MockJobQueue, job_id: Uuid) {
let deadline = tokio::time::Instant::now() + Duration::from_secs(10);
loop {
if let Some(status) = queue.job_status(job_id)
&& (status.is_terminal() || status == JobStatus::Pending)
{
if status.is_terminal() {
return;
}
if let Some(job) = queue.get_job_snapshot(job_id)
&& job.error_message.is_some()
{
return; }
}
if tokio::time::Instant::now() > deadline {
panic!("Timeout waiting for job {} to reach terminal state", job_id);
}
tokio::time::sleep(Duration::from_millis(10)).await;
}
}
#[tokio::test]
async fn test_worker_completes_job_successfully() {
let datasets = vec![MockPortalData {
id: "d1".to_string(),
title: "Test Dataset".to_string(),
description: None,
}];
let queue = MockJobQueue::new();
let (queue, job_id) = queue.with_pending_job(TEST_PORTAL);
let worker = make_worker(queue.clone(), datasets);
let cancel = CancellationToken::new();
let cancel_clone = cancel.clone();
let handle = tokio::spawn(async move {
worker
.run(cancel_clone, &SilentWorkerReporter, &SilentReporter)
.await
});
wait_for_terminal(&queue, job_id).await;
cancel.cancel();
handle.await.unwrap().unwrap();
assert_eq!(queue.job_status(job_id), Some(JobStatus::Completed));
let job = queue.get_job_snapshot(job_id).unwrap();
assert!(job.sync_stats.is_some());
assert!(job.completed_at.is_some());
}
#[tokio::test]
async fn test_worker_fails_job_retryable() {
let queue = MockJobQueue::new();
let (queue, job_id) = queue.with_pending_job_config(TEST_PORTAL, 0, 3);
let worker = make_failing_worker(queue.clone());
let cancel = CancellationToken::new();
let cancel_clone = cancel.clone();
let handle = tokio::spawn(async move {
worker
.run(cancel_clone, &SilentWorkerReporter, &SilentReporter)
.await
});
wait_for_terminal(&queue, job_id).await;
cancel.cancel();
handle.await.unwrap().unwrap();
let job = queue.get_job_snapshot(job_id).unwrap();
assert_eq!(job.status, JobStatus::Pending);
assert!(job.error_message.is_some());
assert!(job.next_retry_at.is_some());
assert_eq!(job.retry_count, 1);
}
#[tokio::test]
async fn test_worker_fails_job_permanent() {
let queue = MockJobQueue::new();
let (queue, job_id) = queue.with_pending_job_config(TEST_PORTAL, 3, 3);
let worker = make_failing_worker(queue.clone());
let cancel = CancellationToken::new();
let cancel_clone = cancel.clone();
let handle = tokio::spawn(async move {
worker
.run(cancel_clone, &SilentWorkerReporter, &SilentReporter)
.await
});
wait_for_terminal(&queue, job_id).await;
cancel.cancel();
handle.await.unwrap().unwrap();
assert_eq!(queue.job_status(job_id), Some(JobStatus::Failed));
let job = queue.get_job_snapshot(job_id).unwrap();
assert!(job.error_message.is_some());
assert!(job.next_retry_at.is_none());
}
#[tokio::test]
async fn test_worker_releases_jobs_on_shutdown() {
let queue = MockJobQueue::new();
let cancel = CancellationToken::new();
cancel.cancel();
let worker = make_worker(queue.clone(), vec![]);
let result = worker
.run(cancel, &SilentWorkerReporter, &SilentReporter)
.await;
assert!(result.is_ok());
}
#[tokio::test]
async fn test_process_single_job_not_found() {
let queue = MockJobQueue::new();
let worker = make_worker(queue, vec![]);
let cancel = CancellationToken::new();
let result = worker
.process_single_job(
Uuid::new_v4(), cancel,
&SilentWorkerReporter,
&SilentReporter,
)
.await;
assert!(result.is_err());
assert!(result.unwrap_err().to_string().contains("Job not found"));
}
#[tokio::test]
async fn test_worker_handles_no_jobs_gracefully() {
let queue = MockJobQueue::new();
let worker = make_worker(queue, vec![]);
let cancel = CancellationToken::new();
let cancel_clone = cancel.clone();
let handle = tokio::spawn(async move {
worker
.run(cancel_clone, &SilentWorkerReporter, &SilentReporter)
.await
});
tokio::time::sleep(Duration::from_millis(50)).await;
cancel.cancel();
let result = handle.await.unwrap();
assert!(result.is_ok());
}