Skip to main content

supertable_core/
transaction.rs

//! # SuperTable Transactions
//!
//! This module implements ACID transactions with optimistic concurrency control (OCC).
//! Transactions provide atomicity and isolation for table operations, letting
//! multiple writers work concurrently without holding locks on the table.
//!
//! ## Optimistic Concurrency Control
//!
//! SuperTable uses OCC rather than pessimistic locking, which is better suited
//! for data lake workloads where:
//! - Reads vastly outnumber writes
//! - Writes typically occur in batches (not high frequency)
//! - Lock contention would be impractical across distributed systems
//!
//! ## Conflict Detection
//!
//! Conflicts are detected at commit time by comparing the transaction's base
//! sequence number with the current table state. The current implementation is
//! deliberately conservative: any commit by another writer since the base
//! sequence number counts as a conflict, whether or not it overlaps with this
//! transaction's write set, and the commit fails with a `CommitConflict` error.
//!
//! ## Retry Strategy
//!
//! When the catalog rejects a commit, the transaction retries with configurable
//! exponential backoff (see `RetryConfig`). It reloads the current table
//! metadata, rebuilds the commit on top of it, and attempts to commit again
//! until the configured number of attempts is used up.
//!
//! # Example
//!
//! ```rust,ignore
//! use std::sync::Arc;
//! use supercore::transaction::Transaction;
//!
//! let mut tx = Transaction::new(table_metadata);
//! tx.add_files(vec![data_file]);
//! // `catalog` is an `Arc<dyn Catalog>`; `commit` consumes the transaction.
//! let snapshot = tx.commit(Arc::clone(&catalog)).await?;
//! ```
36
37use std::collections::HashSet;
38use std::sync::Arc;
39use std::time::Duration;
40
41// use async_trait::async_trait;
42use thiserror::Error;
43
44use crate::catalog::{Catalog, TableIdentifier};
45use crate::manifest::{DataFile, Operation, Snapshot};
46use crate::metadata::{MetadataError, TableMetadata};
47/// Errors that can occur during transaction operations.
48#[derive(Debug, Error)]
49pub enum TransactionError {
50    /// A conflict occurred - another transaction committed first.
51    #[error(
52        "commit conflict: base sequence {base_sequence} is stale, current is {current_sequence}"
53    )]
54    CommitConflict {
55        base_sequence: i64,
56        current_sequence: i64,
57    },
58
59    /// Maximum retry attempts exceeded.
60    #[error("max retry attempts ({attempts}) exceeded")]
61    MaxRetriesExceeded { attempts: u32 },
62
63    /// The transaction was read-only (no changes to commit).
64    #[error("transaction has no changes to commit")]
65    NoChanges,
66
67    /// A metadata error occurred.
68    #[error("metadata error: {0}")]
69    Metadata(#[from] MetadataError),
70
71    /// An I/O error occurred.
72    #[error("io error: {0}")]
73    Io(String),
74
75    /// A validation error occurred.
76    #[error("validation error: {0}")]
77    Validation(String),
78
79    /// A generic commit failure.
80    #[error("commit failed: {0}")]
81    CommitFailed(String),
82}
83
84/// Result type for transaction operations.
85pub type TransactionResult<T> = Result<T, TransactionError>;
86
/// Isolation level requested for a transaction.
///
/// Snapshot isolation is the default.
///
/// NOTE(review): in this file the level is only stored on the transaction;
/// nothing visible here changes behavior based on it — confirm where (or
/// whether) it is enforced.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub enum IsolationLevel {
    /// The transaction works against a consistent view of the table as of
    /// the moment the transaction started. This is the default.
    #[default]
    SnapshotIsolation,

    /// Reads may observe data that other transactions have not committed yet.
    /// Weaker consistency; use with caution.
    ReadUncommitted,
}
102
/// Configuration for transaction retry behavior.
///
/// Delays grow geometrically: the first retry waits `initial_delay`, and each
/// later retry multiplies the previous delay by `backoff_multiplier`, never
/// exceeding `max_delay`.
#[derive(Debug, Clone)]
pub struct RetryConfig {
    /// Maximum number of commit attempts (the first try counts as one).
    pub max_attempts: u32,

