Skip to main content

do_memory_storage_turso/prepared/
cache.rs

1//! Connection-Aware Prepared Statement Cache
2//!
3//! This module provides a connection-aware prepared statement cache that:
4//! - Tracks prepared statements per connection (using connection ID)
5//! - Handles connection lifecycle (cleanup on connection close)
6//! - Implements LRU eviction tracking
7//! - Provides thread-safe operations
8//! - Tracks cache statistics (hits, misses, evictions)
9//!
10//! ## Architecture
11//!
12//! The cache uses a two-level structure:
13//! ```text
14//! ConnectionId -> { SQL -> CachedStatementMetadata }
15//! ```
16//!
17//! Note: Due to libsql::Statement not implementing Clone or Send, we cannot
18//! actually cache the statement objects. Instead, we cache metadata about
19//! prepared statements and track statistics. The real performance benefit
20//! comes from SQLite's internal statement cache.
21//!
22//! Each connection has its own cache of prepared statement metadata.
23//! When a connection is closed/returned to pool, its cache is cleared.
24
25use crate::pool::ConnectionId;
26#[path = "cache_types.rs"]
27mod types;
28use parking_lot::RwLock;
29use std::collections::HashMap;
30use std::time::{Duration, Instant};
31use tracing::{debug, trace, warn};
32use types::{CachedStatementMetadata, ConnectionCache};
33pub use types::{PreparedCacheConfig, PreparedCacheStats};
34
/// Maximum age for cached statements before a forced refresh (one hour).
const MAX_STATEMENT_AGE: Duration = Duration::from_secs(60 * 60);
37
38// ConnectionId is imported from crate::pool::ConnectionId (type alias for u64)
39
40/// Connection-aware prepared statement cache
41///
42/// This cache tracks prepared statements per connection, ensuring that
43/// statement metadata is associated with the correct connection.
44///
45/// ## Thread Safety
46///
47/// The cache uses `RwLock` for interior mutability, allowing concurrent
48/// reads and exclusive writes. All operations are thread-safe.
49///
50/// ## Connection Lifecycle
51///
52/// When a connection is returned to the pool or closed, its cache should
53/// be cleared by calling `clear_connection()`.
54pub struct PreparedStatementCache {
55    /// The cache storage: ConnectionId -> ConnectionCache
56    cache: RwLock<HashMap<ConnectionId, ConnectionCache>>,
57    /// Cache configuration
58    config: PreparedCacheConfig,
59    /// Statistics
60    stats: RwLock<PreparedCacheStats>,
61}
62
63impl PreparedStatementCache {
64    /// Create a new connection-aware prepared statement cache
65    ///
66    /// # Arguments
67    ///
68    /// * `max_size` - Maximum number of statements to cache per connection
69    ///
70    /// # Example
71    ///
72    /// ```rust
73    /// use do_memory_storage_turso::prepared::PreparedStatementCache;
74    ///
75    /// let cache = PreparedStatementCache::new(100);
76    /// ```
77    pub fn new(max_size: usize) -> Self {
78        Self {
79            cache: RwLock::new(HashMap::new()),
80            config: PreparedCacheConfig {
81                max_size,
82                ..Default::default()
83            },
84            stats: RwLock::new(PreparedCacheStats::default()),
85        }
86    }
87
88    /// Create a cache with custom configuration
89    pub fn with_config(config: PreparedCacheConfig) -> Self {
90        Self {
91            cache: RwLock::new(HashMap::new()),
92            config,
93            stats: RwLock::new(PreparedCacheStats::default()),
94        }
95    }
96
97    /// Get or create a connection ID for a connection
98    ///
99    /// This generates a unique ID for tracking the connection in the cache.
100    /// The ID should be stored alongside the connection and used for all
101    /// cache operations with that connection.
102    pub fn get_connection_id(&self) -> ConnectionId {
103        use std::sync::atomic::AtomicU64;
104        use std::sync::atomic::Ordering;
105
106        static NEXT_ID: AtomicU64 = AtomicU64::new(1);
107        NEXT_ID.fetch_add(1, Ordering::Relaxed)
108    }
109
110    /// Record a cache hit for a statement
111    ///
112    /// This should be called when a statement is found in SQLite's internal cache
113    /// or when the application determines a statement was reused.
114    ///
115    /// # Arguments
116    ///
117    /// * `conn_id` - Connection identifier
118    /// * `sql` - SQL statement that was hit
119    pub fn record_hit(&self, conn_id: ConnectionId, sql: &str) {
120        let mut cache = self.cache.write();
121
122        // Get or create connection cache
123        let conn_cache = cache.entry(conn_id).or_insert_with(|| {
124            debug!("Creating new connection cache for {:?}", conn_id);
125            ConnectionCache::new()
126        });
127
128        // Update the statement metadata if it exists
129        if let Some(stmt) = conn_cache.get(sql) {
130            stmt.increment_use();
131        }
132
133        drop(cache);
134        self.stats.write().record_hit();
135    }
136
137    /// Record a cache miss for a statement
138    ///
139    /// This should be called when a statement needs to be prepared.
140    ///
141    /// # Arguments
142    ///
143    /// * `conn_id` - Connection identifier
144    /// * `sql` - SQL statement that was missed
145    /// * `prepare_time_us` - Time taken to prepare the statement (microseconds)
146    pub fn record_miss(&self, conn_id: ConnectionId, sql: &str, prepare_time_us: u64) {
147        let mut cache = self.cache.write();
148
149        // Check if we need to evict at connection level
150        if cache.len() >= self.config.max_connections && !cache.contains_key(&conn_id) {
151            self.evict_lru_connection(&mut cache);
152        }
153
154        // Get or create connection cache
155        let conn_cache = cache.entry(conn_id).or_insert_with(|| {
156            debug!("Creating new connection cache for {:?}", conn_id);
157            ConnectionCache::new()
158        });
159
160        // Check if we need to evict at statement level
161        if conn_cache.len() >= self.config.max_size && !conn_cache.statements.contains_key(sql) {
162            if let Some(evicted) = conn_cache.evict_lru() {
163                debug!("Evicted cached statement: {}", evicted);
164                self.stats.write().record_eviction();
165            }
166        }
167
168        // Insert the new statement metadata
169        let metadata = CachedStatementMetadata::new(sql.to_string());
170        conn_cache.insert(sql.to_string(), metadata);
171
172        // Calculate sizes before dropping the lock
173        let total_size = cache.values().map(|c| c.len()).sum();
174        let connection_count = cache.len();
175
176        drop(cache);
177
178        // Update stats
179        let mut stats = self.stats.write();
180        stats.record_miss();
181        stats.record_prepared(prepare_time_us);
182        stats.update_size(total_size);
183        stats.update_active_connections(connection_count);
184
185        trace!("Recorded cache miss for SQL on {:?}: {}", conn_id, sql);
186    }
187
188    /// Check if a statement is cached for a connection
189    ///
190    /// # Arguments
191    ///
192    /// * `conn_id` - Connection identifier
193    /// * `sql` - SQL statement to check
194    ///
195    /// # Returns
196    ///
197    /// true if the statement is cached and doesn't need refresh
198    pub fn is_cached(&self, conn_id: ConnectionId, sql: &str) -> bool {
199        let mut cache = self.cache.write();
200
201        if let Some(conn_cache) = cache.get_mut(&conn_id) {
202            if let Some(stmt) = conn_cache.get(sql) {
203                return !stmt.needs_refresh(&self.config);
204            }
205        }
206
207        false
208    }
209
210    /// Get a prepared statement or prepare it if not cached
211    ///
212    /// This is a convenience method that generates a new connection ID for each call.
213    /// For proper connection-aware caching, use `get_connection_id()` and the
214    /// connection-specific methods instead.
215    ///
216    /// # Arguments
217    ///
218    /// * `conn` - Database connection to prepare on
219    /// * `sql` - SQL statement to prepare
220    ///
221    /// # Returns
222    ///
223    /// The prepared statement
224    ///
225    /// # Errors
226    ///
227    /// Returns error if statement preparation fails
228    pub async fn get_or_prepare(
229        &self,
230        conn: &libsql::Connection,
231        sql: &str,
232    ) -> Result<libsql::Statement, libsql::Error> {
233        let conn_id = self.get_connection_id();
234
235        // Check if this is a cache hit
236        if self.is_cached(conn_id, sql) {
237            self.record_hit(conn_id, sql);
238        }
239
240        // Prepare the statement
241        let start = Instant::now();
242        let stmt = conn.prepare(sql).await?;
243        let prepare_time_us = start.elapsed().as_micros() as u64;
244
245        // Record the miss (tracks metadata)
246        self.record_miss(conn_id, sql, prepare_time_us);
247
248        Ok(stmt)
249    }
250
251    /// Evict the least recently used connection cache
252    fn evict_lru_connection(&self, cache: &mut HashMap<ConnectionId, ConnectionCache>) {
253        if cache.is_empty() {
254            return;
255        }
256
257        // Find the connection with the oldest last access time
258        let mut oldest = None;
259        let mut oldest_time = Instant::now();
260
261        for (id, conn_cache) in cache.iter() {
262            if conn_cache.last_accessed < oldest_time {
263                oldest_time = conn_cache.last_accessed;
264                oldest = Some(*id);
265            }
266        }
267
268        if let Some(id) = oldest {
269            if cache.remove(&id).is_some() {
270                warn!(
271                    "Evicted connection cache for {:?} (max connections exceeded)",
272                    id
273                );
274                self.stats.write().record_connection_eviction();
275            }
276        }
277    }
278
279    /// Clear all cached statements for a specific connection
280    ///
281    /// This should be called when a connection is returned to the pool
282    /// or closed to prevent memory leaks.
283    ///
284    /// # Arguments
285    ///
286    /// * `conn_id` - Connection identifier to clear
287    ///
288    /// # Returns
289    ///
290    /// Number of statements cleared
291    pub fn clear_connection(&self, conn_id: ConnectionId) -> usize {
292        let mut cache = self.cache.write();
293        let cleared = if let Some(conn_cache) = cache.remove(&conn_id) {
294            let count = conn_cache.len();
295            debug!(
296                "Cleared {} cached statements for connection {:?}",
297                count, conn_id
298            );
299            count
300        } else {
301            0
302        };
303
304        // Update stats - calculate size while still holding the write lock to avoid deadlock
305        let total_size = cache.values().map(|c| c.len()).sum();
306        let active_connections = cache.len();
307        drop(cache);
308        let mut stats = self.stats.write();
309        stats.update_size(total_size);
310        stats.update_active_connections(active_connections);
311
312        cleared
313    }
314
315    /// Clear all cached statements across all connections
316    pub fn clear(&self) {
317        let mut cache = self.cache.write();
318        let total_statements: usize = cache.values().map(|c| c.len()).sum();
319        cache.clear();
320
321        let mut stats = self.stats.write();
322        stats.update_size(0);
323        stats.update_active_connections(0);
324
325        debug!(
326            "Cleared {} cached statements from {} connections",
327            total_statements,
328            cache.len()
329        );
330    }
331
332    /// Get current cache statistics
333    pub fn stats(&self) -> PreparedCacheStats {
334        self.stats.read().clone()
335    }
336
337    /// Get current total cache size (across all connections)
338    pub fn total_size(&self) -> usize {
339        self.cache.read().values().map(|c| c.len()).sum()
340    }
341
342    /// Get cache size for a specific connection
343    pub fn connection_size(&self, conn_id: ConnectionId) -> usize {
344        self.cache
345            .read()
346            .get(&conn_id)
347            .map(|c| c.len())
348            .unwrap_or(0)
349    }
350
351    /// Check if cache is empty
352    pub fn is_empty(&self) -> bool {
353        self.total_size() == 0
354    }
355
356    /// Get number of tracked connections
357    pub fn connection_count(&self) -> usize {
358        self.cache.read().len()
359    }
360
361    /// Remove a specific statement from a connection's cache
362    pub fn remove(&self, conn_id: ConnectionId, sql: &str) -> bool {
363        let mut cache = self.cache.write();
364        let removed = if let Some(conn_cache) = cache.get_mut(&conn_id) {
365            conn_cache.remove(sql)
366        } else {
367            false
368        };
369
370        if removed {
371            // Calculate size while still holding the write lock to avoid deadlock
372            let total_size = cache.values().map(|c| c.len()).sum();
373            let active_connections = cache.len();
374            drop(cache);
375            let mut stats = self.stats.write();
376            stats.update_size(total_size);
377            stats.update_active_connections(active_connections);
378        }
379
380        removed
381    }
382
383    /// Clean up idle connection caches
384    ///
385    /// Removes connection caches that haven't been accessed for longer than the threshold.
386    /// This helps prevent memory leaks from abandoned connection IDs.
387    ///
388    /// # Arguments
389    ///
390    /// * `max_idle_duration` - Maximum idle time before cleanup
391    ///
392    /// # Returns
393    ///
394    /// Number of connections cleaned up
395    pub fn cleanup_idle_connections(&self, max_idle_duration: Duration) -> usize {
396        let mut cache = self.cache.write();
397        let mut to_remove = Vec::new();
398
399        for (id, conn_cache) in cache.iter() {
400            if conn_cache.idle_time() > max_idle_duration {
401                to_remove.push(*id);
402            }
403        }
404
405        let count = to_remove.len();
406        for id in to_remove {
407            cache.remove(&id);
408            debug!("Cleaned up idle connection cache for {:?}", id);
409        }
410
411        if count > 0 {
412            // Calculate size while still holding the write lock to avoid deadlock
413            let total_size = cache.values().map(|c| c.len()).sum();
414            let active_connections = cache.len();
415            drop(cache);
416            let mut stats = self.stats.write();
417            stats.update_size(total_size);
418            stats.update_active_connections(active_connections);
419            stats.connection_evictions += count as u64;
420        }
421
422        count
423    }
424}
425
426impl Default for PreparedStatementCache {
427    fn default() -> Self {
428        Self::new(100)
429    }
430}
431
432#[cfg(test)]
433#[path = "cache_tests.rs"]
434mod tests;