use async_trait::async_trait;
use bytes::Bytes;
use fslock::LockFile;
use serde::{Deserialize, Serialize};
use std::ffi::OsStr;
use std::path::Path;
use std::time::Duration;
use std::{fs, path::PathBuf};
use tokio::time;
use crate::kernel::io::FileExtension;
use crate::KernelError;
// Public submodules declared by this module.
pub mod io;
pub mod lsm;
// Compiled only when the `rocksdb` cargo feature is enabled.
#[cfg(feature = "rocksdb")]
pub mod rocksdb_storage;
// Compiled only when the `sled` cargo feature is enabled.
#[cfg(feature = "sled")]
pub mod sled_storage;
pub mod utils;
/// Result alias used throughout this module: `Ok(T)` on success, otherwise a
/// [`KernelError`].
pub type KernelResult<T> = std::result::Result<T, KernelError>;
/// File name of the lock file.
// NOTE(review): presumably joined onto the storage directory and handed to
// `lock_or_time_out`; the call site is not visible here — confirm.
pub(crate) const DEFAULT_LOCK_FILE: &str = "KipDB.lock";
/// Asynchronous interface of a key-value storage backend.
///
/// Keys and values are raw bytes. Implementors must be
/// `Send + Sync + 'static + Sized`. Every method except [`Storage::name`] and
/// [`Storage::is_empty`] reports failure through [`KernelResult`].
#[async_trait]
pub trait Storage: Send + Sync + 'static + Sized {
/// Returns a static name for the implementing backend.
fn name() -> &'static str
where
Self: Sized;
/// Opens a storage instance from `path`.
// NOTE(review): presumably `path` is the directory holding the data files;
// confirm against each implementor.
async fn open(path: impl Into<PathBuf> + Send) -> KernelResult<Self>;
/// Flushes the storage.
// NOTE(review): presumably forces buffered writes to disk — confirm.
async fn flush(&self) -> KernelResult<()>;
/// Associates `value` with `key`; both are taken by value as [`Bytes`].
async fn set(&self, key: Bytes, value: Bytes) -> KernelResult<()>;
/// Looks up `key` and returns its value, if any.
// NOTE(review): `Ok(None)` presumably means the key is not present — confirm.
async fn get(&self, key: &[u8]) -> KernelResult<Option<Bytes>>;
/// Removes `key`.
// NOTE(review): whether removing an absent key is an error is decided by
// the implementor; not visible here.
async fn remove(&self, key: &[u8]) -> KernelResult<()>;
/// Returns a size as `u64`.
// NOTE(review): presumably the number of bytes occupied on disk — confirm.
async fn size_of_disk(&self) -> KernelResult<u64>;
/// Returns a count as `usize`.
// NOTE(review): presumably the number of stored entries — confirm.
async fn len(&self) -> KernelResult<usize>;
/// Reports whether the storage is empty. Unlike [`Storage::len`] this
/// returns a plain `bool`, so it has no way to surface an error.
async fn is_empty(&self) -> bool;
}
/// A single command addressed to a key, carrying raw byte payloads.
///
/// Every variant holds a `key`; only [`CommandData::Set`] also holds a
/// `value`. The type derives serde `Serialize`/`Deserialize` and is marked
/// `#[non_exhaustive]`, so matches in other crates need a wildcard arm.
#[derive(Serialize, Deserialize, Debug, Clone, Hash, PartialEq, Eq)]
#[non_exhaustive]
pub enum CommandData {
/// Carries a `key` together with a `value`.
Set { key: Vec<u8>, value: Vec<u8> },
/// Carries only the `key` of a removal.
Remove { key: Vec<u8> },
/// Carries only the `key` of a lookup.
Get { key: Vec<u8> },
}
impl CommandData {
    /// Borrows the key carried by this command, whatever its variant.
    #[inline]
    pub fn get_key(&self) -> &Vec<u8> {
        match self {
            Self::Set { key, .. } | Self::Remove { key } | Self::Get { key } => key,
        }
    }

    /// Returns an owned copy of the key, leaving the command untouched.
    #[inline]
    pub fn get_key_clone(&self) -> Vec<u8> {
        Vec::clone(self.get_key())
    }

