1use pest::Parser;
9use pest_derive::Parser;
10use varpulis_core::ast::*;
11use varpulis_core::span::{Span, Spanned};
12use varpulis_core::types::Type;
13
14use crate::error::{ParseError, ParseResult};
15use crate::helpers::{parse_duration, parse_timestamp};
16use crate::indent::preprocess_indentation;
17
/// Extension trait for pair iterators that turns "no more items" into a
/// parse error instead of an `Option`.
trait IteratorExt<'a> {
    /// Returns the next pair, or an error mentioning `expected` when the
    /// iterator has no further items.
    fn expect_next(&mut self, expected: &str) -> ParseResult<pest::iterators::Pair<'a, Rule>>;
}
23
24impl<'a> IteratorExt<'a> for pest::iterators::Pairs<'a, Rule> {
25 fn expect_next(&mut self, expected: &str) -> ParseResult<pest::iterators::Pair<'a, Rule>> {
26 self.next().ok_or_else(|| ParseError::Located {
27 line: 0,
28 column: 0,
29 position: 0,
30 message: format!("Expected {expected}"),
31 hint: None,
32 })
33 }
34}
35
/// Parser type whose implementation is derived by `pest_derive` from the
/// `varpulis.pest` grammar file.
#[derive(Debug, Parser)]
#[grammar = "varpulis.pest"]
pub struct VarpulisParser;
43
/// Maximum time `parse` waits for the parser thread before it gives up and
/// reports a timeout error.
const PARSE_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);
52
53pub fn parse(source: &str) -> ParseResult<Program> {
59 let source = source.to_string();
60 let handle = std::thread::Builder::new()
61 .stack_size(16 * 1024 * 1024)
62 .spawn(move || parse_inner(&source))
63 .map_err(|e| ParseError::InvalidToken {
64 position: 0,
65 message: format!("Failed to spawn parser thread: {e}"),
66 })?;
67
68 let deadline = std::time::Instant::now() + PARSE_TIMEOUT;
70 loop {
71 if handle.is_finished() {
72 return handle.join().unwrap_or_else(|_| {
73 Err(ParseError::InvalidToken {
74 position: 0,
75 message: "Parser stack overflow on deeply nested input".to_string(),
76 })
77 });
78 }
79 let remaining = deadline.saturating_duration_since(std::time::Instant::now());
80 if remaining.is_zero() {
81 return Err(ParseError::InvalidToken {
82 position: 0,
83 message: format!(
84 "Parser timed out after {}s on pathological input",
85 PARSE_TIMEOUT.as_secs()
86 ),
87 });
88 }
89 std::thread::sleep(std::time::Duration::from_millis(5).min(remaining));
90 }
91}
92
/// Deepest bracket nesting (`(`, `[`, `{` combined) that
/// `check_nesting_depth` accepts before rejecting the input.
const MAX_NESTING_DEPTH: usize = 10;
102
/// Largest number of opening brackets without a matching closer that
/// `check_nesting_depth` tolerates across the whole input.
const MAX_UNMATCHED_OPEN_BRACKETS: usize = 6;
111
112fn check_nesting_depth(source: &str) -> ParseResult<()> {
117 let mut depth: usize = 0;
118 let mut max_depth: usize = 0;
119 let mut max_depth_pos: usize = 0;
120 let bytes = source.as_bytes();
121 let len = bytes.len();
122 let mut i = 0;
123
124 let mut paren_open: usize = 0;
126 let mut paren_close: usize = 0;
127 let mut square_open: usize = 0;
128 let mut square_close: usize = 0;
129 let mut curly_open: usize = 0;
130 let mut curly_close: usize = 0;
131
132 while i < len {
133 let b = bytes[i];
134
135 if b == b'"' {
137 i += 1;
138 while i < len {
139 if bytes[i] == b'\\' {
140 i += 2; continue;
142 }
143 if bytes[i] == b'"' {
144 i += 1;
145 break;
146 }
147 i += 1;
148 }
149 continue;
150 }
151
152 if b == b'#' {
154 i += 1;
155 while i < len && bytes[i] != b'\n' {
156 i += 1;
157 }
158 continue;
159 }
160
161 if b == b'/' && i + 1 < len && bytes[i + 1] == b'*' {
163 i += 2;
164 while i + 1 < len {
165 if bytes[i] == b'*' && bytes[i + 1] == b'/' {
166 i += 2;
167 break;
168 }
169 i += 1;
170 }
171 continue;
172 }
173
174 match b {
176 b'(' => {
177 depth += 1;
178 paren_open += 1;
179 }
180 b'[' => {
181 depth += 1;
182 square_open += 1;
183 }
184 b'{' => {
185 depth += 1;
186 curly_open += 1;
187 }
188 b')' => {
189 depth = depth.saturating_sub(1);
190 paren_close += 1;
191 }
192 b']' => {
193 depth = depth.saturating_sub(1);
194 square_close += 1;
195 }
196 b'}' => {
197 depth = depth.saturating_sub(1);
198 curly_close += 1;
199 }
200 _ => {}
201 }
202
203 if depth > max_depth {
204 max_depth = depth;
205 max_depth_pos = i;
206 }
207
208 if max_depth > MAX_NESTING_DEPTH {
209 return Err(ParseError::InvalidToken {
210 position: max_depth_pos,
211 message: format!("Nesting depth exceeds maximum of {MAX_NESTING_DEPTH} levels"),
212 });
213 }
214
215 i += 1;
216 }
217
218 let unmatched = paren_open.saturating_sub(paren_close)
222 + square_open.saturating_sub(square_close)
223 + curly_open.saturating_sub(curly_close);
224 if unmatched > MAX_UNMATCHED_OPEN_BRACKETS {
225 return Err(ParseError::InvalidToken {
226 position: max_depth_pos,
227 message: format!(
228 "Too many unmatched open brackets ({unmatched}); maximum is {MAX_UNMATCHED_OPEN_BRACKETS}"
229 ),
230 });
231 }
232
233 Ok(())
234}
235
236fn parse_inner(source: &str) -> ParseResult<Program> {
237 let expanded =
239 crate::expand::expand_declaration_loops(source).map_err(|e| ParseError::InvalidToken {
240 position: 0,
241 message: e,
242 })?;
243 let preprocessed = preprocess_indentation(&expanded);
245
246 check_nesting_depth(&preprocessed)?;
248
249 let pairs = VarpulisParser::parse(Rule::program, &preprocessed).map_err(convert_pest_error)?;
250
251 let mut statements = Vec::new();
252
253 for pair in pairs {
254 if pair.as_rule() == Rule::program {
255 for inner in pair.into_inner() {
256 if inner.as_rule() == Rule::statement {
257 statements.push(parse_statement(inner)?);
258 }
259 }
260 }
261 }
262
263 Ok(crate::optimize::fold_program(Program { statements }))
264}
265
266fn convert_pest_error(e: pest::error::Error<Rule>) -> ParseError {
267 let position = match e.location {
268 pest::error::InputLocation::Pos(p) => p,
269 pest::error::InputLocation::Span((s, _)) => s,
270 };
271
272 let (line, column) = match e.line_col {
274 pest::error::LineColLocation::Pos((l, c)) => (l, c),
275 pest::error::LineColLocation::Span((l, c), _) => (l, c),
276 };
277
278 let (message, hint) = match &e.variant {
280 pest::error::ErrorVariant::ParsingError {
281 positives,
282 negatives: _,
283 } => {
284 if positives.is_empty() {
285 ("Unexpected token".to_string(), None)
286 } else if is_stream_op_error(positives) {
287 (
289 "unknown stream operation".to_string(),
290 Some(
291 "valid operations: .where(), .select(), .emit(), .window(), .aggregate(), \
292 .partition_by(), .within(), .having(), .to(), .context(), .log(), .print(), \
293 .enrich(), .forecast(), .trend_aggregate(), .watermark(), .tap()"
294 .to_string(),
295 ),
296 )
297 } else {
298 let expected: Vec<String> = positives.iter().map(format_rule_name).collect();
299 if expected.len() == 1 {
300 (format!("Expected {}", expected[0]), None)
301 } else {
302 (format!("Expected one of: {}", expected.join(", ")), None)
303 }
304 }
305 }
306 pest::error::ErrorVariant::CustomError { message } => (message.clone(), None),
307 };
308
309 ParseError::Located {
310 line,
311 column,
312 position,
313 message,
314 hint,
315 }
316}
317
318fn format_rule_name(rule: &Rule) -> String {
320 match rule {
321 Rule::identifier => "identifier".to_string(),
322 Rule::integer => "number".to_string(),
323 Rule::float => "number".to_string(),
324 Rule::string => "string".to_string(),
325 Rule::primitive_type => "type (int, float, bool, str, timestamp, duration)".to_string(),
326 Rule::type_expr => "type".to_string(),
327 Rule::expr => "expression".to_string(),
328 Rule::statement => "statement".to_string(),
329 Rule::context_decl => "context declaration".to_string(),
330 Rule::stream_decl => "stream declaration".to_string(),
331 Rule::pattern_decl => "pattern declaration".to_string(),
332 Rule::event_decl => "event declaration".to_string(),
333 Rule::fn_decl => "function declaration".to_string(),
334 Rule::INDENT => "indented block".to_string(),
335 Rule::DEDENT => "end of block".to_string(),
336 Rule::field => "field declaration (name: type)".to_string(),
337 Rule::comparison_op => "comparison operator (==, !=, <, >, <=, >=)".to_string(),
338 Rule::additive_op => "operator (+, -)".to_string(),
339 Rule::multiplicative_op => "operator (*, /, %)".to_string(),
340 Rule::postfix_suffix => "method call or member access".to_string(),
341 Rule::sase_pattern_expr => "SASE pattern expression".to_string(),
342 Rule::sase_seq_expr => "SEQ expression".to_string(),
343 Rule::kleene_op => "Kleene operator (+, *, ?)".to_string(),
344 _ => format!("{rule:?}").to_lowercase().replace('_', " "),
345 }
346}
347
348fn is_stream_op_error(positives: &[Rule]) -> bool {
352 const STREAM_OP_RULES: &[Rule] = &[
353 Rule::context_op,
354 Rule::where_op,
355 Rule::select_op,
356 Rule::window_op,
357 Rule::aggregate_op,
358 Rule::having_op,
359 Rule::partition_by_op,
360 Rule::order_by_op,
361 Rule::limit_op,
362 Rule::distinct_op,
363 Rule::map_op,
364 Rule::filter_op,
365 Rule::tap_op,
366 Rule::print_op,
367 Rule::log_op,
368 Rule::emit_op,
369 Rule::to_op,
370 Rule::pattern_op,
371 Rule::concurrent_op,
372 Rule::process_op,
373 Rule::on_error_op,
374 Rule::collect_op,
375 Rule::on_op,
376 Rule::within_op,
377 Rule::not_op,
378 Rule::fork_op,
379 Rule::any_op,
380 Rule::all_op,
381 Rule::first_op,
382 Rule::watermark_op,
383 Rule::allowed_lateness_op,
384 Rule::trend_aggregate_op,
385 Rule::score_op,
386 Rule::forecast_op,
387 Rule::enrich_op,
388 ];
389 positives.len() >= 10 && positives.iter().all(|r| STREAM_OP_RULES.contains(r))
390}
391
392fn parse_statement(pair: pest::iterators::Pair<Rule>) -> ParseResult<Spanned<Stmt>> {
393 let span = Span::new(pair.as_span().start(), pair.as_span().end());
394 let inner = pair.into_inner().expect_next("statement body")?;
395
396 let stmt = match inner.as_rule() {
397 Rule::context_decl => parse_context_decl(inner)?,
398 Rule::connector_decl => parse_connector_decl(inner)?,
399 Rule::stream_decl => parse_stream_decl(inner)?,
400 Rule::pattern_decl => parse_pattern_decl(inner)?,
401 Rule::event_decl => parse_event_decl(inner)?,
402 Rule::type_decl => parse_type_decl(inner)?,
403 Rule::var_decl => parse_var_decl(inner)?,
404 Rule::const_decl => parse_const_decl(inner)?,
405 Rule::fn_decl => parse_fn_decl(inner)?,
406 Rule::config_block => parse_config_block(inner)?,
407 Rule::import_stmt => parse_import_stmt(inner)?,
408 Rule::if_stmt => parse_if_stmt(inner)?,
409 Rule::for_stmt => parse_for_stmt(inner)?,
410 Rule::while_stmt => parse_while_stmt(inner)?,
411 Rule::return_stmt => parse_return_stmt(inner)?,
412 Rule::break_stmt => Stmt::Break,
413 Rule::continue_stmt => Stmt::Continue,
414 Rule::emit_stmt => parse_emit_stmt(inner)?,
415 Rule::assignment_stmt => {
416 let mut inner = inner.into_inner();
417 let name = inner.expect_next("variable name")?.as_str().to_string();
418 let value = parse_expr(inner.expect_next("assignment value")?)?;
419 Stmt::Assignment { name, value }
420 }
421 Rule::expr_stmt => Stmt::Expr(parse_expr(inner.into_inner().expect_next("expression")?)?),
422 _ => {
423 return Err(ParseError::UnexpectedToken {
424 position: span.start,
425 expected: "statement".to_string(),
426 found: format!("{:?}", inner.as_rule()),
427 })
428 }
429 };
430
431 Ok(Spanned::new(stmt, span))
432}
433
434fn parse_context_decl(pair: pest::iterators::Pair<Rule>) -> ParseResult<Stmt> {
439 let mut inner = pair.into_inner();
440 let name = inner.expect_next("context name")?.as_str().to_string();
441 let mut cores = None;
442
443 for p in inner {
444 if p.as_rule() == Rule::context_params {
445 for param in p.into_inner() {
446 if param.as_rule() == Rule::context_param {
447 let core_ids: Vec<usize> = param
449 .into_inner()
450 .filter(|p| p.as_rule() == Rule::integer)
451 .map(|p| p.as_str().parse::<usize>().unwrap_or(0))
452 .collect();
453 cores = Some(core_ids);
454 }
455 }
456 }
457 }
458
459 Ok(Stmt::ContextDecl { name, cores })
460}
461
462fn parse_connector_decl(pair: pest::iterators::Pair<Rule>) -> ParseResult<Stmt> {
467 let mut inner = pair.into_inner();
468 let name = inner.expect_next("connector name")?.as_str().to_string();
469 let connector_type = inner.expect_next("connector type")?.as_str().to_string();
470 let mut params = Vec::new();
471
472 for p in inner {
473 if p.as_rule() == Rule::connector_params {
474 params = parse_connector_params(p)?;
475 }
476 }
477
478 Ok(Stmt::ConnectorDecl {
479 name,
480 connector_type,
481 params,
482 })
483}
484
485fn parse_connector_params(pair: pest::iterators::Pair<Rule>) -> ParseResult<Vec<ConnectorParam>> {
486 let mut params = Vec::new();
487 for p in pair.into_inner() {
488 if p.as_rule() == Rule::connector_param {
489 let mut inner = p.into_inner();
490 let name = inner.expect_next("param name")?.as_str().to_string();
491 let value_pair = inner.expect_next("param value")?;
492 let value = parse_config_value(value_pair)?;
493 params.push(ConnectorParam { name, value });
494 }
495 }
496 Ok(params)
497}
498
499fn parse_stream_decl(pair: pest::iterators::Pair<Rule>) -> ParseResult<Stmt> {
500 let mut inner = pair.into_inner();
501 let name = inner.expect_next("stream name")?.as_str().to_string();
502
503 let mut type_annotation = None;
504 let mut source = StreamSource::Ident(String::new());
505 let mut ops = Vec::new();
506 let mut op_spans = Vec::new();
507
508 for p in inner {
509 match p.as_rule() {
510 Rule::type_annotation => {
511 type_annotation = Some(parse_type(p.into_inner().expect_next("type")?)?);
512 }
513 Rule::stream_expr => {
514 let (s, o, spans) = parse_stream_expr(p)?;
515 source = s;
516 ops = o;
517 op_spans = spans;
518 }
519 _ => {}
520 }
521 }
522
523 Ok(Stmt::StreamDecl {
524 name,
525 type_annotation,
526 source,
527 ops,
528 op_spans,
529 })
530}
531
532fn parse_stream_expr(
533 pair: pest::iterators::Pair<Rule>,
534) -> ParseResult<(StreamSource, Vec<StreamOp>, Vec<varpulis_core::span::Span>)> {
535 let mut inner = pair.into_inner();
536 let source = parse_stream_source(inner.expect_next("stream source")?)?;
537 let mut ops = Vec::new();
538 let mut op_spans = Vec::new();
539
540 for p in inner {
541 if p.as_rule() == Rule::stream_op {
542 let pest_span = p.as_span();
543 let span = varpulis_core::span::Span::new(pest_span.start(), pest_span.end());
544 ops.push(parse_stream_op(p)?);
545 op_spans.push(span);
546 }
547 }
548
549 Ok((source, ops, op_spans))
550}
551
552fn parse_pattern_decl(pair: pest::iterators::Pair<Rule>) -> ParseResult<Stmt> {
557 let mut inner = pair.into_inner();
558 let name = inner.expect_next("pattern name")?.as_str().to_string();
559
560 let mut expr = SasePatternExpr::Event(String::new());
561 let mut within = None;
562 let mut partition_by = None;
563
564 for p in inner {
565 match p.as_rule() {
566 Rule::sase_pattern_expr => {
567 expr = parse_sase_pattern_expr(p)?;
568 }
569 Rule::pattern_within_clause => {
570 let dur_pair = p.into_inner().expect_next("within duration")?;
571 within = Some(Expr::Duration(
572 parse_duration(dur_pair.as_str()).map_err(ParseError::InvalidDuration)?,
573 ));
574 }
575 Rule::pattern_partition_clause => {
576 let key = p
577 .into_inner()
578 .expect_next("partition key")?
579 .as_str()
580 .to_string();
581 partition_by = Some(Expr::Ident(key));
582 }
583 _ => {}
584 }
585 }
586
587 Ok(Stmt::PatternDecl {
588 name,
589 expr,
590 within,
591 partition_by,
592 })
593}
594
595fn parse_sase_pattern_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<SasePatternExpr> {
596 let inner = pair.into_inner().expect_next("SASE pattern expression")?;
597 parse_sase_or_expr(inner)
598}
599
600fn parse_sase_or_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<SasePatternExpr> {
601 let mut inner = pair.into_inner();
602 let mut left = parse_sase_and_expr(inner.expect_next("OR expression operand")?)?;
603
604 for right_pair in inner {
605 let right = parse_sase_and_expr(right_pair)?;
606 left = SasePatternExpr::Or(Box::new(left), Box::new(right));
607 }
608
609 Ok(left)
610}
611
612fn parse_sase_and_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<SasePatternExpr> {
613 let mut inner = pair.into_inner();
614 let mut left = parse_sase_not_expr(inner.expect_next("AND expression operand")?)?;
615
616 for right_pair in inner {
617 let right = parse_sase_not_expr(right_pair)?;
618 left = SasePatternExpr::And(Box::new(left), Box::new(right));
619 }
620
621 Ok(left)
622}
623
624fn parse_sase_not_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<SasePatternExpr> {
625 let mut inner = pair.into_inner();
626 let first = inner.expect_next("NOT or primary expression")?;
627
628 if first.as_str() == "NOT" {
629 let expr = parse_sase_primary_expr(inner.expect_next("expression after NOT")?)?;
630 Ok(SasePatternExpr::Not(Box::new(expr)))
631 } else {
632 parse_sase_primary_expr(first)
633 }
634}
635
636fn parse_sase_primary_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<SasePatternExpr> {
637 let inner = pair.into_inner().expect_next("SASE primary expression")?;
638
639 match inner.as_rule() {
640 Rule::sase_seq_expr => parse_sase_seq_expr(inner),
641 Rule::sase_grouped_expr => {
642 let nested = inner.into_inner().expect_next("grouped expression")?;
643 let expr = parse_sase_pattern_expr(nested)?;
644 Ok(SasePatternExpr::Group(Box::new(expr)))
645 }
646 Rule::sase_event_ref => parse_sase_event_ref(inner),
647 _ => Ok(SasePatternExpr::Event(inner.as_str().to_string())),
648 }
649}
650
651fn parse_sase_seq_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<SasePatternExpr> {
652 let mut items = Vec::new();
653
654 for p in pair.into_inner() {
655 if p.as_rule() == Rule::sase_seq_items {
656 for item in p.into_inner() {
657 if item.as_rule() == Rule::sase_seq_item {
658 items.push(parse_sase_seq_item(item)?);
659 }
660 }
661 }
662 }
663
664 Ok(SasePatternExpr::Seq(items))
665}
666
667fn parse_sase_seq_item(pair: pest::iterators::Pair<Rule>) -> ParseResult<SasePatternItem> {
668 let inner = pair.into_inner().expect_next("sequence item")?;
669
670 match inner.as_rule() {
671 Rule::sase_negated_item => parse_sase_item_inner(inner, true),
672 Rule::sase_positive_item => parse_sase_item_inner(inner, false),
673 _ => parse_sase_item_inner(inner, false),
674 }
675}
676
677fn parse_sase_item_inner(
678 pair: pest::iterators::Pair<Rule>,
679 _negated: bool,
680) -> ParseResult<SasePatternItem> {
681 let mut inner = pair.into_inner();
682 let event_type = inner.expect_next("event type")?.as_str().to_string();
683
684 let mut kleene = None;
685 let mut filter = None;
686 let mut alias = None;
687
688 for p in inner {
689 match p.as_rule() {
690 Rule::kleene_op => {
691 kleene = Some(match p.as_str() {
692 "+" => KleeneOp::Plus,
693 "*" => KleeneOp::Star,
694 "?" => KleeneOp::Optional,
695 _ => KleeneOp::Plus,
696 });
697 }
698 Rule::sase_where_clause => {
699 filter = Some(parse_expr(
700 p.into_inner().expect_next("filter expression")?,
701 )?);
702 }
703 Rule::sase_alias_clause => {
704 alias = Some(p.into_inner().expect_next("alias")?.as_str().to_string());
705 }
706 _ => {}
707 }
708 }
709
710 let event_type = if _negated {
713 format!("!{event_type}")
714 } else {
715 event_type
716 };
717
718 Ok(SasePatternItem {
719 event_type,
720 alias,
721 kleene,
722 filter,
723 })
724}
725
726fn parse_sase_event_ref(pair: pest::iterators::Pair<Rule>) -> ParseResult<SasePatternExpr> {
727 let item = parse_sase_item_inner(pair, false)?;
729
730 if item.alias.is_none() && item.kleene.is_none() && item.filter.is_none() {
732 Ok(SasePatternExpr::Event(item.event_type))
733 } else {
734 Ok(SasePatternExpr::Seq(vec![item]))
736 }
737}
738
739fn parse_stream_source(pair: pest::iterators::Pair<Rule>) -> ParseResult<StreamSource> {
740 let inner = pair.into_inner().expect_next("stream source type")?;
741
742 match inner.as_rule() {
743 Rule::from_connector_source => {
744 let mut inner_iter = inner.into_inner();
745 let event_type = inner_iter.expect_next("event type")?.as_str().to_string();
746 let connector_name = inner_iter
747 .expect_next("connector name")?
748 .as_str()
749 .to_string();
750 let mut params = Vec::new();
751 for p in inner_iter {
752 if p.as_rule() == Rule::connector_params {
753 params = parse_connector_params(p)?;
754 }
755 }
756 Ok(StreamSource::FromConnector {
757 event_type,
758 connector_name,
759 params,
760 })
761 }
762 Rule::merge_source => {
763 let mut streams = Vec::new();
764 for p in inner.into_inner() {
765 if p.as_rule() == Rule::inline_stream_list {
766 for is in p.into_inner() {
767 streams.push(parse_inline_stream(is)?);
768 }
769 }
770 }
771 Ok(StreamSource::Merge(streams))
772 }
773 Rule::join_source
774 | Rule::left_join_source
775 | Rule::right_join_source
776 | Rule::full_join_source => {
777 let join_type = match inner.as_rule() {
778 Rule::left_join_source => varpulis_core::ast::JoinType::Left,
779 Rule::right_join_source => varpulis_core::ast::JoinType::Right,
780 Rule::full_join_source => varpulis_core::ast::JoinType::Full,
781 _ => varpulis_core::ast::JoinType::Inner,
782 };
783 let mut clauses = Vec::new();
784 for p in inner.into_inner() {
785 if p.as_rule() == Rule::join_clause_list {
786 for jc in p.into_inner() {
787 clauses.push(parse_join_clause(jc, join_type)?);
788 }
789 }
790 }
791 Ok(StreamSource::Join(clauses))
792 }
793 Rule::sequence_source => {
794 let decl =
795 parse_sequence_decl(inner.into_inner().expect_next("sequence declaration")?)?;
796 Ok(StreamSource::Sequence(decl))
797 }
798 Rule::timer_source => {
799 let timer_args = inner.into_inner().expect_next("timer arguments")?;
800 let decl = parse_timer_decl(timer_args)?;
801 Ok(StreamSource::Timer(decl))
802 }
803 Rule::all_source => {
804 let mut inner_iter = inner.into_inner();
805 let name = inner_iter.expect_next("event name")?.as_str().to_string();
806 let alias = inner_iter.next().map(|p| p.as_str().to_string());
807 Ok(StreamSource::AllWithAlias { name, alias })
808 }
809 Rule::aliased_source => {
810 let mut inner_iter = inner.into_inner();
811 let name = inner_iter.expect_next("event name")?.as_str().to_string();
812 let alias = inner_iter.expect_next("alias")?.as_str().to_string();
813 Ok(StreamSource::IdentWithAlias { name, alias })
814 }
815 Rule::identifier => Ok(StreamSource::Ident(inner.as_str().to_string())),
816 _ => Err(ParseError::UnexpectedToken {
817 position: 0,
818 expected: "stream source".to_string(),
819 found: format!("{:?}", inner.as_rule()),
820 }),
821 }
822}
823
824fn parse_inline_stream(pair: pest::iterators::Pair<Rule>) -> ParseResult<InlineStreamDecl> {
825 let mut inner = pair.into_inner();
826
827 let first = inner.expect_next("stream identifier")?;
829 if first.as_rule() == Rule::identifier && inner.clone().next().is_none() {
830 let name = first.as_str().to_string();
831 return Ok(InlineStreamDecl {
832 name: name.clone(),
833 source: name,
834 filter: None,
835 });
836 }
837
838 let name = first.as_str().to_string();
839 let source = inner.expect_next("stream source")?.as_str().to_string();
840 let filter = inner.next().map(|p| parse_expr(p)).transpose()?;
841
842 Ok(InlineStreamDecl {
843 name,
844 source,
845 filter,
846 })
847}
848
849fn parse_join_clause(
850 pair: pest::iterators::Pair<Rule>,
851 join_type: varpulis_core::ast::JoinType,
852) -> ParseResult<JoinClause> {
853 let mut inner = pair.into_inner();
854
855 let first = inner.expect_next("join clause identifier")?;
856 if first.as_rule() == Rule::identifier && inner.clone().next().is_none() {
857 let name = first.as_str().to_string();
858 return Ok(JoinClause {
859 name: name.clone(),
860 source: name,
861 on: None,
862 join_type,
863 });
864 }
865
866 let name = first.as_str().to_string();
867 let source = inner.expect_next("join source")?.as_str().to_string();
868 let on = inner.next().map(|p| parse_expr(p)).transpose()?;
869
870 Ok(JoinClause {
871 name,
872 source,
873 on,
874 join_type,
875 })
876}
877
878fn parse_sequence_decl(pair: pest::iterators::Pair<Rule>) -> ParseResult<SequenceDecl> {
879 let mut steps = Vec::new();
880
881 for p in pair.into_inner() {
882 if p.as_rule() == Rule::sequence_step {
883 steps.push(parse_sequence_step(p)?);
884 }
885 }
886
887 Ok(SequenceDecl {
888 match_all: false,
889 timeout: None,
890 steps,
891 })
892}
893
894fn parse_sequence_step(pair: pest::iterators::Pair<Rule>) -> ParseResult<SequenceStepDecl> {
895 let mut inner = pair.into_inner();
896 let alias = inner.expect_next("step alias")?.as_str().to_string();
897 let event_type = inner.expect_next("event type")?.as_str().to_string();
898
899 let mut filter = None;
900 let mut timeout = None;
901
902 for p in inner {
903 match p.as_rule() {
904 Rule::or_expr => filter = Some(parse_expr(p)?),
905 Rule::within_suffix => {
906 let expr = p.into_inner().expect_next("within duration")?;
907 timeout = Some(Box::new(parse_expr(expr)?));
908 }
909 _ => {}
910 }
911 }
912
913 Ok(SequenceStepDecl {
914 alias,
915 event_type,
916 filter,
917 timeout,
918 })
919}
920
921fn parse_timer_decl(pair: pest::iterators::Pair<Rule>) -> ParseResult<TimerDecl> {
922 let mut inner = pair.into_inner();
923
924 let interval = parse_expr(inner.expect_next("timer interval")?)?;
926
927 let mut initial_delay = None;
929 for p in inner {
930 if p.as_rule() == Rule::named_arg {
931 let arg = parse_named_arg(p)?;
932 if arg.name == "initial_delay" {
933 initial_delay = Some(Box::new(arg.value));
934 }
935 }
936 }
937
938 Ok(TimerDecl {
939 interval,
940 initial_delay,
941 })
942}
943
944fn parse_stream_op(pair: pest::iterators::Pair<Rule>) -> ParseResult<StreamOp> {
945 let inner = pair.into_inner().expect_next("stream operation")?;
946
947 match inner.as_rule() {
948 Rule::dot_op => {
949 let op_inner = inner.into_inner().expect_next("dot operation")?;
950 parse_dot_op(op_inner)
951 }
952 Rule::followed_by_op => parse_followed_by_op(inner),
953 _ => Err(ParseError::UnexpectedToken {
954 position: 0,
955 expected: "stream operation".to_string(),
956 found: format!("{:?}", inner.as_rule()),
957 }),
958 }
959}
960
961fn parse_dot_op(pair: pest::iterators::Pair<Rule>) -> ParseResult<StreamOp> {
962 match pair.as_rule() {
963 Rule::context_op => {
964 let name = pair
965 .into_inner()
966 .expect_next("context name")?
967 .as_str()
968 .to_string();
969 Ok(StreamOp::Context(name))
970 }
971 Rule::where_op => {
972 let expr = parse_expr(pair.into_inner().expect_next("where expression")?)?;
973 Ok(StreamOp::Where(expr))
974 }
975 Rule::select_op => {
976 let mut items = Vec::new();
977 for p in pair.into_inner() {
978 if p.as_rule() == Rule::select_list {
979 for si in p.into_inner() {
980 items.push(parse_select_item(si)?);
981 }
982 }
983 }
984 Ok(StreamOp::Select(items))
985 }
986 Rule::window_op => {
987 let args = parse_window_args(pair.into_inner().expect_next("window arguments")?)?;
988 Ok(StreamOp::Window(args))
989 }
990 Rule::aggregate_op => {
991 let mut items = Vec::new();
992 for p in pair.into_inner() {
993 if p.as_rule() == Rule::agg_list {
994 for ai in p.into_inner() {
995 items.push(parse_agg_item(ai)?);
996 }
997 }
998 }
999 Ok(StreamOp::Aggregate(items))
1000 }
1001 Rule::having_op => {
1002 let expr = parse_expr(pair.into_inner().expect_next("having expression")?)?;
1003 Ok(StreamOp::Having(expr))
1004 }
1005 Rule::map_op => {
1006 let expr = parse_expr(pair.into_inner().expect_next("map expression")?)?;
1007 Ok(StreamOp::Map(expr))
1008 }
1009 Rule::filter_op => {
1010 let expr = parse_expr(pair.into_inner().expect_next("filter expression")?)?;
1011 Ok(StreamOp::Filter(expr))
1012 }
1013 Rule::within_op => {
1014 let expr = parse_expr(pair.into_inner().expect_next("within duration")?)?;
1015 Ok(StreamOp::Within(expr))
1016 }
1017 Rule::emit_op => {
1018 let mut output_type = None;
1019 let mut fields = Vec::new();
1020 let mut target_context = None;
1021 for p in pair.into_inner() {
1022 match p.as_rule() {
1023 Rule::emit_type_cast => {
1024 output_type = Some(
1025 p.into_inner()
1026 .expect_next("type name")?
1027 .as_str()
1028 .to_string(),
1029 );
1030 }
1031 Rule::named_arg_list => {
1032 for arg in p.into_inner() {
1033 let parsed = parse_named_arg(arg)?;
1034 if parsed.name == "context" {
1036 if let Expr::Ident(ctx_name) = &parsed.value {
1037 target_context = Some(ctx_name.clone());
1038 continue;
1039 }
1040 }
1041 fields.push(parsed);
1042 }
1043 }
1044 _ => {}
1045 }
1046 }
1047 Ok(StreamOp::Emit {
1048 output_type,
1049 fields,
1050 target_context,
1051 })
1052 }
1053 Rule::print_op => {
1054 let exprs = pair
1055 .into_inner()
1056 .filter(|p| p.as_rule() == Rule::expr_list)
1057 .flat_map(|p| p.into_inner())
1058 .map(parse_expr)
1059 .collect::<ParseResult<Vec<_>>>()?;
1060 Ok(StreamOp::Print(exprs))
1061 }
1062 Rule::collect_op => Ok(StreamOp::Collect),
1063 Rule::pattern_op => {
1064 let def_pair = pair.into_inner().expect_next("pattern definition")?;
1065 let mut inner = def_pair.into_inner();
1066 let name = inner.expect_next("pattern name")?.as_str().to_string();
1067 let body_pair = inner.expect_next("pattern body")?;
1068
1069 let body_inner = body_pair.into_inner().expect_next("pattern expression")?;
1071 let matcher = match body_inner.as_rule() {
1072 Rule::lambda_expr => parse_lambda_expr(body_inner)?,
1073 Rule::pattern_or_expr => parse_pattern_expr_as_expr(body_inner)?,
1074 _ => parse_expr_inner(body_inner)?,
1075 };
1076 Ok(StreamOp::Pattern(PatternDef { name, matcher }))
1077 }
1078 Rule::partition_by_op => {
1079 let expr = parse_expr(pair.into_inner().expect_next("partition expression")?)?;
1080 Ok(StreamOp::PartitionBy(expr))
1081 }
1082 Rule::order_by_op => {
1083 let mut items = Vec::new();
1084 for p in pair.into_inner() {
1085 if p.as_rule() == Rule::order_list {
1086 for oi in p.into_inner() {
1087 items.push(parse_order_item(oi)?);
1088 }
1089 }
1090 }
1091 Ok(StreamOp::OrderBy(items))
1092 }
1093 Rule::limit_op => {
1094 let expr = parse_expr(pair.into_inner().expect_next("limit expression")?)?;
1095 Ok(StreamOp::Limit(expr))
1096 }
1097 Rule::distinct_op => {
1098 let expr = pair.into_inner().next().map(parse_expr).transpose()?;
1099 Ok(StreamOp::Distinct(expr))
1100 }
1101 Rule::tap_op => {
1102 let args = pair
1103 .into_inner()
1104 .filter(|p| p.as_rule() == Rule::named_arg_list)
1105 .flat_map(|p| p.into_inner())
1106 .map(parse_named_arg)
1107 .collect::<ParseResult<Vec<_>>>()?;
1108 Ok(StreamOp::Tap(args))
1109 }
1110 Rule::log_op => {
1111 let args = pair
1112 .into_inner()
1113 .filter(|p| p.as_rule() == Rule::named_arg_list)
1114 .flat_map(|p| p.into_inner())
1115 .map(parse_named_arg)
1116 .collect::<ParseResult<Vec<_>>>()?;
1117 Ok(StreamOp::Log(args))
1118 }
1119 Rule::to_op => {
1120 let mut inner = pair.into_inner();
1121 let connector_name = inner.expect_next("connector name")?.as_str().to_string();
1122 let mut params = Vec::new();
1123 for p in inner {
1124 if p.as_rule() == Rule::connector_params {
1125 params = parse_connector_params(p)?;
1126 }
1127 }
1128 Ok(StreamOp::To {
1129 connector_name,
1130 params,
1131 })
1132 }
1133 Rule::process_op => {
1134 let expr = parse_expr(pair.into_inner().expect_next("process expression")?)?;
1135 Ok(StreamOp::Process(expr))
1136 }
1137 Rule::on_error_op => {
1138 let expr = parse_expr(pair.into_inner().expect_next("on_error handler")?)?;
1139 Ok(StreamOp::OnError(expr))
1140 }
1141 Rule::on_op => {
1142 let expr = parse_expr(pair.into_inner().expect_next("on handler")?)?;
1143 Ok(StreamOp::On(expr))
1144 }
1145 Rule::not_op => {
1146 let mut inner = pair.into_inner();
1147 let event_type = inner.expect_next("event type")?.as_str().to_string();
1148 let filter = inner.next().map(parse_expr).transpose()?;
1149 Ok(StreamOp::Not(FollowedByClause {
1150 event_type,
1151 filter,
1152 alias: None,
1153 match_all: false,
1154 }))
1155 }
1156 Rule::fork_op => {
1157 let mut paths = Vec::new();
1158 for p in pair.into_inner() {
1159 if p.as_rule() == Rule::fork_path_list {
1160 for fp in p.into_inner() {
1161 paths.push(parse_fork_path(fp)?);
1162 }
1163 }
1164 }
1165 Ok(StreamOp::Fork(paths))
1166 }
1167 Rule::any_op => {
1168 let count = pair
1169 .into_inner()
1170 .next()
1171 .map(|p| p.as_str().parse().unwrap_or(1));
1172 Ok(StreamOp::Any(count))
1173 }
1174 Rule::all_op => Ok(StreamOp::All),
1175 Rule::first_op => Ok(StreamOp::First),
1176 Rule::concurrent_op => {
1177 let args = pair
1178 .into_inner()
1179 .filter(|p| p.as_rule() == Rule::named_arg_list)
1180 .flat_map(|p| p.into_inner())
1181 .map(parse_named_arg)
1182 .collect::<ParseResult<Vec<_>>>()?;
1183 Ok(StreamOp::Concurrent(args))
1184 }
1185 Rule::watermark_op => {
1186 let args = pair
1187 .into_inner()
1188 .filter(|p| p.as_rule() == Rule::named_arg_list)
1189 .flat_map(|p| p.into_inner())
1190 .map(parse_named_arg)
1191 .collect::<ParseResult<Vec<_>>>()?;
1192 Ok(StreamOp::Watermark(args))
1193 }
1194 Rule::allowed_lateness_op => {
1195 let expr = parse_expr(pair.into_inner().expect_next("allowed lateness duration")?)?;
1196 Ok(StreamOp::AllowedLateness(expr))
1197 }
1198 Rule::trend_aggregate_op => {
1199 let mut items = Vec::new();
1200 for p in pair.into_inner() {
1201 if p.as_rule() == Rule::trend_agg_list {
1202 for item_pair in p.into_inner() {
1203 if item_pair.as_rule() == Rule::trend_agg_item {
1204 let mut inner = item_pair.into_inner();
1205 let alias = inner.expect_next("trend agg alias")?.as_str().to_string();
1206 let func_pair = inner.expect_next("trend agg function")?;
1207 let mut func_inner = func_pair.into_inner();
1208 let func_name = func_inner
1209 .expect_next("function name")?
1210 .as_str()
1211 .to_string();
1212 let arg = func_inner.next().map(parse_expr).transpose()?;
1213 items.push(TrendAggItem {
1214 alias,
1215 func: func_name,
1216 arg,
1217 });
1218 }
1219 }
1220 }
1221 }
1222 Ok(StreamOp::TrendAggregate(items))
1223 }
1224 Rule::forecast_op => {
1225 let mut confidence = None;
1226 let mut horizon = None;
1227 let mut warmup = None;
1228 let mut max_depth = None;
1229 let mut hawkes = None;
1230 let mut conformal = None;
1231 let mut mode = None;
1232 for p in pair.into_inner() {
1233 if p.as_rule() == Rule::forecast_params {
1234 for param_pair in p.into_inner() {
1235 if param_pair.as_rule() == Rule::forecast_param {
1236 let mut inner = param_pair.into_inner();
1237 let name = inner.expect_next("forecast param name")?.as_str();
1238 let value_pair = inner.expect_next("forecast param value")?;
1239 let expr = parse_expr(value_pair)?;
1240 match name {
1241 "confidence" => confidence = Some(expr),
1242 "horizon" => horizon = Some(expr),
1243 "warmup" => warmup = Some(expr),
1244 "max_depth" => max_depth = Some(expr),
1245 "hawkes" => hawkes = Some(expr),
1246 "conformal" => conformal = Some(expr),
1247 "mode" => mode = Some(expr),
1248 _ => {}
1249 }
1250 }
1251 }
1252 }
1253 }
1254 Ok(StreamOp::Forecast(ForecastSpec {
1255 confidence,
1256 horizon,
1257 warmup,
1258 max_depth,
1259 hawkes,
1260 conformal,
1261 mode,
1262 }))
1263 }
1264 Rule::enrich_op => {
1265 let mut inner = pair.into_inner();
1266 let connector_name = inner.expect_next("connector name")?.as_str().to_string();
1267 let mut key_expr = None;
1268 let mut fields = Vec::new();
1269 let mut cache_ttl = None;
1270 let mut timeout = None;
1271 let mut fallback = None;
1272 for p in inner {
1273 if p.as_rule() == Rule::enrich_params {
1274 for param_pair in p.into_inner() {
1275 if param_pair.as_rule() == Rule::enrich_param {
1276 let param_inner =
1277 param_pair.into_inner().expect_next("enrich param")?;
1278 match param_inner.as_rule() {
1279 Rule::enrich_key_param => {
1280 let expr_pair =
1281 param_inner.into_inner().expect_next("key expression")?;
1282 key_expr = Some(parse_expr(expr_pair)?);
1283 }
1284 Rule::enrich_fields_param => {
1285 for field in param_inner.into_inner() {
1286 if field.as_rule() == Rule::identifier {
1287 fields.push(field.as_str().to_string());
1288 }
1289 }
1290 }
1291 Rule::enrich_cache_ttl_param => {
1292 let expr_pair = param_inner
1293 .into_inner()
1294 .expect_next("cache_ttl expression")?;
1295 cache_ttl = Some(parse_expr(expr_pair)?);
1296 }
1297 Rule::enrich_timeout_param => {
1298 let expr_pair = param_inner
1299 .into_inner()
1300 .expect_next("timeout expression")?;
1301 timeout = Some(parse_expr(expr_pair)?);
1302 }
1303 Rule::enrich_fallback_param => {
1304 let literal_pair =
1305 param_inner.into_inner().expect_next("fallback literal")?;
1306 fallback = Some(parse_expr(literal_pair)?);
1307 }
1308 _ => {}
1309 }
1310 }
1311 }
1312 }
1313 }
1314 let key = key_expr.ok_or_else(|| ParseError::Located {
1315 line: 0,
1316 column: 0,
1317 position: 0,
1318 message: ".enrich() requires a key: parameter".to_string(),
1319 hint: Some("add key: <expression> to .enrich()".to_string()),
1320 })?;
1321 Ok(StreamOp::Enrich(EnrichSpec {
1322 connector_name,
1323 key_expr: Box::new(key),
1324 fields,
1325 cache_ttl,
1326 timeout,
1327 fallback,
1328 }))
1329 }
1330 Rule::score_op => {
1331 let mut model_path = String::new();
1332 let mut inputs = Vec::new();
1333 let mut outputs = Vec::new();
1334 let mut gpu = false;
1335 let mut batch_size: usize = 1;
1336 let mut device_id: i32 = 0;
1337 for p in pair.into_inner() {
1338 if p.as_rule() == Rule::score_params {
1339 for param_pair in p.into_inner() {
1340 if param_pair.as_rule() == Rule::score_param {
1341 let mut inner = param_pair.into_inner();
1342 let name = inner.expect_next("score param name")?.as_str();
1343 let value_pair = inner.expect_next("score param value")?;
1344 match name {
1345 "model" => {
1346 let raw = value_pair.as_str();
1347 model_path = raw.trim_matches('"').to_string();
1348 }
1349 "inputs" => {
1350 if value_pair.as_rule() == Rule::score_field_list {
1351 for field in value_pair.into_inner() {
1352 if field.as_rule() == Rule::identifier {
1353 inputs.push(field.as_str().to_string());
1354 }
1355 }
1356 }
1357 }
1358 "outputs" => {
1359 if value_pair.as_rule() == Rule::score_field_list {
1360 for field in value_pair.into_inner() {
1361 if field.as_rule() == Rule::identifier {
1362 outputs.push(field.as_str().to_string());
1363 }
1364 }
1365 }
1366 }
1367 "gpu" => {
1368 gpu = value_pair.as_str() == "true";
1369 }
1370 "batch_size" => {
1371 batch_size = value_pair.as_str().parse().unwrap_or(1);
1372 }
1373 "device" | "device_id" => {
1374 device_id = value_pair.as_str().parse().unwrap_or(0);
1375 }
1376 _ => {}
1377 }
1378 }
1379 }
1380 }
1381 }
1382 Ok(StreamOp::Score(ScoreSpec {
1383 model_path,
1384 inputs,
1385 outputs,
1386 gpu,
1387 batch_size,
1388 device_id,
1389 }))
1390 }
1391 _ => Err(ParseError::UnexpectedToken {
1392 position: 0,
1393 expected: "stream operation".to_string(),
1394 found: format!("{:?}", pair.as_rule()),
1395 }),
1396 }
1397}
1398
1399fn parse_order_item(pair: pest::iterators::Pair<Rule>) -> ParseResult<OrderItem> {
1400 let mut inner = pair.into_inner();
1401 let expr = parse_expr(inner.expect_next("order expression")?)?;
1402 let desc = inner.next().is_some_and(|p| p.as_str() == "desc");
1403 Ok(OrderItem {
1404 expr,
1405 descending: desc,
1406 })
1407}
1408
1409fn parse_fork_path(pair: pest::iterators::Pair<Rule>) -> ParseResult<ForkPath> {
1410 let mut inner = pair.into_inner();
1411 let name = inner.expect_next("fork path name")?.as_str().to_string();
1412 let mut ops = Vec::new();
1413 for p in inner {
1414 if p.as_rule() == Rule::stream_op {
1415 ops.push(parse_stream_op(p)?);
1416 }
1417 }
1418 Ok(ForkPath { name, ops })
1419}
1420
1421fn parse_followed_by_op(pair: pest::iterators::Pair<Rule>) -> ParseResult<StreamOp> {
1422 let mut inner = pair.into_inner();
1423 let mut match_all = false;
1424
1425 let first = inner.expect_next("event type or match_all")?;
1426 let event_type = if first.as_rule() == Rule::match_all_keyword {
1427 match_all = true;
1428 inner.expect_next("event type")?.as_str().to_string()
1429 } else {
1430 first.as_str().to_string()
1431 };
1432
1433 let mut filter = None;
1434 let mut alias = None;
1435
1436 for p in inner {
1437 match p.as_rule() {
1438 Rule::or_expr => filter = Some(parse_or_expr(p)?),
1439 Rule::filter_expr => filter = Some(parse_filter_expr(p)?),
1440 Rule::identifier => alias = Some(p.as_str().to_string()),
1441 _ => {}
1442 }
1443 }
1444
1445 Ok(StreamOp::FollowedBy(FollowedByClause {
1446 event_type,
1447 filter,
1448 alias,
1449 match_all,
1450 }))
1451}
1452
1453fn parse_select_item(pair: pest::iterators::Pair<Rule>) -> ParseResult<SelectItem> {
1454 let mut inner = pair.into_inner();
1455 let first = inner.expect_next("select field or alias")?;
1456
1457 if let Some(second) = inner.next() {
1458 Ok(SelectItem::Alias(
1459 first.as_str().to_string(),
1460 parse_expr(second)?,
1461 ))
1462 } else {
1463 Ok(SelectItem::Field(first.as_str().to_string()))
1464 }
1465}
1466
1467fn parse_window_args(pair: pest::iterators::Pair<Rule>) -> ParseResult<WindowArgs> {
1468 let raw = pair.as_str().trim();
1469 let is_session = raw.starts_with("session");
1470
1471 let mut inner = pair.into_inner();
1472
1473 if is_session {
1474 let gap_expr = parse_expr(inner.expect_next("session gap duration")?)?;
1476 return Ok(WindowArgs {
1477 duration: gap_expr.clone(),
1478 sliding: None,
1479 policy: None,
1480 session_gap: Some(gap_expr),
1481 });
1482 }
1483
1484 let duration = parse_expr(inner.expect_next("window duration")?)?;
1485
1486 let mut sliding = None;
1487 let mut policy = None;
1488
1489 for p in inner {
1490 if p.as_rule() == Rule::expr {
1491 if sliding.is_none() {
1493 sliding = Some(parse_expr(p)?);
1494 } else {
1495 policy = Some(parse_expr(p)?);
1496 }
1497 }
1498 }
1499
1500 Ok(WindowArgs {
1501 duration,
1502 sliding,
1503 policy,
1504 session_gap: None,
1505 })
1506}
1507
1508fn parse_agg_item(pair: pest::iterators::Pair<Rule>) -> ParseResult<AggItem> {
1509 let mut inner = pair.into_inner();
1510 let alias = inner.expect_next("aggregate alias")?.as_str().to_string();
1511 let expr = parse_expr(inner.expect_next("aggregate expression")?)?;
1512 Ok(AggItem { alias, expr })
1513}
1514
1515fn parse_named_arg(pair: pest::iterators::Pair<Rule>) -> ParseResult<NamedArg> {
1516 let mut inner = pair.into_inner();
1517 let name = inner.expect_next("argument name")?.as_str().to_string();
1518 let value = parse_expr(inner.expect_next("argument value")?)?;
1519 Ok(NamedArg { name, value })
1520}
1521
1522fn parse_event_decl(pair: pest::iterators::Pair<Rule>) -> ParseResult<Stmt> {
1523 let mut inner = pair.into_inner();
1524 let name = inner.expect_next("event name")?.as_str().to_string();
1525
1526 let mut extends = None;
1527 let mut fields = Vec::new();
1528
1529 for p in inner {
1530 match p.as_rule() {
1531 Rule::identifier => extends = Some(p.as_str().to_string()),
1532 Rule::field => fields.push(parse_field(p)?),
1533 _ => {}
1534 }
1535 }
1536
1537 Ok(Stmt::EventDecl {
1538 name,
1539 extends,
1540 fields,
1541 })
1542}
1543
1544fn parse_field(pair: pest::iterators::Pair<Rule>) -> ParseResult<Field> {
1545 let mut inner = pair.into_inner();
1546 let name = inner.expect_next("field name")?.as_str().to_string();
1547 let ty = parse_type(inner.expect_next("field type")?)?;
1548 let optional = inner.next().is_some();
1549 Ok(Field { name, ty, optional })
1550}
1551
1552fn parse_type_decl(pair: pest::iterators::Pair<Rule>) -> ParseResult<Stmt> {
1553 let mut inner = pair.into_inner();
1554 let name = inner.expect_next("type name")?.as_str().to_string();
1555 let next = inner.expect_next("type body")?;
1556 match next.as_rule() {
1557 Rule::field => {
1558 let mut fields = vec![parse_field(next)?];
1559 for p in inner {
1560 if p.as_rule() == Rule::field {
1561 fields.push(parse_field(p)?);
1562 }
1563 }
1564 Ok(Stmt::TypeDecl {
1565 name,
1566 ty: None,
1567 fields,
1568 })
1569 }
1570 _ => Ok(Stmt::TypeDecl {
1571 name,
1572 ty: Some(parse_type(next)?),
1573 fields: vec![],
1574 }),
1575 }
1576}
1577
1578fn parse_type(pair: pest::iterators::Pair<Rule>) -> ParseResult<Type> {
1579 let cloned = pair.clone();
1580 let inner = pair.into_inner().next().unwrap_or(cloned);
1581
1582 match inner.as_rule() {
1583 Rule::primitive_type => match inner.as_str() {
1584 "int" => Ok(Type::Int),
1585 "float" => Ok(Type::Float),
1586 "bool" => Ok(Type::Bool),
1587 "str" => Ok(Type::Str),
1588 "timestamp" => Ok(Type::Timestamp),
1589 "duration" => Ok(Type::Duration),
1590 _ => Ok(Type::Named(inner.as_str().to_string())),
1591 },
1592 Rule::array_type => {
1593 let inner_type = parse_type(inner.into_inner().expect_next("array element type")?)?;
1594 Ok(Type::Array(Box::new(inner_type)))
1595 }
1596 Rule::map_type => {
1597 let mut inner_pairs = inner.into_inner();
1598 let key_type = parse_type(inner_pairs.expect_next("map key type")?)?;
1599 let val_type = parse_type(inner_pairs.expect_next("map value type")?)?;
1600 Ok(Type::Map(Box::new(key_type), Box::new(val_type)))
1601 }
1602 Rule::tuple_type => {
1603 let types: Vec<Type> = inner
1604 .into_inner()
1605 .map(parse_type)
1606 .collect::<ParseResult<Vec<_>>>()?;
1607 Ok(Type::Tuple(types))
1608 }
1609 Rule::stream_type => {
1610 let inner_type = parse_type(inner.into_inner().expect_next("stream element type")?)?;
1611 Ok(Type::Stream(Box::new(inner_type)))
1612 }
1613 Rule::optional_type => {
1614 let inner_type = parse_type(inner.into_inner().expect_next("optional inner type")?)?;
1615 Ok(Type::Optional(Box::new(inner_type)))
1616 }
1617 Rule::named_type | Rule::identifier => Ok(Type::Named(inner.as_str().to_string())),
1618 _ => Ok(Type::Named(inner.as_str().to_string())),
1619 }
1620}
1621
1622fn parse_var_decl(pair: pest::iterators::Pair<Rule>) -> ParseResult<Stmt> {
1623 let mut inner = pair.into_inner();
1624 let keyword = inner.expect_next("var_keyword")?.as_str();
1625 let mutable = keyword == "var";
1626 let name = inner.expect_next("variable name")?.as_str().to_string();
1627
1628 let mut ty = None;
1629 let mut value = Expr::Null;
1630
1631 for p in inner {
1632 match p.as_rule() {
1633 Rule::type_annotation => ty = Some(parse_type(p.into_inner().expect_next("type")?)?),
1634 _ => value = parse_expr(p)?,
1635 }
1636 }
1637
1638 Ok(Stmt::VarDecl {
1639 mutable,
1640 name,
1641 ty,
1642 value,
1643 })
1644}
1645
1646fn parse_const_decl(pair: pest::iterators::Pair<Rule>) -> ParseResult<Stmt> {
1647 let mut inner = pair.into_inner();
1648 let name = inner.expect_next("constant name")?.as_str().to_string();
1649
1650 let mut ty = None;
1651 let mut value = Expr::Null;
1652
1653 for p in inner {
1654 match p.as_rule() {
1655 Rule::type_annotation => ty = Some(parse_type(p.into_inner().expect_next("type")?)?),
1656 _ => value = parse_expr(p)?,
1657 }
1658 }
1659
1660 Ok(Stmt::ConstDecl { name, ty, value })
1661}
1662
1663fn parse_fn_decl(pair: pest::iterators::Pair<Rule>) -> ParseResult<Stmt> {
1664 let mut inner = pair.into_inner();
1665 let name = inner.expect_next("function name")?.as_str().to_string();
1666
1667 let mut params = Vec::new();
1668 let mut ret = None;
1669 let mut body = Vec::new();
1670
1671 for p in inner {
1672 match p.as_rule() {
1673 Rule::param_list => {
1674 for param in p.into_inner() {
1675 params.push(parse_param(param)?);
1676 }
1677 }
1678 Rule::type_expr => ret = Some(parse_type(p)?),
1679 Rule::block => body = parse_block(p)?,
1680 Rule::statement => body.push(parse_statement(p)?),
1681 _ => {}
1682 }
1683 }
1684
1685 Ok(Stmt::FnDecl {
1686 name,
1687 params,
1688 ret,
1689 body,
1690 })
1691}
1692
1693fn parse_block(pair: pest::iterators::Pair<Rule>) -> ParseResult<Vec<Spanned<Stmt>>> {
1694 let mut statements = Vec::new();
1695 for p in pair.into_inner() {
1696 if p.as_rule() == Rule::statement {
1697 statements.push(parse_statement(p)?);
1698 }
1699 }
1700 Ok(statements)
1701}
1702
1703fn parse_param(pair: pest::iterators::Pair<Rule>) -> ParseResult<Param> {
1704 let mut inner = pair.into_inner();
1705 let name = inner.expect_next("parameter name")?.as_str().to_string();
1706 let ty = parse_type(inner.expect_next("parameter type")?)?;
1707 Ok(Param { name, ty })
1708}
1709
1710fn parse_config_block(pair: pest::iterators::Pair<Rule>) -> ParseResult<Stmt> {
1711 let mut inner = pair.into_inner();
1712 let first = inner.expect_next("config name or item")?;
1713
1714 let (name, items_start) = if first.as_rule() == Rule::identifier {
1716 (first.as_str().to_string(), None)
1717 } else {
1718 ("default".to_string(), Some(first))
1720 };
1721
1722 let mut items = Vec::new();
1723
1724 if let Some(first_item) = items_start {
1726 if first_item.as_rule() == Rule::config_item {
1727 items.push(parse_config_item(first_item)?);
1728 }
1729 }
1730
1731 for p in inner {
1732 if p.as_rule() == Rule::config_item {
1733 items.push(parse_config_item(p)?);
1734 }
1735 }
1736 Ok(Stmt::Config { name, items })
1737}
1738
1739fn parse_config_item(pair: pest::iterators::Pair<Rule>) -> ParseResult<ConfigItem> {
1740 let mut inner = pair.into_inner();
1741 let key = inner.expect_next("config key")?.as_str().to_string();
1742 let value = parse_config_value(inner.expect_next("config value")?)?;
1743 Ok(ConfigItem::Value(key, value))
1744}
1745
1746fn parse_config_value(pair: pest::iterators::Pair<Rule>) -> ParseResult<ConfigValue> {
1747 let cloned = pair.clone();
1748 let inner = pair.into_inner().next().unwrap_or(cloned);
1749
1750 match inner.as_rule() {
1751 Rule::config_concat => {
1752 let parts: Vec<ConfigValue> = inner
1753 .into_inner()
1754 .map(|atom| {
1755 let inner_atom = atom.into_inner().next().expect("config_atom child");
1757 match inner_atom.as_rule() {
1758 Rule::string => {
1759 let s = inner_atom.as_str();
1760 Ok(ConfigValue::Str(s[1..s.len() - 1].to_string()))
1761 }
1762 Rule::identifier => Ok(ConfigValue::Ident(inner_atom.as_str().to_string())),
1763 _ => Ok(ConfigValue::Ident(inner_atom.as_str().to_string())),
1764 }
1765 })
1766 .collect::<ParseResult<Vec<_>>>()?;
1767 Ok(ConfigValue::Concat(parts))
1768 }
1769 Rule::config_array => {
1770 let values: Vec<ConfigValue> = inner
1771 .into_inner()
1772 .map(parse_config_value)
1773 .collect::<ParseResult<Vec<_>>>()?;
1774 Ok(ConfigValue::Array(values))
1775 }
1776 Rule::integer => Ok(ConfigValue::Int(inner.as_str().parse().unwrap_or(0))),
1777 Rule::float => Ok(ConfigValue::Float(inner.as_str().parse().unwrap_or(0.0))),
1778 Rule::string => {
1779 let s = inner.as_str();
1780 Ok(ConfigValue::Str(s[1..s.len() - 1].to_string()))
1781 }
1782 Rule::duration => Ok(ConfigValue::Duration(
1783 parse_duration(inner.as_str()).map_err(ParseError::InvalidDuration)?,
1784 )),
1785 Rule::boolean => Ok(ConfigValue::Bool(inner.as_str() == "true")),
1786 Rule::identifier => Ok(ConfigValue::Ident(inner.as_str().to_string())),
1787 _ => Ok(ConfigValue::Ident(inner.as_str().to_string())),
1788 }
1789}
1790
1791fn parse_import_stmt(pair: pest::iterators::Pair<Rule>) -> ParseResult<Stmt> {
1792 let mut inner = pair.into_inner();
1793 let path_pair = inner.expect_next("import path")?;
1794 let path = path_pair.as_str();
1795 let path = path[1..path.len() - 1].to_string();
1796 let alias = inner.next().map(|p| p.as_str().to_string());
1797 Ok(Stmt::Import { path, alias })
1798}
1799
1800fn parse_if_stmt(pair: pest::iterators::Pair<Rule>) -> ParseResult<Stmt> {
1801 let mut inner = pair.into_inner();
1802 let cond = parse_expr(inner.expect_next("if condition")?)?;
1803
1804 let mut then_branch = Vec::new();
1805 let mut elif_branches = Vec::new();
1806 let mut else_branch = None;
1807
1808 for p in inner {
1809 match p.as_rule() {
1810 Rule::block => then_branch = parse_block(p)?,
1811 Rule::statement => then_branch.push(parse_statement(p)?),
1812 Rule::elif_clause => {
1813 let mut elif_inner = p.into_inner();
1814 let elif_cond = parse_expr(elif_inner.expect_next("elif condition")?)?;
1815 let mut elif_body = Vec::new();
1816 for ep in elif_inner {
1817 match ep.as_rule() {
1818 Rule::block => elif_body = parse_block(ep)?,
1819 Rule::statement => elif_body.push(parse_statement(ep)?),
1820 _ => {}
1821 }
1822 }
1823 elif_branches.push((elif_cond, elif_body));
1824 }
1825 Rule::else_clause => {
1826 let mut else_body = Vec::new();
1827 for ep in p.into_inner() {
1828 match ep.as_rule() {
1829 Rule::block => else_body = parse_block(ep)?,
1830 Rule::statement => else_body.push(parse_statement(ep)?),
1831 _ => {}
1832 }
1833 }
1834 else_branch = Some(else_body);
1835 }
1836 _ => {}
1837 }
1838 }
1839
1840 Ok(Stmt::If {
1841 cond,
1842 then_branch,
1843 elif_branches,
1844 else_branch,
1845 })
1846}
1847
1848fn parse_for_stmt(pair: pest::iterators::Pair<Rule>) -> ParseResult<Stmt> {
1849 let mut inner = pair.into_inner();
1850 let var = inner.expect_next("loop variable")?.as_str().to_string();
1851 let iter = parse_expr(inner.expect_next("iterable expression")?)?;
1852 let mut body = Vec::new();
1853 for p in inner {
1854 match p.as_rule() {
1855 Rule::block => body = parse_block(p)?,
1856 Rule::statement => body.push(parse_statement(p)?),
1857 _ => {}
1858 }
1859 }
1860 Ok(Stmt::For { var, iter, body })
1861}
1862
1863fn parse_while_stmt(pair: pest::iterators::Pair<Rule>) -> ParseResult<Stmt> {
1864 let mut inner = pair.into_inner();
1865 let cond = parse_expr(inner.expect_next("while condition")?)?;
1866 let mut body = Vec::new();
1867 for p in inner {
1868 match p.as_rule() {
1869 Rule::block => body = parse_block(p)?,
1870 Rule::statement => body.push(parse_statement(p)?),
1871 _ => {}
1872 }
1873 }
1874 Ok(Stmt::While { cond, body })
1875}
1876
1877fn parse_return_stmt(pair: pest::iterators::Pair<Rule>) -> ParseResult<Stmt> {
1878 let expr = pair.into_inner().next().map(parse_expr).transpose()?;
1879 Ok(Stmt::Return(expr))
1880}
1881
1882fn parse_emit_stmt(pair: pest::iterators::Pair<Rule>) -> ParseResult<Stmt> {
1883 let mut inner = pair.into_inner();
1884 let event_type = inner.expect_next("event type name")?.as_str().to_string();
1885 let mut fields = Vec::new();
1886 for p in inner {
1887 if p.as_rule() == Rule::named_arg_list {
1888 for arg in p.into_inner() {
1889 fields.push(parse_named_arg(arg)?);
1890 }
1891 }
1892 }
1893 Ok(Stmt::Emit { event_type, fields })
1894}
1895
1896fn parse_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
1901 let inner = pair.into_inner().next();
1902
1903 match inner {
1904 Some(p) => parse_expr_inner(p),
1905 None => Ok(Expr::Null),
1906 }
1907}
1908
1909fn parse_expr_inner(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
1910 match pair.as_rule() {
1911 Rule::expr => parse_expr(pair),
1912 Rule::lambda_expr => parse_lambda_expr(pair),
1913 Rule::range_expr => parse_range_expr(pair),
1914 Rule::or_expr => parse_or_expr(pair),
1915 Rule::and_expr => parse_and_expr(pair),
1916 Rule::not_expr => parse_not_expr(pair),
1917 Rule::comparison_expr => parse_comparison_expr(pair),
1918 Rule::bitwise_or_expr => parse_bitwise_or_expr(pair),
1919 Rule::bitwise_xor_expr => parse_bitwise_xor_expr(pair),
1920 Rule::bitwise_and_expr => parse_bitwise_and_expr(pair),
1921 Rule::shift_expr => parse_shift_expr(pair),
1922 Rule::additive_expr => parse_additive_expr(pair),
1923 Rule::multiplicative_expr => parse_multiplicative_expr(pair),
1924 Rule::power_expr => parse_power_expr(pair),
1925 Rule::unary_expr => parse_unary_expr(pair),
1926 Rule::postfix_expr => parse_postfix_expr(pair),
1927 Rule::primary_expr => parse_primary_expr(pair),
1928 Rule::literal => parse_literal(pair),
1929 Rule::identifier => Ok(Expr::Ident(pair.as_str().to_string())),
1930 Rule::if_expr => parse_if_expr(pair),
1931 _ => Ok(Expr::Ident(pair.as_str().to_string())),
1932 }
1933}
1934
1935fn parse_lambda_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
1936 let mut inner = pair.into_inner();
1937 let mut params = Vec::new();
1938
1939 let first = inner.expect_next("lambda parameters")?;
1941 match first.as_rule() {
1942 Rule::identifier_list => {
1943 for p in first.into_inner() {
1944 params.push(p.as_str().to_string());
1945 }
1946 }
1947 Rule::identifier => {
1948 params.push(first.as_str().to_string());
1949 }
1950 _ => {}
1951 }
1952
1953 let body_pair = inner.expect_next("lambda body")?;
1955 let body = match body_pair.as_rule() {
1956 Rule::lambda_block => parse_lambda_block(body_pair)?,
1957 _ => parse_expr_inner(body_pair)?,
1958 };
1959
1960 Ok(Expr::Lambda {
1961 params,
1962 body: Box::new(body),
1963 })
1964}
1965
1966fn parse_lambda_block(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
1967 let mut stmts = Vec::new();
1968 let mut final_expr = None;
1969
1970 for p in pair.into_inner() {
1971 match p.as_rule() {
1972 Rule::statement => {
1973 let stmt = parse_statement(p)?;
1975 match &stmt.node {
1976 Stmt::VarDecl {
1977 mutable,
1978 name,
1979 ty,
1980 value,
1981 } => {
1982 stmts.push((name.clone(), ty.clone(), value.clone(), *mutable));
1983 }
1984 Stmt::Expr(e) => {
1985 final_expr = Some(e.clone());
1987 }
1988 _ => {
1989 }
1991 }
1992 }
1993 _ => {
1994 final_expr = Some(parse_expr_inner(p)?);
1996 }
1997 }
1998 }
1999
2000 if stmts.is_empty() {
2002 Ok(final_expr.unwrap_or(Expr::Null))
2003 } else {
2004 Ok(Expr::Block {
2005 stmts,
2006 result: Box::new(final_expr.unwrap_or(Expr::Null)),
2007 })
2008 }
2009}
2010
2011fn parse_pattern_expr_as_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2012 let mut inner = pair.into_inner();
2015 let mut left = parse_pattern_and_as_expr(inner.expect_next("pattern expression")?)?;
2016
2017 for right_pair in inner {
2018 let right = parse_pattern_and_as_expr(right_pair)?;
2019 left = Expr::Binary {
2020 op: BinOp::Or,
2021 left: Box::new(left),
2022 right: Box::new(right),
2023 };
2024 }
2025 Ok(left)
2026}
2027
2028fn parse_pattern_and_as_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2029 let mut inner = pair.into_inner();
2030 let mut left = parse_pattern_xor_as_expr(inner.expect_next("and expression")?)?;
2031
2032 for right_pair in inner {
2033 let right = parse_pattern_xor_as_expr(right_pair)?;
2034 left = Expr::Binary {
2035 op: BinOp::And,
2036 left: Box::new(left),
2037 right: Box::new(right),
2038 };
2039 }
2040 Ok(left)
2041}
2042
2043fn parse_pattern_xor_as_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2044 let mut inner = pair.into_inner();
2045 let mut left = parse_pattern_unary_as_expr(inner.expect_next("xor expression")?)?;
2046
2047 for right_pair in inner {
2048 let right = parse_pattern_unary_as_expr(right_pair)?;
2049 left = Expr::Binary {
2050 op: BinOp::Xor,
2051 left: Box::new(left),
2052 right: Box::new(right),
2053 };
2054 }
2055 Ok(left)
2056}
2057
2058fn parse_pattern_unary_as_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2059 let mut inner = pair.into_inner();
2060 let first = inner.expect_next("unary expression or operand")?;
2061
2062 if first.as_str() == "not" {
2063 let expr = parse_pattern_primary_as_expr(inner.expect_next("pattern expression")?)?;
2064 Ok(Expr::Unary {
2065 op: UnaryOp::Not,
2066 expr: Box::new(expr),
2067 })
2068 } else {
2069 parse_pattern_primary_as_expr(first)
2070 }
2071}
2072
2073fn parse_pattern_primary_as_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2074 let inner = pair
2075 .into_inner()
2076 .expect_next("pattern primary expression")?;
2077
2078 match inner.as_rule() {
2079 Rule::pattern_or_expr => parse_pattern_expr_as_expr(inner),
2080 Rule::pattern_sequence => parse_pattern_sequence_as_expr(inner),
2081 _ => Ok(Expr::Ident(inner.as_str().to_string())),
2082 }
2083}
2084
2085fn parse_pattern_sequence_as_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2086 let mut inner = pair.into_inner();
2089 let mut left = Expr::Ident(inner.expect_next("sequence start")?.as_str().to_string());
2090
2091 for right_pair in inner {
2092 let right = Expr::Ident(right_pair.as_str().to_string());
2093 left = Expr::Binary {
2094 op: BinOp::FollowedBy,
2095 left: Box::new(left),
2096 right: Box::new(right),
2097 };
2098 }
2099 Ok(left)
2100}
2101
2102fn parse_filter_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2103 let inner = pair.into_inner().expect_next("filter expression")?;
2104 parse_filter_or_expr(inner)
2105}
2106
2107fn parse_filter_or_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2108 let mut inner = pair.into_inner();
2109 let mut left = parse_filter_and_expr(inner.expect_next("or expression operand")?)?;
2110
2111 for right_pair in inner {
2112 let right = parse_filter_and_expr(right_pair)?;
2113 left = Expr::Binary {
2114 op: BinOp::Or,
2115 left: Box::new(left),
2116 right: Box::new(right),
2117 };
2118 }
2119 Ok(left)
2120}
2121
2122fn parse_filter_and_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2123 let mut inner = pair.into_inner();
2124 let mut left = parse_filter_not_expr(inner.expect_next("and expression operand")?)?;
2125
2126 for right_pair in inner {
2127 let right = parse_filter_not_expr(right_pair)?;
2128 left = Expr::Binary {
2129 op: BinOp::And,
2130 left: Box::new(left),
2131 right: Box::new(right),
2132 };
2133 }
2134 Ok(left)
2135}
2136
2137fn parse_filter_not_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2138 let mut inner = pair.into_inner();
2139 let first = inner.expect_next("not or expression")?;
2140
2141 if first.as_str() == "not" {
2142 let expr = parse_filter_comparison_expr(inner.expect_next("expression after not")?)?;
2143 Ok(Expr::Unary {
2144 op: UnaryOp::Not,
2145 expr: Box::new(expr),
2146 })
2147 } else {
2148 parse_filter_comparison_expr(first)
2149 }
2150}
2151
2152fn parse_filter_comparison_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2153 let mut inner = pair.into_inner();
2154 let left = parse_filter_additive_expr(inner.expect_next("comparison left operand")?)?;
2155
2156 if let Some(op_pair) = inner.next() {
2157 let op = match op_pair.as_str() {
2158 "==" => BinOp::Eq,
2159 "!=" => BinOp::NotEq,
2160 "<" => BinOp::Lt,
2161 "<=" => BinOp::Le,
2162 ">" => BinOp::Gt,
2163 ">=" => BinOp::Ge,
2164 "in" => BinOp::In,
2165 "is" => BinOp::Is,
2166 s if s.contains("not") && s.contains("in") => BinOp::NotIn,
2167 _ => BinOp::Eq,
2168 };
2169 let right = parse_filter_additive_expr(inner.expect_next("comparison right operand")?)?;
2170 Ok(Expr::Binary {
2171 op,
2172 left: Box::new(left),
2173 right: Box::new(right),
2174 })
2175 } else {
2176 Ok(left)
2177 }
2178}
2179
2180fn parse_filter_additive_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2181 let mut inner = pair.into_inner();
2182 let mut left =
2183 parse_filter_multiplicative_expr(inner.expect_next("additive expression operand")?)?;
2184
2185 while let Some(op_pair) = inner.next() {
2186 let op = if op_pair.as_str() == "-" {
2187 BinOp::Sub
2188 } else {
2189 BinOp::Add
2190 };
2191 if let Some(right_pair) = inner.next() {
2192 let right = parse_filter_multiplicative_expr(right_pair)?;
2193 left = Expr::Binary {
2194 op,
2195 left: Box::new(left),
2196 right: Box::new(right),
2197 };
2198 }
2199 }
2200 Ok(left)
2201}
2202
2203fn parse_filter_multiplicative_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2204 let mut inner = pair.into_inner();
2205 let mut left =
2206 parse_filter_unary_expr(inner.expect_next("multiplicative expression operand")?)?;
2207
2208 while let Some(op_pair) = inner.next() {
2209 let op = match op_pair.as_str() {
2210 "*" => BinOp::Mul,
2211 "/" => BinOp::Div,
2212 "%" => BinOp::Mod,
2213 _ => BinOp::Mul,
2214 };
2215 if let Some(right_pair) = inner.next() {
2216 let right = parse_filter_unary_expr(right_pair)?;
2217 left = Expr::Binary {
2218 op,
2219 left: Box::new(left),
2220 right: Box::new(right),
2221 };
2222 }
2223 }
2224 Ok(left)
2225}
2226
2227fn parse_filter_unary_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2228 let mut inner = pair.into_inner();
2229 let first = inner.expect_next("unary operator or expression")?;
2230
2231 if first.as_rule() == Rule::filter_unary_op {
2233 let op_str = first.as_str();
2234 let expr =
2235 parse_filter_postfix_expr(inner.expect_next("expression after unary operator")?)?;
2236 let op = match op_str {
2237 "-" => UnaryOp::Neg,
2238 "~" => UnaryOp::BitNot,
2239 _ => unreachable!("Grammar only allows - or ~"),
2240 };
2241 Ok(Expr::Unary {
2242 op,
2243 expr: Box::new(expr),
2244 })
2245 } else {
2246 parse_filter_postfix_expr(first)
2247 }
2248}
2249
2250fn parse_filter_postfix_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2251 let mut inner = pair.into_inner();
2252 let mut expr = parse_filter_primary_expr(inner.expect_next("postfix expression base")?)?;
2253
2254 for suffix in inner {
2255 expr = parse_filter_postfix_suffix(expr, suffix)?;
2256 }
2257 Ok(expr)
2258}
2259
2260fn parse_filter_postfix_suffix(expr: Expr, pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2261 let mut inner = pair.into_inner();
2262
2263 if let Some(first) = inner.next() {
2264 match first.as_rule() {
2265 Rule::identifier => {
2266 Ok(Expr::Member {
2268 expr: Box::new(expr),
2269 member: first.as_str().to_string(),
2270 })
2271 }
2272 Rule::optional_member_access => {
2273 let member = first
2274 .into_inner()
2275 .expect_next("member name")?
2276 .as_str()
2277 .to_string();
2278 Ok(Expr::OptionalMember {
2279 expr: Box::new(expr),
2280 member,
2281 })
2282 }
2283 Rule::index_access => {
2284 let index = parse_expr(first.into_inner().expect_next("index expression")?)?;
2285 Ok(Expr::Index {
2286 expr: Box::new(expr),
2287 index: Box::new(index),
2288 })
2289 }
2290 Rule::call_args => {
2291 let args = first
2292 .into_inner()
2293 .filter(|p| p.as_rule() == Rule::arg_list)
2294 .flat_map(|p| p.into_inner())
2295 .map(parse_arg)
2296 .collect::<ParseResult<Vec<_>>>()?;
2297 Ok(Expr::Call {
2298 func: Box::new(expr),
2299 args,
2300 })
2301 }
2302 _ => Ok(expr),
2303 }
2304 } else {
2305 Ok(expr)
2306 }
2307}
2308
2309fn parse_filter_primary_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2310 let inner = pair.into_inner().expect_next("filter primary expression")?;
2311
2312 match inner.as_rule() {
2313 Rule::literal => parse_literal(inner),
2314 Rule::identifier => Ok(Expr::Ident(inner.as_str().to_string())),
2315 Rule::filter_expr => parse_filter_expr(inner),
2316 _ => Ok(Expr::Ident(inner.as_str().to_string())),
2317 }
2318}
2319
2320fn parse_range_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2321 let mut inner = pair.into_inner();
2322 let left = parse_expr_inner(inner.expect_next("range start")?)?;
2323
2324 if let Some(op_pair) = inner.next() {
2325 let inclusive = op_pair.as_str() == "..=";
2326 let right = parse_expr_inner(inner.expect_next("range end")?)?;
2327 Ok(Expr::Range {
2328 start: Box::new(left),
2329 end: Box::new(right),
2330 inclusive,
2331 })
2332 } else {
2333 Ok(left)
2334 }
2335}
2336
2337fn parse_or_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2338 let mut inner = pair.into_inner();
2339 let mut left = parse_expr_inner(inner.expect_next("or expression operand")?)?;
2340
2341 for right_pair in inner {
2342 let right = parse_expr_inner(right_pair)?;
2343 left = Expr::Binary {
2344 op: BinOp::Or,
2345 left: Box::new(left),
2346 right: Box::new(right),
2347 };
2348 }
2349
2350 Ok(left)
2351}
2352
2353fn parse_and_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2354 let mut inner = pair.into_inner();
2355 let mut left = parse_expr_inner(inner.expect_next("and expression operand")?)?;
2356
2357 for right_pair in inner {
2358 let right = parse_expr_inner(right_pair)?;
2359 left = Expr::Binary {
2360 op: BinOp::And,
2361 left: Box::new(left),
2362 right: Box::new(right),
2363 };
2364 }
2365
2366 Ok(left)
2367}
2368
2369fn parse_not_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2370 let mut inner = pair.into_inner();
2371 let first = inner.expect_next("not keyword or expression")?;
2372
2373 if first.as_str() == "not" {
2374 let expr = parse_expr_inner(inner.expect_next("expression after not")?)?;
2375 Ok(Expr::Unary {
2376 op: UnaryOp::Not,
2377 expr: Box::new(expr),
2378 })
2379 } else {
2380 parse_expr_inner(first)
2381 }
2382}
2383
2384fn parse_comparison_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2385 let mut inner = pair.into_inner();
2386 let left = parse_expr_inner(inner.expect_next("comparison left operand")?)?;
2387
2388 if let Some(op_pair) = inner.next() {
2389 let op_str = op_pair.as_str();
2390 let op = match op_str {
2391 "==" => BinOp::Eq,
2392 "!=" => BinOp::NotEq,
2393 "<" => BinOp::Lt,
2394 "<=" => BinOp::Le,
2395 ">" => BinOp::Gt,
2396 ">=" => BinOp::Ge,
2397 "in" => BinOp::In,
2398 "is" => BinOp::Is,
2399 s if s.contains("not") && s.contains("in") => BinOp::NotIn,
2400 _ => BinOp::Eq,
2401 };
2402 let right = parse_expr_inner(inner.expect_next("comparison right operand")?)?;
2403 Ok(Expr::Binary {
2404 op,
2405 left: Box::new(left),
2406 right: Box::new(right),
2407 })
2408 } else {
2409 Ok(left)
2410 }
2411}
2412
/// Parses a bitwise-OR expression pair.
///
/// Thin wrapper: forwards `pair` to `parse_binary_chain` with `BinOp::BitOr`
/// as the operator placed between consecutive operands.
fn parse_bitwise_or_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
    parse_binary_chain(pair, BinOp::BitOr)
}
2416
/// Parses a bitwise-XOR expression pair.
///
/// Thin wrapper: forwards `pair` to `parse_binary_chain` with `BinOp::BitXor`
/// as the operator placed between consecutive operands.
fn parse_bitwise_xor_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
    parse_binary_chain(pair, BinOp::BitXor)
}
2420
/// Parses a bitwise-AND expression pair.
///
/// Thin wrapper: forwards `pair` to `parse_binary_chain` with `BinOp::BitAnd`
/// as the operator placed between consecutive operands.
fn parse_bitwise_and_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
    parse_binary_chain(pair, BinOp::BitAnd)
}
2424
2425fn parse_shift_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2426 let mut inner = pair.into_inner();
2427 let mut left = parse_expr_inner(inner.expect_next("shift expression operand")?)?;
2428
2429 while let Some(op_or_expr) = inner.next() {
2430 let op = match op_or_expr.as_str() {
2431 "<<" => BinOp::Shl,
2432 ">>" => BinOp::Shr,
2433 _ => {
2434 let right = parse_expr_inner(op_or_expr)?;
2435 left = Expr::Binary {
2436 op: BinOp::Shl,
2437 left: Box::new(left),
2438 right: Box::new(right),
2439 };
2440 continue;
2441 }
2442 };
2443 if let Some(right_pair) = inner.next() {
2444 let right = parse_expr_inner(right_pair)?;
2445 left = Expr::Binary {
2446 op,
2447 left: Box::new(left),
2448 right: Box::new(right),
2449 };
2450 }
2451 }
2452
2453 Ok(left)
2454}
2455
2456fn parse_additive_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2457 let mut inner = pair.into_inner();
2458 let mut left = parse_expr_inner(inner.expect_next("additive expression operand")?)?;
2459
2460 while let Some(op_pair) = inner.next() {
2461 let op_text = op_pair.as_str();
2462 let op = if op_text == "-" {
2463 BinOp::Sub
2464 } else {
2465 BinOp::Add
2466 };
2467
2468 if let Some(right_pair) = inner.next() {
2469 let right = parse_expr_inner(right_pair)?;
2470 left = Expr::Binary {
2471 op,
2472 left: Box::new(left),
2473 right: Box::new(right),
2474 };
2475 }
2476 }
2477
2478 Ok(left)
2479}
2480
2481fn parse_multiplicative_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2482 let mut inner = pair.into_inner();
2483 let mut left = parse_expr_inner(inner.expect_next("multiplicative expression operand")?)?;
2484
2485 while let Some(op_pair) = inner.next() {
2486 let op_text = op_pair.as_str();
2487 let op = match op_text {
2488 "*" => BinOp::Mul,
2489 "/" => BinOp::Div,
2490 "%" => BinOp::Mod,
2491 _ => BinOp::Mul,
2492 };
2493
2494 if let Some(right_pair) = inner.next() {
2495 let right = parse_expr_inner(right_pair)?;
2496 left = Expr::Binary {
2497 op,
2498 left: Box::new(left),
2499 right: Box::new(right),
2500 };
2501 }
2502 }
2503
2504 Ok(left)
2505}
2506
2507fn parse_power_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2508 let mut inner = pair.into_inner();
2509 let base = parse_expr_inner(inner.expect_next("power expression base")?)?;
2510
2511 if let Some(exp_pair) = inner.next() {
2512 let exp = parse_expr_inner(exp_pair)?;
2513 Ok(Expr::Binary {
2514 op: BinOp::Pow,
2515 left: Box::new(base),
2516 right: Box::new(exp),
2517 })
2518 } else {
2519 Ok(base)
2520 }
2521}
2522
2523fn parse_unary_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2524 let mut inner = pair.into_inner();
2525 let first = inner.expect_next("unary operator or expression")?;
2526
2527 match first.as_rule() {
2529 Rule::unary_op => {
2530 let op_str = first.as_str();
2531 let expr = parse_expr_inner(inner.expect_next("expression after unary operator")?)?;
2532 let op = match op_str {
2533 "-" => UnaryOp::Neg,
2534 "~" => UnaryOp::BitNot,
2535 _ => unreachable!("Grammar only allows - or ~"),
2536 };
2537 Ok(Expr::Unary {
2538 op,
2539 expr: Box::new(expr),
2540 })
2541 }
2542 _ => parse_expr_inner(first),
2543 }
2544}
2545
2546fn parse_postfix_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2547 let mut inner = pair.into_inner();
2548 let mut expr = parse_expr_inner(inner.expect_next("postfix expression base")?)?;
2549
2550 for suffix in inner {
2551 expr = parse_postfix_suffix(expr, suffix)?;
2552 }
2553
2554 Ok(expr)
2555}
2556
2557fn parse_postfix_suffix(expr: Expr, pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2558 let inner = pair.into_inner().expect_next("postfix suffix")?;
2559
2560 match inner.as_rule() {
2561 Rule::member_access => {
2562 let member = inner
2563 .into_inner()
2564 .expect_next("member name")?
2565 .as_str()
2566 .to_string();
2567 Ok(Expr::Member {
2568 expr: Box::new(expr),
2569 member,
2570 })
2571 }
2572 Rule::optional_member_access => {
2573 let member = inner
2574 .into_inner()
2575 .expect_next("member name")?
2576 .as_str()
2577 .to_string();
2578 Ok(Expr::OptionalMember {
2579 expr: Box::new(expr),
2580 member,
2581 })
2582 }
2583 Rule::slice_access => {
2584 let slice_range = inner.into_inner().expect_next("slice range")?;
2586 let slice_inner = slice_range.into_inner();
2587
2588 let mut start = None;
2589 let mut end = None;
2590
2591 for p in slice_inner {
2592 match p.as_rule() {
2593 Rule::slice_start => {
2594 start = Some(Box::new(parse_expr_inner(
2595 p.into_inner().expect_next("slice start expression")?,
2596 )?));
2597 }
2598 Rule::slice_end => {
2599 end = Some(Box::new(parse_expr_inner(
2600 p.into_inner().expect_next("slice end expression")?,
2601 )?));
2602 }
2603 _ => {}
2604 }
2605 }
2606
2607 Ok(Expr::Slice {
2608 expr: Box::new(expr),
2609 start,
2610 end,
2611 })
2612 }
2613 Rule::index_access => {
2614 let index = parse_expr(inner.into_inner().expect_next("index expression")?)?;
2615 Ok(Expr::Index {
2616 expr: Box::new(expr),
2617 index: Box::new(index),
2618 })
2619 }
2620 Rule::call_args => {
2621 let mut args = Vec::new();
2622 for p in inner.into_inner() {
2623 if p.as_rule() == Rule::arg_list {
2624 for arg in p.into_inner() {
2625 args.push(parse_arg(arg)?);
2626 }
2627 }
2628 }
2629 Ok(Expr::Call {
2630 func: Box::new(expr),
2631 args,
2632 })
2633 }
2634 _ => Ok(expr),
2635 }
2636}
2637
2638fn parse_arg(pair: pest::iterators::Pair<Rule>) -> ParseResult<Arg> {
2639 let mut inner = pair.into_inner();
2640 let first = inner.expect_next("argument")?;
2641
2642 if let Some(second) = inner.next() {
2643 Ok(Arg::Named(
2644 first.as_str().to_string(),
2645 parse_expr_inner(second)?,
2646 ))
2647 } else {
2648 Ok(Arg::Positional(parse_expr_inner(first)?))
2649 }
2650}
2651
2652fn parse_primary_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2653 let inner = pair.into_inner().expect_next("primary expression")?;
2654
2655 match inner.as_rule() {
2656 Rule::if_expr => parse_if_expr(inner),
2657 Rule::literal => parse_literal(inner),
2658 Rule::identifier => Ok(Expr::Ident(inner.as_str().to_string())),
2659 Rule::array_literal => parse_array_literal(inner),
2660 Rule::map_literal => parse_map_literal(inner),
2661 Rule::expr => parse_expr(inner),
2662 _ => Ok(Expr::Ident(inner.as_str().to_string())),
2663 }
2664}
2665
2666fn parse_if_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2667 let mut inner = pair.into_inner();
2668 let cond = parse_expr_inner(inner.expect_next("if condition")?)?;
2669 let then_branch = parse_expr_inner(inner.expect_next("then branch")?)?;
2670 let else_branch = parse_expr_inner(inner.expect_next("else branch")?)?;
2671
2672 Ok(Expr::If {
2673 cond: Box::new(cond),
2674 then_branch: Box::new(then_branch),
2675 else_branch: Box::new(else_branch),
2676 })
2677}
2678
2679fn parse_literal(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2680 let inner = pair.into_inner().expect_next("literal value")?;
2681
2682 match inner.as_rule() {
2683 Rule::integer => inner
2684 .as_str()
2685 .parse::<i64>()
2686 .map(Expr::Int)
2687 .map_err(|e| ParseError::InvalidNumber(format!("'{}': {}", inner.as_str(), e))),
2688 Rule::float => inner
2689 .as_str()
2690 .parse::<f64>()
2691 .map(Expr::Float)
2692 .map_err(|e| ParseError::InvalidNumber(format!("'{}': {}", inner.as_str(), e))),
2693 Rule::string => {
2694 let s = inner.as_str();
2695 Ok(Expr::Str(s[1..s.len() - 1].to_string()))
2696 }
2697 Rule::duration => Ok(Expr::Duration(
2698 parse_duration(inner.as_str()).map_err(ParseError::InvalidDuration)?,
2699 )),
2700 Rule::timestamp => Ok(Expr::Timestamp(parse_timestamp(inner.as_str()))),
2701 Rule::boolean => Ok(Expr::Bool(inner.as_str() == "true")),
2702 Rule::null => Ok(Expr::Null),
2703 _ => Ok(Expr::Null),
2704 }
2705}
2706
2707fn parse_array_literal(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2708 let mut items = Vec::new();
2709 for p in pair.into_inner() {
2710 if p.as_rule() == Rule::expr_list {
2711 for expr in p.into_inner() {
2712 items.push(parse_expr(expr)?);
2713 }
2714 }
2715 }
2716 Ok(Expr::Array(items))
2717}
2718
2719fn parse_map_literal(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2720 let mut entries = Vec::new();
2721 for p in pair.into_inner() {
2722 if p.as_rule() == Rule::map_entry_list {
2723 for entry in p.into_inner() {
2724 let mut inner = entry.into_inner();
2725 let key = inner.expect_next("map key")?.as_str().to_string();
2726 let key = if key.starts_with('"') {
2727 key[1..key.len() - 1].to_string()
2728 } else {
2729 key
2730 };
2731 let value = parse_expr(inner.expect_next("map value")?)?;
2732 entries.push((key, value));
2733 }
2734 }
2735 }
2736 Ok(Expr::Map(entries))
2737}
2738
2739fn parse_binary_chain(pair: pest::iterators::Pair<Rule>, op: BinOp) -> ParseResult<Expr> {
2740 let mut inner = pair.into_inner();
2741 let mut left = parse_expr_inner(inner.expect_next("binary chain operand")?)?;
2742
2743 for right_pair in inner {
2744 let right = parse_expr_inner(right_pair)?;
2745 left = Expr::Binary {
2746 op,
2747 left: Box::new(left),
2748 right: Box::new(right),
2749 };
2750 }
2751
2752 Ok(left)
2753}
2754
#[cfg(test)]
mod tests {
    // Tests for the public `parse` entry point.
    //
    // Most tests are smoke tests: they feed one program to `parse` and assert
    // that the result is `Ok`, printing the error on failure. A few tests go
    // further and inspect the returned AST, and the last group checks how the
    // parser treats heavily bracketed input.
    use super::*;

