do-memory-storage-turso 0.1.31

Turso/libSQL storage backend for the do-memory-core episodic learning system.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
//! True connection pool with stable IDs for prepared statement caching
//!
//! This pool maintains actual reusable connections with stable IDs,
//! enabling effective prepared statement caching.

use super::connection_wrapper::PooledConnection;
use do_memory_core::{Error, Result};
use libsql::Database;
use parking_lot::Mutex;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::Semaphore;
use tracing::{debug, info};

/// Configuration for the caching-aware connection pool
#[derive(Debug, Clone)]
pub struct CachingPoolConfig {
    /// Maximum number of connections to maintain
    pub max_connections: usize,
    /// Minimum number of connections to maintain
    pub min_connections: usize,
    /// Maximum time to wait for a connection
    pub connection_timeout: Duration,
    /// Maximum idle time before a connection is eligible for eviction
    pub max_idle_time: Duration,
    /// Maximum connection age before forcing recreation
    pub max_connection_age: Duration,
    /// Enable connection health validation
    pub enable_health_check: bool,
}

impl Default for CachingPoolConfig {
    fn default() -> Self {
        Self {
            max_connections: 10,
            min_connections: 2,
            connection_timeout: Duration::from_secs(5),
            max_idle_time: Duration::from_secs(300),
            max_connection_age: Duration::from_secs(3600),
            enable_health_check: true,
        }
    }
}

/// Statistics for the caching pool
#[derive(Debug, Default, Clone)]
pub struct CachingPoolStats {
    /// Total connections created
    pub total_created: u64,
    /// Total connections checked out
    pub total_checkouts: u64,
    /// Total connections returned
    pub total_returns: u64,
    /// Cache hits (reused connections)
    pub cache_hits: u64,
    /// Cache misses (new connections created)
    pub cache_misses: u64,
    /// Current active connections (checked out)
    pub active_connections: usize,
    /// Current idle connections (available in pool)
    pub idle_connections: usize,
    /// Connections evicted due to age/idle time
    pub evictions: u64,
}

/// A connection pool that maintains reusable connections with stable IDs
///
/// # Architecture
///
/// ```text
/// CachingPool {
///     db: Arc<Database>,
///     idle_connections: Mutex<Vec<PooledConnection>>,      // Available connections
///     active_connection_ids: Mutex<HashSet<u64>>,          // IDs of checked-out connections
///     semaphore: Arc<Semaphore>,                           // Limits concurrent checkouts
///     cleanup_callback: Mutex<Option<Arc<dyn Fn(u64)>>>,   // Called when a connection is destroyed
/// }
/// ```
///
/// # Connection Lifecycle
///
/// 1. **Creation**: Connection created with unique stable ID
/// 2. **Checkout**: Connection taken from idle pool or created new
/// 3. **Return**: Connection returned to idle pool (not destroyed)
/// 4. **Eviction**: Old/idle connections destroyed periodically
/// 5. **Drop**: Cleanup callback invoked to clear prepared statement cache
pub struct CachingPool {
    db: Arc<Database>,
    config: CachingPoolConfig,
    // Idle (checked-in) connections, used LIFO via push/pop.
    idle_connections: Mutex<Vec<PooledConnection>>,
    // Stable IDs of connections currently checked out.
    active_connection_ids: Mutex<std::collections::HashSet<u64>>,
    // Bounds concurrent checkouts to config.max_connections.
    semaphore: Arc<Semaphore>,
    stats: Mutex<CachingPoolStats>,
    // Invoked with a connection's ID when it is permanently destroyed, so the
    // prepared-statement cache can drop entries keyed by that ID.
    cleanup_callback: Mutex<Option<Arc<dyn Fn(u64) + Send + Sync>>>,
}

impl CachingPool {
    /// Create a new caching-aware connection pool
    ///
    /// # Arguments
    ///
    /// * `db` - The libsql database
    /// * `config` - Pool configuration
    ///
    /// # Errors
    ///
    /// Returns error if database validation fails
    pub async fn new(db: Arc<Database>, config: CachingPoolConfig) -> Result<Self> {
        info!(
            "Creating caching pool: min={}, max={}",
            config.min_connections, config.max_connections
        );

        // Validate database connectivity
        let conn = db
            .connect()
            .map_err(|e| Error::Storage(format!("Failed to connect: {}", e)))?;
        conn.query("SELECT 1", ())
            .await
            .map_err(|e| Error::Storage(format!("Database validation failed: {}", e)))?;

        let semaphore = Arc::new(Semaphore::new(config.max_connections));
        let idle_connections = Mutex::new(Vec::new());
        let active_connection_ids = Mutex::new(std::collections::HashSet::new());
        let stats = Mutex::new(CachingPoolStats::default());

        let pool = Self {
            db,
            config,
            idle_connections,
            active_connection_ids,
            semaphore,
            stats,
            cleanup_callback: Mutex::new(None),
        };

        // Pre-create minimum connections
        pool.pre_create_connections().await?;

        info!("Caching pool created successfully");
        Ok(pool)
    }

