use std::time::Duration;
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use utoipa::ToSchema;
use uuid::Uuid;
use crate::suspend::TemporarySuspendError;
/// Checkpoint status report: an optional success value and an optional
/// failure record.
///
/// Both fields are `None` in the `Default` value.
#[derive(Clone, Debug, Default, Serialize, Deserialize, ToSchema)]
pub struct CheckpointStatus {
// NOTE(review): presumably the sequence number of the most recent successful
// checkpoint (compare `CheckpointFailure::sequence_number`) — confirm.
pub success: Option<u64>,
/// Failure record, if one is present.
pub failure: Option<CheckpointFailure>,
}
/// Checkpoint activity state.
///
/// Serialized as an internally tagged object: the variant name is written to
/// a `status` field in snake_case (`idle`, `delayed`, `in_progress`) and the
/// variant's own fields sit next to it. The default value is `Idle`.
#[derive(Clone, Debug, Default, Serialize, Deserialize, ToSchema)]
#[serde(tag = "status", rename_all = "snake_case")]
pub enum CheckpointActivity {
/// Carries no data; this is the `Default` variant.
#[default]
Idle,
/// Carries a list of suspend errors and a UTC timestamp.
Delayed {
// NOTE(review): presumably the conditions currently holding the checkpoint
// back — confirm against the code that produces this value.
reasons: Vec<TemporarySuspendError>,
/// UTC timestamp recorded for the delay.
delayed_since: DateTime<Utc>,
},
/// Carries a single UTC timestamp.
InProgress {
/// UTC timestamp recorded for the start.
started_at: DateTime<Utc>,
},
}
/// Record of a failed checkpoint: a sequence number, an error message and a
/// UTC timestamp.
#[derive(Clone, Debug, Default, Serialize, Deserialize, ToSchema)]
pub struct CheckpointFailure {
// NOTE(review): presumably the sequence number of the checkpoint that
// failed — confirm.
pub sequence_number: u64,
/// Error description as free-form text.
pub error: String,
/// UTC timestamp associated with the failure.
pub failed_at: DateTime<Utc>,
}
/// Response body holding a single checkpoint sequence number.
#[derive(Clone, Debug, Default, Serialize, Deserialize, ToSchema)]
pub struct CheckpointResponse {
/// The checkpoint sequence number carried by this response.
pub checkpoint_sequence_number: u64,
}
impl CheckpointResponse {
pub fn new(checkpoint_sequence_number: u64) -> Self {
Self {
checkpoint_sequence_number,
}
}
}
/// Response body holding a single checkpoint UUID.
#[derive(Clone, Debug, Default, Serialize, Deserialize, ToSchema)]
pub struct CheckpointSyncResponse {
/// The checkpoint UUID carried by this response.
pub checkpoint_uuid: Uuid,
}
impl CheckpointSyncResponse {
pub fn new(checkpoint_uuid: Uuid) -> Self {
Self { checkpoint_uuid }
}
}
/// Checkpoint sync status report: three optional fields, all `None` in the
/// `Default` value.
#[derive(Clone, Debug, Default, Serialize, Deserialize, ToSchema)]
pub struct CheckpointSyncStatus {
// NOTE(review): presumably the UUID of the most recently synced
// checkpoint — confirm.
pub success: Option<Uuid>,
/// Failure record, if one is present.
pub failure: Option<CheckpointSyncFailure>,
// NOTE(review): meaning not visible in this file; presumably the UUID of a
// checkpoint synced by a periodic/automatic sync — confirm.
pub periodic: Option<Uuid>,
}
/// Record of a failed checkpoint sync: a UUID and an error message.
#[derive(Clone, Debug, Default, Serialize, Deserialize, ToSchema)]
pub struct CheckpointSyncFailure {
// NOTE(review): presumably the UUID of the checkpoint whose sync failed —
// confirm.
pub uuid: Uuid,
/// Error description as free-form text.
pub error: String,
}
/// Metadata record for a checkpoint.
///
/// Supports equality comparison (`PartialEq`, `Eq`) in addition to
/// (de)serialization. `identifier`, `size`, `steps` and `processed_records`
/// are optional.
#[derive(Debug, Clone, Default, Serialize, Deserialize, ToSchema, PartialEq, Eq)]
pub struct CheckpointMetadata {
/// UUID stored with the checkpoint.
pub uuid: Uuid,
/// Optional textual identifier.
pub identifier: Option<String>,
// NOTE(review): what is fingerprinted is not visible in this file — confirm
// against the code that fills this in.
pub fingerprint: u64,
// NOTE(review): presumably a size in bytes — confirm the unit.
pub size: Option<u64>,
// NOTE(review): presumably a step count at checkpoint time — confirm.
pub steps: Option<u64>,
// NOTE(review): presumably a count of records processed at checkpoint
// time — confirm.
pub processed_records: Option<u64>,
}
/// Serializable wrapper around a list of file strings.
#[derive(Debug, Serialize, Deserialize)]
pub struct PSpineBatches {
// NOTE(review): whether these are bare file names or full paths is not
// visible in this file — confirm.
pub files: Vec<String>,
}
/// Deserialize-only view of checkpoint dependencies, accepting two layouts.
///
/// The enum is untagged, so the variant is chosen by the shape of the input,
/// with variants tried in declaration order:
///
/// * `V2` — an object with a `batches` array and an optional `state_files`
///   array.
/// * `V1` — a bare array of strings (the legacy layout).
///
/// This type cannot be serialized; `CheckpointDependenciesWrite` produces the
/// `V2` layout.
#[derive(Debug, Deserialize)]
#[serde(untagged)]
pub enum CheckpointDependencies {
/// Object layout with separate batch and state-file lists.
V2 {
/// Batch file names.
batches: Vec<String>,
/// State file names; an absent field deserializes to an empty list.
#[serde(default)]
state_files: Vec<String>,
},
/// Legacy layout: the whole value is the list of batch file names.
V1(Vec<String>),
}
impl CheckpointDependencies {
pub fn batches(&self) -> &[String] {
match self {
CheckpointDependencies::V2 { batches, .. } => batches,
CheckpointDependencies::V1(batches) => batches,
}
}
pub fn state_files(&self) -> &[String] {
match self {
CheckpointDependencies::V2 { state_files, .. } => state_files,
CheckpointDependencies::V1(_) => &[],
}
}
}
/// Serialize-only, borrowing counterpart of `CheckpointDependencies`.
///
/// Its field names match the `V2` variant, so its serialized form
/// deserializes back as `CheckpointDependencies::V2`. The slices are borrowed
/// for `'a`; nothing is copied to build this value.
#[derive(Debug, Serialize)]
pub struct CheckpointDependenciesWrite<'a> {
/// Batch file names to write.
pub batches: &'a [String],
/// State file names to write.
pub state_files: &'a [String],
}
/// Measurements for a checkpoint sync: a duration, a speed and a byte count.
///
/// Not serializable; only `Debug` is derived.
#[derive(Debug)]
pub struct CheckpointSyncMetrics {
/// Elapsed time.
pub duration: Duration,
// NOTE(review): unit is not visible in this file; presumably bytes per
// second — confirm.
pub speed: u64,
// NOTE(review): presumably the number of bytes transferred — confirm.
pub bytes: u64,
}
#[cfg(test)]
mod tests {
    use super::{CheckpointDependencies, CheckpointDependenciesWrite};

