// walrus_daemon/daemon/mod.rs
1//! Daemon — the core struct composing runtime, transports, and lifecycle.
2//!
3//! [`Daemon`] owns the runtime and shared state. [`DaemonHandle`] owns the
4//! spawned tasks and provides graceful shutdown. Transport setup is
5//! decomposed into private helpers called from [`Daemon::start`].
6
7use crate::{
8    DaemonConfig,
9    daemon::event::{DaemonEvent, DaemonEventSender},
10    hook::DaemonHook,
11};
12use ::socket::server::accept_loop;
13use anyhow::Result;
14use model::ProviderManager;
15use std::{
16    collections::BTreeMap,
17    path::{Path, PathBuf},
18    sync::Arc,
19};
20use tokio::sync::{RwLock, broadcast, mpsc, oneshot};
21use wcore::AgentConfig;
22use wcore::Runtime;
23use wcore::protocol::message::client::ClientMessage;
24
25pub(crate) mod builder;
26pub mod event;
27mod protocol;
28
/// Shared daemon state — holds the runtime. Cheap to clone (`Arc`-backed).
///
/// The runtime is stored behind `Arc<RwLock<Arc<Runtime>>>` so that
/// [`Daemon::reload`] can swap it atomically while in-flight requests that
/// already cloned the inner `Arc` complete normally.
// NOTE(review): `Daemon::reload` and `handle_events` are referenced but not
// visible in this chunk — presumably defined in sibling modules (`builder`,
// `protocol`); confirm when reviewing the full crate.
#[derive(Clone)]
pub struct Daemon {
    /// The walrus runtime, swappable via [`Daemon::reload`].
    /// Readers clone the inner `Arc` and release the lock immediately.
    pub runtime: Arc<RwLock<Arc<Runtime<ProviderManager, DaemonHook>>>>,
    /// Config directory — stored so [`Daemon::reload`] can re-read config from disk.
    pub(crate) config_dir: PathBuf,
    /// Sender for the daemon event loop — cloned into `Runtime` as `ToolSender`
    /// so agents can dispatch tool calls. Stored here so [`Daemon::reload`] can
    /// pass a fresh clone into the rebuilt runtime.
    pub(crate) event_tx: DaemonEventSender,
    /// Per-agent configurations (name → config). Deep-cloned with the struct;
    /// assumed small (one entry per configured agent).
    pub(crate) agents_config: BTreeMap<String, AgentConfig>,
}
47
48impl Daemon {
49    /// Load config, build runtime, and start the event loop.
50    ///
51    /// Returns a [`DaemonHandle`] with the event sender exposed. The caller
52    /// spawns transports (socket, channels) using the handle's `event_tx`
53    /// and `shutdown_tx`, then integrates its own channels by cloning
54    /// `event_tx` and sending [`DaemonEvent::Message`] variants.
55    pub async fn start(config_dir: &Path) -> Result<DaemonHandle> {
56        let config_path = config_dir.join("walrus.toml");
57        let config = DaemonConfig::load(&config_path)?;
58        tracing::info!("loaded configuration from {}", config_path.display());
59
60        let (event_tx, event_rx) = mpsc::unbounded_channel::<DaemonEvent>();
61        let daemon = Daemon::build(&config, config_dir, event_tx.clone()).await?;
62
63        // Broadcast shutdown — all subsystems subscribe.
64        let (shutdown_tx, _) = broadcast::channel::<()>(1);
65        let shutdown_event_tx = event_tx.clone();
66        let mut shutdown_rx = shutdown_tx.subscribe();
67        tokio::spawn(async move {
68            let _ = shutdown_rx.recv().await;
69            let _ = shutdown_event_tx.send(DaemonEvent::Shutdown);
70        });
71
72        // Per-agent heartbeat timers — only agents with interval > 0 run.
73        for (name, agent) in &config.agents {
74            if agent.heartbeat.interval == 0 {
75                continue;
76            }
77            let agent_name = compact_str::CompactString::from(name.as_str());
78            let heartbeat_tx = event_tx.clone();
79            let mut heartbeat_shutdown = shutdown_tx.subscribe();
80            let interval_secs = agent.heartbeat.interval * 60;
81            tokio::spawn(async move {
82                let mut tick = tokio::time::interval(std::time::Duration::from_secs(interval_secs));
83                tick.tick().await; // skip the immediate first tick
84                loop {
85                    tokio::select! {
86                        _ = tick.tick() => {
87                            let event = DaemonEvent::Heartbeat {
88                                agent: agent_name.clone(),
89                            };
90                            if heartbeat_tx.send(event).is_err() {
91                                break;
92                            }
93                        }
94                        _ = heartbeat_shutdown.recv() => break,
95                    }
96                }
97            });
98            tracing::info!(
99                "heartbeat timer started for '{}' (interval: {}m)",
100                name,
101                agent.heartbeat.interval,
102            );
103        }
104
105        let d = daemon.clone();
106        let event_loop_join = tokio::spawn(async move {
107            d.handle_events(event_rx).await;
108        });
109
110        Ok(DaemonHandle {
111            config,
112            event_tx,
113            shutdown_tx,
114            daemon,
115            event_loop_join: Some(event_loop_join),
116        })
117    }
118}
119
/// Handle returned by [`Daemon::start`] — holds the event sender and shutdown trigger.
///
/// The caller spawns transports (socket, channels) using [`setup_socket`] and
/// [`setup_channels`], passing clones of `event_tx` and `shutdown_tx`.
pub struct DaemonHandle {
    /// The loaded daemon configuration.
    pub config: DaemonConfig,
    /// Sender for injecting events into the daemon event loop.
    /// Clone this and pass to transport setup functions.
    pub event_tx: DaemonEventSender,
    /// Broadcast shutdown — call `.subscribe()` for transport shutdown,
    /// or use [`DaemonHandle::shutdown`] to trigger.
    pub shutdown_tx: broadcast::Sender<()>,
    /// Shared daemon state — used by [`DaemonHandle::wait_until_ready`] to
    /// reach the runtime's model provider.
    daemon: Daemon,
    /// Join handle for the event-loop task; taken (set to `None`) by
    /// [`DaemonHandle::shutdown`] so the loop is awaited exactly once.
    event_loop_join: Option<tokio::task::JoinHandle<()>>,
}
136
137impl DaemonHandle {
138    /// Wait until the active model provider is ready.
139    ///
140    /// Call this after socket setup so transports are available while the
141    /// model loads. No-op for remote providers.
142    pub async fn wait_until_ready(&self) -> Result<()> {
143        let rt = self.daemon.runtime.read().await;
144        rt.model.wait_until_ready().await
145    }
146
147    /// Trigger graceful shutdown and wait for the event loop to stop.
148    ///
149    /// Transport tasks (socket, channels) are the caller's responsibility.
150    pub async fn shutdown(mut self) -> Result<()> {
151        let _ = self.shutdown_tx.send(());
152        if let Some(join) = self.event_loop_join.take() {
153            join.await?;
154        }
155        Ok(())
156    }
157}
158
159// ── Transport setup helpers ──────────────────────────────────────────
160
161/// Bind the Unix domain socket and spawn the accept loop.
162pub fn setup_socket(
163    shutdown_tx: &broadcast::Sender<()>,
164    event_tx: &DaemonEventSender,
165) -> Result<(&'static Path, tokio::task::JoinHandle<()>)> {
166    let resolved_path: &'static Path = &wcore::paths::SOCKET_PATH;
167    if let Some(parent) = resolved_path.parent() {
168        std::fs::create_dir_all(parent)?;
169    }
170    if resolved_path.exists() {
171        std::fs::remove_file(resolved_path)?;
172    }
173
174    let listener = tokio::net::UnixListener::bind(resolved_path)?;
175    tracing::info!("daemon listening on {}", resolved_path.display());
176
177    let socket_shutdown = bridge_shutdown(shutdown_tx.subscribe());
178    let socket_tx = event_tx.clone();
179    let join = tokio::spawn(accept_loop(
180        listener,
181        move |msg, reply| {
182            let _ = socket_tx.send(DaemonEvent::Message { msg, reply });
183        },
184        socket_shutdown,
185    ));
186
187    Ok((resolved_path, join))
188}
189
190/// Spawn channel transports.
191pub async fn setup_channels(config: &DaemonConfig, event_tx: &DaemonEventSender) {
192    let tx = event_tx.clone();
193    let on_message = Arc::new(move |msg: ClientMessage| {
194        let tx = tx.clone();
195        async move {
196            let (reply_tx, reply_rx) = mpsc::unbounded_channel();
197            let _ = tx.send(DaemonEvent::Message {
198                msg,
199                reply: reply_tx,
200            });
201            reply_rx
202        }
203    });
204
205    // Use the first configured agent name as the default, falling back to "assistant".
206    let agents_dir = wcore::paths::CONFIG_DIR.join(wcore::paths::AGENTS_DIR);
207    let default_agent = crate::config::load_agents_dir(&agents_dir)
208        .ok()
209        .and_then(|agents| agents.into_iter().next())
210        .map(|(stem, _)| compact_str::CompactString::from(stem))
211        .unwrap_or_else(|| compact_str::CompactString::from("assistant"));
212    channel::spawn_channels(&config.channel, default_agent, on_message).await;
213}
214
215/// Bind a TCP listener and spawn the accept loop.
216///
217/// Tries the default port (6688), falls back to an OS-assigned port.
218/// Returns the join handle and the actual port bound.
219pub fn setup_tcp(
220    shutdown_tx: &broadcast::Sender<()>,
221    event_tx: &DaemonEventSender,
222) -> Result<(tokio::task::JoinHandle<()>, u16)> {
223    let (std_listener, addr) = tcp::server::bind()?;
224    let listener = tokio::net::TcpListener::from_std(std_listener)?;
225    tracing::info!("daemon listening on tcp://{addr}");
226
227    let tcp_shutdown = bridge_shutdown(shutdown_tx.subscribe());
228    let tcp_tx = event_tx.clone();
229    let join = tokio::spawn(tcp::server::accept_loop(
230        listener,
231        move |msg, reply| {
232            let _ = tcp_tx.send(DaemonEvent::Message { msg, reply });
233        },
234        tcp_shutdown,
235    ));
236
237    Ok((join, addr.port()))
238}
239
240/// Bridge a broadcast receiver into a oneshot receiver.
241pub fn bridge_shutdown(mut rx: broadcast::Receiver<()>) -> oneshot::Receiver<()> {
242    let (otx, orx) = oneshot::channel();
243    tokio::spawn(async move {
244        let _ = rx.recv().await;
245        let _ = otx.send(());
246    });
247    orx
248}