1mod server;
5mod worker;
6
7use crate::core::paths::kaizen_dir;
8use crate::ipc::{
9 ClientHello, ClientKind, DaemonRequest, DaemonResponse, DaemonStatus, PROTO_VERSION,
10 ServerHello, read_frame, write_frame,
11};
12use anyhow::{Context, Result, anyhow};
13use std::path::PathBuf;
14use std::process::{Command, Stdio};
15use std::time::{Duration, Instant};
16use tokio::net::UnixStream;
17
// File names below are joined onto the kaizen directory by `runtime_paths`.
/// File name of the daemon pid file.
const PID_FILE: &str = "daemon.pid";
/// File name of the Unix socket that clients connect to.
const SOCK_FILE: &str = "daemon.sock";
/// File name of the log that receives the spawned daemon's stdout and stderr.
const LOG_FILE: &str = "daemon.log";
/// How long `start_background` polls for a freshly spawned daemon, in milliseconds.
const START_WAIT_MS: u64 = 2_000;
22
/// Filesystem locations used by the daemon, as built by `runtime_paths`.
#[derive(Debug, Clone)]
pub struct RuntimePaths {
    /// Base directory returned by `kaizen_dir()`; the other paths live inside it.
    pub dir: PathBuf,
    /// Path of the pid file (`daemon.pid` inside `dir`).
    pub pid: PathBuf,
    /// Path of the Unix socket (`daemon.sock` inside `dir`).
    pub sock: PathBuf,
    /// Path of the log file (`daemon.log` inside `dir`).
    pub log: PathBuf,
}
30
/// Result of `start_background`.
#[derive(Debug, Clone)]
pub struct BackgroundStart {
    /// Pid taken from the daemon's status response.
    pub pid: u32,
    /// Runtime paths that were resolved for this start attempt.
    pub paths: RuntimePaths,
    /// `true` when a daemon already answered a status request and nothing was spawned.
    pub already_running: bool,
}
37
/// Outcome of a status probe, as produced by `status_outcome`.
#[derive(Debug, Clone)]
pub enum DaemonStatusOutcome {
    /// The daemon answered; carries the status it reported.
    Running(DaemonStatus),
    /// Connecting to the daemon socket failed; `socket` is the path that was tried.
    Stopped { socket: PathBuf },
}
43
/// Reports whether the daemon should be used by this process.
///
/// If the `KAIZEN_DAEMON` environment variable is set (and valid Unicode), the
/// daemon is enabled for any value other than `"0"`. Otherwise it is enabled
/// only when the file stem of the first process argument is exactly `kaizen`.
pub fn enabled() -> bool {
    if let Ok(v) = std::env::var("KAIZEN_DAEMON") {
        return v != "0";
    }
    // `args_os` rather than `args`: the latter panics when the argument it
    // yields is not valid Unicode, and a program path may legitimately be so.
    // Comparing the `OsStr` stem directly also avoids two string allocations.
    std::env::args_os().next().is_some_and(|arg0| {
        PathBuf::from(arg0)
            .file_stem()
            .is_some_and(|stem| stem == "kaizen")
    })
}
54
55pub fn runtime_paths() -> Result<RuntimePaths> {
56 let dir = kaizen_dir().ok_or_else(|| anyhow!("KAIZEN_HOME/HOME not set"))?;
57 Ok(RuntimePaths {
58 pid: dir.join(PID_FILE),
59 sock: dir.join(SOCK_FILE),
60 log: dir.join(LOG_FILE),
61 dir,
62 })
63}
64
65pub fn ensure_running() -> Result<()> {
66 if !enabled() || try_status().is_ok() {
67 return Ok(());
68 }
69 start_background().map(|_| ())
70}
71
72pub fn start_background() -> Result<BackgroundStart> {
73 let paths = runtime_paths()?;
74 if let Ok(status) = try_status() {
75 return Ok(BackgroundStart {
76 pid: status.pid,
77 paths,
78 already_running: true,
79 });
80 }
81 std::fs::create_dir_all(&paths.dir)?;
82 let log = std::fs::OpenOptions::new()
83 .create(true)
84 .append(true)
85 .open(&paths.log)
86 .with_context(|| format!("open daemon log: {}", paths.log.display()))?;
87 let err = log.try_clone()?;
88 let mut child = Command::new(std::env::current_exe()?)
89 .args(["daemon", "start"])
90 .stdin(Stdio::null())
91 .stdout(Stdio::from(log))
92 .stderr(Stdio::from(err))
93 .spawn()
94 .context("spawn kaizen daemon")?;
95 let deadline = Instant::now() + Duration::from_millis(START_WAIT_MS);
96 while Instant::now() < deadline {
97 if let Some(status) = child.try_wait().context("poll daemon child")? {
98 return Err(anyhow!(
99 "daemon exited before ready with status {status}; see {}",
100 paths.log.display()
101 ));
102 }
103 if let Ok(status) = try_status() {
104 return Ok(BackgroundStart {
105 pid: status.pid,
106 paths,
107 already_running: false,
108 });
109 }
110 std::thread::sleep(Duration::from_millis(25));
111 }
112 Err(anyhow!(
113 "daemon did not become ready at {}; see {}",
114 paths.sock.display(),
115 paths.log.display()
116 ))
117}
118
119pub fn request_blocking(request: DaemonRequest) -> Result<DaemonResponse> {
120 ensure_running()?;
121 tokio::runtime::Runtime::new()?.block_on(request_async(request))
122}
123
124pub fn try_status() -> Result<DaemonStatus> {
125 let response =
126 tokio::runtime::Runtime::new()?.block_on(request_async(DaemonRequest::Status))?;
127 match response {
128 DaemonResponse::Status(status) => Ok(status),
129 DaemonResponse::Error { message, .. } => Err(anyhow!(message)),
130 _ => Err(anyhow!("unexpected daemon status response")),
131 }
132}
133
134pub fn status_outcome() -> Result<DaemonStatusOutcome> {
135 match try_status() {
136 Ok(status) => Ok(DaemonStatusOutcome::Running(status)),
137 Err(err) if is_daemon_socket_connect_error(&err) => Ok(DaemonStatusOutcome::Stopped {
138 socket: runtime_paths()?.sock,
139 }),
140 Err(err) => Err(err),
141 }
142}
143
144fn is_daemon_socket_connect_error(err: &anyhow::Error) -> bool {
145 err.chain()
146 .any(|cause| cause.to_string().starts_with("connect daemon socket:"))
147}
148
149pub fn start_foreground() -> Result<()> {
150 tokio::runtime::Builder::new_multi_thread()
151 .enable_all()
152 .build()?
153 .block_on(server::run_server())
154}
155
156pub fn stop() -> Result<String> {
157 match tokio::runtime::Runtime::new()?.block_on(request_async(DaemonRequest::Stop))? {
158 DaemonResponse::Ack { message } => Ok(message),
159 DaemonResponse::Error { message, .. } => Err(anyhow!(message)),
160 _ => Err(anyhow!("unexpected daemon stop response")),
161 }
162}
163
164pub fn hello_blocking(client: ClientKind, workspace: Option<String>) -> Result<ServerHello> {
165 match request_blocking(DaemonRequest::Hello(ClientHello {
166 proto_version: PROTO_VERSION,
167 client,
168 workspace,
169 }))? {
170 DaemonResponse::Hello(hello) => Ok(hello),
171 DaemonResponse::Error { message, .. } => Err(anyhow!(message)),
172 _ => Err(anyhow!("unexpected daemon hello response")),
173 }
174}
175
176async fn request_async(request: DaemonRequest) -> Result<DaemonResponse> {
177 let paths = runtime_paths()?;
178 let mut stream = UnixStream::connect(&paths.sock)
179 .await
180 .with_context(|| format!("connect daemon socket: {}", paths.sock.display()))?;
181 write_frame(&mut stream, &request).await?;
182 read_frame(&mut stream).await
183}