#[allow(clippy::disallowed_types)] use std::collections::HashMap;
use std::fmt;
use serde::{Deserialize, Serialize};
use uuid::Uuid;
/// Unique identifier of a checkpoint, wrapping a [`Uuid`].
///
/// Equality, ordering and hashing are all derived from the inner UUID, so ids
/// produced by [`CheckpointId::now`] (UUID v7) sort by creation time.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
#[derive(Serialize, Deserialize)]
pub struct CheckpointId(Uuid);
impl CheckpointId {
#[must_use]
pub fn now() -> Self {
Self(Uuid::now_v7())
}
#[must_use]
pub const fn from_uuid(uuid: Uuid) -> Self {
Self(uuid)
}
#[must_use]
pub const fn as_uuid(&self) -> Uuid {
self.0
}
#[must_use]
pub fn to_string_id(&self) -> String {
self.0.to_string()
}
}
impl fmt::Display for CheckpointId {
    /// Writes the wrapped UUID using its own `Display` form
    /// (36 characters, hyphen-separated).
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let inner = self.0;
        write!(f, "{inner}")
    }
}
/// Builds storage keys for checkpoint artifacts (pointer, manifests,
/// operator snapshots/deltas, source offsets).
#[derive(Debug)]
#[derive(Clone)]
pub struct CheckpointPaths {
    /// Root prefix, as normalized by [`CheckpointPaths::new`]; every generated
    /// key starts with it.
    pub(crate) base_prefix: String,
}
impl CheckpointPaths {
    /// Creates a key builder rooted at `base_prefix`.
    ///
    /// A missing trailing `/` is appended, so `"prefix"` and `"prefix/"`
    /// produce identical keys. An empty prefix is left empty rather than
    /// becoming `"/"`, so keys stay relative instead of gaining a leading slash.
    #[must_use]
    pub fn new(base_prefix: &str) -> Self {
        let base_prefix = if base_prefix.is_empty() || base_prefix.ends_with('/') {
            base_prefix.to_owned()
        } else {
            format!("{base_prefix}/")
        };
        Self { base_prefix }
    }

    /// Key of the `_latest` pointer, stored directly under the base prefix
    /// (not inside any checkpoint directory).
    #[must_use]
    pub fn latest_pointer(&self) -> String {
        format!("{}_latest", self.base_prefix)
    }

    /// Directory prefix (ending in `/`) holding every artifact of checkpoint
    /// `id`: `{base_prefix}{id}/`.
    #[must_use]
    pub fn checkpoint_dir(&self, id: &CheckpointId) -> String {
        format!("{}{id}/", self.base_prefix)
    }

    /// Key of the manifest of checkpoint `id`: `{checkpoint_dir}manifest.json`.
    #[must_use]
    pub fn manifest(&self, id: &CheckpointId) -> String {
        // Every per-checkpoint key is derived from `checkpoint_dir` so the
        // `/` separator after the id can never be forgotten.
        format!("{}manifest.json", self.checkpoint_dir(id))
    }

    /// Key of the `.snap` file for `partition` of `operator` in checkpoint
    /// `id`: `{checkpoint_dir}operators/{operator}/partition-{partition}.snap`.
    ///
    /// `operator` is inserted verbatim; a `/` inside it adds nesting levels.
    #[must_use]
    pub fn snapshot(&self, id: &CheckpointId, operator: &str, partition: u32) -> String {
        format!(
            "{}operators/{operator}/partition-{partition}.snap",
            self.checkpoint_dir(id)
        )
    }

    /// Key of the `.delta` file for `partition` of `operator` in checkpoint
    /// `id`: `{checkpoint_dir}operators/{operator}/partition-{partition}.delta`.
    ///
    /// `operator` is inserted verbatim; a `/` inside it adds nesting levels.
    #[must_use]
    pub fn delta(&self, id: &CheckpointId, operator: &str, partition: u32) -> String {
        format!(
            "{}operators/{operator}/partition-{partition}.delta",
            self.checkpoint_dir(id)
        )
    }

