Skip to main content

kaizen/daemon/
mod.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2//! Local daemon client/lifecycle API.
3
4mod capture_status;
5mod proxy_task;
6mod scanner_task;
7mod server;
8mod supervisor;
9mod worker;
10
11use crate::core::paths::kaizen_dir;
12use crate::ipc::{
13    CaptureStatus, ClientHello, ClientKind, DaemonRequest, DaemonResponse, DaemonStatus,
14    ObservedSession, PROTO_VERSION, ProxyEndpoint, ServerHello, WebEndpoint, read_frame,
15    write_frame,
16};
17use anyhow::{Context, Result, anyhow};
18use std::path::PathBuf;
19use std::process::{Command, Stdio};
20use std::time::{Duration, Instant};
21use tokio::net::UnixStream;
22
23const PID_FILE: &str = "daemon.pid";
24const SOCK_FILE: &str = "daemon.sock";
25const LOG_FILE: &str = "daemon.log";
26const START_WAIT_MS: u64 = 2_000;
27
28#[derive(Debug, Clone)]
29pub struct RuntimePaths {
30    pub dir: PathBuf,
31    pub pid: PathBuf,
32    pub sock: PathBuf,
33    pub log: PathBuf,
34}
35
36#[derive(Debug, Clone)]
37pub struct BackgroundStart {
38    pub pid: u32,
39    pub paths: RuntimePaths,
40    pub already_running: bool,
41    pub web: Option<WebEndpoint>,
42}
43
44#[derive(Debug, Clone)]
45pub enum DaemonStatusOutcome {
46    Running(DaemonStatus),
47    Stopped { socket: PathBuf },
48}
49
50pub fn enabled() -> bool {
51    if let Ok(v) = std::env::var("KAIZEN_DAEMON") {
52        return v != "0";
53    }
54    std::env::args()
55        .next()
56        .and_then(|p| PathBuf::from(p).file_stem().map(|s| s.to_owned()))
57        .and_then(|s| s.to_str().map(str::to_string))
58        .is_some_and(|name| name == "kaizen")
59}
60
61pub fn runtime_paths() -> Result<RuntimePaths> {
62    let dir = kaizen_dir().ok_or_else(|| anyhow!("KAIZEN_HOME/HOME not set"))?;
63    Ok(RuntimePaths {
64        pid: dir.join(PID_FILE),
65        sock: dir.join(SOCK_FILE),
66        log: dir.join(LOG_FILE),
67        dir,
68    })
69}
70
71pub fn ensure_running() -> Result<()> {
72    if !enabled() || try_status().is_ok() {
73        return Ok(());
74    }
75    start_background().map(|_| ())
76}
77
78pub fn start_background() -> Result<BackgroundStart> {
79    let paths = runtime_paths()?;
80    if let Ok(status) = try_status() {
81        return Ok(BackgroundStart {
82            pid: status.pid,
83            paths,
84            already_running: true,
85            web: status.web,
86        });
87    }
88    std::fs::create_dir_all(&paths.dir)?;
89    let log = std::fs::OpenOptions::new()
90        .create(true)
91        .append(true)
92        .open(&paths.log)
93        .with_context(|| format!("open daemon log: {}", paths.log.display()))?;
94    let err = log.try_clone()?;
95    let mut child = Command::new(std::env::current_exe()?)
96        .args(["daemon", "start"])
97        .stdin(Stdio::null())
98        .stdout(Stdio::from(log))
99        .stderr(Stdio::from(err))
100        .spawn()
101        .context("spawn kaizen daemon")?;
102    let deadline = Instant::now() + Duration::from_millis(START_WAIT_MS);
103    while Instant::now() < deadline {
104        if let Some(status) = child.try_wait().context("poll daemon child")? {
105            return Err(anyhow!(
106                "daemon exited before ready with status {status}; see {}",
107                paths.log.display()
108            ));
109        }
110        if let Ok(status) = try_status() {
111            return Ok(BackgroundStart {
112                pid: status.pid,
113                paths,
114                already_running: false,
115                web: status.web,
116            });
117        }
118        std::thread::sleep(Duration::from_millis(25));
119    }
120    Err(anyhow!(
121        "daemon did not become ready at {}; see {}",
122        paths.sock.display(),
123        paths.log.display()
124    ))
125}
126
127pub fn request_blocking(request: DaemonRequest) -> Result<DaemonResponse> {
128    ensure_running()?;
129    tokio::runtime::Runtime::new()?.block_on(request_async(request))
130}
131
132pub fn try_status() -> Result<DaemonStatus> {
133    let response =
134        tokio::runtime::Runtime::new()?.block_on(request_async(DaemonRequest::Status))?;
135    match response {
136        DaemonResponse::Status(status) => Ok(status),
137        DaemonResponse::Error { message, .. } => Err(anyhow!(message)),
138        _ => Err(anyhow!("unexpected daemon status response")),
139    }
140}
141
142pub fn status_outcome() -> Result<DaemonStatusOutcome> {
143    match try_status() {
144        Ok(status) => Ok(DaemonStatusOutcome::Running(status)),
145        Err(err) if is_daemon_socket_connect_error(&err) => Ok(DaemonStatusOutcome::Stopped {
146            socket: runtime_paths()?.sock,
147        }),
148        Err(err) => Err(err),
149    }
150}
151
152fn is_daemon_socket_connect_error(err: &anyhow::Error) -> bool {
153    err.chain()
154        .any(|cause| cause.to_string().starts_with("connect daemon socket:"))
155}
156
157pub fn start_foreground() -> Result<()> {
158    tokio::runtime::Builder::new_multi_thread()
159        .enable_all()
160        .build()?
161        .block_on(server::run_server())
162}
163
164pub fn stop() -> Result<String> {
165    match tokio::runtime::Runtime::new()?.block_on(request_async(DaemonRequest::Stop))? {
166        DaemonResponse::Ack { message } => Ok(message),
167        DaemonResponse::Error { message, .. } => Err(anyhow!(message)),
168        _ => Err(anyhow!("unexpected daemon stop response")),
169    }
170}
171
172pub fn hello_blocking(client: ClientKind, workspace: Option<String>) -> Result<ServerHello> {
173    match request_blocking(DaemonRequest::Hello(ClientHello {
174        proto_version: PROTO_VERSION,
175        client,
176        workspace,
177    }))? {
178        DaemonResponse::Hello(hello) => Ok(hello),
179        DaemonResponse::Error { message, .. } => Err(anyhow!(message)),
180        _ => Err(anyhow!("unexpected daemon hello response")),
181    }
182}
183
184pub fn ensure_capture_blocking(workspace: String, deep: bool) -> Result<CaptureStatus> {
185    match request_blocking(DaemonRequest::EnsureWorkspaceCapture { workspace, deep })? {
186        DaemonResponse::CaptureStatus(status) => Ok(*status),
187        DaemonResponse::Error { message, .. } => Err(anyhow!(message)),
188        _ => Err(anyhow!("unexpected daemon capture response")),
189    }
190}
191
192pub fn ensure_proxy_blocking(workspace: String, provider: String) -> Result<ProxyEndpoint> {
193    match request_blocking(DaemonRequest::EnsureProxy {
194        workspace,
195        provider,
196    })? {
197        DaemonResponse::ProxyEndpoint(endpoint) => Ok(endpoint),
198        DaemonResponse::Error { message, .. } => Err(anyhow!(message)),
199        _ => Err(anyhow!("unexpected daemon proxy response")),
200    }
201}
202
203pub fn begin_observed_session_blocking(
204    workspace: String,
205    agent: String,
206) -> Result<ObservedSession> {
207    match request_blocking(DaemonRequest::BeginObservedSession { workspace, agent })? {
208        DaemonResponse::ObservedSession(session) => Ok(session),
209        DaemonResponse::Error { message, .. } => Err(anyhow!(message)),
210        _ => Err(anyhow!("unexpected daemon observe response")),
211    }
212}
213
214async fn request_async(request: DaemonRequest) -> Result<DaemonResponse> {
215    let paths = runtime_paths()?;
216    let mut stream = UnixStream::connect(&paths.sock)
217        .await
218        .with_context(|| format!("connect daemon socket: {}", paths.sock.display()))?;
219    write_frame(&mut stream, &request).await?;
220    read_frame(&mut stream).await
221}