openlatch-client 0.1.6

The open-source security layer for AI agents — client forwarder
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
/// Daemon HTTP server — composes all leaf modules into the running service.
///
/// This module provides:
/// - [`AppState`]: shared state cloned into every axum handler via `Arc`
/// - [`start_server`]: entry point that builds the router, binds TCP, and runs to completion
/// - Signal handling: graceful shutdown on SIGTERM (Unix) or Ctrl+C (all platforms)
/// - Route layout: authenticated POST routes + unauthenticated GET routes
pub mod auth;
pub mod dedup;
pub mod handlers;
pub mod reconciler;
pub mod watcher;

use std::sync::atomic::AtomicU64;
use std::sync::{Arc, Mutex};

use axum::{extract::DefaultBodyLimit, middleware, routing::get, routing::post, Router};
use secrecy::SecretString;
use tokio::net::TcpListener;

use crate::cloud::{CloudState, CredentialProvider};
use crate::config::Config;
use crate::core::logging::tamper_log::{TamperLogger, TamperLoggerHandle};
use crate::logging::EventLogger;
use crate::privacy::PrivacyFilter;
use crate::update;

// ---------------------------------------------------------------------------
// CredentialStore adapter for cloud worker CredentialProvider
// ---------------------------------------------------------------------------

/// Adapts a `crate::auth::CredentialStore` to the `CredentialProvider` trait
/// required by the cloud worker.
///
/// The adapter ignores `OlError` from `retrieve()` and converts it to `None`,
/// which causes the worker to skip POSTs (fail-open) until a valid key exists.
struct CredentialStoreAdapter {
    /// Underlying credential store; trait object so any backend works.
    store: Arc<dyn crate::auth::CredentialStore>,
}

impl CredentialProvider for CredentialStoreAdapter {
    /// Fetch the API key from the underlying store.
    ///
    /// Any retrieval error is logged at debug level and collapsed to `None`,
    /// which makes the cloud worker skip POSTs (fail-open) until a valid key
    /// exists.
    fn retrieve(&self) -> Option<SecretString> {
        self.store
            .retrieve()
            .map_err(|e| {
                tracing::debug!(
                    code = e.code,
                    "credential provider: retrieve failed — returning None to worker"
                );
            })
            .ok()
    }
}

/// Shared state injected into every axum handler via `Arc<AppState>`.
///
/// Fields are either inherently thread-safe (`AtomicU64`, `DashMap`, `mpsc::Sender`)
/// or wrapped in appropriate synchronization primitives.
pub struct AppState {
    /// Resolved daemon configuration (port, log dir, retention, etc.)
    pub config: Arc<Config>,
    /// Bearer token for authenticating POST requests.
    /// SECURITY: Never log this value.
    pub token: String,
    /// In-memory dedup store with 100ms TTL.
    pub dedup: dedup::DedupStore,
    /// Async event logger (sends to background writer task via mpsc).
    pub event_logger: EventLogger,
    /// Pre-compiled privacy filter for credential masking.
    pub privacy_filter: PrivacyFilter,
    /// Total events processed (not counting deduped duplicates).
    pub event_counter: AtomicU64,
    /// Oneshot sender for triggering graceful shutdown via POST /shutdown.
    /// Wrapped in Mutex so the handler can take ownership without `&mut self`.
    /// (tokio Mutex: the handler may hold it across an `.await`.)
    pub shutdown_tx: tokio::sync::Mutex<Option<tokio::sync::oneshot::Sender<()>>>,
    /// Wall-clock time when the daemon started (for uptime reporting).
    pub started_at: std::time::Instant,
    /// Latest available version string, populated by the async update check on startup.
    /// `None` means either the check has not completed yet, or the current version is latest.
    /// Accessed via `set_available_update` / `get_available_update`.
    pub available_update: Mutex<Option<String>>,
    /// Cloud forwarding channel sender. `None` if cloud forwarding is disabled or unconfigured.
    ///
    /// Handlers call `try_send(CloudEvent)` — non-blocking, off the verdict critical path.
    /// CLOUD-01/CLOUD-09: fire-and-forget pattern.
    pub cloud_tx: Option<tokio::sync::mpsc::Sender<crate::cloud::CloudEvent>>,
    /// Shared cloud state: auth_error flag visible to the status command (CLOUD-08).
    /// `None` if cloud forwarding is disabled.
    pub cloud_state: Option<CloudState>,
    /// Machine's local (LAN) IPv4 address, resolved once at startup.
    pub local_ipv4: Option<std::net::Ipv4Addr>,
    /// Machine's local (LAN) IPv6 address, resolved once at startup.
    pub local_ipv6: Option<std::net::Ipv6Addr>,
    /// Machine's public (internet-facing) IPv4 address, resolved once at startup.
    pub public_ipv4: Option<std::net::Ipv4Addr>,
    /// Machine's public (internet-facing) IPv6 address, resolved once at startup.
    pub public_ipv6: Option<std::net::Ipv6Addr>,
    /// Async writer for `~/.openlatch/tamper.jsonl`. The reconciler sends
    /// detected/healed `TamperEvent`s through this on drift. `None` only in
    /// test builds that construct `AppState` directly without the daemon
    /// startup path.
    pub tamper_logger: Option<TamperLogger>,
}

