oxirs_stream/
stream_sql.rs

1//! # Stream SQL - SQL-like Query Language for Streams
2//!
3//! This module provides a SQL-like query language specifically designed
4//! for stream processing, enabling developers to write familiar SQL queries
5//! for real-time data processing.
6//!
7//! ## Features
8//! - SQL parsing and execution for streams
9//! - Window functions (TUMBLING, SLIDING, SESSION)
10//! - Aggregate functions (COUNT, SUM, AVG, MIN, MAX, STDDEV)
11//! - Stream joins with temporal semantics
12//! - Pattern matching in streams
13//! - Query optimization
14//!
15//! ## Example
16//! ```sql
17//! SELECT sensor_id, AVG(temperature) as avg_temp
18//! FROM sensor_stream
19//! WINDOW TUMBLING (SIZE 5 MINUTES)
20//! GROUP BY sensor_id
21//! HAVING AVG(temperature) > 30
22//! ```
23
24use anyhow::{anyhow, Result};
25use chrono::{DateTime, Utc};
26use serde::{Deserialize, Serialize};
27use std::collections::HashMap;
28use std::sync::Arc;
29use std::time::Duration;
30use tokio::sync::RwLock;
31use tracing::{debug, info};
32
33/// Configuration for Stream SQL engine
34#[derive(Debug, Clone, Serialize, Deserialize)]
35pub struct StreamSqlConfig {
36    /// Maximum query execution time
37    pub max_execution_time: Duration,
38    /// Enable query optimization
39    pub enable_optimization: bool,
40    /// Maximum memory for query execution
41    pub max_memory_bytes: usize,
42    /// Enable parallel execution
43    pub parallel_execution: bool,
44    /// Number of worker threads
45    pub worker_threads: usize,
46    /// Enable query caching
47    pub enable_query_cache: bool,
48    /// Query cache size
49    pub cache_size: usize,
50    /// Enable query logging
51    pub enable_query_logging: bool,
52    /// Default window size
53    pub default_window_size: Duration,
54}
55
56impl Default for StreamSqlConfig {
57    fn default() -> Self {
58        Self {
59            max_execution_time: Duration::from_secs(60),
60            enable_optimization: true,
61            max_memory_bytes: 1024 * 1024 * 1024, // 1GB
62            parallel_execution: true,
63            worker_threads: 4,
64            enable_query_cache: true,
65            cache_size: 1000,
66            enable_query_logging: true,
67            default_window_size: Duration::from_secs(60),
68        }
69    }
70}
71
72/// SQL query types
73#[derive(Debug, Clone, PartialEq)]
74pub enum QueryType {
75    /// SELECT query
76    Select,
77    /// CREATE STREAM
78    CreateStream,
79    /// DROP STREAM
80    DropStream,
81    /// INSERT INTO
82    Insert,
83    /// CREATE VIEW
84    CreateView,
85    /// DESCRIBE
86    Describe,
87    /// EXPLAIN
88    Explain,
89}
90
91/// SQL token types for lexer
92#[derive(Debug, Clone, PartialEq)]
93pub enum Token {
94    // Keywords
95    Select,
96    From,
97    Where,
98    Group,
99    By,
100    Having,
101    Order,
102    Limit,
103    Window,
104    Tumbling,
105    Sliding,
106    Session,
107    Size,
108    Slide,
109    Gap,
110    Create,
111    Stream,
112    View,
113    Drop,
114    Insert,
115    Into,
116    Values,
117    As,
118    And,
119    Or,
120    Not,
121    In,
122    Like,
123    Between,
124    Is,
125    Null,
126    Join,
127    Inner,
128    Left,
129    Right,
130    Full,
131    Outer,
132    On,
133    Describe,
134    Explain,
135    Distinct,
136    Case,
137    When,
138    Then,
139    Else,
140    End,
141
142    // Data types
143    Int,
144    Float,
145    String,
146    Boolean,
147    Timestamp,
148
149    // Operators
150    Plus,
151    Minus,
152    Multiply,
153    Divide,
154    Modulo,
155    Equal,
156    NotEqual,
157    LessThan,
158    LessThanOrEqual,
159    GreaterThan,
160    GreaterThanOrEqual,
161
162    // Punctuation
163    Comma,
164    Dot,
165    Semicolon,
166    OpenParen,
167    CloseParen,
168    OpenBracket,
169    CloseBracket,
170
171    // Literals
172    Identifier(String),
173    StringLiteral(String),
174    NumberLiteral(f64),
175    BooleanLiteral(bool),
176
177    // Aggregate functions
178    Count,
179    Sum,
180    Avg,
181    Min,
182    Max,
183    StdDev,
184    Variance,
185
186    // Special
187    Star,
188    Eof,
189}
190
191/// Lexer for SQL tokenization
192pub struct Lexer {
193    input: Vec<char>,
194    position: usize,
195    current_char: Option<char>,
196}
197
198impl Lexer {
199    /// Create a new lexer
200    pub fn new(input: &str) -> Self {
201        let chars: Vec<char> = input.chars().collect();
202        let current_char = chars.first().copied();
203        Self {
204            input: chars,
205            position: 0,
206            current_char,
207        }
208    }
209
210    /// Advance to next character
211    fn advance(&mut self) {
212        self.position += 1;
213        self.current_char = self.input.get(self.position).copied();
214    }
215
216    /// Peek at next character
217    fn peek(&self) -> Option<char> {
218        self.input.get(self.position + 1).copied()
219    }
220
221    /// Skip whitespace
222    fn skip_whitespace(&mut self) {
223        while let Some(c) = self.current_char {
224            if c.is_whitespace() {
225                self.advance();
226            } else {
227                break;
228            }
229        }
230    }
231
232    /// Read identifier or keyword
233    fn read_identifier(&mut self) -> String {
234        let mut result = String::new();
235        while let Some(c) = self.current_char {
236            if c.is_alphanumeric() || c == '_' {
237                result.push(c);
238                self.advance();
239            } else {
240                break;
241            }
242        }
243        result
244    }
245
246    /// Read number literal
247    fn read_number(&mut self) -> f64 {
248        let mut result = String::new();
249        while let Some(c) = self.current_char {
250            if c.is_ascii_digit() || c == '.' {
251                result.push(c);
252                self.advance();
253            } else {
254                break;
255            }
256        }
257        result.parse().unwrap_or(0.0)
258    }
259
260    /// Read string literal
261    fn read_string(&mut self) -> String {
262        let quote = self.current_char.unwrap();
263        self.advance(); // Skip opening quote
264        let mut result = String::new();
265        while let Some(c) = self.current_char {
266            if c == quote {
267                self.advance(); // Skip closing quote
268                break;
269            } else if c == '\\' {
270                self.advance();
271                if let Some(escaped) = self.current_char {
272                    result.push(escaped);
273                    self.advance();
274                }
275            } else {
276                result.push(c);
277                self.advance();
278            }
279        }
280        result
281    }
282
283    /// Get next token
284    pub fn next_token(&mut self) -> Token {
285        self.skip_whitespace();
286
287        match self.current_char {
288            None => Token::Eof,
289            Some(c) => {
290                if c.is_alphabetic() || c == '_' {
291                    let ident = self.read_identifier();
292                    match ident.to_uppercase().as_str() {
293                        "SELECT" => Token::Select,
294                        "FROM" => Token::From,
295                        "WHERE" => Token::Where,
296                        "GROUP" => Token::Group,
297                        "BY" => Token::By,
298                        "HAVING" => Token::Having,
299                        "ORDER" => Token::Order,
300                        "LIMIT" => Token::Limit,
301                        "WINDOW" => Token::Window,
302                        "TUMBLING" => Token::Tumbling,
303                        "SLIDING" => Token::Sliding,
304                        "SESSION" => Token::Session,
305                        "SIZE" => Token::Size,
306                        "SLIDE" => Token::Slide,
307                        "GAP" => Token::Gap,
308                        "CREATE" => Token::Create,
309                        "STREAM" => Token::Stream,
310                        "VIEW" => Token::View,
311                        "DROP" => Token::Drop,
312                        "INSERT" => Token::Insert,
313                        "INTO" => Token::Into,
314                        "VALUES" => Token::Values,
315                        "AS" => Token::As,
316                        "AND" => Token::And,
317                        "OR" => Token::Or,
318                        "NOT" => Token::Not,
319                        "IN" => Token::In,
320                        "LIKE" => Token::Like,
321                        "BETWEEN" => Token::Between,
322                        "IS" => Token::Is,
323                        "NULL" => Token::Null,
324                        "JOIN" => Token::Join,
325                        "INNER" => Token::Inner,
326                        "LEFT" => Token::Left,
327                        "RIGHT" => Token::Right,
328                        "FULL" => Token::Full,
329                        "OUTER" => Token::Outer,
330                        "ON" => Token::On,
331                        "DESCRIBE" => Token::Describe,
332                        "EXPLAIN" => Token::Explain,
333                        "DISTINCT" => Token::Distinct,
334                        "CASE" => Token::Case,
335                        "WHEN" => Token::When,
336                        "THEN" => Token::Then,
337                        "ELSE" => Token::Else,
338                        "END" => Token::End,
339                        "INT" | "INTEGER" => Token::Int,
340                        "FLOAT" | "DOUBLE" => Token::Float,
341                        "STRING" | "VARCHAR" | "TEXT" => Token::String,
342                        "BOOLEAN" | "BOOL" => Token::Boolean,
343                        "TIMESTAMP" | "DATETIME" => Token::Timestamp,
344                        "COUNT" => Token::Count,
345                        "SUM" => Token::Sum,
346                        "AVG" => Token::Avg,
347                        "MIN" => Token::Min,
348                        "MAX" => Token::Max,
349                        "STDDEV" => Token::StdDev,
350                        "VARIANCE" | "VAR" => Token::Variance,
351                        "TRUE" => Token::BooleanLiteral(true),
352                        "FALSE" => Token::BooleanLiteral(false),
353                        _ => Token::Identifier(ident),
354                    }
355                } else if c.is_ascii_digit() {
356                    Token::NumberLiteral(self.read_number())
357                } else if c == '\'' || c == '"' {
358                    Token::StringLiteral(self.read_string())
359                } else {
360                    match c {
361                        '+' => {
362                            self.advance();
363                            Token::Plus
364                        }
365                        '-' => {
366                            self.advance();
367                            Token::Minus
368                        }
369                        '*' => {
370                            self.advance();
371                            Token::Star
372                        }
373                        '/' => {
374                            self.advance();
375                            Token::Divide
376                        }
377                        '%' => {
378                            self.advance();
379                            Token::Modulo
380                        }
381                        '=' => {
382                            self.advance();
383                            Token::Equal
384                        }
385                        '<' => {
386                            self.advance();
387                            if self.current_char == Some('=') {
388                                self.advance();
389                                Token::LessThanOrEqual
390                            } else if self.current_char == Some('>') {
391                                self.advance();
392                                Token::NotEqual
393                            } else {
394                                Token::LessThan
395                            }
396                        }
397                        '>' => {
398                            self.advance();
399                            if self.current_char == Some('=') {
400                                self.advance();
401                                Token::GreaterThanOrEqual
402                            } else {
403                                Token::GreaterThan
404                            }
405                        }
406                        '!' => {
407                            self.advance();
408                            if self.current_char == Some('=') {
409                                self.advance();
410                                Token::NotEqual
411                            } else {
412                                Token::Not
413                            }
414                        }
415                        ',' => {
416                            self.advance();
417                            Token::Comma
418                        }
419                        '.' => {
420                            self.advance();
421                            Token::Dot
422                        }
423                        ';' => {
424                            self.advance();
425                            Token::Semicolon
426                        }
427                        '(' => {
428                            self.advance();
429                            Token::OpenParen
430                        }
431                        ')' => {
432                            self.advance();
433                            Token::CloseParen
434                        }
435                        '[' => {
436                            self.advance();
437                            Token::OpenBracket
438                        }
439                        ']' => {
440                            self.advance();
441                            Token::CloseBracket
442                        }
443                        _ => {
444                            self.advance();
445                            Token::Eof
446                        }
447                    }
448                }
449            }
450        }
451    }
452
453    /// Tokenize entire input
454    pub fn tokenize(&mut self) -> Vec<Token> {
455        let mut tokens = Vec::new();
456        loop {
457            let token = self.next_token();
458            if token == Token::Eof {
459                tokens.push(token);
460                break;
461            }
462            tokens.push(token);
463        }
464        tokens
465    }
466}
467
468/// Expression in SQL AST
469#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
470pub enum Expression {
471    /// Column reference
472    Column(String),
473    /// Qualified column (table.column)
474    QualifiedColumn(String, String),
475    /// Literal value
476    Literal(SqlValue),
477    /// Binary operation
478    BinaryOp {
479        left: Box<Expression>,
480        op: BinaryOperator,
481        right: Box<Expression>,
482    },
483    /// Unary operation
484    UnaryOp {
485        op: UnaryOperator,
486        expr: Box<Expression>,
487    },
488    /// Function call
489    Function {
490        name: String,
491        args: Vec<Expression>,
492        distinct: bool,
493    },
494    /// Aggregate function
495    Aggregate {
496        func: AggregateFunction,
497        expr: Box<Expression>,
498        distinct: bool,
499    },
500    /// CASE expression
501    Case {
502        operand: Option<Box<Expression>>,
503        when_clauses: Vec<(Expression, Expression)>,
504        else_clause: Option<Box<Expression>>,
505    },
506    /// Subquery
507    Subquery(Box<SelectStatement>),
508    /// IN expression
509    InList {
510        expr: Box<Expression>,
511        list: Vec<Expression>,
512        negated: bool,
513    },
514    /// BETWEEN expression
515    Between {
516        expr: Box<Expression>,
517        low: Box<Expression>,
518        high: Box<Expression>,
519        negated: bool,
520    },
521    /// IS NULL
522    IsNull {
523        expr: Box<Expression>,
524        negated: bool,
525    },
526    /// Star (*)
527    Star,
528}
529
530/// SQL value types
531#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
532pub enum SqlValue {
533    Null,
534    Integer(i64),
535    Float(f64),
536    String(String),
537    Boolean(bool),
538    Timestamp(DateTime<Utc>),
539}
540
541/// Binary operators
542#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
543pub enum BinaryOperator {
544    Plus,
545    Minus,
546    Multiply,
547    Divide,
548    Modulo,
549    Equal,
550    NotEqual,
551    LessThan,
552    LessThanOrEqual,
553    GreaterThan,
554    GreaterThanOrEqual,
555    And,
556    Or,
557    Like,
558}
559
560/// Unary operators
561#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
562pub enum UnaryOperator {
563    Not,
564    Minus,
565}
566
567/// Aggregate functions
568#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
569pub enum AggregateFunction {
570    Count,
571    Sum,
572    Avg,
573    Min,
574    Max,
575    StdDev,
576    Variance,
577}
578
579/// Window specification
580#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
581pub struct WindowSpec {
582    /// Window type
583    pub window_type: WindowType,
584    /// Window size
585    pub size: Duration,
586    /// Slide interval (for sliding windows)
587    pub slide: Option<Duration>,
588    /// Session gap (for session windows)
589    pub gap: Option<Duration>,
590    /// Time attribute
591    pub time_attribute: Option<String>,
592}
593
594/// Window types
595#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
596pub enum WindowType {
597    Tumbling,
598    Sliding,
599    Session,
600}
601
602/// SELECT column specification
603#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
604pub struct SelectItem {
605    /// Expression
606    pub expr: Expression,
607    /// Alias
608    pub alias: Option<String>,
609}
610
611/// FROM clause item
612#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
613pub enum FromClause {
614    /// Simple table/stream reference
615    Table { name: String, alias: Option<String> },
616    /// Join
617    Join {
618        left: Box<FromClause>,
619        right: Box<FromClause>,
620        join_type: JoinType,
621        condition: Option<Expression>,
622    },
623    /// Subquery
624    Subquery {
625        query: Box<SelectStatement>,
626        alias: String,
627    },
628}
629
630/// Join types
631#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
632pub enum JoinType {
633    Inner,
634    Left,
635    Right,
636    Full,
637    Cross,
638}
639
640/// ORDER BY specification
641#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
642pub struct OrderByItem {
643    /// Expression to order by
644    pub expr: Expression,
645    /// Ascending or descending
646    pub ascending: bool,
647    /// NULLS FIRST or NULLS LAST
648    pub nulls_first: Option<bool>,
649}
650
651/// SELECT statement AST
652#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
653pub struct SelectStatement {
654    /// DISTINCT flag
655    pub distinct: bool,
656    /// SELECT list
657    pub columns: Vec<SelectItem>,
658    /// FROM clause
659    pub from: Option<FromClause>,
660    /// WHERE clause
661    pub where_clause: Option<Expression>,
662    /// GROUP BY clause
663    pub group_by: Vec<Expression>,
664    /// HAVING clause
665    pub having: Option<Expression>,
666    /// ORDER BY clause
667    pub order_by: Vec<OrderByItem>,
668    /// LIMIT
669    pub limit: Option<usize>,
670    /// OFFSET
671    pub offset: Option<usize>,
672    /// Window specification
673    pub window: Option<WindowSpec>,
674}
675
676/// CREATE STREAM statement
677#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
678pub struct CreateStreamStatement {
679    /// Stream name
680    pub name: String,
681    /// Column definitions
682    pub columns: Vec<ColumnDefinition>,
683    /// Stream properties
684    pub properties: HashMap<String, String>,
685}
686
687/// Column definition
688#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
689pub struct ColumnDefinition {
690    /// Column name
691    pub name: String,
692    /// Data type
693    pub data_type: DataType,
694    /// NOT NULL constraint
695    pub not_null: bool,
696    /// DEFAULT value
697    pub default: Option<Expression>,
698}
699
700/// SQL data types
701#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
702pub enum DataType {
703    Integer,
704    Float,
705    String,
706    Boolean,
707    Timestamp,
708    Array(Box<DataType>),
709    Map(Box<DataType>, Box<DataType>),
710}
711
712/// Parser for SQL queries
713pub struct Parser {
714    tokens: Vec<Token>,
715    position: usize,
716}
717
718impl Parser {
719    /// Create a new parser
720    pub fn new(tokens: Vec<Token>) -> Self {
721        Self {
722            tokens,
723            position: 0,
724        }
725    }
726
727    /// Get current token
728    fn current_token(&self) -> &Token {
729        self.tokens.get(self.position).unwrap_or(&Token::Eof)
730    }
731
732    /// Peek at next token
733    fn peek_token(&self) -> &Token {
734        self.tokens.get(self.position + 1).unwrap_or(&Token::Eof)
735    }
736
737    /// Advance to next token
738    fn advance(&mut self) {
739        self.position += 1;
740    }
741
742    /// Expect a specific token
743    fn expect(&mut self, expected: Token) -> Result<()> {
744        if self.current_token() == &expected {
745            self.advance();
746            Ok(())
747        } else {
748            Err(anyhow!(
749                "Expected {:?}, got {:?}",
750                expected,
751                self.current_token()
752            ))
753        }
754    }
755
756    /// Parse a SELECT statement
757    pub fn parse_select(&mut self) -> Result<SelectStatement> {
758        self.expect(Token::Select)?;
759
760        // DISTINCT
761        let distinct = if self.current_token() == &Token::Distinct {
762            self.advance();
763            true
764        } else {
765            false
766        };
767
768        // SELECT list
769        let columns = self.parse_select_list()?;
770
771        // FROM clause
772        let from = if self.current_token() == &Token::From {
773            self.advance();
774            Some(self.parse_from_clause()?)
775        } else {
776            None
777        };
778
779        // WINDOW clause
780        let window = if self.current_token() == &Token::Window {
781            self.advance();
782            Some(self.parse_window_spec()?)
783        } else {
784            None
785        };
786
787        // WHERE clause
788        let where_clause = if self.current_token() == &Token::Where {
789            self.advance();
790            Some(self.parse_expression()?)
791        } else {
792            None
793        };
794
795        // GROUP BY clause
796        let group_by = if self.current_token() == &Token::Group {
797            self.advance();
798            // Expect 'BY'
799            if self.current_token() == &Token::By {
800                self.advance();
801            }
802            self.parse_expression_list()?
803        } else {
804            Vec::new()
805        };
806
807        // HAVING clause
808        let having = if self.current_token() == &Token::Having {
809            self.advance();
810            Some(self.parse_expression()?)
811        } else {
812            None
813        };
814
815        // ORDER BY clause
816        let order_by = if self.current_token() == &Token::Order {
817            self.advance();
818            // Expect 'BY'
819            if self.current_token() == &Token::By {
820                self.advance();
821            }
822            self.parse_order_by_list()?
823        } else {
824            Vec::new()
825        };
826
827        // LIMIT
828        let limit = if self.current_token() == &Token::Limit {
829            self.advance();
830            if let Token::NumberLiteral(n) = self.current_token() {
831                let limit = *n as usize;
832                self.advance();
833                Some(limit)
834            } else {
835                None
836            }
837        } else {
838            None
839        };
840
841        Ok(SelectStatement {
842            distinct,
843            columns,
844            from,
845            where_clause,
846            group_by,
847            having,
848            order_by,
849            limit,
850            offset: None,
851            window,
852        })
853    }
854
855    /// Parse SELECT list
856    fn parse_select_list(&mut self) -> Result<Vec<SelectItem>> {
857        let mut items = Vec::new();
858
859        loop {
860            let expr = self.parse_expression()?;
861
862            // Check for alias
863            let alias = if self.current_token() == &Token::As {
864                self.advance();
865                if let Token::Identifier(name) = self.current_token().clone() {
866                    self.advance();
867                    Some(name)
868                } else {
869                    None
870                }
871            } else if let Token::Identifier(name) = self.current_token().clone() {
872                // Alias without AS
873                if name.to_uppercase() != "FROM"
874                    && name.to_uppercase() != "WHERE"
875                    && name.to_uppercase() != "GROUP"
876                    && name.to_uppercase() != "ORDER"
877                    && name.to_uppercase() != "WINDOW"
878                {
879                    self.advance();
880                    Some(name)
881                } else {
882                    None
883                }
884            } else {
885                None
886            };
887
888            items.push(SelectItem { expr, alias });
889
890            if self.current_token() != &Token::Comma {
891                break;
892            }
893            self.advance(); // Skip comma
894        }
895
896        Ok(items)
897    }
898
899    /// Parse FROM clause
900    fn parse_from_clause(&mut self) -> Result<FromClause> {
901        let mut from = self.parse_table_reference()?;
902
903        // Check for joins
904        while matches!(
905            self.current_token(),
906            Token::Join | Token::Inner | Token::Left | Token::Right | Token::Full
907        ) {
908            let join_type = self.parse_join_type()?;
909            let right = self.parse_table_reference()?;
910
911            let condition = if self.current_token() == &Token::On {
912                self.advance();
913                Some(self.parse_expression()?)
914            } else {
915                None
916            };
917
918            from = FromClause::Join {
919                left: Box::new(from),
920                right: Box::new(right),
921                join_type,
922                condition,
923            };
924        }
925
926        Ok(from)
927    }
928
929    /// Parse table reference
930    fn parse_table_reference(&mut self) -> Result<FromClause> {
931        if let Token::Identifier(name) = self.current_token().clone() {
932            self.advance();
933
934            let alias = if self.current_token() == &Token::As {
935                self.advance();
936                if let Token::Identifier(alias) = self.current_token().clone() {
937                    self.advance();
938                    Some(alias)
939                } else {
940                    None
941                }
942            } else if let Token::Identifier(alias) = self.current_token().clone() {
943                // Check if this is an alias or a keyword
944                if !matches!(
945                    alias.to_uppercase().as_str(),
946                    "WHERE"
947                        | "GROUP"
948                        | "ORDER"
949                        | "HAVING"
950                        | "LIMIT"
951                        | "JOIN"
952                        | "INNER"
953                        | "LEFT"
954                        | "RIGHT"
955                        | "FULL"
956                        | "ON"
957                        | "WINDOW"
958                ) {
959                    self.advance();
960                    Some(alias)
961                } else {
962                    None
963                }
964            } else {
965                None
966            };
967
968            Ok(FromClause::Table { name, alias })
969        } else {
970            Err(anyhow!("Expected table name"))
971        }
972    }
973
974    /// Parse join type
975    fn parse_join_type(&mut self) -> Result<JoinType> {
976        let join_type = match self.current_token() {
977            Token::Inner => {
978                self.advance();
979                JoinType::Inner
980            }
981            Token::Left => {
982                self.advance();
983                if self.current_token() == &Token::Outer {
984                    self.advance();
985                }
986                JoinType::Left
987            }
988            Token::Right => {
989                self.advance();
990                if self.current_token() == &Token::Outer {
991                    self.advance();
992                }
993                JoinType::Right
994            }
995            Token::Full => {
996                self.advance();
997                if self.current_token() == &Token::Outer {
998                    self.advance();
999                }
1000                JoinType::Full
1001            }
1002            _ => JoinType::Inner,
1003        };
1004
1005        // Expect JOIN keyword
1006        if self.current_token() == &Token::Join {
1007            self.advance();
1008        }
1009
1010        Ok(join_type)
1011    }
1012
1013    /// Parse window specification
1014    fn parse_window_spec(&mut self) -> Result<WindowSpec> {
1015        let window_type = match self.current_token() {
1016            Token::Tumbling => {
1017                self.advance();
1018                WindowType::Tumbling
1019            }
1020            Token::Sliding => {
1021                self.advance();
1022                WindowType::Sliding
1023            }
1024            Token::Session => {
1025                self.advance();
1026                WindowType::Session
1027            }
1028            _ => WindowType::Tumbling,
1029        };
1030
1031        self.expect(Token::OpenParen)?;
1032
1033        let mut size = Duration::from_secs(60);
1034        let mut slide = None;
1035        let mut gap = None;
1036
1037        // Parse window parameters
1038        while self.current_token() != &Token::CloseParen {
1039            match self.current_token() {
1040                Token::Size => {
1041                    self.advance();
1042                    size = self.parse_duration()?;
1043                }
1044                Token::Slide => {
1045                    self.advance();
1046                    slide = Some(self.parse_duration()?);
1047                }
1048                Token::Gap => {
1049                    self.advance();
1050                    gap = Some(self.parse_duration()?);
1051                }
1052                Token::Comma => {
1053                    self.advance();
1054                }
1055                _ => {
1056                    self.advance();
1057                }
1058            }
1059        }
1060
1061        self.expect(Token::CloseParen)?;
1062
1063        Ok(WindowSpec {
1064            window_type,
1065            size,
1066            slide,
1067            gap,
1068            time_attribute: None,
1069        })
1070    }
1071
1072    /// Parse duration (e.g., "5 MINUTES")
1073    fn parse_duration(&mut self) -> Result<Duration> {
1074        let value = if let Token::NumberLiteral(n) = self.current_token() {
1075            let v = *n as u64;
1076            self.advance();
1077            v
1078        } else {
1079            return Err(anyhow!("Expected number for duration"));
1080        };
1081
1082        let unit = if let Token::Identifier(unit) = self.current_token().clone() {
1083            self.advance();
1084            unit.to_uppercase()
1085        } else {
1086            "SECONDS".to_string()
1087        };
1088
1089        let duration = match unit.as_str() {
1090            "MILLISECONDS" | "MILLIS" | "MS" => Duration::from_millis(value),
1091            "SECONDS" | "SECOND" | "S" => Duration::from_secs(value),
1092            "MINUTES" | "MINUTE" | "M" => Duration::from_secs(value * 60),
1093            "HOURS" | "HOUR" | "H" => Duration::from_secs(value * 3600),
1094            "DAYS" | "DAY" | "D" => Duration::from_secs(value * 86400),
1095            _ => Duration::from_secs(value),
1096        };
1097
1098        Ok(duration)
1099    }
1100
1101    /// Parse expression
1102    fn parse_expression(&mut self) -> Result<Expression> {
1103        self.parse_or_expression()
1104    }
1105
1106    /// Parse OR expression
1107    fn parse_or_expression(&mut self) -> Result<Expression> {
1108        let mut left = self.parse_and_expression()?;
1109
1110        while self.current_token() == &Token::Or {
1111            self.advance();
1112            let right = self.parse_and_expression()?;
1113            left = Expression::BinaryOp {
1114                left: Box::new(left),
1115                op: BinaryOperator::Or,
1116                right: Box::new(right),
1117            };
1118        }
1119
1120        Ok(left)
1121    }
1122
1123    /// Parse AND expression
1124    fn parse_and_expression(&mut self) -> Result<Expression> {
1125        let mut left = self.parse_comparison_expression()?;
1126
1127        while self.current_token() == &Token::And {
1128            self.advance();
1129            let right = self.parse_comparison_expression()?;
1130            left = Expression::BinaryOp {
1131                left: Box::new(left),
1132                op: BinaryOperator::And,
1133                right: Box::new(right),
1134            };
1135        }
1136
1137        Ok(left)
1138    }
1139
1140    /// Parse comparison expression
1141    fn parse_comparison_expression(&mut self) -> Result<Expression> {
1142        let left = self.parse_additive_expression()?;
1143
1144        let op = match self.current_token() {
1145            Token::Equal => Some(BinaryOperator::Equal),
1146            Token::NotEqual => Some(BinaryOperator::NotEqual),
1147            Token::LessThan => Some(BinaryOperator::LessThan),
1148            Token::LessThanOrEqual => Some(BinaryOperator::LessThanOrEqual),
1149            Token::GreaterThan => Some(BinaryOperator::GreaterThan),
1150            Token::GreaterThanOrEqual => Some(BinaryOperator::GreaterThanOrEqual),
1151            Token::Like => Some(BinaryOperator::Like),
1152            _ => None,
1153        };
1154
1155        if let Some(op) = op {
1156            self.advance();
1157            let right = self.parse_additive_expression()?;
1158            Ok(Expression::BinaryOp {
1159                left: Box::new(left),
1160                op,
1161                right: Box::new(right),
1162            })
1163        } else {
1164            Ok(left)
1165        }
1166    }
1167
1168    /// Parse additive expression
1169    fn parse_additive_expression(&mut self) -> Result<Expression> {
1170        let mut left = self.parse_multiplicative_expression()?;
1171
1172        loop {
1173            let op = match self.current_token() {
1174                Token::Plus => Some(BinaryOperator::Plus),
1175                Token::Minus => Some(BinaryOperator::Minus),
1176                _ => None,
1177            };
1178
1179            if let Some(op) = op {
1180                self.advance();
1181                let right = self.parse_multiplicative_expression()?;
1182                left = Expression::BinaryOp {
1183                    left: Box::new(left),
1184                    op,
1185                    right: Box::new(right),
1186                };
1187            } else {
1188                break;
1189            }
1190        }
1191
1192        Ok(left)
1193    }
1194
1195    /// Parse multiplicative expression
1196    fn parse_multiplicative_expression(&mut self) -> Result<Expression> {
1197        let mut left = self.parse_unary_expression()?;
1198
1199        loop {
1200            let op = match self.current_token() {
1201                Token::Multiply | Token::Star => Some(BinaryOperator::Multiply),
1202                Token::Divide => Some(BinaryOperator::Divide),
1203                Token::Modulo => Some(BinaryOperator::Modulo),
1204                _ => None,
1205            };
1206
1207            if let Some(op) = op {
1208                self.advance();
1209                let right = self.parse_unary_expression()?;
1210                left = Expression::BinaryOp {
1211                    left: Box::new(left),
1212                    op,
1213                    right: Box::new(right),
1214                };
1215            } else {
1216                break;
1217            }
1218        }
1219
1220        Ok(left)
1221    }
1222
1223    /// Parse unary expression
1224    fn parse_unary_expression(&mut self) -> Result<Expression> {
1225        match self.current_token() {
1226            Token::Not => {
1227                self.advance();
1228                let expr = self.parse_unary_expression()?;
1229                Ok(Expression::UnaryOp {
1230                    op: UnaryOperator::Not,
1231                    expr: Box::new(expr),
1232                })
1233            }
1234            Token::Minus => {
1235                self.advance();
1236                let expr = self.parse_unary_expression()?;
1237                Ok(Expression::UnaryOp {
1238                    op: UnaryOperator::Minus,
1239                    expr: Box::new(expr),
1240                })
1241            }
1242            _ => self.parse_primary_expression(),
1243        }
1244    }
1245
1246    /// Parse primary expression
1247    fn parse_primary_expression(&mut self) -> Result<Expression> {
1248        match self.current_token().clone() {
1249            Token::Star => {
1250                self.advance();
1251                Ok(Expression::Star)
1252            }
1253            Token::NumberLiteral(n) => {
1254                self.advance();
1255                if n.fract() == 0.0 {
1256                    Ok(Expression::Literal(SqlValue::Integer(n as i64)))
1257                } else {
1258                    Ok(Expression::Literal(SqlValue::Float(n)))
1259                }
1260            }
1261            Token::StringLiteral(s) => {
1262                self.advance();
1263                Ok(Expression::Literal(SqlValue::String(s)))
1264            }
1265            Token::BooleanLiteral(b) => {
1266                self.advance();
1267                Ok(Expression::Literal(SqlValue::Boolean(b)))
1268            }
1269            Token::Null => {
1270                self.advance();
1271                Ok(Expression::Literal(SqlValue::Null))
1272            }
1273            Token::Count
1274            | Token::Sum
1275            | Token::Avg
1276            | Token::Min
1277            | Token::Max
1278            | Token::StdDev
1279            | Token::Variance => {
1280                let func = match self.current_token() {
1281                    Token::Count => AggregateFunction::Count,
1282                    Token::Sum => AggregateFunction::Sum,
1283                    Token::Avg => AggregateFunction::Avg,
1284                    Token::Min => AggregateFunction::Min,
1285                    Token::Max => AggregateFunction::Max,
1286                    Token::StdDev => AggregateFunction::StdDev,
1287                    Token::Variance => AggregateFunction::Variance,
1288                    _ => unreachable!(),
1289                };
1290                self.advance();
1291                self.expect(Token::OpenParen)?;
1292
1293                let distinct = if self.current_token() == &Token::Distinct {
1294                    self.advance();
1295                    true
1296                } else {
1297                    false
1298                };
1299
1300                let expr = self.parse_expression()?;
1301                self.expect(Token::CloseParen)?;
1302
1303                Ok(Expression::Aggregate {
1304                    func,
1305                    expr: Box::new(expr),
1306                    distinct,
1307                })
1308            }
1309            Token::Identifier(name) => {
1310                self.advance();
1311
1312                // Check for function call
1313                if self.current_token() == &Token::OpenParen {
1314                    self.advance();
1315                    let mut args = Vec::new();
1316
1317                    if self.current_token() != &Token::CloseParen {
1318                        loop {
1319                            args.push(self.parse_expression()?);
1320                            if self.current_token() != &Token::Comma {
1321                                break;
1322                            }
1323                            self.advance();
1324                        }
1325                    }
1326
1327                    self.expect(Token::CloseParen)?;
1328
1329                    Ok(Expression::Function {
1330                        name,
1331                        args,
1332                        distinct: false,
1333                    })
1334                } else if self.current_token() == &Token::Dot {
1335                    // Qualified column name
1336                    self.advance();
1337                    if let Token::Identifier(column) = self.current_token().clone() {
1338                        self.advance();
1339                        Ok(Expression::QualifiedColumn(name, column))
1340                    } else {
1341                        Ok(Expression::Column(name))
1342                    }
1343                } else {
1344                    Ok(Expression::Column(name))
1345                }
1346            }
1347            Token::OpenParen => {
1348                self.advance();
1349                let expr = self.parse_expression()?;
1350                self.expect(Token::CloseParen)?;
1351                Ok(expr)
1352            }
1353            _ => Err(anyhow!("Unexpected token: {:?}", self.current_token())),
1354        }
1355    }
1356
1357    /// Parse expression list
1358    fn parse_expression_list(&mut self) -> Result<Vec<Expression>> {
1359        let mut exprs = Vec::new();
1360
1361        loop {
1362            exprs.push(self.parse_expression()?);
1363            if self.current_token() != &Token::Comma {
1364                break;
1365            }
1366            self.advance();
1367        }
1368
1369        Ok(exprs)
1370    }
1371
1372    /// Parse ORDER BY list
1373    fn parse_order_by_list(&mut self) -> Result<Vec<OrderByItem>> {
1374        let mut items = Vec::new();
1375
1376        loop {
1377            let expr = self.parse_expression()?;
1378
1379            let ascending = if let Token::Identifier(dir) = self.current_token().clone() {
1380                match dir.to_uppercase().as_str() {
1381                    "ASC" => {
1382                        self.advance();
1383                        true
1384                    }
1385                    "DESC" => {
1386                        self.advance();
1387                        false
1388                    }
1389                    _ => true,
1390                }
1391            } else {
1392                true
1393            };
1394
1395            items.push(OrderByItem {
1396                expr,
1397                ascending,
1398                nulls_first: None,
1399            });
1400
1401            if self.current_token() != &Token::Comma {
1402                break;
1403            }
1404            self.advance();
1405        }
1406
1407        Ok(items)
1408    }
1409}
1410
1411/// Query result row
1412#[derive(Debug, Clone, Serialize, Deserialize)]
1413pub struct ResultRow {
1414    /// Column values
1415    pub values: Vec<SqlValue>,
1416}
1417
1418/// Query result
1419#[derive(Debug, Clone, Serialize, Deserialize)]
1420pub struct QueryResult {
1421    /// Column names
1422    pub columns: Vec<String>,
1423    /// Result rows
1424    pub rows: Vec<ResultRow>,
1425    /// Execution time
1426    pub execution_time: Duration,
1427    /// Rows affected
1428    pub rows_affected: usize,
1429}
1430
1431/// Stream SQL engine
1432pub struct StreamSqlEngine {
1433    /// Configuration
1434    config: StreamSqlConfig,
1435    /// Registered streams
1436    streams: Arc<RwLock<HashMap<String, StreamMetadata>>>,
1437    /// Query cache
1438    query_cache: Arc<RwLock<HashMap<String, SelectStatement>>>,
1439    /// Statistics
1440    stats: Arc<RwLock<StreamSqlStats>>,
1441}
1442
1443/// Stream metadata
1444#[derive(Debug, Clone, Serialize, Deserialize)]
1445pub struct StreamMetadata {
1446    /// Stream name
1447    pub name: String,
1448    /// Column definitions
1449    pub columns: Vec<ColumnDefinition>,
1450    /// Stream properties
1451    pub properties: HashMap<String, String>,
1452    /// Created at
1453    pub created_at: DateTime<Utc>,
1454}
1455
1456/// Stream SQL statistics
1457#[derive(Debug, Clone, Default, Serialize, Deserialize)]
1458pub struct StreamSqlStats {
1459    /// Total queries executed
1460    pub queries_executed: u64,
1461    /// Queries succeeded
1462    pub queries_succeeded: u64,
1463    /// Queries failed
1464    pub queries_failed: u64,
1465    /// Average execution time
1466    pub avg_execution_time_ms: f64,
1467    /// Cache hits
1468    pub cache_hits: u64,
1469    /// Cache misses
1470    pub cache_misses: u64,
1471}
1472
1473impl StreamSqlEngine {
1474    /// Create a new Stream SQL engine
1475    pub fn new(config: StreamSqlConfig) -> Self {
1476        Self {
1477            config,
1478            streams: Arc::new(RwLock::new(HashMap::new())),
1479            query_cache: Arc::new(RwLock::new(HashMap::new())),
1480            stats: Arc::new(RwLock::new(StreamSqlStats::default())),
1481        }
1482    }
1483
1484    /// Parse a SQL query
1485    pub fn parse(&self, sql: &str) -> Result<SelectStatement> {
1486        let mut lexer = Lexer::new(sql);
1487        let tokens = lexer.tokenize();
1488        let mut parser = Parser::new(tokens);
1489        parser.parse_select()
1490    }
1491
1492    /// Execute a SQL query
1493    pub async fn execute(&self, sql: &str) -> Result<QueryResult> {
1494        let start_time = std::time::Instant::now();
1495
1496        // Check cache
1497        if self.config.enable_query_cache {
1498            let cache = self.query_cache.read().await;
1499            if cache.contains_key(sql) {
1500                let mut stats = self.stats.write().await;
1501                stats.cache_hits += 1;
1502                debug!("Query cache hit");
1503            } else {
1504                let mut stats = self.stats.write().await;
1505                stats.cache_misses += 1;
1506            }
1507        }
1508
1509        // Parse query
1510        let statement = self.parse(sql)?;
1511
1512        // Update cache
1513        if self.config.enable_query_cache {
1514            let mut cache = self.query_cache.write().await;
1515            if cache.len() < self.config.cache_size {
1516                cache.insert(sql.to_string(), statement.clone());
1517            }
1518        }
1519
1520        // Execute query (placeholder - actual execution would process the AST)
1521        let result = QueryResult {
1522            columns: statement
1523                .columns
1524                .iter()
1525                .map(|c| c.alias.clone().unwrap_or_else(|| format!("column_{}", 0)))
1526                .collect(),
1527            rows: Vec::new(),
1528            execution_time: start_time.elapsed(),
1529            rows_affected: 0,
1530        };
1531
1532        // Update statistics
1533        let mut stats = self.stats.write().await;
1534        stats.queries_executed += 1;
1535        stats.queries_succeeded += 1;
1536        stats.avg_execution_time_ms = (stats.avg_execution_time_ms
1537            * (stats.queries_executed - 1) as f64
1538            + result.execution_time.as_millis() as f64)
1539            / stats.queries_executed as f64;
1540
1541        if self.config.enable_query_logging {
1542            info!(
1543                "Executed query in {:?}: {}",
1544                result.execution_time,
1545                &sql[..sql.len().min(100)]
1546            );
1547        }
1548
1549        Ok(result)
1550    }
1551
1552    /// Register a stream
1553    pub async fn register_stream(&self, metadata: StreamMetadata) -> Result<()> {
1554        let mut streams = self.streams.write().await;
1555        info!("Registering stream: {}", metadata.name);
1556        streams.insert(metadata.name.clone(), metadata);
1557        Ok(())
1558    }
1559
1560    /// Unregister a stream
1561    pub async fn unregister_stream(&self, name: &str) -> Result<()> {
1562        let mut streams = self.streams.write().await;
1563        if streams.remove(name).is_some() {
1564            info!("Unregistered stream: {}", name);
1565            Ok(())
1566        } else {
1567            Err(anyhow!("Stream not found: {}", name))
1568        }
1569    }
1570
1571    /// Get stream metadata
1572    pub async fn get_stream(&self, name: &str) -> Option<StreamMetadata> {
1573        let streams = self.streams.read().await;
1574        streams.get(name).cloned()
1575    }
1576
1577    /// List all streams
1578    pub async fn list_streams(&self) -> Vec<String> {
1579        let streams = self.streams.read().await;
1580        streams.keys().cloned().collect()
1581    }
1582
1583    /// Get statistics
1584    pub async fn get_stats(&self) -> StreamSqlStats {
1585        self.stats.read().await.clone()
1586    }
1587
1588    /// Clear query cache
1589    pub async fn clear_cache(&self) {
1590        let mut cache = self.query_cache.write().await;
1591        cache.clear();
1592        info!("Query cache cleared");
1593    }
1594
1595    /// Validate a query without executing
1596    pub fn validate(&self, sql: &str) -> Result<()> {
1597        self.parse(sql)?;
1598        Ok(())
1599    }
1600
1601    /// Explain a query
1602    pub fn explain(&self, sql: &str) -> Result<String> {
1603        let statement = self.parse(sql)?;
1604        Ok(format!("{:#?}", statement))
1605    }
1606}
1607
1608#[cfg(test)]
1609mod tests {
1610    use super::*;
1611
1612    #[test]
1613    fn test_lexer_basic() {
1614        let mut lexer = Lexer::new("SELECT * FROM events");
1615        let tokens = lexer.tokenize();
1616
1617        assert_eq!(tokens.len(), 5);
1618        assert_eq!(tokens[0], Token::Select);
1619        assert_eq!(tokens[1], Token::Star);
1620        assert_eq!(tokens[2], Token::From);
1621        assert_eq!(tokens[3], Token::Identifier("events".to_string()));
1622        assert_eq!(tokens[4], Token::Eof);
1623    }
1624
1625    #[test]
1626    fn test_lexer_with_literals() {
1627        let mut lexer = Lexer::new("SELECT name, 42, 'hello' FROM events");
1628        let tokens = lexer.tokenize();
1629
1630        assert!(matches!(tokens[1], Token::Identifier(_)));
1631        assert!(matches!(tokens[3], Token::NumberLiteral(_)));
1632        assert!(matches!(tokens[5], Token::StringLiteral(_)));
1633    }
1634
1635    #[test]
1636    fn test_parser_simple_select() {
1637        let mut lexer = Lexer::new("SELECT id, name FROM users WHERE id = 1");
1638        let tokens = lexer.tokenize();
1639        let mut parser = Parser::new(tokens);
1640        let result = parser.parse_select();
1641
1642        assert!(result.is_ok());
1643        let stmt = result.unwrap();
1644        assert_eq!(stmt.columns.len(), 2);
1645        assert!(stmt.where_clause.is_some());
1646    }
1647
1648    #[test]
1649    fn test_parser_aggregate() {
1650        let mut lexer = Lexer::new("SELECT COUNT(*), AVG(value) FROM events");
1651        let tokens = lexer.tokenize();
1652        let mut parser = Parser::new(tokens);
1653        let result = parser.parse_select();
1654
1655        assert!(result.is_ok());
1656        let stmt = result.unwrap();
1657        assert_eq!(stmt.columns.len(), 2);
1658    }
1659
1660    #[test]
1661    fn test_parser_window() {
1662        let mut lexer = Lexer::new("SELECT * FROM events WINDOW TUMBLING (SIZE 5 MINUTES)");
1663        let tokens = lexer.tokenize();
1664        let mut parser = Parser::new(tokens);
1665        let result = parser.parse_select();
1666
1667        assert!(result.is_ok());
1668        let stmt = result.unwrap();
1669        assert!(stmt.window.is_some());
1670        let window = stmt.window.unwrap();
1671        assert_eq!(window.window_type, WindowType::Tumbling);
1672        assert_eq!(window.size, Duration::from_secs(300));
1673    }
1674
1675    #[test]
1676    fn test_parser_group_by() {
1677        let mut lexer = Lexer::new("SELECT sensor_id, AVG(temp) FROM sensors GROUP BY sensor_id");
1678        let tokens = lexer.tokenize();
1679        let mut parser = Parser::new(tokens);
1680        let result = parser.parse_select();
1681
1682        assert!(result.is_ok());
1683        let stmt = result.unwrap();
1684        assert!(!stmt.group_by.is_empty());
1685    }
1686
1687    #[test]
1688    fn test_parser_join() {
1689        let mut lexer = Lexer::new("SELECT * FROM a JOIN b ON a.id = b.aid");
1690        let tokens = lexer.tokenize();
1691        let mut parser = Parser::new(tokens);
1692        let result = parser.parse_select();
1693
1694        assert!(result.is_ok());
1695        let stmt = result.unwrap();
1696        assert!(matches!(stmt.from, Some(FromClause::Join { .. })));
1697    }
1698
1699    #[tokio::test]
1700    async fn test_engine_basic() {
1701        let config = StreamSqlConfig::default();
1702        let engine = StreamSqlEngine::new(config);
1703
1704        let result = engine.execute("SELECT * FROM events").await;
1705        assert!(result.is_ok());
1706
1707        let stats = engine.get_stats().await;
1708        assert_eq!(stats.queries_executed, 1);
1709        assert_eq!(stats.queries_succeeded, 1);
1710    }
1711
1712    #[tokio::test]
1713    async fn test_engine_stream_registration() {
1714        let config = StreamSqlConfig::default();
1715        let engine = StreamSqlEngine::new(config);
1716
1717        let metadata = StreamMetadata {
1718            name: "events".to_string(),
1719            columns: vec![
1720                ColumnDefinition {
1721                    name: "id".to_string(),
1722                    data_type: DataType::Integer,
1723                    not_null: true,
1724                    default: None,
1725                },
1726                ColumnDefinition {
1727                    name: "value".to_string(),
1728                    data_type: DataType::Float,
1729                    not_null: false,
1730                    default: None,
1731                },
1732            ],
1733            properties: HashMap::new(),
1734            created_at: Utc::now(),
1735        };
1736
1737        engine.register_stream(metadata).await.unwrap();
1738
1739        let streams = engine.list_streams().await;
1740        assert_eq!(streams.len(), 1);
1741        assert!(streams.contains(&"events".to_string()));
1742
1743        let stream = engine.get_stream("events").await;
1744        assert!(stream.is_some());
1745        assert_eq!(stream.unwrap().columns.len(), 2);
1746
1747        engine.unregister_stream("events").await.unwrap();
1748        let streams = engine.list_streams().await;
1749        assert!(streams.is_empty());
1750    }
1751
1752    #[test]
1753    fn test_engine_validate() {
1754        let config = StreamSqlConfig::default();
1755        let engine = StreamSqlEngine::new(config);
1756
1757        assert!(engine.validate("SELECT * FROM events").is_ok());
1758        assert!(engine.validate("INVALID SQL").is_err());
1759    }
1760
1761    #[test]
1762    fn test_engine_explain() {
1763        let config = StreamSqlConfig::default();
1764        let engine = StreamSqlEngine::new(config);
1765
1766        let result = engine.explain("SELECT COUNT(*) FROM events WHERE value > 10");
1767        assert!(result.is_ok());
1768        let explanation = result.unwrap();
1769        assert!(!explanation.is_empty());
1770    }
1771
1772    #[tokio::test]
1773    async fn test_engine_caching() {
1774        let config = StreamSqlConfig {
1775            enable_query_cache: true,
1776            cache_size: 100,
1777            ..Default::default()
1778        };
1779        let engine = StreamSqlEngine::new(config);
1780
1781        // First execution - cache miss
1782        engine.execute("SELECT * FROM events").await.unwrap();
1783
1784        // Second execution - cache hit
1785        engine.execute("SELECT * FROM events").await.unwrap();
1786
1787        let stats = engine.get_stats().await;
1788        assert_eq!(stats.cache_misses, 1);
1789        assert_eq!(stats.cache_hits, 1);
1790    }
1791
1792    #[test]
1793    fn test_parser_complex_expression() {
1794        let mut lexer = Lexer::new(
1795            "SELECT * FROM events WHERE (value > 10 AND status = 'active') OR priority = 1",
1796        );
1797        let tokens = lexer.tokenize();
1798        let mut parser = Parser::new(tokens);
1799        let result = parser.parse_select();
1800
1801        assert!(result.is_ok());
1802        let stmt = result.unwrap();
1803        assert!(stmt.where_clause.is_some());
1804    }
1805
1806    #[test]
1807    fn test_parser_order_by() {
1808        let mut lexer = Lexer::new("SELECT * FROM events ORDER BY created_at DESC, id ASC");
1809        let tokens = lexer.tokenize();
1810        let mut parser = Parser::new(tokens);
1811        let result = parser.parse_select();
1812
1813        assert!(result.is_ok(), "Parse failed: {:?}", result);
1814        let stmt = result.unwrap();
1815        assert_eq!(stmt.order_by.len(), 2);
1816        assert!(!stmt.order_by[0].ascending);
1817        assert!(stmt.order_by[1].ascending);
1818    }
1819
1820    #[test]
1821    fn test_parser_distinct() {
1822        let mut lexer = Lexer::new("SELECT DISTINCT sensor_id FROM readings");
1823        let tokens = lexer.tokenize();
1824        let mut parser = Parser::new(tokens);
1825        let result = parser.parse_select();
1826
1827        assert!(result.is_ok());
1828        let stmt = result.unwrap();
1829        assert!(stmt.distinct);
1830    }
1831
1832    #[test]
1833    fn test_parser_sliding_window() {
1834        let mut lexer =
1835            Lexer::new("SELECT * FROM events WINDOW SLIDING (SIZE 10 SECONDS, SLIDE 5 SECONDS)");
1836        let tokens = lexer.tokenize();
1837        let mut parser = Parser::new(tokens);
1838        let result = parser.parse_select();
1839
1840        assert!(result.is_ok());
1841        let stmt = result.unwrap();
1842        assert!(stmt.window.is_some());
1843        let window = stmt.window.unwrap();
1844        assert_eq!(window.window_type, WindowType::Sliding);
1845        assert_eq!(window.size, Duration::from_secs(10));
1846        assert_eq!(window.slide, Some(Duration::from_secs(5)));
1847    }
1848}