use std::time::Duration;
use akribes_sdk::{AkribesError, EventCategory, WorkflowEvent};
use akribes_types::error::ErrorKind;
use akribes_types::event::EngineEvent;
use akribes_types::value::Value;
/// Verifies the `EventCategory` reported after converting `EngineEvent`s into
/// `WorkflowEvent`s: workflow/task start and end events are `Progress`, an
/// agent output chunk is `Output`, and an error event is `Error`.
#[test]
fn conversion_round_trip_covers_all_categories() {
    // Lifecycle fixtures that are too long to read comfortably inline.
    let workflow_end =
        EngineEvent::WorkflowEnd(akribes_types::event::WorkflowEndPayload::new(Value::Null));
    let task_end = EngineEvent::TaskEnd {
        task: "t".into(),
        on_error_label: None,
        value: Value::Null,
        value_type: None,
        duration: Duration::ZERO,
        attempt: 1,
        usage: None,
        variant: akribes_types::event::TaskEndVariant::Success,
    };
    let lifecycle = [
        EngineEvent::WorkflowStart(3),
        workflow_end,
        EngineEvent::TaskStart("t".into(), None),
        task_end,
    ];
    for engine_event in lifecycle {
        let converted: WorkflowEvent = engine_event.into();
        assert_eq!(converted.category(), EventCategory::Progress);
    }

    // A streamed agent chunk is categorised as output.
    let agent_output = EngineEvent::AgentOutput {
        task_name: "t".into(),
        agent_name: None,
        task_id: "1".into(),
        schema_type: None,
        chunk: "x".into(),
    };
    let chunk: WorkflowEvent = agent_output.into();
    assert_eq!(chunk.category(), EventCategory::Output);

    // An engine error is categorised as an error.
    let failure = EngineEvent::error_kind(ErrorKind::ScriptError, "boom");
    let err: WorkflowEvent = failure.into();
    assert_eq!(err.category(), EventCategory::Error);
}
/// Converts an `EngineEvent::McpServerDegraded` and expects a
/// `WorkflowEvent::Other` whose `type_name` and payload entries identify the
/// original variant and carry its `alias`.
#[test]
fn unknown_long_tail_variant_becomes_other_with_type_name() {
    let degraded = EngineEvent::McpServerDegraded {
        alias: "weather".into(),
        reason: "timeout".into(),
    };
    let converted: WorkflowEvent = degraded.into();

    // Pull the two fields out first so the assertions read flat.
    let (type_name, payload) = match converted {
        WorkflowEvent::Other { type_name, payload } => (type_name, payload),
        _ => panic!("expected Other"),
    };
    assert_eq!(type_name, "McpServerDegraded");
    assert_eq!(payload["type"], "McpServerDegraded");
    assert_eq!(payload["payload"]["alias"], "weather");
}
/// Builds a `RunStream` for tests, with execution id `"exec-1"`.
///
/// A task spawned via `tokio::spawn` sends each of `events`, wrapped in `Ok`,
/// over an unbounded channel whose receiving half is handed to the stream.
/// Because it spawns a task, this must be called from within a Tokio runtime.
fn make_stream(events: Vec<WorkflowEvent>) -> akribes_sdk::RunStream {
    use tokio::sync::mpsc;
    use tokio::task::JoinHandle;

    let (sender, receiver) = mpsc::unbounded_channel();
    let feeder: JoinHandle<()> = tokio::spawn(async move {
        // Stop at the first failed send; the send error itself is discarded.
        let _ = events
            .into_iter()
            .try_for_each(|evt| sender.send(Ok(evt)));
    });
    akribes_sdk::_test::make_run_stream("exec-1".into(), receiver, feeder)
}
/// `output()` on a stream that ends with `WorkflowEvent::End` resolves to the
/// `output` value carried by that event.
#[tokio::test]
async fn output_resolves_on_workflow_end() {
    let events = vec![
        WorkflowEvent::Start { total_tasks: 2 },
        WorkflowEvent::TaskStart {
            task: "t".into(),
            on_error: None,
        },
        WorkflowEvent::End {
            output: serde_json::json!({"answer": 42}),
            duration: Duration::ZERO,
            totals: Default::default(),
        },
    ];
    let stream = make_stream(events);

    let resolved = stream.output().await.expect("should resolve to output");
    assert_eq!(resolved, serde_json::json!({"answer": 42}));
}
#[tokio::test]
async fn output_errors_on_workflow_error_with_classification() {
let stream = make_stream(vec![
WorkflowEvent::Start { total_tasks: 1 },
WorkflowEvent::Error {
message: "rate limited".into(),
kind: ErrorKind::RateLimit,
code: None,
},
]);
let err = stream.output().await.expect_err("should error");
assert!(
matches!(err, AkribesError::Transient { .. }),
"rate-limit ErrorKind should classify as Transient, got {err:?}",
);
let stream = make_stream(vec![WorkflowEvent::Error {
message: "script blew up".into(),
kind: ErrorKind::ScriptError,
code: None,
}]);
let err = stream.output().await.expect_err("should error");
assert!(
matches!(err, AkribesError::Script { .. }),
"ScriptError should classify as Script, got {err:?}",
);
let stream = make_stream(vec![WorkflowEvent::Error {
message: "no auth".into(),
kind: ErrorKind::AuthError,
code: None,
}]);
let err = stream.output().await.expect_err("should error");
assert!(
matches!(err, AkribesError::Fatal { .. }),
"AuthError should classify as Fatal, got {err:?}",
);
}
/// `next()` yields the `Start` event, then the `End` event, and then `None`.
#[tokio::test]
async fn next_yields_events_in_order_and_stops_after_end() {
    let events = vec![
        WorkflowEvent::Start { total_tasks: 1 },
        WorkflowEvent::End {
            output: serde_json::Value::Null,
            duration: Duration::ZERO,
            totals: Default::default(),
        },
    ];
    let mut stream = make_stream(events);

    let opening = stream.next().await.unwrap().unwrap();
    assert!(matches!(opening, WorkflowEvent::Start { total_tasks: 1 }));

    let closing = stream.next().await.unwrap().unwrap();
    assert!(matches!(closing, WorkflowEvent::End { .. }));

    // Nothing further is yielded once the End event has been delivered.
    let after_end = stream.next().await;
    assert!(after_end.is_none());
}
/// `output()` on a stream that finishes without an `End` or `Error` event
/// fails with `AkribesError::Other`.
#[tokio::test]
async fn output_without_terminal_event_errors() {
    let events = vec![
        WorkflowEvent::Start { total_tasks: 1 },
        WorkflowEvent::TaskStart {
            task: "t".into(),
            on_error: None,
        },
    ];
    let stream = make_stream(events);

    let err = stream.output().await.expect_err("should error");
    assert!(matches!(err, AkribesError::Other(_)), "got {err:?}");
}
use std::sync::{Arc, Mutex};
/// An `on_output` callback receives both `AgentChunk` payloads, in the order
/// they were sent, while the stream is driven by `output()`.
#[tokio::test]
async fn on_output_fires_for_agent_chunks_in_order() {
    let events = vec![
        WorkflowEvent::Start { total_tasks: 1 },
        WorkflowEvent::AgentChunk {
            task: "t".into(),
            agent: None,
            task_id: "1".into(),
            chunk: "hello ".into(),
        },
        WorkflowEvent::AgentChunk {
            task: "t".into(),
            agent: None,
            task_id: "1".into(),
            chunk: "world".into(),
        },
        WorkflowEvent::End {
            output: serde_json::Value::Null,
            duration: Duration::ZERO,
            totals: Default::default(),
        },
    ];
    let mut stream = make_stream(events);

    // Shared buffer: the callback writes, the test body reads afterwards.
    let chunks = Arc::new(Mutex::new(Vec::<String>::new()));
    let sink = Arc::clone(&chunks);
    stream.on_output(move |v| {
        // Values that are not strings are ignored.
        if let Some(text) = v.as_str() {
            sink.lock().unwrap().push(text.to_owned());
        }
    });

    let _out = stream.output().await.unwrap();

    let recorded = chunks.lock().unwrap().clone();
    let expected = vec![String::from("hello "), String::from("world")];
    assert_eq!(recorded, expected);
}
/// An `on_task_end` callback is invoked for the `TaskEnd` event and sees its
/// `task`, `output` and `variant` fields.
#[tokio::test]
async fn on_task_end_fires_with_typed_payload() {
    let task_end = WorkflowEvent::TaskEnd {
        task: "summarise".into(),
        output: serde_json::json!({"answer": 42}),
        duration: Duration::from_millis(100),
        usage: None,
        variant: akribes_sdk::TaskEndVariant::Success,
    };
    let workflow_end = WorkflowEvent::End {
        output: serde_json::Value::Null,
        duration: Duration::ZERO,
        totals: Default::default(),
    };
    let mut stream = make_stream(vec![task_end, workflow_end]);

    // Task names observed by the callback, checked after the stream finishes.
    let seen = Arc::new(Mutex::new(Vec::<String>::new()));
    let recorder = Arc::clone(&seen);
    stream.on_task_end(move |p| {
        recorder.lock().unwrap().push(p.task.clone());
        assert_eq!(p.output["answer"], 42);
        assert_eq!(p.variant, akribes_sdk::TaskEndVariant::Success);
    });

    let _ = stream.output().await.unwrap();

    let observed = seen.lock().unwrap().clone();
    assert_eq!(observed, vec![String::from("summarise")]);
}
#[tokio::test]
async fn on_error_fires_before_termination() {
let mut stream = make_stream(vec![
WorkflowEvent::Start { total_tasks: 1 },
WorkflowEvent::Error {
message: "boom".into(),
kind: ErrorKind::ScriptError,
code: None,
},
]);
let err_seen = Arc::new(Mutex::new(Vec::<String>::new()));
let err_cb = Arc::clone(&err_seen);
stream.on_error(move |p| {
err_cb.lock().unwrap().push(p.message.clone());
assert_eq!(p.kind, ErrorKind::ScriptError);
});
let result = stream.output().await;
assert!(matches!(result, Err(AkribesError::Script { .. })));
assert_eq!(err_seen.lock().unwrap().clone(), vec!["boom".to_string()]);
}
/// With both an `on_output` and an `on_any` callback registered, the shared
/// log must read `output`, `any:chunk`, `any:end` for a chunk followed by End.
#[tokio::test]
async fn on_any_fires_for_every_event_after_specific_callbacks() {
    let events = vec![
        WorkflowEvent::AgentChunk {
            task: "t".into(),
            agent: None,
            task_id: "1".into(),
            chunk: "x".into(),
        },
        WorkflowEvent::End {
            output: serde_json::Value::Null,
            duration: Duration::ZERO,
            totals: Default::default(),
        },
    ];
    let mut stream = make_stream(events);

    // Both callbacks append to the same log so relative ordering is visible.
    let log = Arc::new(Mutex::new(Vec::<String>::new()));

    let output_log = Arc::clone(&log);
    stream.on_output(move |_| {
        output_log.lock().unwrap().push("output".into());
    });

    let any_log = Arc::clone(&log);
    stream.on_any(move |evt| {
        let tag = if matches!(evt, WorkflowEvent::AgentChunk { .. }) {
            "any:chunk"
        } else if matches!(evt, WorkflowEvent::End { .. }) {
            "any:end"
        } else {
            "any:other"
        };
        any_log.lock().unwrap().push(tag.into());
    });

    let _ = stream.output().await.unwrap();

    let recorded = log.lock().unwrap().clone();
    assert_eq!(
        recorded,
        vec!["output".to_string(), "any:chunk".into(), "any:end".into()]
    );
}
/// Callbacks registered with `on_output` also run when the stream is drained
/// by repeatedly calling `next()` instead of `output()`.
#[tokio::test]
async fn callbacks_fire_when_iterating_via_next() {
    let events = vec![
        WorkflowEvent::AgentChunk {
            task: "t".into(),
            agent: None,
            task_id: "1".into(),
            chunk: "ping".into(),
        },
        WorkflowEvent::End {
            output: serde_json::Value::Null,
            duration: Duration::ZERO,
            totals: Default::default(),
        },
    ];
    let mut stream = make_stream(events);

    let chunks = Arc::new(Mutex::new(Vec::<String>::new()));
    let sink = Arc::clone(&chunks);
    stream.on_output(move |v| {
        // Non-string values are recorded as an empty string.
        let text = v.as_str().unwrap_or("");
        sink.lock().unwrap().push(text.into());
    });

    // Drain the stream; the yielded events themselves are not inspected.
    loop {
        if stream.next().await.is_none() {
            break;
        }
    }

    let recorded = chunks.lock().unwrap().clone();
    assert_eq!(recorded, vec![String::from("ping")]);
}
/// Three `on_output` callbacks registered in sequence each run for a single
/// chunk, and the shared log records them in registration order (1, 2, 3).
#[tokio::test]
async fn multiple_callbacks_per_category_run_in_registration_order() {
    let events = vec![
        WorkflowEvent::AgentChunk {
            task: "t".into(),
            agent: None,
            task_id: "1".into(),
            chunk: "x".into(),
        },
        WorkflowEvent::End {
            output: serde_json::Value::Null,
            duration: Duration::ZERO,
            totals: Default::default(),
        },
    ];
    let mut stream = make_stream(events);

    let log = Arc::new(Mutex::new(Vec::<u8>::new()));
    // Register one callback per id; each pushes its own id when invoked.
    for id in [1u8, 2, 3] {
        let handle = Arc::clone(&log);
        stream.on_output(move |_| handle.lock().unwrap().push(id));
    }

    let _ = stream.output().await.unwrap();

    let recorded = log.lock().unwrap().clone();
    assert_eq!(recorded, vec![1u8, 2, 3]);
}
/// Deserializes an `Execution` hub event that includes `execution_id`, `seq`
/// and `at`, and checks each field is surfaced on `HubEvent::Execution`.
#[test]
fn hub_event_execution_carries_execution_id_and_seq() {
    use akribes_sdk::HubEvent;

    let wire = serde_json::json!({
        "type": "Execution",
        "payload": {
            "project_id": 7,
            "script_name": "summarize",
            "execution_id": "exec-abc",
            "seq": 42,
            "at": "2026-05-20T12:34:56.789Z",
            "event": { "type": "WorkflowStart", "payload": 3 }
        }
    });
    let parsed: HubEvent = serde_json::from_value(wire).expect("deserialize");

    // Extract the fields under test; any other variant is a failure.
    let (project_id, script_name, execution_id, seq, at) = match parsed {
        HubEvent::Execution {
            project_id,
            script_name,
            execution_id,
            seq,
            at,
            ..
        } => (project_id, script_name, execution_id, seq, at),
        other => panic!("expected Execution, got {other:?}"),
    };

    assert_eq!(project_id, 7);
    assert_eq!(script_name, "summarize");
    assert_eq!(execution_id.as_deref(), Some("exec-abc"));
    assert_eq!(seq, Some(42));
    assert_eq!(at.as_deref(), Some("2026-05-20T12:34:56.789Z"));
}
/// Deserializes an `Execution` hub event whose payload omits `execution_id`,
/// `seq` and `at`, and checks that all three come back as `None`.
#[test]
fn hub_event_execution_back_compat_missing_id_and_seq() {
    use akribes_sdk::HubEvent;

    let wire = serde_json::json!({
        "type": "Execution",
        "payload": {
            "project_id": 1,
            "script_name": "noop",
            "event": { "type": "WorkflowStart", "payload": 0 }
        }
    });
    let parsed: HubEvent = serde_json::from_value(wire).expect("deserialize");

    match parsed {
        HubEvent::Execution {
            execution_id,
            seq,
            at,
            ..
        } => {
            assert!(execution_id.is_none());
            assert!(seq.is_none());
            assert!(at.is_none());
        }
        // Report the variant actually produced, matching the diagnostics of
        // `hub_event_execution_carries_execution_id_and_seq`.
        other => panic!("expected Execution, got {other:?}"),
    }
}