mod factories;
#[cfg(all(test, feature = "datafusion"))]
pub(crate) mod datafusion;
#[cfg(test)]
pub(crate) mod object_store;
use std::{collections::HashMap, path::PathBuf, process::Command};
use url::Url;
pub use self::factories::*;
#[cfg(test)]
use crate::DeltaTable;
#[cfg(test)]
use crate::kernel::LogicalFileView;
#[cfg(test)]
use crate::logstore::LogStoreRef;
#[cfg(test)]
use crate::table::state::DeltaTableState;
use crate::{DeltaResult, DeltaTableBuilder};
#[cfg(test)]
use futures::TryStreamExt;
/// Convenience result type for test functions.
///
/// The success type defaults to `()`, and the error is a boxed `dyn std::error::Error`,
/// so any error type implementing `std::error::Error` can be propagated with `?`.
pub type TestResult<T = ()> = Result<T, Box<dyn std::error::Error + 'static>>;
/// Open the Delta table stored in the local directory `path`.
///
/// `path` may be relative; it is canonicalized (which requires it to exist) and then
/// converted into a directory URL before the table is built.
///
/// # Panics
/// Panics, naming the offending path, if `path` cannot be canonicalized, cannot be
/// expressed as a directory URL, or if creating/building the table fails.
#[cfg(test)]
pub(crate) fn open_fs_path(path: &str) -> DeltaTable {
    let canonical = std::path::Path::new(path)
        .canonicalize()
        .unwrap_or_else(|err| panic!("failed to canonicalize table path {path:?}: {err}"));
    let url = url::Url::from_directory_path(&canonical).unwrap_or_else(|_| {
        panic!(
            "cannot express {} as a directory URL",
            canonical.display()
        )
    });
    DeltaTableBuilder::from_url(url)
        .unwrap_or_else(|err| panic!("failed to create table builder for {path:?}: {err:?}"))
        .build()
        .unwrap_or_else(|err| panic!("failed to open Delta table at {path:?}: {err:?}"))
}
/// Return the path of every file view exposed by the snapshot held in `state`,
/// reading through `log_store`.
///
/// # Errors
/// Propagates the first error produced while streaming the file views.
#[cfg(test)]
pub(crate) async fn file_paths_from(
    state: &DeltaTableState,
    log_store: &LogStoreRef,
) -> DeltaResult<Vec<String>> {
    // Convert each view to its path as it arrives instead of buffering the views first.
    let paths = state
        .snapshot()
        .file_views(log_store, None)
        .map_ok(|view: LogicalFileView| view.path().to_string())
        .try_collect::<Vec<String>>()
        .await?;
    Ok(paths)
}
/// Test tables shipped as fixtures in the repository, plus a `Custom` variant
/// that is identified only by name.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TestTables {
    /// Fixture directory `simple_table`.
    Simple,
    /// Fixture directory `simple_table_with_checkpoint`.
    SimpleWithCheckpoint,
    /// Fixture directory `simple_commit`.
    SimpleCommit,
    /// Fixture directory `golden/data-reader-array-primitives`.
    Golden,
    /// Fixture directory `delta-0.8.0-partitioned`.
    Delta0_8_0Partitioned,
    /// Fixture directory `delta-0.8.0-special-partition`.
    Delta0_8_0SpecialPartitioned,
    /// Fixture directory `checkpoints`.
    Checkpoints,
    /// Fixture directory `latest_not_checkpointed`.
    LatestNotCheckpointed,
    /// Fixture directory `table-with-dv-small`.
    WithDvSmall,
    /// A table known only by the wrapped name; it has no fixture directory.
    Custom(String),
}
impl TestTables {
    /// Path of this table's fixture directory under `crates/test/tests/data`,
    /// rooted at the top level of the git checkout.
    ///
    /// # Panics
    /// Panics for [`TestTables::Custom`], which has no fixture directory. Locating the
    /// repository root shells out to `git` (via `find_git_root`), which may also panic.
    pub fn as_path(&self) -> PathBuf {
        let data_path = find_git_root().join("crates/test/tests/data");
        match self {
            Self::Simple => data_path.join("simple_table"),
            Self::SimpleWithCheckpoint => data_path.join("simple_table_with_checkpoint"),
            Self::SimpleCommit => data_path.join("simple_commit"),
            Self::Golden => data_path.join("golden/data-reader-array-primitives"),
            Self::Delta0_8_0Partitioned => data_path.join("delta-0.8.0-partitioned"),
            Self::Delta0_8_0SpecialPartitioned => data_path.join("delta-0.8.0-special-partition"),
            Self::Checkpoints => data_path.join("checkpoints"),
            Self::LatestNotCheckpointed => data_path.join("latest_not_checkpointed"),
            Self::WithDvSmall => data_path.join("table-with-dv-small"),
            Self::Custom(_) => todo!("TestTables::Custom has no fixture directory"),
        }
    }

