1use std::fmt::Debug;
8
9use async_trait::async_trait;
10use iceberg::table::Table;
11use iceberg::{Catalog, Error, ErrorKind, Result, TableIdent, TableRequirement, TableUpdate};
12
/// Extension operations layered on top of an iceberg [`Catalog`].
///
/// [`ExtendedCatalog`] implements these using only the plain catalog calls
/// `load_table`, `drop_table` and `register_table` of the wrapped catalog.
#[async_trait]
pub trait CatalogExt: Catalog {
    /// Commits `updates` to a single table after checking `requirements`.
    ///
    /// Every requirement is validated against the table's current metadata, the
    /// updates are applied, and the table as it stands after the commit is returned.
    async fn commit_table(
        &self,
        table_ident: &TableIdent,
        requirements: Vec<TableRequirement>,
        updates: Vec<TableUpdate>,
    ) -> Result<Table>;

    /// Points the catalog entry for `table_ident` at `new_metadata_location` and
    /// returns the table loaded from that location.
    ///
    /// NOTE: [`ExtendedCatalog`] implements this by dropping and re-registering the
    /// table, which is not atomic with respect to other catalog clients.
    async fn update_table_metadata_location(
        &self,
        table_ident: &TableIdent,
        new_metadata_location: String,
    ) -> Result<Table>;

    /// Commits changes to several tables as one unit.
    ///
    /// Each entry is `(table, requirements, updates)`. [`ExtendedCatalog`] accepts at
    /// most one entry, returns the committed tables in input order, and rejects
    /// larger batches with `ErrorKind::FeatureUnsupported`.
    async fn commit_tables_atomic(
        &self,
        table_changes: Vec<(TableIdent, Vec<TableRequirement>, Vec<TableUpdate>)>,
    ) -> Result<Vec<Table>>;

