1use lru::LruCache;
4use parking_lot::RwLock;
5use std::collections::HashMap;
6use std::num::NonZeroUsize;
7use std::sync::Arc;
8
9use crate::{types::TableId, Config, Result, Value};
10
/// Owns the block cache, the row cache, the buffer pool and the statistics
/// counters that describe their use.
///
/// Each component is wrapped in its own `Arc<RwLock<..>>`, so the methods on
/// this type only need `&self`.
#[derive(Debug)]
pub struct MemoryManager {
    /// LRU cache of block entries, keyed by table id and block id.
    block_cache: Arc<RwLock<BlockCache>>,

    /// LRU cache of rows, keyed by table id and row key.
    row_cache: Arc<RwLock<RowCache>>,

    /// Pool of reusable byte buffers with a ceiling on outstanding bytes.
    buffer_pool: Arc<RwLock<BufferPool>>,

    /// Hit/miss and allocation counters updated by the methods of this type.
    stats: Arc<RwLock<MemoryStats>>,
}
26
/// LRU cache of blocks with byte-size accounting.
struct BlockCache {
    /// Cached entries in least-recently-used order.
    cache: LruCache<BlockKey, Arc<Block>>,

    /// Size budget in bytes; `put_block` evicts LRU entries to stay within it.
    max_size: usize,

    /// Running total of the `size` of the entries, maintained by `put_block`
    /// and reset by `clear_caches`.
    current_size: usize,
}
38
/// LRU cache of rows with estimated-size accounting.
struct RowCache {
    /// Cached entries in least-recently-used order.
    cache: LruCache<RowKey, Arc<CachedRow>>,

    /// Size budget; `put_row` evicts LRU entries to stay within it.
    max_size: usize,

    /// Running total of the estimated `size` of the entries, maintained by
    /// `put_row` and reset by `clear_caches`.
    current_size: usize,
}
50
/// Free lists of returned byte buffers plus accounting for outstanding ones.
#[derive(Debug)]
struct BufferPool {
    /// Returned buffers available for reuse, keyed by their length in bytes.
    free_buffers: HashMap<usize, Vec<Vec<u8>>>,

    /// Number of buffers handed out and not yet returned.
    allocated_count: usize,

    /// Bytes currently handed out; buffers sitting in `free_buffers` are not
    /// included.
    total_memory: usize,

    /// Upper bound for `total_memory`, in bytes.
    max_memory: usize,
}
66
/// Lookup key for the block cache: a block is identified by its table and a
/// numeric block id.
#[derive(Debug, Clone, Hash, PartialEq, Eq)]
struct BlockKey {
    /// Table the block belongs to.
    table_id: TableId,
    /// Identifier of the block as supplied by the caller.
    block_id: u64,
}
73
/// Lookup key for the row cache: a row is identified by its table and a
/// string row key.
#[derive(Debug, Clone, Hash, PartialEq, Eq)]
struct RowKey {
    /// Table the row belongs to.
    table_id: TableId,
    /// Row key as supplied by the caller.
    row_key: String,
}
80
/// Block-cache entry.
///
/// Only metadata is kept here: `put_block` records the length of the data it
/// was given, and this struct has no field holding the bytes themselves.
#[derive(Debug)]
struct Block {
    /// Length in bytes of the data passed to `put_block`.
    size: usize,

    /// Instant at which the entry was created by `put_block`.
    _last_access: std::time::Instant,
}
90
/// Row-cache entry: the row's values plus their estimated size.
#[derive(Debug)]
struct CachedRow {
    /// Column values of the row, in the order given to `put_row`.
    _data: Vec<Value>,

