Skip to main content

oxirs_stream/
stream_sql.rs

1//! # Stream SQL - SQL-like Query Language for Streams
2//!
3//! This module provides a SQL-like query language specifically designed
4//! for stream processing, enabling developers to write familiar SQL queries
5//! for real-time data processing.
6//!
7//! ## Features
8//! - SQL parsing and execution for streams
9//! - Window functions (TUMBLING, SLIDING, SESSION)
10//! - Aggregate functions (COUNT, SUM, AVG, MIN, MAX, STDDEV)
11//! - Stream joins with temporal semantics
12//! - Pattern matching in streams
13//! - Query optimization
14//!
15//! ## Example
16//! ```sql
17//! SELECT sensor_id, AVG(temperature) as avg_temp
18//! FROM sensor_stream
19//! WINDOW TUMBLING (SIZE 5 MINUTES)
20//! GROUP BY sensor_id
21//! HAVING AVG(temperature) > 30
22//! ```
23
24use anyhow::{anyhow, Result};
25use chrono::{DateTime, Utc};
26use serde::{Deserialize, Serialize};
27use std::collections::HashMap;
28use std::sync::Arc;
29use std::time::Duration;
30use tokio::sync::RwLock;
31use tracing::{debug, info};
32
/// Configuration for Stream SQL engine
///
/// This type only carries settings; the code that consumes it decides how
/// each one is applied. The `Default` implementation supplies the stock
/// values.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StreamSqlConfig {
    /// Maximum query execution time
    pub max_execution_time: Duration,
    /// Enable query optimization
    pub enable_optimization: bool,
    /// Maximum memory for query execution, in bytes
    pub max_memory_bytes: usize,
    /// Enable parallel execution
    pub parallel_execution: bool,
    /// Number of worker threads
    pub worker_threads: usize,
    /// Enable query caching
    pub enable_query_cache: bool,
    /// Query cache size
    // NOTE(review): presumably a number of cached entries rather than bytes — confirm with the cache code.
    pub cache_size: usize,
    /// Enable query logging
    pub enable_query_logging: bool,
    /// Default window size
    pub default_window_size: Duration,
}
55
56impl Default for StreamSqlConfig {
57    fn default() -> Self {
58        Self {
59            max_execution_time: Duration::from_secs(60),
60            enable_optimization: true,
61            max_memory_bytes: 1024 * 1024 * 1024, // 1GB
62            parallel_execution: true,
63            worker_threads: 4,
64            enable_query_cache: true,
65            cache_size: 1000,
66            enable_query_logging: true,
67            default_window_size: Duration::from_secs(60),
68        }
69    }
70}
71
/// SQL query types
///
/// A coarse classification of a statement by its leading keyword(s).
#[derive(Debug, Clone, PartialEq)]
pub enum QueryType {
    /// SELECT query
    Select,
    /// CREATE STREAM
    CreateStream,
    /// DROP STREAM
    DropStream,
    /// INSERT INTO
    Insert,
    /// CREATE VIEW
    CreateView,
    /// DESCRIBE
    Describe,
    /// EXPLAIN
    Explain,
}
90
/// SQL token types for lexer
///
/// Keywords, type names and aggregate names are matched case-insensitively
/// by [`Lexer::next_token`]; any other word becomes [`Token::Identifier`]
/// with its original spelling.
#[derive(Debug, Clone, PartialEq)]
pub enum Token {
    // Keywords
    Select,
    From,
    Where,
    Group,
    By,
    Having,
    Order,
    Limit,
    Window,
    Tumbling,
    Sliding,
    Session,
    Size,
    Slide,
    Gap,
    Create,
    Stream,
    View,
    Drop,
    Insert,
    Into,
    Values,
    As,
    And,
    Or,
    Not,
    In,
    Like,
    Between,
    Is,
    Null,
    Join,
    Inner,
    Left,
    Right,
    Full,
    Outer,
    On,
    Describe,
    Explain,
    Distinct,
    Case,
    When,
    Then,
    Else,
    End,

    // Data types
    Int,
    Float,
    String,
    Boolean,
    Timestamp,

    // Operators
    Plus,
    Minus,
    /// Never produced by [`Lexer`]: `*` is always lexed as [`Token::Star`].
    Multiply,
    Divide,
    Modulo,
    Equal,
    NotEqual,
    LessThan,
    LessThanOrEqual,
    GreaterThan,
    GreaterThanOrEqual,

    // Punctuation
    Comma,
    Dot,
    Semicolon,
    OpenParen,
    CloseParen,
    OpenBracket,
    CloseBracket,

    // Literals
    Identifier(String),
    StringLiteral(String),
    NumberLiteral(f64),
    BooleanLiteral(bool),

    // Aggregate functions
    Count,
    Sum,
    Avg,
    Min,
    Max,
    StdDev,
    Variance,

    // Special
    /// `*`, whether used as a wildcard or as the multiplication sign.
    Star,
    /// End of input. Also returned for a character the lexer does not
    /// recognise.
    Eof,
}

/// Lexer for SQL tokenization
///
/// Holds the input as a vector of `char`s together with a cursor; the
/// character under the cursor is cached in `current_char` (`None` once the
/// cursor has moved past the end).
pub struct Lexer {
    input: Vec<char>,
    position: usize,
    current_char: Option<char>,
}

impl Lexer {
    /// Create a new lexer positioned on the first character of `input`.
    pub fn new(input: &str) -> Self {
        let chars: Vec<char> = input.chars().collect();
        let current_char = chars.first().copied();
        Self {
            input: chars,
            position: 0,
            current_char,
        }
    }

    /// Advance to next character
    fn advance(&mut self) {
        self.position += 1;
        self.current_char = self.input.get(self.position).copied();
    }

    /// Peek at next character (the one after `current_char`) without
    /// moving the cursor.
    fn peek(&self) -> Option<char> {
        self.input.get(self.position + 1).copied()
    }

    /// Skip whitespace
    fn skip_whitespace(&mut self) {
        while matches!(self.current_char, Some(c) if c.is_whitespace()) {
            self.advance();
        }
    }

    /// Read identifier or keyword: the longest run of alphanumeric
    /// characters and underscores starting at the cursor.
    fn read_identifier(&mut self) -> String {
        let mut word = String::new();
        while let Some(c) = self.current_char {
            if !(c.is_alphanumeric() || c == '_') {
                break;
            }
            word.push(c);
            self.advance();
        }
        word
    }

    /// Read number literal: ASCII digits with at most one decimal point.
    ///
    /// A second `.` ends the literal and is left for the next token, so
    /// `1.2.3` lexes as `1.2`, `.`, `3` instead of collapsing to `0.0`
    /// because the whole run failed to parse. Inputs with zero or one
    /// decimal point (including a trailing one such as `5.`) are read
    /// exactly as before.
    fn read_number(&mut self) -> f64 {
        let mut text = String::new();
        let mut seen_point = false;
        while let Some(c) = self.current_char {
            if c.is_ascii_digit() {
                text.push(c);
            } else if c == '.' && !seen_point {
                seen_point = true;
                text.push(c);
            } else {
                break;
            }
            self.advance();
        }
        // The caller only enters on a digit, so `text` is `digits[.digits]`
        // and parses; the fallback is kept purely as a safety net.
        text.parse().unwrap_or(0.0)
    }

