#![cfg_attr(not(test), warn(missing_docs))]
use crate::{
dynamic::{ArchivedDBData, DynVec, LeanVec},
storage::buffer_cache::{FBuf, FBufSerializer},
};
use rkyv::{
AlignedBytes,
de::deserializers::SharedDeserializeMap,
ser::{
ScratchSpace,
serializers::{
AllocScratchError, BufferScratch, CompositeSerializerError, SharedSerializeMapError,
},
},
};
use rkyv::{
Archive, Archived, Deserialize, Fallible, Serialize,
ser::{Serializer as _, serializers::AllocScratch},
};
use rkyv::{
de::{SharedDeserializeRegistry, SharedPointer},
ser::SharedSerializeRegistry,
};
use std::{
alloc::{Layout, alloc_zeroed},
cell::RefCell,
collections::{HashMap, hash_map::Entry},
};
use std::{any::Any, sync::Arc};
use std::{fmt::Debug, ptr::NonNull};
mod filter;
pub mod format;
mod item;
pub mod reader;
pub mod writer;
use crate::{
DBData,
dynamic::{DataTrait, Erase, Factory, WithFactory},
storage::file::item::RefTup2Factory,
};
pub use filter::BatchKeyFilter;
pub use filter::FilterPlan;
pub use filter::TrackingRoaringBitmap;
pub use filter::{FilterKind, FilterStats, TrackingFilterStats};
pub(crate) use filter::{TouchedWindowCounter, collect_roaring_metadata};
pub use format::TouchedWindowCount;
pub use item::{ArchivedItem, Item, ItemFactory, WithItemFactory};
// Seed used when hashing keys into Bloom filters.
// NOTE(review): presumably this must stay fixed so that filters written by
// earlier runs remain readable — confirm before changing it.
const BLOOM_FILTER_SEED: u128 = 42;
/// Target false-positive rate for Bloom filters (0.0001, i.e. 0.01%).
pub const BLOOM_FILTER_FALSE_POSITIVE_RATE: f64 = 0.0001;
/// Factories for a key type `K` and an auxiliary type `A`, plus factories for
/// vectors of each.
///
/// Every factory is a `&'static` reference, so this bundle is cheap to clone.
/// Use [`Factories::new`] to build one from concrete types.
pub struct Factories<K, A>
where
K: DataTrait + ?Sized,
A: DataTrait + ?Sized,
{
/// Factory for individual `K` values.
pub key_factory: &'static dyn Factory<K>,
/// Factory for items pairing a `K` with an `A` (backed by
/// `RefTup2Factory<KType, AType>` in [`Factories::new`]).
pub item_factory: &'static dyn ItemFactory<K, A>,
/// Factory for vectors of `K` (backed by `LeanVec<KType>` in [`Factories::new`]).
pub keys_factory: &'static dyn Factory<DynVec<K>>,
/// Factory for vectors of `A` (backed by `LeanVec<AType>` in [`Factories::new`]).
pub auxes_factory: &'static dyn Factory<DynVec<A>>,
}
impl<K, A> Clone for Factories<K, A>
where
    K: DataTrait + ?Sized,
    A: DataTrait + ?Sized,
{
    /// Copies the four `&'static` factory references.
    ///
    /// Implemented by hand because `#[derive(Clone)]` would require
    /// `K: Clone` and `A: Clone`, which the (possibly unsized) trait-object
    /// parameters do not satisfy.
    fn clone(&self) -> Self {
        // Every field is a shared reference and therefore `Copy`, so
        // destructuring `*self` copies them out without moving anything.
        let Self {
            key_factory,
            item_factory,
            keys_factory,
            auxes_factory,
        } = *self;
        Self {
            key_factory,
            item_factory,
            keys_factory,
            auxes_factory,
        }
    }
}
impl<K, A> Factories<K, A>
where
K: DataTrait + ?Sized,
A: DataTrait + ?Sized,
{
pub fn new<KType, AType>() -> Self
where
KType: DBData + Erase<K>,
AType: DBData + Erase<A>,
{
Self {
key_factory: WithFactory::<KType>::FACTORY,
item_factory: <RefTup2Factory<KType, AType> as WithItemFactory<K, A>>::ITEM_FACTORY,
keys_factory: WithFactory::<LeanVec<KType>>::FACTORY,
auxes_factory: WithFactory::<LeanVec<AType>>::FACTORY,
}
}
pub fn any_factories(&self) -> AnyFactories {
AnyFactories {
key_factory: Arc::new(self.key_factory),
item_factory: Arc::new(self.item_factory),
keys_factory: Arc::new(self.keys_factory),
auxes_factory: Arc::new(self.auxes_factory),
}
}
}
/// Type-erased [`Factories`], produced by [`Factories::any_factories`].
///
/// Each field stores the corresponding `&'static` factory reference behind an
/// `Arc<dyn Any + Send + Sync>`, so this type carries no type parameters.
/// The typed factories are recovered by downcasting, which panics when the
/// requested `K`/`A` differ from the ones this value was built from.
#[derive(Clone)]
pub struct AnyFactories {
// Holds a `&'static dyn Factory<K>`.
key_factory: Arc<dyn Any + Send + Sync + 'static>,
// Holds a `&'static dyn ItemFactory<K, A>`.
item_factory: Arc<dyn Any + Send + Sync + 'static>,
// Holds a `&'static dyn Factory<DynVec<K>>`.
keys_factory: Arc<dyn Any + Send + Sync + 'static>,
// Holds a `&'static dyn Factory<DynVec<A>>`.
auxes_factory: Arc<dyn Any + Send + Sync + 'static>,
}
impl Debug for AnyFactories {
    /// Writes only the type name and omits the erased factories.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // An empty `debug_struct(..).finish()` emits exactly the name, so
        // writing it directly produces identical output.
        f.write_str("AnyFactories")
    }
}
impl AnyFactories {
fn key_factory<K>(&self) -> &'static dyn Factory<K>
where
K: DataTrait + ?Sized,
{
*self
.key_factory
.as_ref()
.downcast_ref::<&'static dyn Factory<K>>()
.unwrap()
}
fn item_factory<K, A>(&self) -> &'static dyn ItemFactory<K, A>
where
K: DataTrait + ?Sized,
A: DataTrait + ?Sized,
{
*self
.item_factory
.as_ref()
.downcast_ref::<&'static dyn ItemFactory<K, A>>()
.unwrap()
}
fn keys_factory<K>(&self) -> &'static dyn Factory<DynVec<K>>
where
K: DataTrait + ?Sized,
{
*self
.keys_factory
.as_ref()
.downcast_ref::<&'static dyn Factory<DynVec<K>>>()
.unwrap()
}
fn auxes_factory<K>(&self) -> &'static dyn Factory<DynVec<K>>
where
K: DataTrait + ?Sized,
{
*self
.auxes_factory
.as_ref()
.downcast_ref::<&'static dyn Factory<DynVec<K>>>()
.unwrap()
}
fn factories<K, A>(&self) -> Factories<K, A>
where
K: DataTrait + ?Sized,
A: DataTrait + ?Sized,
{
Factories {
key_factory: self.key_factory(),
item_factory: self.item_factory(),
keys_factory: self.keys_factory(),
auxes_factory: self.auxes_factory(),
}
}
}
/// Shorthand for types that rkyv can archive, serialize with
/// [`DbspSerializer`], and deserialize with [`Deserializer`].
///
/// The blanket impl below covers every type meeting those bounds, so this
/// trait is never implemented by hand.
pub trait Rkyv: Archive + for<'a> Serialize<DbspSerializer<'a>> + Deserializable {}
impl<T> Rkyv for T where T: Archive + for<'a> Serialize<DbspSerializer<'a>> + Deserializable {}
/// Types whose archived form can be deserialized back into `Self` with
/// [`Deserializer`].
///
/// `ArchivedDeser` is always `Archived<Self>` (see the blanket impl). The
/// `Deserialize` requirement is expressed as a bound on an associated type
/// rather than as a `where Archived<T>: ...` clause. NOTE(review): presumably
/// this is so generic code that only knows `T: Deserializable` gets the bound
/// without repeating the clause — confirm.
pub trait Deserializable: Archive<Archived = Self::ArchivedDeser> + Sized {
/// The archived representation of `Self`; always equal to `Archived<Self>`.
type ArchivedDeser: Deserialize<Self, Deserializer>;
}
// Every `Archive` type whose archive deserializes with `Deserializer` qualifies.
impl<T: Archive> Deserializable for T
where
Archived<T>: Deserialize<T, Deserializer>,
{
type ArchivedDeser = Archived<T>;
}
/// Reusable state behind a [`DbspSerializer`]: scratch space plus the table of
/// shared pointers already written.
///
/// [`SerializerInner::with`] clears this state before each use, so one
/// instance can serve many serializations. [`SerializerInner::with_thread_local`]
/// keeps one instance per thread.
pub struct SerializerInner {
// Scratch space handed to rkyv during serialization.
scratch: DbspScratch,
// Maps the address of an already-serialized shared value to the position
// registered for it via `add_shared_ptr`.
shared_resolvers: HashMap<*const u8, usize>,
}
impl SerializerInner {
    /// Creates serializer state with fresh scratch space and an empty
    /// shared-pointer table.
    pub fn new() -> Self {
        let scratch = DbspScratch::default();
        let shared_resolvers = HashMap::new();
        Self {
            scratch,
            shared_resolvers,
        }
    }

