1use crate::grid::protocol::{GridMessage, LockMessage};
7use crate::replication::protocol::ReplicationMessage;
8use tokio::sync::broadcast;
9
10pub struct GridRouter {
11 pub grid_tx: broadcast::Sender<GridMessage>,
13
14 pub repl_in_tx: broadcast::Sender<ReplicationMessage>,
16 pub lock_in_tx: broadcast::Sender<LockMessage>,
17
18 pub repl_out_tx: broadcast::Sender<ReplicationMessage>,
20 pub lock_out_tx: broadcast::Sender<LockMessage>,
21}
22
23impl GridRouter {
24 pub fn new(grid_tx: broadcast::Sender<GridMessage>) -> Self {
25 let (repl_in_tx, _) = broadcast::channel(1024);
26 let (lock_in_tx, _) = broadcast::channel(1024);
27 let (repl_out_tx, _) = broadcast::channel(1024);
28 let (lock_out_tx, _) = broadcast::channel(1024);
29
30 Self {
31 grid_tx,
32 repl_in_tx,
33 lock_in_tx,
34 repl_out_tx,
35 lock_out_tx,
36 }
37 }
38
39 pub fn start(&self) {
41 let mut grid_rx = self.grid_tx.subscribe();
43 let repl_in_tx = self.repl_in_tx.clone();
44 let lock_in_tx = self.lock_in_tx.clone();
45
46 tokio::spawn(async move {
47 while let Ok(msg) = grid_rx.recv().await {
48 match msg {
49 GridMessage::Replication(r) => {
50 let _ = repl_in_tx.send(r);
51 }
52 GridMessage::Lock(l) => {
53 let _ = lock_in_tx.send(l);
54 }
55 }
56 }
57 });
58
59 let mut repl_out_rx = self.repl_out_tx.subscribe();
61 let grid_tx_repl = self.grid_tx.clone();
62 tokio::spawn(async move {
63 while let Ok(msg) = repl_out_rx.recv().await {
64 let _ = grid_tx_repl.send(GridMessage::Replication(msg));
65 }
66 });
67
68 let mut lock_out_rx = self.lock_out_tx.subscribe();
70 let grid_tx_lock = self.grid_tx.clone();
71 tokio::spawn(async move {
72 while let Ok(msg) = lock_out_rx.recv().await {
73 let _ = grid_tx_lock.send(GridMessage::Lock(msg));
74 }
75 });
76 }
77}