1use oxi_sdk::EventBus as SdkEventBus;
15use oxi_sdk::observability::{AuditAction, AuditTrail};
16use serde::{Deserialize, Serialize};
17use std::sync::Arc;
18use uuid::Uuid;
19
20use crate::types::AgentId;
21
22pub type EventBus = SdkEventBus<KernelEvent>;
28
29#[derive(Debug, Clone, Serialize, Deserialize)]
31pub enum KernelEvent {
32 AgentCreated {
34 id: AgentId,
36 name: String,
38 },
39 AgentStarted {
41 id: AgentId,
43 },
44 AgentStopped {
46 id: AgentId,
48 },
49 AgentFailed {
51 id: AgentId,
53 error: String,
55 },
56 MessageReceived {
58 from: AgentId,
60 content: String,
62 },
63 SeedCreated {
65 seed_id: uuid::Uuid,
67 },
68 EvaluationComplete {
70 seed_id: uuid::Uuid,
72 passed: bool,
74 },
75 PhaseStarted {
77 session_id: String,
79 phase: oxios_ouroboros::Phase,
81 },
82 PhaseCompleted {
84 session_id: String,
86 phase: oxios_ouroboros::Phase,
88 result_summary: String,
90 },
91 AgentOutput {
93 session_id: String,
95 agent_id: AgentId,
97 output: String,
99 },
100 ApprovalRequested {
102 id: uuid::Uuid,
104 tool_name: String,
106 action: String,
108 resource: String,
110 reason: String,
112 session_id: Option<String>,
114 },
115 ApprovalResolved {
117 id: uuid::Uuid,
119 approved: bool,
121 },
122 MemoryStored {
124 id: String,
126 memory_type: String,
128 source: String,
130 },
131 MemoryRecalled {
133 query: String,
135 count: usize,
137 },
138 AgentGroupCreated {
140 group_id: uuid::Uuid,
142 agent_count: usize,
144 },
145 AgentGroupMemberCompleted {
147 group_id: uuid::Uuid,
149 agent_id: uuid::Uuid,
151 success: bool,
153 },
154 ProjectCreated {
156 project_id: uuid::Uuid,
158 name: String,
160 source: String,
162 },
163 ProjectActivated {
165 project_id: uuid::Uuid,
167 name: String,
169 },
170 EvolutionStarted {
172 seed_id: uuid::Uuid,
174 new_seed_id: uuid::Uuid,
176 iteration: u32,
178 },
179 EvolutionMaxReached {
181 seed_id: uuid::Uuid,
183 final_score: f64,
185 iterations: u32,
187 },
188
189 ToolExecutionStarted {
194 session_id: String,
196 tool_name: String,
198 tool_call_id: String,
200 tool_args: serde_json::Value,
202 #[serde(default, skip_serializing_if = "Option::is_none")]
205 context: Option<serde_json::Value>,
206 },
207 ToolExecutionFinished {
209 session_id: String,
211 tool_call_id: String,
213 tool_name: String,
215 duration_ms: u64,
217 is_error: bool,
219 output_summary: String,
221 },
222 ToolExecutionProgress {
224 session_id: String,
226 tool_call_id: String,
228 tool_name: String,
230 progress: String,
232 #[serde(default, skip_serializing_if = "Option::is_none")]
236 tab_id: Option<Uuid>,
237 #[serde(default, skip_serializing_if = "Option::is_none")]
243 context: Option<serde_json::Value>,
244 },
245 MemoryRecallUsed {
247 session_id: String,
249 query: String,
251 count: usize,
253 source: String,
255 },
256 TokenUsageUpdate {
258 session_id: String,
260 input_tokens: u64,
262 output_tokens: u64,
264 },
265 ReasoningFragment {
267 session_id: String,
269 content: String,
271 source: String,
273 },
274
275 CalendarEventCreated {
278 uid: String,
280 title: String,
282 start: String,
284 end: String,
286 },
287 CalendarEventUpdated {
289 uid: String,
291 title: String,
293 },
294 CalendarEventDeleted {
296 uid: String,
298 title: String,
300 },
301 EmailSent {
303 subject: String,
305 message_id: String,
307 #[serde(default, skip_serializing_if = "Option::is_none")]
309 template_name: Option<String>,
310 },
311
312 KnowledgePersisted {
315 session_id: String,
316 message_index: usize,
317 path: String,
318 source: String, },
320 KnowledgeRemoved {
322 session_id: String,
323 message_index: usize,
324 },
325}
326
327pub fn kernel_event_to_audit_action(event: &KernelEvent) -> AuditAction {
329 match event {
330 KernelEvent::AgentCreated { name, .. } => AuditAction::AgentSpawn {
331 task_type: name.clone(),
332 },
333 KernelEvent::AgentStarted { .. } => AuditAction::AgentSpawn {
334 task_type: "started".to_string(),
335 },
336 KernelEvent::AgentStopped { .. } => AuditAction::AgentExit {
337 reason: "stopped".to_string(),
338 },
339 KernelEvent::AgentFailed { error, .. } => AuditAction::AgentExit {
340 reason: error.clone(),
341 },
342 KernelEvent::MessageReceived { content, .. } => AuditAction::Other {
343 detail: format!("message: {content}"),
344 },
345 KernelEvent::SeedCreated { seed_id, .. } => AuditAction::Other {
346 detail: format!("seed_created:{seed_id}"),
347 },
348 KernelEvent::EvaluationComplete { seed_id, passed } => AuditAction::Other {
349 detail: format!("evaluation:{seed_id}:{passed}"),
350 },
351 KernelEvent::PhaseStarted { session_id, phase } => AuditAction::Other {
352 detail: format!("phase_started:{session_id}:{phase}"),
353 },
354 KernelEvent::PhaseCompleted {
355 session_id,
356 phase,
357 result_summary,
358 } => AuditAction::Other {
359 detail: format!("phase_completed:{session_id}:{phase}:{result_summary}"),
360 },
361 KernelEvent::AgentOutput { output, .. } => AuditAction::Other {
362 detail: format!("agent_output:{output}"),
363 },
364 KernelEvent::ApprovalRequested {
365 id,
366 action,
367 resource,
368 ..
369 } => AuditAction::Other {
370 detail: format!("approval_requested:{id}:{action}:{resource}"),
371 },
372 KernelEvent::ApprovalResolved { id, approved } => AuditAction::Other {
373 detail: format!("approval_resolved:{id}:{approved}"),
374 },
375 KernelEvent::MemoryStored {
376 id, memory_type, ..
377 } => AuditAction::MemoryWrite {
378 entry_id: format!("{id}:{memory_type}"),
379 },
380 KernelEvent::MemoryRecalled { query, count } => AuditAction::MemoryRead {
381 entry_id: format!("query:{query}:{count}results"),
382 },
383 KernelEvent::AgentGroupCreated {
384 group_id,
385 agent_count,
386 } => AuditAction::Other {
387 detail: format!("group_created:{group_id}:{agent_count}agents"),
388 },
389 KernelEvent::AgentGroupMemberCompleted {
390 group_id,
391 agent_id,
392 success,
393 } => AuditAction::Other {
394 detail: format!("group_member_completed:{group_id}:{agent_id}:{success}"),
395 },
396 KernelEvent::EvolutionStarted {
397 seed_id,
398 new_seed_id,
399 iteration,
400 } => AuditAction::Other {
401 detail: format!("evolution:{seed_id}->{new_seed_id}:iter{iteration}"),
402 },
403 KernelEvent::EvolutionMaxReached {
404 seed_id,
405 final_score,
406 iterations,
407 } => AuditAction::Other {
408 detail: format!("evolution_max:{seed_id}:score={final_score}:iters={iterations}"),
409 },
410 KernelEvent::ProjectCreated {
411 project_id: _,
412 name,
413 source,
414 } => AuditAction::Other {
415 detail: format!("project_created:{name}:{source}"),
416 },
417 KernelEvent::ProjectActivated {
418 project_id: _,
419 name,
420 } => AuditAction::Other {
421 detail: format!("project_activated:{name}"),
422 },
423 KernelEvent::ToolExecutionStarted { tool_name, .. } => AuditAction::Other {
425 detail: format!("tool_started:{tool_name}"),
426 },
427 KernelEvent::ToolExecutionFinished {
428 tool_name,
429 is_error,
430 ..
431 } => AuditAction::Other {
432 detail: format!(
433 "tool_finished:{tool_name}:{}",
434 if *is_error { "error" } else { "ok" }
435 ),
436 },
437 KernelEvent::ToolExecutionProgress {
438 tool_name,
439 tab_id,
440 context,
441 ..
442 } => AuditAction::Other {
443 detail: {
444 let mut d = format!("tool_progress:{tool_name}");
445 if let Some(id) = tab_id {
446 d.push_str(&format!(":tab={id}"));
447 }
448 if let Some(ctx) = context
449 .as_ref()
450 .and_then(|c| c.get("kind"))
451 .and_then(|k| k.as_str())
452 {
453 d.push_str(&format!(":{ctx}"));
454 }
455 d
456 },
457 },
458 KernelEvent::MemoryRecallUsed { query, count, .. } => AuditAction::MemoryRead {
459 entry_id: format!("recall:{query}:{count}results"),
460 },
461 KernelEvent::TokenUsageUpdate {
462 input_tokens,
463 output_tokens,
464 ..
465 } => AuditAction::Other {
466 detail: format!("tokens:in={input_tokens}:out={output_tokens}"),
467 },
468 KernelEvent::ReasoningFragment { source, .. } => AuditAction::Other {
469 detail: format!("reasoning:{source}"),
470 },
471 KernelEvent::CalendarEventCreated { uid, title, .. } => AuditAction::Other {
472 detail: format!("calendar:created:{uid}:{title}"),
473 },
474 KernelEvent::CalendarEventUpdated { uid, title } => AuditAction::Other {
475 detail: format!("calendar:updated:{uid}:{title}"),
476 },
477 KernelEvent::CalendarEventDeleted { uid, title } => AuditAction::Other {
478 detail: format!("calendar:deleted:{uid}:{title}"),
479 },
480 KernelEvent::EmailSent {
481 subject,
482 message_id,
483 template_name,
484 } => AuditAction::Other {
485 detail: format!("email:sent:{subject} (msg={message_id}, tpl={template_name:?})"),
486 },
487 KernelEvent::KnowledgePersisted {
488 session_id,
489 message_index,
490 path,
491 source,
492 } => AuditAction::Other {
493 detail: format!("knowledge:persisted:{session_id}:{message_index}:{path}:{source}"),
494 },
495 KernelEvent::KnowledgeRemoved {
496 session_id,
497 message_index,
498 } => AuditAction::Other {
499 detail: format!("knowledge:removed:{session_id}:{message_index}"),
500 },
501 }
502}
503
504fn extract_agent_id(event: &KernelEvent) -> String {
506 match event {
507 KernelEvent::AgentCreated { id, .. } => id.to_string(),
508 KernelEvent::AgentStarted { id, .. } => id.to_string(),
509 KernelEvent::AgentStopped { id, .. } => id.to_string(),
510 KernelEvent::AgentFailed { id, .. } => id.to_string(),
511 KernelEvent::MessageReceived { from, .. } => from.to_string(),
512 KernelEvent::AgentOutput { agent_id, .. } => agent_id.to_string(),
513 KernelEvent::AgentGroupMemberCompleted { agent_id, .. } => agent_id.to_string(),
514 KernelEvent::ProjectActivated { project_id, .. } => format!("project:{project_id}"),
515 KernelEvent::ToolExecutionStarted { session_id, .. } => format!("session:{session_id}"),
517 KernelEvent::ToolExecutionFinished { session_id, .. } => format!("session:{session_id}"),
518 KernelEvent::ToolExecutionProgress { session_id, .. } => format!("session:{session_id}"),
519 KernelEvent::MemoryRecallUsed { session_id, .. } => format!("session:{session_id}"),
520 KernelEvent::TokenUsageUpdate { session_id, .. } => format!("session:{session_id}"),
521 KernelEvent::ReasoningFragment { session_id, .. } => format!("session:{session_id}"),
522 KernelEvent::KnowledgePersisted { session_id, .. } => format!("session:{session_id}"),
523 KernelEvent::KnowledgeRemoved { session_id, .. } => format!("session:{session_id}"),
524 _ => "system".to_string(),
525 }
526}
527
528pub fn attach_audit_trail(bus: &EventBus, audit: Arc<AuditTrail>) {
534 let mut rx = bus.subscribe();
535 tokio::spawn(async move {
536 loop {
537 match rx.recv().await {
538 Ok(event) => {
539 let actor = extract_agent_id(&event);
540 let action = kernel_event_to_audit_action(&event);
541 let resource = format!("{event:?}");
542 audit.append(actor, action, resource);
543 }
544 Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
545 tracing::warn!(
546 skipped = n,
547 "Audit trail subscriber lagged, skipping events"
548 );
549 continue;
550 }
551 Err(tokio::sync::broadcast::error::RecvError::Closed) => {
552 tracing::info!("Audit trail event bus closed, exiting");
553 break;
554 }
555 }
556 }
557 });
558}
559
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an `AgentCreated` event with a fresh id and the given name.
    fn sample_event(name: &str) -> KernelEvent {
        KernelEvent::AgentCreated {
            id: AgentId::new_v4(),
            name: name.to_string(),
        }
    }

    /// The kernel bus alias resolves to the SDK bus type.
    #[test]
    fn test_event_bus_uses_sdk() {
        let bus: EventBus = EventBus::new(256);
        assert!(format!("{:?}", bus).contains("EventBus"));
    }

    /// Publishing with no subscribers must not be reported as an error.
    #[tokio::test]
    async fn test_publish_no_subscribers_ok() {
        let bus = EventBus::new(16);
        let result = bus.publish(sample_event("orphan"));
        assert!(result.is_ok());
    }

    /// A single subscriber receives the published event.
    #[tokio::test]
    async fn test_single_subscriber_receives_event() {
        let bus = EventBus::new(16);
        let mut rx = bus.subscribe();

        bus.publish(sample_event("test-agent")).unwrap();

        let received = rx.try_recv().expect("should receive event");
        match received {
            KernelEvent::AgentCreated { name, .. } => assert_eq!(name, "test-agent"),
            _ => panic!("wrong event type"),
        }
    }

    /// Every subscriber receives its own copy of a published event.
    #[tokio::test]
    async fn test_multiple_subscribers_receive_events() {
        let bus = EventBus::new(16);
        let mut rx1 = bus.subscribe();
        let mut rx2 = bus.subscribe();

        bus.publish(sample_event("multi")).unwrap();

        let r1 = rx1.try_recv().expect("rx1 should receive event");
        let r2 = rx2.try_recv().expect("rx2 should receive event");

        assert!(matches!(r1, KernelEvent::AgentCreated { .. }));
        assert!(matches!(r2, KernelEvent::AgentCreated { .. }));
    }

    /// `AgentFailed` maps to `AgentExit` carrying the error text as the reason.
    ///
    /// The mapping is a plain synchronous function, so no async runtime is needed.
    #[test]
    fn test_kernel_event_to_audit_action() {
        let event = KernelEvent::AgentFailed {
            id: AgentId::new_v4(),
            error: "boom".to_string(),
        };
        let action = kernel_event_to_audit_action(&event);
        match action {
            AuditAction::AgentExit { reason } => assert_eq!(reason, "boom"),
            other => panic!("expected AgentExit, got {other:?}"),
        }
    }

    /// Tool/recall/token/reasoning events survive a JSON round trip unchanged.
    #[test]
    fn test_rfc015_event_round_trip_json() {
        let cases: Vec<KernelEvent> = vec![
            KernelEvent::ToolExecutionStarted {
                session_id: "s1".into(),
                tool_name: "read_file".into(),
                tool_call_id: "call_1".into(),
                tool_args: serde_json::json!({"path": "/src/main.rs"}),
                context: None,
            },
            KernelEvent::ToolExecutionFinished {
                session_id: "s1".into(),
                tool_call_id: "call_1".into(),
                tool_name: "read_file".into(),
                duration_ms: 234,
                is_error: false,
                output_summary: "fn main() {}".into(),
            },
            KernelEvent::ToolExecutionProgress {
                session_id: "s1".into(),
                tool_call_id: "call_1".into(),
                tool_name: "read_file".into(),
                progress: "reading line 42/100".into(),
                tab_id: None,
                context: None,
            },
            KernelEvent::MemoryRecallUsed {
                session_id: "s1".into(),
                query: "rust errors".into(),
                count: 3,
                source: "warm".into(),
            },
            KernelEvent::TokenUsageUpdate {
                session_id: "s1".into(),
                input_tokens: 1234,
                output_tokens: 567,
            },
            KernelEvent::ReasoningFragment {
                session_id: "s1".into(),
                content: "compaction done".into(),
                source: "compaction".into(),
            },
        ];
        for event in cases {
            let json = serde_json::to_string(&event).expect("serialize");
            let back: KernelEvent = serde_json::from_str(&json).expect("deserialize");
            let json2 = serde_json::to_string(&back).expect("serialize round-trip");
            assert_eq!(json, json2, "round-trip should be stable");
        }
    }

    /// `ToolExecutionProgress` keeps all fields, including a present `tab_id`,
    /// across a JSON round trip.
    #[test]
    fn test_tool_execution_progress_serde_round_trip() {
        let tab = Uuid::new_v4();
        let event = KernelEvent::ToolExecutionProgress {
            session_id: "s-abc".into(),
            tool_call_id: "call_42".into(),
            tool_name: "browse".into(),
            progress: "loading https://example.com".into(),
            tab_id: Some(tab),
            context: None,
        };
        let json = serde_json::to_string(&event).expect("serialize");
        let back: KernelEvent = serde_json::from_str(&json).expect("deserialize");
        match back {
            KernelEvent::ToolExecutionProgress {
                ref session_id,
                ref tool_call_id,
                ref tool_name,
                ref progress,
                tab_id,
                ..
            } => {
                assert_eq!(session_id, "s-abc");
                assert_eq!(tool_call_id, "call_42");
                assert_eq!(tool_name, "browse");
                assert_eq!(progress, "loading https://example.com");
                // Compare the value, not just presence, so a corrupted id is caught.
                assert_eq!(
                    tab_id,
                    Some(tab),
                    "tab_id should round-trip when present"
                );
            }
            other => panic!("expected ToolExecutionProgress, got {other:?}"),
        }
    }

    /// The audit detail for progress events includes `:tab=` only when a tab id
    /// is present.
    #[test]
    fn test_tool_execution_progress_audit_action() {
        let with_tab = KernelEvent::ToolExecutionProgress {
            session_id: "s1".into(),
            tool_call_id: "c1".into(),
            tool_name: "browse".into(),
            progress: "navigating".into(),
            tab_id: Some(Uuid::new_v4()),
            context: None,
        };
        match kernel_event_to_audit_action(&with_tab) {
            AuditAction::Other { detail } => {
                assert!(detail.contains("tool_progress"), "detail: {detail}");
                assert!(detail.contains("browse"), "detail: {detail}");
                assert!(
                    detail.contains(":tab="),
                    "detail should include tab id: {detail}"
                );
            }
            other => panic!("expected Other, got {other:?}"),
        }

        let without_tab = KernelEvent::ToolExecutionProgress {
            session_id: "s1".into(),
            tool_call_id: "c1".into(),
            tool_name: "browse".into(),
            progress: "navigating".into(),
            tab_id: None,
            context: None,
        };
        match kernel_event_to_audit_action(&without_tab) {
            AuditAction::Other { detail } => {
                assert_eq!(detail, "tool_progress:browse");
            }
            other => panic!("expected Other, got {other:?}"),
        }
    }

    /// JSON without a `tab_id` key still deserializes (the field defaults to
    /// `None`), and a `None` tab id is not written back out.
    #[test]
    fn test_tool_execution_progress_tab_id_optional_in_serde() {
        let legacy_json = r#"{
            "ToolExecutionProgress": {
                "session_id": "s-old",
                "tool_call_id": "call_legacy",
                "tool_name": "browse",
                "progress": "step 1"
            }
        }"#;
        let event: KernelEvent = serde_json::from_str(legacy_json).expect("deserialize legacy");
        match &event {
            KernelEvent::ToolExecutionProgress {
                session_id,
                tool_call_id,
                tool_name,
                progress,
                tab_id,
                ..
            } => {
                assert_eq!(session_id, "s-old");
                assert_eq!(tool_call_id, "call_legacy");
                assert_eq!(tool_name, "browse");
                assert_eq!(progress, "step 1");
                assert!(tab_id.is_none(), "missing field should default to None");
            }
            other => panic!("expected ToolExecutionProgress, got {other:?}"),
        }
        let json = serde_json::to_string(&event).expect("serialize");
        assert!(
            !json.contains("tab_id"),
            "tab_id should be omitted when None: {json}"
        );
    }

    /// Checks the audit actor derived by `extract_agent_id` and the audit action
    /// for a tool event.
    #[test]
    fn test_rfc015_extract_agent_id() {
        let event = KernelEvent::ToolExecutionStarted {
            session_id: "abc-123".into(),
            tool_name: "bash".into(),
            tool_call_id: "c1".into(),
            tool_args: serde_json::Value::Null,
            context: None,
        };

        // Session-scoped events are attributed to "session:<id>".
        assert_eq!(extract_agent_id(&event), "session:abc-123");

        // Agent-scoped events are attributed to the agent id itself.
        let agent = AgentId::new_v4();
        let expected_actor = agent.to_string();
        let started = KernelEvent::AgentStarted { id: agent };
        assert_eq!(extract_agent_id(&started), expected_actor);

        // Events without an agent or session actor fall back to "system".
        let seed = KernelEvent::SeedCreated {
            seed_id: Uuid::new_v4(),
        };
        assert_eq!(extract_agent_id(&seed), "system");

        match kernel_event_to_audit_action(&event) {
            AuditAction::Other { detail } => {
                assert!(
                    detail.contains("bash"),
                    "tool name in audit detail: {detail}"
                );
            }
            other => panic!("expected Other, got {other:?}"),
        }
    }
}