Skip to main content

ai_lib_rust/pipeline/
select.rs

1use crate::pipeline::{PipelineError, Transform};
2use crate::utils::PathMapper;
3use crate::{BoxStream, PipeResult};
4use futures::StreamExt;
5use serde_json::Value;
6
7/// Selector filters the stream to include only relevant frames
8/// typically matching a specific JSON path (e.g., "choices.0.delta")
9pub struct Selector {
10    path: String,
11    evaluator: crate::utils::json_path::JsonPathEvaluator,
12}
13
14impl Selector {
15    pub fn new(path: String) -> Self {
16        let evaluator =
17            crate::utils::json_path::JsonPathEvaluator::new(&path).unwrap_or_else(|_| {
18                // Fallback for simple paths if evaluator creation fails
19                crate::utils::json_path::JsonPathEvaluator::new(&format!("exists({})", path))
20                    .unwrap()
21            });
22        Self { path, evaluator }
23    }
24}
25
26#[async_trait::async_trait]
27impl Transform for Selector {
28    async fn transform(
29        &self,
30        input: BoxStream<'static, Value>,
31    ) -> PipeResult<BoxStream<'static, Value>> {
32        let path = self.path.clone();
33        let evaluator = self.evaluator.clone();
34
35        let stream = input.filter_map(move |result| {
36            let path = path.clone();
37            let evaluator = evaluator.clone();
38            async move {
39                match result {
40                    Ok(value) => {
41                        // Apply selection logic
42                        // 1. If it's a condition, return whole frame if matches
43                        if path.contains("exists(")
44                            || path.contains("==")
45                            || path.contains("||")
46                            || path.contains("&&")
47                        {
48                            if evaluator.matches(&value) {
49                                return Some(Ok(value));
50                            } else {
51                                return None;
52                            }
53                        }
54
55                        // 2. Simple path selection
56                        PathMapper::get_path(&value, &path).cloned().map(Ok)
57                    }
58                    Err(e) => Some(Err(e)), // Propagate errors
59                }
60            }
61        });
62
63        Ok(Box::pin(stream))
64    }
65}
66
67pub fn create_selector(path: &str) -> Result<Box<dyn Transform>, PipelineError> {
68    Ok(Box::new(Selector::new(path.to_string())))
69}