use anyhow::Result;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::Arc;
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
use sysinfo::{CpuRefreshKind, Pid, ProcessRefreshKind, ProcessesToUpdate, RefreshKind, System};
use tokio::sync::RwLock;
pub struct CpuProfiler {
profiles: Arc<RwLock<HashMap<String, ResolverCpuProfile>>>,
completed: Arc<RwLock<Vec<CompletedCpuProfile>>>,
system: Arc<RwLock<System>>,
config: CpuProfilingConfig,
pid: Pid,
}
#[derive(Debug, Clone)]
pub struct CpuProfilingConfig {
pub enable_detailed_profiling: bool,
pub sampling_interval: Duration,
pub max_profiles: usize,
pub retention_period: Duration,
pub warning_threshold_percent: f32,
}
impl Default for CpuProfilingConfig {
fn default() -> Self {
Self {
enable_detailed_profiling: true,
sampling_interval: Duration::from_millis(100),
max_profiles: 1000,
retention_period: Duration::from_secs(3600), warning_threshold_percent: 80.0,
}
}
}
#[derive(Debug, Clone)]
pub struct ResolverCpuProfile {
pub resolver_name: String,
pub field_path: String,
pub start_time: Instant,
pub start_timestamp: u64,
pub samples: Vec<CpuSample>,
pub initial_cpu_percent: f32,
pub peak_cpu_percent: f32,
}
impl ResolverCpuProfile {
pub fn new(resolver_name: String, field_path: String, initial_cpu: f32) -> Self {
let now = SystemTime::now()
.duration_since(UNIX_EPOCH)
.expect("SystemTime should be after UNIX_EPOCH")
.as_secs();
Self {
resolver_name,
field_path,
start_time: Instant::now(),
start_timestamp: now,
samples: Vec::new(),
initial_cpu_percent: initial_cpu,
peak_cpu_percent: initial_cpu,
}
}
pub fn add_sample(&mut self, sample: CpuSample) {
self.peak_cpu_percent = self.peak_cpu_percent.max(sample.cpu_percent);
self.samples.push(sample);
}
pub fn elapsed(&self) -> Duration {
self.start_time.elapsed()
}
pub fn average_cpu_percent(&self) -> f32 {
if self.samples.is_empty() {
self.initial_cpu_percent
} else {
let sum: f32 = self.samples.iter().map(|s| s.cpu_percent).sum();
sum / self.samples.len() as f32
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CpuSample {
pub offset_ms: u64,
pub cpu_percent: f32,
pub thread_count: usize,
}
impl CpuSample {
pub fn new(offset_ms: u64, cpu_percent: f32, thread_count: usize) -> Self {
Self {
offset_ms,
cpu_percent,
thread_count,
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CompletedCpuProfile {
pub resolver_name: String,
pub field_path: String,
pub start_time: u64,
pub end_time: u64,
pub duration_ms: u64,
pub initial_cpu_percent: f32,
pub peak_cpu_percent: f32,
pub average_cpu_percent: f32,
pub sample_count: usize,
pub samples: Vec<CpuSample>,
pub metadata: HashMap<String, String>,
}
impl CompletedCpuProfile {
pub fn from_profile(profile: ResolverCpuProfile) -> Self {
let now = SystemTime::now()
.duration_since(UNIX_EPOCH)
.expect("SystemTime should be after UNIX_EPOCH")
.as_secs();
let duration_ms = profile.elapsed().as_millis() as u64;
let average_cpu_percent = profile.average_cpu_percent();
let sample_count = profile.samples.len();
Self {
resolver_name: profile.resolver_name,
field_path: profile.field_path,
start_time: profile.start_timestamp,
end_time: now,
duration_ms,
initial_cpu_percent: profile.initial_cpu_percent,
peak_cpu_percent: profile.peak_cpu_percent,
average_cpu_percent,
sample_count,
samples: profile.samples,
metadata: HashMap::new(),
}
}
pub fn is_high_cpu(&self, threshold: f32) -> bool {
self.average_cpu_percent >= threshold
}
}
impl CpuProfiler {
pub fn new(config: CpuProfilingConfig) -> Self {
let pid = Pid::from_u32(std::process::id());
let refresh_kind = RefreshKind::nothing()
.with_cpu(CpuRefreshKind::everything())
.with_processes(ProcessRefreshKind::nothing().with_cpu());
Self {
profiles: Arc::new(RwLock::new(HashMap::new())),
completed: Arc::new(RwLock::new(Vec::new())),
system: Arc::new(RwLock::new(System::new_with_specifics(refresh_kind))),
config,
pid,
}
}
pub async fn start_profiling(
&self,
profile_id: String,
resolver_name: String,
field_path: String,
) -> Result<()> {
let mut system = self.system.write().await;
system.refresh_processes_specifics(
ProcessesToUpdate::Some(&[self.pid]),
false,
ProcessRefreshKind::nothing().with_cpu(),
);
let cpu_percent = system
.process(self.pid)
.map(|p| p.cpu_usage())
.unwrap_or(0.0);
drop(system);
let profile = ResolverCpuProfile::new(resolver_name, field_path, cpu_percent);
let mut profiles = self.profiles.write().await;
profiles.insert(profile_id, profile);
Ok(())
}
pub async fn record_sample(&self, profile_id: &str) -> Result<()> {
let mut system = self.system.write().await;
system.refresh_processes_specifics(
ProcessesToUpdate::Some(&[self.pid]),
false,
ProcessRefreshKind::nothing().with_cpu(),
);
let cpu_percent = system
.process(self.pid)
.map(|p| p.cpu_usage())
.unwrap_or(0.0);
let thread_count = system
.process(self.pid)
.map(|p| p.tasks().map(|t| t.len()).unwrap_or(1))
.unwrap_or(1);
drop(system);
let mut profiles = self.profiles.write().await;
if let Some(profile) = profiles.get_mut(profile_id) {
let offset_ms = profile.elapsed().as_millis() as u64;
let sample = CpuSample::new(offset_ms, cpu_percent, thread_count);
profile.add_sample(sample);
if cpu_percent > self.config.warning_threshold_percent {
tracing::warn!(
profile_id = %profile_id,
resolver = %profile.resolver_name,
cpu_percent = cpu_percent,
threshold = self.config.warning_threshold_percent,
"Resolver exceeded CPU warning threshold"
);
}
}
Ok(())
}
pub async fn stop_profiling(&self, profile_id: &str) -> Result<CompletedCpuProfile> {
let mut profiles = self.profiles.write().await;
let profile = profiles
.remove(profile_id)
.ok_or_else(|| anyhow::anyhow!("Profile not found: {}", profile_id))?;
let completed_profile = CompletedCpuProfile::from_profile(profile);
let mut completed = self.completed.write().await;
completed.push(completed_profile.clone());
self.cleanup_old_profiles(&mut completed).await;
Ok(completed_profile)
}
pub async fn get_profile(&self, profile_id: &str) -> Option<ResolverCpuProfile> {
let profiles = self.profiles.read().await;
profiles.get(profile_id).cloned()
}
pub async fn get_completed_profiles(&self) -> Vec<CompletedCpuProfile> {
let completed = self.completed.read().await;
completed.clone()
}
pub async fn get_profiles_by_resolver(&self, resolver_name: &str) -> Vec<CompletedCpuProfile> {
let completed = self.completed.read().await;
completed
.iter()
.filter(|p| p.resolver_name == resolver_name)
.cloned()
.collect()
}
pub async fn get_top_consumers(&self, limit: usize) -> Vec<CompletedCpuProfile> {
let completed = self.completed.read().await;
let mut sorted = completed.clone();
sorted.sort_by(|a, b| {
b.average_cpu_percent
.partial_cmp(&a.average_cpu_percent)
.unwrap_or(std::cmp::Ordering::Equal)
});
sorted.truncate(limit);
sorted
}
pub async fn get_statistics(&self) -> CpuStatistics {
let profiles = self.profiles.read().await;
let completed = self.completed.read().await;
let active_profiles = profiles.len();
let completed_profiles = completed.len();
let mut total_cpu_percent = 0.0;
let mut high_cpu_count = 0;
for profile in completed.iter() {
total_cpu_percent += profile.average_cpu_percent;
if profile.is_high_cpu(self.config.warning_threshold_percent) {
high_cpu_count += 1;
}
}
let avg_cpu_percent = if completed_profiles > 0 {
total_cpu_percent / completed_profiles as f32
} else {
0.0
};
CpuStatistics {
active_profiles,
completed_profiles,
avg_cpu_percent,
high_cpu_count,
}
}
async fn cleanup_old_profiles(&self, completed: &mut Vec<CompletedCpuProfile>) {
let retention_secs = self.config.retention_period.as_secs();
let now = SystemTime::now()
.duration_since(UNIX_EPOCH)
.expect("SystemTime should be after UNIX_EPOCH")
.as_secs();
completed.retain(|p| now - p.end_time < retention_secs);
if completed.len() > self.config.max_profiles {
let excess = completed.len() - self.config.max_profiles;
completed.drain(0..excess);
}
}
}
impl Default for CpuProfiler {
fn default() -> Self {
Self::new(CpuProfilingConfig::default())
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CpuStatistics {
pub active_profiles: usize,
pub completed_profiles: usize,
pub avg_cpu_percent: f32,
pub high_cpu_count: usize,
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_resolver_cpu_profile_creation() {
let profile =
ResolverCpuProfile::new("UserResolver".to_string(), "Query.user".to_string(), 10.0);
assert_eq!(profile.resolver_name, "UserResolver");
assert_eq!(profile.field_path, "Query.user");
assert_eq!(profile.initial_cpu_percent, 10.0);
assert_eq!(profile.peak_cpu_percent, 10.0);
assert_eq!(profile.samples.len(), 0);
}
#[test]
fn test_resolver_cpu_profile_add_sample() {
let mut profile =
ResolverCpuProfile::new("UserResolver".to_string(), "Query.user".to_string(), 10.0);
profile.add_sample(CpuSample::new(100, 20.0, 4));
profile.add_sample(CpuSample::new(200, 30.0, 4));
assert_eq!(profile.samples.len(), 2);
assert_eq!(profile.peak_cpu_percent, 30.0);
}
#[test]
fn test_resolver_cpu_profile_average() {
let mut profile =
ResolverCpuProfile::new("UserResolver".to_string(), "Query.user".to_string(), 10.0);
profile.add_sample(CpuSample::new(100, 20.0, 4));
profile.add_sample(CpuSample::new(200, 30.0, 4));
profile.add_sample(CpuSample::new(300, 40.0, 4));
assert_eq!(profile.average_cpu_percent(), 30.0);
}
#[test]
fn test_cpu_sample_creation() {
let sample = CpuSample::new(100, 25.5, 4);
assert_eq!(sample.offset_ms, 100);
assert_eq!(sample.cpu_percent, 25.5);
assert_eq!(sample.thread_count, 4);
}
#[test]
fn test_completed_cpu_profile_from_profile() {
let mut profile =
ResolverCpuProfile::new("UserResolver".to_string(), "Query.user".to_string(), 10.0);
profile.add_sample(CpuSample::new(100, 20.0, 4));
profile.add_sample(CpuSample::new(200, 30.0, 4));
let completed = CompletedCpuProfile::from_profile(profile);
assert_eq!(completed.resolver_name, "UserResolver");
assert_eq!(completed.field_path, "Query.user");
assert_eq!(completed.initial_cpu_percent, 10.0);
assert_eq!(completed.peak_cpu_percent, 30.0);
assert_eq!(completed.average_cpu_percent, 25.0);
assert_eq!(completed.sample_count, 2);
}
#[test]
fn test_completed_cpu_profile_is_high_cpu() {
let mut profile =
ResolverCpuProfile::new("UserResolver".to_string(), "Query.user".to_string(), 10.0);
profile.add_sample(CpuSample::new(100, 80.0, 4));
profile.add_sample(CpuSample::new(200, 90.0, 4));
let completed = CompletedCpuProfile::from_profile(profile);
assert!(completed.is_high_cpu(80.0));
assert!(!completed.is_high_cpu(90.0));
}
#[tokio::test]
async fn test_cpu_profiler_start_stop() {
let profiler = CpuProfiler::default();
profiler
.start_profiling(
"profile-1".to_string(),
"UserResolver".to_string(),
"Query.user".to_string(),
)
.await
.expect("should succeed");
let profile = profiler.get_profile("profile-1").await;
assert!(profile.is_some());
let completed = profiler
.stop_profiling("profile-1")
.await
.expect("should succeed");
assert_eq!(completed.resolver_name, "UserResolver");
let profile = profiler.get_profile("profile-1").await;
assert!(profile.is_none());
}
#[tokio::test]
async fn test_cpu_profiler_record_sample() {
let profiler = CpuProfiler::default();
profiler
.start_profiling(
"profile-1".to_string(),
"UserResolver".to_string(),
"Query.user".to_string(),
)
.await
.expect("should succeed");
profiler
.record_sample("profile-1")
.await
.expect("should succeed");
profiler
.record_sample("profile-1")
.await
.expect("should succeed");
let profile = profiler
.get_profile("profile-1")
.await
.expect("should succeed");
assert!(profile.samples.len() >= 2);
}
#[tokio::test]
async fn test_cpu_profiler_completed_profiles() {
let profiler = CpuProfiler::default();
profiler
.start_profiling(
"profile-1".to_string(),
"UserResolver".to_string(),
"Query.user".to_string(),
)
.await
.expect("should succeed");
profiler
.stop_profiling("profile-1")
.await
.expect("should succeed");
let completed = profiler.get_completed_profiles().await;
assert_eq!(completed.len(), 1);
assert_eq!(completed[0].resolver_name, "UserResolver");
}
#[tokio::test]
async fn test_cpu_profiler_profiles_by_resolver() {
let profiler = CpuProfiler::default();
profiler
.start_profiling(
"profile-1".to_string(),
"UserResolver".to_string(),
"Query.user".to_string(),
)
.await
.expect("should succeed");
profiler
.stop_profiling("profile-1")
.await
.expect("should succeed");
profiler
.start_profiling(
"profile-2".to_string(),
"PostResolver".to_string(),
"Query.posts".to_string(),
)
.await
.expect("should succeed");
profiler
.stop_profiling("profile-2")
.await
.expect("should succeed");
let profiles = profiler.get_profiles_by_resolver("UserResolver").await;
assert_eq!(profiles.len(), 1);
assert_eq!(profiles[0].resolver_name, "UserResolver");
}
#[tokio::test]
async fn test_cpu_profiler_top_consumers() {
let profiler = CpuProfiler::default();
profiler
.start_profiling(
"profile-1".to_string(),
"Resolver1".to_string(),
"Query.low".to_string(),
)
.await
.expect("should succeed");
profiler
.stop_profiling("profile-1")
.await
.expect("should succeed");
profiler
.start_profiling(
"profile-2".to_string(),
"Resolver2".to_string(),
"Query.high".to_string(),
)
.await
.expect("should succeed");
profiler
.stop_profiling("profile-2")
.await
.expect("should succeed");
let top = profiler.get_top_consumers(1).await;
assert_eq!(top.len(), 1);
}
#[tokio::test]
async fn test_cpu_profiler_statistics() {
let profiler = CpuProfiler::default();
profiler
.start_profiling(
"profile-1".to_string(),
"UserResolver".to_string(),
"Query.user".to_string(),
)
.await
.expect("should succeed");
profiler
.start_profiling(
"profile-2".to_string(),
"PostResolver".to_string(),
"Query.posts".to_string(),
)
.await
.expect("should succeed");
profiler
.stop_profiling("profile-1")
.await
.expect("should succeed");
let stats = profiler.get_statistics().await;
assert_eq!(stats.active_profiles, 1); assert_eq!(stats.completed_profiles, 1);
}
#[tokio::test]
async fn test_cpu_profiler_not_found() {
let profiler = CpuProfiler::default();
let result = profiler.stop_profiling("nonexistent").await;
assert!(result.is_err());
let profile = profiler.get_profile("nonexistent").await;
assert!(profile.is_none());
}
#[test]
fn test_cpu_profiling_config() {
let config = CpuProfilingConfig::default();
assert!(config.enable_detailed_profiling);
assert_eq!(config.max_profiles, 1000);
assert_eq!(config.warning_threshold_percent, 80.0);
}
#[test]
fn test_resolver_cpu_profile_no_samples_average() {
let profile =
ResolverCpuProfile::new("UserResolver".to_string(), "Query.user".to_string(), 15.0);
assert_eq!(profile.average_cpu_percent(), 15.0);
}
#[test]
fn test_completed_cpu_profile_metadata() {
let profile =
ResolverCpuProfile::new("UserResolver".to_string(), "Query.user".to_string(), 10.0);
let mut completed = CompletedCpuProfile::from_profile(profile);
completed
.metadata
.insert("user_id".to_string(), "123".to_string());
completed
.metadata
.insert("query_type".to_string(), "query".to_string());
assert_eq!(completed.metadata.get("user_id"), Some(&"123".to_string()));
assert_eq!(
completed.metadata.get("query_type"),
Some(&"query".to_string())
);
}
}