Skip to main content

varpulis_parser/
pest_parser.rs

1//! Pest-based parser for VPL
2//!
3//! This module provides parsing using the pest PEG parser generator.
4//!
5//! The `Rule` enum and its variants are auto-generated by `pest_derive`
6//! from the grammar file and cannot carry doc comments.
7
8use pest::Parser;
9use pest_derive::Parser;
10use varpulis_core::ast::*;
11use varpulis_core::span::{Span, Spanned};
12use varpulis_core::types::Type;
13
14use crate::error::{ParseError, ParseResult};
15use crate::helpers::{parse_duration, parse_timestamp};
16use crate::indent::preprocess_indentation;
17
18/// Extension trait for safer iterator extraction
19trait IteratorExt<'a> {
20    /// Get the next element or return an error with the expected rule description
21    fn expect_next(&mut self, expected: &str) -> ParseResult<pest::iterators::Pair<'a, Rule>>;
22}
23
24impl<'a> IteratorExt<'a> for pest::iterators::Pairs<'a, Rule> {
25    fn expect_next(&mut self, expected: &str) -> ParseResult<pest::iterators::Pair<'a, Rule>> {
26        self.next().ok_or_else(|| ParseError::Located {
27            line: 0,
28            column: 0,
29            position: 0,
30            message: format!("Expected {expected}"),
31            hint: None,
32        })
33    }
34}
35
36/// Pest-based VPL parser.
37///
38/// The grammar rules are auto-generated from `varpulis.pest` by the
39/// `pest_derive` macro. Use [`parse`] instead of calling this directly.
40#[derive(Debug, Parser)]
41#[grammar = "varpulis.pest"]
42pub struct VarpulisParser;
43
44/// Maximum wall-clock time the parser is allowed to run before being aborted.
45///
46/// PEG recursive-descent parsers can exhibit exponential backtracking on
47/// adversarial inputs even after bracket-depth and unmatched-bracket pre-scans.
48/// A hard timeout protects callers (CLI, LSP, fuzz targets) from hangs.
49/// 10 seconds is orders of magnitude more than any real VPL program needs
50/// (typical parse time is <5 ms for a 1000-line file).
51const PARSE_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);
52
53/// Parse a VPL source string into a Program AST.
54///
55/// On native targets: runs pest parsing in a dedicated thread with a 16 MB
56/// stack and a wall-clock timeout to guard against stack overflow and
57/// exponential backtracking on adversarial inputs.
58///
59/// On WASM: calls the parser directly (no threads, no timeout).
60#[cfg(not(target_arch = "wasm32"))]
61pub fn parse(source: &str) -> ParseResult<Program> {
62    let source = source.to_string();
63    let handle = std::thread::Builder::new()
64        .stack_size(16 * 1024 * 1024)
65        .spawn(move || parse_inner(&source))
66        .map_err(|e| ParseError::InvalidToken {
67            position: 0,
68            message: format!("Failed to spawn parser thread: {e}"),
69        })?;
70
71    // Park the current thread until the parser finishes or the timeout fires.
72    let deadline = std::time::Instant::now() + PARSE_TIMEOUT;
73    loop {
74        if handle.is_finished() {
75            return handle.join().unwrap_or_else(|_| {
76                Err(ParseError::InvalidToken {
77                    position: 0,
78                    message: "Parser stack overflow on deeply nested input".to_string(),
79                })
80            });
81        }
82        let remaining = deadline.saturating_duration_since(std::time::Instant::now());
83        if remaining.is_zero() {
84            return Err(ParseError::InvalidToken {
85                position: 0,
86                message: format!(
87                    "Parser timed out after {}s on pathological input",
88                    PARSE_TIMEOUT.as_secs()
89                ),
90            });
91        }
92        std::thread::sleep(std::time::Duration::from_millis(5).min(remaining));
93    }
94}
95
96/// WASM fallback: parse directly without thread spawning or timeout.
97#[cfg(target_arch = "wasm32")]
98pub fn parse(source: &str) -> ParseResult<Program> {
99    parse_inner(source)
100}
101
102/// Maximum bracket nesting depth allowed before pest parsing.
103///
104/// PEG recursive descent can cause exponential backtracking when unmatched
105/// brackets create ambiguity between array_literal, index_access, and
106/// slice_access rules.  Measured scaling is O(2.35^depth): depth 20 takes
107/// 1200s+, depth 16 takes 39s, depth 10 takes under 0.3s.  10 levels is
108/// generous for real VPL programs (typical nesting is 3-6 levels, extreme
109/// real-world is about 8) while keeping worst-case parse time under 1 second.
110const MAX_NESTING_DEPTH: usize = 10;
111
112/// Maximum number of unmatched open brackets allowed.
113///
114/// Even when nesting depth stays within limits, many unmatched open brackets
115/// at different positions (e.g. `a[b[c[d[` with no closing `]`) cause
116/// combinatorial explosion as pest tries each `[` as a potential array_literal,
117/// index_access, or slice_access start. 6 is generous for real VPL programs
118/// (unmatched brackets are always parse errors anyway).
119const MAX_UNMATCHED_OPEN_BRACKETS: usize = 6;
120
121/// O(n) pre-scan that rejects inputs with bracket nesting deeper than
122/// `MAX_NESTING_DEPTH` or too many unmatched open brackets. Respects string
123/// literals and comments so that brackets inside `"..."`, `# ...`, or
124/// `/* ... */` are ignored.
125fn check_nesting_depth(source: &str) -> ParseResult<()> {
126    let mut depth: usize = 0;
127    let mut max_depth: usize = 0;
128    let mut max_depth_pos: usize = 0;
129    let bytes = source.as_bytes();
130    let len = bytes.len();
131    let mut i = 0;
132
133    // Per-bracket-type counters for unmatched detection
134    let mut paren_open: usize = 0;
135    let mut paren_close: usize = 0;
136    let mut square_open: usize = 0;
137    let mut square_close: usize = 0;
138    let mut curly_open: usize = 0;
139    let mut curly_close: usize = 0;
140
141    while i < len {
142        let b = bytes[i];
143
144        // Skip double-quoted strings
145        if b == b'"' {
146            i += 1;
147            while i < len {
148                if bytes[i] == b'\\' {
149                    i += 2; // skip escaped char
150                    continue;
151                }
152                if bytes[i] == b'"' {
153                    i += 1;
154                    break;
155                }
156                i += 1;
157            }
158            continue;
159        }
160
161        // Skip VPL line comments (# to end of line)
162        if b == b'#' {
163            i += 1;
164            while i < len && bytes[i] != b'\n' {
165                i += 1;
166            }
167            continue;
168        }
169
170        // Skip block comments
171        if b == b'/' && i + 1 < len && bytes[i + 1] == b'*' {
172            i += 2;
173            while i + 1 < len {
174                if bytes[i] == b'*' && bytes[i + 1] == b'/' {
175                    i += 2;
176                    break;
177                }
178                i += 1;
179            }
180            continue;
181        }
182
183        // Track bracket depth
184        match b {
185            b'(' => {
186                depth += 1;
187                paren_open += 1;
188            }
189            b'[' => {
190                depth += 1;
191                square_open += 1;
192            }
193            b'{' => {
194                depth += 1;
195                curly_open += 1;
196            }
197            b')' => {
198                depth = depth.saturating_sub(1);
199                paren_close += 1;
200            }
201            b']' => {
202                depth = depth.saturating_sub(1);
203                square_close += 1;
204            }
205            b'}' => {
206                depth = depth.saturating_sub(1);
207                curly_close += 1;
208            }
209            _ => {}
210        }
211
212        if depth > max_depth {
213            max_depth = depth;
214            max_depth_pos = i;
215        }
216
217        if max_depth > MAX_NESTING_DEPTH {
218            return Err(ParseError::InvalidToken {
219                position: max_depth_pos,
220                message: format!("Nesting depth exceeds maximum of {MAX_NESTING_DEPTH} levels"),
221            });
222        }
223
224        i += 1;
225    }
226
227    // Reject inputs with too many unmatched open brackets.
228    // Unmatched brackets are always parse errors, but they cause exponential
229    // backtracking in pest before it can report the error.
230    let unmatched = paren_open.saturating_sub(paren_close)
231        + square_open.saturating_sub(square_close)
232        + curly_open.saturating_sub(curly_close);
233    if unmatched > MAX_UNMATCHED_OPEN_BRACKETS {
234        return Err(ParseError::InvalidToken {
235            position: max_depth_pos,
236            message: format!(
237                "Too many unmatched open brackets ({unmatched}); maximum is {MAX_UNMATCHED_OPEN_BRACKETS}"
238            ),
239        });
240    }
241
242    Ok(())
243}
244
245fn parse_inner(source: &str) -> ParseResult<Program> {
246    // Expand compile-time declaration loops (top-level for with {var} interpolation)
247    let expanded =
248        crate::expand::expand_declaration_loops(source).map_err(|e| ParseError::InvalidToken {
249            position: 0,
250            message: e,
251        })?;
252    // Preprocess to add INDENT/DEDENT markers
253    let preprocessed = preprocess_indentation(&expanded);
254
255    // Reject deeply nested input before pest parsing to prevent stack overflow
256    check_nesting_depth(&preprocessed)?;
257
258    let pairs = VarpulisParser::parse(Rule::program, &preprocessed).map_err(convert_pest_error)?;
259
260    let mut statements = Vec::new();
261
262    for pair in pairs {
263        if pair.as_rule() == Rule::program {
264            for inner in pair.into_inner() {
265                if inner.as_rule() == Rule::statement {
266                    statements.push(parse_statement(inner)?);
267                }
268            }
269        }
270    }
271
272    Ok(crate::optimize::fold_program(Program { statements }))
273}
274
275fn convert_pest_error(e: pest::error::Error<Rule>) -> ParseError {
276    let position = match e.location {
277        pest::error::InputLocation::Pos(p) => p,
278        pest::error::InputLocation::Span((s, _)) => s,
279    };
280
281    // Extract line/column from pest error
282    let (line, column) = match e.line_col {
283        pest::error::LineColLocation::Pos((l, c)) => (l, c),
284        pest::error::LineColLocation::Span((l, c), _) => (l, c),
285    };
286
287    // Create a human-readable message based on what was expected
288    let (message, hint) = match &e.variant {
289        pest::error::ErrorVariant::ParsingError {
290            positives,
291            negatives: _,
292        } => {
293            if positives.is_empty() {
294                ("Unexpected token".to_string(), None)
295            } else if is_stream_op_error(positives) {
296                // All positives are stream operations — produce a concise error
297                (
298                    "unknown stream operation".to_string(),
299                    Some(
300                        "valid operations: .where(), .select(), .emit(), .window(), .aggregate(), \
301                         .partition_by(), .within(), .having(), .to(), .context(), .log(), .print(), \
302                         .enrich(), .forecast(), .trend_aggregate(), .watermark(), .tap()"
303                            .to_string(),
304                    ),
305                )
306            } else {
307                let expected: Vec<String> = positives.iter().map(format_rule_name).collect();
308                if expected.len() == 1 {
309                    (format!("Expected {}", expected[0]), None)
310                } else {
311                    (format!("Expected one of: {}", expected.join(", ")), None)
312                }
313            }
314        }
315        pest::error::ErrorVariant::CustomError { message } => (message.clone(), None),
316    };
317
318    ParseError::Located {
319        line,
320        column,
321        position,
322        message,
323        hint,
324    }
325}
326
327/// Convert pest Rule names to human-readable format
328fn format_rule_name(rule: &Rule) -> String {
329    match rule {
330        Rule::identifier => "identifier".to_string(),
331        Rule::integer => "number".to_string(),
332        Rule::float => "number".to_string(),
333        Rule::string => "string".to_string(),
334        Rule::primitive_type => "type (int, float, bool, str, timestamp, duration)".to_string(),
335        Rule::type_expr => "type".to_string(),
336        Rule::expr => "expression".to_string(),
337        Rule::statement => "statement".to_string(),
338        Rule::context_decl => "context declaration".to_string(),
339        Rule::stream_decl => "stream declaration".to_string(),
340        Rule::pattern_decl => "pattern declaration".to_string(),
341        Rule::event_decl => "event declaration".to_string(),
342        Rule::fn_decl => "function declaration".to_string(),
343        Rule::INDENT => "indented block".to_string(),
344        Rule::DEDENT => "end of block".to_string(),
345        Rule::field => "field declaration (name: type)".to_string(),
346        Rule::comparison_op => "comparison operator (==, !=, <, >, <=, >=)".to_string(),
347        Rule::additive_op => "operator (+, -)".to_string(),
348        Rule::multiplicative_op => "operator (*, /, %)".to_string(),
349        Rule::postfix_suffix => "method call or member access".to_string(),
350        Rule::sase_pattern_expr => "SASE pattern expression".to_string(),
351        Rule::sase_seq_expr => "SEQ expression".to_string(),
352        Rule::kleene_op => "Kleene operator (+, *, ?)".to_string(),
353        _ => format!("{rule:?}").to_lowercase().replace('_', " "),
354    }
355}
356
357/// Returns true when all positives look like stream operation rules.
358/// This is used to produce a concise "unknown stream operation" error
359/// instead of listing 30+ individual operation names.
360fn is_stream_op_error(positives: &[Rule]) -> bool {
361    const STREAM_OP_RULES: &[Rule] = &[
362        Rule::context_op,
363        Rule::where_op,
364        Rule::select_op,
365        Rule::window_op,
366        Rule::aggregate_op,
367        Rule::having_op,
368        Rule::partition_by_op,
369        Rule::order_by_op,
370        Rule::limit_op,
371        Rule::distinct_op,
372        Rule::map_op,
373        Rule::filter_op,
374        Rule::tap_op,
375        Rule::print_op,
376        Rule::log_op,
377        Rule::emit_op,
378        Rule::to_op,
379        Rule::pattern_op,
380        Rule::concurrent_op,
381        Rule::process_op,
382        Rule::on_error_op,
383        Rule::collect_op,
384        Rule::on_op,
385        Rule::within_op,
386        Rule::not_op,
387        Rule::fork_op,
388        Rule::any_op,
389        Rule::all_op,
390        Rule::first_op,
391        Rule::watermark_op,
392        Rule::allowed_lateness_op,
393        Rule::trend_aggregate_op,
394        Rule::score_op,
395        Rule::forecast_op,
396        Rule::enrich_op,
397        Rule::alert_op,
398    ];
399    positives.len() >= 10 && positives.iter().all(|r| STREAM_OP_RULES.contains(r))
400}
401
402fn parse_statement(pair: pest::iterators::Pair<Rule>) -> ParseResult<Spanned<Stmt>> {
403    let span = Span::new(pair.as_span().start(), pair.as_span().end());
404    let inner = pair.into_inner().expect_next("statement body")?;
405
406    let stmt = match inner.as_rule() {
407        Rule::context_decl => parse_context_decl(inner)?,
408        Rule::connector_decl => parse_connector_decl(inner)?,
409        Rule::stream_decl => parse_stream_decl(inner)?,
410        Rule::pattern_decl => parse_pattern_decl(inner)?,
411        Rule::event_decl => parse_event_decl(inner)?,
412        Rule::type_decl => parse_type_decl(inner)?,
413        Rule::var_decl => parse_var_decl(inner)?,
414        Rule::const_decl => parse_const_decl(inner)?,
415        Rule::fn_decl => parse_fn_decl(inner)?,
416        Rule::config_block => parse_config_block(inner)?,
417        Rule::import_stmt => parse_import_stmt(inner)?,
418        Rule::if_stmt => parse_if_stmt(inner)?,
419        Rule::for_stmt => parse_for_stmt(inner)?,
420        Rule::while_stmt => parse_while_stmt(inner)?,
421        Rule::return_stmt => parse_return_stmt(inner)?,
422        Rule::break_stmt => Stmt::Break,
423        Rule::continue_stmt => Stmt::Continue,
424        Rule::emit_stmt => parse_emit_stmt(inner)?,
425        Rule::assignment_stmt => {
426            let mut inner = inner.into_inner();
427            let name = inner.expect_next("variable name")?.as_str().to_string();
428            let value = parse_expr(inner.expect_next("assignment value")?)?;
429            Stmt::Assignment { name, value }
430        }
431        Rule::expr_stmt => Stmt::Expr(parse_expr(inner.into_inner().expect_next("expression")?)?),
432        _ => {
433            return Err(ParseError::UnexpectedToken {
434                position: span.start,
435                expected: "statement".to_string(),
436                found: format!("{:?}", inner.as_rule()),
437            })
438        }
439    };
440
441    Ok(Spanned::new(stmt, span))
442}
443
444// ============================================================================
445// Context Declaration Parsing
446// ============================================================================
447
448fn parse_context_decl(pair: pest::iterators::Pair<Rule>) -> ParseResult<Stmt> {
449    let mut inner = pair.into_inner();
450    let name = inner.expect_next("context name")?.as_str().to_string();
451    let mut cores = None;
452
453    for p in inner {
454        if p.as_rule() == Rule::context_params {
455            for param in p.into_inner() {
456                if param.as_rule() == Rule::context_param {
457                    // Parse cores: [0, 1, 2]
458                    let core_ids: Vec<usize> = param
459                        .into_inner()
460                        .filter(|p| p.as_rule() == Rule::integer)
461                        .map(|p| p.as_str().parse::<usize>().unwrap_or(0))
462                        .collect();
463                    cores = Some(core_ids);
464                }
465            }
466        }
467    }
468
469    Ok(Stmt::ContextDecl { name, cores })
470}
471
472// ============================================================================
473// Connector Parsing
474// ============================================================================
475
476fn parse_connector_decl(pair: pest::iterators::Pair<Rule>) -> ParseResult<Stmt> {
477    let mut inner = pair.into_inner();
478    let name = inner.expect_next("connector name")?.as_str().to_string();
479    let connector_type = inner.expect_next("connector type")?.as_str().to_string();
480    let mut params = Vec::new();
481
482    for p in inner {
483        if p.as_rule() == Rule::connector_params {
484            params = parse_connector_params(p)?;
485        }
486    }
487
488    Ok(Stmt::ConnectorDecl {
489        name,
490        connector_type,
491        params,
492    })
493}
494
495fn parse_connector_params(pair: pest::iterators::Pair<Rule>) -> ParseResult<Vec<ConnectorParam>> {
496    let mut params = Vec::new();
497    for p in pair.into_inner() {
498        if p.as_rule() == Rule::connector_param {
499            let mut inner = p.into_inner();
500            let name = inner.expect_next("param name")?.as_str().to_string();
501            let value_pair = inner.expect_next("param value")?;
502            let value = parse_config_value(value_pair)?;
503            params.push(ConnectorParam { name, value });
504        }
505    }
506    Ok(params)
507}
508
509fn parse_stream_decl(pair: pest::iterators::Pair<Rule>) -> ParseResult<Stmt> {
510    let mut inner = pair.into_inner();
511    let name = inner.expect_next("stream name")?.as_str().to_string();
512
513    let mut type_annotation = None;
514    let mut source = StreamSource::Ident(String::new());
515    let mut ops = Vec::new();
516    let mut op_spans = Vec::new();
517
518    for p in inner {
519        match p.as_rule() {
520            Rule::type_annotation => {
521                type_annotation = Some(parse_type(p.into_inner().expect_next("type")?)?);
522            }
523            Rule::stream_expr => {
524                let (s, o, spans) = parse_stream_expr(p)?;
525                source = s;
526                ops = o;
527                op_spans = spans;
528            }
529            _ => {}
530        }
531    }
532
533    Ok(Stmt::StreamDecl {
534        name,
535        type_annotation,
536        source,
537        ops,
538        op_spans,
539    })
540}
541
542fn parse_stream_expr(
543    pair: pest::iterators::Pair<Rule>,
544) -> ParseResult<(StreamSource, Vec<StreamOp>, Vec<varpulis_core::span::Span>)> {
545    let mut inner = pair.into_inner();
546    let source = parse_stream_source(inner.expect_next("stream source")?)?;
547    let mut ops = Vec::new();
548    let mut op_spans = Vec::new();
549
550    for p in inner {
551        if p.as_rule() == Rule::stream_op {
552            let pest_span = p.as_span();
553            let span = varpulis_core::span::Span::new(pest_span.start(), pest_span.end());
554            ops.push(parse_stream_op(p)?);
555            op_spans.push(span);
556        }
557    }
558
559    Ok((source, ops, op_spans))
560}
561
562// ============================================================================
563// SASE+ Pattern Declaration Parsing
564// ============================================================================
565
566fn parse_pattern_decl(pair: pest::iterators::Pair<Rule>) -> ParseResult<Stmt> {
567    let mut inner = pair.into_inner();
568    let name = inner.expect_next("pattern name")?.as_str().to_string();
569
570    let mut expr = SasePatternExpr::Event(String::new());
571    let mut within = None;
572    let mut partition_by = None;
573
574    for p in inner {
575        match p.as_rule() {
576            Rule::sase_pattern_expr => {
577                expr = parse_sase_pattern_expr(p)?;
578            }
579            Rule::pattern_within_clause => {
580                let dur_pair = p.into_inner().expect_next("within duration")?;
581                within = Some(Expr::Duration(
582                    parse_duration(dur_pair.as_str()).map_err(ParseError::InvalidDuration)?,
583                ));
584            }
585            Rule::pattern_partition_clause => {
586                let key = p
587                    .into_inner()
588                    .expect_next("partition key")?
589                    .as_str()
590                    .to_string();
591                partition_by = Some(Expr::Ident(key));
592            }
593            _ => {}
594        }
595    }
596
597    Ok(Stmt::PatternDecl {
598        name,
599        expr,
600        within,
601        partition_by,
602    })
603}
604
605fn parse_sase_pattern_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<SasePatternExpr> {
606    let inner = pair.into_inner().expect_next("SASE pattern expression")?;
607    parse_sase_or_expr(inner)
608}
609
610fn parse_sase_or_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<SasePatternExpr> {
611    let mut inner = pair.into_inner();
612    let mut left = parse_sase_and_expr(inner.expect_next("OR expression operand")?)?;
613
614    for right_pair in inner {
615        let right = parse_sase_and_expr(right_pair)?;
616        left = SasePatternExpr::Or(Box::new(left), Box::new(right));
617    }
618
619    Ok(left)
620}
621
622fn parse_sase_and_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<SasePatternExpr> {
623    let mut inner = pair.into_inner();
624    let mut left = parse_sase_not_expr(inner.expect_next("AND expression operand")?)?;
625
626    for right_pair in inner {
627        let right = parse_sase_not_expr(right_pair)?;
628        left = SasePatternExpr::And(Box::new(left), Box::new(right));
629    }
630
631    Ok(left)
632}
633
634fn parse_sase_not_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<SasePatternExpr> {
635    let mut inner = pair.into_inner();
636    let first = inner.expect_next("NOT or primary expression")?;
637
638    if first.as_rule() == Rule::sase_not_keyword {
639        let expr = parse_sase_primary_expr(inner.expect_next("expression after NOT")?)?;
640        Ok(SasePatternExpr::Not(Box::new(expr)))
641    } else {
642        parse_sase_primary_expr(first)
643    }
644}
645
646fn parse_sase_primary_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<SasePatternExpr> {
647    let inner = pair.into_inner().expect_next("SASE primary expression")?;
648
649    match inner.as_rule() {
650        Rule::sase_seq_expr => parse_sase_seq_expr(inner),
651        Rule::sase_grouped_expr => {
652            let nested = inner.into_inner().expect_next("grouped expression")?;
653            let expr = parse_sase_pattern_expr(nested)?;
654            Ok(SasePatternExpr::Group(Box::new(expr)))
655        }
656        Rule::sase_event_ref => parse_sase_event_ref(inner),
657        _ => Ok(SasePatternExpr::Event(inner.as_str().to_string())),
658    }
659}
660
661fn parse_sase_seq_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<SasePatternExpr> {
662    let mut items = Vec::new();
663
664    for p in pair.into_inner() {
665        if p.as_rule() == Rule::sase_seq_items {
666            for item in p.into_inner() {
667                if item.as_rule() == Rule::sase_seq_item {
668                    items.push(parse_sase_seq_item(item)?);
669                }
670            }
671        }
672    }
673
674    Ok(SasePatternExpr::Seq(items))
675}
676
677fn parse_sase_seq_item(pair: pest::iterators::Pair<Rule>) -> ParseResult<SasePatternItem> {
678    let inner = pair.into_inner().expect_next("sequence item")?;
679
680    match inner.as_rule() {
681        Rule::sase_negated_item => parse_sase_item_inner(inner, true),
682        Rule::sase_positive_item => parse_sase_item_inner(inner, false),
683        _ => parse_sase_item_inner(inner, false),
684    }
685}
686
687fn parse_sase_item_inner(
688    pair: pest::iterators::Pair<Rule>,
689    _negated: bool,
690) -> ParseResult<SasePatternItem> {
691    let mut inner = pair.into_inner();
692    let event_type = inner.expect_next("event type")?.as_str().to_string();
693
694    let mut kleene = None;
695    let mut filter = None;
696    let mut alias = None;
697
698    for p in inner {
699        match p.as_rule() {
700            Rule::kleene_op => {
701                kleene = Some(match p.as_str() {
702                    "+" => KleeneOp::Plus,
703                    "*" => KleeneOp::Star,
704                    "?" => KleeneOp::Optional,
705                    _ => KleeneOp::Plus,
706                });
707            }
708            Rule::sase_where_clause => {
709                filter = Some(parse_expr(
710                    p.into_inner().expect_next("filter expression")?,
711                )?);
712            }
713            Rule::sase_alias_clause => {
714                alias = Some(p.into_inner().expect_next("alias")?.as_str().to_string());
715            }
716            _ => {}
717        }
718    }
719
720    // For negated items, we prefix with "!" to indicate negation
721    // The runtime will interpret this
722    let event_type = if _negated {
723        format!("!{event_type}")
724    } else {
725        event_type
726    };
727
728    Ok(SasePatternItem {
729        event_type,
730        alias,
731        kleene,
732        filter,
733    })
734}
735
736fn parse_sase_event_ref(pair: pest::iterators::Pair<Rule>) -> ParseResult<SasePatternExpr> {
737    // Single event reference with optional kleene, where, alias
738    let item = parse_sase_item_inner(pair, false)?;
739
740    // If it's a simple event with no modifiers, return Event variant
741    if item.alias.is_none() && item.kleene.is_none() && item.filter.is_none() {
742        Ok(SasePatternExpr::Event(item.event_type))
743    } else {
744        // Otherwise wrap in a single-item Seq
745        Ok(SasePatternExpr::Seq(vec![item]))
746    }
747}
748
749fn parse_stream_source(pair: pest::iterators::Pair<Rule>) -> ParseResult<StreamSource> {
750    let inner = pair.into_inner().expect_next("stream source type")?;
751
752    match inner.as_rule() {
753        Rule::from_connector_source => {
754            let mut inner_iter = inner.into_inner();
755            let event_type = inner_iter.expect_next("event type")?.as_str().to_string();
756            let connector_name = inner_iter
757                .expect_next("connector name")?
758                .as_str()
759                .to_string();
760            let mut params = Vec::new();
761            for p in inner_iter {
762                if p.as_rule() == Rule::connector_params {
763                    params = parse_connector_params(p)?;
764                }
765            }
766            Ok(StreamSource::FromConnector {
767                event_type,
768                connector_name,
769                params,
770            })
771        }
772        Rule::merge_source => {
773            let mut streams = Vec::new();
774            for p in inner.into_inner() {
775                if p.as_rule() == Rule::inline_stream_list {
776                    for is in p.into_inner() {
777                        streams.push(parse_inline_stream(is)?);
778                    }
779                }
780            }
781            Ok(StreamSource::Merge(streams))
782        }
783        Rule::join_source
784        | Rule::left_join_source
785        | Rule::right_join_source
786        | Rule::full_join_source => {
787            let join_type = match inner.as_rule() {
788                Rule::left_join_source => varpulis_core::ast::JoinType::Left,
789                Rule::right_join_source => varpulis_core::ast::JoinType::Right,
790                Rule::full_join_source => varpulis_core::ast::JoinType::Full,
791                _ => varpulis_core::ast::JoinType::Inner,
792            };
793            let mut clauses = Vec::new();
794            for p in inner.into_inner() {
795                if p.as_rule() == Rule::join_clause_list {
796                    for jc in p.into_inner() {
797                        clauses.push(parse_join_clause(jc, join_type)?);
798                    }
799                }
800            }
801            Ok(StreamSource::Join(clauses))
802        }
803        Rule::sequence_source => {
804            let decl =
805                parse_sequence_decl(inner.into_inner().expect_next("sequence declaration")?)?;
806            Ok(StreamSource::Sequence(decl))
807        }
808        Rule::timer_source => {
809            let timer_args = inner.into_inner().expect_next("timer arguments")?;
810            let decl = parse_timer_decl(timer_args)?;
811            Ok(StreamSource::Timer(decl))
812        }
813        Rule::all_source => {
814            let mut inner_iter = inner.into_inner();
815            let name = inner_iter.expect_next("event name")?.as_str().to_string();
816            let alias = inner_iter.next().map(|p| p.as_str().to_string());
817            Ok(StreamSource::AllWithAlias { name, alias })
818        }
819        Rule::filtered_source => {
820            let mut inner_iter = inner.into_inner();
821            let name = inner_iter.expect_next("event type")?.as_str().to_string();
822            let filter_pair = inner_iter.expect_next("filter expression")?;
823            let filter = parse_filter_expr(filter_pair)?;
824            let alias = inner_iter.next().map(|p| p.as_str().to_string());
825            Ok(StreamSource::IdentWithFilterAndAlias {
826                name,
827                filter,
828                alias,
829            })
830        }
831        Rule::aliased_source => {
832            let mut inner_iter = inner.into_inner();
833            let name = inner_iter.expect_next("event name")?.as_str().to_string();
834            let alias = inner_iter.expect_next("alias")?.as_str().to_string();
835            Ok(StreamSource::IdentWithAlias { name, alias })
836        }
837        Rule::identifier => Ok(StreamSource::Ident(inner.as_str().to_string())),
838        _ => Err(ParseError::UnexpectedToken {
839            position: 0,
840            expected: "stream source".to_string(),
841            found: format!("{:?}", inner.as_rule()),
842        }),
843    }
844}
845
846fn parse_inline_stream(pair: pest::iterators::Pair<Rule>) -> ParseResult<InlineStreamDecl> {
847    let mut inner = pair.into_inner();
848
849    // Check if it's a simple identifier or full declaration
850    let first = inner.expect_next("stream identifier")?;
851    if first.as_rule() == Rule::identifier && inner.clone().next().is_none() {
852        let name = first.as_str().to_string();
853        return Ok(InlineStreamDecl {
854            name: name.clone(),
855            source: name,
856            filter: None,
857        });
858    }
859
860    let name = first.as_str().to_string();
861    let source = inner.expect_next("stream source")?.as_str().to_string();
862    let filter = inner.next().map(|p| parse_expr(p)).transpose()?;
863
864    Ok(InlineStreamDecl {
865        name,
866        source,
867        filter,
868    })
869}
870
871fn parse_join_clause(
872    pair: pest::iterators::Pair<Rule>,
873    join_type: varpulis_core::ast::JoinType,
874) -> ParseResult<JoinClause> {
875    let mut inner = pair.into_inner();
876
877    let first = inner.expect_next("join clause identifier")?;
878    if first.as_rule() == Rule::identifier && inner.clone().next().is_none() {
879        let name = first.as_str().to_string();
880        return Ok(JoinClause {
881            name: name.clone(),
882            source: name,
883            on: None,
884            join_type,
885        });
886    }
887
888    let name = first.as_str().to_string();
889    let source = inner.expect_next("join source")?.as_str().to_string();
890    let on = inner.next().map(|p| parse_expr(p)).transpose()?;
891
892    Ok(JoinClause {
893        name,
894        source,
895        on,
896        join_type,
897    })
898}
899
900fn parse_sequence_decl(pair: pest::iterators::Pair<Rule>) -> ParseResult<SequenceDecl> {
901    let mut steps = Vec::new();
902
903    for p in pair.into_inner() {
904        if p.as_rule() == Rule::sequence_step {
905            steps.push(parse_sequence_step(p)?);
906        }
907    }
908
909    Ok(SequenceDecl {
910        match_all: false,
911        timeout: None,
912        steps,
913    })
914}
915
916fn parse_sequence_step(pair: pest::iterators::Pair<Rule>) -> ParseResult<SequenceStepDecl> {
917    let mut inner = pair.into_inner();
918    let alias = inner.expect_next("step alias")?.as_str().to_string();
919    let event_type = inner.expect_next("event type")?.as_str().to_string();
920
921    let mut filter = None;
922    let mut timeout = None;
923
924    for p in inner {
925        match p.as_rule() {
926            Rule::or_expr => filter = Some(parse_expr(p)?),
927            Rule::within_suffix => {
928                let expr = p.into_inner().expect_next("within duration")?;
929                timeout = Some(Box::new(parse_expr(expr)?));
930            }
931            _ => {}
932        }
933    }
934
935    Ok(SequenceStepDecl {
936        alias,
937        event_type,
938        filter,
939        timeout,
940    })
941}
942
943fn parse_timer_decl(pair: pest::iterators::Pair<Rule>) -> ParseResult<TimerDecl> {
944    let mut inner = pair.into_inner();
945
946    // First argument is the interval (duration expression)
947    let interval = parse_expr(inner.expect_next("timer interval")?)?;
948
949    // Optional second argument is initial_delay (named argument)
950    let mut initial_delay = None;
951    for p in inner {
952        if p.as_rule() == Rule::named_arg {
953            let arg = parse_named_arg(p)?;
954            if arg.name == "initial_delay" {
955                initial_delay = Some(Box::new(arg.value));
956            }
957        }
958    }
959
960    Ok(TimerDecl {
961        interval,
962        initial_delay,
963    })
964}
965
966fn parse_stream_op(pair: pest::iterators::Pair<Rule>) -> ParseResult<StreamOp> {
967    let inner = pair.into_inner().expect_next("stream operation")?;
968
969    match inner.as_rule() {
970        Rule::dot_op => {
971            let op_inner = inner.into_inner().expect_next("dot operation")?;
972            parse_dot_op(op_inner)
973        }
974        Rule::followed_by_op => parse_followed_by_op(inner),
975        _ => Err(ParseError::UnexpectedToken {
976            position: 0,
977            expected: "stream operation".to_string(),
978            found: format!("{:?}", inner.as_rule()),
979        }),
980    }
981}
982
983fn parse_dot_op(pair: pest::iterators::Pair<Rule>) -> ParseResult<StreamOp> {
984    match pair.as_rule() {
985        Rule::context_op => {
986            let name = pair
987                .into_inner()
988                .expect_next("context name")?
989                .as_str()
990                .to_string();
991            Ok(StreamOp::Context(name))
992        }
993        Rule::where_op => {
994            let expr = parse_expr(pair.into_inner().expect_next("where expression")?)?;
995            Ok(StreamOp::Where(expr))
996        }
997        Rule::select_op => {
998            let mut items = Vec::new();
999            for p in pair.into_inner() {
1000                if p.as_rule() == Rule::select_list {
1001                    for si in p.into_inner() {
1002                        items.push(parse_select_item(si)?);
1003                    }
1004                }
1005            }
1006            Ok(StreamOp::Select(items))
1007        }
1008        Rule::window_op => {
1009            let args = parse_window_args(pair.into_inner().expect_next("window arguments")?)?;
1010            Ok(StreamOp::Window(args))
1011        }
1012        Rule::aggregate_op => {
1013            let mut items = Vec::new();
1014            for p in pair.into_inner() {
1015                if p.as_rule() == Rule::agg_list {
1016                    for ai in p.into_inner() {
1017                        items.push(parse_agg_item(ai)?);
1018                    }
1019                }
1020            }
1021            Ok(StreamOp::Aggregate(items))
1022        }
1023        Rule::having_op => {
1024            let expr = parse_expr(pair.into_inner().expect_next("having expression")?)?;
1025            Ok(StreamOp::Having(expr))
1026        }
1027        Rule::map_op => {
1028            let expr = parse_expr(pair.into_inner().expect_next("map expression")?)?;
1029            Ok(StreamOp::Map(expr))
1030        }
1031        Rule::filter_op => {
1032            let expr = parse_expr(pair.into_inner().expect_next("filter expression")?)?;
1033            Ok(StreamOp::Filter(expr))
1034        }
1035        Rule::within_op => {
1036            let expr = parse_expr(pair.into_inner().expect_next("within duration")?)?;
1037            Ok(StreamOp::Within(expr))
1038        }
1039        Rule::emit_op => {
1040            let mut output_type = None;
1041            let mut fields = Vec::new();
1042            let mut target_context = None;
1043            for p in pair.into_inner() {
1044                match p.as_rule() {
1045                    Rule::emit_type_cast => {
1046                        output_type = Some(
1047                            p.into_inner()
1048                                .expect_next("type name")?
1049                                .as_str()
1050                                .to_string(),
1051                        );
1052                    }
1053                    Rule::named_arg_list => {
1054                        for arg in p.into_inner() {
1055                            let parsed = parse_named_arg(arg)?;
1056                            // Extract `context: ctx_name` as target_context
1057                            if parsed.name == "context" {
1058                                if let Expr::Ident(ctx_name) = &parsed.value {
1059                                    target_context = Some(ctx_name.clone());
1060                                    continue;
1061                                }
1062                            }
1063                            fields.push(parsed);
1064                        }
1065                    }
1066                    _ => {}
1067                }
1068            }
1069            Ok(StreamOp::Emit {
1070                output_type,
1071                fields,
1072                target_context,
1073            })
1074        }
1075        Rule::print_op => {
1076            let exprs = pair
1077                .into_inner()
1078                .filter(|p| p.as_rule() == Rule::expr_list)
1079                .flat_map(|p| p.into_inner())
1080                .map(parse_expr)
1081                .collect::<ParseResult<Vec<_>>>()?;
1082            Ok(StreamOp::Print(exprs))
1083        }
1084        Rule::collect_op => Ok(StreamOp::Collect),
1085        Rule::pattern_op => {
1086            let def_pair = pair.into_inner().expect_next("pattern definition")?;
1087            let mut inner = def_pair.into_inner();
1088            let name = inner.expect_next("pattern name")?.as_str().to_string();
1089            let body_pair = inner.expect_next("pattern body")?;
1090
1091            // pattern_body can be lambda_expr or pattern_or_expr
1092            let body_inner = body_pair.into_inner().expect_next("pattern expression")?;
1093            let matcher = match body_inner.as_rule() {
1094                Rule::lambda_expr => parse_lambda_expr(body_inner)?,
1095                Rule::pattern_or_expr => parse_pattern_expr_as_expr(body_inner)?,
1096                _ => parse_expr_inner(body_inner)?,
1097            };
1098            Ok(StreamOp::Pattern(PatternDef { name, matcher }))
1099        }
1100        Rule::partition_by_op => {
1101            let expr = parse_expr(pair.into_inner().expect_next("partition expression")?)?;
1102            Ok(StreamOp::PartitionBy(expr))
1103        }
1104        Rule::order_by_op => {
1105            let mut items = Vec::new();
1106            for p in pair.into_inner() {
1107                if p.as_rule() == Rule::order_list {
1108                    for oi in p.into_inner() {
1109                        items.push(parse_order_item(oi)?);
1110                    }
1111                }
1112            }
1113            Ok(StreamOp::OrderBy(items))
1114        }
1115        Rule::limit_op => {
1116            let expr = parse_expr(pair.into_inner().expect_next("limit expression")?)?;
1117            Ok(StreamOp::Limit(expr))
1118        }
1119        Rule::distinct_op => {
1120            let expr = pair.into_inner().next().map(parse_expr).transpose()?;
1121            Ok(StreamOp::Distinct(expr))
1122        }
1123        Rule::tap_op => {
1124            let args = pair
1125                .into_inner()
1126                .filter(|p| p.as_rule() == Rule::named_arg_list)
1127                .flat_map(|p| p.into_inner())
1128                .map(parse_named_arg)
1129                .collect::<ParseResult<Vec<_>>>()?;
1130            Ok(StreamOp::Tap(args))
1131        }
1132        Rule::log_op => {
1133            let args = pair
1134                .into_inner()
1135                .filter(|p| p.as_rule() == Rule::named_arg_list)
1136                .flat_map(|p| p.into_inner())
1137                .map(parse_named_arg)
1138                .collect::<ParseResult<Vec<_>>>()?;
1139            Ok(StreamOp::Log(args))
1140        }
1141        Rule::to_op => {
1142            let mut inner = pair.into_inner();
1143            let connector_name = inner.expect_next("connector name")?.as_str().to_string();
1144            let mut params = Vec::new();
1145            for p in inner {
1146                if p.as_rule() == Rule::connector_params {
1147                    params = parse_connector_params(p)?;
1148                }
1149            }
1150            Ok(StreamOp::To {
1151                connector_name,
1152                params,
1153            })
1154        }
1155        Rule::process_op => {
1156            let expr = parse_expr(pair.into_inner().expect_next("process expression")?)?;
1157            Ok(StreamOp::Process(expr))
1158        }
1159        Rule::on_error_op => {
1160            let expr = parse_expr(pair.into_inner().expect_next("on_error handler")?)?;
1161            Ok(StreamOp::OnError(expr))
1162        }
1163        Rule::on_op => {
1164            let expr = parse_expr(pair.into_inner().expect_next("on handler")?)?;
1165            Ok(StreamOp::On(expr))
1166        }
1167        Rule::not_op => {
1168            let mut inner = pair.into_inner();
1169            let event_type = inner.expect_next("event type")?.as_str().to_string();
1170            let filter = inner.next().map(parse_expr).transpose()?;
1171            Ok(StreamOp::Not(FollowedByClause {
1172                event_type,
1173                filter,
1174                alias: None,
1175                match_all: false,
1176            }))
1177        }
1178        Rule::fork_op => {
1179            let mut paths = Vec::new();
1180            for p in pair.into_inner() {
1181                if p.as_rule() == Rule::fork_path_list {
1182                    for fp in p.into_inner() {
1183                        paths.push(parse_fork_path(fp)?);
1184                    }
1185                }
1186            }
1187            Ok(StreamOp::Fork(paths))
1188        }
1189        Rule::any_op => {
1190            let count = pair
1191                .into_inner()
1192                .next()
1193                .map(|p| p.as_str().parse().unwrap_or(1));
1194            Ok(StreamOp::Any(count))
1195        }
1196        Rule::all_op => Ok(StreamOp::All),
1197        Rule::first_op => Ok(StreamOp::First),
1198        Rule::concurrent_op => {
1199            let args = pair
1200                .into_inner()
1201                .filter(|p| p.as_rule() == Rule::named_arg_list)
1202                .flat_map(|p| p.into_inner())
1203                .map(parse_named_arg)
1204                .collect::<ParseResult<Vec<_>>>()?;
1205            Ok(StreamOp::Concurrent(args))
1206        }
1207        Rule::watermark_op => {
1208            let args = pair
1209                .into_inner()
1210                .filter(|p| p.as_rule() == Rule::named_arg_list)
1211                .flat_map(|p| p.into_inner())
1212                .map(parse_named_arg)
1213                .collect::<ParseResult<Vec<_>>>()?;
1214            Ok(StreamOp::Watermark(args))
1215        }
1216        Rule::allowed_lateness_op => {
1217            let expr = parse_expr(pair.into_inner().expect_next("allowed lateness duration")?)?;
1218            Ok(StreamOp::AllowedLateness(expr))
1219        }
1220        Rule::trend_aggregate_op => {
1221            let mut items = Vec::new();
1222            for p in pair.into_inner() {
1223                if p.as_rule() == Rule::trend_agg_list {
1224                    for item_pair in p.into_inner() {
1225                        if item_pair.as_rule() == Rule::trend_agg_item {
1226                            let mut inner = item_pair.into_inner();
1227                            let alias = inner.expect_next("trend agg alias")?.as_str().to_string();
1228                            let func_pair = inner.expect_next("trend agg function")?;
1229                            let mut func_inner = func_pair.into_inner();
1230                            let func_name = func_inner
1231                                .expect_next("function name")?
1232                                .as_str()
1233                                .to_string();
1234                            let arg = func_inner.next().map(parse_expr).transpose()?;
1235                            items.push(TrendAggItem {
1236                                alias,
1237                                func: func_name,
1238                                arg,
1239                            });
1240                        }
1241                    }
1242                }
1243            }
1244            Ok(StreamOp::TrendAggregate(items))
1245        }
1246        Rule::forecast_op => {
1247            let mut confidence = None;
1248            let mut horizon = None;
1249            let mut warmup = None;
1250            let mut max_depth = None;
1251            let mut hawkes = None;
1252            let mut conformal = None;
1253            let mut mode = None;
1254            for p in pair.into_inner() {
1255                if p.as_rule() == Rule::forecast_params {
1256                    for param_pair in p.into_inner() {
1257                        if param_pair.as_rule() == Rule::forecast_param {
1258                            let mut inner = param_pair.into_inner();
1259                            let name = inner.expect_next("forecast param name")?.as_str();
1260                            let value_pair = inner.expect_next("forecast param value")?;
1261                            let expr = parse_expr(value_pair)?;
1262                            match name {
1263                                "confidence" => confidence = Some(expr),
1264                                "horizon" => horizon = Some(expr),
1265                                "warmup" => warmup = Some(expr),
1266                                "max_depth" => max_depth = Some(expr),
1267                                "hawkes" => hawkes = Some(expr),
1268                                "conformal" => conformal = Some(expr),
1269                                "mode" => mode = Some(expr),
1270                                _ => {}
1271                            }
1272                        }
1273                    }
1274                }
1275            }
1276            Ok(StreamOp::Forecast(ForecastSpec {
1277                confidence,
1278                horizon,
1279                warmup,
1280                max_depth,
1281                hawkes,
1282                conformal,
1283                mode,
1284            }))
1285        }
1286        Rule::enrich_op => {
1287            let mut inner = pair.into_inner();
1288            let connector_name = inner.expect_next("connector name")?.as_str().to_string();
1289            let mut key_expr = None;
1290            let mut fields = Vec::new();
1291            let mut cache_ttl = None;
1292            let mut timeout = None;
1293            let mut fallback = None;
1294            for p in inner {
1295                if p.as_rule() == Rule::enrich_params {
1296                    for param_pair in p.into_inner() {
1297                        if param_pair.as_rule() == Rule::enrich_param {
1298                            let param_inner =
1299                                param_pair.into_inner().expect_next("enrich param")?;
1300                            match param_inner.as_rule() {
1301                                Rule::enrich_key_param => {
1302                                    let expr_pair =
1303                                        param_inner.into_inner().expect_next("key expression")?;
1304                                    key_expr = Some(parse_expr(expr_pair)?);
1305                                }
1306                                Rule::enrich_fields_param => {
1307                                    for field in param_inner.into_inner() {
1308                                        if field.as_rule() == Rule::identifier {
1309                                            fields.push(field.as_str().to_string());
1310                                        }
1311                                    }
1312                                }
1313                                Rule::enrich_cache_ttl_param => {
1314                                    let expr_pair = param_inner
1315                                        .into_inner()
1316                                        .expect_next("cache_ttl expression")?;
1317                                    cache_ttl = Some(parse_expr(expr_pair)?);
1318                                }
1319                                Rule::enrich_timeout_param => {
1320                                    let expr_pair = param_inner
1321                                        .into_inner()
1322                                        .expect_next("timeout expression")?;
1323                                    timeout = Some(parse_expr(expr_pair)?);
1324                                }
1325                                Rule::enrich_fallback_param => {
1326                                    let literal_pair =
1327                                        param_inner.into_inner().expect_next("fallback literal")?;
1328                                    fallback = Some(parse_expr(literal_pair)?);
1329                                }
1330                                _ => {}
1331                            }
1332                        }
1333                    }
1334                }
1335            }
1336            let key = key_expr.ok_or_else(|| ParseError::Located {
1337                line: 0,
1338                column: 0,
1339                position: 0,
1340                message: ".enrich() requires a key: parameter".to_string(),
1341                hint: Some("add key: <expression> to .enrich()".to_string()),
1342            })?;
1343            Ok(StreamOp::Enrich(EnrichSpec {
1344                connector_name,
1345                key_expr: Box::new(key),
1346                fields,
1347                cache_ttl,
1348                timeout,
1349                fallback,
1350            }))
1351        }
1352        Rule::score_op => {
1353            let mut model_path = String::new();
1354            let mut inputs = Vec::new();
1355            let mut outputs = Vec::new();
1356            let mut gpu = false;
1357            let mut batch_size: usize = 1;
1358            let mut device_id: i32 = 0;
1359            for p in pair.into_inner() {
1360                if p.as_rule() == Rule::score_params {
1361                    for param_pair in p.into_inner() {
1362                        if param_pair.as_rule() == Rule::score_param {
1363                            let mut inner = param_pair.into_inner();
1364                            let name = inner.expect_next("score param name")?.as_str();
1365                            let value_pair = inner.expect_next("score param value")?;
1366                            match name {
1367                                "model" => {
1368                                    let raw = value_pair.as_str();
1369                                    model_path = raw.trim_matches('"').to_string();
1370                                }
1371                                "inputs" => {
1372                                    if value_pair.as_rule() == Rule::score_field_list {
1373                                        for field in value_pair.into_inner() {
1374                                            if field.as_rule() == Rule::identifier {
1375                                                inputs.push(field.as_str().to_string());
1376                                            }
1377                                        }
1378                                    }
1379                                }
1380                                "outputs" => {
1381                                    if value_pair.as_rule() == Rule::score_field_list {
1382                                        for field in value_pair.into_inner() {
1383                                            if field.as_rule() == Rule::identifier {
1384                                                outputs.push(field.as_str().to_string());
1385                                            }
1386                                        }
1387                                    }
1388                                }
1389                                "gpu" => {
1390                                    gpu = value_pair.as_str() == "true";
1391                                }
1392                                "batch_size" => {
1393                                    batch_size = value_pair.as_str().parse().unwrap_or(1);
1394                                }
1395                                "device" | "device_id" => {
1396                                    device_id = value_pair.as_str().parse().unwrap_or(0);
1397                                }
1398                                _ => {}
1399                            }
1400                        }
1401                    }
1402                }
1403            }
1404            Ok(StreamOp::Score(ScoreSpec {
1405                model_path,
1406                inputs,
1407                outputs,
1408                gpu,
1409                batch_size,
1410                device_id,
1411            }))
1412        }
1413        Rule::alert_op => {
1414            let args = pair
1415                .into_inner()
1416                .filter(|p| p.as_rule() == Rule::named_arg_list)
1417                .flat_map(|p| p.into_inner())
1418                .map(parse_named_arg)
1419                .collect::<ParseResult<Vec<_>>>()?;
1420            Ok(StreamOp::Alert(args))
1421        }
1422        _ => Err(ParseError::UnexpectedToken {
1423            position: 0,
1424            expected: "stream operation".to_string(),
1425            found: format!("{:?}", pair.as_rule()),
1426        }),
1427    }
1428}
1429
1430fn parse_order_item(pair: pest::iterators::Pair<Rule>) -> ParseResult<OrderItem> {
1431    let mut inner = pair.into_inner();
1432    let expr = parse_expr(inner.expect_next("order expression")?)?;
1433    let desc = inner.next().is_some_and(|p| p.as_str() == "desc");
1434    Ok(OrderItem {
1435        expr,
1436        descending: desc,
1437    })
1438}
1439
1440fn parse_fork_path(pair: pest::iterators::Pair<Rule>) -> ParseResult<ForkPath> {
1441    let mut inner = pair.into_inner();
1442    let name = inner.expect_next("fork path name")?.as_str().to_string();
1443    let mut ops = Vec::new();
1444    for p in inner {
1445        if p.as_rule() == Rule::stream_op {
1446            ops.push(parse_stream_op(p)?);
1447        }
1448    }
1449    Ok(ForkPath { name, ops })
1450}
1451
1452fn parse_followed_by_op(pair: pest::iterators::Pair<Rule>) -> ParseResult<StreamOp> {
1453    let mut inner = pair.into_inner();
1454    let mut match_all = false;
1455
1456    let first = inner.expect_next("event type or match_all")?;
1457    let event_type = if first.as_rule() == Rule::match_all_keyword {
1458        match_all = true;
1459        inner.expect_next("event type")?.as_str().to_string()
1460    } else {
1461        first.as_str().to_string()
1462    };
1463
1464    let mut filter = None;
1465    let mut alias = None;
1466
1467    for p in inner {
1468        match p.as_rule() {
1469            Rule::or_expr => filter = Some(parse_or_expr(p)?),
1470            Rule::filter_expr => filter = Some(parse_filter_expr(p)?),
1471            Rule::identifier => alias = Some(p.as_str().to_string()),
1472            _ => {}
1473        }
1474    }
1475
1476    Ok(StreamOp::FollowedBy(FollowedByClause {
1477        event_type,
1478        filter,
1479        alias,
1480        match_all,
1481    }))
1482}
1483
1484fn parse_select_item(pair: pest::iterators::Pair<Rule>) -> ParseResult<SelectItem> {
1485    let mut inner = pair.into_inner();
1486    let first = inner.expect_next("select field or alias")?;
1487
1488    if let Some(second) = inner.next() {
1489        Ok(SelectItem::Alias(
1490            first.as_str().to_string(),
1491            parse_expr(second)?,
1492        ))
1493    } else {
1494        Ok(SelectItem::Field(first.as_str().to_string()))
1495    }
1496}
1497
1498fn parse_window_args(pair: pest::iterators::Pair<Rule>) -> ParseResult<WindowArgs> {
1499    let raw = pair.as_str().trim();
1500    let is_session = raw.starts_with("session");
1501
1502    let mut inner = pair.into_inner();
1503
1504    if is_session {
1505        // session: <gap_expr>
1506        let gap_expr = parse_expr(inner.expect_next("session gap duration")?)?;
1507        return Ok(WindowArgs {
1508            duration: gap_expr.clone(),
1509            sliding: None,
1510            policy: None,
1511            session_gap: Some(gap_expr),
1512        });
1513    }
1514
1515    let duration = parse_expr(inner.expect_next("window duration")?)?;
1516
1517    let mut sliding = None;
1518    let mut policy = None;
1519
1520    for p in inner {
1521        if p.as_rule() == Rule::expr {
1522            // Need to determine if it's sliding or policy based on context
1523            if sliding.is_none() {
1524                sliding = Some(parse_expr(p)?);
1525            } else {
1526                policy = Some(parse_expr(p)?);
1527            }
1528        }
1529    }
1530
1531    Ok(WindowArgs {
1532        duration,
1533        sliding,
1534        policy,
1535        session_gap: None,
1536    })
1537}
1538
1539fn parse_agg_item(pair: pest::iterators::Pair<Rule>) -> ParseResult<AggItem> {
1540    let mut inner = pair.into_inner();
1541    let alias = inner.expect_next("aggregate alias")?.as_str().to_string();
1542    let expr = parse_expr(inner.expect_next("aggregate expression")?)?;
1543    Ok(AggItem { alias, expr })
1544}
1545
1546fn parse_named_arg(pair: pest::iterators::Pair<Rule>) -> ParseResult<NamedArg> {
1547    let mut inner = pair.into_inner();
1548    let name = inner.expect_next("argument name")?.as_str().to_string();
1549    let value = parse_expr(inner.expect_next("argument value")?)?;
1550    Ok(NamedArg { name, value })
1551}
1552
1553fn parse_event_decl(pair: pest::iterators::Pair<Rule>) -> ParseResult<Stmt> {
1554    let mut inner = pair.into_inner();
1555    let name = inner.expect_next("event name")?.as_str().to_string();
1556
1557    let mut extends = None;
1558    let mut fields = Vec::new();
1559
1560    for p in inner {
1561        match p.as_rule() {
1562            Rule::identifier => extends = Some(p.as_str().to_string()),
1563            Rule::field => fields.push(parse_field(p)?),
1564            _ => {}
1565        }
1566    }
1567
1568    Ok(Stmt::EventDecl {
1569        name,
1570        extends,
1571        fields,
1572    })
1573}
1574
1575fn parse_field(pair: pest::iterators::Pair<Rule>) -> ParseResult<Field> {
1576    let mut inner = pair.into_inner();
1577    let name = inner.expect_next("field name")?.as_str().to_string();
1578    let ty = parse_type(inner.expect_next("field type")?)?;
1579    let optional = inner.next().is_some();
1580    Ok(Field { name, ty, optional })
1581}
1582
1583fn parse_type_decl(pair: pest::iterators::Pair<Rule>) -> ParseResult<Stmt> {
1584    let mut inner = pair.into_inner();
1585    let name = inner.expect_next("type name")?.as_str().to_string();
1586    let next = inner.expect_next("type body")?;
1587    match next.as_rule() {
1588        Rule::field => {
1589            let mut fields = vec![parse_field(next)?];
1590            for p in inner {
1591                if p.as_rule() == Rule::field {
1592                    fields.push(parse_field(p)?);
1593                }
1594            }
1595            Ok(Stmt::TypeDecl {
1596                name,
1597                ty: None,
1598                fields,
1599            })
1600        }
1601        _ => Ok(Stmt::TypeDecl {
1602            name,
1603            ty: Some(parse_type(next)?),
1604            fields: vec![],
1605        }),
1606    }
1607}
1608
1609fn parse_type(pair: pest::iterators::Pair<Rule>) -> ParseResult<Type> {
1610    let cloned = pair.clone();
1611    let inner = pair.into_inner().next().unwrap_or(cloned);
1612
1613    match inner.as_rule() {
1614        Rule::primitive_type => match inner.as_str() {
1615            "int" => Ok(Type::Int),
1616            "float" => Ok(Type::Float),
1617            "bool" => Ok(Type::Bool),
1618            "str" => Ok(Type::Str),
1619            "timestamp" => Ok(Type::Timestamp),
1620            "duration" => Ok(Type::Duration),
1621            _ => Ok(Type::Named(inner.as_str().to_string())),
1622        },
1623        Rule::array_type => {
1624            let inner_type = parse_type(inner.into_inner().expect_next("array element type")?)?;
1625            Ok(Type::Array(Box::new(inner_type)))
1626        }
1627        Rule::map_type => {
1628            let mut inner_pairs = inner.into_inner();
1629            let key_type = parse_type(inner_pairs.expect_next("map key type")?)?;
1630            let val_type = parse_type(inner_pairs.expect_next("map value type")?)?;
1631            Ok(Type::Map(Box::new(key_type), Box::new(val_type)))
1632        }
1633        Rule::tuple_type => {
1634            let types: Vec<Type> = inner
1635                .into_inner()
1636                .map(parse_type)
1637                .collect::<ParseResult<Vec<_>>>()?;
1638            Ok(Type::Tuple(types))
1639        }
1640        Rule::stream_type => {
1641            let inner_type = parse_type(inner.into_inner().expect_next("stream element type")?)?;
1642            Ok(Type::Stream(Box::new(inner_type)))
1643        }
1644        Rule::optional_type => {
1645            let inner_type = parse_type(inner.into_inner().expect_next("optional inner type")?)?;
1646            Ok(Type::Optional(Box::new(inner_type)))
1647        }
1648        Rule::named_type | Rule::identifier => Ok(Type::Named(inner.as_str().to_string())),
1649        _ => Ok(Type::Named(inner.as_str().to_string())),
1650    }
1651}
1652
1653fn parse_var_decl(pair: pest::iterators::Pair<Rule>) -> ParseResult<Stmt> {
1654    let mut inner = pair.into_inner();
1655    let keyword = inner.expect_next("var_keyword")?.as_str();
1656    let mutable = keyword == "var";
1657    let name = inner.expect_next("variable name")?.as_str().to_string();
1658
1659    let mut ty = None;
1660    let mut value = Expr::Null;
1661
1662    for p in inner {
1663        match p.as_rule() {
1664            Rule::type_annotation => ty = Some(parse_type(p.into_inner().expect_next("type")?)?),
1665            _ => value = parse_expr(p)?,
1666        }
1667    }
1668
1669    Ok(Stmt::VarDecl {
1670        mutable,
1671        name,
1672        ty,
1673        value,
1674    })
1675}
1676
1677fn parse_const_decl(pair: pest::iterators::Pair<Rule>) -> ParseResult<Stmt> {
1678    let mut inner = pair.into_inner();
1679    let name = inner.expect_next("constant name")?.as_str().to_string();
1680
1681    let mut ty = None;
1682    let mut value = Expr::Null;
1683
1684    for p in inner {
1685        match p.as_rule() {
1686            Rule::type_annotation => ty = Some(parse_type(p.into_inner().expect_next("type")?)?),
1687            _ => value = parse_expr(p)?,
1688        }
1689    }
1690
1691    Ok(Stmt::ConstDecl { name, ty, value })
1692}
1693
1694fn parse_fn_decl(pair: pest::iterators::Pair<Rule>) -> ParseResult<Stmt> {
1695    let mut inner = pair.into_inner();
1696    let name = inner.expect_next("function name")?.as_str().to_string();
1697
1698    let mut params = Vec::new();
1699    let mut ret = None;
1700    let mut body = Vec::new();
1701
1702    for p in inner {
1703        match p.as_rule() {
1704            Rule::param_list => {
1705                for param in p.into_inner() {
1706                    params.push(parse_param(param)?);
1707                }
1708            }
1709            Rule::type_expr => ret = Some(parse_type(p)?),
1710            Rule::block => body = parse_block(p)?,
1711            Rule::statement => body.push(parse_statement(p)?),
1712            _ => {}
1713        }
1714    }
1715
1716    Ok(Stmt::FnDecl {
1717        name,
1718        params,
1719        ret,
1720        body,
1721    })
1722}
1723
1724fn parse_block(pair: pest::iterators::Pair<Rule>) -> ParseResult<Vec<Spanned<Stmt>>> {
1725    let mut statements = Vec::new();
1726    for p in pair.into_inner() {
1727        if p.as_rule() == Rule::statement {
1728            statements.push(parse_statement(p)?);
1729        }
1730    }
1731    Ok(statements)
1732}
1733
1734fn parse_param(pair: pest::iterators::Pair<Rule>) -> ParseResult<Param> {
1735    let mut inner = pair.into_inner();
1736    let name = inner.expect_next("parameter name")?.as_str().to_string();
1737    let ty = parse_type(inner.expect_next("parameter type")?)?;
1738    Ok(Param { name, ty })
1739}
1740
1741fn parse_config_block(pair: pest::iterators::Pair<Rule>) -> ParseResult<Stmt> {
1742    let mut inner = pair.into_inner();
1743    let first = inner.expect_next("config name or item")?;
1744
1745    // Check if first token is identifier (new syntax) or config_item (old syntax)
1746    let (name, items_start) = if first.as_rule() == Rule::identifier {
1747        (first.as_str().to_string(), None)
1748    } else {
1749        // Old syntax: config: with indentation - use "default" as name
1750        ("default".to_string(), Some(first))
1751    };
1752
1753    let mut items = Vec::new();
1754
1755    // If we have a config_item from old syntax, parse it first
1756    if let Some(first_item) = items_start {
1757        if first_item.as_rule() == Rule::config_item {
1758            items.push(parse_config_item(first_item)?);
1759        }
1760    }
1761
1762    for p in inner {
1763        if p.as_rule() == Rule::config_item {
1764            items.push(parse_config_item(p)?);
1765        }
1766    }
1767    Ok(Stmt::Config { name, items })
1768}
1769
1770fn parse_config_item(pair: pest::iterators::Pair<Rule>) -> ParseResult<ConfigItem> {
1771    let mut inner = pair.into_inner();
1772    let key = inner.expect_next("config key")?.as_str().to_string();
1773    let value = parse_config_value(inner.expect_next("config value")?)?;
1774    Ok(ConfigItem::Value(key, value))
1775}
1776
1777fn parse_config_value(pair: pest::iterators::Pair<Rule>) -> ParseResult<ConfigValue> {
1778    let cloned = pair.clone();
1779    let inner = pair.into_inner().next().unwrap_or(cloned);
1780
1781    match inner.as_rule() {
1782        Rule::config_concat => {
1783            let parts: Vec<ConfigValue> = inner
1784                .into_inner()
1785                .map(|atom| {
1786                    // Each atom is a config_atom containing either string or identifier
1787                    let inner_atom = atom.into_inner().next().expect("config_atom child");
1788                    match inner_atom.as_rule() {
1789                        Rule::string => {
1790                            let s = inner_atom.as_str();
1791                            Ok(ConfigValue::Str(s[1..s.len() - 1].to_string()))
1792                        }
1793                        Rule::identifier => Ok(ConfigValue::Ident(inner_atom.as_str().to_string())),
1794                        _ => Ok(ConfigValue::Ident(inner_atom.as_str().to_string())),
1795                    }
1796                })
1797                .collect::<ParseResult<Vec<_>>>()?;
1798            Ok(ConfigValue::Concat(parts))
1799        }
1800        Rule::config_array => {
1801            let values: Vec<ConfigValue> = inner
1802                .into_inner()
1803                .map(parse_config_value)
1804                .collect::<ParseResult<Vec<_>>>()?;
1805            Ok(ConfigValue::Array(values))
1806        }
1807        Rule::integer => Ok(ConfigValue::Int(inner.as_str().parse().unwrap_or(0))),
1808        Rule::float => Ok(ConfigValue::Float(inner.as_str().parse().unwrap_or(0.0))),
1809        Rule::string => {
1810            let s = inner.as_str();
1811            Ok(ConfigValue::Str(s[1..s.len() - 1].to_string()))
1812        }
1813        Rule::duration => Ok(ConfigValue::Duration(
1814            parse_duration(inner.as_str()).map_err(ParseError::InvalidDuration)?,
1815        )),
1816        Rule::boolean => Ok(ConfigValue::Bool(inner.as_str() == "true")),
1817        Rule::identifier => Ok(ConfigValue::Ident(inner.as_str().to_string())),
1818        _ => Ok(ConfigValue::Ident(inner.as_str().to_string())),
1819    }
1820}
1821
1822fn parse_import_stmt(pair: pest::iterators::Pair<Rule>) -> ParseResult<Stmt> {
1823    let mut inner = pair.into_inner();
1824    let path_pair = inner.expect_next("import path")?;
1825    let path = path_pair.as_str();
1826    let path = path[1..path.len() - 1].to_string();
1827    let alias = inner.next().map(|p| p.as_str().to_string());
1828    Ok(Stmt::Import { path, alias })
1829}
1830
1831fn parse_if_stmt(pair: pest::iterators::Pair<Rule>) -> ParseResult<Stmt> {
1832    let mut inner = pair.into_inner();
1833    let cond = parse_expr(inner.expect_next("if condition")?)?;
1834
1835    let mut then_branch = Vec::new();
1836    let mut elif_branches = Vec::new();
1837    let mut else_branch = None;
1838
1839    for p in inner {
1840        match p.as_rule() {
1841            Rule::block => then_branch = parse_block(p)?,
1842            Rule::statement => then_branch.push(parse_statement(p)?),
1843            Rule::elif_clause => {
1844                let mut elif_inner = p.into_inner();
1845                let elif_cond = parse_expr(elif_inner.expect_next("elif condition")?)?;
1846                let mut elif_body = Vec::new();
1847                for ep in elif_inner {
1848                    match ep.as_rule() {
1849                        Rule::block => elif_body = parse_block(ep)?,
1850                        Rule::statement => elif_body.push(parse_statement(ep)?),
1851                        _ => {}
1852                    }
1853                }
1854                elif_branches.push((elif_cond, elif_body));
1855            }
1856            Rule::else_clause => {
1857                let mut else_body = Vec::new();
1858                for ep in p.into_inner() {
1859                    match ep.as_rule() {
1860                        Rule::block => else_body = parse_block(ep)?,
1861                        Rule::statement => else_body.push(parse_statement(ep)?),
1862                        _ => {}
1863                    }
1864                }
1865                else_branch = Some(else_body);
1866            }
1867            _ => {}
1868        }
1869    }
1870
1871    Ok(Stmt::If {
1872        cond,
1873        then_branch,
1874        elif_branches,
1875        else_branch,
1876    })
1877}
1878
1879fn parse_for_stmt(pair: pest::iterators::Pair<Rule>) -> ParseResult<Stmt> {
1880    let mut inner = pair.into_inner();
1881    let var = inner.expect_next("loop variable")?.as_str().to_string();
1882    let iter = parse_expr(inner.expect_next("iterable expression")?)?;
1883    let mut body = Vec::new();
1884    for p in inner {
1885        match p.as_rule() {
1886            Rule::block => body = parse_block(p)?,
1887            Rule::statement => body.push(parse_statement(p)?),
1888            _ => {}
1889        }
1890    }
1891    Ok(Stmt::For { var, iter, body })
1892}
1893
1894fn parse_while_stmt(pair: pest::iterators::Pair<Rule>) -> ParseResult<Stmt> {
1895    let mut inner = pair.into_inner();
1896    let cond = parse_expr(inner.expect_next("while condition")?)?;
1897    let mut body = Vec::new();
1898    for p in inner {
1899        match p.as_rule() {
1900            Rule::block => body = parse_block(p)?,
1901            Rule::statement => body.push(parse_statement(p)?),
1902            _ => {}
1903        }
1904    }
1905    Ok(Stmt::While { cond, body })
1906}
1907
1908fn parse_return_stmt(pair: pest::iterators::Pair<Rule>) -> ParseResult<Stmt> {
1909    let expr = pair.into_inner().next().map(parse_expr).transpose()?;
1910    Ok(Stmt::Return(expr))
1911}
1912
1913fn parse_emit_stmt(pair: pest::iterators::Pair<Rule>) -> ParseResult<Stmt> {
1914    let mut inner = pair.into_inner();
1915    let event_type = inner.expect_next("event type name")?.as_str().to_string();
1916    let mut fields = Vec::new();
1917    for p in inner {
1918        if p.as_rule() == Rule::named_arg_list {
1919            for arg in p.into_inner() {
1920                fields.push(parse_named_arg(arg)?);
1921            }
1922        }
1923    }
1924    Ok(Stmt::Emit { event_type, fields })
1925}
1926
1927// ============================================================================
1928// Expression Parsing
1929// ============================================================================
1930
1931fn parse_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
1932    let inner = pair.into_inner().next();
1933
1934    match inner {
1935        Some(p) => parse_expr_inner(p),
1936        None => Ok(Expr::Null),
1937    }
1938}
1939
1940fn parse_expr_inner(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
1941    match pair.as_rule() {
1942        Rule::expr => parse_expr(pair),
1943        Rule::lambda_expr => parse_lambda_expr(pair),
1944        Rule::range_expr => parse_range_expr(pair),
1945        Rule::or_expr => parse_or_expr(pair),
1946        Rule::and_expr => parse_and_expr(pair),
1947        Rule::not_expr => parse_not_expr(pair),
1948        Rule::comparison_expr => parse_comparison_expr(pair),
1949        Rule::bitwise_or_expr => parse_bitwise_or_expr(pair),
1950        Rule::bitwise_xor_expr => parse_bitwise_xor_expr(pair),
1951        Rule::bitwise_and_expr => parse_bitwise_and_expr(pair),
1952        Rule::shift_expr => parse_shift_expr(pair),
1953        Rule::additive_expr => parse_additive_expr(pair),
1954        Rule::multiplicative_expr => parse_multiplicative_expr(pair),
1955        Rule::power_expr => parse_power_expr(pair),
1956        Rule::unary_expr => parse_unary_expr(pair),
1957        Rule::postfix_expr => parse_postfix_expr(pair),
1958        Rule::primary_expr => parse_primary_expr(pair),
1959        Rule::literal => parse_literal(pair),
1960        Rule::identifier => Ok(Expr::Ident(pair.as_str().to_string())),
1961        Rule::if_expr => parse_if_expr(pair),
1962        _ => Ok(Expr::Ident(pair.as_str().to_string())),
1963    }
1964}
1965
1966fn parse_lambda_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
1967    let mut inner = pair.into_inner();
1968    let mut params = Vec::new();
1969
1970    // Parse parameters
1971    let first = inner.expect_next("lambda parameters")?;
1972    match first.as_rule() {
1973        Rule::identifier_list => {
1974            for p in first.into_inner() {
1975                params.push(p.as_str().to_string());
1976            }
1977        }
1978        Rule::identifier => {
1979            params.push(first.as_str().to_string());
1980        }
1981        _ => {}
1982    }
1983
1984    // Parse body - could be expression or block
1985    let body_pair = inner.expect_next("lambda body")?;
1986    let body = match body_pair.as_rule() {
1987        Rule::lambda_block => parse_lambda_block(body_pair)?,
1988        _ => parse_expr_inner(body_pair)?,
1989    };
1990
1991    Ok(Expr::Lambda {
1992        params,
1993        body: Box::new(body),
1994    })
1995}
1996
1997fn parse_lambda_block(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
1998    let mut stmts = Vec::new();
1999    let mut final_expr = None;
2000
2001    for p in pair.into_inner() {
2002        match p.as_rule() {
2003            Rule::statement => {
2004                // Check if it's a var_decl to extract for Block
2005                let stmt = parse_statement(p)?;
2006                match &stmt.node {
2007                    Stmt::VarDecl {
2008                        mutable,
2009                        name,
2010                        ty,
2011                        value,
2012                    } => {
2013                        stmts.push((name.clone(), ty.clone(), value.clone(), *mutable));
2014                    }
2015                    Stmt::Expr(e) => {
2016                        // Last expression becomes result
2017                        final_expr = Some(e.clone());
2018                    }
2019                    _ => {
2020                        // Other statements - treat as expression if possible
2021                    }
2022                }
2023            }
2024            _ => {
2025                // Expression at end of block
2026                final_expr = Some(parse_expr_inner(p)?);
2027            }
2028        }
2029    }
2030
2031    // If we have variable declarations, wrap in a Block expression
2032    if stmts.is_empty() {
2033        Ok(final_expr.unwrap_or(Expr::Null))
2034    } else {
2035        Ok(Expr::Block {
2036            stmts,
2037            result: Box::new(final_expr.unwrap_or(Expr::Null)),
2038        })
2039    }
2040}
2041
2042fn parse_pattern_expr_as_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2043    // Convert pattern_or_expr to an Expr representation
2044    // pattern_or_expr = pattern_and_expr ~ ("or" ~ pattern_and_expr)*
2045    let mut inner = pair.into_inner();
2046    let mut left = parse_pattern_and_as_expr(inner.expect_next("pattern expression")?)?;
2047
2048    for right_pair in inner {
2049        let right = parse_pattern_and_as_expr(right_pair)?;
2050        left = Expr::Binary {
2051            op: BinOp::Or,
2052            left: Box::new(left),
2053            right: Box::new(right),
2054        };
2055    }
2056    Ok(left)
2057}
2058
2059fn parse_pattern_and_as_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2060    let mut inner = pair.into_inner();
2061    let mut left = parse_pattern_xor_as_expr(inner.expect_next("and expression")?)?;
2062
2063    for right_pair in inner {
2064        let right = parse_pattern_xor_as_expr(right_pair)?;
2065        left = Expr::Binary {
2066            op: BinOp::And,
2067            left: Box::new(left),
2068            right: Box::new(right),
2069        };
2070    }
2071    Ok(left)
2072}
2073
2074fn parse_pattern_xor_as_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2075    let mut inner = pair.into_inner();
2076    let mut left = parse_pattern_unary_as_expr(inner.expect_next("xor expression")?)?;
2077
2078    for right_pair in inner {
2079        let right = parse_pattern_unary_as_expr(right_pair)?;
2080        left = Expr::Binary {
2081            op: BinOp::Xor,
2082            left: Box::new(left),
2083            right: Box::new(right),
2084        };
2085    }
2086    Ok(left)
2087}
2088
2089fn parse_pattern_unary_as_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2090    let mut inner = pair.into_inner();
2091    let first = inner.expect_next("unary expression or operand")?;
2092
2093    if first.as_rule() == Rule::not_keyword {
2094        let expr = parse_pattern_primary_as_expr(inner.expect_next("pattern expression")?)?;
2095        Ok(Expr::Unary {
2096            op: UnaryOp::Not,
2097            expr: Box::new(expr),
2098        })
2099    } else {
2100        parse_pattern_primary_as_expr(first)
2101    }
2102}
2103
2104fn parse_pattern_primary_as_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2105    let inner = pair
2106        .into_inner()
2107        .expect_next("pattern primary expression")?;
2108
2109    match inner.as_rule() {
2110        Rule::pattern_or_expr => parse_pattern_expr_as_expr(inner),
2111        Rule::pattern_sequence => parse_pattern_sequence_as_expr(inner),
2112        _ => Ok(Expr::Ident(inner.as_str().to_string())),
2113    }
2114}
2115
2116fn parse_pattern_sequence_as_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2117    // pattern_sequence = identifier ~ ("->" ~ identifier)*
2118    // Convert to a chain of FollowedBy binary operations
2119    let mut inner = pair.into_inner();
2120    let mut left = Expr::Ident(inner.expect_next("sequence start")?.as_str().to_string());
2121
2122    for right_pair in inner {
2123        let right = Expr::Ident(right_pair.as_str().to_string());
2124        left = Expr::Binary {
2125            op: BinOp::FollowedBy,
2126            left: Box::new(left),
2127            right: Box::new(right),
2128        };
2129    }
2130    Ok(left)
2131}
2132
2133fn parse_filter_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2134    let inner = pair.into_inner().expect_next("filter expression")?;
2135    parse_filter_or_expr(inner)
2136}
2137
2138fn parse_filter_or_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2139    let mut inner = pair.into_inner();
2140    let mut left = parse_filter_and_expr(inner.expect_next("or expression operand")?)?;
2141
2142    for right_pair in inner {
2143        let right = parse_filter_and_expr(right_pair)?;
2144        left = Expr::Binary {
2145            op: BinOp::Or,
2146            left: Box::new(left),
2147            right: Box::new(right),
2148        };
2149    }
2150    Ok(left)
2151}
2152
2153fn parse_filter_and_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2154    let mut inner = pair.into_inner();
2155    let mut left = parse_filter_not_expr(inner.expect_next("and expression operand")?)?;
2156
2157    for right_pair in inner {
2158        let right = parse_filter_not_expr(right_pair)?;
2159        left = Expr::Binary {
2160            op: BinOp::And,
2161            left: Box::new(left),
2162            right: Box::new(right),
2163        };
2164    }
2165    Ok(left)
2166}
2167
2168fn parse_filter_not_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2169    let mut inner = pair.into_inner();
2170    let first = inner.expect_next("not or expression")?;
2171
2172    if first.as_rule() == Rule::not_keyword {
2173        let expr = parse_filter_comparison_expr(inner.expect_next("expression after not")?)?;
2174        Ok(Expr::Unary {
2175            op: UnaryOp::Not,
2176            expr: Box::new(expr),
2177        })
2178    } else {
2179        parse_filter_comparison_expr(first)
2180    }
2181}
2182
2183fn parse_filter_comparison_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2184    let mut inner = pair.into_inner();
2185    let left = parse_filter_additive_expr(inner.expect_next("comparison left operand")?)?;
2186
2187    if let Some(op_pair) = inner.next() {
2188        let op = match op_pair.as_str() {
2189            "==" => BinOp::Eq,
2190            "!=" => BinOp::NotEq,
2191            "<" => BinOp::Lt,
2192            "<=" => BinOp::Le,
2193            ">" => BinOp::Gt,
2194            ">=" => BinOp::Ge,
2195            "in" => BinOp::In,
2196            "is" => BinOp::Is,
2197            s if s.contains("not") && s.contains("in") => BinOp::NotIn,
2198            _ => BinOp::Eq,
2199        };
2200        let right = parse_filter_additive_expr(inner.expect_next("comparison right operand")?)?;
2201        Ok(Expr::Binary {
2202            op,
2203            left: Box::new(left),
2204            right: Box::new(right),
2205        })
2206    } else {
2207        Ok(left)
2208    }
2209}
2210
2211fn parse_filter_additive_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2212    let mut inner = pair.into_inner();
2213    let mut left =
2214        parse_filter_multiplicative_expr(inner.expect_next("additive expression operand")?)?;
2215
2216    while let Some(op_pair) = inner.next() {
2217        let op = if op_pair.as_str() == "-" {
2218            BinOp::Sub
2219        } else {
2220            BinOp::Add
2221        };
2222        if let Some(right_pair) = inner.next() {
2223            let right = parse_filter_multiplicative_expr(right_pair)?;
2224            left = Expr::Binary {
2225                op,
2226                left: Box::new(left),
2227                right: Box::new(right),
2228            };
2229        }
2230    }
2231    Ok(left)
2232}
2233
2234fn parse_filter_multiplicative_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2235    let mut inner = pair.into_inner();
2236    let mut left =
2237        parse_filter_unary_expr(inner.expect_next("multiplicative expression operand")?)?;
2238
2239    while let Some(op_pair) = inner.next() {
2240        let op = match op_pair.as_str() {
2241            "*" => BinOp::Mul,
2242            "/" => BinOp::Div,
2243            "%" => BinOp::Mod,
2244            _ => BinOp::Mul,
2245        };
2246        if let Some(right_pair) = inner.next() {
2247            let right = parse_filter_unary_expr(right_pair)?;
2248            left = Expr::Binary {
2249                op,
2250                left: Box::new(left),
2251                right: Box::new(right),
2252            };
2253        }
2254    }
2255    Ok(left)
2256}
2257
2258fn parse_filter_unary_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2259    let mut inner = pair.into_inner();
2260    let first = inner.expect_next("unary operator or expression")?;
2261
2262    // Check if first is a unary operator or the expression itself
2263    if first.as_rule() == Rule::filter_unary_op {
2264        let op_str = first.as_str();
2265        let expr =
2266            parse_filter_postfix_expr(inner.expect_next("expression after unary operator")?)?;
2267        let op = match op_str {
2268            "-" => UnaryOp::Neg,
2269            "~" => UnaryOp::BitNot,
2270            _ => unreachable!("Grammar only allows - or ~"),
2271        };
2272        Ok(Expr::Unary {
2273            op,
2274            expr: Box::new(expr),
2275        })
2276    } else {
2277        parse_filter_postfix_expr(first)
2278    }
2279}
2280
2281fn parse_filter_postfix_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2282    let mut inner = pair.into_inner();
2283    let mut expr = parse_filter_primary_expr(inner.expect_next("postfix expression base")?)?;
2284
2285    for suffix in inner {
2286        expr = parse_filter_postfix_suffix(expr, suffix)?;
2287    }
2288    Ok(expr)
2289}
2290
2291fn parse_filter_postfix_suffix(expr: Expr, pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2292    let mut inner = pair.into_inner();
2293
2294    if let Some(first) = inner.next() {
2295        match first.as_rule() {
2296            Rule::identifier => {
2297                // Member access: .identifier
2298                Ok(Expr::Member {
2299                    expr: Box::new(expr),
2300                    member: first.as_str().to_string(),
2301                })
2302            }
2303            Rule::optional_member_access => {
2304                let member = first
2305                    .into_inner()
2306                    .expect_next("member name")?
2307                    .as_str()
2308                    .to_string();
2309                Ok(Expr::OptionalMember {
2310                    expr: Box::new(expr),
2311                    member,
2312                })
2313            }
2314            Rule::index_access => {
2315                let index = parse_expr(first.into_inner().expect_next("index expression")?)?;
2316                Ok(Expr::Index {
2317                    expr: Box::new(expr),
2318                    index: Box::new(index),
2319                })
2320            }
2321            Rule::call_args => {
2322                let args = first
2323                    .into_inner()
2324                    .filter(|p| p.as_rule() == Rule::arg_list)
2325                    .flat_map(|p| p.into_inner())
2326                    .map(parse_arg)
2327                    .collect::<ParseResult<Vec<_>>>()?;
2328                Ok(Expr::Call {
2329                    func: Box::new(expr),
2330                    args,
2331                })
2332            }
2333            _ => Ok(expr),
2334        }
2335    } else {
2336        Ok(expr)
2337    }
2338}
2339
2340fn parse_filter_primary_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2341    let inner = pair.into_inner().expect_next("filter primary expression")?;
2342
2343    match inner.as_rule() {
2344        Rule::literal => parse_literal(inner),
2345        Rule::identifier => Ok(Expr::Ident(inner.as_str().to_string())),
2346        Rule::filter_expr => parse_filter_expr(inner),
2347        _ => Ok(Expr::Ident(inner.as_str().to_string())),
2348    }
2349}
2350
2351fn parse_range_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2352    let mut inner = pair.into_inner();
2353    let left = parse_expr_inner(inner.expect_next("range start")?)?;
2354
2355    if let Some(op_pair) = inner.next() {
2356        let inclusive = op_pair.as_str() == "..=";
2357        let right = parse_expr_inner(inner.expect_next("range end")?)?;
2358        Ok(Expr::Range {
2359            start: Box::new(left),
2360            end: Box::new(right),
2361            inclusive,
2362        })
2363    } else {
2364        Ok(left)
2365    }
2366}
2367
2368fn parse_or_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2369    let mut inner = pair.into_inner();
2370    let mut left = parse_expr_inner(inner.expect_next("or expression operand")?)?;
2371
2372    for right_pair in inner {
2373        let right = parse_expr_inner(right_pair)?;
2374        left = Expr::Binary {
2375            op: BinOp::Or,
2376            left: Box::new(left),
2377            right: Box::new(right),
2378        };
2379    }
2380
2381    Ok(left)
2382}
2383
2384fn parse_and_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2385    let mut inner = pair.into_inner();
2386    let mut left = parse_expr_inner(inner.expect_next("and expression operand")?)?;
2387
2388    for right_pair in inner {
2389        let right = parse_expr_inner(right_pair)?;
2390        left = Expr::Binary {
2391            op: BinOp::And,
2392            left: Box::new(left),
2393            right: Box::new(right),
2394        };
2395    }
2396
2397    Ok(left)
2398}
2399
2400fn parse_not_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2401    let mut inner = pair.into_inner();
2402    let first = inner.expect_next("not keyword or expression")?;
2403
2404    if first.as_rule() == Rule::not_keyword {
2405        let expr = parse_expr_inner(inner.expect_next("expression after not")?)?;
2406        Ok(Expr::Unary {
2407            op: UnaryOp::Not,
2408            expr: Box::new(expr),
2409        })
2410    } else {
2411        parse_expr_inner(first)
2412    }
2413}
2414
2415fn parse_comparison_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2416    let mut inner = pair.into_inner();
2417    let left = parse_expr_inner(inner.expect_next("comparison left operand")?)?;
2418
2419    if let Some(op_pair) = inner.next() {
2420        let op_str = op_pair.as_str();
2421        let op = match op_str {
2422            "==" => BinOp::Eq,
2423            "!=" => BinOp::NotEq,
2424            "<" => BinOp::Lt,
2425            "<=" => BinOp::Le,
2426            ">" => BinOp::Gt,
2427            ">=" => BinOp::Ge,
2428            "in" => BinOp::In,
2429            "is" => BinOp::Is,
2430            s if s.contains("not") && s.contains("in") => BinOp::NotIn,
2431            _ => BinOp::Eq,
2432        };
2433        let right = parse_expr_inner(inner.expect_next("comparison right operand")?)?;
2434        Ok(Expr::Binary {
2435            op,
2436            left: Box::new(left),
2437            right: Box::new(right),
2438        })
2439    } else {
2440        Ok(left)
2441    }
2442}
2443
2444fn parse_bitwise_or_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2445    parse_binary_chain(pair, BinOp::BitOr)
2446}
2447
2448fn parse_bitwise_xor_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2449    parse_binary_chain(pair, BinOp::BitXor)
2450}
2451
2452fn parse_bitwise_and_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2453    parse_binary_chain(pair, BinOp::BitAnd)
2454}
2455
2456fn parse_shift_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2457    let mut inner = pair.into_inner();
2458    let mut left = parse_expr_inner(inner.expect_next("shift expression operand")?)?;
2459
2460    while let Some(op_or_expr) = inner.next() {
2461        let op = match op_or_expr.as_str() {
2462            "<<" => BinOp::Shl,
2463            ">>" => BinOp::Shr,
2464            _ => {
2465                let right = parse_expr_inner(op_or_expr)?;
2466                left = Expr::Binary {
2467                    op: BinOp::Shl,
2468                    left: Box::new(left),
2469                    right: Box::new(right),
2470                };
2471                continue;
2472            }
2473        };
2474        if let Some(right_pair) = inner.next() {
2475            let right = parse_expr_inner(right_pair)?;
2476            left = Expr::Binary {
2477                op,
2478                left: Box::new(left),
2479                right: Box::new(right),
2480            };
2481        }
2482    }
2483
2484    Ok(left)
2485}
2486
2487fn parse_additive_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2488    let mut inner = pair.into_inner();
2489    let mut left = parse_expr_inner(inner.expect_next("additive expression operand")?)?;
2490
2491    while let Some(op_pair) = inner.next() {
2492        let op_text = op_pair.as_str();
2493        let op = if op_text == "-" {
2494            BinOp::Sub
2495        } else {
2496            BinOp::Add
2497        };
2498
2499        if let Some(right_pair) = inner.next() {
2500            let right = parse_expr_inner(right_pair)?;
2501            left = Expr::Binary {
2502                op,
2503                left: Box::new(left),
2504                right: Box::new(right),
2505            };
2506        }
2507    }
2508
2509    Ok(left)
2510}
2511
2512fn parse_multiplicative_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2513    let mut inner = pair.into_inner();
2514    let mut left = parse_expr_inner(inner.expect_next("multiplicative expression operand")?)?;
2515
2516    while let Some(op_pair) = inner.next() {
2517        let op_text = op_pair.as_str();
2518        let op = match op_text {
2519            "*" => BinOp::Mul,
2520            "/" => BinOp::Div,
2521            "%" => BinOp::Mod,
2522            _ => BinOp::Mul,
2523        };
2524
2525        if let Some(right_pair) = inner.next() {
2526            let right = parse_expr_inner(right_pair)?;
2527            left = Expr::Binary {
2528                op,
2529                left: Box::new(left),
2530                right: Box::new(right),
2531            };
2532        }
2533    }
2534
2535    Ok(left)
2536}
2537
2538fn parse_power_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2539    let mut inner = pair.into_inner();
2540    let base = parse_expr_inner(inner.expect_next("power expression base")?)?;
2541
2542    if let Some(exp_pair) = inner.next() {
2543        let exp = parse_expr_inner(exp_pair)?;
2544        Ok(Expr::Binary {
2545            op: BinOp::Pow,
2546            left: Box::new(base),
2547            right: Box::new(exp),
2548        })
2549    } else {
2550        Ok(base)
2551    }
2552}
2553
2554fn parse_unary_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2555    let mut inner = pair.into_inner();
2556    let first = inner.expect_next("unary operator or expression")?;
2557
2558    // Check if first is a unary operator or the expression itself
2559    match first.as_rule() {
2560        Rule::unary_op => {
2561            let op_str = first.as_str();
2562            let expr = parse_expr_inner(inner.expect_next("expression after unary operator")?)?;
2563            let op = match op_str {
2564                "-" => UnaryOp::Neg,
2565                "~" => UnaryOp::BitNot,
2566                _ => unreachable!("Grammar only allows - or ~"),
2567            };
2568            Ok(Expr::Unary {
2569                op,
2570                expr: Box::new(expr),
2571            })
2572        }
2573        _ => parse_expr_inner(first),
2574    }
2575}
2576
2577fn parse_postfix_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2578    let mut inner = pair.into_inner();
2579    let mut expr = parse_expr_inner(inner.expect_next("postfix expression base")?)?;
2580
2581    for suffix in inner {
2582        expr = parse_postfix_suffix(expr, suffix)?;
2583    }
2584
2585    Ok(expr)
2586}
2587
2588fn parse_postfix_suffix(expr: Expr, pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2589    let inner = pair.into_inner().expect_next("postfix suffix")?;
2590
2591    match inner.as_rule() {
2592        Rule::member_access => {
2593            let member = inner
2594                .into_inner()
2595                .expect_next("member name")?
2596                .as_str()
2597                .to_string();
2598            Ok(Expr::Member {
2599                expr: Box::new(expr),
2600                member,
2601            })
2602        }
2603        Rule::optional_member_access => {
2604            let member = inner
2605                .into_inner()
2606                .expect_next("member name")?
2607                .as_str()
2608                .to_string();
2609            Ok(Expr::OptionalMember {
2610                expr: Box::new(expr),
2611                member,
2612            })
2613        }
2614        Rule::slice_access => {
2615            // Parse slice: [start:end], [:end], [start:], [:]
2616            let slice_range = inner.into_inner().expect_next("slice range")?;
2617            let slice_inner = slice_range.into_inner();
2618
2619            let mut start = None;
2620            let mut end = None;
2621
2622            for p in slice_inner {
2623                match p.as_rule() {
2624                    Rule::slice_start => {
2625                        start = Some(Box::new(parse_expr_inner(
2626                            p.into_inner().expect_next("slice start expression")?,
2627                        )?));
2628                    }
2629                    Rule::slice_end => {
2630                        end = Some(Box::new(parse_expr_inner(
2631                            p.into_inner().expect_next("slice end expression")?,
2632                        )?));
2633                    }
2634                    _ => {}
2635                }
2636            }
2637
2638            Ok(Expr::Slice {
2639                expr: Box::new(expr),
2640                start,
2641                end,
2642            })
2643        }
2644        Rule::index_access => {
2645            let index = parse_expr(inner.into_inner().expect_next("index expression")?)?;
2646            Ok(Expr::Index {
2647                expr: Box::new(expr),
2648                index: Box::new(index),
2649            })
2650        }
2651        Rule::call_args => {
2652            let mut args = Vec::new();
2653            for p in inner.into_inner() {
2654                if p.as_rule() == Rule::arg_list {
2655                    for arg in p.into_inner() {
2656                        args.push(parse_arg(arg)?);
2657                    }
2658                }
2659            }
2660            Ok(Expr::Call {
2661                func: Box::new(expr),
2662                args,
2663            })
2664        }
2665        _ => Ok(expr),
2666    }
2667}
2668
2669fn parse_arg(pair: pest::iterators::Pair<Rule>) -> ParseResult<Arg> {
2670    let mut inner = pair.into_inner();
2671    let first = inner.expect_next("argument")?;
2672
2673    if let Some(second) = inner.next() {
2674        Ok(Arg::Named(
2675            first.as_str().to_string(),
2676            parse_expr_inner(second)?,
2677        ))
2678    } else {
2679        Ok(Arg::Positional(parse_expr_inner(first)?))
2680    }
2681}
2682
2683fn parse_primary_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2684    let inner = pair.into_inner().expect_next("primary expression")?;
2685
2686    match inner.as_rule() {
2687        Rule::if_expr => parse_if_expr(inner),
2688        Rule::literal => parse_literal(inner),
2689        Rule::identifier => Ok(Expr::Ident(inner.as_str().to_string())),
2690        Rule::array_literal => parse_array_literal(inner),
2691        Rule::map_literal => parse_map_literal(inner),
2692        Rule::expr => parse_expr(inner),
2693        _ => Ok(Expr::Ident(inner.as_str().to_string())),
2694    }
2695}
2696
2697fn parse_if_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2698    let mut inner = pair.into_inner();
2699    let cond = parse_expr_inner(inner.expect_next("if condition")?)?;
2700    let then_branch = parse_expr_inner(inner.expect_next("then branch")?)?;
2701    let else_branch = parse_expr_inner(inner.expect_next("else branch")?)?;
2702
2703    Ok(Expr::If {
2704        cond: Box::new(cond),
2705        then_branch: Box::new(then_branch),
2706        else_branch: Box::new(else_branch),
2707    })
2708}
2709
2710fn parse_literal(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2711    let inner = pair.into_inner().expect_next("literal value")?;
2712
2713    match inner.as_rule() {
2714        Rule::integer => inner
2715            .as_str()
2716            .parse::<i64>()
2717            .map(Expr::Int)
2718            .map_err(|e| ParseError::InvalidNumber(format!("'{}': {}", inner.as_str(), e))),
2719        Rule::float => inner
2720            .as_str()
2721            .parse::<f64>()
2722            .map(Expr::Float)
2723            .map_err(|e| ParseError::InvalidNumber(format!("'{}': {}", inner.as_str(), e))),
2724        Rule::string => {
2725            let s = inner.as_str();
2726            Ok(Expr::Str(s[1..s.len() - 1].to_string()))
2727        }
2728        Rule::duration => Ok(Expr::Duration(
2729            parse_duration(inner.as_str()).map_err(ParseError::InvalidDuration)?,
2730        )),
2731        Rule::timestamp => Ok(Expr::Timestamp(parse_timestamp(inner.as_str()))),
2732        Rule::boolean => Ok(Expr::Bool(inner.as_str() == "true")),
2733        Rule::null => Ok(Expr::Null),
2734        _ => Ok(Expr::Null),
2735    }
2736}
2737
2738fn parse_array_literal(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2739    let mut items = Vec::new();
2740    for p in pair.into_inner() {
2741        if p.as_rule() == Rule::expr_list {
2742            for expr in p.into_inner() {
2743                items.push(parse_expr(expr)?);
2744            }
2745        }
2746    }
2747    Ok(Expr::Array(items))
2748}
2749
2750fn parse_map_literal(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2751    let mut entries = Vec::new();
2752    for p in pair.into_inner() {
2753        if p.as_rule() == Rule::map_entry_list {
2754            for entry in p.into_inner() {
2755                let mut inner = entry.into_inner();
2756                let key = inner.expect_next("map key")?.as_str().to_string();
2757                let key = if key.starts_with('"') {
2758                    key[1..key.len() - 1].to_string()
2759                } else {
2760                    key
2761                };
2762                let value = parse_expr(inner.expect_next("map value")?)?;
2763                entries.push((key, value));
2764            }
2765        }
2766    }
2767    Ok(Expr::Map(entries))
2768}
2769
2770fn parse_binary_chain(pair: pest::iterators::Pair<Rule>, op: BinOp) -> ParseResult<Expr> {
2771    let mut inner = pair.into_inner();
2772    let mut left = parse_expr_inner(inner.expect_next("binary chain operand")?)?;
2773
2774    for right_pair in inner {
2775        let right = parse_expr_inner(right_pair)?;
2776        left = Expr::Binary {
2777            op,
2778            left: Box::new(left),
2779            right: Box::new(right),
2780        };
2781    }
2782
2783    Ok(left)
2784}
2785
2786#[cfg(test)]
2787mod tests {
2788    use super::*;
2789
2790    #[test]
2791    fn test_parse_simple_stream() {
2792        let result = parse("stream output = input");
2793        assert!(result.is_ok(), "Failed: {:?}", result.err());
2794    }
2795
2796    #[test]
2797    fn test_parse_stream_with_filter() {
2798        let result = parse("stream output = input.where(value > 100)");
2799        assert!(result.is_ok(), "Failed: {:?}", result.err());
2800    }
2801
2802    #[test]
2803    fn test_parse_stream_with_map() {
2804        let result = parse("stream output = input.map(x * 2)");
2805        assert!(result.is_ok(), "Failed: {:?}", result.err());
2806    }
2807
2808    #[test]
2809    fn test_parse_event_declaration() {
2810        let result = parse("event SensorReading:\n  sensor_id: str\n  value: float");
2811        assert!(result.is_ok(), "Failed: {:?}", result.err());
2812    }
2813
2814    #[test]
2815    fn test_parse_variable() {
2816        let result = parse("let x = 42");
2817        assert!(result.is_ok(), "Failed: {:?}", result.err());
2818    }
2819
2820    #[test]
2821    fn test_parse_function() {
2822        let result = parse("fn add(a: int, b: int) -> int:\n  return a + b");
2823        assert!(result.is_ok(), "Failed: {:?}", result.err());
2824    }
2825
2826    #[test]
2827    fn test_parse_lambda() {
2828        let result = parse("let f = (x) => x * 2");
2829        assert!(result.is_ok(), "Failed: {:?}", result.err());
2830    }
2831
2832    #[test]
2833    fn test_parse_if_expression() {
2834        let result = parse("let x = if a > b then a else b");
2835        assert!(result.is_ok(), "Failed: {:?}", result.err());
2836    }
2837
2838    #[test]
2839    fn test_parse_followed_by() {
2840        let result = parse("stream alerts = orders.where(amount > 1000) -> Payment where payment.order_id == orders.id");
2841        assert!(result.is_ok(), "Failed: {:?}", result.err());
2842    }
2843
2844    #[test]
2845    fn test_parse_window() {
2846        let result = parse("stream windowed = input.window(5s)");
2847        assert!(result.is_ok(), "Failed: {:?}", result.err());
2848    }
2849
2850    #[test]
2851    fn test_parse_aggregate() {
2852        let result =
2853            parse("stream stats = input.window(1m).aggregate(count: count(), avg: avg(value))");
2854        assert!(result.is_ok(), "Failed: {:?}", result.err());
2855    }
2856
2857    #[test]
2858    fn test_parse_merge() {
2859        let result = parse("stream combined = merge(stream1, stream2)");
2860        assert!(result.is_ok(), "Failed: {:?}", result.err());
2861    }
2862
2863    #[test]
2864    fn test_parse_sequence() {
2865        let result = parse("stream seq = sequence(a: EventA, b: EventB where b.id == a.id)");
2866        assert!(result.is_ok(), "Failed: {:?}", result.err());
2867    }
2868
2869    #[test]
2870    fn test_parse_config() {
2871        let result = parse("config:\n  window_size: 5s\n  batch_size: 100");
2872        assert!(result.is_ok(), "Failed: {:?}", result.err());
2873    }
2874
2875    #[test]
2876    fn test_parse_complex_expression() {
2877        let result = parse("let x = (a + b) * c / d - e");
2878        assert!(result.is_ok(), "Failed: {:?}", result.err());
2879    }
2880
2881    #[test]
2882    fn test_parse_sliding_window() {
2883        let result = parse("stream output = input.window(5m, sliding: 1m)");
2884        assert!(result.is_ok(), "Failed: {:?}", result.err());
2885    }
2886
2887    #[test]
2888    fn test_parse_fork_construct() {
2889        let result =
2890            parse("stream forked = input.fork(branch1: .where(x > 0), branch2: .where(x < 0))");
2891        assert!(result.is_ok(), "Failed: {:?}", result.err());
2892    }
2893
2894    #[test]
2895    fn test_parse_aggregate_functions() {
2896        let result = parse(
2897            "stream stats = input.window(1h).aggregate(total: sum(value), average: avg(value))",
2898        );
2899        assert!(result.is_ok(), "Failed: {:?}", result.err());
2900    }
2901
2902    #[test]
2903    fn test_parse_complex_parentheses() {
2904        let result = parse("let x = ((a + b) * (c - d)) / e");
2905        assert!(result.is_ok(), "Failed: {:?}", result.err());
2906    }
2907
2908    #[test]
2909    fn test_parse_sequence_with_alias() {
2910        let result = parse(
2911            r#"
2912            stream TwoTicks = StockTick as first
2913                -> StockTick as second
2914                .emit(result: "two_ticks")
2915        "#,
2916        );
2917        assert!(result.is_ok(), "Failed: {:?}", result.err());
2918    }
2919
2920    #[test]
2921    fn test_parse_followed_by_with_alias() {
2922        let result = parse("stream alerts = Order as a -> Payment as b");
2923        assert!(result.is_ok(), "Failed: {:?}", result.err());
2924    }
2925
2926    #[test]
2927    fn test_parse_followed_by_with_filter_and_alias() {
2928        // This is the problematic case: 'as b' should be the alias, not part of the expression
2929        let result = parse(
2930            r#"
2931            stream Test = A as a
2932                -> B where value == a.base + 10 as b
2933                .emit(status: "matched")
2934        "#,
2935        );
2936        assert!(result.is_ok(), "Failed: {:?}", result.err());
2937    }
2938
2939    #[test]
2940    fn test_parse_pattern_with_lambda() {
2941        let result =
2942            parse("stream Test = Trade.window(1m).pattern(p: x => x.len() > 3).emit(alert: true)");
2943        assert!(result.is_ok(), "Failed: {:?}", result.err());
2944    }
2945
2946    #[test]
2947    fn test_parse_sase_pattern_decl_simple() {
2948        let result = parse("pattern SimpleAlert = SEQ(Login, Transaction)");
2949        assert!(result.is_ok(), "Failed: {:?}", result.err());
2950    }
2951
2952    #[test]
2953    fn test_parse_sase_pattern_decl_with_kleene() {
2954        let result = parse("pattern MultiTx = SEQ(Login, Transaction+ where amount > 1000)");
2955        assert!(result.is_ok(), "Failed: {:?}", result.err());
2956    }
2957
2958    #[test]
2959    fn test_parse_sase_pattern_decl_with_alias() {
2960        let result = parse("pattern AliasedPattern = SEQ(Login as login, Transaction as tx)");
2961        assert!(result.is_ok(), "Failed: {:?}", result.err());
2962    }
2963
2964    #[test]
2965    fn test_parse_sase_pattern_decl_with_within() {
2966        let result = parse("pattern TimedPattern = SEQ(A, B) within 10m");
2967        assert!(result.is_ok(), "Failed: {:?}", result.err());
2968    }
2969
2970    #[test]
2971    fn test_parse_sase_pattern_decl_with_partition() {
2972        let result = parse("pattern PartitionedPattern = SEQ(A, B) partition by user_id");
2973        assert!(result.is_ok(), "Failed: {:?}", result.err());
2974    }
2975
2976    #[test]
2977    fn test_parse_sase_pattern_decl_full() {
2978        // Note: In SASE+ syntax, 'where' comes before 'as' (filter then alias)
2979        let result = parse(
2980            "pattern SuspiciousActivity = SEQ(Transaction+ where amount > 1000 as txs) within 10m partition by user_id"
2981        );
2982        assert!(result.is_ok(), "Failed: {:?}", result.err());
2983    }
2984
2985    #[test]
2986    fn test_parse_sase_pattern_decl_or() {
2987        let result = parse("pattern AlertOrWarn = Login OR Logout");
2988        assert!(result.is_ok(), "Failed: {:?}", result.err());
2989    }
2990
2991    #[test]
2992    fn test_parse_sase_pattern_decl_and() {
2993        let result = parse("pattern BothEvents = Login AND Transaction");
2994        assert!(result.is_ok(), "Failed: {:?}", result.err());
2995    }
2996
2997    #[test]
2998    fn test_parse_sase_pattern_decl_not() {
2999        let result = parse("pattern NoLogout = SEQ(Login, NOT Logout, Transaction)");
3000        assert!(result.is_ok(), "Failed: {:?}", result.err());
3001    }
3002
3003    #[test]
3004    fn test_parse_having() {
3005        let result = parse(
3006            "stream filtered = input.window(1m).aggregate(count: count(), total: sum(value)).having(count > 10)",
3007        );
3008        assert!(result.is_ok(), "Failed: {:?}", result.err());
3009    }
3010
3011    #[test]
3012    fn test_parse_having_with_partition() {
3013        let result = parse(
3014            "stream grouped = input.partition_by(category).window(5m).aggregate(avg_price: avg(price)).having(avg_price > 100.0)",
3015        );
3016        assert!(result.is_ok(), "Failed: {:?}", result.err());
3017    }
3018
3019    #[test]
3020    fn test_parse_timer_source() {
3021        let result = parse("stream heartbeat = timer(5s).emit(type: \"heartbeat\")");
3022        assert!(result.is_ok(), "Failed: {:?}", result.err());
3023    }
3024
3025    #[test]
3026    fn test_parse_timer_source_with_initial_delay() {
3027        let result =
3028            parse("stream delayed_timer = timer(1m, initial_delay: 10s).emit(type: \"periodic\")");
3029        assert!(result.is_ok(), "Failed: {:?}", result.err());
3030    }
3031
3032    #[test]
3033    fn test_parse_var_decl() {
3034        let result = parse("var threshold: float = 10.0");
3035        assert!(result.is_ok(), "Failed: {:?}", result.err());
3036    }
3037
3038    #[test]
3039    fn test_parse_let_decl() {
3040        let result = parse("let max_count: int = 100");
3041        assert!(result.is_ok(), "Failed: {:?}", result.err());
3042    }
3043
3044    #[test]
3045    fn test_parse_assignment() {
3046        let result = parse("threshold := threshold + 10.0");
3047        assert!(result.is_ok(), "Failed: {:?}", result.err());
3048    }
3049
3050    #[test]
3051    fn test_parse_assignment_with_expression() {
3052        let result = parse("count := count * 2 + offset");
3053        assert!(result.is_ok(), "Failed: {:?}", result.err());
3054    }
3055
3056    #[test]
3057    fn test_parse_nested_stream_reference() {
3058        let result = parse("stream Base = Event\nstream Derived = Base.where(x > 0)");
3059        assert!(result.is_ok(), "Failed: {:?}", result.err());
3060    }
3061
3062    #[test]
3063    fn test_parse_multi_stage_pipeline() {
3064        let result = parse(
3065            "stream L1 = Raw\nstream L2 = L1.where(a > 1)\nstream L3 = L2.window(5).aggregate(cnt: count())",
3066        );
3067        assert!(result.is_ok(), "Failed: {:?}", result.err());
3068    }
3069
3070    #[test]
3071    fn test_parse_stream_with_operations_chain() {
3072        let result = parse(
3073            "stream Processed = Source.where(valid).window(10).aggregate(sum: sum(value)).having(sum > 100)",
3074        );
3075        assert!(result.is_ok(), "Failed: {:?}", result.err());
3076    }
3077
3078    // =========================================================================
3079    // Connectivity Architecture Tests
3080    // =========================================================================
3081
3082    #[test]
3083    fn test_parse_connector_mqtt() {
3084        let result = parse(
3085            r#"connector MqttBroker = mqtt (
3086                host: "localhost",
3087                port: 1883,
3088                client_id: "varpulis"
3089            )"#,
3090        );
3091        assert!(result.is_ok(), "Failed: {:?}", result.err());
3092    }
3093
3094    #[test]
3095    fn test_parse_connector_kafka() {
3096        let result = parse(
3097            r#"connector KafkaCluster = kafka (
3098                brokers: ["kafka1:9092", "kafka2:9092"],
3099                group_id: "my-group"
3100            )"#,
3101        );
3102        assert!(result.is_ok(), "Failed: {:?}", result.err());
3103    }
3104
3105    #[test]
3106    fn test_parse_connector_http() {
3107        let result = parse(
3108            r#"connector ApiEndpoint = http (
3109                base_url: "https://api.example.com"
3110            )"#,
3111        );
3112        assert!(result.is_ok(), "Failed: {:?}", result.err());
3113    }
3114
3115    #[test]
3116    fn test_parse_stream_with_from_connector() {
3117        let result = parse(
3118            r#"stream Temperatures = TemperatureReading.from(MqttSensors, topic: "sensors/temp/#")"#,
3119        );
3120        assert!(result.is_ok(), "Failed: {:?}", result.err());
3121    }
3122
3123    #[test]
3124    fn test_parse_stream_with_from_and_operations() {
3125        let result = parse(
3126            r#"stream HighTemp = TemperatureReading
3127                .from(MqttSensors, topic: "sensors/#")
3128                .where(value > 30)
3129                .emit(alert: "high_temp")"#,
3130        );
3131        assert!(result.is_ok(), "Failed: {:?}", result.err());
3132    }
3133
3134    #[test]
3135    fn test_parse_full_connectivity_pipeline() {
3136        let result = parse(
3137            r#"
3138            connector MqttSensors = mqtt (host: "localhost", port: 1883)
3139            connector KafkaAlerts = kafka (brokers: ["kafka:9092"])
3140
3141            event TemperatureReading:
3142                sensor_id: str
3143                value: float
3144                ts: timestamp
3145
3146            stream Temperatures = TemperatureReading.from(MqttSensors, topic: "sensors/#")
3147
3148            stream HighTempAlert = Temperatures
3149                .where(value > 30)
3150                .emit(alert_type: "HIGH_TEMP", temperature: value)
3151
3152            "#,
3153        );
3154        assert!(result.is_ok(), "Failed: {:?}", result.err());
3155    }
3156
3157    #[test]
3158    fn test_parse_emit_as_type() {
3159        let result = parse(
3160            r#"stream Alerts = Temperatures
3161                .where(value > 30)
3162                .emit as AlertEvent(severity: "high", temp: value)"#,
3163        );
3164        assert!(result.is_ok(), "Failed: {:?}", result.err());
3165    }
3166
3167    #[test]
3168    fn test_parse_stream_with_to_connector() {
3169        let result = parse(
3170            r#"stream Output = Input
3171                .where(x > 0)
3172                .emit(y: x * 2)
3173                .to(KafkaOutput, topic: "output")"#,
3174        );
3175        assert!(result.is_ok(), "Failed: {:?}", result.err());
3176    }
3177
3178    #[test]
3179    fn test_emit_stmt_parses() {
3180        let result = parse(
3181            r"fn test():
3182    emit Pixel(x: 1, y: 2)",
3183        );
3184        assert!(result.is_ok(), "Failed: {:?}", result.err());
3185        let program = result.unwrap();
3186        // Find the fn_decl
3187        if let Stmt::FnDecl { body, .. } = &program.statements[0].node {
3188            match &body[0].node {
3189                Stmt::Emit { event_type, fields } => {
3190                    assert_eq!(event_type, "Pixel");
3191                    assert_eq!(fields.len(), 2);
3192                    assert_eq!(fields[0].name, "x");
3193                    assert_eq!(fields[1].name, "y");
3194                }
3195                other => panic!("Expected Stmt::Emit, got {other:?}"),
3196            }
3197        } else {
3198            panic!("Expected FnDecl");
3199        }
3200    }
3201
3202    #[test]
3203    fn test_emit_stmt_no_args() {
3204        let result = parse(
3205            r"fn test():
3206    emit Done()",
3207        );
3208        assert!(result.is_ok(), "Failed: {:?}", result.err());
3209        let program = result.unwrap();
3210        if let Stmt::FnDecl { body, .. } = &program.statements[0].node {
3211            match &body[0].node {
3212                Stmt::Emit { event_type, fields } => {
3213                    assert_eq!(event_type, "Done");
3214                    assert!(fields.is_empty());
3215                }
3216                other => panic!("Expected Stmt::Emit, got {other:?}"),
3217            }
3218        } else {
3219            panic!("Expected FnDecl");
3220        }
3221    }
3222
3223    #[test]
3224    fn test_emit_in_function_with_for_loop() {
3225        let result = parse(
3226            r"fn generate(n: int):
3227    for i in 0..n:
3228        emit Item(index: i, value: i * 2)",
3229        );
3230        assert!(result.is_ok(), "Failed: {:?}", result.err());
3231    }
3232
3233    #[test]
3234    fn test_parse_process_op() {
3235        let result = parse(
3236            r"fn do_work():
3237    emit Result(v: 42)
3238
3239stream S = timer(1s).process(do_work())",
3240        );
3241        assert!(result.is_ok(), "Failed: {:?}", result.err());
3242    }
3243
3244    #[test]
3245    fn test_parse_trend_aggregate_count_trends() {
3246        let result = parse(
3247            r#"stream S = StockTick as first
3248    -> all StockTick where price > first.price as rising
3249    -> StockTick where price < rising.price as drop
3250    .within(60s)
3251    .trend_aggregate(count: count_trends())
3252    .emit(event_type: "TrendStats", trends: count)"#,
3253        );
3254        assert!(result.is_ok(), "Failed: {:?}", result.err());
3255        let program = result.unwrap();
3256        // Find the stream declaration and check for TrendAggregate op
3257        for stmt in &program.statements {
3258            if let Stmt::StreamDecl { ops, .. } = &stmt.node {
3259                let has_trend_agg = ops
3260                    .iter()
3261                    .any(|op| matches!(op, StreamOp::TrendAggregate(_)));
3262                assert!(has_trend_agg, "Expected TrendAggregate op in stream ops");
3263                // Check that TrendAggregate has the right item
3264                for op in ops {
3265                    if let StreamOp::TrendAggregate(items) = op {
3266                        assert_eq!(items.len(), 1);
3267                        assert_eq!(items[0].alias, "count");
3268                        assert_eq!(items[0].func, "count_trends");
3269                        assert!(items[0].arg.is_none());
3270                    }
3271                }
3272                return;
3273            }
3274        }
3275        panic!("No stream declaration found");
3276    }
3277
3278    #[test]
3279    fn test_parse_trend_aggregate_multiple_items() {
3280        let result = parse(
3281            r"stream S = StockTick as first
3282    -> all StockTick as rising
3283    .within(60s)
3284    .trend_aggregate(
3285        trend_count: count_trends(),
3286        event_count: count_events(rising)
3287    )
3288    .emit(trends: trend_count, events: event_count)",
3289        );
3290        assert!(result.is_ok(), "Failed: {:?}", result.err());
3291        let program = result.unwrap();
3292        for stmt in &program.statements {
3293            if let Stmt::StreamDecl { ops, .. } = &stmt.node {
3294                for op in ops {
3295                    if let StreamOp::TrendAggregate(items) = op {
3296                        assert_eq!(items.len(), 2);
3297                        assert_eq!(items[0].alias, "trend_count");
3298                        assert_eq!(items[0].func, "count_trends");
3299                        assert_eq!(items[1].alias, "event_count");
3300                        assert_eq!(items[1].func, "count_events");
3301                        assert!(items[1].arg.is_some());
3302                        return;
3303                    }
3304                }
3305            }
3306        }
3307        panic!("No TrendAggregate found");
3308    }
3309
3310    #[test]
3311    fn test_parse_score_basic() {
3312        let result = parse(
3313            r#"stream S = TradeEvent
3314    .score(model: "models/fraud.onnx", inputs: [amount, risk_score], outputs: [fraud_prob, category])"#,
3315        );
3316        assert!(result.is_ok(), "Failed: {:?}", result.err());
3317        let program = result.unwrap();
3318        for stmt in &program.statements {
3319            if let Stmt::StreamDecl { ops, .. } = &stmt.node {
3320                for op in ops {
3321                    if let StreamOp::Score(spec) = op {
3322                        assert_eq!(spec.model_path, "models/fraud.onnx");
3323                        assert_eq!(spec.inputs, vec!["amount", "risk_score"]);
3324                        assert_eq!(spec.outputs, vec!["fraud_prob", "category"]);
3325                        return;
3326                    }
3327                }
3328            }
3329        }
3330        panic!("No Score op found");
3331    }
3332
3333    #[test]
3334    fn test_parse_score_single_field() {
3335        let result = parse(
3336            r#"stream S = Event
3337    .score(model: "model.onnx", inputs: [value], outputs: [prediction])"#,
3338        );
3339        assert!(result.is_ok(), "Failed: {:?}", result.err());
3340        let program = result.unwrap();
3341        for stmt in &program.statements {
3342            if let Stmt::StreamDecl { ops, .. } = &stmt.node {
3343                for op in ops {
3344                    if let StreamOp::Score(spec) = op {
3345                        assert_eq!(spec.model_path, "model.onnx");
3346                        assert_eq!(spec.inputs, vec!["value"]);
3347                        assert_eq!(spec.outputs, vec!["prediction"]);
3348                        return;
3349                    }
3350                }
3351            }
3352        }
3353        panic!("No Score op found");
3354    }
3355
3356    // Regression tests for fuzz-discovered parser hangs (2026-02-21).
3357    // Unmatched `[` brackets cause exponential backtracking in pest's PEG
3358    // recursive descent through array_literal / index_access / slice_access.
3359
3360    #[test]
3361    fn fuzz_regression_unmatched_brackets_timeout() {
3362        // Simplified version of the fuzzer-discovered timeout input (28 unmatched '[')
3363        let input = "c2222222s[s[22s[U2s[U6[U6[22222222s[s[22s[U2s[U6[U6[222*2222s[U6[U6[222*2222s[22s[U6[U6[22*2222s[U6[U6[222*2222s[22s[U6[U6[222*26[U6[222*2";
3364        let start = std::time::Instant::now();
3365        let result = parse(input);
3366        let elapsed = start.elapsed();
3367        assert!(
3368            result.is_err(),
3369            "Should reject deeply nested unmatched brackets"
3370        );
3371        assert!(
3372            elapsed.as_millis() < 100,
3373            "Parser should reject fast, took {elapsed:?}"
3374        );
3375    }
3376
3377    #[test]
3378    fn fuzz_regression_deeply_nested_brackets_slow_unit() {
3379        // 30 unmatched '[' — must be rejected by nesting depth check
3380        let input = "stream x[[[[[[[[[[[[[[[[[[[[[[[[[[[[[[[";
3381        let start = std::time::Instant::now();
3382        let result = parse(input);
3383        let elapsed = start.elapsed();
3384        assert!(result.is_err(), "Should reject deeply nested brackets");
3385        assert!(
3386            elapsed.as_millis() < 100,
3387            "Parser should reject fast, took {elapsed:?}"
3388        );
3389    }
3390
3391    #[test]
3392    fn nesting_depth_allows_reasonable_programs() {
3393        // 10 levels of nesting — well within the limit
3394        let input = "let x = foo(bar(baz(qux(a, [1, [2, [3, [4]]]]))))";
3395        let result = parse(input);
3396        // May fail for other reasons (not a valid program) but should NOT
3397        // fail with "Nesting depth exceeds maximum"
3398        if let Err(ref e) = result {
3399            let msg = format!("{e}");
3400            assert!(
3401                !msg.contains("Nesting depth"),
3402                "Should allow 10 levels of nesting: {msg}"
3403            );
3404        }
3405    }
3406
3407    #[test]
3408    fn nesting_depth_ignores_brackets_in_comments() {
3409        // Brackets inside # comments should not count
3410        let input = "# [[[[[[[[[[[[[[[[[[[[[[[[[[\nstream x = y";
3411        let result = parse(input);
3412        assert!(
3413            result.is_ok(),
3414            "Brackets in comments should be ignored: {:?}",
3415            result.err()
3416        );
3417    }
3418
3419    #[test]
3420    fn nesting_depth_ignores_brackets_in_strings() {
3421        // Brackets inside strings should not count
3422        let input = r#"let x = "[[[[[[[[[[[[[[[[[[[[[[[[[[""#;
3423        let result = parse(input);
3424        if let Err(ref e) = result {
3425            let msg = format!("{e}");
3426            assert!(
3427                !msg.contains("Nesting depth"),
3428                "Brackets in strings should be ignored: {msg}"
3429            );
3430        }
3431    }
3432}