1use pest::Parser;
9use pest_derive::Parser;
10
11use crate::error::{ParseError, ParseResult};
12use crate::helpers::{parse_duration, parse_timestamp};
13use crate::indent::preprocess_indentation;
14use varpulis_core::ast::*;
15use varpulis_core::span::{Span, Spanned};
16use varpulis_core::types::Type;
17
18trait IteratorExt<'a> {
20 fn expect_next(&mut self, expected: &str) -> ParseResult<pest::iterators::Pair<'a, Rule>>;
22}
23
24impl<'a> IteratorExt<'a> for pest::iterators::Pairs<'a, Rule> {
25 fn expect_next(&mut self, expected: &str) -> ParseResult<pest::iterators::Pair<'a, Rule>> {
26 self.next().ok_or_else(|| ParseError::Located {
27 line: 0,
28 column: 0,
29 position: 0,
30 message: format!("Expected {expected}"),
31 hint: None,
32 })
33 }
34}
35
36#[derive(Debug, Parser)]
41#[grammar = "varpulis.pest"]
42pub struct VarpulisParser;
43
44pub fn parse(source: &str) -> ParseResult<Program> {
50 let source = source.to_string();
51 std::thread::Builder::new()
52 .stack_size(16 * 1024 * 1024)
53 .spawn(move || parse_inner(&source))
54 .map_err(|e| ParseError::InvalidToken {
55 position: 0,
56 message: format!("Failed to spawn parser thread: {e}"),
57 })?
58 .join()
59 .unwrap_or_else(|_| {
60 Err(ParseError::InvalidToken {
61 position: 0,
62 message: "Parser stack overflow on deeply nested input".to_string(),
63 })
64 })
65}
66
67const MAX_NESTING_DEPTH: usize = 10;
76
77fn check_nesting_depth(source: &str) -> ParseResult<()> {
81 let mut depth: usize = 0;
82 let mut max_depth: usize = 0;
83 let mut max_depth_pos: usize = 0;
84 let bytes = source.as_bytes();
85 let len = bytes.len();
86 let mut i = 0;
87
88 while i < len {
89 let b = bytes[i];
90
91 if b == b'"' {
93 i += 1;
94 while i < len {
95 if bytes[i] == b'\\' {
96 i += 2; continue;
98 }
99 if bytes[i] == b'"' {
100 i += 1;
101 break;
102 }
103 i += 1;
104 }
105 continue;
106 }
107
108 if b == b'#' {
110 i += 1;
111 while i < len && bytes[i] != b'\n' {
112 i += 1;
113 }
114 continue;
115 }
116
117 if b == b'/' && i + 1 < len && bytes[i + 1] == b'*' {
119 i += 2;
120 while i + 1 < len {
121 if bytes[i] == b'*' && bytes[i + 1] == b'/' {
122 i += 2;
123 break;
124 }
125 i += 1;
126 }
127 continue;
128 }
129
130 if b == b'(' || b == b'[' || b == b'{' {
132 depth += 1;
133 if depth > max_depth {
134 max_depth = depth;
135 max_depth_pos = i;
136 }
137 } else if b == b')' || b == b']' || b == b'}' {
138 depth = depth.saturating_sub(1);
139 }
140
141 if max_depth > MAX_NESTING_DEPTH {
142 return Err(ParseError::InvalidToken {
143 position: max_depth_pos,
144 message: format!("Nesting depth exceeds maximum of {MAX_NESTING_DEPTH} levels"),
145 });
146 }
147
148 i += 1;
149 }
150
151 Ok(())
152}
153
154fn parse_inner(source: &str) -> ParseResult<Program> {
155 let expanded =
157 crate::expand::expand_declaration_loops(source).map_err(|e| ParseError::InvalidToken {
158 position: 0,
159 message: e,
160 })?;
161 let preprocessed = preprocess_indentation(&expanded);
163
164 check_nesting_depth(&preprocessed)?;
166
167 let pairs = VarpulisParser::parse(Rule::program, &preprocessed).map_err(convert_pest_error)?;
168
169 let mut statements = Vec::new();
170
171 for pair in pairs {
172 if pair.as_rule() == Rule::program {
173 for inner in pair.into_inner() {
174 if inner.as_rule() == Rule::statement {
175 statements.push(parse_statement(inner)?);
176 }
177 }
178 }
179 }
180
181 Ok(crate::optimize::fold_program(Program { statements }))
182}
183
184fn convert_pest_error(e: pest::error::Error<Rule>) -> ParseError {
185 let position = match e.location {
186 pest::error::InputLocation::Pos(p) => p,
187 pest::error::InputLocation::Span((s, _)) => s,
188 };
189
190 let (line, column) = match e.line_col {
192 pest::error::LineColLocation::Pos((l, c)) => (l, c),
193 pest::error::LineColLocation::Span((l, c), _) => (l, c),
194 };
195
196 let (message, hint) = match &e.variant {
198 pest::error::ErrorVariant::ParsingError {
199 positives,
200 negatives: _,
201 } => {
202 if positives.is_empty() {
203 ("Unexpected token".to_string(), None)
204 } else if is_stream_op_error(positives) {
205 (
207 "unknown stream operation".to_string(),
208 Some(
209 "valid operations: .where(), .select(), .emit(), .window(), .aggregate(), \
210 .partition_by(), .within(), .having(), .to(), .context(), .log(), .print(), \
211 .enrich(), .forecast(), .trend_aggregate(), .watermark(), .tap()"
212 .to_string(),
213 ),
214 )
215 } else {
216 let expected: Vec<String> = positives.iter().map(format_rule_name).collect();
217 if expected.len() == 1 {
218 (format!("Expected {}", expected[0]), None)
219 } else {
220 (format!("Expected one of: {}", expected.join(", ")), None)
221 }
222 }
223 }
224 pest::error::ErrorVariant::CustomError { message } => (message.clone(), None),
225 };
226
227 ParseError::Located {
228 line,
229 column,
230 position,
231 message,
232 hint,
233 }
234}
235
236fn format_rule_name(rule: &Rule) -> String {
238 match rule {
239 Rule::identifier => "identifier".to_string(),
240 Rule::integer => "number".to_string(),
241 Rule::float => "number".to_string(),
242 Rule::string => "string".to_string(),
243 Rule::primitive_type => "type (int, float, bool, str, timestamp, duration)".to_string(),
244 Rule::type_expr => "type".to_string(),
245 Rule::expr => "expression".to_string(),
246 Rule::statement => "statement".to_string(),
247 Rule::context_decl => "context declaration".to_string(),
248 Rule::stream_decl => "stream declaration".to_string(),
249 Rule::pattern_decl => "pattern declaration".to_string(),
250 Rule::event_decl => "event declaration".to_string(),
251 Rule::fn_decl => "function declaration".to_string(),
252 Rule::INDENT => "indented block".to_string(),
253 Rule::DEDENT => "end of block".to_string(),
254 Rule::field => "field declaration (name: type)".to_string(),
255 Rule::comparison_op => "comparison operator (==, !=, <, >, <=, >=)".to_string(),
256 Rule::additive_op => "operator (+, -)".to_string(),
257 Rule::multiplicative_op => "operator (*, /, %)".to_string(),
258 Rule::postfix_suffix => "method call or member access".to_string(),
259 Rule::sase_pattern_expr => "SASE pattern expression".to_string(),
260 Rule::sase_seq_expr => "SEQ expression".to_string(),
261 Rule::kleene_op => "Kleene operator (+, *, ?)".to_string(),
262 _ => format!("{rule:?}").to_lowercase().replace('_', " "),
263 }
264}
265
266fn is_stream_op_error(positives: &[Rule]) -> bool {
270 const STREAM_OP_RULES: &[Rule] = &[
271 Rule::context_op,
272 Rule::where_op,
273 Rule::select_op,
274 Rule::window_op,
275 Rule::aggregate_op,
276 Rule::having_op,
277 Rule::partition_by_op,
278 Rule::order_by_op,
279 Rule::limit_op,
280 Rule::distinct_op,
281 Rule::map_op,
282 Rule::filter_op,
283 Rule::tap_op,
284 Rule::print_op,
285 Rule::log_op,
286 Rule::emit_op,
287 Rule::to_op,
288 Rule::pattern_op,
289 Rule::concurrent_op,
290 Rule::process_op,
291 Rule::on_error_op,
292 Rule::collect_op,
293 Rule::on_op,
294 Rule::within_op,
295 Rule::not_op,
296 Rule::fork_op,
297 Rule::any_op,
298 Rule::all_op,
299 Rule::first_op,
300 Rule::watermark_op,
301 Rule::allowed_lateness_op,
302 Rule::trend_aggregate_op,
303 Rule::score_op,
304 Rule::forecast_op,
305 Rule::enrich_op,
306 ];
307 positives.len() >= 10 && positives.iter().all(|r| STREAM_OP_RULES.contains(r))
308}
309
310fn parse_statement(pair: pest::iterators::Pair<Rule>) -> ParseResult<Spanned<Stmt>> {
311 let span = Span::new(pair.as_span().start(), pair.as_span().end());
312 let inner = pair.into_inner().expect_next("statement body")?;
313
314 let stmt = match inner.as_rule() {
315 Rule::context_decl => parse_context_decl(inner)?,
316 Rule::connector_decl => parse_connector_decl(inner)?,
317 Rule::stream_decl => parse_stream_decl(inner)?,
318 Rule::pattern_decl => parse_pattern_decl(inner)?,
319 Rule::event_decl => parse_event_decl(inner)?,
320 Rule::type_decl => parse_type_decl(inner)?,
321 Rule::var_decl => parse_var_decl(inner)?,
322 Rule::const_decl => parse_const_decl(inner)?,
323 Rule::fn_decl => parse_fn_decl(inner)?,
324 Rule::config_block => parse_config_block(inner)?,
325 Rule::import_stmt => parse_import_stmt(inner)?,
326 Rule::if_stmt => parse_if_stmt(inner)?,
327 Rule::for_stmt => parse_for_stmt(inner)?,
328 Rule::while_stmt => parse_while_stmt(inner)?,
329 Rule::return_stmt => parse_return_stmt(inner)?,
330 Rule::break_stmt => Stmt::Break,
331 Rule::continue_stmt => Stmt::Continue,
332 Rule::emit_stmt => parse_emit_stmt(inner)?,
333 Rule::assignment_stmt => {
334 let mut inner = inner.into_inner();
335 let name = inner.expect_next("variable name")?.as_str().to_string();
336 let value = parse_expr(inner.expect_next("assignment value")?)?;
337 Stmt::Assignment { name, value }
338 }
339 Rule::expr_stmt => Stmt::Expr(parse_expr(inner.into_inner().expect_next("expression")?)?),
340 _ => {
341 return Err(ParseError::UnexpectedToken {
342 position: span.start,
343 expected: "statement".to_string(),
344 found: format!("{:?}", inner.as_rule()),
345 })
346 }
347 };
348
349 Ok(Spanned::new(stmt, span))
350}
351
352fn parse_context_decl(pair: pest::iterators::Pair<Rule>) -> ParseResult<Stmt> {
357 let mut inner = pair.into_inner();
358 let name = inner.expect_next("context name")?.as_str().to_string();
359 let mut cores = None;
360
361 for p in inner {
362 if p.as_rule() == Rule::context_params {
363 for param in p.into_inner() {
364 if param.as_rule() == Rule::context_param {
365 let core_ids: Vec<usize> = param
367 .into_inner()
368 .filter(|p| p.as_rule() == Rule::integer)
369 .map(|p| p.as_str().parse::<usize>().unwrap_or(0))
370 .collect();
371 cores = Some(core_ids);
372 }
373 }
374 }
375 }
376
377 Ok(Stmt::ContextDecl { name, cores })
378}
379
380fn parse_connector_decl(pair: pest::iterators::Pair<Rule>) -> ParseResult<Stmt> {
385 let mut inner = pair.into_inner();
386 let name = inner.expect_next("connector name")?.as_str().to_string();
387 let connector_type = inner.expect_next("connector type")?.as_str().to_string();
388 let mut params = Vec::new();
389
390 for p in inner {
391 if p.as_rule() == Rule::connector_params {
392 params = parse_connector_params(p)?;
393 }
394 }
395
396 Ok(Stmt::ConnectorDecl {
397 name,
398 connector_type,
399 params,
400 })
401}
402
403fn parse_connector_params(pair: pest::iterators::Pair<Rule>) -> ParseResult<Vec<ConnectorParam>> {
404 let mut params = Vec::new();
405 for p in pair.into_inner() {
406 if p.as_rule() == Rule::connector_param {
407 let mut inner = p.into_inner();
408 let name = inner.expect_next("param name")?.as_str().to_string();
409 let value_pair = inner.expect_next("param value")?;
410 let value = parse_config_value(value_pair)?;
411 params.push(ConnectorParam { name, value });
412 }
413 }
414 Ok(params)
415}
416
417fn parse_stream_decl(pair: pest::iterators::Pair<Rule>) -> ParseResult<Stmt> {
418 let mut inner = pair.into_inner();
419 let name = inner.expect_next("stream name")?.as_str().to_string();
420
421 let mut type_annotation = None;
422 let mut source = StreamSource::Ident(String::new());
423 let mut ops = Vec::new();
424 let mut op_spans = Vec::new();
425
426 for p in inner {
427 match p.as_rule() {
428 Rule::type_annotation => {
429 type_annotation = Some(parse_type(p.into_inner().expect_next("type")?)?);
430 }
431 Rule::stream_expr => {
432 let (s, o, spans) = parse_stream_expr(p)?;
433 source = s;
434 ops = o;
435 op_spans = spans;
436 }
437 _ => {}
438 }
439 }
440
441 Ok(Stmt::StreamDecl {
442 name,
443 type_annotation,
444 source,
445 ops,
446 op_spans,
447 })
448}
449
450fn parse_stream_expr(
451 pair: pest::iterators::Pair<Rule>,
452) -> ParseResult<(StreamSource, Vec<StreamOp>, Vec<varpulis_core::span::Span>)> {
453 let mut inner = pair.into_inner();
454 let source = parse_stream_source(inner.expect_next("stream source")?)?;
455 let mut ops = Vec::new();
456 let mut op_spans = Vec::new();
457
458 for p in inner {
459 if p.as_rule() == Rule::stream_op {
460 let pest_span = p.as_span();
461 let span = varpulis_core::span::Span::new(pest_span.start(), pest_span.end());
462 ops.push(parse_stream_op(p)?);
463 op_spans.push(span);
464 }
465 }
466
467 Ok((source, ops, op_spans))
468}
469
470fn parse_pattern_decl(pair: pest::iterators::Pair<Rule>) -> ParseResult<Stmt> {
475 let mut inner = pair.into_inner();
476 let name = inner.expect_next("pattern name")?.as_str().to_string();
477
478 let mut expr = SasePatternExpr::Event(String::new());
479 let mut within = None;
480 let mut partition_by = None;
481
482 for p in inner {
483 match p.as_rule() {
484 Rule::sase_pattern_expr => {
485 expr = parse_sase_pattern_expr(p)?;
486 }
487 Rule::pattern_within_clause => {
488 let dur_pair = p.into_inner().expect_next("within duration")?;
489 within = Some(Expr::Duration(
490 parse_duration(dur_pair.as_str()).map_err(ParseError::InvalidDuration)?,
491 ));
492 }
493 Rule::pattern_partition_clause => {
494 let key = p
495 .into_inner()
496 .expect_next("partition key")?
497 .as_str()
498 .to_string();
499 partition_by = Some(Expr::Ident(key));
500 }
501 _ => {}
502 }
503 }
504
505 Ok(Stmt::PatternDecl {
506 name,
507 expr,
508 within,
509 partition_by,
510 })
511}
512
513fn parse_sase_pattern_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<SasePatternExpr> {
514 let inner = pair.into_inner().expect_next("SASE pattern expression")?;
515 parse_sase_or_expr(inner)
516}
517
518fn parse_sase_or_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<SasePatternExpr> {
519 let mut inner = pair.into_inner();
520 let mut left = parse_sase_and_expr(inner.expect_next("OR expression operand")?)?;
521
522 for right_pair in inner {
523 let right = parse_sase_and_expr(right_pair)?;
524 left = SasePatternExpr::Or(Box::new(left), Box::new(right));
525 }
526
527 Ok(left)
528}
529
530fn parse_sase_and_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<SasePatternExpr> {
531 let mut inner = pair.into_inner();
532 let mut left = parse_sase_not_expr(inner.expect_next("AND expression operand")?)?;
533
534 for right_pair in inner {
535 let right = parse_sase_not_expr(right_pair)?;
536 left = SasePatternExpr::And(Box::new(left), Box::new(right));
537 }
538
539 Ok(left)
540}
541
542fn parse_sase_not_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<SasePatternExpr> {
543 let mut inner = pair.into_inner();
544 let first = inner.expect_next("NOT or primary expression")?;
545
546 if first.as_str() == "NOT" {
547 let expr = parse_sase_primary_expr(inner.expect_next("expression after NOT")?)?;
548 Ok(SasePatternExpr::Not(Box::new(expr)))
549 } else {
550 parse_sase_primary_expr(first)
551 }
552}
553
554fn parse_sase_primary_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<SasePatternExpr> {
555 let inner = pair.into_inner().expect_next("SASE primary expression")?;
556
557 match inner.as_rule() {
558 Rule::sase_seq_expr => parse_sase_seq_expr(inner),
559 Rule::sase_grouped_expr => {
560 let nested = inner.into_inner().expect_next("grouped expression")?;
561 let expr = parse_sase_pattern_expr(nested)?;
562 Ok(SasePatternExpr::Group(Box::new(expr)))
563 }
564 Rule::sase_event_ref => parse_sase_event_ref(inner),
565 _ => Ok(SasePatternExpr::Event(inner.as_str().to_string())),
566 }
567}
568
569fn parse_sase_seq_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<SasePatternExpr> {
570 let mut items = Vec::new();
571
572 for p in pair.into_inner() {
573 if p.as_rule() == Rule::sase_seq_items {
574 for item in p.into_inner() {
575 if item.as_rule() == Rule::sase_seq_item {
576 items.push(parse_sase_seq_item(item)?);
577 }
578 }
579 }
580 }
581
582 Ok(SasePatternExpr::Seq(items))
583}
584
585fn parse_sase_seq_item(pair: pest::iterators::Pair<Rule>) -> ParseResult<SasePatternItem> {
586 let inner = pair.into_inner().expect_next("sequence item")?;
587
588 match inner.as_rule() {
589 Rule::sase_negated_item => parse_sase_item_inner(inner, true),
590 Rule::sase_positive_item => parse_sase_item_inner(inner, false),
591 _ => parse_sase_item_inner(inner, false),
592 }
593}
594
595fn parse_sase_item_inner(
596 pair: pest::iterators::Pair<Rule>,
597 _negated: bool,
598) -> ParseResult<SasePatternItem> {
599 let mut inner = pair.into_inner();
600 let event_type = inner.expect_next("event type")?.as_str().to_string();
601
602 let mut kleene = None;
603 let mut filter = None;
604 let mut alias = None;
605
606 for p in inner {
607 match p.as_rule() {
608 Rule::kleene_op => {
609 kleene = Some(match p.as_str() {
610 "+" => KleeneOp::Plus,
611 "*" => KleeneOp::Star,
612 "?" => KleeneOp::Optional,
613 _ => KleeneOp::Plus,
614 });
615 }
616 Rule::sase_where_clause => {
617 filter = Some(parse_expr(
618 p.into_inner().expect_next("filter expression")?,
619 )?);
620 }
621 Rule::sase_alias_clause => {
622 alias = Some(p.into_inner().expect_next("alias")?.as_str().to_string());
623 }
624 _ => {}
625 }
626 }
627
628 let event_type = if _negated {
631 format!("!{event_type}")
632 } else {
633 event_type
634 };
635
636 Ok(SasePatternItem {
637 event_type,
638 alias,
639 kleene,
640 filter,
641 })
642}
643
644fn parse_sase_event_ref(pair: pest::iterators::Pair<Rule>) -> ParseResult<SasePatternExpr> {
645 let item = parse_sase_item_inner(pair, false)?;
647
648 if item.alias.is_none() && item.kleene.is_none() && item.filter.is_none() {
650 Ok(SasePatternExpr::Event(item.event_type))
651 } else {
652 Ok(SasePatternExpr::Seq(vec![item]))
654 }
655}
656
657fn parse_stream_source(pair: pest::iterators::Pair<Rule>) -> ParseResult<StreamSource> {
658 let inner = pair.into_inner().expect_next("stream source type")?;
659
660 match inner.as_rule() {
661 Rule::from_connector_source => {
662 let mut inner_iter = inner.into_inner();
663 let event_type = inner_iter.expect_next("event type")?.as_str().to_string();
664 let connector_name = inner_iter
665 .expect_next("connector name")?
666 .as_str()
667 .to_string();
668 let mut params = Vec::new();
669 for p in inner_iter {
670 if p.as_rule() == Rule::connector_params {
671 params = parse_connector_params(p)?;
672 }
673 }
674 Ok(StreamSource::FromConnector {
675 event_type,
676 connector_name,
677 params,
678 })
679 }
680 Rule::merge_source => {
681 let mut streams = Vec::new();
682 for p in inner.into_inner() {
683 if p.as_rule() == Rule::inline_stream_list {
684 for is in p.into_inner() {
685 streams.push(parse_inline_stream(is)?);
686 }
687 }
688 }
689 Ok(StreamSource::Merge(streams))
690 }
691 Rule::join_source
692 | Rule::left_join_source
693 | Rule::right_join_source
694 | Rule::full_join_source => {
695 let join_type = match inner.as_rule() {
696 Rule::left_join_source => varpulis_core::ast::JoinType::Left,
697 Rule::right_join_source => varpulis_core::ast::JoinType::Right,
698 Rule::full_join_source => varpulis_core::ast::JoinType::Full,
699 _ => varpulis_core::ast::JoinType::Inner,
700 };
701 let mut clauses = Vec::new();
702 for p in inner.into_inner() {
703 if p.as_rule() == Rule::join_clause_list {
704 for jc in p.into_inner() {
705 clauses.push(parse_join_clause(jc, join_type)?);
706 }
707 }
708 }
709 Ok(StreamSource::Join(clauses))
710 }
711 Rule::sequence_source => {
712 let decl =
713 parse_sequence_decl(inner.into_inner().expect_next("sequence declaration")?)?;
714 Ok(StreamSource::Sequence(decl))
715 }
716 Rule::timer_source => {
717 let timer_args = inner.into_inner().expect_next("timer arguments")?;
718 let decl = parse_timer_decl(timer_args)?;
719 Ok(StreamSource::Timer(decl))
720 }
721 Rule::all_source => {
722 let mut inner_iter = inner.into_inner();
723 let name = inner_iter.expect_next("event name")?.as_str().to_string();
724 let alias = inner_iter.next().map(|p| p.as_str().to_string());
725 Ok(StreamSource::AllWithAlias { name, alias })
726 }
727 Rule::aliased_source => {
728 let mut inner_iter = inner.into_inner();
729 let name = inner_iter.expect_next("event name")?.as_str().to_string();
730 let alias = inner_iter.expect_next("alias")?.as_str().to_string();
731 Ok(StreamSource::IdentWithAlias { name, alias })
732 }
733 Rule::identifier => Ok(StreamSource::Ident(inner.as_str().to_string())),
734 _ => Err(ParseError::UnexpectedToken {
735 position: 0,
736 expected: "stream source".to_string(),
737 found: format!("{:?}", inner.as_rule()),
738 }),
739 }
740}
741
742fn parse_inline_stream(pair: pest::iterators::Pair<Rule>) -> ParseResult<InlineStreamDecl> {
743 let mut inner = pair.into_inner();
744
745 let first = inner.expect_next("stream identifier")?;
747 if first.as_rule() == Rule::identifier && inner.clone().next().is_none() {
748 let name = first.as_str().to_string();
749 return Ok(InlineStreamDecl {
750 name: name.clone(),
751 source: name,
752 filter: None,
753 });
754 }
755
756 let name = first.as_str().to_string();
757 let source = inner.expect_next("stream source")?.as_str().to_string();
758 let filter = inner.next().map(|p| parse_expr(p)).transpose()?;
759
760 Ok(InlineStreamDecl {
761 name,
762 source,
763 filter,
764 })
765}
766
767fn parse_join_clause(
768 pair: pest::iterators::Pair<Rule>,
769 join_type: varpulis_core::ast::JoinType,
770) -> ParseResult<JoinClause> {
771 let mut inner = pair.into_inner();
772
773 let first = inner.expect_next("join clause identifier")?;
774 if first.as_rule() == Rule::identifier && inner.clone().next().is_none() {
775 let name = first.as_str().to_string();
776 return Ok(JoinClause {
777 name: name.clone(),
778 source: name,
779 on: None,
780 join_type,
781 });
782 }
783
784 let name = first.as_str().to_string();
785 let source = inner.expect_next("join source")?.as_str().to_string();
786 let on = inner.next().map(|p| parse_expr(p)).transpose()?;
787
788 Ok(JoinClause {
789 name,
790 source,
791 on,
792 join_type,
793 })
794}
795
796fn parse_sequence_decl(pair: pest::iterators::Pair<Rule>) -> ParseResult<SequenceDecl> {
797 let mut steps = Vec::new();
798
799 for p in pair.into_inner() {
800 if p.as_rule() == Rule::sequence_step {
801 steps.push(parse_sequence_step(p)?);
802 }
803 }
804
805 Ok(SequenceDecl {
806 match_all: false,
807 timeout: None,
808 steps,
809 })
810}
811
812fn parse_sequence_step(pair: pest::iterators::Pair<Rule>) -> ParseResult<SequenceStepDecl> {
813 let mut inner = pair.into_inner();
814 let alias = inner.expect_next("step alias")?.as_str().to_string();
815 let event_type = inner.expect_next("event type")?.as_str().to_string();
816
817 let mut filter = None;
818 let mut timeout = None;
819
820 for p in inner {
821 match p.as_rule() {
822 Rule::or_expr => filter = Some(parse_expr(p)?),
823 Rule::within_suffix => {
824 let expr = p.into_inner().expect_next("within duration")?;
825 timeout = Some(Box::new(parse_expr(expr)?));
826 }
827 _ => {}
828 }
829 }
830
831 Ok(SequenceStepDecl {
832 alias,
833 event_type,
834 filter,
835 timeout,
836 })
837}
838
839fn parse_timer_decl(pair: pest::iterators::Pair<Rule>) -> ParseResult<TimerDecl> {
840 let mut inner = pair.into_inner();
841
842 let interval = parse_expr(inner.expect_next("timer interval")?)?;
844
845 let mut initial_delay = None;
847 for p in inner {
848 if p.as_rule() == Rule::named_arg {
849 let arg = parse_named_arg(p)?;
850 if arg.name == "initial_delay" {
851 initial_delay = Some(Box::new(arg.value));
852 }
853 }
854 }
855
856 Ok(TimerDecl {
857 interval,
858 initial_delay,
859 })
860}
861
862fn parse_stream_op(pair: pest::iterators::Pair<Rule>) -> ParseResult<StreamOp> {
863 let inner = pair.into_inner().expect_next("stream operation")?;
864
865 match inner.as_rule() {
866 Rule::dot_op => {
867 let op_inner = inner.into_inner().expect_next("dot operation")?;
868 parse_dot_op(op_inner)
869 }
870 Rule::followed_by_op => parse_followed_by_op(inner),
871 _ => Err(ParseError::UnexpectedToken {
872 position: 0,
873 expected: "stream operation".to_string(),
874 found: format!("{:?}", inner.as_rule()),
875 }),
876 }
877}
878
879fn parse_dot_op(pair: pest::iterators::Pair<Rule>) -> ParseResult<StreamOp> {
880 match pair.as_rule() {
881 Rule::context_op => {
882 let name = pair
883 .into_inner()
884 .expect_next("context name")?
885 .as_str()
886 .to_string();
887 Ok(StreamOp::Context(name))
888 }
889 Rule::where_op => {
890 let expr = parse_expr(pair.into_inner().expect_next("where expression")?)?;
891 Ok(StreamOp::Where(expr))
892 }
893 Rule::select_op => {
894 let mut items = Vec::new();
895 for p in pair.into_inner() {
896 if p.as_rule() == Rule::select_list {
897 for si in p.into_inner() {
898 items.push(parse_select_item(si)?);
899 }
900 }
901 }
902 Ok(StreamOp::Select(items))
903 }
904 Rule::window_op => {
905 let args = parse_window_args(pair.into_inner().expect_next("window arguments")?)?;
906 Ok(StreamOp::Window(args))
907 }
908 Rule::aggregate_op => {
909 let mut items = Vec::new();
910 for p in pair.into_inner() {
911 if p.as_rule() == Rule::agg_list {
912 for ai in p.into_inner() {
913 items.push(parse_agg_item(ai)?);
914 }
915 }
916 }
917 Ok(StreamOp::Aggregate(items))
918 }
919 Rule::having_op => {
920 let expr = parse_expr(pair.into_inner().expect_next("having expression")?)?;
921 Ok(StreamOp::Having(expr))
922 }
923 Rule::map_op => {
924 let expr = parse_expr(pair.into_inner().expect_next("map expression")?)?;
925 Ok(StreamOp::Map(expr))
926 }
927 Rule::filter_op => {
928 let expr = parse_expr(pair.into_inner().expect_next("filter expression")?)?;
929 Ok(StreamOp::Filter(expr))
930 }
931 Rule::within_op => {
932 let expr = parse_expr(pair.into_inner().expect_next("within duration")?)?;
933 Ok(StreamOp::Within(expr))
934 }
935 Rule::emit_op => {
936 let mut output_type = None;
937 let mut fields = Vec::new();
938 let mut target_context = None;
939 for p in pair.into_inner() {
940 match p.as_rule() {
941 Rule::emit_type_cast => {
942 output_type = Some(
943 p.into_inner()
944 .expect_next("type name")?
945 .as_str()
946 .to_string(),
947 );
948 }
949 Rule::named_arg_list => {
950 for arg in p.into_inner() {
951 let parsed = parse_named_arg(arg)?;
952 if parsed.name == "context" {
954 if let Expr::Ident(ctx_name) = &parsed.value {
955 target_context = Some(ctx_name.clone());
956 continue;
957 }
958 }
959 fields.push(parsed);
960 }
961 }
962 _ => {}
963 }
964 }
965 Ok(StreamOp::Emit {
966 output_type,
967 fields,
968 target_context,
969 })
970 }
971 Rule::print_op => {
972 let exprs = pair
973 .into_inner()
974 .filter(|p| p.as_rule() == Rule::expr_list)
975 .flat_map(|p| p.into_inner())
976 .map(parse_expr)
977 .collect::<ParseResult<Vec<_>>>()?;
978 Ok(StreamOp::Print(exprs))
979 }
980 Rule::collect_op => Ok(StreamOp::Collect),
981 Rule::pattern_op => {
982 let def_pair = pair.into_inner().expect_next("pattern definition")?;
983 let mut inner = def_pair.into_inner();
984 let name = inner.expect_next("pattern name")?.as_str().to_string();
985 let body_pair = inner.expect_next("pattern body")?;
986
987 let body_inner = body_pair.into_inner().expect_next("pattern expression")?;
989 let matcher = match body_inner.as_rule() {
990 Rule::lambda_expr => parse_lambda_expr(body_inner)?,
991 Rule::pattern_or_expr => parse_pattern_expr_as_expr(body_inner)?,
992 _ => parse_expr_inner(body_inner)?,
993 };
994 Ok(StreamOp::Pattern(PatternDef { name, matcher }))
995 }
996 Rule::partition_by_op => {
997 let expr = parse_expr(pair.into_inner().expect_next("partition expression")?)?;
998 Ok(StreamOp::PartitionBy(expr))
999 }
1000 Rule::order_by_op => {
1001 let mut items = Vec::new();
1002 for p in pair.into_inner() {
1003 if p.as_rule() == Rule::order_list {
1004 for oi in p.into_inner() {
1005 items.push(parse_order_item(oi)?);
1006 }
1007 }
1008 }
1009 Ok(StreamOp::OrderBy(items))
1010 }
1011 Rule::limit_op => {
1012 let expr = parse_expr(pair.into_inner().expect_next("limit expression")?)?;
1013 Ok(StreamOp::Limit(expr))
1014 }
1015 Rule::distinct_op => {
1016 let expr = pair.into_inner().next().map(parse_expr).transpose()?;
1017 Ok(StreamOp::Distinct(expr))
1018 }
1019 Rule::tap_op => {
1020 let args = pair
1021 .into_inner()
1022 .filter(|p| p.as_rule() == Rule::named_arg_list)
1023 .flat_map(|p| p.into_inner())
1024 .map(parse_named_arg)
1025 .collect::<ParseResult<Vec<_>>>()?;
1026 Ok(StreamOp::Tap(args))
1027 }
1028 Rule::log_op => {
1029 let args = pair
1030 .into_inner()
1031 .filter(|p| p.as_rule() == Rule::named_arg_list)
1032 .flat_map(|p| p.into_inner())
1033 .map(parse_named_arg)
1034 .collect::<ParseResult<Vec<_>>>()?;
1035 Ok(StreamOp::Log(args))
1036 }
1037 Rule::to_op => {
1038 let mut inner = pair.into_inner();
1039 let connector_name = inner.expect_next("connector name")?.as_str().to_string();
1040 let mut params = Vec::new();
1041 for p in inner {
1042 if p.as_rule() == Rule::connector_params {
1043 params = parse_connector_params(p)?;
1044 }
1045 }
1046 Ok(StreamOp::To {
1047 connector_name,
1048 params,
1049 })
1050 }
1051 Rule::process_op => {
1052 let expr = parse_expr(pair.into_inner().expect_next("process expression")?)?;
1053 Ok(StreamOp::Process(expr))
1054 }
1055 Rule::on_error_op => {
1056 let expr = parse_expr(pair.into_inner().expect_next("on_error handler")?)?;
1057 Ok(StreamOp::OnError(expr))
1058 }
1059 Rule::on_op => {
1060 let expr = parse_expr(pair.into_inner().expect_next("on handler")?)?;
1061 Ok(StreamOp::On(expr))
1062 }
1063 Rule::not_op => {
1064 let mut inner = pair.into_inner();
1065 let event_type = inner.expect_next("event type")?.as_str().to_string();
1066 let filter = inner.next().map(parse_expr).transpose()?;
1067 Ok(StreamOp::Not(FollowedByClause {
1068 event_type,
1069 filter,
1070 alias: None,
1071 match_all: false,
1072 }))
1073 }
1074 Rule::fork_op => {
1075 let mut paths = Vec::new();
1076 for p in pair.into_inner() {
1077 if p.as_rule() == Rule::fork_path_list {
1078 for fp in p.into_inner() {
1079 paths.push(parse_fork_path(fp)?);
1080 }
1081 }
1082 }
1083 Ok(StreamOp::Fork(paths))
1084 }
1085 Rule::any_op => {
1086 let count = pair
1087 .into_inner()
1088 .next()
1089 .map(|p| p.as_str().parse().unwrap_or(1));
1090 Ok(StreamOp::Any(count))
1091 }
1092 Rule::all_op => Ok(StreamOp::All),
1093 Rule::first_op => Ok(StreamOp::First),
1094 Rule::concurrent_op => {
1095 let args = pair
1096 .into_inner()
1097 .filter(|p| p.as_rule() == Rule::named_arg_list)
1098 .flat_map(|p| p.into_inner())
1099 .map(parse_named_arg)
1100 .collect::<ParseResult<Vec<_>>>()?;
1101 Ok(StreamOp::Concurrent(args))
1102 }
1103 Rule::watermark_op => {
1104 let args = pair
1105 .into_inner()
1106 .filter(|p| p.as_rule() == Rule::named_arg_list)
1107 .flat_map(|p| p.into_inner())
1108 .map(parse_named_arg)
1109 .collect::<ParseResult<Vec<_>>>()?;
1110 Ok(StreamOp::Watermark(args))
1111 }
1112 Rule::allowed_lateness_op => {
1113 let expr = parse_expr(pair.into_inner().expect_next("allowed lateness duration")?)?;
1114 Ok(StreamOp::AllowedLateness(expr))
1115 }
1116 Rule::trend_aggregate_op => {
1117 let mut items = Vec::new();
1118 for p in pair.into_inner() {
1119 if p.as_rule() == Rule::trend_agg_list {
1120 for item_pair in p.into_inner() {
1121 if item_pair.as_rule() == Rule::trend_agg_item {
1122 let mut inner = item_pair.into_inner();
1123 let alias = inner.expect_next("trend agg alias")?.as_str().to_string();
1124 let func_pair = inner.expect_next("trend agg function")?;
1125 let mut func_inner = func_pair.into_inner();
1126 let func_name = func_inner
1127 .expect_next("function name")?
1128 .as_str()
1129 .to_string();
1130 let arg = func_inner.next().map(parse_expr).transpose()?;
1131 items.push(TrendAggItem {
1132 alias,
1133 func: func_name,
1134 arg,
1135 });
1136 }
1137 }
1138 }
1139 }
1140 Ok(StreamOp::TrendAggregate(items))
1141 }
1142 Rule::forecast_op => {
1143 let mut confidence = None;
1144 let mut horizon = None;
1145 let mut warmup = None;
1146 let mut max_depth = None;
1147 let mut hawkes = None;
1148 let mut conformal = None;
1149 let mut mode = None;
1150 for p in pair.into_inner() {
1151 if p.as_rule() == Rule::forecast_params {
1152 for param_pair in p.into_inner() {
1153 if param_pair.as_rule() == Rule::forecast_param {
1154 let mut inner = param_pair.into_inner();
1155 let name = inner.expect_next("forecast param name")?.as_str();
1156 let value_pair = inner.expect_next("forecast param value")?;
1157 let expr = parse_expr(value_pair)?;
1158 match name {
1159 "confidence" => confidence = Some(expr),
1160 "horizon" => horizon = Some(expr),
1161 "warmup" => warmup = Some(expr),
1162 "max_depth" => max_depth = Some(expr),
1163 "hawkes" => hawkes = Some(expr),
1164 "conformal" => conformal = Some(expr),
1165 "mode" => mode = Some(expr),
1166 _ => {}
1167 }
1168 }
1169 }
1170 }
1171 }
1172 Ok(StreamOp::Forecast(ForecastSpec {
1173 confidence,
1174 horizon,
1175 warmup,
1176 max_depth,
1177 hawkes,
1178 conformal,
1179 mode,
1180 }))
1181 }
1182 Rule::enrich_op => {
1183 let mut inner = pair.into_inner();
1184 let connector_name = inner.expect_next("connector name")?.as_str().to_string();
1185 let mut key_expr = None;
1186 let mut fields = Vec::new();
1187 let mut cache_ttl = None;
1188 let mut timeout = None;
1189 let mut fallback = None;
1190 for p in inner {
1191 if p.as_rule() == Rule::enrich_params {
1192 for param_pair in p.into_inner() {
1193 if param_pair.as_rule() == Rule::enrich_param {
1194 let param_inner =
1195 param_pair.into_inner().expect_next("enrich param")?;
1196 match param_inner.as_rule() {
1197 Rule::enrich_key_param => {
1198 let expr_pair =
1199 param_inner.into_inner().expect_next("key expression")?;
1200 key_expr = Some(parse_expr(expr_pair)?);
1201 }
1202 Rule::enrich_fields_param => {
1203 for field in param_inner.into_inner() {
1204 if field.as_rule() == Rule::identifier {
1205 fields.push(field.as_str().to_string());
1206 }
1207 }
1208 }
1209 Rule::enrich_cache_ttl_param => {
1210 let expr_pair = param_inner
1211 .into_inner()
1212 .expect_next("cache_ttl expression")?;
1213 cache_ttl = Some(parse_expr(expr_pair)?);
1214 }
1215 Rule::enrich_timeout_param => {
1216 let expr_pair = param_inner
1217 .into_inner()
1218 .expect_next("timeout expression")?;
1219 timeout = Some(parse_expr(expr_pair)?);
1220 }
1221 Rule::enrich_fallback_param => {
1222 let literal_pair =
1223 param_inner.into_inner().expect_next("fallback literal")?;
1224 fallback = Some(parse_expr(literal_pair)?);
1225 }
1226 _ => {}
1227 }
1228 }
1229 }
1230 }
1231 }
1232 let key = key_expr.ok_or_else(|| ParseError::Located {
1233 line: 0,
1234 column: 0,
1235 position: 0,
1236 message: ".enrich() requires a key: parameter".to_string(),
1237 hint: Some("add key: <expression> to .enrich()".to_string()),
1238 })?;
1239 Ok(StreamOp::Enrich(EnrichSpec {
1240 connector_name,
1241 key_expr: Box::new(key),
1242 fields,
1243 cache_ttl,
1244 timeout,
1245 fallback,
1246 }))
1247 }
1248 Rule::score_op => {
1249 let mut model_path = String::new();
1250 let mut inputs = Vec::new();
1251 let mut outputs = Vec::new();
1252 let mut gpu = false;
1253 let mut batch_size: usize = 1;
1254 let mut device_id: i32 = 0;
1255 for p in pair.into_inner() {
1256 if p.as_rule() == Rule::score_params {
1257 for param_pair in p.into_inner() {
1258 if param_pair.as_rule() == Rule::score_param {
1259 let mut inner = param_pair.into_inner();
1260 let name = inner.expect_next("score param name")?.as_str();
1261 let value_pair = inner.expect_next("score param value")?;
1262 match name {
1263 "model" => {
1264 let raw = value_pair.as_str();
1265 model_path = raw.trim_matches('"').to_string();
1266 }
1267 "inputs" => {
1268 if value_pair.as_rule() == Rule::score_field_list {
1269 for field in value_pair.into_inner() {
1270 if field.as_rule() == Rule::identifier {
1271 inputs.push(field.as_str().to_string());
1272 }
1273 }
1274 }
1275 }
1276 "outputs" => {
1277 if value_pair.as_rule() == Rule::score_field_list {
1278 for field in value_pair.into_inner() {
1279 if field.as_rule() == Rule::identifier {
1280 outputs.push(field.as_str().to_string());
1281 }
1282 }
1283 }
1284 }
1285 "gpu" => {
1286 gpu = value_pair.as_str() == "true";
1287 }
1288 "batch_size" => {
1289 batch_size = value_pair.as_str().parse().unwrap_or(1);
1290 }
1291 "device" | "device_id" => {
1292 device_id = value_pair.as_str().parse().unwrap_or(0);
1293 }
1294 _ => {}
1295 }
1296 }
1297 }
1298 }
1299 }
1300 Ok(StreamOp::Score(ScoreSpec {
1301 model_path,
1302 inputs,
1303 outputs,
1304 gpu,
1305 batch_size,
1306 device_id,
1307 }))
1308 }
1309 _ => Err(ParseError::UnexpectedToken {
1310 position: 0,
1311 expected: "stream operation".to_string(),
1312 found: format!("{:?}", pair.as_rule()),
1313 }),
1314 }
1315}
1316
1317fn parse_order_item(pair: pest::iterators::Pair<Rule>) -> ParseResult<OrderItem> {
1318 let mut inner = pair.into_inner();
1319 let expr = parse_expr(inner.expect_next("order expression")?)?;
1320 let desc = inner.next().is_some_and(|p| p.as_str() == "desc");
1321 Ok(OrderItem {
1322 expr,
1323 descending: desc,
1324 })
1325}
1326
1327fn parse_fork_path(pair: pest::iterators::Pair<Rule>) -> ParseResult<ForkPath> {
1328 let mut inner = pair.into_inner();
1329 let name = inner.expect_next("fork path name")?.as_str().to_string();
1330 let mut ops = Vec::new();
1331 for p in inner {
1332 if p.as_rule() == Rule::stream_op {
1333 ops.push(parse_stream_op(p)?);
1334 }
1335 }
1336 Ok(ForkPath { name, ops })
1337}
1338
1339fn parse_followed_by_op(pair: pest::iterators::Pair<Rule>) -> ParseResult<StreamOp> {
1340 let mut inner = pair.into_inner();
1341 let mut match_all = false;
1342
1343 let first = inner.expect_next("event type or match_all")?;
1344 let event_type = if first.as_rule() == Rule::match_all_keyword {
1345 match_all = true;
1346 inner.expect_next("event type")?.as_str().to_string()
1347 } else {
1348 first.as_str().to_string()
1349 };
1350
1351 let mut filter = None;
1352 let mut alias = None;
1353
1354 for p in inner {
1355 match p.as_rule() {
1356 Rule::or_expr => filter = Some(parse_or_expr(p)?),
1357 Rule::filter_expr => filter = Some(parse_filter_expr(p)?),
1358 Rule::identifier => alias = Some(p.as_str().to_string()),
1359 _ => {}
1360 }
1361 }
1362
1363 Ok(StreamOp::FollowedBy(FollowedByClause {
1364 event_type,
1365 filter,
1366 alias,
1367 match_all,
1368 }))
1369}
1370
1371fn parse_select_item(pair: pest::iterators::Pair<Rule>) -> ParseResult<SelectItem> {
1372 let mut inner = pair.into_inner();
1373 let first = inner.expect_next("select field or alias")?;
1374
1375 if let Some(second) = inner.next() {
1376 Ok(SelectItem::Alias(
1377 first.as_str().to_string(),
1378 parse_expr(second)?,
1379 ))
1380 } else {
1381 Ok(SelectItem::Field(first.as_str().to_string()))
1382 }
1383}
1384
1385fn parse_window_args(pair: pest::iterators::Pair<Rule>) -> ParseResult<WindowArgs> {
1386 let raw = pair.as_str().trim();
1387 let is_session = raw.starts_with("session");
1388
1389 let mut inner = pair.into_inner();
1390
1391 if is_session {
1392 let gap_expr = parse_expr(inner.expect_next("session gap duration")?)?;
1394 return Ok(WindowArgs {
1395 duration: gap_expr.clone(),
1396 sliding: None,
1397 policy: None,
1398 session_gap: Some(gap_expr),
1399 });
1400 }
1401
1402 let duration = parse_expr(inner.expect_next("window duration")?)?;
1403
1404 let mut sliding = None;
1405 let mut policy = None;
1406
1407 for p in inner {
1408 if p.as_rule() == Rule::expr {
1409 if sliding.is_none() {
1411 sliding = Some(parse_expr(p)?);
1412 } else {
1413 policy = Some(parse_expr(p)?);
1414 }
1415 }
1416 }
1417
1418 Ok(WindowArgs {
1419 duration,
1420 sliding,
1421 policy,
1422 session_gap: None,
1423 })
1424}
1425
1426fn parse_agg_item(pair: pest::iterators::Pair<Rule>) -> ParseResult<AggItem> {
1427 let mut inner = pair.into_inner();
1428 let alias = inner.expect_next("aggregate alias")?.as_str().to_string();
1429 let expr = parse_expr(inner.expect_next("aggregate expression")?)?;
1430 Ok(AggItem { alias, expr })
1431}
1432
1433fn parse_named_arg(pair: pest::iterators::Pair<Rule>) -> ParseResult<NamedArg> {
1434 let mut inner = pair.into_inner();
1435 let name = inner.expect_next("argument name")?.as_str().to_string();
1436 let value = parse_expr(inner.expect_next("argument value")?)?;
1437 Ok(NamedArg { name, value })
1438}
1439
1440fn parse_event_decl(pair: pest::iterators::Pair<Rule>) -> ParseResult<Stmt> {
1441 let mut inner = pair.into_inner();
1442 let name = inner.expect_next("event name")?.as_str().to_string();
1443
1444 let mut extends = None;
1445 let mut fields = Vec::new();
1446
1447 for p in inner {
1448 match p.as_rule() {
1449 Rule::identifier => extends = Some(p.as_str().to_string()),
1450 Rule::field => fields.push(parse_field(p)?),
1451 _ => {}
1452 }
1453 }
1454
1455 Ok(Stmt::EventDecl {
1456 name,
1457 extends,
1458 fields,
1459 })
1460}
1461
1462fn parse_field(pair: pest::iterators::Pair<Rule>) -> ParseResult<Field> {
1463 let mut inner = pair.into_inner();
1464 let name = inner.expect_next("field name")?.as_str().to_string();
1465 let ty = parse_type(inner.expect_next("field type")?)?;
1466 let optional = inner.next().is_some();
1467 Ok(Field { name, ty, optional })
1468}
1469
1470fn parse_type_decl(pair: pest::iterators::Pair<Rule>) -> ParseResult<Stmt> {
1471 let mut inner = pair.into_inner();
1472 let name = inner.expect_next("type name")?.as_str().to_string();
1473 let ty = parse_type(inner.expect_next("type definition")?)?;
1474 Ok(Stmt::TypeDecl { name, ty })
1475}
1476
1477fn parse_type(pair: pest::iterators::Pair<Rule>) -> ParseResult<Type> {
1478 let cloned = pair.clone();
1479 let inner = pair.into_inner().next().unwrap_or(cloned);
1480
1481 match inner.as_rule() {
1482 Rule::primitive_type => match inner.as_str() {
1483 "int" => Ok(Type::Int),
1484 "float" => Ok(Type::Float),
1485 "bool" => Ok(Type::Bool),
1486 "str" => Ok(Type::Str),
1487 "timestamp" => Ok(Type::Timestamp),
1488 "duration" => Ok(Type::Duration),
1489 _ => Ok(Type::Named(inner.as_str().to_string())),
1490 },
1491 Rule::array_type => {
1492 let inner_type = parse_type(inner.into_inner().expect_next("array element type")?)?;
1493 Ok(Type::Array(Box::new(inner_type)))
1494 }
1495 Rule::map_type => {
1496 let mut inner_pairs = inner.into_inner();
1497 let key_type = parse_type(inner_pairs.expect_next("map key type")?)?;
1498 let val_type = parse_type(inner_pairs.expect_next("map value type")?)?;
1499 Ok(Type::Map(Box::new(key_type), Box::new(val_type)))
1500 }
1501 Rule::tuple_type => {
1502 let types: Vec<Type> = inner
1503 .into_inner()
1504 .map(parse_type)
1505 .collect::<ParseResult<Vec<_>>>()?;
1506 Ok(Type::Tuple(types))
1507 }
1508 Rule::stream_type => {
1509 let inner_type = parse_type(inner.into_inner().expect_next("stream element type")?)?;
1510 Ok(Type::Stream(Box::new(inner_type)))
1511 }
1512 Rule::optional_type => {
1513 let inner_type = parse_type(inner.into_inner().expect_next("optional inner type")?)?;
1514 Ok(Type::Optional(Box::new(inner_type)))
1515 }
1516 Rule::named_type | Rule::identifier => Ok(Type::Named(inner.as_str().to_string())),
1517 _ => Ok(Type::Named(inner.as_str().to_string())),
1518 }
1519}
1520
1521fn parse_var_decl(pair: pest::iterators::Pair<Rule>) -> ParseResult<Stmt> {
1522 let mut inner = pair.into_inner();
1523 let keyword = inner.expect_next("var_keyword")?.as_str();
1524 let mutable = keyword == "var";
1525 let name = inner.expect_next("variable name")?.as_str().to_string();
1526
1527 let mut ty = None;
1528 let mut value = Expr::Null;
1529
1530 for p in inner {
1531 match p.as_rule() {
1532 Rule::type_annotation => ty = Some(parse_type(p.into_inner().expect_next("type")?)?),
1533 _ => value = parse_expr(p)?,
1534 }
1535 }
1536
1537 Ok(Stmt::VarDecl {
1538 mutable,
1539 name,
1540 ty,
1541 value,
1542 })
1543}
1544
1545fn parse_const_decl(pair: pest::iterators::Pair<Rule>) -> ParseResult<Stmt> {
1546 let mut inner = pair.into_inner();
1547 let name = inner.expect_next("constant name")?.as_str().to_string();
1548
1549 let mut ty = None;
1550 let mut value = Expr::Null;
1551
1552 for p in inner {
1553 match p.as_rule() {
1554 Rule::type_annotation => ty = Some(parse_type(p.into_inner().expect_next("type")?)?),
1555 _ => value = parse_expr(p)?,
1556 }
1557 }
1558
1559 Ok(Stmt::ConstDecl { name, ty, value })
1560}
1561
1562fn parse_fn_decl(pair: pest::iterators::Pair<Rule>) -> ParseResult<Stmt> {
1563 let mut inner = pair.into_inner();
1564 let name = inner.expect_next("function name")?.as_str().to_string();
1565
1566 let mut params = Vec::new();
1567 let mut ret = None;
1568 let mut body = Vec::new();
1569
1570 for p in inner {
1571 match p.as_rule() {
1572 Rule::param_list => {
1573 for param in p.into_inner() {
1574 params.push(parse_param(param)?);
1575 }
1576 }
1577 Rule::type_expr => ret = Some(parse_type(p)?),
1578 Rule::block => body = parse_block(p)?,
1579 Rule::statement => body.push(parse_statement(p)?),
1580 _ => {}
1581 }
1582 }
1583
1584 Ok(Stmt::FnDecl {
1585 name,
1586 params,
1587 ret,
1588 body,
1589 })
1590}
1591
1592fn parse_block(pair: pest::iterators::Pair<Rule>) -> ParseResult<Vec<Spanned<Stmt>>> {
1593 let mut statements = Vec::new();
1594 for p in pair.into_inner() {
1595 if p.as_rule() == Rule::statement {
1596 statements.push(parse_statement(p)?);
1597 }
1598 }
1599 Ok(statements)
1600}
1601
1602fn parse_param(pair: pest::iterators::Pair<Rule>) -> ParseResult<Param> {
1603 let mut inner = pair.into_inner();
1604 let name = inner.expect_next("parameter name")?.as_str().to_string();
1605 let ty = parse_type(inner.expect_next("parameter type")?)?;
1606 Ok(Param { name, ty })
1607}
1608
1609fn parse_config_block(pair: pest::iterators::Pair<Rule>) -> ParseResult<Stmt> {
1610 let mut inner = pair.into_inner();
1611 let first = inner.expect_next("config name or item")?;
1612
1613 let (name, items_start) = if first.as_rule() == Rule::identifier {
1615 (first.as_str().to_string(), None)
1616 } else {
1617 ("default".to_string(), Some(first))
1619 };
1620
1621 let mut items = Vec::new();
1622
1623 if let Some(first_item) = items_start {
1625 if first_item.as_rule() == Rule::config_item {
1626 items.push(parse_config_item(first_item)?);
1627 }
1628 }
1629
1630 for p in inner {
1631 if p.as_rule() == Rule::config_item {
1632 items.push(parse_config_item(p)?);
1633 }
1634 }
1635 Ok(Stmt::Config { name, items })
1636}
1637
1638fn parse_config_item(pair: pest::iterators::Pair<Rule>) -> ParseResult<ConfigItem> {
1639 let mut inner = pair.into_inner();
1640 let key = inner.expect_next("config key")?.as_str().to_string();
1641 let value = parse_config_value(inner.expect_next("config value")?)?;
1642 Ok(ConfigItem::Value(key, value))
1643}
1644
1645fn parse_config_value(pair: pest::iterators::Pair<Rule>) -> ParseResult<ConfigValue> {
1646 let cloned = pair.clone();
1647 let inner = pair.into_inner().next().unwrap_or(cloned);
1648
1649 match inner.as_rule() {
1650 Rule::config_array => {
1651 let values: Vec<ConfigValue> = inner
1652 .into_inner()
1653 .map(parse_config_value)
1654 .collect::<ParseResult<Vec<_>>>()?;
1655 Ok(ConfigValue::Array(values))
1656 }
1657 Rule::integer => Ok(ConfigValue::Int(inner.as_str().parse().unwrap_or(0))),
1658 Rule::float => Ok(ConfigValue::Float(inner.as_str().parse().unwrap_or(0.0))),
1659 Rule::string => {
1660 let s = inner.as_str();
1661 Ok(ConfigValue::Str(s[1..s.len() - 1].to_string()))
1662 }
1663 Rule::duration => Ok(ConfigValue::Duration(
1664 parse_duration(inner.as_str()).map_err(ParseError::InvalidDuration)?,
1665 )),
1666 Rule::boolean => Ok(ConfigValue::Bool(inner.as_str() == "true")),
1667 Rule::identifier => Ok(ConfigValue::Ident(inner.as_str().to_string())),
1668 _ => Ok(ConfigValue::Ident(inner.as_str().to_string())),
1669 }
1670}
1671
1672fn parse_import_stmt(pair: pest::iterators::Pair<Rule>) -> ParseResult<Stmt> {
1673 let mut inner = pair.into_inner();
1674 let path_pair = inner.expect_next("import path")?;
1675 let path = path_pair.as_str();
1676 let path = path[1..path.len() - 1].to_string();
1677 let alias = inner.next().map(|p| p.as_str().to_string());
1678 Ok(Stmt::Import { path, alias })
1679}
1680
1681fn parse_if_stmt(pair: pest::iterators::Pair<Rule>) -> ParseResult<Stmt> {
1682 let mut inner = pair.into_inner();
1683 let cond = parse_expr(inner.expect_next("if condition")?)?;
1684
1685 let mut then_branch = Vec::new();
1686 let mut elif_branches = Vec::new();
1687 let mut else_branch = None;
1688
1689 for p in inner {
1690 match p.as_rule() {
1691 Rule::block => then_branch = parse_block(p)?,
1692 Rule::statement => then_branch.push(parse_statement(p)?),
1693 Rule::elif_clause => {
1694 let mut elif_inner = p.into_inner();
1695 let elif_cond = parse_expr(elif_inner.expect_next("elif condition")?)?;
1696 let mut elif_body = Vec::new();
1697 for ep in elif_inner {
1698 match ep.as_rule() {
1699 Rule::block => elif_body = parse_block(ep)?,
1700 Rule::statement => elif_body.push(parse_statement(ep)?),
1701 _ => {}
1702 }
1703 }
1704 elif_branches.push((elif_cond, elif_body));
1705 }
1706 Rule::else_clause => {
1707 let mut else_body = Vec::new();
1708 for ep in p.into_inner() {
1709 match ep.as_rule() {
1710 Rule::block => else_body = parse_block(ep)?,
1711 Rule::statement => else_body.push(parse_statement(ep)?),
1712 _ => {}
1713 }
1714 }
1715 else_branch = Some(else_body);
1716 }
1717 _ => {}
1718 }
1719 }
1720
1721 Ok(Stmt::If {
1722 cond,
1723 then_branch,
1724 elif_branches,
1725 else_branch,
1726 })
1727}
1728
1729fn parse_for_stmt(pair: pest::iterators::Pair<Rule>) -> ParseResult<Stmt> {
1730 let mut inner = pair.into_inner();
1731 let var = inner.expect_next("loop variable")?.as_str().to_string();
1732 let iter = parse_expr(inner.expect_next("iterable expression")?)?;
1733 let mut body = Vec::new();
1734 for p in inner {
1735 match p.as_rule() {
1736 Rule::block => body = parse_block(p)?,
1737 Rule::statement => body.push(parse_statement(p)?),
1738 _ => {}
1739 }
1740 }
1741 Ok(Stmt::For { var, iter, body })
1742}
1743
1744fn parse_while_stmt(pair: pest::iterators::Pair<Rule>) -> ParseResult<Stmt> {
1745 let mut inner = pair.into_inner();
1746 let cond = parse_expr(inner.expect_next("while condition")?)?;
1747 let mut body = Vec::new();
1748 for p in inner {
1749 match p.as_rule() {
1750 Rule::block => body = parse_block(p)?,
1751 Rule::statement => body.push(parse_statement(p)?),
1752 _ => {}
1753 }
1754 }
1755 Ok(Stmt::While { cond, body })
1756}
1757
1758fn parse_return_stmt(pair: pest::iterators::Pair<Rule>) -> ParseResult<Stmt> {
1759 let expr = pair.into_inner().next().map(parse_expr).transpose()?;
1760 Ok(Stmt::Return(expr))
1761}
1762
1763fn parse_emit_stmt(pair: pest::iterators::Pair<Rule>) -> ParseResult<Stmt> {
1764 let mut inner = pair.into_inner();
1765 let event_type = inner.expect_next("event type name")?.as_str().to_string();
1766 let mut fields = Vec::new();
1767 for p in inner {
1768 if p.as_rule() == Rule::named_arg_list {
1769 for arg in p.into_inner() {
1770 fields.push(parse_named_arg(arg)?);
1771 }
1772 }
1773 }
1774 Ok(Stmt::Emit { event_type, fields })
1775}
1776
1777fn parse_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
1782 let inner = pair.into_inner().next();
1783
1784 match inner {
1785 Some(p) => parse_expr_inner(p),
1786 None => Ok(Expr::Null),
1787 }
1788}
1789
1790fn parse_expr_inner(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
1791 match pair.as_rule() {
1792 Rule::expr => parse_expr(pair),
1793 Rule::lambda_expr => parse_lambda_expr(pair),
1794 Rule::range_expr => parse_range_expr(pair),
1795 Rule::or_expr => parse_or_expr(pair),
1796 Rule::and_expr => parse_and_expr(pair),
1797 Rule::not_expr => parse_not_expr(pair),
1798 Rule::comparison_expr => parse_comparison_expr(pair),
1799 Rule::bitwise_or_expr => parse_bitwise_or_expr(pair),
1800 Rule::bitwise_xor_expr => parse_bitwise_xor_expr(pair),
1801 Rule::bitwise_and_expr => parse_bitwise_and_expr(pair),
1802 Rule::shift_expr => parse_shift_expr(pair),
1803 Rule::additive_expr => parse_additive_expr(pair),
1804 Rule::multiplicative_expr => parse_multiplicative_expr(pair),
1805 Rule::power_expr => parse_power_expr(pair),
1806 Rule::unary_expr => parse_unary_expr(pair),
1807 Rule::postfix_expr => parse_postfix_expr(pair),
1808 Rule::primary_expr => parse_primary_expr(pair),
1809 Rule::literal => parse_literal(pair),
1810 Rule::identifier => Ok(Expr::Ident(pair.as_str().to_string())),
1811 Rule::if_expr => parse_if_expr(pair),
1812 _ => Ok(Expr::Ident(pair.as_str().to_string())),
1813 }
1814}
1815
1816fn parse_lambda_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
1817 let mut inner = pair.into_inner();
1818 let mut params = Vec::new();
1819
1820 let first = inner.expect_next("lambda parameters")?;
1822 match first.as_rule() {
1823 Rule::identifier_list => {
1824 for p in first.into_inner() {
1825 params.push(p.as_str().to_string());
1826 }
1827 }
1828 Rule::identifier => {
1829 params.push(first.as_str().to_string());
1830 }
1831 _ => {}
1832 }
1833
1834 let body_pair = inner.expect_next("lambda body")?;
1836 let body = match body_pair.as_rule() {
1837 Rule::lambda_block => parse_lambda_block(body_pair)?,
1838 _ => parse_expr_inner(body_pair)?,
1839 };
1840
1841 Ok(Expr::Lambda {
1842 params,
1843 body: Box::new(body),
1844 })
1845}
1846
1847fn parse_lambda_block(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
1848 let mut stmts = Vec::new();
1849 let mut final_expr = None;
1850
1851 for p in pair.into_inner() {
1852 match p.as_rule() {
1853 Rule::statement => {
1854 let stmt = parse_statement(p)?;
1856 match &stmt.node {
1857 Stmt::VarDecl {
1858 mutable,
1859 name,
1860 ty,
1861 value,
1862 } => {
1863 stmts.push((name.clone(), ty.clone(), value.clone(), *mutable));
1864 }
1865 Stmt::Expr(e) => {
1866 final_expr = Some(e.clone());
1868 }
1869 _ => {
1870 }
1872 }
1873 }
1874 _ => {
1875 final_expr = Some(parse_expr_inner(p)?);
1877 }
1878 }
1879 }
1880
1881 if stmts.is_empty() {
1883 Ok(final_expr.unwrap_or(Expr::Null))
1884 } else {
1885 Ok(Expr::Block {
1886 stmts,
1887 result: Box::new(final_expr.unwrap_or(Expr::Null)),
1888 })
1889 }
1890}
1891
1892fn parse_pattern_expr_as_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
1893 let mut inner = pair.into_inner();
1896 let mut left = parse_pattern_and_as_expr(inner.expect_next("pattern expression")?)?;
1897
1898 for right_pair in inner {
1899 let right = parse_pattern_and_as_expr(right_pair)?;
1900 left = Expr::Binary {
1901 op: BinOp::Or,
1902 left: Box::new(left),
1903 right: Box::new(right),
1904 };
1905 }
1906 Ok(left)
1907}
1908
1909fn parse_pattern_and_as_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
1910 let mut inner = pair.into_inner();
1911 let mut left = parse_pattern_xor_as_expr(inner.expect_next("and expression")?)?;
1912
1913 for right_pair in inner {
1914 let right = parse_pattern_xor_as_expr(right_pair)?;
1915 left = Expr::Binary {
1916 op: BinOp::And,
1917 left: Box::new(left),
1918 right: Box::new(right),
1919 };
1920 }
1921 Ok(left)
1922}
1923
1924fn parse_pattern_xor_as_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
1925 let mut inner = pair.into_inner();
1926 let mut left = parse_pattern_unary_as_expr(inner.expect_next("xor expression")?)?;
1927
1928 for right_pair in inner {
1929 let right = parse_pattern_unary_as_expr(right_pair)?;
1930 left = Expr::Binary {
1931 op: BinOp::Xor,
1932 left: Box::new(left),
1933 right: Box::new(right),
1934 };
1935 }
1936 Ok(left)
1937}
1938
1939fn parse_pattern_unary_as_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
1940 let mut inner = pair.into_inner();
1941 let first = inner.expect_next("unary expression or operand")?;
1942
1943 if first.as_str() == "not" {
1944 let expr = parse_pattern_primary_as_expr(inner.expect_next("pattern expression")?)?;
1945 Ok(Expr::Unary {
1946 op: UnaryOp::Not,
1947 expr: Box::new(expr),
1948 })
1949 } else {
1950 parse_pattern_primary_as_expr(first)
1951 }
1952}
1953
1954fn parse_pattern_primary_as_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
1955 let inner = pair
1956 .into_inner()
1957 .expect_next("pattern primary expression")?;
1958
1959 match inner.as_rule() {
1960 Rule::pattern_or_expr => parse_pattern_expr_as_expr(inner),
1961 Rule::pattern_sequence => parse_pattern_sequence_as_expr(inner),
1962 _ => Ok(Expr::Ident(inner.as_str().to_string())),
1963 }
1964}
1965
1966fn parse_pattern_sequence_as_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
1967 let mut inner = pair.into_inner();
1970 let mut left = Expr::Ident(inner.expect_next("sequence start")?.as_str().to_string());
1971
1972 for right_pair in inner {
1973 let right = Expr::Ident(right_pair.as_str().to_string());
1974 left = Expr::Binary {
1975 op: BinOp::FollowedBy,
1976 left: Box::new(left),
1977 right: Box::new(right),
1978 };
1979 }
1980 Ok(left)
1981}
1982
1983fn parse_filter_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
1984 let inner = pair.into_inner().expect_next("filter expression")?;
1985 parse_filter_or_expr(inner)
1986}
1987
1988fn parse_filter_or_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
1989 let mut inner = pair.into_inner();
1990 let mut left = parse_filter_and_expr(inner.expect_next("or expression operand")?)?;
1991
1992 for right_pair in inner {
1993 let right = parse_filter_and_expr(right_pair)?;
1994 left = Expr::Binary {
1995 op: BinOp::Or,
1996 left: Box::new(left),
1997 right: Box::new(right),
1998 };
1999 }
2000 Ok(left)
2001}
2002
2003fn parse_filter_and_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2004 let mut inner = pair.into_inner();
2005 let mut left = parse_filter_not_expr(inner.expect_next("and expression operand")?)?;
2006
2007 for right_pair in inner {
2008 let right = parse_filter_not_expr(right_pair)?;
2009 left = Expr::Binary {
2010 op: BinOp::And,
2011 left: Box::new(left),
2012 right: Box::new(right),
2013 };
2014 }
2015 Ok(left)
2016}
2017
2018fn parse_filter_not_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2019 let mut inner = pair.into_inner();
2020 let first = inner.expect_next("not or expression")?;
2021
2022 if first.as_str() == "not" {
2023 let expr = parse_filter_comparison_expr(inner.expect_next("expression after not")?)?;
2024 Ok(Expr::Unary {
2025 op: UnaryOp::Not,
2026 expr: Box::new(expr),
2027 })
2028 } else {
2029 parse_filter_comparison_expr(first)
2030 }
2031}
2032
2033fn parse_filter_comparison_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2034 let mut inner = pair.into_inner();
2035 let left = parse_filter_additive_expr(inner.expect_next("comparison left operand")?)?;
2036
2037 if let Some(op_pair) = inner.next() {
2038 let op = match op_pair.as_str() {
2039 "==" => BinOp::Eq,
2040 "!=" => BinOp::NotEq,
2041 "<" => BinOp::Lt,
2042 "<=" => BinOp::Le,
2043 ">" => BinOp::Gt,
2044 ">=" => BinOp::Ge,
2045 "in" => BinOp::In,
2046 "is" => BinOp::Is,
2047 s if s.contains("not") && s.contains("in") => BinOp::NotIn,
2048 _ => BinOp::Eq,
2049 };
2050 let right = parse_filter_additive_expr(inner.expect_next("comparison right operand")?)?;
2051 Ok(Expr::Binary {
2052 op,
2053 left: Box::new(left),
2054 right: Box::new(right),
2055 })
2056 } else {
2057 Ok(left)
2058 }
2059}
2060
2061fn parse_filter_additive_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2062 let mut inner = pair.into_inner();
2063 let mut left =
2064 parse_filter_multiplicative_expr(inner.expect_next("additive expression operand")?)?;
2065
2066 while let Some(op_pair) = inner.next() {
2067 let op = if op_pair.as_str() == "-" {
2068 BinOp::Sub
2069 } else {
2070 BinOp::Add
2071 };
2072 if let Some(right_pair) = inner.next() {
2073 let right = parse_filter_multiplicative_expr(right_pair)?;
2074 left = Expr::Binary {
2075 op,
2076 left: Box::new(left),
2077 right: Box::new(right),
2078 };
2079 }
2080 }
2081 Ok(left)
2082}
2083
2084fn parse_filter_multiplicative_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2085 let mut inner = pair.into_inner();
2086 let mut left =
2087 parse_filter_unary_expr(inner.expect_next("multiplicative expression operand")?)?;
2088
2089 while let Some(op_pair) = inner.next() {
2090 let op = match op_pair.as_str() {
2091 "*" => BinOp::Mul,
2092 "/" => BinOp::Div,
2093 "%" => BinOp::Mod,
2094 _ => BinOp::Mul,
2095 };
2096 if let Some(right_pair) = inner.next() {
2097 let right = parse_filter_unary_expr(right_pair)?;
2098 left = Expr::Binary {
2099 op,
2100 left: Box::new(left),
2101 right: Box::new(right),
2102 };
2103 }
2104 }
2105 Ok(left)
2106}
2107
2108fn parse_filter_unary_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2109 let mut inner = pair.into_inner();
2110 let first = inner.expect_next("unary operator or expression")?;
2111
2112 if first.as_rule() == Rule::filter_unary_op {
2114 let op_str = first.as_str();
2115 let expr =
2116 parse_filter_postfix_expr(inner.expect_next("expression after unary operator")?)?;
2117 let op = match op_str {
2118 "-" => UnaryOp::Neg,
2119 "~" => UnaryOp::BitNot,
2120 _ => unreachable!("Grammar only allows - or ~"),
2121 };
2122 Ok(Expr::Unary {
2123 op,
2124 expr: Box::new(expr),
2125 })
2126 } else {
2127 parse_filter_postfix_expr(first)
2128 }
2129}
2130
2131fn parse_filter_postfix_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2132 let mut inner = pair.into_inner();
2133 let mut expr = parse_filter_primary_expr(inner.expect_next("postfix expression base")?)?;
2134
2135 for suffix in inner {
2136 expr = parse_filter_postfix_suffix(expr, suffix)?;
2137 }
2138 Ok(expr)
2139}
2140
2141fn parse_filter_postfix_suffix(expr: Expr, pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2142 let mut inner = pair.into_inner();
2143
2144 if let Some(first) = inner.next() {
2145 match first.as_rule() {
2146 Rule::identifier => {
2147 Ok(Expr::Member {
2149 expr: Box::new(expr),
2150 member: first.as_str().to_string(),
2151 })
2152 }
2153 Rule::optional_member_access => {
2154 let member = first
2155 .into_inner()
2156 .expect_next("member name")?
2157 .as_str()
2158 .to_string();
2159 Ok(Expr::OptionalMember {
2160 expr: Box::new(expr),
2161 member,
2162 })
2163 }
2164 Rule::index_access => {
2165 let index = parse_expr(first.into_inner().expect_next("index expression")?)?;
2166 Ok(Expr::Index {
2167 expr: Box::new(expr),
2168 index: Box::new(index),
2169 })
2170 }
2171 Rule::call_args => {
2172 let args = first
2173 .into_inner()
2174 .filter(|p| p.as_rule() == Rule::arg_list)
2175 .flat_map(|p| p.into_inner())
2176 .map(parse_arg)
2177 .collect::<ParseResult<Vec<_>>>()?;
2178 Ok(Expr::Call {
2179 func: Box::new(expr),
2180 args,
2181 })
2182 }
2183 _ => Ok(expr),
2184 }
2185 } else {
2186 Ok(expr)
2187 }
2188}
2189
2190fn parse_filter_primary_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2191 let inner = pair.into_inner().expect_next("filter primary expression")?;
2192
2193 match inner.as_rule() {
2194 Rule::literal => parse_literal(inner),
2195 Rule::identifier => Ok(Expr::Ident(inner.as_str().to_string())),
2196 Rule::filter_expr => parse_filter_expr(inner),
2197 _ => Ok(Expr::Ident(inner.as_str().to_string())),
2198 }
2199}
2200
2201fn parse_range_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2202 let mut inner = pair.into_inner();
2203 let left = parse_expr_inner(inner.expect_next("range start")?)?;
2204
2205 if let Some(op_pair) = inner.next() {
2206 let inclusive = op_pair.as_str() == "..=";
2207 let right = parse_expr_inner(inner.expect_next("range end")?)?;
2208 Ok(Expr::Range {
2209 start: Box::new(left),
2210 end: Box::new(right),
2211 inclusive,
2212 })
2213 } else {
2214 Ok(left)
2215 }
2216}
2217
2218fn parse_or_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2219 let mut inner = pair.into_inner();
2220 let mut left = parse_expr_inner(inner.expect_next("or expression operand")?)?;
2221
2222 for right_pair in inner {
2223 let right = parse_expr_inner(right_pair)?;
2224 left = Expr::Binary {
2225 op: BinOp::Or,
2226 left: Box::new(left),
2227 right: Box::new(right),
2228 };
2229 }
2230
2231 Ok(left)
2232}
2233
2234fn parse_and_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2235 let mut inner = pair.into_inner();
2236 let mut left = parse_expr_inner(inner.expect_next("and expression operand")?)?;
2237
2238 for right_pair in inner {
2239 let right = parse_expr_inner(right_pair)?;
2240 left = Expr::Binary {
2241 op: BinOp::And,
2242 left: Box::new(left),
2243 right: Box::new(right),
2244 };
2245 }
2246
2247 Ok(left)
2248}
2249
2250fn parse_not_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2251 let mut inner = pair.into_inner();
2252 let first = inner.expect_next("not keyword or expression")?;
2253
2254 if first.as_str() == "not" {
2255 let expr = parse_expr_inner(inner.expect_next("expression after not")?)?;
2256 Ok(Expr::Unary {
2257 op: UnaryOp::Not,
2258 expr: Box::new(expr),
2259 })
2260 } else {
2261 parse_expr_inner(first)
2262 }
2263}
2264
2265fn parse_comparison_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2266 let mut inner = pair.into_inner();
2267 let left = parse_expr_inner(inner.expect_next("comparison left operand")?)?;
2268
2269 if let Some(op_pair) = inner.next() {
2270 let op_str = op_pair.as_str();
2271 let op = match op_str {
2272 "==" => BinOp::Eq,
2273 "!=" => BinOp::NotEq,
2274 "<" => BinOp::Lt,
2275 "<=" => BinOp::Le,
2276 ">" => BinOp::Gt,
2277 ">=" => BinOp::Ge,
2278 "in" => BinOp::In,
2279 "is" => BinOp::Is,
2280 s if s.contains("not") && s.contains("in") => BinOp::NotIn,
2281 _ => BinOp::Eq,
2282 };
2283 let right = parse_expr_inner(inner.expect_next("comparison right operand")?)?;
2284 Ok(Expr::Binary {
2285 op,
2286 left: Box::new(left),
2287 right: Box::new(right),
2288 })
2289 } else {
2290 Ok(left)
2291 }
2292}
2293
2294fn parse_bitwise_or_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2295 parse_binary_chain(pair, BinOp::BitOr)
2296}
2297
2298fn parse_bitwise_xor_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2299 parse_binary_chain(pair, BinOp::BitXor)
2300}
2301
2302fn parse_bitwise_and_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2303 parse_binary_chain(pair, BinOp::BitAnd)
2304}
2305
2306fn parse_shift_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2307 let mut inner = pair.into_inner();
2308 let mut left = parse_expr_inner(inner.expect_next("shift expression operand")?)?;
2309
2310 while let Some(op_or_expr) = inner.next() {
2311 let op = match op_or_expr.as_str() {
2312 "<<" => BinOp::Shl,
2313 ">>" => BinOp::Shr,
2314 _ => {
2315 let right = parse_expr_inner(op_or_expr)?;
2316 left = Expr::Binary {
2317 op: BinOp::Shl,
2318 left: Box::new(left),
2319 right: Box::new(right),
2320 };
2321 continue;
2322 }
2323 };
2324 if let Some(right_pair) = inner.next() {
2325 let right = parse_expr_inner(right_pair)?;
2326 left = Expr::Binary {
2327 op,
2328 left: Box::new(left),
2329 right: Box::new(right),
2330 };
2331 }
2332 }
2333
2334 Ok(left)
2335}
2336
2337fn parse_additive_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2338 let mut inner = pair.into_inner();
2339 let mut left = parse_expr_inner(inner.expect_next("additive expression operand")?)?;
2340
2341 while let Some(op_pair) = inner.next() {
2342 let op_text = op_pair.as_str();
2343 let op = if op_text == "-" {
2344 BinOp::Sub
2345 } else {
2346 BinOp::Add
2347 };
2348
2349 if let Some(right_pair) = inner.next() {
2350 let right = parse_expr_inner(right_pair)?;
2351 left = Expr::Binary {
2352 op,
2353 left: Box::new(left),
2354 right: Box::new(right),
2355 };
2356 }
2357 }
2358
2359 Ok(left)
2360}
2361
2362fn parse_multiplicative_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2363 let mut inner = pair.into_inner();
2364 let mut left = parse_expr_inner(inner.expect_next("multiplicative expression operand")?)?;
2365
2366 while let Some(op_pair) = inner.next() {
2367 let op_text = op_pair.as_str();
2368 let op = match op_text {
2369 "*" => BinOp::Mul,
2370 "/" => BinOp::Div,
2371 "%" => BinOp::Mod,
2372 _ => BinOp::Mul,
2373 };
2374
2375 if let Some(right_pair) = inner.next() {
2376 let right = parse_expr_inner(right_pair)?;
2377 left = Expr::Binary {
2378 op,
2379 left: Box::new(left),
2380 right: Box::new(right),
2381 };
2382 }
2383 }
2384
2385 Ok(left)
2386}
2387
2388fn parse_power_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2389 let mut inner = pair.into_inner();
2390 let base = parse_expr_inner(inner.expect_next("power expression base")?)?;
2391
2392 if let Some(exp_pair) = inner.next() {
2393 let exp = parse_expr_inner(exp_pair)?;
2394 Ok(Expr::Binary {
2395 op: BinOp::Pow,
2396 left: Box::new(base),
2397 right: Box::new(exp),
2398 })
2399 } else {
2400 Ok(base)
2401 }
2402}
2403
2404fn parse_unary_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2405 let mut inner = pair.into_inner();
2406 let first = inner.expect_next("unary operator or expression")?;
2407
2408 match first.as_rule() {
2410 Rule::unary_op => {
2411 let op_str = first.as_str();
2412 let expr = parse_expr_inner(inner.expect_next("expression after unary operator")?)?;
2413 let op = match op_str {
2414 "-" => UnaryOp::Neg,
2415 "~" => UnaryOp::BitNot,
2416 _ => unreachable!("Grammar only allows - or ~"),
2417 };
2418 Ok(Expr::Unary {
2419 op,
2420 expr: Box::new(expr),
2421 })
2422 }
2423 _ => parse_expr_inner(first),
2424 }
2425}
2426
2427fn parse_postfix_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2428 let mut inner = pair.into_inner();
2429 let mut expr = parse_expr_inner(inner.expect_next("postfix expression base")?)?;
2430
2431 for suffix in inner {
2432 expr = parse_postfix_suffix(expr, suffix)?;
2433 }
2434
2435 Ok(expr)
2436}
2437
2438fn parse_postfix_suffix(expr: Expr, pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2439 let inner = pair.into_inner().expect_next("postfix suffix")?;
2440
2441 match inner.as_rule() {
2442 Rule::member_access => {
2443 let member = inner
2444 .into_inner()
2445 .expect_next("member name")?
2446 .as_str()
2447 .to_string();
2448 Ok(Expr::Member {
2449 expr: Box::new(expr),
2450 member,
2451 })
2452 }
2453 Rule::optional_member_access => {
2454 let member = inner
2455 .into_inner()
2456 .expect_next("member name")?
2457 .as_str()
2458 .to_string();
2459 Ok(Expr::OptionalMember {
2460 expr: Box::new(expr),
2461 member,
2462 })
2463 }
2464 Rule::slice_access => {
2465 let slice_range = inner.into_inner().expect_next("slice range")?;
2467 let slice_inner = slice_range.into_inner();
2468
2469 let mut start = None;
2470 let mut end = None;
2471
2472 for p in slice_inner {
2473 match p.as_rule() {
2474 Rule::slice_start => {
2475 start = Some(Box::new(parse_expr_inner(
2476 p.into_inner().expect_next("slice start expression")?,
2477 )?));
2478 }
2479 Rule::slice_end => {
2480 end = Some(Box::new(parse_expr_inner(
2481 p.into_inner().expect_next("slice end expression")?,
2482 )?));
2483 }
2484 _ => {}
2485 }
2486 }
2487
2488 Ok(Expr::Slice {
2489 expr: Box::new(expr),
2490 start,
2491 end,
2492 })
2493 }
2494 Rule::index_access => {
2495 let index = parse_expr(inner.into_inner().expect_next("index expression")?)?;
2496 Ok(Expr::Index {
2497 expr: Box::new(expr),
2498 index: Box::new(index),
2499 })
2500 }
2501 Rule::call_args => {
2502 let mut args = Vec::new();
2503 for p in inner.into_inner() {
2504 if p.as_rule() == Rule::arg_list {
2505 for arg in p.into_inner() {
2506 args.push(parse_arg(arg)?);
2507 }
2508 }
2509 }
2510 Ok(Expr::Call {
2511 func: Box::new(expr),
2512 args,
2513 })
2514 }
2515 _ => Ok(expr),
2516 }
2517}
2518
2519fn parse_arg(pair: pest::iterators::Pair<Rule>) -> ParseResult<Arg> {
2520 let mut inner = pair.into_inner();
2521 let first = inner.expect_next("argument")?;
2522
2523 if let Some(second) = inner.next() {
2524 Ok(Arg::Named(
2525 first.as_str().to_string(),
2526 parse_expr_inner(second)?,
2527 ))
2528 } else {
2529 Ok(Arg::Positional(parse_expr_inner(first)?))
2530 }
2531}
2532
2533fn parse_primary_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2534 let inner = pair.into_inner().expect_next("primary expression")?;
2535
2536 match inner.as_rule() {
2537 Rule::if_expr => parse_if_expr(inner),
2538 Rule::literal => parse_literal(inner),
2539 Rule::identifier => Ok(Expr::Ident(inner.as_str().to_string())),
2540 Rule::array_literal => parse_array_literal(inner),
2541 Rule::map_literal => parse_map_literal(inner),
2542 Rule::expr => parse_expr(inner),
2543 _ => Ok(Expr::Ident(inner.as_str().to_string())),
2544 }
2545}
2546
2547fn parse_if_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2548 let mut inner = pair.into_inner();
2549 let cond = parse_expr_inner(inner.expect_next("if condition")?)?;
2550 let then_branch = parse_expr_inner(inner.expect_next("then branch")?)?;
2551 let else_branch = parse_expr_inner(inner.expect_next("else branch")?)?;
2552
2553 Ok(Expr::If {
2554 cond: Box::new(cond),
2555 then_branch: Box::new(then_branch),
2556 else_branch: Box::new(else_branch),
2557 })
2558}
2559
2560fn parse_literal(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2561 let inner = pair.into_inner().expect_next("literal value")?;
2562
2563 match inner.as_rule() {
2564 Rule::integer => inner
2565 .as_str()
2566 .parse::<i64>()
2567 .map(Expr::Int)
2568 .map_err(|e| ParseError::InvalidNumber(format!("'{}': {}", inner.as_str(), e))),
2569 Rule::float => inner
2570 .as_str()
2571 .parse::<f64>()
2572 .map(Expr::Float)
2573 .map_err(|e| ParseError::InvalidNumber(format!("'{}': {}", inner.as_str(), e))),
2574 Rule::string => {
2575 let s = inner.as_str();
2576 Ok(Expr::Str(s[1..s.len() - 1].to_string()))
2577 }
2578 Rule::duration => Ok(Expr::Duration(
2579 parse_duration(inner.as_str()).map_err(ParseError::InvalidDuration)?,
2580 )),
2581 Rule::timestamp => Ok(Expr::Timestamp(parse_timestamp(inner.as_str()))),
2582 Rule::boolean => Ok(Expr::Bool(inner.as_str() == "true")),
2583 Rule::null => Ok(Expr::Null),
2584 _ => Ok(Expr::Null),
2585 }
2586}
2587
2588fn parse_array_literal(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2589 let mut items = Vec::new();
2590 for p in pair.into_inner() {
2591 if p.as_rule() == Rule::expr_list {
2592 for expr in p.into_inner() {
2593 items.push(parse_expr(expr)?);
2594 }
2595 }
2596 }
2597 Ok(Expr::Array(items))
2598}
2599
2600fn parse_map_literal(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2601 let mut entries = Vec::new();
2602 for p in pair.into_inner() {
2603 if p.as_rule() == Rule::map_entry_list {
2604 for entry in p.into_inner() {
2605 let mut inner = entry.into_inner();
2606 let key = inner.expect_next("map key")?.as_str().to_string();
2607 let key = if key.starts_with('"') {
2608 key[1..key.len() - 1].to_string()
2609 } else {
2610 key
2611 };
2612 let value = parse_expr(inner.expect_next("map value")?)?;
2613 entries.push((key, value));
2614 }
2615 }
2616 }
2617 Ok(Expr::Map(entries))
2618}
2619
2620fn parse_binary_chain(pair: pest::iterators::Pair<Rule>, op: BinOp) -> ParseResult<Expr> {
2621 let mut inner = pair.into_inner();
2622 let mut left = parse_expr_inner(inner.expect_next("binary chain operand")?)?;
2623
2624 for right_pair in inner {
2625 let right = parse_expr_inner(right_pair)?;
2626 left = Expr::Binary {
2627 op,
2628 left: Box::new(left),
2629 right: Box::new(right),
2630 };
2631 }
2632
2633 Ok(left)
2634}
2635
2636#[cfg(test)]
2637mod tests {
2638 use super::*;
2639
2640 #[test]
2641 fn test_parse_simple_stream() {
2642 let result = parse("stream output = input");
2643 assert!(result.is_ok(), "Failed: {:?}", result.err());
2644 }
2645
2646 #[test]
2647 fn test_parse_stream_with_filter() {
2648 let result = parse("stream output = input.where(value > 100)");
2649 assert!(result.is_ok(), "Failed: {:?}", result.err());
2650 }
2651
2652 #[test]
2653 fn test_parse_stream_with_map() {
2654 let result = parse("stream output = input.map(x * 2)");
2655 assert!(result.is_ok(), "Failed: {:?}", result.err());
2656 }
2657
2658 #[test]
2659 fn test_parse_event_declaration() {
2660 let result = parse("event SensorReading:\n sensor_id: str\n value: float");
2661 assert!(result.is_ok(), "Failed: {:?}", result.err());
2662 }
2663
2664 #[test]
2665 fn test_parse_variable() {
2666 let result = parse("let x = 42");
2667 assert!(result.is_ok(), "Failed: {:?}", result.err());
2668 }
2669
2670 #[test]
2671 fn test_parse_function() {
2672 let result = parse("fn add(a: int, b: int) -> int:\n return a + b");
2673 assert!(result.is_ok(), "Failed: {:?}", result.err());
2674 }
2675
2676 #[test]
2677 fn test_parse_lambda() {
2678 let result = parse("let f = (x) => x * 2");
2679 assert!(result.is_ok(), "Failed: {:?}", result.err());
2680 }
2681
2682 #[test]
2683 fn test_parse_if_expression() {
2684 let result = parse("let x = if a > b then a else b");
2685 assert!(result.is_ok(), "Failed: {:?}", result.err());
2686 }
2687
2688 #[test]
2689 fn test_parse_followed_by() {
2690 let result = parse("stream alerts = orders.where(amount > 1000) -> Payment where payment.order_id == orders.id");
2691 assert!(result.is_ok(), "Failed: {:?}", result.err());
2692 }
2693
2694 #[test]
2695 fn test_parse_window() {
2696 let result = parse("stream windowed = input.window(5s)");
2697 assert!(result.is_ok(), "Failed: {:?}", result.err());
2698 }
2699
2700 #[test]
2701 fn test_parse_aggregate() {
2702 let result =
2703 parse("stream stats = input.window(1m).aggregate(count: count(), avg: avg(value))");
2704 assert!(result.is_ok(), "Failed: {:?}", result.err());
2705 }
2706
2707 #[test]
2708 fn test_parse_merge() {
2709 let result = parse("stream combined = merge(stream1, stream2)");
2710 assert!(result.is_ok(), "Failed: {:?}", result.err());
2711 }
2712
2713 #[test]
2714 fn test_parse_sequence() {
2715 let result = parse("stream seq = sequence(a: EventA, b: EventB where b.id == a.id)");
2716 assert!(result.is_ok(), "Failed: {:?}", result.err());
2717 }
2718
2719 #[test]
2720 fn test_parse_config() {
2721 let result = parse("config:\n window_size: 5s\n batch_size: 100");
2722 assert!(result.is_ok(), "Failed: {:?}", result.err());
2723 }
2724
2725 #[test]
2726 fn test_parse_complex_expression() {
2727 let result = parse("let x = (a + b) * c / d - e");
2728 assert!(result.is_ok(), "Failed: {:?}", result.err());
2729 }
2730
2731 #[test]
2732 fn test_parse_sliding_window() {
2733 let result = parse("stream output = input.window(5m, sliding: 1m)");
2734 assert!(result.is_ok(), "Failed: {:?}", result.err());
2735 }
2736
2737 #[test]
2738 fn test_parse_fork_construct() {
2739 let result =
2740 parse("stream forked = input.fork(branch1: .where(x > 0), branch2: .where(x < 0))");
2741 assert!(result.is_ok(), "Failed: {:?}", result.err());
2742 }
2743
2744 #[test]
2745 fn test_parse_aggregate_functions() {
2746 let result = parse(
2747 "stream stats = input.window(1h).aggregate(total: sum(value), average: avg(value))",
2748 );
2749 assert!(result.is_ok(), "Failed: {:?}", result.err());
2750 }
2751
2752 #[test]
2753 fn test_parse_complex_parentheses() {
2754 let result = parse("let x = ((a + b) * (c - d)) / e");
2755 assert!(result.is_ok(), "Failed: {:?}", result.err());
2756 }
2757
2758 #[test]
2759 fn test_parse_sequence_with_alias() {
2760 let result = parse(
2761 r#"
2762 stream TwoTicks = StockTick as first
2763 -> StockTick as second
2764 .emit(result: "two_ticks")
2765 "#,
2766 );
2767 assert!(result.is_ok(), "Failed: {:?}", result.err());
2768 }
2769
2770 #[test]
2771 fn test_parse_followed_by_with_alias() {
2772 let result = parse("stream alerts = Order as a -> Payment as b");
2773 assert!(result.is_ok(), "Failed: {:?}", result.err());
2774 }
2775
2776 #[test]
2777 fn test_parse_followed_by_with_filter_and_alias() {
2778 let result = parse(
2780 r#"
2781 stream Test = A as a
2782 -> B where value == a.base + 10 as b
2783 .emit(status: "matched")
2784 "#,
2785 );
2786 assert!(result.is_ok(), "Failed: {:?}", result.err());
2787 }
2788
2789 #[test]
2790 fn test_parse_pattern_with_lambda() {
2791 let result =
2792 parse("stream Test = Trade.window(1m).pattern(p: x => x.len() > 3).emit(alert: true)");
2793 assert!(result.is_ok(), "Failed: {:?}", result.err());
2794 }
2795
2796 #[test]
2797 fn test_parse_sase_pattern_decl_simple() {
2798 let result = parse("pattern SimpleAlert = SEQ(Login, Transaction)");
2799 assert!(result.is_ok(), "Failed: {:?}", result.err());
2800 }
2801
2802 #[test]
2803 fn test_parse_sase_pattern_decl_with_kleene() {
2804 let result = parse("pattern MultiTx = SEQ(Login, Transaction+ where amount > 1000)");
2805 assert!(result.is_ok(), "Failed: {:?}", result.err());
2806 }
2807
2808 #[test]
2809 fn test_parse_sase_pattern_decl_with_alias() {
2810 let result = parse("pattern AliasedPattern = SEQ(Login as login, Transaction as tx)");
2811 assert!(result.is_ok(), "Failed: {:?}", result.err());
2812 }
2813
2814 #[test]
2815 fn test_parse_sase_pattern_decl_with_within() {
2816 let result = parse("pattern TimedPattern = SEQ(A, B) within 10m");
2817 assert!(result.is_ok(), "Failed: {:?}", result.err());
2818 }
2819
2820 #[test]
2821 fn test_parse_sase_pattern_decl_with_partition() {
2822 let result = parse("pattern PartitionedPattern = SEQ(A, B) partition by user_id");
2823 assert!(result.is_ok(), "Failed: {:?}", result.err());
2824 }
2825
2826 #[test]
2827 fn test_parse_sase_pattern_decl_full() {
2828 let result = parse(
2830 "pattern SuspiciousActivity = SEQ(Transaction+ where amount > 1000 as txs) within 10m partition by user_id"
2831 );
2832 assert!(result.is_ok(), "Failed: {:?}", result.err());
2833 }
2834
2835 #[test]
2836 fn test_parse_sase_pattern_decl_or() {
2837 let result = parse("pattern AlertOrWarn = Login OR Logout");
2838 assert!(result.is_ok(), "Failed: {:?}", result.err());
2839 }
2840
2841 #[test]
2842 fn test_parse_sase_pattern_decl_and() {
2843 let result = parse("pattern BothEvents = Login AND Transaction");
2844 assert!(result.is_ok(), "Failed: {:?}", result.err());
2845 }
2846
2847 #[test]
2848 fn test_parse_sase_pattern_decl_not() {
2849 let result = parse("pattern NoLogout = SEQ(Login, NOT Logout, Transaction)");
2850 assert!(result.is_ok(), "Failed: {:?}", result.err());
2851 }
2852
2853 #[test]
2854 fn test_parse_having() {
2855 let result = parse(
2856 "stream filtered = input.window(1m).aggregate(count: count(), total: sum(value)).having(count > 10)",
2857 );
2858 assert!(result.is_ok(), "Failed: {:?}", result.err());
2859 }
2860
2861 #[test]
2862 fn test_parse_having_with_partition() {
2863 let result = parse(
2864 "stream grouped = input.partition_by(category).window(5m).aggregate(avg_price: avg(price)).having(avg_price > 100.0)",
2865 );
2866 assert!(result.is_ok(), "Failed: {:?}", result.err());
2867 }
2868
2869 #[test]
2870 fn test_parse_timer_source() {
2871 let result = parse("stream heartbeat = timer(5s).emit(type: \"heartbeat\")");
2872 assert!(result.is_ok(), "Failed: {:?}", result.err());
2873 }
2874
2875 #[test]
2876 fn test_parse_timer_source_with_initial_delay() {
2877 let result =
2878 parse("stream delayed_timer = timer(1m, initial_delay: 10s).emit(type: \"periodic\")");
2879 assert!(result.is_ok(), "Failed: {:?}", result.err());
2880 }
2881
2882 #[test]
2883 fn test_parse_var_decl() {
2884 let result = parse("var threshold: float = 10.0");
2885 assert!(result.is_ok(), "Failed: {:?}", result.err());
2886 }
2887
2888 #[test]
2889 fn test_parse_let_decl() {
2890 let result = parse("let max_count: int = 100");
2891 assert!(result.is_ok(), "Failed: {:?}", result.err());
2892 }
2893
2894 #[test]
2895 fn test_parse_assignment() {
2896 let result = parse("threshold := threshold + 10.0");
2897 assert!(result.is_ok(), "Failed: {:?}", result.err());
2898 }
2899
2900 #[test]
2901 fn test_parse_assignment_with_expression() {
2902 let result = parse("count := count * 2 + offset");
2903 assert!(result.is_ok(), "Failed: {:?}", result.err());
2904 }
2905
2906 #[test]
2907 fn test_parse_nested_stream_reference() {
2908 let result = parse("stream Base = Event\nstream Derived = Base.where(x > 0)");
2909 assert!(result.is_ok(), "Failed: {:?}", result.err());
2910 }
2911
2912 #[test]
2913 fn test_parse_multi_stage_pipeline() {
2914 let result = parse(
2915 "stream L1 = Raw\nstream L2 = L1.where(a > 1)\nstream L3 = L2.window(5).aggregate(cnt: count())",
2916 );
2917 assert!(result.is_ok(), "Failed: {:?}", result.err());
2918 }
2919
2920 #[test]
2921 fn test_parse_stream_with_operations_chain() {
2922 let result = parse(
2923 "stream Processed = Source.where(valid).window(10).aggregate(sum: sum(value)).having(sum > 100)",
2924 );
2925 assert!(result.is_ok(), "Failed: {:?}", result.err());
2926 }
2927
2928 #[test]
2933 fn test_parse_connector_mqtt() {
2934 let result = parse(
2935 r#"connector MqttBroker = mqtt (
2936 host: "localhost",
2937 port: 1883,
2938 client_id: "varpulis"
2939 )"#,
2940 );
2941 assert!(result.is_ok(), "Failed: {:?}", result.err());
2942 }
2943
2944 #[test]
2945 fn test_parse_connector_kafka() {
2946 let result = parse(
2947 r#"connector KafkaCluster = kafka (
2948 brokers: ["kafka1:9092", "kafka2:9092"],
2949 group_id: "my-group"
2950 )"#,
2951 );
2952 assert!(result.is_ok(), "Failed: {:?}", result.err());
2953 }
2954
2955 #[test]
2956 fn test_parse_connector_http() {
2957 let result = parse(
2958 r#"connector ApiEndpoint = http (
2959 base_url: "https://api.example.com"
2960 )"#,
2961 );
2962 assert!(result.is_ok(), "Failed: {:?}", result.err());
2963 }
2964
2965 #[test]
2966 fn test_parse_stream_with_from_connector() {
2967 let result = parse(
2968 r#"stream Temperatures = TemperatureReading.from(MqttSensors, topic: "sensors/temp/#")"#,
2969 );
2970 assert!(result.is_ok(), "Failed: {:?}", result.err());
2971 }
2972
2973 #[test]
2974 fn test_parse_stream_with_from_and_operations() {
2975 let result = parse(
2976 r#"stream HighTemp = TemperatureReading
2977 .from(MqttSensors, topic: "sensors/#")
2978 .where(value > 30)
2979 .emit(alert: "high_temp")"#,
2980 );
2981 assert!(result.is_ok(), "Failed: {:?}", result.err());
2982 }
2983
2984 #[test]
2985 fn test_parse_full_connectivity_pipeline() {
2986 let result = parse(
2987 r#"
2988 connector MqttSensors = mqtt (host: "localhost", port: 1883)
2989 connector KafkaAlerts = kafka (brokers: ["kafka:9092"])
2990
2991 event TemperatureReading:
2992 sensor_id: str
2993 value: float
2994 ts: timestamp
2995
2996 stream Temperatures = TemperatureReading.from(MqttSensors, topic: "sensors/#")
2997
2998 stream HighTempAlert = Temperatures
2999 .where(value > 30)
3000 .emit(alert_type: "HIGH_TEMP", temperature: value)
3001
3002 "#,
3003 );
3004 assert!(result.is_ok(), "Failed: {:?}", result.err());
3005 }
3006
3007 #[test]
3008 fn test_parse_emit_as_type() {
3009 let result = parse(
3010 r#"stream Alerts = Temperatures
3011 .where(value > 30)
3012 .emit as AlertEvent(severity: "high", temp: value)"#,
3013 );
3014 assert!(result.is_ok(), "Failed: {:?}", result.err());
3015 }
3016
3017 #[test]
3018 fn test_parse_stream_with_to_connector() {
3019 let result = parse(
3020 r#"stream Output = Input
3021 .where(x > 0)
3022 .emit(y: x * 2)
3023 .to(KafkaOutput, topic: "output")"#,
3024 );
3025 assert!(result.is_ok(), "Failed: {:?}", result.err());
3026 }
3027
3028 #[test]
3029 fn test_emit_stmt_parses() {
3030 let result = parse(
3031 r"fn test():
3032 emit Pixel(x: 1, y: 2)",
3033 );
3034 assert!(result.is_ok(), "Failed: {:?}", result.err());
3035 let program = result.unwrap();
3036 if let Stmt::FnDecl { body, .. } = &program.statements[0].node {
3038 match &body[0].node {
3039 Stmt::Emit { event_type, fields } => {
3040 assert_eq!(event_type, "Pixel");
3041 assert_eq!(fields.len(), 2);
3042 assert_eq!(fields[0].name, "x");
3043 assert_eq!(fields[1].name, "y");
3044 }
3045 other => panic!("Expected Stmt::Emit, got {other:?}"),
3046 }
3047 } else {
3048 panic!("Expected FnDecl");
3049 }
3050 }
3051
3052 #[test]
3053 fn test_emit_stmt_no_args() {
3054 let result = parse(
3055 r"fn test():
3056 emit Done()",
3057 );
3058 assert!(result.is_ok(), "Failed: {:?}", result.err());
3059 let program = result.unwrap();
3060 if let Stmt::FnDecl { body, .. } = &program.statements[0].node {
3061 match &body[0].node {
3062 Stmt::Emit { event_type, fields } => {
3063 assert_eq!(event_type, "Done");
3064 assert!(fields.is_empty());
3065 }
3066 other => panic!("Expected Stmt::Emit, got {other:?}"),
3067 }
3068 } else {
3069 panic!("Expected FnDecl");
3070 }
3071 }
3072
3073 #[test]
3074 fn test_emit_in_function_with_for_loop() {
3075 let result = parse(
3076 r"fn generate(n: int):
3077 for i in 0..n:
3078 emit Item(index: i, value: i * 2)",
3079 );
3080 assert!(result.is_ok(), "Failed: {:?}", result.err());
3081 }
3082
3083 #[test]
3084 fn test_parse_process_op() {
3085 let result = parse(
3086 r"fn do_work():
3087 emit Result(v: 42)
3088
3089stream S = timer(1s).process(do_work())",
3090 );
3091 assert!(result.is_ok(), "Failed: {:?}", result.err());
3092 }
3093
3094 #[test]
3095 fn test_parse_trend_aggregate_count_trends() {
3096 let result = parse(
3097 r#"stream S = StockTick as first
3098 -> all StockTick where price > first.price as rising
3099 -> StockTick where price < rising.price as drop
3100 .within(60s)
3101 .trend_aggregate(count: count_trends())
3102 .emit(event_type: "TrendStats", trends: count)"#,
3103 );
3104 assert!(result.is_ok(), "Failed: {:?}", result.err());
3105 let program = result.unwrap();
3106 for stmt in &program.statements {
3108 if let Stmt::StreamDecl { ops, .. } = &stmt.node {
3109 let has_trend_agg = ops
3110 .iter()
3111 .any(|op| matches!(op, StreamOp::TrendAggregate(_)));
3112 assert!(has_trend_agg, "Expected TrendAggregate op in stream ops");
3113 for op in ops {
3115 if let StreamOp::TrendAggregate(items) = op {
3116 assert_eq!(items.len(), 1);
3117 assert_eq!(items[0].alias, "count");
3118 assert_eq!(items[0].func, "count_trends");
3119 assert!(items[0].arg.is_none());
3120 }
3121 }
3122 return;
3123 }
3124 }
3125 panic!("No stream declaration found");
3126 }
3127
3128 #[test]
3129 fn test_parse_trend_aggregate_multiple_items() {
3130 let result = parse(
3131 r"stream S = StockTick as first
3132 -> all StockTick as rising
3133 .within(60s)
3134 .trend_aggregate(
3135 trend_count: count_trends(),
3136 event_count: count_events(rising)
3137 )
3138 .emit(trends: trend_count, events: event_count)",
3139 );
3140 assert!(result.is_ok(), "Failed: {:?}", result.err());
3141 let program = result.unwrap();
3142 for stmt in &program.statements {
3143 if let Stmt::StreamDecl { ops, .. } = &stmt.node {
3144 for op in ops {
3145 if let StreamOp::TrendAggregate(items) = op {
3146 assert_eq!(items.len(), 2);
3147 assert_eq!(items[0].alias, "trend_count");
3148 assert_eq!(items[0].func, "count_trends");
3149 assert_eq!(items[1].alias, "event_count");
3150 assert_eq!(items[1].func, "count_events");
3151 assert!(items[1].arg.is_some());
3152 return;
3153 }
3154 }
3155 }
3156 }
3157 panic!("No TrendAggregate found");
3158 }
3159
3160 #[test]
3161 fn test_parse_score_basic() {
3162 let result = parse(
3163 r#"stream S = TradeEvent
3164 .score(model: "models/fraud.onnx", inputs: [amount, risk_score], outputs: [fraud_prob, category])"#,
3165 );
3166 assert!(result.is_ok(), "Failed: {:?}", result.err());
3167 let program = result.unwrap();
3168 for stmt in &program.statements {
3169 if let Stmt::StreamDecl { ops, .. } = &stmt.node {
3170 for op in ops {
3171 if let StreamOp::Score(spec) = op {
3172 assert_eq!(spec.model_path, "models/fraud.onnx");
3173 assert_eq!(spec.inputs, vec!["amount", "risk_score"]);
3174 assert_eq!(spec.outputs, vec!["fraud_prob", "category"]);
3175 return;
3176 }
3177 }
3178 }
3179 }
3180 panic!("No Score op found");
3181 }
3182
3183 #[test]
3184 fn test_parse_score_single_field() {
3185 let result = parse(
3186 r#"stream S = Event
3187 .score(model: "model.onnx", inputs: [value], outputs: [prediction])"#,
3188 );
3189 assert!(result.is_ok(), "Failed: {:?}", result.err());
3190 let program = result.unwrap();
3191 for stmt in &program.statements {
3192 if let Stmt::StreamDecl { ops, .. } = &stmt.node {
3193 for op in ops {
3194 if let StreamOp::Score(spec) = op {
3195 assert_eq!(spec.model_path, "model.onnx");
3196 assert_eq!(spec.inputs, vec!["value"]);
3197 assert_eq!(spec.outputs, vec!["prediction"]);
3198 return;
3199 }
3200 }
3201 }
3202 }
3203 panic!("No Score op found");
3204 }
3205
3206 #[test]
3211 fn fuzz_regression_unmatched_brackets_timeout() {
3212 let input = "c2222222s[s[22s[U2s[U6[U6[22222222s[s[22s[U2s[U6[U6[222*2222s[U6[U6[222*2222s[22s[U6[U6[22*2222s[U6[U6[222*2222s[22s[U6[U6[222*26[U6[222*2";
3214 let start = std::time::Instant::now();
3215 let result = parse(input);
3216 let elapsed = start.elapsed();
3217 assert!(
3218 result.is_err(),
3219 "Should reject deeply nested unmatched brackets"
3220 );
3221 assert!(
3222 elapsed.as_millis() < 100,
3223 "Parser should reject fast, took {elapsed:?}"
3224 );
3225 }
3226
3227 #[test]
3228 fn fuzz_regression_deeply_nested_brackets_slow_unit() {
3229 let input = "stream x[[[[[[[[[[[[[[[[[[[[[[[[[[[[[[[";
3231 let start = std::time::Instant::now();
3232 let result = parse(input);
3233 let elapsed = start.elapsed();
3234 assert!(result.is_err(), "Should reject deeply nested brackets");
3235 assert!(
3236 elapsed.as_millis() < 100,
3237 "Parser should reject fast, took {elapsed:?}"
3238 );
3239 }
3240
3241 #[test]
3242 fn nesting_depth_allows_reasonable_programs() {
3243 let input = "let x = foo(bar(baz(qux(a, [1, [2, [3, [4]]]]))))";
3245 let result = parse(input);
3246 if let Err(ref e) = result {
3249 let msg = format!("{e}");
3250 assert!(
3251 !msg.contains("Nesting depth"),
3252 "Should allow 10 levels of nesting: {msg}"
3253 );
3254 }
3255 }
3256
3257 #[test]
3258 fn nesting_depth_ignores_brackets_in_comments() {
3259 let input = "# [[[[[[[[[[[[[[[[[[[[[[[[[[\nstream x = y";
3261 let result = parse(input);
3262 assert!(
3263 result.is_ok(),
3264 "Brackets in comments should be ignored: {:?}",
3265 result.err()
3266 );
3267 }
3268
3269 #[test]
3270 fn nesting_depth_ignores_brackets_in_strings() {
3271 let input = r#"let x = "[[[[[[[[[[[[[[[[[[[[[[[[[[""#;
3273 let result = parse(input);
3274 if let Err(ref e) = result {
3275 let msg = format!("{e}");
3276 assert!(
3277 !msg.contains("Nesting depth"),
3278 "Brackets in strings should be ignored: {msg}"
3279 );
3280 }
3281 }
3282}