oxirs-fuseki 0.2.4

SPARQL 1.1/1.2 HTTP protocol server with Fuseki-compatible configuration
Documentation
//! High-level `ConnectionPool` wrapping `AdaptivePool` + `BackpressureController`
//!
//! `ConnectionPool` provides a unified entry point for backend connection
//! management.  Callers interact with a single `acquire()` method that
//! transparently applies backpressure before delegating to the underlying
//! `AdaptivePool`.

use crate::error::{FusekiError, FusekiResult};
use crate::pool::adaptive_pool::{AdaptivePool, PoolConfig, PooledConnection};
use crate::pool::backpressure::{
    BackpressureConfig, BackpressureController, BackpressureDecision, BackpressureStats,
};

/// A lightweight snapshot of the pool's connection counts.
#[derive(Debug, Clone)]
pub struct PoolStats {
    /// Connections currently checked out
    pub active: u32,
    /// Connections available for immediate use
    pub idle: u32,
    /// Total connections (active + idle)
    pub total: u32,
}

/// High-level connection pool that combines an `AdaptivePool` with a
/// `BackpressureController`.
///
/// Before every `acquire()`, the backpressure controller evaluates the current
/// pool utilization.  If the pool is overloaded, callers receive an error
/// rather than waiting indefinitely.
pub struct ConnectionPool<C: Send + 'static> {
    inner: AdaptivePool<C>,
    backpressure: BackpressureController,
    backend_name: String,
}

impl<C: Send + 'static> ConnectionPool<C> {
    /// Create a new `ConnectionPool`.
    ///
    /// * `backend`    — logical name used in error messages and metrics
    /// * `pool`       — the underlying `AdaptivePool`
    /// * `bp_config`  — backpressure tuning parameters
    pub fn new(backend: &str, pool: AdaptivePool<C>, bp_config: BackpressureConfig) -> Self {
        ConnectionPool {
            inner: pool,
            backpressure: BackpressureController::new(bp_config),
            backend_name: backend.to_string(),
        }
    }

    /// Acquire a connection from the pool.
    ///
    /// Returns:
    /// - `Ok(conn)` — a live connection
    /// - `Err(FusekiError::ServiceUnavailable)` — backpressure rejected the request
    /// - `Err(...)` — pool error (timeout, factory error, etc.)
    pub fn acquire(&self) -> FusekiResult<PooledConnection<C>> {
        let stats = self.inner.stats();
        let utilization = stats.utilization;

        match self.backpressure.should_accept_request(utilization) {
            BackpressureDecision::Accept | BackpressureDecision::Queue => {
                // For Queue decisions we still attempt to acquire — the
                // BackpressureController has already incremented the queue
                // depth counter; we decrement it once we have a connection.
                let conn = self.inner.acquire();
                // If the request was queued, signal dequeue now
                if utilization >= self.backpressure_config_queue_threshold() {
                    self.backpressure.record_dequeue();
                }
                conn
            }
            BackpressureDecision::Reject { retry_after_ms } => {
                Err(FusekiError::ServiceUnavailable {
                    message: format!(
                        "Backend '{}' is overloaded; retry after {}ms",
                        self.backend_name, retry_after_ms
                    ),
                })
            }
        }
    }

    /// Statistics snapshot for the underlying `AdaptivePool`.
    pub fn pool_stats(&self) -> PoolStats {
        let s = self.inner.stats();
        PoolStats {
            active: s.active_connections as u32,
            idle: s.idle_connections as u32,
            total: s.total_connections as u32,
        }
    }

    /// Statistics snapshot for the `BackpressureController`.
    pub fn backpressure_stats(&self) -> BackpressureStats {
        self.backpressure.stats()
    }

    /// The logical backend name this pool serves.
    pub fn backend_name(&self) -> &str {
        &self.backend_name
    }

    // ── private ───────────────────────────────────────────────────────────────

    /// Returns the configured queue threshold from the backpressure controller.
    /// We keep this as a method to avoid storing a redundant copy of the config.
    fn backpressure_config_queue_threshold(&self) -> f64 {
        // The BackpressureController doesn't expose its config directly;
        // we query the queue_depth to determine whether this was a Queue decision.
        // However, the simplest approach here is to check if queue_depth > 0 after
        // the decision was made — but since we can't read the pre-decision depth
        // without more refactoring, we approximate by checking stats.
        // A value of 0.70 matches the default; for correctness we use a
        // conservative check: only dequeue if queue_depth > 0.
        let depth = self.backpressure.current_queue_depth();
        if depth > 0 {
            0.70
        } else {
            1.1
        } // 1.1 ensures the dequeue branch is skipped
    }
}

