openlatch-client 0.0.0

The open-source security layer for AI agents — client forwarder
Documentation
/// HTTP request handlers for all daemon routes.
///
/// Each handler follows the same event processing pipeline:
/// 1. Parse JSON body (axum's `Json` extractor rejects malformed requests before the handler runs)
/// 2. Extract session_id, tool_name, tool_input from the raw event
/// 3. Dedup check — duplicates return a verdict with `X-OpenLatch-Dedup: true` header, not logged
/// 4. Apply privacy filter to tool_input (SECURITY: filter BEFORE logging)
/// 5. Build EventEnvelope with UUIDv7 event ID, timestamp, OS metadata
/// 6. Send to EventLogger (async, non-blocking via mpsc)
/// 7. Increment event counter
/// 8. Return VerdictResponse (always allow/approve in M1)
use std::sync::atomic::Ordering;
use std::sync::Arc;

use axum::{
    extract::State,
    http::{HeaderMap, StatusCode},
    Json,
};

use crate::daemon::AppState;
use crate::envelope::{
    arch_string, current_timestamp, new_event_id, AgentType, EventEnvelope, HookEventType, Verdict,
    VerdictResponse,
};
use crate::privacy;

/// Handler for `POST /hooks/pre-tool-use` (DAEM-02) — fires before a tool call executes.
///
/// The extracted JSON body is passed unchanged to the shared hook pipeline,
/// tagged as [`HookEventType::PreToolUse`]. The pipeline answers this event
/// type with an `allow` verdict.
///
/// The latency clock starts as soon as the handler body begins, so the
/// reported latency covers the whole pipeline.
pub async fn pre_tool_use(
    State(state): State<Arc<AppState>>,
    Json(body): Json<serde_json::Value>,
) -> (StatusCode, HeaderMap, Json<VerdictResponse>) {
    let received_at = std::time::Instant::now();
    let kind = HookEventType::PreToolUse;
    process_hook_event(state, body, kind, received_at).await
}

/// Handler for `POST /hooks/user-prompt-submit` (DAEM-03) — fires when the user submits a prompt.
///
/// The extracted JSON body is passed unchanged to the shared hook pipeline,
/// tagged as [`HookEventType::UserPromptSubmit`]. The pipeline answers this
/// event type with an `allow` verdict.
///
/// The latency clock starts as soon as the handler body begins.
pub async fn user_prompt_submit(
    State(state): State<Arc<AppState>>,
    Json(body): Json<serde_json::Value>,
) -> (StatusCode, HeaderMap, Json<VerdictResponse>) {
    let received_at = std::time::Instant::now();
    let kind = HookEventType::UserPromptSubmit;
    process_hook_event(state, body, kind, received_at).await
}

/// Handler for `POST /hooks/stop` (DAEM-04) — fires when the agent stops.
///
/// The extracted JSON body is passed unchanged to the shared hook pipeline,
/// tagged as [`HookEventType::Stop`]. Unlike the other hooks, the pipeline
/// answers Stop events with an `approve` verdict rather than `allow`.
///
/// The latency clock starts as soon as the handler body begins.
pub async fn stop(
    State(state): State<Arc<AppState>>,
    Json(body): Json<serde_json::Value>,
) -> (StatusCode, HeaderMap, Json<VerdictResponse>) {
    let received_at = std::time::Instant::now();
    let kind = HookEventType::Stop;
    process_hook_event(state, body, kind, received_at).await
}

