Skip to main content

pulsehive_core/
event.rs

//! Event system for real-time observability into agent execution.
//!
//! [`HiveEvent`] covers the full agent lifecycle: LLM calls, tool execution,
//! substrate operations, perception, embedding computation, and Watch
//! notifications. [`EventEmitter`] provides a fire-and-forget broadcast
//! mechanism for event distribution.
//!
//! Events are serializable to JSON for transmission to observability tools
//! like PulseVision via [`EventExporter`](crate::export::EventExporter).
//!
//! Built on `tokio::sync::broadcast` for multi-consumer support.
11
12use pulsedb::{ExperienceId, InsightId, RelationId};
13use serde::{Deserialize, Serialize};
14use tokio::sync::broadcast;
15
16use crate::agent::{AgentKindTag, AgentOutcome};
17
/// Returns the current wall-clock time as milliseconds since the Unix epoch.
///
/// Used by event emitters to timestamp events at creation time.
///
/// This function never panics:
/// - if the system clock reads earlier than the Unix epoch, it returns `0`;
/// - if the millisecond count would not fit in a `u64`, it saturates at
///   `u64::MAX` instead of silently wrapping.
pub fn now_ms() -> u64 {
    let since_epoch = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        // `Err` means the clock is set before 1970; fall back to a zero duration.
        .unwrap_or_default();

    // Whole seconds scaled to milliseconds plus the sub-second remainder.
    // This equals `as_millis()` for every representable value, but stays in
    // `u64` and saturates rather than truncating a `u128` with `as`.
    since_epoch
        .as_secs()
        .saturating_mul(1_000)
        .saturating_add(u64::from(since_epoch.subsec_millis()))
}
27
28/// Events emitted during agent execution.
29///
30/// Covers the full lifecycle: agent start/stop, LLM calls, tool execution,
31/// substrate operations, and perception. All variants include `timestamp_ms`
32/// (epoch milliseconds) and `agent_id` where applicable for correlation.
33///
34/// Serializes to tagged JSON: `{"type": "llm_call_completed", "agent_id": "...", ...}`
35///
36/// Must be `Clone` because [`EventEmitter`] uses `tokio::sync::broadcast`
37/// which requires cloneable values.
38#[derive(Debug, Clone, Serialize, Deserialize)]
39#[serde(tag = "type", rename_all = "snake_case")]
40pub enum HiveEvent {
41    // ── Agent lifecycle ──────────────────────────────────────────────
42    /// An agent has started execution.
43    AgentStarted {
44        timestamp_ms: u64,
45        agent_id: String,
46        name: String,
47        kind: AgentKindTag,
48    },
49    /// An agent has completed execution.
50    AgentCompleted {
51        timestamp_ms: u64,
52        agent_id: String,
53        outcome: AgentOutcome,
54    },
55
56    // ── LLM interactions ─────────────────────────────────────────────
57    /// An LLM call has been initiated.
58    LlmCallStarted {
59        timestamp_ms: u64,
60        agent_id: String,
61        model: String,
62        message_count: usize,
63    },
64    /// An LLM call has completed.
65    LlmCallCompleted {
66        timestamp_ms: u64,
67        agent_id: String,
68        model: String,
69        duration_ms: u64,
70        /// Prompt tokens consumed (0 if not reported by provider).
71        input_tokens: u32,
72        /// Completion tokens generated (0 if not reported by provider).
73        output_tokens: u32,
74    },
75    /// A token was received from a streaming LLM response.
76    LlmTokenStreamed {
77        timestamp_ms: u64,
78        agent_id: String,
79        token: String,
80    },
81
82    // ── Tool execution ───────────────────────────────────────────────
83    /// A tool call has started.
84    ToolCallStarted {
85        timestamp_ms: u64,
86        agent_id: String,
87        tool_name: String,
88        /// Tool arguments as a JSON string.
89        params: String,
90    },
91    /// A tool call has completed.
92    ToolCallCompleted {
93        timestamp_ms: u64,
94        agent_id: String,
95        tool_name: String,
96        duration_ms: u64,
97        /// Tool result preview (truncated to 200 chars).
98        result_preview: String,
99    },
100    /// A tool requires human approval before execution.
101    ToolApprovalRequested {
102        timestamp_ms: u64,
103        agent_id: String,
104        tool_name: String,
105        description: String,
106    },
107
108    // ── Substrate operations ─────────────────────────────────────────
109    /// An experience was recorded in the substrate.
110    ExperienceRecorded {
111        timestamp_ms: u64,
112        experience_id: ExperienceId,
113        agent_id: String,
114        /// First 200 characters of the experience content.
115        content_preview: String,
116        /// Experience type as a string (e.g., "Generic", "Solution").
117        experience_type: String,
118        /// Importance score (0.0-1.0).
119        importance: f32,
120    },
121    /// A relationship was inferred between experiences.
122    RelationshipInferred {
123        timestamp_ms: u64,
124        relation_id: RelationId,
125        /// Agent that triggered the inference.
126        agent_id: String,
127    },
128    /// An insight was synthesized from an experience cluster.
129    InsightGenerated {
130        timestamp_ms: u64,
131        insight_id: InsightId,
132        source_count: usize,
133        /// Agent that triggered the synthesis.
134        agent_id: String,
135    },
136
137    // ── Perception ───────────────────────────────────────────────────
138    /// An agent perceived the substrate through its lens.
139    SubstratePerceived {
140        timestamp_ms: u64,
141        agent_id: String,
142        experience_count: usize,
143        insight_count: usize,
144    },
145
146    // ── Embedding ─────────────────────────────────────────────────
147    /// An embedding was computed via the EmbeddingProvider.
148    EmbeddingComputed {
149        timestamp_ms: u64,
150        agent_id: String,
151        dimensions: usize,
152        duration_ms: u64,
153    },
154
155    // ── Watch system ───────────────────────────────────────────────
156    /// A real-time Watch event from the substrate.
157    ///
158    /// Emitted when experiences are created, updated, archived, or deleted
159    /// by other agents in the same collective. Forwarded from PulseDB's
160    /// Watch system into the HiveEvent stream.
161    WatchNotification {
162        timestamp_ms: u64,
163        experience_id: ExperienceId,
164        collective_id: pulsedb::CollectiveId,
165        /// The type of change: "Created", "Updated", "Archived", or "Deleted".
166        event_type: String,
167    },
168}
169
170/// Fire-and-forget event broadcaster.
171///
172/// Wraps `tokio::sync::broadcast` for multi-consumer event distribution.
173/// If no subscribers exist, emitted events are silently dropped.
174///
175/// `Clone` is cheap — it just clones the broadcast sender handle.
176#[derive(Clone)]
177pub struct EventEmitter {
178    sender: broadcast::Sender<HiveEvent>,
179}
180
181impl EventEmitter {
182    /// Creates a new emitter with the given channel capacity.
183    pub fn new(capacity: usize) -> Self {
184        let (sender, _) = broadcast::channel(capacity);
185        Self { sender }
186    }
187
188    /// Emits an event to all subscribers. Fire-and-forget — if no
189    /// subscribers exist, the event is silently dropped.
190    pub fn emit(&self, event: HiveEvent) {
191        let _ = self.sender.send(event);
192    }
193
194    /// Creates a new subscriber that receives all future events.
195    pub fn subscribe(&self) -> broadcast::Receiver<HiveEvent> {
196        self.sender.subscribe()
197    }
198}
199
200impl Default for EventEmitter {
201    fn default() -> Self {
202        Self::new(256)
203    }
204}
205
206impl std::fmt::Debug for EventEmitter {
207    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
208        f.debug_struct("EventEmitter")
209            .field("subscriber_count", &self.sender.receiver_count())
210            .finish()
211    }
212}
213
214/// Type alias for event fan-out. In Sprint 1, this is just an EventEmitter.
215/// May evolve into a more sophisticated bus with filtering in later sprints.
216pub type EventBus = EventEmitter;
217
#[cfg(test)]
mod tests {
    use super::*;

