Skip to main content

relay_core_api/
flow_query.rs

1//! Shared flow search matching and filter expression parsing (TUI / CLI / runtime).
2
3use crate::flow::{Flow, Layer};
4use crate::modification::FlowQuery;
5
6/// Parsed filter: structured [`FlowQuery`] fields (AND) plus optional plain-text tokens (AND, each matches URL or method).
7#[derive(Debug, Clone, Default)]
8pub struct FlowFilter {
9    pub query: FlowQuery,
10    pub text_tokens: Vec<String>,
11}
12
13/// Returns true when `flow` satisfies all set fields in `query` (AND semantics).
14pub fn flow_matches_query(flow: &Flow, query: &FlowQuery) -> bool {
15    let (host_str, path_str, method_str, status, is_ws) = flow_layer_fields(flow);
16
17    if let Some(h) = &query.host
18        && !host_str
19            .to_ascii_lowercase()
20            .contains(&h.to_ascii_lowercase())
21    {
22        return false;
23    }
24    if let Some(p) = &query.path_contains
25        && !path_str
26            .to_ascii_lowercase()
27            .contains(&p.to_ascii_lowercase())
28    {
29        return false;
30    }
31    if let Some(m) = &query.method
32        && !method_str.eq_ignore_ascii_case(m)
33    {
34        return false;
35    }
36    if let Some(min) = query.status_min
37        && status.is_none_or(|s| s < min)
38    {
39        return false;
40    }
41    if let Some(max) = query.status_max
42        && status.is_none_or(|s| s > max)
43    {
44        return false;
45    }
46    if let Some(ws_only) = query.is_websocket
47        && is_ws != ws_only
48    {
49        return false;
50    }
51    if let Some(err_only) = query.has_error {
52        let is_err = flow_has_error(flow, status);
53        if err_only != is_err {
54            return false;
55        }
56    }
57    true
58}
59
60/// Structured query + plain-text tokens.
61pub fn flow_matches_filter(flow: &Flow, filter: &FlowFilter) -> bool {
62    if !flow_matches_query(flow, &filter.query) {
63        return false;
64    }
65    if filter.text_tokens.is_empty() {
66        return true;
67    }
68    let (host_str, path_str, method_str, _status, _is_ws) = flow_layer_fields(flow);
69    let url_blob = format!(
70        "{host_str}{path_str}",
71        host_str = host_str.to_ascii_lowercase(),
72        path_str = path_str.to_ascii_lowercase()
73    );
74    let method_lc = method_str.to_ascii_lowercase();
75    for token in &filter.text_tokens {
76        let needle = token.to_ascii_lowercase();
77        if !url_blob.contains(&needle) && !method_lc.contains(&needle) {
78            return false;
79        }
80    }
81    true
82}
83
84/// Parse a TUI/CLI filter bar expression into structured + plain tokens.
85///
86/// Structured tokens (AND): `host:`, `path:`, `method:`, `status:` (`404`, `>=400`, `400-499`),
87/// bare `err` / `ws`.
88/// Plain tokens: words without `:` (unless unknown `key:value` → treated as plain).
89/// When there are no structured tokens, the whole input is one plain-text search (preserves spaces).
90pub fn parse_flow_filter(input: &str) -> FlowFilter {
91    let input = input.trim();
92    if input.is_empty() {
93        return FlowFilter::default();
94    }
95
96    let mut query = FlowQuery::default();
97    let mut text_tokens = Vec::new();
98    let mut had_structured = false;
99
100    for token in input.split_whitespace() {
101        if let Some((key, value)) = token.split_once(':') {
102            let key_lc = key.to_ascii_lowercase();
103            if value.is_empty() {
104                text_tokens.push(token.to_string());
105                continue;
106            }
107            had_structured = true;
108            match key_lc.as_str() {
109                "host" => query.host = Some(value.to_string()),
110                "path" => query.path_contains = Some(value.to_string()),
111                "method" => query.method = Some(value.to_string()),
112                "status" => apply_status_token(&mut query, value),
113                _ => text_tokens.push(token.to_string()),
114            }
115        } else {
116            match token.to_ascii_lowercase().as_str() {
117                "err" | "error" => {
118                    had_structured = true;
119                    query.has_error = Some(true);
120                }
121                "ws" | "websocket" => {
122                    had_structured = true;
123                    query.is_websocket = Some(true);
124                }
125                _ => text_tokens.push(token.to_string()),
126            }
127        }
128    }
129
130    if !had_structured && !text_tokens.is_empty() {
131        let joined = text_tokens.join(" ");
132        text_tokens = vec![joined];
133    }
134
135    FlowFilter { query, text_tokens }
136}
137
138fn apply_status_token(query: &mut FlowQuery, value: &str) {
139    let value = value.trim();
140    if let Some(rest) = value.strip_prefix(">=") {
141        if let Ok(n) = rest.parse::<u16>() {
142            query.status_min = Some(n);
143        }
144        return;
145    }
146    if let Some(rest) = value.strip_prefix("<=") {
147        if let Ok(n) = rest.parse::<u16>() {
148            query.status_max = Some(n);
149        }
150        return;
151    }
152    if let Some(rest) = value.strip_prefix('>') {
153        if let Ok(n) = rest.parse::<u16>() {
154            query.status_min = Some(n.saturating_add(1));
155        }
156        return;
157    }
158    if let Some(rest) = value.strip_prefix('<') {
159        if let Ok(n) = rest.parse::<u16>() {
160            query.status_max = Some(n.saturating_sub(1));
161        }
162        return;
163    }
164    if let Some((lo, hi)) = value.split_once('-')
165        && let (Ok(min), Ok(max)) = (lo.trim().parse::<u16>(), hi.trim().parse::<u16>())
166    {
167        query.status_min = Some(min);
168        query.status_max = Some(max);
169        return;
170    }
171    if let Ok(code) = value.parse::<u16>() {
172        query.status_min = Some(code);
173        query.status_max = Some(code);
174    }
175}
176
177fn flow_layer_fields(flow: &Flow) -> (String, String, String, Option<u16>, bool) {
178    match &flow.layer {
179        Layer::Http(h) => {
180            let status = h.response.as_ref().map(|r| r.status);
181            (
182                h.request.url.host_str().unwrap_or("").to_string(),
183                h.request.url.path().to_string(),
184                h.request.method.clone(),
185                status,
186                false,
187            )
188        }
189        Layer::WebSocket(ws) => (
190            ws.handshake_request
191                .url
192                .host_str()
193                .unwrap_or("")
194                .to_string(),
195            ws.handshake_request.url.path().to_string(),
196            ws.handshake_request.method.clone(),
197            Some(ws.handshake_response.status),
198            true,
199        ),
200        _ => (String::new(), String::new(), String::new(), None, false),
201    }
202}
203
204fn flow_has_error(flow: &Flow, status: Option<u16>) -> bool {
205    status.is_some_and(|s| s >= 500) || flow.tags.iter().any(|t| t == "error")
206}
207
#[cfg(test)]
mod tests {
    use super::*;
    use crate::flow::{
        Flow, HttpLayer, HttpRequest, HttpResponse, Layer, NetworkInfo, ResponseTiming,
        TransportProtocol,
    };
    use chrono::Utc;
    use std::collections::HashMap;
    use url::Url;
    use uuid::Uuid;

