//! liquid-cache 0.1.12 — 10x lower latency for cloud-native DataFusion.
//!
//! IO context abstractions and squeeze-IO handlers used when cache entries
//! are spilled to and read back from disk.
use std::{fmt::Debug, ops::Range, path::PathBuf};

use ahash::AHashMap;
use bytes::Bytes;

use crate::sync::{Arc, RwLock};
use crate::{
    cache::{
        CacheExpression, Observer,
        observer::InternalEvent,
        utils::{EntryID, LiquidCompressorStates},
    },
    liquid_array::SqueezeIoHandler,
};

/// A trait for objects that can handle IO operations for the cache.
///
/// Implementations supply the per-entry compressor state, the on-disk
/// location of an entry, and the read/write primitives used to move entry
/// bytes to and from that location. Squeeze hints are optional: the default
/// implementations ignore them entirely.
#[async_trait::async_trait]
pub trait IoContext: Debug + Send + Sync {
    /// Add a squeeze hint for an entry.
    ///
    /// The default implementation silently discards the hint.
    fn add_squeeze_hint(&self, _entry_id: &EntryID, _expression: Arc<CacheExpression>) {
        // Do nothing by default
    }

    /// Get the squeeze hint for an entry.
    /// If None, the entry will be evicted to disk entirely.
    /// If Some, the entry will be squeezed according to the cache expressions previously recorded for this column.
    /// For example, if expression is ExtractDate32 { field: Date32Field::Year },
    /// the entry will be squeezed to a [crate::liquid_array::SqueezedDate32Array] with the year
    /// component (Date32 or Timestamp input).
    ///
    /// The default implementation always returns `None`.
    fn squeeze_hint(&self, _entry_id: &EntryID) -> Option<Arc<CacheExpression>> {
        None
    }

    /// Get the compressor for an entry.
    fn get_compressor(&self, entry_id: &EntryID) -> Arc<LiquidCompressorStates>;

    /// Get the disk path for a cache entry.
    fn disk_path(&self, entry_id: &EntryID) -> PathBuf;

    /// Read bytes from the file at the given path, optionally restricted to the provided range.
    ///
    /// When `range` is `Some`, implementations read exactly `range.end - range.start`
    /// bytes starting at `range.start`; when `None`, the whole file is read.
    async fn read(&self, path: PathBuf, range: Option<Range<u64>>)
    -> Result<Bytes, std::io::Error>;

    /// Write the entire buffer to a file at the given path.
    async fn write_file(&self, path: PathBuf, data: Bytes) -> Result<(), std::io::Error>;
}

/// A default implementation of [IoContext] that uses the default compressor.
/// It uses tokio's async IO by default.
#[derive(Debug)]
pub struct DefaultIoContext {
    // Single compressor state shared by every entry (see `get_compressor`).
    compressor_state: Arc<LiquidCompressorStates>,
    // Per-entry squeeze hints, recorded by `add_squeeze_hint`.
    squeeze_hints: RwLock<AHashMap<EntryID, Arc<CacheExpression>>>,
    // Directory under which entry files (`<id>.liquid`) are created.
    base_dir: PathBuf,
}

impl DefaultIoContext {
    /// Create a new instance of [DefaultIoContext].
    ///
    /// `base_dir` is the directory under which entry files are written.
    pub fn new(base_dir: PathBuf) -> Self {
        let compressor_state = Arc::new(LiquidCompressorStates::new());
        let squeeze_hints = RwLock::new(AHashMap::new());
        Self {
            compressor_state,
            squeeze_hints,
            base_dir,
        }
    }
}

#[async_trait::async_trait]
impl IoContext for DefaultIoContext {
    fn add_squeeze_hint(&self, entry_id: &EntryID, expression: Arc<CacheExpression>) {
        let mut guard = self.squeeze_hints.write().unwrap();
        guard.insert(*entry_id, expression);
    }

    fn squeeze_hint(&self, entry_id: &EntryID) -> Option<Arc<CacheExpression>> {
        let guard = self.squeeze_hints.read().unwrap();
        guard.get(entry_id).cloned()
    }

    fn get_compressor(&self, _entry_id: &EntryID) -> Arc<LiquidCompressorStates> {
        self.compressor_state.clone()
    }

    fn disk_path(&self, entry_id: &EntryID) -> PathBuf {
        self.base_dir
            .join(format!("{:016x}.liquid", usize::from(*entry_id)))
    }

    async fn read(
        &self,
        path: PathBuf,
        range: Option<Range<u64>>,
    ) -> Result<Bytes, std::io::Error> {
        if cfg!(test) {
            let mut file = std::fs::File::open(path)?;
            match range {
                Some(range) => {
                    let len = (range.end - range.start) as usize;
                    let mut bytes = vec![0u8; len];
                    std::io::Seek::seek(&mut file, std::io::SeekFrom::Start(range.start))?;
                    std::io::Read::read_exact(&mut file, &mut bytes)?;
                    Ok(Bytes::from(bytes))
                }
                None => {
                    let mut bytes = Vec::new();
                    std::io::Read::read_to_end(&mut file, &mut bytes)?;
                    Ok(Bytes::from(bytes))
                }
            }
        } else {
            use tokio::io::AsyncReadExt;
            use tokio::io::AsyncSeekExt;
            let mut file = tokio::fs::File::open(path).await?;

            match range {
                Some(range) => {
                    let len = (range.end - range.start) as usize;
                    let mut bytes = vec![0u8; len];
                    file.seek(tokio::io::SeekFrom::Start(range.start)).await?;
                    file.read_exact(&mut bytes).await?;
                    Ok(Bytes::from(bytes))
                }
                None => {
                    let mut bytes = Vec::new();
                    file.read_to_end(&mut bytes).await?;
                    Ok(Bytes::from(bytes))
                }
            }
        }
    }

