use serde::{Deserialize, Serialize};
use crate::error::EventStoreError;
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct SnapshotAnchor {
pub high_watermark: u64,
pub blob_ref: String,
pub content_hash: String,
}
impl SnapshotAnchor {
#[must_use]
pub fn new(
high_watermark: u64,
blob_ref: impl Into<String>,
content_hash: impl Into<String>,
) -> Self {
Self {
high_watermark,
blob_ref: blob_ref.into(),
content_hash: content_hash.into(),
}
}
}
#[must_use]
pub fn compute_snapshot_content_hash(bytes: &[u8]) -> String {
format!("blake3:{}", blake3::hash(bytes).to_hex())
}
pub(crate) fn validate_new_anchor(
anchor: &SnapshotAnchor,
durable_high_watermark: u64,
latest: Option<&SnapshotAnchor>,
) -> Result<(), EventStoreError> {
if anchor.high_watermark > durable_high_watermark {
return Err(EventStoreError::Backend(format!(
"snapshot anchor high_watermark {} exceeds durable high_watermark {}",
anchor.high_watermark, durable_high_watermark,
)));
}
if let Some(latest) = latest
&& anchor.high_watermark < latest.high_watermark
{
return Err(EventStoreError::Backend(format!(
"snapshot anchor high_watermark {} is older than latest anchor {}",
anchor.high_watermark, latest.high_watermark,
)));
}
Ok(())
}
#[cfg(test)]
mod tests {
use rstest::rstest;
use super::*;
#[rstest]
fn anchor_new_sets_all_fields() {
let anchor = SnapshotAnchor::new(7, "cache://snapshots/run-1/7", "blake3:abc");
assert_eq!(anchor.high_watermark, 7);
assert_eq!(anchor.blob_ref, "cache://snapshots/run-1/7");
assert_eq!(anchor.content_hash, "blake3:abc");
}
#[rstest]
fn compute_snapshot_content_hash_prefixes_blake3_digest() {
assert_eq!(
compute_snapshot_content_hash(b"snapshot"),
format!("blake3:{}", blake3::hash(b"snapshot").to_hex()),
);
}
#[rstest]
fn validate_rejects_anchor_past_durable_watermark() {
let anchor = SnapshotAnchor::new(8, "blob", "hash");
let err = validate_new_anchor(&anchor, 7, None).expect_err("must reject");
match err {
EventStoreError::Backend(msg) => {
assert!(
msg.contains("exceeds durable high_watermark"),
"msg was: {msg}",
);
}
other => panic!("expected Backend, was {other:?}"),
}
}
#[rstest]
fn validate_rejects_anchor_older_than_latest() {
let latest = SnapshotAnchor::new(9, "latest", "hash-latest");
let anchor = SnapshotAnchor::new(8, "older", "hash-older");
let err = validate_new_anchor(&anchor, 10, Some(&latest)).expect_err("must reject");
match err {
EventStoreError::Backend(msg) => {
assert!(msg.contains("older than latest anchor"), "msg was: {msg}");
}
other => panic!("expected Backend, was {other:?}"),
}
}
#[rstest]
fn validate_accepts_equal_or_newer_anchor() {
let latest = SnapshotAnchor::new(9, "latest", "hash-latest");
let same = SnapshotAnchor::new(9, "same", "hash-same");
let newer = SnapshotAnchor::new(10, "newer", "hash-newer");
validate_new_anchor(&same, 10, Some(&latest)).expect("same hwm accepted");
validate_new_anchor(&newer, 10, Some(&latest)).expect("newer hwm accepted");
}
}