1use std::collections::HashSet;
38use std::sync::Arc;
39use std::time::Duration;
40
41use thiserror::Error;
43
44use crate::catalog::{Catalog, TableIdentifier};
45use crate::manifest::{DataFile, Operation, Snapshot};
46use crate::metadata::{MetadataError, TableMetadata};
47#[derive(Debug, Error)]
49pub enum TransactionError {
50 #[error(
52 "commit conflict: base sequence {base_sequence} is stale, current is {current_sequence}"
53 )]
54 CommitConflict {
55 base_sequence: i64,
56 current_sequence: i64,
57 },
58
59 #[error("max retry attempts ({attempts}) exceeded")]
61 MaxRetriesExceeded { attempts: u32 },
62
63 #[error("transaction has no changes to commit")]
65 NoChanges,
66
67 #[error("metadata error: {0}")]
69 Metadata(#[from] MetadataError),
70
71 #[error("io error: {0}")]
73 Io(String),
74
75 #[error("validation error: {0}")]
77 Validation(String),
78
79 #[error("commit failed: {0}")]
81 CommitFailed(String),
82}
83
84pub type TransactionResult<T> = Result<T, TransactionError>;
86
/// Isolation level requested for a transaction.
///
/// NOTE(review): the code visible in this file only stores the chosen level; no commit logic
/// reads it, so the exact guarantees of each variant must be confirmed elsewhere.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub enum IsolationLevel {
    /// The level used when none is chosen explicitly.
    #[default]
    SnapshotIsolation,

    /// Alternative level; presumably weaker than snapshot isolation — TODO confirm.
    ReadUncommitted,
}
102
/// Controls how often and how quickly a failed commit is retried.
#[derive(Debug, Clone)]
pub struct RetryConfig {
    /// Total number of commit attempts, counting the first one.
    pub max_attempts: u32,

    /// Delay applied before the first retry.
    pub initial_delay: Duration,

    /// Upper bound for any single delay.
    pub max_delay: Duration,

    /// Factor by which the delay grows from one retry to the next.
    pub backoff_multiplier: f64,
}

impl Default for RetryConfig {
    /// Five attempts, starting at 100 ms and doubling up to a 10 s cap.
    fn default() -> Self {
        Self {
            max_attempts: 5,
            initial_delay: Duration::from_millis(100),
            max_delay: Duration::from_secs(10),
            backoff_multiplier: 2.0,
        }
    }
}

impl RetryConfig {
    /// Returns a configuration that permits a single attempt and therefore never retries.
    ///
    /// The delay settings keep their default values.
    pub fn no_retry() -> Self {
        Self {
            max_attempts: 1,
            ..Default::default()
        }
    }

