use std::str::FromStr;
use url::Url;
use crate::{DeltaResult, Error, FileMeta, Version};
/// Number of zero-padded decimal digits in a log file's version part, e.g. `00000000000000000010`.
const VERSION_LEN: usize = 20;
/// Number of zero-padded decimal digits in each of a multi-part checkpoint's `part_num` and
/// `num_parts` components, e.g. `0000000001`.
const MULTIPART_PART_LEN: usize = 10;
/// Length of a hyphenated UUID string: five hex groups (8-4-4-4-12) joined by four hyphens.
const UUID_PART_LEN: usize = 8 + 4 + 4 + 4 + 12 + 4;
/// The kind of Delta log file a [`ParsedLogPath`] refers to, as determined from its file name.
///
/// `PartialEq`/`Eq` are derived so values can be compared directly (e.g. with `assert_eq!`)
/// instead of only via `matches!`.
#[derive(Debug, Clone, PartialEq, Eq)]
#[cfg_attr(feature = "developer-visibility", visibility::make(pub))]
#[cfg_attr(not(feature = "developer-visibility"), visibility::make(pub(crate)))]
enum LogPathFileType {
    /// A commit file: `<version>.json`.
    Commit,
    /// A single-file checkpoint: `<version>.checkpoint.parquet`.
    SinglePartCheckpoint,
    /// A UUID-named checkpoint: `<version>.checkpoint.<uuid>.json` or
    /// `<version>.checkpoint.<uuid>.parquet`. Holds the 36-character UUID part exactly as it
    /// appeared in the file name.
    // `allow(unused)`: the variant is produced by the parser, but its payload may not be read
    // anywhere yet.
    #[allow(unused)]
    UuidCheckpoint(String),
    /// One part of a multi-part checkpoint:
    /// `<version>.checkpoint.<part_num>.<num_parts>.parquet`, where both numbers are
    /// 10 zero-padded digits and `1 <= part_num <= num_parts`.
    // `allow(unused)`: the fields may not be read outside tests yet.
    #[allow(unused)]
    MultiPartCheckpoint { part_num: u32, num_parts: u32 },
    /// A compacted commit file: `<version>.<hi>.compacted.json`, where `hi` is the second
    /// 20-digit version in the name.
    // `allow(unused)`: `hi` may not be read outside tests yet.
    #[allow(unused)]
    CompactedCommit { hi: Version },
    /// Any other file name that still starts with a valid 20-digit version.
    Unknown,
}
/// A Delta log file path that has been split into its meaningful components.
///
/// `Location` is whatever the caller uses to refer to the file. It defaults to [`FileMeta`], but
/// any [`AsUrl`] implementor (such as a bare [`Url`]) can be used.
#[derive(Debug, Clone)]
#[cfg_attr(feature = "developer-visibility", visibility::make(pub))]
#[cfg_attr(not(feature = "developer-visibility"), visibility::make(pub(crate)))]
struct ParsedLogPath<Location = FileMeta>
where
    Location: AsUrl,
{
    /// The original location this path was parsed from.
    pub location: Location,
    /// The last path segment of the location, e.g. `00000000000000000010.json`.
    // `allow(unused)`: presumably not read by non-test code yet.
    #[allow(unused)]
    pub filename: String,
    /// The last `.`-separated part of `filename`, e.g. `json` or `parquet`.
    // `allow(unused)`: presumably not read by non-test code yet.
    #[allow(unused)]
    pub extension: String,
    /// The table version encoded in the leading 20-digit part of `filename`.
    pub version: Version,
    /// What kind of log file this is.
    pub file_type: LogPathFileType,
}
fn parse_path_part<T: FromStr>(value: &str, expect_len: usize, location: &Url) -> DeltaResult<T> {
match value.parse() {
Ok(result) if value.len() == expect_len => Ok(result),
_ => Err(Error::invalid_log_path(location)),
}
}
#[cfg_attr(feature = "developer-visibility", visibility::make(pub))]
#[cfg_attr(not(feature = "developer-visibility"), visibility::make(pub(crate)))]
trait AsUrl {
fn as_url(&self) -> &Url;
}
impl AsUrl for FileMeta {
fn as_url(&self) -> &Url {
&self.location
}
}
impl AsUrl for Url {
fn as_url(&self) -> &Url {
self
}
}
impl<Location: AsUrl> ParsedLogPath<Location> {
#[cfg_attr(feature = "developer-visibility", visibility::make(pub))]
#[cfg_attr(not(feature = "developer-visibility"), visibility::make(pub(crate)))]
fn try_from(location: Location) -> DeltaResult<Option<ParsedLogPath<Location>>> {
let url = location.as_url();
let filename = url
.path_segments()
.ok_or_else(|| Error::invalid_log_path(url))?
.last()
.unwrap() .to_string();
if filename.is_empty() {
return Err(Error::invalid_log_path(url));
}
let mut split = filename.split('.');
let version = split.next().unwrap();
let version = match version.parse().ok() {
Some(v) if version.len() == VERSION_LEN => v,
Some(_) => return Err(Error::invalid_log_path(url)),
None => return Ok(None),
};
let split: Vec<_> = split.collect();
let extension = match split.last() {
Some(extension) => extension.to_string(),
None => return Ok(None),
};
let file_type = match split.as_slice() {
["json"] => LogPathFileType::Commit,
["checkpoint", "parquet"] => LogPathFileType::SinglePartCheckpoint,
["checkpoint", uuid, "json" | "parquet"] => {
let uuid = parse_path_part(uuid, UUID_PART_LEN, url)?;
LogPathFileType::UuidCheckpoint(uuid)
}
[hi, "compacted", "json"] => {
let hi = parse_path_part(hi, VERSION_LEN, url)?;
LogPathFileType::CompactedCommit { hi }
}
["checkpoint", part_num, num_parts, "parquet"] => {
let part_num = parse_path_part(part_num, MULTIPART_PART_LEN, url)?;
let num_parts = parse_path_part(num_parts, MULTIPART_PART_LEN, url)?;
if !(0 < part_num && part_num <= num_parts) {
return Err(Error::invalid_log_path(url));
}
LogPathFileType::MultiPartCheckpoint {
part_num,
num_parts,
}
}
_ => LogPathFileType::Unknown,
};
Ok(Some(ParsedLogPath {
location,
filename,
extension,
version,
file_type,
}))
}
#[cfg_attr(feature = "developer-visibility", visibility::make(pub))]
#[cfg_attr(not(feature = "developer-visibility"), visibility::make(pub(crate)))]
fn is_commit(&self) -> bool {
matches!(self.file_type, LogPathFileType::Commit)
}
#[cfg_attr(feature = "developer-visibility", visibility::make(pub))]
#[cfg_attr(not(feature = "developer-visibility"), visibility::make(pub(crate)))]
fn is_checkpoint(&self) -> bool {
matches!(
self.file_type,
LogPathFileType::SinglePartCheckpoint | LogPathFileType::MultiPartCheckpoint { .. }
)
}
#[cfg_attr(feature = "developer-visibility", visibility::make(pub))]
#[cfg_attr(not(feature = "developer-visibility"), visibility::make(pub(crate)))]
#[allow(dead_code)] fn is_unknown(&self) -> bool {
matches!(
self.file_type,
LogPathFileType::Unknown | LogPathFileType::UuidCheckpoint(_)
)
}
}
impl ParsedLogPath<Url> {
pub(crate) fn new_commit(
table_root: &Url,
version: Version,
) -> DeltaResult<ParsedLogPath<Url>> {
let filename = format!("{:020}.json", version);
let location = table_root.join("_delta_log/")?.join(&filename)?;
let path = Self::try_from(location)?
.ok_or_else(|| Error::internal_error("attempted to create invalid commit path"))?;
if !path.is_commit() {
return Err(Error::internal_error(
"ParsedLogPath::new_commit created a non-commit path",
));
}
Ok(path)
}
}
#[cfg(test)]
mod tests {
    use std::path::PathBuf;

