use std::slice;
use std::str::FromStr;
use delta_kernel_derive::internal_api;
use url::Url;
use uuid::Uuid;
use crate::actions::visitors::InCommitTimestampVisitor;
use crate::engine_data::RowVisitor;
use crate::utils::require;
use crate::{DeltaResult, Engine, Error, FileMeta, Version};
/// Number of characters in a zero-padded log file version, e.g. `00000000000000000010`.
const VERSION_LEN: usize = 20;
/// Number of characters in each zero-padded part counter of a multi-part checkpoint name.
const MULTIPART_PART_LEN: usize = 10;
/// Number of characters in the UUID component of UUID checkpoint and staged commit names
/// (the hyphenated textual form of a UUID).
const UUID_PART_LEN: usize = 36;
/// Name of the directory that holds a table's log files.
const DELTA_LOG_DIR: &str = "_delta_log";
/// [`DELTA_LOG_DIR`] with a trailing `/`, for use with `Url::join`: the trailing slash makes
/// the joined URL a directory, so later joins resolve beneath it instead of replacing it.
const DELTA_LOG_DIR_WITH_SLASH: &str = "_delta_log/";
/// Subdirectory of `_delta_log/` holding staged commits, with a trailing `/` for `Url::join`.
const STAGED_COMMITS_DIR: &str = "_staged_commits/";
/// The kind of a file found in (or under) a Delta table's `_delta_log` directory, as
/// determined by [`ParsedLogPath::try_from`] from its filename and parent directory.
#[derive(Debug, Clone, PartialEq, Eq)]
#[internal_api]
pub(crate) enum LogPathFileType {
    /// Published commit: `_delta_log/<version>.json`.
    Commit,
    /// Staged commit: `_delta_log/_staged_commits/<version>.<uuid>.json`.
    StagedCommit,
    /// Single-file checkpoint: `_delta_log/<version>.checkpoint.parquet`.
    SinglePartCheckpoint,
    /// UUID-named checkpoint: `_delta_log/<version>.checkpoint.<uuid>.json` or `.parquet`.
    #[allow(unused)]
    UuidCheckpoint,
    /// One part of a multi-part checkpoint:
    /// `_delta_log/<version>.checkpoint.<part_num>.<num_parts>.parquet`.
    ///
    /// `ParsedLogPath::try_from` only produces values with `1 <= part_num <= num_parts`.
    #[allow(unused)]
    MultiPartCheckpoint {
        /// Index of this part (a 10-digit field in the filename).
        part_num: u32,
        /// Total number of parts (a 10-digit field in the filename).
        num_parts: u32,
    },
    /// Log compaction file: `_delta_log/<version>.<hi>.compacted.json`. The parsed path's
    /// `version` is the first number in the name and `hi` is the second.
    #[allow(unused)]
    CompactedCommit {
        /// The second (20-digit) version in the compaction filename.
        hi: Version,
    },
    /// Checksum file: `_delta_log/<version>.crc`.
    Crc,
    /// A filename with a valid version prefix that matches none of the patterns above, or
    /// that sits in a directory where that pattern is not expected.
    Unknown,
}
/// A location recognized as a Delta log file, together with the pieces parsed out of its
/// filename.
///
/// `Location` defaults to [`FileMeta`]; a bare [`Url`] can also be used since both implement
/// [`AsUrl`].
#[derive(Debug, Clone, PartialEq, Eq)]
#[internal_api]
pub(crate) struct ParsedLogPath<Location: AsUrl = FileMeta> {
    /// The location that was parsed.
    pub location: Location,
    /// The last path segment of the location's URL.
    #[allow(unused)]
    pub filename: String,
    /// The text after the last `.` in `filename`; may be empty (e.g. for `<version>.`).
    #[allow(unused)]
    pub extension: String,
    /// The version parsed from the 20-digit prefix of `filename`.
    pub version: Version,
    /// What kind of log file this is.
    pub file_type: LogPathFileType,
}
/// Parses `value` as a `T`, but only when it is exactly `expect_len` bytes long.
///
/// Returns `None` if the length is wrong or `T::from_str` fails. For `T = String` parsing
/// always succeeds, so the length check is the only validation performed.
fn parse_path_part<T: FromStr>(value: &str, expect_len: usize) -> Option<T> {
    if value.len() != expect_len {
        return None;
    }
    value.parse().ok()
}
/// Something that exposes a [`Url`], so it can be parsed by [`ParsedLogPath::try_from`].
#[internal_api]
pub(crate) trait AsUrl {
    /// Borrows the underlying URL.
    fn as_url(&self) -> &Url;
}
/// The URL of a [`FileMeta`] is its `location` field.
impl AsUrl for FileMeta {
    fn as_url(&self) -> &Url {
        &self.location
    }
}
/// A [`Url`] is its own URL.
impl AsUrl for Url {
    fn as_url(&self) -> &Url {
        self
    }
}
/// Returns `true` if any of the remaining `path_segments` is exactly `_delta_log`.
///
/// Consumes the iterator, stopping at the first match.
fn path_contains_delta_log_dir(path_segments: std::str::Split<'_, char>) -> bool {
    for segment in path_segments {
        if segment == DELTA_LOG_DIR {
            return true;
        }
    }
    false
}
impl<Location: AsUrl> ParsedLogPath<Location> {
#[internal_api]
pub(crate) fn try_from(location: Location) -> DeltaResult<Option<ParsedLogPath<Location>>> {
let url = location.as_url();
let Some(mut path_segments) = url.path_segments() else {
return Ok(None);
};
#[allow(clippy::unwrap_used)]
let filename = path_segments
.next_back()
.unwrap() .to_string();
let subdir = path_segments.next_back();
if filename.is_empty() {
return Ok(None); }
let mut split = filename.split('.');
#[allow(clippy::unwrap_used)]
let version = split.next().unwrap();
let version = match version.parse().ok() {
Some(v) if version.len() == VERSION_LEN => v,
Some(_) => return Ok(None), None => return Ok(None),
};
let split: Vec<_> = split.collect();
let extension = match split.last() {
Some(extension) => extension.to_string(),
None => return Ok(None),
};
let (in_delta_log_dir, in_staged_commits_dir) = if subdir == Some("_staged_commits") {
if path_segments.next_back() == Some(DELTA_LOG_DIR)
&& !path_contains_delta_log_dir(path_segments)
{
(false, true)
} else {
(false, false)
}
} else {
(
subdir == Some(DELTA_LOG_DIR) && !path_contains_delta_log_dir(path_segments),
false,
)
};
let file_type = match split.as_slice() {
["json"] if in_delta_log_dir => LogPathFileType::Commit,
[uuid, "json"] if in_staged_commits_dir => {
match parse_path_part::<String>(uuid, UUID_PART_LEN) {
Some(_uuid) => LogPathFileType::StagedCommit,
None => LogPathFileType::Unknown,
}
}
["crc"] if in_delta_log_dir => LogPathFileType::Crc,
["checkpoint", "parquet"] if in_delta_log_dir => LogPathFileType::SinglePartCheckpoint,
["checkpoint", uuid, "json" | "parquet"] if in_delta_log_dir => {
let Some(_) = parse_path_part::<String>(uuid, UUID_PART_LEN) else {
return Ok(None);
};
LogPathFileType::UuidCheckpoint
}
[hi, "compacted", "json"] if in_delta_log_dir => {
let Some(hi) = parse_path_part(hi, VERSION_LEN) else {
return Ok(None);
};
LogPathFileType::CompactedCommit { hi }
}
["checkpoint", part_num, num_parts, "parquet"] if in_delta_log_dir => {
let Some(part_num) = parse_path_part(part_num, MULTIPART_PART_LEN) else {
return Ok(None);
};
let Some(num_parts) = parse_path_part(num_parts, MULTIPART_PART_LEN) else {
return Ok(None);
};
if !(0 < part_num && part_num <= num_parts) {
return Ok(None);
}
LogPathFileType::MultiPartCheckpoint {
part_num,
num_parts,
}
}
_ => LogPathFileType::Unknown,
};
Ok(Some(ParsedLogPath {
location,
filename,
extension,
version,
file_type,
}))
}
pub(crate) fn parse_commit(location: Location) -> DeltaResult<Self> {
let url = location.as_url().to_string();
let parsed = Self::try_from(location)?.ok_or_else(|| Error::invalid_log_path(&url))?;
require!(
parsed.is_commit(),
Error::generic(format!(
"Expected a commit path, got {} of type {:?}",
url, parsed.file_type
))
);
Ok(parsed)
}
pub(crate) fn should_list(&self) -> bool {
match self.file_type {
LogPathFileType::Commit
| LogPathFileType::SinglePartCheckpoint
| LogPathFileType::UuidCheckpoint
| LogPathFileType::MultiPartCheckpoint { .. }
| LogPathFileType::CompactedCommit { .. }
| LogPathFileType::Crc
| LogPathFileType::Unknown => true,
LogPathFileType::StagedCommit => false,
}
}
#[internal_api]
pub(crate) fn is_commit(&self) -> bool {
matches!(
self.file_type,
LogPathFileType::Commit | LogPathFileType::StagedCommit
)
}
#[internal_api]
pub(crate) fn is_checkpoint(&self) -> bool {
matches!(
self.file_type,
LogPathFileType::SinglePartCheckpoint
| LogPathFileType::MultiPartCheckpoint { .. }
| LogPathFileType::UuidCheckpoint
)
}
#[internal_api]
#[allow(dead_code)] pub(crate) fn is_unknown(&self) -> bool {
matches!(self.file_type, LogPathFileType::Unknown)
}
}
impl ParsedLogPath<FileMeta> {
pub(crate) fn read_in_commit_timestamp(&self, engine: &dyn Engine) -> DeltaResult<i64> {
if !self.is_commit() {
return Err(Error::generic(format!(
"read_in_commit_timestamp can only be called on commit files, got: {:?}",
self.file_type
)));
}
let mut action_iter = engine.json_handler().read_json_files(
slice::from_ref(&self.location),
InCommitTimestampVisitor::schema(),
None,
)?;
match action_iter.next() {
Some(Ok(actions)) => {
let mut visitor = InCommitTimestampVisitor::default();
visitor.visit_rows_of(actions.as_ref())?;
visitor
.in_commit_timestamp
.ok_or_else(|| Error::generic("In-Commit Timestamp not found in commit file"))
}
Some(Err(err)) => Err(err),
None => Err(Error::generic("Commit file contains no actions")),
}
}
}
impl ParsedLogPath<Url> {
fn create_path(table_root: &Url, filename: String) -> DeltaResult<Self> {
let location = table_root.join(DELTA_LOG_DIR_WITH_SLASH)?.join(&filename)?;
Self::try_from(location)?.ok_or_else(|| {
Error::internal_error(format!("Attempted to create an invalid path: {filename}"))
})
}
#[allow(unused)]
pub(crate) fn new_commit(table_root: &Url, version: Version) -> DeltaResult<Self> {
let filename = format!("{version:020}.json");
let path = Self::create_path(table_root, filename)?;
if !path.is_commit() {
return Err(Error::internal_error(
"ParsedLogPath::new_commit created a non-commit path",
));
}
Ok(path)
}
pub(crate) fn new_classic_parquet_checkpoint(
table_root: &Url,
version: Version,
) -> DeltaResult<Self> {
let filename = format!("{version:020}.checkpoint.parquet");
let path = Self::create_path(table_root, filename)?;
if !path.is_checkpoint() {
return Err(Error::internal_error(
"ParsedLogPath::new_classic_parquet_checkpoint created a non-checkpoint path",
));
}
Ok(path)
}
#[allow(dead_code)] pub(crate) fn new_uuid_parquet_checkpoint(
table_root: &Url,
version: Version,
) -> DeltaResult<Self> {
let filename = format!("{:020}.checkpoint.{}.parquet", version, Uuid::new_v4());
let path = Self::create_path(table_root, filename)?;
if !path.is_checkpoint() {
return Err(Error::internal_error(
"ParsedLogPath::new_uuid_parquet_checkpoint created a non-checkpoint path",
));
}
Ok(path)
}
pub(crate) fn new_crc(table_root: &Url, version: Version) -> DeltaResult<Self> {
let filename = format!("{version:020}.crc");
let path = Self::create_path(table_root, filename)?;
if !matches!(path.file_type, LogPathFileType::Crc) {
return Err(Error::internal_error(
"ParsedLogPath::new_crc created a non-CRC path",
));
}
Ok(path)
}
pub(crate) fn new_log_compaction(
table_root: &Url,
start_version: Version,
end_version: Version,
) -> DeltaResult<Self> {
let filename = format!("{start_version:020}.{end_version:020}.compacted.json");
let path = Self::create_path(table_root, filename)?;
if !matches!(path.file_type, LogPathFileType::CompactedCommit { .. }) {
return Err(Error::internal_error(
"ParsedLogPath::new_log_compaction created a non-compaction path",
));
}
Ok(path)
}
}
/// The root URLs of a Delta table: the table directory and its `_delta_log/` directory.
///
/// When constructed with [`LogRoot::new`], both URLs end in `/`, so further joins resolve
/// beneath them.
#[derive(Debug, Clone)]
pub(crate) struct LogRoot {
    /// Table root directory URL.
    table_root: Url,
    /// `<table_root>/_delta_log/`.
    log_root: Url,
}
impl LogRoot {
pub(crate) fn new(mut table_root: Url) -> DeltaResult<Self> {
if !table_root.path().ends_with('/') {
let new_path = format!("{}/", table_root.path());
table_root.set_path(&new_path);
}
let log_root = table_root.join(DELTA_LOG_DIR_WITH_SLASH)?;
Ok(Self {
table_root,
log_root,
})
}
pub(crate) fn table_root(&self) -> &Url {
&self.table_root
}
pub(crate) fn log_root(&self) -> &Url {
&self.log_root
}
pub(crate) fn new_commit_path(&self, version: Version) -> DeltaResult<ParsedLogPath<Url>> {
let filename = format!("{version:020}.json");
let path = self.log_root().join(&filename)?;
ParsedLogPath::try_from(path)?.ok_or_else(|| {
Error::internal_error(format!("Attempted to create an invalid path: {filename}"))
})
}
pub(crate) fn new_staged_commit_path(
&self,
version: Version,
) -> DeltaResult<ParsedLogPath<Url>> {
let uuid = uuid::Uuid::new_v4();
let filename = format!("{version:020}.{uuid}.json");
let path = self.log_root().join(STAGED_COMMITS_DIR)?.join(&filename)?;
ParsedLogPath::try_from(path)?.ok_or_else(|| {
Error::internal_error(format!("Attempted to create an invalid path: {filename}"))
})
}
}
/// Unit tests for log path parsing and construction.
#[cfg(test)]
pub(crate) mod tests {
    use std::path::PathBuf;
    use std::sync::Arc;
    use test_utils::add_commit;
    use super::*;
    use crate::engine::default::DefaultEngineBuilder;
    use crate::engine::sync::SyncEngine;
    use crate::object_store::memory::InMemory;
    use crate::utils::test_utils::assert_result_error_with_message;