    /// Delay before the first retry.
    pub initial_delay: Duration,

    /// Upper bound applied to every computed delay.
    pub max_delay: Duration,

    /// Factor by which the delay grows from one retry to the next.
    pub backoff_multiplier: f64,
}

impl Default for RetryConfig {
    /// Five attempts, starting at 100 ms and doubling up to a 10 s cap.
    fn default() -> Self {
        Self {
            max_attempts: 5,
            initial_delay: Duration::from_millis(100),
            max_delay: Duration::from_secs(10),
            backoff_multiplier: 2.0,
        }
    }
}

impl RetryConfig {
    /// Creates a config that allows a single attempt, i.e. no retries.
    ///
    /// All other settings keep their default values.
    pub fn no_retry() -> Self {
        Self {
            max_attempts: 1,
            ..Default::default()
        }
    }

    /// Calculates the delay to wait after the given attempt number.
    ///
    /// * `attempt == 0` yields [`Duration::ZERO`].
    /// * `attempt == n` (n >= 1) yields
    ///   `initial_delay * backoff_multiplier^(n - 1)`, capped at `max_delay`.
    ///
    /// The computation is done in whole milliseconds, so sub-millisecond parts
    /// of `initial_delay` and `max_delay` are ignored.
    pub fn delay_for_attempt(&self, attempt: u32) -> Duration {
        if attempt == 0 {
            return Duration::ZERO;
        }

        // `powi` takes an `i32`. Clamp before converting so that very large
        // attempt numbers saturate instead of wrapping to a negative exponent
        // (which would shrink the delay) or overflowing the subtraction.
        let exponent = (attempt - 1).min(i32::MAX as u32) as i32;

        let uncapped_ms =
            self.initial_delay.as_millis() as f64 * self.backoff_multiplier.powi(exponent);
        let cap_ms = self.max_delay.as_millis() as f64;

        // Float-to-integer `as` casts saturate, so an infinite product simply
        // ends up at the cap and a negative one at zero.
        Duration::from_millis(uncapped_ms.min(cap_ms) as u64)
    }
}
151
152/// A transaction represents a unit of work against a table.
153///
154/// Transactions track:
155/// - The base state (snapshot) at transaction start
156/// - Files to be added
157/// - Files to be deleted
158/// - Schema changes (if any)
159///
160/// At commit time, the transaction validates that no conflicting changes
161/// have been made and atomically updates the table metadata.
162#[derive(Debug)]
163pub struct Transaction {
164    /// The table location.
165    table_location: String,
166
167    /// Base metadata at transaction start.
168    base_metadata: TableMetadata,
169
170    /// Sequence number at transaction start.
171    base_sequence_number: i64,
172
173    /// Files to be added.
174    added_files: Vec<DataFile>,
175
176    /// Paths of files to be deleted.
177    deleted_file_paths: HashSet<String>,
178
179    /// The operation type for this transaction.
180    operation: Operation,
181
182    /// Transaction isolation level.
183    isolation_level: IsolationLevel,
184
185    /// Retry configuration.
186    retry_config: RetryConfig,
187
188    /// Optional new schema to apply.
189    new_schema: Option<crate::schema::Schema>,
190
191    /// Custom summary properties.
192    summary_properties: std::collections::HashMap<String, String>,
193}
194
195impl Transaction {
196    /// Creates a new transaction based on the current table state.
197    pub fn new(metadata: TableMetadata) -> Self {
198        let base_sequence = metadata.last_sequence_number;
199        let location = metadata.location.clone();
200
201        Self {
202            table_location: location,
203            base_metadata: metadata,
204            base_sequence_number: base_sequence,
205            added_files: Vec::new(),
206            deleted_file_paths: HashSet::new(),
207            operation: Operation::Append,
208            isolation_level: IsolationLevel::default(),
209            retry_config: RetryConfig::default(),
210            new_schema: None,
211            summary_properties: std::collections::HashMap::new(),
212        }
213    }
214
215    /// Sets the operation type.
216    pub fn with_operation(mut self, operation: Operation) -> Self {
217        self.operation = operation;
218        self
219    }
220
221    /// Sets the isolation level.
222    pub fn with_isolation_level(mut self, level: IsolationLevel) -> Self {
223        self.isolation_level = level;
224        self
225    }
226
227    /// Sets the retry configuration.
228    pub fn with_retry_config(mut self, config: RetryConfig) -> Self {
229        self.retry_config = config;
230        self
231    }
232
233    /// Adds data files to be committed.
234    pub fn add_files(&mut self, files: impl IntoIterator<Item = DataFile>) {
235        self.added_files.extend(files);
236    }
237
238    /// Adds a single data file to be committed.
239    pub fn add_file(&mut self, file: DataFile) {
240        self.added_files.push(file);
241    }
242
243    /// Marks files for deletion.
244    pub fn delete_files(&mut self, paths: impl IntoIterator<Item = String>) {
245        self.deleted_file_paths.extend(paths);
246    }
247
248    /// Marks a single file for deletion.
249    pub fn delete_file(&mut self, path: impl Into<String>) {
250        self.deleted_file_paths.insert(path.into());
251    }
252
253    /// Sets a new schema to be applied.
254    pub fn set_schema(&mut self, schema: crate::schema::Schema) {
255        self.new_schema = Some(schema);
256    }
257
258    /// Adds a custom summary property.
259    pub fn with_summary_property(
260        mut self,
261        key: impl Into<String>,
262        value: impl Into<String>,
263    ) -> Self {
264        self.summary_properties.insert(key.into(), value.into());
265        self
266    }
267
268    /// Returns true if this transaction has changes to commit.
269    pub fn has_changes(&self) -> bool {
270        !self.added_files.is_empty()
271            || !self.deleted_file_paths.is_empty()
272            || self.new_schema.is_some()
273    }
274
275    /// Returns the number of files to be added.
276    pub fn added_files_count(&self) -> usize {
277        self.added_files.len()
278    }
279
280    /// Returns the number of files to be deleted.
281    pub fn deleted_files_count(&self) -> usize {
282        self.deleted_file_paths.len()
283    }
284
285    /// Returns the total records to be added.
286    pub fn added_records_count(&self) -> i64 {
287        self.added_files.iter().map(|f| f.record_count).sum()
288    }
289
290    /// Commits the transaction to the catalog.
291    #[tracing::instrument(skip(self, catalog), fields(operation = ?self.operation))]
292    pub async fn commit(mut self, catalog: Arc<dyn Catalog>) -> TransactionResult<Snapshot> {
293        if !self.has_changes() {
294            return Err(TransactionError::NoChanges);
295        }
296
297        let identifier = TableIdentifier::parse(&self.table_location);
298        // Error handling simplified here - parse should probably return a Result
299
300        let mut attempts = 0;
301        loop {
302            attempts += 1;
303
304            let current_metadata = catalog
305                .load_table(&identifier)
306                .await
307                .map_err(|e| TransactionError::Io(e.to_string()))?;
308
309            // Detect conflicts
310            self.detect_conflicts(&current_metadata)?;
311
312            // Prepare the commit
313            let prepared_commit = self.prepare_commit()?;
314
315            // Attempt to commit
316            match catalog
317                .commit_table(
318                    &identifier,
319                    prepared_commit.base_sequence,
320                    prepared_commit.new_metadata,
321                )
322                .await
323            {
324                Ok(committed_metadata) => {
325                    // Commit successful, return the new snapshot
326                    return committed_metadata.current_snapshot().cloned().ok_or(
327                        TransactionError::CommitFailed("No current snapshot found".to_string()),
328                    );
329                }
330                Err(e) => {
331                    // In a real catalog, we'd check if e is a conflict.
332                    // For now we assume if it fails it might be a conflict if it's not the first attempt.
333                    if attempts >= self.retry_config.max_attempts {
334                        return Err(TransactionError::Io(e.to_string()));
335                    }
336
337                    tracing::warn!(
338                        "Commit might have failed or conflicted. Retrying (attempt {}/{})",
339                        attempts,
340                        self.retry_config.max_attempts
341                    );
342                    tokio::time::sleep(self.retry_config.delay_for_attempt(attempts)).await;
343                    // Reload base metadata for next attempt
344                    self.base_metadata = catalog
345                        .load_table(&identifier)
346                        .await
347                        .map_err(|e| TransactionError::Io(e.to_string()))?;
348                    self.base_sequence_number = self.base_metadata.last_sequence_number;
349                }
350            }
351        }
352    }
353
354    /// Validates the transaction state before commit.
355    fn validate(&self) -> TransactionResult<()> {
356        if !self.has_changes() {
357            return Err(TransactionError::NoChanges);
358        }
359
360        // Validate schema compatibility if changing schema
361        if let Some(ref new_schema) = self.new_schema {
362            let current_schema = self.base_metadata.current_schema();
363            let validator =
364                crate::validation::SchemaCompatibilityValidator::new(current_schema.clone());
365            validator.validate(new_schema).map_err(|e| {
366                TransactionError::Validation(format!("Schema evolution error: {}", e))
367            })?;
368        }
369
370        // Validates that deleted files exist in the current snapshot
371        // (This would require loading manifest files - simplified here)
372
373        Ok(())
374    }
375
376    /// Detects conflicts between this transaction and updates that occurred
377    /// since the transaction started.
378    #[allow(unused)]
379    fn detect_conflicts(&self, current_metadata: &TableMetadata) -> TransactionResult<()> {
380        if current_metadata.last_sequence_number != self.base_sequence_number {
381            // For now, we use a simple strategy: any concurrent modification is a conflict.
382            // A more sophisticated approach would check:
383            // 1. Partition overlap for append operations
384            // 2. File overlap for delete operations
385            // 3. Schema compatibility for schema changes
386
387            return Err(TransactionError::CommitConflict {
388                base_sequence: self.base_sequence_number,
389                current_sequence: current_metadata.last_sequence_number,
390            });
391        }
392
393        Ok(())
394    }
395
396    /// Builds the snapshot for this transaction.
397    fn build_snapshot(&self, sequence_number: i64, manifest_list_path: &str) -> Snapshot {
398        let added_records: i64 = self.added_files.iter().map(|f| f.record_count).sum();
399        let added_bytes: i64 = self.added_files.iter().map(|f| f.file_size_in_bytes).sum();
400
401        let mut builder = Snapshot::builder(sequence_number, manifest_list_path)
402            .with_operation(self.operation)
403            .with_sequence_number(sequence_number)
404            .with_schema_id(self.base_metadata.current_schema_id)
405            .with_summary_property("added-data-files", self.added_files.len().to_string())
406            .with_summary_property("added-records", added_records.to_string())
407            .with_summary_property("added-files-size", added_bytes.to_string())
408            .with_summary_property(
409                "deleted-data-files",
410                self.deleted_file_paths.len().to_string(),
411            );
412
413        // Add custom summary properties
414        for (key, value) in &self.summary_properties {
415            builder = builder.with_summary_property(key.clone(), value.clone());
416        }
417
418        // Set parent snapshot if exists
419        if let Some(current_snapshot_id) = self.base_metadata.current_snapshot_id {
420            builder = builder.with_parent(current_snapshot_id);
421        }
422
423        builder.build()
424    }
425
426    /// Prepares the transaction for commit.
427    ///
428    /// This method:
429    /// 1. Validates the transaction
430    /// 2. Builds manifest files for new data
431    /// 3. Creates the new snapshot
432    /// 4. Returns the updated metadata ready for atomic commit
433    pub fn prepare_commit(&self) -> TransactionResult<PreparedCommit> {
434        self.validate()?;
435
436        let new_sequence = self.base_metadata.last_sequence_number + 1;
437        let manifest_list_path = format!(
438            "{}/metadata/snap-{}-manifest-list.avro",
439            self.table_location, new_sequence
440        );
441
442        let snapshot = self.build_snapshot(new_sequence, &manifest_list_path);
443
444        let mut new_metadata = self.base_metadata.clone();
445
446        // Apply schema changes if any
447        if let Some(ref new_schema) = self.new_schema {
448            new_metadata.add_schema(new_schema.clone());
449            new_metadata.current_schema_id = new_schema.schema_id;
450        }
451
452        // Update table metrics
453        new_metadata.update_metrics(&self.added_files, &self.deleted_file_paths);
454
455        // Add the new snapshot
456        new_metadata.add_snapshot(snapshot);
457
458        Ok(PreparedCommit {
459            base_sequence: self.base_sequence_number,
460            new_metadata,
461            added_files: self.added_files.clone(),
462            deleted_files: self.deleted_file_paths.clone(),
463            manifest_list_path,
464        })
465    }
466}
467
468/// A prepared commit ready for atomic application.
469#[derive(Debug, Clone)]
470pub struct PreparedCommit {
471    /// The base sequence number this commit is based on.
472    pub base_sequence: i64,
473
474    /// The new metadata to be written.
475    pub new_metadata: TableMetadata,
476
477    /// Files added in this commit.
478    pub added_files: Vec<DataFile>,
479
480    /// Files deleted in this commit.
481    pub deleted_files: HashSet<String>,
482
483    /// Path where the manifest list should be written.
484    pub manifest_list_path: String,
485}
486
487impl PreparedCommit {
488    /// Returns the new snapshot ID.
489    pub fn snapshot_id(&self) -> Option<i64> {
490        self.new_metadata.current_snapshot_id
491    }
492
493    /// Returns the new sequence number.
494    pub fn sequence_number(&self) -> i64 {
495        self.new_metadata.last_sequence_number
496    }
497}
498
499// Trait for catalog implementations that support atomic commits.
500// (Catalog trait already provides this)
501
#[cfg(test)]
mod tests {
    use super::*;
    use crate::manifest::FileFormat;
    use crate::schema::{Schema, Type};

