use std::sync::atomic::AtomicU64;
use std::sync::{Arc, RwLock};
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
use tokio::time::interval;
/// Runtime configuration for the continuous profiler.
///
/// [`ProfilingConfig::default`] fills every field from `RS3GW_PROFILING_*`
/// environment variables, so a default-constructed value reflects the process
/// environment at the time of the call.
#[derive(Debug, Clone)]
pub struct ProfilingConfig {
    /// Refresh CPU statistics on every snapshot.
    pub enable_cpu: bool,
    /// Refresh memory statistics on every snapshot.
    pub enable_memory: bool,
    /// Refresh I/O statistics on every snapshot.
    pub enable_io: bool,
    /// CPU sample rate (default 100). Not read by the code in this file, so its
    /// unit is not established here.
    pub cpu_sample_rate: u64,
    /// Memory sample rate (default 1000). Not read by the code in this file, so
    /// its unit is not established here.
    pub memory_sample_rate: u64,
    /// Time between two consecutive snapshots.
    pub collection_interval: Duration,
    /// Maximum number of snapshots kept in the in-memory history.
    pub max_profiles: usize,
    /// Optional output directory. Not read by the code in this file.
    pub output_dir: Option<String>,
}

impl Default for ProfilingConfig {
    /// Builds a configuration from the process environment.
    ///
    /// | Variable                        | Field                 | Fallback |
    /// |---------------------------------|-----------------------|----------|
    /// | `RS3GW_PROFILING_CPU`           | `enable_cpu`          | `false`  |
    /// | `RS3GW_PROFILING_MEMORY`        | `enable_memory`       | `false`  |
    /// | `RS3GW_PROFILING_IO`            | `enable_io`           | `false`  |
    /// | `RS3GW_PROFILING_CPU_RATE`      | `cpu_sample_rate`     | `100`    |
    /// | `RS3GW_PROFILING_MEMORY_RATE`   | `memory_sample_rate`  | `1000`   |
    /// | `RS3GW_PROFILING_INTERVAL`      | `collection_interval` | 60 s     |
    /// | `RS3GW_PROFILING_MAX_PROFILES`  | `max_profiles`        | `24`     |
    /// | `RS3GW_PROFILING_OUTPUT_DIR`    | `output_dir`          | `None`   |
    ///
    /// Boolean flags are enabled only by the exact values `"true"` or `"1"`.
    /// Unset or unparsable numeric values silently use the fallback.
    fn default() -> Self {
        /// `true` only when `name` is set to exactly `"true"` or `"1"`.
        fn env_flag(name: &str) -> bool {
            std::env::var(name)
                .map(|value| value == "true" || value == "1")
                .unwrap_or(false)
        }

        /// Parses `name` into `T`; `None` when the variable is unset or malformed.
        fn env_parsed<T: std::str::FromStr>(name: &str) -> Option<T> {
            std::env::var(name).ok().and_then(|raw| raw.parse().ok())
        }

        let collection_interval = env_parsed::<u64>("RS3GW_PROFILING_INTERVAL")
            // A zero period makes `tokio::time::interval` panic, so treat `0`
            // like any other invalid value and use the fallback instead.
            .filter(|&secs| secs > 0)
            .map(Duration::from_secs)
            .unwrap_or(Duration::from_secs(60));

        Self {
            enable_cpu: env_flag("RS3GW_PROFILING_CPU"),
            enable_memory: env_flag("RS3GW_PROFILING_MEMORY"),
            enable_io: env_flag("RS3GW_PROFILING_IO"),
            cpu_sample_rate: env_parsed("RS3GW_PROFILING_CPU_RATE").unwrap_or(100),
            memory_sample_rate: env_parsed("RS3GW_PROFILING_MEMORY_RATE").unwrap_or(1000),
            collection_interval,
            max_profiles: env_parsed("RS3GW_PROFILING_MAX_PROFILES").unwrap_or(24),
            output_dir: std::env::var("RS3GW_PROFILING_OUTPUT_DIR").ok(),
        }
    }
}
/// Most recent CPU accounting for the current process.
///
/// All fields start at zero via [`Default`].
#[derive(Debug, Clone, Default)]
pub struct CpuStats {
    /// User plus system CPU time, in nanoseconds.
    pub total_cpu_time_ns: u64,
    /// CPU time spent in user mode, in nanoseconds.
    pub user_time_ns: u64,
    /// CPU time spent in kernel mode, in nanoseconds.
    pub system_time_ns: u64,
    /// Context-switch counter. Never written by the code in this file.
    pub context_switches: u64,
    /// CPU time consumed during the last sampling window, expressed as a
    /// percentage of that window's wall-clock length.
    pub cpu_usage_percent: f64,
}
/// Most recent memory accounting for the current process.
///
/// All fields start at zero via [`Default`].
#[derive(Debug, Clone, Default)]
pub struct MemoryStats {
    /// Bytes considered allocated. The sampler in this file sets it to the
    /// same value as `rss_bytes`.
    pub allocated_bytes: u64,
    /// Largest `allocated_bytes` value observed so far.
    pub peak_allocated_bytes: u64,
    /// Allocation counter. Never written by the code in this file.
    pub total_allocations: u64,
    /// Deallocation counter. Never written by the code in this file.
    pub total_deallocations: u64,
    /// Resident set size, in bytes.
    pub rss_bytes: u64,
    /// Virtual memory size, in bytes.
    pub virtual_bytes: u64,
}
/// Most recent I/O accounting for the current process.
///
/// All fields start at zero via [`Default`].
#[derive(Debug, Clone, Default)]
pub struct IoStats {
    /// Total bytes read, taken from `read_bytes` in `/proc/self/io`.
    pub bytes_read: u64,
    /// Total bytes written, taken from `write_bytes` in `/proc/self/io`.
    pub bytes_written: u64,
    /// Number of read operations reported through `Profiler::record_read`.
    pub read_ops: u64,
    /// Number of write operations reported through `Profiler::record_write`.
    pub write_ops: u64,
    /// Sum of all reported read latencies, in microseconds.
    pub read_latency_us: u64,
    /// Sum of all reported write latencies, in microseconds.
    pub write_latency_us: u64,
}
/// A copy of every statistics group, stamped with the wall-clock time at which
/// the copy was taken.
#[derive(Debug, Clone)]
pub struct ProfileSnapshot {
    /// Wall-clock time at which the snapshot was assembled.
    pub timestamp: SystemTime,
    /// CPU statistics at snapshot time.
    pub cpu: CpuStats,
    /// Memory statistics at snapshot time.
    pub memory: MemoryStats,
    /// I/O statistics at snapshot time.
    pub io: IoStats,
}
/// Continuous profiler that periodically samples process statistics and keeps
/// a bounded history of snapshots.
///
/// Every piece of mutable state sits behind an `Arc` so it can be updated
/// through `&self`.
pub struct Profiler {
    /// Settings fixed at construction time.
    config: ProfilingConfig,
    /// Latest CPU statistics.
    cpu_stats: Arc<RwLock<CpuStats>>,
    /// Latest memory statistics.
    memory_stats: Arc<RwLock<MemoryStats>>,
    /// Latest I/O statistics.
    io_stats: Arc<RwLock<IoStats>>,
    /// Snapshot history, oldest first.
    snapshots: Arc<RwLock<Vec<ProfileSnapshot>>>,
    /// Total CPU time (ns) seen at the previous sample.
    // Only read inside the Linux-only CPU sampling path, so other targets
    // would otherwise warn that the field is never read.
    #[allow(dead_code)]
    last_cpu_time: Arc<AtomicU64>,
    /// Instant of the previous snapshot, used to measure the sampling window.
    last_measurement: Arc<RwLock<Instant>>,
}
impl Profiler {
    /// Creates a profiler with zeroed statistics and an empty snapshot history.
    ///
    /// Nothing is sampled until [`Profiler::start`] is called.
    pub fn new(config: ProfilingConfig) -> Self {
        Self {
            config,
            cpu_stats: Arc::new(RwLock::new(CpuStats::default())),
            memory_stats: Arc::new(RwLock::new(MemoryStats::default())),
            io_stats: Arc::new(RwLock::new(IoStats::default())),
            snapshots: Arc::new(RwLock::new(Vec::new())),
            last_cpu_time: Arc::new(AtomicU64::new(0)),
            last_measurement: Arc::new(RwLock::new(Instant::now())),
        }
    }

