use std::sync::Arc;
use pulsedb::{ExperienceId, InsightId, RelationId};
use serde::{Deserialize, Serialize};
use tokio::sync::broadcast;
use crate::agent::{AgentKindTag, AgentOutcome};
use crate::export::EventExporter;
/// Returns the current wall-clock time as whole milliseconds since the Unix epoch.
///
/// If the system clock reports a time before the epoch, the elapsed duration
/// falls back to zero, so the function returns `0` instead of failing.
pub fn now_ms() -> u64 {
    use std::time::{Duration, SystemTime, UNIX_EPOCH};

    let since_epoch = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed,
        // Clock is set before 1970: same fallback as `Duration::default()`.
        Err(_) => Duration::default(),
    };
    // `as_millis` yields u128; the cast keeps the low 64 bits, which is ample
    // for any realistic timestamp.
    since_epoch.as_millis() as u64
}
/// Event payloads published through [`EventEmitter`].
///
/// Serde representation: internally tagged. The variant name is written in
/// `snake_case` under the `"type"` key alongside the variant's own fields,
/// e.g. `{"type":"llm_call_completed", ...}`.
///
/// Every variant carries `timestamp_ms`. The tests in this file fill it from
/// [`now_ms`] (milliseconds since the Unix epoch); producers outside this file
/// presumably do the same — confirm at the emit sites.
///
/// NOTE(review): the per-variant descriptions below are derived from the
/// variant and field names only; the code that emits these events is not part
/// of this file.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum HiveEvent {
/// An agent began running; carries its id, display name and kind tag.
AgentStarted {
timestamp_ms: u64,
agent_id: String,
name: String,
kind: AgentKindTag,
},
/// An agent finished; `outcome` holds the [`AgentOutcome`] it ended with.
AgentCompleted {
timestamp_ms: u64,
agent_id: String,
outcome: AgentOutcome,
},
/// An LLM request is being issued for `model` with `message_count` messages.
LlmCallStarted {
timestamp_ms: u64,
agent_id: String,
model: String,
message_count: usize,
},
/// An LLM request finished; reports its duration and token counts.
LlmCallCompleted {
timestamp_ms: u64,
agent_id: String,
model: String,
duration_ms: u64,
input_tokens: u32,
output_tokens: u32,
},
/// A single streamed token from an LLM response.
LlmTokenStreamed {
timestamp_ms: u64,
agent_id: String,
token: String,
},
/// A tool invocation is starting.
ToolCallStarted {
timestamp_ms: u64,
agent_id: String,
tool_name: String,
// Free-form string; the tests in this file pass JSON text such as `{"query":"test"}`.
params: String,
},
/// A tool invocation finished; `result_preview` is a textual preview of its result.
ToolCallCompleted {
timestamp_ms: u64,
agent_id: String,
tool_name: String,
duration_ms: u64,
result_preview: String,
},
/// Approval was requested for a tool; `description` is text describing the request.
ToolApprovalRequested {
timestamp_ms: u64,
agent_id: String,
tool_name: String,
description: String,
},
/// An experience was recorded under `experience_id`.
ExperienceRecorded {
timestamp_ms: u64,
experience_id: ExperienceId,
agent_id: String,
content_preview: String,
// Stringly-typed kind; the tests in this file use "Generic".
experience_type: String,
// Valid range is not established here (tests use 0.5) — TODO confirm.
importance: f32,
},
/// A relation identified by `relation_id` was inferred.
RelationshipInferred {
timestamp_ms: u64,
relation_id: RelationId,
agent_id: String,
},
/// An insight identified by `insight_id` was generated; `source_count` is
/// presumably the number of inputs it was derived from — verify against the producer.
InsightGenerated {
timestamp_ms: u64,
insight_id: InsightId,
source_count: usize,
agent_id: String,
},
/// An agent perceived the substrate; reports how many experiences and insights were involved.
SubstratePerceived {
timestamp_ms: u64,
agent_id: String,
experience_count: usize,
insight_count: usize,
},
/// An embedding was computed; reports its dimensionality and how long it took.
EmbeddingComputed {
timestamp_ms: u64,
agent_id: String,
dimensions: usize,
duration_ms: u64,
},
/// A watch notification for an experience within a collective.
WatchNotification {
timestamp_ms: u64,
experience_id: ExperienceId,
collective_id: pulsedb::CollectiveId,
// Stringly-typed kind; the tests in this file use "Created".
event_type: String,
},
}
/// Handle for publishing [`HiveEvent`]s to broadcast subscribers.
///
/// `Clone` is derived, so a clone holds a clone of the same broadcast sender
/// and of the same exporter `Arc`: events emitted through any clone reach
/// receivers subscribed through any other clone.
#[derive(Clone)]
pub struct EventEmitter {
// Sending half of the Tokio broadcast channel; receivers come from `subscribe`.
sender: broadcast::Sender<HiveEvent>,
// Optional sink that additionally receives every emitted event; `None` disables exporting.
exporter: Option<Arc<dyn EventExporter>>,
}
impl EventEmitter {
    /// Creates an emitter with no exporter attached.
    ///
    /// `capacity` is handed unchanged to [`broadcast::channel`]. The initial
    /// receiver returned by the channel is dropped, so the emitter starts with
    /// zero subscribers.
    ///
    /// # Panics
    ///
    /// Panics if `capacity` is `0` (or exceeds the channel's maximum), as
    /// documented for `tokio::sync::broadcast::channel`.
    pub fn new(capacity: usize) -> Self {
        let (sender, _) = broadcast::channel(capacity);
        Self {
            sender,
            exporter: None,
        }
    }

