1use crate::error::{DbxError, DbxResult};
7
8use arrow::array::{ArrayRef, RecordBatch};
9
10use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
11use arrow::ipc::reader::StreamReader;
12
13use dashmap::DashMap;
14use std::sync::Arc;
15use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
16
17const DEFAULT_MAX_MEMORY: usize = 1024 * 1024 * 1024;
19
/// A table's cached Arrow data.
///
/// `Typed` carries the table's declared schema alongside its batches;
/// `Raw` holds schema-less key/value batches whose schema is derived from
/// the data (or defaults to binary `key`/`value` columns when empty).
#[derive(Clone)]
pub enum CachedData {
    /// Batches accompanied by an explicit table schema.
    Typed {
        schema: SchemaRef,
        batches: Vec<RecordBatch>,
    },

    /// Schema-less batches; the effective schema is computed on demand.
    Raw {
        batches: Vec<RecordBatch>, },
}
34
35impl CachedData {
36 pub fn batches(&self) -> &[RecordBatch] {
38 match self {
39 CachedData::Typed { batches, .. } => batches,
40 CachedData::Raw { batches } => batches,
41 }
42 }
43
44 pub fn schema(&self) -> SchemaRef {
46 match self {
47 CachedData::Typed { schema, .. } => schema.clone(),
48 CachedData::Raw { batches } => {
49 if batches.is_empty() {
50 Arc::new(Schema::new(vec![
52 Field::new("key", DataType::Binary, false),
53 Field::new("value", DataType::Binary, true),
54 ]))
55 } else {
56 batches[0].schema()
57 }
58 }
59 }
60 }
61
62 pub fn is_typed(&self) -> bool {
64 matches!(self, CachedData::Typed { .. })
65 }
66}
67
/// Concurrent in-memory cache of Arrow record batches, keyed by table name,
/// with an approximate byte budget enforced via LRU eviction across tables.
pub struct ColumnarCache {
    // Per-table cache entries; DashMap provides sharded concurrent access.
    tables: DashMap<String, Arc<TableCache>>,

    // Soft cap, in bytes, on the estimated size of all cached batches.
    max_memory: usize,

    // Running byte estimate of everything currently cached.
    current_memory: AtomicUsize,

    // Monotonic logical clock; each access stamps a table for LRU ordering.
    access_counter: AtomicU64,
}
82
/// Cache state for a single table.
struct TableCache {
    // The cached batches, guarded for concurrent readers/writers.
    data: parking_lot::RwLock<CachedData>,

    // Reserved for sync bookkeeping; written at construction, never read here.
    _last_sync_ts: AtomicU64,

    // Logical-clock value of the most recent access (drives LRU eviction).
    last_access: AtomicU64,

