Skip to main content

walrus_daemon/daemon/
mod.rs

//! Daemon — the core struct composing runtime, transports, and lifecycle.
//!
//! [`Daemon`] owns the runtime and shared state. [`DaemonHandle`] owns the
//! spawned tasks and provides graceful shutdown. Transport setup is
//! decomposed into private helpers called from [`Daemon::start_with_config`].
6
7use crate::{
8    DaemonConfig,
9    daemon::event::{DaemonEvent, DaemonEventSender},
10    hook::DaemonHook,
11};
12use anyhow::Result;
13use compact_str::CompactString;
14use model::ProviderManager;
15use runtime::Runtime;
16use std::{
17    path::{Path, PathBuf},
18    sync::Arc,
19};
20use tokio::sync::{broadcast, mpsc, oneshot};
21
22pub(crate) mod builder;
23pub(crate) mod event;
24mod protocol;
25
26/// Shared daemon state — holds the runtime. Cheap to clone (`Arc`-backed).
27#[derive(Clone)]
28pub struct Daemon {
29    /// The walrus runtime.
30    pub runtime: Arc<Runtime<ProviderManager, DaemonHook>>,
31}
32
33/// Handle returned by [`Daemon::start`] — holds the socket path and shutdown trigger.
34pub struct DaemonHandle {
35    /// The Unix domain socket path the daemon is listening on.
36    pub socket_path: PathBuf,
37    shutdown_tx: Option<broadcast::Sender<()>>,
38    socket_join: Option<tokio::task::JoinHandle<()>>,
39    event_loop_join: Option<tokio::task::JoinHandle<()>>,
40}
41
42impl DaemonHandle {
43    /// Trigger graceful shutdown and wait for all subsystems to stop.
44    pub async fn shutdown(mut self) -> Result<()> {
45        if let Some(tx) = self.shutdown_tx.take() {
46            let _ = tx.send(());
47        }
48        if let Some(join) = self.socket_join.take() {
49            join.await?;
50        }
51        if let Some(join) = self.event_loop_join.take() {
52            join.await?;
53        }
54        let _ = std::fs::remove_file(&self.socket_path);
55        Ok(())
56    }
57}
58
59impl Daemon {
60    /// Load config, build runtime, bind the Unix domain socket, and start serving.
61    ///
62    /// Returns a [`DaemonHandle`] with the socket path and a shutdown trigger.
63    pub async fn start(config_dir: &Path) -> Result<DaemonHandle> {
64        let config_path = config_dir.join("walrus.toml");
65        let config = DaemonConfig::load(&config_path)?;
66        tracing::info!("loaded configuration from {}", config_path.display());
67        Self::start_with_config(&config, config_dir).await
68    }
69
70    /// Start with an already-loaded config. Useful when the caller resolves
71    /// config separately (e.g. CLI with scaffold logic).
72    pub async fn start_with_config(
73        config: &DaemonConfig,
74        config_dir: &Path,
75    ) -> Result<DaemonHandle> {
76        let (event_tx, event_rx) = mpsc::unbounded_channel::<DaemonEvent>();
77
78        let runtime = builder::Builder::new(config, config_dir, event_tx.clone())
79            .build()
80            .await?;
81        let runtime = Arc::new(runtime);
82        let daemon = Daemon {
83            runtime: Arc::clone(&runtime),
84        };
85
86        // Broadcast shutdown — all subsystems subscribe.
87        let (shutdown_tx, _) = broadcast::channel::<()>(1);
88
89        // Bridge broadcast shutdown into the event loop.
90        let shutdown_event_tx = event_tx.clone();
91        let mut shutdown_rx = shutdown_tx.subscribe();
92        tokio::spawn(async move {
93            let _ = shutdown_rx.recv().await;
94            let _ = shutdown_event_tx.send(DaemonEvent::Shutdown);
95        });
96
97        let (socket_path, socket_join) = setup_socket(&shutdown_tx, &event_tx)?;
98        setup_channels(config, &event_tx).await;
99        let cron_add_tx = setup_cron(&runtime, &shutdown_tx, &event_tx).await;
100
101        let d = daemon.clone();
102        let event_loop_join = tokio::spawn(async move {
103            d.handle_events(event_rx, cron_add_tx).await;
104        });
105
106        Ok(DaemonHandle {
107            socket_path,
108            shutdown_tx: Some(shutdown_tx),
109            socket_join: Some(socket_join),
110            event_loop_join: Some(event_loop_join),
111        })
112    }
113}
114
115// ── Transport setup helpers ──────────────────────────────────────────
116
117/// Bind the Unix domain socket and spawn the accept loop.
118fn setup_socket(
119    shutdown_tx: &broadcast::Sender<()>,
120    event_tx: &DaemonEventSender,
121) -> Result<(PathBuf, tokio::task::JoinHandle<()>)> {
122    let resolved_path = crate::config::socket_path();
123    if let Some(parent) = resolved_path.parent() {
124        std::fs::create_dir_all(parent)?;
125    }
126    if resolved_path.exists() {
127        std::fs::remove_file(&resolved_path)?;
128    }
129
130    let listener = tokio::net::UnixListener::bind(&resolved_path)?;
131    tracing::info!("daemon listening on {}", resolved_path.display());
132
133    let socket_shutdown = bridge_shutdown(shutdown_tx.subscribe());
134    let socket_tx = event_tx.clone();
135    let join = tokio::spawn(socket::server::accept_loop(
136        listener,
137        move |msg, reply| {
138            let _ = socket_tx.send(DaemonEvent::Socket { msg, reply });
139        },
140        socket_shutdown,
141    ));
142
143    Ok((resolved_path, join))
144}
145
146/// Build the channel router and spawn channel transports.
147async fn setup_channels(config: &DaemonConfig, event_tx: &DaemonEventSender) {
148    let router = router::build_router(&config.channels);
149    let router = Arc::new(router);
150    let channel_tx = event_tx.clone();
151    let on_message = Arc::new(move |agent: CompactString, content: String| {
152        let tx = channel_tx.clone();
153        async move {
154            let (reply_tx, reply_rx) = oneshot::channel();
155            let event = DaemonEvent::Channel {
156                agent,
157                content,
158                reply: reply_tx,
159            };
160            if tx.send(event).is_err() {
161                return Err("event loop closed".to_owned());
162            }
163            reply_rx
164                .await
165                .unwrap_or(Err("event loop dropped".to_owned()))
166        }
167    });
168    router::spawn_channels(&config.channels, router, on_message).await;
169}
170
171/// Spawn the cron scheduler wired into the event loop.
172async fn setup_cron(
173    runtime: &Arc<Runtime<ProviderManager, DaemonHook>>,
174    shutdown_tx: &broadcast::Sender<()>,
175    event_tx: &DaemonEventSender,
176) -> mpsc::UnboundedSender<wcron::CronJob> {
177    let cron_jobs = runtime.hook.cron.jobs().await;
178    let cron_tx = event_tx.clone();
179    wcron::spawn_with_callback(
180        cron_jobs,
181        move |job| {
182            let tx = cron_tx.clone();
183            async move {
184                let _ = tx.send(DaemonEvent::Cron {
185                    agent: job.agent.clone(),
186                    content: job.message.clone(),
187                    job_name: job.name.clone(),
188                });
189            }
190        },
191        shutdown_tx.subscribe(),
192    )
193}
194
195/// Bridge a broadcast receiver into a oneshot receiver.
196fn bridge_shutdown(mut rx: broadcast::Receiver<()>) -> oneshot::Receiver<()> {
197    let (otx, orx) = oneshot::channel();
198    tokio::spawn(async move {
199        let _ = rx.recv().await;
200        let _ = otx.send(());
201    });
202    orx
203}