    /// Spawns the background sampling task.
    ///
    /// The task is detached: its join handle is dropped, so it keeps running
    /// until the runtime shuts down (or ends at once when every collector is
    /// disabled).
    ///
    /// # Panics
    ///
    /// `tokio::spawn` panics when called outside of a Tokio runtime.
    pub fn start(self: Arc<Self>) {
        // `self` is already an owned `Arc`; move it into the task instead of
        // cloning it first.
        tokio::spawn(async move {
            self.run_profiling_loop().await;
        });
    }

    /// Collects one snapshot per `collection_interval` tick, forever.
    ///
    /// Returns immediately when every collector is disabled: the configuration
    /// cannot change after construction, so ticking would never do any work.
    async fn run_profiling_loop(&self) {
        let any_enabled =
            self.config.enable_cpu || self.config.enable_memory || self.config.enable_io;
        if !any_enabled {
            tracing::debug!("All profilers disabled; sampling loop not started");
            return;
        }

        // `tokio::time::interval` panics on a zero period, which a hand-built
        // config (or `RS3GW_PROFILING_INTERVAL=0`) can produce.
        let period = if self.config.collection_interval.is_zero() {
            tracing::warn!("Profiling collection interval is zero; falling back to 60s");
            Duration::from_secs(60)
        } else {
            self.config.collection_interval
        };

        let mut ticker = interval(period);
        loop {
            ticker.tick().await;
            self.collect_snapshot();
        }
    }

