use crate::Hash;
use crate::lock_guard::acquire_lock;
use futures::future;
use rogue_logging::Failure;
use serde::Serialize;
use serde::de::DeserializeOwned;
use std::collections::BTreeMap;
use std::marker::PhantomData;
use std::path::{Path, PathBuf};
use thiserror::Error as ThisError;
use tokio::fs::{read, read_dir, write};
use tokio::task;
use tracing::{debug, trace};
/// File extension (without the leading dot) of chunk files.
///
/// Used both to build chunk file names and to recognise chunk files when listing the directory.
const CHUNK_FILE_EXTENSION: &str = "yml";
/// A table of `T` items keyed by `Hash<K>` and persisted as chunk files in a directory.
///
/// Items are grouped into chunks: an item's chunk is identified by its key hash truncated to
/// `Hash<C>`, and each chunk is stored as one YAML file named after the chunk hash.
pub struct Table<const K: usize, const C: usize, T> {
/// Directory that contains the chunk files.
pub(crate) directory: PathBuf,
/// Marker tying the table to its item type `T`; the struct itself stores no `T`.
pub phantom: PhantomData<T>,
}
impl<const K: usize, const C: usize, T> Table<K, C, T> {
    /// Create a table whose chunk files live in `directory`.
    ///
    /// Only the path is stored; nothing is read from or written to disk here.
    #[must_use]
    pub fn new(directory: impl Into<PathBuf>) -> Self {
        let directory = directory.into();
        Self {
            directory,
            phantom: PhantomData,
        }
    }