    /// Test-only constructors for well-formed parsed log paths. They are `pub(crate)`, so
    /// tests in other modules can use them too.
    impl ParsedLogPath<FileMeta> {
        /// Builds `<table_root>/_delta_log/<version:020>.json` with placeholder file
        /// metadata, asserting that it parses as a published `Commit`.
        pub(crate) fn create_parsed_published_commit(table_root: &Url, version: Version) -> Self {
            let filename = format!("{version:020}.json");
            let location = table_root
                .join(DELTA_LOG_DIR_WITH_SLASH)
                .unwrap()
                .join(&filename)
                .unwrap();
            let parsed = ParsedLogPath::try_from(FileMeta::new(location, 0, 100))
                .unwrap()
                .unwrap();
            assert!(parsed.file_type == LogPathFileType::Commit);
            parsed
        }

        /// Builds `<table_root>/_delta_log/_staged_commits/<version:020>.<uuid>.json` with a
        /// random UUID and placeholder metadata, asserting it parses as a `StagedCommit`.
        pub(crate) fn create_parsed_staged_commit(table_root: &Url, version: Version) -> Self {
            let uuid = Uuid::new_v4();
            let filename = format!("{version:020}.{uuid}.json");
            let location = table_root
                .join(DELTA_LOG_DIR_WITH_SLASH)
                .unwrap()
                .join(STAGED_COMMITS_DIR)
                .unwrap()
                .join(&filename)
                .unwrap();
            let parsed = ParsedLogPath::try_from(FileMeta::new(location, 0, 100))
                .unwrap()
                .unwrap();
            assert!(parsed.file_type == LogPathFileType::StagedCommit);
            parsed
        }

