1use crate::flow::{Flow, Layer};
4use crate::modification::FlowQuery;
5
6#[derive(Debug, Clone, Default)]
8pub struct FlowFilter {
9 pub query: FlowQuery,
10 pub text_tokens: Vec<String>,
11}
12
13pub fn flow_matches_query(flow: &Flow, query: &FlowQuery) -> bool {
15 let (host_str, path_str, method_str, status, is_ws) = flow_layer_fields(flow);
16
17 if let Some(h) = &query.host
18 && !host_str
19 .to_ascii_lowercase()
20 .contains(&h.to_ascii_lowercase())
21 {
22 return false;
23 }
24 if let Some(p) = &query.path_contains
25 && !path_str
26 .to_ascii_lowercase()
27 .contains(&p.to_ascii_lowercase())
28 {
29 return false;
30 }
31 if let Some(m) = &query.method
32 && !method_str.eq_ignore_ascii_case(m)
33 {
34 return false;
35 }
36 if let Some(min) = query.status_min
37 && status.is_none_or(|s| s < min)
38 {
39 return false;
40 }
41 if let Some(max) = query.status_max
42 && status.is_none_or(|s| s > max)
43 {
44 return false;
45 }
46 if let Some(ws_only) = query.is_websocket
47 && is_ws != ws_only
48 {
49 return false;
50 }
51 if let Some(err_only) = query.has_error {
52 let is_err = flow_has_error(flow, status);
53 if err_only != is_err {
54 return false;
55 }
56 }
57 true
58}
59
60pub fn flow_matches_filter(flow: &Flow, filter: &FlowFilter) -> bool {
62 if !flow_matches_query(flow, &filter.query) {
63 return false;
64 }
65 if filter.text_tokens.is_empty() {
66 return true;
67 }
68 let (host_str, path_str, method_str, _status, _is_ws) = flow_layer_fields(flow);
69 let url_blob = format!(
70 "{host_str}{path_str}",
71 host_str = host_str.to_ascii_lowercase(),
72 path_str = path_str.to_ascii_lowercase()
73 );
74 let method_lc = method_str.to_ascii_lowercase();
75 for token in &filter.text_tokens {
76 let needle = token.to_ascii_lowercase();
77 if !url_blob.contains(&needle) && !method_lc.contains(&needle) {
78 return false;
79 }
80 }
81 true
82}
83
84pub fn parse_flow_filter(input: &str) -> FlowFilter {
91 let input = input.trim();
92 if input.is_empty() {
93 return FlowFilter::default();
94 }
95
96 let mut query = FlowQuery::default();
97 let mut text_tokens = Vec::new();
98 let mut had_structured = false;
99
100 for token in input.split_whitespace() {
101 if let Some((key, value)) = token.split_once(':') {
102 let key_lc = key.to_ascii_lowercase();
103 if value.is_empty() {
104 text_tokens.push(token.to_string());
105 continue;
106 }
107 had_structured = true;
108 match key_lc.as_str() {
109 "host" => query.host = Some(value.to_string()),
110 "path" => query.path_contains = Some(value.to_string()),
111 "method" => query.method = Some(value.to_string()),
112 "status" => apply_status_token(&mut query, value),
113 _ => text_tokens.push(token.to_string()),
114 }
115 } else {
116 match token.to_ascii_lowercase().as_str() {
117 "err" | "error" => {
118 had_structured = true;
119 query.has_error = Some(true);
120 }
121 "ws" | "websocket" => {
122 had_structured = true;
123 query.is_websocket = Some(true);
124 }
125 _ => text_tokens.push(token.to_string()),
126 }
127 }
128 }
129
130 if !had_structured && !text_tokens.is_empty() {
131 let joined = text_tokens.join(" ");
132 text_tokens = vec![joined];
133 }
134
135 FlowFilter { query, text_tokens }
136}
137
138fn apply_status_token(query: &mut FlowQuery, value: &str) {
139 let value = value.trim();
140 if let Some(rest) = value.strip_prefix(">=") {
141 if let Ok(n) = rest.parse::<u16>() {
142 query.status_min = Some(n);
143 }
144 return;
145 }
146 if let Some(rest) = value.strip_prefix("<=") {
147 if let Ok(n) = rest.parse::<u16>() {
148 query.status_max = Some(n);
149 }
150 return;
151 }
152 if let Some(rest) = value.strip_prefix('>') {
153 if let Ok(n) = rest.parse::<u16>() {
154 query.status_min = Some(n.saturating_add(1));
155 }
156 return;
157 }
158 if let Some(rest) = value.strip_prefix('<') {
159 if let Ok(n) = rest.parse::<u16>() {
160 query.status_max = Some(n.saturating_sub(1));
161 }
162 return;
163 }
164 if let Some((lo, hi)) = value.split_once('-')
165 && let (Ok(min), Ok(max)) = (lo.trim().parse::<u16>(), hi.trim().parse::<u16>())
166 {
167 query.status_min = Some(min);
168 query.status_max = Some(max);
169 return;
170 }
171 if let Ok(code) = value.parse::<u16>() {
172 query.status_min = Some(code);
173 query.status_max = Some(code);
174 }
175}
176
177fn flow_layer_fields(flow: &Flow) -> (String, String, String, Option<u16>, bool) {
178 match &flow.layer {
179 Layer::Http(h) => {
180 let status = h.response.as_ref().map(|r| r.status);
181 (
182 h.request.url.host_str().unwrap_or("").to_string(),
183 h.request.url.path().to_string(),
184 h.request.method.clone(),
185 status,
186 false,
187 )
188 }
189 Layer::WebSocket(ws) => (
190 ws.handshake_request
191 .url
192 .host_str()
193 .unwrap_or("")
194 .to_string(),
195 ws.handshake_request.url.path().to_string(),
196 ws.handshake_request.method.clone(),
197 Some(ws.handshake_response.status),
198 true,
199 ),
200 _ => (
201 String::new(),
202 String::new(),
203 String::new(),
204 None,
205 false,
206 ),
207 }
208}
209
210fn flow_has_error(flow: &Flow, status: Option<u16>) -> bool {
211 status.is_some_and(|s| s >= 500) || flow.tags.iter().any(|t| t == "error")
212}
213
214#[cfg(test)]
215mod tests {
216 use super::*;
217 use crate::flow::{
218 Flow, HttpLayer, HttpRequest, HttpResponse, Layer, NetworkInfo, ResponseTiming,
219 TransportProtocol,
220 };
221 use chrono::Utc;
222 use std::collections::HashMap;
223 use url::Url;
224 use uuid::Uuid;
225
226 fn http_flow(url: &str, method: &str, status: Option<u16>) -> Flow {
227 Flow {
228 id: Uuid::new_v4(),
229 start_time: Utc::now(),
230 end_time: None,
231 network: NetworkInfo {
232 client_ip: "127.0.0.1".into(),
233 client_port: 1,
234 server_ip: "127.0.0.1".into(),
235 server_port: 443,
236 protocol: TransportProtocol::TCP,
237 tls: false,
238 tls_version: None,
239 sni: None,
240 },
241 layer: Layer::Http(HttpLayer {
242 request: HttpRequest {
243 method: method.into(),
244 url: Url::parse(url).unwrap(),
245 version: "HTTP/1.1".into(),
246 headers: vec![],
247 cookies: vec![],
248 query: vec![],
249 body: None,
250 },
251 response: status.map(|s| HttpResponse {
252 status: s,
253 status_text: "OK".into(),
254 version: "HTTP/1.1".into(),
255 headers: vec![],
256 cookies: vec![],
257 body: None,
258 timing: ResponseTiming {
259 time_to_first_byte: None,
260 time_to_last_byte: None,
261 connect_time_ms: None,
262 ssl_time_ms: None,
263 },
264 }),
265 error: None,
266 }),
267 tags: vec![],
268 meta: HashMap::new(),
269 }
270 }
271
272 #[test]
273 fn parse_plain_text_single_blob() {
274 let f = parse_flow_filter(" api example ");
275 assert_eq!(f.text_tokens, vec!["api example"]);
276 assert_eq!(f.query.host, None);
277 }
278
279 #[test]
280 fn parse_structured_host_and_method() {
281 let f = parse_flow_filter("host:api.example method:POST");
282 assert_eq!(f.query.host.as_deref(), Some("api.example"));
283 assert_eq!(f.query.method.as_deref(), Some("POST"));
284 assert!(f.text_tokens.is_empty());
285 }
286
287 #[test]
288 fn parse_status_range_and_err_ws() {
289 let f = parse_flow_filter("status:>=400 err ws");
290 assert_eq!(f.query.status_min, Some(400));
291 assert_eq!(f.query.status_max, None);
292 assert_eq!(f.query.has_error, Some(true));
293 assert_eq!(f.query.is_websocket, Some(true));
294 }
295
296 #[test]
297 fn parse_status_exact_and_range() {
298 assert_eq!(
299 parse_flow_filter("status:404").query.status_min,
300 Some(404)
301 );
302 assert_eq!(
303 parse_flow_filter("status:404").query.status_max,
304 Some(404)
305 );
306 let r = parse_flow_filter("status:400-499");
307 assert_eq!(r.query.status_min, Some(400));
308 assert_eq!(r.query.status_max, Some(499));
309 }
310
311 #[test]
312 fn parse_mixed_structured_and_plain() {
313 let f = parse_flow_filter("api host:foo");
314 assert_eq!(f.query.host.as_deref(), Some("foo"));
315 assert_eq!(f.text_tokens, vec!["api"]);
316 }
317
318 #[test]
319 fn flow_matches_query_host_case_insensitive() {
320 let flow = http_flow("http://API.Example.com/x", "GET", Some(200));
321 let q = FlowQuery {
322 host: Some("api.example".into()),
323 ..Default::default()
324 };
325 assert!(flow_matches_query(&flow, &q));
326 }
327
328 #[test]
329 fn flow_matches_query_status_min_and_error() {
330 let ok = http_flow("http://x.com", "GET", Some(200));
331 let err = http_flow("http://x.com", "GET", Some(500));
332 let q = FlowQuery {
333 status_min: Some(400),
334 ..Default::default()
335 };
336 assert!(!flow_matches_query(&ok, &q));
337 assert!(flow_matches_query(&err, &q));
338
339 let q_err = FlowQuery {
340 has_error: Some(true),
341 ..Default::default()
342 };
343 assert!(flow_matches_query(&err, &q_err));
344 }
345
346 #[test]
347 fn flow_matches_filter_plain_case_insensitive() {
348 let flow = http_flow("http://api.example.com/x", "get", Some(200));
349 let filter = parse_flow_filter("API");
350 assert!(flow_matches_filter(&flow, &filter));
351 let filter2 = parse_flow_filter("POST");
352 assert!(!flow_matches_filter(&flow, &filter2));
353 }
354
355 #[test]
356 fn flow_matches_filter_combined() {
357 let flow = http_flow("http://api.example.com/x", "POST", Some(201));
358 let filter = parse_flow_filter("host:api method:POST status:>=200");
359 assert!(flow_matches_filter(&flow, &filter));
360 let filter_fail = parse_flow_filter("host:other method:POST");
361 assert!(!flow_matches_filter(&flow, &filter_fail));
362 }
363}