use std::collections::HashMap;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};
use std::path::PathBuf;
/// Settings that control how telemetry events are collected.
///
/// Obtain one through `Default::default()`, `TelemetryConfig::from_env()`,
/// or a plain struct literal.
// `Debug` is derived so the configuration can be logged and shown in test
// failure output; every field type already implements it.
#[derive(Debug, Clone)]
pub struct TelemetryConfig {
    /// URL of the telemetry endpoint.
    pub endpoint: String,
    /// Number of buffered events at which `Telemetry::track` triggers a flush.
    pub batch_size: usize,
    /// Interval between flushes.
    // NOTE(review): nothing in this file reads this field — confirm where the
    // interval is enforced.
    pub flush_interval: Duration,
    /// Master switch: when `false`, `Telemetry::track` discards every event.
    pub enabled: bool,
    /// Fraction of events to keep. Values below `1.0` make
    /// `Telemetry::track` drop a share of the events it receives.
    pub sample_rate: f64,
    /// Path of the local telemetry database file.
    pub db_path: PathBuf,
}
impl Default for TelemetryConfig {
    /// Builds the baseline configuration.
    ///
    /// Telemetry is opt-in: `enabled` is `true` only when the `RK_TELEMETRY`
    /// environment variable is set to exactly `1` or `true`. The database
    /// path comes from [`TelemetryConfig::default_db_path`].
    fn default() -> Self {
        // An unset or non-UTF-8 variable is an `Err` and therefore counts as "off".
        let opted_in = matches!(
            std::env::var("RK_TELEMETRY").as_deref(),
            Ok("1" | "true")
        );
        let db_path = Self::default_db_path();

        Self {
            endpoint: String::from("https://analytics.reasonkit.sh"),
            batch_size: 100,
            flush_interval: Duration::from_secs(30),
            enabled: opted_in,
            sample_rate: 1.0,
            db_path,
        }
    }
}
impl TelemetryConfig {
    /// Builds a configuration from the defaults, overridden by environment
    /// variables where they are set:
    ///
    /// * `RK_TELEMETRY_ENDPOINT` replaces `endpoint`.
    /// * `RK_TELEMETRY` sets `enabled`: `1` or `true` enables, any other
    ///   value disables.
    /// * `RK_TELEMETRY_SAMPLE_RATE` is parsed as a float into `sample_rate`;
    ///   a value that does not parse falls back to `1.0`.
    /// * `RK_TELEMETRY_DB` replaces `db_path`.
    pub fn from_env() -> Self {
        let base = Self::default();

        Self {
            endpoint: std::env::var("RK_TELEMETRY_ENDPOINT").unwrap_or(base.endpoint),
            enabled: std::env::var("RK_TELEMETRY")
                .map_or(base.enabled, |flag| matches!(flag.as_str(), "1" | "true")),
            sample_rate: std::env::var("RK_TELEMETRY_SAMPLE_RATE")
                .map_or(base.sample_rate, |raw| raw.parse::<f64>().unwrap_or(1.0)),
            db_path: std::env::var("RK_TELEMETRY_DB").map_or(base.db_path, PathBuf::from),
            // Remaining fields (`batch_size`, `flush_interval`) keep their defaults.
            ..base
        }
    }

