1use std::collections::{HashMap, HashSet};
4use std::sync::Arc;
5
6use async_trait::async_trait;
7
8use enum_display::EnumDisplay;
9use uuid::Uuid;
10
11use crate::config::{TableConfig, TableConfigRef};
12use crate::error::Result;
13use crate::table::{Namespace, TableIdentifier};
14use crate::types::{
15 PartitionField, PartitionSpec, Schema, Snapshot, SnapshotReference, SnapshotReferenceType,
16 SortOrder, TableMetadata,
17};
18use crate::Table;
19
20mod rest;
21
22pub use rest::*;
23
24mod storage;
25
26use crate::error::{Error, ErrorKind};
27pub use storage::*;
28
29mod io;
30
31pub use io::*;
32
33mod layer;
34#[cfg(feature = "prometheus")]
35pub mod prometheus;
36
37pub use layer::*;
38
/// Shared, dynamically dispatched handle to any [`Catalog`] implementation; this is what
/// `load_catalog` returns.
pub type CatalogRef = Arc<dyn Catalog>;
41
42#[async_trait]
44pub trait Catalog: Send + Sync {
45 fn name(&self) -> &str;
47
48 async fn list_tables(self: Arc<Self>, _ns: &Namespace) -> Result<Vec<TableIdentifier>> {
50 Err(Error::new(
51 ErrorKind::IcebergFeatureUnsupported,
52 format!("list_tables is not supported by {}", self.name()),
53 ))
54 }
55
56 async fn create_table(
58 self: Arc<Self>,
59 _table_name: &TableIdentifier,
60 _schema: &Schema,
61 _spec: &PartitionSpec,
62 _location: &str,
63 _props: HashMap<String, String>,
64 ) -> Result<Table> {
65 Err(Error::new(
66 ErrorKind::IcebergFeatureUnsupported,
67 format!("create_table is not supported by {}", self.name()),
68 ))
69 }
70
71 async fn table_exists(self: Arc<Self>, _table_name: &TableIdentifier) -> Result<bool> {
73 Err(Error::new(
74 ErrorKind::IcebergFeatureUnsupported,
75 format!("table_exists is not supported by {}", self.name()),
76 ))
77 }
78
79 async fn drop_table(
81 self: Arc<Self>,
82 _table_name: &TableIdentifier,
83 _purge: bool,
84 ) -> Result<()> {
85 Err(Error::new(
86 ErrorKind::IcebergFeatureUnsupported,
87 format!("drop_table is not supported by {}", self.name()),
88 ))
89 }
90
91 async fn rename_table(
93 self: Arc<Self>,
94 _from: &TableIdentifier,
95 _to: &TableIdentifier,
96 ) -> Result<()> {
97 Err(Error::new(
98 ErrorKind::IcebergFeatureUnsupported,
99 format!("rename_table is not supported by {}", self.name()),
100 ))
101 }
102
103 async fn load_table(self: Arc<Self>, _table_name: &TableIdentifier) -> Result<Table> {
105 Err(Error::new(
106 ErrorKind::IcebergFeatureUnsupported,
107 format!("load_table is not supported by {}", self.name()),
108 ))
109 }
110
111 async fn invalidate_table(self: Arc<Self>, _table_name: &TableIdentifier) -> Result<()> {
113 Err(Error::new(
114 ErrorKind::IcebergFeatureUnsupported,
115 format!("invalidate_table is not supported by {}", self.name()),
116 ))
117 }
118
119 async fn register_table(
121 self: Arc<Self>,
122 _table_name: &TableIdentifier,
123 _metadata_file_location: &str,
124 ) -> Result<Table> {
125 Err(Error::new(
126 ErrorKind::IcebergFeatureUnsupported,
127 format!("register_table is not supported by {}", self.name()),
128 ))
129 }
130
131 async fn update_table(self: Arc<Self>, _update_table: &UpdateTable) -> Result<Table> {
133 Err(Error::new(
134 ErrorKind::IcebergFeatureUnsupported,
135 format!("update_table is not supported by {}", self.name()),
136 ))
137 }
138}
139
/// A precondition attached to an [`UpdateTable`] request, evaluated against a table's
/// current metadata (presumably before the request's updates are applied — confirm).
///
/// NOTE(review): variant names appear to mirror the Iceberg REST catalog's update
/// requirements (`assert-create`, `assert-table-uuid`, ...); confirm against that spec.
/// `UpdateRequirement::check` evaluates only `AssertTableDoesNotExist` and `AssertTableUUID`.
#[derive(Debug, Clone, EnumDisplay)]
pub enum UpdateRequirement {
    /// The table must not exist yet. Checking it against existing table metadata always
    /// fails.
    AssertTableDoesNotExist,
    /// The table's UUID must equal this value (compared with the metadata's `table_uuid`).
    AssertTableUUID(Uuid),
    /// Presumably: the snapshot reference `name` must point at `snapshot_id` (inferred from
    /// the name — confirm).
    AssertRefSnapshotID {
        /// Name of the snapshot reference.
        name: String,
        /// Snapshot id the reference is expected to point at.
        snapshot_id: i64,
    },
    /// Presumably: the table's last assigned column/field id must equal this value.
    AssertLastAssignedFieldId {
        /// Expected last assigned field id.
        last_assigned_field_id: i32,
    },
    /// Presumably: the table's current schema id must equal this value.
    AssertCurrentSchemaID {
        /// Expected current schema id.
        schema_id: i32,
    },
    /// Presumably: the table's last assigned partition field id must equal this value.
    AssertLastAssignedPartitionId {
        /// Expected last assigned partition id.
        last_assigned_partition_id: i32,
    },
    /// Presumably: the table's default partition spec id must equal this value.
    AssertDefaultSpecID {
        /// Expected default partition spec id.
        spec_id: i32,
    },
    /// Presumably: the table's default sort order id must equal this value.
    AssertDefaultSortOrderID {
        /// Expected default sort order id.
        sort_order_id: i32,
    },
}
180
181impl UpdateRequirement {
182 fn check(&self, table_metadata: &TableMetadata) -> bool {
183 match self {
184 UpdateRequirement::AssertTableDoesNotExist => false,
185 UpdateRequirement::AssertTableUUID(uuid) => {
186 table_metadata.table_uuid == uuid.to_string()
187 }
188 _ => todo!(),
189 }
190 }
191}
192
/// A single change to a table's metadata, carried by an [`UpdateTable`] request.
///
/// NOTE(review): variant names appear to mirror the Iceberg REST catalog's metadata updates;
/// confirm against that spec. `MetadataUpdate::apply` implements only `AddSnapshot` and
/// `SetSnapshotRef`; every other variant returns an `IcebergFeatureUnsupported` error there.
#[derive(Debug, Clone, EnumDisplay)]
pub enum MetadataUpdate {
    /// Presumably assigns the table's UUID.
    AssignUuid(Uuid),
    /// Presumably upgrades the table to the given format version.
    UpgradeFormatVersion(i32),
    /// Adds `schema`; `last_column_id` is presumably the highest column id after the addition.
    AddSchema {
        /// Schema to add.
        schema: Schema,
        /// Last assigned column id (semantics inferred from the name — confirm).
        last_column_id: i32,
    },
    /// Presumably makes schema `schema_id` the current schema.
    SetCurrentSchema {
        /// Id of the schema to make current.
        schema_id: i32,
    },
    /// Adds a partition spec with id `spec_id` made of `fields`.
    AddPartitionSpec {
        /// Id of the new partition spec.
        spec_id: i32,
        /// Partition fields of the new spec.
        fields: Vec<PartitionField>,
    },
    /// Presumably makes partition spec `spec_id` the default.
    SetDefaultPartitionSpec {
        /// Id of the partition spec to make default.
        spec_id: i32,
    },
    /// Adds `sort_order`.
    AddSortOrder {
        /// Sort order to add.
        sort_order: SortOrder,
    },
    /// Presumably makes sort order `sort_order_id` the default.
    SetDefaultSortOrder {
        /// Id of the sort order to make default.
        sort_order_id: i32,
    },
    /// Adds `snapshot` (applied via `TableMetadata::add_snapshot`).
    AddSnapshot {
        /// Snapshot to add.
        snapshot: Snapshot,
    },
    /// Presumably removes snapshot `snapshot_id`.
    RemoveSnapshot {
        /// Id of the snapshot to remove.
        snapshot_id: i64,
    },
    /// Presumably removes the snapshot reference `ref_name`.
    RemoveSnapshotRef {
        /// Name of the reference to remove.
        ref_name: String,
    },
    /// Sets reference `ref_name` to a `SnapshotReference` built from the remaining fields
    /// (applied via `TableMetadata::set_snapshot_ref`).
    SetSnapshotRef {
        /// Name of the reference to set.
        ref_name: String,
        /// Snapshot the reference points at.
        snapshot_id: i64,
        /// Reference type.
        typ: SnapshotReferenceType,
        /// Stored as `SnapshotReference::min_snapshots_to_keep`.
        min_snapshots_to_keep: Option<i32>,
        /// Stored as `SnapshotReference::max_snapshot_age_ms` (presumably milliseconds).
        max_snapshot_ages: Option<i64>,
        /// Stored as `SnapshotReference::max_ref_age_ms` (presumably milliseconds).
        max_ref_ages: Option<i64>,
    },
    /// Presumably sets (adds or overwrites) the given table properties.
    SetProperties {
        /// Properties to set.
        props: HashMap<String, String>,
    },
    /// Presumably removes the named table properties.
    RemoveProperties {
        /// Keys of the properties to remove.
        removed: HashSet<String>,
    },
    /// Presumably changes the table's base location.
    SetLocation {
        /// New table location.
        location: String,
    },
}
280
281impl MetadataUpdate {
282 fn apply(&self, metadata: &mut TableMetadata) -> Result<()> {
283 match self {
284 MetadataUpdate::AddSnapshot { snapshot } => metadata.add_snapshot(snapshot.clone()),
285 MetadataUpdate::SetSnapshotRef {
286 ref_name,
287 snapshot_id,
288 typ,
289 min_snapshots_to_keep,
290 max_snapshot_ages,
291 max_ref_ages,
292 } => metadata.set_snapshot_ref(
293 ref_name.as_str(),
294 SnapshotReference {
295 snapshot_id: *snapshot_id,
296 typ: *typ,
297 min_snapshots_to_keep: *min_snapshots_to_keep,
298 max_snapshot_age_ms: *max_snapshot_ages,
299 max_ref_age_ms: *max_ref_ages,
300 },
301 ),
302 other => Err(Error::new(
303 ErrorKind::IcebergFeatureUnsupported,
304 format!("update {other} is not supported"),
305 )),
306 }
307 }
308}
309
/// A request to update one table: requirements to check plus metadata updates to apply.
/// Build one with [`UpdateTable::builder`].
pub struct UpdateTable {
    /// Table the request targets.
    table_name: TableIdentifier,
    /// Preconditions attached to the request.
    requirements: Vec<UpdateRequirement>,
    /// Metadata changes, kept in insertion order.
    updates: Vec<MetadataUpdate>,
}

