Skip to main content

oxirs_stream/
stream_sql_ast.rs

//! # Stream SQL — AST Types and Lexer
//!
//! SQL-like query language for stream processing:
//! - Configuration types (`StreamSqlConfig`)
//! - Lexer token definitions and `Lexer`
//! - AST node types: `Expression`, `SelectStatement`, `WindowSpec`, etc.
7
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::time::Duration;
12
13/// Configuration for Stream SQL engine
14#[derive(Debug, Clone, Serialize, Deserialize)]
15pub struct StreamSqlConfig {
16    /// Maximum query execution time
17    pub max_execution_time: Duration,
18    /// Enable query optimization
19    pub enable_optimization: bool,
20    /// Maximum memory for query execution
21    pub max_memory_bytes: usize,
22    /// Enable parallel execution
23    pub parallel_execution: bool,
24    /// Number of worker threads
25    pub worker_threads: usize,
26    /// Enable query caching
27    pub enable_query_cache: bool,
28    /// Query cache size
29    pub cache_size: usize,
30    /// Enable query logging
31    pub enable_query_logging: bool,
32    /// Default window size
33    pub default_window_size: Duration,
34}
35
36impl Default for StreamSqlConfig {
37    fn default() -> Self {
38        Self {
39            max_execution_time: Duration::from_secs(60),
40            enable_optimization: true,
41            max_memory_bytes: 1024 * 1024 * 1024, // 1GB
42            parallel_execution: true,
43            worker_threads: 4,
44            enable_query_cache: true,
45            cache_size: 1000,
46            enable_query_logging: true,
47            default_window_size: Duration::from_secs(60),
48        }
49    }
50}
51
/// Kinds of SQL statement recognised by the Stream SQL front end.
#[derive(Debug, Clone, PartialEq)]
pub enum QueryType {
    /// `SELECT` query.
    Select,
    /// `CREATE STREAM` statement.
    CreateStream,
    /// `DROP STREAM` statement.
    DropStream,
    /// `INSERT INTO` statement.
    Insert,
    /// `CREATE VIEW` statement.
    CreateView,
    /// `DESCRIBE` statement.
    Describe,
    /// `EXPLAIN` statement.
    Explain,
}
70
/// Token kinds produced by the SQL [`Lexer`].
///
/// Keywords, data-type names and aggregate-function names are matched
/// case-insensitively by the lexer; anything else that looks like a word
/// becomes [`Token::Identifier`].
#[derive(Debug, Clone, PartialEq)]
pub enum Token {
    // Keywords
    Select,
    From,
    Where,
    Group,
    By,
    Having,
    Order,
    Limit,
    Window,
    Tumbling,
    Sliding,
    Session,
    Size,
    Slide,
    Gap,
    Create,
    Stream,
    View,
    Drop,
    Insert,
    Into,
    Values,
    As,
    And,
    Or,
    /// `NOT` keyword; the lexer also emits this for a lone `!`.
    Not,
    In,
    Like,
    Between,
    Is,
    Null,
    Join,
    Inner,
    Left,
    Right,
    Full,
    Outer,
    On,
    Describe,
    Explain,
    Distinct,
    Case,
    When,
    Then,
    Else,
    End,

    // Data types
    /// `INT` / `INTEGER`.
    Int,
    /// `FLOAT` / `DOUBLE`.
    Float,
    /// `STRING` / `VARCHAR` / `TEXT`.
    String,
    /// `BOOLEAN` / `BOOL`.
    Boolean,
    /// `TIMESTAMP` / `DATETIME`.
    Timestamp,

    // Operators
    Plus,
    Minus,
    /// Multiplication operator. The lexer in this file never emits it:
    /// a `*` character is always tokenized as [`Token::Star`].
    Multiply,
    Divide,
    Modulo,
    Equal,
    /// `<>` or `!=`.
    NotEqual,
    LessThan,
    LessThanOrEqual,
    GreaterThan,
    GreaterThanOrEqual,

    // Punctuation
    Comma,
    Dot,
    Semicolon,
    OpenParen,
    CloseParen,
    OpenBracket,
    CloseBracket,

