Skip to main content

kaizen/daemon/
mod.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2//! Local daemon client/lifecycle API.
3
4mod capture_status;
5mod proxy_task;
6mod scanner_task;
7mod server;
8mod supervisor;
9mod worker;
10
11use crate::core::paths::kaizen_dir;
12use crate::ipc::{
13    CaptureStatus, ClientHello, ClientKind, DaemonRequest, DaemonResponse, DaemonStatus,
14    ObservedSession, PROTO_VERSION, ProxyEndpoint, ServerHello, read_frame, write_frame,
15};
16use anyhow::{Context, Result, anyhow};
17use std::path::PathBuf;
18use std::process::{Command, Stdio};
19use std::time::{Duration, Instant};
20use tokio::net::UnixStream;
21
22const PID_FILE: &str = "daemon.pid";
23const SOCK_FILE: &str = "daemon.sock";
24const LOG_FILE: &str = "daemon.log";
25const START_WAIT_MS: u64 = 2_000;
26
27#[derive(Debug, Clone)]
28pub struct RuntimePaths {
29    pub dir: PathBuf,
30    pub pid: PathBuf,
31    pub sock: PathBuf,
32    pub log: PathBuf,
33}
34
35#[derive(Debug, Clone)]
36pub struct BackgroundStart {
37    pub pid: u32,
38    pub paths: RuntimePaths,
39    pub already_running: bool,
40}
41
42#[derive(Debug, Clone)]
43pub enum DaemonStatusOutcome {
44    Running(DaemonStatus),
45    Stopped { socket: PathBuf },
46}
47
48pub fn enabled() -> bool {
49    if let Ok(v) = std::env::var("KAIZEN_DAEMON") {
50        return v != "0";
51    }
52    std::env::args()
53        .next()
54        .and_then(|p| PathBuf::from(p).file_stem().map(|s| s.to_owned()))
55        .and_then(|s| s.to_str().map(str::to_string))
56        .is_some_and(|name| name == "kaizen")
57}
58
59pub fn runtime_paths() -> Result<RuntimePaths> {
60    let dir = kaizen_dir().ok_or_else(|| anyhow!("KAIZEN_HOME/HOME not set"))?;
61    Ok(RuntimePaths {
62        pid: dir.join(PID_FILE),
63        sock: dir.join(SOCK_FILE),
64        log: dir.join(LOG_FILE),
65        dir,
66    })
67}
68
69pub fn ensure_running() -> Result<()> {
70    if !enabled() || try_status().is_ok() {
71        return Ok(());
72    }
73    start_background().map(|_| ())
74}
75
76pub fn start_background() -> Result<BackgroundStart> {
77    let paths = runtime_paths()?;
78    if let Ok(status) = try_status() {
79        return Ok(BackgroundStart {
80            pid: status.pid,
81            paths,
82            already_running: true,
83        });
84    }
85    std::fs::create_dir_all(&paths.dir)?;
86    let log = std::fs::OpenOptions::new()
87        .create(true)
88        .append(true)
89        .open(&paths.log)
90        .with_context(|| format!("open daemon log: {}", paths.log.display()))?;
91    let err = log.try_clone()?;
92    let mut child = Command::new(std::env::current_exe()?)
93        .args(["daemon", "start"])
94        .stdin(Stdio::null())
95        .stdout(Stdio::from(log))
96        .stderr(Stdio::from(err))
97        .spawn()
98        .context("spawn kaizen daemon")?;
99    let deadline = Instant::now() + Duration::from_millis(START_WAIT_MS);
100    while Instant::now() < deadline {
101        if let Some(status) = child.try_wait().context("poll daemon child")? {
102            return Err(anyhow!(
103                "daemon exited before ready with status {status}; see {}",
104                paths.log.display()
105            ));
106        }
107        if let Ok(status) = try_status() {
108            return Ok(BackgroundStart {
109                pid: status.pid,
110                paths,
111                already_running: false,
112            });
113        }
114        std::thread::sleep(Duration::from_millis(25));
115    }
116    Err(anyhow!(
117        "daemon did not become ready at {}; see {}",
118        paths.sock.display(),
119        paths.log.display()
120    ))
121}
122
123pub fn request_blocking(request: DaemonRequest) -> Result<DaemonResponse> {
124    ensure_running()?;
125    tokio::runtime::Runtime::new()?.block_on(request_async(request))
126}
127
128pub fn try_status() -> Result<DaemonStatus> {
129    let response =
130        tokio::runtime::Runtime::new()?.block_on(request_async(DaemonRequest::Status))?;
131    match response {
132        DaemonResponse::Status(status) => Ok(status),
133        DaemonResponse::Error { message, .. } => Err(anyhow!(message)),
134        _ => Err(anyhow!("unexpected daemon status response")),
135    }
136}
137
138pub fn status_outcome() -> Result<DaemonStatusOutcome> {
139    match try_status() {
140        Ok(status) => Ok(DaemonStatusOutcome::Running(status)),
141        Err(err) if is_daemon_socket_connect_error(&err) => Ok(DaemonStatusOutcome::Stopped {
142            socket: runtime_paths()?.sock,
143        }),
144        Err(err) => Err(err),
145    }
146}
147
148fn is_daemon_socket_connect_error(err: &anyhow::Error) -> bool {
149    err.chain()
150        .any(|cause| cause.to_string().starts_with("connect daemon socket:"))
151}
152
153pub fn start_foreground() -> Result<()> {
154    tokio::runtime::Builder::new_multi_thread()
155        .enable_all()
156        .build()?
157        .block_on(server::run_server())
158}
159
160pub fn stop() -> Result<String> {
161    match tokio::runtime::Runtime::new()?.block_on(request_async(DaemonRequest::Stop))? {
162        DaemonResponse::Ack { message } => Ok(message),
163        DaemonResponse::Error { message, .. } => Err(anyhow!(message)),
164        _ => Err(anyhow!("unexpected daemon stop response")),
165    }
166}
167
168pub fn hello_blocking(client: ClientKind, workspace: Option<String>) -> Result<ServerHello> {
169    match request_blocking(DaemonRequest::Hello(ClientHello {
170        proto_version: PROTO_VERSION,
171        client,
172        workspace,
173    }))? {
174        DaemonResponse::Hello(hello) => Ok(hello),
175        DaemonResponse::Error { message, .. } => Err(anyhow!(message)),
176        _ => Err(anyhow!("unexpected daemon hello response")),
177    }
178}
179
180pub fn ensure_capture_blocking(workspace: String, deep: bool) -> Result<CaptureStatus> {
181    match request_blocking(DaemonRequest::EnsureWorkspaceCapture { workspace, deep })? {
182        DaemonResponse::CaptureStatus(status) => Ok(*status),
183        DaemonResponse::Error { message, .. } => Err(anyhow!(message)),
184        _ => Err(anyhow!("unexpected daemon capture response")),
185    }
186}
187
188pub fn ensure_proxy_blocking(workspace: String, provider: String) -> Result<ProxyEndpoint> {
189    match request_blocking(DaemonRequest::EnsureProxy {
190        workspace,
191        provider,
192    })? {
193        DaemonResponse::ProxyEndpoint(endpoint) => Ok(endpoint),
194        DaemonResponse::Error { message, .. } => Err(anyhow!(message)),
195        _ => Err(anyhow!("unexpected daemon proxy response")),
196    }
197}
198
199pub fn begin_observed_session_blocking(
200    workspace: String,
201    agent: String,
202) -> Result<ObservedSession> {
203    match request_blocking(DaemonRequest::BeginObservedSession { workspace, agent })? {
204        DaemonResponse::ObservedSession(session) => Ok(session),
205        DaemonResponse::Error { message, .. } => Err(anyhow!(message)),
206        _ => Err(anyhow!("unexpected daemon observe response")),
207    }
208}
209
210async fn request_async(request: DaemonRequest) -> Result<DaemonResponse> {
211    let paths = runtime_paths()?;
212    let mut stream = UnixStream::connect(&paths.sock)
213        .await
214        .with_context(|| format!("connect daemon socket: {}", paths.sock.display()))?;
215    write_frame(&mut stream, &request).await?;
216    read_frame(&mut stream).await
217}