use std::time::Duration;
use akribes_sdk::models::{BenchEvent, BenchRunEvent, EngineEvent, HubEvent};
use akribes_sdk::{AkribesClient, AkribesError, WorkflowEvent};
use mockito::{Matcher, Server};
use tokio::time::timeout;
fn make_client(server: &Server) -> AkribesClient {
AkribesClient::builder(server.url())
.project_id(1)
.name("sse-test")
.id("sse-id")
.build()
}
fn batch_frame(events: &serde_json::Value, seq: Option<i64>) -> String {
let data = serde_json::to_string(events).unwrap();
match seq {
Some(s) => format!("event: batch\ndata: {data}\nid: {s}\n\n"),
None => format!("event: batch\ndata: {data}\n\n"),
}
}
fn exec_event(script: &str, exec_id: &str, engine: serde_json::Value) -> serde_json::Value {
serde_json::json!({
"type": "Execution",
"payload": {
"project_id": 1,
"script_name": script,
"execution_id": exec_id,
"event": engine,
}
})
}
#[tokio::test]
async fn event_stream_delivers_batched_hub_events_in_order() {
let mut server = Server::new_async().await;
let body = batch_frame(
&serde_json::json!([
exec_event(
"summarise",
"exec-1",
serde_json::json!({"type":"WorkflowStart","payload":2})
),
exec_event(
"summarise",
"exec-1",
serde_json::json!({"type":"TaskStart","payload":["summarise",null]})
),
exec_event(
"summarise",
"exec-1",
serde_json::json!({"type":"WorkflowEnd","payload":"done"})
),
]),
Some(7),
);
let _m = server
.mock("GET", "/events")
.match_query(Matcher::UrlEncoded("project_id".into(), "1".into()))
.with_status(200)
.with_header("content-type", "text/event-stream")
.with_body(body)
.create_async()
.await;
let client = make_client(&server);
let (mut rx, _sub) = client.project(1).events().event_stream(None).await.unwrap();
let e1 = timeout(Duration::from_secs(5), rx.recv())
.await
.unwrap()
.unwrap();
let e2 = timeout(Duration::from_secs(5), rx.recv())
.await
.unwrap()
.unwrap();
let e3 = timeout(Duration::from_secs(5), rx.recv())
.await
.unwrap()
.unwrap();
assert!(matches!(
e1,
HubEvent::Execution { ref event, .. } if matches!(event, EngineEvent::WorkflowStart(2))
));
assert!(matches!(e2, HubEvent::Execution { .. }));
if let HubEvent::Execution { execution_id, .. } = &e3 {
assert_eq!(execution_id.as_deref(), Some("exec-1"));
} else {
panic!("expected Execution");
}
}
#[tokio::test]
async fn event_stream_handles_multiple_frames() {
let mut server = Server::new_async().await;
let mut body = batch_frame(
&serde_json::json!([exec_event(
"s",
"e1",
serde_json::json!({"type":"WorkflowStart","payload":1})
)]),
Some(1),
);
body.push_str(&batch_frame(
&serde_json::json!([exec_event(
"s",
"e1",
serde_json::json!({"type":"WorkflowEnd","payload":"ok"})
)]),
Some(2),
));
let _m = server
.mock("GET", "/events")
.match_query(Matcher::Any)
.with_status(200)
.with_header("content-type", "text/event-stream")
.with_body(body)
.create_async()
.await;
let client = make_client(&server);
let (mut rx, _sub) = client.project(1).events().event_stream(None).await.unwrap();
let first = timeout(Duration::from_secs(5), rx.recv())
.await
.unwrap()
.unwrap();
let second = timeout(Duration::from_secs(5), rx.recv())
.await
.unwrap()
.unwrap();
assert!(matches!(
first,
HubEvent::Execution { ref event, .. } if matches!(event, EngineEvent::WorkflowStart(1))
));
assert!(matches!(second, HubEvent::Execution { .. }));
}
#[tokio::test]
async fn event_stream_tolerates_crlf_delimited_frames() {
let mut server = Server::new_async().await;
let data = serde_json::to_string(&serde_json::json!([exec_event(
"s",
"e1",
serde_json::json!({"type":"WorkflowStart","payload":3})
)]))
.unwrap();
let body = format!("event: batch\r\ndata: {data}\r\nid: 9\r\n\r\n");
let _m = server
.mock("GET", "/events")
.match_query(Matcher::Any)
.with_status(200)
.with_header("content-type", "text/event-stream")
.with_body(body)
.create_async()
.await;
let client = make_client(&server);
let (mut rx, _sub) = client.project(1).events().event_stream(None).await.unwrap();
let evt = timeout(Duration::from_secs(5), rx.recv())
.await
.unwrap()
.unwrap();
assert!(matches!(
evt,
HubEvent::Execution { ref event, .. } if matches!(event, EngineEvent::WorkflowStart(3))
));
}
#[tokio::test]
async fn event_stream_skips_malformed_json_then_continues() {
let mut server = Server::new_async().await;
let mut body = String::from("event: batch\ndata: {not valid json}\n\n");
body.push_str(&batch_frame(
&serde_json::json!([exec_event(
"s",
"e1",
serde_json::json!({"type":"WorkflowEnd","payload":"recovered"})
)]),
Some(2),
));
let _m = server
.mock("GET", "/events")
.match_query(Matcher::Any)
.with_status(200)
.with_header("content-type", "text/event-stream")
.with_body(body)
.create_async()
.await;
let client = make_client(&server);
let (mut rx, _sub) = client.project(1).events().event_stream(None).await.unwrap();
let evt = timeout(Duration::from_secs(5), rx.recv())
.await
.unwrap()
.unwrap();
assert!(matches!(evt, HubEvent::Execution { .. }));
}
#[tokio::test]
async fn event_stream_ignores_non_batch_event_types() {
let mut server = Server::new_async().await;
let mut body = String::from("event: error\ndata: {\"lagged\":5}\n\n");
body.push_str(&batch_frame(
&serde_json::json!([exec_event(
"s",
"e1",
serde_json::json!({"type":"WorkflowStart","payload":1})
)]),
None,
));
let _m = server
.mock("GET", "/events")
.match_query(Matcher::Any)
.with_status(200)
.with_header("content-type", "text/event-stream")
.with_body(body)
.create_async()
.await;
let client = make_client(&server);
let (mut rx, _sub) = client.project(1).events().event_stream(None).await.unwrap();
let evt = timeout(Duration::from_secs(5), rx.recv())
.await
.unwrap()
.unwrap();
assert!(matches!(evt, HubEvent::Execution { .. }));
}
#[tokio::test]
async fn event_stream_adds_script_name_query_when_filtered() {
let mut server = Server::new_async().await;
let _m = server
.mock("GET", "/events")
.match_query(Matcher::AllOf(vec![
Matcher::UrlEncoded("project_id".into(), "1".into()),
Matcher::UrlEncoded("script_name".into(), "foo".into()),
]))
.with_status(200)
.with_header("content-type", "text/event-stream")
.with_body(batch_frame(
&serde_json::json!([exec_event(
"foo",
"e1",
serde_json::json!({"type":"WorkflowStart","payload":1})
)]),
None,
))
.create_async()
.await;
let client = make_client(&server);
let (mut rx, _sub) = client
.project(1)
.events()
.event_stream(Some("foo"))
.await
.unwrap();
let _ = timeout(Duration::from_secs(5), rx.recv()).await.unwrap();
}
#[tokio::test]
async fn execution_stream_filters_to_engine_events() {
let mut server = Server::new_async().await;
let mut body = batch_frame(
&serde_json::json!([{
"type":"Registry",
"payload":{"type":"ScriptUpdated","payload":{
"project_id":1,"script_name":"summarise","version_id":5,"channel":"production"
}}
}]),
None,
);
body.push_str(&batch_frame(
&serde_json::json!([exec_event(
"summarise",
"e1",
serde_json::json!({"type":"WorkflowStart","payload":4})
)]),
None,
));
let _m = server
.mock("GET", "/events")
.match_query(Matcher::Any)
.with_status(200)
.with_header("content-type", "text/event-stream")
.with_body(body)
.create_async()
.await;
let client = make_client(&server);
let (mut rx, _sub) = client
.project(1)
.events()
.execution_stream("summarise")
.await
.unwrap();
let evt = timeout(Duration::from_secs(5), rx.recv())
.await
.unwrap()
.unwrap();
assert!(matches!(evt, EngineEvent::WorkflowStart(4)));
}
#[tokio::test]
async fn run_stream_drains_to_terminal_output() {
let mut server = Server::new_async().await;
let sse_body = batch_frame(
&serde_json::json!([
exec_event(
"summarise",
"exec-99",
serde_json::json!({"type":"WorkflowStart","payload":1})
),
exec_event(
"summarise",
"exec-99",
serde_json::json!({
"type":"AgentOutput",
"payload":{"task_name":"summarise","agent_name":null,"task_id":"t1",
"schema_type":null,"chunk":"hi"}
})
),
exec_event(
"summarise",
"exec-99",
serde_json::json!({"type":"WorkflowEnd","payload":{"answer":42}})
),
]),
Some(3),
);
let _sse = server
.mock("GET", "/events")
.match_query(Matcher::Any)
.with_status(200)
.with_header("content-type", "text/event-stream")
.with_body(sse_body)
.create_async()
.await;
let _run = server
.mock("POST", "/projects/1/scripts/summarise/run")
.match_query(Matcher::UrlEncoded("channel".into(), "production".into()))
.with_status(200)
.with_header("content-type", "application/json")
.with_body(r#"{"execution_id":"exec-99"}"#)
.create_async()
.await;
let client = make_client(&server);
let executions = client.project(1).executions();
let stream = executions
.run_stream(executions.run("summarise"))
.await
.unwrap();
assert_eq!(stream.execution_id, "exec-99");
let out = timeout(Duration::from_secs(10), stream.output())
.await
.expect("run_stream should resolve before timeout")
.expect("output ok");
assert_eq!(out, serde_json::json!({"answer": 42}));
}
#[tokio::test]
async fn run_stream_filters_out_other_executions_of_same_script() {
let mut server = Server::new_async().await;
let sse_body = batch_frame(
&serde_json::json!([
exec_event(
"summarise",
"exec-OTHER",
serde_json::json!({"type":"WorkflowEnd","payload":"WRONG"})
),
exec_event(
"summarise",
"exec-mine",
serde_json::json!({"type":"WorkflowStart","payload":1})
),
exec_event(
"summarise",
"exec-mine",
serde_json::json!({"type":"WorkflowEnd","payload":"RIGHT"})
),
]),
Some(3),
);
let _sse = server
.mock("GET", "/events")
.match_query(Matcher::Any)
.with_status(200)
.with_header("content-type", "text/event-stream")
.with_body(sse_body)
.create_async()
.await;
let _run = server
.mock("POST", "/projects/1/scripts/summarise/run")
.match_query(Matcher::UrlEncoded("channel".into(), "production".into()))
.with_status(200)
.with_header("content-type", "application/json")
.with_body(r#"{"execution_id":"exec-mine"}"#)
.create_async()
.await;
let client = make_client(&server);
let executions = client.project(1).executions();
let stream = executions
.run_stream(executions.run("summarise"))
.await
.unwrap();
let out = timeout(Duration::from_secs(10), stream.output())
.await
.expect("resolve before timeout")
.expect("ok");
assert_eq!(out, serde_json::json!("RIGHT"));
}
#[tokio::test]
async fn run_stream_classifies_terminal_error_event() {
let mut server = Server::new_async().await;
let error_engine = serde_json::to_value(akribes_types::event::EngineEvent::error_kind(
akribes_types::error::ErrorKind::RateLimit,
"429 from provider",
))
.unwrap();
let sse_body = batch_frame(
&serde_json::json!([
exec_event(
"summarise",
"exec-err",
serde_json::json!({"type":"WorkflowStart","payload":1})
),
exec_event("summarise", "exec-err", error_engine),
]),
Some(2),
);
let _sse = server
.mock("GET", "/events")
.match_query(Matcher::Any)
.with_status(200)
.with_header("content-type", "text/event-stream")
.with_body(sse_body)
.create_async()
.await;
let _run = server
.mock("POST", "/projects/1/scripts/summarise/run")
.match_query(Matcher::UrlEncoded("channel".into(), "production".into()))
.with_status(200)
.with_header("content-type", "application/json")
.with_body(r#"{"execution_id":"exec-err"}"#)
.create_async()
.await;
let client = make_client(&server);
let executions = client.project(1).executions();
let stream = executions
.run_stream(executions.run("summarise"))
.await
.unwrap();
let err = timeout(Duration::from_secs(10), stream.output())
.await
.expect("resolve before timeout")
.expect_err("should error");
match err {
akribes_sdk::AkribesError::Transient {
execution_id,
status,
..
} => {
assert_eq!(execution_id.as_deref(), Some("exec-err"));
assert_eq!(status, Some(429));
}
other => panic!("expected Transient, got {other:?}"),
}
}
#[tokio::test]
async fn run_stream_yields_typed_events_via_next() {
let mut server = Server::new_async().await;
let sse_body = batch_frame(
&serde_json::json!([
exec_event(
"summarise",
"x",
serde_json::json!({"type":"WorkflowStart","payload":1})
),
exec_event(
"summarise",
"x",
serde_json::json!({
"type":"AgentOutput",
"payload":{"task_name":"summarise","agent_name":null,"task_id":"t1",
"schema_type":null,"chunk":"chunk-A"}
})
),
exec_event(
"summarise",
"x",
serde_json::json!({"type":"WorkflowEnd","payload":null})
),
]),
Some(3),
);
let _sse = server
.mock("GET", "/events")
.match_query(Matcher::Any)
.with_status(200)
.with_header("content-type", "text/event-stream")
.with_body(sse_body)
.create_async()
.await;
let _run = server
.mock("POST", "/projects/1/scripts/summarise/run")
.match_query(Matcher::UrlEncoded("channel".into(), "production".into()))
.with_status(200)
.with_header("content-type", "application/json")
.with_body(r#"{"execution_id":"x"}"#)
.create_async()
.await;
let client = make_client(&server);
let executions = client.project(1).executions();
let mut stream = executions
.run_stream(executions.run("summarise"))
.await
.unwrap();
let mut chunks = Vec::new();
let mut saw_end = false;
while let Some(item) = timeout(Duration::from_secs(10), stream.next())
.await
.expect("event before timeout")
{
match item.unwrap() {
WorkflowEvent::AgentChunk { chunk, .. } => chunks.push(chunk),
WorkflowEvent::End { .. } => saw_end = true,
_ => {}
}
}
assert_eq!(chunks, vec!["chunk-A".to_string()]);
assert!(saw_end, "stream must yield the terminal End event");
}
#[tokio::test]
async fn run_stream_surfaces_post_run_failure() {
let mut server = Server::new_async().await;
let _sse = server
.mock("GET", "/events")
.match_query(Matcher::Any)
.with_status(200)
.with_header("content-type", "text/event-stream")
.with_body("\n\n")
.create_async()
.await;
let _run = server
.mock("POST", "/projects/1/scripts/summarise/run")
.match_query(Matcher::UrlEncoded("channel".into(), "production".into()))
.with_status(422)
.with_body("missing required input")
.create_async()
.await;
let client = make_client(&server);
let executions = client.project(1).executions();
let err = timeout(
Duration::from_secs(10),
executions.run_stream(executions.run("summarise")),
)
.await
.expect("run_stream should return before timeout")
.expect_err("POST /run failed → run_stream must error");
match err {
akribes_sdk::AkribesError::HttpStatus { status, .. } => assert_eq!(status, 422),
other => panic!("expected HttpStatus 422, got {other:?}"),
}
}
fn bench_result_frame(result: &serde_json::Value) -> String {
let data = serde_json::to_string(result).unwrap();
format!("event: result\ndata: {data}\n\n")
}
#[tokio::test]
async fn bench_subscribe_yields_results_then_terminal() {
let mut server = Server::new_async().await;
let mut body = bench_result_frame(&serde_json::json!({
"id": 1, "bench_run_id": 42, "case_id": "case_a",
"workflow_execution_id": "exec_a", "judge_execution_id": "judge_a",
"score": {"quality": 0.9}, "headline_score": 0.9, "status": "ok",
"cost_usd": 0.01, "duration_ms": 1200, "cache_hit": false,
"created_at": "2026-01-01T00:00:00Z"
}));
body.push_str(&bench_result_frame(&serde_json::json!({
"id": 2, "bench_run_id": 42, "case_id": "case_b",
"workflow_execution_id": null, "judge_execution_id": null,
"score": null, "headline_score": null, "status": "workflow_failed",
"cost_usd": 0.0, "duration_ms": null, "cache_hit": false,
"error": "boom", "created_at": "2026-01-01T00:00:01Z"
})));
body.push_str("event: terminal\ndata: {\"status\":\"completed\"}\n\n");
let _m = server
.mock("GET", "/bench-runs/42/events")
.with_status(200)
.with_header("content-type", "text/event-stream")
.with_body(body)
.create_async()
.await;
let client = make_client(&server);
let (mut rx, _sub) = client
.bench_runs()
.subscribe_run_events(42)
.await
.expect("subscribe ok");
let e1 = timeout(Duration::from_secs(5), rx.recv())
.await
.unwrap()
.unwrap();
match e1 {
BenchRunEvent::Result(r) => {
assert_eq!(r.case_id, "case_a");
assert_eq!(r.status, "ok");
assert_eq!(r.headline_score, Some(0.9));
assert_eq!(r.score.as_ref().unwrap()["quality"], 0.9);
assert!(!r.cache_hit);
}
other => panic!("expected Result, got {other:?}"),
}
let e2 = timeout(Duration::from_secs(5), rx.recv())
.await
.unwrap()
.unwrap();
match e2 {
BenchRunEvent::Result(r) => {
assert_eq!(r.case_id, "case_b");
assert_eq!(r.status, "workflow_failed");
assert_eq!(r.error.as_deref(), Some("boom"));
assert!(r.headline_score.is_none());
}
other => panic!("expected Result, got {other:?}"),
}
let e3 = timeout(Duration::from_secs(5), rx.recv())
.await
.unwrap()
.unwrap();
match e3 {
BenchRunEvent::Terminal { status } => assert_eq!(status, "completed"),
other => panic!("expected Terminal, got {other:?}"),
}
let after = timeout(Duration::from_secs(5), rx.recv()).await.unwrap();
assert!(after.is_none(), "stream must close after terminal");
}
#[tokio::test]
async fn bench_subscribe_surfaces_lagged_frame() {
let mut server = Server::new_async().await;
let mut body = String::from("event: lagged\ndata: {\"dropped\":7}\n\n");
body.push_str(&bench_result_frame(&serde_json::json!({
"id": 3, "bench_run_id": 42, "case_id": "case_c",
"status": "cached", "cost_usd": 0.0, "cache_hit": true,
"created_at": "2026-01-01T00:00:02Z"
})));
body.push_str("event: terminal\ndata: {\"status\":\"completed\"}\n\n");
let _m = server
.mock("GET", "/bench-runs/42/events")
.with_status(200)
.with_header("content-type", "text/event-stream")
.with_body(body)
.create_async()
.await;
let client = make_client(&server);
let (mut rx, _sub) = client
.bench_runs()
.subscribe_run_events(42)
.await
.expect("subscribe ok");
let e1 = timeout(Duration::from_secs(5), rx.recv())
.await
.unwrap()
.unwrap();
match e1 {
BenchRunEvent::Lagged { dropped } => assert_eq!(dropped, 7),
other => panic!("expected Lagged, got {other:?}"),
}
let e2 = timeout(Duration::from_secs(5), rx.recv())
.await
.unwrap()
.unwrap();
match e2 {
BenchRunEvent::Result(r) => {
assert_eq!(r.case_id, "case_c");
assert!(r.cache_hit);
assert_eq!(r.status, "cached");
}
other => panic!("expected Result, got {other:?}"),
}
}
#[tokio::test]
async fn bench_subscribe_tolerates_crlf_and_ignores_unknown_events() {
let mut server = Server::new_async().await;
let data = serde_json::to_string(&serde_json::json!({
"id": 9, "bench_run_id": 42, "case_id": "case_z",
"status": "ok", "headline_score": 0.5, "cost_usd": 0.0,
"cache_hit": false, "created_at": "2026-01-01T00:00:03Z"
}))
.unwrap();
let mut body = String::from("event: keepalive\r\ndata: {}\r\n\r\n");
body.push_str(&format!("event: result\r\ndata: {data}\r\n\r\n"));
body.push_str("event: terminal\r\ndata: {\"status\":\"failed\"}\r\n\r\n");
let _m = server
.mock("GET", "/bench-runs/42/events")
.with_status(200)
.with_header("content-type", "text/event-stream")
.with_body(body)
.create_async()
.await;
let client = make_client(&server);
let (mut rx, _sub) = client
.bench_runs()
.subscribe_run_events(42)
.await
.expect("subscribe ok");
let e1 = timeout(Duration::from_secs(5), rx.recv())
.await
.unwrap()
.unwrap();
match e1 {
BenchRunEvent::Result(r) => assert_eq!(r.case_id, "case_z"),
other => panic!("expected Result (keepalive must be skipped), got {other:?}"),
}
let e2 = timeout(Duration::from_secs(5), rx.recv())
.await
.unwrap()
.unwrap();
match e2 {
BenchRunEvent::Terminal { status } => assert_eq!(status, "failed"),
other => panic!("expected Terminal, got {other:?}"),
}
}
#[tokio::test]
async fn bench_subscribe_surfaces_non_2xx_at_subscribe_time() {
let mut server = Server::new_async().await;
let _m = server
.mock("GET", "/bench-runs/42/events")
.with_status(403)
.with_body("forbidden")
.create_async()
.await;
let client = make_client(&server);
let res = timeout(
Duration::from_secs(5),
client.bench_runs().subscribe_run_events(42),
)
.await
.expect("subscribe returns before timeout");
match res {
Ok(_) => panic!("403 must surface as an error, not a live stream"),
Err(AkribesError::HttpStatus { status, .. }) => assert_eq!(status, 403),
Err(other) => panic!("expected HttpStatus 403, got {other:?}"),
}
}
fn bench_run_json(id: i64, status: &str) -> serde_json::Value {
serde_json::json!({
"id": id, "bench_id": 7, "channel": "production",
"workflow_version_id": 11, "judge_version_id": 12,
"status": status, "triggered_at": "2026-01-01T00:00:00Z"
})
}
fn bench_result_json(id: i64, case_id: &str, status: &str) -> serde_json::Value {
serde_json::json!({
"id": id, "bench_run_id": 42, "case_id": case_id,
"status": status, "created_at": "2026-01-01T00:00:01Z"
})
}
#[tokio::test]
async fn event_stream_yields_typed_bench_run_started() {
let mut server = Server::new_async().await;
let body = batch_frame(
&serde_json::json!([{
"type": "Bench",
"payload": {
"type": "RunStarted",
"payload": {
"project_id": 1, "script_name": "summarise",
"run": bench_run_json(99, "running")
}
}
}]),
Some(1),
);
let _m = server
.mock("GET", "/events")
.match_query(Matcher::Any)
.with_status(200)
.with_header("content-type", "text/event-stream")
.with_body(body)
.create_async()
.await;
let client = make_client(&server);
let (mut rx, _sub) = client.project(1).events().event_stream(None).await.unwrap();
let evt = timeout(Duration::from_secs(5), rx.recv())
.await
.unwrap()
.unwrap();
match evt {
HubEvent::Bench(BenchEvent::RunStarted {
project_id,
script_name,
run,
}) => {
assert_eq!(project_id, 1);
assert_eq!(script_name, "summarise");
assert_eq!(run.id, 99);
assert_eq!(run.status, "running");
}
other => panic!("expected Bench(RunStarted), got {other:?}"),
}
}
#[tokio::test]
async fn event_stream_yields_typed_bench_result_recorded() {
let mut server = Server::new_async().await;
let body = batch_frame(
&serde_json::json!([{
"type": "Bench",
"payload": {
"type": "ResultRecorded",
"payload": {
"project_id": 1, "script_name": "summarise", "run_id": 42,
"result": bench_result_json(5, "case_a", "ok")
}
}
}]),
Some(1),
);
let _m = server
.mock("GET", "/events")
.match_query(Matcher::Any)
.with_status(200)
.with_header("content-type", "text/event-stream")
.with_body(body)
.create_async()
.await;
let client = make_client(&server);
let (mut rx, _sub) = client.project(1).events().event_stream(None).await.unwrap();
let evt = timeout(Duration::from_secs(5), rx.recv())
.await
.unwrap()
.unwrap();
match evt {
HubEvent::Bench(BenchEvent::ResultRecorded {
project_id,
script_name,
run_id,
result,
}) => {
assert_eq!(project_id, 1);
assert_eq!(script_name, "summarise");
assert_eq!(run_id, 42);
assert_eq!(result.case_id, "case_a");
assert_eq!(result.status, "ok");
}
other => panic!("expected Bench(ResultRecorded), got {other:?}"),
}
}
#[tokio::test]
async fn event_stream_yields_typed_bench_run_finished() {
let mut server = Server::new_async().await;
let body = batch_frame(
&serde_json::json!([{
"type": "Bench",
"payload": {
"type": "RunFinished",
"payload": {
"project_id": 1, "script_name": "summarise",
"run": bench_run_json(99, "completed")
}
}
}]),
Some(1),
);
let _m = server
.mock("GET", "/events")
.match_query(Matcher::Any)
.with_status(200)
.with_header("content-type", "text/event-stream")
.with_body(body)
.create_async()
.await;
let client = make_client(&server);
let (mut rx, _sub) = client.project(1).events().event_stream(None).await.unwrap();
let evt = timeout(Duration::from_secs(5), rx.recv())
.await
.unwrap()
.unwrap();
match evt {
HubEvent::Bench(BenchEvent::RunFinished { run, .. }) => {
assert_eq!(run.id, 99);
assert_eq!(run.status, "completed");
}
other => panic!("expected Bench(RunFinished), got {other:?}"),
}
}
#[tokio::test]
async fn event_stream_keeps_known_events_past_unknown_type_in_batch() {
let mut server = Server::new_async().await;
let body = batch_frame(
&serde_json::json!([
{ "type": "Wibble", "payload": { "anything": 123 } },
exec_event(
"summarise",
"exec-1",
serde_json::json!({"type":"WorkflowEnd","payload":"survived"})
),
]),
Some(1),
);
let _m = server
.mock("GET", "/events")
.match_query(Matcher::Any)
.with_status(200)
.with_header("content-type", "text/event-stream")
.with_body(body)
.create_async()
.await;
let client = make_client(&server);
let (mut rx, _sub) = client.project(1).events().event_stream(None).await.unwrap();
let evt = timeout(Duration::from_secs(5), rx.recv())
.await
.unwrap()
.unwrap();
match evt {
HubEvent::Execution {
event,
execution_id,
..
} => {
assert_eq!(execution_id.as_deref(), Some("exec-1"));
assert!(matches!(event, EngineEvent::WorkflowEnd(_)));
}
other => panic!("expected the Execution event to survive the unknown one, got {other:?}"),
}
}