1use crate::state::LogEvent;
10use serde_json::{json, Value};
11
/// A parsed query: the pipeline commands recognised by `parse_query`.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct ParsedQuery {
    /// Fields to project, in order (from `fields a, b, ...`). When empty, `execute_query`
    /// falls back to `@timestamp` and `@message`.
    pub fields: Vec<String>,
    /// Clauses collected from every `filter` command; an event must satisfy all of them.
    pub filters: Vec<FilterClause>,
    /// Field named by the most recent `sort` command, if any.
    pub sort_field: Option<String>,
    /// `true` when that `sort` command ended in `desc` (case-insensitive).
    pub sort_desc: bool,
    /// Row cap from the most recent valid `limit` command.
    pub limit: Option<usize>,
}

/// A single predicate produced by a `filter` command.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum FilterClause {
    /// `field = value`: holds when the field resolves and equals `value` exactly.
    Equals { field: String, value: String },
    /// `field != value`: holds when the field differs from `value` or does not resolve.
    NotEquals { field: String, value: String },
    /// `field like pattern`: substring match with optional `^` / `$` anchors.
    Like { field: String, pattern: String },
}
31
32pub fn parse_query(query: &str) -> ParsedQuery {
34 let mut parsed = ParsedQuery::default();
35
36 let commands: Vec<&str> = query.split('|').map(|s| s.trim()).collect();
38
39 for cmd in commands {
40 if cmd.is_empty() {
41 continue;
42 }
43
44 if let Some(rest) = cmd
45 .strip_prefix("fields ")
46 .or_else(|| cmd.strip_prefix("fields\t"))
47 {
48 parsed.fields = rest
49 .split(',')
50 .map(|f| f.trim().to_string())
51 .filter(|f| !f.is_empty())
52 .collect();
53 } else if let Some(rest) = cmd
54 .strip_prefix("filter ")
55 .or_else(|| cmd.strip_prefix("filter\t"))
56 {
57 if let Some(clause) = parse_filter_clause(rest.trim()) {
58 parsed.filters.push(clause);
59 }
60 } else if let Some(rest) = cmd
61 .strip_prefix("sort ")
62 .or_else(|| cmd.strip_prefix("sort\t"))
63 {
64 let parts: Vec<&str> = rest.split_whitespace().collect();
65 if !parts.is_empty() {
66 parsed.sort_field = Some(parts[0].to_string());
67 parsed.sort_desc =
68 parts.get(1).map(|s| s.eq_ignore_ascii_case("desc")) == Some(true);
69 }
70 } else if let Some(rest) = cmd
71 .strip_prefix("limit ")
72 .or_else(|| cmd.strip_prefix("limit\t"))
73 {
74 if let Ok(n) = rest.trim().parse::<usize>() {
75 parsed.limit = Some(n);
76 }
77 }
78 }
79
80 parsed
81}
82
83fn parse_filter_clause(s: &str) -> Option<FilterClause> {
84 if let Some(like_pos) = s.find(" like ") {
86 let field = s[..like_pos].trim().to_string();
87 let pattern_str = s[like_pos + 6..].trim();
88 let pattern = if pattern_str.starts_with('/') && pattern_str.ends_with('/') {
89 pattern_str[1..pattern_str.len() - 1].to_string()
91 } else {
92 unquote(pattern_str)
94 };
95 return Some(FilterClause::Like { field, pattern });
96 }
97
98 if let Some(ne_pos) = s.find(" != ") {
100 let field = s[..ne_pos].trim().to_string();
101 let value = unquote(s[ne_pos + 4..].trim());
102 return Some(FilterClause::NotEquals { field, value });
103 }
104
105 if let Some(eq_pos) = s.find(" = ") {
107 let field = s[..eq_pos].trim().to_string();
108 let value = unquote(s[eq_pos + 3..].trim());
109 return Some(FilterClause::Equals { field, value });
110 }
111
112 None
113}
114
/// Strips one matching pair of surrounding quotes (`"..."` or `'...'`) from `s`.
///
/// Returns `s` unchanged when it is not wrapped in a matching pair. This includes a
/// lone quote character: the old `s[1..s.len() - 1]` slicing panicked on that input.
fn unquote(s: &str) -> String {
    ['"', '\'']
        .iter()
        .find_map(|&quote| s.strip_prefix(quote)?.strip_suffix(quote))
        .unwrap_or(s)
        .to_string()
}
122
123fn get_field_value(event: &LogEvent, field: &str, stream_name: &str) -> Option<String> {
125 match field {
126 "@timestamp" => {
127 let secs = event.timestamp / 1000;
129 let nsecs = ((event.timestamp % 1000) * 1_000_000) as u32;
130 if let Some(dt) = chrono::DateTime::from_timestamp(secs, nsecs) {
131 Some(dt.format("%Y-%m-%d %H:%M:%S%.3f").to_string())
132 } else {
133 Some(event.timestamp.to_string())
134 }
135 }
136 "@message" => Some(event.message.clone()),
137 "@logStream" => Some(stream_name.to_string()),
138 "@ingestionTime" => Some(event.ingestion_time.to_string()),
139 "@ptr" => Some(format!("{}/{}", stream_name, event.timestamp)),
140 _ => {
141 if let Ok(parsed) = serde_json::from_str::<Value>(&event.message) {
143 let key = field.strip_prefix('@').unwrap_or(field);
145 parsed.get(key).map(|v| match v {
146 Value::String(s) => s.clone(),
147 other => other.to_string(),
148 })
149 } else {
150 None
151 }
152 }
153 }
154}
155
/// Tests `haystack` against a simplified `like` pattern.
///
/// Only the anchors are interpreted:
/// - `^...$` requires an exact match;
/// - a leading `^` requires a prefix match;
/// - a trailing `$` requires a suffix match;
/// - any other pattern is a plain substring search.
///
/// All other regex metacharacters are compared literally.
fn matches_pattern(haystack: &str, pattern: &str) -> bool {
    match pattern.strip_prefix('^') {
        Some(after_caret) => match after_caret.strip_suffix('$') {
            Some(exact) => haystack == exact,
            None => haystack.starts_with(after_caret),
        },
        None => match pattern.strip_suffix('$') {
            Some(tail) => haystack.ends_with(tail),
            None => haystack.contains(pattern),
        },
    }
}
173
174fn event_matches_filter(event: &LogEvent, stream_name: &str, clause: &FilterClause) -> bool {
176 match clause {
177 FilterClause::Equals { field, value } => get_field_value(event, field, stream_name)
178 .map(|v| v == *value)
179 .unwrap_or(false),
180 FilterClause::NotEquals { field, value } => get_field_value(event, field, stream_name)
181 .map(|v| v != *value)
182 .unwrap_or(true),
183 FilterClause::Like { field, pattern } => get_field_value(event, field, stream_name)
184 .map(|v| matches_pattern(&v, pattern))
185 .unwrap_or(false),
186 }
187}
188
189struct EventWithContext<'a> {
191 event: &'a LogEvent,
192 stream_name: &'a str,
193}
194
195pub fn execute_query(
198 query: &ParsedQuery,
199 events: &[(String, Vec<LogEvent>)], start_time_secs: i64,
201 end_time_secs: i64,
202) -> Vec<Value> {
203 let mut all_events: Vec<EventWithContext> = Vec::new();
205 for (stream_name, stream_events) in events {
206 for event in stream_events {
207 let event_time_secs = event.timestamp / 1000;
208 if event_time_secs >= start_time_secs && event_time_secs < end_time_secs {
209 all_events.push(EventWithContext { event, stream_name });
210 }
211 }
212 }
213
214 let filtered: Vec<&EventWithContext> = all_events
216 .iter()
217 .filter(|ec| {
218 query
219 .filters
220 .iter()
221 .all(|f| event_matches_filter(ec.event, ec.stream_name, f))
222 })
223 .collect();
224
225 let mut sorted: Vec<&EventWithContext> = filtered;
227 if let Some(ref sort_field) = query.sort_field {
228 let field = sort_field.clone();
229 let desc = query.sort_desc;
230 sorted.sort_by(|a, b| {
231 let va = get_field_value(a.event, &field, a.stream_name).unwrap_or_default();
232 let vb = get_field_value(b.event, &field, b.stream_name).unwrap_or_default();
233 if desc {
234 vb.cmp(&va)
235 } else {
236 va.cmp(&vb)
237 }
238 });
239 } else {
240 sorted.sort_by_key(|ec| ec.event.timestamp);
242 }
243
244 if let Some(limit) = query.limit {
246 sorted.truncate(limit);
247 }
248
249 let output_fields = if query.fields.is_empty() {
251 vec![
252 "@timestamp".to_string(),
253 "@message".to_string(),
254 "@ptr".to_string(),
255 ]
256 } else {
257 let mut fields = query.fields.clone();
258 if !fields.iter().any(|f| f == "@ptr") {
260 fields.push("@ptr".to_string());
261 }
262 fields
263 };
264
265 sorted
267 .iter()
268 .map(|ec| {
269 let row: Vec<Value> = output_fields
270 .iter()
271 .filter_map(|field| {
272 get_field_value(ec.event, field, ec.stream_name)
273 .map(|value| json!({"field": field, "value": value}))
274 })
275 .collect();
276 Value::Array(row)
277 })
278 .collect()
279}
280
/// Unit tests for query parsing (`parse_query`) and execution (`execute_query`).
#[cfg(test)]
mod tests {
    use super::*;

