#[allow(clippy::disallowed_types)] use std::collections::HashMap;
/// Commit status recorded for a sink in a checkpoint manifest.
///
/// Serialized into the manifest JSON; `validate` cross-checks these entries
/// against `sink_epochs` (a sink with an epoch should also have a status).
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
pub enum SinkCommitStatus {
    /// The sink's commit for this checkpoint epoch has not completed yet.
    Pending,
    /// The sink durably committed its output for this checkpoint epoch.
    Committed,
    /// The sink's commit failed; the payload carries a human-readable reason.
    Failed(String),
}
/// On-disk checkpoint manifest: the root metadata record for one checkpoint.
///
/// All fields added after v1 carry `#[serde(default)]` so that manifests
/// written by older versions still deserialize (see the backward-compat
/// tests at the bottom of this file). Structural invariants are checked by
/// [`CheckpointManifest::validate`], not enforced at construction.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq)]
pub struct CheckpointManifest {
    /// Manifest format version; `new` writes 1. 0 is rejected by `validate`.
    pub version: u32,
    /// Unique id of this checkpoint. 0 is rejected by `validate`.
    pub checkpoint_id: u64,
    /// Epoch the checkpoint was taken at. 0 is rejected by `validate`.
    pub epoch: u64,
    /// Wall-clock creation time in milliseconds since the Unix epoch.
    pub timestamp_ms: u64,
    /// Per-source connector offsets, keyed by source name
    /// (`validate` checks keys against `source_names` when the latter is set).
    #[serde(default)]
    pub source_offsets: HashMap<String, ConnectorCheckpoint>,
    /// Last epoch each sink reached, keyed by sink name.
    #[serde(default)]
    pub sink_epochs: HashMap<String, u64>,
    /// Commit status per sink; may be empty for manifests predating this field.
    #[serde(default)]
    pub sink_commit_statuses: HashMap<String, SinkCommitStatus>,
    /// Connector offsets for lookup/reference tables, keyed by table name.
    #[serde(default)]
    pub table_offsets: HashMap<String, ConnectorCheckpoint>,
    /// Serialized operator state, keyed by operator name.
    #[serde(default)]
    pub operator_states: HashMap<String, OperatorCheckpoint>,
    /// Path to the table-store checkpoint directory, if one was taken.
    #[serde(default)]
    pub table_store_checkpoint_path: Option<String>,
    /// Global watermark at checkpoint time (presumably event-time millis —
    /// units not visible here; confirm against the watermark producer).
    #[serde(default)]
    pub watermark: Option<i64>,
    /// Per-source watermarks, keyed by source name.
    #[serde(default)]
    pub source_watermarks: HashMap<String, i64>,
    /// Names of all sources in the pipeline topology at checkpoint time.
    #[serde(default)]
    pub source_names: Vec<String>,
    /// Names of all sinks in the pipeline topology at checkpoint time.
    #[serde(default)]
    pub sink_names: Vec<String>,
    /// Hash of the pipeline definition, used to detect topology changes.
    #[serde(default)]
    pub pipeline_hash: Option<u64>,
    /// In-flight (buffered, not yet processed) records keyed by operator name.
    #[serde(default)]
    pub inflight_data: HashMap<String, Vec<InFlightRecord>>,
    /// Version of the vnode/worker assignment this checkpoint was taken under.
    #[serde(default)]
    pub assignment_version: u64,
    /// Per-vnode map; keys are vnode ids (`validate` requires key < vnode_count).
    /// Value semantics are not visible in this file — confirm at the writer.
    #[serde(default)]
    pub vnode_map: HashMap<u16, u64>,
    /// Total vnode count; 0 marks a legacy manifest and fails `validate`.
    #[serde(default)]
    pub vnode_count: u16,
    /// Total serialized size of the checkpoint in bytes.
    #[serde(default)]
    pub size_bytes: u64,
    /// True if this checkpoint is incremental relative to `parent_id`.
    #[serde(default)]
    pub is_incremental: bool,
    /// Parent checkpoint id; required when `is_incremental` (see `validate`).
    #[serde(default)]
    pub parent_id: Option<u64>,
    /// Optional checksum over the checkpointed state, for integrity checks.
    #[serde(default)]
    pub state_checksum: Option<String>,
}
/// A single validation failure reported by [`CheckpointManifest::validate`].
/// Carries only a human-readable message; callers match on message text.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ManifestValidationError {
    /// Human-readable description of the violated invariant.
    pub message: String,
}
impl std::fmt::Display for ManifestValidationError {
    /// Renders the error as its bare message text, with no decoration.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(&self.message)
    }
}
impl CheckpointManifest {
    /// Runs structural sanity checks over the manifest and returns every
    /// violation found; an empty vector means the manifest is well-formed.
    ///
    /// Checks are advisory — the caller decides which errors are fatal for
    /// its recovery path. Several rules are deliberately lenient so that
    /// manifests written by older versions (missing newer fields) still
    /// validate where possible.
    #[must_use]
    pub fn validate(&self) -> Vec<ManifestValidationError> {
        let mut errors = Vec::new();
        // Local helper so each rule below is one call instead of a
        // four-line struct-literal push.
        let mut fail = |message: String| errors.push(ManifestValidationError { message });

        // Mandatory scalar fields must be non-zero.
        if self.version == 0 {
            fail("manifest version is 0".into());
        }
        if self.checkpoint_id == 0 {
            fail("checkpoint_id is 0".into());
        }
        if self.epoch == 0 {
            fail("epoch is 0".into());
        }
        if self.timestamp_ms == 0 {
            fail("timestamp_ms is 0 (missing creation time)".into());
        }

        // Every sink with a recorded epoch should also carry a commit status.
        // Only enforced when any statuses exist at all: manifests written
        // before `sink_commit_statuses` was added have an empty map.
        for sink_name in self.sink_epochs.keys() {
            if !self.sink_commit_statuses.is_empty()
                && !self.sink_commit_statuses.contains_key(sink_name)
            {
                fail(format!("sink '{sink_name}' has epoch but no commit status"));
            }
        }

        // Cross-check recorded source offsets against the declared topology.
        // Skipped entirely when `source_names` is absent (legacy manifest).
        if !self.source_names.is_empty() {
            for name in self.source_offsets.keys() {
                if !self.source_names.contains(name) {
                    fail(format!(
                        "source_offsets contains '{name}' not in source_names"
                    ));
                }
            }
        }

        // An incremental checkpoint is meaningless without its parent.
        if self.is_incremental && self.parent_id.is_none() {
            fail("incremental checkpoint has no parent_id".into());
        }

        // Every vnode id in the map must lie inside [0, vnode_count).
        if self.vnode_count > 0 && !self.vnode_map.is_empty() {
            for &vnode_id in self.vnode_map.keys() {
                if vnode_id >= self.vnode_count {
                    fail(format!(
                        "vnode_map contains vnode_id {vnode_id} >= vnode_count {}",
                        self.vnode_count
                    ));
                }
            }
        }

        // The checkpoint's vnode count must match what this runtime expects.
        if self.vnode_count == 0 {
            fail("vnode_count is 0 (missing or legacy checkpoint)".into());
        } else if self.vnode_count != laminar_core::state::VNODE_COUNT {
            fail(format!(
                "vnode_count mismatch: checkpoint has {}, runtime expects {}",
                self.vnode_count,
                laminar_core::state::VNODE_COUNT
            ));
        }

        errors
    }

