1use std::sync::atomic::{AtomicU64, Ordering};
12use std::sync::Arc;
13use std::time::Instant;
14
15use parking_lot::RwLock; use tokio::sync::broadcast;
17
18use serde::{Deserialize, Serialize};
19use serde_json::Value;
20
21use crate::trace::TraceWriter;
22
/// One entry of the `sources` list carried by [`EventKind::ContextAssembled`].
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct ContextSource {
    /// Name of the node this entry refers to.
    // NOTE(review): the naming scheme for nodes is decided by the emitter, which is not
    // visible in this file — confirm there.
    pub node: String,
    /// Token count recorded for this entry.
    pub tokens: u64,
}
35
/// One entry of the `excluded` list carried by [`EventKind::ContextAssembled`].
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct ExcludedItem {
    /// Name of the node this entry refers to.
    pub node: String,
    /// Free-form text recorded as the reason for the exclusion.
    pub reason: String,
}
44
/// Optional details attached to an [`EventKind::AgentTurn`] event.
///
/// Use [`AgentTurnMetadata::text_only`] or [`AgentTurnMetadata::with_usage`] for the common
/// cases; set `thinking` / `cache_read_tokens` directly when they are needed.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct AgentTurnMetadata {
    /// Thinking text, if any. Left out of the serialized form entirely when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub thinking: Option<String>,

    /// Response text of the turn.
    pub response_text: String,

    /// Input token count (summed with `output_tokens` by `total_tokens`).
    pub input_tokens: u64,

    /// Output token count (summed with `input_tokens` by `total_tokens`).
    pub output_tokens: u64,

    /// Cache-read token count. Not part of `total_tokens`; falls back to `0` when the field
    /// is absent from the input being deserialized.
    #[serde(default)]
    pub cache_read_tokens: u64,

    /// Stop reason reported for the turn, stored as free-form text.
    pub stop_reason: String,
}
86
87impl AgentTurnMetadata {
88 pub fn text_only(response: impl Into<String>, stop_reason: impl Into<String>) -> Self {
90 Self {
91 thinking: None,
92 response_text: response.into(),
93 input_tokens: 0,
94 output_tokens: 0,
95 cache_read_tokens: 0,
96 stop_reason: stop_reason.into(),
97 }
98 }
99
100 pub fn with_usage(
102 response: impl Into<String>,
103 input_tokens: u64,
104 output_tokens: u64,
105 stop_reason: impl Into<String>,
106 ) -> Self {
107 Self {
108 thinking: None,
109 response_text: response.into(),
110 input_tokens,
111 output_tokens,
112 cache_read_tokens: 0,
113 stop_reason: stop_reason.into(),
114 }
115 }
116
117 pub fn total_tokens(&self) -> u64 {
119 self.input_tokens + self.output_tokens
120 }
121
122 pub fn has_thinking(&self) -> bool {
124 self.thinking.is_some()
125 }
126}
127
/// A single record in an [`EventLog`]: an id, a timestamp and the event payload.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Event {
    /// Identifier of the event. `EventLog::emit` hands these out from a counter that starts
    /// at 0 and grows by one per emitted event.
    pub id: u64,
    /// Milliseconds elapsed since the owning log was created, as measured by
    /// `EventLog::emit` (relative time, not wall-clock time).
    pub timestamp_ms: u64,
    /// What happened.
    pub kind: EventKind,
}
138
139#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
143#[serde(tag = "type", rename_all = "snake_case")]
144pub enum EventKind {
145 WorkflowStarted {
149 task_count: usize,
150 generation_id: String,
152 workflow_hash: String,
154 nika_version: String,
156 },
157 WorkflowCompleted {
158 final_output: Arc<Value>,
159 total_duration_ms: u64,
160 },
161 WorkflowFailed {
162 error: String,
163 failed_task: Option<Arc<str>>,
164 },
165 WorkflowAborted {
167 reason: String,
169 duration_ms: u64,
171 running_tasks: Vec<Arc<str>>,
173 },
174 WorkflowPaused,
176 WorkflowResumed,
178
179 TaskScheduled {
183 task_id: Arc<str>,
184 dependencies: Vec<Arc<str>>,
185 },
186 TaskStarted {
188 task_id: Arc<str>,
189 verb: Arc<str>,
191 inputs: Value,
193 },
194 TaskCompleted {
195 task_id: Arc<str>,
196 output: Arc<Value>,
197 duration_ms: u64,
198 },
199 TaskFailed {
200 task_id: Arc<str>,
201 error: String,
202 duration_ms: u64,
203 #[serde(skip_serializing_if = "Option::is_none")]
205 error_code: Option<String>,
206 },
207 TaskSkipped {
209 task_id: Arc<str>,
210 reason: String,
212 },
213
214 TemplateResolved {
218 task_id: Arc<str>,
219 template: String,
220 result: String,
221 },
222 ProviderCalled {
223 task_id: Arc<str>,
224 provider: String,
225 model: String,
226 prompt_len: usize,
227 },
228 ProviderResponded {
229 task_id: Arc<str>,
230 request_id: Option<String>,
232 input_tokens: u64,
234 output_tokens: u64,
236 cache_read_tokens: u64,
238 ttft_ms: Option<u64>,
240 finish_reason: String,
242 cost_usd: f64,
244 },
245
246 ContextAssembled {
251 task_id: Arc<str>,
252 sources: Vec<ContextSource>,
254 excluded: Vec<ExcludedItem>,
256 total_tokens: u64,
258 budget_used_pct: f32,
260 truncated: bool,
262 },
263
264 McpInvoke {
269 task_id: Arc<str>,
270 call_id: String,
272 mcp_server: String,
273 tool: Option<String>,
274 resource: Option<String>,
275 #[serde(skip_serializing_if = "Option::is_none")]
277 params: Option<Value>,
278 },
279 McpResponse {
281 task_id: Arc<str>,
282 call_id: String,
284 output_len: usize,
285 duration_ms: u64,
287 cached: bool,
289 is_error: bool,
291 #[serde(skip_serializing_if = "Option::is_none")]
293 response: Option<Value>,
294 },
295 McpConnected {
297 server_name: String,
299 },
300 McpError {
302 server_name: String,
304 error: String,
306 },
307 McpRetry {
313 task_id: Arc<str>,
315 server_name: String,
317 operation: String,
319 attempt: u32,
321 max_attempts: u32,
323 error: String,
325 },
326
327 AgentStart {
332 task_id: Arc<str>,
333 max_turns: u32,
334 mcp_servers: Vec<String>,
335 },
336 AgentTurn {
344 task_id: Arc<str>,
345 turn_index: u32,
346 kind: String,
348 #[serde(skip_serializing_if = "Option::is_none")]
350 metadata: Option<AgentTurnMetadata>,
351 },
352 AgentComplete {
354 task_id: Arc<str>,
355 turns: u32,
356 stop_reason: String,
357 },
358
359 AgentSpawned {
364 parent_task_id: Arc<str>,
366 child_task_id: Arc<str>,
368 depth: u32,
370 },
371
372 GuardrailPassed {
377 task_id: Arc<str>,
379 guardrail_type: String,
381 description: String,
383 },
384 GuardrailFailed {
386 task_id: Arc<str>,
388 guardrail_type: String,
390 description: String,
392 message: String,
394 },
395 GuardrailEscalation {
400 task_id: Arc<str>,
402 guardrail_type: String,
404 guardrail_id: String,
406 message: String,
408 severity: String,
410 #[serde(skip_serializing_if = "Option::is_none")]
412 suggested_action: Option<String>,
413 },
414
415 Log {
420 level: String,
422 message: String,
424 #[serde(skip_serializing_if = "Option::is_none")]
426 task_id: Option<Arc<str>>,
427 },
428
429 Custom {
431 name: String,
433 payload: Value,
435 #[serde(skip_serializing_if = "Option::is_none")]
437 task_id: Option<Arc<str>>,
438 },
439
440 ArtifactWritten {
445 task_id: Arc<str>,
447 path: String,
449 size: u64,
451 format: String,
453 #[serde(skip_serializing_if = "Option::is_none")]
455 checksum: Option<String>,
456 },
457 ArtifactFailed {
459 task_id: Arc<str>,
461 path: String,
463 reason: String,
465 },
466
467 MediaExtracted {
472 task_id: Arc<str>,
473 block_count: u32,
475 content_types: Vec<String>,
477 },
478
479 MediaProcessed {
481 task_id: Arc<str>,
482 hash: String,
484 mime_type: String,
486 size_bytes: u64,
488 },
489
490 MediaStored {
492 task_id: Arc<str>,
493 hash: String,
495 path: String,
497 size_bytes: u64,
499 verified: bool,
501 deduplicated: bool,
503 pipeline_ms: u64,
505 },
506
507 MediaStoreFailed {
509 task_id: Arc<str>,
510 hash: String,
512 reason: String,
514 },
515
516 MediaIntegrityCheck {
518 checked: u64,
520 warnings: u64,
522 },
523
524 StructuredOutputAttempt {
535 task_id: Arc<str>,
537 layer: u8,
539 layer_name: String,
541 attempt: u32,
543 success: bool,
545 #[serde(skip_serializing_if = "Option::is_none")]
547 error: Option<String>,
548 },
549 StructuredOutputSuccess {
553 task_id: Arc<str>,
555 layer: u8,
557 layer_name: String,
559 total_attempts: u32,
561 },
562
563 VisionContentResolved {
571 task_id: Arc<str>,
573 image_count: u32,
575 total_bytes: u64,
577 resolve_ms: u64,
579 },
580
581 HttpRequest {
586 task_id: Arc<str>,
587 method: String,
588 url: String,
589 has_body: bool,
590 },
591 HttpResponse {
593 task_id: Arc<str>,
594 status_code: u16,
595 content_type: Option<String>,
596 content_length: Option<u64>,
597 elapsed_ms: u64,
598 },
599
600 MediaCleanup {
605 removed: u64,
607 bytes_freed: u64,
609 dry_run: bool,
611 },
612
613 ExecCompleted {
618 task_id: Arc<str>,
619 exit_code: i32,
621 stdout_len: usize,
623 stderr_len: usize,
625 duration_ms: u64,
627 },
628
629 FetchRetry {
634 task_id: Arc<str>,
635 url: String,
637 attempt: u32,
639 max_attempts: u32,
641 #[serde(skip_serializing_if = "Option::is_none")]
643 status_code: Option<u16>,
644 backoff_ms: u64,
646 },
647
648 PolicyBlocked {
653 task_id: Arc<str>,
654 verb: String,
656 policy_type: String,
658 reason: String,
660 },
661
662 BootPhaseCompleted {
667 phase: String,
670 success: bool,
672 duration_ms: u64,
674 #[serde(skip_serializing_if = "Vec::is_empty")]
676 warnings: Vec<String>,
677 },
678
679 NativeModelLoaded {
681 model: String,
683 kind: String,
685 size_bytes: u64,
687 duration_ms: u64,
689 is_vision: bool,
691 },
692
693 BindingDefaultApplied {
698 task_id: Arc<str>,
699 alias: String,
701 path: String,
703 default_value: Value,
705 },
706
707 BindingTransformApplied {
709 task_id: Arc<str>,
710 alias: String,
712 transform_chain: String,
714 },
715
716 BindingEnvResolved {
718 task_id: Arc<str>,
719 var_name: String,
721 found: bool,
723 },
724
725 DecomposeStarted {
730 task_id: Arc<str>,
731 strategy: String,
733 },
734
735 DecomposeCompleted {
737 task_id: Arc<str>,
738 strategy: String,
740 item_count: usize,
742 duration_ms: u64,
744 },
745
746 ForEachStarted {
748 task_id: Arc<str>,
749 item_count: usize,
751 concurrency: usize,
753 fail_fast: bool,
755 },
756
757 ForEachCompleted {
759 task_id: Arc<str>,
760 total: u32,
762 succeeded: u32,
764 failed: u32,
766 skipped: u32,
768 duration_ms: u64,
770 },
771
772 ProviderInitialized {
777 provider: String,
779 model: String,
781 cached: bool,
783 },
784
785 BuiltinToolInvoked {
787 task_id: Arc<str>,
788 tool_name: String,
790 duration_ms: u64,
792 success: bool,
794 },
795
796 ExtractApplied {
801 task_id: Arc<str>,
802 mode: String,
804 #[serde(skip_serializing_if = "Option::is_none")]
806 selector: Option<String>,
807 input_len: usize,
809 output_len: usize,
811 },
812}
813
814impl EventKind {
815 pub fn task_id(&self) -> Option<&str> {
817 match self {
818 Self::TaskScheduled { task_id, .. }
819 | Self::TaskStarted { task_id, .. }
820 | Self::TaskCompleted { task_id, .. }
821 | Self::TaskFailed { task_id, .. }
822 | Self::TaskSkipped { task_id, .. }
823 | Self::TemplateResolved { task_id, .. }
824 | Self::ProviderCalled { task_id, .. }
825 | Self::ProviderResponded { task_id, .. }
826 | Self::ContextAssembled { task_id, .. }
827 | Self::McpInvoke { task_id, .. }
828 | Self::McpResponse { task_id, .. }
829 | Self::McpRetry { task_id, .. } | Self::AgentStart { task_id, .. }
831 | Self::AgentTurn { task_id, .. }
832 | Self::AgentComplete { task_id, .. }
833 | Self::ArtifactWritten { task_id, .. }
834 | Self::ArtifactFailed { task_id, .. }
835 | Self::VisionContentResolved { task_id, .. }
836 | Self::MediaExtracted { task_id, .. }
837 | Self::MediaProcessed { task_id, .. }
838 | Self::MediaStored { task_id, .. }
839 | Self::MediaStoreFailed { task_id, .. }
840 | Self::StructuredOutputAttempt { task_id, .. }
841 | Self::StructuredOutputSuccess { task_id, .. }
842 | Self::HttpRequest { task_id, .. }
843 | Self::HttpResponse { task_id, .. }
844 | Self::GuardrailPassed { task_id, .. }
845 | Self::GuardrailFailed { task_id, .. }
846 | Self::GuardrailEscalation { task_id, .. }
847 | Self::ExecCompleted { task_id, .. }
848 | Self::FetchRetry { task_id, .. }
849 | Self::PolicyBlocked { task_id, .. }
850 | Self::BindingDefaultApplied { task_id, .. }
851 | Self::BindingTransformApplied { task_id, .. }
852 | Self::BindingEnvResolved { task_id, .. }
853 | Self::DecomposeStarted { task_id, .. }
854 | Self::DecomposeCompleted { task_id, .. }
855 | Self::ForEachStarted { task_id, .. }
856 | Self::ForEachCompleted { task_id, .. }
857 | Self::BuiltinToolInvoked { task_id, .. }
858 | Self::ExtractApplied { task_id, .. } => Some(task_id),
859 Self::AgentSpawned { parent_task_id, .. } => Some(parent_task_id),
861 Self::Log { task_id, .. } | Self::Custom { task_id, .. } => {
863 task_id.as_ref().map(|s| s.as_ref())
864 }
865 Self::WorkflowStarted { .. }
866 | Self::WorkflowCompleted { .. }
867 | Self::WorkflowFailed { .. }
868 | Self::WorkflowAborted { .. }
869 | Self::WorkflowPaused
870 | Self::WorkflowResumed
871 | Self::McpConnected { .. }
872 | Self::McpError { .. }
873 | Self::MediaCleanup { .. }
874 | Self::MediaIntegrityCheck { .. }
875 | Self::BootPhaseCompleted { .. }
876 | Self::NativeModelLoaded { .. }
877 | Self::ProviderInitialized { .. } => None,
878 }
879 }
880
881 pub fn is_workflow_event(&self) -> bool {
883 matches!(
884 self,
885 Self::WorkflowStarted { .. }
886 | Self::WorkflowCompleted { .. }
887 | Self::WorkflowFailed { .. }
888 | Self::WorkflowAborted { .. }
889 | Self::WorkflowPaused
890 | Self::WorkflowResumed
891 )
892 }
893}
894
/// Size threshold for the in-memory event buffer: when `EventLog::emit` finds this many
/// events already stored, it discards the older half before appending the new one.
const MAX_EVENTS: usize = 10_000;
906
/// Cloneable handle to an in-memory event log.
///
/// Clones share the same event buffer and id counter (both live behind `Arc`), so an event
/// emitted through one handle is visible through every other handle.
#[derive(Clone)]
pub struct EventLog {
    /// Stored events, shared between clones.
    events: Arc<RwLock<Vec<Event>>>,
    /// Creation time of the log; `emit` stamps each event with the time elapsed since then.
    start_time: Instant,
    /// Source of event ids, shared between clones; starts at 0.
    next_id: Arc<AtomicU64>,
    /// Sender half of the broadcast channel; `None` for logs created with `new`.
    broadcast_tx: Option<broadcast::Sender<Event>>,
    /// Trace writer that `emit` forwards every event to, when one is attached.
    trace_writer: Option<Arc<TraceWriter>>,
}
919
920impl EventLog {
921 pub fn new() -> Self {
923 Self {
924 events: Arc::new(RwLock::new(Vec::with_capacity(256))),
925 start_time: Instant::now(),
926 next_id: Arc::new(AtomicU64::new(0)),
927 broadcast_tx: None,
928 trace_writer: None,
929 }
930 }
931
932 pub fn new_with_broadcast() -> (Self, broadcast::Receiver<Event>) {
937 let (tx, rx) = broadcast::channel(512);
938 let event_log = Self {
939 events: Arc::new(RwLock::new(Vec::with_capacity(256))),
940 start_time: Instant::now(),
941 next_id: Arc::new(AtomicU64::new(0)),
942 broadcast_tx: Some(tx),
943 trace_writer: None,
944 };
945 (event_log, rx)
946 }
947
948 pub fn subscribe(&self) -> Option<broadcast::Receiver<Event>> {
952 self.broadcast_tx.as_ref().map(|tx| tx.subscribe())
953 }
954
955 pub fn attach_trace_writer(&mut self, writer: TraceWriter) {
961 self.trace_writer = Some(Arc::new(writer));
962 }
963
964 pub fn emit(&self, kind: EventKind) -> u64 {
970 let id = self.next_id.fetch_add(1, Ordering::Relaxed);
973 let event = Event {
974 id,
975 timestamp_ms: self.start_time.elapsed().as_millis() as u64,
976 kind,
977 };
978
979 let needs_broadcast = self.broadcast_tx.is_some();
983 let needs_trace = self.trace_writer.is_some();
984
985 if needs_broadcast || needs_trace {
986 let for_vec = event.clone();
988
989 if let Some(ref writer) = self.trace_writer {
990 let _ = writer.append_event(&event);
991 }
992
993 if let Some(ref tx) = self.broadcast_tx {
994 let _ = tx.send(event);
995 } else {
996 drop(event); }
998
999 let mut events = self.events.write();
1000 if events.len() >= MAX_EVENTS {
1001 let drain_to = events.len() / 2;
1003 events.drain(..drain_to);
1004 }
1005 events.push(for_vec);
1006 } else {
1007 let mut events = self.events.write();
1009 if events.len() >= MAX_EVENTS {
1010 let drain_to = events.len() / 2;
1011 events.drain(..drain_to);
1012 }
1013 events.push(event);
1014 }
1015
1016 id
1017 }
1018
1019 pub fn events(&self) -> Vec<Event> {
1021 self.events.read().clone()
1022 }
1023
1024 pub fn with_events<T>(&self, f: impl FnOnce(&[Event]) -> T) -> T {
1029 f(&self.events.read())
1030 }
1031
1032 pub fn filter_task(&self, task_id: &str) -> Vec<Event> {
1034 self.with_events(|events| {
1035 events
1036 .iter()
1037 .filter(|e| e.kind.task_id() == Some(task_id))
1038 .cloned()
1039 .collect()
1040 })
1041 }
1042
1043 pub fn workflow_events(&self) -> Vec<Event> {
1045 self.with_events(|events| {
1046 events
1047 .iter()
1048 .filter(|e| e.kind.is_workflow_event())
1049 .cloned()
1050 .collect()
1051 })
1052 }
1053
1054 pub fn count_task(&self, task_id: &str) -> usize {
1056 self.with_events(|events| {
1057 events
1058 .iter()
1059 .filter(|e| e.kind.task_id() == Some(task_id))
1060 .count()
1061 })
1062 }
1063
1064 pub fn to_json(&self) -> Value {
1066 self.with_events(|events| serde_json::to_value(events).unwrap_or(Value::Null))
1067 }
1068
1069 pub fn events_since(&self, since_id: Option<u64>) -> Vec<Event> {
1070 self.with_events(|events| {
1071 events
1072 .iter()
1073 .filter(|e| since_id.is_none_or(|last| e.id > last))
1074 .cloned()
1075 .collect()
1076 })
1077 }
1078
1079 pub fn with_events_since<R>(&self, since_id: Option<u64>, f: impl FnOnce(&[Event]) -> R) -> R {
1085 self.with_events(|events| {
1086 if let Some(last_id) = since_id {
1087 let start = events.partition_point(|e| e.id <= last_id);
1088 f(&events[start..])
1089 } else {
1090 f(events)
1091 }
1092 })
1093 }
1094
1095 pub fn len(&self) -> usize {
1097 self.events.read().len()
1098 }
1099
1100 pub fn is_empty(&self) -> bool {
1102 self.len() == 0
1103 }
1104}
1105
1106impl Default for EventLog {
1107 fn default() -> Self {
1108 Self::new()
1109 }
1110}
1111
1112impl std::fmt::Debug for EventLog {
1113 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1114 f.debug_struct("EventLog")
1115 .field("len", &self.len())
1116 .finish()
1117 }
1118}
1119
1120#[cfg(test)]
1121mod tests {
1122 use super::*;
1123 use serde_json::json;
1124
1125 const TEST_VERSION: &str = env!("CARGO_PKG_VERSION");
1127
1128 fn workflow_started(task_count: usize) -> EventKind {
1134 EventKind::WorkflowStarted {
1135 task_count,
1136 generation_id: "test-gen-123".to_string(),
1137 workflow_hash: "abc123".to_string(),
1138 nika_version: TEST_VERSION.to_string(),
1139 }
1140 }
1141
1142 fn provider_responded(task_id: &str, input_tokens: u64, output_tokens: u64) -> EventKind {
1144 EventKind::ProviderResponded {
1145 task_id: Arc::from(task_id),
1146 request_id: Some("req-456".to_string()),
1147 input_tokens,
1148 output_tokens,
1149 cache_read_tokens: 0,
1150 ttft_ms: Some(150),
1151 finish_reason: "stop".to_string(),
1152 cost_usd: 0.001,
1153 }
1154 }
1155
1156 #[test]
1161 fn eventkind_task_id_extraction() {
1162 let started = EventKind::TaskStarted {
1163 verb: "infer".into(),
1164 task_id: "task1".into(),
1165 inputs: json!({}),
1166 };
1167 assert_eq!(started.task_id(), Some("task1"));
1168
1169 let workflow = workflow_started(5);
1170 assert_eq!(workflow.task_id(), None);
1171 }
1172
1173 #[test]
1174 fn eventkind_is_workflow_event() {
1175 assert!(workflow_started(3).is_workflow_event());
1176 assert!(EventKind::WorkflowCompleted {
1177 final_output: Arc::new(json!("done")),
1178 total_duration_ms: 1000,
1179 }
1180 .is_workflow_event());
1181 assert!(!EventKind::TaskStarted {
1182 verb: "infer".into(),
1183 task_id: "t1".into(),
1184 inputs: json!({}),
1185 }
1186 .is_workflow_event());
1187 }
1188
1189 #[test]
1190 fn eventkind_serializes_with_type_tag() {
1191 let kind = EventKind::TaskCompleted {
1192 task_id: "greet".into(),
1193 output: Arc::new(json!({"message": "Hello"})),
1194 duration_ms: 150,
1195 };
1196
1197 let json = serde_json::to_value(&kind).unwrap();
1198 assert_eq!(json["type"], "task_completed");
1199 assert_eq!(json["task_id"], "greet");
1200 assert_eq!(json["output"]["message"], "Hello");
1201 }
1202
1203 #[test]
1204 fn eventkind_deserializes_from_tagged_json() {
1205 let json = json!({
1206 "type": "task_started",
1207 "task_id": "analyze",
1208 "verb": "infer",
1209 "inputs": {"weather": "sunny"}
1210 });
1211
1212 let kind: EventKind = serde_json::from_value(json).unwrap();
1213 assert_eq!(
1214 kind,
1215 EventKind::TaskStarted {
1216 task_id: "analyze".into(),
1217 verb: "infer".into(),
1218 inputs: json!({"weather": "sunny"}),
1219 }
1220 );
1221 }
1222
1223 #[test]
1228 fn eventlog_new_starts_empty() {
1229 let log = EventLog::new();
1230 assert!(log.is_empty());
1231 assert_eq!(log.len(), 0);
1232 }
1233
1234 #[test]
1235 fn eventlog_emit_returns_monotonic_ids() {
1236 let log = EventLog::new();
1237
1238 let id1 = log.emit(workflow_started(3));
1239 let id2 = log.emit(EventKind::TaskStarted {
1240 verb: "infer".into(),
1241 task_id: "t1".into(),
1242 inputs: json!({}),
1243 });
1244 let id3 = log.emit(EventKind::TaskStarted {
1245 verb: "infer".into(),
1246 task_id: "t2".into(),
1247 inputs: json!({}),
1248 });
1249
1250 assert_eq!(id1, 0);
1251 assert_eq!(id2, 1);
1252 assert_eq!(id3, 2);
1253 assert_eq!(log.len(), 3);
1254 }
1255
1256 #[test]
1257 fn eventlog_events_returns_all() {
1258 let log = EventLog::new();
1259 log.emit(workflow_started(2));
1260 log.emit(EventKind::TaskStarted {
1261 verb: "infer".into(),
1262 task_id: "t1".into(),
1263 inputs: json!({}),
1264 });
1265
1266 let events = log.events();
1267 assert_eq!(events.len(), 2);
1268 assert_eq!(events[0].id, 0);
1269 assert_eq!(events[1].id, 1);
1270 }
1271
1272 #[test]
1273 fn eventlog_filter_task_returns_only_matching() {
1274 let log = EventLog::new();
1275 log.emit(workflow_started(2));
1276 log.emit(EventKind::TaskStarted {
1277 verb: "infer".into(),
1278 task_id: "alpha".into(),
1279 inputs: json!({}),
1280 });
1281 log.emit(EventKind::TaskStarted {
1282 verb: "infer".into(),
1283 task_id: "beta".into(),
1284 inputs: json!({}),
1285 });
1286 log.emit(EventKind::TaskCompleted {
1287 task_id: "alpha".into(),
1288 output: Arc::new(json!("result")),
1289 duration_ms: 100,
1290 });
1291
1292 let alpha_events = log.filter_task("alpha");
1293 assert_eq!(alpha_events.len(), 2); assert!(alpha_events
1295 .iter()
1296 .all(|e| e.kind.task_id() == Some("alpha")));
1297
1298 let beta_events = log.filter_task("beta");
1299 assert_eq!(beta_events.len(), 1);
1300 }
1301
1302 #[test]
1303 fn eventlog_workflow_events_returns_only_workflow() {
1304 let log = EventLog::new();
1305 log.emit(workflow_started(1));
1306 log.emit(EventKind::TaskStarted {
1307 verb: "infer".into(),
1308 task_id: "t1".into(),
1309 inputs: json!({}),
1310 });
1311 log.emit(EventKind::WorkflowCompleted {
1312 final_output: Arc::new(json!("done")),
1313 total_duration_ms: 500,
1314 });
1315
1316 let wf_events = log.workflow_events();
1317 assert_eq!(wf_events.len(), 2);
1318 assert!(wf_events.iter().all(|e| e.kind.is_workflow_event()));
1319 }
1320
1321 #[test]
1322 fn eventlog_to_json() {
1323 let log = EventLog::new();
1324 log.emit(EventKind::TaskStarted {
1325 verb: "infer".into(),
1326 task_id: "task1".into(),
1327 inputs: json!({}),
1328 });
1329
1330 let json = log.to_json();
1331 assert!(json.is_array());
1332 assert_eq!(json.as_array().unwrap().len(), 1);
1333 assert_eq!(json[0]["kind"]["type"], "task_started");
1334 }
1335
1336 #[test]
1337 fn eventlog_is_clone() {
1338 let log = EventLog::new();
1339 log.emit(workflow_started(1));
1340
1341 let cloned = log.clone();
1342 assert_eq!(cloned.len(), 1);
1343
1344 log.emit(EventKind::TaskStarted {
1346 verb: "infer".into(),
1347 task_id: "t1".into(),
1348 inputs: json!({}),
1349 });
1350 assert_eq!(cloned.len(), 2);
1351 }
1352
1353 #[test]
1354 fn eventlog_thread_safe_concurrent_emits() {
1355 use std::thread;
1356
1357 let log = EventLog::new();
1358
1359 let handles: Vec<_> = (0..10)
1360 .map(|i| {
1361 let log = log.clone();
1362 thread::spawn(move || {
1363 log.emit(EventKind::TaskStarted {
1364 verb: "infer".into(),
1365 task_id: Arc::from(format!("task{}", i)),
1366 inputs: json!({}),
1367 })
1368 })
1369 })
1370 .collect();
1371
1372 for h in handles {
1373 h.join().unwrap();
1374 }
1375
1376 assert_eq!(log.len(), 10);
1377
1378 let events = log.events();
1380 let mut ids: Vec<u64> = events.iter().map(|e| e.id).collect();
1381 ids.sort();
1382 ids.dedup();
1383 assert_eq!(ids.len(), 10);
1384 }
1385
1386 #[test]
1387 fn event_timestamp_is_relative() {
1388 let log = EventLog::new();
1389
1390 log.emit(workflow_started(1));
1392
1393 std::thread::sleep(std::time::Duration::from_millis(10));
1394
1395 log.emit(EventKind::TaskStarted {
1396 verb: "infer".into(),
1397 task_id: "t1".into(),
1398 inputs: json!({}),
1399 });
1400
1401 let events = log.events();
1402 assert!(events[1].timestamp_ms >= events[0].timestamp_ms);
1403 }
1404
1405 #[test]
1410 fn task_started_captures_full_context() {
1411 let log = EventLog::new();
1412
1413 let inputs = json!({
1414 "weather": "sunny",
1415 "temperature": 25,
1416 "nested": {"key": "value"}
1417 });
1418
1419 log.emit(EventKind::TaskStarted {
1420 verb: "infer".into(),
1421 task_id: "analyze".into(),
1422 inputs: inputs.clone(),
1423 });
1424
1425 let events = log.filter_task("analyze");
1426 assert_eq!(events.len(), 1);
1427
1428 if let EventKind::TaskStarted {
1429 inputs: captured, ..
1430 } = &events[0].kind
1431 {
1432 assert_eq!(captured, &inputs);
1433 assert_eq!(captured["weather"], "sunny");
1434 assert_eq!(captured["nested"]["key"], "value");
1435 } else {
1436 panic!("Expected TaskStarted event");
1437 }
1438 }
1439
1440 #[test]
1445 fn workflow_started_includes_generation_id() {
1446 let log = EventLog::new();
1447 log.emit(EventKind::WorkflowStarted {
1448 task_count: 3,
1449 generation_id: "gen-abc-123".to_string(),
1450 workflow_hash: "sha256:deadbeef".to_string(),
1451 nika_version: TEST_VERSION.to_string(),
1452 });
1453
1454 let events = log.events();
1455 if let EventKind::WorkflowStarted {
1456 generation_id,
1457 workflow_hash,
1458 nika_version,
1459 ..
1460 } = &events[0].kind
1461 {
1462 assert_eq!(generation_id, "gen-abc-123");
1463 assert_eq!(workflow_hash, "sha256:deadbeef");
1464 assert_eq!(nika_version, TEST_VERSION);
1465 } else {
1466 panic!("Expected WorkflowStarted event");
1467 }
1468 }
1469
1470 #[test]
1471 fn provider_responded_tracks_detailed_tokens() {
1472 let log = EventLog::new();
1473 log.emit(EventKind::ProviderResponded {
1474 task_id: "infer_task".into(),
1475 request_id: Some("req-xyz-789".to_string()),
1476 input_tokens: 500,
1477 output_tokens: 150,
1478 cache_read_tokens: 200,
1479 ttft_ms: Some(85),
1480 finish_reason: "stop".to_string(),
1481 cost_usd: 0.0025,
1482 });
1483
1484 let events = log.filter_task("infer_task");
1485 assert_eq!(events.len(), 1);
1486
1487 if let EventKind::ProviderResponded {
1488 request_id,
1489 input_tokens,
1490 output_tokens,
1491 cache_read_tokens,
1492 ttft_ms,
1493 finish_reason,
1494 cost_usd,
1495 ..
1496 } = &events[0].kind
1497 {
1498 assert_eq!(request_id, &Some("req-xyz-789".to_string()));
1499 assert_eq!(*input_tokens, 500);
1500 assert_eq!(*output_tokens, 150);
1501 assert_eq!(*cache_read_tokens, 200);
1502 assert_eq!(*ttft_ms, Some(85));
1503 assert_eq!(finish_reason, "stop");
1504 assert!((*cost_usd - 0.0025).abs() < f64::EPSILON);
1505 } else {
1506 panic!("Expected ProviderResponded event");
1507 }
1508 }
1509
1510 #[test]
1511 fn context_assembled_tracks_sources() {
1512 let log = EventLog::new();
1513
1514 let sources = vec![
1515 ContextSource {
1516 node: "system_prompt".to_string(),
1517 tokens: 200,
1518 },
1519 ContextSource {
1520 node: "user_input".to_string(),
1521 tokens: 50,
1522 },
1523 ContextSource {
1524 node: "examples".to_string(),
1525 tokens: 300,
1526 },
1527 ];
1528
1529 let excluded = vec![ExcludedItem {
1530 node: "large_history".to_string(),
1531 reason: "exceeded budget".to_string(),
1532 }];
1533
1534 log.emit(EventKind::ContextAssembled {
1535 task_id: "assemble_task".into(),
1536 sources: sources.clone(),
1537 excluded: excluded.clone(),
1538 total_tokens: 550,
1539 budget_used_pct: 55.0,
1540 truncated: false,
1541 });
1542
1543 let events = log.filter_task("assemble_task");
1544 assert_eq!(events.len(), 1);
1545
1546 if let EventKind::ContextAssembled {
1547 sources: s,
1548 excluded: e,
1549 total_tokens,
1550 budget_used_pct,
1551 truncated,
1552 ..
1553 } = &events[0].kind
1554 {
1555 assert_eq!(s.len(), 3);
1556 assert_eq!(s[0].node, "system_prompt");
1557 assert_eq!(s[0].tokens, 200);
1558 assert_eq!(e.len(), 1);
1559 assert_eq!(e[0].reason, "exceeded budget");
1560 assert_eq!(*total_tokens, 550);
1561 assert!((*budget_used_pct - 55.0).abs() < f32::EPSILON);
1562 assert!(!*truncated);
1563 } else {
1564 panic!("Expected ContextAssembled event");
1565 }
1566 }
1567
1568 #[test]
1569 fn context_source_and_excluded_item_serialize() {
1570 let source = ContextSource {
1571 node: "test_node".to_string(),
1572 tokens: 100,
1573 };
1574 let json = serde_json::to_value(&source).unwrap();
1575 assert_eq!(json["node"], "test_node");
1576 assert_eq!(json["tokens"], 100);
1577
1578 let excluded = ExcludedItem {
1579 node: "big_file".to_string(),
1580 reason: "too large".to_string(),
1581 };
1582 let json = serde_json::to_value(&excluded).unwrap();
1583 assert_eq!(json["node"], "big_file");
1584 assert_eq!(json["reason"], "too large");
1585 }
1586
1587 #[test]
1588 fn provider_responded_helper_creates_valid_event() {
1589 let event = provider_responded("test_task", 100, 50);
1590 assert_eq!(event.task_id(), Some("test_task"));
1591
1592 if let EventKind::ProviderResponded {
1593 input_tokens,
1594 output_tokens,
1595 ..
1596 } = event
1597 {
1598 assert_eq!(input_tokens, 100);
1599 assert_eq!(output_tokens, 50);
1600 } else {
1601 panic!("Expected ProviderResponded event");
1602 }
1603 }
1604
1605 #[test]
1610 fn agent_turn_metadata_text_only() {
1611 let metadata = AgentTurnMetadata::text_only("Hello world", "end_turn");
1612
1613 assert_eq!(metadata.response_text, "Hello world");
1614 assert_eq!(metadata.stop_reason, "end_turn");
1615 assert_eq!(metadata.input_tokens, 0);
1616 assert_eq!(metadata.output_tokens, 0);
1617 assert_eq!(metadata.cache_read_tokens, 0);
1618 assert!(!metadata.has_thinking());
1619 assert_eq!(metadata.total_tokens(), 0);
1620 }
1621
1622 #[test]
1623 fn agent_turn_metadata_with_usage() {
1624 let metadata = AgentTurnMetadata::with_usage("Response", 100, 50, "tool_use");
1625
1626 assert_eq!(metadata.response_text, "Response");
1627 assert_eq!(metadata.stop_reason, "tool_use");
1628 assert_eq!(metadata.input_tokens, 100);
1629 assert_eq!(metadata.output_tokens, 50);
1630 assert_eq!(metadata.total_tokens(), 150);
1631 assert!(!metadata.has_thinking());
1632 }
1633
1634 #[test]
1635 fn agent_turn_metadata_with_thinking() {
1636 let metadata = AgentTurnMetadata {
1637 thinking: Some("Let me think about this...".to_string()),
1638 response_text: "Here's my answer".to_string(),
1639 input_tokens: 200,
1640 output_tokens: 100,
1641 cache_read_tokens: 50,
1642 stop_reason: "end_turn".to_string(),
1643 };
1644
1645 assert!(metadata.has_thinking());
1646 assert_eq!(
1647 metadata.thinking.as_ref().unwrap(),
1648 "Let me think about this..."
1649 );
1650 assert_eq!(metadata.total_tokens(), 300);
1651 }
1652
1653 #[test]
1654 fn agent_turn_metadata_serializes() {
1655 let metadata = AgentTurnMetadata::with_usage("Test response", 100, 50, "end_turn");
1656 let json = serde_json::to_value(&metadata).unwrap();
1657
1658 assert_eq!(json["response_text"], "Test response");
1659 assert_eq!(json["input_tokens"], 100);
1660 assert_eq!(json["output_tokens"], 50);
1661 assert_eq!(json["stop_reason"], "end_turn");
1662 assert!(json.get("thinking").is_none());
1664 }
1665
1666 #[test]
1667 fn agent_turn_metadata_with_thinking_serializes() {
1668 let metadata = AgentTurnMetadata {
1669 thinking: Some("My thoughts".to_string()),
1670 response_text: "My response".to_string(),
1671 input_tokens: 50,
1672 output_tokens: 25,
1673 cache_read_tokens: 0,
1674 stop_reason: "end_turn".to_string(),
1675 };
1676 let json = serde_json::to_value(&metadata).unwrap();
1677
1678 assert_eq!(json["thinking"], "My thoughts");
1679 assert_eq!(json["response_text"], "My response");
1680 }
1681
1682 #[test]
1683 fn agent_turn_with_metadata_serializes() {
1684 let log = EventLog::new();
1685
1686 let metadata = AgentTurnMetadata::with_usage("Agent response", 100, 50, "end_turn");
1687
1688 log.emit(EventKind::AgentTurn {
1689 task_id: "agent_task".into(),
1690 turn_index: 1,
1691 kind: "end_turn".to_string(), metadata: Some(metadata),
1693 });
1694
1695 let events = log.filter_task("agent_task");
1696 assert_eq!(events.len(), 1);
1697
1698 if let EventKind::AgentTurn {
1699 metadata: Some(m), ..
1700 } = &events[0].kind
1701 {
1702 assert_eq!(m.response_text, "Agent response");
1703 assert_eq!(m.total_tokens(), 150);
1704 } else {
1705 panic!("Expected AgentTurn with metadata");
1706 }
1707 }
1708
1709 #[test]
1710 fn agent_turn_without_metadata_serializes() {
1711 let log = EventLog::new();
1712
1713 log.emit(EventKind::AgentTurn {
1714 task_id: "agent_task".into(),
1715 turn_index: 1,
1716 kind: "started".to_string(),
1717 metadata: None,
1718 });
1719
1720 let events = log.filter_task("agent_task");
1721 assert_eq!(events.len(), 1);
1722
1723 if let EventKind::AgentTurn { metadata, kind, .. } = &events[0].kind {
1724 assert!(metadata.is_none());
1725 assert_eq!(kind, "started");
1726 } else {
1727 panic!("Expected AgentTurn without metadata");
1728 }
1729 }
1730
/// A successful `StructuredOutputAttempt` is logged with all of its fields and
/// no error.
#[test]
fn structured_output_attempt_success() {
    let event_log = EventLog::new();
    event_log.emit(EventKind::StructuredOutputAttempt {
        task_id: "extract_task".into(),
        layer: 1,
        layer_name: "rig_extractor".to_string(),
        attempt: 1,
        success: true,
        error: None,
    });

    let recorded = event_log.filter_task("extract_task");
    assert_eq!(recorded.len(), 1);

    match &recorded[0].kind {
        EventKind::StructuredOutputAttempt {
            layer,
            layer_name,
            attempt,
            success,
            error,
            ..
        } => {
            assert_eq!(*layer, 1);
            assert_eq!(layer_name, "rig_extractor");
            assert_eq!(*attempt, 1);
            assert!(*success);
            assert!(error.is_none());
        }
        _ => panic!("Expected StructuredOutputAttempt event"),
    }
}
1769
/// A failed `StructuredOutputAttempt` is logged with `success == false` and
/// the error text that was emitted.
#[test]
fn structured_output_attempt_failure() {
    let log = EventLog::new();

    log.emit(EventKind::StructuredOutputAttempt {
        task_id: "extract_task".into(),
        layer: 2,
        layer_name: "extract_validate".to_string(),
        attempt: 2,
        success: false,
        error: Some("Missing required field 'name'".to_string()),
    });

    let events = log.filter_task("extract_task");
    assert_eq!(events.len(), 1);

    if let EventKind::StructuredOutputAttempt {
        layer,
        layer_name,
        attempt,
        success,
        error,
        ..
    } = &events[0].kind
    {
        assert_eq!(*layer, 2);
        assert_eq!(layer_name, "extract_validate");
        assert_eq!(*attempt, 2);
        assert!(!*success);
        // Compare through `as_deref` so a missing error fails with a readable
        // assertion diff instead of an `unwrap` panic.
        assert_eq!(error.as_deref(), Some("Missing required field 'name'"));
    } else {
        panic!("Expected StructuredOutputAttempt event");
    }
}
1804
/// A `StructuredOutputSuccess` event is logged with its layer, layer name and
/// total attempt count.
#[test]
fn structured_output_success_event() {
    let event_log = EventLog::new();
    event_log.emit(EventKind::StructuredOutputSuccess {
        task_id: "extract_task".into(),
        layer: 3,
        layer_name: "retry_with_feedback".to_string(),
        total_attempts: 4,
    });

    let recorded = event_log.filter_task("extract_task");
    assert_eq!(recorded.len(), 1);

    match &recorded[0].kind {
        EventKind::StructuredOutputSuccess {
            layer,
            layer_name,
            total_attempts,
            ..
        } => {
            assert_eq!(*layer, 3);
            assert_eq!(layer_name, "retry_with_feedback");
            assert_eq!(*total_attempts, 4);
        }
        _ => panic!("Expected StructuredOutputSuccess event"),
    }
}
1833
/// JSON form of a successful attempt: snake_case `type` tag, every scalar
/// field present, and no `error` key when the error is `None`.
#[test]
fn structured_output_attempt_serializes() {
    let serialized = serde_json::to_value(&EventKind::StructuredOutputAttempt {
        task_id: "task1".into(),
        layer: 1,
        layer_name: "rig_extractor".to_string(),
        attempt: 1,
        success: true,
        error: None,
    })
    .unwrap();

    let expected_fields = [
        ("type", serde_json::json!("structured_output_attempt")),
        ("task_id", serde_json::json!("task1")),
        ("layer", serde_json::json!(1)),
        ("layer_name", serde_json::json!("rig_extractor")),
        ("attempt", serde_json::json!(1)),
        ("success", serde_json::json!(true)),
    ];
    for (field, expected) in &expected_fields {
        assert_eq!(&serialized[*field], expected);
    }

    assert!(serialized.get("error").is_none());
}
1855
/// JSON form of a failed attempt includes the `error` text.
#[test]
fn structured_output_attempt_with_error_serializes() {
    let failed_attempt = EventKind::StructuredOutputAttempt {
        task_id: "task1".into(),
        layer: 4,
        layer_name: "llm_repair".to_string(),
        attempt: 1,
        success: false,
        error: Some("Repair failed".to_string()),
    };

    let serialized = serde_json::to_value(&failed_attempt).unwrap();

    assert_eq!(serialized["type"], "structured_output_attempt");
    assert_eq!(serialized["layer"], 4);
    assert_eq!(serialized["layer_name"], "llm_repair");
    assert_eq!(serialized["success"], false);
    assert_eq!(serialized["error"], "Repair failed");
}
1874
/// JSON form of `StructuredOutputSuccess`: snake_case `type` tag plus every field.
#[test]
fn structured_output_success_serializes() {
    let serialized = serde_json::to_value(&EventKind::StructuredOutputSuccess {
        task_id: "task1".into(),
        layer: 2,
        layer_name: "extract_validate".to_string(),
        total_attempts: 3,
    })
    .unwrap();

    let expected_fields = [
        ("type", serde_json::json!("structured_output_success")),
        ("task_id", serde_json::json!("task1")),
        ("layer", serde_json::json!(2)),
        ("layer_name", serde_json::json!("extract_validate")),
        ("total_attempts", serde_json::json!(3)),
    ];
    for (field, expected) in &expected_fields {
        assert_eq!(&serialized[*field], expected);
    }
}
1891
/// `task_id()` returns the task id for both structured-output variants.
#[test]
fn structured_output_events_task_id_extraction() {
    assert_eq!(
        EventKind::StructuredOutputAttempt {
            task_id: "extract1".into(),
            layer: 1,
            layer_name: "rig_extractor".to_string(),
            attempt: 1,
            success: true,
            error: None,
        }
        .task_id(),
        Some("extract1")
    );

    assert_eq!(
        EventKind::StructuredOutputSuccess {
            task_id: "extract2".into(),
            layer: 2,
            layer_name: "extract_validate".to_string(),
            total_attempts: 1,
        }
        .task_id(),
        Some("extract2")
    );
}
1912
/// Full structured-output sequence: three failed attempts across layers 1-3,
/// a successful retry on layer 3, then the final success event. All five
/// events must come back for the task in emission order.
#[test]
fn structured_output_full_workflow() {
    let event_log = EventLog::new();

    // (layer, layer_name, attempt, success, error) for each attempt, in order.
    let attempt_script = [
        (1, "rig_extractor", 1, false, Some("JSON parse error")),
        (2, "extract_validate", 1, false, Some("Schema validation failed")),
        (3, "retry_with_feedback", 1, false, Some("Still invalid")),
        (3, "retry_with_feedback", 2, true, None),
    ];
    for (layer, layer_name, attempt, success, error) in attempt_script {
        event_log.emit(EventKind::StructuredOutputAttempt {
            task_id: "parse_json".into(),
            layer,
            layer_name: layer_name.to_string(),
            attempt,
            success,
            error: error.map(|text| text.to_string()),
        });
    }

    event_log.emit(EventKind::StructuredOutputSuccess {
        task_id: "parse_json".into(),
        layer: 3,
        layer_name: "retry_with_feedback".to_string(),
        total_attempts: 4,
    });

    let recorded = event_log.filter_task("parse_json");
    assert_eq!(recorded.len(), 5);

    // Project every attempt event down to (layer, attempt, success).
    let attempts: Vec<_> = recorded
        .iter()
        .filter_map(|event| match &event.kind {
            EventKind::StructuredOutputAttempt {
                layer,
                attempt,
                success,
                ..
            } => Some((*layer, *attempt, *success)),
            _ => None,
        })
        .collect();

    assert_eq!(
        attempts,
        vec![(1, 1, false), (2, 1, false), (3, 1, false), (3, 2, true)]
    );

    // The fifth event is the overall success marker.
    match &recorded[4].kind {
        EventKind::StructuredOutputSuccess {
            layer,
            total_attempts,
            ..
        } => {
            assert_eq!(*layer, 3);
            assert_eq!(*total_attempts, 4);
        }
        _ => panic!("Expected StructuredOutputSuccess as last event"),
    }
}
2010
/// A `GuardrailPassed` event is logged with its type and description.
#[test]
fn guardrail_passed_event() {
    let event_log = EventLog::new();
    event_log.emit(EventKind::GuardrailPassed {
        task_id: "agent_task".into(),
        guardrail_type: "length".to_string(),
        description: "min_words: 10".to_string(),
    });

    let recorded = event_log.filter_task("agent_task");
    assert_eq!(recorded.len(), 1);

    match &recorded[0].kind {
        EventKind::GuardrailPassed {
            guardrail_type,
            description,
            ..
        } => {
            assert_eq!(guardrail_type, "length");
            assert_eq!(description, "min_words: 10");
        }
        _ => panic!("Expected GuardrailPassed event"),
    }
}
2040
/// A `GuardrailFailed` event is logged with its type, description and
/// failure message.
#[test]
fn guardrail_failed_event() {
    let event_log = EventLog::new();
    event_log.emit(EventKind::GuardrailFailed {
        task_id: "agent_task".into(),
        guardrail_type: "regex".to_string(),
        description: "must_contain_email".to_string(),
        message: "Output does not match pattern: [a-z]+@[a-z]+\\.[a-z]+".to_string(),
    });

    let recorded = event_log.filter_task("agent_task");
    assert_eq!(recorded.len(), 1);

    match &recorded[0].kind {
        EventKind::GuardrailFailed {
            guardrail_type,
            description,
            message,
            ..
        } => {
            assert_eq!(guardrail_type, "regex");
            assert_eq!(description, "must_contain_email");
            assert!(message.contains("does not match pattern"));
        }
        _ => panic!("Expected GuardrailFailed event"),
    }
}
2069
/// JSON form of `GuardrailPassed`: snake_case `type` tag plus every field.
#[test]
fn guardrail_passed_serializes() {
    let serialized = serde_json::to_value(&EventKind::GuardrailPassed {
        task_id: "task1".into(),
        guardrail_type: "schema".to_string(),
        description: "output_schema".to_string(),
    })
    .unwrap();

    let expected_fields = [
        ("type", "guardrail_passed"),
        ("task_id", "task1"),
        ("guardrail_type", "schema"),
        ("description", "output_schema"),
    ];
    for (field, expected) in expected_fields {
        assert_eq!(serialized[field], expected);
    }
}
2084
/// JSON form of `GuardrailFailed`: snake_case `type` tag plus every field,
/// including the failure message.
#[test]
fn guardrail_failed_serializes() {
    let serialized = serde_json::to_value(&EventKind::GuardrailFailed {
        task_id: "task1".into(),
        guardrail_type: "length".to_string(),
        description: "max_chars: 100".to_string(),
        message: "Output has 150 chars, max is 100".to_string(),
    })
    .unwrap();

    let expected_fields = [
        ("type", "guardrail_failed"),
        ("task_id", "task1"),
        ("guardrail_type", "length"),
        ("description", "max_chars: 100"),
        ("message", "Output has 150 chars, max is 100"),
    ];
    for (field, expected) in expected_fields {
        assert_eq!(serialized[field], expected);
    }
}
2101
/// `task_id()` returns the task id for both pass/fail guardrail variants.
#[test]
fn guardrail_events_task_id_extraction() {
    assert_eq!(
        EventKind::GuardrailPassed {
            task_id: "guard1".into(),
            guardrail_type: "length".to_string(),
            description: "min_words: 5".to_string(),
        }
        .task_id(),
        Some("guard1")
    );

    assert_eq!(
        EventKind::GuardrailFailed {
            task_id: "guard2".into(),
            guardrail_type: "regex".to_string(),
            description: "pattern".to_string(),
            message: "No match".to_string(),
        }
        .task_id(),
        Some("guard2")
    );
}
2119
/// Emits pass / fail / pass guardrail events for one task and checks the
/// per-variant tallies read back from the log.
#[test]
fn guardrail_events_full_workflow() {
    let event_log = EventLog::new();

    event_log.emit(EventKind::GuardrailPassed {
        task_id: "validate_output".into(),
        guardrail_type: "length".to_string(),
        description: "min_words: 10, max_words: 100".to_string(),
    });
    event_log.emit(EventKind::GuardrailFailed {
        task_id: "validate_output".into(),
        guardrail_type: "schema".to_string(),
        description: "json_schema".to_string(),
        message: "Missing required field: 'title'".to_string(),
    });
    event_log.emit(EventKind::GuardrailPassed {
        task_id: "validate_output".into(),
        guardrail_type: "regex".to_string(),
        description: "contains_email".to_string(),
    });

    let recorded = event_log.filter_task("validate_output");
    assert_eq!(recorded.len(), 3);

    // Tally both variants in a single pass over the recorded events.
    let mut passed_count = 0usize;
    let mut failed_count = 0usize;
    for event in recorded.iter() {
        match &event.kind {
            EventKind::GuardrailPassed { .. } => passed_count += 1,
            EventKind::GuardrailFailed { .. } => failed_count += 1,
            _ => {}
        }
    }

    assert_eq!(passed_count, 2);
    assert_eq!(failed_count, 1);
}
2163
/// A `GuardrailEscalation` event is logged with every field, including the
/// exact suggested action that was emitted.
#[test]
fn guardrail_escalation_event() {
    let log = EventLog::new();

    log.emit(EventKind::GuardrailEscalation {
        task_id: "agent_task".into(),
        guardrail_type: "llm".to_string(),
        guardrail_id: "content_safety".to_string(),
        message: "Content may be inappropriate for the target audience".to_string(),
        severity: "high".to_string(),
        suggested_action: Some("Review output before publishing".to_string()),
    });

    let events = log.filter_task("agent_task");
    assert_eq!(events.len(), 1);

    if let EventKind::GuardrailEscalation {
        guardrail_type,
        guardrail_id,
        message,
        severity,
        suggested_action,
        ..
    } = &events[0].kind
    {
        assert_eq!(guardrail_type, "llm");
        assert_eq!(guardrail_id, "content_safety");
        assert!(message.contains("inappropriate"));
        assert_eq!(severity, "high");
        // Pin the value, not just its presence, so a corrupted or swapped
        // action string is caught.
        assert_eq!(
            suggested_action.as_deref(),
            Some("Review output before publishing")
        );
    } else {
        panic!("Expected GuardrailEscalation event");
    }
}
2198
/// JSON form of `GuardrailEscalation`: snake_case `type` tag, every populated
/// field, and no `suggested_action` key when that field is `None`.
#[test]
fn guardrail_escalation_serializes() {
    let event = EventKind::GuardrailEscalation {
        task_id: "task1".into(),
        guardrail_type: "llm".to_string(),
        guardrail_id: "safety_check".to_string(),
        message: "Safety violation detected".to_string(),
        severity: "critical".to_string(),
        suggested_action: None,
    };

    let json = serde_json::to_value(&event).unwrap();
    assert_eq!(json["type"], "guardrail_escalation");
    assert_eq!(json["task_id"], "task1");
    assert_eq!(json["guardrail_type"], "llm");
    assert_eq!(json["guardrail_id"], "safety_check");
    // `message` was set above but previously never verified.
    assert_eq!(json["message"], "Safety violation detected");
    assert_eq!(json["severity"], "critical");
    // A `None` suggested action is omitted from the output, matching the
    // behavior asserted by `wave2_skip_serializing_if_omits_none_fields`.
    assert!(json.get("suggested_action").is_none());
}
2217
/// `task_id()` returns the task id of a `GuardrailEscalation`.
#[test]
fn guardrail_escalation_task_id_extraction() {
    assert_eq!(
        EventKind::GuardrailEscalation {
            task_id: "esc1".into(),
            guardrail_type: "llm".to_string(),
            guardrail_id: "quality".to_string(),
            message: "Quality below threshold".to_string(),
            severity: "medium".to_string(),
            suggested_action: None,
        }
        .task_id(),
        Some("esc1")
    );
}
2230
/// Builds one sample value for each of the 38 `EventKind` variants listed
/// below, in a fixed order.
///
/// The position of a variant in the returned vector is the index `i` printed
/// by the `wave2_*` tests on failure, so keep the order stable when editing.
/// `Log` carries `task_id: Some(..)` and `Custom` carries `task_id: None`;
/// `wave2_task_id_extraction_all_variants` depends on those choices.
fn all_38_variants() -> Vec<EventKind> {
    vec![
        // --- Workflow-level events (indices 0-5) ---
        EventKind::WorkflowStarted {
            task_count: 3,
            generation_id: "gen-1".into(),
            workflow_hash: "abc123".into(),
            nika_version: env!("CARGO_PKG_VERSION").into(),
        },
        EventKind::WorkflowCompleted {
            final_output: Arc::new(serde_json::json!({"result": "ok"})),
            total_duration_ms: 1234,
        },
        EventKind::WorkflowFailed {
            error: "boom".into(),
            failed_task: Some("task1".into()),
        },
        EventKind::WorkflowAborted {
            reason: "timeout".into(),
            duration_ms: 500,
            running_tasks: vec!["t1".into(), "t2".into()],
        },
        EventKind::WorkflowPaused,
        EventKind::WorkflowResumed,
        // --- Task lifecycle events (indices 6-9) ---
        EventKind::TaskScheduled {
            task_id: "t1".into(),
            dependencies: vec!["t0".into()],
        },
        EventKind::TaskStarted {
            task_id: "t1".into(),
            verb: "infer".into(),
            inputs: serde_json::json!({}),
        },
        EventKind::TaskCompleted {
            task_id: "t1".into(),
            output: Arc::new(serde_json::json!("done")),
            duration_ms: 100,
        },
        EventKind::TaskFailed {
            task_id: "t1".into(),
            error: "fail".into(),
            duration_ms: 50,
            error_code: None,
        },
        // --- Template and provider events (indices 10-12) ---
        EventKind::TemplateResolved {
            task_id: "t1".into(),
            template: "{{with.x}}".into(),
            result: "hello".into(),
        },
        EventKind::ProviderCalled {
            task_id: "t1".into(),
            provider: "anthropic".into(),
            model: "claude-3-haiku".into(),
            prompt_len: 42,
        },
        EventKind::ProviderResponded {
            task_id: "t1".into(),
            request_id: Some("req-abc".into()),
            input_tokens: 100,
            output_tokens: 50,
            cache_read_tokens: 10,
            ttft_ms: Some(200),
            finish_reason: "end_turn".into(),
            cost_usd: 0.001,
        },
        // --- Context assembly (index 13) ---
        EventKind::ContextAssembled {
            task_id: "t1".into(),
            sources: vec![ContextSource {
                node: "entity-1".into(),
                tokens: 500,
            }],
            excluded: vec![ExcludedItem {
                node: "entity-2".into(),
                reason: "over budget".into(),
            }],
            total_tokens: 500,
            budget_used_pct: 0.75,
            truncated: false,
        },
        // --- MCP events (indices 14-18) ---
        EventKind::McpInvoke {
            task_id: "t1".into(),
            call_id: "call-1".into(),
            mcp_server: "novanet".into(),
            tool: Some("novanet_search".into()),
            resource: None,
            params: Some(serde_json::json!({"query": "test"})),
        },
        EventKind::McpResponse {
            task_id: "t1".into(),
            call_id: "call-1".into(),
            output_len: 256,
            duration_ms: 80,
            cached: false,
            is_error: false,
            response: Some(serde_json::json!({"found": 3})),
        },
        EventKind::McpConnected {
            server_name: "novanet".into(),
        },
        EventKind::McpError {
            server_name: "novanet".into(),
            error: "connection refused".into(),
        },
        EventKind::McpRetry {
            task_id: "t1".into(),
            server_name: "novanet".into(),
            operation: "novanet_search".into(),
            attempt: 2,
            max_attempts: 3,
            error: "timeout".into(),
        },
        // --- Agent events (indices 19-22) ---
        EventKind::AgentStart {
            task_id: "agent1".into(),
            max_turns: 10,
            mcp_servers: vec!["novanet".into()],
        },
        EventKind::AgentTurn {
            task_id: "agent1".into(),
            turn_index: 1,
            kind: "started".into(),
            metadata: Some(AgentTurnMetadata {
                thinking: Some("Let me think...".into()),
                response_text: "Here is my response".into(),
                input_tokens: 100,
                output_tokens: 50,
                cache_read_tokens: 0,
                stop_reason: "end_turn".into(),
            }),
        },
        EventKind::AgentComplete {
            task_id: "agent1".into(),
            turns: 3,
            stop_reason: "natural_completion".into(),
        },
        EventKind::AgentSpawned {
            parent_task_id: "agent1".into(),
            child_task_id: "sub-agent1".into(),
            depth: 1,
        },
        // --- Guardrail events (indices 23-25) ---
        EventKind::GuardrailPassed {
            task_id: "t1".into(),
            guardrail_type: "length".into(),
            description: "max 1000 chars".into(),
        },
        EventKind::GuardrailFailed {
            task_id: "t1".into(),
            guardrail_type: "schema".into(),
            description: "JSON schema".into(),
            message: "missing field 'title'".into(),
        },
        EventKind::GuardrailEscalation {
            task_id: "t1".into(),
            guardrail_type: "llm".into(),
            guardrail_id: "safety-check".into(),
            message: "content flagged".into(),
            severity: "high".into(),
            suggested_action: Some("human review".into()),
        },
        // --- Log and custom events (indices 26-27) ---
        EventKind::Log {
            level: "info".into(),
            message: "step completed".into(),
            task_id: Some("t1".into()),
        },
        EventKind::Custom {
            name: "my_event".into(),
            payload: serde_json::json!({"key": "val"}),
            task_id: None,
        },
        // --- Artifact events (indices 28-29) ---
        EventKind::ArtifactWritten {
            task_id: "t1".into(),
            path: "/tmp/output.json".into(),
            size: 1024,
            format: "json".into(),
            checksum: None,
        },
        EventKind::ArtifactFailed {
            task_id: "t1".into(),
            path: "/tmp/output.json".into(),
            reason: "permission denied".into(),
        },
        // --- Per-task media events (indices 30-33) ---
        EventKind::MediaExtracted {
            task_id: "gen_img".into(),
            block_count: 2,
            content_types: vec!["image".into(), "audio".into()],
        },
        EventKind::MediaProcessed {
            task_id: "gen_img".into(),
            hash: "blake3:a1b2c3d4".into(),
            mime_type: "image/png".into(),
            size_bytes: 4096,
        },
        EventKind::MediaStored {
            task_id: "gen_img".into(),
            hash: "blake3:a1b2c3d4".into(),
            path: ".nika/media/store/a1/b2c3d4".into(),
            size_bytes: 4096,
            verified: true,
            deduplicated: false,
            pipeline_ms: 42,
        },
        EventKind::MediaStoreFailed {
            task_id: "gen_img".into(),
            hash: "blake3:a1b2c3d4".into(),
            reason: "disk full".into(),
        },
        // --- Structured-output events (indices 34-35) ---
        EventKind::StructuredOutputAttempt {
            task_id: "t1".into(),
            layer: 1,
            layer_name: "rig_extractor".into(),
            attempt: 1,
            success: true,
            error: None,
        },
        EventKind::StructuredOutputSuccess {
            task_id: "t1".into(),
            layer: 1,
            layer_name: "rig_extractor".into(),
            total_attempts: 1,
        },
        // --- Media maintenance events without a task id (indices 36-37) ---
        EventKind::MediaCleanup {
            removed: 5,
            bytes_freed: 10240,
            dry_run: false,
        },
        EventKind::MediaIntegrityCheck {
            checked: 10,
            warnings: 0,
        },
    ]
}
2478
/// Guards the size of the `all_38_variants` fixture.
#[test]
fn wave2_variant_count_is_38() {
    let fixture_len = all_38_variants().len();
    assert_eq!(fixture_len, 38, "EventKind should have exactly 38 variants");
}
2488
/// Every fixture variant must survive a JSON string round trip unchanged.
#[test]
fn wave2_all_variants_serialize_deserialize_roundtrip() {
    let fixtures = all_38_variants();
    for (i, variant) in fixtures.iter().enumerate() {
        let json = match serde_json::to_string(variant) {
            Ok(text) => text,
            Err(e) => panic!("variant {i} failed to serialize: {e}"),
        };
        let back = match serde_json::from_str::<EventKind>(&json) {
            Ok(decoded) => decoded,
            Err(e) => panic!("variant {i} failed to deserialize: {e}\nJSON: {json}"),
        };
        assert_eq!(*variant, back, "variant {i} round-trip mismatch");
    }
}
2499
/// Compact JSON for every fixture variant must fit on a single line, as
/// newline-delimited JSON requires.
#[test]
fn wave2_ndjson_no_embedded_newlines() {
    let fixtures = all_38_variants();
    for (i, variant) in fixtures.iter().enumerate() {
        let json = serde_json::to_string(variant).unwrap();
        let has_newline = json.contains('\n');
        assert!(
            !has_newline,
            "variant {i} contains embedded newline in JSON: {json}"
        );
    }
}
2511
/// A full `Event` envelope serializes to one line of valid JSON that carries
/// the `id`, `timestamp_ms` and `kind` keys.
#[test]
fn wave2_full_event_envelope_ndjson_valid() {
    let event_log = EventLog::new();
    event_log.emit(EventKind::WorkflowStarted {
        task_count: 1,
        generation_id: "g1".into(),
        workflow_hash: "h1".into(),
        nika_version: env!("CARGO_PKG_VERSION").into(),
    });

    let recorded = event_log.events();
    let line = serde_json::to_string(&recorded[0]).unwrap();
    assert!(!line.contains('\n'));

    let envelope: serde_json::Value = serde_json::from_str(&line).unwrap();
    for key in ["id", "timestamp_ms", "kind"] {
        assert!(envelope.get(key).is_some());
    }
}
2531
/// 20 threads each emit 50 events through clones of one log; the 1000
/// recorded ids must be unique and span exactly 0..=999.
#[test]
fn wave2_event_ids_monotonic_under_contention() {
    use std::thread;

    let log = EventLog::new();

    let mut workers = Vec::with_capacity(20);
    for _ in 0..20 {
        let shared = log.clone();
        workers.push(thread::spawn(move || {
            for _ in 0..50 {
                shared.emit(EventKind::WorkflowPaused);
            }
        }));
    }
    for worker in workers {
        worker.join().unwrap();
    }

    let recorded = log.events();
    assert_eq!(recorded.len(), 1000);

    // Sort and dedup so that any duplicate id shrinks the vector.
    let mut ids: Vec<u64> = recorded.iter().map(|event| event.id).collect();
    ids.sort_unstable();
    ids.dedup();
    assert_eq!(ids.len(), 1000, "IDs must be unique");
    assert_eq!(ids[0], 0);
    assert_eq!(ids[999], 999);
}
2559
/// Timestamps of sequentially emitted events never go backwards.
#[test]
fn wave2_timestamps_monotonically_nondecreasing() {
    let event_log = EventLog::new();
    (0..100).for_each(|_| {
        event_log.emit(EventKind::WorkflowPaused);
    });

    let recorded = event_log.events();
    // Pair each event with its successor.
    for (earlier, later) in recorded.iter().zip(recorded.iter().skip(1)) {
        assert!(
            later.timestamp_ms >= earlier.timestamp_ms,
            "Timestamps must be monotonically non-decreasing"
        );
    }
}
2574
/// The first event's timestamp must be under 1000 ms, which rules out
/// wall-clock epoch milliseconds.
#[test]
fn wave2_timestamps_are_relative_not_epoch() {
    let event_log = EventLog::new();
    event_log.emit(EventKind::WorkflowPaused);

    let first_timestamp = event_log.events()[0].timestamp_ms;
    assert!(
        first_timestamp < 1000,
        "First event timestamp {} should be < 1s (relative to start)",
        first_timestamp
    );
}
2587
/// Emitting 600 events without reading from the broadcast receiver must make
/// the receiver report `Lagged`, while the log itself still holds all 600.
#[test]
fn wave2_broadcast_channel_lagged_on_overflow() {
    let (log, mut rx) = EventLog::new_with_broadcast();

    for _ in 0..600 {
        log.emit(EventKind::WorkflowPaused);
    }

    // Poll until the receiver lags (then drain what remains) or runs dry.
    let lagged = loop {
        match rx.try_recv() {
            Ok(_) => continue,
            Err(broadcast::error::TryRecvError::Lagged(_)) => {
                while rx.try_recv().is_ok() {}
                break true;
            }
            Err(broadcast::error::TryRecvError::Empty)
            | Err(broadcast::error::TryRecvError::Closed) => break false,
        }
    };

    assert!(
        lagged,
        "Expected broadcast lag when emitting 600 events into 512 capacity channel"
    );
    assert_eq!(log.events().len(), 600);
}
2622
/// The serialized form of `GuardrailEscalation` contains the snake_case
/// variant name.
#[test]
fn wave2_guardrail_escalation_serialization() {
    let serialized = serde_json::to_string(&EventKind::GuardrailEscalation {
        task_id: "t1".into(),
        guardrail_type: "llm".into(),
        guardrail_id: "check-1".into(),
        message: "flagged".into(),
        severity: "high".into(),
        suggested_action: None,
    })
    .unwrap();

    assert!(serialized.contains("guardrail_escalation"));
}
2641
/// `ProviderResponded` writes its `None` options (`request_id`, `ttft_ms`) as
/// explicit JSON `null` rather than omitting the keys.
#[test]
fn wave2_optional_fields_serialized_as_null_when_none() {
    let json = serde_json::to_string(&EventKind::ProviderResponded {
        task_id: "t1".into(),
        request_id: None,
        input_tokens: 100,
        output_tokens: 50,
        cache_read_tokens: 0,
        ttft_ms: None,
        finish_reason: "end_turn".into(),
        cost_usd: 0.0,
    })
    .unwrap();

    for null_field in ["\"request_id\":null", "\"ttft_ms\":null"] {
        assert!(
            json.contains(null_field),
            "None should serialize as null: {json}"
        );
    }
}
2667
/// `GuardrailEscalation` leaves `suggested_action` out of the JSON entirely
/// when it is `None`.
#[test]
fn wave2_skip_serializing_if_omits_none_fields() {
    let json = serde_json::to_string(&EventKind::GuardrailEscalation {
        task_id: "t1".into(),
        guardrail_type: "llm".into(),
        guardrail_id: "check".into(),
        message: "flagged".into(),
        severity: "high".into(),
        suggested_action: None,
    })
    .unwrap();

    let mentions_field = json.contains("suggested_action");
    assert!(
        !mentions_field,
        "skip_serializing_if should omit None: {json}"
    );
}
2685
/// `ProviderResponded` writes populated options with their inner values.
#[test]
fn wave2_optional_fields_present_when_some() {
    let json = serde_json::to_string(&EventKind::ProviderResponded {
        task_id: "t1".into(),
        request_id: Some("req-1".into()),
        input_tokens: 100,
        output_tokens: 50,
        cache_read_tokens: 0,
        ttft_ms: Some(150),
        finish_reason: "end_turn".into(),
        cost_usd: 0.001,
    })
    .unwrap();

    for populated_field in ["\"request_id\":\"req-1\"", "\"ttft_ms\":150"] {
        assert!(
            json.contains(populated_field),
            "Some fields should contain value: {json}"
        );
    }
}
2708
/// Splits the 38 fixture variants by whether `task_id()` yields a value and
/// pins both tallies (27 with, 11 without).
#[test]
fn wave2_task_id_extraction_all_variants() {
    let variants = all_38_variants();

    // Count directly; collecting into temporary vectors just to read their
    // length is unnecessary.
    let with_task_id = variants.iter().filter(|v| v.task_id().is_some()).count();
    let without_task_id = variants.iter().filter(|v| v.task_id().is_none()).count();

    assert_eq!(
        with_task_id, 27,
        "27 variants should have task_id (including Log with Some, 4 media events)"
    );
    assert_eq!(
        without_task_id, 11,
        "11 variants should lack task_id (workflow-level + McpConnected + McpError + Custom with None + MediaCleanup + MediaIntegrityCheck)"
    );
}
2730
/// `WorkflowCompleted::final_output` must serialize as a nested JSON object.
///
/// NOTE(review): the test name says "wraps_json_as_string", but the assertion
/// checks the opposite: the output must be an object, not a string. The name
/// is kept so existing test filters keep working.
#[test]
fn wave2_workflow_completed_wraps_json_as_string() {
    let inner = serde_json::json!({"key": "value"});
    let variant = EventKind::WorkflowCompleted {
        // `inner` is not used again, so move it instead of cloning.
        final_output: Arc::new(inner),
        total_duration_ms: 100,
    };
    let json = serde_json::to_string(&variant).unwrap();
    let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
    let output = &parsed["final_output"];
    assert!(
        output.is_object(),
        "final_output should be a JSON object, not a string: {output}"
    );
}
2750
/// Events emitted through either an `EventLog` or its clone are visible
/// through both handles.
#[test]
fn wave2_cloned_eventlog_shares_events() {
    let original = EventLog::new();
    let cloned = original.clone();

    original.emit(EventKind::WorkflowPaused);
    cloned.emit(EventKind::WorkflowResumed);

    let seen_by_original = original.events().len();
    let seen_by_clone = cloned.events().len();
    assert_eq!(seen_by_original, 2);
    assert_eq!(seen_by_clone, 2);
}
2761
/// The `type` tag is the snake_case variant name, checked for a two-word
/// name and for one starting with an acronym.
#[test]
fn wave2_serde_tag_is_snake_case() {
    let json = serde_json::to_string(&EventKind::WorkflowStarted {
        task_count: 1,
        generation_id: "g".into(),
        workflow_hash: "h".into(),
        nika_version: "v".into(),
    })
    .unwrap();
    assert!(
        json.contains("\"type\":\"workflow_started\""),
        "Tag should be snake_case: {json}"
    );

    let json2 = serde_json::to_string(&EventKind::McpRetry {
        task_id: "t".into(),
        server_name: "s".into(),
        operation: "op".into(),
        attempt: 1,
        max_attempts: 3,
        error: "e".into(),
    })
    .unwrap();
    assert!(
        json2.contains("\"type\":\"mcp_retry\""),
        "Tag should be snake_case: {json2}"
    );
}
2791
/// `is_workflow_event()` is true for all six workflow-level variants and
/// false for a task-level variant.
#[test]
fn wave2_is_workflow_event_correct() {
    // A fixed-size array is enough here; the list is only iterated.
    let workflow_events = [
        EventKind::WorkflowStarted {
            task_count: 1,
            generation_id: "g".into(),
            workflow_hash: "h".into(),
            nika_version: "v".into(),
        },
        EventKind::WorkflowCompleted {
            final_output: Arc::new(serde_json::json!(null)),
            total_duration_ms: 0,
        },
        EventKind::WorkflowFailed {
            error: "e".into(),
            failed_task: None,
        },
        EventKind::WorkflowAborted {
            reason: "r".into(),
            duration_ms: 0,
            running_tasks: vec![],
        },
        EventKind::WorkflowPaused,
        EventKind::WorkflowResumed,
    ];
    for wf in &workflow_events {
        assert!(wf.is_workflow_event(), "{wf:?} should be workflow event");
    }

    let non_wf = EventKind::TaskStarted {
        task_id: "t".into(),
        verb: "infer".into(),
        inputs: serde_json::json!({}),
    };
    assert!(!non_wf.is_workflow_event());
}
2828
// Fixture hash strings for the media event tests below. Each is the `blake3:`
// prefix followed by 64 hexadecimal characters.
// NOTE(review): the digests look like placeholder values rather than hashes of
// real PNG/WAV/PDF payloads — confirm before relying on the specific bytes.
const BLAKE3_PNG: &str =
    "blake3:a7ffc6f8bf1ed76651c14756a061d662f580ff4de43b49fa82d80a4b80f8434a";
const BLAKE3_WAV: &str =
    "blake3:e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855";
const BLAKE3_PDF: &str =
    "blake3:6b86b273ff34fce19d6b804eff5a3f5747ada4eaa22f1d49c01e52ddb7875b4b";
2842
2843 fn media_extracted(task_id: &str, block_count: u32, types: &[&str]) -> EventKind {
2844 EventKind::MediaExtracted {
2845 task_id: Arc::from(task_id),
2846 block_count,
2847 content_types: types.iter().map(|s| s.to_string()).collect(),
2848 }
2849 }
2850
2851 fn media_processed(task_id: &str, hash: &str, mime: &str, size: u64) -> EventKind {
2852 EventKind::MediaProcessed {
2853 task_id: Arc::from(task_id),
2854 hash: hash.to_string(),
2855 mime_type: mime.to_string(),
2856 size_bytes: size,
2857 }
2858 }
2859
2860 fn media_stored(
2861 task_id: &str,
2862 hash: &str,
2863 path: &str,
2864 size: u64,
2865 verified: bool,
2866 deduplicated: bool,
2867 pipeline_ms: u64,
2868 ) -> EventKind {
2869 EventKind::MediaStored {
2870 task_id: Arc::from(task_id),
2871 hash: hash.to_string(),
2872 path: path.to_string(),
2873 size_bytes: size,
2874 verified,
2875 deduplicated,
2876 pipeline_ms,
2877 }
2878 }
2879
2880 fn media_store_failed(task_id: &str, hash: &str, reason: &str) -> EventKind {
2881 EventKind::MediaStoreFailed {
2882 task_id: Arc::from(task_id),
2883 hash: hash.to_string(),
2884 reason: reason.to_string(),
2885 }
2886 }
2887
/// `MediaExtracted` with one content type survives a JSON round trip.
#[test]
fn media_extracted_serde_roundtrip_single_type() {
    let event = media_extracted("gen_logo", 1, &["image"]);

    let encoded = serde_json::to_string(&event).unwrap();
    let back = serde_json::from_str::<EventKind>(&encoded).unwrap();

    assert_eq!(event, back);
}
2899
/// `MediaExtracted` with four content types survives a JSON round trip with
/// the block count and content-type order preserved.
#[test]
fn media_extracted_serde_roundtrip_multiple_types() {
    let event = media_extracted("gen_multi", 4, &["image", "audio", "video", "application"]);

    let encoded = serde_json::to_string(&event).unwrap();
    let back = serde_json::from_str::<EventKind>(&encoded).unwrap();
    assert_eq!(event, back);

    match &back {
        EventKind::MediaExtracted {
            block_count,
            content_types,
            ..
        } => {
            assert_eq!(*block_count, 4);
            assert_eq!(content_types.len(), 4);
            assert_eq!(content_types[0], "image");
            assert_eq!(content_types[3], "application");
        }
        _ => panic!("Expected MediaExtracted"),
    }
}
2922
/// `MediaExtracted` with zero blocks and no content types survives a JSON
/// round trip.
#[test]
fn media_extracted_serde_roundtrip_empty_types() {
    let event = media_extracted("empty_task", 0, &[]);

    let encoded = serde_json::to_string(&event).unwrap();
    let back = serde_json::from_str::<EventKind>(&encoded).unwrap();

    assert_eq!(event, back);
}
2931
/// `MediaProcessed` for a PNG survives a JSON round trip; the decoded hash
/// keeps its `blake3:` prefix and 64-character digest.
#[test]
fn media_processed_serde_roundtrip_png() {
    let event = media_processed("gen_img", BLAKE3_PNG, "image/png", 65536);

    let encoded = serde_json::to_string(&event).unwrap();
    let back = serde_json::from_str::<EventKind>(&encoded).unwrap();
    assert_eq!(event, back);

    match &back {
        EventKind::MediaProcessed {
            hash,
            mime_type,
            size_bytes,
            ..
        } => {
            assert!(hash.starts_with("blake3:"));
            // Prefix plus 64 hex characters.
            assert_eq!(hash.len(), "blake3:".len() + 64);
            assert_eq!(mime_type, "image/png");
            assert_eq!(*size_bytes, 65536);
        }
        _ => panic!("Expected MediaProcessed"),
    }
}
2954
2955 #[test]
2956 fn media_processed_serde_roundtrip_wav() {
2957 let event = media_processed("gen_audio", BLAKE3_WAV, "audio/wav", 1_048_576);
2958 let json = serde_json::to_string(&event).unwrap();
2959 let back: EventKind = serde_json::from_str(&json).unwrap();
2960 assert_eq!(event, back);
2961 }
2962
2963 #[test]
2964 fn media_processed_serde_roundtrip_pdf() {
2965 let event = media_processed("gen_doc", BLAKE3_PDF, "application/pdf", 204800);
2966 let json = serde_json::to_string(&event).unwrap();
2967 let back: EventKind = serde_json::from_str(&json).unwrap();
2968 assert_eq!(event, back);
2969 }
2970
2971 #[test]
2972 fn media_stored_serde_roundtrip_all_fields() {
2973 let event = media_stored(
2974 "gen_img",
2975 BLAKE3_PNG,
2976 ".nika/media/store/a7/ffc6f8bf1ed76651c14756a061d662f580ff4de43b49fa82d80a4b80f8434a",
2977 65536,
2978 true,
2979 false,
2980 42,
2981 );
2982 let json = serde_json::to_string(&event).unwrap();
2983 let back: EventKind = serde_json::from_str(&json).unwrap();
2984 assert_eq!(event, back);
2985
2986 if let EventKind::MediaStored {
2987 hash,
2988 path,
2989 size_bytes,
2990 verified,
2991 deduplicated,
2992 pipeline_ms,
2993 ..
2994 } = &back
2995 {
2996 assert_eq!(hash, BLAKE3_PNG);
2997 assert!(path.starts_with(".nika/media/store/"));
2998 assert_eq!(*size_bytes, 65536);
2999 assert!(*verified);
3000 assert!(!*deduplicated);
3001 assert_eq!(*pipeline_ms, 42);
3002 } else {
3003 panic!("Expected MediaStored");
3004 }
3005 }
3006
3007 #[test]
3008 fn media_stored_serde_roundtrip_deduplicated() {
3009 let event = media_stored(
3010 "gen_img_dup",
3011 BLAKE3_PNG,
3012 ".nika/media/store/a7/ffc6f8bf1ed76651c14756a061d662f580ff4de43b49fa82d80a4b80f8434a",
3013 65536,
3014 false, true, 1, );
3018 let json = serde_json::to_string(&event).unwrap();
3019 let back: EventKind = serde_json::from_str(&json).unwrap();
3020 assert_eq!(event, back);
3021 }
3022
3023 #[test]
3024 fn media_store_failed_serde_roundtrip_with_hash() {
3025 let event = media_store_failed("gen_img", BLAKE3_PNG, "disk full");
3026 let json = serde_json::to_string(&event).unwrap();
3027 let back: EventKind = serde_json::from_str(&json).unwrap();
3028 assert_eq!(event, back);
3029
3030 if let EventKind::MediaStoreFailed { hash, reason, .. } = &back {
3031 assert_eq!(hash, BLAKE3_PNG);
3032 assert_eq!(reason, "disk full");
3033 } else {
3034 panic!("Expected MediaStoreFailed");
3035 }
3036 }
3037
3038 #[test]
3039 fn media_store_failed_serde_roundtrip_empty_hash() {
3040 let event = media_store_failed("gen_img", "", "base64 decode failed");
3042 let json = serde_json::to_string(&event).unwrap();
3043 let back: EventKind = serde_json::from_str(&json).unwrap();
3044 assert_eq!(event, back);
3045
3046 if let EventKind::MediaStoreFailed { hash, reason, .. } = &back {
3047 assert!(hash.is_empty(), "Pre-hash failure should have empty hash");
3048 assert_eq!(reason, "base64 decode failed");
3049 } else {
3050 panic!("Expected MediaStoreFailed");
3051 }
3052 }
3053
3054 #[test]
3059 fn media_extracted_returns_task_id() {
3060 let event = media_extracted("extract_images", 3, &["image", "audio", "video"]);
3061 assert_eq!(event.task_id(), Some("extract_images"));
3062 }
3063
3064 #[test]
3065 fn media_processed_returns_task_id() {
3066 let event = media_processed("process_png", BLAKE3_PNG, "image/png", 1024);
3067 assert_eq!(event.task_id(), Some("process_png"));
3068 }
3069
3070 #[test]
3071 fn media_stored_returns_task_id() {
3072 let event = media_stored(
3073 "store_to_cas",
3074 BLAKE3_PNG,
3075 ".nika/media/store/a7/ffc6",
3076 1024,
3077 true,
3078 false,
3079 10,
3080 );
3081 assert_eq!(event.task_id(), Some("store_to_cas"));
3082 }
3083
3084 #[test]
3085 fn media_store_failed_returns_task_id() {
3086 let event = media_store_failed("fail_store", BLAKE3_PNG, "permission denied");
3087 assert_eq!(event.task_id(), Some("fail_store"));
3088 }
3089
3090 #[test]
3091 fn all_4_media_variants_have_task_id() {
3092 let variants: Vec<(&str, EventKind)> = vec![
3093 (
3094 "extracted",
3095 media_extracted("t_extract", 2, &["image", "audio"]),
3096 ),
3097 (
3098 "processed",
3099 media_processed("t_process", BLAKE3_WAV, "audio/wav", 2048),
3100 ),
3101 (
3102 "stored",
3103 media_stored(
3104 "t_store",
3105 BLAKE3_PDF,
3106 ".nika/media/store/6b/86b2",
3107 4096,
3108 true,
3109 false,
3110 5,
3111 ),
3112 ),
3113 (
3114 "failed",
3115 media_store_failed("t_fail", "", "budget exceeded"),
3116 ),
3117 ];
3118
3119 for (name, event) in &variants {
3120 assert!(
3121 event.task_id().is_some(),
3122 "Media{name} must return Some task_id"
3123 );
3124 }
3125 }
3126
3127 #[test]
3132 fn media_extracted_ndjson_no_newlines() {
3133 let event = media_extracted("ndjson_test", 5, &["image", "audio", "video"]);
3134 let json = serde_json::to_string(&event).unwrap();
3135 assert!(
3136 !json.contains('\n'),
3137 "MediaExtracted JSON must not contain newlines: {json}"
3138 );
3139 }
3140
3141 #[test]
3142 fn media_processed_ndjson_no_newlines() {
3143 let event = media_processed("ndjson_test", BLAKE3_PNG, "image/png", 999999);
3144 let json = serde_json::to_string(&event).unwrap();
3145 assert!(
3146 !json.contains('\n'),
3147 "MediaProcessed JSON must not contain newlines: {json}"
3148 );
3149 }
3150
3151 #[test]
3152 fn media_stored_ndjson_no_newlines() {
3153 let event = media_stored(
3154 "ndjson_test",
3155 BLAKE3_WAV,
3156 ".nika/media/store/e3/b0c44298fc1c149afbf4c8996fb924",
3157 512000,
3158 true,
3159 true,
3160 100,
3161 );
3162 let json = serde_json::to_string(&event).unwrap();
3163 assert!(
3164 !json.contains('\n'),
3165 "MediaStored JSON must not contain newlines: {json}"
3166 );
3167 }
3168
3169 #[test]
3170 fn media_store_failed_ndjson_no_newlines() {
3171 let event = media_store_failed("ndjson_test", "", "write error: No space left on device");
3172 let json = serde_json::to_string(&event).unwrap();
3173 assert!(
3174 !json.contains('\n'),
3175 "MediaStoreFailed JSON must not contain newlines: {json}"
3176 );
3177 }
3178
3179 #[test]
3180 fn all_4_media_variants_ndjson_roundtrip() {
3181 let variants = vec![
3183 media_extracted("rt_task", 2, &["image", "audio"]),
3184 media_processed("rt_task", BLAKE3_PNG, "image/png", 8192),
3185 media_stored(
3186 "rt_task",
3187 BLAKE3_PNG,
3188 ".nika/media/store/a7/ffc6f8bf1ed76651",
3189 8192,
3190 true,
3191 false,
3192 25,
3193 ),
3194 media_store_failed("rt_task", BLAKE3_PNG, "verification checksum mismatch"),
3195 ];
3196
3197 for (i, variant) in variants.into_iter().enumerate() {
3198 let json = serde_json::to_string(&variant).unwrap();
3199 assert!(
3200 !json.contains('\n'),
3201 "Media variant {i} has embedded newline"
3202 );
3203 let back: EventKind = serde_json::from_str(&json).unwrap_or_else(|e| {
3204 panic!("Media variant {i} failed to deserialize: {e}\nJSON: {json}")
3205 });
3206 assert_eq!(variant, back, "Media variant {i} roundtrip mismatch");
3207 }
3208 }
3209
3210 #[test]
3211 fn media_events_ndjson_full_envelope() {
3212 let log = EventLog::new();
3214 log.emit(media_extracted("envelope_test", 1, &["image"]));
3215 log.emit(media_processed(
3216 "envelope_test",
3217 BLAKE3_PNG,
3218 "image/png",
3219 4096,
3220 ));
3221 log.emit(media_stored(
3222 "envelope_test",
3223 BLAKE3_PNG,
3224 ".nika/media/store/a7/ffc6",
3225 4096,
3226 true,
3227 false,
3228 15,
3229 ));
3230 log.emit(media_store_failed("envelope_test", "", "boom"));
3231
3232 for event in log.events() {
3233 let json = serde_json::to_string(&event).unwrap();
3234 assert!(
3235 !json.contains('\n'),
3236 "Full Event envelope must be single-line NDJSON: {json}"
3237 );
3238 let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
3240 assert!(parsed.get("id").is_some(), "Missing 'id' in envelope");
3241 assert!(
3242 parsed.get("timestamp_ms").is_some(),
3243 "Missing 'timestamp_ms' in envelope"
3244 );
3245 assert!(parsed.get("kind").is_some(), "Missing 'kind' in envelope");
3246 }
3247 }
3248
3249 #[test]
3254 fn media_extracted_appears_in_eventlog_events() {
3255 let log = EventLog::new();
3256 let id = log.emit(media_extracted(
3257 "media_task",
3258 3,
3259 &["image", "audio", "video"],
3260 ));
3261
3262 let events = log.events();
3263 assert_eq!(events.len(), 1);
3264 assert_eq!(events[0].id, id);
3265
3266 if let EventKind::MediaExtracted {
3267 task_id,
3268 block_count,
3269 content_types,
3270 } = &events[0].kind
3271 {
3272 assert_eq!(task_id.as_ref(), "media_task");
3273 assert_eq!(*block_count, 3);
3274 assert_eq!(content_types, &["image", "audio", "video"]);
3275 } else {
3276 panic!("Expected MediaExtracted in events()");
3277 }
3278 }
3279
3280 #[test]
3281 fn media_stored_broadcast_reaches_subscriber() {
3282 let (log, mut rx) = EventLog::new_with_broadcast();
3283
3284 log.emit(media_stored(
3285 "broadcast_test",
3286 BLAKE3_PNG,
3287 ".nika/media/store/a7/ffc6",
3288 4096,
3289 true,
3290 false,
3291 10,
3292 ));
3293
3294 let received = rx
3296 .try_recv()
3297 .expect("Subscriber should receive MediaStored event");
3298 assert_eq!(received.id, 0);
3299
3300 if let EventKind::MediaStored {
3301 task_id,
3302 hash,
3303 verified,
3304 ..
3305 } = &received.kind
3306 {
3307 assert_eq!(task_id.as_ref(), "broadcast_test");
3308 assert_eq!(hash, BLAKE3_PNG);
3309 assert!(*verified);
3310 } else {
3311 panic!("Expected MediaStored via broadcast");
3312 }
3313 }
3314
3315 #[test]
3316 fn media_events_broadcast_multiple_subscribers() {
3317 let (log, mut rx1) = EventLog::new_with_broadcast();
3318 let mut rx2 = log.subscribe().expect("Should be able to subscribe");
3319
3320 log.emit(media_processed("multi_sub", BLAKE3_WAV, "audio/wav", 2048));
3321
3322 let e1 = rx1.try_recv().expect("rx1 should receive");
3323 let e2 = rx2.try_recv().expect("rx2 should receive");
3324 assert_eq!(e1.id, e2.id);
3325 assert_eq!(e1.kind, e2.kind);
3326 }
3327
3328 #[test]
3329 fn filter_task_returns_media_events() {
3330 let log = EventLog::new();
3331
3332 log.emit(EventKind::TaskStarted {
3334 task_id: "gen_image".into(),
3335 verb: "invoke".into(),
3336 inputs: json!({}),
3337 });
3338 log.emit(media_extracted("gen_image", 1, &["image"]));
3339 log.emit(media_processed("gen_image", BLAKE3_PNG, "image/png", 4096));
3340 log.emit(media_stored(
3341 "gen_image",
3342 BLAKE3_PNG,
3343 ".nika/media/store/a7/ffc6",
3344 4096,
3345 true,
3346 false,
3347 20,
3348 ));
3349
3350 log.emit(media_extracted("other_task", 2, &["audio", "video"]));
3352
3353 let gen_events = log.filter_task("gen_image");
3354 assert_eq!(
3355 gen_events.len(),
3356 4,
3357 "gen_image should have 4 events (1 task + 3 media)"
3358 );
3359
3360 assert!(matches!(&gen_events[0].kind, EventKind::TaskStarted { .. }));
3362 assert!(matches!(
3363 &gen_events[1].kind,
3364 EventKind::MediaExtracted { .. }
3365 ));
3366 assert!(matches!(
3367 &gen_events[2].kind,
3368 EventKind::MediaProcessed { .. }
3369 ));
3370 assert!(matches!(&gen_events[3].kind, EventKind::MediaStored { .. }));
3371
3372 let other_events = log.filter_task("other_task");
3373 assert_eq!(other_events.len(), 1, "other_task should only have 1 event");
3374 }
3375
3376 #[test]
3377 fn filter_task_with_media_failure() {
3378 let log = EventLog::new();
3379
3380 log.emit(media_extracted("fail_task", 1, &["image"]));
3381 log.emit(media_processed("fail_task", BLAKE3_PNG, "image/png", 4096));
3382 log.emit(media_store_failed("fail_task", BLAKE3_PNG, "disk full"));
3383
3384 let events = log.filter_task("fail_task");
3385 assert_eq!(events.len(), 3);
3386 assert!(matches!(
3387 &events[2].kind,
3388 EventKind::MediaStoreFailed { .. }
3389 ));
3390 }
3391
3392 #[test]
3393 fn count_task_includes_media_events() {
3394 let log = EventLog::new();
3395
3396 log.emit(media_extracted("count_me", 2, &["image", "audio"]));
3397 log.emit(media_processed("count_me", BLAKE3_PNG, "image/png", 4096));
3398 log.emit(media_processed("count_me", BLAKE3_WAV, "audio/wav", 8192));
3399 log.emit(media_stored(
3400 "count_me",
3401 BLAKE3_PNG,
3402 ".nika/media/store/a7/ffc6",
3403 4096,
3404 true,
3405 false,
3406 15,
3407 ));
3408 log.emit(media_stored(
3409 "count_me",
3410 BLAKE3_WAV,
3411 ".nika/media/store/e3/b0c4",
3412 8192,
3413 true,
3414 false,
3415 22,
3416 ));
3417
3418 assert_eq!(log.count_task("count_me"), 5);
3419 assert_eq!(log.count_task("no_such_task"), 0);
3420 }
3421
3422 #[test]
3423 fn media_events_not_workflow_events() {
3424 let log = EventLog::new();
3425
3426 log.emit(workflow_started(1));
3427 log.emit(media_extracted("t1", 1, &["image"]));
3428 log.emit(media_processed("t1", BLAKE3_PNG, "image/png", 4096));
3429 log.emit(media_stored(
3430 "t1",
3431 BLAKE3_PNG,
3432 ".nika/media/store/a7/ffc6",
3433 4096,
3434 true,
3435 false,
3436 10,
3437 ));
3438 log.emit(media_store_failed("t1", "", "boom"));
3439
3440 let wf_events = log.workflow_events();
3441 assert_eq!(
3442 wf_events.len(),
3443 1,
3444 "Media events must NOT appear in workflow_events()"
3445 );
3446 }
3447
3448 #[test]
3453 fn media_stored_pipeline_ms_reasonable_values() {
3454 let event = media_stored(
3456 "fast_store",
3457 BLAKE3_PNG,
3458 ".nika/media/store/a7/ffc6",
3459 4096,
3460 true,
3461 false,
3462 42,
3463 );
3464 if let EventKind::MediaStored { pipeline_ms, .. } = &event {
3465 assert!(
3466 *pipeline_ms < 10000,
3467 "pipeline_ms={pipeline_ms} should be < 10000ms"
3468 );
3469 }
3470
3471 let event_zero = media_stored(
3473 "dedup_store",
3474 BLAKE3_PNG,
3475 ".nika/media/store/a7/ffc6",
3476 4096,
3477 false,
3478 true,
3479 0,
3480 );
3481 if let EventKind::MediaStored { pipeline_ms, .. } = &event_zero {
3482 assert_eq!(*pipeline_ms, 0, "Dedup fast path can have 0ms pipeline");
3483 }
3484
3485 let event_edge = media_stored(
3487 "slow_store",
3488 BLAKE3_PNG,
3489 ".nika/media/store/a7/ffc6",
3490 4096,
3491 true,
3492 false,
3493 9999,
3494 );
3495 if let EventKind::MediaStored { pipeline_ms, .. } = &event_edge {
3496 assert!(*pipeline_ms < 10000);
3497 }
3498 }
3499
3500 #[test]
3501 fn media_stored_verified_and_deduplicated_independent() {
3502 let combos: Vec<(bool, bool, &str)> = vec![
3504 (true, false, "fresh write, verified"),
3505 (false, false, "fresh write, unverified (small file)"),
3506 (false, true, "dedup hit, not re-verified"),
3507 (true, true, "dedup hit, re-verified"),
3508 ];
3509
3510 for (verified, deduplicated, desc) in combos {
3511 let event = media_stored(
3512 "combo_test",
3513 BLAKE3_PNG,
3514 ".nika/media/store/a7/ffc6",
3515 4096,
3516 verified,
3517 deduplicated,
3518 10,
3519 );
3520 if let EventKind::MediaStored {
3521 verified: v,
3522 deduplicated: d,
3523 ..
3524 } = &event
3525 {
3526 assert_eq!(*v, verified, "verified mismatch for: {desc}");
3527 assert_eq!(*d, deduplicated, "deduplicated mismatch for: {desc}");
3528 }
3529
3530 let json = serde_json::to_string(&event).unwrap();
3532 let back: EventKind = serde_json::from_str(&json).unwrap();
3533 assert_eq!(event, back, "Roundtrip failed for: {desc}");
3534 }
3535 }
3536
3537 #[test]
3538 fn media_stored_path_cas_format() {
3539 let cas_paths = vec![
3541 ".nika/media/store/a7/ffc6f8bf1ed76651c14756a061d662f580ff4de43b49fa82d80a4b80f8434a",
3542 ".nika/media/store/e3/b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855",
3543 ".nika/media/store/6b/86b273ff34fce19d6b804eff5a3f5747ada4eaa22f1d49c01e52ddb7875b4b",
3544 ];
3545
3546 for path in cas_paths {
3547 let event = media_stored("path_test", BLAKE3_PNG, path, 4096, true, false, 10);
3548 if let EventKind::MediaStored { path: p, .. } = &event {
3549 assert!(
3550 p.starts_with(".nika/media/store/"),
3551 "CAS path must start with .nika/media/store/: {p}"
3552 );
3553 let suffix = p.strip_prefix(".nika/media/store/").unwrap();
3555 let parts: Vec<&str> = suffix.splitn(2, '/').collect();
3556 assert_eq!(parts.len(), 2, "CAS path suffix must be dir/file: {suffix}");
3557 assert_eq!(
3558 parts[0].len(),
3559 2,
3560 "CAS directory prefix must be 2 chars: {}",
3561 parts[0]
3562 );
3563 assert!(!parts[1].is_empty(), "CAS filename must not be empty");
3564 }
3565 }
3566 }
3567
3568 #[test]
3573 fn media_events_serde_tags_are_snake_case() {
3574 let variants: Vec<(&str, EventKind)> = vec![
3575 ("media_extracted", media_extracted("t", 1, &["image"])),
3576 (
3577 "media_processed",
3578 media_processed("t", BLAKE3_PNG, "image/png", 100),
3579 ),
3580 (
3581 "media_stored",
3582 media_stored(
3583 "t",
3584 BLAKE3_PNG,
3585 ".nika/media/store/a7/ffc6",
3586 100,
3587 true,
3588 false,
3589 5,
3590 ),
3591 ),
3592 ("media_store_failed", media_store_failed("t", "", "err")),
3593 ];
3594
3595 for (expected_tag, event) in variants {
3596 let json = serde_json::to_string(&event).unwrap();
3597 let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
3598 assert_eq!(
3599 parsed["type"].as_str().unwrap(),
3600 expected_tag,
3601 "Serde tag mismatch for {expected_tag}"
3602 );
3603 }
3604 }
3605
3606 #[test]
3607 fn media_events_deserialize_from_json_objects() {
3608 let json_extracted = json!({
3610 "type": "media_extracted",
3611 "task_id": "from_json",
3612 "block_count": 2,
3613 "content_types": ["image", "audio"]
3614 });
3615 let extracted: EventKind = serde_json::from_value(json_extracted).unwrap();
3616 assert_eq!(extracted.task_id(), Some("from_json"));
3617 if let EventKind::MediaExtracted {
3618 block_count,
3619 content_types,
3620 ..
3621 } = &extracted
3622 {
3623 assert_eq!(*block_count, 2);
3624 assert_eq!(content_types, &["image", "audio"]);
3625 } else {
3626 panic!("Expected MediaExtracted from JSON");
3627 }
3628
3629 let json_stored = json!({
3630 "type": "media_stored",
3631 "task_id": "from_json",
3632 "hash": BLAKE3_PNG,
3633 "path": ".nika/media/store/a7/ffc6",
3634 "size_bytes": 8192,
3635 "verified": true,
3636 "deduplicated": false,
3637 "pipeline_ms": 33
3638 });
3639 let stored: EventKind = serde_json::from_value(json_stored).unwrap();
3640 if let EventKind::MediaStored {
3641 pipeline_ms,
3642 verified,
3643 deduplicated,
3644 ..
3645 } = &stored
3646 {
3647 assert_eq!(*pipeline_ms, 33);
3648 assert!(*verified);
3649 assert!(!*deduplicated);
3650 } else {
3651 panic!("Expected MediaStored from JSON");
3652 }
3653 }
3654
3655 #[test]
3656 fn media_full_pipeline_lifecycle_in_eventlog() {
3657 let log = EventLog::new();
3659 let task = "generate_screenshot";
3660
3661 log.emit(EventKind::TaskStarted {
3663 task_id: task.into(),
3664 verb: "invoke".into(),
3665 inputs: json!({"tool": "screenshot"}),
3666 });
3667
3668 log.emit(media_extracted(task, 2, &["image", "image"]));
3670
3671 log.emit(media_processed(task, BLAKE3_PNG, "image/png", 65536));
3673 log.emit(media_processed(task, BLAKE3_WAV, "image/jpeg", 32768));
3674
3675 log.emit(media_stored(
3677 task,
3678 BLAKE3_PNG,
3679 ".nika/media/store/a7/ffc6f8bf1ed76651",
3680 65536,
3681 true,
3682 false,
3683 35,
3684 ));
3685 log.emit(media_stored(
3686 task,
3687 BLAKE3_WAV,
3688 ".nika/media/store/e3/b0c44298fc1c1",
3689 32768,
3690 false,
3691 true,
3692 2,
3693 ));
3694
3695 log.emit(EventKind::TaskCompleted {
3697 task_id: task.into(),
3698 output: Arc::new(json!({"images": 2})),
3699 duration_ms: 500,
3700 });
3701
3702 let events = log.filter_task(task);
3703 assert_eq!(
3704 events.len(),
3705 7,
3706 "Full lifecycle: 1 started + 1 extracted + 2 processed + 2 stored + 1 completed"
3707 );
3708
3709 assert!(matches!(&events[0].kind, EventKind::TaskStarted { .. }));
3711 assert!(matches!(&events[1].kind, EventKind::MediaExtracted { .. }));
3712 assert!(matches!(&events[2].kind, EventKind::MediaProcessed { .. }));
3713 assert!(matches!(&events[3].kind, EventKind::MediaProcessed { .. }));
3714 assert!(matches!(&events[4].kind, EventKind::MediaStored { .. }));
3715 assert!(matches!(&events[5].kind, EventKind::MediaStored { .. }));
3716 assert!(matches!(&events[6].kind, EventKind::TaskCompleted { .. }));
3717
3718 if let EventKind::MediaStored {
3720 deduplicated,
3721 pipeline_ms,
3722 ..
3723 } = &events[5].kind
3724 {
3725 assert!(*deduplicated, "Second store should be dedup hit");
3726 assert!(*pipeline_ms < 10, "Dedup fast path should be < 10ms");
3727 }
3728 }
3729
3730 #[test]
3731 fn vision_content_resolved_serde_round_trip() {
3732 let event = EventKind::VisionContentResolved {
3733 task_id: Arc::from("describe_image"),
3734 image_count: 3,
3735 total_bytes: 1_048_576,
3736 resolve_ms: 42,
3737 };
3738 let json = serde_json::to_string(&event).unwrap();
3739 assert!(json.contains("vision_content_resolved"));
3740 assert!(json.contains("describe_image"));
3741 assert!(json.contains("1048576"));
3742 let parsed: EventKind = serde_json::from_str(&json).unwrap();
3743 assert_eq!(event, parsed);
3744 }
3745
3746 #[test]
3747 fn vision_content_resolved_has_task_id() {
3748 let event = EventKind::VisionContentResolved {
3749 task_id: Arc::from("my_task"),
3750 image_count: 1,
3751 total_bytes: 512,
3752 resolve_ms: 5,
3753 };
3754 assert_eq!(event.task_id(), Some("my_task"));
3755 }
3756
3757 #[test]
3762 fn http_request_serde_round_trip() {
3763 let event = EventKind::HttpRequest {
3764 task_id: Arc::from("fetch_task"),
3765 method: "POST".to_string(),
3766 url: "https://api.example.com/data".to_string(),
3767 has_body: true,
3768 };
3769 let json = serde_json::to_string(&event).unwrap();
3770 assert!(json.contains("http_request"));
3771 assert!(json.contains("POST"));
3772 assert!(json.contains("api.example.com"));
3773 let parsed: EventKind = serde_json::from_str(&json).unwrap();
3774 assert_eq!(event, parsed);
3775 }
3776
3777 #[test]
3778 fn http_request_has_task_id() {
3779 let event = EventKind::HttpRequest {
3780 task_id: Arc::from("t1"),
3781 method: "GET".to_string(),
3782 url: "https://example.com".to_string(),
3783 has_body: false,
3784 };
3785 assert_eq!(event.task_id(), Some("t1"));
3786 }
3787
3788 #[test]
3789 fn http_response_serde_round_trip() {
3790 let event = EventKind::HttpResponse {
3791 task_id: Arc::from("fetch_task"),
3792 status_code: 200,
3793 content_type: Some("application/json".to_string()),
3794 content_length: Some(1234),
3795 elapsed_ms: 150,
3796 };
3797 let json = serde_json::to_string(&event).unwrap();
3798 assert!(json.contains("http_response"));
3799 assert!(json.contains("200"));
3800 assert!(json.contains("application/json"));
3801 let parsed: EventKind = serde_json::from_str(&json).unwrap();
3802 assert_eq!(event, parsed);
3803 }
3804
3805 #[test]
3806 fn http_response_without_content_type() {
3807 let event = EventKind::HttpResponse {
3808 task_id: Arc::from("t2"),
3809 status_code: 404,
3810 content_type: None,
3811 content_length: None,
3812 elapsed_ms: 50,
3813 };
3814 let json = serde_json::to_string(&event).unwrap();
3815 let parsed: EventKind = serde_json::from_str(&json).unwrap();
3816 assert_eq!(event, parsed);
3817 }
3818
3819 #[test]
3820 fn http_response_has_task_id() {
3821 let event = EventKind::HttpResponse {
3822 task_id: Arc::from("t3"),
3823 status_code: 500,
3824 content_type: None,
3825 content_length: None,
3826 elapsed_ms: 0,
3827 };
3828 assert_eq!(event.task_id(), Some("t3"));
3829 }
3830
3831 #[test]
3832 fn exec_completed_serializes() {
3833 let event = EventKind::ExecCompleted {
3834 task_id: "exec_task".into(),
3835 exit_code: 0,
3836 stdout_len: 1024,
3837 stderr_len: 0,
3838 duration_ms: 150,
3839 };
3840 let json = serde_json::to_string(&event).unwrap();
3841 assert!(json.contains("exec_completed"));
3842 assert!(json.contains("\"exit_code\":0"));
3843 let round: EventKind = serde_json::from_str(&json).unwrap();
3844 assert_eq!(round.task_id(), Some("exec_task"));
3845 }
3846
3847 #[test]
3848 fn fetch_retry_serializes() {
3849 let event = EventKind::FetchRetry {
3850 task_id: "fetch_task".into(),
3851 url: "https://api.example.com/data".to_string(),
3852 attempt: 2,
3853 max_attempts: 3,
3854 status_code: Some(503),
3855 backoff_ms: 2000,
3856 };
3857 let json = serde_json::to_string(&event).unwrap();
3858 assert!(json.contains("fetch_retry"));
3859 assert!(json.contains("\"attempt\":2"));
3860 let round: EventKind = serde_json::from_str(&json).unwrap();
3861 assert_eq!(round.task_id(), Some("fetch_task"));
3862 }
3863
3864 #[test]
3865 fn policy_blocked_serializes() {
3866 let event = EventKind::PolicyBlocked {
3867 task_id: "exec_task".into(),
3868 verb: "exec".to_string(),
3869 policy_type: "command_blocklist".to_string(),
3870 reason: "Command 'sudo' is blocked".to_string(),
3871 };
3872 let json = serde_json::to_string(&event).unwrap();
3873 assert!(json.contains("policy_blocked"));
3874 assert!(json.contains("command_blocklist"));
3875 let round: EventKind = serde_json::from_str(&json).unwrap();
3876 assert_eq!(round.task_id(), Some("exec_task"));
3877 }
3878
3879 #[test]
3880 fn boot_phase_completed_serializes() {
3881 let event = EventKind::BootPhaseCompleted {
3882 phase: "mcp_startup".to_string(),
3883 success: true,
3884 duration_ms: 1234,
3885 warnings: vec!["daemon not running".to_string()],
3886 };
3887 let json = serde_json::to_string(&event).unwrap();
3888 assert!(json.contains("boot_phase_completed"));
3889 assert!(json.contains("mcp_startup"));
3890 let round: EventKind = serde_json::from_str(&json).unwrap();
3891 assert_eq!(round.task_id(), None);
3892 }
3893
3894 #[test]
3895 fn native_model_loaded_serializes() {
3896 let event = EventKind::NativeModelLoaded {
3897 model: "mistral-7b-instruct.gguf".to_string(),
3898 kind: "gguf".to_string(),
3899 size_bytes: 4_000_000_000,
3900 duration_ms: 3500,
3901 is_vision: false,
3902 };
3903 let json = serde_json::to_string(&event).unwrap();
3904 assert!(json.contains("native_model_loaded"));
3905 let round: EventKind = serde_json::from_str(&json).unwrap();
3906 assert_eq!(round.task_id(), None);
3907 }
3908}