1use std::{
8 sync::{
9 Arc,
10 atomic::{AtomicU32, AtomicU64, Ordering},
11 },
12 time::Duration,
13};
14
15use fraiseql_core::db::{traits::DatabaseAdapter, types::PoolMetrics};
16
17use crate::config::pool_tuning::PoolPressureMonitorConfig;
18
/// Outcome of a single [`PoolSizingAdvisor::evaluate`] call.
///
/// The advisor only *recommends* a size; whether the pool is actually resized
/// is decided by whoever consumes the recommendation.
#[derive(Debug, Clone, PartialEq, Eq)]
#[non_exhaustive]
pub enum PoolSizingRecommendation {
    /// No change is recommended for this sample.
    Stable,
    /// The pool should grow.
    RecommendScaleUp {
        /// Pool size the advisor recommends growing to.
        new_size: u32,
        /// Human-readable explanation of why growth is recommended.
        reason: String,
    },
    /// The pool should shrink.
    RecommendScaleDown {
        /// Pool size the advisor recommends shrinking to.
        new_size: u32,
        /// Human-readable explanation of why shrinking is recommended.
        reason: String,
    },
}
49
50pub struct PoolSizingAdvisor {
60 pub(crate) config: PoolPressureMonitorConfig,
62 high_queue_samples: AtomicU32,
64 low_idle_samples: AtomicU32,
66 adjustments_total: AtomicU64,
68 current_target: AtomicU32,
70}
71
72impl PoolSizingAdvisor {
73 pub const fn new(config: PoolPressureMonitorConfig) -> Self {
75 Self {
76 config,
77 high_queue_samples: AtomicU32::new(0),
78 low_idle_samples: AtomicU32::new(0),
79 adjustments_total: AtomicU64::new(0),
80 current_target: AtomicU32::new(0),
81 }
82 }
83
84 pub fn evaluate(&self, metrics: &PoolMetrics) -> PoolSizingRecommendation {
90 let current = self.current_size(metrics);
91 let min = self.config.min_pool_size;
92 let max = self.config.max_pool_size;
93
94 if metrics.waiting_requests > self.config.target_queue_depth {
96 let count = self.high_queue_samples.fetch_add(1, Ordering::Relaxed) + 1;
97 self.low_idle_samples.store(0, Ordering::Relaxed);
98
99 if count >= self.config.samples_before_action {
100 let desired = (current + self.config.scale_up_step).min(max);
101 if desired > current {
102 self.high_queue_samples.store(0, Ordering::Relaxed);
103 self.adjustments_total.fetch_add(1, Ordering::Relaxed);
104 self.current_target.store(desired, Ordering::Relaxed);
105 return PoolSizingRecommendation::RecommendScaleUp {
106 new_size: desired,
107 reason: format!(
108 "{} requests waiting (threshold {}); grown by {}",
109 metrics.waiting_requests,
110 self.config.target_queue_depth,
111 self.config.scale_up_step,
112 ),
113 };
114 }
115 self.high_queue_samples.store(0, Ordering::Relaxed);
117 }
118 return PoolSizingRecommendation::Stable;
119 }
120
121 self.high_queue_samples.store(0, Ordering::Relaxed);
122
123 if current > min && metrics.total_connections > 0 {
125 let idle_ratio =
126 f64::from(metrics.idle_connections) / f64::from(metrics.total_connections);
127
128 if idle_ratio > self.config.scale_down_idle_ratio && metrics.waiting_requests == 0 {
129 let count = self.low_idle_samples.fetch_add(1, Ordering::Relaxed) + 1;
130
131 if count >= self.config.samples_before_action {
132 let desired = current.saturating_sub(self.config.scale_down_step).max(min);
133 self.low_idle_samples.store(0, Ordering::Relaxed);
134 self.adjustments_total.fetch_add(1, Ordering::Relaxed);
135 self.current_target.store(desired, Ordering::Relaxed);
136 return PoolSizingRecommendation::RecommendScaleDown {
137 new_size: desired,
138 reason: format!(
139 "idle ratio {:.0}% > {:.0}% threshold; shrunk by {}",
140 idle_ratio * 100.0,
141 self.config.scale_down_idle_ratio * 100.0,
142 self.config.scale_down_step,
143 ),
144 };
145 }
146 return PoolSizingRecommendation::Stable;
147 }
148 }
149
150 self.low_idle_samples.store(0, Ordering::Relaxed);
151 PoolSizingRecommendation::Stable
152 }
153
154 pub fn adjustments_total(&self) -> u64 {
156 self.adjustments_total.load(Ordering::Relaxed)
157 }
158
159 pub fn recommended_size(&self) -> u32 {
161 self.current_target.load(Ordering::Relaxed)
162 }
163
164 pub fn start<A: DatabaseAdapter + 'static>(
175 self: Arc<Self>,
176 adapter: Arc<A>,
177 resize_fn: Option<Arc<dyn Fn(usize) + Send + Sync>>,
178 ) -> tokio::task::JoinHandle<()> {
179 let interval_ms = self.config.tuning_interval_ms;
180
181 tokio::spawn(async move {
182 tracing::debug!(
183 "Pool pressure monitoring enabled (recommendation mode). \
184 The pool cannot be resized at runtime; act on \
185 fraiseql_pool_scaling_recommended events by adjusting \
186 max_connections and restarting."
187 );
188 if resize_fn.is_none() {
189 tracing::debug!(
190 "No resize_fn configured — pool pressure monitor is in \
191 pure advisory mode. Recommendations will appear in \
192 fraiseql_pool_tuning_* metrics and WARN log lines."
193 );
194 }
195
196 let mut ticker = tokio::time::interval(Duration::from_millis(interval_ms.max(1)));
197 ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
198
199 loop {
200 ticker.tick().await;
201 let metrics = adapter.pool_metrics();
202
203 match self.evaluate(&metrics) {
204 PoolSizingRecommendation::Stable => {},
205 PoolSizingRecommendation::RecommendScaleUp {
206 new_size,
207 ref reason,
208 } => {
209 if let Some(ref f) = resize_fn {
210 tracing::info!(
211 new_size,
212 reason = reason.as_str(),
213 "Pool auto-tuner: scaling up"
214 );
215 f(new_size as usize);
216 } else {
217 tracing::warn!(
218 new_size,
219 reason = reason.as_str(),
220 "Pool auto-tuner recommends scaling up \
221 (resize not available — configure resize_fn)"
222 );
223 }
224 },
225 PoolSizingRecommendation::RecommendScaleDown {
226 new_size,
227 ref reason,
228 } => {
229 if let Some(ref f) = resize_fn {
230 tracing::info!(
231 new_size,
232 reason = reason.as_str(),
233 "Pool auto-tuner: scaling down"
234 );
235 f(new_size as usize);
236 } else {
237 tracing::warn!(
238 new_size,
239 reason = reason.as_str(),
240 "Pool auto-tuner recommends scaling down \
241 (resize not available — configure resize_fn)"
242 );
243 }
244 },
245 }
246 }
247 })
248 }
249
250 fn current_size(&self, metrics: &PoolMetrics) -> u32 {
252 let recorded = self.current_target.load(Ordering::Relaxed);
253 if recorded > 0 {
254 recorded
255 } else if metrics.total_connections > 0 {
256 metrics.total_connections
257 } else {
258 self.config.min_pool_size
259 }
260 }
261}
262
#[cfg(test)]
mod tests {
    // Lint relaxations for test code, gathered into a single attribute.
    #![allow(
        clippy::unwrap_used,
        clippy::cast_precision_loss,
        clippy::cast_sign_loss,
        clippy::cast_possible_truncation,
        clippy::cast_possible_wrap,
        clippy::missing_panics_doc,
        clippy::missing_errors_doc,
        missing_docs,
        clippy::items_after_statements
    )]

    use async_trait::async_trait;
    use fraiseql_core::db::{
        WhereClause,
        types::{DatabaseType, JsonbValue, OrderByClause},
    };
    use fraiseql_error::Result as FraiseQLResult;

    use super::*;

    /// Adapter stub whose only meaningful behaviour is returning fixed pool metrics.
    struct MockAdapter {
        /// Metrics handed back by every `pool_metrics` call.
        metrics: PoolMetrics,
    }

    impl MockAdapter {
        /// Builds a stub that always reports `metrics`.
        fn with_metrics(metrics: PoolMetrics) -> Self {
            Self { metrics }
        }
    }

    // Every query method returns an empty result; only `pool_metrics` matters here.
    #[async_trait]
    impl DatabaseAdapter for MockAdapter {
        async fn execute_where_query(
            &self,
            _view: &str,
            _where_clause: Option<&WhereClause>,
            _limit: Option<u32>,
            _offset: Option<u32>,
            _order_by: Option<&[OrderByClause]>,
        ) -> FraiseQLResult<Vec<JsonbValue>> {
            Ok(Vec::new())
        }

        async fn execute_with_projection(
            &self,
            _view: &str,
            _projection: Option<&fraiseql_core::schema::SqlProjectionHint>,
            _where_clause: Option<&WhereClause>,
            _limit: Option<u32>,
            _offset: Option<u32>,
            _order_by: Option<&[OrderByClause]>,
        ) -> FraiseQLResult<Vec<JsonbValue>> {
            Ok(Vec::new())
        }

        fn database_type(&self) -> DatabaseType {
            DatabaseType::SQLite
        }

        async fn health_check(&self) -> FraiseQLResult<()> {
            Ok(())
        }

        fn pool_metrics(&self) -> PoolMetrics {
            self.metrics
        }

        async fn execute_raw_query(
            &self,
            _sql: &str,
        ) -> FraiseQLResult<Vec<std::collections::HashMap<String, serde_json::Value>>> {
            Ok(Vec::new())
        }

        async fn execute_parameterized_aggregate(
            &self,
            _sql: &str,
            _params: &[serde_json::Value],
        ) -> FraiseQLResult<Vec<std::collections::HashMap<String, serde_json::Value>>> {
            Ok(Vec::new())
        }
    }

    /// Advisor that acts on the very first qualifying sample, with the given
    /// bounds and queue threshold; everything else comes from `Default`.
    fn advisor_with_bounds(min: u32, max: u32, target_queue: u32) -> PoolSizingAdvisor {
        let config = PoolPressureMonitorConfig {
            enabled: true,
            min_pool_size: min,
            max_pool_size: max,
            target_queue_depth: target_queue,
            samples_before_action: 1,
            ..Default::default()
        };
        PoolSizingAdvisor::new(config)
    }

    /// Metrics sample with `active_connections` derived as `total - idle`.
    fn sample(total: u32, idle: u32, waiting: u32) -> PoolMetrics {
        let active = total.saturating_sub(idle);
        PoolMetrics {
            total_connections: total,
            idle_connections: idle,
            active_connections: active,
            waiting_requests: waiting,
        }
    }

    #[test]
    fn test_evaluate_stable_when_queue_low_and_idle_at_threshold() {
        let advisor = advisor_with_bounds(5, 50, 3);
        let outcome = advisor.evaluate(&sample(20, 10, 0));
        assert_eq!(outcome, PoolSizingRecommendation::Stable);
    }

    #[test]
    fn test_evaluate_scale_up_when_queue_exceeds_target() {
        let advisor = advisor_with_bounds(5, 50, 3);
        let decision = advisor.evaluate(&sample(20, 2, 8));
        let grew_to_25 = matches!(
            &decision,
            PoolSizingRecommendation::RecommendScaleUp { new_size: 25, .. }
        );
        assert!(grew_to_25, "expected ScaleUp to 25, got {decision:?}");
    }

    #[test]
    fn test_evaluate_scale_down_when_idle_ratio_high() {
        let advisor = advisor_with_bounds(5, 50, 3);
        let decision = advisor.evaluate(&sample(20, 18, 0));
        let shrank_to_18 = matches!(
            &decision,
            PoolSizingRecommendation::RecommendScaleDown { new_size: 18, .. }
        );
        assert!(shrank_to_18, "expected ScaleDown to 18, got {decision:?}");
    }

    #[test]
    fn test_evaluate_never_below_min() {
        let advisor = advisor_with_bounds(10, 50, 3);
        // Pretend an earlier recommendation already set the target to 12.
        advisor.current_target.store(12, Ordering::Relaxed);
        let decision = advisor.evaluate(&sample(12, 12, 0));
        let floor_respected = match &decision {
            PoolSizingRecommendation::RecommendScaleDown { new_size, .. } => *new_size >= 10,
            _ => false,
        };
        assert!(floor_respected, "must not go below min=10, got {decision:?}");
    }

    #[test]
    fn test_evaluate_never_above_max() {
        let advisor = advisor_with_bounds(5, 25, 3);
        let decision = advisor.evaluate(&sample(25, 0, 20));
        assert_eq!(decision, PoolSizingRecommendation::Stable, "cannot scale above max");
    }

    #[test]
    fn test_consecutive_samples_required_before_action() {
        let config = PoolPressureMonitorConfig {
            enabled: true,
            min_pool_size: 5,
            max_pool_size: 50,
            target_queue_depth: 3,
            scale_up_step: 5,
            samples_before_action: 3,
            ..Default::default()
        };
        let advisor = PoolSizingAdvisor::new(config);
        let high_queue = sample(20, 2, 8);

        // The first two qualifying samples only build up the streak.
        for _ in 0..2 {
            assert_eq!(advisor.evaluate(&high_queue), PoolSizingRecommendation::Stable);
        }
        let third = advisor.evaluate(&high_queue);
        assert!(matches!(third, PoolSizingRecommendation::RecommendScaleUp { .. }));
    }

    #[test]
    fn test_auto_tuner_recommended_size_initialises_to_zero() {
        let advisor = PoolSizingAdvisor::new(PoolPressureMonitorConfig::default());
        assert_eq!(advisor.recommended_size(), 0);
    }

    #[test]
    fn test_auto_tuner_adjustments_counter_starts_at_zero() {
        let advisor = PoolSizingAdvisor::new(PoolPressureMonitorConfig::default());
        assert_eq!(advisor.adjustments_total(), 0);
    }

    #[test]
    fn test_adjustments_counter_increments_on_scale_up() {
        let advisor = advisor_with_bounds(5, 50, 3);
        advisor.evaluate(&sample(20, 2, 8));
        assert_eq!(advisor.adjustments_total(), 1);
    }

    #[test]
    fn test_adjustments_counter_increments_on_scale_down() {
        let advisor = advisor_with_bounds(5, 50, 3);
        advisor.evaluate(&sample(20, 18, 0));
        assert_eq!(advisor.adjustments_total(), 1);
    }

    #[test]
    fn test_recommended_size_updated_after_scale_up() {
        let advisor = advisor_with_bounds(5, 50, 3);
        advisor.evaluate(&sample(20, 2, 8));
        assert_eq!(advisor.recommended_size(), 25);
    }

    #[tokio::test]
    async fn test_start_task_samples_at_interval() {
        // A very high action threshold keeps the task from emitting recommendations.
        let config = PoolPressureMonitorConfig {
            enabled: true,
            tuning_interval_ms: 10,
            samples_before_action: 100,
            ..Default::default()
        };
        let advisor = Arc::new(PoolSizingAdvisor::new(config));
        let adapter = Arc::new(MockAdapter::with_metrics(sample(10, 8, 0)));

        let handle = Arc::clone(&advisor).start(adapter, None);
        tokio::time::sleep(Duration::from_millis(50)).await;

        // The monitoring loop never terminates on its own.
        assert!(!handle.is_finished());
        handle.abort();
    }
}