Skip to main content

walrus_daemon/daemon/
mod.rs

1//! Daemon — the core struct composing runtime, transports, and lifecycle.
2//!
3//! [`Daemon`] owns the runtime and shared state. [`DaemonHandle`] owns the
4//! spawned tasks and provides graceful shutdown. Transport setup is
5//! decomposed into private helpers called from [`Daemon::start`].
6
7use crate::{
8    DaemonConfig,
9    config::{GLOBAL_CONFIG_DIR, scaffold_work_dir},
10    daemon::event::{DaemonEvent, DaemonEventSender},
11    hook::DaemonHook,
12};
13use ::socket::server::accept_loop;
14use anyhow::Result;
15use model::ProviderManager;
16use std::{
17    path::{Path, PathBuf},
18    sync::Arc,
19};
20use tokio::sync::{RwLock, broadcast, mpsc, oneshot};
21use wcore::Runtime;
22use wcore::protocol::message::client::ClientMessage;
23
24pub(crate) mod builder;
25pub mod event;
26mod protocol;
27
28/// Shared daemon state — holds the runtime. Cheap to clone (`Arc`-backed).
29///
30/// The runtime is stored behind `Arc<RwLock<Arc<Runtime>>>` so that
31/// [`Daemon::reload`] can swap it atomically while in-flight requests that
32/// already cloned the inner `Arc` complete normally.
33#[derive(Clone)]
34pub struct Daemon {
35    /// The walrus runtime, swappable via [`Daemon::reload`].
36    pub runtime: Arc<RwLock<Arc<Runtime<ProviderManager, DaemonHook>>>>,
37    /// Config directory — stored so [`Daemon::reload`] can re-read config from disk.
38    pub(crate) config_dir: PathBuf,
39    /// Sender for the daemon event loop — cloned into `Runtime` as `ToolSender`
40    /// so agents can dispatch tool calls. Stored here so [`Daemon::reload`] can
41    /// pass a fresh clone into the rebuilt runtime.
42    pub(crate) event_tx: DaemonEventSender,
43}
44
45impl Daemon {
46    /// Load config, build runtime, and start the event loop.
47    ///
48    /// Returns a [`DaemonHandle`] with the event sender exposed. The caller
49    /// spawns transports (socket, channels) using the handle's `event_tx`
50    /// and `shutdown_tx`, then integrates its own channels by cloning
51    /// `event_tx` and sending [`DaemonEvent::Message`] variants.
52    pub async fn start(config_dir: &Path) -> Result<DaemonHandle> {
53        let config_path = config_dir.join("walrus.toml");
54        let config = DaemonConfig::load(&config_path)?;
55        tracing::info!("loaded configuration from {}", config_path.display());
56
57        scaffold_work_dir(&GLOBAL_CONFIG_DIR, config.work_dir.as_deref())?;
58        let (event_tx, event_rx) = mpsc::unbounded_channel::<DaemonEvent>();
59        let daemon = Daemon::build(&config, config_dir, event_tx.clone()).await?;
60
61        // Broadcast shutdown — all subsystems subscribe.
62        let (shutdown_tx, _) = broadcast::channel::<()>(1);
63        let shutdown_event_tx = event_tx.clone();
64        let mut shutdown_rx = shutdown_tx.subscribe();
65        tokio::spawn(async move {
66            let _ = shutdown_rx.recv().await;
67            let _ = shutdown_event_tx.send(DaemonEvent::Shutdown);
68        });
69
70        let d = daemon.clone();
71        let event_loop_join = tokio::spawn(async move {
72            d.handle_events(event_rx).await;
73        });
74
75        Ok(DaemonHandle {
76            config,
77            event_tx,
78            shutdown_tx,
79            event_loop_join: Some(event_loop_join),
80        })
81    }
82}
83
84/// Handle returned by [`Daemon::start`] — holds the event sender and shutdown trigger.
85///
86/// The caller spawns transports (socket, channels) using [`setup_socket`] and
87/// [`setup_channels`], passing clones of `event_tx` and `shutdown_tx`.
88pub struct DaemonHandle {
89    /// The loaded daemon configuration.
90    pub config: DaemonConfig,
91    /// Sender for injecting events into the daemon event loop.
92    /// Clone this and pass to transport setup functions.
93    pub event_tx: DaemonEventSender,
94    /// Broadcast shutdown — call `.subscribe()` for transport shutdown,
95    /// or use [`DaemonHandle::shutdown`] to trigger.
96    pub shutdown_tx: broadcast::Sender<()>,
97    event_loop_join: Option<tokio::task::JoinHandle<()>>,
98}
99
100impl DaemonHandle {
101    /// Trigger graceful shutdown and wait for the event loop to stop.
102    ///
103    /// Transport tasks (socket, channels) are the caller's responsibility.
104    pub async fn shutdown(mut self) -> Result<()> {
105        let _ = self.shutdown_tx.send(());
106        if let Some(join) = self.event_loop_join.take() {
107            join.await?;
108        }
109        Ok(())
110    }
111}
112
113// ── Transport setup helpers ──────────────────────────────────────────
114
115/// Bind the Unix domain socket and spawn the accept loop.
116pub fn setup_socket(
117    shutdown_tx: &broadcast::Sender<()>,
118    event_tx: &DaemonEventSender,
119) -> Result<(&'static Path, tokio::task::JoinHandle<()>)> {
120    let resolved_path: &'static Path = &crate::config::SOCKET_PATH;
121    if let Some(parent) = resolved_path.parent() {
122        std::fs::create_dir_all(parent)?;
123    }
124    if resolved_path.exists() {
125        std::fs::remove_file(resolved_path)?;
126    }
127
128    let listener = tokio::net::UnixListener::bind(resolved_path)?;
129    tracing::info!("daemon listening on {}", resolved_path.display());
130
131    let socket_shutdown = bridge_shutdown(shutdown_tx.subscribe());
132    let socket_tx = event_tx.clone();
133    let join = tokio::spawn(accept_loop(
134        listener,
135        move |msg, reply| {
136            let _ = socket_tx.send(DaemonEvent::Message { msg, reply });
137        },
138        socket_shutdown,
139    ));
140
141    Ok((resolved_path, join))
142}
143
144/// Spawn channel transports.
145pub async fn setup_channels(config: &DaemonConfig, event_tx: &DaemonEventSender) {
146    let tx = event_tx.clone();
147    let on_message = Arc::new(move |msg: ClientMessage| {
148        let tx = tx.clone();
149        async move {
150            let (reply_tx, reply_rx) = mpsc::unbounded_channel();
151            let _ = tx.send(DaemonEvent::Message {
152                msg,
153                reply: reply_tx,
154            });
155            reply_rx
156        }
157    });
158
159    // Use the first configured agent name as the default, falling back to "assistant".
160    let agents_dir = crate::config::GLOBAL_CONFIG_DIR.join(crate::config::AGENTS_DIR);
161    let default_agent = crate::config::load_agents_dir(&agents_dir)
162        .ok()
163        .and_then(|agents| agents.into_iter().next())
164        .map(|a| a.name)
165        .unwrap_or_else(|| compact_str::CompactString::from("assistant"));
166    channel::spawn_channels(&config.channel, default_agent, on_message).await;
167}
168
169/// Bridge a broadcast receiver into a oneshot receiver.
170pub fn bridge_shutdown(mut rx: broadcast::Receiver<()>) -> oneshot::Receiver<()> {
171    let (otx, orx) = oneshot::channel();
172    tokio::spawn(async move {
173        let _ = rx.recv().await;
174        let _ = otx.send(());
175    });
176    orx
177}