use std::collections::HashSet;

use anyhow::Context;
use anyhow::Result;

use crate::table::Table;
/// Kind of file-level change reported by `ChangeDataCapture::changes_between`.
///
/// Within this module only `Insert` and `Delete` are ever constructed.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ChangeOperation {
    /// The data file is present in the target snapshot but absent from the source snapshot.
    Insert,
    /// The data file is present in the source snapshot but absent from the target snapshot.
    Delete,
    /// An in-place modification. NOTE(review): never emitted by the code in this module.
    Update,
}
/// A single file-level change produced by `ChangeDataCapture::changes_between`.
///
/// Despite the name, each entry describes a whole data file being added or
/// removed, not an individual row.
#[derive(Clone, Debug)]
pub struct RowChange {
    /// Whether the file was added to or removed from the table.
    pub operation: ChangeOperation,
    /// Path of the affected data file, copied from the snapshot's data-file listing.
    pub file_path: String,
    /// Snapshot the change is attributed to (the target snapshot of the diff).
    pub snapshot_id: i64,
}
/// Progress marker for a change-data-capture consumer, persisted as JSON by
/// `ChangeDataCapture::save_checkpoint` and read back by `load_checkpoint`.
#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
pub struct Checkpoint {
    /// Snapshot id up to which changes have been consumed.
    /// `changes_since_checkpoint` diffs from this snapshot to the current one.
    pub last_snapshot_id: i64,
    /// NOTE(review): round-tripped through storage but not consulted by any
    /// code in this module — confirm its intended use.
    pub last_sequence_number: i64,
}
/// Computes file-level change sets between snapshots of one [`Table`] and
/// stores named consumer checkpoints alongside the table.
pub struct ChangeDataCapture {
    /// Table whose metadata (snapshots, location) and storage are read.
    table: Table,
}
impl ChangeDataCapture {
    /// Creates a change-data-capture reader over `table`.
    pub fn new(table: Table) -> Self {
        Self { table }
    }

    /// Returns the file-level changes that turn snapshot `from_snapshot_id`
    /// into snapshot `to_snapshot_id`.
    ///
    /// Files are compared by `file_path` only:
    /// - a file listed in the target snapshot but not the source becomes an
    ///   [`ChangeOperation::Insert`];
    /// - a file listed in the source snapshot but not the target becomes a
    ///   [`ChangeOperation::Delete`].
    ///
    /// Every change is stamped with `to_snapshot_id`. Inserts come first (in
    /// target-file order), followed by deletes (in source-file order).
    ///
    /// # Errors
    ///
    /// Fails if either snapshot id is not present in the table metadata, or if
    /// listing either snapshot's data files fails.
    pub async fn changes_between(
        &self,
        from_snapshot_id: i64,
        to_snapshot_id: i64,
    ) -> Result<Vec<RowChange>> {
        let from_snapshot = self
            .table
            .metadata
            .snapshot(from_snapshot_id)
            .ok_or_else(|| anyhow::anyhow!("Source snapshot not found: {}", from_snapshot_id))?;
        let to_snapshot = self
            .table
            .metadata
            .snapshot(to_snapshot_id)
            .ok_or_else(|| anyhow::anyhow!("Target snapshot not found: {}", to_snapshot_id))?;

        let from_files = from_snapshot.all_data_files(&self.table.storage).await?;
        let to_files = to_snapshot.all_data_files(&self.table.storage).await?;

        // Index each side's paths once so every membership test is O(1); the
        // previous nested linear scan was O(n * m) in the number of files.
        let from_paths: HashSet<&str> = from_files.iter().map(|f| f.file_path.as_str()).collect();
        let to_paths: HashSet<&str> = to_files.iter().map(|f| f.file_path.as_str()).collect();

        let mut changes: Vec<RowChange> = Vec::new();
        changes.extend(
            to_files
                .iter()
                .filter(|f| !from_paths.contains(f.file_path.as_str()))
                .map(|f| RowChange {
                    operation: ChangeOperation::Insert,
                    file_path: f.file_path.clone(),
                    snapshot_id: to_snapshot_id,
                }),
        );
        changes.extend(
            from_files
                .iter()
                .filter(|f| !to_paths.contains(f.file_path.as_str()))
                .map(|f| RowChange {
                    operation: ChangeOperation::Delete,
                    file_path: f.file_path.clone(),
                    snapshot_id: to_snapshot_id,
                }),
        );
        Ok(changes)
    }

    /// Returns the changes between `snapshot_id` and the table's current snapshot.
    ///
    /// # Errors
    ///
    /// Fails if the table metadata has no current snapshot, or for any reason
    /// [`Self::changes_between`] fails.
    pub async fn changes_since(&self, snapshot_id: i64) -> Result<Vec<RowChange>> {
        let current_id = self
            .table
            .metadata
            .current_snapshot_id
            .ok_or_else(|| anyhow::anyhow!("No current snapshot"))?;
        self.changes_between(snapshot_id, current_id).await
    }

    /// Storage path of the checkpoint called `name`:
    /// `<table location>/checkpoints/<name>.json`.
    ///
    /// NOTE(review): `name` is interpolated verbatim; a name containing `/` or
    /// `..` may land outside `checkpoints/` depending on how storage resolves
    /// paths, so callers should only pass trusted names.
    fn checkpoint_path(&self, name: &str) -> String {
        format!("{}/checkpoints/{}.json", self.table.metadata.location, name)
    }

    /// Serializes `checkpoint` as JSON and writes it under the name `name`,
    /// replacing whatever the storage write does with an existing object.
    ///
    /// # Errors
    ///
    /// Fails if JSON serialization fails or the storage write fails.
    pub async fn save_checkpoint(&self, checkpoint: &Checkpoint, name: &str) -> Result<()> {
        let path = self.checkpoint_path(name);
        let data = serde_json::to_vec(checkpoint)
            .with_context(|| format!("Failed to serialize checkpoint for {}", path))?;
        self.table.storage.write(&path, data.into()).await?;
        Ok(())
    }

    /// Reads and deserializes the checkpoint stored under the name `name`.
    ///
    /// # Errors
    ///
    /// Fails if the storage read fails (e.g. the checkpoint was never saved) or
    /// the stored bytes are not a valid JSON [`Checkpoint`].
    pub async fn load_checkpoint(&self, name: &str) -> Result<Checkpoint> {
        let path = self.checkpoint_path(name);
        let data = self.table.storage.read(&path).await?;
        // Attach the path so a corrupt checkpoint file is identifiable.
        let checkpoint: Checkpoint = serde_json::from_slice(&data)
            .with_context(|| format!("Failed to parse checkpoint at {}", path))?;
        Ok(checkpoint)
    }

    /// Loads the checkpoint `name` and returns the changes from its
    /// `last_snapshot_id` to the table's current snapshot.
    ///
    /// Only `last_snapshot_id` is used; `last_sequence_number` is ignored here.
    ///
    /// # Errors
    ///
    /// Propagates failures from [`Self::load_checkpoint`] and [`Self::changes_since`].
    pub async fn changes_since_checkpoint(&self, name: &str) -> Result<Vec<RowChange>> {
        let checkpoint = self.load_checkpoint(name).await?;
        self.changes_since(checkpoint.last_snapshot_id).await
    }
}