    /// Creates an emitter that also forwards every emitted event to `exporter`.
    ///
    /// `capacity` has the same meaning as in [`EventEmitter::new`].
    ///
    /// # Panics
    ///
    /// Panics under the same conditions as [`EventEmitter::new`].
    pub fn with_exporter(capacity: usize, exporter: Arc<dyn EventExporter>) -> Self {
        let (sender, _) = broadcast::channel(capacity);
        Self {
            sender,
            exporter: Some(exporter),
        }
    }

    /// Publishes `event` to all current subscribers.
    ///
    /// If an exporter is configured, a clone of the event is exported on a
    /// separately spawned task so that `emit` itself never awaits. Each event
    /// gets its own task, so nothing here orders exports relative to each other.
    ///
    /// Emission is best-effort:
    /// - a send error (no receiver subscribed) is ignored;
    /// - when called from a thread without an active Tokio runtime, the export
    ///   is skipped instead of panicking, while the broadcast still happens.
    pub fn emit(&self, event: HiveEvent) {
        if let Some(exporter) = &self.exporter {
            // `tokio::spawn` panics outside a runtime; probing for a handle
            // keeps this fire-and-forget call from crashing such callers.
            if let Ok(runtime) = tokio::runtime::Handle::try_current() {
                let exporter = Arc::clone(exporter);
                let exported = event.clone();
                runtime.spawn(async move {
                    exporter.export(&exported).await;
                });
            }
        }
        // Err only means there are no receivers right now; nothing to do.
        let _ = self.sender.send(event);
    }

    /// Returns a new receiver that observes events emitted after this call.
    pub fn subscribe(&self) -> broadcast::Receiver<HiveEvent> {
        self.sender.subscribe()
    }
}
impl Default for EventEmitter {
fn default() -> Self {
Self::new(256)
}
}
impl std::fmt::Debug for EventEmitter {
    /// Formats as `EventEmitter { subscriber_count: N }`, where `N` is the
    /// sender's current receiver count. The exporter is not shown.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let live_receivers = self.sender.receiver_count();
        let mut out = f.debug_struct("EventEmitter");
        out.field("subscriber_count", &live_receivers);
        out.finish()
    }
}
/// Alias of [`EventEmitter`]; both names refer to the same type.
pub type EventBus = EventEmitter;
#[cfg(test)]
mod tests {
    use super::*;

