use crate::services::service_base::{Service, ServiceMetrics, ValidationError};
use anyhow::Result;
use dashmap::DashMap;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, RwLock};
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
use tracing::{debug, error, info, instrument, span, trace, Level};
use uuid::Uuid;
/// One telemetry event submitted to [`TelemetryService::record_operation`].
///
/// `event_type`, `service_name` and `operation` must be non-empty to pass
/// `Service::validate_input`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TelemetryInput {
/// Category of the event; only checked for non-emptiness by this module.
pub event_type: String,
/// Reporting service; per-service aggregates are keyed by this value.
pub service_name: String,
/// Operation name; counted per service in `ServiceTelemetryData::operation_counts`.
pub operation: String,
/// Measurements for this single operation.
pub metrics: OperationMetrics,
/// Free-form tags; in this module they are only emitted in the trace-level log.
pub tags: HashMap<String, String>,
/// Free-form properties; in this module they are only emitted in the trace-level log.
pub properties: HashMap<String, serde_json::Value>,
}
/// Acknowledgement returned by [`TelemetryService::record_operation`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TelemetryOutput {
/// Freshly generated UUID v4 identifying this event.
pub event_id: String,
/// Time the event was recorded, in whole seconds since the UNIX epoch.
pub recorded_at: u64,
/// `true` whenever the event was recorded. This does NOT mirror the outcome of the
/// operation itself; that is `OperationMetrics::success` on the input.
pub success: bool,
}
/// Measurements reported for a single operation.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OperationMetrics {
/// Wall duration of the operation in milliseconds.
pub duration_ms: u64,
/// Number of items the operation handled; summed into `total_items_processed`.
pub items_processed: u64,
/// Memory used, in bytes, if known; feeds the per-service `peak_memory_bytes` maximum.
pub memory_bytes: Option<u64>,
/// CPU time in milliseconds, if known. Not read by `TelemetryService` in this module.
pub cpu_time_ms: Option<u64>,
/// Whether the operation succeeded; selects the success/failure counters and log level.
pub success: bool,
/// Failure description, logged when `success` is false ("Unknown error" if absent).
pub error_message: Option<String>,
}
/// Running aggregate of all operations recorded for one service.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct ServiceTelemetryData {
/// Name of the service this aggregate belongs to.
pub service_name: String,
/// Number of operations recorded.
pub total_operations: u64,
/// Operations whose `OperationMetrics::success` was true.
pub successful_operations: u64,
/// Operations whose `OperationMetrics::success` was false.
pub failed_operations: u64,
/// Sum of `duration_ms` over all operations.
pub total_duration_ms: u64,
/// Integer (truncated) mean of `total_duration_ms` over `total_operations`.
pub avg_duration_ms: u64,
/// Largest `memory_bytes` reported; stays 0 if none was ever reported.
pub peak_memory_bytes: u64,
/// Sum of `items_processed` over all operations.
pub total_items_processed: u64,
/// `successful_operations / total_operations`, in `0.0..=1.0`.
pub success_rate: f64,
/// Seconds since the UNIX epoch at which the latest operation was recorded.
pub last_operation_at: u64,
/// Number of recorded operations, keyed by operation name.
pub operation_counts: HashMap<String, u64>,
}
/// Snapshot produced by [`TelemetryService::get_system_telemetry`].
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct SystemTelemetryData {
/// Aggregate across all services, reported under the name `PMAT_SYSTEM`.
pub system_metrics: ServiceTelemetryData,
/// Copy of every per-service aggregate, keyed by service name.
pub services: HashMap<String, ServiceTelemetryData>,
/// Approximate service start in seconds since the UNIX epoch (now minus uptime).
pub startup_time: u64,
/// Whole seconds elapsed since the `TelemetryService` was constructed.
pub uptime_seconds: u64,
}
/// Collects per-service and system-wide operation telemetry.
#[derive(Debug)]
pub struct TelemetryService {
/// Per-service aggregates keyed by service name.
services: Arc<DashMap<String, ServiceTelemetryData>>,
/// Monotonic instant captured in `new`; uptime is measured from here.
startup_time: Instant,
/// Count of events recorded (cleared by the test-only `reset`).
event_counter: AtomicU64,
/// System-wide request metrics, updated through `ServiceMetrics::record_request`.
system_metrics: Arc<RwLock<ServiceMetrics>>,
}
impl Default for TelemetryService {
fn default() -> Self {
Self::new()
}
}
impl TelemetryService {
    /// Creates an empty telemetry service; the uptime clock starts at construction.
    pub fn new() -> Self {
        let startup_time = Instant::now();
        info!("🔍 Initializing PMAT Telemetry Service");
        Self {
            services: Arc::new(DashMap::new()),
            startup_time,
            event_counter: AtomicU64::new(0),
            system_metrics: Arc::new(RwLock::new(ServiceMetrics::default())),
        }
    }

