use crate::{Bytes, drive::DriveError};
use fjall::{Database, Error as FjallError, Keyspace, KeyspaceCreateOptions};
use std::path::PathBuf;
/// Errors reported by the disk-backed store in this file.
#[derive(Debug, thiserror::Error)]
pub enum DiskError {
/// An error returned by the `fjall` database; its message is forwarded unchanged.
#[error(transparent)]
DbError(#[from] FjallError),
/// Awaiting a `tokio` blocking task failed (see `tokio::task::JoinError`).
#[error("Failed to join a tokio blocking task: {0}")]
JoinError(#[from] tokio::task::JoinError),
/// Returned by `DiskStore::put_many` when the running byte count would exceed the configured limit.
#[error("Maximum disk size reached")]
MaxSizeExceeded,
}
/// Settings used to open a [`DiskStore`].
#[derive(Debug, Clone)]
pub struct DiskBuilder {
/// Cache budget in MiB; `DiskStore::new` hands half of it (converted to bytes) to the database cache.
pub cache_size_mb: usize,
/// Limit, in MiB, on the total size of values accepted by `DiskStore::put_many`.
pub max_stored_mb: usize,
}
impl Default for DiskBuilder {
    /// Returns the default settings: a 64 MiB cache budget and a 10 GiB
    /// (`10 * 1024` MiB) storage limit.
    fn default() -> Self {
        let cache_size_mb = 64;
        let max_stored_mb = 10 * 1024;
        Self {
            cache_size_mb,
            max_stored_mb,
        }
    }
}
impl DiskBuilder {
    /// Creates a builder holding the default settings.
    pub fn new() -> Self {
        Self::default()
    }

    /// Returns the builder with its cache budget replaced by `size` (in MiB).
    pub fn with_cache_size_mb(self, size: usize) -> Self {
        Self {
            cache_size_mb: size,
            ..self
        }
    }

    /// Returns the builder with its storage limit replaced by `max` (in MiB).
    pub fn with_max_stored_mb(self, max: usize) -> Self {
        Self {
            max_stored_mb: max,
            ..self
        }
    }

    /// Opens a [`DiskStore`] at `path` using this builder's settings.
    ///
    /// # Errors
    ///
    /// Forwards whatever error `DiskStore::new` returns.
    pub async fn open(&self, path: PathBuf) -> Result<DiskStore, DiskError> {
        let (cache_mb, limit_mb) = (self.cache_size_mb, self.max_stored_mb);
        DiskStore::new(path, cache_mb, limit_mb).await
    }
}
/// Key-value store backed by a `fjall` database, with a running count of value bytes.
pub struct DiskStore {
/// Database handle; `put_many` uses it to create write batches.
// NOTE(review): because `put_many` reads `db`, this `allow(unused)` looks stale — confirm before removing.
#[allow(unused)]
db: Database,
/// Keyspace that receives every insert and serves every lookup.
keyspace: Keyspace,
/// Storage limit in bytes, derived from the configured MiB value.
max_stored: usize,
/// Running total of value bytes that `put_many` counts against `max_stored`.
stored: usize,
}
impl DiskStore {
    /// Opens the database at `path` and prepares the keyspace used by the store.
    ///
    /// * `cache_mb` — cache budget in MiB; half of it (in bytes) is given to
    ///   the database cache.
    /// * `max_stored_mb` — limit, in MiB, on the total size of values accepted
    ///   by [`DiskStore::put_many`].
    ///
    /// The database is opened with `temporary(true)` and manual journal
    /// persistence, on a blocking thread so the async executor is not stalled.
    ///
    /// # Errors
    ///
    /// Returns [`DiskError::DbError`] if opening the database or the keyspace
    /// fails, and [`DiskError::JoinError`] if the blocking task cannot be
    /// joined.
    pub async fn new(
        path: PathBuf,
        cache_mb: usize,
        max_stored_mb: usize,
    ) -> Result<Self, DiskError> {
        const BYTES_PER_MIB: usize = 1 << 20;

        // Saturate instead of overflowing: on 32-bit targets even the default
        // 10 GiB limit does not fit in `usize`.
        let max_stored = max_stored_mb.saturating_mul(BYTES_PER_MIB);
        // `usize` -> `u64` is a widening cast on supported targets; half of
        // the budget goes to the database cache.
        let cache_bytes = (cache_mb as u64).saturating_mul(BYTES_PER_MIB as u64) / 2;

        let (db, keyspace) = tokio::task::spawn_blocking(move || {
            let db = Database::builder(path)
                .manual_journal_persist(true)
                .worker_threads(1)
                .cache_size(cache_bytes)
                .temporary(true)
                .open()?;
            let opts = KeyspaceCreateOptions::default()
                .expect_point_read_hits(true)
                .max_memtable_size(16 * 2_u64.pow(20));
            let keyspace = db.keyspace("z", || opts)?;
            Ok::<_, DiskError>((db, keyspace))
        })
        .await??;

        Ok(Self {
            db,
            keyspace,
            max_stored,
            stored: 0,
        })
    }

    /// Writes every `(key, value)` pair from `kv` in a single batch.
    ///
    /// The byte count only grows: each value's length is added to the running
    /// total, including when a key is overwritten. The total is updated only
    /// after the batch commits, so a rejected or failed batch leaves it
    /// unchanged.
    ///
    /// # Errors
    ///
    /// Returns [`DiskError::MaxSizeExceeded`] (converted into `DriveError`)
    /// if accepting the batch would push the running total past the limit, in
    /// which case nothing is committed. Returns [`DiskError::DbError`] if the
    /// commit itself fails.
    pub(crate) fn put_many(
        &mut self,
        kv: impl Iterator<Item = (Vec<u8>, Bytes)>,
    ) -> Result<(), DriveError> {
        let mut batch = self.db.batch();
        // Accumulate in a local so an early return cannot leave `self.stored`
        // counting bytes that were never written.
        let mut pending_total = self.stored;
        for (k, v) in kv {
            pending_total = pending_total
                .checked_add(v.len())
                .filter(|&total| total <= self.max_stored)
                .ok_or(DiskError::MaxSizeExceeded)?;
            batch.insert(&self.keyspace, k, v);
        }
        batch.commit().map_err(DiskError::DbError)?;
        self.stored = pending_total;
        Ok(())
    }

    /// Looks up `key` in the keyspace, returning `None` when it is absent.
    ///
    /// # Errors
    ///
    /// Forwards any error reported by the database.
    #[inline]
    pub(crate) fn get(&mut self, key: &[u8]) -> Result<Option<fjall::Slice>, FjallError> {
        self.keyspace.get(key)
    }

    /// Clears the keyspace on a blocking thread.
    ///
    /// NOTE(review): the running byte count is not reset here because this
    /// method only has `&self`; after a reset, `put_many` still counts earlier
    /// writes against the limit. Confirm whether that is intended.
    ///
    /// # Errors
    ///
    /// Returns [`DiskError::JoinError`] if the blocking task cannot be joined
    /// and [`DiskError::DbError`] if clearing fails.
    pub async fn reset(&self) -> Result<(), DiskError> {
        let keyspace = self.keyspace.clone();
        tokio::task::spawn_blocking(move || keyspace.clear()).await??;
        Ok(())
    }
}