use std::pin::Pin;
use std::task::{Context, Poll};
use tokio_stream::Stream;
pub struct PriorityMerge<T> {
streams: Vec<Pin<Box<dyn Stream<Item = T> + Send>>>,
}
impl<T> PriorityMerge<T> {
pub fn new<S1, S2>(high_priority: S1, low_priority: S2) -> Self
where
S1: Stream<Item = T> + Send + 'static,
S2: Stream<Item = T> + Send + 'static,
{
Self {
streams: vec![Box::pin(high_priority), Box::pin(low_priority)],
}
}
}
impl<T> Stream for PriorityMerge<T> {
type Item = T;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut result = None;
self.streams.retain_mut(|stream| {
if result.is_some() {
return true;
}
match stream.as_mut().poll_next(cx) {
Poll::Ready(Some(item)) => {
result = Some(item);
true }
Poll::Ready(None) => false, Poll::Pending => true, }
});
match result {
Some(item) => Poll::Ready(Some(item)),
None if self.streams.is_empty() => Poll::Ready(None),
None => Poll::Pending,
}
}
}