ai_lib_rust/pipeline/
fan_out.rs1use crate::pipeline::{PipelineError, Transform};
2use crate::protocol::CandidateConfig;
3use crate::{BoxStream, PipeResult};
4use futures::StreamExt;
5use serde_json::Value;
6
7pub struct FanOut {
9 config: CandidateConfig,
10}
11
12impl FanOut {
13 pub fn new(config: CandidateConfig) -> Self {
14 Self { config }
15 }
16}
17
18#[async_trait::async_trait]
19impl Transform for FanOut {
20 async fn transform(
21 &self,
22 input: BoxStream<'static, Value>,
23 ) -> PipeResult<BoxStream<'static, Value>> {
24 let fan_out_enabled = self.config.fan_out.unwrap_or(false);
25
26 if !fan_out_enabled {
27 return Ok(input);
28 }
29
30 let stream = input.flat_map(|result| {
38 let res_vec: Vec<Result<Value, crate::Error>> = match result {
39 Ok(Value::Array(arr)) => arr.into_iter().map(Ok).collect(),
40 Ok(val) => vec![Ok(val)],
41 Err(e) => vec![Err(e)],
42 };
43 futures::stream::iter(res_vec)
44 });
45
46 Ok(Box::pin(stream))
47 }
48}
49
50pub fn create_fan_out(config: &CandidateConfig) -> Result<Box<dyn Transform>, PipelineError> {
51 Ok(Box::new(FanOut::new(config.clone())))
52}