openlatch-client 0.1.6

The open-source security layer for AI agents — client forwarder
//! HTTP request handlers for all daemon routes.
//!
//! The ingest handler is CloudEvents v1.0.2 structured-mode aware:
//! 1. Validate Content-Type (accept application/cloudevents+json or
//!    application/cloudevents-batch+json; application/json during transition)
//! 2. Deserialize body as `EventEnvelope` (single) or `Vec<EventEnvelope>` (batch)
//! 3. For each envelope: dedup check, privacy filter on `data`, audit log,
//!    cloud forward, verdict decision
//! 4. Return a single VerdictResponse. For a batch this is the verdict of the
//!    most-restrictive event (deny > approve > allow); on ties the first wins.
//!
//! Unknown `source` / `type` strings are valid — the tagged-enum lens carries
//! them through as `Unknown(String)` and the client never rejects.

use std::sync::atomic::Ordering;
use std::sync::Arc;

use axum::{
    body::Bytes,
    extract::State,
    http::{header::CONTENT_TYPE, HeaderMap, StatusCode},
    Json,
};

use crate::daemon::AppState;
use crate::envelope::{
    new_event_id, AgentType, EventEnvelope, HookEventType, Verdict, VerdictResponse,
};
use crate::privacy;

// Accepted Content-Types for CloudEvents ingest.
/// Structured-mode media type for a single CloudEvent (JSON object body).
const CT_CLOUDEVENTS_SINGLE: &str = "application/cloudevents+json";
/// Batched-mode media type for several CloudEvents (JSON array body).
const CT_CLOUDEVENTS_BATCH: &str = "application/cloudevents-batch+json";
/// Plain JSON; the ingest handler treats it like a single CloudEvent
/// (accepted during the transition to CloudEvents media types).
const CT_JSON: &str = "application/json";

/// POST /hooks — CloudEvents v1.0.2 structured-mode ingest.
///
/// Accepts a single envelope (object body) or a batch (array body) per the
/// CloudEvents HTTP binding spec. Dispatches each envelope through the
/// per-event pipeline and returns a single VerdictResponse.
pub async fn ingest_cloudevent(
    State(state): State<Arc<AppState>>,
    headers: HeaderMap,
    body: Bytes,
) -> (StatusCode, HeaderMap, Json<VerdictResponse>) {
    let start = std::time::Instant::now();

    let ct = headers
        .get(CONTENT_TYPE)
        .and_then(|v| v.to_str().ok())
        .unwrap_or("");

    let is_batch = ct.starts_with(CT_CLOUDEVENTS_BATCH);
    let is_single = ct.starts_with(CT_CLOUDEVENTS_SINGLE) || ct.starts_with(CT_JSON);
    if !is_batch && !is_single {
        return reject(
            StatusCode::UNSUPPORTED_MEDIA_TYPE,
            "unsupported Content-Type; expected application/cloudevents+json or application/cloudevents-batch+json",
            start,
        );
    }

    // Parse body — single envelope or array of envelopes.
    let envelopes: Vec<EventEnvelope> = if is_batch {
        match serde_json::from_slice::<Vec<EventEnvelope>>(&body) {
            Ok(v) if !v.is_empty() => v,
            Ok(_) => {
                return reject(
                    StatusCode::BAD_REQUEST,
                    "cloudevents-batch body must not be empty",
                    start,
                );
            }
            Err(e) => {
                return reject(
                    StatusCode::BAD_REQUEST,
                    &format!("invalid cloudevents batch body: {e}"),
                    start,
                );
            }
        }
    } else {
        match serde_json::from_slice::<EventEnvelope>(&body) {
            Ok(env) => vec![env],
            Err(e) => {
                return reject(
                    StatusCode::BAD_REQUEST,
                    &format!("invalid cloudevent body: {e}"),
                    start,
                );
            }
        }
    };

    // Process each envelope. In batch mode the response reflects the most
    // restrictive verdict (deny > approve > allow) so a single blocker halts
    // the whole batch from the agent's perspective.
    let mut worst: Option<(HookEventType, String)> = None;
    let mut response_headers = HeaderMap::new();
    for envelope in envelopes {
        let (ev_type, ev_id, dedup_hit) = process_envelope(state.clone(), envelope, start).await;
        if dedup_hit {
            response_headers.insert(
                "x-openlatch-dedup",
                "true".parse().expect("static header value is valid"),
            );
        }
        // Upgrade severity: Stop → Approve > anything else → Allow.
        // Unknown telemetry is tracked separately; for verdict we keep allow.
        let is_stop = matches!(ev_type, HookEventType::Stop);
        match &worst {
            None => worst = Some((ev_type, ev_id)),
            Some((HookEventType::Stop, _)) => {}
            Some(_) if is_stop => worst = Some((ev_type, ev_id)),
            _ => {}
        }
    }

    let (verdict_ev_type, verdict_ev_id) = worst.unwrap_or_else(|| {
        // Empty batch would have been rejected above; this branch is unreachable
        // but keep fail-open semantics just in case.
        (HookEventType::Unknown(String::new()), new_event_id())
    });

    let latency_ms = start.elapsed().as_secs_f64() * 1000.0;
    let response = match verdict_ev_type {
        HookEventType::Stop => VerdictResponse::approve(verdict_ev_id, latency_ms),
        _ => VerdictResponse::allow(verdict_ev_id, latency_ms),
    };

    (StatusCode::OK, response_headers, Json(response))
}

