Skip to main content

crabtalk_daemon/daemon/
mod.rs

1//! Daemon — the core struct composing runtime, transports, and lifecycle.
2//!
3//! [`Daemon`] owns the runtime and shared state. [`DaemonHandle`] owns the
4//! spawned tasks and provides graceful shutdown. Transport setup is
5//! decomposed into private helpers called from [`Daemon::start`].
6
7use crate::{
8    DaemonConfig,
9    daemon::event::{DaemonEvent, DaemonEventSender},
10    hook::DaemonHook,
11};
12use ::transport::uds::accept_loop;
13use anyhow::Result;
14use model::ProviderRegistry;
15use std::{
16    path::{Path, PathBuf},
17    sync::Arc,
18};
19use tokio::sync::{RwLock, broadcast, mpsc, oneshot};
20use wcore::Runtime;
21
22pub(crate) mod builder;
23pub mod event;
24mod protocol;
25
26/// Shared daemon state — holds the runtime. Cheap to clone (`Arc`-backed).
27///
28/// The runtime is stored behind `Arc<RwLock<Arc<Runtime>>>` so that
29/// [`Daemon::reload`] can swap it atomically while in-flight requests that
30/// already cloned the inner `Arc` complete normally.
31#[derive(Clone)]
32pub struct Daemon {
33    /// The crabtalk runtime, swappable via [`Daemon::reload`].
34    pub runtime: Arc<RwLock<Arc<Runtime<ProviderRegistry, DaemonHook>>>>,
35    /// Config directory — stored so [`Daemon::reload`] can re-read config from disk.
36    pub(crate) config_dir: PathBuf,
37    /// Sender for the daemon event loop — cloned into `Runtime` as `ToolSender`
38    /// so agents can dispatch tool calls. Stored here so [`Daemon::reload`] can
39    /// pass a fresh clone into the rebuilt runtime.
40    pub(crate) event_tx: DaemonEventSender,
41}
42
43impl Daemon {
44    /// Load config, build runtime, and start the event loop.
45    ///
46    /// Returns a [`DaemonHandle`] with the event sender exposed. The caller
47    /// spawns transports (socket, channels) using the handle's `event_tx`
48    /// and `shutdown_tx`, then integrates its own channels by cloning
49    /// `event_tx` and sending [`DaemonEvent::Message`] variants.
50    pub async fn start(config_dir: &Path) -> Result<DaemonHandle> {
51        let config_path = config_dir.join(wcore::paths::CONFIG_FILE);
52        let config = DaemonConfig::load(&config_path)?;
53        tracing::info!("loaded configuration from {}", config_path.display());
54
55        let (event_tx, event_rx) = mpsc::unbounded_channel::<DaemonEvent>();
56        let daemon = Daemon::build(&config, config_dir, event_tx.clone()).await?;
57
58        // Broadcast shutdown — all subsystems subscribe.
59        let (shutdown_tx, _) = broadcast::channel::<()>(1);
60        let shutdown_event_tx = event_tx.clone();
61        let mut shutdown_rx = shutdown_tx.subscribe();
62        tokio::spawn(async move {
63            let _ = shutdown_rx.recv().await;
64            let _ = shutdown_event_tx.send(DaemonEvent::Shutdown);
65        });
66
67        // Per-agent heartbeat timers — only agents with interval > 0 run.
68        for (name, agent) in &config.agents {
69            if agent.heartbeat.interval == 0 {
70                continue;
71            }
72            let agent_name = name.clone();
73            let heartbeat_tx = event_tx.clone();
74            let mut heartbeat_shutdown = shutdown_tx.subscribe();
75            let interval_secs = agent.heartbeat.interval * 60;
76            tokio::spawn(async move {
77                let mut tick = tokio::time::interval(std::time::Duration::from_secs(interval_secs));
78                tick.tick().await; // skip the immediate first tick
79                loop {
80                    tokio::select! {
81                        _ = tick.tick() => {
82                            let event = DaemonEvent::Heartbeat {
83                                agent: agent_name.clone(),
84                            };
85                            if heartbeat_tx.send(event).is_err() {
86                                break;
87                            }
88                        }
89                        _ = heartbeat_shutdown.recv() => break,
90                    }
91                }
92            });
93            tracing::info!(
94                "heartbeat timer started for '{}' (interval: {}m)",
95                name,
96                agent.heartbeat.interval,
97            );
98        }
99
100        let d = daemon.clone();
101        let event_loop_join = tokio::spawn(async move {
102            d.handle_events(event_rx).await;
103        });
104
105        Ok(DaemonHandle {
106            config,
107            event_tx,
108            shutdown_tx,
109            daemon,
110            event_loop_join: Some(event_loop_join),
111        })
112    }
113}
114
115/// Handle returned by [`Daemon::start`] — holds the event sender and shutdown trigger.
116///
117/// The caller spawns transports (socket, TCP) using [`setup_socket`] and
118/// [`setup_tcp`], passing clones of `event_tx` and `shutdown_tx`.
119pub struct DaemonHandle {
120    /// The loaded daemon configuration.
121    pub config: DaemonConfig,
122    /// Sender for injecting events into the daemon event loop.
123    /// Clone this and pass to transport setup functions.
124    pub event_tx: DaemonEventSender,
125    /// Broadcast shutdown — call `.subscribe()` for transport shutdown,
126    /// or use [`DaemonHandle::shutdown`] to trigger.
127    pub shutdown_tx: broadcast::Sender<()>,
128    #[allow(unused)]
129    daemon: Daemon,
130    event_loop_join: Option<tokio::task::JoinHandle<()>>,
131}
132
133impl DaemonHandle {
134    /// Wait until the active model provider is ready.
135    ///
136    /// No-op for remote providers. Kept for API compatibility.
137    pub async fn wait_until_ready(&self) -> Result<()> {
138        Ok(())
139    }
140
141    /// Trigger graceful shutdown and wait for the event loop to stop.
142    ///
143    /// Transport tasks (socket, channels) are the caller's responsibility.
144    pub async fn shutdown(mut self) -> Result<()> {
145        let _ = self.shutdown_tx.send(());
146        if let Some(join) = self.event_loop_join.take() {
147            join.await?;
148        }
149        Ok(())
150    }
151}
152
153// ── Transport setup helpers ──────────────────────────────────────────
154
155/// Bind the Unix domain socket and spawn the accept loop.
156pub fn setup_socket(
157    shutdown_tx: &broadcast::Sender<()>,
158    event_tx: &DaemonEventSender,
159) -> Result<(&'static Path, tokio::task::JoinHandle<()>)> {
160    let resolved_path: &'static Path = &wcore::paths::SOCKET_PATH;
161    if let Some(parent) = resolved_path.parent() {
162        std::fs::create_dir_all(parent)?;
163    }
164    if resolved_path.exists() {
165        std::fs::remove_file(resolved_path)?;
166    }
167
168    let listener = tokio::net::UnixListener::bind(resolved_path)?;
169    tracing::info!("daemon listening on {}", resolved_path.display());
170
171    let socket_shutdown = bridge_shutdown(shutdown_tx.subscribe());
172    let socket_tx = event_tx.clone();
173    let join = tokio::spawn(accept_loop(
174        listener,
175        move |msg, reply| {
176            let _ = socket_tx.send(DaemonEvent::Message { msg, reply });
177        },
178        socket_shutdown,
179    ));
180
181    Ok((resolved_path, join))
182}
183
184/// Bind a TCP listener and spawn the accept loop.
185///
186/// Tries the default port (6688), falls back to an OS-assigned port.
187/// Returns the join handle and the actual port bound.
188pub fn setup_tcp(
189    shutdown_tx: &broadcast::Sender<()>,
190    event_tx: &DaemonEventSender,
191) -> Result<(tokio::task::JoinHandle<()>, u16)> {
192    let (std_listener, addr) = transport::tcp::bind()?;
193    let listener = tokio::net::TcpListener::from_std(std_listener)?;
194    tracing::info!("daemon listening on tcp://{addr}");
195
196    let tcp_shutdown = bridge_shutdown(shutdown_tx.subscribe());
197    let tcp_tx = event_tx.clone();
198    let join = tokio::spawn(transport::tcp::accept_loop(
199        listener,
200        move |msg, reply| {
201            let _ = tcp_tx.send(DaemonEvent::Message { msg, reply });
202        },
203        tcp_shutdown,
204    ));
205
206    Ok((join, addr.port()))
207}
208
209/// Bridge a broadcast receiver into a oneshot receiver.
210pub fn bridge_shutdown(mut rx: broadcast::Receiver<()>) -> oneshot::Receiver<()> {
211    let (otx, orx) = oneshot::channel();
212    tokio::spawn(async move {
213        let _ = rx.recv().await;
214        let _ = otx.send(());
215    });
216    orx
217}