// ──────────────────────────────────────────────────────────────────────────────
// Tests
// ──────────────────────────────────────────────────────────────────────────────

#[cfg(test)]
mod tests {
    use super::*;
    use crate::pool::adaptive_pool::PoolConfig;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::Arc;
    use std::time::Duration;

    struct DummyConn {
        id: usize,
    }

    fn make_pool(min: usize, max: usize) -> FusekiResult<AdaptivePool<DummyConn>> {
        let counter = Arc::new(AtomicUsize::new(0));
        AdaptivePool::new(
            PoolConfig {
                min_connections: min,
                max_connections: max,
                acquire_timeout: Duration::from_millis(200),
                idle_timeout: Duration::from_secs(300),
                max_lifetime: Duration::from_secs(3600),
                target_utilization: 0.7,
                resize_interval: Duration::from_secs(60),
            },
            move || {
                Ok(DummyConn {
                    id: counter.fetch_add(1, Ordering::Relaxed),
                })
            },
        )
    }

    fn make_accepting_cp(min: usize, max: usize) -> ConnectionPool<DummyConn> {
        let pool = make_pool(min, max).unwrap();
        // Accept threshold > 1.0 so all requests are accepted
        let bp = BackpressureConfig {
            queue_threshold: 1.1,
            reject_threshold: 1.1,
            max_queue_depth: 1000,
            base_retry_after_ms: 100,
        };
        ConnectionPool::new("test_backend", pool, bp)
    }

    fn make_rejecting_cp(min: usize, max: usize) -> ConnectionPool<DummyConn> {
        let pool = make_pool(min, max).unwrap();
        // Reject threshold = 0.0 so all requests are rejected
        let bp = BackpressureConfig {
            queue_threshold: 0.0,
            reject_threshold: 0.0,
            max_queue_depth: 1000,
            base_retry_after_ms: 50,
        };
        ConnectionPool::new("overloaded_backend", pool, bp)
    }

    // ── ConnectionPool: basic ops ─────────────────────────────────────────────

    #[test]
    fn test_connection_pool_acquire_ok() {
        let cp = make_accepting_cp(2, 10);
        let conn = cp.acquire();
        assert!(conn.is_ok(), "Should acquire successfully");
    }

    #[test]
    fn test_connection_pool_backend_name() {
        let cp = make_accepting_cp(1, 5);
        assert_eq!(cp.backend_name(), "test_backend");
    }

    #[test]
    fn test_connection_pool_pool_stats_initial() {
        let cp = make_accepting_cp(2, 10);
        let stats = cp.pool_stats();
        assert_eq!(stats.total, 2);
        assert_eq!(stats.active, 0);
        assert_eq!(stats.idle, 2);
    }

    #[test]
    fn test_connection_pool_pool_stats_active_increments() {
        let cp = make_accepting_cp(2, 10);
        let _conn = cp.acquire().unwrap();
        let stats = cp.pool_stats();
        assert_eq!(stats.active, 1);
    }

    #[test]
    fn test_connection_pool_pool_stats_active_decrements_on_drop() {
        let cp = make_accepting_cp(2, 10);
        {
            let _conn = cp.acquire().unwrap();
            assert_eq!(cp.pool_stats().active, 1);
        }
        assert_eq!(cp.pool_stats().active, 0);
    }

