// openlatch-provider 0.1.0
//
// Self-service onboarding CLI + runtime daemon for OpenLatch Editors and Providers.
//! Telemetry handle + background batch task.
//!
//! Cloned in spirit from openlatch-client `src/core/telemetry/client.rs` —
//! same invariants:
//!   - I1 / I2: when consent is disabled, returns a no-op handle. No channel,
//!     no spawned task.
//!   - I4: drops are silent. No retry, no disk queue.
//!   - I10: failures never produce telemetry-about-telemetry events.

use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};

use serde_json::{json, Map, Value};
use tokio::sync::mpsc;

use super::consent::Resolved;
use super::events::Event;
use super::super_props::SuperProps;

/// Bound of the capture -> batch-task queue; when full, `try_send` fails and
/// the event is silently dropped (invariant I4).
const CHANNEL_CAPACITY: usize = 1000;
/// Flush as soon as this many events are buffered.
const BATCH_SIZE: usize = 10;
/// Flush whatever is buffered at least this often.
const BATCH_INTERVAL: Duration = Duration::from_secs(1);

/// A captured event, with super-properties already merged in, queued for the
/// background batch task.
#[derive(Debug, Clone)]
pub struct QueuedEvent {
    /// Event name, carried through unchanged from `Event::name`.
    pub name: String,
    /// Final property map: the event's own properties plus merged super-props.
    pub properties: Map<String, Value>,
}

/// Cheaply cloneable public handle; every clone shares one `Inner` (one
/// channel, one set of counters, one disabled flag).
#[derive(Clone)]
pub struct TelemetryHandle {
    inner: Arc<Inner>,
}

/// State shared by every clone of a `TelemetryHandle`.
struct Inner {
    /// `None` for no-op handles (consent off / no baked key); `capture`
    /// returns early without doing any work in that case.
    sender: Option<mpsc::Sender<QueuedEvent>>,
    /// Process-wide properties merged into every captured event.
    super_props: SuperProps,
    /// Intended to hold unix seconds of the most recent flush (0 = never).
    last_sent_unix: AtomicU64,
    /// Events successfully queued onto the channel.
    events_captured: AtomicU64,
    /// Events lost to a full channel — counted locally, never reported (I10).
    events_dropped: AtomicU64,
    /// When true, every captured event is echoed to stderr.
    debug_stderr: bool,
    /// Runtime kill switch; once set, all captures become no-ops.
    disabled: AtomicBool,
}

impl TelemetryHandle {
    /// Build a permanently inert handle: no channel, no background task.
    /// Captures return before doing any work (invariants I1/I2).
    pub fn disabled(machine_id: String, authenticated: bool) -> Self {
        Self {
            inner: Arc::new(Inner {
                sender: None,
                super_props: SuperProps::new(machine_id, authenticated),
                last_sent_unix: AtomicU64::new(0),
                events_captured: AtomicU64::new(0),
                events_dropped: AtomicU64::new(0),
                debug_stderr: false,
                disabled: AtomicBool::new(true),
            }),
        }
    }

    /// True only when a live channel to the batch task exists AND the
    /// runtime kill switch has not been flipped.
    pub fn is_enabled(&self) -> bool {
        let inner = &self.inner;
        inner.sender.is_some() && !inner.disabled.load(Ordering::Relaxed)
    }

    /// Queue one event for the background batcher.
    ///
    /// No-op when disabled or when no channel exists. A full channel drops
    /// the event silently (I4): only a local counter records the loss, and
    /// no telemetry is ever emitted about telemetry failures (I10).
    pub fn capture(&self, event: Event) {
        let inner = &self.inner;
        if inner.disabled.load(Ordering::Relaxed) {
            return;
        }
        let sender = match inner.sender.as_ref() {
            Some(s) => s,
            None => return,
        };

        // Fold the process-wide super-properties into this event's payload.
        let mut properties = event.properties;
        inner.super_props.merge_into(&mut properties);

        let queued = QueuedEvent {
            name: event.name,
            properties,
        };

        // Debug echo happens before the send attempt, so even events that
        // end up dropped on a full channel are visible on stderr.
        if inner.debug_stderr {
            emit_debug(&queued);
        }

        // try_send never blocks the caller; backpressure becomes a drop.
        let counter = if sender.try_send(queued).is_ok() {
            &inner.events_captured
        } else {
            &inner.events_dropped
        };
        counter.fetch_add(1, Ordering::Relaxed);
    }