    // --- Streams, declarations and expressions -------------------------------

    #[test]
    fn test_parse_simple_stream() {
        let result = parse("stream output = input");
        assert!(result.is_ok(), "Failed: {:?}", result.err());
    }

    #[test]
    fn test_parse_stream_with_filter() {
        let result = parse("stream output = input.where(value > 100)");
        assert!(result.is_ok(), "Failed: {:?}", result.err());
    }

    #[test]
    fn test_parse_stream_with_map() {
        let result = parse("stream output = input.map(x * 2)");
        assert!(result.is_ok(), "Failed: {:?}", result.err());
    }

    #[test]
    fn test_parse_event_declaration() {
        let result = parse("event SensorReading:\n sensor_id: str\n value: float");
        assert!(result.is_ok(), "Failed: {:?}", result.err());
    }

    #[test]
    fn test_parse_variable() {
        let result = parse("let x = 42");
        assert!(result.is_ok(), "Failed: {:?}", result.err());
    }

    #[test]
    fn test_parse_function() {
        let result = parse("fn add(a: int, b: int) -> int:\n return a + b");
        assert!(result.is_ok(), "Failed: {:?}", result.err());
    }

    #[test]
    fn test_parse_lambda() {
        let result = parse("let f = (x) => x * 2");
        assert!(result.is_ok(), "Failed: {:?}", result.err());
    }

