use arrow::array::cast::AsArray;
use arrow::array::{ArrayRef, BooleanArray};
use arrow::buffer::BooleanBuffer;
use arrow::record_batch::RecordBatch;
use arrow_schema::{Field, Schema};
use bytes::Bytes;
use futures::StreamExt;
use super::{
budget::BudgetAccounting,
builders::{EvaluatePredicate, Get, Insert},
cached_batch::{CacheEntry, CachedBatchType},
io_context::{EntryMetadata, entry_id_to_key},
observer::{CacheTracer, InternalEvent, Observer},
policies::{CachePolicy, HydrationPolicy, HydrationRequest, MaterializedEntry},
utils::CacheConfig,
};
use crate::cache::DefaultSqueezeIo;
use crate::cache::policies::{SqueezeOutcome, SqueezePolicy};
use crate::cache::utils::{LiquidCompressorStates, arrow_to_bytes};
use crate::cache::{CacheExpression, LiquidExpr, index::ArtIndex, utils::EntryID};
use crate::cache::{CacheFull, CacheStats, EventTrace};
use crate::liquid_array::{
LiquidSqueezedArrayRef, SqueezeIoHandler, SqueezedBacking, SqueezedDate32Array,
VariantStructSqueezedArray,
};
use crate::sync::Arc;
/// A tiered cache of Arrow arrays keyed by [`EntryID`].
///
/// Entries live in memory (as Arrow, liquid, or squeezed-liquid arrays) or on
/// disk (as Arrow or liquid bytes in `store`). Memory and disk usage are
/// charged against `budget`. When an insert does not fit, victims chosen by
/// `cache_policy` are shrunk according to `squeeze_policy`. Entries read back
/// from disk may be promoted according to `hydration_policy`.
#[derive(Debug)]
pub struct LiquidCache {
    /// Maps each cached entry id to its current [`CacheEntry`].
    index: ArtIndex,
    /// Batch size and memory/disk limits the cache was created with.
    config: CacheConfig,
    /// Tracks memory and disk usage against the configured limits.
    budget: BudgetAccounting,
    /// Chooses memory/disk victims; notified of inserts, accesses, and removals.
    cache_policy: Box<dyn CachePolicy>,
    /// Decides whether an entry materialized from disk or squeezed data should
    /// replace the cached form.
    hydration_policy: Box<dyn HydrationPolicy>,
    /// Decides how a victim is shrunk (replaced by another entry or removed).
    squeeze_policy: Box<dyn SqueezePolicy>,
    /// Statistics counters and event tracing; shared with the budget and squeeze I/O.
    observer: Arc<Observer>,
    /// Per-entry compressor states and squeeze hints.
    metadata: Arc<dyn EntryMetadata>,
    /// Key-value store holding the bytes of on-disk entries.
    store: t4::Store,
    /// When `true`, memory victims are squeezed concurrently instead of one at a time.
    squeeze_victims_concurrently: bool,
}
impl LiquidCache {
    /// Returns a snapshot of cache occupancy and usage.
    ///
    /// Every index entry is visited once to tally entries per [`CacheEntry`]
    /// variant (plus array memory size for the in-memory variants). The result
    /// also carries the budget's current usage, the configured limits, and the
    /// observer's runtime snapshot.
    pub fn stats(&self) -> CacheStats {
        let total_entries = self.index.entry_count();

        // `(entry count, array memory bytes)` per in-memory variant.
        let mut arrow_tally = (0usize, 0usize);
        let mut liquid_tally = (0usize, 0usize);
        let mut squeezed_tally = (0usize, 0usize);
        // On-disk variants are only counted.
        let mut disk_liquid_entries = 0usize;
        let mut disk_arrow_entries = 0usize;

        self.index.for_each(|_, entry| match entry {
            CacheEntry::MemoryArrow(array) => {
                arrow_tally.0 += 1;
                arrow_tally.1 += array.get_array_memory_size();
            }
            CacheEntry::MemoryLiquid(array) => {
                liquid_tally.0 += 1;
                liquid_tally.1 += array.get_array_memory_size();
            }
            CacheEntry::MemorySqueezedLiquid(array) => {
                squeezed_tally.0 += 1;
                squeezed_tally.1 += array.get_array_memory_size();
            }
            CacheEntry::DiskLiquid { .. } => disk_liquid_entries += 1,
            CacheEntry::DiskArrow { .. } => disk_arrow_entries += 1,
        });

        let (memory_arrow_entries, memory_arrow_bytes) = arrow_tally;
        let (memory_liquid_entries, memory_liquid_bytes) = liquid_tally;
        let (memory_squeezed_liquid_entries, memory_squeezed_liquid_bytes) = squeezed_tally;
        let memory_usage_bytes = self.budget.memory_usage_bytes();
        let disk_usage_bytes = self.budget.disk_usage_bytes();
        let runtime = self.observer.runtime_snapshot();

        CacheStats {
            total_entries,
            memory_arrow_entries,
            memory_liquid_entries,
            memory_squeezed_liquid_entries,
            disk_liquid_entries,
            disk_arrow_entries,
            memory_arrow_bytes,
            memory_liquid_bytes,
            memory_squeezed_liquid_bytes,
            memory_usage_bytes,
            disk_usage_bytes,
            max_memory_bytes: self.config.max_memory_bytes(),
            max_disk_bytes: self.config.max_disk_bytes(),
            runtime,
        }
    }

