use lock_api::RawMutex;
use log::error;
use futures::{
future::join,
prelude::*,
stream::{
StreamExt,
TryStreamExt,
},
};
use pin_project::pin_project;
use super::*;
use crate::channel::*;
#[pin_project]
pub struct Background<R: RawMutex, T> {
addr: i8,
#[pin]
send: Sender<R, Msg<T>>,
#[pin]
recv: Receiver<R, Msg<T>>,
}
impl<R: RawMutex, T> Clone for Background<R, T> {
fn clone(&self) -> Self {
Background {
addr: self.addr,
send: self.send.clone(),
recv: self.recv.clone(),
}
}
}
pub fn background<R, T, N, E>(net: N) -> (impl Future<Output = ()>, Background<R, T>)
where
R: RawMutex,
N: Net<T> + Unpin,
N: Stream<Item = Result<Msg<T>, E>>,
{
let addr = net.addr();
let (send_tx, mut send_rx) = channel::<R, _>();
let (recv_tx, recv_rx) = channel::<R, _>();
let (mut net_tx, mut net_rx) = StreamExt::split(net);
let net_send_task = async move {
while let Some(item) = send_rx.next().await {
let _ = net_tx.send(item).await;
}
};
let net_recv_task = async move {
while let Some(item) = net_rx.try_next().await.transpose() {
if let Ok(item) = item {
recv_tx.send(item);
}
}
error!("recv task exited");
};
let fut = async move {
join(net_send_task, net_recv_task).await;
};
let backgrounded = Background {
addr,
send: send_tx,
recv: recv_rx,
};
(fut, backgrounded)
}
impl<R, T> Sink<Msg<T>> for Background<R, T>
where
R: RawMutex,
{
type Error = !;
fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.project().send.poll_ready(cx)
}
fn start_send(self: Pin<&mut Self>, item: Msg<T>) -> Result<(), Self::Error> {
self.project().send.start_send(item)
}
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.project().send.poll_flush(cx)
}
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.project().send.poll_close(cx)
}
}
impl<R, T> Stream for Background<R, T>
where
R: RawMutex,
{
type Item = Result<Msg<T>, !>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let item = ready!(self.project().recv.poll_next(cx));
Poll::Ready(item.map(Ok))
}
}
impl<R: RawMutex, T> Net<T> for Background<R, T> {
fn addr(&self) -> i8 {
self.addr
}
}