1use pest::Parser;
9use pest_derive::Parser;
10use varpulis_core::ast::*;
11use varpulis_core::span::{Span, Spanned};
12use varpulis_core::types::Type;
13
14use crate::error::{ParseError, ParseResult};
15use crate::helpers::{parse_duration, parse_timestamp};
16use crate::indent::preprocess_indentation;
17
18trait IteratorExt<'a> {
20 fn expect_next(&mut self, expected: &str) -> ParseResult<pest::iterators::Pair<'a, Rule>>;
22}
23
24impl<'a> IteratorExt<'a> for pest::iterators::Pairs<'a, Rule> {
25 fn expect_next(&mut self, expected: &str) -> ParseResult<pest::iterators::Pair<'a, Rule>> {
26 self.next().ok_or_else(|| ParseError::Located {
27 line: 0,
28 column: 0,
29 position: 0,
30 message: format!("Expected {expected}"),
31 hint: None,
32 })
33 }
34}
35
36#[derive(Debug, Parser)]
41#[grammar = "varpulis.pest"]
42pub struct VarpulisParser;
43
44const PARSE_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);
52
53#[cfg(not(target_arch = "wasm32"))]
61pub fn parse(source: &str) -> ParseResult<Program> {
62 let source = source.to_string();
63 let handle = std::thread::Builder::new()
64 .stack_size(16 * 1024 * 1024)
65 .spawn(move || parse_inner(&source))
66 .map_err(|e| ParseError::InvalidToken {
67 position: 0,
68 message: format!("Failed to spawn parser thread: {e}"),
69 })?;
70
71 let deadline = std::time::Instant::now() + PARSE_TIMEOUT;
73 loop {
74 if handle.is_finished() {
75 return handle.join().unwrap_or_else(|_| {
76 Err(ParseError::InvalidToken {
77 position: 0,
78 message: "Parser stack overflow on deeply nested input".to_string(),
79 })
80 });
81 }
82 let remaining = deadline.saturating_duration_since(std::time::Instant::now());
83 if remaining.is_zero() {
84 return Err(ParseError::InvalidToken {
85 position: 0,
86 message: format!(
87 "Parser timed out after {}s on pathological input",
88 PARSE_TIMEOUT.as_secs()
89 ),
90 });
91 }
92 std::thread::sleep(std::time::Duration::from_millis(5).min(remaining));
93 }
94}
95
96#[cfg(target_arch = "wasm32")]
98pub fn parse(source: &str) -> ParseResult<Program> {
99 parse_inner(source)
100}
101
102const MAX_NESTING_DEPTH: usize = 10;
111
112const MAX_UNMATCHED_OPEN_BRACKETS: usize = 6;
120
121fn check_nesting_depth(source: &str) -> ParseResult<()> {
126 let mut depth: usize = 0;
127 let mut max_depth: usize = 0;
128 let mut max_depth_pos: usize = 0;
129 let bytes = source.as_bytes();
130 let len = bytes.len();
131 let mut i = 0;
132
133 let mut paren_open: usize = 0;
135 let mut paren_close: usize = 0;
136 let mut square_open: usize = 0;
137 let mut square_close: usize = 0;
138 let mut curly_open: usize = 0;
139 let mut curly_close: usize = 0;
140
141 while i < len {
142 let b = bytes[i];
143
144 if b == b'"' {
146 i += 1;
147 while i < len {
148 if bytes[i] == b'\\' {
149 i += 2; continue;
151 }
152 if bytes[i] == b'"' {
153 i += 1;
154 break;
155 }
156 i += 1;
157 }
158 continue;
159 }
160
161 if b == b'#' {
163 i += 1;
164 while i < len && bytes[i] != b'\n' {
165 i += 1;
166 }
167 continue;
168 }
169
170 if b == b'/' && i + 1 < len && bytes[i + 1] == b'*' {
172 i += 2;
173 while i + 1 < len {
174 if bytes[i] == b'*' && bytes[i + 1] == b'/' {
175 i += 2;
176 break;
177 }
178 i += 1;
179 }
180 continue;
181 }
182
183 match b {
185 b'(' => {
186 depth += 1;
187 paren_open += 1;
188 }
189 b'[' => {
190 depth += 1;
191 square_open += 1;
192 }
193 b'{' => {
194 depth += 1;
195 curly_open += 1;
196 }
197 b')' => {
198 depth = depth.saturating_sub(1);
199 paren_close += 1;
200 }
201 b']' => {
202 depth = depth.saturating_sub(1);
203 square_close += 1;
204 }
205 b'}' => {
206 depth = depth.saturating_sub(1);
207 curly_close += 1;
208 }
209 _ => {}
210 }
211
212 if depth > max_depth {
213 max_depth = depth;
214 max_depth_pos = i;
215 }
216
217 if max_depth > MAX_NESTING_DEPTH {
218 return Err(ParseError::InvalidToken {
219 position: max_depth_pos,
220 message: format!("Nesting depth exceeds maximum of {MAX_NESTING_DEPTH} levels"),
221 });
222 }
223
224 i += 1;
225 }
226
227 let unmatched = paren_open.saturating_sub(paren_close)
231 + square_open.saturating_sub(square_close)
232 + curly_open.saturating_sub(curly_close);
233 if unmatched > MAX_UNMATCHED_OPEN_BRACKETS {
234 return Err(ParseError::InvalidToken {
235 position: max_depth_pos,
236 message: format!(
237 "Too many unmatched open brackets ({unmatched}); maximum is {MAX_UNMATCHED_OPEN_BRACKETS}"
238 ),
239 });
240 }
241
242 Ok(())
243}
244
245fn parse_inner(source: &str) -> ParseResult<Program> {
246 let expanded =
248 crate::expand::expand_declaration_loops(source).map_err(|e| ParseError::InvalidToken {
249 position: 0,
250 message: e,
251 })?;
252 let preprocessed = preprocess_indentation(&expanded);
254
255 check_nesting_depth(&preprocessed)?;
257
258 let pairs = VarpulisParser::parse(Rule::program, &preprocessed).map_err(convert_pest_error)?;
259
260 let mut statements = Vec::new();
261
262 for pair in pairs {
263 if pair.as_rule() == Rule::program {
264 for inner in pair.into_inner() {
265 if inner.as_rule() == Rule::statement {
266 statements.push(parse_statement(inner)?);
267 }
268 }
269 }
270 }
271
272 Ok(crate::optimize::fold_program(Program { statements }))
273}
274
275fn convert_pest_error(e: pest::error::Error<Rule>) -> ParseError {
276 let position = match e.location {
277 pest::error::InputLocation::Pos(p) => p,
278 pest::error::InputLocation::Span((s, _)) => s,
279 };
280
281 let (line, column) = match e.line_col {
283 pest::error::LineColLocation::Pos((l, c)) => (l, c),
284 pest::error::LineColLocation::Span((l, c), _) => (l, c),
285 };
286
287 let (message, hint) = match &e.variant {
289 pest::error::ErrorVariant::ParsingError {
290 positives,
291 negatives: _,
292 } => {
293 if positives.is_empty() {
294 ("Unexpected token".to_string(), None)
295 } else if is_stream_op_error(positives) {
296 (
298 "unknown stream operation".to_string(),
299 Some(
300 "valid operations: .where(), .select(), .emit(), .window(), .aggregate(), \
301 .partition_by(), .within(), .having(), .to(), .context(), .log(), .print(), \
302 .enrich(), .forecast(), .trend_aggregate(), .watermark(), .tap()"
303 .to_string(),
304 ),
305 )
306 } else {
307 let expected: Vec<String> = positives.iter().map(format_rule_name).collect();
308 if expected.len() == 1 {
309 (format!("Expected {}", expected[0]), None)
310 } else {
311 (format!("Expected one of: {}", expected.join(", ")), None)
312 }
313 }
314 }
315 pest::error::ErrorVariant::CustomError { message } => (message.clone(), None),
316 };
317
318 ParseError::Located {
319 line,
320 column,
321 position,
322 message,
323 hint,
324 }
325}
326
327fn format_rule_name(rule: &Rule) -> String {
329 match rule {
330 Rule::identifier => "identifier".to_string(),
331 Rule::integer => "number".to_string(),
332 Rule::float => "number".to_string(),
333 Rule::string => "string".to_string(),
334 Rule::primitive_type => "type (int, float, bool, str, timestamp, duration)".to_string(),
335 Rule::type_expr => "type".to_string(),
336 Rule::expr => "expression".to_string(),
337 Rule::statement => "statement".to_string(),
338 Rule::context_decl => "context declaration".to_string(),
339 Rule::stream_decl => "stream declaration".to_string(),
340 Rule::pattern_decl => "pattern declaration".to_string(),
341 Rule::event_decl => "event declaration".to_string(),
342 Rule::fn_decl => "function declaration".to_string(),
343 Rule::INDENT => "indented block".to_string(),
344 Rule::DEDENT => "end of block".to_string(),
345 Rule::field => "field declaration (name: type)".to_string(),
346 Rule::comparison_op => "comparison operator (==, !=, <, >, <=, >=)".to_string(),
347 Rule::additive_op => "operator (+, -)".to_string(),
348 Rule::multiplicative_op => "operator (*, /, %)".to_string(),
349 Rule::postfix_suffix => "method call or member access".to_string(),
350 Rule::sase_pattern_expr => "SASE pattern expression".to_string(),
351 Rule::sase_seq_expr => "SEQ expression".to_string(),
352 Rule::kleene_op => "Kleene operator (+, *, ?)".to_string(),
353 _ => format!("{rule:?}").to_lowercase().replace('_', " "),
354 }
355}
356
357fn is_stream_op_error(positives: &[Rule]) -> bool {
361 const STREAM_OP_RULES: &[Rule] = &[
362 Rule::context_op,
363 Rule::where_op,
364 Rule::select_op,
365 Rule::window_op,
366 Rule::aggregate_op,
367 Rule::having_op,
368 Rule::partition_by_op,
369 Rule::order_by_op,
370 Rule::limit_op,
371 Rule::distinct_op,
372 Rule::map_op,
373 Rule::filter_op,
374 Rule::tap_op,
375 Rule::print_op,
376 Rule::log_op,
377 Rule::emit_op,
378 Rule::to_op,
379 Rule::pattern_op,
380 Rule::concurrent_op,
381 Rule::process_op,
382 Rule::on_error_op,
383 Rule::collect_op,
384 Rule::on_op,
385 Rule::within_op,
386 Rule::not_op,
387 Rule::fork_op,
388 Rule::any_op,
389 Rule::all_op,
390 Rule::first_op,
391 Rule::watermark_op,
392 Rule::allowed_lateness_op,
393 Rule::trend_aggregate_op,
394 Rule::score_op,
395 Rule::forecast_op,
396 Rule::enrich_op,
397 Rule::alert_op,
398 ];
399 positives.len() >= 10 && positives.iter().all(|r| STREAM_OP_RULES.contains(r))
400}
401
402fn parse_statement(pair: pest::iterators::Pair<Rule>) -> ParseResult<Spanned<Stmt>> {
403 let span = Span::new(pair.as_span().start(), pair.as_span().end());
404 let inner = pair.into_inner().expect_next("statement body")?;
405
406 let stmt = match inner.as_rule() {
407 Rule::context_decl => parse_context_decl(inner)?,
408 Rule::connector_decl => parse_connector_decl(inner)?,
409 Rule::stream_decl => parse_stream_decl(inner)?,
410 Rule::pattern_decl => parse_pattern_decl(inner)?,
411 Rule::event_decl => parse_event_decl(inner)?,
412 Rule::type_decl => parse_type_decl(inner)?,
413 Rule::var_decl => parse_var_decl(inner)?,
414 Rule::const_decl => parse_const_decl(inner)?,
415 Rule::fn_decl => parse_fn_decl(inner)?,
416 Rule::config_block => parse_config_block(inner)?,
417 Rule::import_stmt => parse_import_stmt(inner)?,
418 Rule::if_stmt => parse_if_stmt(inner)?,
419 Rule::for_stmt => parse_for_stmt(inner)?,
420 Rule::while_stmt => parse_while_stmt(inner)?,
421 Rule::return_stmt => parse_return_stmt(inner)?,
422 Rule::break_stmt => Stmt::Break,
423 Rule::continue_stmt => Stmt::Continue,
424 Rule::emit_stmt => parse_emit_stmt(inner)?,
425 Rule::assignment_stmt => {
426 let mut inner = inner.into_inner();
427 let name = inner.expect_next("variable name")?.as_str().to_string();
428 let value = parse_expr(inner.expect_next("assignment value")?)?;
429 Stmt::Assignment { name, value }
430 }
431 Rule::expr_stmt => Stmt::Expr(parse_expr(inner.into_inner().expect_next("expression")?)?),
432 _ => {
433 return Err(ParseError::UnexpectedToken {
434 position: span.start,
435 expected: "statement".to_string(),
436 found: format!("{:?}", inner.as_rule()),
437 })
438 }
439 };
440
441 Ok(Spanned::new(stmt, span))
442}
443
444fn parse_context_decl(pair: pest::iterators::Pair<Rule>) -> ParseResult<Stmt> {
449 let mut inner = pair.into_inner();
450 let name = inner.expect_next("context name")?.as_str().to_string();
451 let mut cores = None;
452
453 for p in inner {
454 if p.as_rule() == Rule::context_params {
455 for param in p.into_inner() {
456 if param.as_rule() == Rule::context_param {
457 let core_ids: Vec<usize> = param
459 .into_inner()
460 .filter(|p| p.as_rule() == Rule::integer)
461 .map(|p| p.as_str().parse::<usize>().unwrap_or(0))
462 .collect();
463 cores = Some(core_ids);
464 }
465 }
466 }
467 }
468
469 Ok(Stmt::ContextDecl { name, cores })
470}
471
472fn parse_connector_decl(pair: pest::iterators::Pair<Rule>) -> ParseResult<Stmt> {
477 let mut inner = pair.into_inner();
478 let name = inner.expect_next("connector name")?.as_str().to_string();
479 let connector_type = inner.expect_next("connector type")?.as_str().to_string();
480 let mut params = Vec::new();
481
482 for p in inner {
483 if p.as_rule() == Rule::connector_params {
484 params = parse_connector_params(p)?;
485 }
486 }
487
488 Ok(Stmt::ConnectorDecl {
489 name,
490 connector_type,
491 params,
492 })
493}
494
495fn parse_connector_params(pair: pest::iterators::Pair<Rule>) -> ParseResult<Vec<ConnectorParam>> {
496 let mut params = Vec::new();
497 for p in pair.into_inner() {
498 if p.as_rule() == Rule::connector_param {
499 let mut inner = p.into_inner();
500 let name = inner.expect_next("param name")?.as_str().to_string();
501 let value_pair = inner.expect_next("param value")?;
502 let value = parse_config_value(value_pair)?;
503 params.push(ConnectorParam { name, value });
504 }
505 }
506 Ok(params)
507}
508
509fn parse_stream_decl(pair: pest::iterators::Pair<Rule>) -> ParseResult<Stmt> {
510 let mut inner = pair.into_inner();
511 let name = inner.expect_next("stream name")?.as_str().to_string();
512
513 let mut type_annotation = None;
514 let mut source = StreamSource::Ident(String::new());
515 let mut ops = Vec::new();
516 let mut op_spans = Vec::new();
517
518 for p in inner {
519 match p.as_rule() {
520 Rule::type_annotation => {
521 type_annotation = Some(parse_type(p.into_inner().expect_next("type")?)?);
522 }
523 Rule::stream_expr => {
524 let (s, o, spans) = parse_stream_expr(p)?;
525 source = s;
526 ops = o;
527 op_spans = spans;
528 }
529 _ => {}
530 }
531 }
532
533 Ok(Stmt::StreamDecl {
534 name,
535 type_annotation,
536 source,
537 ops,
538 op_spans,
539 })
540}
541
542fn parse_stream_expr(
543 pair: pest::iterators::Pair<Rule>,
544) -> ParseResult<(StreamSource, Vec<StreamOp>, Vec<varpulis_core::span::Span>)> {
545 let mut inner = pair.into_inner();
546 let source = parse_stream_source(inner.expect_next("stream source")?)?;
547 let mut ops = Vec::new();
548 let mut op_spans = Vec::new();
549
550 for p in inner {
551 if p.as_rule() == Rule::stream_op {
552 let pest_span = p.as_span();
553 let span = varpulis_core::span::Span::new(pest_span.start(), pest_span.end());
554 ops.push(parse_stream_op(p)?);
555 op_spans.push(span);
556 }
557 }
558
559 Ok((source, ops, op_spans))
560}
561
562fn parse_pattern_decl(pair: pest::iterators::Pair<Rule>) -> ParseResult<Stmt> {
567 let mut inner = pair.into_inner();
568 let name = inner.expect_next("pattern name")?.as_str().to_string();
569
570 let mut expr = SasePatternExpr::Event(String::new());
571 let mut within = None;
572 let mut partition_by = None;
573
574 for p in inner {
575 match p.as_rule() {
576 Rule::sase_pattern_expr => {
577 expr = parse_sase_pattern_expr(p)?;
578 }
579 Rule::pattern_within_clause => {
580 let dur_pair = p.into_inner().expect_next("within duration")?;
581 within = Some(Expr::Duration(
582 parse_duration(dur_pair.as_str()).map_err(ParseError::InvalidDuration)?,
583 ));
584 }
585 Rule::pattern_partition_clause => {
586 let key = p
587 .into_inner()
588 .expect_next("partition key")?
589 .as_str()
590 .to_string();
591 partition_by = Some(Expr::Ident(key));
592 }
593 _ => {}
594 }
595 }
596
597 Ok(Stmt::PatternDecl {
598 name,
599 expr,
600 within,
601 partition_by,
602 })
603}
604
605fn parse_sase_pattern_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<SasePatternExpr> {
606 let inner = pair.into_inner().expect_next("SASE pattern expression")?;
607 parse_sase_or_expr(inner)
608}
609
610fn parse_sase_or_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<SasePatternExpr> {
611 let mut inner = pair.into_inner();
612 let mut left = parse_sase_and_expr(inner.expect_next("OR expression operand")?)?;
613
614 for right_pair in inner {
615 let right = parse_sase_and_expr(right_pair)?;
616 left = SasePatternExpr::Or(Box::new(left), Box::new(right));
617 }
618
619 Ok(left)
620}
621
622fn parse_sase_and_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<SasePatternExpr> {
623 let mut inner = pair.into_inner();
624 let mut left = parse_sase_not_expr(inner.expect_next("AND expression operand")?)?;
625
626 for right_pair in inner {
627 let right = parse_sase_not_expr(right_pair)?;
628 left = SasePatternExpr::And(Box::new(left), Box::new(right));
629 }
630
631 Ok(left)
632}
633
634fn parse_sase_not_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<SasePatternExpr> {
635 let mut inner = pair.into_inner();
636 let first = inner.expect_next("NOT or primary expression")?;
637
638 if first.as_rule() == Rule::sase_not_keyword {
639 let expr = parse_sase_primary_expr(inner.expect_next("expression after NOT")?)?;
640 Ok(SasePatternExpr::Not(Box::new(expr)))
641 } else {
642 parse_sase_primary_expr(first)
643 }
644}
645
646fn parse_sase_primary_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<SasePatternExpr> {
647 let inner = pair.into_inner().expect_next("SASE primary expression")?;
648
649 match inner.as_rule() {
650 Rule::sase_seq_expr => parse_sase_seq_expr(inner),
651 Rule::sase_grouped_expr => {
652 let nested = inner.into_inner().expect_next("grouped expression")?;
653 let expr = parse_sase_pattern_expr(nested)?;
654 Ok(SasePatternExpr::Group(Box::new(expr)))
655 }
656 Rule::sase_event_ref => parse_sase_event_ref(inner),
657 _ => Ok(SasePatternExpr::Event(inner.as_str().to_string())),
658 }
659}
660
661fn parse_sase_seq_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<SasePatternExpr> {
662 let mut items = Vec::new();
663
664 for p in pair.into_inner() {
665 if p.as_rule() == Rule::sase_seq_items {
666 for item in p.into_inner() {
667 if item.as_rule() == Rule::sase_seq_item {
668 items.push(parse_sase_seq_item(item)?);
669 }
670 }
671 }
672 }
673
674 Ok(SasePatternExpr::Seq(items))
675}
676
677fn parse_sase_seq_item(pair: pest::iterators::Pair<Rule>) -> ParseResult<SasePatternItem> {
678 let inner = pair.into_inner().expect_next("sequence item")?;
679
680 match inner.as_rule() {
681 Rule::sase_negated_item => parse_sase_item_inner(inner, true),
682 Rule::sase_positive_item => parse_sase_item_inner(inner, false),
683 _ => parse_sase_item_inner(inner, false),
684 }
685}
686
687fn parse_sase_item_inner(
688 pair: pest::iterators::Pair<Rule>,
689 _negated: bool,
690) -> ParseResult<SasePatternItem> {
691 let mut inner = pair.into_inner();
692 let event_type = inner.expect_next("event type")?.as_str().to_string();
693
694 let mut kleene = None;
695 let mut filter = None;
696 let mut alias = None;
697
698 for p in inner {
699 match p.as_rule() {
700 Rule::kleene_op => {
701 kleene = Some(match p.as_str() {
702 "+" => KleeneOp::Plus,
703 "*" => KleeneOp::Star,
704 "?" => KleeneOp::Optional,
705 _ => KleeneOp::Plus,
706 });
707 }
708 Rule::sase_where_clause => {
709 filter = Some(parse_expr(
710 p.into_inner().expect_next("filter expression")?,
711 )?);
712 }
713 Rule::sase_alias_clause => {
714 alias = Some(p.into_inner().expect_next("alias")?.as_str().to_string());
715 }
716 _ => {}
717 }
718 }
719
720 let event_type = if _negated {
723 format!("!{event_type}")
724 } else {
725 event_type
726 };
727
728 Ok(SasePatternItem {
729 event_type,
730 alias,
731 kleene,
732 filter,
733 })
734}
735
736fn parse_sase_event_ref(pair: pest::iterators::Pair<Rule>) -> ParseResult<SasePatternExpr> {
737 let item = parse_sase_item_inner(pair, false)?;
739
740 if item.alias.is_none() && item.kleene.is_none() && item.filter.is_none() {
742 Ok(SasePatternExpr::Event(item.event_type))
743 } else {
744 Ok(SasePatternExpr::Seq(vec![item]))
746 }
747}
748
749fn parse_stream_source(pair: pest::iterators::Pair<Rule>) -> ParseResult<StreamSource> {
750 let inner = pair.into_inner().expect_next("stream source type")?;
751
752 match inner.as_rule() {
753 Rule::from_connector_source => {
754 let mut inner_iter = inner.into_inner();
755 let event_type = inner_iter.expect_next("event type")?.as_str().to_string();
756 let connector_name = inner_iter
757 .expect_next("connector name")?
758 .as_str()
759 .to_string();
760 let mut params = Vec::new();
761 for p in inner_iter {
762 if p.as_rule() == Rule::connector_params {
763 params = parse_connector_params(p)?;
764 }
765 }
766 Ok(StreamSource::FromConnector {
767 event_type,
768 connector_name,
769 params,
770 })
771 }
772 Rule::merge_source => {
773 let mut streams = Vec::new();
774 for p in inner.into_inner() {
775 if p.as_rule() == Rule::inline_stream_list {
776 for is in p.into_inner() {
777 streams.push(parse_inline_stream(is)?);
778 }
779 }
780 }
781 Ok(StreamSource::Merge(streams))
782 }
783 Rule::join_source
784 | Rule::left_join_source
785 | Rule::right_join_source
786 | Rule::full_join_source => {
787 let join_type = match inner.as_rule() {
788 Rule::left_join_source => varpulis_core::ast::JoinType::Left,
789 Rule::right_join_source => varpulis_core::ast::JoinType::Right,
790 Rule::full_join_source => varpulis_core::ast::JoinType::Full,
791 _ => varpulis_core::ast::JoinType::Inner,
792 };
793 let mut clauses = Vec::new();
794 for p in inner.into_inner() {
795 if p.as_rule() == Rule::join_clause_list {
796 for jc in p.into_inner() {
797 clauses.push(parse_join_clause(jc, join_type)?);
798 }
799 }
800 }
801 Ok(StreamSource::Join(clauses))
802 }
803 Rule::sequence_source => {
804 let decl =
805 parse_sequence_decl(inner.into_inner().expect_next("sequence declaration")?)?;
806 Ok(StreamSource::Sequence(decl))
807 }
808 Rule::timer_source => {
809 let timer_args = inner.into_inner().expect_next("timer arguments")?;
810 let decl = parse_timer_decl(timer_args)?;
811 Ok(StreamSource::Timer(decl))
812 }
813 Rule::all_source => {
814 let mut inner_iter = inner.into_inner();
815 let name = inner_iter.expect_next("event name")?.as_str().to_string();
816 let alias = inner_iter.next().map(|p| p.as_str().to_string());
817 Ok(StreamSource::AllWithAlias { name, alias })
818 }
819 Rule::filtered_source => {
820 let mut inner_iter = inner.into_inner();
821 let name = inner_iter.expect_next("event type")?.as_str().to_string();
822 let filter_pair = inner_iter.expect_next("filter expression")?;
823 let filter = parse_filter_expr(filter_pair)?;
824 let alias = inner_iter.next().map(|p| p.as_str().to_string());
825 Ok(StreamSource::IdentWithFilterAndAlias {
826 name,
827 filter,
828 alias,
829 })
830 }
831 Rule::aliased_source => {
832 let mut inner_iter = inner.into_inner();
833 let name = inner_iter.expect_next("event name")?.as_str().to_string();
834 let alias = inner_iter.expect_next("alias")?.as_str().to_string();
835 Ok(StreamSource::IdentWithAlias { name, alias })
836 }
837 Rule::identifier => Ok(StreamSource::Ident(inner.as_str().to_string())),
838 _ => Err(ParseError::UnexpectedToken {
839 position: 0,
840 expected: "stream source".to_string(),
841 found: format!("{:?}", inner.as_rule()),
842 }),
843 }
844}
845
846fn parse_inline_stream(pair: pest::iterators::Pair<Rule>) -> ParseResult<InlineStreamDecl> {
847 let mut inner = pair.into_inner();
848
849 let first = inner.expect_next("stream identifier")?;
851 if first.as_rule() == Rule::identifier && inner.clone().next().is_none() {
852 let name = first.as_str().to_string();
853 return Ok(InlineStreamDecl {
854 name: name.clone(),
855 source: name,
856 filter: None,
857 });
858 }
859
860 let name = first.as_str().to_string();
861 let source = inner.expect_next("stream source")?.as_str().to_string();
862 let filter = inner.next().map(|p| parse_expr(p)).transpose()?;
863
864 Ok(InlineStreamDecl {
865 name,
866 source,
867 filter,
868 })
869}
870
871fn parse_join_clause(
872 pair: pest::iterators::Pair<Rule>,
873 join_type: varpulis_core::ast::JoinType,
874) -> ParseResult<JoinClause> {
875 let mut inner = pair.into_inner();
876
877 let first = inner.expect_next("join clause identifier")?;
878 if first.as_rule() == Rule::identifier && inner.clone().next().is_none() {
879 let name = first.as_str().to_string();
880 return Ok(JoinClause {
881 name: name.clone(),
882 source: name,
883 on: None,
884 join_type,
885 });
886 }
887
888 let name = first.as_str().to_string();
889 let source = inner.expect_next("join source")?.as_str().to_string();
890 let on = inner.next().map(|p| parse_expr(p)).transpose()?;
891
892 Ok(JoinClause {
893 name,
894 source,
895 on,
896 join_type,
897 })
898}
899
900fn parse_sequence_decl(pair: pest::iterators::Pair<Rule>) -> ParseResult<SequenceDecl> {
901 let mut steps = Vec::new();
902
903 for p in pair.into_inner() {
904 if p.as_rule() == Rule::sequence_step {
905 steps.push(parse_sequence_step(p)?);
906 }
907 }
908
909 Ok(SequenceDecl {
910 match_all: false,
911 timeout: None,
912 steps,
913 })
914}
915
916fn parse_sequence_step(pair: pest::iterators::Pair<Rule>) -> ParseResult<SequenceStepDecl> {
917 let mut inner = pair.into_inner();
918 let alias = inner.expect_next("step alias")?.as_str().to_string();
919 let event_type = inner.expect_next("event type")?.as_str().to_string();
920
921 let mut filter = None;
922 let mut timeout = None;
923
924 for p in inner {
925 match p.as_rule() {
926 Rule::or_expr => filter = Some(parse_expr(p)?),
927 Rule::within_suffix => {
928 let expr = p.into_inner().expect_next("within duration")?;
929 timeout = Some(Box::new(parse_expr(expr)?));
930 }
931 _ => {}
932 }
933 }
934
935 Ok(SequenceStepDecl {
936 alias,
937 event_type,
938 filter,
939 timeout,
940 })
941}
942
943fn parse_timer_decl(pair: pest::iterators::Pair<Rule>) -> ParseResult<TimerDecl> {
944 let mut inner = pair.into_inner();
945
946 let interval = parse_expr(inner.expect_next("timer interval")?)?;
948
949 let mut initial_delay = None;
951 for p in inner {
952 if p.as_rule() == Rule::named_arg {
953 let arg = parse_named_arg(p)?;
954 if arg.name == "initial_delay" {
955 initial_delay = Some(Box::new(arg.value));
956 }
957 }
958 }
959
960 Ok(TimerDecl {
961 interval,
962 initial_delay,
963 })
964}
965
966fn parse_stream_op(pair: pest::iterators::Pair<Rule>) -> ParseResult<StreamOp> {
967 let inner = pair.into_inner().expect_next("stream operation")?;
968
969 match inner.as_rule() {
970 Rule::dot_op => {
971 let op_inner = inner.into_inner().expect_next("dot operation")?;
972 parse_dot_op(op_inner)
973 }
974 Rule::followed_by_op => parse_followed_by_op(inner),
975 _ => Err(ParseError::UnexpectedToken {
976 position: 0,
977 expected: "stream operation".to_string(),
978 found: format!("{:?}", inner.as_rule()),
979 }),
980 }
981}
982
983fn parse_dot_op(pair: pest::iterators::Pair<Rule>) -> ParseResult<StreamOp> {
984 match pair.as_rule() {
985 Rule::context_op => {
986 let name = pair
987 .into_inner()
988 .expect_next("context name")?
989 .as_str()
990 .to_string();
991 Ok(StreamOp::Context(name))
992 }
993 Rule::where_op => {
994 let expr = parse_expr(pair.into_inner().expect_next("where expression")?)?;
995 Ok(StreamOp::Where(expr))
996 }
997 Rule::select_op => {
998 let mut items = Vec::new();
999 for p in pair.into_inner() {
1000 if p.as_rule() == Rule::select_list {
1001 for si in p.into_inner() {
1002 items.push(parse_select_item(si)?);
1003 }
1004 }
1005 }
1006 Ok(StreamOp::Select(items))
1007 }
1008 Rule::window_op => {
1009 let args = parse_window_args(pair.into_inner().expect_next("window arguments")?)?;
1010 Ok(StreamOp::Window(args))
1011 }
1012 Rule::aggregate_op => {
1013 let mut items = Vec::new();
1014 for p in pair.into_inner() {
1015 if p.as_rule() == Rule::agg_list {
1016 for ai in p.into_inner() {
1017 items.push(parse_agg_item(ai)?);
1018 }
1019 }
1020 }
1021 Ok(StreamOp::Aggregate(items))
1022 }
1023 Rule::having_op => {
1024 let expr = parse_expr(pair.into_inner().expect_next("having expression")?)?;
1025 Ok(StreamOp::Having(expr))
1026 }
1027 Rule::map_op => {
1028 let expr = parse_expr(pair.into_inner().expect_next("map expression")?)?;
1029 Ok(StreamOp::Map(expr))
1030 }
1031 Rule::filter_op => {
1032 let expr = parse_expr(pair.into_inner().expect_next("filter expression")?)?;
1033 Ok(StreamOp::Filter(expr))
1034 }
1035 Rule::within_op => {
1036 let expr = parse_expr(pair.into_inner().expect_next("within duration")?)?;
1037 Ok(StreamOp::Within(expr))
1038 }
1039 Rule::emit_op => {
1040 let mut output_type = None;
1041 let mut fields = Vec::new();
1042 let mut target_context = None;
1043 for p in pair.into_inner() {
1044 match p.as_rule() {
1045 Rule::emit_type_cast => {
1046 output_type = Some(
1047 p.into_inner()
1048 .expect_next("type name")?
1049 .as_str()
1050 .to_string(),
1051 );
1052 }
1053 Rule::named_arg_list => {
1054 for arg in p.into_inner() {
1055 let parsed = parse_named_arg(arg)?;
1056 if parsed.name == "context" {
1058 if let Expr::Ident(ctx_name) = &parsed.value {
1059 target_context = Some(ctx_name.clone());
1060 continue;
1061 }
1062 }
1063 fields.push(parsed);
1064 }
1065 }
1066 _ => {}
1067 }
1068 }
1069 Ok(StreamOp::Emit {
1070 output_type,
1071 fields,
1072 target_context,
1073 })
1074 }
1075 Rule::print_op => {
1076 let exprs = pair
1077 .into_inner()
1078 .filter(|p| p.as_rule() == Rule::expr_list)
1079 .flat_map(|p| p.into_inner())
1080 .map(parse_expr)
1081 .collect::<ParseResult<Vec<_>>>()?;
1082 Ok(StreamOp::Print(exprs))
1083 }
1084 Rule::collect_op => Ok(StreamOp::Collect),
1085 Rule::pattern_op => {
1086 let def_pair = pair.into_inner().expect_next("pattern definition")?;
1087 let mut inner = def_pair.into_inner();
1088 let name = inner.expect_next("pattern name")?.as_str().to_string();
1089 let body_pair = inner.expect_next("pattern body")?;
1090
1091 let body_inner = body_pair.into_inner().expect_next("pattern expression")?;
1093 let matcher = match body_inner.as_rule() {
1094 Rule::lambda_expr => parse_lambda_expr(body_inner)?,
1095 Rule::pattern_or_expr => parse_pattern_expr_as_expr(body_inner)?,
1096 _ => parse_expr_inner(body_inner)?,
1097 };
1098 Ok(StreamOp::Pattern(PatternDef { name, matcher }))
1099 }
1100 Rule::partition_by_op => {
1101 let expr = parse_expr(pair.into_inner().expect_next("partition expression")?)?;
1102 Ok(StreamOp::PartitionBy(expr))
1103 }
1104 Rule::order_by_op => {
1105 let mut items = Vec::new();
1106 for p in pair.into_inner() {
1107 if p.as_rule() == Rule::order_list {
1108 for oi in p.into_inner() {
1109 items.push(parse_order_item(oi)?);
1110 }
1111 }
1112 }
1113 Ok(StreamOp::OrderBy(items))
1114 }
1115 Rule::limit_op => {
1116 let expr = parse_expr(pair.into_inner().expect_next("limit expression")?)?;
1117 Ok(StreamOp::Limit(expr))
1118 }
1119 Rule::distinct_op => {
1120 let expr = pair.into_inner().next().map(parse_expr).transpose()?;
1121 Ok(StreamOp::Distinct(expr))
1122 }
1123 Rule::tap_op => {
1124 let args = pair
1125 .into_inner()
1126 .filter(|p| p.as_rule() == Rule::named_arg_list)
1127 .flat_map(|p| p.into_inner())
1128 .map(parse_named_arg)
1129 .collect::<ParseResult<Vec<_>>>()?;
1130 Ok(StreamOp::Tap(args))
1131 }
1132 Rule::log_op => {
1133 let args = pair
1134 .into_inner()
1135 .filter(|p| p.as_rule() == Rule::named_arg_list)
1136 .flat_map(|p| p.into_inner())
1137 .map(parse_named_arg)
1138 .collect::<ParseResult<Vec<_>>>()?;
1139 Ok(StreamOp::Log(args))
1140 }
1141 Rule::to_op => {
1142 let mut inner = pair.into_inner();
1143 let connector_name = inner.expect_next("connector name")?.as_str().to_string();
1144 let mut params = Vec::new();
1145 for p in inner {
1146 if p.as_rule() == Rule::connector_params {
1147 params = parse_connector_params(p)?;
1148 }
1149 }
1150 Ok(StreamOp::To {
1151 connector_name,
1152 params,
1153 })
1154 }
1155 Rule::process_op => {
1156 let expr = parse_expr(pair.into_inner().expect_next("process expression")?)?;
1157 Ok(StreamOp::Process(expr))
1158 }
1159 Rule::on_error_op => {
1160 let expr = parse_expr(pair.into_inner().expect_next("on_error handler")?)?;
1161 Ok(StreamOp::OnError(expr))
1162 }
1163 Rule::on_op => {
1164 let expr = parse_expr(pair.into_inner().expect_next("on handler")?)?;
1165 Ok(StreamOp::On(expr))
1166 }
1167 Rule::not_op => {
1168 let mut inner = pair.into_inner();
1169 let event_type = inner.expect_next("event type")?.as_str().to_string();
1170 let filter = inner.next().map(parse_expr).transpose()?;
1171 Ok(StreamOp::Not(FollowedByClause {
1172 event_type,
1173 filter,
1174 alias: None,
1175 match_all: false,
1176 }))
1177 }
1178 Rule::fork_op => {
1179 let mut paths = Vec::new();
1180 for p in pair.into_inner() {
1181 if p.as_rule() == Rule::fork_path_list {
1182 for fp in p.into_inner() {
1183 paths.push(parse_fork_path(fp)?);
1184 }
1185 }
1186 }
1187 Ok(StreamOp::Fork(paths))
1188 }
1189 Rule::any_op => {
1190 let count = pair
1191 .into_inner()
1192 .next()
1193 .map(|p| p.as_str().parse().unwrap_or(1));
1194 Ok(StreamOp::Any(count))
1195 }
1196 Rule::all_op => Ok(StreamOp::All),
1197 Rule::first_op => Ok(StreamOp::First),
1198 Rule::concurrent_op => {
1199 let args = pair
1200 .into_inner()
1201 .filter(|p| p.as_rule() == Rule::named_arg_list)
1202 .flat_map(|p| p.into_inner())
1203 .map(parse_named_arg)
1204 .collect::<ParseResult<Vec<_>>>()?;
1205 Ok(StreamOp::Concurrent(args))
1206 }
1207 Rule::watermark_op => {
1208 let args = pair
1209 .into_inner()
1210 .filter(|p| p.as_rule() == Rule::named_arg_list)
1211 .flat_map(|p| p.into_inner())
1212 .map(parse_named_arg)
1213 .collect::<ParseResult<Vec<_>>>()?;
1214 Ok(StreamOp::Watermark(args))
1215 }
1216 Rule::allowed_lateness_op => {
1217 let expr = parse_expr(pair.into_inner().expect_next("allowed lateness duration")?)?;
1218 Ok(StreamOp::AllowedLateness(expr))
1219 }
1220 Rule::trend_aggregate_op => {
1221 let mut items = Vec::new();
1222 for p in pair.into_inner() {
1223 if p.as_rule() == Rule::trend_agg_list {
1224 for item_pair in p.into_inner() {
1225 if item_pair.as_rule() == Rule::trend_agg_item {
1226 let mut inner = item_pair.into_inner();
1227 let alias = inner.expect_next("trend agg alias")?.as_str().to_string();
1228 let func_pair = inner.expect_next("trend agg function")?;
1229 let mut func_inner = func_pair.into_inner();
1230 let func_name = func_inner
1231 .expect_next("function name")?
1232 .as_str()
1233 .to_string();
1234 let arg = func_inner.next().map(parse_expr).transpose()?;
1235 items.push(TrendAggItem {
1236 alias,
1237 func: func_name,
1238 arg,
1239 });
1240 }
1241 }
1242 }
1243 }
1244 Ok(StreamOp::TrendAggregate(items))
1245 }
1246 Rule::forecast_op => {
1247 let mut confidence = None;
1248 let mut horizon = None;
1249 let mut warmup = None;
1250 let mut max_depth = None;
1251 let mut hawkes = None;
1252 let mut conformal = None;
1253 let mut mode = None;
1254 for p in pair.into_inner() {
1255 if p.as_rule() == Rule::forecast_params {
1256 for param_pair in p.into_inner() {
1257 if param_pair.as_rule() == Rule::forecast_param {
1258 let mut inner = param_pair.into_inner();
1259 let name = inner.expect_next("forecast param name")?.as_str();
1260 let value_pair = inner.expect_next("forecast param value")?;
1261 let expr = parse_expr(value_pair)?;
1262 match name {
1263 "confidence" => confidence = Some(expr),
1264 "horizon" => horizon = Some(expr),
1265 "warmup" => warmup = Some(expr),
1266 "max_depth" => max_depth = Some(expr),
1267 "hawkes" => hawkes = Some(expr),
1268 "conformal" => conformal = Some(expr),
1269 "mode" => mode = Some(expr),
1270 _ => {}
1271 }
1272 }
1273 }
1274 }
1275 }
1276 Ok(StreamOp::Forecast(ForecastSpec {
1277 confidence,
1278 horizon,
1279 warmup,
1280 max_depth,
1281 hawkes,
1282 conformal,
1283 mode,
1284 }))
1285 }
1286 Rule::enrich_op => {
1287 let mut inner = pair.into_inner();
1288 let connector_name = inner.expect_next("connector name")?.as_str().to_string();
1289 let mut key_expr = None;
1290 let mut fields = Vec::new();
1291 let mut cache_ttl = None;
1292 let mut timeout = None;
1293 let mut fallback = None;
1294 for p in inner {
1295 if p.as_rule() == Rule::enrich_params {
1296 for param_pair in p.into_inner() {
1297 if param_pair.as_rule() == Rule::enrich_param {
1298 let param_inner =
1299 param_pair.into_inner().expect_next("enrich param")?;
1300 match param_inner.as_rule() {
1301 Rule::enrich_key_param => {
1302 let expr_pair =
1303 param_inner.into_inner().expect_next("key expression")?;
1304 key_expr = Some(parse_expr(expr_pair)?);
1305 }
1306 Rule::enrich_fields_param => {
1307 for field in param_inner.into_inner() {
1308 if field.as_rule() == Rule::identifier {
1309 fields.push(field.as_str().to_string());
1310 }
1311 }
1312 }
1313 Rule::enrich_cache_ttl_param => {
1314 let expr_pair = param_inner
1315 .into_inner()
1316 .expect_next("cache_ttl expression")?;
1317 cache_ttl = Some(parse_expr(expr_pair)?);
1318 }
1319 Rule::enrich_timeout_param => {
1320 let expr_pair = param_inner
1321 .into_inner()
1322 .expect_next("timeout expression")?;
1323 timeout = Some(parse_expr(expr_pair)?);
1324 }
1325 Rule::enrich_fallback_param => {
1326 let literal_pair =
1327 param_inner.into_inner().expect_next("fallback literal")?;
1328 fallback = Some(parse_expr(literal_pair)?);
1329 }
1330 _ => {}
1331 }
1332 }
1333 }
1334 }
1335 }
1336 let key = key_expr.ok_or_else(|| ParseError::Located {
1337 line: 0,
1338 column: 0,
1339 position: 0,
1340 message: ".enrich() requires a key: parameter".to_string(),
1341 hint: Some("add key: <expression> to .enrich()".to_string()),
1342 })?;
1343 Ok(StreamOp::Enrich(EnrichSpec {
1344 connector_name,
1345 key_expr: Box::new(key),
1346 fields,
1347 cache_ttl,
1348 timeout,
1349 fallback,
1350 }))
1351 }
1352 Rule::score_op => {
1353 let mut model_path = String::new();
1354 let mut inputs = Vec::new();
1355 let mut outputs = Vec::new();
1356 let mut gpu = false;
1357 let mut batch_size: usize = 1;
1358 let mut device_id: i32 = 0;
1359 for p in pair.into_inner() {
1360 if p.as_rule() == Rule::score_params {
1361 for param_pair in p.into_inner() {
1362 if param_pair.as_rule() == Rule::score_param {
1363 let mut inner = param_pair.into_inner();
1364 let name = inner.expect_next("score param name")?.as_str();
1365 let value_pair = inner.expect_next("score param value")?;
1366 match name {
1367 "model" => {
1368 let raw = value_pair.as_str();
1369 model_path = raw.trim_matches('"').to_string();
1370 }
1371 "inputs" => {
1372 if value_pair.as_rule() == Rule::score_field_list {
1373 for field in value_pair.into_inner() {
1374 if field.as_rule() == Rule::identifier {
1375 inputs.push(field.as_str().to_string());
1376 }
1377 }
1378 }
1379 }
1380 "outputs" => {
1381 if value_pair.as_rule() == Rule::score_field_list {
1382 for field in value_pair.into_inner() {
1383 if field.as_rule() == Rule::identifier {
1384 outputs.push(field.as_str().to_string());
1385 }
1386 }
1387 }
1388 }
1389 "gpu" => {
1390 gpu = value_pair.as_str() == "true";
1391 }
1392 "batch_size" => {
1393 batch_size = value_pair.as_str().parse().unwrap_or(1);
1394 }
1395 "device" | "device_id" => {
1396 device_id = value_pair.as_str().parse().unwrap_or(0);
1397 }
1398 _ => {}
1399 }
1400 }
1401 }
1402 }
1403 }
1404 Ok(StreamOp::Score(ScoreSpec {
1405 model_path,
1406 inputs,
1407 outputs,
1408 gpu,
1409 batch_size,
1410 device_id,
1411 }))
1412 }
1413 Rule::alert_op => {
1414 let args = pair
1415 .into_inner()
1416 .filter(|p| p.as_rule() == Rule::named_arg_list)
1417 .flat_map(|p| p.into_inner())
1418 .map(parse_named_arg)
1419 .collect::<ParseResult<Vec<_>>>()?;
1420 Ok(StreamOp::Alert(args))
1421 }
1422 _ => Err(ParseError::UnexpectedToken {
1423 position: 0,
1424 expected: "stream operation".to_string(),
1425 found: format!("{:?}", pair.as_rule()),
1426 }),
1427 }
1428}
1429
1430fn parse_order_item(pair: pest::iterators::Pair<Rule>) -> ParseResult<OrderItem> {
1431 let mut inner = pair.into_inner();
1432 let expr = parse_expr(inner.expect_next("order expression")?)?;
1433 let desc = inner.next().is_some_and(|p| p.as_str() == "desc");
1434 Ok(OrderItem {
1435 expr,
1436 descending: desc,
1437 })
1438}
1439
1440fn parse_fork_path(pair: pest::iterators::Pair<Rule>) -> ParseResult<ForkPath> {
1441 let mut inner = pair.into_inner();
1442 let name = inner.expect_next("fork path name")?.as_str().to_string();
1443 let mut ops = Vec::new();
1444 for p in inner {
1445 if p.as_rule() == Rule::stream_op {
1446 ops.push(parse_stream_op(p)?);
1447 }
1448 }
1449 Ok(ForkPath { name, ops })
1450}
1451
1452fn parse_followed_by_op(pair: pest::iterators::Pair<Rule>) -> ParseResult<StreamOp> {
1453 let mut inner = pair.into_inner();
1454 let mut match_all = false;
1455
1456 let first = inner.expect_next("event type or match_all")?;
1457 let event_type = if first.as_rule() == Rule::match_all_keyword {
1458 match_all = true;
1459 inner.expect_next("event type")?.as_str().to_string()
1460 } else {
1461 first.as_str().to_string()
1462 };
1463
1464 let mut filter = None;
1465 let mut alias = None;
1466
1467 for p in inner {
1468 match p.as_rule() {
1469 Rule::or_expr => filter = Some(parse_or_expr(p)?),
1470 Rule::filter_expr => filter = Some(parse_filter_expr(p)?),
1471 Rule::identifier => alias = Some(p.as_str().to_string()),
1472 _ => {}
1473 }
1474 }
1475
1476 Ok(StreamOp::FollowedBy(FollowedByClause {
1477 event_type,
1478 filter,
1479 alias,
1480 match_all,
1481 }))
1482}
1483
1484fn parse_select_item(pair: pest::iterators::Pair<Rule>) -> ParseResult<SelectItem> {
1485 let mut inner = pair.into_inner();
1486 let first = inner.expect_next("select field or alias")?;
1487
1488 if let Some(second) = inner.next() {
1489 Ok(SelectItem::Alias(
1490 first.as_str().to_string(),
1491 parse_expr(second)?,
1492 ))
1493 } else {
1494 Ok(SelectItem::Field(first.as_str().to_string()))
1495 }
1496}
1497
1498fn parse_window_args(pair: pest::iterators::Pair<Rule>) -> ParseResult<WindowArgs> {
1499 let raw = pair.as_str().trim();
1500 let is_session = raw.starts_with("session");
1501
1502 let mut inner = pair.into_inner();
1503
1504 if is_session {
1505 let gap_expr = parse_expr(inner.expect_next("session gap duration")?)?;
1507 return Ok(WindowArgs {
1508 duration: gap_expr.clone(),
1509 sliding: None,
1510 policy: None,
1511 session_gap: Some(gap_expr),
1512 });
1513 }
1514
1515 let duration = parse_expr(inner.expect_next("window duration")?)?;
1516
1517 let mut sliding = None;
1518 let mut policy = None;
1519
1520 for p in inner {
1521 if p.as_rule() == Rule::expr {
1522 if sliding.is_none() {
1524 sliding = Some(parse_expr(p)?);
1525 } else {
1526 policy = Some(parse_expr(p)?);
1527 }
1528 }
1529 }
1530
1531 Ok(WindowArgs {
1532 duration,
1533 sliding,
1534 policy,
1535 session_gap: None,
1536 })
1537}
1538
1539fn parse_agg_item(pair: pest::iterators::Pair<Rule>) -> ParseResult<AggItem> {
1540 let mut inner = pair.into_inner();
1541 let alias = inner.expect_next("aggregate alias")?.as_str().to_string();
1542 let expr = parse_expr(inner.expect_next("aggregate expression")?)?;
1543 Ok(AggItem { alias, expr })
1544}
1545
1546fn parse_named_arg(pair: pest::iterators::Pair<Rule>) -> ParseResult<NamedArg> {
1547 let mut inner = pair.into_inner();
1548 let name = inner.expect_next("argument name")?.as_str().to_string();
1549 let value = parse_expr(inner.expect_next("argument value")?)?;
1550 Ok(NamedArg { name, value })
1551}
1552
1553fn parse_event_decl(pair: pest::iterators::Pair<Rule>) -> ParseResult<Stmt> {
1554 let mut inner = pair.into_inner();
1555 let name = inner.expect_next("event name")?.as_str().to_string();
1556
1557 let mut extends = None;
1558 let mut fields = Vec::new();
1559
1560 for p in inner {
1561 match p.as_rule() {
1562 Rule::identifier => extends = Some(p.as_str().to_string()),
1563 Rule::field => fields.push(parse_field(p)?),
1564 _ => {}
1565 }
1566 }
1567
1568 Ok(Stmt::EventDecl {
1569 name,
1570 extends,
1571 fields,
1572 })
1573}
1574
1575fn parse_field(pair: pest::iterators::Pair<Rule>) -> ParseResult<Field> {
1576 let mut inner = pair.into_inner();
1577 let name = inner.expect_next("field name")?.as_str().to_string();
1578 let ty = parse_type(inner.expect_next("field type")?)?;
1579 let optional = inner.next().is_some();
1580 Ok(Field { name, ty, optional })
1581}
1582
1583fn parse_type_decl(pair: pest::iterators::Pair<Rule>) -> ParseResult<Stmt> {
1584 let mut inner = pair.into_inner();
1585 let name = inner.expect_next("type name")?.as_str().to_string();
1586 let next = inner.expect_next("type body")?;
1587 match next.as_rule() {
1588 Rule::field => {
1589 let mut fields = vec![parse_field(next)?];
1590 for p in inner {
1591 if p.as_rule() == Rule::field {
1592 fields.push(parse_field(p)?);
1593 }
1594 }
1595 Ok(Stmt::TypeDecl {
1596 name,
1597 ty: None,
1598 fields,
1599 })
1600 }
1601 _ => Ok(Stmt::TypeDecl {
1602 name,
1603 ty: Some(parse_type(next)?),
1604 fields: vec![],
1605 }),
1606 }
1607}
1608
1609fn parse_type(pair: pest::iterators::Pair<Rule>) -> ParseResult<Type> {
1610 let cloned = pair.clone();
1611 let inner = pair.into_inner().next().unwrap_or(cloned);
1612
1613 match inner.as_rule() {
1614 Rule::primitive_type => match inner.as_str() {
1615 "int" => Ok(Type::Int),
1616 "float" => Ok(Type::Float),
1617 "bool" => Ok(Type::Bool),
1618 "str" => Ok(Type::Str),
1619 "timestamp" => Ok(Type::Timestamp),
1620 "duration" => Ok(Type::Duration),
1621 _ => Ok(Type::Named(inner.as_str().to_string())),
1622 },
1623 Rule::array_type => {
1624 let inner_type = parse_type(inner.into_inner().expect_next("array element type")?)?;
1625 Ok(Type::Array(Box::new(inner_type)))
1626 }
1627 Rule::map_type => {
1628 let mut inner_pairs = inner.into_inner();
1629 let key_type = parse_type(inner_pairs.expect_next("map key type")?)?;
1630 let val_type = parse_type(inner_pairs.expect_next("map value type")?)?;
1631 Ok(Type::Map(Box::new(key_type), Box::new(val_type)))
1632 }
1633 Rule::tuple_type => {
1634 let types: Vec<Type> = inner
1635 .into_inner()
1636 .map(parse_type)
1637 .collect::<ParseResult<Vec<_>>>()?;
1638 Ok(Type::Tuple(types))
1639 }
1640 Rule::stream_type => {
1641 let inner_type = parse_type(inner.into_inner().expect_next("stream element type")?)?;
1642 Ok(Type::Stream(Box::new(inner_type)))
1643 }
1644 Rule::optional_type => {
1645 let inner_type = parse_type(inner.into_inner().expect_next("optional inner type")?)?;
1646 Ok(Type::Optional(Box::new(inner_type)))
1647 }
1648 Rule::named_type | Rule::identifier => Ok(Type::Named(inner.as_str().to_string())),
1649 _ => Ok(Type::Named(inner.as_str().to_string())),
1650 }
1651}
1652
1653fn parse_var_decl(pair: pest::iterators::Pair<Rule>) -> ParseResult<Stmt> {
1654 let mut inner = pair.into_inner();
1655 let keyword = inner.expect_next("var_keyword")?.as_str();
1656 let mutable = keyword == "var";
1657 let name = inner.expect_next("variable name")?.as_str().to_string();
1658
1659 let mut ty = None;
1660 let mut value = Expr::Null;
1661
1662 for p in inner {
1663 match p.as_rule() {
1664 Rule::type_annotation => ty = Some(parse_type(p.into_inner().expect_next("type")?)?),
1665 _ => value = parse_expr(p)?,
1666 }
1667 }
1668
1669 Ok(Stmt::VarDecl {
1670 mutable,
1671 name,
1672 ty,
1673 value,
1674 })
1675}
1676
1677fn parse_const_decl(pair: pest::iterators::Pair<Rule>) -> ParseResult<Stmt> {
1678 let mut inner = pair.into_inner();
1679 let name = inner.expect_next("constant name")?.as_str().to_string();
1680
1681 let mut ty = None;
1682 let mut value = Expr::Null;
1683
1684 for p in inner {
1685 match p.as_rule() {
1686 Rule::type_annotation => ty = Some(parse_type(p.into_inner().expect_next("type")?)?),
1687 _ => value = parse_expr(p)?,
1688 }
1689 }
1690
1691 Ok(Stmt::ConstDecl { name, ty, value })
1692}
1693
1694fn parse_fn_decl(pair: pest::iterators::Pair<Rule>) -> ParseResult<Stmt> {
1695 let mut inner = pair.into_inner();
1696 let name = inner.expect_next("function name")?.as_str().to_string();
1697
1698 let mut params = Vec::new();
1699 let mut ret = None;
1700 let mut body = Vec::new();
1701
1702 for p in inner {
1703 match p.as_rule() {
1704 Rule::param_list => {
1705 for param in p.into_inner() {
1706 params.push(parse_param(param)?);
1707 }
1708 }
1709 Rule::type_expr => ret = Some(parse_type(p)?),
1710 Rule::block => body = parse_block(p)?,
1711 Rule::statement => body.push(parse_statement(p)?),
1712 _ => {}
1713 }
1714 }
1715
1716 Ok(Stmt::FnDecl {
1717 name,
1718 params,
1719 ret,
1720 body,
1721 })
1722}
1723
1724fn parse_block(pair: pest::iterators::Pair<Rule>) -> ParseResult<Vec<Spanned<Stmt>>> {
1725 let mut statements = Vec::new();
1726 for p in pair.into_inner() {
1727 if p.as_rule() == Rule::statement {
1728 statements.push(parse_statement(p)?);
1729 }
1730 }
1731 Ok(statements)
1732}
1733
1734fn parse_param(pair: pest::iterators::Pair<Rule>) -> ParseResult<Param> {
1735 let mut inner = pair.into_inner();
1736 let name = inner.expect_next("parameter name")?.as_str().to_string();
1737 let ty = parse_type(inner.expect_next("parameter type")?)?;
1738 Ok(Param { name, ty })
1739}
1740
1741fn parse_config_block(pair: pest::iterators::Pair<Rule>) -> ParseResult<Stmt> {
1742 let mut inner = pair.into_inner();
1743 let first = inner.expect_next("config name or item")?;
1744
1745 let (name, items_start) = if first.as_rule() == Rule::identifier {
1747 (first.as_str().to_string(), None)
1748 } else {
1749 ("default".to_string(), Some(first))
1751 };
1752
1753 let mut items = Vec::new();
1754
1755 if let Some(first_item) = items_start {
1757 if first_item.as_rule() == Rule::config_item {
1758 items.push(parse_config_item(first_item)?);
1759 }
1760 }
1761
1762 for p in inner {
1763 if p.as_rule() == Rule::config_item {
1764 items.push(parse_config_item(p)?);
1765 }
1766 }
1767 Ok(Stmt::Config { name, items })
1768}
1769
1770fn parse_config_item(pair: pest::iterators::Pair<Rule>) -> ParseResult<ConfigItem> {
1771 let mut inner = pair.into_inner();
1772 let key = inner.expect_next("config key")?.as_str().to_string();
1773 let value = parse_config_value(inner.expect_next("config value")?)?;
1774 Ok(ConfigItem::Value(key, value))
1775}
1776
1777fn parse_config_value(pair: pest::iterators::Pair<Rule>) -> ParseResult<ConfigValue> {
1778 let cloned = pair.clone();
1779 let inner = pair.into_inner().next().unwrap_or(cloned);
1780
1781 match inner.as_rule() {
1782 Rule::config_concat => {
1783 let parts: Vec<ConfigValue> = inner
1784 .into_inner()
1785 .map(|atom| {
1786 let inner_atom = atom.into_inner().next().expect("config_atom child");
1788 match inner_atom.as_rule() {
1789 Rule::string => {
1790 let s = inner_atom.as_str();
1791 Ok(ConfigValue::Str(s[1..s.len() - 1].to_string()))
1792 }
1793 Rule::identifier => Ok(ConfigValue::Ident(inner_atom.as_str().to_string())),
1794 _ => Ok(ConfigValue::Ident(inner_atom.as_str().to_string())),
1795 }
1796 })
1797 .collect::<ParseResult<Vec<_>>>()?;
1798 Ok(ConfigValue::Concat(parts))
1799 }
1800 Rule::config_array => {
1801 let values: Vec<ConfigValue> = inner
1802 .into_inner()
1803 .map(parse_config_value)
1804 .collect::<ParseResult<Vec<_>>>()?;
1805 Ok(ConfigValue::Array(values))
1806 }
1807 Rule::integer => Ok(ConfigValue::Int(inner.as_str().parse().unwrap_or(0))),
1808 Rule::float => Ok(ConfigValue::Float(inner.as_str().parse().unwrap_or(0.0))),
1809 Rule::string => {
1810 let s = inner.as_str();
1811 Ok(ConfigValue::Str(s[1..s.len() - 1].to_string()))
1812 }
1813 Rule::duration => Ok(ConfigValue::Duration(
1814 parse_duration(inner.as_str()).map_err(ParseError::InvalidDuration)?,
1815 )),
1816 Rule::boolean => Ok(ConfigValue::Bool(inner.as_str() == "true")),
1817 Rule::identifier => Ok(ConfigValue::Ident(inner.as_str().to_string())),
1818 _ => Ok(ConfigValue::Ident(inner.as_str().to_string())),
1819 }
1820}
1821
1822fn parse_import_stmt(pair: pest::iterators::Pair<Rule>) -> ParseResult<Stmt> {
1823 let mut inner = pair.into_inner();
1824 let path_pair = inner.expect_next("import path")?;
1825 let path = path_pair.as_str();
1826 let path = path[1..path.len() - 1].to_string();
1827 let alias = inner.next().map(|p| p.as_str().to_string());
1828 Ok(Stmt::Import { path, alias })
1829}
1830
1831fn parse_if_stmt(pair: pest::iterators::Pair<Rule>) -> ParseResult<Stmt> {
1832 let mut inner = pair.into_inner();
1833 let cond = parse_expr(inner.expect_next("if condition")?)?;
1834
1835 let mut then_branch = Vec::new();
1836 let mut elif_branches = Vec::new();
1837 let mut else_branch = None;
1838
1839 for p in inner {
1840 match p.as_rule() {
1841 Rule::block => then_branch = parse_block(p)?,
1842 Rule::statement => then_branch.push(parse_statement(p)?),
1843 Rule::elif_clause => {
1844 let mut elif_inner = p.into_inner();
1845 let elif_cond = parse_expr(elif_inner.expect_next("elif condition")?)?;
1846 let mut elif_body = Vec::new();
1847 for ep in elif_inner {
1848 match ep.as_rule() {
1849 Rule::block => elif_body = parse_block(ep)?,
1850 Rule::statement => elif_body.push(parse_statement(ep)?),
1851 _ => {}
1852 }
1853 }
1854 elif_branches.push((elif_cond, elif_body));
1855 }
1856 Rule::else_clause => {
1857 let mut else_body = Vec::new();
1858 for ep in p.into_inner() {
1859 match ep.as_rule() {
1860 Rule::block => else_body = parse_block(ep)?,
1861 Rule::statement => else_body.push(parse_statement(ep)?),
1862 _ => {}
1863 }
1864 }
1865 else_branch = Some(else_body);
1866 }
1867 _ => {}
1868 }
1869 }
1870
1871 Ok(Stmt::If {
1872 cond,
1873 then_branch,
1874 elif_branches,
1875 else_branch,
1876 })
1877}
1878
1879fn parse_for_stmt(pair: pest::iterators::Pair<Rule>) -> ParseResult<Stmt> {
1880 let mut inner = pair.into_inner();
1881 let var = inner.expect_next("loop variable")?.as_str().to_string();
1882 let iter = parse_expr(inner.expect_next("iterable expression")?)?;
1883 let mut body = Vec::new();
1884 for p in inner {
1885 match p.as_rule() {
1886 Rule::block => body = parse_block(p)?,
1887 Rule::statement => body.push(parse_statement(p)?),
1888 _ => {}
1889 }
1890 }
1891 Ok(Stmt::For { var, iter, body })
1892}
1893
1894fn parse_while_stmt(pair: pest::iterators::Pair<Rule>) -> ParseResult<Stmt> {
1895 let mut inner = pair.into_inner();
1896 let cond = parse_expr(inner.expect_next("while condition")?)?;
1897 let mut body = Vec::new();
1898 for p in inner {
1899 match p.as_rule() {
1900 Rule::block => body = parse_block(p)?,
1901 Rule::statement => body.push(parse_statement(p)?),
1902 _ => {}
1903 }
1904 }
1905 Ok(Stmt::While { cond, body })
1906}
1907
1908fn parse_return_stmt(pair: pest::iterators::Pair<Rule>) -> ParseResult<Stmt> {
1909 let expr = pair.into_inner().next().map(parse_expr).transpose()?;
1910 Ok(Stmt::Return(expr))
1911}
1912
1913fn parse_emit_stmt(pair: pest::iterators::Pair<Rule>) -> ParseResult<Stmt> {
1914 let mut inner = pair.into_inner();
1915 let event_type = inner.expect_next("event type name")?.as_str().to_string();
1916 let mut fields = Vec::new();
1917 for p in inner {
1918 if p.as_rule() == Rule::named_arg_list {
1919 for arg in p.into_inner() {
1920 fields.push(parse_named_arg(arg)?);
1921 }
1922 }
1923 }
1924 Ok(Stmt::Emit { event_type, fields })
1925}
1926
1927fn parse_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
1932 let inner = pair.into_inner().next();
1933
1934 match inner {
1935 Some(p) => parse_expr_inner(p),
1936 None => Ok(Expr::Null),
1937 }
1938}
1939
1940fn parse_expr_inner(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
1941 match pair.as_rule() {
1942 Rule::expr => parse_expr(pair),
1943 Rule::lambda_expr => parse_lambda_expr(pair),
1944 Rule::range_expr => parse_range_expr(pair),
1945 Rule::or_expr => parse_or_expr(pair),
1946 Rule::and_expr => parse_and_expr(pair),
1947 Rule::not_expr => parse_not_expr(pair),
1948 Rule::comparison_expr => parse_comparison_expr(pair),
1949 Rule::bitwise_or_expr => parse_bitwise_or_expr(pair),
1950 Rule::bitwise_xor_expr => parse_bitwise_xor_expr(pair),
1951 Rule::bitwise_and_expr => parse_bitwise_and_expr(pair),
1952 Rule::shift_expr => parse_shift_expr(pair),
1953 Rule::additive_expr => parse_additive_expr(pair),
1954 Rule::multiplicative_expr => parse_multiplicative_expr(pair),
1955 Rule::power_expr => parse_power_expr(pair),
1956 Rule::unary_expr => parse_unary_expr(pair),
1957 Rule::postfix_expr => parse_postfix_expr(pair),
1958 Rule::primary_expr => parse_primary_expr(pair),
1959 Rule::literal => parse_literal(pair),
1960 Rule::identifier => Ok(Expr::Ident(pair.as_str().to_string())),
1961 Rule::if_expr => parse_if_expr(pair),
1962 _ => Ok(Expr::Ident(pair.as_str().to_string())),
1963 }
1964}
1965
1966fn parse_lambda_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
1967 let mut inner = pair.into_inner();
1968 let mut params = Vec::new();
1969
1970 let first = inner.expect_next("lambda parameters")?;
1972 match first.as_rule() {
1973 Rule::identifier_list => {
1974 for p in first.into_inner() {
1975 params.push(p.as_str().to_string());
1976 }
1977 }
1978 Rule::identifier => {
1979 params.push(first.as_str().to_string());
1980 }
1981 _ => {}
1982 }
1983
1984 let body_pair = inner.expect_next("lambda body")?;
1986 let body = match body_pair.as_rule() {
1987 Rule::lambda_block => parse_lambda_block(body_pair)?,
1988 _ => parse_expr_inner(body_pair)?,
1989 };
1990
1991 Ok(Expr::Lambda {
1992 params,
1993 body: Box::new(body),
1994 })
1995}
1996
1997fn parse_lambda_block(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
1998 let mut stmts = Vec::new();
1999 let mut final_expr = None;
2000
2001 for p in pair.into_inner() {
2002 match p.as_rule() {
2003 Rule::statement => {
2004 let stmt = parse_statement(p)?;
2006 match &stmt.node {
2007 Stmt::VarDecl {
2008 mutable,
2009 name,
2010 ty,
2011 value,
2012 } => {
2013 stmts.push((name.clone(), ty.clone(), value.clone(), *mutable));
2014 }
2015 Stmt::Expr(e) => {
2016 final_expr = Some(e.clone());
2018 }
2019 _ => {
2020 }
2022 }
2023 }
2024 _ => {
2025 final_expr = Some(parse_expr_inner(p)?);
2027 }
2028 }
2029 }
2030
2031 if stmts.is_empty() {
2033 Ok(final_expr.unwrap_or(Expr::Null))
2034 } else {
2035 Ok(Expr::Block {
2036 stmts,
2037 result: Box::new(final_expr.unwrap_or(Expr::Null)),
2038 })
2039 }
2040}
2041
2042fn parse_pattern_expr_as_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2043 let mut inner = pair.into_inner();
2046 let mut left = parse_pattern_and_as_expr(inner.expect_next("pattern expression")?)?;
2047
2048 for right_pair in inner {
2049 let right = parse_pattern_and_as_expr(right_pair)?;
2050 left = Expr::Binary {
2051 op: BinOp::Or,
2052 left: Box::new(left),
2053 right: Box::new(right),
2054 };
2055 }
2056 Ok(left)
2057}
2058
2059fn parse_pattern_and_as_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2060 let mut inner = pair.into_inner();
2061 let mut left = parse_pattern_xor_as_expr(inner.expect_next("and expression")?)?;
2062
2063 for right_pair in inner {
2064 let right = parse_pattern_xor_as_expr(right_pair)?;
2065 left = Expr::Binary {
2066 op: BinOp::And,
2067 left: Box::new(left),
2068 right: Box::new(right),
2069 };
2070 }
2071 Ok(left)
2072}
2073
2074fn parse_pattern_xor_as_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2075 let mut inner = pair.into_inner();
2076 let mut left = parse_pattern_unary_as_expr(inner.expect_next("xor expression")?)?;
2077
2078 for right_pair in inner {
2079 let right = parse_pattern_unary_as_expr(right_pair)?;
2080 left = Expr::Binary {
2081 op: BinOp::Xor,
2082 left: Box::new(left),
2083 right: Box::new(right),
2084 };
2085 }
2086 Ok(left)
2087}
2088
2089fn parse_pattern_unary_as_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2090 let mut inner = pair.into_inner();
2091 let first = inner.expect_next("unary expression or operand")?;
2092
2093 if first.as_rule() == Rule::not_keyword {
2094 let expr = parse_pattern_primary_as_expr(inner.expect_next("pattern expression")?)?;
2095 Ok(Expr::Unary {
2096 op: UnaryOp::Not,
2097 expr: Box::new(expr),
2098 })
2099 } else {
2100 parse_pattern_primary_as_expr(first)
2101 }
2102}
2103
2104fn parse_pattern_primary_as_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2105 let inner = pair
2106 .into_inner()
2107 .expect_next("pattern primary expression")?;
2108
2109 match inner.as_rule() {
2110 Rule::pattern_or_expr => parse_pattern_expr_as_expr(inner),
2111 Rule::pattern_sequence => parse_pattern_sequence_as_expr(inner),
2112 _ => Ok(Expr::Ident(inner.as_str().to_string())),
2113 }
2114}
2115
2116fn parse_pattern_sequence_as_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2117 let mut inner = pair.into_inner();
2120 let mut left = Expr::Ident(inner.expect_next("sequence start")?.as_str().to_string());
2121
2122 for right_pair in inner {
2123 let right = Expr::Ident(right_pair.as_str().to_string());
2124 left = Expr::Binary {
2125 op: BinOp::FollowedBy,
2126 left: Box::new(left),
2127 right: Box::new(right),
2128 };
2129 }
2130 Ok(left)
2131}
2132
2133fn parse_filter_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2134 let inner = pair.into_inner().expect_next("filter expression")?;
2135 parse_filter_or_expr(inner)
2136}
2137
2138fn parse_filter_or_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2139 let mut inner = pair.into_inner();
2140 let mut left = parse_filter_and_expr(inner.expect_next("or expression operand")?)?;
2141
2142 for right_pair in inner {
2143 let right = parse_filter_and_expr(right_pair)?;
2144 left = Expr::Binary {
2145 op: BinOp::Or,
2146 left: Box::new(left),
2147 right: Box::new(right),
2148 };
2149 }
2150 Ok(left)
2151}
2152
2153fn parse_filter_and_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2154 let mut inner = pair.into_inner();
2155 let mut left = parse_filter_not_expr(inner.expect_next("and expression operand")?)?;
2156
2157 for right_pair in inner {
2158 let right = parse_filter_not_expr(right_pair)?;
2159 left = Expr::Binary {
2160 op: BinOp::And,
2161 left: Box::new(left),
2162 right: Box::new(right),
2163 };
2164 }
2165 Ok(left)
2166}
2167
2168fn parse_filter_not_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2169 let mut inner = pair.into_inner();
2170 let first = inner.expect_next("not or expression")?;
2171
2172 if first.as_rule() == Rule::not_keyword {
2173 let expr = parse_filter_comparison_expr(inner.expect_next("expression after not")?)?;
2174 Ok(Expr::Unary {
2175 op: UnaryOp::Not,
2176 expr: Box::new(expr),
2177 })
2178 } else {
2179 parse_filter_comparison_expr(first)
2180 }
2181}
2182
2183fn parse_filter_comparison_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2184 let mut inner = pair.into_inner();
2185 let left = parse_filter_additive_expr(inner.expect_next("comparison left operand")?)?;
2186
2187 if let Some(op_pair) = inner.next() {
2188 let op = match op_pair.as_str() {
2189 "==" => BinOp::Eq,
2190 "!=" => BinOp::NotEq,
2191 "<" => BinOp::Lt,
2192 "<=" => BinOp::Le,
2193 ">" => BinOp::Gt,
2194 ">=" => BinOp::Ge,
2195 "in" => BinOp::In,
2196 "is" => BinOp::Is,
2197 s if s.contains("not") && s.contains("in") => BinOp::NotIn,
2198 _ => BinOp::Eq,
2199 };
2200 let right = parse_filter_additive_expr(inner.expect_next("comparison right operand")?)?;
2201 Ok(Expr::Binary {
2202 op,
2203 left: Box::new(left),
2204 right: Box::new(right),
2205 })
2206 } else {
2207 Ok(left)
2208 }
2209}
2210
2211fn parse_filter_additive_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2212 let mut inner = pair.into_inner();
2213 let mut left =
2214 parse_filter_multiplicative_expr(inner.expect_next("additive expression operand")?)?;
2215
2216 while let Some(op_pair) = inner.next() {
2217 let op = if op_pair.as_str() == "-" {
2218 BinOp::Sub
2219 } else {
2220 BinOp::Add
2221 };
2222 if let Some(right_pair) = inner.next() {
2223 let right = parse_filter_multiplicative_expr(right_pair)?;
2224 left = Expr::Binary {
2225 op,
2226 left: Box::new(left),
2227 right: Box::new(right),
2228 };
2229 }
2230 }
2231 Ok(left)
2232}
2233
2234fn parse_filter_multiplicative_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2235 let mut inner = pair.into_inner();
2236 let mut left =
2237 parse_filter_unary_expr(inner.expect_next("multiplicative expression operand")?)?;
2238
2239 while let Some(op_pair) = inner.next() {
2240 let op = match op_pair.as_str() {
2241 "*" => BinOp::Mul,
2242 "/" => BinOp::Div,
2243 "%" => BinOp::Mod,
2244 _ => BinOp::Mul,
2245 };
2246 if let Some(right_pair) = inner.next() {
2247 let right = parse_filter_unary_expr(right_pair)?;
2248 left = Expr::Binary {
2249 op,
2250 left: Box::new(left),
2251 right: Box::new(right),
2252 };
2253 }
2254 }
2255 Ok(left)
2256}
2257
2258fn parse_filter_unary_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2259 let mut inner = pair.into_inner();
2260 let first = inner.expect_next("unary operator or expression")?;
2261
2262 if first.as_rule() == Rule::filter_unary_op {
2264 let op_str = first.as_str();
2265 let expr =
2266 parse_filter_postfix_expr(inner.expect_next("expression after unary operator")?)?;
2267 let op = match op_str {
2268 "-" => UnaryOp::Neg,
2269 "~" => UnaryOp::BitNot,
2270 _ => unreachable!("Grammar only allows - or ~"),
2271 };
2272 Ok(Expr::Unary {
2273 op,
2274 expr: Box::new(expr),
2275 })
2276 } else {
2277 parse_filter_postfix_expr(first)
2278 }
2279}
2280
2281fn parse_filter_postfix_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2282 let mut inner = pair.into_inner();
2283 let mut expr = parse_filter_primary_expr(inner.expect_next("postfix expression base")?)?;
2284
2285 for suffix in inner {
2286 expr = parse_filter_postfix_suffix(expr, suffix)?;
2287 }
2288 Ok(expr)
2289}
2290
2291fn parse_filter_postfix_suffix(expr: Expr, pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2292 let mut inner = pair.into_inner();
2293
2294 if let Some(first) = inner.next() {
2295 match first.as_rule() {
2296 Rule::identifier => {
2297 Ok(Expr::Member {
2299 expr: Box::new(expr),
2300 member: first.as_str().to_string(),
2301 })
2302 }
2303 Rule::optional_member_access => {
2304 let member = first
2305 .into_inner()
2306 .expect_next("member name")?
2307 .as_str()
2308 .to_string();
2309 Ok(Expr::OptionalMember {
2310 expr: Box::new(expr),
2311 member,
2312 })
2313 }
2314 Rule::index_access => {
2315 let index = parse_expr(first.into_inner().expect_next("index expression")?)?;
2316 Ok(Expr::Index {
2317 expr: Box::new(expr),
2318 index: Box::new(index),
2319 })
2320 }
2321 Rule::call_args => {
2322 let args = first
2323 .into_inner()
2324 .filter(|p| p.as_rule() == Rule::arg_list)
2325 .flat_map(|p| p.into_inner())
2326 .map(parse_arg)
2327 .collect::<ParseResult<Vec<_>>>()?;
2328 Ok(Expr::Call {
2329 func: Box::new(expr),
2330 args,
2331 })
2332 }
2333 _ => Ok(expr),
2334 }
2335 } else {
2336 Ok(expr)
2337 }
2338}
2339
2340fn parse_filter_primary_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2341 let inner = pair.into_inner().expect_next("filter primary expression")?;
2342
2343 match inner.as_rule() {
2344 Rule::literal => parse_literal(inner),
2345 Rule::identifier => Ok(Expr::Ident(inner.as_str().to_string())),
2346 Rule::filter_expr => parse_filter_expr(inner),
2347 _ => Ok(Expr::Ident(inner.as_str().to_string())),
2348 }
2349}
2350
2351fn parse_range_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2352 let mut inner = pair.into_inner();
2353 let left = parse_expr_inner(inner.expect_next("range start")?)?;
2354
2355 if let Some(op_pair) = inner.next() {
2356 let inclusive = op_pair.as_str() == "..=";
2357 let right = parse_expr_inner(inner.expect_next("range end")?)?;
2358 Ok(Expr::Range {
2359 start: Box::new(left),
2360 end: Box::new(right),
2361 inclusive,
2362 })
2363 } else {
2364 Ok(left)
2365 }
2366}
2367
2368fn parse_or_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2369 let mut inner = pair.into_inner();
2370 let mut left = parse_expr_inner(inner.expect_next("or expression operand")?)?;
2371
2372 for right_pair in inner {
2373 let right = parse_expr_inner(right_pair)?;
2374 left = Expr::Binary {
2375 op: BinOp::Or,
2376 left: Box::new(left),
2377 right: Box::new(right),
2378 };
2379 }
2380
2381 Ok(left)
2382}
2383
2384fn parse_and_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2385 let mut inner = pair.into_inner();
2386 let mut left = parse_expr_inner(inner.expect_next("and expression operand")?)?;
2387
2388 for right_pair in inner {
2389 let right = parse_expr_inner(right_pair)?;
2390 left = Expr::Binary {
2391 op: BinOp::And,
2392 left: Box::new(left),
2393 right: Box::new(right),
2394 };
2395 }
2396
2397 Ok(left)
2398}
2399
2400fn parse_not_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2401 let mut inner = pair.into_inner();
2402 let first = inner.expect_next("not keyword or expression")?;
2403
2404 if first.as_rule() == Rule::not_keyword {
2405 let expr = parse_expr_inner(inner.expect_next("expression after not")?)?;
2406 Ok(Expr::Unary {
2407 op: UnaryOp::Not,
2408 expr: Box::new(expr),
2409 })
2410 } else {
2411 parse_expr_inner(first)
2412 }
2413}
2414
2415fn parse_comparison_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2416 let mut inner = pair.into_inner();
2417 let left = parse_expr_inner(inner.expect_next("comparison left operand")?)?;
2418
2419 if let Some(op_pair) = inner.next() {
2420 let op_str = op_pair.as_str();
2421 let op = match op_str {
2422 "==" => BinOp::Eq,
2423 "!=" => BinOp::NotEq,
2424 "<" => BinOp::Lt,
2425 "<=" => BinOp::Le,
2426 ">" => BinOp::Gt,
2427 ">=" => BinOp::Ge,
2428 "in" => BinOp::In,
2429 "is" => BinOp::Is,
2430 s if s.contains("not") && s.contains("in") => BinOp::NotIn,
2431 _ => BinOp::Eq,
2432 };
2433 let right = parse_expr_inner(inner.expect_next("comparison right operand")?)?;
2434 Ok(Expr::Binary {
2435 op,
2436 left: Box::new(left),
2437 right: Box::new(right),
2438 })
2439 } else {
2440 Ok(left)
2441 }
2442}
2443
2444fn parse_bitwise_or_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2445 parse_binary_chain(pair, BinOp::BitOr)
2446}
2447
2448fn parse_bitwise_xor_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2449 parse_binary_chain(pair, BinOp::BitXor)
2450}
2451
2452fn parse_bitwise_and_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2453 parse_binary_chain(pair, BinOp::BitAnd)
2454}
2455
2456fn parse_shift_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2457 let mut inner = pair.into_inner();
2458 let mut left = parse_expr_inner(inner.expect_next("shift expression operand")?)?;
2459
2460 while let Some(op_or_expr) = inner.next() {
2461 let op = match op_or_expr.as_str() {
2462 "<<" => BinOp::Shl,
2463 ">>" => BinOp::Shr,
2464 _ => {
2465 let right = parse_expr_inner(op_or_expr)?;
2466 left = Expr::Binary {
2467 op: BinOp::Shl,
2468 left: Box::new(left),
2469 right: Box::new(right),
2470 };
2471 continue;
2472 }
2473 };
2474 if let Some(right_pair) = inner.next() {
2475 let right = parse_expr_inner(right_pair)?;
2476 left = Expr::Binary {
2477 op,
2478 left: Box::new(left),
2479 right: Box::new(right),
2480 };
2481 }
2482 }
2483
2484 Ok(left)
2485}
2486
2487fn parse_additive_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2488 let mut inner = pair.into_inner();
2489 let mut left = parse_expr_inner(inner.expect_next("additive expression operand")?)?;
2490
2491 while let Some(op_pair) = inner.next() {
2492 let op_text = op_pair.as_str();
2493 let op = if op_text == "-" {
2494 BinOp::Sub
2495 } else {
2496 BinOp::Add
2497 };
2498
2499 if let Some(right_pair) = inner.next() {
2500 let right = parse_expr_inner(right_pair)?;
2501 left = Expr::Binary {
2502 op,
2503 left: Box::new(left),
2504 right: Box::new(right),
2505 };
2506 }
2507 }
2508
2509 Ok(left)
2510}
2511
2512fn parse_multiplicative_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2513 let mut inner = pair.into_inner();
2514 let mut left = parse_expr_inner(inner.expect_next("multiplicative expression operand")?)?;
2515
2516 while let Some(op_pair) = inner.next() {
2517 let op_text = op_pair.as_str();
2518 let op = match op_text {
2519 "*" => BinOp::Mul,
2520 "/" => BinOp::Div,
2521 "%" => BinOp::Mod,
2522 _ => BinOp::Mul,
2523 };
2524
2525 if let Some(right_pair) = inner.next() {
2526 let right = parse_expr_inner(right_pair)?;
2527 left = Expr::Binary {
2528 op,
2529 left: Box::new(left),
2530 right: Box::new(right),
2531 };
2532 }
2533 }
2534
2535 Ok(left)
2536}
2537
2538fn parse_power_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2539 let mut inner = pair.into_inner();
2540 let base = parse_expr_inner(inner.expect_next("power expression base")?)?;
2541
2542 if let Some(exp_pair) = inner.next() {
2543 let exp = parse_expr_inner(exp_pair)?;
2544 Ok(Expr::Binary {
2545 op: BinOp::Pow,
2546 left: Box::new(base),
2547 right: Box::new(exp),
2548 })
2549 } else {
2550 Ok(base)
2551 }
2552}
2553
2554fn parse_unary_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2555 let mut inner = pair.into_inner();
2556 let first = inner.expect_next("unary operator or expression")?;
2557
2558 match first.as_rule() {
2560 Rule::unary_op => {
2561 let op_str = first.as_str();
2562 let expr = parse_expr_inner(inner.expect_next("expression after unary operator")?)?;
2563 let op = match op_str {
2564 "-" => UnaryOp::Neg,
2565 "~" => UnaryOp::BitNot,
2566 _ => unreachable!("Grammar only allows - or ~"),
2567 };
2568 Ok(Expr::Unary {
2569 op,
2570 expr: Box::new(expr),
2571 })
2572 }
2573 _ => parse_expr_inner(first),
2574 }
2575}
2576
2577fn parse_postfix_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2578 let mut inner = pair.into_inner();
2579 let mut expr = parse_expr_inner(inner.expect_next("postfix expression base")?)?;
2580
2581 for suffix in inner {
2582 expr = parse_postfix_suffix(expr, suffix)?;
2583 }
2584
2585 Ok(expr)
2586}
2587
2588fn parse_postfix_suffix(expr: Expr, pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2589 let inner = pair.into_inner().expect_next("postfix suffix")?;
2590
2591 match inner.as_rule() {
2592 Rule::member_access => {
2593 let member = inner
2594 .into_inner()
2595 .expect_next("member name")?
2596 .as_str()
2597 .to_string();
2598 Ok(Expr::Member {
2599 expr: Box::new(expr),
2600 member,
2601 })
2602 }
2603 Rule::optional_member_access => {
2604 let member = inner
2605 .into_inner()
2606 .expect_next("member name")?
2607 .as_str()
2608 .to_string();
2609 Ok(Expr::OptionalMember {
2610 expr: Box::new(expr),
2611 member,
2612 })
2613 }
2614 Rule::slice_access => {
2615 let slice_range = inner.into_inner().expect_next("slice range")?;
2617 let slice_inner = slice_range.into_inner();
2618
2619 let mut start = None;
2620 let mut end = None;
2621
2622 for p in slice_inner {
2623 match p.as_rule() {
2624 Rule::slice_start => {
2625 start = Some(Box::new(parse_expr_inner(
2626 p.into_inner().expect_next("slice start expression")?,
2627 )?));
2628 }
2629 Rule::slice_end => {
2630 end = Some(Box::new(parse_expr_inner(
2631 p.into_inner().expect_next("slice end expression")?,
2632 )?));
2633 }
2634 _ => {}
2635 }
2636 }
2637
2638 Ok(Expr::Slice {
2639 expr: Box::new(expr),
2640 start,
2641 end,
2642 })
2643 }
2644 Rule::index_access => {
2645 let index = parse_expr(inner.into_inner().expect_next("index expression")?)?;
2646 Ok(Expr::Index {
2647 expr: Box::new(expr),
2648 index: Box::new(index),
2649 })
2650 }
2651 Rule::call_args => {
2652 let mut args = Vec::new();
2653 for p in inner.into_inner() {
2654 if p.as_rule() == Rule::arg_list {
2655 for arg in p.into_inner() {
2656 args.push(parse_arg(arg)?);
2657 }
2658 }
2659 }
2660 Ok(Expr::Call {
2661 func: Box::new(expr),
2662 args,
2663 })
2664 }
2665 _ => Ok(expr),
2666 }
2667}
2668
2669fn parse_arg(pair: pest::iterators::Pair<Rule>) -> ParseResult<Arg> {
2670 let mut inner = pair.into_inner();
2671 let first = inner.expect_next("argument")?;
2672
2673 if let Some(second) = inner.next() {
2674 Ok(Arg::Named(
2675 first.as_str().to_string(),
2676 parse_expr_inner(second)?,
2677 ))
2678 } else {
2679 Ok(Arg::Positional(parse_expr_inner(first)?))
2680 }
2681}
2682
2683fn parse_primary_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2684 let inner = pair.into_inner().expect_next("primary expression")?;
2685
2686 match inner.as_rule() {
2687 Rule::if_expr => parse_if_expr(inner),
2688 Rule::literal => parse_literal(inner),
2689 Rule::identifier => Ok(Expr::Ident(inner.as_str().to_string())),
2690 Rule::array_literal => parse_array_literal(inner),
2691 Rule::map_literal => parse_map_literal(inner),
2692 Rule::expr => parse_expr(inner),
2693 _ => Ok(Expr::Ident(inner.as_str().to_string())),
2694 }
2695}
2696
2697fn parse_if_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2698 let mut inner = pair.into_inner();
2699 let cond = parse_expr_inner(inner.expect_next("if condition")?)?;
2700 let then_branch = parse_expr_inner(inner.expect_next("then branch")?)?;
2701 let else_branch = parse_expr_inner(inner.expect_next("else branch")?)?;
2702
2703 Ok(Expr::If {
2704 cond: Box::new(cond),
2705 then_branch: Box::new(then_branch),
2706 else_branch: Box::new(else_branch),
2707 })
2708}
2709
2710fn parse_literal(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2711 let inner = pair.into_inner().expect_next("literal value")?;
2712
2713 match inner.as_rule() {
2714 Rule::integer => inner
2715 .as_str()
2716 .parse::<i64>()
2717 .map(Expr::Int)
2718 .map_err(|e| ParseError::InvalidNumber(format!("'{}': {}", inner.as_str(), e))),
2719 Rule::float => inner
2720 .as_str()
2721 .parse::<f64>()
2722 .map(Expr::Float)
2723 .map_err(|e| ParseError::InvalidNumber(format!("'{}': {}", inner.as_str(), e))),
2724 Rule::string => {
2725 let s = inner.as_str();
2726 Ok(Expr::Str(s[1..s.len() - 1].to_string()))
2727 }
2728 Rule::duration => Ok(Expr::Duration(
2729 parse_duration(inner.as_str()).map_err(ParseError::InvalidDuration)?,
2730 )),
2731 Rule::timestamp => Ok(Expr::Timestamp(parse_timestamp(inner.as_str()))),
2732 Rule::boolean => Ok(Expr::Bool(inner.as_str() == "true")),
2733 Rule::null => Ok(Expr::Null),
2734 _ => Ok(Expr::Null),
2735 }
2736}
2737
2738fn parse_array_literal(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2739 let mut items = Vec::new();
2740 for p in pair.into_inner() {
2741 if p.as_rule() == Rule::expr_list {
2742 for expr in p.into_inner() {
2743 items.push(parse_expr(expr)?);
2744 }
2745 }
2746 }
2747 Ok(Expr::Array(items))
2748}
2749
2750fn parse_map_literal(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2751 let mut entries = Vec::new();
2752 for p in pair.into_inner() {
2753 if p.as_rule() == Rule::map_entry_list {
2754 for entry in p.into_inner() {
2755 let mut inner = entry.into_inner();
2756 let key = inner.expect_next("map key")?.as_str().to_string();
2757 let key = if key.starts_with('"') {
2758 key[1..key.len() - 1].to_string()
2759 } else {
2760 key
2761 };
2762 let value = parse_expr(inner.expect_next("map value")?)?;
2763 entries.push((key, value));
2764 }
2765 }
2766 }
2767 Ok(Expr::Map(entries))
2768}
2769
2770fn parse_binary_chain(pair: pest::iterators::Pair<Rule>, op: BinOp) -> ParseResult<Expr> {
2771 let mut inner = pair.into_inner();
2772 let mut left = parse_expr_inner(inner.expect_next("binary chain operand")?)?;
2773
2774 for right_pair in inner {
2775 let right = parse_expr_inner(right_pair)?;
2776 left = Expr::Binary {
2777 op,
2778 left: Box::new(left),
2779 right: Box::new(right),
2780 };
2781 }
2782
2783 Ok(left)
2784}
2785
2786#[cfg(test)]
2787mod tests {
2788 use super::*;
2789
2790 #[test]
2791 fn test_parse_simple_stream() {
2792 let result = parse("stream output = input");
2793 assert!(result.is_ok(), "Failed: {:?}", result.err());
2794 }
2795
2796 #[test]
2797 fn test_parse_stream_with_filter() {
2798 let result = parse("stream output = input.where(value > 100)");
2799 assert!(result.is_ok(), "Failed: {:?}", result.err());
2800 }
2801
2802 #[test]
2803 fn test_parse_stream_with_map() {
2804 let result = parse("stream output = input.map(x * 2)");
2805 assert!(result.is_ok(), "Failed: {:?}", result.err());
2806 }
2807
2808 #[test]
2809 fn test_parse_event_declaration() {
2810 let result = parse("event SensorReading:\n sensor_id: str\n value: float");
2811 assert!(result.is_ok(), "Failed: {:?}", result.err());
2812 }
2813
2814 #[test]
2815 fn test_parse_variable() {
2816 let result = parse("let x = 42");
2817 assert!(result.is_ok(), "Failed: {:?}", result.err());
2818 }
2819
2820 #[test]
2821 fn test_parse_function() {
2822 let result = parse("fn add(a: int, b: int) -> int:\n return a + b");
2823 assert!(result.is_ok(), "Failed: {:?}", result.err());
2824 }
2825
2826 #[test]
2827 fn test_parse_lambda() {
2828 let result = parse("let f = (x) => x * 2");
2829 assert!(result.is_ok(), "Failed: {:?}", result.err());
2830 }
2831
2832 #[test]
2833 fn test_parse_if_expression() {
2834 let result = parse("let x = if a > b then a else b");
2835 assert!(result.is_ok(), "Failed: {:?}", result.err());
2836 }
2837
2838 #[test]
2839 fn test_parse_followed_by() {
2840 let result = parse("stream alerts = orders.where(amount > 1000) -> Payment where payment.order_id == orders.id");
2841 assert!(result.is_ok(), "Failed: {:?}", result.err());
2842 }
2843
2844 #[test]
2845 fn test_parse_window() {
2846 let result = parse("stream windowed = input.window(5s)");
2847 assert!(result.is_ok(), "Failed: {:?}", result.err());
2848 }
2849
2850 #[test]
2851 fn test_parse_aggregate() {
2852 let result =
2853 parse("stream stats = input.window(1m).aggregate(count: count(), avg: avg(value))");
2854 assert!(result.is_ok(), "Failed: {:?}", result.err());
2855 }
2856
2857 #[test]
2858 fn test_parse_merge() {
2859 let result = parse("stream combined = merge(stream1, stream2)");
2860 assert!(result.is_ok(), "Failed: {:?}", result.err());
2861 }
2862
2863 #[test]
2864 fn test_parse_sequence() {
2865 let result = parse("stream seq = sequence(a: EventA, b: EventB where b.id == a.id)");
2866 assert!(result.is_ok(), "Failed: {:?}", result.err());
2867 }
2868
2869 #[test]
2870 fn test_parse_config() {
2871 let result = parse("config:\n window_size: 5s\n batch_size: 100");
2872 assert!(result.is_ok(), "Failed: {:?}", result.err());
2873 }
2874
2875 #[test]
2876 fn test_parse_complex_expression() {
2877 let result = parse("let x = (a + b) * c / d - e");
2878 assert!(result.is_ok(), "Failed: {:?}", result.err());
2879 }
2880
2881 #[test]
2882 fn test_parse_sliding_window() {
2883 let result = parse("stream output = input.window(5m, sliding: 1m)");
2884 assert!(result.is_ok(), "Failed: {:?}", result.err());
2885 }
2886
2887 #[test]
2888 fn test_parse_fork_construct() {
2889 let result =
2890 parse("stream forked = input.fork(branch1: .where(x > 0), branch2: .where(x < 0))");
2891 assert!(result.is_ok(), "Failed: {:?}", result.err());
2892 }
2893
2894 #[test]
2895 fn test_parse_aggregate_functions() {
2896 let result = parse(
2897 "stream stats = input.window(1h).aggregate(total: sum(value), average: avg(value))",
2898 );
2899 assert!(result.is_ok(), "Failed: {:?}", result.err());
2900 }
2901
2902 #[test]
2903 fn test_parse_complex_parentheses() {
2904 let result = parse("let x = ((a + b) * (c - d)) / e");
2905 assert!(result.is_ok(), "Failed: {:?}", result.err());
2906 }
2907
2908 #[test]
2909 fn test_parse_sequence_with_alias() {
2910 let result = parse(
2911 r#"
2912 stream TwoTicks = StockTick as first
2913 -> StockTick as second
2914 .emit(result: "two_ticks")
2915 "#,
2916 );
2917 assert!(result.is_ok(), "Failed: {:?}", result.err());
2918 }
2919
2920 #[test]
2921 fn test_parse_followed_by_with_alias() {
2922 let result = parse("stream alerts = Order as a -> Payment as b");
2923 assert!(result.is_ok(), "Failed: {:?}", result.err());
2924 }
2925
2926 #[test]
2927 fn test_parse_followed_by_with_filter_and_alias() {
2928 let result = parse(
2930 r#"
2931 stream Test = A as a
2932 -> B where value == a.base + 10 as b
2933 .emit(status: "matched")
2934 "#,
2935 );
2936 assert!(result.is_ok(), "Failed: {:?}", result.err());
2937 }
2938
2939 #[test]
2940 fn test_parse_pattern_with_lambda() {
2941 let result =
2942 parse("stream Test = Trade.window(1m).pattern(p: x => x.len() > 3).emit(alert: true)");
2943 assert!(result.is_ok(), "Failed: {:?}", result.err());
2944 }
2945
2946 #[test]
2947 fn test_parse_sase_pattern_decl_simple() {
2948 let result = parse("pattern SimpleAlert = SEQ(Login, Transaction)");
2949 assert!(result.is_ok(), "Failed: {:?}", result.err());
2950 }
2951
2952 #[test]
2953 fn test_parse_sase_pattern_decl_with_kleene() {
2954 let result = parse("pattern MultiTx = SEQ(Login, Transaction+ where amount > 1000)");
2955 assert!(result.is_ok(), "Failed: {:?}", result.err());
2956 }
2957
2958 #[test]
2959 fn test_parse_sase_pattern_decl_with_alias() {
2960 let result = parse("pattern AliasedPattern = SEQ(Login as login, Transaction as tx)");
2961 assert!(result.is_ok(), "Failed: {:?}", result.err());
2962 }
2963
2964 #[test]
2965 fn test_parse_sase_pattern_decl_with_within() {
2966 let result = parse("pattern TimedPattern = SEQ(A, B) within 10m");
2967 assert!(result.is_ok(), "Failed: {:?}", result.err());
2968 }
2969
2970 #[test]
2971 fn test_parse_sase_pattern_decl_with_partition() {
2972 let result = parse("pattern PartitionedPattern = SEQ(A, B) partition by user_id");
2973 assert!(result.is_ok(), "Failed: {:?}", result.err());
2974 }
2975
2976 #[test]
2977 fn test_parse_sase_pattern_decl_full() {
2978 let result = parse(
2980 "pattern SuspiciousActivity = SEQ(Transaction+ where amount > 1000 as txs) within 10m partition by user_id"
2981 );
2982 assert!(result.is_ok(), "Failed: {:?}", result.err());
2983 }
2984
2985 #[test]
2986 fn test_parse_sase_pattern_decl_or() {
2987 let result = parse("pattern AlertOrWarn = Login OR Logout");
2988 assert!(result.is_ok(), "Failed: {:?}", result.err());
2989 }
2990
2991 #[test]
2992 fn test_parse_sase_pattern_decl_and() {
2993 let result = parse("pattern BothEvents = Login AND Transaction");
2994 assert!(result.is_ok(), "Failed: {:?}", result.err());
2995 }
2996
2997 #[test]
2998 fn test_parse_sase_pattern_decl_not() {
2999 let result = parse("pattern NoLogout = SEQ(Login, NOT Logout, Transaction)");
3000 assert!(result.is_ok(), "Failed: {:?}", result.err());
3001 }
3002
3003 #[test]
3004 fn test_parse_having() {
3005 let result = parse(
3006 "stream filtered = input.window(1m).aggregate(count: count(), total: sum(value)).having(count > 10)",
3007 );
3008 assert!(result.is_ok(), "Failed: {:?}", result.err());
3009 }
3010
3011 #[test]
3012 fn test_parse_having_with_partition() {
3013 let result = parse(
3014 "stream grouped = input.partition_by(category).window(5m).aggregate(avg_price: avg(price)).having(avg_price > 100.0)",
3015 );
3016 assert!(result.is_ok(), "Failed: {:?}", result.err());
3017 }
3018
3019 #[test]
3020 fn test_parse_timer_source() {
3021 let result = parse("stream heartbeat = timer(5s).emit(type: \"heartbeat\")");
3022 assert!(result.is_ok(), "Failed: {:?}", result.err());
3023 }
3024
3025 #[test]
3026 fn test_parse_timer_source_with_initial_delay() {
3027 let result =
3028 parse("stream delayed_timer = timer(1m, initial_delay: 10s).emit(type: \"periodic\")");
3029 assert!(result.is_ok(), "Failed: {:?}", result.err());
3030 }
3031
3032 #[test]
3033 fn test_parse_var_decl() {
3034 let result = parse("var threshold: float = 10.0");
3035 assert!(result.is_ok(), "Failed: {:?}", result.err());
3036 }
3037
3038 #[test]
3039 fn test_parse_let_decl() {
3040 let result = parse("let max_count: int = 100");
3041 assert!(result.is_ok(), "Failed: {:?}", result.err());
3042 }
3043
3044 #[test]
3045 fn test_parse_assignment() {
3046 let result = parse("threshold := threshold + 10.0");
3047 assert!(result.is_ok(), "Failed: {:?}", result.err());
3048 }
3049
3050 #[test]
3051 fn test_parse_assignment_with_expression() {
3052 let result = parse("count := count * 2 + offset");
3053 assert!(result.is_ok(), "Failed: {:?}", result.err());
3054 }
3055
3056 #[test]
3057 fn test_parse_nested_stream_reference() {
3058 let result = parse("stream Base = Event\nstream Derived = Base.where(x > 0)");
3059 assert!(result.is_ok(), "Failed: {:?}", result.err());
3060 }
3061
3062 #[test]
3063 fn test_parse_multi_stage_pipeline() {
3064 let result = parse(
3065 "stream L1 = Raw\nstream L2 = L1.where(a > 1)\nstream L3 = L2.window(5).aggregate(cnt: count())",
3066 );
3067 assert!(result.is_ok(), "Failed: {:?}", result.err());
3068 }
3069
3070 #[test]
3071 fn test_parse_stream_with_operations_chain() {
3072 let result = parse(
3073 "stream Processed = Source.where(valid).window(10).aggregate(sum: sum(value)).having(sum > 100)",
3074 );
3075 assert!(result.is_ok(), "Failed: {:?}", result.err());
3076 }
3077
3078 #[test]
3083 fn test_parse_connector_mqtt() {
3084 let result = parse(
3085 r#"connector MqttBroker = mqtt (
3086 host: "localhost",
3087 port: 1883,
3088 client_id: "varpulis"
3089 )"#,
3090 );
3091 assert!(result.is_ok(), "Failed: {:?}", result.err());
3092 }
3093
3094 #[test]
3095 fn test_parse_connector_kafka() {
3096 let result = parse(
3097 r#"connector KafkaCluster = kafka (
3098 brokers: ["kafka1:9092", "kafka2:9092"],
3099 group_id: "my-group"
3100 )"#,
3101 );
3102 assert!(result.is_ok(), "Failed: {:?}", result.err());
3103 }
3104
3105 #[test]
3106 fn test_parse_connector_http() {
3107 let result = parse(
3108 r#"connector ApiEndpoint = http (
3109 base_url: "https://api.example.com"
3110 )"#,
3111 );
3112 assert!(result.is_ok(), "Failed: {:?}", result.err());
3113 }
3114
3115 #[test]
3116 fn test_parse_stream_with_from_connector() {
3117 let result = parse(
3118 r#"stream Temperatures = TemperatureReading.from(MqttSensors, topic: "sensors/temp/#")"#,
3119 );
3120 assert!(result.is_ok(), "Failed: {:?}", result.err());
3121 }
3122
3123 #[test]
3124 fn test_parse_stream_with_from_and_operations() {
3125 let result = parse(
3126 r#"stream HighTemp = TemperatureReading
3127 .from(MqttSensors, topic: "sensors/#")
3128 .where(value > 30)
3129 .emit(alert: "high_temp")"#,
3130 );
3131 assert!(result.is_ok(), "Failed: {:?}", result.err());
3132 }
3133
3134 #[test]
3135 fn test_parse_full_connectivity_pipeline() {
3136 let result = parse(
3137 r#"
3138 connector MqttSensors = mqtt (host: "localhost", port: 1883)
3139 connector KafkaAlerts = kafka (brokers: ["kafka:9092"])
3140
3141 event TemperatureReading:
3142 sensor_id: str
3143 value: float
3144 ts: timestamp
3145
3146 stream Temperatures = TemperatureReading.from(MqttSensors, topic: "sensors/#")
3147
3148 stream HighTempAlert = Temperatures
3149 .where(value > 30)
3150 .emit(alert_type: "HIGH_TEMP", temperature: value)
3151
3152 "#,
3153 );
3154 assert!(result.is_ok(), "Failed: {:?}", result.err());
3155 }
3156
3157 #[test]
3158 fn test_parse_emit_as_type() {
3159 let result = parse(
3160 r#"stream Alerts = Temperatures
3161 .where(value > 30)
3162 .emit as AlertEvent(severity: "high", temp: value)"#,
3163 );
3164 assert!(result.is_ok(), "Failed: {:?}", result.err());
3165 }
3166
3167 #[test]
3168 fn test_parse_stream_with_to_connector() {
3169 let result = parse(
3170 r#"stream Output = Input
3171 .where(x > 0)
3172 .emit(y: x * 2)
3173 .to(KafkaOutput, topic: "output")"#,
3174 );
3175 assert!(result.is_ok(), "Failed: {:?}", result.err());
3176 }
3177
3178 #[test]
3179 fn test_emit_stmt_parses() {
3180 let result = parse(
3181 r"fn test():
3182 emit Pixel(x: 1, y: 2)",
3183 );
3184 assert!(result.is_ok(), "Failed: {:?}", result.err());
3185 let program = result.unwrap();
3186 if let Stmt::FnDecl { body, .. } = &program.statements[0].node {
3188 match &body[0].node {
3189 Stmt::Emit { event_type, fields } => {
3190 assert_eq!(event_type, "Pixel");
3191 assert_eq!(fields.len(), 2);
3192 assert_eq!(fields[0].name, "x");
3193 assert_eq!(fields[1].name, "y");
3194 }
3195 other => panic!("Expected Stmt::Emit, got {other:?}"),
3196 }
3197 } else {
3198 panic!("Expected FnDecl");
3199 }
3200 }
3201
3202 #[test]
3203 fn test_emit_stmt_no_args() {
3204 let result = parse(
3205 r"fn test():
3206 emit Done()",
3207 );
3208 assert!(result.is_ok(), "Failed: {:?}", result.err());
3209 let program = result.unwrap();
3210 if let Stmt::FnDecl { body, .. } = &program.statements[0].node {
3211 match &body[0].node {
3212 Stmt::Emit { event_type, fields } => {
3213 assert_eq!(event_type, "Done");
3214 assert!(fields.is_empty());
3215 }
3216 other => panic!("Expected Stmt::Emit, got {other:?}"),
3217 }
3218 } else {
3219 panic!("Expected FnDecl");
3220 }
3221 }
3222
3223 #[test]
3224 fn test_emit_in_function_with_for_loop() {
3225 let result = parse(
3226 r"fn generate(n: int):
3227 for i in 0..n:
3228 emit Item(index: i, value: i * 2)",
3229 );
3230 assert!(result.is_ok(), "Failed: {:?}", result.err());
3231 }
3232
3233 #[test]
3234 fn test_parse_process_op() {
3235 let result = parse(
3236 r"fn do_work():
3237 emit Result(v: 42)
3238
3239stream S = timer(1s).process(do_work())",
3240 );
3241 assert!(result.is_ok(), "Failed: {:?}", result.err());
3242 }
3243
3244 #[test]
3245 fn test_parse_trend_aggregate_count_trends() {
3246 let result = parse(
3247 r#"stream S = StockTick as first
3248 -> all StockTick where price > first.price as rising
3249 -> StockTick where price < rising.price as drop
3250 .within(60s)
3251 .trend_aggregate(count: count_trends())
3252 .emit(event_type: "TrendStats", trends: count)"#,
3253 );
3254 assert!(result.is_ok(), "Failed: {:?}", result.err());
3255 let program = result.unwrap();
3256 for stmt in &program.statements {
3258 if let Stmt::StreamDecl { ops, .. } = &stmt.node {
3259 let has_trend_agg = ops
3260 .iter()
3261 .any(|op| matches!(op, StreamOp::TrendAggregate(_)));
3262 assert!(has_trend_agg, "Expected TrendAggregate op in stream ops");
3263 for op in ops {
3265 if let StreamOp::TrendAggregate(items) = op {
3266 assert_eq!(items.len(), 1);
3267 assert_eq!(items[0].alias, "count");
3268 assert_eq!(items[0].func, "count_trends");
3269 assert!(items[0].arg.is_none());
3270 }
3271 }
3272 return;
3273 }
3274 }
3275 panic!("No stream declaration found");
3276 }
3277
3278 #[test]
3279 fn test_parse_trend_aggregate_multiple_items() {
3280 let result = parse(
3281 r"stream S = StockTick as first
3282 -> all StockTick as rising
3283 .within(60s)
3284 .trend_aggregate(
3285 trend_count: count_trends(),
3286 event_count: count_events(rising)
3287 )
3288 .emit(trends: trend_count, events: event_count)",
3289 );
3290 assert!(result.is_ok(), "Failed: {:?}", result.err());
3291 let program = result.unwrap();
3292 for stmt in &program.statements {
3293 if let Stmt::StreamDecl { ops, .. } = &stmt.node {
3294 for op in ops {
3295 if let StreamOp::TrendAggregate(items) = op {
3296 assert_eq!(items.len(), 2);
3297 assert_eq!(items[0].alias, "trend_count");
3298 assert_eq!(items[0].func, "count_trends");
3299 assert_eq!(items[1].alias, "event_count");
3300 assert_eq!(items[1].func, "count_events");
3301 assert!(items[1].arg.is_some());
3302 return;
3303 }
3304 }
3305 }
3306 }
3307 panic!("No TrendAggregate found");
3308 }
3309
3310 #[test]
3311 fn test_parse_score_basic() {
3312 let result = parse(
3313 r#"stream S = TradeEvent
3314 .score(model: "models/fraud.onnx", inputs: [amount, risk_score], outputs: [fraud_prob, category])"#,
3315 );
3316 assert!(result.is_ok(), "Failed: {:?}", result.err());
3317 let program = result.unwrap();
3318 for stmt in &program.statements {
3319 if let Stmt::StreamDecl { ops, .. } = &stmt.node {
3320 for op in ops {
3321 if let StreamOp::Score(spec) = op {
3322 assert_eq!(spec.model_path, "models/fraud.onnx");
3323 assert_eq!(spec.inputs, vec!["amount", "risk_score"]);
3324 assert_eq!(spec.outputs, vec!["fraud_prob", "category"]);
3325 return;
3326 }
3327 }
3328 }
3329 }
3330 panic!("No Score op found");
3331 }
3332
3333 #[test]
3334 fn test_parse_score_single_field() {
3335 let result = parse(
3336 r#"stream S = Event
3337 .score(model: "model.onnx", inputs: [value], outputs: [prediction])"#,
3338 );
3339 assert!(result.is_ok(), "Failed: {:?}", result.err());
3340 let program = result.unwrap();
3341 for stmt in &program.statements {
3342 if let Stmt::StreamDecl { ops, .. } = &stmt.node {
3343 for op in ops {
3344 if let StreamOp::Score(spec) = op {
3345 assert_eq!(spec.model_path, "model.onnx");
3346 assert_eq!(spec.inputs, vec!["value"]);
3347 assert_eq!(spec.outputs, vec!["prediction"]);
3348 return;
3349 }
3350 }
3351 }
3352 }
3353 panic!("No Score op found");
3354 }
3355
3356 #[test]
3361 fn fuzz_regression_unmatched_brackets_timeout() {
3362 let input = "c2222222s[s[22s[U2s[U6[U6[22222222s[s[22s[U2s[U6[U6[222*2222s[U6[U6[222*2222s[22s[U6[U6[22*2222s[U6[U6[222*2222s[22s[U6[U6[222*26[U6[222*2";
3364 let start = std::time::Instant::now();
3365 let result = parse(input);
3366 let elapsed = start.elapsed();
3367 assert!(
3368 result.is_err(),
3369 "Should reject deeply nested unmatched brackets"
3370 );
3371 assert!(
3372 elapsed.as_millis() < 100,
3373 "Parser should reject fast, took {elapsed:?}"
3374 );
3375 }
3376
3377 #[test]
3378 fn fuzz_regression_deeply_nested_brackets_slow_unit() {
3379 let input = "stream x[[[[[[[[[[[[[[[[[[[[[[[[[[[[[[[";
3381 let start = std::time::Instant::now();
3382 let result = parse(input);
3383 let elapsed = start.elapsed();
3384 assert!(result.is_err(), "Should reject deeply nested brackets");
3385 assert!(
3386 elapsed.as_millis() < 100,
3387 "Parser should reject fast, took {elapsed:?}"
3388 );
3389 }
3390
3391 #[test]
3392 fn nesting_depth_allows_reasonable_programs() {
3393 let input = "let x = foo(bar(baz(qux(a, [1, [2, [3, [4]]]]))))";
3395 let result = parse(input);
3396 if let Err(ref e) = result {
3399 let msg = format!("{e}");
3400 assert!(
3401 !msg.contains("Nesting depth"),
3402 "Should allow 10 levels of nesting: {msg}"
3403 );
3404 }
3405 }
3406
3407 #[test]
3408 fn nesting_depth_ignores_brackets_in_comments() {
3409 let input = "# [[[[[[[[[[[[[[[[[[[[[[[[[[\nstream x = y";
3411 let result = parse(input);
3412 assert!(
3413 result.is_ok(),
3414 "Brackets in comments should be ignored: {:?}",
3415 result.err()
3416 );
3417 }
3418
3419 #[test]
3420 fn nesting_depth_ignores_brackets_in_strings() {
3421 let input = r#"let x = "[[[[[[[[[[[[[[[[[[[[[[[[[[""#;
3423 let result = parse(input);
3424 if let Err(ref e) = result {
3425 let msg = format!("{e}");
3426 assert!(
3427 !msg.contains("Nesting depth"),
3428 "Brackets in strings should be ignored: {msg}"
3429 );
3430 }
3431 }
3432}