use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use std::time::{Duration, Instant};
use parking_lot::RwLock;
use super::config::ProfilerConfig;
/// Lock-free latency histogram with caller-supplied bucket upper
/// bounds, in microseconds.
///
/// `counts` holds `buckets.len() + 1` slots: one per upper bound plus
/// a trailing overflow slot for samples above the largest bound.
#[derive(Debug)]
pub struct LatencyHistogram {
    /// Bucket upper bounds (µs), expected sorted ascending.
    buckets: Vec<u64>,
    /// Per-bucket sample counts; the last slot is the overflow bucket.
    counts: Vec<AtomicU64>,
    /// Total number of recorded samples.
    total_count: AtomicU64,
    /// Sum of all recorded latencies, for the average.
    total_sum: AtomicU64,
    /// Smallest recorded latency; `u64::MAX` means "no samples yet".
    min: AtomicU64,
    /// Largest recorded latency.
    max: AtomicU64,
}

impl LatencyHistogram {
    /// Creates a histogram with the given bucket upper bounds (µs).
    pub fn new(buckets: Vec<u64>) -> Self {
        // One counter per bucket plus one overflow slot.
        let counts = (0..=buckets.len()).map(|_| AtomicU64::new(0)).collect();
        Self {
            buckets,
            counts,
            total_count: AtomicU64::new(0),
            total_sum: AtomicU64::new(0),
            min: AtomicU64::new(u64::MAX),
            max: AtomicU64::new(0),
        }
    }

    /// Creates a histogram with default bounds spanning 50µs–100ms.
    pub fn with_defaults() -> Self {
        Self::new(vec![
            50, 100, 250, 500, 1_000, 2_500, 5_000, 10_000, 25_000, 50_000, 100_000,
        ])
    }

    /// Records one latency sample (microseconds).
    pub fn record(&self, latency_us: u64) {
        // First bucket whose bound covers the sample; overflow slot
        // when none does (also the case when `buckets` is empty).
        let bucket_index = self
            .buckets
            .iter()
            .position(|&b| latency_us <= b)
            .unwrap_or(self.buckets.len());
        self.counts[bucket_index].fetch_add(1, Ordering::Relaxed);
        self.total_count.fetch_add(1, Ordering::Relaxed);
        self.total_sum.fetch_add(latency_us, Ordering::Relaxed);
        // fetch_min/fetch_max are single atomic read-modify-write ops
        // with exactly the semantics of the manual CAS loops they
        // replace.
        self.min.fetch_min(latency_us, Ordering::Relaxed);
        self.max.fetch_max(latency_us, Ordering::Relaxed);
    }

    /// Records a `Duration` sample, truncated to whole microseconds.
    pub fn record_duration(&self, duration: Duration) {
        self.record(duration.as_micros() as u64);
    }

    /// Total number of recorded samples.
    pub fn count(&self) -> u64 {
        self.total_count.load(Ordering::Relaxed)
    }

    /// Mean recorded latency in µs; 0.0 when no samples exist.
    pub fn average(&self) -> f64 {
        let count = self.count();
        if count == 0 {
            return 0.0;
        }
        self.total_sum.load(Ordering::Relaxed) as f64 / count as f64
    }

    /// Smallest recorded latency, or `None` before any sample.
    pub fn min(&self) -> Option<u64> {
        let min = self.min.load(Ordering::Relaxed);
        if min == u64::MAX {
            None
        } else {
            Some(min)
        }
    }

    /// Largest recorded latency, or `None` before any sample.
    pub fn max(&self) -> Option<u64> {
        let max = self.max.load(Ordering::Relaxed);
        // max == 0 is a legitimate value once samples exist, so the
        // empty check also consults the sample count.
        if max == 0 && self.count() == 0 {
            None
        } else {
            Some(max)
        }
    }