    /// Read string literal delimited by the quote character under the
    /// cursor (`'` or `"`).
    ///
    /// A backslash makes the following character literal (it is copied
    /// as-is, so `\n` yields `n`). If the input ends before the closing
    /// quote, the text collected so far is returned.
    ///
    /// # Panics
    /// Panics if called when the cursor is already past the end of input.
    fn read_string(&mut self) -> String {
        let quote = self
            .current_char
            .expect("current_char should be Some when read_string is called");
        self.advance(); // Skip opening quote
        let mut text = String::new();
        while let Some(c) = self.current_char {
            self.advance();
            if c == quote {
                // Closing quote consumed above.
                break;
            }
            if c == '\\' {
                // Escaped character: take whatever follows verbatim.
                if let Some(escaped) = self.current_char {
                    text.push(escaped);
                    self.advance();
                }
            } else {
                text.push(c);
            }
        }
        text
    }

    /// Map an already upper-cased word to its keyword, data-type,
    /// aggregate or boolean-literal token. `None` means the word is a
    /// plain identifier.
    fn keyword_token(upper: &str) -> Option<Token> {
        let token = match upper {
            "SELECT" => Token::Select,
            "FROM" => Token::From,
            "WHERE" => Token::Where,
            "GROUP" => Token::Group,
            "BY" => Token::By,
            "HAVING" => Token::Having,
            "ORDER" => Token::Order,
            "LIMIT" => Token::Limit,
            "WINDOW" => Token::Window,
            "TUMBLING" => Token::Tumbling,
            "SLIDING" => Token::Sliding,
            "SESSION" => Token::Session,
            "SIZE" => Token::Size,
            "SLIDE" => Token::Slide,
            "GAP" => Token::Gap,
            "CREATE" => Token::Create,
            "STREAM" => Token::Stream,
            "VIEW" => Token::View,
            "DROP" => Token::Drop,
            "INSERT" => Token::Insert,
            "INTO" => Token::Into,
            "VALUES" => Token::Values,
            "AS" => Token::As,
            "AND" => Token::And,
            "OR" => Token::Or,
            "NOT" => Token::Not,
            "IN" => Token::In,
            "LIKE" => Token::Like,
            "BETWEEN" => Token::Between,
            "IS" => Token::Is,
            "NULL" => Token::Null,
            "JOIN" => Token::Join,
            "INNER" => Token::Inner,
            "LEFT" => Token::Left,
            "RIGHT" => Token::Right,
            "FULL" => Token::Full,
            "OUTER" => Token::Outer,
            "ON" => Token::On,
            "DESCRIBE" => Token::Describe,
            "EXPLAIN" => Token::Explain,
            "DISTINCT" => Token::Distinct,
            "CASE" => Token::Case,
            "WHEN" => Token::When,
            "THEN" => Token::Then,
            "ELSE" => Token::Else,
            "END" => Token::End,
            "INT" | "INTEGER" => Token::Int,
            "FLOAT" | "DOUBLE" => Token::Float,
            "STRING" | "VARCHAR" | "TEXT" => Token::String,
            "BOOLEAN" | "BOOL" => Token::Boolean,
            "TIMESTAMP" | "DATETIME" => Token::Timestamp,
            "COUNT" => Token::Count,
            "SUM" => Token::Sum,
            "AVG" => Token::Avg,
            "MIN" => Token::Min,
            "MAX" => Token::Max,
            "STDDEV" => Token::StdDev,
            "VARIANCE" | "VAR" => Token::Variance,
            "TRUE" => Token::BooleanLiteral(true),
            "FALSE" => Token::BooleanLiteral(false),
            _ => return None,
        };
        Some(token)
    }

    /// Get next token
    ///
    /// Skips leading whitespace, then produces one token. Returns
    /// [`Token::Eof`] at end of input. A character that starts no known
    /// token is consumed and also reported as [`Token::Eof`], which makes
    /// [`Lexer::tokenize`] stop at that point.
    pub fn next_token(&mut self) -> Token {
        self.skip_whitespace();

        let c = match self.current_char {
            Some(c) => c,
            None => return Token::Eof,
        };

        if c.is_alphabetic() || c == '_' {
            let ident = self.read_identifier();
            return Self::keyword_token(&ident.to_uppercase())
                .unwrap_or_else(|| Token::Identifier(ident));
        }
        if c.is_ascii_digit() {
            return Token::NumberLiteral(self.read_number());
        }
        if c == '\'' || c == '"' {
            return Token::StringLiteral(self.read_string());
        }

        // Everything below starts by consuming `c`; two-character operators
        // then look at the character that follows it.
        self.advance();
        match c {
            '+' => Token::Plus,
            '-' => Token::Minus,
            '*' => Token::Star,
            '/' => Token::Divide,
            '%' => Token::Modulo,
            '=' => Token::Equal,
            '<' => match self.current_char {
                Some('=') => {
                    self.advance();
                    Token::LessThanOrEqual
                }
                Some('>') => {
                    self.advance();
                    Token::NotEqual
                }
                _ => Token::LessThan,
            },
            '>' => match self.current_char {
                Some('=') => {
                    self.advance();
                    Token::GreaterThanOrEqual
                }
                _ => Token::GreaterThan,
            },
            '!' => match self.current_char {
                Some('=') => {
                    self.advance();
                    Token::NotEqual
                }
                _ => Token::Not,
            },
            ',' => Token::Comma,
            '.' => Token::Dot,
            ';' => Token::Semicolon,
            '(' => Token::OpenParen,
            ')' => Token::CloseParen,
            '[' => Token::OpenBracket,
            ']' => Token::CloseBracket,
            // Unrecognised character: reported as end of input.
            _ => Token::Eof,
        }
    }

