// walrus_daemon/daemon/mod.rs
//! Daemon — the core struct composing runtime, transports, and lifecycle.
//!
//! [`Daemon`] owns the runtime and shared state. [`DaemonHandle`] owns the
//! spawned tasks and provides graceful shutdown. Transport setup is
//! decomposed into private helpers called from [`Daemon::start`].

use crate::{
    DaemonConfig,
    daemon::event::{DaemonEvent, DaemonEventSender},
    hook::DaemonHook,
    service::ServiceManager,
};
use ::socket::server::accept_loop;
use anyhow::Result;
use model::ProviderManager;
use std::{
    collections::BTreeMap,
    path::{Path, PathBuf},
    sync::Arc,
};
use tokio::sync::{RwLock, broadcast, mpsc, oneshot};
use wcore::{AgentConfig, Runtime};
24
pub(crate) mod builder;
pub mod event;
mod protocol;
28
/// Shared daemon state — holds the runtime. Cheap to clone (`Arc`-backed).
///
/// The runtime is stored behind `Arc<RwLock<Arc<Runtime>>>` so that
/// [`Daemon::reload`] can swap it atomically while in-flight requests that
/// already cloned the inner `Arc` complete normally.
#[derive(Clone)]
pub struct Daemon {
    /// The walrus runtime, swappable via [`Daemon::reload`].
    pub runtime: Arc<RwLock<Arc<Runtime<ProviderManager, DaemonHook>>>>,
    /// Config directory — stored so [`Daemon::reload`] can re-read config from disk.
    pub(crate) config_dir: PathBuf,
    /// Sender for the daemon event loop — cloned into `Runtime` as `ToolSender`
    /// so agents can dispatch tool calls. Stored here so [`Daemon::reload`] can
    /// pass a fresh clone into the rebuilt runtime.
    pub(crate) event_tx: DaemonEventSender,
    /// Per-agent configurations (name → config).
    pub(crate) agents_config: BTreeMap<String, AgentConfig>,
}
47
48impl Daemon {
49    /// Load config, build runtime, and start the event loop.
50    ///
51    /// Returns a [`DaemonHandle`] with the event sender exposed. The caller
52    /// spawns transports (socket, channels) using the handle's `event_tx`
53    /// and `shutdown_tx`, then integrates its own channels by cloning
54    /// `event_tx` and sending [`DaemonEvent::Message`] variants.
55    pub async fn start(config_dir: &Path) -> Result<DaemonHandle> {
56        let config_path = config_dir.join("walrus.toml");
57        let config = DaemonConfig::load(&config_path)?;
58        tracing::info!("loaded configuration from {}", config_path.display());
59
60        let (event_tx, event_rx) = mpsc::unbounded_channel::<DaemonEvent>();
61        let (daemon, service_manager) =
62            Daemon::build(&config, config_dir, event_tx.clone()).await?;
63
64        // Broadcast shutdown — all subsystems subscribe.
65        let (shutdown_tx, _) = broadcast::channel::<()>(1);
66        let shutdown_event_tx = event_tx.clone();
67        let mut shutdown_rx = shutdown_tx.subscribe();
68        tokio::spawn(async move {
69            let _ = shutdown_rx.recv().await;
70            let _ = shutdown_event_tx.send(DaemonEvent::Shutdown);
71        });
72
73        // Per-agent heartbeat timers — only agents with interval > 0 run.
74        for (name, agent) in &config.agents {
75            if agent.heartbeat.interval == 0 {
76                continue;
77            }
78            let agent_name = compact_str::CompactString::from(name.as_str());
79            let heartbeat_tx = event_tx.clone();
80            let mut heartbeat_shutdown = shutdown_tx.subscribe();
81            let interval_secs = agent.heartbeat.interval * 60;
82            tokio::spawn(async move {
83                let mut tick = tokio::time::interval(std::time::Duration::from_secs(interval_secs));
84                tick.tick().await; // skip the immediate first tick
85                loop {
86                    tokio::select! {
87                        _ = tick.tick() => {
88                            let event = DaemonEvent::Heartbeat {
89                                agent: agent_name.clone(),
90                            };
91                            if heartbeat_tx.send(event).is_err() {
92                                break;
93                            }
94                        }
95                        _ = heartbeat_shutdown.recv() => break,
96                    }
97                }
98            });
99            tracing::info!(
100                "heartbeat timer started for '{}' (interval: {}m)",
101                name,
102                agent.heartbeat.interval,
103            );
104        }
105
106        let d = daemon.clone();
107        let event_loop_join = tokio::spawn(async move {
108            d.handle_events(event_rx).await;
109        });
110
111        Ok(DaemonHandle {
112            config,
113            event_tx,
114            shutdown_tx,
115            daemon,
116            event_loop_join: Some(event_loop_join),
117            service_manager,
118        })
119    }
120}
121
/// Handle returned by [`Daemon::start`] — holds the event sender and shutdown trigger.
///
/// The caller spawns transports (socket, TCP) using [`setup_socket`] and
/// [`setup_tcp`], passing clones of `event_tx` and `shutdown_tx`.
pub struct DaemonHandle {
    /// The loaded daemon configuration.
    pub config: DaemonConfig,
    /// Sender for injecting events into the daemon event loop.
    /// Clone this and pass to transport setup functions.
    pub event_tx: DaemonEventSender,
    /// Broadcast shutdown — call `.subscribe()` for transport shutdown,
    /// or use [`DaemonHandle::shutdown`] to trigger.
    pub shutdown_tx: broadcast::Sender<()>,
    /// Kept alive so the shared state outlives the handle; not read directly.
    #[allow(unused)]
    daemon: Daemon,
    /// Join handle for the event loop task; taken (set to `None`) by `shutdown`.
    event_loop_join: Option<tokio::task::JoinHandle<()>>,
    /// Managed child services — shutdown on daemon stop.
    service_manager: Option<ServiceManager>,
}
141
142impl DaemonHandle {
143    /// Wait until the active model provider is ready.
144    ///
145    /// No-op for remote providers. Kept for API compatibility.
146    pub async fn wait_until_ready(&self) -> Result<()> {
147        Ok(())
148    }
149
150    /// Trigger graceful shutdown and wait for the event loop to stop.
151    ///
152    /// Transport tasks (socket, channels) are the caller's responsibility.
153    pub async fn shutdown(mut self) -> Result<()> {
154        // Shutdown managed services before the event loop.
155        if let Some(ref mut sm) = self.service_manager {
156            sm.shutdown_all().await;
157        }
158        let _ = self.shutdown_tx.send(());
159        if let Some(join) = self.event_loop_join.take() {
160            join.await?;
161        }
162        Ok(())
163    }
164}
165
// ── Transport setup helpers ──────────────────────────────────────────
167
168/// Bind the Unix domain socket and spawn the accept loop.
169pub fn setup_socket(
170    shutdown_tx: &broadcast::Sender<()>,
171    event_tx: &DaemonEventSender,
172) -> Result<(&'static Path, tokio::task::JoinHandle<()>)> {
173    let resolved_path: &'static Path = &wcore::paths::SOCKET_PATH;
174    if let Some(parent) = resolved_path.parent() {
175        std::fs::create_dir_all(parent)?;
176    }
177    if resolved_path.exists() {
178        std::fs::remove_file(resolved_path)?;
179    }
180
181    let listener = tokio::net::UnixListener::bind(resolved_path)?;
182    tracing::info!("daemon listening on {}", resolved_path.display());
183
184    let socket_shutdown = bridge_shutdown(shutdown_tx.subscribe());
185    let socket_tx = event_tx.clone();
186    let join = tokio::spawn(accept_loop(
187        listener,
188        move |msg, reply| {
189            let _ = socket_tx.send(DaemonEvent::Message { msg, reply });
190        },
191        socket_shutdown,
192    ));
193
194    Ok((resolved_path, join))
195}
196
197/// Bind a TCP listener and spawn the accept loop.
198///
199/// Tries the default port (6688), falls back to an OS-assigned port.
200/// Returns the join handle and the actual port bound.
201pub fn setup_tcp(
202    shutdown_tx: &broadcast::Sender<()>,
203    event_tx: &DaemonEventSender,
204) -> Result<(tokio::task::JoinHandle<()>, u16)> {
205    let (std_listener, addr) = tcp::server::bind()?;
206    let listener = tokio::net::TcpListener::from_std(std_listener)?;
207    tracing::info!("daemon listening on tcp://{addr}");
208
209    let tcp_shutdown = bridge_shutdown(shutdown_tx.subscribe());
210    let tcp_tx = event_tx.clone();
211    let join = tokio::spawn(tcp::server::accept_loop(
212        listener,
213        move |msg, reply| {
214            let _ = tcp_tx.send(DaemonEvent::Message { msg, reply });
215        },
216        tcp_shutdown,
217    ));
218
219    Ok((join, addr.port()))
220}
221
222/// Bridge a broadcast receiver into a oneshot receiver.
223pub fn bridge_shutdown(mut rx: broadcast::Receiver<()>) -> oneshot::Receiver<()> {
224    let (otx, orx) = oneshot::channel();
225    tokio::spawn(async move {
226        let _ = rx.recv().await;
227        let _ = otx.send(());
228    });
229    orx
230}