Skip to main content

fraiseql_server/pool/
auto_tuner.rs

//! Adaptive connection pool auto-tuner.
//!
//! Monitors connection pool health via [`PoolMetrics`] and either resizes the
//! pool or emits a recommended size when the queue depth or idle ratio crosses
//! configured thresholds.
6
7use std::{
8    sync::{
9        Arc,
10        atomic::{AtomicU32, AtomicU64, Ordering},
11    },
12    time::Duration,
13};
14
15use fraiseql_core::db::{traits::DatabaseAdapter, types::PoolMetrics};
16
17use crate::config::pool_tuning::PoolPressureMonitorConfig;
18
/// Recommendation produced by [`PoolSizingAdvisor::evaluate`].
///
/// `RecommendScaleUp` and `RecommendScaleDown` are *recommendations*, not executed actions.
/// Whether they are applied depends on whether a `resize_fn` was passed to
/// [`PoolSizingAdvisor::start`]. With a `resize_fn`, the decision is logged at INFO level
/// and the callback is invoked with the new size. Without one, the decision is logged at
/// WARN level only — no actual pool resize occurs.
#[derive(Debug, Clone, PartialEq, Eq)]
#[non_exhaustive]
pub enum PoolSizingRecommendation {
    /// Pool size is appropriate. No action needed.
    Stable,
    /// Pool should be grown to `new_size` connections.
    ///
    /// Applied only if a `resize_fn` was configured; otherwise logged as an advisory.
    RecommendScaleUp {
        /// New target pool size.
        new_size: u32,
        /// Human-readable reason for the recommendation.
        reason:   String,
    },
    /// Pool should be shrunk to `new_size` connections.
    ///
    /// Applied only if a `resize_fn` was configured; otherwise logged as an advisory.
    RecommendScaleDown {
        /// New target pool size.
        new_size: u32,
        /// Human-readable reason for the recommendation.
        reason:   String,
    },
}
49
50/// Connection pool pressure monitor with scaling recommendations.
51///
52/// Call [`PoolSizingAdvisor::evaluate`] with current [`PoolMetrics`] to get a
53/// [`PoolSizingRecommendation`], or call [`PoolSizingAdvisor::start`] to launch a
54/// background task that polls the adapter automatically.
55///
56/// **Note**: This monitor operates in recommendation mode only. The pool is not
57/// resized at runtime — act on `fraiseql_pool_tuning_*` events by adjusting
58/// `max_connections` in `fraiseql.toml` and restarting the server.
59pub struct PoolSizingAdvisor {
60    /// Pressure monitoring configuration.
61    pub(crate) config:  PoolPressureMonitorConfig,
62    /// Consecutive samples with high queue depth.
63    high_queue_samples: AtomicU32,
64    /// Consecutive samples with high idle ratio.
65    low_idle_samples:   AtomicU32,
66    /// Total resize operations applied or recommended.
67    adjustments_total:  AtomicU64,
68    /// Current recommended/actual target pool size (0 = not yet sampled).
69    current_target:     AtomicU32,
70}
71
72impl PoolSizingAdvisor {
73    /// Create a new pool pressure monitor with the given configuration.
74    pub const fn new(config: PoolPressureMonitorConfig) -> Self {
75        Self {
76            config,
77            high_queue_samples: AtomicU32::new(0),
78            low_idle_samples: AtomicU32::new(0),
79            adjustments_total: AtomicU64::new(0),
80            current_target: AtomicU32::new(0),
81        }
82    }
83
84    /// Evaluate current pool metrics and return a scaling decision.
85    ///
86    /// This method is pure computation — no I/O, no async.  It updates internal
87    /// sample counters so consecutive calls with the same condition accumulate
88    /// toward `samples_before_action`.
89    pub fn evaluate(&self, metrics: &PoolMetrics) -> PoolSizingRecommendation {
90        let current = self.current_size(metrics);
91        let min = self.config.min_pool_size;
92        let max = self.config.max_pool_size;
93
94        // ── Scale-up check ──────────────────────────────────────────────────
95        if metrics.waiting_requests > self.config.target_queue_depth {
96            let count = self.high_queue_samples.fetch_add(1, Ordering::Relaxed) + 1;
97            self.low_idle_samples.store(0, Ordering::Relaxed);
98
99            if count >= self.config.samples_before_action {
100                let desired = (current + self.config.scale_up_step).min(max);
101                if desired > current {
102                    self.high_queue_samples.store(0, Ordering::Relaxed);
103                    self.adjustments_total.fetch_add(1, Ordering::Relaxed);
104                    self.current_target.store(desired, Ordering::Relaxed);
105                    return PoolSizingRecommendation::RecommendScaleUp {
106                        new_size: desired,
107                        reason:   format!(
108                            "{} requests waiting (threshold {}); grown by {}",
109                            metrics.waiting_requests,
110                            self.config.target_queue_depth,
111                            self.config.scale_up_step,
112                        ),
113                    };
114                }
115                // Already at max — reset and stay stable
116                self.high_queue_samples.store(0, Ordering::Relaxed);
117            }
118            return PoolSizingRecommendation::Stable;
119        }
120
121        self.high_queue_samples.store(0, Ordering::Relaxed);
122
123        // ── Scale-down check ─────────────────────────────────────────────────
124        if current > min && metrics.total_connections > 0 {
125            let idle_ratio =
126                f64::from(metrics.idle_connections) / f64::from(metrics.total_connections);
127
128            if idle_ratio > self.config.scale_down_idle_ratio && metrics.waiting_requests == 0 {
129                let count = self.low_idle_samples.fetch_add(1, Ordering::Relaxed) + 1;
130
131                if count >= self.config.samples_before_action {
132                    let desired = current.saturating_sub(self.config.scale_down_step).max(min);
133                    self.low_idle_samples.store(0, Ordering::Relaxed);
134                    self.adjustments_total.fetch_add(1, Ordering::Relaxed);
135                    self.current_target.store(desired, Ordering::Relaxed);
136                    return PoolSizingRecommendation::RecommendScaleDown {
137                        new_size: desired,
138                        reason:   format!(
139                            "idle ratio {:.0}% > {:.0}% threshold; shrunk by {}",
140                            idle_ratio * 100.0,
141                            self.config.scale_down_idle_ratio * 100.0,
142                            self.config.scale_down_step,
143                        ),
144                    };
145                }
146                return PoolSizingRecommendation::Stable;
147            }
148        }
149
150        self.low_idle_samples.store(0, Ordering::Relaxed);
151        PoolSizingRecommendation::Stable
152    }
153
154    /// Total number of resize operations applied or recommended.
155    pub fn adjustments_total(&self) -> u64 {
156        self.adjustments_total.load(Ordering::Relaxed)
157    }
158
159    /// Current recommended pool size (0 = not yet sampled).
160    pub fn recommended_size(&self) -> u32 {
161        self.current_target.load(Ordering::Relaxed)
162    }
163
164    /// Start a background polling task.
165    ///
166    /// The task samples `adapter.pool_metrics()` every `tuning_interval_ms`
167    /// milliseconds and calls [`Self::evaluate`].  If `resize_fn` is
168    /// provided, it is called with the new pool size whenever a resize is
169    /// decided.  If `resize_fn` is `None`, the tuner operates in
170    /// **recommendation mode**: it updates `recommended_size` and logs a
171    /// warning without modifying the pool.
172    ///
173    /// Returns a [`tokio::task::JoinHandle`] that can be aborted for shutdown.
174    pub fn start<A: DatabaseAdapter + 'static>(
175        self: Arc<Self>,
176        adapter: Arc<A>,
177        resize_fn: Option<Arc<dyn Fn(usize) + Send + Sync>>,
178    ) -> tokio::task::JoinHandle<()> {
179        let interval_ms = self.config.tuning_interval_ms;
180
181        tokio::spawn(async move {
182            tracing::debug!(
183                "Pool pressure monitoring enabled (recommendation mode). \
184                 The pool cannot be resized at runtime; act on \
185                 fraiseql_pool_scaling_recommended events by adjusting \
186                 max_connections and restarting."
187            );
188            if resize_fn.is_none() {
189                tracing::debug!(
190                    "No resize_fn configured — pool pressure monitor is in \
191                     pure advisory mode. Recommendations will appear in \
192                     fraiseql_pool_tuning_* metrics and WARN log lines."
193                );
194            }
195
196            let mut ticker = tokio::time::interval(Duration::from_millis(interval_ms.max(1)));
197            ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
198
199            loop {
200                ticker.tick().await;
201                let metrics = adapter.pool_metrics();
202
203                match self.evaluate(&metrics) {
204                    PoolSizingRecommendation::Stable => {},
205                    PoolSizingRecommendation::RecommendScaleUp {
206                        new_size,
207                        ref reason,
208                    } => {
209                        if let Some(ref f) = resize_fn {
210                            tracing::info!(
211                                new_size,
212                                reason = reason.as_str(),
213                                "Pool auto-tuner: scaling up"
214                            );
215                            f(new_size as usize);
216                        } else {
217                            tracing::warn!(
218                                new_size,
219                                reason = reason.as_str(),
220                                "Pool auto-tuner recommends scaling up \
221                                 (resize not available — configure resize_fn)"
222                            );
223                        }
224                    },
225                    PoolSizingRecommendation::RecommendScaleDown {
226                        new_size,
227                        ref reason,
228                    } => {
229                        if let Some(ref f) = resize_fn {
230                            tracing::info!(
231                                new_size,
232                                reason = reason.as_str(),
233                                "Pool auto-tuner: scaling down"
234                            );
235                            f(new_size as usize);
236                        } else {
237                            tracing::warn!(
238                                new_size,
239                                reason = reason.as_str(),
240                                "Pool auto-tuner recommends scaling down \
241                                 (resize not available — configure resize_fn)"
242                            );
243                        }
244                    },
245                }
246            }
247        })
248    }
249
250    /// Current pool size from metrics, falling back to `min_pool_size`.
251    fn current_size(&self, metrics: &PoolMetrics) -> u32 {
252        let recorded = self.current_target.load(Ordering::Relaxed);
253        if recorded > 0 {
254            recorded
255        } else if metrics.total_connections > 0 {
256            metrics.total_connections
257        } else {
258            self.config.min_pool_size
259        }
260    }
261}
262
#[cfg(test)]
mod tests {
    #![allow(clippy::unwrap_used)] // Reason: test code, panics are acceptable
    #![allow(clippy::cast_precision_loss)] // Reason: test metrics reporting
    #![allow(clippy::cast_sign_loss)] // Reason: test data uses small positive integers
    #![allow(clippy::cast_possible_truncation)] // Reason: test data values are bounded
    #![allow(clippy::cast_possible_wrap)] // Reason: test data values are bounded
    #![allow(clippy::missing_panics_doc)] // Reason: test helpers
    #![allow(clippy::missing_errors_doc)] // Reason: test helpers
    #![allow(missing_docs)] // Reason: test code
    #![allow(clippy::items_after_statements)] // Reason: test helpers defined near use site