        /// Builds `<table_root>/_delta_log/<version:020>.crc` with placeholder metadata,
        /// asserting it parses as a `Crc` file.
        pub(crate) fn create_parsed_crc(table_root: &Url, version: Version) -> Self {
            let filename = format!("{version:020}.crc");
            let location = table_root
                .join(DELTA_LOG_DIR_WITH_SLASH)
                .unwrap()
                .join(&filename)
                .unwrap();
            let parsed = ParsedLogPath::try_from(FileMeta::new(location, 0, 100))
                .unwrap()
                .unwrap();
            assert!(parsed.file_type == LogPathFileType::Crc);
            parsed
        }
    }

    /// Directory URL (ending in `/`) of the checked-in `table-with-dv-small` test table.
    fn table_root_dir_url() -> Url {
        let path = PathBuf::from("./tests/data/table-with-dv-small/");
        let path = std::fs::canonicalize(path).unwrap();
        assert!(path.is_dir());
        let url = url::Url::from_directory_path(path).unwrap();
        assert!(url.path().ends_with('/'));
        url
    }

    /// Directory URL (ending in `/`) of that test table's `_delta_log`.
    fn table_log_dir_url() -> Url {
        let path = PathBuf::from("./tests/data/table-with-dv-small/_delta_log/");
        let path = std::fs::canonicalize(path).unwrap();
        assert!(path.is_dir());
        let url = url::Url::from_directory_path(path).unwrap();
        assert!(url.path().ends_with('/'));
        url
    }

