Skip to main content

ai_lib_rust/pipeline/
fan_out.rs

1use crate::pipeline::{PipelineError, Transform};
2use crate::protocol::CandidateConfig;
3use crate::{BoxStream, PipeResult};
4use futures::StreamExt;
5use serde_json::Value;
6
7/// FanOut replicates the stream or splits array elements into separate events
8pub struct FanOut {
9    config: CandidateConfig,
10}
11
12impl FanOut {
13    pub fn new(config: CandidateConfig) -> Self {
14        Self { config }
15    }
16}
17
18#[async_trait::async_trait]
19impl Transform for FanOut {
20    async fn transform(
21        &self,
22        input: BoxStream<'static, Value>,
23    ) -> PipeResult<BoxStream<'static, Value>> {
24        let fan_out_enabled = self.config.fan_out.unwrap_or(false);
25
26        if !fan_out_enabled {
27            return Ok(input);
28        }
29
30        // If FanOut is enabled, we assume the input might be an Array of candidates
31        // and we want to emit each candidate as a separate item in the stream.
32        // Or if it's an object, we pass it through.
33
34        // Note: Real fan-out in async streams often implies parallel request processing,
35        // but in the pipeline context, it usually means "One Event Frame -> Many Event Frames"
36
37        let stream = input.flat_map(|result| {
38            let res_vec: Vec<Result<Value, crate::Error>> = match result {
39                Ok(Value::Array(arr)) => arr.into_iter().map(Ok).collect(),
40                Ok(val) => vec![Ok(val)],
41                Err(e) => vec![Err(e)],
42            };
43            futures::stream::iter(res_vec)
44        });
45
46        Ok(Box::pin(stream))
47    }
48}
49
50pub fn create_fan_out(config: &CandidateConfig) -> Result<Box<dyn Transform>, PipelineError> {
51    Ok(Box::new(FanOut::new(config.clone())))
52}