    /// Flip the kill switch: all subsequent captures become no-ops. The
    /// background task is left to drain whatever is already queued.
    pub fn disable_now(&self) {
        self.inner.disabled.store(true, Ordering::Relaxed);
    }

    /// Seconds-since-epoch value currently stored in `last_sent_unix`
    /// (0 when nothing has been recorded).
    pub fn last_sent_unix(&self) -> u64 {
        self.inner.last_sent_unix.load(Ordering::Relaxed)
    }

    /// Number of events successfully queued onto the channel.
    pub fn events_captured(&self) -> u64 {
        self.inner.events_captured.load(Ordering::Relaxed)
    }

    /// Number of events dropped because the channel was full.
    pub fn events_dropped(&self) -> u64 {
        self.inner.events_dropped.load(Ordering::Relaxed)
    }
}

/// Inputs for `start()`: resolved consent plus the facts that gate whether
/// telemetry may run at all.
pub(super) struct ClientConfig {
    /// Consent decision; `start()` returns a no-op handle unless enabled.
    pub resolved: Resolved,
    /// Process-wide properties merged into every captured event.
    pub super_props: SuperProps,
    /// When true, every captured event is echoed to stderr.
    pub debug_stderr: bool,
    /// Whether an API key was baked into this build; when false, `start()`
    /// returns a no-op handle.
    pub baked_key_present: bool,
}

pub(super) fn start(cfg: ClientConfig) -> TelemetryHandle {
    if !cfg.resolved.enabled() || !cfg.baked_key_present {
        return TelemetryHandle {
            inner: Arc::new(Inner {
                sender: None,
                super_props: cfg.super_props,
                last_sent_unix: AtomicU64::new(0),
                events_captured: AtomicU64::new(0),
                events_dropped: AtomicU64::new(0),
                debug_stderr: cfg.debug_stderr,
                disabled: AtomicBool::new(true),
            }),
        };
    }

    let (tx, rx) = mpsc::channel::<QueuedEvent>(CHANNEL_CAPACITY);
    let last_sent = Arc::new(AtomicU64::new(0));
    let last_sent_bg = Arc::clone(&last_sent);
    let debug = cfg.debug_stderr;
    let http = super::network::build_client();

    if let Ok(handle) = tokio::runtime::Handle::try_current() {
        handle.spawn(async move {
            run_batch_loop(rx, last_sent_bg, debug, http).await;
        });
    } else {
        // Sync caller (e.g. test that built no runtime). Spin up a daemon
        // OS thread with its own current-thread runtime so the loop drains.
        let _ = std::thread::Builder::new()
            .name("openlatch-provider-telemetry".into())
            .spawn(move || {
                if let Ok(rt) = tokio::runtime::Builder::new_current_thread()
                    .enable_all()
                    .build()
                {
                    rt.block_on(run_batch_loop(rx, last_sent_bg, debug, http));
                }
            });
    }

    let inner = Inner {
        sender: Some(tx),
        super_props: cfg.super_props,
        last_sent_unix: AtomicU64::new(0),
        events_captured: AtomicU64::new(0),
        events_dropped: AtomicU64::new(0),
        debug_stderr: cfg.debug_stderr,
        disabled: AtomicBool::new(false),
    };
    let _ = last_sent; // last_sent is owned by the bg task; main inner uses its own.
    TelemetryHandle {
        inner: Arc::new(inner),
    }
}

