// openlatch-client 0.1.5
//
// The open-source security layer for AI agents — client forwarder.
//! Background client for the telemetry subsystem.
//!
//! Scope: mpsc channel, background batch task, debug-mode stderr emission,
//! graceful drain on shutdown, and a best-effort delivery path through
//! `super::network` (POSTs are skipped when the HTTP client fails to build).
//!
//! Invariants preserved here:
//! - I1 / I2: when consent resolves disabled, `init()` returns `None`. No
//!   channel, no task, no allocation on the hot path.
//! - I4: drops are silent. No disk queue, no retry state.
//! - I10: failures never produce telemetry events describing themselves.

use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};

use serde_json::{json, Map, Value};
use tokio::sync::mpsc;

use super::consent::Resolved;
use super::events::Event;
use super::super_props::SuperProps;

/// Bounded queue between `capture()` and the batch loop; overflow drops (I4).
const CHANNEL_CAPACITY: usize = 1000;
/// Flush as soon as this many events are queued…
const BATCH_SIZE: usize = 10;
/// …or when this much time has elapsed since the last flush, whichever first.
const BATCH_INTERVAL: Duration = Duration::from_secs(1);

/// Queued event waiting to be flushed. Super-properties are merged in at
/// enqueue time so the background task sees a fully-populated payload.
#[derive(Debug, Clone)]
pub struct QueuedEvent {
    /// Event name, copied from `Event::name` at enqueue time.
    pub name: String,
    /// Event properties with `SuperProps` already merged in (see `capture`).
    pub properties: Map<String, Value>,
}

/// Handle returned from `init`. Dropping a clone is a no-op; dropping the
/// final clone closes the event channel, which lets the background task
/// drain any queued events and exit (see the shutdown notes in `start`).
///
/// Cheap to clone — the inner state is reference-counted.
// NOTE(review): earlier docs mentioned `shutdown().await`; no such method is
// visible in this file — confirm whether it exists elsewhere before citing it.
#[derive(Clone)]
pub struct TelemetryHandle {
    inner: Arc<Inner>,
}

struct Inner {
    /// `None` when telemetry is disabled; `capture()` bails out before
    /// allocating or merging anything in that case (I1 / I2).
    sender: Option<mpsc::Sender<QueuedEvent>>,
    /// Merged into every event's properties at enqueue time.
    super_props: SuperProps,
    /// Unix timestamp of the batch loop's most recent flush attempt. This is
    /// the SAME atomic the loop stamps in `flush` — shared via `Arc` so the
    /// handle observes updates. The loop holds only this counter, never the
    /// `Sender`, so the structural shutdown chain (drop handle → channel
    /// closes → loop exits) is unaffected.
    last_sent_unix: Arc<AtomicU64>,
    /// Events successfully enqueued by this process.
    events_captured: AtomicU64,
    /// Events dropped on a full (or closed) channel — silently, per I4.
    events_dropped: AtomicU64,
    /// When true, every captured event is echoed to stderr (developer opt-in).
    debug_stderr: bool,
    /// Flipped by `disable_now()`; checked first on every `capture()`.
    disabled: AtomicBool,
}

impl TelemetryHandle {
    /// Build a disabled handle — `capture()` is a no-op. Used when consent
    /// resolves disabled, when the baked key is empty, or when the module
    /// was compiled with `full-cli-no-telemetry`.
    pub fn disabled(agent_id: String, authenticated: bool) -> Self {
        let inner = Inner {
            sender: None,
            super_props: SuperProps::new(agent_id, authenticated),
            last_sent_unix: Arc::new(AtomicU64::new(0)),
            events_captured: AtomicU64::new(0),
            events_dropped: AtomicU64::new(0),
            debug_stderr: false,
            disabled: AtomicBool::new(true),
        };
        Self {
            inner: Arc::new(inner),
        }
    }

    /// True only when a live channel exists and `disable_now()` has not fired.
    pub fn is_enabled(&self) -> bool {
        !self.inner.disabled.load(Ordering::Relaxed) && self.inner.sender.is_some()
    }

