Skip to main content

composio_sdk/models/
telemetry.rs

1//! Telemetry module for tracking SDK usage and errors
2//!
//! This module collects telemetry events and sends them to Composio's telemetry
//! service. Events are queued on an unbounded channel and delivered one request at a
//! time by a background worker, so callers never wait on the network.
6//!
7//! # Features
8//!
9//! - Non-blocking event collection
//! - Background worker for sending telemetry
11//! - Automatic cleanup on shutdown
12//! - Silent failure (telemetry errors don't affect application)
13//!
14//! # Examples
15//!
16//! ```rust
17//! use composio_sdk::models::telemetry::{push_event, create_event, TelemetryData};
18//!
19//! // Create and push a metric event
20//! let event = create_event(
21//!     "metric",
22//!     TelemetryData {
23//!         function_name: "execute_tool".to_string(),
24//!         duration_ms: Some(150.5),
25//!         timestamp: Some(std::time::SystemTime::now()
26//!             .duration_since(std::time::UNIX_EPOCH)
27//!             .unwrap()
28//!             .as_secs_f64()),
29//!         ..Default::default()
30//!     }
31//! );
32//! push_event(event);
33//! ```
34
35use serde::{Deserialize, Serialize};
36use std::collections::HashMap;
37use std::sync::{Arc, Mutex, OnceLock};
38use std::time::Duration;
39use tokio::sync::mpsc;
40
/// Metrics endpoint.
///
/// Metric events are POSTed here as a JSON array containing a single event.
const METRIC_ENDPOINT: &str = "https://telemetry.composio.dev/v1/metrics/invocations";

/// Errors endpoint.
///
/// Error events are POSTed here as a single JSON object.
const ERROR_ENDPOINT: &str = "https://telemetry.composio.dev/v1/errors";
46
47/// Error data structure
48#[derive(Debug, Clone, Serialize, Deserialize, Default)]
49pub struct ErrorData {
50    /// The name of the error
51    pub name: String,
52
53    /// The code of the error
54    #[serde(skip_serializing_if = "Option::is_none")]
55    pub code: Option<String>,
56
57    /// The error ID of the error
58    #[serde(rename = "errorId", skip_serializing_if = "Option::is_none")]
59    pub error_id: Option<String>,
60
61    /// The message of the error
62    #[serde(skip_serializing_if = "Option::is_none")]
63    pub message: Option<String>,
64
65    /// The stack trace of the error
66    #[serde(skip_serializing_if = "Option::is_none")]
67    pub stack: Option<String>,
68}
69
70/// Source/host information
71#[derive(Debug, Clone, Serialize, Deserialize, Default)]
72pub struct SourceData {
73    /// The name of the source/host
74    #[serde(skip_serializing_if = "Option::is_none")]
75    pub host: Option<String>,
76
77    /// The service of the source
78    #[serde(skip_serializing_if = "Option::is_none")]
79    pub service: Option<ServiceType>,
80
81    /// The language of the function that was invoked
82    #[serde(skip_serializing_if = "Option::is_none")]
83    pub language: Option<LanguageType>,
84
85    /// The version of the source
86    #[serde(skip_serializing_if = "Option::is_none")]
87    pub version: Option<String>,
88
89    /// The platform of the source
90    #[serde(skip_serializing_if = "Option::is_none")]
91    pub platform: Option<String>,
92
93    /// The environment of the source
94    #[serde(skip_serializing_if = "Option::is_none")]
95    pub environment: Option<EnvironmentType>,
96}
97
98/// Service type enumeration
99#[derive(Debug, Clone, Serialize, Deserialize)]
100#[serde(rename_all = "lowercase")]
101pub enum ServiceType {
102    Sdk,
103    Apollo,
104    Hermes,
105    Thermos,
106}
107
108/// Language type enumeration
109#[derive(Debug, Clone, Serialize, Deserialize)]
110#[serde(rename_all = "lowercase")]
111pub enum LanguageType {
112    Python,
113    Typescript,
114    Go,
115    Rust,
116}
117
118/// Environment type enumeration
119#[derive(Debug, Clone, Serialize, Deserialize)]
120#[serde(rename_all = "lowercase")]
121pub enum EnvironmentType {
122    Development,
123    Production,
124    Ci,
125    Staging,
126    Test,
127}
128
129/// Runtime metadata
130#[derive(Debug, Clone, Serialize, Deserialize, Default)]
131pub struct Metadata {
132    /// The project ID of the source
133    #[serde(rename = "projectId", skip_serializing_if = "Option::is_none")]
134    pub project_id: Option<String>,
135
136    /// The provider used in the source
137    #[serde(skip_serializing_if = "Option::is_none")]
138    pub provider: Option<String>,
139}
140
141/// Telemetry data structure
142#[derive(Debug, Clone, Serialize, Deserialize, Default)]
143pub struct TelemetryData {
144    /// The name of the function that was invoked
145    #[serde(rename = "functionName")]
146    pub function_name: String,
147
148    /// The duration of the function invocation in milliseconds
149    #[serde(rename = "durationMs", skip_serializing_if = "Option::is_none")]
150    pub duration_ms: Option<f64>,
151
152    /// The timestamp of the function invocation in epoch seconds
153    #[serde(skip_serializing_if = "Option::is_none")]
154    pub timestamp: Option<f64>,
155
156    /// The properties of the function invocation
157    #[serde(skip_serializing_if = "Option::is_none")]
158    pub props: Option<HashMap<String, serde_json::Value>>,
159
160    /// Source of the metric
161    #[serde(skip_serializing_if = "Option::is_none")]
162    pub source: Option<SourceData>,
163
164    /// Runtime metadata
165    #[serde(skip_serializing_if = "Option::is_none")]
166    pub metadata: Option<Metadata>,
167
168    /// Error data
169    #[serde(skip_serializing_if = "Option::is_none")]
170    pub error: Option<ErrorData>,
171}
172
/// Kind of telemetry event; selects the endpoint the event is sent to.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum EventType {
    /// A function-invocation metric.
    Metric,
    /// An error report.
    Error,
}
179
/// A queued telemetry event: the event type paired with its payload.
///
/// Values of this type are produced by `create_event` and consumed by `push_event`.
pub type Event = (EventType, TelemetryData);
182
/// Global telemetry state.
///
/// Holds the sending half of the event channel created in `setup`; the receiving
/// half is owned by the background worker that `setup` starts.
struct TelemetryState {
    /// Queue handle used by `push_event` to hand events to the worker.
    sender: mpsc::UnboundedSender<Event>,
}
187
/// Process-wide telemetry state, created lazily by the first call to `setup`.
// NOTE(review): nothing visible in this file ever writes to the inner `Option`
// after initialization — presumably it is reserved for a shutdown path; confirm.
static TELEMETRY: OnceLock<Arc<Mutex<Option<TelemetryState>>>> = OnceLock::new();
189
190/// Initialize the telemetry system
191fn setup() -> Arc<Mutex<Option<TelemetryState>>> {
192    TELEMETRY
193        .get_or_init(|| {
194            let (tx, mut rx) = mpsc::unbounded_channel::<Event>();
195
196            // Spawn background task to process events
197            tokio::spawn(async move {
198                let client = reqwest::Client::builder()
199                    .timeout(Duration::from_secs(2))
200                    .build()
201                    .unwrap_or_default();
202
203                while let Some(event) = rx.recv().await {
204                    push_to_server(&client, event).await;
205                }
206            });
207
208            Arc::new(Mutex::new(Some(TelemetryState { sender: tx })))
209        })
210        .clone()
211}
212
213/// Push event to telemetry server
214async fn push_to_server(client: &reqwest::Client, event: Event) {
215    let (event_type, data) = event;
216
217    let result = match event_type {
218        EventType::Metric => {
219            client
220                .post(METRIC_ENDPOINT)
221                .json(&vec![data])
222                .send()
223                .await
224        }
225        EventType::Error => {
226            client
227                .post(ERROR_ENDPOINT)
228                .json(&data)
229                .send()
230                .await
231        }
232    };
233
234    // Silently fail - telemetry shouldn't break the application
235    if let Err(_e) = result {
236        #[cfg(feature = "local-debug")]
237        eprintln!("Telemetry error: {:?}", _e);
238    }
239}
240
241/// Push an event to the telemetry queue
242///
243/// This function is non-blocking and will not fail if telemetry is unavailable.
244///
245/// # Examples
246///
247/// ```rust
248/// use composio_sdk::models::telemetry::{push_event, create_event, TelemetryData};
249///
250/// let event = create_event("metric", TelemetryData {
251///     function_name: "my_function".to_string(),
252///     ..Default::default()
253/// });
254/// push_event(event);
255/// ```
256pub fn push_event(event: Event) {
257    let state = setup();
258    let guard = state.lock();
259    if let Ok(guard) = guard {
260        if let Some(telemetry) = guard.as_ref() {
261            // Ignore send errors - telemetry is best-effort
262            let _ = telemetry.sender.send(event);
263        }
264    }
265}
266
267/// Create a telemetry event
268///
269/// # Examples
270///
271/// ```rust
272/// use composio_sdk::models::telemetry::{create_event, TelemetryData, EventType};
273///
274/// let event = create_event("metric", TelemetryData {
275///     function_name: "execute_tool".to_string(),
276///     duration_ms: Some(100.0),
277///     ..Default::default()
278/// });
279/// ```
280pub fn create_event(event_type: &str, data: TelemetryData) -> Event {
281    let typ = match event_type {
282        "error" => EventType::Error,
283        _ => EventType::Metric,
284    };
285    (typ, data)
286}
287
#[cfg(test)]
mod tests {
    use super::*;

