akribes-sdk 0.22.2

Rust client SDK for the Akribes workflow server
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
//! Integration tests for `WorkflowEvent` conversion and `RunStream`.
//!
//! Conversion tests live in the module's `#[cfg(test)]` block — this file
//! focuses on end-to-end behaviour: the `RunStream` lifecycle, terminal
//! detection, and error classification through `output()`.

use std::time::Duration;

use akribes_sdk::{AkribesError, EventCategory, WorkflowEvent};
use akribes_types::error::ErrorKind;
use akribes_types::event::EngineEvent;
use akribes_types::value::Value;

// ── Conversion spot-checks ──────────────────────────────────────────────────

/// Spot-checks that representative `EngineEvent` variants convert into
/// `WorkflowEvent`s that report the expected `EventCategory`.
#[test]
fn conversion_round_trip_covers_all_categories() {
    use akribes_types::event::{TaskEndVariant, WorkflowEndPayload};

    /// Converts `raw` into a `WorkflowEvent` and returns its category.
    fn category_of(raw: impl Into<WorkflowEvent>) -> EventCategory {
        let converted: WorkflowEvent = raw.into();
        converted.category()
    }

    // Workflow/task lifecycle events are all expected to be `Progress`.
    let lifecycle = [
        EngineEvent::WorkflowStart(3),
        EngineEvent::WorkflowEnd(WorkflowEndPayload::new(Value::Null)),
        EngineEvent::TaskStart("t".into(), None),
        EngineEvent::TaskEnd {
            task: "t".into(),
            on_error_label: None,
            value: Value::Null,
            value_type: None,
            duration: Duration::ZERO,
            attempt: 1,
            usage: None,
            variant: TaskEndVariant::Success,
        },
    ];
    for raw in lifecycle {
        assert_eq!(category_of(raw), EventCategory::Progress);
    }

    // A streamed agent chunk is expected to be `Output`.
    let agent_output = EngineEvent::AgentOutput {
        task_name: "t".into(),
        agent_name: None,
        task_id: "1".into(),
        schema_type: None,
        chunk: "x".into(),
    };
    assert_eq!(category_of(agent_output), EventCategory::Output);

    // An error event is expected to be `Error`.
    let failure = EngineEvent::error_kind(ErrorKind::ScriptError, "boom");
    assert_eq!(category_of(failure), EventCategory::Error);
}

/// An engine event converted from `McpServerDegraded` must surface as
/// `WorkflowEvent::Other`, carrying the variant name and the tagged payload.
#[test]
fn unknown_long_tail_variant_becomes_other_with_type_name() {
    let degraded = EngineEvent::McpServerDegraded {
        alias: "weather".into(),
        reason: "timeout".into(),
    };
    let converted: WorkflowEvent = degraded.into();

    if let WorkflowEvent::Other { type_name, payload } = converted {
        // The variant name is exposed both directly and inside the payload's
        // `type` tag; the original fields sit under `payload`.
        assert_eq!(type_name, "McpServerDegraded");
        assert_eq!(payload["type"], "McpServerDegraded");
        assert_eq!(payload["payload"]["alias"], "weather");
    } else {
        panic!("expected Other");
    }
}

// ── RunStream terminal detection ────────────────────────────────────────────
//
// These tests drive the *translation + terminal-detection* layer of
// RunStream directly, without spinning up a mock SSE server. Since that
// layer sits above the SSE byte-parser (which is already covered by the
// existing mockito-backed tests in `lib.rs`), it's enough to show that
// `output()` correctly drains, classifies the terminal event, and yields
// `WorkflowEvent::Error` to in-flight `next()` calls too.

/// Drain helper: builds a `RunStream` fed from pre-made `WorkflowEvent`s,
/// mirroring what the real filter task produces after SSE translation.
///
/// The events are pushed into an unbounded channel by a spawned task; the
/// resulting stream is registered under the execution id `"exec-1"`.
fn make_stream(events: Vec<WorkflowEvent>) -> akribes_sdk::RunStream {
    let (sender, receiver) = tokio::sync::mpsc::unbounded_channel();

    let feeder: tokio::task::JoinHandle<()> = tokio::spawn(async move {
        // Stop feeding as soon as the receiving side has gone away.
        let _ = events
            .into_iter()
            .try_for_each(|event| sender.send(Ok(event)));
    });

    // `RunStream`'s own constructor is `pub(crate)`, and going through the
    // `start_run_stream` helper would require a live server. The stream is
    // therefore built with the `_test::make_run_stream` helper and exercised
    // through the public `output()` + `next()` API.
    akribes_sdk::_test::make_run_stream("exec-1".into(), receiver, feeder)
}

