// do-memory-storage-turso 0.1.26
//
// Turso/libSQL storage backend for the do-memory-core episodic learning system.
//! Keep-Alive Connection Pool for Turso
//!
//! This module provides connection keep-alive functionality to reduce connection
//! overhead from 45ms to ~5ms by maintaining active connections and refreshing
//! them proactively before they become stale.

use do_memory_core::{Error, Result};
use parking_lot::RwLock;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tracing::{debug, info, warn};

// Submodules
pub mod config;
pub mod connection;
mod monitoring;

#[cfg(test)]
mod tests;

// Re-export public items
pub use config::{KeepAliveConfig, KeepAliveStatistics};
pub use connection::KeepAliveConnection;

// Import from parent pool module
use crate::pool::{ConnectionPool, PoolStatistics, PooledConnection};

/// Keep-Alive Connection Pool
///
/// This pool wraps the existing ConnectionPool and adds keep-alive functionality
/// to reduce connection overhead. It tracks connection usage and proactively
/// refreshes stale connections to avoid TLS handshake overhead.
pub struct KeepAlivePool {
    /// The underlying connection pool that actually hands out connections.
    pool: Arc<ConnectionPool>,
    /// Configuration for keep-alive behavior (interval, stale threshold,
    /// proactive-ping flag, ping timeout).
    config: KeepAliveConfig,
    /// Last-used timestamp per connection ID.
    /// NOTE(review): `get()` mints a fresh ID on every call and nothing in
    /// this module removes entries, so this map grows by one entry per
    /// `get()` for the lifetime of the pool — confirm intended lifecycle.
    last_used: RwLock<HashMap<usize, Instant>>,
    /// Statistics for monitoring; shared (via `Arc`) with each
    /// `KeepAliveConnection` handed out by `get()`.
    stats: Arc<RwLock<KeepAliveStatistics>>,
    /// Monotonically increasing counter used to assign connection IDs.
    next_conn_id: RwLock<usize>,
    /// Background task handle for cleanup (stored for drop).
    /// NOTE(review): `new()` spawns an empty task here — the comment there
    /// says the cleanup loop is driven externally.
    _cleanup_handle: tokio::task::JoinHandle<()>,
}

impl KeepAlivePool {
    /// Create a new keep-alive pool from an existing connection pool.
    ///
    /// `config` falls back to [`KeepAliveConfig::default`] when `None`.
    ///
    /// # Errors
    ///
    /// Returns [`Error::Storage`] when the underlying pool cannot hand out a
    /// connection (used here as a one-time validation probe).
    pub async fn new(pool: Arc<ConnectionPool>, config: Option<KeepAliveConfig>) -> Result<Self> {
        let config = config.unwrap_or_default();

        info!(
            "Creating keep-alive pool with interval={:?}, stale_threshold={:?}",
            config.keep_alive_interval, config.stale_threshold
        );

        // Validate the pool works *before* building anything else, so a bad
        // pool fails fast without spawning the placeholder task. The probe
        // connection is returned to the pool when it is dropped.
        let _ = pool
            .get()
            .await
            .map_err(|e| Error::Storage(format!("Failed to validate connection pool: {}", e)))?;

        // `pool` and `config` are moved in directly; the original code built
        // intermediate locals and `Arc::clone`d them only to drop the locals.
        let pool_instance = Self {
            pool,
            config,
            last_used: RwLock::new(HashMap::new()),
            stats: Arc::new(RwLock::new(KeepAliveStatistics::default())),
            next_conn_id: RwLock::new(0),
            // Placeholder: the keep-alive cleanup loop is driven externally,
            // but the struct requires a JoinHandle, so spawn a no-op task.
            _cleanup_handle: tokio::spawn(async move {}),
        };

        info!("Keep-alive pool created successfully");
        Ok(pool_instance)
    }

    /// Create with custom configuration
    ///
    /// Convenience wrapper around [`Self::new`] that always supplies an
    /// explicit configuration instead of falling back to the default.
    pub async fn with_config(pool: Arc<ConnectionPool>, config: KeepAliveConfig) -> Result<Self> {
        let explicit = Some(config);
        Self::new(pool, explicit).await
    }

