walrus_daemon/daemon/event.rs
1//! Daemon event types and dispatch.
2//!
3//! All inbound stimuli (socket, channel, tool calls) are represented as
4//! [`DaemonEvent`] variants sent through a single `mpsc::unbounded_channel`.
5//! The [`Daemon`] processes them via [`handle_events`](Daemon::handle_events).
6//!
7//! Tool call routing is fully delegated to [`DaemonHook::dispatch_tool`] —
8//! no tool name matching happens here.
9
10use crate::daemon::Daemon;
11use futures_util::{StreamExt, pin_mut};
12use tokio::sync::mpsc;
13use wcore::{
14 ToolRequest,
15 protocol::{
16 api::Server,
17 message::{client::ClientMessage, server::ServerMessage},
18 },
19};
20
21/// Inbound event from any source, processed by the central event loop.
22pub enum DaemonEvent {
23 /// A client message from any source (socket, telegram, discord).
24 /// Reply channel streams `ServerMessage`s back to the caller.
25 Message {
26 /// The parsed client message.
27 msg: ClientMessage,
28 /// Per-request reply channel for streaming `ServerMessage`s back.
29 reply: mpsc::UnboundedSender<ServerMessage>,
30 },
31 /// A tool call from an agent, routed through `DaemonHook::dispatch_tool`.
32 ToolCall(ToolRequest),
33 /// Graceful shutdown request.
34 Shutdown,
35}
36
37/// Shorthand for the event sender half of the daemon event channel.
38pub type DaemonEventSender = mpsc::UnboundedSender<DaemonEvent>;
39
40// ── Event dispatch ───────────────────────────────────────────────────
41
42impl Daemon {
43 /// Process events until [`DaemonEvent::Shutdown`] is received.
44 ///
45 /// Spawns a task for each event to avoid blocking on LLM calls.
46 pub(crate) async fn handle_events(&self, mut rx: mpsc::UnboundedReceiver<DaemonEvent>) {
47 tracing::info!("event loop started");
48 while let Some(event) = rx.recv().await {
49 match event {
50 DaemonEvent::Message { msg, reply } => self.handle_message(msg, reply),
51 DaemonEvent::ToolCall(req) => self.handle_tool_call(req),
52 DaemonEvent::Shutdown => {
53 tracing::info!("event loop shutting down");
54 break;
55 }
56 }
57 }
58 tracing::info!("event loop stopped");
59 }
60
61 /// Dispatch a client message through the Server trait and stream replies.
62 fn handle_message(&self, msg: ClientMessage, reply: mpsc::UnboundedSender<ServerMessage>) {
63 let daemon = self.clone();
64 tokio::spawn(async move {
65 let stream = daemon.dispatch(msg);
66 pin_mut!(stream);
67 while let Some(server_msg) = stream.next().await {
68 if reply.send(server_msg).is_err() {
69 break;
70 }
71 }
72 });
73 }
74
75 /// Route a tool call through `DaemonHook::dispatch_tool`.
76 fn handle_tool_call(&self, req: ToolRequest) {
77 let runtime = self.runtime.clone();
78 tokio::spawn(async move {
79 tracing::debug!(tool = %req.name, "tool dispatch");
80 let rt = runtime.read().await.clone();
81 let result = rt.hook.dispatch_tool(&req.name, &req.args).await;
82 let _ = req.reply.send(result);
83 });
84 }
85}