1use nom::{
9 bytes::complete::{tag, take_while1},
10 character::complete::{alpha1, char, digit1, multispace0, multispace1},
11 combinator::opt,
12 sequence::{delimited, tuple},
13 IResult,
14};
15use std::time::Duration;
16
17#[cfg(feature = "streaming")]
19pub use crate::streaming::window::WindowType;
20
21#[cfg(not(feature = "streaming"))]
23#[derive(Debug, Clone, PartialEq, Copy)]
24pub enum WindowType {
25 Sliding,
26 Tumbling,
27 Session { timeout: Duration },
28}
29
30#[derive(Debug, Clone, PartialEq)]
32pub struct StreamSource {
33 pub stream_name: String,
34 pub window: Option<WindowSpec>,
35}
36
37#[derive(Debug, Clone, PartialEq)]
39pub struct WindowSpec {
40 pub duration: Duration,
41 pub window_type: WindowType,
42}
43
44#[derive(Debug, Clone, PartialEq)]
46pub struct StreamPattern {
47 pub var_name: String,
48 pub event_type: Option<String>,
49 pub source: StreamSource,
50}
51
52pub fn parse_stream_source(input: &str) -> IResult<&str, StreamSource> {
60 let (input, _) = multispace0(input)?;
61 let (input, _) = tag("from")(input)?;
62 let (input, _) = multispace1(input)?;
63 let (input, _) = tag("stream")(input)?;
64 let (input, _) = multispace0(input)?;
65
66 let (input, stream_name) = delimited(
68 tuple((char('('), multispace0, char('"'))),
69 take_while1(|c: char| c != '"'),
70 tuple((char('"'), multispace0, char(')'))),
71 )(input)?;
72
73 let (input, window) = opt(parse_window_spec)(input)?;
75
76 Ok((
77 input,
78 StreamSource {
79 stream_name: stream_name.to_string(),
80 window,
81 },
82 ))
83}
84
85pub fn parse_window_spec(input: &str) -> IResult<&str, WindowSpec> {
94 let (input, _) = multispace0(input)?;
95 let (input, _) = tag("over")(input)?;
96 let (input, _) = multispace1(input)?;
97 let (input, _) = tag("window")(input)?;
98 let (input, _) = multispace0(input)?;
99 let (input, _) = char('(')(input)?;
100 let (input, _) = multispace0(input)?;
101
102 let (input, duration) = parse_duration(input)?;
104
105 let (input, _) = multispace0(input)?;
106 let (input, _) = char(',')(input)?;
107 let (input, _) = multispace0(input)?;
108
109 let (input, window_type) = parse_window_type(input)?;
111
112 let (input, _) = multispace0(input)?;
113 let (input, _) = char(')')(input)?;
114
115 Ok((
116 input,
117 WindowSpec {
118 duration,
119 window_type,
120 },
121 ))
122}
123
124pub fn parse_duration(input: &str) -> IResult<&str, Duration> {
132 let (input, value) = digit1(input)?;
133 let (input, _) = multispace1(input)?;
134 let (input, unit) = alpha1(input)?;
135
136 let value: u64 = value.parse().map_err(|_| {
137 nom::Err::Error(nom::error::Error::new(input, nom::error::ErrorKind::Digit))
138 })?;
139
140 let duration = match unit {
141 "ms" | "milliseconds" | "millisecond" => Duration::from_millis(value),
142 "sec" | "second" | "seconds" => Duration::from_secs(value),
143 "min" | "minute" | "minutes" => Duration::from_secs(value * 60),
144 "hour" | "hours" => Duration::from_secs(value * 3600),
145 _ => {
146 return Err(nom::Err::Error(nom::error::Error::new(
147 input,
148 nom::error::ErrorKind::Tag,
149 )))
150 }
151 };
152
153 Ok((input, duration))
154}
155
156pub fn parse_window_type(input: &str) -> IResult<&str, WindowType> {
158 let (input, type_str) = alpha1(input)?;
159
160 let window_type = match type_str {
161 "sliding" => WindowType::Sliding,
162 "tumbling" => WindowType::Tumbling,
163 _ => {
164 return Err(nom::Err::Error(nom::error::Error::new(
165 input,
166 nom::error::ErrorKind::Tag,
167 )))
168 }
169 };
170
171 Ok((input, window_type))
172}
173
174pub fn parse_stream_pattern(input: &str) -> IResult<&str, StreamPattern> {
182 let (input, var_name) = take_while1(|c: char| c.is_alphanumeric() || c == '_')(input)?;
184 let (input, _) = multispace0(input)?;
185 let (input, _) = char(':')(input)?;
186 let (input, _) = multispace0(input)?;
187
188 let (input, event_type) = {
190 let checkpoint = input;
191 match take_while1::<_, _, nom::error::Error<&str>>(|c: char| {
192 c.is_alphanumeric() || c == '_'
193 })(input)
194 {
195 Ok((remaining, name)) if name != "from" => (remaining, Some(name)),
196 _ => (checkpoint, None),
197 }
198 };
199
200 let (input, _) = multispace0(input)?;
201
202 let (input, source) = parse_stream_source(input)?;
204
205 Ok((
206 input,
207 StreamPattern {
208 var_name: var_name.to_string(),
209 event_type: event_type.map(|s| s.to_string()),
210 source,
211 },
212 ))
213}
214
215#[cfg(test)]
216mod tests {
217 use super::*;
218
219 #[test]
220 fn test_parse_stream_source_basic() {
221 let input = r#"from stream("user-events")"#;
222 let result = parse_stream_source(input);
223
224 assert!(result.is_ok());
225 let (_, source) = result.unwrap();
226 assert_eq!(source.stream_name, "user-events");
227 assert!(source.window.is_none());
228 }
229
230 #[test]
231 fn test_parse_stream_source_with_spaces() {
232 let input = r#" from stream ( "sensor-data" ) "#;
233 let result = parse_stream_source(input);
234
235 assert!(result.is_ok());
236 let (_, source) = result.unwrap();
237 assert_eq!(source.stream_name, "sensor-data");
238 }
239
240 #[test]
241 fn test_parse_duration_seconds() {
242 let tests = vec![
243 ("5 seconds", Duration::from_secs(5)),
244 ("10 sec", Duration::from_secs(10)),
245 ("1 second", Duration::from_secs(1)),
246 ];
247
248 for (input, expected) in tests {
249 let result = parse_duration(input);
250 assert!(result.is_ok(), "Failed to parse: {}", input);
251 let (_, duration) = result.unwrap();
252 assert_eq!(duration, expected);
253 }
254 }
255
256 #[test]
257 fn test_parse_duration_minutes() {
258 let tests = vec![
259 ("5 min", Duration::from_secs(300)),
260 ("10 minutes", Duration::from_secs(600)),
261 ("1 minute", Duration::from_secs(60)),
262 ];
263
264 for (input, expected) in tests {
265 let result = parse_duration(input);
266 assert!(result.is_ok());
267 let (_, duration) = result.unwrap();
268 assert_eq!(duration, expected);
269 }
270 }
271
272 #[test]
273 fn test_parse_duration_hours() {
274 let input = "1 hour";
275 let result = parse_duration(input);
276
277 assert!(result.is_ok());
278 let (_, duration) = result.unwrap();
279 assert_eq!(duration, Duration::from_secs(3600));
280 }
281
282 #[test]
283 fn test_parse_duration_milliseconds() {
284 let input = "500 ms";
285 let result = parse_duration(input);
286
287 assert!(result.is_ok());
288 let (_, duration) = result.unwrap();
289 assert_eq!(duration, Duration::from_millis(500));
290 }
291
292 #[test]
293 fn test_parse_window_type() {
294 let tests = vec![
295 ("sliding", WindowType::Sliding),
296 ("tumbling", WindowType::Tumbling),
297 ];
298
299 for (input, expected) in tests {
300 let result = parse_window_type(input);
301 assert!(result.is_ok());
302 let (_, window_type) = result.unwrap();
303 assert_eq!(window_type, expected);
304 }
305 }
306
307 #[test]
308 fn test_parse_window_spec() {
309 let input = "over window(5 min, sliding)";
310 let result = parse_window_spec(input);
311
312 assert!(result.is_ok());
313 let (_, spec) = result.unwrap();
314 assert_eq!(spec.duration, Duration::from_secs(300));
315 assert_eq!(spec.window_type, WindowType::Sliding);
316 }
317
318 #[test]
319 fn test_parse_window_spec_tumbling() {
320 let input = "over window(1 hour, tumbling)";
321 let result = parse_window_spec(input);
322
323 assert!(result.is_ok());
324 let (_, spec) = result.unwrap();
325 assert_eq!(spec.duration, Duration::from_secs(3600));
326 assert_eq!(spec.window_type, WindowType::Tumbling);
327 }
328
329 #[test]
330 fn test_parse_stream_pattern_simple() {
331 let input = r#"event: LoginEvent from stream("logins")"#;
332 let result = parse_stream_pattern(input);
333
334 assert!(result.is_ok());
335 let (_, pattern) = result.unwrap();
336 assert_eq!(pattern.var_name, "event");
337 assert_eq!(pattern.event_type, Some("LoginEvent".to_string()));
338 assert_eq!(pattern.source.stream_name, "logins");
339 assert!(pattern.source.window.is_none());
340 }
341
342 #[test]
343 fn test_parse_stream_pattern_with_window() {
344 let input = r#"reading: TempReading from stream("sensors") over window(10 min, sliding)"#;
345 let result = parse_stream_pattern(input);
346
347 assert!(result.is_ok());
348 let (_, pattern) = result.unwrap();
349 assert_eq!(pattern.var_name, "reading");
350 assert_eq!(pattern.event_type, Some("TempReading".to_string()));
351 assert_eq!(pattern.source.stream_name, "sensors");
352 assert!(pattern.source.window.is_some());
353
354 let window = pattern.source.window.unwrap();
355 assert_eq!(window.duration, Duration::from_secs(600));
356 assert_eq!(window.window_type, WindowType::Sliding);
357 }
358
359 #[test]
360 fn test_parse_stream_pattern_no_type() {
361 let input = r#"e: from stream("events")"#;
362 let result = parse_stream_pattern(input);
363
364 assert!(result.is_ok());
365 let (_, pattern) = result.unwrap();
366 assert_eq!(pattern.var_name, "e");
367 assert_eq!(pattern.event_type, None);
368 }
369
370 #[test]
371 fn test_invalid_window_type() {
372 let input = "over window(5 min, invalid)";
373 let result = parse_window_spec(input);
374
375 assert!(result.is_err());
376 }
377
378 #[test]
379 fn test_invalid_duration_unit() {
380 let input = "5 invalid_unit";
381 let result = parse_duration(input);
382
383 assert!(result.is_err());
384 }
385}