impl AppState {
    /// Store a newly discovered available version.
    ///
    /// Recovers from a poisoned mutex (a panic in another thread while the
    /// lock was held) instead of silently dropping the update: the inner
    /// `Option<String>` is always in a valid state regardless of where a
    /// writer panicked, so `into_inner` is safe here.
    pub fn set_available_update(&self, version: String) {
        let mut guard = self
            .available_update
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner());
        *guard = Some(version);
    }

    /// Return the latest available version, if one has been discovered.
    ///
    /// Like `set_available_update`, recovers from mutex poisoning rather
    /// than permanently reporting `None` after an unrelated panic.
    pub fn get_available_update(&self) -> Option<String> {
        self.available_update
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner())
            .clone()
    }
}

/// Start the daemon HTTP server and run until a shutdown signal is received.
///
/// Binds to `127.0.0.1:{config.port}`. The server shuts down gracefully on:
/// - SIGTERM (Unix) or Ctrl+C (all platforms)
/// - HTTP POST /shutdown (authenticated)
///
/// After shutdown, prints a summary to stderr and waits for the event logger to drain.
///
/// # Parameters
///
/// - `credential_store`: optional credential store for cloud forwarding.
///   When `Some`, the cloud worker is spawned if `config.cloud.enabled` is true.
///   When `None`, cloud forwarding is disabled regardless of config.
///
/// # Errors
///
/// Returns an error if the TCP listener cannot be bound (e.g., port in use).
pub async fn start_server(
    config: Config,
    token: String,
    credential_store: Option<Arc<dyn crate::auth::CredentialStore>>,
) -> anyhow::Result<(u64, u64)> {
    // SECURITY: Bind to 127.0.0.1 by default.
    // Only bind 0.0.0.0 inside Docker/container environments where the network
    // boundary provides isolation instead of the loopback interface.
    // OPENLATCH_BIND_ALL must be set to "true" or "1" (not merely present) to
    // avoid `OPENLATCH_BIND_ALL=false` silently enabling wide binding.
    let bind_all = matches!(
        std::env::var("OPENLATCH_BIND_ALL").as_deref(),
        Ok("true") | Ok("1")
    );
    let bind_host = if bind_all { "0.0.0.0" } else { "127.0.0.1" };
    let bind_addr = format!("{}:{}", bind_host, config.port);
    let listener = TcpListener::bind(&bind_addr).await?;

    tracing::info!(
        port = config.port,
        addr = %bind_addr,
        "daemon listening"
    );

    // Persist the bound port so the hook binary can discover it; a failure
    // here is logged but non-fatal.
    if let Err(e) = crate::config::write_port_file(config.port) {
        tracing::warn!(error = %e, "failed to write daemon.port file");
    }

    serve_with_listener(listener, config, token, credential_store).await
}

/// Start the daemon with a pre-bound TCP listener.
///
/// Accepts an already-bound listener — useful for integration tests where port 0
/// is bound by the OS for a random free port, avoiding test conflicts.
///
/// Unlike [`start_server`], this does NOT write the daemon.port discovery file
/// and does not consult `OPENLATCH_BIND_ALL` — the caller has already chosen
/// the bind address.
///
/// # Errors
///
/// Returns an error if the axum server fails during operation.
pub async fn start_server_with_listener(
    listener: TcpListener,
    config: Config,
    token: String,
    credential_store: Option<Arc<dyn crate::auth::CredentialStore>>,
) -> anyhow::Result<(u64, u64)> {
    serve_with_listener(listener, config, token, credential_store).await
}