    /// Records one operation.
    ///
    /// This updates the per-service aggregate and the system-wide `ServiceMetrics`,
    /// bumps the event counter, and logs the outcome.
    ///
    /// The returned [`TelemetryOutput::success`] is `true` whenever the event was
    /// recorded, regardless of `input.metrics.success`.
    ///
    /// # Errors
    ///
    /// Fails if the system clock reads earlier than the UNIX epoch (nothing is recorded
    /// in that case). It also fails if the system metrics lock is poisoned; the
    /// per-service aggregate has already been updated by then.
    #[instrument(skip(self), fields(service = %input.service_name, operation = %input.operation))]
    pub async fn record_operation(&self, input: TelemetryInput) -> Result<TelemetryOutput> {
        let event_id = Uuid::new_v4().to_string();
        let recorded_at = SystemTime::now().duration_since(UNIX_EPOCH)?.as_secs();
        self.update_service_telemetry(&input).await?;
        self.update_system_metrics(&input).await?;
        let event_count = self.event_counter.fetch_add(1, Ordering::Relaxed) + 1;
        if input.metrics.success {
            info!(
                event_id = %event_id,
                service = %input.service_name,
                operation = %input.operation,
                duration_ms = input.metrics.duration_ms,
                items_processed = input.metrics.items_processed,
                event_count = event_count,
                "✅ Operation completed successfully"
            );
        } else {
            error!(
                event_id = %event_id,
                service = %input.service_name,
                operation = %input.operation,
                duration_ms = input.metrics.duration_ms,
                error = %input.metrics.error_message.as_deref().unwrap_or("Unknown error"),
                event_count = event_count,
                "❌ Operation failed"
            );
        }
        trace!(
            event_id = %event_id,
            tags = ?input.tags,
            properties = ?input.properties,
            "🔍 Detailed telemetry event recorded"
        );
        Ok(TelemetryOutput {
            event_id,
            recorded_at,
            success: true,
        })
    }

    /// Folds `input` into the aggregate for `input.service_name`, creating it on first use.
    async fn update_service_telemetry(&self, input: &TelemetryInput) -> Result<()> {
        let metrics = &input.metrics;
        let mut entry = self
            .services
            .entry(input.service_name.clone())
            .or_insert_with(|| ServiceTelemetryData {
                service_name: input.service_name.clone(),
                ..Default::default()
            });
        let data = entry.value_mut();
        data.total_operations += 1;
        if metrics.success {
            data.successful_operations += 1;
        } else {
            data.failed_operations += 1;
        }
        // Durations and item counts are caller-supplied (and deserializable), so
        // saturate instead of panicking (debug) or wrapping (release) on overflow.
        data.total_duration_ms = data.total_duration_ms.saturating_add(metrics.duration_ms);
        // total_operations >= 1 at this point, so the division cannot be by zero.
        data.avg_duration_ms = data.total_duration_ms / data.total_operations;
        data.total_items_processed = data
            .total_items_processed
            .saturating_add(metrics.items_processed);
        if let Some(memory_bytes) = metrics.memory_bytes {
            data.peak_memory_bytes = data.peak_memory_bytes.max(memory_bytes);
        }
        data.success_rate = data.successful_operations as f64 / data.total_operations as f64;
        data.last_operation_at = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap_or_default()
            .as_secs();
        *data
            .operation_counts
            .entry(input.operation.clone())
            .or_insert(0) += 1;
        debug!(
            service = %input.service_name,
            total_ops = data.total_operations,
            success_rate = data.success_rate,
            "📊 Service telemetry updated"
        );
        Ok(())
    }

    /// Feeds the operation's duration and outcome into the system-wide `ServiceMetrics`.
    ///
    /// # Errors
    ///
    /// Fails if the metrics lock is poisoned.
    async fn update_system_metrics(&self, input: &TelemetryInput) -> Result<()> {
        let mut metrics = self
            .system_metrics
            .write()
            .map_err(|_| anyhow::anyhow!("Failed to acquire system metrics lock"))?;
        let duration = Duration::from_millis(input.metrics.duration_ms);
        metrics.record_request(duration, input.metrics.success);
        Ok(())
    }