    /// Set the cleanup callback for connection lifecycle events
    ///
    /// This callback is invoked when a connection is permanently destroyed,
    /// allowing the prepared statement cache to clean up entries for that connection.
    pub fn set_cleanup_callback<F>(&self, callback: F)
    where
        F: Fn(u64) + Send + Sync + 'static,
    {
        *self.cleanup_callback.lock() = Some(Arc::new(callback));
    }

    /// Pre-create the minimum number of connections
    async fn pre_create_connections(&self) -> Result<()> {
        let current_count = self.idle_connections.lock().len();
        let needed = self.config.min_connections.saturating_sub(current_count);

        for _ in 0..needed {
            let conn = self.create_connection().await?;
            self.idle_connections.lock().push(conn);
        }

        debug!("Pre-created {} connections", needed);
        Ok(())
    }

    /// Create a new physical database connection
    async fn create_connection(&self) -> Result<PooledConnection> {
        let conn = self
            .db
            .connect()
            .map_err(|e| Error::Storage(format!("Failed to create connection: {}", e)))?;

        let pooled_conn = PooledConnection::new(conn);

        // Validate if enabled
        if self.config.enable_health_check {
            pooled_conn
                .validate()
                .await
                .map_err(|e| Error::Storage(format!("Connection health check failed: {}", e)))?;
        }

        // Update stats
        self.stats.lock().total_created += 1;
        self.stats.lock().cache_misses += 1;

        Ok(pooled_conn)
    }

    /// Check out a connection from the pool
    ///
    /// # Returns
    ///
    /// A guard that automatically returns the connection to the pool when dropped
    ///
    /// # Errors
    ///
    /// Returns error if timeout waiting for available connection or connection creation fails
    pub async fn get(&self) -> Result<ConnectionGuard> {
        // Acquire semaphore permit (limits concurrent checkouts)
        let permit = tokio::time::timeout(
            self.config.connection_timeout,
            self.semaphore.clone().acquire_owned(),
        )
        .await
        .map_err(|_| {
            Error::Storage(format!(
                "Connection pool timeout after {:?}",
                self.config.connection_timeout
            ))
        })?
        .map_err(|e| Error::Storage(format!("Failed to acquire permit: {}", e)))?;

        // Try to get an idle connection
        let mut pooled_conn = {
            let mut idle = self.idle_connections.lock();
            idle.pop()
        };

        let conn_id = if let Some(ref conn) = pooled_conn {
            // Reusing existing connection - cache hit
            debug!("Reusing connection {}", conn.id());
            self.stats.lock().cache_hits += 1;
            conn.id()
        } else {
            // No idle connection available - create new
            let new_conn = self.create_connection().await?;
            let id = new_conn.id();
            pooled_conn = Some(new_conn);
            id
        };

        // Mark as active
        self.active_connection_ids.lock().insert(conn_id);
        self.stats.lock().total_checkouts += 1;
        self.stats.lock().active_connections += 1;
        self.stats.lock().idle_connections = self.idle_connections.lock().len();

        // SAFETY: pooled_conn is guaranteed to be Some at this point:
        // - Either we got it from idle.pop() and it was Some
        // - Or we created a new connection and stored it in pooled_conn
        let connection = pooled_conn.ok_or_else(|| {
            Error::Storage("Failed to get connection from pool: connection is None".to_string())
        })?;

        Ok(ConnectionGuard {
            pool: self as *const Self as usize, // Store as pointer-sized integer
            connection: Some(connection),
            _permit: Some(permit),
        })
    }

    /// Return a connection to the pool
    fn return_connection(&self, mut connection: PooledConnection) {
        let conn_id = connection.id();

        debug!("Returning connection {} to pool", conn_id);

        // Mark as no longer active
        self.active_connection_ids.lock().remove(&conn_id);
        self.stats.lock().total_returns += 1;
        self.stats.lock().active_connections = self.active_connection_ids.lock().len();
        self.stats.lock().idle_connections = self.idle_connections.lock().len() + 1;

        // Update last-used time
        connection.touch();

        // Return to idle pool
        self.idle_connections.lock().push(connection);
    }

    /// Destroy a connection permanently
    fn destroy_connection(&self, connection: PooledConnection) {
        let conn_id = connection.id();

        debug!("Destroying connection {}", conn_id);

        // Mark as no longer active
        self.active_connection_ids.lock().remove(&conn_id);
        self.stats.lock().evictions += 1;

        // Invoke cleanup callback to clear prepared statement cache
        if let Some(callback) = self.cleanup_callback.lock().as_ref() {
            callback(conn_id);
        }

        // Connection is dropped here
    }

