use std::collections::{HashMap, HashSet};
use tracing::instrument;
use crate::actions::get_log_domain_metadata_schema;
use crate::actions::visitors::DomainMetadataVisitor;
use crate::actions::DomainMetadata;
use crate::log_replay::ActionsBatch;
use crate::{DeltaResult, Engine, RowVisitor as _};
use super::LogSegment;
/// Map from domain name to its [`DomainMetadata`] action.
pub(crate) type DomainMetadataMap = HashMap<String, DomainMetadata>;
impl LogSegment {
    /// Replays this log segment and collects [`DomainMetadata`] actions into a map
    /// keyed by domain name.
    ///
    /// When `domains` is `Some`, only the named domains are collected and replay is
    /// cut short as soon as the visitor reports that every requested domain has been
    /// found; `None` collects all domains encountered in the segment.
    #[instrument(name = "domain_metadata.scan", skip_all, fields(domains = ?domains.map(|d| d.iter().collect::<Vec<_>>())), err)]
    pub(crate) fn scan_domain_metadatas(
        &self,
        domains: Option<&HashSet<&str>>,
        engine: &dyn Engine,
    ) -> DeltaResult<DomainMetadataMap> {
        // The visitor owns its filter, so the borrowed `&str` set is converted to
        // owned `String`s up front.
        let owned_filter: Option<HashSet<String>> =
            domains.map(|names| names.iter().map(|name| (*name).to_owned()).collect());
        let mut visitor = DomainMetadataVisitor::new(owned_filter);
        for batch in self.read_domain_metadata_batches(engine)? {
            visitor.visit_rows_of(batch?.actions.as_ref())?;
            // All requested domains resolved — skip the remaining (older) batches.
            if visitor.filter_found() {
                break;
            }
        }
        Ok(visitor.into_domain_metadatas())
    }

    /// Reads action batches from this log segment, projected to the
    /// domain-metadata schema.
    fn read_domain_metadata_batches(
        &self,
        engine: &dyn Engine,
    ) -> DeltaResult<impl Iterator<Item = DeltaResult<ActionsBatch>> + Send> {
        self.read_actions(engine, get_log_domain_metadata_schema().clone())
    }
}
#[cfg(test)]
mod tests {
    use std::collections::HashSet;
    use std::sync::Arc;
    use url::Url;
    use crate::actions::visitors::DomainMetadataVisitor;
    use crate::committer::FileSystemCommitter;
    use crate::engine::default::DefaultEngineBuilder;
    use crate::object_store::memory::InMemory;
    use crate::schema::{DataType, StructField, StructType};
    use crate::transaction::create_table::create_table as create_table_txn;
    use crate::{RowVisitor as _, Snapshot};

    /// Builds an in-memory table whose log holds two commits:
    /// commit 0 creates the table and writes `domainC`;
    /// commit 1 writes `domainA` and `domainB`.
    fn build_two_commit_log() -> (impl crate::Engine, std::sync::Arc<Snapshot>) {
        let object_store = Arc::new(InMemory::new());
        let engine = DefaultEngineBuilder::new(object_store).build();
        let table_url = Url::parse("memory:///").unwrap();

        // Commit 0: create the table with the domainMetadata feature enabled
        // and attach domainC in the same commit.
        let table_schema = Arc::new(StructType::new_unchecked(vec![StructField::new(
            "id",
            DataType::INTEGER,
            true,
        )]));
        let _ = create_table_txn(table_url.as_str(), table_schema, "test")
            .with_table_properties([("delta.feature.domainMetadata", "supported")])
            .build(&engine, Box::new(FileSystemCommitter::new()))
            .unwrap()
            .with_domain_metadata("domainC".to_string(), "cfgC".to_string())
            .commit(&engine)
            .unwrap();

        // Commit 1: a second transaction adds domainA and domainB.
        let snapshot = Snapshot::builder_for(table_url.clone())
            .build(&engine)
            .unwrap();
        let _ = snapshot
            .transaction(Box::new(FileSystemCommitter::new()), &engine)
            .unwrap()
            .with_domain_metadata("domainA".to_string(), "cfgA".to_string())
            .with_domain_metadata("domainB".to_string(), "cfgB".to_string())
            .commit(&engine)
            .unwrap();

        // Re-resolve so the returned snapshot sees both commits.
        let latest = Snapshot::builder_for(table_url).build(&engine).unwrap();
        (engine, latest)
    }