    /// Short name used for this table, e.g. when building URIs with [`Self::uri_for_table`].
    ///
    /// For `Custom` this is the wrapped string. Note that it does not always match the
    /// fixture directory name (`Simple` is `"simple"`, while its directory is `simple_table`).
    pub fn as_name(&self) -> String {
        match self {
            Self::Simple => "simple".into(),
            Self::SimpleWithCheckpoint => "simple_table_with_checkpoint".into(),
            Self::SimpleCommit => "simple_commit".into(),
            Self::Golden => "golden".into(),
            Self::Delta0_8_0Partitioned => "delta-0.8.0-partitioned".into(),
            Self::Delta0_8_0SpecialPartitioned => "delta-0.8.0-special-partition".into(),
            Self::Checkpoints => "checkpoints".into(),
            Self::LatestNotCheckpointed => "latest_not_checkpointed".into(),
            Self::WithDvSmall => "table-with-dv-small".into(),
            Self::Custom(name) => name.to_owned(),
        }
    }

    /// Append this table's [`as_name`](Self::as_name) to `root_uri`.
    ///
    /// A `/` separator is inserted unless `root_uri` already ends with one.
    pub fn uri_for_table(&self, root_uri: impl AsRef<str>) -> String {
        let root_uri = root_uri.as_ref();
        // Drop at most one trailing '/', then always add exactly one.
        let base = root_uri.strip_suffix('/').unwrap_or(root_uri);
        format!("{base}/{}", self.as_name())
    }

