Skip to main content

agent_can/daemon/
lifecycle.rs

1use crate::daemon::config::{DaemonConfig, write_daemon_config};
2use crate::daemon::error::DaemonError;
3use crate::ipc;
4use crate::process::StartupLock;
5use std::io::Read;
6use std::path::{Path, PathBuf};
7use std::process::Stdio;
8use std::time::Duration;
9use tokio::time::sleep;
10
/// Stateless (zero-sized) handle for the daemon lifecycle operations in this module.
///
/// It carries no fields; its methods work on the well-known paths under [`runtime_root`],
/// so the handle is `Copy` and can be created anywhere via `SessionRegistry` or `Default`.
#[derive(Clone, Copy, Debug, Default)]
pub struct SessionRegistry;
13
14pub fn runtime_root() -> PathBuf {
15    dirs::home_dir()
16        .unwrap_or_else(|| PathBuf::from("."))
17        .join(".agent-can")
18}
19
20pub fn socket_path() -> PathBuf {
21    runtime_root().join("daemon.sock")
22}
23
24pub fn pid_path() -> PathBuf {
25    runtime_root().join("daemon.pid")
26}
27
28pub fn bootstrap_dir() -> PathBuf {
29    runtime_root().join("bootstrap")
30}
31
32fn startup_lock_path() -> PathBuf {
33    runtime_root().join("locks").join("daemon.lock")
34}
35
36pub async fn ensure_daemon_running() -> Result<(), DaemonError> {
37    SessionRegistry.ensure_running().await
38}
39
40pub async fn bootstrap_daemon(config: &DaemonConfig) -> Result<(), DaemonError> {
41    SessionRegistry.bootstrap(config).await
42}
43
44pub async fn wait_for_daemon_shutdown(timeout: Duration) -> Result<(), DaemonError> {
45    SessionRegistry.wait_for_shutdown(timeout).await
46}
47
48impl SessionRegistry {
49    pub async fn ensure_running(self) -> Result<(), DaemonError> {
50        std::fs::create_dir_all(runtime_root())?;
51        let socket = socket_path();
52        if can_connect(&socket).await {
53            return Ok(());
54        }
55        Err(DaemonError::NotRunning)
56    }
57
58    pub async fn bootstrap(self, config: &DaemonConfig) -> Result<(), DaemonError> {
59        let exe = std::env::current_exe()?;
60        self.bootstrap_with_exe(config, &exe).await
61    }
62
63    pub async fn wait_for_shutdown(self, timeout: Duration) -> Result<(), DaemonError> {
64        let deadline = std::time::Instant::now() + timeout;
65        let socket = socket_path();
66        loop {
67            if !can_connect(&socket).await {
68                return Ok(());
69            }
70            if std::time::Instant::now() >= deadline {
71                return Err(DaemonError::ShutdownTimeout);
72            }
73            sleep(Duration::from_millis(50)).await;
74        }
75    }
76
77    async fn bootstrap_with_exe(
78        self,
79        config: &DaemonConfig,
80        exe: &Path,
81    ) -> Result<(), DaemonError> {
82        std::fs::create_dir_all(runtime_root())?;
83        std::fs::create_dir_all(bootstrap_dir())?;
84        let _startup_lock =
85            StartupLock::acquire(startup_lock_path(), Duration::from_secs(10)).await?;
86        let socket = socket_path();
87        if can_connect(&socket).await {
88            return Err(DaemonError::AlreadyRunning);
89        }
90        let config_path = bootstrap_dir().join(format!("session-{}.json", uuid::Uuid::new_v4()));
91        write_daemon_config(&config_path, config)
92            .map_err(|err| DaemonError::Request(err.to_string()))?;
93        let mut child = match self.spawn_daemon_with_exe(exe, &config_path) {
94            Ok(child) => child,
95            Err(err) => {
96                let _ = std::fs::remove_file(&config_path);
97                return Err(err);
98            }
99        };
100
101        let timeout = Duration::from_secs(5);
102        let mut waited = Duration::from_millis(0);
103        while waited < timeout {
104            if read_pid() == Some(child.id()) && can_connect(&socket).await {
105                let _ = std::fs::remove_file(&config_path);
106                return Ok(());
107            }
108            if let Some(status) = child.try_wait()? {
109                let stderr = if let Some(mut pipe) = child.stderr.take() {
110                    tokio::task::spawn_blocking(move || {
111                        let mut stderr = String::new();
112                        let _ = pipe.read_to_string(&mut stderr);
113                        stderr
114                    })
115                    .await
116                    .unwrap_or_else(|_| String::new())
117                } else {
118                    String::new()
119                };
120                let _ = std::fs::remove_file(&config_path);
121                let details = stderr.trim();
122                let message = if details.is_empty() {
123                    format!("daemon exited with status {status}")
124                } else {
125                    details.to_string()
126                };
127                return Err(DaemonError::StartupFailed(message));
128            }
129            sleep(Duration::from_millis(100)).await;
130            waited += Duration::from_millis(100);
131        }
132        cleanup_bootstrap_timeout(&mut child, &config_path);
133        Err(DaemonError::StartupTimeout)
134    }
135
136    fn spawn_daemon_with_exe(
137        self,
138        exe: &Path,
139        config_path: &Path,
140    ) -> Result<std::process::Child, DaemonError> {
141        let mut command = std::process::Command::new(exe);
142        command
143            .arg("__internal")
144            .arg("session-daemon")
145            .arg("--config-path")
146            .arg(config_path)
147            .stdout(Stdio::null())
148            .stderr(Stdio::piped());
149        Ok(command.spawn()?)
150    }
151}
152
153pub fn read_pid() -> Option<u32> {
154    std::fs::read_to_string(pid_path())
155        .ok()
156        .and_then(|value| value.trim().parse::<u32>().ok())
157}
158
/// Best-effort teardown after a failed daemon startup.
///
/// Deletes the bootstrap config file, then kills and reaps `child`. Every error is
/// deliberately ignored (e.g. the file is already gone or the child has already exited).
fn cleanup_bootstrap_timeout(child: &mut std::process::Child, config_path: &Path) {
    std::fs::remove_file(config_path).ok();
    child.kill().ok();
    // Blocks until the child is reaped; it was just killed, so this should be brief.
    child.wait().ok();
}
164
165async fn can_connect(socket: &Path) -> bool {
166    ipc::connect(socket).await.is_ok()
167}