1use pest::Parser;
6use pest_derive::Parser;
7
8use crate::error::{ParseError, ParseResult};
9use crate::helpers::{parse_duration, parse_timestamp};
10use crate::indent::preprocess_indentation;
11use varpulis_core::ast::*;
12use varpulis_core::span::{Span, Spanned};
13use varpulis_core::types::Type;
14
15trait IteratorExt<'a> {
17 fn expect_next(&mut self, expected: &str) -> ParseResult<pest::iterators::Pair<'a, Rule>>;
19}
20
21impl<'a> IteratorExt<'a> for pest::iterators::Pairs<'a, Rule> {
22 fn expect_next(&mut self, expected: &str) -> ParseResult<pest::iterators::Pair<'a, Rule>> {
23 self.next().ok_or_else(|| ParseError::Located {
24 line: 0,
25 column: 0,
26 position: 0,
27 message: format!("Expected {}", expected),
28 hint: None,
29 })
30 }
31}
32
/// Parser type whose implementation (and the accompanying `Rule` enum) is
/// generated by `pest_derive` from the `varpulis.pest` grammar file.
#[derive(Parser)]
#[grammar = "varpulis.pest"]
pub struct VarpulisParser;
36
37pub fn parse(source: &str) -> ParseResult<Program> {
43 let source = source.to_string();
44 std::thread::Builder::new()
45 .stack_size(16 * 1024 * 1024)
46 .spawn(move || parse_inner(&source))
47 .map_err(|e| ParseError::InvalidToken {
48 position: 0,
49 message: format!("Failed to spawn parser thread: {}", e),
50 })?
51 .join()
52 .unwrap_or_else(|_| {
53 Err(ParseError::InvalidToken {
54 position: 0,
55 message: "Parser stack overflow on deeply nested input".to_string(),
56 })
57 })
58}
59
/// Deepest bracket nesting (`(`, `[`, `{` combined) that `check_nesting_depth`
/// accepts before rejecting the input.
const MAX_NESTING_DEPTH: usize = 10;
69
70fn check_nesting_depth(source: &str) -> ParseResult<()> {
74 let mut depth: usize = 0;
75 let mut max_depth: usize = 0;
76 let mut max_depth_pos: usize = 0;
77 let bytes = source.as_bytes();
78 let len = bytes.len();
79 let mut i = 0;
80
81 while i < len {
82 let b = bytes[i];
83
84 if b == b'"' {
86 i += 1;
87 while i < len {
88 if bytes[i] == b'\\' {
89 i += 2; continue;
91 }
92 if bytes[i] == b'"' {
93 i += 1;
94 break;
95 }
96 i += 1;
97 }
98 continue;
99 }
100
101 if b == b'#' {
103 i += 1;
104 while i < len && bytes[i] != b'\n' {
105 i += 1;
106 }
107 continue;
108 }
109
110 if b == b'/' && i + 1 < len && bytes[i + 1] == b'*' {
112 i += 2;
113 while i + 1 < len {
114 if bytes[i] == b'*' && bytes[i + 1] == b'/' {
115 i += 2;
116 break;
117 }
118 i += 1;
119 }
120 continue;
121 }
122
123 if b == b'(' || b == b'[' || b == b'{' {
125 depth += 1;
126 if depth > max_depth {
127 max_depth = depth;
128 max_depth_pos = i;
129 }
130 } else if b == b')' || b == b']' || b == b'}' {
131 depth = depth.saturating_sub(1);
132 }
133
134 if max_depth > MAX_NESTING_DEPTH {
135 return Err(ParseError::InvalidToken {
136 position: max_depth_pos,
137 message: format!(
138 "Nesting depth exceeds maximum of {} levels",
139 MAX_NESTING_DEPTH
140 ),
141 });
142 }
143
144 i += 1;
145 }
146
147 Ok(())
148}
149
150fn parse_inner(source: &str) -> ParseResult<Program> {
151 let expanded =
153 crate::expand::expand_declaration_loops(source).map_err(|e| ParseError::InvalidToken {
154 position: 0,
155 message: e,
156 })?;
157 let preprocessed = preprocess_indentation(&expanded);
159
160 check_nesting_depth(&preprocessed)?;
162
163 let pairs = VarpulisParser::parse(Rule::program, &preprocessed).map_err(convert_pest_error)?;
164
165 let mut statements = Vec::new();
166
167 for pair in pairs {
168 if pair.as_rule() == Rule::program {
169 for inner in pair.into_inner() {
170 if inner.as_rule() == Rule::statement {
171 statements.push(parse_statement(inner)?);
172 }
173 }
174 }
175 }
176
177 Ok(crate::optimize::fold_program(Program { statements }))
178}
179
180fn convert_pest_error(e: pest::error::Error<Rule>) -> ParseError {
181 let position = match e.location {
182 pest::error::InputLocation::Pos(p) => p,
183 pest::error::InputLocation::Span((s, _)) => s,
184 };
185
186 let (line, column) = match e.line_col {
188 pest::error::LineColLocation::Pos((l, c)) => (l, c),
189 pest::error::LineColLocation::Span((l, c), _) => (l, c),
190 };
191
192 let (message, hint) = match &e.variant {
194 pest::error::ErrorVariant::ParsingError {
195 positives,
196 negatives: _,
197 } => {
198 if positives.is_empty() {
199 ("Unexpected token".to_string(), None)
200 } else if is_stream_op_error(positives) {
201 (
203 "unknown stream operation".to_string(),
204 Some(
205 "valid operations: .where(), .select(), .emit(), .window(), .aggregate(), \
206 .partition_by(), .within(), .having(), .to(), .context(), .log(), .print(), \
207 .enrich(), .forecast(), .trend_aggregate(), .watermark(), .tap()"
208 .to_string(),
209 ),
210 )
211 } else {
212 let expected: Vec<String> = positives.iter().map(format_rule_name).collect();
213 if expected.len() == 1 {
214 (format!("Expected {}", expected[0]), None)
215 } else {
216 (format!("Expected one of: {}", expected.join(", ")), None)
217 }
218 }
219 }
220 pest::error::ErrorVariant::CustomError { message } => (message.clone(), None),
221 };
222
223 ParseError::Located {
224 line,
225 column,
226 position,
227 message,
228 hint,
229 }
230}
231
232fn format_rule_name(rule: &Rule) -> String {
234 match rule {
235 Rule::identifier => "identifier".to_string(),
236 Rule::integer => "number".to_string(),
237 Rule::float => "number".to_string(),
238 Rule::string => "string".to_string(),
239 Rule::primitive_type => "type (int, float, bool, str, timestamp, duration)".to_string(),
240 Rule::type_expr => "type".to_string(),
241 Rule::expr => "expression".to_string(),
242 Rule::statement => "statement".to_string(),
243 Rule::context_decl => "context declaration".to_string(),
244 Rule::stream_decl => "stream declaration".to_string(),
245 Rule::pattern_decl => "pattern declaration".to_string(),
246 Rule::event_decl => "event declaration".to_string(),
247 Rule::fn_decl => "function declaration".to_string(),
248 Rule::INDENT => "indented block".to_string(),
249 Rule::DEDENT => "end of block".to_string(),
250 Rule::field => "field declaration (name: type)".to_string(),
251 Rule::comparison_op => "comparison operator (==, !=, <, >, <=, >=)".to_string(),
252 Rule::additive_op => "operator (+, -)".to_string(),
253 Rule::multiplicative_op => "operator (*, /, %)".to_string(),
254 Rule::postfix_suffix => "method call or member access".to_string(),
255 Rule::sase_pattern_expr => "SASE pattern expression".to_string(),
256 Rule::sase_seq_expr => "SEQ expression".to_string(),
257 Rule::kleene_op => "Kleene operator (+, *, ?)".to_string(),
258 _ => format!("{:?}", rule).to_lowercase().replace('_', " "),
259 }
260}
261
262fn is_stream_op_error(positives: &[Rule]) -> bool {
266 const STREAM_OP_RULES: &[Rule] = &[
267 Rule::context_op,
268 Rule::where_op,
269 Rule::select_op,
270 Rule::window_op,
271 Rule::aggregate_op,
272 Rule::having_op,
273 Rule::partition_by_op,
274 Rule::order_by_op,
275 Rule::limit_op,
276 Rule::distinct_op,
277 Rule::map_op,
278 Rule::filter_op,
279 Rule::tap_op,
280 Rule::print_op,
281 Rule::log_op,
282 Rule::emit_op,
283 Rule::to_op,
284 Rule::pattern_op,
285 Rule::concurrent_op,
286 Rule::process_op,
287 Rule::on_error_op,
288 Rule::collect_op,
289 Rule::on_op,
290 Rule::within_op,
291 Rule::not_op,
292 Rule::fork_op,
293 Rule::any_op,
294 Rule::all_op,
295 Rule::first_op,
296 Rule::watermark_op,
297 Rule::allowed_lateness_op,
298 Rule::trend_aggregate_op,
299 Rule::score_op,
300 Rule::forecast_op,
301 Rule::enrich_op,
302 ];
303 positives.len() >= 10 && positives.iter().all(|r| STREAM_OP_RULES.contains(r))
304}
305
306fn parse_statement(pair: pest::iterators::Pair<Rule>) -> ParseResult<Spanned<Stmt>> {
307 let span = Span::new(pair.as_span().start(), pair.as_span().end());
308 let inner = pair.into_inner().expect_next("statement body")?;
309
310 let stmt = match inner.as_rule() {
311 Rule::context_decl => parse_context_decl(inner)?,
312 Rule::connector_decl => parse_connector_decl(inner)?,
313 Rule::stream_decl => parse_stream_decl(inner)?,
314 Rule::pattern_decl => parse_pattern_decl(inner)?,
315 Rule::event_decl => parse_event_decl(inner)?,
316 Rule::type_decl => parse_type_decl(inner)?,
317 Rule::var_decl => parse_var_decl(inner)?,
318 Rule::const_decl => parse_const_decl(inner)?,
319 Rule::fn_decl => parse_fn_decl(inner)?,
320 Rule::config_block => parse_config_block(inner)?,
321 Rule::import_stmt => parse_import_stmt(inner)?,
322 Rule::if_stmt => parse_if_stmt(inner)?,
323 Rule::for_stmt => parse_for_stmt(inner)?,
324 Rule::while_stmt => parse_while_stmt(inner)?,
325 Rule::return_stmt => parse_return_stmt(inner)?,
326 Rule::break_stmt => Stmt::Break,
327 Rule::continue_stmt => Stmt::Continue,
328 Rule::emit_stmt => parse_emit_stmt(inner)?,
329 Rule::assignment_stmt => {
330 let mut inner = inner.into_inner();
331 let name = inner.expect_next("variable name")?.as_str().to_string();
332 let value = parse_expr(inner.expect_next("assignment value")?)?;
333 Stmt::Assignment { name, value }
334 }
335 Rule::expr_stmt => Stmt::Expr(parse_expr(inner.into_inner().expect_next("expression")?)?),
336 _ => {
337 return Err(ParseError::UnexpectedToken {
338 position: span.start,
339 expected: "statement".to_string(),
340 found: format!("{:?}", inner.as_rule()),
341 })
342 }
343 };
344
345 Ok(Spanned::new(stmt, span))
346}
347
348fn parse_context_decl(pair: pest::iterators::Pair<Rule>) -> ParseResult<Stmt> {
353 let mut inner = pair.into_inner();
354 let name = inner.expect_next("context name")?.as_str().to_string();
355 let mut cores = None;
356
357 for p in inner {
358 if p.as_rule() == Rule::context_params {
359 for param in p.into_inner() {
360 if param.as_rule() == Rule::context_param {
361 let core_ids: Vec<usize> = param
363 .into_inner()
364 .filter(|p| p.as_rule() == Rule::integer)
365 .map(|p| p.as_str().parse::<usize>().unwrap_or(0))
366 .collect();
367 cores = Some(core_ids);
368 }
369 }
370 }
371 }
372
373 Ok(Stmt::ContextDecl { name, cores })
374}
375
376fn parse_connector_decl(pair: pest::iterators::Pair<Rule>) -> ParseResult<Stmt> {
381 let mut inner = pair.into_inner();
382 let name = inner.expect_next("connector name")?.as_str().to_string();
383 let connector_type = inner.expect_next("connector type")?.as_str().to_string();
384 let mut params = Vec::new();
385
386 for p in inner {
387 if p.as_rule() == Rule::connector_params {
388 params = parse_connector_params(p)?;
389 }
390 }
391
392 Ok(Stmt::ConnectorDecl {
393 name,
394 connector_type,
395 params,
396 })
397}
398
399fn parse_connector_params(pair: pest::iterators::Pair<Rule>) -> ParseResult<Vec<ConnectorParam>> {
400 let mut params = Vec::new();
401 for p in pair.into_inner() {
402 if p.as_rule() == Rule::connector_param {
403 let mut inner = p.into_inner();
404 let name = inner.expect_next("param name")?.as_str().to_string();
405 let value_pair = inner.expect_next("param value")?;
406 let value = parse_config_value(value_pair)?;
407 params.push(ConnectorParam { name, value });
408 }
409 }
410 Ok(params)
411}
412
413fn parse_stream_decl(pair: pest::iterators::Pair<Rule>) -> ParseResult<Stmt> {
414 let mut inner = pair.into_inner();
415 let name = inner.expect_next("stream name")?.as_str().to_string();
416
417 let mut type_annotation = None;
418 let mut source = StreamSource::Ident("".to_string());
419 let mut ops = Vec::new();
420 let mut op_spans = Vec::new();
421
422 for p in inner {
423 match p.as_rule() {
424 Rule::type_annotation => {
425 type_annotation = Some(parse_type(p.into_inner().expect_next("type")?)?);
426 }
427 Rule::stream_expr => {
428 let (s, o, spans) = parse_stream_expr(p)?;
429 source = s;
430 ops = o;
431 op_spans = spans;
432 }
433 _ => {}
434 }
435 }
436
437 Ok(Stmt::StreamDecl {
438 name,
439 type_annotation,
440 source,
441 ops,
442 op_spans,
443 })
444}
445
446fn parse_stream_expr(
447 pair: pest::iterators::Pair<Rule>,
448) -> ParseResult<(StreamSource, Vec<StreamOp>, Vec<varpulis_core::span::Span>)> {
449 let mut inner = pair.into_inner();
450 let source = parse_stream_source(inner.expect_next("stream source")?)?;
451 let mut ops = Vec::new();
452 let mut op_spans = Vec::new();
453
454 for p in inner {
455 if p.as_rule() == Rule::stream_op {
456 let pest_span = p.as_span();
457 let span = varpulis_core::span::Span::new(pest_span.start(), pest_span.end());
458 ops.push(parse_stream_op(p)?);
459 op_spans.push(span);
460 }
461 }
462
463 Ok((source, ops, op_spans))
464}
465
466fn parse_pattern_decl(pair: pest::iterators::Pair<Rule>) -> ParseResult<Stmt> {
471 let mut inner = pair.into_inner();
472 let name = inner.expect_next("pattern name")?.as_str().to_string();
473
474 let mut expr = SasePatternExpr::Event("".to_string());
475 let mut within = None;
476 let mut partition_by = None;
477
478 for p in inner {
479 match p.as_rule() {
480 Rule::sase_pattern_expr => {
481 expr = parse_sase_pattern_expr(p)?;
482 }
483 Rule::pattern_within_clause => {
484 let dur_pair = p.into_inner().expect_next("within duration")?;
485 within = Some(Expr::Duration(
486 parse_duration(dur_pair.as_str()).map_err(ParseError::InvalidDuration)?,
487 ));
488 }
489 Rule::pattern_partition_clause => {
490 let key = p
491 .into_inner()
492 .expect_next("partition key")?
493 .as_str()
494 .to_string();
495 partition_by = Some(Expr::Ident(key));
496 }
497 _ => {}
498 }
499 }
500
501 Ok(Stmt::PatternDecl {
502 name,
503 expr,
504 within,
505 partition_by,
506 })
507}
508
509fn parse_sase_pattern_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<SasePatternExpr> {
510 let inner = pair.into_inner().expect_next("SASE pattern expression")?;
511 parse_sase_or_expr(inner)
512}
513
514fn parse_sase_or_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<SasePatternExpr> {
515 let mut inner = pair.into_inner();
516 let mut left = parse_sase_and_expr(inner.expect_next("OR expression operand")?)?;
517
518 for right_pair in inner {
519 let right = parse_sase_and_expr(right_pair)?;
520 left = SasePatternExpr::Or(Box::new(left), Box::new(right));
521 }
522
523 Ok(left)
524}
525
526fn parse_sase_and_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<SasePatternExpr> {
527 let mut inner = pair.into_inner();
528 let mut left = parse_sase_not_expr(inner.expect_next("AND expression operand")?)?;
529
530 for right_pair in inner {
531 let right = parse_sase_not_expr(right_pair)?;
532 left = SasePatternExpr::And(Box::new(left), Box::new(right));
533 }
534
535 Ok(left)
536}
537
538fn parse_sase_not_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<SasePatternExpr> {
539 let mut inner = pair.into_inner();
540 let first = inner.expect_next("NOT or primary expression")?;
541
542 if first.as_str() == "NOT" {
543 let expr = parse_sase_primary_expr(inner.expect_next("expression after NOT")?)?;
544 Ok(SasePatternExpr::Not(Box::new(expr)))
545 } else {
546 parse_sase_primary_expr(first)
547 }
548}
549
550fn parse_sase_primary_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<SasePatternExpr> {
551 let inner = pair.into_inner().expect_next("SASE primary expression")?;
552
553 match inner.as_rule() {
554 Rule::sase_seq_expr => parse_sase_seq_expr(inner),
555 Rule::sase_grouped_expr => {
556 let nested = inner.into_inner().expect_next("grouped expression")?;
557 let expr = parse_sase_pattern_expr(nested)?;
558 Ok(SasePatternExpr::Group(Box::new(expr)))
559 }
560 Rule::sase_event_ref => parse_sase_event_ref(inner),
561 _ => Ok(SasePatternExpr::Event(inner.as_str().to_string())),
562 }
563}
564
565fn parse_sase_seq_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<SasePatternExpr> {
566 let mut items = Vec::new();
567
568 for p in pair.into_inner() {
569 if p.as_rule() == Rule::sase_seq_items {
570 for item in p.into_inner() {
571 if item.as_rule() == Rule::sase_seq_item {
572 items.push(parse_sase_seq_item(item)?);
573 }
574 }
575 }
576 }
577
578 Ok(SasePatternExpr::Seq(items))
579}
580
581fn parse_sase_seq_item(pair: pest::iterators::Pair<Rule>) -> ParseResult<SasePatternItem> {
582 let inner = pair.into_inner().expect_next("sequence item")?;
583
584 match inner.as_rule() {
585 Rule::sase_negated_item => parse_sase_item_inner(inner, true),
586 Rule::sase_positive_item => parse_sase_item_inner(inner, false),
587 _ => parse_sase_item_inner(inner, false),
588 }
589}
590
591fn parse_sase_item_inner(
592 pair: pest::iterators::Pair<Rule>,
593 _negated: bool,
594) -> ParseResult<SasePatternItem> {
595 let mut inner = pair.into_inner();
596 let event_type = inner.expect_next("event type")?.as_str().to_string();
597
598 let mut kleene = None;
599 let mut filter = None;
600 let mut alias = None;
601
602 for p in inner {
603 match p.as_rule() {
604 Rule::kleene_op => {
605 kleene = Some(match p.as_str() {
606 "+" => KleeneOp::Plus,
607 "*" => KleeneOp::Star,
608 "?" => KleeneOp::Optional,
609 _ => KleeneOp::Plus,
610 });
611 }
612 Rule::sase_where_clause => {
613 filter = Some(parse_expr(
614 p.into_inner().expect_next("filter expression")?,
615 )?);
616 }
617 Rule::sase_alias_clause => {
618 alias = Some(p.into_inner().expect_next("alias")?.as_str().to_string());
619 }
620 _ => {}
621 }
622 }
623
624 let event_type = if _negated {
627 format!("!{}", event_type)
628 } else {
629 event_type
630 };
631
632 Ok(SasePatternItem {
633 event_type,
634 alias,
635 kleene,
636 filter,
637 })
638}
639
640fn parse_sase_event_ref(pair: pest::iterators::Pair<Rule>) -> ParseResult<SasePatternExpr> {
641 let item = parse_sase_item_inner(pair, false)?;
643
644 if item.alias.is_none() && item.kleene.is_none() && item.filter.is_none() {
646 Ok(SasePatternExpr::Event(item.event_type))
647 } else {
648 Ok(SasePatternExpr::Seq(vec![item]))
650 }
651}
652
653fn parse_stream_source(pair: pest::iterators::Pair<Rule>) -> ParseResult<StreamSource> {
654 let inner = pair.into_inner().expect_next("stream source type")?;
655
656 match inner.as_rule() {
657 Rule::from_connector_source => {
658 let mut inner_iter = inner.into_inner();
659 let event_type = inner_iter.expect_next("event type")?.as_str().to_string();
660 let connector_name = inner_iter
661 .expect_next("connector name")?
662 .as_str()
663 .to_string();
664 let mut params = Vec::new();
665 for p in inner_iter {
666 if p.as_rule() == Rule::connector_params {
667 params = parse_connector_params(p)?;
668 }
669 }
670 Ok(StreamSource::FromConnector {
671 event_type,
672 connector_name,
673 params,
674 })
675 }
676 Rule::merge_source => {
677 let mut streams = Vec::new();
678 for p in inner.into_inner() {
679 if p.as_rule() == Rule::inline_stream_list {
680 for is in p.into_inner() {
681 streams.push(parse_inline_stream(is)?);
682 }
683 }
684 }
685 Ok(StreamSource::Merge(streams))
686 }
687 Rule::join_source
688 | Rule::left_join_source
689 | Rule::right_join_source
690 | Rule::full_join_source => {
691 let join_type = match inner.as_rule() {
692 Rule::left_join_source => varpulis_core::ast::JoinType::Left,
693 Rule::right_join_source => varpulis_core::ast::JoinType::Right,
694 Rule::full_join_source => varpulis_core::ast::JoinType::Full,
695 _ => varpulis_core::ast::JoinType::Inner,
696 };
697 let mut clauses = Vec::new();
698 for p in inner.into_inner() {
699 if p.as_rule() == Rule::join_clause_list {
700 for jc in p.into_inner() {
701 clauses.push(parse_join_clause(jc, join_type)?);
702 }
703 }
704 }
705 Ok(StreamSource::Join(clauses))
706 }
707 Rule::sequence_source => {
708 let decl =
709 parse_sequence_decl(inner.into_inner().expect_next("sequence declaration")?)?;
710 Ok(StreamSource::Sequence(decl))
711 }
712 Rule::timer_source => {
713 let timer_args = inner.into_inner().expect_next("timer arguments")?;
714 let decl = parse_timer_decl(timer_args)?;
715 Ok(StreamSource::Timer(decl))
716 }
717 Rule::all_source => {
718 let mut inner_iter = inner.into_inner();
719 let name = inner_iter.expect_next("event name")?.as_str().to_string();
720 let alias = inner_iter.next().map(|p| p.as_str().to_string());
721 Ok(StreamSource::AllWithAlias { name, alias })
722 }
723 Rule::aliased_source => {
724 let mut inner_iter = inner.into_inner();
725 let name = inner_iter.expect_next("event name")?.as_str().to_string();
726 let alias = inner_iter.expect_next("alias")?.as_str().to_string();
727 Ok(StreamSource::IdentWithAlias { name, alias })
728 }
729 Rule::identifier => Ok(StreamSource::Ident(inner.as_str().to_string())),
730 _ => Err(ParseError::UnexpectedToken {
731 position: 0,
732 expected: "stream source".to_string(),
733 found: format!("{:?}", inner.as_rule()),
734 }),
735 }
736}
737
738fn parse_inline_stream(pair: pest::iterators::Pair<Rule>) -> ParseResult<InlineStreamDecl> {
739 let mut inner = pair.into_inner();
740
741 let first = inner.expect_next("stream identifier")?;
743 if first.as_rule() == Rule::identifier && inner.clone().next().is_none() {
744 let name = first.as_str().to_string();
745 return Ok(InlineStreamDecl {
746 name: name.clone(),
747 source: name,
748 filter: None,
749 });
750 }
751
752 let name = first.as_str().to_string();
753 let source = inner.expect_next("stream source")?.as_str().to_string();
754 let filter = inner.next().map(|p| parse_expr(p)).transpose()?;
755
756 Ok(InlineStreamDecl {
757 name,
758 source,
759 filter,
760 })
761}
762
763fn parse_join_clause(
764 pair: pest::iterators::Pair<Rule>,
765 join_type: varpulis_core::ast::JoinType,
766) -> ParseResult<JoinClause> {
767 let mut inner = pair.into_inner();
768
769 let first = inner.expect_next("join clause identifier")?;
770 if first.as_rule() == Rule::identifier && inner.clone().next().is_none() {
771 let name = first.as_str().to_string();
772 return Ok(JoinClause {
773 name: name.clone(),
774 source: name,
775 on: None,
776 join_type,
777 });
778 }
779
780 let name = first.as_str().to_string();
781 let source = inner.expect_next("join source")?.as_str().to_string();
782 let on = inner.next().map(|p| parse_expr(p)).transpose()?;
783
784 Ok(JoinClause {
785 name,
786 source,
787 on,
788 join_type,
789 })
790}
791
792fn parse_sequence_decl(pair: pest::iterators::Pair<Rule>) -> ParseResult<SequenceDecl> {
793 let mut steps = Vec::new();
794
795 for p in pair.into_inner() {
796 if p.as_rule() == Rule::sequence_step {
797 steps.push(parse_sequence_step(p)?);
798 }
799 }
800
801 Ok(SequenceDecl {
802 match_all: false,
803 timeout: None,
804 steps,
805 })
806}
807
808fn parse_sequence_step(pair: pest::iterators::Pair<Rule>) -> ParseResult<SequenceStepDecl> {
809 let mut inner = pair.into_inner();
810 let alias = inner.expect_next("step alias")?.as_str().to_string();
811 let event_type = inner.expect_next("event type")?.as_str().to_string();
812
813 let mut filter = None;
814 let mut timeout = None;
815
816 for p in inner {
817 match p.as_rule() {
818 Rule::or_expr => filter = Some(parse_expr(p)?),
819 Rule::within_suffix => {
820 let expr = p.into_inner().expect_next("within duration")?;
821 timeout = Some(Box::new(parse_expr(expr)?));
822 }
823 _ => {}
824 }
825 }
826
827 Ok(SequenceStepDecl {
828 alias,
829 event_type,
830 filter,
831 timeout,
832 })
833}
834
835fn parse_timer_decl(pair: pest::iterators::Pair<Rule>) -> ParseResult<TimerDecl> {
836 let mut inner = pair.into_inner();
837
838 let interval = parse_expr(inner.expect_next("timer interval")?)?;
840
841 let mut initial_delay = None;
843 for p in inner {
844 if p.as_rule() == Rule::named_arg {
845 let arg = parse_named_arg(p)?;
846 if arg.name == "initial_delay" {
847 initial_delay = Some(Box::new(arg.value));
848 }
849 }
850 }
851
852 Ok(TimerDecl {
853 interval,
854 initial_delay,
855 })
856}
857
858fn parse_stream_op(pair: pest::iterators::Pair<Rule>) -> ParseResult<StreamOp> {
859 let inner = pair.into_inner().expect_next("stream operation")?;
860
861 match inner.as_rule() {
862 Rule::dot_op => {
863 let op_inner = inner.into_inner().expect_next("dot operation")?;
864 parse_dot_op(op_inner)
865 }
866 Rule::followed_by_op => parse_followed_by_op(inner),
867 _ => Err(ParseError::UnexpectedToken {
868 position: 0,
869 expected: "stream operation".to_string(),
870 found: format!("{:?}", inner.as_rule()),
871 }),
872 }
873}
874
875fn parse_dot_op(pair: pest::iterators::Pair<Rule>) -> ParseResult<StreamOp> {
876 match pair.as_rule() {
877 Rule::context_op => {
878 let name = pair
879 .into_inner()
880 .expect_next("context name")?
881 .as_str()
882 .to_string();
883 Ok(StreamOp::Context(name))
884 }
885 Rule::where_op => {
886 let expr = parse_expr(pair.into_inner().expect_next("where expression")?)?;
887 Ok(StreamOp::Where(expr))
888 }
889 Rule::select_op => {
890 let mut items = Vec::new();
891 for p in pair.into_inner() {
892 if p.as_rule() == Rule::select_list {
893 for si in p.into_inner() {
894 items.push(parse_select_item(si)?);
895 }
896 }
897 }
898 Ok(StreamOp::Select(items))
899 }
900 Rule::window_op => {
901 let args = parse_window_args(pair.into_inner().expect_next("window arguments")?)?;
902 Ok(StreamOp::Window(args))
903 }
904 Rule::aggregate_op => {
905 let mut items = Vec::new();
906 for p in pair.into_inner() {
907 if p.as_rule() == Rule::agg_list {
908 for ai in p.into_inner() {
909 items.push(parse_agg_item(ai)?);
910 }
911 }
912 }
913 Ok(StreamOp::Aggregate(items))
914 }
915 Rule::having_op => {
916 let expr = parse_expr(pair.into_inner().expect_next("having expression")?)?;
917 Ok(StreamOp::Having(expr))
918 }
919 Rule::map_op => {
920 let expr = parse_expr(pair.into_inner().expect_next("map expression")?)?;
921 Ok(StreamOp::Map(expr))
922 }
923 Rule::filter_op => {
924 let expr = parse_expr(pair.into_inner().expect_next("filter expression")?)?;
925 Ok(StreamOp::Filter(expr))
926 }
927 Rule::within_op => {
928 let expr = parse_expr(pair.into_inner().expect_next("within duration")?)?;
929 Ok(StreamOp::Within(expr))
930 }
931 Rule::emit_op => {
932 let mut output_type = None;
933 let mut fields = Vec::new();
934 let mut target_context = None;
935 for p in pair.into_inner() {
936 match p.as_rule() {
937 Rule::emit_type_cast => {
938 output_type = Some(
939 p.into_inner()
940 .expect_next("type name")?
941 .as_str()
942 .to_string(),
943 );
944 }
945 Rule::named_arg_list => {
946 for arg in p.into_inner() {
947 let parsed = parse_named_arg(arg)?;
948 if parsed.name == "context" {
950 if let Expr::Ident(ctx_name) = &parsed.value {
951 target_context = Some(ctx_name.clone());
952 continue;
953 }
954 }
955 fields.push(parsed);
956 }
957 }
958 _ => {}
959 }
960 }
961 Ok(StreamOp::Emit {
962 output_type,
963 fields,
964 target_context,
965 })
966 }
967 Rule::print_op => {
968 let exprs = pair
969 .into_inner()
970 .filter(|p| p.as_rule() == Rule::expr_list)
971 .flat_map(|p| p.into_inner())
972 .map(parse_expr)
973 .collect::<ParseResult<Vec<_>>>()?;
974 Ok(StreamOp::Print(exprs))
975 }
976 Rule::collect_op => Ok(StreamOp::Collect),
977 Rule::pattern_op => {
978 let def_pair = pair.into_inner().expect_next("pattern definition")?;
979 let mut inner = def_pair.into_inner();
980 let name = inner.expect_next("pattern name")?.as_str().to_string();
981 let body_pair = inner.expect_next("pattern body")?;
982
983 let body_inner = body_pair.into_inner().expect_next("pattern expression")?;
985 let matcher = match body_inner.as_rule() {
986 Rule::lambda_expr => parse_lambda_expr(body_inner)?,
987 Rule::pattern_or_expr => parse_pattern_expr_as_expr(body_inner)?,
988 _ => parse_expr_inner(body_inner)?,
989 };
990 Ok(StreamOp::Pattern(PatternDef { name, matcher }))
991 }
992 Rule::partition_by_op => {
993 let expr = parse_expr(pair.into_inner().expect_next("partition expression")?)?;
994 Ok(StreamOp::PartitionBy(expr))
995 }
996 Rule::order_by_op => {
997 let mut items = Vec::new();
998 for p in pair.into_inner() {
999 if p.as_rule() == Rule::order_list {
1000 for oi in p.into_inner() {
1001 items.push(parse_order_item(oi)?);
1002 }
1003 }
1004 }
1005 Ok(StreamOp::OrderBy(items))
1006 }
1007 Rule::limit_op => {
1008 let expr = parse_expr(pair.into_inner().expect_next("limit expression")?)?;
1009 Ok(StreamOp::Limit(expr))
1010 }
1011 Rule::distinct_op => {
1012 let expr = pair.into_inner().next().map(parse_expr).transpose()?;
1013 Ok(StreamOp::Distinct(expr))
1014 }
1015 Rule::tap_op => {
1016 let args = pair
1017 .into_inner()
1018 .filter(|p| p.as_rule() == Rule::named_arg_list)
1019 .flat_map(|p| p.into_inner())
1020 .map(parse_named_arg)
1021 .collect::<ParseResult<Vec<_>>>()?;
1022 Ok(StreamOp::Tap(args))
1023 }
1024 Rule::log_op => {
1025 let args = pair
1026 .into_inner()
1027 .filter(|p| p.as_rule() == Rule::named_arg_list)
1028 .flat_map(|p| p.into_inner())
1029 .map(parse_named_arg)
1030 .collect::<ParseResult<Vec<_>>>()?;
1031 Ok(StreamOp::Log(args))
1032 }
1033 Rule::to_op => {
1034 let mut inner = pair.into_inner();
1035 let connector_name = inner.expect_next("connector name")?.as_str().to_string();
1036 let mut params = Vec::new();
1037 for p in inner {
1038 if p.as_rule() == Rule::connector_params {
1039 params = parse_connector_params(p)?;
1040 }
1041 }
1042 Ok(StreamOp::To {
1043 connector_name,
1044 params,
1045 })
1046 }
1047 Rule::process_op => {
1048 let expr = parse_expr(pair.into_inner().expect_next("process expression")?)?;
1049 Ok(StreamOp::Process(expr))
1050 }
1051 Rule::on_error_op => {
1052 let expr = parse_expr(pair.into_inner().expect_next("on_error handler")?)?;
1053 Ok(StreamOp::OnError(expr))
1054 }
1055 Rule::on_op => {
1056 let expr = parse_expr(pair.into_inner().expect_next("on handler")?)?;
1057 Ok(StreamOp::On(expr))
1058 }
1059 Rule::not_op => {
1060 let mut inner = pair.into_inner();
1061 let event_type = inner.expect_next("event type")?.as_str().to_string();
1062 let filter = inner.next().map(parse_expr).transpose()?;
1063 Ok(StreamOp::Not(FollowedByClause {
1064 event_type,
1065 filter,
1066 alias: None,
1067 match_all: false,
1068 }))
1069 }
1070 Rule::fork_op => {
1071 let mut paths = Vec::new();
1072 for p in pair.into_inner() {
1073 if p.as_rule() == Rule::fork_path_list {
1074 for fp in p.into_inner() {
1075 paths.push(parse_fork_path(fp)?);
1076 }
1077 }
1078 }
1079 Ok(StreamOp::Fork(paths))
1080 }
1081 Rule::any_op => {
1082 let count = pair
1083 .into_inner()
1084 .next()
1085 .map(|p| p.as_str().parse().unwrap_or(1));
1086 Ok(StreamOp::Any(count))
1087 }
1088 Rule::all_op => Ok(StreamOp::All),
1089 Rule::first_op => Ok(StreamOp::First),
1090 Rule::concurrent_op => {
1091 let args = pair
1092 .into_inner()
1093 .filter(|p| p.as_rule() == Rule::named_arg_list)
1094 .flat_map(|p| p.into_inner())
1095 .map(parse_named_arg)
1096 .collect::<ParseResult<Vec<_>>>()?;
1097 Ok(StreamOp::Concurrent(args))
1098 }
1099 Rule::watermark_op => {
1100 let args = pair
1101 .into_inner()
1102 .filter(|p| p.as_rule() == Rule::named_arg_list)
1103 .flat_map(|p| p.into_inner())
1104 .map(parse_named_arg)
1105 .collect::<ParseResult<Vec<_>>>()?;
1106 Ok(StreamOp::Watermark(args))
1107 }
1108 Rule::allowed_lateness_op => {
1109 let expr = parse_expr(pair.into_inner().expect_next("allowed lateness duration")?)?;
1110 Ok(StreamOp::AllowedLateness(expr))
1111 }
1112 Rule::trend_aggregate_op => {
1113 let mut items = Vec::new();
1114 for p in pair.into_inner() {
1115 if p.as_rule() == Rule::trend_agg_list {
1116 for item_pair in p.into_inner() {
1117 if item_pair.as_rule() == Rule::trend_agg_item {
1118 let mut inner = item_pair.into_inner();
1119 let alias = inner.expect_next("trend agg alias")?.as_str().to_string();
1120 let func_pair = inner.expect_next("trend agg function")?;
1121 let mut func_inner = func_pair.into_inner();
1122 let func_name = func_inner
1123 .expect_next("function name")?
1124 .as_str()
1125 .to_string();
1126 let arg = func_inner.next().map(parse_expr).transpose()?;
1127 items.push(TrendAggItem {
1128 alias,
1129 func: func_name,
1130 arg,
1131 });
1132 }
1133 }
1134 }
1135 }
1136 Ok(StreamOp::TrendAggregate(items))
1137 }
1138 Rule::forecast_op => {
1139 let mut confidence = None;
1140 let mut horizon = None;
1141 let mut warmup = None;
1142 let mut max_depth = None;
1143 let mut hawkes = None;
1144 let mut conformal = None;
1145 let mut mode = None;
1146 for p in pair.into_inner() {
1147 if p.as_rule() == Rule::forecast_params {
1148 for param_pair in p.into_inner() {
1149 if param_pair.as_rule() == Rule::forecast_param {
1150 let mut inner = param_pair.into_inner();
1151 let name = inner.expect_next("forecast param name")?.as_str();
1152 let value_pair = inner.expect_next("forecast param value")?;
1153 let expr = parse_expr(value_pair)?;
1154 match name {
1155 "confidence" => confidence = Some(expr),
1156 "horizon" => horizon = Some(expr),
1157 "warmup" => warmup = Some(expr),
1158 "max_depth" => max_depth = Some(expr),
1159 "hawkes" => hawkes = Some(expr),
1160 "conformal" => conformal = Some(expr),
1161 "mode" => mode = Some(expr),
1162 _ => {}
1163 }
1164 }
1165 }
1166 }
1167 }
1168 Ok(StreamOp::Forecast(ForecastSpec {
1169 confidence,
1170 horizon,
1171 warmup,
1172 max_depth,
1173 hawkes,
1174 conformal,
1175 mode,
1176 }))
1177 }
1178 Rule::enrich_op => {
1179 let mut inner = pair.into_inner();
1180 let connector_name = inner.expect_next("connector name")?.as_str().to_string();
1181 let mut key_expr = None;
1182 let mut fields = Vec::new();
1183 let mut cache_ttl = None;
1184 let mut timeout = None;
1185 let mut fallback = None;
1186 for p in inner {
1187 if p.as_rule() == Rule::enrich_params {
1188 for param_pair in p.into_inner() {
1189 if param_pair.as_rule() == Rule::enrich_param {
1190 let param_inner =
1191 param_pair.into_inner().expect_next("enrich param")?;
1192 match param_inner.as_rule() {
1193 Rule::enrich_key_param => {
1194 let expr_pair =
1195 param_inner.into_inner().expect_next("key expression")?;
1196 key_expr = Some(parse_expr(expr_pair)?);
1197 }
1198 Rule::enrich_fields_param => {
1199 for field in param_inner.into_inner() {
1200 if field.as_rule() == Rule::identifier {
1201 fields.push(field.as_str().to_string());
1202 }
1203 }
1204 }
1205 Rule::enrich_cache_ttl_param => {
1206 let expr_pair = param_inner
1207 .into_inner()
1208 .expect_next("cache_ttl expression")?;
1209 cache_ttl = Some(parse_expr(expr_pair)?);
1210 }
1211 Rule::enrich_timeout_param => {
1212 let expr_pair = param_inner
1213 .into_inner()
1214 .expect_next("timeout expression")?;
1215 timeout = Some(parse_expr(expr_pair)?);
1216 }
1217 Rule::enrich_fallback_param => {
1218 let literal_pair =
1219 param_inner.into_inner().expect_next("fallback literal")?;
1220 fallback = Some(parse_expr(literal_pair)?);
1221 }
1222 _ => {}
1223 }
1224 }
1225 }
1226 }
1227 }
1228 let key = key_expr.ok_or_else(|| ParseError::Located {
1229 line: 0,
1230 column: 0,
1231 position: 0,
1232 message: ".enrich() requires a key: parameter".to_string(),
1233 hint: Some("add key: <expression> to .enrich()".to_string()),
1234 })?;
1235 Ok(StreamOp::Enrich(EnrichSpec {
1236 connector_name,
1237 key_expr: Box::new(key),
1238 fields,
1239 cache_ttl,
1240 timeout,
1241 fallback,
1242 }))
1243 }
1244 Rule::score_op => {
1245 let mut model_path = String::new();
1246 let mut inputs = Vec::new();
1247 let mut outputs = Vec::new();
1248 let mut gpu = false;
1249 let mut batch_size: usize = 1;
1250 let mut device_id: i32 = 0;
1251 for p in pair.into_inner() {
1252 if p.as_rule() == Rule::score_params {
1253 for param_pair in p.into_inner() {
1254 if param_pair.as_rule() == Rule::score_param {
1255 let mut inner = param_pair.into_inner();
1256 let name = inner.expect_next("score param name")?.as_str();
1257 let value_pair = inner.expect_next("score param value")?;
1258 match name {
1259 "model" => {
1260 let raw = value_pair.as_str();
1261 model_path = raw.trim_matches('"').to_string();
1262 }
1263 "inputs" => {
1264 if value_pair.as_rule() == Rule::score_field_list {
1265 for field in value_pair.into_inner() {
1266 if field.as_rule() == Rule::identifier {
1267 inputs.push(field.as_str().to_string());
1268 }
1269 }
1270 }
1271 }
1272 "outputs" => {
1273 if value_pair.as_rule() == Rule::score_field_list {
1274 for field in value_pair.into_inner() {
1275 if field.as_rule() == Rule::identifier {
1276 outputs.push(field.as_str().to_string());
1277 }
1278 }
1279 }
1280 }
1281 "gpu" => {
1282 gpu = value_pair.as_str() == "true";
1283 }
1284 "batch_size" => {
1285 batch_size = value_pair.as_str().parse().unwrap_or(1);
1286 }
1287 "device" | "device_id" => {
1288 device_id = value_pair.as_str().parse().unwrap_or(0);
1289 }
1290 _ => {}
1291 }
1292 }
1293 }
1294 }
1295 }
1296 Ok(StreamOp::Score(ScoreSpec {
1297 model_path,
1298 inputs,
1299 outputs,
1300 gpu,
1301 batch_size,
1302 device_id,
1303 }))
1304 }
1305 _ => Err(ParseError::UnexpectedToken {
1306 position: 0,
1307 expected: "stream operation".to_string(),
1308 found: format!("{:?}", pair.as_rule()),
1309 }),
1310 }
1311}
1312
1313fn parse_order_item(pair: pest::iterators::Pair<Rule>) -> ParseResult<OrderItem> {
1314 let mut inner = pair.into_inner();
1315 let expr = parse_expr(inner.expect_next("order expression")?)?;
1316 let desc = inner.next().map(|p| p.as_str() == "desc").unwrap_or(false);
1317 Ok(OrderItem {
1318 expr,
1319 descending: desc,
1320 })
1321}
1322
1323fn parse_fork_path(pair: pest::iterators::Pair<Rule>) -> ParseResult<ForkPath> {
1324 let mut inner = pair.into_inner();
1325 let name = inner.expect_next("fork path name")?.as_str().to_string();
1326 let mut ops = Vec::new();
1327 for p in inner {
1328 if p.as_rule() == Rule::stream_op {
1329 ops.push(parse_stream_op(p)?);
1330 }
1331 }
1332 Ok(ForkPath { name, ops })
1333}
1334
1335fn parse_followed_by_op(pair: pest::iterators::Pair<Rule>) -> ParseResult<StreamOp> {
1336 let mut inner = pair.into_inner();
1337 let mut match_all = false;
1338
1339 let first = inner.expect_next("event type or match_all")?;
1340 let event_type = if first.as_rule() == Rule::match_all_keyword {
1341 match_all = true;
1342 inner.expect_next("event type")?.as_str().to_string()
1343 } else {
1344 first.as_str().to_string()
1345 };
1346
1347 let mut filter = None;
1348 let mut alias = None;
1349
1350 for p in inner {
1351 match p.as_rule() {
1352 Rule::or_expr => filter = Some(parse_or_expr(p)?),
1353 Rule::filter_expr => filter = Some(parse_filter_expr(p)?),
1354 Rule::identifier => alias = Some(p.as_str().to_string()),
1355 _ => {}
1356 }
1357 }
1358
1359 Ok(StreamOp::FollowedBy(FollowedByClause {
1360 event_type,
1361 filter,
1362 alias,
1363 match_all,
1364 }))
1365}
1366
1367fn parse_select_item(pair: pest::iterators::Pair<Rule>) -> ParseResult<SelectItem> {
1368 let mut inner = pair.into_inner();
1369 let first = inner.expect_next("select field or alias")?;
1370
1371 if let Some(second) = inner.next() {
1372 Ok(SelectItem::Alias(
1373 first.as_str().to_string(),
1374 parse_expr(second)?,
1375 ))
1376 } else {
1377 Ok(SelectItem::Field(first.as_str().to_string()))
1378 }
1379}
1380
1381fn parse_window_args(pair: pest::iterators::Pair<Rule>) -> ParseResult<WindowArgs> {
1382 let raw = pair.as_str().trim();
1383 let is_session = raw.starts_with("session");
1384
1385 let mut inner = pair.into_inner();
1386
1387 if is_session {
1388 let gap_expr = parse_expr(inner.expect_next("session gap duration")?)?;
1390 return Ok(WindowArgs {
1391 duration: gap_expr.clone(),
1392 sliding: None,
1393 policy: None,
1394 session_gap: Some(gap_expr),
1395 });
1396 }
1397
1398 let duration = parse_expr(inner.expect_next("window duration")?)?;
1399
1400 let mut sliding = None;
1401 let mut policy = None;
1402
1403 for p in inner {
1404 if p.as_rule() == Rule::expr {
1405 if sliding.is_none() {
1407 sliding = Some(parse_expr(p)?);
1408 } else {
1409 policy = Some(parse_expr(p)?);
1410 }
1411 }
1412 }
1413
1414 Ok(WindowArgs {
1415 duration,
1416 sliding,
1417 policy,
1418 session_gap: None,
1419 })
1420}
1421
1422fn parse_agg_item(pair: pest::iterators::Pair<Rule>) -> ParseResult<AggItem> {
1423 let mut inner = pair.into_inner();
1424 let alias = inner.expect_next("aggregate alias")?.as_str().to_string();
1425 let expr = parse_expr(inner.expect_next("aggregate expression")?)?;
1426 Ok(AggItem { alias, expr })
1427}
1428
1429fn parse_named_arg(pair: pest::iterators::Pair<Rule>) -> ParseResult<NamedArg> {
1430 let mut inner = pair.into_inner();
1431 let name = inner.expect_next("argument name")?.as_str().to_string();
1432 let value = parse_expr(inner.expect_next("argument value")?)?;
1433 Ok(NamedArg { name, value })
1434}
1435
1436fn parse_event_decl(pair: pest::iterators::Pair<Rule>) -> ParseResult<Stmt> {
1437 let mut inner = pair.into_inner();
1438 let name = inner.expect_next("event name")?.as_str().to_string();
1439
1440 let mut extends = None;
1441 let mut fields = Vec::new();
1442
1443 for p in inner {
1444 match p.as_rule() {
1445 Rule::identifier => extends = Some(p.as_str().to_string()),
1446 Rule::field => fields.push(parse_field(p)?),
1447 _ => {}
1448 }
1449 }
1450
1451 Ok(Stmt::EventDecl {
1452 name,
1453 extends,
1454 fields,
1455 })
1456}
1457
1458fn parse_field(pair: pest::iterators::Pair<Rule>) -> ParseResult<Field> {
1459 let mut inner = pair.into_inner();
1460 let name = inner.expect_next("field name")?.as_str().to_string();
1461 let ty = parse_type(inner.expect_next("field type")?)?;
1462 let optional = inner.next().is_some();
1463 Ok(Field { name, ty, optional })
1464}
1465
1466fn parse_type_decl(pair: pest::iterators::Pair<Rule>) -> ParseResult<Stmt> {
1467 let mut inner = pair.into_inner();
1468 let name = inner.expect_next("type name")?.as_str().to_string();
1469 let ty = parse_type(inner.expect_next("type definition")?)?;
1470 Ok(Stmt::TypeDecl { name, ty })
1471}
1472
1473fn parse_type(pair: pest::iterators::Pair<Rule>) -> ParseResult<Type> {
1474 let cloned = pair.clone();
1475 let inner = pair.into_inner().next().unwrap_or(cloned);
1476
1477 match inner.as_rule() {
1478 Rule::primitive_type => match inner.as_str() {
1479 "int" => Ok(Type::Int),
1480 "float" => Ok(Type::Float),
1481 "bool" => Ok(Type::Bool),
1482 "str" => Ok(Type::Str),
1483 "timestamp" => Ok(Type::Timestamp),
1484 "duration" => Ok(Type::Duration),
1485 _ => Ok(Type::Named(inner.as_str().to_string())),
1486 },
1487 Rule::array_type => {
1488 let inner_type = parse_type(inner.into_inner().expect_next("array element type")?)?;
1489 Ok(Type::Array(Box::new(inner_type)))
1490 }
1491 Rule::map_type => {
1492 let mut inner_pairs = inner.into_inner();
1493 let key_type = parse_type(inner_pairs.expect_next("map key type")?)?;
1494 let val_type = parse_type(inner_pairs.expect_next("map value type")?)?;
1495 Ok(Type::Map(Box::new(key_type), Box::new(val_type)))
1496 }
1497 Rule::tuple_type => {
1498 let types: Vec<Type> = inner
1499 .into_inner()
1500 .map(parse_type)
1501 .collect::<ParseResult<Vec<_>>>()?;
1502 Ok(Type::Tuple(types))
1503 }
1504 Rule::stream_type => {
1505 let inner_type = parse_type(inner.into_inner().expect_next("stream element type")?)?;
1506 Ok(Type::Stream(Box::new(inner_type)))
1507 }
1508 Rule::optional_type => {
1509 let inner_type = parse_type(inner.into_inner().expect_next("optional inner type")?)?;
1510 Ok(Type::Optional(Box::new(inner_type)))
1511 }
1512 Rule::named_type | Rule::identifier => Ok(Type::Named(inner.as_str().to_string())),
1513 _ => Ok(Type::Named(inner.as_str().to_string())),
1514 }
1515}
1516
1517fn parse_var_decl(pair: pest::iterators::Pair<Rule>) -> ParseResult<Stmt> {
1518 let mut inner = pair.into_inner();
1519 let keyword = inner.expect_next("var_keyword")?.as_str();
1520 let mutable = keyword == "var";
1521 let name = inner.expect_next("variable name")?.as_str().to_string();
1522
1523 let mut ty = None;
1524 let mut value = Expr::Null;
1525
1526 for p in inner {
1527 match p.as_rule() {
1528 Rule::type_annotation => ty = Some(parse_type(p.into_inner().expect_next("type")?)?),
1529 _ => value = parse_expr(p)?,
1530 }
1531 }
1532
1533 Ok(Stmt::VarDecl {
1534 mutable,
1535 name,
1536 ty,
1537 value,
1538 })
1539}
1540
1541fn parse_const_decl(pair: pest::iterators::Pair<Rule>) -> ParseResult<Stmt> {
1542 let mut inner = pair.into_inner();
1543 let name = inner.expect_next("constant name")?.as_str().to_string();
1544
1545 let mut ty = None;
1546 let mut value = Expr::Null;
1547
1548 for p in inner {
1549 match p.as_rule() {
1550 Rule::type_annotation => ty = Some(parse_type(p.into_inner().expect_next("type")?)?),
1551 _ => value = parse_expr(p)?,
1552 }
1553 }
1554
1555 Ok(Stmt::ConstDecl { name, ty, value })
1556}
1557
1558fn parse_fn_decl(pair: pest::iterators::Pair<Rule>) -> ParseResult<Stmt> {
1559 let mut inner = pair.into_inner();
1560 let name = inner.expect_next("function name")?.as_str().to_string();
1561
1562 let mut params = Vec::new();
1563 let mut ret = None;
1564 let mut body = Vec::new();
1565
1566 for p in inner {
1567 match p.as_rule() {
1568 Rule::param_list => {
1569 for param in p.into_inner() {
1570 params.push(parse_param(param)?);
1571 }
1572 }
1573 Rule::type_expr => ret = Some(parse_type(p)?),
1574 Rule::block => body = parse_block(p)?,
1575 Rule::statement => body.push(parse_statement(p)?),
1576 _ => {}
1577 }
1578 }
1579
1580 Ok(Stmt::FnDecl {
1581 name,
1582 params,
1583 ret,
1584 body,
1585 })
1586}
1587
1588fn parse_block(pair: pest::iterators::Pair<Rule>) -> ParseResult<Vec<Spanned<Stmt>>> {
1589 let mut statements = Vec::new();
1590 for p in pair.into_inner() {
1591 if p.as_rule() == Rule::statement {
1592 statements.push(parse_statement(p)?);
1593 }
1594 }
1595 Ok(statements)
1596}
1597
1598fn parse_param(pair: pest::iterators::Pair<Rule>) -> ParseResult<Param> {
1599 let mut inner = pair.into_inner();
1600 let name = inner.expect_next("parameter name")?.as_str().to_string();
1601 let ty = parse_type(inner.expect_next("parameter type")?)?;
1602 Ok(Param { name, ty })
1603}
1604
1605fn parse_config_block(pair: pest::iterators::Pair<Rule>) -> ParseResult<Stmt> {
1606 let mut inner = pair.into_inner();
1607 let first = inner.expect_next("config name or item")?;
1608
1609 let (name, items_start) = if first.as_rule() == Rule::identifier {
1611 (first.as_str().to_string(), None)
1612 } else {
1613 ("default".to_string(), Some(first))
1615 };
1616
1617 let mut items = Vec::new();
1618
1619 if let Some(first_item) = items_start {
1621 if first_item.as_rule() == Rule::config_item {
1622 items.push(parse_config_item(first_item)?);
1623 }
1624 }
1625
1626 for p in inner {
1627 if p.as_rule() == Rule::config_item {
1628 items.push(parse_config_item(p)?);
1629 }
1630 }
1631 Ok(Stmt::Config { name, items })
1632}
1633
1634fn parse_config_item(pair: pest::iterators::Pair<Rule>) -> ParseResult<ConfigItem> {
1635 let mut inner = pair.into_inner();
1636 let key = inner.expect_next("config key")?.as_str().to_string();
1637 let value = parse_config_value(inner.expect_next("config value")?)?;
1638 Ok(ConfigItem::Value(key, value))
1639}
1640
1641fn parse_config_value(pair: pest::iterators::Pair<Rule>) -> ParseResult<ConfigValue> {
1642 let cloned = pair.clone();
1643 let inner = pair.into_inner().next().unwrap_or(cloned);
1644
1645 match inner.as_rule() {
1646 Rule::config_array => {
1647 let values: Vec<ConfigValue> = inner
1648 .into_inner()
1649 .map(parse_config_value)
1650 .collect::<ParseResult<Vec<_>>>()?;
1651 Ok(ConfigValue::Array(values))
1652 }
1653 Rule::integer => Ok(ConfigValue::Int(inner.as_str().parse().unwrap_or(0))),
1654 Rule::float => Ok(ConfigValue::Float(inner.as_str().parse().unwrap_or(0.0))),
1655 Rule::string => {
1656 let s = inner.as_str();
1657 Ok(ConfigValue::Str(s[1..s.len() - 1].to_string()))
1658 }
1659 Rule::duration => Ok(ConfigValue::Duration(
1660 parse_duration(inner.as_str()).map_err(ParseError::InvalidDuration)?,
1661 )),
1662 Rule::boolean => Ok(ConfigValue::Bool(inner.as_str() == "true")),
1663 Rule::identifier => Ok(ConfigValue::Ident(inner.as_str().to_string())),
1664 _ => Ok(ConfigValue::Ident(inner.as_str().to_string())),
1665 }
1666}
1667
1668fn parse_import_stmt(pair: pest::iterators::Pair<Rule>) -> ParseResult<Stmt> {
1669 let mut inner = pair.into_inner();
1670 let path_pair = inner.expect_next("import path")?;
1671 let path = path_pair.as_str();
1672 let path = path[1..path.len() - 1].to_string();
1673 let alias = inner.next().map(|p| p.as_str().to_string());
1674 Ok(Stmt::Import { path, alias })
1675}
1676
1677fn parse_if_stmt(pair: pest::iterators::Pair<Rule>) -> ParseResult<Stmt> {
1678 let mut inner = pair.into_inner();
1679 let cond = parse_expr(inner.expect_next("if condition")?)?;
1680
1681 let mut then_branch = Vec::new();
1682 let mut elif_branches = Vec::new();
1683 let mut else_branch = None;
1684
1685 for p in inner {
1686 match p.as_rule() {
1687 Rule::block => then_branch = parse_block(p)?,
1688 Rule::statement => then_branch.push(parse_statement(p)?),
1689 Rule::elif_clause => {
1690 let mut elif_inner = p.into_inner();
1691 let elif_cond = parse_expr(elif_inner.expect_next("elif condition")?)?;
1692 let mut elif_body = Vec::new();
1693 for ep in elif_inner {
1694 match ep.as_rule() {
1695 Rule::block => elif_body = parse_block(ep)?,
1696 Rule::statement => elif_body.push(parse_statement(ep)?),
1697 _ => {}
1698 }
1699 }
1700 elif_branches.push((elif_cond, elif_body));
1701 }
1702 Rule::else_clause => {
1703 let mut else_body = Vec::new();
1704 for ep in p.into_inner() {
1705 match ep.as_rule() {
1706 Rule::block => else_body = parse_block(ep)?,
1707 Rule::statement => else_body.push(parse_statement(ep)?),
1708 _ => {}
1709 }
1710 }
1711 else_branch = Some(else_body);
1712 }
1713 _ => {}
1714 }
1715 }
1716
1717 Ok(Stmt::If {
1718 cond,
1719 then_branch,
1720 elif_branches,
1721 else_branch,
1722 })
1723}
1724
1725fn parse_for_stmt(pair: pest::iterators::Pair<Rule>) -> ParseResult<Stmt> {
1726 let mut inner = pair.into_inner();
1727 let var = inner.expect_next("loop variable")?.as_str().to_string();
1728 let iter = parse_expr(inner.expect_next("iterable expression")?)?;
1729 let mut body = Vec::new();
1730 for p in inner {
1731 match p.as_rule() {
1732 Rule::block => body = parse_block(p)?,
1733 Rule::statement => body.push(parse_statement(p)?),
1734 _ => {}
1735 }
1736 }
1737 Ok(Stmt::For { var, iter, body })
1738}
1739
1740fn parse_while_stmt(pair: pest::iterators::Pair<Rule>) -> ParseResult<Stmt> {
1741 let mut inner = pair.into_inner();
1742 let cond = parse_expr(inner.expect_next("while condition")?)?;
1743 let mut body = Vec::new();
1744 for p in inner {
1745 match p.as_rule() {
1746 Rule::block => body = parse_block(p)?,
1747 Rule::statement => body.push(parse_statement(p)?),
1748 _ => {}
1749 }
1750 }
1751 Ok(Stmt::While { cond, body })
1752}
1753
1754fn parse_return_stmt(pair: pest::iterators::Pair<Rule>) -> ParseResult<Stmt> {
1755 let expr = pair.into_inner().next().map(parse_expr).transpose()?;
1756 Ok(Stmt::Return(expr))
1757}
1758
1759fn parse_emit_stmt(pair: pest::iterators::Pair<Rule>) -> ParseResult<Stmt> {
1760 let mut inner = pair.into_inner();
1761 let event_type = inner.expect_next("event type name")?.as_str().to_string();
1762 let mut fields = Vec::new();
1763 for p in inner {
1764 if p.as_rule() == Rule::named_arg_list {
1765 for arg in p.into_inner() {
1766 fields.push(parse_named_arg(arg)?);
1767 }
1768 }
1769 }
1770 Ok(Stmt::Emit { event_type, fields })
1771}
1772
1773fn parse_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
1778 let inner = pair.into_inner().next();
1779
1780 match inner {
1781 Some(p) => parse_expr_inner(p),
1782 None => Ok(Expr::Null),
1783 }
1784}
1785
1786fn parse_expr_inner(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
1787 match pair.as_rule() {
1788 Rule::expr => parse_expr(pair),
1789 Rule::lambda_expr => parse_lambda_expr(pair),
1790 Rule::range_expr => parse_range_expr(pair),
1791 Rule::or_expr => parse_or_expr(pair),
1792 Rule::and_expr => parse_and_expr(pair),
1793 Rule::not_expr => parse_not_expr(pair),
1794 Rule::comparison_expr => parse_comparison_expr(pair),
1795 Rule::bitwise_or_expr => parse_bitwise_or_expr(pair),
1796 Rule::bitwise_xor_expr => parse_bitwise_xor_expr(pair),
1797 Rule::bitwise_and_expr => parse_bitwise_and_expr(pair),
1798 Rule::shift_expr => parse_shift_expr(pair),
1799 Rule::additive_expr => parse_additive_expr(pair),
1800 Rule::multiplicative_expr => parse_multiplicative_expr(pair),
1801 Rule::power_expr => parse_power_expr(pair),
1802 Rule::unary_expr => parse_unary_expr(pair),
1803 Rule::postfix_expr => parse_postfix_expr(pair),
1804 Rule::primary_expr => parse_primary_expr(pair),
1805 Rule::literal => parse_literal(pair),
1806 Rule::identifier => Ok(Expr::Ident(pair.as_str().to_string())),
1807 Rule::if_expr => parse_if_expr(pair),
1808 _ => Ok(Expr::Ident(pair.as_str().to_string())),
1809 }
1810}
1811
1812fn parse_lambda_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
1813 let mut inner = pair.into_inner();
1814 let mut params = Vec::new();
1815
1816 let first = inner.expect_next("lambda parameters")?;
1818 match first.as_rule() {
1819 Rule::identifier_list => {
1820 for p in first.into_inner() {
1821 params.push(p.as_str().to_string());
1822 }
1823 }
1824 Rule::identifier => {
1825 params.push(first.as_str().to_string());
1826 }
1827 _ => {}
1828 }
1829
1830 let body_pair = inner.expect_next("lambda body")?;
1832 let body = match body_pair.as_rule() {
1833 Rule::lambda_block => parse_lambda_block(body_pair)?,
1834 _ => parse_expr_inner(body_pair)?,
1835 };
1836
1837 Ok(Expr::Lambda {
1838 params,
1839 body: Box::new(body),
1840 })
1841}
1842
1843fn parse_lambda_block(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
1844 let mut stmts = Vec::new();
1845 let mut final_expr = None;
1846
1847 for p in pair.into_inner() {
1848 match p.as_rule() {
1849 Rule::statement => {
1850 let stmt = parse_statement(p)?;
1852 match &stmt.node {
1853 Stmt::VarDecl {
1854 mutable,
1855 name,
1856 ty,
1857 value,
1858 } => {
1859 stmts.push((name.clone(), ty.clone(), value.clone(), *mutable));
1860 }
1861 Stmt::Expr(e) => {
1862 final_expr = Some(e.clone());
1864 }
1865 _ => {
1866 }
1868 }
1869 }
1870 _ => {
1871 final_expr = Some(parse_expr_inner(p)?);
1873 }
1874 }
1875 }
1876
1877 if stmts.is_empty() {
1879 Ok(final_expr.unwrap_or(Expr::Null))
1880 } else {
1881 Ok(Expr::Block {
1882 stmts,
1883 result: Box::new(final_expr.unwrap_or(Expr::Null)),
1884 })
1885 }
1886}
1887
1888fn parse_pattern_expr_as_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
1889 let mut inner = pair.into_inner();
1892 let mut left = parse_pattern_and_as_expr(inner.expect_next("pattern expression")?)?;
1893
1894 for right_pair in inner {
1895 let right = parse_pattern_and_as_expr(right_pair)?;
1896 left = Expr::Binary {
1897 op: BinOp::Or,
1898 left: Box::new(left),
1899 right: Box::new(right),
1900 };
1901 }
1902 Ok(left)
1903}
1904
1905fn parse_pattern_and_as_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
1906 let mut inner = pair.into_inner();
1907 let mut left = parse_pattern_xor_as_expr(inner.expect_next("and expression")?)?;
1908
1909 for right_pair in inner {
1910 let right = parse_pattern_xor_as_expr(right_pair)?;
1911 left = Expr::Binary {
1912 op: BinOp::And,
1913 left: Box::new(left),
1914 right: Box::new(right),
1915 };
1916 }
1917 Ok(left)
1918}
1919
1920fn parse_pattern_xor_as_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
1921 let mut inner = pair.into_inner();
1922 let mut left = parse_pattern_unary_as_expr(inner.expect_next("xor expression")?)?;
1923
1924 for right_pair in inner {
1925 let right = parse_pattern_unary_as_expr(right_pair)?;
1926 left = Expr::Binary {
1927 op: BinOp::Xor,
1928 left: Box::new(left),
1929 right: Box::new(right),
1930 };
1931 }
1932 Ok(left)
1933}
1934
1935fn parse_pattern_unary_as_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
1936 let mut inner = pair.into_inner();
1937 let first = inner.expect_next("unary expression or operand")?;
1938
1939 if first.as_str() == "not" {
1940 let expr = parse_pattern_primary_as_expr(inner.expect_next("pattern expression")?)?;
1941 Ok(Expr::Unary {
1942 op: UnaryOp::Not,
1943 expr: Box::new(expr),
1944 })
1945 } else {
1946 parse_pattern_primary_as_expr(first)
1947 }
1948}
1949
1950fn parse_pattern_primary_as_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
1951 let inner = pair
1952 .into_inner()
1953 .expect_next("pattern primary expression")?;
1954
1955 match inner.as_rule() {
1956 Rule::pattern_or_expr => parse_pattern_expr_as_expr(inner),
1957 Rule::pattern_sequence => parse_pattern_sequence_as_expr(inner),
1958 _ => Ok(Expr::Ident(inner.as_str().to_string())),
1959 }
1960}
1961
1962fn parse_pattern_sequence_as_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
1963 let mut inner = pair.into_inner();
1966 let mut left = Expr::Ident(inner.expect_next("sequence start")?.as_str().to_string());
1967
1968 for right_pair in inner {
1969 let right = Expr::Ident(right_pair.as_str().to_string());
1970 left = Expr::Binary {
1971 op: BinOp::FollowedBy,
1972 left: Box::new(left),
1973 right: Box::new(right),
1974 };
1975 }
1976 Ok(left)
1977}
1978
1979fn parse_filter_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
1980 let inner = pair.into_inner().expect_next("filter expression")?;
1981 parse_filter_or_expr(inner)
1982}
1983
1984fn parse_filter_or_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
1985 let mut inner = pair.into_inner();
1986 let mut left = parse_filter_and_expr(inner.expect_next("or expression operand")?)?;
1987
1988 for right_pair in inner {
1989 let right = parse_filter_and_expr(right_pair)?;
1990 left = Expr::Binary {
1991 op: BinOp::Or,
1992 left: Box::new(left),
1993 right: Box::new(right),
1994 };
1995 }
1996 Ok(left)
1997}
1998
1999fn parse_filter_and_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2000 let mut inner = pair.into_inner();
2001 let mut left = parse_filter_not_expr(inner.expect_next("and expression operand")?)?;
2002
2003 for right_pair in inner {
2004 let right = parse_filter_not_expr(right_pair)?;
2005 left = Expr::Binary {
2006 op: BinOp::And,
2007 left: Box::new(left),
2008 right: Box::new(right),
2009 };
2010 }
2011 Ok(left)
2012}
2013
2014fn parse_filter_not_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2015 let mut inner = pair.into_inner();
2016 let first = inner.expect_next("not or expression")?;
2017
2018 if first.as_str() == "not" {
2019 let expr = parse_filter_comparison_expr(inner.expect_next("expression after not")?)?;
2020 Ok(Expr::Unary {
2021 op: UnaryOp::Not,
2022 expr: Box::new(expr),
2023 })
2024 } else {
2025 parse_filter_comparison_expr(first)
2026 }
2027}
2028
2029fn parse_filter_comparison_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2030 let mut inner = pair.into_inner();
2031 let left = parse_filter_additive_expr(inner.expect_next("comparison left operand")?)?;
2032
2033 if let Some(op_pair) = inner.next() {
2034 let op = match op_pair.as_str() {
2035 "==" => BinOp::Eq,
2036 "!=" => BinOp::NotEq,
2037 "<" => BinOp::Lt,
2038 "<=" => BinOp::Le,
2039 ">" => BinOp::Gt,
2040 ">=" => BinOp::Ge,
2041 "in" => BinOp::In,
2042 "is" => BinOp::Is,
2043 s if s.contains("not") && s.contains("in") => BinOp::NotIn,
2044 _ => BinOp::Eq,
2045 };
2046 let right = parse_filter_additive_expr(inner.expect_next("comparison right operand")?)?;
2047 Ok(Expr::Binary {
2048 op,
2049 left: Box::new(left),
2050 right: Box::new(right),
2051 })
2052 } else {
2053 Ok(left)
2054 }
2055}
2056
2057fn parse_filter_additive_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2058 let mut inner = pair.into_inner();
2059 let mut left =
2060 parse_filter_multiplicative_expr(inner.expect_next("additive expression operand")?)?;
2061
2062 while let Some(op_pair) = inner.next() {
2063 let op = if op_pair.as_str() == "-" {
2064 BinOp::Sub
2065 } else {
2066 BinOp::Add
2067 };
2068 if let Some(right_pair) = inner.next() {
2069 let right = parse_filter_multiplicative_expr(right_pair)?;
2070 left = Expr::Binary {
2071 op,
2072 left: Box::new(left),
2073 right: Box::new(right),
2074 };
2075 }
2076 }
2077 Ok(left)
2078}
2079
2080fn parse_filter_multiplicative_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2081 let mut inner = pair.into_inner();
2082 let mut left =
2083 parse_filter_unary_expr(inner.expect_next("multiplicative expression operand")?)?;
2084
2085 while let Some(op_pair) = inner.next() {
2086 let op = match op_pair.as_str() {
2087 "*" => BinOp::Mul,
2088 "/" => BinOp::Div,
2089 "%" => BinOp::Mod,
2090 _ => BinOp::Mul,
2091 };
2092 if let Some(right_pair) = inner.next() {
2093 let right = parse_filter_unary_expr(right_pair)?;
2094 left = Expr::Binary {
2095 op,
2096 left: Box::new(left),
2097 right: Box::new(right),
2098 };
2099 }
2100 }
2101 Ok(left)
2102}
2103
2104fn parse_filter_unary_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2105 let mut inner = pair.into_inner();
2106 let first = inner.expect_next("unary operator or expression")?;
2107
2108 if first.as_rule() == Rule::filter_unary_op {
2110 let op_str = first.as_str();
2111 let expr =
2112 parse_filter_postfix_expr(inner.expect_next("expression after unary operator")?)?;
2113 let op = match op_str {
2114 "-" => UnaryOp::Neg,
2115 "~" => UnaryOp::BitNot,
2116 _ => unreachable!("Grammar only allows - or ~"),
2117 };
2118 Ok(Expr::Unary {
2119 op,
2120 expr: Box::new(expr),
2121 })
2122 } else {
2123 parse_filter_postfix_expr(first)
2124 }
2125}
2126
2127fn parse_filter_postfix_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2128 let mut inner = pair.into_inner();
2129 let mut expr = parse_filter_primary_expr(inner.expect_next("postfix expression base")?)?;
2130
2131 for suffix in inner {
2132 expr = parse_filter_postfix_suffix(expr, suffix)?;
2133 }
2134 Ok(expr)
2135}
2136
2137fn parse_filter_postfix_suffix(expr: Expr, pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2138 let mut inner = pair.into_inner();
2139
2140 if let Some(first) = inner.next() {
2141 match first.as_rule() {
2142 Rule::identifier => {
2143 Ok(Expr::Member {
2145 expr: Box::new(expr),
2146 member: first.as_str().to_string(),
2147 })
2148 }
2149 Rule::optional_member_access => {
2150 let member = first
2151 .into_inner()
2152 .expect_next("member name")?
2153 .as_str()
2154 .to_string();
2155 Ok(Expr::OptionalMember {
2156 expr: Box::new(expr),
2157 member,
2158 })
2159 }
2160 Rule::index_access => {
2161 let index = parse_expr(first.into_inner().expect_next("index expression")?)?;
2162 Ok(Expr::Index {
2163 expr: Box::new(expr),
2164 index: Box::new(index),
2165 })
2166 }
2167 Rule::call_args => {
2168 let args = first
2169 .into_inner()
2170 .filter(|p| p.as_rule() == Rule::arg_list)
2171 .flat_map(|p| p.into_inner())
2172 .map(parse_arg)
2173 .collect::<ParseResult<Vec<_>>>()?;
2174 Ok(Expr::Call {
2175 func: Box::new(expr),
2176 args,
2177 })
2178 }
2179 _ => Ok(expr),
2180 }
2181 } else {
2182 Ok(expr)
2183 }
2184}
2185
2186fn parse_filter_primary_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2187 let inner = pair.into_inner().expect_next("filter primary expression")?;
2188
2189 match inner.as_rule() {
2190 Rule::literal => parse_literal(inner),
2191 Rule::identifier => Ok(Expr::Ident(inner.as_str().to_string())),
2192 Rule::filter_expr => parse_filter_expr(inner),
2193 _ => Ok(Expr::Ident(inner.as_str().to_string())),
2194 }
2195}
2196
2197fn parse_range_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2198 let mut inner = pair.into_inner();
2199 let left = parse_expr_inner(inner.expect_next("range start")?)?;
2200
2201 if let Some(op_pair) = inner.next() {
2202 let inclusive = op_pair.as_str() == "..=";
2203 let right = parse_expr_inner(inner.expect_next("range end")?)?;
2204 Ok(Expr::Range {
2205 start: Box::new(left),
2206 end: Box::new(right),
2207 inclusive,
2208 })
2209 } else {
2210 Ok(left)
2211 }
2212}
2213
2214fn parse_or_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2215 let mut inner = pair.into_inner();
2216 let mut left = parse_expr_inner(inner.expect_next("or expression operand")?)?;
2217
2218 for right_pair in inner {
2219 let right = parse_expr_inner(right_pair)?;
2220 left = Expr::Binary {
2221 op: BinOp::Or,
2222 left: Box::new(left),
2223 right: Box::new(right),
2224 };
2225 }
2226
2227 Ok(left)
2228}
2229
2230fn parse_and_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2231 let mut inner = pair.into_inner();
2232 let mut left = parse_expr_inner(inner.expect_next("and expression operand")?)?;
2233
2234 for right_pair in inner {
2235 let right = parse_expr_inner(right_pair)?;
2236 left = Expr::Binary {
2237 op: BinOp::And,
2238 left: Box::new(left),
2239 right: Box::new(right),
2240 };
2241 }
2242
2243 Ok(left)
2244}
2245
2246fn parse_not_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2247 let mut inner = pair.into_inner();
2248 let first = inner.expect_next("not keyword or expression")?;
2249
2250 if first.as_str() == "not" {
2251 let expr = parse_expr_inner(inner.expect_next("expression after not")?)?;
2252 Ok(Expr::Unary {
2253 op: UnaryOp::Not,
2254 expr: Box::new(expr),
2255 })
2256 } else {
2257 parse_expr_inner(first)
2258 }
2259}
2260
2261fn parse_comparison_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2262 let mut inner = pair.into_inner();
2263 let left = parse_expr_inner(inner.expect_next("comparison left operand")?)?;
2264
2265 if let Some(op_pair) = inner.next() {
2266 let op_str = op_pair.as_str();
2267 let op = match op_str {
2268 "==" => BinOp::Eq,
2269 "!=" => BinOp::NotEq,
2270 "<" => BinOp::Lt,
2271 "<=" => BinOp::Le,
2272 ">" => BinOp::Gt,
2273 ">=" => BinOp::Ge,
2274 "in" => BinOp::In,
2275 "is" => BinOp::Is,
2276 s if s.contains("not") && s.contains("in") => BinOp::NotIn,
2277 _ => BinOp::Eq,
2278 };
2279 let right = parse_expr_inner(inner.expect_next("comparison right operand")?)?;
2280 Ok(Expr::Binary {
2281 op,
2282 left: Box::new(left),
2283 right: Box::new(right),
2284 })
2285 } else {
2286 Ok(left)
2287 }
2288}
2289
2290fn parse_bitwise_or_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2291 parse_binary_chain(pair, BinOp::BitOr)
2292}
2293
2294fn parse_bitwise_xor_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2295 parse_binary_chain(pair, BinOp::BitXor)
2296}
2297
2298fn parse_bitwise_and_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2299 parse_binary_chain(pair, BinOp::BitAnd)
2300}
2301
2302fn parse_shift_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2303 let mut inner = pair.into_inner();
2304 let mut left = parse_expr_inner(inner.expect_next("shift expression operand")?)?;
2305
2306 while let Some(op_or_expr) = inner.next() {
2307 let op = match op_or_expr.as_str() {
2308 "<<" => BinOp::Shl,
2309 ">>" => BinOp::Shr,
2310 _ => {
2311 let right = parse_expr_inner(op_or_expr)?;
2312 left = Expr::Binary {
2313 op: BinOp::Shl,
2314 left: Box::new(left),
2315 right: Box::new(right),
2316 };
2317 continue;
2318 }
2319 };
2320 if let Some(right_pair) = inner.next() {
2321 let right = parse_expr_inner(right_pair)?;
2322 left = Expr::Binary {
2323 op,
2324 left: Box::new(left),
2325 right: Box::new(right),
2326 };
2327 }
2328 }
2329
2330 Ok(left)
2331}
2332
2333fn parse_additive_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2334 let mut inner = pair.into_inner();
2335 let mut left = parse_expr_inner(inner.expect_next("additive expression operand")?)?;
2336
2337 while let Some(op_pair) = inner.next() {
2338 let op_text = op_pair.as_str();
2339 let op = if op_text == "-" {
2340 BinOp::Sub
2341 } else {
2342 BinOp::Add
2343 };
2344
2345 if let Some(right_pair) = inner.next() {
2346 let right = parse_expr_inner(right_pair)?;
2347 left = Expr::Binary {
2348 op,
2349 left: Box::new(left),
2350 right: Box::new(right),
2351 };
2352 }
2353 }
2354
2355 Ok(left)
2356}
2357
2358fn parse_multiplicative_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2359 let mut inner = pair.into_inner();
2360 let mut left = parse_expr_inner(inner.expect_next("multiplicative expression operand")?)?;
2361
2362 while let Some(op_pair) = inner.next() {
2363 let op_text = op_pair.as_str();
2364 let op = match op_text {
2365 "*" => BinOp::Mul,
2366 "/" => BinOp::Div,
2367 "%" => BinOp::Mod,
2368 _ => BinOp::Mul,
2369 };
2370
2371 if let Some(right_pair) = inner.next() {
2372 let right = parse_expr_inner(right_pair)?;
2373 left = Expr::Binary {
2374 op,
2375 left: Box::new(left),
2376 right: Box::new(right),
2377 };
2378 }
2379 }
2380
2381 Ok(left)
2382}
2383
2384fn parse_power_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2385 let mut inner = pair.into_inner();
2386 let base = parse_expr_inner(inner.expect_next("power expression base")?)?;
2387
2388 if let Some(exp_pair) = inner.next() {
2389 let exp = parse_expr_inner(exp_pair)?;
2390 Ok(Expr::Binary {
2391 op: BinOp::Pow,
2392 left: Box::new(base),
2393 right: Box::new(exp),
2394 })
2395 } else {
2396 Ok(base)
2397 }
2398}
2399
2400fn parse_unary_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2401 let mut inner = pair.into_inner();
2402 let first = inner.expect_next("unary operator or expression")?;
2403
2404 match first.as_rule() {
2406 Rule::unary_op => {
2407 let op_str = first.as_str();
2408 let expr = parse_expr_inner(inner.expect_next("expression after unary operator")?)?;
2409 let op = match op_str {
2410 "-" => UnaryOp::Neg,
2411 "~" => UnaryOp::BitNot,
2412 _ => unreachable!("Grammar only allows - or ~"),
2413 };
2414 Ok(Expr::Unary {
2415 op,
2416 expr: Box::new(expr),
2417 })
2418 }
2419 _ => parse_expr_inner(first),
2420 }
2421}
2422
2423fn parse_postfix_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2424 let mut inner = pair.into_inner();
2425 let mut expr = parse_expr_inner(inner.expect_next("postfix expression base")?)?;
2426
2427 for suffix in inner {
2428 expr = parse_postfix_suffix(expr, suffix)?;
2429 }
2430
2431 Ok(expr)
2432}
2433
2434fn parse_postfix_suffix(expr: Expr, pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2435 let inner = pair.into_inner().expect_next("postfix suffix")?;
2436
2437 match inner.as_rule() {
2438 Rule::member_access => {
2439 let member = inner
2440 .into_inner()
2441 .expect_next("member name")?
2442 .as_str()
2443 .to_string();
2444 Ok(Expr::Member {
2445 expr: Box::new(expr),
2446 member,
2447 })
2448 }
2449 Rule::optional_member_access => {
2450 let member = inner
2451 .into_inner()
2452 .expect_next("member name")?
2453 .as_str()
2454 .to_string();
2455 Ok(Expr::OptionalMember {
2456 expr: Box::new(expr),
2457 member,
2458 })
2459 }
2460 Rule::slice_access => {
2461 let slice_range = inner.into_inner().expect_next("slice range")?;
2463 let slice_inner = slice_range.into_inner();
2464
2465 let mut start = None;
2466 let mut end = None;
2467
2468 for p in slice_inner {
2469 match p.as_rule() {
2470 Rule::slice_start => {
2471 start = Some(Box::new(parse_expr_inner(
2472 p.into_inner().expect_next("slice start expression")?,
2473 )?));
2474 }
2475 Rule::slice_end => {
2476 end = Some(Box::new(parse_expr_inner(
2477 p.into_inner().expect_next("slice end expression")?,
2478 )?));
2479 }
2480 _ => {}
2481 }
2482 }
2483
2484 Ok(Expr::Slice {
2485 expr: Box::new(expr),
2486 start,
2487 end,
2488 })
2489 }
2490 Rule::index_access => {
2491 let index = parse_expr(inner.into_inner().expect_next("index expression")?)?;
2492 Ok(Expr::Index {
2493 expr: Box::new(expr),
2494 index: Box::new(index),
2495 })
2496 }
2497 Rule::call_args => {
2498 let mut args = Vec::new();
2499 for p in inner.into_inner() {
2500 if p.as_rule() == Rule::arg_list {
2501 for arg in p.into_inner() {
2502 args.push(parse_arg(arg)?);
2503 }
2504 }
2505 }
2506 Ok(Expr::Call {
2507 func: Box::new(expr),
2508 args,
2509 })
2510 }
2511 _ => Ok(expr),
2512 }
2513}
2514
2515fn parse_arg(pair: pest::iterators::Pair<Rule>) -> ParseResult<Arg> {
2516 let mut inner = pair.into_inner();
2517 let first = inner.expect_next("argument")?;
2518
2519 if let Some(second) = inner.next() {
2520 Ok(Arg::Named(
2521 first.as_str().to_string(),
2522 parse_expr_inner(second)?,
2523 ))
2524 } else {
2525 Ok(Arg::Positional(parse_expr_inner(first)?))
2526 }
2527}
2528
2529fn parse_primary_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2530 let inner = pair.into_inner().expect_next("primary expression")?;
2531
2532 match inner.as_rule() {
2533 Rule::if_expr => parse_if_expr(inner),
2534 Rule::literal => parse_literal(inner),
2535 Rule::identifier => Ok(Expr::Ident(inner.as_str().to_string())),
2536 Rule::array_literal => parse_array_literal(inner),
2537 Rule::map_literal => parse_map_literal(inner),
2538 Rule::expr => parse_expr(inner),
2539 _ => Ok(Expr::Ident(inner.as_str().to_string())),
2540 }
2541}
2542
2543fn parse_if_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2544 let mut inner = pair.into_inner();
2545 let cond = parse_expr_inner(inner.expect_next("if condition")?)?;
2546 let then_branch = parse_expr_inner(inner.expect_next("then branch")?)?;
2547 let else_branch = parse_expr_inner(inner.expect_next("else branch")?)?;
2548
2549 Ok(Expr::If {
2550 cond: Box::new(cond),
2551 then_branch: Box::new(then_branch),
2552 else_branch: Box::new(else_branch),
2553 })
2554}
2555
2556fn parse_literal(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2557 let inner = pair.into_inner().expect_next("literal value")?;
2558
2559 match inner.as_rule() {
2560 Rule::integer => inner
2561 .as_str()
2562 .parse::<i64>()
2563 .map(Expr::Int)
2564 .map_err(|e| ParseError::InvalidNumber(format!("'{}': {}", inner.as_str(), e))),
2565 Rule::float => inner
2566 .as_str()
2567 .parse::<f64>()
2568 .map(Expr::Float)
2569 .map_err(|e| ParseError::InvalidNumber(format!("'{}': {}", inner.as_str(), e))),
2570 Rule::string => {
2571 let s = inner.as_str();
2572 Ok(Expr::Str(s[1..s.len() - 1].to_string()))
2573 }
2574 Rule::duration => Ok(Expr::Duration(
2575 parse_duration(inner.as_str()).map_err(ParseError::InvalidDuration)?,
2576 )),
2577 Rule::timestamp => Ok(Expr::Timestamp(parse_timestamp(inner.as_str()))),
2578 Rule::boolean => Ok(Expr::Bool(inner.as_str() == "true")),
2579 Rule::null => Ok(Expr::Null),
2580 _ => Ok(Expr::Null),
2581 }
2582}
2583
2584fn parse_array_literal(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2585 let mut items = Vec::new();
2586 for p in pair.into_inner() {
2587 if p.as_rule() == Rule::expr_list {
2588 for expr in p.into_inner() {
2589 items.push(parse_expr(expr)?);
2590 }
2591 }
2592 }
2593 Ok(Expr::Array(items))
2594}
2595
2596fn parse_map_literal(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2597 let mut entries = Vec::new();
2598 for p in pair.into_inner() {
2599 if p.as_rule() == Rule::map_entry_list {
2600 for entry in p.into_inner() {
2601 let mut inner = entry.into_inner();
2602 let key = inner.expect_next("map key")?.as_str().to_string();
2603 let key = if key.starts_with('"') {
2604 key[1..key.len() - 1].to_string()
2605 } else {
2606 key
2607 };
2608 let value = parse_expr(inner.expect_next("map value")?)?;
2609 entries.push((key, value));
2610 }
2611 }
2612 }
2613 Ok(Expr::Map(entries))
2614}
2615
2616fn parse_binary_chain(pair: pest::iterators::Pair<Rule>, op: BinOp) -> ParseResult<Expr> {
2617 let mut inner = pair.into_inner();
2618 let mut left = parse_expr_inner(inner.expect_next("binary chain operand")?)?;
2619
2620 for right_pair in inner {
2621 let right = parse_expr_inner(right_pair)?;
2622 left = Expr::Binary {
2623 op,
2624 left: Box::new(left),
2625 right: Box::new(right),
2626 };
2627 }
2628
2629 Ok(left)
2630}
2631
#[cfg(test)]
mod tests {
    use super::*;

