x-pipe-rs 0.1.0

Composable recommendation/feed pipeline framework built on comp-cat-rs
Documentation
//! Candidate sources.
//!
//! A [`Source`] produces candidate items given a query.
//! Multiple sources can run in parallel via [`fan_out_2`],
//! [`fan_out_3`], and [`fan_out_4`], which use
//! [`par_zip`] from `comp-cat-rs` for concurrent execution.
//!
//! [`par_zip`]: comp_cat_rs::effect::fiber::par_zip

use comp_cat_rs::effect::fiber::{par_zip, FiberError};
use comp_cat_rs::effect::io::Io;

use crate::stage::Stage;

/// A source of candidate items.
///
/// Given a query of type `Q`, produces a `Vec<I>` of candidates
/// wrapped in [`Io`] for lazy execution.
///
/// [`Io`]: comp_cat_rs::effect::io::Io
pub trait Source<Q, I, E> {
    /// Produce candidates for a query.
    fn candidates(&self, query: &Q) -> Io<E, Vec<I>>;
}

/// Convert a [`Source`] into a [`Stage`] from `Q` to `Vec<I>`.
///
/// The source is called with a reference to the query, so the
/// query must be `Clone` to move into the stage closure.
pub fn source_stage<S, Q, I, E>(source: S) -> Stage<E, Q, Vec<I>>
where
    S: Source<Q, I, E> + Send + 'static,
    Q: Clone + Send + 'static,
    I: Send + 'static,
    E: Send + 'static,
{
    Stage::new(move |q: Q| source.candidates(&q))
}

/// Fan out to two sources in parallel, merging results.
///
/// Uses [`par_zip`] to run both sources concurrently on
/// separate OS threads.
///
/// [`par_zip`]: comp_cat_rs::effect::fiber::par_zip
#[must_use]
pub fn fan_out_2<S1, S2, Q, I, E>(
    s1: &S1,
    s2: &S2,
    query: &Q,
) -> Io<FiberError<E>, Vec<I>>
where
    S1: Source<Q, I, E>,
    S2: Source<Q, I, E>,
    E: Send + 'static,
    I: Send + 'static,
{
    let io1 = s1.candidates(query);
    let io2 = s2.candidates(query);
    par_zip(io1, io2).map(|(a, b)| {
        a.into_iter().chain(b).collect()
    })
}

/// Fan out to three sources in parallel, merging results.
#[must_use]
pub fn fan_out_3<S1, S2, S3, Q, I, E>(
    s1: &S1,
    s2: &S2,
    s3: &S3,
    query: &Q,
) -> Io<FiberError<FiberError<E>>, Vec<I>>
where
    S1: Source<Q, I, E>,
    S2: Source<Q, I, E>,
    S3: Source<Q, I, E>,
    E: Send + 'static,
    I: Send + 'static,
    FiberError<E>: Send,
{
    let io12 = fan_out_2(s1, s2, query);
    let io3 = s3.candidates(query).map_error(FiberError::Failed);
    par_zip(io12, io3).map(|(ab, c)| {
        ab.into_iter().chain(c).collect()
    })
}

/// Fan out to four sources in parallel, merging results.
#[must_use]
pub fn fan_out_4<S1, S2, S3, S4, Q, I, E>(
    s1: &S1,
    s2: &S2,
    s3: &S3,
    s4: &S4,
    query: &Q,
) -> Io<FiberError<FiberError<E>>, Vec<I>>
where
    S1: Source<Q, I, E>,
    S2: Source<Q, I, E>,
    S3: Source<Q, I, E>,
    S4: Source<Q, I, E>,
    E: Send + 'static,
    I: Send + 'static,
    FiberError<E>: Send,
{
    let io12 = fan_out_2(s1, s2, query);
    let io34 = fan_out_2(s3, s4, query);
    par_zip(io12, io34).map(|(ab, cd)| {
        ab.into_iter().chain(cd).collect()
    })
}