use akribes_sdk::{EventCategory, RuntimeEvent, WorkflowEvent, runtime::RuntimeErrorKind};
use serde_json::json;
#[test]
fn runtime_start_envelope_decodes_to_typed_arm() {
let envelope = json!({
"type": "RuntimeStart",
"payload": {
"task_name": "analyse_data",
"runtime_name": "run_python",
"language": "python",
},
});
let evt = WorkflowEvent::from_envelope_json(envelope).expect("decode");
match evt {
WorkflowEvent::RuntimeStart {
task_name,
runtime_name,
language,
} => {
assert_eq!(task_name, "analyse_data");
assert_eq!(runtime_name, "run_python");
assert_eq!(language, "python");
}
other => panic!("expected RuntimeStart, got {other:?}"),
}
}
#[test]
fn runtime_stdout_envelope_decodes_to_typed_arm() {
let envelope = json!({
"type": "RuntimeStdout",
"payload": {"task_name": "analyse_data", "chunk": "hello, world\n"},
});
let evt = WorkflowEvent::from_envelope_json(envelope).expect("decode");
match evt {
WorkflowEvent::RuntimeStdout { task_name, chunk } => {
assert_eq!(task_name, "analyse_data");
assert_eq!(chunk, "hello, world\n");
}
other => panic!("expected RuntimeStdout, got {other:?}"),
}
}
#[test]
fn runtime_stderr_envelope_decodes_to_typed_arm() {
let envelope = json!({
"type": "RuntimeStderr",
"payload": {"task_name": "t", "chunk": "DeprecationWarning: foo\n"},
});
let evt = WorkflowEvent::from_envelope_json(envelope).expect("decode");
match evt {
WorkflowEvent::RuntimeStderr { task_name, chunk } => {
assert_eq!(task_name, "t");
assert_eq!(chunk, "DeprecationWarning: foo\n");
}
other => panic!("expected RuntimeStderr, got {other:?}"),
}
}
#[test]
fn runtime_end_envelope_decodes_to_typed_arm() {
let envelope = json!({
"type": "RuntimeEnd",
"payload": {"task_name": "t", "exit_code": 0, "duration_ms": 1234},
});
let evt = WorkflowEvent::from_envelope_json(envelope).expect("decode");
match evt {
WorkflowEvent::RuntimeEnd {
task_name,
exit_code,
duration_ms,
} => {
assert_eq!(task_name, "t");
assert_eq!(exit_code, 0);
assert_eq!(duration_ms, 1234);
}
other => panic!("expected RuntimeEnd, got {other:?}"),
}
}
#[test]
fn runtime_end_envelope_preserves_negative_exit_code() {
let envelope = json!({
"type": "RuntimeEnd",
"payload": {"task_name": "t", "exit_code": -9, "duration_ms": 50},
});
let evt = WorkflowEvent::from_envelope_json(envelope).expect("decode");
if let WorkflowEvent::RuntimeEnd { exit_code, .. } = evt {
assert_eq!(exit_code, -9);
} else {
panic!("expected RuntimeEnd");
}
}
#[test]
fn runtime_error_envelope_decodes_to_typed_arm() {
let envelope = json!({
"type": "RuntimeError",
"payload": {
"task_name": "t",
"kind": "Timeout",
"message": "execution exceeded 30s timeout",
},
});
let evt = WorkflowEvent::from_envelope_json(envelope).expect("decode");
match evt {
WorkflowEvent::RuntimeError {
task_name,
kind,
message,
} => {
assert_eq!(task_name, "t");
assert_eq!(kind, "Timeout");
assert_eq!(message, "execution exceeded 30s timeout");
assert_eq!(
RuntimeErrorKind::from_wire(&kind),
RuntimeErrorKind::Timeout
);
}
other => panic!("expected RuntimeError, got {other:?}"),
}
}
#[test]
fn cancelled_runtime_error_envelope_round_trips() {
let envelope = json!({
"type": "RuntimeError",
"payload": {
"task_name": "t",
"kind": "Cancelled",
"message": "runtime call cancelled before completion",
},
});
let evt = WorkflowEvent::from_envelope_json(envelope.clone()).expect("decode");
match &evt {
WorkflowEvent::RuntimeError { kind, .. } => {
assert_eq!(kind, "Cancelled");
assert_eq!(
RuntimeErrorKind::from_wire(kind),
RuntimeErrorKind::Cancelled
);
}
other => panic!("expected RuntimeError, got {other:?}"),
}
let payload = envelope["payload"].clone();
let typed = RuntimeEvent::RuntimeError(akribes_sdk::RuntimeErrorPayload {
task_name: payload["task_name"].as_str().unwrap().into(),
kind: payload["kind"].as_str().unwrap().into(),
message: payload["message"].as_str().unwrap().into(),
});
let re_encoded = serde_json::to_value(&typed).unwrap();
assert_eq!(re_encoded, envelope);
}
#[test]
fn runtime_start_and_end_route_to_progress_category() {
let start = WorkflowEvent::from_envelope_json(json!({
"type": "RuntimeStart",
"payload": {"task_name": "t", "runtime_name": "r", "language": "python"},
}))
.unwrap();
assert_eq!(start.category(), EventCategory::Progress);
let end = WorkflowEvent::from_envelope_json(json!({
"type": "RuntimeEnd",
"payload": {"task_name": "t", "exit_code": 0, "duration_ms": 0},
}))
.unwrap();
assert_eq!(end.category(), EventCategory::Progress);
}
#[test]
fn runtime_stdout_and_stderr_route_to_output_category() {
let stdout = WorkflowEvent::from_envelope_json(json!({
"type": "RuntimeStdout",
"payload": {"task_name": "t", "chunk": "x"},
}))
.unwrap();
assert_eq!(stdout.category(), EventCategory::Output);
let stderr = WorkflowEvent::from_envelope_json(json!({
"type": "RuntimeStderr",
"payload": {"task_name": "t", "chunk": "x"},
}))
.unwrap();
assert_eq!(stderr.category(), EventCategory::Output);
}
#[test]
fn runtime_error_routes_to_error_category() {
let err = WorkflowEvent::from_envelope_json(json!({
"type": "RuntimeError",
"payload": {"task_name": "t", "kind": "OomKilled", "message": ""},
}))
.unwrap();
assert_eq!(err.category(), EventCategory::Error);
}
#[test]
fn non_runtime_envelope_falls_through_to_engine_decoder() {
let envelope = json!({
"type": "TaskStart",
"payload": ["summarise", null],
});
let evt = WorkflowEvent::from_envelope_json(envelope).expect("decode");
match evt {
WorkflowEvent::TaskStart { task, on_error } => {
assert_eq!(task, "summarise");
assert!(on_error.is_none());
}
other => panic!("expected TaskStart, got {other:?}"),
}
}
#[test]
fn invalid_runtime_payload_surfaces_runtime_decode_error() {
use akribes_sdk::EnvelopeDecodeError;
let envelope = json!({
"type": "RuntimeStart",
"payload": {"task_name": "t"}, });
match WorkflowEvent::from_envelope_json(envelope) {
Err(EnvelopeDecodeError::Runtime(_)) => {}
other => panic!("expected Runtime decode error, got {other:?}"),
}
}
#[test]
fn mixed_stream_decodes_in_order() {
let envelopes = [
json!({"type": "WorkflowStart", "payload": 1}),
json!({"type": "TaskStart", "payload": ["analyse", null]}),
json!({
"type": "RuntimeStart",
"payload": {
"task_name": "analyse",
"runtime_name": "run_python",
"language": "python",
},
}),
json!({
"type": "RuntimeStdout",
"payload": {"task_name": "analyse", "chunk": "hello "},
}),
json!({
"type": "RuntimeStdout",
"payload": {"task_name": "analyse", "chunk": "world\n"},
}),
json!({
"type": "RuntimeStderr",
"payload": {"task_name": "analyse", "chunk": "DeprecationWarning\n"},
}),
json!({
"type": "RuntimeEnd",
"payload": {"task_name": "analyse", "exit_code": 0, "duration_ms": 42},
}),
json!({
"type": "TaskEnd",
"payload": {
"task": "analyse",
"on_error_label": null,
"value": "Null",
"value_type": null,
"duration": {"secs": 0, "nanos": 42_000_000},
"attempt": 1,
"usage": null,
},
}),
json!({"type": "WorkflowEnd", "payload": "Null"}),
];
let decoded: Vec<WorkflowEvent> = envelopes
.iter()
.cloned()
.map(|e| WorkflowEvent::from_envelope_json(e).expect("decode"))
.collect();
let tags: Vec<&'static str> = decoded
.iter()
.map(|e| match e {
WorkflowEvent::Start { .. } => "Start",
WorkflowEvent::TaskStart { .. } => "TaskStart",
WorkflowEvent::RuntimeStart { .. } => "RuntimeStart",
WorkflowEvent::RuntimeStdout { .. } => "RuntimeStdout",
WorkflowEvent::RuntimeStderr { .. } => "RuntimeStderr",
WorkflowEvent::RuntimeEnd { .. } => "RuntimeEnd",
WorkflowEvent::TaskEnd { .. } => "TaskEnd",
WorkflowEvent::End { .. } => "End",
other => panic!("unexpected variant in mixed stream: {other:?}"),
})
.collect();
assert_eq!(
tags,
vec![
"Start",
"TaskStart",
"RuntimeStart",
"RuntimeStdout",
"RuntimeStdout",
"RuntimeStderr",
"RuntimeEnd",
"TaskEnd",
"End",
]
);
let stdout_chunks: Vec<&str> = decoded
.iter()
.filter_map(|e| match e {
WorkflowEvent::RuntimeStdout { chunk, .. } => Some(chunk.as_str()),
_ => None,
})
.collect();
assert_eq!(stdout_chunks, vec!["hello ", "world\n"]);
let runtime_end = decoded.iter().find_map(|e| match e {
WorkflowEvent::RuntimeEnd {
task_name,
exit_code,
duration_ms,
} => Some((task_name.as_str(), *exit_code, *duration_ms)),
_ => None,
});
assert_eq!(runtime_end, Some(("analyse", 0, 42)));
}
#[test]
fn runtime_event_serializes_to_canonical_envelope() {
let cases = [
(
RuntimeEvent::RuntimeStart(akribes_sdk::RuntimeStartPayload {
task_name: "t".into(),
runtime_name: "r".into(),
language: "python".into(),
}),
json!({
"type": "RuntimeStart",
"payload": {"task_name": "t", "runtime_name": "r", "language": "python"},
}),
),
(
RuntimeEvent::RuntimeStdout(akribes_sdk::RuntimeStdoutPayload {
task_name: "t".into(),
chunk: "x".into(),
}),
json!({
"type": "RuntimeStdout",
"payload": {"task_name": "t", "chunk": "x"},
}),
),
(
RuntimeEvent::RuntimeStderr(akribes_sdk::RuntimeStderrPayload {
task_name: "t".into(),
chunk: "x".into(),
}),
json!({
"type": "RuntimeStderr",
"payload": {"task_name": "t", "chunk": "x"},
}),
),
(
RuntimeEvent::RuntimeEnd(akribes_sdk::RuntimeEndPayload {
task_name: "t".into(),
exit_code: 1,
duration_ms: 99,
}),
json!({
"type": "RuntimeEnd",
"payload": {"task_name": "t", "exit_code": 1, "duration_ms": 99},
}),
),
(
RuntimeEvent::RuntimeError(akribes_sdk::RuntimeErrorPayload {
task_name: "t".into(),
kind: "OomKilled".into(),
message: "container exceeded 512MB".into(),
}),
json!({
"type": "RuntimeError",
"payload": {
"task_name": "t",
"kind": "OomKilled",
"message": "container exceeded 512MB",
},
}),
),
];
for (evt, expected) in cases {
assert_eq!(serde_json::to_value(&evt).unwrap(), expected);
let back: RuntimeEvent = serde_json::from_value(expected).unwrap();
assert_eq!(back, evt);
}
}