// rust_rule_engine/parser/grl/stream_syntax.rs
//
// Stream syntax parser for GRL.
//
// This module implements parsing for stream-related GRL syntax:
// - `from stream("name")`
// - `over window(duration, type)`
// - stream patterns with variable bindings
// - stream joins (`left && right`) and join conditions (`a.x == b.y`)

use std::time::Duration;

use nom::{
    bytes::complete::{tag, take_while1},
    character::complete::{alpha1, char, digit1, multispace0, multispace1},
    combinator::opt,
    sequence::{delimited, tuple},
    IResult,
};

// `WindowType` comes from one of two places depending on enabled features.
// `parse_window_type` constructs `Sliding` / `Tumbling` under either
// configuration, so both definitions must provide those variants.

// With the `streaming` feature, re-export the type from `crate::streaming::window`.
#[cfg(feature = "streaming")]
pub use crate::streaming::window::WindowType;

/// Kind of window attached to a stream.
///
/// This is the fallback definition used when the `streaming` feature is disabled.
// NOTE(review): presumably meant to mirror `crate::streaming::window::WindowType`
// — confirm the two stay in sync.
#[cfg(not(feature = "streaming"))]
#[derive(Debug, Clone, PartialEq, Copy)]
pub enum WindowType {
    /// Produced by `parse_window_type` for the keyword `sliding`.
    Sliding,
    /// Produced by `parse_window_type` for the keyword `tumbling`.
    Tumbling,
    /// Session window. No parser in this file produces it. `timeout` is
    /// presumably the inactivity gap that ends a session — confirm.
    Session { timeout: Duration },
}

/// Stream source parsed from `from stream("name")`, optionally followed by an
/// `over window(...)` clause.
#[derive(Debug, Clone, PartialEq)]
pub struct StreamSource {
    /// Text between the quotes in `stream("...")`, taken verbatim.
    ///
    /// No escape sequences are processed.
    pub stream_name: String,
    /// Window clause, if one immediately follows the stream reference.
    pub window: Option<WindowSpec>,
}

/// Window clause parsed from `over window(<duration>, <type>)`.
#[derive(Debug, Clone, PartialEq)]
pub struct WindowSpec {
    /// Window length, e.g. `5 min` parses to 300 seconds.
    pub duration: Duration,
    /// Window kind (`sliding` or `tumbling` in the GRL syntax).
    pub window_type: WindowType,
}

/// Stream pattern with a variable binding, e.g.
/// `event: EventType from stream("events") over window(5 min, sliding)`.
#[derive(Debug, Clone, PartialEq)]
pub struct StreamPattern {
    /// Variable name before the `:` (e.g. `event`).
    pub var_name: String,
    /// Event type named between `:` and `from`.
    ///
    /// `None` when the type is omitted (`e: from stream(...)`).
    pub event_type: Option<String>,
    /// The `from stream(...)` source, including any window clause.
    pub source: StreamSource,
}

/// Two stream patterns joined with `&&`.
#[derive(Debug, Clone, PartialEq)]
pub struct StreamJoinPattern {
    /// Pattern to the left of `&&`.
    pub left: StreamPattern,
    /// Pattern to the right of `&&`.
    pub right: StreamPattern,
    /// Conditions relating the two streams.
    ///
    /// `parse_stream_join_pattern` always leaves this empty; the caller is
    /// expected to fill it (e.g. with results of `parse_join_condition`).
    pub join_conditions: Vec<JoinCondition>,
}

/// Join condition between two streams, as produced by `parse_join_condition`.
///
/// Field references are stored qualified with their variable, e.g. `"click.user_id"`.
#[derive(Debug, Clone, PartialEq)]
pub enum JoinCondition {
    /// Equality condition: `left.field == right.field`.
    Equality {
        left_field: String,
        right_field: String,
    },
    /// Any other comparison, kept as normalised source text with single spaces
    /// around the operator (e.g. `"a.x != b.y"`).
    Expression(String),
    /// Ordering constraint between two time-like fields.
    ///
    /// The direction comes from `operator`: `After` means `left > right`,
    /// and `Before` means `left < right`.
    Temporal {
        operator: TemporalOp,
        left_field: String,
        right_field: String,
    },
}

