1use anyhow::{anyhow, Result};
25use chrono::{DateTime, Utc};
26use serde::{Deserialize, Serialize};
27use std::collections::HashMap;
28use std::sync::Arc;
29use std::time::Duration;
30use tokio::sync::RwLock;
31use tracing::{debug, info};
32
33#[derive(Debug, Clone, Serialize, Deserialize)]
35pub struct StreamSqlConfig {
36 pub max_execution_time: Duration,
38 pub enable_optimization: bool,
40 pub max_memory_bytes: usize,
42 pub parallel_execution: bool,
44 pub worker_threads: usize,
46 pub enable_query_cache: bool,
48 pub cache_size: usize,
50 pub enable_query_logging: bool,
52 pub default_window_size: Duration,
54}
55
56impl Default for StreamSqlConfig {
57 fn default() -> Self {
58 Self {
59 max_execution_time: Duration::from_secs(60),
60 enable_optimization: true,
61 max_memory_bytes: 1024 * 1024 * 1024, parallel_execution: true,
63 worker_threads: 4,
64 enable_query_cache: true,
65 cache_size: 1000,
66 enable_query_logging: true,
67 default_window_size: Duration::from_secs(60),
68 }
69 }
70}
71
/// Kind of SQL statement a query represents.
///
/// The enum carries no data, so it is `Copy` and can serve as a `HashMap`/`HashSet` key.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum QueryType {
    /// `SELECT ...`
    Select,
    /// `CREATE STREAM ...`
    CreateStream,
    /// `DROP STREAM ...`
    DropStream,
    /// `INSERT ...`
    Insert,
    /// `CREATE VIEW ...`
    CreateView,
    /// `DESCRIBE ...`
    Describe,
    /// `EXPLAIN ...`
    Explain,
}
90
/// A lexical token produced by [`Lexer`].
#[derive(Debug, Clone, PartialEq)]
pub enum Token {
    // --- Keywords (recognised case-insensitively) ---
    Select,
    From,
    Where,
    Group,
    By,
    Having,
    Order,
    Limit,
    Window,
    Tumbling,
    Sliding,
    Session,
    Size,
    Slide,
    Gap,
    Create,
    Stream,
    View,
    Drop,
    Insert,
    Into,
    Values,
    As,
    And,
    Or,
    /// `NOT`, or a lone `!`.
    Not,
    In,
    Like,
    Between,
    Is,
    Null,
    Join,
    Inner,
    Left,
    Right,
    Full,
    Outer,
    On,
    Describe,
    Explain,
    Distinct,
    Case,
    When,
    Then,
    Else,
    End,

    // --- Data type names ---
    /// `INT` / `INTEGER`
    Int,
    /// `FLOAT` / `DOUBLE`
    Float,
    /// `STRING` / `VARCHAR` / `TEXT`
    String,
    /// `BOOLEAN` / `BOOL`
    Boolean,
    /// `TIMESTAMP` / `DATETIME`
    Timestamp,

    // --- Operators ---
    Plus,
    Minus,
    /// Never produced by [`Lexer`], which lexes `*` as [`Token::Star`].
    Multiply,
    Divide,
    Modulo,
    Equal,
    /// `<>` or `!=`
    NotEqual,
    LessThan,
    LessThanOrEqual,
    GreaterThan,
    GreaterThanOrEqual,

    // --- Punctuation ---
    Comma,
    Dot,
    Semicolon,
    OpenParen,
    CloseParen,
    OpenBracket,
    CloseBracket,

    // --- Identifiers and literals ---
    /// A non-keyword word, spelled exactly as in the input.
    Identifier(String),
    /// Contents of a quoted literal with escapes resolved.
    StringLiteral(String),
    /// An unsigned numeric literal (a leading `-` is a separate [`Token::Minus`]).
    NumberLiteral(f64),
    /// `TRUE` / `FALSE`
    BooleanLiteral(bool),

    // --- Aggregate function names ---
    Count,
    Sum,
    Avg,
    Min,
    Max,
    StdDev,
    /// `VARIANCE` / `VAR`
    Variance,

    /// `*`: wildcard and multiplication operator.
    Star,
    /// End of input. Also emitted for an unrecognised character (see [`Lexer::next_token`]).
    Eof,
}

/// Hand-written lexer that turns SQL text into [`Token`]s.
pub struct Lexer {
    /// Input pre-split into `char`s so the cursor indexes characters, not bytes.
    input: Vec<char>,
    /// Index of `current_char` within `input`.
    position: usize,
    /// Character under the cursor, or `None` once the input is exhausted.
    current_char: Option<char>,
}

impl Lexer {
    /// Creates a lexer positioned at the first character of `input`.
    pub fn new(input: &str) -> Self {
        let chars: Vec<char> = input.chars().collect();
        let current_char = chars.first().copied();
        Self {
            input: chars,
            position: 0,
            current_char,
        }
    }

    /// Moves one character forward; `current_char` becomes `None` past the end.
    fn advance(&mut self) {
        self.position += 1;
        self.current_char = self.input.get(self.position).copied();
    }

    /// Returns the character after `current_char` without consuming anything.
    fn peek(&self) -> Option<char> {
        self.input.get(self.position + 1).copied()
    }

    /// Consumes a (possibly empty) run of whitespace.
    fn skip_whitespace(&mut self) {
        while matches!(self.current_char, Some(c) if c.is_whitespace()) {
            self.advance();
        }
    }

    /// Reads a run of alphanumeric / underscore characters.
    fn read_identifier(&mut self) -> String {
        let start = self.position;
        while matches!(self.current_char, Some(c) if c.is_alphanumeric() || c == '_') {
            self.advance();
        }
        self.input[start..self.position].iter().collect()
    }

    /// Reads a numeric literal of ASCII digits containing at most one decimal point.
    ///
    /// A second `.` ends the literal, so `1.2.3` lexes as `1.2`, `.`, `3`. Previously the
    /// whole run was consumed and the unparsable text collapsed silently to `0.0`.
    fn read_number(&mut self) -> f64 {
        let mut text = String::new();
        let mut seen_dot = false;
        while let Some(c) = self.current_char {
            if c.is_ascii_digit() {
                text.push(c);
            } else if c == '.' && !seen_dot {
                seen_dot = true;
                text.push(c);
            } else {
                break;
            }
            self.advance();
        }
        // `digits[.digits]` (starting with a digit) is always valid `f64` syntax, so the
        // fallback is only a safety net.
        text.parse().unwrap_or(0.0)
    }