    #[test]
    fn test_parse_if_expression() {
        let result = parse("let x = if a > b then a else b");
        assert!(result.is_ok(), "Failed: {:?}", result.err());
    }

    #[test]
    fn test_parse_followed_by() {
        let result = parse("stream alerts = orders.where(amount > 1000) -> Payment where payment.order_id == orders.id");
        assert!(result.is_ok(), "Failed: {:?}", result.err());
    }

    #[test]
    fn test_parse_window() {
        let result = parse("stream windowed = input.window(5s)");
        assert!(result.is_ok(), "Failed: {:?}", result.err());
    }

    #[test]
    fn test_parse_aggregate() {
        let result =
            parse("stream stats = input.window(1m).aggregate(count: count(), avg: avg(value))");
        assert!(result.is_ok(), "Failed: {:?}", result.err());
    }

    #[test]
    fn test_parse_merge() {
        let result = parse("stream combined = merge(stream1, stream2)");
        assert!(result.is_ok(), "Failed: {:?}", result.err());
    }

    #[test]
    fn test_parse_sequence() {
        let result = parse("stream seq = sequence(a: EventA, b: EventB where b.id == a.id)");
        assert!(result.is_ok(), "Failed: {:?}", result.err());
    }

    #[test]
    fn test_parse_config() {
        let result = parse("config:\n window_size: 5s\n batch_size: 100");
        assert!(result.is_ok(), "Failed: {:?}", result.err());
    }