    /// Refreshes every enabled statistics group and appends a snapshot to the
    /// history, dropping the oldest entries beyond `max_profiles`.
    ///
    /// Poisoned locks are skipped rather than propagated: profiling is
    /// best-effort and must not take the process down.
    fn collect_snapshot(&self) {
        let now = Instant::now();
        // Length of the sampling window; assume one second if the lock is poisoned.
        let elapsed = self
            .last_measurement
            .read()
            .map(|last| now.duration_since(*last))
            .unwrap_or(Duration::from_secs(1));

        if self.config.enable_cpu {
            self.update_cpu_stats(elapsed);
        }
        if self.config.enable_memory {
            self.update_memory_stats();
        }
        if self.config.enable_io {
            self.update_io_stats();
        }

        let snapshot = self.get_current_stats();
        if let Ok(mut history) = self.snapshots.write() {
            history.push(snapshot);
            let excess = history.len().saturating_sub(self.config.max_profiles);
            if excess > 0 {
                history.drain(..excess);
            }
        }
        if let Ok(mut last) = self.last_measurement.write() {
            *last = now;
        }
        tracing::debug!("Profile snapshot collected");
    }

    /// Updates CPU statistics from `/proc/self/stat` (Linux only; a no-op on
    /// other targets).
    ///
    /// `elapsed` is the wall-clock length of the sampling window and is the
    /// denominator of `cpu_usage_percent`.
    fn update_cpu_stats(&self, elapsed: Duration) {
        #[cfg(target_os = "linux")]
        {
            // NOTE(review): assumes USER_HZ == 100, the usual Linux value. The
            // real value comes from sysconf(_SC_CLK_TCK), which is not
            // reachable from the standard library.
            const CLOCK_TICKS_PER_SEC: u64 = 100;
            const NS_PER_TICK: u64 = 1_000_000_000 / CLOCK_TICKS_PER_SEC;

            let stat = match std::fs::read_to_string("/proc/self/stat") {
                Ok(contents) => contents,
                Err(_) => return,
            };
            // Field 2 (`comm`) is wrapped in parentheses and may itself contain
            // spaces or ')', so counting whitespace-separated fields from the
            // start of the line is unreliable. Resume after the last ')'.
            let after_comm = match stat.rfind(')') {
                Some(close) => &stat[close + 1..],
                None => return,
            };
            // After `comm`, index 0 is field 3 (`state`); `utime` (field 14)
            // and `stime` (field 15) are therefore indices 11 and 12.
            let mut fields = after_comm.split_whitespace();
            let (utime_raw, stime_raw) = match (fields.nth(11), fields.next()) {
                (Some(utime), Some(stime)) => (utime, stime),
                _ => return,
            };
            let user_ns = utime_raw
                .parse::<u64>()
                .unwrap_or(0)
                .saturating_mul(NS_PER_TICK);
            let system_ns = stime_raw
                .parse::<u64>()
                .unwrap_or(0)
                .saturating_mul(NS_PER_TICK);
            let total_ns = user_ns.saturating_add(system_ns);

            let previous_ns = self
                .last_cpu_time
                .swap(total_ns, std::sync::atomic::Ordering::Relaxed);
            let cpu_delta = total_ns.saturating_sub(previous_ns);
            // A `u64` of nanoseconds covers centuries; truncation is not a concern.
            let elapsed_ns = elapsed.as_nanos() as u64;
            // `previous_ns == 0` means no baseline yet: the delta would span the
            // whole process lifetime while `elapsed` only covers the time since
            // `new()`, giving a wildly inflated percentage.
            let cpu_usage = if previous_ns == 0 || elapsed_ns == 0 {
                0.0
            } else {
                (cpu_delta as f64 / elapsed_ns as f64) * 100.0
            };

            if let Ok(mut stats) = self.cpu_stats.write() {
                stats.total_cpu_time_ns = total_ns;
                stats.user_time_ns = user_ns;
                stats.system_time_ns = system_ns;
                stats.cpu_usage_percent = cpu_usage;
            }
        }
        #[cfg(not(target_os = "linux"))]
        {
            // Keeps the parameter "used" on targets without /proc.
            let _ = elapsed;
        }
    }