    /// Reads a quoted string literal; the opening quote is the current character.
    ///
    /// Inside the literal, a backslash escapes the following character, and a doubled
    /// quote (`'O''Brien'`) stands for one literal quote, as in standard SQL. An
    /// unterminated literal runs to the end of input.
    fn read_string(&mut self) -> String {
        let quote = self
            .current_char
            .expect("current_char should be Some when read_string is called");
        self.advance();
        let mut result = String::new();
        while let Some(c) = self.current_char {
            if c == quote {
                if self.peek() == Some(quote) {
                    // Doubled quote: keep one, skip both.
                    result.push(quote);
                    self.advance();
                    self.advance();
                    continue;
                }
                self.advance();
                break;
            } else if c == '\\' {
                self.advance();
                if let Some(escaped) = self.current_char {
                    result.push(escaped);
                    self.advance();
                }
            } else {
                result.push(c);
                self.advance();
            }
        }
        result
    }

    /// Maps an upper-cased word to its keyword token, or `None` for a plain identifier.
    fn keyword(upper: &str) -> Option<Token> {
        let token = match upper {
            "SELECT" => Token::Select,
            "FROM" => Token::From,
            "WHERE" => Token::Where,
            "GROUP" => Token::Group,
            "BY" => Token::By,
            "HAVING" => Token::Having,
            "ORDER" => Token::Order,
            "LIMIT" => Token::Limit,
            "WINDOW" => Token::Window,
            "TUMBLING" => Token::Tumbling,
            "SLIDING" => Token::Sliding,
            "SESSION" => Token::Session,
            "SIZE" => Token::Size,
            "SLIDE" => Token::Slide,
            "GAP" => Token::Gap,
            "CREATE" => Token::Create,
            "STREAM" => Token::Stream,
            "VIEW" => Token::View,
            "DROP" => Token::Drop,
            "INSERT" => Token::Insert,
            "INTO" => Token::Into,
            "VALUES" => Token::Values,
            "AS" => Token::As,
            "AND" => Token::And,
            "OR" => Token::Or,
            "NOT" => Token::Not,
            "IN" => Token::In,
            "LIKE" => Token::Like,
            "BETWEEN" => Token::Between,
            "IS" => Token::Is,
            "NULL" => Token::Null,
            "JOIN" => Token::Join,
            "INNER" => Token::Inner,
            "LEFT" => Token::Left,
            "RIGHT" => Token::Right,
            "FULL" => Token::Full,
            "OUTER" => Token::Outer,
            "ON" => Token::On,
            "DESCRIBE" => Token::Describe,
            "EXPLAIN" => Token::Explain,
            "DISTINCT" => Token::Distinct,
            "CASE" => Token::Case,
            "WHEN" => Token::When,
            "THEN" => Token::Then,
            "ELSE" => Token::Else,
            "END" => Token::End,
            "INT" | "INTEGER" => Token::Int,
            "FLOAT" | "DOUBLE" => Token::Float,
            "STRING" | "VARCHAR" | "TEXT" => Token::String,
            "BOOLEAN" | "BOOL" => Token::Boolean,
            "TIMESTAMP" | "DATETIME" => Token::Timestamp,
            "COUNT" => Token::Count,
            "SUM" => Token::Sum,
            "AVG" => Token::Avg,
            "MIN" => Token::Min,
            "MAX" => Token::Max,
            "STDDEV" => Token::StdDev,
            "VARIANCE" | "VAR" => Token::Variance,
            "TRUE" => Token::BooleanLiteral(true),
            "FALSE" => Token::BooleanLiteral(false),
            _ => return None,
        };
        Some(token)
    }

    /// Produces the next token, skipping leading whitespace.
    ///
    /// Returns [`Token::Eof`] at end of input. NOTE(review): an unrecognised character is
    /// consumed and also reported as [`Token::Eof`], so [`Lexer::tokenize`] stops there and
    /// any text after it is silently dropped. Reporting it properly needs a new token
    /// variant or a fallible API.
    pub fn next_token(&mut self) -> Token {
        self.skip_whitespace();

        let c = match self.current_char {
            Some(c) => c,
            None => return Token::Eof,
        };

        if c.is_alphabetic() || c == '_' {
            let word = self.read_identifier();
            return Self::keyword(&word.to_uppercase()).unwrap_or(Token::Identifier(word));
        }
        if c.is_ascii_digit() {
            return Token::NumberLiteral(self.read_number());
        }
        if c == '\'' || c == '"' {
            return Token::StringLiteral(self.read_string());
        }

        // Everything else is punctuation; consume its first character up front.
        self.advance();
        match c {
            '+' => Token::Plus,
            '-' => Token::Minus,
            '*' => Token::Star,
            '/' => Token::Divide,
            '%' => Token::Modulo,
            '=' => Token::Equal,
            '<' => match self.current_char {
                Some('=') => {
                    self.advance();
                    Token::LessThanOrEqual
                }
                Some('>') => {
                    self.advance();
                    Token::NotEqual
                }
                _ => Token::LessThan,
            },
            '>' => {
                if self.current_char == Some('=') {
                    self.advance();
                    Token::GreaterThanOrEqual
                } else {
                    Token::GreaterThan
                }
            }
            '!' => {
                if self.current_char == Some('=') {
                    self.advance();
                    Token::NotEqual
                } else {
                    Token::Not
                }
            }
            ',' => Token::Comma,
            '.' => Token::Dot,
            ';' => Token::Semicolon,
            '(' => Token::OpenParen,
            ')' => Token::CloseParen,
            '[' => Token::OpenBracket,
            ']' => Token::CloseBracket,
            _ => Token::Eof,
        }
    }

