Skip to main content

cqlite_core/memory/
mod.rs

1//! Memory management for CQLite
2
3use lru::LruCache;
4use parking_lot::RwLock;
5use std::collections::HashMap;
6use std::num::NonZeroUsize;
7use std::sync::Arc;
8
9use crate::{types::TableId, Config, Result, Value};
10
11/// Memory manager for caching and buffer management
12#[derive(Debug)]
13pub struct MemoryManager {
14    /// Block cache for storage blocks
15    block_cache: Arc<RwLock<BlockCache>>,
16
17    /// Row cache for frequently accessed rows
18    row_cache: Arc<RwLock<RowCache>>,
19
20    /// Buffer pool for memory allocation
21    buffer_pool: Arc<RwLock<BufferPool>>,
22
23    /// Memory statistics
24    stats: Arc<RwLock<MemoryStats>>,
25}
26
27/// Block cache for storage blocks
28struct BlockCache {
29    /// LRU cache for blocks (provides O(1) get/put)
30    cache: LruCache<BlockKey, Arc<Block>>,
31
32    /// Maximum cache size in bytes
33    max_size: usize,
34
35    /// Current size in bytes
36    current_size: usize,
37}
38
39/// Row cache for frequently accessed rows
40struct RowCache {
41    /// LRU cache for rows (provides O(1) get/put)
42    cache: LruCache<RowKey, Arc<CachedRow>>,
43
44    /// Maximum cache size in bytes
45    max_size: usize,
46
47    /// Current size in bytes
48    current_size: usize,
49}
50
/// Bookkeeping for reusable byte buffers.
#[derive(Debug)]
struct BufferPool {
    /// Returned buffers waiting for reuse, bucketed by their length in bytes.
    free_buffers: HashMap<usize, Vec<Vec<u8>>>,

    /// Number of buffers currently handed out to callers.
    allocated_count: usize,

    /// Bytes currently handed out to callers.
    total_memory: usize,

    /// Upper bound for `total_memory`, in bytes.
    max_memory: usize,
}
66
67/// Block key for cache lookup
68#[derive(Debug, Clone, Hash, PartialEq, Eq)]
69struct BlockKey {
70    table_id: TableId,
71    block_id: u64,
72}
73
74/// Row key for cache lookup
75#[derive(Debug, Clone, Hash, PartialEq, Eq)]
76struct RowKey {
77    table_id: TableId,
78    row_key: String,
79}
80
/// Entry stored in the block cache.
#[derive(Debug)]
struct Block {
    /// Size of the block in bytes; this is what the cache's byte budget counts.
    size: usize,