    /// Size estimate computed by `estimate_row_size` when the row was cached.
    size: usize,
}
100
101impl MemoryManager {
102 pub fn new(config: &Config) -> Result<Self> {
104 let block_cache = Arc::new(RwLock::new(BlockCache::new(
105 config.memory.block_cache.max_size as usize,
106 )));
107 let row_cache = Arc::new(RwLock::new(RowCache::new(
108 config.memory.row_cache.max_size as usize,
109 )));
110 let buffer_pool = Arc::new(RwLock::new(BufferPool::new(
111 config.memory.max_memory as usize,
112 )));
113
114 Ok(Self {
115 block_cache,
116 row_cache,
117 buffer_pool,
118 stats: Arc::new(RwLock::new(MemoryStats::default())),
119 })
120 }
121
122 pub fn get_block(&self, table_id: &TableId, block_id: u64) -> Option<Arc<Block>> {
124 let key = BlockKey {
125 table_id: table_id.clone(),
126 block_id,
127 };
128
129 let mut cache = self.block_cache.write();
130
131 if let Some(block) = cache.cache.get(&key) {
133 {
135 let mut stats = self.stats.write();
136 stats.block_cache_hits += 1;
137 }
138
139 Some(Arc::clone(block))
140 } else {
141 {
143 let mut stats = self.stats.write();
144 stats.block_cache_misses += 1;
145 }
146
147 None
148 }
149 }
150
151 pub fn put_block(&self, table_id: &TableId, block_id: u64, data: Vec<u8>) {
153 let key = BlockKey {
154 table_id: table_id.clone(),
155 block_id,
156 };
157
158 let block = Arc::new(Block {
159 size: data.len(),
160 _last_access: std::time::Instant::now(),
161 });
162
163 let mut cache = self.block_cache.write();
164
165 while cache.current_size + block.size > cache.max_size {
167 if let Some((_, evicted_block)) = cache.cache.pop_lru() {
169 cache.current_size -= evicted_block.size;
170 } else {
171 break;
173 }
174 }
175
176 cache.current_size += block.size;
178 cache.cache.put(key, block);
179 }
180
181 pub fn get_row(&self, table_id: &TableId, row_key: &str) -> Option<Arc<CachedRow>> {
183 let key = RowKey {
184 table_id: table_id.clone(),
185 row_key: row_key.to_string(),
186 };
187
188 let mut cache = self.row_cache.write();
189
190 if let Some(row) = cache.cache.get(&key) {
192 {
194 let mut stats = self.stats.write();
195 stats.row_cache_hits += 1;
196 }
197
198 Some(Arc::clone(row))
199 } else {
200 {
202 let mut stats = self.stats.write();
203 stats.row_cache_misses += 1;
204 }
205
206 None
207 }
208 }
209
210 pub fn put_row(&self, table_id: &TableId, row_key: &str, data: Vec<Value>) {
212 let key = RowKey {
213 table_id: table_id.clone(),
214 row_key: row_key.to_string(),
215 };
216
217 let size = self.estimate_row_size(&data);
218 let row = Arc::new(CachedRow { _data: data, size });
219
220 let mut cache = self.row_cache.write();
221
222 while cache.current_size + row.size > cache.max_size {
224 if let Some((_, evicted_row)) = cache.cache.pop_lru() {
226 cache.current_size -= evicted_row.size;
227 } else {
228 break;
230 }
231 }
232
233 cache.current_size += row.size;
235 cache.cache.put(key, row);
236 }
237
238 pub fn allocate_buffer(&self, size: usize) -> Result<Vec<u8>> {
240 let mut pool = self.buffer_pool.write();
241
242 if let Some(buffers) = pool.free_buffers.get_mut(&size) {
243 if let Some(buffer) = buffers.pop() {
244 pool.allocated_count += 1;
245 pool.total_memory += size;
246
247 let mut stats = self.stats.write();
249 stats.buffer_allocations += 1;
250 stats.total_memory_used = pool.total_memory;
251
252 return Ok(buffer);
253 }
254 }
255
256 if pool.total_memory + size > pool.max_memory {
258 return Err(crate::Error::Memory(format!(
259 "Memory limit exceeded: requested {} bytes would exceed limit of {} bytes (current usage: {} bytes)",
260 size, pool.max_memory, pool.total_memory
261 )));
262 }
263
264 pool.allocated_count += 1;
266 pool.total_memory += size;
267
268 let mut stats = self.stats.write();
270 stats.buffer_allocations += 1;
271 stats.total_memory_used = pool.total_memory;
272
273 Ok(vec![0u8; size])
274 }
275
276 pub fn deallocate_buffer(&self, mut buffer: Vec<u8>) {
278 let size = buffer.len();
279 buffer.clear();
280 buffer.resize(size, 0);
282
283 let mut pool = self.buffer_pool.write();
284 pool.total_memory -= size;
285 pool.free_buffers.entry(size).or_default().push(buffer);
286 pool.allocated_count -= 1;
287
288 let mut stats = self.stats.write();
290 stats.buffer_deallocations += 1;
291 stats.total_memory_used = pool.total_memory;
292 }
293
294 pub fn stats(&self) -> Result<MemoryStats> {
296 let stats = self.stats.read();
297 Ok(stats.clone())
298 }
299
300 pub fn clear_caches(&self) {
302 {
303 let mut cache = self.block_cache.write();
304 cache.cache.clear();
305 cache.current_size = 0;
306 }
307
308 {
309 let mut cache = self.row_cache.write();
310 cache.cache.clear();
311 cache.current_size = 0;
312 }
313 }
314
315 fn estimate_row_size(&self, data: &[Value]) -> usize {
317 data.iter().map(|v| self.estimate_value_size(v)).sum()
318 }
319
320 #[allow(clippy::only_used_in_recursion)]
322 fn estimate_value_size(&self, value: &Value) -> usize {
323 match value {
324 Value::Null => 1,
325 Value::Boolean(_) => 1,
326 Value::Integer(_) => 4,
327 Value::BigInt(_) => 8,
328 Value::Counter(_) => 8,
329 Value::Float(_) => 8,
330 Value::Text(s) => s.len(),
331 Value::Blob(b) => b.len(),
332 Value::Timestamp(_) => 8,
333 Value::Date(_) => 4,
334 Value::Time(_) => 8,
335 Value::Uuid(_) => 16,
336 Value::Inet(bytes) => bytes.len(),
337 Value::Json(json) => json.to_string().len(),
338 Value::List(items) => items.iter().map(|v| self.estimate_value_size(v)).sum(),
339 Value::Map(map) => map
340 .iter()
341 .map(|(k, v)| self.estimate_value_size(k) + self.estimate_value_size(v))
342 .sum(),
343 Value::TinyInt(_) => 1,
344 Value::SmallInt(_) => 2,
345 Value::Float32(_) => 4,
346 Value::Set(items) => items.iter().map(|v| self.estimate_value_size(v)).sum(),
347 Value::Tuple(items) => items.iter().map(|v| self.estimate_value_size(v)).sum(),
348 Value::Udt(udt) => udt
349 .fields
350 .iter()
351 .map(|f| f.value.as_ref().map_or(0, |v| self.estimate_value_size(v)))
352 .sum(),
353 Value::Frozen(boxed_value) => self.estimate_value_size(boxed_value),
354 Value::Varint(data) => data.len(),
355 Value::Decimal { unscaled, .. } => 4 + unscaled.len(), Value::Duration { .. } => 12, Value::Tombstone(_) => 16, }
359 }
360}
361
362impl std::fmt::Debug for BlockCache {
363 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
364 f.debug_struct("BlockCache")
365 .field("max_size", &self.max_size)
366 .field("current_size", &self.current_size)
367 .field("cache_len", &self.cache.len())
368 .finish()
369 }
370}
371
372impl BlockCache {
373 fn new(max_size: usize) -> Self {
374 let capacity = NonZeroUsize::new(1000).expect("capacity must be non-zero");
378 Self {
379 cache: LruCache::new(capacity),
380 max_size,
381 current_size: 0,
382 }
383 }
384}
385
386impl std::fmt::Debug for RowCache {
387 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
388 f.debug_struct("RowCache")
389 .field("max_size", &self.max_size)
390 .field("current_size", &self.current_size)
391 .field("cache_len", &self.cache.len())
392 .finish()
393 }
394}
395
396impl RowCache {
397 fn new(max_size: usize) -> Self {
398 let capacity = NonZeroUsize::new(1000).expect("capacity must be non-zero");
402 Self {
403 cache: LruCache::new(capacity),
404 max_size,
405 current_size: 0,
406 }
407 }
408}
409
410impl BufferPool {
411 fn new(max_memory: usize) -> Self {
412 Self {
413 free_buffers: HashMap::new(),
414 allocated_count: 0,
415 total_memory: 0,
416 max_memory,
417 }
418 }
419}
420
/// Counters describing cache effectiveness and buffer-pool activity.
#[derive(Debug, Clone, Default)]
pub struct MemoryStats {
    /// Block-cache lookups that found an entry.
    pub block_cache_hits: u64,

