Skip to main content

fakecloud_logs/
query.rs

//! CloudWatch Logs Insights query language parser and executor.
//!
//! Supports a subset of CWLI syntax:
//! - `fields @timestamp, @message` — select specific fields
//! - `filter @message like /pattern/` — filter by substring (optional `^` / `$` anchors; no full regex)
//! - `filter field = "value"` / `filter field != "value"` — filter by field equality or inequality
//! - `sort @timestamp desc` — sort results
//! - `limit N` — limit number of results
9use crate::state::LogEvent;
10use serde_json::{json, Value};
11
/// Parsed representation of a CWLI query.
///
/// Every member keeps its `Default` value when the corresponding command does not
/// appear in the query text.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct ParsedQuery {
    /// Field names given to the `fields` command, in the order written.
    pub fields: Vec<String>,
    /// One clause per `filter` command that could be parsed, in the order written.
    pub filters: Vec<FilterClause>,
    /// Field named by the `sort` command, if any.
    pub sort_field: Option<String>,
    /// `true` only when the `sort` command carried an explicit `desc` direction.
    pub sort_desc: bool,
    /// Row cap given to the `limit` command, if any.
    pub limit: Option<usize>,
}

/// A single condition from a `filter` command.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum FilterClause {
    /// `filter field = "value"`
    Equals { field: String, value: String },
    /// `filter field != "value"`
    NotEquals { field: String, value: String },
    /// `filter field like /pattern/` or `filter field like "substring"`
    Like { field: String, pattern: String },
}
31
32/// Parse a CWLI query string into a structured representation.
33pub fn parse_query(query: &str) -> ParsedQuery {
34    let mut parsed = ParsedQuery::default();
35
36    // Split on pipe delimiter, trimming whitespace
37    let commands: Vec<&str> = query.split('|').map(|s| s.trim()).collect();
38
39    for cmd in commands {
40        if cmd.is_empty() {
41            continue;
42        }
43
44        if let Some(rest) = cmd
45            .strip_prefix("fields ")
46            .or_else(|| cmd.strip_prefix("fields\t"))
47        {
48            parsed.fields = rest
49                .split(',')
50                .map(|f| f.trim().to_string())
51                .filter(|f| !f.is_empty())
52                .collect();
53        } else if let Some(rest) = cmd
54            .strip_prefix("filter ")
55            .or_else(|| cmd.strip_prefix("filter\t"))
56        {
57            if let Some(clause) = parse_filter_clause(rest.trim()) {
58                parsed.filters.push(clause);
59            }
60        } else if let Some(rest) = cmd
61            .strip_prefix("sort ")
62            .or_else(|| cmd.strip_prefix("sort\t"))
63        {
64            let parts: Vec<&str> = rest.split_whitespace().collect();
65            if !parts.is_empty() {
66                parsed.sort_field = Some(parts[0].to_string());
67                parsed.sort_desc =
68                    parts.get(1).map(|s| s.eq_ignore_ascii_case("desc")) == Some(true);
69            }
70        } else if let Some(rest) = cmd
71            .strip_prefix("limit ")
72            .or_else(|| cmd.strip_prefix("limit\t"))
73        {
74            if let Ok(n) = rest.trim().parse::<usize>() {
75                parsed.limit = Some(n);
76            }
77        }
78    }
79
80    parsed
81}
82
83fn parse_filter_clause(s: &str) -> Option<FilterClause> {
84    // Try: field like /pattern/ or field like "substring"
85    if let Some(like_pos) = s.find(" like ") {
86        let field = s[..like_pos].trim().to_string();
87        let pattern_str = s[like_pos + 6..].trim();
88        let pattern = if pattern_str.starts_with('/') && pattern_str.ends_with('/') {
89            // Regex pattern - extract content between slashes
90            pattern_str[1..pattern_str.len() - 1].to_string()
91        } else {
92            // Quoted string
93            unquote(pattern_str)
94        };
95        return Some(FilterClause::Like { field, pattern });
96    }
97
98    // Try: field != "value"
99    if let Some(ne_pos) = s.find(" != ") {
100        let field = s[..ne_pos].trim().to_string();
101        let value = unquote(s[ne_pos + 4..].trim());
102        return Some(FilterClause::NotEquals { field, value });
103    }
104
105    // Try: field = "value"
106    if let Some(eq_pos) = s.find(" = ") {
107        let field = s[..eq_pos].trim().to_string();
108        let value = unquote(s[eq_pos + 3..].trim());
109        return Some(FilterClause::Equals { field, value });
110    }
111
112    None
113}
114
/// Strip one pair of matching surrounding quotes (`"..."` or `'...'`) from `s`.
///
/// Input that is not wrapped in a matching pair is returned unchanged. That includes
/// a string consisting of a single quote character, which cannot be both the opening
/// and the closing quote.
fn unquote(s: &str) -> String {
    ['"', '\'']
        .iter()
        .find_map(|&quote| s.strip_prefix(quote)?.strip_suffix(quote))
        .unwrap_or(s)
        .to_string()
}
122
123/// Get the value of a virtual field for a log event.
124fn get_field_value(event: &LogEvent, field: &str, stream_name: &str) -> Option<String> {
125    match field {
126        "@timestamp" => {
127            // Format as ISO 8601
128            let secs = event.timestamp / 1000;
129            let nsecs = ((event.timestamp % 1000) * 1_000_000) as u32;
130            if let Some(dt) = chrono::DateTime::from_timestamp(secs, nsecs) {
131                Some(dt.format("%Y-%m-%d %H:%M:%S%.3f").to_string())
132            } else {
133                Some(event.timestamp.to_string())
134            }
135        }
136        "@message" => Some(event.message.clone()),
137        "@logStream" => Some(stream_name.to_string()),
138        "@ingestionTime" => Some(event.ingestion_time.to_string()),
139        "@ptr" => Some(format!("{}/{}", stream_name, event.timestamp)),
140        _ => {
141            // Try to extract from JSON message
142            if let Ok(parsed) = serde_json::from_str::<Value>(&event.message) {
143                // Strip leading @ if present for JSON field lookup
144                let key = field.strip_prefix('@').unwrap_or(field);
145                parsed.get(key).map(|v| match v {
146                    Value::String(s) => s.clone(),
147                    other => other.to_string(),
148                })
149            } else {
150                None
151            }
152        }
153    }
154}
155
/// Test `haystack` against a `like` pattern without using the regex crate.
///
/// Only the two regex anchors are interpreted; every other character is literal:
/// - `^text$` — `haystack` must equal `text`
/// - `^text`  — `haystack` must start with `text`
/// - `text$`  — `haystack` must end with `text`
/// - `text`   — `haystack` must contain `text`
fn matches_pattern(haystack: &str, pattern: &str) -> bool {
    match pattern.strip_prefix('^') {
        // Anchored at the start; a trailing `$` on the remainder makes it an exact match.
        Some(after_caret) => match after_caret.strip_suffix('$') {
            Some(exact) => haystack == exact,
            None => haystack.starts_with(after_caret),
        },
        None => match pattern.strip_suffix('$') {
            Some(tail) => haystack.ends_with(tail),
            None => haystack.contains(pattern),
        },
    }
}
173
174/// Apply a filter clause to an event, returning true if the event matches.
175fn event_matches_filter(event: &LogEvent, stream_name: &str, clause: &FilterClause) -> bool {
176    match clause {
177        FilterClause::Equals { field, value } => get_field_value(event, field, stream_name)
178            .map(|v| v == *value)
179            .unwrap_or(false),
180        FilterClause::NotEquals { field, value } => get_field_value(event, field, stream_name)
181            .map(|v| v != *value)
182            .unwrap_or(true),
183        FilterClause::Like { field, pattern } => get_field_value(event, field, stream_name)
184            .map(|v| matches_pattern(&v, pattern))
185            .unwrap_or(false),
186    }
187}
188
189/// A log event together with its stream name context, used during query execution.
190struct EventWithContext<'a> {
191    event: &'a LogEvent,
192    stream_name: &'a str,
193}
194
195/// Execute a parsed query against a set of log events.
196/// Returns results in the CloudWatch Logs Insights format: array of arrays of {field, value} objects.
197pub fn execute_query(
198    query: &ParsedQuery,
199    events: &[(String, Vec<LogEvent>)], // (stream_name, events) pairs
200    start_time_secs: i64,
201    end_time_secs: i64,
202) -> Vec<Value> {
203    // Collect all events with context
204    let mut all_events: Vec<EventWithContext> = Vec::new();
205    for (stream_name, stream_events) in events {
206        for event in stream_events {
207            let event_time_secs = event.timestamp / 1000;
208            if event_time_secs >= start_time_secs && event_time_secs < end_time_secs {
209                all_events.push(EventWithContext { event, stream_name });
210            }
211        }
212    }
213
214    // Apply filters
215    let filtered: Vec<&EventWithContext> = all_events
216        .iter()
217        .filter(|ec| {
218            query
219                .filters
220                .iter()
221                .all(|f| event_matches_filter(ec.event, ec.stream_name, f))
222        })
223        .collect();
224
225    // Sort
226    let mut sorted: Vec<&EventWithContext> = filtered;
227    if let Some(ref sort_field) = query.sort_field {
228        let field = sort_field.clone();
229        let desc = query.sort_desc;
230        sorted.sort_by(|a, b| {
231            let va = get_field_value(a.event, &field, a.stream_name).unwrap_or_default();
232            let vb = get_field_value(b.event, &field, b.stream_name).unwrap_or_default();
233            if desc {
234                vb.cmp(&va)
235            } else {
236                va.cmp(&vb)
237            }
238        });
239    } else {
240        // Default: sort by timestamp ascending
241        sorted.sort_by_key(|ec| ec.event.timestamp);
242    }
243
244    // Apply limit
245    if let Some(limit) = query.limit {
246        sorted.truncate(limit);
247    }
248
249    // Determine which fields to output
250    let output_fields = if query.fields.is_empty() {
251        vec![
252            "@timestamp".to_string(),
253            "@message".to_string(),
254            "@ptr".to_string(),
255        ]
256    } else {
257        let mut fields = query.fields.clone();
258        // Always include @ptr
259        if !fields.iter().any(|f| f == "@ptr") {
260            fields.push("@ptr".to_string());
261        }
262        fields
263    };
264
265    // Build result rows
266    sorted
267        .iter()
268        .map(|ec| {
269            let row: Vec<Value> = output_fields
270                .iter()
271                .filter_map(|field| {
272                    get_field_value(ec.event, field, ec.stream_name)
273                        .map(|value| json!({"field": field, "value": value}))
274                })
275                .collect();
276            Value::Array(row)
277        })
278        .collect()
279}
280
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a `LogEvent` whose ingestion time equals its timestamp.
    ///
    /// A macro rather than a function so that each integer literal is inferred against
    /// the field it initialises, exactly as in a hand-written struct literal.
    macro_rules! log_event {
        ($millis:expr, $message:expr) => {
            LogEvent {
                timestamp: $millis,
                message: $message.to_string(),
                ingestion_time: $millis,
            }
        };
    }

    /// Wraps `events` as the only stream, named `stream-1`.
    fn single_stream(events: Vec<LogEvent>) -> Vec<(String, Vec<LogEvent>)> {
        vec![("stream-1".to_string(), events)]
    }

    /// Returns the string value of `name` in a result row; panics if it is absent.
    fn field_value<'a>(row: &'a Value, name: &str) -> &'a str {
        let cell = row
            .as_array()
            .unwrap()
            .iter()
            .find(|cell| cell["field"].as_str() == Some(name))
            .unwrap();
        cell["value"].as_str().unwrap()
    }

    #[test]
    fn parse_fields_and_limit() {
        let q = parse_query("fields @timestamp, @message | limit 5");
        assert_eq!(q.fields, vec!["@timestamp", "@message"]);
        assert_eq!(q.limit, Some(5));
    }

    #[test]
    fn parse_filter_equals() {
        let q = parse_query("filter level = \"ERROR\"");
        assert_eq!(q.filters.len(), 1);
        if let FilterClause::Equals { field, value } = &q.filters[0] {
            assert_eq!(field, "level");
            assert_eq!(value, "ERROR");
        } else {
            panic!("expected Equals");
        }
    }

    #[test]
    fn parse_filter_like_regex() {
        let q = parse_query("filter @message like /ERROR/");
        assert_eq!(q.filters.len(), 1);
        if let FilterClause::Like { field, pattern } = &q.filters[0] {
            assert_eq!(field, "@message");
            assert_eq!(pattern, "ERROR");
        } else {
            panic!("expected Like");
        }
    }

    #[test]
    fn parse_sort_desc() {
        let q = parse_query("sort @timestamp desc");
        assert_eq!(q.sort_field.as_deref(), Some("@timestamp"));
        assert!(q.sort_desc);
    }

    #[test]
    fn parse_sort_asc() {
        let q = parse_query("sort @timestamp asc");
        assert_eq!(q.sort_field.as_deref(), Some("@timestamp"));
        assert!(!q.sort_desc);
    }

    #[test]
    fn parse_complex_query() {
        let q = parse_query(
            "fields @timestamp, @message | filter @message like /ERROR/ | sort @timestamp desc | limit 10",
        );
        assert_eq!(q.fields, vec!["@timestamp", "@message"]);
        assert_eq!(q.filters.len(), 1);
        assert_eq!(q.sort_field.as_deref(), Some("@timestamp"));
        assert!(q.sort_desc);
        assert_eq!(q.limit, Some(10));
    }

    #[test]
    fn execute_query_filters_events() {
        let events = single_stream(vec![
            log_event!(1000000, "ERROR: something broke"),
            log_event!(2000000, "INFO: all good"),
            log_event!(3000000, "ERROR: another failure"),
        ]);

        let query = parse_query("filter @message like /ERROR/ | limit 10");
        let results = execute_query(&query, &events, 0, 10000);
        assert_eq!(results.len(), 2);
    }

    #[test]
    fn execute_query_limit() {
        let events = single_stream(vec![
            log_event!(1000000, "msg1"),
            log_event!(2000000, "msg2"),
            log_event!(3000000, "msg3"),
        ]);

        let query = parse_query("limit 2");
        let results = execute_query(&query, &events, 0, 10000);
        assert_eq!(results.len(), 2);
    }

    #[test]
    fn execute_query_fields_selection() {
        let events = single_stream(vec![log_event!(1000000, "hello")]);

        let query = parse_query("fields @message");
        let results = execute_query(&query, &events, 0, 10000);
        assert_eq!(results.len(), 1);

        let field_names: Vec<&str> = results[0]
            .as_array()
            .unwrap()
            .iter()
            .map(|cell| cell["field"].as_str().unwrap())
            .collect();
        assert!(field_names.contains(&"@message"));
        assert!(field_names.contains(&"@ptr")); // always included
        assert!(!field_names.contains(&"@timestamp")); // not requested
    }

    #[test]
    fn execute_query_sort_desc() {
        let events = single_stream(vec![
            log_event!(1000000, "first"),
            log_event!(3000000, "third"),
            log_event!(2000000, "second"),
        ]);

        let query = parse_query("sort @timestamp desc");
        let results = execute_query(&query, &events, 0, 10000);
        assert_eq!(results.len(), 3);
        // The first row must belong to the event with the latest timestamp.
        assert_eq!(field_value(&results[0], "@message"), "third");
    }

    #[test]
    fn execute_query_json_field_filter() {
        let events = single_stream(vec![
            log_event!(1000000, r#"{"level":"ERROR","msg":"fail"}"#),
            log_event!(2000000, r#"{"level":"INFO","msg":"ok"}"#),
        ]);

        let query = parse_query(r#"filter level = "ERROR""#);
        let results = execute_query(&query, &events, 0, 10000);
        assert_eq!(results.len(), 1);
    }

    #[test]
    fn execute_query_not_equals_filter() {
        let events = single_stream(vec![
            log_event!(1000000, r#"{"level":"ERROR","msg":"fail"}"#),
            log_event!(2000000, r#"{"level":"INFO","msg":"ok"}"#),
        ]);

        let query = parse_query(r#"filter level != "ERROR""#);
        let results = execute_query(&query, &events, 0, 10000);
        assert_eq!(results.len(), 1);
        assert!(field_value(&results[0], "@message").contains("INFO"));
    }
}