use iceberg_rust_spec::{
manifest::Content, manifest::DataFile, manifest::ManifestEntry,
manifest_list::ManifestListEntry,
};
use smallvec::SmallVec;
use std::cmp::Ordering;
use std::collections::HashMap;
use crate::{
error::Error,
table::manifest_list::ManifestListReader,
util::{cmp_with_priority, partition_struct_to_vec, summary_to_rectangle, try_sub, Rectangle},
};
/// Splits `files` into two groups, `[smaller, larger]`, by bisecting `rect`.
///
/// Each entry's position is its partition values for the fields in `names`. An entry goes to
/// the `larger` group when its offset from `rect.min` compares `Greater` than its offset to
/// `rect.max` (computed with `try_sub` and compared with `cmp_with_priority`). Otherwise, ties
/// included, it goes to `smaller`. Each group is returned together with the bounding rectangle
/// of its entries' positions.
///
/// If `rect.min` and `rect.max` compare equal, positions cannot separate the entries. In that
/// case the entries are split in half by iteration order and both groups get an empty rectangle.
/// A group that receives no entries also gets an empty rectangle rather than causing a panic.
/// This happens, for example, when `files` is empty or when no entry's offset compares
/// `Greater`.
///
/// # Errors
/// Propagates errors yielded by `files` and errors from `partition_struct_to_vec`, `try_sub`
/// and `cmp_with_priority`.
#[allow(clippy::type_complexity)] // a pair of (entries, bounds) reads clearer inline than as an alias
fn split_datafiles_once(
    files: impl Iterator<Item = Result<ManifestEntry, Error>>,
    rect: Rectangle,
    names: &[&str],
) -> Result<[(Vec<ManifestEntry>, Rectangle); 2], Error> {
    if let Ordering::Equal = cmp_with_priority(&rect.min, &rect.max)? {
        // Degenerate rectangle: fall back to splitting by count.
        let mut smaller = files.collect::<Result<Vec<_>, Error>>()?;
        let larger = smaller.split_off(smaller.len() / 2);
        return Ok([
            (smaller, Rectangle::new(SmallVec::new(), SmallVec::new())),
            (larger, Rectangle::new(SmallVec::new(), SmallVec::new())),
        ]);
    }
    let mut smaller: Vec<ManifestEntry> = Vec::new();
    let mut larger: Vec<ManifestEntry> = Vec::new();
    let mut smaller_rect: Option<Rectangle> = None;
    let mut larger_rect: Option<Rectangle> = None;
    for manifest_entry in files {
        let manifest_entry = manifest_entry?;
        let position = partition_struct_to_vec(manifest_entry.data_file().partition(), names)?;
        let is_larger = cmp_with_priority(
            &try_sub(&position, &rect.min)?,
            &try_sub(&rect.max, &position)?,
        )? == Ordering::Greater;
        let (group, bounds) = if is_larger {
            (&mut larger, &mut larger_rect)
        } else {
            (&mut smaller, &mut smaller_rect)
        };
        group.push(manifest_entry);
        // Grow the group's bounding box, or start it at this entry's position.
        match bounds {
            Some(existing) => existing.expand_with_node(position),
            None => *bounds = Some(Rectangle::new(position.clone(), position)),
        }
    }
    // An empty group has no positions to bound; use the same empty placeholder as the
    // degenerate branch instead of panicking.
    let empty = || Rectangle::new(SmallVec::new(), SmallVec::new());
    Ok([
        (smaller, smaller_rect.unwrap_or_else(empty)),
        (larger, larger_rect.unwrap_or_else(empty)),
    ])
}
pub(crate) fn split_datafiles(
files: impl Iterator<Item = Result<ManifestEntry, Error>>,
rect: Rectangle,
names: &[&str],
n_split: u32,
) -> Result<Vec<Vec<ManifestEntry>>, Error> {
let [(smaller, smaller_rect), (larger, larger_rect)] =
split_datafiles_once(files, rect, names)?;
if n_split == 1 {
let mut result = Vec::new();
if !smaller.is_empty() {
result.push(smaller);
}
if !larger.is_empty() {
result.push(larger);
}
Ok(result)
} else {
let mut smaller = split_datafiles(
smaller.into_iter().map(Ok),
smaller_rect,
names,
n_split - 1,
)?;
let mut larger =
split_datafiles(larger.into_iter().map(Ok), larger_rect, names, n_split - 1)?;
smaller.append(&mut larger);
Ok(smaller)
}
}
/// Outcome of scanning a manifest list to pick the manifests selected for insert.
pub(crate) struct SelectedManifest {
    /// The data manifest that was selected; the `select_manifest_*` functions in this file do
    /// not write it to their manifest list writer.
    pub data_manifest: ManifestListEntry,
    /// The selected delete manifest, or `None` if the manifest list held no delete manifest.
    pub delete_manifest: Option<ManifestListEntry>,
    /// Sum of `added_files_count` over every manifest in the scanned list.
    pub file_count_all_entries: usize,
}
pub(crate) fn select_manifest_partitioned(
manifest_list_reader: ManifestListReader<&[u8]>,
manifest_list_writer: &mut apache_avro::Writer<Vec<u8>>,
bounding_partition_values: &Rectangle,
) -> Result<SelectedManifest, Error> {
let mut selected_data_state = None;
let mut selected_delete_state = None;
let mut file_count_all_entries = 0;
for manifest_res in manifest_list_reader {
let manifest = manifest_res?;
let mut bounds =
summary_to_rectangle(manifest.partitions.as_ref().ok_or(Error::NotFound(format!(
"Partition struct in manifest {}",
manifest.manifest_path
)))?)?;
bounds.expand(bounding_partition_values);
file_count_all_entries += manifest.added_files_count.unwrap_or(0) as usize;
match manifest.content {
iceberg_rust_spec::manifest_list::Content::Data => {
let Some((selected_bounds, selected_manifest)) = &selected_data_state else {
selected_data_state = Some((bounds, manifest));
continue;
};
match selected_bounds.cmp_with_priority(&bounds)? {
Ordering::Greater => {
manifest_list_writer.append_ser(selected_manifest)?;
selected_data_state = Some((bounds, manifest));
continue;
}
_ => {
manifest_list_writer.append_ser(manifest)?;
continue;
}
}
}
iceberg_rust_spec::manifest_list::Content::Deletes => {
let Some((selected_bounds, selected_manifest)) = &selected_delete_state else {
selected_delete_state = Some((bounds, manifest));
continue;
};
match selected_bounds.cmp_with_priority(&bounds)? {
Ordering::Greater => {
manifest_list_writer.append_ser(selected_manifest)?;
selected_delete_state = Some((bounds, manifest));
continue;
}
_ => {
manifest_list_writer.append_ser(manifest)?;
continue;
}
}
}
}
}
let (_, data_manifest) =
selected_data_state.ok_or(Error::NotFound("Manifest for insert".to_owned()))?;
Ok(SelectedManifest {
data_manifest,
delete_manifest: selected_delete_state.map(|(_, x)| x),
file_count_all_entries,
})
}
pub(crate) fn select_manifest_unpartitioned(
manifest_list_reader: ManifestListReader<&[u8]>,
manifest_list_writer: &mut apache_avro::Writer<Vec<u8>>,
) -> Result<SelectedManifest, Error> {
let mut selected_data_state = None;
let mut selected_delete_state = None;
let mut file_count_all_entries = 0;
for manifest_res in manifest_list_reader {
let manifest = manifest_res?;
let row_count = manifest.added_rows_count;
file_count_all_entries += manifest.added_files_count.unwrap_or(0) as usize;
match manifest.content {
iceberg_rust_spec::manifest_list::Content::Data => {
let Some((selected_row_count, selected_manifest)) = &selected_data_state else {
selected_data_state = Some((row_count, manifest));
continue;
};
let Some(row_count) = row_count else {
selected_data_state = Some((row_count, manifest));
continue;
};
if selected_row_count.is_some_and(|x| x > row_count) {
manifest_list_writer.append_ser(selected_manifest)?;
selected_data_state = Some((Some(row_count), manifest));
continue;
} else {
manifest_list_writer.append_ser(manifest)?;
continue;
}
}
iceberg_rust_spec::manifest_list::Content::Deletes => {
let Some((selected_row_count, selected_manifest)) = &selected_delete_state else {
selected_delete_state = Some((row_count, manifest));
continue;
};
let Some(row_count) = row_count else {
selected_delete_state = Some((row_count, manifest));
continue;
};
if selected_row_count.is_some_and(|x| x > row_count) {
manifest_list_writer.append_ser(selected_manifest)?;
selected_delete_state = Some((Some(row_count), manifest));
continue;
} else {
manifest_list_writer.append_ser(manifest)?;
continue;
}
}
}
}
let (_, data_manifest) =
selected_data_state.ok_or(Error::NotFound("Manifest for insert".to_owned()))?;
Ok(SelectedManifest {
data_manifest,
delete_manifest: selected_delete_state.map(|(_, x)| x),
file_count_all_entries,
})
}
/// Builds the `added-*` summary properties for appending `files`.
///
/// Returns `None` for an empty slice. Otherwise only files whose content is `Content::Data`
/// are counted, so an append made only of delete files yields a summary of zeros.
pub(crate) fn append_summary(files: &[DataFile]) -> Option<HashMap<String, String>> {
    if files.is_empty() {
        return None;
    }
    // Accumulate (file count, record count, byte size) over the data files in one pass.
    let (file_count, record_count, byte_count) = files
        .iter()
        .filter(|file| *file.content() == Content::Data)
        .fold((0usize, 0i64, 0i64), |(count, records, bytes), file| {
            (
                count + 1,
                records + file.record_count(),
                bytes + file.file_size_in_bytes(),
            )
        });
    Some(HashMap::from([
        ("added-files-size".to_owned(), byte_count.to_string()),
        ("added-records".to_owned(), record_count.to_string()),
        ("added-data-files".to_owned(), file_count.to_string()),
    ]))
}