use std::sync::Arc;
use std::time::Duration;
use tokio::sync::RwLock;
use workflow_graph_queue::memory::*;
use workflow_graph_queue::*;
use workflow_graph_shared::{JobStatus, Workflow};
/// Returns the workflow fixture shared by the tests in this file.
///
/// This is a thin wrapper around `Workflow::sample()`, so every test starts
/// from the same workflow definition.
fn sample_workflow() -> Workflow {
Workflow::sample()
}
/// Builds one fresh instance of each in-memory backend plus an empty shared
/// workflow state.
///
/// The tuple is returned in the order: job queue, artifact store, log sink,
/// worker registry, shared state. Each call creates new instances, so tests
/// that call this do not share backend objects with one another.
fn create_backends() -> (
    Arc<InMemoryJobQueue>,
    Arc<InMemoryArtifactStore>,
    Arc<InMemoryLogSink>,
    Arc<InMemoryWorkerRegistry>,
    SharedState,
) {
    let job_queue = Arc::new(InMemoryJobQueue::new());
    let artifact_store = Arc::new(InMemoryArtifactStore::new());
    let log_sink = Arc::new(InMemoryLogSink::new());
    let worker_registry = Arc::new(InMemoryWorkerRegistry::new());
    let shared_state: SharedState = Arc::new(RwLock::new(WorkflowState::new()));

    (
        job_queue,
        artifact_store,
        log_sink,
        worker_registry,
        shared_state,
    )
}
/// Happy-path integration test: the test acts as the only worker, claiming
/// and completing jobs until every job of the sample workflow has been
/// completed, then asserts that each job ended in `JobStatus::Success`.
///
/// The expected number of completions is taken from the workflow itself
/// rather than hard-coded, so the test keeps working if the sample workflow
/// gains or loses jobs.
#[tokio::test]
async fn test_full_workflow_lifecycle() {
    let (queue, artifacts, _logs, _workers, state) = create_backends();
    let workflow = sample_workflow();
    let wf_id = workflow.id.clone();
    // Capture the job count before the workflow is moved into shared state.
    let expected_jobs = workflow.jobs.len();

    // The workflow is not needed locally afterwards, so move it instead of cloning.
    state
        .write()
        .await
        .workflows
        .insert(wf_id.clone(), workflow);

    let scheduler = Arc::new(DagScheduler::new(
        queue.clone(),
        artifacts.clone(),
        state.clone(),
    ));
    // Run the scheduler loop in the background for the duration of the test.
    let scheduler_handle = tokio::spawn(scheduler.clone().run());

    scheduler
        .start_workflow(&wf_id)
        .await
        .expect("start_workflow should succeed");

    let mut completed_count = 0usize;
    // Upper bound on polling attempts so a stuck scheduler fails the test
    // instead of hanging it.
    let max_iterations = 80;
    for _ in 0..max_iterations {
        tokio::time::sleep(Duration::from_millis(50)).await;
        let claimed = queue
            .claim("test-worker", &[], Duration::from_secs(30))
            .await
            .expect("claim should not error");
        if let Some((_job, lease)) = claimed {
            queue
                .complete(&lease.lease_id, std::collections::HashMap::new())
                .await
                .expect("complete should succeed");
            completed_count += 1;
            // Pause after each completion before polling again.
            // NOTE(review): presumably this gives the scheduler time to react
            // to the completion - confirm against DagScheduler::run.
            tokio::time::sleep(Duration::from_millis(100)).await;
            if completed_count == expected_jobs {
                break;
            }
        }
    }

    // Final settle period before inspecting the shared state.
    tokio::time::sleep(Duration::from_millis(200)).await;
    assert_eq!(
        completed_count, expected_jobs,
        "Should have completed all {expected_jobs} jobs"
    );

    let final_state = state.read().await;
    let final_workflow = final_state.workflows.get(&wf_id).unwrap();
    for job in &final_workflow.jobs {
        assert_eq!(
            job.status,
            JobStatus::Success,
            "Job {} should be Success, was {:?}",
            job.id,
            job.status
        );
    }

    scheduler_handle.abort();
}
/// Failure-propagation integration test: the test makes up to three claim
/// attempts, fails the `unit-tests` job and completes any other claimed job,
/// then asserts that `unit-tests` is `Failure` and that `build`, `deploy-db`
/// and `deploy-web` are `Skipped`.
///
/// The test explicitly asserts that `unit-tests` was claimed and failed. If
/// timing prevents that, the failure points at the real cause instead of a
/// confusing status mismatch later on.
#[tokio::test]
async fn test_failure_cascading_integration() {
    let (queue, artifacts, _logs, _workers, state) = create_backends();
    let workflow = sample_workflow();
    let wf_id = workflow.id.clone();

    // The workflow is not needed locally afterwards, so move it instead of cloning.
    state
        .write()
        .await
        .workflows
        .insert(wf_id.clone(), workflow);

    let scheduler = Arc::new(DagScheduler::new(
        queue.clone(),
        artifacts.clone(),
        state.clone(),
    ));
    // Run the scheduler loop in the background for the duration of the test.
    let scheduler_handle = tokio::spawn(scheduler.clone().run());

    scheduler
        .start_workflow(&wf_id)
        .await
        .expect("start_workflow should succeed");

    // Tracks whether the job under test was actually claimed and failed.
    let mut unit_tests_failed = false;
    for _ in 0..3 {
        tokio::time::sleep(Duration::from_millis(20)).await;
        let claimed = queue
            .claim("test-worker", &[], Duration::from_secs(30))
            .await
            .expect("claim should not error");
        if let Some((job, lease)) = claimed {
            if job.job_id == "unit-tests" {
                // NOTE(review): the trailing `false` is presumably a
                // "retryable" flag - confirm against the queue trait.
                queue
                    .fail(&lease.lease_id, "test failure".into(), false)
                    .await
                    .expect("fail should succeed");
                unit_tests_failed = true;
            } else {
                queue
                    .complete(&lease.lease_id, std::collections::HashMap::new())
                    .await
                    .expect("complete should succeed");
            }
        }
    }
    assert!(
        unit_tests_failed,
        "unit-tests should have been claimed and failed within the 3 claim attempts"
    );

    // Settle period before inspecting the shared state.
    tokio::time::sleep(Duration::from_millis(200)).await;

    let final_state = state.read().await;
    let wf = final_state.workflows.get(&wf_id).unwrap();
    // Looks up a job by id; panics if the sample workflow has no such job.
    let status_of = |id: &str| wf.jobs.iter().find(|j| j.id == id).unwrap().status.clone();

    assert_eq!(status_of("unit-tests"), JobStatus::Failure);
    assert_eq!(status_of("build"), JobStatus::Skipped);
    assert_eq!(status_of("deploy-db"), JobStatus::Skipped);
    assert_eq!(status_of("deploy-web"), JobStatus::Skipped);

    scheduler_handle.abort();
}
/// Cancellation integration test: starts the sample workflow, claims one job
/// so that it is held under a lease, cancels the whole workflow, and asserts
/// that at least five jobs end up in `JobStatus::Cancelled`.
#[tokio::test]
async fn test_cancel_workflow_integration() {
    let (queue, artifacts, _logs, _workers, state) = create_backends();
    let workflow = sample_workflow();
    let wf_id = workflow.id.clone();

    // The workflow is not needed locally afterwards, so move it instead of cloning.
    state
        .write()
        .await
        .workflows
        .insert(wf_id.clone(), workflow);

    let scheduler = Arc::new(DagScheduler::new(
        queue.clone(),
        artifacts.clone(),
        state.clone(),
    ));
    // Run the scheduler loop in the background for the duration of the test.
    let scheduler_handle = tokio::spawn(scheduler.clone().run());

    scheduler
        .start_workflow(&wf_id)
        .await
        .expect("start_workflow should succeed");

    tokio::time::sleep(Duration::from_millis(50)).await;

    // Claim a job (without completing it) before cancelling.
    let claimed = queue
        .claim("test-worker", &[], Duration::from_secs(30))
        .await
        .expect("claim should not error");
    assert!(claimed.is_some(), "Should be able to claim a root job");

    scheduler
        .cancel_workflow(&wf_id)
        .await
        .expect("cancel_workflow should succeed");

    // Settle period before inspecting the shared state.
    tokio::time::sleep(Duration::from_millis(100)).await;

    let final_state = state.read().await;
    let wf = final_state.workflows.get(&wf_id).unwrap();
    let cancelled_count = wf
        .jobs
        .iter()
        .filter(|j| j.status == JobStatus::Cancelled)
        .count();
    // Deliberately a lower bound rather than an exact count.
    assert!(
        cancelled_count >= 5,
        "Most jobs should be cancelled, got {cancelled_count}"
    );

    scheduler_handle.abort();
}
/// Concurrency integration test: five worker tasks each attempt a single
/// claim against the same queue. The test asserts that exactly three claims
/// succeed and that no job id was handed out twice.
#[tokio::test]
async fn test_concurrent_workers_no_double_claim() {
    let (queue, artifacts, _logs, _workers, state) = create_backends();
    let workflow = sample_workflow();
    let wf_id = workflow.id.clone();

    // The workflow is not needed locally afterwards, so move it instead of cloning.
    state
        .write()
        .await
        .workflows
        .insert(wf_id.clone(), workflow);

    let scheduler = Arc::new(DagScheduler::new(
        queue.clone(),
        artifacts.clone(),
        state.clone(),
    ));
    // Run the scheduler loop in the background for the duration of the test.
    let scheduler_handle = tokio::spawn(scheduler.clone().run());

    scheduler
        .start_workflow(&wf_id)
        .await
        .expect("start_workflow should succeed");

    tokio::time::sleep(Duration::from_millis(50)).await;

    // Spawn every worker before awaiting any of them so the claims can overlap.
    let mut handles = Vec::new();
    for i in 0..5 {
        let queue = queue.clone();
        handles.push(tokio::spawn(async move {
            queue
                .claim(&format!("worker-{i}"), &[], Duration::from_secs(30))
                .await
                .expect("claim should not error")
        }));
    }

    // Collect the job id of every successful claim; the number of successful
    // claims is simply the length of this list.
    let mut claimed_ids: Vec<String> = Vec::new();
    for handle in handles {
        let result: Option<(
            workflow_graph_queue::traits::QueuedJob,
            workflow_graph_queue::traits::Lease,
        )> = handle.await.unwrap();
        if let Some((job, _lease)) = result {
            claimed_ids.push(job.job_id.clone());
        }
    }

    let claimed_count = claimed_ids.len();
    assert_eq!(
        claimed_count, 3,
        "Exactly 3 root jobs should be claimed by 5 workers, got {claimed_count}"
    );

    // A set drops duplicates, so equal lengths mean every claimed id is unique.
    let unique: std::collections::HashSet<_> = claimed_ids.iter().collect();
    assert_eq!(unique.len(), claimed_ids.len(), "No duplicate claims");

    scheduler_handle.abort();
}