    /// Reports the health of the storage backend behind this catalog.
    async fn storage_health_check(&self) -> Result<StorageHealthStatus>;
}
80
/// Outcome of a storage backend health probe, as returned by
/// [`CatalogExt::storage_health_check`].
///
/// `PartialEq`/`Eq` are derived so statuses can be compared directly (e.g. in tests
/// or when detecting a change between two probes).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct StorageHealthStatus {
    /// Name of the storage backend that was probed (for example `"memory"`).
    pub backend_type: String,
    /// `true` when the backend passed the health check.
    pub healthy: bool,
    /// Probe latency in milliseconds; `StorageHealthStatus::unhealthy` sets this to `0`.
    pub latency_ms: u64,
    /// Optional diagnostic text; `StorageHealthStatus::unhealthy` always sets it.
    pub message: Option<String>,
}
93
94impl StorageHealthStatus {
95 pub fn healthy(backend_type: impl Into<String>, latency_ms: u64) -> Self {
97 Self {
98 backend_type: backend_type.into(),
99 healthy: true,
100 latency_ms,
101 message: None,
102 }
103 }
104
105 pub fn unhealthy(backend_type: impl Into<String>, message: impl Into<String>) -> Self {
107 Self {
108 backend_type: backend_type.into(),
109 healthy: false,
110 latency_ms: 0,
111 message: Some(message.into()),
112 }
113 }
114}
115
116#[derive(Debug)]
118pub struct ExtendedCatalog<C: Catalog> {
119 inner: C,
120}
121
122impl<C: Catalog> ExtendedCatalog<C> {
123 pub fn new(inner: C) -> Self {
125 Self { inner }
126 }
127}
128
129#[async_trait]
130impl<C: Catalog + Send + Sync> Catalog for ExtendedCatalog<C> {
131 async fn list_namespaces(
132 &self,
133 parent: Option<&iceberg::NamespaceIdent>,
134 ) -> Result<Vec<iceberg::NamespaceIdent>> {
135 self.inner.list_namespaces(parent).await
136 }
137
138 async fn create_namespace(
139 &self,
140 namespace: &iceberg::NamespaceIdent,
141 properties: std::collections::HashMap<String, String>,
142 ) -> Result<iceberg::Namespace> {
143 self.inner.create_namespace(namespace, properties).await
144 }
145
146 async fn get_namespace(
147 &self,
148 namespace: &iceberg::NamespaceIdent,
149 ) -> Result<iceberg::Namespace> {
150 self.inner.get_namespace(namespace).await
151 }
152
153 async fn namespace_exists(&self, namespace: &iceberg::NamespaceIdent) -> Result<bool> {
154 self.inner.namespace_exists(namespace).await
155 }
156
157 async fn update_namespace(
158 &self,
159 namespace: &iceberg::NamespaceIdent,
160 properties: std::collections::HashMap<String, String>,
161 ) -> Result<()> {
162 self.inner.update_namespace(namespace, properties).await
163 }
164
165 async fn drop_namespace(&self, namespace: &iceberg::NamespaceIdent) -> Result<()> {
166 self.inner.drop_namespace(namespace).await
167 }
168
169 async fn list_tables(&self, namespace: &iceberg::NamespaceIdent) -> Result<Vec<TableIdent>> {
170 self.inner.list_tables(namespace).await
171 }
172
173 async fn create_table(
174 &self,
175 namespace: &iceberg::NamespaceIdent,
176 creation: iceberg::TableCreation,
177 ) -> Result<Table> {
178 self.inner.create_table(namespace, creation).await
179 }
180
181 async fn load_table(&self, table: &TableIdent) -> Result<Table> {
182 self.inner.load_table(table).await
183 }
184
185 async fn drop_table(&self, table: &TableIdent) -> Result<()> {
186 self.inner.drop_table(table).await
187 }
188
189 async fn table_exists(&self, table: &TableIdent) -> Result<bool> {
190 self.inner.table_exists(table).await
191 }
192
193 async fn rename_table(&self, src: &TableIdent, dest: &TableIdent) -> Result<()> {
194 self.inner.rename_table(src, dest).await
195 }
196
197 async fn register_table(&self, table: &TableIdent, metadata_location: String) -> Result<Table> {
198 self.inner.register_table(table, metadata_location).await
199 }
200
201 async fn update_table(&self, commit: iceberg::TableCommit) -> Result<Table> {
202 self.inner.update_table(commit).await
203 }
204}
205
206#[async_trait]
207impl<C: Catalog + Send + Sync> CatalogExt for ExtendedCatalog<C> {
208 async fn commit_table(
209 &self,
210 table_ident: &TableIdent,
211 requirements: Vec<TableRequirement>,
212 updates: Vec<TableUpdate>,
213 ) -> Result<Table> {
214 let table = self.inner.load_table(table_ident).await?;
216
217 for requirement in &requirements {
219 requirement.check(Some(table.metadata()))?;
220 }
221
222 let current_metadata_location = table
224 .metadata_location()
225 .ok_or_else(|| Error::new(ErrorKind::DataInvalid, "Table has no metadata location"))?;
226
227 let mut metadata_builder = table
229 .metadata()
230 .clone()
231 .into_builder(Some(current_metadata_location.to_string()));
232
233 for update in updates {
234 metadata_builder = update.apply(metadata_builder)?;
235 }
236
237 let new_metadata = metadata_builder.build()?;
239
240 let new_metadata_location = generate_new_metadata_location(current_metadata_location)?;
242
243 new_metadata
246 .metadata
247 .write_to(table.file_io(), &new_metadata_location)
248 .await?;
249
250 self.update_table_metadata_location(table_ident, new_metadata_location)
252 .await
253 }
254
255 async fn update_table_metadata_location(
256 &self,
257 table_ident: &TableIdent,
258 new_metadata_location: String,
259 ) -> Result<Table> {
260 tracing::debug!(
282 table = %table_ident,
283 new_location = %new_metadata_location,
284 "Updating table metadata location using drop-and-register pattern"
285 );
286
287 self.inner.drop_table(table_ident).await?;
289
290 match self
293 .inner
294 .register_table(table_ident, new_metadata_location.clone())
295 .await
296 {
297 Ok(table) => Ok(table),
298 Err(e) => {
299 tracing::error!(
300 table = %table_ident,
301 metadata_location = %new_metadata_location,
302 error = %e,
303 "CRITICAL: Table dropped but re-registration failed. \
304 Table entry is lost. Metadata file still exists at the \
305 specified location and can be manually recovered."
306 );
307 Err(e)
308 }
309 }
310 }
311
312 async fn commit_tables_atomic(
313 &self,
314 table_changes: Vec<(TableIdent, Vec<TableRequirement>, Vec<TableUpdate>)>,
315 ) -> Result<Vec<Table>> {
316 if table_changes.len() <= 1 {
318 let mut results = Vec::with_capacity(table_changes.len());
319 for (ident, reqs, updates) in table_changes {
320 let table = self.commit_table(&ident, reqs, updates).await?;
321 results.push(table);
322 }
323 return Ok(results);
324 }
325
326 Err(Error::new(
331 ErrorKind::FeatureUnsupported,
332 format!(
333 "Atomic multi-table commit ({} tables) not supported by this catalog backend. \
334 Enable slatedb-storage feature and use a persistent storage backend for true atomic commits.",
335 table_changes.len()
336 ),
337 ))
338 }
339
340 async fn storage_health_check(&self) -> Result<StorageHealthStatus> {
341 Ok(StorageHealthStatus::healthy("memory", 0))
344 }
345}
346
347fn generate_new_metadata_location(current_location: &str) -> Result<String> {
349 use std::path::Path;
350
351 let path = Path::new(current_location);
352 let parent = path.parent().map(|p| p.to_string_lossy().to_string());
353 let filename = path
354 .file_name()
355 .and_then(|f| f.to_str())
356 .ok_or_else(|| Error::new(ErrorKind::DataInvalid, "Invalid metadata location"))?;
357
358 let version = filename
360 .split('-')
361 .next()
362 .and_then(|v| v.parse::<u32>().ok())
363 .unwrap_or(0);
364
365 let new_version = version + 1;
366 let new_uuid = uuid::Uuid::new_v4();
367 let new_filename = format!("{:05}-{}.metadata.json", new_version, new_uuid);
368
369 match parent {
370 Some(p) if !p.is_empty() => Ok(format!("{}/{}", p, new_filename)),
371 _ => Ok(new_filename),
372 }
373}
374
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_generate_new_metadata_location() {
        let current = "s3://bucket/warehouse/db/table/metadata/00001-abc.metadata.json";
        let new_loc = generate_new_metadata_location(current).unwrap();

        assert!(new_loc.starts_with("s3://bucket/warehouse/db/table/metadata/00002-"));
        assert!(new_loc.ends_with(".metadata.json"));
    }

    #[test]
    fn test_generate_new_metadata_location_local() {
        let current = "/tmp/warehouse/metadata/00005-xyz.metadata.json";
        let new_loc = generate_new_metadata_location(current).unwrap();

        assert!(new_loc.starts_with("/tmp/warehouse/metadata/00006-"));
        assert!(new_loc.ends_with(".metadata.json"));
    }

    /// A location without any directory component yields a bare file name.
    #[test]
    fn test_generate_new_metadata_location_bare_filename() {
        let new_loc = generate_new_metadata_location("00009-abc.metadata.json").unwrap();

        assert!(new_loc.starts_with("00010-"));
        assert!(!new_loc.contains('/'));
        assert!(new_loc.ends_with(".metadata.json"));
    }

    /// A file name without a numeric version prefix restarts numbering at 1.
    #[test]
    fn test_generate_new_metadata_location_non_numeric_version() {
        let current = "s3://bucket/table/metadata/v3.metadata.json";
        let new_loc = generate_new_metadata_location(current).unwrap();

        assert!(new_loc.starts_with("s3://bucket/table/metadata/00001-"));
        assert!(new_loc.ends_with(".metadata.json"));
    }

    /// Versions beyond five digits are not truncated.
    #[test]
    fn test_generate_new_metadata_location_wide_version() {
        let current = "s3://bucket/table/metadata/99999-abc.metadata.json";
        let new_loc = generate_new_metadata_location(current).unwrap();

        assert!(new_loc.starts_with("s3://bucket/table/metadata/100000-"));
    }

    /// Each call embeds a fresh UUID, so repeated calls never collide.
    #[test]
    fn test_generate_new_metadata_location_is_unique() {
        let current = "s3://bucket/table/metadata/00001-abc.metadata.json";
        let first = generate_new_metadata_location(current).unwrap();
        let second = generate_new_metadata_location(current).unwrap();

        assert_ne!(first, second);
    }

    #[test]
    fn test_storage_health_status_constructors() {
        let ok = StorageHealthStatus::healthy("memory", 7);
        assert_eq!(ok.backend_type, "memory");
        assert!(ok.healthy);
        assert_eq!(ok.latency_ms, 7);
        assert!(ok.message.is_none());

        let bad = StorageHealthStatus::unhealthy("s3", "unreachable");
        assert_eq!(bad.backend_type, "s3");
        assert!(!bad.healthy);
        assert_eq!(bad.latency_ms, 0);
        assert_eq!(bad.message.as_deref(), Some("unreachable"));
    }
}