    use async_trait::async_trait;
    use fraiseql_core::db::{
        WhereClause,
        types::{DatabaseType, JsonbValue, OrderByClause},
    };
    use fraiseql_error::Result as FraiseQLResult;

    use super::*;

    // Minimal mock adapter for tests — no database required.
    struct MockAdapter {
        metrics: PoolMetrics,
    }

    impl MockAdapter {
        fn with_metrics(metrics: PoolMetrics) -> Self {
            Self { metrics }
        }
    }

    // Reason: DatabaseAdapter is defined with #[async_trait]; all implementations must match
    // its transformed method signatures to satisfy the trait contract
    // async_trait: dyn-dispatch required; remove when RTN + Send is stable (RFC 3425)
    #[async_trait]
    impl DatabaseAdapter for MockAdapter {
        async fn execute_where_query(
            &self,
            _view: &str,
            _where_clause: Option<&WhereClause>,
            _limit: Option<u32>,
            _offset: Option<u32>,
            _order_by: Option<&[OrderByClause]>,
        ) -> FraiseQLResult<Vec<JsonbValue>> {
            Ok(vec![])
        }

        async fn execute_with_projection(
            &self,
            _view: &str,
            _projection: Option<&fraiseql_core::schema::SqlProjectionHint>,
            _where_clause: Option<&WhereClause>,
            _limit: Option<u32>,
            _offset: Option<u32>,
            _order_by: Option<&[OrderByClause]>,
        ) -> FraiseQLResult<Vec<JsonbValue>> {
            Ok(vec![])
        }