    /// Creates a fresh version-1 manifest for `checkpoint_id` at `epoch`,
    /// stamped with the current wall clock. All collections start empty,
    /// and `vnode_count` is taken from the runtime configuration.
    #[must_use]
    pub fn new(checkpoint_id: u64, epoch: u64) -> Self {
        // Truncating u128 millis to u64 is safe for any realistic wall-clock
        // value; a pre-epoch clock degrades to 0 via `unwrap_or_default`.
        #[allow(clippy::cast_possible_truncation)]
        let timestamp_ms = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap_or_default()
            .as_millis() as u64;
        Self {
            version: 1,
            checkpoint_id,
            epoch,
            timestamp_ms,
            source_offsets: HashMap::new(),
            sink_epochs: HashMap::new(),
            sink_commit_statuses: HashMap::new(),
            table_offsets: HashMap::new(),
            operator_states: HashMap::new(),
            table_store_checkpoint_path: None,
            watermark: None,
            source_watermarks: HashMap::new(),
            source_names: Vec::new(),
            sink_names: Vec::new(),
            pipeline_hash: None,
            inflight_data: HashMap::new(),
            assignment_version: 0,
            vnode_map: HashMap::new(),
            vnode_count: laminar_core::state::VNODE_COUNT,
            size_bytes: 0,
            is_incremental: false,
            parent_id: None,
            state_checksum: None,
        }
    }
}
/// Checkpointed position of a single connector (source or table).
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
pub struct ConnectorCheckpoint {
    /// Connector-specific offsets as opaque strings, keyed by partition or
    /// slot name (e.g. Kafka partitions, Postgres LSN — see tests below).
    pub offsets: HashMap<String, String>,
    /// Epoch at which these offsets were captured.
    pub epoch: u64,
    /// Optional connector-specific metadata; defaults to empty for
    /// manifests written before this field existed.
    #[serde(default)]
    pub metadata: HashMap<String, String>,
}
impl ConnectorCheckpoint {
    /// Creates a checkpoint at `epoch` with no offsets recorded yet.
    #[must_use]
    pub fn new(epoch: u64) -> Self {
        // Delegate to the general constructor with an empty offset map.
        Self::with_offsets(epoch, HashMap::new())
    }

