Skip to main content

rig_ballista/
catalog.rs

1//! Read-only metadata catalog seam.
2//!
3//! A "file" is the unit of pruning — equivalent to one Iceberg data
4//! file or any other column-store object the planner can skip without
5//! materialising. Implementations expose per-file statistics (the
6//! caller's sketch type `S`, e.g. an HLL + variance + grammar
7//! histogram) so the planner can prune *before* paying for I/O.
8//!
9//! The trait is intentionally generic over `S`. rig-ballista does not
10//! prescribe a sketch shape; downstream agents pick whatever
11//! statistics their pruner needs. The companion
12//! [`InMemoryMetadataCatalog`] is a thread-safe reference
13//! implementation suitable for tests and offline harnesses; the
14//! Iceberg+Ballista-backed implementation will plug into the same
15//! trait once the upstream toolchain stabilises.
16//!
17//! ```no_run
18//! use rig_ballista::catalog::{
19//!     FileId, FileStats, InMemoryMetadataCatalog, MetadataCatalog,
20//! };
21//!
22//! # async fn run() -> Result<(), Box<dyn std::error::Error>> {
23//! let cat = InMemoryMetadataCatalog::<u64>::new();
24//! cat.insert(FileStats { id: FileId::new(), partition: "auth".into(), sketch: 7 });
25//! let files = cat.list_files(Some("auth")).await?;
26//! assert_eq!(files.len(), 1);
27//! # Ok(()) }
28//! ```
29
30use std::sync::Arc;
31
32use async_trait::async_trait;
33use dashmap::DashMap;
34use serde::{Deserialize, Serialize};
35use thiserror::Error;
36use uuid::Uuid;
37
38/// Stable identifier for a stored file.
39#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
40pub struct FileId(pub Uuid);
41
42impl FileId {
43    /// Mint a fresh, random [`FileId`].
44    pub fn new() -> Self {
45        Self(Uuid::new_v4())
46    }
47}
48
49impl Default for FileId {
50    fn default() -> Self {
51        Self::new()
52    }
53}
54
55impl std::fmt::Display for FileId {
56    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
57        write!(f, "{}", self.0)
58    }
59}
60
61/// File-level sketch summary — the Puffin-blob equivalent.
62///
63/// Generic over the caller's sketch type `S`. The sketch is the
64/// *aggregate* over the records within the file; the catalog backend
65/// (in-memory today, Iceberg+Puffin tomorrow) is responsible for that
66/// aggregation at write time.
67#[derive(Debug, Clone, Serialize, Deserialize)]
68pub struct FileStats<S> {
69    /// Stable file identifier.
70    pub id: FileId,
71    /// Partition tag. Convention is caller-defined; the catalog only
72    /// uses string equality for filtering.
73    pub partition: String,
74    /// Caller-supplied summary statistics (HLL, variance, histograms,
75    /// etc.).
76    pub sketch: S,
77}
78
79/// Errors surfaced by [`MetadataCatalog`] implementations.
80#[derive(Debug, Error)]
81pub enum StorageError {
82    /// The catalog backend failed to enumerate or read metadata.
83    ///
84    /// The wrapped source is preserved so `Error::source()` walks to the
85    /// originating I/O / object-store / Iceberg failure.
86    #[error("catalog backend error: {0}")]
87    Backend(#[source] Box<dyn std::error::Error + Send + Sync>),
88
89    /// Direct lookup against an unknown file id.
90    #[error("file not found: {0}")]
91    NotFound(FileId),
92}
93
94impl StorageError {
95    /// Wrap an arbitrary backend error as [`StorageError::Backend`].
96    pub fn backend<E>(err: E) -> Self
97    where
98        E: Into<Box<dyn std::error::Error + Send + Sync>>,
99    {
100        Self::Backend(err.into())
101    }
102}
103
104/// Read-only metadata catalog. Implementations expose Puffin/Iceberg
105/// statistics so the planner can prune before materialising blocks.
106///
107/// `S` is the caller's sketch type. The trait is `async` because real
108/// Iceberg-backed implementations will perform network I/O against an
109/// object store.
110#[async_trait]
111pub trait MetadataCatalog<S>: Send + Sync
112where
113    S: Send + Sync + Clone + 'static,
114{
115    /// List every file matching `partition`, or all files when `None`.
116    async fn list_files(&self, partition: Option<&str>) -> Result<Vec<FileStats<S>>, StorageError>;
117
118    /// Fetch stats for one file by id.
119    async fn get(&self, id: FileId) -> Result<FileStats<S>, StorageError>;
120}
121
122/// In-memory [`MetadataCatalog`] used by tests, examples, and offline
123/// harnesses.
124///
125/// Backed by a [`DashMap`] keyed on [`FileId`] so concurrent inserts
126/// from multiple writer tasks are safe without coarse locking.
127#[derive(Debug)]
128pub struct InMemoryMetadataCatalog<S> {
129    files: DashMap<FileId, FileStats<S>>,
130}
131
132impl<S> Default for InMemoryMetadataCatalog<S> {
133    fn default() -> Self {
134        Self {
135            files: DashMap::new(),
136        }
137    }
138}
139
140impl<S> InMemoryMetadataCatalog<S>
141where
142    S: Send + Sync + Clone + 'static,
143{
144    /// Create a fresh catalog wrapped in [`Arc`] for cheap cloning.
145    pub fn new() -> Arc<Self> {
146        Arc::new(Self::default())
147    }
148
149    /// Insert (or overwrite) the stats for one file.
150    pub fn insert(&self, stats: FileStats<S>) {
151        self.files.insert(stats.id, stats);
152    }
153
154    /// Number of files currently catalogued.
155    pub fn len(&self) -> usize {
156        self.files.len()
157    }
158
159    /// Whether the catalog is empty.
160    pub fn is_empty(&self) -> bool {
161        self.files.is_empty()
162    }
163}
164
165#[async_trait]
166impl<S> MetadataCatalog<S> for InMemoryMetadataCatalog<S>
167where
168    S: Send + Sync + Clone + 'static,
169{
170    async fn list_files(&self, partition: Option<&str>) -> Result<Vec<FileStats<S>>, StorageError> {
171        let files = match partition {
172            None => self.files.iter().map(|kv| kv.value().clone()).collect(),
173            Some(p) => self
174                .files
175                .iter()
176                .filter(|kv| kv.value().partition == p)
177                .map(|kv| kv.value().clone())
178                .collect(),
179        };
180        Ok(files)
181    }
182
183    async fn get(&self, id: FileId) -> Result<FileStats<S>, StorageError> {
184        self.files
185            .get(&id)
186            .map(|kv| kv.value().clone())
187            .ok_or(StorageError::NotFound(id))
188    }
189}
190
#[cfg(test)]
// Tests may unwrap/panic freely: a failure here is a test failure, not a
// production crash path.
#[allow(clippy::unwrap_used, clippy::panic)]
mod tests {
    use super::*;

    /// Build a stats record with a fresh random id.
    fn stats(partition: &str, sketch: u32) -> FileStats<u32> {
        FileStats {
            id: FileId::new(),
            partition: partition.into(),
            sketch,
        }
    }

    #[tokio::test]
    async fn list_filters_by_partition() {
        let cat = InMemoryMetadataCatalog::<u32>::new();
        cat.insert(stats("a", 1));
        cat.insert(stats("a", 2));
        cat.insert(stats("b", 3));
        assert_eq!(cat.list_files(None).await.unwrap().len(), 3);
        assert_eq!(cat.list_files(Some("a")).await.unwrap().len(), 2);
        assert_eq!(cat.list_files(Some("b")).await.unwrap().len(), 1);
        assert_eq!(cat.list_files(Some("missing")).await.unwrap().len(), 0);
    }

    #[tokio::test]
    async fn get_returns_not_found_for_unknown() {
        let cat = InMemoryMetadataCatalog::<u32>::new();
        match cat.get(FileId::new()).await {
            Err(StorageError::NotFound(_)) => {}
            other => panic!("expected NotFound, got {other:?}"),
        }
    }

    #[tokio::test]
    async fn insert_and_get_round_trips() {
        let cat = InMemoryMetadataCatalog::<u32>::new();
        let s = stats("p", 42);
        let id = s.id;
        cat.insert(s);
        let got = cat.get(id).await.unwrap();
        assert_eq!(got.sketch, 42);
        assert_eq!(got.partition, "p");
    }

    #[tokio::test]
    async fn len_and_is_empty_track_inserts() {
        let cat = InMemoryMetadataCatalog::<u32>::new();
        assert!(cat.is_empty());
        cat.insert(stats("p", 1));
        assert_eq!(cat.len(), 1);
        assert!(!cat.is_empty());
    }

    /// `insert` is documented as insert-or-overwrite: re-using an id must
    /// replace the entry (including its partition), not add a second one.
    #[tokio::test]
    async fn insert_overwrites_existing_id() {
        let cat = InMemoryMetadataCatalog::<u32>::new();
        let first = stats("p", 1);
        let id = first.id;
        cat.insert(first);
        cat.insert(FileStats {
            id,
            partition: "q".into(),
            sketch: 2,
        });
        assert_eq!(cat.len(), 1);
        let got = cat.get(id).await.unwrap();
        assert_eq!(got.sketch, 2);
        assert_eq!(got.partition, "q");
        assert!(cat.list_files(Some("p")).await.unwrap().is_empty());
    }

    /// The backend variant must keep the wrapped error reachable through
    /// `Error::source()` and render it in its message.
    #[test]
    fn backend_error_keeps_source_and_message() {
        let err = StorageError::backend("object store unreachable");
        assert!(matches!(err, StorageError::Backend(_)));
        let src = std::error::Error::source(&err).unwrap();
        assert_eq!(src.to_string(), "object store unreachable");
        assert_eq!(
            err.to_string(),
            "catalog backend error: object store unreachable"
        );
    }

    #[test]
    fn not_found_display_names_the_id() {
        let id = FileId::new();
        assert_eq!(
            StorageError::NotFound(id).to_string(),
            format!("file not found: {id}")
        );
    }
}