    // Estimated bytes attributed to this table's batches.
    memory_usage: AtomicUsize,
}
97
98impl ColumnarCache {
99 pub fn new() -> Self {
101 Self::with_memory_limit(DEFAULT_MAX_MEMORY)
102 }
103
104 pub fn with_memory_limit(max_memory: usize) -> Self {
106 Self {
107 tables: DashMap::new(),
108 max_memory,
109 current_memory: AtomicUsize::new(0),
110 access_counter: AtomicU64::new(0),
111 }
112 }
113
114 pub fn memory_usage(&self) -> usize {
116 self.current_memory.load(Ordering::Relaxed)
117 }
118
119 pub fn memory_limit(&self) -> usize {
121 self.max_memory
122 }
123
124 pub fn should_evict(&self) -> bool {
126 self.memory_usage() > self.max_memory
127 }
128
129 pub fn persist_to_disk(&self, table: &str, cache_dir: &str) -> DbxResult<()> {
135 use crate::storage::arrow_ipc::write_ipc_batch;
136 use std::fs;
137 use std::path::Path;
138
139 let table_cache = self
140 .tables
141 .get(table)
142 .ok_or_else(|| DbxError::Storage(format!("Table '{}' not in cache", table)))?;
143
144 let data = table_cache.data.read();
145 let batches = data.batches();
146
147 if batches.is_empty() {
148 return Ok(());
149 }
150
151 let cache_path = Path::new(cache_dir);
152 fs::create_dir_all(cache_path)
153 .map_err(|e| DbxError::Storage(format!("Failed to create cache dir: {}", e)))?;
154
155 for (idx, batch) in batches.iter().enumerate() {
156 let ipc_bytes = write_ipc_batch(batch)?;
157 let file_path = cache_path.join(format!("{}_{}.arrow", table, idx));
158 fs::write(&file_path, ipc_bytes)
159 .map_err(|e| DbxError::Storage(format!("Failed to write cache file: {}", e)))?;
160 }
161
162 Ok(())
163 }
164
165 pub fn load_from_disk(&self, table: &str, cache_dir: &str) -> DbxResult<Vec<RecordBatch>> {
171 use crate::storage::arrow_ipc::read_ipc_batch;
172 use std::fs;
173 use std::path::Path;
174
175 let cache_path = Path::new(cache_dir);
176 if !cache_path.exists() {
177 return Ok(vec![]);
178 }
179
180 let mut batches = Vec::new();
181 let mut idx = 0;
182
183 loop {
184 let file_path = cache_path.join(format!("{}_{}.arrow", table, idx));
185 if !file_path.exists() {
186 break;
187 }
188
189 let ipc_bytes = fs::read(&file_path)
190 .map_err(|e| DbxError::Storage(format!("Failed to read cache file: {}", e)))?;
191 let batch = read_ipc_batch(&ipc_bytes)?;
192 batches.push(batch);
193 idx += 1;
194 }
195
196 if !batches.is_empty() {
197 for batch in &batches {
198 self.insert_batch(table, batch.clone())?;
199 }
200 }
201
202 Ok(batches)
203 }
204
205 pub fn clear_disk_cache(&self, table: &str, cache_dir: &str) -> DbxResult<()> {
207 use std::fs;
208 use std::path::Path;
209
210 let cache_path = Path::new(cache_dir);
211 if !cache_path.exists() {
212 return Ok(());
213 }
214
215 let mut idx = 0;
216 loop {
217 let file_path = cache_path.join(format!("{}_{}.arrow", table, idx));
218 if !file_path.exists() {
219 break;
220 }
221 fs::remove_file(&file_path)
222 .map_err(|e| DbxError::Storage(format!("Failed to remove cache file: {}", e)))?;
223 idx += 1;
224 }
225
226 Ok(())
227 }
228
229 pub fn insert_batch(&self, table: &str, batch: RecordBatch) -> DbxResult<()> {
231 let memory_size = estimate_batch_memory(&batch);
232
233 let mut attempts = 0;
235 const MAX_EVICTION_ATTEMPTS: usize = 10;
236
237 while self.current_memory.load(Ordering::Relaxed) + memory_size > self.max_memory {
238 if attempts >= MAX_EVICTION_ATTEMPTS {
239 return Err(DbxError::Storage(
240 "Columnar cache memory limit exceeded (eviction failed)".to_string(),
241 ));
242 }
243 if !self.evict_lru() {
244 return Err(DbxError::Storage(
246 "Columnar cache memory limit exceeded (nothing to evict)".to_string(),
247 ));
248 }
249 attempts += 1;
250 }
251
252 let table_cache = self.tables.entry(table.to_string()).or_insert_with(|| {
254 Arc::new(TableCache {
255 data: parking_lot::RwLock::new(CachedData::Raw {
256 batches: Vec::new(),
257 }),
258 _last_sync_ts: AtomicU64::new(0),
259 last_access: AtomicU64::new(self.access_counter.fetch_add(1, Ordering::Relaxed)),
260 memory_usage: AtomicUsize::new(0),
261 })
262 });
263
264 table_cache.last_access.store(
266 self.access_counter.fetch_add(1, Ordering::Relaxed),
267 Ordering::Relaxed,
268 );
269
270 let mut data = table_cache.data.write();
272 match &mut *data {
273 CachedData::Raw { batches } => batches.push(batch),
274 CachedData::Typed { batches, .. } => batches.push(batch),
275 }
276
277 table_cache
278 .memory_usage
279 .fetch_add(memory_size, Ordering::Relaxed);
280 self.current_memory
281 .fetch_add(memory_size, Ordering::Relaxed);
282
283 Ok(())
284 }
285
286 pub fn sync_from_storage(
291 &self,
292 table: &str,
293 rows: Vec<(Vec<u8>, Vec<u8>)>,
294 table_schema: Option<SchemaRef>,
295 ) -> DbxResult<usize> {
296 if rows.is_empty() {
297 self.clear_table(table)?;
298 return Ok(0);
299 }
300
301 if let Some(schema) = table_schema {
303 self.sync_typed(table, rows, schema)
304 } else {
305 self.sync_raw(table, rows)
306 }
307 }
308
309 fn deserialize_arrow_ipc(value: &[u8]) -> DbxResult<RecordBatch> {
311 let cursor = std::io::Cursor::new(value);
312 let mut reader = StreamReader::try_new(cursor, None)
313 .map_err(|e| DbxError::Serialization(format!("Arrow IPC read error: {}", e)))?;
314
315 reader
317 .next()
318 .ok_or_else(|| DbxError::Serialization("No batch in Arrow IPC stream".to_string()))?
319 .map_err(|e| DbxError::Serialization(format!("Arrow IPC batch error: {}", e)))
320 }
321
322 fn sync_typed(
324 &self,
325 table: &str,
326 rows: Vec<(Vec<u8>, Vec<u8>)>,
327 schema: SchemaRef,
328 ) -> DbxResult<usize> {
329 let mut batches = Vec::new();
331
332 for (_key, value) in rows.iter() {
333 let batch = Self::deserialize_arrow_ipc(value)?;
334 batches.push(batch);
335 }
336
337 self.clear_table(table)?;
339 if !batches.is_empty() {
340 let consolidated = arrow::compute::concat_batches(&schema, &batches)
341 .map_err(|e| DbxError::Storage(format!("Failed to consolidate batches: {}", e)))?;
342 self.insert_typed_batch(table, schema, consolidated)?;
343 }
344
345 Ok(rows.len())
346 }
347
348 fn sync_raw(&self, table: &str, rows: Vec<(Vec<u8>, Vec<u8>)>) -> DbxResult<usize> {
350 use arrow::array::builder::BinaryBuilder;
352 let schema = Arc::new(Schema::new(vec![
353 Field::new("key", DataType::Binary, false),
354 Field::new("value", DataType::Binary, true),
355 ]));
356
357 let mut key_builder = BinaryBuilder::with_capacity(rows.len(), rows.len() * 32);
358 let mut val_builder = BinaryBuilder::with_capacity(rows.len(), rows.len() * 128);
359
360 for (k, v) in rows {
361 let user_key = if k.len() > 8 {
363 if let Ok(vk) = crate::transaction::mvcc::version::VersionedKey::decode(&k) {
364 vk.user_key
365 } else {
366 k
367 }
368 } else {
369 k
370 };
371 key_builder.append_value(user_key);
372 val_builder.append_value(v);
373 }
374
375 let batch = RecordBatch::try_new(
376 schema,
377 vec![
378 Arc::new(key_builder.finish()),
379 Arc::new(val_builder.finish()),
380 ],
381 )?;
382
383 let row_count = batch.num_rows();
384
385 self.clear_table(table)?;
387 self.insert_batch(table, batch)?;
388
389 Ok(row_count)
390 }
391
392 fn insert_typed_batch(
394 &self,
395 table: &str,
396 schema: SchemaRef,
397 batch: RecordBatch,
398 ) -> DbxResult<()> {
399 let memory_size = estimate_batch_memory(&batch);
400
401 let mut attempts = 0;
403 const MAX_EVICTION_ATTEMPTS: usize = 10;
404
405 while self.current_memory.load(Ordering::Relaxed) + memory_size > self.max_memory {
406 if attempts >= MAX_EVICTION_ATTEMPTS {
407 return Err(DbxError::Storage(
408 "Columnar cache memory limit exceeded (eviction failed)".to_string(),
409 ));
410 }
411 if !self.evict_lru() {
412 return Err(DbxError::Storage(
413 "Columnar cache memory limit exceeded (nothing to evict)".to_string(),
414 ));
415 }
416 attempts += 1;
417 }
418
419 let table_cache = {
421 self.tables
422 .entry(table.to_string())
423 .or_insert_with(|| {
424 Arc::new(TableCache {
425 data: parking_lot::RwLock::new(CachedData::Typed {
426 schema: schema.clone(),
427 batches: Vec::new(),
428 }),
429 _last_sync_ts: AtomicU64::new(0),
430 last_access: AtomicU64::new(
431 self.access_counter.fetch_add(1, Ordering::Relaxed),
432 ),
433 memory_usage: AtomicUsize::new(0),
434 })
435 })
436 .clone()
437 };
438
439 table_cache.last_access.store(
441 self.access_counter.fetch_add(1, Ordering::Relaxed),
442 Ordering::Relaxed,
443 );
444
445 let mut data = table_cache.data.write();
447 match &mut *data {
448 CachedData::Typed { batches, .. } => {
449 batches.push(batch);
450 }
451 CachedData::Raw { .. } => {
452 *data = CachedData::Typed {
453 schema,
454 batches: vec![batch],
455 };
456 }
457 }
458 drop(data);
459
460 table_cache
461 .memory_usage
462 .fetch_add(memory_size, Ordering::Relaxed);
463 self.current_memory
464 .fetch_add(memory_size, Ordering::Relaxed);
465
466 Ok(())
467 }
468
469 pub fn get_batches_with_filter<F>(
474 &self,
475 table: &str,
476 projection: Option<&[usize]>,
477 filter: F,
478 ) -> DbxResult<Option<Vec<RecordBatch>>>
479 where
480 F: Fn(&RecordBatch) -> DbxResult<arrow::array::BooleanArray>,
481 {
482 let Some(table_cache) = self.tables.get(table) else {
483 return Ok(None);
484 };
485
486 let current_access = self.access_counter.fetch_add(1, Ordering::Relaxed);
488 table_cache
489 .last_access
490 .store(current_access, Ordering::Relaxed);
491
492 let data = table_cache.data.read();
493 let batches = data.batches();
494
495 if batches.is_empty() {
496 return Ok(None);
497 }
498
499 let mut result = Vec::with_capacity(batches.len());
500
501 for batch in batches.iter() {
502 let mask = filter(batch)?;
504
505 let filtered_batch = arrow::compute::filter_record_batch(batch, &mask)
507 .map_err(|e| DbxError::Storage(format!("Failed to filter batch: {}", e)))?;
508
509 if filtered_batch.num_rows() == 0 {
510 continue;
511 }
512
513 let final_batch = if let Some(indices) = projection {
515 project_batch(&filtered_batch, indices)?
516 } else {
517 filtered_batch
518 };
519
520 result.push(final_batch);
521 }
522
523 Ok(Some(result))
524 }
525
526 pub fn get_batches(
528 &self,
529 table: &str,
530 projection: Option<&[usize]>,
531 ) -> DbxResult<Option<Vec<RecordBatch>>> {
532 let table_key = self
534 .tables
535 .iter()
536 .find(|entry| entry.key().to_lowercase() == table.to_lowercase())
537 .map(|entry| entry.key().clone());
538
539 let lookup_key = table_key.as_deref().unwrap_or(table);
540
541 let Some(table_cache) = self.tables.get(lookup_key) else {
542 return Ok(None);
543 };
544
545 let current_access = self.access_counter.fetch_add(1, Ordering::Relaxed);
547 table_cache
548 .last_access
549 .store(current_access, Ordering::Relaxed);
550
551 let data = table_cache.data.read();
552 let batches = data.batches();
553
554 if batches.is_empty() {
555 return Ok(None);
556 }
557
558 let result = if let Some(indices) = projection {
560 batches
561 .iter()
562 .map(|batch| project_batch(batch, indices))
563 .collect::<DbxResult<Vec<_>>>()?
564 } else {
565 batches.to_vec()
566 };
567
568 Ok(Some(result))
569 }
570
571 pub fn clear_table(&self, table: &str) -> DbxResult<()> {
573 if let Some((_, table_cache)) = self.tables.remove(table) {
574 let memory = table_cache.memory_usage.load(Ordering::Relaxed);
575 self.current_memory.fetch_sub(memory, Ordering::Relaxed);
576 }
577 Ok(())
578 }
579
580 pub fn clear_all(&self) -> DbxResult<()> {
582 self.tables.clear();
583 self.current_memory.store(0, Ordering::Relaxed);
584 Ok(())
585 }
586
587 pub fn get_schema(&self, table: &str) -> Option<SchemaRef> {
589 self.tables.get(table).map(|tc| {
590 let data = tc.data.read();
591 data.schema()
592 })
593 }
594
595 fn evict_lru(&self) -> bool {
598 let candidate = self
604 .tables
605 .iter()
606 .min_by_key(|entry| entry.value().last_access.load(Ordering::Relaxed))
607 .map(|entry| entry.key().clone());
608
609 if let Some(table_to_evict) = candidate {
610 if let Some((_, table_cache)) = self.tables.remove(&table_to_evict) {
612 let memory = table_cache.memory_usage.load(Ordering::Relaxed);
613 self.current_memory.fetch_sub(memory, Ordering::Relaxed);
614 return true;
615 }
616 }
617
618 false
619 }
620
621 pub fn table_names(&self) -> Vec<String> {
623 self.tables.iter().map(|e| e.key().clone()).collect()
624 }
625
626 pub fn has_table(&self, table: &str) -> bool {
628 self.tables.contains_key(table)
629 }
630}
631
632impl Default for ColumnarCache {
633 fn default() -> Self {
634 Self::new()
635 }
636}
637
638fn estimate_batch_memory(batch: &RecordBatch) -> usize {
640 batch
641 .columns()
642 .iter()
643 .map(|array| array.get_array_memory_size())
644 .sum()
645}
646
647fn project_batch(batch: &RecordBatch, indices: &[usize]) -> DbxResult<RecordBatch> {
649 let schema = batch.schema();
650 let columns: Vec<ArrayRef> = indices.iter().map(|&i| batch.column(i).clone()).collect();
651
652 let projected_fields: Vec<_> = indices.iter().map(|&i| schema.field(i).clone()).collect();
653 let projected_schema = Arc::new(arrow::datatypes::Schema::new(projected_fields));
654
655 RecordBatch::try_new(projected_schema, columns)
656 .map_err(|e| DbxError::Storage(format!("Failed to project batch: {}", e)))
657}
658
#[cfg(test)]
mod tests {
    use super::*;
    use arrow::array::{Int32Array, StringArray};
    use arrow::datatypes::{DataType, Field, Schema};

