use crate::daemon::Daemon;
use futures_util::{StreamExt, pin_mut};
use runtime::host::Host;
use tokio::sync::mpsc;
use wcore::{
ToolRequest,
protocol::{
api::Server,
message::{ClientMessage, ServerMessage},
},
};
/// Events consumed by the daemon event loop (`Daemon::handle_events`).
pub enum DaemonEvent {
/// A client message to dispatch; each `ServerMessage` produced for it is
/// sent back on `reply`.
Message {
/// The client message handed to the daemon's `dispatch`.
msg: ClientMessage,
/// Channel on which the resulting server messages are delivered.
reply: mpsc::Sender<ServerMessage>,
},
/// A tool invocation, executed through the runtime hook's `dispatch_tool`.
ToolCall(ToolRequest),
/// Asks the event loop to stop; the loop breaks as soon as it sees this.
Shutdown,
}
/// Sending half of an unbounded tokio mpsc channel carrying [`DaemonEvent`]s.
///
/// The matching `UnboundedReceiver<DaemonEvent>` is the type that
/// `Daemon::handle_events` drains.
pub type DaemonEventSender = mpsc::UnboundedSender<DaemonEvent>;
impl<H: Host + 'static> Daemon<H> {
    /// Runs the daemon event loop, routing each [`DaemonEvent`] read from `rx`
    /// to its handler.
    ///
    /// The loop finishes when `rx.recv()` yields `None` or when a
    /// [`DaemonEvent::Shutdown`] arrives. The handlers only spawn tasks and
    /// return, so the loop does not wait for a message or tool call to finish
    /// before reading the next event.
    pub(crate) async fn handle_events(&self, mut rx: mpsc::UnboundedReceiver<DaemonEvent>) {
        tracing::info!("event loop started");
        loop {
            let Some(event) = rx.recv().await else {
                break;
            };
            match event {
                DaemonEvent::Shutdown => {
                    // Break immediately: events still queued behind the
                    // shutdown request are not processed.
                    tracing::info!("event loop shutting down");
                    break;
                }
                DaemonEvent::ToolCall(req) => self.handle_tool_call(req),
                DaemonEvent::Message { msg, reply } => self.handle_message(msg, reply),
            }
        }
        tracing::info!("event loop stopped");
    }

    /// Spawns a task that feeds `msg` to `dispatch` and relays every
    /// [`ServerMessage`] the resulting stream yields to `reply`.
    ///
    /// Relaying ends when the stream is exhausted or as soon as a send on
    /// `reply` fails.
    fn handle_message(&self, msg: ClientMessage, reply: mpsc::Sender<ServerMessage>) {
        // The spawned task owns its own clone of the daemon.
        let this = self.clone();
        tokio::spawn(async move {
            let responses = this.dispatch(msg);
            pin_mut!(responses);
            loop {
                let Some(outgoing) = responses.next().await else {
                    break;
                };
                let delivered = reply.send(outgoing).await.is_ok();
                if !delivered {
                    // Nothing can be delivered any more; stop polling the stream.
                    break;
                }
            }
        });
    }

    /// Spawns a task that executes the tool call described by `req` through
    /// the runtime hook and sends the result on `req.reply`.
    fn handle_tool_call(&self, req: ToolRequest) {
        let shared_runtime = self.runtime.clone();
        let task = async move {
            tracing::debug!(tool = %req.name, agent = %req.agent, "tool dispatch");
            // Clone the current runtime value; the guard returned by `read()`
            // is a temporary of this statement, so it is not held while the
            // tool runs.
            let rt = shared_runtime.read().await.clone();
            let outcome = rt
                .hook
                .dispatch_tool(
                    &req.name,
                    &req.args,
                    &req.agent,
                    &req.sender,
                    req.session_id,
                )
                .await;
            // The value returned by `send` is deliberately discarded.
            let _ = req.reply.send(outcome);
        };
        tokio::spawn(task);
    }
}