Skip to main content

dbx_core/grid/
router.rs

1//! Grid Router
2//!
3//! 네트워크 수신부에서 `GridMessage`를 받아 `Replication`, `Lock`으로 변환해주고,
4//! 송신 시 `ReplicationMessage`를 `GridMessage`로 래핑하여 전송합니다.
5
6use crate::grid::protocol::{GridMessage, LockMessage};
7use crate::replication::protocol::ReplicationMessage;
8use tokio::sync::broadcast;
9
10pub struct GridRouter {
11    /// 외부 네트워크(Transport)와 연결된 메인 그리드 채널
12    pub grid_tx: broadcast::Sender<GridMessage>,
13
14    // Inbound (Network -> Node)
15    pub repl_in_tx: broadcast::Sender<ReplicationMessage>,
16    pub lock_in_tx: broadcast::Sender<LockMessage>,
17
18    // Outbound (Node -> Network)
19    pub repl_out_tx: broadcast::Sender<ReplicationMessage>,
20    pub lock_out_tx: broadcast::Sender<LockMessage>,
21    pub storage_out_tx: broadcast::Sender<crate::grid::protocol::StorageMessage>,
22}
23
24impl GridRouter {
25    pub fn new(grid_tx: broadcast::Sender<GridMessage>) -> Self {
26        let (repl_in_tx, _) = broadcast::channel(1024);
27        let (lock_in_tx, _) = broadcast::channel(1024);
28        let (repl_out_tx, _) = broadcast::channel(1024);
29        let (lock_out_tx, _) = broadcast::channel(1024);
30        let (storage_out_tx, _) = broadcast::channel(1024);
31
32        Self {
33            grid_tx,
34            repl_in_tx,
35            lock_in_tx,
36            repl_out_tx,
37            lock_out_tx,
38            storage_out_tx,
39        }
40    }
41
42    /// 라우터를 백그라운드 태스크로 시작합니다.
43    pub fn start(&self) {
44        // 1. Grid -> Subsystems (Inbound)
45        let mut grid_rx = self.grid_tx.subscribe();
46        let repl_in_tx = self.repl_in_tx.clone();
47        let lock_in_tx = self.lock_in_tx.clone();
48
49        tokio::spawn(async move {
50            while let Ok(msg) = grid_rx.recv().await {
51                match msg {
52                    GridMessage::Replication(r) => {
53                        let _ = repl_in_tx.send(r);
54                    }
55                    GridMessage::Lock(l) => {
56                        let _ = lock_in_tx.send(l);
57                    }
58                    GridMessage::Storage(_) => {
59                        // TODO: Implement Storage handling
60                    }
61                    GridMessage::Query(_) => {
62                        // Query 메시지는 GridManager가 직접 핸들링하므로 라우터 브로드캐스트에서는 무시.
63                    }
64                }
65            }
66        });
67
68        // 2. Subsystems -> Grid (Outbound Replication)
69        let mut repl_out_rx = self.repl_out_tx.subscribe();
70        let grid_tx_repl = self.grid_tx.clone();
71        tokio::spawn(async move {
72            while let Ok(msg) = repl_out_rx.recv().await {
73                let _ = grid_tx_repl.send(GridMessage::Replication(msg));
74            }
75        });
76
77        // 3. Subsystems -> Grid (Outbound Lock)
78        let mut lock_out_rx = self.lock_out_tx.subscribe();
79        let grid_tx_lock = self.grid_tx.clone();
80        tokio::spawn(async move {
81            while let Ok(msg) = lock_out_rx.recv().await {
82                let _ = grid_tx_lock.send(GridMessage::Lock(msg));
83            }
84        });
85
86        // 4. Subsystems -> Grid (Outbound Storage)
87        let mut storage_out_rx = self.storage_out_tx.subscribe();
88        let grid_tx_storage = self.grid_tx.clone();
89        tokio::spawn(async move {
90            while let Ok(msg) = storage_out_rx.recv().await {
91                let _ = grid_tx_storage.send(GridMessage::Storage(msg));
92            }
93        });
94    }
95}