walrus_daemon/daemon/
event.rs1use crate::daemon::Daemon;
11use compact_str::CompactString;
12use futures_util::{StreamExt, pin_mut};
13use tokio::sync::mpsc;
14use wcore::{
15 ToolRequest,
16 protocol::{
17 api::Server,
18 message::{client::ClientMessage, server::ServerMessage},
19 },
20};
21
22pub enum DaemonEvent {
24 Message {
27 msg: ClientMessage,
29 reply: mpsc::UnboundedSender<ServerMessage>,
31 },
32 ToolCall(ToolRequest),
34 Heartbeat { agent: CompactString },
36 Shutdown,
38}
39
40pub type DaemonEventSender = mpsc::UnboundedSender<DaemonEvent>;
42
43impl Daemon {
46 pub(crate) async fn handle_events(&self, mut rx: mpsc::UnboundedReceiver<DaemonEvent>) {
50 tracing::info!("event loop started");
51 while let Some(event) = rx.recv().await {
52 match event {
53 DaemonEvent::Message { msg, reply } => self.handle_message(msg, reply),
54 DaemonEvent::ToolCall(req) => self.handle_tool_call(req),
55 DaemonEvent::Heartbeat { agent } => self.handle_heartbeat(agent),
56 DaemonEvent::Shutdown => {
57 tracing::info!("event loop shutting down");
58 break;
59 }
60 }
61 }
62 tracing::info!("event loop stopped");
63 }
64
65 fn handle_message(&self, msg: ClientMessage, reply: mpsc::UnboundedSender<ServerMessage>) {
67 let daemon = self.clone();
68 tokio::spawn(async move {
69 let stream = daemon.dispatch(msg);
70 pin_mut!(stream);
71 while let Some(server_msg) = stream.next().await {
72 if reply.send(server_msg).is_err() {
73 break;
74 }
75 }
76 });
77 }
78
79 fn handle_heartbeat(&self, agent: CompactString) {
82 let daemon = self.clone();
83 tokio::spawn(async move {
84 tracing::debug!(agent = %agent, "heartbeat tick");
85 let rt = daemon.runtime.read().await.clone();
86 let tasks_arc = rt.hook.tasks.clone();
87
88 let task_entries = {
90 let registry = tasks_arc.lock().await;
91 registry.queued_create_tasks_for(&agent)
92 };
93
94 if !task_entries.is_empty() {
95 let task_context: String = task_entries
96 .iter()
97 .map(|(id, desc)| format!("- Task #{id}: {desc}"))
98 .collect::<Vec<_>>()
99 .join("\n");
100
101 let prompt = daemon
102 .agents_config
103 .get(agent.as_str())
104 .map(|a| a.heartbeat.prompt.as_str())
105 .unwrap_or("");
106 let content = if prompt.is_empty() {
107 format!("You have pending tasks:\n{task_context}")
108 } else {
109 format!("{prompt}\n\nPending tasks:\n{task_context}")
110 };
111
112 {
114 let mut registry = tasks_arc.lock().await;
115 for (id, _) in &task_entries {
116 registry.set_status(*id, crate::hook::task::TaskStatus::InProgress);
117 }
118 }
119
120 let msg = ClientMessage::Send {
121 agent: agent.clone(),
122 content,
123 session: None,
124 sender: None,
125 };
126 let (reply_tx, _reply_rx) = mpsc::unbounded_channel();
127 let _ = daemon.event_tx.send(DaemonEvent::Message {
128 msg,
129 reply: reply_tx,
130 });
131 }
132
133 {
135 let reg = tasks_arc.clone();
136 tasks_arc.lock().await.promote_next(reg);
137 }
138 });
139 }
140
141 fn handle_tool_call(&self, req: ToolRequest) {
143 let runtime = self.runtime.clone();
144 tokio::spawn(async move {
145 tracing::debug!(tool = %req.name, agent = %req.agent, "tool dispatch");
146 let rt = runtime.read().await.clone();
147 let result = rt
148 .hook
149 .dispatch_tool(&req.name, &req.args, &req.agent, req.task_id, &req.sender)
150 .await;
151 let _ = req.reply.send(result);
152 });
153 }
154}