    #[tokio::test]
    async fn test_scan_domain_metadatas_early_termination() {
        let (engine, snapshot) = build_two_commit_log();
        let log_segment = snapshot.log_segment();

        // Establish how many batches a full replay would yield.
        let total_batches = log_segment
            .read_domain_metadata_batches(&engine)
            .unwrap()
            .filter(|batch| batch.is_ok())
            .count();
        assert_eq!(
            total_batches, 2,
            "expected 2 total batches (one per commit)"
        );

        // Both requested domains were written in the newest commit, so replay
        // should not need to touch the older batch.
        let wanted: HashSet<String> = ["domainA".to_string(), "domainB".to_string()]
            .into_iter()
            .collect();
        let mut visitor = DomainMetadataVisitor::new(Some(wanted));
        let mut batches_consumed = 0;
        for batch in log_segment.read_domain_metadata_batches(&engine).unwrap() {
            batches_consumed += 1;
            visitor
                .visit_rows_of(batch.unwrap().actions.as_ref())
                .unwrap();
            if visitor.filter_found() {
                break;
            }
        }
        assert_eq!(
            batches_consumed, 1,
            "should break after the first (newest) batch once both domains are found"
        );
        assert!(
            batches_consumed < total_batches,
            "early termination must consume fewer batches than the total"
        );

        let result = visitor.into_domain_metadatas();
        assert_eq!(result.len(), 2);
        assert_eq!(result["domainA"].configuration(), "cfgA");
        assert_eq!(result["domainB"].configuration(), "cfgB");
        assert!(
            !result.contains_key("domainC"),
            "domainC must not appear — second batch was not read"
        );
    }

    #[tokio::test]
    async fn test_scan_domain_metadatas_with_single_domain_filter_returns_only_that_domain() {
        let (engine, snapshot) = build_two_commit_log();
        let filter = HashSet::from(["domainA"]);
        let result = snapshot
            .log_segment()
            .scan_domain_metadatas(Some(&filter), &engine)
            .unwrap();
        assert_eq!(result.len(), 1);
        assert_eq!(result["domainA"].configuration(), "cfgA");
    }

    #[tokio::test]
    async fn test_scan_domain_metadatas_with_subset_filter_returns_matching_domains() {
        let (engine, snapshot) = build_two_commit_log();
        // domainA lives in the newer commit, domainC in the older one.
        let filter = HashSet::from(["domainA", "domainC"]);
        let result = snapshot
            .log_segment()
            .scan_domain_metadatas(Some(&filter), &engine)
            .unwrap();
        assert_eq!(result.len(), 2);
        assert_eq!(result["domainA"].configuration(), "cfgA");
        assert_eq!(result["domainC"].configuration(), "cfgC");
    }

    #[tokio::test]
    async fn test_scan_domain_metadatas_with_no_filter_returns_all_domains() {
        let (engine, snapshot) = build_two_commit_log();
        let result = snapshot
            .log_segment()
            .scan_domain_metadatas(None, &engine)
            .unwrap();
        assert_eq!(result.len(), 3);
        assert_eq!(result["domainA"].configuration(), "cfgA");
        assert_eq!(result["domainB"].configuration(), "cfgB");
        assert_eq!(result["domainC"].configuration(), "cfgC");
    }

    #[tokio::test]
    async fn test_scan_domain_metadatas_with_split_domains_does_not_terminate_early() {
        let (engine, snapshot) = build_two_commit_log();
        let log_segment = snapshot.log_segment();

        // The requested domains span both commits, so the scan must read every batch.
        let wanted: HashSet<String> = ["domainA".to_string(), "domainC".to_string()]
            .into_iter()
            .collect();
        let mut visitor = DomainMetadataVisitor::new(Some(wanted));
        let mut batches_consumed = 0;
        for batch in log_segment.read_domain_metadata_batches(&engine).unwrap() {
            batches_consumed += 1;
            visitor
                .visit_rows_of(batch.unwrap().actions.as_ref())
                .unwrap();
            if visitor.filter_found() {
                break;
            }
        }
        assert_eq!(
            batches_consumed, 2,
            "must read both batches when requested domains span two commits"
        );

        let result = visitor.into_domain_metadatas();
        assert_eq!(result["domainA"].configuration(), "cfgA");
        assert_eq!(result["domainC"].configuration(), "cfgC");
    }
}