    /// Approximate percentile (e.g. `p = 99.0`): returns the upper
    /// bound of the bucket containing the target rank, clamped to the
    /// observed maximum for the overflow bucket. `None` when empty.
    pub fn percentile(&self, p: f64) -> Option<u64> {
        let count = self.count();
        if count == 0 {
            return None;
        }
        let target = ((p / 100.0) * count as f64).ceil() as u64;
        let mut cumulative = 0u64;
        for (i, bucket_count) in self.counts.iter().enumerate() {
            cumulative += bucket_count.load(Ordering::Relaxed);
            if cumulative >= target {
                if i < self.buckets.len() {
                    return Some(self.buckets[i]);
                } else {
                    // Target rank falls in the overflow bucket.
                    return self.max();
                }
            }
        }
        self.max()
    }

    /// Median latency estimate.
    pub fn p50(&self) -> Option<u64> {
        self.percentile(50.0)
    }

    /// 90th-percentile latency estimate.
    pub fn p90(&self) -> Option<u64> {
        self.percentile(90.0)
    }

    /// 95th-percentile latency estimate.
    pub fn p95(&self) -> Option<u64> {
        self.percentile(95.0)
    }

    /// 99th-percentile latency estimate.
    pub fn p99(&self) -> Option<u64> {
        self.percentile(99.0)
    }

    /// Returns `(label, count)` for every bucket, overflow included.
    pub fn bucket_counts(&self) -> Vec<(String, u64)> {
        let mut result = Vec::with_capacity(self.counts.len());
        for (i, count) in self.counts.iter().enumerate() {
            // `i < buckets.len()` already covers i == 0; the old
            // dedicated `i == 0` arm indexed `buckets[0]` and panicked
            // when the bucket list was empty.
            let label = if i < self.buckets.len() {
                format!("≤{}µs", self.buckets[i])
            } else {
                format!(">{}µs", self.buckets.last().unwrap_or(&0))
            };
            result.push((label, count.load(Ordering::Relaxed)));
        }
        result
    }

    /// Clears all counters back to the freshly-constructed state.
    pub fn reset(&self) {
        for count in &self.counts {
            count.store(0, Ordering::Relaxed);
        }
        self.total_count.store(0, Ordering::Relaxed);
        self.total_sum.store(0, Ordering::Relaxed);
        self.min.store(u64::MAX, Ordering::Relaxed);
        self.max.store(0, Ordering::Relaxed);
    }

    /// Captures a point-in-time summary of the histogram.
    pub fn snapshot(&self) -> HistogramSnapshot {
        HistogramSnapshot {
            count: self.count(),
            average_us: self.average(),
            min_us: self.min(),
            max_us: self.max(),
            p50_us: self.p50(),
            p90_us: self.p90(),
            p95_us: self.p95(),
            p99_us: self.p99(),
            buckets: self.bucket_counts(),
        }
    }
}