    /// Parses `source` and returns the program, failing the calling test with
    /// the parser error when the input is rejected.
    #[track_caller]
    fn parse_ok(source: &str) -> Program {
        match parse(source) {
            Ok(program) => program,
            Err(err) => panic!("Failed: {:?}", Some(err)),
        }
    }

    /// Asserts that `input` is rejected and that the rejection takes less than
    /// 100 ms; `rejection_msg` is the failure message used when it is accepted.
    #[track_caller]
    fn assert_rejected_fast(input: &str, rejection_msg: &str) {
        let timer = std::time::Instant::now();
        let outcome = parse(input);
        let took = timer.elapsed();
        assert!(outcome.is_err(), "{}", rejection_msg);
        assert!(
            took.as_millis() < 100,
            "Parser should reject fast, took {:?}",
            took
        );
    }

    /// Asserts that parsing `input` does not fail with a nesting-depth error.
    /// Success and any other kind of error are both accepted.
    #[track_caller]
    fn assert_no_depth_error(input: &str, context: &str) {
        if let Err(err) = parse(input) {
            let msg = err.to_string();
            assert!(!msg.contains("Nesting depth"), "{}: {}", context, msg);
        }
    }

    #[test]
    fn test_parse_simple_stream() {
        parse_ok("stream output = input");
    }

