use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use typed_builder::TypedBuilder;
/// Kind of commit recorded in a [`Snapshot`].
///
/// The enum carries no serde `rename` attributes, so each variant is read and
/// written as its exact identifier (for example the JSON string `"APPEND"`).
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
pub enum CommitKind {
    /// Encoded as the string `"APPEND"`.
    APPEND,
    /// Encoded as the string `"COMPACT"`.
    COMPACT,
    /// Encoded as the string `"OVERWRITE"`.
    OVERWRITE,
    /// Encoded as the string `"ANALYZE"`.
    ANALYZE,
}
/// A single snapshot record, (de)serialized by serde with camelCase keys
/// (`schema_id` <-> `"schemaId"`, `time_millis` <-> `"timeMillis"`, ...).
///
/// Build one with `Snapshot::builder()`:
/// - Every non-`Option` field is required.
/// - Every `Option` field may be left out of the chain and then defaults to `None`.
///
/// When serialized, `None` fields are generally omitted from the output. Two
/// fields are the exception: `total_record_count` and `delta_record_count`
/// have no `skip_serializing_if`, so they are written as JSON `null` instead.
#[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize, TypedBuilder)]
#[serde(rename_all = "camelCase")]
pub struct Snapshot {
    /// Snapshot format version (the test fixtures use `3`).
    version: i32,
    /// Snapshot id.
    id: i64,
    /// Schema id recorded for this snapshot.
    schema_id: i64,
    /// File name of the base manifest list
    /// (fixtures use names shaped like `manifest-list-<uuid>-0`).
    base_manifest_list: String,
    /// File name of the delta manifest list.
    delta_manifest_list: String,
    /// Optional changelog manifest list file name; omitted from output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    #[builder(default)]
    changelog_manifest_list: Option<String>,
    /// Optional index manifest file name; omitted from output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    #[builder(default)]
    index_manifest: Option<String>,
    /// User that made the commit (a UUID-looking string in the fixtures).
    commit_user: String,
    /// Commit identifier (the fixtures use `i64::MAX`).
    commit_identifier: i64,
    /// Kind of commit that produced this snapshot.
    commit_kind: CommitKind,
    /// Commit time in milliseconds
    /// (presumably since the Unix epoch — TODO confirm).
    time_millis: u64,
    /// Optional map of `i32` keys to `i64` offsets; omitted from output when `None`.
    /// NOTE(review): key meaning is not visible here (presumably a bucket id) — confirm.
    #[serde(skip_serializing_if = "Option::is_none")]
    #[builder(default)]
    log_offsets: Option<HashMap<i32, i64>>,
    /// Optional total record count; serialized as `null` (not omitted) when `None`.
    #[builder(default)]
    total_record_count: Option<i64>,
    /// Optional delta record count; serialized as `null` (not omitted) when `None`.
    #[builder(default)]
    delta_record_count: Option<i64>,
    /// Optional changelog record count; omitted from output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    #[builder(default)]
    changelog_record_count: Option<i64>,
    /// Optional watermark; omitted from output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    #[builder(default)]
    watermark: Option<i64>,
    /// Optional statistics payload stored as a string; omitted from output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    #[builder(default)]
    statistics: Option<String>,
    /// Optional next row id; omitted from output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    #[builder(default)]
    next_row_id: Option<i64>,
}
/// Read-only accessors, listed in field-declaration order.
///
/// `Copy` fields are returned by value. Owned strings and maps are lent out as
/// borrows, so callers clone only when they need ownership.
impl Snapshot {
    /// Snapshot format version.
    #[inline]
    pub fn version(&self) -> i32 {
        self.version
    }

    /// Snapshot id.
    #[inline]
    pub fn id(&self) -> i64 {
        self.id
    }

    /// Schema id recorded for this snapshot.
    #[inline]
    pub fn schema_id(&self) -> i64 {
        self.schema_id
    }

    /// File name of the base manifest list.
    #[inline]
    pub fn base_manifest_list(&self) -> &str {
        self.base_manifest_list.as_str()
    }

    /// File name of the delta manifest list.
    #[inline]
    pub fn delta_manifest_list(&self) -> &str {
        self.delta_manifest_list.as_str()
    }

    /// Changelog manifest list file name, if one was recorded.
    #[inline]
    pub fn changelog_manifest_list(&self) -> Option<&str> {
        self.changelog_manifest_list.as_deref()
    }

    /// Index manifest file name, if one was recorded.
    #[inline]
    pub fn index_manifest(&self) -> Option<&str> {
        self.index_manifest.as_deref()
    }

    /// User that made the commit.
    #[inline]
    pub fn commit_user(&self) -> &str {
        self.commit_user.as_str()
    }

    /// Commit identifier.
    #[inline]
    pub fn commit_identifier(&self) -> i64 {
        self.commit_identifier
    }

    /// Kind of commit that produced this snapshot.
    #[inline]
    pub fn commit_kind(&self) -> &CommitKind {
        &self.commit_kind
    }

    /// Commit time in milliseconds.
    #[inline]
    pub fn time_millis(&self) -> u64 {
        self.time_millis
    }

    /// Map of `i32` keys to `i64` log offsets, if present.
    #[inline]
    pub fn log_offsets(&self) -> Option<&HashMap<i32, i64>> {
        self.log_offsets.as_ref()
    }

    /// Total record count, if present.
    #[inline]
    pub fn total_record_count(&self) -> Option<i64> {
        self.total_record_count
    }

