1mod capture_status;
5mod proxy_task;
6mod scanner_task;
7mod server;
8mod supervisor;
9mod worker;
10
11use crate::core::paths::kaizen_dir;
12use crate::ipc::{
13 CaptureStatus, ClientHello, ClientKind, DaemonRequest, DaemonResponse, DaemonStatus,
14 ObservedSession, PROTO_VERSION, ProxyEndpoint, ServerHello, read_frame, write_frame,
15};
16use anyhow::{Context, Result, anyhow};
17use std::path::PathBuf;
18use std::process::{Command, Stdio};
19use std::time::{Duration, Instant};
20use tokio::net::UnixStream;
21
/// File name of the daemon pid file, joined onto the kaizen directory by `runtime_paths`.
const PID_FILE: &str = "daemon.pid";
/// File name of the daemon's Unix socket, joined onto the kaizen directory by `runtime_paths`.
const SOCK_FILE: &str = "daemon.sock";
/// File name of the log that receives the spawned daemon's stdout and stderr.
const LOG_FILE: &str = "daemon.log";
/// How long `start_background` waits for a freshly spawned daemon to answer a
/// status request, in milliseconds.
const START_WAIT_MS: u64 = 2_000;
26
/// Filesystem locations the daemon and its clients agree on.
///
/// Built by `runtime_paths`; every file path is a child of `dir`.
#[derive(Debug, Clone)]
pub struct RuntimePaths {
    /// The kaizen directory that holds all runtime files.
    pub dir: PathBuf,
    /// `dir/daemon.pid`.
    // NOTE(review): nothing in this file writes the pid file — presumably the
    // server module does; confirm.
    pub pid: PathBuf,
    /// `dir/daemon.sock`, the Unix socket clients connect to.
    pub sock: PathBuf,
    /// `dir/daemon.log`, where a background daemon's stdout/stderr are appended.
    pub log: PathBuf,
}
34
/// Result of `start_background`.
#[derive(Debug, Clone)]
pub struct BackgroundStart {
    /// Pid taken from the daemon's own status response.
    pub pid: u32,
    /// Runtime paths resolved for this start attempt.
    pub paths: RuntimePaths,
    /// `true` when a daemon already answered a status request, so no new
    /// process was spawned; `false` when this call spawned it.
    pub already_running: bool,
}
41
/// Outcome of a status probe that treats "cannot connect" as a normal state
/// rather than an error (see `status_outcome`).
#[derive(Debug, Clone)]
pub enum DaemonStatusOutcome {
    /// The daemon answered the status request.
    Running(DaemonStatus),
    /// Connecting to the daemon socket failed; `socket` is the socket path
    /// from `runtime_paths`.
    Stopped { socket: PathBuf },
}
47
/// Reports whether this process should use the daemon.
///
/// Decision order:
/// 1. If `KAIZEN_DAEMON` is set (and valid Unicode), it decides: any value
///    other than `"0"` enables the daemon.
/// 2. Otherwise the daemon is enabled only when the file stem of the first
///    process argument (the executable path) is exactly `kaizen`.
///
/// Never panics: a missing or non-Unicode first argument yields `false`.
pub fn enabled() -> bool {
    if let Ok(value) = std::env::var("KAIZEN_DAEMON") {
        return value != "0";
    }
    // `args_os` instead of `args`: the latter panics while yielding an
    // argument that is not valid Unicode, and argv[0] is an arbitrary path.
    std::env::args_os().next().is_some_and(|arg0| {
        PathBuf::from(arg0)
            .file_stem()
            .is_some_and(|stem| stem == "kaizen")
    })
}
58
59pub fn runtime_paths() -> Result<RuntimePaths> {
60 let dir = kaizen_dir().ok_or_else(|| anyhow!("KAIZEN_HOME/HOME not set"))?;
61 Ok(RuntimePaths {
62 pid: dir.join(PID_FILE),
63 sock: dir.join(SOCK_FILE),
64 log: dir.join(LOG_FILE),
65 dir,
66 })
67}
68
69pub fn ensure_running() -> Result<()> {
70 if !enabled() || try_status().is_ok() {
71 return Ok(());
72 }
73 start_background().map(|_| ())
74}
75
76pub fn start_background() -> Result<BackgroundStart> {
77 let paths = runtime_paths()?;
78 if let Ok(status) = try_status() {
79 return Ok(BackgroundStart {
80 pid: status.pid,
81 paths,
82 already_running: true,
83 });
84 }
85 std::fs::create_dir_all(&paths.dir)?;
86 let log = std::fs::OpenOptions::new()
87 .create(true)
88 .append(true)
89 .open(&paths.log)
90 .with_context(|| format!("open daemon log: {}", paths.log.display()))?;
91 let err = log.try_clone()?;
92 let mut child = Command::new(std::env::current_exe()?)
93 .args(["daemon", "start"])
94 .stdin(Stdio::null())
95 .stdout(Stdio::from(log))
96 .stderr(Stdio::from(err))
97 .spawn()
98 .context("spawn kaizen daemon")?;
99 let deadline = Instant::now() + Duration::from_millis(START_WAIT_MS);
100 while Instant::now() < deadline {
101 if let Some(status) = child.try_wait().context("poll daemon child")? {
102 return Err(anyhow!(
103 "daemon exited before ready with status {status}; see {}",
104 paths.log.display()
105 ));
106 }
107 if let Ok(status) = try_status() {
108 return Ok(BackgroundStart {
109 pid: status.pid,
110 paths,
111 already_running: false,
112 });
113 }
114 std::thread::sleep(Duration::from_millis(25));
115 }
116 Err(anyhow!(
117 "daemon did not become ready at {}; see {}",
118 paths.sock.display(),
119 paths.log.display()
120 ))
121}
122
123pub fn request_blocking(request: DaemonRequest) -> Result<DaemonResponse> {
124 ensure_running()?;
125 tokio::runtime::Runtime::new()?.block_on(request_async(request))
126}
127
128pub fn try_status() -> Result<DaemonStatus> {
129 let response =
130 tokio::runtime::Runtime::new()?.block_on(request_async(DaemonRequest::Status))?;
131 match response {
132 DaemonResponse::Status(status) => Ok(status),
133 DaemonResponse::Error { message, .. } => Err(anyhow!(message)),
134 _ => Err(anyhow!("unexpected daemon status response")),
135 }
136}
137
138pub fn status_outcome() -> Result<DaemonStatusOutcome> {
139 match try_status() {
140 Ok(status) => Ok(DaemonStatusOutcome::Running(status)),
141 Err(err) if is_daemon_socket_connect_error(&err) => Ok(DaemonStatusOutcome::Stopped {
142 socket: runtime_paths()?.sock,
143 }),
144 Err(err) => Err(err),
145 }
146}
147
148fn is_daemon_socket_connect_error(err: &anyhow::Error) -> bool {
149 err.chain()
150 .any(|cause| cause.to_string().starts_with("connect daemon socket:"))
151}
152
153pub fn start_foreground() -> Result<()> {
154 tokio::runtime::Builder::new_multi_thread()
155 .enable_all()
156 .build()?
157 .block_on(server::run_server())
158}
159
160pub fn stop() -> Result<String> {
161 match tokio::runtime::Runtime::new()?.block_on(request_async(DaemonRequest::Stop))? {
162 DaemonResponse::Ack { message } => Ok(message),
163 DaemonResponse::Error { message, .. } => Err(anyhow!(message)),
164 _ => Err(anyhow!("unexpected daemon stop response")),
165 }
166}
167
168pub fn hello_blocking(client: ClientKind, workspace: Option<String>) -> Result<ServerHello> {
169 match request_blocking(DaemonRequest::Hello(ClientHello {
170 proto_version: PROTO_VERSION,
171 client,
172 workspace,
173 }))? {
174 DaemonResponse::Hello(hello) => Ok(hello),
175 DaemonResponse::Error { message, .. } => Err(anyhow!(message)),
176 _ => Err(anyhow!("unexpected daemon hello response")),
177 }
178}
179
180pub fn ensure_capture_blocking(workspace: String, deep: bool) -> Result<CaptureStatus> {
181 match request_blocking(DaemonRequest::EnsureWorkspaceCapture { workspace, deep })? {
182 DaemonResponse::CaptureStatus(status) => Ok(*status),
183 DaemonResponse::Error { message, .. } => Err(anyhow!(message)),
184 _ => Err(anyhow!("unexpected daemon capture response")),
185 }
186}
187
188pub fn ensure_proxy_blocking(workspace: String, provider: String) -> Result<ProxyEndpoint> {
189 match request_blocking(DaemonRequest::EnsureProxy {
190 workspace,
191 provider,
192 })? {
193 DaemonResponse::ProxyEndpoint(endpoint) => Ok(endpoint),
194 DaemonResponse::Error { message, .. } => Err(anyhow!(message)),
195 _ => Err(anyhow!("unexpected daemon proxy response")),
196 }
197}
198
199pub fn begin_observed_session_blocking(
200 workspace: String,
201 agent: String,
202) -> Result<ObservedSession> {
203 match request_blocking(DaemonRequest::BeginObservedSession { workspace, agent })? {
204 DaemonResponse::ObservedSession(session) => Ok(session),
205 DaemonResponse::Error { message, .. } => Err(anyhow!(message)),
206 _ => Err(anyhow!("unexpected daemon observe response")),
207 }
208}
209
210async fn request_async(request: DaemonRequest) -> Result<DaemonResponse> {
211 let paths = runtime_paths()?;
212 let mut stream = UnixStream::connect(&paths.sock)
213 .await
214 .with_context(|| format!("connect daemon socket: {}", paths.sock.display()))?;
215 write_frame(&mut stream, &request).await?;
216 read_frame(&mut stream).await
217}