x-pipe-rs 0.1.0

Composable recommendation/feed pipeline framework built on comp-cat-rs
Documentation
//! Candidate filtering: removing items that fail criteria.
//!
//! A [`Filter`] is a predicate over candidates, wrapped in
//! [`Io`] to allow effectful checks (e.g. database lookups).
//!
//! [`Io`]: comp_cat_rs::effect::io::Io

use std::sync::Arc;

use comp_cat_rs::effect::io::Io;

use crate::stage::Stage;

/// A filter that removes candidates not meeting criteria.
///
/// Implementors answer, one candidate at a time, whether that candidate
/// should survive: `true` keeps the item, `false` drops it.
/// The answer is wrapped in [`Io`] rather than returned as a bare `bool`,
/// which allows effectful predicates (e.g. database lookups).
///
/// # Type parameters
///
/// * `I` - the candidate type being inspected.
/// * `E` - the first type parameter of the returned [`Io`]
///   (presumably its error type; confirm against `comp_cat_rs`).
///
/// [`Io`]: comp_cat_rs::effect::io::Io
pub trait Filter<I, E> {
    /// Returns `true` if the candidate should be kept.
    ///
    /// The candidate is only borrowed, so the caller retains ownership of
    /// `item` and decides what to do with it once the verdict is known.
    fn keep(&self, item: &I) -> Io<E, bool>;
}

/// Convert a [`Filter`] into a [`Stage`] over `Vec`.
///
/// The resulting stage takes a `Vec<I>` and produces a `Vec<I>` containing
/// only the items for which [`Filter::keep`] yielded `true`. Items for which
/// it yielded `false` are dropped. Surviving items keep their original
/// relative order, because candidates are visited front to back and each
/// survivor is appended to the end of the accumulator.
///
/// Each check is chained onto the previous one with `flat_map`, so the
/// check for an item is only built once the accumulator from the preceding
/// items is available.
///
/// The filter is moved into an `Arc` once; every fold iteration clones that
/// handle so each per-item closure can own a reference to the same filter.
///
/// # Type parameters
///
/// * `F` - the [`Filter`] implementation; must be `Send + Sync + 'static`
///   because it is shared through an `Arc` captured by the stage closure.
/// * `I` - the candidate type.
/// * `E` - the first type parameter of the [`Io`] values involved
///   (presumably the error type; confirm against `comp_cat_rs`).
///
/// [`Io`]: comp_cat_rs::effect::io::Io
pub fn filter_stage<F, I, E>(filter: F) -> Stage<E, Vec<I>, Vec<I>>
where
    F: Filter<I, E> + Send + Sync + 'static,
    I: Send + 'static,
    E: Send + 'static,
{
    let filter = Arc::new(filter);
    Stage::new(move |items: Vec<I>| {
        items.into_iter().fold(Io::pure(Vec::new()), |acc_io, item| {
            // Each step needs its own owned handle to move into the closure.
            let f = Arc::clone(&filter);
            acc_io.flat_map(move |acc| {
                f.keep(&item).map(move |keep| {
                    // Append in place: rebuilding the vector through
                    // `chain(..).collect()` would copy every previously kept
                    // item on each step, making the whole stage quadratic.
                    let mut kept = acc;
                    if keep {
                        kept.push(item);
                    }
                    kept
                })
            })
        })
    })
}