1use crate::table::Table;
7use anyhow::Result;
8
/// Kind of change recorded in a [`RowChange`].
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ChangeOperation {
    /// The data file is present in the target snapshot but not in the source.
    Insert,
    /// The data file is present in the source snapshot but not in the target.
    Delete,
    /// An in-place modification.
    ///
    /// NOTE(review): nothing visible in this file produces this variant —
    /// confirm whether other code emits it.
    Update,
}
16
17#[derive(Debug, Clone)]
19pub struct RowChange {
20 pub operation: ChangeOperation,
21 pub file_path: String,
22 pub snapshot_id: i64,
23}
24
25#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
27pub struct Checkpoint {
28 pub last_snapshot_id: i64,
29 pub last_sequence_number: i64,
30}
31
32pub struct ChangeDataCapture {
34 table: Table,
35}
36
37impl ChangeDataCapture {
38 pub fn new(table: Table) -> Self {
39 Self { table }
40 }
41
42 pub async fn changes_between(
44 &self,
45 from_snapshot_id: i64,
46 to_snapshot_id: i64,
47 ) -> Result<Vec<RowChange>> {
48 let from_snapshot = self
49 .table
50 .metadata
51 .snapshot(from_snapshot_id)
52 .ok_or_else(|| anyhow::anyhow!("Source snapshot not found: {}", from_snapshot_id))?;
53 let to_snapshot = self
54 .table
55 .metadata
56 .snapshot(to_snapshot_id)
57 .ok_or_else(|| anyhow::anyhow!("Target snapshot not found: {}", to_snapshot_id))?;
58
59 let from_files = from_snapshot.all_data_files(&self.table.storage).await?;
62 let to_files = to_snapshot.all_data_files(&self.table.storage).await?;
63
64 let mut changes = Vec::new();
65
66 for file in &to_files {
68 if !from_files
69 .iter()
70 .any(|f: &crate::manifest::DataFile| f.file_path == file.file_path)
71 {
72 changes.push(RowChange {
73 operation: ChangeOperation::Insert,
74 file_path: file.file_path.clone(),
75 snapshot_id: to_snapshot_id,
76 });
77 }
78 }
79
80 for file in &from_files {
82 if !to_files
83 .iter()
84 .any(|f: &crate::manifest::DataFile| f.file_path == file.file_path)
85 {
86 changes.push(RowChange {
87 operation: ChangeOperation::Delete,
88 file_path: file.file_path.clone(),
89 snapshot_id: to_snapshot_id,
90 });
91 }
92 }
93
94 Ok(changes)
95 }
96
97 pub async fn changes_since(&self, snapshot_id: i64) -> Result<Vec<RowChange>> {
99 let current_id = self
100 .table
101 .metadata
102 .current_snapshot_id
103 .ok_or_else(|| anyhow::anyhow!("No current snapshot"))?;
104 self.changes_between(snapshot_id, current_id).await
105 }
106
107 pub async fn save_checkpoint(&self, checkpoint: &Checkpoint, name: &str) -> Result<()> {
109 let path = format!("{}/checkpoints/{}.json", self.table.metadata.location, name);
110 let data = serde_json::to_vec(checkpoint)?;
111 self.table.storage.write(&path, data.into()).await?;
112 Ok(())
113 }
114
115 pub async fn load_checkpoint(&self, name: &str) -> Result<Checkpoint> {
117 let path = format!("{}/checkpoints/{}.json", self.table.metadata.location, name);
118 let data = self.table.storage.read(&path).await?;
119 let checkpoint = serde_json::from_slice(&data)?;
120 Ok(checkpoint)
121 }
122
123 pub async fn changes_since_checkpoint(&self, name: &str) -> Result<Vec<RowChange>> {
125 let checkpoint = self.load_checkpoint(name).await?;
126 self.changes_since(checkpoint.last_snapshot_id).await
127 }
128}