use futures::{future, stream::SplitSink};
use futures_util::sink::SinkExt;
use std::sync::Arc;
use tokio::{
self,
net::TcpStream,
sync::{
mpsc::{self, UnboundedReceiver, UnboundedSender},
Mutex,
},
};
use tokio_util::codec::Framed;
use crate::communication::{
CommunicationError, ControlMessage, ControlMessageCodec, ControlMessageHandler,
InterProcessMessage, MessageCodec,
};
use crate::node::NodeId;
use crate::scheduler::endpoints_manager::ChannelsToSenders;
#[allow(dead_code)]
pub(crate) struct DataSender {
node_id: NodeId,
sink: SplitSink<Framed<TcpStream, MessageCodec>, InterProcessMessage>,
rx: UnboundedReceiver<InterProcessMessage>,
control_tx: UnboundedSender<ControlMessage>,
control_rx: UnboundedReceiver<ControlMessage>,
}
impl DataSender {
pub(crate) async fn new(
node_id: NodeId,
sink: SplitSink<Framed<TcpStream, MessageCodec>, InterProcessMessage>,
channels_to_senders: Arc<Mutex<ChannelsToSenders>>,
control_handler: &mut ControlMessageHandler,
) -> Self {
let (tx, rx) = mpsc::unbounded_channel();
channels_to_senders.lock().await.add_sender(node_id, tx);
let (control_tx, control_rx) = mpsc::unbounded_channel();
control_handler.add_channel_to_data_sender(node_id, control_tx);
Self {
node_id,
sink,
rx,
control_tx: control_handler.get_channel_to_handler(),
control_rx,
}
}
pub(crate) async fn run(&mut self) -> Result<(), CommunicationError> {
self.control_tx
.send(ControlMessage::DataSenderInitialized(self.node_id))
.map_err(CommunicationError::from)?;
loop {
match self.rx.recv().await {
Some(msg) => {
if let Err(e) = self.sink.send(msg).await.map_err(CommunicationError::from) {
return Err(e);
}
}
None => return Err(CommunicationError::Disconnected),
}
}
}
}
pub(crate) async fn run_senders(senders: Vec<DataSender>) -> Result<(), CommunicationError> {
future::join_all(
senders
.into_iter()
.map(|mut sender| tokio::spawn(async move { sender.run().await })),
)
.await;
Ok(())
}
#[allow(dead_code)]
pub(crate) struct ControlSender {
node_id: NodeId,
sink: SplitSink<Framed<TcpStream, ControlMessageCodec>, ControlMessage>,
rx: UnboundedReceiver<ControlMessage>,
control_tx: UnboundedSender<ControlMessage>,
control_rx: UnboundedReceiver<ControlMessage>,
}
impl ControlSender {
pub(crate) fn new(
node_id: NodeId,
sink: SplitSink<Framed<TcpStream, ControlMessageCodec>, ControlMessage>,
control_handler: &mut ControlMessageHandler,
) -> Self {
let (tx, rx) = mpsc::unbounded_channel();
control_handler.add_channel_to_node(node_id, tx);
let (control_tx, control_rx) = mpsc::unbounded_channel();
control_handler.add_channel_to_control_sender(node_id, control_tx);
Self {
node_id,
sink,
rx,
control_tx: control_handler.get_channel_to_handler(),
control_rx,
}
}
pub(crate) async fn run(&mut self) -> Result<(), CommunicationError> {
self.control_tx
.send(ControlMessage::ControlSenderInitialized(self.node_id))
.map_err(CommunicationError::from)?;
loop {
match self.rx.recv().await {
Some(msg) => {
if let Err(e) = self.sink.send(msg).await.map_err(CommunicationError::from) {
return Err(e);
}
}
None => {
return Err(CommunicationError::Disconnected);
}
}
}
}
}
pub(crate) async fn run_control_senders(
mut senders: Vec<ControlSender>,
) -> Result<(), CommunicationError> {
future::join_all(senders.iter_mut().map(|sender| sender.run())).await;
Ok(())
}