    #[test]
    fn test_unknown_invalid_patterns() {
        let table_log_dir = table_log_dir_url();
        // A directory URL has an empty final segment, so it is not a log file.
        let log_path = table_log_dir.join("subdir/").unwrap();
        assert!(log_path
            .path()
            .ends_with("/tests/data/table-with-dv-small/_delta_log/subdir/"));
        let log_path = ParsedLogPath::try_from(log_path).unwrap();
        assert!(log_path.is_none());
        // No numeric version prefix.
        let log_path = table_log_dir.join("_last_checkpoint").unwrap();
        let log_path = ParsedLogPath::try_from(log_path).unwrap();
        assert!(log_path.is_none());
        // Valid version but no extension at all.
        let log_path = table_log_dir.join("00000000000000000010").unwrap();
        let result = ParsedLogPath::try_from(log_path);
        assert!(
            matches!(result, Ok(None)),
            "Expected Ok(None) for missing file extension"
        );
        // A trailing dot yields an empty extension, which is Unknown rather than invalid.
        let log_path = table_log_dir.join("00000000000000000011.").unwrap();
        let result = ParsedLogPath::try_from(log_path);
        assert!(
            matches!(
                result,
                Ok(Some(ParsedLogPath {
                    file_type: LogPathFileType::Unknown,
                    ..
                }))
            ),
            "Expected Unknown file type, got {result:?}"
        );
        // Non-numeric, too-long (21 digits) and too-short (19 digits) versions are rejected.
        let log_path = table_log_dir.join("abc.json").unwrap();
        let log_path = ParsedLogPath::try_from(log_path).unwrap();
        assert!(log_path.is_none());
        let log_path = table_log_dir.join("000000000000000000010.json").unwrap();
        let log_path = ParsedLogPath::try_from(log_path).unwrap();
        assert!(log_path.is_none());
        let log_path = table_log_dir.join("0000000000000000010.json").unwrap();
        let log_path = ParsedLogPath::try_from(log_path).unwrap();
        assert!(log_path.is_none());
        // Valid version with an unrecognized extension is Unknown.
        let log_path = table_log_dir.join("00000000000000000010.foo").unwrap();
        let log_path = ParsedLogPath::try_from(log_path).unwrap().unwrap();
        assert_eq!(log_path.filename, "00000000000000000010.foo");
        assert_eq!(log_path.extension, "foo");
        assert_eq!(log_path.version, 10);
        assert!(matches!(log_path.file_type, LogPathFileType::Unknown));
        assert!(log_path.is_unknown());
        // The extension is the text after the last dot only.
        let log_path = table_log_dir
            .join("00000000000000000010.a.b.c.foo")
            .unwrap();
        let log_path = ParsedLogPath::try_from(log_path).unwrap().unwrap();
        assert_eq!(log_path.filename, "00000000000000000010.a.b.c.foo");
        assert_eq!(log_path.extension, "foo");
        assert_eq!(log_path.version, 10);
        assert!(log_path.is_unknown());
    }