    #[test]
    fn test_parse_complex_expression() {
        let result = parse("let x = (a + b) * c / d - e");
        assert!(result.is_ok(), "Failed: {:?}", result.err());
    }

    #[test]
    fn test_parse_sliding_window() {
        let result = parse("stream output = input.window(5m, sliding: 1m)");
        assert!(result.is_ok(), "Failed: {:?}", result.err());
    }

    #[test]
    fn test_parse_fork_construct() {
        let result =
            parse("stream forked = input.fork(branch1: .where(x > 0), branch2: .where(x < 0))");
        assert!(result.is_ok(), "Failed: {:?}", result.err());
    }

    #[test]
    fn test_parse_aggregate_functions() {
        let result = parse(
            "stream stats = input.window(1h).aggregate(total: sum(value), average: avg(value))",
        );
        assert!(result.is_ok(), "Failed: {:?}", result.err());
    }

    #[test]
    fn test_parse_complex_parentheses() {
        let result = parse("let x = ((a + b) * (c - d)) / e");
        assert!(result.is_ok(), "Failed: {:?}", result.err());
    }

    // --- Followed-by (`->`) sequences with aliases ---------------------------

    #[test]
    fn test_parse_sequence_with_alias() {
        let result = parse(
            r#"
            stream TwoTicks = StockTick as first
                -> StockTick as second
                .emit(result: "two_ticks")
        "#,
        );
        assert!(result.is_ok(), "Failed: {:?}", result.err());
    }