    // ---- Parsing ----

    /// `fields` splits on commas and trims; `limit` parses the row cap.
    #[test]
    fn parse_fields_and_limit() {
        let q = parse_query("fields @timestamp, @message | limit 5");
        assert_eq!(q.fields, vec!["@timestamp", "@message"]);
        assert_eq!(q.limit, Some(5));
    }

    /// An `=` clause yields `Equals` with surrounding double quotes removed from the value.
    #[test]
    fn parse_filter_equals() {
        let q = parse_query("filter level = \"ERROR\"");
        assert_eq!(q.filters.len(), 1);
        match &q.filters[0] {
            FilterClause::Equals { field, value } => {
                assert_eq!(field, "level");
                assert_eq!(value, "ERROR");
            }
            _ => panic!("expected Equals"),
        }
    }

    /// A `like /.../` clause yields `Like` with the slash delimiters removed.
    #[test]
    fn parse_filter_like_regex() {
        let q = parse_query("filter @message like /ERROR/");
        assert_eq!(q.filters.len(), 1);
        match &q.filters[0] {
            FilterClause::Like { field, pattern } => {
                assert_eq!(field, "@message");
                assert_eq!(pattern, "ERROR");
            }
            _ => panic!("expected Like"),
        }
    }

    /// A trailing `desc` sets `sort_desc`.
    #[test]
    fn parse_sort_desc() {
        let q = parse_query("sort @timestamp desc");
        assert_eq!(q.sort_field.as_deref(), Some("@timestamp"));
        assert!(q.sort_desc);
    }