/// Per-envelope pipeline: dedup, privacy-filter, enrich, audit-log,
/// cloud-forward.
///
/// Steps, in order:
/// 1. Content dedup via `state.dedup.check_and_insert(subject, type, data)`.
///    On a hit the function returns immediately with `dedup_hit = true`; the
///    event is not logged, counted, or forwarded.
/// 2. Emit telemetry counters for unknown `source` / `type` labels.
/// 3. Run the privacy filter over `data` before anything is logged.
/// 4. Fill in missing os / arch / datacontenttype / IP metadata. The
///    `clientversion` field is always overwritten with this crate's version.
/// 5. Send the enriched envelope, plus `olverdict` / `ollatencyms`, to the
///    event logger.
/// 6. Increment `state.event_counter`.
/// 7. If a cloud channel exists and `agent_id` is non-empty, `try_send` the
///    enriched envelope (without the `ol*` attributes). A full or closed
///    channel is logged and the event is dropped.
///
/// `start` is the request start time. Here it is used only for the
/// `ollatencyms` audit attribute.
///
/// Returns (type, id, dedup_hit) so the caller can build the aggregate
/// verdict and attach the `x-openlatch-dedup` response header.
async fn process_envelope(
    state: Arc<AppState>,
    mut envelope: EventEnvelope,
    start: std::time::Instant,
) -> (HookEventType, String, bool) {
    let event_type = envelope.type_.clone();
    let event_id = envelope.id.clone();

    // Dedup by (subject, type, data-hash) — identical-content events fired
    // within 100ms land on the same dedup key. CloudEvent `id` is a retry-
    // level identifier; content dedup catches same-action duplicates.
    let subject = envelope.subject.clone().unwrap_or_else(|| "unknown".into());
    let type_str = envelope.type_.as_str().to_string();
    // Borrow the payload instead of deep-cloning it just to key the dedup
    // check; a missing payload is keyed as JSON null.
    let null_payload = serde_json::Value::Null;
    let data_for_dedup = envelope.data.as_ref().unwrap_or(&null_payload);
    let is_duplicate = state
        .dedup
        .check_and_insert(&subject, &type_str, data_for_dedup);

    if is_duplicate {
        tracing::debug!(
            code = crate::error::ERR_EVENT_DEDUPED,
            session_id = %subject,
            event_type = %type_str,
            "Event deduplicated within TTL window"
        );
        return (event_type, event_id, true);
    }

    // Unknown-variant telemetry: emit one anonymised counter per unknown
    // source or type string so we can promote high-volume unknowns to named
    // variants in future releases. Only the string label is passed here.
    if matches!(envelope.source, AgentType::Unknown(_)) {
        crate::telemetry::capture_hook_source_unknown(envelope.source.as_str());
    }
    if matches!(envelope.type_, HookEventType::Unknown(_)) {
        crate::telemetry::capture_hook_type_unknown(envelope.type_.as_str());
    }

    // SECURITY: Privacy filter runs BEFORE logging — never log unredacted
    // credentials. The filter operates on the opaque `data` payload.
    if let Some(data) = envelope.data.as_mut() {
        privacy::filter_event_with(data, &state.privacy_filter);
    }

    // Stamp missing-but-known metadata if the hook binary didn't set it.
    // Client version always wins over whatever the hook passed — it identifies
    // the forwarder, not the emitter.
    if envelope.os.is_none() {
        envelope.os = Some(crate::envelope::os_string().to_string());
    }
    if envelope.arch.is_none() {
        envelope.arch = Some(crate::envelope::arch_string().to_string());
    }
    envelope.clientversion = Some(env!("CARGO_PKG_VERSION").to_string());
    // datacontenttype defaults to application/json for every OpenLatch event.
    if envelope.datacontenttype.is_none() {
        envelope.datacontenttype = Some("application/json".to_string());
    }
    if envelope.localipv4.is_none() {
        envelope.localipv4 = state.local_ipv4;
    }
    if envelope.localipv6.is_none() {
        envelope.localipv6 = state.local_ipv6;
    }
    if envelope.publicipv4.is_none() {
        envelope.publicipv4 = state.public_ipv4;
    }
    if envelope.publicipv6.is_none() {
        envelope.publicipv6 = state.public_ipv6;
    }

    // Log the fully-enriched envelope (non-blocking — try_send drops event
    // if channel is full). The JSONL audit line is a valid CloudEvent plus
    // the olverdict/ollatencyms extension attrs, stamped in below.
    let mut envelope_value = serde_json::to_value(&envelope).unwrap_or(serde_json::Value::Null);
    let verdict_for_stored = if matches!(event_type, HookEventType::Stop) {
        Verdict::Approve
    } else {
        Verdict::Allow
    };
    // `as_millis()` is u128; saturate instead of silently truncating.
    let latency_snapshot_ms = u64::try_from(start.elapsed().as_millis()).unwrap_or(u64::MAX);
    if let Some(obj) = envelope_value.as_object_mut() {
        obj.insert(
            "olverdict".to_string(),
            serde_json::json!(match verdict_for_stored {
                Verdict::Allow => "allow",
                Verdict::Approve => "approve",
                Verdict::Deny => "deny",
            }),
        );
        obj.insert(
            "ollatencyms".to_string(),
            serde_json::json!(latency_snapshot_ms),
        );
    }
    let envelope_json = serde_json::to_string(&envelope_value).unwrap_or_default();
    state.event_logger.log(envelope_json);

    state.event_counter.fetch_add(1, Ordering::Relaxed);

    // CLOUD-01/CLOUD-09: fire-and-forget cloud forwarding via try_send.
    // The envelope passed to the worker omits olverdict/ollatencyms — the
    // platform stamps those itself when the event is processed.
    if let Some(tx) = &state.cloud_tx {
        let agent_id = state.config.agent_id.as_deref().unwrap_or_default();
        if agent_id.is_empty() {
            tracing::warn!(
                code = "OL-1200",
                "cloud forward skipped: [daemon].agent_id is empty in config.toml — run 'openlatch init' to populate it"
            );
        } else {
            let cloud_event = crate::cloud::CloudEvent {
                envelope: serde_json::to_value(&envelope).unwrap_or_default(),
                agent_id: agent_id.to_string(),
            };
            match tx.try_send(cloud_event) {
                Ok(()) => {}
                Err(tokio::sync::mpsc::error::TrySendError::Full(_)) => {
                    tracing::warn!(code = "OL-1200", "cloud channel full - event dropped");
                }
                Err(tokio::sync::mpsc::error::TrySendError::Closed(_)) => {
                    tracing::warn!("cloud channel closed - worker has exited");
                }
            }
        }
    }

    (event_type, event_id, false)
}

