use crate::path::ParsedLogPath;
use crate::utils::require;
use crate::{DeltaResult, Error, FileMeta, FileSize};
use url::Url;
#[derive(Debug, Clone, PartialEq)]
pub struct LogPath(ParsedLogPath);
impl From<LogPath> for ParsedLogPath {
fn from(p: LogPath) -> Self {
p.0
}
}
impl LogPath {
pub fn try_new(file_meta: FileMeta) -> DeltaResult<Self> {
let parsed = ParsedLogPath::try_from(file_meta.clone())?
.ok_or_else(|| Error::invalid_log_path(&file_meta.location))?;
require!(
!parsed.is_unknown(),
Error::invalid_log_path(&file_meta.location)
);
Ok(Self(parsed))
}
pub fn staged_commit(
table_root: Url,
filename: &str,
last_modified: i64,
size: FileSize,
) -> DeltaResult<LogPath> {
let commit_path = Self::staged_commit_url(table_root, filename)?;
let file_meta = FileMeta {
location: commit_path,
last_modified,
size,
};
LogPath::try_new(file_meta)
}
pub fn staged_commit_url(table_root: Url, filename: &str) -> DeltaResult<Url> {
if !table_root.path().ends_with('/') {
return Err(Error::invalid_table_location(table_root));
}
table_root
.join("_delta_log/")
.and_then(|url| url.join("_staged_commits/"))
.and_then(|url| url.join(filename))
.map_err(|_| Error::invalid_table_location(table_root))
}
}
#[cfg(test)]
mod test {
use super::*;
use std::str::FromStr;
#[test]
fn test_staged_commit_path_creation() {
let table_root = Url::from_str("s3://my-bucket/my-table/").unwrap();
let filename = "00000000000000000010.3a0d65cd-4a56-49a8-937b-95f9e3ee90e5.json";
let last_modified = 1234567890i64;
let size = 1024u64;
let log_path = LogPath::staged_commit(table_root.clone(), filename, last_modified, size)
.expect("Failed to create staged commit log path");
let expected_path =
Url::from_str("s3://my-bucket/my-table/_delta_log/_staged_commits/00000000000000000010.3a0d65cd-4a56-49a8-937b-95f9e3ee90e5.json")
.unwrap();
let expected = FileMeta {
location: expected_path,
last_modified,
size,
};
let path = log_path.0;
assert_eq!(path.location, expected);
}
#[test]
fn test_staged_commit_path_creation_failures() {
let last_modified = 1234567890i64;
let size = 1024u64;
let table_root = Url::from_str("s3://my-bucket/my-table").unwrap();
let filename = "00000000000000000010.3a0d65cd-4a56-49a8-937b-95f9e3ee90e5.json";
let err =
LogPath::staged_commit(table_root.clone(), filename, last_modified, size).unwrap_err();
assert!(matches!(err, Error::InvalidTableLocation(_)));
let table_root = Url::from_str("s3://my-bucket/my-table/").unwrap();
let filename = "subdir/00000000000000000010.3a0d65cd-4a56-49a8-937b-95f9e3ee90e5.json";
LogPath::staged_commit(table_root.clone(), filename, last_modified, size).unwrap_err();
let table_root = Url::from_str("s3://my-bucket/my-table/").unwrap();
let filename = "00000000000000000010.not-a-uuid.json";
LogPath::staged_commit(table_root.clone(), filename, last_modified, size).unwrap_err();
let filename = "000000000000000000aa.3a0d65cd-4a56-49a8-937b-95f9e3ee90e5.json";
LogPath::staged_commit(table_root.clone(), filename, last_modified, size).unwrap_err();
let filename = "00000000000000000010.3a0d65cd-4a56-49a8-937b-95f9e3ee90e5.parquet";
LogPath::staged_commit(table_root.clone(), filename, last_modified, size).unwrap_err();
}
}