walrus_daemon/daemon/
event.rs1use crate::daemon::Daemon;
11use compact_str::CompactString;
12use futures_util::{StreamExt, pin_mut};
13use tokio::sync::mpsc;
14use wcore::{
15 ToolRequest,
16 protocol::{
17 api::Server,
18 message::{ClientMessage, ServerMessage},
19 },
20};
21
22pub enum DaemonEvent {
24 Message {
27 msg: ClientMessage,
29 reply: mpsc::UnboundedSender<ServerMessage>,
31 },
32 ToolCall(ToolRequest),
34 Heartbeat { agent: CompactString },
36 Shutdown,
38}
39
40pub type DaemonEventSender = mpsc::UnboundedSender<DaemonEvent>;
42
43impl Daemon {
46 pub(crate) async fn handle_events(&self, mut rx: mpsc::UnboundedReceiver<DaemonEvent>) {
50 tracing::info!("event loop started");
51 while let Some(event) = rx.recv().await {
52 match event {
53 DaemonEvent::Message { msg, reply } => self.handle_message(msg, reply),
54 DaemonEvent::ToolCall(req) => self.handle_tool_call(req),
55 DaemonEvent::Heartbeat { agent } => self.handle_heartbeat(agent),
56 DaemonEvent::Shutdown => {
57 tracing::info!("event loop shutting down");
58 break;
59 }
60 }
61 }
62 tracing::info!("event loop stopped");
63 }
64
65 fn handle_message(&self, msg: ClientMessage, reply: mpsc::UnboundedSender<ServerMessage>) {
67 let daemon = self.clone();
68 tokio::spawn(async move {
69 let stream = daemon.dispatch(msg);
70 pin_mut!(stream);
71 while let Some(server_msg) = stream.next().await {
72 if reply.send(server_msg).is_err() {
73 break;
74 }
75 }
76 });
77 }
78
79 fn handle_heartbeat(&self, _agent: CompactString) {
81 let daemon = self.clone();
82 tokio::spawn(async move {
83 let rt = daemon.runtime.read().await.clone();
84 let tasks_arc = rt.hook.tasks.clone();
85 let mut reg = tasks_arc.lock().await;
86 crate::hook::system::task::tool::try_promote(
87 &mut reg,
88 tasks_arc.clone(),
89 rt.hook.event_tx.clone(),
90 rt.hook.task_timeout,
91 );
92 });
93 }
94
95 fn handle_tool_call(&self, req: ToolRequest) {
97 let runtime = self.runtime.clone();
98 tokio::spawn(async move {
99 tracing::debug!(tool = %req.name, agent = %req.agent, "tool dispatch");
100 let rt = runtime.read().await.clone();
101 let result = rt
102 .hook
103 .dispatch_tool(&req.name, &req.args, &req.agent, req.task_id)
104 .await;
105 let _ = req.reply.send(result);
106 });
107 }
108}