use std::{future::Future, marker::PhantomPinned, ops::DerefMut, pin::Pin};
use crate::Context;
use pin_project::pin_project;
use std::task::Poll;
use self::{
chain::ChainStream, filter::FilterStream, find::FindStream, map::MapStream, merge::MergeStream,
once::OnceStream, repeat::RepeatStream,
};
mod chain;
mod errors;
mod filter;
mod find;
mod map;
mod merge;
mod once;
mod repeat;
#[cfg(feature = "logging")]
mod stream_log;
pub use errors::*;
/// An asynchronous source of values that is driven by polling.
///
/// Implementors provide [`Stream::poll_recv`]; every other method has a
/// default body that either polls the stream or wraps it in an adapter type.
#[must_use = "streams do nothing unless polled"]
pub trait Stream {
    /// The type of value this stream produces.
    type Item;

    /// Polls the stream once for its next value.
    ///
    /// The outcome is reported as a [`PollRecv`]: `Ready` carries a value,
    /// while `Pending` and `Closed` carry none.
    fn poll_recv(self: Pin<&mut Self>, cx: &mut Context<'_>) -> PollRecv<Self::Item>;

    /// Returns a [`RecvFuture`] that borrows this stream mutably.
    ///
    /// The future resolves to `Some(item)` when the stream reports `Ready`
    /// and to `None` when it reports `Closed`.
    fn recv(&mut self) -> RecvFuture<'_, Self>
    where
        Self: Unpin,
    {
        RecvFuture::new(self)
    }

    /// Polls the stream exactly once, using `Context::empty()`, and returns
    /// immediately.
    ///
    /// # Errors
    ///
    /// * `TryRecvError::Pending` when the poll reports `PollRecv::Pending`.
    /// * `TryRecvError::Closed` when the poll reports `PollRecv::Closed`.
    fn try_recv(&mut self) -> Result<Self::Item, TryRecvError>
    where
        Self: Unpin,
    {
        // NOTE(review): presumably an empty context registers no wake-up,
        // which suits a one-shot poll -- confirm against `Context::empty`.
        let mut cx = Context::empty();
        match Pin::new(self).poll_recv(&mut cx) {
            PollRecv::Closed => Err(TryRecvError::Closed),
            PollRecv::Pending => Err(TryRecvError::Pending),
            PollRecv::Ready(item) => Ok(item),
        }
    }

    /// Drives [`Stream::recv`] to completion with `pollster::block_on`.
    ///
    /// Returns whatever the receive future resolves to. Only compiled when
    /// the `blocking` feature is enabled.
    #[cfg(feature = "blocking")]
    fn blocking_recv(&mut self) -> Option<Self::Item>
    where
        Self: Unpin,
    {
        let next = self.recv();
        pollster::block_on(next)
    }

    /// Wraps this stream and a function from `Self::Item` to `Into` in a
    /// `MapStream`.
    fn map<Map, Into>(self, map: Map) -> MapStream<Self, Map, Into>
    where
        Self: Sized,
        Map: Fn(Self::Item) -> Into,
    {
        MapStream::new(self, map)
    }

    /// Wraps this stream and a predicate over `&Self::Item` in a
    /// `FilterStream`.
    ///
    /// NOTE(review): presumably only items for which `filter` returns `true`
    /// are forwarded -- confirm in the `filter` module.
    fn filter<Filter>(self, filter: Filter) -> FilterStream<Self, Filter>
    where
        Self: Sized + Unpin,
        Filter: FnMut(&Self::Item) -> bool + Unpin,
    {
        FilterStream::new(self, filter)
    }

    /// Combines this stream with another stream of the same item type into a
    /// `MergeStream`.
    ///
    /// NOTE(review): the interleaving order is defined by `MergeStream`, not
    /// here -- see the `merge` module.
    fn merge<Other>(self, other: Other) -> MergeStream<Self, Other>
    where
        Self: Sized,
        Other: Stream<Item = Self::Item>,
    {
        MergeStream::new(self, other)
    }

    /// Combines this stream with another stream of the same item type into a
    /// `ChainStream`.
    ///
    /// NOTE(review): presumably `other` is consumed after this stream is
    /// exhausted -- confirm in the `chain` module.
    fn chain<Other>(self, other: Other) -> ChainStream<Self, Other>
    where
        Self: Sized,
        Other: Stream<Item = Self::Item>,
    {
        ChainStream::new(self, other)
    }

    /// Wraps this stream and a condition over `&Self::Item` in a
    /// `FindStream`.
    ///
    /// NOTE(review): presumably yields the first item satisfying `condition`
    /// -- confirm in the `find` module.
    fn find<Condition>(self, condition: Condition) -> FindStream<Self, Condition>
    where
        Self: Sized + Unpin,
        Condition: Fn(&Self::Item) -> bool + Unpin,
    {
        FindStream::new(self, condition)
    }

    /// Wraps this stream in a `StreamLog` configured with the given log
    /// level.
    ///
    /// Requires `Self::Item: Debug`. Only compiled when the `logging`
    /// feature is enabled.
    #[cfg(feature = "logging")]
    fn log(self, level: log::Level) -> stream_log::StreamLog<Self>
    where
        Self: Sized,
        Self::Item: std::fmt::Debug,
    {
        stream_log::StreamLog::new(self, level)
    }
}
/// Lets a mutable reference to an `Unpin` stream be used as a stream itself
/// by forwarding every poll to the referenced value.
impl<S> Stream for &mut S
where
    S: Stream + Unpin + ?Sized,
{
    type Item = S::Item;

    fn poll_recv(self: Pin<&mut Self>, cx: &mut Context<'_>) -> PollRecv<Self::Item> {
        // `&mut S` is always `Unpin`, so the outer pin can be unwrapped to
        // reach the referenced stream, which is then re-pinned (`S: Unpin`).
        let inner: &mut S = &mut **Pin::get_mut(self);
        <S as Stream>::poll_recv(Pin::new(inner), cx)
    }
}
/// Lets a pinned, mutably dereferenceable pointer to an `Unpin` stream be
/// used as a stream itself by forwarding every poll to its target.
impl<P, S> Stream for Pin<P>
where
    P: DerefMut<Target = S> + Unpin,
    S: Stream + Unpin + ?Sized,
{
    type Item = <S as Stream>::Item;

    fn poll_recv(self: Pin<&mut Self>, cx: &mut Context<'_>) -> PollRecv<Self::Item> {
        // `P: Unpin` makes `Pin<P>` itself `Unpin`, so the outer pin can be
        // removed to get at the inner pinned pointer.
        let pointer: &mut Pin<P> = self.get_mut();
        let target: Pin<&mut S> = Pin::as_mut(pointer);
        <S as Stream>::poll_recv(target, cx)
    }
}
/// Builds a [`OnceStream`] from `item`.
///
/// NOTE(review): presumably the returned stream yields `item` a single time
/// and is closed afterwards -- confirm in the `once` module.
pub fn once<T>(item: T) -> OnceStream<T> {
OnceStream::new(item)
}
/// Builds a [`RepeatStream`] from `item`.
///
/// `T` must be `Clone`.
///
/// NOTE(review): presumably `item` is cloned for every value the stream
/// produces -- confirm in the `repeat` module.
pub fn repeat<T>(item: T) -> RepeatStream<T>
where
T: Clone,
{
RepeatStream::new(item)
}
/// Outcome of a single `Stream::poll_recv` call.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum PollRecv<T> {
    /// A value was produced by this poll.
    Ready(T),
    /// No value was produced by this poll; `RecvFuture` reports this as
    /// `Poll::Pending` and `try_recv` as `TryRecvError::Pending`.
    Pending,
    /// No value was produced and the stream reported itself closed;
    /// `RecvFuture` resolves to `None` and `try_recv` returns
    /// `TryRecvError::Closed`.
    Closed,
}
/// Future that polls a mutably borrowed [`Stream`] for its next value.
///
/// Constructed by [`RecvFuture::new`], which is what `Stream::recv` calls.
#[pin_project]
#[must_use = "futures do nothing unless polled"]
pub struct RecvFuture<'s, S>
where
S: Stream + ?Sized,
{
// Exclusive borrow of the stream that is polled when this future is polled.
recv: &'s mut S,
// Structurally pinned marker; `PhantomPinned` is `!Unpin`, so this future
// is `!Unpin` as well.
#[pin]
_pin: PhantomPinned,
}
impl<'s, S: Stream> RecvFuture<'s, S>
where
S: ?Sized,
{
pub fn new(recv: &'s mut S) -> RecvFuture<'s, S> {
Self {
recv,
_pin: PhantomPinned,
}
}
}
/// Bridges a [`Stream`] into the standard `Future` interface.
impl<'s, S> Future for RecvFuture<'s, S>
where
    S: Stream + Unpin + ?Sized,
{
    /// `Some(item)` when the stream produced a value, `None` when it
    /// reported `PollRecv::Closed`.
    type Output = Option<S::Item>;

    fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
        // Convert the standard task context into this crate's own context
        // type, which is what `poll_recv` expects.
        let mut stream_cx: crate::Context<'_> = cx.into();
        let fields = self.project();
        // `fields.recv` is a `&mut &mut S`; the poll goes through the
        // `Stream` impl for mutable references.
        let outcome = Pin::new(fields.recv).poll_recv(&mut stream_cx);
        match outcome {
            PollRecv::Pending => Poll::Pending,
            PollRecv::Closed => Poll::Ready(None),
            PollRecv::Ready(item) => Poll::Ready(Some(item)),
        }
    }
}
#[cfg(test)]
mod tests {
    /// With the `blocking` feature enabled, `blocking_recv` on a stream
    /// built by the `ready` test helper returns the value it was given.
    #[cfg(feature = "blocking")]
    #[test]
    fn test_blocking() {
        use super::Stream;
        use crate::test::stream::ready;

        let mut source = ready(1usize);
        let received = source.blocking_recv();
        assert_eq!(Some(1usize), received);
    }
}