x-pipe-rs 0.1.0

Composable recommendation/feed pipeline framework built on comp-cat-rs
Documentation
//! Pipeline composition: wiring stages into a complete pipeline.
//!
//! A [`Pipeline`] composes Source -> Hydrate -> Filter -> Score ->
//! Select into a single [`Stage`] from query to scored results.
//! Construction uses [`Pipeline::from_source`] and method chaining.
//!
//! # Examples
//!
//! ```
//! use x_pipe_rs::pipeline::Pipeline;
//! use x_pipe_rs::score::Score;
//! use x_pipe_rs::selector::{TopNSelector, Budget, MaxItems};
//! use comp_cat_rs::effect::io::Io;
//!
//! // A trivial scorer for demonstration
//! struct LengthScorer;
//! impl x_pipe_rs::scorer::Scorer<String, std::convert::Infallible> for LengthScorer {
//!     fn score(&self, item: &String) -> Io<std::convert::Infallible, Score> {
//!         // len() is always finite, so Score::new never returns None
//!         Io::pure(Score::new(item.len() as f64).unwrap_or(Score::zero()))
//!     }
//! }
//!
//! // A trivial source
//! struct FixedSource;
//! impl x_pipe_rs::source::Source<(), String, std::convert::Infallible> for FixedSource {
//!     fn candidates(&self, _q: &()) -> Io<std::convert::Infallible, Vec<String>> {
//!         Io::pure(vec!["short".into(), "a longer string".into(), "mid".into()])
//!     }
//! }
//!
//! # fn main() -> Result<(), Box<dyn std::error::Error>> {
//! let pipeline = Pipeline::from_source(FixedSource)
//!     .score(LengthScorer)
//!     .select(TopNSelector, Budget::new(MaxItems::new(2)));
//!
//! let results = pipeline.execute(()).run().map_err(|_| "infallible")?;
//! assert_eq!(results.len(), 2);
//! # Ok(())
//! # }
//! ```

use comp_cat_rs::effect::io::Io;

use crate::filter::{self, Filter};
use crate::hydrator::{self, Hydrator};
use crate::score::ScoredCandidate;
use crate::scorer::{self, Scorer};
use crate::selector::{self, Budget, Selector};
use crate::side_effect::{self, SideEffect};
use crate::source::Source;
use crate::stage::Stage;

/// A pipeline that has been sourced but not yet scored.
///
/// At this point the pipeline is `Stage<E, Q, Vec<I>>`: it
/// produces unscored candidates.
///
/// Values of this type are returned by [`Pipeline::from_source`] and
/// [`Pipeline::from_stage`].  Further stages are chained with
/// [`Self::hydrate`], [`Self::filter`] and [`Self::tap`]; calling
/// [`Self::score`] turns it into a [`ScoredPipeline`].
///
/// Type parameters: `E` is the error parameter carried by the stage,
/// `Q` is the query type the pipeline starts from, and `I` is the
/// current candidate item type.
#[must_use]
pub struct UnscoredPipeline<E, Q, I> {
    // The stages composed so far, from query to a list of unscored items.
    stage: Stage<E, Q, Vec<I>>,
}

impl<E: Send + 'static, Q: Clone + Send + 'static, I: Send + 'static>
    UnscoredPipeline<E, Q, I>
{
    /// Add a hydration stage, transforming items from `I` to `J`.
    pub fn hydrate<J: Send + 'static, H>(self, hydrator: H) -> UnscoredPipeline<E, Q, J>
    where
        H: Hydrator<I, J, E> + Send + Sync + 'static,
    {
        UnscoredPipeline {
            stage: self.stage.then(hydrator::hydrator_stage(hydrator)),
        }
    }

    /// Add a filter stage.
    pub fn filter<F>(self, filter: F) -> Self
    where
        F: Filter<I, E> + Send + Sync + 'static,
    {
        UnscoredPipeline {
            stage: self.stage.then(filter::filter_stage(filter)),
        }
    }

    /// Add a side-effect tap (logging, metrics) before scoring.
    pub fn tap<S>(self, effect: S) -> Self
    where
        S: SideEffect<I, E> + Send + 'static,
    {
        UnscoredPipeline {
            stage: self.stage.then(side_effect::side_effect_stage(effect)),
        }
    }

    /// Add a scoring stage, producing scored candidates.
    pub fn score<S>(self, s: S) -> ScoredPipeline<E, Q, I>
    where
        S: Scorer<I, E> + Send + Sync + 'static,
    {
        ScoredPipeline {
            stage: self.stage.then(scorer::scorer_stage(s)),
        }
    }
}

