use std::sync::Arc;
use url::Url;
use delta_kernel::engine::default::executor::tokio::TokioBackgroundExecutor;
use delta_kernel::engine::default::{DefaultEngine, DefaultEngineBuilder};
use delta_kernel::object_store::memory::InMemory;
use delta_kernel::Snapshot;
use test_utils::{
actions_to_string, actions_to_string_catalog_managed, add_commit, add_staged_commit,
create_log_path, delta_path_for_version, TestAction,
};
/// Creates the fixtures shared by every test in this file: an in-memory object store,
/// a default engine built on top of that store, and the table root URL `memory:///`.
///
/// The store is returned alongside the engine so tests can write commit files directly.
fn setup_test() -> (
    Arc<InMemory>,
    Arc<DefaultEngine<TokioBackgroundExecutor>>,
    Url,
) {
    let store = Arc::new(InMemory::new());
    let root = Url::parse("memory:///").unwrap();
    let engine = DefaultEngineBuilder::new(Arc::clone(&store)).build();
    (store, Arc::new(engine), root)
}
#[tokio::test]
async fn basic_snapshot_with_log_tail_staged_commits() -> Result<(), Box<dyn std::error::Error>> {
let (storage, engine, table_url) = setup_test();
let table_root = table_url.as_str();
let actions = vec![TestAction::Metadata];
add_commit(
table_root,
storage.as_ref(),
0,
actions_to_string_catalog_managed(actions),
)
.await?;
let path1 = add_staged_commit(table_root, storage.as_ref(), 1, String::from("{}")).await?;
let _ = add_staged_commit(table_root, storage.as_ref(), 1, String::from("{}")).await?;
let path2 = add_staged_commit(table_root, storage.as_ref(), 2, String::from("{}")).await?;
let log_tail = vec![
create_log_path(&table_url, path1.clone()),
create_log_path(&table_url, path2.clone()),
];
let snapshot = Snapshot::builder_for(table_root)
.with_log_tail(log_tail.clone())
.with_max_catalog_version(2)
.build(engine.as_ref())?;
assert_eq!(snapshot.version(), 2);
let log_segment = snapshot.log_segment();
assert_eq!(log_segment.listed.ascending_commit_files.len(), 3);
assert_eq!(
log_segment.listed.ascending_commit_files[0]
.location
.location,
table_url.join(delta_path_for_version(0, "json").as_ref())?
);
assert_eq!(
log_segment.listed.ascending_commit_files[1]
.location
.location,
table_url.join(path1.as_ref())?
);
assert_eq!(
log_segment.listed.ascending_commit_files[2]
.location
.location,
table_url.join(path2.as_ref())?
);
let snapshot = Snapshot::builder_for(table_root)
.with_log_tail(log_tail)
.at_version(1)
.with_max_catalog_version(2)
.build(engine.as_ref())?;
assert_eq!(snapshot.version(), 1);
let log_segment = snapshot.log_segment();
assert_eq!(log_segment.listed.ascending_commit_files.len(), 2);
assert_eq!(
log_segment.listed.ascending_commit_files[0]
.location
.location,
table_url.join(delta_path_for_version(0, "json").as_ref())?
);
assert_eq!(
log_segment.listed.ascending_commit_files[1]
.location
.location,
table_url.join(path1.as_ref())?
);
let log_tail = vec![create_log_path(&table_url, path1.clone())];
let snapshot = Snapshot::builder_for(table_root)
.with_log_tail(log_tail)
.with_max_catalog_version(1)
.build(engine.as_ref())?;
assert_eq!(snapshot.version(), 1);
let log_segment = snapshot.log_segment();
assert_eq!(log_segment.listed.ascending_commit_files.len(), 2);
assert_eq!(
log_segment.listed.ascending_commit_files[0]
.location
.location,
table_url.join(delta_path_for_version(0, "json").as_ref())?
);
assert_eq!(
log_segment.listed.ascending_commit_files[1]
.location
.location,
table_url.join(path1.as_ref())?
);
let snapshot = Snapshot::builder_for(table_root)
.with_max_catalog_version(0)
.build(engine.as_ref())?;
assert_eq!(snapshot.version(), 0);
let log_segment = snapshot.log_segment();
assert_eq!(log_segment.listed.ascending_commit_files.len(), 1);
assert_eq!(
log_segment.listed.ascending_commit_files[0]
.location
.location,
table_url.join(delta_path_for_version(0, "json").as_ref())?
);
let log_tail = vec![create_log_path(
&table_url,
delta_path_for_version(0, "json"),
)];
let snapshot = Snapshot::builder_for(table_root)
.with_log_tail(log_tail)
.with_max_catalog_version(0)
.build(engine.as_ref())?;
assert_eq!(snapshot.version(), 0);
let log_segment = snapshot.log_segment();
assert_eq!(log_segment.listed.ascending_commit_files.len(), 1);
assert_eq!(
log_segment.listed.ascending_commit_files[0]
.location
.location,
table_url.join(delta_path_for_version(0, "json").as_ref())?
);
Ok(())
}
/// Writes published commits 0..=2 (using `actions_to_string`, not the catalog-managed
/// variant), passes commits 1 and 2 as the log tail, and expects the latest snapshot to
/// resolve to version 2.
#[tokio::test]
async fn basic_snapshot_with_log_tail() -> Result<(), Box<dyn std::error::Error>> {
    let (storage, engine, table_url) = setup_test();
    let table_root = table_url.as_str();

    let metadata = actions_to_string(vec![TestAction::Metadata]);
    add_commit(table_root, storage.as_ref(), 0, metadata).await?;
    for version in 1..=2 {
        let adds = vec![TestAction::Add(format!("file_{version}.parquet"))];
        add_commit(table_root, storage.as_ref(), version, actions_to_string(adds)).await?;
    }

    let tail: Vec<_> = (1..=2)
        .map(|version| create_log_path(&table_url, delta_path_for_version(version, "json")))
        .collect();

    let latest = Snapshot::builder_for(table_root)
        .with_log_tail(tail)
        .build(engine.as_ref())?;
    assert_eq!(latest.version(), 2);
    Ok(())
}
/// Storage holds published commits 0..=2, but the log tail only lists commits 0 and 1.
/// The latest snapshot is expected to stop at version 1, the end of the log tail.
#[tokio::test]
async fn log_tail_behind_filesystem() -> Result<(), Box<dyn std::error::Error>> {
    let (storage, engine, table_url) = setup_test();
    let table_root = table_url.as_str();

    let metadata = actions_to_string(vec![TestAction::Metadata]);
    add_commit(table_root, storage.as_ref(), 0, metadata).await?;
    for version in 1..=2 {
        let adds = vec![TestAction::Add(format!("file_{version}.parquet"))];
        add_commit(table_root, storage.as_ref(), version, actions_to_string(adds)).await?;
    }

    // Deliberately omit commit 2 from the tail.
    let tail: Vec<_> = (0..=1)
        .map(|version| create_log_path(&table_url, delta_path_for_version(version, "json")))
        .collect();

    let latest = Snapshot::builder_for(table_root)
        .with_log_tail(tail)
        .build(engine.as_ref())?;
    assert_eq!(
        latest.version(),
        1,
        "Log tail should define the latest version"
    );
    Ok(())
}
/// Builds a snapshot at version 1 of a table whose commit 0 uses catalog-managed metadata.
/// It then advances that snapshot incrementally with a log tail made of published commit 2
/// plus staged commits 3 and 4, and expects the result to reach version 4.
#[tokio::test]
async fn incremental_snapshot_with_log_tail() -> Result<(), Box<dyn std::error::Error>> {
    let (storage, engine, table_url) = setup_test();
    let table_root = table_url.as_str();

    let metadata = actions_to_string_catalog_managed(vec![TestAction::Metadata]);
    add_commit(table_root, storage.as_ref(), 0, metadata).await?;
    for (version, file) in [(1, "file_1.parquet"), (2, "file_2.parquet")] {
        let commit = actions_to_string(vec![TestAction::Add(file.to_string())]);
        add_commit(table_root, storage.as_ref(), version, commit).await?;
    }

    let initial_snapshot = Snapshot::builder_for(table_root)
        .at_version(1)
        .with_max_catalog_version(2)
        .build(engine.as_ref())?;
    assert_eq!(initial_snapshot.version(), 1);

    // The tail starts at published commit 2; versions 3 and 4 exist only as staged commits.
    let mut tail = vec![create_log_path(&table_url, delta_path_for_version(2, "json"))];
    for (version, file) in [(3, "file_3.parquet"), (4, "file_4.parquet")] {
        let commit = actions_to_string(vec![TestAction::Add(file.to_string())]);
        let staged = add_staged_commit(table_root, storage.as_ref(), version, commit).await?;
        tail.push(create_log_path(&table_url, staged));
    }

    let updated = Snapshot::builder_from(initial_snapshot)
        .with_log_tail(tail)
        .with_max_catalog_version(4)
        .build(engine.as_ref())?;
    assert_eq!(updated.version(), 4);
    Ok(())
}
/// Incrementally updates a snapshot when storage already holds a commit newer than the
/// max catalog version. The updated snapshot is expected to stop at the max catalog
/// version rather than at the newest commit in storage.
#[tokio::test]
async fn incremental_snapshot_caps_at_max_catalog_version() -> Result<(), Box<dyn std::error::Error>>
{
    let (storage, engine, table_url) = setup_test();
    let table_root = table_url.as_str();
    let actions = vec![TestAction::Metadata];
    add_commit(
        table_root,
        storage.as_ref(),
        0,
        actions_to_string_catalog_managed(actions),
    )
    .await?;
    let actions = vec![TestAction::Add("file_1.parquet".to_string())];
    add_commit(table_root, storage.as_ref(), 1, actions_to_string(actions)).await?;
    let actions = vec![TestAction::Add("file_2.parquet".to_string())];
    add_commit(table_root, storage.as_ref(), 2, actions_to_string(actions)).await?;

    // Highest version the catalog reports. Every expectation below is derived from this
    // value, so changing it keeps the test self-consistent.
    let max_catalog_version = 2;
    let initial_snapshot = Snapshot::builder_for(table_root)
        .at_version(1)
        .with_max_catalog_version(max_catalog_version)
        .build(engine.as_ref())?;
    assert_eq!(initial_snapshot.version(), 1);

    // Commit 3 is published directly to storage, one past the max catalog version.
    let actions = vec![TestAction::Add("file_3.parquet".to_string())];
    add_commit(table_root, storage.as_ref(), 3, actions_to_string(actions)).await?;

    let log_tail = vec![create_log_path(
        &table_url,
        delta_path_for_version(max_catalog_version, "json"),
    )];
    let new_snapshot = Snapshot::builder_from(initial_snapshot)
        .with_log_tail(log_tail)
        .with_max_catalog_version(max_catalog_version)
        .build(engine.as_ref())?;
    assert_eq!(
        new_snapshot.version(),
        max_catalog_version,
        "snapshot must not advance past the max catalog version even though commit 3 exists"
    );
    Ok(())
}
/// Storage and the log tail both reach version 4, but a snapshot is requested at version 3.
/// The requested version should win, so the snapshot is expected to resolve to version 3.
#[tokio::test]
async fn log_tail_exceeds_requested_version() -> Result<(), Box<dyn std::error::Error>> {
    let (storage, engine, table_url) = setup_test();
    let table_root = table_url.as_str();

    let metadata = actions_to_string(vec![TestAction::Metadata]);
    add_commit(table_root, storage.as_ref(), 0, metadata).await?;
    for version in 1..=4 {
        let adds = vec![TestAction::Add(format!("file_{version}.parquet"))];
        add_commit(table_root, storage.as_ref(), version, actions_to_string(adds)).await?;
    }

    let tail: Vec<_> = (1..=4)
        .map(|version| create_log_path(&table_url, delta_path_for_version(version, "json")))
        .collect();

    let at_three = Snapshot::builder_for(table_root)
        .at_version(3)
        .with_log_tail(tail)
        .build(engine.as_ref())?;
    assert_eq!(at_three.version(), 3);
    Ok(())
}
/// Storage holds commits 0..=4, but the log tail stops at version 3 while version 4 is
/// requested. Building the snapshot is expected to fail with an end-version mismatch error.
#[tokio::test]
async fn log_tail_behind_requested_version() -> Result<(), Box<dyn std::error::Error>> {
    let (storage, engine, table_url) = setup_test();
    let table_root = table_url.as_str();
    let actions = vec![TestAction::Metadata];
    add_commit(table_root, storage.as_ref(), 0, actions_to_string(actions)).await?;
    let actions = vec![TestAction::Add("file_1.parquet".to_string())];
    add_commit(table_root, storage.as_ref(), 1, actions_to_string(actions)).await?;
    let actions = vec![TestAction::Add("file_2.parquet".to_string())];
    add_commit(table_root, storage.as_ref(), 2, actions_to_string(actions)).await?;
    let actions = vec![TestAction::Add("file_3.parquet".to_string())];
    add_commit(table_root, storage.as_ref(), 3, actions_to_string(actions)).await?;
    let actions = vec![TestAction::Add("file_4.parquet".to_string())];
    add_commit(table_root, storage.as_ref(), 4, actions_to_string(actions)).await?;
    let log_tail = vec![
        create_log_path(&table_url, delta_path_for_version(1, "json")),
        create_log_path(&table_url, delta_path_for_version(2, "json")),
        create_log_path(&table_url, delta_path_for_version(3, "json")),
    ];
    // `expect_err` explains the failure if the build unexpectedly succeeds, and the
    // assertion message below prints the actual error when the text does not match.
    let err = Snapshot::builder_for(table_root)
        .at_version(4)
        .with_log_tail(log_tail)
        .build(engine.as_ref())
        .expect_err("building at version 4 with a log tail ending at version 3 should fail");
    let message = err.to_string();
    assert!(
        message.contains("LogSegment end version 3 not the same as the specified end version 4"),
        "unexpected error message: {message}"
    );
    Ok(())
}