jstream_ext/lib.rs
1//!
2//! # Introduction
3//!
4//! Extensions to the [`Stream`](futures::Stream) and [`TryStream`](futures::TryStream) traits
5//! which implement behavior that I've implemented at least a few times while working with them.
6//!
7//! To use these extensions, simply `use` the [`JStreamExt`](crate::JStreamExt) or
8//! [`JTryStreamExt`](crate::JTryStreamExt) items exported by this crate.
9//!
10//! # Summary
11//!
12//! Here's a list of the various extensions provided by this crate:
13//!
14//! ## `Stream` Extensions
15//!
16//! The extensions to [`Stream`](futures::Stream) are provided by the [`JStreamExt`](crate::JStreamExt)
17//! trait.
18//!
19//! * [`dedup`](crate::JStreamExt::dedup) - remove duplicate items from a stream
20//! * [`fold_mut`](crate::JStreamExt::fold_mut) - Similar to [`fold`](futures::StreamExt::fold), but
21//! asks for a `(&mut T, Self::Item)` -> `Future<Output=()>` instead of a
22//! `(T, Self::Item)` -> `Future<Output=T>` folding function.
23//! * [`first`](crate::JStreamExt::first) - turns a stream into a future which emits only the first
24//! item emitted by the source.
25//! * [`nth`](crate::JStreamExt::nth) - turns a stream into a future which emits an item after skipping
26//! a specified number of preceding items.
27//!
28//! ## `TryStream` Extensions
29//!
30//! The extensions to [`TryStream`](futures::TryStream) are provided by the [`JTryStreamExt`](crate::JTryStreamExt)
31//! trait.
32//!
33//! * [`try_first`](crate::JTryStreamExt::try_first) - turns the stream into a future which emits only
34//! the first result emitted by the source.
35//! * [`try_nth`](crate::JTryStreamExt::try_nth) - turns the stream into a future which emits an item
36//! after skipping a specified number of preceding items, or emits an error immediately when encountered.
37//! * [`try_filter_map_ok`](crate::JTryStreamExt::try_filter_map_ok) - similar to
38//! [`filter_map`](futures::StreamExt::filter_map), except it allows you to filter-map on the `Ok`
39//! part of the `TryStream`, and it emits any errors immediately when they are encountered.
40//! * [`try_dedup`](crate::JTryStreamExt::try_dedup) - remove duplicate items from a stream, but also
41//! emit any errors immediately when they are seen.
42//! * [`fuse_on_fail`](crate::JTryStreamExt::fuse_on_fail) - if an error is seen, "fuse" the stream
43//! such that it panics if `try_poll_next` is called after an `Err(Self::Error)` item is emitted.
44//! This also makes a [`TryStream`](futures::TryStream) implement [`FusedStream`](futures::stream::FusedStream)
45//! regardless if the source implements that trait.
46//! * [`try_fold_mut`](crate::JTryStreamExt::try_fold_mut) - Similar to
47//! [`try_fold`](futures::TryStreamExt::try_fold), but asks for a
48//! `(&mut T, Self::Ok)` -> `Future<Output=Result<(), Self::Error>>` instead of a
49//! `(T, Self::Ok)` -> `Future<Output=Result<T, Self::Error>>` folding function.
50//!
51
52#[macro_use]
53extern crate futures;
54
55macro_rules! op_mods {
56 {$($nam: ident),*$(,)*} => {
57 $(mod $nam;)*
58
59 /// The various structs which wrap various [`Stream`](futures::Stream) and
60 /// [`TryStream`](futures::TryStream) upstreams to implement various behavior live in this
61 /// module.
62 ///
63 /// You should not have to directly import this module. Instead, use the extension traits
64 /// detailed in the module documentation (see: [`JStreamExt`](crate::JStreamExt) and
65 /// [`JTryStreamExt`](crate::JTryStreamExt)).
66 pub mod ops {
67 $(pub use super::$nam::*;)*
68 }
69 }
70}
71
72#[cfg(feature = "sink")]
73macro_rules! delegate_sink {
74 ($field:ident, $err: ty, $item: ty) => {
75 type Error = $err;
76
77 fn poll_ready(
78 self: core::pin::Pin<&mut Self>,
79 cx: &mut core::task::Context<'_>,
80 ) -> core::task::Poll<Result<(), Self::Error>> {
81 self.project().$field.poll_ready(cx)
82 }
83
84 fn start_send(self: core::pin::Pin<&mut Self>, item: $item) -> Result<(), Self::Error> {
85 self.project().$field.start_send(item)
86 }
87
88 fn poll_flush(
89 self: core::pin::Pin<&mut Self>,
90 cx: &mut core::task::Context<'_>,
91 ) -> core::task::Poll<Result<(), Self::Error>> {
92 self.project().$field.poll_flush(cx)
93 }
94
95 fn poll_close(
96 self: core::pin::Pin<&mut Self>,
97 cx: &mut core::task::Context<'_>,
98 ) -> core::task::Poll<Result<(), Self::Error>> {
99 self.project().$field.poll_close(cx)
100 }
101 };
102}
103
104macro_rules! delegate_fused {
105 ($nam: ident) => {
106 fn is_terminated(&self) -> bool {
107 self.$nam.is_terminated()
108 }
109 };
110}
111
112op_mods! {
113 fuse_on_fail,
114 dedup,
115 try_filter_map_ok,
116 nth,
117 fold_mut,
118}
119
120pub(crate) mod op_prelude {
121 #[cfg(feature = "sink")]
122 pub use futures::sink::Sink;
123 pub use futures::stream::FusedStream;
124 pub use futures::{Future, Stream, TryFuture, TryStream};
125 pub use pin_project_lite::pin_project;
126 pub use std::marker::PhantomData;
127 pub use std::pin::Pin;
128 pub use std::task::{Context, Poll};
129}
130
131mod ext;
132pub use ext::*;