pub(crate) use crate::actions::visitors::SetTransactionMap;
use crate::actions::visitors::SetTransactionVisitor;
use crate::actions::{get_log_txn_schema, SetTransaction};
use crate::log_replay::ActionsBatch;
use crate::log_segment::LogSegment;
use crate::{DeltaResult, Engine, RowVisitor as _};
/// Decides whether a set-transaction should be treated as expired.
///
/// A transaction counts as expired only when BOTH a cutoff (`expiration_timestamp`) and the
/// transaction's `last_updated` value are present, and `last_updated` is at or before the
/// cutoff. If either value is `None`, the transaction is never considered expired.
pub(crate) fn is_set_txn_expired(
    expiration_timestamp: Option<i64>,
    last_updated: Option<i64>,
) -> bool {
    // `zip` produces a pair only when both options are `Some`; otherwise the answer is `false`.
    expiration_timestamp
        .zip(last_updated)
        .is_some_and(|(cutoff, updated)| updated <= cutoff)
}
/// Entry points for reading application transaction (`txn`) actions out of a log segment.
pub(crate) struct SetTransactionScanner {}

impl SetTransactionScanner {
    /// Looks up the transaction recorded for `application_id`.
    ///
    /// Returns `Ok(None)` when the scan finds nothing for that app. The
    /// `expiration_timestamp` is forwarded to the scan, so transactions it treats as
    /// expired are not returned.
    pub(crate) fn get_one(
        log_segment: &LogSegment,
        application_id: &str,
        engine: &dyn Engine,
        expiration_timestamp: Option<i64>,
    ) -> DeltaResult<Option<SetTransaction>> {
        scan_application_transactions(
            log_segment,
            Some(application_id),
            engine,
            expiration_timestamp,
        )
        .map(|mut by_app| by_app.remove(application_id))
    }

    /// Collects the transactions for every application in the log, keyed by app id,
    /// after applying `expiration_timestamp` filtering.
    // NOTE(review): `allow(unused)` presumably because only tests call this today — confirm.
    #[allow(unused)]
    pub(crate) fn get_all(
        log_segment: &LogSegment,
        engine: &dyn Engine,
        expiration_timestamp: Option<i64>,
    ) -> DeltaResult<SetTransactionMap> {
        // No specific app requested: gather every app's transaction.
        let requested_app = None;
        scan_application_transactions(log_segment, requested_app, engine, expiration_timestamp)
    }
}
/// Replays the log's `txn` actions through a [`SetTransactionVisitor`] and returns what it
/// collected.
///
/// When `application_id` is `Some`, only that application's transaction is gathered and the
/// replay stops at the first batch that yields it. When it is `None`, every batch is visited.
/// `expiration_timestamp` is handed to the visitor for retention filtering.
fn scan_application_transactions(
    log_segment: &LogSegment,
    application_id: Option<&str>,
    engine: &dyn Engine,
    expiration_timestamp: Option<i64>,
) -> DeltaResult<SetTransactionMap> {
    let single_app_requested = application_id.is_some();
    let mut visitor =
        SetTransactionVisitor::new(application_id.map(str::to_owned), expiration_timestamp);

    let batches = replay_for_app_ids(log_segment, engine)?;
    for batch in batches {
        let actions = batch?.actions;
        visitor.visit_rows_of(actions.as_ref())?;

        // With a specific app requested, the visitor only records that app, so a non-empty
        // map means it was found; the remaining (presumably older) batches are skipped.
        let found_requested_app = single_app_requested && !visitor.set_transactions.is_empty();
        if found_requested_app {
            break;
        }
    }

    Ok(visitor.set_transactions)
}
/// Reads the actions of `log_segment` projected onto the schema returned by
/// [`get_log_txn_schema`], yielding the resulting [`ActionsBatch`]es lazily.
fn replay_for_app_ids(
    log_segment: &LogSegment,
    engine: &dyn Engine,
) -> DeltaResult<impl Iterator<Item = DeltaResult<ActionsBatch>> + Send> {
    log_segment.read_actions(engine, get_log_txn_schema().clone())
}
#[cfg(test)]
mod tests {
    use std::path::PathBuf;

    use itertools::Itertools;

    use super::*;
    use crate::arrow::array::StringArray;
    use crate::engine::sync::SyncEngine;
    use crate::utils::test_utils::parse_json_batch;
    use crate::Snapshot;

