Skip to main content

dbx_core/grid/
router.rs

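//! Grid Router
//!
//! Receives `GridMessage`s from the network-facing grid channel and unwraps them into
//! replication and lock messages for the local subsystems. On the sending side, wraps
//! outgoing `ReplicationMessage`s and `LockMessage`s in a `GridMessage` and publishes them.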
5
6use crate::grid::protocol::{GridMessage, LockMessage};
7use crate::replication::protocol::ReplicationMessage;
8use tokio::sync::broadcast;
9
10pub struct GridRouter {
11    /// 외부 네트워크(Transport)와 연결된 메인 그리드 채널
12    pub grid_tx: broadcast::Sender<GridMessage>,
13
14    // Inbound (Network -> Node)
15    pub repl_in_tx: broadcast::Sender<ReplicationMessage>,
16    pub lock_in_tx: broadcast::Sender<LockMessage>,
17
18    // Outbound (Node -> Network)
19    pub repl_out_tx: broadcast::Sender<ReplicationMessage>,
20    pub lock_out_tx: broadcast::Sender<LockMessage>,
21}
22
23impl GridRouter {
24    pub fn new(grid_tx: broadcast::Sender<GridMessage>) -> Self {
25        let (repl_in_tx, _) = broadcast::channel(1024);
26        let (lock_in_tx, _) = broadcast::channel(1024);
27        let (repl_out_tx, _) = broadcast::channel(1024);
28        let (lock_out_tx, _) = broadcast::channel(1024);
29
30        Self {
31            grid_tx,
32            repl_in_tx,
33            lock_in_tx,
34            repl_out_tx,
35            lock_out_tx,
36        }
37    }
38
39    /// 라우터를 백그라운드 태스크로 시작합니다.
40    pub fn start(&self) {
41        // 1. Grid -> Subsystems (Inbound)
42        let mut grid_rx = self.grid_tx.subscribe();
43        let repl_in_tx = self.repl_in_tx.clone();
44        let lock_in_tx = self.lock_in_tx.clone();
45
46        tokio::spawn(async move {
47            while let Ok(msg) = grid_rx.recv().await {
48                match msg {
49                    GridMessage::Replication(r) => {
50                        let _ = repl_in_tx.send(r);
51                    }
52                    GridMessage::Lock(l) => {
53                        let _ = lock_in_tx.send(l);
54                    }
55                }
56            }
57        });
58
59        // 2. Subsystems -> Grid (Outbound Replication)
60        let mut repl_out_rx = self.repl_out_tx.subscribe();
61        let grid_tx_repl = self.grid_tx.clone();
62        tokio::spawn(async move {
63            while let Ok(msg) = repl_out_rx.recv().await {
64                let _ = grid_tx_repl.send(GridMessage::Replication(msg));
65            }
66        });
67
68        // 3. Subsystems -> Grid (Outbound Lock)
69        let mut lock_out_rx = self.lock_out_tx.subscribe();
70        let grid_tx_lock = self.grid_tx.clone();
71        tokio::spawn(async move {
72            while let Ok(msg) = lock_out_rx.recv().await {
73                let _ = grid_tx_lock.send(GridMessage::Lock(msg));
74            }
75        });
76    }
77}