    #[test]
    fn test_commit_patterns() {
        let table_log_dir = table_log_dir_url();
        let log_path = table_log_dir.join("00000000000000000000.json").unwrap();
        let log_path = ParsedLogPath::try_from(log_path).unwrap().unwrap();
        assert_eq!(log_path.filename, "00000000000000000000.json");
        assert_eq!(log_path.extension, "json");
        assert_eq!(log_path.version, 0);
        assert!(matches!(log_path.file_type, LogPathFileType::Commit));
        assert!(log_path.is_commit());
        assert!(!log_path.is_checkpoint());
        let log_path = table_log_dir.join("00000000000000000005.json").unwrap();
        let log_path = ParsedLogPath::try_from(log_path).unwrap().unwrap();
        assert_eq!(log_path.version, 5);
        assert!(log_path.is_commit());
    }

    #[test]
    fn test_crc_patterns() {
        let table_log_dir = table_log_dir_url();
        let log_path = table_log_dir.join("00000000000000000000.crc").unwrap();
        let log_path = ParsedLogPath::try_from(log_path).unwrap().unwrap();
        assert_eq!(log_path.filename, "00000000000000000000.crc");
        assert_eq!(log_path.extension, "crc");
        assert_eq!(log_path.version, 0);
        assert!(matches!(log_path.file_type, LogPathFileType::Crc));
        assert!(!log_path.is_commit());
        assert!(!log_path.is_checkpoint());
        let log_path = table_log_dir.join("00000000000000000005.crc").unwrap();
        let log_path = ParsedLogPath::try_from(log_path).unwrap().unwrap();
        assert_eq!(log_path.version, 5);
        assert!(log_path.file_type == LogPathFileType::Crc);
    }

    #[test]
    fn test_single_part_checkpoint_patterns() {
        let table_log_dir = table_log_dir_url();
        let log_path = table_log_dir
            .join("00000000000000000002.checkpoint.parquet")
            .unwrap();
        let log_path = ParsedLogPath::try_from(log_path).unwrap().unwrap();
        assert_eq!(log_path.filename, "00000000000000000002.checkpoint.parquet");
        assert_eq!(log_path.extension, "parquet");
        assert_eq!(log_path.version, 2);
        assert!(matches!(
            log_path.file_type,
            LogPathFileType::SinglePartCheckpoint
        ));
        assert!(!log_path.is_commit());
        assert!(log_path.is_checkpoint());
        // A single-part checkpoint must be parquet; the json form is Unknown.
        let log_path = table_log_dir
            .join("00000000000000000002.checkpoint.json")
            .unwrap();
        let log_path = ParsedLogPath::try_from(log_path).unwrap().unwrap();
        assert_eq!(log_path.filename, "00000000000000000002.checkpoint.json");
        assert_eq!(log_path.extension, "json");
        assert_eq!(log_path.version, 2);
        assert!(!log_path.is_commit());
        assert!(!log_path.is_checkpoint());
        assert!(log_path.is_unknown());
    }

    #[test]
    fn test_uuid_checkpoint_patterns() {
        let table_log_dir = table_log_dir_url();
        let log_path = table_log_dir
            .join("00000000000000000002.checkpoint.3a0d65cd-4056-49b8-937b-95f9e3ee90e5.parquet")
            .unwrap();
        let log_path = ParsedLogPath::try_from(log_path).unwrap().unwrap();
        assert_eq!(
            log_path.filename,
            "00000000000000000002.checkpoint.3a0d65cd-4056-49b8-937b-95f9e3ee90e5.parquet"
        );
        assert_eq!(log_path.extension, "parquet");
        assert_eq!(log_path.version, 2);
        assert!(matches!(
            log_path.file_type,
            LogPathFileType::UuidCheckpoint
        ));
        assert!(!log_path.is_commit());
        assert!(log_path.is_checkpoint());
        // UUID checkpoints may also be json.
        let log_path = table_log_dir
            .join("00000000000000000002.checkpoint.3a0d65cd-4056-49b8-937b-95f9e3ee90e5.json")
            .unwrap();
        let log_path = ParsedLogPath::try_from(log_path).unwrap().unwrap();
        assert_eq!(
            log_path.filename,
            "00000000000000000002.checkpoint.3a0d65cd-4056-49b8-937b-95f9e3ee90e5.json"
        );
        assert_eq!(log_path.extension, "json");
        assert_eq!(log_path.version, 2);
        assert!(matches!(
            log_path.file_type,
            LogPathFileType::UuidCheckpoint
        ));
        assert!(!log_path.is_commit());
        assert!(log_path.is_checkpoint());
        // Any other extension is Unknown.
        let log_path = table_log_dir
            .join("00000000000000000002.checkpoint.3a0d65cd-4056-49b8-937b-95f9e3ee90e5.foo")
            .unwrap();
        let log_path = ParsedLogPath::try_from(log_path).unwrap().unwrap();
        assert_eq!(
            log_path.filename,
            "00000000000000000002.checkpoint.3a0d65cd-4056-49b8-937b-95f9e3ee90e5.foo"
        );
        assert_eq!(log_path.extension, "foo");
        assert_eq!(log_path.version, 2);
        assert!(!log_path.is_commit());
        assert!(!log_path.is_checkpoint());
        assert!(log_path.is_unknown());
        // A UUID component of the wrong length rejects the path entirely.
        let log_path = table_log_dir
            .join("00000000000000000002.checkpoint.foo.parquet")
            .unwrap();
        let log_path = ParsedLogPath::try_from(log_path).unwrap();
        assert!(log_path.is_none());
        let log_path = table_log_dir
            .join("00000000000000000002.checkpoint.foo")
            .unwrap();
        let log_path = ParsedLogPath::try_from(log_path).unwrap().unwrap();
        assert_eq!(log_path.filename, "00000000000000000002.checkpoint.foo");
        assert_eq!(log_path.extension, "foo");
        assert_eq!(log_path.version, 2);
        assert!(!log_path.is_commit());
        assert!(!log_path.is_checkpoint());
        assert!(log_path.is_unknown());
        // 35-character UUID (one short).
        let log_path = table_log_dir
            .join("00000000000000000010.checkpoint.3a0d65cd-4056-49b8-937b-95f9e3ee90e.parquet")
            .unwrap();
        let log_path = ParsedLogPath::try_from(log_path).unwrap();
        assert!(log_path.is_none());
    }