    #[test]
    fn test_parse_stream_with_filter() {
        parse_ok("stream output = input.where(value > 100)");
    }

    #[test]
    fn test_parse_stream_with_map() {
        parse_ok("stream output = input.map(x * 2)");
    }

    #[test]
    fn test_parse_event_declaration() {
        parse_ok("event SensorReading:\n sensor_id: str\n value: float");
    }

    #[test]
    fn test_parse_variable() {
        parse_ok("let x = 42");
    }

    #[test]
    fn test_parse_function() {
        parse_ok("fn add(a: int, b: int) -> int:\n return a + b");
    }

    #[test]
    fn test_parse_lambda() {
        parse_ok("let f = (x) => x * 2");
    }

    #[test]
    fn test_parse_if_expression() {
        parse_ok("let x = if a > b then a else b");
    }

    #[test]
    fn test_parse_followed_by() {
        parse_ok("stream alerts = orders.where(amount > 1000) -> Payment where payment.order_id == orders.id");
    }

    #[test]
    fn test_parse_window() {
        parse_ok("stream windowed = input.window(5s)");
    }

    #[test]
    fn test_parse_aggregate() {
        parse_ok("stream stats = input.window(1m).aggregate(count: count(), avg: avg(value))");
    }

    #[test]
    fn test_parse_merge() {
        parse_ok("stream combined = merge(stream1, stream2)");
    }

