1use nom::{
9 bytes::complete::{tag, take_while1},
10 character::complete::{alpha1, char, digit1, multispace0, multispace1},
11 combinator::opt,
12 sequence::delimited,
13 IResult, Parser,
14};
15use std::time::Duration;
16
17#[cfg(feature = "streaming")]
19pub use crate::streaming::window::WindowType;
20
/// Kind of window that can be applied to a stream.
///
/// This local definition is compiled only when the `streaming` feature is
/// disabled; with the feature enabled the name is re-exported from
/// `crate::streaming::window` instead.
#[cfg(not(feature = "streaming"))]
#[derive(Debug, Clone, PartialEq, Copy)]
pub enum WindowType {
    /// Produced by `parse_window_type` for the keyword `sliding`.
    Sliding,
    /// Produced by `parse_window_type` for the keyword `tumbling`.
    Tumbling,
    /// Session window carrying a `timeout`.
    ///
    /// No parser in this file produces this variant.
    Session { timeout: Duration },
}
29
/// A parsed `from stream("<name>")` clause, optionally followed by an
/// `over window(...)` specification.
#[derive(Debug, Clone, PartialEq)]
pub struct StreamSource {
    /// Text found between the double quotes of `stream("...")`.
    pub stream_name: String,
    /// Window specification, `None` when no `over window(...)` clause was parsed.
    pub window: Option<WindowSpec>,
}
36
/// A parsed `over window(<duration>, <type>)` clause.
#[derive(Debug, Clone, PartialEq)]
pub struct WindowSpec {
    /// Window length as parsed by `parse_duration`.
    pub duration: Duration,
    /// Window kind as parsed by `parse_window_type`.
    pub window_type: WindowType,
}
43
/// A parsed `<var>: [<EventType>] from stream("<name>") [over window(...)]` pattern.
#[derive(Debug, Clone, PartialEq)]
pub struct StreamPattern {
    /// Identifier that precedes the `:`.
    pub var_name: String,
    /// Identifier that follows the `:`, or `None` when the source clause
    /// (`from ...`) comes directly after the colon.
    pub event_type: Option<String>,
    /// The stream (and optional window) the pattern reads from.
    pub source: StreamSource,
}
51
/// Two stream patterns joined with `&&`.
#[derive(Debug, Clone, PartialEq)]
pub struct StreamJoinPattern {
    /// Pattern on the left-hand side of `&&`.
    pub left: StreamPattern,
    /// Pattern on the right-hand side of `&&`.
    pub right: StreamPattern,
    /// Conditions relating the two sides. `parse_stream_join_pattern` always
    /// leaves this empty; conditions are parsed separately by
    /// `parse_join_condition`.
    pub join_conditions: Vec<JoinCondition>,
}
59
/// A single condition relating fields of two joined streams.
///
/// When built by `parse_join_condition`, field strings have the form
/// `"<var>.<field>"`.
#[derive(Debug, Clone, PartialEq)]
pub enum JoinCondition {
    /// `left == right` comparison.
    Equality {
        left_field: String,
        right_field: String,
    },
    /// Any other comparison, kept as its textual form
    /// (for example `"a.x >= b.y"`).
    Expression(String),
    /// Ordering comparison between two fields treated as timestamps.
    Temporal {
        operator: TemporalOp,
        left_field: String,
        right_field: String,
    },
}
77
/// Temporal relation used by [`JoinCondition::Temporal`].
///
/// The enum is fieldless, so it also derives `Copy`, `Eq` and `Hash`; this lets
/// it be passed by value and used as a map or set key.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum TemporalOp {
    /// Emitted by `parse_join_condition` for `<` on time-like fields.
    Before,
    /// Emitted by `parse_join_condition` for `>` on time-like fields.
    After,
    /// Not produced by any parser in this file.
    Within,
}
85
86pub fn parse_stream_source(input: &str) -> IResult<&str, StreamSource> {
94 let (input, _) = multispace0(input)?;
95 let (input, _) = tag("from")(input)?;
96 let (input, _) = multispace1(input)?;
97 let (input, _) = tag("stream")(input)?;
98 let (input, _) = multispace0(input)?;
99
100 let (input, stream_name) = delimited(
102 (char('('), multispace0, char('"')),
103 take_while1(|c: char| c != '"'),
104 (char('"'), multispace0, char(')')),
105 )
106 .parse(input)?;
107
108 let (input, window) = opt(parse_window_spec).parse(input)?;
110
111 Ok((
112 input,
113 StreamSource {
114 stream_name: stream_name.to_string(),
115 window,
116 },
117 ))
118}
119
120pub fn parse_window_spec(input: &str) -> IResult<&str, WindowSpec> {
129 let (input, _) = multispace0(input)?;
130 let (input, _) = tag("over")(input)?;
131 let (input, _) = multispace1(input)?;
132 let (input, _) = tag("window")(input)?;
133 let (input, _) = multispace0(input)?;
134 let (input, _) = char('(')(input)?;
135 let (input, _) = multispace0(input)?;
136
137 let (input, duration) = parse_duration(input)?;
139
140 let (input, _) = multispace0(input)?;
141 let (input, _) = char(',')(input)?;
142 let (input, _) = multispace0(input)?;
143
144 let (input, window_type) = parse_window_type(input)?;
146
147 let (input, _) = multispace0(input)?;
148 let (input, _) = char(')')(input)?;
149
150 Ok((
151 input,
152 WindowSpec {
153 duration,
154 window_type,
155 },
156 ))
157}
158
159pub fn parse_duration(input: &str) -> IResult<&str, Duration> {
167 let (input, value) = digit1(input)?;
168 let (input, _) = multispace1(input)?;
169 let (input, unit) = alpha1(input)?;
170
171 let value: u64 = value.parse().map_err(|_| {
172 nom::Err::Error(nom::error::Error::new(input, nom::error::ErrorKind::Digit))
173 })?;
174
175 let duration = match unit {
176 "ms" | "milliseconds" | "millisecond" => Duration::from_millis(value),
177 "sec" | "second" | "seconds" => Duration::from_secs(value),
178 "min" | "minute" | "minutes" => Duration::from_secs(value * 60),
179 "hour" | "hours" => Duration::from_secs(value * 3600),
180 _ => {
181 return Err(nom::Err::Error(nom::error::Error::new(
182 input,
183 nom::error::ErrorKind::Tag,
184 )))
185 }
186 };
187
188 Ok((input, duration))
189}
190
191pub fn parse_window_type(input: &str) -> IResult<&str, WindowType> {
193 let (input, type_str) = alpha1(input)?;
194
195 let window_type = match type_str {
196 "sliding" => WindowType::Sliding,
197 "tumbling" => WindowType::Tumbling,
198 _ => {
199 return Err(nom::Err::Error(nom::error::Error::new(
200 input,
201 nom::error::ErrorKind::Tag,
202 )))
203 }
204 };
205
206 Ok((input, window_type))
207}
208
209pub fn parse_stream_pattern(input: &str) -> IResult<&str, StreamPattern> {
217 let (input, var_name) = take_while1(|c: char| c.is_alphanumeric() || c == '_')(input)?;
219 let (input, _) = multispace0(input)?;
220 let (input, _) = char(':')(input)?;
221 let (input, _) = multispace0(input)?;
222
223 let (input, event_type) = {
225 let checkpoint = input;
226 match take_while1::<_, _, nom::error::Error<&str>>(|c: char| {
227 c.is_alphanumeric() || c == '_'
228 })(input)
229 {
230 Ok((remaining, name)) if name != "from" => (remaining, Some(name)),
231 _ => (checkpoint, None),
232 }
233 };
234
235 let (input, _) = multispace0(input)?;
236
237 let (input, source) = parse_stream_source(input)?;
239
240 Ok((
241 input,
242 StreamPattern {
243 var_name: var_name.to_string(),
244 event_type: event_type.map(|s| s.to_string()),
245 source,
246 },
247 ))
248}
249
// Unit tests for the stream source, duration, window and stream pattern parsers.
#[cfg(test)]
mod tests {
    use super::*;

