Skip to main content

allsource_core/infrastructure/replication/
protocol.rs

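//! Replication protocol message types for leader-follower WAL shipping.
//!
//! Uses newline-delimited JSON over TCP:
//! 1. Follower connects and sends `Subscribe { last_offset }`
//! 2. Leader streams `WalEntry { offset, data }` for each new WAL write, and sends
//!    `CaughtUp { current_offset }` once all catch-up entries have been streamed
//! 3. Follower sends `Ack { offset }` to confirm an entry has been received and applied
//!
//! If the follower's `last_offset` is older than the oldest WAL entry, the leader
//! transfers a Parquet snapshot (`SnapshotStart`, one or more `SnapshotChunk`s,
//! `SnapshotEnd`) and then resumes WAL streaming from the offset carried by `SnapshotEnd`.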
7
8use crate::infrastructure::persistence::wal::WALEntry;
9use serde::{Deserialize, Serialize};
10
11/// Messages sent from follower to leader.
12#[derive(Debug, Clone, Serialize, Deserialize)]
13#[serde(tag = "type", rename_all = "snake_case")]
14pub enum FollowerMessage {
15    /// Initial subscription request with the follower's last known WAL offset.
16    Subscribe { last_offset: u64 },
17    /// Acknowledgement that a WAL entry has been received and applied.
18    Ack { offset: u64 },
19}
20
/// Messages sent from leader to follower.
///
/// Serialized as internally tagged JSON: the variant name is written in
/// `snake_case` under the `"type"` key alongside the variant's own fields
/// (e.g. `"type":"caught_up"`, `"type":"snapshot_chunk"`).
///
/// Variant and field names are part of the wire format; renaming any of them
/// changes what followers must parse.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
// NOTE(review): this allow presumably exists because `WalEntry` embeds a full
// `WALEntry` inline and is much larger than the other variants — confirm, and
// record the reason here (boxing the payload would be the usual alternative).
#[allow(clippy::large_enum_variant)]
pub enum LeaderMessage {
    /// A WAL entry to be replayed by the follower.
    ///
    /// `offset` is the WAL offset associated with the entry and `data` is the
    /// entry itself.
    WalEntry { offset: u64, data: WALEntry },
    /// Sent after all catch-up entries have been streamed.
    ///
    /// `current_offset` is the WAL offset reported by the leader at that point.
    CaughtUp { current_offset: u64 },
    /// Signals the start of a Parquet snapshot transfer for catch-up.
    /// Sent when the follower's last_offset is older than the oldest WAL entry.
    SnapshotStart {
        /// List of Parquet file names that will be streamed.
        parquet_files: Vec<String>,
    },
    /// A chunk of a Parquet file being transferred during catch-up.
    /// File contents are sent as base64-encoded binary chunks.
    SnapshotChunk {
        /// The Parquet file name this chunk belongs to.
        filename: String,
        /// Base64-encoded binary data for this chunk.
        data: String,
        /// Byte offset within the file where this chunk starts.
        chunk_offset: u64,
        /// Whether this is the last chunk for this file.
        is_last: bool,
    },
    /// Signals the end of Parquet snapshot transfer.
    /// After this, the leader resumes normal WAL streaming from the given offset.
    SnapshotEnd {
        /// The WAL offset to resume streaming from after loading the snapshot.
        wal_offset_after_snapshot: u64,
    },
}
55
/// Serialization round-trip tests for the replication wire format.
///
/// Each test checks both the JSON produced (tag and field values) and that
/// parsing that JSON yields the same variant with the same field values.
#[cfg(test)]
mod tests {
    use super::*;

    /// `Subscribe` is tagged `"subscribe"` and round-trips `last_offset`.
    #[test]
    fn test_follower_subscribe_serialization() {
        let msg = FollowerMessage::Subscribe { last_offset: 42 };
        let json = serde_json::to_string(&msg).unwrap();
        assert!(json.contains("\"type\":\"subscribe\""));
        assert!(json.contains("\"last_offset\":42"));

        let parsed: FollowerMessage = serde_json::from_str(&json).unwrap();
        match parsed {
            FollowerMessage::Subscribe { last_offset } => assert_eq!(last_offset, 42),
            _ => panic!("wrong variant"),
        }
    }

    /// `Ack` is tagged `"ack"` and round-trips `offset`.
    #[test]
    fn test_follower_ack_serialization() {
        let msg = FollowerMessage::Ack { offset: 100 };
        let json = serde_json::to_string(&msg).unwrap();
        assert!(json.contains("\"type\":\"ack\""));
        assert!(json.contains("\"offset\":100"));

        let parsed: FollowerMessage = serde_json::from_str(&json).unwrap();
        match parsed {
            FollowerMessage::Ack { offset } => assert_eq!(offset, 100),
            _ => panic!("wrong variant"),
        }
    }

    /// `CaughtUp` is tagged `"caught_up"` and round-trips `current_offset`.
    #[test]
    fn test_leader_caught_up_serialization() {
        let msg = LeaderMessage::CaughtUp {
            current_offset: 500,
        };
        let json = serde_json::to_string(&msg).unwrap();
        assert!(json.contains("\"type\":\"caught_up\""));
        assert!(json.contains("\"current_offset\":500"));

        let parsed: LeaderMessage = serde_json::from_str(&json).unwrap();
        match parsed {
            LeaderMessage::CaughtUp { current_offset } => assert_eq!(current_offset, 500),
            _ => panic!("wrong variant"),
        }
    }

    /// `SnapshotStart` is tagged `"snapshot_start"` and keeps the file list in order.
    #[test]
    fn test_snapshot_start_serialization() {
        let msg = LeaderMessage::SnapshotStart {
            parquet_files: vec![
                "events-001.parquet".to_string(),
                "events-002.parquet".to_string(),
            ],
        };
        let json = serde_json::to_string(&msg).unwrap();
        assert!(json.contains("\"type\":\"snapshot_start\""));
        assert!(json.contains("events-001.parquet"));
        assert!(json.contains("events-002.parquet"));

        let parsed: LeaderMessage = serde_json::from_str(&json).unwrap();
        match parsed {
            LeaderMessage::SnapshotStart { parquet_files } => {
                assert_eq!(parquet_files.len(), 2);
                assert_eq!(parquet_files[0], "events-001.parquet");
                assert_eq!(parquet_files[1], "events-002.parquet");
            }
            _ => panic!("wrong variant"),
        }
    }

    /// `SnapshotChunk` is tagged `"snapshot_chunk"` and round-trips every field,
    /// including the base64 payload.
    #[test]
    fn test_snapshot_chunk_serialization() {
        let msg = LeaderMessage::SnapshotChunk {
            filename: "events-001.parquet".to_string(),
            data: "AQIDBA==".to_string(), // base64 for [1,2,3,4]
            chunk_offset: 0,
            is_last: true,
        };
        let json = serde_json::to_string(&msg).unwrap();
        assert!(json.contains("\"type\":\"snapshot_chunk\""));
        assert!(json.contains("\"data\":\"AQIDBA==\""));
        assert!(json.contains("\"chunk_offset\":0"));
        assert!(json.contains("\"is_last\":true"));

        let parsed: LeaderMessage = serde_json::from_str(&json).unwrap();
        match parsed {
            LeaderMessage::SnapshotChunk {
                filename,
                data,
                chunk_offset,
                is_last,
            } => {
                assert_eq!(filename, "events-001.parquet");
                // The payload must survive the round trip untouched.
                assert_eq!(data, "AQIDBA==");
                assert_eq!(chunk_offset, 0);
                assert!(is_last);
            }
            _ => panic!("wrong variant"),
        }
    }

    /// `SnapshotEnd` is tagged `"snapshot_end"` and round-trips the resume offset.
    #[test]
    fn test_snapshot_end_serialization() {
        let msg = LeaderMessage::SnapshotEnd {
            wal_offset_after_snapshot: 1500,
        };
        let json = serde_json::to_string(&msg).unwrap();
        assert!(json.contains("\"type\":\"snapshot_end\""));
        assert!(json.contains("\"wal_offset_after_snapshot\":1500"));

        let parsed: LeaderMessage = serde_json::from_str(&json).unwrap();
        match parsed {
            LeaderMessage::SnapshotEnd {
                wal_offset_after_snapshot,
            } => assert_eq!(wal_offset_after_snapshot, 1500),
            _ => panic!("wrong variant"),
        }
    }

    /// Input with an unknown or missing `type` tag must fail to parse rather
    /// than be mistaken for a valid message.
    #[test]
    fn test_unknown_or_missing_type_is_rejected() {
        assert!(serde_json::from_str::<FollowerMessage>(r#"{"type":"bogus"}"#).is_err());
        assert!(serde_json::from_str::<LeaderMessage>(r#"{"type":"bogus"}"#).is_err());
        assert!(serde_json::from_str::<FollowerMessage>(r#"{"last_offset":1}"#).is_err());
    }
}