walrus_daemon/daemon/
mod.rs1use crate::{
8 DaemonConfig,
9 daemon::event::{DaemonEvent, DaemonEventSender},
10 hook::DaemonHook,
11};
12use anyhow::Result;
13use compact_str::CompactString;
14use model::ProviderManager;
15use runtime::Runtime;
16use std::{
17 path::{Path, PathBuf},
18 sync::Arc,
19};
20use tokio::sync::{broadcast, mpsc, oneshot};
21
22pub(crate) mod builder;
23pub(crate) mod event;
24mod protocol;
25
26#[derive(Clone)]
28pub struct Daemon {
29 pub runtime: Arc<Runtime<ProviderManager, DaemonHook>>,
31}
32
33pub struct DaemonHandle {
35 pub socket_path: PathBuf,
37 shutdown_tx: Option<broadcast::Sender<()>>,
38 socket_join: Option<tokio::task::JoinHandle<()>>,
39 event_loop_join: Option<tokio::task::JoinHandle<()>>,
40}
41
42impl DaemonHandle {
43 pub async fn shutdown(mut self) -> Result<()> {
45 if let Some(tx) = self.shutdown_tx.take() {
46 let _ = tx.send(());
47 }
48 if let Some(join) = self.socket_join.take() {
49 join.await?;
50 }
51 if let Some(join) = self.event_loop_join.take() {
52 join.await?;
53 }
54 let _ = std::fs::remove_file(&self.socket_path);
55 Ok(())
56 }
57}
58
59impl Daemon {
60 pub async fn start(config_dir: &Path) -> Result<DaemonHandle> {
64 let config_path = config_dir.join("walrus.toml");
65 let config = DaemonConfig::load(&config_path)?;
66 tracing::info!("loaded configuration from {}", config_path.display());
67 Self::start_with_config(&config, config_dir).await
68 }
69
70 pub async fn start_with_config(
73 config: &DaemonConfig,
74 config_dir: &Path,
75 ) -> Result<DaemonHandle> {
76 let (event_tx, event_rx) = mpsc::unbounded_channel::<DaemonEvent>();
77
78 let runtime = builder::Builder::new(config, config_dir, event_tx.clone())
79 .build()
80 .await?;
81 let runtime = Arc::new(runtime);
82 let daemon = Daemon {
83 runtime: Arc::clone(&runtime),
84 };
85
86 let (shutdown_tx, _) = broadcast::channel::<()>(1);
88
89 let shutdown_event_tx = event_tx.clone();
91 let mut shutdown_rx = shutdown_tx.subscribe();
92 tokio::spawn(async move {
93 let _ = shutdown_rx.recv().await;
94 let _ = shutdown_event_tx.send(DaemonEvent::Shutdown);
95 });
96
97 let (socket_path, socket_join) = setup_socket(&shutdown_tx, &event_tx)?;
98 setup_channels(config, &event_tx).await;
99 let cron_add_tx = setup_cron(&runtime, &shutdown_tx, &event_tx).await;
100
101 let d = daemon.clone();
102 let event_loop_join = tokio::spawn(async move {
103 d.handle_events(event_rx, cron_add_tx).await;
104 });
105
106 Ok(DaemonHandle {
107 socket_path,
108 shutdown_tx: Some(shutdown_tx),
109 socket_join: Some(socket_join),
110 event_loop_join: Some(event_loop_join),
111 })
112 }
113}
114
115fn setup_socket(
119 shutdown_tx: &broadcast::Sender<()>,
120 event_tx: &DaemonEventSender,
121) -> Result<(PathBuf, tokio::task::JoinHandle<()>)> {
122 let resolved_path = crate::config::socket_path();
123 if let Some(parent) = resolved_path.parent() {
124 std::fs::create_dir_all(parent)?;
125 }
126 if resolved_path.exists() {
127 std::fs::remove_file(&resolved_path)?;
128 }
129
130 let listener = tokio::net::UnixListener::bind(&resolved_path)?;
131 tracing::info!("daemon listening on {}", resolved_path.display());
132
133 let socket_shutdown = bridge_shutdown(shutdown_tx.subscribe());
134 let socket_tx = event_tx.clone();
135 let join = tokio::spawn(socket::server::accept_loop(
136 listener,
137 move |msg, reply| {
138 let _ = socket_tx.send(DaemonEvent::Socket { msg, reply });
139 },
140 socket_shutdown,
141 ));
142
143 Ok((resolved_path, join))
144}
145
146async fn setup_channels(config: &DaemonConfig, event_tx: &DaemonEventSender) {
148 let router = router::build_router(&config.channels);
149 let router = Arc::new(router);
150 let channel_tx = event_tx.clone();
151 let on_message = Arc::new(move |agent: CompactString, content: String| {
152 let tx = channel_tx.clone();
153 async move {
154 let (reply_tx, reply_rx) = oneshot::channel();
155 let event = DaemonEvent::Channel {
156 agent,
157 content,
158 reply: reply_tx,
159 };
160 if tx.send(event).is_err() {
161 return Err("event loop closed".to_owned());
162 }
163 reply_rx
164 .await
165 .unwrap_or(Err("event loop dropped".to_owned()))
166 }
167 });
168 router::spawn_channels(&config.channels, router, on_message).await;
169}
170
171async fn setup_cron(
173 runtime: &Arc<Runtime<ProviderManager, DaemonHook>>,
174 shutdown_tx: &broadcast::Sender<()>,
175 event_tx: &DaemonEventSender,
176) -> mpsc::UnboundedSender<wcron::CronJob> {
177 let cron_jobs = runtime.hook.cron.jobs().await;
178 let cron_tx = event_tx.clone();
179 wcron::spawn_with_callback(
180 cron_jobs,
181 move |job| {
182 let tx = cron_tx.clone();
183 async move {
184 let _ = tx.send(DaemonEvent::Cron {
185 agent: job.agent.clone(),
186 content: job.message.clone(),
187 job_name: job.name.clone(),
188 });
189 }
190 },
191 shutdown_tx.subscribe(),
192 )
193}
194
195fn bridge_shutdown(mut rx: broadcast::Receiver<()>) -> oneshot::Receiver<()> {
197 let (otx, orx) = oneshot::channel();
198 tokio::spawn(async move {
199 let _ = rx.recv().await;
200 let _ = otx.send(());
201 });
202 orx
203}