Skip to main content

allsource_core/infrastructure/replication/
protocol.rs

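//! Replication protocol message types for leader-follower WAL shipping.
//!
//! Uses newline-delimited JSON over TCP:
//! 1. Follower connects and sends `Subscribe { last_offset }`
//! 2. If `last_offset` is older than the oldest WAL entry, the leader first
//!    transfers a Parquet snapshot (`SnapshotStart`, `SnapshotChunk` messages,
//!    then `SnapshotEnd`) before resuming WAL streaming
//! 3. Leader streams `WalEntry { offset, data }` for each new WAL write
//! 4. Leader sends `CaughtUp { current_offset }` after all catch-up entries
//!    have been streamed
//! 5. Follower sends `Ack { offset }` to confirm receipt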
7
8use crate::infrastructure::persistence::wal::WALEntry;
9use serde::{Deserialize, Serialize};
10
11/// Messages sent from follower to leader.
12#[derive(Debug, Clone, Serialize, Deserialize)]
13#[serde(tag = "type", rename_all = "snake_case")]
14pub enum FollowerMessage {
15    /// Initial subscription request with the follower's last known WAL offset.
16    Subscribe { last_offset: u64 },
17    /// Acknowledgement that a WAL entry has been received and applied.
18    Ack { offset: u64 },
19}
20
21/// Messages sent from leader to follower.
22#[derive(Debug, Clone, Serialize, Deserialize)]
23#[serde(tag = "type", rename_all = "snake_case")]
24pub enum LeaderMessage {
25    /// A WAL entry to be replayed by the follower.
26    WalEntry { offset: u64, data: WALEntry },
27    /// Sent after all catch-up entries have been streamed.
28    CaughtUp { current_offset: u64 },
29    /// Signals the start of a Parquet snapshot transfer for catch-up.
30    /// Sent when the follower's last_offset is older than the oldest WAL entry.
31    SnapshotStart {
32        /// List of Parquet file names that will be streamed.
33        parquet_files: Vec<String>,
34    },
35    /// A chunk of a Parquet file being transferred during catch-up.
36    /// File contents are sent as base64-encoded binary chunks.
37    SnapshotChunk {
38        /// The Parquet file name this chunk belongs to.
39        filename: String,
40        /// Base64-encoded binary data for this chunk.
41        data: String,
42        /// Byte offset within the file where this chunk starts.
43        chunk_offset: u64,
44        /// Whether this is the last chunk for this file.
45        is_last: bool,
46    },
47    /// Signals the end of Parquet snapshot transfer.
48    /// After this, the leader resumes normal WAL streaming from the given offset.
49    SnapshotEnd {
50        /// The WAL offset to resume streaming from after loading the snapshot.
51        wal_offset_after_snapshot: u64,
52    },
53}
54
#[cfg(test)]
mod tests {
    use super::*;

    /// Serializes `msg` to JSON and parses that JSON back into the same type.
    ///
    /// Returns the JSON text (for wire-format assertions) together with the
    /// reparsed value (for round-trip assertions). Panics if either step fails,
    /// which fails the calling test.
    fn round_trip<T>(msg: &T) -> (String, T)
    where
        T: serde::Serialize + serde::de::DeserializeOwned,
    {
        let json = serde_json::to_string(msg).expect("message should serialize to JSON");
        let parsed = serde_json::from_str(&json).expect("serialized JSON should parse back");
        (json, parsed)
    }

    #[test]
    fn test_follower_subscribe_serialization() {
        let (json, parsed) = round_trip(&FollowerMessage::Subscribe { last_offset: 42 });
        assert!(json.contains("\"type\":\"subscribe\""));
        assert!(json.contains("\"last_offset\":42"));

        match parsed {
            FollowerMessage::Subscribe { last_offset } => assert_eq!(last_offset, 42),
            other => panic!("wrong variant: {:?}", other),
        }
    }

    #[test]
    fn test_follower_ack_serialization() {
        let (json, parsed) = round_trip(&FollowerMessage::Ack { offset: 100 });
        assert!(json.contains("\"type\":\"ack\""));
        // The offset must be present on the wire, not just the tag.
        assert!(json.contains("\"offset\":100"));

        match parsed {
            FollowerMessage::Ack { offset } => assert_eq!(offset, 100),
            other => panic!("wrong variant: {:?}", other),
        }
    }

    #[test]
    fn test_leader_caught_up_serialization() {
        let (json, parsed) = round_trip(&LeaderMessage::CaughtUp {
            current_offset: 500,
        });
        assert!(json.contains("\"type\":\"caught_up\""));
        assert!(json.contains("\"current_offset\":500"));

        match parsed {
            LeaderMessage::CaughtUp { current_offset } => assert_eq!(current_offset, 500),
            other => panic!("wrong variant: {:?}", other),
        }
    }

    #[test]
    fn test_snapshot_start_serialization() {
        let (json, parsed) = round_trip(&LeaderMessage::SnapshotStart {
            parquet_files: vec![
                "events-001.parquet".to_string(),
                "events-002.parquet".to_string(),
            ],
        });
        assert!(json.contains("\"type\":\"snapshot_start\""));
        assert!(json.contains("events-001.parquet"));
        assert!(json.contains("events-002.parquet"));

        match parsed {
            LeaderMessage::SnapshotStart { parquet_files } => {
                // Check every file name and the order, not only the first entry.
                assert_eq!(parquet_files, ["events-001.parquet", "events-002.parquet"]);
            }
            other => panic!("wrong variant: {:?}", other),
        }
    }

    #[test]
    fn test_snapshot_chunk_serialization() {
        let (json, parsed) = round_trip(&LeaderMessage::SnapshotChunk {
            filename: "events-001.parquet".to_string(),
            data: "AQIDBA==".to_string(), // base64 for [1,2,3,4]
            chunk_offset: 0,
            is_last: true,
        });
        assert!(json.contains("\"type\":\"snapshot_chunk\""));
        assert!(json.contains("\"is_last\":true"));
        assert!(json.contains("\"data\":\"AQIDBA==\""));
        assert!(json.contains("\"chunk_offset\":0"));

        match parsed {
            LeaderMessage::SnapshotChunk {
                filename,
                data,
                chunk_offset,
                is_last,
            } => {
                assert_eq!(filename, "events-001.parquet");
                // The payload must survive the round trip unchanged.
                assert_eq!(data, "AQIDBA==");
                assert_eq!(chunk_offset, 0);
                assert!(is_last);
            }
            other => panic!("wrong variant: {:?}", other),
        }
    }

    #[test]
    fn test_snapshot_end_serialization() {
        let (json, parsed) = round_trip(&LeaderMessage::SnapshotEnd {
            wal_offset_after_snapshot: 1500,
        });
        assert!(json.contains("\"type\":\"snapshot_end\""));
        assert!(json.contains("\"wal_offset_after_snapshot\":1500"));

        match parsed {
            LeaderMessage::SnapshotEnd {
                wal_offset_after_snapshot,
            } => assert_eq!(wal_offset_after_snapshot, 1500),
            other => panic!("wrong variant: {:?}", other),
        }
    }
}