walrus_daemon/daemon/
mod.rs1use crate::{
8 DaemonConfig,
9 daemon::event::{DaemonEvent, DaemonEventSender},
10 hook::DaemonHook,
11};
12use ::socket::server::accept_loop;
13use anyhow::Result;
14use compact_str::CompactString;
15use model::ProviderManager;
16use std::{
17 path::{Path, PathBuf},
18 sync::Arc,
19};
20use tokio::sync::{broadcast, mpsc, oneshot};
21use wcore::Runtime;
22
23pub(crate) mod builder;
24pub(crate) mod event;
25mod protocol;
26
27#[derive(Clone)]
29pub struct Daemon {
30 pub runtime: Arc<Runtime<ProviderManager, DaemonHook>>,
32}
33
34pub struct DaemonHandle {
36 pub socket_path: PathBuf,
38 shutdown_tx: Option<broadcast::Sender<()>>,
39 socket_join: Option<tokio::task::JoinHandle<()>>,
40 event_loop_join: Option<tokio::task::JoinHandle<()>>,
41}
42
43impl DaemonHandle {
44 pub async fn shutdown(mut self) -> Result<()> {
46 if let Some(tx) = self.shutdown_tx.take() {
47 let _ = tx.send(());
48 }
49 if let Some(join) = self.socket_join.take() {
50 join.await?;
51 }
52 if let Some(join) = self.event_loop_join.take() {
53 join.await?;
54 }
55 let _ = std::fs::remove_file(&self.socket_path);
56 Ok(())
57 }
58}
59
60impl Daemon {
61 pub async fn start(config_dir: &Path) -> Result<DaemonHandle> {
65 let config_path = config_dir.join("walrus.toml");
66 let config = DaemonConfig::load(&config_path)?;
67 tracing::info!("loaded configuration from {}", config_path.display());
68 Self::start_with_config(&config, config_dir).await
69 }
70
71 pub async fn start_with_config(
74 config: &DaemonConfig,
75 config_dir: &Path,
76 ) -> Result<DaemonHandle> {
77 let (event_tx, event_rx) = mpsc::unbounded_channel::<DaemonEvent>();
78 let runtime = builder::Builder::new(config, config_dir).build().await?;
79 let runtime = Arc::new(runtime);
80 let daemon = Daemon {
81 runtime: Arc::clone(&runtime),
82 };
83
84 let (shutdown_tx, _) = broadcast::channel::<()>(1);
86
87 let shutdown_event_tx = event_tx.clone();
89 let mut shutdown_rx = shutdown_tx.subscribe();
90 tokio::spawn(async move {
91 let _ = shutdown_rx.recv().await;
92 let _ = shutdown_event_tx.send(DaemonEvent::Shutdown);
93 });
94
95 let (socket_path, socket_join) = setup_socket(&shutdown_tx, &event_tx)?;
96 setup_channels(config, &event_tx).await;
97
98 let d = daemon.clone();
99 let event_loop_join = tokio::spawn(async move {
100 d.handle_events(event_rx).await;
101 });
102
103 Ok(DaemonHandle {
104 socket_path,
105 shutdown_tx: Some(shutdown_tx),
106 socket_join: Some(socket_join),
107 event_loop_join: Some(event_loop_join),
108 })
109 }
110}
111
112fn setup_socket(
116 shutdown_tx: &broadcast::Sender<()>,
117 event_tx: &DaemonEventSender,
118) -> Result<(PathBuf, tokio::task::JoinHandle<()>)> {
119 let resolved_path = crate::config::socket_path();
120 if let Some(parent) = resolved_path.parent() {
121 std::fs::create_dir_all(parent)?;
122 }
123 if resolved_path.exists() {
124 std::fs::remove_file(&resolved_path)?;
125 }
126
127 let listener = tokio::net::UnixListener::bind(&resolved_path)?;
128 tracing::info!("daemon listening on {}", resolved_path.display());
129
130 let socket_shutdown = bridge_shutdown(shutdown_tx.subscribe());
131 let socket_tx = event_tx.clone();
132 let join = tokio::spawn(accept_loop(
133 listener,
134 move |msg, reply| {
135 let _ = socket_tx.send(DaemonEvent::Socket { msg, reply });
136 },
137 socket_shutdown,
138 ));
139
140 Ok((resolved_path, join))
141}
142
143async fn setup_channels(config: &DaemonConfig, event_tx: &DaemonEventSender) {
145 let channels = config.channels.values().cloned().collect::<Vec<_>>();
146 let router = channel::build_router(&channels);
147 let router = Arc::new(router);
148 let channel_tx = event_tx.clone();
149 let on_message = Arc::new(move |agent: CompactString, content: String| {
150 let tx = channel_tx.clone();
151 async move {
152 let (reply_tx, reply_rx) = oneshot::channel();
153 let event = DaemonEvent::Channel {
154 agent,
155 content,
156 reply: reply_tx,
157 };
158 if tx.send(event).is_err() {
159 return Err("event loop closed".to_owned());
160 }
161 reply_rx
162 .await
163 .unwrap_or(Err("event loop dropped".to_owned()))
164 }
165 });
166 channel::spawn_channels(&channels, router, on_message).await;
167}
168
169fn bridge_shutdown(mut rx: broadcast::Receiver<()>) -> oneshot::Receiver<()> {
171 let (otx, orx) = oneshot::channel();
172 tokio::spawn(async move {
173 let _ = rx.recv().await;
174 let _ = otx.send(());
175 });
176 orx
177}