/// A frozen summary of a [`LatencyHistogram`] at one point in time.
#[derive(Debug, Clone)]
pub struct HistogramSnapshot {
    /// Total samples recorded.
    pub count: u64,
    /// Mean latency in µs (0.0 when empty).
    pub average_us: f64,
    /// Smallest sample, if any.
    pub min_us: Option<u64>,
    /// Largest sample, if any.
    pub max_us: Option<u64>,
    /// Approximate median.
    pub p50_us: Option<u64>,
    /// Approximate 90th percentile.
    pub p90_us: Option<u64>,
    /// Approximate 95th percentile.
    pub p95_us: Option<u64>,
    /// Approximate 99th percentile.
    pub p99_us: Option<u64>,
    /// `(label, count)` per bucket, overflow last.
    pub buckets: Vec<(String, u64)>,
}
/// Sliding-window throughput counter: one slot per second over the
/// most recent `window_secs` seconds, plus a lifetime total.
pub struct ThroughputCounter {
    // Window width in seconds (one slot each).
    window_secs: usize,
    // Per-second event counts, indexed by `current_slot % window_secs`.
    slots: Vec<AtomicU64>,
    // Monotonically increasing logical slot index (never wrapped
    // back; consumers take it modulo `window_secs`).
    current_slot: AtomicUsize,
    // When the slots were last rotated; write-locked while rotating.
    last_update: RwLock<Instant>,
    // Lifetime event count; unaffected by window rotation.
    total: AtomicU64,
}
impl ThroughputCounter {
pub fn new(window_secs: usize) -> Self {
let slots = (0..window_secs).map(|_| AtomicU64::new(0)).collect();
Self {
window_secs,
slots,
current_slot: AtomicUsize::new(0),
last_update: RwLock::new(Instant::now()),
total: AtomicU64::new(0),
}
}
pub fn with_default_window() -> Self {
Self::new(60)
}
pub fn increment(&self) {
self.increment_by(1);
}
pub fn increment_by(&self, count: u64) {
self.advance_slots();
let slot = self.current_slot.load(Ordering::Relaxed) % self.window_secs;
self.slots[slot].fetch_add(count, Ordering::Relaxed);
self.total.fetch_add(count, Ordering::Relaxed);
}
fn advance_slots(&self) {
let mut last_update = self.last_update.write();
let elapsed = last_update.elapsed();
let slots_to_advance = elapsed.as_secs() as usize;
if slots_to_advance > 0 {
*last_update = Instant::now();
let current = self.current_slot.load(Ordering::Relaxed);
let new_slot = current + slots_to_advance;
for i in 1..=slots_to_advance.min(self.window_secs) {
let slot_index = (current + i) % self.window_secs;
self.slots[slot_index].store(0, Ordering::Relaxed);
}
self.current_slot.store(new_slot, Ordering::Relaxed);
}
}
pub fn throughput(&self) -> f64 {
self.advance_slots();
let total: u64 = self.slots.iter().map(|s| s.load(Ordering::Relaxed)).sum();
total as f64 / self.window_secs as f64
}
pub fn total(&self) -> u64 {
self.total.load(Ordering::Relaxed)
}
pub fn reset(&self) {
for slot in &self.slots {
slot.store(0, Ordering::Relaxed);
}
self.total.store(0, Ordering::Relaxed);
*self.last_update.write() = Instant::now();
}
}
/// Point-in-time resource gauges with high-water marks.
#[derive(Debug, Clone, Default)]
pub struct ResourceUsage {
    /// Current memory usage in bytes.
    pub memory_bytes: u64,
    /// Largest `memory_bytes` value observed so far.
    pub peak_memory_bytes: u64,
    /// Current open-connection count.
    pub connections: usize,
    /// Largest `connections` value observed so far.
    pub peak_connections: usize,
    /// CPU utilization if tracked; `None` when not collected.
    pub cpu_utilization: Option<f64>,
}
/// A point-in-time view of all profiler metrics.
#[derive(Debug, Clone)]
pub struct ProfileSnapshot {
    /// When the snapshot was taken.
    pub timestamp: Instant,
    /// Latency histogram summary at snapshot time.
    pub latency: HistogramSnapshot,
    /// Windowed throughput in requests per second.
    pub throughput: f64,
    /// Lifetime request count.
    pub total_requests: u64,
    /// Memory/connection gauges at snapshot time.
    pub resources: ResourceUsage,
    /// Time since the profiler was created.
    pub uptime: Duration,
}
/// A snapshot combined with derived health analysis and suggestions.
#[derive(Debug, Clone)]
pub struct ProfileReport {
    /// When the report was generated.
    pub generated_at: Instant,
    /// The snapshot the analysis was computed from.
    pub current: ProfileSnapshot,
    /// Pass/fail checks plus the aggregate health score.
    pub summary: PerformanceSummary,
    /// Human-readable tuning suggestions for failing checks.
    pub recommendations: Vec<String>,
}
/// Boolean health checks plus an aggregate score.
#[derive(Debug, Clone)]
pub struct PerformanceSummary {
    /// p99 latency within target (or no latency data yet).
    pub latency_ok: bool,
    /// Throughput within target (or too little traffic to judge).
    pub throughput_ok: bool,
    /// Memory usage within target.
    pub memory_ok: bool,
    /// 1.0 = fully healthy; fixed deductions per failing check,
    /// floored at 0.0.
    pub health_score: f64,
}
/// Aggregates latency, throughput, and resource metrics for the
/// process, with configurable 1-in-N sampling for latency recording.
pub struct PerformanceProfiler {
    // Sampling rate, histogram buckets, and report interval.
    config: ProfilerConfig,
    // Sampled latency distribution.
    latency: LatencyHistogram,
    // Sliding-window request counter; counts every request,
    // regardless of whether the latency sample was kept.
    throughput: ThroughputCounter,
    // Current memory usage gauge, in bytes.
    memory_bytes: AtomicU64,
    // High-water mark of `memory_bytes`.
    peak_memory_bytes: AtomicU64,
    // Current open-connection gauge.
    connections: AtomicUsize,
    // High-water mark of `connections`.
    peak_connections: AtomicUsize,
    // Monotonic counter driving 1-in-N latency sampling.
    sample_counter: AtomicU64,
    // Construction time, for uptime().
    created_at: Instant,
    // Last time mark_reported() was called.
    last_report: RwLock<Instant>,
}
impl PerformanceProfiler {
    /// Creates a profiler driven by `config` (histogram buckets,
    /// sample rate, report interval).
    pub fn new(config: ProfilerConfig) -> Self {
        let latency = LatencyHistogram::new(config.histogram_buckets.clone());
        let throughput = ThroughputCounter::with_default_window();
        Self {
            config,
            latency,
            throughput,
            memory_bytes: AtomicU64::new(0),
            peak_memory_bytes: AtomicU64::new(0),
            connections: AtomicUsize::new(0),
            peak_connections: AtomicUsize::new(0),
            sample_counter: AtomicU64::new(0),
            created_at: Instant::now(),
            last_report: RwLock::new(Instant::now()),
        }
    }