/// `output()` must resolve to the payload carried by the terminal
/// `WorkflowEvent::End`.
#[tokio::test]
async fn output_resolves_on_workflow_end() {
    let expected = serde_json::json!({"answer": 42});

    let events = vec![
        WorkflowEvent::Start { total_tasks: 2 },
        WorkflowEvent::TaskStart {
            task: "t".into(),
            on_error: None,
        },
        WorkflowEvent::End {
            output: expected.clone(),
            duration: Duration::ZERO,
            totals: Default::default(),
        },
    ];

    let resolved = make_stream(events)
        .output()
        .await
        .expect("should resolve to output");
    assert_eq!(resolved, expected);
}

#[tokio::test]
async fn output_errors_on_workflow_error_with_classification() {
    let stream = make_stream(vec![
        WorkflowEvent::Start { total_tasks: 1 },
        WorkflowEvent::Error {
            message: "rate limited".into(),
            kind: ErrorKind::RateLimit,
            code: None,
        },
    ]);
    let err = stream.output().await.expect_err("should error");
    assert!(
        matches!(err, AkribesError::Transient { .. }),
        "rate-limit ErrorKind should classify as Transient, got {err:?}",
    );

    let stream = make_stream(vec![WorkflowEvent::Error {
        message: "script blew up".into(),
        kind: ErrorKind::ScriptError,
        code: None,
    }]);
    let err = stream.output().await.expect_err("should error");
    assert!(
        matches!(err, AkribesError::Script { .. }),
        "ScriptError should classify as Script, got {err:?}",
    );

    let stream = make_stream(vec![WorkflowEvent::Error {
        message: "no auth".into(),
        kind: ErrorKind::AuthError,
        code: None,
    }]);
    let err = stream.output().await.expect_err("should error");
    assert!(
        matches!(err, AkribesError::Fatal { .. }),
        "AuthError should classify as Fatal, got {err:?}",
    );
}

/// `next()` must hand back events in the order they were sent and return
/// `None` once the terminal `End` event has been delivered.
#[tokio::test]
async fn next_yields_events_in_order_and_stops_after_end() {
    let events = vec![
        WorkflowEvent::Start { total_tasks: 1 },
        WorkflowEvent::End {
            output: serde_json::Value::Null,
            duration: Duration::ZERO,
            totals: Default::default(),
        },
    ];
    let mut stream = make_stream(events);

    let opening = stream.next().await;
    let opening = opening.unwrap().unwrap();
    assert!(matches!(opening, WorkflowEvent::Start { total_tasks: 1 }));

    let closing = stream.next().await;
    let closing = closing.unwrap().unwrap();
    assert!(matches!(closing, WorkflowEvent::End { .. }));

    // Once End has been seen the stream is finished: nothing further is
    // yielded even if the underlying channel still had buffered items (it
    // doesn't here, but the `terminated` flag short-circuits).
    let after_end = stream.next().await;
    assert!(after_end.is_none());
}

/// If the channel closes before any End/Error event arrives, `output()`
/// must fail with `AkribesError::Other` instead of hanging.
#[tokio::test]
async fn output_without_terminal_event_errors() {
    // No End/Error at the tail: the channel simply closes. A real server
    // shouldn't do this, but RunStream must not hang when it happens.
    let truncated = vec![
        WorkflowEvent::Start { total_tasks: 1 },
        WorkflowEvent::TaskStart {
            task: "t".into(),
            on_error: None,
        },
    ];

    let outcome = make_stream(truncated).output().await;
    let err = outcome.expect_err("should error");
    assert!(matches!(err, AkribesError::Other(_)), "got {err:?}");
}

// ── Callback API ────────────────────────────────────────────────────────────
//
// Mirrors the TS/Python `.on.<category>()` surface. The iterator stays the
// canonical API; callbacks are convenience sinks that fire on the polling
// thread as events arrive. Tests below assert (a) callbacks fire in
// registration order, (b) category routing is correct, (c) `on_any` sees
// every event after category-specific callbacks, and (d) callbacks do not
// disturb the iterator's own behaviour.

use std::sync::{Arc, Mutex};