    /// Lexes the input up to and including the first [`Token::Eof`].
    pub fn tokenize(&mut self) -> Vec<Token> {
        let mut tokens = Vec::new();
        loop {
            let token = self.next_token();
            let done = token == Token::Eof;
            tokens.push(token);
            if done {
                return tokens;
            }
        }
    }
}
469
/// Expression node of the SQL abstract syntax tree.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub enum Expression {
    /// Unqualified column reference, e.g. `temp`.
    Column(String),
    /// Qualified column reference `(qualifier, column)`, e.g. `s.temp`.
    QualifiedColumn(String, String),
    /// Constant value.
    Literal(SqlValue),
    /// Binary operation `left op right`.
    BinaryOp {
        left: Box<Expression>,
        op: BinaryOperator,
        right: Box<Expression>,
    },
    /// Prefix operation such as `NOT x` or `-x`.
    UnaryOp {
        op: UnaryOperator,
        expr: Box<Expression>,
    },
    /// Call of a non-aggregate function by name.
    Function {
        name: String,
        args: Vec<Expression>,
        /// Whether `DISTINCT` applied to the arguments.
        distinct: bool,
    },
    /// Aggregate call such as `COUNT(*)` or `AVG(DISTINCT x)`.
    Aggregate {
        func: AggregateFunction,
        expr: Box<Expression>,
        distinct: bool,
    },
    /// `CASE [operand] WHEN ... THEN ... [ELSE ...] END`.
    Case {
        /// Optional operand compared against each `WHEN` value (presumably the "simple" CASE form).
        operand: Option<Box<Expression>>,
        /// `(WHEN, THEN)` pairs.
        when_clauses: Vec<(Expression, Expression)>,
        else_clause: Option<Box<Expression>>,
    },
    /// A nested `SELECT` used as an expression.
    Subquery(Box<SelectStatement>),
    /// `expr [NOT] IN (list)`.
    InList {
        expr: Box<Expression>,
        list: Vec<Expression>,
        negated: bool,
    },
    /// `expr [NOT] BETWEEN low AND high`.
    Between {
        expr: Box<Expression>,
        low: Box<Expression>,
        high: Box<Expression>,
        negated: bool,
    },
    /// `expr IS [NOT] NULL`.
    IsNull {
        expr: Box<Expression>,
        negated: bool,
    },
    /// The `*` wildcard (also the argument of `COUNT(*)`).
    Star,
}

/// A SQL scalar value.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub enum SqlValue {
    Null,
    Integer(i64),
    Float(f64),
    String(String),
    Boolean(bool),
    /// Point in time, stored in UTC.
    Timestamp(DateTime<Utc>),
}
542
/// Infix operators usable in [`Expression::BinaryOp`].
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub enum BinaryOperator {
    Plus,
    Minus,
    Multiply,
    Divide,
    Modulo,
    Equal,
    NotEqual,
    LessThan,
    LessThanOrEqual,
    GreaterThan,
    GreaterThanOrEqual,
    And,
    Or,
    /// SQL `LIKE` pattern match.
    Like,
}

/// Prefix operators usable in [`Expression::UnaryOp`].
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub enum UnaryOperator {
    /// Logical negation.
    Not,
    /// Arithmetic negation.
    Minus,
}

/// Aggregate functions recognised by name (`COUNT`, `SUM`, ...).
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub enum AggregateFunction {
    Count,
    Sum,
    Avg,
    Min,
    Max,
    StdDev,
    Variance,
}

/// Window clause of a streaming `SELECT`.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct WindowSpec {
    /// Windowing strategy.
    pub window_type: WindowType,
    /// Window length (`SIZE`).
    pub size: Duration,
    /// Advance between consecutive windows (`SLIDE`), if given.
    pub slide: Option<Duration>,
    /// Inactivity gap (`GAP`), if given.
    pub gap: Option<Duration>,
    /// Column providing event time; the parser in this file always leaves it `None`.
    pub time_attribute: Option<String>,
}

/// Windowing strategy named in a `WINDOW` clause.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub enum WindowType {
    Tumbling,
    Sliding,
    Session,
}
603
/// One entry of a `SELECT` projection list.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct SelectItem {
    /// Projected expression.
    pub expr: Expression,
    /// Output name from `expr AS alias` or `expr alias`.
    pub alias: Option<String>,
}

/// Source of rows in a `FROM` clause.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub enum FromClause {
    /// Named stream/table with an optional alias.
    Table { name: String, alias: Option<String> },
    /// Join of two sources; `condition` holds the `ON` expression when present.
    Join {
        left: Box<FromClause>,
        right: Box<FromClause>,
        join_type: JoinType,
        condition: Option<Expression>,
    },
    /// Aliased subquery in `FROM`.
    Subquery {
        query: Box<SelectStatement>,
        alias: String,
    },
}

/// Join flavour.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub enum JoinType {
    Inner,
    Left,
    Right,
    Full,
    Cross,
}

/// One entry of an `ORDER BY` list.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct OrderByItem {
    /// Sort key.
    pub expr: Expression,
    /// `true` for `ASC` (the default), `false` for `DESC`.
    pub ascending: bool,
    /// Explicit `NULLS FIRST` (`Some(true)`) / `NULLS LAST` (`Some(false)`) placement.
    pub nulls_first: Option<bool>,
}

/// A parsed `SELECT` statement.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct SelectStatement {
    /// `SELECT DISTINCT`.
    pub distinct: bool,
    /// Projection list.
    pub columns: Vec<SelectItem>,
    /// `FROM` source, if any.
    pub from: Option<FromClause>,
    /// `WHERE` predicate.
    pub where_clause: Option<Expression>,
    /// `GROUP BY` keys (empty when absent).
    pub group_by: Vec<Expression>,
    /// `HAVING` predicate.
    pub having: Option<Expression>,
    /// `ORDER BY` keys (empty when absent).
    pub order_by: Vec<OrderByItem>,
    /// `LIMIT` row count.
    pub limit: Option<usize>,
    /// Number of rows to skip (`OFFSET`).
    pub offset: Option<usize>,
    /// Streaming window clause.
    pub window: Option<WindowSpec>,
}

/// A parsed `CREATE STREAM` statement.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct CreateStreamStatement {
    /// Stream name.
    pub name: String,
    /// Column schema.
    pub columns: Vec<ColumnDefinition>,
    /// Free-form key/value properties.
    pub properties: HashMap<String, String>,
}

/// A column in a stream schema.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct ColumnDefinition {
    /// Column name.
    pub name: String,
    /// Declared type.
    pub data_type: DataType,
    /// Whether the column was declared `NOT NULL`.
    pub not_null: bool,
    /// Default value expression, if declared.
    pub default: Option<Expression>,
}