    #[test]
    fn test_parse_followed_by_with_alias() {
        let result = parse("stream alerts = Order as a -> Payment as b");
        assert!(result.is_ok(), "Failed: {:?}", result.err());
    }

    #[test]
    fn test_parse_followed_by_with_filter_and_alias() {
        let result = parse(
            r#"
            stream Test = A as a
                -> B where value == a.base + 10 as b
                .emit(status: "matched")
        "#,
        );
        assert!(result.is_ok(), "Failed: {:?}", result.err());
    }

    #[test]
    fn test_parse_pattern_with_lambda() {
        let result =
            parse("stream Test = Trade.window(1m).pattern(p: x => x.len() > 3).emit(alert: true)");
        assert!(result.is_ok(), "Failed: {:?}", result.err());
    }

    // --- `pattern` declarations (SEQ / OR / AND / NOT, within, partition by) --

    #[test]
    fn test_parse_sase_pattern_decl_simple() {
        let result = parse("pattern SimpleAlert = SEQ(Login, Transaction)");
        assert!(result.is_ok(), "Failed: {:?}", result.err());
    }

    #[test]
    fn test_parse_sase_pattern_decl_with_kleene() {
        let result = parse("pattern MultiTx = SEQ(Login, Transaction+ where amount > 1000)");
        assert!(result.is_ok(), "Failed: {:?}", result.err());
    }

