/// Checks the two filters of `load_run_events_jsonl` against a three-line JSONL file.
///
/// The file holds events with `seq` 1, 2 and 3. Passing `Some(1)` as the second
/// argument must yield the events with `seq` 2 and 3; passing `Some(1)` as the
/// third argument must yield only the last event (`seq` 3).
#[test]
fn load_run_events_jsonl_filters_since_and_tail() {
    /// Deletes the scratch directory on drop so it is removed even when an
    /// assertion below panics and unwinds out of the test.
    struct ScratchDir(std::path::PathBuf);

    impl Drop for ScratchDir {
        fn drop(&mut self) {
            // Best effort: a failed cleanup must not mask the test outcome.
            let _ = std::fs::remove_dir_all(&self.0);
        }
    }

    let scratch = ScratchDir(
        std::env::temp_dir().join(format!("run-events-test-{}", Uuid::new_v4())),
    );
    std::fs::create_dir_all(&scratch.0).expect("mkdir");
    let path = scratch.0.join("events.jsonl");

    // One JSON document per line, no trailing newline.
    let lines = [
        serde_json::json!({"seq":1,"type":"run_created"}).to_string(),
        serde_json::json!({"seq":2,"type":"planning_started"}).to_string(),
        serde_json::json!({"seq":3,"type":"task_started"}).to_string(),
    ];
    std::fs::write(&path, lines.join("\n")).expect("write");

    let since = load_run_events_jsonl(&path, Some(1), None);
    assert_eq!(since.len(), 2);
    assert_eq!(since[0].get("seq").and_then(|v| v.as_u64()), Some(2));
    assert_eq!(since[1].get("seq").and_then(|v| v.as_u64()), Some(3));

    let tail = load_run_events_jsonl(&path, None, Some(1));
    assert_eq!(tail.len(), 1);
    assert_eq!(tail[0].get("seq").and_then(|v| v.as_u64()), Some(3));
    // `scratch` is dropped here, removing the directory and the file inside it.
}
#[tokio::test]
async fn context_run_create_append_event_and_get() {
let state = test_state().await;
let app = app_router(state.clone());
let create_req = Request::builder()
.method("POST")
.uri("/context/runs")
.header("content-type", "application/json")
.header("x-tandem-org-id", "acme")
.header("x-tandem-workspace-id", "north")
.header("x-user-id", "user-1")
.body(Body::from(
json!({
"run_id": "ctx-run-1",
"objective": "test context run",
"model_provider": "openrouter",
"model_id": "google/gemini-3-flash-preview"
})
.to_string(),
))
.expect("create request");
let create_resp = app
.clone()
.oneshot(create_req)
.await
.expect("create response");
assert_eq!(create_resp.status(), StatusCode::OK);
let create_body = to_bytes(create_resp.into_body(), usize::MAX)
.await
.expect("create body");
let create_payload: Value = serde_json::from_slice(&create_body).expect("create json");
assert_eq!(
create_payload
.get("run")
.and_then(|run| run.get("tenant_context"))
.and_then(|tenant| tenant.get("org_id"))
.and_then(Value::as_str),
Some("acme")
);
assert_eq!(
create_payload
.get("run")
.and_then(|run| run.get("tenant_context"))
.and_then(|tenant| tenant.get("workspace_id"))
.and_then(Value::as_str),
Some("north")
);
let event_req = Request::builder()
.method("POST")
.uri("/context/runs/ctx-run-1/events")
.header("content-type", "application/json")
.body(Body::from(
json!({
"type": "planning_started",
"status": "planning",
"payload": {"k":"v"}
})
.to_string(),
))
.expect("event request");
let event_resp = app
.clone()
.oneshot(event_req)
.await
.expect("event response");
assert_eq!(event_resp.status(), StatusCode::OK);
let ledger_event_req = Request::builder()
.method("POST")
.uri("/context/runs/ctx-run-1/events")
.header("content-type", "application/json")
.body(Body::from(
json!({
"type": "tool_effect_recorded",
"status": "running",
"payload": {
"record": {
"session_id": "session-1",
"message_id": "message-1",
"tool": "read",
"phase": "invocation",
"status": "started",
"args_summary": {"type":"object","field_count":1,"keys":["path"]},
}
}
})
.to_string(),
))
.expect("ledger event request");
let ledger_event_resp = app
.clone()
.oneshot(ledger_event_req)
.await
.expect("ledger event response");
assert_eq!(ledger_event_resp.status(), StatusCode::OK);
let mutation_event_req = Request::builder()
.method("POST")
.uri("/context/runs/ctx-run-1/events")
.header("content-type", "application/json")
.body(Body::from(
json!({
"type": "mutation_checkpoint_recorded",
"status": "running",
"payload": {
"record": {
"session_id": "session-1",
"message_id": "message-2",
"tool": "write",
"outcome": "succeeded",
"file_count": 1,
"changed_file_count": 1,
"files": [{
"path": "src/lib.rs",
"resolved_path": "/workspace/src/lib.rs",
"existed_before": false,
"existed_after": true,
"changed": true,
"rollback_snapshot": {
"status": "not_needed"
}
}]
}
}
})
.to_string(),
))
.expect("mutation event request");
let mutation_event_resp = app
.clone()
.oneshot(mutation_event_req)
.await
.expect("mutation event response");
assert_eq!(mutation_event_resp.status(), StatusCode::OK);
let rollback_blocked_event_req = Request::builder()
.method("POST")
.uri("/context/runs/ctx-run-1/events")
.header("content-type", "application/json")
.body(Body::from(
json!({
"type": "rollback_execution_blocked",
"status": "running",
"payload": {
"reason": "rollback execution is not allowed for the current run status",
"selected_event_ids": ["evt-1"]
}
})
.to_string(),
))
.expect("rollback blocked event request");
let rollback_blocked_event_resp = app
.clone()
.oneshot(rollback_blocked_event_req)
.await
.expect("rollback blocked event response");
assert_eq!(rollback_blocked_event_resp.status(), StatusCode::OK);
let list_events_req = Request::builder()
.method("GET")
.uri("/context/runs/ctx-run-1/events?since_seq=0")
.body(Body::empty())
.expect("list events request");
let list_events_resp = app
.clone()
.oneshot(list_events_req)
.await
.expect("list events response");
assert_eq!(list_events_resp.status(), StatusCode::OK);
let list_events_body = to_bytes(list_events_resp.into_body(), usize::MAX)
.await
.expect("list events body");
let list_events_payload: Value =
serde_json::from_slice(&list_events_body).expect("list events json");
assert_eq!(
list_events_payload
.get("events")
.and_then(|v| v.as_array())
.map(|rows| rows.len()),
Some(4)
);
let get_run_req = Request::builder()
.method("GET")
.uri("/context/runs/ctx-run-1")
.body(Body::empty())
.expect("get run request");
let get_run_resp = app
.clone()
.oneshot(get_run_req)
.await
.expect("get run response");
assert_eq!(get_run_resp.status(), StatusCode::OK);
let get_run_body = to_bytes(get_run_resp.into_body(), usize::MAX)
.await
.expect("get run body");
let get_run_payload: Value = serde_json::from_slice(&get_run_body).expect("get run json");
assert_eq!(
get_run_payload
.get("run")
.and_then(|run| run.get("status"))
.and_then(Value::as_str),
Some("running")
);
assert_eq!(
get_run_payload
.get("run")
.and_then(|run| run.get("model_provider"))
.and_then(Value::as_str),
Some("openrouter")
);
assert_eq!(
get_run_payload
.get("run")
.and_then(|run| run.get("model_id"))
.and_then(Value::as_str),
Some("google/gemini-3-flash-preview")
);
assert_eq!(
get_run_payload
.get("ledger_summary")
.and_then(|summary| summary.get("record_count"))
.and_then(Value::as_u64),
Some(1)
);
assert_eq!(
get_run_payload
.get("ledger_summary")
.and_then(|summary| summary.get("by_tool"))
.and_then(|tools| tools.get("read"))
.and_then(Value::as_u64),
Some(1)
);
assert_eq!(
get_run_payload
.get("mutation_checkpoint_summary")
.and_then(|summary| summary.get("record_count"))
.and_then(Value::as_u64),
Some(1)
);
assert_eq!(
get_run_payload
.get("mutation_checkpoint_summary")
.and_then(|summary| summary.get("by_tool"))
.and_then(|tools| tools.get("write"))
.and_then(Value::as_u64),
Some(1)
);
assert_eq!(
get_run_payload
.get("rollback_preview_summary")
.and_then(|summary| summary.get("executable"))
.and_then(Value::as_bool),
Some(true)
);
assert_eq!(
get_run_payload
.get("rollback_preview_summary")
.and_then(|summary| summary.get("step_count"))
.and_then(Value::as_u64),
Some(1)
);
assert_eq!(
get_run_payload
.get("rollback_history_summary")
.and_then(|summary| summary.get("entry_count"))
.and_then(Value::as_u64),
Some(1)
);
assert_eq!(
get_run_payload
.get("rollback_history_summary")
.and_then(|summary| summary.get("by_outcome"))
.and_then(|counts| counts.get("blocked"))
.and_then(Value::as_u64),
Some(1)
);
assert_eq!(
get_run_payload
.get("last_rollback_outcome")
.and_then(|value| value.get("outcome"))
.and_then(Value::as_str),
Some("blocked")
);
assert_eq!(
get_run_payload
.get("last_rollback_outcome")
.and_then(|value| value.get("reason"))
.and_then(Value::as_str),
Some("rollback execution is not allowed for the current run status")
);
assert_eq!(
get_run_payload
.get("rollback_policy")
.and_then(|summary| summary.get("eligible"))
.and_then(Value::as_bool),
Some(false)
);
assert_eq!(
get_run_payload
.get("rollback_policy")
.and_then(|summary| summary.get("required_policy_ack"))
.and_then(Value::as_str),
Some("allow_rollback_execution")
);
}
/// Checks that a `step_completed` event marks the matching step as `done`.
///
/// The run gets one todo (`step-1`) through the todos sync endpoint, then a
/// `step_started` and a `step_completed` event for that step; afterwards the
/// first step of the fetched run must have status `done`.
#[tokio::test]
async fn context_run_event_step_completed_sets_done_status() {
    let state = test_state().await;
    let app = app_router(state.clone());

    // Create the run.
    let new_run = json!({
        "run_id": "ctx-run-step-done",
        "objective": "step done transition"
    });
    let create_req = Request::builder()
        .method("POST")
        .uri("/context/runs")
        .header("content-type", "application/json")
        .body(Body::from(new_run.to_string()))
        .expect("create request");
    let created = app
        .clone()
        .oneshot(create_req)
        .await
        .expect("create response");
    assert_eq!(created.status(), StatusCode::OK);

    // Replace the todo list with a single pending item.
    let todos = json!({
        "replace": true,
        "todos": [{"id":"step-1","content":"Do thing","status":"pending"}]
    });
    let sync_req = Request::builder()
        .method("POST")
        .uri("/context/runs/ctx-run-step-done/todos/sync")
        .header("content-type", "application/json")
        .body(Body::from(todos.to_string()))
        .expect("sync request");
    let synced = app.clone().oneshot(sync_req).await.expect("sync response");
    assert_eq!(synced.status(), StatusCode::OK);

    // Start and then complete `step-1`; each entry carries its own panic labels.
    let step_events = [
        (
            json!({
                "type": "step_started",
                "status": "running",
                "step_id": "step-1",
                "payload": {"step_status":"in_progress"}
            }),
            "started request",
            "started response",
        ),
        (
            json!({
                "type": "step_completed",
                "status": "running",
                "step_id": "step-1",
                "payload": {"step_status":"done"}
            }),
            "completed request",
            "completed response",
        ),
    ];
    for (event_body, request_label, response_label) in step_events {
        let req = Request::builder()
            .method("POST")
            .uri("/context/runs/ctx-run-step-done/events")
            .header("content-type", "application/json")
            .body(Body::from(event_body.to_string()))
            .expect(request_label);
        let resp = app.clone().oneshot(req).await.expect(response_label);
        assert_eq!(resp.status(), StatusCode::OK);
    }

    // Fetch the run and look at its first step.
    let get_req = Request::builder()
        .method("GET")
        .uri("/context/runs/ctx-run-step-done")
        .body(Body::empty())
        .expect("get request");
    let fetched = app.clone().oneshot(get_req).await.expect("get response");
    assert_eq!(fetched.status(), StatusCode::OK);
    let fetched_bytes = to_bytes(fetched.into_body(), usize::MAX)
        .await
        .expect("get body");
    let view: Value = serde_json::from_slice(&fetched_bytes).expect("get json");
    let first_step_status = view
        .pointer("/run/steps")
        .and_then(Value::as_array)
        .and_then(|steps| steps.first())
        .and_then(|step| step.get("status"))
        .and_then(Value::as_str);
    assert_eq!(first_step_status, Some("done"));
}
/// Checks that `GET /context/runs` honours the `workspace` and `limit` query parameters.
///
/// Two runs are created in different workspaces (`/tmp/ws-one`, `/tmp/ws-two`);
/// listing with `workspace=/tmp/ws-two&limit=1` must return exactly one run
/// whose workspace path is `/tmp/ws-two`.
#[tokio::test]
async fn context_run_list_supports_workspace_filter_and_limit() {
    let state = test_state().await;
    let app = app_router(state.clone());

    let create_one = Request::builder()
        .method("POST")
        .uri("/context/runs")
        .header("content-type", "application/json")
        .body(Body::from(
            json!({
                "run_id": "ctx-run-list-1",
                "objective": "first",
                "workspace": {
                    "workspace_id": "ws-1",
                    "canonical_path": "/tmp/ws-one",
                    "lease_epoch": 1
                }
            })
            .to_string(),
        ))
        .expect("create one request");
    let create_two = Request::builder()
        .method("POST")
        .uri("/context/runs")
        .header("content-type", "application/json")
        .body(Body::from(
            json!({
                "run_id": "ctx-run-list-2",
                "objective": "second",
                "workspace": {
                    "workspace_id": "ws-2",
                    "canonical_path": "/tmp/ws-two",
                    "lease_epoch": 1
                }
            })
            .to_string(),
        ))
        .expect("create two request");

    // Fail here, with a clear status mismatch, if either fixture run could not
    // be created, instead of later with a misleading row-count mismatch.
    let create_one_resp = app.clone().oneshot(create_one).await.expect("create one");
    assert_eq!(create_one_resp.status(), StatusCode::OK);
    let create_two_resp = app.clone().oneshot(create_two).await.expect("create two");
    assert_eq!(create_two_resp.status(), StatusCode::OK);

    let filtered_req = Request::builder()
        .method("GET")
        .uri("/context/runs?workspace=/tmp/ws-two&limit=1")
        .body(Body::empty())
        .expect("filtered list request");
    let filtered_resp = app
        .clone()
        .oneshot(filtered_req)
        .await
        .expect("filtered list response");
    assert_eq!(filtered_resp.status(), StatusCode::OK);
    let filtered_body = to_bytes(filtered_resp.into_body(), usize::MAX)
        .await
        .expect("filtered list body");
    let filtered_payload: Value =
        serde_json::from_slice(&filtered_body).expect("filtered list json");

    // A missing or non-array `runs` field is treated as an empty list, which
    // then fails the length assertion below.
    let rows = filtered_payload
        .get("runs")
        .and_then(Value::as_array)
        .map(Vec::as_slice)
        .unwrap_or_default();
    assert_eq!(rows.len(), 1);
    assert_eq!(
        rows[0]
            .get("workspace")
            .and_then(|v| v.get("canonical_path"))
            .and_then(Value::as_str),
        Some("/tmp/ws-two")
    );
}
/// Checks that a workspace lease mismatch is reported and pauses the run.
///
/// The run is created with workspace path `/expected/path`; validating the
/// lease with `current_path` `/other/path` must answer `mismatch: true`, and
/// the run fetched afterwards must have status `paused`.
#[tokio::test]
async fn context_run_lease_mismatch_pauses_run() {
    let state = test_state().await;
    let app = app_router(state.clone());

    let create_req = Request::builder()
        .method("POST")
        .uri("/context/runs")
        .header("content-type", "application/json")
        .body(Body::from(
            json!({
                "run_id": "ctx-run-lease",
                "objective": "lease mismatch",
                "workspace": {
                    "workspace_id": "ws-1",
                    "canonical_path": "/expected/path",
                    "lease_epoch": 1
                }
            })
            .to_string(),
        ))
        .expect("create request");
    let create_resp = app
        .clone()
        .oneshot(create_req)
        .await
        .expect("create response");
    assert_eq!(create_resp.status(), StatusCode::OK);

    // Validate the lease from a path that differs from the one the run was created with.
    let validate_req = Request::builder()
        .method("POST")
        .uri("/context/runs/ctx-run-lease/lease/validate")
        .header("content-type", "application/json")
        .body(Body::from(
            json!({
                "phase": "pre_dispatch",
                "current_path": "/other/path",
                "step_id": "step-1"
            })
            .to_string(),
        ))
        .expect("validate request");
    let validate_resp = app
        .clone()
        .oneshot(validate_req)
        .await
        .expect("validate response");
    assert_eq!(validate_resp.status(), StatusCode::OK);
    let validate_body = to_bytes(validate_resp.into_body(), usize::MAX)
        .await
        .expect("validate body");
    let validate_payload: Value = serde_json::from_slice(&validate_body).expect("validate json");
    assert_eq!(
        validate_payload.get("mismatch").and_then(Value::as_bool),
        Some(true)
    );

    let get_run_req = Request::builder()
        .method("GET")
        .uri("/context/runs/ctx-run-lease")
        .body(Body::empty())
        .expect("get run request");
    let get_run_resp = app
        .clone()
        .oneshot(get_run_req)
        .await
        .expect("get run response");
    // Check the status before decoding so a failed fetch is reported as such.
    assert_eq!(get_run_resp.status(), StatusCode::OK);
    let get_run_body = to_bytes(get_run_resp.into_body(), usize::MAX)
        .await
        .expect("get run body");
    let get_run_payload: Value = serde_json::from_slice(&get_run_body).expect("get run json");
    assert_eq!(
        get_run_payload
            .get("run")
            .and_then(|run| run.get("status"))
            .and_then(Value::as_str),
        Some("paused")
    );
}
/// Confirms that the replay endpoint reports no drift for an untampered run.
///
/// After creating `ctx-run-replay-ok` and posting one `step_started` event
/// with status `running`, `GET .../replay` must answer `drift.mismatch: false`
/// and a replayed status of `running`.
#[tokio::test]
async fn context_run_replay_matches_persisted_state_without_drift() {
    let state = test_state().await;
    let app = app_router(state.clone());

    // Create the run.
    let new_run = json!({
        "run_id": "ctx-run-replay-ok",
        "objective": "replay no drift"
    });
    let create_req = Request::builder()
        .method("POST")
        .uri("/context/runs")
        .header("content-type", "application/json")
        .body(Body::from(new_run.to_string()))
        .expect("create request");
    let created = app
        .clone()
        .oneshot(create_req)
        .await
        .expect("create response");
    assert_eq!(created.status(), StatusCode::OK);

    // Record a single step-start event through the regular events endpoint.
    let step_started = json!({
        "type": "step_started",
        "status": "running",
        "step_id": "s1",
        "payload": {
            "step_title": "Plan",
            "step_status": "in_progress",
            "why_next_step": "Need active planning"
        }
    });
    let event_req = Request::builder()
        .method("POST")
        .uri("/context/runs/ctx-run-replay-ok/events")
        .header("content-type", "application/json")
        .body(Body::from(step_started.to_string()))
        .expect("event request");
    let recorded = app
        .clone()
        .oneshot(event_req)
        .await
        .expect("event response");
    assert_eq!(recorded.status(), StatusCode::OK);

    // Ask for a replay and compare it with what was persisted.
    let replay_req = Request::builder()
        .method("GET")
        .uri("/context/runs/ctx-run-replay-ok/replay")
        .body(Body::empty())
        .expect("replay request");
    let replayed = app
        .clone()
        .oneshot(replay_req)
        .await
        .expect("replay response");
    assert_eq!(replayed.status(), StatusCode::OK);
    let replayed_bytes = to_bytes(replayed.into_body(), usize::MAX)
        .await
        .expect("replay body");
    let report: Value = serde_json::from_slice(&replayed_bytes).expect("replay json");

    let drift_detected = report.pointer("/drift/mismatch").and_then(Value::as_bool);
    assert_eq!(drift_detected, Some(false));
    let replayed_status = report.pointer("/replay/status").and_then(Value::as_str);
    assert_eq!(replayed_status, Some("running"));
}
/// Checks that the replay endpoint flags a status that was changed behind the event log.
///
/// A `planning_started` event puts status `planning` into the log; the run is
/// then overwritten via `PUT` with status `failed`. The replay must report
/// both `drift.mismatch` and `drift.status_mismatch` as `true`.
#[tokio::test]
async fn context_run_replay_detects_status_drift() {
    let state = test_state().await;
    let app = app_router(state.clone());

    let create_req = Request::builder()
        .method("POST")
        .uri("/context/runs")
        .header("content-type", "application/json")
        .body(Body::from(
            json!({
                "run_id": "ctx-run-replay-drift",
                "objective": "replay drift"
            })
            .to_string(),
        ))
        .expect("create request");
    let create_resp = app
        .clone()
        .oneshot(create_req)
        .await
        .expect("create response");
    assert_eq!(create_resp.status(), StatusCode::OK);

    let event_req = Request::builder()
        .method("POST")
        .uri("/context/runs/ctx-run-replay-drift/events")
        .header("content-type", "application/json")
        .body(Body::from(
            json!({
                "type": "planning_started",
                "status": "planning",
                "payload": {}
            })
            .to_string(),
        ))
        .expect("event request");
    let event_resp = app
        .clone()
        .oneshot(event_req)
        .await
        .expect("event response");
    assert_eq!(event_resp.status(), StatusCode::OK);

    // Read-modify-write: fetch the run, force a different status, store it back.
    let get_req = Request::builder()
        .method("GET")
        .uri("/context/runs/ctx-run-replay-drift")
        .body(Body::empty())
        .expect("get request");
    let get_resp = app.clone().oneshot(get_req).await.expect("get response");
    // Make sure we are editing a real run document, not an error body.
    assert_eq!(get_resp.status(), StatusCode::OK);
    let get_body = to_bytes(get_resp.into_body(), usize::MAX)
        .await
        .expect("get body");
    let mut get_payload: Value = serde_json::from_slice(&get_body).expect("get json");
    get_payload["run"]["status"] = Value::String("failed".to_string());
    let put_req = Request::builder()
        .method("PUT")
        .uri("/context/runs/ctx-run-replay-drift")
        .header("content-type", "application/json")
        .body(Body::from(
            get_payload.get("run").expect("run payload").to_string(),
        ))
        .expect("put request");
    let put_resp = app.clone().oneshot(put_req).await.expect("put response");
    assert_eq!(put_resp.status(), StatusCode::OK);

    let replay_req = Request::builder()
        .method("GET")
        .uri("/context/runs/ctx-run-replay-drift/replay")
        .body(Body::empty())
        .expect("replay request");
    let replay_resp = app
        .clone()
        .oneshot(replay_req)
        .await
        .expect("replay response");
    assert_eq!(replay_resp.status(), StatusCode::OK);
    let replay_body = to_bytes(replay_resp.into_body(), usize::MAX)
        .await
        .expect("replay body");
    let replay_payload: Value = serde_json::from_slice(&replay_body).expect("replay json");
    assert_eq!(
        replay_payload
            .get("drift")
            .and_then(|d| d.get("mismatch"))
            .and_then(Value::as_bool),
        Some(true)
    );
    assert_eq!(
        replay_payload
            .get("drift")
            .and_then(|d| d.get("status_mismatch"))
            .and_then(Value::as_bool),
        Some(true)
    );
}
/// Checks that `POST .../driver/next` picks the runnable step and explains why.
///
/// The run is given two steps (`s1` pending, `s2` runnable). The driver must
/// select `s2` and return a non-blank `why_next_step`; afterwards the run must
/// be `running` and the second step `in_progress`.
#[tokio::test]
async fn context_run_driver_next_selects_runnable_step_and_sets_why() {
    let state = test_state().await;
    let app = app_router(state.clone());

    let create_req = Request::builder()
        .method("POST")
        .uri("/context/runs")
        .header("content-type", "application/json")
        .body(Body::from(
            json!({
                "run_id": "ctx-run-driver-1",
                "objective": "meta manager select"
            })
            .to_string(),
        ))
        .expect("create request");
    let create_resp = app
        .clone()
        .oneshot(create_req)
        .await
        .expect("create response");
    assert_eq!(create_resp.status(), StatusCode::OK);

    // Read-modify-write: install the two steps on the stored run.
    let get_req = Request::builder()
        .method("GET")
        .uri("/context/runs/ctx-run-driver-1")
        .body(Body::empty())
        .expect("get request");
    let get_resp = app.clone().oneshot(get_req).await.expect("get response");
    // Make sure we are editing a real run document, not an error body.
    assert_eq!(get_resp.status(), StatusCode::OK);
    let get_body = to_bytes(get_resp.into_body(), usize::MAX)
        .await
        .expect("get body");
    let mut get_payload: Value = serde_json::from_slice(&get_body).expect("get json");
    get_payload["run"]["steps"] = json!([
        {"step_id":"s1","title":"Plan","status":"pending"},
        {"step_id":"s2","title":"Execute","status":"runnable"}
    ]);
    let put_req = Request::builder()
        .method("PUT")
        .uri("/context/runs/ctx-run-driver-1")
        .header("content-type", "application/json")
        .body(Body::from(
            get_payload.get("run").expect("run payload").to_string(),
        ))
        .expect("put request");
    let put_resp = app.clone().oneshot(put_req).await.expect("put response");
    assert_eq!(put_resp.status(), StatusCode::OK);

    let next_req = Request::builder()
        .method("POST")
        .uri("/context/runs/ctx-run-driver-1/driver/next")
        .header("content-type", "application/json")
        .body(Body::from(json!({}).to_string()))
        .expect("next request");
    let next_resp = app.clone().oneshot(next_req).await.expect("next response");
    assert_eq!(next_resp.status(), StatusCode::OK);
    let next_body = to_bytes(next_resp.into_body(), usize::MAX)
        .await
        .expect("next body");
    let next_payload: Value = serde_json::from_slice(&next_body).expect("next json");
    assert_eq!(
        next_payload.get("selected_step_id").and_then(Value::as_str),
        Some("s2")
    );
    // The explanation must be present and contain more than whitespace.
    assert!(next_payload
        .get("why_next_step")
        .and_then(Value::as_str)
        .map(|v| !v.trim().is_empty())
        .unwrap_or(false));

    let run_req = Request::builder()
        .method("GET")
        .uri("/context/runs/ctx-run-driver-1")
        .body(Body::empty())
        .expect("run request");
    let run_resp = app.clone().oneshot(run_req).await.expect("run response");
    // Check the status before decoding so a failed fetch is reported as such.
    assert_eq!(run_resp.status(), StatusCode::OK);
    let run_body = to_bytes(run_resp.into_body(), usize::MAX)
        .await
        .expect("run body");
    let run_payload: Value = serde_json::from_slice(&run_body).expect("run json");
    assert_eq!(
        run_payload
            .get("run")
            .and_then(|r| r.get("status"))
            .and_then(Value::as_str),
        Some("running")
    );
    // Index 1 is `s2`, the step the driver selected.
    assert_eq!(
        run_payload
            .get("run")
            .and_then(|r| r.get("steps"))
            .and_then(Value::as_array)
            .and_then(|steps| steps.get(1))
            .and_then(|step| step.get("status"))
            .and_then(Value::as_str),
        Some("in_progress")
    );
}
/// Checks that `POST .../driver/next` selects nothing for a run already marked `completed`.
///
/// The run's status is overwritten to `completed` via `PUT`; the driver must
/// then answer with `selected_step_id: null` and `target_status: "completed"`.
#[tokio::test]
async fn context_run_driver_next_respects_terminal_state() {
    let state = test_state().await;
    let app = app_router(state.clone());

    let create_req = Request::builder()
        .method("POST")
        .uri("/context/runs")
        .header("content-type", "application/json")
        .body(Body::from(
            json!({
                "run_id": "ctx-run-driver-2",
                "objective": "terminal check"
            })
            .to_string(),
        ))
        .expect("create request");
    let create_resp = app
        .clone()
        .oneshot(create_req)
        .await
        .expect("create response");
    assert_eq!(create_resp.status(), StatusCode::OK);

    // Read-modify-write: mark the stored run as completed.
    let get_req = Request::builder()
        .method("GET")
        .uri("/context/runs/ctx-run-driver-2")
        .body(Body::empty())
        .expect("get request");
    let get_resp = app.clone().oneshot(get_req).await.expect("get response");
    // Make sure we are editing a real run document, not an error body.
    assert_eq!(get_resp.status(), StatusCode::OK);
    let get_body = to_bytes(get_resp.into_body(), usize::MAX)
        .await
        .expect("get body");
    let mut get_payload: Value = serde_json::from_slice(&get_body).expect("get json");
    get_payload["run"]["status"] = Value::String("completed".to_string());
    let put_req = Request::builder()
        .method("PUT")
        .uri("/context/runs/ctx-run-driver-2")
        .header("content-type", "application/json")
        .body(Body::from(
            get_payload.get("run").expect("run payload").to_string(),
        ))
        .expect("put request");
    let put_resp = app.clone().oneshot(put_req).await.expect("put response");
    assert_eq!(put_resp.status(), StatusCode::OK);

    let next_req = Request::builder()
        .method("POST")
        .uri("/context/runs/ctx-run-driver-2/driver/next")
        .header("content-type", "application/json")
        .body(Body::from(json!({}).to_string()))
        .expect("next request");
    let next_resp = app.clone().oneshot(next_req).await.expect("next response");
    assert_eq!(next_resp.status(), StatusCode::OK);
    let next_body = to_bytes(next_resp.into_body(), usize::MAX)
        .await
        .expect("next body");
    let next_payload: Value = serde_json::from_slice(&next_body).expect("next json");
    // The key must be present and explicitly null, not merely absent.
    assert_eq!(next_payload.get("selected_step_id"), Some(&Value::Null));
    assert_eq!(
        next_payload.get("target_status").and_then(Value::as_str),
        Some("completed")
    );
}
/// Checks that a `dry_run` call to `POST .../driver/next` leaves the run and its event log alone.
///
/// The run is given one runnable step. After a dry-run driver call the step
/// must still be `runnable`, the revision must match the expected value, and
/// the event list must contain no `meta_next_step_selected` event.
#[tokio::test]
async fn context_run_driver_next_dry_run_does_not_mutate_state_or_events() {
    let state = test_state().await;
    let app = app_router(state.clone());

    let create_req = Request::builder()
        .method("POST")
        .uri("/context/runs")
        .header("content-type", "application/json")
        .body(Body::from(
            json!({
                "run_id": "ctx-run-driver-dry",
                "objective": "dry run guardrail"
            })
            .to_string(),
        ))
        .expect("create request");
    let create_resp = app
        .clone()
        .oneshot(create_req)
        .await
        .expect("create response");
    assert_eq!(create_resp.status(), StatusCode::OK);

    // Read-modify-write: install a single runnable step on the stored run.
    let get_req = Request::builder()
        .method("GET")
        .uri("/context/runs/ctx-run-driver-dry")
        .body(Body::empty())
        .expect("get request");
    let get_resp = app.clone().oneshot(get_req).await.expect("get response");
    // Make sure we are editing a real run document, not an error body.
    assert_eq!(get_resp.status(), StatusCode::OK);
    let get_body = to_bytes(get_resp.into_body(), usize::MAX)
        .await
        .expect("get body");
    let mut get_payload: Value = serde_json::from_slice(&get_body).expect("get json");
    get_payload["run"]["steps"] = json!([
        {"step_id":"s1","title":"Plan","status":"runnable"}
    ]);
    // Revision as it was before the PUT below.
    let before_revision = get_payload["run"]["revision"]
        .as_u64()
        .expect("before revision");
    let put_req = Request::builder()
        .method("PUT")
        .uri("/context/runs/ctx-run-driver-dry")
        .header("content-type", "application/json")
        .body(Body::from(
            get_payload.get("run").expect("run payload").to_string(),
        ))
        .expect("put request");
    let put_resp = app.clone().oneshot(put_req).await.expect("put response");
    assert_eq!(put_resp.status(), StatusCode::OK);

    let dry_next_req = Request::builder()
        .method("POST")
        .uri("/context/runs/ctx-run-driver-dry/driver/next")
        .header("content-type", "application/json")
        .body(Body::from(json!({"dry_run": true}).to_string()))
        .expect("dry next request");
    let dry_next_resp = app
        .clone()
        .oneshot(dry_next_req)
        .await
        .expect("dry next response");
    assert_eq!(dry_next_resp.status(), StatusCode::OK);

    let run_req = Request::builder()
        .method("GET")
        .uri("/context/runs/ctx-run-driver-dry")
        .body(Body::empty())
        .expect("run request");
    let run_resp = app.clone().oneshot(run_req).await.expect("run response");
    assert_eq!(run_resp.status(), StatusCode::OK);
    let run_body = to_bytes(run_resp.into_body(), usize::MAX)
        .await
        .expect("run body");
    let run_payload: Value = serde_json::from_slice(&run_body).expect("run json");
    // NOTE(review): the expected `+1` presumably accounts for the PUT above, so
    // the dry run itself must contribute nothing — confirm against the PUT handler.
    assert_eq!(
        run_payload
            .get("run")
            .and_then(|r| r.get("revision"))
            .and_then(Value::as_u64),
        Some(before_revision.saturating_add(1))
    );
    assert_eq!(
        run_payload
            .get("run")
            .and_then(|r| r.get("steps"))
            .and_then(Value::as_array)
            .and_then(|steps| steps.first())
            .and_then(|step| step.get("status"))
            .and_then(Value::as_str),
        Some("runnable")
    );

    let events_req = Request::builder()
        .method("GET")
        .uri("/context/runs/ctx-run-driver-dry/events")
        .body(Body::empty())
        .expect("events request");
    let events_resp = app
        .clone()
        .oneshot(events_req)
        .await
        .expect("events response");
    // Without these two checks a failed request or an error body would make
    // the "no decision event" assertion below pass vacuously.
    assert_eq!(events_resp.status(), StatusCode::OK);
    let events_body = to_bytes(events_resp.into_body(), usize::MAX)
        .await
        .expect("events body");
    let events_payload: Value = serde_json::from_slice(&events_body).expect("events json");
    let event_rows = events_payload
        .get("events")
        .and_then(Value::as_array)
        .expect("events array");
    let has_decision_event = event_rows
        .iter()
        .any(|row| row.get("type").and_then(Value::as_str) == Some("meta_next_step_selected"));
    assert!(!has_decision_event);
}
/// Checks that a non-dry-run `POST .../driver/next` records a decision event with a reason.
///
/// The run is given one runnable step; after the driver call the event list
/// must contain a `meta_next_step_selected` event whose
/// `payload.why_next_step` is a non-blank string.
#[tokio::test]
async fn context_run_driver_next_emits_decision_event_with_why() {
    let state = test_state().await;
    let app = app_router(state.clone());

    let create_req = Request::builder()
        .method("POST")
        .uri("/context/runs")
        .header("content-type", "application/json")
        .body(Body::from(
            json!({
                "run_id": "ctx-run-driver-event",
                "objective": "emit decision event"
            })
            .to_string(),
        ))
        .expect("create request");
    let create_resp = app
        .clone()
        .oneshot(create_req)
        .await
        .expect("create response");
    assert_eq!(create_resp.status(), StatusCode::OK);

    // Read-modify-write: install a single runnable step on the stored run.
    let get_req = Request::builder()
        .method("GET")
        .uri("/context/runs/ctx-run-driver-event")
        .body(Body::empty())
        .expect("get request");
    let get_resp = app.clone().oneshot(get_req).await.expect("get response");
    // Make sure we are editing a real run document, not an error body.
    assert_eq!(get_resp.status(), StatusCode::OK);
    let get_body = to_bytes(get_resp.into_body(), usize::MAX)
        .await
        .expect("get body");
    let mut get_payload: Value = serde_json::from_slice(&get_body).expect("get json");
    get_payload["run"]["steps"] = json!([
        {"step_id":"s1","title":"Plan","status":"runnable"}
    ]);
    let put_req = Request::builder()
        .method("PUT")
        .uri("/context/runs/ctx-run-driver-event")
        .header("content-type", "application/json")
        .body(Body::from(
            get_payload.get("run").expect("run payload").to_string(),
        ))
        .expect("put request");
    let put_resp = app.clone().oneshot(put_req).await.expect("put response");
    assert_eq!(put_resp.status(), StatusCode::OK);

    let next_req = Request::builder()
        .method("POST")
        .uri("/context/runs/ctx-run-driver-event/driver/next")
        .header("content-type", "application/json")
        .body(Body::from(json!({"dry_run": false}).to_string()))
        .expect("next request");
    let next_resp = app.clone().oneshot(next_req).await.expect("next response");
    assert_eq!(next_resp.status(), StatusCode::OK);

    let events_req = Request::builder()
        .method("GET")
        .uri("/context/runs/ctx-run-driver-event/events")
        .body(Body::empty())
        .expect("events request");
    let events_resp = app
        .clone()
        .oneshot(events_req)
        .await
        .expect("events response");
    // Check the status before decoding so a failed fetch is reported as such.
    assert_eq!(events_resp.status(), StatusCode::OK);
    let events_body = to_bytes(events_resp.into_body(), usize::MAX)
        .await
        .expect("events body");
    let events_payload: Value = serde_json::from_slice(&events_body).expect("events json");
    let decision_event = events_payload
        .get("events")
        .and_then(Value::as_array)
        .and_then(|rows| {
            rows.iter().find(|row| {
                row.get("type").and_then(Value::as_str) == Some("meta_next_step_selected")
            })
        })
        .expect("decision event");
    // The recorded reason must be present and contain more than whitespace.
    assert!(decision_event
        .get("payload")
        .and_then(|p| p.get("why_next_step"))
        .and_then(Value::as_str)
        .map(|why| !why.trim().is_empty())
        .unwrap_or(false));
}
#[tokio::test]
async fn context_run_fault_injection_workspace_mismatch_checkpoint_replay() {
let state = test_state().await;
let app = app_router(state.clone());
let create_req = Request::builder()
.method("POST")
.uri("/context/runs")
.header("content-type", "application/json")
.body(Body::from(
json!({
"run_id": "ctx-run-fault-1",
"objective": "fault injection path",
"workspace": {
"workspace_id": "ws-fault",
"canonical_path": "/expected/path",
"lease_epoch": 1
}
})
.to_string(),
))
.expect("create request");
let create_resp = app
.clone()
.oneshot(create_req)
.await
.expect("create response");
assert_eq!(create_resp.status(), StatusCode::OK);
let get_req = Request::builder()
.method("GET")
.uri("/context/runs/ctx-run-fault-1")
.body(Body::empty())
.expect("get request");
let get_resp = app.clone().oneshot(get_req).await.expect("get response");
let get_body = to_bytes(get_resp.into_body(), usize::MAX)
.await
.expect("get body");
let mut get_payload: Value = serde_json::from_slice(&get_body).expect("get json");
get_payload["run"]["steps"] = json!([
{"step_id":"s1","title":"Plan","status":"runnable"}
]);
let put_req = Request::builder()
.method("PUT")
.uri("/context/runs/ctx-run-fault-1")
.header("content-type", "application/json")
.body(Body::from(
get_payload
.get("run")
.cloned()
.expect("run payload")
.to_string(),
))
.expect("put request");
let put_resp = app.clone().oneshot(put_req).await.expect("put response");
assert_eq!(put_resp.status(), StatusCode::OK);
let next_req = Request::builder()
.method("POST")
.uri("/context/runs/ctx-run-fault-1/driver/next")
.header("content-type", "application/json")
.body(Body::from(json!({"dry_run": false}).to_string()))
.expect("next request");
let next_resp = app.clone().oneshot(next_req).await.expect("next response");
assert_eq!(next_resp.status(), StatusCode::OK);
let mismatch_req = Request::builder()
.method("POST")
.uri("/context/runs/ctx-run-fault-1/lease/validate")
.header("content-type", "application/json")
.body(Body::from(
json!({
"phase": "pre_tool_call",
"current_path": "/other/path",
"step_id": "s1"
})
.to_string(),
))
.expect("mismatch request");
let mismatch_resp = app
.clone()
.oneshot(mismatch_req)
.await
.expect("mismatch response");
assert_eq!(mismatch_resp.status(), StatusCode::OK);
let mismatch_body = to_bytes(mismatch_resp.into_body(), usize::MAX)
.await
.expect("mismatch body");
let mismatch_payload: Value = serde_json::from_slice(&mismatch_body).expect("mismatch json");
assert_eq!(
mismatch_payload.get("mismatch").and_then(Value::as_bool),
Some(true)
);
let checkpoint_req = Request::builder()
.method("POST")
.uri("/context/runs/ctx-run-fault-1/checkpoints")
.header("content-type", "application/json")
.body(Body::from(json!({"reason":"fault_injection"}).to_string()))
.expect("checkpoint request");
let checkpoint_resp = app
.clone()
.oneshot(checkpoint_req)
.await
.expect("checkpoint response");
assert_eq!(checkpoint_resp.status(), StatusCode::OK);
let events_req = Request::builder()
.method("GET")
.uri("/context/runs/ctx-run-fault-1/events")
.body(Body::empty())
.expect("events request");
let events_resp = app
.clone()
.oneshot(events_req)
.await
.expect("events response");
let events_body = to_bytes(events_resp.into_body(), usize::MAX)
.await
.expect("events body");
let events_payload: Value = serde_json::from_slice(&events_body).expect("events json");
let event_rows = events_payload
.get("events")
.and_then(Value::as_array)
.cloned()
.unwrap_or_default();
assert!(event_rows.iter().any(|row| {
row.get("type")
.and_then(Value::as_str)
.map(|ty| ty == "meta_next_step_selected")
.unwrap_or(false)
}));
assert!(event_rows.iter().any(|row| {
row.get("type")
.and_then(Value::as_str)
.map(|ty| ty == "workspace_mismatch")
.unwrap_or(false)
}));
assert!(event_rows.iter().any(|row| {
row.get("status")
.and_then(Value::as_str)
.map(|status| status == "paused")
.unwrap_or(false)
}));
let replay_req = Request::builder()
.method("GET")
.uri("/context/runs/ctx-run-fault-1/replay")
.body(Body::empty())
.expect("replay request");
let replay_resp = app
.clone()
.oneshot(replay_req)
.await
.expect("replay response");
assert_eq!(replay_resp.status(), StatusCode::OK);
let replay_body = to_bytes(replay_resp.into_body(), usize::MAX)
.await
.expect("replay body");
let replay_payload: Value = serde_json::from_slice(&replay_body).expect("replay json");
assert_eq!(
replay_payload
.get("replay")
.and_then(|r| r.get("status"))
.and_then(Value::as_str),
Some("paused")
);
assert_eq!(
replay_payload
.get("drift")
.and_then(|d| d.get("mismatch"))
.and_then(Value::as_bool),
Some(false)
);
}
/// Verifies that `POST /context/runs/{id}/todos/sync` maps a todo list onto run steps
/// and records a `todo_synced` event.
///
/// After syncing three todos with `replace: true`, the run must report status
/// `running` and exactly three steps, and the run's event list must contain an
/// entry whose `type` is `todo_synced`.
#[tokio::test]
async fn context_run_todos_sync_maps_to_steps_and_emits_event() {
    let state = test_state().await;
    let app = app_router(state.clone());

    // Create the run the todos will be synced into.
    let create_req = Request::builder()
        .method("POST")
        .uri("/context/runs")
        .header("content-type", "application/json")
        .body(Body::from(
            json!({
                "run_id": "ctx-run-todos-sync",
                "objective": "sync todos"
            })
            .to_string(),
        ))
        .expect("create request");
    let create_resp = app
        .clone()
        .oneshot(create_req)
        .await
        .expect("create response");
    assert_eq!(create_resp.status(), StatusCode::OK);

    // Sync three todos, one per status the test exercises.
    let sync_req = Request::builder()
        .method("POST")
        .uri("/context/runs/ctx-run-todos-sync/todos/sync")
        .header("content-type", "application/json")
        .body(Body::from(
            json!({
                "replace": true,
                "source_session_id": "s-1",
                "source_run_id": "r-1",
                "todos": [
                    {"id":"task-1","content":"Plan architecture","status":"in_progress"},
                    {"id":"task-2","content":"Implement endpoint","status":"pending"},
                    {"id":"task-3","content":"Write tests","status":"completed"}
                ]
            })
            .to_string(),
        ))
        .expect("sync request");
    let sync_resp = app.clone().oneshot(sync_req).await.expect("sync response");
    assert_eq!(sync_resp.status(), StatusCode::OK);

    // Read the run back and check the derived status and step count.
    let run_req = Request::builder()
        .method("GET")
        .uri("/context/runs/ctx-run-todos-sync")
        .body(Body::empty())
        .expect("run request");
    let run_resp = app.clone().oneshot(run_req).await.expect("run response");
    // Fail at the HTTP layer instead of with an opaque JSON/field mismatch below.
    assert_eq!(run_resp.status(), StatusCode::OK);
    let run_body = to_bytes(run_resp.into_body(), usize::MAX)
        .await
        .expect("run body");
    let run_payload: Value = serde_json::from_slice(&run_body).expect("run json");
    assert_eq!(
        run_payload
            .get("run")
            .and_then(|r| r.get("status"))
            .and_then(Value::as_str),
        Some("running")
    );
    assert_eq!(
        run_payload
            .get("run")
            .and_then(|r| r.get("steps"))
            .and_then(Value::as_array)
            .map(|rows| rows.len()),
        Some(3)
    );

    // The sync must also be visible in the run's event list.
    let events_req = Request::builder()
        .method("GET")
        .uri("/context/runs/ctx-run-todos-sync/events")
        .body(Body::empty())
        .expect("events request");
    let events_resp = app
        .clone()
        .oneshot(events_req)
        .await
        .expect("events response");
    assert_eq!(events_resp.status(), StatusCode::OK);
    let events_body = to_bytes(events_resp.into_body(), usize::MAX)
        .await
        .expect("events body");
    let events_payload: Value = serde_json::from_slice(&events_body).expect("events json");
    let has_todo_synced = events_payload
        .get("events")
        .and_then(Value::as_array)
        .map(|rows| {
            rows.iter()
                .any(|row| row.get("type").and_then(Value::as_str) == Some("todo_synced"))
        })
        .unwrap_or(false);
    assert!(
        has_todo_synced,
        "expected a todo_synced event, got: {}",
        events_payload
    );
}
/// Verifies that when two agents claim concurrently and only one task is runnable,
/// exactly one of them receives it.
///
/// Both claim requests are driven together with `tokio::join!`. Each must answer
/// `200 OK`, and exactly one response body may carry a non-null `task` field.
#[tokio::test]
async fn context_tasks_claim_is_single_winner_under_race() {
    let state = test_state().await;
    let app = app_router(state.clone());

    let create_run_req = Request::builder()
        .method("POST")
        .uri("/context/runs")
        .header("content-type", "application/json")
        .body(Body::from(
            json!({
                "run_id": "ctx-run-task-race",
                "objective": "claim race"
            })
            .to_string(),
        ))
        .expect("create run request");
    let create_run_resp = app
        .clone()
        .oneshot(create_run_req)
        .await
        .expect("create run response");
    assert_eq!(create_run_resp.status(), StatusCode::OK);

    // Seed a single runnable task so the two claims compete for it.
    let create_tasks_req = Request::builder()
        .method("POST")
        .uri("/context/runs/ctx-run-task-race/tasks")
        .header("content-type", "application/json")
        .body(Body::from(
            json!({
                "tasks": [
                    {
                        "id": "task-1",
                        "task_type": "unit_work",
                        "status": "runnable",
                        "payload": {"title": "Only task"}
                    }
                ]
            })
            .to_string(),
        ))
        .expect("create tasks request");
    let create_tasks_resp = app
        .clone()
        .oneshot(create_tasks_req)
        .await
        .expect("create tasks response");
    assert_eq!(create_tasks_resp.status(), StatusCode::OK);

    let claim_one = Request::builder()
        .method("POST")
        .uri("/context/runs/ctx-run-task-race/tasks/claim")
        .header("content-type", "application/json")
        .body(Body::from(
            json!({
                "agent_id": "agent-a",
                "command_id": "claim-a"
            })
            .to_string(),
        ))
        .expect("claim one request");
    let claim_two = Request::builder()
        .method("POST")
        .uri("/context/runs/ctx-run-task-race/tasks/claim")
        .header("content-type", "application/json")
        .body(Body::from(
            json!({
                "agent_id": "agent-b",
                "command_id": "claim-b"
            })
            .to_string(),
        ))
        .expect("claim two request");

    // Poll both claims concurrently rather than awaiting them one after the other.
    let (resp_one, resp_two) = tokio::join!(
        app.clone().oneshot(claim_one),
        app.clone().oneshot(claim_two)
    );
    let resp_one = resp_one.expect("claim one response");
    let resp_two = resp_two.expect("claim two response");
    assert_eq!(resp_one.status(), StatusCode::OK);
    assert_eq!(resp_two.status(), StatusCode::OK);

    let body_one = to_bytes(resp_one.into_body(), usize::MAX)
        .await
        .expect("claim one body");
    let body_two = to_bytes(resp_two.into_body(), usize::MAX)
        .await
        .expect("claim two body");
    let payload_one: Value = serde_json::from_slice(&body_one).expect("claim one json");
    let payload_two: Value = serde_json::from_slice(&body_two).expect("claim two json");

    // A claim "wins" when its response carries a present, non-null `task`.
    // Count over references; cloning the payloads is unnecessary.
    let winner_count = [&payload_one, &payload_two]
        .iter()
        .filter(|payload| matches!(payload.get("task"), Some(task) if !task.is_null()))
        .count();
    assert_eq!(
        winner_count, 1,
        "exactly one claim must receive the task; responses were {} and {}",
        payload_one, payload_two
    );
}
/// Verifies that replaying a task transition with the same `command_id` is
/// reported as a duplicate.
///
/// The same `fail` transition (command id `cmd-fail-1`) is posted twice against
/// an `in_progress` task. Both calls must answer `200 OK`, and the second
/// response must carry `"deduped": true`.
#[tokio::test]
async fn context_task_transition_command_id_is_idempotent() {
    let state = test_state().await;
    let app = app_router(state.clone());

    // Run that owns the task.
    let run_json = json!({
        "run_id": "ctx-run-task-idempotent",
        "objective": "idempotent transition"
    });
    let run_req = Request::builder()
        .method("POST")
        .uri("/context/runs")
        .header("content-type", "application/json")
        .body(Body::from(run_json.to_string()))
        .expect("create run request");
    let run_resp = app
        .clone()
        .oneshot(run_req)
        .await
        .expect("create run response");
    assert_eq!(run_resp.status(), StatusCode::OK);

    // A single task that is already in progress, so it can be failed.
    let tasks_json = json!({
        "tasks": [
            {
                "id": "task-1",
                "task_type": "unit_work",
                "status": "in_progress",
                "payload": {"title": "Task"}
            }
        ]
    });
    let tasks_req = Request::builder()
        .method("POST")
        .uri("/context/runs/ctx-run-task-idempotent/tasks")
        .header("content-type", "application/json")
        .body(Body::from(tasks_json.to_string()))
        .expect("create tasks request");
    let tasks_resp = app
        .clone()
        .oneshot(tasks_req)
        .await
        .expect("create tasks response");
    assert_eq!(tasks_resp.status(), StatusCode::OK);

    // Both transition calls share the exact same target and serialized body.
    let transition_uri = "/context/runs/ctx-run-task-idempotent/tasks/task-1/transition";
    let fail_command = json!({
        "action": "fail",
        "command_id": "cmd-fail-1",
        "error": "boom"
    })
    .to_string();

    // First delivery of the command.
    let first_req = Request::builder()
        .method("POST")
        .uri(transition_uri)
        .header("content-type", "application/json")
        .body(Body::from(fail_command.clone()))
        .expect("transition one request");
    let first_resp = app
        .clone()
        .oneshot(first_req)
        .await
        .expect("transition one response");
    assert_eq!(first_resp.status(), StatusCode::OK);

    // Second delivery: identical command id, must be flagged as deduplicated.
    let second_req = Request::builder()
        .method("POST")
        .uri(transition_uri)
        .header("content-type", "application/json")
        .body(Body::from(fail_command))
        .expect("transition two request");
    let second_resp = app
        .clone()
        .oneshot(second_req)
        .await
        .expect("transition two response");
    assert_eq!(second_resp.status(), StatusCode::OK);

    let second_bytes = to_bytes(second_resp.into_body(), usize::MAX)
        .await
        .expect("transition two body");
    let second_payload: Value =
        serde_json::from_slice(&second_bytes).expect("transition two json");
    let deduped = second_payload.get("deduped").and_then(Value::as_bool);
    assert_eq!(deduped, Some(true));
}
#[tokio::test]
async fn context_task_claim_requeues_stale_lease_before_claiming() {
let state = test_state().await;
let app = app_router(state.clone());
let create_run_req = Request::builder()
.method("POST")
.uri("/context/runs")
.header("content-type", "application/json")
.body(Body::from(
json!({
"run_id": "ctx-run-task-stale-claim",
"objective": "stale lease claim"
})
.to_string(),
))
.expect("create run request");
let create_run_resp = app
.clone()
.oneshot(create_run_req)
.await
.expect("create run response");
assert_eq!(create_run_resp.status(), StatusCode::OK);
let create_tasks_req = Request::builder()
.method("POST")
.uri("/context/runs/ctx-run-task-stale-claim/tasks")
.header("content-type", "application/json")
.body(Body::from(
json!({
"tasks": [
{
"id": "task-1",
"task_type": "unit_work",
"status": "in_progress",
"payload": { "title": "Stale task" }
}
]
})
.to_string(),
))
.expect("create tasks request");
let create_tasks_resp = app
.clone()
.oneshot(create_tasks_req)
.await
.expect("create tasks response");
assert_eq!(create_tasks_resp.status(), StatusCode::OK);
let mut run =
crate::http::context_runs::load_context_run_state(&state, "ctx-run-task-stale-claim")
.await
.expect("load run");
let task = run
.tasks
.iter_mut()
.find(|task| task.id == "task-1")
.expect("task present");
task.lease_owner = Some("agent-stale".to_string());
task.assigned_agent = Some("agent-stale".to_string());
task.lease_token = Some("lease-stale".to_string());
task.lease_expires_at_ms = Some(crate::now_ms().saturating_sub(1_000));
crate::http::context_runs::save_context_run_state(&state, &run)
.await
.expect("save stale run");
let claim_req = Request::builder()
.method("POST")
.uri("/context/runs/ctx-run-task-stale-claim/tasks/claim")
.header("content-type", "application/json")
.body(Body::from(
json!({
"agent_id": "agent-fresh",
"command_id": "claim-stale-lease"
})
.to_string(),
))
.expect("claim request");
let claim_resp = app
.clone()
.oneshot(claim_req)
.await
.expect("claim response");
assert_eq!(claim_resp.status(), StatusCode::OK);
let claim_body = to_bytes(claim_resp.into_body(), usize::MAX)
.await
.expect("claim body");
let claim_payload: Value = serde_json::from_slice(&claim_body).expect("claim json");
assert_eq!(
claim_payload
.get("task")
.and_then(|task| task.get("id"))
.and_then(Value::as_str),
Some("task-1")
);
assert_eq!(
claim_payload
.get("task")
.and_then(|task| task.get("assigned_agent"))
.and_then(Value::as_str),
Some("agent-fresh")
);
let blackboard_req = Request::builder()
.method("GET")
.uri("/context/runs/ctx-run-task-stale-claim/blackboard")
.body(Body::empty())
.expect("blackboard request");
let blackboard_resp = app
.clone()
.oneshot(blackboard_req)
.await
.expect("blackboard response");
assert_eq!(blackboard_resp.status(), StatusCode::OK);
let blackboard_body = to_bytes(blackboard_resp.into_body(), usize::MAX)
.await
.expect("blackboard body");
let blackboard_payload: Value =
serde_json::from_slice(&blackboard_body).expect("blackboard json");
let tasks = blackboard_payload
.get("blackboard")
.and_then(|value| value.get("tasks"))
.and_then(Value::as_array)
.cloned()
.unwrap_or_default();
let task = tasks
.iter()
.find(|task| task.get("id").and_then(Value::as_str) == Some("task-1"))
.expect("task in blackboard");
assert_eq!(
task.get("status").and_then(Value::as_str),
Some("in_progress")
);
assert_eq!(
task.get("assigned_agent").and_then(Value::as_str),
Some("agent-fresh")
);
assert_eq!(
task.get("last_error").and_then(Value::as_str),
Some("task lease expired while assigned to agent-stale")
);
}
/// Verifies that a task transition carrying a wrong `expected_task_rev` is rejected.
///
/// A freshly created `runnable` task is transitioned with `expected_task_rev: 999`.
/// The endpoint still answers `200 OK`, but the body must contain `"ok": false`
/// and `"code": "TASK_REV_MISMATCH"`.
#[tokio::test]
async fn context_task_transition_rejects_task_revision_mismatch() {
    let state = test_state().await;
    let app = app_router(state.clone());

    // Run that owns the task.
    let run_json = json!({
        "run_id": "ctx-run-task-rev",
        "objective": "rev mismatch"
    });
    let run_req = Request::builder()
        .method("POST")
        .uri("/context/runs")
        .header("content-type", "application/json")
        .body(Body::from(run_json.to_string()))
        .expect("create run request");
    let run_resp = app
        .clone()
        .oneshot(run_req)
        .await
        .expect("create run response");
    assert_eq!(run_resp.status(), StatusCode::OK);

    // One runnable task with no payload.
    let tasks_json = json!({
        "tasks": [
            {
                "id": "task-1",
                "task_type": "unit_work",
                "status": "runnable"
            }
        ]
    });
    let tasks_req = Request::builder()
        .method("POST")
        .uri("/context/runs/ctx-run-task-rev/tasks")
        .header("content-type", "application/json")
        .body(Body::from(tasks_json.to_string()))
        .expect("create tasks request");
    let tasks_resp = app
        .clone()
        .oneshot(tasks_req)
        .await
        .expect("create tasks response");
    assert_eq!(tasks_resp.status(), StatusCode::OK);

    // Transition with a revision the test expects the server to reject.
    let stale_rev_json = json!({
        "action": "status",
        "status": "in_progress",
        "expected_task_rev": 999
    });
    let rev_req = Request::builder()
        .method("POST")
        .uri("/context/runs/ctx-run-task-rev/tasks/task-1/transition")
        .header("content-type", "application/json")
        .body(Body::from(stale_rev_json.to_string()))
        .expect("transition request");
    let rev_resp = app
        .clone()
        .oneshot(rev_req)
        .await
        .expect("transition response");
    // The rejection is reported in the body, not through the HTTP status.
    assert_eq!(rev_resp.status(), StatusCode::OK);

    let rev_bytes = to_bytes(rev_resp.into_body(), usize::MAX)
        .await
        .expect("transition body");
    let rev_payload: Value = serde_json::from_slice(&rev_bytes).expect("transition json");
    let ok_flag = rev_payload.get("ok").and_then(Value::as_bool);
    assert_eq!(ok_flag, Some(false));
    let error_code = rev_payload.get("code").and_then(Value::as_str);
    assert_eq!(error_code, Some("TASK_REV_MISMATCH"));
}