use crate::*;
use async_channel as channel;
use futures_util::future::FutureExt;
use futures_util::stream::{StreamExt, StreamFuture, TryStream};
type ReadStream<T> = StreamDropper<T>;
#[derive(Debug)]
pub(crate) struct Channel<T> {
id: ChannelId,
add_rx: channel::Receiver<ReadStream<T>>,
stream_futures: FuturesUnordered<StreamFuture<ReadStream<T>>>,
}
impl<T> Channel<T>
where
T: TryStream + Unpin,
{
pub(crate) fn new(id: ChannelId) -> (channel::Sender<ReadStream<T>>, Self) {
let (add_tx, add_rx) = channel::unbounded::<ReadStream<T>>();
let stream_futures = FuturesUnordered::new();
(
add_tx,
Self {
add_rx,
id,
stream_futures,
},
)
}
pub(crate) async fn next(&mut self) -> (StreamId, Option<T::Item>) {
log::trace!("next() for {}", self.id);
loop {
if self.stream_futures.is_empty() {
log::trace!("channel {} is empty, awaiting streams.", self.id);
let added_stream_res = self.add_rx.recv().await;
self.process_stream_add(added_stream_res);
} else {
log::trace!("channel {} is awaiting streams or packets", self.id);
futures_util::select_biased! {
added_stream_res = self.add_rx.recv().fuse() => {
log::trace!("channel {} adding new stream", self.id);
self.process_stream_add(added_stream_res);
}
stream_output = self.stream_futures.next().fuse() => {
log::trace!("channel {} processing stream data", self.id);
return self.process_stream_output(stream_output.unwrap()).await;
}
};
}
}
}
fn process_stream_add(
&mut self,
added_stream: Result<ReadStream<T>, async_channel::RecvError>,
) {
let added_stream = added_stream.expect("Add channel was closed for adding.");
let stream_id = added_stream.id;
if added_stream.stream.is_some() {
self.stream_futures.push(added_stream.into_future());
} else {
log::warn!(
"Trying to enqueue dropped stream {} into channel {}",
stream_id,
self.id
);
}
}
async fn process_stream_output(
&self,
(output, stream): (
Option<T::Item>, // TryStream
ReadStream<T>,
),
) -> (StreamId, Option<T::Item>) {
let stream_id = stream.id;
if output.is_some() && stream.stream.is_some() {
self.stream_futures.push(stream.into_future());
}
(stream_id, output)
}
}