    use super::*;

    /// Returns the `file://` URL of the checked-in test table's `_delta_log/` directory, which
    /// always ends with `/`.
    fn table_log_dir_url() -> Url {
        let path = PathBuf::from("./tests/data/table-with-dv-small/_delta_log/");
        let path = std::fs::canonicalize(path).unwrap();
        assert!(path.is_dir());
        let url = url::Url::from_directory_path(path).unwrap();
        assert!(url.path().ends_with('/'));
        url
    }

    #[test]
    fn test_unknown_invalid_patterns() {
        let table_log_dir = table_log_dir_url();

        // Directory paths are errors, not "not a log path".
        let log_path = table_log_dir.join("subdir/").unwrap();
        assert!(log_path
            .path()
            .ends_with("/tests/data/table-with-dv-small/_delta_log/subdir/"));
        ParsedLogPath::try_from(log_path).expect_err("directory path");

        // Files without a numeric version part are simply not log paths.
        let log_path = table_log_dir.join("_last_checkpoint").unwrap();
        let log_path = ParsedLogPath::try_from(log_path).unwrap();
        assert!(log_path.is_none());

        let log_path = table_log_dir.join("00000000000000000010").unwrap();
        let result = ParsedLogPath::try_from(log_path);
        assert!(
            matches!(result, Ok(None)),
            "Expected Ok(None) for missing file extension"
        );

        // A trailing dot yields an empty extension, which is an unknown file type.
        let log_path = table_log_dir.join("00000000000000000011.").unwrap();
        let result = ParsedLogPath::try_from(log_path);
        assert!(
            matches!(
                result,
                Ok(Some(ParsedLogPath {
                    file_type: LogPathFileType::Unknown,
                    ..
                }))
            ),
            "Expected Unknown file type, got {:?}",
            result
        );

        let log_path = table_log_dir.join("abc.json").unwrap();
        let log_path = ParsedLogPath::try_from(log_path).unwrap();
        assert!(log_path.is_none());

        // An empty version part is not a log path.
        let log_path = table_log_dir.join(".json").unwrap();
        let log_path = ParsedLogPath::try_from(log_path).unwrap();
        assert!(log_path.is_none());

        // A non-digit inside the version part means this is not a log path.
        let log_path = table_log_dir.join("0000000000000000001x.json").unwrap();
        let log_path = ParsedLogPath::try_from(log_path).unwrap();
        assert!(log_path.is_none());

        // Numeric version parts of the wrong width are errors.
        let log_path = table_log_dir.join("000000000000000000010.json").unwrap();
        ParsedLogPath::try_from(log_path).expect_err("too many digits");

        let log_path = table_log_dir.join("0000000000000000010.json").unwrap();
        ParsedLogPath::try_from(log_path).expect_err("too few digits");

        let log_path = table_log_dir.join("00000000000000000010.foo").unwrap();
        let log_path = ParsedLogPath::try_from(log_path).unwrap().unwrap();
        assert_eq!(log_path.filename, "00000000000000000010.foo");
        assert_eq!(log_path.extension, "foo");
        assert_eq!(log_path.version, 10);
        assert!(matches!(log_path.file_type, LogPathFileType::Unknown));
        assert!(log_path.is_unknown());

        let log_path = table_log_dir
            .join("00000000000000000010.a.b.c.foo")
            .unwrap();
        let log_path = ParsedLogPath::try_from(log_path).unwrap().unwrap();
        assert_eq!(log_path.filename, "00000000000000000010.a.b.c.foo");
        assert_eq!(log_path.extension, "foo");
        assert_eq!(log_path.version, 10);
        assert!(log_path.is_unknown());
    }