    /// Build the path of the chunk file for `hash`:
    /// `<directory>/<hash>.<CHUNK_FILE_EXTENSION>`.
    fn get_chunk_path(&self, hash: Hash<C>) -> PathBuf {
        let file_name = format!("{hash}.{CHUNK_FILE_EXTENSION}");
        self.directory.join(file_name)
    }
}
impl<const K: usize, const C: usize, T> Default for Table<K, C, T> {
    /// Create a table rooted at an empty path.
    fn default() -> Self {
        let empty_directory = PathBuf::default();
        Self::new(empty_directory)
    }
}
impl<const K: usize, const C: usize, T> Table<K, C, T>
where
    T: Clone + DeserializeOwned,
{
    /// Look up the item stored under `hash`.
    ///
    /// Returns `Ok(None)` when the chunk file does not exist or the chunk has no entry for
    /// `hash`.
    ///
    /// # Errors
    ///
    /// Returns a [`TableAction::Get`] failure if the chunk file exists but cannot be read or
    /// deserialized.
    pub async fn get(&self, hash: Hash<K>) -> Result<Option<T>, Failure<TableAction>> {
        let chunk_path = self.get_chunk_path(get_chunk_hash(hash));
        let item = if chunk_path.exists() {
            let mut chunk = read_chunk::<K, C, T>(&chunk_path)
                .await
                .map_err(Failure::wrap(TableAction::Get))?;
            // The in-memory chunk is discarded right after this lookup, so move the item out
            // instead of cloning it.
            chunk.remove(&hash)
        } else {
            None
        };
        trace!(hash = %hash, found = item.is_some(), "Get item");
        Ok(item)
    }

    /// Read every chunk file in the table directory and merge their items into one map.
    ///
    /// Directory entries that are not regular files with the chunk file extension are skipped.
    ///
    /// # Errors
    ///
    /// - [`TableAction::ReadDir`] if the directory cannot be opened.
    /// - [`TableAction::ReadEntry`] if iterating the directory fails.
    /// - [`TableAction::GetAll`] if any chunk file cannot be read or deserialized.
    pub async fn get_all(&self) -> Result<BTreeMap<Hash<K>, T>, Failure<TableAction>> {
        let mut items = BTreeMap::new();
        let mut dir = read_dir(&self.directory)
            .await
            .map_err(Failure::wrap_with_path(
                TableAction::ReadDir,
                &self.directory,
            ))?;
        while let Some(entry) = dir
            .next_entry()
            .await
            .map_err(Failure::wrap(TableAction::ReadEntry))?
        {
            let path = entry.path();
            // Compare the extension first: it needs no allocation and no file system access,
            // so the `is_file` stat only runs for candidates that already look like chunks.
            let is_chunk = path
                .extension()
                .is_some_and(|extension| extension == CHUNK_FILE_EXTENSION)
                && path.is_file();
            if !is_chunk {
                trace!("Skipping non-chunk file: {}", path.display());
                continue;
            }
            let chunk = read_chunk::<K, C, T>(&path)
                .await
                .map_err(Failure::wrap(TableAction::GetAll))?;
            items.extend(chunk);
        }
        trace!(count = items.len(), "Get all items");
        Ok(items)
    }
}
impl<const K: usize, const C: usize, T> Table<K, C, T>
where
    T: Clone + Send + Serialize + DeserializeOwned + 'static,
{
    /// Insert `item` under `hash`, overwriting any existing entry with the same hash.
    ///
    /// The chunk file is read (if it exists), updated in memory and written back in full.
    ///
    /// # Errors
    ///
    /// Returns a [`TableAction::Set`] failure if the lock cannot be acquired or the chunk
    /// cannot be read or written.
    pub async fn set(&self, hash: Hash<K>, item: T) -> Result<(), Failure<TableAction>> {
        trace!(hash = %hash, "Set item");
        let chunk_path = self.get_chunk_path(get_chunk_hash(hash));
        // Kept alive until the end of the function so the read-modify-write below runs under
        // the lock (presumably released when the guard is dropped; see `acquire_lock`).
        let _lock = acquire_lock(&chunk_path)
            .await
            .map_err(Failure::wrap(TableAction::Set))?;
        let mut chunk = if chunk_path.exists() {
            read_chunk::<K, C, T>(&chunk_path)
                .await
                .map_err(Failure::wrap(TableAction::Set))?
        } else {
            BTreeMap::new()
        };
        // `item` is owned and not used again, so it is moved into the chunk rather than cloned.
        chunk.insert(hash, item);
        write_chunk::<K, C, T>(&chunk_path, chunk)
            .await
            .map_err(Failure::wrap(TableAction::Set))?;
        Ok(())
    }

    /// Insert many items, updating each affected chunk in its own spawned task.
    ///
    /// Items are grouped by chunk and every chunk is merged into its file independently.
    /// When `replace` is `true` existing entries are overwritten; otherwise existing entries
    /// are kept and only new hashes are inserted.
    ///
    /// Returns the number of items that were inserted or replaced.
    ///
    /// # Errors
    ///
    /// If any chunk update fails or its task cannot be joined, returns a
    /// [`TableAction::SetMany`] failure carrying the `succeeded` / `failed` chunk counts and
    /// every underlying failure as a related failure. Chunks that were updated successfully
    /// are not rolled back.
    pub async fn set_many(
        &self,
        items: BTreeMap<Hash<K>, T>,
        replace: bool,
    ) -> Result<usize, Failure<TableAction>> {
        let item_count = items.len();
        let chunks = group_by_chunk(items);
        let chunk_count = chunks.len();
        trace!(
            items = item_count,
            chunks = chunk_count,
            replace,
            "Set many items"
        );
        let futures = chunks.into_iter().map(|(chunk_hash, new_chunk)| {
            // Resolve the path here because the spawned task must be `'static` and therefore
            // cannot borrow `self`.
            let chunk_path = self.get_chunk_path(chunk_hash);
            task::spawn(
                async move { update_chunk::<K, C, T>(chunk_path, new_chunk, replace).await },
            )
        });
        let results = future::join_all(futures).await;
        let mut added = 0;
        let mut errors = Vec::new();
        for result in results {
            match result {
                Ok(Ok(count)) => added += count,
                Ok(Err(e)) => errors.push(e),
                // The task itself failed to complete (join error).
                Err(source) => errors.push(Failure::new(TableAction::JoinTask, source)),
            }
        }
        if errors.is_empty() {
            trace!(added, "Set many items complete");
            Ok(added)
        } else {
            // Exactly one result exists per chunk, so this subtraction cannot underflow.
            let succeeded = chunk_count - errors.len();
            let failed = errors.len();
            trace!(succeeded, failed, "Set many items complete");
            let mut failure = Failure::from_action(TableAction::SetMany)
                .with("succeeded", succeeded.to_string())
                .with("failed", failed.to_string());
            for error in errors {
                failure = failure.with_related(error);
            }
            Err(failure)
        }
    }

    /// Remove the item stored under `hash` and return it.
    ///
    /// Returns `Ok(None)` when the chunk file does not exist or has no entry for `hash`; in
    /// that case nothing is written. When an item is removed the remaining chunk is written
    /// back, even if it is now empty.
    ///
    /// # Errors
    ///
    /// Returns a [`TableAction::Remove`] failure if the lock cannot be acquired or the chunk
    /// cannot be read or written.
    pub async fn remove(&self, hash: Hash<K>) -> Result<Option<T>, Failure<TableAction>> {
        let chunk_path = self.get_chunk_path(get_chunk_hash(hash));
        // Kept alive until the end of the function so the read-modify-write below runs under
        // the lock (presumably released when the guard is dropped; see `acquire_lock`).
        let _lock = acquire_lock(&chunk_path)
            .await
            .map_err(Failure::wrap(TableAction::Remove))?;
        let mut chunk = if chunk_path.exists() {
            read_chunk::<K, C, T>(&chunk_path)
                .await
                .map_err(Failure::wrap(TableAction::Remove))?
        } else {
            BTreeMap::new()
        };
        let item = chunk.remove(&hash);
        // Only touch the file when the chunk actually changed.
        if item.is_some() {
            write_chunk::<K, C, T>(&chunk_path, chunk)
                .await
                .map_err(Failure::wrap(TableAction::Remove))?;
        }
        trace!(hash = %hash, found = item.is_some(), "Remove item");
        Ok(item)
    }
}
/// Derive the chunk hash for an item key by truncating the key hash to `Hash<C>`.
///
/// # Panics
///
/// Panics if `Hash::truncate` does not yield a value (presumably when `C` is larger than
/// `K`; confirm against `Hash::truncate`).
fn get_chunk_hash<const K: usize, const C: usize>(hash: Hash<K>) -> Hash<C> {
    let truncated = hash.truncate::<C>();
    truncated.expect("should be able to truncate")
}
/// Partition `items` into per-chunk maps keyed by each item's chunk hash.
///
/// Every returned chunk contains at least one item; an empty input yields an empty map.
///
/// # Panics
///
/// Panics if a key hash cannot be truncated to `Hash<C>` (see `get_chunk_hash`).
fn group_by_chunk<const K: usize, const C: usize, T>(
    items: BTreeMap<Hash<K>, T>,
) -> BTreeMap<Hash<C>, BTreeMap<Hash<K>, T>> {
    let mut chunks: BTreeMap<Hash<C>, BTreeMap<Hash<K>, T>> = BTreeMap::new();
    for (hash, item) in items {
        // A single entry lookup creates the chunk on first use and returns it for insertion.
        chunks
            .entry(get_chunk_hash(hash))
            .or_default()
            .insert(hash, item);
    }
    chunks
}
/// Read the chunk file at `path` and deserialize it from YAML into a map of items.
///
/// # Errors
///
/// - [`TableAction::ReadChunk`] if the file cannot be read.
/// - [`TableAction::Deserialize`] if the contents are not a valid YAML chunk.
///
/// Both failures are tagged with `path`.
async fn read_chunk<const K: usize, const C: usize, T>(
    path: impl AsRef<Path>,
) -> Result<BTreeMap<Hash<K>, T>, Failure<TableAction>>
where
    T: DeserializeOwned,
{
    let chunk_path = path.as_ref();
    debug!(path = %chunk_path.display(), "Reading chunk");
    let contents = read(chunk_path)
        .await
        .map_err(Failure::wrap_with_path(TableAction::ReadChunk, chunk_path))?;
    let chunk: BTreeMap<Hash<K>, T> = serde_yaml::from_slice(&contents)
        .map_err(Failure::wrap_with_path(TableAction::Deserialize, chunk_path))?;
    Ok(chunk)
}
/// Serialize `chunk` to YAML and write it to `path`, replacing any existing contents.
///
/// # Errors
///
/// - [`TableAction::Serialize`] if the chunk cannot be serialized.
/// - [`TableAction::WriteChunk`] if the file cannot be written.
///
/// Both failures are tagged with `path`.
async fn write_chunk<const K: usize, const C: usize, T>(
    path: impl AsRef<Path>,
    chunk: BTreeMap<Hash<K>, T>,
) -> Result<(), Failure<TableAction>>
where
    T: Serialize,
{
    let chunk_path = path.as_ref();
    debug!(path = %chunk_path.display(), "Writing chunk");
    let serialized = serde_yaml::to_string(&chunk)
        .map_err(Failure::wrap_with_path(TableAction::Serialize, chunk_path))?;
    write(chunk_path, serialized)
        .await
        .map_err(Failure::wrap_with_path(TableAction::WriteChunk, chunk_path))
}
/// Merge `new_chunk` into the chunk file at `chunk_path`.
///
/// The existing chunk (if the file exists) is loaded, each new item is inserted when
/// `replace` is `true` or its hash is not yet present, and the result is written back.
/// The file is left untouched when no item was inserted or replaced, mirroring how
/// `Table::remove` only writes on change.
///
/// Returns the number of items inserted or replaced.
///
/// # Errors
///
/// Returns a [`TableAction::UpdateChunk`] failure if the lock cannot be acquired or the chunk
/// cannot be read or written.
async fn update_chunk<const K: usize, const C: usize, T>(
    chunk_path: impl AsRef<Path>,
    new_chunk: BTreeMap<Hash<K>, T>,
    replace: bool,
) -> Result<usize, Failure<TableAction>>
where
    T: DeserializeOwned + Serialize,
{
    let chunk_path = chunk_path.as_ref();
    // Kept alive until the end of the function so the read-modify-write below runs under the
    // lock (presumably released when the guard is dropped; see `acquire_lock`).
    let _lock = acquire_lock(chunk_path)
        .await
        .map_err(Failure::wrap(TableAction::UpdateChunk))?;
    let mut chunk = if chunk_path.exists() {
        read_chunk::<K, C, T>(chunk_path)
            .await
            .map_err(Failure::wrap(TableAction::UpdateChunk))?
    } else {
        BTreeMap::new()
    };
    let mut added = 0;
    for (hash, item) in new_chunk {
        if replace || !chunk.contains_key(&hash) {
            chunk.insert(hash, item);
            added += 1;
        }
    }
    // Skip the rewrite when the chunk is unchanged.
    if added > 0 {
        write_chunk::<K, C, T>(chunk_path, chunk)
            .await
            .map_err(Failure::wrap(TableAction::UpdateChunk))?;
    }
    Ok(added)
}
/// The operation that was being performed when a table failure occurred.
///
/// Used as the action of `Failure<TableAction>`; the `#[error]` text is the display form.
#[derive(Clone, Copy, Debug, Eq, PartialEq, ThisError)]
pub enum TableAction {
/// Looking up a single item (`Table::get`).
#[error("get item")]
Get,
/// Reading a chunk while collecting all items (`Table::get_all`).
#[error("get all items")]
GetAll,
/// Inserting a single item (`Table::set`).
#[error("set item")]
Set,
/// Removing a single item (`Table::remove`).
#[error("remove item")]
Remove,
/// Merging items into one chunk file (`update_chunk`).
#[error("update chunk")]
UpdateChunk,
/// Reading the bytes of a chunk file.
#[error("read chunk")]
ReadChunk,
/// Writing the bytes of a chunk file.
#[error("write chunk")]
WriteChunk,
/// Opening the table directory for listing.
#[error("read directory")]
ReadDir,
/// Advancing to the next entry of the table directory.
#[error("read entry")]
ReadEntry,
/// Acquiring a lock.
// NOTE(review): not constructed anywhere in this file; presumably used by the lock guard
// module. Confirm against its callers.
#[error("acquire lock")]
AcquireLock,
/// Serializing a chunk to YAML.
#[error("serialize")]
Serialize,
/// Deserializing a chunk from YAML.
#[error("deserialize chunk")]
Deserialize,
/// Joining a task spawned by `Table::set_many` to update a chunk.
#[error("update multiple chunks")]
JoinTask,
/// Aggregate failure of `Table::set_many` when one or more chunk updates failed.
#[error("set items")]
SetMany,
}