Skip to main content

walrus_daemon/daemon/
mod.rs

1//! Daemon — the core struct composing runtime, transports, and lifecycle.
2//!
3//! [`Daemon`] owns the runtime and shared state. [`DaemonHandle`] owns the
4//! spawned tasks and provides graceful shutdown. Transport setup is
5//! decomposed into private helpers called from [`Daemon::start`].
6
7use crate::{
8    DaemonConfig,
9    daemon::event::{DaemonEvent, DaemonEventSender},
10    hook::DaemonHook,
11    service::ServiceManager,
12};
13use ::transport::uds::accept_loop;
14use anyhow::Result;
15use model::ProviderRegistry;
16use std::{
17    path::{Path, PathBuf},
18    sync::Arc,
19};
20use tokio::sync::{RwLock, broadcast, mpsc, oneshot};
21use wcore::Runtime;
22
23pub(crate) mod builder;
24pub mod event;
25mod protocol;
26
27/// Shared daemon state — holds the runtime. Cheap to clone (`Arc`-backed).
28///
29/// The runtime is stored behind `Arc<RwLock<Arc<Runtime>>>` so that
30/// [`Daemon::reload`] can swap it atomically while in-flight requests that
31/// already cloned the inner `Arc` complete normally.
32#[derive(Clone)]
33pub struct Daemon {
34    /// The walrus runtime, swappable via [`Daemon::reload`].
35    pub runtime: Arc<RwLock<Arc<Runtime<ProviderRegistry, DaemonHook>>>>,
36    /// Config directory — stored so [`Daemon::reload`] can re-read config from disk.
37    pub(crate) config_dir: PathBuf,
38    /// Sender for the daemon event loop — cloned into `Runtime` as `ToolSender`
39    /// so agents can dispatch tool calls. Stored here so [`Daemon::reload`] can
40    /// pass a fresh clone into the rebuilt runtime.
41    pub(crate) event_tx: DaemonEventSender,
42}
43
44impl Daemon {
45    /// Load config, build runtime, and start the event loop.
46    ///
47    /// Returns a [`DaemonHandle`] with the event sender exposed. The caller
48    /// spawns transports (socket, channels) using the handle's `event_tx`
49    /// and `shutdown_tx`, then integrates its own channels by cloning
50    /// `event_tx` and sending [`DaemonEvent::Message`] variants.
51    pub async fn start(config_dir: &Path) -> Result<DaemonHandle> {
52        let config_path = config_dir.join("walrus.toml");
53        let config = DaemonConfig::load(&config_path)?;
54        tracing::info!("loaded configuration from {}", config_path.display());
55
56        let (event_tx, event_rx) = mpsc::unbounded_channel::<DaemonEvent>();
57        let (daemon, service_manager) =
58            Daemon::build(&config, config_dir, event_tx.clone()).await?;
59
60        // Broadcast shutdown — all subsystems subscribe.
61        let (shutdown_tx, _) = broadcast::channel::<()>(1);
62        let shutdown_event_tx = event_tx.clone();
63        let mut shutdown_rx = shutdown_tx.subscribe();
64        tokio::spawn(async move {
65            let _ = shutdown_rx.recv().await;
66            let _ = shutdown_event_tx.send(DaemonEvent::Shutdown);
67        });
68
69        // Per-agent heartbeat timers — only agents with interval > 0 run.
70        for (name, agent) in &config.agents {
71            if agent.heartbeat.interval == 0 {
72                continue;
73            }
74            let agent_name = name.clone();
75            let heartbeat_tx = event_tx.clone();
76            let mut heartbeat_shutdown = shutdown_tx.subscribe();
77            let interval_secs = agent.heartbeat.interval * 60;
78            tokio::spawn(async move {
79                let mut tick = tokio::time::interval(std::time::Duration::from_secs(interval_secs));
80                tick.tick().await; // skip the immediate first tick
81                loop {
82                    tokio::select! {
83                        _ = tick.tick() => {
84                            let event = DaemonEvent::Heartbeat {
85                                agent: agent_name.clone(),
86                            };
87                            if heartbeat_tx.send(event).is_err() {
88                                break;
89                            }
90                        }
91                        _ = heartbeat_shutdown.recv() => break,
92                    }
93                }
94            });
95            tracing::info!(
96                "heartbeat timer started for '{}' (interval: {}m)",
97                name,
98                agent.heartbeat.interval,
99            );
100        }
101
102        let d = daemon.clone();
103        let event_loop_join = tokio::spawn(async move {
104            d.handle_events(event_rx).await;
105        });
106
107        Ok(DaemonHandle {
108            config,
109            event_tx,
110            shutdown_tx,
111            daemon,
112            event_loop_join: Some(event_loop_join),
113            service_manager,
114        })
115    }
116}
117
118/// Handle returned by [`Daemon::start`] — holds the event sender and shutdown trigger.
119///
120/// The caller spawns transports (socket, TCP) using [`setup_socket`] and
121/// [`setup_tcp`], passing clones of `event_tx` and `shutdown_tx`.
122pub struct DaemonHandle {
123    /// The loaded daemon configuration.
124    pub config: DaemonConfig,
125    /// Sender for injecting events into the daemon event loop.
126    /// Clone this and pass to transport setup functions.
127    pub event_tx: DaemonEventSender,
128    /// Broadcast shutdown — call `.subscribe()` for transport shutdown,
129    /// or use [`DaemonHandle::shutdown`] to trigger.
130    pub shutdown_tx: broadcast::Sender<()>,
131    #[allow(unused)]
132    daemon: Daemon,
133    event_loop_join: Option<tokio::task::JoinHandle<()>>,
134    /// Managed child services — shutdown on daemon stop.
135    service_manager: Option<ServiceManager>,
136}
137
138impl DaemonHandle {
139    /// Wait until the active model provider is ready.
140    ///
141    /// No-op for remote providers. Kept for API compatibility.
142    pub async fn wait_until_ready(&self) -> Result<()> {
143        Ok(())
144    }
145
146    /// Trigger graceful shutdown and wait for the event loop to stop.
147    ///
148    /// Transport tasks (socket, channels) are the caller's responsibility.
149    pub async fn shutdown(mut self) -> Result<()> {
150        // Shutdown managed services before the event loop.
151        if let Some(ref mut sm) = self.service_manager {
152            sm.shutdown_all().await;
153        }
154        let _ = self.shutdown_tx.send(());
155        if let Some(join) = self.event_loop_join.take() {
156            join.await?;
157        }
158        Ok(())
159    }
160}
161
162// ── Transport setup helpers ──────────────────────────────────────────
163
164/// Bind the Unix domain socket and spawn the accept loop.
165pub fn setup_socket(
166    shutdown_tx: &broadcast::Sender<()>,
167    event_tx: &DaemonEventSender,
168) -> Result<(&'static Path, tokio::task::JoinHandle<()>)> {
169    let resolved_path: &'static Path = &wcore::paths::SOCKET_PATH;
170    if let Some(parent) = resolved_path.parent() {
171        std::fs::create_dir_all(parent)?;
172    }
173    if resolved_path.exists() {
174        std::fs::remove_file(resolved_path)?;
175    }
176
177    let listener = tokio::net::UnixListener::bind(resolved_path)?;
178    tracing::info!("daemon listening on {}", resolved_path.display());
179
180    let socket_shutdown = bridge_shutdown(shutdown_tx.subscribe());
181    let socket_tx = event_tx.clone();
182    let join = tokio::spawn(accept_loop(
183        listener,
184        move |msg, reply| {
185            let _ = socket_tx.send(DaemonEvent::Message { msg, reply });
186        },
187        socket_shutdown,
188    ));
189
190    Ok((resolved_path, join))
191}
192
193/// Bind a TCP listener and spawn the accept loop.
194///
195/// Tries the default port (6688), falls back to an OS-assigned port.
196/// Returns the join handle and the actual port bound.
197pub fn setup_tcp(
198    shutdown_tx: &broadcast::Sender<()>,
199    event_tx: &DaemonEventSender,
200) -> Result<(tokio::task::JoinHandle<()>, u16)> {
201    let (std_listener, addr) = transport::tcp::bind()?;
202    let listener = tokio::net::TcpListener::from_std(std_listener)?;
203    tracing::info!("daemon listening on tcp://{addr}");
204
205    let tcp_shutdown = bridge_shutdown(shutdown_tx.subscribe());
206    let tcp_tx = event_tx.clone();
207    let join = tokio::spawn(transport::tcp::accept_loop(
208        listener,
209        move |msg, reply| {
210            let _ = tcp_tx.send(DaemonEvent::Message { msg, reply });
211        },
212        tcp_shutdown,
213    ));
214
215    Ok((join, addr.port()))
216}
217
218/// Bridge a broadcast receiver into a oneshot receiver.
219pub fn bridge_shutdown(mut rx: broadcast::Receiver<()>) -> oneshot::Receiver<()> {
220    let (otx, orx) = oneshot::channel();
221    tokio::spawn(async move {
222        let _ = rx.recv().await;
223        let _ = otx.send(());
224    });
225    orx
226}