use std::{
collections::{HashMap, HashSet},
future::Future,
io::{Cursor, Read},
iter::{repeat, Map, Repeat, Zip},
sync::Arc,
};
use apache_avro::{
types::Value as AvroValue, Reader as AvroReader, Schema as AvroSchema, Writer as AvroWriter,
};
use futures::{future::join_all, stream, TryFutureExt, TryStreamExt};
use iceberg_rust_spec::{
manifest::{partition_value_schema, DataFile, ManifestEntry, Status},
manifest_list::{
avro_value_to_manifest_list_entry, manifest_list_schema_v1, manifest_list_schema_v2,
Content, ManifestListEntry,
},
snapshot::Snapshot,
table_metadata::{FormatVersion, TableMetadata},
util::strip_prefix,
};
use object_store::{ObjectStore, ObjectStoreExt};
use smallvec::SmallVec;
use crate::{
error::Error,
table::datafiles,
util::{summary_to_rectangle, Rectangle, Vec4},
};
use super::{
manifest::{FilteredManifestStats, ManifestReader, ManifestWriter},
transaction::{
append::{
select_manifest_partitioned, select_manifest_unpartitioned, split_datafiles,
SelectedManifest,
},
operation::{
bounding_partition_values, compute_n_splits, new_manifest_list_location,
new_manifest_location, prefetch_manifest,
},
overwrite::{
select_manifest_without_overwrites_partitioned,
select_manifest_without_overwrites_unpartitioned, OverwriteManifest,
},
},
};
/// Pairs every raw avro record of a manifest list with a reference to the
/// table metadata, so the decoding step can consult it.
type ReaderZip<'a, 'metadata, R> = Zip<AvroReader<'a, R>, Repeat<&'metadata TableMetadata>>;
/// Concrete iterator type backing [`ManifestListReader`]: each
/// `(avro value, metadata)` pair is mapped (via a plain fn pointer, so the
/// type stays nameable) to a decoded [`ManifestListEntry`].
type ReaderMap<'a, 'metadata, R> = Map<
    ReaderZip<'a, 'metadata, R>,
    fn((Result<AvroValue, apache_avro::Error>, &TableMetadata)) -> Result<ManifestListEntry, Error>,