/// Column data types.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub enum DataType {
    Integer,
    Float,
    String,
    Boolean,
    Timestamp,
    /// Homogeneous array of the inner type.
    Array(Box<DataType>),
    /// Map from key type to value type.
    Map(Box<DataType>, Box<DataType>),
}
713
/// Recursive-descent parser over a token stream (normally from [`Lexer::tokenize`]).
pub struct Parser {
    /// Tokens to parse; reads past the end behave as [`Token::Eof`].
    tokens: Vec<Token>,
    /// Index of the current token in `tokens`.
    position: usize,
}
719
720impl Parser {
721 pub fn new(tokens: Vec<Token>) -> Self {
723 Self {
724 tokens,
725 position: 0,
726 }
727 }
728
729 fn current_token(&self) -> &Token {
731 self.tokens.get(self.position).unwrap_or(&Token::Eof)
732 }
733
734 fn peek_token(&self) -> &Token {
736 self.tokens.get(self.position + 1).unwrap_or(&Token::Eof)
737 }
738
739 fn advance(&mut self) {
741 self.position += 1;
742 }
743
744 fn expect(&mut self, expected: Token) -> Result<()> {
746 if self.current_token() == &expected {
747 self.advance();
748 Ok(())
749 } else {
750 Err(anyhow!(
751 "Expected {:?}, got {:?}",
752 expected,
753 self.current_token()
754 ))
755 }
756 }
757
758 pub fn parse_select(&mut self) -> Result<SelectStatement> {
760 self.expect(Token::Select)?;
761
762 let distinct = if self.current_token() == &Token::Distinct {
764 self.advance();
765 true
766 } else {
767 false
768 };
769
770 let columns = self.parse_select_list()?;
772
773 let from = if self.current_token() == &Token::From {
775 self.advance();
776 Some(self.parse_from_clause()?)
777 } else {
778 None
779 };
780
781 let window = if self.current_token() == &Token::Window {
783 self.advance();
784 Some(self.parse_window_spec()?)
785 } else {
786 None
787 };
788
789 let where_clause = if self.current_token() == &Token::Where {
791 self.advance();
792 Some(self.parse_expression()?)
793 } else {
794 None
795 };
796
797 let group_by = if self.current_token() == &Token::Group {
799 self.advance();
800 if self.current_token() == &Token::By {
802 self.advance();
803 }
804 self.parse_expression_list()?
805 } else {
806 Vec::new()
807 };
808
809 let having = if self.current_token() == &Token::Having {
811 self.advance();
812 Some(self.parse_expression()?)
813 } else {
814 None
815 };
816
817 let order_by = if self.current_token() == &Token::Order {
819 self.advance();
820 if self.current_token() == &Token::By {
822 self.advance();
823 }
824 self.parse_order_by_list()?
825 } else {
826 Vec::new()
827 };
828
829 let limit = if self.current_token() == &Token::Limit {
831 self.advance();
832 if let Token::NumberLiteral(n) = self.current_token() {
833 let limit = *n as usize;
834 self.advance();
835 Some(limit)
836 } else {
837 None
838 }
839 } else {
840 None
841 };
842
843 Ok(SelectStatement {
844 distinct,
845 columns,
846 from,
847 where_clause,
848 group_by,
849 having,
850 order_by,
851 limit,
852 offset: None,
853 window,
854 })
855 }
856
857 fn parse_select_list(&mut self) -> Result<Vec<SelectItem>> {
859 let mut items = Vec::new();
860
861 loop {
862 let expr = self.parse_expression()?;
863
864 let alias = if self.current_token() == &Token::As {
866 self.advance();
867 if let Token::Identifier(name) = self.current_token().clone() {
868 self.advance();
869 Some(name)
870 } else {
871 None
872 }
873 } else if let Token::Identifier(name) = self.current_token().clone() {
874 if name.to_uppercase() != "FROM"
876 && name.to_uppercase() != "WHERE"
877 && name.to_uppercase() != "GROUP"
878 && name.to_uppercase() != "ORDER"
879 && name.to_uppercase() != "WINDOW"
880 {
881 self.advance();
882 Some(name)
883 } else {
884 None
885 }
886 } else {
887 None
888 };
889
890 items.push(SelectItem { expr, alias });
891
892 if self.current_token() != &Token::Comma {
893 break;
894 }
895 self.advance(); }
897
898 Ok(items)
899 }
900
901 fn parse_from_clause(&mut self) -> Result<FromClause> {
903 let mut from = self.parse_table_reference()?;
904
905 while matches!(
907 self.current_token(),
908 Token::Join | Token::Inner | Token::Left | Token::Right | Token::Full
909 ) {
910 let join_type = self.parse_join_type()?;
911 let right = self.parse_table_reference()?;
912
913 let condition = if self.current_token() == &Token::On {
914 self.advance();
915 Some(self.parse_expression()?)
916 } else {
917 None
918 };
919
920 from = FromClause::Join {
921 left: Box::new(from),
922 right: Box::new(right),
923 join_type,
924 condition,
925 };
926 }
927
928 Ok(from)
929 }
930
931 fn parse_table_reference(&mut self) -> Result<FromClause> {
933 if let Token::Identifier(name) = self.current_token().clone() {
934 self.advance();
935
936 let alias = if self.current_token() == &Token::As {
937 self.advance();
938 if let Token::Identifier(alias) = self.current_token().clone() {
939 self.advance();
940 Some(alias)
941 } else {
942 None
943 }
944 } else if let Token::Identifier(alias) = self.current_token().clone() {
945 if !matches!(
947 alias.to_uppercase().as_str(),
948 "WHERE"
949 | "GROUP"
950 | "ORDER"
951 | "HAVING"
952 | "LIMIT"
953 | "JOIN"
954 | "INNER"
955 | "LEFT"
956 | "RIGHT"
957 | "FULL"
958 | "ON"
959 | "WINDOW"
960 ) {
961 self.advance();
962 Some(alias)
963 } else {
964 None
965 }
966 } else {
967 None
968 };
969
970 Ok(FromClause::Table { name, alias })
971 } else {
972 Err(anyhow!("Expected table name"))
973 }
974 }
975
976 fn parse_join_type(&mut self) -> Result<JoinType> {
978 let join_type = match self.current_token() {
979 Token::Inner => {
980 self.advance();
981 JoinType::Inner
982 }
983 Token::Left => {
984 self.advance();
985 if self.current_token() == &Token::Outer {
986 self.advance();
987 }
988 JoinType::Left
989 }
990 Token::Right => {
991 self.advance();
992 if self.current_token() == &Token::Outer {
993 self.advance();
994 }
995 JoinType::Right
996 }
997 Token::Full => {
998 self.advance();
999 if self.current_token() == &Token::Outer {
1000 self.advance();
1001 }
1002 JoinType::Full
1003 }
1004 _ => JoinType::Inner,
1005 };
1006
1007 if self.current_token() == &Token::Join {
1009 self.advance();
1010 }
1011
1012 Ok(join_type)
1013 }
1014
1015 fn parse_window_spec(&mut self) -> Result<WindowSpec> {
1017 let window_type = match self.current_token() {
1018 Token::Tumbling => {
1019 self.advance();
1020 WindowType::Tumbling
1021 }
1022 Token::Sliding => {
1023 self.advance();
1024 WindowType::Sliding
1025 }
1026 Token::Session => {
1027 self.advance();
1028 WindowType::Session
1029 }
1030 _ => WindowType::Tumbling,
1031 };
1032
1033 self.expect(Token::OpenParen)?;
1034
1035 let mut size = Duration::from_secs(60);
1036 let mut slide = None;
1037 let mut gap = None;
1038
1039 while self.current_token() != &Token::CloseParen {
1041 match self.current_token() {
1042 Token::Size => {
1043 self.advance();
1044 size = self.parse_duration()?;
1045 }
1046 Token::Slide => {
1047 self.advance();
1048 slide = Some(self.parse_duration()?);
1049 }
1050 Token::Gap => {
1051 self.advance();
1052 gap = Some(self.parse_duration()?);
1053 }
1054 Token::Comma => {
1055 self.advance();
1056 }
1057 _ => {
1058 self.advance();
1059 }
1060 }
1061 }
1062
1063 self.expect(Token::CloseParen)?;
1064
1065 Ok(WindowSpec {
1066 window_type,
1067 size,
1068 slide,
1069 gap,
1070 time_attribute: None,
1071 })
1072 }
1073
1074 fn parse_duration(&mut self) -> Result<Duration> {
1076 let value = if let Token::NumberLiteral(n) = self.current_token() {
1077 let v = *n as u64;
1078 self.advance();
1079 v
1080 } else {
1081 return Err(anyhow!("Expected number for duration"));
1082 };
1083
1084 let unit = if let Token::Identifier(unit) = self.current_token().clone() {
1085 self.advance();
1086 unit.to_uppercase()
1087 } else {
1088 "SECONDS".to_string()
1089 };
1090
1091 let duration = match unit.as_str() {
1092 "MILLISECONDS" | "MILLIS" | "MS" => Duration::from_millis(value),
1093 "SECONDS" | "SECOND" | "S" => Duration::from_secs(value),
1094 "MINUTES" | "MINUTE" | "M" => Duration::from_secs(value * 60),
1095 "HOURS" | "HOUR" | "H" => Duration::from_secs(value * 3600),
1096 "DAYS" | "DAY" | "D" => Duration::from_secs(value * 86400),
1097 _ => Duration::from_secs(value),
1098 };
1099
1100 Ok(duration)
1101 }
1102
1103 fn parse_expression(&mut self) -> Result<Expression> {
1105 self.parse_or_expression()
1106 }
1107
1108 fn parse_or_expression(&mut self) -> Result<Expression> {
1110 let mut left = self.parse_and_expression()?;
1111
1112 while self.current_token() == &Token::Or {
1113 self.advance();
1114 let right = self.parse_and_expression()?;
1115 left = Expression::BinaryOp {
1116 left: Box::new(left),
1117 op: BinaryOperator::Or,
1118 right: Box::new(right),
1119 };
1120 }
1121
1122 Ok(left)
1123 }
1124
1125 fn parse_and_expression(&mut self) -> Result<Expression> {
1127 let mut left = self.parse_comparison_expression()?;
1128
1129 while self.current_token() == &Token::And {
1130 self.advance();
1131 let right = self.parse_comparison_expression()?;
1132 left = Expression::BinaryOp {
1133 left: Box::new(left),
1134 op: BinaryOperator::And,
1135 right: Box::new(right),
1136 };
1137 }
1138
1139 Ok(left)
1140 }
1141
1142 fn parse_comparison_expression(&mut self) -> Result<Expression> {
1144 let left = self.parse_additive_expression()?;
1145
1146 let op = match self.current_token() {
1147 Token::Equal => Some(BinaryOperator::Equal),
1148 Token::NotEqual => Some(BinaryOperator::NotEqual),
1149 Token::LessThan => Some(BinaryOperator::LessThan),
1150 Token::LessThanOrEqual => Some(BinaryOperator::LessThanOrEqual),
1151 Token::GreaterThan => Some(BinaryOperator::GreaterThan),
1152 Token::GreaterThanOrEqual => Some(BinaryOperator::GreaterThanOrEqual),
1153 Token::Like => Some(BinaryOperator::Like),
1154 _ => None,
1155 };
1156
1157 if let Some(op) = op {
1158 self.advance();
1159 let right = self.parse_additive_expression()?;
1160 Ok(Expression::BinaryOp {
1161 left: Box::new(left),
1162 op,
1163 right: Box::new(right),
1164 })
1165 } else {
1166 Ok(left)
1167 }
1168 }
1169
1170 fn parse_additive_expression(&mut self) -> Result<Expression> {
1172 let mut left = self.parse_multiplicative_expression()?;
1173
1174 loop {
1175 let op = match self.current_token() {
1176 Token::Plus => Some(BinaryOperator::Plus),
1177 Token::Minus => Some(BinaryOperator::Minus),
1178 _ => None,
1179 };
1180
1181 if let Some(op) = op {
1182 self.advance();
1183 let right = self.parse_multiplicative_expression()?;
1184 left = Expression::BinaryOp {
1185 left: Box::new(left),
1186 op,
1187 right: Box::new(right),
1188 };
1189 } else {
1190 break;
1191 }
1192 }
1193
1194 Ok(left)
1195 }
1196
1197 fn parse_multiplicative_expression(&mut self) -> Result<Expression> {
1199 let mut left = self.parse_unary_expression()?;
1200
1201 loop {
1202 let op = match self.current_token() {
1203 Token::Multiply | Token::Star => Some(BinaryOperator::Multiply),
1204 Token::Divide => Some(BinaryOperator::Divide),
1205 Token::Modulo => Some(BinaryOperator::Modulo),
1206 _ => None,
1207 };
1208
1209 if let Some(op) = op {
1210 self.advance();
1211 let right = self.parse_unary_expression()?;
1212 left = Expression::BinaryOp {
1213 left: Box::new(left),
1214 op,
1215 right: Box::new(right),
1216 };
1217 } else {
1218 break;
1219 }
1220 }
1221
1222 Ok(left)
1223 }
1224
1225 fn parse_unary_expression(&mut self) -> Result<Expression> {
1227 match self.current_token() {
1228 Token::Not => {
1229 self.advance();
1230 let expr = self.parse_unary_expression()?;
1231 Ok(Expression::UnaryOp {
1232 op: UnaryOperator::Not,
1233 expr: Box::new(expr),
1234 })
1235 }
1236 Token::Minus => {
1237 self.advance();
1238 let expr = self.parse_unary_expression()?;
1239 Ok(Expression::UnaryOp {
1240 op: UnaryOperator::Minus,
1241 expr: Box::new(expr),
1242 })
1243 }
1244 _ => self.parse_primary_expression(),
1245 }
1246 }
1247
1248 fn parse_primary_expression(&mut self) -> Result<Expression> {
1250 match self.current_token().clone() {
1251 Token::Star => {
1252 self.advance();
1253 Ok(Expression::Star)
1254 }
1255 Token::NumberLiteral(n) => {
1256 self.advance();
1257 if n.fract() == 0.0 {
1258 Ok(Expression::Literal(SqlValue::Integer(n as i64)))
1259 } else {
1260 Ok(Expression::Literal(SqlValue::Float(n)))
1261 }
1262 }
1263 Token::StringLiteral(s) => {
1264 self.advance();
1265 Ok(Expression::Literal(SqlValue::String(s)))
1266 }
1267 Token::BooleanLiteral(b) => {
1268 self.advance();
1269 Ok(Expression::Literal(SqlValue::Boolean(b)))
1270 }
1271 Token::Null => {
1272 self.advance();
1273 Ok(Expression::Literal(SqlValue::Null))
1274 }
1275 Token::Count
1276 | Token::Sum
1277 | Token::Avg
1278 | Token::Min
1279 | Token::Max
1280 | Token::StdDev
1281 | Token::Variance => {
1282 let func = match self.current_token() {
1283 Token::Count => AggregateFunction::Count,
1284 Token::Sum => AggregateFunction::Sum,
1285 Token::Avg => AggregateFunction::Avg,
1286 Token::Min => AggregateFunction::Min,
1287 Token::Max => AggregateFunction::Max,
1288 Token::StdDev => AggregateFunction::StdDev,
1289 Token::Variance => AggregateFunction::Variance,
1290 _ => unreachable!(),
1291 };
1292 self.advance();
1293 self.expect(Token::OpenParen)?;
1294
1295 let distinct = if self.current_token() == &Token::Distinct {
1296 self.advance();
1297 true
1298 } else {
1299 false
1300 };
1301
1302 let expr = self.parse_expression()?;
1303 self.expect(Token::CloseParen)?;
1304
1305 Ok(Expression::Aggregate {
1306 func,
1307 expr: Box::new(expr),
1308 distinct,
1309 })
1310 }
1311 Token::Identifier(name) => {
1312 self.advance();
1313
1314 if self.current_token() == &Token::OpenParen {
1316 self.advance();
1317 let mut args = Vec::new();
1318
1319 if self.current_token() != &Token::CloseParen {
1320 loop {
1321 args.push(self.parse_expression()?);
1322 if self.current_token() != &Token::Comma {
1323 break;
1324 }
1325 self.advance();
1326 }
1327 }
1328
1329 self.expect(Token::CloseParen)?;
1330
1331 Ok(Expression::Function {
1332 name,
1333 args,
1334 distinct: false,
1335 })
1336 } else if self.current_token() == &Token::Dot {
1337 self.advance();
1339 if let Token::Identifier(column) = self.current_token().clone() {
1340 self.advance();
1341 Ok(Expression::QualifiedColumn(name, column))
1342 } else {
1343 Ok(Expression::Column(name))
1344 }
1345 } else {
1346 Ok(Expression::Column(name))
1347 }
1348 }
1349 Token::OpenParen => {
1350 self.advance();
1351 let expr = self.parse_expression()?;
1352 self.expect(Token::CloseParen)?;
1353 Ok(expr)
1354 }
1355 _ => Err(anyhow!("Unexpected token: {:?}", self.current_token())),
1356 }
1357 }
1358
1359 fn parse_expression_list(&mut self) -> Result<Vec<Expression>> {
1361 let mut exprs = Vec::new();
1362
1363 loop {
1364 exprs.push(self.parse_expression()?);
1365 if self.current_token() != &Token::Comma {
1366 break;
1367 }
1368 self.advance();
1369 }
1370
1371 Ok(exprs)
1372 }
1373
1374 fn parse_order_by_list(&mut self) -> Result<Vec<OrderByItem>> {
1376 let mut items = Vec::new();
1377
1378 loop {
1379 let expr = self.parse_expression()?;
1380
1381 let ascending = if let Token::Identifier(dir) = self.current_token().clone() {
1382 match dir.to_uppercase().as_str() {
1383 "ASC" => {
1384 self.advance();
1385 true
1386 }
1387 "DESC" => {
1388 self.advance();
1389 false
1390 }
1391 _ => true,
1392 }
1393 } else {
1394 true
1395 };
1396
1397 items.push(OrderByItem {
1398 expr,
1399 ascending,
1400 nulls_first: None,
1401 });
1402
1403 if self.current_token() != &Token::Comma {
1404 break;
1405 }
1406 self.advance();
1407 }
1408
1409 Ok(items)
1410 }
1411}
1412
/// One output row of a query.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ResultRow {
    /// Values in the same order as [`QueryResult::columns`].
    pub values: Vec<SqlValue>,
}

