// oxios_kernel/event_bus.rs

//! Event bus: inter-agent communication via `oxi_sdk::EventBus<KernelEvent>`.
//!
//! The event bus is the "pipe" of Oxios. All agents communicate
//! through kernel events published on the bus.
//!
//! After RFC-014 Phase C, this module no longer owns the broadcast channel —
//! it reuses `oxi_sdk::EventBus<E>`, which is a generic wrapper over
//! `tokio::sync::broadcast`. The only Oxios-specific bits are:
//!
//! - `KernelEvent` enum (oxios-internal event vocabulary)
//! - `kernel_event_to_audit_action` mapping for the audit trail
//! - `attach_audit_trail` helper (subscribes the audit trail to the bus)

use std::fmt::Write as _;
use std::sync::Arc;

use oxi_sdk::EventBus as SdkEventBus;
use oxi_sdk::observability::{AuditAction, AuditTrail};
use serde::{Deserialize, Serialize};
use uuid::Uuid;

use crate::types::AgentId;

22/// Kernel event bus — generic SDK bus specialised for `KernelEvent`.
23///
24/// The broadcast channel is owned by `oxi_sdk::EventBus`; this type alias
25/// just makes the call sites read more naturally (`crate::event_bus::EventBus`
26/// instead of `oxi_sdk::EventBus<KernelEvent>`).
27pub type EventBus = SdkEventBus<KernelEvent>;
28
29/// Events that flow through the kernel event bus.
30#[derive(Debug, Clone, Serialize, Deserialize)]
31pub enum KernelEvent {
32    /// A new agent has been created.
33    AgentCreated {
34        /// The new agent's ID.
35        id: AgentId,
36        /// The agent's name/goal.
37        name: String,
38    },
39    /// An agent has started executing.
40    AgentStarted {
41        /// The agent's ID.
42        id: AgentId,
43    },
44    /// An agent has been stopped.
45    ///
46    /// Carries `success` so consumers can distinguish a normal completion
47    /// (`success: true`) from an evaluation/assessment failure
48    /// (`success: false`). Infrastructure errors (panic, timeout) emit
49    /// `AgentFailed` instead.
50    AgentStopped {
51        /// The agent's ID.
52        id: AgentId,
53        /// Whether the agent's result passed evaluation. Mirrors
54        /// `ExecutionResult.success` from the Ok path; `false` on the
55        /// kill/terminate path (user-initiated stop).
56        #[serde(default)]
57        success: bool,
58    },
59    /// An agent has encountered a failure.
60    AgentFailed {
61        /// The agent's ID.
62        id: AgentId,
63        /// Description of the error.
64        error: String,
65    },
66    /// A message has been received from an agent.
67    MessageReceived {
68        /// The sending agent's ID.
69        from: AgentId,
70        /// Message content.
71        content: String,
72    },
73    /// A new seed has been created.
74    SeedCreated {
75        /// The seed's ID.
76        seed_id: uuid::Uuid,
77    },
78    /// An evaluation has completed.
79    EvaluationComplete {
80        /// The seed that was evaluated.
81        seed_id: uuid::Uuid,
82        /// Whether the evaluation passed.
83        passed: bool,
84    },
85    /// An Ouroboros phase has started.
86    PhaseStarted {
87        /// The session this phase belongs to.
88        session_id: String,
89        /// The phase that started.
90        phase: oxios_ouroboros::Phase,
91    },
92    /// An Ouroboros phase has completed.
93    PhaseCompleted {
94        /// The session this phase belongs to.
95        session_id: String,
96        /// The phase that completed.
97        phase: oxios_ouroboros::Phase,
98        /// A brief summary of the result.
99        result_summary: String,
100    },
101    /// An agent has produced output.
102    AgentOutput {
103        /// The session this output belongs to.
104        session_id: String,
105        /// The agent's ID.
106        agent_id: AgentId,
107        /// The output content.
108        output: String,
109    },
110    /// A HitL approval request has been submitted.
111    ApprovalRequested {
112        /// The approval request ID.
113        id: uuid::Uuid,
114        /// The tool requesting approval.
115        tool_name: String,
116        /// The action requiring approval.
117        action: String,
118        /// The resource involved.
119        resource: String,
120        /// Reason for the request.
121        reason: String,
122        /// The session ID that triggered this request.
123        session_id: Option<String>,
124    },
125    /// A HitL approval has been resolved (approved or rejected).
126    ApprovalResolved {
127        /// The approval request ID.
128        id: uuid::Uuid,
129        /// Whether it was approved (true) or rejected (false).
130        approved: bool,
131    },
132    /// A memory entry was stored.
133    MemoryStored {
134        /// Memory entry ID.
135        id: String,
136        /// Memory type label.
137        memory_type: String,
138        /// Source of the memory.
139        source: String,
140    },
141    /// Memories were recalled for a new session.
142    MemoryRecalled {
143        /// The recall query.
144        query: String,
145        /// Number of memories returned.
146        count: usize,
147    },
148    /// Multi-agent group created.
149    AgentGroupCreated {
150        /// The group's ID.
151        group_id: uuid::Uuid,
152        /// Number of agents in the group.
153        agent_count: usize,
154    },
155    /// An agent in a group completed.
156    AgentGroupMemberCompleted {
157        /// The group's ID.
158        group_id: uuid::Uuid,
159        /// The agent's ID.
160        agent_id: uuid::Uuid,
161        /// Whether the agent succeeded.
162        success: bool,
163    },
164    /// A new Project has been created (RFC-011).
165    ProjectCreated {
166        /// The project's ID.
167        project_id: uuid::Uuid,
168        /// The project's name.
169        name: String,
170        /// How it was created.
171        source: String,
172    },
173    /// A Project has been activated (RFC-011).
174    ProjectActivated {
175        /// The project's ID.
176        project_id: uuid::Uuid,
177        /// The project's name.
178        name: String,
179    },
180    /// Evolution has started (evaluate → evolve → re-execute loop).
181    EvolutionStarted {
182        /// Seed ID before evolution.
183        seed_id: uuid::Uuid,
184        /// Seed ID after evolution.
185        new_seed_id: uuid::Uuid,
186        /// Current iteration (0-based).
187        iteration: u32,
188    },
189    /// Evolution loop reached max iterations.
190    EvolutionMaxReached {
191        /// The final seed ID.
192        seed_id: uuid::Uuid,
193        /// Final evaluation score.
194        final_score: f64,
195        /// Number of iterations completed.
196        iterations: u32,
197    },
198
199    // ── RFC-015 Chat Transparency ─────────────────────────────
200    // Real-time events emitted by AgentRuntime during tool execution
201    // and streaming. Web channel converts these to WS chunks.
202    /// A tool execution has started (real-time, RFC-015).
203    ToolExecutionStarted {
204        /// Session this tool call belongs to.
205        session_id: String,
206        /// Name of the tool (e.g. "read_file", "bash", "memory_recall").
207        tool_name: String,
208        /// Provider-specific tool call ID used to correlate start/end.
209        tool_call_id: String,
210        /// Tool input arguments (JSON).
211        tool_args: serde_json::Value,
212        /// Semantic context inferred by oxi-agent 0.32+ from tool name/args
213        /// (e.g. WebSearch, PageVisit). `None` for tools without context mapping.
214        #[serde(default, skip_serializing_if = "Option::is_none")]
215        context: Option<serde_json::Value>,
216    },
217    /// A tool execution has finished (real-time, RFC-015).
218    ToolExecutionFinished {
219        /// Session this tool call belongs to.
220        session_id: String,
221        /// Provider-specific tool call ID.
222        tool_call_id: String,
223        /// Name of the tool.
224        tool_name: String,
225        /// Wall-clock duration in milliseconds.
226        duration_ms: u64,
227        /// Whether the tool returned an error.
228        is_error: bool,
229        /// Truncated output (max ~500 chars) for streaming.
230        output_summary: String,
231    },
232    /// A tool execution emitted a progress update (real-time, RFC-015).
233    ToolExecutionProgress {
234        /// Session this tool call belongs to.
235        session_id: String,
236        /// Provider-specific tool call ID.
237        tool_call_id: String,
238        /// Name of the tool.
239        tool_name: String,
240        /// Human-readable progress text (already-formatted by the tool).
241        progress: String,
242        /// Tab that emitted this progress event, if the upstream tool tracks
243        /// tabs. `None` for tools that don't have a tab concept (e.g. legacy
244        /// oxi-agent versions that don't propagate `tab_id`).
245        #[serde(default, skip_serializing_if = "Option::is_none")]
246        tab_id: Option<Uuid>,
247        /// Semantic context from the tool call (e.g. PageVisit, WebSearch).
248        /// Stored as `serde_json::Value` to decouple kernel events from
249        /// oxi-sdk's internal `ToolCallContext` enum. UI consumers that
250        /// understand a context variant render it richly; older consumers
251        /// simply ignore the field.
252        #[serde(default, skip_serializing_if = "Option::is_none")]
253        context: Option<serde_json::Value>,
254    },
255    /// Memory was recalled during agent execution (RFC-015).
256    MemoryRecallUsed {
257        /// Session this recall belongs to.
258        session_id: String,
259        /// The recall query.
260        query: String,
261        /// Number of memories returned.
262        count: usize,
263        /// Memory tier source ("hot" | "warm" | "cold").
264        source: String,
265    },
266    /// Token usage update (RFC-015).
267    TokenUsageUpdate {
268        /// Session this usage belongs to.
269        session_id: String,
270        /// Cumulative input tokens.
271        input_tokens: u64,
272        /// Cumulative output tokens.
273        output_tokens: u64,
274    },
275    /// Reasoning/compaction fragment (RFC-015).
276    ReasoningFragment {
277        /// Session this fragment belongs to.
278        session_id: String,
279        /// The fragment text (chain-of-thought, compaction summary, etc).
280        content: String,
281        /// Source label: "chain_of_thought" | "compaction" | "reflection".
282        source: String,
283    },
284
285    // ── Calendar ──────────────────────────────────────────────
286    /// A calendar event was created.
287    CalendarEventCreated {
288        /// Event UID.
289        uid: String,
290        /// Event title.
291        title: String,
292        /// Start time.
293        start: String,
294        /// End time.
295        end: String,
296    },
297    /// A calendar event was updated.
298    CalendarEventUpdated {
299        /// Event UID.
300        uid: String,
301        /// Event title.
302        title: String,
303    },
304    /// A calendar event was deleted.
305    CalendarEventDeleted {
306        /// Event UID.
307        uid: String,
308        /// Event title.
309        title: String,
310    },
311    /// An email has been sent.
312    EmailSent {
313        /// Email subject.
314        subject: String,
315        /// SMTP message ID.
316        message_id: String,
317        /// Template name (if template was used/saved).
318        #[serde(default, skip_serializing_if = "Option::is_none")]
319        template_name: Option<String>,
320    },
321
322    // ── Knowledge ──────────────────────────────────────────────
323    /// A knowledge note was persisted (hook, user, or tool).
324    KnowledgePersisted {
325        session_id: String,
326        message_index: usize,
327        path: String,
328        source: String, // "hook", "user", "tool"
329    },
330    /// A knowledge note was removed by user action.
331    KnowledgeRemoved {
332        session_id: String,
333        message_index: usize,
334    },
335    /// A question was posed to the user by the agent (RFC-027, `ask_user`).
336    /// The frontend renders an input/option picker and resolves the
337    /// pending oneshot via a separate response endpoint.
338    AskUserRequest {
339        /// Unique request ID — used by the response handler to resolve
340        /// the oneshot the tool is awaiting.
341        id: String,
342        /// The question text the user sees.
343        question: String,
344        /// Optional structured options. Empty when the question is open-ended.
345        options: Vec<String>,
346    },
347}
348
349/// Convert a KernelEvent to an AuditAction for the audit trail.
350pub fn kernel_event_to_audit_action(event: &KernelEvent) -> AuditAction {
351    match event {
352        KernelEvent::AgentCreated { name, .. } => AuditAction::AgentSpawn {
353            task_type: name.clone(),
354        },
355        KernelEvent::AgentStarted { .. } => AuditAction::AgentSpawn {
356            task_type: "started".to_string(),
357        },
358        KernelEvent::AgentStopped { success, .. } => AuditAction::AgentExit {
359            reason: if *success {
360                "completed".to_string()
361            } else {
362                "stopped".to_string()
363            },
364        },
365        KernelEvent::AgentFailed { error, .. } => AuditAction::AgentExit {
366            reason: error.clone(),
367        },
368        KernelEvent::MessageReceived { content, .. } => AuditAction::Other {
369            detail: format!("message: {content}"),
370        },
371        KernelEvent::SeedCreated { seed_id, .. } => AuditAction::Other {
372            detail: format!("seed_created:{seed_id}"),
373        },
374        KernelEvent::EvaluationComplete { seed_id, passed } => AuditAction::Other {
375            detail: format!("evaluation:{seed_id}:{passed}"),
376        },
377        KernelEvent::PhaseStarted { session_id, phase } => AuditAction::Other {
378            detail: format!("phase_started:{session_id}:{phase}"),
379        },
380        KernelEvent::PhaseCompleted {
381            session_id,
382            phase,
383            result_summary,
384        } => AuditAction::Other {
385            detail: format!("phase_completed:{session_id}:{phase}:{result_summary}"),
386        },
387        KernelEvent::AgentOutput { output, .. } => AuditAction::Other {
388            detail: format!("agent_output:{output}"),
389        },
390        KernelEvent::ApprovalRequested {
391            id,
392            action,
393            resource,
394            ..
395        } => AuditAction::Other {
396            detail: format!("approval_requested:{id}:{action}:{resource}"),
397        },
398        KernelEvent::ApprovalResolved { id, approved } => AuditAction::Other {
399            detail: format!("approval_resolved:{id}:{approved}"),
400        },
401        KernelEvent::MemoryStored {
402            id, memory_type, ..
403        } => AuditAction::MemoryWrite {
404            entry_id: format!("{id}:{memory_type}"),
405        },
406        KernelEvent::MemoryRecalled { query, count } => AuditAction::MemoryRead {
407            entry_id: format!("query:{query}:{count}results"),
408        },
409        KernelEvent::AgentGroupCreated {
410            group_id,
411            agent_count,
412        } => AuditAction::Other {
413            detail: format!("group_created:{group_id}:{agent_count}agents"),
414        },
415        KernelEvent::AgentGroupMemberCompleted {
416            group_id,
417            agent_id,
418            success,
419        } => AuditAction::Other {
420            detail: format!("group_member_completed:{group_id}:{agent_id}:{success}"),
421        },
422        KernelEvent::EvolutionStarted {
423            seed_id,
424            new_seed_id,
425            iteration,
426        } => AuditAction::Other {
427            detail: format!("evolution:{seed_id}->{new_seed_id}:iter{iteration}"),
428        },
429        KernelEvent::EvolutionMaxReached {
430            seed_id,
431            final_score,
432            iterations,
433        } => AuditAction::Other {
434            detail: format!("evolution_max:{seed_id}:score={final_score}:iters={iterations}"),
435        },
436        KernelEvent::ProjectCreated {
437            project_id: _,
438            name,
439            source,
440        } => AuditAction::Other {
441            detail: format!("project_created:{name}:{source}"),
442        },
443        KernelEvent::ProjectActivated {
444            project_id: _,
445            name,
446        } => AuditAction::Other {
447            detail: format!("project_activated:{name}"),
448        },
449        // ── RFC-015 ──
450        KernelEvent::ToolExecutionStarted { tool_name, .. } => AuditAction::Other {
451            detail: format!("tool_started:{tool_name}"),
452        },
453        KernelEvent::ToolExecutionFinished {
454            tool_name,
455            is_error,
456            ..
457        } => AuditAction::Other {
458            detail: format!(
459                "tool_finished:{tool_name}:{}",
460                if *is_error { "error" } else { "ok" }
461            ),
462        },
463        KernelEvent::ToolExecutionProgress {
464            tool_name,
465            tab_id,
466            context,
467            ..
468        } => AuditAction::Other {
469            detail: {
470                let mut d = format!("tool_progress:{tool_name}");
471                if let Some(id) = tab_id {
472                    d.push_str(&format!(":tab={id}"));
473                }
474                if let Some(ctx) = context
475                    .as_ref()
476                    .and_then(|c| c.get("kind"))
477                    .and_then(|k| k.as_str())
478                {
479                    d.push_str(&format!(":{ctx}"));
480                }
481                d
482            },
483        },
484        KernelEvent::MemoryRecallUsed { query, count, .. } => AuditAction::MemoryRead {
485            entry_id: format!("recall:{query}:{count}results"),
486        },
487        KernelEvent::TokenUsageUpdate {
488            input_tokens,
489            output_tokens,
490            ..
491        } => AuditAction::Other {
492            detail: format!("tokens:in={input_tokens}:out={output_tokens}"),
493        },
494        KernelEvent::ReasoningFragment { source, .. } => AuditAction::Other {
495            detail: format!("reasoning:{source}"),
496        },
497        KernelEvent::CalendarEventCreated { uid, title, .. } => AuditAction::Other {
498            detail: format!("calendar:created:{uid}:{title}"),
499        },
500        KernelEvent::CalendarEventUpdated { uid, title } => AuditAction::Other {
501            detail: format!("calendar:updated:{uid}:{title}"),
502        },
503        KernelEvent::CalendarEventDeleted { uid, title } => AuditAction::Other {
504            detail: format!("calendar:deleted:{uid}:{title}"),
505        },
506        KernelEvent::EmailSent {
507            subject,
508            message_id,
509            template_name,
510        } => AuditAction::Other {
511            detail: format!("email:sent:{subject} (msg={message_id}, tpl={template_name:?})"),
512        },
513        KernelEvent::KnowledgePersisted {
514            session_id,
515            message_index,
516            path,
517            source,
518        } => AuditAction::Other {
519            detail: format!("knowledge:persisted:{session_id}:{message_index}:{path}:{source}"),
520        },
521        KernelEvent::KnowledgeRemoved {
522            session_id,
523            message_index,
524        } => AuditAction::Other {
525            detail: format!("knowledge:removed:{session_id}:{message_index}"),
526        },
527        KernelEvent::AskUserRequest { id, question, .. } => AuditAction::Other {
528            detail: format!("ask_user:{id}:{question}"),
529        },
530    }
531}
532
533/// Extract agent ID from a KernelEvent variant.
534fn extract_agent_id(event: &KernelEvent) -> String {
535    match event {
536        KernelEvent::AgentCreated { id, .. } => id.to_string(),
537        KernelEvent::AgentStarted { id, .. } => id.to_string(),
538        KernelEvent::AgentStopped { id, .. } => id.to_string(),
539        KernelEvent::AgentFailed { id, .. } => id.to_string(),
540        KernelEvent::MessageReceived { from, .. } => from.to_string(),
541        KernelEvent::AgentOutput { agent_id, .. } => agent_id.to_string(),
542        KernelEvent::AgentGroupMemberCompleted { agent_id, .. } => agent_id.to_string(),
543        KernelEvent::ProjectActivated { project_id, .. } => format!("project:{project_id}"),
544        // RFC-015: session-scoped events use session_id as the subject
545        KernelEvent::ToolExecutionStarted { session_id, .. } => format!("session:{session_id}"),
546        KernelEvent::ToolExecutionFinished { session_id, .. } => format!("session:{session_id}"),
547        KernelEvent::ToolExecutionProgress { session_id, .. } => format!("session:{session_id}"),
548        KernelEvent::MemoryRecallUsed { session_id, .. } => format!("session:{session_id}"),
549        KernelEvent::TokenUsageUpdate { session_id, .. } => format!("session:{session_id}"),
550        KernelEvent::ReasoningFragment { session_id, .. } => format!("session:{session_id}"),
551        KernelEvent::KnowledgePersisted { session_id, .. } => format!("session:{session_id}"),
552        KernelEvent::KnowledgeRemoved { session_id, .. } => format!("session:{session_id}"),
553        _ => "system".to_string(),
554    }
555}
556
557/// Subscribe the audit trail to all kernel events.
558///
559/// The bus is broadcast-based; this spawns a long-running task that
560/// forwards every event into the audit trail as a structured entry.
561/// Lagged subscribers are logged and recovered.
562pub fn attach_audit_trail(bus: &EventBus, audit: Arc<AuditTrail>) {
563    let mut rx = bus.subscribe();
564    tokio::spawn(async move {
565        loop {
566            match rx.recv().await {
567                Ok(event) => {
568                    let actor = extract_agent_id(&event);
569                    let action = kernel_event_to_audit_action(&event);
570                    let resource = format!("{event:?}");
571                    audit.append(actor, action, resource);
572                }
573                Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
574                    // Surface the drop as a metric so operators can detect
575                    // incomplete audit trails instead of the events
576                    // vanishing silently (state-area F4).
577                    crate::metrics::get_metrics().audit_lagged_events.inc_by(n);
578                    tracing::warn!(
579                        skipped = n,
580                        "Audit trail subscriber lagged, skipping events"
581                    );
582                    continue;
583                }
584                Err(tokio::sync::broadcast::error::RecvError::Closed) => {
585                    tracing::info!("Audit trail event bus closed, exiting");
586                    break;
587                }
588            }
589        }
590    });
591}
592
#[cfg(test)]
mod tests {
    use super::*;