    /// Clean up idle connections that exceed max age or idle time
    pub fn cleanup_idle_connections(&self) -> usize {
        let mut idle = self.idle_connections.lock();
        let original_len = idle.len();

        // Retain only connections that are within limits
        idle.retain(|conn| {
            let age = conn.age();
            let idle_time = conn.idle_time();

            let should_keep =
                age < self.config.max_connection_age && idle_time < self.config.max_idle_time;

            if !should_keep {
                // Invoke cleanup callback for evicted connections
                if let Some(callback) = self.cleanup_callback.lock().as_ref() {
                    callback(conn.id());
                }
                self.stats.lock().evictions += 1;
            }

            should_keep
        });

        let evicted = original_len - idle.len();
        if evicted > 0 {
            info!(
                "Cleaned up {} idle connections (remaining: {})",
                evicted,
                idle.len()
            );
        }

        self.stats.lock().idle_connections = idle.len();
        evicted
    }

    /// Get current pool statistics
    pub fn stats(&self) -> CachingPoolStats {
        self.stats.lock().clone()
    }

    /// Get the cache hit rate
    pub fn cache_hit_rate(&self) -> f64 {
        let stats = self.stats.lock();
        let total = stats.cache_hits + stats.cache_misses;
        if total == 0 {
            0.0
        } else {
            stats.cache_hits as f64 / total as f64
        }
    }

    /// Get number of available connections
    pub fn available_connections(&self) -> usize {
        self.idle_connections.lock().len()
    }

    /// Get number of active (checked out) connections
    pub fn active_connections(&self) -> usize {
        self.active_connection_ids.lock().len()
    }
}

/// Guard for a checked-out connection
///
/// Automatically returns the connection to the pool when dropped.
pub struct ConnectionGuard {
    // Address of the owning `CachingPool`, stored as a pointer-sized integer
    // and dereferenced in `Drop`. NOTE(review): this does not tie the guard's
    // lifetime to the pool — the guard must not outlive the pool.
    pool: usize,
    // The checked-out connection; `None` once taken back in `Drop`.
    connection: Option<PooledConnection>,
    // Semaphore permit bounding concurrent checkouts; released on drop.
    _permit: Option<tokio::sync::OwnedSemaphorePermit>,
}

impl ConnectionGuard {
    /// Get the stable connection ID
    ///
    /// # Errors
    ///
    /// Returns an error if the connection has been taken (should not happen in normal usage).
    pub fn id(&self) -> do_memory_core::Result<u64> {
        self.connection.as_ref().map(|c| c.id()).ok_or_else(|| {
            do_memory_core::Error::Storage(
                "ConnectionGuard::id() called on guard without connection".to_string(),
            )
        })
    }

    /// Get a reference to the underlying connection
    ///
    /// # Errors
    ///
    /// Returns an error if the connection has been taken (should not happen in normal usage).
    pub fn connection(&self) -> do_memory_core::Result<&libsql::Connection> {
        self.connection
            .as_ref()
            .map(|c| c.connection())
            .ok_or_else(|| {
                do_memory_core::Error::Storage(
                    "ConnectionGuard::connection() called on guard without connection".to_string(),
                )
            })
    }

    /// Get the pooled connection wrapper
    pub fn pooled(&self) -> Option<&PooledConnection> {
        self.connection.as_ref()
    }
}

impl Drop for ConnectionGuard {
    fn drop(&mut self) {
        // Convert pointer back to reference.
        //
        // SAFETY / NOTE(review): this is sound only if the `CachingPool` is
        // still alive at the same address when the guard drops. Nothing in
        // the type system enforces that — the pool's address is stored as a
        // plain `usize`, so the borrow checker cannot tie the guard's
        // lifetime to the pool. Callers must guarantee guards never outlive
        // the pool; consider holding an `Arc<CachingPool>` here instead to
        // make this structurally safe.
        let pool = unsafe { &*(self.pool as *const CachingPool) };

        // Return connection to pool instead of destroying it. Both the
        // permit and the connection are taken (left as None), so a second
        // drop of this guard's fields is a no-op.
        if let (Some(_permit), Some(connection)) = (self._permit.take(), self.connection.take()) {
            pool.return_connection(connection);
        }
    }
}

// SAFETY: NOTE(review) — the original justification ("the pool reference is
// never accessed concurrently") does not by itself establish `Send`: sending
// the guard to another thread means `Drop` (and thus `return_connection`)
// runs on that thread, so everything the guard reaches — the pool address
// stashed as `usize` and the `PooledConnection` — must be safe to use from
// there. The pool's own state is behind `Mutex`es, which supports this, but
// confirm which field actually blocks the auto-derived `Send` (presumably
// `PooledConnection`) before relying on this impl.
unsafe impl Send for ConnectionGuard {}

#[cfg(test)]
#[path = "caching_pool_tests.rs"]
mod tests;