1mod capture_status;
5mod proxy_task;
6mod scanner_task;
7mod server;
8mod supervisor;
9mod worker;
10
11use crate::core::paths::kaizen_dir;
12use crate::ipc::{
13 CaptureStatus, ClientHello, ClientKind, DaemonRequest, DaemonResponse, DaemonStatus,
14 ObservedSession, PROTO_VERSION, ProxyEndpoint, ServerHello, WebEndpoint, read_frame,
15 write_frame,
16};
17use anyhow::{Context, Result, anyhow};
18use std::path::PathBuf;
19use std::process::{Command, Stdio};
20use std::time::{Duration, Instant};
21use tokio::net::UnixStream;
22
/// File name of the daemon PID file, joined onto the runtime directory.
const PID_FILE: &str = "daemon.pid";

/// File name of the daemon's Unix socket, joined onto the runtime directory.
const SOCK_FILE: &str = "daemon.sock";

/// File name of the log that receives a background daemon's stdout and stderr.
const LOG_FILE: &str = "daemon.log";

/// Maximum time, in milliseconds, `start_background` polls for a freshly
/// spawned daemon to answer a status request.
const START_WAIT_MS: u64 = 2_000;
27
/// Filesystem locations used by the daemon and its clients.
///
/// As built by `runtime_paths`, `pid`, `sock`, and `log` all live directly
/// inside `dir`.
#[derive(Clone, Debug)]
pub struct RuntimePaths {
    /// Runtime directory that contains the other three paths.
    pub dir: PathBuf,
    /// Path of the daemon PID file.
    pub pid: PathBuf,
    /// Path of the Unix socket that clients connect to.
    pub sock: PathBuf,
    /// Path of the log file a background daemon's stdout/stderr is appended to.
    pub log: PathBuf,
}
35
36#[derive(Debug, Clone)]
37pub struct BackgroundStart {
38 pub pid: u32,
39 pub paths: RuntimePaths,
40 pub already_running: bool,
41 pub web: Option<WebEndpoint>,
42}
43
44#[derive(Debug, Clone)]
45pub enum DaemonStatusOutcome {
46 Running(DaemonStatus),
47 Stopped { socket: PathBuf },
48}
49
/// Reports whether daemon-backed operation is enabled for this process.
///
/// If the `KAIZEN_DAEMON` environment variable can be read (set and valid
/// Unicode), it decides: any value other than `"0"` enables the daemon.
/// Otherwise the daemon is enabled only when the file stem of the first
/// process argument (the program path) is exactly `kaizen`.
pub fn enabled() -> bool {
    if let Ok(flag) = std::env::var("KAIZEN_DAEMON") {
        return flag != "0";
    }
    // Use `args_os` instead of `args`: iterating `args` panics on an argument
    // that is not valid Unicode, and the program path carries no such guarantee.
    // A non-Unicode stem simply never equals "kaizen".
    std::env::args_os().next().is_some_and(|program| {
        PathBuf::from(program)
            .file_stem()
            .is_some_and(|stem| stem == "kaizen")
    })
}
60
61pub fn runtime_paths() -> Result<RuntimePaths> {
62 let dir = kaizen_dir().ok_or_else(|| anyhow!("KAIZEN_HOME/HOME not set"))?;
63 Ok(RuntimePaths {
64 pid: dir.join(PID_FILE),
65 sock: dir.join(SOCK_FILE),
66 log: dir.join(LOG_FILE),
67 dir,
68 })
69}
70
71pub fn ensure_running() -> Result<()> {
72 if !enabled() || try_status().is_ok() {
73 return Ok(());
74 }
75 start_background().map(|_| ())
76}
77
78pub fn start_background() -> Result<BackgroundStart> {
79 let paths = runtime_paths()?;
80 if let Ok(status) = try_status() {
81 return Ok(BackgroundStart {
82 pid: status.pid,
83 paths,
84 already_running: true,
85 web: status.web,
86 });
87 }
88 std::fs::create_dir_all(&paths.dir)?;
89 let log = std::fs::OpenOptions::new()
90 .create(true)
91 .append(true)
92 .open(&paths.log)
93 .with_context(|| format!("open daemon log: {}", paths.log.display()))?;
94 let err = log.try_clone()?;
95 let mut child = Command::new(std::env::current_exe()?)
96 .args(["daemon", "start"])
97 .stdin(Stdio::null())
98 .stdout(Stdio::from(log))
99 .stderr(Stdio::from(err))
100 .spawn()
101 .context("spawn kaizen daemon")?;
102 let deadline = Instant::now() + Duration::from_millis(START_WAIT_MS);
103 while Instant::now() < deadline {
104 if let Some(status) = child.try_wait().context("poll daemon child")? {
105 return Err(anyhow!(
106 "daemon exited before ready with status {status}; see {}",
107 paths.log.display()
108 ));
109 }
110 if let Ok(status) = try_status() {
111 return Ok(BackgroundStart {
112 pid: status.pid,
113 paths,
114 already_running: false,
115 web: status.web,
116 });
117 }
118 std::thread::sleep(Duration::from_millis(25));
119 }
120 Err(anyhow!(
121 "daemon did not become ready at {}; see {}",
122 paths.sock.display(),
123 paths.log.display()
124 ))
125}
126
127pub fn request_blocking(request: DaemonRequest) -> Result<DaemonResponse> {
128 ensure_running()?;
129 tokio::runtime::Runtime::new()?.block_on(request_async(request))
130}
131
132pub fn try_status() -> Result<DaemonStatus> {
133 let response =
134 tokio::runtime::Runtime::new()?.block_on(request_async(DaemonRequest::Status))?;
135 match response {
136 DaemonResponse::Status(status) => Ok(status),
137 DaemonResponse::Error { message, .. } => Err(anyhow!(message)),
138 _ => Err(anyhow!("unexpected daemon status response")),
139 }
140}
141
142pub fn status_outcome() -> Result<DaemonStatusOutcome> {
143 match try_status() {
144 Ok(status) => Ok(DaemonStatusOutcome::Running(status)),
145 Err(err) if is_daemon_socket_connect_error(&err) => Ok(DaemonStatusOutcome::Stopped {
146 socket: runtime_paths()?.sock,
147 }),
148 Err(err) => Err(err),
149 }
150}
151
152fn is_daemon_socket_connect_error(err: &anyhow::Error) -> bool {
153 err.chain()
154 .any(|cause| cause.to_string().starts_with("connect daemon socket:"))
155}
156
157pub fn start_foreground() -> Result<()> {
158 tokio::runtime::Builder::new_multi_thread()
159 .enable_all()
160 .build()?
161 .block_on(server::run_server())
162}
163
164pub fn stop() -> Result<String> {
165 match tokio::runtime::Runtime::new()?.block_on(request_async(DaemonRequest::Stop))? {
166 DaemonResponse::Ack { message } => Ok(message),
167 DaemonResponse::Error { message, .. } => Err(anyhow!(message)),
168 _ => Err(anyhow!("unexpected daemon stop response")),
169 }
170}
171
172pub fn hello_blocking(client: ClientKind, workspace: Option<String>) -> Result<ServerHello> {
173 match request_blocking(DaemonRequest::Hello(ClientHello {
174 proto_version: PROTO_VERSION,
175 client,
176 workspace,
177 }))? {
178 DaemonResponse::Hello(hello) => Ok(hello),
179 DaemonResponse::Error { message, .. } => Err(anyhow!(message)),
180 _ => Err(anyhow!("unexpected daemon hello response")),
181 }
182}
183
184pub fn ensure_capture_blocking(workspace: String, deep: bool) -> Result<CaptureStatus> {
185 match request_blocking(DaemonRequest::EnsureWorkspaceCapture { workspace, deep })? {
186 DaemonResponse::CaptureStatus(status) => Ok(*status),
187 DaemonResponse::Error { message, .. } => Err(anyhow!(message)),
188 _ => Err(anyhow!("unexpected daemon capture response")),
189 }
190}
191
192pub fn ensure_proxy_blocking(workspace: String, provider: String) -> Result<ProxyEndpoint> {
193 match request_blocking(DaemonRequest::EnsureProxy {
194 workspace,
195 provider,
196 })? {
197 DaemonResponse::ProxyEndpoint(endpoint) => Ok(endpoint),
198 DaemonResponse::Error { message, .. } => Err(anyhow!(message)),
199 _ => Err(anyhow!("unexpected daemon proxy response")),
200 }
201}
202
203pub fn begin_observed_session_blocking(
204 workspace: String,
205 agent: String,
206) -> Result<ObservedSession> {
207 match request_blocking(DaemonRequest::BeginObservedSession { workspace, agent })? {
208 DaemonResponse::ObservedSession(session) => Ok(session),
209 DaemonResponse::Error { message, .. } => Err(anyhow!(message)),
210 _ => Err(anyhow!("unexpected daemon observe response")),
211 }
212}
213
214async fn request_async(request: DaemonRequest) -> Result<DaemonResponse> {
215 let paths = runtime_paths()?;
216 let mut stream = UnixStream::connect(&paths.sock)
217 .await
218 .with_context(|| format!("connect daemon socket: {}", paths.sock.display()))?;
219 write_frame(&mut stream, &request).await?;
220 read_frame(&mut stream).await
221}