    /// Updates memory statistics from `/proc/self/statm` (Linux only; a no-op
    /// on other targets).
    fn update_memory_stats(&self) {
        #[cfg(target_os = "linux")]
        {
            // NOTE(review): assumes 4 KiB pages. The real value comes from
            // sysconf(_SC_PAGESIZE), which is not reachable from the standard
            // library.
            const PAGE_SIZE: u64 = 4096;

            let statm = match std::fs::read_to_string("/proc/self/statm") {
                Ok(contents) => contents,
                Err(_) => return,
            };
            // The first two fields are total program size and resident set
            // size, both in pages.
            let mut fields = statm.split_whitespace();
            let (virtual_raw, rss_raw) = match (fields.next(), fields.next()) {
                (Some(size), Some(resident)) => (size, resident),
                _ => return,
            };
            let virtual_bytes = virtual_raw
                .parse::<u64>()
                .unwrap_or(0)
                .saturating_mul(PAGE_SIZE);
            let rss_bytes = rss_raw
                .parse::<u64>()
                .unwrap_or(0)
                .saturating_mul(PAGE_SIZE);

            if let Ok(mut stats) = self.memory_stats.write() {
                stats.virtual_bytes = virtual_bytes;
                stats.rss_bytes = rss_bytes;
                // No allocator hook is available here, so RSS stands in for
                // the allocated byte count.
                stats.allocated_bytes = rss_bytes;
                stats.peak_allocated_bytes = stats.peak_allocated_bytes.max(rss_bytes);
            }
        }
    }

