use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::{Arc, Mutex, OnceLock};
use std::time::Duration;
use tokio::sync::mpsc;
const METRIC_ENDPOINT: &str = "https://telemetry.composio.dev/v1/metrics/invocations";
const ERROR_ENDPOINT: &str = "https://telemetry.composio.dev/v1/errors";
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct ErrorData {
pub name: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub code: Option<String>,
#[serde(rename = "errorId", skip_serializing_if = "Option::is_none")]
pub error_id: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub message: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub stack: Option<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct SourceData {
#[serde(skip_serializing_if = "Option::is_none")]
pub host: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub service: Option<ServiceType>,
#[serde(skip_serializing_if = "Option::is_none")]
pub language: Option<LanguageType>,
#[serde(skip_serializing_if = "Option::is_none")]
pub version: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub platform: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub environment: Option<EnvironmentType>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum ServiceType {
Sdk,
Apollo,
Hermes,
Thermos,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum LanguageType {
Python,
Typescript,
Go,
Rust,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum EnvironmentType {
Development,
Production,
Ci,
Staging,
Test,
}
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct Metadata {
#[serde(rename = "projectId", skip_serializing_if = "Option::is_none")]
pub project_id: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub provider: Option<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct TelemetryData {
#[serde(rename = "functionName")]
pub function_name: String,
#[serde(rename = "durationMs", skip_serializing_if = "Option::is_none")]
pub duration_ms: Option<f64>,
#[serde(skip_serializing_if = "Option::is_none")]
pub timestamp: Option<f64>,
#[serde(skip_serializing_if = "Option::is_none")]
pub props: Option<HashMap<String, serde_json::Value>>,
#[serde(skip_serializing_if = "Option::is_none")]
pub source: Option<SourceData>,
#[serde(skip_serializing_if = "Option::is_none")]
pub metadata: Option<Metadata>,
#[serde(skip_serializing_if = "Option::is_none")]
pub error: Option<ErrorData>,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum EventType {
Metric,
Error,
}
pub type Event = (EventType, TelemetryData);
struct TelemetryState {
sender: mpsc::UnboundedSender<Event>,
}
static TELEMETRY: OnceLock<Arc<Mutex<Option<TelemetryState>>>> = OnceLock::new();
fn setup() -> Arc<Mutex<Option<TelemetryState>>> {
TELEMETRY
.get_or_init(|| {
let (tx, mut rx) = mpsc::unbounded_channel::<Event>();
tokio::spawn(async move {
let client = reqwest::Client::builder()
.timeout(Duration::from_secs(2))
.build()
.unwrap_or_default();
while let Some(event) = rx.recv().await {
push_to_server(&client, event).await;
}
});
Arc::new(Mutex::new(Some(TelemetryState { sender: tx })))
})
.clone()
}
async fn push_to_server(client: &reqwest::Client, event: Event) {
let (event_type, data) = event;
let result = match event_type {
EventType::Metric => {
client
.post(METRIC_ENDPOINT)
.json(&vec![data])
.send()
.await
}
EventType::Error => {
client
.post(ERROR_ENDPOINT)
.json(&data)
.send()
.await
}
};
if let Err(_e) = result {
#[cfg(feature = "local-debug")]
eprintln!("Telemetry error: {:?}", _e);
}
}
pub fn push_event(event: Event) {
let state = setup();
let guard = state.lock();
if let Ok(guard) = guard {
if let Some(telemetry) = guard.as_ref() {
let _ = telemetry.sender.send(event);
}
}
}
pub fn create_event(event_type: &str, data: TelemetryData) -> Event {
let typ = match event_type {
"error" => EventType::Error,
_ => EventType::Metric,
};
(typ, data)
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_create_metric_event() {
let data = TelemetryData {
function_name: "test_function".to_string(),
duration_ms: Some(123.45),
..Default::default()
};
let event = create_event("metric", data.clone());
assert_eq!(event.0, EventType::Metric);
assert_eq!(event.1.function_name, "test_function");
assert_eq!(event.1.duration_ms, Some(123.45));
}
#[test]
fn test_create_error_event() {
let data = TelemetryData {
function_name: "test_function".to_string(),
error: Some(ErrorData {
name: "TestError".to_string(),
message: Some("Test error message".to_string()),
..Default::default()
}),
..Default::default()
};
let event = create_event("error", data.clone());
assert_eq!(event.0, EventType::Error);
assert!(event.1.error.is_some());
}
#[test]
fn test_push_event_does_not_panic() {
let data = TelemetryData {
function_name: "test".to_string(),
..Default::default()
};
let event = create_event("metric", data);
push_event(event);
}
}