walrus_daemon/daemon/event.rs
1//! Daemon event types and dispatch.
2//!
3//! All inbound stimuli (socket, channel, tool calls) are represented as
4//! [`DaemonEvent`] variants sent through a single `mpsc::unbounded_channel`.
5//! The [`Daemon`] processes them via [`handle_events`](Daemon::handle_events).
6//!
7//! Tool call routing is fully delegated to [`DaemonHook::dispatch_tool`] —
8//! no tool name matching happens here.
9
10use crate::daemon::Daemon;
11use futures_util::{StreamExt, pin_mut};
12use tokio::sync::mpsc;
13use wcore::{
14 ToolRequest,
15 protocol::{
16 api::Server,
17 message::{ClientMessage, ServerMessage},
18 },
19};
20
21/// Inbound event from any source, processed by the central event loop.
22pub enum DaemonEvent {
23 /// A client message from any source (socket, telegram).
24 /// Reply channel streams `ServerMessage`s back to the caller.
25 Message {
26 /// The parsed client message.
27 msg: ClientMessage,
28 /// Per-request reply channel for streaming `ServerMessage`s back.
29 reply: mpsc::UnboundedSender<ServerMessage>,
30 },
31 /// A tool call from an agent, routed through `DaemonHook::dispatch_tool`.
32 ToolCall(ToolRequest),
33 /// Periodic heartbeat tick for a specific agent.
34 Heartbeat { agent: String },
35 /// Graceful shutdown request.
36 Shutdown,
37}
38
39/// Shorthand for the event sender half of the daemon event channel.
40pub type DaemonEventSender = mpsc::UnboundedSender<DaemonEvent>;
41
42// ── Event dispatch ───────────────────────────────────────────────────
43
44impl Daemon {
45 /// Process events until [`DaemonEvent::Shutdown`] is received.
46 ///
47 /// Spawns a task for each event to avoid blocking on LLM calls.
48 pub(crate) async fn handle_events(&self, mut rx: mpsc::UnboundedReceiver<DaemonEvent>) {
49 tracing::info!("event loop started");
50 while let Some(event) = rx.recv().await {
51 match event {
52 DaemonEvent::Message { msg, reply } => self.handle_message(msg, reply),
53 DaemonEvent::ToolCall(req) => self.handle_tool_call(req),
54 DaemonEvent::Heartbeat { .. } => {} // No-op: no queue promotion needed.
55 DaemonEvent::Shutdown => {
56 tracing::info!("event loop shutting down");
57 break;
58 }
59 }
60 }
61 tracing::info!("event loop stopped");
62 }
63
64 /// Dispatch a client message through the Server trait and stream replies.
65 fn handle_message(&self, msg: ClientMessage, reply: mpsc::UnboundedSender<ServerMessage>) {
66 let daemon = self.clone();
67 tokio::spawn(async move {
68 let stream = daemon.dispatch(msg);
69 pin_mut!(stream);
70 while let Some(server_msg) = stream.next().await {
71 if reply.send(server_msg).is_err() {
72 break;
73 }
74 }
75 });
76 }
77
78 /// Route a tool call through `DaemonHook::dispatch_tool`.
79 fn handle_tool_call(&self, req: ToolRequest) {
80 let runtime = self.runtime.clone();
81 tokio::spawn(async move {
82 tracing::debug!(tool = %req.name, agent = %req.agent, "tool dispatch");
83 let rt = runtime.read().await.clone();
84 let result = rt
85 .hook
86 .dispatch_tool(&req.name, &req.args, &req.agent, &req.sender)
87 .await;
88 let _ = req.reply.send(result);
89 });
90 }
91}