    /// Creates a checkpoint at `epoch` pre-populated with `offsets`;
    /// metadata starts empty.
    #[must_use]
    pub fn with_offsets(epoch: u64, offsets: HashMap<String, String>) -> Self {
        Self {
            epoch,
            offsets,
            metadata: HashMap::new(),
        }
    }
}
/// One buffered record captured in-flight at checkpoint time, to be
/// replayed into an operator input on recovery.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
pub struct InFlightRecord {
    /// Index of the operator input the record was buffered on.
    pub input_id: usize,
    /// Record payload, base64-encoded (standard alphabet — see tests).
    pub data_b64: String,
}
/// Checkpointed state of one operator. Small state is stored inline as
/// base64; large state lives in an external sidecar and the manifest
/// records only its offset and length (see `from_bytes`).
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
pub struct OperatorCheckpoint {
    /// Base64-encoded state when stored inline; `None` when external.
    #[serde(default)]
    pub state_b64: Option<String>,
    /// True when the state is stored externally rather than inline.
    #[serde(default)]
    pub external: bool,
    /// Byte offset of the state within the external store (0 when inline).
    #[serde(default)]
    pub external_offset: u64,
    /// Byte length of the external state (0 when inline).
    #[serde(default)]
    pub external_length: u64,
}
impl OperatorCheckpoint {
    /// Builds an inline checkpoint holding `data` base64-encoded.
    #[must_use]
    pub fn inline(data: &[u8]) -> Self {
        use base64::Engine;
        let encoded = base64::engine::general_purpose::STANDARD.encode(data);
        Self {
            state_b64: Some(encoded),
            external: false,
            external_offset: 0,
            external_length: 0,
        }
    }

    /// Builds an external checkpoint pointing at `length` bytes stored at
    /// `offset` in the sidecar; no inline payload is kept.
    #[must_use]
    pub fn external(offset: u64, length: u64) -> Self {
        Self {
            state_b64: None,
            external: true,
            external_offset: offset,
            external_length: length,
        }
    }

    /// Decodes the inline payload, if any. A corrupt base64 payload is
    /// logged and treated as absent (`None`) so the operator restarts
    /// from scratch instead of failing recovery.
    #[must_use]
    pub fn decode_inline(&self) -> Option<Vec<u8>> {
        use base64::Engine;
        let b64 = self.state_b64.as_ref()?;
        match base64::engine::general_purpose::STANDARD.decode(b64) {
            Ok(data) => Some(data),
            Err(e) => {
                tracing::warn!(
                    error = %e,
                    b64_len = b64.len(),
                    "[LDB-4004] Failed to decode inline operator state from base64 — \
                     operator will start from scratch"
                );
                None
            }
        }
    }