    /// Instant recorded when the entry was built (reserved for future LRU
    /// enhancements; the underscore marks it as not read yet).
    _last_access: std::time::Instant,
}
90
91/// Cached row
92#[derive(Debug)]
93struct CachedRow {
94    /// Row data
95    _data: Vec<Value>,
96
97    /// Row size estimate
98    size: usize,
99}
100
101impl MemoryManager {
102    /// Create a new memory manager
103    pub fn new(config: &Config) -> Result<Self> {
104        let block_cache = Arc::new(RwLock::new(BlockCache::new(
105            config.memory.block_cache.max_size as usize,
106        )));
107        let row_cache = Arc::new(RwLock::new(RowCache::new(
108            config.memory.row_cache.max_size as usize,
109        )));
110        let buffer_pool = Arc::new(RwLock::new(BufferPool::new(
111            config.memory.max_memory as usize,
112        )));
113
114        Ok(Self {
115            block_cache,
116            row_cache,
117            buffer_pool,
118            stats: Arc::new(RwLock::new(MemoryStats::default())),
119        })
120    }
121
122    /// Get a block from cache
123    pub fn get_block(&self, table_id: &TableId, block_id: u64) -> Option<Arc<Block>> {
124        let key = BlockKey {
125            table_id: table_id.clone(),
126            block_id,
127        };
128
129        let mut cache = self.block_cache.write();
130
131        // LruCache::get() is O(1) and automatically updates LRU order
132        if let Some(block) = cache.cache.get(&key) {
133            // Update stats
134            {
135                let mut stats = self.stats.write();
136                stats.block_cache_hits += 1;
137            }
138
139            Some(Arc::clone(block))
140        } else {
141            // Update stats
142            {
143                let mut stats = self.stats.write();
144                stats.block_cache_misses += 1;
145            }
146
147            None
148        }
149    }
150
151    /// Put a block in cache
152    pub fn put_block(&self, table_id: &TableId, block_id: u64, data: Vec<u8>) {
153        let key = BlockKey {
154            table_id: table_id.clone(),
155            block_id,
156        };
157
158        let block = Arc::new(Block {
159            size: data.len(),
160            _last_access: std::time::Instant::now(),
161        });
162
163        let mut cache = self.block_cache.write();
164
165        // Evict LRU entries until we have space
166        while cache.current_size + block.size > cache.max_size {
167            // LruCache::pop_lru() is O(1) and removes the least recently used entry
168            if let Some((_, evicted_block)) = cache.cache.pop_lru() {
169                cache.current_size -= evicted_block.size;
170            } else {
171                // Cache is empty, stop eviction
172                break;
173            }
174        }
175
176        // LruCache::put() is O(1) and automatically updates LRU order
177        cache.current_size += block.size;
178        cache.cache.put(key, block);
179    }
180
181    /// Get a row from cache
182    pub fn get_row(&self, table_id: &TableId, row_key: &str) -> Option<Arc<CachedRow>> {
183        let key = RowKey {
184            table_id: table_id.clone(),
185            row_key: row_key.to_string(),
186        };
187
188        let mut cache = self.row_cache.write();
189
190        // LruCache::get() is O(1) and automatically updates LRU order
191        if let Some(row) = cache.cache.get(&key) {
192            // Update stats
193            {
194                let mut stats = self.stats.write();
195                stats.row_cache_hits += 1;
196            }
197
198            Some(Arc::clone(row))
199        } else {
200            // Update stats
201            {
202                let mut stats = self.stats.write();
203                stats.row_cache_misses += 1;
204            }
205
206            None
207        }
208    }
209
210    /// Put a row in cache
211    pub fn put_row(&self, table_id: &TableId, row_key: &str, data: Vec<Value>) {
212        let key = RowKey {
213            table_id: table_id.clone(),
214            row_key: row_key.to_string(),
215        };
216
217        let size = self.estimate_row_size(&data);
218        let row = Arc::new(CachedRow { _data: data, size });
219
220        let mut cache = self.row_cache.write();
221
222        // Evict LRU entries until we have space
223        while cache.current_size + row.size > cache.max_size {
224            // LruCache::pop_lru() is O(1) and removes the least recently used entry
225            if let Some((_, evicted_row)) = cache.cache.pop_lru() {
226                cache.current_size -= evicted_row.size;
227            } else {
228                // Cache is empty, stop eviction
229                break;
230            }
231        }
232
233        // LruCache::put() is O(1) and automatically updates LRU order
234        cache.current_size += row.size;
235        cache.cache.put(key, row);
236    }
237
238    /// Allocate buffer from pool
239    pub fn allocate_buffer(&self, size: usize) -> Result<Vec<u8>> {
240        let mut pool = self.buffer_pool.write();
241
242        if let Some(buffers) = pool.free_buffers.get_mut(&size) {
243            if let Some(buffer) = buffers.pop() {
244                pool.allocated_count += 1;
245                pool.total_memory += size;
246
247                // Update stats
248                let mut stats = self.stats.write();
249                stats.buffer_allocations += 1;
250                stats.total_memory_used = pool.total_memory;
251
252                return Ok(buffer);
253            }
254        }
255
256        // Check memory limit before allocating new buffer
257        if pool.total_memory + size > pool.max_memory {
258            return Err(crate::Error::Memory(format!(
259                "Memory limit exceeded: requested {} bytes would exceed limit of {} bytes (current usage: {} bytes)",
260                size, pool.max_memory, pool.total_memory
261            )));
262        }
263
264        // Allocate new buffer
265        pool.allocated_count += 1;
266        pool.total_memory += size;
267
268        // Update stats
269        let mut stats = self.stats.write();
270        stats.buffer_allocations += 1;
271        stats.total_memory_used = pool.total_memory;
272
273        Ok(vec![0u8; size])
274    }
275
276    /// Return buffer to pool
277    pub fn deallocate_buffer(&self, mut buffer: Vec<u8>) {
278        let size = buffer.len();
279        buffer.clear();
280        // Don't shrink_to_fit() as we want to preserve capacity for reuse
281        buffer.resize(size, 0);
282
283        let mut pool = self.buffer_pool.write();
284        pool.total_memory -= size;
285        pool.free_buffers.entry(size).or_default().push(buffer);
286        pool.allocated_count -= 1;
287
288        // Update stats
289        let mut stats = self.stats.write();
290        stats.buffer_deallocations += 1;
291        stats.total_memory_used = pool.total_memory;
292    }
293
294    /// Get memory statistics
295    pub fn stats(&self) -> Result<MemoryStats> {
296        let stats = self.stats.read();
297        Ok(stats.clone())
298    }
299
300    /// Clear all caches
301    pub fn clear_caches(&self) {
302        {
303            let mut cache = self.block_cache.write();
304            cache.cache.clear();
305            cache.current_size = 0;
306        }
307
308        {
309            let mut cache = self.row_cache.write();
310            cache.cache.clear();
311            cache.current_size = 0;
312        }
313    }
314
315    /// Estimate row size
316    fn estimate_row_size(&self, data: &[Value]) -> usize {
317        data.iter().map(|v| self.estimate_value_size(v)).sum()
318    }
319
320    /// Estimate value size
321    #[allow(clippy::only_used_in_recursion)]
322    fn estimate_value_size(&self, value: &Value) -> usize {
323        match value {
324            Value::Null => 1,
325            Value::Boolean(_) => 1,
326            Value::Integer(_) => 4,
327            Value::BigInt(_) => 8,
328            Value::Counter(_) => 8,
329            Value::Float(_) => 8,
330            Value::Text(s) => s.len(),
331            Value::Blob(b) => b.len(),
332            Value::Timestamp(_) => 8,
333            Value::Date(_) => 4,
334            Value::Time(_) => 8,
335            Value::Uuid(_) => 16,
336            Value::Inet(bytes) => bytes.len(),
337            Value::Json(json) => json.to_string().len(),
338            Value::List(items) => items.iter().map(|v| self.estimate_value_size(v)).sum(),
339            Value::Map(map) => map
340                .iter()
341                .map(|(k, v)| self.estimate_value_size(k) + self.estimate_value_size(v))
342                .sum(),
343            Value::TinyInt(_) => 1,
344            Value::SmallInt(_) => 2,
345            Value::Float32(_) => 4,
346            Value::Set(items) => items.iter().map(|v| self.estimate_value_size(v)).sum(),
347            Value::Tuple(items) => items.iter().map(|v| self.estimate_value_size(v)).sum(),
348            Value::Udt(udt) => udt
349                .fields
350                .iter()
351                .map(|f| f.value.as_ref().map_or(0, |v| self.estimate_value_size(v)))
352                .sum(),
353            Value::Frozen(boxed_value) => self.estimate_value_size(boxed_value),
354            Value::Varint(data) => data.len(),
355            Value::Decimal { unscaled, .. } => 4 + unscaled.len(), // scale + unscaled data
356            Value::Duration { .. } => 12,                          // 3 * 4 bytes
357            Value::Tombstone(_) => 16, // timestamp + type + optional TTL
358        }
359    }
360}
361
362impl std::fmt::Debug for BlockCache {
363    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
364        f.debug_struct("BlockCache")
365            .field("max_size", &self.max_size)
366            .field("current_size", &self.current_size)
367            .field("cache_len", &self.cache.len())
368            .finish()
369    }
370}
371
372impl BlockCache {
373    fn new(max_size: usize) -> Self {
374        // LruCache requires NonZeroUsize for capacity
375        // We use a reasonable default capacity (1000 entries) for the LRU structure
376        // The actual memory limit is enforced separately via max_size
377        let capacity = NonZeroUsize::new(1000).expect("capacity must be non-zero");
378        Self {
379            cache: LruCache::new(capacity),
380            max_size,
381            current_size: 0,
382        }
383    }
384}
385
386impl std::fmt::Debug for RowCache {
387    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
388        f.debug_struct("RowCache")
389            .field("max_size", &self.max_size)
390            .field("current_size", &self.current_size)
391            .field("cache_len", &self.cache.len())
392            .finish()
393    }
394}
395
396impl RowCache {
397    fn new(max_size: usize) -> Self {
398        // LruCache requires NonZeroUsize for capacity
399        // We use a reasonable default capacity (1000 entries) for the LRU structure
400        // The actual memory limit is enforced separately via max_size
401        let capacity = NonZeroUsize::new(1000).expect("capacity must be non-zero");
402        Self {
403            cache: LruCache::new(capacity),
404            max_size,
405            current_size: 0,
406        }
407    }
408}
409
410impl BufferPool {
411    fn new(max_memory: usize) -> Self {
412        Self {
413            free_buffers: HashMap::new(),
414            allocated_count: 0,
415            total_memory: 0,
416            max_memory,
417        }
418    }
419}
420
/// Counters describing cache effectiveness and buffer-pool activity.
#[derive(Debug, Clone, Default)]
pub struct MemoryStats {
    /// Number of block lookups that found an entry.
    pub block_cache_hits: u64,