>;
/// Lazily decodes the [`ManifestListEntry`] records of a snapshot's manifest
/// list from an avro byte stream.
pub(crate) struct ManifestListReader<'a, 'metadata, R: Read> {
    // Underlying avro reader, zipped with the table metadata and mapped to
    // decoded entries (see `ReaderMap`).
    reader: ReaderMap<'a, 'metadata, R>,
}
impl<R: Read> Iterator for ManifestListReader<'_, '_, R> {
    type Item = Result<ManifestListEntry, Error>;
    /// Yields the next manifest-list entry, or a decoding error, by
    /// delegating to the mapped avro reader.
    fn next(&mut self) -> Option<Self::Item> {
        self.reader.next()
    }
}
impl<'metadata, R: Read> ManifestListReader<'_, 'metadata, R> {
pub(crate) fn new(reader: R, table_metadata: &'metadata TableMetadata) -> Result<Self, Error> {
let schema: &AvroSchema = match table_metadata.format_version {
FormatVersion::V1 => manifest_list_schema_v1(),
FormatVersion::V2 => manifest_list_schema_v2(),
};
Ok(Self {
reader: AvroReader::with_schema(schema, reader)?
.zip(repeat(table_metadata))
.map(|(avro_value_res, meta)| {
avro_value_to_manifest_list_entry(avro_value_res, meta).map_err(Error::from)
}),
})
}
}
/// Fetches a snapshot's manifest list from the object store and returns an
/// iterator over its decoded [`ManifestListEntry`] records.
pub(crate) async fn read_snapshot<'metadata>(
    snapshot: &Snapshot,
    table_metadata: &'metadata TableMetadata,
    object_store: Arc<dyn ObjectStore>,
) -> Result<impl Iterator<Item = Result<ManifestListEntry, Error>> + 'metadata, Error> {
    // Download the whole manifest list into memory before decoding.
    let manifest_list_bytes: Vec<u8> = object_store
        .get(&strip_prefix(snapshot.manifest_list()).into())
        .await?
        .bytes()
        .await?
        .into();
    ManifestListReader::new(Cursor::new(manifest_list_bytes), table_metadata)
}
/// Computes the bounding rectangle over the partition summaries of every
/// manifest in the snapshot's manifest list.
///
/// Returns `Ok(None)` when no manifest carries a partition summary.
pub async fn snapshot_partition_bounds(
    snapshot: &Snapshot,
    table_metadata: &TableMetadata,
    object_store: Arc<dyn ObjectStore>,
) -> Result<Option<Rectangle>, Error> {
    // Pull the manifest list into memory so it can be decoded synchronously.
    let manifest_list_bytes: Vec<u8> = object_store
        .get(&strip_prefix(snapshot.manifest_list()).into())
        .await?
        .bytes()
        .await?
        .into();
    let reader = ManifestListReader::new(Cursor::new(manifest_list_bytes), table_metadata)?;
    let mut bounds: Option<Rectangle> = None;
    for entry in reader {
        // Manifests without a partition summary contribute nothing.
        let Some(partitions) = entry?.partitions else {
            continue;
        };
        let rect = summary_to_rectangle(&partitions)?;
        match bounds.as_mut() {
            Some(acc) => acc.expand(&rect),
            None => bounds = Some(rect),
        }
    }
    Ok(bounds)
}
/// Computes the bounding rectangle over the per-column lower/upper bounds of
/// every data file reachable from `snapshot`.
///
/// Returns `Ok(None)` when the snapshot has no data files, and
/// `Error::NotFound` when any primitive column is missing bounds on some
/// data file.
pub async fn snapshot_column_bounds(
    snapshot: &Snapshot,
    table_metadata: &TableMetadata,
    object_store: Arc<dyn ObjectStore>,
) -> Result<Option<Rectangle>, Error> {
    // Prefer the schema pinned to the snapshot; fall back to the current one.
    let schema = table_metadata
        .schema(*snapshot.snapshot_id())
        .or(table_metadata.current_schema(None))?;
    let manifests = read_snapshot(snapshot, table_metadata, object_store.clone())
        .await?
        .collect::<Result<Vec<_>, _>>()?;
    let datafiles = datafiles(object_store, &manifests, None, (None, None)).await?;
    let primitive_field_ids = schema.primitive_field_ids().collect::<Vec<_>>();
    let n = primitive_field_ids.len();
    stream::iter(datafiles)
        .try_fold(None::<Rectangle>, |acc, (_, manifest)| {
            let primitive_field_ids = &primitive_field_ids;
            async move {
                let mut mins = Vec4::with_capacity(n);
                let mut maxs = Vec4::with_capacity(n);
                for id in primitive_field_ids.iter() {
                    let min = manifest
                        .data_file()
                        .lower_bounds()
                        .as_ref()
                        .and_then(|x| x.get(id));
                    let max = manifest
                        .data_file()
                        .upper_bounds()
                        .as_ref()
                        .and_then(|x| x.get(id));
                    let (Some(min), Some(max)) = (min, max) else {
                        return Err(Error::NotFound("column bounds".to_string()));
                    };
                    // BUG FIX: `with_capacity` creates *empty* vectors, so the
                    // original `mins[i] = ...` / `maxs[i] = ...` indexed past
                    // the end and panicked on the first data file. Append
                    // instead of assigning by index.
                    mins.push(min.clone());
                    maxs.push(max.clone());
                }
                let rect = Rectangle::new(mins, maxs);
                if let Some(mut acc) = acc {
                    acc.expand(&rect);
                    Ok(Some(acc))
                } else {
                    Ok(Some(rect))
                }
            }
        })
        .await
}
/// Incrementally builds a new manifest list for a snapshot commit.
///
/// Existing manifests can be carried over (see the `from_existing*`
/// constructors), at most one "selected" data manifest and one delete
/// manifest may be rewritten together with newly appended files, and the
/// finished list is uploaded by `finish`.
pub(crate) struct ManifestListWriter<'schema, 'metadata> {
    table_metadata: &'metadata TableMetadata,
    // Avro writer accumulating the serialized manifest-list entries in memory.
    writer: AvroWriter<'schema, Vec<u8>>,
    // Manifest chosen to receive appended data files; taken (consumed) by the
    // first `append*` call for `Content::Data`, re-appended verbatim by
    // `finish` if never consumed.
    selected_data_manifest: Option<ManifestListEntry>,
    // Same as above, for `Content::Deletes`.
    selected_delete_manifest: Option<ManifestListEntry>,
    // Partition-value bounding box of the data files being appended.
    bounding_partition_values: Rectangle,
    // File count across all entries of the previous manifest list.
    n_existing_files: usize,
    // Random uuid embedded in new manifest / manifest-list file names.
    commit_uuid: String,
    // Counter used to derive unique manifest file locations for this commit.
    manifest_count: usize,
    branch: Option<String>,
}
impl<'schema, 'metadata> ManifestListWriter<'schema, 'metadata> {
/// Starts an empty manifest-list writer for a brand-new snapshot.
///
/// Computes the partition-value bounding box of `data_files` up front so
/// later appends can split manifests along partition boundaries.
pub(crate) fn new<'datafiles>(
    data_files: impl Iterator<Item = &'datafiles DataFile>,
    schema: &'schema AvroSchema,
    table_metadata: &'metadata TableMetadata,
    branch: Option<&str>,
) -> Result<Self, Error> {
    let partition_fields = table_metadata.current_partition_fields(branch)?;
    let partition_column_names: SmallVec<[_; 4]> =
        partition_fields.iter().map(|field| field.name()).collect();
    let bounding_partition_values =
        bounding_partition_values(data_files, &partition_column_names)?;
    Ok(Self {
        table_metadata,
        writer: AvroWriter::new(schema, Vec::new()),
        selected_data_manifest: None,
        selected_delete_manifest: None,
        bounding_partition_values,
        n_existing_files: 0,
        commit_uuid: uuid::Uuid::new_v4().to_string(),
        manifest_count: 0,
        branch: branch.map(String::from),
    })
}
/// Starts a manifest-list writer from an existing manifest list (`bytes`),
/// copying over all manifests except the one(s) selected to be rewritten
/// together with the new data files.
pub(crate) fn from_existing<'datafiles>(
    bytes: &[u8],
    data_files: impl Iterator<Item = &'datafiles DataFile>,
    schema: &'schema AvroSchema,
    table_metadata: &'metadata TableMetadata,
    branch: Option<&str>,
) -> Result<Self, Error> {
    let partition_fields = table_metadata.current_partition_fields(branch)?;
    let partition_column_names: SmallVec<[_; 4]> =
        partition_fields.iter().map(|field| field.name()).collect();
    let bounding_partition_values =
        bounding_partition_values(data_files, &partition_column_names)?;
    let reader = ManifestListReader::new(bytes, table_metadata)?;
    let mut writer = AvroWriter::new(schema, Vec::new());
    // Copy every non-selected manifest straight into the new list; pick the
    // manifest(s) the appended files will be merged into.
    let selection = if partition_column_names.is_empty() {
        select_manifest_unpartitioned(reader, &mut writer)?
    } else {
        select_manifest_partitioned(reader, &mut writer, &bounding_partition_values)?
    };
    Ok(Self {
        table_metadata,
        writer,
        selected_data_manifest: Some(selection.data_manifest),
        selected_delete_manifest: selection.delete_manifest,
        bounding_partition_values,
        n_existing_files: selection.file_count_all_entries,
        commit_uuid: uuid::Uuid::new_v4().to_string(),
        manifest_count: 0,
        branch: branch.map(String::from),
    })
}
/// Like `from_existing`, but entries whose paths appear in
/// `manifests_to_overwrite` are withheld from the new list and returned to
/// the caller so they can be rewritten separately.
pub(crate) fn from_existing_without_overwrites<'datafiles>(
    bytes: &[u8],
    data_files: impl Iterator<Item = &'datafiles DataFile>,
    manifests_to_overwrite: &HashSet<String>,
    schema: &'schema AvroSchema,
    table_metadata: &'metadata TableMetadata,
    branch: Option<&str>,
) -> Result<(Self, Vec<ManifestListEntry>), Error> {
    let partition_fields = table_metadata.current_partition_fields(branch)?;
    let partition_column_names: SmallVec<[_; 4]> =
        partition_fields.iter().map(|field| field.name()).collect();
    let bounding_partition_values =
        bounding_partition_values(data_files, &partition_column_names)?;
    let reader = ManifestListReader::new(bytes, table_metadata)?;
    let mut writer = AvroWriter::new(schema, Vec::new());
    // Copy non-selected, non-overwritten manifests into the new list; collect
    // the overwritten ones for the caller.
    let selection = if partition_column_names.is_empty() {
        select_manifest_without_overwrites_unpartitioned(
            reader,
            &mut writer,
            manifests_to_overwrite,
        )?
    } else {
        select_manifest_without_overwrites_partitioned(
            reader,
            &mut writer,
            &bounding_partition_values,
            manifests_to_overwrite,
        )?
    };
    let OverwriteManifest {
        manifest,
        file_count_all_entries,
        manifests_to_overwrite: withheld_manifests,
    } = selection;
    let list_writer = Self {
        table_metadata,
        writer,
        selected_data_manifest: Some(manifest),
        selected_delete_manifest: None,
        bounding_partition_values,
        n_existing_files: file_count_all_entries,
        commit_uuid: uuid::Uuid::new_v4().to_string(),
        manifest_count: 0,
        branch: branch.map(String::from),
    };
    Ok((list_writer, withheld_manifests))
}
/// Number of manifest splits needed to hold the previously existing files,
/// the selected manifest's files and `n_data_files` newly appended files.
pub(crate) fn n_splits(&self, n_data_files: usize, content: Content) -> u32 {
    let selected_manifest = match content {
        Content::Data => self.selected_data_manifest.as_ref(),
        Content::Deletes => self.selected_delete_manifest.as_ref(),
    };
    // Existing + added counts of the selected manifest, treating a missing
    // count as zero; no selected manifest contributes zero files.
    let selected_manifest_file_count = selected_manifest
        .map(|manifest| {
            manifest.existing_files_count.unwrap_or(0) + manifest.added_files_count.unwrap_or(0)
        })
        .unwrap_or(0) as usize;
    compute_n_splits(
        self.n_existing_files,
        n_data_files,
        selected_manifest_file_count,
    )
}
/// Appends the given entries for `content` into a single manifest and waits
/// for the upload to complete.
#[inline]
pub(crate) async fn append(
    &mut self,
    data_files: impl Iterator<Item = Result<ManifestEntry, Error>>,
    snapshot_id: i64,
    object_store: Arc<dyn ObjectStore>,
    content: Content,
) -> Result<(), Error> {
    // Unfiltered append: the stats are always `None`, so discard them.
    self.append_filtered(
        data_files,
        snapshot_id,
        None::<HashSet<String>>,
        object_store,
        content,
    )
    .await?;
    Ok(())
}
/// Appends the given entries for `content` into a single manifest; returns a
/// future that resolves when the manifest upload finishes.
#[inline]
pub(crate) async fn append_concurrently(
    &mut self,
    data_files: impl Iterator<Item = Result<ManifestEntry, Error>>,
    snapshot_id: i64,
    object_store: Arc<dyn ObjectStore>,
    content: Content,
) -> Result<impl Future<Output = Result<(), Error>>, Error> {
    let (write_future, _stats) = self
        .append_filtered_concurrently(
            data_files,
            snapshot_id,
            None::<HashSet<String>>,
            object_store,
            content,
        )
        .await?;
    Ok(write_future)
}
/// Appends the given entries for `content`, optionally dropping the file
/// paths in `filter` from the rewritten manifest, and waits for the upload.
///
/// Returns the stats of any filtered-out files.
#[inline]
pub(crate) async fn append_filtered(
    &mut self,
    data_files: impl Iterator<Item = Result<ManifestEntry, Error>>,
    snapshot_id: i64,
    filter: Option<HashSet<String>>,
    object_store: Arc<dyn ObjectStore>,
    content: Content,
) -> Result<Option<FilteredManifestStats>, Error> {
    let (write_future, stats) = self
        .append_filtered_concurrently(data_files, snapshot_id, filter, object_store, content)
        .await?;
    // Block until the manifest bytes are actually persisted.
    write_future.await?;
    Ok(stats)
}
/// Writes the given entries into one new manifest, records it in the
/// manifest list and returns a future that resolves when the manifest bytes
/// have been uploaded, plus the stats of any filtered-out files.
///
/// If a manifest was previously selected for `content`, its entries are
/// rewritten into the new manifest (optionally dropping the file paths in
/// `filter`); otherwise a fresh manifest is created.
pub(crate) async fn append_filtered_concurrently(
    &mut self,
    data_files: impl Iterator<Item = Result<ManifestEntry, Error>>,
    snapshot_id: i64,
    filter: Option<HashSet<String>>,
    object_store: Arc<dyn ObjectStore>,
    content: Content,
) -> Result<
    (
        impl Future<Output = Result<(), Error>>,
        Option<FilteredManifestStats>,
    ),
    Error,
