1use std::collections::HashMap;
6use std::future::Future;
7use std::path::PathBuf;
8use std::sync::Arc;
9
10use iceberg::io::{FileIOBuilder, StorageFactory};
11use iceberg::{CatalogBuilder, Error, ErrorKind, Result};
12
13use crate::catalog::RedbCatalog;
14use crate::error::map_redb;
15use crate::store::Store;
16
17pub const REDB_CATALOG_PROP_DB_PATH: &str = "redb.db-path";
21
22pub const REDB_CATALOG_PROP_WAREHOUSE: &str = "warehouse";
26
27pub const REDB_CATALOG_PROP_METADATA_CACHE_BYTES: &str = "redb.metadata-cache-bytes";
32
33pub const REDB_CATALOG_PROP_TABLE_HANDLE_CACHE_CAPACITY: &str = "redb.table-handle-cache-capacity";
38
39pub const REDB_CATALOG_PROP_DURABILITY: &str = "redb.durability";
43
44#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
47pub enum WriteDurability {
48 #[default]
50 Immediate,
51 Eventual,
55 None,
59}
60
61impl WriteDurability {
62 pub(crate) fn to_redb(self) -> redb::Durability {
63 match self {
64 WriteDurability::Immediate => redb::Durability::Immediate,
65 WriteDurability::Eventual => redb::Durability::Eventual,
66 WriteDurability::None => redb::Durability::None,
67 }
68 }
69
70 fn parse(s: &str) -> Option<Self> {
71 match s.trim().to_ascii_lowercase().as_str() {
72 "immediate" => Some(Self::Immediate),
73 "eventual" => Some(Self::Eventual),
74 "none" => Some(Self::None),
75 _ => None,
76 }
77 }
78}
79
80#[derive(Debug, Default)]
82pub struct RedbCatalogBuilder {
83 db_path: Option<PathBuf>,
84 warehouse_location: Option<String>,
85 storage_factory: Option<Arc<dyn StorageFactory>>,
86 metadata_cache_bytes: Option<u64>,
87 table_handle_cache_capacity: Option<u64>,
88 durability: Option<WriteDurability>,
89 props: HashMap<String, String>,
90}
91
92impl RedbCatalogBuilder {
93 pub fn db_path(mut self, path: impl Into<PathBuf>) -> Self {
95 self.db_path = Some(path.into());
96 self
97 }
98
99 pub fn warehouse_location(mut self, loc: impl Into<String>) -> Self {
101 self.warehouse_location = Some(loc.into());
102 self
103 }
104
105 pub fn metadata_cache_bytes(mut self, bytes: u64) -> Self {
109 self.metadata_cache_bytes = Some(bytes);
110 self
111 }
112
113 pub fn table_handle_cache_capacity(mut self, capacity: u64) -> Self {
119 self.table_handle_cache_capacity = Some(capacity);
120 self
121 }
122
123 pub fn durability(mut self, durability: WriteDurability) -> Self {
126 self.durability = Some(durability);
127 self
128 }
129
130 pub fn props(mut self, props: HashMap<String, String>) -> Self {
132 for (k, v) in props {
133 self.props.insert(k, v);
134 }
135 self
136 }
137
138 pub fn prop(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
140 self.props.insert(key.into(), value.into());
141 self
142 }
143}
144
145impl CatalogBuilder for RedbCatalogBuilder {
146 type C = RedbCatalog;
147
148 fn with_storage_factory(mut self, storage_factory: Arc<dyn StorageFactory>) -> Self {
149 self.storage_factory = Some(storage_factory);
150 self
151 }
152
153 fn load(
154 mut self,
155 name: impl Into<String>,
156 props: HashMap<String, String>,
157 ) -> impl Future<Output = Result<Self::C>> + Send {
158 for (k, v) in props {
159 self.props.insert(k, v);
160 }
161 if let Some(p) = self.props.remove(REDB_CATALOG_PROP_DB_PATH) {
162 self.db_path = Some(PathBuf::from(p));
163 }
164 if let Some(w) = self.props.remove(REDB_CATALOG_PROP_WAREHOUSE) {
165 self.warehouse_location = Some(w);
166 }
167 let cache_bytes_prop = self.props.remove(REDB_CATALOG_PROP_METADATA_CACHE_BYTES);
168 let table_cache_prop = self
169 .props
170 .remove(REDB_CATALOG_PROP_TABLE_HANDLE_CACHE_CAPACITY);
171 let durability_prop = self.props.remove(REDB_CATALOG_PROP_DURABILITY);
172
173 let name = name.into();
174 let db_path = self.db_path.clone();
175 let warehouse_location = self.warehouse_location.clone();
176 let storage_factory = self.storage_factory.clone();
177 let metadata_cache_bytes = self.metadata_cache_bytes;
178 let table_handle_cache_capacity = self.table_handle_cache_capacity;
179 let durability = self.durability;
180
181 async move {
182 if name.trim().is_empty() {
183 return Err(Error::new(
184 ErrorKind::DataInvalid,
185 "Catalog name cannot be empty",
186 ));
187 }
188 let db_path = db_path.ok_or_else(|| {
189 Error::new(
190 ErrorKind::DataInvalid,
191 format!(
192 "redb catalog requires `{REDB_CATALOG_PROP_DB_PATH}` to be set, \
193 either via RedbCatalogBuilder::db_path or props"
194 ),
195 )
196 })?;
197 let warehouse_location = warehouse_location.ok_or_else(|| {
198 Error::new(
199 ErrorKind::DataInvalid,
200 format!(
201 "redb catalog requires `{REDB_CATALOG_PROP_WAREHOUSE}` to be set, \
202 either via RedbCatalogBuilder::warehouse_location or props"
203 ),
204 )
205 })?;
206 let factory = storage_factory.ok_or_else(|| {
207 Error::new(
208 ErrorKind::Unexpected,
209 "StorageFactory must be provided for RedbCatalog. \
210 Use `with_storage_factory` to configure it.",
211 )
212 })?;
213 let metadata_cache_bytes = match cache_bytes_prop {
217 Some(s) => s.trim().parse::<u64>().map_err(|_| {
218 Error::new(
219 ErrorKind::DataInvalid,
220 format!(
221 "`{REDB_CATALOG_PROP_METADATA_CACHE_BYTES}` must be a non-negative \
222 integer number of bytes, got `{s}`"
223 ),
224 )
225 })?,
226 None => metadata_cache_bytes.unwrap_or(crate::DEFAULT_METADATA_CACHE_BYTES),
227 };
228
229 let table_handle_cache_capacity = match table_cache_prop {
230 Some(s) => s.trim().parse::<u64>().map_err(|_| {
231 Error::new(
232 ErrorKind::DataInvalid,
233 format!(
234 "`{REDB_CATALOG_PROP_TABLE_HANDLE_CACHE_CAPACITY}` must be a \
235 non-negative integer, got `{s}`"
236 ),
237 )
238 })?,
239 None => table_handle_cache_capacity
240 .unwrap_or(crate::DEFAULT_TABLE_HANDLE_CACHE_CAPACITY),
241 };
242
243 let durability = match durability_prop {
244 Some(s) => WriteDurability::parse(&s).ok_or_else(|| {
245 Error::new(
246 ErrorKind::DataInvalid,
247 format!(
248 "`{REDB_CATALOG_PROP_DURABILITY}` must be one of \
249 immediate|eventual|none, got `{s}`"
250 ),
251 )
252 })?,
253 None => durability.unwrap_or_default(),
254 };
255
256 let fileio = FileIOBuilder::new(factory).build();
257
258 if let Some(parent) = db_path.parent() {
259 if !parent.as_os_str().is_empty() {
260 std::fs::create_dir_all(parent).map_err(|e| {
261 Error::new(
262 ErrorKind::Unexpected,
263 format!("failed to create catalog dir {}: {e}", parent.display()),
264 )
265 })?;
266 }
267 }
268
269 let store = Store::open(db_path, durability.to_redb()).map_err(map_redb)?;
270
271 Ok(RedbCatalog {
272 name,
273 warehouse_location,
274 fileio,
275 store,
276 meta_cache: crate::meta_cache::MetadataCache::new(metadata_cache_bytes),
277 table_cache: crate::table_cache::TableHandleCache::new(table_handle_cache_capacity),
278 })
279 }
280 }
281}