Skip to main content

walrus_daemon/daemon/
event.rs

1//! Daemon event types and dispatch.
2//!
3//! All inbound stimuli (socket, channel, tool calls) are represented as
4//! [`DaemonEvent`] variants sent through a single `mpsc::unbounded_channel`.
5//! The [`Daemon`] processes them via [`handle_events`](Daemon::handle_events).
6//!
7//! Tool call routing is fully delegated to [`DaemonHook::dispatch_tool`] —
8//! no tool name matching happens here.
9
10use crate::daemon::Daemon;
11use futures_util::{StreamExt, pin_mut};
12use tokio::sync::mpsc;
13use wcore::{
14    ToolRequest,
15    protocol::{
16        api::Server,
17        message::{client::ClientMessage, server::ServerMessage},
18    },
19};
20
/// Inbound event from any source, processed by the central event loop.
///
/// Values of this type are received by `Daemon::handle_events`, which matches
/// on the variant and hands the payload to the corresponding handler.
pub enum DaemonEvent {
    /// A client message from any source (socket, telegram, discord).
    /// Reply channel streams `ServerMessage`s back to the caller.
    ///
    /// The event loop spawns a task that dispatches `msg` and forwards each
    /// resulting `ServerMessage` to `reply`; forwarding stops as soon as a
    /// send on `reply` fails.
    Message {
        /// The parsed client message.
        msg: ClientMessage,
        /// Per-request reply channel for streaming `ServerMessage`s back.
        reply: mpsc::UnboundedSender<ServerMessage>,
    },
    /// A tool call from an agent, routed through `DaemonHook::dispatch_tool`.
    ///
    /// The event loop spawns a task that calls `dispatch_tool` with the
    /// request's `name` and `args` and sends the outcome on the request's
    /// own `reply` field.
    ToolCall(ToolRequest),
    /// Graceful shutdown request.
    ///
    /// Makes the event loop break out of its receive loop; events still
    /// queued behind this one are not processed.
    Shutdown,
}
36
/// Shorthand for the event sender half of the daemon event channel.
///
/// This is an unbounded `tokio::sync::mpsc` sender carrying [`DaemonEvent`]
/// values.
pub type DaemonEventSender = mpsc::UnboundedSender<DaemonEvent>;
39
40// ── Event dispatch ───────────────────────────────────────────────────
41
42impl Daemon {
43    /// Process events until [`DaemonEvent::Shutdown`] is received.
44    ///
45    /// Spawns a task for each event to avoid blocking on LLM calls.
46    pub(crate) async fn handle_events(&self, mut rx: mpsc::UnboundedReceiver<DaemonEvent>) {
47        tracing::info!("event loop started");
48        while let Some(event) = rx.recv().await {
49            match event {
50                DaemonEvent::Message { msg, reply } => self.handle_message(msg, reply),
51                DaemonEvent::ToolCall(req) => self.handle_tool_call(req),
52                DaemonEvent::Shutdown => {
53                    tracing::info!("event loop shutting down");
54                    break;
55                }
56            }
57        }
58        tracing::info!("event loop stopped");
59    }
60
61    /// Dispatch a client message through the Server trait and stream replies.
62    fn handle_message(&self, msg: ClientMessage, reply: mpsc::UnboundedSender<ServerMessage>) {
63        let daemon = self.clone();
64        tokio::spawn(async move {
65            let stream = daemon.dispatch(msg);
66            pin_mut!(stream);
67            while let Some(server_msg) = stream.next().await {
68                if reply.send(server_msg).is_err() {
69                    break;
70                }
71            }
72        });
73    }
74
75    /// Route a tool call through `DaemonHook::dispatch_tool`.
76    fn handle_tool_call(&self, req: ToolRequest) {
77        let runtime = self.runtime.clone();
78        tokio::spawn(async move {
79            tracing::debug!(tool = %req.name, "tool dispatch");
80            let rt = runtime.read().await.clone();
81            let result = rt.hook.dispatch_tool(&req.name, &req.args).await;
82            let _ = req.reply.send(result);
83        });
84    }
85}