Skip to main content

walrus_daemon/daemon/
event.rs

1//! Daemon event types and dispatch.
2//!
3//! All inbound stimuli (socket, channel, tool calls) are represented as
4//! [`DaemonEvent`] variants sent through a single `mpsc::unbounded_channel`.
5//! The [`Daemon`] processes them via [`handle_events`](Daemon::handle_events).
6//!
7//! Tool call routing is fully delegated to [`DaemonHook::dispatch_tool`] —
8//! no tool name matching happens here.
9
10use crate::daemon::Daemon;
11use compact_str::CompactString;
12use futures_util::{StreamExt, pin_mut};
13use tokio::sync::mpsc;
14use wcore::{
15    ToolRequest,
16    protocol::{
17        api::Server,
18        message::{ClientMessage, ServerMessage},
19    },
20};
21
22/// Inbound event from any source, processed by the central event loop.
23pub enum DaemonEvent {
24    /// A client message from any source (socket, telegram, discord).
25    /// Reply channel streams `ServerMessage`s back to the caller.
26    Message {
27        /// The parsed client message.
28        msg: ClientMessage,
29        /// Per-request reply channel for streaming `ServerMessage`s back.
30        reply: mpsc::UnboundedSender<ServerMessage>,
31    },
32    /// A tool call from an agent, routed through `DaemonHook::dispatch_tool`.
33    ToolCall(ToolRequest),
34    /// Periodic heartbeat tick for a specific agent.
35    Heartbeat { agent: CompactString },
36    /// Graceful shutdown request.
37    Shutdown,
38}
39
40/// Shorthand for the event sender half of the daemon event channel.
41pub type DaemonEventSender = mpsc::UnboundedSender<DaemonEvent>;
42
43// ── Event dispatch ───────────────────────────────────────────────────
44
45impl Daemon {
46    /// Process events until [`DaemonEvent::Shutdown`] is received.
47    ///
48    /// Spawns a task for each event to avoid blocking on LLM calls.
49    pub(crate) async fn handle_events(&self, mut rx: mpsc::UnboundedReceiver<DaemonEvent>) {
50        tracing::info!("event loop started");
51        while let Some(event) = rx.recv().await {
52            match event {
53                DaemonEvent::Message { msg, reply } => self.handle_message(msg, reply),
54                DaemonEvent::ToolCall(req) => self.handle_tool_call(req),
55                DaemonEvent::Heartbeat { agent } => self.handle_heartbeat(agent),
56                DaemonEvent::Shutdown => {
57                    tracing::info!("event loop shutting down");
58                    break;
59                }
60            }
61        }
62        tracing::info!("event loop stopped");
63    }
64
65    /// Dispatch a client message through the Server trait and stream replies.
66    fn handle_message(&self, msg: ClientMessage, reply: mpsc::UnboundedSender<ServerMessage>) {
67        let daemon = self.clone();
68        tokio::spawn(async move {
69            let stream = daemon.dispatch(msg);
70            pin_mut!(stream);
71            while let Some(server_msg) = stream.next().await {
72                if reply.send(server_msg).is_err() {
73                    break;
74                }
75            }
76        });
77    }
78
79    /// Handle a heartbeat tick for a specific agent: promote queued tasks.
80    fn handle_heartbeat(&self, _agent: CompactString) {
81        let daemon = self.clone();
82        tokio::spawn(async move {
83            let rt = daemon.runtime.read().await.clone();
84            let tasks_arc = rt.hook.tasks.clone();
85            let mut reg = tasks_arc.lock().await;
86            crate::hook::system::task::tool::try_promote(
87                &mut reg,
88                tasks_arc.clone(),
89                rt.hook.event_tx.clone(),
90                rt.hook.task_timeout,
91            );
92        });
93    }
94
95    /// Route a tool call through `DaemonHook::dispatch_tool`.
96    fn handle_tool_call(&self, req: ToolRequest) {
97        let runtime = self.runtime.clone();
98        tokio::spawn(async move {
99            tracing::debug!(tool = %req.name, agent = %req.agent, "tool dispatch");
100            let rt = runtime.read().await.clone();
101            let result = rt
102                .hook
103                .dispatch_tool(&req.name, &req.args, &req.agent, req.task_id)
104                .await;
105            let _ = req.reply.send(result);
106        });
107    }
108}