    // A bare `from stream("...")` yields the name and no window.
    #[test]
    fn test_parse_stream_source_basic() {
        let input = r#"from stream("user-events")"#;
        let result = parse_stream_source(input);

        assert!(result.is_ok());
        let (_, source) = result.unwrap();
        assert_eq!(source.stream_name, "user-events");
        assert!(source.window.is_none());
    }

    // Whitespace before `from` and around the parentheses is tolerated.
    #[test]
    fn test_parse_stream_source_with_spaces() {
        let input = r#" from stream ( "sensor-data" ) "#;
        let result = parse_stream_source(input);

        assert!(result.is_ok());
        let (_, source) = result.unwrap();
        assert_eq!(source.stream_name, "sensor-data");
    }

    // All spellings of the seconds unit are accepted.
    #[test]
    fn test_parse_duration_seconds() {
        let tests = vec![
            ("5 seconds", Duration::from_secs(5)),
            ("10 sec", Duration::from_secs(10)),
            ("1 second", Duration::from_secs(1)),
        ];

        for (input, expected) in tests {
            let result = parse_duration(input);
            assert!(result.is_ok(), "Failed to parse: {}", input);
            let (_, duration) = result.unwrap();
            assert_eq!(duration, expected);
        }
    }

    // Minutes are converted to seconds (x60).
    #[test]
    fn test_parse_duration_minutes() {
        let tests = vec![
            ("5 min", Duration::from_secs(300)),
            ("10 minutes", Duration::from_secs(600)),
            ("1 minute", Duration::from_secs(60)),
        ];

        for (input, expected) in tests {
            let result = parse_duration(input);
            assert!(result.is_ok());
            let (_, duration) = result.unwrap();
            assert_eq!(duration, expected);
        }
    }