    /// Computes the delay to wait after the given (1-based) failed attempt.
    ///
    /// Attempt `0` yields [`Duration::ZERO`]. Attempt `n >= 1` yields
    /// `initial_delay * backoff_multiplier^(n - 1)`, evaluated in whole milliseconds and capped
    /// at `max_delay`.
    pub fn delay_for_attempt(&self, attempt: u32) -> Duration {
        if attempt == 0 {
            return Duration::ZERO;
        }

        // `powi` takes an `i32`. Clamp instead of casting so that attempt numbers above
        // `i32::MAX` cannot wrap into a negative exponent (or overflow the subtraction).
        let exponent = (attempt - 1).min(i32::MAX as u32) as i32;

        let cap_ms = self.max_delay.as_millis() as f64;
        let delay_ms =
            self.initial_delay.as_millis() as f64 * self.backoff_multiplier.powi(exponent);

        // The float-to-integer cast saturates, so an infinite product ends up at the cap and a
        // negative one at zero.
        Duration::from_millis(delay_ms.min(cap_ms) as u64)
    }
}
151
152#[derive(Debug)]
163pub struct Transaction {
164 table_location: String,
166
167 base_metadata: TableMetadata,
169
170 base_sequence_number: i64,
172
173 added_files: Vec<DataFile>,
175
176 deleted_file_paths: HashSet<String>,
178
179 operation: Operation,
181
182 isolation_level: IsolationLevel,
184
185 retry_config: RetryConfig,
187
188 new_schema: Option<crate::schema::Schema>,
190
191 summary_properties: std::collections::HashMap<String, String>,
193}
194
195impl Transaction {
196 pub fn new(metadata: TableMetadata) -> Self {
198 let base_sequence = metadata.last_sequence_number;
199 let location = metadata.location.clone();
200
201 Self {
202 table_location: location,
203 base_metadata: metadata,
204 base_sequence_number: base_sequence,
205 added_files: Vec::new(),
206 deleted_file_paths: HashSet::new(),
207 operation: Operation::Append,
208 isolation_level: IsolationLevel::default(),
209 retry_config: RetryConfig::default(),
210 new_schema: None,
211 summary_properties: std::collections::HashMap::new(),
212 }
213 }
214
215 pub fn with_operation(mut self, operation: Operation) -> Self {
217 self.operation = operation;
218 self
219 }
220
221 pub fn with_isolation_level(mut self, level: IsolationLevel) -> Self {
223 self.isolation_level = level;
224 self
225 }
226
227 pub fn with_retry_config(mut self, config: RetryConfig) -> Self {
229 self.retry_config = config;
230 self
231 }
232
233 pub fn add_files(&mut self, files: impl IntoIterator<Item = DataFile>) {
235 self.added_files.extend(files);
236 }
237
238 pub fn add_file(&mut self, file: DataFile) {
240 self.added_files.push(file);
241 }
242
243 pub fn delete_files(&mut self, paths: impl IntoIterator<Item = String>) {
245 self.deleted_file_paths.extend(paths);
246 }
247
248 pub fn delete_file(&mut self, path: impl Into<String>) {
250 self.deleted_file_paths.insert(path.into());
251 }
252
253 pub fn set_schema(&mut self, schema: crate::schema::Schema) {
255 self.new_schema = Some(schema);
256 }
257
258 pub fn with_summary_property(
260 mut self,
261 key: impl Into<String>,
262 value: impl Into<String>,
263 ) -> Self {
264 self.summary_properties.insert(key.into(), value.into());
265 self
266 }
267
268 pub fn has_changes(&self) -> bool {
270 !self.added_files.is_empty()
271 || !self.deleted_file_paths.is_empty()
272 || self.new_schema.is_some()
273 }
274
275 pub fn added_files_count(&self) -> usize {
277 self.added_files.len()
278 }
279
280 pub fn deleted_files_count(&self) -> usize {
282 self.deleted_file_paths.len()
283 }
284
285 pub fn added_records_count(&self) -> i64 {
287 self.added_files.iter().map(|f| f.record_count).sum()
288 }
289
290 #[tracing::instrument(skip(self, catalog), fields(operation = ?self.operation))]
292 pub async fn commit(mut self, catalog: Arc<dyn Catalog>) -> TransactionResult<Snapshot> {
293 if !self.has_changes() {
294 return Err(TransactionError::NoChanges);
295 }
296
297 let identifier = TableIdentifier::parse(&self.table_location);
298 let mut attempts = 0;
301 loop {
302 attempts += 1;
303
304 let current_metadata = catalog
305 .load_table(&identifier)
306 .await
307 .map_err(|e| TransactionError::Io(e.to_string()))?;
308
309 self.detect_conflicts(¤t_metadata)?;
311
312 let prepared_commit = self.prepare_commit()?;
314
315 match catalog
317 .commit_table(
318 &identifier,
319 prepared_commit.base_sequence,
320 prepared_commit.new_metadata,
321 )
322 .await
323 {
324 Ok(committed_metadata) => {
325 return committed_metadata.current_snapshot().cloned().ok_or(
327 TransactionError::CommitFailed("No current snapshot found".to_string()),
328 );
329 }
330 Err(e) => {
331 if attempts >= self.retry_config.max_attempts {
334 return Err(TransactionError::Io(e.to_string()));
335 }
336
337 tracing::warn!(
338 "Commit might have failed or conflicted. Retrying (attempt {}/{})",
339 attempts,
340 self.retry_config.max_attempts
341 );
342 tokio::time::sleep(self.retry_config.delay_for_attempt(attempts)).await;
343 self.base_metadata = catalog
345 .load_table(&identifier)
346 .await
347 .map_err(|e| TransactionError::Io(e.to_string()))?;
348 self.base_sequence_number = self.base_metadata.last_sequence_number;
349 }
350 }
351 }
352 }
353
354 fn validate(&self) -> TransactionResult<()> {
356 if !self.has_changes() {
357 return Err(TransactionError::NoChanges);
358 }
359
360 if let Some(ref new_schema) = self.new_schema {
362 let current_schema = self.base_metadata.current_schema();
363 let validator =
364 crate::validation::SchemaCompatibilityValidator::new(current_schema.clone());
365 validator.validate(new_schema).map_err(|e| {
366 TransactionError::Validation(format!("Schema evolution error: {}", e))
367 })?;
368 }
369
370 Ok(())
374 }
375
376 #[allow(unused)]
379 fn detect_conflicts(&self, current_metadata: &TableMetadata) -> TransactionResult<()> {
380 if current_metadata.last_sequence_number != self.base_sequence_number {
381 return Err(TransactionError::CommitConflict {
388 base_sequence: self.base_sequence_number,
389 current_sequence: current_metadata.last_sequence_number,
390 });
391 }
392
393 Ok(())
394 }
395
396 fn build_snapshot(&self, sequence_number: i64, manifest_list_path: &str) -> Snapshot {
398 let added_records: i64 = self.added_files.iter().map(|f| f.record_count).sum();
399 let added_bytes: i64 = self.added_files.iter().map(|f| f.file_size_in_bytes).sum();
400
401 let mut builder = Snapshot::builder(sequence_number, manifest_list_path)
402 .with_operation(self.operation)
403 .with_sequence_number(sequence_number)
404 .with_schema_id(self.base_metadata.current_schema_id)
405 .with_summary_property("added-data-files", self.added_files.len().to_string())
406 .with_summary_property("added-records", added_records.to_string())
407 .with_summary_property("added-files-size", added_bytes.to_string())
408 .with_summary_property(
409 "deleted-data-files",
410 self.deleted_file_paths.len().to_string(),
411 );
412
413 for (key, value) in &self.summary_properties {
415 builder = builder.with_summary_property(key.clone(), value.clone());
416 }
417
418 if let Some(current_snapshot_id) = self.base_metadata.current_snapshot_id {
420 builder = builder.with_parent(current_snapshot_id);
421 }
422
423 builder.build()
424 }
425
426 pub fn prepare_commit(&self) -> TransactionResult<PreparedCommit> {
434 self.validate()?;
435
436 let new_sequence = self.base_metadata.last_sequence_number + 1;
437 let manifest_list_path = format!(
438 "{}/metadata/snap-{}-manifest-list.avro",
439 self.table_location, new_sequence
440 );
441
442 let snapshot = self.build_snapshot(new_sequence, &manifest_list_path);
443
444 let mut new_metadata = self.base_metadata.clone();
445
446 if let Some(ref new_schema) = self.new_schema {
448 new_metadata.add_schema(new_schema.clone());
449 new_metadata.current_schema_id = new_schema.schema_id;
450 }
451
452 new_metadata.update_metrics(&self.added_files, &self.deleted_file_paths);
454
455 new_metadata.add_snapshot(snapshot);
457
458 Ok(PreparedCommit {
459 base_sequence: self.base_sequence_number,
460 new_metadata,
461 added_files: self.added_files.clone(),
462 deleted_files: self.deleted_file_paths.clone(),
463 manifest_list_path,
464 })
465 }
466}
467
468#[derive(Debug, Clone)]
470pub struct PreparedCommit {
471 pub base_sequence: i64,
473
474 pub new_metadata: TableMetadata,
476
477 pub added_files: Vec<DataFile>,
479
480 pub deleted_files: HashSet<String>,
482
483 pub manifest_list_path: String,
485}
486
487impl PreparedCommit {
488 pub fn snapshot_id(&self) -> Option<i64> {
490 self.new_metadata.current_snapshot_id
491 }
492
493 pub fn sequence_number(&self) -> i64 {
495 self.new_metadata.last_sequence_number
496 }
497}
498
#[cfg(test)]
mod tests {
    use super::*;
    use crate::manifest::FileFormat;
    use crate::schema::{Schema, Type};