    /// Builds metadata for an empty two-column table at a fixed location.
    fn sample_metadata() -> TableMetadata {
        let schema = Schema::builder(0)
            .with_field(1, "id", Type::Long, true)
            .with_field(2, "data", Type::String, false)
            .build();

        TableMetadata::builder("s3://bucket/table", schema).build()
    }

    /// Builds a Parquet data file entry with 1000 records and a size of 1 MiB.
    fn sample_file() -> DataFile {
        DataFile::new("/data/file.parquet", FileFormat::Parquet, 1000, 1024 * 1024)
    }

    #[test]
    fn test_transaction_creation() {
        let tx = Transaction::new(sample_metadata());

        assert!(!tx.has_changes());
        assert_eq!(tx.added_files_count(), 0);
        assert_eq!(tx.deleted_files_count(), 0);
    }

    #[test]
    fn test_add_files() {
        let mut tx = Transaction::new(sample_metadata());

        tx.add_file(sample_file());

        assert!(tx.has_changes());
        assert_eq!(tx.added_files_count(), 1);
        assert_eq!(tx.added_records_count(), 1000);
    }

    #[test]
    fn test_delete_files() {
        let mut tx = Transaction::new(sample_metadata());

        tx.delete_file("/data/old-file.parquet");
        tx.delete_file("/data/another-file.parquet");

        assert!(tx.has_changes());
        assert_eq!(tx.deleted_files_count(), 2);
    }

