#![allow(dead_code)]
use std::collections::HashMap;
use std::sync::Arc;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use buoyant_kernel as delta_kernel;
use delta_kernel::arrow::array::{Int32Array, StructArray};
use delta_kernel::arrow::record_batch::RecordBatch;
use delta_kernel::engine::arrow_conversion::TryIntoArrow as _;
use delta_kernel::engine::arrow_data::ArrowEngineData;
use delta_kernel::engine::default::executor::tokio::TokioBackgroundExecutor;
use delta_kernel::engine::default::DefaultEngine;
use delta_kernel::engine_data::FilteredEngineData;
use delta_kernel::object_store::path::Path;
use delta_kernel::object_store::DynObjectStore;
use delta_kernel::parquet::file::reader::{FileReader, SerializedFileReader};
use delta_kernel::parquet::schema::types::Type as ParquetType;
use delta_kernel::path::ParsedLogPath;
use delta_kernel::schema::{DataType, SchemaRef, StructField, StructType};
use delta_kernel::table_features::ColumnMappingMode;
use delta_kernel::transaction::CommitResult;
use delta_kernel::{DeltaResult, Engine, Snapshot, Version};
use serde_json::json;
use test_utils::{begin_transaction, create_add_files_metadata, create_table, engine_store_setup};
use url::Url;
use uuid::Uuid;
pub const ZERO_UUID: &str = "00000000-0000-0000-0000-000000000000";
pub fn get_simple_schema() -> SchemaRef {
Arc::new(StructType::try_new(vec![StructField::new("id", DataType::INTEGER, true)]).unwrap())
}
pub fn simple_id_batch(schema: &SchemaRef, values: Vec<i32>) -> RecordBatch {
RecordBatch::try_new(
Arc::new(schema.as_ref().try_into_arrow().unwrap()),
vec![Arc::new(Int32Array::from(values))],
)
.unwrap()
}
pub fn load_existing_single_file_checkpoint_path(
table_path: &str,
version: u64,
) -> std::path::PathBuf {
let log_dir = std::path::Path::new(table_path).join("_delta_log");
for entry in std::fs::read_dir(&log_dir).expect("failed to read _delta_log") {
let entry = entry.unwrap();
let url = Url::from_file_path(entry.path()).expect("entry path to url");
let Some(parsed) = ParsedLogPath::try_from(url).expect("parse log path") else {
continue;
};
if parsed.is_checkpoint() && parsed.version == version {
return entry.path();
}
}
panic!("checkpoint parquet file not found for version {version} in {log_dir:?}");
}
pub fn read_parquet_root_schema(parquet_file: &std::path::Path) -> ParquetType {
let file = std::fs::File::open(parquet_file).unwrap();
let reader = SerializedFileReader::new(file).unwrap();
reader
.metadata()
.file_metadata()
.schema_descr()
.root_schema()
.clone()
}
pub fn get_parquet_field_id(
parquet_file: &std::path::Path,
physical_path: &[String],
) -> Option<i32> {
let root = read_parquet_root_schema(parquet_file);
let mut current = &root;
for name in physical_path {
current = current
.get_fields()
.iter()
.find(|f| f.name() == name)
.unwrap_or_else(|| panic!("parquet schema missing field '{name}'"));
}
let info = current.get_basic_info();
info.has_id().then(|| info.id())
}
pub fn collect_all_parquet_field_ids(parquet_file: &std::path::Path) -> HashMap<String, i64> {
fn walk(t: &ParquetType, path: &str, out: &mut HashMap<String, i64>) {
let info = t.get_basic_info();
if info.has_id() {
out.insert(path.to_string(), info.id() as i64);
}
if let ParquetType::GroupType { fields, .. } = t {
for f in fields {
walk(f, &format!("{path}.{}", f.name()), out);
}
}
}
let mut ids = HashMap::new();
if let ParquetType::GroupType { fields, .. } = &read_parquet_root_schema(parquet_file) {
for f in fields {
walk(f, f.name(), &mut ids);
}
}
ids
}
pub fn validate_txn_id(commit_info: &serde_json::Value) {
let txn_id = commit_info["txnId"]
.as_str()
.expect("txnId should be present in commitInfo");
Uuid::parse_str(txn_id).expect("txnId should be valid UUID format");
}
pub fn validate_timestamp(commit_info: &serde_json::Value) {
let timestamp = commit_info["timestamp"]
.as_i64()
.expect("timestamp should be present in commitInfo");
let current_ts: i64 = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_millis()
.try_into()
.unwrap();
let two_days_ms = Duration::from_secs(2 * 24 * 60 * 60).as_millis() as i64;
assert!(
(timestamp <= current_ts && timestamp > current_ts - two_days_ms),
"commit timestamp should be at most 2 days behind current system time: got {timestamp}, now {current_ts}"
);
}
pub fn check_action_timestamps<'a>(
parsed_commits: impl Iterator<Item = &'a serde_json::Value>,
) -> Result<(), Box<dyn std::error::Error>> {
let now: i64 = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)?
.as_millis()
.try_into()
.unwrap();
parsed_commits.for_each(|commit| {
if let Some(commit_info_ts) = &commit.pointer("/commitInfo/timestamp") {
assert!((now - commit_info_ts.as_i64().unwrap()).abs() < 10_000);
}
if let Some(add_ts) = &commit.pointer("/add/modificationTime") {
assert!((now - add_ts.as_i64().unwrap()).abs() < 10_000);
}
});
Ok(())
}
pub async fn get_and_check_all_parquet_sizes(store: Arc<DynObjectStore>, path: &str) -> u64 {
use futures::stream::StreamExt;
let files: Vec<_> = store.list(Some(&Path::from(path))).collect().await;
let parquet_files = files
.into_iter()
.filter(|f| match f {
Ok(f) => f.location.extension() == Some("parquet"),
Err(_) => false,
})
.collect::<Vec<_>>();
assert_eq!(parquet_files.len(), 2);
let size = parquet_files.first().unwrap().as_ref().unwrap().size;
assert!(parquet_files
.iter()
.all(|f| f.as_ref().unwrap().size == size));
size
}
pub async fn write_data_and_check_result_and_stats(
table_url: Url,
schema: SchemaRef,
engine: Arc<DefaultEngine<TokioBackgroundExecutor>>,
expected_since_commit: u64,
) -> Result<(), Box<dyn std::error::Error>> {
let mut txn = test_utils::load_and_begin_transaction(table_url.clone(), engine.as_ref())?
.with_data_change(true);
let append_data = [[1, 2, 3], [4, 5, 6]].map(|data| -> DeltaResult<_> {
let data = RecordBatch::try_new(
Arc::new(schema.as_ref().try_into_arrow()?),
vec![Arc::new(Int32Array::from(data.to_vec()))],
)?;
Ok(Box::new(ArrowEngineData::new(data)))
});
let write_context = Arc::new(txn.unpartitioned_write_context().unwrap());
let tasks = append_data.into_iter().map(|data| {
let engine = engine.clone();
let write_context = write_context.clone();
tokio::task::spawn(async move {
engine
.write_parquet(data.as_ref().unwrap(), write_context.as_ref())
.await
})
});
let add_files_metadata = futures::future::join_all(tasks).await.into_iter().flatten();
for meta in add_files_metadata {
txn.add_files(meta?);
}
match txn.commit(engine.as_ref())? {
CommitResult::CommittedTransaction(committed) => {
assert_eq!(committed.commit_version(), expected_since_commit as Version);
assert_eq!(
committed.post_commit_stats().commits_since_checkpoint,
expected_since_commit
);
assert_eq!(
committed.post_commit_stats().commits_since_log_compaction,
expected_since_commit
);
}
_ => panic!("Commit should have succeeded"),
};
Ok(())
}
pub fn get_simple_int_schema() -> Arc<StructType> {
Arc::new(StructType::try_new(vec![StructField::nullable("number", DataType::INTEGER)]).unwrap())
}
pub fn set_table_properties(
table_path: &str,
table_url: &Url,
engine: &dyn Engine,
current_version: Version,
properties: &[(&str, &str)],
) -> Result<Arc<Snapshot>, Box<dyn std::error::Error>> {
let v0_path = std::path::Path::new(table_path).join("_delta_log/00000000000000000000.json");
let mut meta: serde_json::Value = std::fs::read_to_string(&v0_path)?
.lines()
.find_map(|line| {
serde_json::from_str::<serde_json::Value>(line)
.ok()
.filter(|v| v.get("metaData").is_some())
})
.expect("version 0 should contain a metaData action");
for &(key, value) in properties {
meta["metaData"]["configuration"][key] = json!(value);
}
let new_commit = std::path::Path::new(table_path)
.join(format!("_delta_log/{:020}.json", current_version + 1));
std::fs::write(&new_commit, serde_json::to_string(&meta)?)?;
Ok(Snapshot::builder_for(table_url.clone()).build(engine)?)
}
pub fn assert_column_mapping_mode(snapshot: &Snapshot, cm_mode: &str) -> ColumnMappingMode {
let expected = match cm_mode {
"none" => ColumnMappingMode::None,
"name" => ColumnMappingMode::Name,
"id" => ColumnMappingMode::Id,
_ => panic!("unexpected cm_mode: {cm_mode}"),
};
let actual = snapshot
.table_properties()
.column_mapping_mode
.expect("column mapping mode should be set");
assert_eq!(actual, expected);
actual
}
pub fn resolve_struct_field<'a, T: 'static>(root: &'a StructArray, path: &[String]) -> &'a T {
assert!(!path.is_empty(), "path must be non-empty");
let mut current: &StructArray = root;
for (i, name) in path.iter().enumerate() {
let col = current
.column_by_name(name)
.unwrap_or_else(|| panic!("missing field: {name}"));
if i == path.len() - 1 {
return col
.as_any()
.downcast_ref::<T>()
.expect("leaf array type mismatch");
}
current = col
.as_any()
.downcast_ref::<StructArray>()
.unwrap_or_else(|| panic!("expected StructArray at field: {name}"));
}
unreachable!()
}
pub fn resolve_json_path<'a>(
root: &'a serde_json::Value,
path: &[String],
) -> &'a serde_json::Value {
path.iter().fold(root, |v, key| &v[key])
}
pub fn assert_min_max_stats(
stats: &serde_json::Value,
physical_path: &[String],
expected_min: impl Into<serde_json::Value>,
expected_max: impl Into<serde_json::Value>,
) {
assert_eq!(
*resolve_json_path(&stats["minValues"], physical_path),
expected_min.into()
);
assert_eq!(
*resolve_json_path(&stats["maxValues"], physical_path),
expected_max.into()
);
}
pub async fn create_dv_table_with_files(
table_name: &str,
schema: Arc<StructType>,
file_paths: &[&str],
) -> Result<
(
Arc<DynObjectStore>,
Arc<dyn delta_kernel::Engine>,
Url,
Vec<String>,
),
Box<dyn std::error::Error>,
> {
let (store, engine, table_url) = engine_store_setup(table_name, None);
let engine = Arc::new(engine);
create_table(
store.clone(),
table_url.clone(),
schema.clone(),
&[],
true, vec!["deletionVectors"],
vec!["deletionVectors"],
)
.await?;
let snapshot = Snapshot::builder_for(table_url.clone()).build(engine.as_ref())?;
let mut txn = begin_transaction(snapshot.clone(), engine.as_ref())?
.with_engine_info("test engine")
.with_operation("WRITE".to_string())
.with_data_change(true);
let add_files_schema = txn.add_files_schema();
let files: Vec<(&str, i64, i64, Option<i64>)> = file_paths
.iter()
.enumerate()
.map(|(i, &path)| {
(
path,
1024 + i as i64 * 100, 1000000 + i as i64, Some(3), )
})
.collect();
let metadata = create_add_files_metadata(add_files_schema, files)?;
txn.add_files(metadata);
let _ = txn.commit(engine.as_ref())?;
let paths: Vec<String> = file_paths.iter().map(|&s| s.to_string()).collect();
Ok((store, engine, table_url, paths))
}
pub fn get_scan_files(
snapshot: Arc<Snapshot>,
engine: &dyn delta_kernel::Engine,
) -> DeltaResult<Vec<FilteredEngineData>> {
let scan = snapshot.scan_builder().build()?;
let all_scan_metadata: Vec<_> = scan.scan_metadata(engine)?.collect::<Result<Vec<_>, _>>()?;
Ok(all_scan_metadata
.into_iter()
.map(|sm| sm.scan_files)
.collect())
}