Skip to main content

dbx_core/replication/
master.rs

//! Replication master — broadcasts WAL changes to slaves.
2
3use std::sync::Arc;
4use std::sync::atomic::{AtomicU64, Ordering};
5use tokio::sync::broadcast;
6
7use crate::replication::protocol::ReplicationMessage;
8
9/// Replication Master
10///
11/// WAL append 시 브로드캐스트 채널로 메시지를 전송합니다.
12/// 인메모리 `tokio::sync::broadcast` 채널 기반 MVP 구현.
13pub struct ReplicationMaster {
14    /// 브로드캐스트 송신자
15    tx: broadcast::Sender<ReplicationMessage>,
16    /// 현재 LSN (단조 증가)
17    current_lsn: Arc<AtomicU64>,
18}
19
20impl ReplicationMaster {
21    /// 새 Master 생성. capacity는 채널 버퍼 크기.
22    pub fn new(capacity: usize) -> (Self, broadcast::Receiver<ReplicationMessage>) {
23        let (tx, rx) = broadcast::channel(capacity);
24        let master = Self {
25            tx,
26            current_lsn: Arc::new(AtomicU64::new(0)),
27        };
28        (master, rx)
29    }
30
31    /// WAL 데이터를 복제로 전송. LSN을 자동 증가.
32    ///
33    /// `data` — 직렬화된 WAL 레코드 바이트
34    pub fn replicate(&self, data: Vec<u8>) -> u64 {
35        let lsn = self.current_lsn.fetch_add(1, Ordering::SeqCst);
36        let msg = ReplicationMessage::WalEntry {
37            node_id: 0,
38            lsn,
39            timestamp: std::time::SystemTime::now()
40                .duration_since(std::time::UNIX_EPOCH)
41                .unwrap()
42                .as_micros() as u64,
43            data,
44        };
45        // 수신자가 없어도 에러 무시 (Slave가 아직 연결 안 됐을 수 있음)
46        let _ = self.tx.send(msg);
47        lsn
48    }
49
50    /// Heartbeat 전송
51    pub fn heartbeat(&self) {
52        let lsn = self.current_lsn.load(Ordering::SeqCst);
53        let _ = self
54            .tx
55            .send(ReplicationMessage::Heartbeat { node_id: 0, lsn });
56    }
57
58    /// 새 Slave를 위한 구독자 추가
59    pub fn subscribe(&self) -> broadcast::Receiver<ReplicationMessage> {
60        self.tx.subscribe()
61    }
62
63    /// 현재 LSN
64    pub fn current_lsn(&self) -> u64 {
65        self.current_lsn.load(Ordering::SeqCst)
66    }
67}
68
#[cfg(test)]
mod tests {
    use super::*;

    /// Each `replicate` call returns the pre-increment LSN and bumps the counter by one.
    #[test]
    fn test_master_replicate_increments_lsn() {
        // Keep the initial receiver bound so it lives for the whole test.
        let (master, _receiver) = ReplicationMaster::new(16);
        assert_eq!(master.current_lsn(), 0);

        let first = master.replicate(b"data1".to_vec());
        let second = master.replicate(b"data2".to_vec());

        assert_eq!(first, 0);
        assert_eq!(second, 1);
        assert_eq!(master.current_lsn(), 2);
    }

    /// A receiver obtained from `new` sees the replicated record with its LSN and payload.
    #[tokio::test]
    async fn test_slave_receives_wal_entry() {
        let (master, mut receiver) = ReplicationMaster::new(16);
        master.replicate(b"hello".to_vec());

        match receiver.recv().await.unwrap() {
            ReplicationMessage::WalEntry { lsn, data, .. } => {
                assert_eq!(lsn, 0);
                assert_eq!(data, b"hello");
            }
            _ => panic!("WalEntry 메시지 기대"),
        }
    }

    /// Every subscriber receives the same broadcast message.
    #[tokio::test]
    async fn test_multiple_subscribers() {
        let (master, mut first_rx) = ReplicationMaster::new(16);
        let mut second_rx = master.subscribe();

        master.replicate(b"broadcast".to_vec());

        let from_first = first_rx.recv().await.unwrap();
        let from_second = second_rx.recv().await.unwrap();
        assert_eq!(from_first.lsn(), from_second.lsn());
    }
}