    /// Runs `f` with a [`DbspSerializer`] that writes through `serializer` and
    /// uses this state for scratch space and shared pointers.
    ///
    /// Scratch space and shared-pointer entries left over from a previous call
    /// are discarded first, then `f`'s result is returned.
    pub fn with<F, R>(&mut self, serializer: FBufSerializer<&mut FBuf>, f: F) -> R
    where
        F: FnOnce(&mut DbspSerializer) -> R,
    {
        // Reset leftovers from the previous serialization before reuse.
        self.scratch.clear();
        self.shared_resolvers.clear();
        f(&mut DbspSerializer {
            serializer,
            inner: self,
        })
    }

    /// Like [`SerializerInner::with`], but uses a per-thread `SerializerInner`
    /// so the scratch buffer is allocated once per thread.
    ///
    /// # Panics
    ///
    /// Panics if called re-entrantly from within `f` on the same thread,
    /// because the thread-local state is then already mutably borrowed.
    pub fn with_thread_local<F, R>(serializer: FBufSerializer<&mut FBuf>, f: F) -> R
    where
        F: FnOnce(&mut DbspSerializer) -> R,
    {
        thread_local! {
            static INNER: RefCell<SerializerInner> = RefCell::new(SerializerInner::new());
        }
        INNER.with(|cell| cell.borrow_mut().with(serializer, f))
    }