    /// Returns the default location of the telemetry database.
    ///
    /// This is `telemetry.db` inside the data directory reported by
    /// `directories::ProjectDirs` for `("sh", "reasonkit", "reasonkit")`.
    /// When no project directories are available, the relative path
    /// `.rk_telemetry.db` is returned instead.
    pub fn default_db_path() -> PathBuf {
        match directories::ProjectDirs::from("sh", "reasonkit", "reasonkit") {
            Some(project) => project.data_dir().join("telemetry.db"),
            None => PathBuf::from(".rk_telemetry.db"),
        }
    }
}
/// Handle to the on-disk telemetry store.
///
/// At present it only remembers the path it was created with.
// `Debug` is derived so the handle can appear in logs and test output.
#[derive(Debug)]
pub struct TelemetryStorage {
    /// Location of the storage file.
    // NOTE(review): nothing in this file reads `path` after construction, hence
    // the lint suppression; presumably kept for a future storage backend —
    // confirm.
    #[allow(dead_code)]
    path: PathBuf,
}
impl TelemetryStorage {
    /// Prepares storage at `path`.
    ///
    /// When `path` has a parent component, that directory and any missing
    /// ancestors are created. The file itself is not opened or created here.
    ///
    /// # Errors
    ///
    /// Returns the underlying error if the parent directory cannot be created.
    pub async fn new(path: &std::path::Path) -> anyhow::Result<Self> {
        let storage = Self {
            path: path.to_path_buf(),
        };

        if let Some(dir) = storage.path.parent() {
            tokio::fs::create_dir_all(dir).await?;
        }

        Ok(storage)
    }
}
/// A single telemetry event: a name, a bag of JSON properties and a timestamp.
#[derive(Debug, Clone)]
pub struct Event {
/// Event name, e.g. `"api_request"`.
pub name: String,
/// Arbitrary key/value properties, stored as JSON values.
pub properties: HashMap<String, serde_json::Value>,
/// Milliseconds since the Unix epoch, as set by `Event::new`.
pub timestamp: u64,
}
impl Event {
    /// Creates an event called `name` with no properties, stamped with the
    /// current system time in milliseconds since the Unix epoch.
    pub fn new(name: impl Into<String>) -> Self {
        let name = name.into();
        let properties = HashMap::new();

        // A system clock set before the epoch produces an error here; it is
        // mapped to a zero duration, i.e. a timestamp of 0.
        let since_epoch = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap_or_default();
        let timestamp = since_epoch.as_millis() as u64;

        Self {
            name,
            properties,
            timestamp,
        }
    }

    /// Builder-style setter: attaches `value` under `key` and returns the event.
    ///
    /// The value is converted to JSON first. A value that fails to serialize
    /// is skipped silently and the event is returned unchanged. Setting an
    /// existing key replaces its previous value.
    pub fn property<V: serde::Serialize>(mut self, key: impl Into<String>, value: V) -> Self {
        let Ok(json) = serde_json::to_value(value) else {
            return self;
        };
        self.properties.insert(key.into(), json);
        self
    }
}
/// In-memory telemetry collector: buffers events and flushes them in batches.
// NOTE(review): the blanket `dead_code` allowance covers fields that this file
// never reads (for example `failure_count`); presumably they are reserved for
// delivery/circuit-breaker logic that is not implemented here — confirm.
#[allow(dead_code)]
pub struct Telemetry {
/// Settings consulted by `track` (enabled flag, sample rate, batch size).
config: TelemetryConfig,
/// Events accepted by `track` and not yet taken by `flush`.
events: Arc<tokio::sync::Mutex<Vec<Event>>>,
/// While `true`, `track` drops events and `is_enabled` reports `false`.
/// Starts as `false`; nothing in this file sets it.
circuit_open: Arc<AtomicBool>,
/// Starts at 0; not otherwise read or written in this file.
/// Presumably counts consecutive delivery failures — TODO confirm.
failure_count: Arc<AtomicU64>,
/// Time of the most recent non-empty flush (initially the construction time).
last_flush: Arc<tokio::sync::Mutex<Instant>>,
}
impl Telemetry {
    /// Creates a collector with an empty event buffer, a closed circuit
    /// breaker, a zeroed failure counter and `last_flush` set to now.
    pub fn new(config: TelemetryConfig) -> Self {
        Self {
            config,
            events: Arc::new(tokio::sync::Mutex::new(Vec::new())),
            circuit_open: Arc::new(AtomicBool::new(false)),
            failure_count: Arc::new(AtomicU64::new(0)),
            last_flush: Arc::new(tokio::sync::Mutex::new(Instant::now())),
        }
    }

