openlatch_client/daemon/
mod.rs1pub mod auth;
9pub mod dedup;
10pub mod handlers;
11
12use std::sync::atomic::AtomicU64;
13use std::sync::{Arc, Mutex};
14
15use axum::{
16 extract::{DefaultBodyLimit, Request},
17 http::{header::CONTENT_TYPE, StatusCode},
18 middleware::{self, Next},
19 response::Response,
20 routing::get,
21 routing::post,
22 Router,
23};
24use tokio::net::TcpListener;
25
26use crate::config::Config;
27use crate::logging::EventLogger;
28use crate::privacy::PrivacyFilter;
29use crate::update;
30
31pub struct AppState {
36 pub config: Arc<Config>,
38 pub token: String,
41 pub dedup: dedup::DedupStore,
43 pub event_logger: EventLogger,
45 pub privacy_filter: PrivacyFilter,
47 pub event_counter: AtomicU64,
49 pub shutdown_tx: tokio::sync::Mutex<Option<tokio::sync::oneshot::Sender<()>>>,
52 pub started_at: std::time::Instant,
54 pub available_update: Mutex<Option<String>>,
57}
58
59impl AppState {
60 pub fn set_available_update(&self, version: String) {
62 if let Ok(mut guard) = self.available_update.lock() {
63 *guard = Some(version);
64 }
65 }
66
67 pub fn get_available_update(&self) -> Option<String> {
69 self.available_update.lock().ok().and_then(|g| g.clone())
70 }
71}
72
73pub async fn start_server(config: Config, token: String) -> anyhow::Result<(u64, u64)> {
85 let bind_host = if std::env::var("OPENLATCH_BIND_ALL").is_ok() {
88 "0.0.0.0"
89 } else {
90 "127.0.0.1"
91 };
92 let bind_addr = format!("{}:{}", bind_host, config.port);
93 let listener = TcpListener::bind(&bind_addr).await?;
94
95 tracing::info!(
96 port = config.port,
97 addr = %bind_addr,
98 "daemon listening"
99 );
100
101 if let Err(e) = crate::config::write_port_file(config.port) {
103 tracing::warn!(error = %e, "failed to write daemon.port file");
104 }
105
106 serve_with_listener(listener, config, token).await
107}
108
109pub async fn start_server_with_listener(
118 listener: TcpListener,
119 config: Config,
120 token: String,
121) -> anyhow::Result<(u64, u64)> {
122 serve_with_listener(listener, config, token).await
123}
124
125async fn serve_with_listener(
127 listener: TcpListener,
128 config: Config,
129 token: String,
130) -> anyhow::Result<(u64, u64)> {
131 let log_dir = config.log_dir.clone();
132 let (event_logger, logger_handle) = EventLogger::new(log_dir.clone());
133
134 let privacy_filter = PrivacyFilter::new(&config.extra_patterns);
135
136 let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel::<()>();
137
138 let state = Arc::new(AppState {
139 config: Arc::new(config.clone()),
140 token,
141 dedup: dedup::DedupStore::new(),
142 event_logger,
143 privacy_filter,
144 event_counter: AtomicU64::new(0),
145 shutdown_tx: tokio::sync::Mutex::new(Some(shutdown_tx)),
146 started_at: std::time::Instant::now(),
147 available_update: Mutex::new(None),
148 });
149
150 if config.update.check {
152 let current = env!("CARGO_PKG_VERSION").to_string();
153 let state_for_update = state.clone();
154 tokio::spawn(async move {
155 if let Some(latest) = update::check_for_update(¤t).await {
156 tracing::warn!(code = crate::error::ERR_VERSION_OUTDATED, latest_version = %latest, "Update available: run `npx openlatch@latest`");
157 state_for_update.set_available_update(latest);
158 }
159 });
160 }
161
162 let state_for_evict = state.clone();
164 tokio::spawn(async move {
165 let mut interval = tokio::time::interval(std::time::Duration::from_secs(30));
166 loop {
167 interval.tick().await;
168 state_for_evict.dedup.evict_expired();
169 }
170 });
171
172 let hook_routes = Router::new()
174 .route("/hooks/pre-tool-use", post(handlers::pre_tool_use))
175 .route(
176 "/hooks/user-prompt-submit",
177 post(handlers::user_prompt_submit),
178 )
179 .route("/hooks/stop", post(handlers::stop))
180 .route_layer(middleware::from_fn(require_json_content_type))
181 .route_layer(middleware::from_fn_with_state(
182 state.clone(),
183 auth::bearer_auth,
184 ));
185
186 let shutdown_route = Router::new()
188 .route("/shutdown", post(handlers::shutdown_handler))
189 .route_layer(middleware::from_fn_with_state(
190 state.clone(),
191 auth::bearer_auth,
192 ));
193
194 let public_routes = Router::new()
196 .route("/health", get(handlers::health))
197 .route("/metrics", get(handlers::metrics));
198
199 let app = Router::new()
200 .merge(hook_routes)
201 .merge(shutdown_route)
202 .merge(public_routes)
203 .layer(DefaultBodyLimit::max(1_048_576))
205 .with_state(state.clone());
206
207 axum::serve(listener, app)
208 .with_graceful_shutdown(async move {
209 tokio::select! {
210 _ = signal_handler() => {
211 tracing::info!("received OS shutdown signal");
212 }
213 _ = shutdown_rx => {
214 tracing::info!("received shutdown via /shutdown endpoint");
215 }
216 }
217 })
218 .await?;
219
220 let uptime_secs = state.started_at.elapsed().as_secs();
222 let events = state
223 .event_counter
224 .load(std::sync::atomic::Ordering::Relaxed);
225
226 crate::logging::daemon_log::log_shutdown(uptime_secs, events);
227
228 match Arc::try_unwrap(state) {
231 Ok(_state) => { }
232 Err(arc) => {
233 tracing::warn!(
234 strong_refs = Arc::strong_count(&arc),
235 "AppState still has references at shutdown — log drain may be incomplete"
236 );
237 drop(arc);
238 }
239 }
240 logger_handle.shutdown().await;
241
242 Ok((uptime_secs, events))
243}
244
/// Renders an uptime in seconds as a short human-readable string.
///
/// Only the two most significant units are shown:
///
/// * an hour or more → `"{hours}h{minutes}m"` (seconds are dropped, hours
///   are not folded into days),
/// * a minute or more → `"{minutes}m{seconds}s"`,
/// * otherwise → `"{seconds}s"`.
pub fn format_uptime(secs: u64) -> String {
    let (hours, minutes, seconds) = (secs / 3600, secs % 3600 / 60, secs % 60);
    match (hours, minutes) {
        (0, 0) => format!("{seconds}s"),
        (0, mins) => format!("{mins}m{seconds}s"),
        (hrs, mins) => format!("{hrs}h{mins}m"),
    }
}
260
261async fn require_json_content_type(request: Request, next: Next) -> Result<Response, StatusCode> {
266 let ct = request
267 .headers()
268 .get(CONTENT_TYPE)
269 .and_then(|v| v.to_str().ok())
270 .unwrap_or("");
271 if !ct.starts_with("application/json") {
272 return Err(StatusCode::UNSUPPORTED_MEDIA_TYPE);
273 }
274 Ok(next.run(request).await)
275}
276
277async fn signal_handler() {
279 #[cfg(unix)]
280 {
281 use tokio::signal::unix::{signal, SignalKind};
282 let mut sigterm =
283 signal(SignalKind::terminate()).expect("failed to register SIGTERM handler");
284 tokio::select! {
285 _ = tokio::signal::ctrl_c() => {}
286 _ = sigterm.recv() => {}
287 }
288 }
289 #[cfg(not(unix))]
290 {
291 tokio::signal::ctrl_c()
292 .await
293 .expect("failed to register ctrl_c handler");
294 }
295}
296
#[cfg(test)]
mod tests {
    use super::*;

    /// Checks `format_uptime` against each `(seconds, expected)` pair.
    fn assert_uptime_cases(cases: &[(u64, &str)]) {
        for &(secs, expected) in cases {
            assert_eq!(format_uptime(secs), expected, "format_uptime({secs})");
        }
    }

    #[test]
    fn test_format_uptime_seconds_only() {
        assert_uptime_cases(&[(0, "0s"), (45, "45s"), (59, "59s")]);
    }

    #[test]
    fn test_format_uptime_minutes_and_seconds() {
        assert_uptime_cases(&[(60, "1m0s"), (192, "3m12s"), (3599, "59m59s")]);
    }

    #[test]
    fn test_format_uptime_hours_and_minutes() {
        assert_uptime_cases(&[(3600, "1h0m"), (8094, "2h14m"), (7200, "2h0m")]);
    }
}