Skip to main content

supertable_core/
cdc.rs

//! # SuperTable Change Data Capture (CDC)
//!
//! This module provides tools for tracking and reading incremental changes
//! between table snapshots.
5
6use crate::table::Table;
7use anyhow::Result;
8
/// The kind of change a change record describes.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ChangeOperation {
    /// Something was added.
    Insert,
    /// Something was removed.
    Delete,
    /// Something was modified in place.
    Update,
}
16
17/// A single row-level change.
18#[derive(Debug, Clone)]
19pub struct RowChange {
20    pub operation: ChangeOperation,
21    pub file_path: String,
22    pub snapshot_id: i64,
23}
24
25/// A checkpoint for watermarking incremental progress.
26#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
27pub struct Checkpoint {
28    pub last_snapshot_id: i64,
29    pub last_sequence_number: i64,
30}
31
32/// Change Data Capture engine.
33pub struct ChangeDataCapture {
34    table: Table,
35}
36
37impl ChangeDataCapture {
38    pub fn new(table: Table) -> Self {
39        Self { table }
40    }
41
42    /// Returns changes between two snapshots.
43    pub async fn changes_between(
44        &self,
45        from_snapshot_id: i64,
46        to_snapshot_id: i64,
47    ) -> Result<Vec<RowChange>> {
48        let from_snapshot = self
49            .table
50            .metadata
51            .snapshot(from_snapshot_id)
52            .ok_or_else(|| anyhow::anyhow!("Source snapshot not found: {}", from_snapshot_id))?;
53        let to_snapshot = self
54            .table
55            .metadata
56            .snapshot(to_snapshot_id)
57            .ok_or_else(|| anyhow::anyhow!("Target snapshot not found: {}", to_snapshot_id))?;
58
59        // In a real implementation, we'd compare manifest lists and manifests.
60        // For the prototype, we compare the final data file sets.
61        let from_files = from_snapshot.all_data_files(&self.table.storage).await?;
62        let to_files = to_snapshot.all_data_files(&self.table.storage).await?;
63
64        let mut changes = Vec::new();
65
66        // Files in 'to' but not in 'from' are Inserts
67        for file in &to_files {
68            if !from_files
69                .iter()
70                .any(|f: &crate::manifest::DataFile| f.file_path == file.file_path)
71            {
72                changes.push(RowChange {
73                    operation: ChangeOperation::Insert,
74                    file_path: file.file_path.clone(),
75                    snapshot_id: to_snapshot_id,
76                });
77            }
78        }
79
80        // Files in 'from' but not in 'to' are Deletes
81        for file in &from_files {
82            if !to_files
83                .iter()
84                .any(|f: &crate::manifest::DataFile| f.file_path == file.file_path)
85            {
86                changes.push(RowChange {
87                    operation: ChangeOperation::Delete,
88                    file_path: file.file_path.clone(),
89                    snapshot_id: to_snapshot_id,
90                });
91            }
92        }
93
94        Ok(changes)
95    }
96
97    /// Returns changes since a given snapshot up to the current snapshot.
98    pub async fn changes_since(&self, snapshot_id: i64) -> Result<Vec<RowChange>> {
99        let current_id = self
100            .table
101            .metadata
102            .current_snapshot_id
103            .ok_or_else(|| anyhow::anyhow!("No current snapshot"))?;
104        self.changes_between(snapshot_id, current_id).await
105    }
106
107    /// Saves a checkpoint for incremental reads.
108    pub async fn save_checkpoint(&self, checkpoint: &Checkpoint, name: &str) -> Result<()> {
109        let path = format!("{}/checkpoints/{}.json", self.table.metadata.location, name);
110        let data = serde_json::to_vec(checkpoint)?;
111        self.table.storage.write(&path, data.into()).await?;
112        Ok(())
113    }
114
115    /// Loads a checkpoint for incremental reads.
116    pub async fn load_checkpoint(&self, name: &str) -> Result<Checkpoint> {
117        let path = format!("{}/checkpoints/{}.json", self.table.metadata.location, name);
118        let data = self.table.storage.read(&path).await?;
119        let checkpoint = serde_json::from_slice(&data)?;
120        Ok(checkpoint)
121    }
122
123    /// Returns changes since the last checkpoint.
124    pub async fn changes_since_checkpoint(&self, name: &str) -> Result<Vec<RowChange>> {
125        let checkpoint = self.load_checkpoint(name).await?;
126        self.changes_since(checkpoint.last_snapshot_id).await
127    }
128}