pulse_core/
checkpoint.rs

1use chrono::{DateTime, Utc};
2use serde::{Deserialize, Serialize};
3
4/// Identifier returned by state backends to refer to a point-in-time snapshot.
5pub type SnapshotId = String;
6
7/// Minimal checkpoint metadata for resuming processing after restart.
8/// Includes a source offset (opaque), current watermark, and the state snapshot id.
9#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
10pub struct CheckpointMeta {
11    /// Opaque source offset (e.g., file byte offset, Kafka partition@offset, etc.)
12    pub source_offset: String,
13    /// Last committed watermark at the time of checkpoint.
14    pub current_watermark: DateTime<Utc>,
15    /// Snapshot identifier returned by the KeyValueState backend.
16    pub snapshot_id: SnapshotId,
17}
18
19#[cfg(test)]
20mod tests {
21    use super::*;
22
23    #[test]
24    fn checkpoint_meta_roundtrip() {
25        let meta = CheckpointMeta {
26            source_offset: "file://path:12345".to_string(),
27            current_watermark: DateTime::<Utc>::from_timestamp(1_700_000_000, 0).unwrap(),
28            snapshot_id: "mem-123".to_string(),
29        };
30        let s = serde_json::to_string(&meta).unwrap();
31        let back: CheckpointMeta = serde_json::from_str(&s).unwrap();
32        assert_eq!(meta, back);
33    }
34}