    #[test]
    fn test_multi_part_checkpoint_patterns() {
        let table_log_dir = table_log_dir_url();
        // Multi-part checkpoints must be parquet; the json form is Unknown.
        let log_path = table_log_dir
            .join("00000000000000000008.checkpoint.0000000000.0000000002.json")
            .unwrap();
        let log_path = ParsedLogPath::try_from(log_path).unwrap().unwrap();
        assert_eq!(
            log_path.filename,
            "00000000000000000008.checkpoint.0000000000.0000000002.json"
        );
        assert_eq!(log_path.extension, "json");
        assert_eq!(log_path.version, 8);
        assert!(!log_path.is_commit());
        assert!(!log_path.is_checkpoint());
        assert!(log_path.is_unknown());
        // Part numbers start at 1, so part 0 is rejected.
        let log_path = table_log_dir
            .join("00000000000000000008.checkpoint.0000000000.0000000002.parquet")
            .unwrap();
        let log_path = ParsedLogPath::try_from(log_path).unwrap();
        assert!(log_path.is_none());
        let log_path = table_log_dir
            .join("00000000000000000008.checkpoint.0000000001.0000000002.parquet")
            .unwrap();
        let log_path = ParsedLogPath::try_from(log_path).unwrap().unwrap();
        assert_eq!(
            log_path.filename,
            "00000000000000000008.checkpoint.0000000001.0000000002.parquet"
        );
        assert_eq!(log_path.extension, "parquet");
        assert_eq!(log_path.version, 8);
        assert!(matches!(
            log_path.file_type,
            LogPathFileType::MultiPartCheckpoint {
                part_num: 1,
                num_parts: 2
            }
        ));
        assert!(!log_path.is_commit());
        assert!(log_path.is_checkpoint());
        // The last part (part_num == num_parts) is valid.
        let log_path = table_log_dir
            .join("00000000000000000008.checkpoint.0000000002.0000000002.parquet")
            .unwrap();
        let log_path = ParsedLogPath::try_from(log_path).unwrap().unwrap();
        assert_eq!(
            log_path.filename,
            "00000000000000000008.checkpoint.0000000002.0000000002.parquet"
        );
        assert_eq!(log_path.extension, "parquet");
        assert_eq!(log_path.version, 8);
        assert!(matches!(
            log_path.file_type,
            LogPathFileType::MultiPartCheckpoint {
                part_num: 2,
                num_parts: 2
            }
        ));
        assert!(!log_path.is_commit());
        assert!(log_path.is_checkpoint());
        // part_num > num_parts is rejected.
        let log_path = table_log_dir
            .join("00000000000000000008.checkpoint.0000000003.0000000002.parquet")
            .unwrap();
        let log_path = ParsedLogPath::try_from(log_path).unwrap();
        assert!(log_path.is_none());
        // Wrong-width counters are rejected.
        let log_path = table_log_dir
            .join("00000000000000000008.checkpoint.000000001.0000000002.parquet")
            .unwrap();
        let log_path = ParsedLogPath::try_from(log_path).unwrap();
        assert!(log_path.is_none());
        let log_path = table_log_dir
            .join("00000000000000000008.checkpoint.0000000001.000000002.parquet")
            .unwrap();
        let log_path = ParsedLogPath::try_from(log_path).unwrap();
        assert!(log_path.is_none());
        // Non-numeric counters are rejected.
        let log_path = table_log_dir
            .join("00000000000000000008.checkpoint.00000000x1.0000000002.parquet")
            .unwrap();
        let log_path = ParsedLogPath::try_from(log_path).unwrap();
        assert!(log_path.is_none());
        let log_path = table_log_dir
            .join("00000000000000000008.checkpoint.0000000001.00000000x2.parquet")
            .unwrap();
        let log_path = ParsedLogPath::try_from(log_path).unwrap();
        assert!(log_path.is_none());
    }

