use std::collections::HashSet;
use std::sync::Arc;
use std::time::Duration;
use thiserror::Error;
use crate::catalog::{Catalog, TableIdentifier};
use crate::manifest::{DataFile, Operation, Snapshot};
use crate::metadata::{MetadataError, TableMetadata};
#[derive(Debug, Error)]
pub enum TransactionError {
#[error(
"commit conflict: base sequence {base_sequence} is stale, current is {current_sequence}"
)]
CommitConflict {
base_sequence: i64,
current_sequence: i64,
},
#[error("max retry attempts ({attempts}) exceeded")]
MaxRetriesExceeded { attempts: u32 },
#[error("transaction has no changes to commit")]
NoChanges,
#[error("metadata error: {0}")]
Metadata(#[from] MetadataError),
#[error("io error: {0}")]
Io(String),
#[error("validation error: {0}")]
Validation(String),
#[error("commit failed: {0}")]
CommitFailed(String),
}
/// Result alias whose error type is [`TransactionError`].
pub type TransactionResult<T> = Result<T, TransactionError>;
/// Isolation level a transaction can be configured with.
///
/// NOTE(review): the exact guarantees of each level are not defined in this
/// part of the file — confirm against the code that reads the setting.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum IsolationLevel {
    /// The level returned by `IsolationLevel::default()`.
    #[default]
    SnapshotIsolation,
    /// Alternative, non-default level.
    ReadUncommitted,
}
/// Retry policy: an attempt budget plus exponential-backoff parameters.
#[derive(Debug, Clone)]
pub struct RetryConfig {
    /// Number of attempts allowed before giving up.
    pub max_attempts: u32,
    /// Delay returned for attempt `1`.
    pub initial_delay: Duration,
    /// Upper bound applied to every computed delay.
    pub max_delay: Duration,
    /// Factor the delay is multiplied by for each attempt after the first.
    pub backoff_multiplier: f64,
}

impl Default for RetryConfig {
    /// Five attempts, starting at 100 ms, doubling each time, capped at 10 s.
    fn default() -> Self {
        Self {
            max_attempts: 5,
            initial_delay: Duration::from_millis(100),
            max_delay: Duration::from_secs(10),
            backoff_multiplier: 2.0,
        }
    }
}

impl RetryConfig {
    /// Returns a configuration with a single attempt; the other fields keep
    /// their default values.
    pub fn no_retry() -> Self {
        Self {
            max_attempts: 1,
            ..Default::default()
        }
    }