    // Hours are converted to seconds (x3600).
    #[test]
    fn test_parse_duration_hours() {
        let input = "1 hour";
        let result = parse_duration(input);

        assert!(result.is_ok());
        let (_, duration) = result.unwrap();
        assert_eq!(duration, Duration::from_secs(3600));
    }

    // The `ms` unit maps to milliseconds.
    #[test]
    fn test_parse_duration_milliseconds() {
        let input = "500 ms";
        let result = parse_duration(input);

        assert!(result.is_ok());
        let (_, duration) = result.unwrap();
        assert_eq!(duration, Duration::from_millis(500));
    }

    // Both supported window type keywords are recognised.
    #[test]
    fn test_parse_window_type() {
        let tests = vec![
            ("sliding", WindowType::Sliding),
            ("tumbling", WindowType::Tumbling),
        ];

        for (input, expected) in tests {
            let result = parse_window_type(input);
            assert!(result.is_ok());
            let (_, window_type) = result.unwrap();
            assert_eq!(window_type, expected);
        }
    }

    // A full window clause with a sliding window.
    #[test]
    fn test_parse_window_spec() {
        let input = "over window(5 min, sliding)";
        let result = parse_window_spec(input);

        assert!(result.is_ok());
        let (_, spec) = result.unwrap();
        assert_eq!(spec.duration, Duration::from_secs(300));
        assert_eq!(spec.window_type, WindowType::Sliding);
    }

    // A full window clause with a tumbling window.
    #[test]
    fn test_parse_window_spec_tumbling() {
        let input = "over window(1 hour, tumbling)";
        let result = parse_window_spec(input);

        assert!(result.is_ok());
        let (_, spec) = result.unwrap();
        assert_eq!(spec.duration, Duration::from_secs(3600));
        assert_eq!(spec.window_type, WindowType::Tumbling);
    }

    // Variable, event type and stream name without a window.
    #[test]
    fn test_parse_stream_pattern_simple() {
        let input = r#"event: LoginEvent from stream("logins")"#;
        let result = parse_stream_pattern(input);

        assert!(result.is_ok());
        let (_, pattern) = result.unwrap();
        assert_eq!(pattern.var_name, "event");
        assert_eq!(pattern.event_type, Some("LoginEvent".to_string()));
        assert_eq!(pattern.source.stream_name, "logins");
        assert!(pattern.source.window.is_none());
    }

    // Same as above, with a trailing window clause.
    #[test]
    fn test_parse_stream_pattern_with_window() {
        let input = r#"reading: TempReading from stream("sensors") over window(10 min, sliding)"#;
        let result = parse_stream_pattern(input);

        assert!(result.is_ok());
        let (_, pattern) = result.unwrap();
        assert_eq!(pattern.var_name, "reading");
        assert_eq!(pattern.event_type, Some("TempReading".to_string()));
        assert_eq!(pattern.source.stream_name, "sensors");
        assert!(pattern.source.window.is_some());

        let window = pattern.source.window.unwrap();
        assert_eq!(window.duration, Duration::from_secs(600));
        assert_eq!(window.window_type, WindowType::Sliding);
    }

    // The event type may be omitted; `from` must not be mistaken for a type name.
    #[test]
    fn test_parse_stream_pattern_no_type() {
        let input = r#"e: from stream("events")"#;
        let result = parse_stream_pattern(input);

        assert!(result.is_ok());
        let (_, pattern) = result.unwrap();
        assert_eq!(pattern.var_name, "e");
        assert_eq!(pattern.event_type, None);
    }

    // An unknown window type keyword makes the whole window clause fail.
    #[test]
    fn test_invalid_window_type() {
        let input = "over window(5 min, invalid)";
        let result = parse_window_spec(input);

        assert!(result.is_err());
    }

