use std::collections::{HashMap, HashSet};
use std::future::Future;
use std::ops::RangeFrom;
use uuid::Uuid;
use crate::error::Result;
use crate::spec::{
DataFile, DataFileFormat, FormatVersion, MAIN_BRANCH, ManifestContentType, ManifestEntry,
ManifestFile, ManifestListWriter, ManifestWriter, ManifestWriterBuilder, Operation, Snapshot,
SnapshotReference, SnapshotRetention, SnapshotSummaryCollector, Struct, StructType, Summary,
TableProperties, update_snapshot_summaries,
};
use crate::table::Table;
use crate::transaction::ActionCommit;
use crate::{Error, ErrorKind, TableRequirement, TableUpdate};
const META_ROOT_PATH: &str = "metadata";
pub(crate) trait SnapshotProduceOperation: Send + Sync {
fn operation(&self) -> Operation;
#[allow(unused)]
fn delete_entries(
&self,
snapshot_produce: &SnapshotProducer,
) -> impl Future<Output = Result<Vec<ManifestEntry>>> + Send;
fn existing_manifest(
&self,
snapshot_produce: &SnapshotProducer<'_>,
) -> impl Future<Output = Result<Vec<ManifestFile>>> + Send;
}
pub(crate) struct DefaultManifestProcess;
impl ManifestProcess for DefaultManifestProcess {
fn process_manifests(
&self,
_snapshot_produce: &SnapshotProducer<'_>,
manifests: Vec<ManifestFile>,
) -> Vec<ManifestFile> {
manifests
}
}
pub(crate) trait ManifestProcess: Send + Sync {
fn process_manifests(
&self,
snapshot_produce: &SnapshotProducer<'_>,
manifests: Vec<ManifestFile>,
) -> Vec<ManifestFile>;
}
pub(crate) struct SnapshotProducer<'a> {
pub(crate) table: &'a Table,
snapshot_id: i64,
commit_uuid: Uuid,
key_metadata: Option<Vec<u8>>,
snapshot_properties: HashMap<String, String>,
added_data_files: Vec<DataFile>,
manifest_counter: RangeFrom<u64>,
}
impl<'a> SnapshotProducer<'a> {
pub(crate) fn new(
table: &'a Table,
commit_uuid: Uuid,
key_metadata: Option<Vec<u8>>,
snapshot_properties: HashMap<String, String>,
added_data_files: Vec<DataFile>,
) -> Self {
Self {
table,
snapshot_id: Self::generate_unique_snapshot_id(table),
commit_uuid,
key_metadata,
snapshot_properties,
added_data_files,
manifest_counter: (0..),
}
}
pub(crate) fn validate_added_data_files(&self) -> Result<()> {
for data_file in &self.added_data_files {
if data_file.content_type() != crate::spec::DataContentType::Data {
return Err(Error::new(
ErrorKind::DataInvalid,
"Only data content type is allowed for fast append",
));
}
if self.table.metadata().default_partition_spec_id() != data_file.partition_spec_id {
return Err(Error::new(
ErrorKind::DataInvalid,
"Data file partition spec id does not match table default partition spec id",
));
}
Self::validate_partition_value(
data_file.partition(),
self.table.metadata().default_partition_type(),
)?;
}
Ok(())
}
pub(crate) async fn validate_duplicate_files(&self) -> Result<()> {
let new_files: HashSet<&str> = self
.added_data_files
.iter()
.map(|df| df.file_path.as_str())
.collect();
let mut referenced_files = Vec::new();
if let Some(current_snapshot) = self.table.metadata().current_snapshot() {
let manifest_list = current_snapshot
.load_manifest_list(self.table.file_io(), &self.table.metadata_ref())
.await?;
for manifest_list_entry in manifest_list.entries() {
let manifest = manifest_list_entry
.load_manifest(self.table.file_io())
.await?;
for entry in manifest.entries() {
let file_path = entry.file_path();
if new_files.contains(file_path) && entry.is_alive() {
referenced_files.push(file_path.to_string());
}
}
}
}
if !referenced_files.is_empty() {
return Err(Error::new(
ErrorKind::DataInvalid,
format!(
"Cannot add files that are already referenced by table, files: {}",
referenced_files.join(", ")
),
));
}
Ok(())
}
fn generate_unique_snapshot_id(table: &Table) -> i64 {
let generate_random_id = || -> i64 {
let (lhs, rhs) = Uuid::new_v4().as_u64_pair();
let snapshot_id = (lhs ^ rhs) as i64;
if snapshot_id < 0 {
-snapshot_id
} else {
snapshot_id
}
};
let mut snapshot_id = generate_random_id();
while table
.metadata()
.snapshots()
.any(|s| s.snapshot_id() == snapshot_id)
{
snapshot_id = generate_random_id();
}
snapshot_id
}
fn new_manifest_writer(&mut self, content: ManifestContentType) -> Result<ManifestWriter> {
let new_manifest_path = format!(
"{}/{}/{}-m{}.{}",
self.table.metadata().location(),
META_ROOT_PATH,
self.commit_uuid,
self.manifest_counter.next().unwrap(),
DataFileFormat::Avro
);
let output_file = self.table.file_io().new_output(new_manifest_path)?;
let builder = ManifestWriterBuilder::new(
output_file,
Some(self.snapshot_id),
self.key_metadata.clone(),
self.table.metadata().current_schema().clone(),
self.table
.metadata()
.default_partition_spec()
.as_ref()
.clone(),
);
match self.table.metadata().format_version() {
FormatVersion::V1 => Ok(builder.build_v1()),
FormatVersion::V2 => match content {
ManifestContentType::Data => Ok(builder.build_v2_data()),
ManifestContentType::Deletes => Ok(builder.build_v2_deletes()),
},
FormatVersion::V3 => match content {
ManifestContentType::Data => Ok(builder.build_v3_data()),
ManifestContentType::Deletes => Ok(builder.build_v3_deletes()),
},
}
}
fn validate_partition_value(
partition_value: &Struct,
partition_type: &StructType,
) -> Result<()> {
if partition_value.fields().len() != partition_type.fields().len() {
return Err(Error::new(
ErrorKind::DataInvalid,
"Partition value is not compatible with partition type",
));
}
for (value, field) in partition_value.fields().iter().zip(partition_type.fields()) {
let field = field.field_type.as_primitive_type().ok_or_else(|| {
Error::new(
ErrorKind::Unexpected,
"Partition field should only be primitive type.",
)
})?;
if let Some(value) = value
&& !field.compatible(&value.as_primitive_literal().unwrap())
{
return Err(Error::new(
ErrorKind::DataInvalid,
"Partition value is not compatible partition type",
));
}
}
Ok(())
}
async fn write_added_manifest(&mut self) -> Result<ManifestFile> {
let added_data_files = std::mem::take(&mut self.added_data_files);
if added_data_files.is_empty() {
return Err(Error::new(
ErrorKind::PreconditionFailed,
"No added data files found when write an added manifest file",
));
}
let snapshot_id = self.snapshot_id;
let format_version = self.table.metadata().format_version();
let manifest_entries = added_data_files.into_iter().map(|data_file| {
let builder = ManifestEntry::builder()
.status(crate::spec::ManifestStatus::Added)
.data_file(data_file);
if format_version == FormatVersion::V1 {
builder.snapshot_id(snapshot_id).build()
} else {
builder.build()
}
});
let mut writer = self.new_manifest_writer(ManifestContentType::Data)?;
for entry in manifest_entries {
writer.add_entry(entry)?;
}
writer.write_manifest_file().await
}
async fn manifest_file<OP: SnapshotProduceOperation, MP: ManifestProcess>(
&mut self,
snapshot_produce_operation: &OP,
manifest_process: &MP,
) -> Result<Vec<ManifestFile>> {
if self.added_data_files.is_empty() && self.snapshot_properties.is_empty() {
return Err(Error::new(
ErrorKind::PreconditionFailed,
"No added data files or added snapshot properties found when write a manifest file",
));
}
let existing_manifests = snapshot_produce_operation.existing_manifest(self).await?;
let mut manifest_files = existing_manifests;
if !self.added_data_files.is_empty() {
let added_manifest = self.write_added_manifest().await?;
manifest_files.push(added_manifest);
}
let manifest_files = manifest_process.process_manifests(self, manifest_files);
Ok(manifest_files)
}
fn summary<OP: SnapshotProduceOperation>(
&self,
snapshot_produce_operation: &OP,
) -> Result<Summary> {
let mut summary_collector = SnapshotSummaryCollector::default();
let table_metadata = self.table.metadata_ref();
let partition_summary_limit = if let Some(limit) = table_metadata
.properties()
.get(TableProperties::PROPERTY_WRITE_PARTITION_SUMMARY_LIMIT)
{
if let Ok(limit) = limit.parse::<u64>() {
limit
} else {
TableProperties::PROPERTY_WRITE_PARTITION_SUMMARY_LIMIT_DEFAULT
}
} else {
TableProperties::PROPERTY_WRITE_PARTITION_SUMMARY_LIMIT_DEFAULT
};
summary_collector.set_partition_summary_limit(partition_summary_limit);
for data_file in &self.added_data_files {
summary_collector.add_file(
data_file,
table_metadata.current_schema().clone(),
table_metadata.default_partition_spec().clone(),
);
}
let previous_snapshot = table_metadata
.snapshot_by_id(self.snapshot_id)
.and_then(|snapshot| snapshot.parent_snapshot_id())
.and_then(|parent_id| table_metadata.snapshot_by_id(parent_id));
let mut additional_properties = summary_collector.build();
additional_properties.extend(self.snapshot_properties.clone());
let summary = Summary {
operation: snapshot_produce_operation.operation(),
additional_properties,
};
update_snapshot_summaries(
summary,
previous_snapshot.map(|s| s.summary()),
snapshot_produce_operation.operation() == Operation::Overwrite,
)
}
fn generate_manifest_list_file_path(&self, attempt: i64) -> String {
format!(
"{}/{}/snap-{}-{}-{}.{}",
self.table.metadata().location(),
META_ROOT_PATH,
self.snapshot_id,
attempt,
self.commit_uuid,
DataFileFormat::Avro
)
}
pub(crate) async fn commit<OP: SnapshotProduceOperation, MP: ManifestProcess>(
mut self,
snapshot_produce_operation: OP,
process: MP,
) -> Result<ActionCommit> {
let manifest_list_path = self.generate_manifest_list_file_path(0);
let next_seq_num = self.table.metadata().next_sequence_number();
let first_row_id = self.table.metadata().next_row_id();
let mut manifest_list_writer = match self.table.metadata().format_version() {
FormatVersion::V1 => ManifestListWriter::v1(
self.table
.file_io()
.new_output(manifest_list_path.clone())?,
self.snapshot_id,
self.table.metadata().current_snapshot_id(),
),
FormatVersion::V2 => ManifestListWriter::v2(
self.table
.file_io()
.new_output(manifest_list_path.clone())?,
self.snapshot_id,
self.table.metadata().current_snapshot_id(),
next_seq_num,
),
FormatVersion::V3 => ManifestListWriter::v3(
self.table
.file_io()
.new_output(manifest_list_path.clone())?,
self.snapshot_id,
self.table.metadata().current_snapshot_id(),
next_seq_num,
Some(first_row_id),
),
};
let summary = self.summary(&snapshot_produce_operation).map_err(|err| {
Error::new(ErrorKind::Unexpected, "Failed to create snapshot summary.").with_source(err)
})?;
let new_manifests = self
.manifest_file(&snapshot_produce_operation, &process)
.await?;
manifest_list_writer.add_manifests(new_manifests.into_iter())?;
let writer_next_row_id = manifest_list_writer.next_row_id();
manifest_list_writer.close().await?;
let commit_ts = chrono::Utc::now().timestamp_millis();
let new_snapshot = Snapshot::builder()
.with_manifest_list(manifest_list_path)
.with_snapshot_id(self.snapshot_id)
.with_parent_snapshot_id(self.table.metadata().current_snapshot_id())
.with_sequence_number(next_seq_num)
.with_summary(summary)
.with_schema_id(self.table.metadata().current_schema_id())
.with_timestamp_ms(commit_ts);
let new_snapshot = if let Some(writer_next_row_id) = writer_next_row_id {
let assigned_rows = writer_next_row_id - self.table.metadata().next_row_id();
new_snapshot
.with_row_range(first_row_id, assigned_rows)
.build()
} else {
new_snapshot.build()
};
let updates = vec![
TableUpdate::AddSnapshot {
snapshot: new_snapshot,
},
TableUpdate::SetSnapshotRef {
ref_name: MAIN_BRANCH.to_string(),
reference: SnapshotReference::new(
self.snapshot_id,
SnapshotRetention::branch(None, None, None),
),
},
];
let requirements = vec![
TableRequirement::UuidMatch {
uuid: self.table.metadata().uuid(),
},
TableRequirement::RefSnapshotIdMatch {
r#ref: MAIN_BRANCH.to_string(),
snapshot_id: self.table.metadata().current_snapshot_id(),
},
];
Ok(ActionCommit::new(updates, requirements))
}
}