1use crate::flow::{Flow, Layer};
4use crate::modification::FlowQuery;
5
6#[derive(Debug, Clone, Default)]
8pub struct FlowFilter {
9 pub query: FlowQuery,
10 pub text_tokens: Vec<String>,
11}
12
13pub fn flow_matches_query(flow: &Flow, query: &FlowQuery) -> bool {
15 let (host_str, path_str, method_str, status, is_ws) = flow_layer_fields(flow);
16
17 if let Some(h) = &query.host
18 && !host_str
19 .to_ascii_lowercase()
20 .contains(&h.to_ascii_lowercase())
21 {
22 return false;
23 }
24 if let Some(p) = &query.path_contains
25 && !path_str
26 .to_ascii_lowercase()
27 .contains(&p.to_ascii_lowercase())
28 {
29 return false;
30 }
31 if let Some(m) = &query.method
32 && !method_str.eq_ignore_ascii_case(m)
33 {
34 return false;
35 }
36 if let Some(min) = query.status_min
37 && status.is_none_or(|s| s < min)
38 {
39 return false;
40 }
41 if let Some(max) = query.status_max
42 && status.is_none_or(|s| s > max)
43 {
44 return false;
45 }
46 if let Some(ws_only) = query.is_websocket
47 && is_ws != ws_only
48 {
49 return false;
50 }
51 if let Some(err_only) = query.has_error {
52 let is_err = flow_has_error(flow, status);
53 if err_only != is_err {
54 return false;
55 }
56 }
57 true
58}
59
60pub fn flow_matches_filter(flow: &Flow, filter: &FlowFilter) -> bool {
62 if !flow_matches_query(flow, &filter.query) {
63 return false;
64 }
65 if filter.text_tokens.is_empty() {
66 return true;
67 }
68 let (host_str, path_str, method_str, _status, _is_ws) = flow_layer_fields(flow);
69 let url_blob = format!(
70 "{host_str}{path_str}",
71 host_str = host_str.to_ascii_lowercase(),
72 path_str = path_str.to_ascii_lowercase()
73 );
74 let method_lc = method_str.to_ascii_lowercase();
75 for token in &filter.text_tokens {
76 let needle = token.to_ascii_lowercase();
77 if !url_blob.contains(&needle) && !method_lc.contains(&needle) {
78 return false;
79 }
80 }
81 true
82}
83
84pub fn parse_flow_filter(input: &str) -> FlowFilter {
91 let input = input.trim();
92 if input.is_empty() {
93 return FlowFilter::default();
94 }
95
96 let mut query = FlowQuery::default();
97 let mut text_tokens = Vec::new();
98 let mut had_structured = false;
99
100 for token in input.split_whitespace() {
101 if let Some((key, value)) = token.split_once(':') {
102 let key_lc = key.to_ascii_lowercase();
103 if value.is_empty() {
104 text_tokens.push(token.to_string());
105 continue;
106 }
107 had_structured = true;
108 match key_lc.as_str() {
109 "host" => query.host = Some(value.to_string()),
110 "path" => query.path_contains = Some(value.to_string()),
111 "method" => query.method = Some(value.to_string()),
112 "status" => apply_status_token(&mut query, value),
113 _ => text_tokens.push(token.to_string()),
114 }
115 } else {
116 match token.to_ascii_lowercase().as_str() {
117 "err" | "error" => {
118 had_structured = true;
119 query.has_error = Some(true);
120 }
121 "ws" | "websocket" => {
122 had_structured = true;
123 query.is_websocket = Some(true);
124 }
125 _ => text_tokens.push(token.to_string()),
126 }
127 }
128 }
129
130 if !had_structured && !text_tokens.is_empty() {
131 let joined = text_tokens.join(" ");
132 text_tokens = vec![joined];
133 }
134
135 FlowFilter { query, text_tokens }
136}
137
138fn apply_status_token(query: &mut FlowQuery, value: &str) {
139 let value = value.trim();
140 if let Some(rest) = value.strip_prefix(">=") {
141 if let Ok(n) = rest.parse::<u16>() {
142 query.status_min = Some(n);
143 }
144 return;
145 }
146 if let Some(rest) = value.strip_prefix("<=") {
147 if let Ok(n) = rest.parse::<u16>() {
148 query.status_max = Some(n);
149 }
150 return;
151 }
152 if let Some(rest) = value.strip_prefix('>') {
153 if let Ok(n) = rest.parse::<u16>() {
154 query.status_min = Some(n.saturating_add(1));
155 }
156 return;
157 }
158 if let Some(rest) = value.strip_prefix('<') {
159 if let Ok(n) = rest.parse::<u16>() {
160 query.status_max = Some(n.saturating_sub(1));
161 }
162 return;
163 }
164 if let Some((lo, hi)) = value.split_once('-')
165 && let (Ok(min), Ok(max)) = (lo.trim().parse::<u16>(), hi.trim().parse::<u16>())
166 {
167 query.status_min = Some(min);
168 query.status_max = Some(max);
169 return;
170 }
171 if let Ok(code) = value.parse::<u16>() {
172 query.status_min = Some(code);
173 query.status_max = Some(code);
174 }
175}
176
177fn flow_layer_fields(flow: &Flow) -> (String, String, String, Option<u16>, bool) {
178 match &flow.layer {
179 Layer::Http(h) => {
180 let status = h.response.as_ref().map(|r| r.status);
181 (
182 h.request.url.host_str().unwrap_or("").to_string(),
183 h.request.url.path().to_string(),
184 h.request.method.clone(),
185 status,
186 false,
187 )
188 }
189 Layer::WebSocket(ws) => (
190 ws.handshake_request
191 .url
192 .host_str()
193 .unwrap_or("")
194 .to_string(),
195 ws.handshake_request.url.path().to_string(),
196 ws.handshake_request.method.clone(),
197 Some(ws.handshake_response.status),
198 true,
199 ),
200 _ => (String::new(), String::new(), String::new(), None, false),
201 }
202}
203
204fn flow_has_error(flow: &Flow, status: Option<u16>) -> bool {
205 status.is_some_and(|s| s >= 500) || flow.tags.iter().any(|t| t == "error")
206}
207
#[cfg(test)]
mod tests {
    use super::*;
    use crate::flow::{
        Flow, HttpLayer, HttpRequest, HttpResponse, Layer, NetworkInfo, ResponseTiming,
        TransportProtocol,
    };
    use chrono::Utc;
    use std::collections::HashMap;
    use url::Url;
    use uuid::Uuid;

