use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};
use serde_json::{json, Map, Value};
use tokio::sync::mpsc;
use super::consent::Resolved;
use super::events::Event;
use super::super_props::SuperProps;
/// Capacity of the bounded channel between `capture` callers and the batch loop.
const CHANNEL_CAPACITY: usize = 1000;
/// Number of buffered events that triggers an immediate flush in the batch loop.
const BATCH_SIZE: usize = 10;
/// Interval after which the batch loop flushes whatever events are pending.
const BATCH_INTERVAL: Duration = Duration::from_secs(1);
/// An event in the form it is placed on the telemetry channel.
///
/// `TelemetryHandle::capture` builds one of these from an `Event` after
/// merging the super properties into the event's property map.
#[derive(Debug, Clone)]
pub struct QueuedEvent {
/// Name of the event.
pub name: String,
/// JSON object holding the event's properties.
pub properties: Map<String, Value>,
}
/// Handle used to capture telemetry events.
///
/// Cloning the handle clones the inner `Arc`, so all clones share the same
/// channel sender, counters and disabled flag.
#[derive(Clone)]
pub struct TelemetryHandle {
// Shared state; see `Inner`.
inner: Arc<Inner>,
}
/// State shared by every clone of a [`TelemetryHandle`].
struct Inner {
    /// Producer side of the event channel; `None` when no batch loop was started.
    sender: Option<mpsc::Sender<QueuedEvent>>,
    /// Properties merged into every captured event.
    super_props: SuperProps,
    /// Unix timestamp (seconds) written by the background batch loop on flush.
    ///
    /// This is an `Arc` so that the handle and the batch loop observe the
    /// *same* atomic. Storing a plain `AtomicU64` initialised from a snapshot
    /// would leave the handle permanently reporting the initial value.
    last_sent_unix: Arc<AtomicU64>,
    /// Number of events successfully placed on the channel.
    events_captured: AtomicU64,
    /// Number of events rejected by the channel (`try_send` returned an error).
    events_dropped: AtomicU64,
    /// When set, each captured event is also printed to stderr.
    debug_stderr: bool,
    /// When set, `capture` returns immediately without queueing anything.
    disabled: AtomicBool,
}

impl Inner {
    /// Builds the state for a handle that has no channel and is flagged disabled.
    fn inert(super_props: SuperProps, debug_stderr: bool) -> Self {
        Self {
            sender: None,
            super_props,
            last_sent_unix: Arc::new(AtomicU64::new(0)),
            events_captured: AtomicU64::new(0),
            events_dropped: AtomicU64::new(0),
            debug_stderr,
            disabled: AtomicBool::new(true),
        }
    }
}

impl TelemetryHandle {
    /// Creates a handle that never queues events.
    ///
    /// `agent_id` and `authenticated` are forwarded to `SuperProps::new`.
    pub fn disabled(agent_id: String, authenticated: bool) -> Self {
        Self {
            inner: Arc::new(Inner::inert(SuperProps::new(agent_id, authenticated), false)),
        }
    }

    /// Returns `true` when the handle is not flagged disabled and owns a channel sender.
    pub fn is_enabled(&self) -> bool {
        !self.inner.disabled.load(Ordering::Relaxed) && self.inner.sender.is_some()
    }

    /// Queues `event` for delivery without blocking.
    ///
    /// Does nothing when the handle is disabled or has no sender. Otherwise the
    /// super properties are merged into the event's properties, the event is
    /// optionally echoed to stderr, and it is offered to the channel with
    /// `try_send`. Success bumps `events_captured`; any `try_send` error bumps
    /// `events_dropped`.
    pub fn capture(&self, event: Event) {
        if self.inner.disabled.load(Ordering::Relaxed) {
            return;
        }
        let Some(sender) = self.inner.sender.as_ref() else {
            return;
        };
        let mut properties = event.properties;
        self.inner.super_props.merge_into(&mut properties);
        let queued = QueuedEvent {
            name: event.name,
            properties,
        };
        if self.inner.debug_stderr {
            emit_debug(&queued);
        }
        match sender.try_send(queued) {
            Ok(()) => {
                self.inner.events_captured.fetch_add(1, Ordering::Relaxed);
            }
            Err(_) => {
                self.inner.events_dropped.fetch_add(1, Ordering::Relaxed);
            }
        }
    }