        fn database_type(&self) -> DatabaseType {
            DatabaseType::SQLite
        }

        async fn health_check(&self) -> FraiseQLResult<()> {
            Ok(())
        }

        fn pool_metrics(&self) -> PoolMetrics {
            self.metrics
        }

        async fn execute_raw_query(
            &self,
            _sql: &str,
        ) -> FraiseQLResult<Vec<std::collections::HashMap<String, serde_json::Value>>> {
            Ok(vec![])
        }

        async fn execute_parameterized_aggregate(
            &self,
            _sql: &str,
            _params: &[serde_json::Value],
        ) -> FraiseQLResult<Vec<std::collections::HashMap<String, serde_json::Value>>> {
            Ok(vec![])
        }
    }

    /// Build an advisor that acts on the first qualifying sample.
    fn make_tuner(min: u32, max: u32, target_queue: u32) -> PoolSizingAdvisor {
        PoolSizingAdvisor::new(PoolPressureMonitorConfig {
            enabled: true,
            min_pool_size: min,
            max_pool_size: max,
            target_queue_depth: target_queue,
            samples_before_action: 1,
            ..Default::default()
        })
    }

    /// Build pool metrics; active connections are derived as `total - idle`.
    fn metrics(total: u32, idle: u32, waiting: u32) -> PoolMetrics {
        PoolMetrics {
            total_connections:  total,
            idle_connections:   idle,
            active_connections: total.saturating_sub(idle),
            waiting_requests:   waiting,
        }
    }