    /// Builds a snapshot of every per-service aggregate plus a `PMAT_SYSTEM` roll-up.
    ///
    /// The roll-up combines the services as follows:
    /// - Counters, durations and item totals are summed.
    /// - `peak_memory_bytes` and `last_operation_at` take the maximum over all services.
    /// - `operation_counts` are merged by operation name.
    ///
    /// # Errors
    ///
    /// Fails if the system clock reads earlier than the UNIX epoch.
    #[instrument(skip(self))]
    pub async fn get_system_telemetry(&self) -> Result<SystemTelemetryData> {
        let uptime_seconds = self.startup_time.elapsed().as_secs();
        // Wall-clock start is estimated as "now - uptime"; saturate so a wall clock
        // stepped far backwards cannot underflow.
        let startup_time = SystemTime::now()
            .duration_since(UNIX_EPOCH)?
            .as_secs()
            .saturating_sub(uptime_seconds);

        let mut services = HashMap::with_capacity(self.services.len());
        let mut system_data = ServiceTelemetryData {
            service_name: "PMAT_SYSTEM".to_string(),
            ..Default::default()
        };
        for service_entry in self.services.iter() {
            let service_data = service_entry.value().clone();
            system_data.total_operations += service_data.total_operations;
            system_data.successful_operations += service_data.successful_operations;
            system_data.failed_operations += service_data.failed_operations;
            system_data.total_duration_ms = system_data
                .total_duration_ms
                .saturating_add(service_data.total_duration_ms);
            system_data.total_items_processed = system_data
                .total_items_processed
                .saturating_add(service_data.total_items_processed);
            system_data.peak_memory_bytes = system_data
                .peak_memory_bytes
                .max(service_data.peak_memory_bytes);
            system_data.last_operation_at = system_data
                .last_operation_at
                .max(service_data.last_operation_at);
            for (operation, count) in &service_data.operation_counts {
                *system_data
                    .operation_counts
                    .entry(operation.clone())
                    .or_insert(0) += *count;
            }
            services.insert(service_entry.key().clone(), service_data);
        }
        if system_data.total_operations > 0 {
            system_data.avg_duration_ms =
                system_data.total_duration_ms / system_data.total_operations;
            system_data.success_rate =
                system_data.successful_operations as f64 / system_data.total_operations as f64;
        }

        info!(
            total_operations = system_data.total_operations,
            success_rate = system_data.success_rate,
            uptime_seconds = uptime_seconds,
            "📈 System telemetry aggregated"
        );
        Ok(SystemTelemetryData {
            system_metrics: system_data,
            services,
            startup_time,
            uptime_seconds,
        })
    }

    /// Returns a copy of the aggregate for `service_name`, or `None` if nothing was recorded.
    pub async fn get_service_telemetry(&self, service_name: &str) -> Option<ServiceTelemetryData> {
        self.services
            .get(service_name)
            .map(|entry| entry.value().clone())
    }

    /// Test-only: clears all aggregates, the event counter and the system metrics.
    #[cfg(test)]
    pub fn reset(&self) {
        self.services.clear();
        self.event_counter.store(0, Ordering::Relaxed);
        if let Ok(mut metrics) = self.system_metrics.write() {
            *metrics = ServiceMetrics::default();
        }
    }
}
#[async_trait::async_trait]
impl Service for TelemetryService {
type Input = TelemetryInput;
type Output = TelemetryOutput;
type Error = anyhow::Error;
#[instrument(skip(self, input))]
async fn process(&self, input: Self::Input) -> Result<Self::Output, Self::Error> {
span!(Level::INFO, "telemetry_process", service = %input.service_name)
.in_scope(|| async { self.record_operation(input).await })
.await
}
fn validate_input(&self, input: &Self::Input) -> Result<(), ValidationError> {
if input.event_type.is_empty() {
return Err(ValidationError::MissingField {
field: "event_type".to_string(),
});
}
if input.service_name.is_empty() {
return Err(ValidationError::MissingField {
field: "service_name".to_string(),
});
}
if input.operation.is_empty() {
return Err(ValidationError::MissingField {
field: "operation".to_string(),
});
}
Ok(())
}
fn metrics(&self) -> ServiceMetrics {
self.system_metrics
.read()
.map(|m| m.clone())
.unwrap_or_default()
}
}
lazy_static::lazy_static! {
    /// Process-wide telemetry service, constructed on first access.
    static ref TELEMETRY: Arc<TelemetryService> = Arc::new(TelemetryService::new());
}