    /// Serializes into a fresh [`FBuf`] through the thread-local state and
    /// returns the buffer.
    ///
    /// # Panics
    ///
    /// Panics if `f` returns `Err`, and in the same re-entrancy case as
    /// [`SerializerInner::with_thread_local`].
    pub fn to_fbuf_with_thread_local<F, E, R>(f: F) -> FBuf
    where
        F: FnOnce(&mut DbspSerializer) -> Result<R, E>,
        E: Debug,
    {
        let mut buffer = FBuf::default();
        Self::with_thread_local(FBufSerializer::new(&mut buffer), f).unwrap();
        buffer
    }
}
impl Default for SerializerInner {
fn default() -> Self {
Self::new()
}
}
// SAFETY: the `*const u8` keys of `shared_resolvers` make this type
// `!Send`/`!Sync` automatically. In this file those pointers are only hashed
// and compared as identity keys (`get_shared_ptr` / `add_shared_ptr`) and are
// never dereferenced, so moving or sharing the map across threads is sound.
// NOTE(review): these impls also cover `scratch`; confirm that
// `BufferScratch<Box<AlignedBytes<_>>>` and `ReusableAllocScratch` (which has
// its own unsafe impls) are thread-safe as well.
unsafe impl Send for SerializerInner {}
unsafe impl Sync for SerializerInner {}
/// rkyv serializer that writes into an [`FBuf`] while borrowing its scratch
/// space and shared-pointer table from a [`SerializerInner`].
///
/// Obtained through [`SerializerInner::with`] or
/// [`SerializerInner::with_thread_local`].
pub struct DbspSerializer<'a> {
// Destination for the serialized bytes.
serializer: FBufSerializer<&'a mut FBuf>,
// Scratch space and shared-pointer registry, reused across serializations.
inner: &'a mut SerializerInner,
}
// Errors combine the three things a `DbspSerializer` delegates to: writing
// into the `FBuf`, scratch allocation (`DbspScratch`), and the shared-pointer
// registry.
// NOTE(review): the first component is spelled via `FBufSerializer<FBuf>`
// while the field is `FBufSerializer<&mut FBuf>`; presumably both have the
// same `Fallible::Error` — confirm.
impl Fallible for DbspSerializer<'_> {
type Error = CompositeSerializerError<
<FBufSerializer<FBuf> as Fallible>::Error,
<DbspScratch as Fallible>::Error,
SharedSerializeMapError,
>;
}
impl rkyv::ser::Serializer for DbspSerializer<'_> {
    /// Current write position, as reported by the underlying `FBufSerializer`.
    fn pos(&self) -> usize {
        let out = &self.serializer;
        out.pos()
    }

    /// Appends `bytes` to the underlying buffer and wraps any failure as a
    /// `CompositeSerializerError::SerializerError`.
    fn write(&mut self, bytes: &[u8]) -> Result<(), Self::Error> {
        match self.serializer.write(bytes) {
            Ok(()) => Ok(()),
            Err(e) => Err(CompositeSerializerError::SerializerError(e)),
        }
    }
}
impl ScratchSpace for DbspSerializer<'_> {
    /// Allocates scratch space from the shared [`DbspScratch`].
    unsafe fn push_scratch(&mut self, layout: Layout) -> Result<NonNull<[u8]>, Self::Error> {
        let scratch = &mut self.inner.scratch;
        // SAFETY: the caller's `push_scratch` obligations are forwarded unchanged.
        let pushed = unsafe { scratch.push_scratch(layout) };
        pushed.map_err(CompositeSerializerError::ScratchSpaceError)
    }

    /// Returns scratch space previously obtained from `push_scratch`.
    unsafe fn pop_scratch(&mut self, ptr: NonNull<u8>, layout: Layout) -> Result<(), Self::Error> {
        let scratch = &mut self.inner.scratch;
        // SAFETY: the caller's `pop_scratch` obligations are forwarded unchanged.
        let popped = unsafe { scratch.pop_scratch(ptr, layout) };
        popped.map_err(CompositeSerializerError::ScratchSpaceError)
    }
}
impl SharedSerializeRegistry for DbspSerializer<'_> {
    /// Looks up the position registered for the shared value at `value`.
    fn get_shared_ptr(&self, value: *const u8) -> Option<usize> {
        let resolvers = &self.inner.shared_resolvers;
        resolvers.get(&value).copied()
    }

    /// Registers `pos` for the shared value at `value`.
    ///
    /// Fails with `DuplicateSharedPointer` if `value` is already registered.
    fn add_shared_ptr(&mut self, value: *const u8, pos: usize) -> Result<(), Self::Error> {
        match self.inner.shared_resolvers.entry(value) {
            Entry::Vacant(slot) => {
                slot.insert(pos);
                Ok(())
            }
            Entry::Occupied(_) => {
                let duplicate = SharedSerializeMapError::DuplicateSharedPointer(value);
                Err(CompositeSerializerError::SharedError(duplicate))
            }
        }
    }
}
/// Scratch space used by [`DbspSerializer`].
///
/// Requests are served from a fixed [`SCRATCH_SIZE`]-byte buffer when it has
/// room, and from individual heap allocations otherwise. Both parts are reset
/// by `clear`, so the fixed buffer is allocated once and reused.
pub struct DbspScratch {
// First-choice buffer of `SCRATCH_SIZE` bytes, allocated on the heap.
main: BufferScratch<Box<AlignedBytes<SCRATCH_SIZE>>>,
// Heap allocations used when `main` cannot satisfy a request.
fallback: ReusableAllocScratch,
}
impl Default for DbspScratch {
fn default() -> Self {
Self::new()
}
}
impl DbspScratch {
    /// Creates scratch space with a zeroed [`SCRATCH_SIZE`]-byte main buffer
    /// and an empty heap fallback.
    ///
    /// On allocation failure this calls [`std::alloc::handle_alloc_error`],
    /// matching how the standard collections report out-of-memory.
    fn new() -> Self {
        let layout = Layout::new::<AlignedBytes<SCRATCH_SIZE>>();
        // Allocate straight on the heap; `Box::new(AlignedBytes(..))` may
        // build the whole buffer on the stack first.
        // SAFETY: `layout` has non-zero size because `SCRATCH_SIZE > 0`.
        let ptr = unsafe { alloc_zeroed(layout) }.cast::<AlignedBytes<SCRATCH_SIZE>>();
        if ptr.is_null() {
            std::alloc::handle_alloc_error(layout);
        }
        // SAFETY: `ptr` is non-null and was allocated by the global allocator
        // with exactly the layout of `AlignedBytes<SCRATCH_SIZE>`. That type is
        // rkyv's aligned byte-array wrapper, so all-zero memory is a valid value.
        let main = BufferScratch::new(unsafe { Box::from_raw(ptr) });
        Self {
            main,
            fallback: ReusableAllocScratch::new(),
        }
    }

