1use serde::{Deserialize, Serialize};
36use std::collections::HashMap;
37use std::sync::{Arc, Mutex, OnceLock};
38use std::time::Duration;
39use tokio::sync::mpsc;
40
/// URL that metric events are POSTed to; `push_to_server` sends them as a JSON array.
const METRIC_ENDPOINT: &str = "https://telemetry.composio.dev/v1/metrics/invocations";
43
/// URL that error events are POSTed to; `push_to_server` sends one JSON object per request.
const ERROR_ENDPOINT: &str = "https://telemetry.composio.dev/v1/errors";
46
47#[derive(Debug, Clone, Serialize, Deserialize, Default)]
49pub struct ErrorData {
50 pub name: String,
52
53 #[serde(skip_serializing_if = "Option::is_none")]
55 pub code: Option<String>,
56
57 #[serde(rename = "errorId", skip_serializing_if = "Option::is_none")]
59 pub error_id: Option<String>,
60
61 #[serde(skip_serializing_if = "Option::is_none")]
63 pub message: Option<String>,
64
65 #[serde(skip_serializing_if = "Option::is_none")]
67 pub stack: Option<String>,
68}
69
70#[derive(Debug, Clone, Serialize, Deserialize, Default)]
72pub struct SourceData {
73 #[serde(skip_serializing_if = "Option::is_none")]
75 pub host: Option<String>,
76
77 #[serde(skip_serializing_if = "Option::is_none")]
79 pub service: Option<ServiceType>,
80
81 #[serde(skip_serializing_if = "Option::is_none")]
83 pub language: Option<LanguageType>,
84
85 #[serde(skip_serializing_if = "Option::is_none")]
87 pub version: Option<String>,
88
89 #[serde(skip_serializing_if = "Option::is_none")]
91 pub platform: Option<String>,
92
93 #[serde(skip_serializing_if = "Option::is_none")]
95 pub environment: Option<EnvironmentType>,
96}
97
98#[derive(Debug, Clone, Serialize, Deserialize)]
100#[serde(rename_all = "lowercase")]
101pub enum ServiceType {
102 Sdk,
103 Apollo,
104 Hermes,
105 Thermos,
106}
107
108#[derive(Debug, Clone, Serialize, Deserialize)]
110#[serde(rename_all = "lowercase")]
111pub enum LanguageType {
112 Python,
113 Typescript,
114 Go,
115 Rust,
116}
117
118#[derive(Debug, Clone, Serialize, Deserialize)]
120#[serde(rename_all = "lowercase")]
121pub enum EnvironmentType {
122 Development,
123 Production,
124 Ci,
125 Staging,
126 Test,
127}
128
129#[derive(Debug, Clone, Serialize, Deserialize, Default)]
131pub struct Metadata {
132 #[serde(rename = "projectId", skip_serializing_if = "Option::is_none")]
134 pub project_id: Option<String>,
135
136 #[serde(skip_serializing_if = "Option::is_none")]
138 pub provider: Option<String>,
139}
140
141#[derive(Debug, Clone, Serialize, Deserialize, Default)]
143pub struct TelemetryData {
144 #[serde(rename = "functionName")]
146 pub function_name: String,
147
148 #[serde(rename = "durationMs", skip_serializing_if = "Option::is_none")]
150 pub duration_ms: Option<f64>,
151
152 #[serde(skip_serializing_if = "Option::is_none")]
154 pub timestamp: Option<f64>,
155
156 #[serde(skip_serializing_if = "Option::is_none")]
158 pub props: Option<HashMap<String, serde_json::Value>>,
159
160 #[serde(skip_serializing_if = "Option::is_none")]
162 pub source: Option<SourceData>,
163
164 #[serde(skip_serializing_if = "Option::is_none")]
166 pub metadata: Option<Metadata>,
167
168 #[serde(skip_serializing_if = "Option::is_none")]
170 pub error: Option<ErrorData>,
171}
172
/// Kind of telemetry event; decides which endpoint `push_to_server` posts to.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum EventType {
    /// Sent to the metrics endpoint.
    Metric,
    /// Sent to the errors endpoint.
    Error,
}
179
180pub type Event = (EventType, TelemetryData);
182
183struct TelemetryState {
185 sender: mpsc::UnboundedSender<Event>,
186}
187
188static TELEMETRY: OnceLock<Arc<Mutex<Option<TelemetryState>>>> = OnceLock::new();
189
190fn setup() -> Arc<Mutex<Option<TelemetryState>>> {
192 TELEMETRY
193 .get_or_init(|| {
194 let (tx, mut rx) = mpsc::unbounded_channel::<Event>();
195
196 tokio::spawn(async move {
198 let client = reqwest::Client::builder()
199 .timeout(Duration::from_secs(2))
200 .build()
201 .unwrap_or_default();
202
203 while let Some(event) = rx.recv().await {
204 push_to_server(&client, event).await;
205 }
206 });
207
208 Arc::new(Mutex::new(Some(TelemetryState { sender: tx })))
209 })
210 .clone()
211}
212
213async fn push_to_server(client: &reqwest::Client, event: Event) {
215 let (event_type, data) = event;
216
217 let result = match event_type {
218 EventType::Metric => {
219 client
220 .post(METRIC_ENDPOINT)
221 .json(&vec![data])
222 .send()
223 .await
224 }
225 EventType::Error => {
226 client
227 .post(ERROR_ENDPOINT)
228 .json(&data)
229 .send()
230 .await
231 }
232 };
233
234 if let Err(_e) = result {
236 #[cfg(feature = "local-debug")]
237 eprintln!("Telemetry error: {:?}", _e);
238 }
239}
240
241pub fn push_event(event: Event) {
257 let state = setup();
258 let guard = state.lock();
259 if let Ok(guard) = guard {
260 if let Some(telemetry) = guard.as_ref() {
261 let _ = telemetry.sender.send(event);
263 }
264 }
265}
266
267pub fn create_event(event_type: &str, data: TelemetryData) -> Event {
281 let typ = match event_type {
282 "error" => EventType::Error,
283 _ => EventType::Metric,
284 };
285 (typ, data)
286}
287
#[cfg(test)]
mod tests {
    use super::*;

    /// The "metric" tag yields a metric event that carries the payload unchanged.
    #[test]
    fn test_create_metric_event() {
        let payload = TelemetryData {
            function_name: "test_function".to_string(),
            duration_ms: Some(123.45),
            ..Default::default()
        };

        let (kind, carried) = create_event("metric", payload);

        assert_eq!(kind, EventType::Metric);
        assert_eq!(carried.function_name, "test_function");
        assert_eq!(carried.duration_ms, Some(123.45));
    }

    /// The "error" tag yields an error event that keeps the attached error data.
    #[test]
    fn test_create_error_event() {
        let failure = ErrorData {
            name: "TestError".to_string(),
            message: Some("Test error message".to_string()),
            ..Default::default()
        };
        let payload = TelemetryData {
            function_name: "test_function".to_string(),
            error: Some(failure),
            ..Default::default()
        };

        let (kind, carried) = create_event("error", payload);

        assert_eq!(kind, EventType::Error);
        assert!(carried.error.is_some());
    }

    /// Passes as long as `push_event` returns; delivery itself is not inspected.
    #[test]
    fn test_push_event_does_not_panic() {
        let payload = TelemetryData {
            function_name: "test".to_string(),
            ..Default::default()
        };

        push_event(create_event("metric", payload));
    }
}