use url::Url;
use crate::{DeltaResult, Version};
/// Separator used to isolate the final segment of a URL path.
const DELIMITER: &str = "/";
/// Exact length required of a version component in a log file name.
const VERSION_LEN: usize = 20;
/// Exact length required of each of the two numeric components that follow
/// `checkpoint.` in a multi-part checkpoint file name.
const MULTIPART_PART_LEN: usize = 10;
/// A borrowed view of a [`Url`], classified by how its file name matches the log file
/// naming patterns recognised by `LogPath::new`.
#[derive(Debug)]
pub(crate) struct LogPath<'a> {
/// The URL this view was built from.
url: &'a Url,
/// Final `/`-separated segment of the URL path; `None` when the path is empty or ends
/// with `/`.
pub(crate) filename: Option<&'a str>,
/// Number parsed from the part of the file name before its first `.`, when that part is
/// exactly `VERSION_LEN` long and parses as a `Version`; `None` otherwise.
pub(crate) version: Option<Version>,
/// For names of the form `<version>.<version>.json`, the number parsed from the second
/// component; `None` for every other name.
_compacted_to_version: Option<Version>,
/// `true` when `version` is set and the rest of the name is either `json` or
/// `<version>.json`.
pub(crate) is_commit: bool,
/// `true` when `version` is set and the rest of the name is either `checkpoint.parquet`
/// or `checkpoint.<part>.<part>.parquet`, each `<part>` being `MULTIPART_PART_LEN` long.
pub(crate) is_checkpoint: bool,
}
/// Returns the final `/`-separated segment of `path`.
///
/// Yields `None` when that segment is empty, which happens exactly when `path` is itself
/// empty or ends with `/`.
fn get_filename(path: &str) -> Option<&str> {
    // `rsplit` always produces at least one item; an empty last segment means the path
    // does not name a file.
    path.rsplit(DELIMITER)
        .next()
        .filter(|segment| !segment.is_empty())
}
/// Parses a fixed-width, zero-padded decimal component of a log file name.
///
/// Returns `Some(value)` only when `version_str_opt` is present, is exactly
/// `expected_digits` bytes long, consists solely of ASCII digits, and fits in `Version`.
/// Every other input yields `None`.
fn get_version_opt(version_str_opt: Option<&str>, expected_digits: usize) -> Option<Version> {
    let version_str = version_str_opt?;
    // Integer `str::parse` tolerates a leading sign (`+`, and `-` for signed targets), so
    // the length check alone would accept e.g. `+0000000000000000005`. Require digits only.
    let is_fixed_width_number = version_str.len() == expected_digits
        && version_str.bytes().all(|b| b.is_ascii_digit());
    if !is_fixed_width_number {
        return None;
    }
    // Can still fail when the digits overflow `Version`.
    version_str.parse().ok()
}
/// Extracts the version encoded at the start of the file name in `location`'s path.
///
/// Returns `None` when the path has no file name, the file name contains no `.`, or the
/// text before the first `.` is not accepted by `get_version_opt` with `VERSION_LEN`.
pub(crate) fn version_from_location(location: &Url) -> Option<Version> {
    let file_name = get_filename(location.path())?;
    let (leading, _rest) = file_name.split_once('.')?;
    get_version_opt(Some(leading), VERSION_LEN)
}
impl<'a> LogPath<'a> {
    /// Classifies `url` by matching the final segment of its path against the log file
    /// naming patterns.
    ///
    /// The file name is split at its first `.` into a leading component and a suffix.
    /// `version` is set when the leading component is accepted by `get_version_opt` with
    /// `VERSION_LEN`; only then is the suffix examined:
    ///
    /// * `json` marks a commit.
    /// * `checkpoint.parquet` marks a checkpoint.
    /// * `checkpoint.<part>.<part>.parquet`, with both parts accepted by `get_version_opt`
    ///   with `MULTIPART_PART_LEN`, marks a checkpoint.
    /// * `<version>.json` marks a commit and records the second version in
    ///   `_compacted_to_version`.
    ///
    /// Any other name leaves both flags `false`.
    pub(crate) fn new(url: &'a Url) -> Self {
        let filename = get_filename(url.path());
        let name_and_suffix = filename.and_then(|f| f.split_once('.'));
        let version =
            name_and_suffix.and_then(|(name, _)| get_version_opt(Some(name), VERSION_LEN));

        let mut is_commit = false;
        let mut is_checkpoint = false;
        let mut compacted_to_version = None;

        // The suffix is only inspected when the leading component is a valid version.
        let versioned_suffix = name_and_suffix
            .filter(|_| version.is_some())
            .map(|(_, suffix)| suffix);

        if let Some(suffix) = versioned_suffix {
            is_commit = suffix == "json";

            // `strip_prefix` both tests for and removes the marker, so no hard-coded byte
            // offset has to be kept in sync with the literal.
            if let Some(rest) = suffix.strip_prefix("checkpoint.") {
                is_checkpoint = rest == "parquet";
                if !is_checkpoint {
                    // Multi-part form: two fixed-width numbers, then the extension. Limiting
                    // the split to three pieces keeps any further `.` inside `ext`, so a
                    // trailing extra component is rejected by the comparison below.
                    let mut parts = rest.splitn(3, '.');
                    let part_index = get_version_opt(parts.next(), MULTIPART_PART_LEN);
                    let part_max = get_version_opt(parts.next(), MULTIPART_PART_LEN);
                    let ext = parts.next();
                    is_checkpoint =
                        part_index.is_some() && part_max.is_some() && ext == Some("parquet");
                }
            }

            if !is_commit && !is_checkpoint {
                // `<version>.<version>.json`: the second component must itself be a valid
                // version for the file to count as a commit.
                if let Some((maybe_compacted_version, "json")) = suffix.split_once('.') {
                    compacted_to_version =
                        get_version_opt(Some(maybe_compacted_version), VERSION_LEN);
                    is_commit = compacted_to_version.is_some();
                }
            }
        }

        LogPath {
            url,
            filename,
            version,
            _compacted_to_version: compacted_to_version,
            is_commit,
            is_checkpoint,
        }
    }

