walrus_daemon/daemon/
mod.rs1use crate::{
8 DaemonConfig,
9 config::{GLOBAL_CONFIG_DIR, scaffold_work_dir},
10 daemon::event::{DaemonEvent, DaemonEventSender},
11 hook::DaemonHook,
12};
13use ::socket::server::accept_loop;
14use anyhow::Result;
15use model::ProviderManager;
16use std::{
17 path::{Path, PathBuf},
18 sync::Arc,
19};
20use tokio::sync::{RwLock, broadcast, mpsc, oneshot};
21use wcore::Runtime;
22use wcore::protocol::message::client::ClientMessage;
23
24pub(crate) mod builder;
25pub mod event;
26mod protocol;
27
28#[derive(Clone)]
34pub struct Daemon {
35 pub runtime: Arc<RwLock<Arc<Runtime<ProviderManager, DaemonHook>>>>,
37 pub(crate) config_dir: PathBuf,
39 pub(crate) event_tx: DaemonEventSender,
43}
44
45impl Daemon {
46 pub async fn start(config_dir: &Path) -> Result<DaemonHandle> {
53 let config_path = config_dir.join("walrus.toml");
54 let config = DaemonConfig::load(&config_path)?;
55 tracing::info!("loaded configuration from {}", config_path.display());
56
57 scaffold_work_dir(&GLOBAL_CONFIG_DIR, config.work_dir.as_deref())?;
58 let (event_tx, event_rx) = mpsc::unbounded_channel::<DaemonEvent>();
59 let daemon = Daemon::build(&config, config_dir, event_tx.clone()).await?;
60
61 let (shutdown_tx, _) = broadcast::channel::<()>(1);
63 let shutdown_event_tx = event_tx.clone();
64 let mut shutdown_rx = shutdown_tx.subscribe();
65 tokio::spawn(async move {
66 let _ = shutdown_rx.recv().await;
67 let _ = shutdown_event_tx.send(DaemonEvent::Shutdown);
68 });
69
70 let d = daemon.clone();
71 let event_loop_join = tokio::spawn(async move {
72 d.handle_events(event_rx).await;
73 });
74
75 Ok(DaemonHandle {
76 config,
77 event_tx,
78 shutdown_tx,
79 event_loop_join: Some(event_loop_join),
80 })
81 }
82}
83
84pub struct DaemonHandle {
89 pub config: DaemonConfig,
91 pub event_tx: DaemonEventSender,
94 pub shutdown_tx: broadcast::Sender<()>,
97 event_loop_join: Option<tokio::task::JoinHandle<()>>,
98}
99
100impl DaemonHandle {
101 pub async fn shutdown(mut self) -> Result<()> {
105 let _ = self.shutdown_tx.send(());
106 if let Some(join) = self.event_loop_join.take() {
107 join.await?;
108 }
109 Ok(())
110 }
111}
112
113pub fn setup_socket(
117 shutdown_tx: &broadcast::Sender<()>,
118 event_tx: &DaemonEventSender,
119) -> Result<(&'static Path, tokio::task::JoinHandle<()>)> {
120 let resolved_path: &'static Path = &crate::config::SOCKET_PATH;
121 if let Some(parent) = resolved_path.parent() {
122 std::fs::create_dir_all(parent)?;
123 }
124 if resolved_path.exists() {
125 std::fs::remove_file(resolved_path)?;
126 }
127
128 let listener = tokio::net::UnixListener::bind(resolved_path)?;
129 tracing::info!("daemon listening on {}", resolved_path.display());
130
131 let socket_shutdown = bridge_shutdown(shutdown_tx.subscribe());
132 let socket_tx = event_tx.clone();
133 let join = tokio::spawn(accept_loop(
134 listener,
135 move |msg, reply| {
136 let _ = socket_tx.send(DaemonEvent::Message { msg, reply });
137 },
138 socket_shutdown,
139 ));
140
141 Ok((resolved_path, join))
142}
143
144pub async fn setup_channels(config: &DaemonConfig, event_tx: &DaemonEventSender) {
146 let tx = event_tx.clone();
147 let on_message = Arc::new(move |msg: ClientMessage| {
148 let tx = tx.clone();
149 async move {
150 let (reply_tx, reply_rx) = mpsc::unbounded_channel();
151 let _ = tx.send(DaemonEvent::Message {
152 msg,
153 reply: reply_tx,
154 });
155 reply_rx
156 }
157 });
158
159 let agents_dir = crate::config::GLOBAL_CONFIG_DIR.join(crate::config::AGENTS_DIR);
161 let default_agent = crate::config::load_agents_dir(&agents_dir)
162 .ok()
163 .and_then(|agents| agents.into_iter().next())
164 .map(|a| a.name)
165 .unwrap_or_else(|| compact_str::CompactString::from("assistant"));
166 channel::spawn_channels(&config.channel, default_agent, on_message).await;
167}
168
169pub fn bridge_shutdown(mut rx: broadcast::Receiver<()>) -> oneshot::Receiver<()> {
171 let (otx, orx) = oneshot::channel();
172 tokio::spawn(async move {
173 let _ = rx.recv().await;
174 let _ = otx.send(());
175 });
176 orx
177}