    // Literals
    /// A word that is not a recognised keyword, with its original casing.
    Identifier(String),
    /// Contents of a single- or double-quoted string, quotes removed.
    StringLiteral(String),
    /// Numeric literal; every number is lexed as `f64`.
    NumberLiteral(f64),
    /// `TRUE` / `FALSE`.
    BooleanLiteral(bool),

    // Aggregate functions
    Count,
    Sum,
    Avg,
    Min,
    Max,
    StdDev,
    /// `VARIANCE` / `VAR`.
    Variance,

    // Special
    /// The `*` character.
    Star,
    /// End of input. The lexer also returns this for a character it does
    /// not recognise.
    Eof,
}
170
/// Lexer for SQL tokenization.
///
/// Holds the input as a vector of `char`s together with a cursor, so the
/// lexer can step one character at a time.
pub struct Lexer {
    /// The full input, one element per `char`.
    input: Vec<char>,
    /// Index into `input` of the character currently being examined.
    position: usize,
    /// The character at `position`, or `None` once the cursor is past the end.
    current_char: Option<char>,
}
177
178impl Lexer {
179    /// Create a new lexer
180    pub fn new(input: &str) -> Self {
181        let chars: Vec<char> = input.chars().collect();
182        let current_char = chars.first().copied();
183        Self {
184            input: chars,
185            position: 0,
186            current_char,
187        }
188    }
189
190    /// Advance to next character
191    fn advance(&mut self) {
192        self.position += 1;
193        self.current_char = self.input.get(self.position).copied();
194    }
195
196    /// Skip whitespace
197    fn skip_whitespace(&mut self) {
198        while let Some(c) = self.current_char {
199            if c.is_whitespace() {
200                self.advance();
201            } else {
202                break;
203            }
204        }
205    }
206
207    /// Read identifier or keyword
208    fn read_identifier(&mut self) -> String {
209        let mut result = String::new();
210        while let Some(c) = self.current_char {
211            if c.is_alphanumeric() || c == '_' {
212                result.push(c);
213                self.advance();
214            } else {
215                break;
216            }
217        }
218        result
219    }
220
221    /// Read number literal
222    fn read_number(&mut self) -> f64 {
223        let mut result = String::new();
224        while let Some(c) = self.current_char {
225            if c.is_ascii_digit() || c == '.' {
226                result.push(c);
227                self.advance();
228            } else {
229                break;
230            }
231        }
232        result.parse().unwrap_or(0.0)
233    }
234
235    /// Read string literal
236    fn read_string(&mut self) -> String {
237        let quote = self
238            .current_char
239            .expect("current_char should be Some when read_string is called");
240        self.advance(); // Skip opening quote
241        let mut result = String::new();
242        while let Some(c) = self.current_char {
243            if c == quote {
244                self.advance(); // Skip closing quote
245                break;
246            } else if c == '\\' {
247                self.advance();
248                if let Some(escaped) = self.current_char {
249                    result.push(escaped);
250                    self.advance();
251                }
252            } else {
253                result.push(c);
254                self.advance();
255            }
256        }
257        result
258    }
259
260    /// Get next token
261    pub fn next_token(&mut self) -> Token {
262        self.skip_whitespace();
263
264        match self.current_char {
265            None => Token::Eof,
266            Some(c) => {
267                if c.is_alphabetic() || c == '_' {
268                    let ident = self.read_identifier();
269                    match ident.to_uppercase().as_str() {
270                        "SELECT" => Token::Select,
271                        "FROM" => Token::From,
272                        "WHERE" => Token::Where,
273                        "GROUP" => Token::Group,
274                        "BY" => Token::By,
275                        "HAVING" => Token::Having,
276                        "ORDER" => Token::Order,
277                        "LIMIT" => Token::Limit,
278                        "WINDOW" => Token::Window,
279                        "TUMBLING" => Token::Tumbling,
280                        "SLIDING" => Token::Sliding,
281                        "SESSION" => Token::Session,
282                        "SIZE" => Token::Size,
283                        "SLIDE" => Token::Slide,
284                        "GAP" => Token::Gap,
285                        "CREATE" => Token::Create,
286                        "STREAM" => Token::Stream,
287                        "VIEW" => Token::View,
288                        "DROP" => Token::Drop,
289                        "INSERT" => Token::Insert,
290                        "INTO" => Token::Into,
291                        "VALUES" => Token::Values,
292                        "AS" => Token::As,
293                        "AND" => Token::And,
294                        "OR" => Token::Or,
295                        "NOT" => Token::Not,
296                        "IN" => Token::In,
297                        "LIKE" => Token::Like,
298                        "BETWEEN" => Token::Between,
299                        "IS" => Token::Is,
300                        "NULL" => Token::Null,
301                        "JOIN" => Token::Join,
302                        "INNER" => Token::Inner,
303                        "LEFT" => Token::Left,
304                        "RIGHT" => Token::Right,
305                        "FULL" => Token::Full,
306                        "OUTER" => Token::Outer,
307                        "ON" => Token::On,
308                        "DESCRIBE" => Token::Describe,
309                        "EXPLAIN" => Token::Explain,
310                        "DISTINCT" => Token::Distinct,
311                        "CASE" => Token::Case,
312                        "WHEN" => Token::When,
313                        "THEN" => Token::Then,
314                        "ELSE" => Token::Else,
315                        "END" => Token::End,
316                        "INT" | "INTEGER" => Token::Int,
317                        "FLOAT" | "DOUBLE" => Token::Float,
318                        "STRING" | "VARCHAR" | "TEXT" => Token::String,
319                        "BOOLEAN" | "BOOL" => Token::Boolean,
320                        "TIMESTAMP" | "DATETIME" => Token::Timestamp,
321                        "COUNT" => Token::Count,
322                        "SUM" => Token::Sum,
323                        "AVG" => Token::Avg,
324                        "MIN" => Token::Min,
325                        "MAX" => Token::Max,
326                        "STDDEV" => Token::StdDev,
327                        "VARIANCE" | "VAR" => Token::Variance,
328                        "TRUE" => Token::BooleanLiteral(true),
329                        "FALSE" => Token::BooleanLiteral(false),
330                        _ => Token::Identifier(ident),
331                    }
332                } else if c.is_ascii_digit() {
333                    Token::NumberLiteral(self.read_number())
334                } else if c == '\'' || c == '"' {
335                    Token::StringLiteral(self.read_string())
336                } else {
337                    match c {
338                        '+' => {
339                            self.advance();
340                            Token::Plus
341                        }
342                        '-' => {
343                            self.advance();
344                            Token::Minus
345                        }
346                        '*' => {
347                            self.advance();
348                            Token::Star
349                        }
350                        '/' => {
351                            self.advance();
352                            Token::Divide
353                        }
354                        '%' => {
355                            self.advance();
356                            Token::Modulo
357                        }
358                        '=' => {
359                            self.advance();
360                            Token::Equal
361                        }
362                        '<' => {
363                            self.advance();
364                            if self.current_char == Some('=') {
365                                self.advance();
366                                Token::LessThanOrEqual
367                            } else if self.current_char == Some('>') {
368                                self.advance();
369                                Token::NotEqual
370                            } else {
371                                Token::LessThan
372                            }
373                        }
374                        '>' => {
375                            self.advance();
376                            if self.current_char == Some('=') {
377                                self.advance();
378                                Token::GreaterThanOrEqual
379                            } else {
380                                Token::GreaterThan
381                            }
382                        }
383                        '!' => {
384                            self.advance();
385                            if self.current_char == Some('=') {
386                                self.advance();
387                                Token::NotEqual
388                            } else {
389                                Token::Not
390                            }
391                        }
392                        ',' => {
393                            self.advance();
394                            Token::Comma
395                        }
396                        '.' => {
397                            self.advance();
398                            Token::Dot
399                        }
400                        ';' => {
401                            self.advance();
402                            Token::Semicolon
403                        }
404                        '(' => {
405                            self.advance();
406                            Token::OpenParen
407                        }
408                        ')' => {
409                            self.advance();
410                            Token::CloseParen
411                        }
412                        '[' => {
413                            self.advance();
414                            Token::OpenBracket
415                        }
416                        ']' => {
417                            self.advance();
418                            Token::CloseBracket
419                        }
420                        _ => {
421                            self.advance();
422                            Token::Eof
423                        }
424                    }
425                }
426            }
427        }
428    }
429
430    /// Tokenize entire input
431    pub fn tokenize(&mut self) -> Vec<Token> {
432        let mut tokens = Vec::new();
433        loop {
434            let token = self.next_token();
435            if token == Token::Eof {
436                tokens.push(token);
437                break;
438            }
439            tokens.push(token);
440        }
441        tokens
442    }
443}
444
445/// Expression in SQL AST
446#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
447pub enum Expression {
448    /// Column reference
449    Column(String),
450    /// Qualified column (table.column)
451    QualifiedColumn(String, String),
452    /// Literal value
453    Literal(SqlValue),
454    /// Binary operation
455    BinaryOp {
456        left: Box<Expression>,
457        op: BinaryOperator,
458        right: Box<Expression>,
459    },
460    /// Unary operation
461    UnaryOp {
462        op: UnaryOperator,
463        expr: Box<Expression>,
464    },
465    /// Function call
466    Function {
467        name: String,
468        args: Vec<Expression>,
469        distinct: bool,
470    },
471    /// Aggregate function
472    Aggregate {
473        func: AggregateFunction,
474        expr: Box<Expression>,
475        distinct: bool,
476    },
477    /// CASE expression
478    Case {
479        operand: Option<Box<Expression>>,
480        when_clauses: Vec<(Expression, Expression)>,
481        else_clause: Option<Box<Expression>>,
482    },
483    /// Subquery
484    Subquery(Box<SelectStatement>),
485    /// IN expression
486    InList {
487        expr: Box<Expression>,
488        list: Vec<Expression>,
489        negated: bool,
490    },
491    /// BETWEEN expression
492    Between {
493        expr: Box<Expression>,
494        low: Box<Expression>,
495        high: Box<Expression>,
496        negated: bool,
497    },
498    /// IS NULL
499    IsNull {
500        expr: Box<Expression>,
501        negated: bool,
502    },
503    /// Star (*)
504    Star,
505}
506
507/// SQL value types
508#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
509pub enum SqlValue {
510    Null,
511    Integer(i64),
512    Float(f64),
513    String(String),
514    Boolean(bool),
515    Timestamp(DateTime<Utc>),
516}
517
518/// Binary operators
519#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
520pub enum BinaryOperator {
521    Plus,
522    Minus,
523    Multiply,
524    Divide,
525    Modulo,
526    Equal,
527    NotEqual,
528    LessThan,
529    LessThanOrEqual,
530    GreaterThan,
531    GreaterThanOrEqual,
532    And,
533    Or,
534    Like,
535}
536
537/// Unary operators
538#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
539pub enum UnaryOperator {
540    Not,
541    Minus,
542}
543
544/// Aggregate functions
545#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
546pub enum AggregateFunction {
547    Count,
548    Sum,
549    Avg,
550    Min,
551    Max,
552    StdDev,
553    Variance,
554}
555
556/// Window specification
557#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
558pub struct WindowSpec {
559    /// Window type
560    pub window_type: WindowType,
561    /// Window size
562    pub size: Duration,
563    /// Slide interval (for sliding windows)
564    pub slide: Option<Duration>,
565    /// Session gap (for session windows)
566    pub gap: Option<Duration>,
567    /// Time attribute
568    pub time_attribute: Option<String>,
569}
570
571/// Window types
572#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
573pub enum WindowType {
574    Tumbling,
575    Sliding,
576    Session,
577}
578
579/// SELECT column specification
580#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
581pub struct SelectItem {
582    /// Expression
583    pub expr: Expression,
584    /// Alias
585    pub alias: Option<String>,
586}
587
588/// FROM clause item
589#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
590pub enum FromClause {
591    /// Simple table/stream reference
592    Table { name: String, alias: Option<String> },
593    /// Join
594    Join {
595        left: Box<FromClause>,
596        right: Box<FromClause>,
597        join_type: JoinType,
598        condition: Option<Expression>,
599    },
600    /// Subquery
601    Subquery {
602        query: Box<SelectStatement>,
603        alias: String,
604    },
605}
606
607/// Join types
608#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
609pub enum JoinType {
610    Inner,
611    Left,
612    Right,
613    Full,
614    Cross,
615}
616
617/// ORDER BY specification
618#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
619pub struct OrderByItem {
620    /// Expression to order by
621    pub expr: Expression,
622    /// Ascending or descending
623    pub ascending: bool,
624    /// NULLS FIRST or NULLS LAST
625    pub nulls_first: Option<bool>,
626}
627
628/// SELECT statement AST
629#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
630pub struct SelectStatement {
631    /// DISTINCT flag
632    pub distinct: bool,
633    /// SELECT list
634    pub columns: Vec<SelectItem>,
635    /// FROM clause
636    pub from: Option<FromClause>,
637    /// WHERE clause
638    pub where_clause: Option<Expression>,
639    /// GROUP BY clause
640    pub group_by: Vec<Expression>,
641    /// HAVING clause
642    pub having: Option<Expression>,
643    /// ORDER BY clause
644    pub order_by: Vec<OrderByItem>,
645    /// LIMIT
646    pub limit: Option<usize>,
647    /// OFFSET
648    pub offset: Option<usize>,
649    /// Window specification
650    pub window: Option<WindowSpec>,
651}
652
653/// CREATE STREAM statement
654#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
655pub struct CreateStreamStatement {
656    /// Stream name
657    pub name: String,
658    /// Column definitions
659    pub columns: Vec<ColumnDefinition>,
660    /// Stream properties
661    pub properties: HashMap<String, String>,
662}
663
664/// Column definition
665#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
666pub struct ColumnDefinition {
667    /// Column name
668    pub name: String,
669    /// Data type
670    pub data_type: DataType,
671    /// NOT NULL constraint
672    pub not_null: bool,
673    /// DEFAULT value
674    pub default: Option<Expression>,
675}
676
677/// SQL data types
678#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
679pub enum DataType {
680    Integer,
681    Float,
682    String,
683    Boolean,
684    Timestamp,
685    Array(Box<DataType>),
686    Map(Box<DataType>, Box<DataType>),
687}
688
689/// Query result row
690#[derive(Debug, Clone, Serialize, Deserialize)]
691pub struct ResultRow {
692    /// Column values
693    pub values: Vec<SqlValue>,
694}
695
696/// Query result
697#[derive(Debug, Clone, Serialize, Deserialize)]
698pub struct QueryResult {
699    /// Column names
700    pub columns: Vec<String>,
701    /// Result rows
702    pub rows: Vec<ResultRow>,
703    /// Execution time
704    pub execution_time: Duration,
705    /// Rows affected
706    pub rows_affected: usize,
707}
708
709/// Stream metadata
710#[derive(Debug, Clone, Serialize, Deserialize)]
711pub struct StreamMetadata {
712    /// Stream name
713    pub name: String,
714    /// Column definitions
715    pub columns: Vec<ColumnDefinition>,
716    /// Stream properties
717    pub properties: HashMap<String, String>,
718    /// Created at
719    pub created_at: DateTime<Utc>,
720}
721
722/// Stream SQL statistics
723#[derive(Debug, Clone, Default, Serialize, Deserialize)]
724pub struct StreamSqlStats {
725    /// Total queries executed
726    pub queries_executed: u64,
727    /// Queries succeeded
728    pub queries_succeeded: u64,
729    /// Queries failed
730    pub queries_failed: u64,
731    /// Average execution time
732    pub avg_execution_time_ms: f64,
733    /// Cache hits
734    pub cache_hits: u64,
735    /// Cache misses
736    pub cache_misses: u64,
737}