    /// A bare JSON array (the legacy layout) yields batches and no state files.
    #[test]
    fn deserialize_v1_legacy_array() {
        let input = r#"["w0-aaa.feldera", "w1-bbb.feldera"]"#;
        let parsed = serde_json::from_str::<CheckpointDependencies>(input).unwrap();

        assert_eq!(parsed.batches(), &["w0-aaa.feldera", "w1-bbb.feldera"]);
        assert!(parsed.state_files().is_empty());
    }

    /// An object with both keys populates both accessors.
    #[test]
    fn deserialize_v2_struct() {
        let input = r#"{
"batches": ["w0-aaa.feldera"],
"state_files": ["pspine-0-zzz.dat", "CHECKPOINT"]
}"#;
        let parsed = serde_json::from_str::<CheckpointDependencies>(input).unwrap();

        assert_eq!(parsed.batches(), &["w0-aaa.feldera"]);
        assert_eq!(parsed.state_files(), &["pspine-0-zzz.dat", "CHECKPOINT"]);
    }

    /// Omitting `state_files` from the object form yields an empty list.
    #[test]
    fn deserialize_v2_missing_state_files_defaults_to_empty() {
        let input = r#"{"batches": ["w0-aaa.feldera"]}"#;
        let parsed = serde_json::from_str::<CheckpointDependencies>(input).unwrap();

        assert_eq!(parsed.batches(), &["w0-aaa.feldera"]);
        assert!(parsed.state_files().is_empty());
    }

    /// What `CheckpointDependenciesWrite` emits reads back unchanged.
    #[test]
    fn write_v2_round_trips() {
        let batches = vec![String::from("w0-x.feldera")];
        let state_files = vec![String::from("pspine-0-y.dat")];

        let outgoing = CheckpointDependenciesWrite {
            batches: &batches,
            state_files: &state_files,
        };
        let encoded = serde_json::to_string(&outgoing).unwrap();
        let reparsed = serde_json::from_str::<CheckpointDependencies>(&encoded).unwrap();

        assert_eq!(reparsed.batches(), batches.as_slice());
        assert_eq!(reparsed.state_files(), state_files.as_slice());
    }
}