    /// A cloned event keeps its field values and renders them via `Debug`.
    #[test]
    fn test_hive_event_is_debug_clone() {
        let event = HiveEvent::AgentStarted {
            timestamp_ms: now_ms(),
            agent_id: "a1".into(),
            name: "researcher".into(),
            kind: AgentKindTag::Llm,
        };
        let cloned = event.clone();
        let debug = format!("{:?}", cloned);
        assert!(debug.contains("researcher"));
    }

    /// JSON output carries the snake_case `type` tag and survives a roundtrip.
    #[test]
    fn test_hive_event_serializes_to_json() {
        let event = HiveEvent::LlmCallCompleted {
            timestamp_ms: 1711500000000,
            agent_id: "agent-1".into(),
            model: "gpt-4".into(),
            duration_ms: 1500,
            input_tokens: 200,
            output_tokens: 50,
        };
        let json = serde_json::to_string(&event).unwrap();
        assert!(json.contains("\"type\":\"llm_call_completed\""));
        assert!(json.contains("\"input_tokens\":200"));
        assert!(json.contains("\"output_tokens\":50"));

        // Roundtrip
        let deserialized: HiveEvent = serde_json::from_str(&json).unwrap();
        assert!(matches!(
            deserialized,
            HiveEvent::LlmCallCompleted {
                input_tokens: 200,
                output_tokens: 50,
                ..
            }
        ));
    }

