use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use crate::effect::sink::SinkEffect;
use crate::effect::Effect;
/// Boxed, sendable future produced by one run of a type-erased sink effect.
///
/// Resolves to the run's `Result` paired with the items gathered during that run.
type BoxedSinkFuture<T, E, Item> =
    Pin<Box<dyn Future<Output = (Result<T, E>, Vec<Item>)> + Send + 'static>>;

/// Boxed one-shot closure that consumes an owned environment and starts a run.
///
/// `FnOnce` because the closure can be called only once.
type BoxedSinkFn<T, E, Env, Item> =
    Box<dyn FnOnce(Env) -> BoxedSinkFuture<T, E, Item> + Send + 'static>;
/// Type-erased sink effect.
///
/// Holds a boxed one-shot closure (`BoxedSinkFn`) instead of a concrete effect
/// type. Given an owned `Env`, the closure yields a future that resolves to the
/// run's `Result<T, E>` together with a `Vec<Item>` of buffered items.
pub struct BoxedSinkEffect<T, E, Env, Item>
where
Env: Clone + Send + Sync + 'static,
Item: Send + 'static,
{
// One-shot runner; called with a clone of the environment when the effect is run.
inner: BoxedSinkFn<T, E, Env, Item>,
}
/// Opaque `Debug` representation.
///
/// The only field is a boxed closure, which has nothing printable, so the
/// output names the type and marks the contents as elided.
impl<T, E, Env, Item> std::fmt::Debug for BoxedSinkEffect<T, E, Env, Item>
where
    Env: Clone + Send + Sync + 'static,
    Item: Send + 'static,
{
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // No fields are registered; `finish_non_exhaustive` emits the `..` marker.
        let mut opaque = f.debug_struct("BoxedSinkEffect");
        opaque.finish_non_exhaustive()
    }
}
impl<T, E, Env, Item> BoxedSinkEffect<T, E, Env, Item>
where
    T: Send + 'static,
    E: Send + 'static,
    Env: Clone + Send + Sync + 'static,
    Item: Send + 'static,
{
    /// Erases the concrete type of `effect` behind a boxed closure.
    ///
    /// When the boxed closure is later invoked it runs `effect` with a sink that
    /// appends every emitted item to an internal buffer, and resolves to the
    /// effect's result together with that buffer. Items are therefore only
    /// available after the wrapped effect has finished, not while it is running.
    ///
    /// # Panics
    ///
    /// Running the returned effect panics if the buffer's mutex was poisoned,
    /// i.e. a sink invocation panicked while holding the lock.
    pub fn new<Eff>(effect: Eff) -> Self
    where
        Eff: SinkEffect<Output = T, Error = E, Env = Env, Item = Item> + Send + 'static,
    {
        BoxedSinkEffect {
            inner: Box::new(move |env: Env| {
                Box::pin(async move {
                    let collected: Arc<Mutex<Vec<Item>>> = Arc::new(Mutex::new(Vec::new()));
                    let sink_buffer = Arc::clone(&collected);
                    let result = effect
                        .run_with_sink(&env, move |item| {
                            // Each sink future needs its own handle because it may
                            // outlive this particular call of the `Fn` closure.
                            let buffer = Arc::clone(&sink_buffer);
                            async move {
                                buffer.lock().expect("mutex poisoned").push(item);
                            }
                        })
                        .await;
                    // Take the buffer out through the lock rather than unwrapping the
                    // `Arc`: the run has resolved, so the buffer is complete, and this
                    // does not panic if a clone of the handle is still alive somewhere
                    // (for example a leaked sink or sink future).
                    let items = std::mem::take(&mut *collected.lock().expect("mutex poisoned"));
                    (result, items)
                })
            }),
        }
    }
}
/// Runs the boxed effect without a sink.
impl<T, E, Env, Item> Effect for BoxedSinkEffect<T, E, Env, Item>
where
    T: Send + 'static,
    E: Send + 'static,
    Env: Clone + Send + Sync + 'static,
    Item: Send + 'static,
{
    type Output = T;
    type Error = E;
    type Env = Env;

    /// Invokes the boxed closure with a clone of `env` and returns its result.
    ///
    /// The closure still hands back the items it buffered; with no sink to
    /// deliver them to, they are dropped here.
    async fn run(self, env: &Self::Env) -> Result<Self::Output, Self::Error> {
        let Self { inner } = self;
        // The closure takes the environment by value, hence the clone.
        let finished = inner(env.clone()).await;
        // `.0` is the result; `.1` (the buffered items) is discarded.
        finished.0
    }
}
/// Runs the boxed effect and forwards its buffered items to a caller-supplied sink.
impl<T, E, Env, Item> SinkEffect for BoxedSinkEffect<T, E, Env, Item>
where
    T: Send + 'static,
    E: Send + 'static,
    Env: Clone + Send + Sync + 'static,
    Item: Send + 'static,
{
    type Item = Item;

    /// Invokes the boxed closure with a clone of `env`, then passes each buffered
    /// item to `sink`, awaiting one sink call before starting the next.
    ///
    /// Delivery happens only after the boxed closure's future has resolved, and
    /// it happens whether the result is `Ok` or `Err`. The result is returned
    /// once every item has been handed to `sink`.
    async fn run_with_sink<S, Fut>(
        self,
        env: &Self::Env,
        sink: S,
    ) -> Result<Self::Output, Self::Error>
    where
        S: Fn(Self::Item) -> Fut + Send + Sync,
        Fut: Future<Output = ()> + Send,
    {
        let Self { inner } = self;
        let (outcome, buffered) = inner(env.clone()).await;
        // Replay sequentially, in the order the buffer holds the items.
        for entry in buffered {
            sink(entry).await;
        }
        outcome
    }
}