walrus_daemon/daemon/
mod.rs1use crate::{
8 DaemonConfig,
9 daemon::event::{DaemonEvent, DaemonEventSender},
10 hook::DaemonHook,
11};
12use ::socket::server::accept_loop;
13use anyhow::Result;
14use model::ProviderManager;
15use std::{
16 collections::BTreeMap,
17 path::{Path, PathBuf},
18 sync::Arc,
19};
20use tokio::sync::{RwLock, broadcast, mpsc, oneshot};
21use wcore::AgentConfig;
22use wcore::Runtime;
23use wcore::protocol::message::client::ClientMessage;
24
25pub(crate) mod builder;
26pub mod event;
27mod protocol;
28
29#[derive(Clone)]
35pub struct Daemon {
36 pub runtime: Arc<RwLock<Arc<Runtime<ProviderManager, DaemonHook>>>>,
38 pub(crate) config_dir: PathBuf,
40 pub(crate) event_tx: DaemonEventSender,
44 pub(crate) agents_config: BTreeMap<String, AgentConfig>,
46}
47
48impl Daemon {
49 pub async fn start(config_dir: &Path) -> Result<DaemonHandle> {
56 let config_path = config_dir.join("walrus.toml");
57 let config = DaemonConfig::load(&config_path)?;
58 tracing::info!("loaded configuration from {}", config_path.display());
59
60 let (event_tx, event_rx) = mpsc::unbounded_channel::<DaemonEvent>();
61 let daemon = Daemon::build(&config, config_dir, event_tx.clone()).await?;
62
63 let (shutdown_tx, _) = broadcast::channel::<()>(1);
65 let shutdown_event_tx = event_tx.clone();
66 let mut shutdown_rx = shutdown_tx.subscribe();
67 tokio::spawn(async move {
68 let _ = shutdown_rx.recv().await;
69 let _ = shutdown_event_tx.send(DaemonEvent::Shutdown);
70 });
71
72 for (name, agent) in &config.agents {
74 if agent.heartbeat.interval == 0 {
75 continue;
76 }
77 let agent_name = compact_str::CompactString::from(name.as_str());
78 let heartbeat_tx = event_tx.clone();
79 let mut heartbeat_shutdown = shutdown_tx.subscribe();
80 let interval_secs = agent.heartbeat.interval * 60;
81 tokio::spawn(async move {
82 let mut tick = tokio::time::interval(std::time::Duration::from_secs(interval_secs));
83 tick.tick().await; loop {
85 tokio::select! {
86 _ = tick.tick() => {
87 let event = DaemonEvent::Heartbeat {
88 agent: agent_name.clone(),
89 };
90 if heartbeat_tx.send(event).is_err() {
91 break;
92 }
93 }
94 _ = heartbeat_shutdown.recv() => break,
95 }
96 }
97 });
98 tracing::info!(
99 "heartbeat timer started for '{}' (interval: {}m)",
100 name,
101 agent.heartbeat.interval,
102 );
103 }
104
105 let d = daemon.clone();
106 let event_loop_join = tokio::spawn(async move {
107 d.handle_events(event_rx).await;
108 });
109
110 Ok(DaemonHandle {
111 config,
112 event_tx,
113 shutdown_tx,
114 daemon,
115 event_loop_join: Some(event_loop_join),
116 })
117 }
118}
119
120pub struct DaemonHandle {
125 pub config: DaemonConfig,
127 pub event_tx: DaemonEventSender,
130 pub shutdown_tx: broadcast::Sender<()>,
133 daemon: Daemon,
134 event_loop_join: Option<tokio::task::JoinHandle<()>>,
135}
136
137impl DaemonHandle {
138 pub async fn wait_until_ready(&self) -> Result<()> {
143 let rt = self.daemon.runtime.read().await;
144 rt.model.wait_until_ready().await
145 }
146
147 pub async fn shutdown(mut self) -> Result<()> {
151 let _ = self.shutdown_tx.send(());
152 if let Some(join) = self.event_loop_join.take() {
153 join.await?;
154 }
155 Ok(())
156 }
157}
158
159pub fn setup_socket(
163 shutdown_tx: &broadcast::Sender<()>,
164 event_tx: &DaemonEventSender,
165) -> Result<(&'static Path, tokio::task::JoinHandle<()>)> {
166 let resolved_path: &'static Path = &wcore::paths::SOCKET_PATH;
167 if let Some(parent) = resolved_path.parent() {
168 std::fs::create_dir_all(parent)?;
169 }
170 if resolved_path.exists() {
171 std::fs::remove_file(resolved_path)?;
172 }
173
174 let listener = tokio::net::UnixListener::bind(resolved_path)?;
175 tracing::info!("daemon listening on {}", resolved_path.display());
176
177 let socket_shutdown = bridge_shutdown(shutdown_tx.subscribe());
178 let socket_tx = event_tx.clone();
179 let join = tokio::spawn(accept_loop(
180 listener,
181 move |msg, reply| {
182 let _ = socket_tx.send(DaemonEvent::Message { msg, reply });
183 },
184 socket_shutdown,
185 ));
186
187 Ok((resolved_path, join))
188}
189
190pub async fn setup_channels(config: &DaemonConfig, event_tx: &DaemonEventSender) {
192 let tx = event_tx.clone();
193 let on_message = Arc::new(move |msg: ClientMessage| {
194 let tx = tx.clone();
195 async move {
196 let (reply_tx, reply_rx) = mpsc::unbounded_channel();
197 let _ = tx.send(DaemonEvent::Message {
198 msg,
199 reply: reply_tx,
200 });
201 reply_rx
202 }
203 });
204
205 let agents_dir = wcore::paths::CONFIG_DIR.join(wcore::paths::AGENTS_DIR);
207 let default_agent = crate::config::load_agents_dir(&agents_dir)
208 .ok()
209 .and_then(|agents| agents.into_iter().next())
210 .map(|(stem, _)| compact_str::CompactString::from(stem))
211 .unwrap_or_else(|| compact_str::CompactString::from("assistant"));
212 channel::spawn_channels(&config.channel, default_agent, on_message).await;
213}
214
215pub fn bridge_shutdown(mut rx: broadcast::Receiver<()>) -> oneshot::Receiver<()> {
217 let (otx, orx) = oneshot::channel();
218 tokio::spawn(async move {
219 let _ = rx.recv().await;
220 let _ = otx.send(());
221 });
222 orx
223}