    /// Creates a profiler with the default configuration.
    pub fn with_defaults() -> Self {
        Self::new(ProfilerConfig::default())
    }

    /// Whether profiling is enabled.
    pub fn is_enabled(&self) -> bool {
        self.config.enabled
    }

    /// Decides whether the current observation should be recorded in
    /// the latency histogram (1-in-N sampling from `sample_rate`).
    fn should_sample(&self) -> bool {
        if !self.config.enabled {
            return false;
        }
        if self.config.sample_rate >= 1.0 {
            return true;
        }
        // Fix: non-positive rates previously panicked or misbehaved —
        // a negative rate casts `1.0 / rate` to 0, so `counter % 0`
        // divides by zero; a rate of exactly 0.0 still sampled the
        // first call. Treat anything <= 0 as "never sample".
        if self.config.sample_rate <= 0.0 {
            return false;
        }
        let counter = self.sample_counter.fetch_add(1, Ordering::Relaxed);
        // 0 < rate < 1 here, so the threshold is at least 1; max(1)
        // is a belt-and-braces guard against float edge cases.
        let threshold = (1.0 / self.config.sample_rate) as u64;
        counter % threshold.max(1) == 0
    }

    /// Records one request latency. The histogram only sees it when
    /// sampled; throughput counts every request.
    pub fn record_latency(&self, duration: Duration) {
        if self.should_sample() {
            self.latency.record_duration(duration);
        }
        self.throughput.increment();
    }

    /// Same as `record_latency`, taking microseconds directly.
    pub fn record_latency_us(&self, latency_us: u64) {
        if self.should_sample() {
            self.latency.record(latency_us);
        }
        self.throughput.increment();
    }

