use std::collections::HashSet;
use std::path::{Path, PathBuf};
use glob::{Pattern, glob};
use walkdir::WalkDir;
use crate::error::{IngestionError, IngestionResult};
/// One `key=value` directory component of a Hive-style partition path.
///
/// For a directory named `dt=2024-01-01`, `key` is `"dt"` and `value` is
/// `"2024-01-01"`.
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
pub struct PartitionSegment {
    /// Partition column name (the text before the first `=` when produced by
    /// `parse_partition_segment`).
    pub key: String,
    /// Partition value (everything after the first `=` when produced by
    /// `parse_partition_segment`; may itself contain `=`).
    pub value: String,
}
/// A file found under a Hive-partitioned root, together with the partition
/// segments parsed from the directories between the root and the file.
///
/// Derives `Hash` (matching [`PartitionSegment`]) so discovered files can be
/// stored in hash-based collections.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct PartitionedFile {
    /// Path of the file as yielded by the directory walk.
    pub path: PathBuf,
    /// Parsed `key=value` segments, outermost directory first when built by
    /// `discover_hive_partitioned_files`; empty for files directly in the root.
    pub segments: Vec<PartitionSegment>,
}
/// Parses a single `key=value` path component into a [`PartitionSegment`].
///
/// The component is split at its first `=`, so `a=b=c` yields key `a` and
/// value `b=c`. Returns `None` when there is no `=` or when either side of it
/// is empty.
pub fn parse_partition_segment(component: &str) -> Option<PartitionSegment> {
    let (key, value) = component.split_once('=')?;
    let well_formed = !key.is_empty() && !value.is_empty();
    well_formed.then(|| PartitionSegment {
        key: key.to_owned(),
        value: value.to_owned(),
    })
}
pub fn hive_segments_for_relative_parent(relative_parent: &Path) -> Option<Vec<PartitionSegment>> {
let mut segments = Vec::new();
for c in relative_parent.components() {
let std::path::Component::Normal(part) = c else {
continue;
};
let s = part.to_str()?;
segments.push(parse_partition_segment(s)?);
}
Some(segments)
}
pub fn discover_hive_partitioned_files(
root: impl AsRef<Path>,
file_pattern: Option<&str>,
) -> IngestionResult<Vec<PartitionedFile>> {
let root = root.as_ref();
if !root.is_dir() {
return Err(IngestionError::SchemaMismatch {
message: format!(
"hive partition root must be an existing directory: {}",
root.display()
),
});
}
let pattern = match file_pattern {
None => None,
Some(p) => Some(Pattern::new(p).map_err(|e| IngestionError::SchemaMismatch {
message: format!("invalid glob pattern '{p}': {e}"),
})?),
};
let root = root.to_path_buf();
let mut out = Vec::new();
for entry in WalkDir::new(&root)
.follow_links(false)
.into_iter()
.filter_map(|e| e.ok())
{
let path = entry.path();
if !path.is_file() {
continue;
}
let rel = match path.strip_prefix(&root) {
Ok(r) => r.to_path_buf(),
Err(_) => continue,
};
if let Some(ref pat) = pattern {
if !pat.matches_path_with(
&rel,
glob::MatchOptions {
case_sensitive: true,
require_literal_separator: true,
require_literal_leading_dot: false,
},
) {
continue;
}
}
let parent = rel.parent().unwrap_or_else(|| Path::new(""));
if let Some(segments) = hive_segments_for_relative_parent(parent) {
out.push(PartitionedFile {
path: path.to_path_buf(),
segments,
});
}
}
out.sort_by(|a, b| a.path.cmp(&b.path));
Ok(out)
}
/// Expands a filesystem glob and returns the matching regular files, sorted
/// and de-duplicated.
///
/// Matches that are not files (e.g. directories) are dropped.
///
/// # Errors
///
/// Returns [`IngestionError::SchemaMismatch`] if `pattern` is not a valid glob,
/// or for the first path the expansion fails to read. Such an error aborts the
/// whole expansion.
pub fn paths_from_glob(pattern: &str) -> IngestionResult<Vec<PathBuf>> {
    let matches = glob(pattern).map_err(|e| IngestionError::SchemaMismatch {
        message: format!("invalid glob pattern '{pattern}': {e}"),
    })?;

    let mut files: Vec<PathBuf> = Vec::new();
    for candidate in matches {
        match candidate {
            Ok(found) if found.is_file() => files.push(found),
            Ok(_) => {}
            Err(e) => {
                return Err(IngestionError::SchemaMismatch {
                    message: format!("glob expansion error for '{pattern}': {e}"),
                });
            }
        }
    }

    files.sort();
    files.dedup();
    Ok(files)
}
pub fn paths_from_directory_scan(
root: impl AsRef<Path>,
relative_pattern: Option<&str>,
) -> IngestionResult<Vec<PathBuf>> {
let root = root.as_ref();
if !root.is_dir() {
return Err(IngestionError::SchemaMismatch {
message: format!(
"directory scan root must be an existing directory: {}",
root.display()
),
});
}
let pattern = match relative_pattern {
None => None,
Some(p) => Some(Pattern::new(p).map_err(|e| IngestionError::SchemaMismatch {
message: format!("invalid glob pattern '{p}': {e}"),
})?),
};
let root = root.to_path_buf();
let mut out = Vec::new();
for entry in WalkDir::new(&root)
.follow_links(false)
.into_iter()
.filter_map(|e| e.ok())
{
let path = entry.path();
if !path.is_file() {
continue;
}
let rel = match path.strip_prefix(&root) {
Ok(r) => r.to_path_buf(),
Err(_) => continue,
};
if let Some(ref pat) = pattern {
if !pat.matches_path_with(
&rel,
glob::MatchOptions {
case_sensitive: true,
require_literal_separator: true,
require_literal_leading_dot: false,
},
) {
continue;
}
}
out.push(path.to_path_buf());
}
out.sort();
out.dedup();
Ok(out)
}
pub fn paths_from_explicit_list(paths: &[PathBuf]) -> IngestionResult<Vec<PathBuf>> {
let mut seen = HashSet::new();
let mut out = Vec::new();
for p in paths {
if !p.is_file() {
return Err(IngestionError::SchemaMismatch {
message: format!("explicit path is not an existing file: {}", p.display()),
});
}
if seen.insert(p.clone()) {
out.push(p.clone());
}
}
Ok(out)
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Shorthand constructor for expected segments.
    fn seg(key: &str, value: &str) -> PartitionSegment {
        PartitionSegment {
            key: key.to_string(),
            value: value.to_string(),
        }
    }

    #[test]
    fn parse_segment_happy() {
        let s = parse_partition_segment("dt=2024-01-01").unwrap();
        assert_eq!(s.key, "dt");
        assert_eq!(s.value, "2024-01-01");
    }

    #[test]
    fn parse_segment_keeps_extra_equals_in_value() {
        // Only the first '=' splits key from value.
        assert_eq!(parse_partition_segment("a=b=c"), Some(seg("a", "b=c")));
    }

    #[test]
    fn parse_segment_rejects() {
        assert!(parse_partition_segment("no_equals_sign").is_none());
        assert!(parse_partition_segment("").is_none());
        assert!(parse_partition_segment("=").is_none());
        assert!(parse_partition_segment("=v").is_none());
        assert!(parse_partition_segment("k=").is_none());
    }

    #[test]
    fn hive_segments_nested() {
        let p = Path::new("dt=2024-01-01").join("region=us");
        let segs = hive_segments_for_relative_parent(&p).unwrap();
        assert_eq!(segs.len(), 2);
        assert_eq!(segs[0].key, "dt");
        assert_eq!(segs[1].key, "region");
    }

    #[test]
    fn hive_segments_empty_parent_is_unpartitioned() {
        assert_eq!(
            hive_segments_for_relative_parent(Path::new("")),
            Some(Vec::new())
        );
    }

    #[test]
    fn hive_segments_reject_non_hive_component() {
        let p = Path::new("dt=2024-01-01").join("misc");
        assert!(hive_segments_for_relative_parent(&p).is_none());
    }

    #[test]
    fn discover_hive_skips_non_hive_dirs_and_applies_pattern() {
        let root = std::env::temp_dir()
            .join(format!("hive_discover_test_{}", std::process::id()));
        let _ = std::fs::remove_dir_all(&root);
        let part = root.join("dt=2024-01-01").join("region=us");
        let scratch = root.join("_tmp");
        std::fs::create_dir_all(&part).unwrap();
        std::fs::create_dir_all(&scratch).unwrap();
        std::fs::write(part.join("a.parquet"), b"").unwrap();
        std::fs::write(part.join("b.csv"), b"").unwrap();
        std::fs::write(scratch.join("c.parquet"), b"").unwrap();

        let Ok(found) = discover_hive_partitioned_files(&root, Some("**/*.parquet")) else {
            panic!("discovery with pattern failed");
        };
        assert_eq!(found.len(), 1);
        assert_eq!(found[0].path, part.join("a.parquet"));
        assert_eq!(
            found[0].segments,
            vec![seg("dt", "2024-01-01"), seg("region", "us")]
        );

        let Ok(all) = discover_hive_partitioned_files(&root, None) else {
            panic!("discovery without pattern failed");
        };
        let paths: Vec<PathBuf> = all.into_iter().map(|f| f.path).collect();
        assert_eq!(paths, vec![part.join("a.parquet"), part.join("b.csv")]);

        assert!(discover_hive_partitioned_files(root.join("missing"), None).is_err());

        let _ = std::fs::remove_dir_all(&root);
    }
}