    /// Updates byte counters from `/proc/self/io` (Linux only; a no-op on
    /// other targets).
    ///
    /// Only `bytes_read` and `bytes_written` are overwritten; operation counts
    /// and latencies recorded through `record_read` / `record_write` are left
    /// untouched.
    fn update_io_stats(&self) {
        #[cfg(target_os = "linux")]
        {
            let io_stat = match std::fs::read_to_string("/proc/self/io") {
                Ok(contents) => contents,
                Err(_) => return,
            };
            let mut read_bytes = 0u64;
            let mut write_bytes = 0u64;
            // Each line has the form `key: value`.
            for line in io_stat.lines() {
                if let Some((key, raw_value)) = line.split_once(':') {
                    let value = raw_value.trim().parse::<u64>().unwrap_or(0);
                    match key.trim() {
                        "read_bytes" => read_bytes = value,
                        "write_bytes" => write_bytes = value,
                        _ => {}
                    }
                }
            }
            if let Ok(mut stats) = self.io_stats.write() {
                stats.bytes_read = read_bytes;
                stats.bytes_written = write_bytes;
            }
        }
    }

    /// Records one completed read operation and its latency in microseconds.
    ///
    /// `bytes` is accepted for API symmetry but currently ignored: byte totals
    /// are taken from `/proc/self/io` by the sampler.
    pub fn record_read(&self, bytes: u64, latency_us: u64) {
        let _ = bytes;
        if let Ok(mut stats) = self.io_stats.write() {
            // Saturate so a long-running process can never panic or wrap here.
            stats.read_ops = stats.read_ops.saturating_add(1);
            stats.read_latency_us = stats.read_latency_us.saturating_add(latency_us);
        }
    }

    /// Records one completed write operation and its latency in microseconds.
    ///
    /// `bytes` is accepted for API symmetry but currently ignored: byte totals
    /// are taken from `/proc/self/io` by the sampler.
    pub fn record_write(&self, bytes: u64, latency_us: u64) {
        let _ = bytes;
        if let Ok(mut stats) = self.io_stats.write() {
            // Saturate so a long-running process can never panic or wrap here.
            stats.write_ops = stats.write_ops.saturating_add(1);
            stats.write_latency_us = stats.write_latency_us.saturating_add(latency_us);
        }
    }

    /// Returns a copy of the snapshot history, oldest first.
    ///
    /// Yields an empty vector if the history lock is poisoned.
    pub fn get_snapshots(&self) -> Vec<ProfileSnapshot> {
        Self::read_or_default(&self.snapshots)
    }

    /// Returns the latest statistics, stamped with the current time, without
    /// triggering a new sample.
    ///
    /// A group whose lock is poisoned is reported as all zeros.
    pub fn get_current_stats(&self) -> ProfileSnapshot {
        ProfileSnapshot {
            timestamp: SystemTime::now(),
            cpu: Self::read_or_default(&self.cpu_stats),
            memory: Self::read_or_default(&self.memory_stats),
            io: Self::read_or_default(&self.io_stats),
        }
    }

