iceberg_rust/table/mod.rs
1//! Table module provides the core functionality for working with Iceberg tables
2//!
3//! The main type in this module is [`Table`], which represents an Iceberg table and provides
4//! methods for:
5//! * Reading table data and metadata
6//! * Modifying table structure (schema, partitioning, etc.)
7//! * Managing table snapshots and branches
8//! * Performing atomic transactions
9//!
10//! Tables can be created using [`Table::builder()`] and modified using transactions
11//! created by [`Table::new_transaction()`].
12
13use std::{io::Cursor, sync::Arc};
14
15use futures::future;
16use itertools::Itertools;
17use manifest::ManifestReader;
18use manifest_list::read_snapshot;
19use object_store::{path::Path, ObjectStore};
20
21use futures::{stream, Stream, StreamExt, TryFutureExt, TryStreamExt};
22use iceberg_rust_spec::util::{self};
23use iceberg_rust_spec::{
24 spec::{
25 manifest::{Content, ManifestEntry},
26 manifest_list::ManifestListEntry,
27 schema::Schema,
28 table_metadata::TableMetadata,
29 },
30 table_metadata::{
31 WRITE_OBJECT_STORAGE_ENABLED, WRITE_PARQUET_COMPRESSION_CODEC,
32 WRITE_PARQUET_COMPRESSION_LEVEL,
33 },
34};
35
36use crate::{
37 catalog::{create::CreateTableBuilder, identifier::Identifier, Catalog},
38 error::Error,
39 object_store::Bucket,
40 table::transaction::TableTransaction,
41};
42
43pub mod manifest;
44pub mod manifest_list;
45pub mod transaction;
46
#[derive(Debug, Clone)]
/// Iceberg table
pub struct Table {
    // Unique identifier (namespace + name) of the table within its catalog
    identifier: Identifier,
    // Catalog this table is registered in; shared, thread-safe handle
    catalog: Arc<dyn Catalog>,
    // Table metadata: schemas, partition specs, snapshots, sort orders, properties
    metadata: TableMetadata,
}
54
/// Public interface of the table.
impl Table {
    /// Creates a new table builder with default configuration
    ///
    /// Returns a `CreateTableBuilder` initialized with default properties:
    /// * WRITE_PARQUET_COMPRESSION_CODEC: "zstd"
    /// * WRITE_PARQUET_COMPRESSION_LEVEL: "1"
    /// * WRITE_OBJECT_STORAGE_ENABLED: "false"
    ///
    /// # Returns
    /// * `CreateTableBuilder` - A builder for configuring and creating a new table
    ///
    /// # Example
    /// ```ignore
    /// use iceberg_rust::table::Table;
    ///
    /// let builder = Table::builder()
    ///     .with_name("my_table")
    ///     .with_schema(schema);
    /// ```
    pub fn builder() -> CreateTableBuilder {
        let mut builder = CreateTableBuilder::default();
        builder
            .with_property((
                WRITE_PARQUET_COMPRESSION_CODEC.to_owned(),
                "zstd".to_owned(),
            ))
            .with_property((WRITE_PARQUET_COMPRESSION_LEVEL.to_owned(), 1.to_string()))
            .with_property((WRITE_OBJECT_STORAGE_ENABLED.to_owned(), "false".to_owned()));
        builder
    }

