Skip to main content

agent_can/daemon/
lifecycle.rs

1use crate::daemon::config::{DaemonConfig, write_daemon_config};
2use crate::daemon::error::DaemonError;
3use crate::ipc;
4use crate::name::validate_bus_name;
5use crate::process::StartupLock;
6use std::io::Read;
7use std::path::{Path, PathBuf};
8use std::process::Stdio;
9use std::time::Duration;
10use tokio::time::sleep;
11
12#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize)]
13pub struct BusRuntimeInfo {
14    pub bus: String,
15    pub socket_path: PathBuf,
16    pub running: bool,
17}
18
/// Stateless handle whose methods query and start per-bus daemons.
///
/// It carries no data, so it is freely copyable and its methods take `self`
/// by value.
#[derive(Clone, Copy, Debug, Default)]
pub struct BusRegistry;
21
22pub fn runtime_root() -> PathBuf {
23    if let Some(path) = std::env::var_os("AGENT_CAN_HOME") {
24        return PathBuf::from(path);
25    }
26
27    dirs::home_dir()
28        .unwrap_or_else(|| PathBuf::from("."))
29        .join(".agent-can")
30}
31
32pub fn socket_path(bus: &str) -> PathBuf {
33    runtime_root().join(format!("{bus}.sock"))
34}
35
36pub fn pid_path(bus: &str) -> PathBuf {
37    runtime_root().join(format!("{bus}.pid"))
38}
39
40pub fn bootstrap_dir() -> PathBuf {
41    runtime_root().join("bootstrap")
42}
43
44fn startup_lock_path(bus: &str) -> PathBuf {
45    runtime_root()
46        .join("locks")
47        .join(format!("{bus}.daemon.lock"))
48}
49
50pub async fn ensure_daemon_running(bus: &str) -> Result<(), DaemonError> {
51    BusRegistry.ensure_running(bus).await
52}
53
54pub async fn bootstrap_daemon(bus: &str, config: &DaemonConfig) -> Result<(), DaemonError> {
55    BusRegistry.bootstrap(bus, config).await
56}
57
58impl BusRegistry {
59    pub async fn ensure_running(self, bus: &str) -> Result<(), DaemonError> {
60        validate_bus_name(bus).map_err(DaemonError::Request)?;
61        std::fs::create_dir_all(runtime_root())?;
62        let socket = socket_path(bus);
63        if can_connect(&socket).await {
64            return Ok(());
65        }
66        Err(DaemonError::NotRunning(bus.to_string()))
67    }
68
69    pub async fn bootstrap(self, bus: &str, config: &DaemonConfig) -> Result<(), DaemonError> {
70        let exe = std::env::current_exe()?;
71        self.bootstrap_with_exe(bus, config, &exe).await
72    }
73
74    async fn bootstrap_with_exe(
75        self,
76        bus: &str,
77        config: &DaemonConfig,
78        exe: &Path,
79    ) -> Result<(), DaemonError> {
80        validate_bus_name(bus).map_err(DaemonError::Request)?;
81        std::fs::create_dir_all(runtime_root())?;
82        std::fs::create_dir_all(bootstrap_dir())?;
83        let _startup_lock =
84            StartupLock::acquire(startup_lock_path(bus), Duration::from_secs(10)).await?;
85        let socket = socket_path(bus);
86        if can_connect(&socket).await {
87            return Err(DaemonError::AlreadyRunning(bus.to_string()));
88        }
89        let config_path = bootstrap_dir().join(format!("{bus}-{}.json", uuid::Uuid::new_v4()));
90        write_daemon_config(&config_path, config)
91            .map_err(|err| DaemonError::Request(err.to_string()))?;
92        let mut child = match self.spawn_daemon_with_exe(exe, bus, &config_path) {
93            Ok(child) => child,
94            Err(err) => {
95                let _ = std::fs::remove_file(&config_path);
96                return Err(err);
97            }
98        };
99
100        let timeout = Duration::from_secs(5);
101        let mut waited = Duration::from_millis(0);
102        while waited < timeout {
103            if read_pid(bus) == Some(child.id()) && can_connect(&socket).await {
104                let _ = std::fs::remove_file(&config_path);
105                return Ok(());
106            }
107            if let Some(status) = child.try_wait()? {
108                let stderr = if let Some(mut pipe) = child.stderr.take() {
109                    tokio::task::spawn_blocking(move || {
110                        let mut stderr = String::new();
111                        let _ = pipe.read_to_string(&mut stderr);
112                        stderr
113                    })
114                    .await
115                    .unwrap_or_else(|_| String::new())
116                } else {
117                    String::new()
118                };
119                let _ = std::fs::remove_file(&config_path);
120                let details = stderr.trim();
121                let message = if details.is_empty() {
122                    format!("daemon exited with status {status}")
123                } else {
124                    details.to_string()
125                };
126                return Err(DaemonError::StartupFailed(message));
127            }
128            sleep(Duration::from_millis(100)).await;
129            waited += Duration::from_millis(100);
130        }
131        cleanup_bootstrap_timeout(&mut child, &config_path);
132        Err(DaemonError::StartupTimeout)
133    }
134
135    fn spawn_daemon_with_exe(
136        self,
137        exe: &Path,
138        bus: &str,
139        config_path: &Path,
140    ) -> Result<std::process::Child, DaemonError> {
141        let mut command = std::process::Command::new(exe);
142        command
143            .arg("__internal")
144            .arg("bus-daemon")
145            .arg("--bus")
146            .arg(bus)
147            .arg("--config-path")
148            .arg(config_path)
149            .stdout(Stdio::null())
150            .stderr(Stdio::piped());
151        Ok(command.spawn()?)
152    }
153
154    pub async fn list_buses(self) -> Result<Vec<BusRuntimeInfo>, DaemonError> {
155        let root = runtime_root();
156        std::fs::create_dir_all(&root)?;
157        let mut out = Vec::new();
158        for entry in std::fs::read_dir(root)? {
159            let entry = entry?;
160            let path = entry.path();
161            if path.extension().and_then(|value| value.to_str()) != Some("sock") {
162                continue;
163            }
164            let Some(stem) = path.file_stem().and_then(|value| value.to_str()) else {
165                continue;
166            };
167            out.push(BusRuntimeInfo {
168                bus: stem.to_string(),
169                running: can_connect(&path).await,
170                socket_path: path,
171            });
172        }
173        out.sort_by(|lhs, rhs| lhs.bus.cmp(&rhs.bus));
174        Ok(out)
175    }
176}
177
178pub async fn list_buses() -> Result<Vec<BusRuntimeInfo>, DaemonError> {
179    BusRegistry.list_buses().await
180}
181
182pub fn cleanup_runtime_artifacts(bus: &str) {
183    ipc::cleanup_endpoint(&socket_path(bus));
184    let pid = pid_path(bus);
185    if pid.exists() {
186        let _ = std::fs::remove_file(pid);
187    }
188}
189
190pub fn read_pid(bus: &str) -> Option<u32> {
191    std::fs::read_to_string(pid_path(bus))
192        .ok()
193        .and_then(|value| value.trim().parse::<u32>().ok())
194}
195
/// Abandons a bootstrap attempt: deletes the temporary config file, then
/// kills and reaps the child.
///
/// Every step is best effort; individual failures are ignored.
fn cleanup_bootstrap_timeout(child: &mut std::process::Child, config_path: &Path) {
    std::fs::remove_file(config_path).ok();
    child.kill().ok();
    // Reap the child so it does not linger as a zombie.
    child.wait().ok();
}
201
202async fn can_connect(socket: &Path) -> bool {
203    ipc::connect(socket).await.is_ok()
204}