    /// Releases all scratch handed out so far.
    ///
    /// This rewinds the main buffer and frees every outstanding fallback
    /// allocation, so the space can be reused for the next serialization.
    fn clear(&mut self) {
        self.main.clear();
        self.fallback.clear();
    }
}
// Only the fallback's errors reach callers: `push_scratch` and `pop_scratch`
// discard `main`'s error and retry on `fallback`, whose error type matches
// `AllocScratch`'s.
impl Fallible for DbspScratch {
type Error = <AllocScratch as Fallible>::Error;
}
impl ScratchSpace for DbspScratch {
    /// Tries the fixed buffer first and uses the heap fallback if it cannot
    /// satisfy `layout`.
    #[inline]
    unsafe fn push_scratch(&mut self, layout: Layout) -> Result<NonNull<[u8]>, Self::Error> {
        // SAFETY: both scratch spaces receive the caller's `layout` unchanged,
        // so the caller's contract carries over to each of them.
        match unsafe { self.main.push_scratch(layout) } {
            Ok(block) => Ok(block),
            Err(_) => unsafe { self.fallback.push_scratch(layout) },
        }
    }

    /// Returns `ptr` to whichever part owns it.
    ///
    /// The main buffer is tried first; if it rejects `ptr`, the heap fallback
    /// gets the request.
    #[inline]
    unsafe fn pop_scratch(&mut self, ptr: NonNull<u8>, layout: Layout) -> Result<(), Self::Error> {
        // SAFETY: forwards the caller's `pop_scratch` contract unchanged.
        match unsafe { self.main.pop_scratch(ptr, layout) } {
            Ok(()) => Ok(()),
            Err(_) => unsafe { self.fallback.pop_scratch(ptr, layout) },
        }
    }
}
/// Heap-backed stack of scratch allocations, used as the fallback of
/// [`DbspScratch`].
///
/// Each push is a separate global-allocator allocation, and pops must happen
/// in reverse order. [`ReusableAllocScratch::clear`] (also run on drop) frees
/// whatever is still outstanding, so the value can be reused.
#[derive(Debug, Default)]
struct ReusableAllocScratch {
// Outstanding allocations in push order, each with the layout it was
// allocated with (needed to deallocate it).
allocations: Vec<(*mut u8, Layout)>,
}
// SAFETY: the raw pointers refer to heap blocks owned exclusively by this
// value. In this file they are only allocated and freed through `&mut self`;
// nothing reads or writes through them via `&self`.
unsafe impl Send for ReusableAllocScratch {}
unsafe impl Sync for ReusableAllocScratch {}
impl ReusableAllocScratch {
    /// Creates a fallback with no outstanding allocations.
    fn new() -> Self {
        Self {
            allocations: Vec::new(),
        }
    }

