use assay::workflow::store::sqlite::SqliteStore;
use assay::workflow::store::WorkflowStore;
use assay::workflow::types::*;
/// Builds a [`SqliteStore`] from the `sqlite::memory:` connection string.
///
/// # Panics
/// Panics if the store cannot be constructed.
async fn test_store() -> SqliteStore {
    let opened = SqliteStore::new("sqlite::memory:").await;
    opened.unwrap()
}
/// Returns the current system time as fractional seconds since the Unix epoch.
///
/// # Panics
/// Panics if the system clock reports a time earlier than the Unix epoch.
fn now() -> f64 {
    use std::time::{SystemTime, UNIX_EPOCH};

    let since_epoch = SystemTime::now().duration_since(UNIX_EPOCH).unwrap();
    since_epoch.as_secs_f64()
}
/// Builds a `PENDING` [`WorkflowRecord`] fixture in the `main` namespace and task queue.
///
/// * `id` - workflow id; the run id is derived from it as `run-{id}`.
/// * `wf_type` - value stored in `workflow_type`.
///
/// `created_at` and `updated_at` share a single timestamp taken from `now()`; the input is a
/// fixed JSON string and every other optional field is `None`.
fn make_workflow(id: &str, wf_type: &str) -> WorkflowRecord {
    let stamp = now();
    let run_id = format!("run-{id}");

    WorkflowRecord {
        id: id.to_owned(),
        namespace: String::from("main"),
        run_id,
        workflow_type: wf_type.to_owned(),
        task_queue: String::from("main"),
        status: String::from("PENDING"),
        input: Some(String::from(r#"{"key":"value"}"#)),
        result: None,
        error: None,
        parent_id: None,
        claimed_by: None,
        search_attributes: None,
        archived_at: None,
        archive_uri: None,
        created_at: stamp,
        updated_at: stamp,
        completed_at: None,
    }
}
/// A workflow written with `create_workflow` is returned by `get_workflow` with the same
/// id, type, status and input.
#[tokio::test]
async fn workflow_create_and_get() {
    let store = test_store().await;
    let record = make_workflow("wf-1", "IngestData");
    store.create_workflow(&record).await.unwrap();

    let lookup = store.get_workflow("wf-1").await.unwrap();
    let loaded = lookup.unwrap();

    assert_eq!(loaded.id, "wf-1");
    assert_eq!(loaded.workflow_type, "IngestData");
    assert_eq!(loaded.status, "PENDING");
    assert_eq!(loaded.input.as_deref(), Some(r#"{"key":"value"}"#));
}
/// Looking up an id that was never created succeeds and yields `None`.
#[tokio::test]
async fn workflow_get_nonexistent() {
    let store = test_store().await;

    let missing = store.get_workflow("nonexistent").await.unwrap();

    assert!(missing.is_none());
}
/// `list_workflows` honours the optional status filter: after moving `wf-1` to `Running`,
/// each status filter returns exactly one workflow and no filter returns both.
#[tokio::test]
async fn workflow_list_filter_by_status() {
    let store = test_store().await;

    // Seed two pending workflows, in the same order as their ids.
    for (id, wf_type) in [("wf-1", "TypeA"), ("wf-2", "TypeB")] {
        let record = make_workflow(id, wf_type);
        store.create_workflow(&record).await.unwrap();
    }

    store
        .update_workflow_status("wf-1", WorkflowStatus::Running, None, None)
        .await
        .unwrap();

    let only_running = store
        .list_workflows("main", Some(WorkflowStatus::Running), None, None, 100, 0)
        .await
        .unwrap();
    assert_eq!(only_running.len(), 1);
    assert_eq!(only_running[0].id, "wf-1");

    let only_pending = store
        .list_workflows("main", Some(WorkflowStatus::Pending), None, None, 100, 0)
        .await
        .unwrap();
    assert_eq!(only_pending.len(), 1);
    assert_eq!(only_pending[0].id, "wf-2");

    let unfiltered = store
        .list_workflows("main", None, None, None, 100, 0)
        .await
        .unwrap();
    assert_eq!(unfiltered.len(), 2);
}
/// The first `claim_workflow` call wins and a second claim by another worker is rejected;
/// the stored record then shows the first worker as owner and status `RUNNING`.
#[tokio::test]
async fn workflow_claim() {
    let store = test_store().await;
    let record = make_workflow("wf-1", "TypeA");
    store.create_workflow(&record).await.unwrap();

    let first_attempt = store.claim_workflow("wf-1", "worker-1").await.unwrap();
    assert!(first_attempt);

    let second_attempt = store.claim_workflow("wf-1", "worker-2").await.unwrap();
    assert!(!second_attempt);

    let stored = store.get_workflow("wf-1").await.unwrap().unwrap();
    assert_eq!(stored.claimed_by.as_deref(), Some("worker-1"));
    assert_eq!(stored.status, "RUNNING");
}
/// Updating a workflow to `Completed` with a result stores the status and result string
/// and populates `completed_at`.
#[tokio::test]
async fn workflow_update_status_to_completed() {
    let store = test_store().await;
    let record = make_workflow("wf-1", "TypeA");
    store.create_workflow(&record).await.unwrap();

    let outcome = r#"{"done":true}"#;
    store
        .update_workflow_status("wf-1", WorkflowStatus::Completed, Some(outcome), None)
        .await
        .unwrap();

    let stored = store.get_workflow("wf-1").await.unwrap().unwrap();
    assert_eq!(stored.status, "COMPLETED");
    assert_eq!(stored.result.as_deref(), Some(r#"{"done":true}"#));
    assert!(stored.completed_at.is_some());
}
/// Two events appended to one workflow are listed back with their sequence numbers in
/// append order, and `get_event_count` reports both.
#[tokio::test]
async fn event_append_and_list() {
    let store = test_store().await;
    let record = make_workflow("wf-1", "TypeA");
    store.create_workflow(&record).await.unwrap();

    let base_time = now();

    let started = WorkflowEvent {
        id: None,
        workflow_id: String::from("wf-1"),
        seq: 1,
        event_type: String::from("WorkflowStarted"),
        payload: Some(String::from(r#"{"input":"data"}"#)),
        timestamp: base_time,
    };
    store.append_event(&started).await.unwrap();

    let scheduled = WorkflowEvent {
        id: None,
        workflow_id: String::from("wf-1"),
        seq: 2,
        event_type: String::from("ActivityScheduled"),
        payload: Some(String::from(r#"{"name":"fetch"}"#)),
        timestamp: base_time + 1.0,
    };
    store.append_event(&scheduled).await.unwrap();

    let history = store.list_events("wf-1").await.unwrap();
    assert_eq!(history.len(), 2);
    assert_eq!(history[0].seq, 1);
    assert_eq!(history[0].event_type, "WorkflowStarted");
    assert_eq!(history[1].seq, 2);

    let total = store.get_event_count("wf-1").await.unwrap();
    assert_eq!(total, 2);
}
/// Exercises the activity lifecycle on the store: create, claim, complete.
///
/// An activity scheduled on the `default` queue must not be claimable from another queue,
/// must be handed to the first worker polling `default` with status `RUNNING`, and must not
/// be handed out a second time. The claimed activity is then completed with a result.
#[tokio::test]
async fn activity_create_claim_complete() {
    let store = test_store().await;
    store
        .create_workflow(&make_workflow("wf-1", "TypeA"))
        .await
        .unwrap();

    let ts = now();
    store
        .create_activity(&WorkflowActivity {
            id: None,
            workflow_id: "wf-1".to_string(),
            seq: 1,
            name: "fetch_data".to_string(),
            task_queue: "default".to_string(),
            input: Some(r#"{"url":"http://example.com"}"#.to_string()),
            status: "PENDING".to_string(),
            result: None,
            error: None,
            attempt: 1,
            max_attempts: 3,
            initial_interval_secs: 1.0,
            backoff_coefficient: 2.0,
            start_to_close_secs: 300.0,
            heartbeat_timeout_secs: None,
            claimed_by: None,
            scheduled_at: ts,
            started_at: None,
            completed_at: None,
            last_heartbeat: None,
        })
        .await
        .unwrap();

    // Polling a queue the activity was not scheduled on yields nothing.
    let wrong_queue = store.claim_activity("gpu", "worker-1").await.unwrap();
    assert!(wrong_queue.is_none());

    // A single `expect` replaces the former `assert!(is_some())` + `unwrap()` pair and
    // states the invariant in the failure message.
    let act = store
        .claim_activity("default", "worker-1")
        .await
        .unwrap()
        .expect("the pending activity on the `default` queue should be claimable");
    assert_eq!(act.name, "fetch_data");
    assert_eq!(act.status, "RUNNING");

    // Once claimed, the activity is not offered to another worker.
    let next = store.claim_activity("default", "worker-2").await.unwrap();
    assert!(next.is_none());

    // NOTE(review): only the success of `complete_activity` is checked here; the stored
    // result/status after completion is not read back — consider asserting on it.
    store
        .complete_activity(act.id.unwrap(), Some(r#"{"rows":42}"#), None, false)
        .await
        .unwrap();
}
/// `fire_due_timers` returns only timers whose `fire_at` has passed, marks them fired, and
/// does not return them again on a subsequent call.
#[tokio::test]
async fn timer_create_and_fire() {
    let store = test_store().await;
    let record = make_workflow("wf-1", "TypeA");
    store.create_workflow(&record).await.unwrap();

    let already_due = now() - 10.0;
    let far_ahead = now() + 3600.0;

    let due_timer = WorkflowTimer {
        id: None,
        workflow_id: String::from("wf-1"),
        seq: 1,
        fire_at: already_due,
        fired: false,
    };
    store.create_timer(&due_timer).await.unwrap();

    let pending_timer = WorkflowTimer {
        id: None,
        workflow_id: String::from("wf-1"),
        seq: 2,
        fire_at: far_ahead,
        fired: false,
    };
    store.create_timer(&pending_timer).await.unwrap();

    let first_pass = store.fire_due_timers(now()).await.unwrap();
    assert_eq!(first_pass.len(), 1);
    assert_eq!(first_pass[0].seq, 1);
    assert!(first_pass[0].fired);

    let second_pass = store.fire_due_timers(now()).await.unwrap();
    assert!(second_pass.is_empty());
}
/// Two `approval` signals sent to one workflow are both returned by the first
/// `consume_signals` call; a second call returns nothing.
#[tokio::test]
async fn signal_send_and_consume() {
    let store = test_store().await;
    let record = make_workflow("wf-1", "TypeA");
    store.create_workflow(&record).await.unwrap();

    // Same signal name twice, differing only in payload; sent in this order.
    for body in [r#"{"approved":true}"#, r#"{"approved":false}"#] {
        let signal = WorkflowSignal {
            id: None,
            workflow_id: String::from("wf-1"),
            name: String::from("approval"),
            payload: Some(String::from(body)),
            consumed: false,
            received_at: now(),
        };
        store.send_signal(&signal).await.unwrap();
    }

    let first_batch = store.consume_signals("wf-1", "approval").await.unwrap();
    assert_eq!(first_batch.len(), 2);

    let second_batch = store.consume_signals("wf-1", "approval").await.unwrap();
    assert!(second_batch.is_empty());
}
/// Walks a schedule through create, get, list, `update_schedule_last_run` and delete,
/// checking the stored values after each step.
#[tokio::test]
async fn schedule_crud() {
    let store = test_store().await;

    let schedule = WorkflowSchedule {
        name: String::from("hourly-ingest"),
        namespace: String::from("main"),
        workflow_type: String::from("IngestData"),
        cron_expr: String::from("0 * * * *"),
        timezone: String::from("UTC"),
        input: None,
        task_queue: String::from("main"),
        overlap_policy: String::from("skip"),
        paused: false,
        last_run_at: None,
        next_run_at: Some(now() + 3600.0),
        last_workflow_id: None,
        created_at: now(),
    };
    store.create_schedule(&schedule).await.unwrap();

    // Read back by (namespace, name).
    let stored = store
        .get_schedule("main", "hourly-ingest")
        .await
        .unwrap()
        .unwrap();
    assert_eq!(stored.workflow_type, "IngestData");
    assert_eq!(stored.cron_expr, "0 * * * *");

    let listed = store.list_schedules("main").await.unwrap();
    assert_eq!(listed.len(), 1);

    // Record a run and the workflow it produced.
    let ran_at = now();
    let next_at = now() + 3600.0;
    store
        .update_schedule_last_run("main", "hourly-ingest", ran_at, next_at, "wf-run-1")
        .await
        .unwrap();

    let after_run = store
        .get_schedule("main", "hourly-ingest")
        .await
        .unwrap()
        .unwrap();
    assert!(after_run.last_run_at.is_some());
    assert_eq!(after_run.last_workflow_id.as_deref(), Some("wf-run-1"));

    let was_deleted = store.delete_schedule("main", "hourly-ingest").await.unwrap();
    assert!(was_deleted);

    let after_delete = store.get_schedule("main", "hourly-ingest").await.unwrap();
    assert!(after_delete.is_none());
}
/// A registered worker is listed, survives `remove_dead_workers` with a cutoff just before
/// its latest heartbeat, and is removed once the cutoff moves past that heartbeat.
#[tokio::test]
async fn worker_register_heartbeat_remove() {
    let store = test_store().await;
    let registered = now();

    let worker = WorkflowWorker {
        id: String::from("w-1"),
        namespace: String::from("main"),
        identity: String::from("pipeline-pod-1"),
        task_queue: String::from("main"),
        workflows: Some(String::from(r#"["IngestData"]"#)),
        activities: Some(String::from(r#"["fetch_data"]"#)),
        max_concurrent_workflows: 10,
        max_concurrent_activities: 20,
        active_tasks: 0,
        last_heartbeat: registered,
        registered_at: registered,
    };
    store.register_worker(&worker).await.unwrap();

    let listed = store.list_workers("main").await.unwrap();
    assert_eq!(listed.len(), 1);
    assert_eq!(listed[0].identity, "pipeline-pod-1");

    // Heartbeat lands at +30s, so a +29s cutoff keeps the worker and +31s drops it.
    store.heartbeat_worker("w-1", registered + 30.0).await.unwrap();

    let dropped_early = store.remove_dead_workers(registered + 29.0).await.unwrap();
    assert!(dropped_early.is_empty());

    let dropped_late = store.remove_dead_workers(registered + 31.0).await.unwrap();
    assert_eq!(dropped_late.len(), 1);
    assert_eq!(dropped_late[0], "w-1");

    let remaining = store.list_workers("main").await.unwrap();
    assert!(remaining.is_empty());
}
#[tokio::test]
async fn archivable_list_and_purge() {
let store = test_store().await;
let mut old = make_workflow("wf-old", "Type");
old.status = "COMPLETED".to_string();
old.completed_at = Some(100.0);
old.updated_at = 100.0;
store.create_workflow(&old).await.unwrap();
store
.append_event(&WorkflowEvent {
id: None,
workflow_id: "wf-old".to_string(),
seq: 1,
event_type: "WorkflowStarted".to_string(),
payload: None,
timestamp: 100.0,
})
.await
.unwrap();
let mut recent = make_workflow("wf-recent", "Type");
recent.status = "COMPLETED".to_string();
recent.completed_at = Some(now());
store.create_workflow(&recent).await.unwrap();
let running = make_workflow("wf-running", "Type");
store.create_workflow(&running).await.unwrap();
let archivable = store
.list_archivable_workflows(now() - 1.0, 10)
.await
.unwrap();
let ids: Vec<&str> = archivable.iter().map(|w| w.id.as_str()).collect();
assert_eq!(ids, vec!["wf-old"]);
store
.mark_archived_and_purge("wf-old", "s3://bucket/wf-old.json", now())
.await
.unwrap();
let stub = store.get_workflow("wf-old").await.unwrap().unwrap();
assert!(stub.archived_at.is_some());
assert_eq!(stub.archive_uri.as_deref(), Some("s3://bucket/wf-old.json"));
let events = store.list_events("wf-old").await.unwrap();
assert!(events.is_empty(), "events should be purged");
let running_still = store.get_workflow("wf-running").await.unwrap().unwrap();
assert!(running_still.archived_at.is_none());
let recent_still = store.get_workflow("wf-recent").await.unwrap().unwrap();
assert!(recent_still.archived_at.is_none());
}