use assay_workflow::{Engine, SqliteStore};
use std::sync::Arc;
use tokio::sync::broadcast;
/// Boot an in-process engine with an in-memory SQLite store and serve the
/// HTTP API on an ephemeral localhost port.
///
/// Returns the base URL (`http://127.0.0.1:<port>`) and the join handle of
/// the serving task; tests keep the handle alive (`_h`) so the server runs
/// for the duration of the test.
///
/// Fix: the old 50 ms post-spawn sleep is removed. `TcpListener::bind`
/// completes before `tokio::spawn`, so the kernel is already queueing
/// incoming connections in the listen backlog; clients can connect right
/// away and the requests are served as soon as `axum::serve` starts
/// accepting. The sleep only added dead time to every test.
async fn start_test_server() -> (String, tokio::task::JoinHandle<()>) {
let store = SqliteStore::new("sqlite::memory:").await.unwrap();
let engine = Engine::start(store);
// Broadcast channel for API event streaming; the initial receiver is
// dropped — subscribers create their own from `event_tx`.
let (event_tx, _) = broadcast::channel(64);
let state = Arc::new(assay_workflow::api::AppState {
engine: Arc::new(engine),
event_tx,
auth_mode: assay_workflow::api::auth::AuthMode::no_auth(),
binary_version: None,
});
let app = assay_workflow::api::router(state);
// Port 0 lets the OS pick a free port, so parallel tests never collide.
let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
let port = listener.local_addr().unwrap().port();
let base_url = format!("http://127.0.0.1:{port}");
let handle = tokio::spawn(async move {
axum::serve(listener, app).await.unwrap();
});
(base_url, handle)
}
/// Fresh HTTP client for talking to the test server.
fn client() -> reqwest::Client {
reqwest::Client::new()
}
/// Happy path: starting a workflow and scheduling an activity must create a
/// PENDING activity row, append WorkflowStarted + ActivityScheduled events,
/// and leave the workflow in RUNNING status.
#[tokio::test]
async fn schedule_activity_creates_pending_row_and_event() {
let (url, _h) = start_test_server().await;
let c = client();
// Start the workflow the activity will belong to.
let resp = c
.post(format!("{url}/api/v1/workflows"))
.json(&serde_json::json!({
"workflow_type": "TestWorkflow",
"workflow_id": "wf-1",
"task_queue": "default",
"input": {"hello": "world"},
}))
.send()
.await
.unwrap();
assert_eq!(resp.status(), 201, "start workflow");
// Schedule one activity against it; seq identifies it within the workflow.
let resp = c
.post(format!("{url}/api/v1/workflows/wf-1/activities"))
.json(&serde_json::json!({
"name": "fetch",
"input": {"url": "https://example.com"},
"task_queue": "default",
"seq": 1,
"max_attempts": 3,
}))
.send()
.await
.unwrap();
assert_eq!(resp.status(), 201, "schedule activity");
let scheduled: serde_json::Value = resp.json().await.unwrap();
let activity_id = scheduled["id"].as_i64().expect("activity id");
// The row must be readable back and still unclaimed (PENDING).
let resp = c
.get(format!("{url}/api/v1/activities/{activity_id}"))
.send()
.await
.unwrap();
assert_eq!(resp.status(), 200, "get activity");
let activity: serde_json::Value = resp.json().await.unwrap();
assert_eq!(activity["status"], "PENDING");
assert_eq!(activity["name"], "fetch");
assert_eq!(activity["task_queue"], "default");
assert_eq!(activity["workflow_id"], "wf-1");
assert_eq!(activity["seq"], 1);
// Event history must record both the start and the scheduling.
let resp = c
.get(format!("{url}/api/v1/workflows/wf-1/events"))
.send()
.await
.unwrap();
assert_eq!(resp.status(), 200, "get events");
let events: Vec<serde_json::Value> = resp.json().await.unwrap();
let types: Vec<&str> = events.iter().map(|e| e["event_type"].as_str().unwrap()).collect();
assert!(
types.contains(&"WorkflowStarted"),
"events should include WorkflowStarted, got {types:?}"
);
assert!(
types.contains(&"ActivityScheduled"),
"events should include ActivityScheduled, got {types:?}"
);
// Scheduling an activity must not terminate the workflow.
let resp = c
.get(format!("{url}/api/v1/workflows/wf-1"))
.send()
.await
.unwrap();
let wf: serde_json::Value = resp.json().await.unwrap();
assert_eq!(wf["status"], "RUNNING");
}
/// Scheduling the same activity `seq` twice must be idempotent: the second
/// POST returns the same activity id and does not append a second
/// ActivityScheduled event to the history.
#[tokio::test]
async fn schedule_activity_is_idempotent_on_seq() {
let (url, _h) = start_test_server().await;
let c = client();
c.post(format!("{url}/api/v1/workflows"))
.json(&serde_json::json!({
"workflow_type": "TestWorkflow",
"workflow_id": "wf-idem",
"task_queue": "default",
}))
.send()
.await
.unwrap();
// Identical body posted twice — seq 1 is the idempotency key.
let body = serde_json::json!({
"name": "fetch",
"input": {"x": 1},
"task_queue": "default",
"seq": 1,
});
let r1: serde_json::Value = c
.post(format!("{url}/api/v1/workflows/wf-idem/activities"))
.json(&body)
.send()
.await
.unwrap()
.json()
.await
.unwrap();
let r2: serde_json::Value = c
.post(format!("{url}/api/v1/workflows/wf-idem/activities"))
.json(&body)
.send()
.await
.unwrap()
.json()
.await
.unwrap();
assert_eq!(r1["id"], r2["id"], "same seq → same activity id");
// History must reflect idempotency too: exactly one scheduling event.
let events: Vec<serde_json::Value> = c
.get(format!("{url}/api/v1/workflows/wf-idem/events"))
.send()
.await
.unwrap()
.json()
.await
.unwrap();
let scheduled_count = events
.iter()
.filter(|e| e["event_type"].as_str() == Some("ActivityScheduled"))
.count();
assert_eq!(scheduled_count, 1, "second schedule must not append a second event");
}
/// Test helper: start `workflow_id`, register an activity worker on the
/// "default" queue, schedule a single "fetch" activity (seq 1, 3 attempts,
/// very short retry interval), then claim it via the task-poll endpoint.
/// Asserts the claimed task is the one just scheduled and returns its id.
async fn schedule_and_claim(c: &reqwest::Client, url: &str, workflow_id: &str) -> i64 {
c.post(format!("{url}/api/v1/workflows"))
.json(&serde_json::json!({
"workflow_type": "TestWorkflow",
"workflow_id": workflow_id,
"task_queue": "default",
}))
.send()
.await
.unwrap();
// Register a worker capable of running "fetch" so the poll below matches.
c.post(format!("{url}/api/v1/workers/register"))
.json(&serde_json::json!({
"identity": "test-worker",
"queue": "default",
"activities": ["fetch"],
}))
.send()
.await
.unwrap();
// Short initial_interval_secs keeps retry backoff small so callers can
// re-poll quickly after a failure.
let scheduled: serde_json::Value = c
.post(format!("{url}/api/v1/workflows/{workflow_id}/activities"))
.json(&serde_json::json!({
"name": "fetch",
"input": {"x": 1},
"task_queue": "default",
"seq": 1,
"max_attempts": 3,
"initial_interval_secs": 0.05,
}))
.send()
.await
.unwrap()
.json()
.await
.unwrap();
let activity_id = scheduled["id"].as_i64().expect("activity id");
// Claim the activity as the registered worker.
let poll_resp: serde_json::Value = c
.post(format!("{url}/api/v1/tasks/poll"))
.json(&serde_json::json!({
"queue": "default",
"worker_id": "test-worker",
}))
.send()
.await
.unwrap()
.json()
.await
.unwrap();
assert_eq!(
poll_resp["id"].as_i64(),
Some(activity_id),
"expected to claim the just-scheduled activity, got {poll_resp}"
);
activity_id
}
/// Completing a claimed activity must append an ActivityCompleted event
/// whose payload carries the activity's seq, id, and result.
#[tokio::test]
async fn complete_activity_appends_event() {
let (url, _h) = start_test_server().await;
let c = client();
let activity_id = schedule_and_claim(&c, &url, "wf-complete").await;
let resp = c
.post(format!("{url}/api/v1/tasks/{activity_id}/complete"))
.json(&serde_json::json!({"result": {"bytes": 42}}))
.send()
.await
.unwrap();
assert_eq!(resp.status(), 200);
let events: Vec<serde_json::Value> = c
.get(format!("{url}/api/v1/workflows/wf-complete/events"))
.send()
.await
.unwrap()
.json()
.await
.unwrap();
let completed = events
.iter()
.find(|e| e["event_type"].as_str() == Some("ActivityCompleted"))
.expect("ActivityCompleted event should appear");
// The event payload is stored as a JSON-encoded string, so parse it first.
let payload: serde_json::Value =
serde_json::from_str(completed["payload"].as_str().unwrap()).unwrap();
assert_eq!(payload["activity_seq"], 1, "event must carry activity seq");
assert_eq!(payload["activity_id"], activity_id);
assert!(payload["result"].is_object() || payload["result"].is_string());
}
/// Retry semantics: failing an activity requeues it (PENDING, attempt
/// incremented) while attempts remain, fires no ActivityFailed event until
/// retries are exhausted, and after max_attempts (3) the activity lands in
/// FAILED with exactly one ActivityFailed event.
#[tokio::test]
async fn fail_activity_retries_until_max_attempts() {
let (url, _h) = start_test_server().await;
let c = client();
let activity_id = schedule_and_claim(&c, &url, "wf-retry").await;
// First failure: attempts remain, so this must requeue rather than fail.
let resp = c
.post(format!("{url}/api/v1/tasks/{activity_id}/fail"))
.json(&serde_json::json!({"error": "transient: ConnectionReset"}))
.send()
.await
.unwrap();
assert_eq!(resp.status(), 200);
let act: serde_json::Value = c
.get(format!("{url}/api/v1/activities/{activity_id}"))
.send()
.await
.unwrap()
.json()
.await
.unwrap();
assert_eq!(
act["status"], "PENDING",
"first fail must requeue while attempts remain, got {act}"
);
assert_eq!(act["attempt"], 2, "attempt should increment");
let events: Vec<serde_json::Value> = c
.get(format!("{url}/api/v1/workflows/wf-retry/events"))
.send()
.await
.unwrap()
.json()
.await
.unwrap();
let failed_count = events
.iter()
.filter(|e| e["event_type"].as_str() == Some("ActivityFailed"))
.count();
assert_eq!(failed_count, 0, "should not fire ActivityFailed while retrying");
// 120 ms > the 0.05 s initial retry interval set by schedule_and_claim,
// so the activity should be claimable again.
tokio::time::sleep(std::time::Duration::from_millis(120)).await;
// Burn the remaining attempts (2 and 3), failing each claim.
for expected_attempt in 2..=3 {
let claimed: serde_json::Value = c
.post(format!("{url}/api/v1/tasks/poll"))
.json(&serde_json::json!({
"queue": "default",
"worker_id": "test-worker",
}))
.send()
.await
.unwrap()
.json()
.await
.unwrap();
assert_eq!(claimed["id"].as_i64(), Some(activity_id), "should re-claim same activity");
assert_eq!(claimed["attempt"], expected_attempt);
c.post(format!("{url}/api/v1/tasks/{activity_id}/fail"))
.json(&serde_json::json!({"error": "still failing"}))
.send()
.await
.unwrap();
// Wait out the backoff before the next re-claim, except after the
// terminal attempt.
if expected_attempt < 3 {
tokio::time::sleep(std::time::Duration::from_millis(120)).await;
}
}
let act: serde_json::Value = c
.get(format!("{url}/api/v1/activities/{activity_id}"))
.send()
.await
.unwrap()
.json()
.await
.unwrap();
assert_eq!(act["status"], "FAILED", "after max attempts the activity is FAILED");
assert_eq!(act["attempt"], 3);
let events: Vec<serde_json::Value> = c
.get(format!("{url}/api/v1/workflows/wf-retry/events"))
.send()
.await
.unwrap()
.json()
.await
.unwrap();
let failed_count = events
.iter()
.filter(|e| e["event_type"].as_str() == Some("ActivityFailed"))
.count();
assert_eq!(failed_count, 1, "exactly one ActivityFailed event after exhausting retries");
}
/// Claim the next workflow task on `queue` as `worker_id`.
///
/// Returns the raw JSON body from the engine; tests treat JSON `null` as
/// "nothing claimable right now".
async fn poll_workflow_task(
c: &reqwest::Client,
url: &str,
queue: &str,
worker_id: &str,
) -> serde_json::Value {
let request_body = serde_json::json!({"queue": queue, "worker_id": worker_id});
let response = c
.post(format!("{url}/api/v1/workflow-tasks/poll"))
.json(&request_body)
.send()
.await
.unwrap();
response.json().await.unwrap()
}
/// A freshly started workflow must be claimable as a workflow task, and the
/// task must carry the workflow's type, input, and a history containing
/// WorkflowStarted.
#[tokio::test]
async fn start_workflow_makes_it_dispatchable() {
let (url, _h) = start_test_server().await;
let c = client();
c.post(format!("{url}/api/v1/workflows"))
.json(&serde_json::json!({
"workflow_type": "TestWorkflow",
"workflow_id": "wf-disp-1",
"task_queue": "default",
"input": {"hello": "world"},
}))
.send()
.await
.unwrap();
let task = poll_workflow_task(&c, &url, "default", "worker-A").await;
assert_eq!(task["workflow_id"], "wf-disp-1");
assert_eq!(task["workflow_type"], "TestWorkflow");
assert_eq!(task["input"]["hello"], "world");
let history = task["history"].as_array().expect("history is an array");
assert!(
history.iter().any(|e| e["event_type"] == "WorkflowStarted"),
"history should include WorkflowStarted, got {history:?}"
);
}
/// Workflow task claims are exclusive: while worker-A holds the task,
/// worker-B polling the same queue must get JSON null.
#[tokio::test]
async fn workflow_task_claim_is_exclusive() {
let (url, _h) = start_test_server().await;
let c = client();
c.post(format!("{url}/api/v1/workflows"))
.json(&serde_json::json!({
"workflow_type": "TestWorkflow",
"workflow_id": "wf-disp-2",
"task_queue": "default",
}))
.send()
.await
.unwrap();
let first = poll_workflow_task(&c, &url, "default", "worker-A").await;
assert_eq!(first["workflow_id"], "wf-disp-2", "worker-A should claim it");
let second = poll_workflow_task(&c, &url, "default", "worker-B").await;
assert!(second.is_null(), "worker-B must get nothing while worker-A holds it");
}
/// Submitting a ScheduleActivity command from the claiming worker must
/// produce an ActivityScheduled event, and the workflow must not become
/// re-dispatchable until something new happens.
#[tokio::test]
async fn submit_commands_schedules_activities_and_releases_claim() {
let (url, _h) = start_test_server().await;
let c = client();
c.post(format!("{url}/api/v1/workflows"))
.json(&serde_json::json!({
"workflow_type": "TestWorkflow",
"workflow_id": "wf-disp-3",
"task_queue": "default",
}))
.send()
.await
.unwrap();
// Claim the workflow task so worker-A is allowed to submit commands.
let _claim = poll_workflow_task(&c, &url, "default", "worker-A").await;
let resp = c
.post(format!("{url}/api/v1/workflow-tasks/wf-disp-3/commands"))
.json(&serde_json::json!({
"worker_id": "worker-A",
"commands": [
{"type": "ScheduleActivity", "seq": 1, "name": "fetch",
"task_queue": "default", "input": {"k": "v"}}
]
}))
.send()
.await
.unwrap();
assert_eq!(resp.status(), 200);
let events: Vec<serde_json::Value> = c
.get(format!("{url}/api/v1/workflows/wf-disp-3/events"))
.send()
.await
.unwrap()
.json()
.await
.unwrap();
assert!(
events.iter().any(|e| e["event_type"] == "ActivityScheduled"),
"command should have produced ActivityScheduled"
);
// No new events since the command batch → nothing to dispatch.
let next = poll_workflow_task(&c, &url, "default", "worker-A").await;
assert!(
next.is_null(),
"workflow should not be re-dispatchable until something new happens"
);
}
/// When an activity completes, the owning workflow must become dispatchable
/// again so the workflow worker can make progress.
#[tokio::test]
async fn activity_completion_redispatches_workflow() {
let (url, _h) = start_test_server().await;
let c = client();
c.post(format!("{url}/api/v1/workflows"))
.json(&serde_json::json!({
"workflow_type": "TestWorkflow",
"workflow_id": "wf-disp-4",
"task_queue": "default",
}))
.send()
.await
.unwrap();
// worker-A takes the initial workflow task.
poll_workflow_task(&c, &url, "default", "worker-A").await;
let scheduled: serde_json::Value = c
.post(format!("{url}/api/v1/workflows/wf-disp-4/activities"))
.json(&serde_json::json!({
"name": "fetch", "seq": 1, "task_queue": "default", "input": {}
}))
.send()
.await
.unwrap()
.json()
.await
.unwrap();
let activity_id = scheduled["id"].as_i64().unwrap();
// A separate activity worker claims and completes the activity.
c.post(format!("{url}/api/v1/workers/register"))
.json(&serde_json::json!({
"identity": "act-worker", "queue": "default", "activities": ["fetch"],
}))
.send()
.await
.unwrap();
c.post(format!("{url}/api/v1/tasks/poll"))
.json(&serde_json::json!({"queue": "default", "worker_id": "act-worker"}))
.send()
.await
.unwrap();
c.post(format!("{url}/api/v1/tasks/{activity_id}/complete"))
.json(&serde_json::json!({"result": {"ok": true}}))
.send()
.await
.unwrap();
// Empty command batch releases worker-A's claim so the completion can
// trigger a fresh dispatch.
c.post(format!("{url}/api/v1/workflow-tasks/wf-disp-4/commands"))
.json(&serde_json::json!({"worker_id": "worker-A", "commands": []}))
.send()
.await
.unwrap();
let next = poll_workflow_task(&c, &url, "default", "worker-A").await;
assert_eq!(
next["workflow_id"], "wf-disp-4",
"ActivityCompleted should make the workflow dispatchable again, got {next}"
);
}
/// A CompleteWorkflow command must mark the workflow COMPLETED, persist its
/// result (as a JSON-encoded string), and stop it from being dispatched.
#[tokio::test]
async fn complete_workflow_command_marks_terminal() {
let (url, _h) = start_test_server().await;
let c = client();
c.post(format!("{url}/api/v1/workflows"))
.json(&serde_json::json!({
"workflow_type": "TestWorkflow",
"workflow_id": "wf-disp-5",
"task_queue": "default",
}))
.send()
.await
.unwrap();
// Claim the workflow task so worker-A may submit the terminal command.
poll_workflow_task(&c, &url, "default", "worker-A").await;
let resp = c
.post(format!("{url}/api/v1/workflow-tasks/wf-disp-5/commands"))
.json(&serde_json::json!({
"worker_id": "worker-A",
"commands": [
{"type": "CompleteWorkflow", "result": {"steps": 0}}
]
}))
.send()
.await
.unwrap();
assert_eq!(resp.status(), 200);
let wf: serde_json::Value = c
.get(format!("{url}/api/v1/workflows/wf-disp-5"))
.send()
.await
.unwrap()
.json()
.await
.unwrap();
assert_eq!(wf["status"], "COMPLETED");
// The workflow result is stored as a JSON string; parse before asserting.
let result_str = wf["result"].as_str().expect("result string");
let result: serde_json::Value = serde_json::from_str(result_str).unwrap();
assert_eq!(result["steps"], 0);
let next = poll_workflow_task(&c, &url, "default", "worker-A").await;
assert!(next.is_null(), "completed workflow must not poll");
}
use std::path::PathBuf;
use std::process::Stdio;
/// Walk upward from the current working directory looking for a built
/// `assay` binary, preferring a debug build over a release build at each
/// level.
///
/// Returns `None` when no ancestor directory contains
/// `target/{debug,release}/assay` (or when the cwd is unavailable).
///
/// Fix: drops the redundant `here.clone()` of the original (the cwd value
/// was cloned only to be mutated) and collapses the duplicated
/// debug/release candidate checks into one loop.
fn locate_assay_binary() -> Option<PathBuf> {
// `pop` walks toward the filesystem root; bind the cwd directly as the
// mutable probe — no intermediate clone needed.
let mut probe = std::env::current_dir().ok()?;
loop {
// Debug first: matches the `cargo build --bin assay` hint printed by
// the tests that skip when no binary is found.
for rel in ["target/debug/assay", "target/release/assay"] {
let cand = probe.join(rel);
if cand.is_file() {
return Some(cand);
}
}
if !probe.pop() {
// Reached the root without finding a binary.
return None;
}
}
}
/// Poll the describe-workflow endpoint every 100 ms until `workflow_id`
/// reports `target_status`, returning the workflow record.
///
/// On timeout, fetches the event history and panics with a pretty-printed
/// dump of the last workflow record and the events, to make CI failures
/// diagnosable.
async fn wait_for_workflow_status(
c: &reqwest::Client,
base_url: &str,
workflow_id: &str,
target_status: &str,
timeout: std::time::Duration,
) -> serde_json::Value {
let deadline = std::time::Instant::now() + timeout;
let mut last: serde_json::Value = serde_json::Value::Null;
while std::time::Instant::now() < deadline {
let resp = c
.get(format!("{base_url}/api/v1/workflows/{workflow_id}"))
.send()
.await
.expect("describe workflow");
// Non-200 (e.g. not yet created) is tolerated and simply retried.
if resp.status() == 200 {
last = resp.json().await.expect("workflow json");
if last["status"].as_str() == Some(target_status) {
return last;
}
}
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
}
// Timed out: gather the event history for the failure message.
let events: serde_json::Value = c
.get(format!("{base_url}/api/v1/workflows/{workflow_id}/events"))
.send()
.await
.unwrap()
.json()
.await
.unwrap_or(serde_json::Value::Null);
panic!(
"workflow {workflow_id} did not reach {target_status} within timeout.\n\
last workflow record: {}\n\
events: {}",
serde_json::to_string_pretty(&last).unwrap(),
serde_json::to_string_pretty(&events).unwrap()
);
}
/// End-to-end: a Lua worker subprocess runs a two-activity workflow to
/// COMPLETED with the expected computed result. Skips (with a note) when
/// the `assay` binary has not been built.
#[tokio::test]
async fn lua_workflow_runs_to_completion() {
let Some(assay_bin) = locate_assay_binary() else {
eprintln!(
"SKIP: lua_workflow_runs_to_completion — no assay binary at \
target/{{debug,release}}/assay. Run `cargo build --bin assay` first."
);
return;
};
let (url, _h) = start_test_server().await;
let c = client();
// Write the Lua worker script into a temp dir so the subprocess can load it.
let tmp = tempfile::tempdir().expect("tempdir");
let worker_path = tmp.path().join("worker.lua");
std::fs::write(
&worker_path,
r#"
local workflow = require("assay.workflow")
workflow.connect(env.get("ASSAY_ENGINE_URL"))
workflow.define("TwoStep", function(ctx, input)
local a = ctx:execute_activity("step1", { n = input.n })
local b = ctx:execute_activity("step2", { prev = a })
return { first = a, second = b, sum = a.value + b.value }
end)
workflow.activity("step1", function(ctx, input)
return { value = input.n * 2 }
end)
workflow.activity("step2", function(ctx, input)
return { value = input.prev.value + 10 }
end)
workflow.listen({ queue = "default" })
"#,
)
.expect("write worker.lua");
let mut worker = tokio::process::Command::new(&assay_bin)
.arg("run")
.arg(&worker_path)
.env("ASSAY_ENGINE_URL", &url)
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.spawn()
.expect("spawn assay worker subprocess");
// NOTE(review): fixed grace period for the worker to connect/register —
// a readiness probe would be less flaky; confirm against worker startup.
tokio::time::sleep(std::time::Duration::from_millis(800)).await;
let resp = c
.post(format!("{url}/api/v1/workflows"))
.json(&serde_json::json!({
"workflow_type": "TwoStep",
"workflow_id": "wf-lua-1",
"task_queue": "default",
"input": {"n": 5},
}))
.send()
.await
.unwrap();
assert_eq!(resp.status(), 201, "workflow start");
let wf = wait_for_workflow_status(
&c,
&url,
"wf-lua-1",
"COMPLETED",
std::time::Duration::from_secs(10),
)
.await;
// n=5 → step1: 5*2=10; step2: 10+10=20; sum: 30.
let result_str = wf["result"].as_str().expect("workflow result");
let result: serde_json::Value = serde_json::from_str(result_str).expect("result json");
assert_eq!(result["first"]["value"], 10, "step1 result");
assert_eq!(result["second"]["value"], 20, "step2 result");
assert_eq!(result["sum"], 30, "sum");
let _ = worker.kill().await;
}
/// A Lua workflow blocked on wait_for_signal must stay RUNNING until the
/// signal arrives over HTTP, then complete with the signal payload.
#[tokio::test]
async fn lua_workflow_with_signal() {
let Some(assay_bin) = locate_assay_binary() else {
eprintln!("SKIP: lua_workflow_with_signal — no assay binary");
return;
};
let (url, _h) = start_test_server().await;
let c = client();
let tmp = tempfile::tempdir().expect("tempdir");
let worker_path = tmp.path().join("worker.lua");
std::fs::write(
&worker_path,
r#"
local workflow = require("assay.workflow")
workflow.connect(env.get("ASSAY_ENGINE_URL"))
workflow.define("WaitForApproval", function(ctx, input)
local approval = ctx:wait_for_signal("approve")
return { approved = true, by = approval and approval.by }
end)
workflow.listen({ queue = "default" })
"#,
)
.expect("write worker.lua");
let mut worker = tokio::process::Command::new(&assay_bin)
.arg("run")
.arg(&worker_path)
.env("ASSAY_ENGINE_URL", &url)
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.spawn()
.expect("spawn worker");
// Grace period for the worker subprocess to connect before starting work.
tokio::time::sleep(std::time::Duration::from_millis(800)).await;
c.post(format!("{url}/api/v1/workflows"))
.json(&serde_json::json!({
"workflow_type": "WaitForApproval",
"workflow_id": "wf-sig-1",
"task_queue": "default",
}))
.send()
.await
.unwrap();
// Give the workflow time to reach the wait point before checking status.
tokio::time::sleep(std::time::Duration::from_millis(800)).await;
let wf: serde_json::Value = c
.get(format!("{url}/api/v1/workflows/wf-sig-1"))
.send()
.await
.unwrap()
.json()
.await
.unwrap();
assert_eq!(wf["status"], "RUNNING", "workflow should be waiting, not done");
// Deliver the signal the workflow is waiting on.
let resp = c
.post(format!("{url}/api/v1/workflows/wf-sig-1/signal/approve"))
.json(&serde_json::json!({"payload": {"by": "alice"}}))
.send()
.await
.unwrap();
assert_eq!(resp.status(), 200);
let final_wf = wait_for_workflow_status(
&c,
&url,
"wf-sig-1",
"COMPLETED",
std::time::Duration::from_secs(5),
)
.await;
let result_str = final_wf["result"].as_str().expect("result");
let result: serde_json::Value = serde_json::from_str(result_str).unwrap();
assert_eq!(result["approved"], true);
assert_eq!(result["by"], "alice");
let _ = worker.kill().await;
}
/// A workflow that calls ctx:cancel must end in CANCELLED with no
/// completion result recorded.
#[tokio::test]
async fn lua_workflow_ctx_cancel_lands_in_cancelled_status() {
let Some(assay_bin) = locate_assay_binary() else {
eprintln!("SKIP: lua_workflow_ctx_cancel_lands_in_cancelled_status — no assay binary");
return;
};
let (url, _h) = start_test_server().await;
let c = client();
let tmp = tempfile::tempdir().expect("tempdir");
let worker_path = tmp.path().join("worker.lua");
std::fs::write(
&worker_path,
r#"
local workflow = require("assay.workflow")
workflow.connect(env.get("ASSAY_ENGINE_URL"))
workflow.define("SelfCancel", function(ctx, input)
ctx:cancel("operator rejected the request")
-- unreachable — ctx:cancel raises
return { unreachable = true }
end)
workflow.listen({ queue = "default" })
"#,
)
.expect("write worker.lua");
let mut worker = tokio::process::Command::new(&assay_bin)
.arg("run")
.arg(&worker_path)
.env("ASSAY_ENGINE_URL", &url)
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.spawn()
.expect("spawn worker");
// Grace period for the worker subprocess to connect.
tokio::time::sleep(std::time::Duration::from_millis(800)).await;
c.post(format!("{url}/api/v1/workflows"))
.json(&serde_json::json!({
"workflow_type": "SelfCancel",
"workflow_id": "wf-self-cancel",
"task_queue": "default",
}))
.send()
.await
.unwrap();
let final_wf = wait_for_workflow_status(
&c,
&url,
"wf-self-cancel",
"CANCELLED",
std::time::Duration::from_secs(5),
)
.await;
assert_eq!(final_wf["status"], "CANCELLED");
assert!(
final_wf["result"].is_null(),
"cancelled workflow should not carry a completion result"
);
let _ = worker.kill().await;
}
/// Namespace scoping: a worker listening on a namespaced queue must pick up
/// and complete a workflow started in that namespace, and the workflow
/// record must carry the namespace.
#[tokio::test]
async fn lua_workflow_namespace_scoping_end_to_end() {
let Some(assay_bin) = locate_assay_binary() else {
eprintln!("SKIP: lua_workflow_namespace_scoping_end_to_end — no assay binary");
return;
};
let (url, _h) = start_test_server().await;
let c = client();
// Create the namespace before anything references it.
let resp = c
.post(format!("{url}/api/v1/namespaces"))
.json(&serde_json::json!({ "name": "deployments" }))
.send()
.await
.unwrap();
assert!(resp.status().is_success(), "create namespace: {}", resp.status());
let tmp = tempfile::tempdir().expect("tempdir");
let worker_path = tmp.path().join("worker.lua");
std::fs::write(
&worker_path,
r#"
local workflow = require("assay.workflow")
workflow.connect(env.get("ASSAY_ENGINE_URL"))
workflow.define("ScopedPing", function(ctx, input)
return { pong = input.n }
end)
workflow.listen({ queue = "scoped-q", namespace = "deployments" })
"#,
)
.expect("write worker.lua");
let mut worker = tokio::process::Command::new(&assay_bin)
.arg("run")
.arg(&worker_path)
.env("ASSAY_ENGINE_URL", &url)
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.spawn()
.expect("spawn worker");
// Grace period for the worker subprocess to connect.
tokio::time::sleep(std::time::Duration::from_millis(800)).await;
c.post(format!("{url}/api/v1/workflows"))
.json(&serde_json::json!({
"workflow_type": "ScopedPing",
"workflow_id": "wf-ns-scoped",
"task_queue": "scoped-q",
"namespace": "deployments",
"input": { "n": 7 },
}))
.send()
.await
.unwrap();
let final_wf = wait_for_workflow_status(
&c,
&url,
"wf-ns-scoped",
"COMPLETED",
std::time::Duration::from_secs(5),
)
.await;
assert_eq!(final_wf["namespace"], "deployments", "workflow carries namespace");
let result: serde_json::Value =
serde_json::from_str(final_wf["result"].as_str().expect("result")).unwrap();
assert_eq!(result["pong"], 7);
let _ = worker.kill().await;
}
/// wait_for_signal with a long (30 s) timeout: delivering the signal well
/// before the deadline must win, so the workflow completes with the signal
/// payload and timed_out = false.
#[tokio::test]
async fn lua_workflow_wait_for_signal_timeout_signal_wins() {
let Some(assay_bin) = locate_assay_binary() else {
eprintln!("SKIP: lua_workflow_wait_for_signal_timeout_signal_wins — no assay binary");
return;
};
let (url, _h) = start_test_server().await;
let c = client();
let tmp = tempfile::tempdir().expect("tempdir");
let worker_path = tmp.path().join("worker.lua");
std::fs::write(
&worker_path,
r#"
local workflow = require("assay.workflow")
workflow.connect(env.get("ASSAY_ENGINE_URL"))
workflow.define("WaitWithTimeout", function(ctx, input)
local payload = ctx:wait_for_signal("decide", { timeout = 30 })
if payload == nil then
return { timed_out = true }
end
return { timed_out = false, choice = payload.choice }
end)
workflow.listen({ queue = "default" })
"#,
)
.expect("write worker.lua");
let mut worker = tokio::process::Command::new(&assay_bin)
.arg("run")
.arg(&worker_path)
.env("ASSAY_ENGINE_URL", &url)
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.spawn()
.expect("spawn worker");
// Grace period for the worker subprocess to connect.
tokio::time::sleep(std::time::Duration::from_millis(800)).await;
c.post(format!("{url}/api/v1/workflows"))
.json(&serde_json::json!({
"workflow_type": "WaitWithTimeout",
"workflow_id": "wf-timed-sig-win",
"task_queue": "default",
}))
.send()
.await
.unwrap();
// Let the workflow reach its wait point, then deliver the signal.
tokio::time::sleep(std::time::Duration::from_millis(800)).await;
let resp = c
.post(format!("{url}/api/v1/workflows/wf-timed-sig-win/signal/decide"))
.json(&serde_json::json!({"payload": {"choice": "approve"}}))
.send()
.await
.unwrap();
assert_eq!(resp.status(), 200);
let final_wf = wait_for_workflow_status(
&c,
&url,
"wf-timed-sig-win",
"COMPLETED",
std::time::Duration::from_secs(5),
)
.await;
let result: serde_json::Value =
serde_json::from_str(final_wf["result"].as_str().expect("result")).unwrap();
assert_eq!(result["timed_out"], false);
assert_eq!(result["choice"], "approve");
let _ = worker.kill().await;
}
/// wait_for_signal with a short (1 s) timeout and no signal delivered: the
/// timer must win, completing the workflow with timed_out = true.
#[tokio::test]
async fn lua_workflow_wait_for_signal_timeout_timer_wins() {
let Some(assay_bin) = locate_assay_binary() else {
eprintln!("SKIP: lua_workflow_wait_for_signal_timeout_timer_wins — no assay binary");
return;
};
let (url, _h) = start_test_server().await;
let c = client();
let tmp = tempfile::tempdir().expect("tempdir");
let worker_path = tmp.path().join("worker.lua");
std::fs::write(
&worker_path,
r#"
local workflow = require("assay.workflow")
workflow.connect(env.get("ASSAY_ENGINE_URL"))
workflow.define("WaitWithShortTimeout", function(ctx, input)
local payload = ctx:wait_for_signal("decide", { timeout = 1 })
if payload == nil then
return { timed_out = true }
end
return { timed_out = false, choice = payload.choice }
end)
workflow.listen({ queue = "default" })
"#,
)
.expect("write worker.lua");
let mut worker = tokio::process::Command::new(&assay_bin)
.arg("run")
.arg(&worker_path)
.env("ASSAY_ENGINE_URL", &url)
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.spawn()
.expect("spawn worker");
// Grace period for the worker subprocess to connect.
tokio::time::sleep(std::time::Duration::from_millis(800)).await;
c.post(format!("{url}/api/v1/workflows"))
.json(&serde_json::json!({
"workflow_type": "WaitWithShortTimeout",
"workflow_id": "wf-timed-sig-timeout",
"task_queue": "default",
}))
.send()
.await
.unwrap();
// 8 s budget comfortably covers the 1 s workflow-side timeout.
let final_wf = wait_for_workflow_status(
&c,
&url,
"wf-timed-sig-timeout",
"COMPLETED",
std::time::Duration::from_secs(8),
)
.await;
let result: serde_json::Value =
serde_json::from_str(final_wf["result"].as_str().expect("result")).unwrap();
assert_eq!(result["timed_out"], true);
let _ = worker.kill().await;
}
/// A cron schedule must actually start a workflow, and that workflow must
/// run an activity and complete with the expected result.
#[tokio::test]
async fn lua_cron_schedule_fires_real_workflow() {
let Some(assay_bin) = locate_assay_binary() else {
eprintln!("SKIP: lua_cron_schedule_fires_real_workflow — no assay binary");
return;
};
let (url, _h) = start_test_server().await;
let c = client();
let tmp = tempfile::tempdir().expect("tempdir");
let worker_path = tmp.path().join("worker.lua");
std::fs::write(
&worker_path,
r#"
local workflow = require("assay.workflow")
workflow.connect(env.get("ASSAY_ENGINE_URL"))
workflow.define("CronTriggered", function(ctx, input)
local r = ctx:execute_activity("greet", { who = "world" })
return { greeting = r.message }
end)
workflow.activity("greet", function(ctx, input)
return { message = "hello, " .. input.who }
end)
workflow.listen({ queue = "default" })
"#,
)
.expect("write worker.lua");
let mut worker = tokio::process::Command::new(&assay_bin)
.arg("run")
.arg(&worker_path)
.env("ASSAY_ENGINE_URL", &url)
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.spawn()
.expect("spawn worker");
// Grace period for the worker subprocess to connect.
tokio::time::sleep(std::time::Duration::from_millis(800)).await;
// Six-field cron expression — presumably seconds-resolution, i.e. "fire
// every second" — TODO confirm against the scheduler's cron parser.
let resp = c
.post(format!("{url}/api/v1/schedules"))
.json(&serde_json::json!({
"namespace": "main",
"name": "test-cron-1",
"workflow_type": "CronTriggered",
"cron_expr": "* * * * * *",
"task_queue": "default",
}))
.send()
.await
.unwrap();
assert!(
resp.status().is_success(),
"schedule create failed: {}",
resp.status()
);
// Poll the workflow list until the scheduler fires (up to 25 s).
let started_at = std::time::Instant::now();
let mut found_workflow_id: Option<String> = None;
while started_at.elapsed() < std::time::Duration::from_secs(25) {
let resp = c
.get(format!("{url}/api/v1/workflows?namespace=main"))
.send()
.await
.unwrap();
let workflows: Vec<serde_json::Value> = resp.json().await.unwrap_or_default();
if let Some(wf) = workflows
.iter()
.find(|w| w["workflow_type"] == "CronTriggered")
{
found_workflow_id = wf["id"].as_str().map(String::from);
break;
}
tokio::time::sleep(std::time::Duration::from_millis(500)).await;
}
let workflow_id = found_workflow_id
.expect("scheduler should have started a CronTriggered workflow within 25s");
let final_wf = wait_for_workflow_status(
&c,
&url,
&workflow_id,
"COMPLETED",
std::time::Duration::from_secs(10),
)
.await;
let result_str = final_wf["result"].as_str().expect("result");
let result: serde_json::Value = serde_json::from_str(result_str).unwrap();
assert_eq!(result["greeting"], "hello, world");
let _ = worker.kill().await;
}
/// Crash recovery: SIGKILL a worker mid-workflow (after its side effect was
/// recorded but while it sleeps), start a replacement worker, and verify the
/// workflow completes with the originally recorded side-effect value — and
/// that the side-effect function ran exactly once (counted via a file).
#[tokio::test]
async fn lua_worker_crash_resumes_workflow() {
let Some(assay_bin) = locate_assay_binary() else {
eprintln!("SKIP: lua_worker_crash_resumes_workflow — no assay binary");
return;
};
// Shorten the dispatch timeout so the dead worker's claim expires quickly
// — presumably lets the engine redispatch after ~2 s; confirm semantics.
// SAFETY-wise: set_var is `unsafe` in edition 2024; done before the
// server starts, while the process is effectively single-threaded here.
unsafe { std::env::set_var("ASSAY_WF_DISPATCH_TIMEOUT_SECS", "2") };
let (url, _h) = start_test_server().await;
let c = client();
let tmp = tempfile::tempdir().expect("tempdir");
// Counter file counts how many times the side-effect closure actually ran.
let counter_path = tmp.path().join("counter.txt");
std::fs::write(&counter_path, "0").expect("init counter");
let worker_path = tmp.path().join("worker.lua");
let worker_src = r#"
local workflow = require("assay.workflow")
workflow.connect(env.get("ASSAY_ENGINE_URL"))
local COUNTER = "__COUNTER_PATH__"
local function bump_and_token()
local cur = tonumber(fs.read(COUNTER)) or 0
fs.write(COUNTER, tostring(cur + 1))
return { token = "tok-" .. tostring(cur + 1) }
end
workflow.define("CrashSafeWorkflow", function(ctx, input)
local t = ctx:side_effect("issue", bump_and_token)
ctx:sleep(2)
local r = ctx:execute_activity("step", { token = t.token })
return { token = t, step = r }
end)
workflow.activity("step", function(ctx, input)
return { saw = input.token }
end)
workflow.listen({ queue = "default" })
"#
.replace("__COUNTER_PATH__", counter_path.to_str().unwrap());
std::fs::write(&worker_path, &worker_src).expect("write worker.lua");
let mut worker_a = tokio::process::Command::new(&assay_bin)
.arg("run")
.arg(&worker_path)
.env("ASSAY_ENGINE_URL", &url)
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.spawn()
.expect("spawn worker A");
// Grace period for worker A to connect.
tokio::time::sleep(std::time::Duration::from_millis(800)).await;
c.post(format!("{url}/api/v1/workflows"))
.json(&serde_json::json!({
"workflow_type": "CrashSafeWorkflow",
"workflow_id": "wf-crash-1",
"task_queue": "default",
}))
.send()
.await
.unwrap();
// Wait until the side effect is durably recorded, then kill worker A.
let recorded = wait_for_event(&c, &url, "wf-crash-1", "SideEffectRecorded",
std::time::Duration::from_secs(5)).await;
assert!(recorded, "worker A should have recorded the side effect before we kill it");
// SIGKILL so worker A gets no chance to clean up or release its claim.
if let Some(pid) = worker_a.id() {
std::process::Command::new("kill")
.args(["-KILL", &pid.to_string()])
.status()
.expect("kill worker A");
}
let _ = worker_a.wait().await;
// Replacement worker runs the same script; replay must reuse the recorded
// side effect instead of re-running it.
let mut worker_b = tokio::process::Command::new(&assay_bin)
.arg("run")
.arg(&worker_path)
.env("ASSAY_ENGINE_URL", &url)
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.spawn()
.expect("spawn worker B");
let final_wf = wait_for_workflow_status(
&c,
&url,
"wf-crash-1",
"COMPLETED",
std::time::Duration::from_secs(15),
)
.await;
let result_str = final_wf["result"].as_str().expect("result");
let result: serde_json::Value = serde_json::from_str(result_str).unwrap();
assert_eq!(result["token"]["token"], "tok-1");
assert_eq!(result["step"]["saw"], "tok-1");
let counter = std::fs::read_to_string(&counter_path).expect("counter").trim().to_string();
assert_eq!(
counter, "1",
"side_effect must run exactly once across worker crash (got {counter} runs)"
);
let _ = worker_b.kill().await;
}
/// Poll the workflow's event history every 100 ms until an event with
/// `event_type` appears, or `timeout` elapses.
///
/// Returns `true` as soon as a matching event is seen, `false` on timeout.
/// A body that fails to parse is treated as an empty event list and retried.
async fn wait_for_event(
c: &reqwest::Client,
base_url: &str,
workflow_id: &str,
event_type: &str,
timeout: std::time::Duration,
) -> bool {
let started = std::time::Instant::now();
let endpoint = format!("{base_url}/api/v1/workflows/{workflow_id}/events");
while started.elapsed() < timeout {
let response = c.get(&endpoint).send().await.unwrap();
let events: Vec<serde_json::Value> = response.json().await.unwrap_or_default();
let matched = events.iter().any(|e| e["event_type"] == event_type);
if matched {
return true;
}
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
}
false
}
// Verifies `ctx:side_effect` determinism: the side-effect closure must run
// exactly once, and the recorded value must be replayed (not re-executed) on
// every subsequent replay cycle. A counter file shared between the Lua worker
// and this test counts real executions.
#[tokio::test]
async fn lua_workflow_side_effect_is_recorded_once() {
// Integration test — skip quietly when the assay binary isn't built.
let Some(assay_bin) = locate_assay_binary() else {
eprintln!("SKIP: lua_workflow_side_effect_is_recorded_once — no assay binary");
return;
};
let (url, _h) = start_test_server().await;
let c = client();
let tmp = tempfile::tempdir().expect("tempdir");
// Counter starts at 0; the Lua side-effect closure bumps it on each real run.
let counter_path = tmp.path().join("counter.txt");
std::fs::write(&counter_path, "0").expect("init counter");
let worker_path = tmp.path().join("worker.lua");
let worker_src = r#"
local workflow = require("assay.workflow")
workflow.connect(env.get("ASSAY_ENGINE_URL"))
local COUNTER = "__COUNTER_PATH__"
-- Counter file lets the test see how many times the side_effect
-- function actually ran. Side effects must be recorded once and
-- cached for all subsequent replays.
local function bump_counter_and_return_value()
local cur = tonumber(fs.read(COUNTER)) or 0
fs.write(COUNTER, tostring(cur + 1))
return { token = "tok-" .. tostring(cur + 1) }
end
workflow.define("WithSideEffect", function(ctx, input)
local token = ctx:side_effect("issue_token", bump_counter_and_return_value)
-- Two activities so we get multiple replay cycles
local a = ctx:execute_activity("step", { token = token.token })
local b = ctx:execute_activity("step", { token = token.token })
return { token = token, a = a, b = b }
end)
workflow.activity("step", function(ctx, input)
return { saw = input.token }
end)
workflow.listen({ queue = "default" })
"#
// Inject the host-side counter path into the script before writing it out.
.replace("__COUNTER_PATH__", counter_path.to_str().unwrap());
std::fs::write(&worker_path, worker_src).expect("write worker.lua");
let mut worker = tokio::process::Command::new(&assay_bin)
.arg("run")
.arg(&worker_path)
.env("ASSAY_ENGINE_URL", &url)
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.spawn()
.expect("spawn worker");
// Give the worker time to connect and register its queue before we start work.
tokio::time::sleep(std::time::Duration::from_millis(800)).await;
c.post(format!("{url}/api/v1/workflows"))
.json(&serde_json::json!({
"workflow_type": "WithSideEffect",
"workflow_id": "wf-se-1",
"task_queue": "default",
}))
.send()
.await
.unwrap();
let final_wf = wait_for_workflow_status(
&c,
&url,
"wf-se-1",
"COMPLETED",
std::time::Duration::from_secs(10),
)
.await;
// The engine stores workflow results as a JSON-encoded string.
let result_str = final_wf["result"].as_str().expect("result");
let result: serde_json::Value = serde_json::from_str(result_str).unwrap();
// The token minted by the single real side-effect execution must be what
// the final result and both activity invocations observed.
assert_eq!(result["token"]["token"], "tok-1");
assert_eq!(result["a"]["saw"], "tok-1");
assert_eq!(result["b"]["saw"], "tok-1");
// The closure must have executed exactly once, no matter how many replays ran.
let counter = std::fs::read_to_string(&counter_path).expect("counter").trim().to_string();
assert_eq!(
counter, "1",
"side_effect function ran {counter} times — must be exactly 1"
);
let _ = worker.kill().await;
}
// A parent workflow can start a child via `ctx:start_child_workflow`, block on
// its result, and the engine must link the child record back to the parent
// through `parent_id`.
#[tokio::test]
async fn lua_child_workflow_completes_before_parent() {
// Integration test — skip quietly when the assay binary isn't built.
let Some(assay_bin) = locate_assay_binary() else {
eprintln!("SKIP: lua_child_workflow_completes_before_parent — no assay binary");
return;
};
let (url, _h) = start_test_server().await;
let c = client();
let tmp = tempfile::tempdir().expect("tempdir");
let worker_path = tmp.path().join("worker.lua");
std::fs::write(
&worker_path,
r#"
local workflow = require("assay.workflow")
workflow.connect(env.get("ASSAY_ENGINE_URL"))
workflow.define("Parent", function(ctx, input)
local child_result = ctx:start_child_workflow("Child", {
workflow_id = "child-of-" .. input.parent_label,
input = { multiplier = input.multiplier },
})
return { from_child = child_result, parent_label = input.parent_label }
end)
workflow.define("Child", function(ctx, input)
local r = ctx:execute_activity("multiply", { x = 7, by = input.multiplier })
return { product = r.product }
end)
workflow.activity("multiply", function(ctx, input)
return { product = input.x * input.by }
end)
workflow.listen({ queue = "default" })
"#,
)
.expect("write worker.lua");
let mut worker = tokio::process::Command::new(&assay_bin)
.arg("run")
.arg(&worker_path)
.env("ASSAY_ENGINE_URL", &url)
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.spawn()
.expect("spawn worker");
// Give the worker time to connect and register before the parent starts.
tokio::time::sleep(std::time::Duration::from_millis(800)).await;
c.post(format!("{url}/api/v1/workflows"))
.json(&serde_json::json!({
"workflow_type": "Parent",
"workflow_id": "wf-parent-1",
"task_queue": "default",
"input": {"parent_label": "alpha", "multiplier": 6},
}))
.send()
.await
.unwrap();
// The parent can only complete after its child has produced a result.
let parent = wait_for_workflow_status(
&c,
&url,
"wf-parent-1",
"COMPLETED",
std::time::Duration::from_secs(15),
)
.await;
let parent_result_str = parent["result"].as_str().expect("parent result");
let parent_result: serde_json::Value =
serde_json::from_str(parent_result_str).expect("parse parent result");
assert_eq!(parent_result["parent_label"], "alpha");
assert_eq!(parent_result["from_child"]["product"], 42, "7 * 6 = 42");
// The child is addressable by the id the parent chose ("child-of-" .. label),
// must be COMPLETED, and must carry the parent linkage.
let child: serde_json::Value = c
.get(format!("{url}/api/v1/workflows/child-of-alpha"))
.send()
.await
.unwrap()
.json()
.await
.unwrap();
assert_eq!(child["status"], "COMPLETED");
assert_eq!(child["parent_id"], "wf-parent-1");
let _ = worker.kill().await;
}
// Cancelling a workflow while it is inside a durable `ctx:sleep` must (a) end
// the run in CANCELLED, (b) prevent the post-sleep activity from ever being
// scheduled, and (c) record both the request and the terminal cancel events.
#[tokio::test]
async fn lua_workflow_cancellation_stops_work() {
// Integration test — skip quietly when the assay binary isn't built.
let Some(assay_bin) = locate_assay_binary() else {
eprintln!("SKIP: lua_workflow_cancellation_stops_work — no assay binary");
return;
};
let (url, _h) = start_test_server().await;
let c = client();
let tmp = tempfile::tempdir().expect("tempdir");
let worker_path = tmp.path().join("worker.lua");
std::fs::write(
&worker_path,
r#"
local workflow = require("assay.workflow")
workflow.connect(env.get("ASSAY_ENGINE_URL"))
workflow.define("LongSleepThenWork", function(ctx, input)
ctx:sleep(5)
-- This activity should NEVER be scheduled — we cancel before the
-- timer fires.
return ctx:execute_activity("never_runs", { x = 1 })
end)
workflow.activity("never_runs", function(ctx, input)
return { x = input.x }
end)
workflow.listen({ queue = "default" })
"#,
)
.expect("write worker.lua");
let mut worker = tokio::process::Command::new(&assay_bin)
.arg("run")
.arg(&worker_path)
.env("ASSAY_ENGINE_URL", &url)
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.spawn()
.expect("spawn worker");
// Give the worker time to connect and register before starting work.
tokio::time::sleep(std::time::Duration::from_millis(800)).await;
c.post(format!("{url}/api/v1/workflows"))
.json(&serde_json::json!({
"workflow_type": "LongSleepThenWork",
"workflow_id": "wf-cancel-1",
"task_queue": "default",
}))
.send()
.await
.unwrap();
// 500 ms: long enough for the worker to pick up the run and enter the 5 s
// sleep, well before the timer fires.
tokio::time::sleep(std::time::Duration::from_millis(500)).await;
let resp = c
.post(format!("{url}/api/v1/workflows/wf-cancel-1/cancel"))
.send()
.await
.unwrap();
assert_eq!(resp.status(), 200);
let final_wf = wait_for_workflow_status(
&c,
&url,
"wf-cancel-1",
"CANCELLED",
std::time::Duration::from_secs(5),
)
.await;
assert_eq!(final_wf["status"], "CANCELLED");
let events: Vec<serde_json::Value> = c
.get(format!("{url}/api/v1/workflows/wf-cancel-1/events"))
.send()
.await
.unwrap()
.json()
.await
.unwrap();
// The workflow body never got past the sleep, so no activity may appear
// anywhere in the history.
let activity_scheduled_count = events
.iter()
.filter(|e| e["event_type"].as_str() == Some("ActivityScheduled"))
.count();
assert_eq!(
activity_scheduled_count, 0,
"the post-sleep activity must not have been scheduled, got events: {events:?}"
);
// Both the cancel request and the terminal cancelled event must be recorded.
let types: Vec<&str> = events
.iter()
.map(|e| e["event_type"].as_str().unwrap_or(""))
.collect();
assert!(
types.contains(&"WorkflowCancelRequested"),
"missing WorkflowCancelRequested in {types:?}"
);
assert!(
types.contains(&"WorkflowCancelled"),
"missing terminal WorkflowCancelled in {types:?}"
);
let _ = worker.kill().await;
}
// `ctx:sleep` must be backed by a durable engine timer: completion takes
// roughly the slept duration, and the history records TimerScheduled and
// TimerFired events.
#[tokio::test]
async fn lua_workflow_with_durable_timer() {
// Integration test — skip quietly when the assay binary isn't built.
let Some(assay_bin) = locate_assay_binary() else {
eprintln!("SKIP: lua_workflow_with_durable_timer — no assay binary");
return;
};
let (url, _h) = start_test_server().await;
let c = client();
let tmp = tempfile::tempdir().expect("tempdir");
let worker_path = tmp.path().join("worker.lua");
std::fs::write(
&worker_path,
r#"
local workflow = require("assay.workflow")
workflow.connect(env.get("ASSAY_ENGINE_URL"))
workflow.define("SleepThenStep", function(ctx, input)
ctx:sleep(1)
local r = ctx:execute_activity("step", { x = input.x })
return { x = r.x, slept = true }
end)
workflow.activity("step", function(ctx, input)
return { x = input.x * 3 }
end)
workflow.listen({ queue = "default" })
"#,
)
.expect("write worker.lua");
let mut worker = tokio::process::Command::new(&assay_bin)
.arg("run")
.arg(&worker_path)
.env("ASSAY_ENGINE_URL", &url)
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.spawn()
.expect("spawn assay worker");
// Let the worker connect before the stopwatch starts.
tokio::time::sleep(std::time::Duration::from_millis(800)).await;
let started_at = std::time::Instant::now();
c.post(format!("{url}/api/v1/workflows"))
.json(&serde_json::json!({
"workflow_type": "SleepThenStep",
"workflow_id": "wf-timer-1",
"task_queue": "default",
"input": {"x": 7},
}))
.send()
.await
.unwrap();
let wf = wait_for_workflow_status(
&c,
&url,
"wf-timer-1",
"COMPLETED",
std::time::Duration::from_secs(15),
)
.await;
// Lower bound (900 ms) proves the 1 s timer actually gated progress; the
// generous 5 s upper bound keeps the test robust on slow CI machines.
let elapsed = started_at.elapsed();
assert!(
elapsed >= std::time::Duration::from_millis(900),
"workflow finished too fast: {elapsed:?} (durable timer should have made us wait ~1s)"
);
assert!(
elapsed <= std::time::Duration::from_secs(5),
"workflow took too long: {elapsed:?}"
);
// Result is a JSON-encoded string; the activity ran with x*3 after the timer.
let result_str = wf["result"].as_str().expect("result");
let result: serde_json::Value = serde_json::from_str(result_str).unwrap();
assert_eq!(result["x"], 21, "step ran with input.x*3 after timer");
assert_eq!(result["slept"], true);
// The timer must show up as first-class history events.
let events: Vec<serde_json::Value> = c
.get(format!("{url}/api/v1/workflows/wf-timer-1/events"))
.send()
.await
.unwrap()
.json()
.await
.unwrap()
let types: Vec<&str> = events
.iter()
.map(|e| e["event_type"].as_str().unwrap_or(""))
.collect();
assert!(types.contains(&"TimerScheduled"), "missing TimerScheduled in {types:?}");
assert!(types.contains(&"TimerFired"), "missing TimerFired in {types:?}");
let _ = worker.kill().await;
}
// `ctx:register_query` must expose workflow-local state via the HTTP state
// endpoints: the full snapshot at /state, individual keys at /state/{key},
// and 404 for keys that were never registered.
#[tokio::test]
async fn lua_workflow_register_query_exposes_live_state() {
// Integration test — skip quietly when the assay binary isn't built.
let Some(assay_bin) = locate_assay_binary() else {
eprintln!("SKIP: lua_workflow_register_query_exposes_live_state — no assay binary");
return;
};
let (url, _h) = start_test_server().await;
let c = client();
let tmp = tempfile::tempdir().expect("tempdir");
let worker_path = tmp.path().join("worker.lua");
std::fs::write(
&worker_path,
r#"
local workflow = require("assay.workflow")
workflow.connect(env.get("ASSAY_ENGINE_URL"))
workflow.define("Staged", function(ctx, input)
local state = { stage = "init", progress = 0 }
ctx:register_query("stage", function() return state.stage end)
ctx:register_query("progress", function() return state.progress end)
state.stage = "running"
state.progress = 0.25
ctx:execute_activity("step_a", {})
state.progress = 0.75
ctx:execute_activity("step_b", {})
state.stage = "done"
state.progress = 1.0
return { ok = true }
end)
workflow.activity("step_a", function(ctx, input) return { ok = true } end)
workflow.activity("step_b", function(ctx, input) return { ok = true } end)
workflow.listen({ queue = "default" })
"#,
)
.expect("write worker.lua");
let mut worker = tokio::process::Command::new(&assay_bin)
.arg("run")
.arg(&worker_path)
.env("ASSAY_ENGINE_URL", &url)
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.spawn()
.expect("spawn worker");
// Give the worker time to connect and register before starting work.
tokio::time::sleep(std::time::Duration::from_millis(800)).await;
c.post(format!("{url}/api/v1/workflows"))
.json(&serde_json::json!({
"workflow_type": "Staged",
"workflow_id": "wf-rq-1",
"task_queue": "default",
}))
.send()
.await
.unwrap();
wait_for_workflow_status(
&c,
&url,
"wf-rq-1",
"COMPLETED",
std::time::Duration::from_secs(10),
)
.await;
// Full snapshot: after completion the queries reflect the final values.
let resp = c
.get(format!("{url}/api/v1/workflows/wf-rq-1/state"))
.send()
.await
.unwrap();
assert_eq!(resp.status(), 200, "state endpoint");
let body: serde_json::Value = resp.json().await.unwrap();
assert_eq!(body["state"]["stage"], "done");
assert_eq!(body["state"]["progress"], 1.0);
// Single-key lookup returns the value under a "value" wrapper.
let resp = c
.get(format!("{url}/api/v1/workflows/wf-rq-1/state/stage"))
.send()
.await
.unwrap();
assert_eq!(resp.status(), 200, "state/stage endpoint");
let body: serde_json::Value = resp.json().await.unwrap();
assert_eq!(body["value"], "done");
// Unregistered keys must 404 rather than returning null.
let resp = c
.get(format!("{url}/api/v1/workflows/wf-rq-1/state/nonexistent"))
.send()
.await
.unwrap();
assert_eq!(resp.status(), 404);
// NOTE(review): wf-rq-none is started but nothing is asserted about it before
// the worker is killed — this looks like an unfinished negative case (state
// lookup on a run with no snapshot yet). Confirm intent or add assertions.
c.post(format!("{url}/api/v1/workflows"))
.json(&serde_json::json!({
"workflow_type": "Staged",
"workflow_id": "wf-rq-none",
"task_queue": "default",
}))
.send()
.await
.unwrap();
let _ = worker.kill().await;
}
// `ctx:continue_as_new` must terminate the current run and start a fresh run
// of the same workflow type with the new input; the second run completes
// normally and observes the bumped input.
#[tokio::test]
async fn lua_workflow_continue_as_new_starts_fresh_run() {
// Integration test — skip quietly when the assay binary isn't built.
let Some(assay_bin) = locate_assay_binary() else {
eprintln!("SKIP: lua_workflow_continue_as_new_starts_fresh_run — no assay binary");
return;
};
let (url, _h) = start_test_server().await;
let c = client();
let tmp = tempfile::tempdir().expect("tempdir");
let worker_path = tmp.path().join("worker.lua");
std::fs::write(
&worker_path,
r#"
local workflow = require("assay.workflow")
workflow.connect(env.get("ASSAY_ENGINE_URL"))
-- First run: call continue_as_new with a bumped counter, never returns.
-- Second run: observes the bumped input, completes normally.
workflow.define("Counter", function(ctx, input)
local n = (input and input.n) or 0
if n == 0 then
ctx:continue_as_new({ n = 1 })
end
return { final_n = n }
end)
workflow.listen({ queue = "default" })
"#,
)
.expect("write worker.lua");
let mut worker = tokio::process::Command::new(&assay_bin)
.arg("run")
.arg(&worker_path)
.env("ASSAY_ENGINE_URL", &url)
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.spawn()
.expect("spawn worker");
// Give the worker time to connect and register before starting work.
tokio::time::sleep(std::time::Duration::from_millis(800)).await;
c.post(format!("{url}/api/v1/workflows"))
.json(&serde_json::json!({
"workflow_type": "Counter",
"workflow_id": "wf-can-1",
"task_queue": "default",
"input": { "n": 0 },
}))
.send()
.await
.unwrap();
// Wait for the ORIGINAL run to reach a terminal state first.
wait_for_workflow_status(
&c,
&url,
"wf-can-1",
"COMPLETED",
std::time::Duration::from_secs(10),
)
.await;
// continue_as_new spawns a fresh run under a new id; find it by listing all
// Counter workflows and excluding the original id.
let resp = c
.get(format!("{url}/api/v1/workflows?type=Counter&limit=10"))
.send()
.await
.unwrap();
let workflows: Vec<serde_json::Value> = resp.json().await.unwrap();
let second = workflows
.iter()
.find(|w| w["id"].as_str() != Some("wf-can-1"))
.expect("second run should exist");
let second_id = second["id"].as_str().expect("second id");
let second_wf = wait_for_workflow_status(
&c,
&url,
second_id,
"COMPLETED",
std::time::Duration::from_secs(5),
)
.await;
// The fresh run must have seen the bumped input { n = 1 }.
let result_str = second_wf["result"].as_str().expect("second run result");
let result: serde_json::Value = serde_json::from_str(result_str).unwrap();
assert_eq!(result["final_n"], 1, "second run should see bumped input");
let _ = worker.kill().await;
}
// `ctx:execute_parallel` must fan out: all activities are scheduled up front
// (before any completes) and their results come back in input order.
#[tokio::test]
async fn lua_workflow_execute_parallel_three_activities() {
// Integration test — skip quietly when the assay binary isn't built.
let Some(assay_bin) = locate_assay_binary() else {
eprintln!("SKIP: lua_workflow_execute_parallel_three_activities — no assay binary");
return;
};
let (url, _h) = start_test_server().await;
let c = client();
let tmp = tempfile::tempdir().expect("tempdir");
let worker_path = tmp.path().join("worker.lua");
std::fs::write(
&worker_path,
r#"
local workflow = require("assay.workflow")
workflow.connect(env.get("ASSAY_ENGINE_URL"))
workflow.define("FanOut", function(ctx, input)
local results = ctx:execute_parallel({
{ name = "double", input = { n = 1 } },
{ name = "double", input = { n = 2 } },
{ name = "double", input = { n = 3 } },
})
return { sum = results[1].v + results[2].v + results[3].v }
end)
workflow.activity("double", function(ctx, input)
return { v = input.n * 2 }
end)
workflow.listen({ queue = "default" })
"#,
)
.expect("write worker.lua");
let mut worker = tokio::process::Command::new(&assay_bin)
.arg("run")
.arg(&worker_path)
.env("ASSAY_ENGINE_URL", &url)
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.spawn()
.expect("spawn worker");
// Give the worker time to connect and register before starting work.
tokio::time::sleep(std::time::Duration::from_millis(800)).await;
c.post(format!("{url}/api/v1/workflows"))
.json(&serde_json::json!({
"workflow_type": "FanOut",
"workflow_id": "wf-par-1",
"task_queue": "default",
}))
.send()
.await
.unwrap();
let wf = wait_for_workflow_status(
&c,
&url,
"wf-par-1",
"COMPLETED",
std::time::Duration::from_secs(10),
)
.await;
// sum = 2*1 + 2*2 + 2*3 = 12.
let result_str = wf["result"].as_str().expect("result");
let result: serde_json::Value = serde_json::from_str(result_str).unwrap();
assert_eq!(result["sum"], 12);
let events: Vec<serde_json::Value> = c
.get(format!("{url}/api/v1/workflows/wf-par-1/events"))
.send()
.await
.unwrap()
.json()
.await
.unwrap();
// Walk the (ordered) event history and count ActivityScheduled events that
// occur BEFORE the first ActivityCompleted. A true fan-out schedules all
// three before any result lands; sequential execution would show 1.
let mut scheduled_before_completed = 0;
for e in &events {
match e["event_type"].as_str().unwrap_or("") {
"ActivityScheduled" => scheduled_before_completed += 1,
"ActivityCompleted" => break,
_ => {}
}
}
assert_eq!(
scheduled_before_completed, 3,
"all 3 parallel activities should be scheduled before any completes"
);
let _ = worker.kill().await;
}
// When one branch of `ctx:execute_parallel` fails permanently, the error must
// propagate: the workflow ends FAILED (never reaching the code after the
// parallel call) with an error message naming the failing branch.
#[tokio::test]
async fn lua_workflow_execute_parallel_one_fails_raises() {
// Integration test — skip quietly when the assay binary isn't built.
let Some(assay_bin) = locate_assay_binary() else {
eprintln!("SKIP: lua_workflow_execute_parallel_one_fails_raises — no assay binary");
return;
};
let (url, _h) = start_test_server().await;
let c = client();
let tmp = tempfile::tempdir().expect("tempdir");
let worker_path = tmp.path().join("worker.lua");
std::fs::write(
&worker_path,
r#"
local workflow = require("assay.workflow")
workflow.connect(env.get("ASSAY_ENGINE_URL"))
workflow.define("FanOutWithFailure", function(ctx, input)
ctx:execute_parallel({
{ name = "ok", input = {}, opts = { max_attempts = 1 } },
{ name = "fail", input = {}, opts = { max_attempts = 1 } },
})
return { reached_end = true }
end)
workflow.activity("ok", function(ctx, input) return { ok = true } end)
workflow.activity("fail", function(ctx, input) error("boom") end)
workflow.listen({ queue = "default" })
"#,
)
.expect("write worker.lua");
let mut worker = tokio::process::Command::new(&assay_bin)
.arg("run")
.arg(&worker_path)
.env("ASSAY_ENGINE_URL", &url)
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.spawn()
.expect("spawn worker");
// Give the worker time to connect and register before starting work.
tokio::time::sleep(std::time::Duration::from_millis(800)).await;
c.post(format!("{url}/api/v1/workflows"))
.json(&serde_json::json!({
"workflow_type": "FanOutWithFailure",
"workflow_id": "wf-par-fail",
"task_queue": "default",
}))
.send()
.await
.unwrap();
// max_attempts = 1 in the script means no retries, so FAILED arrives quickly.
let wf = wait_for_workflow_status(
&c,
&url,
"wf-par-fail",
"FAILED",
std::time::Duration::from_secs(10),
)
.await;
// The surfaced error should identify either the activity name or its message.
let error = wf["error"].as_str().expect("error").to_string();
assert!(
error.contains("fail") || error.contains("boom"),
"error should mention the failing activity, got: {error}"
);
let _ = worker.kill().await;
}
#[tokio::test]
async fn search_attributes_filter_list() {
let (url, _h) = start_test_server().await;
let c = client();
for (id, env) in [("wf-sa-1", "prod"), ("wf-sa-2", "prod"), ("wf-sa-3", "staging")] {
c.post(format!("{url}/api/v1/workflows"))
.json(&serde_json::json!({
"workflow_type": "Tagged",
"workflow_id": id,
"task_queue": "default",
"search_attributes": { "env": env },
}))
.send()
.await
.unwrap();
}
let all: Vec<serde_json::Value> = c
.get(format!("{url}/api/v1/workflows?type=Tagged"))
.send()
.await
.unwrap()
.json()
.await
.unwrap();
assert_eq!(all.len(), 3);
let prod: Vec<serde_json::Value> = c
.get(format!(
"{url}/api/v1/workflows?type=Tagged&search_attrs=%7B%22env%22%3A%22prod%22%7D"
))
.send()
.await
.unwrap()
.json()
.await
.unwrap();
assert_eq!(prod.len(), 2, "env=prod matches two workflows");
let staging: Vec<serde_json::Value> = c
.get(format!(
"{url}/api/v1/workflows?type=Tagged&search_attrs=%7B%22env%22%3A%22staging%22%7D"
))
.send()
.await
.unwrap()
.json()
.await
.unwrap();
assert_eq!(staging.len(), 1);
}
// Exercises the Lua stdlib's management surface (workflow start/list/describe/
// events/state/children/terminate, schedules.*, namespaces.*, workers.list,
// queues.stats) in one scripted round-trip. The Lua script carries its own
// assertions via a local `check` helper; the Rust side only verifies the
// process exit status and the success marker on stdout.
#[tokio::test]
async fn lua_stdlib_management_surface_roundtrips() {
// Integration test — skip quietly when the assay binary isn't built.
let Some(assay_bin) = locate_assay_binary() else {
eprintln!("SKIP: lua_stdlib_management_surface_roundtrips — no assay binary");
return;
};
let (url, _h) = start_test_server().await;
let tmp = tempfile::tempdir().expect("tempdir");
let script_path = tmp.path().join("stdlib_surface.lua");
std::fs::write(
&script_path,
r#"
local workflow = require("assay.workflow")
workflow.connect(env.get("ASSAY_ENGINE_URL"))
-- Assay's Lua environment shadows `assert` with a table of helpers
-- (assert.eq, assert.ne, …). Plain `assert(cond, msg)` isn't available,
-- so we use a local `check` helper that raises on false.
local function check(cond, msg)
if not cond then error(msg or "assertion failed") end
end
-- workflow.start + list + describe
workflow.start({
workflow_type = "X", workflow_id = "wf-plan06-a", task_queue = "q1",
input = { n = 1 },
})
workflow.start({
workflow_type = "X", workflow_id = "wf-plan06-b", task_queue = "q1",
})
local listed = workflow.list({ type = "X", limit = 50 })
check(#listed >= 2, "list should return at least the 2 we started")
local desc = workflow.describe("wf-plan06-a")
check(desc.id == "wf-plan06-a", "describe should return the record")
-- workflow.get_events — at least WorkflowStarted
local events = workflow.get_events("wf-plan06-a")
local has_started = false
for _, e in ipairs(events) do
if e.event_type == "WorkflowStarted" then has_started = true end
end
check(has_started, "event log should contain WorkflowStarted")
-- workflow.get_state — 404 is ok before any register_query snapshot written
local state = workflow.get_state("wf-plan06-a")
check(state == nil, "no snapshot yet -> get_state returns nil")
-- workflow.list_children — empty list for a childless workflow
local children = workflow.list_children("wf-plan06-a")
check(#children == 0, "no children yet")
-- workflow.terminate — flips status to FAILED
workflow.terminate("wf-plan06-b", "stdlib test")
local after = workflow.describe("wf-plan06-b")
check(after.status == "FAILED", "terminate should flip to FAILED, got " .. tostring(after.status))
-- workflow.schedules.* — full lifecycle
workflow.schedules.create({
name = "plan06-sched",
workflow_type = "X",
cron_expr = "0 0 2 * * *",
timezone = "Europe/Berlin",
task_queue = "q1",
overlap_policy = "skip",
})
local sched = workflow.schedules.describe("plan06-sched")
check(sched.cron_expr == "0 0 2 * * *", "describe returns the cron expr")
check(sched.timezone == "Europe/Berlin", "timezone persists")
check(sched.paused == false, "fresh schedule is not paused")
workflow.schedules.patch("plan06-sched", { cron_expr = "0 0 3 * * *" })
local patched = workflow.schedules.describe("plan06-sched")
check(patched.cron_expr == "0 0 3 * * *", "patch updates cron")
check(patched.timezone == "Europe/Berlin", "patch preserves unchanged fields")
workflow.schedules.pause("plan06-sched")
check(workflow.schedules.describe("plan06-sched").paused == true, "pause sets paused")
workflow.schedules.resume("plan06-sched")
check(workflow.schedules.describe("plan06-sched").paused == false, "resume clears paused")
local schedules = workflow.schedules.list()
local found = false
for _, s in ipairs(schedules) do
if s.name == "plan06-sched" then found = true end
end
check(found, "list should include our schedule")
workflow.schedules.delete("plan06-sched")
check(workflow.schedules.describe("plan06-sched") == nil, "delete removes it")
-- workflow.namespaces.*
workflow.namespaces.create("plan06-ns")
local namespaces = workflow.namespaces.list()
local ns_found = false
for _, n in ipairs(namespaces) do
if n.name == "plan06-ns" then ns_found = true end
end
check(ns_found, "created namespace should appear in list")
local stats = workflow.namespaces.stats("main")
check(type(stats.total_workflows) == "number", "stats returns counts")
workflow.namespaces.delete("plan06-ns")
-- workflow.workers.list + workflow.queues.stats — just verify they return tables
local workers = workflow.workers.list()
check(type(workers) == "table", "workers.list returns a table")
local q = workflow.queues.stats()
check(type(q) == "table", "queues.stats returns a table")
print("stdlib_surface: all assertions passed")
"#,
)
.expect("write script");
// Run the script to completion (no long-lived worker loop here): capture
// both streams so a failure produces a useful diagnostic.
let output = tokio::process::Command::new(&assay_bin)
.arg("run")
.arg(&script_path)
.env("ASSAY_ENGINE_URL", &url)
.output()
.await
.expect("run assay script");
let stdout = String::from_utf8_lossy(&output.stdout);
let stderr = String::from_utf8_lossy(&output.stderr);
assert!(
output.status.success(),
"stdlib surface script failed.\nstdout:\n{stdout}\nstderr:\n{stderr}"
);
// Exit 0 alone could mean the script silently did nothing; require the
// explicit end-of-script marker too.
assert!(
stdout.contains("all assertions passed"),
"expected success marker in stdout.\nstdout:\n{stdout}\nstderr:\n{stderr}"
);
}
// `assay workflow list` against a fresh (empty) engine must exit 0 and still
// print the table header row.
#[tokio::test]
async fn cli_workflow_list_empty() {
    let Some(assay_bin) = locate_assay_binary() else {
        eprintln!("SKIP: cli_workflow_list_empty — no assay binary");
        return;
    };
    let (url, _h) = start_test_server().await;
    let mut cmd = tokio::process::Command::new(&assay_bin);
    cmd.args(["workflow", "list", "--output", "table", "--engine-url", &url]);
    let output = cmd.output().await.expect("run assay workflow list");
    assert!(
        output.status.success(),
        "exit 0; stderr: {}",
        String::from_utf8_lossy(&output.stderr)
    );
    let stdout = String::from_utf8_lossy(&output.stdout);
    assert!(stdout.contains("ID"), "header row present: {stdout}");
}
// Describing a workflow that doesn't exist must exit non-zero and surface
// the 404/not-found cause on stderr.
#[tokio::test]
async fn cli_workflow_describe_missing_is_exit_1() {
    let Some(assay_bin) = locate_assay_binary() else {
        eprintln!("SKIP: cli_workflow_describe_missing_is_exit_1 — no assay binary");
        return;
    };
    let (url, _h) = start_test_server().await;
    let mut cmd = tokio::process::Command::new(&assay_bin);
    cmd.args(["workflow", "describe", "nonexistent", "--engine-url", &url]);
    let output = cmd.output().await.expect("run assay workflow describe");
    assert!(!output.status.success(), "missing workflow should exit non-zero");
    let stderr = String::from_utf8_lossy(&output.stderr);
    let explained = stderr.contains("404") || stderr.contains("not found");
    assert!(explained, "stderr should mention 404/not-found, got: {stderr}");
}
// Drives the full schedule lifecycle through the CLI: create → list →
// patch → pause → resume → delete → verify it is gone.
#[tokio::test]
async fn cli_schedule_full_lifecycle() {
// Integration test — skip quietly when the assay binary isn't built.
let Some(assay_bin) = locate_assay_binary() else {
eprintln!("SKIP: cli_schedule_full_lifecycle — no assay binary");
return;
};
let (url, _h) = start_test_server().await;
// Helper: run the CLI and assert success, dumping both streams on failure.
async fn run_ok(bin: &std::path::Path, args: &[&str]) -> std::process::Output {
let out = tokio::process::Command::new(bin)
.args(args)
.output()
.await
.expect("run assay");
assert!(
out.status.success(),
"command {:?} failed.\nstdout: {}\nstderr: {}",
args,
String::from_utf8_lossy(&out.stdout),
String::from_utf8_lossy(&out.stderr)
);
out
}
// create: 6-field cron plus an explicit timezone.
run_ok(&assay_bin, &[
"schedule", "create", "cli-sched",
"--type", "CliTestWorkflow",
"--cron", "0 0 2 * * *",
"--timezone", "Europe/Berlin",
"--engine-url", &url,
]).await;
// list: the new schedule and its timezone must be visible.
let list_out = run_ok(&assay_bin, &["schedule", "list", "--engine-url", &url]).await;
let stdout = String::from_utf8_lossy(&list_out.stdout);
assert!(stdout.contains("cli-sched"), "list includes new schedule: {stdout}");
assert!(
stdout.contains("Europe/Berlin"),
"timezone column populated: {stdout}"
);
// patch / pause / resume: each must succeed individually.
run_ok(&assay_bin, &[
"schedule", "patch", "cli-sched",
"--cron", "0 0 3 * * *",
"--engine-url", &url,
]).await;
run_ok(&assay_bin, &["schedule", "pause", "cli-sched", "--engine-url", &url]).await;
run_ok(&assay_bin, &["schedule", "resume", "cli-sched", "--engine-url", &url]).await;
// delete, then confirm via a fresh list that the schedule is gone.
run_ok(&assay_bin, &["schedule", "delete", "cli-sched", "--engine-url", &url]).await;
let final_list = run_ok(&assay_bin, &["schedule", "list", "--engine-url", &url]).await;
let stdout = String::from_utf8_lossy(&final_list.stdout);
assert!(
!stdout.contains("cli-sched"),
"deleted schedule should be gone: {stdout}"
);
}
// Patching a schedule without supplying any field to change is a usage
// error: non-zero exit plus an actionable hint on stderr.
#[tokio::test]
async fn cli_schedule_patch_without_fields_is_exit_1() {
    let Some(assay_bin) = locate_assay_binary() else {
        eprintln!("SKIP: cli_schedule_patch_without_fields_is_exit_1 — no assay binary");
        return;
    };
    let (url, _h) = start_test_server().await;
    let mut cmd = tokio::process::Command::new(&assay_bin);
    cmd.args(["schedule", "patch", "whatever", "--engine-url", &url]);
    let output = cmd.output().await.expect("run assay schedule patch");
    assert!(!output.status.success());
    let stderr = String::from_utf8_lossy(&output.stderr);
    let hinted = stderr.contains("at least one") || stderr.contains("--cron");
    assert!(hinted, "stderr should hint at required fields, got: {stderr}");
}
// With no --engine-url flag, the CLI must pick up the engine address from
// the ASSAY_ENGINE_URL environment variable.
#[tokio::test]
async fn cli_honors_engine_url_env_var() {
    let Some(assay_bin) = locate_assay_binary() else {
        eprintln!("SKIP: cli_honors_engine_url_env_var — no assay binary");
        return;
    };
    let (url, _h) = start_test_server().await;
    let mut cmd = tokio::process::Command::new(&assay_bin);
    // Remove any inherited ASSAY_API_KEY so only the URL variable is in play.
    cmd.args(["workflow", "list"])
        .env("ASSAY_ENGINE_URL", &url)
        .env_remove("ASSAY_API_KEY");
    let output = cmd.output().await.expect("run");
    let stderr = String::from_utf8_lossy(&output.stderr);
    assert!(output.status.success(), "env var honored; stderr: {}", stderr);
}
// `workflow start --output json` must print a JSON object carrying the new
// run's identifiers (workflow_id and run_id).
#[tokio::test]
async fn cli_workflow_start_returns_identifiers() {
    let Some(assay_bin) = locate_assay_binary() else {
        eprintln!("SKIP: cli_workflow_start_returns_identifiers — no assay binary");
        return;
    };
    let (url, _h) = start_test_server().await;
    let start_args = [
        "workflow", "start",
        "--type", "SmokeTest",
        "--id", "wf-cli-start",
        "--input", r#"{"n":1}"#,
        "--output", "json",
        "--engine-url", &url,
    ];
    let out = tokio::process::Command::new(&assay_bin)
        .args(start_args)
        .output()
        .await
        .expect("run assay workflow start");
    assert!(out.status.success(), "stderr: {}", String::from_utf8_lossy(&out.stderr));
    // stdout is a single JSON object holding the identifiers.
    let body: serde_json::Value = serde_json::from_slice(&out.stdout).unwrap();
    assert_eq!(body["workflow_id"], "wf-cli-start");
    assert!(body["run_id"].is_string());
}
// `workflow wait` must exit with code 2 (distinct from a generic failure)
// when the target workflow does not reach a terminal state within --timeout.
#[tokio::test]
async fn cli_workflow_wait_times_out_with_exit_2() {
    let Some(assay_bin) = locate_assay_binary() else {
        eprintln!("SKIP: cli_workflow_wait_times_out_with_exit_2 — no assay binary");
        return;
    };
    let (url, _h) = start_test_server().await;
    // No worker is spawned in this test, so this workflow can never finish.
    let start_args = [
        "workflow", "start",
        "--type", "NeverCompletes",
        "--id", "wf-wait-timeout",
        "--engine-url", &url,
    ];
    tokio::process::Command::new(&assay_bin)
        .args(start_args)
        .output()
        .await
        .unwrap();
    let wait_args = [
        "workflow", "wait", "wf-wait-timeout",
        "--timeout", "1",
        "--engine-url", &url,
    ];
    let out = tokio::process::Command::new(&assay_bin)
        .args(wait_args)
        .output()
        .await
        .expect("run assay workflow wait");
    let code = out.status.code().unwrap_or(-1);
    assert_eq!(code, 2, "timeout → exit 2; stderr: {}", String::from_utf8_lossy(&out.stderr));
}
// Drives the namespace lifecycle through the CLI: create → list →
// describe (JSON) → delete.
#[tokio::test]
async fn cli_namespace_lifecycle_via_cli() {
// Integration test — skip quietly when the assay binary isn't built.
let Some(assay_bin) = locate_assay_binary() else {
eprintln!("SKIP: cli_namespace_lifecycle_via_cli — no assay binary");
return;
};
let (url, _h) = start_test_server().await;
// Helper: run the CLI and assert success, dumping both streams on failure.
async fn run(bin: &std::path::Path, args: &[&str]) -> std::process::Output {
let out = tokio::process::Command::new(bin)
.args(args)
.output()
.await
.expect("run");
assert!(
out.status.success(),
"command {:?} failed.\nstdout: {}\nstderr: {}",
args,
String::from_utf8_lossy(&out.stdout),
String::from_utf8_lossy(&out.stderr)
);
out
}
run(&assay_bin, &["namespace", "create", "cli-ns", "--engine-url", &url]).await;
let listed = run(&assay_bin, &["namespace", "list", "--engine-url", &url]).await;
assert!(
String::from_utf8_lossy(&listed.stdout).contains("cli-ns"),
"list should include cli-ns"
);
// describe --output json must expose the name and a zero workflow count
// for a freshly created namespace.
let desc = run(
&assay_bin,
&["namespace", "describe", "cli-ns", "--output", "json", "--engine-url", &url],
)
.await;
let body: serde_json::Value = serde_json::from_slice(&desc.stdout).unwrap();
assert_eq!(body["namespace"], "cli-ns");
assert_eq!(body["total_workflows"], 0);
run(&assay_bin, &["namespace", "delete", "cli-ns", "--engine-url", &url]).await;
}
// With no workers connected and nothing queued, `worker list` and
// `queue stats` must still succeed and emit well-formed JSON arrays.
#[tokio::test]
async fn cli_worker_and_queue_list_empty() {
    let Some(assay_bin) = locate_assay_binary() else {
        eprintln!("SKIP: cli_worker_and_queue_list_empty — no assay binary");
        return;
    };
    let (url, _h) = start_test_server().await;
    // Helper: run the CLI with the given args and parse stdout as JSON.
    async fn json_stdout(bin: &std::path::Path, args: &[&str]) -> serde_json::Value {
        let out = tokio::process::Command::new(bin)
            .args(args)
            .output()
            .await
            .unwrap();
        assert!(out.status.success());
        serde_json::from_slice(&out.stdout).unwrap()
    }
    let workers =
        json_stdout(&assay_bin, &["worker", "list", "--output", "json", "--engine-url", &url])
            .await;
    assert!(workers.is_array());
    let queues =
        json_stdout(&assay_bin, &["queue", "stats", "--output", "json", "--engine-url", &url])
            .await;
    assert!(queues.is_array());
}
// Every supported --output format of `workflow list` must produce output the
// corresponding consumer can parse: json → array, jsonl → one JSON document
// per line, yaml → sequence, table → human-readable row.
#[tokio::test]
async fn cli_output_formats_are_parseable() {
// Integration test — skip quietly when the assay binary isn't built.
let Some(assay_bin) = locate_assay_binary() else {
eprintln!("SKIP: cli_output_formats_are_parseable — no assay binary");
return;
};
let (url, _h) = start_test_server().await;
// Seed a single workflow so each format has one row to render.
tokio::process::Command::new(&assay_bin)
.args(["workflow", "start", "--type", "X", "--id", "wf-fmt", "--engine-url", &url])
.output()
.await
.unwrap();
for fmt in ["json", "jsonl", "yaml", "table"] {
let out = tokio::process::Command::new(&assay_bin)
.args(["workflow", "list", "--output", fmt, "--engine-url", &url])
.output()
.await
.expect("run list");
assert!(
out.status.success(),
"format={fmt} failed.\nstderr: {}",
String::from_utf8_lossy(&out.stderr)
);
let stdout = String::from_utf8_lossy(&out.stdout);
match fmt {
// json: the entire stdout is one JSON array.
"json" => {
let parsed: serde_json::Value = serde_json::from_str(&stdout).unwrap();
assert!(parsed.is_array(), "json output is an array");
}
// jsonl: every non-blank line must parse independently.
"jsonl" => {
for line in stdout.lines() {
if !line.trim().is_empty() {
serde_json::from_str::<serde_json::Value>(line).unwrap();
}
}
}
// yaml: a YAML sequence starts with an item marker.
"yaml" => {
assert!(
stdout.trim_start().starts_with('-'),
"yaml list output should start with '- ': {stdout:?}"
);
}
// table: human-oriented; just require our seeded row to appear.
"table" => {
assert!(stdout.contains("wf-fmt"));
}
// The for-loop only yields the four formats above.
_ => unreachable!(),
}
}
}
#[tokio::test]
async fn cli_input_via_stdin_resolves() {
    // Skip gracefully when the assay binary has not been built in this environment.
    let Some(assay_bin) = locate_assay_binary() else {
        eprintln!("SKIP: cli_input_via_stdin_resolves — no assay binary");
        return;
    };
    let (url, _h) = start_test_server().await;

    // Seed the workflow the signal will target. Previously only the spawn was
    // unwrapped; a failed start would make the later signal step fail with an
    // unrelated-looking error, so assert success here with stderr attached.
    let started = tokio::process::Command::new(&assay_bin)
        .args(["workflow", "start", "--type", "X", "--id", "wf-stdin", "--engine-url", &url])
        .output()
        .await
        .unwrap();
    assert!(
        started.status.success(),
        "seed workflow start failed.\nstderr: {}",
        String::from_utf8_lossy(&started.stderr)
    );

    // "-" as the payload argument tells the CLI to read the signal input from stdin.
    let mut child = tokio::process::Command::new(&assay_bin)
        .args(["workflow", "signal", "wf-stdin", "go", "-", "--engine-url", &url])
        .stdin(std::process::Stdio::piped())
        .stdout(std::process::Stdio::piped())
        .stderr(std::process::Stdio::piped())
        .spawn()
        .expect("spawn");
    {
        use tokio::io::AsyncWriteExt;
        // Scope drops the stdin handle after the write, closing the pipe so the
        // child sees EOF and can finish reading.
        let mut stdin = child.stdin.take().unwrap();
        stdin.write_all(br#"{"from":"stdin"}"#).await.unwrap();
    }
    let output = child.wait_with_output().await.unwrap();
    assert!(
        output.status.success(),
        "stderr: {}",
        String::from_utf8_lossy(&output.stderr)
    );
}
#[tokio::test]
async fn cli_config_file_supplies_engine_url() {
    // Skip gracefully when the assay binary has not been built in this environment.
    let Some(assay_bin) = locate_assay_binary() else {
        eprintln!("SKIP: cli_config_file_supplies_engine_url — no assay binary");
        return;
    };
    let (url, _h) = start_test_server().await;

    // Write a config file carrying both the engine URL and the output format;
    // the tempdir guard keeps the file alive for the duration of the test.
    let dir = tempfile::tempdir().unwrap();
    let config_file = dir.path().join("cfg.yaml");
    let config_body = format!("engine_url: {url}\noutput: json\n");
    std::fs::write(&config_file, config_body).unwrap();

    // Invoke the CLI with ONLY --config: scrub the env vars that could also
    // supply the engine URL or API key, so the config file is the sole source.
    let output = tokio::process::Command::new(&assay_bin)
        .args(["workflow", "list", "--config", config_file.to_str().unwrap()])
        .env_remove("ASSAY_ENGINE_URL")
        .env_remove("ASSAY_API_KEY")
        .output()
        .await
        .expect("run");
    assert!(
        output.status.success(),
        "config should supply engine_url.\nstderr: {}",
        String::from_utf8_lossy(&output.stderr)
    );
    // The config requested json output, so stdout must parse as JSON.
    let _parsed: serde_json::Value = serde_json::from_slice(&output.stdout).unwrap();
}
#[tokio::test]
async fn cli_completion_generates_for_every_shell() {
    // Skip gracefully when the assay binary has not been built in this environment.
    let Some(assay_bin) = locate_assay_binary() else {
        eprintln!("SKIP: cli_completion_generates_for_every_shell — no assay binary");
        return;
    };
    // Completion generation needs no server: for each supported shell the
    // command must exit 0 and print a non-empty script to stdout.
    let shells = ["bash", "zsh", "fish", "powershell", "elvish"];
    for shell in shells {
        let output = tokio::process::Command::new(&assay_bin)
            .args(["completion", shell])
            .output()
            .await
            .expect("run completion");
        assert!(
            output.status.success(),
            "completion {shell}: stderr: {}",
            String::from_utf8_lossy(&output.stderr)
        );
        assert!(!output.stdout.is_empty(), "completion {shell}: empty output");
    }
}