    #[test]
    fn test_commit_patterns() {
        let table_log_dir = table_log_dir_url();

        let log_path = table_log_dir.join("00000000000000000000.json").unwrap();
        let log_path = ParsedLogPath::try_from(log_path).unwrap().unwrap();
        assert_eq!(log_path.filename, "00000000000000000000.json");
        assert_eq!(log_path.extension, "json");
        assert_eq!(log_path.version, 0);
        assert!(matches!(log_path.file_type, LogPathFileType::Commit));
        assert!(log_path.is_commit());
        assert!(!log_path.is_checkpoint());

        let log_path = table_log_dir.join("00000000000000000005.json").unwrap();
        let log_path = ParsedLogPath::try_from(log_path).unwrap().unwrap();
        assert_eq!(log_path.version, 5);
        assert!(log_path.is_commit());
    }

    #[test]
    fn test_single_part_checkpoint_patterns() {
        let table_log_dir = table_log_dir_url();

        let log_path = table_log_dir
            .join("00000000000000000002.checkpoint.parquet")
            .unwrap();
        let log_path = ParsedLogPath::try_from(log_path).unwrap().unwrap();
        assert_eq!(log_path.filename, "00000000000000000002.checkpoint.parquet");
        assert_eq!(log_path.extension, "parquet");
        assert_eq!(log_path.version, 2);
        assert!(matches!(
            log_path.file_type,
            LogPathFileType::SinglePartCheckpoint
        ));
        assert!(!log_path.is_commit());
        assert!(log_path.is_checkpoint());

        // A JSON single-part checkpoint is not a recognized layout.
        let log_path = table_log_dir
            .join("00000000000000000002.checkpoint.json")
            .unwrap();
        let log_path = ParsedLogPath::try_from(log_path).unwrap().unwrap();
        assert_eq!(log_path.filename, "00000000000000000002.checkpoint.json");
        assert_eq!(log_path.extension, "json");
        assert_eq!(log_path.version, 2);
        assert!(!log_path.is_commit());
        assert!(!log_path.is_checkpoint());
        assert!(log_path.is_unknown());
    }