    /// Builds a minimal HTTP flow for `url` / `method`; `status` of `None` means the flow
    /// has no response yet.
    fn http_flow(url: &str, method: &str, status: Option<u16>) -> Flow {
        let network = NetworkInfo {
            client_ip: "127.0.0.1".into(),
            client_port: 1,
            server_ip: "127.0.0.1".into(),
            server_port: 443,
            protocol: TransportProtocol::TCP,
            tls: false,
            tls_version: None,
            sni: None,
        };
        let request = HttpRequest {
            method: method.into(),
            url: Url::parse(url).unwrap(),
            version: "HTTP/1.1".into(),
            headers: vec![],
            cookies: vec![],
            query: vec![],
            body: None,
        };
        let response = status.map(|code| HttpResponse {
            status: code,
            status_text: "OK".into(),
            version: "HTTP/1.1".into(),
            headers: vec![],
            cookies: vec![],
            body: None,
            timing: ResponseTiming {
                time_to_first_byte: None,
                time_to_last_byte: None,
                connect_time_ms: None,
                ssl_time_ms: None,
            },
        });

        Flow {
            id: Uuid::new_v4(),
            start_time: Utc::now(),
            end_time: None,
            network,
            layer: Layer::Http(HttpLayer {
                request,
                response,
                error: None,
            }),
            tags: vec![],
            meta: HashMap::new(),
        }
    }

