use crate::infrastructure::persistence::wal::WALEntry;
use serde::{Deserialize, Serialize};
/// Follower-side messages of the replication protocol.
///
/// Serialized with serde's internally tagged representation: every message is
/// an object whose `type` field holds the variant name in `snake_case`
/// (`subscribe`, `ack`), alongside the variant's own fields.
///
/// NOTE(review): judging by the name these are sent from a follower to the
/// leader; the transport is not visible in this file — confirm.
#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum FollowerMessage {
    /// Subscription request, serialized with `"type": "subscribe"`.
    Subscribe {
        /// Offset supplied with the subscription.
        ///
        /// NOTE(review): presumably the last WAL offset the follower already
        /// holds — confirm whether streaming resumes at or after this value.
        last_offset: u64,
    },
    /// Acknowledgement, serialized with `"type": "ack"`.
    Ack {
        /// Offset being acknowledged.
        ///
        /// NOTE(review): presumably the highest offset the follower has
        /// applied — confirm against the leader-side handling.
        offset: u64,
    },
}
/// Leader-side messages of the replication protocol.
///
/// Serialized with serde's internally tagged representation: every message is
/// an object whose `type` field holds the variant name in `snake_case`
/// (`wal_entry`, `caught_up`, `snapshot_start`, `snapshot_chunk`,
/// `snapshot_end`), alongside the variant's own fields.
///
/// NOTE(review): judging by the name these are sent from the leader to a
/// follower; the transport is not visible in this file — confirm.
// NOTE(review): the lint is presumably silenced because `WalEntry` embeds a
// whole `WALEntry` and is much larger than the other variants — confirm
// before boxing it or removing the allow.
#[allow(clippy::large_enum_variant)]
#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum LeaderMessage {
    /// A single WAL entry together with its offset.
    WalEntry {
        /// Offset associated with `data`.
        offset: u64,
        /// The WAL entry payload.
        data: WALEntry,
    },
    /// Carries the leader's current offset.
    ///
    /// NOTE(review): presumably signals that the follower has received all
    /// entries up to `current_offset` — confirm.
    CaughtUp {
        /// Offset reported with the message.
        current_offset: u64,
    },
    /// Opens a snapshot transfer by listing file names.
    SnapshotStart {
        /// Names of the Parquet files (e.g. `events-001.parquet`).
        ///
        /// NOTE(review): presumably the files that the following
        /// `SnapshotChunk` messages will carry — confirm.
        parquet_files: Vec<String>,
    },
    /// One chunk of a snapshot file.
    SnapshotChunk {
        /// Name of the file this chunk belongs to.
        filename: String,
        /// Chunk payload carried as a string.
        ///
        /// NOTE(review): looks like base64-encoded bytes (the test fixture is
        /// `AQIDBA==`) — confirm the encoding with the producer.
        data: String,
        /// Position of this chunk.
        ///
        /// NOTE(review): presumably a byte offset within `filename` — confirm
        /// the unit.
        chunk_offset: u64,
        /// Marks the final chunk.
        ///
        /// NOTE(review): presumably final for `filename` rather than for the
        /// whole snapshot — confirm.
        is_last: bool,
    },
    /// Closes a snapshot transfer.
    SnapshotEnd {
        /// WAL offset reported at the end of the snapshot.
        ///
        /// NOTE(review): presumably the offset from which WAL streaming
        /// continues after the snapshot — confirm.
        wal_offset_after_snapshot: u64,
    },
}
#[cfg(test)]
mod tests {
    //! JSON round-trip tests for the replication protocol messages: each test
    //! checks the serialized `type` tag and that deserializing the JSON yields
    //! the same variant with the same field values.

    use super::*;

    /// Serializes `msg` to a JSON string, panicking if serialization fails.
    fn encode<T: serde::Serialize>(msg: &T) -> String {
        serde_json::to_string(msg).unwrap()
    }