    /// Resolves `path` against the wrapped URL with `Url::join` and returns the result.
    ///
    /// # Errors
    ///
    /// Returns an error when `Url::join` fails; the failure is converted into the
    /// `DeltaResult` error type by `?`.
    pub(crate) fn child(&self, path: impl AsRef<str>) -> DeltaResult<Url> {
        Ok(self.url.join(path.as_ref())?)
    }

    /// Returns the text after the last `.` in the file name.
    ///
    /// Yields `None` when there is no file name, the file name contains no `.`, or nothing
    /// follows the last `.`.
    #[allow(unused)]
    pub(crate) fn extension(&self) -> Option<&str> {
        self.filename
            .and_then(|f| f.rsplit_once('.'))
            .map(|(_, extension)| extension)
            .filter(|extension| !extension.is_empty())
    }
}
/// Exposes the wrapped URL so a `LogPath` can be passed wherever `&Url` is accepted.
impl AsRef<Url> for LogPath<'_> {
    /// Returns the URL this `LogPath` was built from.
    fn as_ref(&self) -> &Url {
        self.url
    }
}
/// Exposes the wrapped URL's string form so a `LogPath` can be passed wherever `&str` is
/// accepted.
impl AsRef<str> for LogPath<'_> {
    /// Returns the wrapped URL as a string slice, as produced by `Url::as_str`.
    fn as_ref(&self) -> &str {
        self.url.as_str()
    }
}
#[cfg(test)]
mod tests {
    use std::path::PathBuf;

    use super::*;

    /// Builds a URL from the canonicalized path of the fixture table directory.
    ///
    /// Panics when the fixture directory does not exist relative to the working directory.
    fn table_url() -> Url {
        let path =
            std::fs::canonicalize(PathBuf::from("./tests/data/table-with-dv-small/")).unwrap();
        url::Url::from_file_path(path).unwrap()
    }