    /// Build an `AgentCreated` event with a fresh agent ID and the given name.
    fn sample_event(name: &str) -> KernelEvent {
        KernelEvent::AgentCreated {
            id: AgentId::new_v4(),
            name: name.to_string(),
        }
    }

    #[test]
    fn test_event_bus_uses_sdk() {
        let bus: EventBus = EventBus::new(256);
        assert!(format!("{:?}", bus).contains("EventBus"));
    }

    #[tokio::test]
    async fn test_publish_no_subscribers_ok() {
        let bus = EventBus::new(16);
        let result = bus.publish(sample_event("orphan"));
        assert!(result.is_ok());
    }

    #[tokio::test]
    async fn test_single_subscriber_receives_event() {
        let bus = EventBus::new(16);
        let mut rx = bus.subscribe();

        bus.publish(sample_event("test-agent")).unwrap();

        let received = rx.try_recv().expect("should receive event");
        match received {
            KernelEvent::AgentCreated { name, .. } => assert_eq!(name, "test-agent"),
            _ => panic!("wrong event type"),
        }
    }

    #[tokio::test]
    async fn test_multiple_subscribers_receive_events() {
        let bus = EventBus::new(16);
        let mut rx1 = bus.subscribe();
        let mut rx2 = bus.subscribe();

        bus.publish(sample_event("multi")).unwrap();

        let r1 = rx1.try_recv().expect("rx1 should receive event");
        let r2 = rx2.try_recv().expect("rx2 should receive event");

        assert!(matches!(r1, KernelEvent::AgentCreated { .. }));
        assert!(matches!(r2, KernelEvent::AgentCreated { .. }));
    }

