Skip to main content

walrus_daemon/daemon/
event.rs

1//! Daemon event types and dispatch.
2//!
3//! All inbound stimuli (socket, channel, tool calls) are represented as
4//! [`DaemonEvent`] variants sent through a single `mpsc::unbounded_channel`.
5//! The [`Daemon`] processes them via [`handle_events`](Daemon::handle_events).
6//!
7//! Tool call routing is fully delegated to [`DaemonHook::dispatch_tool`] —
8//! no tool name matching happens here.
9
10use crate::daemon::Daemon;
11use futures_util::{StreamExt, pin_mut};
12use tokio::sync::mpsc;
13use wcore::{
14    ToolRequest,
15    protocol::{
16        api::Server,
17        message::{ClientMessage, ServerMessage},
18    },
19};
20
/// Inbound event from any source, processed by the central event loop.
///
/// Events travel over an unbounded `mpsc` channel and are consumed one at a
/// time by `Daemon::handle_events`.
pub enum DaemonEvent {
    /// A client message from any source (socket, telegram).
    /// Reply channel streams `ServerMessage`s back to the caller.
    ///
    /// The event loop dispatches the message on a spawned task and forwards
    /// each resulting `ServerMessage` to `reply` until the response stream
    /// ends or a send on `reply` fails.
    Message {
        /// The parsed client message.
        msg: ClientMessage,
        /// Per-request reply channel for streaming `ServerMessage`s back.
        reply: mpsc::UnboundedSender<ServerMessage>,
    },
    /// A tool call from an agent, routed through `DaemonHook::dispatch_tool`.
    ///
    /// The dispatch result is sent back through the reply handle carried by
    /// the request itself.
    ToolCall(ToolRequest),
    /// Periodic heartbeat tick for a specific agent.
    ///
    /// Currently accepted but ignored by the event loop.
    Heartbeat { agent: String },
    /// Graceful shutdown request.
    ///
    /// The event loop stops reading further events once it receives this.
    Shutdown,
}
38
/// Shorthand for the event sender half of the daemon event channel.
///
/// The matching `mpsc::UnboundedReceiver<DaemonEvent>` is what
/// `Daemon::handle_events` consumes.
pub type DaemonEventSender = mpsc::UnboundedSender<DaemonEvent>;
41
42// ── Event dispatch ───────────────────────────────────────────────────
43
44impl Daemon {
45    /// Process events until [`DaemonEvent::Shutdown`] is received.
46    ///
47    /// Spawns a task for each event to avoid blocking on LLM calls.
48    pub(crate) async fn handle_events(&self, mut rx: mpsc::UnboundedReceiver<DaemonEvent>) {
49        tracing::info!("event loop started");
50        while let Some(event) = rx.recv().await {
51            match event {
52                DaemonEvent::Message { msg, reply } => self.handle_message(msg, reply),
53                DaemonEvent::ToolCall(req) => self.handle_tool_call(req),
54                DaemonEvent::Heartbeat { .. } => {} // No-op: no queue promotion needed.
55                DaemonEvent::Shutdown => {
56                    tracing::info!("event loop shutting down");
57                    break;
58                }
59            }
60        }
61        tracing::info!("event loop stopped");
62    }
63
64    /// Dispatch a client message through the Server trait and stream replies.
65    fn handle_message(&self, msg: ClientMessage, reply: mpsc::UnboundedSender<ServerMessage>) {
66        let daemon = self.clone();
67        tokio::spawn(async move {
68            let stream = daemon.dispatch(msg);
69            pin_mut!(stream);
70            while let Some(server_msg) = stream.next().await {
71                if reply.send(server_msg).is_err() {
72                    break;
73                }
74            }
75        });
76    }
77
78    /// Route a tool call through `DaemonHook::dispatch_tool`.
79    fn handle_tool_call(&self, req: ToolRequest) {
80        let runtime = self.runtime.clone();
81        tokio::spawn(async move {
82            tracing::debug!(tool = %req.name, agent = %req.agent, "tool dispatch");
83            let rt = runtime.read().await.clone();
84            let result = rt
85                .hook
86                .dispatch_tool(&req.name, &req.args, &req.agent, &req.sender)
87                .await;
88            let _ = req.reply.send(result);
89        });
90    }
91}