Skip to main content

walrus_daemon/daemon/
mod.rs

1//! Daemon — the core struct composing runtime, transports, and lifecycle.
2//!
3//! [`Daemon`] owns the runtime and shared state. [`DaemonHandle`] owns the
4//! spawned tasks and provides graceful shutdown. Transport setup is
5//! decomposed into private helpers called from [`Daemon::start`].
6
7use crate::{
8    DaemonConfig,
9    daemon::event::{DaemonEvent, DaemonEventSender},
10    hook::DaemonHook,
11};
12use ::socket::server::accept_loop;
13use anyhow::Result;
14use compact_str::CompactString;
15use model::ProviderManager;
16use std::{
17    path::{Path, PathBuf},
18    sync::Arc,
19};
20use tokio::sync::{broadcast, mpsc, oneshot};
21use wcore::Runtime;
22
23pub(crate) mod builder;
24pub(crate) mod event;
25mod protocol;
26
27/// Shared daemon state — holds the runtime. Cheap to clone (`Arc`-backed).
28#[derive(Clone)]
29pub struct Daemon {
30    /// The walrus runtime.
31    pub runtime: Arc<Runtime<ProviderManager, DaemonHook>>,
32}
33
34/// Handle returned by [`Daemon::start`] — holds the socket path and shutdown trigger.
35pub struct DaemonHandle {
36    /// The Unix domain socket path the daemon is listening on.
37    pub socket_path: PathBuf,
38    shutdown_tx: Option<broadcast::Sender<()>>,
39    socket_join: Option<tokio::task::JoinHandle<()>>,
40    event_loop_join: Option<tokio::task::JoinHandle<()>>,
41}
42
43impl DaemonHandle {
44    /// Trigger graceful shutdown and wait for all subsystems to stop.
45    pub async fn shutdown(mut self) -> Result<()> {
46        if let Some(tx) = self.shutdown_tx.take() {
47            let _ = tx.send(());
48        }
49        if let Some(join) = self.socket_join.take() {
50            join.await?;
51        }
52        if let Some(join) = self.event_loop_join.take() {
53            join.await?;
54        }
55        let _ = std::fs::remove_file(&self.socket_path);
56        Ok(())
57    }
58}
59
60impl Daemon {
61    /// Load config, build runtime, bind the Unix domain socket, and start serving.
62    ///
63    /// Returns a [`DaemonHandle`] with the socket path and a shutdown trigger.
64    pub async fn start(config_dir: &Path) -> Result<DaemonHandle> {
65        let config_path = config_dir.join("walrus.toml");
66        let config = DaemonConfig::load(&config_path)?;
67        tracing::info!("loaded configuration from {}", config_path.display());
68        Self::start_with_config(&config, config_dir).await
69    }
70
71    /// Start with an already-loaded config. Useful when the caller resolves
72    /// config separately (e.g. CLI with scaffold logic).
73    pub async fn start_with_config(
74        config: &DaemonConfig,
75        config_dir: &Path,
76    ) -> Result<DaemonHandle> {
77        let (event_tx, event_rx) = mpsc::unbounded_channel::<DaemonEvent>();
78        let runtime = builder::Builder::new(config, config_dir).build().await?;
79        let runtime = Arc::new(runtime);
80        let daemon = Daemon {
81            runtime: Arc::clone(&runtime),
82        };
83
84        // Broadcast shutdown — all subsystems subscribe.
85        let (shutdown_tx, _) = broadcast::channel::<()>(1);
86
87        // Bridge broadcast shutdown into the event loop.
88        let shutdown_event_tx = event_tx.clone();
89        let mut shutdown_rx = shutdown_tx.subscribe();
90        tokio::spawn(async move {
91            let _ = shutdown_rx.recv().await;
92            let _ = shutdown_event_tx.send(DaemonEvent::Shutdown);
93        });
94
95        let (socket_path, socket_join) = setup_socket(&shutdown_tx, &event_tx)?;
96        setup_channels(config, &event_tx).await;
97
98        let d = daemon.clone();
99        let event_loop_join = tokio::spawn(async move {
100            d.handle_events(event_rx).await;
101        });
102
103        Ok(DaemonHandle {
104            socket_path,
105            shutdown_tx: Some(shutdown_tx),
106            socket_join: Some(socket_join),
107            event_loop_join: Some(event_loop_join),
108        })
109    }
110}
111
112// ── Transport setup helpers ──────────────────────────────────────────
113
114/// Bind the Unix domain socket and spawn the accept loop.
115fn setup_socket(
116    shutdown_tx: &broadcast::Sender<()>,
117    event_tx: &DaemonEventSender,
118) -> Result<(PathBuf, tokio::task::JoinHandle<()>)> {
119    let resolved_path = crate::config::socket_path();
120    if let Some(parent) = resolved_path.parent() {
121        std::fs::create_dir_all(parent)?;
122    }
123    if resolved_path.exists() {
124        std::fs::remove_file(&resolved_path)?;
125    }
126
127    let listener = tokio::net::UnixListener::bind(&resolved_path)?;
128    tracing::info!("daemon listening on {}", resolved_path.display());
129
130    let socket_shutdown = bridge_shutdown(shutdown_tx.subscribe());
131    let socket_tx = event_tx.clone();
132    let join = tokio::spawn(accept_loop(
133        listener,
134        move |msg, reply| {
135            let _ = socket_tx.send(DaemonEvent::Socket { msg, reply });
136        },
137        socket_shutdown,
138    ));
139
140    Ok((resolved_path, join))
141}
142
143/// Build the channel router and spawn channel transports.
144async fn setup_channels(config: &DaemonConfig, event_tx: &DaemonEventSender) {
145    let channels = config.channels.values().cloned().collect::<Vec<_>>();
146    let router = channel::build_router(&channels);
147    let router = Arc::new(router);
148    let channel_tx = event_tx.clone();
149    let on_message = Arc::new(move |agent: CompactString, content: String| {
150        let tx = channel_tx.clone();
151        async move {
152            let (reply_tx, reply_rx) = oneshot::channel();
153            let event = DaemonEvent::Channel {
154                agent,
155                content,
156                reply: reply_tx,
157            };
158            if tx.send(event).is_err() {
159                return Err("event loop closed".to_owned());
160            }
161            reply_rx
162                .await
163                .unwrap_or(Err("event loop dropped".to_owned()))
164        }
165    });
166    channel::spawn_channels(&channels, router, on_message).await;
167}
168
169/// Bridge a broadcast receiver into a oneshot receiver.
170fn bridge_shutdown(mut rx: broadcast::Receiver<()>) -> oneshot::Receiver<()> {
171    let (otx, orx) = oneshot::channel();
172    tokio::spawn(async move {
173        let _ = rx.recv().await;
174        let _ = otx.send(());
175    });
176    orx
177}