    #[test]
    fn test_prepare_commit() {
        let mut tx = Transaction::new(sample_metadata());
        tx.add_file(sample_file());

        let prepared = tx.prepare_commit().unwrap();

        assert_eq!(prepared.base_sequence, 0);
        assert_eq!(prepared.sequence_number(), 1);
        assert_eq!(prepared.added_files.len(), 1);
    }

    #[test]
    fn test_empty_transaction_fails() {
        let tx = Transaction::new(sample_metadata());

        let result = tx.prepare_commit();
        assert!(matches!(result, Err(TransactionError::NoChanges)));
    }

    #[test]
    fn test_retry_config_delay() {
        let config = RetryConfig::default();

        assert_eq!(config.delay_for_attempt(0), Duration::ZERO);
        assert_eq!(config.delay_for_attempt(1), Duration::from_millis(100));
        assert_eq!(config.delay_for_attempt(2), Duration::from_millis(200));
        assert_eq!(config.delay_for_attempt(3), Duration::from_millis(400));
    }

    #[test]
    fn test_retry_config_delay_is_capped() {
        let config = RetryConfig::default();

        // 100 ms * 2^19 is far beyond the 10 s cap.
        assert_eq!(config.delay_for_attempt(20), config.max_delay);
    }

    #[test]
    fn test_no_retry_config() {
        let config = RetryConfig::no_retry();

        assert_eq!(config.max_attempts, 1);
    }
}