ai_lib_rust/pipeline/
select.rs1use crate::pipeline::{PipelineError, Transform};
2use crate::utils::PathMapper;
3use crate::{BoxStream, PipeResult};
4use futures::StreamExt;
5use serde_json::Value;
6
/// Pipeline transform that selects from, or filters, a stream of JSON values
/// using a path expression.
pub struct Selector {
    /// Raw path expression exactly as passed to `Selector::new`.
    path: String,
    /// Evaluator built in `Selector::new` from `path`, or from `exists(<path>)`
    /// when the raw path is rejected by the evaluator's constructor.
    evaluator: crate::utils::json_path::JsonPathEvaluator,
}
13
14impl Selector {
15 pub fn new(path: String) -> Self {
16 let evaluator =
17 crate::utils::json_path::JsonPathEvaluator::new(&path).unwrap_or_else(|_| {
18 crate::utils::json_path::JsonPathEvaluator::new(&format!("exists({})", path))
20 .unwrap()
21 });
22 Self { path, evaluator }
23 }
24}
25
26#[async_trait::async_trait]
27impl Transform for Selector {
28 async fn transform(
29 &self,
30 input: BoxStream<'static, Value>,
31 ) -> PipeResult<BoxStream<'static, Value>> {
32 let path = self.path.clone();
33 let evaluator = self.evaluator.clone();
34
35 let stream = input.filter_map(move |result| {
36 let path = path.clone();
37 let evaluator = evaluator.clone();
38 async move {
39 match result {
40 Ok(value) => {
41 if path.contains("exists(")
44 || path.contains("==")
45 || path.contains("||")
46 || path.contains("&&")
47 {
48 if evaluator.matches(&value) {
49 return Some(Ok(value));
50 } else {
51 return None;
52 }
53 }
54
55 PathMapper::get_path(&value, &path).cloned().map(Ok)
57 }
58 Err(e) => Some(Err(e)), }
60 }
61 });
62
63 Ok(Box::pin(stream))
64 }
65}
66
67pub fn create_selector(path: &str) -> Result<Box<dyn Transform>, PipelineError> {
68 Ok(Box::new(Selector::new(path.to_string())))
69}