/// Outcome of [`StreamSqlEngine::execute`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QueryResult {
    /// Output column names.
    pub columns: Vec<String>,
    /// Output rows.
    pub rows: Vec<ResultRow>,
    /// Wall-clock time spent handling the query.
    pub execution_time: Duration,
    /// Number of rows affected.
    pub rows_affected: usize,
}

/// Front end that parses streaming SQL and tracks registered streams and statistics.
pub struct StreamSqlEngine {
    /// Engine configuration.
    config: StreamSqlConfig,
    /// Registered streams keyed by name.
    streams: Arc<RwLock<HashMap<String, StreamMetadata>>>,
    /// Parsed statements keyed by their exact SQL text.
    query_cache: Arc<RwLock<HashMap<String, SelectStatement>>>,
    /// Running counters exposed via `get_stats`.
    stats: Arc<RwLock<StreamSqlStats>>,
}

/// Schema and properties of a registered stream.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StreamMetadata {
    /// Unique stream name (the registry key).
    pub name: String,
    /// Column schema.
    pub columns: Vec<ColumnDefinition>,
    /// Free-form key/value properties.
    pub properties: HashMap<String, String>,
    /// Creation time.
    pub created_at: DateTime<Utc>,
}

/// Counters maintained by [`StreamSqlEngine`].
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct StreamSqlStats {
    /// Queries passed to `execute`.
    pub queries_executed: u64,
    /// Queries that completed successfully.
    pub queries_succeeded: u64,
    /// Queries that failed (NOTE(review): confirm which code paths increment this).
    pub queries_failed: u64,
    /// Running mean of query execution time, in milliseconds.
    pub avg_execution_time_ms: f64,
    /// Statement-cache lookups that found an entry.
    pub cache_hits: u64,
    /// Statement-cache lookups that found nothing.
    pub cache_misses: u64,
}
1474
1475impl StreamSqlEngine {
1476 pub fn new(config: StreamSqlConfig) -> Self {
1478 Self {
1479 config,
1480 streams: Arc::new(RwLock::new(HashMap::new())),
1481 query_cache: Arc::new(RwLock::new(HashMap::new())),
1482 stats: Arc::new(RwLock::new(StreamSqlStats::default())),
1483 }
1484 }
1485
1486 pub fn parse(&self, sql: &str) -> Result<SelectStatement> {
1488 let mut lexer = Lexer::new(sql);
1489 let tokens = lexer.tokenize();
1490 let mut parser = Parser::new(tokens);
1491 parser.parse_select()
1492 }
1493
1494 pub async fn execute(&self, sql: &str) -> Result<QueryResult> {
1496 let start_time = std::time::Instant::now();
1497
1498 if self.config.enable_query_cache {
1500 let cache = self.query_cache.read().await;
1501 if cache.contains_key(sql) {
1502 let mut stats = self.stats.write().await;
1503 stats.cache_hits += 1;
1504 debug!("Query cache hit");
1505 } else {
1506 let mut stats = self.stats.write().await;
1507 stats.cache_misses += 1;
1508 }
1509 }
1510
1511 let statement = self.parse(sql)?;
1513
1514 if self.config.enable_query_cache {
1516 let mut cache = self.query_cache.write().await;
1517 if cache.len() < self.config.cache_size {
1518 cache.insert(sql.to_string(), statement.clone());
1519 }
1520 }
1521
1522 let result = QueryResult {
1524 columns: statement
1525 .columns
1526 .iter()
1527 .map(|c| c.alias.clone().unwrap_or_else(|| format!("column_{}", 0)))
1528 .collect(),
1529 rows: Vec::new(),
1530 execution_time: start_time.elapsed(),
1531 rows_affected: 0,
1532 };
1533
1534 let mut stats = self.stats.write().await;
1536 stats.queries_executed += 1;
1537 stats.queries_succeeded += 1;
1538 stats.avg_execution_time_ms = (stats.avg_execution_time_ms
1539 * (stats.queries_executed - 1) as f64
1540 + result.execution_time.as_millis() as f64)
1541 / stats.queries_executed as f64;
1542
1543 if self.config.enable_query_logging {
1544 info!(
1545 "Executed query in {:?}: {}",
1546 result.execution_time,
1547 &sql[..sql.len().min(100)]
1548 );
1549 }
1550
1551 Ok(result)
1552 }
1553
1554 pub async fn register_stream(&self, metadata: StreamMetadata) -> Result<()> {
1556 let mut streams = self.streams.write().await;
1557 info!("Registering stream: {}", metadata.name);
1558 streams.insert(metadata.name.clone(), metadata);
1559 Ok(())
1560 }
1561
1562 pub async fn unregister_stream(&self, name: &str) -> Result<()> {
1564 let mut streams = self.streams.write().await;
1565 if streams.remove(name).is_some() {
1566 info!("Unregistered stream: {}", name);
1567 Ok(())
1568 } else {
1569 Err(anyhow!("Stream not found: {}", name))
1570 }
1571 }
1572
1573 pub async fn get_stream(&self, name: &str) -> Option<StreamMetadata> {
1575 let streams = self.streams.read().await;
1576 streams.get(name).cloned()
1577 }
1578
1579 pub async fn list_streams(&self) -> Vec<String> {
1581 let streams = self.streams.read().await;
1582 streams.keys().cloned().collect()
1583 }
1584
1585 pub async fn get_stats(&self) -> StreamSqlStats {
1587 self.stats.read().await.clone()
1588 }
1589
1590 pub async fn clear_cache(&self) {
1592 let mut cache = self.query_cache.write().await;
1593 cache.clear();
1594 info!("Query cache cleared");
1595 }
1596
1597 pub fn validate(&self, sql: &str) -> Result<()> {
1599 self.parse(sql)?;
1600 Ok(())
1601 }
1602
1603 pub fn explain(&self, sql: &str) -> Result<String> {
1605 let statement = self.parse(sql)?;
1606 Ok(format!("{:#?}", statement))
1607 }
1608}
1609
#[cfg(test)]
mod tests {
    use super::*;