    // --- parsing ---------------------------------------------------------------------

    #[test]
    fn parse_plain_text_single_blob() {
        let parsed = parse_flow_filter("  api example ");
        assert_eq!(parsed.text_tokens, vec!["api example"]);
        assert!(parsed.query.host.is_none());
    }

    #[test]
    fn parse_structured_host_and_method() {
        let parsed = parse_flow_filter("host:api.example method:POST");
        assert_eq!(parsed.query.host.as_deref(), Some("api.example"));
        assert_eq!(parsed.query.method.as_deref(), Some("POST"));
        assert!(parsed.text_tokens.is_empty());
    }

    #[test]
    fn parse_status_range_and_err_ws() {
        let FlowFilter { query, .. } = parse_flow_filter("status:>=400 err ws");
        assert_eq!(query.status_min, Some(400));
        assert_eq!(query.status_max, None);
        assert_eq!(query.has_error, Some(true));
        assert_eq!(query.is_websocket, Some(true));
    }

    #[test]
    fn parse_status_exact_and_range() {
        let exact = parse_flow_filter("status:404");
        assert_eq!(exact.query.status_min, Some(404));
        assert_eq!(exact.query.status_max, Some(404));

        let range = parse_flow_filter("status:400-499");
        assert_eq!(range.query.status_min, Some(400));
        assert_eq!(range.query.status_max, Some(499));
    }

    #[test]
    fn parse_mixed_structured_and_plain() {
        let parsed = parse_flow_filter("api host:foo");
        assert_eq!(parsed.query.host.as_deref(), Some("foo"));
        assert_eq!(parsed.text_tokens, vec!["api"]);
    }

    // --- matching --------------------------------------------------------------------

    #[test]
    fn flow_matches_query_host_case_insensitive() {
        let flow = http_flow("http://API.Example.com/x", "GET", Some(200));
        let by_host = FlowQuery {
            host: Some("api.example".into()),
            ..Default::default()
        };
        assert!(flow_matches_query(&flow, &by_host));
    }

    #[test]
    fn flow_matches_query_status_min_and_error() {
        let ok_flow = http_flow("http://x.com", "GET", Some(200));
        let err_flow = http_flow("http://x.com", "GET", Some(500));

        let at_least_400 = FlowQuery {
            status_min: Some(400),
            ..Default::default()
        };
        assert!(!flow_matches_query(&ok_flow, &at_least_400));
        assert!(flow_matches_query(&err_flow, &at_least_400));

        let errors_only = FlowQuery {
            has_error: Some(true),
            ..Default::default()
        };
        assert!(flow_matches_query(&err_flow, &errors_only));
    }

    #[test]
    fn flow_matches_filter_plain_case_insensitive() {
        let flow = http_flow("http://api.example.com/x", "get", Some(200));

        let matching = parse_flow_filter("API");
        assert!(flow_matches_filter(&flow, &matching));

        let non_matching = parse_flow_filter("POST");
        assert!(!flow_matches_filter(&flow, &non_matching));
    }

    #[test]
    fn flow_matches_filter_combined() {
        let flow = http_flow("http://api.example.com/x", "POST", Some(201));

        let passing = parse_flow_filter("host:api method:POST status:>=200");
        assert!(flow_matches_filter(&flow, &passing));

        let failing = parse_flow_filter("host:other method:POST");
        assert!(!flow_matches_filter(&flow, &failing));
    }
}