use std::sync::Arc;
use crate::actions::visitors::TransactionVisitor;
use crate::actions::{get_log_schema, TRANSACTION_NAME};
use crate::snapshot::Snapshot;
use crate::Engine;
use crate::{actions::Transaction, DeltaResult};
pub use crate::actions::visitors::TransactionMap;
pub struct TransactionScanner {
snapshot: Arc<Snapshot>,
}
impl TransactionScanner {
pub fn new(snapshot: Arc<Snapshot>) -> Self {
TransactionScanner { snapshot }
}
fn scan_application_transactions(
&self,
engine: &dyn Engine,
application_id: Option<&str>,
) -> DeltaResult<TransactionMap> {
let schema = get_log_schema().project(&[TRANSACTION_NAME])?;
let mut visitor = TransactionVisitor::new(application_id.map(|s| s.to_owned()));
let iter =
self.snapshot
.log_segment
.replay(engine, schema.clone(), schema.clone(), None)?;
for maybe_data in iter {
let (txns, _) = maybe_data?;
txns.extract(schema.clone(), &mut visitor)?;
if application_id.is_some() && !visitor.transactions.is_empty() {
break;
}
}
Ok(visitor.transactions)
}
pub fn application_transaction(
&self,
engine: &dyn Engine,
application_id: &str,
) -> DeltaResult<Option<Transaction>> {
let mut transactions = self.scan_application_transactions(engine, Some(application_id))?;
Ok(transactions.remove(application_id))
}
pub fn application_transactions(&self, engine: &dyn Engine) -> DeltaResult<TransactionMap> {
self.scan_application_transactions(engine, None)
}
}
#[cfg(all(test, feature = "default-engine"))]
mod tests {
use std::path::PathBuf;
use super::*;
use crate::engine::sync::SyncEngine;
use crate::Table;
fn get_latest_transactions(path: &str, app_id: &str) -> (TransactionMap, Option<Transaction>) {
let path = std::fs::canonicalize(PathBuf::from(path)).unwrap();
let url = url::Url::from_directory_path(path).unwrap();
let engine = SyncEngine::new();
let table = Table::new(url);
let snapshot = table.snapshot(&engine, None).unwrap();
let txn_scan = TransactionScanner::new(snapshot.into());
(
txn_scan.application_transactions(&engine).unwrap(),
txn_scan.application_transaction(&engine, app_id).unwrap(),
)
}
#[test]
fn test_txn() {
let (txns, txn) = get_latest_transactions("./tests/data/basic_partitioned/", "test");
assert!(txn.is_none());
assert_eq!(txns.len(), 0);
let (txns, txn) = get_latest_transactions("./tests/data/app-txn-no-checkpoint/", "my-app");
assert!(txn.is_some());
assert_eq!(txns.len(), 2);
assert_eq!(txns.get("my-app"), txn.as_ref());
assert_eq!(
txns.get("my-app2"),
Some(Transaction {
app_id: "my-app2".to_owned(),
version: 2,
last_updated: None
})
.as_ref()
);
let (txns, txn) = get_latest_transactions("./tests/data/app-txn-checkpoint/", "my-app");
assert!(txn.is_some());
assert_eq!(txns.len(), 2);
assert_eq!(txns.get("my-app"), txn.as_ref());
assert_eq!(
txns.get("my-app2"),
Some(Transaction {
app_id: "my-app2".to_owned(),
version: 2,
last_updated: None
})
.as_ref()
);
}
}