x-pipe-rs 0.1.0

Composable recommendation/feed pipeline framework built on comp-cat-rs
Documentation
//! The Kleisli arrow type: the categorical heart of the pipeline.
//!
//! A [`Stage`] is a morphism in the Kleisli category of the
//! [`Io`] monad: a function `A -> Io<E, B>`.  Composition via
//! [`Stage::then`] is Kleisli composition (sequencing through
//! `flat_map`), which by `comp-cat-rs`'s collapse hierarchy is
//! justified as a pair of Kan extensions.
//!
//! [`Io`]: comp_cat_rs::effect::io::Io

use comp_cat_rs::effect::io::Io;

/// A pipeline stage: a Kleisli arrow `A -> Io<E, B>`.
///
/// Categorically, this is a morphism in the Kleisli category
/// of `Io<E, ->`.  [`Stage::then`] is Kleisli composition,
/// and [`Stage::identity`] is the identity morphism.
///
/// # Examples
///
/// ```
/// use x_pipe_rs::Stage;
/// use comp_cat_rs::effect::io::Io;
///
/// let double: Stage<std::convert::Infallible, i32, i32> =
///     Stage::new(|x| Io::pure(x * 2));
/// let add_one: Stage<std::convert::Infallible, i32, i32> =
///     Stage::new(|x| Io::pure(x + 1));
///
/// let combined = double.then(add_one);
/// let result = combined.apply(5).run();
/// assert_eq!(result, Ok(11));
/// ```
#[must_use]
pub struct Stage<E, A, B> {
    run: Box<dyn FnOnce(A) -> Io<E, B> + Send>,
}

impl<E: Send + 'static, A: Send + 'static, B: Send + 'static> Stage<E, A, B> {
    /// Construct a stage from a Kleisli arrow.
    pub fn new(f: impl FnOnce(A) -> Io<E, B> + Send + 'static) -> Self {
        Self { run: Box::new(f) }
    }

    /// Apply this stage to an input, producing a lazy [`Io`].
    ///
    /// The returned `Io` is not executed until `.run()` is called
    /// at the boundary.
    ///
    /// [`Io`]: comp_cat_rs::effect::io::Io
    pub fn apply(self, input: A) -> Io<E, B> {
        (self.run)(input)
    }

    /// Kleisli composition: run this stage, then the next.
    ///
    /// Categorically, this is composition in the Kleisli category:
    /// given `f: A -> Io<E, B>` and `g: B -> Io<E, C>`, produce
    /// `g . f: A -> Io<E, C>`.
    pub fn then<C: Send + 'static>(self, next: Stage<E, B, C>) -> Stage<E, A, C> {
        Stage::new(move |a| self.apply(a).flat_map(move |b| next.apply(b)))
    }

    /// Map a function over this stage's output.
    pub fn map_output<C: Send + 'static>(
        self,
        f: impl FnOnce(B) -> C + Send + 'static,
    ) -> Stage<E, A, C> {
        Stage::new(move |a| self.apply(a).map(f))
    }
}

#[allow(clippy::mismatching_type_param_order)]
impl<E: Send + 'static, A: Send + 'static> Stage<E, A, A> {
    /// The identity stage: passes input through unchanged.
    ///
    /// Categorically, this is the identity morphism in the
    /// Kleisli category.
    pub fn identity() -> Self {
        Self::new(Io::pure)
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn identity_passes_through() {
        let stage: Stage<std::convert::Infallible, i32, i32> = Stage::identity();
        let result = stage.apply(42).run();
        assert_eq!(result, Ok(42));
    }

    #[test]
    fn then_composes_sequentially() {
        let double: Stage<std::convert::Infallible, i32, i32> =
            Stage::new(|x| Io::pure(x * 2));
        let add_one: Stage<std::convert::Infallible, i32, i32> =
            Stage::new(|x| Io::pure(x + 1));

        let combined = double.then(add_one);
        let result = combined.apply(5).run();
        assert_eq!(result, Ok(11));
    }

    #[test]
    fn left_identity_law() {
        let f: Stage<std::convert::Infallible, i32, i32> =
            Stage::new(|x| Io::pure(x * 3));

        let via_id = Stage::identity().then(Stage::new(|x| Io::pure(x * 3)));
        assert_eq!(f.apply(7).run(), via_id.apply(7).run());
    }

    #[test]
    fn right_identity_law() {
        let f: Stage<std::convert::Infallible, i32, i32> =
            Stage::new(|x| Io::pure(x * 3));

        let via_id = Stage::<std::convert::Infallible, i32, i32>::new(|x| Io::pure(x * 3))
            .then(Stage::identity());
        assert_eq!(f.apply(7).run(), via_id.apply(7).run());
    }

    #[test]
    fn associativity_law() {
        let f: Stage<std::convert::Infallible, i32, i32> =
            Stage::new(|x| Io::pure(x + 1));
        let g: Stage<std::convert::Infallible, i32, i32> =
            Stage::new(|x| Io::pure(x * 2));
        let h: Stage<std::convert::Infallible, i32, i32> =
            Stage::new(|x| Io::pure(x - 3));

        // (f.then(g)).then(h)
        let left = Stage::<std::convert::Infallible, i32, i32>::new(|x| Io::pure(x + 1))
            .then(Stage::new(|x| Io::pure(x * 2)))
            .then(Stage::new(|x| Io::pure(x - 3)));

        // f.then(g.then(h))
        let right = Stage::<std::convert::Infallible, i32, i32>::new(|x| Io::pure(x + 1))
            .then(
                Stage::<std::convert::Infallible, i32, i32>::new(|x| Io::pure(x * 2))
                    .then(Stage::new(|x| Io::pure(x - 3))),
            );

        let input = 10;
        assert_eq!(left.apply(input).run(), right.apply(input).run());

        // Also verify the intermediate values don't alias
        let _ = f;
        let _ = g;
        let _ = h;
    }

    #[test]
    fn map_output_transforms_result() {
        let stage: Stage<std::convert::Infallible, i32, String> =
            Stage::<std::convert::Infallible, i32, i32>::new(|x| Io::pure(x * 2))
                .map_output(|x| x.to_string());

        let result = stage.apply(5).run();
        assert_eq!(result, Ok("10".to_string()));
    }
}