use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use futures_util::lock::Mutex;
use futures_util::stream::{Fuse, Stream, StreamExt};
use futures_util::{ready, FutureExt};
#[allow(clippy::type_complexity)]
#[must_use = "stream do nothing unless polled"]
pub(crate) struct RcStream<S: Stream>
where
S: Stream + Send + Unpin,
S::Item: Clone + Send,
{
stream_and_result: Arc<Mutex<(Fuse<S>, Vec<S::Item>)>>,
pos: usize,
}
pub(crate) fn rc_stream<S>(stream: S) -> RcStream<S>
where
S: Stream + Unpin + Send,
S::Item: Clone + Send,
{
let stream_and_result = Arc::new(Mutex::new((stream.fuse(), vec![])));
RcStream {
stream_and_result,
pos: 0,
}
}
impl<S> Stream for RcStream<S>
where
S: Stream + Send + Unpin,
S::Item: Clone + Send,
{
type Item = S::Item;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut stream_and_result = ready!(self.stream_and_result.lock().poll_unpin(cx));
let (ref mut stream, ref mut stored_result) = *stream_and_result;
if stored_result.len() > self.pos {
let result = stored_result[self.pos].clone();
drop(stream_and_result); self.pos += 1;
return Poll::Ready(Some(result));
}
match stream.poll_next_unpin(cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(result) => {
if let Some(ref result) = result {
stored_result.push(result.clone());
}
Poll::Ready(result)
}
}
}
}
impl<S> Clone for RcStream<S>
where
S: Stream + Send + Unpin,
S::Item: Clone + Send + Unpin,
{
fn clone(&self) -> Self {
Self {
stream_and_result: Arc::clone(&self.stream_and_result),
pos: 0, }
}
}
#[cfg(test)]
mod tests {
use futures::executor::block_on;
use futures::future;
use futures_util::stream::once;
use crate::proto::error::{ProtoError, ProtoErrorKind};
use crate::proto::xfer::FirstAnswer;
use super::*;
#[test]
fn test_rc_stream() {
let future = future::ok::<usize, ProtoError>(1_usize);
let rc = rc_stream(once(future));
let i = block_on(rc.clone().first_answer()).ok().unwrap();
assert_eq!(i, 1);
let i = block_on(rc.first_answer()).ok().unwrap();
assert_eq!(i, 1);
}
#[test]
fn test_rc_stream_failed() {
let future = future::err::<usize, ProtoError>(ProtoError::from(ProtoErrorKind::Busy));
let rc = rc_stream(once(future));
let i = block_on(rc.clone().first_answer()).err().unwrap();
assert!(matches!(i.kind(), ProtoErrorKind::Busy));
let i = block_on(rc.first_answer()).err().unwrap();
assert!(matches!(i.kind(), ProtoErrorKind::Busy));
}
}