#[cfg(feature = "chunking")]
use crate::cdc::{ContentDefinedChunker, ContentDefinedChunkerConfig};
use crate::{
block::{ChunkingBlockStoreAdapter, DataStore},
chunker::FixedSizeChunker,
settings::{Chunking, Compression, Settings},
};
#[cfg(feature = "compression")]
use crate::{
block::{EncodingBlockStore, EncodingMode},
compression::ZstdEncoder,
};
#[derive(Debug)]
pub struct SqlStore<'conn> {
pub(super) db: rusqlite::Savepoint<'conn>,
pub(super) settings: Settings,
pub(super) block_buf: Vec<u8>,
}
impl<'conn> SqlStore<'conn> {
pub fn new_savepoint<'a>(&'a mut self) -> crate::Result<rusqlite::Savepoint<'a>> {
Ok(self.db.savepoint()?)
}
pub fn new(db: rusqlite::Savepoint<'conn>, settings: Settings) -> Self {
Self {
db,
block_buf: Vec::with_capacity(match &settings.chunking {
Chunking::Fixed { size } => *size,
Chunking::FastCdc { max_size, .. } => *max_size,
}),
settings,
}
}
pub fn data_store<'a>(&'a mut self) -> Box<dyn DataStore + 'a> {
let settings = self.settings.clone();
match (self.settings.chunking, self.settings.compression) {
(Chunking::Fixed { size }, Compression::None) => {
#[cfg(feature = "compression")]
return Box::new(ChunkingBlockStoreAdapter::new(
EncodingBlockStore::new(self, ZstdEncoder::new(1), EncodingMode::DecodeOnly),
FixedSizeChunker::new(size as u32),
settings.clone(),
));
#[cfg(not(feature = "compression"))]
Box::new(ChunkingBlockStoreAdapter::new(
self,
FixedSizeChunker::new(size as u32),
settings.clone(),
))
}
#[cfg(feature = "compression")]
(Chunking::Fixed { size }, Compression::Zstd { level }) => {
Box::new(ChunkingBlockStoreAdapter::new(
EncodingBlockStore::new(
self,
ZstdEncoder::new(level),
EncodingMode::EncodeDecode,
),
FixedSizeChunker::new(size as u32),
settings.clone(),
))
}
#[cfg(feature = "chunking")]
(
Chunking::FastCdc {
min_size,
avg_size,
max_size,
},
Compression::None,
) => {
#[cfg(feature = "compression")]
return Box::new(ChunkingBlockStoreAdapter::new(
EncodingBlockStore::new(self, ZstdEncoder::new(1), EncodingMode::DecodeOnly),
ContentDefinedChunker::new(ContentDefinedChunkerConfig {
min_size: min_size as u32,
avg_size: avg_size as u32,
max_size: max_size as u32,
}),
settings.clone(),
));
#[cfg(not(feature = "compression"))]
Box::new(ChunkingBlockStoreAdapter::new(
self,
ContentDefinedChunker::new(ContentDefinedChunkerConfig {
min_size: min_size as u32,
avg_size: avg_size as u32,
max_size: max_size as u32,
}),
settings.clone(),
))
}
#[cfg(all(feature = "chunking", feature = "compression"))]
(
Chunking::FastCdc {
min_size,
avg_size,
max_size,
},
Compression::Zstd { level },
) => Box::new(ChunkingBlockStoreAdapter::new(
EncodingBlockStore::new(self, ZstdEncoder::new(level), EncodingMode::EncodeDecode),
ContentDefinedChunker::new(ContentDefinedChunkerConfig {
min_size: min_size as u32,
avg_size: avg_size as u32,
max_size: max_size as u32,
}),
settings.clone(),
)),
#[cfg(any(not(feature = "chunking"), not(feature = "compression")))]
_ => {
unreachable!(
"By this point, we should have already checked whether the relevant features are enabled."
);
}
}
}
}
#[derive(Debug)]
pub struct SqlStoreGuard<'conn> {
store: SqlStore<'conn>,
}
impl<'conn> SqlStoreGuard<'conn> {
pub fn new(store: SqlStore<'conn>) -> Self {
Self { store }
}
#[cfg(any(feature = "compression", feature = "chunking"))]
pub fn update_settings(&mut self, settings: &Settings) {
self.store.settings = settings.clone();
}
pub fn exec<T, F>(&mut self, f: F) -> crate::Result<T>
where
F: FnOnce(&mut SqlStore) -> crate::Result<T>,
{
let settings = self.store.settings.clone();
let new_savepoint = self.store.new_savepoint()?;
let mut inner_store = SqlStore::new(new_savepoint, settings);
match f(&mut inner_store) {
Ok(result) => {
inner_store.commit()?;
Ok(result)
}
Err(err) => Err(err),
}
}
}