    /// Flags the handle as disabled so later `capture` calls return immediately.
    ///
    /// This only sets the flag; it does not touch events already on the channel.
    pub fn disable_now(&self) {
        self.inner.disabled.store(true, Ordering::Relaxed);
    }

    /// Unix timestamp (seconds) last stored by the batch loop, or 0 if it never stored one.
    pub fn last_sent_unix(&self) -> u64 {
        self.inner.last_sent_unix.load(Ordering::Relaxed)
    }

    /// Number of events accepted by the channel so far.
    pub fn events_captured(&self) -> u64 {
        self.inner.events_captured.load(Ordering::Relaxed)
    }

    /// Number of events the channel rejected so far.
    pub fn events_dropped(&self) -> u64 {
        self.inner.events_dropped.load(Ordering::Relaxed)
    }
}

/// Inputs for [`start`].
pub(super) struct ClientConfig {
    /// Resolved consent decision; telemetry starts only if `enabled()` is true.
    pub resolved: Resolved,
    /// Properties merged into every captured event.
    pub super_props: SuperProps,
    /// Whether captured events are also printed to stderr.
    pub debug_stderr: bool,
    /// Telemetry starts only when this is `true`.
    pub baked_key_present: bool,
}

/// Starts the telemetry client and returns a handle to it.
///
/// Returns a disabled, channel-less handle when consent is not enabled or
/// `baked_key_present` is `false`. Otherwise a bounded channel is created and
/// `run_batch_loop` is started on the ambient tokio runtime if there is one,
/// or on a dedicated thread running its own current-thread runtime if not.
pub(super) fn start(cfg: ClientConfig) -> TelemetryHandle {
    if !cfg.resolved.enabled() || !cfg.baked_key_present {
        return TelemetryHandle {
            inner: Arc::new(Inner::inert(cfg.super_props, cfg.debug_stderr)),
        };
    }

    let (tx, rx) = mpsc::channel::<QueuedEvent>(CHANNEL_CAPACITY);
    // One atomic, two owners: the batch loop writes it, the handle reads it.
    let last_sent = Arc::new(AtomicU64::new(0));
    let last_sent_bg = Arc::clone(&last_sent);
    let debug = cfg.debug_stderr;
    let http = super::network::build_client();

    if let Ok(handle) = tokio::runtime::Handle::try_current() {
        handle.spawn(async move {
            run_batch_loop(rx, last_sent_bg, debug, http).await;
        });
    } else {
        // Best effort: if the thread or its runtime cannot be created the
        // receiver is dropped, and later captures are counted as dropped.
        let _ = std::thread::Builder::new()
            .name("openlatch-telemetry".into())
            .spawn(move || {
                if let Ok(rt) = tokio::runtime::Builder::new_current_thread()
                    .enable_all()
                    .build()
                {
                    rt.block_on(run_batch_loop(rx, last_sent_bg, debug, http));
                }
            });
    }

    let inner = Inner {
        sender: Some(tx),
        super_props: cfg.super_props,
        last_sent_unix: last_sent,
        events_captured: AtomicU64::new(0),
        events_dropped: AtomicU64::new(0),
        debug_stderr: cfg.debug_stderr,
        disabled: AtomicBool::new(false),
    };
    TelemetryHandle {
        inner: Arc::new(inner),
    }
}
/// Receives queued events and flushes them in batches until the channel closes.
///
/// A flush happens when `BATCH_SIZE` events are pending, when the
/// `BATCH_INTERVAL` deadline passes with at least one event pending, and once
/// more when the channel reports that it is closed, after which the loop ends.
/// The `_debug` argument is currently unused.
async fn run_batch_loop(
    mut rx: mpsc::Receiver<QueuedEvent>,
    last_sent: Arc<AtomicU64>,
    _debug: bool,
    http: Option<reqwest::Client>,
) {
    let client = http.as_ref();
    let mut pending: Vec<QueuedEvent> = Vec::with_capacity(BATCH_SIZE);
    let mut flush_at = Instant::now() + BATCH_INTERVAL;

    loop {
        // Time left until the current deadline; zero if it already passed.
        let remaining = flush_at.saturating_duration_since(Instant::now());

        tokio::select! {
            received = rx.recv() => {
                // `None` means the channel is closed: send what is left and stop.
                let Some(event) = received else {
                    flush(&mut pending, &last_sent, client).await;
                    return;
                };
                pending.push(event);
                if pending.len() >= BATCH_SIZE {
                    flush(&mut pending, &last_sent, client).await;
                    flush_at = Instant::now() + BATCH_INTERVAL;
                }
            }
            _ = tokio::time::sleep(remaining) => {
                if !pending.is_empty() {
                    flush(&mut pending, &last_sent, client).await;
                }
                flush_at = Instant::now() + BATCH_INTERVAL;
            }
        }
    }
}
/// Posts the pending batch (when an HTTP client is available) and empties it.
///
/// An empty batch is a no-op. Otherwise the current Unix time in seconds is
/// stored into `last_sent` and the batch is cleared, whether or not a client
/// was present and regardless of what `post_batch` returned.
// NOTE(review): the result of `post_batch` is ignored here — confirm that
// recording the timestamp on a failed or skipped POST is intended.
async fn flush(
    batch: &mut Vec<QueuedEvent>,
    last_sent: &AtomicU64,
    http: Option<&reqwest::Client>,
) {
    if batch.is_empty() {
        return;
    }

    match http {
        Some(client) => {
            let _outcome = super::network::post_batch(client, batch).await;
        }
        None => {}
    }

    // A clock set before the Unix epoch yields 0 rather than an error.
    let unix_secs = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .map_or(0, |elapsed| elapsed.as_secs());

    last_sent.store(unix_secs, Ordering::Relaxed);
    batch.clear();
}
fn emit_debug(event: &QueuedEvent) {
let envelope = json!({
"event": event.name,
"properties": event.properties,
});
eprintln!("[telemetry/debug] {}", envelope);
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::core::telemetry::consent::{ConsentState, DecidedBy};

    /// Super properties shared by every test config.
    fn test_props() -> SuperProps {
        SuperProps::new("agt_test".into(), false)
    }

    /// Builds a config decided by the config file, with stderr debugging off.
    fn config(state: ConsentState, baked_key_present: bool) -> ClientConfig {
        ClientConfig {
            resolved: Resolved {
                state,
                decided_by: DecidedBy::ConfigFile,
            },
            super_props: test_props(),
            debug_stderr: false,
            baked_key_present,
        }
    }

    #[test]
    fn test_disabled_handle_noop_capture() {
        let handle = TelemetryHandle::disabled("agt_x".into(), false);
        handle.capture(Event::cli_initialized("claude-code", 1, true));
        assert_eq!(handle.events_captured(), 0);
        assert!(!handle.is_enabled());
    }

    #[test]
    fn test_start_with_disabled_consent_returns_noop() {
        let handle = start(config(ConsentState::Disabled, true));
        assert!(!handle.is_enabled());
    }

    #[test]
    fn test_start_without_baked_key_returns_noop() {
        let handle = start(config(ConsentState::Enabled, false));
        assert!(!handle.is_enabled());
    }

    #[test]
    fn test_start_without_ambient_runtime_spawns_self_hosted_thread() {
        // No tokio runtime is active in a plain `#[test]`, so `start` takes
        // the dedicated-thread path.
        let handle = start(config(ConsentState::Enabled, true));
        assert!(handle.is_enabled());
        handle.capture(Event::cli_initialized("claude-code", 1, true));
        assert_eq!(handle.events_captured(), 1);
        drop(handle);
    }

    #[tokio::test]
    async fn test_enabled_handle_accepts_captures() {
        let handle = start(config(ConsentState::Enabled, true));
        assert!(handle.is_enabled());
        handle.capture(Event::cli_initialized("claude-code", 1, true));
        tokio::task::yield_now().await;
        assert_eq!(handle.events_captured(), 1);
    }

    #[tokio::test]
    async fn test_disable_now_stops_capture_mid_process() {
        let handle = start(config(ConsentState::Enabled, true));
        handle.disable_now();
        handle.capture(Event::cli_initialized("claude-code", 1, true));
        assert_eq!(handle.events_captured(), 0);
    }
}