1use nom::{
9 bytes::complete::{tag, take_while1},
10 character::complete::{alpha1, char, digit1, multispace0, multispace1},
11 combinator::opt,
12 sequence::{delimited, tuple},
13 IResult,
14};
15use std::time::Duration;
16
17#[cfg(feature = "streaming")]
19pub use crate::streaming::window::WindowType;
20
/// Kind of window attached to a stream source.
///
/// This is the fallback definition compiled when the `streaming` feature is disabled;
/// with the feature enabled the type is re-exported from `crate::streaming::window`.
#[cfg(not(feature = "streaming"))]
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum WindowType {
    /// Produced by `parse_window_type` for the keyword `sliding`.
    Sliding,
    /// Produced by `parse_window_type` for the keyword `tumbling`.
    Tumbling,
    /// Session window parameterised by `timeout`; `parse_window_type` never produces it.
    Session { timeout: Duration },
}
29
30#[derive(Debug, Clone, PartialEq)]
32pub struct StreamSource {
33 pub stream_name: String,
34 pub window: Option<WindowSpec>,
35}
36
37#[derive(Debug, Clone, PartialEq)]
39pub struct WindowSpec {
40 pub duration: Duration,
41 pub window_type: WindowType,
42}
43
44#[derive(Debug, Clone, PartialEq)]
46pub struct StreamPattern {
47 pub var_name: String,
48 pub event_type: Option<String>,
49 pub source: StreamSource,
50}
51
52#[derive(Debug, Clone, PartialEq)]
54pub struct StreamJoinPattern {
55 pub left: StreamPattern,
56 pub right: StreamPattern,
57 pub join_conditions: Vec<JoinCondition>,
58}
59
60#[derive(Debug, Clone, PartialEq)]
62pub enum JoinCondition {
63 Equality {
65 left_field: String,
66 right_field: String,
67 },
68 Expression(String),
70 Temporal {
72 operator: TemporalOp,
73 left_field: String,
74 right_field: String,
75 },
76}
77
/// Ordering relation carried by [`JoinCondition::Temporal`].
///
/// A fieldless enum, so it is `Copy` (pass by value) and `Eq + Hash` (usable as a
/// set/map key).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum TemporalOp {
    /// Produced by `parse_join_condition` for `<` on a time-like field.
    Before,
    /// Produced by `parse_join_condition` for `>` on a time-like field.
    After,
    /// Not currently produced by `parse_join_condition`.
    Within,
}
85
86pub fn parse_stream_source(input: &str) -> IResult<&str, StreamSource> {
94 let (input, _) = multispace0(input)?;
95 let (input, _) = tag("from")(input)?;
96 let (input, _) = multispace1(input)?;
97 let (input, _) = tag("stream")(input)?;
98 let (input, _) = multispace0(input)?;
99
100 let (input, stream_name) = delimited(
102 tuple((char('('), multispace0, char('"'))),
103 take_while1(|c: char| c != '"'),
104 tuple((char('"'), multispace0, char(')'))),
105 )(input)?;
106
107 let (input, window) = opt(parse_window_spec)(input)?;
109
110 Ok((
111 input,
112 StreamSource {
113 stream_name: stream_name.to_string(),
114 window,
115 },
116 ))
117}
118
119pub fn parse_window_spec(input: &str) -> IResult<&str, WindowSpec> {
128 let (input, _) = multispace0(input)?;
129 let (input, _) = tag("over")(input)?;
130 let (input, _) = multispace1(input)?;
131 let (input, _) = tag("window")(input)?;
132 let (input, _) = multispace0(input)?;
133 let (input, _) = char('(')(input)?;
134 let (input, _) = multispace0(input)?;
135
136 let (input, duration) = parse_duration(input)?;
138
139 let (input, _) = multispace0(input)?;
140 let (input, _) = char(',')(input)?;
141 let (input, _) = multispace0(input)?;
142
143 let (input, window_type) = parse_window_type(input)?;
145
146 let (input, _) = multispace0(input)?;
147 let (input, _) = char(')')(input)?;
148
149 Ok((
150 input,
151 WindowSpec {
152 duration,
153 window_type,
154 },
155 ))
156}
157
158pub fn parse_duration(input: &str) -> IResult<&str, Duration> {
166 let (input, value) = digit1(input)?;
167 let (input, _) = multispace1(input)?;
168 let (input, unit) = alpha1(input)?;
169
170 let value: u64 = value.parse().map_err(|_| {
171 nom::Err::Error(nom::error::Error::new(input, nom::error::ErrorKind::Digit))
172 })?;
173
174 let duration = match unit {
175 "ms" | "milliseconds" | "millisecond" => Duration::from_millis(value),
176 "sec" | "second" | "seconds" => Duration::from_secs(value),
177 "min" | "minute" | "minutes" => Duration::from_secs(value * 60),
178 "hour" | "hours" => Duration::from_secs(value * 3600),
179 _ => {
180 return Err(nom::Err::Error(nom::error::Error::new(
181 input,
182 nom::error::ErrorKind::Tag,
183 )))
184 }
185 };
186
187 Ok((input, duration))
188}
189
190pub fn parse_window_type(input: &str) -> IResult<&str, WindowType> {
192 let (input, type_str) = alpha1(input)?;
193
194 let window_type = match type_str {
195 "sliding" => WindowType::Sliding,
196 "tumbling" => WindowType::Tumbling,
197 _ => {
198 return Err(nom::Err::Error(nom::error::Error::new(
199 input,
200 nom::error::ErrorKind::Tag,
201 )))
202 }
203 };
204
205 Ok((input, window_type))
206}
207
208pub fn parse_stream_pattern(input: &str) -> IResult<&str, StreamPattern> {
216 let (input, var_name) = take_while1(|c: char| c.is_alphanumeric() || c == '_')(input)?;
218 let (input, _) = multispace0(input)?;
219 let (input, _) = char(':')(input)?;
220 let (input, _) = multispace0(input)?;
221
222 let (input, event_type) = {
224 let checkpoint = input;
225 match take_while1::<_, _, nom::error::Error<&str>>(|c: char| {
226 c.is_alphanumeric() || c == '_'
227 })(input)
228 {
229 Ok((remaining, name)) if name != "from" => (remaining, Some(name)),
230 _ => (checkpoint, None),
231 }
232 };
233
234 let (input, _) = multispace0(input)?;
235
236 let (input, source) = parse_stream_source(input)?;
238
239 Ok((
240 input,
241 StreamPattern {
242 var_name: var_name.to_string(),
243 event_type: event_type.map(|s| s.to_string()),
244 source,
245 },
246 ))
247}
248
#[cfg(test)]
mod tests {
    use super::*;

