use std::collections::HashMap;
use std::sync::Arc;
use buoyant_kernel as delta_kernel;
use delta_kernel::arrow::record_batch::RecordBatch;
use delta_kernel::committer::FileSystemCommitter;
use delta_kernel::engine::arrow_data::ArrowEngineData;
use delta_kernel::engine::default::executor::tokio::TokioBackgroundExecutor;
use delta_kernel::engine::default::DefaultEngine;
use delta_kernel::transaction::create_table::create_table as create_table_txn;
use delta_kernel::Snapshot;
use test_utils::{
begin_transaction, create_table_and_load_snapshot, read_add_infos, test_table_setup,
write_batch_to_table,
};
use url::Url;
use crate::common::write_utils::{get_simple_schema, simple_id_batch};
/// Appends `data` to the table behind `snapshot` in a single commit and
/// returns the snapshot attached to that commit.
///
/// Starts a transaction with `begin_transaction` (engine info `"test"`),
/// writes `data` as parquet through the transaction's unpartitioned write
/// context, registers the returned add metadata and commits with `engine`.
///
/// # Errors
/// Propagates any error from starting the transaction, obtaining the write
/// context, writing the parquet data, or committing.
///
/// # Panics
/// Panics if the committed result has no post-commit snapshot (`unwrap`).
/// NOTE(review): `unwrap_committed` presumably also panics when the commit did
/// not end up committed — confirm against the kernel API.
async fn write_batch_to_table_simple(
    snapshot: &Arc<Snapshot>,
    engine: &DefaultEngine<TokioBackgroundExecutor>,
    data: RecordBatch,
) -> Result<Arc<Snapshot>, Box<dyn std::error::Error>> {
    let mut tx = begin_transaction(Arc::clone(snapshot), engine)?.with_engine_info("test");
    let ctx = tx.unpartitioned_write_context()?;

    // Wrap the Arrow batch so the engine can consume it as engine data.
    let engine_data = ArrowEngineData::new(data);
    let written = engine.write_parquet(&engine_data, &ctx).await?;
    tx.add_files(written);

    let commit_result = tx.commit(engine)?;
    let committed = commit_result.unwrap_committed();
    let latest = committed.post_commit_snapshot().unwrap();
    Ok(latest.clone())
}
/// Writes one batch to a freshly created table, then checks both the recorded
/// file path and the data read back.
///
/// Expects `read_add_infos` to report exactly one entry whose `path` contains
/// no `://` (i.e. it is not a full URL), and a scan of the resulting snapshot
/// to yield a single batch holding the three written rows.
#[tokio::test]
async fn test_write_uses_relative_paths_and_readback() -> Result<(), Box<dyn std::error::Error>> {
    let (_tmp_dir, table_path, engine) = test_table_setup()?;
    let schema = get_simple_schema();
    let initial =
        create_table_and_load_snapshot(&table_path, schema.clone(), engine.as_ref(), &[])?;

    let rows = simple_id_batch(&schema, vec![1, 2, 3]);
    let snapshot = write_batch_to_table_simple(&initial, engine.as_ref(), rows).await?;

    // The single write must surface as exactly one add entry with a scheme-less path.
    let add_files = read_add_infos(&snapshot, engine.as_ref())?;
    assert_eq!(add_files.len(), 1);
    let path = &add_files[0].path;
    assert!(
        !path.contains("://"),
        "should produce relative paths, got: {path}"
    );

    // Read the table back to confirm the recorded path actually resolves to the data.
    let scan = snapshot.scan_builder().build()?;
    let read_back = test_utils::read_scan(&scan, engine)?;
    assert_eq!(read_back.len(), 1);
    assert_eq!(read_back[0].num_rows(), 3);
    Ok(())
}
/// Stages two parquet writes in one transaction and checks the path of every
/// resulting add entry.
///
/// Both writes share a single unpartitioned write context and are committed
/// together. The test expects `read_add_infos` to report two entries, none of
/// whose `path` contains `://`.
#[tokio::test]
async fn test_multiple_files_in_commit_all_use_relative_paths(
) -> Result<(), Box<dyn std::error::Error>> {
    let schema = get_simple_schema();
    let (_tmp_dir, table_path, engine) = test_table_setup()?;
    let snapshot =
        create_table_and_load_snapshot(&table_path, schema.clone(), engine.as_ref(), &[])?;
    let mut txn = begin_transaction(snapshot.clone(), engine.as_ref())?.with_engine_info("test");
    // Propagate with `?` like every other fallible call in this test, so a failure is
    // reported as the underlying error rather than as a panic.
    let write_context = txn.unpartitioned_write_context()?;

    // One parquet write per batch, all registered on the same transaction.
    for values in [vec![1, 2], vec![3, 4]] {
        let add_meta = engine
            .write_parquet(
                &ArrowEngineData::new(simple_id_batch(&schema, values)),
                &write_context,
            )
            .await?;
        txn.add_files(add_meta);
    }

    let committed = txn.commit(engine.as_ref())?.unwrap_committed();
    let snapshot = committed.post_commit_snapshot().unwrap().clone();
    let add_infos = read_add_infos(&snapshot, engine.as_ref())?;
    assert_eq!(add_infos.len(), 2);
    for info in &add_infos {
        assert!(
            !info.path.contains("://"),
            "Expected relative path, got: {}",
            info.path
        );
    }
    Ok(())
}
/// Writes three batches through successive `write_batch_to_table` calls and
/// checks that a scan of the final snapshot returns six rows in total.
///
/// This test does not inspect the recorded paths directly; it only verifies
/// that everything written remains readable from the latest snapshot.
#[tokio::test]
async fn test_multiple_commits_with_relative_paths_all_readable(
) -> Result<(), Box<dyn std::error::Error>> {
    let (_tmp_dir, table_path, engine) = test_table_setup()?;
    let schema = get_simple_schema();
    let mut latest =
        create_table_and_load_snapshot(&table_path, schema.clone(), engine.as_ref(), &[])?;

    // One `write_batch_to_table` call per batch, each starting from the snapshot
    // returned by the previous call.
    for ids in [vec![1, 2], vec![3, 4], vec![5, 6]] {
        let batch = simple_id_batch(&schema, ids);
        latest = write_batch_to_table(&latest, engine.as_ref(), batch, HashMap::new()).await?;
    }

    let scan = latest.scan_builder().build()?;
    let batches = test_utils::read_scan(&scan, engine)?;

    // Row counts are summed across batches; the number of batches returned is not asserted.
    let mut total_rows: usize = 0;
    for batch in batches.iter() {
        total_rows += batch.num_rows();
    }
    assert_eq!(total_rows, 6);
    Ok(())
}
/// Creates a table and writes data in the same create-table transaction, then
/// checks the recorded file path and the data read back.
///
/// The transaction is built from a `file://`-style directory URL with a
/// `FileSystemCommitter`. The test expects one add entry whose `path` contains
/// no `://`, and a scan that yields a single batch with the two written rows.
#[tokio::test]
async fn test_create_table_with_data_uses_relative_paths() -> Result<(), Box<dyn std::error::Error>>
{
    let (_tmp_dir, table_path, engine) = test_table_setup()?;
    let schema = get_simple_schema();
    // `from_directory_path` has a `()` error type, so it cannot be propagated with `?`.
    let table_url = Url::from_directory_path(&table_path).unwrap();

    let mut create_txn = create_table_txn(table_url.as_str(), schema.clone(), "test/1.0")
        .build(engine.as_ref(), Box::new(FileSystemCommitter::new()))?;
    let ctx = create_txn.unpartitioned_write_context()?;
    let rows = ArrowEngineData::new(simple_id_batch(&schema, vec![10, 20]));
    let written = engine.write_parquet(&rows, &ctx).await?;
    create_txn.add_files(written);

    let commit_result = create_txn.commit(engine.as_ref())?;
    let committed = commit_result.unwrap_committed();
    let snapshot = committed.post_commit_snapshot().unwrap().clone();

    // The data written alongside table creation must show up as one scheme-less add entry.
    let add_files = read_add_infos(&snapshot, engine.as_ref())?;
    assert_eq!(add_files.len(), 1);
    let path = &add_files[0].path;
    assert!(
        !path.contains("://"),
        "should produce relative paths, got: {path}"
    );

    let scan = snapshot.scan_builder().build()?;
    let read_back = test_utils::read_scan(&scan, engine)?;
    assert_eq!(read_back.len(), 1);
    assert_eq!(read_back[0].num_rows(), 2);
    Ok(())
}