    // An unknown unit name is rejected.
    #[test]
    fn test_invalid_duration_unit() {
        let input = "5 invalid_unit";
        let result = parse_duration(input);

        assert!(result.is_err());
    }
}
421
422pub fn parse_stream_join_pattern(input: &str) -> IResult<&str, StreamJoinPattern> {
430 use nom::bytes::complete::tag;
431
432 let (input, left) = parse_stream_pattern(input)?;
434
435 let (input, _) = multispace0(input)?;
437 let (input, _) = tag("&&")(input)?;
438 let (input, _) = multispace0(input)?;
439
440 let (input, right) = parse_stream_pattern(input)?;
442
443 Ok((
446 input,
447 StreamJoinPattern {
448 left,
449 right,
450 join_conditions: Vec::new(),
451 },
452 ))
453}
454
455pub fn parse_join_condition(input: &str) -> IResult<&str, JoinCondition> {
463 use nom::branch::alt;
464 use nom::bytes::complete::tag;
465
466 let (input, left_var) = take_while1(|c: char| c.is_alphanumeric() || c == '_')(input)?;
468 let (input, _) = char('.')(input)?;
469 let (input, left_field) = take_while1(|c: char| c.is_alphanumeric() || c == '_')(input)?;
470
471 let (input, _) = multispace0(input)?;
472
473 let (input, op) = alt((
475 tag("=="),
476 tag("!="),
477 tag("<="),
478 tag(">="),
479 tag("<"),
480 tag(">"),
481 ))
482 .parse(input)?;
483
484 let (input, _) = multispace0(input)?;
485
486 let (input, right_var) = take_while1(|c: char| c.is_alphanumeric() || c == '_')(input)?;
488 let (input, _) = char('.')(input)?;
489 let (input, right_field) = take_while1(|c: char| c.is_alphanumeric() || c == '_')(input)?;
490
491 let condition = match op {
493 "==" => JoinCondition::Equality {
494 left_field: format!("{}.{}", left_var, left_field),
495 right_field: format!("{}.{}", right_var, right_field),
496 },
497 ">" => {
498 if left_field.contains("time") || right_field.contains("time") {
499 JoinCondition::Temporal {
500 operator: TemporalOp::After,
501 left_field: format!("{}.{}", left_var, left_field),
502 right_field: format!("{}.{}", right_var, right_field),
503 }
504 } else {
505 JoinCondition::Expression(format!(
506 "{}.{} > {}.{}",
507 left_var, left_field, right_var, right_field
508 ))
509 }
510 }
511 "<" => {
512 if left_field.contains("time") || right_field.contains("time") {
513 JoinCondition::Temporal {
514 operator: TemporalOp::Before,
515 left_field: format!("{}.{}", left_var, left_field),
516 right_field: format!("{}.{}", right_var, right_field),
517 }
518 } else {
519 JoinCondition::Expression(format!(
520 "{}.{} < {}.{}",
521 left_var, left_field, right_var, right_field
522 ))
523 }
524 }
525 _ => JoinCondition::Expression(format!(
526 "{}.{} {} {}.{}",
527 left_var, left_field, op, right_var, right_field
528 )),
529 };
530
531 Ok((input, condition))
532}
533
// Unit tests for join condition and stream join pattern parsing.
#[cfg(test)]
mod join_tests {
    use super::*;

    // `==` produces an Equality condition with fully qualified field names.
    #[test]
    fn test_parse_join_condition_equality() {
        let input = "click.user_id == purchase.user_id";
        let result = parse_join_condition(input);

        assert!(result.is_ok());
        let (_, condition) = result.unwrap();
        match condition {
            JoinCondition::Equality {
                left_field,
                right_field,
            } => {
                assert_eq!(left_field, "click.user_id");
                assert_eq!(right_field, "purchase.user_id");
            }
            _ => panic!("Expected Equality condition"),
        }
    }

    // `>` on fields whose names contain "time" produces a Temporal `After` condition.
    #[test]
    fn test_parse_join_condition_temporal() {
        let input = "purchase.timestamp > click.timestamp";
        let result = parse_join_condition(input);

        assert!(result.is_ok());
        let (_, condition) = result.unwrap();
        match condition {
            JoinCondition::Temporal {
                operator,
                left_field,
                right_field,
            } => {
                assert_eq!(operator, TemporalOp::After);
                assert_eq!(left_field, "purchase.timestamp");
                assert_eq!(right_field, "click.timestamp");
            }
            _ => panic!("Expected Temporal condition"),
        }
    }

    // Two windowed patterns joined by `&&` are parsed into left and right sides.
    #[test]
    fn test_parse_stream_join_pattern() {
        let input = r#"click: ClickEvent from stream("clicks") over window(10 min, sliding) && purchase: PurchaseEvent from stream("purchases") over window(10 min, sliding)"#;
        let result = parse_stream_join_pattern(input);

        assert!(result.is_ok());
        let (_, join_pattern) = result.unwrap();

        assert_eq!(join_pattern.left.var_name, "click");
        assert_eq!(join_pattern.left.event_type, Some("ClickEvent".to_string()));
        assert_eq!(join_pattern.left.source.stream_name, "clicks");

        assert_eq!(join_pattern.right.var_name, "purchase");
        assert_eq!(
            join_pattern.right.event_type,
            Some("PurchaseEvent".to_string())
        );
        assert_eq!(join_pattern.right.source.stream_name, "purchases");
    }
}