Skip to main content

dbx_core/replication/
transport.rs

1//! Transport 추상화 계층 — s2n-quic 기반, 런타임 설정 지원
2//!
3//! ## 런타임으로 Transport 선택
4//! ```rust,no_run
5//! use dbx_core::replication::transport::{ReplicationConfig, TransportMode};
6//! use dbx_core::engine::parallel_engine::DbConfig;
7//!
8//! // InMemory (기본 — 단일 프로세스)
9//! let config = DbConfig::default();  // replication: InMemory
10//!
11//! // QUIC (분산 배포)
12//! let config = DbConfig {
13//!     replication: ReplicationConfig::quic(
14//!         "0.0.0.0:7878",
15//!         "/etc/dbx/cert.pem",
16//!         "/etc/dbx/key.pem",
17//!         3, // cluster_size
18//!     ),
19//!     ..Default::default()
20//! };
21//! ```
22
23use std::time::Duration;
24
25use crate::grid::protocol::GridMessage;
26
27// ─────────────────────────────────────────────────────────────────────────────
28// 런타임 Transport 설정
29// ─────────────────────────────────────────────────────────────────────────────
30
31/// Transport 동작 모드
32#[derive(Debug, Clone, Default)]
33pub enum TransportMode {
34    /// 단일 프로세스 내 인메모리 통신 (기본)
35    #[default]
36    InMemory,
37    /// QUIC 기반 네트워크 통신 (프로세스 간 / 분산 배포)
38    Quic {
39        /// 이 노드가 리스닝할 주소 (예: "0.0.0.0:7878")
40        bind_addr: String,
41        /// TLS 인증서 경로 (.pem)
42        cert_path: String,
43        /// TLS 개인키 경로 (.pem)
44        key_path: String,
45    },
46}
47
48/// 레플리케이션 전체 설정
49///
50/// `DbConfig::replication` 필드로 전달합니다.
51#[derive(Debug, Clone)]
52pub struct ReplicationConfig {
53    /// Transport 모드: `InMemory`(default) 또는 `Quic`
54    pub mode: TransportMode,
55    /// 클러스터 노드 수 (Quorum 계산용, 기본 1)
56    pub cluster_size: usize,
57    /// 레플리케이션 활성화 여부 (기본 false)
58    pub enabled: bool,
59    /// Quorum Write 타임아웃 (기본 5초)
60    ///
61    /// Master가 이 시간 내에 quorum ACK를 받지 못하면 `replicate()` 에러 반환.
62    pub quorum_write_timeout: Duration,
63}
64
65impl Default for ReplicationConfig {
66    fn default() -> Self {
67        Self {
68            mode: TransportMode::InMemory,
69            cluster_size: 1,
70            enabled: false,
71            quorum_write_timeout: Duration::from_secs(5),
72        }
73    }
74}
75
76impl ReplicationConfig {
77    /// InMemory 모드 (단일 부트 개발/테스트용)
78    pub fn in_memory(cluster_size: usize) -> Self {
79        Self {
80            mode: TransportMode::InMemory,
81            cluster_size,
82            enabled: true,
83            quorum_write_timeout: Duration::from_secs(5),
84        }
85    }
86
87    /// QUIC 모드 (실제 분산 배포용)
88    pub fn quic(
89        bind_addr: impl Into<String>,
90        cert_path: impl Into<String>,
91        key_path: impl Into<String>,
92        cluster_size: usize,
93    ) -> Self {
94        Self {
95            mode: TransportMode::Quic {
96                bind_addr: bind_addr.into(),
97                cert_path: cert_path.into(),
98                key_path: key_path.into(),
99            },
100            cluster_size,
101            enabled: true,
102            quorum_write_timeout: Duration::from_secs(5),
103        }
104    }
105
106    /// 현재 모드 이름 (로그용)
107    pub fn mode_name(&self) -> &'static str {
108        match &self.mode {
109            TransportMode::InMemory => "InMemory",
110            TransportMode::Quic { .. } => "QUIC (s2n-quic)",
111        }
112    }
113
114    /// Transport 빌드 (비동기)
115    ///
116    /// - `InMemory` → InMemoryTransport 즉시 반환
117    /// - `Quic`    → s2n-quic 서버 소켓을 열고 QuicTransport 반환
118    pub async fn build_transport_async(
119        &self,
120        broadcast_tx: tokio::sync::broadcast::Sender<GridMessage>,
121    ) -> Result<Box<dyn Transport>, quic::QuicError> {
122        match &self.mode {
123            TransportMode::InMemory => Ok(Box::new(InMemoryTransport::new(broadcast_tx))),
124            TransportMode::Quic {
125                bind_addr,
126                cert_path,
127                key_path,
128            } => {
129                tracing::info!(
130                    "QUIC Transport 초기화: bind={} cert={}",
131                    bind_addr,
132                    cert_path
133                );
134                let (node, handle) = quic::QuicNode::server(
135                    bind_addr,
136                    std::path::Path::new(cert_path),
137                    std::path::Path::new(key_path),
138                )
139                .await?;
140                // 백그라운드 수신 루프 유지
141                tokio::spawn(handle);
142                Ok(Box::new(quic::QuicTransport::new(node)))
143            }
144        }
145    }
146
147    /// 동기 편의 함수 — InMemory 전용
148    ///
149    /// Quic 모드에서는 `build_transport_async()`를 사용하세요.
150    pub fn build_inmemory(
151        broadcast_tx: tokio::sync::broadcast::Sender<GridMessage>,
152    ) -> Box<dyn Transport> {
153        Box::new(InMemoryTransport::new(broadcast_tx))
154    }
155}
156
157// ─────────────────────────────────────────────────────────────────────────────
158// 공통 Transport Trait
159// ─────────────────────────────────────────────────────────────────────────────
160
161/// 노드 간 메시지 전송 추상화 Trait
162///
163/// `InMemoryTransport`(단일 프로세스)와 `QuicTransport`(분산 프로세스) 모두
164/// 이 trait을 구현하므로 상위 코드는 Transport 구현체를 교체해도 수정 불필요.
165pub trait Transport: Send + Sync {
166    /// 특정 노드에게 메시지 전송
167    fn send(&self, target_node_id: u32, msg: GridMessage);
168
169    /// 메시지 수신 (비블로킹, 없으면 None)  
170    fn recv(&self) -> Option<GridMessage>;
171}
172
173// ─────────────────────────────────────────────────────────────────────────────
174// InMemoryTransport — 단일 프로세스용 (현재 MVP)
175// ─────────────────────────────────────────────────────────────────────────────
176
177/// 인메모리 Transport (단일 프로세스 내 노드 간 통신)
178///
179/// 테스트 및 단일 서버 시나리오에서 사용합니다.
180/// `QuicTransport`로 교체 시 이 구조체만 대체하면 됩니다.
181pub struct InMemoryTransport {
182    sender: tokio::sync::broadcast::Sender<GridMessage>,
183    receiver: std::sync::Mutex<tokio::sync::broadcast::Receiver<GridMessage>>,
184}
185
186impl InMemoryTransport {
187    pub fn new(sender: tokio::sync::broadcast::Sender<GridMessage>) -> Self {
188        let receiver = sender.subscribe();
189        Self {
190            sender,
191            receiver: std::sync::Mutex::new(receiver),
192        }
193    }
194}
195
196impl Transport for InMemoryTransport {
197    fn send(&self, _target_node_id: u32, msg: GridMessage) {
198        let _ = self.sender.send(msg);
199    }
200
201    fn recv(&self) -> Option<GridMessage> {
202        self.receiver.lock().unwrap().try_recv().ok()
203    }
204}
205
206// ─────────────────────────────────────────────────────────────────────────────
207// QuicTransport — 프로세스 간 통신 (s2n-quic 기반)
208// ─────────────────────────────────────────────────────────────────────────────
209
210pub mod quic {
211    //! s2n-quic 기반 실제 네트워크 Transport 구현
212    //!
213    //! ## 사용법
214    //! ```rust,ignore
215    //! use dbx_core::replication::transport::quic::QuicNode;
216    //! use std::path::Path;
217    //!
218    //! // 서버 (수신 노드)
219    //! let (server, handle) = QuicNode::server(
220    //!     "0.0.0.0:7878",
221    //!     Path::new("/etc/dbx/cert.pem"),
222    //!     Path::new("/etc/dbx/key.pem"),
223    //! ).await?;
224    //! tokio::spawn(handle);
225    //!
226    //! // 클라이언트 (발신 노드)
227    //! let (client, _handle) = QuicNode::client(
228    //!     "10.0.0.2:7878",
229    //!     Path::new("/etc/dbx/ca.pem"),
230    //! ).await?;
231    //! ```
232
233    use s2n_quic::provider::tls;
234    use s2n_quic::{Client, Server};
235    use std::path::Path;
236    use std::sync::Arc;
237    use tokio::sync::mpsc;
238
239    use crate::grid::protocol::GridMessage;
240
241    /// 메시지 길이 프리픽스 크기 (4바이트 little-endian u32)
242    const LEN_PREFIX_BYTES: usize = 4;
243
244    /// QUIC 노드 에러
245    #[derive(Debug, thiserror::Error)]
246    pub enum QuicError {
247        #[error("QUIC 연결 실패: {0}")]
248        ConnectionError(String),
249        #[error("직렬화 실패: {0}")]
250        SerializeError(String),
251        #[error("IO 오류: {0}")]
252        IoError(#[from] std::io::Error),
253    }
254
255    /// QUIC 기반 노드 (Server/Client 겸용)
256    pub struct QuicNode {
257        /// 수신 채널 (recv_loop에서 디코딩된 메시지가 들어옴)
258        rx: Arc<tokio::sync::Mutex<mpsc::Receiver<GridMessage>>>,
259        /// 발신 채널 (send_msg를 통해 메시지 전송)
260        tx_out: mpsc::Sender<(GridMessage, String)>,
261    }
262
263    impl QuicNode {
264        /// 서버 모드 시작
265        ///
266        /// `bind_addr`: 바인드할 주소, 예: "0.0.0.0:7878"
267        /// `cert_pem` / `key_pem`: TLS 인증서/키 (PEM 형식)
268        pub async fn server(
269            bind_addr: &str,
270            cert_pem: &Path,
271            key_pem: &Path,
272        ) -> Result<(Self, tokio::task::JoinHandle<()>), QuicError> {
273            let (msg_tx, msg_rx) = mpsc::channel::<GridMessage>(256);
274            let (out_tx, _out_rx) = mpsc::channel::<(GridMessage, String)>(256);
275
276            let tls = tls::default::Server::builder()
277                .with_certificate(cert_pem, key_pem)
278                .map_err(|e| QuicError::ConnectionError(e.to_string()))?
279                .build()
280                .map_err(|e| QuicError::ConnectionError(e.to_string()))?;
281
282            let mut server = Server::builder()
283                .with_tls(tls)
284                .map_err(|e| QuicError::ConnectionError(e.to_string()))?
285                .with_io(bind_addr)
286                .map_err(|e| QuicError::ConnectionError(e.to_string()))?
287                .start()
288                .map_err(|e| QuicError::ConnectionError(e.to_string()))?;
289
290            // 백그라운드 수신 루프
291            let handle = tokio::spawn(async move {
292                while let Some(mut conn) = server.accept().await {
293                    let msg_tx = msg_tx.clone();
294                    tokio::spawn(async move {
295                        while let Ok(Some(mut stream)) = conn.accept_bidirectional_stream().await {
296                            // 메시지 수신: [4바이트 길이][메시지 바이트]
297                            use tokio::io::AsyncReadExt;
298                            let mut len_buf = [0u8; LEN_PREFIX_BYTES];
299                            if stream.read_exact(&mut len_buf).await.is_err() {
300                                break;
301                            }
302                            let len = u32::from_le_bytes(len_buf) as usize;
303                            let mut buf = vec![0u8; len];
304                            if stream.read_exact(&mut buf).await.is_err() {
305                                break;
306                            }
307                            if let Ok(msg) = bincode::deserialize::<GridMessage>(&buf) {
308                                let _ = msg_tx.send(msg).await;
309                            }
310                        }
311                    });
312                }
313            });
314
315            Ok((
316                Self {
317                    rx: Arc::new(tokio::sync::Mutex::new(msg_rx)),
318                    tx_out: out_tx,
319                },
320                handle,
321            ))
322        }
323
324        /// 클라이언트 모드 — 상대 노드 주소로 연결
325        ///
326        /// `peer_addr`: 연결할 주소, 예: "10.0.0.2:7878"
327        /// `ca_cert_pem`: 서버 인증서 검증용 CA 인증서
328        pub async fn client(
329            peer_addr: &str,
330            ca_cert_pem: &Path,
331        ) -> Result<(Self, tokio::task::JoinHandle<()>), QuicError> {
332            let (msg_tx, msg_rx) = mpsc::channel::<GridMessage>(256);
333            let (out_tx, mut out_rx) = mpsc::channel::<(GridMessage, String)>(256);
334
335            let tls = tls::default::Client::builder()
336                .with_certificate(ca_cert_pem)
337                .map_err(|e| QuicError::ConnectionError(e.to_string()))?
338                .build()
339                .map_err(|e| QuicError::ConnectionError(e.to_string()))?;
340
341            let client = Client::builder()
342                .with_tls(tls)
343                .map_err(|e| QuicError::ConnectionError(e.to_string()))?
344                .with_io("0.0.0.0:0") // 임의 포트
345                .map_err(|e| QuicError::ConnectionError(e.to_string()))?
346                .start()
347                .map_err(|e| QuicError::ConnectionError(e.to_string()))?;
348
349            let peer = peer_addr.to_string();
350
351            // 백그라운드 발신 루프
352            let handle = tokio::spawn(async move {
353                use tokio::io::AsyncWriteExt;
354                let connect =
355                    s2n_quic::client::Connect::new(peer.parse::<std::net::SocketAddr>().unwrap())
356                        .with_server_name("dbx-node");
357
358                if let Ok(mut conn) = client.connect(connect).await {
359                    conn.keep_alive(true).ok();
360                    while let Some((msg, _addr)) = out_rx.recv().await {
361                        let Ok(bytes) = bincode::serialize(&msg) else {
362                            continue;
363                        };
364                        let len = bytes.len() as u32;
365                        if let Ok(mut stream) = conn.open_bidirectional_stream().await {
366                            let _ = stream.write_all(&len.to_le_bytes()).await;
367                            let _ = stream.write_all(&bytes).await;
368                        }
369                    }
370                }
371                // 연결 끊기면 수신 채널도 닫힘
372                drop(msg_tx);
373            });
374
375            Ok((
376                Self {
377                    rx: Arc::new(tokio::sync::Mutex::new(msg_rx)),
378                    tx_out: out_tx,
379                },
380                handle,
381            ))
382        }
383
384        /// 메시지 발송 (비동기)
385        pub async fn send_msg(&self, msg: GridMessage, peer_addr: String) {
386            let _ = self.tx_out.send((msg, peer_addr)).await;
387        }
388
389        /// 메시지 수신 (비블로킹)
390        pub async fn try_recv(&self) -> Option<GridMessage> {
391            self.rx.lock().await.try_recv().ok()
392        }
393    }
394
395    // ─────────────────────────────────────────────────────────────────────
396    // QuicTransport — Transport trait 구현체 (QuicNode 래퍼)
397    // ─────────────────────────────────────────────────────────────────────
398
399    /// `Transport` trait을 구현하는 QUIC 래퍼
400    ///
401    /// `QuicNode`(비동기)를 동기 `Transport` trait에 연결합니다.
402    /// 내부적으로 현재 tokio 런타임 핸들을 사용하여 비동기 호출을 브릿지합니다.
403    pub struct QuicTransport {
404        node: Arc<QuicNode>,
405        /// 동기 컨텍스트에서 비동기 호출용 런타임 핸들
406        rt: tokio::runtime::Handle,
407        /// 연결할 피어 주소 (클라이언트 측 발신 전용)
408        peer_addr: String,
409    }
410
411    impl QuicTransport {
412        pub fn new(node: QuicNode) -> Self {
413            Self {
414                node: Arc::new(node),
415                rt: tokio::runtime::Handle::current(),
416                peer_addr: String::new(),
417            }
418        }
419
420        pub fn with_peer(node: QuicNode, peer_addr: impl Into<String>) -> Self {
421            Self {
422                node: Arc::new(node),
423                rt: tokio::runtime::Handle::current(),
424                peer_addr: peer_addr.into(),
425            }
426        }
427    }
428
429    impl crate::replication::transport::Transport for QuicTransport {
430        fn send(&self, _target_node_id: u32, msg: GridMessage) {
431            let node = Arc::clone(&self.node);
432            let addr = self.peer_addr.clone();
433            self.rt.spawn(async move {
434                node.send_msg(msg, addr).await;
435            });
436        }
437
438        fn recv(&self) -> Option<GridMessage> {
439            let node = Arc::clone(&self.node);
440            self.rt.block_on(async move { node.try_recv().await })
441        }
442    }
443
444    // ─────────────────────────────────────────────────────────────────────
445    // 개발용 자가서명 인증서 생성 헬퍼
446    // ─────────────────────────────────────────────────────────────────────
447
448    /// 개발 환경용 자가서명 TLS 인증서/키를 임시 디렉토리에 생성
449    ///
450    /// 프로덕션에서는 실제 CA 인증서를 사용하세요.
451    pub fn generate_self_signed_cert(
452        output_dir: &Path,
453    ) -> Result<(std::path::PathBuf, std::path::PathBuf), QuicError> {
454        let cert_path = output_dir.join("cert.pem");
455        let key_path = output_dir.join("key.pem");
456
457        let status = std::process::Command::new("openssl")
458            .args([
459                "req",
460                "-x509",
461                "-newkey",
462                "rsa:2048",
463                "-keyout",
464                key_path.to_str().unwrap(),
465                "-out",
466                cert_path.to_str().unwrap(),
467                "-days",
468                "365",
469                "-nodes",
470                "-subj",
471                "/CN=dbx-node",
472            ])
473            .status()
474            .map_err(QuicError::IoError)?;
475
476        if !status.success() {
477            return Err(QuicError::ConnectionError(
478                "openssl 실행 실패 (openssl이 설치되어 있어야 함)".to_string(),
479            ));
480        }
481
482        Ok((cert_path, key_path))
483    }
484}
485
486// ─────────────────────────────────────────────────────────────────────────────
487// 테스트
488// ─────────────────────────────────────────────────────────────────────────────
489
490#[cfg(test)]
491mod tests {
492    use super::*;
493
494    #[tokio::test]
495    async fn test_in_memory_transport_roundtrip() {
496        let (tx, _) = tokio::sync::broadcast::channel(16);
497        let t1 = InMemoryTransport::new(tx.clone());
498        let t2 = InMemoryTransport::new(tx.clone());
499
500        let msg = GridMessage::Replication(
501            crate::replication::protocol::ReplicationMessage::Heartbeat {
502                node_id: 1,
503                lsn: 42,
504            },
505        );
506        t1.send(2, msg.clone());
507
508        // t2는 같은 채널을 구독하므로 메시지를 받을 수 있음
509        tokio::time::sleep(std::time::Duration::from_millis(5)).await;
510        let received = t2.recv();
511        assert!(received.is_some());
512        assert_eq!(received.unwrap(), msg);
513    }
514
515    #[test]
516    fn test_transport_trait_object() {
517        // Transport trait object로 사용 가능한지 컴파일 타임 검증
518        let (tx, _) = tokio::sync::broadcast::channel::<GridMessage>(16);
519        let _t: Box<dyn Transport> = Box::new(InMemoryTransport::new(tx));
520    }
521}