/// Internal implementation: serve HTTP on the given listener.
///
/// Wires together the event logger, privacy filter, optional cloud worker,
/// host-IP detection, tamper logger, async update check, dedup eviction,
/// and the tamper reconciler, then serves the axum router until shutdown.
///
/// Returns `(uptime_secs, events_processed)` on clean shutdown.
async fn serve_with_listener(
    listener: TcpListener,
    config: Config,
    token: String,
    credential_store: Option<Arc<dyn crate::auth::CredentialStore>>,
) -> anyhow::Result<(u64, u64)> {
    let log_dir = config.log_dir.clone();
    // logger_handle is kept until after serve() returns so the final drain
    // can be awaited explicitly at the bottom of this function.
    let (event_logger, logger_handle) = EventLogger::new(log_dir.clone());

    let privacy_filter = PrivacyFilter::new(&config.extra_patterns);

    let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel::<()>();

    // Cloud forwarding setup (CLOUD-01, D-03):
    // Spawn the worker only when cloud is enabled AND a credential store is available.
    let (cloud_tx, cloud_state_opt) = if config.cloud.enabled {
        if let Some(store) = credential_store {
            let (tx, rx) = tokio::sync::mpsc::channel(config.cloud.channel_size);
            let cloud_state = CloudState::new();

            // Build cloud worker config from resolved daemon config
            let cloud_config = crate::cloud::CloudConfig {
                api_url: config.cloud.api_url.clone(),
                timeout_connect_ms: config.cloud.timeout_connect_ms,
                timeout_total_ms: config.cloud.timeout_total_ms,
                retry_count: config.cloud.retry_count,
                retry_delay_ms: config.cloud.retry_delay_ms,
                channel_size: config.cloud.channel_size,
                rate_limit_default_secs: 30,
                credential_poll_interval_ms: config.cloud.credential_poll_interval_ms,
            };

            let openlatch_dir = crate::config::openlatch_dir();
            let provider: Arc<dyn CredentialProvider> = Arc::new(CredentialStoreAdapter { store });
            let worker_state = cloud_state.clone();

            // Fire-and-forget background worker; it exits when `tx` (held in
            // AppState) is dropped at shutdown.
            tokio::spawn(crate::cloud::worker::run_cloud_worker(
                rx,
                provider,
                cloud_config,
                worker_state,
                openlatch_dir,
            ));

            tracing::info!(
                api_url = %config.cloud.api_url,
                channel_size = config.cloud.channel_size,
                "cloud forwarding worker started"
            );

            (Some(tx), Some(cloud_state))
        } else {
            tracing::info!("cloud forwarding enabled in config but no credential store provided — cloud forwarding disabled");
            (None, None)
        }
    } else {
        (None, None)
    };

    let startup_started = std::time::Instant::now();

    // Detect host IPs once at startup (bounded to ~3s worst case per resolver).
    // See `src/core/net/mod.rs`: failures collapse to None, the daemon never blocks on this.
    let host_ips = crate::net::HostIps::detect().await;
    tracing::info!(
        local_ipv4 = host_ips
            .local_ipv4
            .map(|a| a.to_string())
            .as_deref()
            .unwrap_or("none"),
        local_ipv6 = host_ips
            .local_ipv6
            .map(|a| a.to_string())
            .as_deref()
            .unwrap_or("none"),
        public_ipv4 = host_ips
            .public_ipv4
            .map(|a| a.to_string())
            .as_deref()
            .unwrap_or("none"),
        public_ipv6 = host_ips
            .public_ipv6
            .map(|a| a.to_string())
            .as_deref()
            .unwrap_or("none"),
        "host ips detected"
    );

    // Build the tamper-evidence logger alongside the event logger. The
    // handle is bound in function scope so the background writer task
    // stays alive for the daemon's lifetime — dropping the handle would
    // close the channel and the task would exit.
    let openlatch_dir_for_tamper = crate::config::openlatch_dir();
    let (tamper_logger, _tamper_logger_handle): (TamperLogger, TamperLoggerHandle) =
        TamperLogger::new(openlatch_dir_for_tamper);

    let state = Arc::new(AppState {
        config: Arc::new(config.clone()),
        token,
        dedup: dedup::DedupStore::new(),
        event_logger,
        privacy_filter,
        event_counter: AtomicU64::new(0),
        shutdown_tx: tokio::sync::Mutex::new(Some(shutdown_tx)),
        started_at: std::time::Instant::now(),
        available_update: Mutex::new(None),
        cloud_tx,
        cloud_state: cloud_state_opt,
        local_ipv4: host_ips.local_ipv4,
        local_ipv6: host_ips.local_ipv6,
        public_ipv4: host_ips.public_ipv4,
        public_ipv6: host_ips.public_ipv6,
        tamper_logger: Some(tamper_logger),
    });

    // Telemetry: daemon_started — port + measured startup duration + cloud
    // forwarding state. Captured here once we're committed to serving (the
    // listener is bound, state is built); the actual `axum::serve` call
    // happens immediately below.
    crate::telemetry::capture_global(crate::telemetry::Event::daemon_started(
        state.config.port,
        startup_started
            .elapsed()
            .as_millis()
            .min(u128::from(u64::MAX)) as u64,
        state.config.cloud.enabled,
    ));

    // UPDT-01: Spawn async update check at startup (T-02-14: 2s timeout, non-blocking)
    if config.update.check {
        let current = env!("CARGO_PKG_VERSION").to_string();
        let state_for_update = state.clone();
        tokio::spawn(async move {
            if let Some(latest) = update::check_for_update(&current).await {
                tracing::warn!(code = crate::error::ERR_VERSION_OUTDATED, latest_version = %latest, "Update available: run `npx openlatch@latest`");
                state_for_update.set_available_update(latest);
            }
        });
    }

    // Spawn periodic dedup eviction to prevent unbounded memory growth.
    // NOTE: this task holds an Arc<AppState> clone forever, so the
    // Arc::try_unwrap at shutdown below is expected to report extra refs.
    let state_for_evict = state.clone();
    tokio::spawn(async move {
        let mut interval = tokio::time::interval(std::time::Duration::from_secs(30));
        loop {
            interval.tick().await;
            state_for_evict.dedup.evict_expired();
        }
    });

    // Tamper-evidence: reconciler + reactive watcher + poll safety net.
    // Startup reconciliation runs BEFORE axum binds to catch Scenario-2 drift.
    // The two underscore bindings keep the watcher / poll task alive for the
    // daemon's lifetime; both branches of the `if` must assign them.
    let _watcher_guard;
    let _poll_handle;
    if let Ok(agent) = crate::hooks::detect_agent() {
        let settings_path = match &agent {
            crate::hooks::DetectedAgent::ClaudeCode { settings_path, .. } => settings_path.clone(),
        };
        let openlatch_dir = crate::config::openlatch_dir();
        let token_file = openlatch_dir.join("daemon.token");

        reconciler::run_startup_reconcile(&settings_path, &openlatch_dir, config.port);

        let (reconcile_tx, reconcile_rx) = tokio::sync::mpsc::channel(100);

        let sinks = reconciler::TamperSinks {
            logger: state.tamper_logger.clone(),
            cloud_tx: state.cloud_tx.clone(),
            agent_id: state.config.agent_id.clone().unwrap_or_default(),
            client_version: env!("CARGO_PKG_VERSION").to_string(),
        };

        let r = reconciler::Reconciler::new_with_sinks(
            reconcile_rx,
            settings_path.clone(),
            openlatch_dir,
            config.port,
            token_file,
            sinks,
        );
        tokio::spawn(r.run());

        _watcher_guard = match watcher::spawn_watcher(&settings_path, reconcile_tx.clone()) {
            Ok(w) => {
                tracing::info!("filesystem watcher active");
                Some(w)
            }
            Err(e) => {
                tracing::warn!(error = %e, "filesystem watcher failed — falling back to poll-only");
                None
            }
        };

        _poll_handle = Some(watcher::spawn_poll_fallback(reconcile_tx));
        tracing::info!("reconciler started (reactive watcher + 30s poll)");
    } else {
        _watcher_guard = None;
        _poll_handle = None;
    }

    // Hook route — single generic CloudEvents v1.0.2 ingest endpoint. The
    // handler validates Content-Type inline (accepts
    // application/cloudevents+json, application/cloudevents-batch+json, and
    // application/json during the transition) and parses the envelope as
    // either a single object or a JSON array.
    let hook_routes = Router::new()
        .route("/hooks", post(handlers::ingest_cloudevent))
        .route_layer(middleware::from_fn_with_state(
            state.clone(),
            auth::bearer_auth,
        ));

    // Shutdown route — requires Bearer token but no JSON body
    let shutdown_route = Router::new()
        .route("/shutdown", post(handlers::shutdown_handler))
        .route_layer(middleware::from_fn_with_state(
            state.clone(),
            auth::bearer_auth,
        ));

    // Public routes — no authentication required
    let public_routes = Router::new()
        .route("/health", get(handlers::health))
        .route("/metrics", get(handlers::metrics));

    let app = Router::new()
        .merge(hook_routes)
        .merge(shutdown_route)
        .merge(public_routes)
        // SECURITY: 1MB body limit — reject oversized payloads with 413 before parsing
        .layer(DefaultBodyLimit::max(1_048_576))
        .with_state(state.clone());

    // Serve until either an OS signal or the /shutdown endpoint fires the
    // oneshot; whichever arrives first wins the select.
    axum::serve(listener, app)
        .with_graceful_shutdown(async move {
            tokio::select! {
                _ = signal_handler() => {
                    tracing::info!("received OS shutdown signal");
                }
                _ = shutdown_rx => {
                    tracing::info!("received shutdown via /shutdown endpoint");
                }
            }
        })
        .await?;

    // Capture final stats before releasing state
    let uptime_secs = state.started_at.elapsed().as_secs();
    let events = state
        .event_counter
        .load(std::sync::atomic::Ordering::Relaxed);

    crate::logging::daemon_log::log_shutdown(uptime_secs, events);
    crate::telemetry::capture_global(crate::telemetry::Event::daemon_stopped(uptime_secs, events));

    // Release Arc so EventLogger's sender is dropped, signaling the writer task to exit.
    // If handlers still hold Arc clones at shutdown, warn — log drain may be incomplete.
    match Arc::try_unwrap(state) {
        Ok(_state) => { /* sole owner — sender dropped cleanly */ }
        Err(arc) => {
            tracing::warn!(
                strong_refs = Arc::strong_count(&arc),
                "AppState still has references at shutdown — log drain may be incomplete"
            );
            drop(arc);
        }
    }
    logger_handle.shutdown().await;

    Ok((uptime_secs, events))
}