    /// Tokenize entire input
    ///
    /// Collects tokens up to and including the first [`Token::Eof`], so the
    /// returned vector always ends with `Eof`.
    pub fn tokenize(&mut self) -> Vec<Token> {
        let mut tokens = Vec::new();
        loop {
            let token = self.next_token();
            let finished = token == Token::Eof;
            tokens.push(token);
            if finished {
                break;
            }
        }
        tokens
    }
}
469
/// Expression in SQL AST
///
/// A recursive tree: variants that contain sub-expressions hold them in
/// `Box`es or `Vec`s.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub enum Expression {
    /// Column reference
    Column(String),
    /// Qualified column (table.column)
    QualifiedColumn(String, String),
    /// Literal value
    Literal(SqlValue),
    /// Binary operation
    BinaryOp {
        left: Box<Expression>,
        op: BinaryOperator,
        right: Box<Expression>,
    },
    /// Unary operation
    UnaryOp {
        op: UnaryOperator,
        expr: Box<Expression>,
    },
    /// Function call
    Function {
        name: String,
        args: Vec<Expression>,
        distinct: bool,
    },
    /// Aggregate function
    Aggregate {
        func: AggregateFunction,
        expr: Box<Expression>,
        distinct: bool,
    },
    /// CASE expression
    ///
    /// `when_clauses` holds pairs of expressions.
    // NOTE(review): presumably each pair is (condition-or-match-value, result) — confirm with the CASE parser.
    Case {
        operand: Option<Box<Expression>>,
        when_clauses: Vec<(Expression, Expression)>,
        else_clause: Option<Box<Expression>>,
    },
    /// Subquery
    Subquery(Box<SelectStatement>),
    /// IN expression
    InList {
        expr: Box<Expression>,
        list: Vec<Expression>,
        negated: bool,
    },
    /// BETWEEN expression
    Between {
        expr: Box<Expression>,
        low: Box<Expression>,
        high: Box<Expression>,
        negated: bool,
    },
    /// IS NULL
    IsNull {
        expr: Box<Expression>,
        negated: bool,
    },
    /// Star (*)
    Star,
}
531
/// SQL value types
///
/// The payload of an `Expression::Literal`.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub enum SqlValue {
    /// SQL NULL
    Null,
    /// 64-bit signed integer
    Integer(i64),
    /// 64-bit floating point number
    Float(f64),
    /// Text value
    String(String),
    /// Boolean value
    Boolean(bool),
    /// Point in time, in UTC
    Timestamp(DateTime<Utc>),
}
542
/// Binary operators
///
/// The operator carried by `Expression::BinaryOp`.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub enum BinaryOperator {
    // Arithmetic
    Plus,
    Minus,
    Multiply,
    Divide,
    Modulo,
    // Comparison
    Equal,
    NotEqual,
    LessThan,
    LessThanOrEqual,
    GreaterThan,
    GreaterThanOrEqual,
    // Logical
    And,
    Or,
    // Pattern matching
    Like,
}
561
/// Unary operators
///
/// The operator carried by `Expression::UnaryOp`.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub enum UnaryOperator {
    /// Logical negation
    Not,
    /// Arithmetic negation
    Minus,
}
568
/// Aggregate functions
///
/// The function carried by `Expression::Aggregate`. The variants mirror
/// the aggregate tokens `Count` through `Variance` of `Token`.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub enum AggregateFunction {
    Count,
    Sum,
    Avg,
    Min,
    Max,
    StdDev,
    Variance,
}
580
/// Window specification
///
/// The parsed form of a `WINDOW <type> ( ... )` clause.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct WindowSpec {
    /// Window type
    pub window_type: WindowType,
    /// Window size
    pub size: Duration,
    /// Slide interval (for sliding windows)
    pub slide: Option<Duration>,
    /// Session gap (for session windows)
    pub gap: Option<Duration>,
    /// Time attribute
    // NOTE(review): presumably the name of the column that supplies event time — confirm with the executor.
    pub time_attribute: Option<String>,
}
595
/// Window types
///
/// One variant per window keyword recognised by the lexer
/// (`TUMBLING`, `SLIDING`, `SESSION`).
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub enum WindowType {
    Tumbling,
    Sliding,
    Session,
}
603
/// SELECT column specification
///
/// One entry of the SELECT list: an expression plus its optional alias.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct SelectItem {
    /// Expression
    pub expr: Expression,
    /// Alias (`None` when the item has no alias)
    pub alias: Option<String>,
}
612
/// FROM clause item
///
/// Recursive: a `Join` holds two further `FromClause` values, so a chain
/// of joins forms a tree.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub enum FromClause {
    /// Simple table/stream reference
    Table { name: String, alias: Option<String> },
    /// Join
    ///
    /// `condition` is `None` when the join has no `ON` expression.
    Join {
        left: Box<FromClause>,
        right: Box<FromClause>,
        join_type: JoinType,
        condition: Option<Expression>,
    },
    /// Subquery
    ///
    /// Unlike `Table`, the alias is mandatory here.
    Subquery {
        query: Box<SelectStatement>,
        alias: String,
    },
}
631
/// Join types
///
/// The kind of join stored in `FromClause::Join`.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub enum JoinType {
    Inner,
    Left,
    Right,
    Full,
    // NOTE(review): the lexer's `Token` enum has no CROSS keyword, so this variant does not appear to be reachable from parsed SQL — confirm.
    Cross,
}
641
/// ORDER BY specification
///
/// One sort key of an ORDER BY clause.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct OrderByItem {
    /// Expression to order by
    pub expr: Expression,
    /// Ascending or descending (`true` = ascending)
    pub ascending: bool,
    /// NULLS FIRST or NULLS LAST
    // NOTE(review): presumably `Some(true)` = NULLS FIRST, `Some(false)` = NULLS LAST, `None` = unspecified — confirm with the ORDER BY parser.
    pub nulls_first: Option<bool>,
}
652
/// SELECT statement AST
///
/// Absent clauses are represented by `None` or by an empty vector.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct SelectStatement {
    /// DISTINCT flag
    pub distinct: bool,
    /// SELECT list
    pub columns: Vec<SelectItem>,
    /// FROM clause
    pub from: Option<FromClause>,
    /// WHERE clause
    pub where_clause: Option<Expression>,
    /// GROUP BY clause
    pub group_by: Vec<Expression>,
    /// HAVING clause
    pub having: Option<Expression>,
    /// ORDER BY clause
    pub order_by: Vec<OrderByItem>,
    /// LIMIT
    pub limit: Option<usize>,
    /// OFFSET
    pub offset: Option<usize>,
    /// Window specification
    pub window: Option<WindowSpec>,
}
677
/// CREATE STREAM statement
///
/// Carries the stream's name, its column definitions and a string-to-string
/// property map.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct CreateStreamStatement {
    /// Stream name
    pub name: String,
    /// Column definitions
    pub columns: Vec<ColumnDefinition>,
    /// Stream properties
    pub properties: HashMap<String, String>,
}
688
/// Column definition
///
/// One column of a `CreateStreamStatement`.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct ColumnDefinition {
    /// Column name
    pub name: String,
    /// Data type
    pub data_type: DataType,
    /// NOT NULL constraint
    pub not_null: bool,
    /// DEFAULT value (`None` when no default is given)
    pub default: Option<Expression>,
}
701
/// SQL data types
///
/// Scalar types plus two recursive container types.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub enum DataType {
    Integer,
    Float,
    String,
    Boolean,
    Timestamp,
    /// Array with the given element type
    Array(Box<DataType>),
    /// Map
    // NOTE(review): presumably (key type, value type) in that order — confirm with the type parser.
    Map(Box<DataType>, Box<DataType>),
}
713
/// Parser for SQL queries
///
/// Walks a vector of `Token`s with a cursor. Reading past the end of the
/// vector yields `Token::Eof`.
pub struct Parser {
    /// Token stream being parsed
    tokens: Vec<Token>,
    /// Index of the current token in `tokens`
    position: usize,
}
719
720impl Parser {
721    /// Create a new parser
722    pub fn new(tokens: Vec<Token>) -> Self {
723        Self {
724            tokens,
725            position: 0,
726        }
727    }
728
729    /// Get current token
730    fn current_token(&self) -> &Token {
731        self.tokens.get(self.position).unwrap_or(&Token::Eof)
732    }
733
734    /// Peek at next token
735    fn peek_token(&self) -> &Token {
736        self.tokens.get(self.position + 1).unwrap_or(&Token::Eof)
737    }
738
    /// Advance to next token
    ///
    /// Moves the cursor forward by one without any bounds check; reads past
    /// the end are handled by `current_token`, which then returns `Eof`.
    fn advance(&mut self) {
        self.position += 1;
    }
743
744    /// Expect a specific token
745    fn expect(&mut self, expected: Token) -> Result<()> {
746        if self.current_token() == &expected {
747            self.advance();
748            Ok(())
749        } else {
750            Err(anyhow!(
751                "Expected {:?}, got {:?}",
752                expected,
753                self.current_token()
754            ))
755        }
756    }
757
758    /// Parse a SELECT statement
759    pub fn parse_select(&mut self) -> Result<SelectStatement> {
760        self.expect(Token::Select)?;
761
762        // DISTINCT
763        let distinct = if self.current_token() == &Token::Distinct {
764            self.advance();
765            true
766        } else {
767            false
768        };
769
770        // SELECT list
771        let columns = self.parse_select_list()?;
772
773        // FROM clause
774        let from = if self.current_token() == &Token::From {
775            self.advance();
776            Some(self.parse_from_clause()?)
777        } else {
778            None
779        };
780
781        // WINDOW clause
782        let window = if self.current_token() == &Token::Window {
783            self.advance();
784            Some(self.parse_window_spec()?)
785        } else {
786            None
787        };
788
789        // WHERE clause
790        let where_clause = if self.current_token() == &Token::Where {
791            self.advance();
792            Some(self.parse_expression()?)
793        } else {
794            None
795        };
796
797        // GROUP BY clause
798        let group_by = if self.current_token() == &Token::Group {
799            self.advance();
800            // Expect 'BY'
801            if self.current_token() == &Token::By {
802                self.advance();
803            }
804            self.parse_expression_list()?
805        } else {
806            Vec::new()
807        };
808
809        // HAVING clause
810        let having = if self.current_token() == &Token::Having {
811            self.advance();
812            Some(self.parse_expression()?)
813        } else {
814            None
815        };
816
817        // ORDER BY clause
818        let order_by = if self.current_token() == &Token::Order {
819            self.advance();
820            // Expect 'BY'
821            if self.current_token() == &Token::By {
822                self.advance();
823            }
824            self.parse_order_by_list()?
825        } else {
826            Vec::new()
827        };
828
829        // LIMIT
830        let limit = if self.current_token() == &Token::Limit {
831            self.advance();
832            if let Token::NumberLiteral(n) = self.current_token() {
833                let limit = *n as usize;
834                self.advance();
835                Some(limit)
836            } else {
837                None
838            }
839        } else {
840            None
841        };
842
843        Ok(SelectStatement {
844            distinct,
845            columns,
846            from,
847            where_clause,
848            group_by,
849            having,
850            order_by,
851            limit,
852            offset: None,
853            window,
854        })
855    }
856
857    /// Parse SELECT list
858    fn parse_select_list(&mut self) -> Result<Vec<SelectItem>> {
859        let mut items = Vec::new();
860
861        loop {
862            let expr = self.parse_expression()?;
863
864            // Check for alias
865            let alias = if self.current_token() == &Token::As {
866                self.advance();
867                if let Token::Identifier(name) = self.current_token().clone() {
868                    self.advance();
869                    Some(name)
870                } else {
871                    None
872                }
873            } else if let Token::Identifier(name) = self.current_token().clone() {
874                // Alias without AS
875                if name.to_uppercase() != "FROM"
876                    && name.to_uppercase() != "WHERE"
877                    && name.to_uppercase() != "GROUP"
878                    && name.to_uppercase() != "ORDER"
879                    && name.to_uppercase() != "WINDOW"
880                {
881                    self.advance();
882                    Some(name)
883                } else {
884                    None
885                }
886            } else {
887                None
888            };
889
890            items.push(SelectItem { expr, alias });
891
892            if self.current_token() != &Token::Comma {
893                break;
894            }
895            self.advance(); // Skip comma
896        }
897
898        Ok(items)
899    }
900
901    /// Parse FROM clause
902    fn parse_from_clause(&mut self) -> Result<FromClause> {
903        let mut from = self.parse_table_reference()?;
904
905        // Check for joins
906        while matches!(
907            self.current_token(),
908            Token::Join | Token::Inner | Token::Left | Token::Right | Token::Full
909        ) {
910            let join_type = self.parse_join_type()?;
911            let right = self.parse_table_reference()?;
912
913            let condition = if self.current_token() == &Token::On {
914                self.advance();
915                Some(self.parse_expression()?)
916            } else {
917                None
918            };
919
920            from = FromClause::Join {
921                left: Box::new(from),
922                right: Box::new(right),
923                join_type,
924                condition,
925            };
926        }
927
928        Ok(from)
929    }
930
931    /// Parse table reference
932    fn parse_table_reference(&mut self) -> Result<FromClause> {
933        if let Token::Identifier(name) = self.current_token().clone() {
934            self.advance();
935
936            let alias = if self.current_token() == &Token::As {
937                self.advance();
938                if let Token::Identifier(alias) = self.current_token().clone() {
939                    self.advance();
940                    Some(alias)
941                } else {
942                    None
943                }
944            } else if let Token::Identifier(alias) = self.current_token().clone() {
945                // Check if this is an alias or a keyword
946                if !matches!(
947                    alias.to_uppercase().as_str(),
948                    "WHERE"
949                        | "GROUP"
950                        | "ORDER"
951                        | "HAVING"
952                        | "LIMIT"
953                        | "JOIN"
954                        | "INNER"
955                        | "LEFT"
956                        | "RIGHT"
957                        | "FULL"
958                        | "ON"
959                        | "WINDOW"
960                ) {
961                    self.advance();
962                    Some(alias)
963                } else {
964                    None
965                }
966            } else {
967                None
968            };
969
970            Ok(FromClause::Table { name, alias })
971        } else {
972            Err(anyhow!("Expected table name"))
973        }
974    }
975
976    /// Parse join type
977    fn parse_join_type(&mut self) -> Result<JoinType> {
978        let join_type = match self.current_token() {
979            Token::Inner => {
980                self.advance();
981                JoinType::Inner
982            }
983            Token::Left => {
984                self.advance();
985                if self.current_token() == &Token::Outer {
986                    self.advance();
987                }
988                JoinType::Left
989            }
990            Token::Right => {
991                self.advance();
992                if self.current_token() == &Token::Outer {
993                    self.advance();
994                }
995                JoinType::Right
996            }
997            Token::Full => {
998                self.advance();
999                if self.current_token() == &Token::Outer {
1000                    self.advance();
1001                }
1002                JoinType::Full
1003            }
1004            _ => JoinType::Inner,
1005        };
1006
1007        // Expect JOIN keyword
1008        if self.current_token() == &Token::Join {
1009            self.advance();
1010        }
1011
1012        Ok(join_type)
1013    }
1014
1015    /// Parse window specification
1016    fn parse_window_spec(&mut self) -> Result<WindowSpec> {
1017        let window_type = match self.current_token() {
1018            Token::Tumbling => {
1019                self.advance();
1020                WindowType::Tumbling
1021            }
1022            Token::Sliding => {
1023                self.advance();
1024                WindowType::Sliding
1025            }
1026            Token::Session => {
1027                self.advance();
1028                WindowType::Session
1029            }
1030            _ => WindowType::Tumbling,
1031        };
1032
1033        self.expect(Token::OpenParen)?;
1034
1035        let mut size = Duration::from_secs(60);
1036        let mut slide = None;
1037        let mut gap = None;
1038
1039        // Parse window parameters
1040        while self.current_token() != &Token::CloseParen {
1041            match self.current_token() {
1042                Token::Size => {
1043                    self.advance();
1044                    size = self.parse_duration()?;
1045                }
1046                Token::Slide => {
1047                    self.advance();
1048                    slide = Some(self.parse_duration()?);
1049                }
1050                Token::Gap => {
1051                    self.advance();
1052                    gap = Some(self.parse_duration()?);
1053                }
1054                Token::Comma => {
1055                    self.advance();
1056                }
1057                _ => {
1058                    self.advance();
1059                }
1060            }
1061        }
1062
1063        self.expect(Token::CloseParen)?;
1064
1065        Ok(WindowSpec {
1066            window_type,
1067            size,
1068            slide,
1069            gap,
1070            time_attribute: None,
1071        })
1072    }
1073
1074    /// Parse duration (e.g., "5 MINUTES")
1075    fn parse_duration(&mut self) -> Result<Duration> {
1076        let value = if let Token::NumberLiteral(n) = self.current_token() {
1077            let v = *n as u64;
1078            self.advance();
1079            v
1080        } else {
1081            return Err(anyhow!("Expected number for duration"));
1082        };
1083
1084        let unit = if let Token::Identifier(unit) = self.current_token().clone() {
1085            self.advance();
1086            unit.to_uppercase()
1087        } else {
1088            "SECONDS".to_string()
1089        };
1090
1091        let duration = match unit.as_str() {
1092            "MILLISECONDS" | "MILLIS" | "MS" => Duration::from_millis(value),
1093            "SECONDS" | "SECOND" | "S" => Duration::from_secs(value),
1094            "MINUTES" | "MINUTE" | "M" => Duration::from_secs(value * 60),
1095            "HOURS" | "HOUR" | "H" => Duration::from_secs(value * 3600),
1096            "DAYS" | "DAY" | "D" => Duration::from_secs(value * 86400),
1097            _ => Duration::from_secs(value),
1098        };
1099
1100        Ok(duration)
1101    }
1102
1103    /// Parse expression
1104    fn parse_expression(&mut self) -> Result<Expression> {
1105        self.parse_or_expression()
1106    }
1107
1108    /// Parse OR expression
1109    fn parse_or_expression(&mut self) -> Result<Expression> {
1110        let mut left = self.parse_and_expression()?;
1111
1112        while self.current_token() == &Token::Or {
1113            self.advance();
1114            let right = self.parse_and_expression()?;
1115            left = Expression::BinaryOp {
1116                left: Box::new(left),
1117                op: BinaryOperator::Or,
1118                right: Box::new(right),
1119            };
1120        }
1121
1122        Ok(left)
1123    }
1124
1125    /// Parse AND expression
1126    fn parse_and_expression(&mut self) -> Result<Expression> {
1127        let mut left = self.parse_comparison_expression()?;
1128
1129        while self.current_token() == &Token::And {
1130            self.advance();
1131            let right = self.parse_comparison_expression()?;
1132            left = Expression::BinaryOp {
1133                left: Box::new(left),
1134                op: BinaryOperator::And,
1135                right: Box::new(right),
1136            };
1137        }
1138
1139        Ok(left)
1140    }
1141
1142    /// Parse comparison expression
1143    fn parse_comparison_expression(&mut self) -> Result<Expression> {
1144        let left = self.parse_additive_expression()?;
1145
1146        let op = match self.current_token() {
1147            Token::Equal => Some(BinaryOperator::Equal),
1148            Token::NotEqual => Some(BinaryOperator::NotEqual),
1149            Token::LessThan => Some(BinaryOperator::LessThan),
1150            Token::LessThanOrEqual => Some(BinaryOperator::LessThanOrEqual),
1151            Token::GreaterThan => Some(BinaryOperator::GreaterThan),
1152            Token::GreaterThanOrEqual => Some(BinaryOperator::GreaterThanOrEqual),
1153            Token::Like => Some(BinaryOperator::Like),
1154            _ => None,
1155        };
1156
1157        if let Some(op) = op {
1158            self.advance();
1159            let right = self.parse_additive_expression()?;
1160            Ok(Expression::BinaryOp {
1161                left: Box::new(left),
1162                op,
1163                right: Box::new(right),
1164            })
1165        } else {
1166            Ok(left)
1167        }
1168    }
1169
1170    /// Parse additive expression
1171    fn parse_additive_expression(&mut self) -> Result<Expression> {
1172        let mut left = self.parse_multiplicative_expression()?;
1173
1174        loop {
1175            let op = match self.current_token() {
1176                Token::Plus => Some(BinaryOperator::Plus),
1177                Token::Minus => Some(BinaryOperator::Minus),
1178                _ => None,
1179            };
1180
1181            if let Some(op) = op {
1182                self.advance();
1183                let right = self.parse_multiplicative_expression()?;
1184                left = Expression::BinaryOp {
1185                    left: Box::new(left),
1186                    op,
1187                    right: Box::new(right),
1188                };
1189            } else {
1190                break;
1191            }
1192        }
1193
1194        Ok(left)
1195    }
1196
1197    /// Parse multiplicative expression
1198    fn parse_multiplicative_expression(&mut self) -> Result<Expression> {
1199        let mut left = self.parse_unary_expression()?;
1200
1201        loop {
1202            let op = match self.current_token() {
1203                Token::Multiply | Token::Star => Some(BinaryOperator::Multiply),
1204                Token::Divide => Some(BinaryOperator::Divide),
1205                Token::Modulo => Some(BinaryOperator::Modulo),
1206                _ => None,
1207            };
1208
1209            if let Some(op) = op {
1210                self.advance();
1211                let right = self.parse_unary_expression()?;
1212                left = Expression::BinaryOp {
1213                    left: Box::new(left),
1214                    op,
1215                    right: Box::new(right),
1216                };
1217            } else {
1218                break;
1219            }
1220        }
1221
1222        Ok(left)
1223    }
1224
1225    /// Parse unary expression
1226    fn parse_unary_expression(&mut self) -> Result<Expression> {
1227        match self.current_token() {
1228            Token::Not => {
1229                self.advance();
1230                let expr = self.parse_unary_expression()?;
1231                Ok(Expression::UnaryOp {
1232                    op: UnaryOperator::Not,
1233                    expr: Box::new(expr),
1234                })
1235            }
1236            Token::Minus => {
1237                self.advance();
1238                let expr = self.parse_unary_expression()?;
1239                Ok(Expression::UnaryOp {
1240                    op: UnaryOperator::Minus,
1241                    expr: Box::new(expr),
1242                })
1243            }
1244            _ => self.parse_primary_expression(),
1245        }
1246    }
1247
1248    /// Parse primary expression
1249    fn parse_primary_expression(&mut self) -> Result<Expression> {
1250        match self.current_token().clone() {
1251            Token::Star => {
1252                self.advance();
1253                Ok(Expression::Star)
1254            }
1255            Token::NumberLiteral(n) => {
1256                self.advance();
1257                if n.fract() == 0.0 {
1258                    Ok(Expression::Literal(SqlValue::Integer(n as i64)))
1259                } else {
1260                    Ok(Expression::Literal(SqlValue::Float(n)))
1261                }
1262            }
1263            Token::StringLiteral(s) => {
1264                self.advance();
1265                Ok(Expression::Literal(SqlValue::String(s)))
1266            }
1267            Token::BooleanLiteral(b) => {
1268                self.advance();
1269                Ok(Expression::Literal(SqlValue::Boolean(b)))
1270            }
1271            Token::Null => {
1272                self.advance();
1273                Ok(Expression::Literal(SqlValue::Null))
1274            }
1275            Token::Count
1276            | Token::Sum
1277            | Token::Avg
1278            | Token::Min
1279            | Token::Max
1280            | Token::StdDev
1281            | Token::Variance => {
1282                let func = match self.current_token() {
1283                    Token::Count => AggregateFunction::Count,
1284                    Token::Sum => AggregateFunction::Sum,
1285                    Token::Avg => AggregateFunction::Avg,
1286                    Token::Min => AggregateFunction::Min,
1287                    Token::Max => AggregateFunction::Max,
1288                    Token::StdDev => AggregateFunction::StdDev,
1289                    Token::Variance => AggregateFunction::Variance,
1290                    _ => unreachable!(),
1291                };
1292                self.advance();
1293                self.expect(Token::OpenParen)?;
1294
1295                let distinct = if self.current_token() == &Token::Distinct {
1296                    self.advance();
1297                    true
1298                } else {
1299                    false
1300                };
1301
1302                let expr = self.parse_expression()?;
1303                self.expect(Token::CloseParen)?;
1304
1305                Ok(Expression::Aggregate {
1306                    func,
1307                    expr: Box::new(expr),
1308                    distinct,
1309                })
1310            }
1311            Token::Identifier(name) => {
1312                self.advance();
1313
1314                // Check for function call
1315                if self.current_token() == &Token::OpenParen {
1316                    self.advance();
1317                    let mut args = Vec::new();
1318
1319                    if self.current_token() != &Token::CloseParen {
1320                        loop {
1321                            args.push(self.parse_expression()?);
1322                            if self.current_token() != &Token::Comma {
1323                                break;
1324                            }
1325                            self.advance();
1326                        }
1327                    }
1328
1329                    self.expect(Token::CloseParen)?;
1330
1331                    Ok(Expression::Function {
1332                        name,
1333                        args,
1334                        distinct: false,
1335                    })
1336                } else if self.current_token() == &Token::Dot {
1337                    // Qualified column name
1338                    self.advance();
1339                    if let Token::Identifier(column) = self.current_token().clone() {
1340                        self.advance();
1341                        Ok(Expression::QualifiedColumn(name, column))
1342                    } else {
1343                        Ok(Expression::Column(name))
1344                    }
1345                } else {
1346                    Ok(Expression::Column(name))
1347                }
1348            }
1349            Token::OpenParen => {
1350                self.advance();
1351                let expr = self.parse_expression()?;
1352                self.expect(Token::CloseParen)?;
1353                Ok(expr)
1354            }
1355            _ => Err(anyhow!("Unexpected token: {:?}", self.current_token())),
1356        }
1357    }
1358
1359    /// Parse expression list
1360    fn parse_expression_list(&mut self) -> Result<Vec<Expression>> {
1361        let mut exprs = Vec::new();
1362
1363        loop {
1364            exprs.push(self.parse_expression()?);
1365            if self.current_token() != &Token::Comma {
1366                break;
1367            }
1368            self.advance();
1369        }
1370
1371        Ok(exprs)
1372    }
1373
1374    /// Parse ORDER BY list
1375    fn parse_order_by_list(&mut self) -> Result<Vec<OrderByItem>> {
1376        let mut items = Vec::new();
1377
1378        loop {
1379            let expr = self.parse_expression()?;
1380
1381            let ascending = if let Token::Identifier(dir) = self.current_token().clone() {
1382                match dir.to_uppercase().as_str() {
1383                    "ASC" => {
1384                        self.advance();
1385                        true
1386                    }
1387                    "DESC" => {
1388                        self.advance();
1389                        false
1390                    }
1391                    _ => true,
1392                }
1393            } else {
1394                true
1395            };
1396
1397            items.push(OrderByItem {
1398                expr,
1399                ascending,
1400                nulls_first: None,
1401            });
1402
1403            if self.current_token() != &Token::Comma {
1404                break;
1405            }
1406            self.advance();
1407        }
1408
1409        Ok(items)
1410    }
1411}
1412
/// A single row of a [`QueryResult`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ResultRow {
    /// The row's values.
    // NOTE(review): presumably positional, one value per entry in
    // `QueryResult::columns` — confirm once execution produces rows.
    pub values: Vec<SqlValue>,
}
1419
/// The outcome of executing a query with `StreamSqlEngine::execute`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QueryResult {
    /// Output column names: the column alias when one was given, otherwise a
    /// generated `column_…` placeholder.
    pub columns: Vec<String>,
    /// Result rows. The engine's execution step is currently a placeholder
    /// and always produces an empty list.
    pub rows: Vec<ResultRow>,
    /// Wall-clock time measured from the start of `execute` until the result
    /// was assembled.
    pub execution_time: Duration,
    /// Number of rows affected; the engine currently always reports 0.
    pub rows_affected: usize,
}
1432
/// Stream SQL engine: parses queries, keeps a registry of streams, an
/// optional cache of parsed statements, and execution statistics.
///
/// All mutable state sits behind `Arc<RwLock<..>>`, so the engine's methods
/// take `&self`.
pub struct StreamSqlEngine {
    /// Engine configuration supplied at construction time.
    config: StreamSqlConfig,
    /// Registered streams, keyed by stream name.
    streams: Arc<RwLock<HashMap<String, StreamMetadata>>>,
    /// Parsed statements, keyed by the exact SQL text they were parsed from.
    query_cache: Arc<RwLock<HashMap<String, SelectStatement>>>,
    /// Running execution and cache statistics.
    stats: Arc<RwLock<StreamSqlStats>>,
}
1444
/// Description of a stream that can be registered with the engine.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StreamMetadata {
    /// Stream name; used as the registry key when the stream is registered.
    pub name: String,
    /// Column definitions of the stream.
    pub columns: Vec<ColumnDefinition>,
    /// Free-form string properties attached to the stream.
    pub properties: HashMap<String, String>,
    /// Creation timestamp (UTC).
    pub created_at: DateTime<Utc>,
}
1457
/// Counters maintained by the engine while executing queries.
///
/// All fields start at zero (`Default`).
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct StreamSqlStats {
    /// Total queries executed
    pub queries_executed: u64,
    /// Queries that completed successfully
    pub queries_succeeded: u64,
    /// Queries that failed
    pub queries_failed: u64,
    /// Running mean of query execution time, in milliseconds
    pub avg_execution_time_ms: f64,
    /// Executions whose SQL text was already present in the query cache
    pub cache_hits: u64,
    /// Executions whose SQL text was not present in the query cache
    pub cache_misses: u64,
}
1474
1475impl StreamSqlEngine {
1476    /// Create a new Stream SQL engine
1477    pub fn new(config: StreamSqlConfig) -> Self {
1478        Self {
1479            config,
1480            streams: Arc::new(RwLock::new(HashMap::new())),
1481            query_cache: Arc::new(RwLock::new(HashMap::new())),
1482            stats: Arc::new(RwLock::new(StreamSqlStats::default())),
1483        }
1484    }
1485
1486    /// Parse a SQL query
1487    pub fn parse(&self, sql: &str) -> Result<SelectStatement> {
1488        let mut lexer = Lexer::new(sql);
1489        let tokens = lexer.tokenize();
1490        let mut parser = Parser::new(tokens);
1491        parser.parse_select()
1492    }
1493
1494    /// Execute a SQL query
1495    pub async fn execute(&self, sql: &str) -> Result<QueryResult> {
1496        let start_time = std::time::Instant::now();
1497
1498        // Check cache
1499        if self.config.enable_query_cache {
1500            let cache = self.query_cache.read().await;
1501            if cache.contains_key(sql) {
1502                let mut stats = self.stats.write().await;
1503                stats.cache_hits += 1;
1504                debug!("Query cache hit");
1505            } else {
1506                let mut stats = self.stats.write().await;
1507                stats.cache_misses += 1;
1508            }
1509        }
1510
1511        // Parse query
1512        let statement = self.parse(sql)?;
1513
1514        // Update cache
1515        if self.config.enable_query_cache {
1516            let mut cache = self.query_cache.write().await;
1517            if cache.len() < self.config.cache_size {
1518                cache.insert(sql.to_string(), statement.clone());
1519            }
1520        }
1521
1522        // Execute query (placeholder - actual execution would process the AST)
1523        let result = QueryResult {
1524            columns: statement
1525                .columns
1526                .iter()
1527                .map(|c| c.alias.clone().unwrap_or_else(|| format!("column_{}", 0)))
1528                .collect(),
1529            rows: Vec::new(),
1530            execution_time: start_time.elapsed(),
1531            rows_affected: 0,
1532        };
1533
1534        // Update statistics
1535        let mut stats = self.stats.write().await;
1536        stats.queries_executed += 1;
1537        stats.queries_succeeded += 1;
1538        stats.avg_execution_time_ms = (stats.avg_execution_time_ms
1539            * (stats.queries_executed - 1) as f64
1540            + result.execution_time.as_millis() as f64)
1541            / stats.queries_executed as f64;
1542
1543        if self.config.enable_query_logging {
1544            info!(
1545                "Executed query in {:?}: {}",
1546                result.execution_time,
1547                &sql[..sql.len().min(100)]
1548            );
1549        }
1550
1551        Ok(result)
1552    }
1553
1554    /// Register a stream
1555    pub async fn register_stream(&self, metadata: StreamMetadata) -> Result<()> {
1556        let mut streams = self.streams.write().await;
1557        info!("Registering stream: {}", metadata.name);
1558        streams.insert(metadata.name.clone(), metadata);
1559        Ok(())
1560    }
1561
1562    /// Unregister a stream
1563    pub async fn unregister_stream(&self, name: &str) -> Result<()> {
1564        let mut streams = self.streams.write().await;
1565        if streams.remove(name).is_some() {
1566            info!("Unregistered stream: {}", name);
1567            Ok(())
1568        } else {
1569            Err(anyhow!("Stream not found: {}", name))
1570        }
1571    }
1572
1573    /// Get stream metadata
1574    pub async fn get_stream(&self, name: &str) -> Option<StreamMetadata> {
1575        let streams = self.streams.read().await;
1576        streams.get(name).cloned()
1577    }
1578
1579    /// List all streams
1580    pub async fn list_streams(&self) -> Vec<String> {
1581        let streams = self.streams.read().await;
1582        streams.keys().cloned().collect()
1583    }
1584
1585    /// Get statistics
1586    pub async fn get_stats(&self) -> StreamSqlStats {
1587        self.stats.read().await.clone()
1588    }
1589
1590    /// Clear query cache
1591    pub async fn clear_cache(&self) {
1592        let mut cache = self.query_cache.write().await;
1593        cache.clear();
1594        info!("Query cache cleared");
1595    }
1596
1597    /// Validate a query without executing
1598    pub fn validate(&self, sql: &str) -> Result<()> {
1599        self.parse(sql)?;
1600        Ok(())
1601    }
1602
1603    /// Explain a query
1604    pub fn explain(&self, sql: &str) -> Result<String> {
1605        let statement = self.parse(sql)?;
1606        Ok(format!("{:#?}", statement))
1607    }
1608}
1609
#[cfg(test)]
mod tests {
    use super::*;