    /// Lexes `sql` and wraps the resulting token stream in a fresh [`Parser`].
    ///
    /// Each parser test then only has to state its query and expectations.
    fn parser_for(sql: &str) -> Parser {
        Parser::new(Lexer::new(sql).tokenize())
    }

    #[test]
    fn test_lexer_basic() {
        let tokens = Lexer::new("SELECT * FROM events").tokenize();

        // Comparing the whole stream at once prints every token on failure.
        assert_eq!(
            tokens,
            vec![
                Token::Select,
                Token::Star,
                Token::From,
                Token::Identifier("events".to_string()),
                Token::Eof,
            ]
        );
    }

    #[test]
    fn test_lexer_with_literals() {
        let tokens = Lexer::new("SELECT name, 42, 'hello' FROM events").tokenize();

        // Expected layout: SELECT name , 42 , 'hello' FROM events <EOF>
        assert_eq!(tokens[0], Token::Select);
        assert_eq!(tokens[1], Token::Identifier("name".to_string()));
        assert_eq!(tokens[2], Token::Comma);
        assert!(
            matches!(tokens[3], Token::NumberLiteral(_)),
            "expected a number literal, got {:?}",
            tokens[3]
        );
        assert_eq!(tokens[4], Token::Comma);
        assert!(
            matches!(tokens[5], Token::StringLiteral(_)),
            "expected a string literal, got {:?}",
            tokens[5]
        );
    }

