use std::sync::{Arc, Mutex};
use crate::effect::sink::and_then::SinkAndThen;
use crate::effect::sink::boxed::BoxedSinkEffect;
use crate::effect::sink::map::SinkMap;
use crate::effect::sink::map_err::SinkMapErr;
use crate::effect::sink::or_else::SinkOrElse;
use crate::effect::sink::tap_emit::TapEmit;
use crate::effect::sink::zip::SinkZip;
use crate::effect::sink::SinkEffect;
/// Extension methods layered on top of [`SinkEffect`].
///
/// Every method has a default body and requires `Self: Sized`. The combinator
/// methods only build a wrapper value around `self`; the wrappers' runtime
/// behavior is defined by the wrapper types themselves (declared in sibling
/// modules), not here.
pub trait SinkEffectExt: SinkEffect {
    /// Pairs this effect with `f`, a one-shot function from this effect's
    /// output to a follow-up effect `E2`.
    ///
    /// `E2` must share this effect's `Error`, `Env` and `Item` types.
    /// Presumably `f` is applied only when this effect succeeds; see
    /// [`SinkAndThen`] to confirm.
    fn and_then<F, E2>(self, f: F) -> SinkAndThen<Self, F>
    where
        Self: Sized,
        F: FnOnce(Self::Output) -> E2 + Send,
        E2: SinkEffect<Error = Self::Error, Env = Self::Env, Item = Self::Item>,
    {
        SinkAndThen { inner: self, f }
    }

    /// Pairs this effect with `f`, a one-shot function converting the output
    /// into a value of type `U`.
    ///
    /// Returns a [`SinkMap`] holding both.
    fn map<F, U>(self, f: F) -> SinkMap<Self, F>
    where
        Self: Sized,
        U: Send,
        F: FnOnce(Self::Output) -> U + Send,
    {
        SinkMap { inner: self, f }
    }

    /// Pairs this effect with `f`, a one-shot function converting the error
    /// into a value of type `E2`.
    ///
    /// Returns a [`SinkMapErr`] holding both.
    fn map_err<F, E2>(self, f: F) -> SinkMapErr<Self, F>
    where
        Self: Sized,
        E2: Send,
        F: FnOnce(Self::Error) -> E2 + Send,
    {
        SinkMapErr { inner: self, f }
    }

    /// Pairs this effect with `f`, a one-shot function from this effect's
    /// error to a fallback effect `E2`.
    ///
    /// `E2` must share this effect's `Output`, `Env` and `Item` types.
    /// Presumably `f` is applied only when this effect fails; see
    /// [`SinkOrElse`] to confirm.
    fn or_else<F, E2>(self, f: F) -> SinkOrElse<Self, F>
    where
        Self: Sized,
        F: FnOnce(Self::Error) -> E2 + Send,
        E2: SinkEffect<Output = Self::Output, Env = Self::Env, Item = Self::Item>,
    {
        SinkOrElse { inner: self, f }
    }

    /// Combines this effect (stored as `left`) with `other` (stored as
    /// `right`) into a [`SinkZip`].
    ///
    /// `other` must share this effect's `Error`, `Env` and `Item` types; its
    /// output type is unconstrained.
    fn zip<E2>(self, other: E2) -> SinkZip<Self, E2>
    where
        Self: Sized,
        E2: SinkEffect<Error = Self::Error, Env = Self::Env, Item = Self::Item>,
    {
        SinkZip { left: self, right: other }
    }

    /// Pairs this effect with `f`, a one-shot function deriving an item from
    /// a reference to the output.
    ///
    /// Returns a [`TapEmit`] holding both. Presumably the derived item is
    /// emitted to the sink while the output is passed through; see
    /// [`TapEmit`] to confirm.
    fn tap_emit<F>(self, f: F) -> TapEmit<Self, F>
    where
        Self: Sized,
        Self::Output: Clone + Send,
        F: FnOnce(&Self::Output) -> Self::Item + Send,
    {
        TapEmit { inner: self, f }
    }

    /// Runs the effect against `env`, gathering every item handed to the sink.
    ///
    /// Returns the effect's result together with the collected items, in the
    /// order the sink futures pushed them.
    ///
    /// # Panics
    ///
    /// Panics with `"sink should be dropped"` if a handle to the collection
    /// buffer is still alive after the run completes, and with
    /// `"mutex poisoned"` if the buffer's mutex was poisoned.
    // `async fn` in a public trait triggers the `async_fn_in_trait` lint; it
    // is silenced here.
    #[allow(async_fn_in_trait)]
    async fn run_collecting(
        self,
        env: &Self::Env,
    ) -> (Result<Self::Output, Self::Error>, Vec<Self::Item>)
    where
        Self: Sized,
        Self::Item: Send + 'static,
    {
        // Shared buffer: one handle stays here, the other moves into the sink.
        let buffer: Arc<Mutex<Vec<Self::Item>>> = Arc::new(Mutex::new(Vec::new()));
        let sink_handle = Arc::clone(&buffer);

        let outcome = self
            .run_with_sink(env, move |item| {
                // Each emission gets its own handle so the returned future
                // owns everything it touches.
                let target = Arc::clone(&sink_handle);
                async move {
                    target.lock().expect("mutex poisoned").push(item);
                }
            })
            .await;

        // Once the run has finished, this must be the only remaining handle.
        let mutex = match Arc::try_unwrap(buffer) {
            Ok(mutex) => mutex,
            Err(_) => panic!("sink should be dropped"),
        };
        let items = mutex.into_inner().expect("mutex poisoned");

        (outcome, items)
    }

    /// Runs the effect against `env` with a sink that discards every item,
    /// returning only the effect's result.
    // `async fn` in a public trait triggers the `async_fn_in_trait` lint; it
    // is silenced here.
    #[allow(async_fn_in_trait)]
    async fn run_ignore_emissions(self, env: &Self::Env) -> Result<Self::Output, Self::Error>
    where
        Self: Sized,
    {
        self.run_with_sink(env, |_discarded| async {}).await
    }

    /// Wraps this effect in a [`BoxedSinkEffect`] via `BoxedSinkEffect::new`.
    ///
    /// Requires the effect and its `Output`, `Error` and `Item` types to be
    /// `Send + 'static`, and `Env` to be `Clone + Send + Sync + 'static`.
    fn boxed_sink(self) -> BoxedSinkEffect<Self::Output, Self::Error, Self::Env, Self::Item>
    where
        Self: Sized + Send + 'static,
        Self::Output: Send + 'static,
        Self::Error: Send + 'static,
        Self::Env: Clone + Send + Sync + 'static,
        Self::Item: Send + 'static,
    {
        BoxedSinkEffect::new(self)
    }
}
/// Blanket implementation: every [`SinkEffect`] gets the [`SinkEffectExt`]
/// methods through their default bodies.
impl<E> SinkEffectExt for E where E: SinkEffect {}