    /// Frees every outstanding allocation, newest first.
    ///
    /// The vector keeps its capacity for reuse.
    fn clear(&mut self) {
        while let Some((block, layout)) = self.allocations.pop() {
            // SAFETY: each entry was produced by `std::alloc::alloc(layout)` in
            // `push_scratch` and is removed from the list before being freed,
            // so it is freed exactly once.
            unsafe { std::alloc::dealloc(block, layout) };
        }
    }
}
impl Drop for ReusableAllocScratch {
fn drop(&mut self) {
self.clear();
}
}
// Uses rkyv's `AllocScratchError`, the same error type as `AllocScratch`, so
// `DbspScratch` can surface the fallback's errors unchanged.
impl Fallible for ReusableAllocScratch {
type Error = AllocScratchError;
}
impl ScratchSpace for ReusableAllocScratch {
    /// Allocates a fresh heap block for `layout` and records it, so it can be
    /// popped later or freed by `clear`.
    ///
    /// Allocation failure is reported through [`std::alloc::handle_alloc_error`].
    ///
    /// # Safety
    ///
    /// Same contract as [`ScratchSpace::push_scratch`]. In particular `layout`
    /// must have non-zero size, because it is passed straight to
    /// [`std::alloc::alloc`].
    #[inline]
    unsafe fn push_scratch(&mut self, layout: Layout) -> Result<NonNull<[u8]>, Self::Error> {
        // SAFETY: relies on the caller honoring `push_scratch`'s requirement
        // that `layout` is non-zero-sized; `alloc` with a zero-sized layout is UB.
        let raw = unsafe { std::alloc::alloc(layout) };
        let Some(start) = NonNull::new(raw) else {
            std::alloc::handle_alloc_error(layout)
        };
        self.allocations.push((raw, layout));
        Ok(NonNull::slice_from_raw_parts(start, layout.size()))
    }

