Skip to main content

feldera_types/
checkpoint.rs

1use std::time::Duration;
2
3use chrono::{DateTime, Utc};
4use serde::{Deserialize, Serialize};
5use utoipa::ToSchema;
6use uuid::Uuid;
7
8use crate::suspend::TemporarySuspendError;
9
/// Checkpoint status returned by the `/checkpoint_status` endpoint.
// The derived `Default` leaves both fields `None`: no checkpoint outcome has
// been recorded.
// NOTE(review): utoipa's `ToSchema` derive presumably copies these `///`
// comments into the generated OpenAPI descriptions — confirm before rewording.
#[derive(Clone, Debug, Default, Serialize, Deserialize, ToSchema)]
pub struct CheckpointStatus {
    /// Most recently successful checkpoint.
    // NOTE(review): presumably a checkpoint sequence number, like
    // `CheckpointFailure::sequence_number` — confirm against the producer.
    pub success: Option<u64>,

    /// Most recently failed checkpoint, and the associated error.
    ///
    /// This tracks transient checkpoint failures (e.g. I/O errors during
    /// writing).  A subsequent successful checkpoint will not clear this
    /// field — it always reflects the *last* failure that occurred.
    // Consequence of the above: `success` and `failure` can both be `Some` at
    // the same time.
    pub failure: Option<CheckpointFailure>,
}
23
/// Current checkpoint activity state.
// Wire format: `tag = "status"` selects serde's internally tagged
// representation and `rename_all = "snake_case"` renames the variants, so the
// variant travels in a `status` field as `idle`, `delayed`, or `in_progress`,
// alongside that variant's own fields.
#[derive(Clone, Debug, Default, Serialize, Deserialize, ToSchema)]
#[serde(tag = "status", rename_all = "snake_case")]
pub enum CheckpointActivity {
    /// No checkpoint is pending or in progress.
    // This is the variant produced by the derived `Default`.
    #[default]
    Idle,

    /// A checkpoint has been requested but is delayed for temporary reasons
    /// (e.g. replaying, bootstrapping, transaction in progress, or input
    /// endpoint barriers that require the coordinator to run steps).
    Delayed {
        /// Why the checkpoint cannot proceed yet.
        // More than one reason can be reported at once (it is a `Vec`).
        reasons: Vec<TemporarySuspendError>,
        /// When the delay started (serialized as ISO 8601).
        delayed_since: DateTime<Utc>,
    },

    /// A checkpoint is currently being written to storage.
    InProgress {
        /// When the checkpoint write started (serialized as ISO 8601).
        started_at: DateTime<Utc>,
    },
}
48
/// Information about a failed checkpoint.
// NOTE(review): `Default` is derived, so a default value has sequence number
// 0, an empty error message, and whatever `DateTime<Utc>::default()` yields.
// Presumably that is only ever a placeholder — confirm it is never reported
// to clients as a real failure.
#[derive(Clone, Debug, Default, Serialize, Deserialize, ToSchema)]
pub struct CheckpointFailure {
    /// Sequence number of the failed checkpoint.
    pub sequence_number: u64,

    /// Error message associated with the failure.
    // Stored as plain text; no structured error type is kept.
    pub error: String,

    /// When the failure occurred (serialized as ISO 8601).
    pub failed_at: DateTime<Utc>,
}
61
/// Response to a checkpoint request.
#[derive(Clone, Debug, Default, Serialize, Deserialize, ToSchema)]
pub struct CheckpointResponse {
    // Sequence number carried by the response.
    // NOTE(review): presumably identifies the checkpoint created or scheduled
    // by the request — confirm with the endpoint handler.  This field has no
    // `///` doc; adding one would presumably also add a description to the
    // generated OpenAPI schema.
    pub checkpoint_sequence_number: u64,
}
67
68impl CheckpointResponse {
69    pub fn new(checkpoint_sequence_number: u64) -> Self {
70        Self {
71            checkpoint_sequence_number,
72        }
73    }
74}
75
/// Response to a sync checkpoint request.
// NOTE(review): `Default` is derived, so a default response carries
// `Uuid::default()` (presumably the nil UUID) — confirm that value is never
// returned to clients as a real checkpoint id.
#[derive(Clone, Debug, Default, Serialize, Deserialize, ToSchema)]
pub struct CheckpointSyncResponse {
    // UUID carried by the response.
    // NOTE(review): presumably the UUID of the checkpoint being synced —
    // confirm with the endpoint handler.
    pub checkpoint_uuid: Uuid,
}
81
82impl CheckpointSyncResponse {
83    pub fn new(checkpoint_uuid: Uuid) -> Self {
84        Self { checkpoint_uuid }
85    }
86}
87
/// Checkpoint status returned by the `/checkpoint/sync_status` endpoint.
// NOTE(review): this type reports checkpoint *sync* outcomes (see the endpoint
// path and the field docs); the summary line above could say "Checkpoint sync
// status" to distinguish it from `CheckpointStatus`.
// The derived `Default` leaves all three fields `None`.
#[derive(Clone, Debug, Default, Serialize, Deserialize, ToSchema)]
pub struct CheckpointSyncStatus {
    /// Most recently successful checkpoint sync.
    // Identified by checkpoint UUID rather than by sequence number.
    pub success: Option<Uuid>,

