Skip to main content

walrus_daemon/daemon/
event.rs

//! Daemon event types and dispatch.
//!
//! All inbound stimuli (socket, channel, tool calls) are represented as
//! [`DaemonEvent`] variants sent through a single `mpsc::unbounded_channel`.
//! The [`Daemon`] processes them via [`handle_events`](Daemon::handle_events).
//!
//! Tool call routing is fully delegated to [`DaemonHook::dispatch_tool`] —
//! no tool name matching happens here.
9
10use crate::daemon::Daemon;
11use compact_str::CompactString;
12use futures_util::{StreamExt, pin_mut};
13use tokio::sync::mpsc;
14use wcore::{
15    ToolRequest,
16    protocol::{
17        api::Server,
18        message::{client::ClientMessage, server::ServerMessage},
19    },
20};
21
22/// Inbound event from any source, processed by the central event loop.
23pub enum DaemonEvent {
24    /// A client message from any source (socket, telegram, discord).
25    /// Reply channel streams `ServerMessage`s back to the caller.
26    Message {
27        /// The parsed client message.
28        msg: ClientMessage,
29        /// Per-request reply channel for streaming `ServerMessage`s back.
30        reply: mpsc::UnboundedSender<ServerMessage>,
31    },
32    /// A tool call from an agent, routed through `DaemonHook::dispatch_tool`.
33    ToolCall(ToolRequest),
34    /// Periodic heartbeat tick for a specific agent.
35    Heartbeat { agent: CompactString },
36    /// Graceful shutdown request.
37    Shutdown,
38}
39
40/// Shorthand for the event sender half of the daemon event channel.
41pub type DaemonEventSender = mpsc::UnboundedSender<DaemonEvent>;
42
// ── Event dispatch ───────────────────────────────────────────────────
44
45impl Daemon {
46    /// Process events until [`DaemonEvent::Shutdown`] is received.
47    ///
48    /// Spawns a task for each event to avoid blocking on LLM calls.
49    pub(crate) async fn handle_events(&self, mut rx: mpsc::UnboundedReceiver<DaemonEvent>) {
50        tracing::info!("event loop started");
51        while let Some(event) = rx.recv().await {
52            match event {
53                DaemonEvent::Message { msg, reply } => self.handle_message(msg, reply),
54                DaemonEvent::ToolCall(req) => self.handle_tool_call(req),
55                DaemonEvent::Heartbeat { agent } => self.handle_heartbeat(agent),
56                DaemonEvent::Shutdown => {
57                    tracing::info!("event loop shutting down");
58                    break;
59                }
60            }
61        }
62        tracing::info!("event loop stopped");
63    }
64
65    /// Dispatch a client message through the Server trait and stream replies.
66    fn handle_message(&self, msg: ClientMessage, reply: mpsc::UnboundedSender<ServerMessage>) {
67        let daemon = self.clone();
68        tokio::spawn(async move {
69            let stream = daemon.dispatch(msg);
70            pin_mut!(stream);
71            while let Some(server_msg) = stream.next().await {
72                if reply.send(server_msg).is_err() {
73                    break;
74                }
75            }
76        });
77    }
78
79    /// Handle a heartbeat tick for a specific agent: deliver queued create_task
80    /// entries and promote spawn_task entries.
81    fn handle_heartbeat(&self, agent: CompactString) {
82        let daemon = self.clone();
83        tokio::spawn(async move {
84            tracing::debug!(agent = %agent, "heartbeat tick");
85            let rt = daemon.runtime.read().await.clone();
86            let tasks_arc = rt.hook.tasks.clone();
87
88            // Gather queued create_task entries for this agent.
89            let task_entries = {
90                let registry = tasks_arc.lock().await;
91                registry.queued_create_tasks_for(&agent)
92            };
93
94            if !task_entries.is_empty() {
95                let task_context: String = task_entries
96                    .iter()
97                    .map(|(id, desc)| format!("- Task #{id}: {desc}"))
98                    .collect::<Vec<_>>()
99                    .join("\n");
100
101                let prompt = daemon
102                    .agents_config
103                    .get(agent.as_str())
104                    .map(|a| a.heartbeat.prompt.as_str())
105                    .unwrap_or("");
106                let content = if prompt.is_empty() {
107                    format!("You have pending tasks:\n{task_context}")
108                } else {
109                    format!("{prompt}\n\nPending tasks:\n{task_context}")
110                };
111
112                // Mark tasks InProgress.
113                {
114                    let mut registry = tasks_arc.lock().await;
115                    for (id, _) in &task_entries {
116                        registry.set_status(*id, crate::hook::task::TaskStatus::InProgress);
117                    }
118                }
119
120                let msg = ClientMessage::Send {
121                    agent: agent.clone(),
122                    content,
123                    session: None,
124                    sender: None,
125                };
126                let (reply_tx, _reply_rx) = mpsc::unbounded_channel();
127                let _ = daemon.event_tx.send(DaemonEvent::Message {
128                    msg,
129                    reply: reply_tx,
130                });
131            }
132
133            // Promote queued spawn_task entries.
134            {
135                let reg = tasks_arc.clone();
136                tasks_arc.lock().await.promote_next(reg);
137            }
138        });
139    }
140
141    /// Route a tool call through `DaemonHook::dispatch_tool`.
142    fn handle_tool_call(&self, req: ToolRequest) {
143        let runtime = self.runtime.clone();
144        tokio::spawn(async move {
145            tracing::debug!(tool = %req.name, agent = %req.agent, "tool dispatch");
146            let rt = runtime.read().await.clone();
147            let result = rt
148                .hook
149                .dispatch_tool(&req.name, &req.args, &req.agent, req.task_id, &req.sender)
150                .await;
151            let _ = req.reply.send(result);
152        });
153    }
154}