    #[test]
    fn test_parse_sequence() {
        parse_ok("stream seq = sequence(a: EventA, b: EventB where b.id == a.id)");
    }

    #[test]
    fn test_parse_config() {
        parse_ok("config:\n window_size: 5s\n batch_size: 100");
    }

    #[test]
    fn test_parse_complex_expression() {
        parse_ok("let x = (a + b) * c / d - e");
    }

    #[test]
    fn test_parse_sliding_window() {
        parse_ok("stream output = input.window(5m, sliding: 1m)");
    }

    #[test]
    fn test_parse_fork_construct() {
        parse_ok("stream forked = input.fork(branch1: .where(x > 0), branch2: .where(x < 0))");
    }

    #[test]
    fn test_parse_aggregate_functions() {
        parse_ok(
            "stream stats = input.window(1h).aggregate(total: sum(value), average: avg(value))",
        );
    }

    #[test]
    fn test_parse_complex_parentheses() {
        parse_ok("let x = ((a + b) * (c - d)) / e");
    }

    #[test]
    fn test_parse_sequence_with_alias() {
        parse_ok(
            r#"
 stream TwoTicks = StockTick as first
 -> StockTick as second
 .emit(result: "two_ticks")
 "#,
        );
    }

    #[test]
    fn test_parse_followed_by_with_alias() {
        parse_ok("stream alerts = Order as a -> Payment as b");
    }

