//! Table module provides the core functionality for working with Iceberg tables
//!
//! The main type in this module is [`Table`], which represents an Iceberg table and provides
//! methods for:
//! * Reading table data and metadata
//! * Modifying table structure (schema, partitioning, etc.)
//! * Managing table snapshots and branches
//! * Performing atomic transactions
//!
//! Tables can be created using [`Table::builder()`] and modified using transactions
//! created by [`Table::new_transaction()`].

use std::{io::Cursor, sync::Arc};

use futures::future::try_join_all;
use futures::{stream, StreamExt, TryFutureExt, TryStreamExt};
use itertools::{Either, Itertools};
use manifest::ManifestReader;
use manifest_list::read_snapshot;
use object_store::ObjectStoreExt;
use object_store::{path::Path, ObjectStore};

use iceberg_rust_spec::util::{self};
use iceberg_rust_spec::{
    spec::{
        manifest::{Content, ManifestEntry},
        manifest_list::ManifestListEntry,
        schema::Schema,
        table_metadata::TableMetadata,
    },
    table_metadata::{
        WRITE_OBJECT_STORAGE_ENABLED, WRITE_PARQUET_COMPRESSION_CODEC,
        WRITE_PARQUET_COMPRESSION_LEVEL,
    },
};

use tracing::{instrument, Instrument};

