icelake/catalog/
mod.rs

1//! This module defines catalog api for icelake.
2
3use std::collections::{HashMap, HashSet};
4use std::sync::Arc;
5
6use async_trait::async_trait;
7
8use enum_display::EnumDisplay;
9use uuid::Uuid;
10
11use crate::config::{TableConfig, TableConfigRef};
12use crate::error::Result;
13use crate::table::{Namespace, TableIdentifier};
14use crate::types::{
15    PartitionField, PartitionSpec, Schema, Snapshot, SnapshotReference, SnapshotReferenceType,
16    SortOrder, TableMetadata,
17};
18use crate::Table;
19
20mod rest;
21
22pub use rest::*;
23
24mod storage;
25
26use crate::error::{Error, ErrorKind};
27pub use storage::*;
28
29mod io;
30
31pub use io::*;
32
33mod layer;
34#[cfg(feature = "prometheus")]
35pub mod prometheus;
36
37pub use layer::*;
38
/// Shared, dynamically dispatched reference to a [`Catalog`] implementation.
pub type CatalogRef = Arc<dyn Catalog>;
41
/// Catalog definition.
///
/// A catalog manages iceberg tables: listing, creation, existence checks,
/// loading, renaming, dropping, registration and metadata updates.
///
/// Every method except [`Catalog::name`] has a default implementation that
/// returns an [`ErrorKind::IcebergFeatureUnsupported`] error, so implementors
/// only need to override the operations their backend actually supports.
#[async_trait]
pub trait Catalog: Send + Sync {
    /// Return catalog's name.
    fn name(&self) -> &str;

    /// List tables under namespace.
    ///
    /// Default: unsupported error.
    async fn list_tables(self: Arc<Self>, _ns: &Namespace) -> Result<Vec<TableIdentifier>> {
        Err(Error::new(
            ErrorKind::IcebergFeatureUnsupported,
            format!("list_tables is not supported by {}", self.name()),
        ))
    }

    /// Creates a table with the given schema, partition spec, location and
    /// properties.
    ///
    /// Default: unsupported error.
    async fn create_table(
        self: Arc<Self>,
        _table_name: &TableIdentifier,
        _schema: &Schema,
        _spec: &PartitionSpec,
        _location: &str,
        _props: HashMap<String, String>,
    ) -> Result<Table> {
        Err(Error::new(
            ErrorKind::IcebergFeatureUnsupported,
            format!("create_table is not supported by {}", self.name()),
        ))
    }

    /// Check table exists.
    ///
    /// Default: unsupported error.
    async fn table_exists(self: Arc<Self>, _table_name: &TableIdentifier) -> Result<bool> {
        Err(Error::new(
            ErrorKind::IcebergFeatureUnsupported,
            format!("table_exists is not supported by {}", self.name()),
        ))
    }

    /// Drop table.
    ///
    /// NOTE(review): `_purge` presumably controls whether data files are
    /// deleted along with the table metadata — confirm with implementations.
    ///
    /// Default: unsupported error.
    async fn drop_table(
        self: Arc<Self>,
        _table_name: &TableIdentifier,
        _purge: bool,
    ) -> Result<()> {
        Err(Error::new(
            ErrorKind::IcebergFeatureUnsupported,
            format!("drop_table is not supported by {}", self.name()),
        ))
    }

    /// Rename table from `_from` to `_to`.
    ///
    /// Default: unsupported error.
    async fn rename_table(
        self: Arc<Self>,
        _from: &TableIdentifier,
        _to: &TableIdentifier,
    ) -> Result<()> {
        Err(Error::new(
            ErrorKind::IcebergFeatureUnsupported,
            format!("rename_table is not supported by {}", self.name()),
        ))
    }

    /// Load table.
    ///
    /// Default: unsupported error.
    async fn load_table(self: Arc<Self>, _table_name: &TableIdentifier) -> Result<Table> {
        Err(Error::new(
            ErrorKind::IcebergFeatureUnsupported,
            format!("load_table is not supported by {}", self.name()),
        ))
    }

    /// Invalidate table.
    ///
    /// Default: unsupported error.
    async fn invalidate_table(self: Arc<Self>, _table_name: &TableIdentifier) -> Result<()> {
        Err(Error::new(
            ErrorKind::IcebergFeatureUnsupported,
            format!("invalidate_table is not supported by {}", self.name()),
        ))
    }

    /// Register a table using metadata file location.
    ///
    /// Default: unsupported error.
    async fn register_table(
        self: Arc<Self>,
        _table_name: &TableIdentifier,
        _metadata_file_location: &str,
    ) -> Result<Table> {
        Err(Error::new(
            ErrorKind::IcebergFeatureUnsupported,
            format!("register_table is not supported by {}", self.name()),
        ))
    }