    #[test]
    fn test_parse_followed_by_with_filter_and_alias() {
        parse_ok(
            r#"
 stream Test = A as a
 -> B where value == a.base + 10 as b
 .emit(status: "matched")
 "#,
        );
    }

    #[test]
    fn test_parse_pattern_with_lambda() {
        parse_ok("stream Test = Trade.window(1m).pattern(p: x => x.len() > 3).emit(alert: true)");
    }

    #[test]
    fn test_parse_sase_pattern_decl_simple() {
        parse_ok("pattern SimpleAlert = SEQ(Login, Transaction)");
    }

    #[test]
    fn test_parse_sase_pattern_decl_with_kleene() {
        parse_ok("pattern MultiTx = SEQ(Login, Transaction+ where amount > 1000)");
    }

    #[test]
    fn test_parse_sase_pattern_decl_with_alias() {
        parse_ok("pattern AliasedPattern = SEQ(Login as login, Transaction as tx)");
    }

    #[test]
    fn test_parse_sase_pattern_decl_with_within() {
        parse_ok("pattern TimedPattern = SEQ(A, B) within 10m");
    }

    #[test]
    fn test_parse_sase_pattern_decl_with_partition() {
        parse_ok("pattern PartitionedPattern = SEQ(A, B) partition by user_id");
    }