    /// Three-row fixture batch with columns `id: Int32` and `name: Utf8`.
    fn create_test_batch() -> RecordBatch {
        let fields = vec![
            Field::new("id", DataType::Int32, false),
            Field::new("name", DataType::Utf8, false),
        ];
        let ids = Int32Array::from(vec![1, 2, 3]);
        let names = StringArray::from(vec!["Alice", "Bob", "Charlie"]);

        RecordBatch::try_new(
            Arc::new(Schema::new(fields)),
            vec![Arc::new(ids), Arc::new(names)],
        )
        .unwrap()
    }

    #[test]
    fn test_insert_and_get() {
        let cache = ColumnarCache::new();

        cache.insert_batch("users", create_test_batch()).unwrap();

        let fetched = cache.get_batches("users", None).unwrap();
        assert!(fetched.is_some());
        assert_eq!(fetched.unwrap().len(), 1);
    }

    #[test]
    fn test_projection() {
        let cache = ColumnarCache::new();
        cache.insert_batch("users", create_test_batch()).unwrap();

        let projected = cache.get_batches("users", Some(&[0])).unwrap().unwrap();
        assert_eq!(projected[0].num_columns(), 1);
        assert_eq!(projected[0].schema().field(0).name(), "id");
    }

    #[test]
    fn test_memory_tracking() {
        let cache = ColumnarCache::new();

        let before = cache.memory_usage();
        cache.insert_batch("users", create_test_batch()).unwrap();

        assert!(cache.memory_usage() > before);
    }