    /// Key of the offsets file for `source_name` in checkpoint `id`:
    /// `{checkpoint_dir}offsets/{source_name}.json`.
    ///
    /// `source_name` is inserted verbatim; a `/` inside it adds nesting levels.
    #[must_use]
    pub fn source_offset(&self, id: &CheckpointId, source_name: &str) -> String {
        format!("{}offsets/{source_name}.json", self.checkpoint_dir(id))
    }
}
impl Default for CheckpointPaths {
fn default() -> Self {
Self::new("checkpoints/")
}
}
/// Version-2 checkpoint manifest describing one checkpoint's contents.
///
/// Fields tagged `#[serde(default)]` may be absent from the serialized form and
/// then deserialize to their `Default` (empty map, `None`, `0`). Field order is
/// significant: it is the order in which serde writes them.
#[derive(Debug, Clone, PartialEq)]
#[derive(Serialize, Deserialize)]
pub struct CheckpointManifestV2 {
    /// Manifest format version (`CheckpointManifestV2::new` writes `2`).
    pub version: u32,
    /// Id of the checkpoint this manifest describes.
    pub checkpoint_id: CheckpointId,
    /// Epoch the checkpoint was taken at.
    pub epoch: u64,
    /// Creation time in milliseconds since the Unix epoch.
    pub timestamp_ms: u64,
    /// Per-operator snapshot entries, presumably keyed by operator name.
    #[serde(default)]
    pub operators: HashMap<String, OperatorSnapshotEntry>,
    /// Per-source offset entries, presumably keyed by source name.
    #[serde(default)]
    pub source_offsets: HashMap<String, SourceOffsetEntry>,
    /// Optional predecessor checkpoint; NOTE(review): presumably the base for
    /// delta entries — confirm with the writer.
    #[serde(default)]
    pub parent_id: Option<CheckpointId>,
    /// Optional watermark; units not established here — TODO confirm.
    #[serde(default)]
    pub watermark: Option<i64>,
    /// Total artifact size in bytes; not computed by this type (starts at 0).
    #[serde(default)]
    pub total_size_bytes: u64,
}
impl CheckpointManifestV2 {
    /// Manifest format version written by [`CheckpointManifestV2::new`].
    pub const VERSION: u32 = 2;

    /// Creates an empty manifest for `checkpoint_id` at `epoch`.
    ///
    /// `timestamp_ms` is read from the system clock as milliseconds since the
    /// Unix epoch. A clock set before the epoch yields `0`, and a value that
    /// does not fit in `u64` saturates at `u64::MAX` instead of silently
    /// truncating. Maps start empty, optional fields start as `None`, and
    /// `total_size_bytes` starts at `0`.
    #[must_use]
    pub fn new(checkpoint_id: CheckpointId, epoch: u64) -> Self {
        let timestamp_ms = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .map_or(0, |elapsed| {
                // `as_millis` is u128; saturate rather than wrap on narrowing.
                u64::try_from(elapsed.as_millis()).unwrap_or(u64::MAX)
            });
        Self {
            version: Self::VERSION,
            checkpoint_id,
            epoch,
            timestamp_ms,
            operators: HashMap::new(),
            source_offsets: HashMap::new(),
            parent_id: None,
            watermark: None,
            total_size_bytes: 0,
        }
    }
}
/// Snapshot artifacts recorded for one operator within a checkpoint.
#[derive(Debug, Clone, PartialEq)]
#[derive(Serialize, Deserialize)]
pub struct OperatorSnapshotEntry {
    /// One entry per stored partition artifact.
    pub partitions: Vec<PartitionSnapshotEntry>,
    /// Aggregate size in bytes; presumably the sum of the partitions'
    /// `size_bytes` (not enforced here). Defaults to `0` when absent.
    #[serde(default)]
    pub total_bytes: u64,
}
/// A single stored partition artifact (snapshot or delta) for an operator.
#[derive(Debug, Clone, PartialEq)]
#[derive(Serialize, Deserialize)]
pub struct PartitionSnapshotEntry {
    /// Partition number the artifact belongs to.
    pub partition_id: u32,
    /// `true` for a delta artifact, `false` for a snapshot.
    pub is_delta: bool,
    /// Storage key of the artifact; NOTE(review): appears to be relative to
    /// the checkpoint directory — confirm with the writer.
    pub path: String,
    /// Artifact size in bytes.
    pub size_bytes: u64,
    /// Optional content digest; presumably hex-encoded SHA-256 — confirm.
    /// Defaults to `None` when absent.
    #[serde(default)]
    pub sha256: Option<String>,
}
/// Recorded read position of one source at checkpoint time.
#[derive(Debug, Clone, PartialEq)]
#[derive(Serialize, Deserialize)]
pub struct SourceOffsetEntry {
    /// Source kind label (e.g. `"kafka"`).
    pub source_type: String,
    /// Opaque string-to-string offsets; the key/value schema is defined by the
    /// source (e.g. partition name to offset).
    pub offsets: HashMap<String, String>,
    /// Epoch these offsets were captured at.
    pub epoch: u64,
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Ids minted in different milliseconds order chronologically, both as
    /// values and as strings.
    #[test]
    fn test_checkpoint_id_time_sortable() {
        let earlier = CheckpointId::now();
        std::thread::sleep(std::time::Duration::from_millis(2));
        let later = CheckpointId::now();
        assert!(earlier < later, "UUID v7 should be time-sortable");
        assert!(earlier.to_string_id() < later.to_string_id());
    }

