Skip to main content

varpulis_parser/
pest_parser.rs

1//! Pest-based parser for VPL
2//!
3//! This module provides parsing using the pest PEG parser generator.
4//!
5//! The `Rule` enum and its variants are auto-generated by `pest_derive`
6//! from the grammar file and cannot carry doc comments.
7
8use pest::Parser;
9use pest_derive::Parser;
10
11use crate::error::{ParseError, ParseResult};
12use crate::helpers::{parse_duration, parse_timestamp};
13use crate::indent::preprocess_indentation;
14use varpulis_core::ast::*;
15use varpulis_core::span::{Span, Spanned};
16use varpulis_core::types::Type;
17
18/// Extension trait for safer iterator extraction
19trait IteratorExt<'a> {
20    /// Get the next element or return an error with the expected rule description
21    fn expect_next(&mut self, expected: &str) -> ParseResult<pest::iterators::Pair<'a, Rule>>;
22}
23
24impl<'a> IteratorExt<'a> for pest::iterators::Pairs<'a, Rule> {
25    fn expect_next(&mut self, expected: &str) -> ParseResult<pest::iterators::Pair<'a, Rule>> {
26        self.next().ok_or_else(|| ParseError::Located {
27            line: 0,
28            column: 0,
29            position: 0,
30            message: format!("Expected {expected}"),
31            hint: None,
32        })
33    }
34}
35
36/// Pest-based VPL parser.
37///
38/// The grammar rules are auto-generated from `varpulis.pest` by the
39/// `pest_derive` macro. Use [`parse`] instead of calling this directly.
40#[derive(Debug, Parser)]
41#[grammar = "varpulis.pest"]
42pub struct VarpulisParser;
43
44/// Parse a VPL source string into a Program AST.
45///
46/// Runs pest parsing in a thread with a 16 MB stack to prevent stack overflow
47/// from deeply nested or adversarial inputs that trigger deep recursion in the
48/// PEG recursive descent parser.
49pub fn parse(source: &str) -> ParseResult<Program> {
50    let source = source.to_string();
51    std::thread::Builder::new()
52        .stack_size(16 * 1024 * 1024)
53        .spawn(move || parse_inner(&source))
54        .map_err(|e| ParseError::InvalidToken {
55            position: 0,
56            message: format!("Failed to spawn parser thread: {e}"),
57        })?
58        .join()
59        .unwrap_or_else(|_| {
60            Err(ParseError::InvalidToken {
61                position: 0,
62                message: "Parser stack overflow on deeply nested input".to_string(),
63            })
64        })
65}
66
67/// Maximum bracket nesting depth allowed before pest parsing.
68///
69/// PEG recursive descent can cause exponential backtracking when unmatched
70/// brackets create ambiguity between array_literal, index_access, and
71/// slice_access rules.  Measured scaling is O(2.35^depth): depth 20 takes
72/// 1200s+, depth 16 takes 39s, depth 10 takes under 0.3s.  10 levels is
73/// generous for real VPL programs (typical nesting is 3-6 levels, extreme
74/// real-world is about 8) while keeping worst-case parse time under 1 second.
75const MAX_NESTING_DEPTH: usize = 10;
76
77/// O(n) pre-scan that rejects inputs with bracket nesting deeper than
78/// `MAX_NESTING_DEPTH`. Respects string literals and comments so that
79/// brackets inside `"..."`, `# ...`, or `/* ... */` are ignored.
80fn check_nesting_depth(source: &str) -> ParseResult<()> {
81    let mut depth: usize = 0;
82    let mut max_depth: usize = 0;
83    let mut max_depth_pos: usize = 0;
84    let bytes = source.as_bytes();
85    let len = bytes.len();
86    let mut i = 0;
87
88    while i < len {
89        let b = bytes[i];
90
91        // Skip double-quoted strings
92        if b == b'"' {
93            i += 1;
94            while i < len {
95                if bytes[i] == b'\\' {
96                    i += 2; // skip escaped char
97                    continue;
98                }
99                if bytes[i] == b'"' {
100                    i += 1;
101                    break;
102                }
103                i += 1;
104            }
105            continue;
106        }
107
108        // Skip VPL line comments (# to end of line)
109        if b == b'#' {
110            i += 1;
111            while i < len && bytes[i] != b'\n' {
112                i += 1;
113            }
114            continue;
115        }
116
117        // Skip block comments
118        if b == b'/' && i + 1 < len && bytes[i + 1] == b'*' {
119            i += 2;
120            while i + 1 < len {
121                if bytes[i] == b'*' && bytes[i + 1] == b'/' {
122                    i += 2;
123                    break;
124                }
125                i += 1;
126            }
127            continue;
128        }
129
130        // Track bracket depth
131        if b == b'(' || b == b'[' || b == b'{' {
132            depth += 1;
133            if depth > max_depth {
134                max_depth = depth;
135                max_depth_pos = i;
136            }
137        } else if b == b')' || b == b']' || b == b'}' {
138            depth = depth.saturating_sub(1);
139        }
140
141        if max_depth > MAX_NESTING_DEPTH {
142            return Err(ParseError::InvalidToken {
143                position: max_depth_pos,
144                message: format!("Nesting depth exceeds maximum of {MAX_NESTING_DEPTH} levels"),
145            });
146        }
147
148        i += 1;
149    }
150
151    Ok(())
152}
153
154fn parse_inner(source: &str) -> ParseResult<Program> {
155    // Expand compile-time declaration loops (top-level for with {var} interpolation)
156    let expanded =
157        crate::expand::expand_declaration_loops(source).map_err(|e| ParseError::InvalidToken {
158            position: 0,
159            message: e,
160        })?;
161    // Preprocess to add INDENT/DEDENT markers
162    let preprocessed = preprocess_indentation(&expanded);
163
164    // Reject deeply nested input before pest parsing to prevent stack overflow
165    check_nesting_depth(&preprocessed)?;
166
167    let pairs = VarpulisParser::parse(Rule::program, &preprocessed).map_err(convert_pest_error)?;
168
169    let mut statements = Vec::new();
170
171    for pair in pairs {
172        if pair.as_rule() == Rule::program {
173            for inner in pair.into_inner() {
174                if inner.as_rule() == Rule::statement {
175                    statements.push(parse_statement(inner)?);
176                }
177            }
178        }
179    }
180
181    Ok(crate::optimize::fold_program(Program { statements }))
182}
183
184fn convert_pest_error(e: pest::error::Error<Rule>) -> ParseError {
185    let position = match e.location {
186        pest::error::InputLocation::Pos(p) => p,
187        pest::error::InputLocation::Span((s, _)) => s,
188    };
189
190    // Extract line/column from pest error
191    let (line, column) = match e.line_col {
192        pest::error::LineColLocation::Pos((l, c)) => (l, c),
193        pest::error::LineColLocation::Span((l, c), _) => (l, c),
194    };
195
196    // Create a human-readable message based on what was expected
197    let (message, hint) = match &e.variant {
198        pest::error::ErrorVariant::ParsingError {
199            positives,
200            negatives: _,
201        } => {
202            if positives.is_empty() {
203                ("Unexpected token".to_string(), None)
204            } else if is_stream_op_error(positives) {
205                // All positives are stream operations — produce a concise error
206                (
207                    "unknown stream operation".to_string(),
208                    Some(
209                        "valid operations: .where(), .select(), .emit(), .window(), .aggregate(), \
210                         .partition_by(), .within(), .having(), .to(), .context(), .log(), .print(), \
211                         .enrich(), .forecast(), .trend_aggregate(), .watermark(), .tap()"
212                            .to_string(),
213                    ),
214                )
215            } else {
216                let expected: Vec<String> = positives.iter().map(format_rule_name).collect();
217                if expected.len() == 1 {
218                    (format!("Expected {}", expected[0]), None)
219                } else {
220                    (format!("Expected one of: {}", expected.join(", ")), None)
221                }
222            }
223        }
224        pest::error::ErrorVariant::CustomError { message } => (message.clone(), None),
225    };
226
227    ParseError::Located {
228        line,
229        column,
230        position,
231        message,
232        hint,
233    }
234}
235
236/// Convert pest Rule names to human-readable format
237fn format_rule_name(rule: &Rule) -> String {
238    match rule {
239        Rule::identifier => "identifier".to_string(),
240        Rule::integer => "number".to_string(),
241        Rule::float => "number".to_string(),
242        Rule::string => "string".to_string(),
243        Rule::primitive_type => "type (int, float, bool, str, timestamp, duration)".to_string(),
244        Rule::type_expr => "type".to_string(),
245        Rule::expr => "expression".to_string(),
246        Rule::statement => "statement".to_string(),
247        Rule::context_decl => "context declaration".to_string(),
248        Rule::stream_decl => "stream declaration".to_string(),
249        Rule::pattern_decl => "pattern declaration".to_string(),
250        Rule::event_decl => "event declaration".to_string(),
251        Rule::fn_decl => "function declaration".to_string(),
252        Rule::INDENT => "indented block".to_string(),
253        Rule::DEDENT => "end of block".to_string(),
254        Rule::field => "field declaration (name: type)".to_string(),
255        Rule::comparison_op => "comparison operator (==, !=, <, >, <=, >=)".to_string(),
256        Rule::additive_op => "operator (+, -)".to_string(),
257        Rule::multiplicative_op => "operator (*, /, %)".to_string(),
258        Rule::postfix_suffix => "method call or member access".to_string(),
259        Rule::sase_pattern_expr => "SASE pattern expression".to_string(),
260        Rule::sase_seq_expr => "SEQ expression".to_string(),
261        Rule::kleene_op => "Kleene operator (+, *, ?)".to_string(),
262        _ => format!("{rule:?}").to_lowercase().replace('_', " "),
263    }
264}
265
266/// Returns true when all positives look like stream operation rules.
267/// This is used to produce a concise "unknown stream operation" error
268/// instead of listing 30+ individual operation names.
269fn is_stream_op_error(positives: &[Rule]) -> bool {
270    const STREAM_OP_RULES: &[Rule] = &[
271        Rule::context_op,
272        Rule::where_op,
273        Rule::select_op,
274        Rule::window_op,
275        Rule::aggregate_op,
276        Rule::having_op,
277        Rule::partition_by_op,
278        Rule::order_by_op,
279        Rule::limit_op,
280        Rule::distinct_op,
281        Rule::map_op,
282        Rule::filter_op,
283        Rule::tap_op,
284        Rule::print_op,
285        Rule::log_op,
286        Rule::emit_op,
287        Rule::to_op,
288        Rule::pattern_op,
289        Rule::concurrent_op,
290        Rule::process_op,
291        Rule::on_error_op,
292        Rule::collect_op,
293        Rule::on_op,
294        Rule::within_op,
295        Rule::not_op,
296        Rule::fork_op,
297        Rule::any_op,
298        Rule::all_op,
299        Rule::first_op,
300        Rule::watermark_op,
301        Rule::allowed_lateness_op,
302        Rule::trend_aggregate_op,
303        Rule::score_op,
304        Rule::forecast_op,
305        Rule::enrich_op,
306    ];
307    positives.len() >= 10 && positives.iter().all(|r| STREAM_OP_RULES.contains(r))
308}
309
310fn parse_statement(pair: pest::iterators::Pair<Rule>) -> ParseResult<Spanned<Stmt>> {
311    let span = Span::new(pair.as_span().start(), pair.as_span().end());
312    let inner = pair.into_inner().expect_next("statement body")?;
313
314    let stmt = match inner.as_rule() {
315        Rule::context_decl => parse_context_decl(inner)?,
316        Rule::connector_decl => parse_connector_decl(inner)?,
317        Rule::stream_decl => parse_stream_decl(inner)?,
318        Rule::pattern_decl => parse_pattern_decl(inner)?,
319        Rule::event_decl => parse_event_decl(inner)?,
320        Rule::type_decl => parse_type_decl(inner)?,
321        Rule::var_decl => parse_var_decl(inner)?,
322        Rule::const_decl => parse_const_decl(inner)?,
323        Rule::fn_decl => parse_fn_decl(inner)?,
324        Rule::config_block => parse_config_block(inner)?,
325        Rule::import_stmt => parse_import_stmt(inner)?,
326        Rule::if_stmt => parse_if_stmt(inner)?,
327        Rule::for_stmt => parse_for_stmt(inner)?,
328        Rule::while_stmt => parse_while_stmt(inner)?,
329        Rule::return_stmt => parse_return_stmt(inner)?,
330        Rule::break_stmt => Stmt::Break,
331        Rule::continue_stmt => Stmt::Continue,
332        Rule::emit_stmt => parse_emit_stmt(inner)?,
333        Rule::assignment_stmt => {
334            let mut inner = inner.into_inner();
335            let name = inner.expect_next("variable name")?.as_str().to_string();
336            let value = parse_expr(inner.expect_next("assignment value")?)?;
337            Stmt::Assignment { name, value }
338        }
339        Rule::expr_stmt => Stmt::Expr(parse_expr(inner.into_inner().expect_next("expression")?)?),
340        _ => {
341            return Err(ParseError::UnexpectedToken {
342                position: span.start,
343                expected: "statement".to_string(),
344                found: format!("{:?}", inner.as_rule()),
345            })
346        }
347    };
348
349    Ok(Spanned::new(stmt, span))
350}
351
352// ============================================================================
353// Context Declaration Parsing
354// ============================================================================
355
356fn parse_context_decl(pair: pest::iterators::Pair<Rule>) -> ParseResult<Stmt> {
357    let mut inner = pair.into_inner();
358    let name = inner.expect_next("context name")?.as_str().to_string();
359    let mut cores = None;
360
361    for p in inner {
362        if p.as_rule() == Rule::context_params {
363            for param in p.into_inner() {
364                if param.as_rule() == Rule::context_param {
365                    // Parse cores: [0, 1, 2]
366                    let core_ids: Vec<usize> = param
367                        .into_inner()
368                        .filter(|p| p.as_rule() == Rule::integer)
369                        .map(|p| p.as_str().parse::<usize>().unwrap_or(0))
370                        .collect();
371                    cores = Some(core_ids);
372                }
373            }
374        }
375    }
376
377    Ok(Stmt::ContextDecl { name, cores })
378}
379
380// ============================================================================
381// Connector Parsing
382// ============================================================================
383
384fn parse_connector_decl(pair: pest::iterators::Pair<Rule>) -> ParseResult<Stmt> {
385    let mut inner = pair.into_inner();
386    let name = inner.expect_next("connector name")?.as_str().to_string();
387    let connector_type = inner.expect_next("connector type")?.as_str().to_string();
388    let mut params = Vec::new();
389
390    for p in inner {
391        if p.as_rule() == Rule::connector_params {
392            params = parse_connector_params(p)?;
393        }
394    }
395
396    Ok(Stmt::ConnectorDecl {
397        name,
398        connector_type,
399        params,
400    })
401}
402
403fn parse_connector_params(pair: pest::iterators::Pair<Rule>) -> ParseResult<Vec<ConnectorParam>> {
404    let mut params = Vec::new();
405    for p in pair.into_inner() {
406        if p.as_rule() == Rule::connector_param {
407            let mut inner = p.into_inner();
408            let name = inner.expect_next("param name")?.as_str().to_string();
409            let value_pair = inner.expect_next("param value")?;
410            let value = parse_config_value(value_pair)?;
411            params.push(ConnectorParam { name, value });
412        }
413    }
414    Ok(params)
415}
416
417fn parse_stream_decl(pair: pest::iterators::Pair<Rule>) -> ParseResult<Stmt> {
418    let mut inner = pair.into_inner();
419    let name = inner.expect_next("stream name")?.as_str().to_string();
420
421    let mut type_annotation = None;
422    let mut source = StreamSource::Ident(String::new());
423    let mut ops = Vec::new();
424    let mut op_spans = Vec::new();
425
426    for p in inner {
427        match p.as_rule() {
428            Rule::type_annotation => {
429                type_annotation = Some(parse_type(p.into_inner().expect_next("type")?)?);
430            }
431            Rule::stream_expr => {
432                let (s, o, spans) = parse_stream_expr(p)?;
433                source = s;
434                ops = o;
435                op_spans = spans;
436            }
437            _ => {}
438        }
439    }
440
441    Ok(Stmt::StreamDecl {
442        name,
443        type_annotation,
444        source,
445        ops,
446        op_spans,
447    })
448}
449
450fn parse_stream_expr(
451    pair: pest::iterators::Pair<Rule>,
452) -> ParseResult<(StreamSource, Vec<StreamOp>, Vec<varpulis_core::span::Span>)> {
453    let mut inner = pair.into_inner();
454    let source = parse_stream_source(inner.expect_next("stream source")?)?;
455    let mut ops = Vec::new();
456    let mut op_spans = Vec::new();
457
458    for p in inner {
459        if p.as_rule() == Rule::stream_op {
460            let pest_span = p.as_span();
461            let span = varpulis_core::span::Span::new(pest_span.start(), pest_span.end());
462            ops.push(parse_stream_op(p)?);
463            op_spans.push(span);
464        }
465    }
466
467    Ok((source, ops, op_spans))
468}
469
470// ============================================================================
471// SASE+ Pattern Declaration Parsing
472// ============================================================================
473
474fn parse_pattern_decl(pair: pest::iterators::Pair<Rule>) -> ParseResult<Stmt> {
475    let mut inner = pair.into_inner();
476    let name = inner.expect_next("pattern name")?.as_str().to_string();
477
478    let mut expr = SasePatternExpr::Event(String::new());
479    let mut within = None;
480    let mut partition_by = None;
481
482    for p in inner {
483        match p.as_rule() {
484            Rule::sase_pattern_expr => {
485                expr = parse_sase_pattern_expr(p)?;
486            }
487            Rule::pattern_within_clause => {
488                let dur_pair = p.into_inner().expect_next("within duration")?;
489                within = Some(Expr::Duration(
490                    parse_duration(dur_pair.as_str()).map_err(ParseError::InvalidDuration)?,
491                ));
492            }
493            Rule::pattern_partition_clause => {
494                let key = p
495                    .into_inner()
496                    .expect_next("partition key")?
497                    .as_str()
498                    .to_string();
499                partition_by = Some(Expr::Ident(key));
500            }
501            _ => {}
502        }
503    }
504
505    Ok(Stmt::PatternDecl {
506        name,
507        expr,
508        within,
509        partition_by,
510    })
511}
512
513fn parse_sase_pattern_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<SasePatternExpr> {
514    let inner = pair.into_inner().expect_next("SASE pattern expression")?;
515    parse_sase_or_expr(inner)
516}
517
518fn parse_sase_or_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<SasePatternExpr> {
519    let mut inner = pair.into_inner();
520    let mut left = parse_sase_and_expr(inner.expect_next("OR expression operand")?)?;
521
522    for right_pair in inner {
523        let right = parse_sase_and_expr(right_pair)?;
524        left = SasePatternExpr::Or(Box::new(left), Box::new(right));
525    }
526
527    Ok(left)
528}
529
530fn parse_sase_and_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<SasePatternExpr> {
531    let mut inner = pair.into_inner();
532    let mut left = parse_sase_not_expr(inner.expect_next("AND expression operand")?)?;
533
534    for right_pair in inner {
535        let right = parse_sase_not_expr(right_pair)?;
536        left = SasePatternExpr::And(Box::new(left), Box::new(right));
537    }
538
539    Ok(left)
540}
541
542fn parse_sase_not_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<SasePatternExpr> {
543    let mut inner = pair.into_inner();
544    let first = inner.expect_next("NOT or primary expression")?;
545
546    if first.as_str() == "NOT" {
547        let expr = parse_sase_primary_expr(inner.expect_next("expression after NOT")?)?;
548        Ok(SasePatternExpr::Not(Box::new(expr)))
549    } else {
550        parse_sase_primary_expr(first)
551    }
552}
553
554fn parse_sase_primary_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<SasePatternExpr> {
555    let inner = pair.into_inner().expect_next("SASE primary expression")?;
556
557    match inner.as_rule() {
558        Rule::sase_seq_expr => parse_sase_seq_expr(inner),
559        Rule::sase_grouped_expr => {
560            let nested = inner.into_inner().expect_next("grouped expression")?;
561            let expr = parse_sase_pattern_expr(nested)?;
562            Ok(SasePatternExpr::Group(Box::new(expr)))
563        }
564        Rule::sase_event_ref => parse_sase_event_ref(inner),
565        _ => Ok(SasePatternExpr::Event(inner.as_str().to_string())),
566    }
567}
568
569fn parse_sase_seq_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<SasePatternExpr> {
570    let mut items = Vec::new();
571
572    for p in pair.into_inner() {
573        if p.as_rule() == Rule::sase_seq_items {
574            for item in p.into_inner() {
575                if item.as_rule() == Rule::sase_seq_item {
576                    items.push(parse_sase_seq_item(item)?);
577                }
578            }
579        }
580    }
581
582    Ok(SasePatternExpr::Seq(items))
583}
584
585fn parse_sase_seq_item(pair: pest::iterators::Pair<Rule>) -> ParseResult<SasePatternItem> {
586    let inner = pair.into_inner().expect_next("sequence item")?;
587
588    match inner.as_rule() {
589        Rule::sase_negated_item => parse_sase_item_inner(inner, true),
590        Rule::sase_positive_item => parse_sase_item_inner(inner, false),
591        _ => parse_sase_item_inner(inner, false),
592    }
593}
594
595fn parse_sase_item_inner(
596    pair: pest::iterators::Pair<Rule>,
597    _negated: bool,
598) -> ParseResult<SasePatternItem> {
599    let mut inner = pair.into_inner();
600    let event_type = inner.expect_next("event type")?.as_str().to_string();
601
602    let mut kleene = None;
603    let mut filter = None;
604    let mut alias = None;
605
606    for p in inner {
607        match p.as_rule() {
608            Rule::kleene_op => {
609                kleene = Some(match p.as_str() {
610                    "+" => KleeneOp::Plus,
611                    "*" => KleeneOp::Star,
612                    "?" => KleeneOp::Optional,
613                    _ => KleeneOp::Plus,
614                });
615            }
616            Rule::sase_where_clause => {
617                filter = Some(parse_expr(
618                    p.into_inner().expect_next("filter expression")?,
619                )?);
620            }
621            Rule::sase_alias_clause => {
622                alias = Some(p.into_inner().expect_next("alias")?.as_str().to_string());
623            }
624            _ => {}
625        }
626    }
627
628    // For negated items, we prefix with "!" to indicate negation
629    // The runtime will interpret this
630    let event_type = if _negated {
631        format!("!{event_type}")
632    } else {
633        event_type
634    };
635
636    Ok(SasePatternItem {
637        event_type,
638        alias,
639        kleene,
640        filter,
641    })
642}
643
644fn parse_sase_event_ref(pair: pest::iterators::Pair<Rule>) -> ParseResult<SasePatternExpr> {
645    // Single event reference with optional kleene, where, alias
646    let item = parse_sase_item_inner(pair, false)?;
647
648    // If it's a simple event with no modifiers, return Event variant
649    if item.alias.is_none() && item.kleene.is_none() && item.filter.is_none() {
650        Ok(SasePatternExpr::Event(item.event_type))
651    } else {
652        // Otherwise wrap in a single-item Seq
653        Ok(SasePatternExpr::Seq(vec![item]))
654    }
655}
656
657fn parse_stream_source(pair: pest::iterators::Pair<Rule>) -> ParseResult<StreamSource> {
658    let inner = pair.into_inner().expect_next("stream source type")?;
659
660    match inner.as_rule() {
661        Rule::from_connector_source => {
662            let mut inner_iter = inner.into_inner();
663            let event_type = inner_iter.expect_next("event type")?.as_str().to_string();
664            let connector_name = inner_iter
665                .expect_next("connector name")?
666                .as_str()
667                .to_string();
668            let mut params = Vec::new();
669            for p in inner_iter {
670                if p.as_rule() == Rule::connector_params {
671                    params = parse_connector_params(p)?;
672                }
673            }
674            Ok(StreamSource::FromConnector {
675                event_type,
676                connector_name,
677                params,
678            })
679        }
680        Rule::merge_source => {
681            let mut streams = Vec::new();
682            for p in inner.into_inner() {
683                if p.as_rule() == Rule::inline_stream_list {
684                    for is in p.into_inner() {
685                        streams.push(parse_inline_stream(is)?);
686                    }
687                }
688            }
689            Ok(StreamSource::Merge(streams))
690        }
691        Rule::join_source
692        | Rule::left_join_source
693        | Rule::right_join_source
694        | Rule::full_join_source => {
695            let join_type = match inner.as_rule() {
696                Rule::left_join_source => varpulis_core::ast::JoinType::Left,
697                Rule::right_join_source => varpulis_core::ast::JoinType::Right,
698                Rule::full_join_source => varpulis_core::ast::JoinType::Full,
699                _ => varpulis_core::ast::JoinType::Inner,
700            };
701            let mut clauses = Vec::new();
702            for p in inner.into_inner() {
703                if p.as_rule() == Rule::join_clause_list {
704                    for jc in p.into_inner() {
705                        clauses.push(parse_join_clause(jc, join_type)?);
706                    }
707                }
708            }
709            Ok(StreamSource::Join(clauses))
710        }
711        Rule::sequence_source => {
712            let decl =
713                parse_sequence_decl(inner.into_inner().expect_next("sequence declaration")?)?;
714            Ok(StreamSource::Sequence(decl))
715        }
716        Rule::timer_source => {
717            let timer_args = inner.into_inner().expect_next("timer arguments")?;
718            let decl = parse_timer_decl(timer_args)?;
719            Ok(StreamSource::Timer(decl))
720        }
721        Rule::all_source => {
722            let mut inner_iter = inner.into_inner();
723            let name = inner_iter.expect_next("event name")?.as_str().to_string();
724            let alias = inner_iter.next().map(|p| p.as_str().to_string());
725            Ok(StreamSource::AllWithAlias { name, alias })
726        }
727        Rule::aliased_source => {
728            let mut inner_iter = inner.into_inner();
729            let name = inner_iter.expect_next("event name")?.as_str().to_string();
730            let alias = inner_iter.expect_next("alias")?.as_str().to_string();
731            Ok(StreamSource::IdentWithAlias { name, alias })
732        }
733        Rule::identifier => Ok(StreamSource::Ident(inner.as_str().to_string())),
734        _ => Err(ParseError::UnexpectedToken {
735            position: 0,
736            expected: "stream source".to_string(),
737            found: format!("{:?}", inner.as_rule()),
738        }),
739    }
740}
741
742fn parse_inline_stream(pair: pest::iterators::Pair<Rule>) -> ParseResult<InlineStreamDecl> {
743    let mut inner = pair.into_inner();
744
745    // Check if it's a simple identifier or full declaration
746    let first = inner.expect_next("stream identifier")?;
747    if first.as_rule() == Rule::identifier && inner.clone().next().is_none() {
748        let name = first.as_str().to_string();
749        return Ok(InlineStreamDecl {
750            name: name.clone(),
751            source: name,
752            filter: None,
753        });
754    }
755
756    let name = first.as_str().to_string();
757    let source = inner.expect_next("stream source")?.as_str().to_string();
758    let filter = inner.next().map(|p| parse_expr(p)).transpose()?;
759
760    Ok(InlineStreamDecl {
761        name,
762        source,
763        filter,
764    })
765}
766
767fn parse_join_clause(
768    pair: pest::iterators::Pair<Rule>,
769    join_type: varpulis_core::ast::JoinType,
770) -> ParseResult<JoinClause> {
771    let mut inner = pair.into_inner();
772
773    let first = inner.expect_next("join clause identifier")?;
774    if first.as_rule() == Rule::identifier && inner.clone().next().is_none() {
775        let name = first.as_str().to_string();
776        return Ok(JoinClause {
777            name: name.clone(),
778            source: name,
779            on: None,
780            join_type,
781        });
782    }
783
784    let name = first.as_str().to_string();
785    let source = inner.expect_next("join source")?.as_str().to_string();
786    let on = inner.next().map(|p| parse_expr(p)).transpose()?;
787
788    Ok(JoinClause {
789        name,
790        source,
791        on,
792        join_type,
793    })
794}
795
796fn parse_sequence_decl(pair: pest::iterators::Pair<Rule>) -> ParseResult<SequenceDecl> {
797    let mut steps = Vec::new();
798
799    for p in pair.into_inner() {
800        if p.as_rule() == Rule::sequence_step {
801            steps.push(parse_sequence_step(p)?);
802        }
803    }
804
805    Ok(SequenceDecl {
806        match_all: false,
807        timeout: None,
808        steps,
809    })
810}
811
812fn parse_sequence_step(pair: pest::iterators::Pair<Rule>) -> ParseResult<SequenceStepDecl> {
813    let mut inner = pair.into_inner();
814    let alias = inner.expect_next("step alias")?.as_str().to_string();
815    let event_type = inner.expect_next("event type")?.as_str().to_string();
816
817    let mut filter = None;
818    let mut timeout = None;
819
820    for p in inner {
821        match p.as_rule() {
822            Rule::or_expr => filter = Some(parse_expr(p)?),
823            Rule::within_suffix => {
824                let expr = p.into_inner().expect_next("within duration")?;
825                timeout = Some(Box::new(parse_expr(expr)?));
826            }
827            _ => {}
828        }
829    }
830
831    Ok(SequenceStepDecl {
832        alias,
833        event_type,
834        filter,
835        timeout,
836    })
837}
838
839fn parse_timer_decl(pair: pest::iterators::Pair<Rule>) -> ParseResult<TimerDecl> {
840    let mut inner = pair.into_inner();
841
842    // First argument is the interval (duration expression)
843    let interval = parse_expr(inner.expect_next("timer interval")?)?;
844
845    // Optional second argument is initial_delay (named argument)
846    let mut initial_delay = None;
847    for p in inner {
848        if p.as_rule() == Rule::named_arg {
849            let arg = parse_named_arg(p)?;
850            if arg.name == "initial_delay" {
851                initial_delay = Some(Box::new(arg.value));
852            }
853        }
854    }
855
856    Ok(TimerDecl {
857        interval,
858        initial_delay,
859    })
860}
861
862fn parse_stream_op(pair: pest::iterators::Pair<Rule>) -> ParseResult<StreamOp> {
863    let inner = pair.into_inner().expect_next("stream operation")?;
864
865    match inner.as_rule() {
866        Rule::dot_op => {
867            let op_inner = inner.into_inner().expect_next("dot operation")?;
868            parse_dot_op(op_inner)
869        }
870        Rule::followed_by_op => parse_followed_by_op(inner),
871        _ => Err(ParseError::UnexpectedToken {
872            position: 0,
873            expected: "stream operation".to_string(),
874            found: format!("{:?}", inner.as_rule()),
875        }),
876    }
877}
878
879fn parse_dot_op(pair: pest::iterators::Pair<Rule>) -> ParseResult<StreamOp> {
880    match pair.as_rule() {
881        Rule::context_op => {
882            let name = pair
883                .into_inner()
884                .expect_next("context name")?
885                .as_str()
886                .to_string();
887            Ok(StreamOp::Context(name))
888        }
889        Rule::where_op => {
890            let expr = parse_expr(pair.into_inner().expect_next("where expression")?)?;
891            Ok(StreamOp::Where(expr))
892        }
893        Rule::select_op => {
894            let mut items = Vec::new();
895            for p in pair.into_inner() {
896                if p.as_rule() == Rule::select_list {
897                    for si in p.into_inner() {
898                        items.push(parse_select_item(si)?);
899                    }
900                }
901            }
902            Ok(StreamOp::Select(items))
903        }
904        Rule::window_op => {
905            let args = parse_window_args(pair.into_inner().expect_next("window arguments")?)?;
906            Ok(StreamOp::Window(args))
907        }
908        Rule::aggregate_op => {
909            let mut items = Vec::new();
910            for p in pair.into_inner() {
911                if p.as_rule() == Rule::agg_list {
912                    for ai in p.into_inner() {
913                        items.push(parse_agg_item(ai)?);
914                    }
915                }
916            }
917            Ok(StreamOp::Aggregate(items))
918        }
919        Rule::having_op => {
920            let expr = parse_expr(pair.into_inner().expect_next("having expression")?)?;
921            Ok(StreamOp::Having(expr))
922        }
923        Rule::map_op => {
924            let expr = parse_expr(pair.into_inner().expect_next("map expression")?)?;
925            Ok(StreamOp::Map(expr))
926        }
927        Rule::filter_op => {
928            let expr = parse_expr(pair.into_inner().expect_next("filter expression")?)?;
929            Ok(StreamOp::Filter(expr))
930        }
931        Rule::within_op => {
932            let expr = parse_expr(pair.into_inner().expect_next("within duration")?)?;
933            Ok(StreamOp::Within(expr))
934        }
935        Rule::emit_op => {
936            let mut output_type = None;
937            let mut fields = Vec::new();
938            let mut target_context = None;
939            for p in pair.into_inner() {
940                match p.as_rule() {
941                    Rule::emit_type_cast => {
942                        output_type = Some(
943                            p.into_inner()
944                                .expect_next("type name")?
945                                .as_str()
946                                .to_string(),
947                        );
948                    }
949                    Rule::named_arg_list => {
950                        for arg in p.into_inner() {
951                            let parsed = parse_named_arg(arg)?;
952                            // Extract `context: ctx_name` as target_context
953                            if parsed.name == "context" {
954                                if let Expr::Ident(ctx_name) = &parsed.value {
955                                    target_context = Some(ctx_name.clone());
956                                    continue;
957                                }
958                            }
959                            fields.push(parsed);
960                        }
961                    }
962                    _ => {}
963                }
964            }
965            Ok(StreamOp::Emit {
966                output_type,
967                fields,
968                target_context,
969            })
970        }
971        Rule::print_op => {
972            let exprs = pair
973                .into_inner()
974                .filter(|p| p.as_rule() == Rule::expr_list)
975                .flat_map(|p| p.into_inner())
976                .map(parse_expr)
977                .collect::<ParseResult<Vec<_>>>()?;
978            Ok(StreamOp::Print(exprs))
979        }
980        Rule::collect_op => Ok(StreamOp::Collect),
981        Rule::pattern_op => {
982            let def_pair = pair.into_inner().expect_next("pattern definition")?;
983            let mut inner = def_pair.into_inner();
984            let name = inner.expect_next("pattern name")?.as_str().to_string();
985            let body_pair = inner.expect_next("pattern body")?;
986
987            // pattern_body can be lambda_expr or pattern_or_expr
988            let body_inner = body_pair.into_inner().expect_next("pattern expression")?;
989            let matcher = match body_inner.as_rule() {
990                Rule::lambda_expr => parse_lambda_expr(body_inner)?,
991                Rule::pattern_or_expr => parse_pattern_expr_as_expr(body_inner)?,
992                _ => parse_expr_inner(body_inner)?,
993            };
994            Ok(StreamOp::Pattern(PatternDef { name, matcher }))
995        }
996        Rule::partition_by_op => {
997            let expr = parse_expr(pair.into_inner().expect_next("partition expression")?)?;
998            Ok(StreamOp::PartitionBy(expr))
999        }
1000        Rule::order_by_op => {
1001            let mut items = Vec::new();
1002            for p in pair.into_inner() {
1003                if p.as_rule() == Rule::order_list {
1004                    for oi in p.into_inner() {
1005                        items.push(parse_order_item(oi)?);
1006                    }
1007                }
1008            }
1009            Ok(StreamOp::OrderBy(items))
1010        }
1011        Rule::limit_op => {
1012            let expr = parse_expr(pair.into_inner().expect_next("limit expression")?)?;
1013            Ok(StreamOp::Limit(expr))
1014        }
1015        Rule::distinct_op => {
1016            let expr = pair.into_inner().next().map(parse_expr).transpose()?;
1017            Ok(StreamOp::Distinct(expr))
1018        }
1019        Rule::tap_op => {
1020            let args = pair
1021                .into_inner()
1022                .filter(|p| p.as_rule() == Rule::named_arg_list)
1023                .flat_map(|p| p.into_inner())
1024                .map(parse_named_arg)
1025                .collect::<ParseResult<Vec<_>>>()?;
1026            Ok(StreamOp::Tap(args))
1027        }
1028        Rule::log_op => {
1029            let args = pair
1030                .into_inner()
1031                .filter(|p| p.as_rule() == Rule::named_arg_list)
1032                .flat_map(|p| p.into_inner())
1033                .map(parse_named_arg)
1034                .collect::<ParseResult<Vec<_>>>()?;
1035            Ok(StreamOp::Log(args))
1036        }
1037        Rule::to_op => {
1038            let mut inner = pair.into_inner();
1039            let connector_name = inner.expect_next("connector name")?.as_str().to_string();
1040            let mut params = Vec::new();
1041            for p in inner {
1042                if p.as_rule() == Rule::connector_params {
1043                    params = parse_connector_params(p)?;
1044                }
1045            }
1046            Ok(StreamOp::To {
1047                connector_name,
1048                params,
1049            })
1050        }
1051        Rule::process_op => {
1052            let expr = parse_expr(pair.into_inner().expect_next("process expression")?)?;
1053            Ok(StreamOp::Process(expr))
1054        }
1055        Rule::on_error_op => {
1056            let expr = parse_expr(pair.into_inner().expect_next("on_error handler")?)?;
1057            Ok(StreamOp::OnError(expr))
1058        }
1059        Rule::on_op => {
1060            let expr = parse_expr(pair.into_inner().expect_next("on handler")?)?;
1061            Ok(StreamOp::On(expr))
1062        }
1063        Rule::not_op => {
1064            let mut inner = pair.into_inner();
1065            let event_type = inner.expect_next("event type")?.as_str().to_string();
1066            let filter = inner.next().map(parse_expr).transpose()?;
1067            Ok(StreamOp::Not(FollowedByClause {
1068                event_type,
1069                filter,
1070                alias: None,
1071                match_all: false,
1072            }))
1073        }
1074        Rule::fork_op => {
1075            let mut paths = Vec::new();
1076            for p in pair.into_inner() {
1077                if p.as_rule() == Rule::fork_path_list {
1078                    for fp in p.into_inner() {
1079                        paths.push(parse_fork_path(fp)?);
1080                    }
1081                }
1082            }
1083            Ok(StreamOp::Fork(paths))
1084        }
1085        Rule::any_op => {
1086            let count = pair
1087                .into_inner()
1088                .next()
1089                .map(|p| p.as_str().parse().unwrap_or(1));
1090            Ok(StreamOp::Any(count))
1091        }
1092        Rule::all_op => Ok(StreamOp::All),
1093        Rule::first_op => Ok(StreamOp::First),
1094        Rule::concurrent_op => {
1095            let args = pair
1096                .into_inner()
1097                .filter(|p| p.as_rule() == Rule::named_arg_list)
1098                .flat_map(|p| p.into_inner())
1099                .map(parse_named_arg)
1100                .collect::<ParseResult<Vec<_>>>()?;
1101            Ok(StreamOp::Concurrent(args))
1102        }
1103        Rule::watermark_op => {
1104            let args = pair
1105                .into_inner()
1106                .filter(|p| p.as_rule() == Rule::named_arg_list)
1107                .flat_map(|p| p.into_inner())
1108                .map(parse_named_arg)
1109                .collect::<ParseResult<Vec<_>>>()?;
1110            Ok(StreamOp::Watermark(args))
1111        }
1112        Rule::allowed_lateness_op => {
1113            let expr = parse_expr(pair.into_inner().expect_next("allowed lateness duration")?)?;
1114            Ok(StreamOp::AllowedLateness(expr))
1115        }
1116        Rule::trend_aggregate_op => {
1117            let mut items = Vec::new();
1118            for p in pair.into_inner() {
1119                if p.as_rule() == Rule::trend_agg_list {
1120                    for item_pair in p.into_inner() {
1121                        if item_pair.as_rule() == Rule::trend_agg_item {
1122                            let mut inner = item_pair.into_inner();
1123                            let alias = inner.expect_next("trend agg alias")?.as_str().to_string();
1124                            let func_pair = inner.expect_next("trend agg function")?;
1125                            let mut func_inner = func_pair.into_inner();
1126                            let func_name = func_inner
1127                                .expect_next("function name")?
1128                                .as_str()
1129                                .to_string();
1130                            let arg = func_inner.next().map(parse_expr).transpose()?;
1131                            items.push(TrendAggItem {
1132                                alias,
1133                                func: func_name,
1134                                arg,
1135                            });
1136                        }
1137                    }
1138                }
1139            }
1140            Ok(StreamOp::TrendAggregate(items))
1141        }
1142        Rule::forecast_op => {
1143            let mut confidence = None;
1144            let mut horizon = None;
1145            let mut warmup = None;
1146            let mut max_depth = None;
1147            let mut hawkes = None;
1148            let mut conformal = None;
1149            let mut mode = None;
1150            for p in pair.into_inner() {
1151                if p.as_rule() == Rule::forecast_params {
1152                    for param_pair in p.into_inner() {
1153                        if param_pair.as_rule() == Rule::forecast_param {
1154                            let mut inner = param_pair.into_inner();
1155                            let name = inner.expect_next("forecast param name")?.as_str();
1156                            let value_pair = inner.expect_next("forecast param value")?;
1157                            let expr = parse_expr(value_pair)?;
1158                            match name {
1159                                "confidence" => confidence = Some(expr),
1160                                "horizon" => horizon = Some(expr),
1161                                "warmup" => warmup = Some(expr),
1162                                "max_depth" => max_depth = Some(expr),
1163                                "hawkes" => hawkes = Some(expr),
1164                                "conformal" => conformal = Some(expr),
1165                                "mode" => mode = Some(expr),
1166                                _ => {}
1167                            }
1168                        }
1169                    }
1170                }
1171            }
1172            Ok(StreamOp::Forecast(ForecastSpec {
1173                confidence,
1174                horizon,
1175                warmup,
1176                max_depth,
1177                hawkes,
1178                conformal,
1179                mode,
1180            }))
1181        }
1182        Rule::enrich_op => {
1183            let mut inner = pair.into_inner();
1184            let connector_name = inner.expect_next("connector name")?.as_str().to_string();
1185            let mut key_expr = None;
1186            let mut fields = Vec::new();
1187            let mut cache_ttl = None;
1188            let mut timeout = None;
1189            let mut fallback = None;
1190            for p in inner {
1191                if p.as_rule() == Rule::enrich_params {
1192                    for param_pair in p.into_inner() {
1193                        if param_pair.as_rule() == Rule::enrich_param {
1194                            let param_inner =
1195                                param_pair.into_inner().expect_next("enrich param")?;
1196                            match param_inner.as_rule() {
1197                                Rule::enrich_key_param => {
1198                                    let expr_pair =
1199                                        param_inner.into_inner().expect_next("key expression")?;
1200                                    key_expr = Some(parse_expr(expr_pair)?);
1201                                }
1202                                Rule::enrich_fields_param => {
1203                                    for field in param_inner.into_inner() {
1204                                        if field.as_rule() == Rule::identifier {
1205                                            fields.push(field.as_str().to_string());
1206                                        }
1207                                    }
1208                                }
1209                                Rule::enrich_cache_ttl_param => {
1210                                    let expr_pair = param_inner
1211                                        .into_inner()
1212                                        .expect_next("cache_ttl expression")?;
1213                                    cache_ttl = Some(parse_expr(expr_pair)?);
1214                                }
1215                                Rule::enrich_timeout_param => {
1216                                    let expr_pair = param_inner
1217                                        .into_inner()
1218                                        .expect_next("timeout expression")?;
1219                                    timeout = Some(parse_expr(expr_pair)?);
1220                                }
1221                                Rule::enrich_fallback_param => {
1222                                    let literal_pair =
1223                                        param_inner.into_inner().expect_next("fallback literal")?;
1224                                    fallback = Some(parse_expr(literal_pair)?);
1225                                }
1226                                _ => {}
1227                            }
1228                        }
1229                    }
1230                }
1231            }
1232            let key = key_expr.ok_or_else(|| ParseError::Located {
1233                line: 0,
1234                column: 0,
1235                position: 0,
1236                message: ".enrich() requires a key: parameter".to_string(),
1237                hint: Some("add key: <expression> to .enrich()".to_string()),
1238            })?;
1239            Ok(StreamOp::Enrich(EnrichSpec {
1240                connector_name,
1241                key_expr: Box::new(key),
1242                fields,
1243                cache_ttl,
1244                timeout,
1245                fallback,
1246            }))
1247        }
1248        Rule::score_op => {
1249            let mut model_path = String::new();
1250            let mut inputs = Vec::new();
1251            let mut outputs = Vec::new();
1252            let mut gpu = false;
1253            let mut batch_size: usize = 1;
1254            let mut device_id: i32 = 0;
1255            for p in pair.into_inner() {
1256                if p.as_rule() == Rule::score_params {
1257                    for param_pair in p.into_inner() {
1258                        if param_pair.as_rule() == Rule::score_param {
1259                            let mut inner = param_pair.into_inner();
1260                            let name = inner.expect_next("score param name")?.as_str();
1261                            let value_pair = inner.expect_next("score param value")?;
1262                            match name {
1263                                "model" => {
1264                                    let raw = value_pair.as_str();
1265                                    model_path = raw.trim_matches('"').to_string();
1266                                }
1267                                "inputs" => {
1268                                    if value_pair.as_rule() == Rule::score_field_list {
1269                                        for field in value_pair.into_inner() {
1270                                            if field.as_rule() == Rule::identifier {
1271                                                inputs.push(field.as_str().to_string());
1272                                            }
1273                                        }
1274                                    }
1275                                }
1276                                "outputs" => {
1277                                    if value_pair.as_rule() == Rule::score_field_list {
1278                                        for field in value_pair.into_inner() {
1279                                            if field.as_rule() == Rule::identifier {
1280                                                outputs.push(field.as_str().to_string());
1281                                            }
1282                                        }
1283                                    }
1284                                }
1285                                "gpu" => {
1286                                    gpu = value_pair.as_str() == "true";
1287                                }
1288                                "batch_size" => {
1289                                    batch_size = value_pair.as_str().parse().unwrap_or(1);
1290                                }
1291                                "device" | "device_id" => {
1292                                    device_id = value_pair.as_str().parse().unwrap_or(0);
1293                                }
1294                                _ => {}
1295                            }
1296                        }
1297                    }
1298                }
1299            }
1300            Ok(StreamOp::Score(ScoreSpec {
1301                model_path,
1302                inputs,
1303                outputs,
1304                gpu,
1305                batch_size,
1306                device_id,
1307            }))
1308        }
1309        _ => Err(ParseError::UnexpectedToken {
1310            position: 0,
1311            expected: "stream operation".to_string(),
1312            found: format!("{:?}", pair.as_rule()),
1313        }),
1314    }
1315}
1316
1317fn parse_order_item(pair: pest::iterators::Pair<Rule>) -> ParseResult<OrderItem> {
1318    let mut inner = pair.into_inner();
1319    let expr = parse_expr(inner.expect_next("order expression")?)?;
1320    let desc = inner.next().is_some_and(|p| p.as_str() == "desc");
1321    Ok(OrderItem {
1322        expr,
1323        descending: desc,
1324    })
1325}
1326
1327fn parse_fork_path(pair: pest::iterators::Pair<Rule>) -> ParseResult<ForkPath> {
1328    let mut inner = pair.into_inner();
1329    let name = inner.expect_next("fork path name")?.as_str().to_string();
1330    let mut ops = Vec::new();
1331    for p in inner {
1332        if p.as_rule() == Rule::stream_op {
1333            ops.push(parse_stream_op(p)?);
1334        }
1335    }
1336    Ok(ForkPath { name, ops })
1337}
1338
1339fn parse_followed_by_op(pair: pest::iterators::Pair<Rule>) -> ParseResult<StreamOp> {
1340    let mut inner = pair.into_inner();
1341    let mut match_all = false;
1342
1343    let first = inner.expect_next("event type or match_all")?;
1344    let event_type = if first.as_rule() == Rule::match_all_keyword {
1345        match_all = true;
1346        inner.expect_next("event type")?.as_str().to_string()
1347    } else {
1348        first.as_str().to_string()
1349    };
1350
1351    let mut filter = None;
1352    let mut alias = None;
1353
1354    for p in inner {
1355        match p.as_rule() {
1356            Rule::or_expr => filter = Some(parse_or_expr(p)?),
1357            Rule::filter_expr => filter = Some(parse_filter_expr(p)?),
1358            Rule::identifier => alias = Some(p.as_str().to_string()),
1359            _ => {}
1360        }
1361    }
1362
1363    Ok(StreamOp::FollowedBy(FollowedByClause {
1364        event_type,
1365        filter,
1366        alias,
1367        match_all,
1368    }))
1369}
1370
1371fn parse_select_item(pair: pest::iterators::Pair<Rule>) -> ParseResult<SelectItem> {
1372    let mut inner = pair.into_inner();
1373    let first = inner.expect_next("select field or alias")?;
1374
1375    if let Some(second) = inner.next() {
1376        Ok(SelectItem::Alias(
1377            first.as_str().to_string(),
1378            parse_expr(second)?,
1379        ))
1380    } else {
1381        Ok(SelectItem::Field(first.as_str().to_string()))
1382    }
1383}
1384
1385fn parse_window_args(pair: pest::iterators::Pair<Rule>) -> ParseResult<WindowArgs> {
1386    let raw = pair.as_str().trim();
1387    let is_session = raw.starts_with("session");
1388
1389    let mut inner = pair.into_inner();
1390
1391    if is_session {
1392        // session: <gap_expr>
1393        let gap_expr = parse_expr(inner.expect_next("session gap duration")?)?;
1394        return Ok(WindowArgs {
1395            duration: gap_expr.clone(),
1396            sliding: None,
1397            policy: None,
1398            session_gap: Some(gap_expr),
1399        });
1400    }
1401
1402    let duration = parse_expr(inner.expect_next("window duration")?)?;
1403
1404    let mut sliding = None;
1405    let mut policy = None;
1406
1407    for p in inner {
1408        if p.as_rule() == Rule::expr {
1409            // Need to determine if it's sliding or policy based on context
1410            if sliding.is_none() {
1411                sliding = Some(parse_expr(p)?);
1412            } else {
1413                policy = Some(parse_expr(p)?);
1414            }
1415        }
1416    }
1417
1418    Ok(WindowArgs {
1419        duration,
1420        sliding,
1421        policy,
1422        session_gap: None,
1423    })
1424}
1425
1426fn parse_agg_item(pair: pest::iterators::Pair<Rule>) -> ParseResult<AggItem> {
1427    let mut inner = pair.into_inner();
1428    let alias = inner.expect_next("aggregate alias")?.as_str().to_string();
1429    let expr = parse_expr(inner.expect_next("aggregate expression")?)?;
1430    Ok(AggItem { alias, expr })
1431}
1432
1433fn parse_named_arg(pair: pest::iterators::Pair<Rule>) -> ParseResult<NamedArg> {
1434    let mut inner = pair.into_inner();
1435    let name = inner.expect_next("argument name")?.as_str().to_string();
1436    let value = parse_expr(inner.expect_next("argument value")?)?;
1437    Ok(NamedArg { name, value })
1438}
1439
1440fn parse_event_decl(pair: pest::iterators::Pair<Rule>) -> ParseResult<Stmt> {
1441    let mut inner = pair.into_inner();
1442    let name = inner.expect_next("event name")?.as_str().to_string();
1443
1444    let mut extends = None;
1445    let mut fields = Vec::new();
1446
1447    for p in inner {
1448        match p.as_rule() {
1449            Rule::identifier => extends = Some(p.as_str().to_string()),
1450            Rule::field => fields.push(parse_field(p)?),
1451            _ => {}
1452        }
1453    }
1454
1455    Ok(Stmt::EventDecl {
1456        name,
1457        extends,
1458        fields,
1459    })
1460}
1461
1462fn parse_field(pair: pest::iterators::Pair<Rule>) -> ParseResult<Field> {
1463    let mut inner = pair.into_inner();
1464    let name = inner.expect_next("field name")?.as_str().to_string();
1465    let ty = parse_type(inner.expect_next("field type")?)?;
1466    let optional = inner.next().is_some();
1467    Ok(Field { name, ty, optional })
1468}
1469
1470fn parse_type_decl(pair: pest::iterators::Pair<Rule>) -> ParseResult<Stmt> {
1471    let mut inner = pair.into_inner();
1472    let name = inner.expect_next("type name")?.as_str().to_string();
1473    let ty = parse_type(inner.expect_next("type definition")?)?;
1474    Ok(Stmt::TypeDecl { name, ty })
1475}
1476
1477fn parse_type(pair: pest::iterators::Pair<Rule>) -> ParseResult<Type> {
1478    let cloned = pair.clone();
1479    let inner = pair.into_inner().next().unwrap_or(cloned);
1480
1481    match inner.as_rule() {
1482        Rule::primitive_type => match inner.as_str() {
1483            "int" => Ok(Type::Int),
1484            "float" => Ok(Type::Float),
1485            "bool" => Ok(Type::Bool),
1486            "str" => Ok(Type::Str),
1487            "timestamp" => Ok(Type::Timestamp),
1488            "duration" => Ok(Type::Duration),
1489            _ => Ok(Type::Named(inner.as_str().to_string())),
1490        },
1491        Rule::array_type => {
1492            let inner_type = parse_type(inner.into_inner().expect_next("array element type")?)?;
1493            Ok(Type::Array(Box::new(inner_type)))
1494        }
1495        Rule::map_type => {
1496            let mut inner_pairs = inner.into_inner();
1497            let key_type = parse_type(inner_pairs.expect_next("map key type")?)?;
1498            let val_type = parse_type(inner_pairs.expect_next("map value type")?)?;
1499            Ok(Type::Map(Box::new(key_type), Box::new(val_type)))
1500        }
1501        Rule::tuple_type => {
1502            let types: Vec<Type> = inner
1503                .into_inner()
1504                .map(parse_type)
1505                .collect::<ParseResult<Vec<_>>>()?;
1506            Ok(Type::Tuple(types))
1507        }
1508        Rule::stream_type => {
1509            let inner_type = parse_type(inner.into_inner().expect_next("stream element type")?)?;
1510            Ok(Type::Stream(Box::new(inner_type)))
1511        }
1512        Rule::optional_type => {
1513            let inner_type = parse_type(inner.into_inner().expect_next("optional inner type")?)?;
1514            Ok(Type::Optional(Box::new(inner_type)))
1515        }
1516        Rule::named_type | Rule::identifier => Ok(Type::Named(inner.as_str().to_string())),
1517        _ => Ok(Type::Named(inner.as_str().to_string())),
1518    }
1519}
1520
1521fn parse_var_decl(pair: pest::iterators::Pair<Rule>) -> ParseResult<Stmt> {
1522    let mut inner = pair.into_inner();
1523    let keyword = inner.expect_next("var_keyword")?.as_str();
1524    let mutable = keyword == "var";
1525    let name = inner.expect_next("variable name")?.as_str().to_string();
1526
1527    let mut ty = None;
1528    let mut value = Expr::Null;
1529
1530    for p in inner {
1531        match p.as_rule() {
1532            Rule::type_annotation => ty = Some(parse_type(p.into_inner().expect_next("type")?)?),
1533            _ => value = parse_expr(p)?,
1534        }
1535    }
1536
1537    Ok(Stmt::VarDecl {
1538        mutable,
1539        name,
1540        ty,
1541        value,
1542    })
1543}
1544
1545fn parse_const_decl(pair: pest::iterators::Pair<Rule>) -> ParseResult<Stmt> {
1546    let mut inner = pair.into_inner();
1547    let name = inner.expect_next("constant name")?.as_str().to_string();
1548
1549    let mut ty = None;
1550    let mut value = Expr::Null;
1551
1552    for p in inner {
1553        match p.as_rule() {
1554            Rule::type_annotation => ty = Some(parse_type(p.into_inner().expect_next("type")?)?),
1555            _ => value = parse_expr(p)?,
1556        }
1557    }
1558
1559    Ok(Stmt::ConstDecl { name, ty, value })
1560}
1561
1562fn parse_fn_decl(pair: pest::iterators::Pair<Rule>) -> ParseResult<Stmt> {
1563    let mut inner = pair.into_inner();
1564    let name = inner.expect_next("function name")?.as_str().to_string();
1565
1566    let mut params = Vec::new();
1567    let mut ret = None;
1568    let mut body = Vec::new();
1569
1570    for p in inner {
1571        match p.as_rule() {
1572            Rule::param_list => {
1573                for param in p.into_inner() {
1574                    params.push(parse_param(param)?);
1575                }
1576            }
1577            Rule::type_expr => ret = Some(parse_type(p)?),
1578            Rule::block => body = parse_block(p)?,
1579            Rule::statement => body.push(parse_statement(p)?),
1580            _ => {}
1581        }
1582    }
1583
1584    Ok(Stmt::FnDecl {
1585        name,
1586        params,
1587        ret,
1588        body,
1589    })
1590}
1591
1592fn parse_block(pair: pest::iterators::Pair<Rule>) -> ParseResult<Vec<Spanned<Stmt>>> {
1593    let mut statements = Vec::new();
1594    for p in pair.into_inner() {
1595        if p.as_rule() == Rule::statement {
1596            statements.push(parse_statement(p)?);
1597        }
1598    }
1599    Ok(statements)
1600}
1601
1602fn parse_param(pair: pest::iterators::Pair<Rule>) -> ParseResult<Param> {
1603    let mut inner = pair.into_inner();
1604    let name = inner.expect_next("parameter name")?.as_str().to_string();
1605    let ty = parse_type(inner.expect_next("parameter type")?)?;
1606    Ok(Param { name, ty })
1607}
1608
1609fn parse_config_block(pair: pest::iterators::Pair<Rule>) -> ParseResult<Stmt> {
1610    let mut inner = pair.into_inner();
1611    let first = inner.expect_next("config name or item")?;
1612
1613    // Check if first token is identifier (new syntax) or config_item (old syntax)
1614    let (name, items_start) = if first.as_rule() == Rule::identifier {
1615        (first.as_str().to_string(), None)
1616    } else {
1617        // Old syntax: config: with indentation - use "default" as name
1618        ("default".to_string(), Some(first))
1619    };
1620
1621    let mut items = Vec::new();
1622
1623    // If we have a config_item from old syntax, parse it first
1624    if let Some(first_item) = items_start {
1625        if first_item.as_rule() == Rule::config_item {
1626            items.push(parse_config_item(first_item)?);
1627        }
1628    }
1629
1630    for p in inner {
1631        if p.as_rule() == Rule::config_item {
1632            items.push(parse_config_item(p)?);
1633        }
1634    }
1635    Ok(Stmt::Config { name, items })
1636}
1637
1638fn parse_config_item(pair: pest::iterators::Pair<Rule>) -> ParseResult<ConfigItem> {
1639    let mut inner = pair.into_inner();
1640    let key = inner.expect_next("config key")?.as_str().to_string();
1641    let value = parse_config_value(inner.expect_next("config value")?)?;
1642    Ok(ConfigItem::Value(key, value))
1643}
1644
1645fn parse_config_value(pair: pest::iterators::Pair<Rule>) -> ParseResult<ConfigValue> {
1646    let cloned = pair.clone();
1647    let inner = pair.into_inner().next().unwrap_or(cloned);
1648
1649    match inner.as_rule() {
1650        Rule::config_array => {
1651            let values: Vec<ConfigValue> = inner
1652                .into_inner()
1653                .map(parse_config_value)
1654                .collect::<ParseResult<Vec<_>>>()?;
1655            Ok(ConfigValue::Array(values))
1656        }
1657        Rule::integer => Ok(ConfigValue::Int(inner.as_str().parse().unwrap_or(0))),
1658        Rule::float => Ok(ConfigValue::Float(inner.as_str().parse().unwrap_or(0.0))),
1659        Rule::string => {
1660            let s = inner.as_str();
1661            Ok(ConfigValue::Str(s[1..s.len() - 1].to_string()))
1662        }
1663        Rule::duration => Ok(ConfigValue::Duration(
1664            parse_duration(inner.as_str()).map_err(ParseError::InvalidDuration)?,
1665        )),
1666        Rule::boolean => Ok(ConfigValue::Bool(inner.as_str() == "true")),
1667        Rule::identifier => Ok(ConfigValue::Ident(inner.as_str().to_string())),
1668        _ => Ok(ConfigValue::Ident(inner.as_str().to_string())),
1669    }
1670}
1671
1672fn parse_import_stmt(pair: pest::iterators::Pair<Rule>) -> ParseResult<Stmt> {
1673    let mut inner = pair.into_inner();
1674    let path_pair = inner.expect_next("import path")?;
1675    let path = path_pair.as_str();
1676    let path = path[1..path.len() - 1].to_string();
1677    let alias = inner.next().map(|p| p.as_str().to_string());
1678    Ok(Stmt::Import { path, alias })
1679}
1680
1681fn parse_if_stmt(pair: pest::iterators::Pair<Rule>) -> ParseResult<Stmt> {
1682    let mut inner = pair.into_inner();
1683    let cond = parse_expr(inner.expect_next("if condition")?)?;
1684
1685    let mut then_branch = Vec::new();
1686    let mut elif_branches = Vec::new();
1687    let mut else_branch = None;
1688
1689    for p in inner {
1690        match p.as_rule() {
1691            Rule::block => then_branch = parse_block(p)?,
1692            Rule::statement => then_branch.push(parse_statement(p)?),
1693            Rule::elif_clause => {
1694                let mut elif_inner = p.into_inner();
1695                let elif_cond = parse_expr(elif_inner.expect_next("elif condition")?)?;
1696                let mut elif_body = Vec::new();
1697                for ep in elif_inner {
1698                    match ep.as_rule() {
1699                        Rule::block => elif_body = parse_block(ep)?,
1700                        Rule::statement => elif_body.push(parse_statement(ep)?),
1701                        _ => {}
1702                    }
1703                }
1704                elif_branches.push((elif_cond, elif_body));
1705            }
1706            Rule::else_clause => {
1707                let mut else_body = Vec::new();
1708                for ep in p.into_inner() {
1709                    match ep.as_rule() {
1710                        Rule::block => else_body = parse_block(ep)?,
1711                        Rule::statement => else_body.push(parse_statement(ep)?),
1712                        _ => {}
1713                    }
1714                }
1715                else_branch = Some(else_body);
1716            }
1717            _ => {}
1718        }
1719    }
1720
1721    Ok(Stmt::If {
1722        cond,
1723        then_branch,
1724        elif_branches,
1725        else_branch,
1726    })
1727}
1728
1729fn parse_for_stmt(pair: pest::iterators::Pair<Rule>) -> ParseResult<Stmt> {
1730    let mut inner = pair.into_inner();
1731    let var = inner.expect_next("loop variable")?.as_str().to_string();
1732    let iter = parse_expr(inner.expect_next("iterable expression")?)?;
1733    let mut body = Vec::new();
1734    for p in inner {
1735        match p.as_rule() {
1736            Rule::block => body = parse_block(p)?,
1737            Rule::statement => body.push(parse_statement(p)?),
1738            _ => {}
1739        }
1740    }
1741    Ok(Stmt::For { var, iter, body })
1742}
1743
1744fn parse_while_stmt(pair: pest::iterators::Pair<Rule>) -> ParseResult<Stmt> {
1745    let mut inner = pair.into_inner();
1746    let cond = parse_expr(inner.expect_next("while condition")?)?;
1747    let mut body = Vec::new();
1748    for p in inner {
1749        match p.as_rule() {
1750            Rule::block => body = parse_block(p)?,
1751            Rule::statement => body.push(parse_statement(p)?),
1752            _ => {}
1753        }
1754    }
1755    Ok(Stmt::While { cond, body })
1756}
1757
1758fn parse_return_stmt(pair: pest::iterators::Pair<Rule>) -> ParseResult<Stmt> {
1759    let expr = pair.into_inner().next().map(parse_expr).transpose()?;
1760    Ok(Stmt::Return(expr))
1761}
1762
1763fn parse_emit_stmt(pair: pest::iterators::Pair<Rule>) -> ParseResult<Stmt> {
1764    let mut inner = pair.into_inner();
1765    let event_type = inner.expect_next("event type name")?.as_str().to_string();
1766    let mut fields = Vec::new();
1767    for p in inner {
1768        if p.as_rule() == Rule::named_arg_list {
1769            for arg in p.into_inner() {
1770                fields.push(parse_named_arg(arg)?);
1771            }
1772        }
1773    }
1774    Ok(Stmt::Emit { event_type, fields })
1775}
1776
1777// ============================================================================
1778// Expression Parsing
1779// ============================================================================
1780
1781fn parse_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
1782    let inner = pair.into_inner().next();
1783
1784    match inner {
1785        Some(p) => parse_expr_inner(p),
1786        None => Ok(Expr::Null),
1787    }
1788}
1789
1790fn parse_expr_inner(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
1791    match pair.as_rule() {
1792        Rule::expr => parse_expr(pair),
1793        Rule::lambda_expr => parse_lambda_expr(pair),
1794        Rule::range_expr => parse_range_expr(pair),
1795        Rule::or_expr => parse_or_expr(pair),
1796        Rule::and_expr => parse_and_expr(pair),
1797        Rule::not_expr => parse_not_expr(pair),
1798        Rule::comparison_expr => parse_comparison_expr(pair),
1799        Rule::bitwise_or_expr => parse_bitwise_or_expr(pair),
1800        Rule::bitwise_xor_expr => parse_bitwise_xor_expr(pair),
1801        Rule::bitwise_and_expr => parse_bitwise_and_expr(pair),
1802        Rule::shift_expr => parse_shift_expr(pair),
1803        Rule::additive_expr => parse_additive_expr(pair),
1804        Rule::multiplicative_expr => parse_multiplicative_expr(pair),
1805        Rule::power_expr => parse_power_expr(pair),
1806        Rule::unary_expr => parse_unary_expr(pair),
1807        Rule::postfix_expr => parse_postfix_expr(pair),
1808        Rule::primary_expr => parse_primary_expr(pair),
1809        Rule::literal => parse_literal(pair),
1810        Rule::identifier => Ok(Expr::Ident(pair.as_str().to_string())),
1811        Rule::if_expr => parse_if_expr(pair),
1812        _ => Ok(Expr::Ident(pair.as_str().to_string())),
1813    }
1814}
1815
1816fn parse_lambda_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
1817    let mut inner = pair.into_inner();
1818    let mut params = Vec::new();
1819
1820    // Parse parameters
1821    let first = inner.expect_next("lambda parameters")?;
1822    match first.as_rule() {
1823        Rule::identifier_list => {
1824            for p in first.into_inner() {
1825                params.push(p.as_str().to_string());
1826            }
1827        }
1828        Rule::identifier => {
1829            params.push(first.as_str().to_string());
1830        }
1831        _ => {}
1832    }
1833
1834    // Parse body - could be expression or block
1835    let body_pair = inner.expect_next("lambda body")?;
1836    let body = match body_pair.as_rule() {
1837        Rule::lambda_block => parse_lambda_block(body_pair)?,
1838        _ => parse_expr_inner(body_pair)?,
1839    };
1840
1841    Ok(Expr::Lambda {
1842        params,
1843        body: Box::new(body),
1844    })
1845}
1846
1847fn parse_lambda_block(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
1848    let mut stmts = Vec::new();
1849    let mut final_expr = None;
1850
1851    for p in pair.into_inner() {
1852        match p.as_rule() {
1853            Rule::statement => {
1854                // Check if it's a var_decl to extract for Block
1855                let stmt = parse_statement(p)?;
1856                match &stmt.node {
1857                    Stmt::VarDecl {
1858                        mutable,
1859                        name,
1860                        ty,
1861                        value,
1862                    } => {
1863                        stmts.push((name.clone(), ty.clone(), value.clone(), *mutable));
1864                    }
1865                    Stmt::Expr(e) => {
1866                        // Last expression becomes result
1867                        final_expr = Some(e.clone());
1868                    }
1869                    _ => {
1870                        // Other statements - treat as expression if possible
1871                    }
1872                }
1873            }
1874            _ => {
1875                // Expression at end of block
1876                final_expr = Some(parse_expr_inner(p)?);
1877            }
1878        }
1879    }
1880
1881    // If we have variable declarations, wrap in a Block expression
1882    if stmts.is_empty() {
1883        Ok(final_expr.unwrap_or(Expr::Null))
1884    } else {
1885        Ok(Expr::Block {
1886            stmts,
1887            result: Box::new(final_expr.unwrap_or(Expr::Null)),
1888        })
1889    }
1890}
1891
1892fn parse_pattern_expr_as_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
1893    // Convert pattern_or_expr to an Expr representation
1894    // pattern_or_expr = pattern_and_expr ~ ("or" ~ pattern_and_expr)*
1895    let mut inner = pair.into_inner();
1896    let mut left = parse_pattern_and_as_expr(inner.expect_next("pattern expression")?)?;
1897
1898    for right_pair in inner {
1899        let right = parse_pattern_and_as_expr(right_pair)?;
1900        left = Expr::Binary {
1901            op: BinOp::Or,
1902            left: Box::new(left),
1903            right: Box::new(right),
1904        };
1905    }
1906    Ok(left)
1907}
1908
1909fn parse_pattern_and_as_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
1910    let mut inner = pair.into_inner();
1911    let mut left = parse_pattern_xor_as_expr(inner.expect_next("and expression")?)?;
1912
1913    for right_pair in inner {
1914        let right = parse_pattern_xor_as_expr(right_pair)?;
1915        left = Expr::Binary {
1916            op: BinOp::And,
1917            left: Box::new(left),
1918            right: Box::new(right),
1919        };
1920    }
1921    Ok(left)
1922}
1923
1924fn parse_pattern_xor_as_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
1925    let mut inner = pair.into_inner();
1926    let mut left = parse_pattern_unary_as_expr(inner.expect_next("xor expression")?)?;
1927
1928    for right_pair in inner {
1929        let right = parse_pattern_unary_as_expr(right_pair)?;
1930        left = Expr::Binary {
1931            op: BinOp::Xor,
1932            left: Box::new(left),
1933            right: Box::new(right),
1934        };
1935    }
1936    Ok(left)
1937}
1938
1939fn parse_pattern_unary_as_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
1940    let mut inner = pair.into_inner();
1941    let first = inner.expect_next("unary expression or operand")?;
1942
1943    if first.as_str() == "not" {
1944        let expr = parse_pattern_primary_as_expr(inner.expect_next("pattern expression")?)?;
1945        Ok(Expr::Unary {
1946            op: UnaryOp::Not,
1947            expr: Box::new(expr),
1948        })
1949    } else {
1950        parse_pattern_primary_as_expr(first)
1951    }
1952}
1953
1954fn parse_pattern_primary_as_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
1955    let inner = pair
1956        .into_inner()
1957        .expect_next("pattern primary expression")?;
1958
1959    match inner.as_rule() {
1960        Rule::pattern_or_expr => parse_pattern_expr_as_expr(inner),
1961        Rule::pattern_sequence => parse_pattern_sequence_as_expr(inner),
1962        _ => Ok(Expr::Ident(inner.as_str().to_string())),
1963    }
1964}
1965
1966fn parse_pattern_sequence_as_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
1967    // pattern_sequence = identifier ~ ("->" ~ identifier)*
1968    // Convert to a chain of FollowedBy binary operations
1969    let mut inner = pair.into_inner();
1970    let mut left = Expr::Ident(inner.expect_next("sequence start")?.as_str().to_string());
1971
1972    for right_pair in inner {
1973        let right = Expr::Ident(right_pair.as_str().to_string());
1974        left = Expr::Binary {
1975            op: BinOp::FollowedBy,
1976            left: Box::new(left),
1977            right: Box::new(right),
1978        };
1979    }
1980    Ok(left)
1981}
1982
1983fn parse_filter_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
1984    let inner = pair.into_inner().expect_next("filter expression")?;
1985    parse_filter_or_expr(inner)
1986}
1987
1988fn parse_filter_or_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
1989    let mut inner = pair.into_inner();
1990    let mut left = parse_filter_and_expr(inner.expect_next("or expression operand")?)?;
1991
1992    for right_pair in inner {
1993        let right = parse_filter_and_expr(right_pair)?;
1994        left = Expr::Binary {
1995            op: BinOp::Or,
1996            left: Box::new(left),
1997            right: Box::new(right),
1998        };
1999    }
2000    Ok(left)
2001}
2002
2003fn parse_filter_and_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2004    let mut inner = pair.into_inner();
2005    let mut left = parse_filter_not_expr(inner.expect_next("and expression operand")?)?;
2006
2007    for right_pair in inner {
2008        let right = parse_filter_not_expr(right_pair)?;
2009        left = Expr::Binary {
2010            op: BinOp::And,
2011            left: Box::new(left),
2012            right: Box::new(right),
2013        };
2014    }
2015    Ok(left)
2016}
2017
2018fn parse_filter_not_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2019    let mut inner = pair.into_inner();
2020    let first = inner.expect_next("not or expression")?;
2021
2022    if first.as_str() == "not" {
2023        let expr = parse_filter_comparison_expr(inner.expect_next("expression after not")?)?;
2024        Ok(Expr::Unary {
2025            op: UnaryOp::Not,
2026            expr: Box::new(expr),
2027        })
2028    } else {
2029        parse_filter_comparison_expr(first)
2030    }
2031}
2032
2033fn parse_filter_comparison_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2034    let mut inner = pair.into_inner();
2035    let left = parse_filter_additive_expr(inner.expect_next("comparison left operand")?)?;
2036
2037    if let Some(op_pair) = inner.next() {
2038        let op = match op_pair.as_str() {
2039            "==" => BinOp::Eq,
2040            "!=" => BinOp::NotEq,
2041            "<" => BinOp::Lt,
2042            "<=" => BinOp::Le,
2043            ">" => BinOp::Gt,
2044            ">=" => BinOp::Ge,
2045            "in" => BinOp::In,
2046            "is" => BinOp::Is,
2047            s if s.contains("not") && s.contains("in") => BinOp::NotIn,
2048            _ => BinOp::Eq,
2049        };
2050        let right = parse_filter_additive_expr(inner.expect_next("comparison right operand")?)?;
2051        Ok(Expr::Binary {
2052            op,
2053            left: Box::new(left),
2054            right: Box::new(right),
2055        })
2056    } else {
2057        Ok(left)
2058    }
2059}
2060
2061fn parse_filter_additive_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2062    let mut inner = pair.into_inner();
2063    let mut left =
2064        parse_filter_multiplicative_expr(inner.expect_next("additive expression operand")?)?;
2065
2066    while let Some(op_pair) = inner.next() {
2067        let op = if op_pair.as_str() == "-" {
2068            BinOp::Sub
2069        } else {
2070            BinOp::Add
2071        };
2072        if let Some(right_pair) = inner.next() {
2073            let right = parse_filter_multiplicative_expr(right_pair)?;
2074            left = Expr::Binary {
2075                op,
2076                left: Box::new(left),
2077                right: Box::new(right),
2078            };
2079        }
2080    }
2081    Ok(left)
2082}
2083
2084fn parse_filter_multiplicative_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2085    let mut inner = pair.into_inner();
2086    let mut left =
2087        parse_filter_unary_expr(inner.expect_next("multiplicative expression operand")?)?;
2088
2089    while let Some(op_pair) = inner.next() {
2090        let op = match op_pair.as_str() {
2091            "*" => BinOp::Mul,
2092            "/" => BinOp::Div,
2093            "%" => BinOp::Mod,
2094            _ => BinOp::Mul,
2095        };
2096        if let Some(right_pair) = inner.next() {
2097            let right = parse_filter_unary_expr(right_pair)?;
2098            left = Expr::Binary {
2099                op,
2100                left: Box::new(left),
2101                right: Box::new(right),
2102            };
2103        }
2104    }
2105    Ok(left)
2106}
2107
2108fn parse_filter_unary_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2109    let mut inner = pair.into_inner();
2110    let first = inner.expect_next("unary operator or expression")?;
2111
2112    // Check if first is a unary operator or the expression itself
2113    if first.as_rule() == Rule::filter_unary_op {
2114        let op_str = first.as_str();
2115        let expr =
2116            parse_filter_postfix_expr(inner.expect_next("expression after unary operator")?)?;
2117        let op = match op_str {
2118            "-" => UnaryOp::Neg,
2119            "~" => UnaryOp::BitNot,
2120            _ => unreachable!("Grammar only allows - or ~"),
2121        };
2122        Ok(Expr::Unary {
2123            op,
2124            expr: Box::new(expr),
2125        })
2126    } else {
2127        parse_filter_postfix_expr(first)
2128    }
2129}
2130
2131fn parse_filter_postfix_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2132    let mut inner = pair.into_inner();
2133    let mut expr = parse_filter_primary_expr(inner.expect_next("postfix expression base")?)?;
2134
2135    for suffix in inner {
2136        expr = parse_filter_postfix_suffix(expr, suffix)?;
2137    }
2138    Ok(expr)
2139}
2140
2141fn parse_filter_postfix_suffix(expr: Expr, pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2142    let mut inner = pair.into_inner();
2143
2144    if let Some(first) = inner.next() {
2145        match first.as_rule() {
2146            Rule::identifier => {
2147                // Member access: .identifier
2148                Ok(Expr::Member {
2149                    expr: Box::new(expr),
2150                    member: first.as_str().to_string(),
2151                })
2152            }
2153            Rule::optional_member_access => {
2154                let member = first
2155                    .into_inner()
2156                    .expect_next("member name")?
2157                    .as_str()
2158                    .to_string();
2159                Ok(Expr::OptionalMember {
2160                    expr: Box::new(expr),
2161                    member,
2162                })
2163            }
2164            Rule::index_access => {
2165                let index = parse_expr(first.into_inner().expect_next("index expression")?)?;
2166                Ok(Expr::Index {
2167                    expr: Box::new(expr),
2168                    index: Box::new(index),
2169                })
2170            }
2171            Rule::call_args => {
2172                let args = first
2173                    .into_inner()
2174                    .filter(|p| p.as_rule() == Rule::arg_list)
2175                    .flat_map(|p| p.into_inner())
2176                    .map(parse_arg)
2177                    .collect::<ParseResult<Vec<_>>>()?;
2178                Ok(Expr::Call {
2179                    func: Box::new(expr),
2180                    args,
2181                })
2182            }
2183            _ => Ok(expr),
2184        }
2185    } else {
2186        Ok(expr)
2187    }
2188}
2189
2190fn parse_filter_primary_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2191    let inner = pair.into_inner().expect_next("filter primary expression")?;
2192
2193    match inner.as_rule() {
2194        Rule::literal => parse_literal(inner),
2195        Rule::identifier => Ok(Expr::Ident(inner.as_str().to_string())),
2196        Rule::filter_expr => parse_filter_expr(inner),
2197        _ => Ok(Expr::Ident(inner.as_str().to_string())),
2198    }
2199}
2200
2201fn parse_range_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2202    let mut inner = pair.into_inner();
2203    let left = parse_expr_inner(inner.expect_next("range start")?)?;
2204
2205    if let Some(op_pair) = inner.next() {
2206        let inclusive = op_pair.as_str() == "..=";
2207        let right = parse_expr_inner(inner.expect_next("range end")?)?;
2208        Ok(Expr::Range {
2209            start: Box::new(left),
2210            end: Box::new(right),
2211            inclusive,
2212        })
2213    } else {
2214        Ok(left)
2215    }
2216}
2217
2218fn parse_or_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2219    let mut inner = pair.into_inner();
2220    let mut left = parse_expr_inner(inner.expect_next("or expression operand")?)?;
2221
2222    for right_pair in inner {
2223        let right = parse_expr_inner(right_pair)?;
2224        left = Expr::Binary {
2225            op: BinOp::Or,
2226            left: Box::new(left),
2227            right: Box::new(right),
2228        };
2229    }
2230
2231    Ok(left)
2232}
2233
2234fn parse_and_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2235    let mut inner = pair.into_inner();
2236    let mut left = parse_expr_inner(inner.expect_next("and expression operand")?)?;
2237
2238    for right_pair in inner {
2239        let right = parse_expr_inner(right_pair)?;
2240        left = Expr::Binary {
2241            op: BinOp::And,
2242            left: Box::new(left),
2243            right: Box::new(right),
2244        };
2245    }
2246
2247    Ok(left)
2248}
2249
2250fn parse_not_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2251    let mut inner = pair.into_inner();
2252    let first = inner.expect_next("not keyword or expression")?;
2253
2254    if first.as_str() == "not" {
2255        let expr = parse_expr_inner(inner.expect_next("expression after not")?)?;
2256        Ok(Expr::Unary {
2257            op: UnaryOp::Not,
2258            expr: Box::new(expr),
2259        })
2260    } else {
2261        parse_expr_inner(first)
2262    }
2263}
2264
2265fn parse_comparison_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2266    let mut inner = pair.into_inner();
2267    let left = parse_expr_inner(inner.expect_next("comparison left operand")?)?;
2268
2269    if let Some(op_pair) = inner.next() {
2270        let op_str = op_pair.as_str();
2271        let op = match op_str {
2272            "==" => BinOp::Eq,
2273            "!=" => BinOp::NotEq,
2274            "<" => BinOp::Lt,
2275            "<=" => BinOp::Le,
2276            ">" => BinOp::Gt,
2277            ">=" => BinOp::Ge,
2278            "in" => BinOp::In,
2279            "is" => BinOp::Is,
2280            s if s.contains("not") && s.contains("in") => BinOp::NotIn,
2281            _ => BinOp::Eq,
2282        };
2283        let right = parse_expr_inner(inner.expect_next("comparison right operand")?)?;
2284        Ok(Expr::Binary {
2285            op,
2286            left: Box::new(left),
2287            right: Box::new(right),
2288        })
2289    } else {
2290        Ok(left)
2291    }
2292}
2293
2294fn parse_bitwise_or_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2295    parse_binary_chain(pair, BinOp::BitOr)
2296}
2297
2298fn parse_bitwise_xor_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2299    parse_binary_chain(pair, BinOp::BitXor)
2300}
2301
2302fn parse_bitwise_and_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2303    parse_binary_chain(pair, BinOp::BitAnd)
2304}
2305
2306fn parse_shift_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2307    let mut inner = pair.into_inner();
2308    let mut left = parse_expr_inner(inner.expect_next("shift expression operand")?)?;
2309
2310    while let Some(op_or_expr) = inner.next() {
2311        let op = match op_or_expr.as_str() {
2312            "<<" => BinOp::Shl,
2313            ">>" => BinOp::Shr,
2314            _ => {
2315                let right = parse_expr_inner(op_or_expr)?;
2316                left = Expr::Binary {
2317                    op: BinOp::Shl,
2318                    left: Box::new(left),
2319                    right: Box::new(right),
2320                };
2321                continue;
2322            }
2323        };
2324        if let Some(right_pair) = inner.next() {
2325            let right = parse_expr_inner(right_pair)?;
2326            left = Expr::Binary {
2327                op,
2328                left: Box::new(left),
2329                right: Box::new(right),
2330            };
2331        }
2332    }
2333
2334    Ok(left)
2335}
2336
2337fn parse_additive_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2338    let mut inner = pair.into_inner();
2339    let mut left = parse_expr_inner(inner.expect_next("additive expression operand")?)?;
2340
2341    while let Some(op_pair) = inner.next() {
2342        let op_text = op_pair.as_str();
2343        let op = if op_text == "-" {
2344            BinOp::Sub
2345        } else {
2346            BinOp::Add
2347        };
2348
2349        if let Some(right_pair) = inner.next() {
2350            let right = parse_expr_inner(right_pair)?;
2351            left = Expr::Binary {
2352                op,
2353                left: Box::new(left),
2354                right: Box::new(right),
2355            };
2356        }
2357    }
2358
2359    Ok(left)
2360}
2361
2362fn parse_multiplicative_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2363    let mut inner = pair.into_inner();
2364    let mut left = parse_expr_inner(inner.expect_next("multiplicative expression operand")?)?;
2365
2366    while let Some(op_pair) = inner.next() {
2367        let op_text = op_pair.as_str();
2368        let op = match op_text {
2369            "*" => BinOp::Mul,
2370            "/" => BinOp::Div,
2371            "%" => BinOp::Mod,
2372            _ => BinOp::Mul,
2373        };
2374
2375        if let Some(right_pair) = inner.next() {
2376            let right = parse_expr_inner(right_pair)?;
2377            left = Expr::Binary {
2378                op,
2379                left: Box::new(left),
2380                right: Box::new(right),
2381            };
2382        }
2383    }
2384
2385    Ok(left)
2386}
2387
2388fn parse_power_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2389    let mut inner = pair.into_inner();
2390    let base = parse_expr_inner(inner.expect_next("power expression base")?)?;
2391
2392    if let Some(exp_pair) = inner.next() {
2393        let exp = parse_expr_inner(exp_pair)?;
2394        Ok(Expr::Binary {
2395            op: BinOp::Pow,
2396            left: Box::new(base),
2397            right: Box::new(exp),
2398        })
2399    } else {
2400        Ok(base)
2401    }
2402}
2403
2404fn parse_unary_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2405    let mut inner = pair.into_inner();
2406    let first = inner.expect_next("unary operator or expression")?;
2407
2408    // Check if first is a unary operator or the expression itself
2409    match first.as_rule() {
2410        Rule::unary_op => {
2411            let op_str = first.as_str();
2412            let expr = parse_expr_inner(inner.expect_next("expression after unary operator")?)?;
2413            let op = match op_str {
2414                "-" => UnaryOp::Neg,
2415                "~" => UnaryOp::BitNot,
2416                _ => unreachable!("Grammar only allows - or ~"),
2417            };
2418            Ok(Expr::Unary {
2419                op,
2420                expr: Box::new(expr),
2421            })
2422        }
2423        _ => parse_expr_inner(first),
2424    }
2425}
2426
2427fn parse_postfix_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2428    let mut inner = pair.into_inner();
2429    let mut expr = parse_expr_inner(inner.expect_next("postfix expression base")?)?;
2430
2431    for suffix in inner {
2432        expr = parse_postfix_suffix(expr, suffix)?;
2433    }
2434
2435    Ok(expr)
2436}
2437
2438fn parse_postfix_suffix(expr: Expr, pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2439    let inner = pair.into_inner().expect_next("postfix suffix")?;
2440
2441    match inner.as_rule() {
2442        Rule::member_access => {
2443            let member = inner
2444                .into_inner()
2445                .expect_next("member name")?
2446                .as_str()
2447                .to_string();
2448            Ok(Expr::Member {
2449                expr: Box::new(expr),
2450                member,
2451            })
2452        }
2453        Rule::optional_member_access => {
2454            let member = inner
2455                .into_inner()
2456                .expect_next("member name")?
2457                .as_str()
2458                .to_string();
2459            Ok(Expr::OptionalMember {
2460                expr: Box::new(expr),
2461                member,
2462            })
2463        }
2464        Rule::slice_access => {
2465            // Parse slice: [start:end], [:end], [start:], [:]
2466            let slice_range = inner.into_inner().expect_next("slice range")?;
2467            let slice_inner = slice_range.into_inner();
2468
2469            let mut start = None;
2470            let mut end = None;
2471
2472            for p in slice_inner {
2473                match p.as_rule() {
2474                    Rule::slice_start => {
2475                        start = Some(Box::new(parse_expr_inner(
2476                            p.into_inner().expect_next("slice start expression")?,
2477                        )?));
2478                    }
2479                    Rule::slice_end => {
2480                        end = Some(Box::new(parse_expr_inner(
2481                            p.into_inner().expect_next("slice end expression")?,
2482                        )?));
2483                    }
2484                    _ => {}
2485                }
2486            }
2487
2488            Ok(Expr::Slice {
2489                expr: Box::new(expr),
2490                start,
2491                end,
2492            })
2493        }
2494        Rule::index_access => {
2495            let index = parse_expr(inner.into_inner().expect_next("index expression")?)?;
2496            Ok(Expr::Index {
2497                expr: Box::new(expr),
2498                index: Box::new(index),
2499            })
2500        }
2501        Rule::call_args => {
2502            let mut args = Vec::new();
2503            for p in inner.into_inner() {
2504                if p.as_rule() == Rule::arg_list {
2505                    for arg in p.into_inner() {
2506                        args.push(parse_arg(arg)?);
2507                    }
2508                }
2509            }
2510            Ok(Expr::Call {
2511                func: Box::new(expr),
2512                args,
2513            })
2514        }
2515        _ => Ok(expr),
2516    }
2517}
2518
2519fn parse_arg(pair: pest::iterators::Pair<Rule>) -> ParseResult<Arg> {
2520    let mut inner = pair.into_inner();
2521    let first = inner.expect_next("argument")?;
2522
2523    if let Some(second) = inner.next() {
2524        Ok(Arg::Named(
2525            first.as_str().to_string(),
2526            parse_expr_inner(second)?,
2527        ))
2528    } else {
2529        Ok(Arg::Positional(parse_expr_inner(first)?))
2530    }
2531}
2532
2533fn parse_primary_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2534    let inner = pair.into_inner().expect_next("primary expression")?;
2535
2536    match inner.as_rule() {
2537        Rule::if_expr => parse_if_expr(inner),
2538        Rule::literal => parse_literal(inner),
2539        Rule::identifier => Ok(Expr::Ident(inner.as_str().to_string())),
2540        Rule::array_literal => parse_array_literal(inner),
2541        Rule::map_literal => parse_map_literal(inner),
2542        Rule::expr => parse_expr(inner),
2543        _ => Ok(Expr::Ident(inner.as_str().to_string())),
2544    }
2545}
2546
2547fn parse_if_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2548    let mut inner = pair.into_inner();
2549    let cond = parse_expr_inner(inner.expect_next("if condition")?)?;
2550    let then_branch = parse_expr_inner(inner.expect_next("then branch")?)?;
2551    let else_branch = parse_expr_inner(inner.expect_next("else branch")?)?;
2552
2553    Ok(Expr::If {
2554        cond: Box::new(cond),
2555        then_branch: Box::new(then_branch),
2556        else_branch: Box::new(else_branch),
2557    })
2558}
2559
2560fn parse_literal(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2561    let inner = pair.into_inner().expect_next("literal value")?;
2562
2563    match inner.as_rule() {
2564        Rule::integer => inner
2565            .as_str()
2566            .parse::<i64>()
2567            .map(Expr::Int)
2568            .map_err(|e| ParseError::InvalidNumber(format!("'{}': {}", inner.as_str(), e))),
2569        Rule::float => inner
2570            .as_str()
2571            .parse::<f64>()
2572            .map(Expr::Float)
2573            .map_err(|e| ParseError::InvalidNumber(format!("'{}': {}", inner.as_str(), e))),
2574        Rule::string => {
2575            let s = inner.as_str();
2576            Ok(Expr::Str(s[1..s.len() - 1].to_string()))
2577        }
2578        Rule::duration => Ok(Expr::Duration(
2579            parse_duration(inner.as_str()).map_err(ParseError::InvalidDuration)?,
2580        )),
2581        Rule::timestamp => Ok(Expr::Timestamp(parse_timestamp(inner.as_str()))),
2582        Rule::boolean => Ok(Expr::Bool(inner.as_str() == "true")),
2583        Rule::null => Ok(Expr::Null),
2584        _ => Ok(Expr::Null),
2585    }
2586}
2587
2588fn parse_array_literal(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2589    let mut items = Vec::new();
2590    for p in pair.into_inner() {
2591        if p.as_rule() == Rule::expr_list {
2592            for expr in p.into_inner() {
2593                items.push(parse_expr(expr)?);
2594            }
2595        }
2596    }
2597    Ok(Expr::Array(items))
2598}
2599
2600fn parse_map_literal(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2601    let mut entries = Vec::new();
2602    for p in pair.into_inner() {
2603        if p.as_rule() == Rule::map_entry_list {
2604            for entry in p.into_inner() {
2605                let mut inner = entry.into_inner();
2606                let key = inner.expect_next("map key")?.as_str().to_string();
2607                let key = if key.starts_with('"') {
2608                    key[1..key.len() - 1].to_string()
2609                } else {
2610                    key
2611                };
2612                let value = parse_expr(inner.expect_next("map value")?)?;
2613                entries.push((key, value));
2614            }
2615        }
2616    }
2617    Ok(Expr::Map(entries))
2618}
2619
2620fn parse_binary_chain(pair: pest::iterators::Pair<Rule>, op: BinOp) -> ParseResult<Expr> {
2621    let mut inner = pair.into_inner();
2622    let mut left = parse_expr_inner(inner.expect_next("binary chain operand")?)?;
2623
2624    for right_pair in inner {
2625        let right = parse_expr_inner(right_pair)?;
2626        left = Expr::Binary {
2627            op,
2628            left: Box::new(left),
2629            right: Box::new(right),
2630        };
2631    }
2632
2633    Ok(left)
2634}
2635
2636#[cfg(test)]
2637mod tests {
2638    use super::*;
2639
2640    #[test]
2641    fn test_parse_simple_stream() {
2642        let result = parse("stream output = input");
2643        assert!(result.is_ok(), "Failed: {:?}", result.err());
2644    }
2645
2646    #[test]
2647    fn test_parse_stream_with_filter() {
2648        let result = parse("stream output = input.where(value > 100)");
2649        assert!(result.is_ok(), "Failed: {:?}", result.err());
2650    }
2651
2652    #[test]
2653    fn test_parse_stream_with_map() {
2654        let result = parse("stream output = input.map(x * 2)");
2655        assert!(result.is_ok(), "Failed: {:?}", result.err());
2656    }
2657
2658    #[test]
2659    fn test_parse_event_declaration() {
2660        let result = parse("event SensorReading:\n  sensor_id: str\n  value: float");
2661        assert!(result.is_ok(), "Failed: {:?}", result.err());
2662    }
2663
2664    #[test]
2665    fn test_parse_variable() {
2666        let result = parse("let x = 42");
2667        assert!(result.is_ok(), "Failed: {:?}", result.err());
2668    }
2669
2670    #[test]
2671    fn test_parse_function() {
2672        let result = parse("fn add(a: int, b: int) -> int:\n  return a + b");
2673        assert!(result.is_ok(), "Failed: {:?}", result.err());
2674    }
2675
2676    #[test]
2677    fn test_parse_lambda() {
2678        let result = parse("let f = (x) => x * 2");
2679        assert!(result.is_ok(), "Failed: {:?}", result.err());
2680    }
2681
2682    #[test]
2683    fn test_parse_if_expression() {
2684        let result = parse("let x = if a > b then a else b");
2685        assert!(result.is_ok(), "Failed: {:?}", result.err());
2686    }
2687
2688    #[test]
2689    fn test_parse_followed_by() {
2690        let result = parse("stream alerts = orders.where(amount > 1000) -> Payment where payment.order_id == orders.id");
2691        assert!(result.is_ok(), "Failed: {:?}", result.err());
2692    }
2693
2694    #[test]
2695    fn test_parse_window() {
2696        let result = parse("stream windowed = input.window(5s)");
2697        assert!(result.is_ok(), "Failed: {:?}", result.err());
2698    }
2699
2700    #[test]
2701    fn test_parse_aggregate() {
2702        let result =
2703            parse("stream stats = input.window(1m).aggregate(count: count(), avg: avg(value))");
2704        assert!(result.is_ok(), "Failed: {:?}", result.err());
2705    }
2706
2707    #[test]
2708    fn test_parse_merge() {
2709        let result = parse("stream combined = merge(stream1, stream2)");
2710        assert!(result.is_ok(), "Failed: {:?}", result.err());
2711    }
2712
2713    #[test]
2714    fn test_parse_sequence() {
2715        let result = parse("stream seq = sequence(a: EventA, b: EventB where b.id == a.id)");
2716        assert!(result.is_ok(), "Failed: {:?}", result.err());
2717    }
2718
2719    #[test]
2720    fn test_parse_config() {
2721        let result = parse("config:\n  window_size: 5s\n  batch_size: 100");
2722        assert!(result.is_ok(), "Failed: {:?}", result.err());
2723    }
2724
2725    #[test]
2726    fn test_parse_complex_expression() {
2727        let result = parse("let x = (a + b) * c / d - e");
2728        assert!(result.is_ok(), "Failed: {:?}", result.err());
2729    }
2730
2731    #[test]
2732    fn test_parse_sliding_window() {
2733        let result = parse("stream output = input.window(5m, sliding: 1m)");
2734        assert!(result.is_ok(), "Failed: {:?}", result.err());
2735    }
2736
2737    #[test]
2738    fn test_parse_fork_construct() {
2739        let result =
2740            parse("stream forked = input.fork(branch1: .where(x > 0), branch2: .where(x < 0))");
2741        assert!(result.is_ok(), "Failed: {:?}", result.err());
2742    }
2743
2744    #[test]
2745    fn test_parse_aggregate_functions() {
2746        let result = parse(
2747            "stream stats = input.window(1h).aggregate(total: sum(value), average: avg(value))",
2748        );
2749        assert!(result.is_ok(), "Failed: {:?}", result.err());
2750    }
2751
2752    #[test]
2753    fn test_parse_complex_parentheses() {
2754        let result = parse("let x = ((a + b) * (c - d)) / e");
2755        assert!(result.is_ok(), "Failed: {:?}", result.err());
2756    }
2757
2758    #[test]
2759    fn test_parse_sequence_with_alias() {
2760        let result = parse(
2761            r#"
2762            stream TwoTicks = StockTick as first
2763                -> StockTick as second
2764                .emit(result: "two_ticks")
2765        "#,
2766        );
2767        assert!(result.is_ok(), "Failed: {:?}", result.err());
2768    }
2769
2770    #[test]
2771    fn test_parse_followed_by_with_alias() {
2772        let result = parse("stream alerts = Order as a -> Payment as b");
2773        assert!(result.is_ok(), "Failed: {:?}", result.err());
2774    }
2775
2776    #[test]
2777    fn test_parse_followed_by_with_filter_and_alias() {
2778        // This is the problematic case: 'as b' should be the alias, not part of the expression
2779        let result = parse(
2780            r#"
2781            stream Test = A as a
2782                -> B where value == a.base + 10 as b
2783                .emit(status: "matched")
2784        "#,
2785        );
2786        assert!(result.is_ok(), "Failed: {:?}", result.err());
2787    }
2788
2789    #[test]
2790    fn test_parse_pattern_with_lambda() {
2791        let result =
2792            parse("stream Test = Trade.window(1m).pattern(p: x => x.len() > 3).emit(alert: true)");
2793        assert!(result.is_ok(), "Failed: {:?}", result.err());
2794    }
2795
2796    #[test]
2797    fn test_parse_sase_pattern_decl_simple() {
2798        let result = parse("pattern SimpleAlert = SEQ(Login, Transaction)");
2799        assert!(result.is_ok(), "Failed: {:?}", result.err());
2800    }
2801
2802    #[test]
2803    fn test_parse_sase_pattern_decl_with_kleene() {
2804        let result = parse("pattern MultiTx = SEQ(Login, Transaction+ where amount > 1000)");
2805        assert!(result.is_ok(), "Failed: {:?}", result.err());
2806    }
2807
2808    #[test]
2809    fn test_parse_sase_pattern_decl_with_alias() {
2810        let result = parse("pattern AliasedPattern = SEQ(Login as login, Transaction as tx)");
2811        assert!(result.is_ok(), "Failed: {:?}", result.err());
2812    }
2813
2814    #[test]
2815    fn test_parse_sase_pattern_decl_with_within() {
2816        let result = parse("pattern TimedPattern = SEQ(A, B) within 10m");
2817        assert!(result.is_ok(), "Failed: {:?}", result.err());
2818    }
2819
2820    #[test]
2821    fn test_parse_sase_pattern_decl_with_partition() {
2822        let result = parse("pattern PartitionedPattern = SEQ(A, B) partition by user_id");
2823        assert!(result.is_ok(), "Failed: {:?}", result.err());
2824    }
2825
2826    #[test]
2827    fn test_parse_sase_pattern_decl_full() {
2828        // Note: In SASE+ syntax, 'where' comes before 'as' (filter then alias)
2829        let result = parse(
2830            "pattern SuspiciousActivity = SEQ(Transaction+ where amount > 1000 as txs) within 10m partition by user_id"
2831        );
2832        assert!(result.is_ok(), "Failed: {:?}", result.err());
2833    }
2834
2835    #[test]
2836    fn test_parse_sase_pattern_decl_or() {
2837        let result = parse("pattern AlertOrWarn = Login OR Logout");
2838        assert!(result.is_ok(), "Failed: {:?}", result.err());
2839    }
2840
2841    #[test]
2842    fn test_parse_sase_pattern_decl_and() {
2843        let result = parse("pattern BothEvents = Login AND Transaction");
2844        assert!(result.is_ok(), "Failed: {:?}", result.err());
2845    }
2846
2847    #[test]
2848    fn test_parse_sase_pattern_decl_not() {
2849        let result = parse("pattern NoLogout = SEQ(Login, NOT Logout, Transaction)");
2850        assert!(result.is_ok(), "Failed: {:?}", result.err());
2851    }
2852
2853    #[test]
2854    fn test_parse_having() {
2855        let result = parse(
2856            "stream filtered = input.window(1m).aggregate(count: count(), total: sum(value)).having(count > 10)",
2857        );
2858        assert!(result.is_ok(), "Failed: {:?}", result.err());
2859    }
2860
2861    #[test]
2862    fn test_parse_having_with_partition() {
2863        let result = parse(
2864            "stream grouped = input.partition_by(category).window(5m).aggregate(avg_price: avg(price)).having(avg_price > 100.0)",
2865        );
2866        assert!(result.is_ok(), "Failed: {:?}", result.err());
2867    }
2868
2869    #[test]
2870    fn test_parse_timer_source() {
2871        let result = parse("stream heartbeat = timer(5s).emit(type: \"heartbeat\")");
2872        assert!(result.is_ok(), "Failed: {:?}", result.err());
2873    }
2874
2875    #[test]
2876    fn test_parse_timer_source_with_initial_delay() {
2877        let result =
2878            parse("stream delayed_timer = timer(1m, initial_delay: 10s).emit(type: \"periodic\")");
2879        assert!(result.is_ok(), "Failed: {:?}", result.err());
2880    }
2881
2882    #[test]
2883    fn test_parse_var_decl() {
2884        let result = parse("var threshold: float = 10.0");
2885        assert!(result.is_ok(), "Failed: {:?}", result.err());
2886    }
2887
2888    #[test]
2889    fn test_parse_let_decl() {
2890        let result = parse("let max_count: int = 100");
2891        assert!(result.is_ok(), "Failed: {:?}", result.err());
2892    }
2893
2894    #[test]
2895    fn test_parse_assignment() {
2896        let result = parse("threshold := threshold + 10.0");
2897        assert!(result.is_ok(), "Failed: {:?}", result.err());
2898    }
2899
2900    #[test]
2901    fn test_parse_assignment_with_expression() {
2902        let result = parse("count := count * 2 + offset");
2903        assert!(result.is_ok(), "Failed: {:?}", result.err());
2904    }
2905
2906    #[test]
2907    fn test_parse_nested_stream_reference() {
2908        let result = parse("stream Base = Event\nstream Derived = Base.where(x > 0)");
2909        assert!(result.is_ok(), "Failed: {:?}", result.err());
2910    }
2911
2912    #[test]
2913    fn test_parse_multi_stage_pipeline() {
2914        let result = parse(
2915            "stream L1 = Raw\nstream L2 = L1.where(a > 1)\nstream L3 = L2.window(5).aggregate(cnt: count())",
2916        );
2917        assert!(result.is_ok(), "Failed: {:?}", result.err());
2918    }
2919
2920    #[test]
2921    fn test_parse_stream_with_operations_chain() {
2922        let result = parse(
2923            "stream Processed = Source.where(valid).window(10).aggregate(sum: sum(value)).having(sum > 100)",
2924        );
2925        assert!(result.is_ok(), "Failed: {:?}", result.err());
2926    }
2927
2928    // =========================================================================
2929    // Connectivity Architecture Tests
2930    // =========================================================================
2931
2932    #[test]
2933    fn test_parse_connector_mqtt() {
2934        let result = parse(
2935            r#"connector MqttBroker = mqtt (
2936                host: "localhost",
2937                port: 1883,
2938                client_id: "varpulis"
2939            )"#,
2940        );
2941        assert!(result.is_ok(), "Failed: {:?}", result.err());
2942    }
2943
2944    #[test]
2945    fn test_parse_connector_kafka() {
2946        let result = parse(
2947            r#"connector KafkaCluster = kafka (
2948                brokers: ["kafka1:9092", "kafka2:9092"],
2949                group_id: "my-group"
2950            )"#,
2951        );
2952        assert!(result.is_ok(), "Failed: {:?}", result.err());
2953    }
2954
2955    #[test]
2956    fn test_parse_connector_http() {
2957        let result = parse(
2958            r#"connector ApiEndpoint = http (
2959                base_url: "https://api.example.com"
2960            )"#,
2961        );
2962        assert!(result.is_ok(), "Failed: {:?}", result.err());
2963    }
2964
2965    #[test]
2966    fn test_parse_stream_with_from_connector() {
2967        let result = parse(
2968            r#"stream Temperatures = TemperatureReading.from(MqttSensors, topic: "sensors/temp/#")"#,
2969        );
2970        assert!(result.is_ok(), "Failed: {:?}", result.err());
2971    }
2972
2973    #[test]
2974    fn test_parse_stream_with_from_and_operations() {
2975        let result = parse(
2976            r#"stream HighTemp = TemperatureReading
2977                .from(MqttSensors, topic: "sensors/#")
2978                .where(value > 30)
2979                .emit(alert: "high_temp")"#,
2980        );
2981        assert!(result.is_ok(), "Failed: {:?}", result.err());
2982    }
2983
2984    #[test]
2985    fn test_parse_full_connectivity_pipeline() {
2986        let result = parse(
2987            r#"
2988            connector MqttSensors = mqtt (host: "localhost", port: 1883)
2989            connector KafkaAlerts = kafka (brokers: ["kafka:9092"])
2990
2991            event TemperatureReading:
2992                sensor_id: str
2993                value: float
2994                ts: timestamp
2995
2996            stream Temperatures = TemperatureReading.from(MqttSensors, topic: "sensors/#")
2997
2998            stream HighTempAlert = Temperatures
2999                .where(value > 30)
3000                .emit(alert_type: "HIGH_TEMP", temperature: value)
3001
3002            "#,
3003        );
3004        assert!(result.is_ok(), "Failed: {:?}", result.err());
3005    }
3006
3007    #[test]
3008    fn test_parse_emit_as_type() {
3009        let result = parse(
3010            r#"stream Alerts = Temperatures
3011                .where(value > 30)
3012                .emit as AlertEvent(severity: "high", temp: value)"#,
3013        );
3014        assert!(result.is_ok(), "Failed: {:?}", result.err());
3015    }
3016
3017    #[test]
3018    fn test_parse_stream_with_to_connector() {
3019        let result = parse(
3020            r#"stream Output = Input
3021                .where(x > 0)
3022                .emit(y: x * 2)
3023                .to(KafkaOutput, topic: "output")"#,
3024        );
3025        assert!(result.is_ok(), "Failed: {:?}", result.err());
3026    }
3027
3028    #[test]
3029    fn test_emit_stmt_parses() {
3030        let result = parse(
3031            r"fn test():
3032    emit Pixel(x: 1, y: 2)",
3033        );
3034        assert!(result.is_ok(), "Failed: {:?}", result.err());
3035        let program = result.unwrap();
3036        // Find the fn_decl
3037        if let Stmt::FnDecl { body, .. } = &program.statements[0].node {
3038            match &body[0].node {
3039                Stmt::Emit { event_type, fields } => {
3040                    assert_eq!(event_type, "Pixel");
3041                    assert_eq!(fields.len(), 2);
3042                    assert_eq!(fields[0].name, "x");
3043                    assert_eq!(fields[1].name, "y");
3044                }
3045                other => panic!("Expected Stmt::Emit, got {other:?}"),
3046            }
3047        } else {
3048            panic!("Expected FnDecl");
3049        }
3050    }
3051
3052    #[test]
3053    fn test_emit_stmt_no_args() {
3054        let result = parse(
3055            r"fn test():
3056    emit Done()",
3057        );
3058        assert!(result.is_ok(), "Failed: {:?}", result.err());
3059        let program = result.unwrap();
3060        if let Stmt::FnDecl { body, .. } = &program.statements[0].node {
3061            match &body[0].node {
3062                Stmt::Emit { event_type, fields } => {
3063                    assert_eq!(event_type, "Done");
3064                    assert!(fields.is_empty());
3065                }
3066                other => panic!("Expected Stmt::Emit, got {other:?}"),
3067            }
3068        } else {
3069            panic!("Expected FnDecl");
3070        }
3071    }
3072
3073    #[test]
3074    fn test_emit_in_function_with_for_loop() {
3075        let result = parse(
3076            r"fn generate(n: int):
3077    for i in 0..n:
3078        emit Item(index: i, value: i * 2)",
3079        );
3080        assert!(result.is_ok(), "Failed: {:?}", result.err());
3081    }
3082
3083    #[test]
3084    fn test_parse_process_op() {
3085        let result = parse(
3086            r"fn do_work():
3087    emit Result(v: 42)
3088
3089stream S = timer(1s).process(do_work())",
3090        );
3091        assert!(result.is_ok(), "Failed: {:?}", result.err());
3092    }
3093
3094    #[test]
3095    fn test_parse_trend_aggregate_count_trends() {
3096        let result = parse(
3097            r#"stream S = StockTick as first
3098    -> all StockTick where price > first.price as rising
3099    -> StockTick where price < rising.price as drop
3100    .within(60s)
3101    .trend_aggregate(count: count_trends())
3102    .emit(event_type: "TrendStats", trends: count)"#,
3103        );
3104        assert!(result.is_ok(), "Failed: {:?}", result.err());
3105        let program = result.unwrap();
3106        // Find the stream declaration and check for TrendAggregate op
3107        for stmt in &program.statements {
3108            if let Stmt::StreamDecl { ops, .. } = &stmt.node {
3109                let has_trend_agg = ops
3110                    .iter()
3111                    .any(|op| matches!(op, StreamOp::TrendAggregate(_)));
3112                assert!(has_trend_agg, "Expected TrendAggregate op in stream ops");
3113                // Check that TrendAggregate has the right item
3114                for op in ops {
3115                    if let StreamOp::TrendAggregate(items) = op {
3116                        assert_eq!(items.len(), 1);
3117                        assert_eq!(items[0].alias, "count");
3118                        assert_eq!(items[0].func, "count_trends");
3119                        assert!(items[0].arg.is_none());
3120                    }
3121                }
3122                return;
3123            }
3124        }
3125        panic!("No stream declaration found");
3126    }
3127
3128    #[test]
3129    fn test_parse_trend_aggregate_multiple_items() {
3130        let result = parse(
3131            r"stream S = StockTick as first
3132    -> all StockTick as rising
3133    .within(60s)
3134    .trend_aggregate(
3135        trend_count: count_trends(),
3136        event_count: count_events(rising)
3137    )
3138    .emit(trends: trend_count, events: event_count)",
3139        );
3140        assert!(result.is_ok(), "Failed: {:?}", result.err());
3141        let program = result.unwrap();
3142        for stmt in &program.statements {
3143            if let Stmt::StreamDecl { ops, .. } = &stmt.node {
3144                for op in ops {
3145                    if let StreamOp::TrendAggregate(items) = op {
3146                        assert_eq!(items.len(), 2);
3147                        assert_eq!(items[0].alias, "trend_count");
3148                        assert_eq!(items[0].func, "count_trends");
3149                        assert_eq!(items[1].alias, "event_count");
3150                        assert_eq!(items[1].func, "count_events");
3151                        assert!(items[1].arg.is_some());
3152                        return;
3153                    }
3154                }
3155            }
3156        }
3157        panic!("No TrendAggregate found");
3158    }
3159
3160    #[test]
3161    fn test_parse_score_basic() {
3162        let result = parse(
3163            r#"stream S = TradeEvent
3164    .score(model: "models/fraud.onnx", inputs: [amount, risk_score], outputs: [fraud_prob, category])"#,
3165        );
3166        assert!(result.is_ok(), "Failed: {:?}", result.err());
3167        let program = result.unwrap();
3168        for stmt in &program.statements {
3169            if let Stmt::StreamDecl { ops, .. } = &stmt.node {
3170                for op in ops {
3171                    if let StreamOp::Score(spec) = op {
3172                        assert_eq!(spec.model_path, "models/fraud.onnx");
3173                        assert_eq!(spec.inputs, vec!["amount", "risk_score"]);
3174                        assert_eq!(spec.outputs, vec!["fraud_prob", "category"]);
3175                        return;
3176                    }
3177                }
3178            }
3179        }
3180        panic!("No Score op found");
3181    }
3182
3183    #[test]
3184    fn test_parse_score_single_field() {
3185        let result = parse(
3186            r#"stream S = Event
3187    .score(model: "model.onnx", inputs: [value], outputs: [prediction])"#,
3188        );
3189        assert!(result.is_ok(), "Failed: {:?}", result.err());
3190        let program = result.unwrap();
3191        for stmt in &program.statements {
3192            if let Stmt::StreamDecl { ops, .. } = &stmt.node {
3193                for op in ops {
3194                    if let StreamOp::Score(spec) = op {
3195                        assert_eq!(spec.model_path, "model.onnx");
3196                        assert_eq!(spec.inputs, vec!["value"]);
3197                        assert_eq!(spec.outputs, vec!["prediction"]);
3198                        return;
3199                    }
3200                }
3201            }
3202        }
3203        panic!("No Score op found");
3204    }
3205
3206    // Regression tests for fuzz-discovered parser hangs (2026-02-21).
3207    // Unmatched `[` brackets cause exponential backtracking in pest's PEG
3208    // recursive descent through array_literal / index_access / slice_access.
3209
3210    #[test]
3211    fn fuzz_regression_unmatched_brackets_timeout() {
3212        // Simplified version of the fuzzer-discovered timeout input (28 unmatched '[')
3213        let input = "c2222222s[s[22s[U2s[U6[U6[22222222s[s[22s[U2s[U6[U6[222*2222s[U6[U6[222*2222s[22s[U6[U6[22*2222s[U6[U6[222*2222s[22s[U6[U6[222*26[U6[222*2";
3214        let start = std::time::Instant::now();
3215        let result = parse(input);
3216        let elapsed = start.elapsed();
3217        assert!(
3218            result.is_err(),
3219            "Should reject deeply nested unmatched brackets"
3220        );
3221        assert!(
3222            elapsed.as_millis() < 100,
3223            "Parser should reject fast, took {elapsed:?}"
3224        );
3225    }
3226
3227    #[test]
3228    fn fuzz_regression_deeply_nested_brackets_slow_unit() {
3229        // 30 unmatched '[' — must be rejected by nesting depth check
3230        let input = "stream x[[[[[[[[[[[[[[[[[[[[[[[[[[[[[[[";
3231        let start = std::time::Instant::now();
3232        let result = parse(input);
3233        let elapsed = start.elapsed();
3234        assert!(result.is_err(), "Should reject deeply nested brackets");
3235        assert!(
3236            elapsed.as_millis() < 100,
3237            "Parser should reject fast, took {elapsed:?}"
3238        );
3239    }
3240
3241    #[test]
3242    fn nesting_depth_allows_reasonable_programs() {
3243        // 10 levels of nesting — well within the limit
3244        let input = "let x = foo(bar(baz(qux(a, [1, [2, [3, [4]]]]))))";
3245        let result = parse(input);
3246        // May fail for other reasons (not a valid program) but should NOT
3247        // fail with "Nesting depth exceeds maximum"
3248        if let Err(ref e) = result {
3249            let msg = format!("{e}");
3250            assert!(
3251                !msg.contains("Nesting depth"),
3252                "Should allow 10 levels of nesting: {msg}"
3253            );
3254        }
3255    }
3256
3257    #[test]
3258    fn nesting_depth_ignores_brackets_in_comments() {
3259        // Brackets inside # comments should not count
3260        let input = "# [[[[[[[[[[[[[[[[[[[[[[[[[[\nstream x = y";
3261        let result = parse(input);
3262        assert!(
3263            result.is_ok(),
3264            "Brackets in comments should be ignored: {:?}",
3265            result.err()
3266        );
3267    }
3268
3269    #[test]
3270    fn nesting_depth_ignores_brackets_in_strings() {
3271        // Brackets inside strings should not count
3272        let input = r#"let x = "[[[[[[[[[[[[[[[[[[[[[[[[[[""#;
3273        let result = parse(input);
3274        if let Err(ref e) = result {
3275            let msg = format!("{e}");
3276            assert!(
3277                !msg.contains("Nesting depth"),
3278                "Brackets in strings should be ignored: {msg}"
3279            );
3280        }
3281    }
3282}