    /// Tokenize and parse `sql` the same way the engine does.
    fn parse_sql(sql: &str) -> Result<SelectStatement> {
        let mut lexer = Lexer::new(sql);
        let tokens = lexer.tokenize();
        Parser::new(tokens).parse_select()
    }

    /// Build an engine with the default configuration.
    fn default_engine() -> StreamSqlEngine {
        StreamSqlEngine::new(StreamSqlConfig::default())
    }

    #[test]
    fn test_lexer_basic() {
        let tokens = Lexer::new("SELECT * FROM events").tokenize();

        assert_eq!(tokens.len(), 5);
        assert_eq!(tokens[0], Token::Select);
        assert_eq!(tokens[1], Token::Star);
        assert_eq!(tokens[2], Token::From);
        assert_eq!(tokens[3], Token::Identifier("events".to_string()));
        assert_eq!(tokens[4], Token::Eof);
    }

    #[test]
    fn test_lexer_with_literals() {
        let tokens = Lexer::new("SELECT name, 42, 'hello' FROM events").tokenize();

        // Odd positions hold the select items; even ones hold the commas.
        assert!(matches!(tokens[1], Token::Identifier(_)));
        assert!(matches!(tokens[3], Token::NumberLiteral(_)));
        assert!(matches!(tokens[5], Token::StringLiteral(_)));
    }

