Skip to main content

pulsehive_core/
event.rs

1//! Event system for real-time observability into agent execution.
2//!
3//! [`HiveEvent`] covers the full agent lifecycle: LLM calls, tool execution,
4//! substrate operations, and perception. [`EventEmitter`] provides a
5//! fire-and-forget broadcast mechanism for event distribution.
6//!
7//! Events are serializable to JSON for transmission to observability tools
8//! like PulseVision via [`EventExporter`](crate::export::EventExporter).
9//!
10//! Built on `tokio::sync::broadcast` for multi-consumer support.
11
12use std::sync::Arc;
13
14use pulsedb::{ExperienceId, InsightId, RelationId};
15use serde::{Deserialize, Serialize};
16use tokio::sync::broadcast;
17
18use crate::agent::{AgentKindTag, AgentOutcome};
19use crate::export::EventExporter;
20
/// Returns the current wall-clock time as milliseconds since the Unix epoch.
///
/// Used by event emitters to timestamp events at creation time.
///
/// This function never panics:
/// - if the system clock reports a time before the Unix epoch, the elapsed
///   duration falls back to zero and `0` is returned;
/// - if the millisecond count does not fit in a `u64`, the result saturates
///   at `u64::MAX` instead of silently wrapping.
pub fn now_ms() -> u64 {
    let since_epoch = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .unwrap_or_default();

    // `as_millis` yields a `u128`. Clamp to the `u64` range first so the cast
    // below is lossless rather than truncating.
    since_epoch.as_millis().min(u128::from(u64::MAX)) as u64
}
30
31/// Events emitted during agent execution.
32///
33/// Covers the full lifecycle: agent start/stop, LLM calls, tool execution,
34/// substrate operations, and perception. All variants include `timestamp_ms`
35/// (epoch milliseconds) and `agent_id` where applicable for correlation.
36///
37/// Serializes to tagged JSON: `{"type": "llm_call_completed", "agent_id": "...", ...}`
38///
39/// Must be `Clone` because [`EventEmitter`] uses `tokio::sync::broadcast`
40/// which requires cloneable values.
41#[derive(Debug, Clone, Serialize, Deserialize)]
42#[serde(tag = "type", rename_all = "snake_case")]
43pub enum HiveEvent {
44    // ── Agent lifecycle ──────────────────────────────────────────────
45    /// An agent has started execution.
46    AgentStarted {
47        timestamp_ms: u64,
48        agent_id: String,
49        name: String,
50        kind: AgentKindTag,
51    },
52    /// An agent has completed execution.
53    AgentCompleted {
54        timestamp_ms: u64,
55        agent_id: String,
56        outcome: AgentOutcome,
57    },
58
59    // ── LLM interactions ─────────────────────────────────────────────
60    /// An LLM call has been initiated.
61    LlmCallStarted {
62        timestamp_ms: u64,
63        agent_id: String,
64        model: String,
65        message_count: usize,
66    },
67    /// An LLM call has completed.
68    LlmCallCompleted {
69        timestamp_ms: u64,
70        agent_id: String,
71        model: String,
72        duration_ms: u64,
73        /// Prompt tokens consumed (0 if not reported by provider).
74        input_tokens: u32,
75        /// Completion tokens generated (0 if not reported by provider).
76        output_tokens: u32,
77    },
78    /// A token was received from a streaming LLM response.
79    LlmTokenStreamed {
80        timestamp_ms: u64,
81        agent_id: String,
82        token: String,
83    },
84
85    // ── Tool execution ───────────────────────────────────────────────
86    /// A tool call has started.
87    ToolCallStarted {
88        timestamp_ms: u64,
89        agent_id: String,
90        tool_name: String,
91        /// Tool arguments as a JSON string.
92        params: String,
93    },
94    /// A tool call has completed.
95    ToolCallCompleted {
96        timestamp_ms: u64,
97        agent_id: String,
98        tool_name: String,
99        duration_ms: u64,
100        /// Tool result preview (truncated to 200 chars).
101        result_preview: String,
102    },
103    /// A tool requires human approval before execution.
104    ToolApprovalRequested {
105        timestamp_ms: u64,
106        agent_id: String,
107        tool_name: String,
108        description: String,
109    },
110
111    // ── Substrate operations ─────────────────────────────────────────
112    /// An experience was recorded in the substrate.
113    ExperienceRecorded {
114        timestamp_ms: u64,
115        experience_id: ExperienceId,
116        agent_id: String,
117        /// First 200 characters of the experience content.
118        content_preview: String,
119        /// Experience type as a string (e.g., "Generic", "Solution").
120        experience_type: String,
121        /// Importance score (0.0-1.0).
122        importance: f32,
123    },
124    /// A relationship was inferred between experiences.
125    RelationshipInferred {
126        timestamp_ms: u64,
127        relation_id: RelationId,
128        /// Agent that triggered the inference.
129        agent_id: String,
130    },
131    /// An insight was synthesized from an experience cluster.
132    InsightGenerated {
133        timestamp_ms: u64,
134        insight_id: InsightId,
135        source_count: usize,
136        /// Agent that triggered the synthesis.
137        agent_id: String,
138    },
139
140    // ── Perception ───────────────────────────────────────────────────
141    /// An agent perceived the substrate through its lens.
142    SubstratePerceived {
143        timestamp_ms: u64,
144        agent_id: String,
145        experience_count: usize,
146        insight_count: usize,
147    },
148
149    // ── Embedding ─────────────────────────────────────────────────
150    /// An embedding was computed via the EmbeddingProvider.
151    EmbeddingComputed {
152        timestamp_ms: u64,
153        agent_id: String,
154        dimensions: usize,
155        duration_ms: u64,
156    },
157
158    // ── Watch system ───────────────────────────────────────────────
159    /// A real-time Watch event from the substrate.
160    ///
161    /// Emitted when experiences are created, updated, archived, or deleted
162    /// by other agents in the same collective. Forwarded from PulseDB's
163    /// Watch system into the HiveEvent stream.
164    WatchNotification {
165        timestamp_ms: u64,
166        experience_id: ExperienceId,
167        collective_id: pulsedb::CollectiveId,
168        /// The type of change: "Created", "Updated", "Archived", or "Deleted".
169        event_type: String,
170    },
171}
172
173/// Fire-and-forget event broadcaster.
174///
175/// Wraps `tokio::sync::broadcast` for multi-consumer event distribution.
176/// If no subscribers exist, emitted events are silently dropped.
177///
178/// `Clone` is cheap — it just clones the broadcast sender handle.
179#[derive(Clone)]
180pub struct EventEmitter {
181    sender: broadcast::Sender<HiveEvent>,
182    exporter: Option<Arc<dyn EventExporter>>,
183}
184
185impl EventEmitter {
186    /// Creates a new emitter with the given channel capacity.
187    pub fn new(capacity: usize) -> Self {
188        let (sender, _) = broadcast::channel(capacity);
189        Self {
190            sender,
191            exporter: None,
192        }
193    }
194
195    /// Creates a new emitter with an event exporter for external observability.
196    ///
197    /// When set, every emitted event is also forwarded to the exporter
198    /// via a fire-and-forget `tokio::spawn` — zero latency on the emit path.
199    pub fn with_exporter(capacity: usize, exporter: Arc<dyn EventExporter>) -> Self {
200        let (sender, _) = broadcast::channel(capacity);
201        Self {
202            sender,
203            exporter: Some(exporter),
204        }
205    }
206
207    /// Emits an event to all subscribers and the exporter (if set).
208    /// Fire-and-forget — if no subscribers exist, the event is silently dropped.
209    pub fn emit(&self, event: HiveEvent) {
210        if let Some(exporter) = &self.exporter {
211            let exporter = Arc::clone(exporter);
212            let event_clone = event.clone();
213            tokio::spawn(async move {
214                exporter.export(&event_clone).await;
215            });
216        }
217        let _ = self.sender.send(event);
218    }
219
220    /// Creates a new subscriber that receives all future events.
221    pub fn subscribe(&self) -> broadcast::Receiver<HiveEvent> {
222        self.sender.subscribe()
223    }
224}
225
226impl Default for EventEmitter {
227    fn default() -> Self {
228        Self::new(256)
229    }
230}
231
232impl std::fmt::Debug for EventEmitter {
233    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
234        f.debug_struct("EventEmitter")
235            .field("subscriber_count", &self.sender.receiver_count())
236            .finish()
237    }
238}
239
240/// Type alias for event fan-out. In Sprint 1, this is just an EventEmitter.
241/// May evolve into a more sophisticated bus with filtering in later sprints.
242pub type EventBus = EventEmitter;
243
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an `AgentStarted` event for an LLM agent, stamped with the current time.
    fn agent_started(agent_id: &str, name: &str) -> HiveEvent {
        HiveEvent::AgentStarted {
            timestamp_ms: now_ms(),
            agent_id: agent_id.into(),
            name: name.into(),
            kind: AgentKindTag::Llm,
        }
    }

    /// Builds a `ToolCallStarted` event for agent "a1" calling the "search" tool.
    fn search_call_started(params: &str) -> HiveEvent {
        HiveEvent::ToolCallStarted {
            timestamp_ms: now_ms(),
            agent_id: "a1".into(),
            tool_name: "search".into(),
            params: params.into(),
        }
    }

    /// A cloned event still renders its payload through `Debug`.
    #[test]
    fn test_hive_event_is_debug_clone() {
        let original = agent_started("a1", "researcher");
        let rendered = format!("{:?}", original.clone());
        assert!(rendered.contains("researcher"));
    }

    /// JSON output is tagged with the snake_case variant name and roundtrips.
    #[test]
    fn test_hive_event_serializes_to_json() {
        let completed = HiveEvent::LlmCallCompleted {
            timestamp_ms: 1711500000000,
            agent_id: "agent-1".into(),
            model: "gpt-4".into(),
            duration_ms: 1500,
            input_tokens: 200,
            output_tokens: 50,
        };
        let encoded = serde_json::to_string(&completed).unwrap();

        let expected_fragments = [
            "\"type\":\"llm_call_completed\"",
            "\"input_tokens\":200",
            "\"output_tokens\":50",
        ];
        for fragment in expected_fragments.iter() {
            assert!(encoded.contains(*fragment));
        }

        // Roundtrip
        let decoded: HiveEvent = serde_json::from_str(&encoded).unwrap();
        assert!(matches!(
            decoded,
            HiveEvent::LlmCallCompleted {
                input_tokens: 200,
                output_tokens: 50,
                ..
            }
        ));
    }

    /// Tool-call events expose their params and tag in the JSON output.
    #[test]
    fn test_hive_event_serialize_tool_call() {
        let encoded =
            serde_json::to_string(&search_call_started(r#"{"query":"test"}"#)).unwrap();
        assert!(encoded.contains("\"params\""));
        assert!(encoded.contains("\"tool_call_started\""));
    }

    /// A subscriber receives an event emitted after it subscribed.
    #[tokio::test]
    async fn test_event_emitter_send_receive() {
        let emitter = EventEmitter::new(16);
        let mut receiver = emitter.subscribe();

        emitter.emit(agent_started("a1", "test"));

        match receiver.recv().await.unwrap() {
            HiveEvent::AgentStarted { agent_id, .. } => assert!(agent_id == "a1"),
            other => panic!("expected AgentStarted, got {:?}", other),
        }
    }

    /// Every subscriber gets its own copy of an emitted event.
    #[tokio::test]
    async fn test_event_emitter_multiple_subscribers() {
        let emitter = EventEmitter::new(16);
        let mut receivers = [emitter.subscribe(), emitter.subscribe()];

        emitter.emit(search_call_started("{}"));

        for receiver in receivers.iter_mut() {
            let delivered = receiver.recv().await.unwrap();
            assert!(matches!(delivered, HiveEvent::ToolCallStarted { .. }));
        }
    }

    /// Emitting with no subscribers (and no exporter) must not panic.
    #[test]
    fn test_event_emitter_no_subscribers_no_panic() {
        let lonely = EventEmitter::new(16);
        let recorded = HiveEvent::ExperienceRecorded {
            timestamp_ms: now_ms(),
            experience_id: ExperienceId::new(),
            agent_id: "a1".into(),
            content_preview: "test".into(),
            experience_type: "Generic".into(),
            importance: 0.5,
        };
        lonely.emit(recorded);
    }

    /// A clone shares the channel: subscribing via the clone sees events
    /// emitted through the original.
    #[test]
    fn test_event_emitter_clone_is_cheap() {
        let emitter = EventEmitter::default();
        let mut receiver = emitter.clone().subscribe();

        let perceived = HiveEvent::SubstratePerceived {
            timestamp_ms: now_ms(),
            agent_id: "a1".into(),
            experience_count: 10,
            insight_count: 2,
        };
        emitter.emit(perceived);

        assert!(receiver.try_recv().is_ok());
    }

    /// The manual `Debug` impl names the type.
    #[test]
    fn test_event_emitter_debug() {
        let rendered = format!("{:?}", EventEmitter::default());
        assert!(rendered.contains("EventEmitter"));
    }

    /// One instance of every variant can be constructed and cloned.
    #[test]
    fn test_all_event_variants_clone() {
        let timestamp_ms: u64 = 0;
        let agent_id = || String::from("a");

        let variants: Vec<HiveEvent> = vec![
            HiveEvent::AgentStarted {
                timestamp_ms,
                agent_id: agent_id(),
                name: "n".into(),
                kind: AgentKindTag::Llm,
            },
            HiveEvent::AgentCompleted {
                timestamp_ms,
                agent_id: agent_id(),
                outcome: AgentOutcome::Complete {
                    response: "done".into(),
                },
            },
            HiveEvent::LlmCallStarted {
                timestamp_ms,
                agent_id: agent_id(),
                model: "gpt-4".into(),
                message_count: 3,
            },
            HiveEvent::LlmCallCompleted {
                timestamp_ms,
                agent_id: agent_id(),
                model: "gpt-4".into(),
                duration_ms: 1500,
                input_tokens: 100,
                output_tokens: 50,
            },
            HiveEvent::LlmTokenStreamed {
                timestamp_ms,
                agent_id: agent_id(),
                token: "hello".into(),
            },
            HiveEvent::ToolCallStarted {
                timestamp_ms,
                agent_id: agent_id(),
                tool_name: "search".into(),
                params: "{}".into(),
            },
            HiveEvent::ToolCallCompleted {
                timestamp_ms,
                agent_id: agent_id(),
                tool_name: "search".into(),
                duration_ms: 200,
                result_preview: "found it".into(),
            },
            HiveEvent::ToolApprovalRequested {
                timestamp_ms,
                agent_id: agent_id(),
                tool_name: "delete".into(),
                description: "Delete file".into(),
            },
            HiveEvent::ExperienceRecorded {
                timestamp_ms,
                experience_id: ExperienceId::new(),
                agent_id: agent_id(),
                content_preview: "test".into(),
                experience_type: "Generic".into(),
                importance: 0.5,
            },
            HiveEvent::RelationshipInferred {
                timestamp_ms,
                relation_id: RelationId::new(),
                agent_id: agent_id(),
            },
            HiveEvent::InsightGenerated {
                timestamp_ms,
                insight_id: InsightId::new(),
                source_count: 5,
                agent_id: agent_id(),
            },
            HiveEvent::SubstratePerceived {
                timestamp_ms,
                agent_id: agent_id(),
                experience_count: 10,
                insight_count: 2,
            },
            HiveEvent::EmbeddingComputed {
                timestamp_ms,
                agent_id: agent_id(),
                dimensions: 384,
                duration_ms: 100,
            },
            HiveEvent::WatchNotification {
                timestamp_ms,
                experience_id: ExperienceId::new(),
                collective_id: pulsedb::CollectiveId::new(),
                event_type: "Created".into(),
            },
        ];

        let _copies: Vec<HiveEvent> = variants.iter().cloned().collect();
        assert_eq!(variants.len(), 14);
    }

    /// `now_ms` yields a plausible, recent epoch timestamp.
    #[test]
    fn test_now_ms_returns_nonzero() {
        let stamp = now_ms();
        assert!(stamp > 0, "Timestamp should be non-zero");
        assert!(stamp > 1_700_000_000_000, "Timestamp should be after 2023");
    }
}