use std::time::Duration;
use crate::core::io::{IncomingFrame, OutgoingFrame};
use crate::core::utils::Closable;
#[cfg(feature = "unstable")]
use crate::error::TryRecvResult;
use crate::error::{RecvResult, RecvTimeoutResult, SendError, SendResult};
use crate::prelude::*;
use crate::sync::prelude::*;
#[cfg(doc)]
#[cfg(feature = "unstable")]
use crate::sync::io::{Channel, Connection};
#[derive(Clone, Debug)]
pub struct OutgoingFrameSender<V: MaybeVersioned> {
sender: mpmc::Sender<OutgoingFrame<V>>,
state: Closable,
}
#[derive(Clone, Debug)]
pub struct OutgoingFrameHandler<V: MaybeVersioned> {
receiver: mpmc::Receiver<OutgoingFrame<V>>,
}
#[derive(Clone, Debug)]
pub struct IncomingFrameProducer<V: MaybeVersioned> {
sender: mpmc::Sender<IncomingFrame<V>>,
}
#[derive(Clone, Debug)]
pub struct IncomingFrameReceiver<V: MaybeVersioned> {
receiver: mpmc::Receiver<IncomingFrame<V>>,
}
pub fn outgoing_channel<V: MaybeVersioned>(
state: Closable,
) -> (OutgoingFrameSender<V>, OutgoingFrameHandler<V>) {
let (tx, rx) = mpmc::channel();
(
OutgoingFrameSender::new(tx, state),
OutgoingFrameHandler::new(rx),
)
}
pub fn incoming_channel<V: MaybeVersioned>() -> (IncomingFrameProducer<V>, IncomingFrameReceiver<V>)
{
let (tx, rx) = mpmc::channel();
(
IncomingFrameProducer::new(tx),
IncomingFrameReceiver::new(rx),
)
}
impl<V: MaybeVersioned> OutgoingFrameSender<V> {
fn new(sender: mpmc::Sender<OutgoingFrame<V>>, state: Closable) -> Self {
Self { sender, state }
}
#[inline(always)]
pub fn send(&self, frame: Frame<V>) -> SendResult<OutgoingFrame<V>> {
self.send_raw(OutgoingFrame::new(frame))
}
pub fn send_raw(&self, frame: OutgoingFrame<V>) -> SendResult<OutgoingFrame<V>> {
if self.state.is_closed() {
return Err(SendError(frame));
}
self.sender.send(frame)
}
}
impl<V: MaybeVersioned> OutgoingFrameHandler<V> {
fn new(receiver: mpmc::Receiver<OutgoingFrame<V>>) -> Self {
Self { receiver }
}
#[inline(always)]
pub fn recv(&self) -> RecvResult<OutgoingFrame<V>> {
self.receiver.recv()
}
#[inline(always)]
pub fn recv_timeout(&self, timeout: Duration) -> RecvTimeoutResult<OutgoingFrame<V>> {
self.receiver.recv_timeout(timeout)
}
}
impl<V: MaybeVersioned> IncomingFrameProducer<V> {
fn new(sender: mpmc::Sender<IncomingFrame<V>>) -> Self {
Self { sender }
}
#[allow(clippy::result_large_err)]
pub fn send(&self, frame: IncomingFrame<V>) -> SendResult<IncomingFrame<V>> {
self.sender.send(frame)
}
}
impl<V: MaybeVersioned> IncomingFrameReceiver<V> {
fn new(receiver: mpmc::Receiver<IncomingFrame<V>>) -> Self {
Self { receiver }
}
#[cfg(feature = "unstable")]
#[inline(always)]
pub fn recv(&self) -> RecvResult<IncomingFrame<V>> {
self.receiver.recv()
}
#[inline(always)]
pub fn recv_timeout(&self, timeout: Duration) -> RecvTimeoutResult<IncomingFrame<V>> {
self.receiver.recv_timeout(timeout)
}
#[cfg(feature = "unstable")]
#[inline(always)]
pub fn try_recv(&self) -> TryRecvResult<IncomingFrame<V>> {
self.receiver.try_recv()
}
}