    /// Tool-call events serialize their `params` field and variant tag.
    #[test]
    fn test_hive_event_serialize_tool_call() {
        let event = HiveEvent::ToolCallStarted {
            timestamp_ms: now_ms(),
            agent_id: "a1".into(),
            tool_name: "search".into(),
            params: r#"{"query":"test"}"#.into(),
        };
        let json = serde_json::to_string(&event).unwrap();
        assert!(json.contains("\"params\""));
        assert!(json.contains("\"tool_call_started\""));
    }

    /// A subscriber receives an event emitted after it subscribed.
    #[tokio::test]
    async fn test_event_emitter_send_receive() {
        let emitter = EventEmitter::new(16);
        let mut rx = emitter.subscribe();

        emitter.emit(HiveEvent::AgentStarted {
            timestamp_ms: now_ms(),
            agent_id: "a1".into(),
            name: "test".into(),
            kind: AgentKindTag::Llm,
        });

        let event = rx.recv().await.unwrap();
        assert!(matches!(event, HiveEvent::AgentStarted { agent_id, .. } if agent_id == "a1"));
    }

    /// Every subscriber gets its own copy of an emitted event.
    #[tokio::test]
    async fn test_event_emitter_multiple_subscribers() {
        let emitter = EventEmitter::new(16);
        let mut rx1 = emitter.subscribe();
        let mut rx2 = emitter.subscribe();

        emitter.emit(HiveEvent::ToolCallStarted {
            timestamp_ms: now_ms(),
            agent_id: "a1".into(),
            tool_name: "search".into(),
            params: "{}".into(),
        });

        let e1 = rx1.recv().await.unwrap();
        let e2 = rx2.recv().await.unwrap();
        assert!(matches!(e1, HiveEvent::ToolCallStarted { .. }));
        assert!(matches!(e2, HiveEvent::ToolCallStarted { .. }));
    }

    /// Emitting with no subscribers is a silent no-op rather than a panic.
    #[test]
    fn test_event_emitter_no_subscribers_no_panic() {
        let emitter = EventEmitter::new(16);
        emitter.emit(HiveEvent::ExperienceRecorded {
            timestamp_ms: now_ms(),
            experience_id: ExperienceId::new(),
            agent_id: "a1".into(),
            content_preview: "test".into(),
            experience_type: "Generic".into(),
            importance: 0.5,
        });
    }

    /// A clone shares the original's channel: subscribing through the clone
    /// observes events emitted through the original.
    #[test]
    fn test_event_emitter_clone_is_cheap() {
        let emitter = EventEmitter::default();
        let cloned = emitter.clone();
        let mut rx = cloned.subscribe();
        emitter.emit(HiveEvent::SubstratePerceived {
            timestamp_ms: now_ms(),
            agent_id: "a1".into(),
            experience_count: 10,
            insight_count: 2,
        });
        // Check the payload, not just that something arrived.
        assert!(matches!(
            rx.try_recv(),
            Ok(HiveEvent::SubstratePerceived {
                experience_count: 10,
                insight_count: 2,
                ..
            })
        ));
    }