    /// Number of block lookups that found nothing.
    pub block_cache_misses: u64,

    /// Number of row lookups that found an entry.
    pub row_cache_hits: u64,

    /// Number of row lookups that found nothing.
    pub row_cache_misses: u64,

    /// Bytes currently handed out by the buffer pool.
    pub total_memory_used: usize,

    /// Number of buffers handed out by the pool.
    pub buffer_allocations: u64,

    /// Number of buffers returned to the pool.
    pub buffer_deallocations: u64,
}

impl MemoryStats {
    /// Fraction of block lookups that were hits, in `0.0..=1.0`.
    ///
    /// Returns `0.0` when no lookups have been recorded.
    pub fn block_cache_hit_rate(&self) -> f64 {
        Self::hit_rate(self.block_cache_hits, self.block_cache_misses)
    }

    /// Fraction of row lookups that were hits, in `0.0..=1.0`.
    ///
    /// Returns `0.0` when no lookups have been recorded.
    pub fn row_cache_hit_rate(&self) -> f64 {
        Self::hit_rate(self.row_cache_hits, self.row_cache_misses)
    }

    /// Hits divided by total lookups, or `0.0` when there were no lookups.
    fn hit_rate(hits: u64, misses: u64) -> f64 {
        match hits + misses {
            0 => 0.0,
            lookups => hits as f64 / lookups as f64,
        }
    }
}
467
#[cfg(test)]
mod tests {
    use super::*;
    use crate::types::TableId;

