Skip to main content

openlatch_client/daemon/
mod.rs

//! Daemon HTTP server — composes all leaf modules into the running service.
//!
//! This module provides:
//! - [`AppState`]: shared state cloned into every axum handler via `Arc`
//! - [`start_server`]: entry point that builds the router, binds TCP, and runs to completion
//! - Signal handling: graceful shutdown on SIGTERM (Unix) or Ctrl+C (all platforms)
//! - Route layout: authenticated POST routes + unauthenticated GET routes
8pub mod auth;
9pub mod dedup;
10pub mod handlers;
11
12use std::sync::atomic::AtomicU64;
13use std::sync::{Arc, Mutex};
14
15use axum::{
16    extract::{DefaultBodyLimit, Request},
17    http::{header::CONTENT_TYPE, StatusCode},
18    middleware::{self, Next},
19    response::Response,
20    routing::get,
21    routing::post,
22    Router,
23};
24use tokio::net::TcpListener;
25
26use crate::config::Config;
27use crate::logging::EventLogger;
28use crate::privacy::PrivacyFilter;
29use crate::update;
30
/// Shared state injected into every axum handler via `Arc<AppState>`.
///
/// Fields are either inherently thread-safe (`AtomicU64`, `DashMap`, `mpsc::Sender`)
/// or wrapped in appropriate synchronization primitives.
///
/// A single instance is built when the server starts; the router, the auth
/// middleware, and the background tasks each receive their own `Arc` clone.
pub struct AppState {
    /// Resolved daemon configuration (port, log dir, retention, etc.)
    pub config: Arc<Config>,
    /// Bearer token for authenticating POST requests.
    /// SECURITY: Never log this value.
    pub token: String,
    /// In-memory dedup store with 100ms TTL.
    /// Expired entries are purged periodically by a background task via `evict_expired`.
    pub dedup: dedup::DedupStore,
    /// Async event logger (sends to background writer task via mpsc).
    pub event_logger: EventLogger,
    /// Pre-compiled privacy filter for credential masking.
    pub privacy_filter: PrivacyFilter,
    /// Total events processed (not counting deduped duplicates).
    /// Read once at shutdown to produce the final event count.
    pub event_counter: AtomicU64,
    /// Oneshot sender for triggering graceful shutdown via POST /shutdown.
    /// Wrapped in Mutex so the handler can take ownership without `&mut self`.
    /// This is the async `tokio::sync::Mutex`, unlike `available_update` below.
    pub shutdown_tx: tokio::sync::Mutex<Option<tokio::sync::oneshot::Sender<()>>>,
    /// Monotonic instant captured when this state was created (for uptime reporting).
    /// This is a `std::time::Instant`, so it measures elapsed time and is not a calendar time.
    pub started_at: std::time::Instant,
    /// Latest available version string, populated by the async update check on startup.
    /// `None` means either the check has not completed yet, or the current version is latest.
    /// Guarded by a synchronous `std::sync::Mutex`; access it through the accessor methods.
    pub available_update: Mutex<Option<String>>,
}
58
59impl AppState {
60    /// Store a newly discovered available version.
61    pub fn set_available_update(&self, version: String) {
62        if let Ok(mut guard) = self.available_update.lock() {
63            *guard = Some(version);
64        }
65    }
66
67    /// Return the latest available version, if one has been discovered.
68    pub fn get_available_update(&self) -> Option<String> {
69        self.available_update.lock().ok().and_then(|g| g.clone())
70    }
71}
72
73/// Start the daemon HTTP server and run until a shutdown signal is received.
74///
75/// Binds to `127.0.0.1:{config.port}`. The server shuts down gracefully on:
76/// - SIGTERM (Unix) or Ctrl+C (all platforms)
77/// - HTTP POST /shutdown (authenticated)
78///
79/// After shutdown, prints a summary to stderr and waits for the event logger to drain.
80///
81/// # Errors
82///
83/// Returns an error if the TCP listener cannot be bound (e.g., port in use).
84pub async fn start_server(config: Config, token: String) -> anyhow::Result<(u64, u64)> {
85    // SECURITY: Bind to 127.0.0.1 by default — only 0.0.0.0 inside containers
86    // where Docker network isolation provides the boundary instead.
87    let bind_host = if std::env::var("OPENLATCH_BIND_ALL").is_ok() {
88        "0.0.0.0"
89    } else {
90        "127.0.0.1"
91    };
92    let bind_addr = format!("{}:{}", bind_host, config.port);
93    let listener = TcpListener::bind(&bind_addr).await?;
94
95    tracing::info!(
96        port = config.port,
97        addr = %bind_addr,
98        "daemon listening"
99    );
100
101    // Write daemon.port file so the hook binary can discover the port
102    if let Err(e) = crate::config::write_port_file(config.port) {
103        tracing::warn!(error = %e, "failed to write daemon.port file");
104    }
105
106    serve_with_listener(listener, config, token).await
107}
108
/// Start the daemon with a pre-bound TCP listener.
///
/// Accepts an already-bound listener — useful for integration tests where port 0
/// is bound by the OS for a random free port, avoiding test conflicts.
///
/// Unlike [`start_server`], this function does not choose a bind address and does not
/// write the `daemon.port` file; it hands the listener straight to the shared
/// serving routine.
///
/// # Returns
///
/// `(uptime_secs, events)` — daemon uptime in whole seconds and the number of events
/// processed, as produced by the serving routine on shutdown.
///
/// # Errors
///
/// Returns an error if the axum server fails during operation.
pub async fn start_server_with_listener(
    listener: TcpListener,
    config: Config,
    token: String,
) -> anyhow::Result<(u64, u64)> {
    serve_with_listener(listener, config, token).await
}
124
125/// Internal implementation: serve HTTP on the given listener.
126async fn serve_with_listener(
127    listener: TcpListener,
128    config: Config,
129    token: String,
130) -> anyhow::Result<(u64, u64)> {
131    let log_dir = config.log_dir.clone();
132    let (event_logger, logger_handle) = EventLogger::new(log_dir.clone());
133
134    let privacy_filter = PrivacyFilter::new(&config.extra_patterns);
135
136    let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel::<()>();
137
138    let state = Arc::new(AppState {
139        config: Arc::new(config.clone()),
140        token,
141        dedup: dedup::DedupStore::new(),
142        event_logger,
143        privacy_filter,
144        event_counter: AtomicU64::new(0),
145        shutdown_tx: tokio::sync::Mutex::new(Some(shutdown_tx)),
146        started_at: std::time::Instant::now(),
147        available_update: Mutex::new(None),
148    });
149
150    // UPDT-01: Spawn async update check at startup (T-02-14: 2s timeout, non-blocking)
151    if config.update.check {
152        let current = env!("CARGO_PKG_VERSION").to_string();
153        let state_for_update = state.clone();
154        tokio::spawn(async move {
155            if let Some(latest) = update::check_for_update(&current).await {
156                tracing::warn!(code = crate::error::ERR_VERSION_OUTDATED, latest_version = %latest, "Update available: run `npx openlatch@latest`");
157                state_for_update.set_available_update(latest);
158            }
159        });
160    }
161
162    // Spawn periodic dedup eviction to prevent unbounded memory growth
163    let state_for_evict = state.clone();
164    tokio::spawn(async move {
165        let mut interval = tokio::time::interval(std::time::Duration::from_secs(30));
166        loop {
167            interval.tick().await;
168            state_for_evict.dedup.evict_expired();
169        }
170    });
171
172    // Hook routes — require Bearer token + JSON content type
173    let hook_routes = Router::new()
174        .route("/hooks/pre-tool-use", post(handlers::pre_tool_use))
175        .route(
176            "/hooks/user-prompt-submit",
177            post(handlers::user_prompt_submit),
178        )
179        .route("/hooks/stop", post(handlers::stop))
180        .route_layer(middleware::from_fn(require_json_content_type))
181        .route_layer(middleware::from_fn_with_state(
182            state.clone(),
183            auth::bearer_auth,
184        ));
185
186    // Shutdown route — requires Bearer token but no JSON body
187    let shutdown_route = Router::new()
188        .route("/shutdown", post(handlers::shutdown_handler))
189        .route_layer(middleware::from_fn_with_state(
190            state.clone(),
191            auth::bearer_auth,
192        ));
193
194    // Public routes — no authentication required
195    let public_routes = Router::new()
196        .route("/health", get(handlers::health))
197        .route("/metrics", get(handlers::metrics));
198
199    let app = Router::new()
200        .merge(hook_routes)
201        .merge(shutdown_route)
202        .merge(public_routes)
203        // SECURITY: 1MB body limit — reject oversized payloads with 413 before parsing
204        .layer(DefaultBodyLimit::max(1_048_576))
205        .with_state(state.clone());
206
207    axum::serve(listener, app)
208        .with_graceful_shutdown(async move {
209            tokio::select! {
210                _ = signal_handler() => {
211                    tracing::info!("received OS shutdown signal");
212                }
213                _ = shutdown_rx => {
214                    tracing::info!("received shutdown via /shutdown endpoint");
215                }
216            }
217        })
218        .await?;
219
220    // Capture final stats before releasing state
221    let uptime_secs = state.started_at.elapsed().as_secs();
222    let events = state
223        .event_counter
224        .load(std::sync::atomic::Ordering::Relaxed);
225
226    crate::logging::daemon_log::log_shutdown(uptime_secs, events);
227
228    // Release Arc so EventLogger's sender is dropped, signaling the writer task to exit.
229    // If handlers still hold Arc clones at shutdown, warn — log drain may be incomplete.
230    match Arc::try_unwrap(state) {
231        Ok(_state) => { /* sole owner — sender dropped cleanly */ }
232        Err(arc) => {
233            tracing::warn!(
234                strong_refs = Arc::strong_count(&arc),
235                "AppState still has references at shutdown — log drain may be incomplete"
236            );
237            drop(arc);
238        }
239    }
240    logger_handle.shutdown().await;
241
242    Ok((uptime_secs, events))
243}
244
/// Render an uptime given in seconds as a short human-readable string.
///
/// Only the two most significant units are shown:
/// - one hour or more: hours and minutes, e.g. `"2h14m"`
/// - one minute or more: minutes and seconds, e.g. `"3m12s"`
/// - otherwise: seconds only, e.g. `"45s"`
pub fn format_uptime(secs: u64) -> String {
    let whole_hours = secs / 3600;
    let leftover_minutes = (secs % 3600) / 60;
    let leftover_seconds = secs % 60;

    match (whole_hours, leftover_minutes) {
        // Under a minute: seconds alone.
        (0, 0) => format!("{}s", leftover_seconds),
        // Under an hour: minutes and seconds.
        (0, m) => format!("{}m{}s", m, leftover_seconds),
        // An hour or more: seconds are dropped.
        (h, m) => format!("{}h{}m", h, m),
    }
}
260
261/// SECURITY: Reject non-JSON content types with 415 Unsupported Media Type.
262///
263/// Applied to POST routes before the JSON body extractor, so wrong content types
264/// are caught before axum's Json<T> returns 422.
265async fn require_json_content_type(request: Request, next: Next) -> Result<Response, StatusCode> {
266    let ct = request
267        .headers()
268        .get(CONTENT_TYPE)
269        .and_then(|v| v.to_str().ok())
270        .unwrap_or("");
271    if !ct.starts_with("application/json") {
272        return Err(StatusCode::UNSUPPORTED_MEDIA_TYPE);
273    }
274    Ok(next.run(request).await)
275}
276
277/// Wait for an OS shutdown signal (SIGTERM on Unix, Ctrl+C on all platforms).
278async fn signal_handler() {
279    #[cfg(unix)]
280    {
281        use tokio::signal::unix::{signal, SignalKind};
282        let mut sigterm =
283            signal(SignalKind::terminate()).expect("failed to register SIGTERM handler");
284        tokio::select! {
285            _ = tokio::signal::ctrl_c() => {}
286            _ = sigterm.recv() => {}
287        }
288    }
289    #[cfg(not(unix))]
290    {
291        tokio::signal::ctrl_c()
292            .await
293            .expect("failed to register ctrl_c handler");
294    }
295}
296
#[cfg(test)]
mod tests {
    use super::*;

    /// Assert that every `(seconds, expected)` pair formats as expected.
    fn assert_uptime_cases(cases: &[(u64, &str)]) {
        for &(secs, expected) in cases {
            assert_eq!(format_uptime(secs), expected);
        }
    }

    /// Durations under one minute show seconds alone.
    #[test]
    fn test_format_uptime_seconds_only() {
        assert_uptime_cases(&[(0, "0s"), (45, "45s"), (59, "59s")]);
    }

    /// Durations from one minute up to one hour show minutes and seconds.
    #[test]
    fn test_format_uptime_minutes_and_seconds() {
        assert_uptime_cases(&[(60, "1m0s"), (192, "3m12s"), (3599, "59m59s")]);
    }

    /// Durations of one hour or more show hours and minutes, dropping seconds.
    #[test]
    fn test_format_uptime_hours_and_minutes() {
        assert_uptime_cases(&[(3600, "1h0m"), (8094, "2h14m"), (7200, "2h0m")]);
    }
}