// effect-rs 0.1.0
//
// A high-performance, strictly-typed, functional effect system for Rust.
// Documentation
use crate::core::*;
use futures::stream::{BoxStream, StreamExt};
use std::sync::Arc;

/// Type-erased constructor for the underlying stream.
///
/// It is called with an environment handle and a context and returns a boxed
/// stream whose items are `Exit<E, A>` values.
type StreamInner<R, E, A> = dyn Fn(EnvRef<R>, Ctx) -> BoxStream<'static, Exit<E, A>> + Send + Sync;

/// A stream of effects.
///
/// * `R`: Environment
/// * `E`: Error
/// * `A`: Item type
///
/// The stream is described by a shared closure that returns a `BoxStream`
/// each time it is invoked with an environment and a context.
pub struct EffectStream<R, E, A> {
    /// Shared closure that builds the underlying stream.
    pub(crate) inner: Arc<StreamInner<R, E, A>>,
}

impl<R, E, A> Clone for EffectStream<R, E, A> {
    fn clone(&self) -> Self {
        Self {
            inner: self.inner.clone(),
        }
    }
}

impl<R, E, A> EffectStream<R, E, A>
where
    R: 'static + Send + Sync + Clone,
    E: 'static + Send + Sync + Clone,
    A: 'static + Send + Sync + Clone,
{
    /// Creates a stream that emits every element of `iter` as a successful item.
    ///
    /// The iterator is drained eagerly into a `Vec` when this function is
    /// called; each run of the stream then replays a clone of that buffer.
    /// Only the collected `Vec<A>` is captured by the stream, so the iterator
    /// itself needs no `Send`, `Sync`, or `'static` bound.
    // Kept as an inherent method; the lint would otherwise ask for a
    // `FromIterator` implementation.
    #[allow(clippy::should_implement_trait)]
    pub fn from_iter<I>(iter: I) -> Self
    where
        I: IntoIterator<Item = A>,
    {
        let items: Vec<A> = iter.into_iter().collect();

        Self {
            inner: Arc::new(move |_, _| {
                // The closure is an `Fn` and may be invoked repeatedly, so each
                // run gets its own copy of the buffered elements.
                let items = items.clone();
                futures::stream::iter(items.into_iter().map(Exit::Success)).boxed()
            }),
        }
    }

    /// Transforms every successful item with `f`; failures pass through unchanged.
    pub fn map<B, F>(self, f: F) -> EffectStream<R, E, B>
    where
        F: Fn(A) -> B + Send + Sync + 'static + Clone,
        B: Send + Sync + Clone + 'static,
    {
        let upstream = self.inner;
        EffectStream {
            inner: Arc::new(move |env, ctx| {
                let items = upstream(env, ctx);
                // Each run of the stream works with its own copy of the mapper.
                let transform = f.clone();
                items
                    .map(move |exit| match exit {
                        Exit::Failure(cause) => Exit::Failure(cause),
                        Exit::Success(value) => Exit::Success(transform(value)),
                    })
                    .boxed()
            }),
        }
    }

    /// Drains the stream into a `Vec`, stopping at the first failure.
    ///
    /// The resulting effect succeeds with all items in the order they were
    /// yielded, or fails with the first `Exit::Failure` it encounters (items
    /// gathered before that failure are discarded).
    pub fn run_collect(self) -> Effect<R, E, Vec<A>> {
        let source = self.inner;
        Effect {
            inner: Arc::new(move |env, ctx| {
                // The stream is built as soon as the effect is invoked and is
                // then driven inside the returned future.
                let mut items = source(env, ctx);
                Box::pin(async move {
                    let mut collected = Vec::new();
                    loop {
                        match items.next().await {
                            Some(Exit::Success(value)) => collected.push(value),
                            Some(Exit::Failure(cause)) => break Exit::Failure(cause),
                            None => break Exit::Success(collected),
                        }
                    }
                })
            }),
        }
    }
    /// Interleaves the items of this stream with those of `other`.
    ///
    /// Both streams are started for every run: this one with clones of the
    /// environment and context, `other` with the originals. The two are combined
    /// with `futures::stream::select`.
    pub fn merge(self, other: EffectStream<R, E, A>) -> EffectStream<R, E, A> {
        let (left, right) = (self.inner, other.inner);
        EffectStream {
            inner: Arc::new(move |env, ctx| {
                let first = left(env.clone(), ctx.clone());
                let second = right(env, ctx);
                futures::stream::select(first, second).boxed()
            }),
        }
    }

    /// Maps each successful item through the effect returned by `f`, keeping up
    /// to `concurrency` of those effects in flight at once.
    ///
    /// Every effect is run with a clone of the environment and context the
    /// stream itself was started with. Results are combined with
    /// `buffer_unordered`, so they are emitted as the effects complete rather
    /// than in input order. Upstream failures are forwarded unchanged and `f`
    /// is not called for them.
    ///
    /// A `concurrency` of `0` is treated as `1`.
    pub fn map_par<B, F>(self, concurrency: usize, f: F) -> EffectStream<R, E, B>
    where
        F: Fn(A) -> Effect<R, E, B> + Send + Sync + 'static + Clone,
        B: Send + Sync + Clone + 'static,
        R: 'static + Send + Sync + Clone,
        E: 'static + Send + Sync + Clone,
    {
        // NOTE(review): with futures 0.3 a limit of zero means the buffer never
        // pulls from the upstream stream, so the result would stall forever.
        // Clamp to one so a zero limit degrades to sequential execution.
        let limit = concurrency.max(1);

        EffectStream {
            inner: Arc::new(move |env, ctx| {
                let stream = (self.inner)(env.clone(), ctx.clone());
                let f = f.clone();

                stream
                    .map(move |exit| {
                        // Each in-flight effect owns its own handles because the
                        // future below must be `'static`.
                        let f = f.clone();
                        let env = env.clone();
                        let ctx = ctx.clone();
                        async move {
                            match exit {
                                Exit::Success(a) => {
                                    let eff = f(a);
                                    (eff.inner)(env, ctx).await
                                }
                                Exit::Failure(c) => Exit::Failure(c),
                            }
                        }
                    })
                    .buffer_unordered(limit)
                    .boxed()
            }),
        }
    }

    // The Ctx refactor is deferred for now; the combinators below do not need Ctx beyond passing it to the upstream stream.

    /// Decouples the producer from the consumer through a bounded channel.
    ///
    /// Each run spawns a Tokio task that drives the upstream stream and sends
    /// its items into an mpsc channel created with `capacity`; the returned
    /// stream reads from the receiving end. The task stops when the upstream
    /// ends or when the receiver has been dropped.
    ///
    /// A `capacity` of `0` is treated as `1`, because
    /// `tokio::sync::mpsc::channel` panics when given a zero capacity.
    ///
    /// # Panics
    ///
    /// Running the stream calls `tokio::spawn`, which panics when invoked
    /// outside a Tokio runtime.
    pub fn buffer(self, capacity: usize) -> EffectStream<R, E, A> {
        let capacity = capacity.max(1);

        EffectStream {
            inner: Arc::new(move |env, ctx| {
                let mut stream = (self.inner)(env, ctx);
                let (tx, rx) = tokio::sync::mpsc::channel(capacity);

                // Producer: forwards upstream items until the stream ends or
                // the consumer goes away.
                tokio::spawn(async move {
                    while let Some(item) = stream.next().await {
                        if tx.send(item).await.is_err() {
                            break; // Receiver dropped
                        }
                    }
                });

                // Consumer: expose the receiving half as a stream.
                tokio_stream::wrappers::ReceiverStream::new(rx).boxed()
            }),
        }
    }

    /// Limits the stream to its first `n` elements.
    ///
    /// The limit is applied to the raw `Exit` values, so failures count toward
    /// `n` just like successes.
    pub fn take(self, n: usize) -> EffectStream<R, E, A> {
        let upstream = self.inner;
        EffectStream {
            inner: Arc::new(move |env, ctx| upstream(env, ctx).take(n).boxed()),
        }
    }

    pub fn filter<F>(self, f: F) -> EffectStream<R, E, A>
    where
        F: Fn(&A) -> bool + Send + Sync + 'static + Clone,
    {
        EffectStream {
            inner: Arc::new(move |env, ctx| {
                let stream = (self.inner)(env, ctx);
                let f = f.clone();
                stream
                    .filter_map(move |exit| {
                        let f = f.clone();
                        async move {
                            match exit {
                                Exit::Success(a) => {
                                    if f(&a) {
                                        Some(Exit::Success(a))
                                    } else {
                                        None
                                    }
                                }
                                Exit::Failure(c) => Some(Exit::Failure(c)), // Failure propagates
                            }
                        }
                    })
                    .boxed()
            }),
        }
    }
}