    #[tokio::test]
    async fn test_kernel_event_to_audit_action() {
        let event = KernelEvent::AgentFailed {
            id: AgentId::new_v4(),
            error: "boom".to_string(),
        };
        let action = kernel_event_to_audit_action(&event);
        match action {
            AuditAction::AgentExit { reason } => assert_eq!(reason, "boom"),
            other => panic!("expected AgentExit, got {other:?}"),
        }
    }

    // ── RFC-015 chat transparency event coverage ──

    /// Round-trip JSON serialization for every new RFC-015 variant. This
    /// guards against accidental renames that would break the WebSocket
    /// wire format on the frontend.
    #[test]
    fn test_rfc015_event_round_trip_json() {
        let cases: Vec<KernelEvent> = vec![
            KernelEvent::ToolExecutionStarted {
                session_id: "s1".into(),
                tool_name: "read_file".into(),
                tool_call_id: "call_1".into(),
                tool_args: serde_json::json!({"path": "/src/main.rs"}),
                context: None,
            },
            KernelEvent::ToolExecutionFinished {
                session_id: "s1".into(),
                tool_call_id: "call_1".into(),
                tool_name: "read_file".into(),
                duration_ms: 234,
                is_error: false,
                output_summary: "fn main() {}".into(),
            },
            KernelEvent::ToolExecutionProgress {
                session_id: "s1".into(),
                tool_call_id: "call_1".into(),
                tool_name: "read_file".into(),
                progress: "reading line 42/100".into(),
                tab_id: None,
                context: None,
            },
            KernelEvent::MemoryRecallUsed {
                session_id: "s1".into(),
                query: "rust errors".into(),
                count: 3,
                source: "warm".into(),
            },
            KernelEvent::TokenUsageUpdate {
                session_id: "s1".into(),
                input_tokens: 1234,
                output_tokens: 567,
            },
            KernelEvent::ReasoningFragment {
                session_id: "s1".into(),
                content: "compaction done".into(),
                source: "compaction".into(),
            },
        ];
        for event in cases {
            let json = serde_json::to_string(&event).expect("serialize");
            let back: KernelEvent = serde_json::from_str(&json).expect("deserialize");
            let json2 = serde_json::to_string(&back).expect("serialize round-trip");
            assert_eq!(json, json2, "round-trip should be stable");
        }
    }