    /// Computes the backoff delay for the given attempt number.
    ///
    /// Attempt `0` yields [`Duration::ZERO`]. Attempt `n >= 1` yields
    /// `initial_delay * backoff_multiplier^(n - 1)`, computed in whole
    /// milliseconds and capped at `max_delay`.
    pub fn delay_for_attempt(&self, attempt: u32) -> Duration {
        if attempt == 0 {
            return Duration::ZERO;
        }
        // `powi` takes an `i32`. Clamp rather than cast: a plain `as i32` turns
        // attempt numbers above `i32::MAX` into a negative exponent (shrinking
        // the delay) and can overflow the `- 1`, which panics in debug builds.
        let exponent = (attempt - 1).min(i32::MAX as u32) as i32;
        let uncapped_ms =
            self.initial_delay.as_millis() as f64 * self.backoff_multiplier.powi(exponent);
        let cap_ms = self.max_delay.as_millis() as f64;
        // The float-to-integer `as` cast saturates, so the capped value always fits.
        Duration::from_millis(uncapped_ms.min(cap_ms) as u64)
    }
}
/// A set of staged table changes (added files, deleted file paths, an optional
/// new schema) together with the metadata they are based on.
#[derive(Debug)]
pub struct Transaction {
    /// Table location copied from the base metadata.
    table_location: String,
    /// Metadata the staged changes are applied on top of.
    base_metadata: TableMetadata,
    /// `last_sequence_number` of the base metadata.
    base_sequence_number: i64,
    /// Data files staged for addition.
    added_files: Vec<DataFile>,
    /// Paths of data files staged for deletion.
    deleted_file_paths: HashSet<String>,
    /// Operation recorded on the snapshot this transaction produces.
    operation: Operation,
    /// Isolation level chosen for this transaction.
    isolation_level: IsolationLevel,
    /// Retry policy used by `commit`.
    retry_config: RetryConfig,
    /// Replacement schema, if a schema change is staged.
    new_schema: Option<crate::schema::Schema>,
    /// Extra key/value pairs added to the snapshot summary.
    summary_properties: std::collections::HashMap<String, String>,
}
impl Transaction {
/// Creates an empty transaction on top of `metadata`.
///
/// The transaction starts as an `Operation::Append` with the default isolation
/// level and retry configuration, and with nothing staged.
pub fn new(metadata: TableMetadata) -> Self {
    Self {
        // These two read from `metadata` and must come before it is moved
        // into `base_metadata`.
        table_location: metadata.location.clone(),
        base_sequence_number: metadata.last_sequence_number,
        base_metadata: metadata,
        added_files: Vec::new(),
        deleted_file_paths: HashSet::new(),
        operation: Operation::Append,
        isolation_level: IsolationLevel::default(),
        retry_config: RetryConfig::default(),
        new_schema: None,
        summary_properties: std::collections::HashMap::new(),
    }
}
/// Builder-style setter: replaces the operation and returns the transaction.
pub fn with_operation(self, operation: Operation) -> Self {
    let mut tx = self;
    tx.operation = operation;
    tx
}
/// Builder-style setter: replaces the isolation level and returns the transaction.
pub fn with_isolation_level(self, level: IsolationLevel) -> Self {
    let mut tx = self;
    tx.isolation_level = level;
    tx
}
/// Builder-style setter: replaces the retry configuration and returns the transaction.
pub fn with_retry_config(self, config: RetryConfig) -> Self {
    let mut tx = self;
    tx.retry_config = config;
    tx
}
/// Stages every file yielded by `files` for addition, in iteration order.
pub fn add_files(&mut self, files: impl IntoIterator<Item = DataFile>) {
    let incoming = files.into_iter();
    self.added_files.extend(incoming);
}
/// Stages a single file for addition.
pub fn add_file(&mut self, file: DataFile) {
    self.added_files.extend(std::iter::once(file));
}
/// Stages every path yielded by `paths` for deletion; repeated paths are
/// stored once because the paths are kept in a set.
pub fn delete_files(&mut self, paths: impl IntoIterator<Item = String>) {
    for path in paths {
        self.deleted_file_paths.insert(path);
    }
}
/// Stages a single path for deletion.
pub fn delete_file(&mut self, path: impl Into<String>) {
    let owned_path: String = path.into();
    self.deleted_file_paths.insert(owned_path);
}
/// Stages `schema` as the table's new schema, replacing any previously staged one.
pub fn set_schema(&mut self, schema: crate::schema::Schema) {
    self.new_schema = Option::from(schema);
}
/// Builder-style setter: records an extra snapshot summary property.
///
/// A later call with the same key overwrites the earlier value.
pub fn with_summary_property(
    self,
    key: impl Into<String>,
    value: impl Into<String>,
) -> Self {
    let (name, text) = (key.into(), value.into());
    let mut tx = self;
    tx.summary_properties.insert(name, text);
    tx
}
/// Returns `true` when at least one file addition, file deletion or schema
/// change is staged. Summary properties alone do not count as a change.
pub fn has_changes(&self) -> bool {
    let nothing_staged = self.added_files.is_empty()
        && self.deleted_file_paths.is_empty()
        && self.new_schema.is_none();
    !nothing_staged
}
pub fn added_files_count(&self) -> usize {
self.added_files.len()
}
pub fn deleted_files_count(&self) -> usize {
self.deleted_file_paths.len()
}
/// Sum of `record_count` over all files staged for addition.
pub fn added_records_count(&self) -> i64 {
    self.added_files
        .iter()
        .fold(0i64, |total, file| total + file.record_count)
}
/// Commits the staged changes through `catalog`, retrying failed submissions.
///
/// Every attempt:
/// 1. loads the table's current metadata,
/// 2. checks its sequence number against the transaction's base,
/// 3. builds the new metadata with `prepare_commit`, and
/// 4. submits it with `commit_table`.
///
/// When the submission fails and attempts remain, the method sleeps for
/// `retry_config.delay_for_attempt(attempts)`, reloads the table metadata as
/// the new base and starts over.
///
/// # Errors
///
/// * [`TransactionError::NoChanges`] when nothing is staged.
/// * [`TransactionError::Io`] when loading the table fails, or when
///   `commit_table` still fails on the last permitted attempt.
/// * [`TransactionError::CommitConflict`] when a freshly loaded sequence number
///   differs from the base; this is returned immediately and is not retried.
/// * Any error from `prepare_commit` (for example a rejected schema change).
/// * [`TransactionError::CommitFailed`] when the committed metadata has no
///   current snapshot.
#[tracing::instrument(skip(self, catalog), fields(operation = ?self.operation))]
pub async fn commit(mut self, catalog: Arc<dyn Catalog>) -> TransactionResult<Snapshot> {
    if !self.has_changes() {
        return Err(TransactionError::NoChanges);
    }
    let identifier = TableIdentifier::parse(&self.table_location);
    let mut attempts = 0;
    loop {
        attempts += 1;
        let current_metadata = catalog
            .load_table(&identifier)
            .await
            .map_err(|e| TransactionError::Io(e.to_string()))?;
        // NOTE(review): `isolation_level` is not consulted here; the check runs
        // for every level. Confirm that is intended.
        self.detect_conflicts(&current_metadata)?;
        let prepared_commit = self.prepare_commit()?;
        match catalog
            .commit_table(
                &identifier,
                prepared_commit.base_sequence,
                prepared_commit.new_metadata,
            )
            .await
        {
            Ok(committed_metadata) => {
                // Build the error lazily so the success path does not allocate.
                return committed_metadata
                    .current_snapshot()
                    .cloned()
                    .ok_or_else(|| {
                        TransactionError::CommitFailed("No current snapshot found".to_string())
                    });
            }
            Err(e) => {
                if attempts >= self.retry_config.max_attempts {
                    // NOTE(review): exhausted retries surface as `Io` with the
                    // last error text; `MaxRetriesExceeded` exists but is not
                    // used here. Kept as-is so callers matching on `Io` keep working.
                    return Err(TransactionError::Io(e.to_string()));
                }
                tracing::warn!(
                    "Commit might have failed or conflicted. Retrying (attempt {}/{})",
                    attempts,
                    self.retry_config.max_attempts
                );
                tokio::time::sleep(self.retry_config.delay_for_attempt(attempts)).await;
                // Rebase onto the table's latest state so the next attempt
                // passes the sequence check and builds on current metadata.
                self.base_metadata = catalog
                    .load_table(&identifier)
                    .await
                    .map_err(|e| TransactionError::Io(e.to_string()))?;
                self.base_sequence_number = self.base_metadata.last_sequence_number;
            }
        }
    }
}
fn validate(&self) -> TransactionResult<()> {
if !self.has_changes() {
return Err(TransactionError::NoChanges);
}
if let Some(ref new_schema) = self.new_schema {
let current_schema = self.base_metadata.current_schema();
let validator =
crate::validation::SchemaCompatibilityValidator::new(current_schema.clone());
validator.validate(new_schema).map_err(|e| {
TransactionError::Validation(format!("Schema evolution error: {}", e))
})?;
}
Ok(())
}
/// Compares the sequence number of `current_metadata` with the transaction's
/// base sequence number and reports a `CommitConflict` when they differ.
#[allow(unused)]
fn detect_conflicts(&self, current_metadata: &TableMetadata) -> TransactionResult<()> {
    let expected = self.base_sequence_number;
    let observed = current_metadata.last_sequence_number;
    if observed == expected {
        Ok(())
    } else {
        Err(TransactionError::CommitConflict {
            base_sequence: expected,
            current_sequence: observed,
        })
    }
}
/// Builds the snapshot describing this transaction's changes.
///
/// The summary gets `added-data-files`, `added-records`, `added-files-size`
/// and `deleted-data-files` first, then the user-supplied summary properties.
/// The base metadata's current snapshot, if any, becomes the parent.
///
/// NOTE(review): the schema id recorded is the base metadata's current one,
/// even when a new schema is staged — confirm that is intended.
fn build_snapshot(&self, sequence_number: i64, manifest_list_path: &str) -> Snapshot {
    let (record_total, byte_total) = self.added_files.iter().fold(
        (0i64, 0i64),
        |(records, bytes), file| (records + file.record_count, bytes + file.file_size_in_bytes),
    );
    let added_count = self.added_files.len();
    let deleted_count = self.deleted_file_paths.len();

    let base = Snapshot::builder(sequence_number, manifest_list_path)
        .with_operation(self.operation)
        .with_sequence_number(sequence_number)
        .with_schema_id(self.base_metadata.current_schema_id)
        .with_summary_property("added-data-files", added_count.to_string())
        .with_summary_property("added-records", record_total.to_string())
        .with_summary_property("added-files-size", byte_total.to_string())
        .with_summary_property("deleted-data-files", deleted_count.to_string());

    // User-supplied properties are applied after the computed ones.
    let with_extras = self
        .summary_properties
        .iter()
        .fold(base, |builder, (key, value)| {
            builder.with_summary_property(key.clone(), value.clone())
        });

    match self.base_metadata.current_snapshot_id {
        Some(parent_id) => with_extras.with_parent(parent_id).build(),
        None => with_extras.build(),
    }
}
pub fn prepare_commit(&self) -> TransactionResult<PreparedCommit> {
self.validate()?;
let new_sequence = self.base_metadata.last_sequence_number + 1;
let manifest_list_path = format!(
"{}/metadata/snap-{}-manifest-list.avro",
self.table_location, new_sequence
);
let snapshot = self.build_snapshot(new_sequence, &manifest_list_path);
let mut new_metadata = self.base_metadata.clone();
if let Some(ref new_schema) = self.new_schema {
new_metadata.add_schema(new_schema.clone());
new_metadata.current_schema_id = new_schema.schema_id;
}
new_metadata.update_metrics(&self.added_files, &self.deleted_file_paths);
new_metadata.add_snapshot(snapshot);
Ok(PreparedCommit {
base_sequence: self.base_sequence_number,
new_metadata,
added_files: self.added_files.clone(),
deleted_files: self.deleted_file_paths.clone(),
manifest_list_path,
})
}
}
/// The output of `Transaction::prepare_commit`: the new metadata plus the
/// inputs it was derived from.
#[derive(Debug, Clone)]
pub struct PreparedCommit {
    /// Sequence number the transaction was based on.
    pub base_sequence: i64,
    /// Table metadata with the transaction's changes applied.
    pub new_metadata: TableMetadata,
    /// Copy of the files staged for addition.
    pub added_files: Vec<DataFile>,
    /// Copy of the paths staged for deletion.
    pub deleted_files: HashSet<String>,
    /// Manifest list path generated for the new snapshot.
    pub manifest_list_path: String,
}
impl PreparedCommit {
    /// Current snapshot id of the prepared metadata, if it has one.
    pub fn snapshot_id(&self) -> Option<i64> {
        let Self { new_metadata, .. } = self;
        new_metadata.current_snapshot_id
    }