    /// Most recently failed checkpoint sync, and the associated error.
    pub failure: Option<CheckpointSyncFailure>,

    /// Most recently successful automated periodic checkpoint sync.
    // Tracked separately from `success`.
    // NOTE(review): whether a periodic sync also updates `success` is not
    // visible here — confirm with the code that fills this struct.
    pub periodic: Option<Uuid>,
}
100
/// Information about a failed checkpoint sync.
// Unlike `CheckpointFailure`, this record has no timestamp field and
// identifies the checkpoint by UUID instead of sequence number.
#[derive(Clone, Debug, Default, Serialize, Deserialize, ToSchema)]
pub struct CheckpointSyncFailure {
    /// UUID of the failed checkpoint.
    pub uuid: Uuid,

    /// Error message associated with the failure.
    // Stored as plain text; no structured error type is kept.
    pub error: String,
}
110
/// Holds meta-data about a checkpoint that was taken for persistent storage
/// and recovery of a circuit's state.
// Besides the serde/schema derives this type is `PartialEq + Eq`, so two
// metadata records can be compared field by field.
#[derive(Debug, Clone, Default, Serialize, Deserialize, ToSchema, PartialEq, Eq)]
pub struct CheckpointMetadata {
    /// A unique identifier for the given checkpoint.
    ///
    /// This is used to identify the checkpoint in the file-system hierarchy.
    pub uuid: Uuid,
    /// An optional name for the checkpoint.
    pub identifier: Option<String>,
    /// Fingerprint of the circuit at the time of the checkpoint.
    // NOTE(review): presumably compared on restore to detect a changed
    // circuit — confirm with the restore path.
    pub fingerprint: u64,
    /// Total size of the checkpoint files in bytes.
    // NOTE(review): `size`, `steps`, and `processed_records` are optional,
    // presumably because some writers do not record them — confirm.
    pub size: Option<u64>,
    /// Total number of steps made.
    pub steps: Option<u64>,
    /// Total number of records processed.
    pub processed_records: Option<u64>,
}
130
/// Format of `pspine-batches-*.dat` in storage.
///
/// These files exist to be a simple format for higher-level code and outside
/// tools to parse.  The spine itself writes them for that purpose, but it does
/// not read them.
#[derive(Debug, Serialize, Deserialize)]
pub struct PSpineBatches {
    /// File names listed in the batches file.
    // NOTE(review): presumably the names of the batch files the spine
    // referenced when it wrote this file — confirm with the writer.
    pub files: Vec<String>,
}
140
/// Serialized form of `dependencies.json` on disk.
///
/// Two formats. New checkpoints write the struct form (`V2`) carrying both
/// the batch list referenced at the storage root *and* the list of per-operator
/// state files inside the checkpoint dir. Old checkpoints stored only the
/// batch-filename array (`V1`); they remain readable so a rolling upgrade
/// across in-flight checkpoints is safe.
// Read-only type: it derives `Deserialize` but not `Serialize`; writing goes
// through `CheckpointDependenciesWrite`.
// `untagged` means the JSON carries no discriminator: serde tries the variants
// in declaration order and keeps the first one that deserializes, so the
// order of `V2` and `V1` below is part of the parsing behavior.
#[derive(Debug, Deserialize)]
#[serde(untagged)]
pub enum CheckpointDependencies {
    /// Current form: a JSON object with `batches` and (optionally)
    /// `state_files`.
    V2 {
        /// Batch filenames at the storage root (`w*.feldera`) that the
        /// checkpoint references for GC retention.
        batches: Vec<String>,
        /// Per-operator state filenames inside the checkpoint dir
        /// (e.g. `pspine-*.dat`, `z1-*.dat`, `CHECKPOINT`). Consumed by
        /// restore-time verification. Defaulted to empty for forward compat.
        // `serde(default)`: an object without `state_files` still parses and
        // yields an empty list.
        #[serde(default)]
        state_files: Vec<String>,
    },
    /// Legacy form: JSON array of batch filenames at the storage root
    /// (`w*.feldera`). No state-file manifest.
    V1(Vec<String>),
}
165
166impl CheckpointDependencies {
167    /// Batch files the checkpoint references at the storage root
168    /// (`w*.feldera`). Present in both V1 and V2 checkpoints.
169    pub fn batches(&self) -> &[String] {
170        match self {
171            CheckpointDependencies::V2 { batches, .. } => batches,
172            CheckpointDependencies::V1(batches) => batches,
173        }
174    }
175
176    /// Per-operator state files the checkpoint owned at commit time. These
177    /// live inside the checkpoint dir (e.g. `pspine-*.dat`, `z1-*.dat`).
178    /// Empty for V1 checkpoints, which predate the state-file manifest.
179    pub fn state_files(&self) -> &[String] {
180        match self {
181            CheckpointDependencies::V2 { state_files, .. } => state_files,
182            CheckpointDependencies::V1(_) => &[],
183        }
184    }
185}
186
/// Serialized form written to `dependencies.json`.  Always emits V2.
// Write-only counterpart of `CheckpointDependencies`: it derives `Serialize`
// but not `Deserialize`, and its field names match the `V2` variant's so the
// output reads back as `V2`.  The fields borrow the caller's lists, so
// serializing does not require cloning them.
#[derive(Debug, Serialize)]
pub struct CheckpointDependenciesWrite<'a> {
    /// Batch filenames to record (the `batches` list of the `V2` form).
    pub batches: &'a [String],
    /// State filenames to record (the `state_files` list of the `V2` form).
    pub state_files: &'a [String],
}
193
/// Measurements describing one checkpoint sync.
///
/// Plain data: cloneable and comparable so callers can keep, log, and assert
/// on recorded measurements.
///
/// NOTE(review): this file does not state the units or exactly what is being
/// measured; the field docs below mark those points as assumptions.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CheckpointSyncMetrics {
    /// Time taken by the sync.
    pub duration: Duration,
    /// Transfer speed.
    ///
    /// NOTE(review): presumably bytes per second — confirm with the code
    /// that fills this struct.
    pub speed: u64,
    /// Number of bytes.
    ///
    /// NOTE(review): presumably the total bytes transferred by the sync —
    /// confirm with the code that fills this struct.
    pub bytes: u64,
}
200
#[cfg(test)]
mod tests {
    use super::{CheckpointDependencies, CheckpointDependenciesWrite};