    /// Records `count` requests that together took `total_latency`;
    /// the histogram receives the per-request average.
    pub fn record_batch(&self, count: u64, total_latency: Duration) {
        if self.should_sample() {
            // max(1) guards the division when count == 0.
            let avg_latency = total_latency.as_micros() as u64 / count.max(1);
            self.latency.record(avg_latency);
        }
        self.throughput.increment_by(count);
    }

    /// Updates the memory gauge and its high-water mark.
    pub fn set_memory(&self, bytes: u64) {
        self.memory_bytes.store(bytes, Ordering::Relaxed);
        // fetch_max is the atomic equivalent of the old manual CAS
        // loop: the peak only ever moves upward.
        self.peak_memory_bytes.fetch_max(bytes, Ordering::Relaxed);
    }

    /// Updates the connection gauge and its high-water mark.
    pub fn set_connections(&self, count: usize) {
        self.connections.store(count, Ordering::Relaxed);
        self.peak_connections.fetch_max(count, Ordering::Relaxed);
    }

    /// The sampled latency histogram.
    pub fn latency(&self) -> &LatencyHistogram {
        &self.latency
    }

    /// Current windowed throughput in requests per second.
    pub fn throughput(&self) -> f64 {
        self.throughput.throughput()
    }

    /// Lifetime request count.
    pub fn total_requests(&self) -> u64 {
        self.throughput.total()
    }

    /// Current resource gauges. CPU utilization is not collected
    /// here, so it is always `None`.
    pub fn resource_usage(&self) -> ResourceUsage {
        ResourceUsage {
            memory_bytes: self.memory_bytes.load(Ordering::Relaxed),
            peak_memory_bytes: self.peak_memory_bytes.load(Ordering::Relaxed),
            connections: self.connections.load(Ordering::Relaxed),
            peak_connections: self.peak_connections.load(Ordering::Relaxed),
            cpu_utilization: None,
        }
    }

    /// Captures a point-in-time view of all metrics.
    pub fn snapshot(&self) -> ProfileSnapshot {
        ProfileSnapshot {
            timestamp: Instant::now(),
            latency: self.latency.snapshot(),
            throughput: self.throughput(),
            total_requests: self.total_requests(),
            resources: self.resource_usage(),
            uptime: self.uptime(),
        }
    }

    /// Builds a full report: snapshot plus analysis and suggestions.
    pub fn report(&self) -> ProfileReport {
        let current = self.snapshot();
        // Fix: these argument expressions were corrupted by an
        // encoding round-trip ("¤t" is the HTML-entity mangling of
        // "&current"); restored to plain borrows.
        let summary = self.analyze(&current);
        let recommendations = self.generate_recommendations(&current, &summary);
        ProfileReport {
            generated_at: Instant::now(),
            current,
            summary,
            recommendations,
        }
    }

    /// Scores the snapshot against fixed targets: p99 < 10ms,
    /// >= 100k req/s once traffic has ramped, < 2 GiB memory.
    fn analyze(&self, snapshot: &ProfileSnapshot) -> PerformanceSummary {
        let latency_ok = snapshot
            .latency
            .p99_us
            .map(|p99| p99 < 10_000)
            .unwrap_or(true);
        // Low-traffic profilers get a pass on the throughput target.
        let throughput_ok = snapshot.throughput >= 100_000.0 || snapshot.total_requests < 1000;
        let memory_ok = snapshot.resources.memory_bytes < 2 * 1024 * 1024 * 1024;
        let mut score: f64 = 1.0;
        if !latency_ok {
            score -= 0.3;
        }
        if !throughput_ok && snapshot.total_requests > 10000 {
            score -= 0.3;
        }
        if !memory_ok {
            score -= 0.4;
        }
        PerformanceSummary {
            latency_ok,
            throughput_ok,
            memory_ok,
            health_score: score.max(0.0),
        }
    }

