use std::sync::{Arc, LazyLock};
use crate::actions::visitors::SetTransactionVisitor;
use crate::actions::{get_log_schema, SetTransaction, SET_TRANSACTION_NAME};
use crate::snapshot::Snapshot;
use crate::{
DeltaResult, Engine, EngineData, Expression as Expr, ExpressionRef, RowVisitor as _, SchemaRef,
};
pub use crate::actions::visitors::SetTransactionMap;
/// Reads set-transaction (application transaction) actions from a table [`Snapshot`].
///
/// Use [`SetTransactionScanner::application_transaction`] to look up the transaction recorded
/// for a single application id, or [`SetTransactionScanner::application_transactions`] to
/// collect the transactions of every application.
pub struct SetTransactionScanner {
    /// The snapshot whose log segment is replayed to find set-transaction actions.
    snapshot: Arc<Snapshot>,
}
impl SetTransactionScanner {
    /// Creates a scanner that reads set-transaction actions from `snapshot`.
    pub fn new(snapshot: Arc<Snapshot>) -> Self {
        SetTransactionScanner { snapshot }
    }

    /// Replays the log and collects set-transaction actions into a [`SetTransactionMap`].
    ///
    /// The [`SetTransactionVisitor`] is constructed with `application_id`.
    ///
    /// When `application_id` is `Some`, the scan stops after the first batch in which the
    /// visitor has recorded any transaction, which avoids replaying the rest of the log. When
    /// it is `None`, every batch is visited.
    ///
    /// # Errors
    /// Propagates any error from schema projection, log replay, or row visiting.
    fn scan_application_transactions(
        &self,
        engine: &dyn Engine,
        application_id: Option<&str>,
    ) -> DeltaResult<SetTransactionMap> {
        let schema = Self::get_txn_schema()?;
        let mut visitor = SetTransactionVisitor::new(application_id.map(str::to_owned));
        // Only a targeted lookup may stop early; a full listing must see every batch.
        let stop_at_first_match = application_id.is_some();
        // `schema` is not needed after this point, so it is moved rather than cloned.
        for maybe_data in self.replay_for_app_ids(engine, schema)? {
            // NOTE(review): the ignored `bool` is presumably the "batch came from a commit
            // file" flag produced by log replay — confirm.
            let (txns, _) = maybe_data?;
            visitor.visit_rows_of(txns.as_ref())?;
            // NOTE(review): stopping here assumes log replay yields the newest actions first.
            // If so, the first match is the latest transaction for this application. Confirm
            // against `LogSegment::replay`.
            if stop_at_first_match && !visitor.set_transactions.is_empty() {
                break;
            }
        }
        Ok(visitor.set_transactions)
    }

    /// Returns the log schema projected down to the set-transaction action column only.
    ///
    /// # Errors
    /// Propagates any error from the schema projection.
    fn get_txn_schema() -> DeltaResult<SchemaRef> {
        get_log_schema().project(&[SET_TRANSACTION_NAME])
    }

    /// Replays the snapshot's log segment, reading only the set-transaction column.
    ///
    /// `schema` is passed for both schema arguments of `LogSegment::replay`. These are
    /// presumably the commit-file and checkpoint read schemas — confirm.
    ///
    /// A `<SET_TRANSACTION_NAME>.appId IS NOT NULL` predicate is supplied as well. It
    /// presumably lets the reader skip data that holds no set-transaction actions.
    ///
    /// Each item is a batch of engine data paired with a flag produced by log replay.
    fn replay_for_app_ids(
        &self,
        engine: &dyn Engine,
        schema: SchemaRef,
    ) -> DeltaResult<impl Iterator<Item = DeltaResult<(Box<dyn EngineData>, bool)>> + Send> {
        // Built once and shared: cloning the `Option<Arc<..>>` only bumps a refcount.
        static META_PREDICATE: LazyLock<Option<ExpressionRef>> = LazyLock::new(|| {
            Some(Arc::new(
                Expr::column([SET_TRANSACTION_NAME, "appId"]).is_not_null(),
            ))
        });
        self.snapshot
            .log_segment
            .replay(engine, schema.clone(), schema, META_PREDICATE.clone())
    }