    /// Creates a new table instance with the given identifier, catalog and metadata
    ///
    /// # Arguments
    /// * `identifier` - The unique identifier for this table in the catalog
    /// * `catalog` - The catalog that this table belongs to
    /// * `metadata` - The table's metadata containing schema, partitioning, etc.
    ///
    /// # Returns
    /// * `Result<Table, Error>` - The newly created table instance or an error
    ///
    /// This is typically called by catalog implementations rather than directly by users.
    /// For creating new tables, use [`Table::builder()`] instead.
    ///
    /// Note: currently performs no asynchronous work and cannot fail; the
    /// signature is kept async/fallible for interface stability.
    pub async fn new(
        identifier: Identifier,
        catalog: Arc<dyn Catalog>,
        metadata: TableMetadata,
    ) -> Result<Self, Error> {
        Ok(Table {
            identifier,
            catalog,
            metadata,
        })
    }
    #[inline]
    /// Returns the unique identifier for this table in the catalog
    ///
    /// The identifier contains both the namespace and name that uniquely identify
    /// this table within its catalog.
    ///
    /// # Returns
    /// * `&Identifier` - A reference to this table's identifier
    pub fn identifier(&self) -> &Identifier {
        &self.identifier
    }
    #[inline]
    /// Returns a reference to the catalog containing this table
    ///
    /// The returned catalog reference is wrapped in an Arc to allow shared ownership
    /// and thread-safe access to the catalog implementation.
    ///
    /// # Returns
    /// * `Arc<dyn Catalog>` - A thread-safe reference to the table's catalog
    pub fn catalog(&self) -> Arc<dyn Catalog> {
        self.catalog.clone()
    }
    #[inline]
    /// Returns the object store for this table's location
    ///
    /// The object store is determined by the table's location and is used for
    /// reading and writing table data files. The returned store is wrapped in
    /// an Arc to allow shared ownership and thread-safe access.
    ///
    /// # Returns
    /// * `Arc<dyn ObjectStore>` - A thread-safe reference to the table's object store
    ///
    /// # Panics
    /// Panics if the table's metadata location cannot be parsed into a [`Bucket`]
    /// (the `Bucket::from_path` result is unwrapped).
    pub fn object_store(&self) -> Arc<dyn ObjectStore> {
        self.catalog
            .object_store(Bucket::from_path(&self.metadata.location).unwrap())
    }
    #[inline]
    /// Returns the current schema for this table, optionally for a specific branch
    ///
    /// # Arguments
    /// * `branch` - Optional branch name to get the schema for. If None, returns the main branch schema
    ///
    /// # Returns
    /// * `Result<&Schema, Error>` - The current schema if found, or an error if the schema cannot be found
    ///
    /// # Errors
    /// Returns an error if the schema ID cannot be found in the table metadata
    pub fn current_schema(&self, branch: Option<&str>) -> Result<&Schema, Error> {
        self.metadata.current_schema(branch).map_err(Error::from)
    }
    #[inline]
    /// Returns a reference to this table's metadata
    ///
    /// The metadata contains all table information including:
    /// * Schema definitions
    /// * Partition specifications
    /// * Snapshots
    /// * Sort orders
    /// * Table properties
    ///
    /// # Returns
    /// * `&TableMetadata` - A reference to the table's metadata
    pub fn metadata(&self) -> &TableMetadata {
        &self.metadata
    }
    #[inline]
    /// Consumes the table and returns its metadata
    ///
    /// This method takes ownership of the table instance and returns just the
    /// underlying TableMetadata. This is useful when you no longer need the
    /// table instance but want to retain its metadata.
    ///
    /// # Returns
    /// * `TableMetadata` - The owned metadata from this table
    pub fn into_metadata(self) -> TableMetadata {
        self.metadata
    }
    /// Returns manifest list entries for snapshots within the given sequence range
    ///
    /// # Arguments
    /// * `start` - Optional starting snapshot ID (exclusive). If None, includes from the beginning
    /// * `end` - Optional ending snapshot ID (inclusive). If None, uses the current snapshot
    ///
    /// # Returns
    /// * `Result<Vec<ManifestListEntry>, Error>` - Vector of manifest entries in the range,
    ///   or an empty vector if no current snapshot exists
    ///
    /// # Errors
    /// Returns an error if:
    /// * The end snapshot ID is invalid
    /// * Reading the manifest list fails
    pub async fn manifests(
        &self,
        start: Option<i64>,
        end: Option<i64>,
    ) -> Result<Vec<ManifestListEntry>, Error> {
        let metadata = self.metadata();
        // Resolve the end snapshot: the given id if it exists in the metadata,
        // otherwise fall back to the current snapshot. No current snapshot
        // means the table has no data yet, so there is nothing to list.
        let end_snapshot = match end.and_then(|id| metadata.snapshots.get(&id)) {
            Some(snapshot) => snapshot,
            None => {
                if let Some(current) = metadata.current_snapshot(None)? {
                    current
                } else {
                    return Ok(vec![]);
                }
            }
        };
        // Resolve the start snapshot's sequence number; a sequence number of 0
        // is treated the same as no lower bound.
        let start_sequence_number =
            start
                .and_then(|id| metadata.snapshots.get(&id))
                .and_then(|snapshot| {
                    let sequence_number = *snapshot.sequence_number();
                    if sequence_number == 0 {
                        None
                    } else {
                        Some(sequence_number)
                    }
                });
        let iter = read_snapshot(end_snapshot, metadata, self.object_store().clone()).await?;
        // Keep only manifests written strictly after the start sequence number.
        match start_sequence_number {
            Some(start) => iter
                .filter_ok(|manifest| manifest.sequence_number > start)
                .collect(),
            None => iter.collect(),
        }
    }
    /// Returns a stream of manifest entries for the given manifest list entries
    ///
    /// # Arguments
    /// * `manifests` - List of manifest entries to read data files from
    /// * `filter` - Optional vector of boolean predicates to filter manifest entries
    /// * `sequence_number_range` - Tuple of (start, end) sequence numbers to filter entries by
    ///
    /// # Returns
    /// * `Result<impl Stream<Item = Result<ManifestEntry, Error>>, Error>` - Stream of manifest entries
    ///   that match the given filters
    ///
    /// # Type Parameters
    /// * `'a` - Lifetime of the manifest list entries reference
    ///
    /// # Errors
    /// Returns an error if reading any manifest file fails
    #[inline]
    pub async fn datafiles<'a>(
        &self,
        manifests: &'a [ManifestListEntry],
        filter: Option<Vec<bool>>,
        sequence_number_range: (Option<i64>, Option<i64>),
    ) -> Result<impl Stream<Item = Result<ManifestEntry, Error>> + 'a, Error> {
        // Delegates to the free function shared with `delete_all_table_files`.
        datafiles(
            self.object_store(),
            manifests,
            filter,
            sequence_number_range,
        )
        .await
    }
    /// Returns `true` if any manifest entry between the given snapshots holds
    /// content other than plain data files (i.e. delete files).
    ///
    /// # Arguments
    /// * `start` - Optional starting snapshot ID (exclusive); `None` includes from the beginning
    /// * `end` - Optional ending snapshot ID (inclusive); `None` uses the current snapshot
    ///
    /// # Errors
    /// Returns an error if reading the manifest list or any manifest file fails
    pub async fn datafiles_contains_delete(
        &self,
        start: Option<i64>,
        end: Option<i64>,
    ) -> Result<bool, Error> {
        let manifests = self.manifests(start, end).await?;
        let datafiles = self.datafiles(&manifests, None, (None, None)).await?;
        // Short-circuits on the first entry whose content is not `Content::Data`.
        datafiles
            .try_any(|entry| async move { !matches!(entry.data_file().content(), Content::Data) })
            .await
    }
    /// Creates a new transaction for atomic modifications to this table
    ///
    /// # Arguments
    /// * `branch` - Optional branch name to create the transaction for. If None, uses the main branch
    ///
    /// # Returns
    /// * `TableTransaction` - A new transaction that can be used to atomically modify this table
    ///
    /// The transaction must be committed for any changes to take effect.
    /// Multiple operations can be chained within a single transaction.
    pub fn new_transaction(&mut self, branch: Option<&str>) -> TableTransaction {
        TableTransaction::new(self, branch)
    }
}
292
293async fn datafiles(
294 object_store: Arc<dyn ObjectStore>,
295 manifests: &'_ [ManifestListEntry],
296 filter: Option<Vec<bool>>,
297 sequence_number_range: (Option<i64>, Option<i64>),
298) -> Result<impl Stream<Item = Result<ManifestEntry, Error>> + '_, Error> {
299 // filter manifest files according to filter vector
300 let iter: Box<dyn Iterator<Item = &ManifestListEntry> + Send + Sync> = match filter {
301 Some(predicate) => {
302 let iter = manifests
303 .iter()
304 .zip(predicate.into_iter())
305 .filter(|(_, predicate)| *predicate)
306 .map(|(manifest, _)| manifest);
307 Box::new(iter)
308 }
309 None => Box::new(manifests.iter()),
310 };
311
312 // Collect a vector of data files by creating a stream over the manifst files, fetch their content and return a flatten stream over their entries.
313 Ok(stream::iter(iter)
314 .then(move |file| {
315 let object_store = object_store.clone();
316 async move {
317 let path: Path = util::strip_prefix(&file.manifest_path).into();
318 let bytes = Cursor::new(Vec::from(
319 object_store
320 .get(&path)
321 .and_then(|file| file.bytes())
322 .await?,
323 ));
324 Ok::<_, Error>((bytes, file.sequence_number))
325 }
326 })
327 .flat_map_unordered(None, move |result| {
328 let (bytes, sequence_number) = result.unwrap();
329
330 let reader = ManifestReader::new(bytes).unwrap();
331 stream::iter(reader).try_filter_map(move |mut x| {
332 future::ready({
333 let sequence_number = if let Some(sequence_number) = x.sequence_number() {
334 *sequence_number
335 } else {
336 *x.sequence_number_mut() = Some(sequence_number);
337 sequence_number
338 };
339
340 let filter = match sequence_number_range {
341 (Some(start), Some(end)) => {
342 start < sequence_number && sequence_number <= end
343 }
344 (Some(start), None) => start < sequence_number,
345 (None, Some(end)) => sequence_number <= end,
346 _ => true,
347 };
348 if filter {
349 Ok(Some(x))
350 } else {
351 Ok(None)
352 }
353 })
354 })
355 }))
356}
357
358/// delete all datafiles, manifests and metadata files, does not remove table from catalog
359pub(crate) async fn delete_all_table_files(
360 metadata: &TableMetadata,
361 object_store: Arc<dyn ObjectStore>,
362) -> Result<(), Error> {
363 let Some(snapshot) = metadata.current_snapshot(None)? else {
364 return Ok(());
365 };
366 let manifests: Vec<ManifestListEntry> = read_snapshot(snapshot, metadata, object_store.clone())
367 .await?
368 .collect::<Result<_, _>>()?;
369
370 let datafiles = datafiles(object_store.clone(), &manifests, None, (None, None)).await?;
371 let snapshots = &metadata.snapshots;
372
373 // stream::iter(datafiles.into_iter())
374 datafiles
375 .try_for_each_concurrent(None, |datafile| {
376 let object_store = object_store.clone();
377 async move {
378 object_store
379 .delete(&datafile.data_file().file_path().as_str().into())
380 .await?;
381 Ok(())
382 }
383 })
384 .await?;
385
386 stream::iter(manifests.into_iter())
387 .map(Ok::<_, Error>)
388 .try_for_each_concurrent(None, |manifest| {
389 let object_store = object_store.clone();
390 async move {
391 object_store.delete(&manifest.manifest_path.into()).await?;
392 Ok(())
393 }
394 })
395 .await?;
396
397 stream::iter(snapshots.values())
398 .map(Ok::<_, Error>)
399 .try_for_each_concurrent(None, |snapshot| {
400 let object_store = object_store.clone();
401 async move {
402 object_store
403 .delete(&snapshot.manifest_list().as_str().into())
404 .await?;
405 Ok(())
406 }
407 })
408 .await?;
409
410 Ok(())
411}