#![doc = include_str!("../README.md")]
#![cfg_attr(not(any(test, feature = "std")), no_std)]
#![cfg_attr(docsrs, feature(doc_cfg))]
#![warn(missing_docs)]
use core::marker::PhantomData;
use futures_util::Stream;
pub use futures_util::sink;
pub use futures_util::sink::Sink;
#[cfg(feature = "variadics")]
#[cfg_attr(docsrs, doc(cfg(feature = "variadics")))]
pub use variadics;
pub mod filter;
pub mod filter_map;
pub mod flat_map;
pub mod flatten;
pub mod for_each;
pub mod inspect;
pub mod lazy;
pub mod lazy_sink_source;
pub mod map;
pub mod send_iter;
pub mod send_stream;
pub mod try_for_each;
pub mod unzip;
use filter::Filter;
use filter_map::FilterMap;
use flat_map::FlatMap;
use flatten::Flatten;
use for_each::ForEach;
use inspect::Inspect;
use map::Map;
use send_iter::SendIter;
use send_stream::SendStream;
use try_for_each::TryForEach;
use unzip::Unzip;
#[cfg(feature = "std")]
#[cfg_attr(docsrs, doc(cfg(feature = "std")))]
pub mod demux_map;
#[cfg(feature = "std")]
#[cfg_attr(docsrs, doc(cfg(feature = "std")))]
pub use demux_map::demux_map;
#[cfg(feature = "std")]
#[cfg_attr(docsrs, doc(cfg(feature = "std")))]
pub mod demux_map_lazy;
#[cfg(feature = "std")]
#[cfg_attr(docsrs, doc(cfg(feature = "std")))]
pub use demux_map_lazy::demux_map_lazy;
#[cfg(feature = "variadics")]
#[cfg_attr(docsrs, doc(cfg(feature = "variadics")))]
pub mod demux_var;
#[cfg(feature = "variadics")]
#[cfg_attr(docsrs, doc(cfg(feature = "variadics")))]
pub use demux_var::{SinkVariadic, demux_var};
pub trait SinkBuild {
type Item;
type Output<Next: Sink<Self::Item>>;
fn send_to<Next>(self, next: Next) -> Self::Output<Next>
where
Next: Sink<Self::Item>;
fn fanout<Si0, Si1>(self, sink0: Si0, sink1: Si1) -> Self::Output<sink::Fanout<Si0, Si1>>
where
Self: Sized,
Self::Item: Clone,
Si0: Sink<Self::Item>,
Si1: Sink<Self::Item, Error = Si0::Error>,
{
self.send_to(sink::SinkExt::fanout(sink0, sink1))
}
fn for_each<Func>(self, func: Func) -> Self::Output<ForEach<Func>>
where
Self: Sized,
Func: FnMut(Self::Item),
{
self.send_to(ForEach::new(func))
}
fn try_for_each<Func, Error>(self, func: Func) -> Self::Output<TryForEach<Func>>
where
Self: Sized,
Func: FnMut(Self::Item) -> Result<(), Error>,
{
self.send_to(TryForEach::new(func))
}
fn map<Func, Out>(self, func: Func) -> map::MapBuilder<Self, Func>
where
Self: Sized,
Func: FnMut(Self::Item) -> Out,
{
map::MapBuilder { prev: self, func }
}
fn filter<Func>(self, func: Func) -> filter::FilterBuilder<Self, Func>
where
Self: Sized,
Func: FnMut(&Self::Item) -> bool,
{
filter::FilterBuilder { prev: self, func }
}
fn filter_map<Func, Out>(self, func: Func) -> filter_map::FilterMapBuilder<Self, Func>
where
Self: Sized,
Func: FnMut(Self::Item) -> Option<Out>,
{
filter_map::FilterMapBuilder { prev: self, func }
}
fn flat_map<Func, IntoIter>(self, func: Func) -> flat_map::FlatMapBuilder<Self, Func>
where
Self: Sized,
Func: FnMut(Self::Item) -> IntoIter,
IntoIter: IntoIterator,
{
flat_map::FlatMapBuilder { prev: self, func }
}
fn flatten<IntoIter>(self) -> flatten::FlattenBuilder<Self>
where
Self: Sized,
Self::Item: IntoIterator,
{
flatten::FlattenBuilder { prev: self }
}
fn inspect<Func>(self, func: Func) -> inspect::InspectBuilder<Self, Func>
where
Self: Sized,
Func: FnMut(&Self::Item),
{
inspect::InspectBuilder { prev: self, func }
}
fn unzip<Si0, Si1, Item0, Item1>(self, sink0: Si0, sink1: Si1) -> Self::Output<Unzip<Si0, Si1>>
where
Self: Sized + SinkBuild<Item = (Item0, Item1)>,
Si0: Sink<Item0>,
Si1: Sink<Item1>,
Si0::Error: From<Si1::Error>,
{
self.send_to(Unzip::new(sink0, sink1))
}
#[cfg(feature = "std")]
#[cfg_attr(docsrs, doc(cfg(feature = "std")))]
fn demux_map<Key, ItemVal, Si>(
self,
sinks: impl Into<std::collections::HashMap<Key, Si>>,
) -> Self::Output<demux_map::DemuxMap<Key, Si>>
where
Self: Sized + SinkBuild<Item = (Key, ItemVal)>,
Key: Eq + core::hash::Hash + core::fmt::Debug + Unpin,
Si: Sink<ItemVal> + Unpin,
{
self.send_to(demux_map(sinks))
}
#[cfg(feature = "std")]
#[cfg_attr(docsrs, doc(cfg(feature = "std")))]
fn demux_map_lazy<Key, ItemVal, Si, Func>(
self,
func: Func,
) -> Self::Output<demux_map_lazy::LazyDemuxSink<Key, Si, Func>>
where
Self: Sized + SinkBuild<Item = (Key, ItemVal)>,
Key: Eq + core::hash::Hash + core::fmt::Debug + Unpin,
Si: Sink<ItemVal> + Unpin,
Func: FnMut(&Key) -> Si + Unpin,
{
self.send_to(demux_map_lazy(func))
}
#[cfg(feature = "variadics")]
#[cfg_attr(docsrs, doc(cfg(feature = "variadics")))]
fn demux_var<Sinks, ItemVal, Error>(
self,
sinks: Sinks,
) -> Self::Output<demux_var::DemuxVar<Sinks, Error>>
where
Self: Sized + SinkBuild<Item = (usize, ItemVal)>,
Sinks: SinkVariadic<ItemVal, Error>,
{
self.send_to(demux_var(sinks))
}
}
pub struct SinkBuilder<Item>(PhantomData<fn() -> Item>);
impl<Item> Default for SinkBuilder<Item> {
fn default() -> Self {
Self(PhantomData)
}
}
impl<Item> SinkBuilder<Item> {
pub fn new() -> Self {
Self::default()
}
}
impl<Item> SinkBuild for SinkBuilder<Item> {
type Item = Item;
type Output<Next: Sink<Self::Item>> = Next;
fn send_to<Next>(self, next: Next) -> Self::Output<Next>
where
Next: Sink<Self::Item>,
{
next
}
}
pub trait ToSinkBuild {
fn iter_to_sink_build(self) -> send_iter::SendIterBuild<Self>
where
Self: Sized + Iterator,
{
send_iter::SendIterBuild { iter: self }
}
fn stream_to_sink_build(self) -> send_stream::SendStreamBuild<Self>
where
Self: Sized + Stream,
{
send_stream::SendStreamBuild { stream: self }
}
}
impl<T> ToSinkBuild for T {}
macro_rules! forward_sink {
(
$( $method:ident ),+
) => {
$(
fn $method(self: ::core::pin::Pin<&mut Self>, cx: &mut ::core::task::Context<'_>) -> ::core::task::Poll<::core::result::Result<(), Self::Error>> {
self.project().sink.$method(cx)
}
)+
}
}
use forward_sink;
macro_rules! ready_both {
($a:expr, $b:expr $(,)?) => {
if !matches!(
($a, $b),
(::core::task::Poll::Ready(()), ::core::task::Poll::Ready(())),
) {
return ::core::task::Poll::Pending;
}
};
}
use ready_both;
pub fn map<Func, In, Out, Si>(func: Func, sink: Si) -> Map<Si, Func>
where
Func: FnMut(In) -> Out,
Si: Sink<Out>,
{
Map::new(func, sink)
}
pub fn filter<Func, Item, Si>(func: Func, sink: Si) -> Filter<Si, Func>
where
Func: FnMut(&Item) -> bool,
Si: Sink<Item>,
{
Filter::new(func, sink)
}
pub fn filter_map<Func, In, Out, Si>(func: Func, sink: Si) -> FilterMap<Si, Func>
where
Func: FnMut(In) -> Option<Out>,
Si: Sink<Out>,
{
FilterMap::new(func, sink)
}
pub fn flat_map<Func, In, IntoIter, Si>(func: Func, sink: Si) -> FlatMap<Si, Func, IntoIter>
where
Func: FnMut(In) -> IntoIter,
IntoIter: IntoIterator,
Si: Sink<IntoIter::Item>,
{
FlatMap::new(func, sink)
}
pub fn flatten<IntoIter, Si>(sink: Si) -> Flatten<Si, IntoIter>
where
IntoIter: IntoIterator,
Si: Sink<IntoIter::Item>,
{
Flatten::new(sink)
}
pub fn inspect<Func, Item, Si>(func: Func, sink: Si) -> Inspect<Si, Func>
where
Func: FnMut(&Item),
Si: Sink<Item>,
{
Inspect::new(func, sink)
}
pub fn unzip<Si0, Si1, Item0, Item1>(sink0: Si0, sink1: Si1) -> Unzip<Si0, Si1>
where
Si0: Sink<Item0>,
Si1: Sink<Item1>,
Si0::Error: From<Si1::Error>,
{
Unzip::new(sink0, sink1)
}
pub fn for_each<Func, Item>(func: Func) -> ForEach<Func>
where
Func: FnMut(Item),
{
ForEach::new(func)
}
pub fn try_for_each<Func, Item, Error>(func: Func) -> TryForEach<Func>
where
Func: FnMut(Item) -> Result<(), Error>,
{
TryForEach::new(func)
}
pub fn send_iter<I, Si>(iter: I, sink: Si) -> SendIter<I::IntoIter, Si>
where
I: IntoIterator,
Si: Sink<I::Item>,
{
SendIter::new(iter.into_iter(), sink)
}
pub fn send_stream<St, Si>(stream: St, sink: Si) -> SendStream<St, Si>
where
St: Stream,
Si: Sink<St::Item>,
{
SendStream::new(stream, sink)
}