use std::sync::Arc;
use async_trait::async_trait;
use dashmap::DashMap;
use serde::{Deserialize, Serialize};
use thiserror::Error;
use uuid::Uuid;
/// Identifier for a file tracked by a [`MetadataCatalog`].
///
/// Thin newtype over a [`Uuid`]; ids created through [`FileId::new`] (or
/// [`Default`]) are random version-4 UUIDs.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct FileId(pub Uuid);

impl FileId {
    /// Generates a fresh random identifier (UUID v4).
    pub fn new() -> Self {
        let uuid = Uuid::new_v4();
        FileId(uuid)
    }
}

impl Default for FileId {
    /// Same as [`FileId::new`]. Unlike most `Default` impls this is not a fixed
    /// value: every call generates a new random id.
    fn default() -> Self {
        FileId::new()
    }
}

impl std::fmt::Display for FileId {
    /// Renders the id exactly as the wrapped [`Uuid`] displays itself.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let FileId(uuid) = self;
        write!(f, "{uuid}")
    }
}
/// Metadata record for a single file held by a [`MetadataCatalog`].
///
/// Generic over the per-file payload `S`. The in-memory catalog returns records
/// by cloning them, so `S` is cloned on every read from that catalog.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FileStats<S> {
/// Unique identifier; the in-memory catalog uses it as the map key.
pub id: FileId,
/// Partition the file belongs to; `list_files(Some(p))` selects records whose
/// partition is exactly equal to `p`.
pub partition: String,
/// Caller-chosen payload. NOTE(review): presumably a statistics sketch, going by
/// the name — confirm what callers actually store here.
pub sketch: S,
}
/// Errors returned by [`MetadataCatalog`] operations.
#[derive(Debug, Error)]
pub enum StorageError {
/// The underlying catalog store failed; the original error is boxed and exposed
/// through `Error::source`.
// NOTE(review): the message interpolates `{0}` *and* the field is `#[source]`,
// so reporters that print the whole source chain will show the inner error twice.
// Dropping `: {0}` would fix that, but it changes the Display text callers may
// already log or match on — confirm before changing.
#[error("catalog backend error: {0}")]
Backend(#[source] Box<dyn std::error::Error + Send + Sync>),
/// No record with the given id exists in the catalog.
#[error("file not found: {0}")]
NotFound(FileId),
}
impl StorageError {
pub fn backend<E>(err: E) -> Self
where
E: Into<Box<dyn std::error::Error + Send + Sync>>,
{
Self::Backend(err.into())
}
}
/// Read access to per-file metadata records, generic over the payload type `S`.
///
/// Methods are async via `#[async_trait]`; implementors must be `Send + Sync`.
#[async_trait]
pub trait MetadataCatalog<S>: Send + Sync
where
S: Send + Sync + Clone + 'static,
{
/// Lists file records. With `Some(p)`, only records whose partition equals `p`
/// are returned; with `None`, every record is returned. (This is what
/// `InMemoryMetadataCatalog` does; other implementations are presumably expected
/// to match — confirm.)
///
/// # Errors
/// Implementations report store failures as a [`StorageError`]; the in-memory
/// implementation never fails.
async fn list_files(&self, partition: Option<&str>) -> Result<Vec<FileStats<S>>, StorageError>;
/// Fetches the record with the given `id`.
///
/// # Errors
/// The in-memory implementation returns [`StorageError::NotFound`] carrying `id`
/// when no such record exists.
async fn get(&self, id: FileId) -> Result<FileStats<S>, StorageError>;
}
/// [`MetadataCatalog`] that keeps every record in a [`DashMap`] keyed by [`FileId`].
///
/// Writes go through `&self` (see `insert`), so the `Arc` handle returned by
/// `new` can be populated directly.
#[derive(Debug)]
pub struct InMemoryMetadataCatalog<S> {
/// All known records, keyed by their id.
files: DashMap<FileId, FileStats<S>>,
}
impl<S> Default for InMemoryMetadataCatalog<S> {
fn default() -> Self {
Self {
files: DashMap::new(),
}
}
}
// No bounds on `S` here: none of these methods need them, so building and
// populating a catalog does not require the `MetadataCatalog` impl's bounds.
// Loosening bounds is backward-compatible for every existing caller.
impl<S> InMemoryMetadataCatalog<S> {
    /// Creates an empty catalog, already wrapped in an [`Arc`].
    #[must_use]
    pub fn new() -> Arc<Self> {
        Arc::new(Self::default())
    }

    /// Stores `stats` under `stats.id`.
    ///
    /// If a record with the same id already exists, it is replaced.
    pub fn insert(&self, stats: FileStats<S>) {
        self.files.insert(stats.id, stats);
    }