    /// Consumes the command and hands back its key without copying.
    #[inline]
    pub fn get_key_owner(self) -> Vec<u8> {
        match self {
            Self::Set { key, .. } | Self::Remove { key } | Self::Get { key } => key,
        }
    }

    /// Borrows the value of a `Set` command; every other variant yields `None`.
    #[inline]
    pub fn get_value(&self) -> Option<&Vec<u8>> {
        if let Self::Set { value, .. } = self {
            Some(value)
        } else {
            None
        }
    }

    /// Returns an owned copy of the value of a `Set` command, `None` otherwise.
    #[inline]
    pub fn get_value_clone(&self) -> Option<Vec<u8>> {
        self.get_value().cloned()
    }

    /// Size estimate in bytes: key length, plus value length (if any), plus a
    /// fixed per-variant overhead of 20 for `Set` and 12 for `Remove`/`Get`.
    // NOTE(review): the constants look like a 4-byte variant tag plus one
    // 8-byte length prefix per byte vector — an assumption; confirm against
    // the serializer actually in use.
    #[inline]
    pub fn bytes_len(&self) -> usize {
        let overhead = match self {
            Self::Set { .. } => 20,
            Self::Remove { .. } | Self::Get { .. } => 12,
        };
        let value_len = match self.get_value() {
            Some(value) => value.len(),
            None => 0,
        };
        self.get_key().len() + value_len + overhead
    }

    /// Builds a `Set` command from `key` and `value`.
    #[inline]
    pub fn set(key: Vec<u8>, value: Vec<u8>) -> Self {
        CommandData::Set { key, value }
    }

    /// Builds a `Remove` command for `key`.
    #[inline]
    pub fn remove(key: Vec<u8>) -> Self {
        CommandData::Remove { key }
    }

    /// Builds a `Get` command for `key`.
    #[inline]
    pub fn get(key: Vec<u8>) -> Self {
        CommandData::Get { key }
    }
}
/// Collects the numeric generations of the files in `file_path` whose
/// extension equals `extension.extension_str()`, sorted ascending.
///
/// A directory entry contributes a generation only when it is a regular file,
/// its extension matches, and its file stem (the name without that final
/// extension) parses as an `i64`. Everything else is skipped, including
/// entries that fail to be read while iterating (best effort).
///
/// # Errors
///
/// Returns an error when the directory itself cannot be read.
fn sorted_gen_list(file_path: &Path, extension: FileExtension) -> KernelResult<Vec<i64>> {
    // Resolve the extension once instead of once per directory entry.
    let wanted_ext = extension.extension_str();

    let mut gen_list: Vec<i64> = fs::read_dir(file_path)?
        // Unreadable entries are ignored rather than failing the whole scan.
        .filter_map(|entry| entry.ok())
        .map(|entry| entry.path())
        .filter(|path| path.is_file() && path.extension() == Some(wanted_ext.as_ref()))
        // `file_stem` removes exactly one extension, so a name such as
        // `2.log.log` keeps the stem `2.log` and is rejected by the parse,
        // instead of being mistaken for generation 2.
        .filter_map(|path| {
            path.file_stem()
                .and_then(OsStr::to_str)?
                .parse::<i64>()
                .ok()
        })
        .collect();

    gen_list.sort_unstable();
    Ok(gen_list)
}
/// Opens the lock file at `path` and tries to acquire it, retrying with a
/// doubling delay.
///
/// The lock is attempted up to four times, sleeping 100 ms, 200 ms and 400 ms
/// between consecutive attempts. On success the locked [`LockFile`] is
/// returned.
///
/// # Errors
///
/// Returns `KernelError::ProcessExists` when every attempt finds the lock
/// unavailable, and propagates any error from opening or locking the file.
async fn lock_or_time_out(path: &PathBuf) -> KernelResult<LockFile> {
    // Pause, in milliseconds, inserted after each failed attempt but the last.
    const RETRY_DELAYS_MS: [u64; 3] = [100, 200, 400];

    let mut lock = LockFile::open(path)?;

    for delay_ms in RETRY_DELAYS_MS {
        if lock.try_lock()? {
            return Ok(lock);
        }
        time::sleep(Duration::from_millis(delay_ms)).await;
    }

    // Final attempt once the longest delay has elapsed.
    if lock.try_lock()? {
        Ok(lock)
    } else {
        Err(KernelError::ProcessExists)
    }
}