use core::borrow::Borrow;
use smoltcp::iface::SocketHandle;
use crate::{
Error,
command::{Command, Request, Response},
};
pub trait UpgradableChannel {
fn upgrade(&self) -> Option<impl Borrow<flume::Sender<Request>>>;
fn upgrade_or_err(&self) -> Result<impl Borrow<flume::Sender<Request>>, Error> {
self.upgrade().ok_or(Error::ChannelClosed)
}
}
/// A strong sender needs no upgrading: it simply lends out a reference to itself.
impl UpgradableChannel for flume::Sender<Request> {
    /// Always returns `Some`, borrowing `self` for the lifetime of the returned value.
    fn upgrade(&self) -> Option<impl Borrow<flume::Sender<Request>>> {
        let strong: &flume::Sender<Request> = self;
        Some(strong)
    }
}
/// A weak sender must be promoted to an owned strong sender before it can be used.
impl UpgradableChannel for flume::WeakSender<Request> {
    /// Delegates to the inherent `WeakSender::upgrade`.
    ///
    /// Inherent methods take precedence over trait methods in method-call syntax, so the call
    /// below is not recursive. Per flume's documentation the inherent method yields `None`
    /// once the channel has been closed.
    fn upgrade(&self) -> Option<impl Borrow<flume::Sender<Request>>> {
        let promoted: Option<flume::Sender<Request>> = self.upgrade();
        promoted
    }
}
/// Forwarding impl: a shared reference to any upgradable channel is itself upgradable.
///
/// This lets callers pass `&sender` to functions that take `impl UpgradableChannel` by value.
impl<T> UpgradableChannel for &T
where
    T: ?Sized + UpgradableChannel,
{
    /// Peels off the outer reference and defers to the referent's implementation.
    fn upgrade(&self) -> Option<impl Borrow<flume::Sender<Request>>> {
        <T as UpgradableChannel>::upgrade(*self)
    }
}
/// Sends `command` (optionally targeting `handle`) and blocks the calling thread until the
/// response arrives.
///
/// # Errors
///
/// Returns [`Error::ChannelClosed`] when `command_tx` cannot be upgraded to a sender.
/// Failures while sending the request or receiving the reply are converted into [`Error`]
/// through its `From` implementations.
pub fn request_blocking(
    command_tx: impl UpgradableChannel,
    handle: Option<SocketHandle>,
    command: impl Into<Command>,
) -> Result<Response, Error> {
    // Non-generic core, kept separate from the generic entry point.
    fn dispatch(
        sender: &flume::Sender<Request>,
        handle: Option<SocketHandle>,
        command: Command,
    ) -> Result<Response, Error> {
        // One-slot channel on which the reply to this request is delivered.
        let (reply_tx, reply_rx) = flume::bounded(1);
        let request = Request { handle, command, resp: reply_tx };
        sender.send(request)?;
        let response = reply_rx.recv()?;
        Ok(response)
    }

    // Upgrade first: when it fails, `command.into()` is never evaluated.
    let sender = command_tx.upgrade_or_err()?;
    dispatch(sender.borrow(), handle, command.into())
}
/// Asynchronous counterpart of a blocking request: returns a `Send` future that sends
/// `command` (optionally targeting `handle`) and resolves to the response.
///
/// The channel upgrade, the sender clone and the `command.into()` conversion all happen
/// eagerly, before the future is returned. The future therefore owns everything it needs and
/// does not borrow from `command_tx`.
///
/// # Errors
///
/// The future resolves to [`Error::ChannelClosed`] when `command_tx` could not be upgraded.
/// Failures while sending the request or receiving the reply are converted into [`Error`]
/// through its `From` implementations.
pub fn request(
    command_tx: impl UpgradableChannel,
    handle: Option<SocketHandle>,
    command: impl Into<Command>,
) -> impl Future<Output = Result<Response, Error>> + Send {
    // Non-generic core, kept separate from the generic entry point.
    async fn round_trip(
        sender: &flume::Sender<Request>,
        handle: Option<SocketHandle>,
        command: Command,
    ) -> Result<Response, Error> {
        // One-slot channel on which the reply to this request is delivered.
        let (reply_tx, reply_rx) = flume::bounded(1);
        let request = Request { handle, command, resp: reply_tx };
        sender.send_async(request).await?;
        let response = reply_rx.recv_async().await?;
        Ok(response)
    }

    // Take an owned clone now; the upgrade error, if any, is surfaced when the future runs.
    let sender = match command_tx.upgrade_or_err() {
        Ok(strong) => Ok(strong.borrow().clone()),
        Err(closed) => Err(closed),
    };
    // Converted unconditionally, even when the upgrade above failed.
    let command: Command = command.into();

    async move {
        match sender {
            Ok(sender) => round_trip(&sender, handle, command).await,
            Err(closed) => Err(closed),
        }
    }
}
pub fn request_nonblocking(
command_tx: impl UpgradableChannel,
handle: Option<SocketHandle>,
command: impl Into<Command>,
) -> Result<(), Error> {
fn _request_nonblocking(
command_tx: &flume::Sender<Request>,
handle: Option<SocketHandle>,
command: Command,
) -> Result<(), Error> {
let (resp_tx, _resp_rx) = flume::bounded(1);
match command_tx.try_send(Request {
handle,
command,
resp: resp_tx,
}) {
Ok(_) | Err(flume::TrySendError::Full(_)) => Ok(()),
Err(flume::TrySendError::Disconnected(_)) => Err(Error::ChannelClosed),
}
}
let ch = command_tx.upgrade_or_err()?;
let ch = ch.borrow();
_request_nonblocking(ch, handle, command.into())
}