allsource_core/infrastructure/replication/
protocol.rs1use crate::infrastructure::persistence::wal::WALEntry;
9use serde::{Deserialize, Serialize};
10
11#[derive(Debug, Clone, Serialize, Deserialize)]
13#[serde(tag = "type", rename_all = "snake_case")]
14pub enum FollowerMessage {
15 Subscribe { last_offset: u64 },
17 Ack { offset: u64 },
19}
20
21#[derive(Debug, Clone, Serialize, Deserialize)]
23#[serde(tag = "type", rename_all = "snake_case")]
24#[allow(clippy::large_enum_variant)]
25pub enum LeaderMessage {
26 WalEntry { offset: u64, data: WALEntry },
28 CaughtUp { current_offset: u64 },
30 SnapshotStart {
33 parquet_files: Vec<String>,
35 },
36 SnapshotChunk {
39 filename: String,
41 data: String,
43 chunk_offset: u64,
45 is_last: bool,
47 },
48 SnapshotEnd {
51 wal_offset_after_snapshot: u64,
53 },
54}
55
/// JSON round-trip tests for the replication protocol messages.
///
/// Each test checks the encoded form (the `"type"` tag and the field values)
/// and then that decoding the same JSON yields the original variant and data.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_follower_subscribe_serialization() {
        let msg = FollowerMessage::Subscribe { last_offset: 42 };
        let json = serde_json::to_string(&msg).unwrap();
        assert!(json.contains("\"type\":\"subscribe\""));
        assert!(json.contains("\"last_offset\":42"));

        let parsed: FollowerMessage = serde_json::from_str(&json).unwrap();
        match parsed {
            FollowerMessage::Subscribe { last_offset } => assert_eq!(last_offset, 42),
            _ => panic!("wrong variant"),
        }
    }

    #[test]
    fn test_follower_ack_serialization() {
        let msg = FollowerMessage::Ack { offset: 100 };
        let json = serde_json::to_string(&msg).unwrap();
        assert!(json.contains("\"type\":\"ack\""));
        // Pin the field encoding too, matching the subscribe test.
        assert!(json.contains("\"offset\":100"));

        let parsed: FollowerMessage = serde_json::from_str(&json).unwrap();
        match parsed {
            FollowerMessage::Ack { offset } => assert_eq!(offset, 100),
            _ => panic!("wrong variant"),
        }
    }

    #[test]
    fn test_leader_caught_up_serialization() {
        let msg = LeaderMessage::CaughtUp {
            current_offset: 500,
        };
        let json = serde_json::to_string(&msg).unwrap();
        assert!(json.contains("\"type\":\"caught_up\""));
        assert!(json.contains("\"current_offset\":500"));

        let parsed: LeaderMessage = serde_json::from_str(&json).unwrap();
        match parsed {
            LeaderMessage::CaughtUp { current_offset } => assert_eq!(current_offset, 500),
            _ => panic!("wrong variant"),
        }
    }

    #[test]
    fn test_snapshot_start_serialization() {
        let msg = LeaderMessage::SnapshotStart {
            parquet_files: vec![
                "events-001.parquet".to_string(),
                "events-002.parquet".to_string(),
            ],
        };
        let json = serde_json::to_string(&msg).unwrap();
        assert!(json.contains("\"type\":\"snapshot_start\""));
        assert!(json.contains("events-001.parquet"));
        assert!(json.contains("events-002.parquet"));

        let parsed: LeaderMessage = serde_json::from_str(&json).unwrap();
        match parsed {
            LeaderMessage::SnapshotStart { parquet_files } => {
                // Compare the whole list so both entries and their order are checked.
                assert_eq!(parquet_files, ["events-001.parquet", "events-002.parquet"]);
            }
            _ => panic!("wrong variant"),
        }
    }

    #[test]
    fn test_snapshot_chunk_serialization() {
        let msg = LeaderMessage::SnapshotChunk {
            filename: "events-001.parquet".to_string(),
            data: "AQIDBA==".to_string(),
            chunk_offset: 0,
            is_last: true,
        };
        let json = serde_json::to_string(&msg).unwrap();
        assert!(json.contains("\"type\":\"snapshot_chunk\""));
        assert!(json.contains("\"filename\":\"events-001.parquet\""));
        assert!(json.contains("\"data\":\"AQIDBA==\""));
        assert!(json.contains("\"chunk_offset\":0"));
        assert!(json.contains("\"is_last\":true"));

        let parsed: LeaderMessage = serde_json::from_str(&json).unwrap();
        match parsed {
            LeaderMessage::SnapshotChunk {
                filename,
                data,
                chunk_offset,
                is_last,
            } => {
                assert_eq!(filename, "events-001.parquet");
                // The payload must survive the round trip unchanged.
                assert_eq!(data, "AQIDBA==");
                assert_eq!(chunk_offset, 0);
                assert!(is_last);
            }
            _ => panic!("wrong variant"),
        }
    }

    #[test]
    fn test_snapshot_end_serialization() {
        let msg = LeaderMessage::SnapshotEnd {
            wal_offset_after_snapshot: 1500,
        };
        let json = serde_json::to_string(&msg).unwrap();
        assert!(json.contains("\"type\":\"snapshot_end\""));
        assert!(json.contains("\"wal_offset_after_snapshot\":1500"));

        let parsed: LeaderMessage = serde_json::from_str(&json).unwrap();
        match parsed {
            LeaderMessage::SnapshotEnd {
                wal_offset_after_snapshot,
            } => assert_eq!(wal_offset_after_snapshot, 1500),
            _ => panic!("wrong variant"),
        }
    }

    #[test]
    fn test_malformed_messages_are_rejected() {
        // An unknown `type` tag must not decode into any variant.
        assert!(serde_json::from_str::<FollowerMessage>("{\"type\":\"bogus\"}").is_err());
        assert!(serde_json::from_str::<LeaderMessage>("{\"type\":\"bogus\"}").is_err());

        // A known tag with its required field missing is an error as well.
        assert!(serde_json::from_str::<FollowerMessage>("{\"type\":\"ack\"}").is_err());

        // The tag itself is mandatory.
        assert!(serde_json::from_str::<FollowerMessage>("{\"offset\":1}").is_err());
    }
}