1use extism::{Manifest, Plugin, Wasm};
4use filter::{CACHED_FILTERS, PARSE_PARAMETERS_FUNCTION_NAME};
5use itertools::Itertools;
6use lib::{
7 KafkaRecord, SearchQuery, parse_search_query,
8 search::{
9 filter::{Filter, Parameter},
10 offset::FromOffset,
11 },
12};
13use std::{
14 collections::HashMap,
15 path::{Path, PathBuf},
16 sync::{LazyLock, Mutex},
17};
18use tracing::error;
19
// Grammar building blocks for the search-query language: each submodule
// covers one level of the expression tree (atoms, comparisons, terms, …).
pub mod atom;
pub mod compare;
pub mod expression;
pub mod filter;
pub mod search_query;
pub mod term;
26
/// Behaviour shared by every node of a parsed search query: where to start
/// reading, whether a record matches, and which wasm filters are referenced.
pub trait Search {
    /// Starting offset requested by the query, if any.
    /// Defaults to `None`, i.e. no explicit starting position.
    fn offset(&self) -> Option<FromOffset> {
        None
    }

    /// Returns `true` when the record held in `context` satisfies this query.
    fn matches(&self, context: &SearchContext) -> bool;

    /// All filters referenced by this query (loaded as `.wasm` plugins
    /// elsewhere in this module).
    fn filters(&self) -> Vec<Filter>;
}
38
/// Everything needed to evaluate a search query against one record.
pub struct SearchContext<'a> {
    /// The Kafka record under evaluation.
    pub record: &'a KafkaRecord,
    /// Process-wide cache of loaded wasm filter plugins, keyed by filter name.
    pub filters: &'a LazyLock<Mutex<HashMap<String, Plugin>>>,
    /// Directory where `<name>.wasm` filter modules are looked up.
    pub filters_directory: PathBuf,
}
49
50impl SearchContext<'_> {
51 pub fn new<'a>(record: &'a KafkaRecord, filters_directory: &'a Path) -> SearchContext<'a> {
52 SearchContext {
53 record,
54 filters: &CACHED_FILTERS,
55 filters_directory: filters_directory.to_path_buf(),
56 }
57 }
58}
59
/// A `SearchQuery` whose wasm filters have all been resolved, loaded and had
/// their parameters accepted — see `ValidSearchQuery::from`.
#[derive(Debug, Clone, PartialEq, Default)]
pub struct ValidSearchQuery(SearchQuery);
62
63impl ValidSearchQuery {
64 pub fn is_empty(&self) -> bool {
65 self.0.is_empty()
66 }
67
68 pub fn limit(&self) -> Option<usize> {
69 self.0.limit
70 }
71
72 pub fn query(&self) -> &SearchQuery {
73 &self.0
74 }
75}
76
77impl ValidSearchQuery {
78 pub fn from(input: &str, filters_directory: &Path) -> Result<Self, lib::Error> {
79 let query = parse_search_query(input).map_err(lib::Error::Search)?.1;
80 let filters = query.filters();
81 for filter in filters {
82 let name = filter.name;
83 let path = filters_directory.join(format!("{}.wasm", &name));
84 let url = Wasm::file(&path);
85 let manifest = Manifest::new([url]);
86 let mut filters = CACHED_FILTERS.lock().unwrap();
87 if !filters.contains_key(&name) {
88 match Plugin::new(manifest, [], true) {
89 Ok(plugin) => filters.insert(name.to_string(), plugin),
90 Err(err) => {
91 error!("No such file '{}': {}", path.display(), err);
92 return Err(lib::Error::Error(format!(
93 "Cannot find search filter '{name}' in {}: {}",
94 path.parent().unwrap().display(),
95 err
96 )));
97 }
98 };
99 }
100 let params = filter.parameters;
101 let wasm_module = &mut filters.get_mut(&name).unwrap();
102 if let Err(e) = wasm_module.call::<&str, &str>(
103 PARSE_PARAMETERS_FUNCTION_NAME,
104 &serde_json::to_string(¶ms.iter().map(Parameter::json).collect_vec()).unwrap(),
105 ) {
106 error!(
107 "Error when calling '{PARSE_PARAMETERS_FUNCTION_NAME}' from wasm module '{name}': {e:?}"
108 );
109 return Err(lib::Error::Error(format!("{}: {e}", &name)));
110 }
111 }
112
113 Ok(ValidSearchQuery(query))
114 }
115}
116
117impl Search for ValidSearchQuery {
118 fn offset(&self) -> Option<FromOffset> {
120 self.0.offset()
121 }
122
123 fn matches(&self, context: &SearchContext) -> bool {
124 self.0.matches(context)
125 }
126
127 fn filters(&self) -> Vec<Filter> {
128 self.0.filters()
129 }
130}
131
#[cfg(test)]
mod tests {
    use lib::DataType;