/// A pipeline that has been scored but not yet selected.
///
/// At this point the pipeline is `Stage<E, Q, Vec<ScoredCandidate<I>>>`.
///
/// Values of this type are returned by [`UnscoredPipeline::score`].
/// The only step available from here is [`Self::select`], which
/// yields the final [`Pipeline`].
#[must_use]
pub struct ScoredPipeline<E, Q, I> {
    // The stages composed so far, from query to a list of scored candidates.
    stage: Stage<E, Q, Vec<ScoredCandidate<I>>>,
}

impl<E: Send + 'static, Q: Send + 'static, I: Send + 'static> ScoredPipeline<E, Q, I> {
    /// Add a selection stage with a budget, producing the final pipeline.
    pub fn select<S>(self, s: S, budget: Budget) -> Pipeline<E, Q, I>
    where
        S: Selector<I, E> + Send + 'static,
    {
        Pipeline {
            stage: self.stage.then(selector::selector_stage(s, budget)),
        }
    }
}

/// A complete pipeline from query to scored results.
///
/// Built by chaining [`Pipeline::from_source`] with hydration,
/// filtering, scoring, and selection stages.  Execute with
/// [`Pipeline::execute`], which returns a lazy [`Io`] that
/// runs only when `.run()` is called at the boundary.
///
/// Note that [`Pipeline::from_source`] and [`Pipeline::from_stage`]
/// return an [`UnscoredPipeline`], not a `Pipeline`; a `Pipeline`
/// value is produced by [`ScoredPipeline::select`].
///
/// [`Io`]: comp_cat_rs::effect::io::Io
#[must_use]
pub struct Pipeline<E, Q, I> {
    // The fully composed stage, from query to the selected scored candidates.
    stage: Stage<E, Q, Vec<ScoredCandidate<I>>>,
}

impl<E, Q, I> Pipeline<E, Q, I>
where
    E: Send + 'static,
    Q: Clone + Send + 'static,
    I: Send + 'static,
{
    /// Begin building a pipeline from one source.
    ///
    /// The source is wrapped by `crate::source::source_stage` and the
    /// resulting stage becomes the start of an [`UnscoredPipeline`].
    pub fn from_source<S>(source: S) -> UnscoredPipeline<E, Q, I>
    where
        S: Source<Q, I, E> + Send + 'static,
    {
        Self::from_stage(crate::source::source_stage(source))
    }

    /// Begin building a pipeline from an already constructed source stage.
    ///
    /// Combine this with [`fan_out_2`], [`fan_out_3`], etc. when the
    /// pipeline should start from several parallel sources.
    ///
    /// [`fan_out_2`]: crate::source::fan_out_2
    /// [`fan_out_3`]: crate::source::fan_out_3
    pub fn from_stage(stage: Stage<E, Q, Vec<I>>) -> UnscoredPipeline<E, Q, I> {
        // The stage is stored as-is; later builder calls compose onto it.
        UnscoredPipeline { stage }
    }
}

impl<E: Send + 'static, Q: Send + 'static, I: Send + 'static> Pipeline<E, Q, I> {
    /// Execute the pipeline for a query.
    ///
    /// Consumes the pipeline and applies its composed stage to `query`.
    /// Returns a lazy [`Io`] that is not evaluated until `.run()`
    /// is called.  This is the boundary where effects happen.
    ///
    /// Because nothing runs before that point, discarding the returned
    /// value means the pipeline never executes.  The `#[must_use]`
    /// attribute makes the compiler warn when that happens.
    ///
    /// [`Io`]: comp_cat_rs::effect::io::Io
    #[must_use = "the returned `Io` is lazy and does nothing until `.run()` is called"]
    pub fn execute(self, query: Q) -> Io<E, Vec<ScoredCandidate<I>>> {
        self.stage.apply(query)
    }
}