    /// One mebibyte, for sizing buffer-pool requests.
    const MIB: usize = 1024 * 1024;

    /// Manager built from the default configuration.
    fn default_manager() -> MemoryManager {
        let config = Config::default();
        MemoryManager::new(&config).unwrap()
    }

    /// Manager whose buffer pool is capped at 128MB.
    fn manager_capped_at_128mb() -> MemoryManager {
        let mut config = Config::default();
        config.memory.max_memory = 128 * 1024 * 1024; // 128MB
        MemoryManager::new(&config).unwrap()
    }

    /// If `result` is an error, check that its message names the memory limit.
    fn check_limit_message(result: Result<Vec<u8>>) {
        if let Err(e) = result {
            let err_msg = e.to_string();
            assert!(
                err_msg.contains("Memory limit exceeded"),
                "error should mention memory limit"
            );
        }
    }

    #[test]
    fn test_memory_manager_creation() {
        let manager = default_manager();

        let stats = manager.stats().unwrap();
        assert_eq!(stats.block_cache_hits, 0);
        assert_eq!(stats.block_cache_misses, 0);
    }

    #[test]
    fn test_block_cache() {
        let manager = default_manager();

        let table_id = TableId::new("test_table");
        let block_id = 1;
        let data: Vec<u8> = vec![1, 2, 3, 4, 5];

        // Nothing cached yet
        assert!(manager.get_block(&table_id, block_id).is_none());

        // Store the block
        manager.put_block(&table_id, block_id, data.clone());

        // Now the lookup succeeds and reports the stored length
        let cached = manager.get_block(&table_id, block_id);
        assert!(cached.is_some());
        assert_eq!(cached.unwrap().size, data.len());
    }

    #[test]
    fn test_block_cache_eviction_updates_stats() {
        let mut config = Config::default();
        config.memory.block_cache.max_size = 8;
        let manager = MemoryManager::new(&config).unwrap();

        let table_id = TableId::new("ks_table");

        manager.put_block(&table_id, 1, vec![0u8; 8]);
        manager.put_block(&table_id, 2, vec![0u8; 4]); // triggers eviction of block 1

        assert!(manager.get_block(&table_id, 1).is_none());
        assert!(manager.get_block(&table_id, 2).is_some());

        let stats = manager.stats().unwrap();
        assert_eq!(stats.block_cache_hits, 1);
        assert_eq!(stats.block_cache_misses, 1);
    }

    #[test]
    fn test_row_cache() {
        let manager = default_manager();

        let table_id = TableId::new("test_table");
        let row_key = "test_key";
        let data = vec![Value::Integer(42), Value::Text("hello".to_string())];

        // Nothing cached yet
        assert!(manager.get_row(&table_id, row_key).is_none());

        // Store the row
        manager.put_row(&table_id, row_key, data.clone());

        // Now the lookup succeeds and returns the stored values
        let cached = manager.get_row(&table_id, row_key);
        assert!(cached.is_some());
        assert_eq!(cached.unwrap()._data, data);
    }