/// Builds the response for a request the ingest handler refuses.
///
/// Logs the refusal at warn level, then answers with the given `status` and
/// no extra headers. The body is still an `allow` verdict with a freshly
/// generated event id and the latency measured from `start`.
fn reject(
    status: StatusCode,
    message: &str,
    start: std::time::Instant,
) -> (StatusCode, HeaderMap, Json<VerdictResponse>) {
    tracing::warn!(%message, http_status = %status, "cloudevents ingest rejected");

    let elapsed_ms = start.elapsed().as_secs_f64() * 1000.0;
    // Deliberately fail-open: the agent must never be blocked because the
    // daemon could not validate its request.
    let fail_open_body = Json(VerdictResponse::allow(new_event_id(), elapsed_ms));
    let no_headers = HeaderMap::new();

    (status, no_headers, fail_open_body)
}

/// GET /health — liveness probe, no authentication required (DAEM-05).
///
/// Always answers with a JSON object carrying `"status": "ok"`, the crate
/// version baked in at compile time, and whole seconds since `started_at`.
pub async fn health(State(state): State<Arc<AppState>>) -> Json<serde_json::Value> {
    let payload = serde_json::json!({
        "status": "ok",
        "version": env!("CARGO_PKG_VERSION"),
        "uptime_secs": state.started_at.elapsed().as_secs(),
    });
    Json(payload)
}

