use std::time::{SystemTime, UNIX_EPOCH};
use std::sync::Arc;
use crate::client::ComposioClient;
use crate::models::telemetry::{
push_event, Event, EventType, SourceData, Metadata, TelemetryData, ErrorData,
ServiceType, LanguageType, EnvironmentType,
};
#[derive(Debug, Clone)]
pub enum Environment {
Development,
Production,
Staging,
}
impl Environment {
pub fn from_env() -> Self {
match std::env::var("ENVIRONMENT").as_deref() {
Ok("production") => Environment::Production,
Ok("staging") => Environment::Staging,
_ => Environment::Development,
}
}
pub fn as_str(&self) -> &str {
match self {
Environment::Development => "development",
Environment::Production => "production",
Environment::Staging => "staging",
}
}
}
#[derive(Debug, Clone)]
pub struct TelemetryContext {
pub allow_tracking: bool,
pub environment: Environment,
}
impl Default for TelemetryContext {
fn default() -> Self {
Self {
allow_tracking: true,
environment: Environment::from_env(),
}
}
}
impl TelemetryContext {
pub fn new(allow_tracking: bool) -> Self {
Self {
allow_tracking,
environment: Environment::from_env(),
}
}
pub fn disable_tracking(&mut self) {
self.allow_tracking = false;
}
pub fn enable_tracking(&mut self) {
self.allow_tracking = true;
}
}
pub trait Resource {
fn client(&self) -> &ComposioClient;
fn telemetry_context(&self) -> &TelemetryContext;
fn sanitize_payload<T>(&self, payload: T) -> T
where
T: serde::Serialize + serde::de::DeserializeOwned,
{
payload
}
fn provider(&self) -> Option<String> {
None
}
fn create_method_event(
&self,
function_name: &str,
provider: Option<&str>,
) -> Option<TelemetryData> {
if !self.telemetry_context().allow_tracking {
return None;
}
let timestamp = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_secs_f64();
let provider_name = provider
.map(|s| s.to_string())
.or_else(|| self.provider());
Some(TelemetryData {
function_name: function_name.to_string(),
duration_ms: None,
timestamp: Some(timestamp),
props: None,
source: Some(SourceData {
host: None,
service: Some(ServiceType::Sdk),
language: Some(LanguageType::Rust),
version: Some(env!("CARGO_PKG_VERSION").to_string()),
platform: None,
environment: Some(match self.telemetry_context().environment {
Environment::Development => EnvironmentType::Development,
Environment::Production => EnvironmentType::Production,
Environment::Staging => EnvironmentType::Staging,
}),
}),
metadata: Some(Metadata {
project_id: None,
provider: provider_name,
}),
error: None,
})
}
fn push_telemetry_event(&self, event: Event) {
if self.telemetry_context().allow_tracking {
push_event(event);
}
}
fn trace_method<F, R>(&self, function_name: &str, provider: Option<&str>, f: F) -> R
where
F: FnOnce() -> R,
{
let mut telemetry_data = self.create_method_event(function_name, provider);
let start_time = SystemTime::now();
let result = f();
if let Some(ref mut data) = telemetry_data {
let duration_ms = SystemTime::now()
.duration_since(start_time)
.unwrap()
.as_millis() as f64;
data.duration_ms = Some(duration_ms);
self.push_telemetry_event((EventType::Metric, data.clone()));
}
result
}
fn trace_method_with_error<F, R, E>(
&self,
function_name: &str,
provider: Option<&str>,
f: F,
) -> Result<R, E>
where
F: FnOnce() -> Result<R, E>,
E: std::fmt::Display,
{
let mut telemetry_data = self.create_method_event(function_name, provider);
let start_time = SystemTime::now();
let result = f();
if let Some(ref mut data) = telemetry_data {
let duration_ms = SystemTime::now()
.duration_since(start_time)
.unwrap()
.as_millis() as f64;
data.duration_ms = Some(duration_ms);
let event_type = if let Err(ref e) = result {
data.error = Some(ErrorData {
name: std::any::type_name::<E>().to_string(),
code: None,
error_id: None,
message: Some(e.to_string()),
stack: None,
});
EventType::Error
} else {
EventType::Metric
};
self.push_telemetry_event((event_type, data.clone()));
}
result
}
}
#[derive(Clone)]
pub struct BaseResource {
pub client: Arc<ComposioClient>,
pub telemetry_context: TelemetryContext,
}
impl BaseResource {
pub fn new(client: Arc<ComposioClient>) -> Self {
Self {
client,
telemetry_context: TelemetryContext::default(),
}
}
pub fn with_telemetry_context(
client: Arc<ComposioClient>,
telemetry_context: TelemetryContext,
) -> Self {
Self {
client,
telemetry_context,
}
}
}
impl Resource for BaseResource {
fn client(&self) -> &ComposioClient {
&self.client
}
fn telemetry_context(&self) -> &TelemetryContext {
&self.telemetry_context
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_environment_from_env() {
let env = Environment::from_env();
assert_eq!(env.as_str(), "development");
}
#[test]
fn test_environment_as_str() {
assert_eq!(Environment::Development.as_str(), "development");
assert_eq!(Environment::Production.as_str(), "production");
assert_eq!(Environment::Staging.as_str(), "staging");
}
#[test]
fn test_telemetry_context_default() {
let ctx = TelemetryContext::default();
assert!(ctx.allow_tracking);
}
#[test]
fn test_telemetry_context_disable_enable() {
let mut ctx = TelemetryContext::default();
assert!(ctx.allow_tracking);
ctx.disable_tracking();
assert!(!ctx.allow_tracking);
ctx.enable_tracking();
assert!(ctx.allow_tracking);
}
#[test]
fn test_base_resource_creation() {
let client = Arc::new(
ComposioClient::builder()
.api_key("test_key")
.build()
.unwrap(),
);
let resource = BaseResource::new(client);
assert!(resource.telemetry_context.allow_tracking);
}
}