    #[test]
    fn test_parser_simple_select() {
        let parsed = parse_sql("SELECT id, name FROM users WHERE id = 1");
        assert!(parsed.is_ok());

        let statement = parsed.unwrap();
        assert_eq!(statement.columns.len(), 2);
        assert!(statement.where_clause.is_some());
    }

    #[test]
    fn test_parser_aggregate() {
        let parsed = parse_sql("SELECT COUNT(*), AVG(value) FROM events");
        assert!(parsed.is_ok());

        let statement = parsed.unwrap();
        assert_eq!(statement.columns.len(), 2);
    }

    #[test]
    fn test_parser_window() {
        let parsed = parse_sql("SELECT * FROM events WINDOW TUMBLING (SIZE 5 MINUTES)");
        assert!(parsed.is_ok());

        let statement = parsed.unwrap();
        assert!(statement.window.is_some());

        let spec = statement.window.unwrap();
        assert_eq!(spec.window_type, WindowType::Tumbling);
        assert_eq!(spec.size, Duration::from_secs(300));
    }

    #[test]
    fn test_parser_group_by() {
        let parsed = parse_sql("SELECT sensor_id, AVG(temp) FROM sensors GROUP BY sensor_id");
        assert!(parsed.is_ok());

        let statement = parsed.unwrap();
        assert!(!statement.group_by.is_empty());
    }

