walrus_daemon/daemon/
mod.rs1use crate::{
8 DaemonConfig,
9 daemon::event::{DaemonEvent, DaemonEventSender},
10 hook::DaemonHook,
11 service::ServiceManager,
12};
13use ::socket::server::accept_loop;
14use anyhow::Result;
15use model::ProviderManager;
16use std::{
17 collections::BTreeMap,
18 path::{Path, PathBuf},
19 sync::Arc,
20};
21use tokio::sync::{RwLock, broadcast, mpsc, oneshot};
22use wcore::AgentConfig;
23use wcore::Runtime;
24
25pub(crate) mod builder;
26pub mod event;
27mod protocol;
28
29#[derive(Clone)]
35pub struct Daemon {
36 pub runtime: Arc<RwLock<Arc<Runtime<ProviderManager, DaemonHook>>>>,
38 pub(crate) config_dir: PathBuf,
40 pub(crate) event_tx: DaemonEventSender,
44 pub(crate) agents_config: BTreeMap<String, AgentConfig>,
46}
47
48impl Daemon {
49 pub async fn start(config_dir: &Path) -> Result<DaemonHandle> {
56 let config_path = config_dir.join("walrus.toml");
57 let config = DaemonConfig::load(&config_path)?;
58 tracing::info!("loaded configuration from {}", config_path.display());
59
60 let (event_tx, event_rx) = mpsc::unbounded_channel::<DaemonEvent>();
61 let (daemon, service_manager) =
62 Daemon::build(&config, config_dir, event_tx.clone()).await?;
63
64 let (shutdown_tx, _) = broadcast::channel::<()>(1);
66 let shutdown_event_tx = event_tx.clone();
67 let mut shutdown_rx = shutdown_tx.subscribe();
68 tokio::spawn(async move {
69 let _ = shutdown_rx.recv().await;
70 let _ = shutdown_event_tx.send(DaemonEvent::Shutdown);
71 });
72
73 for (name, agent) in &config.agents {
75 if agent.heartbeat.interval == 0 {
76 continue;
77 }
78 let agent_name = compact_str::CompactString::from(name.as_str());
79 let heartbeat_tx = event_tx.clone();
80 let mut heartbeat_shutdown = shutdown_tx.subscribe();
81 let interval_secs = agent.heartbeat.interval * 60;
82 tokio::spawn(async move {
83 let mut tick = tokio::time::interval(std::time::Duration::from_secs(interval_secs));
84 tick.tick().await; loop {
86 tokio::select! {
87 _ = tick.tick() => {
88 let event = DaemonEvent::Heartbeat {
89 agent: agent_name.clone(),
90 };
91 if heartbeat_tx.send(event).is_err() {
92 break;
93 }
94 }
95 _ = heartbeat_shutdown.recv() => break,
96 }
97 }
98 });
99 tracing::info!(
100 "heartbeat timer started for '{}' (interval: {}m)",
101 name,
102 agent.heartbeat.interval,
103 );
104 }
105
106 let d = daemon.clone();
107 let event_loop_join = tokio::spawn(async move {
108 d.handle_events(event_rx).await;
109 });
110
111 Ok(DaemonHandle {
112 config,
113 event_tx,
114 shutdown_tx,
115 daemon,
116 event_loop_join: Some(event_loop_join),
117 service_manager,
118 })
119 }
120}
121
122pub struct DaemonHandle {
127 pub config: DaemonConfig,
129 pub event_tx: DaemonEventSender,
132 pub shutdown_tx: broadcast::Sender<()>,
135 #[allow(unused)]
136 daemon: Daemon,
137 event_loop_join: Option<tokio::task::JoinHandle<()>>,
138 service_manager: Option<ServiceManager>,
140}
141
142impl DaemonHandle {
143 pub async fn wait_until_ready(&self) -> Result<()> {
147 Ok(())
148 }
149
150 pub async fn shutdown(mut self) -> Result<()> {
154 if let Some(ref mut sm) = self.service_manager {
156 sm.shutdown_all().await;
157 }
158 let _ = self.shutdown_tx.send(());
159 if let Some(join) = self.event_loop_join.take() {
160 join.await?;
161 }
162 Ok(())
163 }
164}
165
166pub fn setup_socket(
170 shutdown_tx: &broadcast::Sender<()>,
171 event_tx: &DaemonEventSender,
172) -> Result<(&'static Path, tokio::task::JoinHandle<()>)> {
173 let resolved_path: &'static Path = &wcore::paths::SOCKET_PATH;
174 if let Some(parent) = resolved_path.parent() {
175 std::fs::create_dir_all(parent)?;
176 }
177 if resolved_path.exists() {
178 std::fs::remove_file(resolved_path)?;
179 }
180
181 let listener = tokio::net::UnixListener::bind(resolved_path)?;
182 tracing::info!("daemon listening on {}", resolved_path.display());
183
184 let socket_shutdown = bridge_shutdown(shutdown_tx.subscribe());
185 let socket_tx = event_tx.clone();
186 let join = tokio::spawn(accept_loop(
187 listener,
188 move |msg, reply| {
189 let _ = socket_tx.send(DaemonEvent::Message { msg, reply });
190 },
191 socket_shutdown,
192 ));
193
194 Ok((resolved_path, join))
195}
196
197pub fn setup_tcp(
202 shutdown_tx: &broadcast::Sender<()>,
203 event_tx: &DaemonEventSender,
204) -> Result<(tokio::task::JoinHandle<()>, u16)> {
205 let (std_listener, addr) = tcp::server::bind()?;
206 let listener = tokio::net::TcpListener::from_std(std_listener)?;
207 tracing::info!("daemon listening on tcp://{addr}");
208
209 let tcp_shutdown = bridge_shutdown(shutdown_tx.subscribe());
210 let tcp_tx = event_tx.clone();
211 let join = tokio::spawn(tcp::server::accept_loop(
212 listener,
213 move |msg, reply| {
214 let _ = tcp_tx.send(DaemonEvent::Message { msg, reply });
215 },
216 tcp_shutdown,
217 ));
218
219 Ok((join, addr.port()))
220}
221
222pub fn bridge_shutdown(mut rx: broadcast::Receiver<()>) -> oneshot::Receiver<()> {
224 let (otx, orx) = oneshot::channel();
225 tokio::spawn(async move {
226 let _ = rx.recv().await;
227 let _ = otx.send(());
228 });
229 orx
230}