    /// Frees the most recent allocation, which must be `ptr` with `layout`.
    ///
    /// # Errors
    ///
    /// - `NoAllocationsToPop` if nothing is outstanding.
    /// - `NotPoppedInReverseOrder` if `ptr`/`layout` do not match the most
    ///   recent push.
    ///
    /// In either error case nothing is freed.
    #[inline]
    unsafe fn pop_scratch(&mut self, ptr: NonNull<u8>, layout: Layout) -> Result<(), Self::Error> {
        let Some(&(last_ptr, last_layout)) = self.allocations.last() else {
            return Err(AllocScratchError::NoAllocationsToPop);
        };
        if ptr.as_ptr() != last_ptr || layout != last_layout {
            return Err(AllocScratchError::NotPoppedInReverseOrder {
                expected: last_ptr,
                expected_layout: last_layout,
                actual: ptr.as_ptr(),
                actual_layout: layout,
            });
        }
        // SAFETY: `last_ptr` was returned by `alloc(last_layout)` in
        // `push_scratch` and is still recorded, so it has not been freed yet.
        unsafe { std::alloc::dealloc(last_ptr, last_layout) };
        self.allocations.pop();
        Ok(())
    }
}
/// rkyv deserializer used for stored data.
///
/// It records the storage format version of the data being read (see
/// [`Deserializer::version`]) and tracks shared pointers with a
/// `SharedDeserializeMap`.
#[derive(Debug)]
pub struct Deserializer {
// Storage format version of the data being deserialized.
version: u32,
// Registry of shared pointers that have already been deserialized.
inner: SharedDeserializeMap,
}
impl Deserializer {
    /// Creates a deserializer for data written in storage format `version`.
    ///
    /// # Panics
    ///
    /// Panics if `version` is older than `format::MIN_SUPPORTED_VERSION`.
    pub fn new(version: u32) -> Self {
        Self::check_version(version);
        Self {
            version,
            inner: SharedDeserializeMap::new(),
        }
    }

