// jstream_ext/lib.rs

//!
//! # Introduction
//!
//! Extensions to the [`Stream`](futures::Stream) and [`TryStream`](futures::TryStream) traits
//! which provide behavior that I've ended up re-implementing at least a few times while working
//! with them.
//!
//! To use these extensions, simply `use` the [`JStreamExt`](crate::JStreamExt) or
//! [`JTryStreamExt`](crate::JTryStreamExt) items exported by this crate.
//!
//! # Summary
//!
//! Here's a list of the various extensions provided by this crate:
//!
//! ## `Stream` Extensions
//!
//! The extensions to [`Stream`](futures::Stream) are provided by the [`JStreamExt`](crate::JStreamExt)
//! trait.
//!
//! * [`dedup`](crate::JStreamExt::dedup) - removes duplicate items from a stream.
//! * [`fold_mut`](crate::JStreamExt::fold_mut) - similar to [`fold`](futures::StreamExt::fold), but
//!   asks for a `(&mut T, Self::Item)` -> `Future<Output=()>` instead of a
//!   `(T, Self::Item)` -> `Future<Output=T>` folding function.
//! * [`first`](crate::JStreamExt::first) - turns a stream into a future which emits only the first
//!   item emitted by the source.
//! * [`nth`](crate::JStreamExt::nth) - turns a stream into a future which emits an item after skipping
//!   a specified number of preceding items.
//!
//! ## `TryStream` Extensions
//!
//! The extensions to [`TryStream`](futures::TryStream) are provided by the [`JTryStreamExt`](crate::JTryStreamExt)
//! trait.
//!
//! * [`try_first`](crate::JTryStreamExt::try_first) - turns the stream into a future which emits only
//!   the first result emitted by the source.
//! * [`try_nth`](crate::JTryStreamExt::try_nth) - turns the stream into a future which emits an item
//!   after skipping a specified number of preceding items, or emits an error immediately when encountered.
//! * [`try_filter_map_ok`](crate::JTryStreamExt::try_filter_map_ok) - similar to
//!   [`filter_map`](futures::StreamExt::filter_map), except it allows you to filter-map on the `Ok`
//!   part of the `TryStream`, and it emits any errors immediately when they are encountered.
//! * [`try_dedup`](crate::JTryStreamExt::try_dedup) - removes duplicate items from a stream, but also
//!   emits any errors immediately when they are seen.
//! * [`fuse_on_fail`](crate::JTryStreamExt::fuse_on_fail) - if an error is seen, "fuses" the stream
//!   such that it panics if `try_poll_next` is called after an `Err(Self::Error)` item is emitted.
//!   This also makes a [`TryStream`](futures::TryStream) implement [`FusedStream`](futures::stream::FusedStream)
//!   regardless of whether the source implements that trait.
//! * [`try_fold_mut`](crate::JTryStreamExt::try_fold_mut) - similar to
//!   [`try_fold`](futures::TryStreamExt::try_fold), but asks for a
//!   `(&mut T, Self::Ok)` -> `Future<Output=Result<(), Self::Error>>` instead of a
//!   `(T, Self::Ok)` -> `Future<Output=Result<T, Self::Error>>` folding function.
//!

#[macro_use]
extern crate futures;

/// Declares the operator submodules and re-exports their contents from a public `ops` module.
///
/// Input is a comma-separated list of module identifiers; any number of trailing commas is
/// accepted and an empty list is valid. For every identifier `m` the expansion contains a private
/// `mod m;` declaration, followed by one `pub mod ops` holding a `pub use super::m::*;` glob
/// re-export per listed module.
macro_rules! op_mods {
    {$($nam: ident),*$(,)*} => {
        // One private module declaration per listed name.
        $(mod $nam;)*

        /// The various structs which wrap various [`Stream`](futures::Stream) and
        /// [`TryStream`](futures::TryStream) upstreams to implement various behavior live in this
        /// module.
        ///
        /// You should not have to directly import this module. Instead, use the extension traits
        /// detailed in the module documentation (see: [`JStreamExt`](crate::JStreamExt) and
        /// [`JTryStreamExt`](crate::JTryStreamExt)).
        pub mod ops {
            // Flatten every operator module's public items into `ops`.
            $(pub use super::$nam::*;)*
        }
    }
}

/// Expands to the associated `Error` type and the four sink methods (`poll_ready`, `start_send`,
/// `poll_flush`, `poll_close`), each forwarding to the field `$field`.
///
/// Arguments:
/// * `$field` - name of the field that receives every call.
/// * `$err` - type assigned to the generated `type Error`.
/// * `$item` - type of the `item` parameter accepted by `start_send`.
///
/// The expansion is a list of impl items, so it has to be invoked inside an `impl` block; the
/// generated set matches the items of `futures::sink::Sink`. Every method reaches the field via
/// `self.project().$field`, so the implementing type must offer a `project()` method on
/// `Pin<&mut Self>` whose result exposes `$field` with the corresponding sink methods.
///
/// Only compiled when the `sink` feature is enabled.
#[cfg(feature = "sink")]
macro_rules! delegate_sink {
    ($field:ident, $err: ty, $item: ty) => {
        type Error = $err;

        fn poll_ready(
            self: core::pin::Pin<&mut Self>,
            cx: &mut core::task::Context<'_>,
        ) -> core::task::Poll<Result<(), Self::Error>> {
            self.project().$field.poll_ready(cx)
        }

        fn start_send(self: core::pin::Pin<&mut Self>, item: $item) -> Result<(), Self::Error> {
            self.project().$field.start_send(item)
        }

        fn poll_flush(
            self: core::pin::Pin<&mut Self>,
            cx: &mut core::task::Context<'_>,
        ) -> core::task::Poll<Result<(), Self::Error>> {
            self.project().$field.poll_flush(cx)
        }

        fn poll_close(
            self: core::pin::Pin<&mut Self>,
            cx: &mut core::task::Context<'_>,
        ) -> core::task::Poll<Result<(), Self::Error>> {
            self.project().$field.poll_close(cx)
        }
    };
}

/// Expands to an `is_terminated(&self) -> bool` method that returns whatever the field `$nam`
/// reports from its own `is_terminated()`.
///
/// The expansion is a single method, so it has to be invoked inside an `impl` block (the
/// signature matches `futures::stream::FusedStream::is_terminated`). The field named by `$nam`
/// must itself provide `is_terminated()`.
macro_rules! delegate_fused {
    ($nam: ident) => {
        fn is_terminated(&self) -> bool {
            self.$nam.is_terminated()
        }
    };
}

112op_mods! {
113    fuse_on_fail,
114    dedup,
115    try_filter_map_ok,
116    nth,
117    fold_mut,
118}
119
120pub(crate) mod op_prelude {
121    #[cfg(feature = "sink")]
122    pub use futures::sink::Sink;
123    pub use futures::stream::FusedStream;
124    pub use futures::{Future, Stream, TryFuture, TryStream};
125    pub use pin_project_lite::pin_project;
126    pub use std::marker::PhantomData;
127    pub use std::pin::Pin;
128    pub use std::task::{Context, Poll};
129}
130
131mod ext;
132pub use ext::*;