1use std::sync::Arc;
31
32use async_trait::async_trait;
33use dashmap::DashMap;
34use serde::{Deserialize, Serialize};
35use thiserror::Error;
36use uuid::Uuid;
37
38#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
40pub struct FileId(pub Uuid);
41
42impl FileId {
43 pub fn new() -> Self {
45 Self(Uuid::new_v4())
46 }
47}
48
49impl Default for FileId {
50 fn default() -> Self {
51 Self::new()
52 }
53}
54
55impl std::fmt::Display for FileId {
56 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
57 write!(f, "{}", self.0)
58 }
59}
60
/// Statistics recorded for a single file in a catalog.
///
/// `S` is a caller-chosen payload type. The in-memory catalog in this module only stores and
/// clones it; it never inspects it.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FileStats<S> {
    /// Identifier of the file; the in-memory catalog uses it as the map key.
    pub id: FileId,
    /// Partition the file belongs to. The in-memory catalog's `list_files(Some(p))` keeps
    /// entries whose `partition` is exactly equal to `p`.
    pub partition: String,
    /// Per-file statistics payload.
    // NOTE(review): the name suggests a probabilistic data sketch; the type is generic here,
    // so confirm against the concrete `S` used by callers.
    pub sketch: S,
}
78
/// Errors returned by [`MetadataCatalog`] operations.
#[derive(Debug, Error)]
pub enum StorageError {
    /// The underlying catalog backend failed. The boxed error is exposed through
    /// `std::error::Error::source` (via `#[source]`).
    // NOTE(review): the Display string also interpolates the same error (`{0}`), so reporters
    // that walk the `source()` chain print the backend message twice. Per the Rust error
    // handling guidance, pick one (drop `{0}` or drop `#[source]`); left unchanged here so the
    // message text callers may log or match on stays stable.
    #[error("catalog backend error: {0}")]
    Backend(#[source] Box<dyn std::error::Error + Send + Sync>),

    /// No file with the given id is present in the catalog.
    #[error("file not found: {0}")]
    NotFound(FileId),
}
93
94impl StorageError {
95 pub fn backend<E>(err: E) -> Self
97 where
98 E: Into<Box<dyn std::error::Error + Send + Sync>>,
99 {
100 Self::Backend(err.into())
101 }
102}
103
104#[async_trait]
111pub trait MetadataCatalog<S>: Send + Sync
112where
113 S: Send + Sync + Clone + 'static,
114{
115 async fn list_files(&self, partition: Option<&str>) -> Result<Vec<FileStats<S>>, StorageError>;
117
118 async fn get(&self, id: FileId) -> Result<FileStats<S>, StorageError>;
120}
121
/// [`MetadataCatalog`] implementation that keeps every [`FileStats`] in process memory.
///
/// Backed by a concurrent `DashMap`, so `insert` only needs `&self` and the catalog can be
/// shared behind an `Arc` (`InMemoryMetadataCatalog::new` returns one). Nothing is persisted.
#[derive(Debug)]
pub struct InMemoryMetadataCatalog<S> {
    /// Stored files, keyed by each entry's own `FileStats::id`.
    files: DashMap<FileId, FileStats<S>>,
}
131
132impl<S> Default for InMemoryMetadataCatalog<S> {
133 fn default() -> Self {
134 Self {
135 files: DashMap::new(),
136 }
137 }
138}
139
140impl<S> InMemoryMetadataCatalog<S>
141where
142 S: Send + Sync + Clone + 'static,
143{
144 pub fn new() -> Arc<Self> {
146 Arc::new(Self::default())
147 }
148
149 pub fn insert(&self, stats: FileStats<S>) {
151 self.files.insert(stats.id, stats);
152 }
153
154 pub fn len(&self) -> usize {
156 self.files.len()
157 }
158
159 pub fn is_empty(&self) -> bool {
161 self.files.is_empty()
162 }
163}
164
165#[async_trait]
166impl<S> MetadataCatalog<S> for InMemoryMetadataCatalog<S>
167where
168 S: Send + Sync + Clone + 'static,
169{
170 async fn list_files(&self, partition: Option<&str>) -> Result<Vec<FileStats<S>>, StorageError> {
171 let files = match partition {
172 None => self.files.iter().map(|kv| kv.value().clone()).collect(),
173 Some(p) => self
174 .files
175 .iter()
176 .filter(|kv| kv.value().partition == p)
177 .map(|kv| kv.value().clone())
178 .collect(),
179 };
180 Ok(files)
181 }
182
183 async fn get(&self, id: FileId) -> Result<FileStats<S>, StorageError> {
184 self.files
185 .get(&id)
186 .map(|kv| kv.value().clone())
187 .ok_or(StorageError::NotFound(id))
188 }
189}
190
#[cfg(test)]
// Tests report failure by panicking, so `unwrap` and `panic!` are the intended mechanism here.
#[allow(clippy::unwrap_used, clippy::panic)]
mod tests {
    use super::*;