    /// Last sequence number of the prepared metadata.
    pub fn sequence_number(&self) -> i64 {
        let Self { new_metadata, .. } = self;
        new_metadata.last_sequence_number
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::manifest::FileFormat;
    use crate::schema::{Schema, Type};

    /// Metadata for a two-field table located at `s3://bucket/table`.
    fn sample_metadata() -> TableMetadata {
        let schema = Schema::builder(0)
            .with_field(1, "id", Type::Long, true)
            .with_field(2, "data", Type::String, false)
            .build();
        TableMetadata::builder("s3://bucket/table", schema).build()
    }

    /// A fresh transaction over [`sample_metadata`].
    fn sample_transaction() -> Transaction {
        Transaction::new(sample_metadata())
    }

    /// A Parquet data file with 1000 records and a size of 1 MiB.
    fn sample_file() -> DataFile {
        DataFile::new("/data/file.parquet", FileFormat::Parquet, 1000, 1024 * 1024)
    }

    #[test]
    fn test_transaction_creation() {
        let tx = sample_transaction();
        assert!(!tx.has_changes());
        assert_eq!(tx.added_files_count(), 0);
        assert_eq!(tx.deleted_files_count(), 0);
    }

    #[test]
    fn test_add_files() {
        let mut tx = sample_transaction();
        tx.add_file(sample_file());
        assert!(tx.has_changes());
        assert_eq!(tx.added_files_count(), 1);
        assert_eq!(tx.added_records_count(), 1000);
    }

    #[test]
    fn test_delete_files() {
        let mut tx = sample_transaction();
        for path in ["/data/old-file.parquet", "/data/another-file.parquet"] {
            tx.delete_file(path);
        }
        assert!(tx.has_changes());
        assert_eq!(tx.deleted_files_count(), 2);
    }

    #[test]
    fn test_prepare_commit() {
        let mut tx = sample_transaction();
        tx.add_file(sample_file());
        let prepared = tx.prepare_commit().unwrap();
        assert_eq!(prepared.base_sequence, 0);
        assert_eq!(prepared.sequence_number(), 1);
        assert_eq!(prepared.added_files.len(), 1);
    }

    #[test]
    fn test_empty_transaction_fails() {
        let outcome = sample_transaction().prepare_commit();
        assert!(matches!(outcome, Err(TransactionError::NoChanges)));
    }

    #[test]
    fn test_retry_config_delay() {
        let config = RetryConfig::default();
        // (attempt number, expected delay in milliseconds)
        let expectations = [(0u32, 0u64), (1, 100), (2, 200), (3, 400)];
        for (attempt, expected_ms) in expectations {
            assert_eq!(
                config.delay_for_attempt(attempt),
                Duration::from_millis(expected_ms)
            );
        }
    }
}