crabtalk_daemon/daemon/
mod.rs1use crate::{
8 DaemonConfig,
9 cron::CronStore,
10 daemon::event::{DaemonEvent, DaemonEventSender},
11 hook::DaemonHook,
12};
13use anyhow::Result;
14use model::ProviderRegistry;
15use std::{
16 path::{Path, PathBuf},
17 sync::Arc,
18};
19use tokio::sync::{Mutex, RwLock, broadcast, mpsc, oneshot};
20use wcore::Runtime;
21
22pub(crate) mod builder;
23pub mod event;
24mod protocol;
25
26#[derive(Clone)]
32pub struct Daemon {
33 pub runtime: Arc<RwLock<Arc<Runtime<ProviderRegistry, DaemonHook>>>>,
35 pub(crate) config_dir: PathBuf,
37 pub(crate) event_tx: DaemonEventSender,
41 pub(crate) started_at: std::time::Instant,
43 pub(crate) crons: Arc<Mutex<CronStore>>,
45}
46
47impl Daemon {
48 pub async fn start(config_dir: &Path) -> Result<DaemonHandle> {
55 let config_path = config_dir.join(wcore::paths::CONFIG_FILE);
56 let config = DaemonConfig::load(&config_path)?;
57 tracing::info!("loaded configuration from {}", config_path.display());
58
59 let (event_tx, event_rx) = mpsc::unbounded_channel::<DaemonEvent>();
60
61 let (shutdown_tx, _) = broadcast::channel::<()>(1);
63 let shutdown_event_tx = event_tx.clone();
64 let mut shutdown_rx = shutdown_tx.subscribe();
65 tokio::spawn(async move {
66 let _ = shutdown_rx.recv().await;
67 let _ = shutdown_event_tx.send(DaemonEvent::Shutdown);
68 });
69
70 let daemon =
71 Daemon::build(&config, config_dir, event_tx.clone(), shutdown_tx.clone()).await?;
72
73 let d = daemon.clone();
74 let event_loop_join = tokio::spawn(async move {
75 d.handle_events(event_rx).await;
76 });
77
78 Ok(DaemonHandle {
79 config,
80 event_tx,
81 shutdown_tx,
82 daemon,
83 event_loop_join: Some(event_loop_join),
84 })
85 }
86}
87
88pub struct DaemonHandle {
93 pub config: DaemonConfig,
95 pub event_tx: DaemonEventSender,
98 pub shutdown_tx: broadcast::Sender<()>,
101 pub daemon: Daemon,
104 event_loop_join: Option<tokio::task::JoinHandle<()>>,
105}
106
107impl DaemonHandle {
108 pub async fn wait_until_ready(&self) -> Result<()> {
112 Ok(())
113 }
114
115 pub async fn shutdown(mut self) -> Result<()> {
119 let _ = self.shutdown_tx.send(());
120 if let Some(join) = self.event_loop_join.take() {
121 join.await?;
122 }
123 Ok(())
124 }
125}
126
127#[cfg(unix)]
131pub fn setup_socket(
132 shutdown_tx: &broadcast::Sender<()>,
133 event_tx: &DaemonEventSender,
134) -> Result<(&'static Path, tokio::task::JoinHandle<()>)> {
135 let resolved_path: &'static Path = &wcore::paths::SOCKET_PATH;
136 if let Some(parent) = resolved_path.parent() {
137 std::fs::create_dir_all(parent)?;
138 }
139 if resolved_path.exists() {
140 std::fs::remove_file(resolved_path)?;
141 }
142
143 let listener = tokio::net::UnixListener::bind(resolved_path)?;
144 tracing::info!("daemon listening on {}", resolved_path.display());
145
146 let socket_shutdown = bridge_shutdown(shutdown_tx.subscribe());
147 let socket_tx = event_tx.clone();
148 let join = tokio::spawn(transport::uds::accept_loop(
149 listener,
150 move |msg, reply| {
151 let _ = socket_tx.send(DaemonEvent::Message { msg, reply });
152 },
153 socket_shutdown,
154 ));
155
156 Ok((resolved_path, join))
157}
158
159pub fn setup_tcp(
164 shutdown_tx: &broadcast::Sender<()>,
165 event_tx: &DaemonEventSender,
166) -> Result<(tokio::task::JoinHandle<()>, u16)> {
167 let (std_listener, addr) = transport::tcp::bind()?;
168 let listener = tokio::net::TcpListener::from_std(std_listener)?;
169 tracing::info!("daemon listening on tcp://{addr}");
170
171 let tcp_shutdown = bridge_shutdown(shutdown_tx.subscribe());
172 let tcp_tx = event_tx.clone();
173 let join = tokio::spawn(transport::tcp::accept_loop(
174 listener,
175 move |msg, reply| {
176 let _ = tcp_tx.send(DaemonEvent::Message { msg, reply });
177 },
178 tcp_shutdown,
179 ));
180
181 Ok((join, addr.port()))
182}
183
184pub fn bridge_shutdown(mut rx: broadcast::Receiver<()>) -> oneshot::Receiver<()> {
186 let (otx, orx) = oneshot::channel();
187 tokio::spawn(async move {
188 let _ = rx.recv().await;
189 let _ = otx.send(());
190 });
191 orx
192}