iceberg_rust/table/mod.rs
1//! Table module provides the core functionality for working with Iceberg tables
2//!
3//! The main type in this module is [`Table`], which represents an Iceberg table and provides
4//! methods for:
5//! * Reading table data and metadata
6//! * Modifying table structure (schema, partitioning, etc.)
7//! * Managing table snapshots and branches
8//! * Performing atomic transactions
9//!
10//! Tables can be created using [`Table::builder()`] and modified using transactions
11//! created by [`Table::new_transaction()`].
12
13use std::{io::Cursor, sync::Arc};
14
15use futures::future::try_join_all;
16use itertools::Itertools;
17use manifest::ManifestReader;
18use manifest_list::read_snapshot;
19use object_store::{path::Path, ObjectStore};
20
21use futures::{stream, StreamExt, TryFutureExt, TryStreamExt};
22use iceberg_rust_spec::util::{self};
23use iceberg_rust_spec::{
24 spec::{
25 manifest::{Content, ManifestEntry},
26 manifest_list::ManifestListEntry,
27 schema::Schema,
28 table_metadata::TableMetadata,
29 },
30 table_metadata::{
31 WRITE_OBJECT_STORAGE_ENABLED, WRITE_PARQUET_COMPRESSION_CODEC,
32 WRITE_PARQUET_COMPRESSION_LEVEL,
33 },
34};
35
36use tracing::{instrument, Instrument};
37
38use crate::{
39 catalog::{create::CreateTableBuilder, identifier::Identifier, Catalog},
40 error::Error,
41 table::transaction::TableTransaction,
42};
43
44pub mod manifest;
45pub mod manifest_list;
46pub mod transaction;
47
#[derive(Debug, Clone)]
/// Iceberg table
pub struct Table {
    /// Unique identifier (namespace + name) of this table within its catalog
    identifier: Identifier,
    /// Catalog that owns this table; shared so clones of the table stay attached
    catalog: Arc<dyn Catalog>,
    /// Object store used to read and write the table's data and metadata files
    object_store: Arc<dyn ObjectStore>,
    /// Current table metadata (schemas, snapshots, partition specs, properties, ...)
    metadata: TableMetadata,
}
56
57/// Public interface of the table.
58impl Table {
59 /// Creates a new table builder with default configuration
60 ///
61 /// Returns a `CreateTableBuilder` initialized with default properties:
62 /// * WRITE_PARQUET_COMPRESSION_CODEC: "zstd"
63 /// * WRITE_PARQUET_COMPRESSION_LEVEL: "3"
64 /// * WRITE_OBJECT_STORAGE_ENABLED: "false"
65 ///
66 /// # Returns
67 /// * `CreateTableBuilder` - A builder for configuring and creating a new table
68 ///
69 /// # Example
70 /// ```
71 /// use iceberg_rust::table::Table;
72 ///
73 /// let builder = Table::builder()
74 /// .with_name("my_table")
75 /// .with_schema(schema);
76 /// ```
77 pub fn builder() -> CreateTableBuilder {
78 let mut builder = CreateTableBuilder::default();
79 builder
80 .with_property((
81 WRITE_PARQUET_COMPRESSION_CODEC.to_owned(),
82 "zstd".to_owned(),
83 ))
84 .with_property((WRITE_PARQUET_COMPRESSION_LEVEL.to_owned(), 3.to_string()))
85 .with_property((WRITE_OBJECT_STORAGE_ENABLED.to_owned(), "false".to_owned()));
86 builder
87 }
88
89 /// Creates a new table instance with the given identifier, catalog and metadata
90 ///
91 /// # Arguments
92 /// * `identifier` - The unique identifier for this table in the catalog
93 /// * `catalog` - The catalog that this table belongs to
94 /// * `metadata` - The table's metadata containing schema, partitioning, etc.
95 ///
96 /// # Returns
97 /// * `Result<Table, Error>` - The newly created table instance or an error
98 ///
99 /// This is typically called by catalog implementations rather than directly by users.
100 /// For creating new tables, use [`Table::builder()`] instead.
101 pub async fn new(
102 identifier: Identifier,
103 catalog: Arc<dyn Catalog>,
104 object_store: Arc<dyn ObjectStore>,
105 metadata: TableMetadata,
106 ) -> Result<Self, Error> {
107 Ok(Table {
108 identifier,
109 catalog,
110 object_store,
111 metadata,
112 })
113 }
114 #[inline]
115 /// Returns the unique identifier for this table in the catalog
116 ///
117 /// The identifier contains both the namespace and name that uniquely identify
118 /// this table within its catalog.
119 ///
120 /// # Returns
121 /// * `&Identifier` - A reference to this table's identifier
122 pub fn identifier(&self) -> &Identifier {
123 &self.identifier
124 }
125 #[inline]
126 /// Returns a reference to the catalog containing this table
127 ///
128 /// The returned catalog reference is wrapped in an Arc to allow shared ownership
129 /// and thread-safe access to the catalog implementation.
130 ///
131 /// # Returns
132 /// * `Arc<dyn Catalog>` - A thread-safe reference to the table's catalog
133 pub fn catalog(&self) -> Arc<dyn Catalog> {
134 self.catalog.clone()
135 }
136 #[inline]
137 /// Returns the object store for this table's location
138 ///
139 /// The object store is determined by the table's location and is used for
140 /// reading and writing table data files. The returned store is wrapped in
141 /// an Arc to allow shared ownership and thread-safe access.
142 ///
143 /// # Returns
144 /// * `Arc<dyn ObjectStore>` - A thread-safe reference to the table's object store
145 pub fn object_store(&self) -> Arc<dyn ObjectStore> {
146 self.object_store.clone()
147 }
148 #[inline]
149 /// Returns the current schema for this table, optionally for a specific branch
150 ///
151 /// # Arguments
152 /// * `branch` - Optional branch name to get the schema for. If None, returns the main branch schema
153 ///
154 /// # Returns
155 /// * `Result<&Schema, Error>` - The current schema if found, or an error if the schema cannot be found
156 ///
157 /// # Errors
158 /// Returns an error if the schema ID cannot be found in the table metadata
159 pub fn current_schema(&self, branch: Option<&str>) -> Result<&Schema, Error> {
160 self.metadata.current_schema(branch).map_err(Error::from)
161 }
162 #[inline]
163 /// Returns a reference to this table's metadata
164 ///
165 /// The metadata contains all table information including:
166 /// * Schema definitions
167 /// * Partition specifications
168 /// * Snapshots
169 /// * Sort orders
170 /// * Table properties
171 ///
172 /// # Returns
173 /// * `&TableMetadata` - A reference to the table's metadata
174 pub fn metadata(&self) -> &TableMetadata {
175 &self.metadata
176 }
177 #[inline]
178 /// Consumes the table and returns its metadata
179 ///
180 /// This method takes ownership of the table instance and returns just the
181 /// underlying TableMetadata. This is useful when you no longer need the
182 /// table instance but want to retain its metadata.
183 ///
184 /// # Returns
185 /// * `TableMetadata` - The owned metadata from this table
186 pub fn into_metadata(self) -> TableMetadata {
187 self.metadata
188 }
189 /// Returns manifest list entries for snapshots within the given sequence range
190 ///
191 /// # Arguments
192 /// * `start` - Optional starting snapshot ID (exclusive). If None, includes from the beginning
193 /// * `end` - Optional ending snapshot ID (inclusive). If None, uses the current snapshot
194 ///
195 /// # Returns
196 /// * `Result<Vec<ManifestListEntry>, Error>` - Vector of manifest entries in the range,
197 /// or an empty vector if no current snapshot exists
198 ///
199 /// # Errors
200 /// Returns an error if:
201 /// * The end snapshot ID is invalid
202 /// * Reading the manifest list fails
203 #[instrument(name = "iceberg_rust::table::manifests", level = "debug", skip(self), fields(
204 table_identifier = %self.identifier,
205 start = ?start,
206 end = ?end
207 ))]
208 pub async fn manifests(
209 &self,
210 start: Option<i64>,
211 end: Option<i64>,
212 ) -> Result<Vec<ManifestListEntry>, Error> {
213 let metadata = self.metadata();
214 let end_snapshot = match end.and_then(|id| metadata.snapshots.get(&id)) {
215 Some(snapshot) => snapshot,
216 None => {
217 if let Some(current) = metadata.current_snapshot(None)? {
218 current
219 } else {
220 return Ok(vec![]);
221 }
222 }
223 };
224 let start_sequence_number =
225 start
226 .and_then(|id| metadata.snapshots.get(&id))
227 .and_then(|snapshot| {
228 let sequence_number = *snapshot.sequence_number();
229 if sequence_number == 0 {
230 None
231 } else {
232 Some(sequence_number)
233 }
234 });
235 let iter = read_snapshot(end_snapshot, metadata, self.object_store().clone()).await?;
236 match start_sequence_number {
237 Some(start) => iter
238 .filter_ok(|manifest| manifest.sequence_number > start)
239 .collect(),
240 None => iter.collect(),
241 }
242 }
243 /// Returns a stream of manifest entries for the given manifest list entries
244 ///
245 /// # Arguments
246 /// * `manifests` - List of manifest entries to read data files from
247 /// * `filter` - Optional vector of boolean predicates to filter manifest entries
248 /// * `sequence_number_range` - Tuple of (start, end) sequence numbers to filter entries by
249 ///
250 /// # Returns
251 /// * `Result<impl Stream<Item = Result<ManifestEntry, Error>>, Error>` - Stream of manifest entries
252 /// that match the given filters
253 ///
254 /// # Type Parameters
255 /// * `'a` - Lifetime of the manifest list entries reference
256 ///
257 /// # Errors
258 /// Returns an error if reading any manifest file fails
259 #[inline]
260 pub async fn datafiles<'a>(
261 &self,
262 manifests: &'a [ManifestListEntry],
263 filter: Option<Vec<bool>>,
264 sequence_number_range: (Option<i64>, Option<i64>),
265 ) -> Result<impl Iterator<Item = Result<(ManifestPath, ManifestEntry), Error>> + 'a, Error>
266 {
267 datafiles(
268 self.object_store(),
269 manifests,
270 filter,
271 sequence_number_range,
272 )
273 .await
274 }
275 /// Check if datafiles contain deletes
276 pub async fn datafiles_contains_delete(
277 &self,
278 start: Option<i64>,
279 end: Option<i64>,
280 ) -> Result<bool, Error> {
281 let manifests = self.manifests(start, end).await?;
282 let datafiles = self.datafiles(&manifests, None, (None, None)).await?;
283 stream::iter(datafiles)
284 .try_any(|entry| async move { !matches!(entry.1.data_file().content(), Content::Data) })
285 .await
286 }
287 /// Creates a new transaction for atomic modifications to this table
288 ///
289 /// # Arguments
290 /// * `branch` - Optional branch name to create the transaction for. If None, uses the main branch
291 ///
292 /// # Returns
293 /// * `TableTransaction` - A new transaction that can be used to atomically modify this table
294 ///
295 /// The transaction must be committed for any changes to take effect.
296 /// Multiple operations can be chained within a single transaction.
297 pub fn new_transaction(&mut self, branch: Option<&str>) -> TableTransaction<'_> {
298 TableTransaction::new(self, branch)
299 }
300}
301
/// Path of a manifest file within the table's object store
pub type ManifestPath = String;
304
305#[instrument(name = "iceberg_rust::table::datafiles", level = "debug", skip(object_store, manifests), fields(
306 manifest_count = manifests.len(),
307 filter_provided = filter.is_some(),
308 sequence_range = ?sequence_number_range
309))]
310async fn datafiles(
311 object_store: Arc<dyn ObjectStore>,
312 manifests: &'_ [ManifestListEntry],
313 filter: Option<Vec<bool>>,
314 sequence_number_range: (Option<i64>, Option<i64>),
315) -> Result<impl Iterator<Item = Result<(ManifestPath, ManifestEntry), Error>> + '_, Error> {
316 // filter manifest files according to filter vector
317 let iter: Box<dyn Iterator<Item = &ManifestListEntry> + Send + Sync> = match filter {
318 Some(predicate) => {
319 let iter = manifests
320 .iter()
321 .zip(predicate.into_iter())
322 .filter(|(_, predicate)| *predicate)
323 .map(|(manifest, _)| manifest);
324 Box::new(iter)
325 }
326 None => Box::new(manifests.iter()),
327 };
328
329 let futures: Vec<_> = iter
330 .map(move |file| {
331 let object_store = object_store.clone();
332 async move {
333 let manifest_path = &file.manifest_path;
334 let path: Path = util::strip_prefix(manifest_path).into();
335 let bytes = Cursor::new(Vec::from(
336 object_store
337 .get(&path)
338 .and_then(|file| file.bytes())
339 .instrument(tracing::trace_span!("iceberg_rust::get_manifest"))
340 .await?,
341 ));
342 Ok::<_, Error>((bytes, manifest_path, file.sequence_number))
343 }
344 })
345 .collect();
346
347 let results = try_join_all(futures).await?;
348
349 Ok(results.into_iter().flat_map(move |result| {
350 let (bytes, path, sequence_number) = result;
351
352 let reader = ManifestReader::new(bytes).unwrap();
353 reader.filter_map(move |x| {
354 let mut x = match x {
355 Ok(entry) => entry,
356 Err(_) => return None,
357 };
358
359 let sequence_number = if let Some(sequence_number) = x.sequence_number() {
360 *sequence_number
361 } else {
362 *x.sequence_number_mut() = Some(sequence_number);
363 sequence_number
364 };
365
366 let filter = match sequence_number_range {
367 (Some(start), Some(end)) => start < sequence_number && sequence_number <= end,
368 (Some(start), None) => start < sequence_number,
369 (None, Some(end)) => sequence_number <= end,
370 _ => true,
371 };
372 if filter {
373 Some(Ok((path.to_owned(), x)))
374 } else {
375 None
376 }
377 })
378 }))
379}
380
381/// delete all datafiles, manifests and metadata files, does not remove table from catalog
382pub(crate) async fn delete_all_table_files(
383 metadata: &TableMetadata,
384 object_store: Arc<dyn ObjectStore>,
385) -> Result<(), Error> {
386 let Some(snapshot) = metadata.current_snapshot(None)? else {
387 return Ok(());
388 };
389 let manifests: Vec<ManifestListEntry> = read_snapshot(snapshot, metadata, object_store.clone())
390 .await?
391 .collect::<Result<_, _>>()?;
392
393 let datafiles = datafiles(object_store.clone(), &manifests, None, (None, None)).await?;
394 let snapshots = &metadata.snapshots;
395
396 // stream::iter(datafiles.into_iter())
397 stream::iter(datafiles)
398 .try_for_each_concurrent(None, |datafile| {
399 let object_store = object_store.clone();
400 async move {
401 object_store
402 .delete(&datafile.1.data_file().file_path().as_str().into())
403 .await?;
404 Ok(())
405 }
406 })
407 .await?;
408
409 stream::iter(manifests.into_iter())
410 .map(Ok::<_, Error>)
411 .try_for_each_concurrent(None, |manifest| {
412 let object_store = object_store.clone();
413 async move {
414 object_store.delete(&manifest.manifest_path.into()).await?;
415 Ok(())
416 }
417 })
418 .await?;
419
420 stream::iter(snapshots.values())
421 .map(Ok::<_, Error>)
422 .try_for_each_concurrent(None, |snapshot| {
423 let object_store = object_store.clone();
424 async move {
425 object_store
426 .delete(&snapshot.manifest_list().as_str().into())
427 .await?;
428 Ok(())
429 }
430 })
431 .await?;
432
433 Ok(())
434}