    /// Returns the set-transaction action recorded for `application_id`.
    ///
    /// Returns `Ok(None)` if no transaction was found for it.
    ///
    /// # Errors
    /// Propagates any error raised while replaying the log.
    pub fn application_transaction(
        &self,
        engine: &dyn Engine,
        application_id: &str,
    ) -> DeltaResult<Option<SetTransaction>> {
        let mut transactions = self.scan_application_transactions(engine, Some(application_id))?;
        Ok(transactions.remove(application_id))
    }

    /// Returns the set-transaction actions of every application in the table, keyed by id.
    ///
    /// # Errors
    /// Propagates any error raised while replaying the log.
    pub fn application_transactions(&self, engine: &dyn Engine) -> DeltaResult<SetTransactionMap> {
        self.scan_application_transactions(engine, None)
    }
}
#[cfg(all(test, feature = "default-engine"))]
mod tests {
    use std::path::PathBuf;

    use super::*;
    use crate::engine::sync::SyncEngine;
    use crate::Table;
    use itertools::Itertools;

    /// Loads the latest snapshot of the table at `path` and wraps it in a scanner.
    ///
    /// The engine is returned as well, because the scanner's methods need one to read the log.
    fn scanner_for(path: &str) -> (SyncEngine, SetTransactionScanner) {
        let path = std::fs::canonicalize(PathBuf::from(path)).unwrap();
        let url = url::Url::from_directory_path(path).unwrap();
        let engine = SyncEngine::new();
        let table = Table::new(url);
        let snapshot = table.snapshot(&engine, None).unwrap();
        (engine, SetTransactionScanner::new(snapshot.into()))
    }

    /// Returns every transaction in the table at `path`, plus the one recorded for `app_id`.
    fn get_latest_transactions(
        path: &str,
        app_id: &str,
    ) -> (SetTransactionMap, Option<SetTransaction>) {
        let (engine, txn_scan) = scanner_for(path);
        (
            txn_scan.application_transactions(&engine).unwrap(),
            txn_scan.application_transaction(&engine, app_id).unwrap(),
        )
    }

    #[test]
    fn test_txn() {
        let (txns, txn) = get_latest_transactions("./tests/data/basic_partitioned/", "test");
        assert!(txn.is_none());
        assert_eq!(txns.len(), 0);

        // The same transactions must be found whether or not the log has a checkpoint.
        for path in [
            "./tests/data/app-txn-no-checkpoint/",
            "./tests/data/app-txn-checkpoint/",
        ] {
            let (txns, txn) = get_latest_transactions(path, "my-app");
            assert!(txn.is_some(), "missing my-app transaction in {path}");
            assert_eq!(txns.len(), 2, "unexpected transaction count in {path}");
            assert_eq!(txns.get("my-app"), txn.as_ref(), "my-app mismatch in {path}");
            assert_eq!(
                txns.get("my-app2"),
                Some(&SetTransaction {
                    app_id: "my-app2".to_owned(),
                    version: 2,
                    last_updated: None,
                }),
                "my-app2 mismatch in {path}"
            );

            // An application id that never committed has no transaction.
            let (_, missing) = get_latest_transactions(path, "no-such-app");
            assert!(missing.is_none(), "unexpected transaction for no-such-app in {path}");
        }
    }

    #[test]
    fn test_replay_for_app_ids() {
        let (engine, txn_scan) = scanner_for("./tests/data/parquet_row_group_skipping/");
        let txn_schema = SetTransactionScanner::get_txn_schema().unwrap();
        let data: Vec<_> = txn_scan
            .replay_for_app_ids(&engine, txn_schema)
            .unwrap()
            .try_collect()
            .unwrap();
        // The expected batch count is specific to this fixture's log layout.
        assert_eq!(data.len(), 2);
    }
}