    /// Builds a `FileStats<u32>` with a fresh random id in `partition`.
    fn stats(partition: &str, sketch: u32) -> FileStats<u32> {
        FileStats {
            id: FileId::new(),
            partition: partition.into(),
            sketch,
        }
    }

    #[tokio::test]
    async fn list_filters_by_partition() {
        let cat = InMemoryMetadataCatalog::<u32>::new();
        cat.insert(stats("a", 1));
        cat.insert(stats("a", 2));
        cat.insert(stats("b", 3));

        assert_eq!(cat.list_files(None).await.unwrap().len(), 3);

        let a = cat.list_files(Some("a")).await.unwrap();
        assert_eq!(a.len(), 2);
        assert!(a.iter().all(|f| f.partition == "a"));
        // Iteration order is unspecified, so compare the payloads as a sorted list.
        let mut sketches: Vec<u32> = a.iter().map(|f| f.sketch).collect();
        sketches.sort_unstable();
        assert_eq!(sketches, vec![1, 2]);

        let b = cat.list_files(Some("b")).await.unwrap();
        assert_eq!(b.len(), 1);
        assert_eq!(b[0].sketch, 3);

        assert!(cat.list_files(Some("missing")).await.unwrap().is_empty());
    }

    #[tokio::test]
    async fn get_returns_not_found_for_unknown() {
        let cat = InMemoryMetadataCatalog::<u32>::new();
        let missing = FileId::new();
        let err = match cat.get(missing).await {
            Err(err) => err,
            Ok(found) => panic!("expected NotFound, got {found:?}"),
        };
        // The error must identify the id that was requested, not just its kind.
        assert!(matches!(err, StorageError::NotFound(id) if id == missing));
        assert_eq!(err.to_string(), format!("file not found: {missing}"));
    }

    #[tokio::test]
    async fn insert_and_get_round_trips() {
        let cat = InMemoryMetadataCatalog::<u32>::new();
        let s = stats("p", 42);
        let id = s.id;
        cat.insert(s);
        let got = cat.get(id).await.unwrap();
        assert_eq!(got.id, id);
        assert_eq!(got.sketch, 42);
        assert_eq!(got.partition, "p");
    }

    #[tokio::test]
    async fn insert_with_existing_id_replaces_entry() {
        let cat = InMemoryMetadataCatalog::<u32>::new();
        let first = stats("p", 1);
        let id = first.id;
        cat.insert(first);
        cat.insert(FileStats {
            id,
            partition: "q".into(),
            sketch: 2,
        });

        assert_eq!(cat.len(), 1);
        let got = cat.get(id).await.unwrap();
        assert_eq!(got.sketch, 2);
        assert_eq!(got.partition, "q");
        // The old entry must no longer be visible under its previous partition.
        assert!(cat.list_files(Some("p")).await.unwrap().is_empty());
    }

    #[tokio::test]
    async fn len_and_is_empty_track_inserts() {
        let cat = InMemoryMetadataCatalog::<u32>::new();
        assert!(cat.is_empty());
        cat.insert(stats("p", 1));
        assert_eq!(cat.len(), 1);
        assert!(!cat.is_empty());
    }

    #[test]
    fn backend_wraps_source_error() {
        let err = StorageError::backend("disk unavailable");
        assert!(matches!(err, StorageError::Backend(_)));
        assert_eq!(err.to_string(), "catalog backend error: disk unavailable");
        assert!(std::error::Error::source(&err).is_some());
    }
}