    #[test]
    fn test_evaluate_stable_when_queue_low_and_idle_at_threshold() {
        // 10/20 = 50% idle — exactly at threshold, NOT above it → Stable
        let tuner = make_tuner(5, 50, 3);
        assert_eq!(tuner.evaluate(&metrics(20, 10, 0)), PoolSizingRecommendation::Stable);
    }

    #[test]
    fn test_evaluate_scale_up_when_queue_exceeds_target() {
        let tuner = make_tuner(5, 50, 3);
        let decision = tuner.evaluate(&metrics(20, 2, 8));
        assert!(
            matches!(&decision, PoolSizingRecommendation::RecommendScaleUp { new_size, .. } if *new_size == 25),
            "expected ScaleUp to 25, got {decision:?}"
        );
    }

    #[test]
    fn test_evaluate_scale_up_clamped_to_max() {
        // 20 + default step would overshoot max=22, so the target is clamped.
        let tuner = make_tuner(5, 22, 3);
        let decision = tuner.evaluate(&metrics(20, 2, 8));
        assert!(
            matches!(&decision, PoolSizingRecommendation::RecommendScaleUp { new_size, .. } if *new_size == 22),
            "expected ScaleUp clamped to 22, got {decision:?}"
        );
    }

    #[test]
    fn test_evaluate_scale_down_when_idle_ratio_high() {
        let tuner = make_tuner(5, 50, 3);
        // 18/20 = 0.9 idle ratio > 0.5 threshold
        let decision = tuner.evaluate(&metrics(20, 18, 0));
        assert!(
            matches!(&decision, PoolSizingRecommendation::RecommendScaleDown { new_size, .. } if *new_size == 18),
            "expected ScaleDown to 18, got {decision:?}"
        );
    }

    #[test]
    fn test_evaluate_never_below_min() {
        let tuner = make_tuner(10, 50, 3);
        // Force initial current_target to 12
        tuner.current_target.store(12, Ordering::Relaxed);
        let decision = tuner.evaluate(&metrics(12, 12, 0));
        assert!(
            matches!(&decision, PoolSizingRecommendation::RecommendScaleDown { new_size, .. } if *new_size >= 10),
            "must not go below min=10, got {decision:?}"
        );
    }