    /// Deserializes `raw` as the contents of a `dependencies.json` file.
    ///
    /// Panics if `raw` does not parse; `#[track_caller]` makes the panic
    /// point at the calling test rather than at this helper.
    #[track_caller]
    fn parse(raw: &str) -> CheckpointDependencies {
        serde_json::from_str(raw).unwrap()
    }

    /// A legacy checkpoint stores a bare JSON array of batch names. It must
    /// keep parsing, and it reports no state files because it has no
    /// manifest.
    #[test]
    fn deserialize_v1_legacy_array() {
        let deps = parse(r#"["w0-aaa.feldera", "w1-bbb.feldera"]"#);
        assert!(deps.state_files().is_empty());
        assert_eq!(deps.batches(), &["w0-aaa.feldera", "w1-bbb.feldera"]);
    }

    /// The current object form exposes both the batch list and the
    /// state-file list.
    #[test]
    fn deserialize_v2_struct() {
        let raw = r#"{
            "batches": ["w0-aaa.feldera"],
            "state_files": ["pspine-0-zzz.dat", "CHECKPOINT"]
        }"#;
        let deps = parse(raw);
        assert_eq!(deps.state_files(), &["pspine-0-zzz.dat", "CHECKPOINT"]);
        assert_eq!(deps.batches(), &["w0-aaa.feldera"]);
    }

    /// An object that omits `state_files` (partial writer, partial migration)
    /// still parses; the missing list defaults to empty instead of causing an
    /// error.
    #[test]
    fn deserialize_v2_missing_state_files_defaults_to_empty() {
        let deps = parse(r#"{"batches": ["w0-aaa.feldera"]}"#);
        assert!(deps.state_files().is_empty());
        assert_eq!(deps.batches(), &["w0-aaa.feldera"]);
    }

    /// What `CheckpointDependenciesWrite` serializes reads back through
    /// `CheckpointDependencies` with the same two lists.
    #[test]
    fn write_v2_round_trips() {
        let batches = vec![String::from("w0-x.feldera")];
        let state_files = vec![String::from("pspine-0-y.dat")];
        let written = CheckpointDependenciesWrite {
            batches: &batches,
            state_files: &state_files,
        };
        let json = serde_json::to_string(&written).unwrap();
        let deps = parse(&json);
        assert_eq!(deps.state_files(), state_files.as_slice());
        assert_eq!(deps.batches(), batches.as_slice());
    }
}
251}