use crate::daemon::event::{DaemonEvent, DaemonEventSender};
use serde::{Deserialize, Serialize};
use std::{collections::HashMap, path::PathBuf};
use wcore::protocol::message::{ClientMessage, SendMsg};
/// A single routing rule: events published under `source` are forwarded to
/// `target_agent`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EventSubscription {
    /// Identifier of the subscription; `EventBus::subscribe` overwrites it
    /// with the next free id.
    pub id: u64,
    /// Event source name, compared for exact equality when an event is
    /// published.
    pub source: String,
    /// Name of the agent that receives the event payload as a message.
    pub target_agent: String,
    /// When `true`, the subscription is removed after it fires once.
    /// Defaults to `false` if absent from the serialized form.
    #[serde(default)]
    pub once: bool,
}
/// On-disk representation of the subscription store.
#[derive(Debug, Default, Serialize, Deserialize)]
struct EventFile {
    /// All persisted subscriptions, stored under the `subscription` key.
    /// Defaults to an empty list when the key is missing.
    #[serde(default)]
    subscription: Vec<EventSubscription>,
}
/// Registry of event subscriptions that persists itself to disk and forwards
/// published events to the daemon event channel.
pub struct EventBus {
    /// Live subscriptions keyed by their id.
    subscriptions: HashMap<u64, EventSubscription>,
    /// Id handed out to the next subscription that is registered.
    next_id: u64,
    /// Path of the file the subscriptions are loaded from and saved to.
    events_path: PathBuf,
    /// Channel used to hand generated messages to the daemon.
    event_tx: DaemonEventSender,
}
impl EventBus {
    /// Builds an event bus from the subscriptions persisted at `events_path`.
    ///
    /// A missing file is the normal first-run case and yields an empty bus
    /// without any log output. A file that exists but cannot be read or
    /// parsed is reported as a warning (including the underlying error) and
    /// the bus also starts empty; the next successful save will then replace
    /// that file.
    ///
    /// The id counter continues after the highest id found in the file.
    pub fn load(events_path: PathBuf, event_tx: DaemonEventSender) -> Self {
        let mut subscriptions = HashMap::new();
        let mut max_id = 0u64;
        match std::fs::read_to_string(&events_path) {
            Ok(content) => match toml::from_str::<EventFile>(&content) {
                Ok(file) => {
                    for sub in file.subscription {
                        max_id = max_id.max(sub.id);
                        // A duplicated id in the file keeps the last entry.
                        subscriptions.insert(sub.id, sub);
                    }
                }
                Err(e) => tracing::warn!(
                    "failed to parse {}, starting with empty subscriptions: {e}",
                    events_path.display()
                ),
            },
            // Nothing has been persisted yet; not worth a warning.
            Err(e) if e.kind() == std::io::ErrorKind::NotFound => {}
            // Permission problems, invalid UTF-8, I/O errors, ...: surface
            // them instead of silently pretending the file does not exist.
            Err(e) => tracing::warn!(
                "failed to read {}, starting with empty subscriptions: {e}",
                events_path.display()
            ),
        }
        Self {
            subscriptions,
            next_id: max_id + 1,
            events_path,
            event_tx,
        }
    }

    /// Registers `sub`, persists the updated set and returns the stored
    /// subscription.
    ///
    /// Any `id` supplied by the caller is ignored and replaced with the next
    /// free id.
    pub fn subscribe(&mut self, mut sub: EventSubscription) -> EventSubscription {
        sub.id = self.next_id;
        self.next_id += 1;
        self.subscriptions.insert(sub.id, sub.clone());
        self.save();
        sub
    }

    /// Removes the subscription with the given `id`.
    ///
    /// Returns `true` if it existed (the change is then persisted) and
    /// `false` if no such subscription was registered.
    pub fn unsubscribe(&mut self, id: u64) -> bool {
        if self.subscriptions.remove(&id).is_none() {
            return false;
        }
        self.save();
        true
    }

    /// Returns a snapshot of all subscriptions, ordered by ascending id.
    pub fn list(&self) -> Vec<EventSubscription> {
        let mut subs: Vec<EventSubscription> = self.subscriptions.values().cloned().collect();
        // `HashMap` iteration order is arbitrary; sort so callers (and the
        // persisted file) see a stable order. Ids are unique, so an unstable
        // sort is sufficient.
        subs.sort_unstable_by_key(|sub| sub.id);
        subs
    }

    /// Delivers `payload` to the target agent of every subscription whose
    /// source equals `source`.
    ///
    /// Subscriptions marked `once` are removed after firing, and the store is
    /// persisted if at least one was removed.
    pub fn publish(&mut self, source: &str, payload: &str) {
        let mut expired = Vec::new();
        for (id, sub) in &self.subscriptions {
            if sub.source != source {
                continue;
            }
            self.fire_agent(sub, payload);
            if sub.once {
                expired.push(*id);
            }
        }
        if expired.is_empty() {
            return;
        }
        for id in &expired {
            self.subscriptions.remove(id);
        }
        self.save();
    }

    /// Sends `payload` to the subscription's target agent through the daemon
    /// event channel, tagging the sender as `event:<source>`.
    ///
    /// Delivery is fire-and-forget: the receiving half of the reply channel
    /// is dropped right away, so nothing sent on `reply` is ever read here.
    /// A failed hand-off to the daemon channel is logged, not propagated.
    fn fire_agent(&self, sub: &EventSubscription, payload: &str) {
        tracing::info!(
            "event bus: firing agent='{}' for source='{}'",
            sub.target_agent,
            sub.source,
        );
        let (reply_tx, _) = tokio::sync::mpsc::channel(1);
        let msg = ClientMessage::from(SendMsg {
            agent: sub.target_agent.clone(),
            content: payload.to_owned(),
            sender: Some(format!("event:{}", sub.source)),
            cwd: None,
            guest: None,
            tool_choice: None,
        });
        let delivery = self.event_tx.send(DaemonEvent::Message {
            msg,
            reply: reply_tx,
        });
        if delivery.is_err() {
            tracing::warn!(
                "event bus: failed to dispatch event to agent='{}' for source='{}'",
                sub.target_agent,
                sub.source,
            );
        }
    }

    /// Persists all subscriptions (ordered by id) to `events_path`.
    ///
    /// The content is written to a sibling `*.toml.tmp` file first and then
    /// renamed over the target so the target is never left half-written.
    /// Failures are logged and otherwise ignored.
    fn save(&self) {
        let file = EventFile {
            subscription: self.list(),
        };
        let content = match toml::to_string_pretty(&file) {
            Ok(c) => c,
            Err(e) => {
                tracing::error!("failed to serialize subscriptions: {e}");
                return;
            }
        };
        let tmp = self.events_path.with_extension("toml.tmp");
        if let Err(e) = std::fs::write(&tmp, &content) {
            tracing::error!("failed to write {}: {e}", tmp.display());
            return;
        }
        if let Err(e) = std::fs::rename(&tmp, &self.events_path) {
            tracing::error!(
                "failed to rename {} -> {}: {e}",
                tmp.display(),
                self.events_path.display()
            );
        }
    }
}