use std::{
sync::{
Arc,
atomic::{AtomicU32, AtomicU64, Ordering},
},
time::Duration,
};
use fraiseql_core::db::{traits::DatabaseAdapter, types::PoolMetrics};
use crate::config::pool_tuning::PoolPressureMonitorConfig;
#[derive(Debug, PartialEq, Eq)]
#[non_exhaustive]
pub enum PoolSizingRecommendation {
Stable,
RecommendScaleUp {
new_size: u32,
reason: String,
},
RecommendScaleDown {
new_size: u32,
reason: String,
},
}
pub struct PoolSizingAdvisor {
pub(crate) config: PoolPressureMonitorConfig,
high_queue_samples: AtomicU32,
low_idle_samples: AtomicU32,
adjustments_total: AtomicU64,
pub(crate) current_target: AtomicU32,
}
impl PoolSizingAdvisor {
#[must_use]
pub const fn new(config: PoolPressureMonitorConfig) -> Self {
Self {
config,
high_queue_samples: AtomicU32::new(0),
low_idle_samples: AtomicU32::new(0),
adjustments_total: AtomicU64::new(0),
current_target: AtomicU32::new(0),
}
}
pub fn evaluate(&self, metrics: &PoolMetrics) -> PoolSizingRecommendation {
let current = self.current_size(metrics);
let min = self.config.min_pool_size;
let max = self.config.max_pool_size;
if metrics.waiting_requests > self.config.target_queue_depth {
let count = self.high_queue_samples.fetch_add(1, Ordering::Relaxed) + 1;
self.low_idle_samples.store(0, Ordering::Relaxed);
if count >= self.config.samples_before_action {
let desired = (current + self.config.scale_up_step).min(max);
if desired > current {
self.high_queue_samples.store(0, Ordering::Relaxed);
self.adjustments_total.fetch_add(1, Ordering::Relaxed);
self.current_target.store(desired, Ordering::Relaxed);
return PoolSizingRecommendation::RecommendScaleUp {
new_size: desired,
reason: format!(
"{} requests waiting (threshold {}); grown by {}",
metrics.waiting_requests,
self.config.target_queue_depth,
self.config.scale_up_step,
),
};
}
self.high_queue_samples.store(0, Ordering::Relaxed);
}
return PoolSizingRecommendation::Stable;
}
self.high_queue_samples.store(0, Ordering::Relaxed);
if current > min && metrics.total_connections > 0 {
let idle_ratio =
f64::from(metrics.idle_connections) / f64::from(metrics.total_connections);
if idle_ratio > self.config.scale_down_idle_ratio && metrics.waiting_requests == 0 {
let count = self.low_idle_samples.fetch_add(1, Ordering::Relaxed) + 1;
if count >= self.config.samples_before_action {
let desired = current.saturating_sub(self.config.scale_down_step).max(min);
self.low_idle_samples.store(0, Ordering::Relaxed);
self.adjustments_total.fetch_add(1, Ordering::Relaxed);
self.current_target.store(desired, Ordering::Relaxed);
return PoolSizingRecommendation::RecommendScaleDown {
new_size: desired,
reason: format!(
"idle ratio {:.0}% > {:.0}% threshold; shrunk by {}",
idle_ratio * 100.0,
self.config.scale_down_idle_ratio * 100.0,
self.config.scale_down_step,
),
};
}
return PoolSizingRecommendation::Stable;
}
}
self.low_idle_samples.store(0, Ordering::Relaxed);
PoolSizingRecommendation::Stable
}
pub fn adjustments_total(&self) -> u64 {
self.adjustments_total.load(Ordering::Relaxed)
}
pub fn recommended_size(&self) -> u32 {
self.current_target.load(Ordering::Relaxed)
}
pub fn start<A: DatabaseAdapter + 'static>(
self: Arc<Self>,
adapter: Arc<A>,
resize_fn: Option<Arc<dyn Fn(usize) + Send + Sync>>,
) -> tokio::task::JoinHandle<()> {
let interval_ms = self.config.tuning_interval_ms;
tokio::spawn(async move {
tracing::debug!(
"Pool pressure monitoring enabled (recommendation mode). \
The pool cannot be resized at runtime; act on \
fraiseql_pool_scaling_recommended events by adjusting \
max_connections and restarting."
);
if resize_fn.is_none() {
tracing::debug!(
"No resize_fn configured — pool pressure monitor is in \
pure advisory mode. Recommendations will appear in \
fraiseql_pool_tuning_* metrics and WARN log lines."
);
}
let mut ticker = tokio::time::interval(Duration::from_millis(interval_ms.max(1)));
ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
loop {
ticker.tick().await;
let metrics = adapter.pool_metrics();
match self.evaluate(&metrics) {
PoolSizingRecommendation::Stable => {},
PoolSizingRecommendation::RecommendScaleUp {
new_size,
ref reason,
} => {
if let Some(ref f) = resize_fn {
tracing::info!(
new_size,
reason = reason.as_str(),
"Pool auto-tuner: scaling up"
);
f(new_size as usize);
} else {
tracing::warn!(
new_size,
reason = reason.as_str(),
"Pool auto-tuner recommends scaling up \
(resize not available — configure resize_fn)"
);
}
},
PoolSizingRecommendation::RecommendScaleDown {
new_size,
ref reason,
} => {
if let Some(ref f) = resize_fn {
tracing::info!(
new_size,
reason = reason.as_str(),
"Pool auto-tuner: scaling down"
);
f(new_size as usize);
} else {
tracing::warn!(
new_size,
reason = reason.as_str(),
"Pool auto-tuner recommends scaling down \
(resize not available — configure resize_fn)"
);
}
},
}
}
})
}
fn current_size(&self, metrics: &PoolMetrics) -> u32 {
let recorded = self.current_target.load(Ordering::Relaxed);
if recorded > 0 {
recorded
} else if metrics.total_connections > 0 {
metrics.total_connections
} else {
self.config.min_pool_size
}
}
}