    #[test]
    fn test_evaluate_never_above_max() {
        let tuner = make_tuner(5, 25, 3);
        // Pool already at max with high queue
        let decision = tuner.evaluate(&metrics(25, 0, 20));
        assert_eq!(decision, PoolSizingRecommendation::Stable, "cannot scale above max");
    }

    #[test]
    fn test_consecutive_samples_required_before_action() {
        let tuner = PoolSizingAdvisor::new(PoolPressureMonitorConfig {
            enabled: true,
            min_pool_size: 5,
            max_pool_size: 50,
            target_queue_depth: 3,
            scale_up_step: 5,
            samples_before_action: 3,
            ..Default::default()
        });
        let high_queue = metrics(20, 2, 8);
        assert_eq!(tuner.evaluate(&high_queue), PoolSizingRecommendation::Stable);
        assert_eq!(tuner.evaluate(&high_queue), PoolSizingRecommendation::Stable);
        assert!(matches!(
            tuner.evaluate(&high_queue),
            PoolSizingRecommendation::RecommendScaleUp { .. }
        ));
    }

    #[test]
    fn test_high_queue_streak_resets_when_interrupted() {
        let tuner = PoolSizingAdvisor::new(PoolPressureMonitorConfig {
            enabled: true,
            min_pool_size: 5,
            max_pool_size: 50,
            target_queue_depth: 3,
            samples_before_action: 2,
            ..Default::default()
        });
        let high_queue = metrics(20, 2, 8);
        // Busy but not queued: 2/20 idle is below the scale-down threshold.
        let calm = metrics(20, 2, 0);
        assert_eq!(tuner.evaluate(&high_queue), PoolSizingRecommendation::Stable);
        assert_eq!(tuner.evaluate(&calm), PoolSizingRecommendation::Stable);
        // The streak restarted, so one more high sample is still not enough.
        assert_eq!(tuner.evaluate(&high_queue), PoolSizingRecommendation::Stable);
    }

    #[test]
    fn test_auto_tuner_recommended_size_initialises_to_zero() {
        let tuner = PoolSizingAdvisor::new(PoolPressureMonitorConfig::default());
        assert_eq!(tuner.recommended_size(), 0);
    }

    #[test]
    fn test_auto_tuner_adjustments_counter_starts_at_zero() {
        let tuner = PoolSizingAdvisor::new(PoolPressureMonitorConfig::default());
        assert_eq!(tuner.adjustments_total(), 0);
    }

    #[test]
    fn test_adjustments_counter_increments_on_scale_up() {
        let tuner = make_tuner(5, 50, 3);
        let _ = tuner.evaluate(&metrics(20, 2, 8));
        assert_eq!(tuner.adjustments_total(), 1);
    }

    #[test]
    fn test_adjustments_counter_increments_on_scale_down() {
        let tuner = make_tuner(5, 50, 3);
        let _ = tuner.evaluate(&metrics(20, 18, 0));
        assert_eq!(tuner.adjustments_total(), 1);
    }

    #[test]
    fn test_recommended_size_updated_after_scale_up() {
        let tuner = make_tuner(5, 50, 3);
        let _ = tuner.evaluate(&metrics(20, 2, 8));
        assert_eq!(tuner.recommended_size(), 25);
    }

    #[test]
    fn test_recommended_size_updated_after_scale_down() {
        let tuner = make_tuner(5, 50, 3);
        let _ = tuner.evaluate(&metrics(20, 18, 0));
        assert_eq!(tuner.recommended_size(), 18);
    }

    #[tokio::test]
    async fn test_start_task_samples_at_interval() {
        let config = PoolPressureMonitorConfig {
            enabled: true,
            tuning_interval_ms: 10,
            samples_before_action: 100, // never actually act
            ..Default::default()
        };
        let tuner = Arc::new(PoolSizingAdvisor::new(config));
        let adapter = Arc::new(MockAdapter::with_metrics(metrics(10, 8, 0)));
        let handle = PoolSizingAdvisor::start(Arc::clone(&tuner), adapter, None);
        tokio::time::sleep(Duration::from_millis(50)).await;
        // Not crashing and handle is alive = success
        assert!(!handle.is_finished());
        handle.abort();
    }
}