    #[test]
    fn test_uuid_checkpoint_patterns() {
        let table_log_dir = table_log_dir_url();

        let log_path = table_log_dir
            .join("00000000000000000002.checkpoint.3a0d65cd-4056-49b8-937b-95f9e3ee90e5.parquet")
            .unwrap();
        let log_path = ParsedLogPath::try_from(log_path).unwrap().unwrap();
        assert_eq!(
            log_path.filename,
            "00000000000000000002.checkpoint.3a0d65cd-4056-49b8-937b-95f9e3ee90e5.parquet"
        );
        assert_eq!(log_path.extension, "parquet");
        assert_eq!(log_path.version, 2);
        assert!(matches!(
            log_path.file_type,
            LogPathFileType::UuidCheckpoint(ref u) if u == "3a0d65cd-4056-49b8-937b-95f9e3ee90e5",
        ));
        assert!(!log_path.is_commit());
        assert!(!log_path.is_checkpoint());
        assert!(log_path.is_unknown());

        let log_path = table_log_dir
            .join("00000000000000000002.checkpoint.3a0d65cd-4056-49b8-937b-95f9e3ee90e5.json")
            .unwrap();
        let log_path = ParsedLogPath::try_from(log_path).unwrap().unwrap();
        assert_eq!(
            log_path.filename,
            "00000000000000000002.checkpoint.3a0d65cd-4056-49b8-937b-95f9e3ee90e5.json"
        );
        assert_eq!(log_path.extension, "json");
        assert_eq!(log_path.version, 2);
        assert!(matches!(
            log_path.file_type,
            LogPathFileType::UuidCheckpoint(ref u) if u == "3a0d65cd-4056-49b8-937b-95f9e3ee90e5",
        ));
        assert!(!log_path.is_commit());
        assert!(!log_path.is_checkpoint());
        assert!(log_path.is_unknown());

        // An unsupported extension after the UUID falls back to Unknown.
        let log_path = table_log_dir
            .join("00000000000000000002.checkpoint.3a0d65cd-4056-49b8-937b-95f9e3ee90e5.foo")
            .unwrap();
        let log_path = ParsedLogPath::try_from(log_path).unwrap().unwrap();
        assert_eq!(
            log_path.filename,
            "00000000000000000002.checkpoint.3a0d65cd-4056-49b8-937b-95f9e3ee90e5.foo"
        );
        assert_eq!(log_path.extension, "foo");
        assert_eq!(log_path.version, 2);
        assert!(!log_path.is_commit());
        assert!(!log_path.is_checkpoint());
        assert!(log_path.is_unknown());

        let log_path = table_log_dir
            .join("00000000000000000002.checkpoint.foo.parquet")
            .unwrap();
        ParsedLogPath::try_from(log_path).expect_err("not a uuid");

        let log_path = table_log_dir
            .join("00000000000000000002.checkpoint.foo")
            .unwrap();
        let log_path = ParsedLogPath::try_from(log_path).unwrap().unwrap();
        assert_eq!(log_path.filename, "00000000000000000002.checkpoint.foo");
        assert_eq!(log_path.extension, "foo");
        assert_eq!(log_path.version, 2);
        assert!(!log_path.is_commit());
        assert!(!log_path.is_checkpoint());
        assert!(log_path.is_unknown());

        let log_path = table_log_dir
            .join("00000000000000000010.checkpoint.3a0d65cd-4056-49b8-937b-95f9e3ee90e.parquet")
            .unwrap();
        let result = ParsedLogPath::try_from(log_path);
        assert!(
            matches!(result, Err(Error::InvalidLogPath(_))),
            "Expected an error for UUID with exactly 35 characters"
        );
    }