/// GET /metrics — event counters, no authentication required (DAEM-06).
///
/// Returns total events processed (deduped events excluded), uptime in seconds,
/// the available update version (if one was discovered at startup), and cloud
/// forwarding metrics (cloud_status, cloud_forwarded_count, cloud_last_sync_secs).
pub async fn metrics(State(state): State<Arc<AppState>>) -> Json<serde_json::Value> {
    let events = state.event_counter.load(Ordering::Relaxed);
    let uptime_secs = state.started_at.elapsed().as_secs();
    let update_available = state.get_available_update();

    let (
        cloud_status,
        cloud_forwarded_count,
        cloud_last_sync_secs,
        cloud_drop_count,
        cloud_api_url,
    ) = if let Some(ref cs) = state.cloud_state {
        let forwarded = cs.forwarded_count();
        let last_sync = cs.last_sync_secs();
        let drops = cs.drop_count();
        let status = if cs.is_auth_error() {
            "auth_error"
        } else if cs.is_no_credential() {
            "no_credential"
        } else if cs.consecutive_drops() > 0 {
            "network_error"
        } else {
            "connected"
        };
        (
            status,
            forwarded,
            last_sync,
            drops,
            state.config.cloud.api_url.as_str(),
        )
    } else {
        ("not_configured", 0u64, 0u64, 0u64, "")
    };

    Json(serde_json::json!({
        "events_processed": events,
        "uptime_secs": uptime_secs,
        "update_available": update_available,
        "cloud_status": cloud_status,
        "cloud_forwarded_count": cloud_forwarded_count,
        "cloud_last_sync_secs": cloud_last_sync_secs,
        "cloud_drop_count": cloud_drop_count,
        "cloud_api_url": cloud_api_url,
    }))
}

/// POST /shutdown — graceful shutdown trigger, requires authentication (DAEM-07).
///
/// Takes the stored shutdown sender out of shared state and signals it,
/// answering 200 OK. After that the slot is empty, so any later request
/// answers 410 Gone.
pub async fn shutdown_handler(State(state): State<Arc<AppState>>) -> StatusCode {
    let mut sender_slot = state.shutdown_tx.lock().await;
    match sender_slot.take() {
        Some(sender) => {
            // The result of the send is intentionally ignored.
            let _ = sender.send(());
            StatusCode::OK
        }
        None => StatusCode::GONE,
    }
}