rust_rule_engine/parser/grl/
stream_syntax.rs

1// Stream syntax parser for GRL
2//
3// This module implements parsing for stream-related GRL syntax:
4// - from stream("name")
5// - over window(duration, type)
6// - Stream patterns with variable bindings
7
8use nom::{
9    bytes::complete::{tag, take_while1},
10    character::complete::{alpha1, char, digit1, multispace0, multispace1},
11    combinator::opt,
12    sequence::{delimited, tuple},
13    IResult,
14};
15use std::time::Duration;
16
17// Re-export WindowType from streaming module when available
18#[cfg(feature = "streaming")]
19pub use crate::streaming::window::WindowType;
20
/// Stand-in window kind, compiled only when the `streaming` feature is disabled.
///
/// With the feature enabled, the `WindowType` re-exported from
/// `crate::streaming::window` is used instead of this definition.
#[cfg(not(feature = "streaming"))]
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum WindowType {
    /// Produced by the `sliding` keyword inside `over window(...)`.
    Sliding,
    /// Produced by the `tumbling` keyword inside `over window(...)`.
    Tumbling,
    /// Session window carrying its timeout; no keyword handled by
    /// `parse_window_type` yields this variant.
    Session { timeout: Duration },
}
29
30/// Stream source specification
31#[derive(Debug, Clone, PartialEq)]
32pub struct StreamSource {
33    pub stream_name: String,
34    pub window: Option<WindowSpec>,
35}
36
37/// Window specification
38#[derive(Debug, Clone, PartialEq)]
39pub struct WindowSpec {
40    pub duration: Duration,
41    pub window_type: WindowType,
42}
43
44/// Stream pattern with variable binding
45#[derive(Debug, Clone, PartialEq)]
46pub struct StreamPattern {
47    pub var_name: String,
48    pub event_type: Option<String>,
49    pub source: StreamSource,
50}
51
52/// Parse: from stream("stream-name")
53///
54/// # Example
55/// ```text
56/// from stream("user-events")
57/// from stream("sensor-readings")
58/// ```
59pub fn parse_stream_source(input: &str) -> IResult<&str, StreamSource> {
60    let (input, _) = multispace0(input)?;
61    let (input, _) = tag("from")(input)?;
62    let (input, _) = multispace1(input)?;
63    let (input, _) = tag("stream")(input)?;
64    let (input, _) = multispace0(input)?;
65
66    // Parse stream name in parentheses: stream("name")
67    let (input, stream_name) = delimited(
68        tuple((char('('), multispace0, char('"'))),
69        take_while1(|c: char| c != '"'),
70        tuple((char('"'), multispace0, char(')'))),
71    )(input)?;
72
73    // Optional window specification
74    let (input, window) = opt(parse_window_spec)(input)?;
75
76    Ok((
77        input,
78        StreamSource {
79            stream_name: stream_name.to_string(),
80            window,
81        },
82    ))
83}
84
85/// Parse: over window(5 min, sliding)
86///
87/// # Example
88/// ```text
89/// over window(5 min, sliding)
90/// over window(1 hour, tumbling)
91/// over window(30 seconds, sliding)
92/// ```
93pub fn parse_window_spec(input: &str) -> IResult<&str, WindowSpec> {
94    let (input, _) = multispace0(input)?;
95    let (input, _) = tag("over")(input)?;
96    let (input, _) = multispace1(input)?;
97    let (input, _) = tag("window")(input)?;
98    let (input, _) = multispace0(input)?;
99    let (input, _) = char('(')(input)?;
100    let (input, _) = multispace0(input)?;
101
102    // Parse duration
103    let (input, duration) = parse_duration(input)?;
104
105    let (input, _) = multispace0(input)?;
106    let (input, _) = char(',')(input)?;
107    let (input, _) = multispace0(input)?;
108
109    // Parse window type
110    let (input, window_type) = parse_window_type(input)?;
111
112    let (input, _) = multispace0(input)?;
113    let (input, _) = char(')')(input)?;
114
115    Ok((
116        input,
117        WindowSpec {
118            duration,
119            window_type,
120        },
121    ))
122}
123
124/// Parse duration: "5 min", "10 seconds", "1 hour", etc.
125///
126/// # Supported units
127/// - ms, milliseconds, millisecond
128/// - sec, second, seconds
129/// - min, minute, minutes
130/// - hour, hours
131pub fn parse_duration(input: &str) -> IResult<&str, Duration> {
132    let (input, value) = digit1(input)?;
133    let (input, _) = multispace1(input)?;
134    let (input, unit) = alpha1(input)?;
135
136    let value: u64 = value.parse().map_err(|_| {
137        nom::Err::Error(nom::error::Error::new(input, nom::error::ErrorKind::Digit))
138    })?;
139
140    let duration = match unit {
141        "ms" | "milliseconds" | "millisecond" => Duration::from_millis(value),
142        "sec" | "second" | "seconds" => Duration::from_secs(value),
143        "min" | "minute" | "minutes" => Duration::from_secs(value * 60),
144        "hour" | "hours" => Duration::from_secs(value * 3600),
145        _ => {
146            return Err(nom::Err::Error(nom::error::Error::new(
147                input,
148                nom::error::ErrorKind::Tag,
149            )))
150        }
151    };
152
153    Ok((input, duration))
154}
155
156/// Parse window type: "sliding" or "tumbling"
157pub fn parse_window_type(input: &str) -> IResult<&str, WindowType> {
158    let (input, type_str) = alpha1(input)?;
159
160    let window_type = match type_str {
161        "sliding" => WindowType::Sliding,
162        "tumbling" => WindowType::Tumbling,
163        _ => {
164            return Err(nom::Err::Error(nom::error::Error::new(
165                input,
166                nom::error::ErrorKind::Tag,
167            )))
168        }
169    };
170
171    Ok((input, window_type))
172}
173
174/// Parse complete stream pattern
175///
176/// # Example
177/// ```text
178/// event: EventType from stream("events") over window(5 min, sliding)
179/// reading: TempReading from stream("sensors")
180/// ```
181pub fn parse_stream_pattern(input: &str) -> IResult<&str, StreamPattern> {
182    // Parse variable binding: event:
183    let (input, var_name) = take_while1(|c: char| c.is_alphanumeric() || c == '_')(input)?;
184    let (input, _) = multispace0(input)?;
185    let (input, _) = char(':')(input)?;
186    let (input, _) = multispace0(input)?;
187
188    // Optional event type (but not "from" keyword)
189    let (input, event_type) = {
190        let checkpoint = input;
191        match take_while1::<_, _, nom::error::Error<&str>>(|c: char| {
192            c.is_alphanumeric() || c == '_'
193        })(input)
194        {
195            Ok((remaining, name)) if name != "from" => (remaining, Some(name)),
196            _ => (checkpoint, None),
197        }
198    };
199
200    let (input, _) = multispace0(input)?;
201
202    // Parse stream source
203    let (input, source) = parse_stream_source(input)?;
204
205    Ok((
206        input,
207        StreamPattern {
208            var_name: var_name.to_string(),
209            event_type: event_type.map(|s| s.to_string()),
210            source,
211        },
212    ))
213}
214
#[cfg(test)]
mod tests {
    use super::*;

