Skip to main content

yozefu_app/search/
mod.rs

1//! Module implementing the search logic
2
3use extism::{Manifest, Plugin, Wasm};
4use filter::{CACHED_FILTERS, PARSE_PARAMETERS_FUNCTION_NAME};
5use itertools::Itertools;
6use lib::{
7    KafkaRecord, SearchQuery, parse_search_query,
8    search::{
9        filter::{Filter, Parameter},
10        offset::FromOffset,
11    },
12};
13use std::{
14    collections::HashMap,
15    path::{Path, PathBuf},
16    sync::{LazyLock, Mutex},
17};
18use tracing::error;
19
20pub mod atom;
21pub mod compare;
22pub mod expression;
23pub mod filter;
24pub mod search_query;
25pub mod term;
26
27pub trait Search {
28    /// Returns the offset from which the search should start.
29    fn offset(&self) -> Option<FromOffset> {
30        None
31    }
32    /// returns `true` if the record matches the search query.
33    fn matches(&self, context: &SearchContext) -> bool;
34
35    /// Returns the search filters that are used in the search query.
36    fn filters(&self) -> Vec<Filter>;
37}
38
39/// Struct that holds the context of the search.
40/// It contains the record that is being searched and the loaded search filters.
41pub struct SearchContext<'a> {
42    /// The record that is being searched.
43    pub record: &'a KafkaRecord,
44    /// The search filters that are already loaded in memory.
45    pub filters: &'a LazyLock<Mutex<HashMap<String, Plugin>>>,
46    /// The directory containing the search filters
47    pub filters_directory: PathBuf,
48}
49
50impl SearchContext<'_> {
51    pub fn new<'a>(record: &'a KafkaRecord, filters_directory: &'a Path) -> SearchContext<'a> {
52        SearchContext {
53            record,
54            filters: &CACHED_FILTERS,
55            filters_directory: filters_directory.to_path_buf(),
56        }
57    }
58}
59
60#[derive(Debug, Clone, PartialEq, Default)]
61pub struct ValidSearchQuery(SearchQuery);
62
63impl ValidSearchQuery {
64    pub fn is_empty(&self) -> bool {
65        self.0.is_empty()
66    }
67
68    pub fn limit(&self) -> Option<usize> {
69        self.0.limit
70    }
71
72    pub fn query(&self) -> &SearchQuery {
73        &self.0
74    }
75}
76
77impl ValidSearchQuery {
78    pub fn from(input: &str, filters_directory: &Path) -> Result<Self, lib::Error> {
79        let query = parse_search_query(input).map_err(lib::Error::Search)?.1;
80        let filters = query.filters();
81        for filter in filters {
82            let name = filter.name;
83            let path = filters_directory.join(format!("{}.wasm", &name));
84            let url = Wasm::file(&path);
85            let manifest = Manifest::new([url]);
86            let mut filters = CACHED_FILTERS.lock().unwrap();
87            if !filters.contains_key(&name) {
88                match Plugin::new(manifest, [], true) {
89                    Ok(plugin) => filters.insert(name.to_string(), plugin),
90                    Err(err) => {
91                        error!("No such file '{}': {}", path.display(), err);
92                        return Err(lib::Error::Error(format!(
93                            "Cannot find search filter '{name}' in {}: {}",
94                            path.parent().unwrap().display(),
95                            err
96                        )));
97                    }
98                };
99            }
100            let params = filter.parameters;
101            let wasm_module = &mut filters.get_mut(&name).unwrap();
102            if let Err(e) = wasm_module.call::<&str, &str>(
103                PARSE_PARAMETERS_FUNCTION_NAME,
104                &serde_json::to_string(&params.iter().map(Parameter::json).collect_vec()).unwrap(),
105            ) {
106                error!(
107                    "Error when calling '{PARSE_PARAMETERS_FUNCTION_NAME}' from wasm module '{name}': {e:?}"
108                );
109                return Err(lib::Error::Error(format!("{}: {e}", &name)));
110            }
111        }
112
113        Ok(ValidSearchQuery(query))
114    }
115}
116
117impl Search for ValidSearchQuery {
118    /// Returns the offset from which the search should start.
119    fn offset(&self) -> Option<FromOffset> {
120        self.0.offset()
121    }
122
123    fn matches(&self, context: &SearchContext) -> bool {
124        self.0.matches(context)
125    }
126
127    fn filters(&self) -> Vec<Filter> {
128        self.0.filters()
129    }
130}
131
#[cfg(test)]
mod tests {
    use lib::DataType;

    use super::*;

    /// Builds a record whose key and value are empty strings; every other
    /// field keeps its default value.
    fn blank_record() -> KafkaRecord {
        KafkaRecord {
            key: DataType::String("".into()),
            value: DataType::String("".into()),
            ..Default::default()
        }
    }

    /// A query without any filter expression matches a blank record.
    #[test]
    fn test_search_query_must_match() {
        let directory = PathBuf::from("tests/filters");
        let query = ValidSearchQuery::from("from begin", &directory).unwrap();
        let record = blank_record();

        let context = SearchContext::new(&record, &directory);
        assert!(query.matches(&context));
    }

    /// Building a query that references `my_filter` from `tests/filters` must fail.
    #[test]
    fn unknown_search_filter() {
        let directory = PathBuf::from("tests/filters");
        let outcome = ValidSearchQuery::from("from begin my_filter()", &directory);
        assert!(outcome.is_err())
    }

    /// The `module` filter must not match, and the first error log must report
    /// the rejected HTTP request.
    #[test]
    #[ignore]
    fn test_wasm_should_not_have_access_to_network() {
        use tracing::Level;
        testing_logger::setup();

        let directory = Path::new(env!("CARGO_MANIFEST_DIR"))
            .join("tests")
            .join("http_search_filter");
        let query = ValidSearchQuery::from("from begin module()", &directory).unwrap();
        let record = blank_record();

        let context = SearchContext::new(&record, &directory);
        assert!(!query.matches(&context));

        testing_logger::validate(|captured_logs| {
            let mut errors: Vec<&testing_logger::CapturedLog> = Vec::new();
            for log in captured_logs.iter() {
                if log.level.to_string() == Level::ERROR.to_string() {
                    errors.push(log);
                }
            }
            assert_eq!(2, errors.len());
            assert!(
                errors[0]
                    .body
                    .contains("HTTP request to https://mcdostone.github.io/ is not allowed")
            );
        });
    }

    /// A comparison on a field of a JSON value matches a record carrying that field.
    #[test]
    fn test_matches_with_fine_grained_filter_on_json_field() {
        use lib::kafka::KafkaRecord;
        use serde_json::json;
        use std::path::PathBuf;

        let directory = PathBuf::from(".");
        let input = r#"from end - 10 value.myInteger == "42""#;
        let query = ValidSearchQuery::from(input, &directory).unwrap();

        let record = KafkaRecord {
            topic: "test-topic".to_string(),
            partition: 0,
            offset: 42,
            key: lib::DataType::String("key".to_string()),
            value: lib::DataType::Json(json!({"myInteger": 42})),
            timestamp: None,
            headers: std::collections::BTreeMap::new(),
            key_schema: None,
            value_schema: None,
            size: 12,
            key_as_string: "key".to_string(),
            value_as_string: "value".to_string(),
        };

        // `SearchContext::new` wires the shared filter cache and copies the directory.
        let context = SearchContext::new(&record, &directory);

        assert!(query.matches(&context))
    }
}