    /// Parses `json` into a `T`, panicking if deserialization fails.
    fn decode<T: serde::de::DeserializeOwned>(json: &str) -> T {
        serde_json::from_str(json).unwrap()
    }

    #[test]
    fn test_follower_subscribe_serialization() {
        let json = encode(&FollowerMessage::Subscribe { last_offset: 42 });
        assert!(json.contains("\"type\":\"subscribe\""));
        assert!(json.contains("\"last_offset\":42"));

        // Pull the field out first so the value check sits outside the match.
        let decoded_offset = match decode::<FollowerMessage>(&json) {
            FollowerMessage::Subscribe { last_offset } => last_offset,
            FollowerMessage::Ack { .. } => panic!("wrong variant"),
        };
        assert_eq!(decoded_offset, 42);
    }

    #[test]
    fn test_follower_ack_serialization() {
        let json = encode(&FollowerMessage::Ack { offset: 100 });
        assert!(json.contains("\"type\":\"ack\""));

        let decoded_offset = match decode::<FollowerMessage>(&json) {
            FollowerMessage::Ack { offset } => offset,
            FollowerMessage::Subscribe { .. } => panic!("wrong variant"),
        };
        assert_eq!(decoded_offset, 100);
    }

    #[test]
    fn test_leader_caught_up_serialization() {
        let json = encode(&LeaderMessage::CaughtUp {
            current_offset: 500,
        });
        assert!(json.contains("\"type\":\"caught_up\""));

        let decoded_offset = match decode::<LeaderMessage>(&json) {
            LeaderMessage::CaughtUp { current_offset } => current_offset,
            _ => panic!("wrong variant"),
        };
        assert_eq!(decoded_offset, 500);
    }

    #[test]
    fn test_snapshot_start_serialization() {
        let names: Vec<String> = ["events-001.parquet", "events-002.parquet"]
            .iter()
            .map(|name| name.to_string())
            .collect();
        let json = encode(&LeaderMessage::SnapshotStart {
            parquet_files: names,
        });
        assert!(json.contains("\"type\":\"snapshot_start\""));
        assert!(json.contains("events-001.parquet"));

        let decoded_files = match decode::<LeaderMessage>(&json) {
            LeaderMessage::SnapshotStart { parquet_files } => parquet_files,
            _ => panic!("wrong variant"),
        };
        assert_eq!(decoded_files.len(), 2);
        assert_eq!(
            decoded_files.first().map(String::as_str),
            Some("events-001.parquet")
        );
    }

    #[test]
    fn test_snapshot_chunk_serialization() {
        let json = encode(&LeaderMessage::SnapshotChunk {
            filename: "events-001.parquet".to_string(),
            data: "AQIDBA==".to_string(),
            chunk_offset: 0,
            is_last: true,
        });
        assert!(json.contains("\"type\":\"snapshot_chunk\""));
        assert!(json.contains("\"is_last\":true"));

        // The `data` payload is intentionally not inspected after the round trip.
        let (decoded_name, decoded_offset, decoded_last) = match decode::<LeaderMessage>(&json) {
            LeaderMessage::SnapshotChunk {
                filename,
                chunk_offset,
                is_last,
                ..
            } => (filename, chunk_offset, is_last),
            _ => panic!("wrong variant"),
        };
        assert_eq!(decoded_name, "events-001.parquet");
        assert_eq!(decoded_offset, 0);
        assert!(decoded_last);
    }

    #[test]
    fn test_snapshot_end_serialization() {
        let json = encode(&LeaderMessage::SnapshotEnd {
            wal_offset_after_snapshot: 1500,
        });
        assert!(json.contains("\"type\":\"snapshot_end\""));

        let decoded_offset = match decode::<LeaderMessage>(&json) {
            LeaderMessage::SnapshotEnd {
                wal_offset_after_snapshot,
            } => wal_offset_after_snapshot,
            _ => panic!("wrong variant"),
        };
        assert_eq!(decoded_offset, 1500);
    }
}