iceberg_rust/table/
mod.rs

//! Table module provides the core functionality for working with Iceberg tables
//!
//! The main type in this module is [`Table`], which represents an Iceberg table and provides
//! methods for:
//! * Reading table data and metadata
//! * Modifying table structure (schema, partitioning, etc.)
//! * Managing table snapshots and branches
//! * Performing atomic transactions
//!
//! Tables can be created using [`Table::builder()`] and modified using transactions
//! created by [`Table::new_transaction()`].
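//!
//! # Example
//!
//! A sketch of inspecting a table that was loaded from a [`Catalog`]:
//! ```ignore
//! // Current schema of the main branch
//! let schema = table.current_schema(None)?;
//!
//! // Manifest list entries reachable from the current snapshot
//! let manifests = table.manifests(None, None).await?;
//! ```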

use std::{io::Cursor, sync::Arc};

use futures::future;
use itertools::Itertools;
use manifest::ManifestReader;
use manifest_list::read_snapshot;
use object_store::{path::Path, ObjectStore};

use futures::{stream, Stream, StreamExt, TryFutureExt, TryStreamExt};
use iceberg_rust_spec::util::{self};
use iceberg_rust_spec::{
    spec::{
        manifest::{Content, ManifestEntry},
        manifest_list::ManifestListEntry,
        schema::Schema,
        table_metadata::TableMetadata,
    },
    table_metadata::{
        WRITE_OBJECT_STORAGE_ENABLED, WRITE_PARQUET_COMPRESSION_CODEC,
        WRITE_PARQUET_COMPRESSION_LEVEL,
    },
};

use crate::{
    catalog::{create::CreateTableBuilder, identifier::Identifier, Catalog},
    error::Error,
    object_store::Bucket,
    table::transaction::TableTransaction,
};

pub mod manifest;
pub mod manifest_list;
pub mod transaction;

#[derive(Debug, Clone)]
/// Iceberg table
pub struct Table {
    identifier: Identifier,
    catalog: Arc<dyn Catalog>,
    metadata: TableMetadata,
}

/// Public interface of the table.
impl Table {
    /// Creates a new table builder with default configuration
    ///
    /// Returns a `CreateTableBuilder` initialized with default properties:
    /// * `WRITE_PARQUET_COMPRESSION_CODEC`: "zstd"
    /// * `WRITE_PARQUET_COMPRESSION_LEVEL`: "1"
    /// * `WRITE_OBJECT_STORAGE_ENABLED`: "false"
    ///
    /// # Returns
    /// * `CreateTableBuilder` - A builder for configuring and creating a new table
    ///
    /// # Example
    /// ```ignore
    /// use iceberg_rust::table::Table;
    ///
    /// // `schema` is an existing `Schema` describing the table columns
    /// let builder = Table::builder()
    ///     .with_name("my_table")
    ///     .with_schema(schema);
    /// ```
    pub fn builder() -> CreateTableBuilder {
        let mut builder = CreateTableBuilder::default();
        builder
            .with_property((
                WRITE_PARQUET_COMPRESSION_CODEC.to_owned(),
                "zstd".to_owned(),
            ))
            .with_property((WRITE_PARQUET_COMPRESSION_LEVEL.to_owned(), 1.to_string()))
            .with_property((WRITE_OBJECT_STORAGE_ENABLED.to_owned(), "false".to_owned()));
        builder
    }

    /// Creates a new table instance with the given identifier, catalog and metadata
    ///
    /// # Arguments
    /// * `identifier` - The unique identifier for this table in the catalog
    /// * `catalog` - The catalog that this table belongs to
    /// * `metadata` - The table's metadata containing schema, partitioning, etc.
    ///
    /// # Returns
    /// * `Result<Table, Error>` - The newly created table instance or an error
    ///
    /// This is typically called by catalog implementations rather than directly by users.
    /// For creating new tables, use [`Table::builder()`] instead.
    pub async fn new(
        identifier: Identifier,
        catalog: Arc<dyn Catalog>,
        metadata: TableMetadata,
    ) -> Result<Self, Error> {
        Ok(Table {
            identifier,
            catalog,
            metadata,
        })
    }
    #[inline]
    /// Returns the unique identifier for this table in the catalog
    ///
    /// The identifier contains both the namespace and name that uniquely identify
    /// this table within its catalog.
    ///
    /// # Returns
    /// * `&Identifier` - A reference to this table's identifier
    pub fn identifier(&self) -> &Identifier {
        &self.identifier
    }
    #[inline]
    /// Returns a reference to the catalog containing this table
    ///
    /// The returned catalog reference is wrapped in an Arc to allow shared ownership
    /// and thread-safe access to the catalog implementation.
    ///
    /// # Returns
    /// * `Arc<dyn Catalog>` - A thread-safe reference to the table's catalog
    pub fn catalog(&self) -> Arc<dyn Catalog> {
        self.catalog.clone()
    }
    #[inline]
    /// Returns the object store for this table's location
    ///
    /// The object store is determined by the table's location and is used for
    /// reading and writing table data files. The returned store is wrapped in
    /// an Arc to allow shared ownership and thread-safe access.
    ///
    /// # Returns
    /// * `Arc<dyn ObjectStore>` - A thread-safe reference to the table's object store
    ///
    /// # Panics
    /// Panics if the table's location cannot be parsed into a storage bucket
    pub fn object_store(&self) -> Arc<dyn ObjectStore> {
        self.catalog
            .object_store(Bucket::from_path(&self.metadata.location).unwrap())
    }
    #[inline]
    /// Returns the current schema for this table, optionally for a specific branch
    ///
    /// # Arguments
    /// * `branch` - Optional branch name to get the schema for. If None, returns the main branch schema
    ///
    /// # Returns
    /// * `Result<&Schema, Error>` - The current schema if found, or an error if the schema cannot be found
    ///
    /// # Errors
    /// Returns an error if the schema ID cannot be found in the table metadata
    pub fn current_schema(&self, branch: Option<&str>) -> Result<&Schema, Error> {
        self.metadata.current_schema(branch).map_err(Error::from)
    }
    #[inline]
    /// Returns a reference to this table's metadata
    ///
    /// The metadata contains all table information including:
    /// * Schema definitions
    /// * Partition specifications
    /// * Snapshots
    /// * Sort orders
    /// * Table properties
    ///
    /// # Returns
    /// * `&TableMetadata` - A reference to the table's metadata
    pub fn metadata(&self) -> &TableMetadata {
        &self.metadata
    }
    #[inline]
    /// Consumes the table and returns its metadata
    ///
    /// This method takes ownership of the table instance and returns just the
    /// underlying TableMetadata. This is useful when you no longer need the
    /// table instance but want to retain its metadata.
    ///
    /// # Returns
    /// * `TableMetadata` - The owned metadata from this table
    pub fn into_metadata(self) -> TableMetadata {
        self.metadata
    }
    /// Returns manifest list entries for snapshots within the given range
    ///
    /// # Arguments
    /// * `start` - Optional starting snapshot ID (exclusive). If None, includes from the beginning
    /// * `end` - Optional ending snapshot ID (inclusive). If None or not found, uses the current snapshot
    ///
    /// # Returns
    /// * `Result<Vec<ManifestListEntry>, Error>` - Vector of manifest list entries in the range,
    ///   or an empty vector if no current snapshot exists
    ///
    /// # Errors
    /// Returns an error if:
    /// * Resolving the current snapshot fails
    /// * Reading the manifest list fails
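    ///
    /// # Example
    ///
    /// A sketch of an incremental listing; `old_snapshot_id` stands in for a previously
    /// recorded snapshot ID:
    /// ```ignore
    /// // Manifests added after `old_snapshot_id`, up to and including the current snapshot
    /// let new_manifests = table.manifests(Some(old_snapshot_id), None).await?;
    /// ```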
    pub async fn manifests(
        &self,
        start: Option<i64>,
        end: Option<i64>,
    ) -> Result<Vec<ManifestListEntry>, Error> {
        let metadata = self.metadata();
        // Fall back to the current snapshot if `end` is missing or unknown; if there is
        // no current snapshot, there is nothing to list.
        let end_snapshot = match end.and_then(|id| metadata.snapshots.get(&id)) {
            Some(snapshot) => snapshot,
            None => {
                if let Some(current) = metadata.current_snapshot(None)? {
                    current
                } else {
                    return Ok(vec![]);
                }
            }
        };
        // A start snapshot with sequence number 0 imposes no lower bound.
        let start_sequence_number =
            start
                .and_then(|id| metadata.snapshots.get(&id))
                .and_then(|snapshot| {
                    let sequence_number = *snapshot.sequence_number();
                    if sequence_number == 0 {
                        None
                    } else {
                        Some(sequence_number)
                    }
                });
        let iter = read_snapshot(end_snapshot, metadata, self.object_store().clone()).await?;
        match start_sequence_number {
            Some(start) => iter
                .filter_ok(|manifest| manifest.sequence_number > start)
                .collect(),
            None => iter.collect(),
        }
    }
    /// Returns a stream of manifest entries for the given manifest list entries
    ///
    /// # Arguments
    /// * `manifests` - Manifest list entries whose manifest files should be read
    /// * `filter` - Optional vector of booleans, one per manifest list entry, selecting which
    ///   manifests to read
    /// * `sequence_number_range` - Tuple of (start, end) sequence numbers; entries are kept if
    ///   their sequence number is greater than `start` and less than or equal to `end`
    ///
    /// # Returns
    /// * `Result<impl Stream<Item = Result<ManifestEntry, Error>>, Error>` - Stream of manifest entries
    ///   that match the given filters
    ///
    /// # Lifetimes
    /// * `'a` - Lifetime of the manifest list entries reference
    ///
    /// # Errors
    /// Returns an error if reading any manifest file fails
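    ///
    /// # Example
    ///
    /// A sketch of streaming every entry reachable from the current snapshot
    /// (`futures::TryStreamExt` provides `try_next`, and the stream is pinned before use):
    /// ```ignore
    /// use futures::TryStreamExt;
    ///
    /// let manifests = table.manifests(None, None).await?;
    /// let entries = table.datafiles(&manifests, None, (None, None)).await?;
    /// futures::pin_mut!(entries);
    /// while let Some(entry) = entries.try_next().await? {
    ///     println!("{}", entry.data_file().file_path().as_str());
    /// }
    /// ```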
    #[inline]
    pub async fn datafiles<'a>(
        &self,
        manifests: &'a [ManifestListEntry],
        filter: Option<Vec<bool>>,
        sequence_number_range: (Option<i64>, Option<i64>),
    ) -> Result<impl Stream<Item = Result<ManifestEntry, Error>> + 'a, Error> {
        datafiles(
            self.object_store(),
            manifests,
            filter,
            sequence_number_range,
        )
        .await
    }
    /// Checks whether any manifest entry in the given snapshot range refers to a delete file
    ///
    /// # Arguments
    /// * `start` - Optional starting snapshot ID (exclusive). If None, includes from the beginning
    /// * `end` - Optional ending snapshot ID (inclusive). If None, uses the current snapshot
    ///
    /// # Returns
    /// * `Result<bool, Error>` - `true` if any entry's content is not plain data
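    ///
    /// A sketch, checking the table's full history on the main branch:
    /// ```ignore
    /// let has_deletes = table.datafiles_contains_delete(None, None).await?;
    /// ```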
    pub async fn datafiles_contains_delete(
        &self,
        start: Option<i64>,
        end: Option<i64>,
    ) -> Result<bool, Error> {
        let manifests = self.manifests(start, end).await?;
        let datafiles = self.datafiles(&manifests, None, (None, None)).await?;
        datafiles
            .try_any(|entry| async move { !matches!(entry.data_file().content(), Content::Data) })
            .await
    }
    /// Creates a new transaction for atomic modifications to this table
    ///
    /// # Arguments
    /// * `branch` - Optional branch name to create the transaction for. If None, uses the main branch
    ///
    /// # Returns
    /// * `TableTransaction` - A new transaction that can be used to atomically modify this table
    ///
    /// The transaction must be committed for any changes to take effect.
    /// Multiple operations can be chained within a single transaction.
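    ///
    /// # Example
    ///
    /// A sketch of the expected flow (the commit call is assumed to be async like the rest of
    /// this API; see [`TableTransaction`] for the available operations):
    /// ```ignore
    /// let mut transaction = table.new_transaction(None);
    /// // ...chain one or more operations on `transaction` here...
    /// transaction.commit().await?;
    /// ```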
    pub fn new_transaction(&mut self, branch: Option<&str>) -> TableTransaction {
        TableTransaction::new(self, branch)
    }
}

async fn datafiles(
    object_store: Arc<dyn ObjectStore>,
    manifests: &'_ [ManifestListEntry],
    filter: Option<Vec<bool>>,
    sequence_number_range: (Option<i64>, Option<i64>),
) -> Result<impl Stream<Item = Result<ManifestEntry, Error>> + '_, Error> {
    // Select manifest files according to the optional boolean filter vector.
    let iter: Box<dyn Iterator<Item = &ManifestListEntry> + Send + Sync> = match filter {
        Some(predicate) => {
            let iter = manifests
                .iter()
                .zip(predicate.into_iter())
                .filter(|(_, predicate)| *predicate)
                .map(|(manifest, _)| manifest);
            Box::new(iter)
        }
        None => Box::new(manifests.iter()),
    };

    // Stream over the selected manifest files, fetch their contents and flatten the result
    // into a single stream of manifest entries.
    Ok(stream::iter(iter)
        .then(move |file| {
            let object_store = object_store.clone();
            async move {
                let path: Path = util::strip_prefix(&file.manifest_path).into();
                let bytes = Cursor::new(Vec::from(
                    object_store
                        .get(&path)
                        .and_then(|file| file.bytes())
                        .await?,
                ));
                Ok::<_, Error>((bytes, file.sequence_number))
            }
        })
        .flat_map_unordered(None, move |result| {
            // NOTE: a failed fetch or an unreadable manifest panics here instead of being
            // surfaced as a stream error.
            let (bytes, sequence_number) = result.unwrap();

            let reader = ManifestReader::new(bytes).unwrap();
            stream::iter(reader).try_filter_map(move |mut x| {
                future::ready({
                    // Entries without an explicit sequence number inherit the one of their
                    // manifest file.
                    let sequence_number = if let Some(sequence_number) = x.sequence_number() {
                        *sequence_number
                    } else {
                        *x.sequence_number_mut() = Some(sequence_number);
                        sequence_number
                    };

                    // Keep entries whose sequence number lies in the (start, end] range.
                    let filter = match sequence_number_range {
                        (Some(start), Some(end)) => {
                            start < sequence_number && sequence_number <= end
                        }
                        (Some(start), None) => start < sequence_number,
                        (None, Some(end)) => sequence_number <= end,
                        _ => true,
                    };
                    if filter {
                        Ok(Some(x))
                    } else {
                        Ok(None)
                    }
                })
            })
        }))
}

/// Deletes all data files, manifest files and manifest lists of the table's snapshots;
/// does not remove the table from the catalog
pub(crate) async fn delete_all_table_files(
    metadata: &TableMetadata,
    object_store: Arc<dyn ObjectStore>,
) -> Result<(), Error> {
    let Some(snapshot) = metadata.current_snapshot(None)? else {
        return Ok(());
    };
    let manifests: Vec<ManifestListEntry> = read_snapshot(snapshot, metadata, object_store.clone())
        .await?
        .collect::<Result<_, _>>()?;

    let datafiles = datafiles(object_store.clone(), &manifests, None, (None, None)).await?;
    let snapshots = &metadata.snapshots;

    // Delete all data files referenced by the current snapshot.
    datafiles
        .try_for_each_concurrent(None, |datafile| {
            let object_store = object_store.clone();
            async move {
                object_store
                    .delete(&datafile.data_file().file_path().as_str().into())
                    .await?;
                Ok(())
            }
        })
        .await?;

    // Delete the manifest files of the current snapshot.
    stream::iter(manifests.into_iter())
        .map(Ok::<_, Error>)
        .try_for_each_concurrent(None, |manifest| {
            let object_store = object_store.clone();
            async move {
                object_store.delete(&manifest.manifest_path.into()).await?;
                Ok(())
            }
        })
        .await?;

    // Delete the manifest list of every snapshot.
    stream::iter(snapshots.values())
        .map(Ok::<_, Error>)
        .try_for_each_concurrent(None, |snapshot| {
            let object_store = object_store.clone();
            async move {
                object_store
                    .delete(&snapshot.manifest_list().as_str().into())
                    .await?;
                Ok(())
            }
        })
        .await?;

    Ok(())
}