/// Temporal ordering operators used by [`JoinCondition::Temporal`].
///
/// This is a plain fieldless enum, so it is `Copy` and can be compared and
/// hashed freely (e.g. used as a map key).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum TemporalOp {
    /// `left.time < right.time`.
    ///
    /// Produced by `parse_join_condition` for `<` between time-like fields.
    Before,
    /// `left.time > right.time`.
    ///
    /// Produced by `parse_join_condition` for `>` between time-like fields.
    After,
    /// `abs(left.time - right.time) < duration`.
    ///
    /// Not currently produced by `parse_join_condition`.
    Within,
}

86/// Parse: from stream("stream-name")
87///
88/// # Example
89/// ```text
90/// from stream("user-events")
91/// from stream("sensor-readings")
92/// ```
93pub fn parse_stream_source(input: &str) -> IResult<&str, StreamSource> {
94    let (input, _) = multispace0(input)?;
95    let (input, _) = tag("from")(input)?;
96    let (input, _) = multispace1(input)?;
97    let (input, _) = tag("stream")(input)?;
98    let (input, _) = multispace0(input)?;
99
100    // Parse stream name in parentheses: stream("name")
101    let (input, stream_name) = delimited(
102        tuple((char('('), multispace0, char('"'))),
103        take_while1(|c: char| c != '"'),
104        tuple((char('"'), multispace0, char(')'))),
105    )(input)?;
106
107    // Optional window specification
108    let (input, window) = opt(parse_window_spec)(input)?;
109
110    Ok((
111        input,
112        StreamSource {
113            stream_name: stream_name.to_string(),
114            window,
115        },
116    ))
117}
118
119/// Parse: over window(5 min, sliding)
120///
121/// # Example
122/// ```text
123/// over window(5 min, sliding)
124/// over window(1 hour, tumbling)
125/// over window(30 seconds, sliding)
126/// ```
127pub fn parse_window_spec(input: &str) -> IResult<&str, WindowSpec> {
128    let (input, _) = multispace0(input)?;
129    let (input, _) = tag("over")(input)?;
130    let (input, _) = multispace1(input)?;
131    let (input, _) = tag("window")(input)?;
132    let (input, _) = multispace0(input)?;
133    let (input, _) = char('(')(input)?;
134    let (input, _) = multispace0(input)?;
135
136    // Parse duration
137    let (input, duration) = parse_duration(input)?;
138
139    let (input, _) = multispace0(input)?;
140    let (input, _) = char(',')(input)?;
141    let (input, _) = multispace0(input)?;
142
143    // Parse window type
144    let (input, window_type) = parse_window_type(input)?;
145
146    let (input, _) = multispace0(input)?;
147    let (input, _) = char(')')(input)?;
148
149    Ok((
150        input,
151        WindowSpec {
152            duration,
153            window_type,
154        },
155    ))
156}
157
158/// Parse duration: "5 min", "10 seconds", "1 hour", etc.
159///
160/// # Supported units
161/// - ms, milliseconds, millisecond
162/// - sec, second, seconds
163/// - min, minute, minutes
164/// - hour, hours
165pub fn parse_duration(input: &str) -> IResult<&str, Duration> {
166    let (input, value) = digit1(input)?;
167    let (input, _) = multispace1(input)?;
168    let (input, unit) = alpha1(input)?;
169
170    let value: u64 = value.parse().map_err(|_| {
171        nom::Err::Error(nom::error::Error::new(input, nom::error::ErrorKind::Digit))
172    })?;
173
174    let duration = match unit {
175        "ms" | "milliseconds" | "millisecond" => Duration::from_millis(value),
176        "sec" | "second" | "seconds" => Duration::from_secs(value),
177        "min" | "minute" | "minutes" => Duration::from_secs(value * 60),
178        "hour" | "hours" => Duration::from_secs(value * 3600),
179        _ => {
180            return Err(nom::Err::Error(nom::error::Error::new(
181                input,
182                nom::error::ErrorKind::Tag,
183            )))
184        }
185    };
186
187    Ok((input, duration))
188}
189
190/// Parse window type: "sliding" or "tumbling"
191pub fn parse_window_type(input: &str) -> IResult<&str, WindowType> {
192    let (input, type_str) = alpha1(input)?;
193
194    let window_type = match type_str {
195        "sliding" => WindowType::Sliding,
196        "tumbling" => WindowType::Tumbling,
197        _ => {
198            return Err(nom::Err::Error(nom::error::Error::new(
199                input,
200                nom::error::ErrorKind::Tag,
201            )))
202        }
203    };
204
205    Ok((input, window_type))
206}
207
208/// Parse complete stream pattern
209///
210/// # Example
211/// ```text
212/// event: EventType from stream("events") over window(5 min, sliding)
213/// reading: TempReading from stream("sensors")
214/// ```
215pub fn parse_stream_pattern(input: &str) -> IResult<&str, StreamPattern> {
216    // Parse variable binding: event:
217    let (input, var_name) = take_while1(|c: char| c.is_alphanumeric() || c == '_')(input)?;
218    let (input, _) = multispace0(input)?;
219    let (input, _) = char(':')(input)?;
220    let (input, _) = multispace0(input)?;
221
222    // Optional event type (but not "from" keyword)
223    let (input, event_type) = {
224        let checkpoint = input;
225        match take_while1::<_, _, nom::error::Error<&str>>(|c: char| {
226            c.is_alphanumeric() || c == '_'
227        })(input)
228        {
229            Ok((remaining, name)) if name != "from" => (remaining, Some(name)),
230            _ => (checkpoint, None),
231        }
232    };
233
234    let (input, _) = multispace0(input)?;
235
236    // Parse stream source
237    let (input, source) = parse_stream_source(input)?;
238
239    Ok((
240        input,
241        StreamPattern {
242            var_name: var_name.to_string(),
243            event_type: event_type.map(|s| s.to_string()),
244            source,
245        },
246    ))
247}
248
#[cfg(test)]
mod tests {
    use super::*;

