use std::{io::Cursor, sync::Arc};
use futures::future::try_join_all;
use itertools::Itertools;
use manifest::ManifestReader;
use manifest_list::read_snapshot;
use object_store::ObjectStoreExt;
use object_store::{path::Path, ObjectStore};
use futures::{stream, StreamExt, TryFutureExt, TryStreamExt};
use iceberg_rust_spec::util::{self};
use iceberg_rust_spec::{
spec::{
manifest::{Content, ManifestEntry},
manifest_list::ManifestListEntry,
schema::Schema,
table_metadata::TableMetadata,
},
table_metadata::{
WRITE_OBJECT_STORAGE_ENABLED, WRITE_PARQUET_COMPRESSION_CODEC,
WRITE_PARQUET_COMPRESSION_LEVEL,
},
};
use tracing::{instrument, Instrument};
use crate::{
catalog::{create::CreateTableBuilder, identifier::Identifier, Catalog},
error::Error,
table::transaction::TableTransaction,
};
pub mod manifest;
pub mod manifest_list;
pub mod transaction;
/// An Iceberg table handle: the table's identifier, the catalog it was loaded
/// from, the object store holding its files, and its current metadata.
#[derive(Debug, Clone)]
pub struct Table {
    // Identifier of the table within its catalog.
    identifier: Identifier,
    // Catalog this table was loaded from; used to commit transactions.
    catalog: Arc<dyn Catalog>,
    // Object store used to read/write data files, manifests and manifest lists.
    object_store: Arc<dyn ObjectStore>,
    // Snapshot of the table metadata (schemas, snapshots, properties, ...).
    metadata: TableMetadata,
}
impl Table {
pub fn builder() -> CreateTableBuilder {
let mut builder = CreateTableBuilder::default();
builder
.with_property((
WRITE_PARQUET_COMPRESSION_CODEC.to_owned(),
"zstd".to_owned(),
))
.with_property((WRITE_PARQUET_COMPRESSION_LEVEL.to_owned(), 3.to_string()))
.with_property((WRITE_OBJECT_STORAGE_ENABLED.to_owned(), "false".to_owned()));
builder
}
pub async fn new(
identifier: Identifier,
catalog: Arc<dyn Catalog>,
object_store: Arc<dyn ObjectStore>,
metadata: TableMetadata,
) -> Result<Self, Error> {
Ok(Table {
identifier,
catalog,
object_store,
metadata,
})
}
#[inline]
pub fn identifier(&self) -> &Identifier {
&self.identifier
}
#[inline]
pub fn catalog(&self) -> Arc<dyn Catalog> {
self.catalog.clone()
}
#[inline]
pub fn object_store(&self) -> Arc<dyn ObjectStore> {
self.object_store.clone()
}
#[inline]
pub fn current_schema(&self, branch: Option<&str>) -> Result<&Schema, Error> {
self.metadata.current_schema(branch).map_err(Error::from)
}
#[inline]
pub fn metadata(&self) -> &TableMetadata {
&self.metadata
}
#[inline]
pub fn into_metadata(self) -> TableMetadata {
self.metadata
}
#[instrument(name = "iceberg_rust::table::manifests", level = "debug", skip(self), fields(
table_identifier = %self.identifier,
start = ?start,
end = ?end
))]
pub async fn manifests(
&self,
start: Option<i64>,
end: Option<i64>,
) -> Result<Vec<ManifestListEntry>, Error> {
let metadata = self.metadata();
let end_snapshot = match end.and_then(|id| metadata.snapshots.get(&id)) {
Some(snapshot) => snapshot,
None => {
if let Some(current) = metadata.current_snapshot(None)? {
current
} else {
return Ok(vec![]);
}
}
};
let start_sequence_number =
start
.and_then(|id| metadata.snapshots.get(&id))
.and_then(|snapshot| {
let sequence_number = *snapshot.sequence_number();
if sequence_number == 0 {
None
} else {
Some(sequence_number)
}
});
let iter = read_snapshot(end_snapshot, metadata, self.object_store().clone()).await?;
match start_sequence_number {
Some(start) => iter
.filter_ok(|manifest| manifest.sequence_number > start)
.collect(),
None => iter.collect(),
}
}
#[inline]
pub async fn datafiles<'a>(
&self,
manifests: &'a [ManifestListEntry],
filter: Option<Vec<bool>>,
sequence_number_range: (Option<i64>, Option<i64>),
) -> Result<impl Iterator<Item = Result<(ManifestPath, ManifestEntry), Error>> + 'a, Error>
{
datafiles(
self.object_store(),
manifests,
filter,
sequence_number_range,
)
.await
}
pub async fn datafiles_contains_delete(
&self,
start: Option<i64>,
end: Option<i64>,
) -> Result<bool, Error> {
let manifests = self.manifests(start, end).await?;
let datafiles = self.datafiles(&manifests, None, (None, None)).await?;
stream::iter(datafiles)
.try_any(|entry| async move { !matches!(entry.1.data_file().content(), Content::Data) })
.await
}
pub fn new_transaction(&mut self, branch: Option<&str>) -> TableTransaction<'_> {
TableTransaction::new(self, branch)
}
}
pub type ManifestPath = String;
#[instrument(name = "iceberg_rust::table::datafiles", level = "debug", skip(object_store, manifests), fields(
manifest_count = manifests.len(),
filter_provided = filter.is_some(),
sequence_range = ?sequence_number_range
))]
async fn datafiles(
object_store: Arc<dyn ObjectStore>,
manifests: &'_ [ManifestListEntry],
filter: Option<Vec<bool>>,
sequence_number_range: (Option<i64>, Option<i64>),
) -> Result<impl Iterator<Item = Result<(ManifestPath, ManifestEntry), Error>> + '_, Error> {
let iter: Box<dyn Iterator<Item = &ManifestListEntry> + Send + Sync> = match filter {
Some(predicate) => {
let iter = manifests
.iter()
.zip(predicate.into_iter())
.filter(|(_, predicate)| *predicate)
.map(|(manifest, _)| manifest);
Box::new(iter)
}
None => Box::new(manifests.iter()),
};
let futures: Vec<_> = iter
.map(move |file| {
let object_store = object_store.clone();
async move {
let manifest_path = &file.manifest_path;
let path: Path = util::strip_prefix(manifest_path).into();
let bytes = Cursor::new(Vec::from(
object_store
.get(&path)
.and_then(|file| file.bytes())
.instrument(tracing::trace_span!("iceberg_rust::get_manifest"))
.await?,
));
Ok::<_, Error>((bytes, manifest_path, file.sequence_number))
}
})
.collect();
let results = try_join_all(futures).await?;
Ok(results.into_iter().flat_map(move |result| {
let (bytes, path, sequence_number) = result;
let reader = ManifestReader::new(bytes).unwrap();
reader.filter_map(move |x| {
let mut x = match x {
Ok(entry) => entry,
Err(_) => return None,
};
let sequence_number = if let Some(sequence_number) = x.sequence_number() {
*sequence_number
} else {
*x.sequence_number_mut() = Some(sequence_number);
sequence_number
};
let filter = match sequence_number_range {
(Some(start), Some(end)) => start < sequence_number && sequence_number <= end,
(Some(start), None) => start < sequence_number,
(None, Some(end)) => sequence_number <= end,
_ => true,
};
if filter {
Some(Ok((path.to_owned(), x)))
} else {
None
}
})
}))
}
/// Deletes every file belonging to the table from the object store, in three
/// passes: the data files referenced by the current snapshot's manifests, the
/// manifest files themselves, and the manifest list of every snapshot.
///
/// A table without a current snapshot is a no-op.
pub(crate) async fn delete_all_table_files(
    metadata: &TableMetadata,
    object_store: Arc<dyn ObjectStore>,
) -> Result<(), Error> {
    // Nothing to delete if the table never committed a snapshot.
    let Some(current_snapshot) = metadata.current_snapshot(None)? else {
        return Ok(());
    };
    let manifest_entries: Vec<ManifestListEntry> =
        read_snapshot(current_snapshot, metadata, object_store.clone())
            .await?
            .collect::<Result<_, _>>()?;
    let entries =
        datafiles(object_store.clone(), &manifest_entries, None, (None, None)).await?;

    // Pass 1: delete every data file referenced by the manifests.
    stream::iter(entries)
        .try_for_each_concurrent(None, |(_, manifest_entry)| {
            let store = object_store.clone();
            async move {
                store
                    .delete(&manifest_entry.data_file().file_path().as_str().into())
                    .await?;
                Ok(())
            }
        })
        .await?;

    // Pass 2: delete the manifest files themselves.
    stream::iter(manifest_entries)
        .map(Ok::<_, Error>)
        .try_for_each_concurrent(None, |manifest| {
            let store = object_store.clone();
            async move {
                store.delete(&manifest.manifest_path.into()).await?;
                Ok(())
            }
        })
        .await?;

    // Pass 3: delete the manifest list of every snapshot in the metadata.
    stream::iter(metadata.snapshots.values())
        .map(Ok::<_, Error>)
        .try_for_each_concurrent(None, |snapshot| {
            let store = object_store.clone();
            async move {
                store
                    .delete(&snapshot.manifest_list().as_str().into())
                    .await?;
                Ok(())
            }
        })
        .await?;
    Ok(())
}