async fn run_batch_loop(
    mut rx: mpsc::Receiver<QueuedEvent>,
    last_sent: Arc<AtomicU64>,
    _debug: bool,
    http: Option<reqwest::Client>,
) {
    let mut batch: Vec<QueuedEvent> = Vec::with_capacity(BATCH_SIZE);
    let mut deadline = Instant::now() + BATCH_INTERVAL;

    loop {
        let timeout = deadline.saturating_duration_since(Instant::now());
        tokio::select! {
            maybe_event = rx.recv() => {
                match maybe_event {
                    Some(event) => {
                        batch.push(event);
                        if batch.len() >= BATCH_SIZE {
                            flush(&mut batch, &last_sent, http.as_ref()).await;
                            deadline = Instant::now() + BATCH_INTERVAL;
                        }
                    }
                    None => {
                        flush(&mut batch, &last_sent, http.as_ref()).await;
                        return;
                    }
                }
            }
            _ = tokio::time::sleep(timeout) => {
                if !batch.is_empty() {
                    flush(&mut batch, &last_sent, http.as_ref()).await;
                }
                deadline = Instant::now() + BATCH_INTERVAL;
            }
        }
    }
}

/// Ship the current batch (if any), stamp `last_sent`, and clear the buffer.
///
/// The POST outcome is deliberately ignored: no retry, no disk queue (I4),
/// and never any telemetry-about-telemetry (I10).
async fn flush(
    batch: &mut Vec<QueuedEvent>,
    last_sent: &AtomicU64,
    http: Option<&reqwest::Client>,
) {
    if batch.is_empty() {
        return;
    }
    if let Some(client) = http {
        let _ = super::network::post_batch(client, batch).await;
    }
    // Seconds since the unix epoch; a pre-epoch clock degrades to 0.
    let unix_now = std::time::SystemTime::UNIX_EPOCH
        .elapsed()
        .map(|d| d.as_secs())
        .unwrap_or(0);
    last_sent.store(unix_now, Ordering::Relaxed);
    batch.clear();
}

/// Echo one queued event to stderr as a JSON envelope (debug mode only).
fn emit_debug(event: &QueuedEvent) {
    eprintln!(
        "[telemetry/debug] {}",
        json!({
            "event": event.name,
            "properties": event.properties,
        })
    );
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::telemetry::consent::{ConsentState, DecidedBy};

    /// Minimal super-props fixture: fixed machine id, unauthenticated.
    fn test_props() -> SuperProps {
        SuperProps::new("mach_test".into(), false)
    }

    // I1/I2: a disabled handle swallows captures without counting them.
    #[test]
    fn disabled_handle_noop_capture() {
        let h = TelemetryHandle::disabled("mach_x".into(), false);
        h.capture(Event::cli_command_invoked("tools.list", "table", 0, 1));
        assert_eq!(h.events_captured(), 0);
        assert!(!h.is_enabled());
    }

    // Consent explicitly disabled -> start() must hand back a no-op handle,
    // even though a baked key is present.
    #[test]
    fn start_with_disabled_consent_returns_noop() {
        let cfg = ClientConfig {
            resolved: Resolved {
                state: ConsentState::Disabled,
                decided_by: DecidedBy::ConfigFile,
            },
            super_props: test_props(),
            debug_stderr: false,
            baked_key_present: true,
        };
        let h = start(cfg);
        assert!(!h.is_enabled());
    }

    // No baked key -> no-op handle, even with consent enabled.
    #[test]
    fn start_without_baked_key_returns_noop() {
        let cfg = ClientConfig {
            resolved: Resolved {
                state: ConsentState::Enabled,
                decided_by: DecidedBy::ConfigFile,
            },
            super_props: test_props(),
            debug_stderr: false,
            baked_key_present: false,
        };
        let h = start(cfg);
        assert!(!h.is_enabled());
    }

    // Happy path: enabled handle queues a capture; the captured counter is
    // bumped synchronously on a successful try_send.
    #[tokio::test]
    async fn enabled_handle_accepts_captures() {
        let cfg = ClientConfig {
            resolved: Resolved {
                state: ConsentState::Enabled,
                decided_by: DecidedBy::ConfigFile,
            },
            super_props: test_props(),
            debug_stderr: false,
            baked_key_present: true,
        };
        let h = start(cfg);
        assert!(h.is_enabled());
        h.capture(Event::cli_command_invoked("tools.list", "table", 0, 1));
        tokio::task::yield_now().await;
        assert_eq!(h.events_captured(), 1);
    }
}