Skip to main content

skade_katalog/
builder.rs

1// Apache-2.0 licensed.
2
3//! Builder for [`RedbCatalog`].
4
5use std::collections::HashMap;
6use std::future::Future;
7use std::path::PathBuf;
8use std::sync::Arc;
9
10use iceberg::io::{FileIOBuilder, StorageFactory};
11use iceberg::{CatalogBuilder, Error, ErrorKind, Result};
12
13use crate::catalog::RedbCatalog;
14use crate::error::map_redb;
15use crate::store::Store;
16
/// Name of the property that carries the redb database file path.
///
/// The same setting is exposed as [`RedbCatalogBuilder::db_path`]. When this
/// key is present among the properties seen by
/// [`iceberg::CatalogBuilder::load`], its value replaces whatever the typed
/// setter stored.
pub const REDB_CATALOG_PROP_DB_PATH: &str = "redb.db-path";

/// Name of the property that carries the warehouse root URI.
///
/// The same setting is exposed as [`RedbCatalogBuilder::warehouse_location`].
/// When this key is present among the properties seen by
/// [`iceberg::CatalogBuilder::load`], its value replaces whatever the typed
/// setter stored.
pub const REDB_CATALOG_PROP_WAREHOUSE: &str = "warehouse";

/// Name of the property that carries the L0 parsed-metadata cache budget.
///
/// The value is a number of bytes and must parse as a `u64` (surrounding
/// whitespace is ignored); `0` disables the cache. The typed equivalent is
/// [`RedbCatalogBuilder::metadata_cache_bytes`]; a property entry takes
/// precedence over it. With neither set, the budget is
/// [`crate::DEFAULT_METADATA_CACHE_BYTES`] (128 MiB).
pub const REDB_CATALOG_PROP_METADATA_CACHE_BYTES: &str = "redb.metadata-cache-bytes";

/// Name of the property that carries the built-`Table` handle cache capacity.
///
/// The value is a count of resident handles and must parse as a `u64`
/// (surrounding whitespace is ignored); `0` disables the cache. The typed
/// equivalent is [`RedbCatalogBuilder::table_handle_cache_capacity`]; a
/// property entry takes precedence over it. With neither set, the capacity is
/// [`crate::DEFAULT_TABLE_HANDLE_CACHE_CAPACITY`].
pub const REDB_CATALOG_PROP_TABLE_HANDLE_CACHE_CAPACITY: &str = "redb.table-handle-cache-capacity";