    #[test]
    fn test_parser_join() {
        let parsed = parse_sql("SELECT * FROM a JOIN b ON a.id = b.aid");
        assert!(parsed.is_ok());

        let statement = parsed.unwrap();
        assert!(matches!(statement.from, Some(FromClause::Join { .. })));
    }

    #[tokio::test]
    async fn test_engine_basic() {
        let engine = default_engine();

        let outcome = engine.execute("SELECT * FROM events").await;
        assert!(outcome.is_ok());

        let stats = engine.get_stats().await;
        assert_eq!(stats.queries_executed, 1);
        assert_eq!(stats.queries_succeeded, 1);
    }

    #[tokio::test]
    async fn test_engine_stream_registration() {
        let engine = default_engine();

        let id_column = ColumnDefinition {
            name: "id".to_string(),
            data_type: DataType::Integer,
            not_null: true,
            default: None,
        };
        let value_column = ColumnDefinition {
            name: "value".to_string(),
            data_type: DataType::Float,
            not_null: false,
            default: None,
        };
        let metadata = StreamMetadata {
            name: "events".to_string(),
            columns: vec![id_column, value_column],
            properties: HashMap::new(),
            created_at: Utc::now(),
        };

        engine.register_stream(metadata).await.unwrap();

        let names = engine.list_streams().await;
        assert_eq!(names.len(), 1);
        assert!(names.contains(&"events".to_string()));

        let registered = engine.get_stream("events").await;
        assert!(registered.is_some());
        assert_eq!(registered.unwrap().columns.len(), 2);

        engine.unregister_stream("events").await.unwrap();
        let names = engine.list_streams().await;
        assert!(names.is_empty());
    }