> {
    // Consume the selected manifest so `finish` does not re-append it.
    let selected_manifest = match content {
        Content::Data => self.selected_data_manifest.take(),
        Content::Deletes => self.selected_delete_manifest.take(),
    };
    // Kick off the fetch of the selected manifest's bytes while the schema
    // is being built below.
    let selected_manifest_bytes_opt = prefetch_manifest(&selected_manifest, &object_store);
    let partition_fields = self
        .table_metadata
        .current_partition_fields(self.branch.as_deref())?;
    let manifest_schema = ManifestEntry::schema(
        &partition_value_schema(&partition_fields)?,
        &self.table_metadata.format_version,
    )?;
    let (mut manifest_writer, filtered_stats) =
        if let (Some(mut manifest), Some(manifest_bytes)) =
            (selected_manifest, selected_manifest_bytes_opt)
        {
            let manifest_bytes = manifest_bytes.await??;
            // The rewritten manifest gets a fresh location for this commit.
            manifest.manifest_path = self.next_manifest_location();
            if let Some(filter) = filter {
                // Copy the old entries, skipping the filtered file paths and
                // collecting stats about what was dropped.
                let (manifest_writer, filtered_stats) =
                    ManifestWriter::from_existing_with_filter(
                        manifest_bytes.as_ref(),
                        manifest,
                        &filter,
                        &manifest_schema,
                        self.table_metadata,
                        self.branch.as_deref(),
                    )?;
                (manifest_writer, Some(filtered_stats))
            } else {
                // Copy all old entries unchanged.
                let manifest_reader = ManifestReader::new(manifest_bytes.as_ref())?;
                let manifest_writer = ManifestWriter::from_existing(
                    manifest_reader,
                    manifest,
                    &manifest_schema,
                    self.table_metadata,
                    self.branch.as_deref(),
                )?;
                (manifest_writer, None)
            }
        } else {
            // No manifest to rewrite: start an empty one.
            let manifest_location = self.next_manifest_location();
            let manifest_writer = ManifestWriter::new(
                &manifest_location,
                snapshot_id,
                &manifest_schema,
                self.table_metadata,
                content,
                self.branch.as_deref(),
            )?;
            (manifest_writer, None)
        };
    for manifest_entry in data_files {
        manifest_writer.append(manifest_entry?)?;
    }
    if let Some(filtered_stats) = filtered_stats {
        manifest_writer.apply_filtered_stats(&filtered_stats);
    }
    // Upload the manifest in the background; record its list entry now.
    let (manifest, future) = manifest_writer.finish_concurrently(object_store.clone())?;
    self.writer.append_ser(manifest)?;
    Ok((future, filtered_stats))
}
/// Appends the given entries split across `n_splits` manifests; returns a
/// future that resolves when all manifest uploads finish.
pub(crate) async fn append_multiple_concurrently(
    &mut self,
    data_files: impl Iterator<Item = Result<ManifestEntry, Error>>,
    snapshot_id: i64,
    n_splits: u32,
    object_store: Arc<dyn ObjectStore>,
    content: Content,
) -> Result<impl Future<Output = Result<(), Error>>, Error> {
    let (write_future, _stats) = self
        .append_multiple_filtered_concurrently(
            data_files,
            snapshot_id,
            n_splits,
            None::<HashSet<String>>,
            object_store,
            content,
        )
        .await?;
    Ok(write_future)
}
/// Appends the given entries split across `n_splits` manifests, optionally
/// dropping the file paths in `filter`, and waits for all uploads.
///
/// Returns the stats of any filtered-out files.
#[inline]
pub(crate) async fn append_multiple_filtered(
    &mut self,
    data_files: impl Iterator<Item = Result<ManifestEntry, Error>>,
    snapshot_id: i64,
    n_splits: u32,
    filter: Option<HashSet<String>>,
    object_store: Arc<dyn ObjectStore>,
    content: Content,
) -> Result<Option<FilteredManifestStats>, Error> {
    let (write_future, stats) = self
        .append_multiple_filtered_concurrently(
            data_files,
            snapshot_id,
            n_splits,
            filter,
            object_store,
            content,
        )
        .await?;
    // Block until every manifest is persisted.
    write_future.await?;
    Ok(stats)
}
/// Splits the given entries — merged with the selected manifest's entries,
/// if any — into `n_splits` new manifests, records them in the manifest list
/// and returns a future resolving when all uploads finish, plus the stats of
/// any filtered-out files.
pub(crate) async fn append_multiple_filtered_concurrently(
    &mut self,
    data_files: impl Iterator<Item = Result<ManifestEntry, Error>>,
    snapshot_id: i64,
    n_splits: u32,
    filter: Option<HashSet<String>>,
    object_store: Arc<dyn ObjectStore>,
    content: Content,
) -> Result<
    (
        impl Future<Output = Result<(), Error>>,
        Option<FilteredManifestStats>,
    ),
    Error,
