use crate::daemon::Daemon;
use crabllm_core::Provider;
use futures_util::{StreamExt, pin_mut};
use runtime::host::Host;
use tokio::sync::{mpsc, oneshot};
use wcore::{
AgentConfig, ToolRequest,
protocol::{
api::Server,
message::{ClientMessage, ServerMessage},
},
};
/// Events consumed by the daemon's event loop (`Daemon::handle_events`).
///
/// Each variant describes one unit of work; the notes on the variants reflect
/// how the event loop in this file handles them.
pub enum DaemonEvent {
/// A client message to dispatch. Every `ServerMessage` produced by the
/// dispatch is sent back on `reply`.
Message {
/// The incoming client message.
msg: ClientMessage,
/// Channel on which the resulting server messages are sent.
reply: mpsc::Sender<ServerMessage>,
},
/// A tool invocation request. It is dispatched through the runtime's hook
/// and the result is sent on the request's own `reply` field.
ToolCall(ToolRequest),
/// Publish `payload` from `source` on the daemon's `events` object.
PublishEvent {
/// Identifier of the event source, passed through to `publish`.
source: String,
/// Event payload, passed through to `publish`.
payload: String,
},
/// Add an ephemeral agent to the runtime using `config`.
AddEphemeral {
/// Configuration handed to the runtime's `add_ephemeral`.
config: AgentConfig,
/// Signalled with `()` once `add_ephemeral` has completed.
reply: oneshot::Sender<()>,
},
/// Remove the ephemeral agent called `name` from the runtime.
RemoveEphemeral { name: String },
/// Stop the event loop.
Shutdown,
}
/// Unbounded sender half of the channel carrying [`DaemonEvent`]s; the
/// matching `mpsc::UnboundedReceiver<DaemonEvent>` is what
/// `Daemon::handle_events` consumes.
pub type DaemonEventSender = mpsc::UnboundedSender<DaemonEvent>;
impl<P: Provider + 'static, H: Host + 'static> Daemon<P, H> {
    /// Runs the daemon's event loop.
    ///
    /// Events are received from `rx` one at a time. Client messages and tool
    /// calls are handed to spawned tasks, while publish and ephemeral-agent
    /// events are completed inline before the next event is received.
    ///
    /// The loop ends when a [`DaemonEvent::Shutdown`] is received or when
    /// `rx.recv()` yields `None`.
    pub(crate) async fn handle_events(&self, mut rx: mpsc::UnboundedReceiver<DaemonEvent>) {
        tracing::info!("event loop started");
        loop {
            match rx.recv().await {
                // `recv` returned `None`: nothing more will arrive.
                None => break,
                Some(DaemonEvent::Shutdown) => {
                    tracing::info!("event loop shutting down");
                    break;
                }
                Some(DaemonEvent::Message { msg, reply }) => self.handle_message(msg, reply),
                Some(DaemonEvent::ToolCall(request)) => self.handle_tool_call(request),
                Some(DaemonEvent::PublishEvent { source, payload }) => {
                    self.events.lock().await.publish(&source, &payload);
                }
                Some(DaemonEvent::AddEphemeral { config, reply }) => {
                    // Clone the runtime value in its own statement so the
                    // temporary from `read().await` is dropped before the
                    // next await point.
                    let snapshot = self.runtime.read().await.clone();
                    snapshot.add_ephemeral(config).await;
                    // The requester may no longer be listening; a failed
                    // send is deliberately ignored.
                    let _ = reply.send(());
                }
                Some(DaemonEvent::RemoveEphemeral { name }) => {
                    let snapshot = self.runtime.read().await.clone();
                    snapshot.remove_ephemeral(&name).await;
                }
            }
        }
        tracing::info!("event loop stopped");
    }

    /// Spawns a task that dispatches `msg` on a clone of this daemon and
    /// forwards every item of the resulting stream to `reply`.
    ///
    /// Forwarding stops when the stream is exhausted or as soon as a send on
    /// `reply` fails.
    fn handle_message(&self, msg: ClientMessage, reply: mpsc::Sender<ServerMessage>) {
        let worker = self.clone();
        tokio::spawn(async move {
            let responses = worker.dispatch(msg);
            pin_mut!(responses);
            while let Some(outgoing) = responses.next().await {
                let delivered = reply.send(outgoing).await.is_ok();
                if !delivered {
                    break;
                }
            }
        });
    }

    /// Spawns a task that routes `req` through the runtime hook's
    /// `dispatch_tool` and sends the result on `req.reply`.
    ///
    /// A failed send on `req.reply` is ignored.
    fn handle_tool_call(&self, req: ToolRequest) {
        let shared_runtime = self.runtime.clone();
        tokio::spawn(async move {
            tracing::debug!(tool = %req.name, agent = %req.agent, "tool dispatch");
            // Take the snapshot in a separate statement so the temporary from
            // `read().await` is not kept alive while the tool runs.
            let snapshot = shared_runtime.read().await.clone();
            let dispatch = snapshot.hook.dispatch_tool(
                &req.name,
                &req.args,
                &req.agent,
                &req.sender,
                req.conversation_id,
            );
            let outcome = dispatch.await;
            let _ = req.reply.send(outcome);
        });
    }
}