    /// A cloned event renders its field values through `Debug`.
    #[test]
    fn test_hive_event_is_debug_clone() {
        let original = HiveEvent::AgentStarted {
            timestamp_ms: now_ms(),
            agent_id: String::from("a1"),
            name: String::from("researcher"),
            kind: AgentKindTag::Llm,
        };
        let copy = original.clone();
        let rendered = format!("{:?}", copy);
        assert!(rendered.contains("researcher"));
    }

    /// JSON output carries the snake_case `type` tag and survives a round trip.
    #[test]
    fn test_hive_event_serializes_to_json() {
        let completed = HiveEvent::LlmCallCompleted {
            timestamp_ms: 1711500000000,
            agent_id: String::from("agent-1"),
            model: String::from("gpt-4"),
            duration_ms: 1500,
            input_tokens: 200,
            output_tokens: 50,
        };
        let encoded = serde_json::to_string(&completed).unwrap();
        for fragment in [
            "\"type\":\"llm_call_completed\"",
            "\"input_tokens\":200",
            "\"output_tokens\":50",
        ] {
            assert!(encoded.contains(fragment));
        }

        let decoded: HiveEvent = serde_json::from_str(&encoded).unwrap();
        let counts_survived = matches!(
            decoded,
            HiveEvent::LlmCallCompleted {
                input_tokens: 200,
                output_tokens: 50,
                ..
            }
        );
        assert!(counts_survived);
    }