    /// Turns failing checks into human-readable tuning suggestions.
    fn generate_recommendations(
        &self,
        snapshot: &ProfileSnapshot,
        summary: &PerformanceSummary,
    ) -> Vec<String> {
        let mut recommendations = Vec::new();
        if !summary.latency_ok {
            if let Some(p99) = snapshot.latency.p99_us {
                recommendations.push(format!(
                    "Latency p99 is {}µs (target: <10000µs). Consider:\n\
                     - Increasing batch size\n\
                     - Reducing handler complexity\n\
                     - Adding more shards to connection pool",
                    p99
                ));
            }
        }
        if !summary.throughput_ok && snapshot.total_requests > 10000 {
            recommendations.push(format!(
                "Throughput is {:.0} req/s (target: >100000 req/s). Consider:\n\
                 - Enabling batch processing\n\
                 - Increasing worker threads\n\
                 - Optimizing handler logic",
                snapshot.throughput
            ));
        }
        if !summary.memory_ok {
            let mb = snapshot.resources.memory_bytes / (1024 * 1024);
            recommendations.push(format!(
                "Memory usage is {}MB (target: <2048MB). Consider:\n\
                 - Using sparse register store\n\
                 - Reducing connection pool size\n\
                 - Implementing memory-mapped storage",
                mb
            ));
        }
        recommendations
    }

    /// True once `report_interval` has elapsed since the last
    /// `mark_reported` call.
    pub fn should_report(&self) -> bool {
        let last = self.last_report.read().elapsed();
        last >= self.config.report_interval
    }

    /// Resets the report-interval clock.
    pub fn mark_reported(&self) {
        *self.last_report.write() = Instant::now();
    }

    /// Time since the profiler was constructed.
    pub fn uptime(&self) -> Duration {
        self.created_at.elapsed()
    }

    /// Clears all metrics and gauges; configuration is kept.
    pub fn reset(&self) {
        self.latency.reset();
        self.throughput.reset();
        self.memory_bytes.store(0, Ordering::Relaxed);
        self.peak_memory_bytes.store(0, Ordering::Relaxed);
        self.connections.store(0, Ordering::Relaxed);
        self.peak_connections.store(0, Ordering::Relaxed);
        self.sample_counter.store(0, Ordering::Relaxed);
    }

    /// The active configuration.
    pub fn config(&self) -> &ProfilerConfig {
        &self.config
    }
}
#[cfg(test)]
mod tests {
    use super::super::config::ProfilerConfig;
    use super::*;

    // Recording a few samples updates count, average, min, and max.
    #[test]
    fn test_histogram_basic() {
        let histogram = LatencyHistogram::with_defaults();
        histogram.record(100);
        histogram.record(500);
        histogram.record(1000);
        assert_eq!(histogram.count(), 3);
        assert!(histogram.average() > 0.0);
        assert_eq!(histogram.min(), Some(100));
        assert_eq!(histogram.max(), Some(1000));
    }

    // Percentiles exist once data is recorded and are ordered
    // (p99 >= p50).
    #[test]
    fn test_histogram_percentiles() {
        let histogram = LatencyHistogram::with_defaults();
        for i in 0..100 {
            histogram.record(i * 100);
        }
        let p50 = histogram.p50();
        let p99 = histogram.p99();
        assert!(p50.is_some());
        assert!(p99.is_some());
        assert!(p99.unwrap() >= p50.unwrap());
    }

    // One sample per bucket, including the trailing overflow bucket
    // (5000 exceeds the largest bound, 1000).
    #[test]
    fn test_histogram_buckets() {
        let histogram = LatencyHistogram::new(vec![100, 500, 1000]);
        histogram.record(50);
        histogram.record(200);
        histogram.record(800);
        histogram.record(5000);
        let buckets = histogram.bucket_counts();
        assert_eq!(buckets.len(), 4);
        assert_eq!(buckets[0].1, 1);
        assert_eq!(buckets[1].1, 1);
        assert_eq!(buckets[2].1, 1);
        assert_eq!(buckets[3].1, 1);
    }