    /// `Display` renders the 36-character, four-hyphen UUID form.
    #[test]
    fn test_checkpoint_id_display() {
        let rendered = CheckpointId::now().to_string();
        assert_eq!(rendered.len(), 36);
        assert_eq!(rendered.matches('-').count(), 4);
    }

    /// Spot-checks the suffixes/segments of every generated key.
    #[test]
    fn test_checkpoint_paths() {
        let paths = CheckpointPaths::new("s3://my-bucket/checkpoints");
        let id = CheckpointId::now();

        assert!(paths.latest_pointer().ends_with("_latest"));

        let manifest_key = paths.manifest(&id);
        assert!(manifest_key.ends_with("manifest.json"));
        assert!(manifest_key.contains(id.to_string_id().as_str()));

        let snapshot_key = paths.snapshot(&id, "window-agg", 3);
        assert!(snapshot_key.contains("operators/window-agg/"));
        assert!(snapshot_key.ends_with("partition-3.snap"));

        assert!(paths
            .delta(&id, "window-agg", 3)
            .ends_with("partition-3.delta"));
        assert!(paths
            .source_offset(&id, "kafka-trades")
            .ends_with("kafka-trades.json"));
    }

    /// A prefix with and without a trailing slash yields identical keys.
    #[test]
    fn test_checkpoint_paths_trailing_slash() {
        let with_slash = CheckpointPaths::new("prefix/");
        let without_slash = CheckpointPaths::new("prefix");
        let id = CheckpointId::now();
        assert_eq!(with_slash.manifest(&id), without_slash.manifest(&id));
    }

    /// A fully populated manifest survives a JSON round trip.
    #[test]
    fn test_manifest_v2_json_round_trip() {
        let id = CheckpointId::now();

        let window_agg = OperatorSnapshotEntry {
            partitions: vec![
                PartitionSnapshotEntry {
                    partition_id: 0,
                    is_delta: false,
                    path: "operators/window-agg/partition-0.snap".into(),
                    size_bytes: 1024,
                    sha256: Some("abcd1234".into()),
                },
                PartitionSnapshotEntry {
                    partition_id: 1,
                    is_delta: true,
                    path: "operators/window-agg/partition-1.delta".into(),
                    size_bytes: 256,
                    sha256: None,
                },
            ],
            total_bytes: 1280,
        };
        let kafka_trades = SourceOffsetEntry {
            source_type: "kafka".into(),
            offsets: [("partition-0", "1234"), ("partition-1", "5678")]
                .into_iter()
                .map(|(partition, offset)| (partition.to_string(), offset.to_string()))
                .collect(),
            epoch: 10,
        };

        let mut manifest = CheckpointManifestV2::new(id, 10);
        manifest.watermark = Some(5000);
        manifest.parent_id = Some(CheckpointId::now());
        manifest.operators.insert("window-agg".into(), window_agg);
        manifest.source_offsets.insert("kafka-trades".into(), kafka_trades);

        let json = serde_json::to_string_pretty(&manifest).unwrap();
        let restored: CheckpointManifestV2 = serde_json::from_str(&json).unwrap();

        assert_eq!(restored.version, 2);
        assert_eq!(restored.checkpoint_id, id);
        assert_eq!(restored.epoch, 10);
        assert_eq!(restored.watermark, Some(5000));
        assert!(restored.parent_id.is_some());

        let op = &restored.operators["window-agg"];
        assert_eq!(op.partitions.len(), 2);
        assert_eq!(op.total_bytes, 1280);

        let src = &restored.source_offsets["kafka-trades"];
        assert_eq!(src.source_type, "kafka");
        assert_eq!(
            src.offsets.get("partition-0").map(String::as_str),
            Some("1234")
        );
    }

    /// A manifest containing only the required fields deserializes, with the
    /// `#[serde(default)]` fields falling back to empty/`None`.
    #[test]
    fn test_manifest_v2_backward_compat_missing_fields() {
        let id = CheckpointId::now();
        let json = format!(
            r#"{{
"version": 2,
"checkpoint_id": "{id}",
"epoch": 1,
"timestamp_ms": 1000
}}"#
        );
        let parsed: CheckpointManifestV2 = serde_json::from_str(&json).unwrap();
        assert_eq!(parsed.version, 2);
        assert!(parsed.operators.is_empty());
        assert!(parsed.source_offsets.is_empty());
        assert!(parsed.parent_id.is_none());
        assert!(parsed.watermark.is_none());
    }
}