    // Parser failures are surfaced through `expect` / `unwrap_or_else` so that a
    // failing test prints the nom error instead of a bare `is_ok()` assertion.

    #[test]
    fn test_parse_stream_source_basic() {
        let (_, source) = parse_stream_source(r#"from stream("user-events")"#)
            .expect("basic stream source should parse");

        assert_eq!(source.stream_name, "user-events");
        assert!(source.window.is_none());
    }

    #[test]
    fn test_parse_stream_source_with_spaces() {
        let (_, source) = parse_stream_source(r#"  from   stream  (  "sensor-data"  )  "#)
            .expect("whitespace around keywords and parentheses should be accepted");

        assert_eq!(source.stream_name, "sensor-data");
    }

    #[test]
    fn test_parse_duration_seconds() {
        let cases = [
            ("5 seconds", Duration::from_secs(5)),
            ("10 sec", Duration::from_secs(10)),
            ("1 second", Duration::from_secs(1)),
        ];

        for (input, expected) in cases.iter() {
            let (_, duration) = parse_duration(input)
                .unwrap_or_else(|err| panic!("Failed to parse: {} ({:?})", input, err));
            assert_eq!(duration, *expected);
        }
    }

    #[test]
    fn test_parse_duration_minutes() {
        let cases = [
            ("5 min", Duration::from_secs(300)),
            ("10 minutes", Duration::from_secs(600)),
            ("1 minute", Duration::from_secs(60)),
        ];

        for (input, expected) in cases.iter() {
            let (_, duration) = parse_duration(input)
                .unwrap_or_else(|err| panic!("Failed to parse: {} ({:?})", input, err));
            assert_eq!(duration, *expected);
        }
    }

    #[test]
    fn test_parse_duration_hours() {
        let (_, duration) = parse_duration("1 hour").expect("hours should parse");
        assert_eq!(duration, Duration::from_secs(3600));
    }

    #[test]
    fn test_parse_duration_milliseconds() {
        let (_, duration) = parse_duration("500 ms").expect("milliseconds should parse");
        assert_eq!(duration, Duration::from_millis(500));
    }

    #[test]
    fn test_parse_window_type() {
        let cases = [
            ("sliding", WindowType::Sliding),
            ("tumbling", WindowType::Tumbling),
        ];

        for (input, expected) in cases.iter() {
            let (_, window_type) = parse_window_type(input)
                .unwrap_or_else(|err| panic!("Failed to parse: {} ({:?})", input, err));
            assert_eq!(&window_type, expected);
        }
    }

    #[test]
    fn test_parse_window_spec() {
        let (_, spec) =
            parse_window_spec("over window(5 min, sliding)").expect("window spec should parse");

        assert_eq!(spec.duration, Duration::from_secs(300));
        assert_eq!(spec.window_type, WindowType::Sliding);
    }

    #[test]
    fn test_parse_window_spec_tumbling() {
        let (_, spec) = parse_window_spec("over window(1 hour, tumbling)")
            .expect("tumbling window spec should parse");

        assert_eq!(spec.duration, Duration::from_secs(3600));
        assert_eq!(spec.window_type, WindowType::Tumbling);
    }

    #[test]
    fn test_parse_stream_pattern_simple() {
        let (_, pattern) = parse_stream_pattern(r#"event: LoginEvent from stream("logins")"#)
            .expect("typed stream pattern should parse");

        assert_eq!(pattern.var_name, "event");
        assert_eq!(pattern.event_type.as_deref(), Some("LoginEvent"));
        assert_eq!(pattern.source.stream_name, "logins");
        assert!(pattern.source.window.is_none());
    }

    #[test]
    fn test_parse_stream_pattern_with_window() {
        let input =
            r#"reading: TempReading from stream("sensors") over window(10 min, sliding)"#;
        let (_, pattern) =
            parse_stream_pattern(input).expect("windowed stream pattern should parse");

        assert_eq!(pattern.var_name, "reading");
        assert_eq!(pattern.event_type.as_deref(), Some("TempReading"));
        assert_eq!(pattern.source.stream_name, "sensors");

        let window = pattern
            .source
            .window
            .expect("window clause should be captured");
        assert_eq!(window.duration, Duration::from_secs(600));
        assert_eq!(window.window_type, WindowType::Sliding);
    }

    #[test]
    fn test_parse_stream_pattern_no_type() {
        let (_, pattern) = parse_stream_pattern(r#"e: from stream("events")"#)
            .expect("untyped stream pattern should parse");

        assert_eq!(pattern.var_name, "e");
        assert_eq!(pattern.event_type, None);
    }

    #[test]
    fn test_invalid_window_type() {
        assert!(parse_window_spec("over window(5 min, invalid)").is_err());
    }

    #[test]
    fn test_invalid_duration_unit() {
        assert!(parse_duration("5 invalid_unit").is_err());
    }
}

421/// Parse stream join pattern with && operator
422///
423/// # Example
424/// ```text
425/// click: ClickEvent from stream("clicks") over window(10 min, sliding) &&
426/// purchase: PurchaseEvent from stream("purchases") over window(10 min, sliding)
427/// ```
428pub fn parse_stream_join_pattern(input: &str) -> IResult<&str, StreamJoinPattern> {
429    use nom::bytes::complete::tag;
430
431    // Parse left stream pattern
432    let (input, left) = parse_stream_pattern(input)?;
433
434    // Expect && operator
435    let (input, _) = multispace0(input)?;
436    let (input, _) = tag("&&")(input)?;
437    let (input, _) = multispace0(input)?;
438
439    // Parse right stream pattern
440    let (input, right) = parse_stream_pattern(input)?;
441
442    // Join conditions will be parsed separately from additional && clauses
443    // For now, return empty conditions (to be filled by caller)
444    Ok((
445        input,
446        StreamJoinPattern {
447            left,
448            right,
449            join_conditions: Vec::new(),
450        },
451    ))
452}
453
454/// Parse a join condition like "click.user_id == purchase.user_id"
455///
456/// # Example
457/// ```text
458/// click.user_id == purchase.user_id
459/// purchase.timestamp > click.timestamp
460/// ```
461pub fn parse_join_condition(input: &str) -> IResult<&str, JoinCondition> {
462    use nom::branch::alt;
463    use nom::bytes::complete::tag;
464
465    // Parse left side: variable.field
466    let (input, left_var) = take_while1(|c: char| c.is_alphanumeric() || c == '_')(input)?;
467    let (input, _) = char('.')(input)?;
468    let (input, left_field) = take_while1(|c: char| c.is_alphanumeric() || c == '_')(input)?;
469
470    let (input, _) = multispace0(input)?;
471
472    // Parse operator
473    let (input, op) = alt((
474        tag("=="),
475        tag("!="),
476        tag("<="),
477        tag(">="),
478        tag("<"),
479        tag(">"),
480    ))(input)?;
481
482    let (input, _) = multispace0(input)?;
483
484    // Parse right side: variable.field
485    let (input, right_var) = take_while1(|c: char| c.is_alphanumeric() || c == '_')(input)?;
486    let (input, _) = char('.')(input)?;
487    let (input, right_field) = take_while1(|c: char| c.is_alphanumeric() || c == '_')(input)?;
488
489    // Construct condition based on operator
490    let condition = match op {
491        "==" => JoinCondition::Equality {
492            left_field: format!("{}.{}", left_var, left_field),
493            right_field: format!("{}.{}", right_var, right_field),
494        },
495        ">" => {
496            if left_field.contains("time") || right_field.contains("time") {
497                JoinCondition::Temporal {
498                    operator: TemporalOp::After,
499                    left_field: format!("{}.{}", left_var, left_field),
500                    right_field: format!("{}.{}", right_var, right_field),
501                }
502            } else {
503                JoinCondition::Expression(format!(
504                    "{}.{} > {}.{}",
505                    left_var, left_field, right_var, right_field
506                ))
507            }
508        }
509        "<" => {
510            if left_field.contains("time") || right_field.contains("time") {
511                JoinCondition::Temporal {
512                    operator: TemporalOp::Before,
513                    left_field: format!("{}.{}", left_var, left_field),
514                    right_field: format!("{}.{}", right_var, right_field),
515                }
516            } else {
517                JoinCondition::Expression(format!(
518                    "{}.{} < {}.{}",
519                    left_var, left_field, right_var, right_field
520                ))
521            }
522        }
523        _ => JoinCondition::Expression(format!(
524            "{}.{} {} {}.{}",
525            left_var, left_field, op, right_var, right_field
526        )),
527    };
528
529    Ok((input, condition))
530}
531
#[cfg(test)]
mod join_tests {
    use super::*;

    // Whole values are compared with `assert_eq!`, so a mismatch prints both the
    // expected and the actual `JoinCondition` rather than a generic panic message.

    #[test]
    fn test_parse_join_condition_equality() {
        let (_, condition) = parse_join_condition("click.user_id == purchase.user_id")
            .expect("equality join condition should parse");

        assert_eq!(
            condition,
            JoinCondition::Equality {
                left_field: "click.user_id".to_string(),
                right_field: "purchase.user_id".to_string(),
            }
        );
    }

    #[test]
    fn test_parse_join_condition_temporal() {
        let (_, condition) = parse_join_condition("purchase.timestamp > click.timestamp")
            .expect("temporal join condition should parse");

        assert_eq!(
            condition,
            JoinCondition::Temporal {
                operator: TemporalOp::After,
                left_field: "purchase.timestamp".to_string(),
                right_field: "click.timestamp".to_string(),
            }
        );
    }

    #[test]
    fn test_parse_stream_join_pattern() {
        let input = r#"click: ClickEvent from stream("clicks") over window(10 min, sliding) && purchase: PurchaseEvent from stream("purchases") over window(10 min, sliding)"#;
        let (_, join_pattern) =
            parse_stream_join_pattern(input).expect("stream join pattern should parse");

        assert_eq!(join_pattern.left.var_name, "click");
        assert_eq!(join_pattern.left.event_type.as_deref(), Some("ClickEvent"));
        assert_eq!(join_pattern.left.source.stream_name, "clicks");
        assert!(join_pattern.left.source.window.is_some());

        assert_eq!(join_pattern.right.var_name, "purchase");
        assert_eq!(
            join_pattern.right.event_type.as_deref(),
            Some("PurchaseEvent")
        );
        assert_eq!(join_pattern.right.source.stream_name, "purchases");
        assert!(join_pattern.right.source.window.is_some());

        assert!(join_pattern.join_conditions.is_empty());
    }
}