    /// Buffers `event` for a later flush.
    ///
    /// The event is silently discarded when telemetry is disabled, when the
    /// circuit breaker is open, or when sampling does not select it. Once the
    /// buffer holds at least `batch_size` events, [`Telemetry::flush`] runs.
    pub async fn track(&self, event: Event) {
        if !self.is_enabled() || !self.is_sampled(&event) {
            return;
        }

        // The buffer lock is released at the end of this block, before
        // `flush` re-acquires it.
        let batch_ready = {
            let mut buffer = self.events.lock().await;
            buffer.push(event);
            buffer.len() >= self.config.batch_size
        };

        if batch_ready {
            self.flush().await;
        }
    }

    /// Decides whether `event` survives sampling.
    ///
    /// The event's millisecond timestamp is mapped to one of 1000 buckets in
    /// `[0.0, 0.999]`; the event is kept only when its bucket is strictly
    /// below `sample_rate`. The strict comparison means a rate of `0.0` keeps
    /// nothing and a rate of `0.5` keeps exactly 500 of the 1000 buckets.
    /// Rates that are not below `1.0` (including NaN) bypass sampling and
    /// keep every event.
    fn is_sampled(&self, event: &Event) -> bool {
        let rate = self.config.sample_rate;
        if rate < 1.0 {
            let bucket = (event.timestamp % 1000) as f64 / 1000.0;
            bucket < rate
        } else {
            true
        }
    }

    /// Takes every buffered event out of the buffer.
    ///
    /// Does nothing when the buffer is empty. Otherwise the batch is logged
    /// (debug builds only) and `last_flush` is set to the current time.
    pub async fn flush(&self) {
        let batch: Vec<Event> = {
            let mut buffer = self.events.lock().await;
            std::mem::take(&mut *buffer)
        };
        if batch.is_empty() {
            return;
        }

        #[cfg(debug_assertions)]
        {
            tracing::debug!("[telemetry] Flushing {} events", batch.len());
        }

        // No delivery is implemented in this function: the drained batch is
        // simply discarded.
        drop(batch);

        *self.last_flush.lock().await = Instant::now();
    }

    /// Reports whether events are currently accepted: telemetry must be
    /// enabled in the configuration and the circuit breaker must be closed.
    pub fn is_enabled(&self) -> bool {
        self.config.enabled && !self.circuit_open.load(Ordering::Relaxed)
    }
}
/// Builds a `thinktool_execution` event.
///
/// The event always carries `thinktool` (the tool name), `latency_ms` and
/// `success`. A `confidence` property is added only when a value is supplied.
pub fn thinktool_event(
    name: &str,
    latency_ms: u64,
    success: bool,
    confidence: Option<f64>,
) -> Event {
    let base = Event::new("thinktool_execution")
        .property("thinktool", name)
        .property("latency_ms", latency_ms)
        .property("success", success);

    match confidence {
        Some(score) => base.property("confidence", score),
        None => base,
    }
}
/// Builds an `api_request` event carrying the request's `endpoint`, `method`,
/// response `status` and `latency_ms`.
pub fn api_event(endpoint: &str, method: &str, status: u16, latency_ms: u64) -> Event {
    let event = Event::new("api_request");
    let event = event.property("endpoint", endpoint);
    let event = event.property("method", method);
    let event = event.property("status", status);
    event.property("latency_ms", latency_ms)
}
#[cfg(test)]
mod tests {
    use super::*;

    /// The builder keeps the event name and records one entry per property.
    #[test]
    fn test_event_creation() {
        let Event {
            name, properties, ..
        } = Event::new("test")
            .property("key", "value")
            .property("count", 42);

        assert_eq!(name, "test");
        assert_eq!(properties.len(), 2);
    }

    /// A collector built from a config with `enabled: false` reports itself
    /// as disabled.
    #[tokio::test]
    async fn test_telemetry_disabled() {
        let telemetry = Telemetry::new(TelemetryConfig {
            enabled: false,
            ..TelemetryConfig::default()
        });

        assert!(!telemetry.is_enabled());
    }
}