    /// Update table according to the requirements and updates in
    /// [`UpdateTable`].
    ///
    /// Default: unsupported error.
    async fn update_table(self: Arc<Self>, _update_table: &UpdateTable) -> Result<Table> {
        Err(Error::new(
            ErrorKind::IcebergFeatureUnsupported,
            format!("update_table is not supported by {}", self.name()),
        ))
    }
}
139
/// Update table requirements.
///
/// Each variant is an assertion against the table's current metadata that
/// must hold before the accompanying [`MetadataUpdate`]s are applied.
#[derive(Debug, Clone, EnumDisplay)]
pub enum UpdateRequirement {
    /// Requires that the table does not exist yet.
    AssertTableDoesNotExist,
    /// Requires that the current table's uuid matches the given value.
    AssertTableUUID(Uuid),
    /// Requires that the named branch currently points at the given snapshot id.
    AssertRefSnapshotID {
        /// Branch name
        name: String,
        /// Snapshot id
        snapshot_id: i64,
    },
    /// Requires the current table's last assigned field id.
    AssertLastAssignedFieldId {
        /// Last assigned field id.
        last_assigned_field_id: i32,
    },
    /// Requires the current table's schema id.
    AssertCurrentSchemaID {
        /// Schema id
        schema_id: i32,
    },
    /// Requires the current table's last assigned partition id.
    AssertLastAssignedPartitionId {
        /// Partition id.
        last_assigned_partition_id: i32,
    },
    /// Requires the current table's default partition spec id.
    AssertDefaultSpecID {
        /// Spec id
        spec_id: i32,
    },
    /// Requires the current table's default sort order id.
    AssertDefaultSortOrderID {
        /// Sort order id.
        sort_order_id: i32,
    },
}
180
impl UpdateRequirement {
    /// Check whether this requirement holds for the given table metadata.
    ///
    /// `AssertTableDoesNotExist` always fails here: being handed metadata
    /// implies the table already exists.
    ///
    /// NOTE(review): every other variant except `AssertTableUUID` currently
    /// falls through to `todo!()` and will panic at runtime if checked —
    /// implement the remaining assertions before relying on them.
    fn check(&self, table_metadata: &TableMetadata) -> bool {
        match self {
            UpdateRequirement::AssertTableDoesNotExist => false,
            UpdateRequirement::AssertTableUUID(uuid) => {
                table_metadata.table_uuid == uuid.to_string()
            }
            // Unimplemented assertions: panics via todo!().
            _ => todo!(),
        }
    }
}
192
/// Metadata updates.
///
/// Each variant describes one mutation of a table's [`TableMetadata`]; see
/// [`MetadataUpdate::apply`] for which variants are currently supported.
#[derive(Debug, Clone, EnumDisplay)]
pub enum MetadataUpdate {
    /// Assign uuid.
    AssignUuid(Uuid),
    /// Upgrade format version.
    UpgradeFormatVersion(i32),
    /// Add schema
    AddSchema {
        /// New schema
        schema: Schema,
        /// Last column id
        last_column_id: i32,
    },
    /// Set current schema id.
    SetCurrentSchema {
        /// Schema id.
        schema_id: i32,
    },
    /// Add partition spec
    AddPartitionSpec {
        /// Spec id
        spec_id: i32,
        /// Partition fields
        fields: Vec<PartitionField>,
    },
    /// Set default partition spec.
    SetDefaultPartitionSpec {
        /// partition spec id
        spec_id: i32,
    },
    /// Add sort order.
    AddSortOrder {
        /// Sort order
        sort_order: SortOrder,
    },
    /// Set default sort order
    SetDefaultSortOrder {
        /// Sort order id
        sort_order_id: i32,
    },
    /// Add snapshot
    AddSnapshot {
        /// Snapshot
        snapshot: Snapshot,
    },
    /// Remove snapshot
    RemoveSnapshot {
        /// Snapshot id
        snapshot_id: i64,
    },
    /// Remove snapshot ref
    RemoveSnapshotRef {
        /// Ref name.
        ref_name: String,
    },
    /// Update snapshot reference.
    SetSnapshotRef {
        /// Branch name
        ref_name: String,
        /// Snapshot id.
        snapshot_id: i64,
        /// Type
        typ: SnapshotReferenceType,
        /// Number of snapshots to keep.
        min_snapshots_to_keep: Option<i32>,
        /// Max snapshot age in milliseconds (maps to `max_snapshot_age_ms`).
        max_snapshot_ages: Option<i64>,
        /// Max ref age in milliseconds (maps to `max_ref_age_ms`).
        max_ref_ages: Option<i64>,
    },
    /// Update table properties.
    SetProperties {
        /// Table properties.
        props: HashMap<String, String>,
    },
    /// Remove table properties.
    RemoveProperties {
        /// Keys to remove.
        removed: HashSet<String>,
    },
    /// Set table location
    SetLocation {
        /// Table Location
        location: String,
    },
}
280
281impl MetadataUpdate {
282    fn apply(&self, metadata: &mut TableMetadata) -> Result<()> {
283        match self {
284            MetadataUpdate::AddSnapshot { snapshot } => metadata.add_snapshot(snapshot.clone()),
285            MetadataUpdate::SetSnapshotRef {
286                ref_name,
287                snapshot_id,
288                typ,
289                min_snapshots_to_keep,
290                max_snapshot_ages,
291                max_ref_ages,
292            } => metadata.set_snapshot_ref(
293                ref_name.as_str(),
294                SnapshotReference {
295                    snapshot_id: *snapshot_id,
296                    typ: *typ,
297                    min_snapshots_to_keep: *min_snapshots_to_keep,
298                    max_snapshot_age_ms: *max_snapshot_ages,
299                    max_ref_age_ms: *max_ref_ages,
300                },
301            ),
302            other => Err(Error::new(
303                ErrorKind::IcebergFeatureUnsupported,
304                format!("update {other} is not supported"),
305            )),
306        }
307    }
308}
309
/// Update table request.
///
/// Built via [`UpdateTable::builder`]; carries the target table together with
/// the requirements to verify and the metadata updates to apply.
pub struct UpdateTable {
    // Identifier of the table to update.
    table_name: TableIdentifier,
    // Assertions that must hold before updates are applied.
    requirements: Vec<UpdateRequirement>,
    // Metadata mutations to apply.
    updates: Vec<MetadataUpdate>,
}