    /// Parses every `(input, expected)` pair with `parse_duration`. On failure it reports
    /// the offending input together with the parser error.
    fn assert_durations(cases: &[(&str, Duration)]) {
        for &(input, expected) in cases {
            let (_, duration) = parse_duration(input)
                .unwrap_or_else(|e| panic!("Failed to parse {:?}: {:?}", input, e));
            assert_eq!(duration, expected, "input: {:?}", input);
        }
    }

    #[test]
    fn test_parse_stream_source_basic() {
        let (_, source) = parse_stream_source(r#"from stream("user-events")"#)
            .expect("plain stream source should parse");
        assert_eq!(source.stream_name, "user-events");
        assert!(source.window.is_none());
    }

    #[test]
    fn test_parse_stream_source_with_spaces() {
        let (_, source) = parse_stream_source(r#" from stream ( "sensor-data" ) "#)
            .expect("whitespace around tokens should be accepted");
        assert_eq!(source.stream_name, "sensor-data");
    }

    #[test]
    fn test_parse_stream_source_with_window() {
        let (_, source) =
            parse_stream_source(r#"from stream("metrics") over window(30 sec, tumbling)"#)
                .expect("stream source with a window clause should parse");
        assert_eq!(source.stream_name, "metrics");
        let window = source.window.expect("window clause should be captured");
        assert_eq!(window.duration, Duration::from_secs(30));
        assert_eq!(window.window_type, WindowType::Tumbling);
    }

    #[test]
    fn test_parse_duration_seconds() {
        assert_durations(&[
            ("5 seconds", Duration::from_secs(5)),
            ("10 sec", Duration::from_secs(10)),
            ("1 second", Duration::from_secs(1)),
        ]);
    }

    #[test]
    fn test_parse_duration_minutes() {
        assert_durations(&[
            ("5 min", Duration::from_secs(300)),
            ("10 minutes", Duration::from_secs(600)),
            ("1 minute", Duration::from_secs(60)),
        ]);
    }

    #[test]
    fn test_parse_duration_hours() {
        assert_durations(&[
            ("1 hour", Duration::from_secs(3600)),
            ("2 hours", Duration::from_secs(7200)),
        ]);
    }

    #[test]
    fn test_parse_duration_milliseconds() {
        assert_durations(&[
            ("500 ms", Duration::from_millis(500)),
            ("250 milliseconds", Duration::from_millis(250)),
        ]);
    }

    #[test]
    fn test_parse_duration_missing_unit() {
        for &input in &["5", "5 ", "minutes"] {
            assert!(
                parse_duration(input).is_err(),
                "expected an error for {:?}",
                input
            );
        }
    }

    #[test]
    fn test_parse_window_type() {
        let cases = vec![
            ("sliding", WindowType::Sliding),
            ("tumbling", WindowType::Tumbling),
        ];

        for (input, expected) in cases {
            let (_, window_type) = parse_window_type(input)
                .unwrap_or_else(|e| panic!("Failed to parse {:?}: {:?}", input, e));
            assert_eq!(window_type, expected);
        }
    }

    #[test]
    fn test_parse_window_spec() {
        let (_, spec) = parse_window_spec("over window(5 min, sliding)")
            .expect("sliding window spec should parse");
        assert_eq!(spec.duration, Duration::from_secs(300));
        assert_eq!(spec.window_type, WindowType::Sliding);
    }

    #[test]
    fn test_parse_window_spec_tumbling() {
        let (_, spec) = parse_window_spec("over window(1 hour, tumbling)")
            .expect("tumbling window spec should parse");
        assert_eq!(spec.duration, Duration::from_secs(3600));
        assert_eq!(spec.window_type, WindowType::Tumbling);
    }

    #[test]
    fn test_parse_stream_pattern_simple() {
        let (_, pattern) = parse_stream_pattern(r#"event: LoginEvent from stream("logins")"#)
            .expect("simple stream pattern should parse");
        assert_eq!(pattern.var_name, "event");
        assert_eq!(pattern.event_type, Some("LoginEvent".to_string()));
        assert_eq!(pattern.source.stream_name, "logins");
        assert!(pattern.source.window.is_none());
    }

    #[test]
    fn test_parse_stream_pattern_with_window() {
        let input =
            r#"reading: TempReading from stream("sensors") over window(10 min, sliding)"#;
        let (_, pattern) =
            parse_stream_pattern(input).expect("windowed stream pattern should parse");
        assert_eq!(pattern.var_name, "reading");
        assert_eq!(pattern.event_type, Some("TempReading".to_string()));
        assert_eq!(pattern.source.stream_name, "sensors");

        let window = pattern
            .source
            .window
            .expect("window clause should be captured");
        assert_eq!(window.duration, Duration::from_secs(600));
        assert_eq!(window.window_type, WindowType::Sliding);
    }

    #[test]
    fn test_parse_stream_pattern_no_type() {
        let (_, pattern) = parse_stream_pattern(r#"e: from stream("events")"#)
            .expect("pattern without event type should parse");
        assert_eq!(pattern.var_name, "e");
        assert_eq!(pattern.event_type, None);
    }

    #[test]
    fn test_invalid_window_type() {
        assert!(parse_window_spec("over window(5 min, invalid)").is_err());
    }

    #[test]
    fn test_invalid_duration_unit() {
        assert!(parse_duration("5 invalid_unit").is_err());
    }
}
420
421pub fn parse_stream_join_pattern(input: &str) -> IResult<&str, StreamJoinPattern> {
429 use nom::bytes::complete::tag;
430
431 let (input, left) = parse_stream_pattern(input)?;
433
434 let (input, _) = multispace0(input)?;
436 let (input, _) = tag("&&")(input)?;
437 let (input, _) = multispace0(input)?;
438
439 let (input, right) = parse_stream_pattern(input)?;
441
442 Ok((
445 input,
446 StreamJoinPattern {
447 left,
448 right,
449 join_conditions: Vec::new(),
450 },
451 ))
452}
453
454pub fn parse_join_condition(input: &str) -> IResult<&str, JoinCondition> {
462 use nom::branch::alt;
463 use nom::bytes::complete::tag;
464
465 let (input, left_var) = take_while1(|c: char| c.is_alphanumeric() || c == '_')(input)?;
467 let (input, _) = char('.')(input)?;
468 let (input, left_field) = take_while1(|c: char| c.is_alphanumeric() || c == '_')(input)?;
469
470 let (input, _) = multispace0(input)?;
471
472 let (input, op) = alt((
474 tag("=="),
475 tag("!="),
476 tag("<="),
477 tag(">="),
478 tag("<"),
479 tag(">"),
480 ))(input)?;
481
482 let (input, _) = multispace0(input)?;
483
484 let (input, right_var) = take_while1(|c: char| c.is_alphanumeric() || c == '_')(input)?;
486 let (input, _) = char('.')(input)?;
487 let (input, right_field) = take_while1(|c: char| c.is_alphanumeric() || c == '_')(input)?;
488
489 let condition = match op {
491 "==" => JoinCondition::Equality {
492 left_field: format!("{}.{}", left_var, left_field),
493 right_field: format!("{}.{}", right_var, right_field),
494 },
495 ">" => {
496 if left_field.contains("time") || right_field.contains("time") {
497 JoinCondition::Temporal {
498 operator: TemporalOp::After,
499 left_field: format!("{}.{}", left_var, left_field),
500 right_field: format!("{}.{}", right_var, right_field),
501 }
502 } else {
503 JoinCondition::Expression(format!(
504 "{}.{} > {}.{}",
505 left_var, left_field, right_var, right_field
506 ))
507 }
508 }
509 "<" => {
510 if left_field.contains("time") || right_field.contains("time") {
511 JoinCondition::Temporal {
512 operator: TemporalOp::Before,
513 left_field: format!("{}.{}", left_var, left_field),
514 right_field: format!("{}.{}", right_var, right_field),
515 }
516 } else {
517 JoinCondition::Expression(format!(
518 "{}.{} < {}.{}",
519 left_var, left_field, right_var, right_field
520 ))
521 }
522 }
523 _ => JoinCondition::Expression(format!(
524 "{}.{} {} {}.{}",
525 left_var, left_field, op, right_var, right_field
526 )),
527 };
528
529 Ok((input, condition))
530}
531
#[cfg(test)]
mod join_tests {
    use super::*;

    #[test]
    fn test_parse_join_condition_equality() {
        let (_, condition) = parse_join_condition("click.user_id == purchase.user_id")
            .expect("equality condition should parse");
        assert_eq!(
            condition,
            JoinCondition::Equality {
                left_field: "click.user_id".to_string(),
                right_field: "purchase.user_id".to_string(),
            }
        );
    }

    #[test]
    fn test_parse_join_condition_temporal() {
        let (_, condition) = parse_join_condition("purchase.timestamp > click.timestamp")
            .expect("temporal `>` condition should parse");
        assert_eq!(
            condition,
            JoinCondition::Temporal {
                operator: TemporalOp::After,
                left_field: "purchase.timestamp".to_string(),
                right_field: "click.timestamp".to_string(),
            }
        );
    }

    #[test]
    fn test_parse_join_condition_temporal_before() {
        let (_, condition) = parse_join_condition("click.timestamp < purchase.timestamp")
            .expect("temporal `<` condition should parse");
        assert_eq!(
            condition,
            JoinCondition::Temporal {
                operator: TemporalOp::Before,
                left_field: "click.timestamp".to_string(),
                right_field: "purchase.timestamp".to_string(),
            }
        );
    }

    #[test]
    fn test_parse_join_condition_expression() {
        let cases = vec![
            ("order.amount > refund.amount", "order.amount > refund.amount"),
            ("a.score != b.score", "a.score != b.score"),
        ];

        for (input, expected) in cases {
            let (_, condition) = parse_join_condition(input)
                .unwrap_or_else(|e| panic!("Failed to parse {:?}: {:?}", input, e));
            assert_eq!(condition, JoinCondition::Expression(expected.to_string()));
        }
    }

    #[test]
    fn test_parse_stream_join_pattern() {
        let input = r#"click: ClickEvent from stream("clicks") over window(10 min, sliding) && purchase: PurchaseEvent from stream("purchases") over window(10 min, sliding)"#;
        let (_, join_pattern) =
            parse_stream_join_pattern(input).expect("join pattern should parse");

        assert_eq!(join_pattern.left.var_name, "click");
        assert_eq!(join_pattern.left.event_type, Some("ClickEvent".to_string()));
        assert_eq!(join_pattern.left.source.stream_name, "clicks");
        assert!(join_pattern.left.source.window.is_some());

        assert_eq!(join_pattern.right.var_name, "purchase");
        assert_eq!(
            join_pattern.right.event_type,
            Some("PurchaseEvent".to_string())
        );
        assert_eq!(join_pattern.right.source.stream_name, "purchases");
        assert!(join_pattern.right.source.window.is_some());
    }
}