/// Shared event processing pipeline for all hook handlers.
///
/// Deduplication, privacy filtering, logging, and verdict construction are all
/// handled here to avoid duplication across the three hook endpoints.
async fn process_hook_event(
    state: Arc<AppState>,
    body: serde_json::Value,
    event_type: HookEventType,
    start: std::time::Instant,
) -> (StatusCode, HeaderMap, Json<VerdictResponse>) {
    let event_id = new_event_id();
    let mut headers = HeaderMap::new();

    // Extract event identity fields (best-effort — raw events are untrusted/variable shape)
    let session_id = body
        .get("session_id")
        .and_then(|v| v.as_str())
        .unwrap_or("unknown")
        .to_string();

    let tool_name = body
        .get("tool_name")
        .or_else(|| body.get("tool").and_then(|t| t.get("name")))
        .and_then(|v| v.as_str())
        .unwrap_or("unknown")
        .to_string();

    let tool_input = body
        .get("tool_input")
        .or_else(|| body.get("tool").and_then(|t| t.get("input")))
        .cloned()
        .unwrap_or(serde_json::Value::Null);

    // EVNT-06: Dedup check — duplicate events return verdict but are NOT logged
    let is_duplicate = state
        .dedup
        .check_and_insert(&session_id, &tool_name, &tool_input);

    let verdict = match event_type {
        HookEventType::Stop => Verdict::Approve,
        _ => Verdict::Allow,
    };

    if is_duplicate {
        // EVNT-07: Deduped events return the verdict but skip logging (OL-1003)
        tracing::debug!(
            code = crate::error::ERR_EVENT_DEDUPED,
            session_id,
            tool_name,
            "Event deduplicated within TTL window"
        );
        let latency_ms = start.elapsed().as_secs_f64() * 1000.0;
        let response = match event_type {
            HookEventType::Stop => VerdictResponse::approve(event_id.clone(), latency_ms),
            _ => VerdictResponse::allow(event_id.clone(), latency_ms),
        };
        // D-09: Signal dedup to caller via response header
        headers.insert(
            "x-openlatch-dedup",
            "true".parse().expect("static header value is valid"),
        );
        return (StatusCode::OK, headers, Json(response));
    }

    // Extract user_prompt (for UserPromptSubmit events) and reason (for Stop events)
    let user_prompt = body
        .get("user_prompt")
        .or_else(|| body.get("prompt"))
        .and_then(|v| v.as_str())
        .map(|s| s.to_string());

    let reason = body
        .get("reason")
        .or_else(|| body.get("stopReason"))
        .and_then(|v| v.as_str())
        .map(|s| s.to_string());

    // SECURITY: Privacy filter runs BEFORE logging — never log unredacted credentials.
    // Pipeline order enforced here (T-01-05-06).
    let mut filtered_input = tool_input;
    privacy::filter_event_with(&mut filtered_input, &state.privacy_filter);

    // Also filter user_prompt through privacy filter
    let filtered_user_prompt = user_prompt.map(|p| {
        let mut val = serde_json::Value::String(p);
        privacy::filter_event_with(&mut val, &state.privacy_filter);
        val.as_str().unwrap_or_default().to_string()
    });

    // Build the audit envelope (EVNT-02)
    // latency_ms in the envelope is a snapshot before serialization; the response
    // carries the final measurement taken after all processing completes.
    let pre_log_latency_ms = start.elapsed().as_millis() as u64;
    let envelope = EventEnvelope {
        schema_version: "1.0".to_string(),
        id: event_id.clone(),
        timestamp: current_timestamp(),
        event_type,
        session_id,
        tool_name: Some(tool_name),
        tool_input: Some(filtered_input),
        user_prompt: filtered_user_prompt,
        reason,
        verdict,
        latency_ms: pre_log_latency_ms,
        agent_platform: AgentType::ClaudeCode, // M1: Claude Code only
        agent_version: body
            .get("agent_version")
            .and_then(|v| v.as_str())
            .map(|s| s.to_string()),
        os: crate::envelope::os_string().to_string(),
        arch: arch_string().to_string(),
        client_version: env!("CARGO_PKG_VERSION").to_string(),
    };

    // Log the envelope (non-blocking — try_send drops event if channel is full)
    let envelope_json = serde_json::to_string(&envelope).unwrap_or_default();
    state.event_logger.log(envelope_json);

    // Increment global event counter (relaxed ordering — approximate count is sufficient)
    state.event_counter.fetch_add(1, Ordering::Relaxed);

    // Measure latency AFTER all processing (dedup, privacy filter, envelope, serialization, log)
    let latency_ms = start.elapsed().as_secs_f64() * 1000.0;
    let response = match event_type {
        HookEventType::Stop => VerdictResponse::approve(event_id, latency_ms),
        _ => VerdictResponse::allow(event_id, latency_ms),
    };

    (StatusCode::OK, headers, Json(response))
}

/// Handler for `GET /health` (DAEM-05) — liveness probe, documented as unauthenticated.
///
/// Responds with a JSON object holding a fixed `"ok"` status, the crate
/// version baked in at compile time, and whole seconds elapsed since
/// `state.started_at`.
pub async fn health(State(state): State<Arc<AppState>>) -> Json<serde_json::Value> {
    let payload = serde_json::json!({
        "status": "ok",
        "version": env!("CARGO_PKG_VERSION"),
        "uptime_secs": state.started_at.elapsed().as_secs(),
    });
    Json(payload)
}

/// GET /metrics — event counters, no authentication required (DAEM-06).
///
/// Returns total events processed (deduped events excluded), uptime in seconds,
/// and the available update version (if one was discovered at startup).
pub async fn metrics(State(state): State<Arc<AppState>>) -> Json<serde_json::Value> {
    let events = state.event_counter.load(Ordering::Relaxed);
    let uptime_secs = state.started_at.elapsed().as_secs();
    let update_available = state.get_available_update();
    Json(serde_json::json!({
        "events_processed": events,
        "uptime_secs": uptime_secs,
        "update_available": update_available,
    }))
}

/// Handler for `POST /shutdown` (DAEM-07) — triggers a graceful shutdown; documented as
/// requiring authentication.
///
/// Takes the shutdown sender out of shared state and fires it. The first
/// caller gets `200 OK`; once the sender has been taken, later callers get
/// `410 Gone`.
pub async fn shutdown_handler(State(state): State<Arc<AppState>>) -> StatusCode {
    let mut slot = state.shutdown_tx.lock().await;
    match slot.take() {
        Some(trigger) => {
            // The send result is deliberately ignored.
            let _ = trigger.send(());
            StatusCode::OK
        }
        // Sender already consumed — shutdown was requested earlier.
        None => StatusCode::GONE,
    }
}