    /// Get a connection with keep-alive tracking
    ///
    /// Acquires a connection from the underlying pool, assigns it a tracking
    /// ID, records usage and statistics, and refreshes the connection when it
    /// is detected as stale.
    ///
    /// # Errors
    ///
    /// Propagates errors from the underlying pool and from a failed refresh.
    pub async fn get(&self) -> Result<KeepAliveConnection> {
        let start = Instant::now();

        // Get connection from underlying pool
        let pooled = self.pool.get().await?;

        // NOTE(review): every call mints a brand-new ID, so the stale lookup
        // below can never find a prior entry for `conn_id` — `was_stale` is
        // effectively always false and `refresh_connection` is unreachable
        // from here. Tying the ID to the identity of the pooled connection
        // would make staleness detection meaningful; confirm against
        // `PooledConnection`'s API.
        let conn_id = {
            let mut next_id = self.next_conn_id.write();
            let id = *next_id;
            *next_id += 1;
            id
        };

        let now = Instant::now();

        // Check if this connection ID was previously used and is stale.
        let was_stale = {
            let last_used_map = self.last_used.read();
            last_used_map
                .get(&conn_id)
                .is_some_and(|t| now.duration_since(*t) > self.config.stale_threshold)
        };

        // Update statistics
        {
            let mut stats = self.stats.write();
            stats.total_connections_created += 1;
            stats.active_connections += 1;
            stats.update_activity();
        }

        // Track the connection. Entries past the stale threshold are evicted
        // here to keep the map bounded: `is_stale` already reports a missing
        // entry as stale, so eviction does not change staleness answers, but
        // without it the map gained one entry per `get()` forever.
        {
            let mut last_used_map = self.last_used.write();
            last_used_map
                .retain(|_, t| now.duration_since(*t) <= self.config.stale_threshold);
            last_used_map.insert(conn_id, now);
        }

        // If the connection was stale, refresh it.
        if was_stale {
            self.refresh_connection(conn_id, &pooled).await?;
        }

        let elapsed = start.elapsed();
        debug!(
            "Keep-alive connection acquired (id={}, stale={}, elapsed={:?})",
            conn_id, was_stale, elapsed
        );

        // The connection shares the stats handle so it can decrement
        // `active_connections` on release.
        let stats_ref = Arc::clone(&self.stats);

        Ok(KeepAliveConnection::new(pooled, conn_id, now, stats_ref))
    }

    /// Check if a connection is stale
    ///
    /// A connection counts as stale when its last recorded use is older than
    /// the configured stale threshold, or when it has no record at all.
    pub fn is_stale(&self, conn_id: usize) -> bool {
        let guard = self.last_used.read();
        match guard.get(&conn_id) {
            Some(stamp) => Instant::now().duration_since(*stamp) > self.config.stale_threshold,
            None => true,
        }
    }

    /// Refresh a stale connection
    ///
    /// Bumps the refresh/stale counters, optionally pings the connection
    /// (when proactive pings are enabled), and re-stamps its last-used time.
    /// A failed ping is counted and logged but does not fail the refresh.
    async fn refresh_connection(&self, conn_id: usize, pooled: &PooledConnection) -> Result<()> {
        debug!("Refreshing stale connection {}", conn_id);

        {
            let mut stats = self.stats.write();
            stats.total_connections_refreshed += 1;
            stats.total_stale_detected += 1;
        }

        if self.config.enable_proactive_ping {
            match self.ping_connection(pooled).await {
                Ok(()) => {
                    self.stats.write().total_proactive_pings += 1;
                }
                Err(e) => {
                    self.stats.write().total_ping_failures += 1;
                    warn!(
                        "Ping failed for connection {}, may need refresh: {}",
                        conn_id, e
                    );
                }
            }
        }

        self.last_used.write().insert(conn_id, Instant::now());

        Ok(())
    }

    /// Ping a connection to verify it's still alive
    async fn ping_connection(&self, pooled: &PooledConnection) -> Result<()> {
        if let Some(conn) = pooled.connection() {
            tokio::time::timeout(self.config.ping_timeout, conn.query("SELECT 1", ()))
                .await
                .map_err(|_| Error::Storage("Ping timeout".to_string()))?
                .map_err(|e| Error::Storage(format!("Ping failed: {}", e)))?;

            Ok(())
        } else {
            Err(Error::Storage("Connection not available".to_string()))
        }
    }

    /// Get current statistics
    ///
    /// Returns a snapshot (clone) of the keep-alive statistics.
    pub fn statistics(&self) -> KeepAliveStatistics {
        let guard = self.stats.read();
        (*guard).clone()
    }

    /// Get underlying pool statistics
    ///
    /// Delegates to the wrapped [`ConnectionPool`].
    pub async fn pool_statistics(&self) -> PoolStatistics {
        let inner = &self.pool;
        inner.statistics().await
    }

    /// Get the configuration
    pub fn config(&self) -> &KeepAliveConfig {
        &self.config
    }

    /// Get the number of active connections
    ///
    /// Reads the counter maintained in the shared statistics.
    pub fn active_connections(&self) -> usize {
        let guard = self.stats.read();
        guard.active_connections
    }

    /// Get connection count by tracking map size
    pub fn tracked_connections(&self) -> usize {
        let guard = self.last_used.read();
        guard.len()
    }

    /// Gracefully shutdown the pool
    ///
    /// Waits up to 30 seconds, polling every 100 ms, for the active
    /// connection count to reach zero, then logs the outcome. Connections
    /// still active at the deadline are reported with a warning; they are
    /// not forcibly closed.
    pub async fn shutdown(&self) {
        info!("Shutting down keep-alive pool");

        let deadline = Instant::now() + Duration::from_secs(30);
        loop {
            if self.stats.read().active_connections == 0 {
                break;
            }
            if Instant::now() >= deadline {
                break;
            }
            tokio::time::sleep(Duration::from_millis(100)).await;
        }

        let remaining = self.stats.read().active_connections;
        if remaining > 0 {
            warn!(
                "Keep-alive pool shutdown with {} active connections",
                remaining
            );
        } else {
            info!("Keep-alive pool shutdown complete");
        }
    }
}