    #[test]
    fn test_multi_part_checkpoint_patterns() {
        let table_log_dir = table_log_dir_url();

        // JSON multi-part checkpoints are not a recognized layout.
        let log_path = table_log_dir
            .join("00000000000000000008.checkpoint.0000000000.0000000002.json")
            .unwrap();
        let log_path = ParsedLogPath::try_from(log_path).unwrap().unwrap();
        assert_eq!(
            log_path.filename,
            "00000000000000000008.checkpoint.0000000000.0000000002.json"
        );
        assert_eq!(log_path.extension, "json");
        assert_eq!(log_path.version, 8);
        assert!(!log_path.is_commit());
        assert!(!log_path.is_checkpoint());
        assert!(log_path.is_unknown());

        let log_path = table_log_dir
            .join("00000000000000000008.checkpoint.0000000000.0000000002.parquet")
            .unwrap();
        ParsedLogPath::try_from(log_path).expect_err("invalid part 0");

        let log_path = table_log_dir
            .join("00000000000000000008.checkpoint.0000000001.0000000002.parquet")
            .unwrap();
        let log_path = ParsedLogPath::try_from(log_path).unwrap().unwrap();
        assert_eq!(
            log_path.filename,
            "00000000000000000008.checkpoint.0000000001.0000000002.parquet"
        );
        assert_eq!(log_path.extension, "parquet");
        assert_eq!(log_path.version, 8);
        assert!(matches!(
            log_path.file_type,
            LogPathFileType::MultiPartCheckpoint {
                part_num: 1,
                num_parts: 2
            }
        ));
        assert!(!log_path.is_commit());
        assert!(log_path.is_checkpoint());

        let log_path = table_log_dir
            .join("00000000000000000008.checkpoint.0000000002.0000000002.parquet")
            .unwrap();
        let log_path = ParsedLogPath::try_from(log_path).unwrap().unwrap();
        assert_eq!(
            log_path.filename,
            "00000000000000000008.checkpoint.0000000002.0000000002.parquet"
        );
        assert_eq!(log_path.extension, "parquet");
        assert_eq!(log_path.version, 8);
        assert!(matches!(
            log_path.file_type,
            LogPathFileType::MultiPartCheckpoint {
                part_num: 2,
                num_parts: 2
            }
        ));
        assert!(!log_path.is_commit());
        assert!(log_path.is_checkpoint());

        let log_path = table_log_dir
            .join("00000000000000000008.checkpoint.0000000003.0000000002.parquet")
            .unwrap();
        ParsedLogPath::try_from(log_path).expect_err("invalid part 3");

        let log_path = table_log_dir
            .join("00000000000000000008.checkpoint.000000001.0000000002.parquet")
            .unwrap();
        ParsedLogPath::try_from(log_path).expect_err("invalid part_num");

        let log_path = table_log_dir
            .join("00000000000000000008.checkpoint.0000000001.000000002.parquet")
            .unwrap();
        ParsedLogPath::try_from(log_path).expect_err("invalid num_parts");

        let log_path = table_log_dir
            .join("00000000000000000008.checkpoint.00000000x1.0000000002.parquet")
            .unwrap();
        ParsedLogPath::try_from(log_path).expect_err("invalid part_num");

        let log_path = table_log_dir
            .join("00000000000000000008.checkpoint.0000000001.00000000x2.parquet")
            .unwrap();
        ParsedLogPath::try_from(log_path).expect_err("invalid num_parts");
    }

