use assay_workflow::{Engine, SqliteStore};
use std::sync::Arc;
use tokio::sync::broadcast;
async fn start_test_server() -> (String, tokio::task::JoinHandle<()>) {
let store = SqliteStore::new("sqlite::memory:").await.unwrap();
let engine = Engine::start(store);
let (event_tx, _) = broadcast::channel(64);
let state = Arc::new(assay_workflow::api::AppState {
engine: Arc::new(engine),
event_tx,
auth_mode: assay_workflow::api::auth::AuthMode::NoAuth,
});
let app = assay_workflow::api::router(state);
let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
let port = listener.local_addr().unwrap().port();
let base_url = format!("http://127.0.0.1:{port}");
let handle = tokio::spawn(async move {
axum::serve(listener, app).await.unwrap();
});
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
(base_url, handle)
}
fn client() -> reqwest::Client {
reqwest::Client::new()
}
#[tokio::test]
async fn health_check() {
let (url, _handle) = start_test_server().await;
let resp = client()
.get(format!("{url}/api/v1/health"))
.send()
.await
.unwrap();
assert_eq!(resp.status(), 200);
let body: serde_json::Value = resp.json().await.unwrap();
assert_eq!(body["status"], "ok");
assert_eq!(body["service"], "assay-workflow");
}
#[tokio::test]
async fn start_and_list_workflows() {
let (url, _handle) = start_test_server().await;
let c = client();
let resp = c
.post(format!("{url}/api/v1/workflows"))
.json(&serde_json::json!({
"workflow_type": "IngestData",
"workflow_id": "wf-test-1",
"input": {"source": "s3://bucket"},
}))
.send()
.await
.unwrap();
assert_eq!(resp.status(), 201);
let body: serde_json::Value = resp.json().await.unwrap();
assert_eq!(body["workflow_id"], "wf-test-1");
assert_eq!(body["status"], "PENDING");
let resp = c
.get(format!("{url}/api/v1/workflows"))
.send()
.await
.unwrap();
assert_eq!(resp.status(), 200);
let body: Vec<serde_json::Value> = resp.json().await.unwrap();
assert_eq!(body.len(), 1);
assert_eq!(body[0]["id"], "wf-test-1");
let resp = c
.get(format!("{url}/api/v1/workflows/wf-test-1"))
.send()
.await
.unwrap();
assert_eq!(resp.status(), 200);
let body: serde_json::Value = resp.json().await.unwrap();
assert_eq!(body["workflow_type"], "IngestData");
let resp = c
.get(format!("{url}/api/v1/workflows/wf-test-1/events"))
.send()
.await
.unwrap();
assert_eq!(resp.status(), 200);
let body: Vec<serde_json::Value> = resp.json().await.unwrap();
assert_eq!(body.len(), 1);
assert_eq!(body[0]["event_type"], "WorkflowStarted");
}
#[tokio::test]
async fn signal_and_cancel_workflow() {
let (url, _handle) = start_test_server().await;
let c = client();
c.post(format!("{url}/api/v1/workflows"))
.json(&serde_json::json!({
"workflow_type": "Approval",
"workflow_id": "wf-sig-1",
}))
.send()
.await
.unwrap();
let resp = c
.post(format!("{url}/api/v1/workflows/wf-sig-1/signal/approve"))
.json(&serde_json::json!({ "payload": {"approved": true} }))
.send()
.await
.unwrap();
assert_eq!(resp.status(), 200);
let resp = c
.post(format!("{url}/api/v1/workflows/wf-sig-1/cancel"))
.send()
.await
.unwrap();
assert_eq!(resp.status(), 200);
let resp = c
.post(format!("{url}/api/v1/workflows/wf-sig-1/cancel"))
.send()
.await
.unwrap();
assert_eq!(resp.status(), 404);
}
#[tokio::test]
async fn worker_register_and_poll() {
let (url, _handle) = start_test_server().await;
let c = client();
let resp = c
.post(format!("{url}/api/v1/workers/register"))
.json(&serde_json::json!({
"identity": "test-worker-1",
"queue": "default",
"activities": ["fetch_data"],
}))
.send()
.await
.unwrap();
assert_eq!(resp.status(), 200);
let body: serde_json::Value = resp.json().await.unwrap();
let worker_id = body["worker_id"].as_str().unwrap().to_string();
assert!(worker_id.starts_with("w-"));
let resp = c
.get(format!("{url}/api/v1/workers"))
.send()
.await
.unwrap();
assert_eq!(resp.status(), 200);
let body: Vec<serde_json::Value> = resp.json().await.unwrap();
assert_eq!(body.len(), 1);
let resp = c
.post(format!("{url}/api/v1/tasks/poll"))
.json(&serde_json::json!({
"queue": "default",
"worker_id": worker_id,
}))
.send()
.await
.unwrap();
assert_eq!(resp.status(), 200);
let body: serde_json::Value = resp.json().await.unwrap();
assert!(body["task"].is_null());
}
#[tokio::test]
async fn schedule_crud() {
let (url, _handle) = start_test_server().await;
let c = client();
let resp = c
.post(format!("{url}/api/v1/schedules"))
.json(&serde_json::json!({
"name": "hourly-ingest",
"workflow_type": "IngestData",
"cron_expr": "0 * * * *",
}))
.send()
.await
.unwrap();
assert_eq!(resp.status(), 201);
let resp = c
.get(format!("{url}/api/v1/schedules"))
.send()
.await
.unwrap();
assert_eq!(resp.status(), 200);
let body: Vec<serde_json::Value> = resp.json().await.unwrap();
assert_eq!(body.len(), 1);
assert_eq!(body[0]["name"], "hourly-ingest");
let resp = c
.get(format!("{url}/api/v1/schedules/hourly-ingest"))
.send()
.await
.unwrap();
assert_eq!(resp.status(), 200);
let resp = c
.delete(format!("{url}/api/v1/schedules/hourly-ingest"))
.send()
.await
.unwrap();
assert_eq!(resp.status(), 200);
let resp = c
.get(format!("{url}/api/v1/schedules/hourly-ingest"))
.send()
.await
.unwrap();
assert_eq!(resp.status(), 404);
}
#[tokio::test]
async fn workflow_not_found() {
let (url, _handle) = start_test_server().await;
let c = client();
let resp = c
.get(format!("{url}/api/v1/workflows/nonexistent"))
.send()
.await
.unwrap();
assert_eq!(resp.status(), 404);
}