use std::collections::HashMap;
use std::sync::Arc;
use serde_json::json;
use url::Url;
use crate::actions::{CommitInfo, Format, Metadata, Protocol};
use crate::engine::default::executor::tokio::TokioBackgroundExecutor;
use crate::engine::default::{DefaultEngine, DefaultEngineBuilder};
use crate::object_store::memory::InMemory;
use crate::object_store::ObjectStoreExt as _;
use crate::Snapshot;
use test_utils::{assert_result_error_with_message, delta_path_for_version};
/// Schema JSON shared by every `Metadata` fixture in this module: two nullable columns,
/// `id` (integer) and `val` (string).
const SCHEMA_STRING: &str = r#"{"type":"struct","fields":[{"name":"id","type":"integer","nullable":true,"metadata":{}},{"name":"val","type":"string","nullable":true,"metadata":{}}]}"#;
/// Protocol enabling only `v2Checkpoint`, listed identically as reader and writer feature.
fn protocol_v2() -> Protocol {
    let features = ["v2Checkpoint"];
    Protocol::try_new_modern(features, features).unwrap()
}
/// Protocol enabling `v2Checkpoint` and `deletionVectors` on both feature lists.
fn protocol_v2_dv() -> Protocol {
    let features = ["v2Checkpoint", "deletionVectors"];
    Protocol::try_new_modern(features, features).unwrap()
}
/// Protocol enabling `v2Checkpoint`, `deletionVectors` and `timestampNtz` on both feature lists.
fn protocol_v2_dv_ntz() -> Protocol {
    let features = ["v2Checkpoint", "deletionVectors", "timestampNtz"];
    Protocol::try_new_modern(features, features).unwrap()
}
/// Protocol 3/7 with `v2Checkpoint` in both feature lists and `inCommitTimestamp` added only
/// to the second (writer) list.
fn protocol_v2_ict() -> Protocol {
    let reader_features = ["v2Checkpoint"];
    let writer_features = ["v2Checkpoint", "inCommitTimestamp"];
    Protocol::try_new(3, 7, Some(reader_features), Some(writer_features)).unwrap()
}
/// Builds a `Metadata` fixture with the given `id` and `configuration`.
///
/// All fixtures in this module share every other argument (`SCHEMA_STRING`, default format,
/// the fixed timestamp, no name/description values). Keeping them in one place means the
/// fixtures can only differ where a test intends them to.
fn metadata_with_config(id: &str, configuration: HashMap<String, String>) -> Metadata {
    Metadata::new_unchecked(
        id,
        None,
        None,
        Format::default(),
        SCHEMA_STRING,
        vec![],
        Some(1587968585495),
        configuration,
    )
}

/// Baseline metadata fixture (id `"aaa"`, empty configuration).
fn metadata_a() -> Metadata {
    metadata_with_config("aaa", HashMap::new())
}

/// Metadata fixture that differs from [`metadata_a`] only by id (`"bbb"`), so tests can tell
/// which source a snapshot's metadata came from.
fn metadata_b() -> Metadata {
    metadata_with_config("bbb", HashMap::new())
}