    #[test]
    fn test_compacted_delta_patterns() {
        let table_log_dir = table_log_dir_url();
        let log_path = table_log_dir
            .join("00000000000000000008.00000000000000000015.compacted.json")
            .unwrap();
        let log_path = ParsedLogPath::try_from(log_path).unwrap().unwrap();
        assert_eq!(
            log_path.filename,
            "00000000000000000008.00000000000000000015.compacted.json"
        );
        assert_eq!(log_path.extension, "json");
        assert_eq!(log_path.version, 8);
        assert!(matches!(
            log_path.file_type,
            LogPathFileType::CompactedCommit { hi: 15 },
        ));
        assert!(!log_path.is_commit());
        assert!(!log_path.is_checkpoint());
        // Compaction files must be json; the parquet form is Unknown.
        let log_path = table_log_dir
            .join("00000000000000000008.00000000000000000015.compacted.parquet")
            .unwrap();
        let log_path = ParsedLogPath::try_from(log_path).unwrap().unwrap();
        assert_eq!(
            log_path.filename,
            "00000000000000000008.00000000000000000015.compacted.parquet"
        );
        assert_eq!(log_path.extension, "parquet");
        assert_eq!(log_path.version, 8);
        assert!(!log_path.is_commit());
        assert!(!log_path.is_checkpoint());
        assert!(log_path.is_unknown());
        // Malformed end versions (too short, too long, non-numeric) are rejected.
        let log_path = table_log_dir
            .join("00000000000000000008.0000000000000000015.compacted.json")
            .unwrap();
        let log_path = ParsedLogPath::try_from(log_path).unwrap();
        assert!(log_path.is_none());
        let log_path = table_log_dir
            .join("00000000000000000008.000000000000000000015.compacted.json")
            .unwrap();
        let log_path = ParsedLogPath::try_from(log_path).unwrap();
        assert!(log_path.is_none());
        let log_path = table_log_dir
            .join("00000000000000000008.00000000000000000a15.compacted.json")
            .unwrap();
        let log_path = ParsedLogPath::try_from(log_path).unwrap();
        assert!(log_path.is_none());
    }

    #[test]
    fn test_new_commit() {
        let table_root_dir = table_root_dir_url();
        let log_path = ParsedLogPath::new_commit(&table_root_dir, 10).unwrap();
        assert_eq!(log_path.version, 10);
        assert!(log_path.is_commit());
        assert_eq!(log_path.extension, "json");
        assert!(matches!(log_path.file_type, LogPathFileType::Commit));
        assert_eq!(log_path.filename, "00000000000000000010.json");
    }

    #[test]
    fn test_new_uuid_parquet_checkpoint() {
        let table_root_dir = table_root_dir_url();
        let log_path = ParsedLogPath::new_uuid_parquet_checkpoint(&table_root_dir, 10).unwrap();
        assert_eq!(log_path.version, 10);
        assert!(log_path.is_checkpoint());
        assert_eq!(log_path.extension, "parquet");
        assert!(
            matches!(log_path.file_type, LogPathFileType::UuidCheckpoint),
            "Expected UuidCheckpoint file type"
        );
        // The UUID is random, so check the filename's shape rather than its exact value.
        let filename = log_path.filename.to_string();
        let filename_parts: Vec<&str> = filename.split('.').collect();
        assert_eq!(filename_parts.len(), 4);
        assert_eq!(filename_parts[0], "00000000000000000010");
        assert_eq!(filename_parts[1], "checkpoint");
        assert_eq!(filename_parts[2].len(), UUID_PART_LEN);
        assert_eq!(filename_parts[3], "parquet");
    }

    #[test]
    fn test_new_classic_parquet_checkpoint() {
        let table_root_dir = table_root_dir_url();
        let log_path = ParsedLogPath::new_classic_parquet_checkpoint(&table_root_dir, 10).unwrap();
        assert_eq!(log_path.version, 10);
        assert!(log_path.is_checkpoint());
        assert_eq!(log_path.extension, "parquet");
        assert!(matches!(
            log_path.file_type,
            LogPathFileType::SinglePartCheckpoint
        ));
        assert_eq!(log_path.filename, "00000000000000000010.checkpoint.parquet");
    }