    /// Non-blocking, non-fallible capture. Drops on full channel (I4).
    pub fn capture(&self, event: Event) {
        if self.inner.disabled.load(Ordering::Relaxed) {
            return;
        }
        let Some(sender) = self.inner.sender.as_ref() else {
            return;
        };

        // Merge super-properties here so the background task sees a
        // fully-populated payload and never touches shared state.
        let mut properties = event.properties;
        self.inner.super_props.merge_into(&mut properties);

        let queued = QueuedEvent {
            name: event.name,
            properties,
        };

        if self.inner.debug_stderr {
            emit_debug(&queued);
        }

        match sender.try_send(queued) {
            Ok(()) => {
                self.inner.events_captured.fetch_add(1, Ordering::Relaxed);
            }
            Err(_) => {
                // Channel full or receiver gone — count it, say nothing (I4).
                self.inner.events_dropped.fetch_add(1, Ordering::Relaxed);
            }
        }
    }

    /// Switch this handle into a disabled state immediately — used by
    /// `telemetry purge` (I5) to stop capture within the current process.
    pub fn disable_now(&self) {
        self.inner.disabled.store(true, Ordering::Relaxed);
    }

    /// Unix timestamp of the most recent flush attempt, or 0 if none yet.
    pub fn last_sent_unix(&self) -> u64 {
        self.inner.last_sent_unix.load(Ordering::Relaxed)
    }

    /// Number of events successfully enqueued by this process.
    pub fn events_captured(&self) -> u64 {
        self.inner.events_captured.load(Ordering::Relaxed)
    }

    /// Number of events dropped on a full (or closed) channel.
    pub fn events_dropped(&self) -> u64 {
        self.inner.events_dropped.load(Ordering::Relaxed)
    }
}

/// Configuration resolved by `super::init()` and passed to the client.
pub(super) struct ClientConfig {
    pub resolved: Resolved,
    pub super_props: SuperProps,
    pub debug_stderr: bool,
    pub baked_key_present: bool,
}

/// Construct a live handle and spawn the background batch task. When the
/// runtime has no baked key or consent is disabled, returns a no-op handle
/// and never spawns a task (I1 / I2).
pub(super) fn start(cfg: ClientConfig) -> TelemetryHandle {
    if !cfg.resolved.enabled() || !cfg.baked_key_present {
        // Invariant I2: zero code path when disabled. No channel, no task.
        return TelemetryHandle {
            inner: Arc::new(Inner {
                sender: None,
                super_props: cfg.super_props,
                last_sent_unix: Arc::new(AtomicU64::new(0)),
                events_captured: AtomicU64::new(0),
                events_dropped: AtomicU64::new(0),
                debug_stderr: cfg.debug_stderr,
                disabled: AtomicBool::new(true),
            }),
        };
    }

    let (tx, rx) = mpsc::channel::<QueuedEvent>(CHANNEL_CAPACITY);
    // One shared counter: the batch loop stamps it after every flush and the
    // handle reads it back through `last_sent_unix()`. It must be the same
    // atomic on both sides — storing a `load()`ed snapshot into `Inner` would
    // freeze the handle's view at 0 forever.
    let last_sent = Arc::new(AtomicU64::new(0));

    // Run the batch loop regardless of caller context. If we're already
    // inside a tokio runtime (tests, `rt.block_on` closures) we spawn onto
    // it. Otherwise — the common case for the main `openlatch` binary, whose
    // `main()` is synchronous — we own a dedicated OS thread that builds its
    // own single-threaded runtime and drives the loop to completion.
    //
    // Shutdown is structural: when the final `TelemetryHandle` clone drops,
    // `Inner` drops, the mpsc `Sender` drops, `rx.recv()` in the batch loop
    // returns `None`, `run_batch_loop` exits, the owned runtime drops, and
    // the thread joins. No explicit stop signal needed.
    let last_sent_bg = Arc::clone(&last_sent);
    let debug = cfg.debug_stderr;
    // Build a long-lived reqwest client up front so the batch loop owns a
    // connection pool. `None` means "skip POSTs" — preserves I1 if the
    // builder fails for some unforeseen reason.
    let http = super::network::build_client();
    if let Ok(handle) = tokio::runtime::Handle::try_current() {
        handle.spawn(async move {
            run_batch_loop(rx, last_sent_bg, debug, http).await;
        });
    } else {
        let _ = std::thread::Builder::new()
            .name("openlatch-telemetry".into())
            .spawn(move || {
                // Thread spawn failures and runtime build failures are both
                // fatally silent (I10 — no telemetry about telemetry). On
                // failure the receiver drops, senders see the closed channel
                // and discard events (I4 — drops are silent).
                if let Ok(rt) = tokio::runtime::Builder::new_current_thread()
                    .enable_all()
                    .build()
                {
                    rt.block_on(run_batch_loop(rx, last_sent_bg, debug, http));
                }
            });
    }

    let inner = Inner {
        sender: Some(tx),
        super_props: cfg.super_props,
        // Share the loop's counter directly, not a point-in-time copy.
        last_sent_unix: last_sent,
        events_captured: AtomicU64::new(0),
        events_dropped: AtomicU64::new(0),
        debug_stderr: cfg.debug_stderr,
        disabled: AtomicBool::new(false),
    };
    TelemetryHandle {
        inner: Arc::new(inner),
    }
}