    /// A trailing `asc` leaves `sort_desc` unset.
    #[test]
    fn parse_sort_asc() {
        let q = parse_query("sort @timestamp asc");
        assert_eq!(q.sort_field.as_deref(), Some("@timestamp"));
        assert!(!q.sort_desc);
    }

    /// All four command kinds combined in one pipeline.
    #[test]
    fn parse_complex_query() {
        let q = parse_query(
            "fields @timestamp, @message | filter @message like /ERROR/ | sort @timestamp desc | limit 10",
        );
        assert_eq!(q.fields, vec!["@timestamp", "@message"]);
        assert_eq!(q.filters.len(), 1);
        assert_eq!(q.sort_field.as_deref(), Some("@timestamp"));
        assert!(q.sort_desc);
        assert_eq!(q.limit, Some(10));
    }

    // ---- Execution ----
    // Event timestamps are in milliseconds. The 0..10000 second window passed to
    // `execute_query` covers every fixture event.

    /// A `like` filter keeps only the matching messages.
    #[test]
    fn execute_query_filters_events() {
        let events = vec![(
            "stream-1".to_string(),
            vec![
                LogEvent {
                    timestamp: 1000000,
                    message: "ERROR: something broke".to_string(),
                    ingestion_time: 1000000,
                },
                LogEvent {
                    timestamp: 2000000,
                    message: "INFO: all good".to_string(),
                    ingestion_time: 2000000,
                },
                LogEvent {
                    timestamp: 3000000,
                    message: "ERROR: another failure".to_string(),
                    ingestion_time: 3000000,
                },
            ],
        )];

        let query = parse_query("filter @message like /ERROR/ | limit 10");
        let results = execute_query(&query, &events, 0, 10000);
        assert_eq!(results.len(), 2);
    }

