use bytes::Bytes;
use std::{
io,
pin::Pin,
sync::Arc,
task::{Context, Poll},
};
use tokio::{
io::{AsyncRead, AsyncWrite, ReadBuf},
sync::{mpsc, watch},
};
use super::{Receiver, ReceiverStream, RecvError, SendError, Sender, SenderSink};
use crate::{
agg::task::SendReq,
cfg::{Cfg, ExchangedCfg},
id::ConnId,
};
#[derive(Debug)]
pub struct Channel {
cfg: Arc<Cfg>,
remote_cfg: Option<Arc<ExchangedCfg>>,
conn_id: ConnId,
tx: mpsc::Sender<SendReq>,
tx_error: watch::Receiver<SendError>,
rx: mpsc::Receiver<Bytes>,
rx_closed: mpsc::Sender<()>,
rx_error: watch::Receiver<Option<RecvError>>,
}
impl Channel {
#[allow(clippy::too_many_arguments)]
pub(crate) fn new(
cfg: Arc<Cfg>, remote_cfg: Option<Arc<ExchangedCfg>>, conn_id: ConnId, tx: mpsc::Sender<SendReq>,
tx_error: watch::Receiver<SendError>, rx: mpsc::Receiver<Bytes>, rx_closed: mpsc::Sender<()>,
rx_error: watch::Receiver<Option<RecvError>>,
) -> Self {
Self { cfg, remote_cfg, conn_id, tx, tx_error, rx, rx_closed, rx_error }
}
pub fn id(&self) -> ConnId {
self.conn_id
}
pub(crate) fn set_remote_cfg(&mut self, remote_cfg: Arc<ExchangedCfg>) {
assert!(self.remote_cfg.is_none(), "remote configuration was already set");
self.remote_cfg = Some(remote_cfg);
}
pub fn into_tx_rx(self) -> (Sender, Receiver) {
let Self { cfg, remote_cfg, conn_id, tx, tx_error, rx, rx_closed, rx_error } = self;
let tx = Sender::new(cfg, remote_cfg.unwrap(), conn_id, tx, tx_error);
let rx = Receiver::new(conn_id, rx, rx_closed, rx_error);
(tx, rx)
}
pub fn into_stream(self) -> Stream {
let (tx, rx) = self.into_tx_rx();
Stream { tx: tx.into_sink(), rx: rx.into_stream() }
}
}
#[derive(Debug)]
pub struct Stream {
tx: SenderSink,
rx: ReceiverStream,
}
impl Stream {
pub fn id(&self) -> ConnId {
self.tx.id()
}
pub fn into_split(self) -> (ReceiverStream, SenderSink) {
let Self { tx, rx } = self;
(rx, tx)
}
pub fn close(&mut self) {
self.rx.close()
}
}
impl AsyncRead for Stream {
#[inline]
fn poll_read(self: Pin<&mut Self>, cx: &mut Context, buf: &mut ReadBuf) -> Poll<io::Result<()>> {
Pin::new(&mut self.get_mut().rx).poll_read(cx, buf)
}
}
impl AsyncWrite for Stream {
#[inline]
fn poll_write(self: Pin<&mut Self>, cx: &mut Context, buf: &[u8]) -> Poll<Result<usize, io::Error>> {
Pin::new(&mut self.get_mut().tx).poll_write(cx, buf)
}
#[inline]
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), io::Error>> {
Pin::new(&mut self.get_mut().tx).poll_flush(cx)
}
fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), io::Error>> {
Pin::new(&mut self.get_mut().tx).poll_shutdown(cx)
}
}