Skip to main content

crabtalk_daemon/daemon/
mod.rs

1//! Daemon — the core struct composing runtime, transports, and lifecycle.
2//!
3//! [`Daemon`] owns the runtime and shared state. [`DaemonHandle`] owns the
4//! spawned tasks and provides graceful shutdown. Transport setup is
5//! decomposed into private helpers called from [`Daemon::start`].
6
7use crate::{
8    DaemonConfig,
9    cron::CronStore,
10    daemon::event::{DaemonEvent, DaemonEventSender},
11    hook::DaemonHook,
12};
13use anyhow::Result;
14use model::ProviderRegistry;
15use std::{
16    path::{Path, PathBuf},
17    sync::Arc,
18};
19use tokio::sync::{Mutex, RwLock, broadcast, mpsc, oneshot};
20use wcore::Runtime;
21
22pub(crate) mod builder;
23pub mod event;
24mod protocol;
25
/// Shared daemon state — holds the runtime. Cheap to clone (`Arc`-backed).
///
/// The runtime is stored behind `Arc<RwLock<Arc<Runtime>>>` so that
/// [`Daemon::reload`] can swap it atomically while in-flight requests that
/// already cloned the inner `Arc` complete normally.
#[derive(Clone)]
pub struct Daemon {
    /// The crabtalk runtime, swappable via [`Daemon::reload`].
    pub runtime: Arc<RwLock<Arc<Runtime<ProviderRegistry, DaemonHook>>>>,
    /// Config directory — stored so [`Daemon::reload`] can re-read config from disk.
    pub(crate) config_dir: PathBuf,
    /// Sender for the daemon event loop — cloned into `Runtime` as `ToolSender`
    /// so agents can dispatch tool calls. Stored here so [`Daemon::reload`] can
    /// pass a fresh clone into the rebuilt runtime.
    pub(crate) event_tx: DaemonEventSender,
    /// When the daemon was started (for uptime calculation).
    pub(crate) started_at: std::time::Instant,
    /// Daemon-level cron scheduler. Survives runtime reloads.
    pub(crate) crons: Arc<Mutex<CronStore>>,
}
46
47impl Daemon {
48    /// Load config, build runtime, and start the event loop.
49    ///
50    /// Returns a [`DaemonHandle`] with the event sender exposed. The caller
51    /// spawns transports (socket, channels) using the handle's `event_tx`
52    /// and `shutdown_tx`, then integrates its own channels by cloning
53    /// `event_tx` and sending [`DaemonEvent::Message`] variants.
54    pub async fn start(config_dir: &Path) -> Result<DaemonHandle> {
55        let config_path = config_dir.join(wcore::paths::CONFIG_FILE);
56        let config = DaemonConfig::load(&config_path)?;
57        tracing::info!("loaded configuration from {}", config_path.display());
58
59        let (event_tx, event_rx) = mpsc::unbounded_channel::<DaemonEvent>();
60
61        // Broadcast shutdown — all subsystems subscribe.
62        let (shutdown_tx, _) = broadcast::channel::<()>(1);
63        let shutdown_event_tx = event_tx.clone();
64        let mut shutdown_rx = shutdown_tx.subscribe();
65        tokio::spawn(async move {
66            let _ = shutdown_rx.recv().await;
67            let _ = shutdown_event_tx.send(DaemonEvent::Shutdown);
68        });
69
70        let daemon =
71            Daemon::build(&config, config_dir, event_tx.clone(), shutdown_tx.clone()).await?;
72
73        let d = daemon.clone();
74        let event_loop_join = tokio::spawn(async move {
75            d.handle_events(event_rx).await;
76        });
77
78        Ok(DaemonHandle {
79            config,
80            event_tx,
81            shutdown_tx,
82            daemon,
83            event_loop_join: Some(event_loop_join),
84        })
85    }
86}
87
/// Handle returned by [`Daemon::start`] — holds the event sender and shutdown trigger.
///
/// The caller spawns transports (socket, TCP) using [`setup_socket`] and
/// [`setup_tcp`], passing clones of `event_tx` and `shutdown_tx`.
pub struct DaemonHandle {
    /// The loaded daemon configuration.
    pub config: DaemonConfig,
    /// Sender for injecting events into the daemon event loop.
    /// Clone this and pass to transport setup functions.
    pub event_tx: DaemonEventSender,
    /// Broadcast shutdown — call `.subscribe()` for transport shutdown,
    /// or use [`DaemonHandle::shutdown`] to trigger.
    pub shutdown_tx: broadcast::Sender<()>,
    /// The shared daemon state — exposed for backend/product servers that
    /// layer additional APIs on top.
    pub daemon: Daemon,
    /// Join handle for the event-loop task; taken (set to `None`) by
    /// [`DaemonHandle::shutdown`] so the loop is awaited exactly once.
    event_loop_join: Option<tokio::task::JoinHandle<()>>,
}
106
107impl DaemonHandle {
108    /// Wait until the active model provider is ready.
109    ///
110    /// No-op for remote providers. Kept for API compatibility.
111    pub async fn wait_until_ready(&self) -> Result<()> {
112        Ok(())
113    }
114
115    /// Trigger graceful shutdown and wait for the event loop to stop.
116    ///
117    /// Transport tasks (socket, channels) are the caller's responsibility.
118    pub async fn shutdown(mut self) -> Result<()> {
119        let _ = self.shutdown_tx.send(());
120        if let Some(join) = self.event_loop_join.take() {
121            join.await?;
122        }
123        Ok(())
124    }
125}
126
127// ── Transport setup helpers ──────────────────────────────────────────
128
129/// Bind the Unix domain socket and spawn the accept loop.
130#[cfg(unix)]
131pub fn setup_socket(
132    shutdown_tx: &broadcast::Sender<()>,
133    event_tx: &DaemonEventSender,
134) -> Result<(&'static Path, tokio::task::JoinHandle<()>)> {
135    let resolved_path: &'static Path = &wcore::paths::SOCKET_PATH;
136    if let Some(parent) = resolved_path.parent() {
137        std::fs::create_dir_all(parent)?;
138    }
139    if resolved_path.exists() {
140        std::fs::remove_file(resolved_path)?;
141    }
142
143    let listener = tokio::net::UnixListener::bind(resolved_path)?;
144    tracing::info!("daemon listening on {}", resolved_path.display());
145
146    let socket_shutdown = bridge_shutdown(shutdown_tx.subscribe());
147    let socket_tx = event_tx.clone();
148    let join = tokio::spawn(transport::uds::accept_loop(
149        listener,
150        move |msg, reply| {
151            let _ = socket_tx.send(DaemonEvent::Message { msg, reply });
152        },
153        socket_shutdown,
154    ));
155
156    Ok((resolved_path, join))
157}
158
159/// Bind a TCP listener and spawn the accept loop.
160///
161/// Tries the default port (6688), falls back to an OS-assigned port.
162/// Returns the join handle and the actual port bound.
163pub fn setup_tcp(
164    shutdown_tx: &broadcast::Sender<()>,
165    event_tx: &DaemonEventSender,
166) -> Result<(tokio::task::JoinHandle<()>, u16)> {
167    let (std_listener, addr) = transport::tcp::bind()?;
168    let listener = tokio::net::TcpListener::from_std(std_listener)?;
169    tracing::info!("daemon listening on tcp://{addr}");
170
171    let tcp_shutdown = bridge_shutdown(shutdown_tx.subscribe());
172    let tcp_tx = event_tx.clone();
173    let join = tokio::spawn(transport::tcp::accept_loop(
174        listener,
175        move |msg, reply| {
176            let _ = tcp_tx.send(DaemonEvent::Message { msg, reply });
177        },
178        tcp_shutdown,
179    ));
180
181    Ok((join, addr.port()))
182}
183
184/// Bridge a broadcast receiver into a oneshot receiver.
185pub fn bridge_shutdown(mut rx: broadcast::Receiver<()>) -> oneshot::Receiver<()> {
186    let (otx, orx) = oneshot::channel();
187    tokio::spawn(async move {
188        let _ = rx.recv().await;
189        let _ = otx.send(());
190    });
191    orx
192}