    /// Strict variant of [`Self::decode_inline`]: `Ok(None)` when there is
    /// no inline payload, `Err` (with a diagnostic string) when decoding
    /// fails, rather than silently dropping the state.
    pub fn try_decode_inline(&self) -> Result<Option<Vec<u8>>, String> {
        use base64::Engine;
        let Some(b64) = &self.state_b64 else {
            return Ok(None);
        };
        base64::engine::general_purpose::STANDARD
            .decode(b64)
            .map(Some)
            .map_err(|e| format!("[LDB-4004] base64 decode failed: {e}"))
    }

    /// Chooses inline vs external storage for `data` based on `threshold`
    /// (inclusive: exactly-threshold-sized data stays inline). Returns the
    /// checkpoint plus, for the external case, the bytes the caller must
    /// write to the sidecar at `current_offset`.
    #[must_use]
    #[allow(clippy::cast_possible_truncation)]
    pub fn from_bytes(
        data: &[u8],
        threshold: usize,
        current_offset: u64,
    ) -> (Self, Option<Vec<u8>>) {
        if data.len() > threshold {
            // usize -> u64 cannot truncate on supported platforms.
            let length = data.len() as u64;
            (Self::external(current_offset, length), Some(data.to_vec()))
        } else {
            (Self::inline(data), None)
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    // `new` stamps the clock, sets version 1, and leaves collections empty.
    #[test]
    fn test_manifest_new() {
        let m = CheckpointManifest::new(1, 5);
        assert_eq!(m.version, 1);
        assert_eq!(m.checkpoint_id, 1);
        assert_eq!(m.epoch, 5);
        assert!(m.timestamp_ms > 0);
        assert!(m.source_offsets.is_empty());
        assert!(m.sink_epochs.is_empty());
        assert!(m.operator_states.is_empty());
        assert!(!m.is_incremental);
        assert!(m.parent_id.is_none());
    }

    // Full serialize -> deserialize cycle preserves offsets, epochs,
    // watermark, and inline operator state.
    #[test]
    fn test_manifest_json_round_trip() {
        let mut m = CheckpointManifest::new(42, 10);
        m.source_offsets.insert(
            "kafka-src".into(),
            ConnectorCheckpoint::with_offsets(
                10,
                HashMap::from([
                    ("partition-0".into(), "1234".into()),
                    ("partition-1".into(), "5678".into()),
                ]),
            ),
        );
        m.sink_epochs.insert("pg-sink".into(), 9);
        m.watermark = Some(999_000);
        m.operator_states
            .insert("window-agg".into(), OperatorCheckpoint::inline(b"hello"));
        let json = serde_json::to_string_pretty(&m).unwrap();
        let restored: CheckpointManifest = serde_json::from_str(&json).unwrap();
        assert_eq!(restored.checkpoint_id, 42);
        assert_eq!(restored.epoch, 10);
        assert_eq!(restored.watermark, Some(999_000));
        let src = restored.source_offsets.get("kafka-src").unwrap();
        assert_eq!(src.offsets.get("partition-0"), Some(&"1234".into()));
        assert_eq!(restored.sink_epochs.get("pg-sink"), Some(&9));
        let op = restored.operator_states.get("window-agg").unwrap();
        assert_eq!(op.decode_inline().unwrap(), b"hello");
    }

    // A minimal v1 manifest (only the four required fields) must still
    // deserialize thanks to #[serde(default)] on everything newer.
    #[test]
    fn test_manifest_backward_compat_missing_fields() {
        let json = r#"{
            "version": 1,
            "checkpoint_id": 1,
            "epoch": 1,
            "timestamp_ms": 1000
        }"#;
        let m: CheckpointManifest = serde_json::from_str(json).unwrap();
        assert_eq!(m.version, 1);
        assert!(m.source_offsets.is_empty());
        assert!(m.sink_epochs.is_empty());
        assert!(m.operator_states.is_empty());
        assert!(m.watermark.is_none());
        assert!(!m.is_incremental);
    }

    // Empty constructor: only the epoch is set.
    #[test]
    fn test_connector_checkpoint_new() {
        let cp = ConnectorCheckpoint::new(5);
        assert_eq!(cp.epoch, 5);
        assert!(cp.offsets.is_empty());
        assert!(cp.metadata.is_empty());
    }

    // Offsets passed to with_offsets are stored verbatim.
    #[test]
    fn test_connector_checkpoint_with_offsets() {
        let offsets = HashMap::from([("lsn".into(), "0/ABCD".into())]);
        let cp = ConnectorCheckpoint::with_offsets(3, offsets);
        assert_eq!(cp.epoch, 3);
        assert_eq!(cp.offsets.get("lsn"), Some(&"0/ABCD".into()));
    }

    // inline() round-trips payload bytes through base64.
    #[test]
    fn test_operator_checkpoint_inline() {
        let op = OperatorCheckpoint::inline(b"state-data");
        assert!(!op.external);
        assert!(op.state_b64.is_some());
        assert_eq!(op.decode_inline().unwrap(), b"state-data");
    }

    // external() carries offset/length and has nothing to decode inline.
    #[test]
    fn test_operator_checkpoint_external() {
        let op = OperatorCheckpoint::external(1024, 256);
        assert!(op.external);
        assert_eq!(op.external_offset, 1024);
        assert_eq!(op.external_length, 256);
        assert!(op.decode_inline().is_none());
    }

    // Empty payload is a valid inline state (decodes to empty bytes).
    #[test]
    fn test_operator_checkpoint_empty_inline() {
        let op = OperatorCheckpoint::inline(b"");
        assert_eq!(op.decode_inline().unwrap(), b"");
    }

    // Incremental flag and parent_id survive a JSON round trip.
    #[test]
    fn test_manifest_with_incremental() {
        let mut m = CheckpointManifest::new(5, 5);
        m.is_incremental = true;
        m.parent_id = Some(4);
        let json = serde_json::to_string(&m).unwrap();
        let restored: CheckpointManifest = serde_json::from_str(&json).unwrap();
        assert!(restored.is_incremental);
        assert_eq!(restored.parent_id, Some(4));
    }

    // Table offsets and the table-store path survive a JSON round trip.
    #[test]
    fn test_manifest_table_offsets() {
        let mut m = CheckpointManifest::new(1, 1);
        m.table_offsets.insert(
            "instruments".into(),
            ConnectorCheckpoint::with_offsets(1, HashMap::from([("lsn".into(), "0/ABCD".into())])),
        );
        m.table_store_checkpoint_path = Some("/tmp/rocksdb_cp".into());
        let json = serde_json::to_string(&m).unwrap();
        let restored: CheckpointManifest = serde_json::from_str(&json).unwrap();
        assert_eq!(restored.table_offsets.len(), 1);
        assert_eq!(
            restored.table_store_checkpoint_path.as_deref(),
            Some("/tmp/rocksdb_cp")
        );
    }

    // Source/sink topology name lists survive a JSON round trip.
    #[test]
    fn test_manifest_topology_fields_round_trip() {
        let mut m = CheckpointManifest::new(1, 1);
        m.source_names = vec!["kafka-clicks".into(), "ws-prices".into()];
        m.sink_names = vec!["pg-sink".into()];
        let json = serde_json::to_string(&m).unwrap();
        let restored: CheckpointManifest = serde_json::from_str(&json).unwrap();
        assert_eq!(restored.source_names, vec!["kafka-clicks", "ws-prices"]);
        assert_eq!(restored.sink_names, vec!["pg-sink"]);
    }

    // Topology name lists default to empty for legacy manifests.
    #[test]
    fn test_manifest_topology_backward_compat() {
        let json = r#"{
            "version": 1,
            "checkpoint_id": 5,
            "epoch": 3,
            "timestamp_ms": 1000
        }"#;
        let m: CheckpointManifest = serde_json::from_str(json).unwrap();
        assert!(m.source_names.is_empty());
        assert!(m.sink_names.is_empty());
    }

