use std::fmt::Debug;
use bytes::Bytes;
use crate::errors::{ParquetError, Result};
use crate::schema::types::ColumnDescriptor;
/// Handle returned by [`PageStore::put`] and later passed to [`PageStore::take`].
///
/// It is a plain wrapper around a raw `u64`; what that number means is up to the store
/// that issued it.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct PageKey(u64);

impl PageKey {
    /// Wraps the raw value `raw` in a key.
    pub const fn new(raw: u64) -> Self {
        PageKey(raw)
    }

    /// Returns the raw value this key wraps.
    pub const fn get(self) -> u64 {
        let PageKey(raw) = self;
        raw
    }
}
/// Storage for page bytes, addressed through [`PageKey`] handles.
///
/// Every implementor must be [`Send`].
pub trait PageStore: Send {
/// Hands `value` to the store and returns the [`PageKey`] that identifies it.
///
/// # Errors
///
/// Returns an error if the store cannot accept the value.
fn put(&mut self, value: Bytes) -> Result<PageKey>;
/// Retrieves the bytes identified by `key`.
///
/// # Errors
///
/// Returns an error if the store cannot produce bytes for `key`.
fn take(&mut self, key: PageKey) -> Result<Bytes>;
/// Number of bytes this store reports as held in memory.
///
/// The default implementation reports `0`; implementors that keep data in memory
/// can override it.
fn memory_size(&self) -> usize {
0
}
}
/// Arguments handed to [`PageStoreFactory::create`]: a column index together with a
/// borrowed descriptor of that column.
pub struct PageStoreArgs<'a> {
    /// Index of the column.
    column_index: usize,
    /// Descriptor of the column, borrowed for `'a`.
    column_descriptor: &'a ColumnDescriptor,
}

impl<'a> PageStoreArgs<'a> {
    /// Bundles `column_index` and `column_descriptor` into an argument set.
    ///
    /// Crate-private, and only compiled when the `arrow` feature is enabled.
    #[cfg(feature = "arrow")]
    pub(crate) fn new(column_index: usize, column_descriptor: &'a ColumnDescriptor) -> Self {
        PageStoreArgs {
            column_descriptor,
            column_index,
        }
    }

    /// Returns the column index stored in these arguments.
    pub fn column_index(&self) -> usize {
        let PageStoreArgs { column_index, .. } = self;
        *column_index
    }

    /// Returns the column descriptor stored in these arguments.
    pub fn column_descriptor(&self) -> &ColumnDescriptor {
        let descriptor: &ColumnDescriptor = self.column_descriptor;
        descriptor
    }
}
/// Factory that builds [`PageStore`] instances from [`PageStoreArgs`].
///
/// Every implementor must be [`Send`], [`Sync`] and [`Debug`].
pub trait PageStoreFactory: Send + Sync + Debug {
/// Builds a boxed [`PageStore`] from the column information carried by `args`.
///
/// # Errors
///
/// Returns an error if the store cannot be created.
fn create(&self, args: &PageStoreArgs<'_>) -> Result<Box<dyn PageStore>>;
}
#[derive(Debug, Default)]
pub struct InMemoryPageStore {
blobs: Vec<Bytes>,
resident: usize,
}
impl PageStore for InMemoryPageStore {
fn put(&mut self, value: Bytes) -> Result<PageKey> {
let key = PageKey(self.blobs.len() as u64);
self.resident += value.len();
self.blobs.push(value);
Ok(key)
}
fn take(&mut self, key: PageKey) -> Result<Bytes> {
let blob = self
.blobs
.get_mut(key.0 as usize)
.map(std::mem::take)
.ok_or_else(|| ParquetError::General(format!("invalid page key {}", key.0)))?;
self.resident -= blob.len();
Ok(blob)
}
fn memory_size(&self) -> usize {
self.resident
}
}
/// [`PageStoreFactory`] that produces a new, empty [`InMemoryPageStore`] on every call.
#[derive(Debug, Default)]
pub struct InMemoryPageStoreFactory;

impl PageStoreFactory for InMemoryPageStoreFactory {
    /// Returns a freshly defaulted [`InMemoryPageStore`] in a box.
    ///
    /// The arguments are ignored and this implementation never returns an error.
    fn create(&self, _args: &PageStoreArgs<'_>) -> Result<Box<dyn PageStore>> {
        let store: Box<dyn PageStore> = Box::new(InMemoryPageStore::default());
        Ok(store)
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Each blob comes back under the key it was stored with, and keys are distinct.
    #[test]
    fn in_memory_round_trips_blobs_in_handle_order() {
        let mut store = InMemoryPageStore::default();
        let first = store.put(Bytes::from_static(b"hello")).unwrap();
        let second = store.put(Bytes::from_static(b"world")).unwrap();
        assert_ne!(first, second);

        let hello = store.take(first).unwrap();
        assert_eq!(&hello[..], b"hello");
        let world = store.take(second).unwrap();
        assert_eq!(&world[..], b"world");
    }

    /// A second `take` of the same key succeeds but yields empty bytes.
    #[test]
    fn in_memory_take_releases_the_slot() {
        let mut store = InMemoryPageStore::default();
        let key = store.put(Bytes::from_static(b"abc")).unwrap();

        let initial = store.take(key).unwrap();
        assert_eq!(&initial[..], b"abc");

        let repeated = store.take(key).unwrap();
        assert!(repeated.is_empty());
    }

    /// A key that was never issued is rejected with an error.
    #[test]
    fn in_memory_invalid_key_errors() {
        let mut store = InMemoryPageStore::default();
        let outcome = store.take(PageKey(99));
        assert!(outcome.is_err());
    }

    /// `memory_size` follows the total length of the blobs still held.
    #[test]
    fn in_memory_reports_resident_bytes() {
        let mut store = InMemoryPageStore::default();
        assert_eq!(store.memory_size(), 0);

        let five_bytes = store.put(Bytes::from_static(b"hello")).unwrap();
        let one_byte = store.put(Bytes::from_static(b"!")).unwrap();
        assert_eq!(store.memory_size(), 6);

        store.take(five_bytes).unwrap();
        assert_eq!(store.memory_size(), 1);

        store.take(one_byte).unwrap();
        assert_eq!(store.memory_size(), 0);
    }

    /// A store that does not override `memory_size` reports zero.
    #[test]
    fn default_store_memory_size_is_zero() {
        struct OffHeap;

        impl PageStore for OffHeap {
            fn put(&mut self, _value: Bytes) -> Result<PageKey> {
                Ok(PageKey::new(0))
            }

            fn take(&mut self, _key: PageKey) -> Result<Bytes> {
                Ok(Bytes::new())
            }
        }

        let store = OffHeap;
        assert_eq!(store.memory_size(), 0);
    }
}