/// `on_output` callbacks must receive every agent chunk, in arrival order,
/// while `output()` drains the stream.
#[tokio::test]
async fn on_output_fires_for_agent_chunks_in_order() {
    // Builds an agent chunk event for task "t" carrying `text`.
    let agent_chunk = |text: &'static str| WorkflowEvent::AgentChunk {
        task: "t".into(),
        agent: None,
        task_id: "1".into(),
        chunk: text.into(),
    };

    let mut stream = make_stream(vec![
        WorkflowEvent::Start { total_tasks: 1 },
        agent_chunk("hello "),
        agent_chunk("world"),
        WorkflowEvent::End {
            output: serde_json::Value::Null,
            duration: Duration::ZERO,
            totals: Default::default(),
        },
    ]);

    let received = Arc::new(Mutex::new(Vec::<String>::new()));
    let sink = Arc::clone(&received);
    stream.on_output(move |value| {
        // Only string payloads are recorded; anything else is skipped.
        sink.lock()
            .unwrap()
            .extend(value.as_str().map(|text| text.to_string()));
    });

    let _out = stream.output().await.unwrap();

    assert_eq!(*received.lock().unwrap(), ["hello ", "world"]);
}

#[tokio::test]
async fn on_task_end_fires_with_typed_payload() {
    let mut stream = make_stream(vec![
        WorkflowEvent::TaskEnd {
            task: "summarise".into(),
            output: serde_json::json!({"answer": 42}),
            duration: Duration::from_millis(100),
            usage: None,
            variant: akribes_sdk::TaskEndVariant::Success,
        },
        WorkflowEvent::End {
            output: serde_json::Value::Null,
            duration: Duration::ZERO,
            totals: Default::default(),
        },
    ]);
    let seen = Arc::new(Mutex::new(Vec::<String>::new()));
    let seen_cb = Arc::clone(&seen);
    stream.on_task_end(move |p| {
        seen_cb.lock().unwrap().push(p.task.clone());
        assert_eq!(p.output["answer"], 42);
        assert_eq!(p.variant, akribes_sdk::TaskEndVariant::Success);
    });
    let _ = stream.output().await.unwrap();
    assert_eq!(seen.lock().unwrap().clone(), vec!["summarise".to_string()]);
}

/// `on_error` callbacks must see the error event, and `output()` must still
/// report the classified failure afterwards.
#[tokio::test]
async fn on_error_fires_before_termination() {
    let events = vec![
        WorkflowEvent::Start { total_tasks: 1 },
        WorkflowEvent::Error {
            message: "boom".into(),
            kind: ErrorKind::ScriptError,
            code: None,
        },
    ];
    let mut stream = make_stream(events);

    let messages = Arc::new(Mutex::new(Vec::<String>::new()));
    let sink = Arc::clone(&messages);
    stream.on_error(move |payload| {
        sink.lock().unwrap().push(payload.message.clone());
        assert_eq!(payload.kind, ErrorKind::ScriptError);
    });

    let outcome = stream.output().await;

    assert!(matches!(outcome, Err(AkribesError::Script { .. })));
    assert_eq!(*messages.lock().unwrap(), ["boom"]);
}

/// For each event, category-specific callbacks must run before `on_any`,
/// and `on_any` must observe every event.
#[tokio::test]
async fn on_any_fires_for_every_event_after_specific_callbacks() {
    // Both callbacks append a marker to one shared log, so the recorded
    // sequence interleaves "specific" and "any" entries in dispatch order.
    // A refactor that re-orders the dispatch would change that sequence and
    // trip the final assertion.
    let events = vec![
        WorkflowEvent::AgentChunk {
            task: "t".into(),
            agent: None,
            task_id: "1".into(),
            chunk: "x".into(),
        },
        WorkflowEvent::End {
            output: serde_json::Value::Null,
            duration: Duration::ZERO,
            totals: Default::default(),
        },
    ];
    let mut stream = make_stream(events);

    let log = Arc::new(Mutex::new(Vec::<String>::new()));

    let specific_log = Arc::clone(&log);
    stream.on_output(move |_| {
        specific_log.lock().unwrap().push("output".into());
    });

    let any_log = Arc::clone(&log);
    stream.on_any(move |evt| {
        let tag = if matches!(evt, WorkflowEvent::AgentChunk { .. }) {
            "any:chunk"
        } else if matches!(evt, WorkflowEvent::End { .. }) {
            "any:end"
        } else {
            "any:other"
        };
        any_log.lock().unwrap().push(tag.into());
    });

    let _ = stream.output().await.unwrap();

    assert_eq!(*log.lock().unwrap(), ["output", "any:chunk", "any:end"]);
}