    /// Delta record count, if present.
    #[inline]
    pub fn delta_record_count(&self) -> Option<i64> {
        self.delta_record_count
    }

    /// Changelog record count, if present.
    #[inline]
    pub fn changelog_record_count(&self) -> Option<i64> {
        self.changelog_record_count
    }

    /// Watermark, if present.
    #[inline]
    pub fn watermark(&self) -> Option<i64> {
        self.watermark
    }

    /// Statistics payload string, if present.
    #[inline]
    pub fn statistics(&self) -> Option<&str> {
        self.statistics.as_deref()
    }

    /// Next row id, if present.
    #[inline]
    pub fn next_row_id(&self) -> Option<i64> {
        self.next_row_id
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use pretty_assertions::assert_eq;
    use std::path::Path;

    /// Reads the JSON fixture `tests/fixtures/snapshot/{name}.json`.
    ///
    /// The path is anchored at `CARGO_MANIFEST_DIR`, which is fixed at compile
    /// time, rather than at the process working directory. The lookup
    /// therefore still works when the test binary is launched from somewhere
    /// other than the package root.
    ///
    /// # Panics
    ///
    /// Panics if the file cannot be read or is not valid UTF-8.
    fn load_fixture(name: &str) -> String {
        let path = Path::new(env!("CARGO_MANIFEST_DIR"))
            .join(format!("tests/fixtures/snapshot/{name}.json"));
        std::fs::read_to_string(&path)
            .unwrap_or_else(|err| panic!("fixtures {path:?} load failed: {err}"))
    }

    /// Fixture names paired with the `Snapshot` each fixture must decode to.
    fn test_cases() -> Vec<(&'static str, Snapshot)> {
        vec![
            (
                "snapshot-v3",
                Snapshot::builder()
                    .version(3)
                    .id(2)
                    .schema_id(0)
                    .base_manifest_list(
                        "manifest-list-ea4b892d-edc8-4ee7-9eee-7068b83a947b-0".to_string(),
                    )
                    .delta_manifest_list(
                        "manifest-list-ea4b892d-edc8-4ee7-9eee-7068b83a947b-1".to_string(),
                    )
                    .commit_user("abbaac9e-4a17-43e3-b135-2269da263e3a".to_string())
                    .commit_identifier(9223372036854775807)
                    .changelog_manifest_list(Some(
                        "manifest-list-ea4b892d-edc8-4ee7-9eee-7068b83a947b-2".to_string(),
                    ))
                    .commit_kind(CommitKind::APPEND)
                    .time_millis(1724509030368)
                    .log_offsets(Some(HashMap::default()))
                    .total_record_count(Some(4))
                    .delta_record_count(Some(2))
                    .changelog_record_count(Some(2))
                    .statistics(Some("statistics_string".to_string()))
                    .build(),
            ),
            (
                "snapshot-v3-none-field",
                Snapshot::builder()
                    .version(3)
                    .id(2)
                    .schema_id(0)
                    .base_manifest_list(
                        "manifest-list-ea4b892d-edc8-4ee7-9eee-7068b83a947b-0".to_string(),
                    )
                    .delta_manifest_list(
                        "manifest-list-ea4b892d-edc8-4ee7-9eee-7068b83a947b-1".to_string(),
                    )
                    .commit_user("abbaac9e-4a17-43e3-b135-2269da263e3a".to_string())
                    .commit_identifier(9223372036854775807)
                    .changelog_manifest_list(None)
                    .commit_kind(CommitKind::APPEND)
                    .time_millis(1724509030368)
                    .log_offsets(Some(HashMap::default()))
                    .total_record_count(Some(4))
                    .delta_record_count(Some(2))
                    .changelog_record_count(Some(2))
                    .build(),
            ),
        ]
    }

    /// Checks every fixture in three ways:
    /// - It decodes to its expected `Snapshot`.
    /// - Its serialized form omits `changelogManifestList` exactly when that field is `None`.
    /// - It survives a serialize -> deserialize round trip unchanged.
    #[test]
    fn test_snapshot_serialization_deserialization() {
        for (name, expect) in test_cases() {
            let content = load_fixture(name);
            let snapshot: Snapshot = serde_json::from_str(&content).unwrap_or_else(|err| {
                panic!("fixture {name}: failed to deserialize Snapshot: {err}")
            });
            assert_eq!(snapshot, expect, "fixture {name}: decoded Snapshot mismatch");

            let serialized = serde_json::to_string(&snapshot).unwrap_or_else(|err| {
                panic!("fixture {name}: failed to serialize Snapshot: {err}")
            });

            // `changelog_manifest_list` is `skip_serializing_if = "Option::is_none"`,
            // so its camelCase key must appear exactly when the value is present.
            let json: serde_json::Value = serde_json::from_str(&serialized).unwrap_or_else(|err| {
                panic!("fixture {name}: serialized Snapshot is not valid JSON: {err}")
            });
            assert_eq!(
                json.get("changelogManifestList").is_some(),
                snapshot.changelog_manifest_list().is_some(),
                "fixture {name}: unexpected presence of `changelogManifestList`"
            );

            let deserialized: Snapshot = serde_json::from_str(&serialized).unwrap_or_else(|err| {
                panic!("fixture {name}: failed to deserialize serialized Snapshot: {err}")
            });
            assert_eq!(
                snapshot, deserialized,
                "fixture {name}: JSON round trip changed the Snapshot"
            );
        }
    }
}