use std::sync::Arc;
use url::Url;
use buoyant_kernel as delta_kernel;
use delta_kernel::engine::default::executor::tokio::TokioBackgroundExecutor;
use delta_kernel::engine::default::{DefaultEngine, DefaultEngineBuilder};
use delta_kernel::object_store::memory::InMemory;
use delta_kernel::object_store::path::Path;
use delta_kernel::{FileMeta, LogPath, Snapshot};
use test_utils::{
actions_to_string, add_commit, add_staged_commit, delta_path_for_version, TestAction,
};
/// Builds a [`LogPath`] for the commit file at `commit_path`, resolved against `table_root`.
///
/// The file metadata comes from [`create_file_meta`], so size and modification time are fixed
/// placeholder values.
///
/// # Panics
///
/// Panics with "Failed to create LogPath" if `LogPath::try_new` returns an error.
fn create_log_path(table_root: &Url, commit_path: Path) -> LogPath {
    LogPath::try_new(create_file_meta(table_root, commit_path)).expect("Failed to create LogPath")
}
/// Produces a [`FileMeta`] whose location is `commit_path` joined onto `table_root`.
///
/// `last_modified` and `size` are hard-coded to 123 and 100; they do not describe the stored
/// object.
///
/// # Panics
///
/// Panics if `commit_path` cannot be joined onto `table_root`.
fn create_file_meta(table_root: &Url, commit_path: Path) -> FileMeta {
    let location = table_root.join(commit_path.as_ref()).unwrap();
    FileMeta {
        location,
        last_modified: 123,
        size: 100,
    }
}
/// Creates the shared fixtures for a test: an in-memory object store, a default engine built
/// on top of that store, and the `memory:///` table root URL.
///
/// The returned store is the same one the engine was built with, so commits written through
/// it are the ones the engine reads.
fn setup_test() -> (
    Arc<InMemory>,
    Arc<DefaultEngine<TokioBackgroundExecutor>>,
    Url,
) {
    let table_root = Url::parse("memory:///").unwrap();
    let storage = Arc::new(InMemory::new());
    let engine = DefaultEngineBuilder::new(storage.clone()).build();
    (storage, Arc::new(engine), table_root)
}
/// Exercises snapshot construction when versions 1 and 2 are only written via
/// `add_staged_commit` and are handed to the builder through the log tail.
///
/// Cases covered, each checking the snapshot version and the commit file locations recorded
/// in the log segment:
/// 1. latest snapshot with both staged commits in the log tail,
/// 2. the same log tail pinned at version 1,
/// 3. a log tail holding only the version 1 staged commit,
/// 4. no log tail at all,
/// 5. a log tail holding only the published version 0 commit.
#[tokio::test]
async fn basic_snapshot_with_log_tail_staged_commits() -> Result<(), Box<dyn std::error::Error>> {
    let (storage, engine, table_url) = setup_test();
    let table_root = table_url.as_str();

    // Version 0 is written with `add_commit`; versions 1 and 2 with `add_staged_commit`.
    add_commit(
        table_root,
        storage.as_ref(),
        0,
        actions_to_string(vec![TestAction::Metadata]),
    )
    .await?;
    let staged_v1 = add_staged_commit(table_root, storage.as_ref(), 1, String::from("{}")).await?;
    // A second staged commit for version 1 whose path is discarded and never put in a log tail.
    let _ = add_staged_commit(table_root, storage.as_ref(), 1, String::from("{}")).await?;
    let staged_v2 = add_staged_commit(table_root, storage.as_ref(), 2, String::from("{}")).await?;

    // Expected commit file locations, resolved against the table root.
    let v0_url = table_url.join(delta_path_for_version(0, "json").as_ref())?;
    let v1_url = table_url.join(staged_v1.as_ref())?;
    let v2_url = table_url.join(staged_v2.as_ref())?;

    let tail_v1 = create_log_path(&table_url, staged_v1);
    let tail_v2 = create_log_path(&table_url, staged_v2);
    let full_tail = vec![tail_v1.clone(), tail_v2];

    // Case 1: latest snapshot with both staged commits supplied.
    let latest = Snapshot::builder_for(table_root)
        .with_log_tail(full_tail.clone())
        .build(engine.as_ref())?;
    assert_eq!(latest.version(), 2);
    let commits = &latest.log_segment().listed.ascending_commit_files;
    assert_eq!(commits.len(), 3);
    assert_eq!(commits[0].location.location, v0_url);
    assert_eq!(commits[1].location.location, v1_url);
    assert_eq!(commits[2].location.location, v2_url);

    // Case 2: same log tail, but the snapshot is pinned at version 1.
    let pinned_v1 = Snapshot::builder_for(table_root)
        .with_log_tail(full_tail)
        .at_version(1)
        .build(engine.as_ref())?;
    assert_eq!(pinned_v1.version(), 1);
    let commits = &pinned_v1.log_segment().listed.ascending_commit_files;
    assert_eq!(commits.len(), 2);
    assert_eq!(commits[0].location.location, v0_url);
    assert_eq!(commits[1].location.location, v1_url);

    // Case 3: the log tail contains only the version 1 staged commit.
    let tail_only_v1 = Snapshot::builder_for(table_root)
        .with_log_tail(vec![tail_v1])
        .build(engine.as_ref())?;
    assert_eq!(tail_only_v1.version(), 1);
    let commits = &tail_only_v1.log_segment().listed.ascending_commit_files;
    assert_eq!(commits.len(), 2);
    assert_eq!(commits[0].location.location, v0_url);
    assert_eq!(commits[1].location.location, v1_url);

    // Case 4: without a log tail only the version 0 commit is expected.
    let no_tail = Snapshot::builder_for(table_root).build(engine.as_ref())?;
    assert_eq!(no_tail.version(), 0);
    let commits = &no_tail.log_segment().listed.ascending_commit_files;
    assert_eq!(commits.len(), 1);
    assert_eq!(commits[0].location.location, v0_url);

    // Case 5: the log tail contains only the published version 0 commit.
    let published_tail = vec![create_log_path(
        &table_url,
        delta_path_for_version(0, "json"),
    )];
    let tail_only_v0 = Snapshot::builder_for(table_root)
        .with_log_tail(published_tail)
        .build(engine.as_ref())?;
    assert_eq!(tail_only_v0.version(), 0);
    let commits = &tail_only_v0.log_segment().listed.ascending_commit_files;
    assert_eq!(commits.len(), 1);
    assert_eq!(commits[0].location.location, v0_url);

    Ok(())
}
/// Writes published commits 0 through 2, passes versions 1 and 2 as the log tail, and checks
/// that the resulting snapshot is at version 2.
#[tokio::test]
async fn basic_snapshot_with_log_tail() -> Result<(), Box<dyn std::error::Error>> {
    let (storage, engine, table_url) = setup_test();
    let table_root = table_url.as_str();

    add_commit(
        table_root,
        storage.as_ref(),
        0,
        actions_to_string(vec![TestAction::Metadata]),
    )
    .await?;
    for (version, file_name) in [(1, "file_1.parquet"), (2, "file_2.parquet")] {
        let add_file = TestAction::Add(file_name.to_string());
        add_commit(
            table_root,
            storage.as_ref(),
            version,
            actions_to_string(vec![add_file]),
        )
        .await?;
    }

    let tail_v1 = create_log_path(&table_url, delta_path_for_version(1, "json"));
    let tail_v2 = create_log_path(&table_url, delta_path_for_version(2, "json"));

    let snapshot = Snapshot::builder_for(table_root)
        .with_log_tail(vec![tail_v1, tail_v2])
        .build(engine.as_ref())?;
    assert_eq!(snapshot.version(), 2);

    Ok(())
}
/// Writes published commits 0 through 2 but supplies a log tail that stops at version 1, then
/// checks that the snapshot version follows the log tail (1) rather than the stored commits.
#[tokio::test]
async fn log_tail_behind_filesystem() -> Result<(), Box<dyn std::error::Error>> {
    let (storage, engine, table_url) = setup_test();
    let table_root = table_url.as_str();

    add_commit(
        table_root,
        storage.as_ref(),
        0,
        actions_to_string(vec![TestAction::Metadata]),
    )
    .await?;
    for version in 1..=2 {
        let add_file = TestAction::Add(format!("file_{}.parquet", version));
        add_commit(
            table_root,
            storage.as_ref(),
            version,
            actions_to_string(vec![add_file]),
        )
        .await?;
    }

    // The log tail covers versions 0 and 1 only; version 2 exists only in storage.
    let log_tail: Vec<_> = (0..=1)
        .map(|version| create_log_path(&table_url, delta_path_for_version(version, "json")))
        .collect();

    let snapshot = Snapshot::builder_for(table_root)
        .with_log_tail(log_tail)
        .build(engine.as_ref())?;
    let resolved_version = snapshot.version();
    assert_eq!(
        resolved_version, 1,
        "Log tail should define the latest version"
    );

    Ok(())
}
/// Builds a snapshot at version 1, then updates it via `Snapshot::builder_from` with a log tail
/// made of the published version 2 commit plus staged commits for versions 3 and 4, and checks
/// that the new snapshot is at version 4.
#[tokio::test]
async fn incremental_snapshot_with_log_tail() -> Result<(), Box<dyn std::error::Error>> {
    let (storage, engine, table_url) = setup_test();
    let table_root = table_url.as_str();

    add_commit(
        table_root,
        storage.as_ref(),
        0,
        actions_to_string(vec![TestAction::Metadata]),
    )
    .await?;
    for (version, file_name) in [(1, "file_1.parquet"), (2, "file_2.parquet")] {
        let add_file = TestAction::Add(file_name.to_string());
        add_commit(
            table_root,
            storage.as_ref(),
            version,
            actions_to_string(vec![add_file]),
        )
        .await?;
    }

    // Base snapshot, created before any staged commit is written.
    let initial_snapshot = Snapshot::builder_for(table_root)
        .at_version(1)
        .build(engine.as_ref())?;
    assert_eq!(initial_snapshot.version(), 1);

    // The log tail starts with the published version 2 commit, followed by the staged ones.
    let mut log_tail = vec![create_log_path(
        &table_url,
        delta_path_for_version(2, "json"),
    )];
    for (version, file_name) in [(3, "file_3.parquet"), (4, "file_4.parquet")] {
        let add_file = TestAction::Add(file_name.to_string());
        let staged_path = add_staged_commit(
            table_root,
            storage.as_ref(),
            version,
            actions_to_string(vec![add_file]),
        )
        .await?;
        log_tail.push(create_log_path(&table_url, staged_path));
    }

    let new_snapshot = Snapshot::builder_from(initial_snapshot)
        .with_log_tail(log_tail)
        .build(engine.as_ref())?;
    assert_eq!(new_snapshot.version(), 4);

    Ok(())
}
/// Supplies a log tail covering versions 1 through 4 while requesting version 3, and checks
/// that the snapshot is built at the requested version.
#[tokio::test]
async fn log_tail_exceeds_requested_version() -> Result<(), Box<dyn std::error::Error>> {
    let (storage, engine, table_url) = setup_test();
    let table_root = table_url.as_str();

    add_commit(
        table_root,
        storage.as_ref(),
        0,
        actions_to_string(vec![TestAction::Metadata]),
    )
    .await?;
    for version in 1..=4 {
        let add_file = TestAction::Add(format!("file_{}.parquet", version));
        add_commit(
            table_root,
            storage.as_ref(),
            version,
            actions_to_string(vec![add_file]),
        )
        .await?;
    }

    // The log tail runs one version past the one requested below.
    let log_tail: Vec<_> = (1..=4)
        .map(|version| create_log_path(&table_url, delta_path_for_version(version, "json")))
        .collect();

    let snapshot = Snapshot::builder_for(table_root)
        .at_version(3)
        .with_log_tail(log_tail)
        .build(engine.as_ref())?;
    assert_eq!(snapshot.version(), 3);

    Ok(())
}
/// Supplies a log tail that ends at version 3 while requesting version 4, and checks that the
/// build fails with the expected end-version mismatch message, even though a version 4 commit
/// was written to storage.
#[tokio::test]
async fn log_tail_behind_requested_version() -> Result<(), Box<dyn std::error::Error>> {
    let (storage, engine, table_url) = setup_test();
    let table_root = table_url.as_str();

    add_commit(
        table_root,
        storage.as_ref(),
        0,
        actions_to_string(vec![TestAction::Metadata]),
    )
    .await?;
    for version in 1..=4 {
        let add_file = TestAction::Add(format!("file_{}.parquet", version));
        add_commit(
            table_root,
            storage.as_ref(),
            version,
            actions_to_string(vec![add_file]),
        )
        .await?;
    }

    // The log tail stops one version short of the one requested below.
    let log_tail: Vec<_> = (1..=3)
        .map(|version| create_log_path(&table_url, delta_path_for_version(version, "json")))
        .collect();

    let build_error = Snapshot::builder_for(table_root)
        .at_version(4)
        .with_log_tail(log_tail)
        .build(engine.as_ref())
        .unwrap_err();
    let message = build_error.to_string();
    assert!(
        message.contains("LogSegment end version 3 not the same as the specified end version 4")
    );

    Ok(())
}