    /// Opens the table at `path` and returns both the map of all app transactions and the
    /// single transaction for `app_id`, each scanned with no expiration cutoff.
    fn get_latest_transactions(
        path: &str,
        app_id: &str,
    ) -> (SetTransactionMap, Option<SetTransaction>) {
        let path = std::fs::canonicalize(PathBuf::from(path)).unwrap();
        let url = url::Url::from_directory_path(path).unwrap();
        let engine = SyncEngine::new();
        let snapshot = Snapshot::builder_for(url).build(&engine).unwrap();
        let log_segment = snapshot.log_segment();
        (
            SetTransactionScanner::get_all(log_segment, &engine, None).unwrap(),
            SetTransactionScanner::get_one(log_segment, app_id, &engine, None).unwrap(),
        )
    }

    #[test]
    fn test_txn() {
        let (txns, txn) = get_latest_transactions("./tests/data/basic_partitioned/", "test");
        assert!(txn.is_none());
        assert_eq!(txns.len(), 0);

        let (txns, txn) = get_latest_transactions("./tests/data/app-txn-no-checkpoint/", "my-app");
        assert!(txn.is_some());
        assert_eq!(txns.len(), 2);
        assert_eq!(txns.get("my-app"), txn.as_ref());
        assert_eq!(
            txns.get("my-app2"),
            Some(SetTransaction {
                app_id: "my-app2".to_owned(),
                version: 2,
                last_updated: None
            })
            .as_ref()
        );

        let (txns, txn) = get_latest_transactions("./tests/data/app-txn-checkpoint/", "my-app");
        assert!(txn.is_some());
        assert_eq!(txns.len(), 2);
        assert_eq!(txns.get("my-app"), txn.as_ref());
        assert_eq!(
            txns.get("my-app2"),
            Some(SetTransaction {
                app_id: "my-app2".to_owned(),
                version: 2,
                last_updated: None
            })
            .as_ref()
        );
    }

    #[test]
    fn test_replay_for_app_ids() {
        let path = std::fs::canonicalize(PathBuf::from("./tests/data/parquet_row_group_skipping/"));
        let url = url::Url::from_directory_path(path.unwrap()).unwrap();
        let engine = SyncEngine::new();
        let snapshot = Snapshot::builder_for(url).build(&engine).unwrap();
        let log_segment = snapshot.log_segment();
        let data: Vec<_> = replay_for_app_ids(log_segment, &engine)
            .unwrap()
            .try_collect()
            .unwrap();
        assert_eq!(data.len(), 2);
    }

    #[test]
    fn test_txn_retention_filtering() {
        let path = std::fs::canonicalize(PathBuf::from("./tests/data/app-txn-with-last-updated/"));
        let url = url::Url::from_directory_path(path.unwrap()).unwrap();
        let engine = SyncEngine::new();
        let snapshot = Snapshot::builder_for(url).build(&engine).unwrap();
        let log_segment = snapshot.log_segment();

        let all_txns = SetTransactionScanner::get_all(log_segment, &engine, None).unwrap();
        assert_eq!(all_txns.len(), 4);

        // A cutoff of i64::MIN cannot expire any realistic `lastUpdated`, so nothing is dropped.
        let unfiltered_txns =
            SetTransactionScanner::get_all(log_segment, &engine, Some(i64::MIN)).unwrap();
        assert_eq!(unfiltered_txns, all_txns);

        let expiration_timestamp = Some(100);
        let filtered_txns =
            SetTransactionScanner::get_all(log_segment, &engine, expiration_timestamp).unwrap();
        // Filtering may only drop entries: every survivor must be a known app and must not be
        // expired under the cutoff.
        assert!(filtered_txns.len() <= all_txns.len());
        for (app_id, txn) in &filtered_txns {
            assert!(all_txns.contains_key(app_id));
            assert!(!is_set_txn_expired(expiration_timestamp, txn.last_updated));
        }
    }

    #[test]
    fn test_visitor_retention_with_null_last_updated() {
        let json_strings: StringArray = vec![
            r#"{"txn":{"appId":"app_with_time","version":1,"lastUpdated":100}}"#,
            r#"{"txn":{"appId":"app_without_time","version":2}}"#,
        ]
        .into();
        let batch = parse_json_batch(json_strings);
        // Cutoff 1000 expires the txn updated at 100; the txn without `lastUpdated` is kept.
        let mut visitor = SetTransactionVisitor::new(None, Some(1000));
        visitor.visit_rows_of(batch.as_ref()).unwrap();
        assert_eq!(visitor.set_transactions.len(), 1);
        assert!(visitor.set_transactions.contains_key("app_without_time"));
    }
}