use core::borrow::Borrow;
use smoltcp::iface::SocketHandle;
use crate::{
Error,
command::{Command, Request, Response},
};
pub trait UpgradableChannel {
fn upgrade(&self) -> Result<impl Borrow<flume::Sender<Request>>, ChannelClosedError>;
}
#[derive(Debug, thiserror::Error, Clone, Copy, Eq, PartialEq)]
#[error("the remote end of the channel has closed")]
pub struct ChannelClosedError;
impl From<ChannelClosedError> for Error {
fn from(_: ChannelClosedError) -> Self {
Error::Internal(crate::InternalErrorKind::InternalChannelClosed)
}
}
#[cfg(feature = "std")]
impl From<ChannelClosedError> for std::io::Error {
fn from(_: ChannelClosedError) -> Self {
std::io::ErrorKind::BrokenPipe.into()
}
}
impl UpgradableChannel for flume::Sender<Request> {
fn upgrade(&self) -> Result<impl Borrow<flume::Sender<Request>>, ChannelClosedError> {
Ok(self)
}
}
impl UpgradableChannel for flume::WeakSender<Request> {
fn upgrade(&self) -> Result<impl Borrow<flume::Sender<Request>>, ChannelClosedError> {
flume::WeakSender::<Request>::upgrade(self).ok_or(ChannelClosedError)
}
}
impl<T> UpgradableChannel for &T
where
T: UpgradableChannel + ?Sized,
{
fn upgrade(&self) -> Result<impl Borrow<flume::Sender<Request>>, ChannelClosedError> {
T::upgrade(self)
}
}
pub fn request_blocking(
command_tx: impl UpgradableChannel,
handle: Option<SocketHandle>,
command: impl Into<Command>,
) -> Result<Response, ChannelClosedError> {
fn _request_blocking(
command_tx: &flume::Sender<Request>,
handle: Option<SocketHandle>,
command: Command,
) -> Result<Response, ChannelClosedError> {
let (resp_tx, resp_rx) = flume::bounded(1);
command_tx
.send(Request {
handle,
command,
resp: resp_tx,
})
.map_err(|_| ChannelClosedError)?;
resp_rx.recv().map_err(|_| ChannelClosedError)
}
let ch = command_tx.upgrade()?;
let ch = ch.borrow();
_request_blocking(ch, handle, command.into())
}
pub fn request(
command_tx: impl UpgradableChannel,
handle: Option<SocketHandle>,
command: impl Into<Command>,
) -> impl Future<Output = Result<Response, Error>> + Send {
async fn _request(
command_tx: &flume::Sender<Request>,
handle: Option<SocketHandle>,
command: Command,
) -> Result<Response, Error> {
let (resp_tx, resp_rx) = flume::bounded(1);
command_tx
.send_async(Request {
handle,
command,
resp: resp_tx,
})
.await?;
resp_rx.recv_async().await.map_err(Error::from)
}
let ch = command_tx.upgrade().map(|x| x.borrow().clone());
let command = command.into();
async move {
let ch = ch?;
_request(&ch, handle, command).await
}
}
pub fn request_nonblocking(
command_tx: impl UpgradableChannel,
handle: Option<SocketHandle>,
command: impl Into<Command>,
) -> Result<(), ChannelClosedError> {
fn _request_nonblocking(
command_tx: &flume::Sender<Request>,
handle: Option<SocketHandle>,
command: Command,
) -> Result<(), ChannelClosedError> {
let (resp_tx, _resp_rx) = flume::bounded(1);
match command_tx.try_send(Request {
handle,
command,
resp: resp_tx,
}) {
Ok(_) | Err(flume::TrySendError::Full(_)) => Ok(()),
Err(flume::TrySendError::Disconnected(_)) => Err(ChannelClosedError),
}
}
let ch = command_tx.upgrade()?;
let ch = ch.borrow();
_request_nonblocking(ch, handle, command.into())
}