agent_can/daemon/
lifecycle.rs1use crate::daemon::config::{DaemonConfig, write_daemon_config};
2use crate::daemon::error::DaemonError;
3use crate::ipc;
4use crate::name::validate_bus_name;
5use crate::process::StartupLock;
6use std::io::Read;
7use std::path::{Path, PathBuf};
8use std::process::Stdio;
9use std::time::Duration;
10use tokio::time::sleep;
11
12#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize)]
13pub struct BusRuntimeInfo {
14 pub bus: String,
15 pub socket_path: PathBuf,
16 pub running: bool,
17}
18
/// Handle for starting, probing and listing per-bus daemons.
///
/// Carries no data of its own: every operation works against files under
/// `runtime_root()`, so the handle is a zero-sized `Copy` value.
#[derive(Clone, Copy, Debug, Default)]
pub struct BusRegistry;
21
22pub fn runtime_root() -> PathBuf {
23 if let Some(path) = std::env::var_os("AGENT_CAN_HOME") {
24 return PathBuf::from(path);
25 }
26
27 dirs::home_dir()
28 .unwrap_or_else(|| PathBuf::from("."))
29 .join(".agent-can")
30}
31
32pub fn socket_path(bus: &str) -> PathBuf {
33 runtime_root().join(format!("{bus}.sock"))
34}
35
36pub fn pid_path(bus: &str) -> PathBuf {
37 runtime_root().join(format!("{bus}.pid"))
38}
39
40pub fn bootstrap_dir() -> PathBuf {
41 runtime_root().join("bootstrap")
42}
43
44fn startup_lock_path(bus: &str) -> PathBuf {
45 runtime_root()
46 .join("locks")
47 .join(format!("{bus}.daemon.lock"))
48}
49
50pub async fn ensure_daemon_running(bus: &str) -> Result<(), DaemonError> {
51 BusRegistry.ensure_running(bus).await
52}
53
54pub async fn bootstrap_daemon(bus: &str, config: &DaemonConfig) -> Result<(), DaemonError> {
55 BusRegistry.bootstrap(bus, config).await
56}
57
58impl BusRegistry {
59 pub async fn ensure_running(self, bus: &str) -> Result<(), DaemonError> {
60 validate_bus_name(bus).map_err(DaemonError::Request)?;
61 std::fs::create_dir_all(runtime_root())?;
62 let socket = socket_path(bus);
63 if can_connect(&socket).await {
64 return Ok(());
65 }
66 Err(DaemonError::NotRunning(bus.to_string()))
67 }
68
69 pub async fn bootstrap(self, bus: &str, config: &DaemonConfig) -> Result<(), DaemonError> {
70 let exe = std::env::current_exe()?;
71 self.bootstrap_with_exe(bus, config, &exe).await
72 }
73
74 async fn bootstrap_with_exe(
75 self,
76 bus: &str,
77 config: &DaemonConfig,
78 exe: &Path,
79 ) -> Result<(), DaemonError> {
80 validate_bus_name(bus).map_err(DaemonError::Request)?;
81 std::fs::create_dir_all(runtime_root())?;
82 std::fs::create_dir_all(bootstrap_dir())?;
83 let _startup_lock =
84 StartupLock::acquire(startup_lock_path(bus), Duration::from_secs(10)).await?;
85 let socket = socket_path(bus);
86 if can_connect(&socket).await {
87 return Err(DaemonError::AlreadyRunning(bus.to_string()));
88 }
89 let config_path = bootstrap_dir().join(format!("{bus}-{}.json", uuid::Uuid::new_v4()));
90 write_daemon_config(&config_path, config)
91 .map_err(|err| DaemonError::Request(err.to_string()))?;
92 let mut child = match self.spawn_daemon_with_exe(exe, bus, &config_path) {
93 Ok(child) => child,
94 Err(err) => {
95 let _ = std::fs::remove_file(&config_path);
96 return Err(err);
97 }
98 };
99
100 let timeout = Duration::from_secs(5);
101 let mut waited = Duration::from_millis(0);
102 while waited < timeout {
103 if read_pid(bus) == Some(child.id()) && can_connect(&socket).await {
104 let _ = std::fs::remove_file(&config_path);
105 return Ok(());
106 }
107 if let Some(status) = child.try_wait()? {
108 let stderr = if let Some(mut pipe) = child.stderr.take() {
109 tokio::task::spawn_blocking(move || {
110 let mut stderr = String::new();
111 let _ = pipe.read_to_string(&mut stderr);
112 stderr
113 })
114 .await
115 .unwrap_or_else(|_| String::new())
116 } else {
117 String::new()
118 };
119 let _ = std::fs::remove_file(&config_path);
120 let details = stderr.trim();
121 let message = if details.is_empty() {
122 format!("daemon exited with status {status}")
123 } else {
124 details.to_string()
125 };
126 return Err(DaemonError::StartupFailed(message));
127 }
128 sleep(Duration::from_millis(100)).await;
129 waited += Duration::from_millis(100);
130 }
131 cleanup_bootstrap_timeout(&mut child, &config_path);
132 Err(DaemonError::StartupTimeout)
133 }
134
135 fn spawn_daemon_with_exe(
136 self,
137 exe: &Path,
138 bus: &str,
139 config_path: &Path,
140 ) -> Result<std::process::Child, DaemonError> {
141 let mut command = std::process::Command::new(exe);
142 command
143 .arg("__internal")
144 .arg("bus-daemon")
145 .arg("--bus")
146 .arg(bus)
147 .arg("--config-path")
148 .arg(config_path)
149 .stdout(Stdio::null())
150 .stderr(Stdio::piped());
151 Ok(command.spawn()?)
152 }
153
154 pub async fn list_buses(self) -> Result<Vec<BusRuntimeInfo>, DaemonError> {
155 let root = runtime_root();
156 std::fs::create_dir_all(&root)?;
157 let mut out = Vec::new();
158 for entry in std::fs::read_dir(root)? {
159 let entry = entry?;
160 let path = entry.path();
161 if path.extension().and_then(|value| value.to_str()) != Some("sock") {
162 continue;
163 }
164 let Some(stem) = path.file_stem().and_then(|value| value.to_str()) else {
165 continue;
166 };
167 out.push(BusRuntimeInfo {
168 bus: stem.to_string(),
169 running: can_connect(&path).await,
170 socket_path: path,
171 });
172 }
173 out.sort_by(|lhs, rhs| lhs.bus.cmp(&rhs.bus));
174 Ok(out)
175 }
176}
177
178pub async fn list_buses() -> Result<Vec<BusRuntimeInfo>, DaemonError> {
179 BusRegistry.list_buses().await
180}
181
182pub fn cleanup_runtime_artifacts(bus: &str) {
183 ipc::cleanup_endpoint(&socket_path(bus));
184 let pid = pid_path(bus);
185 if pid.exists() {
186 let _ = std::fs::remove_file(pid);
187 }
188}
189
190pub fn read_pid(bus: &str) -> Option<u32> {
191 std::fs::read_to_string(pid_path(bus))
192 .ok()
193 .and_then(|value| value.trim().parse::<u32>().ok())
194}
195
/// Tears down a bootstrap attempt that will not be waited on any further.
///
/// Deletes the temporary config file, kills the child, and reaps it with `wait` so it
/// does not linger as a zombie. Every step is best-effort and its error is ignored,
/// for example when the child has already exited.
fn cleanup_bootstrap_timeout(child: &mut std::process::Child, config_path: &Path) {
    std::fs::remove_file(config_path).ok();
    child.kill().ok();
    child.wait().ok();
}
201
202async fn can_connect(socket: &Path) -> bool {
203 ipc::connect(socket).await.is_ok()
204}