    #[test]
    fn test_parse_sase_pattern_decl_with_alias() {
        let result = parse("pattern AliasedPattern = SEQ(Login as login, Transaction as tx)");
        assert!(result.is_ok(), "Failed: {:?}", result.err());
    }

    #[test]
    fn test_parse_sase_pattern_decl_with_within() {
        let result = parse("pattern TimedPattern = SEQ(A, B) within 10m");
        assert!(result.is_ok(), "Failed: {:?}", result.err());
    }

    #[test]
    fn test_parse_sase_pattern_decl_with_partition() {
        let result = parse("pattern PartitionedPattern = SEQ(A, B) partition by user_id");
        assert!(result.is_ok(), "Failed: {:?}", result.err());
    }

    #[test]
    fn test_parse_sase_pattern_decl_full() {
        let result = parse(
            "pattern SuspiciousActivity = SEQ(Transaction+ where amount > 1000 as txs) within 10m partition by user_id"
        );
        assert!(result.is_ok(), "Failed: {:?}", result.err());
    }

    #[test]
    fn test_parse_sase_pattern_decl_or() {
        let result = parse("pattern AlertOrWarn = Login OR Logout");
        assert!(result.is_ok(), "Failed: {:?}", result.err());
    }

    #[test]
    fn test_parse_sase_pattern_decl_and() {
        let result = parse("pattern BothEvents = Login AND Transaction");
        assert!(result.is_ok(), "Failed: {:?}", result.err());
    }