/// Format a duration in seconds as a human-readable uptime string.
///
/// Only the two most significant units are shown.
///
/// Examples: `"45s"`, `"3m12s"`, `"2h14m"`
pub fn format_uptime(secs: u64) -> String {
    let (hours, minutes, seconds) = (secs / 3600, (secs % 3600) / 60, secs % 60);
    match (hours, minutes) {
        (0, 0) => format!("{}s", seconds),
        (0, _) => format!("{}m{}s", minutes, seconds),
        (_, _) => format!("{}h{}m", hours, minutes),
    }
}

/// Wait for an OS shutdown signal (SIGTERM on Unix, Ctrl+C on all platforms).
///
/// Resolves when the first signal arrives; used as one arm of the graceful
/// shutdown `select!` in `serve_with_listener`. Panics only if the OS refuses
/// to register the handler, which indicates a broken runtime environment.
async fn signal_handler() {
    #[cfg(unix)]
    {
        use tokio::signal::unix::{signal, SignalKind};
        // Register SIGTERM explicitly — service managers (systemd, Docker)
        // send SIGTERM, which ctrl_c() alone would not catch.
        let mut sigterm =
            signal(SignalKind::terminate()).expect("failed to register SIGTERM handler");
        tokio::select! {
            _ = tokio::signal::ctrl_c() => {}
            _ = sigterm.recv() => {}
        }
    }
    #[cfg(not(unix))]
    {
        tokio::signal::ctrl_c()
            .await
            .expect("failed to register ctrl_c handler");
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    /// The `_openlatch` marker key is how hook installation is detected in
    /// the agent settings JSON; a plain substring check is sufficient.
    #[test]
    fn test_openlatch_marker_detected_in_settings() {
        let with_hooks = r#"{"hooks": {"_openlatch": true, "preToolUse": []}}"#;
        let without_hooks = r#"{"hooks": {"preToolUse": []}}"#;
        assert!(with_hooks.contains("\"_openlatch\""));
        assert!(!without_hooks.contains("\"_openlatch\""));
    }

    #[test]
    fn test_format_uptime_seconds_only() {
        for (secs, expected) in [(0, "0s"), (45, "45s"), (59, "59s")] {
            assert_eq!(format_uptime(secs), expected);
        }
    }

    #[test]
    fn test_format_uptime_minutes_and_seconds() {
        for (secs, expected) in [(60, "1m0s"), (192, "3m12s"), (3599, "59m59s")] {
            assert_eq!(format_uptime(secs), expected);
        }
    }

    #[test]
    fn test_format_uptime_hours_and_minutes() {
        // Seconds are intentionally dropped once uptime reaches an hour.
        for (secs, expected) in [(3600, "1h0m"), (8094, "2h14m"), (7200, "2h0m")] {
            assert_eq!(format_uptime(secs), expected);
        }
    }
}