use crate::error::{DbxError, DbxResult};
use crate::storage::StorageBackend;
use arrow::array::{ArrayRef, BinaryBuilder, RecordBatch};
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use dashmap::DashMap;
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
/// Default in-memory budget for the cache: 1 GiB.
const DEFAULT_MAX_MEMORY: usize = 1024 * 1024 * 1024;
/// In-memory cache of Arrow record batches, keyed by table name, with an
/// approximate byte budget. `should_evict` reports when the budget is
/// exceeded; eviction itself is driven by the caller.
pub struct ColumnarCache {
/// Per-table caches; `DashMap` allows concurrent access without a global lock.
tables: DashMap<String, Arc<TableCache>>,
/// Soft memory budget in bytes (see `should_evict`).
max_memory: usize,
/// Running estimate of bytes held across all tables.
current_memory: AtomicUsize,
// Monotonic counter — presumably stamped into `TableCache::last_access` for
// LRU-style ordering; the accessors are outside this view, so confirm there.
access_counter: AtomicU64,
}
/// Cached state for a single table: its schema plus the resident batches.
struct TableCache {
/// Arrow schema shared by all batches of this table.
schema: SchemaRef,
/// Resident batches, guarded for concurrent read/append.
batches: parking_lot::RwLock<Vec<RecordBatch>>,
// Underscore-prefixed: not read anywhere in this view; likely reserved for a
// future disk-sync timestamp — confirm before removing.
_last_sync_ts: AtomicU64,
// Presumably updated from `ColumnarCache::access_counter` on access for
// eviction ordering — the updating code is outside this view.
last_access: AtomicU64,
/// Estimated bytes held by this table's batches.
memory_usage: AtomicUsize,
}
impl ColumnarCache {
pub fn new() -> Self {
Self::with_memory_limit(DEFAULT_MAX_MEMORY)
}
pub fn with_memory_limit(max_memory: usize) -> Self {
Self {
tables: DashMap::new(),
max_memory,
current_memory: AtomicUsize::new(0),
access_counter: AtomicU64::new(0),
}
}
/// Current estimated bytes held by cached batches (relaxed read; callers
/// should treat this as a best-effort figure, not an exact accounting).
pub fn memory_usage(&self) -> usize {
self.current_memory.load(Ordering::Relaxed)
}
/// Configured memory budget in bytes (set at construction, immutable).
pub fn memory_limit(&self) -> usize {
self.max_memory
}
pub fn should_evict(&self) -> bool {
self.memory_usage() > self.max_memory
}
/// Persists all cached batches of `table` into `cache_dir`, one Arrow IPC
/// file per batch named `{table}_{idx}.arrow` with contiguous indices.
///
/// Because `load_from_disk` scans indices until the first missing file,
/// segments left over from an earlier, larger persist would otherwise be
/// appended to the freshly written ones on the next load. This version
/// removes those trailing stale segments after writing (and clears all
/// segments when the table's cache is currently empty).
///
/// # Errors
/// Returns `DbxError::Storage` when the table is not cached, the directory
/// cannot be created, or a segment cannot be written/removed.
pub fn persist_to_disk(&self, table: &str, cache_dir: &str) -> DbxResult<()> {
    use crate::storage::arrow_ipc::write_ipc_batch;
    use std::fs;
    use std::path::Path;

    let table_cache = self.tables.get(table)
        .ok_or_else(|| DbxError::Storage(format!("Table '{}' not in cache", table)))?;
    let batches = table_cache.batches.read();

    if batches.is_empty() {
        // Nothing to write; drop any segments from a previous persist so a
        // later load does not resurrect outdated data.
        return self.clear_disk_cache(table, cache_dir);
    }

    let cache_path = Path::new(cache_dir);
    fs::create_dir_all(cache_path)
        .map_err(|e| DbxError::Storage(format!("Failed to create cache dir: {}", e)))?;

    for (idx, batch) in batches.iter().enumerate() {
        let ipc_bytes = write_ipc_batch(batch)?;
        let file_path = cache_path.join(format!("{}_{}.arrow", table, idx));
        fs::write(&file_path, ipc_bytes)
            .map_err(|e| DbxError::Storage(format!("Failed to write cache file: {}", e)))?;
    }

    // Remove trailing segments left by an earlier, larger persist.
    let mut idx = batches.len();
    loop {
        let stale = cache_path.join(format!("{}_{}.arrow", table, idx));
        if !stale.exists() {
            break;
        }
        fs::remove_file(&stale)
            .map_err(|e| DbxError::Storage(format!("Failed to remove stale cache file: {}", e)))?;
        idx += 1;
    }
    Ok(())
}
/// Loads previously persisted batches for `table` from `cache_dir` and
/// re-inserts each into the in-memory cache.
///
/// Segment files are read as `{table}_{idx}.arrow` for idx = 0, 1, ... until
/// the first missing index. Returns the loaded batches; an empty `Vec` when
/// the directory or the first segment does not exist.
///
/// # Errors
/// Returns `DbxError::Storage` when a segment cannot be read, and propagates
/// errors from IPC decoding or cache insertion.
pub fn load_from_disk(&self, table: &str, cache_dir: &str) -> DbxResult<Vec<RecordBatch>> {
    use crate::storage::arrow_ipc::read_ipc_batch;
    use std::fs;
    use std::path::Path;

    let cache_path = Path::new(cache_dir);
    if !cache_path.exists() {
        return Ok(vec![]);
    }

    let mut batches = Vec::new();
    for idx in 0.. {
        let file_path = cache_path.join(format!("{}_{}.arrow", table, idx));
        if !file_path.exists() {
            break;
        }
        let ipc_bytes = fs::read(&file_path)
            .map_err(|e| DbxError::Storage(format!("Failed to read cache file: {}", e)))?;
        batches.push(read_ipc_batch(&ipc_bytes)?);
    }

    // Re-populate the in-memory cache. Iterating an empty Vec is a no-op,
    // so no emptiness guard is needed; clone because we also return them.
    for batch in &batches {
        self.insert_batch(table, batch.clone())?;
    }
    Ok(batches)
}
/// Deletes all persisted segment files for `table` under `cache_dir`.
///
/// Segments are named `{table}_{idx}.arrow` with contiguous indices, so
/// removing files until the first missing index clears the whole set.
/// A missing directory is treated as already clear.
///
/// # Errors
/// Returns `DbxError::Storage` when a segment file cannot be removed.
pub fn clear_disk_cache(&self, table: &str, cache_dir: &str) -> DbxResult<()> {
    use std::fs;
    use std::path::Path;

    let dir = Path::new(cache_dir);
    if !dir.exists() {
        return Ok(());
    }
    let mut segment = 0usize;
    while dir.join(format!("{}_{}.arrow", table, segment)).exists() {
        let path = dir.join(format!("{}_{}.arrow", table, segment));
        fs::remove_file(&path)
            .map_err(|e| DbxError::Storage(format!("Failed to remove cache file: {}", e)))?;
        segment += 1;
    }
    Ok(())
}