use crate::TblError;
use std::collections::HashMap;
use std::path::PathBuf;
/// Options describing which inputs to resolve and where their outputs go.
///
/// Consumed by `get_output_paths`. Every field defaults to `None` / `false`.
#[derive(Debug, Default)]
pub struct OutputPathSpec {
    /// Files and/or directories to resolve. When `None`, `get_output_paths`
    /// falls back to the current working directory.
    pub inputs: Option<Vec<PathBuf>>,
    /// Directory the output paths are placed in. In tree mode each file's
    /// path relative to its input directory is re-rooted under this
    /// directory. With `None`, outputs are derived from the input paths.
    pub output_dir: Option<PathBuf>,
    /// When `true`, directory inputs are expanded via
    /// `get_tree_tabular_files`; otherwise via
    /// `get_directory_tabular_files`.
    pub tree: bool,
    /// Forwarded to `convert_file_path`.
    // NOTE(review): presumably prepended to the output file name — confirm
    // against `manipulate::convert_file_path`.
    pub file_prefix: Option<String>,
    /// Forwarded to `convert_file_path`.
    // NOTE(review): presumably appended to the output file name — confirm
    // against `manipulate::convert_file_path`.
    pub file_postfix: Option<String>,
    /// When `true`, the returned (input, output) pairs are ordered by
    /// output path.
    pub sort: bool,
}
impl OutputPathSpec {
pub fn new() -> Self {
OutputPathSpec::default()
}
pub fn inputs<I>(mut self, inputs: I) -> Self
where
I: Into<InputPaths>,
{
self.inputs = inputs.into().0;
self
}
pub fn output_dir<T>(mut self, output_dir: T) -> Self
where
T: Into<OutputDirType>,
{
self.output_dir = output_dir.into().into();
self
}
pub fn tree(mut self, tree: bool) -> Self {
self.tree = tree;
self
}
pub fn file_prefix<T>(mut self, file_prefix: T) -> Self
where
T: Into<Option<String>>,
{
self.file_prefix = file_prefix.into();
self
}
pub fn file_postfix<T>(mut self, file_postfix: T) -> Self
where
T: Into<Option<String>>,
{
self.file_postfix = file_postfix.into();
self
}
pub fn sort(mut self, sort: bool) -> Self {
self.sort = sort;
self
}
}
/// Intermediate type that lets `OutputPathSpec::output_dir` accept several
/// kinds of argument (string slices, `String`, `PathBuf`, or an `Option` of
/// any of those) and normalise them into an `Option<PathBuf>`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum OutputDirType {
    /// A `'static` string slice. Retained for callers that construct the
    /// variant directly; the `From<&str>` conversion produces
    /// [`OutputDirType::String`] so that non-`'static` slices are accepted.
    Str(&'static str),
    /// An owned string.
    String(String),
    /// An owned path.
    PathBuf(PathBuf),
    /// No output directory.
    None,
}

impl From<OutputDirType> for Option<PathBuf> {
    /// Collapses every variant into an optional owned path.
    fn from(output_dir: OutputDirType) -> Self {
        match output_dir {
            OutputDirType::Str(s) => Some(PathBuf::from(s)),
            OutputDirType::String(s) => Some(PathBuf::from(s)),
            OutputDirType::PathBuf(p) => Some(p),
            OutputDirType::None => None,
        }
    }
}

impl<'a> From<&'a str> for OutputDirType {
    /// Accepts a string slice of any lifetime by copying it into an owned
    /// `String`; the previous `'static`-only impl rejected slices borrowed
    /// from runtime data such as parsed arguments.
    fn from(s: &'a str) -> Self {
        OutputDirType::String(s.to_owned())
    }
}

impl From<String> for OutputDirType {
    fn from(s: String) -> Self {
        OutputDirType::String(s)
    }
}

impl From<PathBuf> for OutputDirType {
    fn from(p: PathBuf) -> Self {
        OutputDirType::PathBuf(p)
    }
}