    /// Block-cache lookups that found nothing.
    pub block_cache_misses: u64,

    /// Row-cache lookups that found an entry.
    pub row_cache_hits: u64,

    /// Row-cache lookups that found nothing.
    pub row_cache_misses: u64,

    /// Bytes currently handed out by the buffer pool.
    pub total_memory_used: usize,

    /// Number of successful buffer allocations.
    pub buffer_allocations: u64,

    /// Number of buffers returned to the pool.
    pub buffer_deallocations: u64,
}

impl MemoryStats {
    /// Fraction of block-cache lookups that were hits, in `0.0..=1.0`.
    ///
    /// Returns `0.0` when no lookups have been recorded.
    pub fn block_cache_hit_rate(&self) -> f64 {
        Self::hit_rate(self.block_cache_hits, self.block_cache_misses)
    }

    /// Fraction of row-cache lookups that were hits, in `0.0..=1.0`.
    ///
    /// Returns `0.0` when no lookups have been recorded.
    pub fn row_cache_hit_rate(&self) -> f64 {
        Self::hit_rate(self.row_cache_hits, self.row_cache_misses)
    }

    /// Shared ratio computation: `hits / (hits + misses)`, or `0.0` when both
    /// are zero.
    fn hit_rate(hits: u64, misses: u64) -> f64 {
        // The sum is widened so that two very large counters cannot overflow.
        let total = u128::from(hits) + u128::from(misses);
        if total == 0 {
            0.0
        } else {
            hits as f64 / total as f64
        }
    }
}
467
// Unit tests for the memory manager: cache hit/miss behavior, size-based
// eviction, buffer reuse, and enforcement of the buffer-pool memory limit.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::types::TableId;

    // A freshly built manager starts with zeroed block-cache counters.
    #[test]
    fn test_memory_manager_creation() {
        let config = Config::default();
        let manager = MemoryManager::new(&config).unwrap();

        let stats = manager.stats().unwrap();
        assert_eq!(stats.block_cache_hits, 0);
        assert_eq!(stats.block_cache_misses, 0);
    }

    // Miss before insertion, hit afterwards; the entry records the data length.
    #[test]
    fn test_block_cache() {
        let config = Config::default();
        let manager = MemoryManager::new(&config).unwrap();

        let table_id = TableId::new("test_table");
        let block_id = 1;
        let data = vec![1, 2, 3, 4, 5];

        // Nothing cached yet.
        let result = manager.get_block(&table_id, block_id);
        assert!(result.is_none());

        manager.put_block(&table_id, block_id, data.clone());

        // The block is now present.
        let result = manager.get_block(&table_id, block_id);
        assert!(result.is_some());
        assert_eq!(result.unwrap().size, data.len());
    }

    // With an 8-byte budget, adding 4 bytes after an 8-byte block evicts the
    // first block; the two lookups then count as one miss and one hit.
    #[test]
    fn test_block_cache_eviction_updates_stats() {
        let mut config = Config::default();
        config.memory.block_cache.max_size = 8;
        let manager = MemoryManager::new(&config).unwrap();

        let table_id = TableId::new("ks_table");

        manager.put_block(&table_id, 1, vec![0u8; 8]);
        manager.put_block(&table_id, 2, vec![0u8; 4]);
        assert!(manager.get_block(&table_id, 1).is_none());
        assert!(manager.get_block(&table_id, 2).is_some());

        let stats = manager.stats().unwrap();
        assert_eq!(stats.block_cache_hits, 1);
        assert_eq!(stats.block_cache_misses, 1);
    }

    // Miss before insertion, hit afterwards; the cached values round-trip.
    #[test]
    fn test_row_cache() {
        let config = Config::default();
        let manager = MemoryManager::new(&config).unwrap();

        let table_id = TableId::new("test_table");
        let row_key = "test_key";
        let data = vec![Value::Integer(42), Value::Text("hello".to_string())];

        // Nothing cached yet.
        let result = manager.get_row(&table_id, row_key);
        assert!(result.is_none());

        manager.put_row(&table_id, row_key, data.clone());

        // The row is now present.
        let result = manager.get_row(&table_id, row_key);
        assert!(result.is_some());
        assert_eq!(result.unwrap()._data, data);
    }

    // Each row is estimated at 4 bytes, so an 8-byte budget holds two rows and
    // the third insertion evicts the oldest one.
    #[test]
    fn test_row_cache_eviction_and_stats() {
        let mut config = Config::default();
        config.memory.row_cache.max_size = 8;
        let manager = MemoryManager::new(&config).unwrap();

        let table_id = TableId::new("ks_table");

        manager.put_row(&table_id, "k1", vec![Value::Text("abcd".into())]);
        manager.put_row(&table_id, "k2", vec![Value::Text("efgh".into())]);
        manager.put_row(&table_id, "k3", vec![Value::Text("ijkl".into())]);

        assert!(manager.get_row(&table_id, "k1").is_none());
        assert!(manager.get_row(&table_id, "k3").is_some());

        let stats = manager.stats().unwrap();
        assert_eq!(stats.row_cache_hits, 1);
        assert_eq!(stats.row_cache_misses, 1);
    }

    // Allocate, return, and allocate again with the same length.
    #[test]
    fn test_buffer_pool() {
        let config = Config::default();
        let manager = MemoryManager::new(&config).unwrap();

        let size = 1024;
        let buffer = manager.allocate_buffer(size).unwrap();
        assert_eq!(buffer.len(), size);

        manager.deallocate_buffer(buffer);

        // A same-length request after the return still yields the right length.
        let buffer2 = manager.allocate_buffer(size).unwrap();
        assert_eq!(buffer2.len(), size);
    }

    // After clearing, previously cached entries are gone from both caches.
    #[test]
    fn test_clear_caches() {
        let mut config = Config::default();
        config.memory.block_cache.max_size = 8;
        config.memory.row_cache.max_size = 8;
        let manager = MemoryManager::new(&config).unwrap();

        let table_id = TableId::new("ks_table");
        manager.put_block(&table_id, 1, vec![0u8; 8]);
        manager.put_row(&table_id, "k1", vec![Value::Text("abcd".into())]);

        manager.clear_caches();

        assert!(manager.get_block(&table_id, 1).is_none());
        assert!(manager.get_row(&table_id, "k1").is_none());
    }

    // Two 64 MiB allocations fill a 128 MiB limit exactly; any further request
    // fails until memory is returned.
    #[test]
    fn test_memory_limit_enforcement() {
        let mut config = Config::default();
        config.memory.max_memory = 128 * 1024 * 1024;
        let manager = MemoryManager::new(&config).unwrap();

        let buffer1 = manager
            .allocate_buffer(64 * 1024 * 1024)
            .expect("first 64MB should succeed");
        let buffer2 = manager
            .allocate_buffer(64 * 1024 * 1024)
            .expect("second 64MB should succeed");

        // The pool is full, so even a small request is rejected.
        let result = manager.allocate_buffer(1024);
        assert!(result.is_err(), "allocation exceeding limit should fail");

        if let Err(e) = result {
            let err_msg = e.to_string();
            assert!(
                err_msg.contains("Memory limit exceeded"),
                "error should mention memory limit"
            );
        }

        // The failed request is not counted as an allocation.
        let stats = manager.stats().unwrap();
        assert_eq!(
            stats.buffer_allocations, 2,
            "should have 2 successful allocations"
        );
        assert_eq!(
            stats.total_memory_used,
            128 * 1024 * 1024,
            "should be at memory limit"
        );

        // Returning a buffer lowers the outstanding byte count.
        manager.deallocate_buffer(buffer1);
        let stats = manager.stats().unwrap();
        assert_eq!(stats.buffer_deallocations, 1);
        assert_eq!(
            stats.total_memory_used,
            64 * 1024 * 1024,
            "memory should be freed"
        );

        // There is headroom again for a smaller request.
        let buffer3 = manager
            .allocate_buffer(32 * 1024 * 1024)
            .expect("allocation after free should succeed");

        manager.deallocate_buffer(buffer2);
        manager.deallocate_buffer(buffer3);

        let final_stats = manager.stats().unwrap();
        assert_eq!(
            final_stats.total_memory_used, 0,
            "all memory should be freed"
        );
    }

    // A buffer served from the free list still counts against the limit.
    #[test]
    fn test_memory_limit_with_buffer_reuse() {
        let mut config = Config::default();
        config.memory.max_memory = 128 * 1024 * 1024;
        let manager = MemoryManager::new(&config).unwrap();

        let buffer1 = manager
            .allocate_buffer(64 * 1024 * 1024)
            .expect("first 64MB should succeed");
        let buffer2 = manager
            .allocate_buffer(64 * 1024 * 1024)
            .expect("second 64MB should succeed");

        // Return one buffer so a same-length request can reuse it.
        manager.deallocate_buffer(buffer1);

        let stats = manager.stats().unwrap();
        assert_eq!(
            stats.total_memory_used,
            64 * 1024 * 1024,
            "should have 64MB in use after deallocation"
        );

        // Same length as the returned buffer.
        let buffer3 = manager
            .allocate_buffer(64 * 1024 * 1024)
            .expect("reuse should succeed");

        let stats = manager.stats().unwrap();
        assert_eq!(
            stats.total_memory_used,
            128 * 1024 * 1024,
            "reused buffer should count toward memory limit"
        );

        // The pool is full again.
        let result = manager.allocate_buffer(1024);
        assert!(
            result.is_err(),
            "allocation should fail when limit reached via buffer reuse"
        );

        if let Err(e) = result {
            let err_msg = e.to_string();
            assert!(
                err_msg.contains("Memory limit exceeded"),
                "error should mention memory limit"
            );
        }

        manager.deallocate_buffer(buffer2);
        manager.deallocate_buffer(buffer3);

        let final_stats = manager.stats().unwrap();
        assert_eq!(final_stats.total_memory_used, 0, "all memory freed");
    }
}