    #[test]
    fn test_parse_sase_pattern_decl_not() {
        let result = parse("pattern NoLogout = SEQ(Login, NOT Logout, Transaction)");
        assert!(result.is_ok(), "Failed: {:?}", result.err());
    }

    // --- having, timers, variables and assignment ----------------------------

    #[test]
    fn test_parse_having() {
        let result = parse(
            "stream filtered = input.window(1m).aggregate(count: count(), total: sum(value)).having(count > 10)",
        );
        assert!(result.is_ok(), "Failed: {:?}", result.err());
    }

    #[test]
    fn test_parse_having_with_partition() {
        let result = parse(
            "stream grouped = input.partition_by(category).window(5m).aggregate(avg_price: avg(price)).having(avg_price > 100.0)",
        );
        assert!(result.is_ok(), "Failed: {:?}", result.err());
    }

    #[test]
    fn test_parse_timer_source() {
        let result = parse("stream heartbeat = timer(5s).emit(type: \"heartbeat\")");
        assert!(result.is_ok(), "Failed: {:?}", result.err());
    }

    #[test]
    fn test_parse_timer_source_with_initial_delay() {
        let result =
            parse("stream delayed_timer = timer(1m, initial_delay: 10s).emit(type: \"periodic\")");
        assert!(result.is_ok(), "Failed: {:?}", result.err());
    }

