use crate::grid::protocol::{GridMessage, LockMessage};
use crate::replication::protocol::ReplicationMessage;
use tokio::sync::broadcast;
pub struct GridRouter {
pub grid_tx: broadcast::Sender<GridMessage>,
pub repl_in_tx: broadcast::Sender<ReplicationMessage>,
pub lock_in_tx: broadcast::Sender<LockMessage>,
pub repl_out_tx: broadcast::Sender<ReplicationMessage>,
pub lock_out_tx: broadcast::Sender<LockMessage>,
}
impl GridRouter {
pub fn new(grid_tx: broadcast::Sender<GridMessage>) -> Self {
let (repl_in_tx, _) = broadcast::channel(1024);
let (lock_in_tx, _) = broadcast::channel(1024);
let (repl_out_tx, _) = broadcast::channel(1024);
let (lock_out_tx, _) = broadcast::channel(1024);
Self {
grid_tx,
repl_in_tx,
lock_in_tx,
repl_out_tx,
lock_out_tx,
}
}
pub fn start(&self) {
let mut grid_rx = self.grid_tx.subscribe();
let repl_in_tx = self.repl_in_tx.clone();
let lock_in_tx = self.lock_in_tx.clone();
tokio::spawn(async move {
while let Ok(msg) = grid_rx.recv().await {
match msg {
GridMessage::Replication(r) => {
let _ = repl_in_tx.send(r);
}
GridMessage::Lock(l) => {
let _ = lock_in_tx.send(l);
}
}
}
});
let mut repl_out_rx = self.repl_out_tx.subscribe();
let grid_tx_repl = self.grid_tx.clone();
tokio::spawn(async move {
while let Ok(msg) = repl_out_rx.recv().await {
let _ = grid_tx_repl.send(GridMessage::Replication(msg));
}
});
let mut lock_out_rx = self.lock_out_tx.subscribe();
let grid_tx_lock = self.grid_tx.clone();
tokio::spawn(async move {
while let Ok(msg) = lock_out_rx.recv().await {
let _ = grid_tx_lock.send(GridMessage::Lock(msg));
}
});
}
}