    /// Tool-call events serialize their `params` field and variant tag.
    #[test]
    fn test_hive_event_serialize_tool_call() {
        let started = HiveEvent::ToolCallStarted {
            timestamp_ms: now_ms(),
            agent_id: String::from("a1"),
            tool_name: String::from("search"),
            params: String::from(r#"{"query":"test"}"#),
        };
        let encoded = serde_json::to_string(&started).unwrap();
        for fragment in ["\"params\"", "\"tool_call_started\""] {
            assert!(encoded.contains(fragment));
        }
    }

    /// A subscriber receives the event that was emitted after it subscribed.
    #[tokio::test]
    async fn test_event_emitter_send_receive() {
        let bus = EventEmitter::new(16);
        let mut inbox = bus.subscribe();
        let outgoing = HiveEvent::AgentStarted {
            timestamp_ms: now_ms(),
            agent_id: String::from("a1"),
            name: String::from("test"),
            kind: AgentKindTag::Llm,
        };
        bus.emit(outgoing);

        let from_a1 = match inbox.recv().await.unwrap() {
            HiveEvent::AgentStarted { agent_id, .. } => agent_id == "a1",
            _ => false,
        };
        assert!(from_a1);
    }

    /// Every subscriber gets its own copy of an emitted event.
    #[tokio::test]
    async fn test_event_emitter_multiple_subscribers() {
        let bus = EventEmitter::new(16);
        let mut inbox_a = bus.subscribe();
        let mut inbox_b = bus.subscribe();
        bus.emit(HiveEvent::ToolCallStarted {
            timestamp_ms: now_ms(),
            agent_id: String::from("a1"),
            tool_name: String::from("search"),
            params: String::from("{}"),
        });

        let first = inbox_a.recv().await.unwrap();
        let second = inbox_b.recv().await.unwrap();
        for received in [first, second] {
            assert!(matches!(received, HiveEvent::ToolCallStarted { .. }));
        }
    }

    /// Emitting with nobody subscribed must be a silent no-op.
    #[test]
    fn test_event_emitter_no_subscribers_no_panic() {
        let bus = EventEmitter::new(16);
        let unheard = HiveEvent::ExperienceRecorded {
            timestamp_ms: now_ms(),
            experience_id: ExperienceId::new(),
            agent_id: String::from("a1"),
            content_preview: String::from("test"),
            experience_type: String::from("Generic"),
            importance: 0.5,
        };
        bus.emit(unheard);
    }

    /// A clone shares the channel: subscribing on the clone sees the original's events.
    #[test]
    fn test_event_emitter_clone_is_cheap() {
        let original = EventEmitter::default();
        let twin = original.clone();
        let mut inbox = twin.subscribe();
        original.emit(HiveEvent::SubstratePerceived {
            timestamp_ms: now_ms(),
            agent_id: String::from("a1"),
            experience_count: 10,
            insight_count: 2,
        });
        let delivery = inbox.try_recv();
        assert!(delivery.is_ok());
    }

    /// The hand-written `Debug` impl names the type.
    #[test]
    fn test_event_emitter_debug() {
        let rendered = format!("{:?}", EventEmitter::default());
        assert!(rendered.contains("EventEmitter"));
    }

    /// One instance of every variant can be built and cloned.
    #[test]
    fn test_all_event_variants_clone() {
        let agent = || String::from("a");
        let events: Vec<HiveEvent> = vec![
            HiveEvent::AgentStarted {
                timestamp_ms: 0,
                agent_id: agent(),
                name: String::from("n"),
                kind: AgentKindTag::Llm,
            },
            HiveEvent::AgentCompleted {
                timestamp_ms: 0,
                agent_id: agent(),
                outcome: AgentOutcome::Complete {
                    response: "done".into(),
                },
            },
            HiveEvent::LlmCallStarted {
                timestamp_ms: 0,
                agent_id: agent(),
                model: String::from("gpt-4"),
                message_count: 3,
            },
            HiveEvent::LlmCallCompleted {
                timestamp_ms: 0,
                agent_id: agent(),
                model: String::from("gpt-4"),
                duration_ms: 1500,
                input_tokens: 100,
                output_tokens: 50,
            },
            HiveEvent::LlmTokenStreamed {
                timestamp_ms: 0,
                agent_id: agent(),
                token: String::from("hello"),
            },
            HiveEvent::ToolCallStarted {
                timestamp_ms: 0,
                agent_id: agent(),
                tool_name: String::from("search"),
                params: String::from("{}"),
            },
            HiveEvent::ToolCallCompleted {
                timestamp_ms: 0,
                agent_id: agent(),
                tool_name: String::from("search"),
                duration_ms: 200,
                result_preview: String::from("found it"),
            },
            HiveEvent::ToolApprovalRequested {
                timestamp_ms: 0,
                agent_id: agent(),
                tool_name: String::from("delete"),
                description: String::from("Delete file"),
            },
            HiveEvent::ExperienceRecorded {
                timestamp_ms: 0,
                experience_id: ExperienceId::new(),
                agent_id: agent(),
                content_preview: String::from("test"),
                experience_type: String::from("Generic"),
                importance: 0.5,
            },
            HiveEvent::RelationshipInferred {
                timestamp_ms: 0,
                relation_id: RelationId::new(),
                agent_id: agent(),
            },
            HiveEvent::InsightGenerated {
                timestamp_ms: 0,
                insight_id: InsightId::new(),
                source_count: 5,
                agent_id: agent(),
            },
            HiveEvent::SubstratePerceived {
                timestamp_ms: 0,
                agent_id: agent(),
                experience_count: 10,
                insight_count: 2,
            },
            HiveEvent::EmbeddingComputed {
                timestamp_ms: 0,
                agent_id: agent(),
                dimensions: 384,
                duration_ms: 100,
            },
            HiveEvent::WatchNotification {
                timestamp_ms: 0,
                experience_id: ExperienceId::new(),
                collective_id: pulsedb::CollectiveId::new(),
                event_type: String::from("Created"),
            },
        ];
        let _copies: Vec<HiveEvent> = events.clone();
        assert_eq!(events.len(), 14);
    }

    /// `now_ms` yields a plausible present-day timestamp.
    #[test]
    fn test_now_ms_returns_nonzero() {
        let stamp = now_ms();
        assert!(stamp > 0, "Timestamp should be non-zero");
        assert!(stamp > 1_700_000_000_000, "Timestamp should be after 2023");
    }
}