Skip to main content

rust_rule_engine/parser/grl/
stream_syntax.rs

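// Stream syntax parser for GRL
//
// This module implements parsing for stream-related GRL syntax:
// - `from stream("name")` stream sources
// - `over window(duration, type)` window specifications
// - stream patterns with variable bindings (`var: Type from stream(...)`)
// - two-stream join patterns (`pattern && pattern`) and field-level join conditions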
7
8use nom::{
9    bytes::complete::{tag, take_while1},
10    character::complete::{alpha1, char, digit1, multispace0, multispace1},
11    combinator::opt,
12    sequence::delimited,
13    IResult, Parser,
14};
15use std::time::Duration;
16
17// Re-export WindowType from streaming module when available
18#[cfg(feature = "streaming")]
19pub use crate::streaming::window::WindowType;
20
// Fallback `WindowType` used when the `streaming` feature is disabled, so the
// parser types in this module remain usable without the streaming module.
#[cfg(not(feature = "streaming"))]
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum WindowType {
    /// Sliding window (`sliding` in GRL).
    Sliding,
    /// Tumbling window (`tumbling` in GRL).
    Tumbling,
    /// Session window parameterized by `timeout`.
    /// Not produced by `parse_window_type`.
    Session { timeout: Duration },
}
29
30/// Stream source specification
31#[derive(Debug, Clone, PartialEq)]
32pub struct StreamSource {
33    pub stream_name: String,
34    pub window: Option<WindowSpec>,
35}
36
37/// Window specification
38#[derive(Debug, Clone, PartialEq)]
39pub struct WindowSpec {
40    pub duration: Duration,
41    pub window_type: WindowType,
42}
43
44/// Stream pattern with variable binding
45#[derive(Debug, Clone, PartialEq)]
46pub struct StreamPattern {
47    pub var_name: String,
48    pub event_type: Option<String>,
49    pub source: StreamSource,
50}
51
52/// Stream join specification
53#[derive(Debug, Clone, PartialEq)]
54pub struct StreamJoinPattern {
55    pub left: StreamPattern,
56    pub right: StreamPattern,
57    pub join_conditions: Vec<JoinCondition>,
58}
59
/// Join condition between two streams.
///
/// As produced by `parse_join_condition`, field strings are qualified,
/// i.e. stored as `variable.field`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum JoinCondition {
    /// Equality condition: `left_field == right_field`.
    Equality {
        left_field: String,
        right_field: String,
    },
    /// Any other comparison, kept as normalized text (e.g. `a.x != b.y`).
    Expression(String),
    /// Temporal constraint read as `left_field <operator> right_field`,
    /// e.g. `purchase.timestamp > click.timestamp` uses [`TemporalOp::After`].
    Temporal {
        operator: TemporalOp,
        left_field: String,
        right_field: String,
    },
}

/// Temporal operators for stream joins.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum TemporalOp {
    /// `left.time < right.time`
    Before,
    /// `left.time > right.time`
    After,
    /// `abs(left.time - right.time) < duration`.
    ///
    /// NOTE(review): neither this variant nor [`JoinCondition::Temporal`]
    /// carries the duration bound, so it must be supplied elsewhere — confirm.
    /// `parse_join_condition` never produces this variant.
    Within,
}
85
86/// Parse: from stream("stream-name")
87///
88/// # Example
89/// ```text
90/// from stream("user-events")
91/// from stream("sensor-readings")
92/// ```
93pub fn parse_stream_source(input: &str) -> IResult<&str, StreamSource> {
94    let (input, _) = multispace0(input)?;
95    let (input, _) = tag("from")(input)?;
96    let (input, _) = multispace1(input)?;
97    let (input, _) = tag("stream")(input)?;
98    let (input, _) = multispace0(input)?;
99
100    // Parse stream name in parentheses: stream("name")
101    let (input, stream_name) = delimited(
102        (char('('), multispace0, char('"')),
103        take_while1(|c: char| c != '"'),
104        (char('"'), multispace0, char(')')),
105    )
106    .parse(input)?;
107
108    // Optional window specification
109    let (input, window) = opt(parse_window_spec).parse(input)?;
110
111    Ok((
112        input,
113        StreamSource {
114            stream_name: stream_name.to_string(),
115            window,
116        },
117    ))
118}
119
120/// Parse: over window(5 min, sliding)
121///
122/// # Example
123/// ```text
124/// over window(5 min, sliding)
125/// over window(1 hour, tumbling)
126/// over window(30 seconds, sliding)
127/// ```
128pub fn parse_window_spec(input: &str) -> IResult<&str, WindowSpec> {
129    let (input, _) = multispace0(input)?;
130    let (input, _) = tag("over")(input)?;
131    let (input, _) = multispace1(input)?;
132    let (input, _) = tag("window")(input)?;
133    let (input, _) = multispace0(input)?;
134    let (input, _) = char('(')(input)?;
135    let (input, _) = multispace0(input)?;
136
137    // Parse duration
138    let (input, duration) = parse_duration(input)?;
139
140    let (input, _) = multispace0(input)?;
141    let (input, _) = char(',')(input)?;
142    let (input, _) = multispace0(input)?;
143
144    // Parse window type
145    let (input, window_type) = parse_window_type(input)?;
146
147    let (input, _) = multispace0(input)?;
148    let (input, _) = char(')')(input)?;
149
150    Ok((
151        input,
152        WindowSpec {
153            duration,
154            window_type,
155        },
156    ))
157}
158
159/// Parse duration: "5 min", "10 seconds", "1 hour", etc.
160///
161/// # Supported units
162/// - ms, milliseconds, millisecond
163/// - sec, second, seconds
164/// - min, minute, minutes
165/// - hour, hours
166pub fn parse_duration(input: &str) -> IResult<&str, Duration> {
167    let (input, value) = digit1(input)?;
168    let (input, _) = multispace1(input)?;
169    let (input, unit) = alpha1(input)?;
170
171    let value: u64 = value.parse().map_err(|_| {
172        nom::Err::Error(nom::error::Error::new(input, nom::error::ErrorKind::Digit))
173    })?;
174
175    let duration = match unit {
176        "ms" | "milliseconds" | "millisecond" => Duration::from_millis(value),
177        "sec" | "second" | "seconds" => Duration::from_secs(value),
178        "min" | "minute" | "minutes" => Duration::from_secs(value * 60),
179        "hour" | "hours" => Duration::from_secs(value * 3600),
180        _ => {
181            return Err(nom::Err::Error(nom::error::Error::new(
182                input,
183                nom::error::ErrorKind::Tag,
184            )))
185        }
186    };
187
188    Ok((input, duration))
189}
190
191/// Parse window type: "sliding" or "tumbling"
192pub fn parse_window_type(input: &str) -> IResult<&str, WindowType> {
193    let (input, type_str) = alpha1(input)?;
194
195    let window_type = match type_str {
196        "sliding" => WindowType::Sliding,
197        "tumbling" => WindowType::Tumbling,
198        _ => {
199            return Err(nom::Err::Error(nom::error::Error::new(
200                input,
201                nom::error::ErrorKind::Tag,
202            )))
203        }
204    };
205
206    Ok((input, window_type))
207}
208
209/// Parse complete stream pattern
210///
211/// # Example
212/// ```text
213/// event: EventType from stream("events") over window(5 min, sliding)
214/// reading: TempReading from stream("sensors")
215/// ```
216pub fn parse_stream_pattern(input: &str) -> IResult<&str, StreamPattern> {
217    // Parse variable binding: event:
218    let (input, var_name) = take_while1(|c: char| c.is_alphanumeric() || c == '_')(input)?;
219    let (input, _) = multispace0(input)?;
220    let (input, _) = char(':')(input)?;
221    let (input, _) = multispace0(input)?;
222
223    // Optional event type (but not "from" keyword)
224    let (input, event_type) = {
225        let checkpoint = input;
226        match take_while1::<_, _, nom::error::Error<&str>>(|c: char| {
227            c.is_alphanumeric() || c == '_'
228        })(input)
229        {
230            Ok((remaining, name)) if name != "from" => (remaining, Some(name)),
231            _ => (checkpoint, None),
232        }
233    };
234
235    let (input, _) = multispace0(input)?;
236
237    // Parse stream source
238    let (input, source) = parse_stream_source(input)?;
239
240    Ok((
241        input,
242        StreamPattern {
243            var_name: var_name.to_string(),
244            event_type: event_type.map(|s| s.to_string()),
245            source,
246        },
247    ))
248}
249
#[cfg(test)]
mod tests {
    // Unit tests for the single-stream parsers: stream sources, durations,
    // window clauses and `var: Type from stream(...)` patterns.
    use super::*;

    #[test]
    fn test_parse_stream_source_basic() {
        let (_, source) = parse_stream_source(r#"from stream("user-events")"#).unwrap();

        assert_eq!(source.stream_name, "user-events");
        assert!(source.window.is_none());
    }

    #[test]
    fn test_parse_stream_source_with_spaces() {
        let padded = r#"  from   stream  (  "sensor-data"  )  "#;
        let (_, source) = parse_stream_source(padded).unwrap();

        assert_eq!(source.stream_name, "sensor-data");
    }

    #[test]
    fn test_parse_duration_seconds() {
        let cases = [
            ("5 seconds", Duration::from_secs(5)),
            ("10 sec", Duration::from_secs(10)),
            ("1 second", Duration::from_secs(1)),
        ];

        for (text, expected) in cases {
            let (_, parsed) =
                parse_duration(text).unwrap_or_else(|_| panic!("Failed to parse: {}", text));
            assert_eq!(parsed, expected);
        }
    }

    #[test]
    fn test_parse_duration_minutes() {
        let cases = [
            ("5 min", Duration::from_secs(300)),
            ("10 minutes", Duration::from_secs(600)),
            ("1 minute", Duration::from_secs(60)),
        ];

        for (text, expected) in cases {
            let (_, parsed) = parse_duration(text).unwrap();
            assert_eq!(parsed, expected);
        }
    }

    #[test]
    fn test_parse_duration_hours() {
        let (_, parsed) = parse_duration("1 hour").unwrap();
        assert_eq!(parsed, Duration::from_secs(3600));
    }

    #[test]
    fn test_parse_duration_milliseconds() {
        let (_, parsed) = parse_duration("500 ms").unwrap();
        assert_eq!(parsed, Duration::from_millis(500));
    }

    #[test]
    fn test_parse_window_type() {
        let cases = [
            ("sliding", WindowType::Sliding),
            ("tumbling", WindowType::Tumbling),
        ];

        for (text, expected) in cases {
            let (_, parsed) = parse_window_type(text).unwrap();
            assert_eq!(parsed, expected);
        }
    }

    #[test]
    fn test_parse_window_spec() {
        let (_, spec) = parse_window_spec("over window(5 min, sliding)").unwrap();

        assert_eq!(spec.duration, Duration::from_secs(300));
        assert_eq!(spec.window_type, WindowType::Sliding);
    }

    #[test]
    fn test_parse_window_spec_tumbling() {
        let (_, spec) = parse_window_spec("over window(1 hour, tumbling)").unwrap();

        assert_eq!(spec.duration, Duration::from_secs(3600));
        assert_eq!(spec.window_type, WindowType::Tumbling);
    }

    #[test]
    fn test_parse_stream_pattern_simple() {
        let (_, pattern) =
            parse_stream_pattern(r#"event: LoginEvent from stream("logins")"#).unwrap();

        assert_eq!(pattern.var_name, "event");
        assert_eq!(pattern.event_type.as_deref(), Some("LoginEvent"));
        assert_eq!(pattern.source.stream_name, "logins");
        assert!(pattern.source.window.is_none());
    }

    #[test]
    fn test_parse_stream_pattern_with_window() {
        let text = r#"reading: TempReading from stream("sensors") over window(10 min, sliding)"#;
        let (_, pattern) = parse_stream_pattern(text).unwrap();

        assert_eq!(pattern.var_name, "reading");
        assert_eq!(pattern.event_type.as_deref(), Some("TempReading"));
        assert_eq!(pattern.source.stream_name, "sensors");

        let window = pattern.source.window.unwrap();
        assert_eq!(window.duration, Duration::from_secs(600));
        assert_eq!(window.window_type, WindowType::Sliding);
    }

    #[test]
    fn test_parse_stream_pattern_no_type() {
        let (_, pattern) = parse_stream_pattern(r#"e: from stream("events")"#).unwrap();

        assert_eq!(pattern.var_name, "e");
        assert_eq!(pattern.event_type, None);
    }

    #[test]
    fn test_invalid_window_type() {
        assert!(parse_window_spec("over window(5 min, invalid)").is_err());
    }

    #[test]
    fn test_invalid_duration_unit() {
        assert!(parse_duration("5 invalid_unit").is_err());
    }
}
421
422/// Parse stream join pattern with && operator
423///
424/// # Example
425/// ```text
426/// click: ClickEvent from stream("clicks") over window(10 min, sliding) &&
427/// purchase: PurchaseEvent from stream("purchases") over window(10 min, sliding)
428/// ```
429pub fn parse_stream_join_pattern(input: &str) -> IResult<&str, StreamJoinPattern> {
430    use nom::bytes::complete::tag;
431
432    // Parse left stream pattern
433    let (input, left) = parse_stream_pattern(input)?;
434
435    // Expect && operator
436    let (input, _) = multispace0(input)?;
437    let (input, _) = tag("&&")(input)?;
438    let (input, _) = multispace0(input)?;
439
440    // Parse right stream pattern
441    let (input, right) = parse_stream_pattern(input)?;
442
443    // Join conditions will be parsed separately from additional && clauses
444    // For now, return empty conditions (to be filled by caller)
445    Ok((
446        input,
447        StreamJoinPattern {
448            left,
449            right,
450            join_conditions: Vec::new(),
451        },
452    ))
453}
454
455/// Parse a join condition like "click.user_id == purchase.user_id"
456///
457/// # Example
458/// ```text
459/// click.user_id == purchase.user_id
460/// purchase.timestamp > click.timestamp
461/// ```
462pub fn parse_join_condition(input: &str) -> IResult<&str, JoinCondition> {
463    use nom::branch::alt;
464    use nom::bytes::complete::tag;
465
466    // Parse left side: variable.field
467    let (input, left_var) = take_while1(|c: char| c.is_alphanumeric() || c == '_')(input)?;
468    let (input, _) = char('.')(input)?;
469    let (input, left_field) = take_while1(|c: char| c.is_alphanumeric() || c == '_')(input)?;
470
471    let (input, _) = multispace0(input)?;
472
473    // Parse operator
474    let (input, op) = alt((
475        tag("=="),
476        tag("!="),
477        tag("<="),
478        tag(">="),
479        tag("<"),
480        tag(">"),
481    ))
482    .parse(input)?;
483
484    let (input, _) = multispace0(input)?;
485
486    // Parse right side: variable.field
487    let (input, right_var) = take_while1(|c: char| c.is_alphanumeric() || c == '_')(input)?;
488    let (input, _) = char('.')(input)?;
489    let (input, right_field) = take_while1(|c: char| c.is_alphanumeric() || c == '_')(input)?;
490
491    // Construct condition based on operator
492    let condition = match op {
493        "==" => JoinCondition::Equality {
494            left_field: format!("{}.{}", left_var, left_field),
495            right_field: format!("{}.{}", right_var, right_field),
496        },
497        ">" => {
498            if left_field.contains("time") || right_field.contains("time") {
499                JoinCondition::Temporal {
500                    operator: TemporalOp::After,
501                    left_field: format!("{}.{}", left_var, left_field),
502                    right_field: format!("{}.{}", right_var, right_field),
503                }
504            } else {
505                JoinCondition::Expression(format!(
506                    "{}.{} > {}.{}",
507                    left_var, left_field, right_var, right_field
508                ))
509            }
510        }
511        "<" => {
512            if left_field.contains("time") || right_field.contains("time") {
513                JoinCondition::Temporal {
514                    operator: TemporalOp::Before,
515                    left_field: format!("{}.{}", left_var, left_field),
516                    right_field: format!("{}.{}", right_var, right_field),
517                }
518            } else {
519                JoinCondition::Expression(format!(
520                    "{}.{} < {}.{}",
521                    left_var, left_field, right_var, right_field
522                ))
523            }
524        }
525        _ => JoinCondition::Expression(format!(
526            "{}.{} {} {}.{}",
527            left_var, left_field, op, right_var, right_field
528        )),
529    };
530
531    Ok((input, condition))
532}
533
#[cfg(test)]
mod join_tests {
    // Tests for join-condition parsing and two-stream join patterns.
    use super::*;

    #[test]
    fn test_parse_join_condition_equality() {
        let (_, condition) = parse_join_condition("click.user_id == purchase.user_id").unwrap();

        let JoinCondition::Equality {
            left_field,
            right_field,
        } = condition
        else {
            panic!("Expected Equality condition");
        };
        assert_eq!(left_field, "click.user_id");
        assert_eq!(right_field, "purchase.user_id");
    }

    #[test]
    fn test_parse_join_condition_temporal() {
        let (_, condition) =
            parse_join_condition("purchase.timestamp > click.timestamp").unwrap();

        let JoinCondition::Temporal {
            operator,
            left_field,
            right_field,
        } = condition
        else {
            panic!("Expected Temporal condition");
        };
        assert_eq!(operator, TemporalOp::After);
        assert_eq!(left_field, "purchase.timestamp");
        assert_eq!(right_field, "click.timestamp");
    }

    #[test]
    fn test_parse_stream_join_pattern() {
        let input = r#"click: ClickEvent from stream("clicks") over window(10 min, sliding) && purchase: PurchaseEvent from stream("purchases") over window(10 min, sliding)"#;
        let (_, StreamJoinPattern { left, right, .. }) =
            parse_stream_join_pattern(input).unwrap();

        assert_eq!(left.var_name, "click");
        assert_eq!(left.event_type.as_deref(), Some("ClickEvent"));
        assert_eq!(left.source.stream_name, "clicks");

        assert_eq!(right.var_name, "purchase");
        assert_eq!(right.event_type.as_deref(), Some("PurchaseEvent"));
        assert_eq!(right.source.stream_name, "purchases");
    }
}