    #[test]
    fn test_compacted_delta_patterns() {
        let table_log_dir = table_log_dir_url();

        let log_path = table_log_dir
            .join("00000000000000000008.00000000000000000015.compacted.json")
            .unwrap();
        let log_path = ParsedLogPath::try_from(log_path).unwrap().unwrap();
        assert_eq!(
            log_path.filename,
            "00000000000000000008.00000000000000000015.compacted.json"
        );
        assert_eq!(log_path.extension, "json");
        assert_eq!(log_path.version, 8);
        assert!(matches!(
            log_path.file_type,
            LogPathFileType::CompactedCommit { hi: 15 },
        ));
        assert!(!log_path.is_commit());
        assert!(!log_path.is_checkpoint());

        // Parquet compacted files are not a recognized layout.
        let log_path = table_log_dir
            .join("00000000000000000008.00000000000000000015.compacted.parquet")
            .unwrap();
        let log_path = ParsedLogPath::try_from(log_path).unwrap().unwrap();
        assert_eq!(
            log_path.filename,
            "00000000000000000008.00000000000000000015.compacted.parquet"
        );
        assert_eq!(log_path.extension, "parquet");
        assert_eq!(log_path.version, 8);
        assert!(!log_path.is_commit());
        assert!(!log_path.is_checkpoint());
        assert!(log_path.is_unknown());

        let log_path = table_log_dir
            .join("00000000000000000008.0000000000000000015.compacted.json")
            .unwrap();
        ParsedLogPath::try_from(log_path).expect_err("too few digits in hi");

        let log_path = table_log_dir
            .join("00000000000000000008.000000000000000000015.compacted.json")
            .unwrap();
        ParsedLogPath::try_from(log_path).expect_err("too many digits in hi");

        let log_path = table_log_dir
            .join("00000000000000000008.00000000000000000a15.compacted.json")
            .unwrap();
        ParsedLogPath::try_from(log_path).expect_err("non-numeric hi");
    }

    #[test]
    fn test_new_commit() {
        let table_log_dir = table_log_dir_url();
        // `new_commit` expects the table root and appends `_delta_log/` itself, so pass the
        // parent of the log directory rather than the log directory.
        let table_root = table_log_dir.join("../").unwrap();
        assert!(table_root
            .path()
            .ends_with("/tests/data/table-with-dv-small/"));

        let log_path = ParsedLogPath::new_commit(&table_root, 10).unwrap();
        assert_eq!(
            log_path.location,
            table_log_dir.join("00000000000000000010.json").unwrap()
        );
        assert_eq!(log_path.version, 10);
        assert!(log_path.is_commit());
        assert_eq!(log_path.extension, "json");
        assert!(matches!(log_path.file_type, LogPathFileType::Commit));
        assert_eq!(log_path.filename, "00000000000000000010.json");
    }
}