    #[test]
    fn test_engine_validate() {
        let engine = default_engine();

        assert!(engine.validate("SELECT * FROM events").is_ok());
        assert!(engine.validate("INVALID SQL").is_err());
    }

    #[test]
    fn test_engine_explain() {
        let engine = default_engine();

        let explained = engine.explain("SELECT COUNT(*) FROM events WHERE value > 10");
        assert!(explained.is_ok());

        let text = explained.unwrap();
        assert!(!text.is_empty());
    }

    #[tokio::test]
    async fn test_engine_caching() {
        let engine = StreamSqlEngine::new(StreamSqlConfig {
            enable_query_cache: true,
            cache_size: 100,
            ..Default::default()
        });

        // First execution - cache miss
        engine.execute("SELECT * FROM events").await.unwrap();

        // Second execution - cache hit
        engine.execute("SELECT * FROM events").await.unwrap();

        let stats = engine.get_stats().await;
        assert_eq!(stats.cache_misses, 1);
        assert_eq!(stats.cache_hits, 1);
    }

    #[test]
    fn test_parser_complex_expression() {
        let parsed = parse_sql(
            "SELECT * FROM events WHERE (value > 10 AND status = 'active') OR priority = 1",
        );
        assert!(parsed.is_ok());

        let statement = parsed.unwrap();
        assert!(statement.where_clause.is_some());
    }

    #[test]
    fn test_parser_order_by() {
        let parsed = parse_sql("SELECT * FROM events ORDER BY created_at DESC, id ASC");
        assert!(parsed.is_ok(), "Parse failed: {:?}", parsed);

        let statement = parsed.unwrap();
        assert_eq!(statement.order_by.len(), 2);
        assert!(!statement.order_by[0].ascending);
        assert!(statement.order_by[1].ascending);
    }

    #[test]
    fn test_parser_distinct() {
        let parsed = parse_sql("SELECT DISTINCT sensor_id FROM readings");
        assert!(parsed.is_ok());

        let statement = parsed.unwrap();
        assert!(statement.distinct);
    }

    #[test]
    fn test_parser_sliding_window() {
        let parsed =
            parse_sql("SELECT * FROM events WINDOW SLIDING (SIZE 10 SECONDS, SLIDE 5 SECONDS)");
        assert!(parsed.is_ok());

        let statement = parsed.unwrap();
        assert!(statement.window.is_some());

        let spec = statement.window.unwrap();
        assert_eq!(spec.window_type, WindowType::Sliding);
        assert_eq!(spec.size, Duration::from_secs(10));
        assert_eq!(spec.slide, Some(Duration::from_secs(5)));
    }
}