    #[test]
    fn test_parser_simple_select() {
        let stmt = parser_for("SELECT id, name FROM users WHERE id = 1")
            .parse_select()
            .expect("simple SELECT with WHERE should parse");

        assert_eq!(stmt.columns.len(), 2);
        assert!(stmt.where_clause.is_some());
    }

    #[test]
    fn test_parser_aggregate() {
        let stmt = parser_for("SELECT COUNT(*), AVG(value) FROM events")
            .parse_select()
            .expect("aggregate projection should parse");

        assert_eq!(stmt.columns.len(), 2);
    }

    #[test]
    fn test_parser_window() {
        let stmt = parser_for("SELECT * FROM events WINDOW TUMBLING (SIZE 5 MINUTES)")
            .parse_select()
            .expect("tumbling-window query should parse");

        let window = stmt.window.expect("WINDOW clause should be captured");
        assert_eq!(window.window_type, WindowType::Tumbling);
        assert_eq!(window.size, Duration::from_secs(300));
    }

    #[test]
    fn test_parser_group_by() {
        let stmt = parser_for("SELECT sensor_id, AVG(temp) FROM sensors GROUP BY sensor_id")
            .parse_select()
            .expect("GROUP BY query should parse");

        assert!(!stmt.group_by.is_empty());
    }