    async fn write_file(&self, path: PathBuf, data: Bytes) -> Result<(), std::io::Error> {
        if cfg!(test) {
            std::fs::write(path, data.as_ref())?;
            Ok(())
        } else {
            use tokio::io::AsyncWriteExt;
            let mut file = tokio::fs::File::create(path).await?;
            file.write_all(&data).await?;
            file.sync_all().await?;
            Ok(())
        }
    }
}

/// A default implementation of [SqueezeIoHandler] that uses the default [IoContext].
///
/// Binds a single cache entry to an [IoContext] and reports read/decompress
/// activity to the [Observer].
#[derive(Debug)]
pub struct DefaultSqueezeIo {
    // IO backend used to locate and read the entry's backing file.
    io_context: Arc<dyn IoContext>,
    // The entry this handler serves.
    entry_id: EntryID,
    // Sink for internal telemetry events.
    observer: Arc<Observer>,
}

impl DefaultSqueezeIo {
    /// Create a new instance of [DefaultSqueezeIo].
    ///
    /// Binds `entry_id` to the given `io_context` and reports events to `observer`.
    pub fn new(io_context: Arc<dyn IoContext>, entry_id: EntryID, observer: Arc<Observer>) -> Self {
        Self {
            observer,
            entry_id,
            io_context,
        }
    }
}

#[async_trait::async_trait]
impl SqueezeIoHandler for DefaultSqueezeIo {
    /// Read (part of) this entry's backing file and record the IO in the observer.
    async fn read(&self, range: Option<Range<u64>>) -> std::io::Result<Bytes> {
        let entry = self.entry_id;
        let backing_path = self.io_context.disk_path(&entry);
        let data = self.io_context.read(backing_path, range).await?;
        let event = InternalEvent::IoReadSqueezedBacking {
            entry,
            bytes: data.len(),
        };
        self.observer.record_internal(event);
        Ok(data)
    }

    /// Record how many values were decompressed out of the total for this entry.
    fn tracing_decompress_count(&self, decompress_cnt: usize, total_cnt: usize) {
        let event = InternalEvent::DecompressSqueezed {
            entry: self.entry_id,
            decompressed: decompress_cnt,
            total: total_cnt,
        };
        self.observer.record_internal(event);
    }

    /// Count one disk read avoided thanks to squeezing.
    fn trace_io_saved(&self) {
        self.observer.runtime_stats().incr_squeeze_io_saved();
    }
}

/// Test double for [SqueezeIoHandler]: serves reads from an in-memory buffer
/// and counts how many reads were issued.
#[cfg(test)]
#[derive(Debug, Default)]
pub(crate) struct TestSqueezeIo {
    // Backing bytes returned by `read`; must be set before the first read.
    bytes: std::sync::Mutex<Option<Bytes>>,
    // Number of `read` calls since construction or the last `reset_reads`.
    reads: std::sync::atomic::AtomicUsize,
}

#[cfg(test)]
impl TestSqueezeIo {
    /// Install the backing bytes that subsequent reads will serve.
    pub(crate) fn set_bytes(&self, bytes: Bytes) {
        let mut guard = self.bytes.lock().unwrap();
        *guard = Some(bytes);
    }

    /// Number of reads issued since construction or the last reset.
    pub(crate) fn reads(&self) -> usize {
        use std::sync::atomic::Ordering;
        self.reads.load(Ordering::SeqCst)
    }

    /// Reset the read counter to zero.
    pub(crate) fn reset_reads(&self) {
        use std::sync::atomic::Ordering;
        self.reads.store(0, Ordering::SeqCst);
    }
}

#[cfg(test)]
#[async_trait::async_trait]
impl SqueezeIoHandler for TestSqueezeIo {
    /// Serve the configured bytes (optionally sliced), bumping the read counter.
    ///
    /// Panics if `set_bytes` has not been called yet.
    async fn read(&self, range: Option<Range<u64>>) -> std::io::Result<Bytes> {
        use std::sync::atomic::Ordering;
        self.reads.fetch_add(1, Ordering::SeqCst);
        // Clone out of the mutex so the guard is not held across slicing.
        let backing = {
            let guard = self.bytes.lock().unwrap();
            guard.clone().expect("test squeeze backing set")
        };
        let result = if let Some(r) = range {
            // Bytes::slice is a cheap, zero-copy view of the backing buffer.
            backing.slice(r.start as usize..r.end as usize)
        } else {
            backing
        };
        Ok(result)
    }
}