Skip to main content

kaizen/daemon/
mod.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2//! Local daemon client/lifecycle API.
3
4mod server;
5mod worker;
6
7use crate::core::paths::kaizen_dir;
8use crate::ipc::{
9    ClientHello, ClientKind, DaemonRequest, DaemonResponse, DaemonStatus, PROTO_VERSION,
10    ServerHello, read_frame, write_frame,
11};
12use anyhow::{Context, Result, anyhow};
13use std::path::PathBuf;
14use std::process::{Command, Stdio};
15use std::time::{Duration, Instant};
16use tokio::net::UnixStream;
17
18const PID_FILE: &str = "daemon.pid";
19const SOCK_FILE: &str = "daemon.sock";
20const LOG_FILE: &str = "daemon.log";
21const START_WAIT_MS: u64 = 2_000;
22
23#[derive(Debug, Clone)]
24pub struct RuntimePaths {
25    pub dir: PathBuf,
26    pub pid: PathBuf,
27    pub sock: PathBuf,
28    pub log: PathBuf,
29}
30
31#[derive(Debug, Clone)]
32pub struct BackgroundStart {
33    pub pid: u32,
34    pub paths: RuntimePaths,
35    pub already_running: bool,
36}
37
38#[derive(Debug, Clone)]
39pub enum DaemonStatusOutcome {
40    Running(DaemonStatus),
41    Stopped { socket: PathBuf },
42}
43
44pub fn enabled() -> bool {
45    if let Ok(v) = std::env::var("KAIZEN_DAEMON") {
46        return v != "0";
47    }
48    std::env::args()
49        .next()
50        .and_then(|p| PathBuf::from(p).file_stem().map(|s| s.to_owned()))
51        .and_then(|s| s.to_str().map(str::to_string))
52        .is_some_and(|name| name == "kaizen")
53}
54
55pub fn runtime_paths() -> Result<RuntimePaths> {
56    let dir = kaizen_dir().ok_or_else(|| anyhow!("KAIZEN_HOME/HOME not set"))?;
57    Ok(RuntimePaths {
58        pid: dir.join(PID_FILE),
59        sock: dir.join(SOCK_FILE),
60        log: dir.join(LOG_FILE),
61        dir,
62    })
63}
64
65pub fn ensure_running() -> Result<()> {
66    if !enabled() || try_status().is_ok() {
67        return Ok(());
68    }
69    start_background().map(|_| ())
70}
71
72pub fn start_background() -> Result<BackgroundStart> {
73    let paths = runtime_paths()?;
74    if let Ok(status) = try_status() {
75        return Ok(BackgroundStart {
76            pid: status.pid,
77            paths,
78            already_running: true,
79        });
80    }
81    std::fs::create_dir_all(&paths.dir)?;
82    let log = std::fs::OpenOptions::new()
83        .create(true)
84        .append(true)
85        .open(&paths.log)
86        .with_context(|| format!("open daemon log: {}", paths.log.display()))?;
87    let err = log.try_clone()?;
88    let mut child = Command::new(std::env::current_exe()?)
89        .args(["daemon", "start"])
90        .stdin(Stdio::null())
91        .stdout(Stdio::from(log))
92        .stderr(Stdio::from(err))
93        .spawn()
94        .context("spawn kaizen daemon")?;
95    let deadline = Instant::now() + Duration::from_millis(START_WAIT_MS);
96    while Instant::now() < deadline {
97        if let Some(status) = child.try_wait().context("poll daemon child")? {
98            return Err(anyhow!(
99                "daemon exited before ready with status {status}; see {}",
100                paths.log.display()
101            ));
102        }
103        if let Ok(status) = try_status() {
104            return Ok(BackgroundStart {
105                pid: status.pid,
106                paths,
107                already_running: false,
108            });
109        }
110        std::thread::sleep(Duration::from_millis(25));
111    }
112    Err(anyhow!(
113        "daemon did not become ready at {}; see {}",
114        paths.sock.display(),
115        paths.log.display()
116    ))
117}
118
119pub fn request_blocking(request: DaemonRequest) -> Result<DaemonResponse> {
120    ensure_running()?;
121    tokio::runtime::Runtime::new()?.block_on(request_async(request))
122}
123
124pub fn try_status() -> Result<DaemonStatus> {
125    let response =
126        tokio::runtime::Runtime::new()?.block_on(request_async(DaemonRequest::Status))?;
127    match response {
128        DaemonResponse::Status(status) => Ok(status),
129        DaemonResponse::Error { message, .. } => Err(anyhow!(message)),
130        _ => Err(anyhow!("unexpected daemon status response")),
131    }
132}
133
134pub fn status_outcome() -> Result<DaemonStatusOutcome> {
135    match try_status() {
136        Ok(status) => Ok(DaemonStatusOutcome::Running(status)),
137        Err(err) if is_daemon_socket_connect_error(&err) => Ok(DaemonStatusOutcome::Stopped {
138            socket: runtime_paths()?.sock,
139        }),
140        Err(err) => Err(err),
141    }
142}
143
144fn is_daemon_socket_connect_error(err: &anyhow::Error) -> bool {
145    err.chain()
146        .any(|cause| cause.to_string().starts_with("connect daemon socket:"))
147}
148
149pub fn start_foreground() -> Result<()> {
150    tokio::runtime::Builder::new_multi_thread()
151        .enable_all()
152        .build()?
153        .block_on(server::run_server())
154}
155
156pub fn stop() -> Result<String> {
157    match tokio::runtime::Runtime::new()?.block_on(request_async(DaemonRequest::Stop))? {
158        DaemonResponse::Ack { message } => Ok(message),
159        DaemonResponse::Error { message, .. } => Err(anyhow!(message)),
160        _ => Err(anyhow!("unexpected daemon stop response")),
161    }
162}
163
164pub fn hello_blocking(client: ClientKind, workspace: Option<String>) -> Result<ServerHello> {
165    match request_blocking(DaemonRequest::Hello(ClientHello {
166        proto_version: PROTO_VERSION,
167        client,
168        workspace,
169    }))? {
170        DaemonResponse::Hello(hello) => Ok(hello),
171        DaemonResponse::Error { message, .. } => Err(anyhow!(message)),
172        _ => Err(anyhow!("unexpected daemon hello response")),
173    }
174}
175
176async fn request_async(request: DaemonRequest) -> Result<DaemonResponse> {
177    let paths = runtime_paths()?;
178    let mut stream = UnixStream::connect(&paths.sock)
179        .await
180        .with_context(|| format!("connect daemon socket: {}", paths.sock.display()))?;
181    write_frame(&mut stream, &request).await?;
182    read_frame(&mut stream).await
183}