    /// Checks that every `(input, expected)` pair parses to the expected duration.
    fn check_durations(cases: &[(&str, Duration)]) {
        for &(text, expected) in cases {
            match parse_duration(text) {
                Ok((_, parsed)) => assert_eq!(parsed, expected),
                Err(_) => panic!("Failed to parse: {}", text),
            }
        }
    }

    #[test]
    fn test_parse_stream_source_basic() {
        let (_, source) = parse_stream_source(r#"from stream("user-events")"#)
            .expect("basic stream source should parse");

        assert_eq!(source.stream_name, "user-events");
        assert!(source.window.is_none());
    }

    #[test]
    fn test_parse_stream_source_with_spaces() {
        let (_, source) = parse_stream_source(r#"  from   stream  (  "sensor-data"  )  "#)
            .expect("stream source with extra whitespace should parse");

        assert_eq!(source.stream_name, "sensor-data");
    }

    #[test]
    fn test_parse_duration_seconds() {
        check_durations(&[
            ("5 seconds", Duration::from_secs(5)),
            ("10 sec", Duration::from_secs(10)),
            ("1 second", Duration::from_secs(1)),
        ]);
    }

    #[test]
    fn test_parse_duration_minutes() {
        check_durations(&[
            ("5 min", Duration::from_secs(300)),
            ("10 minutes", Duration::from_secs(600)),
            ("1 minute", Duration::from_secs(60)),
        ]);
    }

    #[test]
    fn test_parse_duration_hours() {
        check_durations(&[("1 hour", Duration::from_secs(3600))]);
    }

    #[test]
    fn test_parse_duration_milliseconds() {
        check_durations(&[("500 ms", Duration::from_millis(500))]);
    }

    #[test]
    fn test_parse_window_type() {
        let cases = [
            ("sliding", WindowType::Sliding),
            ("tumbling", WindowType::Tumbling),
        ];

        for (text, expected) in cases {
            let (_, parsed) = parse_window_type(text).expect("window type should parse");
            assert_eq!(parsed, expected);
        }
    }

    #[test]
    fn test_parse_window_spec() {
        let (_, spec) = parse_window_spec("over window(5 min, sliding)")
            .expect("sliding window spec should parse");

        assert_eq!(spec.duration, Duration::from_secs(300));
        assert_eq!(spec.window_type, WindowType::Sliding);
    }

    #[test]
    fn test_parse_window_spec_tumbling() {
        let (_, spec) = parse_window_spec("over window(1 hour, tumbling)")
            .expect("tumbling window spec should parse");

        assert_eq!(spec.duration, Duration::from_secs(3600));
        assert_eq!(spec.window_type, WindowType::Tumbling);
    }

    #[test]
    fn test_parse_stream_pattern_simple() {
        let (_, pattern) = parse_stream_pattern(r#"event: LoginEvent from stream("logins")"#)
            .expect("simple stream pattern should parse");

        assert_eq!(pattern.var_name, "event");
        assert_eq!(pattern.event_type, Some("LoginEvent".to_string()));
        assert_eq!(pattern.source.stream_name, "logins");
        assert!(pattern.source.window.is_none());
    }

    #[test]
    fn test_parse_stream_pattern_with_window() {
        let input = r#"reading: TempReading from stream("sensors") over window(10 min, sliding)"#;
        let (_, pattern) =
            parse_stream_pattern(input).expect("stream pattern with window should parse");

        assert_eq!(pattern.var_name, "reading");
        assert_eq!(pattern.event_type, Some("TempReading".to_string()));
        assert_eq!(pattern.source.stream_name, "sensors");

        let window = pattern
            .source
            .window
            .expect("window clause should be present");
        assert_eq!(window.duration, Duration::from_secs(600));
        assert_eq!(window.window_type, WindowType::Sliding);
    }

    #[test]
    fn test_parse_stream_pattern_no_type() {
        let (_, pattern) = parse_stream_pattern(r#"e: from stream("events")"#)
            .expect("stream pattern without event type should parse");

        assert_eq!(pattern.var_name, "e");
        assert_eq!(pattern.event_type, None);
    }

    #[test]
    fn test_invalid_window_type() {
        assert!(parse_window_spec("over window(5 min, invalid)").is_err());
    }

    #[test]
    fn test_invalid_duration_unit() {
        assert!(parse_duration("5 invalid_unit").is_err());
    }
}