1use crate::grid::protocol::{GridMessage, LockMessage};
7use crate::replication::protocol::ReplicationMessage;
8use tokio::sync::broadcast;
9
10pub struct GridRouter {
11 pub grid_tx: broadcast::Sender<GridMessage>,
13
14 pub repl_in_tx: broadcast::Sender<ReplicationMessage>,
16 pub lock_in_tx: broadcast::Sender<LockMessage>,
17
18 pub repl_out_tx: broadcast::Sender<ReplicationMessage>,
20 pub lock_out_tx: broadcast::Sender<LockMessage>,
21 pub storage_out_tx: broadcast::Sender<crate::grid::protocol::StorageMessage>,
22}
23
24impl GridRouter {
25 pub fn new(grid_tx: broadcast::Sender<GridMessage>) -> Self {
26 let (repl_in_tx, _) = broadcast::channel(1024);
27 let (lock_in_tx, _) = broadcast::channel(1024);
28 let (repl_out_tx, _) = broadcast::channel(1024);
29 let (lock_out_tx, _) = broadcast::channel(1024);
30 let (storage_out_tx, _) = broadcast::channel(1024);
31
32 Self {
33 grid_tx,
34 repl_in_tx,
35 lock_in_tx,
36 repl_out_tx,
37 lock_out_tx,
38 storage_out_tx,
39 }
40 }
41
42 pub fn start(&self) {
44 let mut grid_rx = self.grid_tx.subscribe();
46 let repl_in_tx = self.repl_in_tx.clone();
47 let lock_in_tx = self.lock_in_tx.clone();
48
49 tokio::spawn(async move {
50 while let Ok(msg) = grid_rx.recv().await {
51 match msg {
52 GridMessage::Replication(r) => {
53 let _ = repl_in_tx.send(r);
54 }
55 GridMessage::Lock(l) => {
56 let _ = lock_in_tx.send(l);
57 }
58 GridMessage::Storage(_) => {
59 }
61 GridMessage::Query(_) => {
62 }
64 }
65 }
66 });
67
68 let mut repl_out_rx = self.repl_out_tx.subscribe();
70 let grid_tx_repl = self.grid_tx.clone();
71 tokio::spawn(async move {
72 while let Ok(msg) = repl_out_rx.recv().await {
73 let _ = grid_tx_repl.send(GridMessage::Replication(msg));
74 }
75 });
76
77 let mut lock_out_rx = self.lock_out_tx.subscribe();
79 let grid_tx_lock = self.grid_tx.clone();
80 tokio::spawn(async move {
81 while let Ok(msg) = lock_out_rx.recv().await {
82 let _ = grid_tx_lock.send(GridMessage::Lock(msg));
83 }
84 });
85
86 let mut storage_out_rx = self.storage_out_tx.subscribe();
88 let grid_tx_storage = self.grid_tx.clone();
89 tokio::spawn(async move {
90 while let Ok(msg) = storage_out_rx.recv().await {
91 let _ = grid_tx_storage.send(GridMessage::Storage(msg));
92 }
93 });
94 }
95}