    #[test]
    fn test_parse_sase_pattern_decl_full() {
        parse_ok(
            "pattern SuspiciousActivity = SEQ(Transaction+ where amount > 1000 as txs) within 10m partition by user_id"
        );
    }

    #[test]
    fn test_parse_sase_pattern_decl_or() {
        parse_ok("pattern AlertOrWarn = Login OR Logout");
    }

    #[test]
    fn test_parse_sase_pattern_decl_and() {
        parse_ok("pattern BothEvents = Login AND Transaction");
    }

    #[test]
    fn test_parse_sase_pattern_decl_not() {
        parse_ok("pattern NoLogout = SEQ(Login, NOT Logout, Transaction)");
    }

    #[test]
    fn test_parse_having() {
        parse_ok(
            "stream filtered = input.window(1m).aggregate(count: count(), total: sum(value)).having(count > 10)",
        );
    }

    #[test]
    fn test_parse_having_with_partition() {
        parse_ok(
            "stream grouped = input.partition_by(category).window(5m).aggregate(avg_price: avg(price)).having(avg_price > 100.0)",
        );
    }

    #[test]
    fn test_parse_timer_source() {
        parse_ok("stream heartbeat = timer(5s).emit(type: \"heartbeat\")");
    }

    #[test]
    fn test_parse_timer_source_with_initial_delay() {
        parse_ok("stream delayed_timer = timer(1m, initial_delay: 10s).emit(type: \"periodic\")");
    }