    #[test]
    fn test_parse_var_decl() {
        let result = parse("var threshold: float = 10.0");
        assert!(result.is_ok(), "Failed: {:?}", result.err());
    }

    #[test]
    fn test_parse_let_decl() {
        let result = parse("let max_count: int = 100");
        assert!(result.is_ok(), "Failed: {:?}", result.err());
    }

    #[test]
    fn test_parse_assignment() {
        let result = parse("threshold := threshold + 10.0");
        assert!(result.is_ok(), "Failed: {:?}", result.err());
    }

    #[test]
    fn test_parse_assignment_with_expression() {
        let result = parse("count := count * 2 + offset");
        assert!(result.is_ok(), "Failed: {:?}", result.err());
    }

    // --- Multi-statement programs and operation chains -----------------------

    #[test]
    fn test_parse_nested_stream_reference() {
        let result = parse("stream Base = Event\nstream Derived = Base.where(x > 0)");
        assert!(result.is_ok(), "Failed: {:?}", result.err());
    }

    #[test]
    fn test_parse_multi_stage_pipeline() {
        let result = parse(
            "stream L1 = Raw\nstream L2 = L1.where(a > 1)\nstream L3 = L2.window(5).aggregate(cnt: count())",
        );
        assert!(result.is_ok(), "Failed: {:?}", result.err());
    }

    #[test]
    fn test_parse_stream_with_operations_chain() {
        let result = parse(
            "stream Processed = Source.where(valid).window(10).aggregate(sum: sum(value)).having(sum > 100)",
        );
        assert!(result.is_ok(), "Failed: {:?}", result.err());
    }

    // --- Connectors and `.from` / `.to` / `.emit as` -------------------------

    #[test]
    fn test_parse_connector_mqtt() {
        let result = parse(
            r#"connector MqttBroker = mqtt (
                host: "localhost",
                port: 1883,
                client_id: "varpulis"
            )"#,
        );
        assert!(result.is_ok(), "Failed: {:?}", result.err());
    }

    #[test]
    fn test_parse_connector_kafka() {
        let result = parse(
            r#"connector KafkaCluster = kafka (
                brokers: ["kafka1:9092", "kafka2:9092"],
                group_id: "my-group"
            )"#,
        );
        assert!(result.is_ok(), "Failed: {:?}", result.err());
    }

    #[test]
    fn test_parse_connector_http() {
        let result = parse(
            r#"connector ApiEndpoint = http (
                base_url: "https://api.example.com"
            )"#,
        );
        assert!(result.is_ok(), "Failed: {:?}", result.err());
    }

    #[test]
    fn test_parse_stream_with_from_connector() {
        let result = parse(
            r#"stream Temperatures = TemperatureReading.from(MqttSensors, topic: "sensors/temp/#")"#,
        );
        assert!(result.is_ok(), "Failed: {:?}", result.err());
    }

    #[test]
    fn test_parse_stream_with_from_and_operations() {
        let result = parse(
            r#"stream HighTemp = TemperatureReading
                .from(MqttSensors, topic: "sensors/#")
                .where(value > 30)
                .emit(alert: "high_temp")"#,
        );
        assert!(result.is_ok(), "Failed: {:?}", result.err());
    }

    #[test]
    fn test_parse_full_connectivity_pipeline() {
        let result = parse(
            r#"
            connector MqttSensors = mqtt (host: "localhost", port: 1883)
            connector KafkaAlerts = kafka (brokers: ["kafka:9092"])

            event TemperatureReading:
                sensor_id: str
                value: float
                ts: timestamp

            stream Temperatures = TemperatureReading.from(MqttSensors, topic: "sensors/#")

            stream HighTempAlert = Temperatures
                .where(value > 30)
                .emit(alert_type: "HIGH_TEMP", temperature: value)

        "#,
        );
        assert!(result.is_ok(), "Failed: {:?}", result.err());
    }

    #[test]
    fn test_parse_emit_as_type() {
        let result = parse(
            r#"stream Alerts = Temperatures
                .where(value > 30)
                .emit as AlertEvent(severity: "high", temp: value)"#,
        );
        assert!(result.is_ok(), "Failed: {:?}", result.err());
    }

    #[test]
    fn test_parse_stream_with_to_connector() {
        let result = parse(
            r#"stream Output = Input
                .where(x > 0)
                .emit(y: x * 2)
                .to(KafkaOutput, topic: "output")"#,
        );
        assert!(result.is_ok(), "Failed: {:?}", result.err());
    }

    // --- `emit` statements inside functions (AST is inspected) ---------------

    #[test]
    fn test_emit_stmt_parses() {
        let result = parse(
            r"fn test():
    emit Pixel(x: 1, y: 2)",
        );
        assert!(result.is_ok(), "Failed: {:?}", result.err());
        let program = result.unwrap();
        // The first statement must be the function, and the first statement of
        // its body must be the emit with both fields in source order.
        if let Stmt::FnDecl { body, .. } = &program.statements[0].node {
            match &body[0].node {
                Stmt::Emit { event_type, fields } => {
                    assert_eq!(event_type, "Pixel");
                    assert_eq!(fields.len(), 2);
                    assert_eq!(fields[0].name, "x");
                    assert_eq!(fields[1].name, "y");
                }
                other => panic!("Expected Stmt::Emit, got {other:?}"),
            }
        } else {
            panic!("Expected FnDecl");
        }
    }

    #[test]
    fn test_emit_stmt_no_args() {
        let result = parse(
            r"fn test():
    emit Done()",
        );
        assert!(result.is_ok(), "Failed: {:?}", result.err());
        let program = result.unwrap();
        if let Stmt::FnDecl { body, .. } = &program.statements[0].node {
            match &body[0].node {
                Stmt::Emit { event_type, fields } => {
                    assert_eq!(event_type, "Done");
                    assert!(fields.is_empty());
                }
                other => panic!("Expected Stmt::Emit, got {other:?}"),
            }
        } else {
            panic!("Expected FnDecl");
        }
    }

    #[test]
    fn test_emit_in_function_with_for_loop() {
        let result = parse(
            r"fn generate(n: int):
    for i in 0..n:
        emit Item(index: i, value: i * 2)",
        );
        assert!(result.is_ok(), "Failed: {:?}", result.err());
    }

    #[test]
    fn test_parse_process_op() {
        let result = parse(
            r"fn do_work():
    emit Result(v: 42)

stream S = timer(1s).process(do_work())",
        );
        assert!(result.is_ok(), "Failed: {:?}", result.err());
    }

    // --- `.trend_aggregate` (AST is inspected) -------------------------------

    #[test]
    fn test_parse_trend_aggregate_count_trends() {
        let result = parse(
            r#"stream S = StockTick as first
                -> all StockTick where price > first.price as rising
                -> StockTick where price < rising.price as drop
                .within(60s)
                .trend_aggregate(count: count_trends())
                .emit(event_type: "TrendStats", trends: count)"#,
        );
        assert!(result.is_ok(), "Failed: {:?}", result.err());
        let program = result.unwrap();
        // Only the first stream declaration is checked; the test returns after it.
        for stmt in &program.statements {
            if let Stmt::StreamDecl { ops, .. } = &stmt.node {
                let has_trend_agg = ops
                    .iter()
                    .any(|op| matches!(op, StreamOp::TrendAggregate(_)));
                assert!(has_trend_agg, "Expected TrendAggregate op in stream ops");
                for op in ops {
                    if let StreamOp::TrendAggregate(items) = op {
                        assert_eq!(items.len(), 1);
                        assert_eq!(items[0].alias, "count");
                        assert_eq!(items[0].func, "count_trends");
                        assert!(items[0].arg.is_none());
                    }
                }
                return;
            }
        }
        panic!("No stream declaration found");
    }

    #[test]
    fn test_parse_trend_aggregate_multiple_items() {
        let result = parse(
            r"stream S = StockTick as first
                -> all StockTick as rising
                .within(60s)
                .trend_aggregate(
                    trend_count: count_trends(),
                    event_count: count_events(rising)
                )
                .emit(trends: trend_count, events: event_count)",
        );
        assert!(result.is_ok(), "Failed: {:?}", result.err());
        let program = result.unwrap();
        // Returns at the first TrendAggregate op found; panics if there is none.
        for stmt in &program.statements {
            if let Stmt::StreamDecl { ops, .. } = &stmt.node {
                for op in ops {
                    if let StreamOp::TrendAggregate(items) = op {
                        assert_eq!(items.len(), 2);
                        assert_eq!(items[0].alias, "trend_count");
                        assert_eq!(items[0].func, "count_trends");
                        assert_eq!(items[1].alias, "event_count");
                        assert_eq!(items[1].func, "count_events");
                        assert!(items[1].arg.is_some());
                        return;
                    }
                }
            }
        }
        panic!("No TrendAggregate found");
    }

    // --- `.score` (AST is inspected) -----------------------------------------

    #[test]
    fn test_parse_score_basic() {
        let result = parse(
            r#"stream S = TradeEvent
                .score(model: "models/fraud.onnx", inputs: [amount, risk_score], outputs: [fraud_prob, category])"#,
        );
        assert!(result.is_ok(), "Failed: {:?}", result.err());
        let program = result.unwrap();
        // Returns at the first Score op found; panics if there is none.
        for stmt in &program.statements {
            if let Stmt::StreamDecl { ops, .. } = &stmt.node {
                for op in ops {
                    if let StreamOp::Score(spec) = op {
                        assert_eq!(spec.model_path, "models/fraud.onnx");
                        assert_eq!(spec.inputs, vec!["amount", "risk_score"]);
                        assert_eq!(spec.outputs, vec!["fraud_prob", "category"]);
                        return;
                    }
                }
            }
        }
        panic!("No Score op found");
    }

    #[test]
    fn test_parse_score_single_field() {
        let result = parse(
            r#"stream S = Event
                .score(model: "model.onnx", inputs: [value], outputs: [prediction])"#,
        );
        assert!(result.is_ok(), "Failed: {:?}", result.err());
        let program = result.unwrap();
        for stmt in &program.statements {
            if let Stmt::StreamDecl { ops, .. } = &stmt.node {
                for op in ops {
                    if let StreamOp::Score(spec) = op {
                        assert_eq!(spec.model_path, "model.onnx");
                        assert_eq!(spec.inputs, vec!["value"]);
                        assert_eq!(spec.outputs, vec!["prediction"]);
                        return;
                    }
                }
            }
        }
        panic!("No Score op found");
    }

    // --- Bracket-heavy input: rejection speed and nesting-depth checks -------

    // Input with many unmatched `[`: `parse` must return an error, and the
    // whole call must take less than 100 ms of wall-clock time.
    #[test]
    fn fuzz_regression_unmatched_brackets_timeout() {
        let input = "c2222222s[s[22s[U2s[U6[U6[22222222s[s[22s[U2s[U6[U6[222*2222s[U6[U6[222*2222s[22s[U6[U6[22*2222s[U6[U6[222*2222s[22s[U6[U6[222*26[U6[222*2";
        let start = std::time::Instant::now();
        let result = parse(input);
        let elapsed = start.elapsed();
        assert!(
            result.is_err(),
            "Should reject deeply nested unmatched brackets"
        );
        assert!(
            elapsed.as_millis() < 100,
            "Parser should reject fast, took {elapsed:?}"
        );
    }

    // Same two requirements (error result, under 100 ms) for a long run of `[`
    // after a stream name.
    #[test]
    fn fuzz_regression_deeply_nested_brackets_slow_unit() {
        let input = "stream x[[[[[[[[[[[[[[[[[[[[[[[[[[[[[[[";
        let start = std::time::Instant::now();
        let result = parse(input);
        let elapsed = start.elapsed();
        assert!(result.is_err(), "Should reject deeply nested brackets");
        assert!(
            elapsed.as_millis() < 100,
            "Parser should reject fast, took {elapsed:?}"
        );
    }

    // A parse error is tolerated here; the test fails only when the error text
    // contains "Nesting depth".
    #[test]
    fn nesting_depth_allows_reasonable_programs() {
        let input = "let x = foo(bar(baz(qux(a, [1, [2, [3, [4]]]]))))";
        let result = parse(input);
        if let Err(ref e) = result {
            let msg = format!("{e}");
            assert!(
                !msg.contains("Nesting depth"),
                "Should allow 10 levels of nesting: {msg}"
            );
        }
    }

    // Brackets on a `#` comment line must not stop the following statement
    // from parsing successfully.
    #[test]
    fn nesting_depth_ignores_brackets_in_comments() {
        let input = "# [[[[[[[[[[[[[[[[[[[[[[[[[[\nstream x = y";
        let result = parse(input);
        assert!(
            result.is_ok(),
            "Brackets in comments should be ignored: {:?}",
            result.err()
        );
    }

    // As with the "reasonable programs" test, only an error mentioning
    // "Nesting depth" fails this test.
    #[test]
    fn nesting_depth_ignores_brackets_in_strings() {
        let input = r#"let x = "[[[[[[[[[[[[[[[[[[[[[[[[[[""#;
        let result = parse(input);
        if let Err(ref e) = result {
            let msg = format!("{e}");
            assert!(
                !msg.contains("Nesting depth"),
                "Brackets in strings should be ignored: {msg}"
            );
        }
    }
}