    use super::*;

    /// A bare `from begin` query has no predicate, so every record matches.
    #[test]
    fn test_search_query_must_match() {
        let filters_directory = PathBuf::from("tests/filters");
        let input = "from begin";
        let query = ValidSearchQuery::from(input, &filters_directory).unwrap();

        let record = KafkaRecord {
            key: DataType::String("".into()),
            value: DataType::String("".into()),
            ..Default::default()
        };

        let context = SearchContext::new(&record, &filters_directory);
        assert!(query.matches(&context));
    }

    /// Referencing a filter with no matching `.wasm` file must fail validation.
    #[test]
    fn unknown_search_filter() {
        let filters_directory = PathBuf::from("tests/filters");
        let input = "from begin my_filter()";
        assert!(ValidSearchQuery::from(input, &filters_directory).is_err())
    }

    /// Sandbox check: a wasm filter that attempts an outbound HTTP request
    /// must not match, and the denied request must be logged as an ERROR.
    // NOTE(review): #[ignore]d — presumably depends on a pre-built module in
    // tests/http_search_filter; confirm before enabling in CI.
    #[test]
    #[ignore]
    fn test_wasm_should_not_have_access_to_network() {
        use tracing::Level;
        testing_logger::setup();

        let filters_directory = Path::new(env!("CARGO_MANIFEST_DIR"))
            .join("tests")
            .join("http_search_filter");
        let input = "from begin module()";
        let query = ValidSearchQuery::from(input, &filters_directory).unwrap();

        let record = KafkaRecord {
            key: DataType::String("".into()),
            value: DataType::String("".into()),
            ..Default::default()
        };

        let context = SearchContext::new(&record, &filters_directory);
        assert!(!query.matches(&context));

        testing_logger::validate(|captured_logs| {
            let logs = captured_logs
                .iter()
                .filter(|c| c.level.to_string() == Level::ERROR.to_string())
                .collect::<Vec<&testing_logger::CapturedLog>>();
            // Two ERROR lines are expected; only the first body is inspected.
            assert_eq!(2, logs.len());
            assert!(
                logs[0]
                    .body
                    .contains("HTTP request to https://mcdostone.github.io/ is not allowed")
            );
        });
    }

    /// `value.myInteger == "42"` must match a record whose JSON payload has
    /// `myInteger` equal to 42.
    #[test]
    fn test_matches_with_fine_grained_filter_on_json_field() {
        use crate::search::filter::CACHED_FILTERS;
        use lib::kafka::KafkaRecord;
        use serde_json::json;
        use std::path::PathBuf;

        let filters_directory = PathBuf::from(".");

        let query = ValidSearchQuery::from(
            r#"from end - 10 value.myInteger == "42""#,
            &filters_directory,
        )
        .unwrap();
        let record = KafkaRecord {
            topic: "test-topic".to_string(),
            partition: 0,
            offset: 42,
            key: lib::DataType::String("key".to_string()),
            value: lib::DataType::Json(json!({"myInteger": 42})),
            timestamp: None,
            headers: std::collections::BTreeMap::new(),
            key_schema: None,
            value_schema: None,
            size: 12,
            key_as_string: "key".to_string(),
            value_as_string: "value".to_string(),
        };
        // Context built by hand (rather than SearchContext::new) to pin the
        // cache reference explicitly.
        let context = SearchContext {
            record: &record,
            filters: &CACHED_FILTERS,
            filters_directory,
        };

        assert!(query.matches(&context))
    }
}