Skip to main content

oxios_kernel/
event_bus.rs

1//! Event bus: inter-agent communication via tokio broadcast channels.
2//!
3//! The event bus is the "pipe" of Oxios. All agents communicate
4//! through kernel events published on the bus.
5
6use anyhow::Result;
7use serde::{Deserialize, Serialize};
8use std::sync::Arc;
9use tokio::sync::broadcast;
10
11use crate::audit_trail::{AuditAction, AuditTrail};
12use crate::types::AgentId;
13
/// Events that flow through the kernel event bus.
///
/// Each variant is a plain data record describing one thing that happened in
/// the kernel. Events are carried over a `tokio::sync::broadcast` channel
/// (see `EventBus`), which hands every subscriber its own copy, hence the
/// `Clone` derive. The `Serialize`/`Deserialize` derives use serde's default
/// enum representation, since no `#[serde(...)]` attributes are applied here.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum KernelEvent {
    /// A new agent has been created.
    AgentCreated {
        /// The new agent's ID.
        id: AgentId,
        /// The agent's name/goal.
        name: String,
    },
    /// An agent has started executing.
    AgentStarted {
        /// The agent's ID.
        id: AgentId,
    },
    /// An agent has been stopped.
    AgentStopped {
        /// The agent's ID.
        id: AgentId,
    },
    /// An agent has encountered a failure.
    AgentFailed {
        /// The agent's ID.
        id: AgentId,
        /// Description of the error.
        error: String,
    },
    /// A message has been received from an agent.
    MessageReceived {
        /// The sending agent's ID.
        from: AgentId,
        /// Message content.
        content: String,
    },
    /// A new seed has been created.
    SeedCreated {
        /// The seed's ID.
        seed_id: uuid::Uuid,
    },
    /// An evaluation has completed.
    EvaluationComplete {
        /// The seed that was evaluated.
        seed_id: uuid::Uuid,
        /// Whether the evaluation passed.
        passed: bool,
    },
    /// An Ouroboros phase has started.
    PhaseStarted {
        /// The session this phase belongs to.
        session_id: String,
        /// The phase that started.
        phase: oxios_ouroboros::Phase,
    },
    /// An Ouroboros phase has completed.
    PhaseCompleted {
        /// The session this phase belongs to.
        session_id: String,
        /// The phase that completed.
        phase: oxios_ouroboros::Phase,
        /// A brief summary of the result.
        result_summary: String,
    },
    /// An agent has produced output.
    AgentOutput {
        /// The session this output belongs to.
        session_id: String,
        /// The agent's ID.
        agent_id: AgentId,
        /// The output content.
        output: String,
    },
    /// A HitL (human-in-the-loop) approval request has been submitted.
    ApprovalRequested {
        /// The approval request ID.
        id: uuid::Uuid,
        /// The action requiring approval.
        action: String,
        /// The resource involved.
        resource: String,
        /// Reason for the request.
        reason: String,
    },
    /// A HitL approval has been resolved (approved or rejected).
    ApprovalResolved {
        /// The approval request ID.
        id: uuid::Uuid,
        /// Whether it was approved (true) or rejected (false).
        approved: bool,
    },
    /// A memory entry was stored.
    MemoryStored {
        /// Memory entry ID.
        id: String,
        /// Memory type label.
        memory_type: String,
        /// Source of the memory.
        source: String,
    },
    /// Memories were recalled for a new session.
    MemoryRecalled {
        /// The recall query.
        query: String,
        /// Number of memories returned.
        count: usize,
    },
    /// Multi-agent group created.
    AgentGroupCreated {
        /// The group's ID.
        group_id: uuid::Uuid,
        /// Number of agents in the group.
        agent_count: usize,
    },
    /// An agent in a group completed.
    AgentGroupMemberCompleted {
        /// The group's ID.
        group_id: uuid::Uuid,
        /// The agent's ID.
        ///
        /// NOTE(review): typed as `uuid::Uuid` here while every other agent
        /// identifier in this enum is an `AgentId` — confirm this is intended.
        agent_id: uuid::Uuid,
        /// Whether the agent succeeded.
        success: bool,
    },
    /// A new Space has been created.
    SpaceCreated {
        /// The Space's ID.
        space_id: uuid::Uuid,
        /// The Space's name.
        name: String,
        /// How it was created (auto_resource, auto_topic, manual).
        source: String,
    },
    /// The active Space has changed.
    SpaceActivated {
        /// The Space's ID.
        space_id: uuid::Uuid,
        /// The Space's name.
        name: String,
    },
    /// A Space has been archived.
    SpaceArchived {
        /// The Space's ID.
        space_id: uuid::Uuid,
        /// The Space's name.
        name: String,
    },
    /// Cross-Space knowledge was accessed.
    KnowledgeCrossReferenced {
        /// Source Space.
        from_space: uuid::Uuid,
        /// Target Space.
        to_space: uuid::Uuid,
        /// Number of entries accessed.
        entries: usize,
        /// Flow type (reference, transfer, synthesis).
        flow: String,
    },
    /// Spaces have been merged.
    SpacesMerged {
        /// The surviving Space.
        survivor: uuid::Uuid,
        /// The absorbed Space.
        absorbed: uuid::Uuid,
        /// Number of entries migrated.
        entries_migrated: usize,
    },
}
179
180/// Convert a KernelEvent to an AuditAction for the audit trail.
181pub fn kernel_event_to_audit_action(event: &KernelEvent) -> AuditAction {
182    match event {
183        KernelEvent::AgentCreated { name, .. } => AuditAction::AgentSpawn {
184            task_type: name.clone(),
185        },
186        KernelEvent::AgentStarted { .. } => AuditAction::AgentSpawn {
187            task_type: "started".to_string(),
188        },
189        KernelEvent::AgentStopped { .. } => AuditAction::AgentExit {
190            reason: "stopped".to_string(),
191        },
192        KernelEvent::AgentFailed { error, .. } => AuditAction::AgentExit {
193            reason: error.clone(),
194        },
195        KernelEvent::MessageReceived { content, .. } => AuditAction::Other {
196            detail: format!("message: {}", content),
197        },
198        KernelEvent::SeedCreated { seed_id, .. } => AuditAction::Other {
199            detail: format!("seed_created:{}", seed_id),
200        },
201        KernelEvent::EvaluationComplete { seed_id, passed } => AuditAction::Other {
202            detail: format!("evaluation:{}:{}", seed_id, passed),
203        },
204        KernelEvent::PhaseStarted { session_id, phase } => AuditAction::Other {
205            detail: format!("phase_started:{}:{}", session_id, phase),
206        },
207        KernelEvent::PhaseCompleted {
208            session_id,
209            phase,
210            result_summary,
211        } => AuditAction::Other {
212            detail: format!(
213                "phase_completed:{}:{}:{}",
214                session_id, phase, result_summary
215            ),
216        },
217        KernelEvent::AgentOutput { output, .. } => AuditAction::Other {
218            detail: format!("agent_output:{}", output),
219        },
220        KernelEvent::ApprovalRequested {
221            id,
222            action,
223            resource,
224            reason: _,
225        } => AuditAction::Other {
226            detail: format!("approval_requested:{}:{}:{}", id, action, resource),
227        },
228        KernelEvent::ApprovalResolved { id, approved } => AuditAction::Other {
229            detail: format!("approval_resolved:{}:{}", id, approved),
230        },
231        KernelEvent::MemoryStored {
232            id, memory_type, ..
233        } => AuditAction::MemoryWrite {
234            entry_id: format!("{}:{}", id, memory_type),
235        },
236        KernelEvent::MemoryRecalled { query, count } => AuditAction::MemoryRead {
237            entry_id: format!("query:{}:{}results", query, count),
238        },
239        KernelEvent::AgentGroupCreated {
240            group_id,
241            agent_count,
242        } => AuditAction::Other {
243            detail: format!("group_created:{}:{}agents", group_id, agent_count),
244        },
245        KernelEvent::AgentGroupMemberCompleted {
246            group_id,
247            agent_id,
248            success,
249        } => AuditAction::Other {
250            detail: format!(
251                "group_member_completed:{}:{}:{}",
252                group_id, agent_id, success
253            ),
254        },
255        KernelEvent::SpaceCreated {
256            space_id,
257            name,
258            source,
259        } => AuditAction::Other {
260            detail: format!("space_created:{}:{}:{}", space_id, name, source),
261        },
262        KernelEvent::SpaceActivated { space_id, name } => AuditAction::Other {
263            detail: format!("space_activated:{}:{}", space_id, name),
264        },
265        KernelEvent::SpaceArchived { space_id, name } => AuditAction::Other {
266            detail: format!("space_archived:{}:{}", space_id, name),
267        },
268        KernelEvent::KnowledgeCrossReferenced {
269            from_space,
270            to_space,
271            entries,
272            flow,
273        } => AuditAction::Other {
274            detail: format!(
275                "knowledge_xref:{}->{}:{}:{}entries",
276                from_space, to_space, flow, entries
277            ),
278        },
279        KernelEvent::SpacesMerged {
280            survivor,
281            absorbed,
282            entries_migrated,
283        } => AuditAction::Other {
284            detail: format!(
285                "spaces_merged:{}<-{}:{}entries",
286                survivor, absorbed, entries_migrated
287            ),
288        },
289    }
290}
291
292/// Extract agent ID from a KernelEvent variant.
293fn extract_agent_id(event: &KernelEvent) -> String {
294    match event {
295        KernelEvent::AgentCreated { id, .. } => id.to_string(),
296        KernelEvent::AgentStarted { id, .. } => id.to_string(),
297        KernelEvent::AgentStopped { id, .. } => id.to_string(),
298        KernelEvent::AgentFailed { id, .. } => id.to_string(),
299        KernelEvent::MessageReceived { from, .. } => from.to_string(),
300        KernelEvent::AgentOutput { agent_id, .. } => agent_id.to_string(),
301        KernelEvent::AgentGroupMemberCompleted { agent_id, .. } => agent_id.to_string(),
302        KernelEvent::SpaceActivated { space_id, .. } => format!("space:{}", space_id),
303        _ => "system".to_string(),
304    }
305}
306
/// A broadcast-based event bus for kernel events.
///
/// Subscribers receive events published after they subscribe.
/// Late subscribers do not receive historical events.
///
/// The bus wraps a `tokio::sync::broadcast` sender. Cloning the bus clones
/// that sender, so every clone publishes into the same channel. Per tokio's
/// broadcast semantics, a subscriber that falls further behind than the
/// channel capacity loses the oldest unread events.
#[derive(Clone)]
pub struct EventBus {
    /// Sending half of the broadcast channel; receivers are created from it
    /// on demand by `subscribe()`.
    sender: broadcast::Sender<KernelEvent>,
}
315
316impl EventBus {
317    /// Creates a new event bus with the given broadcast capacity.
318    ///
319    /// # Example
320    ///
321    /// ```
322    /// use oxios_kernel::EventBus;
323    ///
324    /// let bus = EventBus::new(256);
325    /// let subscriber = bus.subscribe();
326    /// // Subscriber receives all events published after this point.
327    /// ```
328    pub fn new(capacity: usize) -> Self {
329        let (sender, _) = broadcast::channel(capacity);
330        Self { sender }
331    }
332
333    /// Subscribe to receive kernel events.
334    pub fn subscribe(&self) -> broadcast::Receiver<KernelEvent> {
335        self.sender.subscribe()
336    }
337
338    /// Publish a kernel event to all subscribers.
339    pub fn publish(&self, event: KernelEvent) -> Result<()> {
340        // It's okay if there are no subscribers.
341        let _ = self.sender.send(event);
342        Ok(())
343    }
344
345    /// Subscribe the audit trail to all kernel events.
346    /// This forwards all events to the audit trail as background tasks.
347    pub fn attach_audit_trail(&self, audit: Arc<AuditTrail>) {
348        let mut rx = self.subscribe();
349        tokio::spawn(async move {
350            while let Ok(event) = rx.recv().await {
351                let actor = extract_agent_id(&event);
352                let action = kernel_event_to_audit_action(&event);
353                let resource = format!("{:?}", event);
354                audit.append(actor, action, resource);
355            }
356        });
357    }
358}
359
360impl std::fmt::Debug for EventBus {
361    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
362        f.debug_struct("EventBus").finish()
363    }
364}