/// Background loop: batch up to `BATCH_SIZE` events or up to `BATCH_INTERVAL`
/// elapsed, then flush. `flush` POSTs via `http` when it is `Some`; when the
/// client builder failed (`None`) events are discarded after the optional
/// stderr echo. Exits when `rx.recv()` returns `None`, i.e. when every
/// `Sender` clone has dropped — the structural shutdown path set up in `start`.
// NOTE(review): `_debug` is unused here — the stderr echo happens at capture
// time in `TelemetryHandle::capture`; confirm before removing the parameter.
async fn run_batch_loop(
    mut rx: mpsc::Receiver<QueuedEvent>,
    last_sent: Arc<AtomicU64>,
    _debug: bool,
    http: Option<reqwest::Client>,
) {
    // One buffer reused across flushes; `flush` clears it in place.
    let mut batch: Vec<QueuedEvent> = Vec::with_capacity(BATCH_SIZE);
    let mut deadline = Instant::now() + BATCH_INTERVAL;

    loop {
        // Saturates to zero once the deadline passes, so the sleep arm fires
        // immediately instead of underflowing.
        let timeout = deadline.saturating_duration_since(Instant::now());
        tokio::select! {
            maybe_event = rx.recv() => {
                match maybe_event {
                    Some(event) => {
                        batch.push(event);
                        if batch.len() >= BATCH_SIZE {
                            // Size-triggered flush also restarts the interval clock.
                            flush(&mut batch, &last_sent, http.as_ref()).await;
                            deadline = Instant::now() + BATCH_INTERVAL;
                        }
                    }
                    None => {
                        // Sender dropped — final drain and exit.
                        flush(&mut batch, &last_sent, http.as_ref()).await;
                        return;
                    }
                }
            }
            _ = tokio::time::sleep(timeout) => {
                // Time-triggered flush; skip the call when nothing is queued.
                if !batch.is_empty() {
                    flush(&mut batch, &last_sent, http.as_ref()).await;
                }
                deadline = Instant::now() + BATCH_INTERVAL;
            }
        }
    }
}

/// Deliver a batch (best effort), stamp the last-attempt time, and clear the
/// buffer in place so the caller can keep reusing it.
///
/// The timestamp records the *attempt*, not the outcome: `telemetry status`
/// shows attempt timing, and delivery failures are deliberately invisible (I4).
async fn flush(
    batch: &mut Vec<QueuedEvent>,
    last_sent: &AtomicU64,
    http: Option<&reqwest::Client>,
) {
    if batch.is_empty() {
        return;
    }
    // `None` means the client builder failed at startup — skip the POST and
    // fall through to the stamp-and-clear path.
    match http {
        Some(client) => {
            let _ = super::network::post_batch(client, batch).await;
        }
        None => {}
    }
    let stamp = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .map_or(0, |d| d.as_secs());
    last_sent.store(stamp, Ordering::Relaxed);
    batch.clear();
}