> {
    // Removal stats are only tracked when a filter was requested.
    let mut removed_stats = if filter.is_some() {
        Some(FilteredManifestStats::default())
    } else {
        None
    };
    let partition_fields = self
        .table_metadata
        .current_partition_fields(self.branch.as_deref())?;
    let partition_column_names = partition_fields
        .iter()
        .map(|x| x.name())
        .collect::<SmallVec<[_; 4]>>();
    let manifest_schema = ManifestEntry::schema(
        &partition_value_schema(&partition_fields)?,
        &self.table_metadata.format_version,
    )?;
    // Consume the selected manifest so `finish` does not re-append it.
    let selected_manifest = match content {
        Content::Data => self.selected_data_manifest.take(),
        Content::Deletes => self.selected_delete_manifest.take(),
    };
    // Overall split bounds: the selected manifest's partition summary (if
    // present) expanded by the new files' bounding box; otherwise just the
    // new files' bounding box.
    let bounds = selected_manifest
        .as_ref()
        .and_then(|x| x.partitions.as_deref())
        .map(summary_to_rectangle)
        .transpose()?
        .map(|mut x| {
            x.expand(&self.bounding_partition_values);
            x
        })
        .unwrap_or(self.bounding_partition_values.clone());
    // Start fetching the selected manifest's bytes in the background.
    let selected_manifest_bytes_opt = prefetch_manifest(&selected_manifest, &object_store);
    let splits = if let (Some(manifest), Some(manifest_bytes)) =
        (selected_manifest, selected_manifest_bytes_opt)
    {
        let manifest_bytes = manifest_bytes.await??;
        // Re-read the selected manifest's entries, dropping filtered files
        // (while accumulating removal stats) and marking the survivors as
        // `Existing`, inheriting the manifest's sequence number / snapshot id
        // where the entry has none of its own.
        let manifest_reader = ManifestReader::new(&*manifest_bytes)?.filter_map(|entry| {
            let mut entry = match entry {
                Ok(entry) => entry,
                Err(err) => return Some(Err(err)),
            };
            if let (Some(files_to_filter), Some(removed_stats)) =
                (filter.as_ref(), &mut removed_stats)
            {
                if files_to_filter.contains(entry.data_file().file_path()) {
                    // Record counts only apply to data files, not deletes.
                    if *entry.data_file().content()
                        == iceberg_rust_spec::manifest::Content::Data
                    {
                        removed_stats.removed_records += entry.data_file().record_count();
                    }
                    removed_stats.removed_file_size_bytes +=
                        entry.data_file().file_size_in_bytes();
                    removed_stats.removed_data_files += 1;
                    return None;
                }
            }
            *entry.status_mut() = Status::Existing;
            if entry.sequence_number().is_none() {
                *entry.sequence_number_mut() = Some(manifest.sequence_number);
            }
            if entry.snapshot_id().is_none() {
                *entry.snapshot_id_mut() = Some(manifest.added_snapshot_id);
            }
            Some(Ok(entry))
        });
        split_datafiles(
            data_files.chain(manifest_reader),
            bounds,
            &partition_column_names,
            n_splits,
        )?
    } else {
        split_datafiles(data_files, bounds, &partition_column_names, n_splits)?
    };
    // Write one manifest per split; uploads run in the background.
    let (manifests, manifest_futures) = splits
        .into_iter()
        .map(|entries| {
            let manifest_location = self.next_manifest_location();
            let mut manifest_writer = ManifestWriter::new(
                &manifest_location,
                snapshot_id,
                &manifest_schema,
                self.table_metadata,
                content,
                self.branch.as_deref(),
            )?;
            for manifest_entry in entries {
                manifest_writer.append(manifest_entry)?;
            }
            manifest_writer.finish_concurrently(object_store.clone())
        })
        .collect::<Result<(Vec<_>, Vec<_>), _>>()?;
    // Record every new manifest in the list immediately.
    for manifest in manifests {
        self.writer.append_ser(manifest)?;
    }
    let future = futures::future::try_join_all(manifest_futures).map_ok(|_| ());
    Ok((future, removed_stats))
}
/// Flushes any still-selected manifests into the list, uploads the finished
/// manifest list and returns its location.
pub(crate) async fn finish(
    mut self,
    snapshot_id: i64,
    object_store: Arc<dyn ObjectStore>,
) -> Result<String, Error> {
    // Selected manifests that were never consumed by an `append*` call are
    // carried over unchanged (data manifest first, then delete manifest).
    for manifest in [
        self.selected_data_manifest.take(),
        self.selected_delete_manifest.take(),
    ]
    .into_iter()
    .flatten()
    {
        self.writer.append_ser(manifest)?;
    }
    let new_manifest_list_location = new_manifest_list_location(
        &self.table_metadata.location,
        snapshot_id,
        0,
        &self.commit_uuid,
    );
    let manifest_list_bytes = self.writer.into_inner()?;
    object_store
        .put(
            &strip_prefix(&new_manifest_list_location).into(),
            manifest_list_bytes.into(),
        )
        .await?;
    Ok(new_manifest_list_location)
}
/// Rewrites each manifest in `manifests_to_overwrite` without the data files
/// listed for it in `data_files_to_filter`, appends the surviving (non-empty)
/// manifests to the new list and returns the aggregated removal stats.
pub(crate) async fn append_and_filter(
    &mut self,
    manifests_to_overwrite: Vec<ManifestListEntry>,
    data_files_to_filter: &HashMap<String, Vec<String>>,
    object_store: Arc<dyn ObjectStore>,
) -> Result<FilteredManifestStats, Error> {
    let partition_fields = self
        .table_metadata
        .current_partition_fields(self.branch.as_deref())?;
    // Schema is shared by all concurrent rewrite tasks below.
    let manifest_schema = Arc::new(ManifestEntry::schema(
        &partition_value_schema(&partition_fields)?,
        &self.table_metadata.format_version,
    )?);
    let futures = manifests_to_overwrite.into_iter().map(|mut manifest| {
        let object_store = object_store.clone();
        let manifest_schema = manifest_schema.clone();
        let branch = self.branch.clone();
        // The new location is drawn here, outside the async block, because
        // it mutates `self.manifest_count`.
        let manifest_location = self.next_manifest_location();
        let table_metadata = self.table_metadata;
        async move {
            // File paths to drop from this particular manifest; every
            // overwritten manifest must have an entry in the map.
            let data_files_to_filter: HashSet<String> = data_files_to_filter
                .get(&manifest.manifest_path)
                .ok_or(Error::NotFound("Datafiles for manifest".to_owned()))?
                .iter()
                .map(ToOwned::to_owned)
                .collect();
            // Fetch the old manifest before repointing its path.
            let bytes = object_store
                .clone()
                .get(&strip_prefix(&manifest.manifest_path).into())
                .await?
                .bytes()
                .await?;
            manifest.manifest_path = manifest_location;
            let (mut manifest_writer, filtered_stats) =
                ManifestWriter::from_existing_with_filter(
                    &bytes,
                    manifest,
                    &data_files_to_filter,
                    &manifest_schema,
                    table_metadata,
                    branch.as_deref(),
                )?;
            manifest_writer.apply_filtered_stats(&filtered_stats);
            let new_manifest = manifest_writer.finish(object_store.clone()).await?;
            Ok::<_, Error>((new_manifest, filtered_stats))
        }
    });
    let mut removed_stats = FilteredManifestStats::default();
    for manifest_res in join_all(futures).await {
        let (manifest, filtered_stats) = manifest_res?;
        removed_stats.removed_data_files += filtered_stats.removed_data_files;
        removed_stats.removed_records += filtered_stats.removed_records;
        removed_stats.removed_file_size_bytes += filtered_stats.removed_file_size_bytes;
        // Skip manifests that became empty after filtering.
        if manifest.added_files_count.unwrap_or(0) > 0
            || manifest.existing_files_count.unwrap_or(0) > 0
        {
            self.writer.append_ser(manifest)?;
        }
    }
    Ok(removed_stats)
}
/// The manifest currently selected to receive appended data files, if it has
/// not yet been consumed by an `append*` call.
pub(crate) fn selected_data_manifest(&self) -> Option<&ManifestListEntry> {
    self.selected_data_manifest.as_ref()
}
/// Allocates the next unique manifest file location for this commit by
/// combining the table location, the commit uuid and a running counter.
fn next_manifest_location(&mut self) -> String {
    let location = new_manifest_location(
        &self.table_metadata.location,
        &self.commit_uuid,
        self.manifest_count,
    );
    self.manifest_count += 1;
    location
}
}