impl<T> From<Option<T>> for OutputDirType
where
    T: Into<OutputDirType>,
{
    /// `Some(v)` converts like `v`; `None` becomes [`OutputDirType::None`].
    fn from(opt: Option<T>) -> Self {
        opt.map_or(OutputDirType::None, Into::into)
    }
}
/// Intermediate type that lets `OutputPathSpec::inputs` accept a list of
/// path-like values, optionally wrapped in an `Option`, and normalise it
/// into an `Option<Vec<PathBuf>>`.
#[derive(Debug)]
pub struct InputPaths(Option<Vec<PathBuf>>);

impl<P> From<Vec<P>> for InputPaths
where
    P: Into<PathBuf>,
{
    /// Converts every element into a `PathBuf`. Covers `PathBuf`, `String`
    /// and `&str` elements as well as any other type implementing
    /// `Into<PathBuf>` (for example `&Path` or `OsString`).
    fn from(paths: Vec<P>) -> Self {
        InputPaths(Some(paths.into_iter().map(Into::into).collect()))
    }
}

impl<P> From<Option<Vec<P>>> for InputPaths
where
    P: Into<PathBuf>,
{
    /// Same as the `Vec` conversion, but `None` is passed through unchanged.
    fn from(paths: Option<Vec<P>>) -> Self {
        InputPaths(paths.map(|paths| paths.into_iter().map(Into::into).collect()))
    }
}
pub fn get_output_paths(
output_spec: OutputPathSpec,
) -> Result<(Vec<PathBuf>, Vec<PathBuf>), TblError> {
let output_dir = output_spec.output_dir;
let inputs = match output_spec.inputs {
None => vec![std::env::current_dir()?],
Some(inputs) => inputs,
};
let mut return_inputs: Vec<PathBuf> = Vec::new();
let mut return_outputs: Vec<PathBuf> = Vec::new();
for input in inputs {
let metadata = std::fs::metadata(&input)?;
if metadata.is_file() {
let output = super::manipulate::convert_file_path(
&input,
&output_dir,
&output_spec.file_prefix,
&output_spec.file_postfix,
)?;
return_inputs.push(input.clone());
return_outputs.push(output);
} else if metadata.is_dir() {
if !output_spec.tree {
for sub_input in super::gather::get_directory_tabular_files(&input)?.into_iter() {
let output = super::manipulate::convert_file_path(
&sub_input,
&output_dir,
&output_spec.file_prefix,
&output_spec.file_postfix,
)?;
return_inputs.push(sub_input);
return_outputs.push(output);
}
} else {
for sub_input in super::gather::get_tree_tabular_files(&input)?.into_iter() {
let new_path = if let Some(output_dir) = output_dir.clone() {
let relative_path = sub_input.strip_prefix(&input)?.to_path_buf();
output_dir.join(relative_path)
} else {
sub_input.clone()
};
let output = super::manipulate::convert_file_path(
&new_path,
&None,
&output_spec.file_prefix,
&output_spec.file_postfix,
)?;
return_inputs.push(sub_input.clone());
return_outputs.push(output);
}
}
} else {
return Err(TblError::Error("".to_string()));
};
}
let (return_inputs, return_outputs) = if output_spec.sort {
let mut paired = return_inputs
.into_iter()
.zip(return_outputs)
.collect::<Vec<_>>();
paired.sort_by(|a, b| a.1.cmp(&b.1));
paired.into_iter().unzip()
} else {
(return_inputs, return_outputs)
};
let mut count_per_output: HashMap<PathBuf, usize> = HashMap::new();
for output in return_outputs.iter() {
*count_per_output.entry(output.clone()).or_insert(0) += 1;
if count_per_output[output] > 1 {
return Err(TblError::Error(format!(
"Duplicate output path: {:?}",
output
)));
}
}
Ok((return_inputs, return_outputs))
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::fs::{self, File};
    use tempfile::TempDir;

    /// Builds the fixture tree used by every test case:
    ///
    /// ```text
    /// root/
    ///   super_data_{a,b}.parquet
    ///   data1/
    ///     data1_{a,b}.parquet
    ///     sub_data1_1/
    ///       sub_data1_{a,b}.parquet
    ///   data2/
    ///     data2_{a,b}.parquet
    /// ```
    ///
    /// The returned `TempDir` must be kept alive for as long as the tree is
    /// needed.
    fn create_test_file_tree() -> TempDir {
        /// Creates `dir` and an empty file for each entry of `file_names`.
        fn populate(dir: &std::path::Path, file_names: &[&str]) {
            fs::create_dir(dir).unwrap();
            for file_name in file_names {
                File::create(dir.join(file_name)).unwrap();
            }
        }

        let temp_dir = TempDir::new().unwrap();
        println!("Created temporary directory: {:?}", temp_dir.path());

        let root = temp_dir.path().join("root");
        let data1 = root.join("data1");
        populate(&root, &["super_data_a.parquet", "super_data_b.parquet"]);
        populate(&data1, &["data1_a.parquet", "data1_b.parquet"]);
        populate(
            &data1.join("sub_data1_1"),
            &["sub_data1_a.parquet", "sub_data1_b.parquet"],
        );
        populate(
            &root.join("data2"),
            &["data2_a.parquet", "data2_b.parquet"],
        );

        temp_dir
    }

    /// One scenario: a spec with paths relative to the temp dir, and the
    /// output paths (also relative to the temp dir) it must produce.
    struct TestCase {
        name: &'static str,
        spec: OutputPathSpec,
        expected_outputs: Vec<&'static str>,
    }

    /// Expands each `name: TestCase { .. }` entry into a `#[test]` function
    /// that builds the fixture tree, anchors the spec's relative paths in it,
    /// and compares the sorted outputs against the sorted expectations.
    macro_rules! generate_tests {
        ($($name:ident: $value:expr,)*) => {
            $(
                #[test]
                fn $name() {
                    let TestCase { name, mut spec, expected_outputs } = $value;
                    let temp_dir = create_test_file_tree();
                    let base = temp_dir.path();

                    // Anchor the inputs in the temp dir; default to `root`.
                    let resolved_inputs: Vec<PathBuf> = match spec.inputs.take() {
                        Some(inputs) => inputs.iter().map(|input| base.join(input)).collect(),
                        None => vec![base.join("root")],
                    };
                    spec.inputs = Some(resolved_inputs);
                    spec.output_dir = spec.output_dir.take().map(|dir| base.join(dir));

                    let (_inputs, mut actual) = get_output_paths(spec).unwrap();
                    actual.sort();

                    let mut expected: Vec<PathBuf> = expected_outputs
                        .into_iter()
                        .map(|relative| base.join(relative))
                        .collect();
                    expected.sort();

                    assert_eq!(
                        actual,
                        expected,
                        "Test case '{}' failed.\nExpected (sorted): {:?}\nGot (sorted): {:?}",
                        name,
                        expected,
                        actual
                    );
                }
            )*
        }
    }

    generate_tests! {
        test_root_input: TestCase {
            name: "Root input",
            spec: OutputPathSpec::new().inputs(vec!["root"]),
            expected_outputs: vec![
                "root/super_data_a.parquet",
                "root/super_data_b.parquet",
            ],
        },
        test_root_input_tree: TestCase {
            name: "Root input with tree",
            spec: OutputPathSpec::new().inputs(vec!["root"]).tree(true),
            expected_outputs: vec![
                "root/super_data_a.parquet",
                "root/super_data_b.parquet",
                "root/data1/data1_a.parquet",
                "root/data1/data1_b.parquet",
                "root/data1/sub_data1_1/sub_data1_a.parquet",
                "root/data1/sub_data1_1/sub_data1_b.parquet",
                "root/data2/data2_a.parquet",
                "root/data2/data2_b.parquet",
            ],
        },
        test_root_input_self_output_dir: TestCase {
            name: "Root input with self output dir",
            spec: OutputPathSpec::new().inputs(vec!["root"]).output_dir("root"),
            expected_outputs: vec![
                "root/super_data_a.parquet",
                "root/super_data_b.parquet",
            ],
        },
        test_root_input_self_output_dir_tree: TestCase {
            name: "Root input with self output dir tree",
            spec: OutputPathSpec::new().inputs(vec!["root"]).output_dir("root").tree(true),
            expected_outputs: vec![
                "root/super_data_a.parquet",
                "root/super_data_b.parquet",
                "root/data1/data1_a.parquet",
                "root/data1/data1_b.parquet",
                "root/data1/sub_data1_1/sub_data1_a.parquet",
                "root/data1/sub_data1_1/sub_data1_b.parquet",
                "root/data2/data2_a.parquet",
                "root/data2/data2_b.parquet",
            ],
        },
        test_root_input_output_dir: TestCase {
            name: "Root input with other output dir",
            spec: OutputPathSpec::new().inputs(vec!["root"]).output_dir("other_root"),
            expected_outputs: vec![
                "other_root/super_data_a.parquet",
                "other_root/super_data_b.parquet",
            ],
        },
        test_root_input_output_dir_tree: TestCase {
            name: "Root input with other output dir tree",
            spec: OutputPathSpec::new().inputs(vec!["root"]).output_dir("other_root").tree(true),
            expected_outputs: vec![
                "other_root/super_data_a.parquet",
                "other_root/super_data_b.parquet",
                "other_root/data1/data1_a.parquet",
                "other_root/data1/data1_b.parquet",
                "other_root/data1/sub_data1_1/sub_data1_a.parquet",
                "other_root/data1/sub_data1_1/sub_data1_b.parquet",
                "other_root/data2/data2_a.parquet",
                "other_root/data2/data2_b.parquet",
            ],
        },
        test_data1_input: TestCase {
            name: "Data1 input",
            spec: OutputPathSpec::new().inputs(vec!["root/data1"]),
            expected_outputs: vec![
                "root/data1/data1_a.parquet",
                "root/data1/data1_b.parquet",
            ],
        },
        test_data1_input_tree: TestCase {
            name: "Data1 input with tree",
            spec: OutputPathSpec::new().inputs(vec!["root/data1"]).tree(true),
            expected_outputs: vec![
                "root/data1/data1_a.parquet",
                "root/data1/data1_b.parquet",
                "root/data1/sub_data1_1/sub_data1_a.parquet",
                "root/data1/sub_data1_1/sub_data1_b.parquet",
            ],
        },
        test_data1_input_root_output: TestCase {
            name: "Data1 input with root output",
            spec: OutputPathSpec::new().inputs(vec!["root/data1"]).output_dir("root"),
            expected_outputs: vec![
                "root/data1_a.parquet",
                "root/data1_b.parquet",
            ],
        },
        test_data1_input_root_output_tree: TestCase {
            name: "Data1 input with root output and tree",
            spec: OutputPathSpec::new().inputs(vec!["root/data1"]).output_dir("root").tree(true),
            expected_outputs: vec![
                "root/data1_a.parquet",
                "root/data1_b.parquet",
                "root/sub_data1_1/sub_data1_a.parquet",
                "root/sub_data1_1/sub_data1_b.parquet",
            ],
        },
        test_data1_input_other_output: TestCase {
            name: "Data1 input with other output",
            spec: OutputPathSpec::new().inputs(vec!["root/data1"]).output_dir("other_root"),
            expected_outputs: vec![
                "other_root/data1_a.parquet",
                "other_root/data1_b.parquet",
            ],
        },
        test_data1_input_other_output_tree: TestCase {
            name: "Data1 input with other output and tree",
            spec: OutputPathSpec::new().inputs(vec!["root/data1"]).output_dir("other_root").tree(true),
            expected_outputs: vec![
                "other_root/data1_a.parquet",
                "other_root/data1_b.parquet",
                "other_root/sub_data1_1/sub_data1_a.parquet",
                "other_root/sub_data1_1/sub_data1_b.parquet",
            ],
        },
        test_data1_data2_input: TestCase {
            name: "Data1 and Data2 input",
            spec: OutputPathSpec::new().inputs(vec!["root/data1", "root/data2"]),
            expected_outputs: vec![
                "root/data1/data1_a.parquet",
                "root/data1/data1_b.parquet",
                "root/data2/data2_a.parquet",
                "root/data2/data2_b.parquet",
            ],
        },
        test_data1_data2_input_tree: TestCase {
            name: "Data1 and Data2 input with tree",
            spec: OutputPathSpec::new().inputs(vec!["root/data1", "root/data2"]).tree(true),
            expected_outputs: vec![
                "root/data1/data1_a.parquet",
                "root/data1/data1_b.parquet",
                "root/data1/sub_data1_1/sub_data1_a.parquet",
                "root/data1/sub_data1_1/sub_data1_b.parquet",
                "root/data2/data2_a.parquet",
                "root/data2/data2_b.parquet",
            ],
        },
        test_data1_data2_input_root_output: TestCase {
            name: "Data1 and Data2 input with root output",
            spec: OutputPathSpec::new().inputs(vec!["root/data1", "root/data2"]).output_dir("root"),
            expected_outputs: vec![
                "root/data1_a.parquet",
                "root/data1_b.parquet",
                "root/data2_a.parquet",
                "root/data2_b.parquet",
            ],
        },
        test_data1_data2_input_root_output_tree: TestCase {
            name: "Data1 and Data2 input with root output and tree",
            spec: OutputPathSpec::new().inputs(vec!["root/data1", "root/data2"]).output_dir("root").tree(true),
            expected_outputs: vec![
                "root/data1_a.parquet",
                "root/data1_b.parquet",
                "root/sub_data1_1/sub_data1_a.parquet",
                "root/sub_data1_1/sub_data1_b.parquet",
                "root/data2_a.parquet",
                "root/data2_b.parquet",
            ],
        },
        test_data1_data2_input_other_output: TestCase {
            name: "Data1 and Data2 input with other output",
            spec: OutputPathSpec::new().inputs(vec!["root/data1", "root/data2"]).output_dir("other_root"),
            expected_outputs: vec![
                "other_root/data1_a.parquet",
                "other_root/data1_b.parquet",
                "other_root/data2_a.parquet",
                "other_root/data2_b.parquet",
            ],
        },
        test_data1_data2_input_other_output_tree: TestCase {
            name: "Data1 and Data2 input with other output and tree",
            spec: OutputPathSpec::new().inputs(vec!["root/data1", "root/data2"]).output_dir("other_root").tree(true),
            expected_outputs: vec![
                "other_root/data1_a.parquet",
                "other_root/data1_b.parquet",
                "other_root/sub_data1_1/sub_data1_a.parquet",
                "other_root/sub_data1_1/sub_data1_b.parquet",
                "other_root/data2_a.parquet",
                "other_root/data2_b.parquet",
            ],
        },
        test_specific_files_input: TestCase {
            name: "Specific files input",
            spec: OutputPathSpec::new().inputs(vec!["root/data1/data1_a.parquet", "root/super_data_a.parquet"]),
            expected_outputs: vec![
                "root/data1/data1_a.parquet",
                "root/super_data_a.parquet",
            ],
        },
        test_specific_files_input_tree: TestCase {
            name: "Specific files input with tree",
            spec: OutputPathSpec::new().inputs(vec!["root/data1/data1_a.parquet", "root/super_data_a.parquet"]).tree(true),
            expected_outputs: vec![
                "root/data1/data1_a.parquet",
                "root/super_data_a.parquet",
            ],
        },
        test_specific_files_input_root_output: TestCase {
            name: "Specific files input with root output",
            spec: OutputPathSpec::new().inputs(vec!["root/data1/data1_a.parquet", "root/super_data_a.parquet"]).output_dir("root"),
            expected_outputs: vec![
                "root/data1_a.parquet",
                "root/super_data_a.parquet",
            ],
        },
        test_specific_files_input_root_output_tree: TestCase {
            name: "Specific files input with root output and tree",
            spec: OutputPathSpec::new().inputs(vec!["root/data1/data1_a.parquet", "root/super_data_a.parquet"]).output_dir("root").tree(true),
            expected_outputs: vec![
                "root/data1_a.parquet",
                "root/super_data_a.parquet",
            ],
        },
        test_specific_files_input_other_output: TestCase {
            name: "Specific files input with other output",
            spec: OutputPathSpec::new().inputs(vec!["root/data1/data1_a.parquet", "root/super_data_a.parquet"]).output_dir("other_root"),
            expected_outputs: vec![
                "other_root/data1_a.parquet",
                "other_root/super_data_a.parquet",
            ],
        },
        test_specific_files_input_other_output_tree: TestCase {
            name: "Specific files input with other output and tree",
            spec: OutputPathSpec::new().inputs(vec!["root/data1/data1_a.parquet", "root/super_data_a.parquet"]).output_dir("other_root").tree(true),
            expected_outputs: vec![
                "other_root/data1_a.parquet",
                "other_root/super_data_a.parquet",
            ],
        },
    }
}