    // validate() flags a source offset whose name is not in source_names.
    #[test]
    fn test_validate_orphaned_source_offset() {
        let mut m = CheckpointManifest::new(1, 1);
        m.source_names = vec!["a".into(), "b".into()];
        m.source_offsets
            .insert("c".into(), ConnectorCheckpoint::new(1));
        let errors = m.validate();
        assert!(
            errors
                .iter()
                .any(|e| e.message.contains("'c' not in source_names")),
            "expected orphaned source offset error: {errors:?}"
        );
    }

    // pipeline_hash survives a JSON round trip exactly.
    #[test]
    fn test_manifest_pipeline_hash_round_trip() {
        let mut m = CheckpointManifest::new(1, 1);
        m.pipeline_hash = Some(0xDEAD_BEEF_CAFE_1234);
        let json = serde_json::to_string(&m).unwrap();
        let restored: CheckpointManifest = serde_json::from_str(&json).unwrap();
        assert_eq!(restored.pipeline_hash, Some(0xDEAD_BEEF_CAFE_1234));
    }

    // Data under the threshold is stored inline, no sidecar bytes returned.
    #[test]
    fn test_from_bytes_inline() {
        let data = b"small-state";
        let (op, sidecar) = OperatorCheckpoint::from_bytes(data, 1024, 0);
        assert!(!op.external);
        assert!(sidecar.is_none());
        assert_eq!(op.decode_inline().unwrap(), data);
    }