    #[test]
    fn test_row_cache_eviction_and_stats() {
        let mut config = Config::default();
        config.memory.row_cache.max_size = 8;
        let manager = MemoryManager::new(&config).unwrap();

        let table_id = TableId::new("ks_table");

        // Three 4-byte rows against an 8-byte budget: the oldest must go.
        manager.put_row(&table_id, "k1", vec![Value::Text("abcd".into())]);
        manager.put_row(&table_id, "k2", vec![Value::Text("efgh".into())]);
        manager.put_row(&table_id, "k3", vec![Value::Text("ijkl".into())]);

        assert!(manager.get_row(&table_id, "k1").is_none());
        assert!(manager.get_row(&table_id, "k3").is_some());

        let stats = manager.stats().unwrap();
        assert_eq!(stats.row_cache_hits, 1);
        assert_eq!(stats.row_cache_misses, 1);
    }

    #[test]
    fn test_buffer_pool() {
        let manager = default_manager();

        let size = 1024;
        let first = manager.allocate_buffer(size).unwrap();
        assert_eq!(first.len(), size);

        manager.deallocate_buffer(first);

        // Should reuse buffer
        let second = manager.allocate_buffer(size).unwrap();
        assert_eq!(second.len(), size);
    }

    #[test]
    fn test_clear_caches() {
        let mut config = Config::default();
        config.memory.block_cache.max_size = 8;
        config.memory.row_cache.max_size = 8;
        let manager = MemoryManager::new(&config).unwrap();

        let table_id = TableId::new("ks_table");
        manager.put_block(&table_id, 1, vec![0u8; 8]);
        manager.put_row(&table_id, "k1", vec![Value::Text("abcd".into())]);

        manager.clear_caches();

        assert!(manager.get_block(&table_id, 1).is_none());
        assert!(manager.get_row(&table_id, "k1").is_none());
    }

    #[test]
    fn test_memory_limit_enforcement() {
        let manager = manager_capped_at_128mb();

        // Allocate buffers up to the limit
        let buffer1 = manager
            .allocate_buffer(64 * MIB)
            .expect("first 64MB should succeed");
        let buffer2 = manager
            .allocate_buffer(64 * MIB)
            .expect("second 64MB should succeed");

        // Try to exceed the limit
        let result = manager.allocate_buffer(1024);
        assert!(result.is_err(), "allocation exceeding limit should fail");

        // Verify error message
        check_limit_message(result);

        // Verify stats
        let stats = manager.stats().unwrap();
        assert_eq!(
            stats.buffer_allocations, 2,
            "should have 2 successful allocations"
        );
        assert_eq!(
            stats.total_memory_used,
            128 * MIB,
            "should be at memory limit"
        );

        // Deallocate and verify we can allocate again
        manager.deallocate_buffer(buffer1);
        let stats = manager.stats().unwrap();
        assert_eq!(stats.buffer_deallocations, 1);
        assert_eq!(stats.total_memory_used, 64 * MIB, "memory should be freed");

        // Should be able to allocate again after freeing
        let buffer3 = manager
            .allocate_buffer(32 * MIB)
            .expect("allocation after free should succeed");

        // Clean up remaining buffers
        manager.deallocate_buffer(buffer2);
        manager.deallocate_buffer(buffer3);

        let final_stats = manager.stats().unwrap();
        assert_eq!(
            final_stats.total_memory_used, 0,
            "all memory should be freed"
        );
    }

    #[test]
    fn test_memory_limit_with_buffer_reuse() {
        let manager = manager_capped_at_128mb();

        // Allocate two 64MB buffers to reach limit
        let buffer1 = manager
            .allocate_buffer(64 * MIB)
            .expect("first 64MB should succeed");
        let buffer2 = manager
            .allocate_buffer(64 * MIB)
            .expect("second 64MB should succeed");

        // Deallocate first buffer - it goes to free pool
        manager.deallocate_buffer(buffer1);

        let stats = manager.stats().unwrap();
        assert_eq!(
            stats.total_memory_used,
            64 * MIB,
            "should have 64MB in use after deallocation"
        );

        // Allocate same size - should REUSE buffer from free pool
        let buffer3 = manager
            .allocate_buffer(64 * MIB)
            .expect("reuse should succeed");

        // Critical: reused buffer should still count toward memory limit
        let stats = manager.stats().unwrap();
        assert_eq!(
            stats.total_memory_used,
            128 * MIB,
            "reused buffer should count toward memory limit"
        );

        // Now at limit again - allocation should fail
        let result = manager.allocate_buffer(1024);
        assert!(
            result.is_err(),
            "allocation should fail when limit reached via buffer reuse"
        );

        // Verify error message
        check_limit_message(result);

        // Clean up
        manager.deallocate_buffer(buffer2);
        manager.deallocate_buffer(buffer3);

        let final_stats = manager.stats().unwrap();
        assert_eq!(final_stats.total_memory_used, 0, "all memory freed");
    }
}
724}