    // Events land in the current slot and both the windowed rate and
    // lifetime total reflect them.
    #[test]
    fn test_throughput_counter() {
        let counter = ThroughputCounter::new(10);
        counter.increment_by(100);
        let throughput = counter.throughput();
        assert!(throughput > 0.0);
        assert_eq!(counter.total(), 100);
    }

    // With sample_rate 1.0 every latency is recorded, and the
    // snapshot reflects all three samples.
    #[test]
    fn test_profiler_basic() {
        let config = ProfilerConfig {
            enabled: true,
            sample_rate: 1.0,
            histogram_buckets: vec![100, 500, 1000, 5000, 10000],
            report_interval: Duration::from_secs(60),
            track_memory: true,
            track_cpu: false,
        };
        let profiler = PerformanceProfiler::new(config);
        profiler.record_latency(Duration::from_micros(500));
        profiler.record_latency(Duration::from_micros(1000));
        profiler.record_latency(Duration::from_micros(2000));
        assert!(profiler.throughput() > 0.0);
        assert_eq!(profiler.total_requests(), 3);
        let snapshot = profiler.snapshot();
        assert_eq!(snapshot.latency.count, 3);
    }

    // With rate 0.5 the counter-based sampler keeps every other call
    // (deterministically 50 of 100); the bounds allow slack. The
    // throughput counter still sees all 100 requests.
    #[test]
    fn test_profiler_sampling() {
        let mut config = ProfilerConfig::default();
        config.sample_rate = 0.5;
        let profiler = PerformanceProfiler::new(config);
        for _ in 0..100 {
            profiler.record_latency(Duration::from_micros(100));
        }
        let count = profiler.latency.count();
        assert!(
            count > 30 && count < 70,
            "Expected ~50 samples, got {}",
            count
        );
        assert_eq!(profiler.total_requests(), 100);
    }

    // Gauges track current values while the peaks retain the
    // high-water marks after values drop back down.
    #[test]
    fn test_profiler_resources() {
        let profiler = PerformanceProfiler::with_defaults();
        profiler.set_memory(1024 * 1024);
        profiler.set_connections(100);
        let resources = profiler.resource_usage();
        assert_eq!(resources.memory_bytes, 1024 * 1024);
        assert_eq!(resources.connections, 100);
        profiler.set_memory(2 * 1024 * 1024);
        profiler.set_connections(200);
        profiler.set_memory(1024 * 1024);
        profiler.set_connections(100);
        let resources = profiler.resource_usage();
        assert_eq!(resources.peak_memory_bytes, 2 * 1024 * 1024);
        assert_eq!(resources.peak_connections, 200);
    }

    // 500µs latencies and modest memory keep every check green.
    #[test]
    fn test_profiler_report() {
        let profiler = PerformanceProfiler::with_defaults();
        for _ in 0..100 {
            profiler.record_latency(Duration::from_micros(500));
        }
        let report = profiler.report();
        assert!(report.summary.latency_ok);
        assert!(report.summary.memory_ok);
        assert!(report.summary.health_score > 0.0);
    }

    // reset() returns the histogram to its empty state (min is the
    // "no samples" sentinel again).
    #[test]
    fn test_histogram_reset() {
        let histogram = LatencyHistogram::with_defaults();
        histogram.record(100);
        histogram.record(200);
        assert_eq!(histogram.count(), 2);
        histogram.reset();
        assert_eq!(histogram.count(), 0);
        assert!(histogram.min().is_none());
    }

    // A disabled profiler skips the histogram entirely but still
    // counts the request toward throughput totals.
    #[test]
    fn test_profiler_disabled() {
        let mut config = ProfilerConfig::default();
        config.enabled = false;
        let profiler = PerformanceProfiler::new(config);
        assert!(!profiler.is_enabled());
        profiler.record_latency(Duration::from_micros(100));
        assert_eq!(profiler.latency.count(), 0);
        assert_eq!(profiler.total_requests(), 1);
    }
}