    #[test]
    fn test_parser_join() {
        let stmt = parser_for("SELECT * FROM a JOIN b ON a.id = b.aid")
            .parse_select()
            .expect("JOIN query should parse");

        assert!(matches!(stmt.from, Some(FromClause::Join { .. })));
    }

    #[tokio::test]
    async fn test_engine_basic() {
        let config = StreamSqlConfig::default();
        let engine = StreamSqlEngine::new(config);

        engine
            .execute("SELECT * FROM events")
            .await
            .expect("basic SELECT should execute");

        let stats = engine.get_stats().await;
        assert_eq!(stats.queries_executed, 1);
        assert_eq!(stats.queries_succeeded, 1);
    }

    #[tokio::test]
    async fn test_engine_stream_registration() {
        let config = StreamSqlConfig::default();
        let engine = StreamSqlEngine::new(config);

        let metadata = StreamMetadata {
            name: "events".to_string(),
            columns: vec![
                ColumnDefinition {
                    name: "id".to_string(),
                    data_type: DataType::Integer,
                    not_null: true,
                    default: None,
                },
                ColumnDefinition {
                    name: "value".to_string(),
                    data_type: DataType::Float,
                    not_null: false,
                    default: None,
                },
            ],
            properties: HashMap::new(),
            created_at: Utc::now(),
        };

        engine
            .register_stream(metadata)
            .await
            .expect("registering a new stream should succeed");

        assert_eq!(engine.list_streams().await, vec!["events".to_string()]);

        let stream = engine
            .get_stream("events")
            .await
            .expect("registered stream should be retrievable");
        assert_eq!(stream.columns.len(), 2);

        engine
            .unregister_stream("events")
            .await
            .expect("unregistering an existing stream should succeed");
        assert!(engine.list_streams().await.is_empty());

        // A second removal must report that the stream no longer exists.
        assert!(engine.unregister_stream("events").await.is_err());
    }

    #[test]
    fn test_engine_validate() {
        let config = StreamSqlConfig::default();
        let engine = StreamSqlEngine::new(config);

        engine
            .validate("SELECT * FROM events")
            .expect("valid query should pass validation");
        assert!(engine.validate("INVALID SQL").is_err());
    }

    #[test]
    fn test_engine_explain() {
        let config = StreamSqlConfig::default();
        let engine = StreamSqlEngine::new(config);

        let explanation = engine
            .explain("SELECT COUNT(*) FROM events WHERE value > 10")
            .expect("explain should succeed for a valid query");
        assert!(!explanation.is_empty());
    }

    #[tokio::test]
    async fn test_engine_caching() {
        let config = StreamSqlConfig {
            enable_query_cache: true,
            cache_size: 100,
            ..Default::default()
        };
        let engine = StreamSqlEngine::new(config);

        // First execution: nothing is cached yet, so this should be a miss.
        engine
            .execute("SELECT * FROM events")
            .await
            .expect("first execution should succeed");

        // Identical second execution: should be served from the cache.
        engine
            .execute("SELECT * FROM events")
            .await
            .expect("second execution should succeed");

        let stats = engine.get_stats().await;
        assert_eq!(stats.cache_misses, 1);
        assert_eq!(stats.cache_hits, 1);
    }

    #[test]
    fn test_parser_complex_expression() {
        let stmt = parser_for(
            "SELECT * FROM events WHERE (value > 10 AND status = 'active') OR priority = 1",
        )
        .parse_select()
        .expect("parenthesised boolean expression should parse");

        assert!(stmt.where_clause.is_some());
    }

    #[test]
    fn test_parser_order_by() {
        let stmt = parser_for("SELECT * FROM events ORDER BY created_at DESC, id ASC")
            .parse_select()
            .expect("ORDER BY with mixed directions should parse");

        assert_eq!(stmt.order_by.len(), 2);
        assert!(!stmt.order_by[0].ascending);
        assert!(stmt.order_by[1].ascending);
    }

    #[test]
    fn test_parser_distinct() {
        let stmt = parser_for("SELECT DISTINCT sensor_id FROM readings")
            .parse_select()
            .expect("SELECT DISTINCT should parse");

        assert!(stmt.distinct);
    }

    #[test]
    fn test_parser_sliding_window() {
        let stmt =
            parser_for("SELECT * FROM events WINDOW SLIDING (SIZE 10 SECONDS, SLIDE 5 SECONDS)")
                .parse_select()
                .expect("sliding-window query should parse");

        let window = stmt.window.expect("WINDOW clause should be captured");
        assert_eq!(window.window_type, WindowType::Sliding);
        assert_eq!(window.size, Duration::from_secs(10));
        assert_eq!(window.slide, Some(Duration::from_secs(5)));
    }
}