    /// Plain commit and single-file checkpoint names are classified and versioned.
    #[test]
    fn test_file_patterns() {
        let table_url = table_url();
        let log_path = LogPath::new(&table_url)
            .child("_delta_log/00000000000000000000.json")
            .unwrap();
        let log_path = LogPath::new(&log_path);
        assert_eq!(log_path.filename, Some("00000000000000000000.json"));
        assert_eq!(log_path.extension(), Some("json"));
        assert!(log_path.is_commit);
        assert!(!log_path.is_checkpoint);
        assert_eq!(log_path.version, Some(0));

        let log_path = log_path.child("00000000000000000005.json").unwrap();
        let log_path = LogPath::new(&log_path);
        assert_eq!(log_path.version, Some(5));

        let log_path = log_path
            .child("00000000000000000002.checkpoint.parquet")
            .unwrap();
        let log_path = LogPath::new(&log_path);
        assert_eq!(
            "00000000000000000002.checkpoint.parquet",
            log_path.filename.unwrap()
        );
        assert_eq!(log_path.extension(), Some("parquet"));
        assert!(!log_path.is_commit);
        assert!(log_path.is_checkpoint);
        assert_eq!(log_path.version, Some(2));
    }

    /// Asserts that `child`, resolved against `log_path`, has the expected `is_checkpoint`
    /// classification. A helper, not a test in its own right.
    fn test_child_is_multi(log_path: &LogPath<'_>, child: &str, is_checkpoint: bool) {
        let path = log_path.child(child).unwrap();
        let to_test = LogPath::new(&path);
        assert_eq!(to_test.is_checkpoint, is_checkpoint);
    }

    /// Multi-part checkpoint names are accepted only in the exact expected shape.
    #[test]
    fn test_multipart_parsing() {
        let table_url = table_url();
        let log_path = LogPath::new(&table_url);
        for good_path in [
            "_delta_log/00000000000000000001.checkpoint.0000000001.0000000002.parquet",
            "_delta_log/00000000000000000021.checkpoint.0000000003.0000000010.parquet",
        ] {
            test_child_is_multi(&log_path, good_path, true);
        }
        for bad_path in [
            "_delta_log/00000000000000000001.checkpoint.00000001.0000000002.parquet",
            "_delta_log/00000000000000000001.checkpoint.0000000001.00000002.parquet",
            "_delta_log/00000000000000000001.checkpoint.000000000a.0000000002.parquet",
            "_delta_log/00000000000000000001.checkpoint.0000000001.000000000x.parquet",
            "_delta_log/00000000000000000001.checkpoinx.00000001.0000000002.parquet",
            "_delta_log/00000000000000000001.checkpoint.00000001.0000000002.json",
            // The two cases above also carry a too-short part index, so they would be
            // rejected even if the marker/extension checks were broken. These variants are
            // valid in every other respect and therefore isolate the single condition.
            "_delta_log/00000000000000000001.checkpoinx.0000000001.0000000002.parquet",
            "_delta_log/00000000000000000001.checkpoint.0000000001.0000000002.json",
        ] {
            test_child_is_multi(&log_path, bad_path, false);
        }
    }

    /// `<version>.<version>.json` names count as commits and expose the second version.
    #[test]
    fn test_compaction_files() {
        let table_url = table_url();
        let log_path = LogPath::new(&table_url)
            .child("_delta_log/00000000000000000000.00000000000000000004.json")
            .unwrap();
        let log_path = LogPath::new(&log_path);
        assert!(log_path.is_commit);
        assert_eq!(log_path.version, Some(0));
        assert_eq!(log_path._compacted_to_version, Some(4));
        assert_eq!(log_path.extension(), Some("json"));

        // A non-numeric second component must not be treated as a commit.
        let log_path_bad = LogPath::new(&table_url)
            .child("_delta_log/00000000000000000000.0000000000000000000a.json")
            .unwrap();
        let log_path_bad = LogPath::new(&log_path_bad);
        assert!(!log_path_bad.is_commit);
    }
}