    /// Like [`Deserializer::new`], but the shared-pointer map is created with
    /// room for `capacity` entries.
    ///
    /// # Panics
    ///
    /// Panics if `version` is older than `format::MIN_SUPPORTED_VERSION`,
    /// exactly like [`Deserializer::new`].
    pub fn with_capacity(version: u32, capacity: usize) -> Self {
        // Apply the same guard as `new`, so no constructor yields a
        // deserializer for an unsupported format.
        Self::check_version(version);
        Self {
            version,
            inner: SharedDeserializeMap::with_capacity(capacity),
        }
    }

    /// Returns the storage format version this deserializer was created for.
    pub fn version(&self) -> u32 {
        self.version
    }

    /// Panics unless `version` is at least `format::MIN_SUPPORTED_VERSION`.
    fn check_version(version: u32) {
        assert!(
            version >= format::MIN_SUPPORTED_VERSION,
            "Unable to read checkpoint data with unsupported old storage format version {version} on this feldera version.",
        );
    }
}
impl Default for Deserializer {
fn default() -> Self {
Self::new(format::VERSION_NUMBER)
}
}
// Errors come from the wrapped `SharedDeserializeMap`, which handles all
// shared-pointer bookkeeping.
impl Fallible for Deserializer {
type Error = <SharedDeserializeMap as Fallible>::Error;
}
impl SharedDeserializeRegistry for Deserializer {
fn get_shared_ptr(&mut self, ptr: *const u8) -> Option<&dyn SharedPointer> {
self.inner.get_shared_ptr(ptr)
}
fn add_shared_ptr(
&mut self,
ptr: *const u8,
shared: Box<dyn SharedPointer>,
) -> Result<(), Self::Error> {
self.inner.add_shared_ptr(ptr, shared)
}
}
/// Size in bytes of the fixed main buffer in each [`DbspScratch`]; scratch
/// requests it cannot satisfy are served from the heap instead.
pub const SCRATCH_SIZE: usize = 65536;
pub fn to_bytes<T>(value: &T) -> Result<FBuf, <DbspSerializer<'_> as Fallible>::Error>
where
T: for<'a> Serialize<DbspSerializer<'a>>,
{
Ok(SerializerInner::to_fbuf_with_thread_local(|serializer| {
serializer.serialize_value(value)
}))
}
pub fn to_bytes_dyn<T>(value: &T) -> Result<FBuf, <DbspSerializer<'_> as Fallible>::Error>
where
T: ArchivedDBData,
{
Ok(SerializerInner::to_fbuf_with_thread_local(|serializer| {
serializer.serialize_value(value)
}))
}
#[cfg(test)]
mod test;