// allsource_core/infrastructure/replication/protocol.rs
use crate::infrastructure::persistence::wal::WALEntry;
use serde::{Deserialize, Serialize};
10
11#[derive(Debug, Clone, Serialize, Deserialize)]
13#[serde(tag = "type", rename_all = "snake_case")]
14pub enum FollowerMessage {
15 Subscribe { last_offset: u64 },
17 Ack { offset: u64 },
19}
20
21#[derive(Debug, Clone, Serialize, Deserialize)]
23#[serde(tag = "type", rename_all = "snake_case")]
24pub enum LeaderMessage {
25 WalEntry { offset: u64, data: WALEntry },
27 CaughtUp { current_offset: u64 },
29 SnapshotStart {
32 parquet_files: Vec<String>,
34 },
35 SnapshotChunk {
38 filename: String,
40 data: String,
42 chunk_offset: u64,
44 is_last: bool,
46 },
47 SnapshotEnd {
50 wal_offset_after_snapshot: u64,
52 },
53}
54
#[cfg(test)]
mod tests {
    //! JSON round-trip tests pinning the wire format of the protocol enums:
    //! the snake_case `type` tag, the field names, and rejection of
    //! malformed input.

    use super::*;

    /// Serializes `msg` to JSON and parses it straight back, returning both
    /// the JSON text (for wire-format assertions) and the re-parsed value.
    fn round_trip<T>(msg: &T) -> (String, T)
    where
        T: serde::Serialize + serde::de::DeserializeOwned,
    {
        let json = serde_json::to_string(msg).unwrap();
        let parsed = serde_json::from_str(&json).unwrap();
        (json, parsed)
    }

    #[test]
    fn test_follower_subscribe_serialization() {
        let (json, parsed) = round_trip(&FollowerMessage::Subscribe { last_offset: 42 });

        assert!(json.contains("\"type\":\"subscribe\""));
        assert!(json.contains("\"last_offset\":42"));
        assert!(
            matches!(parsed, FollowerMessage::Subscribe { last_offset: 42 }),
            "wrong variant"
        );
    }

    #[test]
    fn test_follower_ack_serialization() {
        let (json, parsed) = round_trip(&FollowerMessage::Ack { offset: 100 });

        assert!(json.contains("\"type\":\"ack\""));
        assert!(json.contains("\"offset\":100"));
        assert!(
            matches!(parsed, FollowerMessage::Ack { offset: 100 }),
            "wrong variant"
        );
    }

    #[test]
    fn test_leader_caught_up_serialization() {
        let (json, parsed) = round_trip(&LeaderMessage::CaughtUp {
            current_offset: 500,
        });

        assert!(json.contains("\"type\":\"caught_up\""));
        assert!(json.contains("\"current_offset\":500"));
        assert!(
            matches!(
                parsed,
                LeaderMessage::CaughtUp {
                    current_offset: 500
                }
            ),
            "wrong variant"
        );
    }

    #[test]
    fn test_snapshot_start_serialization() {
        let (json, parsed) = round_trip(&LeaderMessage::SnapshotStart {
            parquet_files: vec![
                "events-001.parquet".to_string(),
                "events-002.parquet".to_string(),
            ],
        });

        assert!(json.contains("\"type\":\"snapshot_start\""));
        assert!(json.contains("events-001.parquet"));

        match parsed {
            LeaderMessage::SnapshotStart { parquet_files } => {
                // Both names must survive, in their original order.
                assert_eq!(
                    parquet_files,
                    ["events-001.parquet", "events-002.parquet"]
                );
            }
            _ => panic!("wrong variant"),
        }
    }

    #[test]
    fn test_snapshot_chunk_serialization() {
        let (json, parsed) = round_trip(&LeaderMessage::SnapshotChunk {
            filename: "events-001.parquet".to_string(),
            data: "AQIDBA==".to_string(),
            chunk_offset: 0,
            is_last: true,
        });

        assert!(json.contains("\"type\":\"snapshot_chunk\""));
        assert!(json.contains("\"is_last\":true"));

        match parsed {
            LeaderMessage::SnapshotChunk {
                filename,
                data,
                chunk_offset,
                is_last,
            } => {
                assert_eq!(filename, "events-001.parquet");
                // The payload is the field that matters most; make sure it
                // comes back unchanged rather than ignoring it.
                assert_eq!(data, "AQIDBA==");
                assert_eq!(chunk_offset, 0);
                assert!(is_last);
            }
            _ => panic!("wrong variant"),
        }
    }

    #[test]
    fn test_snapshot_end_serialization() {
        let (json, parsed) = round_trip(&LeaderMessage::SnapshotEnd {
            wal_offset_after_snapshot: 1500,
        });

        assert!(json.contains("\"type\":\"snapshot_end\""));
        assert!(
            matches!(
                parsed,
                LeaderMessage::SnapshotEnd {
                    wal_offset_after_snapshot: 1500
                }
            ),
            "wrong variant"
        );
    }

    #[test]
    fn test_unknown_message_type_is_rejected() {
        // A tag that names no variant must fail to parse for both enums.
        let json = r#"{"type":"bogus","offset":1}"#;

        assert!(serde_json::from_str::<FollowerMessage>(json).is_err());
        assert!(serde_json::from_str::<LeaderMessage>(json).is_err());
    }

    #[test]
    fn test_missing_field_is_rejected() {
        // A known tag without its required field must fail to parse.
        assert!(serde_json::from_str::<FollowerMessage>(r#"{"type":"subscribe"}"#).is_err());
        assert!(serde_json::from_str::<LeaderMessage>(r#"{"type":"snapshot_end"}"#).is_err());
    }
}