// iceberg_rust/table/mod.rs
1//! Table module provides the core functionality for working with Iceberg tables
2//!
3//! The main type in this module is [`Table`], which represents an Iceberg table and provides
4//! methods for:
5//! * Reading table data and metadata
6//! * Modifying table structure (schema, partitioning, etc.)
7//! * Managing table snapshots and branches
8//! * Performing atomic transactions
9//!
10//! Tables can be created using [`Table::builder()`] and modified using transactions
11//! created by [`Table::new_transaction()`].
12
13use std::{io::Cursor, sync::Arc};
14
15use futures::future::try_join_all;
16use itertools::Itertools;
17use manifest::ManifestReader;
18use manifest_list::read_snapshot;
19use object_store::ObjectStoreExt;
20use object_store::{path::Path, ObjectStore};
21
22use futures::{stream, StreamExt, TryFutureExt, TryStreamExt};
23use iceberg_rust_spec::util::{self};
24use iceberg_rust_spec::{
25 spec::{
26 manifest::{Content, ManifestEntry},
27 manifest_list::ManifestListEntry,
28 schema::Schema,
29 table_metadata::TableMetadata,
30 },
31 table_metadata::{
32 WRITE_OBJECT_STORAGE_ENABLED, WRITE_PARQUET_COMPRESSION_CODEC,
33 WRITE_PARQUET_COMPRESSION_LEVEL,
34 },
35};
36
37use tracing::{instrument, Instrument};
38
39use crate::{
40 catalog::{create::CreateTableBuilder, identifier::Identifier, Catalog},
41 error::Error,
42 table::transaction::TableTransaction,
43};
44
45pub mod manifest;
46pub mod manifest_list;
47pub mod transaction;
48
#[derive(Debug, Clone)]
/// Iceberg table
pub struct Table {
    // Namespace + name uniquely identifying this table within its catalog
    identifier: Identifier,
    // Catalog that owns this table; used to commit transactions
    catalog: Arc<dyn Catalog>,
    // Object store backing the table's location; used for all file I/O
    object_store: Arc<dyn ObjectStore>,
    // Current table metadata (schemas, snapshots, partition specs, properties)
    metadata: TableMetadata,
}
57
58/// Public interface of the table.
59impl Table {
60 /// Creates a new table builder with default configuration
61 ///
62 /// Returns a `CreateTableBuilder` initialized with default properties:
63 /// * WRITE_PARQUET_COMPRESSION_CODEC: "zstd"
64 /// * WRITE_PARQUET_COMPRESSION_LEVEL: "3"
65 /// * WRITE_OBJECT_STORAGE_ENABLED: "false"
66 ///
67 /// # Returns
68 /// * `CreateTableBuilder` - A builder for configuring and creating a new table
69 ///
70 /// # Example
71 /// ```
72 /// use iceberg_rust::table::Table;
73 ///
74 /// let builder = Table::builder()
75 /// .with_name("my_table")
76 /// .with_schema(schema);
77 /// ```
78 pub fn builder() -> CreateTableBuilder {
79 let mut builder = CreateTableBuilder::default();
80 builder
81 .with_property((
82 WRITE_PARQUET_COMPRESSION_CODEC.to_owned(),
83 "zstd".to_owned(),
84 ))
85 .with_property((WRITE_PARQUET_COMPRESSION_LEVEL.to_owned(), 3.to_string()))
86 .with_property((WRITE_OBJECT_STORAGE_ENABLED.to_owned(), "false".to_owned()));
87 builder
88 }
89
90 /// Creates a new table instance with the given identifier, catalog and metadata
91 ///
92 /// # Arguments
93 /// * `identifier` - The unique identifier for this table in the catalog
94 /// * `catalog` - The catalog that this table belongs to
95 /// * `metadata` - The table's metadata containing schema, partitioning, etc.
96 ///
97 /// # Returns
98 /// * `Result<Table, Error>` - The newly created table instance or an error
99 ///
100 /// This is typically called by catalog implementations rather than directly by users.
101 /// For creating new tables, use [`Table::builder()`] instead.
102 pub async fn new(
103 identifier: Identifier,
104 catalog: Arc<dyn Catalog>,
105 object_store: Arc<dyn ObjectStore>,
106 metadata: TableMetadata,
107 ) -> Result<Self, Error> {
108 Ok(Table {
109 identifier,
110 catalog,
111 object_store,
112 metadata,
113 })
114 }
115 #[inline]
116 /// Returns the unique identifier for this table in the catalog
117 ///
118 /// The identifier contains both the namespace and name that uniquely identify
119 /// this table within its catalog.
120 ///
121 /// # Returns
122 /// * `&Identifier` - A reference to this table's identifier
123 pub fn identifier(&self) -> &Identifier {
124 &self.identifier
125 }
126 #[inline]
127 /// Returns a reference to the catalog containing this table
128 ///
129 /// The returned catalog reference is wrapped in an Arc to allow shared ownership
130 /// and thread-safe access to the catalog implementation.
131 ///
132 /// # Returns
133 /// * `Arc<dyn Catalog>` - A thread-safe reference to the table's catalog
134 pub fn catalog(&self) -> Arc<dyn Catalog> {
135 self.catalog.clone()
136 }
137 #[inline]
138 /// Returns the object store for this table's location
139 ///
140 /// The object store is determined by the table's location and is used for
141 /// reading and writing table data files. The returned store is wrapped in
142 /// an Arc to allow shared ownership and thread-safe access.
143 ///
144 /// # Returns
145 /// * `Arc<dyn ObjectStore>` - A thread-safe reference to the table's object store
146 pub fn object_store(&self) -> Arc<dyn ObjectStore> {
147 self.object_store.clone()
148 }
149 #[inline]
150 /// Returns the current schema for this table, optionally for a specific branch
151 ///
152 /// # Arguments
153 /// * `branch` - Optional branch name to get the schema for. If None, returns the main branch schema
154 ///
155 /// # Returns
156 /// * `Result<&Schema, Error>` - The current schema if found, or an error if the schema cannot be found
157 ///
158 /// # Errors
159 /// Returns an error if the schema ID cannot be found in the table metadata
160 pub fn current_schema(&self, branch: Option<&str>) -> Result<&Schema, Error> {
161 self.metadata.current_schema(branch).map_err(Error::from)
162 }
163 #[inline]
164 /// Returns a reference to this table's metadata
165 ///
166 /// The metadata contains all table information including:
167 /// * Schema definitions
168 /// * Partition specifications
169 /// * Snapshots
170 /// * Sort orders
171 /// * Table properties
172 ///
173 /// # Returns
174 /// * `&TableMetadata` - A reference to the table's metadata
175 pub fn metadata(&self) -> &TableMetadata {
176 &self.metadata
177 }
178 #[inline]
179 /// Consumes the table and returns its metadata
180 ///
181 /// This method takes ownership of the table instance and returns just the
182 /// underlying TableMetadata. This is useful when you no longer need the
183 /// table instance but want to retain its metadata.
184 ///
185 /// # Returns
186 /// * `TableMetadata` - The owned metadata from this table
187 pub fn into_metadata(self) -> TableMetadata {
188 self.metadata
189 }
190 /// Returns manifest list entries for snapshots within the given sequence range
191 ///
192 /// # Arguments
193 /// * `start` - Optional starting snapshot ID (exclusive). If None, includes from the beginning
194 /// * `end` - Optional ending snapshot ID (inclusive). If None, uses the current snapshot
195 ///
196 /// # Returns
197 /// * `Result<Vec<ManifestListEntry>, Error>` - Vector of manifest entries in the range,
198 /// or an empty vector if no current snapshot exists
199 ///
200 /// # Errors
201 /// Returns an error if:
202 /// * The end snapshot ID is invalid
203 /// * Reading the manifest list fails
204 #[instrument(name = "iceberg_rust::table::manifests", level = "debug", skip(self), fields(
205 table_identifier = %self.identifier,
206 start = ?start,
207 end = ?end
208 ))]
209 pub async fn manifests(
210 &self,
211 start: Option<i64>,
212 end: Option<i64>,
213 ) -> Result<Vec<ManifestListEntry>, Error> {
214 let metadata = self.metadata();
215 let end_snapshot = match end.and_then(|id| metadata.snapshots.get(&id)) {
216 Some(snapshot) => snapshot,
217 None => {
218 if let Some(current) = metadata.current_snapshot(None)? {
219 current
220 } else {
221 return Ok(vec![]);
222 }
223 }
224 };
225 let start_sequence_number =
226 start
227 .and_then(|id| metadata.snapshots.get(&id))
228 .and_then(|snapshot| {
229 let sequence_number = *snapshot.sequence_number();
230 if sequence_number == 0 {
231 None
232 } else {
233 Some(sequence_number)
234 }
235 });
236 let iter = read_snapshot(end_snapshot, metadata, self.object_store().clone()).await?;
237 match start_sequence_number {
238 Some(start) => iter
239 .filter_ok(|manifest| manifest.sequence_number > start)
240 .collect(),
241 None => iter.collect(),
242 }
243 }
244 /// Returns a stream of manifest entries for the given manifest list entries
245 ///
246 /// # Arguments
247 /// * `manifests` - List of manifest entries to read data files from
248 /// * `filter` - Optional vector of boolean predicates to filter manifest entries
249 /// * `sequence_number_range` - Tuple of (start, end) sequence numbers to filter entries by
250 ///
251 /// # Returns
252 /// * `Result<impl Stream<Item = Result<ManifestEntry, Error>>, Error>` - Stream of manifest entries
253 /// that match the given filters
254 ///
255 /// # Type Parameters
256 /// * `'a` - Lifetime of the manifest list entries reference
257 ///
258 /// # Errors
259 /// Returns an error if reading any manifest file fails
260 #[inline]
261 pub async fn datafiles<'a>(
262 &self,
263 manifests: &'a [ManifestListEntry],
264 filter: Option<Vec<bool>>,
265 sequence_number_range: (Option<i64>, Option<i64>),
266 ) -> Result<impl Iterator<Item = Result<(ManifestPath, ManifestEntry), Error>> + 'a, Error>
267 {
268 datafiles(
269 self.object_store(),
270 manifests,
271 filter,
272 sequence_number_range,
273 )
274 .await
275 }
276 /// Check if datafiles contain deletes
277 pub async fn datafiles_contains_delete(
278 &self,
279 start: Option<i64>,
280 end: Option<i64>,
281 ) -> Result<bool, Error> {
282 let manifests = self.manifests(start, end).await?;
283 let datafiles = self.datafiles(&manifests, None, (None, None)).await?;
284 stream::iter(datafiles)
285 .try_any(|entry| async move { !matches!(entry.1.data_file().content(), Content::Data) })
286 .await
287 }
288 /// Creates a new transaction for atomic modifications to this table
289 ///
290 /// # Arguments
291 /// * `branch` - Optional branch name to create the transaction for. If None, uses the main branch
292 ///
293 /// # Returns
294 /// * `TableTransaction` - A new transaction that can be used to atomically modify this table
295 ///
296 /// The transaction must be committed for any changes to take effect.
297 /// Multiple operations can be chained within a single transaction.
298 pub fn new_transaction(&mut self, branch: Option<&str>) -> TableTransaction<'_> {
299 TableTransaction::new(self, branch)
300 }
301}
302
/// Path of a Manifest file, as recorded in the manifest list
pub type ManifestPath = String;
305
306#[instrument(name = "iceberg_rust::table::datafiles", level = "debug", skip(object_store, manifests), fields(
307 manifest_count = manifests.len(),
308 filter_provided = filter.is_some(),
309 sequence_range = ?sequence_number_range
310))]
311async fn datafiles(
312 object_store: Arc<dyn ObjectStore>,
313 manifests: &'_ [ManifestListEntry],
314 filter: Option<Vec<bool>>,
315 sequence_number_range: (Option<i64>, Option<i64>),
316) -> Result<impl Iterator<Item = Result<(ManifestPath, ManifestEntry), Error>> + '_, Error> {
317 // filter manifest files according to filter vector
318 let iter: Box<dyn Iterator<Item = &ManifestListEntry> + Send + Sync> = match filter {
319 Some(predicate) => {
320 let iter = manifests
321 .iter()
322 .zip(predicate.into_iter())
323 .filter(|(_, predicate)| *predicate)
324 .map(|(manifest, _)| manifest);
325 Box::new(iter)
326 }
327 None => Box::new(manifests.iter()),
328 };
329
330 let futures: Vec<_> = iter
331 .map(move |file| {
332 let object_store = object_store.clone();
333 async move {
334 let manifest_path = &file.manifest_path;
335 let path: Path = util::strip_prefix(manifest_path).into();
336 let bytes = Cursor::new(Vec::from(
337 object_store
338 .get(&path)
339 .and_then(|file| file.bytes())
340 .instrument(tracing::trace_span!("iceberg_rust::get_manifest"))
341 .await?,
342 ));
343 Ok::<_, Error>((bytes, manifest_path, file.sequence_number))
344 }
345 })
346 .collect();
347
348 let results = try_join_all(futures).await?;
349
350 Ok(results.into_iter().flat_map(move |result| {
351 let (bytes, path, sequence_number) = result;
352
353 let reader = ManifestReader::new(bytes).unwrap();
354 reader.filter_map(move |x| {
355 let mut x = match x {
356 Ok(entry) => entry,
357 Err(_) => return None,
358 };
359
360 let sequence_number = if let Some(sequence_number) = x.sequence_number() {
361 *sequence_number
362 } else {
363 *x.sequence_number_mut() = Some(sequence_number);
364 sequence_number
365 };
366
367 let filter = match sequence_number_range {
368 (Some(start), Some(end)) => start < sequence_number && sequence_number <= end,
369 (Some(start), None) => start < sequence_number,
370 (None, Some(end)) => sequence_number <= end,
371 _ => true,
372 };
373 if filter {
374 Some(Ok((path.to_owned(), x)))
375 } else {
376 None
377 }
378 })
379 }))
380}
381
382/// delete all datafiles, manifests and metadata files, does not remove table from catalog
383pub(crate) async fn delete_all_table_files(
384 metadata: &TableMetadata,
385 object_store: Arc<dyn ObjectStore>,
386) -> Result<(), Error> {
387 let Some(snapshot) = metadata.current_snapshot(None)? else {
388 return Ok(());
389 };
390 let manifests: Vec<ManifestListEntry> = read_snapshot(snapshot, metadata, object_store.clone())
391 .await?
392 .collect::<Result<_, _>>()?;
393
394 let datafiles = datafiles(object_store.clone(), &manifests, None, (None, None)).await?;
395 let snapshots = &metadata.snapshots;
396
397 // stream::iter(datafiles.into_iter())
398 stream::iter(datafiles)
399 .try_for_each_concurrent(None, |datafile| {
400 let object_store = object_store.clone();
401 async move {
402 object_store
403 .delete(&datafile.1.data_file().file_path().as_str().into())
404 .await?;
405 Ok(())
406 }
407 })
408 .await?;
409
410 stream::iter(manifests.into_iter())
411 .map(Ok::<_, Error>)
412 .try_for_each_concurrent(None, |manifest| {
413 let object_store = object_store.clone();
414 async move {
415 object_store.delete(&manifest.manifest_path.into()).await?;
416 Ok(())
417 }
418 })
419 .await?;
420
421 stream::iter(snapshots.values())
422 .map(Ok::<_, Error>)
423 .try_for_each_concurrent(None, |snapshot| {
424 let object_store = object_store.clone();
425 async move {
426 object_store
427 .delete(&snapshot.manifest_list().as_str().into())
428 .await?;
429 Ok(())
430 }
431 })
432 .await?;
433
434 Ok(())
435}