    /// Minimal two-column table metadata used by every test.
    fn sample_metadata() -> TableMetadata {
        let schema = Schema::builder(0)
            .with_field(1, "id", Type::Long, true)
            .with_field(2, "data", Type::String, false)
            .build();

        TableMetadata::builder("s3://bucket/table", schema).build()
    }

    /// A 1000-record, 1 MiB Parquet file.
    fn sample_file() -> DataFile {
        DataFile::new("/data/file.parquet", FileFormat::Parquet, 1000, 1024 * 1024)
    }

    /// A fresh transaction over `sample_metadata`.
    fn new_transaction() -> Transaction {
        Transaction::new(sample_metadata())
    }

    #[test]
    fn test_transaction_creation() {
        let tx = new_transaction();

        assert!(!tx.has_changes());
        assert_eq!(tx.added_files_count(), 0);
        assert_eq!(tx.deleted_files_count(), 0);
    }

    #[test]
    fn test_add_files() {
        let mut tx = new_transaction();
        tx.add_file(sample_file());

        assert!(tx.has_changes());
        assert_eq!(tx.added_files_count(), 1);
        assert_eq!(tx.added_records_count(), 1000);
    }

    #[test]
    fn test_delete_files() {
        let mut tx = new_transaction();

        for path in ["/data/old-file.parquet", "/data/another-file.parquet"] {
            tx.delete_file(path);
        }

        assert!(tx.has_changes());
        assert_eq!(tx.deleted_files_count(), 2);
    }

    #[test]
    fn test_prepare_commit() {
        let mut tx = new_transaction();
        tx.add_file(sample_file());

        let prepared = tx.prepare_commit().unwrap();

        assert_eq!(prepared.base_sequence, 0);
        assert_eq!(prepared.sequence_number(), 1);
        assert_eq!(prepared.added_files.len(), 1);
    }

    #[test]
    fn test_empty_transaction_fails() {
        let outcome = new_transaction().prepare_commit();

        assert!(matches!(outcome, Err(TransactionError::NoChanges)));
    }

    #[test]
    fn test_retry_config_delay() {
        let config = RetryConfig::default();

        // (attempt, expected delay) pairs for the default doubling backoff.
        let expectations = [
            (0, Duration::ZERO),
            (1, Duration::from_millis(100)),
            (2, Duration::from_millis(200)),
            (3, Duration::from_millis(400)),
        ];

        for (attempt, expected) in expectations {
            assert_eq!(config.delay_for_attempt(attempt), expected);
        }
    }
}