    /// `"metric"` maps to `EventType::Metric` and the payload is passed through.
    #[test]
    fn test_create_metric_event() {
        let data = TelemetryData {
            function_name: "test_function".to_string(),
            duration_ms: Some(123.45),
            ..Default::default()
        };

        let event = create_event("metric", data);
        assert_eq!(event.0, EventType::Metric);
        assert_eq!(event.1.function_name, "test_function");
        assert_eq!(event.1.duration_ms, Some(123.45));
    }

    /// `"error"` maps to `EventType::Error` and the error details are kept.
    #[test]
    fn test_create_error_event() {
        let data = TelemetryData {
            function_name: "test_function".to_string(),
            error: Some(ErrorData {
                name: "TestError".to_string(),
                message: Some("Test error message".to_string()),
                ..Default::default()
            }),
            ..Default::default()
        };

        let event = create_event("error", data);
        assert_eq!(event.0, EventType::Error);
        assert!(event.1.error.is_some());
    }

    /// Any type name other than `"error"` falls back to a metric event.
    #[test]
    fn test_unknown_event_type_defaults_to_metric() {
        let event = create_event("something-else", TelemetryData::default());
        assert_eq!(event.0, EventType::Metric);
    }

    /// Pushing an event must not panic.
    #[test]
    fn test_push_event_does_not_panic() {
        let data = TelemetryData {
            function_name: "test".to_string(),
            ..Default::default()
        };
        let event = create_event("metric", data);

        // Enter a Tokio runtime so the worker can be spawned as a task;
        // `tokio::spawn` panics when it is called outside of one.
        let runtime = tokio::runtime::Builder::new_current_thread()
            .enable_all()
            .build()
            .expect("failed to build a Tokio runtime for the test");
        runtime.block_on(async { push_event(event) });
    }
}