    /// Create a [`DeltaTableBuilder`] for this fixture with plain-HTTP access allowed.
    ///
    /// # Errors
    /// Returns `DeltaTableError::InvalidTableLocation` if the fixture path cannot be
    /// turned into a directory URL (e.g. it is not absolute), and propagates any error
    /// from `DeltaTableBuilder::from_url`.
    ///
    /// # Panics
    /// Panics in the same cases as [`Self::as_path`].
    pub fn table_builder(&self) -> DeltaResult<DeltaTableBuilder> {
        // Resolve the path once: `as_path` runs `git`, and the error message should
        // report exactly the path that failed to convert.
        let path = self.as_path();
        let url = Url::from_directory_path(&path).map_err(|_| {
            crate::DeltaTableError::InvalidTableLocation(path.to_string_lossy().into_owned())
        })?;
        DeltaTableBuilder::from_url(url).map(|b| b.with_allow_http(true))
    }
}
/// Return the top-level directory of the git checkout containing the current
/// working directory, as reported by `git rev-parse --show-toplevel`.
///
/// # Panics
/// Panics if `git` cannot be spawned, if it exits unsuccessfully (for example when the
/// current directory is not inside a git repository), or if its output is not UTF-8.
fn find_git_root() -> PathBuf {
    let output = Command::new("git")
        .arg("rev-parse")
        .arg("--show-toplevel")
        .output()
        .expect("failed to run `git rev-parse --show-toplevel`");
    // Without this check a failing git leaves stdout empty, which would silently
    // become an empty (cwd-relative) path and produce confusing fixture lookups.
    assert!(
        output.status.success(),
        "`git rev-parse --show-toplevel` failed ({}): {}",
        output.status,
        String::from_utf8_lossy(&output.stderr).trim()
    );
    let stdout = String::from_utf8(output.stdout)
        .expect("`git rev-parse --show-toplevel` printed non-UTF-8 output");
    PathBuf::from(stdout.trim())
}
/// Temporarily set environment variables, restoring the previous state when the
/// returned guard is dropped.
///
/// Before anything is modified, the current value of every key in `vars` is recorded,
/// including values that are not valid Unicode. Dropping the guard sets each key back
/// to its recorded value, or removes it if it was previously unset.
///
/// Bind the guard to a named variable (e.g. `let _env = with_env(...)`).
/// `let _ = with_env(...)` drops it immediately and reverts the changes at once.
///
/// # Thread safety
/// Mutating the process environment is not thread-safe. Other threads that read or
/// write environment variables concurrently, such as tests running in parallel, may
/// race with this function and with the guard's `Drop`.
#[must_use = "environment changes are reverted as soon as the returned guard is dropped"]
pub fn with_env(vars: Vec<(&str, &str)>) -> impl Drop {
    /// Restores recorded environment variables when dropped.
    struct EnvCleanup(HashMap<String, Option<std::ffi::OsString>>);

    impl Drop for EnvCleanup {
        fn drop(&mut self) {
            for (key, previous) in &self.0 {
                match previous {
                    // SAFETY: environment mutation is only sound while no other thread
                    // touches the environment; callers of `with_env` accept that contract.
                    Some(value) => unsafe { std::env::set_var(key, value) },
                    // SAFETY: as above.
                    None => unsafe { std::env::remove_var(key) },
                }
            }
        }
    }

    // Snapshot every key before modifying any of them, so a key listed twice still
    // restores to its true original value. `var_os` (rather than `var`) keeps
    // non-Unicode values, which would otherwise be deleted on restore.
    let original_values: HashMap<String, Option<std::ffi::OsString>> = vars
        .iter()
        .map(|(key, _)| (key.to_string(), std::env::var_os(key)))
        .collect();
    for (key, value) in vars {
        // SAFETY: see the thread-safety section above; the caller guarantees no
        // concurrent access to the environment.
        unsafe {
            std::env::set_var(key, value);
        }
    }
    EnvCleanup(original_values)
}
/// Assert that pretty-printed record batches equal the expected table lines, ignoring row order.
///
/// `$EXPECTED_LINES` must support `.iter()` over `Copy` items convertible into `String`
/// (typically an array or `Vec` of `&str`). `$CHUNKS` is passed unchanged to
/// `arrow::util::pretty::pretty_format_batches`, and that output is trimmed and split into lines.
///
/// When a side has more than three lines, its lines from index 2 up to, but excluding,
/// the last line are sorted before comparing. The first two lines and the last line stay
/// in place. On mismatch the assertion message prints both line vectors.
///
/// Notes:
/// - The expansion is a sequence of statements, not a block, so use the macro in
///   statement position.
/// - It names `arrow::util::pretty` by path, so the calling crate must be able to
///   resolve `arrow`.
/// - It panics (via `unwrap`) if formatting the batches fails.
#[macro_export]
macro_rules! assert_batches_sorted_eq {
    ($EXPECTED_LINES: expr, $CHUNKS: expr) => {
        let mut expected_lines: Vec<String> = $EXPECTED_LINES.iter().map(|&s| s.into()).collect();
        let num_lines = expected_lines.len();
        // Sort everything between the first two lines and the final line.
        // NOTE(review): index 2 is included in the sort; this presumably relies on arrow's
        // `+---` header separator sorting before `|`-prefixed data rows. Confirm.
        if num_lines > 3 {
            expected_lines.as_mut_slice()[2..num_lines - 1].sort_unstable()
        }
        let formatted = arrow::util::pretty::pretty_format_batches($CHUNKS)
            .unwrap()
            .to_string();
        let mut actual_lines: Vec<&str> = formatted.trim().lines().collect();
        let num_lines = actual_lines.len();
        // Apply the same normalization to the actual output.
        if num_lines > 3 {
            actual_lines.as_mut_slice()[2..num_lines - 1].sort_unstable()
        }
        assert_eq!(
            expected_lines, actual_lines,
            "\n\nexpected:\n\n{expected_lines:#?}\nactual:\n\n{actual_lines:#?}\n\n",
        );
    };
}
// Re-export the `#[macro_export]` macro so it can also be reached by path through this module.
pub use assert_batches_sorted_eq;
/// Build a minimal `Add` action for tests.
///
/// The file is recorded with `size: 0` and `data_change: true`. Each `(column, value)`
/// pair in `partitions` becomes a non-null partition value. Stats, tags, deletion vector,
/// row-tracking fields and clustering provider are all left as `None`.
#[cfg(test)]
pub(crate) fn make_test_add(
    path: impl Into<String>,
    partitions: &[(&str, &str)],
    modification_time: i64,
) -> crate::kernel::Add {
    let partition_values = partitions
        .iter()
        .map(|&(column, value)| (column.to_owned(), Some(value.to_owned())))
        .collect();

    crate::kernel::Add {
        path: path.into(),
        partition_values,
        modification_time,
        size: 0,
        data_change: true,
        stats: None,
        tags: None,
        deletion_vector: None,
        base_row_id: None,
        default_row_commit_version: None,
        clustering_provider: None,
    }
}
/// Generate `action_count` `Add` actions split across two values of `partition_column`.
///
/// The file at index `i` goes to `even_value` when `i` is even and to `odd_value`
/// otherwise. Its path is `{partition_column}={value}/file-{i:05}.parquet`. Every action
/// shares a single modification time, captured once at the start.
#[cfg(all(test, feature = "datafusion"))]
pub(crate) fn multibatch_add_actions_for_partition(
    action_count: usize,
    partition_column: &str,
    even_value: &str,
    odd_value: &str,
) -> Vec<crate::kernel::Action> {
    use crate::kernel::Action;
    use chrono::Utc;

    let timestamp_ms = Utc::now().timestamp_millis();
    let mut actions = Vec::with_capacity(action_count);
    for i in 0..action_count {
        let value = match i % 2 {
            0 => even_value,
            _ => odd_value,
        };
        let file_path = format!("{partition_column}={value}/file-{i:05}.parquet");
        let add = make_test_add(file_path, &[(partition_column, value)], timestamp_ms);
        actions.push(Action::Add(add));
    }
    actions
}
#[cfg(test)]
mod tests {
    use std::env;

    use super::*;

    /// `with_env` sets the requested variables while the guard is alive and, once it is
    /// dropped, restores whatever state they had before (set or unset).
    #[test]
    fn test_api_with_env() {
        // Capture the prior state instead of assuming the variables start unset, so the
        // test also passes on machines or CI runners where they are already defined.
        let before_key = env::var("API_KEY").ok();
        let before_url = env::var("API_URL").ok();

        let guard = with_env(vec![
            ("API_KEY", "test_key"),
            ("API_URL", "http://test.example.com"),
        ]);
        assert_eq!(env::var("API_KEY").unwrap(), "test_key");
        assert_eq!(env::var("API_URL").unwrap(), "http://test.example.com");

        drop(guard);
        assert_eq!(env::var("API_KEY").ok(), before_key);
        assert_eq!(env::var("API_URL").ok(), before_url);
    }
}