    /// Tool progress events serialize/deserialize cleanly and round-trip
    /// stable JSON, matching the wire format the WS layer expects.
    #[test]
    fn test_tool_execution_progress_serde_round_trip() {
        let event = KernelEvent::ToolExecutionProgress {
            session_id: "s-abc".into(),
            tool_call_id: "call_42".into(),
            tool_name: "browse".into(),
            progress: "loading https://example.com".into(),
            tab_id: Some(Uuid::new_v4()),
            context: None,
        };
        let json = serde_json::to_string(&event).expect("serialize");
        let back: KernelEvent = serde_json::from_str(&json).expect("deserialize");
        match back {
            KernelEvent::ToolExecutionProgress {
                ref session_id,
                ref tool_call_id,
                ref tool_name,
                ref progress,
                tab_id,
                ..
            } => {
                assert_eq!(session_id, "s-abc");
                assert_eq!(tool_call_id, "call_42");
                assert_eq!(tool_name, "browse");
                assert_eq!(progress, "loading https://example.com");
                assert!(tab_id.is_some(), "tab_id should round-trip when present");
            }
            other => panic!("expected ToolExecutionProgress, got {other:?}"),
        }
    }

    /// The audit-action mapping for tool progress should produce a stable,
    /// searchable detail string (used by the audit-trail UI to filter).
    /// When `tab_id` is set, the detail includes `:tab=<id>`; when absent,
    /// the original `tool_progress:<tool>` form is preserved (back-compat
    /// for older oxi-agent versions that don't propagate tabs).
    #[test]
    fn test_tool_execution_progress_audit_action() {
        let with_tab = KernelEvent::ToolExecutionProgress {
            session_id: "s1".into(),
            tool_call_id: "c1".into(),
            tool_name: "browse".into(),
            progress: "navigating".into(),
            tab_id: Some(Uuid::new_v4()),
            context: None,
        };
        match kernel_event_to_audit_action(&with_tab) {
            AuditAction::Other { detail } => {
                assert!(detail.contains("tool_progress"), "detail: {detail}");
                assert!(detail.contains("browse"), "detail: {detail}");
                assert!(
                    detail.contains(":tab="),
                    "detail should include tab id: {detail}"
                );
            }
            other => panic!("expected Other, got {other:?}"),
        }
        let without_tab = KernelEvent::ToolExecutionProgress {
            session_id: "s1".into(),
            tool_call_id: "c1".into(),
            tool_name: "browse".into(),
            progress: "navigating".into(),
            tab_id: None,
            context: None,
        };
        match kernel_event_to_audit_action(&without_tab) {
            AuditAction::Other { detail } => {
                assert_eq!(detail, "tool_progress:browse");
            }
            other => panic!("expected Other, got {other:?}"),
        }
    }

