walrus_daemon/daemon/
mod.rs1use crate::{
8 DaemonConfig,
9 daemon::event::{DaemonEvent, DaemonEventSender},
10 hook::DaemonHook,
11 service::ServiceManager,
12};
13use ::transport::uds::accept_loop;
14use anyhow::Result;
15use model::ProviderRegistry;
16use std::{
17 path::{Path, PathBuf},
18 sync::Arc,
19};
20use tokio::sync::{RwLock, broadcast, mpsc, oneshot};
21use wcore::Runtime;
22
23pub(crate) mod builder;
24pub mod event;
25mod protocol;
26
27#[derive(Clone)]
33pub struct Daemon {
34 pub runtime: Arc<RwLock<Arc<Runtime<ProviderRegistry, DaemonHook>>>>,
36 pub(crate) config_dir: PathBuf,
38 pub(crate) event_tx: DaemonEventSender,
42}
43
44impl Daemon {
45 pub async fn start(config_dir: &Path) -> Result<DaemonHandle> {
52 let config_path = config_dir.join("walrus.toml");
53 let config = DaemonConfig::load(&config_path)?;
54 tracing::info!("loaded configuration from {}", config_path.display());
55
56 let (event_tx, event_rx) = mpsc::unbounded_channel::<DaemonEvent>();
57 let (daemon, service_manager) =
58 Daemon::build(&config, config_dir, event_tx.clone()).await?;
59
60 let (shutdown_tx, _) = broadcast::channel::<()>(1);
62 let shutdown_event_tx = event_tx.clone();
63 let mut shutdown_rx = shutdown_tx.subscribe();
64 tokio::spawn(async move {
65 let _ = shutdown_rx.recv().await;
66 let _ = shutdown_event_tx.send(DaemonEvent::Shutdown);
67 });
68
69 for (name, agent) in &config.agents {
71 if agent.heartbeat.interval == 0 {
72 continue;
73 }
74 let agent_name = name.clone();
75 let heartbeat_tx = event_tx.clone();
76 let mut heartbeat_shutdown = shutdown_tx.subscribe();
77 let interval_secs = agent.heartbeat.interval * 60;
78 tokio::spawn(async move {
79 let mut tick = tokio::time::interval(std::time::Duration::from_secs(interval_secs));
80 tick.tick().await; loop {
82 tokio::select! {
83 _ = tick.tick() => {
84 let event = DaemonEvent::Heartbeat {
85 agent: agent_name.clone(),
86 };
87 if heartbeat_tx.send(event).is_err() {
88 break;
89 }
90 }
91 _ = heartbeat_shutdown.recv() => break,
92 }
93 }
94 });
95 tracing::info!(
96 "heartbeat timer started for '{}' (interval: {}m)",
97 name,
98 agent.heartbeat.interval,
99 );
100 }
101
102 let d = daemon.clone();
103 let event_loop_join = tokio::spawn(async move {
104 d.handle_events(event_rx).await;
105 });
106
107 Ok(DaemonHandle {
108 config,
109 event_tx,
110 shutdown_tx,
111 daemon,
112 event_loop_join: Some(event_loop_join),
113 service_manager,
114 })
115 }
116}
117
118pub struct DaemonHandle {
123 pub config: DaemonConfig,
125 pub event_tx: DaemonEventSender,
128 pub shutdown_tx: broadcast::Sender<()>,
131 #[allow(unused)]
132 daemon: Daemon,
133 event_loop_join: Option<tokio::task::JoinHandle<()>>,
134 service_manager: Option<ServiceManager>,
136}
137
138impl DaemonHandle {
139 pub async fn wait_until_ready(&self) -> Result<()> {
143 Ok(())
144 }
145
146 pub async fn shutdown(mut self) -> Result<()> {
150 if let Some(ref mut sm) = self.service_manager {
152 sm.shutdown_all().await;
153 }
154 let _ = self.shutdown_tx.send(());
155 if let Some(join) = self.event_loop_join.take() {
156 join.await?;
157 }
158 Ok(())
159 }
160}
161
162pub fn setup_socket(
166 shutdown_tx: &broadcast::Sender<()>,
167 event_tx: &DaemonEventSender,
168) -> Result<(&'static Path, tokio::task::JoinHandle<()>)> {
169 let resolved_path: &'static Path = &wcore::paths::SOCKET_PATH;
170 if let Some(parent) = resolved_path.parent() {
171 std::fs::create_dir_all(parent)?;
172 }
173 if resolved_path.exists() {
174 std::fs::remove_file(resolved_path)?;
175 }
176
177 let listener = tokio::net::UnixListener::bind(resolved_path)?;
178 tracing::info!("daemon listening on {}", resolved_path.display());
179
180 let socket_shutdown = bridge_shutdown(shutdown_tx.subscribe());
181 let socket_tx = event_tx.clone();
182 let join = tokio::spawn(accept_loop(
183 listener,
184 move |msg, reply| {
185 let _ = socket_tx.send(DaemonEvent::Message { msg, reply });
186 },
187 socket_shutdown,
188 ));
189
190 Ok((resolved_path, join))
191}
192
193pub fn setup_tcp(
198 shutdown_tx: &broadcast::Sender<()>,
199 event_tx: &DaemonEventSender,
200) -> Result<(tokio::task::JoinHandle<()>, u16)> {
201 let (std_listener, addr) = transport::tcp::bind()?;
202 let listener = tokio::net::TcpListener::from_std(std_listener)?;
203 tracing::info!("daemon listening on tcp://{addr}");
204
205 let tcp_shutdown = bridge_shutdown(shutdown_tx.subscribe());
206 let tcp_tx = event_tx.clone();
207 let join = tokio::spawn(transport::tcp::accept_loop(
208 listener,
209 move |msg, reply| {
210 let _ = tcp_tx.send(DaemonEvent::Message { msg, reply });
211 },
212 tcp_shutdown,
213 ));
214
215 Ok((join, addr.port()))
216}
217
218pub fn bridge_shutdown(mut rx: broadcast::Receiver<()>) -> oneshot::Receiver<()> {
220 let (otx, orx) = oneshot::channel();
221 tokio::spawn(async move {
222 let _ = rx.recv().await;
223 let _ = otx.send(());
224 });
225 orx
226}