/// Update table builder.
pub struct UpdateTableBuilder(UpdateTable);
319
320impl UpdateTable {
321    /// Creates a update table builder.
322    pub fn builder(table_name: TableIdentifier) -> UpdateTableBuilder {
323        UpdateTableBuilder(UpdateTable {
324            table_name,
325            requirements: vec![],
326            updates: vec![],
327        })
328    }
329
330    /// Get table name.
331    pub fn table_name(&self) -> &TableIdentifier {
332        &self.table_name
333    }
334
335    /// Get update requirements.
336    pub fn requirements(&self) -> &[UpdateRequirement] {
337        &self.requirements
338    }
339
340    /// Get updates.
341    pub fn updates(&self) -> &[MetadataUpdate] {
342        &self.updates
343    }
344}
345
346impl UpdateTableBuilder {
347    /// Add requirements.
348    pub fn add_requirements(
349        &mut self,
350        requirements: impl IntoIterator<Item = UpdateRequirement>,
351    ) -> &mut Self {
352        self.0.requirements.extend(requirements);
353        self
354    }
355
356    /// Add updates.
357    pub fn add_updates(&mut self, updates: impl IntoIterator<Item = MetadataUpdate>) -> &mut Self {
358        self.0.updates.extend(updates);
359        self
360    }
361
362    /// Build.
363    pub fn build(self) -> UpdateTable {
364        self.0
365    }
366}
367
/// Catalog type: rest, storage.
pub const CATALOG_TYPE: &str = "iceberg.catalog.type";
/// Catalog name
pub const CATALOG_NAME: &str = "iceberg.catalog.name";
/// Prefix of catalog-specific configuration keys (`iceberg.catalog.<name>.*`).
pub(crate) const CATALOG_CONFIG_PREFIX: &str = "iceberg.catalog.";
// Prefix of table io configuration keys; stripped off before the keys are
// stored in `BaseCatalogConfig::table_io_configs`.
const TABLE_IO_PREFIX: &str = "iceberg.table.io.";
374
/// Base catalog config.
///
/// Configuration shared by all catalog implementations, extracted by
/// [`load_iceberg_base_catalog_config`].
#[derive(Debug, Default)]
pub struct BaseCatalogConfig {
    /// Name of catalog.
    pub name: String,
    /// Table io configs, keyed without the `iceberg.table.io.` prefix.
    pub table_io_configs: HashMap<String, String>,
    /// Table reader/writer config.
    pub table_config: TableConfigRef,
}
385
386/// Load catalog from configuration.
387///
/// The following two configurations must be provided:
389///
390/// - [`CATALOG_TYPE`]: Type of catalog, must be one of `storage`, `rest`.
391/// - [`CATALOG_NAME`]: Name of catalog.
392///
393/// ## Catalog specific configuration.
394///
395/// Catalog specific configurations are prefixed with `iceberg.catalog.<catalog name>`.
/// For example, if the catalog name is `demo`, then all catalog specific configuration keys must be prefixed with `iceberg.catalog.demo.`:
397///
398/// ### Storage catalog
399///
400/// Currently the only required configuration is `iceberg.catalog.demo.warehouse`, which is the root path of warehouse.
401///
402/// ### Rest catalog
403///
404/// Currently the only required configuration is `iceberg.catalog.demo.uri`, which is the uri of rest catalog server.
405///
406/// ## IO Configuration
407///
408/// All configurations for table io are prefixed with `iceberg.table.io.`.
/// For example, if the underlying storage is s3, the following configurations can be provided:
410///
411/// - `iceberg.table.io.region`: Region of s3.
412/// - `iceberg.table.io.endpoint`: Endpoint of s3.
413///
414/// ## Table reader/writer configuration.
415///
416/// User can control the behavior of table reader/writer by providing configurations, see [`TableConfig`] for details.
417///
418/// # Examples
419///
420/// ## Rest catalog
421///
422/// ```text
423/// iceberg.catalog.name=demo # required
424/// iceberg.catalog.type=rest # required
425///
426/// iceberg.catalog.demo.uri = http://localhost:9090 # required
427///
428/// ## Configurations for s3
429/// iceberg.table.io.region = us-east-1
430/// iceberg.table.io.endpoint = http://minio:9000
431/// iceberg.table.io.bucket = icebergdata
432/// iceberg.table.io.root = demo
433/// iceberg.table.io.access_key_id=admin
434/// iceberg.table.io.secret_access_key=password
435///
436/// ## Configurations for table reader/writer, following are optional.
437/// iceberg.table.parquet_writer.enable_bloom_filter = true
438/// ```
439///
440/// ## Storage catalog
441///
442/// ```text
443/// iceberg.catalog.name=demo # required
444/// iceberg.catalog.type=storage # required
445///
446/// iceberg.catalog.demo.warehouse = s3://icebergdata/demo # required
447///
448/// # Configuration for s3
449/// iceberg.table.io.region=us-east-1
450/// iceberg.table.io.endpoint=http://localhost:8181
451/// iceberg.table.io.bucket = icebergdata
452/// iceberg.table.io.root = demo
453/// iceberg.table.io.access_key_id = admin
454/// iceberg.table.io.secret_access_key = password
455///
456/// ## Configurations for table reader/writer, following are optional.
457/// iceberg.table.parquet_writer.enable_bloom_filter = true
458/// ```
459pub async fn load_catalog(configs: &HashMap<String, String>) -> Result<CatalogRef> {
460    log::info!("Loading catalog from configs: {:?}", configs);
461    let catalog_type = configs.get(CATALOG_TYPE).ok_or_else(|| {
462        Error::new(
463            ErrorKind::IcebergDataInvalid,
464            format!("{CATALOG_TYPE} is not set"),
465        )
466    })?;
467
468    let base_catalog_config = load_iceberg_base_catalog_config(configs)?;
469
470    match catalog_type.as_str() {
471        "storage" => Ok(Arc::new(
472            StorageCatalog::from_config(base_catalog_config, configs).await?,
473        )),
474        "rest" => Ok(Arc::new(
475            RestCatalog::new(base_catalog_config, configs).await?,
476        )),
477        _ => Err(Error::new(
478            ErrorKind::IcebergDataInvalid,
479            format!("Unsupported catalog type: {catalog_type}"),
480        )),
481    }
482}
483
484/// Load base catalog config from configuration.
485pub fn load_iceberg_base_catalog_config(
486    configs: &HashMap<String, String>,
487) -> Result<BaseCatalogConfig> {
488    log::info!("Loading base catalog config from configs: {:?}", configs);
489
490    let catalog_name = configs.get(CATALOG_NAME).ok_or_else(|| {
491        Error::new(
492            ErrorKind::IcebergDataInvalid,
493            format!("{CATALOG_NAME} is not set"),
494        )
495    })?;
496
497    let table_io_configs = configs
498        .iter()
499        .filter(|(k, _v)| k.starts_with(TABLE_IO_PREFIX))
500        .map(|(k, v)| (k[TABLE_IO_PREFIX.len()..].to_string(), v.to_string()))
501        .collect();
502
503    let table_config = Arc::new(TableConfig::try_from(configs)?);
504
505    let base_catalog_config = BaseCatalogConfig {
506        name: catalog_name.to_string(),
507        table_io_configs,
508        table_config,
509    };
510
511    log::info!("Parsed base catalog config: {:?}", base_catalog_config);
512
513    Ok(base_catalog_config)
514}