    #[test]
    fn test_clear_table() {
        let cache = ColumnarCache::new();
        cache.insert_batch("users", create_test_batch()).unwrap();
        assert!(cache.get_batches("users", None).unwrap().is_some());

        cache.clear_table("users").unwrap();
        assert!(cache.get_batches("users", None).unwrap().is_none());
        assert_eq!(cache.memory_usage(), 0);
    }

    #[test]
    fn test_memory_limit() {
        // A 100-byte budget cannot hold the fixture batch, and an empty
        // cache has nothing to evict, so insertion must fail.
        let cache = ColumnarCache::with_memory_limit(100);
        assert!(cache.insert_batch("users", create_test_batch()).is_err());
    }

    #[test]
    fn test_table_names() {
        let cache = ColumnarCache::new();
        let batch = create_test_batch();

        cache.insert_batch("users", batch.clone()).unwrap();
        cache.insert_batch("orders", batch).unwrap();

        let mut names = cache.table_names();
        names.sort();
        assert_eq!(names, vec!["orders", "users"]);
    }

    #[test]
    fn test_lru_eviction() {
        let batch = create_test_batch();
        let per_batch = estimate_batch_memory(&batch);

        // Budget fits two batches plus slack; a third insert forces eviction.
        let cache = ColumnarCache::with_memory_limit(per_batch * 2 + 100);

        cache.insert_batch("A", batch.clone()).unwrap();
        cache.insert_batch("B", batch.clone()).unwrap();

        // Touch A so B becomes the least-recently-used table.
        cache.get_batches("A", None).unwrap();

        cache.insert_batch("C", batch).unwrap();

        let names = cache.table_names();
        assert!(names.contains(&"A".to_string()));
        assert!(names.contains(&"C".to_string()));
        assert!(!names.contains(&"B".to_string()));
    }

    #[test]
    fn test_filter_pushdown() {
        let cache = ColumnarCache::new();
        cache.insert_batch("users", create_test_batch()).unwrap();

        let filtered = cache
            .get_batches_with_filter("users", None, |batch| {
                use arrow::array::Array;
                let ids = batch
                    .column(0)
                    .as_any()
                    .downcast_ref::<Int32Array>()
                    .unwrap();
                // Keep rows with id > 1; nulls stay null in the mask.
                let mask: arrow::array::BooleanArray = (0..ids.len())
                    .map(|i| (!ids.is_null(i)).then(|| ids.value(i) > 1))
                    .collect();
                Ok(mask)
            })
            .unwrap()
            .unwrap();

        assert_eq!(filtered.len(), 1);
        assert_eq!(filtered[0].num_rows(), 2);

        let ids = filtered[0]
            .column(0)
            .as_any()
            .downcast_ref::<Int32Array>()
            .unwrap();
        assert_eq!(ids.value(0), 2);
        assert_eq!(ids.value(1), 3);
    }
}