    // Data over the threshold goes external; the sidecar gets the bytes.
    #[test]
    fn test_from_bytes_external() {
        let data = vec![0xAB; 2048];
        let (op, sidecar) = OperatorCheckpoint::from_bytes(&data, 1024, 512);
        assert!(op.external);
        assert_eq!(op.external_offset, 512);
        assert_eq!(op.external_length, 2048);
        assert!(op.decode_inline().is_none());
        assert_eq!(sidecar.unwrap(), data);
    }

    // Threshold is inclusive: len == threshold stays inline, len == threshold+1 spills.
    #[test]
    fn test_from_bytes_at_threshold_boundary() {
        let data = vec![0xFF; 100];
        let (op, sidecar) = OperatorCheckpoint::from_bytes(&data, 100, 0);
        assert!(!op.external);
        assert!(sidecar.is_none());
        assert_eq!(op.decode_inline().unwrap(), data);
        let data_over = vec![0xFF; 101];
        let (op2, sidecar2) = OperatorCheckpoint::from_bytes(&data_over, 100, 0);
        assert!(op2.external);
        assert!(sidecar2.is_some());
    }

    // Empty input is always inline and decodes back to empty bytes.
    #[test]
    fn test_from_bytes_empty_data() {
        let (op, sidecar) = OperatorCheckpoint::from_bytes(b"", 1024, 0);
        assert!(!op.external);
        assert!(sidecar.is_none());
        assert_eq!(op.decode_inline().unwrap(), b"");
    }

    // In-flight records survive a JSON round trip including their base64 payload.
    #[test]
    fn test_manifest_inflight_round_trip() {
        use base64::Engine;
        let mut m = CheckpointManifest::new(1, 1);
        let record = InFlightRecord {
            input_id: 2,
            data_b64: base64::engine::general_purpose::STANDARD.encode(b"buffered-event"),
        };
        m.inflight_data.insert("join-op".into(), vec![record]);
        let json = serde_json::to_string_pretty(&m).unwrap();
        let restored: CheckpointManifest = serde_json::from_str(&json).unwrap();
        assert_eq!(restored.inflight_data.len(), 1);
        let records = restored.inflight_data.get("join-op").unwrap();
        assert_eq!(records.len(), 1);
        assert_eq!(records[0].input_id, 2);
        let decoded = base64::engine::general_purpose::STANDARD
            .decode(&records[0].data_b64)
            .unwrap();
        assert_eq!(decoded, b"buffered-event");
    }

    // inflight_data defaults to empty for legacy manifests.
    #[test]
    fn test_manifest_inflight_backward_compat() {
        let json = r#"{
            "version": 1,
            "checkpoint_id": 1,
            "epoch": 1,
            "timestamp_ms": 1000
        }"#;
        let m: CheckpointManifest = serde_json::from_str(json).unwrap();
        assert!(m.inflight_data.is_empty());
    }

    // pipeline_hash defaults to None for legacy manifests.
    #[test]
    fn test_manifest_pipeline_hash_backward_compat() {
        let json = r#"{
            "version": 1,
            "checkpoint_id": 1,
            "epoch": 1,
            "timestamp_ms": 1000
        }"#;
        let m: CheckpointManifest = serde_json::from_str(json).unwrap();
        assert!(m.pipeline_hash.is_none());
    }
}