    // ── ConnectionPool: backpressure rejection ────────────────────────────────

    #[test]
    fn test_connection_pool_rejected_when_overloaded() {
        let cp = make_rejecting_cp(2, 10);
        let result = cp.acquire();
        assert!(result.is_err(), "Should be rejected due to backpressure");
        match result {
            Err(FusekiError::ServiceUnavailable { .. }) => {}
            Err(other) => panic!("Expected ServiceUnavailable, got error: {}", other),
            Ok(_) => panic!("Expected error but got Ok"),
        }
    }

    #[test]
    fn test_connection_pool_error_message_contains_backend_name() {
        let cp = make_rejecting_cp(2, 10);
        match cp.acquire() {
            Err(FusekiError::ServiceUnavailable { message }) => {
                assert!(
                    message.contains("overloaded_backend"),
                    "Error message should mention backend name"
                );
            }
            Err(other) => panic!("Expected ServiceUnavailable, got error: {}", other),
            Ok(_) => panic!("Expected error but got Ok"),
        }
    }

    // ── BackpressureController standalone tests ───────────────────────────────

    #[test]
    fn test_backpressure_accept_at_low_util() {
        let ctrl = BackpressureController::new(BackpressureConfig::default());
        assert_eq!(
            ctrl.should_accept_request(0.0),
            BackpressureDecision::Accept
        );
    }

    #[test]
    fn test_backpressure_queue_at_medium_util() {
        let ctrl = BackpressureController::new(BackpressureConfig::default());
        assert_eq!(
            ctrl.should_accept_request(0.75),
            BackpressureDecision::Queue
        );
    }

    #[test]
    fn test_backpressure_reject_at_high_util() {
        let ctrl = BackpressureController::new(BackpressureConfig::default());
        match ctrl.should_accept_request(0.95) {
            BackpressureDecision::Reject { .. } => {}
            other => panic!("Expected Reject, got {:?}", other),
        }
    }

    #[test]
    fn test_backpressure_stats_after_decisions() {
        let ctrl = BackpressureController::new(BackpressureConfig::default());
        ctrl.should_accept_request(0.0); // Accept
        ctrl.should_accept_request(0.75); // Queue
        ctrl.should_accept_request(0.95); // Reject

        let stats = ctrl.stats();
        assert_eq!(stats.total_accepted, 1);
        assert_eq!(stats.total_queued, 1);
        assert_eq!(stats.total_rejected, 1);
    }

    // ── ConnectionPool: backpressure stats ───────────────────────────────────

    #[test]
    fn test_connection_pool_backpressure_stats_accessible() {
        let cp = make_accepting_cp(2, 10);
        let _conn = cp.acquire().unwrap();
        let bp_stats = cp.backpressure_stats();
        assert_eq!(bp_stats.total_rejected, 0);
        // At least one Accept or Queue decision was made
        assert!(bp_stats.total_accepted + bp_stats.total_queued >= 1);
    }

    // ── PoolStats ────────────────────────────────────────────────────────────

    #[test]
    fn test_pool_stats_idle_plus_active_equals_total() {
        let cp = make_accepting_cp(3, 10);
        let stats = cp.pool_stats();
        assert_eq!(stats.idle + stats.active, stats.total);
    }

    #[test]
    fn test_pool_stats_after_multiple_acquires() {
        let cp = make_accepting_cp(4, 10);
        let _c1 = cp.acquire().unwrap();
        let _c2 = cp.acquire().unwrap();
        let stats = cp.pool_stats();
        assert_eq!(stats.active, 2);
    }

    // ── ConnectionPool: multiple backends ────────────────────────────────────

    #[test]
    fn test_multiple_pools_independent() {
        let cp1 = make_accepting_cp(1, 5);
        let cp2 = make_accepting_cp(1, 5);

        let _c1 = cp1.acquire().unwrap();
        // cp2 should be unaffected by cp1's activity
        let stats2 = cp2.pool_stats();
        assert_eq!(stats2.active, 0, "cp2 should have no active connections");
    }
}