    #[test]
    fn test_staged_commit_paths() {
        let table_log_dir = table_log_dir_url();
        let log_path = table_log_dir
            .join("_staged_commits/00000000000000000010.3a0d65cd-4056-49b8-937b-95f9e3ee90e5.json")
            .unwrap();
        let log_path = ParsedLogPath::try_from(log_path).unwrap().unwrap();
        assert_eq!(
            log_path.filename,
            "00000000000000000010.3a0d65cd-4056-49b8-937b-95f9e3ee90e5.json"
        );
        assert_eq!(log_path.extension, "json");
        assert_eq!(log_path.version, 10);
        assert!(matches!(log_path.file_type, LogPathFileType::StagedCommit));
        assert!(log_path.is_commit());
        assert!(!log_path.is_checkpoint());
        assert!(!log_path.is_unknown());
        // A malformed UUID inside `_staged_commits/` is Unknown rather than rejected.
        let log_path = table_log_dir
            .join("_staged_commits/00000000000000000010.not-a-uuid.json")
            .unwrap();
        let log_path = ParsedLogPath::try_from(log_path).unwrap().unwrap();
        assert!(log_path.is_unknown());
        assert!(!log_path.is_commit());
        assert!(!log_path.is_checkpoint());
        // The staged-commit shape directly in `_delta_log/` is Unknown.
        let log_path = table_log_dir
            .join("00000000000000000010.3a0d65cd-4056-49b8-937b-95f9e3ee90e5.json")
            .unwrap();
        let log_path = ParsedLogPath::try_from(log_path).unwrap().unwrap();
        assert_eq!(
            log_path.filename,
            "00000000000000000010.3a0d65cd-4056-49b8-937b-95f9e3ee90e5.json"
        );
        assert_eq!(log_path.extension, "json");
        assert_eq!(log_path.version, 10);
        assert!(matches!(log_path.file_type, LogPathFileType::Unknown));
        assert!(!log_path.is_commit());
        assert!(!log_path.is_checkpoint());
        assert!(log_path.is_unknown());
    }

    #[test]
    fn test_should_list() {
        // Only `file_type` matters to `should_list`; the other fields are placeholders.
        let mut path = ParsedLogPath {
            location: table_log_dir_url(),
            filename: "".to_string(),
            extension: "".to_string(),
            version: 0,
            file_type: LogPathFileType::Commit,
        };
        for (file_type, should_list) in [
            (LogPathFileType::Commit, true),
            (LogPathFileType::StagedCommit, false),
            (LogPathFileType::SinglePartCheckpoint, true),
            (LogPathFileType::UuidCheckpoint, true),
            (
                LogPathFileType::MultiPartCheckpoint {
                    part_num: 1,
                    num_parts: 2,
                },
                true,
            ),
            (LogPathFileType::CompactedCommit { hi: 10 }, true),
            (LogPathFileType::Crc, true),
            (LogPathFileType::Unknown, true),
        ] {
            path.file_type = file_type;
            assert_eq!(
                path.should_list(),
                should_list,
                "file_type: {:?}",
                path.file_type
            );
        }
    }

    #[tokio::test]
    async fn test_read_in_commit_timestamp_success() {
        let store = Arc::new(InMemory::new());
        let engine = DefaultEngineBuilder::new(store.clone()).build();
        let table_root = "memory://test/";
        let table_url = url::Url::parse(table_root).unwrap();
        // commitInfo carries inCommitTimestamp = 2000 (distinct from timestamp = 1000).
        let commit_content = r#"{"commitInfo":{"timestamp":1000,"inCommitTimestamp":2000},"protocol":{"minReaderVersion":3,"minWriterVersion":7,"writerFeatures":["inCommitTimestamp"]},"metaData":{"id":"test","schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"id\",\"type\":\"integer\",\"nullable\":true}]}"}}"#;
        add_commit(table_root, store.as_ref(), 0, commit_content.to_string())
            .await
            .unwrap();
        let commit_path = table_url
            .join("_delta_log/00000000000000000000.json")
            .unwrap();
        let parsed_path = ParsedLogPath::try_from(FileMeta {
            location: commit_path,
            last_modified: 0,
            size: commit_content.len() as u64,
        })
        .unwrap()
        .unwrap();
        let result = parsed_path.read_in_commit_timestamp(&engine).unwrap();
        assert_eq!(result, 2000);
    }

    #[tokio::test]
    async fn test_read_in_commit_timestamp_missing_ict() {
        let store = Arc::new(InMemory::new());
        let engine = DefaultEngineBuilder::new(store.clone()).build();
        let table_root = "memory://test/";
        let table_url = url::Url::parse(table_root).unwrap();
        // commitInfo has no inCommitTimestamp field.
        let commit_content = r#"{"commitInfo":{"timestamp":1000},"protocol":{"minReaderVersion":3,"minWriterVersion":7},"metaData":{"id":"test","schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"id\",\"type\":\"integer\",\"nullable\":true}]}"}}"#;
        add_commit(table_root, store.as_ref(), 0, commit_content.to_string())
            .await
            .unwrap();
        let commit_path = table_url
            .join("_delta_log/00000000000000000000.json")
            .unwrap();
        let parsed_path = ParsedLogPath::try_from(FileMeta {
            location: commit_path,
            last_modified: 0,
            size: commit_content.len() as u64,
        })
        .unwrap()
        .unwrap();
        let result = parsed_path.read_in_commit_timestamp(&engine);
        assert_result_error_with_message(result, "In-Commit Timestamp not found");
    }

    #[test]
    fn test_read_in_commit_timestamp_not_commit_file() {
        // The path-kind check fails before any file is read, so the file need not exist.
        let engine = SyncEngine::new();
        let table_url = url::Url::try_from("file:///tmp/test_table").unwrap();
        let checkpoint_path = table_url
            .join("_delta_log/00000000000000000000.checkpoint.parquet")
            .unwrap();
        let parsed_path = ParsedLogPath::try_from(FileMeta {
            location: checkpoint_path,
            last_modified: 0,
            size: 100,
        })
        .unwrap()
        .unwrap();
        let result = parsed_path.read_in_commit_timestamp(&engine);
        assert_result_error_with_message(
            result,
            "read_in_commit_timestamp can only be called on commit files",
        );
    }
}