    /// `Debug` output names the type and reports the live subscriber count.
    #[test]
    fn test_event_emitter_debug() {
        let emitter = EventEmitter::default();
        let debug = format!("{:?}", emitter);
        assert!(debug.contains("EventEmitter"));
        assert!(debug.contains("subscriber_count: 0"));

        // Keep the receiver alive so it is counted.
        let _rx = emitter.subscribe();
        let debug = format!("{:?}", emitter);
        assert!(debug.contains("subscriber_count: 1"));
    }

    /// Every variant can be constructed and cloned, and each clone renders
    /// identically to its original.
    #[test]
    fn test_all_event_variants_clone() {
        let events: Vec<HiveEvent> = vec![
            HiveEvent::AgentStarted {
                timestamp_ms: 0,
                agent_id: "a".into(),
                name: "n".into(),
                kind: AgentKindTag::Llm,
            },
            HiveEvent::AgentCompleted {
                timestamp_ms: 0,
                agent_id: "a".into(),
                outcome: AgentOutcome::Complete {
                    response: "done".into(),
                },
            },
            HiveEvent::LlmCallStarted {
                timestamp_ms: 0,
                agent_id: "a".into(),
                model: "gpt-4".into(),
                message_count: 3,
            },
            HiveEvent::LlmCallCompleted {
                timestamp_ms: 0,
                agent_id: "a".into(),
                model: "gpt-4".into(),
                duration_ms: 1500,
                input_tokens: 100,
                output_tokens: 50,
            },
            HiveEvent::LlmTokenStreamed {
                timestamp_ms: 0,
                agent_id: "a".into(),
                token: "hello".into(),
            },
            HiveEvent::ToolCallStarted {
                timestamp_ms: 0,
                agent_id: "a".into(),
                tool_name: "search".into(),
                params: "{}".into(),
            },
            HiveEvent::ToolCallCompleted {
                timestamp_ms: 0,
                agent_id: "a".into(),
                tool_name: "search".into(),
                duration_ms: 200,
                result_preview: "found it".into(),
            },
            HiveEvent::ToolApprovalRequested {
                timestamp_ms: 0,
                agent_id: "a".into(),
                tool_name: "delete".into(),
                description: "Delete file".into(),
            },
            HiveEvent::ExperienceRecorded {
                timestamp_ms: 0,
                experience_id: ExperienceId::new(),
                agent_id: "a".into(),
                content_preview: "test".into(),
                experience_type: "Generic".into(),
                importance: 0.5,
            },
            HiveEvent::RelationshipInferred {
                timestamp_ms: 0,
                relation_id: RelationId::new(),
                agent_id: "a".into(),
            },
            HiveEvent::InsightGenerated {
                timestamp_ms: 0,
                insight_id: InsightId::new(),
                source_count: 5,
                agent_id: "a".into(),
            },
            HiveEvent::SubstratePerceived {
                timestamp_ms: 0,
                agent_id: "a".into(),
                experience_count: 10,
                insight_count: 2,
            },
            HiveEvent::EmbeddingComputed {
                timestamp_ms: 0,
                agent_id: "a".into(),
                dimensions: 384,
                duration_ms: 100,
            },
            HiveEvent::WatchNotification {
                timestamp_ms: 0,
                experience_id: ExperienceId::new(),
                collective_id: pulsedb::CollectiveId::new(),
                event_type: "Created".into(),
            },
        ];
        assert_eq!(events.len(), 14);

        // `HiveEvent` has no `PartialEq`, so compare clones via their `Debug` form.
        let cloned: Vec<HiveEvent> = events.clone();
        assert_eq!(cloned.len(), events.len());
        for (original, copy) in events.iter().zip(cloned.iter()) {
            assert_eq!(format!("{:?}", original), format!("{:?}", copy));
        }
    }

    /// `now_ms` yields a plausible present-day epoch timestamp.
    #[test]
    fn test_now_ms_returns_nonzero() {
        let ts = now_ms();
        assert!(ts > 0, "Timestamp should be non-zero");
        assert!(ts > 1_700_000_000_000, "Timestamp should be after 2023");
    }
}