    /// `limit` caps the number of returned rows.
    #[test]
    fn execute_query_limit() {
        let events = vec![(
            "stream-1".to_string(),
            vec![
                LogEvent {
                    timestamp: 1000000,
                    message: "msg1".to_string(),
                    ingestion_time: 1000000,
                },
                LogEvent {
                    timestamp: 2000000,
                    message: "msg2".to_string(),
                    ingestion_time: 2000000,
                },
                LogEvent {
                    timestamp: 3000000,
                    message: "msg3".to_string(),
                    ingestion_time: 3000000,
                },
            ],
        )];

        let query = parse_query("limit 2");
        let results = execute_query(&query, &events, 0, 10000);
        assert_eq!(results.len(), 2);
    }

    /// An explicit `fields` list replaces the defaults, and `@ptr` is still appended.
    #[test]
    fn execute_query_fields_selection() {
        let events = vec![(
            "stream-1".to_string(),
            vec![LogEvent {
                timestamp: 1000000,
                message: "hello".to_string(),
                ingestion_time: 1000000,
            }],
        )];

        let query = parse_query("fields @message");
        let results = execute_query(&query, &events, 0, 10000);
        assert_eq!(results.len(), 1);

        let row = results[0].as_array().unwrap();
        let field_names: Vec<&str> = row.iter().map(|f| f["field"].as_str().unwrap()).collect();
        assert!(field_names.contains(&"@message"));
        assert!(field_names.contains(&"@ptr"));
        // The default `@timestamp` column is not added when fields are listed explicitly.
        assert!(!field_names.contains(&"@timestamp"));
    }

    /// Descending sort on `@timestamp` puts the latest event first, whatever the input order.
    #[test]
    fn execute_query_sort_desc() {
        let events = vec![(
            "stream-1".to_string(),
            vec![
                LogEvent {
                    timestamp: 1000000,
                    message: "first".to_string(),
                    ingestion_time: 1000000,
                },
                LogEvent {
                    timestamp: 3000000,
                    message: "third".to_string(),
                    ingestion_time: 3000000,
                },
                LogEvent {
                    timestamp: 2000000,
                    message: "second".to_string(),
                    ingestion_time: 2000000,
                },
            ],
        )];

        let query = parse_query("sort @timestamp desc");
        let results = execute_query(&query, &events, 0, 10000);
        assert_eq!(results.len(), 3);
        let first_msg = results[0]
            .as_array()
            .unwrap()
            .iter()
            .find(|f| f["field"].as_str() == Some("@message"))
            .unwrap();
        assert_eq!(first_msg["value"].as_str().unwrap(), "third");
    }

    /// A non-`@` field is resolved from the JSON-encoded message body.
    #[test]
    fn execute_query_json_field_filter() {
        let events = vec![(
            "stream-1".to_string(),
            vec![
                LogEvent {
                    timestamp: 1000000,
                    message: r#"{"level":"ERROR","msg":"fail"}"#.to_string(),
                    ingestion_time: 1000000,
                },
                LogEvent {
                    timestamp: 2000000,
                    message: r#"{"level":"INFO","msg":"ok"}"#.to_string(),
                    ingestion_time: 2000000,
                },
            ],
        )];

        let query = parse_query(r#"filter level = "ERROR""#);
        let results = execute_query(&query, &events, 0, 10000);
        assert_eq!(results.len(), 1);
    }

    /// `!=` excludes the matching JSON value and keeps the other event.
    #[test]
    fn execute_query_not_equals_filter() {
        let events = vec![(
            "stream-1".to_string(),
            vec![
                LogEvent {
                    timestamp: 1000000,
                    message: r#"{"level":"ERROR","msg":"fail"}"#.to_string(),
                    ingestion_time: 1000000,
                },
                LogEvent {
                    timestamp: 2000000,
                    message: r#"{"level":"INFO","msg":"ok"}"#.to_string(),
                    ingestion_time: 2000000,
                },
            ],
        )];

        let query = parse_query(r#"filter level != "ERROR""#);
        let results = execute_query(&query, &events, 0, 10000);
        assert_eq!(results.len(), 1);
        let msg = results[0]
            .as_array()
            .unwrap()
            .iter()
            .find(|f| f["field"].as_str() == Some("@message"))
            .unwrap();
        assert!(msg["value"].as_str().unwrap().contains("INFO"));
    }
}