/// Returns a handle to the process-wide [`TelemetryService`].
///
/// Every call clones the same `Arc`, so all handles share one set of aggregates.
#[must_use]
pub fn telemetry() -> Arc<TelemetryService> {
    Arc::clone(&TELEMETRY)
}
/// Records one operation against the global `telemetry()` service.
///
/// The expansion `.await`s `TelemetryService::record_operation`, so it must be used
/// inside an `async` context. Recording errors are deliberately discarded so telemetry
/// never disrupts the caller.
///
/// Forms:
/// - `record_telemetry!(service, operation, duration, success)` records 1 item with no tags.
/// - `record_telemetry!(service, operation, duration, success, items)` records with no tags.
/// - `record_telemetry!(service, operation, duration, success, items, tags)` is the full form.
///
/// `duration` must provide `as_millis()` (e.g. `std::time::Duration`); values that
/// do not fit in `u64` milliseconds saturate to `u64::MAX`. `items` is a `u64` and
/// `tags` a `HashMap<String, String>`.
#[macro_export]
macro_rules! record_telemetry {
    ($service:expr, $operation:expr, $duration:expr, $success:expr) => {
        // Fully qualified paths so the expansion compiles at call sites that have not
        // imported `HashMap` and regardless of how this macro was brought into scope.
        $crate::record_telemetry!(
            $service,
            $operation,
            $duration,
            $success,
            1,
            ::std::collections::HashMap::new()
        )
    };
    ($service:expr, $operation:expr, $duration:expr, $success:expr, $items:expr) => {
        $crate::record_telemetry!(
            $service,
            $operation,
            $duration,
            $success,
            $items,
            ::std::collections::HashMap::new()
        )
    };
    ($service:expr, $operation:expr, $duration:expr, $success:expr, $items:expr, $tags:expr) => {{
        let input = $crate::services::telemetry_service::TelemetryInput {
            event_type: "operation".to_string(),
            service_name: $service.to_string(),
            operation: $operation.to_string(),
            metrics: $crate::services::telemetry_service::OperationMetrics {
                // `as_millis()` is u128; saturate instead of silently truncating.
                duration_ms: <u64 as ::core::convert::TryFrom<u128>>::try_from(
                    $duration.as_millis(),
                )
                .unwrap_or(u64::MAX),
                items_processed: $items,
                memory_bytes: None,
                cpu_time_ms: None,
                success: $success,
                error_message: None,
            },
            tags: $tags,
            properties: ::std::collections::HashMap::new(),
        };
        // Best-effort: a telemetry failure must not affect the instrumented code.
        let _ = $crate::services::telemetry_service::telemetry()
            .record_operation(input)
            .await;
    }};
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Operation metrics without memory or CPU figures.
    fn bare_metrics(
        duration_ms: u64,
        items_processed: u64,
        success: bool,
        error_message: Option<&str>,
    ) -> OperationMetrics {
        OperationMetrics {
            duration_ms,
            items_processed,
            memory_bytes: None,
            cpu_time_ms: None,
            success,
            error_message: error_message.map(str::to_string),
        }
    }

    /// A telemetry event with empty tags and properties.
    fn event(
        event_type: &str,
        service_name: &str,
        operation: &str,
        metrics: OperationMetrics,
    ) -> TelemetryInput {
        TelemetryInput {
            event_type: event_type.to_string(),
            service_name: service_name.to_string(),
            operation: operation.to_string(),
            metrics,
            tags: HashMap::new(),
            properties: HashMap::new(),
        }
    }

    #[tokio::test]
    async fn test_telemetry_service_creation() {
        let service = TelemetryService::new();
        let snapshot = service.get_system_telemetry().await.unwrap();
        assert_eq!(snapshot.system_metrics.total_operations, 0);
        assert!(snapshot.services.is_empty());
    }

    #[tokio::test]
    async fn test_record_successful_operation() {
        let service = TelemetryService::new();
        let metrics = OperationMetrics {
            memory_bytes: Some(1024),
            cpu_time_ms: Some(100),
            ..bare_metrics(150, 5, true, None)
        };
        let output = service
            .record_operation(event("test", "complexity_analyzer", "analyze_file", metrics))
            .await
            .unwrap();
        assert!(output.success);
        assert!(!output.event_id.is_empty());

        let stats = service
            .get_service_telemetry("complexity_analyzer")
            .await
            .unwrap();
        assert_eq!(stats.total_operations, 1);
        assert_eq!(stats.successful_operations, 1);
        assert_eq!(stats.total_items_processed, 5);
        assert_eq!(stats.success_rate, 1.0);
    }

    #[tokio::test]
    async fn test_record_failed_operation() {
        let service = TelemetryService::new();
        let metrics = bare_metrics(500, 0, false, Some("Parse error"));
        let output = service
            .record_operation(event("test", "refactor_engine", "refactor_function", metrics))
            .await
            .unwrap();
        // `success` reports that the event was recorded, not the operation outcome.
        assert!(output.success);

        let stats = service
            .get_service_telemetry("refactor_engine")
            .await
            .unwrap();
        assert_eq!(stats.total_operations, 1);
        assert_eq!(stats.failed_operations, 1);
        assert_eq!(stats.success_rate, 0.0);
    }

    #[tokio::test]
    async fn test_system_telemetry_aggregation() {
        let service = TelemetryService::new();
        for name in ["complexity_analyzer", "refactor_engine", "satd_detector"] {
            let input = event("test", name, "test_operation", bare_metrics(100, 2, true, None));
            service.record_operation(input).await.unwrap();
        }

        let snapshot = service.get_system_telemetry().await.unwrap();
        assert_eq!(snapshot.system_metrics.total_operations, 3);
        assert_eq!(snapshot.services.len(), 3);
        assert_eq!(snapshot.system_metrics.success_rate, 1.0);
    }

    #[tokio::test]
    async fn test_validation() {
        let service = TelemetryService::new();
        let missing_event_type =
            event("", "test_service", "test_op", bare_metrics(100, 1, true, None));
        let err = service
            .validate_input(&missing_event_type)
            .expect_err("an empty event_type must be rejected");
        assert!(err.to_string().contains("event_type"));
    }

    #[tokio::test]
    async fn test_global_telemetry_service() {
        let (first, second) = (telemetry(), telemetry());
        assert!(Arc::ptr_eq(&first, &second));
    }
}
#[cfg(test)]
mod property_tests {
    //! Property tests for input validation. They replace two tautological
    //! properties that exercised none of this module's code.
    use super::*;
    use proptest::prelude::*;