    /// Builds a minimal HTTP flow for `url` and `method`; a response is
    /// attached only when `status` is `Some`.
    fn http_flow(url: &str, method: &str, status: Option<u16>) -> Flow {
        let network = NetworkInfo {
            client_ip: "127.0.0.1".into(),
            client_port: 1,
            server_ip: "127.0.0.1".into(),
            server_port: 443,
            protocol: TransportProtocol::TCP,
            tls: false,
            tls_version: None,
            sni: None,
        };

        let request = HttpRequest {
            method: method.into(),
            url: Url::parse(url).unwrap(),
            version: "HTTP/1.1".into(),
            headers: Vec::new(),
            cookies: Vec::new(),
            query: Vec::new(),
            body: None,
        };

        let response = status.map(|code| HttpResponse {
            status: code,
            status_text: "OK".into(),
            version: "HTTP/1.1".into(),
            headers: Vec::new(),
            cookies: Vec::new(),
            body: None,
            timing: ResponseTiming {
                time_to_first_byte: None,
                time_to_last_byte: None,
                connect_time_ms: None,
                ssl_time_ms: None,
            },
        });

        Flow {
            id: Uuid::new_v4(),
            start_time: Utc::now(),
            end_time: None,
            network,
            layer: Layer::Http(HttpLayer {
                request,
                response,
                error: None,
            }),
            tags: Vec::new(),
            meta: HashMap::new(),
        }
    }

    #[test]
    fn parse_plain_text_single_blob() {
        let parsed = parse_flow_filter("  api   example  ");
        assert_eq!(parsed.text_tokens, ["api example"]);
        assert!(parsed.query.host.is_none());
    }

    #[test]
    fn parse_structured_host_and_method() {
        let parsed = parse_flow_filter("host:api.example method:POST");
        assert_eq!(parsed.query.host.as_deref(), Some("api.example"));
        assert_eq!(parsed.query.method.as_deref(), Some("POST"));
        assert!(parsed.text_tokens.is_empty());
    }

    #[test]
    fn parse_status_range_and_err_ws() {
        let FlowFilter { query, .. } = parse_flow_filter("status:>=400 err ws");
        assert_eq!(query.status_min, Some(400));
        assert_eq!(query.status_max, None);
        assert_eq!(query.has_error, Some(true));
        assert_eq!(query.is_websocket, Some(true));
    }

    #[test]
    fn parse_status_exact_and_range() {
        let exact = parse_flow_filter("status:404").query;
        assert_eq!(exact.status_min, Some(404));
        assert_eq!(exact.status_max, Some(404));

        let range = parse_flow_filter("status:400-499").query;
        assert_eq!(range.status_min, Some(400));
        assert_eq!(range.status_max, Some(499));
    }

    #[test]
    fn parse_mixed_structured_and_plain() {
        let parsed = parse_flow_filter("api host:foo");
        assert_eq!(parsed.query.host.as_deref(), Some("foo"));
        assert_eq!(parsed.text_tokens, ["api"]);
    }

    #[test]
    fn flow_matches_query_host_case_insensitive() {
        let flow = http_flow("http://API.Example.com/x", "GET", Some(200));
        let query = FlowQuery {
            host: Some(String::from("api.example")),
            ..FlowQuery::default()
        };
        assert!(flow_matches_query(&flow, &query));
    }

    #[test]
    fn flow_matches_query_status_min_and_error() {
        let ok_flow = http_flow("http://x.com", "GET", Some(200));
        let failing_flow = http_flow("http://x.com", "GET", Some(500));

        let at_least_400 = FlowQuery {
            status_min: Some(400),
            ..FlowQuery::default()
        };
        assert!(!flow_matches_query(&ok_flow, &at_least_400));
        assert!(flow_matches_query(&failing_flow, &at_least_400));

        let errors_only = FlowQuery {
            has_error: Some(true),
            ..FlowQuery::default()
        };
        assert!(flow_matches_query(&failing_flow, &errors_only));
    }

    #[test]
    fn flow_matches_filter_plain_case_insensitive() {
        let flow = http_flow("http://api.example.com/x", "get", Some(200));

        let by_host = parse_flow_filter("API");
        assert!(flow_matches_filter(&flow, &by_host));

        let by_other_method = parse_flow_filter("POST");
        assert!(!flow_matches_filter(&flow, &by_other_method));
    }

    #[test]
    fn flow_matches_filter_combined() {
        let flow = http_flow("http://api.example.com/x", "POST", Some(201));

        let matching = parse_flow_filter("host:api method:POST status:>=200");
        assert!(flow_matches_filter(&flow, &matching));

        let wrong_host = parse_flow_filter("host:other method:POST");
        assert!(!flow_matches_filter(&flow, &wrong_host));
    }
}