    /// Starts an insert of `batch_to_cache` under `entry_id`.
    ///
    /// Returns an [`Insert`] builder; the insert happens when it is awaited.
    pub fn insert<'a>(
        self: &'a Arc<Self>,
        entry_id: EntryID,
        batch_to_cache: ArrayRef,
    ) -> Insert<'a> {
        Insert::new(self, entry_id, batch_to_cache)
    }

    /// Starts a read of `entry_id`, returning a [`Get`] builder.
    pub fn get<'a>(&'a self, entry_id: &'a EntryID) -> Get<'a> {
        Get::new(self, entry_id)
    }

    /// Starts evaluating `predicate` against `entry_id`, returning an
    /// [`EvaluatePredicate`] builder.
    pub fn eval_predicate<'a>(
        &'a self,
        entry_id: &'a EntryID,
        predicate: &'a LiquidExpr,
    ) -> EvaluatePredicate<'a> {
        EvaluatePredicate::new(self, entry_id, predicate)
    }

    /// Returns the entry as a liquid array when it is stored in liquid form.
    ///
    /// - `MemoryLiquid`: returned directly.
    /// - `DiskLiquid`: read from disk and offered to the hydration policy.
    /// - `MemorySqueezedLiquid` with a liquid disk backing: read from disk.
    /// - Anything Arrow-backed, or an uncached entry: `None`.
    pub async fn try_read_liquid(
        &self,
        entry_id: &EntryID,
    ) -> Option<crate::liquid_array::LiquidArrayRef> {
        self.observer.on_try_read_liquid();
        self.trace(InternalEvent::TryReadLiquid { entry: *entry_id });
        let entry = self.index.get(entry_id)?;
        let entry_ref = entry.as_ref();
        self.cache_policy
            .notify_access(entry_id, CachedBatchType::from(entry_ref));
        match entry_ref {
            CacheEntry::MemoryLiquid(array) => Some(array.clone()),
            CacheEntry::DiskLiquid { .. } => {
                let liquid = self.read_disk_liquid_array(entry_id).await;
                self.maybe_hydrate(entry_id, entry_ref, MaterializedEntry::Liquid(&liquid), None)
                    .await;
                Some(liquid)
            }
            CacheEntry::MemorySqueezedLiquid(array)
                if matches!(array.disk_backing(), SqueezedBacking::Liquid(_)) =>
            {
                Some(self.read_disk_liquid_array(entry_id).await)
            }
            CacheEntry::MemorySqueezedLiquid(_)
            | CacheEntry::DiskArrow { .. }
            | CacheEntry::MemoryArrow(_) => None,
        }
    }

    /// Calls `f` for every `(id, entry)` pair currently in the index.
    pub fn for_each_entry(&self, mut f: impl FnMut(&EntryID, &CacheEntry)) {
        self.index.for_each(&mut f);
    }

    /// Clears the index, then resets the budget's recorded usage.
    pub fn reset(&self) {
        self.index.reset();
        self.budget.reset_usage();
    }

    /// Returns whether the index holds an entry for `entry_id`.
    pub fn is_cached(&self, entry_id: &EntryID) -> bool {
        self.index.is_cached(entry_id)
    }

    /// The configuration the cache was built with.
    pub fn config(&self) -> &CacheConfig {
        &self.config
    }

    /// The memory/disk budget accounting.
    pub fn budget(&self) -> &BudgetAccounting {
        &self.budget
    }

    /// The observer's cache tracer.
    pub fn tracer(&self) -> &CacheTracer {
        self.observer.cache_tracer()
    }

    /// The cache's observer (statistics and event tracing).
    pub fn observer(&self) -> &Observer {
        self.observer.as_ref()
    }

    /// Returns the compressor states recorded for `entry_id` in the entry metadata.
    pub fn compressor_states(&self, entry_id: &EntryID) -> Arc<LiquidCompressorStates> {
        self.metadata.get_compressor(entry_id)
    }

    /// Records `expression` as a squeeze hint for `entry_id` in the entry metadata.
    pub fn add_squeeze_hint(&self, entry_id: &EntryID, expression: Arc<CacheExpression>) {
        self.metadata.add_squeeze_hint(entry_id, expression);
    }

    /// Moves every in-memory entry to the disk tier.
    ///
    /// Entries are snapshotted first, then handled by variant:
    /// - `MemoryArrow` is serialized with `arrow_to_bytes` and becomes `DiskArrow`.
    /// - `MemoryLiquid` is serialized with its `to_bytes` and becomes `DiskLiquid`.
    /// - `MemorySqueezedLiquid` becomes the disk entry described by its disk
    ///   backing; nothing is written.
    /// - Disk entries are left alone.
    ///
    /// An entry whose bytes cannot be reserved on disk is dropped from the cache
    /// instead. Always returns `Ok(())`.
    pub async fn flush_all_to_disk(&self) -> Result<(), CacheFull> {
        // Snapshot first so the index is not being walked while disk writes are awaited.
        let mut snapshot = Vec::new();
        self.for_each_entry(|id, entry| snapshot.push((*id, entry.clone())));

        for (entry_id, entry) in snapshot {
            let (payload, replacement, insert_msg) = match &entry {
                CacheEntry::MemoryArrow(array) => {
                    let payload = arrow_to_bytes(array).expect("failed to convert arrow to bytes");
                    let replacement =
                        CacheEntry::disk_arrow(array.data_type().clone(), payload.len());
                    (payload, replacement, "failed to insert disk arrow entry")
                }
                CacheEntry::MemoryLiquid(liquid_array) => {
                    let payload = Bytes::from(liquid_array.to_bytes());
                    let replacement = CacheEntry::disk_liquid(
                        liquid_array.original_arrow_data_type(),
                        payload.len(),
                    );
                    (payload, replacement, "failed to insert disk liquid entry")
                }
                CacheEntry::MemorySqueezedLiquid(array) => {
                    self.try_insert(entry_id, Self::disk_entry_from_squeezed(array))
                        .expect("failed to insert disk entry");
                    continue;
                }
                CacheEntry::DiskArrow { .. } | CacheEntry::DiskLiquid { .. } => continue,
            };
            match self.write_batch_to_disk(entry_id, &entry, payload).await {
                Ok(()) => self.try_insert(entry_id, replacement).expect(insert_msg),
                Err(CacheFull) => self.drop_memory_entry(entry_id, &entry),
            }
        }
        Ok(())
    }
}
impl LiquidCache {
async fn write_in_memory_batch_to_disk(
&self,
entry_id: EntryID,
batch: CacheEntry,
) -> Result<CacheEntry, CacheFull> {
match &batch {
batch @ CacheEntry::MemoryArrow(_) => {
let squeeze_io: Arc<dyn SqueezeIoHandler> = Arc::new(DefaultSqueezeIo::new(
self.store.clone(),
entry_id,
self.observer.clone(),
));
let outcome = self.squeeze_policy.squeeze(
batch,
self.metadata.get_compressor(&entry_id).as_ref(),
None,
&squeeze_io,
);
let SqueezeOutcome::Replace {
entry: new_batch,
bytes_to_write,
} = outcome
else {
unreachable!("memory arrow squeeze cannot remove entry");
};
if let Some(bytes_to_write) = bytes_to_write {
self.write_batch_to_disk(entry_id, &new_batch, bytes_to_write)
.await?;
}
Ok(new_batch)
}
CacheEntry::MemoryLiquid(liquid_array) => {
let liquid_bytes = Bytes::from(liquid_array.to_bytes());
let disk_bytes = liquid_bytes.len();
self.write_batch_to_disk(entry_id, &batch, liquid_bytes)
.await?;
Ok(CacheEntry::disk_liquid(
liquid_array.original_arrow_data_type(),
disk_bytes,
))
}
CacheEntry::MemorySqueezedLiquid(squeezed_array) => {
let data_type = squeezed_array.original_arrow_data_type();
let entry = match squeezed_array.disk_backing() {
SqueezedBacking::Liquid(n) => CacheEntry::disk_liquid(data_type, n),
SqueezedBacking::Arrow(n) => CacheEntry::disk_arrow(data_type, n),
};
Ok(entry)
}
CacheEntry::DiskLiquid { .. } | CacheEntry::DiskArrow { .. } => {
unreachable!("Unexpected batch in write_in_memory_batch_to_disk")
}
}
}
pub(crate) async fn insert_inner(
&self,
entry_id: EntryID,
mut batch_to_cache: CacheEntry,
) -> Result<(), CacheFull> {
loop {
let Err(not_inserted) = self.try_insert(entry_id, batch_to_cache) else {
return Ok(());
};
self.trace(InternalEvent::InsertFailed {
entry: entry_id,
kind: CachedBatchType::from(¬_inserted),
});
let victims = self.cache_policy.find_memory_victim(8);
if victims.is_empty() {
let on_disk_batch = self
.write_in_memory_batch_to_disk(entry_id, not_inserted)
.await?;
batch_to_cache = on_disk_batch;
continue;
}
self.squeeze_victims(victims).await?;
batch_to_cache = not_inserted;
crate::utils::yield_now_if_shuttle();
}
}
#[allow(clippy::too_many_arguments)]
pub(crate) fn new(
batch_size: usize,
max_memory_bytes: usize,
max_disk_bytes: usize,
squeeze_policy: Box<dyn SqueezePolicy>,
cache_policy: Box<dyn CachePolicy>,
hydration_policy: Box<dyn HydrationPolicy>,
metadata: Arc<dyn EntryMetadata>,
store: t4::Store,
squeeze_victims_concurrently: bool,
) -> Self {
let config = CacheConfig::new(batch_size, max_memory_bytes, max_disk_bytes);
let observer = Arc::new(Observer::new());
Self {
index: ArtIndex::new(),
budget: BudgetAccounting::new(
config.max_memory_bytes(),
config.max_disk_bytes(),
observer.clone(),
),
config,
cache_policy,
hydration_policy,
squeeze_policy,
observer,
metadata,
store,
squeeze_victims_concurrently,
}
}
fn try_insert(&self, entry_id: EntryID, to_insert: CacheEntry) -> Result<(), CacheEntry> {
let new_memory_size = to_insert.memory_usage_bytes();
let cached_batch_type = if let Some(entry) = self.index.get(&entry_id) {
let old_memory_size = entry.memory_usage_bytes();
if self
.budget
.try_update_memory_usage(old_memory_size, new_memory_size)
.is_err()
{
return Err(to_insert);
}
let batch_type = CachedBatchType::from(&to_insert);
self.index.insert(&entry_id, to_insert);
batch_type
} else {
if self.budget.try_reserve_memory(new_memory_size).is_err() {
return Err(to_insert);
}
let batch_type = CachedBatchType::from(&to_insert);
self.index.insert(&entry_id, to_insert);
batch_type
};
self.trace(InternalEvent::InsertSuccess {
entry: entry_id,
kind: cached_batch_type,
});
self.cache_policy
.notify_insert(&entry_id, cached_batch_type);
Ok(())
}
fn drop_memory_entry(&self, entry_id: EntryID, _expected: &CacheEntry) {
let Some(removed) = self.index.remove(&entry_id) else {
return;
};
assert!(
matches!(
removed.as_ref(),
CacheEntry::MemoryArrow(_)
| CacheEntry::MemoryLiquid(_)
| CacheEntry::MemorySqueezedLiquid(_)
),
"flush should only drop memory entries"
);
self.budget
.try_update_memory_usage(removed.memory_usage_bytes(), 0)
.expect("memory release cannot fail");
self.cache_policy.notify_remove(&entry_id);
}
async fn remove_disk_entry(&self, entry_id: EntryID) {
let Some(removed) = self.index.remove(&entry_id) else {
return;
};
let disk_bytes = match removed.as_ref() {
CacheEntry::DiskLiquid { disk_bytes, .. }
| CacheEntry::DiskArrow { disk_bytes, .. } => *disk_bytes,
_ => panic!("remove_disk_entry called for non-disk entry"),
};
self.store
.remove(&entry_id_to_key(&entry_id))
.await
.expect("disk remove failed");
self.budget.release_disk(disk_bytes);
self.cache_policy.notify_remove(&entry_id);
self.trace(InternalEvent::DiskEvict {
entry: entry_id,
bytes: disk_bytes,
});
}
pub fn consume_event_trace(&self) -> EventTrace {
self.observer.consume_event_trace()
}
pub(crate) fn trace(&self, event: InternalEvent) {
self.observer.record_internal(event);
}
#[cfg(test)]
pub(crate) fn index(&self) -> &ArtIndex {
&self.index
}
#[fastrace::trace]
async fn squeeze_victims(&self, victims: Vec<EntryID>) -> Result<(), CacheFull> {
self.trace(InternalEvent::SqueezeBegin {
victims: victims.clone(),
});
if self.squeeze_victims_concurrently {
let results = futures::stream::iter(victims)
.map(|victim| self.squeeze_victim_inner(victim))
.buffer_unordered(usize::MAX)
.collect::<Vec<_>>()
.await;
results.into_iter().collect::<Result<Vec<_>, _>>()?;
} else {
for victim in victims {
self.squeeze_victim_inner(victim).await?;
}
}
Ok(())
}
async fn squeeze_victim_inner(&self, to_squeeze: EntryID) -> Result<(), CacheFull> {
let Some(mut to_squeeze_batch) = self.index.get(&to_squeeze) else {
return Ok(());
};
self.trace(InternalEvent::SqueezeVictim { entry: to_squeeze });
let compressor = self.metadata.get_compressor(&to_squeeze);
let squeeze_hint_arc = self.metadata.squeeze_hint(&to_squeeze);
let squeeze_hint = squeeze_hint_arc.as_deref();
let squeeze_io: Arc<dyn SqueezeIoHandler> = Arc::new(DefaultSqueezeIo::new(
self.store.clone(),
to_squeeze,
self.observer.clone(),
));
loop {
let outcome = self.squeeze_policy.squeeze(
to_squeeze_batch.as_ref(),
compressor.as_ref(),
squeeze_hint,
&squeeze_io,
);
match outcome {
SqueezeOutcome::Replace {
entry: new_batch,
bytes_to_write,
} => {
if let Some(bytes_to_write) = bytes_to_write {
self.write_batch_to_disk(to_squeeze, &new_batch, bytes_to_write)
.await?;
}
match self.try_insert(to_squeeze, new_batch) {
Ok(()) => {
break;
}
Err(batch) => {
to_squeeze_batch = Arc::new(batch);
}
}
}
SqueezeOutcome::Remove => {
self.remove_disk_entry(to_squeeze).await;
break;
}
}
}
Ok(())
}
fn disk_entry_from_squeezed(array: &LiquidSqueezedArrayRef) -> CacheEntry {
let data_type = array.original_arrow_data_type();
match array.disk_backing() {
SqueezedBacking::Liquid(n) => CacheEntry::disk_liquid(data_type, n),
SqueezedBacking::Arrow(n) => CacheEntry::disk_arrow(data_type, n),
}
}
async fn maybe_hydrate(
&self,
entry_id: &EntryID,
cached: &CacheEntry,
materialized: MaterializedEntry<'_>,
expression: Option<&CacheExpression>,
) {
let compressor = self.metadata.get_compressor(entry_id);
if let Some(new_entry) = self.hydration_policy.hydrate(&HydrationRequest {
entry_id: *entry_id,
cached,
materialized,
expression,
compressor,
}) {
let cached_type = CachedBatchType::from(cached);
let new_type = CachedBatchType::from(&new_entry);
self.trace(InternalEvent::Hydrate {
entry: *entry_id,
cached: cached_type,
new: new_type,
});
let _ = self.insert_inner(*entry_id, new_entry).await;
}
}
pub(crate) async fn read_arrow_array(
&self,
entry_id: &EntryID,
selection: Option<&BooleanBuffer>,
expression: Option<&CacheExpression>,
) -> Option<ArrayRef> {
use arrow::array::BooleanArray;
let batch = self.index.get(entry_id)?;
self.cache_policy
.notify_access(entry_id, CachedBatchType::from(batch.as_ref()));
self.trace(InternalEvent::Read {
entry: *entry_id,
selection: selection.is_some(),
expr: expression.cloned(),
cached: CachedBatchType::from(batch.as_ref()),
});
match batch.as_ref() {
CacheEntry::MemoryArrow(array) => match selection {
Some(selection) => {
let selection_array = BooleanArray::new(selection.clone(), None);
arrow::compute::filter(array, &selection_array).ok()
}
None => Some(array.clone()),
},
CacheEntry::MemoryLiquid(array) => match selection {
Some(selection) => Some(array.filter(selection)),
None => Some(array.to_arrow_array()),
},
CacheEntry::DiskArrow { .. } | CacheEntry::DiskLiquid { .. } => {
self.read_disk_array(batch.as_ref(), entry_id, expression, selection)
.await
}
CacheEntry::MemorySqueezedLiquid(array) => {
self.read_squeezed_array(array, entry_id, expression, selection)
.await
}
}
}
async fn read_disk_array(
&self,
entry: &CacheEntry,
entry_id: &EntryID,
expression: Option<&CacheExpression>,
selection: Option<&BooleanBuffer>,
) -> Option<ArrayRef> {
match entry {
CacheEntry::DiskArrow { data_type, .. } => {
if let Some(selection) = selection
&& selection.count_set_bits() == 0
{
return Some(arrow::array::new_empty_array(data_type));
}
let full_array = self.read_disk_arrow_array(entry_id).await;
self.maybe_hydrate(
entry_id,
entry,
MaterializedEntry::Arrow(&full_array),
expression,
)
.await;
match selection {
Some(selection) => {
let selection_array = BooleanArray::new(selection.clone(), None);
arrow::compute::filter(&full_array, &selection_array).ok()
}
None => Some(full_array),
}
}
CacheEntry::DiskLiquid { data_type, .. } => {
if let Some(selection) = selection
&& selection.count_set_bits() == 0
{
return Some(arrow::array::new_empty_array(data_type));
}
let liquid = self.read_disk_liquid_array(entry_id).await;
self.maybe_hydrate(
entry_id,
entry,
MaterializedEntry::Liquid(&liquid),
expression,
)
.await;
match selection {
Some(selection) => Some(liquid.filter(selection)),
None => Some(liquid.to_arrow_array()),
}
}
_ => unreachable!("Unexpected batch in read_disk_array"),
}
}
async fn read_squeezed_array(
&self,
array: &LiquidSqueezedArrayRef,
entry_id: &EntryID,
expression: Option<&CacheExpression>,
selection: Option<&BooleanBuffer>,
) -> Option<ArrayRef> {
if let Some(array) = self.try_read_squeezed_date32_array(array, expression, selection) {
self.observer.on_get_squeezed_success();
self.trace(InternalEvent::ReadSqueezedData {
entry: *entry_id,
expression: expression.unwrap().clone(),
});
return Some(array);
}
if let Some(array) = self
.try_read_squeezed_variant_array(array, entry_id, expression, selection)
.await
{
self.observer.on_get_squeezed_success();
self.trace(InternalEvent::ReadSqueezedData {
entry: *entry_id,
expression: expression.unwrap().clone(),
});
return Some(array);
}
let out = match selection {
Some(selection) => array.filter(selection).await,
None => array.to_arrow_array().await,
};
Some(out)
}
fn try_read_squeezed_date32_array(
&self,
array: &LiquidSqueezedArrayRef,
expression: Option<&CacheExpression>,
selection: Option<&BooleanBuffer>,
) -> Option<ArrayRef> {
if let Some(CacheExpression::ExtractDate32 { field }) = expression
&& let Some(squeezed) = array.as_any().downcast_ref::<SqueezedDate32Array>()
&& squeezed.field() == *field
{
let component = squeezed.to_component_array();
self.observer.on_hit_date32_expression();
if let Some(selection) = selection {
let selection_array = BooleanArray::new(selection.clone(), None);
let filtered = arrow::compute::filter(&component, &selection_array).ok()?;
return Some(filtered);
}
return Some(component);
}
None
}
async fn try_read_squeezed_variant_array(
&self,
array: &LiquidSqueezedArrayRef,
entry_id: &EntryID,
expression: Option<&CacheExpression>,
selection: Option<&BooleanBuffer>,
) -> Option<ArrayRef> {
let requests = expression.and_then(|expr| expr.variant_requests())?;
let variant_squeezed = array
.as_any()
.downcast_ref::<VariantStructSqueezedArray>()?;
let all_paths_present = requests
.iter()
.all(|request| variant_squeezed.contains_path(request.path()));
let full_array = if !all_paths_present {
let batch = CacheEntry::MemorySqueezedLiquid(array.clone());
self.observer.on_get_squeezed_needs_io();
let full_array = self.read_disk_arrow_array(entry_id).await;
self.maybe_hydrate(
entry_id,
&batch,
MaterializedEntry::Arrow(&full_array),
expression,
)
.await;
full_array
} else {
let requested_paths = requests.iter().map(|r| r.path());
variant_squeezed
.to_arrow_array_with_paths(requested_paths)
.unwrap()
};
match selection {
Some(selection) => {
let selection_array = BooleanArray::new(selection.clone(), None);
arrow::compute::filter(&full_array, &selection_array).ok()
}
None => Some(full_array),
}
}
async fn write_batch_to_disk(
&self,
entry_id: EntryID,
batch: &CacheEntry,
bytes: Bytes,
) -> Result<(), CacheFull> {
let len = bytes.len();
loop {
if self.budget.try_reserve_disk(len).is_ok() {
break;
}
let victims = self.cache_policy.find_disk_victim(8);
if victims.is_empty() {
return Err(CacheFull);
}
for victim in victims {
self.remove_disk_entry(victim).await;
}
}
self.trace(InternalEvent::IoWrite {
entry: entry_id,
kind: CachedBatchType::from(batch),
bytes: len,
});
self.store
.put(entry_id_to_key(&entry_id), bytes.to_vec())
.await
.expect("write failed");
Ok(())
}
async fn read_disk_arrow_array(&self, entry_id: &EntryID) -> ArrayRef {
let bytes = self
.store
.get(&entry_id_to_key(entry_id))
.await
.expect("read failed");
let bytes_len = bytes.len();
let cursor = std::io::Cursor::new(bytes);
let mut reader =
arrow::ipc::reader::StreamReader::try_new(cursor, None).expect("create reader failed");
let batch = reader.next().unwrap().expect("read batch failed");
let array = batch.column(0).clone();
self.trace(InternalEvent::IoReadArrow {
entry: *entry_id,
bytes: bytes_len,
});
array
}
async fn read_disk_liquid_array(
&self,
entry_id: &EntryID,
) -> crate::liquid_array::LiquidArrayRef {
let bytes = self
.store
.get(&entry_id_to_key(entry_id))
.await
.expect("read failed");
self.trace(InternalEvent::IoReadLiquid {
entry: *entry_id,
bytes: bytes.len(),
});
let compressor_states = self.metadata.get_compressor(entry_id);
let compressor = compressor_states.fsst_compressor();
(crate::liquid_array::ipc::read_from_bytes(
Bytes::from(bytes),
&crate::liquid_array::ipc::LiquidIPCContext::new(compressor),
)) as _
}
pub(crate) async fn eval_predicate_internal(
&self,
entry_id: &EntryID,
selection_opt: Option<&BooleanBuffer>,
predicate: &LiquidExpr,
) -> Option<BooleanArray> {
use arrow::array::BooleanArray;
self.observer.on_eval_predicate();
let batch = self.index.get(entry_id)?;
self.cache_policy
.notify_access(entry_id, CachedBatchType::from(batch.as_ref()));
self.trace(InternalEvent::EvalPredicate {
entry: *entry_id,
selection: selection_opt.is_some(),
cached: CachedBatchType::from(batch.as_ref()),
});
match batch.as_ref() {
CacheEntry::MemoryArrow(array) => {
let mut owned = None;
let selection = selection_opt.unwrap_or_else(|| {
owned = Some(BooleanBuffer::new_set(array.len()));
owned.as_ref().unwrap()
});
let selection_array = BooleanArray::new(selection.clone(), None);
let filtered = arrow::compute::filter(array, &selection_array)
.expect("selection must match array length");
Some(self.eval_predicate_on_array(filtered, predicate))
}
entry @ CacheEntry::DiskArrow { .. } => {
let array = self.read_disk_arrow_array(entry_id).await;
self.maybe_hydrate(entry_id, entry, MaterializedEntry::Arrow(&array), None)
.await;
let mut owned = None;
let selection = selection_opt.unwrap_or_else(|| {
owned = Some(BooleanBuffer::new_set(array.len()));
owned.as_ref().unwrap()
});
let selection_array = BooleanArray::new(selection.clone(), None);
let filtered = arrow::compute::filter(&array, &selection_array)
.expect("selection must match array length");
Some(self.eval_predicate_on_array(filtered, predicate))
}
CacheEntry::MemoryLiquid(array) => {
let mut owned = None;
let selection = selection_opt.unwrap_or_else(|| {
owned = Some(BooleanBuffer::new_set(array.len()));
owned.as_ref().unwrap()
});
Some(array.try_eval_predicate(predicate, selection))
}
entry @ CacheEntry::DiskLiquid { .. } => {
let liquid = self.read_disk_liquid_array(entry_id).await;
self.maybe_hydrate(entry_id, entry, MaterializedEntry::Liquid(&liquid), None)
.await;
let mut owned = None;
let selection = selection_opt.unwrap_or_else(|| {
owned = Some(BooleanBuffer::new_set(liquid.len()));
owned.as_ref().unwrap()
});
Some(liquid.try_eval_predicate(predicate, selection))
}
CacheEntry::MemorySqueezedLiquid(array) => {
self.eval_predicate_on_squeezed(array, selection_opt, predicate)
.await
}
}
}
async fn eval_predicate_on_squeezed(
&self,
array: &LiquidSqueezedArrayRef,
selection_opt: Option<&BooleanBuffer>,
predicate: &LiquidExpr,
) -> Option<BooleanArray> {
let mut owned = None;
let selection = selection_opt.unwrap_or_else(|| {
owned = Some(BooleanBuffer::new_set(array.len()));
owned.as_ref().unwrap()
});
Some(array.try_eval_predicate(predicate, selection).await)
}
fn eval_predicate_on_array(&self, array: ArrayRef, predicate: &LiquidExpr) -> BooleanArray {
let schema = Arc::new(Schema::new(vec![Field::new(
"liquid_predicate_col",
array.data_type().clone(),
true,
)]));
let record_batch =
RecordBatch::try_new(schema, vec![array]).expect("single-column predicate batch");
let result = predicate
.physical_expr()
.evaluate(&record_batch)
.expect("validated LiquidExpr must evaluate");
let boolean_array = result
.into_array(record_batch.num_rows())
.expect("predicate output must be an array");
boolean_array.as_boolean().clone()
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::cache::{
        CacheEntry, CacheExpression, CachePolicy, LiquidCacheBuilder, LiquidPolicy,
        TranscodeSqueezeEvict, transcode_liquid_inner,
        utils::{
            LiquidCompressorStates, arrow_to_bytes, create_cache_store, create_test_array,
            create_test_arrow_array,
        },
    };
    use crate::liquid_array::{
        Date32Field, LiquidPrimitiveArray, LiquidSqueezedArrayRef, SqueezedDate32Array,
    };
    use crate::sync::thread;
    use arrow::array::{Array, ArrayRef, Date32Array, Int32Array};
    use arrow::datatypes::Date32Type;
    use std::future::Future;
    use std::sync::Arc;
    use std::sync::atomic::{AtomicUsize, Ordering};

    /// Cache policy that always nominates `target_id` as the memory victim and
    /// counts how often it was consulted.
    #[derive(Debug)]
    struct TestPolicy {
        target_id: Option<EntryID>,
        advice_count: AtomicUsize,
    }

    impl TestPolicy {
        fn new(target_id: Option<EntryID>) -> Self {
            let advice_count = AtomicUsize::new(0);
            Self {
                target_id,
                advice_count,
            }
        }
    }

    impl CachePolicy for TestPolicy {
        fn find_memory_victim(&self, _cnt: usize) -> Vec<EntryID> {
            self.advice_count.fetch_add(1, Ordering::SeqCst);
            vec![self.target_id.unwrap()]
        }
    }

    #[tokio::test]
    async fn test_basic_cache_operations() {
        let cache = create_cache_store(10 * 1024, Box::new(LiquidPolicy::new())).await;
        assert_eq!(cache.budget.memory_usage_bytes(), 0);

        // The first insert charges exactly its own size.
        let id_one: EntryID = EntryID::from(1);
        let hundred = create_test_array(100);
        let hundred_size = hundred.memory_usage_bytes();
        cache.insert_inner(id_one, hundred).await.unwrap();
        assert_eq!(cache.budget.memory_usage_bytes(), hundred_size);
        let stored = cache.index().get(&id_one).unwrap();
        assert!(
            matches!(stored.as_ref(), CacheEntry::MemoryArrow(arr) if arr.len() == 100),
            "Expected ArrowMemory"
        );

        // A second entry adds to the total.
        let id_two: EntryID = EntryID::from(2);
        let two_hundred = create_test_array(200);
        let two_hundred_size = two_hundred.memory_usage_bytes();
        cache.insert_inner(id_two, two_hundred).await.unwrap();
        assert_eq!(
            cache.budget.memory_usage_bytes(),
            hundred_size + two_hundred_size
        );

        // Replacing entry 1 swaps its charge for the replacement's.
        let replacement = create_test_array(150);
        let replacement_size = replacement.memory_usage_bytes();
        cache.insert_inner(id_one, replacement).await.unwrap();
        assert_eq!(
            cache.budget.memory_usage_bytes(),
            replacement_size + two_hundred_size
        );

        assert!(cache.index().get(&EntryID::from(999)).is_none());
    }

    #[tokio::test]
    async fn get_arrow_array_with_expression_extracts_year() {
        let cache = create_cache_store(1 << 20, Box::new(LiquidPolicy::new())).await;
        let entry_id = EntryID::from(42);

        // Day offsets; the squeezed form keeps only the year component.
        let days = Date32Array::from(vec![Some(2), Some(365 + 1), None, Some(365 + 100)]);
        let liquid = LiquidPrimitiveArray::<Date32Type>::from_arrow_array(days.clone());
        let squeezed: LiquidSqueezedArrayRef = Arc::new(SqueezedDate32Array::from_liquid_date32(
            &liquid,
            Date32Field::Year,
        ));
        cache
            .insert_inner(
                entry_id,
                CacheEntry::memory_squeezed_liquid(squeezed.clone()),
            )
            .await
            .unwrap();

        let year_expr = Arc::new(CacheExpression::extract_date32(Date32Field::Year));
        let output = cache
            .get(&entry_id)
            .with_expression_hint(year_expr)
            .read()
            .await
            .expect("array present");
        let years = output
            .as_any()
            .downcast_ref::<Date32Array>()
            .expect("date32 result");
        assert_eq!(years.len(), 4);
        assert_eq!(years.value(0), 0);
        assert_eq!(years.value(1), 365);
        assert!(years.is_null(2));
        assert_eq!(years.value(3), 365);
    }

    #[tokio::test]
    async fn test_cache_advice_strategies() {
        let victim = EntryID::from(1);
        let newcomer = EntryID::from(2);
        let cache = create_cache_store(8000, Box::new(TestPolicy::new(Some(victim)))).await;

        cache
            .insert_inner(victim, create_test_array(800))
            .await
            .unwrap();
        match cache.index().get(&victim).unwrap().as_ref() {
            CacheEntry::MemoryArrow(_) => {}
            other => panic!("Expected ArrowMemory, got {other:?}"),
        }

        // Inserting the newcomer forces the policy's victim to be squeezed.
        cache
            .insert_inner(newcomer, create_test_array(800))
            .await
            .unwrap();
        match cache.index().get(&victim).unwrap().as_ref() {
            CacheEntry::MemoryLiquid(_) => {}
            other => panic!("Expected LiquidMemory after eviction, got {other:?}"),
        }
    }

    #[tokio::test]
    async fn test_concurrent_cache_operations() {
        concurrent_cache_operations().await;
    }

    /// Drives `future` to completion on the runtime matching the build's features.
    pub fn block_on<F: Future>(future: F) -> F::Output {
        #[cfg(feature = "shuttle")]
        {
            shuttle::future::block_on(future)
        }
        #[cfg(not(feature = "shuttle"))]
        {
            tokio_test::block_on(future)
        }
    }

    /// Several threads insert disjoint entries; afterwards every entry must
    /// still be indexed.
    async fn concurrent_cache_operations() {
        let num_threads = 3;
        let ops_per_thread = 50;
        // Half of `threads * ops * 100 * 8` bytes — presumably chosen so not
        // every entry fits in memory at once.
        let budget_size = num_threads * ops_per_thread * 100 * 8 / 2;
        let cache = create_cache_store(budget_size, Box::new(LiquidPolicy::new())).await;

        let handles: Vec<_> = (0..num_threads)
            .map(|thread_id| {
                let cache = cache.clone();
                thread::spawn(move || {
                    block_on(async {
                        for i in 0..ops_per_thread {
                            let entry_id: EntryID = EntryID::from(thread_id * ops_per_thread + i);
                            let array = create_test_arrow_array(100);
                            cache.insert(entry_id, array).await.unwrap();
                        }
                    });
                })
            })
            .collect();
        for handle in handles {
            handle.join().unwrap();
        }

        for unique_id in 0..num_threads * ops_per_thread {
            let entry_id: EntryID = EntryID::from(unique_id);
            assert!(cache.index().get(&entry_id).is_some());
        }
        assert_eq!(cache.index().keys().len(), num_threads * ops_per_thread);
    }

    #[tokio::test]
    async fn test_cache_stats_memory_and_disk_usage() {
        let cache = LiquidCacheBuilder::new()
            .with_max_memory_bytes(10 * 1024 * 1024)
            .with_squeeze_policy(Box::new(TranscodeSqueezeEvict))
            .build()
            .await;
        let small: ArrayRef = Arc::new(Int32Array::from_iter_values(0..64));
        let large: ArrayRef = Arc::new(Int32Array::from_iter_values(0..128));
        cache.insert(EntryID::from(1usize), small).await.unwrap();
        cache.insert(EntryID::from(2usize), large).await.unwrap();

        let before = cache.stats();
        assert_eq!(before.total_entries, 2);
        assert!(before.memory_usage_bytes > 0);
        assert_eq!(before.disk_usage_bytes, 0);
        assert_eq!(before.max_memory_bytes, 10 * 1024 * 1024);

        cache.flush_all_to_disk().await.unwrap();
        let after = cache.stats();
        assert_eq!(after.total_entries, 2);
        assert!(after.disk_usage_bytes > 0);
        assert!(after.memory_usage_bytes <= before.memory_usage_bytes);
    }

    #[tokio::test]
    async fn hydrate_disk_arrow_on_get_promotes_to_memory() {
        let cache = create_cache_store(1 << 20, Box::new(LiquidPolicy::new())).await;
        let entry_id = EntryID::from(321usize);
        let expected = create_test_arrow_array(8);
        cache.insert(entry_id, expected.clone()).await.unwrap();
        cache.flush_all_to_disk().await.unwrap();
        assert!(matches!(
            cache.index().get(&entry_id).unwrap().as_ref(),
            CacheEntry::DiskArrow { .. }
        ));

        let fetched = cache.get(&entry_id).await.expect("present");
        assert_eq!(fetched.as_ref(), expected.as_ref());
        assert!(matches!(
            cache.index().get(&entry_id).unwrap().as_ref(),
            CacheEntry::MemoryArrow(_)
        ));
    }

    #[tokio::test]
    async fn hydrate_disk_liquid_on_get_promotes_to_memory_liquid() {
        let cache = create_cache_store(1 << 20, Box::new(LiquidPolicy::new())).await;
        let entry_id = EntryID::from(322usize);
        let expected: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4]));
        let compressor = LiquidCompressorStates::new();
        let liquid = transcode_liquid_inner(&expected, &compressor).unwrap();
        cache
            .insert_inner(entry_id, CacheEntry::memory_liquid(liquid.clone()))
            .await
            .unwrap();
        cache.flush_all_to_disk().await.unwrap();
        assert!(matches!(
            cache.index().get(&entry_id).unwrap().as_ref(),
            CacheEntry::DiskLiquid { .. }
        ));

        let fetched = cache.get(&entry_id).await.expect("present");
        assert_eq!(fetched.as_ref(), expected.as_ref());
        assert!(matches!(
            cache.index().get(&entry_id).unwrap().as_ref(),
            CacheEntry::MemoryLiquid(_)
        ));
    }

    #[tokio::test]
    async fn insert_returns_cache_full_when_memory_and_disk_are_saturated() {
        let cache = LiquidCacheBuilder::new()
            .with_max_memory_bytes(0)
            .with_max_disk_bytes(0)
            .with_squeeze_policy(Box::new(TranscodeSqueezeEvict))
            .build()
            .await;
        let target = EntryID::from(900usize);
        let array: ArrayRef = Arc::new(Int32Array::from_iter_values(0..16));
        let outcome = cache.insert(target, array).await;
        assert_eq!(outcome, Err(CacheFull));
        assert!(!cache.is_cached(&target));
    }

    #[tokio::test]
    async fn insert_until_disk_full_then_evicts_oldest_disk_entry() {
        let older: ArrayRef = Arc::new(Int32Array::from_iter_values(0..16));
        let newer: ArrayRef = Arc::new(Int32Array::from_iter_values(16..32));
        let older_len = arrow_to_bytes(&older).unwrap().len();
        let newer_len = arrow_to_bytes(&newer).unwrap().len();
        // Room on disk for only one of the two entries.
        let cache = LiquidCacheBuilder::new()
            .with_max_memory_bytes(1 << 20)
            .with_max_disk_bytes(older_len.max(newer_len))
            .with_squeeze_policy(Box::new(TranscodeSqueezeEvict))
            .with_cache_policy(Box::new(LiquidPolicy::new()))
            .build()
            .await;
        let older_id = EntryID::from(910usize);
        let newer_id = EntryID::from(911usize);

        cache.insert(older_id, older).await.unwrap();
        cache.flush_all_to_disk().await.unwrap();
        assert!(cache.is_cached(&older_id));

        cache.insert(newer_id, newer).await.unwrap();
        cache.flush_all_to_disk().await.unwrap();
        assert!(!cache.is_cached(&older_id));
        assert!(matches!(
            cache.index().get(&newer_id).unwrap().as_ref(),
            CacheEntry::DiskArrow { .. }
        ));
    }

    #[tokio::test]
    async fn flush_all_to_disk_evicts_when_overflow() {
        let older: ArrayRef = Arc::new(Int32Array::from_iter_values(0..16));
        let newer: ArrayRef = Arc::new(Int32Array::from_iter_values(16..32));
        let disk_limit = arrow_to_bytes(&older).unwrap().len();
        let cache = LiquidCacheBuilder::new()
            .with_max_memory_bytes(1 << 20)
            .with_max_disk_bytes(disk_limit)
            .with_squeeze_policy(Box::new(TranscodeSqueezeEvict))
            .with_cache_policy(Box::new(LiquidPolicy::new()))
            .build()
            .await;
        let older_id = EntryID::from(912usize);
        let newer_id = EntryID::from(913usize);

        cache.insert(older_id, older).await.unwrap();
        cache.flush_all_to_disk().await.unwrap();
        cache.insert(newer_id, newer).await.unwrap();
        cache.flush_all_to_disk().await.unwrap();

        // At most one of the two can survive on a one-entry disk budget.
        assert!(!cache.is_cached(&older_id) || !cache.is_cached(&newer_id));
    }

    #[tokio::test]
    async fn disk_eviction_releases_budget() {
        let array: ArrayRef = Arc::new(Int32Array::from_iter_values(0..16));
        let disk_bytes = arrow_to_bytes(&array).unwrap().len();
        let cache = LiquidCacheBuilder::new()
            .with_max_memory_bytes(1 << 20)
            .with_max_disk_bytes(disk_bytes)
            .with_squeeze_policy(Box::new(TranscodeSqueezeEvict))
            .with_cache_policy(Box::new(LiquidPolicy::new()))
            .build()
            .await;
        let entry = EntryID::from(914usize);
        cache.insert(entry, array).await.unwrap();
        cache.flush_all_to_disk().await.unwrap();

        let usage_before = cache.stats().disk_usage_bytes;
        cache.remove_disk_entry(entry).await;
        assert_eq!(cache.stats().disk_usage_bytes, usage_before - disk_bytes);
        assert!(!cache.is_cached(&entry));
    }

    #[tokio::test]
    async fn flush_all_to_disk_drops_entry_on_unrecoverable_overflow() {
        let cache = LiquidCacheBuilder::new()
            .with_max_memory_bytes(1 << 20)
            .with_max_disk_bytes(0)
            .with_squeeze_policy(Box::new(TranscodeSqueezeEvict))
            .build()
            .await;
        let entry_id = EntryID::from(901usize);
        let array: ArrayRef = Arc::new(Int32Array::from_iter_values(0..16));
        cache.insert(entry_id, array).await.unwrap();

        // With no disk at all, flushing succeeds by dropping the entry.
        assert_eq!(cache.flush_all_to_disk().await, Ok(()));
        assert!(!cache.is_cached(&entry_id));
    }
}