    /// Names of the validated fields, in the order `validate_input` checks them.
    const FIELD_NAMES: [&str; 3] = ["event_type", "service_name", "operation"];

    /// Minimal successful input carrying the three validated fields.
    fn input_from(event_type: &str, service_name: &str, operation: &str) -> TelemetryInput {
        TelemetryInput {
            event_type: event_type.to_string(),
            service_name: service_name.to_string(),
            operation: operation.to_string(),
            metrics: OperationMetrics {
                duration_ms: 1,
                items_processed: 1,
                memory_bytes: None,
                cpu_time_ms: None,
                success: true,
                error_message: None,
            },
            tags: HashMap::new(),
            properties: HashMap::new(),
        }
    }

    proptest! {
        #[test]
        fn validation_accepts_all_non_empty_fields(
            event_type in "[a-zA-Z0-9_]{1,32}",
            service_name in "[a-zA-Z0-9_]{1,32}",
            operation in "[a-zA-Z0-9_]{1,32}"
        ) {
            let service = TelemetryService::new();
            let input = input_from(&event_type, &service_name, &operation);
            prop_assert!(service.validate_input(&input).is_ok());
        }

        #[test]
        fn validation_rejects_and_names_an_empty_field(
            service_name in "[a-zA-Z0-9_]{1,32}",
            operation in "[a-zA-Z0-9_]{1,32}",
            empty_slot in 0usize..3
        ) {
            let mut fields = ["event".to_string(), service_name, operation];
            fields[empty_slot].clear();
            let input = input_from(&fields[0], &fields[1], &fields[2]);
            let service = TelemetryService::new();

            let result = service.validate_input(&input);
            prop_assert!(result.is_err());
            let message = result.unwrap_err().to_string();
            prop_assert!(
                message.contains(FIELD_NAMES[empty_slot]),
                "error {:?} should mention {}",
                message,
                FIELD_NAMES[empty_slot]
            );
        }
    }
}