    /// Renders the snapshot history as a plain-text report.
    ///
    /// Despite the name, the output is a line-oriented text format (one CPU,
    /// one memory and one I/O section), not the binary pprof protobuf format.
    ///
    /// # Errors
    ///
    /// The current implementation never returns `Err`; the `Result` is part of
    /// the existing signature.
    pub fn export_pprof(&self) -> Result<String, std::io::Error> {
        let snapshots = self.get_snapshots();
        // Seconds since the Unix epoch; a clock set before the epoch prints as 0.
        let epoch_secs = |snapshot: &ProfileSnapshot| {
            snapshot
                .timestamp
                .duration_since(UNIX_EPOCH)
                .unwrap_or_default()
                .as_secs()
        };

        let mut output = String::new();
        output.push_str("# Continuous Profiling Data\n");
        output.push_str(&format!("# Generated at: {:?}\n", SystemTime::now()));
        output.push_str(&format!("# Total snapshots: {}\n", snapshots.len()));
        output.push('\n');

        output.push_str("## CPU Statistics\n");
        for snapshot in &snapshots {
            output.push_str(&format!(
                "{} cpu_usage={:.2}% user={}ns system={}ns\n",
                epoch_secs(snapshot),
                snapshot.cpu.cpu_usage_percent,
                snapshot.cpu.user_time_ns,
                snapshot.cpu.system_time_ns
            ));
        }

        output.push_str("\n## Memory Statistics\n");
        for snapshot in &snapshots {
            output.push_str(&format!(
                "{} rss={}MB virtual={}MB peak={}MB\n",
                epoch_secs(snapshot),
                snapshot.memory.rss_bytes / 1_048_576,
                snapshot.memory.virtual_bytes / 1_048_576,
                snapshot.memory.peak_allocated_bytes / 1_048_576
            ));
        }

        output.push_str("\n## I/O Statistics\n");
        for snapshot in &snapshots {
            output.push_str(&format!(
                "{} read={}MB write={}MB read_ops={} write_ops={}\n",
                epoch_secs(snapshot),
                snapshot.io.bytes_read / 1_048_576,
                snapshot.io.bytes_written / 1_048_576,
                snapshot.io.read_ops,
                snapshot.io.write_ops
            ));
        }

        Ok(output)
    }

    /// Clones the value behind `lock`, or returns `T::default()` when the lock
    /// is poisoned.
    fn read_or_default<T: Clone + Default>(lock: &RwLock<T>) -> T {
        lock.read()
            .map(|guard| T::clone(&guard))
            .unwrap_or_default()
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Numeric settings take their built-in fallbacks when the matching
    /// environment variables are not set.
    #[test]
    fn test_profiling_config_default() {
        let ProfilingConfig {
            cpu_sample_rate,
            memory_sample_rate,
            max_profiles,
            ..
        } = ProfilingConfig::default();

        assert_eq!(cpu_sample_rate, 100);
        assert_eq!(memory_sample_rate, 1000);
        assert_eq!(max_profiles, 24);
    }

    /// A freshly built profiler can report its current stats with a sane
    /// timestamp.
    #[test]
    fn test_profiler_creation() {
        let profiler = Profiler::new(ProfilingConfig::default());

        let current = profiler.get_current_stats();
        let since_epoch = current.timestamp.duration_since(UNIX_EPOCH);
        assert!(since_epoch.is_ok());
    }

    /// `record_read` / `record_write` bump the operation counters and add up
    /// the latencies.
    #[test]
    fn test_io_recording() {
        let profiler = Profiler::new(ProfilingConfig::default());

        profiler.record_read(1024, 100);
        profiler.record_write(2048, 200);

        let guard = profiler.io_stats.read().ok();
        assert!(guard.is_some());
        let io = guard.expect("Failed to get I/O stats");
        assert_eq!(io.read_ops, 1);
        assert_eq!(io.write_ops, 1);
        assert_eq!(io.read_latency_us, 100);
        assert_eq!(io.write_latency_us, 200);
    }

    /// The text export always contains its header and all three section titles.
    #[test]
    fn test_pprof_export() {
        let profiler = Profiler::new(ProfilingConfig::default());

        let exported = profiler.export_pprof();
        assert!(exported.is_ok());
        let report = exported.expect("Failed to export pprof");

        assert!(report.contains("# Continuous Profiling Data"));
        assert!(report.contains("## CPU Statistics"));
        assert!(report.contains("## Memory Statistics"));
        assert!(report.contains("## I/O Statistics"));
    }
}