    #[test]
    fn test_parse_var_decl() {
        parse_ok("var threshold: float = 10.0");
    }

    #[test]
    fn test_parse_let_decl() {
        parse_ok("let max_count: int = 100");
    }

    #[test]
    fn test_parse_assignment() {
        parse_ok("threshold := threshold + 10.0");
    }

    #[test]
    fn test_parse_assignment_with_expression() {
        parse_ok("count := count * 2 + offset");
    }

    #[test]
    fn test_parse_nested_stream_reference() {
        parse_ok("stream Base = Event\nstream Derived = Base.where(x > 0)");
    }

    #[test]
    fn test_parse_multi_stage_pipeline() {
        parse_ok(
            "stream L1 = Raw\nstream L2 = L1.where(a > 1)\nstream L3 = L2.window(5).aggregate(cnt: count())",
        );
    }

    #[test]
    fn test_parse_stream_with_operations_chain() {
        parse_ok(
            "stream Processed = Source.where(valid).window(10).aggregate(sum: sum(value)).having(sum > 100)",
        );
    }

    #[test]
    fn test_parse_connector_mqtt() {
        parse_ok(
            r#"connector MqttBroker = mqtt (
 host: "localhost",
 port: 1883,
 client_id: "varpulis"
 )"#,
        );
    }

    #[test]
    fn test_parse_connector_kafka() {
        parse_ok(
            r#"connector KafkaCluster = kafka (
 brokers: ["kafka1:9092", "kafka2:9092"],
 group_id: "my-group"
 )"#,
        );
    }

    #[test]
    fn test_parse_connector_http() {
        parse_ok(
            r#"connector ApiEndpoint = http (
 base_url: "https://api.example.com"
 )"#,
        );
    }

    #[test]
    fn test_parse_stream_with_from_connector() {
        parse_ok(
            r#"stream Temperatures = TemperatureReading.from(MqttSensors, topic: "sensors/temp/#")"#,
        );
    }

    #[test]
    fn test_parse_stream_with_from_and_operations() {
        parse_ok(
            r#"stream HighTemp = TemperatureReading
 .from(MqttSensors, topic: "sensors/#")
 .where(value > 30)
 .emit(alert: "high_temp")"#,
        );
    }

    #[test]
    fn test_parse_full_connectivity_pipeline() {
        parse_ok(
            r#"
 connector MqttSensors = mqtt (host: "localhost", port: 1883)
 connector KafkaAlerts = kafka (brokers: ["kafka:9092"])

 event TemperatureReading:
 sensor_id: str
 value: float
 ts: timestamp

 stream Temperatures = TemperatureReading.from(MqttSensors, topic: "sensors/#")

 stream HighTempAlert = Temperatures
 .where(value > 30)
 .emit(alert_type: "HIGH_TEMP", temperature: value)

 "#,
        );
    }

    #[test]
    fn test_parse_emit_as_type() {
        parse_ok(
            r#"stream Alerts = Temperatures
 .where(value > 30)
 .emit as AlertEvent(severity: "high", temp: value)"#,
        );
    }

    #[test]
    fn test_parse_stream_with_to_connector() {
        parse_ok(
            r#"stream Output = Input
 .where(x > 0)
 .emit(y: x * 2)
 .to(KafkaOutput, topic: "output")"#,
        );
    }

    #[test]
    fn test_emit_stmt_parses() {
        let program = parse_ok(
            r#"fn test():
 emit Pixel(x: 1, y: 2)"#,
        );
        let body = match &program.statements[0].node {
            Stmt::FnDecl { body, .. } => body,
            _ => panic!("Expected FnDecl"),
        };
        match &body[0].node {
            Stmt::Emit { event_type, fields } => {
                assert_eq!(event_type, "Pixel");
                assert_eq!(fields.len(), 2);
                assert_eq!(fields[0].name, "x");
                assert_eq!(fields[1].name, "y");
            }
            other => panic!("Expected Stmt::Emit, got {:?}", other),
        }
    }

    #[test]
    fn test_emit_stmt_no_args() {
        let program = parse_ok(
            r#"fn test():
 emit Done()"#,
        );
        let body = match &program.statements[0].node {
            Stmt::FnDecl { body, .. } => body,
            _ => panic!("Expected FnDecl"),
        };
        match &body[0].node {
            Stmt::Emit { event_type, fields } => {
                assert_eq!(event_type, "Done");
                assert!(fields.is_empty());
            }
            other => panic!("Expected Stmt::Emit, got {:?}", other),
        }
    }

    #[test]
    fn test_emit_in_function_with_for_loop() {
        parse_ok(
            r#"fn generate(n: int):
 for i in 0..n:
 emit Item(index: i, value: i * 2)"#,
        );
    }

    #[test]
    fn test_parse_process_op() {
        parse_ok(
            r#"fn do_work():
 emit Result(v: 42)

stream S = timer(1s).process(do_work())"#,
        );
    }

    #[test]
    fn test_parse_trend_aggregate_count_trends() {
        let program = parse_ok(
            r#"stream S = StockTick as first
 -> all StockTick where price > first.price as rising
 -> StockTick where price < rising.price as drop
 .within(60s)
 .trend_aggregate(count: count_trends())
 .emit(event_type: "TrendStats", trends: count)"#,
        );

        // Only the first stream declaration is inspected.
        let ops = program
            .statements
            .iter()
            .find_map(|stmt| match &stmt.node {
                Stmt::StreamDecl { ops, .. } => Some(ops),
                _ => None,
            })
            .expect("No stream declaration found");

        let aggregates: Vec<_> = ops
            .iter()
            .filter_map(|op| match op {
                StreamOp::TrendAggregate(items) => Some(items),
                _ => None,
            })
            .collect();
        assert!(
            !aggregates.is_empty(),
            "Expected TrendAggregate op in stream ops"
        );

        for items in aggregates {
            assert_eq!(items.len(), 1);
            assert_eq!(items[0].alias, "count");
            assert_eq!(items[0].func, "count_trends");
            assert!(items[0].arg.is_none());
        }
    }

    #[test]
    fn test_parse_trend_aggregate_multiple_items() {
        let program = parse_ok(
            r#"stream S = StockTick as first
 -> all StockTick as rising
 .within(60s)
 .trend_aggregate(
 trend_count: count_trends(),
 event_count: count_events(rising)
 )
 .emit(trends: trend_count, events: event_count)"#,
        );

        // First TrendAggregate op across all stream declarations.
        let items = program
            .statements
            .iter()
            .filter_map(|stmt| match &stmt.node {
                Stmt::StreamDecl { ops, .. } => Some(ops),
                _ => None,
            })
            .flatten()
            .find_map(|op| match op {
                StreamOp::TrendAggregate(items) => Some(items),
                _ => None,
            })
            .expect("No TrendAggregate found");

        assert_eq!(items.len(), 2);
        assert_eq!(items[0].alias, "trend_count");
        assert_eq!(items[0].func, "count_trends");
        assert_eq!(items[1].alias, "event_count");
        assert_eq!(items[1].func, "count_events");
        assert!(items[1].arg.is_some());
    }

    #[test]
    fn test_parse_score_basic() {
        let program = parse_ok(
            r#"stream S = TradeEvent
 .score(model: "models/fraud.onnx", inputs: [amount, risk_score], outputs: [fraud_prob, category])"#,
        );

        // First Score op across all stream declarations.
        let spec = program
            .statements
            .iter()
            .filter_map(|stmt| match &stmt.node {
                Stmt::StreamDecl { ops, .. } => Some(ops),
                _ => None,
            })
            .flatten()
            .find_map(|op| match op {
                StreamOp::Score(spec) => Some(spec),
                _ => None,
            })
            .expect("No Score op found");

        assert_eq!(spec.model_path, "models/fraud.onnx");
        assert_eq!(spec.inputs, vec!["amount", "risk_score"]);
        assert_eq!(spec.outputs, vec!["fraud_prob", "category"]);
    }

    #[test]
    fn test_parse_score_single_field() {
        let program = parse_ok(
            r#"stream S = Event
 .score(model: "model.onnx", inputs: [value], outputs: [prediction])"#,
        );

        // First Score op across all stream declarations.
        let spec = program
            .statements
            .iter()
            .filter_map(|stmt| match &stmt.node {
                Stmt::StreamDecl { ops, .. } => Some(ops),
                _ => None,
            })
            .flatten()
            .find_map(|op| match op {
                StreamOp::Score(spec) => Some(spec),
                _ => None,
            })
            .expect("No Score op found");

        assert_eq!(spec.model_path, "model.onnx");
        assert_eq!(spec.inputs, vec!["value"]);
        assert_eq!(spec.outputs, vec!["prediction"]);
    }

    #[test]
    fn fuzz_regression_unmatched_brackets_timeout() {
        assert_rejected_fast(
            "c2222222s[s[22s[U2s[U6[U6[22222222s[s[22s[U2s[U6[U6[222*2222s[U6[U6[222*2222s[22s[U6[U6[22*2222s[U6[U6[222*2222s[22s[U6[U6[222*26[U6[222*2",
            "Should reject deeply nested unmatched brackets",
        );
    }

    #[test]
    fn fuzz_regression_deeply_nested_brackets_slow_unit() {
        assert_rejected_fast(
            "stream x[[[[[[[[[[[[[[[[[[[[[[[[[[[[[[[",
            "Should reject deeply nested brackets",
        );
    }

    #[test]
    fn nesting_depth_allows_reasonable_programs() {
        assert_no_depth_error(
            "let x = foo(bar(baz(qux(a, [1, [2, [3, [4]]]]))))",
            "Should allow 10 levels of nesting",
        );
    }

    #[test]
    fn nesting_depth_ignores_brackets_in_comments() {
        let result = parse("# [[[[[[[[[[[[[[[[[[[[[[[[[[\nstream x = y");
        if let Err(err) = result {
            panic!("Brackets in comments should be ignored: {:?}", Some(err));
        }
    }

    #[test]
    fn nesting_depth_ignores_brackets_in_strings() {
        assert_no_depth_error(
            r#"let x = "[[[[[[[[[[[[[[[[[[[[[[[[[[""#,
            "Brackets in strings should be ignored",
        );
    }
}