1use std::collections::HashMap;
14use std::time::Duration;
15
16use akribes_types::event::{EngineEvent, TokenUsage};
17
18use crate::models::engine_event_type_name;
19use crate::runtime::{
20 RuntimeEndPayload, RuntimeErrorPayload, RuntimeEvent, RuntimeStartPayload,
21 RuntimeStderrPayload, RuntimeStdoutPayload,
22};
23use crate::suspend::SuspendTrigger;
24use crate::task_end::TaskEndVariant;
25
/// Event type that engine events and runtime events are projected onto.
///
/// Values are produced by the `From<EngineEvent>` and `From<RuntimeEvent>`
/// conversions in this file, or by `WorkflowEvent::from_envelope_json`.
/// Engine events that have no dedicated variant here are carried in
/// [`WorkflowEvent::Other`].
#[derive(Debug, Clone)]
pub enum WorkflowEvent {
    /// Built from `EngineEvent::WorkflowStart`.
    Start {
        /// The count carried by the engine's start event.
        total_tasks: usize,
    },
    /// Built from `EngineEvent::WorkflowEnd`.
    End {
        /// JSON form of the end payload's value.
        output: serde_json::Value,
        /// NOTE: the `From<EngineEvent>` conversion in this file always
        /// stores `Duration::ZERO` here.
        duration: Duration,
        /// Totals copied unchanged from the end payload.
        totals: akribes_types::event::WorkflowTotals,
    },
    /// Built from `EngineEvent::TaskStart`.
    TaskStart {
        /// First element of the engine's tuple variant.
        task: String,
        /// Second element of the engine's tuple variant.
        on_error: Option<String>,
    },
    /// Built from `EngineEvent::TaskEnd`. The engine's `on_error_label`,
    /// `value_type` and `attempt` fields are not carried over.
    TaskEnd {
        task: String,
        /// JSON form of the engine event's `value`.
        output: serde_json::Value,
        duration: Duration,
        usage: Option<TokenUsage>,
        /// Converted from the engine-side variant via `Into`.
        variant: TaskEndVariant,
    },
    /// Built from `EngineEvent::AgentOutput`. The engine's `schema_type`
    /// field is not carried over.
    AgentChunk {
        /// The engine event's `task_name`.
        task: String,
        /// The engine event's `agent_name`.
        agent: Option<String>,
        task_id: String,
        chunk: String,
    },
    /// Built from `EngineEvent::ToolCallStart`; engine fields not listed here
    /// are dropped.
    ToolCallStart {
        /// The engine event's `task_name`.
        task: String,
        /// The engine event's `tool_name`.
        tool: String,
        /// The engine event's `server_name`.
        server: String,
        input: serde_json::Value,
    },
    /// Built from `EngineEvent::ToolCallEnd`; engine fields not listed here
    /// are dropped.
    ToolCallEnd {
        /// The engine event's `task_name`.
        task: String,
        /// The engine event's `tool_name`.
        tool: String,
        output: serde_json::Value,
        duration: Duration,
    },
    /// Built from `EngineEvent::Suspended`. The engine's `actor_hint` and
    /// `loop_context` fields are not carried over.
    Checkpoint {
        /// The engine event's `checkpoint_name`.
        name: String,
        token: String,
        prompt: String,
        schema: serde_json::Value,
        timeout_secs: Option<u64>,
        /// Converted from the engine-side trigger via `Into`.
        trigger: SuspendTrigger,
    },
    /// Built from `EngineEvent::ToolApprovalPending`; all fields are copied
    /// unchanged.
    ToolApproval {
        token: String,
        tool_ref: String,
        args: serde_json::Value,
        execution_id: Option<String>,
        node_id: Option<u64>,
    },
    /// Built from `EngineEvent::Breakpoint`. The engine's `span` field is not
    /// carried over.
    Breakpoint {
        token: String,
        /// The engine's node id, cast to `u64`.
        node_id: u64,
        /// The engine's `env_snapshot` with every value converted to JSON.
        env: HashMap<String, serde_json::Value>,
    },
    /// Built from `EngineEvent::Error`; engine fields not listed here are
    /// dropped.
    Error {
        message: String,
        kind: akribes_types::error::ErrorKind,
        /// The `From<EngineEvent>` conversion in this file always fills this
        /// with `Some(code.as_wire().to_string())`.
        code: Option<String>,
    },
    /// Built from `EngineEvent::ValidationFailure`. The engine's `truncated`
    /// and `total_length` fields are not carried over.
    ValidationFailure {
        task_name: String,
        attempt: u32,
        model_response: String,
        missing_fields: Vec<String>,
        extra_fields: Vec<String>,
        type_errors: Vec<String>,
        stop_reason: Option<String>,
    },
    /// Built from `RuntimeEvent::RuntimeStart`; fields are copied unchanged
    /// from its payload.
    RuntimeStart {
        task_name: String,
        runtime_name: String,
        language: String,
    },
    /// Built from `RuntimeEvent::RuntimeStdout`; fields are copied unchanged
    /// from its payload.
    RuntimeStdout {
        task_name: String,
        chunk: String,
    },
    /// Built from `RuntimeEvent::RuntimeStderr`; fields are copied unchanged
    /// from its payload.
    RuntimeStderr {
        task_name: String,
        chunk: String,
    },
    /// Built from `RuntimeEvent::RuntimeEnd`; fields are copied unchanged
    /// from its payload.
    RuntimeEnd {
        task_name: String,
        exit_code: i32,
        duration_ms: u64,
    },
    /// Built from `RuntimeEvent::RuntimeError`; fields are copied unchanged
    /// from its payload.
    RuntimeError {
        task_name: String,
        kind: String,
        message: String,
    },
    /// Fallback for every engine event without a dedicated variant above.
    Other {
        /// Name returned by `engine_event_type_name` for the engine event.
        type_name: String,
        /// The engine event serialized with `serde_json::to_value`, or
        /// `serde_json::Value::Null` if serialization fails.
        payload: serde_json::Value,
    },
}
197
/// Coarse grouping of [`WorkflowEvent`] variants, as returned by
/// `WorkflowEvent::category`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum EventCategory {
    /// `Start`, `End`, `TaskStart`, `TaskEnd`, `RuntimeStart`, `RuntimeEnd`.
    Progress,
    /// `AgentChunk`, `RuntimeStdout`, `RuntimeStderr`, `ValidationFailure`.
    Output,
    /// `ToolCallStart`, `ToolCallEnd`.
    Tool,
    /// `Checkpoint`, `ToolApproval`, `Breakpoint`.
    Suspend,
    /// `Error`, `RuntimeError`.
    Error,
    /// The `Other` fallback variant.
    Other,
}
209
210impl WorkflowEvent {
211 pub fn category(&self) -> EventCategory {
213 match self {
214 Self::Start { .. }
215 | Self::End { .. }
216 | Self::TaskStart { .. }
217 | Self::TaskEnd { .. } => EventCategory::Progress,
218 Self::RuntimeStart { .. } | Self::RuntimeEnd { .. } => EventCategory::Progress,
223 Self::AgentChunk { .. } => EventCategory::Output,
224 Self::RuntimeStdout { .. } | Self::RuntimeStderr { .. } => EventCategory::Output,
228 Self::ValidationFailure { .. } => EventCategory::Output,
229 Self::ToolCallStart { .. } | Self::ToolCallEnd { .. } => EventCategory::Tool,
230 Self::Checkpoint { .. } | Self::ToolApproval { .. } | Self::Breakpoint { .. } => {
231 EventCategory::Suspend
232 }
233 Self::Error { .. } => EventCategory::Error,
234 Self::RuntimeError { .. } => EventCategory::Error,
239 Self::Other { .. } => EventCategory::Other,
240 }
241 }
242
243 pub(crate) fn is_terminal(&self) -> bool {
250 matches!(self, Self::End { .. } | Self::Error { .. })
251 }
252}
253
254impl From<EngineEvent> for WorkflowEvent {
255 fn from(evt: EngineEvent) -> Self {
256 let evt = evt.flatten_subscript_chain();
262 match evt {
263 EngineEvent::WorkflowStart(total_tasks) => Self::Start { total_tasks },
264
265 EngineEvent::WorkflowEnd(payload) => Self::End {
266 output: payload.value.to_json(),
267 duration: Duration::ZERO,
273 totals: payload.totals,
274 },
275
276 EngineEvent::TaskStart(task, on_error) => Self::TaskStart { task, on_error },
277
278 EngineEvent::TaskEnd {
279 task,
280 on_error_label: _,
281 value,
282 value_type: _,
283 duration,
284 attempt: _,
285 usage,
286 variant,
287 } => Self::TaskEnd {
288 task,
289 output: value.to_json(),
290 duration,
291 usage,
292 variant: variant.into(),
293 },
294
295 EngineEvent::AgentOutput {
296 task_name,
297 agent_name,
298 task_id,
299 schema_type: _,
300 chunk,
301 } => Self::AgentChunk {
302 task: task_name,
303 agent: agent_name,
304 task_id,
305 chunk,
306 },
307
308 EngineEvent::ToolCallStart {
309 task_name,
310 tool_name,
311 server_name,
312 input,
313 ..
314 } => Self::ToolCallStart {
315 task: task_name,
316 tool: tool_name,
317 server: server_name,
318 input,
319 },
320
321 EngineEvent::ToolCallEnd {
322 task_name,
323 tool_name,
324 output,
325 duration,
326 ..
327 } => Self::ToolCallEnd {
328 task: task_name,
329 tool: tool_name,
330 output,
331 duration,
332 },
333
334 EngineEvent::Suspended {
335 checkpoint_name,
336 token,
337 prompt,
338 schema,
339 actor_hint: _,
340 timeout_secs,
341 trigger,
342 loop_context: _,
349 } => Self::Checkpoint {
350 name: checkpoint_name,
351 token,
352 prompt,
353 schema,
354 timeout_secs,
355 trigger: trigger.into(),
356 },
357
358 EngineEvent::ToolApprovalPending {
359 execution_id,
360 node_id,
361 token,
362 tool_ref,
363 args,
364 } => Self::ToolApproval {
365 token,
366 tool_ref,
367 args,
368 execution_id,
369 node_id,
370 },
371
372 EngineEvent::Breakpoint {
373 node_id,
374 span: _,
375 token,
376 env_snapshot,
377 } => Self::Breakpoint {
378 token,
379 node_id: node_id as u64,
380 env: env_snapshot
381 .into_iter()
382 .map(|(k, v)| (k, v.to_json()))
383 .collect(),
384 },
385
386 EngineEvent::Error {
392 message,
393 kind,
394 code,
395 ..
396 } => Self::Error {
397 message,
398 kind,
399 code: Some(code.as_wire().to_string()),
400 },
401
402 EngineEvent::ValidationFailure {
403 task_name,
404 attempt,
405 model_response,
406 missing_fields,
407 extra_fields,
408 type_errors,
409 stop_reason,
410 truncated: _,
411 total_length: _,
412 } => Self::ValidationFailure {
413 task_name,
414 attempt,
415 model_response,
416 missing_fields,
417 extra_fields,
418 type_errors,
419 stop_reason,
420 },
421
422 other => {
428 let type_name = engine_event_type_name(&other).to_string();
429 let payload = serde_json::to_value(&other).unwrap_or(serde_json::Value::Null);
430 Self::Other { type_name, payload }
431 }
432 }
433 }
434}
435
436impl From<RuntimeEvent> for WorkflowEvent {
445 fn from(evt: RuntimeEvent) -> Self {
446 match evt {
447 RuntimeEvent::RuntimeStart(RuntimeStartPayload {
448 task_name,
449 runtime_name,
450 language,
451 }) => Self::RuntimeStart {
452 task_name,
453 runtime_name,
454 language,
455 },
456 RuntimeEvent::RuntimeStdout(RuntimeStdoutPayload { task_name, chunk }) => {
457 Self::RuntimeStdout { task_name, chunk }
458 }
459 RuntimeEvent::RuntimeStderr(RuntimeStderrPayload { task_name, chunk }) => {
460 Self::RuntimeStderr { task_name, chunk }
461 }
462 RuntimeEvent::RuntimeEnd(RuntimeEndPayload {
463 task_name,
464 exit_code,
465 duration_ms,
466 }) => Self::RuntimeEnd {
467 task_name,
468 exit_code,
469 duration_ms,
470 },
471 RuntimeEvent::RuntimeError(RuntimeErrorPayload {
472 task_name,
473 kind,
474 message,
475 }) => Self::RuntimeError {
476 task_name,
477 kind,
478 message,
479 },
480 }
481 }
482}
483
/// Error returned by `WorkflowEvent::from_envelope_json`.
///
/// Each variant wraps the underlying `serde_json::Error` as its source; the
/// variant records which of the two decode paths failed.
#[derive(Debug, thiserror::Error)]
pub enum EnvelopeDecodeError {
    /// The `type` tag named one of the `Runtime*` events, but the value could
    /// not be deserialized as a `RuntimeEvent`.
    #[error("invalid Runtime envelope: {0}")]
    Runtime(#[source] serde_json::Error),
    /// The value was routed to the engine path (any other or missing `type`
    /// tag) and could not be deserialized as an `EngineEvent`.
    #[error("failed to decode engine event: {0}")]
    Engine(#[source] serde_json::Error),
}
514
515impl WorkflowEvent {
516 pub fn from_envelope_json(value: serde_json::Value) -> Result<Self, EnvelopeDecodeError> {
532 let type_tag = value.get("type").and_then(|t| t.as_str()).unwrap_or("");
536 if matches!(
537 type_tag,
538 "RuntimeStart" | "RuntimeStdout" | "RuntimeStderr" | "RuntimeEnd" | "RuntimeError"
539 ) {
540 let runtime: RuntimeEvent =
541 serde_json::from_value(value).map_err(EnvelopeDecodeError::Runtime)?;
542 return Ok(runtime.into());
543 }
544 let engine: EngineEvent =
545 serde_json::from_value(value).map_err(EnvelopeDecodeError::Engine)?;
546 Ok(engine.into())
547 }
548}
549
/// Unit tests for the `EngineEvent` / `RuntimeEvent` to `WorkflowEvent`
/// projections and for `category` / `is_terminal`.
#[cfg(test)]
mod tests {
    use super::*;
    use akribes_types::ast::Span;
    use akribes_types::error::ErrorKind;
    use akribes_types::value::Value;

    /// Minimal one-character span used wherever an engine event needs one.
    fn span() -> Span {
        Span {
            line: 1,
            col: 1,
            end_line: 1,
            end_col: 1,
        }
    }

    // WorkflowStart becomes Start (Progress); WorkflowEnd becomes End with the
    // payload value converted to JSON.
    #[test]
    fn start_and_end_map_to_progress() {
        let start: WorkflowEvent = EngineEvent::WorkflowStart(5).into();
        assert!(matches!(start, WorkflowEvent::Start { total_tasks: 5 }));
        assert_eq!(start.category(), EventCategory::Progress);

        let end: WorkflowEvent = EngineEvent::WorkflowEnd(
            akribes_types::event::WorkflowEndPayload::new(Value::String("done".into())),
        )
        .into();
        match end {
            WorkflowEvent::End { output, .. } => {
                assert_eq!(output, serde_json::Value::String("done".into()));
            }
            _ => panic!("expected End"),
        }
    }

    // AgentOutput becomes AgentChunk with the renamed task/agent fields.
    #[test]
    fn agent_output_maps_to_chunk() {
        let evt: WorkflowEvent = EngineEvent::AgentOutput {
            task_name: "summarise".into(),
            agent_name: Some("gpt".into()),
            task_id: "t1".into(),
            schema_type: None,
            chunk: "hi".into(),
        }
        .into();
        match evt {
            WorkflowEvent::AgentChunk {
                task,
                agent,
                task_id,
                chunk,
            } => {
                assert_eq!(task, "summarise");
                assert_eq!(agent.as_deref(), Some("gpt"));
                assert_eq!(task_id, "t1");
                assert_eq!(chunk, "hi");
            }
            _ => panic!("expected AgentChunk"),
        }
    }

    // Both tool-call events land in the Tool category.
    #[test]
    fn tool_calls_map_to_tool_category() {
        let start: WorkflowEvent = EngineEvent::ToolCallStart {
            task_name: "t".into(),
            tool_name: "web".into(),
            server_name: "s".into(),
            input: serde_json::json!({"q": "hi"}),
            tool_use_id: String::new(),
        }
        .into();
        assert_eq!(start.category(), EventCategory::Tool);

        let end: WorkflowEvent = EngineEvent::ToolCallEnd {
            task_name: "t".into(),
            tool_name: "web".into(),
            tool_use_id: String::new(),
            output: serde_json::json!({"r": "ok"}),
            duration: Duration::from_millis(10),
        }
        .into();
        assert_eq!(end.category(), EventCategory::Tool);
    }

    // Suspended becomes Checkpoint (Suspend) and keeps name, token, timeout
    // and trigger.
    #[test]
    fn suspended_maps_to_checkpoint() {
        let evt: WorkflowEvent = EngineEvent::Suspended {
            checkpoint_name: "approve".into(),
            token: "tok".into(),
            prompt: "please".into(),
            schema: serde_json::json!({}),
            actor_hint: akribes_types::ast::ActorHint::Any,
            timeout_secs: Some(30),
            trigger: akribes_types::event::SuspendTrigger::DagPosition,
            loop_context: None,
        }
        .into();
        assert_eq!(evt.category(), EventCategory::Suspend);
        match evt {
            WorkflowEvent::Checkpoint {
                name,
                token,
                timeout_secs,
                trigger,
                ..
            } => {
                assert_eq!(name, "approve");
                assert_eq!(token, "tok");
                assert_eq!(timeout_secs, Some(30));
                assert!(matches!(trigger, SuspendTrigger::DagPosition));
            }
            _ => panic!("expected Checkpoint"),
        }
    }

    // The ValidationExhausted trigger's fields survive the trigger conversion.
    #[test]
    fn suspended_with_validation_exhausted_trigger_survives_translation() {
        let evt: WorkflowEvent = EngineEvent::Suspended {
            checkpoint_name: "review".into(),
            token: "tok".into(),
            prompt: "please review".into(),
            schema: serde_json::json!({}),
            actor_hint: akribes_types::ast::ActorHint::Human,
            timeout_secs: None,
            trigger: akribes_types::event::SuspendTrigger::ValidationExhausted {
                task_name: "decompose".into(),
                retry_count: 3,
                last_attempt: "{\"bad\":true}".into(),
                validation_errors: vec![akribes_types::event::ValidationErrorWire {
                    stage: "schema".into(),
                    message: "missing number".into(),
                    path: Some("/0".into()),
                }],
            },
            loop_context: None,
        }
        .into();
        match evt {
            WorkflowEvent::Checkpoint { trigger, .. } => match trigger {
                SuspendTrigger::ValidationExhausted {
                    task_name,
                    retry_count,
                    validation_errors,
                    ..
                } => {
                    assert_eq!(task_name, "decompose");
                    assert_eq!(retry_count, 3);
                    assert_eq!(validation_errors.len(), 1);
                    assert_eq!(validation_errors[0].stage, "schema");
                }
                other => panic!("expected ValidationExhausted, got {other:?}"),
            },
            _ => panic!("expected Checkpoint"),
        }
    }

    // The AgentUnable trigger's fields survive the trigger conversion; the
    // category is compared against the string "input_ambiguous".
    #[test]
    fn suspended_with_agent_unable_trigger_survives_translation() {
        let evt: WorkflowEvent = EngineEvent::Suspended {
            checkpoint_name: "escalate".into(),
            token: "tok".into(),
            prompt: "take over".into(),
            schema: serde_json::json!({}),
            actor_hint: akribes_types::ast::ActorHint::Human,
            timeout_secs: None,
            trigger: akribes_types::event::SuspendTrigger::AgentUnable {
                task_name: "decompose".into(),
                unable: akribes_types::value::UnableRecord {
                    reason: "image too blurry".into(),
                    missing: vec!["claim_text".into()],
                    category: akribes_types::value::UnableCategory::InputAmbiguous,
                },
            },
            loop_context: None,
        }
        .into();
        match evt {
            WorkflowEvent::Checkpoint { trigger, .. } => match trigger {
                SuspendTrigger::AgentUnable { task_name, unable } => {
                    assert_eq!(task_name, "decompose");
                    assert_eq!(unable.reason, "image too blurry");
                    assert_eq!(unable.category, "input_ambiguous");
                    assert_eq!(unable.missing, vec!["claim_text".to_string()]);
                }
                other => panic!("expected AgentUnable, got {other:?}"),
            },
            _ => panic!("expected Checkpoint"),
        }
    }

    // ToolApprovalPending lands in the Suspend category.
    #[test]
    fn tool_approval_has_suspend_category() {
        let evt: WorkflowEvent = EngineEvent::ToolApprovalPending {
            execution_id: Some("exec".into()),
            node_id: Some(1),
            token: "tk".into(),
            tool_ref: "web".into(),
            args: serde_json::json!({}),
        }
        .into();
        assert_eq!(evt.category(), EventCategory::Suspend);
    }

    // Log has no dedicated variant, so it lands in the Other category.
    #[test]
    fn log_has_other_category() {
        let evt: WorkflowEvent = EngineEvent::Log("hello".into()).into();
        assert_eq!(evt.category(), EventCategory::Other);
    }

    // ToolApprovalPending becomes ToolApproval with fields copied across.
    #[test]
    fn tool_approval_maps() {
        let evt: WorkflowEvent = EngineEvent::ToolApprovalPending {
            execution_id: Some("e1".into()),
            node_id: Some(42),
            token: "tok".into(),
            tool_ref: "web.search".into(),
            args: serde_json::json!({"q": "hi"}),
        }
        .into();
        match evt {
            WorkflowEvent::ToolApproval {
                token,
                tool_ref,
                node_id,
                ..
            } => {
                assert_eq!(token, "tok");
                assert_eq!(tool_ref, "web.search");
                assert_eq!(node_id, Some(42));
            }
            _ => panic!("expected ToolApproval"),
        }
    }

    // Breakpoint widens the usize node id to u64 and converts the
    // environment values to JSON.
    #[test]
    fn breakpoint_casts_node_id() {
        let mut env = std::collections::HashMap::new();
        env.insert("x".to_string(), Value::Int(7));
        let evt: WorkflowEvent = EngineEvent::Breakpoint {
            node_id: 3usize,
            span: span(),
            token: "tok".into(),
            env_snapshot: env,
        }
        .into();
        match evt {
            WorkflowEvent::Breakpoint {
                token,
                node_id,
                env,
            } => {
                assert_eq!(token, "tok");
                assert_eq!(node_id, 3u64);
                assert_eq!(env.get("x"), Some(&serde_json::json!(7)));
            }
            _ => panic!("expected Breakpoint"),
        }
    }

    // An engine error becomes Error (Error category) with message and kind
    // preserved.
    #[test]
    fn error_maps_to_error_category() {
        let evt: WorkflowEvent = EngineEvent::error_kind(ErrorKind::ScriptError, "boom").into();
        assert_eq!(evt.category(), EventCategory::Error);
        match evt {
            WorkflowEvent::Error { message, kind, .. } => {
                assert_eq!(message, "boom");
                assert_eq!(kind, ErrorKind::ScriptError);
            }
            _ => panic!("expected Error"),
        }
    }

    // TaskEnd keeps task, duration, usage and the converted variant.
    #[test]
    fn task_end_preserves_usage() {
        let usage = TokenUsage {
            input_tokens: 10,
            output_tokens: 20,
            model: "m".into(),
            provider: "p".into(),
            cached_input_tokens: 0,
            cache_write_input_tokens: 0,
            cache_write_5m_input_tokens: 0,
            cache_write_1h_input_tokens: 0,
            stop_reason: None,
            raw_stop_reason: None,
            reasoning_tokens: 0,
        };
        let evt: WorkflowEvent = EngineEvent::TaskEnd {
            task: "t".into(),
            on_error_label: None,
            value: Value::String("ok".into()),
            value_type: None,
            duration: Duration::from_millis(100),
            attempt: 1,
            usage: Some(usage),
            variant: akribes_types::event::TaskEndVariant::Success,
        }
        .into();
        match evt {
            WorkflowEvent::TaskEnd {
                task,
                usage,
                duration,
                variant,
                ..
            } => {
                assert_eq!(task, "t");
                assert_eq!(duration, Duration::from_millis(100));
                assert_eq!(usage.unwrap().input_tokens, 10);
                assert_eq!(variant, TaskEndVariant::Success);
            }
            _ => panic!("expected TaskEnd"),
        }
    }

    // The Unable task-end variant is carried through the conversion.
    #[test]
    fn task_end_propagates_unable_variant() {
        let evt: WorkflowEvent = EngineEvent::TaskEnd {
            task: "decompose".into(),
            on_error_label: None,
            value: Value::Unable(akribes_types::value::UnableRecord {
                reason: "image too blurry".into(),
                missing: vec![],
                category: akribes_types::value::UnableCategory::InputAmbiguous,
            }),
            value_type: None,
            duration: Duration::from_millis(10),
            attempt: 1,
            usage: None,
            variant: akribes_types::event::TaskEndVariant::Unable,
        }
        .into();
        match evt {
            WorkflowEvent::TaskEnd { variant, .. } => {
                assert_eq!(variant, TaskEndVariant::Unable);
            }
            _ => panic!("expected TaskEnd"),
        }
    }

    /// Converts `evt` and asserts it became `WorkflowEvent::Other` with
    /// `type_name == expected`.
    fn assert_other_named(evt: EngineEvent, expected: &str) {
        let wf: WorkflowEvent = evt.into();
        match wf {
            WorkflowEvent::Other { type_name, .. } => {
                assert_eq!(type_name, expected);
            }
            _ => panic!("expected Other({}), got {:?}", expected, wf),
        }
    }

    // The tests below check that engine events without a dedicated variant
    // fall through to Other under the expected type name.
    #[test]
    fn log_is_other() {
        assert_other_named(EngineEvent::Log("hi".into()), "Log");
    }

    #[test]
    fn state_update_is_other() {
        assert_other_named(
            EngineEvent::StateUpdate("x".into(), Value::Int(1)),
            "StateUpdate",
        );
    }

    #[test]
    fn node_start_end_are_other() {
        assert_other_named(EngineEvent::NodeStart(0, span()), "NodeStart");
        assert_other_named(
            EngineEvent::NodeEnd {
                node_id: 0,
                span: span(),
                target_var: None,
                value: None,
                duration: Duration::ZERO,
            },
            "NodeEnd",
        );
    }

    #[test]
    fn resumed_is_other() {
        assert_other_named(
            EngineEvent::Resumed {
                checkpoint_name: "c".into(),
                token: "t".into(),
            },
            "Resumed",
        );
    }

    #[test]
    fn breakpoint_resumed_is_other() {
        assert_other_named(
            EngineEvent::BreakpointResumed {
                node_id: 1,
                token: "t".into(),
            },
            "BreakpointResumed",
        );
    }

    #[test]
    fn mcp_degraded_recovered_are_other() {
        assert_other_named(
            EngineEvent::McpServerDegraded {
                alias: "a".into(),
                reason: "r".into(),
            },
            "McpServerDegraded",
        );
        assert_other_named(
            EngineEvent::McpServerRecovered { alias: "a".into() },
            "McpServerRecovered",
        );
    }

    #[test]
    fn task_prompt_is_other() {
        assert_other_named(
            EngineEvent::TaskPrompt("t".into(), "p".into()),
            "TaskPrompt",
        );
    }

    #[test]
    fn verification_events_are_other() {
        assert_other_named(
            EngineEvent::VerificationStart {
                workflow_name: "w".into(),
            },
            "VerificationStart",
        );
        assert_other_named(
            EngineEvent::VerificationResult {
                workflow_name: "w".into(),
                results: serde_json::json!({}),
                duration: Duration::ZERO,
            },
            "VerificationResult",
        );
    }

    // The Other payload is the serialized engine event, so it still carries
    // the "type" and "payload" keys.
    #[test]
    fn other_payload_preserves_type_tag() {
        let evt: WorkflowEvent = EngineEvent::Log("hello".into()).into();
        match evt {
            WorkflowEvent::Other { type_name, payload } => {
                assert_eq!(type_name, "Log");
                assert_eq!(payload["type"], "Log");
                assert_eq!(payload["payload"], "hello");
            }
            _ => panic!("expected Other"),
        }
    }

    // ValidationFailure has its own typed variant, and every carried field is
    // copied across.
    #[test]
    fn validation_failure_maps_to_typed_variant() {
        let evt: WorkflowEvent = EngineEvent::ValidationFailure {
            task_name: "decompose".into(),
            attempt: 2,
            model_response: "{}".into(),
            missing_fields: vec!["/claim_text".into()],
            extra_fields: vec![],
            type_errors: vec![],
            stop_reason: Some("max_tokens".into()),
            truncated: false,
            total_length: 2,
        }
        .into();
        match evt {
            WorkflowEvent::ValidationFailure {
                task_name,
                attempt,
                model_response,
                missing_fields,
                extra_fields,
                type_errors,
                stop_reason,
            } => {
                assert_eq!(task_name, "decompose");
                assert_eq!(attempt, 2);
                assert_eq!(model_response, "{}");
                assert_eq!(missing_fields, vec!["/claim_text".to_string()]);
                assert!(extra_fields.is_empty());
                assert!(type_errors.is_empty());
                assert_eq!(stop_reason.as_deref(), Some("max_tokens"));
            }
            other => panic!("expected ValidationFailure, got {:?}", other),
        }
    }

    // ValidationFailure is categorised as Output, not Error.
    #[test]
    fn validation_failure_has_output_category() {
        let evt: WorkflowEvent = EngineEvent::ValidationFailure {
            task_name: "t".into(),
            attempt: 1,
            model_response: "".into(),
            missing_fields: vec![],
            extra_fields: vec![],
            type_errors: vec![],
            stop_reason: None,
            truncated: false,
            total_length: 0,
        }
        .into();
        assert_eq!(evt.category(), EventCategory::Output);
    }

    use crate::runtime::{
        RuntimeEndPayload, RuntimeErrorPayload, RuntimeEvent, RuntimeStartPayload,
        RuntimeStderrPayload, RuntimeStdoutPayload,
    };

    // Table-driven check: each RuntimeEvent variant projects onto the
    // same-named WorkflowEvent variant.
    #[test]
    fn runtime_event_projects_every_variant() {
        let cases: [(RuntimeEvent, fn(&WorkflowEvent) -> bool); 5] = [
            (
                RuntimeEvent::RuntimeStart(RuntimeStartPayload {
                    task_name: "t".into(),
                    runtime_name: "r".into(),
                    language: "python".into(),
                }),
                |e| matches!(e, WorkflowEvent::RuntimeStart { language, .. } if language == "python"),
            ),
            (
                RuntimeEvent::RuntimeStdout(RuntimeStdoutPayload {
                    task_name: "t".into(),
                    chunk: "x".into(),
                }),
                |e| matches!(e, WorkflowEvent::RuntimeStdout { chunk, .. } if chunk == "x"),
            ),
            (
                RuntimeEvent::RuntimeStderr(RuntimeStderrPayload {
                    task_name: "t".into(),
                    chunk: "x".into(),
                }),
                |e| matches!(e, WorkflowEvent::RuntimeStderr { .. }),
            ),
            (
                RuntimeEvent::RuntimeEnd(RuntimeEndPayload {
                    task_name: "t".into(),
                    exit_code: 0,
                    duration_ms: 4242,
                }),
                |e| {
                    matches!(
                        e,
                        WorkflowEvent::RuntimeEnd {
                            duration_ms: 4242,
                            ..
                        }
                    )
                },
            ),
            (
                RuntimeEvent::RuntimeError(RuntimeErrorPayload {
                    task_name: "t".into(),
                    kind: "Timeout".into(),
                    message: "x".into(),
                }),
                |e| matches!(e, WorkflowEvent::RuntimeError { kind, .. } if kind == "Timeout"),
            ),
        ];
        for (input, check) in cases {
            let evt: WorkflowEvent = input.into();
            assert!(check(&evt), "projection failed: {evt:?}");
        }
    }

    // None of the Runtime* variants, including RuntimeError, is terminal.
    #[test]
    fn runtime_events_are_not_terminal() {
        let events = [
            WorkflowEvent::RuntimeStart {
                task_name: "t".into(),
                runtime_name: "r".into(),
                language: "python".into(),
            },
            WorkflowEvent::RuntimeStdout {
                task_name: "t".into(),
                chunk: "x".into(),
            },
            WorkflowEvent::RuntimeStderr {
                task_name: "t".into(),
                chunk: "x".into(),
            },
            WorkflowEvent::RuntimeEnd {
                task_name: "t".into(),
                exit_code: 0,
                duration_ms: 0,
            },
            WorkflowEvent::RuntimeError {
                task_name: "t".into(),
                kind: "Timeout".into(),
                message: "x".into(),
            },
        ];
        for evt in events {
            assert!(!evt.is_terminal(), "{evt:?} should not be terminal");
        }
    }
}