//!
//! # Introduction
//!
//! Extensions to the [`Stream`](futures::Stream) and [`TryStream`](futures::TryStream) traits
//! which implement behavior that I've implemented at least a few times while working with them.
//!
//! To use these extensions, simply `use` the [`JStreamExt`](crate::JStreamExt) or
//! [`JTryStreamExt`](crate::JTryStreamExt) items exported by this crate.
//!
//! # Summary
//!
//! Here's a list of the various extensions provided by this crate:
//!
//! ## `Stream` Extensions
//!
//! The extensions to [`Stream`](futures::Stream) are provided by the [`JStreamExt`](crate::JStreamExt)
//! trait.
//!
//! * [`dedup`](crate::JStreamExt::dedup) - removes duplicate items from a stream.
//! * [`fold_mut`](crate::JStreamExt::fold_mut) - similar to [`fold`](futures::StreamExt::fold), but
//!   asks for a `(&mut T, Self::Item)` -> `Future<Output=()>` instead of a
//!   `(T, Self::Item)` -> `Future<Output=T>` folding function.
//! * [`first`](crate::JStreamExt::first) - turns a stream into a future which emits only the first
//!   item emitted by the source.
//! * [`nth`](crate::JStreamExt::nth) - turns a stream into a future which emits an item after skipping
//!   a specified number of preceding items.
//!
//! ## `TryStream` Extensions
//!
//! The extensions to [`TryStream`](futures::TryStream) are provided by the [`JTryStreamExt`](crate::JTryStreamExt)
//! trait.
//!
//! * [`try_first`](crate::JTryStreamExt::try_first) - turns the stream into a future which emits only
//!   the first result emitted by the source.
//! * [`try_nth`](crate::JTryStreamExt::try_nth) - turns the stream into a future which emits an item
//!   after skipping a specified number of preceding items, or emits an error immediately when encountered.
//! * [`try_filter_map_ok`](crate::JTryStreamExt::try_filter_map_ok) - similar to
//!   [`filter_map`](futures::StreamExt::filter_map), except it allows you to filter-map on the `Ok`
//!   part of the `TryStream`, and it emits any errors immediately when they are encountered.
//! * [`try_dedup`](crate::JTryStreamExt::try_dedup) - removes duplicate items from a stream, but also
//!   emits any errors immediately when they are seen.
//! * [`fuse_on_fail`](crate::JTryStreamExt::fuse_on_fail) - if an error is seen, "fuse" the stream
//!   such that it panics if `try_poll_next` is called after an `Err(Self::Error)` item is emitted.
//!   This also makes a [`TryStream`](futures::TryStream) implement [`FusedStream`](futures::stream::FusedStream)
//!   regardless of whether the source implements that trait.
//! * [`try_fold_mut`](crate::JTryStreamExt::try_fold_mut) - similar to
//!   [`try_fold`](futures::TryStreamExt::try_fold), but asks for a
//!   `(&mut T, Self::Ok)` -> `Future<Output=Result<(), Self::Error>>` instead of a
//!   `(T, Self::Ok)` -> `Future<Output=Result<T, Self::Error>>` folding function.
//!

#[macro_use]
extern crate futures;

// Declares one private module per operator and re-exports the contents of
// all of them through the public `ops` module.
macro_rules! op_mods {
    {$($nam: ident),*$(,)*} => {
        $(mod $nam;)*

        /// The structs which wrap [`Stream`](futures::Stream) and
        /// [`TryStream`](futures::TryStream) upstreams to implement the behavior
        /// provided by this crate live in this module.
        ///
        /// You should not have to directly import this module. Instead, use the extension traits
        /// detailed in the module documentation (see: [`JStreamExt`](crate::JStreamExt) and
        /// [`JTryStreamExt`](crate::JTryStreamExt)).
        pub mod ops {
            $(pub use super::$nam::*;)*
        }
    }
}

// Expands to the items of a `Sink` impl, forwarding every method to the
// pin-projected field `$field`.
#[cfg(feature = "sink")]
macro_rules! delegate_sink {
    ($field:ident, $err: ty, $item: ty) => {
        type Error = $err;

        fn poll_ready(
            self: core::pin::Pin<&mut Self>,
            cx: &mut core::task::Context<'_>,
        ) -> core::task::Poll<Result<(), Self::Error>> {
            self.project().$field.poll_ready(cx)
        }

        fn start_send(self: core::pin::Pin<&mut Self>, item: $item) -> Result<(), Self::Error> {
            self.project().$field.start_send(item)
        }

        fn poll_flush(
            self: core::pin::Pin<&mut Self>,
            cx: &mut core::task::Context<'_>,
        ) -> core::task::Poll<Result<(), Self::Error>> {
            self.project().$field.poll_flush(cx)
        }

        fn poll_close(
            self: core::pin::Pin<&mut Self>,
            cx: &mut core::task::Context<'_>,
        ) -> core::task::Poll<Result<(), Self::Error>> {
            self.project().$field.poll_close(cx)
        }
    };
}

// Expands to a `FusedStream::is_terminated` that forwards to the field `$nam`.
macro_rules! delegate_fused {
    ($nam: ident) => {
        fn is_terminated(&self) -> bool {
            self.$nam.is_terminated()
        }
    };
}

op_mods! {
    fuse_on_fail,
    dedup,
    try_filter_map_ok,
    nth,
    fold_mut,
}

// Common imports shared by the operator modules.
pub(crate) mod op_prelude {
    #[cfg(feature = "sink")]
    pub use futures::sink::Sink;
    pub use futures::stream::FusedStream;
    pub use futures::{Future, Stream, TryFuture, TryStream};
    pub use pin_project_lite::pin_project;
    pub use std::marker::PhantomData;
    pub use std::pin::Pin;
    pub use std::task::{Context, Poll};
}

mod ext;
pub use ext::*;