x-pipe-rs 0.1.0

Composable recommendation/feed pipeline framework built on comp-cat-rs
Documentation
//! Candidate hydration: enriching items with additional data.
//!
//! A [`Hydrator`] transforms items from type `I` to type `J`,
//! typically by fetching additional data (e.g. full text,
//! metadata, embeddings).

use std::sync::Arc;

use comp_cat_rs::effect::io::Io;

use crate::stage::Stage;

/// Enriches a candidate item with additional data.
///
/// The input type `I` and output type `J` may differ, allowing
/// hydration to add fields by producing a richer type.
///
/// # Type parameters
///
/// - `I`: the incoming candidate type; each item is passed to
///   [`Hydrator::hydrate`] by value.
/// - `J`: the enriched type produced for each candidate.
/// - `E`: the first type parameter of the returned `Io`
///   (presumably its error type; confirm against `comp_cat_rs`).
pub trait Hydrator<I, J, E> {
    /// Enrich a single candidate.
    ///
    /// Takes ownership of `item` and returns an `Io<E, J>` describing the
    /// hydration of that one item. The hydrator itself is only borrowed
    /// (`&self`), so a single instance can be used for many items.
    fn hydrate(&self, item: I) -> Io<E, J>;
}

/// Convert a [`Hydrator`] into a [`Stage`] over `Vec`.
///
/// The returned stage takes a `Vec<I>` and produces a `Vec<J>` containing
/// one hydrated value per input item, in input order.
///
/// Items are hydrated sequentially: the input is folded into a single `Io`,
/// and each `hydrate` call is chained onto the accumulated result with
/// `flat_map`, so an item is only hydrated after the previous items have
/// produced their values.
///
/// The hydrator is moved into an `Arc` so that every fold iteration can
/// hold its own handle to it inside the `move` closures.
///
/// For an empty input no `hydrate` call is made and the stage yields the
/// initial `Io::pure` value, an empty `Vec`.
pub fn hydrator_stage<H, I, J, E>(hydrator: H) -> Stage<E, Vec<I>, Vec<J>>
where
    H: Hydrator<I, J, E> + Send + Sync + 'static,
    I: Send + 'static,
    J: Send + 'static,
    E: Send + 'static,
{
    let hydrator = Arc::new(hydrator);
    Stage::new(move |items: Vec<I>| {
        // The output holds exactly one element per input, so reserve the
        // whole buffer up front instead of growing it step by step.
        let capacity = items.len();
        items
            .into_iter()
            .fold(Io::pure(Vec::with_capacity(capacity)), |acc_io, item| {
                let h = Arc::clone(&hydrator);
                acc_io.flat_map(move |mut acc: Vec<J>| {
                    h.hydrate(item).map(move |j| {
                        // Append in place. Rebuilding the vector through
                        // `into_iter().chain(..).collect()` on every step
                        // made the whole stage quadratic in the item count.
                        acc.push(j);
                        acc
                    })
                })
            })
    })
}