iceberg_rust/table/mod.rs

//! Table module provides the core functionality for working with Iceberg tables
//!
//! The main type in this module is [`Table`], which represents an Iceberg table and provides
//! methods for:
//! * Reading table data and metadata
//! * Modifying table structure (schema, partitioning, etc.)
//! * Managing table snapshots and branches
//! * Performing atomic transactions
//!
//! Tables can be created using [`Table::builder()`] and modified using transactions
//! created by [`Table::new_transaction()`].
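//!
//! A minimal sketch of a typical workflow (the way the table is obtained and the
//! operations queued on the transaction are illustrative and depend on the catalog
//! implementation in use):
//!
//! ```ignore
//! // Load an existing table from a catalog implementation, start a transaction on the
//! // main branch, queue one or more operations and commit them atomically.
//! let mut table: Table = /* obtained from a catalog implementation */;
//! table
//!     .new_transaction(None)
//!     // ... queue schema, property or append operations here ...
//!     .commit()
//!     .await?;
//! ```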

use std::{io::Cursor, sync::Arc};

use futures::{future::try_join_all, stream, StreamExt, TryFutureExt, TryStreamExt};
use itertools::Itertools;
use manifest::ManifestReader;
use manifest_list::read_snapshot;
use object_store::{path::Path, ObjectStore};

use iceberg_rust_spec::util;
use iceberg_rust_spec::{
    spec::{
        manifest::{Content, ManifestEntry},
        manifest_list::ManifestListEntry,
        schema::Schema,
        table_metadata::TableMetadata,
    },
    table_metadata::{
        WRITE_OBJECT_STORAGE_ENABLED, WRITE_PARQUET_COMPRESSION_CODEC,
        WRITE_PARQUET_COMPRESSION_LEVEL,
    },
};

use tracing::{instrument, Instrument};

use crate::{
    catalog::{create::CreateTableBuilder, identifier::Identifier, Catalog},
    error::Error,
    table::transaction::TableTransaction,
};

pub mod manifest;
pub mod manifest_list;
pub mod transaction;

#[derive(Debug, Clone)]
/// Iceberg table
pub struct Table {
    identifier: Identifier,
    catalog: Arc<dyn Catalog>,
    object_store: Arc<dyn ObjectStore>,
    metadata: TableMetadata,
}

/// Public interface of the table.
impl Table {
    /// Creates a new table builder with default configuration
    ///
    /// Returns a `CreateTableBuilder` initialized with default properties:
    /// * `WRITE_PARQUET_COMPRESSION_CODEC`: "zstd"
    /// * `WRITE_PARQUET_COMPRESSION_LEVEL`: "3"
    /// * `WRITE_OBJECT_STORAGE_ENABLED`: "false"
    ///
    /// # Returns
    /// * `CreateTableBuilder` - A builder for configuring and creating a new table
    ///
    /// # Example
    /// ```ignore
    /// use iceberg_rust::table::Table;
    ///
    /// // `schema` is assumed to be a previously constructed `Schema`
    /// let builder = Table::builder()
    ///     .with_name("my_table")
    ///     .with_schema(schema);
    /// ```
    pub fn builder() -> CreateTableBuilder {
        let mut builder = CreateTableBuilder::default();
        builder
            .with_property((
                WRITE_PARQUET_COMPRESSION_CODEC.to_owned(),
                "zstd".to_owned(),
            ))
            .with_property((WRITE_PARQUET_COMPRESSION_LEVEL.to_owned(), 3.to_string()))
            .with_property((WRITE_OBJECT_STORAGE_ENABLED.to_owned(), "false".to_owned()));
        builder
    }

    /// Creates a new table instance with the given identifier, catalog, object store and metadata
    ///
    /// # Arguments
    /// * `identifier` - The unique identifier for this table in the catalog
    /// * `catalog` - The catalog that this table belongs to
    /// * `object_store` - The object store used to read and write the table's files
    /// * `metadata` - The table's metadata containing schema, partitioning, etc.
    ///
    /// # Returns
    /// * `Result<Table, Error>` - The newly created table instance or an error
    ///
    /// This is typically called by catalog implementations rather than directly by users.
    /// For creating new tables, use [`Table::builder()`] instead.
    pub async fn new(
        identifier: Identifier,
        catalog: Arc<dyn Catalog>,
        object_store: Arc<dyn ObjectStore>,
        metadata: TableMetadata,
    ) -> Result<Self, Error> {
        Ok(Table {
            identifier,
            catalog,
            object_store,
            metadata,
        })
    }
    #[inline]
    /// Returns the unique identifier for this table in the catalog
    ///
    /// The identifier contains both the namespace and name that uniquely identify
    /// this table within its catalog.
    ///
    /// # Returns
    /// * `&Identifier` - A reference to this table's identifier
    pub fn identifier(&self) -> &Identifier {
        &self.identifier
    }
    #[inline]
    /// Returns a reference to the catalog containing this table
    ///
    /// The returned catalog reference is wrapped in an Arc to allow shared ownership
    /// and thread-safe access to the catalog implementation.
    ///
    /// # Returns
    /// * `Arc<dyn Catalog>` - A thread-safe reference to the table's catalog
    pub fn catalog(&self) -> Arc<dyn Catalog> {
        self.catalog.clone()
    }
    #[inline]
    /// Returns the object store for this table's location
    ///
    /// The object store is determined by the table's location and is used for
    /// reading and writing table data files. The returned store is wrapped in
    /// an Arc to allow shared ownership and thread-safe access.
    ///
    /// # Returns
    /// * `Arc<dyn ObjectStore>` - A thread-safe reference to the table's object store
    pub fn object_store(&self) -> Arc<dyn ObjectStore> {
        self.object_store.clone()
    }
    #[inline]
    /// Returns the current schema for this table, optionally for a specific branch
    ///
    /// # Arguments
    /// * `branch` - Optional branch name to get the schema for. If None, returns the main branch schema
    ///
    /// # Returns
    /// * `Result<&Schema, Error>` - The current schema if found, or an error if the schema cannot be found
    ///
    /// # Errors
    /// Returns an error if the schema ID cannot be found in the table metadata
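    ///
    /// # Example
    /// A minimal sketch, assuming an existing `table` (the branch name is illustrative):
    /// ```ignore
    /// // Schema of the main branch
    /// let schema = table.current_schema(None)?;
    /// // Schema of a named branch
    /// let dev_schema = table.current_schema(Some("dev"))?;
    /// ```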
    pub fn current_schema(&self, branch: Option<&str>) -> Result<&Schema, Error> {
        self.metadata.current_schema(branch).map_err(Error::from)
    }
    #[inline]
    /// Returns a reference to this table's metadata
    ///
    /// The metadata contains all table information including:
    /// * Schema definitions
    /// * Partition specifications
    /// * Snapshots
    /// * Sort orders
    /// * Table properties
    ///
    /// # Returns
    /// * `&TableMetadata` - A reference to the table's metadata
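    ///
    /// # Example
    /// A minimal sketch, assuming an existing `table`:
    /// ```ignore
    /// let metadata = table.metadata();
    /// if let Some(snapshot) = metadata.current_snapshot(None)? {
    ///     println!("current manifest list: {}", snapshot.manifest_list());
    /// }
    /// ```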
    pub fn metadata(&self) -> &TableMetadata {
        &self.metadata
    }
    #[inline]
    /// Consumes the table and returns its metadata
    ///
    /// This method takes ownership of the table instance and returns just the
    /// underlying TableMetadata. This is useful when you no longer need the
    /// table instance but want to retain its metadata.
    ///
    /// # Returns
    /// * `TableMetadata` - The owned metadata from this table
    pub fn into_metadata(self) -> TableMetadata {
        self.metadata
    }
    /// Returns manifest list entries for snapshots within the given range
    ///
    /// # Arguments
    /// * `start` - Optional starting snapshot ID (exclusive). If None, includes from the beginning
    /// * `end` - Optional ending snapshot ID (inclusive). If None, uses the current snapshot
    ///
    /// # Returns
    /// * `Result<Vec<ManifestListEntry>, Error>` - Vector of manifest entries in the range,
    ///   or an empty vector if no current snapshot exists
    ///
    /// # Errors
    /// Returns an error if:
    /// * The end snapshot ID is invalid
    /// * Reading the manifest list fails
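    ///
    /// # Example
    /// A minimal sketch, assuming an existing `table`:
    /// ```ignore
    /// // List every manifest reachable from the current snapshot
    /// let manifests = table.manifests(None, None).await?;
    /// for manifest in &manifests {
    ///     println!("{}", manifest.manifest_path);
    /// }
    /// ```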
    #[instrument(name = "iceberg_rust::table::manifests", level = "debug", skip(self), fields(
        table_identifier = %self.identifier,
        start = ?start,
        end = ?end
    ))]
    pub async fn manifests(
        &self,
        start: Option<i64>,
        end: Option<i64>,
    ) -> Result<Vec<ManifestListEntry>, Error> {
        let metadata = self.metadata();
        // Resolve the end snapshot; fall back to the current snapshot if the id is missing.
        let end_snapshot = match end.and_then(|id| metadata.snapshots.get(&id)) {
            Some(snapshot) => snapshot,
            None => {
                if let Some(current) = metadata.current_snapshot(None)? {
                    current
                } else {
                    return Ok(vec![]);
                }
            }
        };
        // A start sequence number of 0 means there is no lower bound.
        let start_sequence_number = start
            .and_then(|id| metadata.snapshots.get(&id))
            .and_then(|snapshot| {
                let sequence_number = *snapshot.sequence_number();
                if sequence_number == 0 {
                    None
                } else {
                    Some(sequence_number)
                }
            });
        let iter = read_snapshot(end_snapshot, metadata, self.object_store().clone()).await?;
        match start_sequence_number {
            Some(start) => iter
                .filter_ok(|manifest| manifest.sequence_number > start)
                .collect(),
            None => iter.collect(),
        }
    }
    /// Returns an iterator of manifest entries for the given manifest list entries
    ///
    /// # Arguments
    /// * `manifests` - List of manifest list entries to read data files from
    /// * `filter` - Optional vector of boolean predicates to filter manifest entries
    /// * `sequence_number_range` - Tuple of (start, end) sequence numbers to filter entries by
    ///
    /// # Returns
    /// * `Result<impl Iterator<Item = Result<(ManifestPath, ManifestEntry), Error>>, Error>` -
    ///   Iterator over the manifest entries, paired with their manifest path, that match the given filters
    ///
    /// # Type Parameters
    /// * `'a` - Lifetime of the manifest list entries reference
    ///
    /// # Errors
    /// Returns an error if reading any manifest file fails
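    ///
    /// # Example
    /// A minimal sketch, assuming an existing `table`:
    /// ```ignore
    /// let manifests = table.manifests(None, None).await?;
    /// for entry in table.datafiles(&manifests, None, (None, None)).await? {
    ///     let (manifest_path, manifest_entry) = entry?;
    ///     println!("{} -> {}", manifest_path, manifest_entry.data_file().file_path());
    /// }
    /// ```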
    #[inline]
    pub async fn datafiles<'a>(
        &self,
        manifests: &'a [ManifestListEntry],
        filter: Option<Vec<bool>>,
        sequence_number_range: (Option<i64>, Option<i64>),
    ) -> Result<impl Iterator<Item = Result<(ManifestPath, ManifestEntry), Error>> + 'a, Error>
    {
        datafiles(
            self.object_store(),
            manifests,
            filter,
            sequence_number_range,
        )
        .await
    }
    /// Checks whether the data files in the given snapshot range contain any delete files
    ///
    /// # Arguments
    /// * `start` - Optional starting snapshot ID (exclusive). If None, includes from the beginning
    /// * `end` - Optional ending snapshot ID (inclusive). If None, uses the current snapshot
    ///
    /// # Returns
    /// * `Result<bool, Error>` - `true` if any manifest entry in the range points to a delete file
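    ///
    /// # Example
    /// A minimal sketch, assuming an existing `table`:
    /// ```ignore
    /// if table.datafiles_contains_delete(None, None).await? {
    ///     // The scan has to account for position or equality deletes.
    /// }
    /// ```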
    pub async fn datafiles_contains_delete(
        &self,
        start: Option<i64>,
        end: Option<i64>,
    ) -> Result<bool, Error> {
        let manifests = self.manifests(start, end).await?;
        let datafiles = self.datafiles(&manifests, None, (None, None)).await?;
        stream::iter(datafiles)
            .try_any(|entry| async move { !matches!(entry.1.data_file().content(), Content::Data) })
            .await
    }
    /// Creates a new transaction for atomic modifications to this table
    ///
    /// # Arguments
    /// * `branch` - Optional branch name to create the transaction for. If None, uses the main branch
    ///
    /// # Returns
    /// * `TableTransaction` - A new transaction that can be used to atomically modify this table
    ///
    /// The transaction must be committed for any changes to take effect.
    /// Multiple operations can be chained within a single transaction.
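    ///
    /// # Example
    /// A minimal sketch, assuming an existing mutable `table`; the queued operations are
    /// illustrative and depend on the desired change:
    /// ```ignore
    /// table
    ///     .new_transaction(None)
    ///     // ... queue schema, property or append operations here ...
    ///     .commit()
    ///     .await?;
    /// ```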
    pub fn new_transaction(&mut self, branch: Option<&str>) -> TableTransaction<'_> {
        TableTransaction::new(self, branch)
    }
}

/// Path of a manifest file
pub type ManifestPath = String;

#[instrument(name = "iceberg_rust::table::datafiles", level = "debug", skip(object_store, manifests), fields(
    manifest_count = manifests.len(),
    filter_provided = filter.is_some(),
    sequence_range = ?sequence_number_range
))]
async fn datafiles(
    object_store: Arc<dyn ObjectStore>,
    manifests: &'_ [ManifestListEntry],
    filter: Option<Vec<bool>>,
    sequence_number_range: (Option<i64>, Option<i64>),
) -> Result<impl Iterator<Item = Result<(ManifestPath, ManifestEntry), Error>> + '_, Error> {
    // Filter the manifest files according to the filter vector.
    let iter: Box<dyn Iterator<Item = &ManifestListEntry> + Send + Sync> = match filter {
        Some(predicate) => {
            let iter = manifests
                .iter()
                .zip(predicate.into_iter())
                .filter(|(_, predicate)| *predicate)
                .map(|(manifest, _)| manifest);
            Box::new(iter)
        }
        None => Box::new(manifests.iter()),
    };

    // Fetch all selected manifest files from object storage concurrently.
    let futures: Vec<_> = iter
        .map(move |file| {
            let object_store = object_store.clone();
            async move {
                let manifest_path = &file.manifest_path;
                let path: Path = util::strip_prefix(manifest_path).into();
                let bytes = Cursor::new(Vec::from(
                    object_store
                        .get(&path)
                        .and_then(|file| file.bytes())
                        .instrument(tracing::trace_span!("iceberg_rust::get_manifest"))
                        .await?,
                ));
                Ok::<_, Error>((bytes, manifest_path, file.sequence_number))
            }
        })
        .collect();

    let results = try_join_all(futures).await?;

    Ok(results.into_iter().flat_map(move |result| {
        let (bytes, path, sequence_number) = result;

        let reader = ManifestReader::new(bytes).unwrap();
        reader.filter_map(move |x| {
            // Entries that fail to deserialize are skipped.
            let mut x = match x {
                Ok(entry) => entry,
                Err(_) => return None,
            };

            // Entries without an explicit sequence number inherit the one of their manifest.
            let sequence_number = if let Some(sequence_number) = x.sequence_number() {
                *sequence_number
            } else {
                *x.sequence_number_mut() = Some(sequence_number);
                sequence_number
            };

            // Keep only entries whose sequence number falls into the requested range.
            let filter = match sequence_number_range {
                (Some(start), Some(end)) => start < sequence_number && sequence_number <= end,
                (Some(start), None) => start < sequence_number,
                (None, Some(end)) => sequence_number <= end,
                _ => true,
            };
            if filter {
                Some(Ok((path.to_owned(), x)))
            } else {
                None
            }
        })
    }))
}

/// Deletes all data files, manifest files and manifest lists of the table; does not remove the table from the catalog
pub(crate) async fn delete_all_table_files(
    metadata: &TableMetadata,
    object_store: Arc<dyn ObjectStore>,
) -> Result<(), Error> {
    let Some(snapshot) = metadata.current_snapshot(None)? else {
        return Ok(());
    };
    let manifests: Vec<ManifestListEntry> = read_snapshot(snapshot, metadata, object_store.clone())
        .await?
        .collect::<Result<_, _>>()?;

    let datafiles = datafiles(object_store.clone(), &manifests, None, (None, None)).await?;
    let snapshots = &metadata.snapshots;

    // Delete the data files referenced by the manifests.
    stream::iter(datafiles)
        .try_for_each_concurrent(None, |datafile| {
            let object_store = object_store.clone();
            async move {
                object_store
                    .delete(&datafile.1.data_file().file_path().as_str().into())
                    .await?;
                Ok(())
            }
        })
        .await?;

    // Delete the manifest files.
    stream::iter(manifests.into_iter())
        .map(Ok::<_, Error>)
        .try_for_each_concurrent(None, |manifest| {
            let object_store = object_store.clone();
            async move {
                object_store.delete(&manifest.manifest_path.into()).await?;
                Ok(())
            }
        })
        .await?;

    // Delete the manifest lists of all snapshots.
    stream::iter(snapshots.values())
        .map(Ok::<_, Error>)
        .try_for_each_concurrent(None, |snapshot| {
            let object_store = object_store.clone();
            async move {
                object_store
                    .delete(&snapshot.manifest_list().as_str().into())
                    .await?;
                Ok(())
            }
        })
        .await?;

    Ok(())
}