    /// Returns the number of records currently stored.
    ///
    /// With concurrent inserts, the value may be stale by the time it is used.
    #[must_use]
    pub fn len(&self) -> usize {
        self.files.len()
    }

    /// Returns `true` when the catalog holds no records.
    #[must_use]
    pub fn is_empty(&self) -> bool {
        self.files.is_empty()
    }
}
#[async_trait]
impl<S> MetadataCatalog<S> for InMemoryMetadataCatalog<S>
where
    S: Send + Sync + Clone + 'static,
{
    /// Returns clones of all stored records, or only those whose partition is
    /// exactly `partition` when it is `Some`.
    ///
    /// Order follows the map's iteration order and is not guaranteed. Never fails.
    async fn list_files(&self, partition: Option<&str>) -> Result<Vec<FileStats<S>>, StorageError> {
        // `None` means "no partition filter".
        let wanted = move |record: &FileStats<S>| match partition {
            Some(p) => record.partition == p,
            None => true,
        };
        let matching: Vec<FileStats<S>> = self
            .files
            .iter()
            .filter(|entry| wanted(entry.value()))
            .map(|entry| entry.value().clone())
            .collect();
        Ok(matching)
    }

    /// Returns a clone of the record stored under `id`, or
    /// `StorageError::NotFound(id)` if there is none.
    async fn get(&self, id: FileId) -> Result<FileStats<S>, StorageError> {
        match self.files.get(&id) {
            Some(entry) => Ok(entry.value().clone()),
            None => Err(StorageError::NotFound(id)),
        }
    }
}
#[cfg(test)]
// Panicking on unexpected results is how tests fail; these lints target library code.
#[allow(clippy::unwrap_used, clippy::panic)]
mod tests {
    use super::*;

    /// Builds a record with a fresh random id in `partition`, carrying `sketch`.
    fn stats(partition: &str, sketch: u32) -> FileStats<u32> {
        FileStats {
            id: FileId::new(),
            partition: partition.into(),
            sketch,
        }
    }

    /// Lists `partition` and returns the sketches sorted, so assertions check
    /// exactly which records came back without depending on map iteration order.
    async fn sorted_sketches(
        cat: &InMemoryMetadataCatalog<u32>,
        partition: Option<&str>,
    ) -> Vec<u32> {
        let mut sketches: Vec<u32> = cat
            .list_files(partition)
            .await
            .unwrap()
            .into_iter()
            .map(|f| f.sketch)
            .collect();
        sketches.sort_unstable();
        sketches
    }

    #[tokio::test]
    async fn list_filters_by_partition() {
        let cat = InMemoryMetadataCatalog::<u32>::new();
        cat.insert(stats("a", 1));
        cat.insert(stats("a", 2));
        cat.insert(stats("b", 3));
        assert_eq!(sorted_sketches(&cat, None).await, vec![1, 2, 3]);
        assert_eq!(sorted_sketches(&cat, Some("a")).await, vec![1, 2]);
        assert_eq!(sorted_sketches(&cat, Some("b")).await, vec![3]);
        assert!(sorted_sketches(&cat, Some("missing")).await.is_empty());
    }

    #[tokio::test]
    async fn get_returns_not_found_for_unknown() {
        let cat = InMemoryMetadataCatalog::<u32>::new();
        // A non-empty catalog makes sure the lookup really misses, not just "empty".
        cat.insert(stats("p", 1));
        let unknown = FileId::new();
        match cat.get(unknown).await {
            Err(StorageError::NotFound(reported)) => assert_eq!(reported, unknown),
            other => panic!("expected NotFound, got {other:?}"),
        }
    }

    #[tokio::test]
    async fn insert_and_get_round_trips() {
        let cat = InMemoryMetadataCatalog::<u32>::new();
        let s = stats("p", 42);
        let id = s.id;
        cat.insert(s);
        let got = cat.get(id).await.unwrap();
        assert_eq!(got.sketch, 42);
        assert_eq!(got.partition, "p");
    }

    #[tokio::test]
    async fn insert_with_existing_id_replaces_entry() {
        let cat = InMemoryMetadataCatalog::<u32>::new();
        let first = stats("p", 1);
        let id = first.id;
        cat.insert(first);
        cat.insert(FileStats {
            id,
            partition: "q".into(),
            sketch: 2,
        });
        assert_eq!(cat.len(), 1);
        let got = cat.get(id).await.unwrap();
        assert_eq!(got.sketch, 2);
        assert_eq!(got.partition, "q");
    }

    #[tokio::test]
    async fn len_and_is_empty_track_inserts() {
        let cat = InMemoryMetadataCatalog::<u32>::new();
        assert!(cat.is_empty());
        cat.insert(stats("p", 1));
        assert_eq!(cat.len(), 1);
        assert!(!cat.is_empty());
    }
}