use serde_json::json;
use std::io::Write as _;
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, OnceLock};
const POSTHOG_HOST: &str = "https://us.i.posthog.com";
const POSTHOG_API_KEY: &str = match option_env!("POSTHOG_API_KEY") {
Some(v) => v,
None => "phc_Xpd1b1yslA6CCT4lTgYc14Chnj03jPDpfqglXerbHWt",
};
const SENTRY_DSN: &str = match option_env!("SENTRY_DSN") {
Some(v) => v,
None => "https://498029f4e2283ddb0522df1835527d82@o238879.ingest.us.sentry.io/4511091812663296",
};
const KILL_SWITCH_URL: &str = "https://telemetry.collet.dev/v1/kill-switch";
const KILL_SWITCH_CACHE_FILE: &str = "telemetry_kill_switch.json";
const KILL_SWITCH_CACHE_TTL_SECS: u64 = 24 * 60 * 60;
static GLOBAL_CLIENT: OnceLock<TelemetryClient> = OnceLock::new();
static KILL_SWITCH_ACTIVE: AtomicBool = AtomicBool::new(false);
#[derive(Clone)]
pub struct TelemetryClient {
inner: Arc<Inner>,
}
struct Inner {
enabled: bool,
error_reporting: bool,
analytics: bool,
http: reqwest::Client,
machine_id: String,
version: String,
collet_home: PathBuf,
}
pub fn init(config: &crate::config::Config) -> TelemetryClient {
let machine_id = load_or_create_machine_id(&config.collet_home);
let http = reqwest::Client::builder()
.timeout(std::time::Duration::from_secs(5))
.build()
.unwrap_or_default();
let kill_switch_active = check_kill_switch_cached(&config.collet_home, &http);
KILL_SWITCH_ACTIVE.store(kill_switch_active, Ordering::Relaxed);
let client = TelemetryClient {
inner: Arc::new(Inner {
enabled: config.telemetry_enabled,
error_reporting: config.telemetry_error_reporting,
analytics: config.telemetry_analytics,
http,
machine_id,
version: env!("CARGO_PKG_VERSION").to_string(),
collet_home: config.collet_home.clone(),
}),
};
let _ = GLOBAL_CLIENT.set(client.clone());
let prev = std::panic::take_hook();
std::panic::set_hook(Box::new(move |info| {
capture_panic_global(info);
prev(info);
}));
upload_pending_crash(&client);
client
}
impl TelemetryClient {
pub fn track_event(&self, name: &str, properties: serde_json::Value) {
if cfg!(debug_assertions) {
tracing::debug!(
target: "telemetry",
event = %name,
properties = %properties,
"Telemetry event (debug build, not sent)"
);
return;
}
if !self.inner.enabled || !self.inner.analytics {
return;
}
if KILL_SWITCH_ACTIVE.load(Ordering::Relaxed) {
return;
}
let client = self.clone();
let name = name.to_string();
tokio::spawn(async move {
let url = format!("{}/capture/", POSTHOG_HOST);
let payload = json!({
"api_key": POSTHOG_API_KEY,
"event": name,
"distinct_id": client.inner.machine_id,
"properties": {
"$lib": "collet",
"$lib_version": client.inner.version,
},
"timestamp": chrono::Utc::now().to_rfc3339(),
});
let mut payload = payload;
if let (Some(base), Some(extra)) = (
payload["properties"].as_object().cloned(),
properties.as_object(),
) {
let mut merged = base;
for (k, v) in extra {
merged.insert(k.clone(), v.clone());
}
payload["properties"] = json!(merged);
}
let _ = client.inner.http.post(&url).json(&payload).send().await;
});
}
pub fn capture_error(&self, error: &str, context: serde_json::Value) {
if cfg!(debug_assertions) {
tracing::debug!(
target: "telemetry",
error = %error,
"Telemetry error (debug build, not sent)"
);
return;
}
if !self.inner.enabled || !self.inner.error_reporting {
return;
}
if KILL_SWITCH_ACTIVE.load(Ordering::Relaxed) {
return;
}
let Some(url) = sentry_store_url() else {
return;
};
let client = self.clone();
let error = error.to_string();
tokio::spawn(async move {
let payload = build_error_payload(
&client.inner.machine_id,
&client.inner.version,
"error",
"Error",
&error,
None,
context,
);
let _ = client.inner.http.post(&url).json(&payload).send().await;
});
}
fn capture_panic(&self, info: &std::panic::PanicHookInfo<'_>) {
if !self.inner.enabled || !self.inner.error_reporting {
return;
}
let message = match info.payload().downcast_ref::<&str>() {
Some(s) => s.to_string(),
None => match info.payload().downcast_ref::<String>() {
Some(s) => s.clone(),
None => "unknown panic".to_string(),
},
};
let location = info
.location()
.map(|l| format!("{}:{}:{}", l.file(), l.line(), l.column()));
let payload = build_error_payload(
&self.inner.machine_id,
&self.inner.version,
"fatal",
"Panic",
&message,
location,
json!({}),
);
let logs_dir = self.inner.collet_home.join("logs");
let _ = std::fs::create_dir_all(&logs_dir);
let date = chrono::Utc::now().format("%Y-%m-%d");
let crash_path = logs_dir.join(format!("crash_{date}.jsonl"));
if let Ok(mut file) = std::fs::OpenOptions::new()
.create(true)
.append(true)
.open(&crash_path)
{
let line = serde_json::to_string(&payload).unwrap_or_default();
let _ = writeln!(file, "{line}");
}
}
pub fn is_enabled(&self) -> bool {
self.inner.enabled
}
}
pub fn global() -> Option<&'static TelemetryClient> {
GLOBAL_CLIENT.get()
}
pub fn track(name: &str, properties: serde_json::Value) {
if is_kill_switch_active() {
return;
}
if let Some(client) = global()
&& client.is_enabled()
{
client.track_event(name, properties);
}
}
pub fn error(message: &str, context: serde_json::Value) {
if let Some(client) = global() {
client.capture_error(message, context);
}
}
fn capture_panic_global(info: &std::panic::PanicHookInfo<'_>) {
if let Some(client) = GLOBAL_CLIENT.get() {
client.capture_panic(info);
}
}
fn upload_pending_crash(client: &TelemetryClient) {
let logs_dir = client.inner.collet_home.join("logs");
let Ok(entries) = std::fs::read_dir(&logs_dir) else {
return;
};
let Some(url) = sentry_store_url() else {
return;
};
for entry in entries.flatten() {
let path = entry.path();
let name = path.file_name().and_then(|n| n.to_str()).unwrap_or("");
if !name.starts_with("crash_") || !name.ends_with(".jsonl") {
continue;
}
let Ok(data) = std::fs::read_to_string(&path) else {
let _ = std::fs::remove_file(&path);
continue;
};
let payloads: Vec<serde_json::Value> = data
.lines()
.filter_map(|l| serde_json::from_str(l).ok())
.collect();
if payloads.is_empty() {
let _ = std::fs::remove_file(&path);
continue;
}
let c = client.clone();
let url = url.clone();
tokio::spawn(async move {
for payload in payloads {
let _ = c.inner.http.post(&url).json(&payload).send().await;
}
});
let _ = std::fs::remove_file(&path);
}
}
fn check_kill_switch_cached(collet_home: &Path, _http: &reqwest::Client) -> bool {
let cache_path = collet_home.join(KILL_SWITCH_CACHE_FILE);
if let Ok(data) = std::fs::read_to_string(&cache_path)
&& let Ok(cached) = serde_json::from_str::<KillSwitchCache>(&data)
{
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_secs();
if now.saturating_sub(cached.checked_at) < KILL_SWITCH_CACHE_TTL_SECS {
return cached.disabled;
}
}
if KILL_SWITCH_URL.contains("telemetry.collet.dev") {
return false;
}
false
}
#[derive(Debug, serde::Serialize, serde::Deserialize)]
struct KillSwitchCache {
disabled: bool,
checked_at: u64,
}
pub fn refresh_kill_switch() {
let Some(client) = GLOBAL_CLIENT.get() else {
return;
};
if !client.inner.enabled {
return;
}
let collet_home = client.inner.collet_home.clone();
let http = client.inner.http.clone();
tokio::spawn(async move {
let cache_path = collet_home.join(KILL_SWITCH_CACHE_FILE);
let disabled = match http
.get(KILL_SWITCH_URL)
.timeout(std::time::Duration::from_secs(2))
.send()
.await
{
Ok(response) => {
if let Ok(json) = response.json::<serde_json::Value>().await {
json.get("disabled")
.and_then(|v| v.as_bool())
.unwrap_or(false)
} else {
return;
}
}
Err(_) => {
return;
}
};
KILL_SWITCH_ACTIVE.store(disabled, Ordering::Relaxed);
let cache = KillSwitchCache {
disabled,
checked_at: std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_secs(),
};
let _ = std::fs::create_dir_all(&collet_home);
let _ = std::fs::write(
&cache_path,
serde_json::to_string(&cache).unwrap_or_default(),
);
});
}
pub fn is_kill_switch_active() -> bool {
KILL_SWITCH_ACTIVE.load(Ordering::Relaxed)
}
pub fn status_text() -> String {
let Some(client) = GLOBAL_CLIENT.get() else {
return "Telemetry: not initialized".to_string();
};
let i = &client.inner;
let kill = KILL_SWITCH_ACTIVE.load(Ordering::Relaxed);
let debug_build = cfg!(debug_assertions);
let placeholder = POSTHOG_API_KEY.is_empty();
let mut lines = Vec::with_capacity(16);
lines.push("── Telemetry Status ──".to_string());
lines.push(String::new());
let master = if !i.enabled {
"❌ Disabled"
} else if kill {
"⛔ Disabled (remote kill switch)"
} else {
"✅ Enabled"
};
lines.push(format!("Master: {master}"));
lines.push(format!(
"Analytics: {}",
if i.analytics { "✅ On" } else { "❌ Off" }
));
lines.push(format!(
"Error Reporting: {}",
if i.error_reporting {
"✅ On"
} else {
"❌ Off"
}
));
lines.push(String::new());
if debug_build {
lines.push("Data Sending: 🔇 Inactive (debug build)".to_string());
lines.push(" No data leaves your machine.".to_string());
} else if placeholder {
lines.push("Data Sending: 🔇 Inactive (no API key configured)".to_string());
lines.push(" No data leaves your machine.".to_string());
} else {
lines.push("Data Sending: 📡 Active".to_string());
}
lines.push(String::new());
lines.push(format!("Machine ID: {}", i.machine_id));
lines.push(format!("Version: {}", i.version));
lines.push(String::new());
let crash_path = i.collet_home.join("logs").join("crash_report.json");
if crash_path.exists() {
lines.push("Pending Crash: ⚠️ crash_report.json found".to_string());
} else {
lines.push("Pending Crash: None".to_string());
}
lines.push(String::new());
lines.push("── Collected Events ──".to_string());
lines.push("session_start model, mode, version, os, arch".to_string());
lines.push("session_end model, mode, outcome, iterations, duration".to_string());
lines.push("feature_used feature name (MCP server name, skill, subagent)".to_string());
lines.push("command_used slash command name".to_string());
lines.push("model_switch from/to model names".to_string());
lines.push("compaction pass, tokens before/after".to_string());
lines.push("tool_error tool name, model (no code content)".to_string());
lines.push(String::new());
lines.push("── NOT Collected ──".to_string());
lines.push("• Source code, file paths, prompts, API keys".to_string());
lines.push("• Personal information (username, IP, email)".to_string());
lines.push(String::new());
lines.push("── Disable ──".to_string());
lines.push("export COLLET_TELEMETRY=0".to_string());
lines.push("# or in config.toml:".to_string());
lines.push("[telemetry]".to_string());
lines.push("enabled = false".to_string());
lines.join("\n")
}
fn build_error_payload(
machine_id: &str,
version: &str,
level: &str,
error_type: &str,
message: &str,
location: Option<String>,
contexts: serde_json::Value,
) -> serde_json::Value {
let event_id = uuid::Uuid::new_v4().to_string().replace('-', "");
let mut exception = json!({
"type": error_type,
"value": message,
});
if let Some(loc) = location {
exception["stacktrace"] = json!({"frames": [{"function": loc}]});
}
json!({
"event_id": event_id,
"timestamp": chrono::Utc::now().to_rfc3339(),
"platform": "rust",
"release": format!("collet@{version}"),
"server_name": machine_id,
"level": level,
"exception": { "values": [exception] },
"contexts": contexts,
})
}
fn load_or_create_machine_id(collet_home: &Path) -> String {
let path = collet_home.join("machine_id");
if let Ok(id) = std::fs::read_to_string(&path) {
let id = id.trim().to_string();
if !id.is_empty() {
return id;
}
}
let id = uuid::Uuid::new_v4().to_string();
let _ = std::fs::create_dir_all(collet_home);
let _ = std::fs::write(&path, &id);
id
}
fn sentry_store_url() -> Option<String> {
if SENTRY_DSN.is_empty() {
return None;
}
let without_scheme = SENTRY_DSN.strip_prefix("https://")?;
let (key, rest) = without_scheme.split_once('@')?;
let (host, project_id) = rest.rsplit_once('/')?;
if key.is_empty() || host.is_empty() || project_id.is_empty() {
return None;
}
Some(format!(
"https://{host}/api/{project_id}/store/?sentry_key={key}"
))
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_load_or_create_machine_id() {
let dir = tempfile::tempdir().unwrap();
let id1 = load_or_create_machine_id(dir.path());
assert!(!id1.is_empty());
let id2 = load_or_create_machine_id(dir.path());
assert_eq!(id1, id2);
}
#[test]
fn test_build_error_payload() {
let payload =
build_error_payload("mid", "1.0.0", "error", "TestErr", "boom", None, json!({}));
assert_eq!(payload["level"], "error");
assert_eq!(payload["exception"]["values"][0]["value"], "boom");
}
#[test]
fn test_sentry_store_url_parsing() {
let url = sentry_store_url();
assert!(url.is_some(), "default SENTRY_DSN should parse");
let url = url.unwrap();
assert!(url.contains("/api/"));
assert!(url.contains("/store/"));
assert!(url.contains("sentry_key="));
}
#[test]
fn test_posthog_api_key_present() {
assert!(
!POSTHOG_API_KEY.is_empty(),
"POSTHOG_API_KEY should have a default"
);
assert!(POSTHOG_API_KEY.starts_with("phc_"));
}
#[test]
fn test_build_error_payload_with_location() {
let payload = build_error_payload(
"machine123",
"0.1.0",
"fatal",
"Panic",
"index out of bounds",
Some("src/main.rs:42:10".to_string()),
json!({"thread": "main"}),
);
assert_eq!(payload["level"], "fatal");
assert_eq!(payload["exception"]["values"][0]["type"], "Panic");
assert_eq!(
payload["exception"]["values"][0]["value"],
"index out of bounds"
);
assert!(payload["exception"]["values"][0]["stacktrace"].is_object());
assert_eq!(payload["contexts"]["thread"], "main");
assert_eq!(payload["server_name"], "machine123");
assert_eq!(payload["platform"], "rust");
}
#[test]
fn test_machine_id_is_valid_uuid() {
let dir = tempfile::tempdir().unwrap();
let id = load_or_create_machine_id(dir.path());
assert!(
uuid::Uuid::parse_str(&id).is_ok(),
"machine_id should be valid UUID"
);
}
#[test]
fn test_disabled_client_skips_events() {
let dir = tempfile::tempdir().unwrap();
let client = TelemetryClient {
inner: Arc::new(Inner {
enabled: false,
error_reporting: true,
analytics: true,
http: reqwest::Client::new(),
machine_id: "test-id".to_string(),
version: "0.1.0".to_string(),
collet_home: dir.path().to_path_buf(),
}),
};
client.track_event("test_event", json!({"foo": "bar"}));
client.capture_error("test error", json!({}));
assert!(!client.is_enabled());
}
#[test]
fn test_analytics_disabled_skips_events() {
let dir = tempfile::tempdir().unwrap();
let client = TelemetryClient {
inner: Arc::new(Inner {
enabled: true,
error_reporting: true,
analytics: false, http: reqwest::Client::new(),
machine_id: "test-id".to_string(),
version: "0.1.0".to_string(),
collet_home: dir.path().to_path_buf(),
}),
};
client.track_event("test_event", json!({}));
assert!(client.inner.enabled);
assert!(!client.inner.analytics);
}
#[test]
fn test_error_reporting_disabled_skips_errors() {
let dir = tempfile::tempdir().unwrap();
let client = TelemetryClient {
inner: Arc::new(Inner {
enabled: true,
error_reporting: false, analytics: true,
http: reqwest::Client::new(),
machine_id: "test-id".to_string(),
version: "0.1.0".to_string(),
collet_home: dir.path().to_path_buf(),
}),
};
client.capture_error("test error", json!({}));
assert!(client.inner.enabled);
assert!(!client.inner.error_reporting);
}
#[test]
fn test_event_payload_structure() {
let expected_keys = [
"event",
"distinct_id",
"properties",
"timestamp",
"app_version",
];
let sample = json!({
"event": "session_start",
"distinct_id": "test-uuid",
"properties": {"mode": "agent", "model": "gpt-4"},
"timestamp": "2024-01-01T00:00:00Z",
"app_version": "0.1.0",
});
for key in expected_keys {
assert!(sample.get(key).is_some(), "Missing key: {}", key);
}
}
#[test]
fn test_status_text_contains_sections() {
let text = status_text();
assert!(
text.contains("Telemetry") || text.contains("not initialized"),
"status_text should contain Telemetry or not-initialized msg"
);
}
#[test]
fn test_session_end_payload_outcomes() {
let outcomes = ["success", "error", "guard_stop", "cancelled"];
for outcome in outcomes {
let payload = json!({
"event": "session_end",
"distinct_id": "test-id",
"properties": {
"outcome": outcome,
"model": "test-model",
"mode": "agent",
"iter": 5,
"secs": 60,
},
});
assert_eq!(payload["properties"]["outcome"], outcome);
}
}
}