crabtalk_daemon/daemon/
mod.rs1use crate::{
8 DaemonConfig,
9 daemon::event::{DaemonEvent, DaemonEventSender},
10 hook::DaemonHook,
11};
12use ::transport::uds::accept_loop;
13use anyhow::Result;
14use model::ProviderRegistry;
15use std::{
16 path::{Path, PathBuf},
17 sync::Arc,
18};
19use tokio::sync::{RwLock, broadcast, mpsc, oneshot};
20use wcore::Runtime;
21
22pub(crate) mod builder;
23pub mod event;
24mod protocol;
25
26#[derive(Clone)]
32pub struct Daemon {
33 pub runtime: Arc<RwLock<Arc<Runtime<ProviderRegistry, DaemonHook>>>>,
35 pub(crate) config_dir: PathBuf,
37 pub(crate) event_tx: DaemonEventSender,
41}
42
43impl Daemon {
44 pub async fn start(config_dir: &Path) -> Result<DaemonHandle> {
51 let config_path = config_dir.join(wcore::paths::CONFIG_FILE);
52 let config = DaemonConfig::load(&config_path)?;
53 tracing::info!("loaded configuration from {}", config_path.display());
54
55 let (event_tx, event_rx) = mpsc::unbounded_channel::<DaemonEvent>();
56 let daemon = Daemon::build(&config, config_dir, event_tx.clone()).await?;
57
58 let (shutdown_tx, _) = broadcast::channel::<()>(1);
60 let shutdown_event_tx = event_tx.clone();
61 let mut shutdown_rx = shutdown_tx.subscribe();
62 tokio::spawn(async move {
63 let _ = shutdown_rx.recv().await;
64 let _ = shutdown_event_tx.send(DaemonEvent::Shutdown);
65 });
66
67 for (name, agent) in &config.agents {
69 if agent.heartbeat.interval == 0 {
70 continue;
71 }
72 let agent_name = name.clone();
73 let heartbeat_tx = event_tx.clone();
74 let mut heartbeat_shutdown = shutdown_tx.subscribe();
75 let interval_secs = agent.heartbeat.interval * 60;
76 tokio::spawn(async move {
77 let mut tick = tokio::time::interval(std::time::Duration::from_secs(interval_secs));
78 tick.tick().await; loop {
80 tokio::select! {
81 _ = tick.tick() => {
82 let event = DaemonEvent::Heartbeat {
83 agent: agent_name.clone(),
84 };
85 if heartbeat_tx.send(event).is_err() {
86 break;
87 }
88 }
89 _ = heartbeat_shutdown.recv() => break,
90 }
91 }
92 });
93 tracing::info!(
94 "heartbeat timer started for '{}' (interval: {}m)",
95 name,
96 agent.heartbeat.interval,
97 );
98 }
99
100 let d = daemon.clone();
101 let event_loop_join = tokio::spawn(async move {
102 d.handle_events(event_rx).await;
103 });
104
105 Ok(DaemonHandle {
106 config,
107 event_tx,
108 shutdown_tx,
109 daemon,
110 event_loop_join: Some(event_loop_join),
111 })
112 }
113}
114
115pub struct DaemonHandle {
120 pub config: DaemonConfig,
122 pub event_tx: DaemonEventSender,
125 pub shutdown_tx: broadcast::Sender<()>,
128 #[allow(unused)]
129 daemon: Daemon,
130 event_loop_join: Option<tokio::task::JoinHandle<()>>,
131}
132
133impl DaemonHandle {
134 pub async fn wait_until_ready(&self) -> Result<()> {
138 Ok(())
139 }
140
141 pub async fn shutdown(mut self) -> Result<()> {
145 let _ = self.shutdown_tx.send(());
146 if let Some(join) = self.event_loop_join.take() {
147 join.await?;
148 }
149 Ok(())
150 }
151}
152
153pub fn setup_socket(
157 shutdown_tx: &broadcast::Sender<()>,
158 event_tx: &DaemonEventSender,
159) -> Result<(&'static Path, tokio::task::JoinHandle<()>)> {
160 let resolved_path: &'static Path = &wcore::paths::SOCKET_PATH;
161 if let Some(parent) = resolved_path.parent() {
162 std::fs::create_dir_all(parent)?;
163 }
164 if resolved_path.exists() {
165 std::fs::remove_file(resolved_path)?;
166 }
167
168 let listener = tokio::net::UnixListener::bind(resolved_path)?;
169 tracing::info!("daemon listening on {}", resolved_path.display());
170
171 let socket_shutdown = bridge_shutdown(shutdown_tx.subscribe());
172 let socket_tx = event_tx.clone();
173 let join = tokio::spawn(accept_loop(
174 listener,
175 move |msg, reply| {
176 let _ = socket_tx.send(DaemonEvent::Message { msg, reply });
177 },
178 socket_shutdown,
179 ));
180
181 Ok((resolved_path, join))
182}
183
184pub fn setup_tcp(
189 shutdown_tx: &broadcast::Sender<()>,
190 event_tx: &DaemonEventSender,
191) -> Result<(tokio::task::JoinHandle<()>, u16)> {
192 let (std_listener, addr) = transport::tcp::bind()?;
193 let listener = tokio::net::TcpListener::from_std(std_listener)?;
194 tracing::info!("daemon listening on tcp://{addr}");
195
196 let tcp_shutdown = bridge_shutdown(shutdown_tx.subscribe());
197 let tcp_tx = event_tx.clone();
198 let join = tokio::spawn(transport::tcp::accept_loop(
199 listener,
200 move |msg, reply| {
201 let _ = tcp_tx.send(DaemonEvent::Message { msg, reply });
202 },
203 tcp_shutdown,
204 ));
205
206 Ok((join, addr.port()))
207}
208
209pub fn bridge_shutdown(mut rx: broadcast::Receiver<()>) -> oneshot::Receiver<()> {
211 let (otx, orx) = oneshot::channel();
212 tokio::spawn(async move {
213 let _ = rx.recv().await;
214 let _ = otx.send(());
215 });
216 orx
217}