    /// `tab_id` is optional in serde (`#[serde(default)]`) so older oxi-agent
    /// versions that don't emit it still round-trip cleanly. This guards the
    /// backwards-compat contract explicitly.
    #[test]
    fn test_tool_execution_progress_tab_id_optional_in_serde() {
        // Simulate a payload from a legacy oxi-agent (no tab_id key).
        // KernelEvent is externally tagged, so the variant is the JSON key.
        let legacy_json = r#"{
            "ToolExecutionProgress": {
                "session_id": "s-old",
                "tool_call_id": "call_legacy",
                "tool_name": "browse",
                "progress": "step 1"
            }
        }"#;
        let event: KernelEvent = serde_json::from_str(legacy_json).expect("deserialize legacy");
        match &event {
            KernelEvent::ToolExecutionProgress {
                session_id,
                tool_call_id,
                tool_name,
                progress,
                tab_id,
                ..
            } => {
                assert_eq!(session_id, "s-old");
                assert_eq!(tool_call_id, "call_legacy");
                assert_eq!(tool_name, "browse");
                assert_eq!(progress, "step 1");
                assert!(tab_id.is_none(), "missing field should default to None");
            }
            other => panic!("expected ToolExecutionProgress, got {other:?}"),
        }
        // And re-serialise — `skip_serializing_if = "Option::is_none"` keeps
        // the wire format clean when downstream tools don't set tab_id.
        let json = serde_json::to_string(&event).expect("serialize");
        assert!(
            !json.contains("tab_id"),
            "tab_id should be omitted when None: {json}"
        );
    }

    /// The actor extractor should map session-scoped RFC-015 events to
    /// `session:<id>` for audit-trail grouping, while non-session events
    /// keep their existing behaviour (agent ID, or `"system"`).
    #[test]
    fn test_rfc015_extract_agent_id() {
        let event = KernelEvent::ToolExecutionStarted {
            session_id: "abc-123".into(),
            tool_name: "bash".into(),
            tool_call_id: "c1".into(),
            tool_args: serde_json::Value::Null,
            context: None,
        };
        // `extract_agent_id` is private to the parent module, which a child
        // module can still call — so assert on it directly.
        assert_eq!(extract_agent_id(&event), "session:abc-123");

        // Agent-scoped events keep using the agent's own ID as the actor.
        let agent_id = AgentId::new_v4();
        let expected_actor = agent_id.to_string();
        let started = KernelEvent::AgentStarted { id: agent_id };
        assert_eq!(extract_agent_id(&started), expected_actor);

        // Events with no agent or session subject are attributed to the kernel.
        let seed = KernelEvent::SeedCreated {
            seed_id: Uuid::new_v4(),
        };
        assert_eq!(extract_agent_id(&seed), "system");

        // The audit detail for the same event still carries the tool name.
        match kernel_event_to_audit_action(&event) {
            AuditAction::Other { detail } => {
                assert!(
                    detail.contains("bash"),
                    "tool name in audit detail: {detail}"
                );
            }
            other => panic!("expected Other, got {other:?}"),
        }
    }
}