use crate::{
    catalog::{create::CreateTableBuilder, identifier::Identifier, Catalog},
    error::Error,
    table::transaction::TableTransaction,
};
44
45pub mod manifest;
46pub mod manifest_list;
47pub mod transaction;
48
#[derive(Debug, Clone)]
/// Iceberg table
pub struct Table {
    /// Namespace + name uniquely identifying this table within `catalog`.
    identifier: Identifier,
    /// Catalog that owns this table; used to commit metadata updates.
    catalog: Arc<dyn Catalog>,
    /// Object store rooted at the table location, used for all file I/O.
    object_store: Arc<dyn ObjectStore>,
    /// Current table metadata (schemas, snapshots, partition specs, properties).
    metadata: TableMetadata,
}
57
58/// Public interface of the table.
59impl Table {
60    /// Creates a new table builder with default configuration
61    ///
62    /// Returns a `CreateTableBuilder` initialized with default properties:
63    /// * WRITE_PARQUET_COMPRESSION_CODEC: "zstd"
64    /// * WRITE_PARQUET_COMPRESSION_LEVEL: "3"
65    /// * WRITE_OBJECT_STORAGE_ENABLED: "false"
66    ///
67    /// # Returns
68    /// * `CreateTableBuilder` - A builder for configuring and creating a new table
69    ///
70    /// # Example
71    /// ```
72    /// use iceberg_rust::table::Table;
73    ///
74    /// let builder = Table::builder()
75    ///     .with_name("my_table")
76    ///     .with_schema(schema);
77    /// ```
78    pub fn builder() -> CreateTableBuilder {
79        let mut builder = CreateTableBuilder::default();
80        builder
81            .with_property((
82                WRITE_PARQUET_COMPRESSION_CODEC.to_owned(),
83                "zstd".to_owned(),
84            ))
85            .with_property((WRITE_PARQUET_COMPRESSION_LEVEL.to_owned(), 3.to_string()))
86            .with_property((WRITE_OBJECT_STORAGE_ENABLED.to_owned(), "false".to_owned()));
87        builder
88    }
89
90    /// Creates a new table instance with the given identifier, catalog and metadata
91    ///
92    /// # Arguments
93    /// * `identifier` - The unique identifier for this table in the catalog
94    /// * `catalog` - The catalog that this table belongs to
95    /// * `metadata` - The table's metadata containing schema, partitioning, etc.
96    ///
97    /// # Returns
98    /// * `Result<Table, Error>` - The newly created table instance or an error
99    ///
100    /// This is typically called by catalog implementations rather than directly by users.
101    /// For creating new tables, use [`Table::builder()`] instead.
102    pub async fn new(
103        identifier: Identifier,
104        catalog: Arc<dyn Catalog>,
105        object_store: Arc<dyn ObjectStore>,
106        metadata: TableMetadata,
107    ) -> Result<Self, Error> {
108        Ok(Table {
109            identifier,
110            catalog,
111            object_store,
112            metadata,
113        })
114    }
115    #[inline]
116    /// Returns the unique identifier for this table in the catalog
117    ///
118    /// The identifier contains both the namespace and name that uniquely identify
119    /// this table within its catalog.
120    ///
121    /// # Returns
122    /// * `&Identifier` - A reference to this table's identifier
123    pub fn identifier(&self) -> &Identifier {
124        &self.identifier
125    }
126    #[inline]
127    /// Returns a reference to the catalog containing this table
128    ///
129    /// The returned catalog reference is wrapped in an Arc to allow shared ownership
130    /// and thread-safe access to the catalog implementation.
131    ///
132    /// # Returns
133    /// * `Arc<dyn Catalog>` - A thread-safe reference to the table's catalog
134    pub fn catalog(&self) -> Arc<dyn Catalog> {
135        self.catalog.clone()
136    }
137    #[inline]
138    /// Returns the object store for this table's location
139    ///
140    /// The object store is determined by the table's location and is used for
141    /// reading and writing table data files. The returned store is wrapped in
142    /// an Arc to allow shared ownership and thread-safe access.
143    ///
144    /// # Returns
145    /// * `Arc<dyn ObjectStore>` - A thread-safe reference to the table's object store
146    pub fn object_store(&self) -> Arc<dyn ObjectStore> {
147        self.object_store.clone()
148    }
149    #[inline]
150    /// Returns the current schema for this table, optionally for a specific branch
151    ///
152    /// # Arguments
153    /// * `branch` - Optional branch name to get the schema for. If None, returns the main branch schema
154    ///
155    /// # Returns
156    /// * `Result<&Schema, Error>` - The current schema if found, or an error if the schema cannot be found
157    ///
158    /// # Errors
159    /// Returns an error if the schema ID cannot be found in the table metadata
160    pub fn current_schema(&self, branch: Option<&str>) -> Result<&Schema, Error> {
161        self.metadata.current_schema(branch).map_err(Error::from)
162    }
163    #[inline]
164    /// Returns a reference to this table's metadata
165    ///
166    /// The metadata contains all table information including:
167    /// * Schema definitions
168    /// * Partition specifications
169    /// * Snapshots
170    /// * Sort orders
171    /// * Table properties
172    ///
173    /// # Returns
174    /// * `&TableMetadata` - A reference to the table's metadata
175    pub fn metadata(&self) -> &TableMetadata {
176        &self.metadata
177    }
178    #[inline]
179    /// Consumes the table and returns its metadata
180    ///
181    /// This method takes ownership of the table instance and returns just the
182    /// underlying TableMetadata. This is useful when you no longer need the
183    /// table instance but want to retain its metadata.
184    ///
185    /// # Returns
186    /// * `TableMetadata` - The owned metadata from this table
187    pub fn into_metadata(self) -> TableMetadata {
188        self.metadata
189    }
190    /// Returns manifest list entries for snapshots within the given sequence range
191    ///
192    /// # Arguments
193    /// * `start` - Optional starting snapshot ID (exclusive). If None, includes from the beginning
194    /// * `end` - Optional ending snapshot ID (inclusive). If None, uses the current snapshot
195    ///
196    /// # Returns
197    /// * `Result<Vec<ManifestListEntry>, Error>` - Vector of manifest entries in the range,
198    ///   or an empty vector if no current snapshot exists
199    ///
200    /// # Errors
201    /// Returns an error if:
202    /// * The end snapshot ID is invalid
203    /// * Reading the manifest list fails
204    #[instrument(name = "iceberg_rust::table::manifests", level = "debug", skip(self), fields(
205        table_identifier = %self.identifier,
206        start = ?start,
207        end = ?end
208    ))]
209    pub async fn manifests(
210        &self,
211        start: Option<i64>,
212        end: Option<i64>,
213    ) -> Result<Vec<ManifestListEntry>, Error> {
214        let metadata = self.metadata();
215        let end_snapshot = match end.and_then(|id| metadata.snapshots.get(&id)) {
216            Some(snapshot) => snapshot,
217            None => {
218                if let Some(current) = metadata.current_snapshot(None)? {
219                    current
220                } else {
221                    return Ok(vec![]);
222                }
223            }
224        };
225        let start_sequence_number =
226            start
227                .and_then(|id| metadata.snapshots.get(&id))
228                .and_then(|snapshot| {
229                    let sequence_number = *snapshot.sequence_number();
230                    if sequence_number == 0 {
231                        None
232                    } else {
233                        Some(sequence_number)
234                    }
235                });
236        let iter = read_snapshot(end_snapshot, metadata, self.object_store().clone()).await?;
237        match start_sequence_number {
238            Some(start) => iter
239                .filter_ok(|manifest| manifest.sequence_number > start)
240                .collect(),
241            None => iter.collect(),
242        }
243    }
244    /// Returns a stream of manifest entries for the given manifest list entries
245    ///
246    /// # Arguments
247    /// * `manifests` - List of manifest entries to read data files from
248    /// * `filter` - Optional vector of boolean predicates to filter manifest entries
249    /// * `sequence_number_range` - Tuple of (start, end) sequence numbers to filter entries by
250    ///
251    /// # Returns
252    /// * `Result<impl Stream<Item = Result<ManifestEntry, Error>>, Error>` - Stream of manifest entries
253    ///   that match the given filters
254    ///
255    /// # Type Parameters
256    /// * `'a` - Lifetime of the manifest list entries reference
257    ///
258    /// # Errors
259    /// Returns an error if reading any manifest file fails
260    #[inline]
261    pub async fn datafiles<'a>(
262        &self,
263        manifests: &'a [ManifestListEntry],
264        filter: Option<Vec<bool>>,
265        sequence_number_range: (Option<i64>, Option<i64>),
266    ) -> Result<impl Iterator<Item = Result<(ManifestPath, ManifestEntry), Error>> + 'a, Error>
267    {
268        datafiles(
269            self.object_store(),
270            manifests,
271            filter,
272            sequence_number_range,
273        )
274        .await
275    }
276    /// Check if datafiles contain deletes
277    pub async fn datafiles_contains_delete(
278        &self,
279        start: Option<i64>,
280        end: Option<i64>,
281    ) -> Result<bool, Error> {
282        let manifests = self.manifests(start, end).await?;
283        let datafiles = self.datafiles(&manifests, None, (None, None)).await?;
284        stream::iter(datafiles)
285            .try_any(|entry| async move { !matches!(entry.1.data_file().content(), Content::Data) })
286            .await
287    }
288    /// Creates a new transaction for atomic modifications to this table
289    ///
290    /// # Arguments
291    /// * `branch` - Optional branch name to create the transaction for. If None, uses the main branch
292    ///
293    /// # Returns
294    /// * `TableTransaction` - A new transaction that can be used to atomically modify this table
295    ///
296    /// The transaction must be committed for any changes to take effect.
297    /// Multiple operations can be chained within a single transaction.
298    pub fn new_transaction(&mut self, branch: Option<&str>) -> TableTransaction<'_> {
299        TableTransaction::new(self, branch)
300    }
301}
302
/// Path of a Manifest file
///
/// Used to tag each [`ManifestEntry`] returned by [`Table::datafiles`] with
/// the location of the manifest it was read from.
pub type ManifestPath = String;
305
306#[instrument(name = "iceberg_rust::table::datafiles", level = "debug", skip(object_store, manifests), fields(
307    manifest_count = manifests.len(),
308    filter_provided = filter.is_some(),
309    sequence_range = ?sequence_number_range
310))]
311async fn datafiles(
312    object_store: Arc<dyn ObjectStore>,
313    manifests: &'_ [ManifestListEntry],
314    filter: Option<Vec<bool>>,
315    sequence_number_range: (Option<i64>, Option<i64>),
316) -> Result<impl Iterator<Item = Result<(ManifestPath, ManifestEntry), Error>> + '_, Error> {
317    // filter manifest files according to filter vector
318    let iter: Box<dyn Iterator<Item = &ManifestListEntry> + Send + Sync> = match filter {
319        Some(predicate) => {
320            let iter = manifests
321                .iter()
322                .zip(predicate.into_iter())
323                .filter(|(_, predicate)| *predicate)
324                .map(|(manifest, _)| manifest);
325            Box::new(iter)
326        }
327        None => Box::new(manifests.iter()),
328    };
329
330    let futures: Vec<_> = iter
331        .map(move |file| {
332            let object_store = object_store.clone();
333            async move {
334                let manifest_path = &file.manifest_path;
335                let path: Path = util::strip_prefix(manifest_path).into();
336                let bytes = Cursor::new(Vec::from(
337                    object_store
338                        .get(&path)
339                        .and_then(|file| file.bytes())
340                        .instrument(tracing::trace_span!("iceberg_rust::get_manifest"))
341                        .await?,
342                ));
343                Ok::<_, Error>((bytes, manifest_path, file.sequence_number))
344            }
345        })
346        .collect();
347
348    let results = try_join_all(futures).await?;
349
350    Ok(results.into_iter().flat_map(move |result| {
351        let (bytes, path, sequence_number) = result;
352
353        let reader = ManifestReader::new(bytes).unwrap();
354        reader.filter_map(move |x| {
355            let mut x = match x {
356                Ok(entry) => entry,
357                Err(_) => return None,
358            };
359
360            let sequence_number = if let Some(sequence_number) = x.sequence_number() {
361                *sequence_number
362            } else {
363                *x.sequence_number_mut() = Some(sequence_number);
364                sequence_number
365            };
366
367            let filter = match sequence_number_range {
368                (Some(start), Some(end)) => start < sequence_number && sequence_number <= end,
369                (Some(start), None) => start < sequence_number,
370                (None, Some(end)) => sequence_number <= end,
371                _ => true,
372            };
373            if filter {
374                Some(Ok((path.to_owned(), x)))
375            } else {
376                None
377            }
378        })
379    }))
380}
381
382/// delete all datafiles, manifests and metadata files, does not remove table from catalog
383pub(crate) async fn delete_all_table_files(
384    metadata: &TableMetadata,
385    object_store: Arc<dyn ObjectStore>,
386) -> Result<(), Error> {
387    let Some(snapshot) = metadata.current_snapshot(None)? else {
388        return Ok(());
389    };
390    let manifests: Vec<ManifestListEntry> = read_snapshot(snapshot, metadata, object_store.clone())
391        .await?
392        .collect::<Result<_, _>>()?;
393
394    let datafiles = datafiles(object_store.clone(), &manifests, None, (None, None)).await?;
395    let snapshots = &metadata.snapshots;
396
397    // stream::iter(datafiles.into_iter())
398    stream::iter(datafiles)
399        .try_for_each_concurrent(None, |datafile| {
400            let object_store = object_store.clone();
401            async move {
402                object_store
403                    .delete(&datafile.1.data_file().file_path().as_str().into())
404                    .await?;
405                Ok(())
406            }
407        })
408        .await?;
409
410    stream::iter(manifests.into_iter())
411        .map(Ok::<_, Error>)
412        .try_for_each_concurrent(None, |manifest| {
413            let object_store = object_store.clone();
414            async move {
415                object_store.delete(&manifest.manifest_path.into()).await?;
416                Ok(())
417            }
418        })
419        .await?;
420
421    stream::iter(snapshots.values())
422        .map(Ok::<_, Error>)
423        .try_for_each_concurrent(None, |snapshot| {
424            let object_store = object_store.clone();
425            async move {
426                object_store
427                    .delete(&snapshot.manifest_list().as_str().into())
428                    .await?;
429                Ok(())
430            }
431        })
432        .await?;
433
434    Ok(())
435}