Skip to main content

varpulis_parser/
pest_parser.rs

1//! Pest-based parser for VPL
2//!
3//! This module provides parsing using the pest PEG parser generator.
4//!
5//! The `Rule` enum and its variants are auto-generated by `pest_derive`
6//! from the grammar file and cannot carry doc comments.
7
8use pest::Parser;
9use pest_derive::Parser;
10use varpulis_core::ast::*;
11use varpulis_core::span::{Span, Spanned};
12use varpulis_core::types::Type;
13
14use crate::error::{ParseError, ParseResult};
15use crate::helpers::{parse_duration, parse_timestamp};
16use crate::indent::preprocess_indentation;
17
18/// Extension trait for safer iterator extraction
19trait IteratorExt<'a> {
20    /// Get the next element or return an error with the expected rule description
21    fn expect_next(&mut self, expected: &str) -> ParseResult<pest::iterators::Pair<'a, Rule>>;
22}
23
24impl<'a> IteratorExt<'a> for pest::iterators::Pairs<'a, Rule> {
25    fn expect_next(&mut self, expected: &str) -> ParseResult<pest::iterators::Pair<'a, Rule>> {
26        self.next().ok_or_else(|| ParseError::Located {
27            line: 0,
28            column: 0,
29            position: 0,
30            message: format!("Expected {expected}"),
31            hint: None,
32        })
33    }
34}
35
36/// Pest-based VPL parser.
37///
38/// The grammar rules are auto-generated from `varpulis.pest` by the
39/// `pest_derive` macro. Use [`parse`] instead of calling this directly.
40#[derive(Debug, Parser)]
41#[grammar = "varpulis.pest"]
42pub struct VarpulisParser;
43
44/// Maximum wall-clock time the parser is allowed to run before being aborted.
45///
46/// PEG recursive-descent parsers can exhibit exponential backtracking on
47/// adversarial inputs even after bracket-depth and unmatched-bracket pre-scans.
48/// A hard timeout protects callers (CLI, LSP, fuzz targets) from hangs.
49/// 10 seconds is orders of magnitude more than any real VPL program needs
50/// (typical parse time is <5 ms for a 1000-line file).
51const PARSE_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);
52
53/// Parse a VPL source string into a Program AST.
54///
55/// Runs pest parsing in a dedicated thread with a 16 MB stack and a wall-clock
56/// timeout to guard against stack overflow and exponential backtracking on
57/// adversarial inputs.
58pub fn parse(source: &str) -> ParseResult<Program> {
59    let source = source.to_string();
60    let handle = std::thread::Builder::new()
61        .stack_size(16 * 1024 * 1024)
62        .spawn(move || parse_inner(&source))
63        .map_err(|e| ParseError::InvalidToken {
64            position: 0,
65            message: format!("Failed to spawn parser thread: {e}"),
66        })?;
67
68    // Park the current thread until the parser finishes or the timeout fires.
69    let deadline = std::time::Instant::now() + PARSE_TIMEOUT;
70    loop {
71        if handle.is_finished() {
72            return handle.join().unwrap_or_else(|_| {
73                Err(ParseError::InvalidToken {
74                    position: 0,
75                    message: "Parser stack overflow on deeply nested input".to_string(),
76                })
77            });
78        }
79        let remaining = deadline.saturating_duration_since(std::time::Instant::now());
80        if remaining.is_zero() {
81            return Err(ParseError::InvalidToken {
82                position: 0,
83                message: format!(
84                    "Parser timed out after {}s on pathological input",
85                    PARSE_TIMEOUT.as_secs()
86                ),
87            });
88        }
89        std::thread::sleep(std::time::Duration::from_millis(5).min(remaining));
90    }
91}
92
93/// Maximum bracket nesting depth allowed before pest parsing.
94///
95/// PEG recursive descent can cause exponential backtracking when unmatched
96/// brackets create ambiguity between array_literal, index_access, and
97/// slice_access rules.  Measured scaling is O(2.35^depth): depth 20 takes
98/// 1200s+, depth 16 takes 39s, depth 10 takes under 0.3s.  10 levels is
99/// generous for real VPL programs (typical nesting is 3-6 levels, extreme
100/// real-world is about 8) while keeping worst-case parse time under 1 second.
101const MAX_NESTING_DEPTH: usize = 10;
102
103/// Maximum number of unmatched open brackets allowed.
104///
105/// Even when nesting depth stays within limits, many unmatched open brackets
106/// at different positions (e.g. `a[b[c[d[` with no closing `]`) cause
107/// combinatorial explosion as pest tries each `[` as a potential array_literal,
108/// index_access, or slice_access start. 6 is generous for real VPL programs
109/// (unmatched brackets are always parse errors anyway).
110const MAX_UNMATCHED_OPEN_BRACKETS: usize = 6;
111
112/// O(n) pre-scan that rejects inputs with bracket nesting deeper than
113/// `MAX_NESTING_DEPTH` or too many unmatched open brackets. Respects string
114/// literals and comments so that brackets inside `"..."`, `# ...`, or
115/// `/* ... */` are ignored.
116fn check_nesting_depth(source: &str) -> ParseResult<()> {
117    let mut depth: usize = 0;
118    let mut max_depth: usize = 0;
119    let mut max_depth_pos: usize = 0;
120    let bytes = source.as_bytes();
121    let len = bytes.len();
122    let mut i = 0;
123
124    // Per-bracket-type counters for unmatched detection
125    let mut paren_open: usize = 0;
126    let mut paren_close: usize = 0;
127    let mut square_open: usize = 0;
128    let mut square_close: usize = 0;
129    let mut curly_open: usize = 0;
130    let mut curly_close: usize = 0;
131
132    while i < len {
133        let b = bytes[i];
134
135        // Skip double-quoted strings
136        if b == b'"' {
137            i += 1;
138            while i < len {
139                if bytes[i] == b'\\' {
140                    i += 2; // skip escaped char
141                    continue;
142                }
143                if bytes[i] == b'"' {
144                    i += 1;
145                    break;
146                }
147                i += 1;
148            }
149            continue;
150        }
151
152        // Skip VPL line comments (# to end of line)
153        if b == b'#' {
154            i += 1;
155            while i < len && bytes[i] != b'\n' {
156                i += 1;
157            }
158            continue;
159        }
160
161        // Skip block comments
162        if b == b'/' && i + 1 < len && bytes[i + 1] == b'*' {
163            i += 2;
164            while i + 1 < len {
165                if bytes[i] == b'*' && bytes[i + 1] == b'/' {
166                    i += 2;
167                    break;
168                }
169                i += 1;
170            }
171            continue;
172        }
173
174        // Track bracket depth
175        match b {
176            b'(' => {
177                depth += 1;
178                paren_open += 1;
179            }
180            b'[' => {
181                depth += 1;
182                square_open += 1;
183            }
184            b'{' => {
185                depth += 1;
186                curly_open += 1;
187            }
188            b')' => {
189                depth = depth.saturating_sub(1);
190                paren_close += 1;
191            }
192            b']' => {
193                depth = depth.saturating_sub(1);
194                square_close += 1;
195            }
196            b'}' => {
197                depth = depth.saturating_sub(1);
198                curly_close += 1;
199            }
200            _ => {}
201        }
202
203        if depth > max_depth {
204            max_depth = depth;
205            max_depth_pos = i;
206        }
207
208        if max_depth > MAX_NESTING_DEPTH {
209            return Err(ParseError::InvalidToken {
210                position: max_depth_pos,
211                message: format!("Nesting depth exceeds maximum of {MAX_NESTING_DEPTH} levels"),
212            });
213        }
214
215        i += 1;
216    }
217
218    // Reject inputs with too many unmatched open brackets.
219    // Unmatched brackets are always parse errors, but they cause exponential
220    // backtracking in pest before it can report the error.
221    let unmatched = paren_open.saturating_sub(paren_close)
222        + square_open.saturating_sub(square_close)
223        + curly_open.saturating_sub(curly_close);
224    if unmatched > MAX_UNMATCHED_OPEN_BRACKETS {
225        return Err(ParseError::InvalidToken {
226            position: max_depth_pos,
227            message: format!(
228                "Too many unmatched open brackets ({unmatched}); maximum is {MAX_UNMATCHED_OPEN_BRACKETS}"
229            ),
230        });
231    }
232
233    Ok(())
234}
235
236fn parse_inner(source: &str) -> ParseResult<Program> {
237    // Expand compile-time declaration loops (top-level for with {var} interpolation)
238    let expanded =
239        crate::expand::expand_declaration_loops(source).map_err(|e| ParseError::InvalidToken {
240            position: 0,
241            message: e,
242        })?;
243    // Preprocess to add INDENT/DEDENT markers
244    let preprocessed = preprocess_indentation(&expanded);
245
246    // Reject deeply nested input before pest parsing to prevent stack overflow
247    check_nesting_depth(&preprocessed)?;
248
249    let pairs = VarpulisParser::parse(Rule::program, &preprocessed).map_err(convert_pest_error)?;
250
251    let mut statements = Vec::new();
252
253    for pair in pairs {
254        if pair.as_rule() == Rule::program {
255            for inner in pair.into_inner() {
256                if inner.as_rule() == Rule::statement {
257                    statements.push(parse_statement(inner)?);
258                }
259            }
260        }
261    }
262
263    Ok(crate::optimize::fold_program(Program { statements }))
264}
265
266fn convert_pest_error(e: pest::error::Error<Rule>) -> ParseError {
267    let position = match e.location {
268        pest::error::InputLocation::Pos(p) => p,
269        pest::error::InputLocation::Span((s, _)) => s,
270    };
271
272    // Extract line/column from pest error
273    let (line, column) = match e.line_col {
274        pest::error::LineColLocation::Pos((l, c)) => (l, c),
275        pest::error::LineColLocation::Span((l, c), _) => (l, c),
276    };
277
278    // Create a human-readable message based on what was expected
279    let (message, hint) = match &e.variant {
280        pest::error::ErrorVariant::ParsingError {
281            positives,
282            negatives: _,
283        } => {
284            if positives.is_empty() {
285                ("Unexpected token".to_string(), None)
286            } else if is_stream_op_error(positives) {
287                // All positives are stream operations — produce a concise error
288                (
289                    "unknown stream operation".to_string(),
290                    Some(
291                        "valid operations: .where(), .select(), .emit(), .window(), .aggregate(), \
292                         .partition_by(), .within(), .having(), .to(), .context(), .log(), .print(), \
293                         .enrich(), .forecast(), .trend_aggregate(), .watermark(), .tap()"
294                            .to_string(),
295                    ),
296                )
297            } else {
298                let expected: Vec<String> = positives.iter().map(format_rule_name).collect();
299                if expected.len() == 1 {
300                    (format!("Expected {}", expected[0]), None)
301                } else {
302                    (format!("Expected one of: {}", expected.join(", ")), None)
303                }
304            }
305        }
306        pest::error::ErrorVariant::CustomError { message } => (message.clone(), None),
307    };
308
309    ParseError::Located {
310        line,
311        column,
312        position,
313        message,
314        hint,
315    }
316}
317
318/// Convert pest Rule names to human-readable format
319fn format_rule_name(rule: &Rule) -> String {
320    match rule {
321        Rule::identifier => "identifier".to_string(),
322        Rule::integer => "number".to_string(),
323        Rule::float => "number".to_string(),
324        Rule::string => "string".to_string(),
325        Rule::primitive_type => "type (int, float, bool, str, timestamp, duration)".to_string(),
326        Rule::type_expr => "type".to_string(),
327        Rule::expr => "expression".to_string(),
328        Rule::statement => "statement".to_string(),
329        Rule::context_decl => "context declaration".to_string(),
330        Rule::stream_decl => "stream declaration".to_string(),
331        Rule::pattern_decl => "pattern declaration".to_string(),
332        Rule::event_decl => "event declaration".to_string(),
333        Rule::fn_decl => "function declaration".to_string(),
334        Rule::INDENT => "indented block".to_string(),
335        Rule::DEDENT => "end of block".to_string(),
336        Rule::field => "field declaration (name: type)".to_string(),
337        Rule::comparison_op => "comparison operator (==, !=, <, >, <=, >=)".to_string(),
338        Rule::additive_op => "operator (+, -)".to_string(),
339        Rule::multiplicative_op => "operator (*, /, %)".to_string(),
340        Rule::postfix_suffix => "method call or member access".to_string(),
341        Rule::sase_pattern_expr => "SASE pattern expression".to_string(),
342        Rule::sase_seq_expr => "SEQ expression".to_string(),
343        Rule::kleene_op => "Kleene operator (+, *, ?)".to_string(),
344        _ => format!("{rule:?}").to_lowercase().replace('_', " "),
345    }
346}
347
348/// Returns true when all positives look like stream operation rules.
349/// This is used to produce a concise "unknown stream operation" error
350/// instead of listing 30+ individual operation names.
351fn is_stream_op_error(positives: &[Rule]) -> bool {
352    const STREAM_OP_RULES: &[Rule] = &[
353        Rule::context_op,
354        Rule::where_op,
355        Rule::select_op,
356        Rule::window_op,
357        Rule::aggregate_op,
358        Rule::having_op,
359        Rule::partition_by_op,
360        Rule::order_by_op,
361        Rule::limit_op,
362        Rule::distinct_op,
363        Rule::map_op,
364        Rule::filter_op,
365        Rule::tap_op,
366        Rule::print_op,
367        Rule::log_op,
368        Rule::emit_op,
369        Rule::to_op,
370        Rule::pattern_op,
371        Rule::concurrent_op,
372        Rule::process_op,
373        Rule::on_error_op,
374        Rule::collect_op,
375        Rule::on_op,
376        Rule::within_op,
377        Rule::not_op,
378        Rule::fork_op,
379        Rule::any_op,
380        Rule::all_op,
381        Rule::first_op,
382        Rule::watermark_op,
383        Rule::allowed_lateness_op,
384        Rule::trend_aggregate_op,
385        Rule::score_op,
386        Rule::forecast_op,
387        Rule::enrich_op,
388    ];
389    positives.len() >= 10 && positives.iter().all(|r| STREAM_OP_RULES.contains(r))
390}
391
392fn parse_statement(pair: pest::iterators::Pair<Rule>) -> ParseResult<Spanned<Stmt>> {
393    let span = Span::new(pair.as_span().start(), pair.as_span().end());
394    let inner = pair.into_inner().expect_next("statement body")?;
395
396    let stmt = match inner.as_rule() {
397        Rule::context_decl => parse_context_decl(inner)?,
398        Rule::connector_decl => parse_connector_decl(inner)?,
399        Rule::stream_decl => parse_stream_decl(inner)?,
400        Rule::pattern_decl => parse_pattern_decl(inner)?,
401        Rule::event_decl => parse_event_decl(inner)?,
402        Rule::type_decl => parse_type_decl(inner)?,
403        Rule::var_decl => parse_var_decl(inner)?,
404        Rule::const_decl => parse_const_decl(inner)?,
405        Rule::fn_decl => parse_fn_decl(inner)?,
406        Rule::config_block => parse_config_block(inner)?,
407        Rule::import_stmt => parse_import_stmt(inner)?,
408        Rule::if_stmt => parse_if_stmt(inner)?,
409        Rule::for_stmt => parse_for_stmt(inner)?,
410        Rule::while_stmt => parse_while_stmt(inner)?,
411        Rule::return_stmt => parse_return_stmt(inner)?,
412        Rule::break_stmt => Stmt::Break,
413        Rule::continue_stmt => Stmt::Continue,
414        Rule::emit_stmt => parse_emit_stmt(inner)?,
415        Rule::assignment_stmt => {
416            let mut inner = inner.into_inner();
417            let name = inner.expect_next("variable name")?.as_str().to_string();
418            let value = parse_expr(inner.expect_next("assignment value")?)?;
419            Stmt::Assignment { name, value }
420        }
421        Rule::expr_stmt => Stmt::Expr(parse_expr(inner.into_inner().expect_next("expression")?)?),
422        _ => {
423            return Err(ParseError::UnexpectedToken {
424                position: span.start,
425                expected: "statement".to_string(),
426                found: format!("{:?}", inner.as_rule()),
427            })
428        }
429    };
430
431    Ok(Spanned::new(stmt, span))
432}
433
434// ============================================================================
435// Context Declaration Parsing
436// ============================================================================
437
438fn parse_context_decl(pair: pest::iterators::Pair<Rule>) -> ParseResult<Stmt> {
439    let mut inner = pair.into_inner();
440    let name = inner.expect_next("context name")?.as_str().to_string();
441    let mut cores = None;
442
443    for p in inner {
444        if p.as_rule() == Rule::context_params {
445            for param in p.into_inner() {
446                if param.as_rule() == Rule::context_param {
447                    // Parse cores: [0, 1, 2]
448                    let core_ids: Vec<usize> = param
449                        .into_inner()
450                        .filter(|p| p.as_rule() == Rule::integer)
451                        .map(|p| p.as_str().parse::<usize>().unwrap_or(0))
452                        .collect();
453                    cores = Some(core_ids);
454                }
455            }
456        }
457    }
458
459    Ok(Stmt::ContextDecl { name, cores })
460}
461
462// ============================================================================
463// Connector Parsing
464// ============================================================================
465
466fn parse_connector_decl(pair: pest::iterators::Pair<Rule>) -> ParseResult<Stmt> {
467    let mut inner = pair.into_inner();
468    let name = inner.expect_next("connector name")?.as_str().to_string();
469    let connector_type = inner.expect_next("connector type")?.as_str().to_string();
470    let mut params = Vec::new();
471
472    for p in inner {
473        if p.as_rule() == Rule::connector_params {
474            params = parse_connector_params(p)?;
475        }
476    }
477
478    Ok(Stmt::ConnectorDecl {
479        name,
480        connector_type,
481        params,
482    })
483}
484
485fn parse_connector_params(pair: pest::iterators::Pair<Rule>) -> ParseResult<Vec<ConnectorParam>> {
486    let mut params = Vec::new();
487    for p in pair.into_inner() {
488        if p.as_rule() == Rule::connector_param {
489            let mut inner = p.into_inner();
490            let name = inner.expect_next("param name")?.as_str().to_string();
491            let value_pair = inner.expect_next("param value")?;
492            let value = parse_config_value(value_pair)?;
493            params.push(ConnectorParam { name, value });
494        }
495    }
496    Ok(params)
497}
498
499fn parse_stream_decl(pair: pest::iterators::Pair<Rule>) -> ParseResult<Stmt> {
500    let mut inner = pair.into_inner();
501    let name = inner.expect_next("stream name")?.as_str().to_string();
502
503    let mut type_annotation = None;
504    let mut source = StreamSource::Ident(String::new());
505    let mut ops = Vec::new();
506    let mut op_spans = Vec::new();
507
508    for p in inner {
509        match p.as_rule() {
510            Rule::type_annotation => {
511                type_annotation = Some(parse_type(p.into_inner().expect_next("type")?)?);
512            }
513            Rule::stream_expr => {
514                let (s, o, spans) = parse_stream_expr(p)?;
515                source = s;
516                ops = o;
517                op_spans = spans;
518            }
519            _ => {}
520        }
521    }
522
523    Ok(Stmt::StreamDecl {
524        name,
525        type_annotation,
526        source,
527        ops,
528        op_spans,
529    })
530}
531
532fn parse_stream_expr(
533    pair: pest::iterators::Pair<Rule>,
534) -> ParseResult<(StreamSource, Vec<StreamOp>, Vec<varpulis_core::span::Span>)> {
535    let mut inner = pair.into_inner();
536    let source = parse_stream_source(inner.expect_next("stream source")?)?;
537    let mut ops = Vec::new();
538    let mut op_spans = Vec::new();
539
540    for p in inner {
541        if p.as_rule() == Rule::stream_op {
542            let pest_span = p.as_span();
543            let span = varpulis_core::span::Span::new(pest_span.start(), pest_span.end());
544            ops.push(parse_stream_op(p)?);
545            op_spans.push(span);
546        }
547    }
548
549    Ok((source, ops, op_spans))
550}
551
552// ============================================================================
553// SASE+ Pattern Declaration Parsing
554// ============================================================================
555
556fn parse_pattern_decl(pair: pest::iterators::Pair<Rule>) -> ParseResult<Stmt> {
557    let mut inner = pair.into_inner();
558    let name = inner.expect_next("pattern name")?.as_str().to_string();
559
560    let mut expr = SasePatternExpr::Event(String::new());
561    let mut within = None;
562    let mut partition_by = None;
563
564    for p in inner {
565        match p.as_rule() {
566            Rule::sase_pattern_expr => {
567                expr = parse_sase_pattern_expr(p)?;
568            }
569            Rule::pattern_within_clause => {
570                let dur_pair = p.into_inner().expect_next("within duration")?;
571                within = Some(Expr::Duration(
572                    parse_duration(dur_pair.as_str()).map_err(ParseError::InvalidDuration)?,
573                ));
574            }
575            Rule::pattern_partition_clause => {
576                let key = p
577                    .into_inner()
578                    .expect_next("partition key")?
579                    .as_str()
580                    .to_string();
581                partition_by = Some(Expr::Ident(key));
582            }
583            _ => {}
584        }
585    }
586
587    Ok(Stmt::PatternDecl {
588        name,
589        expr,
590        within,
591        partition_by,
592    })
593}
594
595fn parse_sase_pattern_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<SasePatternExpr> {
596    let inner = pair.into_inner().expect_next("SASE pattern expression")?;
597    parse_sase_or_expr(inner)
598}
599
600fn parse_sase_or_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<SasePatternExpr> {
601    let mut inner = pair.into_inner();
602    let mut left = parse_sase_and_expr(inner.expect_next("OR expression operand")?)?;
603
604    for right_pair in inner {
605        let right = parse_sase_and_expr(right_pair)?;
606        left = SasePatternExpr::Or(Box::new(left), Box::new(right));
607    }
608
609    Ok(left)
610}
611
612fn parse_sase_and_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<SasePatternExpr> {
613    let mut inner = pair.into_inner();
614    let mut left = parse_sase_not_expr(inner.expect_next("AND expression operand")?)?;
615
616    for right_pair in inner {
617        let right = parse_sase_not_expr(right_pair)?;
618        left = SasePatternExpr::And(Box::new(left), Box::new(right));
619    }
620
621    Ok(left)
622}
623
624fn parse_sase_not_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<SasePatternExpr> {
625    let mut inner = pair.into_inner();
626    let first = inner.expect_next("NOT or primary expression")?;
627
628    if first.as_str() == "NOT" {
629        let expr = parse_sase_primary_expr(inner.expect_next("expression after NOT")?)?;
630        Ok(SasePatternExpr::Not(Box::new(expr)))
631    } else {
632        parse_sase_primary_expr(first)
633    }
634}
635
636fn parse_sase_primary_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<SasePatternExpr> {
637    let inner = pair.into_inner().expect_next("SASE primary expression")?;
638
639    match inner.as_rule() {
640        Rule::sase_seq_expr => parse_sase_seq_expr(inner),
641        Rule::sase_grouped_expr => {
642            let nested = inner.into_inner().expect_next("grouped expression")?;
643            let expr = parse_sase_pattern_expr(nested)?;
644            Ok(SasePatternExpr::Group(Box::new(expr)))
645        }
646        Rule::sase_event_ref => parse_sase_event_ref(inner),
647        _ => Ok(SasePatternExpr::Event(inner.as_str().to_string())),
648    }
649}
650
651fn parse_sase_seq_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<SasePatternExpr> {
652    let mut items = Vec::new();
653
654    for p in pair.into_inner() {
655        if p.as_rule() == Rule::sase_seq_items {
656            for item in p.into_inner() {
657                if item.as_rule() == Rule::sase_seq_item {
658                    items.push(parse_sase_seq_item(item)?);
659                }
660            }
661        }
662    }
663
664    Ok(SasePatternExpr::Seq(items))
665}
666
667fn parse_sase_seq_item(pair: pest::iterators::Pair<Rule>) -> ParseResult<SasePatternItem> {
668    let inner = pair.into_inner().expect_next("sequence item")?;
669
670    match inner.as_rule() {
671        Rule::sase_negated_item => parse_sase_item_inner(inner, true),
672        Rule::sase_positive_item => parse_sase_item_inner(inner, false),
673        _ => parse_sase_item_inner(inner, false),
674    }
675}
676
677fn parse_sase_item_inner(
678    pair: pest::iterators::Pair<Rule>,
679    _negated: bool,
680) -> ParseResult<SasePatternItem> {
681    let mut inner = pair.into_inner();
682    let event_type = inner.expect_next("event type")?.as_str().to_string();
683
684    let mut kleene = None;
685    let mut filter = None;
686    let mut alias = None;
687
688    for p in inner {
689        match p.as_rule() {
690            Rule::kleene_op => {
691                kleene = Some(match p.as_str() {
692                    "+" => KleeneOp::Plus,
693                    "*" => KleeneOp::Star,
694                    "?" => KleeneOp::Optional,
695                    _ => KleeneOp::Plus,
696                });
697            }
698            Rule::sase_where_clause => {
699                filter = Some(parse_expr(
700                    p.into_inner().expect_next("filter expression")?,
701                )?);
702            }
703            Rule::sase_alias_clause => {
704                alias = Some(p.into_inner().expect_next("alias")?.as_str().to_string());
705            }
706            _ => {}
707        }
708    }
709
710    // For negated items, we prefix with "!" to indicate negation
711    // The runtime will interpret this
712    let event_type = if _negated {
713        format!("!{event_type}")
714    } else {
715        event_type
716    };
717
718    Ok(SasePatternItem {
719        event_type,
720        alias,
721        kleene,
722        filter,
723    })
724}
725
726fn parse_sase_event_ref(pair: pest::iterators::Pair<Rule>) -> ParseResult<SasePatternExpr> {
727    // Single event reference with optional kleene, where, alias
728    let item = parse_sase_item_inner(pair, false)?;
729
730    // If it's a simple event with no modifiers, return Event variant
731    if item.alias.is_none() && item.kleene.is_none() && item.filter.is_none() {
732        Ok(SasePatternExpr::Event(item.event_type))
733    } else {
734        // Otherwise wrap in a single-item Seq
735        Ok(SasePatternExpr::Seq(vec![item]))
736    }
737}
738
739fn parse_stream_source(pair: pest::iterators::Pair<Rule>) -> ParseResult<StreamSource> {
740    let inner = pair.into_inner().expect_next("stream source type")?;
741
742    match inner.as_rule() {
743        Rule::from_connector_source => {
744            let mut inner_iter = inner.into_inner();
745            let event_type = inner_iter.expect_next("event type")?.as_str().to_string();
746            let connector_name = inner_iter
747                .expect_next("connector name")?
748                .as_str()
749                .to_string();
750            let mut params = Vec::new();
751            for p in inner_iter {
752                if p.as_rule() == Rule::connector_params {
753                    params = parse_connector_params(p)?;
754                }
755            }
756            Ok(StreamSource::FromConnector {
757                event_type,
758                connector_name,
759                params,
760            })
761        }
762        Rule::merge_source => {
763            let mut streams = Vec::new();
764            for p in inner.into_inner() {
765                if p.as_rule() == Rule::inline_stream_list {
766                    for is in p.into_inner() {
767                        streams.push(parse_inline_stream(is)?);
768                    }
769                }
770            }
771            Ok(StreamSource::Merge(streams))
772        }
773        Rule::join_source
774        | Rule::left_join_source
775        | Rule::right_join_source
776        | Rule::full_join_source => {
777            let join_type = match inner.as_rule() {
778                Rule::left_join_source => varpulis_core::ast::JoinType::Left,
779                Rule::right_join_source => varpulis_core::ast::JoinType::Right,
780                Rule::full_join_source => varpulis_core::ast::JoinType::Full,
781                _ => varpulis_core::ast::JoinType::Inner,
782            };
783            let mut clauses = Vec::new();
784            for p in inner.into_inner() {
785                if p.as_rule() == Rule::join_clause_list {
786                    for jc in p.into_inner() {
787                        clauses.push(parse_join_clause(jc, join_type)?);
788                    }
789                }
790            }
791            Ok(StreamSource::Join(clauses))
792        }
793        Rule::sequence_source => {
794            let decl =
795                parse_sequence_decl(inner.into_inner().expect_next("sequence declaration")?)?;
796            Ok(StreamSource::Sequence(decl))
797        }
798        Rule::timer_source => {
799            let timer_args = inner.into_inner().expect_next("timer arguments")?;
800            let decl = parse_timer_decl(timer_args)?;
801            Ok(StreamSource::Timer(decl))
802        }
803        Rule::all_source => {
804            let mut inner_iter = inner.into_inner();
805            let name = inner_iter.expect_next("event name")?.as_str().to_string();
806            let alias = inner_iter.next().map(|p| p.as_str().to_string());
807            Ok(StreamSource::AllWithAlias { name, alias })
808        }
809        Rule::aliased_source => {
810            let mut inner_iter = inner.into_inner();
811            let name = inner_iter.expect_next("event name")?.as_str().to_string();
812            let alias = inner_iter.expect_next("alias")?.as_str().to_string();
813            Ok(StreamSource::IdentWithAlias { name, alias })
814        }
815        Rule::identifier => Ok(StreamSource::Ident(inner.as_str().to_string())),
816        _ => Err(ParseError::UnexpectedToken {
817            position: 0,
818            expected: "stream source".to_string(),
819            found: format!("{:?}", inner.as_rule()),
820        }),
821    }
822}
823
824fn parse_inline_stream(pair: pest::iterators::Pair<Rule>) -> ParseResult<InlineStreamDecl> {
825    let mut inner = pair.into_inner();
826
827    // Check if it's a simple identifier or full declaration
828    let first = inner.expect_next("stream identifier")?;
829    if first.as_rule() == Rule::identifier && inner.clone().next().is_none() {
830        let name = first.as_str().to_string();
831        return Ok(InlineStreamDecl {
832            name: name.clone(),
833            source: name,
834            filter: None,
835        });
836    }
837
838    let name = first.as_str().to_string();
839    let source = inner.expect_next("stream source")?.as_str().to_string();
840    let filter = inner.next().map(|p| parse_expr(p)).transpose()?;
841
842    Ok(InlineStreamDecl {
843        name,
844        source,
845        filter,
846    })
847}
848
849fn parse_join_clause(
850    pair: pest::iterators::Pair<Rule>,
851    join_type: varpulis_core::ast::JoinType,
852) -> ParseResult<JoinClause> {
853    let mut inner = pair.into_inner();
854
855    let first = inner.expect_next("join clause identifier")?;
856    if first.as_rule() == Rule::identifier && inner.clone().next().is_none() {
857        let name = first.as_str().to_string();
858        return Ok(JoinClause {
859            name: name.clone(),
860            source: name,
861            on: None,
862            join_type,
863        });
864    }
865
866    let name = first.as_str().to_string();
867    let source = inner.expect_next("join source")?.as_str().to_string();
868    let on = inner.next().map(|p| parse_expr(p)).transpose()?;
869
870    Ok(JoinClause {
871        name,
872        source,
873        on,
874        join_type,
875    })
876}
877
878fn parse_sequence_decl(pair: pest::iterators::Pair<Rule>) -> ParseResult<SequenceDecl> {
879    let mut steps = Vec::new();
880
881    for p in pair.into_inner() {
882        if p.as_rule() == Rule::sequence_step {
883            steps.push(parse_sequence_step(p)?);
884        }
885    }
886
887    Ok(SequenceDecl {
888        match_all: false,
889        timeout: None,
890        steps,
891    })
892}
893
894fn parse_sequence_step(pair: pest::iterators::Pair<Rule>) -> ParseResult<SequenceStepDecl> {
895    let mut inner = pair.into_inner();
896    let alias = inner.expect_next("step alias")?.as_str().to_string();
897    let event_type = inner.expect_next("event type")?.as_str().to_string();
898
899    let mut filter = None;
900    let mut timeout = None;
901
902    for p in inner {
903        match p.as_rule() {
904            Rule::or_expr => filter = Some(parse_expr(p)?),
905            Rule::within_suffix => {
906                let expr = p.into_inner().expect_next("within duration")?;
907                timeout = Some(Box::new(parse_expr(expr)?));
908            }
909            _ => {}
910        }
911    }
912
913    Ok(SequenceStepDecl {
914        alias,
915        event_type,
916        filter,
917        timeout,
918    })
919}
920
921fn parse_timer_decl(pair: pest::iterators::Pair<Rule>) -> ParseResult<TimerDecl> {
922    let mut inner = pair.into_inner();
923
924    // First argument is the interval (duration expression)
925    let interval = parse_expr(inner.expect_next("timer interval")?)?;
926
927    // Optional second argument is initial_delay (named argument)
928    let mut initial_delay = None;
929    for p in inner {
930        if p.as_rule() == Rule::named_arg {
931            let arg = parse_named_arg(p)?;
932            if arg.name == "initial_delay" {
933                initial_delay = Some(Box::new(arg.value));
934            }
935        }
936    }
937
938    Ok(TimerDecl {
939        interval,
940        initial_delay,
941    })
942}
943
944fn parse_stream_op(pair: pest::iterators::Pair<Rule>) -> ParseResult<StreamOp> {
945    let inner = pair.into_inner().expect_next("stream operation")?;
946
947    match inner.as_rule() {
948        Rule::dot_op => {
949            let op_inner = inner.into_inner().expect_next("dot operation")?;
950            parse_dot_op(op_inner)
951        }
952        Rule::followed_by_op => parse_followed_by_op(inner),
953        _ => Err(ParseError::UnexpectedToken {
954            position: 0,
955            expected: "stream operation".to_string(),
956            found: format!("{:?}", inner.as_rule()),
957        }),
958    }
959}
960
961fn parse_dot_op(pair: pest::iterators::Pair<Rule>) -> ParseResult<StreamOp> {
962    match pair.as_rule() {
963        Rule::context_op => {
964            let name = pair
965                .into_inner()
966                .expect_next("context name")?
967                .as_str()
968                .to_string();
969            Ok(StreamOp::Context(name))
970        }
971        Rule::where_op => {
972            let expr = parse_expr(pair.into_inner().expect_next("where expression")?)?;
973            Ok(StreamOp::Where(expr))
974        }
975        Rule::select_op => {
976            let mut items = Vec::new();
977            for p in pair.into_inner() {
978                if p.as_rule() == Rule::select_list {
979                    for si in p.into_inner() {
980                        items.push(parse_select_item(si)?);
981                    }
982                }
983            }
984            Ok(StreamOp::Select(items))
985        }
986        Rule::window_op => {
987            let args = parse_window_args(pair.into_inner().expect_next("window arguments")?)?;
988            Ok(StreamOp::Window(args))
989        }
990        Rule::aggregate_op => {
991            let mut items = Vec::new();
992            for p in pair.into_inner() {
993                if p.as_rule() == Rule::agg_list {
994                    for ai in p.into_inner() {
995                        items.push(parse_agg_item(ai)?);
996                    }
997                }
998            }
999            Ok(StreamOp::Aggregate(items))
1000        }
1001        Rule::having_op => {
1002            let expr = parse_expr(pair.into_inner().expect_next("having expression")?)?;
1003            Ok(StreamOp::Having(expr))
1004        }
1005        Rule::map_op => {
1006            let expr = parse_expr(pair.into_inner().expect_next("map expression")?)?;
1007            Ok(StreamOp::Map(expr))
1008        }
1009        Rule::filter_op => {
1010            let expr = parse_expr(pair.into_inner().expect_next("filter expression")?)?;
1011            Ok(StreamOp::Filter(expr))
1012        }
1013        Rule::within_op => {
1014            let expr = parse_expr(pair.into_inner().expect_next("within duration")?)?;
1015            Ok(StreamOp::Within(expr))
1016        }
1017        Rule::emit_op => {
1018            let mut output_type = None;
1019            let mut fields = Vec::new();
1020            let mut target_context = None;
1021            for p in pair.into_inner() {
1022                match p.as_rule() {
1023                    Rule::emit_type_cast => {
1024                        output_type = Some(
1025                            p.into_inner()
1026                                .expect_next("type name")?
1027                                .as_str()
1028                                .to_string(),
1029                        );
1030                    }
1031                    Rule::named_arg_list => {
1032                        for arg in p.into_inner() {
1033                            let parsed = parse_named_arg(arg)?;
1034                            // Extract `context: ctx_name` as target_context
1035                            if parsed.name == "context" {
1036                                if let Expr::Ident(ctx_name) = &parsed.value {
1037                                    target_context = Some(ctx_name.clone());
1038                                    continue;
1039                                }
1040                            }
1041                            fields.push(parsed);
1042                        }
1043                    }
1044                    _ => {}
1045                }
1046            }
1047            Ok(StreamOp::Emit {
1048                output_type,
1049                fields,
1050                target_context,
1051            })
1052        }
1053        Rule::print_op => {
1054            let exprs = pair
1055                .into_inner()
1056                .filter(|p| p.as_rule() == Rule::expr_list)
1057                .flat_map(|p| p.into_inner())
1058                .map(parse_expr)
1059                .collect::<ParseResult<Vec<_>>>()?;
1060            Ok(StreamOp::Print(exprs))
1061        }
1062        Rule::collect_op => Ok(StreamOp::Collect),
1063        Rule::pattern_op => {
1064            let def_pair = pair.into_inner().expect_next("pattern definition")?;
1065            let mut inner = def_pair.into_inner();
1066            let name = inner.expect_next("pattern name")?.as_str().to_string();
1067            let body_pair = inner.expect_next("pattern body")?;
1068
1069            // pattern_body can be lambda_expr or pattern_or_expr
1070            let body_inner = body_pair.into_inner().expect_next("pattern expression")?;
1071            let matcher = match body_inner.as_rule() {
1072                Rule::lambda_expr => parse_lambda_expr(body_inner)?,
1073                Rule::pattern_or_expr => parse_pattern_expr_as_expr(body_inner)?,
1074                _ => parse_expr_inner(body_inner)?,
1075            };
1076            Ok(StreamOp::Pattern(PatternDef { name, matcher }))
1077        }
1078        Rule::partition_by_op => {
1079            let expr = parse_expr(pair.into_inner().expect_next("partition expression")?)?;
1080            Ok(StreamOp::PartitionBy(expr))
1081        }
1082        Rule::order_by_op => {
1083            let mut items = Vec::new();
1084            for p in pair.into_inner() {
1085                if p.as_rule() == Rule::order_list {
1086                    for oi in p.into_inner() {
1087                        items.push(parse_order_item(oi)?);
1088                    }
1089                }
1090            }
1091            Ok(StreamOp::OrderBy(items))
1092        }
1093        Rule::limit_op => {
1094            let expr = parse_expr(pair.into_inner().expect_next("limit expression")?)?;
1095            Ok(StreamOp::Limit(expr))
1096        }
1097        Rule::distinct_op => {
1098            let expr = pair.into_inner().next().map(parse_expr).transpose()?;
1099            Ok(StreamOp::Distinct(expr))
1100        }
1101        Rule::tap_op => {
1102            let args = pair
1103                .into_inner()
1104                .filter(|p| p.as_rule() == Rule::named_arg_list)
1105                .flat_map(|p| p.into_inner())
1106                .map(parse_named_arg)
1107                .collect::<ParseResult<Vec<_>>>()?;
1108            Ok(StreamOp::Tap(args))
1109        }
1110        Rule::log_op => {
1111            let args = pair
1112                .into_inner()
1113                .filter(|p| p.as_rule() == Rule::named_arg_list)
1114                .flat_map(|p| p.into_inner())
1115                .map(parse_named_arg)
1116                .collect::<ParseResult<Vec<_>>>()?;
1117            Ok(StreamOp::Log(args))
1118        }
1119        Rule::to_op => {
1120            let mut inner = pair.into_inner();
1121            let connector_name = inner.expect_next("connector name")?.as_str().to_string();
1122            let mut params = Vec::new();
1123            for p in inner {
1124                if p.as_rule() == Rule::connector_params {
1125                    params = parse_connector_params(p)?;
1126                }
1127            }
1128            Ok(StreamOp::To {
1129                connector_name,
1130                params,
1131            })
1132        }
1133        Rule::process_op => {
1134            let expr = parse_expr(pair.into_inner().expect_next("process expression")?)?;
1135            Ok(StreamOp::Process(expr))
1136        }
1137        Rule::on_error_op => {
1138            let expr = parse_expr(pair.into_inner().expect_next("on_error handler")?)?;
1139            Ok(StreamOp::OnError(expr))
1140        }
1141        Rule::on_op => {
1142            let expr = parse_expr(pair.into_inner().expect_next("on handler")?)?;
1143            Ok(StreamOp::On(expr))
1144        }
1145        Rule::not_op => {
1146            let mut inner = pair.into_inner();
1147            let event_type = inner.expect_next("event type")?.as_str().to_string();
1148            let filter = inner.next().map(parse_expr).transpose()?;
1149            Ok(StreamOp::Not(FollowedByClause {
1150                event_type,
1151                filter,
1152                alias: None,
1153                match_all: false,
1154            }))
1155        }
1156        Rule::fork_op => {
1157            let mut paths = Vec::new();
1158            for p in pair.into_inner() {
1159                if p.as_rule() == Rule::fork_path_list {
1160                    for fp in p.into_inner() {
1161                        paths.push(parse_fork_path(fp)?);
1162                    }
1163                }
1164            }
1165            Ok(StreamOp::Fork(paths))
1166        }
1167        Rule::any_op => {
1168            let count = pair
1169                .into_inner()
1170                .next()
1171                .map(|p| p.as_str().parse().unwrap_or(1));
1172            Ok(StreamOp::Any(count))
1173        }
1174        Rule::all_op => Ok(StreamOp::All),
1175        Rule::first_op => Ok(StreamOp::First),
1176        Rule::concurrent_op => {
1177            let args = pair
1178                .into_inner()
1179                .filter(|p| p.as_rule() == Rule::named_arg_list)
1180                .flat_map(|p| p.into_inner())
1181                .map(parse_named_arg)
1182                .collect::<ParseResult<Vec<_>>>()?;
1183            Ok(StreamOp::Concurrent(args))
1184        }
1185        Rule::watermark_op => {
1186            let args = pair
1187                .into_inner()
1188                .filter(|p| p.as_rule() == Rule::named_arg_list)
1189                .flat_map(|p| p.into_inner())
1190                .map(parse_named_arg)
1191                .collect::<ParseResult<Vec<_>>>()?;
1192            Ok(StreamOp::Watermark(args))
1193        }
1194        Rule::allowed_lateness_op => {
1195            let expr = parse_expr(pair.into_inner().expect_next("allowed lateness duration")?)?;
1196            Ok(StreamOp::AllowedLateness(expr))
1197        }
1198        Rule::trend_aggregate_op => {
1199            let mut items = Vec::new();
1200            for p in pair.into_inner() {
1201                if p.as_rule() == Rule::trend_agg_list {
1202                    for item_pair in p.into_inner() {
1203                        if item_pair.as_rule() == Rule::trend_agg_item {
1204                            let mut inner = item_pair.into_inner();
1205                            let alias = inner.expect_next("trend agg alias")?.as_str().to_string();
1206                            let func_pair = inner.expect_next("trend agg function")?;
1207                            let mut func_inner = func_pair.into_inner();
1208                            let func_name = func_inner
1209                                .expect_next("function name")?
1210                                .as_str()
1211                                .to_string();
1212                            let arg = func_inner.next().map(parse_expr).transpose()?;
1213                            items.push(TrendAggItem {
1214                                alias,
1215                                func: func_name,
1216                                arg,
1217                            });
1218                        }
1219                    }
1220                }
1221            }
1222            Ok(StreamOp::TrendAggregate(items))
1223        }
1224        Rule::forecast_op => {
1225            let mut confidence = None;
1226            let mut horizon = None;
1227            let mut warmup = None;
1228            let mut max_depth = None;
1229            let mut hawkes = None;
1230            let mut conformal = None;
1231            let mut mode = None;
1232            for p in pair.into_inner() {
1233                if p.as_rule() == Rule::forecast_params {
1234                    for param_pair in p.into_inner() {
1235                        if param_pair.as_rule() == Rule::forecast_param {
1236                            let mut inner = param_pair.into_inner();
1237                            let name = inner.expect_next("forecast param name")?.as_str();
1238                            let value_pair = inner.expect_next("forecast param value")?;
1239                            let expr = parse_expr(value_pair)?;
1240                            match name {
1241                                "confidence" => confidence = Some(expr),
1242                                "horizon" => horizon = Some(expr),
1243                                "warmup" => warmup = Some(expr),
1244                                "max_depth" => max_depth = Some(expr),
1245                                "hawkes" => hawkes = Some(expr),
1246                                "conformal" => conformal = Some(expr),
1247                                "mode" => mode = Some(expr),
1248                                _ => {}
1249                            }
1250                        }
1251                    }
1252                }
1253            }
1254            Ok(StreamOp::Forecast(ForecastSpec {
1255                confidence,
1256                horizon,
1257                warmup,
1258                max_depth,
1259                hawkes,
1260                conformal,
1261                mode,
1262            }))
1263        }
1264        Rule::enrich_op => {
1265            let mut inner = pair.into_inner();
1266            let connector_name = inner.expect_next("connector name")?.as_str().to_string();
1267            let mut key_expr = None;
1268            let mut fields = Vec::new();
1269            let mut cache_ttl = None;
1270            let mut timeout = None;
1271            let mut fallback = None;
1272            for p in inner {
1273                if p.as_rule() == Rule::enrich_params {
1274                    for param_pair in p.into_inner() {
1275                        if param_pair.as_rule() == Rule::enrich_param {
1276                            let param_inner =
1277                                param_pair.into_inner().expect_next("enrich param")?;
1278                            match param_inner.as_rule() {
1279                                Rule::enrich_key_param => {
1280                                    let expr_pair =
1281                                        param_inner.into_inner().expect_next("key expression")?;
1282                                    key_expr = Some(parse_expr(expr_pair)?);
1283                                }
1284                                Rule::enrich_fields_param => {
1285                                    for field in param_inner.into_inner() {
1286                                        if field.as_rule() == Rule::identifier {
1287                                            fields.push(field.as_str().to_string());
1288                                        }
1289                                    }
1290                                }
1291                                Rule::enrich_cache_ttl_param => {
1292                                    let expr_pair = param_inner
1293                                        .into_inner()
1294                                        .expect_next("cache_ttl expression")?;
1295                                    cache_ttl = Some(parse_expr(expr_pair)?);
1296                                }
1297                                Rule::enrich_timeout_param => {
1298                                    let expr_pair = param_inner
1299                                        .into_inner()
1300                                        .expect_next("timeout expression")?;
1301                                    timeout = Some(parse_expr(expr_pair)?);
1302                                }
1303                                Rule::enrich_fallback_param => {
1304                                    let literal_pair =
1305                                        param_inner.into_inner().expect_next("fallback literal")?;
1306                                    fallback = Some(parse_expr(literal_pair)?);
1307                                }
1308                                _ => {}
1309                            }
1310                        }
1311                    }
1312                }
1313            }
1314            let key = key_expr.ok_or_else(|| ParseError::Located {
1315                line: 0,
1316                column: 0,
1317                position: 0,
1318                message: ".enrich() requires a key: parameter".to_string(),
1319                hint: Some("add key: <expression> to .enrich()".to_string()),
1320            })?;
1321            Ok(StreamOp::Enrich(EnrichSpec {
1322                connector_name,
1323                key_expr: Box::new(key),
1324                fields,
1325                cache_ttl,
1326                timeout,
1327                fallback,
1328            }))
1329        }
1330        Rule::score_op => {
1331            let mut model_path = String::new();
1332            let mut inputs = Vec::new();
1333            let mut outputs = Vec::new();
1334            let mut gpu = false;
1335            let mut batch_size: usize = 1;
1336            let mut device_id: i32 = 0;
1337            for p in pair.into_inner() {
1338                if p.as_rule() == Rule::score_params {
1339                    for param_pair in p.into_inner() {
1340                        if param_pair.as_rule() == Rule::score_param {
1341                            let mut inner = param_pair.into_inner();
1342                            let name = inner.expect_next("score param name")?.as_str();
1343                            let value_pair = inner.expect_next("score param value")?;
1344                            match name {
1345                                "model" => {
1346                                    let raw = value_pair.as_str();
1347                                    model_path = raw.trim_matches('"').to_string();
1348                                }
1349                                "inputs" => {
1350                                    if value_pair.as_rule() == Rule::score_field_list {
1351                                        for field in value_pair.into_inner() {
1352                                            if field.as_rule() == Rule::identifier {
1353                                                inputs.push(field.as_str().to_string());
1354                                            }
1355                                        }
1356                                    }
1357                                }
1358                                "outputs" => {
1359                                    if value_pair.as_rule() == Rule::score_field_list {
1360                                        for field in value_pair.into_inner() {
1361                                            if field.as_rule() == Rule::identifier {
1362                                                outputs.push(field.as_str().to_string());
1363                                            }
1364                                        }
1365                                    }
1366                                }
1367                                "gpu" => {
1368                                    gpu = value_pair.as_str() == "true";
1369                                }
1370                                "batch_size" => {
1371                                    batch_size = value_pair.as_str().parse().unwrap_or(1);
1372                                }
1373                                "device" | "device_id" => {
1374                                    device_id = value_pair.as_str().parse().unwrap_or(0);
1375                                }
1376                                _ => {}
1377                            }
1378                        }
1379                    }
1380                }
1381            }
1382            Ok(StreamOp::Score(ScoreSpec {
1383                model_path,
1384                inputs,
1385                outputs,
1386                gpu,
1387                batch_size,
1388                device_id,
1389            }))
1390        }
1391        _ => Err(ParseError::UnexpectedToken {
1392            position: 0,
1393            expected: "stream operation".to_string(),
1394            found: format!("{:?}", pair.as_rule()),
1395        }),
1396    }
1397}
1398
1399fn parse_order_item(pair: pest::iterators::Pair<Rule>) -> ParseResult<OrderItem> {
1400    let mut inner = pair.into_inner();
1401    let expr = parse_expr(inner.expect_next("order expression")?)?;
1402    let desc = inner.next().is_some_and(|p| p.as_str() == "desc");
1403    Ok(OrderItem {
1404        expr,
1405        descending: desc,
1406    })
1407}
1408
1409fn parse_fork_path(pair: pest::iterators::Pair<Rule>) -> ParseResult<ForkPath> {
1410    let mut inner = pair.into_inner();
1411    let name = inner.expect_next("fork path name")?.as_str().to_string();
1412    let mut ops = Vec::new();
1413    for p in inner {
1414        if p.as_rule() == Rule::stream_op {
1415            ops.push(parse_stream_op(p)?);
1416        }
1417    }
1418    Ok(ForkPath { name, ops })
1419}
1420
1421fn parse_followed_by_op(pair: pest::iterators::Pair<Rule>) -> ParseResult<StreamOp> {
1422    let mut inner = pair.into_inner();
1423    let mut match_all = false;
1424
1425    let first = inner.expect_next("event type or match_all")?;
1426    let event_type = if first.as_rule() == Rule::match_all_keyword {
1427        match_all = true;
1428        inner.expect_next("event type")?.as_str().to_string()
1429    } else {
1430        first.as_str().to_string()
1431    };
1432
1433    let mut filter = None;
1434    let mut alias = None;
1435
1436    for p in inner {
1437        match p.as_rule() {
1438            Rule::or_expr => filter = Some(parse_or_expr(p)?),
1439            Rule::filter_expr => filter = Some(parse_filter_expr(p)?),
1440            Rule::identifier => alias = Some(p.as_str().to_string()),
1441            _ => {}
1442        }
1443    }
1444
1445    Ok(StreamOp::FollowedBy(FollowedByClause {
1446        event_type,
1447        filter,
1448        alias,
1449        match_all,
1450    }))
1451}
1452
1453fn parse_select_item(pair: pest::iterators::Pair<Rule>) -> ParseResult<SelectItem> {
1454    let mut inner = pair.into_inner();
1455    let first = inner.expect_next("select field or alias")?;
1456
1457    if let Some(second) = inner.next() {
1458        Ok(SelectItem::Alias(
1459            first.as_str().to_string(),
1460            parse_expr(second)?,
1461        ))
1462    } else {
1463        Ok(SelectItem::Field(first.as_str().to_string()))
1464    }
1465}
1466
1467fn parse_window_args(pair: pest::iterators::Pair<Rule>) -> ParseResult<WindowArgs> {
1468    let raw = pair.as_str().trim();
1469    let is_session = raw.starts_with("session");
1470
1471    let mut inner = pair.into_inner();
1472
1473    if is_session {
1474        // session: <gap_expr>
1475        let gap_expr = parse_expr(inner.expect_next("session gap duration")?)?;
1476        return Ok(WindowArgs {
1477            duration: gap_expr.clone(),
1478            sliding: None,
1479            policy: None,
1480            session_gap: Some(gap_expr),
1481        });
1482    }
1483
1484    let duration = parse_expr(inner.expect_next("window duration")?)?;
1485
1486    let mut sliding = None;
1487    let mut policy = None;
1488
1489    for p in inner {
1490        if p.as_rule() == Rule::expr {
1491            // Need to determine if it's sliding or policy based on context
1492            if sliding.is_none() {
1493                sliding = Some(parse_expr(p)?);
1494            } else {
1495                policy = Some(parse_expr(p)?);
1496            }
1497        }
1498    }
1499
1500    Ok(WindowArgs {
1501        duration,
1502        sliding,
1503        policy,
1504        session_gap: None,
1505    })
1506}
1507
1508fn parse_agg_item(pair: pest::iterators::Pair<Rule>) -> ParseResult<AggItem> {
1509    let mut inner = pair.into_inner();
1510    let alias = inner.expect_next("aggregate alias")?.as_str().to_string();
1511    let expr = parse_expr(inner.expect_next("aggregate expression")?)?;
1512    Ok(AggItem { alias, expr })
1513}
1514
1515fn parse_named_arg(pair: pest::iterators::Pair<Rule>) -> ParseResult<NamedArg> {
1516    let mut inner = pair.into_inner();
1517    let name = inner.expect_next("argument name")?.as_str().to_string();
1518    let value = parse_expr(inner.expect_next("argument value")?)?;
1519    Ok(NamedArg { name, value })
1520}
1521
1522fn parse_event_decl(pair: pest::iterators::Pair<Rule>) -> ParseResult<Stmt> {
1523    let mut inner = pair.into_inner();
1524    let name = inner.expect_next("event name")?.as_str().to_string();
1525
1526    let mut extends = None;
1527    let mut fields = Vec::new();
1528
1529    for p in inner {
1530        match p.as_rule() {
1531            Rule::identifier => extends = Some(p.as_str().to_string()),
1532            Rule::field => fields.push(parse_field(p)?),
1533            _ => {}
1534        }
1535    }
1536
1537    Ok(Stmt::EventDecl {
1538        name,
1539        extends,
1540        fields,
1541    })
1542}
1543
1544fn parse_field(pair: pest::iterators::Pair<Rule>) -> ParseResult<Field> {
1545    let mut inner = pair.into_inner();
1546    let name = inner.expect_next("field name")?.as_str().to_string();
1547    let ty = parse_type(inner.expect_next("field type")?)?;
1548    let optional = inner.next().is_some();
1549    Ok(Field { name, ty, optional })
1550}
1551
1552fn parse_type_decl(pair: pest::iterators::Pair<Rule>) -> ParseResult<Stmt> {
1553    let mut inner = pair.into_inner();
1554    let name = inner.expect_next("type name")?.as_str().to_string();
1555    let next = inner.expect_next("type body")?;
1556    match next.as_rule() {
1557        Rule::field => {
1558            let mut fields = vec![parse_field(next)?];
1559            for p in inner {
1560                if p.as_rule() == Rule::field {
1561                    fields.push(parse_field(p)?);
1562                }
1563            }
1564            Ok(Stmt::TypeDecl {
1565                name,
1566                ty: None,
1567                fields,
1568            })
1569        }
1570        _ => Ok(Stmt::TypeDecl {
1571            name,
1572            ty: Some(parse_type(next)?),
1573            fields: vec![],
1574        }),
1575    }
1576}
1577
1578fn parse_type(pair: pest::iterators::Pair<Rule>) -> ParseResult<Type> {
1579    let cloned = pair.clone();
1580    let inner = pair.into_inner().next().unwrap_or(cloned);
1581
1582    match inner.as_rule() {
1583        Rule::primitive_type => match inner.as_str() {
1584            "int" => Ok(Type::Int),
1585            "float" => Ok(Type::Float),
1586            "bool" => Ok(Type::Bool),
1587            "str" => Ok(Type::Str),
1588            "timestamp" => Ok(Type::Timestamp),
1589            "duration" => Ok(Type::Duration),
1590            _ => Ok(Type::Named(inner.as_str().to_string())),
1591        },
1592        Rule::array_type => {
1593            let inner_type = parse_type(inner.into_inner().expect_next("array element type")?)?;
1594            Ok(Type::Array(Box::new(inner_type)))
1595        }
1596        Rule::map_type => {
1597            let mut inner_pairs = inner.into_inner();
1598            let key_type = parse_type(inner_pairs.expect_next("map key type")?)?;
1599            let val_type = parse_type(inner_pairs.expect_next("map value type")?)?;
1600            Ok(Type::Map(Box::new(key_type), Box::new(val_type)))
1601        }
1602        Rule::tuple_type => {
1603            let types: Vec<Type> = inner
1604                .into_inner()
1605                .map(parse_type)
1606                .collect::<ParseResult<Vec<_>>>()?;
1607            Ok(Type::Tuple(types))
1608        }
1609        Rule::stream_type => {
1610            let inner_type = parse_type(inner.into_inner().expect_next("stream element type")?)?;
1611            Ok(Type::Stream(Box::new(inner_type)))
1612        }
1613        Rule::optional_type => {
1614            let inner_type = parse_type(inner.into_inner().expect_next("optional inner type")?)?;
1615            Ok(Type::Optional(Box::new(inner_type)))
1616        }
1617        Rule::named_type | Rule::identifier => Ok(Type::Named(inner.as_str().to_string())),
1618        _ => Ok(Type::Named(inner.as_str().to_string())),
1619    }
1620}
1621
1622fn parse_var_decl(pair: pest::iterators::Pair<Rule>) -> ParseResult<Stmt> {
1623    let mut inner = pair.into_inner();
1624    let keyword = inner.expect_next("var_keyword")?.as_str();
1625    let mutable = keyword == "var";
1626    let name = inner.expect_next("variable name")?.as_str().to_string();
1627
1628    let mut ty = None;
1629    let mut value = Expr::Null;
1630
1631    for p in inner {
1632        match p.as_rule() {
1633            Rule::type_annotation => ty = Some(parse_type(p.into_inner().expect_next("type")?)?),
1634            _ => value = parse_expr(p)?,
1635        }
1636    }
1637
1638    Ok(Stmt::VarDecl {
1639        mutable,
1640        name,
1641        ty,
1642        value,
1643    })
1644}
1645
1646fn parse_const_decl(pair: pest::iterators::Pair<Rule>) -> ParseResult<Stmt> {
1647    let mut inner = pair.into_inner();
1648    let name = inner.expect_next("constant name")?.as_str().to_string();
1649
1650    let mut ty = None;
1651    let mut value = Expr::Null;
1652
1653    for p in inner {
1654        match p.as_rule() {
1655            Rule::type_annotation => ty = Some(parse_type(p.into_inner().expect_next("type")?)?),
1656            _ => value = parse_expr(p)?,
1657        }
1658    }
1659
1660    Ok(Stmt::ConstDecl { name, ty, value })
1661}
1662
1663fn parse_fn_decl(pair: pest::iterators::Pair<Rule>) -> ParseResult<Stmt> {
1664    let mut inner = pair.into_inner();
1665    let name = inner.expect_next("function name")?.as_str().to_string();
1666
1667    let mut params = Vec::new();
1668    let mut ret = None;
1669    let mut body = Vec::new();
1670
1671    for p in inner {
1672        match p.as_rule() {
1673            Rule::param_list => {
1674                for param in p.into_inner() {
1675                    params.push(parse_param(param)?);
1676                }
1677            }
1678            Rule::type_expr => ret = Some(parse_type(p)?),
1679            Rule::block => body = parse_block(p)?,
1680            Rule::statement => body.push(parse_statement(p)?),
1681            _ => {}
1682        }
1683    }
1684
1685    Ok(Stmt::FnDecl {
1686        name,
1687        params,
1688        ret,
1689        body,
1690    })
1691}
1692
1693fn parse_block(pair: pest::iterators::Pair<Rule>) -> ParseResult<Vec<Spanned<Stmt>>> {
1694    let mut statements = Vec::new();
1695    for p in pair.into_inner() {
1696        if p.as_rule() == Rule::statement {
1697            statements.push(parse_statement(p)?);
1698        }
1699    }
1700    Ok(statements)
1701}
1702
1703fn parse_param(pair: pest::iterators::Pair<Rule>) -> ParseResult<Param> {
1704    let mut inner = pair.into_inner();
1705    let name = inner.expect_next("parameter name")?.as_str().to_string();
1706    let ty = parse_type(inner.expect_next("parameter type")?)?;
1707    Ok(Param { name, ty })
1708}
1709
1710fn parse_config_block(pair: pest::iterators::Pair<Rule>) -> ParseResult<Stmt> {
1711    let mut inner = pair.into_inner();
1712    let first = inner.expect_next("config name or item")?;
1713
1714    // Check if first token is identifier (new syntax) or config_item (old syntax)
1715    let (name, items_start) = if first.as_rule() == Rule::identifier {
1716        (first.as_str().to_string(), None)
1717    } else {
1718        // Old syntax: config: with indentation - use "default" as name
1719        ("default".to_string(), Some(first))
1720    };
1721
1722    let mut items = Vec::new();
1723
1724    // If we have a config_item from old syntax, parse it first
1725    if let Some(first_item) = items_start {
1726        if first_item.as_rule() == Rule::config_item {
1727            items.push(parse_config_item(first_item)?);
1728        }
1729    }
1730
1731    for p in inner {
1732        if p.as_rule() == Rule::config_item {
1733            items.push(parse_config_item(p)?);
1734        }
1735    }
1736    Ok(Stmt::Config { name, items })
1737}
1738
1739fn parse_config_item(pair: pest::iterators::Pair<Rule>) -> ParseResult<ConfigItem> {
1740    let mut inner = pair.into_inner();
1741    let key = inner.expect_next("config key")?.as_str().to_string();
1742    let value = parse_config_value(inner.expect_next("config value")?)?;
1743    Ok(ConfigItem::Value(key, value))
1744}
1745
1746fn parse_config_value(pair: pest::iterators::Pair<Rule>) -> ParseResult<ConfigValue> {
1747    let cloned = pair.clone();
1748    let inner = pair.into_inner().next().unwrap_or(cloned);
1749
1750    match inner.as_rule() {
1751        Rule::config_concat => {
1752            let parts: Vec<ConfigValue> = inner
1753                .into_inner()
1754                .map(|atom| {
1755                    // Each atom is a config_atom containing either string or identifier
1756                    let inner_atom = atom.into_inner().next().expect("config_atom child");
1757                    match inner_atom.as_rule() {
1758                        Rule::string => {
1759                            let s = inner_atom.as_str();
1760                            Ok(ConfigValue::Str(s[1..s.len() - 1].to_string()))
1761                        }
1762                        Rule::identifier => Ok(ConfigValue::Ident(inner_atom.as_str().to_string())),
1763                        _ => Ok(ConfigValue::Ident(inner_atom.as_str().to_string())),
1764                    }
1765                })
1766                .collect::<ParseResult<Vec<_>>>()?;
1767            Ok(ConfigValue::Concat(parts))
1768        }
1769        Rule::config_array => {
1770            let values: Vec<ConfigValue> = inner
1771                .into_inner()
1772                .map(parse_config_value)
1773                .collect::<ParseResult<Vec<_>>>()?;
1774            Ok(ConfigValue::Array(values))
1775        }
1776        Rule::integer => Ok(ConfigValue::Int(inner.as_str().parse().unwrap_or(0))),
1777        Rule::float => Ok(ConfigValue::Float(inner.as_str().parse().unwrap_or(0.0))),
1778        Rule::string => {
1779            let s = inner.as_str();
1780            Ok(ConfigValue::Str(s[1..s.len() - 1].to_string()))
1781        }
1782        Rule::duration => Ok(ConfigValue::Duration(
1783            parse_duration(inner.as_str()).map_err(ParseError::InvalidDuration)?,
1784        )),
1785        Rule::boolean => Ok(ConfigValue::Bool(inner.as_str() == "true")),
1786        Rule::identifier => Ok(ConfigValue::Ident(inner.as_str().to_string())),
1787        _ => Ok(ConfigValue::Ident(inner.as_str().to_string())),
1788    }
1789}
1790
1791fn parse_import_stmt(pair: pest::iterators::Pair<Rule>) -> ParseResult<Stmt> {
1792    let mut inner = pair.into_inner();
1793    let path_pair = inner.expect_next("import path")?;
1794    let path = path_pair.as_str();
1795    let path = path[1..path.len() - 1].to_string();
1796    let alias = inner.next().map(|p| p.as_str().to_string());
1797    Ok(Stmt::Import { path, alias })
1798}
1799
1800fn parse_if_stmt(pair: pest::iterators::Pair<Rule>) -> ParseResult<Stmt> {
1801    let mut inner = pair.into_inner();
1802    let cond = parse_expr(inner.expect_next("if condition")?)?;
1803
1804    let mut then_branch = Vec::new();
1805    let mut elif_branches = Vec::new();
1806    let mut else_branch = None;
1807
1808    for p in inner {
1809        match p.as_rule() {
1810            Rule::block => then_branch = parse_block(p)?,
1811            Rule::statement => then_branch.push(parse_statement(p)?),
1812            Rule::elif_clause => {
1813                let mut elif_inner = p.into_inner();
1814                let elif_cond = parse_expr(elif_inner.expect_next("elif condition")?)?;
1815                let mut elif_body = Vec::new();
1816                for ep in elif_inner {
1817                    match ep.as_rule() {
1818                        Rule::block => elif_body = parse_block(ep)?,
1819                        Rule::statement => elif_body.push(parse_statement(ep)?),
1820                        _ => {}
1821                    }
1822                }
1823                elif_branches.push((elif_cond, elif_body));
1824            }
1825            Rule::else_clause => {
1826                let mut else_body = Vec::new();
1827                for ep in p.into_inner() {
1828                    match ep.as_rule() {
1829                        Rule::block => else_body = parse_block(ep)?,
1830                        Rule::statement => else_body.push(parse_statement(ep)?),
1831                        _ => {}
1832                    }
1833                }
1834                else_branch = Some(else_body);
1835            }
1836            _ => {}
1837        }
1838    }
1839
1840    Ok(Stmt::If {
1841        cond,
1842        then_branch,
1843        elif_branches,
1844        else_branch,
1845    })
1846}
1847
1848fn parse_for_stmt(pair: pest::iterators::Pair<Rule>) -> ParseResult<Stmt> {
1849    let mut inner = pair.into_inner();
1850    let var = inner.expect_next("loop variable")?.as_str().to_string();
1851    let iter = parse_expr(inner.expect_next("iterable expression")?)?;
1852    let mut body = Vec::new();
1853    for p in inner {
1854        match p.as_rule() {
1855            Rule::block => body = parse_block(p)?,
1856            Rule::statement => body.push(parse_statement(p)?),
1857            _ => {}
1858        }
1859    }
1860    Ok(Stmt::For { var, iter, body })
1861}
1862
1863fn parse_while_stmt(pair: pest::iterators::Pair<Rule>) -> ParseResult<Stmt> {
1864    let mut inner = pair.into_inner();
1865    let cond = parse_expr(inner.expect_next("while condition")?)?;
1866    let mut body = Vec::new();
1867    for p in inner {
1868        match p.as_rule() {
1869            Rule::block => body = parse_block(p)?,
1870            Rule::statement => body.push(parse_statement(p)?),
1871            _ => {}
1872        }
1873    }
1874    Ok(Stmt::While { cond, body })
1875}
1876
1877fn parse_return_stmt(pair: pest::iterators::Pair<Rule>) -> ParseResult<Stmt> {
1878    let expr = pair.into_inner().next().map(parse_expr).transpose()?;
1879    Ok(Stmt::Return(expr))
1880}
1881
1882fn parse_emit_stmt(pair: pest::iterators::Pair<Rule>) -> ParseResult<Stmt> {
1883    let mut inner = pair.into_inner();
1884    let event_type = inner.expect_next("event type name")?.as_str().to_string();
1885    let mut fields = Vec::new();
1886    for p in inner {
1887        if p.as_rule() == Rule::named_arg_list {
1888            for arg in p.into_inner() {
1889                fields.push(parse_named_arg(arg)?);
1890            }
1891        }
1892    }
1893    Ok(Stmt::Emit { event_type, fields })
1894}
1895
1896// ============================================================================
1897// Expression Parsing
1898// ============================================================================
1899
1900fn parse_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
1901    let inner = pair.into_inner().next();
1902
1903    match inner {
1904        Some(p) => parse_expr_inner(p),
1905        None => Ok(Expr::Null),
1906    }
1907}
1908
1909fn parse_expr_inner(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
1910    match pair.as_rule() {
1911        Rule::expr => parse_expr(pair),
1912        Rule::lambda_expr => parse_lambda_expr(pair),
1913        Rule::range_expr => parse_range_expr(pair),
1914        Rule::or_expr => parse_or_expr(pair),
1915        Rule::and_expr => parse_and_expr(pair),
1916        Rule::not_expr => parse_not_expr(pair),
1917        Rule::comparison_expr => parse_comparison_expr(pair),
1918        Rule::bitwise_or_expr => parse_bitwise_or_expr(pair),
1919        Rule::bitwise_xor_expr => parse_bitwise_xor_expr(pair),
1920        Rule::bitwise_and_expr => parse_bitwise_and_expr(pair),
1921        Rule::shift_expr => parse_shift_expr(pair),
1922        Rule::additive_expr => parse_additive_expr(pair),
1923        Rule::multiplicative_expr => parse_multiplicative_expr(pair),
1924        Rule::power_expr => parse_power_expr(pair),
1925        Rule::unary_expr => parse_unary_expr(pair),
1926        Rule::postfix_expr => parse_postfix_expr(pair),
1927        Rule::primary_expr => parse_primary_expr(pair),
1928        Rule::literal => parse_literal(pair),
1929        Rule::identifier => Ok(Expr::Ident(pair.as_str().to_string())),
1930        Rule::if_expr => parse_if_expr(pair),
1931        _ => Ok(Expr::Ident(pair.as_str().to_string())),
1932    }
1933}
1934
1935fn parse_lambda_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
1936    let mut inner = pair.into_inner();
1937    let mut params = Vec::new();
1938
1939    // Parse parameters
1940    let first = inner.expect_next("lambda parameters")?;
1941    match first.as_rule() {
1942        Rule::identifier_list => {
1943            for p in first.into_inner() {
1944                params.push(p.as_str().to_string());
1945            }
1946        }
1947        Rule::identifier => {
1948            params.push(first.as_str().to_string());
1949        }
1950        _ => {}
1951    }
1952
1953    // Parse body - could be expression or block
1954    let body_pair = inner.expect_next("lambda body")?;
1955    let body = match body_pair.as_rule() {
1956        Rule::lambda_block => parse_lambda_block(body_pair)?,
1957        _ => parse_expr_inner(body_pair)?,
1958    };
1959
1960    Ok(Expr::Lambda {
1961        params,
1962        body: Box::new(body),
1963    })
1964}
1965
1966fn parse_lambda_block(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
1967    let mut stmts = Vec::new();
1968    let mut final_expr = None;
1969
1970    for p in pair.into_inner() {
1971        match p.as_rule() {
1972            Rule::statement => {
1973                // Check if it's a var_decl to extract for Block
1974                let stmt = parse_statement(p)?;
1975                match &stmt.node {
1976                    Stmt::VarDecl {
1977                        mutable,
1978                        name,
1979                        ty,
1980                        value,
1981                    } => {
1982                        stmts.push((name.clone(), ty.clone(), value.clone(), *mutable));
1983                    }
1984                    Stmt::Expr(e) => {
1985                        // Last expression becomes result
1986                        final_expr = Some(e.clone());
1987                    }
1988                    _ => {
1989                        // Other statements - treat as expression if possible
1990                    }
1991                }
1992            }
1993            _ => {
1994                // Expression at end of block
1995                final_expr = Some(parse_expr_inner(p)?);
1996            }
1997        }
1998    }
1999
2000    // If we have variable declarations, wrap in a Block expression
2001    if stmts.is_empty() {
2002        Ok(final_expr.unwrap_or(Expr::Null))
2003    } else {
2004        Ok(Expr::Block {
2005            stmts,
2006            result: Box::new(final_expr.unwrap_or(Expr::Null)),
2007        })
2008    }
2009}
2010
2011fn parse_pattern_expr_as_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2012    // Convert pattern_or_expr to an Expr representation
2013    // pattern_or_expr = pattern_and_expr ~ ("or" ~ pattern_and_expr)*
2014    let mut inner = pair.into_inner();
2015    let mut left = parse_pattern_and_as_expr(inner.expect_next("pattern expression")?)?;
2016
2017    for right_pair in inner {
2018        let right = parse_pattern_and_as_expr(right_pair)?;
2019        left = Expr::Binary {
2020            op: BinOp::Or,
2021            left: Box::new(left),
2022            right: Box::new(right),
2023        };
2024    }
2025    Ok(left)
2026}
2027
2028fn parse_pattern_and_as_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2029    let mut inner = pair.into_inner();
2030    let mut left = parse_pattern_xor_as_expr(inner.expect_next("and expression")?)?;
2031
2032    for right_pair in inner {
2033        let right = parse_pattern_xor_as_expr(right_pair)?;
2034        left = Expr::Binary {
2035            op: BinOp::And,
2036            left: Box::new(left),
2037            right: Box::new(right),
2038        };
2039    }
2040    Ok(left)
2041}
2042
2043fn parse_pattern_xor_as_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2044    let mut inner = pair.into_inner();
2045    let mut left = parse_pattern_unary_as_expr(inner.expect_next("xor expression")?)?;
2046
2047    for right_pair in inner {
2048        let right = parse_pattern_unary_as_expr(right_pair)?;
2049        left = Expr::Binary {
2050            op: BinOp::Xor,
2051            left: Box::new(left),
2052            right: Box::new(right),
2053        };
2054    }
2055    Ok(left)
2056}
2057
2058fn parse_pattern_unary_as_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2059    let mut inner = pair.into_inner();
2060    let first = inner.expect_next("unary expression or operand")?;
2061
2062    if first.as_str() == "not" {
2063        let expr = parse_pattern_primary_as_expr(inner.expect_next("pattern expression")?)?;
2064        Ok(Expr::Unary {
2065            op: UnaryOp::Not,
2066            expr: Box::new(expr),
2067        })
2068    } else {
2069        parse_pattern_primary_as_expr(first)
2070    }
2071}
2072
2073fn parse_pattern_primary_as_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2074    let inner = pair
2075        .into_inner()
2076        .expect_next("pattern primary expression")?;
2077
2078    match inner.as_rule() {
2079        Rule::pattern_or_expr => parse_pattern_expr_as_expr(inner),
2080        Rule::pattern_sequence => parse_pattern_sequence_as_expr(inner),
2081        _ => Ok(Expr::Ident(inner.as_str().to_string())),
2082    }
2083}
2084
2085fn parse_pattern_sequence_as_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2086    // pattern_sequence = identifier ~ ("->" ~ identifier)*
2087    // Convert to a chain of FollowedBy binary operations
2088    let mut inner = pair.into_inner();
2089    let mut left = Expr::Ident(inner.expect_next("sequence start")?.as_str().to_string());
2090
2091    for right_pair in inner {
2092        let right = Expr::Ident(right_pair.as_str().to_string());
2093        left = Expr::Binary {
2094            op: BinOp::FollowedBy,
2095            left: Box::new(left),
2096            right: Box::new(right),
2097        };
2098    }
2099    Ok(left)
2100}
2101
2102fn parse_filter_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2103    let inner = pair.into_inner().expect_next("filter expression")?;
2104    parse_filter_or_expr(inner)
2105}
2106
2107fn parse_filter_or_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2108    let mut inner = pair.into_inner();
2109    let mut left = parse_filter_and_expr(inner.expect_next("or expression operand")?)?;
2110
2111    for right_pair in inner {
2112        let right = parse_filter_and_expr(right_pair)?;
2113        left = Expr::Binary {
2114            op: BinOp::Or,
2115            left: Box::new(left),
2116            right: Box::new(right),
2117        };
2118    }
2119    Ok(left)
2120}
2121
2122fn parse_filter_and_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2123    let mut inner = pair.into_inner();
2124    let mut left = parse_filter_not_expr(inner.expect_next("and expression operand")?)?;
2125
2126    for right_pair in inner {
2127        let right = parse_filter_not_expr(right_pair)?;
2128        left = Expr::Binary {
2129            op: BinOp::And,
2130            left: Box::new(left),
2131            right: Box::new(right),
2132        };
2133    }
2134    Ok(left)
2135}
2136
2137fn parse_filter_not_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2138    let mut inner = pair.into_inner();
2139    let first = inner.expect_next("not or expression")?;
2140
2141    if first.as_str() == "not" {
2142        let expr = parse_filter_comparison_expr(inner.expect_next("expression after not")?)?;
2143        Ok(Expr::Unary {
2144            op: UnaryOp::Not,
2145            expr: Box::new(expr),
2146        })
2147    } else {
2148        parse_filter_comparison_expr(first)
2149    }
2150}
2151
2152fn parse_filter_comparison_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2153    let mut inner = pair.into_inner();
2154    let left = parse_filter_additive_expr(inner.expect_next("comparison left operand")?)?;
2155
2156    if let Some(op_pair) = inner.next() {
2157        let op = match op_pair.as_str() {
2158            "==" => BinOp::Eq,
2159            "!=" => BinOp::NotEq,
2160            "<" => BinOp::Lt,
2161            "<=" => BinOp::Le,
2162            ">" => BinOp::Gt,
2163            ">=" => BinOp::Ge,
2164            "in" => BinOp::In,
2165            "is" => BinOp::Is,
2166            s if s.contains("not") && s.contains("in") => BinOp::NotIn,
2167            _ => BinOp::Eq,
2168        };
2169        let right = parse_filter_additive_expr(inner.expect_next("comparison right operand")?)?;
2170        Ok(Expr::Binary {
2171            op,
2172            left: Box::new(left),
2173            right: Box::new(right),
2174        })
2175    } else {
2176        Ok(left)
2177    }
2178}
2179
2180fn parse_filter_additive_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2181    let mut inner = pair.into_inner();
2182    let mut left =
2183        parse_filter_multiplicative_expr(inner.expect_next("additive expression operand")?)?;
2184
2185    while let Some(op_pair) = inner.next() {
2186        let op = if op_pair.as_str() == "-" {
2187            BinOp::Sub
2188        } else {
2189            BinOp::Add
2190        };
2191        if let Some(right_pair) = inner.next() {
2192            let right = parse_filter_multiplicative_expr(right_pair)?;
2193            left = Expr::Binary {
2194                op,
2195                left: Box::new(left),
2196                right: Box::new(right),
2197            };
2198        }
2199    }
2200    Ok(left)
2201}
2202
2203fn parse_filter_multiplicative_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2204    let mut inner = pair.into_inner();
2205    let mut left =
2206        parse_filter_unary_expr(inner.expect_next("multiplicative expression operand")?)?;
2207
2208    while let Some(op_pair) = inner.next() {
2209        let op = match op_pair.as_str() {
2210            "*" => BinOp::Mul,
2211            "/" => BinOp::Div,
2212            "%" => BinOp::Mod,
2213            _ => BinOp::Mul,
2214        };
2215        if let Some(right_pair) = inner.next() {
2216            let right = parse_filter_unary_expr(right_pair)?;
2217            left = Expr::Binary {
2218                op,
2219                left: Box::new(left),
2220                right: Box::new(right),
2221            };
2222        }
2223    }
2224    Ok(left)
2225}
2226
2227fn parse_filter_unary_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2228    let mut inner = pair.into_inner();
2229    let first = inner.expect_next("unary operator or expression")?;
2230
2231    // Check if first is a unary operator or the expression itself
2232    if first.as_rule() == Rule::filter_unary_op {
2233        let op_str = first.as_str();
2234        let expr =
2235            parse_filter_postfix_expr(inner.expect_next("expression after unary operator")?)?;
2236        let op = match op_str {
2237            "-" => UnaryOp::Neg,
2238            "~" => UnaryOp::BitNot,
2239            _ => unreachable!("Grammar only allows - or ~"),
2240        };
2241        Ok(Expr::Unary {
2242            op,
2243            expr: Box::new(expr),
2244        })
2245    } else {
2246        parse_filter_postfix_expr(first)
2247    }
2248}
2249
2250fn parse_filter_postfix_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2251    let mut inner = pair.into_inner();
2252    let mut expr = parse_filter_primary_expr(inner.expect_next("postfix expression base")?)?;
2253
2254    for suffix in inner {
2255        expr = parse_filter_postfix_suffix(expr, suffix)?;
2256    }
2257    Ok(expr)
2258}
2259
2260fn parse_filter_postfix_suffix(expr: Expr, pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2261    let mut inner = pair.into_inner();
2262
2263    if let Some(first) = inner.next() {
2264        match first.as_rule() {
2265            Rule::identifier => {
2266                // Member access: .identifier
2267                Ok(Expr::Member {
2268                    expr: Box::new(expr),
2269                    member: first.as_str().to_string(),
2270                })
2271            }
2272            Rule::optional_member_access => {
2273                let member = first
2274                    .into_inner()
2275                    .expect_next("member name")?
2276                    .as_str()
2277                    .to_string();
2278                Ok(Expr::OptionalMember {
2279                    expr: Box::new(expr),
2280                    member,
2281                })
2282            }
2283            Rule::index_access => {
2284                let index = parse_expr(first.into_inner().expect_next("index expression")?)?;
2285                Ok(Expr::Index {
2286                    expr: Box::new(expr),
2287                    index: Box::new(index),
2288                })
2289            }
2290            Rule::call_args => {
2291                let args = first
2292                    .into_inner()
2293                    .filter(|p| p.as_rule() == Rule::arg_list)
2294                    .flat_map(|p| p.into_inner())
2295                    .map(parse_arg)
2296                    .collect::<ParseResult<Vec<_>>>()?;
2297                Ok(Expr::Call {
2298                    func: Box::new(expr),
2299                    args,
2300                })
2301            }
2302            _ => Ok(expr),
2303        }
2304    } else {
2305        Ok(expr)
2306    }
2307}
2308
2309fn parse_filter_primary_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2310    let inner = pair.into_inner().expect_next("filter primary expression")?;
2311
2312    match inner.as_rule() {
2313        Rule::literal => parse_literal(inner),
2314        Rule::identifier => Ok(Expr::Ident(inner.as_str().to_string())),
2315        Rule::filter_expr => parse_filter_expr(inner),
2316        _ => Ok(Expr::Ident(inner.as_str().to_string())),
2317    }
2318}
2319
2320fn parse_range_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2321    let mut inner = pair.into_inner();
2322    let left = parse_expr_inner(inner.expect_next("range start")?)?;
2323
2324    if let Some(op_pair) = inner.next() {
2325        let inclusive = op_pair.as_str() == "..=";
2326        let right = parse_expr_inner(inner.expect_next("range end")?)?;
2327        Ok(Expr::Range {
2328            start: Box::new(left),
2329            end: Box::new(right),
2330            inclusive,
2331        })
2332    } else {
2333        Ok(left)
2334    }
2335}
2336
2337fn parse_or_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2338    let mut inner = pair.into_inner();
2339    let mut left = parse_expr_inner(inner.expect_next("or expression operand")?)?;
2340
2341    for right_pair in inner {
2342        let right = parse_expr_inner(right_pair)?;
2343        left = Expr::Binary {
2344            op: BinOp::Or,
2345            left: Box::new(left),
2346            right: Box::new(right),
2347        };
2348    }
2349
2350    Ok(left)
2351}
2352
2353fn parse_and_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2354    let mut inner = pair.into_inner();
2355    let mut left = parse_expr_inner(inner.expect_next("and expression operand")?)?;
2356
2357    for right_pair in inner {
2358        let right = parse_expr_inner(right_pair)?;
2359        left = Expr::Binary {
2360            op: BinOp::And,
2361            left: Box::new(left),
2362            right: Box::new(right),
2363        };
2364    }
2365
2366    Ok(left)
2367}
2368
2369fn parse_not_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2370    let mut inner = pair.into_inner();
2371    let first = inner.expect_next("not keyword or expression")?;
2372
2373    if first.as_str() == "not" {
2374        let expr = parse_expr_inner(inner.expect_next("expression after not")?)?;
2375        Ok(Expr::Unary {
2376            op: UnaryOp::Not,
2377            expr: Box::new(expr),
2378        })
2379    } else {
2380        parse_expr_inner(first)
2381    }
2382}
2383
2384fn parse_comparison_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2385    let mut inner = pair.into_inner();
2386    let left = parse_expr_inner(inner.expect_next("comparison left operand")?)?;
2387
2388    if let Some(op_pair) = inner.next() {
2389        let op_str = op_pair.as_str();
2390        let op = match op_str {
2391            "==" => BinOp::Eq,
2392            "!=" => BinOp::NotEq,
2393            "<" => BinOp::Lt,
2394            "<=" => BinOp::Le,
2395            ">" => BinOp::Gt,
2396            ">=" => BinOp::Ge,
2397            "in" => BinOp::In,
2398            "is" => BinOp::Is,
2399            s if s.contains("not") && s.contains("in") => BinOp::NotIn,
2400            _ => BinOp::Eq,
2401        };
2402        let right = parse_expr_inner(inner.expect_next("comparison right operand")?)?;
2403        Ok(Expr::Binary {
2404            op,
2405            left: Box::new(left),
2406            right: Box::new(right),
2407        })
2408    } else {
2409        Ok(left)
2410    }
2411}
2412
2413fn parse_bitwise_or_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2414    parse_binary_chain(pair, BinOp::BitOr)
2415}
2416
2417fn parse_bitwise_xor_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2418    parse_binary_chain(pair, BinOp::BitXor)
2419}
2420
2421fn parse_bitwise_and_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2422    parse_binary_chain(pair, BinOp::BitAnd)
2423}
2424
2425fn parse_shift_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2426    let mut inner = pair.into_inner();
2427    let mut left = parse_expr_inner(inner.expect_next("shift expression operand")?)?;
2428
2429    while let Some(op_or_expr) = inner.next() {
2430        let op = match op_or_expr.as_str() {
2431            "<<" => BinOp::Shl,
2432            ">>" => BinOp::Shr,
2433            _ => {
2434                let right = parse_expr_inner(op_or_expr)?;
2435                left = Expr::Binary {
2436                    op: BinOp::Shl,
2437                    left: Box::new(left),
2438                    right: Box::new(right),
2439                };
2440                continue;
2441            }
2442        };
2443        if let Some(right_pair) = inner.next() {
2444            let right = parse_expr_inner(right_pair)?;
2445            left = Expr::Binary {
2446                op,
2447                left: Box::new(left),
2448                right: Box::new(right),
2449            };
2450        }
2451    }
2452
2453    Ok(left)
2454}
2455
2456fn parse_additive_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2457    let mut inner = pair.into_inner();
2458    let mut left = parse_expr_inner(inner.expect_next("additive expression operand")?)?;
2459
2460    while let Some(op_pair) = inner.next() {
2461        let op_text = op_pair.as_str();
2462        let op = if op_text == "-" {
2463            BinOp::Sub
2464        } else {
2465            BinOp::Add
2466        };
2467
2468        if let Some(right_pair) = inner.next() {
2469            let right = parse_expr_inner(right_pair)?;
2470            left = Expr::Binary {
2471                op,
2472                left: Box::new(left),
2473                right: Box::new(right),
2474            };
2475        }
2476    }
2477
2478    Ok(left)
2479}
2480
2481fn parse_multiplicative_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2482    let mut inner = pair.into_inner();
2483    let mut left = parse_expr_inner(inner.expect_next("multiplicative expression operand")?)?;
2484
2485    while let Some(op_pair) = inner.next() {
2486        let op_text = op_pair.as_str();
2487        let op = match op_text {
2488            "*" => BinOp::Mul,
2489            "/" => BinOp::Div,
2490            "%" => BinOp::Mod,
2491            _ => BinOp::Mul,
2492        };
2493
2494        if let Some(right_pair) = inner.next() {
2495            let right = parse_expr_inner(right_pair)?;
2496            left = Expr::Binary {
2497                op,
2498                left: Box::new(left),
2499                right: Box::new(right),
2500            };
2501        }
2502    }
2503
2504    Ok(left)
2505}
2506
2507fn parse_power_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2508    let mut inner = pair.into_inner();
2509    let base = parse_expr_inner(inner.expect_next("power expression base")?)?;
2510
2511    if let Some(exp_pair) = inner.next() {
2512        let exp = parse_expr_inner(exp_pair)?;
2513        Ok(Expr::Binary {
2514            op: BinOp::Pow,
2515            left: Box::new(base),
2516            right: Box::new(exp),
2517        })
2518    } else {
2519        Ok(base)
2520    }
2521}
2522
2523fn parse_unary_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2524    let mut inner = pair.into_inner();
2525    let first = inner.expect_next("unary operator or expression")?;
2526
2527    // Check if first is a unary operator or the expression itself
2528    match first.as_rule() {
2529        Rule::unary_op => {
2530            let op_str = first.as_str();
2531            let expr = parse_expr_inner(inner.expect_next("expression after unary operator")?)?;
2532            let op = match op_str {
2533                "-" => UnaryOp::Neg,
2534                "~" => UnaryOp::BitNot,
2535                _ => unreachable!("Grammar only allows - or ~"),
2536            };
2537            Ok(Expr::Unary {
2538                op,
2539                expr: Box::new(expr),
2540            })
2541        }
2542        _ => parse_expr_inner(first),
2543    }
2544}
2545
2546fn parse_postfix_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2547    let mut inner = pair.into_inner();
2548    let mut expr = parse_expr_inner(inner.expect_next("postfix expression base")?)?;
2549
2550    for suffix in inner {
2551        expr = parse_postfix_suffix(expr, suffix)?;
2552    }
2553
2554    Ok(expr)
2555}
2556
2557fn parse_postfix_suffix(expr: Expr, pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2558    let inner = pair.into_inner().expect_next("postfix suffix")?;
2559
2560    match inner.as_rule() {
2561        Rule::member_access => {
2562            let member = inner
2563                .into_inner()
2564                .expect_next("member name")?
2565                .as_str()
2566                .to_string();
2567            Ok(Expr::Member {
2568                expr: Box::new(expr),
2569                member,
2570            })
2571        }
2572        Rule::optional_member_access => {
2573            let member = inner
2574                .into_inner()
2575                .expect_next("member name")?
2576                .as_str()
2577                .to_string();
2578            Ok(Expr::OptionalMember {
2579                expr: Box::new(expr),
2580                member,
2581            })
2582        }
2583        Rule::slice_access => {
2584            // Parse slice: [start:end], [:end], [start:], [:]
2585            let slice_range = inner.into_inner().expect_next("slice range")?;
2586            let slice_inner = slice_range.into_inner();
2587
2588            let mut start = None;
2589            let mut end = None;
2590
2591            for p in slice_inner {
2592                match p.as_rule() {
2593                    Rule::slice_start => {
2594                        start = Some(Box::new(parse_expr_inner(
2595                            p.into_inner().expect_next("slice start expression")?,
2596                        )?));
2597                    }
2598                    Rule::slice_end => {
2599                        end = Some(Box::new(parse_expr_inner(
2600                            p.into_inner().expect_next("slice end expression")?,
2601                        )?));
2602                    }
2603                    _ => {}
2604                }
2605            }
2606
2607            Ok(Expr::Slice {
2608                expr: Box::new(expr),
2609                start,
2610                end,
2611            })
2612        }
2613        Rule::index_access => {
2614            let index = parse_expr(inner.into_inner().expect_next("index expression")?)?;
2615            Ok(Expr::Index {
2616                expr: Box::new(expr),
2617                index: Box::new(index),
2618            })
2619        }
2620        Rule::call_args => {
2621            let mut args = Vec::new();
2622            for p in inner.into_inner() {
2623                if p.as_rule() == Rule::arg_list {
2624                    for arg in p.into_inner() {
2625                        args.push(parse_arg(arg)?);
2626                    }
2627                }
2628            }
2629            Ok(Expr::Call {
2630                func: Box::new(expr),
2631                args,
2632            })
2633        }
2634        _ => Ok(expr),
2635    }
2636}
2637
2638fn parse_arg(pair: pest::iterators::Pair<Rule>) -> ParseResult<Arg> {
2639    let mut inner = pair.into_inner();
2640    let first = inner.expect_next("argument")?;
2641
2642    if let Some(second) = inner.next() {
2643        Ok(Arg::Named(
2644            first.as_str().to_string(),
2645            parse_expr_inner(second)?,
2646        ))
2647    } else {
2648        Ok(Arg::Positional(parse_expr_inner(first)?))
2649    }
2650}
2651
2652fn parse_primary_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2653    let inner = pair.into_inner().expect_next("primary expression")?;
2654
2655    match inner.as_rule() {
2656        Rule::if_expr => parse_if_expr(inner),
2657        Rule::literal => parse_literal(inner),
2658        Rule::identifier => Ok(Expr::Ident(inner.as_str().to_string())),
2659        Rule::array_literal => parse_array_literal(inner),
2660        Rule::map_literal => parse_map_literal(inner),
2661        Rule::expr => parse_expr(inner),
2662        _ => Ok(Expr::Ident(inner.as_str().to_string())),
2663    }
2664}
2665
2666fn parse_if_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2667    let mut inner = pair.into_inner();
2668    let cond = parse_expr_inner(inner.expect_next("if condition")?)?;
2669    let then_branch = parse_expr_inner(inner.expect_next("then branch")?)?;
2670    let else_branch = parse_expr_inner(inner.expect_next("else branch")?)?;
2671
2672    Ok(Expr::If {
2673        cond: Box::new(cond),
2674        then_branch: Box::new(then_branch),
2675        else_branch: Box::new(else_branch),
2676    })
2677}
2678
2679fn parse_literal(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2680    let inner = pair.into_inner().expect_next("literal value")?;
2681
2682    match inner.as_rule() {
2683        Rule::integer => inner
2684            .as_str()
2685            .parse::<i64>()
2686            .map(Expr::Int)
2687            .map_err(|e| ParseError::InvalidNumber(format!("'{}': {}", inner.as_str(), e))),
2688        Rule::float => inner
2689            .as_str()
2690            .parse::<f64>()
2691            .map(Expr::Float)
2692            .map_err(|e| ParseError::InvalidNumber(format!("'{}': {}", inner.as_str(), e))),
2693        Rule::string => {
2694            let s = inner.as_str();
2695            Ok(Expr::Str(s[1..s.len() - 1].to_string()))
2696        }
2697        Rule::duration => Ok(Expr::Duration(
2698            parse_duration(inner.as_str()).map_err(ParseError::InvalidDuration)?,
2699        )),
2700        Rule::timestamp => Ok(Expr::Timestamp(parse_timestamp(inner.as_str()))),
2701        Rule::boolean => Ok(Expr::Bool(inner.as_str() == "true")),
2702        Rule::null => Ok(Expr::Null),
2703        _ => Ok(Expr::Null),
2704    }
2705}
2706
2707fn parse_array_literal(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2708    let mut items = Vec::new();
2709    for p in pair.into_inner() {
2710        if p.as_rule() == Rule::expr_list {
2711            for expr in p.into_inner() {
2712                items.push(parse_expr(expr)?);
2713            }
2714        }
2715    }
2716    Ok(Expr::Array(items))
2717}
2718
2719fn parse_map_literal(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2720    let mut entries = Vec::new();
2721    for p in pair.into_inner() {
2722        if p.as_rule() == Rule::map_entry_list {
2723            for entry in p.into_inner() {
2724                let mut inner = entry.into_inner();
2725                let key = inner.expect_next("map key")?.as_str().to_string();
2726                let key = if key.starts_with('"') {
2727                    key[1..key.len() - 1].to_string()
2728                } else {
2729                    key
2730                };
2731                let value = parse_expr(inner.expect_next("map value")?)?;
2732                entries.push((key, value));
2733            }
2734        }
2735    }
2736    Ok(Expr::Map(entries))
2737}
2738
2739fn parse_binary_chain(pair: pest::iterators::Pair<Rule>, op: BinOp) -> ParseResult<Expr> {
2740    let mut inner = pair.into_inner();
2741    let mut left = parse_expr_inner(inner.expect_next("binary chain operand")?)?;
2742
2743    for right_pair in inner {
2744        let right = parse_expr_inner(right_pair)?;
2745        left = Expr::Binary {
2746            op,
2747            left: Box::new(left),
2748            right: Box::new(right),
2749        };
2750    }
2751
2752    Ok(left)
2753}
2754
2755#[cfg(test)]
2756mod tests {
2757    use super::*;
2758
2759    #[test]
2760    fn test_parse_simple_stream() {
2761        let result = parse("stream output = input");
2762        assert!(result.is_ok(), "Failed: {:?}", result.err());
2763    }
2764
2765    #[test]
2766    fn test_parse_stream_with_filter() {
2767        let result = parse("stream output = input.where(value > 100)");
2768        assert!(result.is_ok(), "Failed: {:?}", result.err());
2769    }
2770
2771    #[test]
2772    fn test_parse_stream_with_map() {
2773        let result = parse("stream output = input.map(x * 2)");
2774        assert!(result.is_ok(), "Failed: {:?}", result.err());
2775    }
2776
2777    #[test]
2778    fn test_parse_event_declaration() {
2779        let result = parse("event SensorReading:\n  sensor_id: str\n  value: float");
2780        assert!(result.is_ok(), "Failed: {:?}", result.err());
2781    }
2782
2783    #[test]
2784    fn test_parse_variable() {
2785        let result = parse("let x = 42");
2786        assert!(result.is_ok(), "Failed: {:?}", result.err());
2787    }
2788
2789    #[test]
2790    fn test_parse_function() {
2791        let result = parse("fn add(a: int, b: int) -> int:\n  return a + b");
2792        assert!(result.is_ok(), "Failed: {:?}", result.err());
2793    }
2794
2795    #[test]
2796    fn test_parse_lambda() {
2797        let result = parse("let f = (x) => x * 2");
2798        assert!(result.is_ok(), "Failed: {:?}", result.err());
2799    }
2800
2801    #[test]
2802    fn test_parse_if_expression() {
2803        let result = parse("let x = if a > b then a else b");
2804        assert!(result.is_ok(), "Failed: {:?}", result.err());
2805    }
2806
2807    #[test]
2808    fn test_parse_followed_by() {
2809        let result = parse("stream alerts = orders.where(amount > 1000) -> Payment where payment.order_id == orders.id");
2810        assert!(result.is_ok(), "Failed: {:?}", result.err());
2811    }
2812
2813    #[test]
2814    fn test_parse_window() {
2815        let result = parse("stream windowed = input.window(5s)");
2816        assert!(result.is_ok(), "Failed: {:?}", result.err());
2817    }
2818
2819    #[test]
2820    fn test_parse_aggregate() {
2821        let result =
2822            parse("stream stats = input.window(1m).aggregate(count: count(), avg: avg(value))");
2823        assert!(result.is_ok(), "Failed: {:?}", result.err());
2824    }
2825
2826    #[test]
2827    fn test_parse_merge() {
2828        let result = parse("stream combined = merge(stream1, stream2)");
2829        assert!(result.is_ok(), "Failed: {:?}", result.err());
2830    }
2831
2832    #[test]
2833    fn test_parse_sequence() {
2834        let result = parse("stream seq = sequence(a: EventA, b: EventB where b.id == a.id)");
2835        assert!(result.is_ok(), "Failed: {:?}", result.err());
2836    }
2837
2838    #[test]
2839    fn test_parse_config() {
2840        let result = parse("config:\n  window_size: 5s\n  batch_size: 100");
2841        assert!(result.is_ok(), "Failed: {:?}", result.err());
2842    }
2843
2844    #[test]
2845    fn test_parse_complex_expression() {
2846        let result = parse("let x = (a + b) * c / d - e");
2847        assert!(result.is_ok(), "Failed: {:?}", result.err());
2848    }
2849
2850    #[test]
2851    fn test_parse_sliding_window() {
2852        let result = parse("stream output = input.window(5m, sliding: 1m)");
2853        assert!(result.is_ok(), "Failed: {:?}", result.err());
2854    }
2855
2856    #[test]
2857    fn test_parse_fork_construct() {
2858        let result =
2859            parse("stream forked = input.fork(branch1: .where(x > 0), branch2: .where(x < 0))");
2860        assert!(result.is_ok(), "Failed: {:?}", result.err());
2861    }
2862
2863    #[test]
2864    fn test_parse_aggregate_functions() {
2865        let result = parse(
2866            "stream stats = input.window(1h).aggregate(total: sum(value), average: avg(value))",
2867        );
2868        assert!(result.is_ok(), "Failed: {:?}", result.err());
2869    }
2870
2871    #[test]
2872    fn test_parse_complex_parentheses() {
2873        let result = parse("let x = ((a + b) * (c - d)) / e");
2874        assert!(result.is_ok(), "Failed: {:?}", result.err());
2875    }
2876
2877    #[test]
2878    fn test_parse_sequence_with_alias() {
2879        let result = parse(
2880            r#"
2881            stream TwoTicks = StockTick as first
2882                -> StockTick as second
2883                .emit(result: "two_ticks")
2884        "#,
2885        );
2886        assert!(result.is_ok(), "Failed: {:?}", result.err());
2887    }
2888
2889    #[test]
2890    fn test_parse_followed_by_with_alias() {
2891        let result = parse("stream alerts = Order as a -> Payment as b");
2892        assert!(result.is_ok(), "Failed: {:?}", result.err());
2893    }
2894
2895    #[test]
2896    fn test_parse_followed_by_with_filter_and_alias() {
2897        // This is the problematic case: 'as b' should be the alias, not part of the expression
2898        let result = parse(
2899            r#"
2900            stream Test = A as a
2901                -> B where value == a.base + 10 as b
2902                .emit(status: "matched")
2903        "#,
2904        );
2905        assert!(result.is_ok(), "Failed: {:?}", result.err());
2906    }
2907
2908    #[test]
2909    fn test_parse_pattern_with_lambda() {
2910        let result =
2911            parse("stream Test = Trade.window(1m).pattern(p: x => x.len() > 3).emit(alert: true)");
2912        assert!(result.is_ok(), "Failed: {:?}", result.err());
2913    }
2914
2915    #[test]
2916    fn test_parse_sase_pattern_decl_simple() {
2917        let result = parse("pattern SimpleAlert = SEQ(Login, Transaction)");
2918        assert!(result.is_ok(), "Failed: {:?}", result.err());
2919    }
2920
2921    #[test]
2922    fn test_parse_sase_pattern_decl_with_kleene() {
2923        let result = parse("pattern MultiTx = SEQ(Login, Transaction+ where amount > 1000)");
2924        assert!(result.is_ok(), "Failed: {:?}", result.err());
2925    }
2926
2927    #[test]
2928    fn test_parse_sase_pattern_decl_with_alias() {
2929        let result = parse("pattern AliasedPattern = SEQ(Login as login, Transaction as tx)");
2930        assert!(result.is_ok(), "Failed: {:?}", result.err());
2931    }
2932
2933    #[test]
2934    fn test_parse_sase_pattern_decl_with_within() {
2935        let result = parse("pattern TimedPattern = SEQ(A, B) within 10m");
2936        assert!(result.is_ok(), "Failed: {:?}", result.err());
2937    }
2938
2939    #[test]
2940    fn test_parse_sase_pattern_decl_with_partition() {
2941        let result = parse("pattern PartitionedPattern = SEQ(A, B) partition by user_id");
2942        assert!(result.is_ok(), "Failed: {:?}", result.err());
2943    }
2944
2945    #[test]
2946    fn test_parse_sase_pattern_decl_full() {
2947        // Note: In SASE+ syntax, 'where' comes before 'as' (filter then alias)
2948        let result = parse(
2949            "pattern SuspiciousActivity = SEQ(Transaction+ where amount > 1000 as txs) within 10m partition by user_id"
2950        );
2951        assert!(result.is_ok(), "Failed: {:?}", result.err());
2952    }
2953
2954    #[test]
2955    fn test_parse_sase_pattern_decl_or() {
2956        let result = parse("pattern AlertOrWarn = Login OR Logout");
2957        assert!(result.is_ok(), "Failed: {:?}", result.err());
2958    }
2959
2960    #[test]
2961    fn test_parse_sase_pattern_decl_and() {
2962        let result = parse("pattern BothEvents = Login AND Transaction");
2963        assert!(result.is_ok(), "Failed: {:?}", result.err());
2964    }
2965
2966    #[test]
2967    fn test_parse_sase_pattern_decl_not() {
2968        let result = parse("pattern NoLogout = SEQ(Login, NOT Logout, Transaction)");
2969        assert!(result.is_ok(), "Failed: {:?}", result.err());
2970    }
2971
2972    #[test]
2973    fn test_parse_having() {
2974        let result = parse(
2975            "stream filtered = input.window(1m).aggregate(count: count(), total: sum(value)).having(count > 10)",
2976        );
2977        assert!(result.is_ok(), "Failed: {:?}", result.err());
2978    }
2979
2980    #[test]
2981    fn test_parse_having_with_partition() {
2982        let result = parse(
2983            "stream grouped = input.partition_by(category).window(5m).aggregate(avg_price: avg(price)).having(avg_price > 100.0)",
2984        );
2985        assert!(result.is_ok(), "Failed: {:?}", result.err());
2986    }
2987
2988    #[test]
2989    fn test_parse_timer_source() {
2990        let result = parse("stream heartbeat = timer(5s).emit(type: \"heartbeat\")");
2991        assert!(result.is_ok(), "Failed: {:?}", result.err());
2992    }
2993
2994    #[test]
2995    fn test_parse_timer_source_with_initial_delay() {
2996        let result =
2997            parse("stream delayed_timer = timer(1m, initial_delay: 10s).emit(type: \"periodic\")");
2998        assert!(result.is_ok(), "Failed: {:?}", result.err());
2999    }
3000
3001    #[test]
3002    fn test_parse_var_decl() {
3003        let result = parse("var threshold: float = 10.0");
3004        assert!(result.is_ok(), "Failed: {:?}", result.err());
3005    }
3006
3007    #[test]
3008    fn test_parse_let_decl() {
3009        let result = parse("let max_count: int = 100");
3010        assert!(result.is_ok(), "Failed: {:?}", result.err());
3011    }
3012
3013    #[test]
3014    fn test_parse_assignment() {
3015        let result = parse("threshold := threshold + 10.0");
3016        assert!(result.is_ok(), "Failed: {:?}", result.err());
3017    }
3018
3019    #[test]
3020    fn test_parse_assignment_with_expression() {
3021        let result = parse("count := count * 2 + offset");
3022        assert!(result.is_ok(), "Failed: {:?}", result.err());
3023    }
3024
3025    #[test]
3026    fn test_parse_nested_stream_reference() {
3027        let result = parse("stream Base = Event\nstream Derived = Base.where(x > 0)");
3028        assert!(result.is_ok(), "Failed: {:?}", result.err());
3029    }
3030
3031    #[test]
3032    fn test_parse_multi_stage_pipeline() {
3033        let result = parse(
3034            "stream L1 = Raw\nstream L2 = L1.where(a > 1)\nstream L3 = L2.window(5).aggregate(cnt: count())",
3035        );
3036        assert!(result.is_ok(), "Failed: {:?}", result.err());
3037    }
3038
3039    #[test]
3040    fn test_parse_stream_with_operations_chain() {
3041        let result = parse(
3042            "stream Processed = Source.where(valid).window(10).aggregate(sum: sum(value)).having(sum > 100)",
3043        );
3044        assert!(result.is_ok(), "Failed: {:?}", result.err());
3045    }
3046
3047    // =========================================================================
3048    // Connectivity Architecture Tests
3049    // =========================================================================
3050
3051    #[test]
3052    fn test_parse_connector_mqtt() {
3053        let result = parse(
3054            r#"connector MqttBroker = mqtt (
3055                host: "localhost",
3056                port: 1883,
3057                client_id: "varpulis"
3058            )"#,
3059        );
3060        assert!(result.is_ok(), "Failed: {:?}", result.err());
3061    }
3062
3063    #[test]
3064    fn test_parse_connector_kafka() {
3065        let result = parse(
3066            r#"connector KafkaCluster = kafka (
3067                brokers: ["kafka1:9092", "kafka2:9092"],
3068                group_id: "my-group"
3069            )"#,
3070        );
3071        assert!(result.is_ok(), "Failed: {:?}", result.err());
3072    }
3073
3074    #[test]
3075    fn test_parse_connector_http() {
3076        let result = parse(
3077            r#"connector ApiEndpoint = http (
3078                base_url: "https://api.example.com"
3079            )"#,
3080        );
3081        assert!(result.is_ok(), "Failed: {:?}", result.err());
3082    }
3083
3084    #[test]
3085    fn test_parse_stream_with_from_connector() {
3086        let result = parse(
3087            r#"stream Temperatures = TemperatureReading.from(MqttSensors, topic: "sensors/temp/#")"#,
3088        );
3089        assert!(result.is_ok(), "Failed: {:?}", result.err());
3090    }
3091
3092    #[test]
3093    fn test_parse_stream_with_from_and_operations() {
3094        let result = parse(
3095            r#"stream HighTemp = TemperatureReading
3096                .from(MqttSensors, topic: "sensors/#")
3097                .where(value > 30)
3098                .emit(alert: "high_temp")"#,
3099        );
3100        assert!(result.is_ok(), "Failed: {:?}", result.err());
3101    }
3102
3103    #[test]
3104    fn test_parse_full_connectivity_pipeline() {
3105        let result = parse(
3106            r#"
3107            connector MqttSensors = mqtt (host: "localhost", port: 1883)
3108            connector KafkaAlerts = kafka (brokers: ["kafka:9092"])
3109
3110            event TemperatureReading:
3111                sensor_id: str
3112                value: float
3113                ts: timestamp
3114
3115            stream Temperatures = TemperatureReading.from(MqttSensors, topic: "sensors/#")
3116
3117            stream HighTempAlert = Temperatures
3118                .where(value > 30)
3119                .emit(alert_type: "HIGH_TEMP", temperature: value)
3120
3121            "#,
3122        );
3123        assert!(result.is_ok(), "Failed: {:?}", result.err());
3124    }
3125
3126    #[test]
3127    fn test_parse_emit_as_type() {
3128        let result = parse(
3129            r#"stream Alerts = Temperatures
3130                .where(value > 30)
3131                .emit as AlertEvent(severity: "high", temp: value)"#,
3132        );
3133        assert!(result.is_ok(), "Failed: {:?}", result.err());
3134    }
3135
3136    #[test]
3137    fn test_parse_stream_with_to_connector() {
3138        let result = parse(
3139            r#"stream Output = Input
3140                .where(x > 0)
3141                .emit(y: x * 2)
3142                .to(KafkaOutput, topic: "output")"#,
3143        );
3144        assert!(result.is_ok(), "Failed: {:?}", result.err());
3145    }
3146
3147    #[test]
3148    fn test_emit_stmt_parses() {
3149        let result = parse(
3150            r"fn test():
3151    emit Pixel(x: 1, y: 2)",
3152        );
3153        assert!(result.is_ok(), "Failed: {:?}", result.err());
3154        let program = result.unwrap();
3155        // Find the fn_decl
3156        if let Stmt::FnDecl { body, .. } = &program.statements[0].node {
3157            match &body[0].node {
3158                Stmt::Emit { event_type, fields } => {
3159                    assert_eq!(event_type, "Pixel");
3160                    assert_eq!(fields.len(), 2);
3161                    assert_eq!(fields[0].name, "x");
3162                    assert_eq!(fields[1].name, "y");
3163                }
3164                other => panic!("Expected Stmt::Emit, got {other:?}"),
3165            }
3166        } else {
3167            panic!("Expected FnDecl");
3168        }
3169    }
3170
3171    #[test]
3172    fn test_emit_stmt_no_args() {
3173        let result = parse(
3174            r"fn test():
3175    emit Done()",
3176        );
3177        assert!(result.is_ok(), "Failed: {:?}", result.err());
3178        let program = result.unwrap();
3179        if let Stmt::FnDecl { body, .. } = &program.statements[0].node {
3180            match &body[0].node {
3181                Stmt::Emit { event_type, fields } => {
3182                    assert_eq!(event_type, "Done");
3183                    assert!(fields.is_empty());
3184                }
3185                other => panic!("Expected Stmt::Emit, got {other:?}"),
3186            }
3187        } else {
3188            panic!("Expected FnDecl");
3189        }
3190    }
3191
3192    #[test]
3193    fn test_emit_in_function_with_for_loop() {
3194        let result = parse(
3195            r"fn generate(n: int):
3196    for i in 0..n:
3197        emit Item(index: i, value: i * 2)",
3198        );
3199        assert!(result.is_ok(), "Failed: {:?}", result.err());
3200    }
3201
3202    #[test]
3203    fn test_parse_process_op() {
3204        let result = parse(
3205            r"fn do_work():
3206    emit Result(v: 42)
3207
3208stream S = timer(1s).process(do_work())",
3209        );
3210        assert!(result.is_ok(), "Failed: {:?}", result.err());
3211    }
3212
3213    #[test]
3214    fn test_parse_trend_aggregate_count_trends() {
3215        let result = parse(
3216            r#"stream S = StockTick as first
3217    -> all StockTick where price > first.price as rising
3218    -> StockTick where price < rising.price as drop
3219    .within(60s)
3220    .trend_aggregate(count: count_trends())
3221    .emit(event_type: "TrendStats", trends: count)"#,
3222        );
3223        assert!(result.is_ok(), "Failed: {:?}", result.err());
3224        let program = result.unwrap();
3225        // Find the stream declaration and check for TrendAggregate op
3226        for stmt in &program.statements {
3227            if let Stmt::StreamDecl { ops, .. } = &stmt.node {
3228                let has_trend_agg = ops
3229                    .iter()
3230                    .any(|op| matches!(op, StreamOp::TrendAggregate(_)));
3231                assert!(has_trend_agg, "Expected TrendAggregate op in stream ops");
3232                // Check that TrendAggregate has the right item
3233                for op in ops {
3234                    if let StreamOp::TrendAggregate(items) = op {
3235                        assert_eq!(items.len(), 1);
3236                        assert_eq!(items[0].alias, "count");
3237                        assert_eq!(items[0].func, "count_trends");
3238                        assert!(items[0].arg.is_none());
3239                    }
3240                }
3241                return;
3242            }
3243        }
3244        panic!("No stream declaration found");
3245    }
3246
3247    #[test]
3248    fn test_parse_trend_aggregate_multiple_items() {
3249        let result = parse(
3250            r"stream S = StockTick as first
3251    -> all StockTick as rising
3252    .within(60s)
3253    .trend_aggregate(
3254        trend_count: count_trends(),
3255        event_count: count_events(rising)
3256    )
3257    .emit(trends: trend_count, events: event_count)",
3258        );
3259        assert!(result.is_ok(), "Failed: {:?}", result.err());
3260        let program = result.unwrap();
3261        for stmt in &program.statements {
3262            if let Stmt::StreamDecl { ops, .. } = &stmt.node {
3263                for op in ops {
3264                    if let StreamOp::TrendAggregate(items) = op {
3265                        assert_eq!(items.len(), 2);
3266                        assert_eq!(items[0].alias, "trend_count");
3267                        assert_eq!(items[0].func, "count_trends");
3268                        assert_eq!(items[1].alias, "event_count");
3269                        assert_eq!(items[1].func, "count_events");
3270                        assert!(items[1].arg.is_some());
3271                        return;
3272                    }
3273                }
3274            }
3275        }
3276        panic!("No TrendAggregate found");
3277    }
3278
3279    #[test]
3280    fn test_parse_score_basic() {
3281        let result = parse(
3282            r#"stream S = TradeEvent
3283    .score(model: "models/fraud.onnx", inputs: [amount, risk_score], outputs: [fraud_prob, category])"#,
3284        );
3285        assert!(result.is_ok(), "Failed: {:?}", result.err());
3286        let program = result.unwrap();
3287        for stmt in &program.statements {
3288            if let Stmt::StreamDecl { ops, .. } = &stmt.node {
3289                for op in ops {
3290                    if let StreamOp::Score(spec) = op {
3291                        assert_eq!(spec.model_path, "models/fraud.onnx");
3292                        assert_eq!(spec.inputs, vec!["amount", "risk_score"]);
3293                        assert_eq!(spec.outputs, vec!["fraud_prob", "category"]);
3294                        return;
3295                    }
3296                }
3297            }
3298        }
3299        panic!("No Score op found");
3300    }
3301
3302    #[test]
3303    fn test_parse_score_single_field() {
3304        let result = parse(
3305            r#"stream S = Event
3306    .score(model: "model.onnx", inputs: [value], outputs: [prediction])"#,
3307        );
3308        assert!(result.is_ok(), "Failed: {:?}", result.err());
3309        let program = result.unwrap();
3310        for stmt in &program.statements {
3311            if let Stmt::StreamDecl { ops, .. } = &stmt.node {
3312                for op in ops {
3313                    if let StreamOp::Score(spec) = op {
3314                        assert_eq!(spec.model_path, "model.onnx");
3315                        assert_eq!(spec.inputs, vec!["value"]);
3316                        assert_eq!(spec.outputs, vec!["prediction"]);
3317                        return;
3318                    }
3319                }
3320            }
3321        }
3322        panic!("No Score op found");
3323    }
3324
3325    // Regression tests for fuzz-discovered parser hangs (2026-02-21).
3326    // Unmatched `[` brackets cause exponential backtracking in pest's PEG
3327    // recursive descent through array_literal / index_access / slice_access.
3328
3329    #[test]
3330    fn fuzz_regression_unmatched_brackets_timeout() {
3331        // Simplified version of the fuzzer-discovered timeout input (28 unmatched '[')
3332        let input = "c2222222s[s[22s[U2s[U6[U6[22222222s[s[22s[U2s[U6[U6[222*2222s[U6[U6[222*2222s[22s[U6[U6[22*2222s[U6[U6[222*2222s[22s[U6[U6[222*26[U6[222*2";
3333        let start = std::time::Instant::now();
3334        let result = parse(input);
3335        let elapsed = start.elapsed();
3336        assert!(
3337            result.is_err(),
3338            "Should reject deeply nested unmatched brackets"
3339        );
3340        assert!(
3341            elapsed.as_millis() < 100,
3342            "Parser should reject fast, took {elapsed:?}"
3343        );
3344    }
3345
3346    #[test]
3347    fn fuzz_regression_deeply_nested_brackets_slow_unit() {
3348        // 30 unmatched '[' — must be rejected by nesting depth check
3349        let input = "stream x[[[[[[[[[[[[[[[[[[[[[[[[[[[[[[[";
3350        let start = std::time::Instant::now();
3351        let result = parse(input);
3352        let elapsed = start.elapsed();
3353        assert!(result.is_err(), "Should reject deeply nested brackets");
3354        assert!(
3355            elapsed.as_millis() < 100,
3356            "Parser should reject fast, took {elapsed:?}"
3357        );
3358    }
3359
3360    #[test]
3361    fn nesting_depth_allows_reasonable_programs() {
3362        // 10 levels of nesting — well within the limit
3363        let input = "let x = foo(bar(baz(qux(a, [1, [2, [3, [4]]]]))))";
3364        let result = parse(input);
3365        // May fail for other reasons (not a valid program) but should NOT
3366        // fail with "Nesting depth exceeds maximum"
3367        if let Err(ref e) = result {
3368            let msg = format!("{e}");
3369            assert!(
3370                !msg.contains("Nesting depth"),
3371                "Should allow 10 levels of nesting: {msg}"
3372            );
3373        }
3374    }
3375
3376    #[test]
3377    fn nesting_depth_ignores_brackets_in_comments() {
3378        // Brackets inside # comments should not count
3379        let input = "# [[[[[[[[[[[[[[[[[[[[[[[[[[\nstream x = y";
3380        let result = parse(input);
3381        assert!(
3382            result.is_ok(),
3383            "Brackets in comments should be ignored: {:?}",
3384            result.err()
3385        );
3386    }
3387
3388    #[test]
3389    fn nesting_depth_ignores_brackets_in_strings() {
3390        // Brackets inside strings should not count
3391        let input = r#"let x = "[[[[[[[[[[[[[[[[[[[[[[[[[[""#;
3392        let result = parse(input);
3393        if let Err(ref e) = result {
3394            let msg = format!("{e}");
3395            assert!(
3396                !msg.contains("Nesting depth"),
3397                "Brackets in strings should be ignored: {msg}"
3398            );
3399        }
3400    }
3401}