/// Echo one fully-merged event to stderr as a single JSON line.
///
/// This is explicit opt-in developer output gated on
/// `OPENLATCH_TELEMETRY_DEBUG=1`, not operational logging — `eprintln!` is
/// the right tool here.
fn emit_debug(event: &QueuedEvent) {
    let payload = json!({
        "event": event.name,
        "properties": event.properties,
    });
    eprintln!("[telemetry/debug] {payload}");
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::core::telemetry::consent::{ConsentState, DecidedBy};

    // Minimal super-properties fixture shared by every test below.
    fn test_props() -> SuperProps {
        SuperProps::new("agt_test".into(), false)
    }

    // A handle built via `disabled()` must ignore captures entirely (I1/I2).
    #[test]
    fn test_disabled_handle_noop_capture() {
        let h = TelemetryHandle::disabled("agt_x".into(), false);
        h.capture(Event::cli_initialized("claude-code", 1, true));
        assert_eq!(h.events_captured(), 0);
        assert!(!h.is_enabled());
    }

    // Consent disabled wins even when the baked key is present.
    #[test]
    fn test_start_with_disabled_consent_returns_noop() {
        let cfg = ClientConfig {
            resolved: Resolved {
                state: ConsentState::Disabled,
                decided_by: DecidedBy::ConfigFile,
            },
            super_props: test_props(),
            debug_stderr: false,
            baked_key_present: true,
        };
        let h = start(cfg);
        assert!(!h.is_enabled());
    }

    // No baked key wins even when consent is enabled.
    #[test]
    fn test_start_without_baked_key_returns_noop() {
        let cfg = ClientConfig {
            resolved: Resolved {
                state: ConsentState::Enabled,
                decided_by: DecidedBy::ConfigFile,
            },
            super_props: test_props(),
            debug_stderr: false,
            baked_key_present: false,
        };
        let h = start(cfg);
        assert!(!h.is_enabled());
    }

    #[test]
    fn test_start_without_ambient_runtime_spawns_self_hosted_thread() {
        // Pure sync context — no `#[tokio::test]`. Proves the fallback
        // std::thread + current_thread runtime actually drives the batch
        // loop. Without the fix, the handle would be "enabled" but no
        // receiver would ever drain the channel.
        let cfg = ClientConfig {
            resolved: Resolved {
                state: ConsentState::Enabled,
                decided_by: DecidedBy::ConfigFile,
            },
            super_props: test_props(),
            debug_stderr: false,
            baked_key_present: true,
        };
        let h = start(cfg);
        assert!(h.is_enabled());
        h.capture(Event::cli_initialized("claude-code", 1, true));
        assert_eq!(h.events_captured(), 1);
        // Dropping the handle closes the channel; the batch thread sees
        // `rx.recv() -> None` and exits. We don't join the thread here
        // (it's daemonic by design) but the test completes cleanly, which
        // means the spawned thread didn't panic or deadlock.
        drop(h);
    }

    // Happy path on an ambient tokio runtime: capture succeeds and the
    // counter reflects the enqueued event.
    #[tokio::test]
    async fn test_enabled_handle_accepts_captures() {
        let cfg = ClientConfig {
            resolved: Resolved {
                state: ConsentState::Enabled,
                decided_by: DecidedBy::ConfigFile,
            },
            super_props: test_props(),
            debug_stderr: false,
            baked_key_present: true,
        };
        let h = start(cfg);
        assert!(h.is_enabled());
        h.capture(Event::cli_initialized("claude-code", 1, true));
        // Yield so the batch loop can observe the send.
        tokio::task::yield_now().await;
        assert_eq!(h.events_captured(), 1);
    }

    // `disable_now()` (telemetry purge, I5) must stop capture immediately,
    // before the handle is dropped.
    #[tokio::test]
    async fn test_disable_now_stops_capture_mid_process() {
        let cfg = ClientConfig {
            resolved: Resolved {
                state: ConsentState::Enabled,
                decided_by: DecidedBy::ConfigFile,
            },
            super_props: test_props(),
            debug_stderr: false,
            baked_key_present: true,
        };
        let h = start(cfg);
        h.disable_now();
        h.capture(Event::cli_initialized("claude-code", 1, true));
        assert_eq!(h.events_captured(), 0);
    }
}