/// Name of the property that selects write durability.
///
/// Accepted values, matched case-insensitively after trimming: `immediate`
/// (the default, fsync per commit), `eventual`, `none`. The typed equivalent
/// is [`RedbCatalogBuilder::durability`]; a property entry takes precedence
/// over it.
pub const REDB_CATALOG_PROP_DURABILITY: &str = "redb.durability";
43
44/// Durability applied to every catalog-mutation commit. Maps onto redb's
45/// durability levels.
46#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
47pub enum WriteDurability {
48    /// fsync on every commit — crash-safe. The default.
49    #[default]
50    Immediate,
51    /// Queued for persistence; persists shortly after `commit` returns. Higher
52    /// write throughput, may lose the last few commits on a hard crash (the
53    /// database stays internally consistent).
54    Eventual,
55    /// No fsync — fastest writes, durability is best-effort. Intended for bulk
56    /// import / benchmarking. Note: redb only frees pages at higher durability
57    /// levels, so exclusive `None` use grows the file.
58    None,
59}
60
61impl WriteDurability {
62    pub(crate) fn to_redb(self) -> redb::Durability {
63        match self {
64            WriteDurability::Immediate => redb::Durability::Immediate,
65            WriteDurability::Eventual => redb::Durability::Eventual,
66            WriteDurability::None => redb::Durability::None,
67        }
68    }
69
70    fn parse(s: &str) -> Option<Self> {
71        match s.trim().to_ascii_lowercase().as_str() {
72            "immediate" => Some(Self::Immediate),
73            "eventual" => Some(Self::Eventual),
74            "none" => Some(Self::None),
75            _ => None,
76        }
77    }
78}
79
80/// Builder for [`RedbCatalog`].
81#[derive(Debug, Default)]
82pub struct RedbCatalogBuilder {
83    db_path: Option<PathBuf>,
84    warehouse_location: Option<String>,
85    storage_factory: Option<Arc<dyn StorageFactory>>,
86    metadata_cache_bytes: Option<u64>,
87    table_handle_cache_capacity: Option<u64>,
88    durability: Option<WriteDurability>,
89    props: HashMap<String, String>,
90}
91
92impl RedbCatalogBuilder {
93    /// Set the redb database file path.
94    pub fn db_path(mut self, path: impl Into<PathBuf>) -> Self {
95        self.db_path = Some(path.into());
96        self
97    }
98
99    /// Set the warehouse root URI (e.g. `file:///var/.../warehouse`).
100    pub fn warehouse_location(mut self, loc: impl Into<String>) -> Self {
101        self.warehouse_location = Some(loc.into());
102        self
103    }
104
105    /// Set the L0 parsed-metadata cache budget in bytes (e.g. `64 * 1024 * 1024`
106    /// for 64 MiB). `0` disables the cache. Defaults to
107    /// [`crate::DEFAULT_METADATA_CACHE_BYTES`] (128 MiB) when unset.
108    pub fn metadata_cache_bytes(mut self, bytes: u64) -> Self {
109        self.metadata_cache_bytes = Some(bytes);
110        self
111    }
112
113    /// Set the built-`Table` handle cache capacity (number of resident handles).
114    /// `0` disables it. Defaults to
115    /// [`crate::DEFAULT_TABLE_HANDLE_CACHE_CAPACITY`] when unset. A warm
116    /// `load_table` hit then costs an `Arc`-sharing `Table` clone (~100 ns)
117    /// instead of rebuilding iceberg's per-`Table` `ObjectCache` (~14 µs).
118    pub fn table_handle_cache_capacity(mut self, capacity: u64) -> Self {
119        self.table_handle_cache_capacity = Some(capacity);
120        self
121    }
122
123    /// Set write durability for catalog mutations. Defaults to
124    /// [`WriteDurability::Immediate`] (fsync per commit).
125    pub fn durability(mut self, durability: WriteDurability) -> Self {
126        self.durability = Some(durability);
127        self
128    }
129
130    /// Bulk-set arbitrary properties. Equivalent to repeated `prop` calls.
131    pub fn props(mut self, props: HashMap<String, String>) -> Self {
132        for (k, v) in props {
133            self.props.insert(k, v);
134        }
135        self
136    }
137
138    /// Set a single property.
139    pub fn prop(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
140        self.props.insert(key.into(), value.into());
141        self
142    }
143}
144
145impl CatalogBuilder for RedbCatalogBuilder {
146    type C = RedbCatalog;
147
148    fn with_storage_factory(mut self, storage_factory: Arc<dyn StorageFactory>) -> Self {
149        self.storage_factory = Some(storage_factory);
150        self
151    }
152
153    fn load(
154        mut self,
155        name: impl Into<String>,
156        props: HashMap<String, String>,
157    ) -> impl Future<Output = Result<Self::C>> + Send {
158        for (k, v) in props {
159            self.props.insert(k, v);
160        }
161        if let Some(p) = self.props.remove(REDB_CATALOG_PROP_DB_PATH) {
162            self.db_path = Some(PathBuf::from(p));
163        }
164        if let Some(w) = self.props.remove(REDB_CATALOG_PROP_WAREHOUSE) {
165            self.warehouse_location = Some(w);
166        }
167        let cache_bytes_prop = self.props.remove(REDB_CATALOG_PROP_METADATA_CACHE_BYTES);
168        let table_cache_prop = self
169            .props
170            .remove(REDB_CATALOG_PROP_TABLE_HANDLE_CACHE_CAPACITY);
171        let durability_prop = self.props.remove(REDB_CATALOG_PROP_DURABILITY);
172
173        let name = name.into();
174        let db_path = self.db_path.clone();
175        let warehouse_location = self.warehouse_location.clone();
176        let storage_factory = self.storage_factory.clone();
177        let metadata_cache_bytes = self.metadata_cache_bytes;
178        let table_handle_cache_capacity = self.table_handle_cache_capacity;
179        let durability = self.durability;
180
181        async move {
182            if name.trim().is_empty() {
183                return Err(Error::new(
184                    ErrorKind::DataInvalid,
185                    "Catalog name cannot be empty",
186                ));
187            }
188            let db_path = db_path.ok_or_else(|| {
189                Error::new(
190                    ErrorKind::DataInvalid,
191                    format!(
192                        "redb catalog requires `{REDB_CATALOG_PROP_DB_PATH}` to be set, \
193                         either via RedbCatalogBuilder::db_path or props"
194                    ),
195                )
196            })?;
197            let warehouse_location = warehouse_location.ok_or_else(|| {
198                Error::new(
199                    ErrorKind::DataInvalid,
200                    format!(
201                        "redb catalog requires `{REDB_CATALOG_PROP_WAREHOUSE}` to be set, \
202                         either via RedbCatalogBuilder::warehouse_location or props"
203                    ),
204                )
205            })?;
206            let factory = storage_factory.ok_or_else(|| {
207                Error::new(
208                    ErrorKind::Unexpected,
209                    "StorageFactory must be provided for RedbCatalog. \
210                     Use `with_storage_factory` to configure it.",
211                )
212            })?;
213            // A `props` entry overrides the typed builder field (consistent with
214            // db-path / warehouse handling above); otherwise fall back to the
215            // field, then the crate default.
216            let metadata_cache_bytes = match cache_bytes_prop {
217                Some(s) => s.trim().parse::<u64>().map_err(|_| {
218                    Error::new(
219                        ErrorKind::DataInvalid,
220                        format!(
221                            "`{REDB_CATALOG_PROP_METADATA_CACHE_BYTES}` must be a non-negative \
222                             integer number of bytes, got `{s}`"
223                        ),
224                    )
225                })?,
226                None => metadata_cache_bytes.unwrap_or(crate::DEFAULT_METADATA_CACHE_BYTES),
227            };
228
229            let table_handle_cache_capacity = match table_cache_prop {
230                Some(s) => s.trim().parse::<u64>().map_err(|_| {
231                    Error::new(
232                        ErrorKind::DataInvalid,
233                        format!(
234                            "`{REDB_CATALOG_PROP_TABLE_HANDLE_CACHE_CAPACITY}` must be a \
235                             non-negative integer, got `{s}`"
236                        ),
237                    )
238                })?,
239                None => table_handle_cache_capacity
240                    .unwrap_or(crate::DEFAULT_TABLE_HANDLE_CACHE_CAPACITY),
241            };
242
243            let durability = match durability_prop {
244                Some(s) => WriteDurability::parse(&s).ok_or_else(|| {
245                    Error::new(
246                        ErrorKind::DataInvalid,
247                        format!(
248                            "`{REDB_CATALOG_PROP_DURABILITY}` must be one of \
249                             immediate|eventual|none, got `{s}`"
250                        ),
251                    )
252                })?,
253                None => durability.unwrap_or_default(),
254            };
255
256            let fileio = FileIOBuilder::new(factory).build();
257
258            if let Some(parent) = db_path.parent() {
259                if !parent.as_os_str().is_empty() {
260                    std::fs::create_dir_all(parent).map_err(|e| {
261                        Error::new(
262                            ErrorKind::Unexpected,
263                            format!("failed to create catalog dir {}: {e}", parent.display()),
264                        )
265                    })?;
266                }
267            }
268
269            let store = Store::open(db_path, durability.to_redb()).map_err(map_redb)?;
270
271            Ok(RedbCatalog {
272                name,
273                warehouse_location,
274                fileio,
275                store,
276                meta_cache: crate::meta_cache::MetadataCache::new(metadata_cache_bytes),
277                table_cache: crate::table_cache::TableHandleCache::new(table_handle_cache_capacity),
278            })
279        }
280    }
281}