/// Metadata fixture with `delta.enableInCommitTimestamps = true` in its configuration.
fn metadata_ict() -> Metadata {
    metadata_with_config(
        "5fba94ed-9794-4965-ba6e-6ee3c0d22af9",
        HashMap::from([(
            "delta.enableInCommitTimestamps".to_string(),
            "true".to_string(),
        )]),
    )
}
fn commit_info() -> CommitInfo {
CommitInfo {
timestamp: Some(1587968586154),
operation: Some("WRITE".to_string()),
..Default::default()
}
}
/// File-name suffix passed to `delta_path_for_version` when `CrcReadTest` writes a V2
/// checkpoint: a JSON checkpoint named with a fixed all-zero UUID.
const V2_CKPT_SUFFIX: &str = "checkpoint.00000000-0000-0000-0000-000000000000.json";
/// Wraps `protocol` as a `{"protocol": ...}` log action.
fn protocol_action_json(protocol: &Protocol) -> serde_json::Value {
    let body = serde_json::to_value(protocol).unwrap();
    json!({ "protocol": body })
}
/// Wraps `metadata` as a `{"metaData": ...}` log action.
fn metadata_action_json(metadata: &Metadata) -> serde_json::Value {
    let body = serde_json::to_value(metadata).unwrap();
    json!({ "metaData": body })
}
fn commit_info_json() -> serde_json::Value {
json!({"commitInfo": serde_json::to_value(commit_info()).unwrap()})
}
fn commit_info_json_with_ict(ict: i64) -> serde_json::Value {
json!({"commitInfo": {
"timestamp": 1587968586154i64,
"operation": "WRITE",
"inCommitTimestamp": ict,
}})
}
/// Builds the JSON body of a `.crc` file for an empty table.
///
/// The body has zero size and files, one metadata and one protocol, and embeds `protocol` and
/// `metadata`. `inCommitTimestampOpt` is included only when `ict` is `Some`.
fn crc_json(protocol: &Protocol, metadata: &Metadata, ict: Option<i64>) -> serde_json::Value {
    let metadata_value = serde_json::to_value(metadata).unwrap();
    let protocol_value = serde_json::to_value(protocol).unwrap();
    let mut crc = json!({
        "tableSizeBytes": 0,
        "numFiles": 0,
        "numMetadata": 1,
        "numProtocol": 1,
        "metadata": metadata_value,
        "protocol": protocol_value,
    });
    if let Some(timestamp) = ict {
        crc["inCommitTimestampOpt"] = timestamp.into();
    }
    crc
}
/// One file to write into the synthetic Delta log; recorded by `CrcReadTest`'s builder
/// methods and written to the in-memory store by `CrcReadTest::build`.
enum Op {
/// V2 checkpoint at `version` containing `protocol`, `metaData` and a `checkpointMetadata`
/// action (written with `V2_CKPT_SUFFIX`).
V2Checkpoint {
version: u64,
protocol: Protocol,
metadata: Metadata,
},
/// Commit file at the given version containing only a `commitInfo` action.
Delta(u64),
/// Commit file with `commitInfo`, followed by `protocol` and/or `metaData` for each field
/// that is `Some`.
DeltaWithPM {
version: u64,
protocol: Option<Protocol>,
metadata: Option<Metadata>,
},
/// Commit file whose only action is a `commitInfo` carrying `inCommitTimestamp = ict`.
DeltaWithIct {
version: u64,
ict: i64,
},
/// Well-formed `.crc` file at `version` (see `crc_json`); `ict` is embedded only if `Some`.
Crc {
version: u64,
protocol: Protocol,
metadata: Metadata,
ict: Option<i64>,
},
/// `.crc` file at the given version whose contents are not valid JSON.
CorruptCrc(u64),
}
/// Declarative description of a synthetic Delta log: builder methods append `Op`s in call
/// order, and `build` writes them, in that order, to a fresh in-memory store.
struct CrcReadTest {
ops: Vec<Op>,
}
impl CrcReadTest {
fn new() -> Self {
Self { ops: vec![] }
}
fn v2_checkpoint(mut self, version: u64, protocol: Protocol, metadata: Metadata) -> Self {
self.ops.push(Op::V2Checkpoint {
version,
protocol,
metadata,
});
self
}
fn delta(mut self, version: u64) -> Self {
self.ops.push(Op::Delta(version));
self
}
fn delta_with_p_m(
mut self,
version: u64,
protocol: impl Into<Option<Protocol>>,
metadata: impl Into<Option<Metadata>>,
) -> Self {
self.ops.push(Op::DeltaWithPM {
version,
protocol: protocol.into(),
metadata: metadata.into(),
});
self
}
fn delta_with_ict(mut self, version: u64, ict: i64) -> Self {
self.ops.push(Op::DeltaWithIct { version, ict });
self
}
fn crc(
mut self,
version: u64,
protocol: Protocol,
metadata: Metadata,
ict: impl Into<Option<i64>>,
) -> Self {
self.ops.push(Op::Crc {
version,
protocol,
metadata,
ict: ict.into(),
});
self
}
fn corrupt_crc(mut self, version: u64) -> Self {
self.ops.push(Op::CorruptCrc(version));
self
}
async fn build(self) -> BuiltCrcTest {
let store = Arc::new(InMemory::new());
let url = Url::parse("memory:///").unwrap();
let engine = DefaultEngineBuilder::new(store.clone()).build();
for op in self.ops {
match op {
Op::V2Checkpoint {
version: v,
ref protocol,
ref metadata,
} => {
let content = format!(
"{}\n{}\n{}",
protocol_action_json(protocol),
metadata_action_json(metadata),
json!({"checkpointMetadata": {"version": 2}})
);
put(&store, v, V2_CKPT_SUFFIX, &content).await;
}
Op::Delta(v) => {
put(&store, v, "json", &commit_info_json().to_string()).await;
}
Op::DeltaWithPM {
version: v,
protocol,
metadata,
} => {
let mut lines = vec![commit_info_json().to_string()];
if let Some(ref p) = protocol {
lines.push(protocol_action_json(p).to_string());
}
if let Some(ref m) = metadata {
lines.push(metadata_action_json(m).to_string());
}
put(&store, v, "json", &lines.join("\n")).await;
}
Op::DeltaWithIct { version: v, ict } => {
put(
&store,
v,
"json",
&commit_info_json_with_ict(ict).to_string(),
)
.await;
}
Op::Crc {
version: v,
ref protocol,
ref metadata,
ict,
} => {
put(
&store,
v,
"crc",
&crc_json(protocol, metadata, ict).to_string(),
)
.await;
}
Op::CorruptCrc(v) => {
put(&store, v, "crc", "CORRUPT_CRC_DATA").await;
}
}
}
BuiltCrcTest { engine, url }
}
}
/// A materialized test table produced by `CrcReadTest::build`: an engine over the in-memory
/// store plus the table root URL, used by the snapshot assertion helpers.
struct BuiltCrcTest {
engine: DefaultEngine<TokioBackgroundExecutor>,
url: Url,
}
impl BuiltCrcTest {
fn assert_p_m(
&self,
version: impl Into<Option<u64>>,
expected_protocol: &Protocol,
expected_metadata: &Metadata,
) {
let version = version.into();
let mut builder = Snapshot::builder_for(self.url.clone());
if let Some(v) = version {
builder = builder.at_version(v);
}
let snapshot = builder.build(&self.engine).unwrap();
let table_config = snapshot.table_configuration();
let version_label = version.map_or("latest".to_string(), |v| format!("v{v}"));
assert_eq!(
table_config.protocol(),
expected_protocol,
"Protocol mismatch at {version_label}"
);
assert_eq!(
table_config.metadata(),
expected_metadata,
"Metadata mismatch at {version_label}"
);
}
fn assert_ict(&self, version: impl Into<Option<u64>>, expected_ict: Option<i64>) {
let version = version.into();
let mut builder = Snapshot::builder_for(self.url.clone());
if let Some(v) = version {
builder = builder.at_version(v);
}
let snapshot = builder.build(&self.engine).unwrap();
let ict = snapshot.get_in_commit_timestamp(&self.engine).unwrap();
let version_label = version.map_or("latest".to_string(), |v| format!("v{v}"));
assert_eq!(ict, expected_ict, "ICT mismatch at {version_label}");
}
}
/// Writes `content` to the log file for `version` with the given `suffix` (e.g. `"json"`,
/// `"crc"`), panicking if the store rejects the write.
async fn put(store: &InMemory, version: u64, suffix: &str, content: &str) {
    let location = delta_path_for_version(version, suffix);
    let payload = content.as_bytes().to_vec();
    store.put(&location, payload.into()).await.unwrap();
}
/// Without any checkpoint, P&M come from the v0 commit that wrote them.
#[tokio::test]
async fn test_get_p_m_from_delta_no_checkpoint() {
    let table = CrcReadTest::new()
        .delta_with_p_m(0, protocol_v2(), metadata_a())
        .delta(1)
        .delta(2)
        .build()
        .await;
    table.assert_p_m(None, &protocol_v2(), &metadata_a());
}
/// Protocol (v1) and metadata (v2) updated by separate commits are each picked up.
#[tokio::test]
async fn test_get_p_and_m_from_different_deltas() {
    let table = CrcReadTest::new()
        .v2_checkpoint(0, protocol_v2(), metadata_a())
        // v1 replaces only the protocol
        .delta_with_p_m(1, protocol_v2_dv(), None)
        // v2 replaces only the metadata
        .delta_with_p_m(2, None, metadata_b())
        .build()
        .await;
    table.assert_p_m(None, &protocol_v2_dv(), &metadata_b());
}
/// When no later commit changes P&M, they come from the v0 checkpoint.
#[tokio::test]
async fn test_get_p_m_from_checkpoint() {
    let table = CrcReadTest::new()
        .v2_checkpoint(0, protocol_v2(), metadata_a())
        .delta(1)
        .delta(2)
        .build()
        .await;
    table.assert_p_m(None, &protocol_v2(), &metadata_a());
}
/// A commit after the checkpoint that rewrites P&M overrides the checkpoint's values.
#[tokio::test]
async fn test_get_p_m_from_delta_after_checkpoint() {
    let table = CrcReadTest::new()
        .v2_checkpoint(0, protocol_v2(), metadata_a())
        .delta_with_p_m(1, protocol_v2_dv(), metadata_b())
        .delta(2)
        .build()
        .await;
    table.assert_p_m(None, &protocol_v2_dv(), &metadata_b());
}
/// A CRC at the target version supplies P&M, even though the commits never wrote them.
#[tokio::test]
async fn test_get_p_m_from_crc_at_target() {
    let table = CrcReadTest::new()
        .v2_checkpoint(0, protocol_v2(), metadata_a())
        .delta(1)
        .delta(2)
        .crc(2, protocol_v2_dv(), metadata_b(), None)
        .build()
        .await;
    table.assert_p_m(None, &protocol_v2_dv(), &metadata_b());
}
/// When the target commit and its CRC disagree on P&M, the CRC's values win.
#[tokio::test]
async fn test_crc_preferred_over_delta_at_target() {
    let table = CrcReadTest::new()
        .v2_checkpoint(0, protocol_v2(), metadata_a())
        .delta(1)
        .delta_with_p_m(2, protocol_v2_dv(), metadata_a())
        .crc(2, protocol_v2_dv_ntz(), metadata_b(), None)
        .build()
        .await;
    table.assert_p_m(None, &protocol_v2_dv_ntz(), &metadata_b());
}
/// A corrupt CRC at the target is skipped and P&M come from the v0 checkpoint.
#[tokio::test]
async fn test_corrupt_crc_at_target_falls_back() {
    let table = CrcReadTest::new()
        .v2_checkpoint(0, protocol_v2(), metadata_a())
        .delta(1)
        .delta(2)
        .corrupt_crc(2)
        .build()
        .await;
    table.assert_p_m(None, &protocol_v2(), &metadata_a());
}
/// With both a checkpoint and a CRC at the target version, the CRC's P&M are used.
#[tokio::test]
async fn test_crc_wins_over_checkpoint() {
    let table = CrcReadTest::new()
        .v2_checkpoint(0, protocol_v2(), metadata_a())
        .delta(1)
        .delta(2)
        .v2_checkpoint(2, protocol_v2(), metadata_a())
        .crc(2, protocol_v2_dv(), metadata_b(), None)
        .build()
        .await;
    table.assert_p_m(None, &protocol_v2_dv(), &metadata_b());
}
/// When the CRC at the target is corrupt, the checkpoint at the same version supplies P&M.
#[tokio::test]
async fn test_checkpoint_on_corrupt_crc() {
    let table = CrcReadTest::new()
        .v2_checkpoint(0, protocol_v2(), metadata_a())
        .delta(1)
        .delta(2)
        .v2_checkpoint(2, protocol_v2(), metadata_a())
        .corrupt_crc(2)
        .build()
        .await;
    table.assert_p_m(None, &protocol_v2(), &metadata_a());
}
/// A CRC at v1 still supplies P&M for v2 when the v2 commit does not change them.
#[tokio::test]
async fn test_crc_at_earlier_version() {
    let table = CrcReadTest::new()
        .v2_checkpoint(0, protocol_v2(), metadata_a())
        .delta(1)
        .crc(1, protocol_v2_dv(), metadata_b(), None)
        .delta(2)
        .build()
        .await;
    table.assert_p_m(None, &protocol_v2_dv(), &metadata_b());
}
/// A protocol written at v2 overrides the v1 CRC's protocol; metadata still comes from the CRC.
#[tokio::test]
async fn test_get_p_from_newer_delta_over_older_crc() {
    let table = CrcReadTest::new()
        .v2_checkpoint(0, protocol_v2(), metadata_a())
        .delta(1)
        .crc(1, protocol_v2_dv(), metadata_b(), None)
        .delta_with_p_m(2, protocol_v2_dv_ntz(), None)
        .build()
        .await;
    table.assert_p_m(None, &protocol_v2_dv_ntz(), &metadata_b());
}
/// Metadata written at v2 overrides the v1 CRC's metadata; protocol still comes from the CRC.
#[tokio::test]
async fn test_get_m_from_newer_delta_over_older_crc() {
    let table = CrcReadTest::new()
        .v2_checkpoint(0, protocol_v2(), metadata_a())
        .delta(1)
        .crc(1, protocol_v2_dv(), metadata_b(), None)
        .delta_with_p_m(2, None, metadata_a())
        .build()
        .await;
    table.assert_p_m(None, &protocol_v2_dv(), &metadata_a());
}
/// A corrupt CRC at a non-target version (v1) is skipped; P&M come from the v0 checkpoint.
#[tokio::test]
async fn test_corrupt_crc_at_non_target_version_falls_back() {
    let table = CrcReadTest::new()
        .v2_checkpoint(0, protocol_v2(), metadata_a())
        .delta(1)
        .corrupt_crc(1)
        .delta(2)
        .build()
        .await;
    table.assert_p_m(None, &protocol_v2(), &metadata_a());
}
/// A CRC older than the latest checkpoint (v1 < v2) is not used; P&M come from the v2
/// checkpoint.
#[tokio::test]
async fn test_crc_before_checkpoint_is_ignored() {
    let table = CrcReadTest::new()
        .delta_with_p_m(0, protocol_v2(), metadata_a())
        .delta(1)
        .crc(1, protocol_v2_dv_ntz(), metadata_b(), None)
        .v2_checkpoint(2, protocol_v2_dv(), metadata_a())
        .delta(3)
        .build()
        .await;
    table.assert_p_m(None, &protocol_v2_dv(), &metadata_a());
}
/// The in-commit timestamp comes from the CRC at the snapshot version (1000), not from the
/// commit's own `inCommitTimestamp` (2000).
#[tokio::test]
async fn test_ict_from_crc_at_snapshot_version() {
    let table = CrcReadTest::new()
        .v2_checkpoint(0, protocol_v2_ict(), metadata_ict())
        .delta_with_ict(1, 2000)
        .crc(1, protocol_v2_ict(), metadata_ict(), 1000)
        .build()
        .await;
    table.assert_ict(None, Some(1000));
}
/// With ICT enabled, a CRC at the snapshot version that omits the timestamp yields an error
/// rather than a fallback to the commit's value.
#[tokio::test]
async fn test_ict_errors_when_crc_has_no_ict() {
    let table = CrcReadTest::new()
        .v2_checkpoint(0, protocol_v2_ict(), metadata_ict())
        .delta_with_ict(1, 2000)
        .crc(1, protocol_v2_ict(), metadata_ict(), None)
        .build()
        .await;
    let snapshot = Snapshot::builder_for(table.url.clone())
        .build(&table.engine)
        .unwrap();
    assert_result_error_with_message(
        snapshot.get_in_commit_timestamp(&table.engine),
        "In-Commit Timestamp not found in CRC file at version 1",
    );
}