Skip to main content

varpulis_parser/
pest_parser.rs

1//! Pest-based parser for VPL
2//!
3//! This module provides parsing using the pest PEG parser generator.
4
5use pest::Parser;
6use pest_derive::Parser;
7
8use crate::error::{ParseError, ParseResult};
9use crate::helpers::{parse_duration, parse_timestamp};
10use crate::indent::preprocess_indentation;
11use varpulis_core::ast::*;
12use varpulis_core::span::{Span, Spanned};
13use varpulis_core::types::Type;
14
15/// Extension trait for safer iterator extraction
16trait IteratorExt<'a> {
17    /// Get the next element or return an error with the expected rule description
18    fn expect_next(&mut self, expected: &str) -> ParseResult<pest::iterators::Pair<'a, Rule>>;
19}
20
21impl<'a> IteratorExt<'a> for pest::iterators::Pairs<'a, Rule> {
22    fn expect_next(&mut self, expected: &str) -> ParseResult<pest::iterators::Pair<'a, Rule>> {
23        self.next().ok_or_else(|| ParseError::Located {
24            line: 0,
25            column: 0,
26            position: 0,
27            message: format!("Expected {}", expected),
28            hint: None,
29        })
30    }
31}
32
33#[derive(Parser)]
34#[grammar = "varpulis.pest"]
35pub struct VarpulisParser;
36
37/// Parse a VPL source string into a Program AST.
38///
39/// Runs pest parsing in a thread with a 16 MB stack to prevent stack overflow
40/// from deeply nested or adversarial inputs that trigger deep recursion in the
41/// PEG recursive descent parser.
42pub fn parse(source: &str) -> ParseResult<Program> {
43    let source = source.to_string();
44    std::thread::Builder::new()
45        .stack_size(16 * 1024 * 1024)
46        .spawn(move || parse_inner(&source))
47        .map_err(|e| ParseError::InvalidToken {
48            position: 0,
49            message: format!("Failed to spawn parser thread: {}", e),
50        })?
51        .join()
52        .unwrap_or_else(|_| {
53            Err(ParseError::InvalidToken {
54                position: 0,
55                message: "Parser stack overflow on deeply nested input".to_string(),
56            })
57        })
58}
59
60/// Maximum bracket nesting depth allowed before pest parsing.
61///
62/// PEG recursive descent can cause exponential backtracking when unmatched
63/// brackets create ambiguity between array_literal, index_access, and
64/// slice_access rules.  Measured scaling is O(2.35^depth): depth 20 takes
65/// 1200s+, depth 16 takes 39s, depth 10 takes under 0.3s.  10 levels is
66/// generous for real VPL programs (typical nesting is 3-6 levels, extreme
67/// real-world is about 8) while keeping worst-case parse time under 1 second.
68const MAX_NESTING_DEPTH: usize = 10;
69
70/// O(n) pre-scan that rejects inputs with bracket nesting deeper than
71/// `MAX_NESTING_DEPTH`. Respects string literals and comments so that
72/// brackets inside `"..."`, `# ...`, or `/* ... */` are ignored.
73fn check_nesting_depth(source: &str) -> ParseResult<()> {
74    let mut depth: usize = 0;
75    let mut max_depth: usize = 0;
76    let mut max_depth_pos: usize = 0;
77    let bytes = source.as_bytes();
78    let len = bytes.len();
79    let mut i = 0;
80
81    while i < len {
82        let b = bytes[i];
83
84        // Skip double-quoted strings
85        if b == b'"' {
86            i += 1;
87            while i < len {
88                if bytes[i] == b'\\' {
89                    i += 2; // skip escaped char
90                    continue;
91                }
92                if bytes[i] == b'"' {
93                    i += 1;
94                    break;
95                }
96                i += 1;
97            }
98            continue;
99        }
100
101        // Skip VPL line comments (# to end of line)
102        if b == b'#' {
103            i += 1;
104            while i < len && bytes[i] != b'\n' {
105                i += 1;
106            }
107            continue;
108        }
109
110        // Skip block comments
111        if b == b'/' && i + 1 < len && bytes[i + 1] == b'*' {
112            i += 2;
113            while i + 1 < len {
114                if bytes[i] == b'*' && bytes[i + 1] == b'/' {
115                    i += 2;
116                    break;
117                }
118                i += 1;
119            }
120            continue;
121        }
122
123        // Track bracket depth
124        if b == b'(' || b == b'[' || b == b'{' {
125            depth += 1;
126            if depth > max_depth {
127                max_depth = depth;
128                max_depth_pos = i;
129            }
130        } else if b == b')' || b == b']' || b == b'}' {
131            depth = depth.saturating_sub(1);
132        }
133
134        if max_depth > MAX_NESTING_DEPTH {
135            return Err(ParseError::InvalidToken {
136                position: max_depth_pos,
137                message: format!(
138                    "Nesting depth exceeds maximum of {} levels",
139                    MAX_NESTING_DEPTH
140                ),
141            });
142        }
143
144        i += 1;
145    }
146
147    Ok(())
148}
149
150fn parse_inner(source: &str) -> ParseResult<Program> {
151    // Expand compile-time declaration loops (top-level for with {var} interpolation)
152    let expanded =
153        crate::expand::expand_declaration_loops(source).map_err(|e| ParseError::InvalidToken {
154            position: 0,
155            message: e,
156        })?;
157    // Preprocess to add INDENT/DEDENT markers
158    let preprocessed = preprocess_indentation(&expanded);
159
160    // Reject deeply nested input before pest parsing to prevent stack overflow
161    check_nesting_depth(&preprocessed)?;
162
163    let pairs = VarpulisParser::parse(Rule::program, &preprocessed).map_err(convert_pest_error)?;
164
165    let mut statements = Vec::new();
166
167    for pair in pairs {
168        if pair.as_rule() == Rule::program {
169            for inner in pair.into_inner() {
170                if inner.as_rule() == Rule::statement {
171                    statements.push(parse_statement(inner)?);
172                }
173            }
174        }
175    }
176
177    Ok(crate::optimize::fold_program(Program { statements }))
178}
179
180fn convert_pest_error(e: pest::error::Error<Rule>) -> ParseError {
181    let position = match e.location {
182        pest::error::InputLocation::Pos(p) => p,
183        pest::error::InputLocation::Span((s, _)) => s,
184    };
185
186    // Extract line/column from pest error
187    let (line, column) = match e.line_col {
188        pest::error::LineColLocation::Pos((l, c)) => (l, c),
189        pest::error::LineColLocation::Span((l, c), _) => (l, c),
190    };
191
192    // Create a human-readable message based on what was expected
193    let (message, hint) = match &e.variant {
194        pest::error::ErrorVariant::ParsingError {
195            positives,
196            negatives: _,
197        } => {
198            if positives.is_empty() {
199                ("Unexpected token".to_string(), None)
200            } else if is_stream_op_error(positives) {
201                // All positives are stream operations — produce a concise error
202                (
203                    "unknown stream operation".to_string(),
204                    Some(
205                        "valid operations: .where(), .select(), .emit(), .window(), .aggregate(), \
206                         .partition_by(), .within(), .having(), .to(), .context(), .log(), .print(), \
207                         .enrich(), .forecast(), .trend_aggregate(), .watermark(), .tap()"
208                            .to_string(),
209                    ),
210                )
211            } else {
212                let expected: Vec<String> = positives.iter().map(format_rule_name).collect();
213                if expected.len() == 1 {
214                    (format!("Expected {}", expected[0]), None)
215                } else {
216                    (format!("Expected one of: {}", expected.join(", ")), None)
217                }
218            }
219        }
220        pest::error::ErrorVariant::CustomError { message } => (message.clone(), None),
221    };
222
223    ParseError::Located {
224        line,
225        column,
226        position,
227        message,
228        hint,
229    }
230}
231
232/// Convert pest Rule names to human-readable format
233fn format_rule_name(rule: &Rule) -> String {
234    match rule {
235        Rule::identifier => "identifier".to_string(),
236        Rule::integer => "number".to_string(),
237        Rule::float => "number".to_string(),
238        Rule::string => "string".to_string(),
239        Rule::primitive_type => "type (int, float, bool, str, timestamp, duration)".to_string(),
240        Rule::type_expr => "type".to_string(),
241        Rule::expr => "expression".to_string(),
242        Rule::statement => "statement".to_string(),
243        Rule::context_decl => "context declaration".to_string(),
244        Rule::stream_decl => "stream declaration".to_string(),
245        Rule::pattern_decl => "pattern declaration".to_string(),
246        Rule::event_decl => "event declaration".to_string(),
247        Rule::fn_decl => "function declaration".to_string(),
248        Rule::INDENT => "indented block".to_string(),
249        Rule::DEDENT => "end of block".to_string(),
250        Rule::field => "field declaration (name: type)".to_string(),
251        Rule::comparison_op => "comparison operator (==, !=, <, >, <=, >=)".to_string(),
252        Rule::additive_op => "operator (+, -)".to_string(),
253        Rule::multiplicative_op => "operator (*, /, %)".to_string(),
254        Rule::postfix_suffix => "method call or member access".to_string(),
255        Rule::sase_pattern_expr => "SASE pattern expression".to_string(),
256        Rule::sase_seq_expr => "SEQ expression".to_string(),
257        Rule::kleene_op => "Kleene operator (+, *, ?)".to_string(),
258        _ => format!("{:?}", rule).to_lowercase().replace('_', " "),
259    }
260}
261
262/// Returns true when all positives look like stream operation rules.
263/// This is used to produce a concise "unknown stream operation" error
264/// instead of listing 30+ individual operation names.
265fn is_stream_op_error(positives: &[Rule]) -> bool {
266    const STREAM_OP_RULES: &[Rule] = &[
267        Rule::context_op,
268        Rule::where_op,
269        Rule::select_op,
270        Rule::window_op,
271        Rule::aggregate_op,
272        Rule::having_op,
273        Rule::partition_by_op,
274        Rule::order_by_op,
275        Rule::limit_op,
276        Rule::distinct_op,
277        Rule::map_op,
278        Rule::filter_op,
279        Rule::tap_op,
280        Rule::print_op,
281        Rule::log_op,
282        Rule::emit_op,
283        Rule::to_op,
284        Rule::pattern_op,
285        Rule::concurrent_op,
286        Rule::process_op,
287        Rule::on_error_op,
288        Rule::collect_op,
289        Rule::on_op,
290        Rule::within_op,
291        Rule::not_op,
292        Rule::fork_op,
293        Rule::any_op,
294        Rule::all_op,
295        Rule::first_op,
296        Rule::watermark_op,
297        Rule::allowed_lateness_op,
298        Rule::trend_aggregate_op,
299        Rule::score_op,
300        Rule::forecast_op,
301        Rule::enrich_op,
302    ];
303    positives.len() >= 10 && positives.iter().all(|r| STREAM_OP_RULES.contains(r))
304}
305
306fn parse_statement(pair: pest::iterators::Pair<Rule>) -> ParseResult<Spanned<Stmt>> {
307    let span = Span::new(pair.as_span().start(), pair.as_span().end());
308    let inner = pair.into_inner().expect_next("statement body")?;
309
310    let stmt = match inner.as_rule() {
311        Rule::context_decl => parse_context_decl(inner)?,
312        Rule::connector_decl => parse_connector_decl(inner)?,
313        Rule::stream_decl => parse_stream_decl(inner)?,
314        Rule::pattern_decl => parse_pattern_decl(inner)?,
315        Rule::event_decl => parse_event_decl(inner)?,
316        Rule::type_decl => parse_type_decl(inner)?,
317        Rule::var_decl => parse_var_decl(inner)?,
318        Rule::const_decl => parse_const_decl(inner)?,
319        Rule::fn_decl => parse_fn_decl(inner)?,
320        Rule::config_block => parse_config_block(inner)?,
321        Rule::import_stmt => parse_import_stmt(inner)?,
322        Rule::if_stmt => parse_if_stmt(inner)?,
323        Rule::for_stmt => parse_for_stmt(inner)?,
324        Rule::while_stmt => parse_while_stmt(inner)?,
325        Rule::return_stmt => parse_return_stmt(inner)?,
326        Rule::break_stmt => Stmt::Break,
327        Rule::continue_stmt => Stmt::Continue,
328        Rule::emit_stmt => parse_emit_stmt(inner)?,
329        Rule::assignment_stmt => {
330            let mut inner = inner.into_inner();
331            let name = inner.expect_next("variable name")?.as_str().to_string();
332            let value = parse_expr(inner.expect_next("assignment value")?)?;
333            Stmt::Assignment { name, value }
334        }
335        Rule::expr_stmt => Stmt::Expr(parse_expr(inner.into_inner().expect_next("expression")?)?),
336        _ => {
337            return Err(ParseError::UnexpectedToken {
338                position: span.start,
339                expected: "statement".to_string(),
340                found: format!("{:?}", inner.as_rule()),
341            })
342        }
343    };
344
345    Ok(Spanned::new(stmt, span))
346}
347
348// ============================================================================
349// Context Declaration Parsing
350// ============================================================================
351
352fn parse_context_decl(pair: pest::iterators::Pair<Rule>) -> ParseResult<Stmt> {
353    let mut inner = pair.into_inner();
354    let name = inner.expect_next("context name")?.as_str().to_string();
355    let mut cores = None;
356
357    for p in inner {
358        if p.as_rule() == Rule::context_params {
359            for param in p.into_inner() {
360                if param.as_rule() == Rule::context_param {
361                    // Parse cores: [0, 1, 2]
362                    let core_ids: Vec<usize> = param
363                        .into_inner()
364                        .filter(|p| p.as_rule() == Rule::integer)
365                        .map(|p| p.as_str().parse::<usize>().unwrap_or(0))
366                        .collect();
367                    cores = Some(core_ids);
368                }
369            }
370        }
371    }
372
373    Ok(Stmt::ContextDecl { name, cores })
374}
375
376// ============================================================================
377// Connector Parsing
378// ============================================================================
379
380fn parse_connector_decl(pair: pest::iterators::Pair<Rule>) -> ParseResult<Stmt> {
381    let mut inner = pair.into_inner();
382    let name = inner.expect_next("connector name")?.as_str().to_string();
383    let connector_type = inner.expect_next("connector type")?.as_str().to_string();
384    let mut params = Vec::new();
385
386    for p in inner {
387        if p.as_rule() == Rule::connector_params {
388            params = parse_connector_params(p)?;
389        }
390    }
391
392    Ok(Stmt::ConnectorDecl {
393        name,
394        connector_type,
395        params,
396    })
397}
398
399fn parse_connector_params(pair: pest::iterators::Pair<Rule>) -> ParseResult<Vec<ConnectorParam>> {
400    let mut params = Vec::new();
401    for p in pair.into_inner() {
402        if p.as_rule() == Rule::connector_param {
403            let mut inner = p.into_inner();
404            let name = inner.expect_next("param name")?.as_str().to_string();
405            let value_pair = inner.expect_next("param value")?;
406            let value = parse_config_value(value_pair)?;
407            params.push(ConnectorParam { name, value });
408        }
409    }
410    Ok(params)
411}
412
413fn parse_stream_decl(pair: pest::iterators::Pair<Rule>) -> ParseResult<Stmt> {
414    let mut inner = pair.into_inner();
415    let name = inner.expect_next("stream name")?.as_str().to_string();
416
417    let mut type_annotation = None;
418    let mut source = StreamSource::Ident("".to_string());
419    let mut ops = Vec::new();
420    let mut op_spans = Vec::new();
421
422    for p in inner {
423        match p.as_rule() {
424            Rule::type_annotation => {
425                type_annotation = Some(parse_type(p.into_inner().expect_next("type")?)?);
426            }
427            Rule::stream_expr => {
428                let (s, o, spans) = parse_stream_expr(p)?;
429                source = s;
430                ops = o;
431                op_spans = spans;
432            }
433            _ => {}
434        }
435    }
436
437    Ok(Stmt::StreamDecl {
438        name,
439        type_annotation,
440        source,
441        ops,
442        op_spans,
443    })
444}
445
446fn parse_stream_expr(
447    pair: pest::iterators::Pair<Rule>,
448) -> ParseResult<(StreamSource, Vec<StreamOp>, Vec<varpulis_core::span::Span>)> {
449    let mut inner = pair.into_inner();
450    let source = parse_stream_source(inner.expect_next("stream source")?)?;
451    let mut ops = Vec::new();
452    let mut op_spans = Vec::new();
453
454    for p in inner {
455        if p.as_rule() == Rule::stream_op {
456            let pest_span = p.as_span();
457            let span = varpulis_core::span::Span::new(pest_span.start(), pest_span.end());
458            ops.push(parse_stream_op(p)?);
459            op_spans.push(span);
460        }
461    }
462
463    Ok((source, ops, op_spans))
464}
465
466// ============================================================================
467// SASE+ Pattern Declaration Parsing
468// ============================================================================
469
470fn parse_pattern_decl(pair: pest::iterators::Pair<Rule>) -> ParseResult<Stmt> {
471    let mut inner = pair.into_inner();
472    let name = inner.expect_next("pattern name")?.as_str().to_string();
473
474    let mut expr = SasePatternExpr::Event("".to_string());
475    let mut within = None;
476    let mut partition_by = None;
477
478    for p in inner {
479        match p.as_rule() {
480            Rule::sase_pattern_expr => {
481                expr = parse_sase_pattern_expr(p)?;
482            }
483            Rule::pattern_within_clause => {
484                let dur_pair = p.into_inner().expect_next("within duration")?;
485                within = Some(Expr::Duration(
486                    parse_duration(dur_pair.as_str()).map_err(ParseError::InvalidDuration)?,
487                ));
488            }
489            Rule::pattern_partition_clause => {
490                let key = p
491                    .into_inner()
492                    .expect_next("partition key")?
493                    .as_str()
494                    .to_string();
495                partition_by = Some(Expr::Ident(key));
496            }
497            _ => {}
498        }
499    }
500
501    Ok(Stmt::PatternDecl {
502        name,
503        expr,
504        within,
505        partition_by,
506    })
507}
508
509fn parse_sase_pattern_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<SasePatternExpr> {
510    let inner = pair.into_inner().expect_next("SASE pattern expression")?;
511    parse_sase_or_expr(inner)
512}
513
514fn parse_sase_or_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<SasePatternExpr> {
515    let mut inner = pair.into_inner();
516    let mut left = parse_sase_and_expr(inner.expect_next("OR expression operand")?)?;
517
518    for right_pair in inner {
519        let right = parse_sase_and_expr(right_pair)?;
520        left = SasePatternExpr::Or(Box::new(left), Box::new(right));
521    }
522
523    Ok(left)
524}
525
526fn parse_sase_and_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<SasePatternExpr> {
527    let mut inner = pair.into_inner();
528    let mut left = parse_sase_not_expr(inner.expect_next("AND expression operand")?)?;
529
530    for right_pair in inner {
531        let right = parse_sase_not_expr(right_pair)?;
532        left = SasePatternExpr::And(Box::new(left), Box::new(right));
533    }
534
535    Ok(left)
536}
537
538fn parse_sase_not_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<SasePatternExpr> {
539    let mut inner = pair.into_inner();
540    let first = inner.expect_next("NOT or primary expression")?;
541
542    if first.as_str() == "NOT" {
543        let expr = parse_sase_primary_expr(inner.expect_next("expression after NOT")?)?;
544        Ok(SasePatternExpr::Not(Box::new(expr)))
545    } else {
546        parse_sase_primary_expr(first)
547    }
548}
549
550fn parse_sase_primary_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<SasePatternExpr> {
551    let inner = pair.into_inner().expect_next("SASE primary expression")?;
552
553    match inner.as_rule() {
554        Rule::sase_seq_expr => parse_sase_seq_expr(inner),
555        Rule::sase_grouped_expr => {
556            let nested = inner.into_inner().expect_next("grouped expression")?;
557            let expr = parse_sase_pattern_expr(nested)?;
558            Ok(SasePatternExpr::Group(Box::new(expr)))
559        }
560        Rule::sase_event_ref => parse_sase_event_ref(inner),
561        _ => Ok(SasePatternExpr::Event(inner.as_str().to_string())),
562    }
563}
564
565fn parse_sase_seq_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<SasePatternExpr> {
566    let mut items = Vec::new();
567
568    for p in pair.into_inner() {
569        if p.as_rule() == Rule::sase_seq_items {
570            for item in p.into_inner() {
571                if item.as_rule() == Rule::sase_seq_item {
572                    items.push(parse_sase_seq_item(item)?);
573                }
574            }
575        }
576    }
577
578    Ok(SasePatternExpr::Seq(items))
579}
580
581fn parse_sase_seq_item(pair: pest::iterators::Pair<Rule>) -> ParseResult<SasePatternItem> {
582    let inner = pair.into_inner().expect_next("sequence item")?;
583
584    match inner.as_rule() {
585        Rule::sase_negated_item => parse_sase_item_inner(inner, true),
586        Rule::sase_positive_item => parse_sase_item_inner(inner, false),
587        _ => parse_sase_item_inner(inner, false),
588    }
589}
590
591fn parse_sase_item_inner(
592    pair: pest::iterators::Pair<Rule>,
593    _negated: bool,
594) -> ParseResult<SasePatternItem> {
595    let mut inner = pair.into_inner();
596    let event_type = inner.expect_next("event type")?.as_str().to_string();
597
598    let mut kleene = None;
599    let mut filter = None;
600    let mut alias = None;
601
602    for p in inner {
603        match p.as_rule() {
604            Rule::kleene_op => {
605                kleene = Some(match p.as_str() {
606                    "+" => KleeneOp::Plus,
607                    "*" => KleeneOp::Star,
608                    "?" => KleeneOp::Optional,
609                    _ => KleeneOp::Plus,
610                });
611            }
612            Rule::sase_where_clause => {
613                filter = Some(parse_expr(
614                    p.into_inner().expect_next("filter expression")?,
615                )?);
616            }
617            Rule::sase_alias_clause => {
618                alias = Some(p.into_inner().expect_next("alias")?.as_str().to_string());
619            }
620            _ => {}
621        }
622    }
623
624    // For negated items, we prefix with "!" to indicate negation
625    // The runtime will interpret this
626    let event_type = if _negated {
627        format!("!{}", event_type)
628    } else {
629        event_type
630    };
631
632    Ok(SasePatternItem {
633        event_type,
634        alias,
635        kleene,
636        filter,
637    })
638}
639
640fn parse_sase_event_ref(pair: pest::iterators::Pair<Rule>) -> ParseResult<SasePatternExpr> {
641    // Single event reference with optional kleene, where, alias
642    let item = parse_sase_item_inner(pair, false)?;
643
644    // If it's a simple event with no modifiers, return Event variant
645    if item.alias.is_none() && item.kleene.is_none() && item.filter.is_none() {
646        Ok(SasePatternExpr::Event(item.event_type))
647    } else {
648        // Otherwise wrap in a single-item Seq
649        Ok(SasePatternExpr::Seq(vec![item]))
650    }
651}
652
653fn parse_stream_source(pair: pest::iterators::Pair<Rule>) -> ParseResult<StreamSource> {
654    let inner = pair.into_inner().expect_next("stream source type")?;
655
656    match inner.as_rule() {
657        Rule::from_connector_source => {
658            let mut inner_iter = inner.into_inner();
659            let event_type = inner_iter.expect_next("event type")?.as_str().to_string();
660            let connector_name = inner_iter
661                .expect_next("connector name")?
662                .as_str()
663                .to_string();
664            let mut params = Vec::new();
665            for p in inner_iter {
666                if p.as_rule() == Rule::connector_params {
667                    params = parse_connector_params(p)?;
668                }
669            }
670            Ok(StreamSource::FromConnector {
671                event_type,
672                connector_name,
673                params,
674            })
675        }
676        Rule::merge_source => {
677            let mut streams = Vec::new();
678            for p in inner.into_inner() {
679                if p.as_rule() == Rule::inline_stream_list {
680                    for is in p.into_inner() {
681                        streams.push(parse_inline_stream(is)?);
682                    }
683                }
684            }
685            Ok(StreamSource::Merge(streams))
686        }
687        Rule::join_source
688        | Rule::left_join_source
689        | Rule::right_join_source
690        | Rule::full_join_source => {
691            let join_type = match inner.as_rule() {
692                Rule::left_join_source => varpulis_core::ast::JoinType::Left,
693                Rule::right_join_source => varpulis_core::ast::JoinType::Right,
694                Rule::full_join_source => varpulis_core::ast::JoinType::Full,
695                _ => varpulis_core::ast::JoinType::Inner,
696            };
697            let mut clauses = Vec::new();
698            for p in inner.into_inner() {
699                if p.as_rule() == Rule::join_clause_list {
700                    for jc in p.into_inner() {
701                        clauses.push(parse_join_clause(jc, join_type)?);
702                    }
703                }
704            }
705            Ok(StreamSource::Join(clauses))
706        }
707        Rule::sequence_source => {
708            let decl =
709                parse_sequence_decl(inner.into_inner().expect_next("sequence declaration")?)?;
710            Ok(StreamSource::Sequence(decl))
711        }
712        Rule::timer_source => {
713            let timer_args = inner.into_inner().expect_next("timer arguments")?;
714            let decl = parse_timer_decl(timer_args)?;
715            Ok(StreamSource::Timer(decl))
716        }
717        Rule::all_source => {
718            let mut inner_iter = inner.into_inner();
719            let name = inner_iter.expect_next("event name")?.as_str().to_string();
720            let alias = inner_iter.next().map(|p| p.as_str().to_string());
721            Ok(StreamSource::AllWithAlias { name, alias })
722        }
723        Rule::aliased_source => {
724            let mut inner_iter = inner.into_inner();
725            let name = inner_iter.expect_next("event name")?.as_str().to_string();
726            let alias = inner_iter.expect_next("alias")?.as_str().to_string();
727            Ok(StreamSource::IdentWithAlias { name, alias })
728        }
729        Rule::identifier => Ok(StreamSource::Ident(inner.as_str().to_string())),
730        _ => Err(ParseError::UnexpectedToken {
731            position: 0,
732            expected: "stream source".to_string(),
733            found: format!("{:?}", inner.as_rule()),
734        }),
735    }
736}
737
738fn parse_inline_stream(pair: pest::iterators::Pair<Rule>) -> ParseResult<InlineStreamDecl> {
739    let mut inner = pair.into_inner();
740
741    // Check if it's a simple identifier or full declaration
742    let first = inner.expect_next("stream identifier")?;
743    if first.as_rule() == Rule::identifier && inner.clone().next().is_none() {
744        let name = first.as_str().to_string();
745        return Ok(InlineStreamDecl {
746            name: name.clone(),
747            source: name,
748            filter: None,
749        });
750    }
751
752    let name = first.as_str().to_string();
753    let source = inner.expect_next("stream source")?.as_str().to_string();
754    let filter = inner.next().map(|p| parse_expr(p)).transpose()?;
755
756    Ok(InlineStreamDecl {
757        name,
758        source,
759        filter,
760    })
761}
762
763fn parse_join_clause(
764    pair: pest::iterators::Pair<Rule>,
765    join_type: varpulis_core::ast::JoinType,
766) -> ParseResult<JoinClause> {
767    let mut inner = pair.into_inner();
768
769    let first = inner.expect_next("join clause identifier")?;
770    if first.as_rule() == Rule::identifier && inner.clone().next().is_none() {
771        let name = first.as_str().to_string();
772        return Ok(JoinClause {
773            name: name.clone(),
774            source: name,
775            on: None,
776            join_type,
777        });
778    }
779
780    let name = first.as_str().to_string();
781    let source = inner.expect_next("join source")?.as_str().to_string();
782    let on = inner.next().map(|p| parse_expr(p)).transpose()?;
783
784    Ok(JoinClause {
785        name,
786        source,
787        on,
788        join_type,
789    })
790}
791
792fn parse_sequence_decl(pair: pest::iterators::Pair<Rule>) -> ParseResult<SequenceDecl> {
793    let mut steps = Vec::new();
794
795    for p in pair.into_inner() {
796        if p.as_rule() == Rule::sequence_step {
797            steps.push(parse_sequence_step(p)?);
798        }
799    }
800
801    Ok(SequenceDecl {
802        match_all: false,
803        timeout: None,
804        steps,
805    })
806}
807
808fn parse_sequence_step(pair: pest::iterators::Pair<Rule>) -> ParseResult<SequenceStepDecl> {
809    let mut inner = pair.into_inner();
810    let alias = inner.expect_next("step alias")?.as_str().to_string();
811    let event_type = inner.expect_next("event type")?.as_str().to_string();
812
813    let mut filter = None;
814    let mut timeout = None;
815
816    for p in inner {
817        match p.as_rule() {
818            Rule::or_expr => filter = Some(parse_expr(p)?),
819            Rule::within_suffix => {
820                let expr = p.into_inner().expect_next("within duration")?;
821                timeout = Some(Box::new(parse_expr(expr)?));
822            }
823            _ => {}
824        }
825    }
826
827    Ok(SequenceStepDecl {
828        alias,
829        event_type,
830        filter,
831        timeout,
832    })
833}
834
835fn parse_timer_decl(pair: pest::iterators::Pair<Rule>) -> ParseResult<TimerDecl> {
836    let mut inner = pair.into_inner();
837
838    // First argument is the interval (duration expression)
839    let interval = parse_expr(inner.expect_next("timer interval")?)?;
840
841    // Optional second argument is initial_delay (named argument)
842    let mut initial_delay = None;
843    for p in inner {
844        if p.as_rule() == Rule::named_arg {
845            let arg = parse_named_arg(p)?;
846            if arg.name == "initial_delay" {
847                initial_delay = Some(Box::new(arg.value));
848            }
849        }
850    }
851
852    Ok(TimerDecl {
853        interval,
854        initial_delay,
855    })
856}
857
858fn parse_stream_op(pair: pest::iterators::Pair<Rule>) -> ParseResult<StreamOp> {
859    let inner = pair.into_inner().expect_next("stream operation")?;
860
861    match inner.as_rule() {
862        Rule::dot_op => {
863            let op_inner = inner.into_inner().expect_next("dot operation")?;
864            parse_dot_op(op_inner)
865        }
866        Rule::followed_by_op => parse_followed_by_op(inner),
867        _ => Err(ParseError::UnexpectedToken {
868            position: 0,
869            expected: "stream operation".to_string(),
870            found: format!("{:?}", inner.as_rule()),
871        }),
872    }
873}
874
875fn parse_dot_op(pair: pest::iterators::Pair<Rule>) -> ParseResult<StreamOp> {
876    match pair.as_rule() {
877        Rule::context_op => {
878            let name = pair
879                .into_inner()
880                .expect_next("context name")?
881                .as_str()
882                .to_string();
883            Ok(StreamOp::Context(name))
884        }
885        Rule::where_op => {
886            let expr = parse_expr(pair.into_inner().expect_next("where expression")?)?;
887            Ok(StreamOp::Where(expr))
888        }
889        Rule::select_op => {
890            let mut items = Vec::new();
891            for p in pair.into_inner() {
892                if p.as_rule() == Rule::select_list {
893                    for si in p.into_inner() {
894                        items.push(parse_select_item(si)?);
895                    }
896                }
897            }
898            Ok(StreamOp::Select(items))
899        }
900        Rule::window_op => {
901            let args = parse_window_args(pair.into_inner().expect_next("window arguments")?)?;
902            Ok(StreamOp::Window(args))
903        }
904        Rule::aggregate_op => {
905            let mut items = Vec::new();
906            for p in pair.into_inner() {
907                if p.as_rule() == Rule::agg_list {
908                    for ai in p.into_inner() {
909                        items.push(parse_agg_item(ai)?);
910                    }
911                }
912            }
913            Ok(StreamOp::Aggregate(items))
914        }
915        Rule::having_op => {
916            let expr = parse_expr(pair.into_inner().expect_next("having expression")?)?;
917            Ok(StreamOp::Having(expr))
918        }
919        Rule::map_op => {
920            let expr = parse_expr(pair.into_inner().expect_next("map expression")?)?;
921            Ok(StreamOp::Map(expr))
922        }
923        Rule::filter_op => {
924            let expr = parse_expr(pair.into_inner().expect_next("filter expression")?)?;
925            Ok(StreamOp::Filter(expr))
926        }
927        Rule::within_op => {
928            let expr = parse_expr(pair.into_inner().expect_next("within duration")?)?;
929            Ok(StreamOp::Within(expr))
930        }
931        Rule::emit_op => {
932            let mut output_type = None;
933            let mut fields = Vec::new();
934            let mut target_context = None;
935            for p in pair.into_inner() {
936                match p.as_rule() {
937                    Rule::emit_type_cast => {
938                        output_type = Some(
939                            p.into_inner()
940                                .expect_next("type name")?
941                                .as_str()
942                                .to_string(),
943                        );
944                    }
945                    Rule::named_arg_list => {
946                        for arg in p.into_inner() {
947                            let parsed = parse_named_arg(arg)?;
948                            // Extract `context: ctx_name` as target_context
949                            if parsed.name == "context" {
950                                if let Expr::Ident(ctx_name) = &parsed.value {
951                                    target_context = Some(ctx_name.clone());
952                                    continue;
953                                }
954                            }
955                            fields.push(parsed);
956                        }
957                    }
958                    _ => {}
959                }
960            }
961            Ok(StreamOp::Emit {
962                output_type,
963                fields,
964                target_context,
965            })
966        }
967        Rule::print_op => {
968            let exprs = pair
969                .into_inner()
970                .filter(|p| p.as_rule() == Rule::expr_list)
971                .flat_map(|p| p.into_inner())
972                .map(parse_expr)
973                .collect::<ParseResult<Vec<_>>>()?;
974            Ok(StreamOp::Print(exprs))
975        }
976        Rule::collect_op => Ok(StreamOp::Collect),
977        Rule::pattern_op => {
978            let def_pair = pair.into_inner().expect_next("pattern definition")?;
979            let mut inner = def_pair.into_inner();
980            let name = inner.expect_next("pattern name")?.as_str().to_string();
981            let body_pair = inner.expect_next("pattern body")?;
982
983            // pattern_body can be lambda_expr or pattern_or_expr
984            let body_inner = body_pair.into_inner().expect_next("pattern expression")?;
985            let matcher = match body_inner.as_rule() {
986                Rule::lambda_expr => parse_lambda_expr(body_inner)?,
987                Rule::pattern_or_expr => parse_pattern_expr_as_expr(body_inner)?,
988                _ => parse_expr_inner(body_inner)?,
989            };
990            Ok(StreamOp::Pattern(PatternDef { name, matcher }))
991        }
992        Rule::partition_by_op => {
993            let expr = parse_expr(pair.into_inner().expect_next("partition expression")?)?;
994            Ok(StreamOp::PartitionBy(expr))
995        }
996        Rule::order_by_op => {
997            let mut items = Vec::new();
998            for p in pair.into_inner() {
999                if p.as_rule() == Rule::order_list {
1000                    for oi in p.into_inner() {
1001                        items.push(parse_order_item(oi)?);
1002                    }
1003                }
1004            }
1005            Ok(StreamOp::OrderBy(items))
1006        }
1007        Rule::limit_op => {
1008            let expr = parse_expr(pair.into_inner().expect_next("limit expression")?)?;
1009            Ok(StreamOp::Limit(expr))
1010        }
1011        Rule::distinct_op => {
1012            let expr = pair.into_inner().next().map(parse_expr).transpose()?;
1013            Ok(StreamOp::Distinct(expr))
1014        }
1015        Rule::tap_op => {
1016            let args = pair
1017                .into_inner()
1018                .filter(|p| p.as_rule() == Rule::named_arg_list)
1019                .flat_map(|p| p.into_inner())
1020                .map(parse_named_arg)
1021                .collect::<ParseResult<Vec<_>>>()?;
1022            Ok(StreamOp::Tap(args))
1023        }
1024        Rule::log_op => {
1025            let args = pair
1026                .into_inner()
1027                .filter(|p| p.as_rule() == Rule::named_arg_list)
1028                .flat_map(|p| p.into_inner())
1029                .map(parse_named_arg)
1030                .collect::<ParseResult<Vec<_>>>()?;
1031            Ok(StreamOp::Log(args))
1032        }
1033        Rule::to_op => {
1034            let mut inner = pair.into_inner();
1035            let connector_name = inner.expect_next("connector name")?.as_str().to_string();
1036            let mut params = Vec::new();
1037            for p in inner {
1038                if p.as_rule() == Rule::connector_params {
1039                    params = parse_connector_params(p)?;
1040                }
1041            }
1042            Ok(StreamOp::To {
1043                connector_name,
1044                params,
1045            })
1046        }
1047        Rule::process_op => {
1048            let expr = parse_expr(pair.into_inner().expect_next("process expression")?)?;
1049            Ok(StreamOp::Process(expr))
1050        }
1051        Rule::on_error_op => {
1052            let expr = parse_expr(pair.into_inner().expect_next("on_error handler")?)?;
1053            Ok(StreamOp::OnError(expr))
1054        }
1055        Rule::on_op => {
1056            let expr = parse_expr(pair.into_inner().expect_next("on handler")?)?;
1057            Ok(StreamOp::On(expr))
1058        }
1059        Rule::not_op => {
1060            let mut inner = pair.into_inner();
1061            let event_type = inner.expect_next("event type")?.as_str().to_string();
1062            let filter = inner.next().map(parse_expr).transpose()?;
1063            Ok(StreamOp::Not(FollowedByClause {
1064                event_type,
1065                filter,
1066                alias: None,
1067                match_all: false,
1068            }))
1069        }
1070        Rule::fork_op => {
1071            let mut paths = Vec::new();
1072            for p in pair.into_inner() {
1073                if p.as_rule() == Rule::fork_path_list {
1074                    for fp in p.into_inner() {
1075                        paths.push(parse_fork_path(fp)?);
1076                    }
1077                }
1078            }
1079            Ok(StreamOp::Fork(paths))
1080        }
1081        Rule::any_op => {
1082            let count = pair
1083                .into_inner()
1084                .next()
1085                .map(|p| p.as_str().parse().unwrap_or(1));
1086            Ok(StreamOp::Any(count))
1087        }
1088        Rule::all_op => Ok(StreamOp::All),
1089        Rule::first_op => Ok(StreamOp::First),
1090        Rule::concurrent_op => {
1091            let args = pair
1092                .into_inner()
1093                .filter(|p| p.as_rule() == Rule::named_arg_list)
1094                .flat_map(|p| p.into_inner())
1095                .map(parse_named_arg)
1096                .collect::<ParseResult<Vec<_>>>()?;
1097            Ok(StreamOp::Concurrent(args))
1098        }
1099        Rule::watermark_op => {
1100            let args = pair
1101                .into_inner()
1102                .filter(|p| p.as_rule() == Rule::named_arg_list)
1103                .flat_map(|p| p.into_inner())
1104                .map(parse_named_arg)
1105                .collect::<ParseResult<Vec<_>>>()?;
1106            Ok(StreamOp::Watermark(args))
1107        }
1108        Rule::allowed_lateness_op => {
1109            let expr = parse_expr(pair.into_inner().expect_next("allowed lateness duration")?)?;
1110            Ok(StreamOp::AllowedLateness(expr))
1111        }
1112        Rule::trend_aggregate_op => {
1113            let mut items = Vec::new();
1114            for p in pair.into_inner() {
1115                if p.as_rule() == Rule::trend_agg_list {
1116                    for item_pair in p.into_inner() {
1117                        if item_pair.as_rule() == Rule::trend_agg_item {
1118                            let mut inner = item_pair.into_inner();
1119                            let alias = inner.expect_next("trend agg alias")?.as_str().to_string();
1120                            let func_pair = inner.expect_next("trend agg function")?;
1121                            let mut func_inner = func_pair.into_inner();
1122                            let func_name = func_inner
1123                                .expect_next("function name")?
1124                                .as_str()
1125                                .to_string();
1126                            let arg = func_inner.next().map(parse_expr).transpose()?;
1127                            items.push(TrendAggItem {
1128                                alias,
1129                                func: func_name,
1130                                arg,
1131                            });
1132                        }
1133                    }
1134                }
1135            }
1136            Ok(StreamOp::TrendAggregate(items))
1137        }
1138        Rule::forecast_op => {
1139            let mut confidence = None;
1140            let mut horizon = None;
1141            let mut warmup = None;
1142            let mut max_depth = None;
1143            let mut hawkes = None;
1144            let mut conformal = None;
1145            let mut mode = None;
1146            for p in pair.into_inner() {
1147                if p.as_rule() == Rule::forecast_params {
1148                    for param_pair in p.into_inner() {
1149                        if param_pair.as_rule() == Rule::forecast_param {
1150                            let mut inner = param_pair.into_inner();
1151                            let name = inner.expect_next("forecast param name")?.as_str();
1152                            let value_pair = inner.expect_next("forecast param value")?;
1153                            let expr = parse_expr(value_pair)?;
1154                            match name {
1155                                "confidence" => confidence = Some(expr),
1156                                "horizon" => horizon = Some(expr),
1157                                "warmup" => warmup = Some(expr),
1158                                "max_depth" => max_depth = Some(expr),
1159                                "hawkes" => hawkes = Some(expr),
1160                                "conformal" => conformal = Some(expr),
1161                                "mode" => mode = Some(expr),
1162                                _ => {}
1163                            }
1164                        }
1165                    }
1166                }
1167            }
1168            Ok(StreamOp::Forecast(ForecastSpec {
1169                confidence,
1170                horizon,
1171                warmup,
1172                max_depth,
1173                hawkes,
1174                conformal,
1175                mode,
1176            }))
1177        }
1178        Rule::enrich_op => {
1179            let mut inner = pair.into_inner();
1180            let connector_name = inner.expect_next("connector name")?.as_str().to_string();
1181            let mut key_expr = None;
1182            let mut fields = Vec::new();
1183            let mut cache_ttl = None;
1184            let mut timeout = None;
1185            let mut fallback = None;
1186            for p in inner {
1187                if p.as_rule() == Rule::enrich_params {
1188                    for param_pair in p.into_inner() {
1189                        if param_pair.as_rule() == Rule::enrich_param {
1190                            let param_inner =
1191                                param_pair.into_inner().expect_next("enrich param")?;
1192                            match param_inner.as_rule() {
1193                                Rule::enrich_key_param => {
1194                                    let expr_pair =
1195                                        param_inner.into_inner().expect_next("key expression")?;
1196                                    key_expr = Some(parse_expr(expr_pair)?);
1197                                }
1198                                Rule::enrich_fields_param => {
1199                                    for field in param_inner.into_inner() {
1200                                        if field.as_rule() == Rule::identifier {
1201                                            fields.push(field.as_str().to_string());
1202                                        }
1203                                    }
1204                                }
1205                                Rule::enrich_cache_ttl_param => {
1206                                    let expr_pair = param_inner
1207                                        .into_inner()
1208                                        .expect_next("cache_ttl expression")?;
1209                                    cache_ttl = Some(parse_expr(expr_pair)?);
1210                                }
1211                                Rule::enrich_timeout_param => {
1212                                    let expr_pair = param_inner
1213                                        .into_inner()
1214                                        .expect_next("timeout expression")?;
1215                                    timeout = Some(parse_expr(expr_pair)?);
1216                                }
1217                                Rule::enrich_fallback_param => {
1218                                    let literal_pair =
1219                                        param_inner.into_inner().expect_next("fallback literal")?;
1220                                    fallback = Some(parse_expr(literal_pair)?);
1221                                }
1222                                _ => {}
1223                            }
1224                        }
1225                    }
1226                }
1227            }
1228            let key = key_expr.ok_or_else(|| ParseError::Located {
1229                line: 0,
1230                column: 0,
1231                position: 0,
1232                message: ".enrich() requires a key: parameter".to_string(),
1233                hint: Some("add key: <expression> to .enrich()".to_string()),
1234            })?;
1235            Ok(StreamOp::Enrich(EnrichSpec {
1236                connector_name,
1237                key_expr: Box::new(key),
1238                fields,
1239                cache_ttl,
1240                timeout,
1241                fallback,
1242            }))
1243        }
1244        Rule::score_op => {
1245            let mut model_path = String::new();
1246            let mut inputs = Vec::new();
1247            let mut outputs = Vec::new();
1248            let mut gpu = false;
1249            let mut batch_size: usize = 1;
1250            let mut device_id: i32 = 0;
1251            for p in pair.into_inner() {
1252                if p.as_rule() == Rule::score_params {
1253                    for param_pair in p.into_inner() {
1254                        if param_pair.as_rule() == Rule::score_param {
1255                            let mut inner = param_pair.into_inner();
1256                            let name = inner.expect_next("score param name")?.as_str();
1257                            let value_pair = inner.expect_next("score param value")?;
1258                            match name {
1259                                "model" => {
1260                                    let raw = value_pair.as_str();
1261                                    model_path = raw.trim_matches('"').to_string();
1262                                }
1263                                "inputs" => {
1264                                    if value_pair.as_rule() == Rule::score_field_list {
1265                                        for field in value_pair.into_inner() {
1266                                            if field.as_rule() == Rule::identifier {
1267                                                inputs.push(field.as_str().to_string());
1268                                            }
1269                                        }
1270                                    }
1271                                }
1272                                "outputs" => {
1273                                    if value_pair.as_rule() == Rule::score_field_list {
1274                                        for field in value_pair.into_inner() {
1275                                            if field.as_rule() == Rule::identifier {
1276                                                outputs.push(field.as_str().to_string());
1277                                            }
1278                                        }
1279                                    }
1280                                }
1281                                "gpu" => {
1282                                    gpu = value_pair.as_str() == "true";
1283                                }
1284                                "batch_size" => {
1285                                    batch_size = value_pair.as_str().parse().unwrap_or(1);
1286                                }
1287                                "device" | "device_id" => {
1288                                    device_id = value_pair.as_str().parse().unwrap_or(0);
1289                                }
1290                                _ => {}
1291                            }
1292                        }
1293                    }
1294                }
1295            }
1296            Ok(StreamOp::Score(ScoreSpec {
1297                model_path,
1298                inputs,
1299                outputs,
1300                gpu,
1301                batch_size,
1302                device_id,
1303            }))
1304        }
1305        _ => Err(ParseError::UnexpectedToken {
1306            position: 0,
1307            expected: "stream operation".to_string(),
1308            found: format!("{:?}", pair.as_rule()),
1309        }),
1310    }
1311}
1312
1313fn parse_order_item(pair: pest::iterators::Pair<Rule>) -> ParseResult<OrderItem> {
1314    let mut inner = pair.into_inner();
1315    let expr = parse_expr(inner.expect_next("order expression")?)?;
1316    let desc = inner.next().map(|p| p.as_str() == "desc").unwrap_or(false);
1317    Ok(OrderItem {
1318        expr,
1319        descending: desc,
1320    })
1321}
1322
1323fn parse_fork_path(pair: pest::iterators::Pair<Rule>) -> ParseResult<ForkPath> {
1324    let mut inner = pair.into_inner();
1325    let name = inner.expect_next("fork path name")?.as_str().to_string();
1326    let mut ops = Vec::new();
1327    for p in inner {
1328        if p.as_rule() == Rule::stream_op {
1329            ops.push(parse_stream_op(p)?);
1330        }
1331    }
1332    Ok(ForkPath { name, ops })
1333}
1334
1335fn parse_followed_by_op(pair: pest::iterators::Pair<Rule>) -> ParseResult<StreamOp> {
1336    let mut inner = pair.into_inner();
1337    let mut match_all = false;
1338
1339    let first = inner.expect_next("event type or match_all")?;
1340    let event_type = if first.as_rule() == Rule::match_all_keyword {
1341        match_all = true;
1342        inner.expect_next("event type")?.as_str().to_string()
1343    } else {
1344        first.as_str().to_string()
1345    };
1346
1347    let mut filter = None;
1348    let mut alias = None;
1349
1350    for p in inner {
1351        match p.as_rule() {
1352            Rule::or_expr => filter = Some(parse_or_expr(p)?),
1353            Rule::filter_expr => filter = Some(parse_filter_expr(p)?),
1354            Rule::identifier => alias = Some(p.as_str().to_string()),
1355            _ => {}
1356        }
1357    }
1358
1359    Ok(StreamOp::FollowedBy(FollowedByClause {
1360        event_type,
1361        filter,
1362        alias,
1363        match_all,
1364    }))
1365}
1366
1367fn parse_select_item(pair: pest::iterators::Pair<Rule>) -> ParseResult<SelectItem> {
1368    let mut inner = pair.into_inner();
1369    let first = inner.expect_next("select field or alias")?;
1370
1371    if let Some(second) = inner.next() {
1372        Ok(SelectItem::Alias(
1373            first.as_str().to_string(),
1374            parse_expr(second)?,
1375        ))
1376    } else {
1377        Ok(SelectItem::Field(first.as_str().to_string()))
1378    }
1379}
1380
1381fn parse_window_args(pair: pest::iterators::Pair<Rule>) -> ParseResult<WindowArgs> {
1382    let raw = pair.as_str().trim();
1383    let is_session = raw.starts_with("session");
1384
1385    let mut inner = pair.into_inner();
1386
1387    if is_session {
1388        // session: <gap_expr>
1389        let gap_expr = parse_expr(inner.expect_next("session gap duration")?)?;
1390        return Ok(WindowArgs {
1391            duration: gap_expr.clone(),
1392            sliding: None,
1393            policy: None,
1394            session_gap: Some(gap_expr),
1395        });
1396    }
1397
1398    let duration = parse_expr(inner.expect_next("window duration")?)?;
1399
1400    let mut sliding = None;
1401    let mut policy = None;
1402
1403    for p in inner {
1404        if p.as_rule() == Rule::expr {
1405            // Need to determine if it's sliding or policy based on context
1406            if sliding.is_none() {
1407                sliding = Some(parse_expr(p)?);
1408            } else {
1409                policy = Some(parse_expr(p)?);
1410            }
1411        }
1412    }
1413
1414    Ok(WindowArgs {
1415        duration,
1416        sliding,
1417        policy,
1418        session_gap: None,
1419    })
1420}
1421
1422fn parse_agg_item(pair: pest::iterators::Pair<Rule>) -> ParseResult<AggItem> {
1423    let mut inner = pair.into_inner();
1424    let alias = inner.expect_next("aggregate alias")?.as_str().to_string();
1425    let expr = parse_expr(inner.expect_next("aggregate expression")?)?;
1426    Ok(AggItem { alias, expr })
1427}
1428
1429fn parse_named_arg(pair: pest::iterators::Pair<Rule>) -> ParseResult<NamedArg> {
1430    let mut inner = pair.into_inner();
1431    let name = inner.expect_next("argument name")?.as_str().to_string();
1432    let value = parse_expr(inner.expect_next("argument value")?)?;
1433    Ok(NamedArg { name, value })
1434}
1435
1436fn parse_event_decl(pair: pest::iterators::Pair<Rule>) -> ParseResult<Stmt> {
1437    let mut inner = pair.into_inner();
1438    let name = inner.expect_next("event name")?.as_str().to_string();
1439
1440    let mut extends = None;
1441    let mut fields = Vec::new();
1442
1443    for p in inner {
1444        match p.as_rule() {
1445            Rule::identifier => extends = Some(p.as_str().to_string()),
1446            Rule::field => fields.push(parse_field(p)?),
1447            _ => {}
1448        }
1449    }
1450
1451    Ok(Stmt::EventDecl {
1452        name,
1453        extends,
1454        fields,
1455    })
1456}
1457
1458fn parse_field(pair: pest::iterators::Pair<Rule>) -> ParseResult<Field> {
1459    let mut inner = pair.into_inner();
1460    let name = inner.expect_next("field name")?.as_str().to_string();
1461    let ty = parse_type(inner.expect_next("field type")?)?;
1462    let optional = inner.next().is_some();
1463    Ok(Field { name, ty, optional })
1464}
1465
1466fn parse_type_decl(pair: pest::iterators::Pair<Rule>) -> ParseResult<Stmt> {
1467    let mut inner = pair.into_inner();
1468    let name = inner.expect_next("type name")?.as_str().to_string();
1469    let ty = parse_type(inner.expect_next("type definition")?)?;
1470    Ok(Stmt::TypeDecl { name, ty })
1471}
1472
1473fn parse_type(pair: pest::iterators::Pair<Rule>) -> ParseResult<Type> {
1474    let cloned = pair.clone();
1475    let inner = pair.into_inner().next().unwrap_or(cloned);
1476
1477    match inner.as_rule() {
1478        Rule::primitive_type => match inner.as_str() {
1479            "int" => Ok(Type::Int),
1480            "float" => Ok(Type::Float),
1481            "bool" => Ok(Type::Bool),
1482            "str" => Ok(Type::Str),
1483            "timestamp" => Ok(Type::Timestamp),
1484            "duration" => Ok(Type::Duration),
1485            _ => Ok(Type::Named(inner.as_str().to_string())),
1486        },
1487        Rule::array_type => {
1488            let inner_type = parse_type(inner.into_inner().expect_next("array element type")?)?;
1489            Ok(Type::Array(Box::new(inner_type)))
1490        }
1491        Rule::map_type => {
1492            let mut inner_pairs = inner.into_inner();
1493            let key_type = parse_type(inner_pairs.expect_next("map key type")?)?;
1494            let val_type = parse_type(inner_pairs.expect_next("map value type")?)?;
1495            Ok(Type::Map(Box::new(key_type), Box::new(val_type)))
1496        }
1497        Rule::tuple_type => {
1498            let types: Vec<Type> = inner
1499                .into_inner()
1500                .map(parse_type)
1501                .collect::<ParseResult<Vec<_>>>()?;
1502            Ok(Type::Tuple(types))
1503        }
1504        Rule::stream_type => {
1505            let inner_type = parse_type(inner.into_inner().expect_next("stream element type")?)?;
1506            Ok(Type::Stream(Box::new(inner_type)))
1507        }
1508        Rule::optional_type => {
1509            let inner_type = parse_type(inner.into_inner().expect_next("optional inner type")?)?;
1510            Ok(Type::Optional(Box::new(inner_type)))
1511        }
1512        Rule::named_type | Rule::identifier => Ok(Type::Named(inner.as_str().to_string())),
1513        _ => Ok(Type::Named(inner.as_str().to_string())),
1514    }
1515}
1516
1517fn parse_var_decl(pair: pest::iterators::Pair<Rule>) -> ParseResult<Stmt> {
1518    let mut inner = pair.into_inner();
1519    let keyword = inner.expect_next("var_keyword")?.as_str();
1520    let mutable = keyword == "var";
1521    let name = inner.expect_next("variable name")?.as_str().to_string();
1522
1523    let mut ty = None;
1524    let mut value = Expr::Null;
1525
1526    for p in inner {
1527        match p.as_rule() {
1528            Rule::type_annotation => ty = Some(parse_type(p.into_inner().expect_next("type")?)?),
1529            _ => value = parse_expr(p)?,
1530        }
1531    }
1532
1533    Ok(Stmt::VarDecl {
1534        mutable,
1535        name,
1536        ty,
1537        value,
1538    })
1539}
1540
1541fn parse_const_decl(pair: pest::iterators::Pair<Rule>) -> ParseResult<Stmt> {
1542    let mut inner = pair.into_inner();
1543    let name = inner.expect_next("constant name")?.as_str().to_string();
1544
1545    let mut ty = None;
1546    let mut value = Expr::Null;
1547
1548    for p in inner {
1549        match p.as_rule() {
1550            Rule::type_annotation => ty = Some(parse_type(p.into_inner().expect_next("type")?)?),
1551            _ => value = parse_expr(p)?,
1552        }
1553    }
1554
1555    Ok(Stmt::ConstDecl { name, ty, value })
1556}
1557
1558fn parse_fn_decl(pair: pest::iterators::Pair<Rule>) -> ParseResult<Stmt> {
1559    let mut inner = pair.into_inner();
1560    let name = inner.expect_next("function name")?.as_str().to_string();
1561
1562    let mut params = Vec::new();
1563    let mut ret = None;
1564    let mut body = Vec::new();
1565
1566    for p in inner {
1567        match p.as_rule() {
1568            Rule::param_list => {
1569                for param in p.into_inner() {
1570                    params.push(parse_param(param)?);
1571                }
1572            }
1573            Rule::type_expr => ret = Some(parse_type(p)?),
1574            Rule::block => body = parse_block(p)?,
1575            Rule::statement => body.push(parse_statement(p)?),
1576            _ => {}
1577        }
1578    }
1579
1580    Ok(Stmt::FnDecl {
1581        name,
1582        params,
1583        ret,
1584        body,
1585    })
1586}
1587
1588fn parse_block(pair: pest::iterators::Pair<Rule>) -> ParseResult<Vec<Spanned<Stmt>>> {
1589    let mut statements = Vec::new();
1590    for p in pair.into_inner() {
1591        if p.as_rule() == Rule::statement {
1592            statements.push(parse_statement(p)?);
1593        }
1594    }
1595    Ok(statements)
1596}
1597
1598fn parse_param(pair: pest::iterators::Pair<Rule>) -> ParseResult<Param> {
1599    let mut inner = pair.into_inner();
1600    let name = inner.expect_next("parameter name")?.as_str().to_string();
1601    let ty = parse_type(inner.expect_next("parameter type")?)?;
1602    Ok(Param { name, ty })
1603}
1604
1605fn parse_config_block(pair: pest::iterators::Pair<Rule>) -> ParseResult<Stmt> {
1606    let mut inner = pair.into_inner();
1607    let first = inner.expect_next("config name or item")?;
1608
1609    // Check if first token is identifier (new syntax) or config_item (old syntax)
1610    let (name, items_start) = if first.as_rule() == Rule::identifier {
1611        (first.as_str().to_string(), None)
1612    } else {
1613        // Old syntax: config: with indentation - use "default" as name
1614        ("default".to_string(), Some(first))
1615    };
1616
1617    let mut items = Vec::new();
1618
1619    // If we have a config_item from old syntax, parse it first
1620    if let Some(first_item) = items_start {
1621        if first_item.as_rule() == Rule::config_item {
1622            items.push(parse_config_item(first_item)?);
1623        }
1624    }
1625
1626    for p in inner {
1627        if p.as_rule() == Rule::config_item {
1628            items.push(parse_config_item(p)?);
1629        }
1630    }
1631    Ok(Stmt::Config { name, items })
1632}
1633
1634fn parse_config_item(pair: pest::iterators::Pair<Rule>) -> ParseResult<ConfigItem> {
1635    let mut inner = pair.into_inner();
1636    let key = inner.expect_next("config key")?.as_str().to_string();
1637    let value = parse_config_value(inner.expect_next("config value")?)?;
1638    Ok(ConfigItem::Value(key, value))
1639}
1640
1641fn parse_config_value(pair: pest::iterators::Pair<Rule>) -> ParseResult<ConfigValue> {
1642    let cloned = pair.clone();
1643    let inner = pair.into_inner().next().unwrap_or(cloned);
1644
1645    match inner.as_rule() {
1646        Rule::config_array => {
1647            let values: Vec<ConfigValue> = inner
1648                .into_inner()
1649                .map(parse_config_value)
1650                .collect::<ParseResult<Vec<_>>>()?;
1651            Ok(ConfigValue::Array(values))
1652        }
1653        Rule::integer => Ok(ConfigValue::Int(inner.as_str().parse().unwrap_or(0))),
1654        Rule::float => Ok(ConfigValue::Float(inner.as_str().parse().unwrap_or(0.0))),
1655        Rule::string => {
1656            let s = inner.as_str();
1657            Ok(ConfigValue::Str(s[1..s.len() - 1].to_string()))
1658        }
1659        Rule::duration => Ok(ConfigValue::Duration(
1660            parse_duration(inner.as_str()).map_err(ParseError::InvalidDuration)?,
1661        )),
1662        Rule::boolean => Ok(ConfigValue::Bool(inner.as_str() == "true")),
1663        Rule::identifier => Ok(ConfigValue::Ident(inner.as_str().to_string())),
1664        _ => Ok(ConfigValue::Ident(inner.as_str().to_string())),
1665    }
1666}
1667
1668fn parse_import_stmt(pair: pest::iterators::Pair<Rule>) -> ParseResult<Stmt> {
1669    let mut inner = pair.into_inner();
1670    let path_pair = inner.expect_next("import path")?;
1671    let path = path_pair.as_str();
1672    let path = path[1..path.len() - 1].to_string();
1673    let alias = inner.next().map(|p| p.as_str().to_string());
1674    Ok(Stmt::Import { path, alias })
1675}
1676
1677fn parse_if_stmt(pair: pest::iterators::Pair<Rule>) -> ParseResult<Stmt> {
1678    let mut inner = pair.into_inner();
1679    let cond = parse_expr(inner.expect_next("if condition")?)?;
1680
1681    let mut then_branch = Vec::new();
1682    let mut elif_branches = Vec::new();
1683    let mut else_branch = None;
1684
1685    for p in inner {
1686        match p.as_rule() {
1687            Rule::block => then_branch = parse_block(p)?,
1688            Rule::statement => then_branch.push(parse_statement(p)?),
1689            Rule::elif_clause => {
1690                let mut elif_inner = p.into_inner();
1691                let elif_cond = parse_expr(elif_inner.expect_next("elif condition")?)?;
1692                let mut elif_body = Vec::new();
1693                for ep in elif_inner {
1694                    match ep.as_rule() {
1695                        Rule::block => elif_body = parse_block(ep)?,
1696                        Rule::statement => elif_body.push(parse_statement(ep)?),
1697                        _ => {}
1698                    }
1699                }
1700                elif_branches.push((elif_cond, elif_body));
1701            }
1702            Rule::else_clause => {
1703                let mut else_body = Vec::new();
1704                for ep in p.into_inner() {
1705                    match ep.as_rule() {
1706                        Rule::block => else_body = parse_block(ep)?,
1707                        Rule::statement => else_body.push(parse_statement(ep)?),
1708                        _ => {}
1709                    }
1710                }
1711                else_branch = Some(else_body);
1712            }
1713            _ => {}
1714        }
1715    }
1716
1717    Ok(Stmt::If {
1718        cond,
1719        then_branch,
1720        elif_branches,
1721        else_branch,
1722    })
1723}
1724
1725fn parse_for_stmt(pair: pest::iterators::Pair<Rule>) -> ParseResult<Stmt> {
1726    let mut inner = pair.into_inner();
1727    let var = inner.expect_next("loop variable")?.as_str().to_string();
1728    let iter = parse_expr(inner.expect_next("iterable expression")?)?;
1729    let mut body = Vec::new();
1730    for p in inner {
1731        match p.as_rule() {
1732            Rule::block => body = parse_block(p)?,
1733            Rule::statement => body.push(parse_statement(p)?),
1734            _ => {}
1735        }
1736    }
1737    Ok(Stmt::For { var, iter, body })
1738}
1739
1740fn parse_while_stmt(pair: pest::iterators::Pair<Rule>) -> ParseResult<Stmt> {
1741    let mut inner = pair.into_inner();
1742    let cond = parse_expr(inner.expect_next("while condition")?)?;
1743    let mut body = Vec::new();
1744    for p in inner {
1745        match p.as_rule() {
1746            Rule::block => body = parse_block(p)?,
1747            Rule::statement => body.push(parse_statement(p)?),
1748            _ => {}
1749        }
1750    }
1751    Ok(Stmt::While { cond, body })
1752}
1753
1754fn parse_return_stmt(pair: pest::iterators::Pair<Rule>) -> ParseResult<Stmt> {
1755    let expr = pair.into_inner().next().map(parse_expr).transpose()?;
1756    Ok(Stmt::Return(expr))
1757}
1758
1759fn parse_emit_stmt(pair: pest::iterators::Pair<Rule>) -> ParseResult<Stmt> {
1760    let mut inner = pair.into_inner();
1761    let event_type = inner.expect_next("event type name")?.as_str().to_string();
1762    let mut fields = Vec::new();
1763    for p in inner {
1764        if p.as_rule() == Rule::named_arg_list {
1765            for arg in p.into_inner() {
1766                fields.push(parse_named_arg(arg)?);
1767            }
1768        }
1769    }
1770    Ok(Stmt::Emit { event_type, fields })
1771}
1772
1773// ============================================================================
1774// Expression Parsing
1775// ============================================================================
1776
1777fn parse_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
1778    let inner = pair.into_inner().next();
1779
1780    match inner {
1781        Some(p) => parse_expr_inner(p),
1782        None => Ok(Expr::Null),
1783    }
1784}
1785
1786fn parse_expr_inner(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
1787    match pair.as_rule() {
1788        Rule::expr => parse_expr(pair),
1789        Rule::lambda_expr => parse_lambda_expr(pair),
1790        Rule::range_expr => parse_range_expr(pair),
1791        Rule::or_expr => parse_or_expr(pair),
1792        Rule::and_expr => parse_and_expr(pair),
1793        Rule::not_expr => parse_not_expr(pair),
1794        Rule::comparison_expr => parse_comparison_expr(pair),
1795        Rule::bitwise_or_expr => parse_bitwise_or_expr(pair),
1796        Rule::bitwise_xor_expr => parse_bitwise_xor_expr(pair),
1797        Rule::bitwise_and_expr => parse_bitwise_and_expr(pair),
1798        Rule::shift_expr => parse_shift_expr(pair),
1799        Rule::additive_expr => parse_additive_expr(pair),
1800        Rule::multiplicative_expr => parse_multiplicative_expr(pair),
1801        Rule::power_expr => parse_power_expr(pair),
1802        Rule::unary_expr => parse_unary_expr(pair),
1803        Rule::postfix_expr => parse_postfix_expr(pair),
1804        Rule::primary_expr => parse_primary_expr(pair),
1805        Rule::literal => parse_literal(pair),
1806        Rule::identifier => Ok(Expr::Ident(pair.as_str().to_string())),
1807        Rule::if_expr => parse_if_expr(pair),
1808        _ => Ok(Expr::Ident(pair.as_str().to_string())),
1809    }
1810}
1811
1812fn parse_lambda_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
1813    let mut inner = pair.into_inner();
1814    let mut params = Vec::new();
1815
1816    // Parse parameters
1817    let first = inner.expect_next("lambda parameters")?;
1818    match first.as_rule() {
1819        Rule::identifier_list => {
1820            for p in first.into_inner() {
1821                params.push(p.as_str().to_string());
1822            }
1823        }
1824        Rule::identifier => {
1825            params.push(first.as_str().to_string());
1826        }
1827        _ => {}
1828    }
1829
1830    // Parse body - could be expression or block
1831    let body_pair = inner.expect_next("lambda body")?;
1832    let body = match body_pair.as_rule() {
1833        Rule::lambda_block => parse_lambda_block(body_pair)?,
1834        _ => parse_expr_inner(body_pair)?,
1835    };
1836
1837    Ok(Expr::Lambda {
1838        params,
1839        body: Box::new(body),
1840    })
1841}
1842
1843fn parse_lambda_block(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
1844    let mut stmts = Vec::new();
1845    let mut final_expr = None;
1846
1847    for p in pair.into_inner() {
1848        match p.as_rule() {
1849            Rule::statement => {
1850                // Check if it's a var_decl to extract for Block
1851                let stmt = parse_statement(p)?;
1852                match &stmt.node {
1853                    Stmt::VarDecl {
1854                        mutable,
1855                        name,
1856                        ty,
1857                        value,
1858                    } => {
1859                        stmts.push((name.clone(), ty.clone(), value.clone(), *mutable));
1860                    }
1861                    Stmt::Expr(e) => {
1862                        // Last expression becomes result
1863                        final_expr = Some(e.clone());
1864                    }
1865                    _ => {
1866                        // Other statements - treat as expression if possible
1867                    }
1868                }
1869            }
1870            _ => {
1871                // Expression at end of block
1872                final_expr = Some(parse_expr_inner(p)?);
1873            }
1874        }
1875    }
1876
1877    // If we have variable declarations, wrap in a Block expression
1878    if stmts.is_empty() {
1879        Ok(final_expr.unwrap_or(Expr::Null))
1880    } else {
1881        Ok(Expr::Block {
1882            stmts,
1883            result: Box::new(final_expr.unwrap_or(Expr::Null)),
1884        })
1885    }
1886}
1887
1888fn parse_pattern_expr_as_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
1889    // Convert pattern_or_expr to an Expr representation
1890    // pattern_or_expr = pattern_and_expr ~ ("or" ~ pattern_and_expr)*
1891    let mut inner = pair.into_inner();
1892    let mut left = parse_pattern_and_as_expr(inner.expect_next("pattern expression")?)?;
1893
1894    for right_pair in inner {
1895        let right = parse_pattern_and_as_expr(right_pair)?;
1896        left = Expr::Binary {
1897            op: BinOp::Or,
1898            left: Box::new(left),
1899            right: Box::new(right),
1900        };
1901    }
1902    Ok(left)
1903}
1904
1905fn parse_pattern_and_as_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
1906    let mut inner = pair.into_inner();
1907    let mut left = parse_pattern_xor_as_expr(inner.expect_next("and expression")?)?;
1908
1909    for right_pair in inner {
1910        let right = parse_pattern_xor_as_expr(right_pair)?;
1911        left = Expr::Binary {
1912            op: BinOp::And,
1913            left: Box::new(left),
1914            right: Box::new(right),
1915        };
1916    }
1917    Ok(left)
1918}
1919
1920fn parse_pattern_xor_as_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
1921    let mut inner = pair.into_inner();
1922    let mut left = parse_pattern_unary_as_expr(inner.expect_next("xor expression")?)?;
1923
1924    for right_pair in inner {
1925        let right = parse_pattern_unary_as_expr(right_pair)?;
1926        left = Expr::Binary {
1927            op: BinOp::Xor,
1928            left: Box::new(left),
1929            right: Box::new(right),
1930        };
1931    }
1932    Ok(left)
1933}
1934
1935fn parse_pattern_unary_as_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
1936    let mut inner = pair.into_inner();
1937    let first = inner.expect_next("unary expression or operand")?;
1938
1939    if first.as_str() == "not" {
1940        let expr = parse_pattern_primary_as_expr(inner.expect_next("pattern expression")?)?;
1941        Ok(Expr::Unary {
1942            op: UnaryOp::Not,
1943            expr: Box::new(expr),
1944        })
1945    } else {
1946        parse_pattern_primary_as_expr(first)
1947    }
1948}
1949
1950fn parse_pattern_primary_as_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
1951    let inner = pair
1952        .into_inner()
1953        .expect_next("pattern primary expression")?;
1954
1955    match inner.as_rule() {
1956        Rule::pattern_or_expr => parse_pattern_expr_as_expr(inner),
1957        Rule::pattern_sequence => parse_pattern_sequence_as_expr(inner),
1958        _ => Ok(Expr::Ident(inner.as_str().to_string())),
1959    }
1960}
1961
1962fn parse_pattern_sequence_as_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
1963    // pattern_sequence = identifier ~ ("->" ~ identifier)*
1964    // Convert to a chain of FollowedBy binary operations
1965    let mut inner = pair.into_inner();
1966    let mut left = Expr::Ident(inner.expect_next("sequence start")?.as_str().to_string());
1967
1968    for right_pair in inner {
1969        let right = Expr::Ident(right_pair.as_str().to_string());
1970        left = Expr::Binary {
1971            op: BinOp::FollowedBy,
1972            left: Box::new(left),
1973            right: Box::new(right),
1974        };
1975    }
1976    Ok(left)
1977}
1978
1979fn parse_filter_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
1980    let inner = pair.into_inner().expect_next("filter expression")?;
1981    parse_filter_or_expr(inner)
1982}
1983
1984fn parse_filter_or_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
1985    let mut inner = pair.into_inner();
1986    let mut left = parse_filter_and_expr(inner.expect_next("or expression operand")?)?;
1987
1988    for right_pair in inner {
1989        let right = parse_filter_and_expr(right_pair)?;
1990        left = Expr::Binary {
1991            op: BinOp::Or,
1992            left: Box::new(left),
1993            right: Box::new(right),
1994        };
1995    }
1996    Ok(left)
1997}
1998
1999fn parse_filter_and_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2000    let mut inner = pair.into_inner();
2001    let mut left = parse_filter_not_expr(inner.expect_next("and expression operand")?)?;
2002
2003    for right_pair in inner {
2004        let right = parse_filter_not_expr(right_pair)?;
2005        left = Expr::Binary {
2006            op: BinOp::And,
2007            left: Box::new(left),
2008            right: Box::new(right),
2009        };
2010    }
2011    Ok(left)
2012}
2013
2014fn parse_filter_not_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2015    let mut inner = pair.into_inner();
2016    let first = inner.expect_next("not or expression")?;
2017
2018    if first.as_str() == "not" {
2019        let expr = parse_filter_comparison_expr(inner.expect_next("expression after not")?)?;
2020        Ok(Expr::Unary {
2021            op: UnaryOp::Not,
2022            expr: Box::new(expr),
2023        })
2024    } else {
2025        parse_filter_comparison_expr(first)
2026    }
2027}
2028
2029fn parse_filter_comparison_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2030    let mut inner = pair.into_inner();
2031    let left = parse_filter_additive_expr(inner.expect_next("comparison left operand")?)?;
2032
2033    if let Some(op_pair) = inner.next() {
2034        let op = match op_pair.as_str() {
2035            "==" => BinOp::Eq,
2036            "!=" => BinOp::NotEq,
2037            "<" => BinOp::Lt,
2038            "<=" => BinOp::Le,
2039            ">" => BinOp::Gt,
2040            ">=" => BinOp::Ge,
2041            "in" => BinOp::In,
2042            "is" => BinOp::Is,
2043            s if s.contains("not") && s.contains("in") => BinOp::NotIn,
2044            _ => BinOp::Eq,
2045        };
2046        let right = parse_filter_additive_expr(inner.expect_next("comparison right operand")?)?;
2047        Ok(Expr::Binary {
2048            op,
2049            left: Box::new(left),
2050            right: Box::new(right),
2051        })
2052    } else {
2053        Ok(left)
2054    }
2055}
2056
2057fn parse_filter_additive_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2058    let mut inner = pair.into_inner();
2059    let mut left =
2060        parse_filter_multiplicative_expr(inner.expect_next("additive expression operand")?)?;
2061
2062    while let Some(op_pair) = inner.next() {
2063        let op = if op_pair.as_str() == "-" {
2064            BinOp::Sub
2065        } else {
2066            BinOp::Add
2067        };
2068        if let Some(right_pair) = inner.next() {
2069            let right = parse_filter_multiplicative_expr(right_pair)?;
2070            left = Expr::Binary {
2071                op,
2072                left: Box::new(left),
2073                right: Box::new(right),
2074            };
2075        }
2076    }
2077    Ok(left)
2078}
2079
2080fn parse_filter_multiplicative_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2081    let mut inner = pair.into_inner();
2082    let mut left =
2083        parse_filter_unary_expr(inner.expect_next("multiplicative expression operand")?)?;
2084
2085    while let Some(op_pair) = inner.next() {
2086        let op = match op_pair.as_str() {
2087            "*" => BinOp::Mul,
2088            "/" => BinOp::Div,
2089            "%" => BinOp::Mod,
2090            _ => BinOp::Mul,
2091        };
2092        if let Some(right_pair) = inner.next() {
2093            let right = parse_filter_unary_expr(right_pair)?;
2094            left = Expr::Binary {
2095                op,
2096                left: Box::new(left),
2097                right: Box::new(right),
2098            };
2099        }
2100    }
2101    Ok(left)
2102}
2103
2104fn parse_filter_unary_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2105    let mut inner = pair.into_inner();
2106    let first = inner.expect_next("unary operator or expression")?;
2107
2108    // Check if first is a unary operator or the expression itself
2109    if first.as_rule() == Rule::filter_unary_op {
2110        let op_str = first.as_str();
2111        let expr =
2112            parse_filter_postfix_expr(inner.expect_next("expression after unary operator")?)?;
2113        let op = match op_str {
2114            "-" => UnaryOp::Neg,
2115            "~" => UnaryOp::BitNot,
2116            _ => unreachable!("Grammar only allows - or ~"),
2117        };
2118        Ok(Expr::Unary {
2119            op,
2120            expr: Box::new(expr),
2121        })
2122    } else {
2123        parse_filter_postfix_expr(first)
2124    }
2125}
2126
2127fn parse_filter_postfix_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2128    let mut inner = pair.into_inner();
2129    let mut expr = parse_filter_primary_expr(inner.expect_next("postfix expression base")?)?;
2130
2131    for suffix in inner {
2132        expr = parse_filter_postfix_suffix(expr, suffix)?;
2133    }
2134    Ok(expr)
2135}
2136
2137fn parse_filter_postfix_suffix(expr: Expr, pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2138    let mut inner = pair.into_inner();
2139
2140    if let Some(first) = inner.next() {
2141        match first.as_rule() {
2142            Rule::identifier => {
2143                // Member access: .identifier
2144                Ok(Expr::Member {
2145                    expr: Box::new(expr),
2146                    member: first.as_str().to_string(),
2147                })
2148            }
2149            Rule::optional_member_access => {
2150                let member = first
2151                    .into_inner()
2152                    .expect_next("member name")?
2153                    .as_str()
2154                    .to_string();
2155                Ok(Expr::OptionalMember {
2156                    expr: Box::new(expr),
2157                    member,
2158                })
2159            }
2160            Rule::index_access => {
2161                let index = parse_expr(first.into_inner().expect_next("index expression")?)?;
2162                Ok(Expr::Index {
2163                    expr: Box::new(expr),
2164                    index: Box::new(index),
2165                })
2166            }
2167            Rule::call_args => {
2168                let args = first
2169                    .into_inner()
2170                    .filter(|p| p.as_rule() == Rule::arg_list)
2171                    .flat_map(|p| p.into_inner())
2172                    .map(parse_arg)
2173                    .collect::<ParseResult<Vec<_>>>()?;
2174                Ok(Expr::Call {
2175                    func: Box::new(expr),
2176                    args,
2177                })
2178            }
2179            _ => Ok(expr),
2180        }
2181    } else {
2182        Ok(expr)
2183    }
2184}
2185
2186fn parse_filter_primary_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2187    let inner = pair.into_inner().expect_next("filter primary expression")?;
2188
2189    match inner.as_rule() {
2190        Rule::literal => parse_literal(inner),
2191        Rule::identifier => Ok(Expr::Ident(inner.as_str().to_string())),
2192        Rule::filter_expr => parse_filter_expr(inner),
2193        _ => Ok(Expr::Ident(inner.as_str().to_string())),
2194    }
2195}
2196
2197fn parse_range_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2198    let mut inner = pair.into_inner();
2199    let left = parse_expr_inner(inner.expect_next("range start")?)?;
2200
2201    if let Some(op_pair) = inner.next() {
2202        let inclusive = op_pair.as_str() == "..=";
2203        let right = parse_expr_inner(inner.expect_next("range end")?)?;
2204        Ok(Expr::Range {
2205            start: Box::new(left),
2206            end: Box::new(right),
2207            inclusive,
2208        })
2209    } else {
2210        Ok(left)
2211    }
2212}
2213
2214fn parse_or_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2215    let mut inner = pair.into_inner();
2216    let mut left = parse_expr_inner(inner.expect_next("or expression operand")?)?;
2217
2218    for right_pair in inner {
2219        let right = parse_expr_inner(right_pair)?;
2220        left = Expr::Binary {
2221            op: BinOp::Or,
2222            left: Box::new(left),
2223            right: Box::new(right),
2224        };
2225    }
2226
2227    Ok(left)
2228}
2229
2230fn parse_and_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2231    let mut inner = pair.into_inner();
2232    let mut left = parse_expr_inner(inner.expect_next("and expression operand")?)?;
2233
2234    for right_pair in inner {
2235        let right = parse_expr_inner(right_pair)?;
2236        left = Expr::Binary {
2237            op: BinOp::And,
2238            left: Box::new(left),
2239            right: Box::new(right),
2240        };
2241    }
2242
2243    Ok(left)
2244}
2245
2246fn parse_not_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2247    let mut inner = pair.into_inner();
2248    let first = inner.expect_next("not keyword or expression")?;
2249
2250    if first.as_str() == "not" {
2251        let expr = parse_expr_inner(inner.expect_next("expression after not")?)?;
2252        Ok(Expr::Unary {
2253            op: UnaryOp::Not,
2254            expr: Box::new(expr),
2255        })
2256    } else {
2257        parse_expr_inner(first)
2258    }
2259}
2260
2261fn parse_comparison_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2262    let mut inner = pair.into_inner();
2263    let left = parse_expr_inner(inner.expect_next("comparison left operand")?)?;
2264
2265    if let Some(op_pair) = inner.next() {
2266        let op_str = op_pair.as_str();
2267        let op = match op_str {
2268            "==" => BinOp::Eq,
2269            "!=" => BinOp::NotEq,
2270            "<" => BinOp::Lt,
2271            "<=" => BinOp::Le,
2272            ">" => BinOp::Gt,
2273            ">=" => BinOp::Ge,
2274            "in" => BinOp::In,
2275            "is" => BinOp::Is,
2276            s if s.contains("not") && s.contains("in") => BinOp::NotIn,
2277            _ => BinOp::Eq,
2278        };
2279        let right = parse_expr_inner(inner.expect_next("comparison right operand")?)?;
2280        Ok(Expr::Binary {
2281            op,
2282            left: Box::new(left),
2283            right: Box::new(right),
2284        })
2285    } else {
2286        Ok(left)
2287    }
2288}
2289
2290fn parse_bitwise_or_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2291    parse_binary_chain(pair, BinOp::BitOr)
2292}
2293
2294fn parse_bitwise_xor_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2295    parse_binary_chain(pair, BinOp::BitXor)
2296}
2297
2298fn parse_bitwise_and_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2299    parse_binary_chain(pair, BinOp::BitAnd)
2300}
2301
2302fn parse_shift_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2303    let mut inner = pair.into_inner();
2304    let mut left = parse_expr_inner(inner.expect_next("shift expression operand")?)?;
2305
2306    while let Some(op_or_expr) = inner.next() {
2307        let op = match op_or_expr.as_str() {
2308            "<<" => BinOp::Shl,
2309            ">>" => BinOp::Shr,
2310            _ => {
2311                let right = parse_expr_inner(op_or_expr)?;
2312                left = Expr::Binary {
2313                    op: BinOp::Shl,
2314                    left: Box::new(left),
2315                    right: Box::new(right),
2316                };
2317                continue;
2318            }
2319        };
2320        if let Some(right_pair) = inner.next() {
2321            let right = parse_expr_inner(right_pair)?;
2322            left = Expr::Binary {
2323                op,
2324                left: Box::new(left),
2325                right: Box::new(right),
2326            };
2327        }
2328    }
2329
2330    Ok(left)
2331}
2332
2333fn parse_additive_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2334    let mut inner = pair.into_inner();
2335    let mut left = parse_expr_inner(inner.expect_next("additive expression operand")?)?;
2336
2337    while let Some(op_pair) = inner.next() {
2338        let op_text = op_pair.as_str();
2339        let op = if op_text == "-" {
2340            BinOp::Sub
2341        } else {
2342            BinOp::Add
2343        };
2344
2345        if let Some(right_pair) = inner.next() {
2346            let right = parse_expr_inner(right_pair)?;
2347            left = Expr::Binary {
2348                op,
2349                left: Box::new(left),
2350                right: Box::new(right),
2351            };
2352        }
2353    }
2354
2355    Ok(left)
2356}
2357
2358fn parse_multiplicative_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2359    let mut inner = pair.into_inner();
2360    let mut left = parse_expr_inner(inner.expect_next("multiplicative expression operand")?)?;
2361
2362    while let Some(op_pair) = inner.next() {
2363        let op_text = op_pair.as_str();
2364        let op = match op_text {
2365            "*" => BinOp::Mul,
2366            "/" => BinOp::Div,
2367            "%" => BinOp::Mod,
2368            _ => BinOp::Mul,
2369        };
2370
2371        if let Some(right_pair) = inner.next() {
2372            let right = parse_expr_inner(right_pair)?;
2373            left = Expr::Binary {
2374                op,
2375                left: Box::new(left),
2376                right: Box::new(right),
2377            };
2378        }
2379    }
2380
2381    Ok(left)
2382}
2383
2384fn parse_power_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2385    let mut inner = pair.into_inner();
2386    let base = parse_expr_inner(inner.expect_next("power expression base")?)?;
2387
2388    if let Some(exp_pair) = inner.next() {
2389        let exp = parse_expr_inner(exp_pair)?;
2390        Ok(Expr::Binary {
2391            op: BinOp::Pow,
2392            left: Box::new(base),
2393            right: Box::new(exp),
2394        })
2395    } else {
2396        Ok(base)
2397    }
2398}
2399
2400fn parse_unary_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2401    let mut inner = pair.into_inner();
2402    let first = inner.expect_next("unary operator or expression")?;
2403
2404    // Check if first is a unary operator or the expression itself
2405    match first.as_rule() {
2406        Rule::unary_op => {
2407            let op_str = first.as_str();
2408            let expr = parse_expr_inner(inner.expect_next("expression after unary operator")?)?;
2409            let op = match op_str {
2410                "-" => UnaryOp::Neg,
2411                "~" => UnaryOp::BitNot,
2412                _ => unreachable!("Grammar only allows - or ~"),
2413            };
2414            Ok(Expr::Unary {
2415                op,
2416                expr: Box::new(expr),
2417            })
2418        }
2419        _ => parse_expr_inner(first),
2420    }
2421}
2422
2423fn parse_postfix_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2424    let mut inner = pair.into_inner();
2425    let mut expr = parse_expr_inner(inner.expect_next("postfix expression base")?)?;
2426
2427    for suffix in inner {
2428        expr = parse_postfix_suffix(expr, suffix)?;
2429    }
2430
2431    Ok(expr)
2432}
2433
2434fn parse_postfix_suffix(expr: Expr, pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2435    let inner = pair.into_inner().expect_next("postfix suffix")?;
2436
2437    match inner.as_rule() {
2438        Rule::member_access => {
2439            let member = inner
2440                .into_inner()
2441                .expect_next("member name")?
2442                .as_str()
2443                .to_string();
2444            Ok(Expr::Member {
2445                expr: Box::new(expr),
2446                member,
2447            })
2448        }
2449        Rule::optional_member_access => {
2450            let member = inner
2451                .into_inner()
2452                .expect_next("member name")?
2453                .as_str()
2454                .to_string();
2455            Ok(Expr::OptionalMember {
2456                expr: Box::new(expr),
2457                member,
2458            })
2459        }
2460        Rule::slice_access => {
2461            // Parse slice: [start:end], [:end], [start:], [:]
2462            let slice_range = inner.into_inner().expect_next("slice range")?;
2463            let slice_inner = slice_range.into_inner();
2464
2465            let mut start = None;
2466            let mut end = None;
2467
2468            for p in slice_inner {
2469                match p.as_rule() {
2470                    Rule::slice_start => {
2471                        start = Some(Box::new(parse_expr_inner(
2472                            p.into_inner().expect_next("slice start expression")?,
2473                        )?));
2474                    }
2475                    Rule::slice_end => {
2476                        end = Some(Box::new(parse_expr_inner(
2477                            p.into_inner().expect_next("slice end expression")?,
2478                        )?));
2479                    }
2480                    _ => {}
2481                }
2482            }
2483
2484            Ok(Expr::Slice {
2485                expr: Box::new(expr),
2486                start,
2487                end,
2488            })
2489        }
2490        Rule::index_access => {
2491            let index = parse_expr(inner.into_inner().expect_next("index expression")?)?;
2492            Ok(Expr::Index {
2493                expr: Box::new(expr),
2494                index: Box::new(index),
2495            })
2496        }
2497        Rule::call_args => {
2498            let mut args = Vec::new();
2499            for p in inner.into_inner() {
2500                if p.as_rule() == Rule::arg_list {
2501                    for arg in p.into_inner() {
2502                        args.push(parse_arg(arg)?);
2503                    }
2504                }
2505            }
2506            Ok(Expr::Call {
2507                func: Box::new(expr),
2508                args,
2509            })
2510        }
2511        _ => Ok(expr),
2512    }
2513}
2514
2515fn parse_arg(pair: pest::iterators::Pair<Rule>) -> ParseResult<Arg> {
2516    let mut inner = pair.into_inner();
2517    let first = inner.expect_next("argument")?;
2518
2519    if let Some(second) = inner.next() {
2520        Ok(Arg::Named(
2521            first.as_str().to_string(),
2522            parse_expr_inner(second)?,
2523        ))
2524    } else {
2525        Ok(Arg::Positional(parse_expr_inner(first)?))
2526    }
2527}
2528
2529fn parse_primary_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2530    let inner = pair.into_inner().expect_next("primary expression")?;
2531
2532    match inner.as_rule() {
2533        Rule::if_expr => parse_if_expr(inner),
2534        Rule::literal => parse_literal(inner),
2535        Rule::identifier => Ok(Expr::Ident(inner.as_str().to_string())),
2536        Rule::array_literal => parse_array_literal(inner),
2537        Rule::map_literal => parse_map_literal(inner),
2538        Rule::expr => parse_expr(inner),
2539        _ => Ok(Expr::Ident(inner.as_str().to_string())),
2540    }
2541}
2542
2543fn parse_if_expr(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2544    let mut inner = pair.into_inner();
2545    let cond = parse_expr_inner(inner.expect_next("if condition")?)?;
2546    let then_branch = parse_expr_inner(inner.expect_next("then branch")?)?;
2547    let else_branch = parse_expr_inner(inner.expect_next("else branch")?)?;
2548
2549    Ok(Expr::If {
2550        cond: Box::new(cond),
2551        then_branch: Box::new(then_branch),
2552        else_branch: Box::new(else_branch),
2553    })
2554}
2555
2556fn parse_literal(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2557    let inner = pair.into_inner().expect_next("literal value")?;
2558
2559    match inner.as_rule() {
2560        Rule::integer => inner
2561            .as_str()
2562            .parse::<i64>()
2563            .map(Expr::Int)
2564            .map_err(|e| ParseError::InvalidNumber(format!("'{}': {}", inner.as_str(), e))),
2565        Rule::float => inner
2566            .as_str()
2567            .parse::<f64>()
2568            .map(Expr::Float)
2569            .map_err(|e| ParseError::InvalidNumber(format!("'{}': {}", inner.as_str(), e))),
2570        Rule::string => {
2571            let s = inner.as_str();
2572            Ok(Expr::Str(s[1..s.len() - 1].to_string()))
2573        }
2574        Rule::duration => Ok(Expr::Duration(
2575            parse_duration(inner.as_str()).map_err(ParseError::InvalidDuration)?,
2576        )),
2577        Rule::timestamp => Ok(Expr::Timestamp(parse_timestamp(inner.as_str()))),
2578        Rule::boolean => Ok(Expr::Bool(inner.as_str() == "true")),
2579        Rule::null => Ok(Expr::Null),
2580        _ => Ok(Expr::Null),
2581    }
2582}
2583
2584fn parse_array_literal(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2585    let mut items = Vec::new();
2586    for p in pair.into_inner() {
2587        if p.as_rule() == Rule::expr_list {
2588            for expr in p.into_inner() {
2589                items.push(parse_expr(expr)?);
2590            }
2591        }
2592    }
2593    Ok(Expr::Array(items))
2594}
2595
2596fn parse_map_literal(pair: pest::iterators::Pair<Rule>) -> ParseResult<Expr> {
2597    let mut entries = Vec::new();
2598    for p in pair.into_inner() {
2599        if p.as_rule() == Rule::map_entry_list {
2600            for entry in p.into_inner() {
2601                let mut inner = entry.into_inner();
2602                let key = inner.expect_next("map key")?.as_str().to_string();
2603                let key = if key.starts_with('"') {
2604                    key[1..key.len() - 1].to_string()
2605                } else {
2606                    key
2607                };
2608                let value = parse_expr(inner.expect_next("map value")?)?;
2609                entries.push((key, value));
2610            }
2611        }
2612    }
2613    Ok(Expr::Map(entries))
2614}
2615
2616fn parse_binary_chain(pair: pest::iterators::Pair<Rule>, op: BinOp) -> ParseResult<Expr> {
2617    let mut inner = pair.into_inner();
2618    let mut left = parse_expr_inner(inner.expect_next("binary chain operand")?)?;
2619
2620    for right_pair in inner {
2621        let right = parse_expr_inner(right_pair)?;
2622        left = Expr::Binary {
2623            op,
2624            left: Box::new(left),
2625            right: Box::new(right),
2626        };
2627    }
2628
2629    Ok(left)
2630}
2631
2632#[cfg(test)]
2633mod tests {
2634    use super::*;
2635
2636    #[test]
2637    fn test_parse_simple_stream() {
2638        let result = parse("stream output = input");
2639        assert!(result.is_ok(), "Failed: {:?}", result.err());
2640    }
2641
2642    #[test]
2643    fn test_parse_stream_with_filter() {
2644        let result = parse("stream output = input.where(value > 100)");
2645        assert!(result.is_ok(), "Failed: {:?}", result.err());
2646    }
2647
2648    #[test]
2649    fn test_parse_stream_with_map() {
2650        let result = parse("stream output = input.map(x * 2)");
2651        assert!(result.is_ok(), "Failed: {:?}", result.err());
2652    }
2653
2654    #[test]
2655    fn test_parse_event_declaration() {
2656        let result = parse("event SensorReading:\n  sensor_id: str\n  value: float");
2657        assert!(result.is_ok(), "Failed: {:?}", result.err());
2658    }
2659
2660    #[test]
2661    fn test_parse_variable() {
2662        let result = parse("let x = 42");
2663        assert!(result.is_ok(), "Failed: {:?}", result.err());
2664    }
2665
2666    #[test]
2667    fn test_parse_function() {
2668        let result = parse("fn add(a: int, b: int) -> int:\n  return a + b");
2669        assert!(result.is_ok(), "Failed: {:?}", result.err());
2670    }
2671
2672    #[test]
2673    fn test_parse_lambda() {
2674        let result = parse("let f = (x) => x * 2");
2675        assert!(result.is_ok(), "Failed: {:?}", result.err());
2676    }
2677
2678    #[test]
2679    fn test_parse_if_expression() {
2680        let result = parse("let x = if a > b then a else b");
2681        assert!(result.is_ok(), "Failed: {:?}", result.err());
2682    }
2683
2684    #[test]
2685    fn test_parse_followed_by() {
2686        let result = parse("stream alerts = orders.where(amount > 1000) -> Payment where payment.order_id == orders.id");
2687        assert!(result.is_ok(), "Failed: {:?}", result.err());
2688    }
2689
2690    #[test]
2691    fn test_parse_window() {
2692        let result = parse("stream windowed = input.window(5s)");
2693        assert!(result.is_ok(), "Failed: {:?}", result.err());
2694    }
2695
2696    #[test]
2697    fn test_parse_aggregate() {
2698        let result =
2699            parse("stream stats = input.window(1m).aggregate(count: count(), avg: avg(value))");
2700        assert!(result.is_ok(), "Failed: {:?}", result.err());
2701    }
2702
2703    #[test]
2704    fn test_parse_merge() {
2705        let result = parse("stream combined = merge(stream1, stream2)");
2706        assert!(result.is_ok(), "Failed: {:?}", result.err());
2707    }
2708
2709    #[test]
2710    fn test_parse_sequence() {
2711        let result = parse("stream seq = sequence(a: EventA, b: EventB where b.id == a.id)");
2712        assert!(result.is_ok(), "Failed: {:?}", result.err());
2713    }
2714
2715    #[test]
2716    fn test_parse_config() {
2717        let result = parse("config:\n  window_size: 5s\n  batch_size: 100");
2718        assert!(result.is_ok(), "Failed: {:?}", result.err());
2719    }
2720
2721    #[test]
2722    fn test_parse_complex_expression() {
2723        let result = parse("let x = (a + b) * c / d - e");
2724        assert!(result.is_ok(), "Failed: {:?}", result.err());
2725    }
2726
2727    #[test]
2728    fn test_parse_sliding_window() {
2729        let result = parse("stream output = input.window(5m, sliding: 1m)");
2730        assert!(result.is_ok(), "Failed: {:?}", result.err());
2731    }
2732
2733    #[test]
2734    fn test_parse_fork_construct() {
2735        let result =
2736            parse("stream forked = input.fork(branch1: .where(x > 0), branch2: .where(x < 0))");
2737        assert!(result.is_ok(), "Failed: {:?}", result.err());
2738    }
2739
2740    #[test]
2741    fn test_parse_aggregate_functions() {
2742        let result = parse(
2743            "stream stats = input.window(1h).aggregate(total: sum(value), average: avg(value))",
2744        );
2745        assert!(result.is_ok(), "Failed: {:?}", result.err());
2746    }
2747
2748    #[test]
2749    fn test_parse_complex_parentheses() {
2750        let result = parse("let x = ((a + b) * (c - d)) / e");
2751        assert!(result.is_ok(), "Failed: {:?}", result.err());
2752    }
2753
2754    #[test]
2755    fn test_parse_sequence_with_alias() {
2756        let result = parse(
2757            r#"
2758            stream TwoTicks = StockTick as first
2759                -> StockTick as second
2760                .emit(result: "two_ticks")
2761        "#,
2762        );
2763        assert!(result.is_ok(), "Failed: {:?}", result.err());
2764    }
2765
2766    #[test]
2767    fn test_parse_followed_by_with_alias() {
2768        let result = parse("stream alerts = Order as a -> Payment as b");
2769        assert!(result.is_ok(), "Failed: {:?}", result.err());
2770    }
2771
2772    #[test]
2773    fn test_parse_followed_by_with_filter_and_alias() {
2774        // This is the problematic case: 'as b' should be the alias, not part of the expression
2775        let result = parse(
2776            r#"
2777            stream Test = A as a
2778                -> B where value == a.base + 10 as b
2779                .emit(status: "matched")
2780        "#,
2781        );
2782        assert!(result.is_ok(), "Failed: {:?}", result.err());
2783    }
2784
2785    #[test]
2786    fn test_parse_pattern_with_lambda() {
2787        let result =
2788            parse("stream Test = Trade.window(1m).pattern(p: x => x.len() > 3).emit(alert: true)");
2789        assert!(result.is_ok(), "Failed: {:?}", result.err());
2790    }
2791
2792    #[test]
2793    fn test_parse_sase_pattern_decl_simple() {
2794        let result = parse("pattern SimpleAlert = SEQ(Login, Transaction)");
2795        assert!(result.is_ok(), "Failed: {:?}", result.err());
2796    }
2797
2798    #[test]
2799    fn test_parse_sase_pattern_decl_with_kleene() {
2800        let result = parse("pattern MultiTx = SEQ(Login, Transaction+ where amount > 1000)");
2801        assert!(result.is_ok(), "Failed: {:?}", result.err());
2802    }
2803
2804    #[test]
2805    fn test_parse_sase_pattern_decl_with_alias() {
2806        let result = parse("pattern AliasedPattern = SEQ(Login as login, Transaction as tx)");
2807        assert!(result.is_ok(), "Failed: {:?}", result.err());
2808    }
2809
2810    #[test]
2811    fn test_parse_sase_pattern_decl_with_within() {
2812        let result = parse("pattern TimedPattern = SEQ(A, B) within 10m");
2813        assert!(result.is_ok(), "Failed: {:?}", result.err());
2814    }
2815
2816    #[test]
2817    fn test_parse_sase_pattern_decl_with_partition() {
2818        let result = parse("pattern PartitionedPattern = SEQ(A, B) partition by user_id");
2819        assert!(result.is_ok(), "Failed: {:?}", result.err());
2820    }
2821
2822    #[test]
2823    fn test_parse_sase_pattern_decl_full() {
2824        // Note: In SASE+ syntax, 'where' comes before 'as' (filter then alias)
2825        let result = parse(
2826            "pattern SuspiciousActivity = SEQ(Transaction+ where amount > 1000 as txs) within 10m partition by user_id"
2827        );
2828        assert!(result.is_ok(), "Failed: {:?}", result.err());
2829    }
2830
2831    #[test]
2832    fn test_parse_sase_pattern_decl_or() {
2833        let result = parse("pattern AlertOrWarn = Login OR Logout");
2834        assert!(result.is_ok(), "Failed: {:?}", result.err());
2835    }
2836
2837    #[test]
2838    fn test_parse_sase_pattern_decl_and() {
2839        let result = parse("pattern BothEvents = Login AND Transaction");
2840        assert!(result.is_ok(), "Failed: {:?}", result.err());
2841    }
2842
2843    #[test]
2844    fn test_parse_sase_pattern_decl_not() {
2845        let result = parse("pattern NoLogout = SEQ(Login, NOT Logout, Transaction)");
2846        assert!(result.is_ok(), "Failed: {:?}", result.err());
2847    }
2848
2849    #[test]
2850    fn test_parse_having() {
2851        let result = parse(
2852            "stream filtered = input.window(1m).aggregate(count: count(), total: sum(value)).having(count > 10)",
2853        );
2854        assert!(result.is_ok(), "Failed: {:?}", result.err());
2855    }
2856
2857    #[test]
2858    fn test_parse_having_with_partition() {
2859        let result = parse(
2860            "stream grouped = input.partition_by(category).window(5m).aggregate(avg_price: avg(price)).having(avg_price > 100.0)",
2861        );
2862        assert!(result.is_ok(), "Failed: {:?}", result.err());
2863    }
2864
2865    #[test]
2866    fn test_parse_timer_source() {
2867        let result = parse("stream heartbeat = timer(5s).emit(type: \"heartbeat\")");
2868        assert!(result.is_ok(), "Failed: {:?}", result.err());
2869    }
2870
2871    #[test]
2872    fn test_parse_timer_source_with_initial_delay() {
2873        let result =
2874            parse("stream delayed_timer = timer(1m, initial_delay: 10s).emit(type: \"periodic\")");
2875        assert!(result.is_ok(), "Failed: {:?}", result.err());
2876    }
2877
2878    #[test]
2879    fn test_parse_var_decl() {
2880        let result = parse("var threshold: float = 10.0");
2881        assert!(result.is_ok(), "Failed: {:?}", result.err());
2882    }
2883
2884    #[test]
2885    fn test_parse_let_decl() {
2886        let result = parse("let max_count: int = 100");
2887        assert!(result.is_ok(), "Failed: {:?}", result.err());
2888    }
2889
2890    #[test]
2891    fn test_parse_assignment() {
2892        let result = parse("threshold := threshold + 10.0");
2893        assert!(result.is_ok(), "Failed: {:?}", result.err());
2894    }
2895
2896    #[test]
2897    fn test_parse_assignment_with_expression() {
2898        let result = parse("count := count * 2 + offset");
2899        assert!(result.is_ok(), "Failed: {:?}", result.err());
2900    }
2901
2902    #[test]
2903    fn test_parse_nested_stream_reference() {
2904        let result = parse("stream Base = Event\nstream Derived = Base.where(x > 0)");
2905        assert!(result.is_ok(), "Failed: {:?}", result.err());
2906    }
2907
2908    #[test]
2909    fn test_parse_multi_stage_pipeline() {
2910        let result = parse(
2911            "stream L1 = Raw\nstream L2 = L1.where(a > 1)\nstream L3 = L2.window(5).aggregate(cnt: count())",
2912        );
2913        assert!(result.is_ok(), "Failed: {:?}", result.err());
2914    }
2915
2916    #[test]
2917    fn test_parse_stream_with_operations_chain() {
2918        let result = parse(
2919            "stream Processed = Source.where(valid).window(10).aggregate(sum: sum(value)).having(sum > 100)",
2920        );
2921        assert!(result.is_ok(), "Failed: {:?}", result.err());
2922    }
2923
2924    // =========================================================================
2925    // Connectivity Architecture Tests
2926    // =========================================================================
2927
2928    #[test]
2929    fn test_parse_connector_mqtt() {
2930        let result = parse(
2931            r#"connector MqttBroker = mqtt (
2932                host: "localhost",
2933                port: 1883,
2934                client_id: "varpulis"
2935            )"#,
2936        );
2937        assert!(result.is_ok(), "Failed: {:?}", result.err());
2938    }
2939
2940    #[test]
2941    fn test_parse_connector_kafka() {
2942        let result = parse(
2943            r#"connector KafkaCluster = kafka (
2944                brokers: ["kafka1:9092", "kafka2:9092"],
2945                group_id: "my-group"
2946            )"#,
2947        );
2948        assert!(result.is_ok(), "Failed: {:?}", result.err());
2949    }
2950
2951    #[test]
2952    fn test_parse_connector_http() {
2953        let result = parse(
2954            r#"connector ApiEndpoint = http (
2955                base_url: "https://api.example.com"
2956            )"#,
2957        );
2958        assert!(result.is_ok(), "Failed: {:?}", result.err());
2959    }
2960
2961    #[test]
2962    fn test_parse_stream_with_from_connector() {
2963        let result = parse(
2964            r#"stream Temperatures = TemperatureReading.from(MqttSensors, topic: "sensors/temp/#")"#,
2965        );
2966        assert!(result.is_ok(), "Failed: {:?}", result.err());
2967    }
2968
2969    #[test]
2970    fn test_parse_stream_with_from_and_operations() {
2971        let result = parse(
2972            r#"stream HighTemp = TemperatureReading
2973                .from(MqttSensors, topic: "sensors/#")
2974                .where(value > 30)
2975                .emit(alert: "high_temp")"#,
2976        );
2977        assert!(result.is_ok(), "Failed: {:?}", result.err());
2978    }
2979
2980    #[test]
2981    fn test_parse_full_connectivity_pipeline() {
2982        let result = parse(
2983            r#"
2984            connector MqttSensors = mqtt (host: "localhost", port: 1883)
2985            connector KafkaAlerts = kafka (brokers: ["kafka:9092"])
2986
2987            event TemperatureReading:
2988                sensor_id: str
2989                value: float
2990                ts: timestamp
2991
2992            stream Temperatures = TemperatureReading.from(MqttSensors, topic: "sensors/#")
2993
2994            stream HighTempAlert = Temperatures
2995                .where(value > 30)
2996                .emit(alert_type: "HIGH_TEMP", temperature: value)
2997
2998            "#,
2999        );
3000        assert!(result.is_ok(), "Failed: {:?}", result.err());
3001    }
3002
3003    #[test]
3004    fn test_parse_emit_as_type() {
3005        let result = parse(
3006            r#"stream Alerts = Temperatures
3007                .where(value > 30)
3008                .emit as AlertEvent(severity: "high", temp: value)"#,
3009        );
3010        assert!(result.is_ok(), "Failed: {:?}", result.err());
3011    }
3012
3013    #[test]
3014    fn test_parse_stream_with_to_connector() {
3015        let result = parse(
3016            r#"stream Output = Input
3017                .where(x > 0)
3018                .emit(y: x * 2)
3019                .to(KafkaOutput, topic: "output")"#,
3020        );
3021        assert!(result.is_ok(), "Failed: {:?}", result.err());
3022    }
3023
3024    #[test]
3025    fn test_emit_stmt_parses() {
3026        let result = parse(
3027            r#"fn test():
3028    emit Pixel(x: 1, y: 2)"#,
3029        );
3030        assert!(result.is_ok(), "Failed: {:?}", result.err());
3031        let program = result.unwrap();
3032        // Find the fn_decl
3033        if let Stmt::FnDecl { body, .. } = &program.statements[0].node {
3034            match &body[0].node {
3035                Stmt::Emit { event_type, fields } => {
3036                    assert_eq!(event_type, "Pixel");
3037                    assert_eq!(fields.len(), 2);
3038                    assert_eq!(fields[0].name, "x");
3039                    assert_eq!(fields[1].name, "y");
3040                }
3041                other => panic!("Expected Stmt::Emit, got {:?}", other),
3042            }
3043        } else {
3044            panic!("Expected FnDecl");
3045        }
3046    }
3047
3048    #[test]
3049    fn test_emit_stmt_no_args() {
3050        let result = parse(
3051            r#"fn test():
3052    emit Done()"#,
3053        );
3054        assert!(result.is_ok(), "Failed: {:?}", result.err());
3055        let program = result.unwrap();
3056        if let Stmt::FnDecl { body, .. } = &program.statements[0].node {
3057            match &body[0].node {
3058                Stmt::Emit { event_type, fields } => {
3059                    assert_eq!(event_type, "Done");
3060                    assert!(fields.is_empty());
3061                }
3062                other => panic!("Expected Stmt::Emit, got {:?}", other),
3063            }
3064        } else {
3065            panic!("Expected FnDecl");
3066        }
3067    }
3068
3069    #[test]
3070    fn test_emit_in_function_with_for_loop() {
3071        let result = parse(
3072            r#"fn generate(n: int):
3073    for i in 0..n:
3074        emit Item(index: i, value: i * 2)"#,
3075        );
3076        assert!(result.is_ok(), "Failed: {:?}", result.err());
3077    }
3078
3079    #[test]
3080    fn test_parse_process_op() {
3081        let result = parse(
3082            r#"fn do_work():
3083    emit Result(v: 42)
3084
3085stream S = timer(1s).process(do_work())"#,
3086        );
3087        assert!(result.is_ok(), "Failed: {:?}", result.err());
3088    }
3089
3090    #[test]
3091    fn test_parse_trend_aggregate_count_trends() {
3092        let result = parse(
3093            r#"stream S = StockTick as first
3094    -> all StockTick where price > first.price as rising
3095    -> StockTick where price < rising.price as drop
3096    .within(60s)
3097    .trend_aggregate(count: count_trends())
3098    .emit(event_type: "TrendStats", trends: count)"#,
3099        );
3100        assert!(result.is_ok(), "Failed: {:?}", result.err());
3101        let program = result.unwrap();
3102        // Find the stream declaration and check for TrendAggregate op
3103        for stmt in &program.statements {
3104            if let Stmt::StreamDecl { ops, .. } = &stmt.node {
3105                let has_trend_agg = ops
3106                    .iter()
3107                    .any(|op| matches!(op, StreamOp::TrendAggregate(_)));
3108                assert!(has_trend_agg, "Expected TrendAggregate op in stream ops");
3109                // Check that TrendAggregate has the right item
3110                for op in ops {
3111                    if let StreamOp::TrendAggregate(items) = op {
3112                        assert_eq!(items.len(), 1);
3113                        assert_eq!(items[0].alias, "count");
3114                        assert_eq!(items[0].func, "count_trends");
3115                        assert!(items[0].arg.is_none());
3116                    }
3117                }
3118                return;
3119            }
3120        }
3121        panic!("No stream declaration found");
3122    }
3123
3124    #[test]
3125    fn test_parse_trend_aggregate_multiple_items() {
3126        let result = parse(
3127            r#"stream S = StockTick as first
3128    -> all StockTick as rising
3129    .within(60s)
3130    .trend_aggregate(
3131        trend_count: count_trends(),
3132        event_count: count_events(rising)
3133    )
3134    .emit(trends: trend_count, events: event_count)"#,
3135        );
3136        assert!(result.is_ok(), "Failed: {:?}", result.err());
3137        let program = result.unwrap();
3138        for stmt in &program.statements {
3139            if let Stmt::StreamDecl { ops, .. } = &stmt.node {
3140                for op in ops {
3141                    if let StreamOp::TrendAggregate(items) = op {
3142                        assert_eq!(items.len(), 2);
3143                        assert_eq!(items[0].alias, "trend_count");
3144                        assert_eq!(items[0].func, "count_trends");
3145                        assert_eq!(items[1].alias, "event_count");
3146                        assert_eq!(items[1].func, "count_events");
3147                        assert!(items[1].arg.is_some());
3148                        return;
3149                    }
3150                }
3151            }
3152        }
3153        panic!("No TrendAggregate found");
3154    }
3155
3156    #[test]
3157    fn test_parse_score_basic() {
3158        let result = parse(
3159            r#"stream S = TradeEvent
3160    .score(model: "models/fraud.onnx", inputs: [amount, risk_score], outputs: [fraud_prob, category])"#,
3161        );
3162        assert!(result.is_ok(), "Failed: {:?}", result.err());
3163        let program = result.unwrap();
3164        for stmt in &program.statements {
3165            if let Stmt::StreamDecl { ops, .. } = &stmt.node {
3166                for op in ops {
3167                    if let StreamOp::Score(spec) = op {
3168                        assert_eq!(spec.model_path, "models/fraud.onnx");
3169                        assert_eq!(spec.inputs, vec!["amount", "risk_score"]);
3170                        assert_eq!(spec.outputs, vec!["fraud_prob", "category"]);
3171                        return;
3172                    }
3173                }
3174            }
3175        }
3176        panic!("No Score op found");
3177    }
3178
3179    #[test]
3180    fn test_parse_score_single_field() {
3181        let result = parse(
3182            r#"stream S = Event
3183    .score(model: "model.onnx", inputs: [value], outputs: [prediction])"#,
3184        );
3185        assert!(result.is_ok(), "Failed: {:?}", result.err());
3186        let program = result.unwrap();
3187        for stmt in &program.statements {
3188            if let Stmt::StreamDecl { ops, .. } = &stmt.node {
3189                for op in ops {
3190                    if let StreamOp::Score(spec) = op {
3191                        assert_eq!(spec.model_path, "model.onnx");
3192                        assert_eq!(spec.inputs, vec!["value"]);
3193                        assert_eq!(spec.outputs, vec!["prediction"]);
3194                        return;
3195                    }
3196                }
3197            }
3198        }
3199        panic!("No Score op found");
3200    }
3201
3202    // Regression tests for fuzz-discovered parser hangs (2026-02-21).
3203    // Unmatched `[` brackets cause exponential backtracking in pest's PEG
3204    // recursive descent through array_literal / index_access / slice_access.
3205
3206    #[test]
3207    fn fuzz_regression_unmatched_brackets_timeout() {
3208        // Simplified version of the fuzzer-discovered timeout input (28 unmatched '[')
3209        let input = "c2222222s[s[22s[U2s[U6[U6[22222222s[s[22s[U2s[U6[U6[222*2222s[U6[U6[222*2222s[22s[U6[U6[22*2222s[U6[U6[222*2222s[22s[U6[U6[222*26[U6[222*2";
3210        let start = std::time::Instant::now();
3211        let result = parse(input);
3212        let elapsed = start.elapsed();
3213        assert!(
3214            result.is_err(),
3215            "Should reject deeply nested unmatched brackets"
3216        );
3217        assert!(
3218            elapsed.as_millis() < 100,
3219            "Parser should reject fast, took {:?}",
3220            elapsed
3221        );
3222    }
3223
3224    #[test]
3225    fn fuzz_regression_deeply_nested_brackets_slow_unit() {
3226        // 30 unmatched '[' — must be rejected by nesting depth check
3227        let input = "stream x[[[[[[[[[[[[[[[[[[[[[[[[[[[[[[[";
3228        let start = std::time::Instant::now();
3229        let result = parse(input);
3230        let elapsed = start.elapsed();
3231        assert!(result.is_err(), "Should reject deeply nested brackets");
3232        assert!(
3233            elapsed.as_millis() < 100,
3234            "Parser should reject fast, took {:?}",
3235            elapsed
3236        );
3237    }
3238
3239    #[test]
3240    fn nesting_depth_allows_reasonable_programs() {
3241        // 10 levels of nesting — well within the limit
3242        let input = "let x = foo(bar(baz(qux(a, [1, [2, [3, [4]]]]))))";
3243        let result = parse(input);
3244        // May fail for other reasons (not a valid program) but should NOT
3245        // fail with "Nesting depth exceeds maximum"
3246        if let Err(ref e) = result {
3247            let msg = format!("{}", e);
3248            assert!(
3249                !msg.contains("Nesting depth"),
3250                "Should allow 10 levels of nesting: {}",
3251                msg
3252            );
3253        }
3254    }
3255
3256    #[test]
3257    fn nesting_depth_ignores_brackets_in_comments() {
3258        // Brackets inside # comments should not count
3259        let input = "# [[[[[[[[[[[[[[[[[[[[[[[[[[\nstream x = y";
3260        let result = parse(input);
3261        assert!(
3262            result.is_ok(),
3263            "Brackets in comments should be ignored: {:?}",
3264            result.err()
3265        );
3266    }
3267
3268    #[test]
3269    fn nesting_depth_ignores_brackets_in_strings() {
3270        // Brackets inside strings should not count
3271        let input = r#"let x = "[[[[[[[[[[[[[[[[[[[[[[[[[[""#;
3272        let result = parse(input);
3273        if let Err(ref e) = result {
3274            let msg = format!("{}", e);
3275            assert!(
3276                !msg.contains("Nesting depth"),
3277                "Brackets in strings should be ignored: {}",
3278                msg
3279            );
3280        }
3281    }
3282}