do_memory_storage_turso/prepared/cache.rs
1//! Connection-Aware Prepared Statement Cache
2//!
3//! This module provides a connection-aware prepared statement cache that:
4//! - Tracks prepared statements per connection (using connection ID)
5//! - Handles connection lifecycle (cleanup on connection close)
6//! - Implements LRU eviction tracking
7//! - Provides thread-safe operations
8//! - Tracks cache statistics (hits, misses, evictions)
9//!
10//! ## Architecture
11//!
12//! The cache uses a two-level structure:
13//! ```text
14//! ConnectionId -> { SQL -> CachedStatementMetadata }
15//! ```
16//!
17//! Note: Due to libsql::Statement not implementing Clone or Send, we cannot
18//! actually cache the statement objects. Instead, we cache metadata about
19//! prepared statements and track statistics. The real performance benefit
20//! comes from SQLite's internal statement cache.
21//!
22//! Each connection has its own cache of prepared statement metadata.
23//! When a connection is closed/returned to pool, its cache is cleared.
24
25use crate::pool::ConnectionId;
26#[path = "cache_types.rs"]
27mod types;
28use parking_lot::RwLock;
29use std::collections::HashMap;
30use std::time::{Duration, Instant};
31use tracing::{debug, trace, warn};
32use types::{CachedStatementMetadata, ConnectionCache};
33pub use types::{PreparedCacheConfig, PreparedCacheStats};
34
/// Maximum age for cached statements before forced refresh (1 hour)
///
/// NOTE(review): this constant is not referenced anywhere in this file;
/// presumably the `types` submodule consumes it via `super::` (e.g. in
/// `needs_refresh`) — confirm, or consider folding it into
/// `PreparedCacheConfig` so the refresh policy lives in one place.
const MAX_STATEMENT_AGE: Duration = Duration::from_secs(3600);
37
38// ConnectionId is imported from crate::pool::ConnectionId (type alias for u64)
39
/// Connection-aware prepared statement cache
///
/// This cache tracks prepared statements per connection, ensuring that
/// statement metadata is associated with the correct connection.
///
/// ## Thread Safety
///
/// The cache uses `RwLock` for interior mutability, allowing concurrent
/// reads and exclusive writes. All operations are thread-safe.
///
/// Lock ordering: every method in this module acquires `cache` before
/// `stats` (or takes `stats` only after dropping `cache`); no code path
/// takes them in the reverse order, so the two locks cannot deadlock.
///
/// ## Connection Lifecycle
///
/// When a connection is returned to the pool or closed, its cache should
/// be cleared by calling `clear_connection()`.
pub struct PreparedStatementCache {
    /// The cache storage: ConnectionId -> ConnectionCache
    /// (one metadata cache per tracked connection).
    cache: RwLock<HashMap<ConnectionId, ConnectionCache>>,
    /// Cache configuration: per-connection `max_size`, the
    /// `max_connections` limit, and the statement-refresh policy.
    config: PreparedCacheConfig,
    /// Aggregated statistics (hits, misses, evictions, sizes); kept in its
    /// own lock so read-only stat queries don't contend with cache writes.
    stats: RwLock<PreparedCacheStats>,
}
62
63impl PreparedStatementCache {
64 /// Create a new connection-aware prepared statement cache
65 ///
66 /// # Arguments
67 ///
68 /// * `max_size` - Maximum number of statements to cache per connection
69 ///
70 /// # Example
71 ///
72 /// ```rust
73 /// use do_memory_storage_turso::prepared::PreparedStatementCache;
74 ///
75 /// let cache = PreparedStatementCache::new(100);
76 /// ```
77 pub fn new(max_size: usize) -> Self {
78 Self {
79 cache: RwLock::new(HashMap::new()),
80 config: PreparedCacheConfig {
81 max_size,
82 ..Default::default()
83 },
84 stats: RwLock::new(PreparedCacheStats::default()),
85 }
86 }
87
88 /// Create a cache with custom configuration
89 pub fn with_config(config: PreparedCacheConfig) -> Self {
90 Self {
91 cache: RwLock::new(HashMap::new()),
92 config,
93 stats: RwLock::new(PreparedCacheStats::default()),
94 }
95 }
96
97 /// Get or create a connection ID for a connection
98 ///
99 /// This generates a unique ID for tracking the connection in the cache.
100 /// The ID should be stored alongside the connection and used for all
101 /// cache operations with that connection.
102 pub fn get_connection_id(&self) -> ConnectionId {
103 use std::sync::atomic::AtomicU64;
104 use std::sync::atomic::Ordering;
105
106 static NEXT_ID: AtomicU64 = AtomicU64::new(1);
107 NEXT_ID.fetch_add(1, Ordering::Relaxed)
108 }
109
110 /// Record a cache hit for a statement
111 ///
112 /// This should be called when a statement is found in SQLite's internal cache
113 /// or when the application determines a statement was reused.
114 ///
115 /// # Arguments
116 ///
117 /// * `conn_id` - Connection identifier
118 /// * `sql` - SQL statement that was hit
119 pub fn record_hit(&self, conn_id: ConnectionId, sql: &str) {
120 let mut cache = self.cache.write();
121
122 // Get or create connection cache
123 let conn_cache = cache.entry(conn_id).or_insert_with(|| {
124 debug!("Creating new connection cache for {:?}", conn_id);
125 ConnectionCache::new()
126 });
127
128 // Update the statement metadata if it exists
129 if let Some(stmt) = conn_cache.get(sql) {
130 stmt.increment_use();
131 }
132
133 drop(cache);
134 self.stats.write().record_hit();
135 }
136
    /// Record a cache miss for a statement
    ///
    /// This should be called when a statement needs to be prepared.
    /// It inserts (or overwrites) the statement's metadata in the
    /// per-connection cache, performing LRU eviction at both the
    /// connection level and the statement level when the configured
    /// limits would otherwise be exceeded, then refreshes the stats.
    ///
    /// # Arguments
    ///
    /// * `conn_id` - Connection identifier
    /// * `sql` - SQL statement that was missed
    /// * `prepare_time_us` - Time taken to prepare the statement (microseconds)
    pub fn record_miss(&self, conn_id: ConnectionId, sql: &str, prepare_time_us: u64) {
        let mut cache = self.cache.write();

        // Connection-level eviction: only needed when this miss would add
        // a brand-new connection entry while already at the limit.
        if cache.len() >= self.config.max_connections && !cache.contains_key(&conn_id) {
            self.evict_lru_connection(&mut cache);
        }

        // Get or create connection cache
        let conn_cache = cache.entry(conn_id).or_insert_with(|| {
            debug!("Creating new connection cache for {:?}", conn_id);
            ConnectionCache::new()
        });

        // Statement-level eviction: only when inserting a new SQL string
        // would push this connection's cache past its max_size.
        if conn_cache.len() >= self.config.max_size && !conn_cache.statements.contains_key(sql) {
            if let Some(evicted) = conn_cache.evict_lru() {
                debug!("Evicted cached statement: {}", evicted);
                // `stats` is locked while `cache` is still held; safe
                // because every method takes the locks in this order.
                self.stats.write().record_eviction();
            }
        }

        // Insert the new statement metadata
        let metadata = CachedStatementMetadata::new(sql.to_string());
        conn_cache.insert(sql.to_string(), metadata);

        // Snapshot sizes while the cache lock is held so the stats see a
        // consistent view, then release it before updating the stats.
        let total_size = cache.values().map(|c| c.len()).sum();
        let connection_count = cache.len();

        drop(cache);

        // Update stats
        let mut stats = self.stats.write();
        stats.record_miss();
        stats.record_prepared(prepare_time_us);
        stats.update_size(total_size);
        stats.update_active_connections(connection_count);

        trace!("Recorded cache miss for SQL on {:?}: {}", conn_id, sql);
    }
187
188 /// Check if a statement is cached for a connection
189 ///
190 /// # Arguments
191 ///
192 /// * `conn_id` - Connection identifier
193 /// * `sql` - SQL statement to check
194 ///
195 /// # Returns
196 ///
197 /// true if the statement is cached and doesn't need refresh
198 pub fn is_cached(&self, conn_id: ConnectionId, sql: &str) -> bool {
199 let mut cache = self.cache.write();
200
201 if let Some(conn_cache) = cache.get_mut(&conn_id) {
202 if let Some(stmt) = conn_cache.get(sql) {
203 return !stmt.needs_refresh(&self.config);
204 }
205 }
206
207 false
208 }
209
210 /// Get a prepared statement or prepare it if not cached
211 ///
212 /// This is a convenience method that generates a new connection ID for each call.
213 /// For proper connection-aware caching, use `get_connection_id()` and the
214 /// connection-specific methods instead.
215 ///
216 /// # Arguments
217 ///
218 /// * `conn` - Database connection to prepare on
219 /// * `sql` - SQL statement to prepare
220 ///
221 /// # Returns
222 ///
223 /// The prepared statement
224 ///
225 /// # Errors
226 ///
227 /// Returns error if statement preparation fails
228 pub async fn get_or_prepare(
229 &self,
230 conn: &libsql::Connection,
231 sql: &str,
232 ) -> Result<libsql::Statement, libsql::Error> {
233 let conn_id = self.get_connection_id();
234
235 // Check if this is a cache hit
236 if self.is_cached(conn_id, sql) {
237 self.record_hit(conn_id, sql);
238 }
239
240 // Prepare the statement
241 let start = Instant::now();
242 let stmt = conn.prepare(sql).await?;
243 let prepare_time_us = start.elapsed().as_micros() as u64;
244
245 // Record the miss (tracks metadata)
246 self.record_miss(conn_id, sql, prepare_time_us);
247
248 Ok(stmt)
249 }
250
251 /// Evict the least recently used connection cache
252 fn evict_lru_connection(&self, cache: &mut HashMap<ConnectionId, ConnectionCache>) {
253 if cache.is_empty() {
254 return;
255 }
256
257 // Find the connection with the oldest last access time
258 let mut oldest = None;
259 let mut oldest_time = Instant::now();
260
261 for (id, conn_cache) in cache.iter() {
262 if conn_cache.last_accessed < oldest_time {
263 oldest_time = conn_cache.last_accessed;
264 oldest = Some(*id);
265 }
266 }
267
268 if let Some(id) = oldest {
269 if cache.remove(&id).is_some() {
270 warn!(
271 "Evicted connection cache for {:?} (max connections exceeded)",
272 id
273 );
274 self.stats.write().record_connection_eviction();
275 }
276 }
277 }
278
279 /// Clear all cached statements for a specific connection
280 ///
281 /// This should be called when a connection is returned to the pool
282 /// or closed to prevent memory leaks.
283 ///
284 /// # Arguments
285 ///
286 /// * `conn_id` - Connection identifier to clear
287 ///
288 /// # Returns
289 ///
290 /// Number of statements cleared
291 pub fn clear_connection(&self, conn_id: ConnectionId) -> usize {
292 let mut cache = self.cache.write();
293 let cleared = if let Some(conn_cache) = cache.remove(&conn_id) {
294 let count = conn_cache.len();
295 debug!(
296 "Cleared {} cached statements for connection {:?}",
297 count, conn_id
298 );
299 count
300 } else {
301 0
302 };
303
304 // Update stats - calculate size while still holding the write lock to avoid deadlock
305 let total_size = cache.values().map(|c| c.len()).sum();
306 let active_connections = cache.len();
307 drop(cache);
308 let mut stats = self.stats.write();
309 stats.update_size(total_size);
310 stats.update_active_connections(active_connections);
311
312 cleared
313 }
314
315 /// Clear all cached statements across all connections
316 pub fn clear(&self) {
317 let mut cache = self.cache.write();
318 let total_statements: usize = cache.values().map(|c| c.len()).sum();
319 cache.clear();
320
321 let mut stats = self.stats.write();
322 stats.update_size(0);
323 stats.update_active_connections(0);
324
325 debug!(
326 "Cleared {} cached statements from {} connections",
327 total_statements,
328 cache.len()
329 );
330 }
331
332 /// Get current cache statistics
333 pub fn stats(&self) -> PreparedCacheStats {
334 self.stats.read().clone()
335 }
336
337 /// Get current total cache size (across all connections)
338 pub fn total_size(&self) -> usize {
339 self.cache.read().values().map(|c| c.len()).sum()
340 }
341
342 /// Get cache size for a specific connection
343 pub fn connection_size(&self, conn_id: ConnectionId) -> usize {
344 self.cache
345 .read()
346 .get(&conn_id)
347 .map(|c| c.len())
348 .unwrap_or(0)
349 }
350
351 /// Check if cache is empty
352 pub fn is_empty(&self) -> bool {
353 self.total_size() == 0
354 }
355
356 /// Get number of tracked connections
357 pub fn connection_count(&self) -> usize {
358 self.cache.read().len()
359 }
360
361 /// Remove a specific statement from a connection's cache
362 pub fn remove(&self, conn_id: ConnectionId, sql: &str) -> bool {
363 let mut cache = self.cache.write();
364 let removed = if let Some(conn_cache) = cache.get_mut(&conn_id) {
365 conn_cache.remove(sql)
366 } else {
367 false
368 };
369
370 if removed {
371 // Calculate size while still holding the write lock to avoid deadlock
372 let total_size = cache.values().map(|c| c.len()).sum();
373 let active_connections = cache.len();
374 drop(cache);
375 let mut stats = self.stats.write();
376 stats.update_size(total_size);
377 stats.update_active_connections(active_connections);
378 }
379
380 removed
381 }
382
383 /// Clean up idle connection caches
384 ///
385 /// Removes connection caches that haven't been accessed for longer than the threshold.
386 /// This helps prevent memory leaks from abandoned connection IDs.
387 ///
388 /// # Arguments
389 ///
390 /// * `max_idle_duration` - Maximum idle time before cleanup
391 ///
392 /// # Returns
393 ///
394 /// Number of connections cleaned up
395 pub fn cleanup_idle_connections(&self, max_idle_duration: Duration) -> usize {
396 let mut cache = self.cache.write();
397 let mut to_remove = Vec::new();
398
399 for (id, conn_cache) in cache.iter() {
400 if conn_cache.idle_time() > max_idle_duration {
401 to_remove.push(*id);
402 }
403 }
404
405 let count = to_remove.len();
406 for id in to_remove {
407 cache.remove(&id);
408 debug!("Cleaned up idle connection cache for {:?}", id);
409 }
410
411 if count > 0 {
412 // Calculate size while still holding the write lock to avoid deadlock
413 let total_size = cache.values().map(|c| c.len()).sum();
414 let active_connections = cache.len();
415 drop(cache);
416 let mut stats = self.stats.write();
417 stats.update_size(total_size);
418 stats.update_active_connections(active_connections);
419 stats.connection_evictions += count as u64;
420 }
421
422 count
423 }
424}
425
impl Default for PreparedStatementCache {
    /// Default cache: up to 100 statements per connection (see `new`).
    fn default() -> Self {
        Self::new(100)
    }
}
431
432#[cfg(test)]
433#[path = "cache_tests.rs"]
434mod tests;