dbx_core/replication/
master.rs1use std::sync::Arc;
4use std::sync::atomic::{AtomicU64, Ordering};
5use tokio::sync::broadcast;
6
7use crate::replication::protocol::ReplicationMessage;
8
9pub struct ReplicationMaster {
14 tx: broadcast::Sender<ReplicationMessage>,
16 current_lsn: Arc<AtomicU64>,
18}
19
20impl ReplicationMaster {
21 pub fn new(capacity: usize) -> (Self, broadcast::Receiver<ReplicationMessage>) {
23 let (tx, rx) = broadcast::channel(capacity);
24 let master = Self {
25 tx,
26 current_lsn: Arc::new(AtomicU64::new(0)),
27 };
28 (master, rx)
29 }
30
31 pub fn replicate(&self, data: Vec<u8>) -> u64 {
35 let lsn = self.current_lsn.fetch_add(1, Ordering::SeqCst);
36 let msg = ReplicationMessage::WalEntry {
37 node_id: 0,
38 lsn,
39 timestamp: std::time::SystemTime::now()
40 .duration_since(std::time::UNIX_EPOCH)
41 .unwrap()
42 .as_micros() as u64,
43 data,
44 };
45 let _ = self.tx.send(msg);
47 lsn
48 }
49
50 pub fn heartbeat(&self) {
52 let lsn = self.current_lsn.load(Ordering::SeqCst);
53 let _ = self
54 .tx
55 .send(ReplicationMessage::Heartbeat { node_id: 0, lsn });
56 }
57
58 pub fn subscribe(&self) -> broadcast::Receiver<ReplicationMessage> {
60 self.tx.subscribe()
61 }
62
63 pub fn current_lsn(&self) -> u64 {
65 self.current_lsn.load(Ordering::SeqCst)
66 }
67}
68
#[cfg(test)]
mod tests {
    use super::*;

    /// LSNs start at 0, are handed out consecutively, and the counter ends up
    /// one past the last assigned value.
    #[test]
    fn test_master_replicate_increments_lsn() {
        let (master, _initial_rx) = ReplicationMaster::new(16);
        assert_eq!(master.current_lsn(), 0);

        // Array elements are evaluated left to right, so the calls happen in
        // the order written.
        let assigned = [
            master.replicate(b"data1".to_vec()),
            master.replicate(b"data2".to_vec()),
        ];

        assert_eq!(assigned[0], 0);
        assert_eq!(assigned[1], 1);
        assert_eq!(master.current_lsn(), 2);
    }

    /// The receiver returned by `new` sees the WAL entry with its LSN and
    /// payload intact.
    #[tokio::test]
    async fn test_slave_receives_wal_entry() {
        let (master, mut rx) = ReplicationMaster::new(16);
        master.replicate(b"hello".to_vec());

        match rx.recv().await.unwrap() {
            ReplicationMessage::WalEntry { lsn, data, .. } => {
                assert_eq!(lsn, 0);
                assert_eq!(data, b"hello");
            }
            _ => panic!("WalEntry 메시지 기대"),
        }
    }

    /// Every subscriber receives the same message for a single `replicate`
    /// call.
    #[tokio::test]
    async fn test_multiple_subscribers() {
        let (master, mut first_rx) = ReplicationMaster::new(16);
        let mut second_rx = master.subscribe();

        master.replicate(b"broadcast".to_vec());

        let from_first = first_rx.recv().await.unwrap();
        let from_second = second_rx.recv().await.unwrap();
        assert_eq!(from_first.lsn(), from_second.lsn());
    }
}