/// Accumulates requirements and updates for an [`UpdateTable`].
///
/// The `add_*` methods take `&mut self` while `build` consumes the builder, so callers keep
/// the builder in a local binding instead of chaining `add_*` straight into `build`.
pub struct UpdateTableBuilder(UpdateTable);
319
320impl UpdateTable {
321 pub fn builder(table_name: TableIdentifier) -> UpdateTableBuilder {
323 UpdateTableBuilder(UpdateTable {
324 table_name,
325 requirements: vec![],
326 updates: vec![],
327 })
328 }
329
330 pub fn table_name(&self) -> &TableIdentifier {
332 &self.table_name
333 }
334
335 pub fn requirements(&self) -> &[UpdateRequirement] {
337 &self.requirements
338 }
339
340 pub fn updates(&self) -> &[MetadataUpdate] {
342 &self.updates
343 }
344}
345
346impl UpdateTableBuilder {
347 pub fn add_requirements(
349 &mut self,
350 requirements: impl IntoIterator<Item = UpdateRequirement>,
351 ) -> &mut Self {
352 self.0.requirements.extend(requirements);
353 self
354 }
355
356 pub fn add_updates(&mut self, updates: impl IntoIterator<Item = MetadataUpdate>) -> &mut Self {
358 self.0.updates.extend(updates);
359 self
360 }
361
362 pub fn build(self) -> UpdateTable {
364 self.0
365 }
366}
367
/// Config key selecting the catalog implementation; `load_catalog` accepts `"storage"` and
/// `"rest"`.
pub const CATALOG_TYPE: &str = "iceberg.catalog.type";
/// Config key holding the catalog's name (required by `load_iceberg_base_catalog_config`).
pub const CATALOG_NAME: &str = "iceberg.catalog.name";
/// Common prefix of catalog-level config keys. NOTE(review): presumably used by the catalog
/// implementations to pick out their own options — confirm at the use sites.
pub(crate) const CATALOG_CONFIG_PREFIX: &str = "iceberg.catalog.";
/// Keys with this prefix are copied, with the prefix stripped, into
/// `BaseCatalogConfig::table_io_configs`.
const TABLE_IO_PREFIX: &str = "iceberg.table.io.";
374
/// Settings shared by every catalog type, parsed by `load_iceberg_base_catalog_config`.
///
/// NOTE(review): the derived `Debug` prints `table_io_configs` values, which may include
/// storage credentials — avoid logging this struct with `{:?}`.
#[derive(Debug, Default)]
pub struct BaseCatalogConfig {
    /// Catalog name, taken from the `iceberg.catalog.name` config.
    pub name: String,
    /// Configs whose keys start with `iceberg.table.io.`, with that prefix stripped.
    pub table_io_configs: HashMap<String, String>,
    /// Table settings parsed from the full config map via `TableConfig::try_from`.
    pub table_config: TableConfigRef,
}
385
386pub async fn load_catalog(configs: &HashMap<String, String>) -> Result<CatalogRef> {
460 log::info!("Loading catalog from configs: {:?}", configs);
461 let catalog_type = configs.get(CATALOG_TYPE).ok_or_else(|| {
462 Error::new(
463 ErrorKind::IcebergDataInvalid,
464 format!("{CATALOG_TYPE} is not set"),
465 )
466 })?;
467
468 let base_catalog_config = load_iceberg_base_catalog_config(configs)?;
469
470 match catalog_type.as_str() {
471 "storage" => Ok(Arc::new(
472 StorageCatalog::from_config(base_catalog_config, configs).await?,
473 )),
474 "rest" => Ok(Arc::new(
475 RestCatalog::new(base_catalog_config, configs).await?,
476 )),
477 _ => Err(Error::new(
478 ErrorKind::IcebergDataInvalid,
479 format!("Unsupported catalog type: {catalog_type}"),
480 )),
481 }
482}
483
484pub fn load_iceberg_base_catalog_config(
486 configs: &HashMap<String, String>,
487) -> Result<BaseCatalogConfig> {
488 log::info!("Loading base catalog config from configs: {:?}", configs);
489
490 let catalog_name = configs.get(CATALOG_NAME).ok_or_else(|| {
491 Error::new(
492 ErrorKind::IcebergDataInvalid,
493 format!("{CATALOG_NAME} is not set"),
494 )
495 })?;
496
497 let table_io_configs = configs
498 .iter()
499 .filter(|(k, _v)| k.starts_with(TABLE_IO_PREFIX))
500 .map(|(k, v)| (k[TABLE_IO_PREFIX.len()..].to_string(), v.to_string()))
501 .collect();
502
503 let table_config = Arc::new(TableConfig::try_from(configs)?);
504
505 let base_catalog_config = BaseCatalogConfig {
506 name: catalog_name.to_string(),
507 table_io_configs,
508 table_config,
509 };
510
511 log::info!("Parsed base catalog config: {:?}", base_catalog_config);
512
513 Ok(base_catalog_config)
514}