/// Callbacks must fire when the stream is consumed through `next()` as
/// well, not only through `output()`.
#[tokio::test]
async fn callbacks_fire_when_iterating_via_next() {
    let events = vec![
        WorkflowEvent::AgentChunk {
            task: "t".into(),
            agent: None,
            task_id: "1".into(),
            chunk: "ping".into(),
        },
        WorkflowEvent::End {
            output: serde_json::Value::Null,
            duration: Duration::ZERO,
            totals: Default::default(),
        },
    ];
    let mut stream = make_stream(events);

    let received = Arc::new(Mutex::new(Vec::<String>::new()));
    let sink = Arc::clone(&received);
    stream.on_output(move |value| {
        // Non-string payloads are recorded as an empty string.
        let text = value.as_str().unwrap_or("");
        sink.lock().unwrap().push(text.into());
    });

    // Pull events one by one until the stream reports it is finished; the
    // events themselves are not inspected here.
    while stream.next().await.is_some() {}

    assert_eq!(*received.lock().unwrap(), ["ping"]);
}

/// Several callbacks registered for the same category must run in the
/// order in which they were registered.
#[tokio::test]
async fn multiple_callbacks_per_category_run_in_registration_order() {
    let events = vec![
        WorkflowEvent::AgentChunk {
            task: "t".into(),
            agent: None,
            task_id: "1".into(),
            chunk: "x".into(),
        },
        WorkflowEvent::End {
            output: serde_json::Value::Null,
            duration: Duration::ZERO,
            totals: Default::default(),
        },
    ];
    let mut stream = make_stream(events);

    // Register three output callbacks; each appends its own marker so the
    // log reveals the order in which they were invoked.
    let log = Arc::new(Mutex::new(Vec::<u8>::new()));
    for marker in 1u8..=3 {
        let sink = Arc::clone(&log);
        stream.on_output(move |_| sink.lock().unwrap().push(marker));
    }

    let _ = stream.output().await.unwrap();

    assert_eq!(*log.lock().unwrap(), [1u8, 2, 3]);
}

// ── HubEvent wire shape: execution_id + seq round-trip ──────────────────────
//
// Until the SDK carried these on the `HubEvent::Execution` variant,
// concurrent runs of the same script could deliver each other's events
// into a `RunStream` and resolve `.output()` with the wrong value. Lock
// the wire shape: both fields are `#[serde(default)]` so old servers
// that don't stamp them still parse, but the new fields must round-trip
// when present.

/// A `HubEvent::Execution` wire message that includes `execution_id`,
/// `seq` and `at` must deserialize with all of those fields populated.
#[test]
fn hub_event_execution_carries_execution_id_and_seq() {
    use akribes_sdk::HubEvent;

    let wire = serde_json::json!({
        "type": "Execution",
        "payload": {
            "project_id": 7,
            "script_name": "summarize",
            "execution_id": "exec-abc",
            "seq": 42,
            "at": "2026-05-20T12:34:56.789Z",
            "event": { "type": "WorkflowStart", "payload": 3 }
        }
    });

    let parsed: HubEvent = serde_json::from_value(wire).expect("deserialize");

    // Pull out the fields under test; any other variant is a failure.
    let (project_id, script_name, execution_id, seq, at) = match parsed {
        HubEvent::Execution {
            project_id,
            script_name,
            execution_id,
            seq,
            at,
            ..
        } => (project_id, script_name, execution_id, seq, at),
        other => panic!("expected Execution, got {other:?}"),
    };

    assert_eq!(project_id, 7);
    assert_eq!(script_name, "summarize");
    assert_eq!(execution_id.as_deref(), Some("exec-abc"));
    assert_eq!(seq, Some(42));
    assert_eq!(at.as_deref(), Some("2026-05-20T12:34:56.789Z"));
}

/// A `HubEvent::Execution` wire message that omits `execution_id`, `seq`
/// and `at` must still deserialize, with those fields set to `None`.
#[test]
fn hub_event_execution_back_compat_missing_id_and_seq() {
    // Old servers that don't stamp execution_id/seq still parse cleanly
    // (back-compat). The fields default to `None` and the filter falls
    // through (cross-execution contamination only resurfaces against a
    // server old enough to predate the broadcast envelope changes).
    use akribes_sdk::HubEvent;

    let wire = serde_json::json!({
        "type": "Execution",
        "payload": {
            "project_id": 1,
            "script_name": "noop",
            "event": { "type": "WorkflowStart", "payload": 0 }
        }
    });

    let parsed: HubEvent = serde_json::from_value(wire).expect("deserialize");
    match parsed {
        HubEvent::Execution {
            execution_id,
            seq,
            at,
            ..
        } => {
            assert!(execution_id.is_none());
            assert!(seq.is_none());
            assert!(at.is_none());
        }
        // Report the variant that was actually parsed so a failure is
        // diagnosable from the test output alone.
        other => panic!("expected Execution, got {other:?}"),
    }
}