1use anyhow::{anyhow, Result};
25use chrono::{DateTime, Utc};
26use serde::{Deserialize, Serialize};
27use std::collections::HashMap;
28use std::sync::Arc;
29use std::time::Duration;
30use tokio::sync::RwLock;
31use tracing::{debug, info};
32
33#[derive(Debug, Clone, Serialize, Deserialize)]
35pub struct StreamSqlConfig {
36 pub max_execution_time: Duration,
38 pub enable_optimization: bool,
40 pub max_memory_bytes: usize,
42 pub parallel_execution: bool,
44 pub worker_threads: usize,
46 pub enable_query_cache: bool,
48 pub cache_size: usize,
50 pub enable_query_logging: bool,
52 pub default_window_size: Duration,
54}
55
56impl Default for StreamSqlConfig {
57 fn default() -> Self {
58 Self {
59 max_execution_time: Duration::from_secs(60),
60 enable_optimization: true,
61 max_memory_bytes: 1024 * 1024 * 1024, parallel_execution: true,
63 worker_threads: 4,
64 enable_query_cache: true,
65 cache_size: 1000,
66 enable_query_logging: true,
67 default_window_size: Duration::from_secs(60),
68 }
69 }
70}
71
/// Kinds of SQL statement distinguished by this module.
///
/// NOTE(review): the parser visible in this file only handles `SELECT`; the
/// other variants are not constructed by any visible code.
#[derive(Debug, Clone, PartialEq)]
pub enum QueryType {
    /// A `SELECT` query.
    Select,
    /// A `CREATE STREAM` statement.
    CreateStream,
    /// A `DROP STREAM` statement.
    DropStream,
    /// An `INSERT` statement.
    Insert,
    /// A `CREATE VIEW` statement.
    CreateView,
    /// A `DESCRIBE` statement.
    Describe,
    /// An `EXPLAIN` statement.
    Explain,
}
90
/// Lexical token produced by [`Lexer`].
#[derive(Debug, Clone, PartialEq)]
pub enum Token {
    // ---- keywords ----
    Select,
    From,
    Where,
    Group,
    By,
    Having,
    Order,
    Limit,
    Window,
    Tumbling,
    Sliding,
    Session,
    Size,
    Slide,
    Gap,
    Create,
    Stream,
    View,
    Drop,
    Insert,
    Into,
    Values,
    As,
    And,
    Or,
    Not,
    In,
    Like,
    Between,
    Is,
    Null,
    Join,
    Inner,
    Left,
    Right,
    Full,
    Outer,
    On,
    Describe,
    Explain,
    Distinct,
    Case,
    When,
    Then,
    Else,
    End,

    // ---- type names ----
    Int,
    Float,
    String,
    Boolean,
    Timestamp,

    // ---- operators ----
    Plus,
    Minus,
    /// Never emitted by [`Lexer`] (it produces [`Token::Star`] for `*`); kept
    /// for callers that build token streams by hand.
    Multiply,
    Divide,
    Modulo,
    Equal,
    NotEqual,
    LessThan,
    LessThanOrEqual,
    GreaterThan,
    GreaterThanOrEqual,

    // ---- punctuation ----
    Comma,
    Dot,
    Semicolon,
    OpenParen,
    CloseParen,
    OpenBracket,
    CloseBracket,

    // ---- identifiers and literals ----
    /// An identifier, with its original spelling preserved.
    Identifier(String),
    /// A quoted string with escapes already resolved.
    StringLiteral(String),
    /// A numeric literal; integers and decimals are both carried as `f64`.
    NumberLiteral(f64),
    /// `TRUE` or `FALSE`.
    BooleanLiteral(bool),

    // ---- aggregate function names ----
    Count,
    Sum,
    Avg,
    Min,
    Max,
    StdDev,
    Variance,

    /// The `*` character (wildcard or multiplication, decided by the parser).
    Star,
    /// End of input. Also returned for an unrecognized character (see
    /// [`Lexer::next_token`]).
    Eof,
}

/// Character-level scanner that turns SQL text into [`Token`]s.
pub struct Lexer {
    /// The whole input, decoded into characters for O(1) indexing.
    input: Vec<char>,
    /// Index of `current_char` within `input`.
    position: usize,
    /// Character under the cursor, or `None` once the input is exhausted.
    current_char: Option<char>,
}

impl Lexer {
    /// Creates a lexer positioned at the first character of `input`.
    pub fn new(input: &str) -> Self {
        let input: Vec<char> = input.chars().collect();
        let current_char = input.first().copied();
        Self {
            input,
            position: 0,
            current_char,
        }
    }

    /// Moves the cursor one character forward.
    fn advance(&mut self) {
        self.position += 1;
        self.current_char = self.input.get(self.position).copied();
    }

    /// Returns the character after the cursor without consuming anything.
    fn peek(&self) -> Option<char> {
        self.input.get(self.position + 1).copied()
    }

    /// Consumes any run of whitespace under the cursor.
    fn skip_whitespace(&mut self) {
        while matches!(self.current_char, Some(c) if c.is_whitespace()) {
            self.advance();
        }
    }

    /// Consumes and returns a run of alphanumeric characters and underscores.
    fn read_identifier(&mut self) -> String {
        let mut text = String::new();
        while let Some(c) = self.current_char {
            if !(c.is_alphanumeric() || c == '_') {
                break;
            }
            text.push(c);
            self.advance();
        }
        text
    }

    /// Consumes a numeric literal: ASCII digits with at most one decimal point.
    ///
    /// A second `.` ends the literal instead of being absorbed. Previously
    /// input such as `1.2.3` was swallowed whole, failed to parse and was
    /// silently turned into `0.0`; it now lexes as `1.2`, `.`, `3`.
    fn read_number(&mut self) -> f64 {
        let mut text = String::new();
        let mut seen_dot = false;
        while let Some(c) = self.current_char {
            match c {
                '0'..='9' => {}
                '.' if !seen_dot => seen_dot = true,
                _ => break,
            }
            text.push(c);
            self.advance();
        }
        // `text` is digits with an optional single dot, which `f64::from_str`
        // accepts; the fallback is purely defensive.
        text.parse().unwrap_or(0.0)
    }

    /// Consumes a quoted string and returns its contents.
    ///
    /// The character under the cursor is taken as the quote. A backslash makes
    /// the following character literal (so `\n` yields `n`, not a newline).
    /// An unterminated string runs to the end of the input without an error.
    fn read_string(&mut self) -> String {
        let quote = match self.current_char {
            Some(q) => q,
            // Only reachable if called at end of input; nothing to read.
            None => return String::new(),
        };
        self.advance();

        let mut text = String::new();
        while let Some(c) = self.current_char {
            self.advance();
            if c == quote {
                break;
            }
            if c == '\\' {
                // Take the escaped character verbatim, if there is one.
                if let Some(escaped) = self.current_char {
                    text.push(escaped);
                    self.advance();
                }
            } else {
                text.push(c);
            }
        }
        text
    }

    /// Maps a scanned word to its keyword token, or wraps it as an identifier.
    ///
    /// Matching is ASCII case-insensitive. Unicode upper-casing is avoided on
    /// purpose: it maps letters such as `ı` to `I`, which would turn a
    /// non-ASCII identifier like `ıN` into the keyword `IN`.
    fn keyword_or_identifier(word: String) -> Token {
        match word.to_ascii_uppercase().as_str() {
            "SELECT" => Token::Select,
            "FROM" => Token::From,
            "WHERE" => Token::Where,
            "GROUP" => Token::Group,
            "BY" => Token::By,
            "HAVING" => Token::Having,
            "ORDER" => Token::Order,
            "LIMIT" => Token::Limit,
            "WINDOW" => Token::Window,
            "TUMBLING" => Token::Tumbling,
            "SLIDING" => Token::Sliding,
            "SESSION" => Token::Session,
            "SIZE" => Token::Size,
            "SLIDE" => Token::Slide,
            "GAP" => Token::Gap,
            "CREATE" => Token::Create,
            "STREAM" => Token::Stream,
            "VIEW" => Token::View,
            "DROP" => Token::Drop,
            "INSERT" => Token::Insert,
            "INTO" => Token::Into,
            "VALUES" => Token::Values,
            "AS" => Token::As,
            "AND" => Token::And,
            "OR" => Token::Or,
            "NOT" => Token::Not,
            "IN" => Token::In,
            "LIKE" => Token::Like,
            "BETWEEN" => Token::Between,
            "IS" => Token::Is,
            "NULL" => Token::Null,
            "JOIN" => Token::Join,
            "INNER" => Token::Inner,
            "LEFT" => Token::Left,
            "RIGHT" => Token::Right,
            "FULL" => Token::Full,
            "OUTER" => Token::Outer,
            "ON" => Token::On,
            "DESCRIBE" => Token::Describe,
            "EXPLAIN" => Token::Explain,
            "DISTINCT" => Token::Distinct,
            "CASE" => Token::Case,
            "WHEN" => Token::When,
            "THEN" => Token::Then,
            "ELSE" => Token::Else,
            "END" => Token::End,
            "INT" | "INTEGER" => Token::Int,
            "FLOAT" | "DOUBLE" => Token::Float,
            "STRING" | "VARCHAR" | "TEXT" => Token::String,
            "BOOLEAN" | "BOOL" => Token::Boolean,
            "TIMESTAMP" | "DATETIME" => Token::Timestamp,
            "COUNT" => Token::Count,
            "SUM" => Token::Sum,
            "AVG" => Token::Avg,
            "MIN" => Token::Min,
            "MAX" => Token::Max,
            "STDDEV" => Token::StdDev,
            "VARIANCE" | "VAR" => Token::Variance,
            "TRUE" => Token::BooleanLiteral(true),
            "FALSE" => Token::BooleanLiteral(false),
            _ => Token::Identifier(word),
        }
    }

    /// Scans and returns the next token, skipping leading whitespace.
    ///
    /// Returns [`Token::Eof`] at end of input.
    ///
    /// NOTE(review): an unrecognized character is consumed and also reported
    /// as [`Token::Eof`], so [`Lexer::tokenize`] stops there and drops the rest
    /// of the input. Reporting it properly needs a dedicated token variant or
    /// a fallible API, which would change the public interface.
    pub fn next_token(&mut self) -> Token {
        self.skip_whitespace();

        let c = match self.current_char {
            Some(c) => c,
            None => return Token::Eof,
        };

        if c.is_alphabetic() || c == '_' {
            let word = self.read_identifier();
            return Self::keyword_or_identifier(word);
        }
        if c.is_ascii_digit() {
            return Token::NumberLiteral(self.read_number());
        }
        if c == '\'' || c == '"' {
            return Token::StringLiteral(self.read_string());
        }

        // Everything below is an operator or punctuation: consume the first
        // character, then look at the next one for two-character operators.
        self.advance();
        match c {
            '+' => Token::Plus,
            '-' => Token::Minus,
            '*' => Token::Star,
            '/' => Token::Divide,
            '%' => Token::Modulo,
            '=' => Token::Equal,
            '<' => match self.current_char {
                Some('=') => {
                    self.advance();
                    Token::LessThanOrEqual
                }
                Some('>') => {
                    self.advance();
                    Token::NotEqual
                }
                _ => Token::LessThan,
            },
            '>' => match self.current_char {
                Some('=') => {
                    self.advance();
                    Token::GreaterThanOrEqual
                }
                _ => Token::GreaterThan,
            },
            '!' => match self.current_char {
                Some('=') => {
                    self.advance();
                    Token::NotEqual
                }
                _ => Token::Not,
            },
            ',' => Token::Comma,
            '.' => Token::Dot,
            ';' => Token::Semicolon,
            '(' => Token::OpenParen,
            ')' => Token::CloseParen,
            '[' => Token::OpenBracket,
            ']' => Token::CloseBracket,
            // Unknown character: see the NOTE in the doc comment.
            _ => Token::Eof,
        }
    }

    /// Scans the remaining input and returns every token, ending with
    /// exactly one [`Token::Eof`].
    pub fn tokenize(&mut self) -> Vec<Token> {
        let mut tokens = Vec::new();
        loop {
            let token = self.next_token();
            let finished = token == Token::Eof;
            tokens.push(token);
            if finished {
                return tokens;
            }
        }
    }
}
467
468#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
470pub enum Expression {
471 Column(String),
473 QualifiedColumn(String, String),
475 Literal(SqlValue),
477 BinaryOp {
479 left: Box<Expression>,
480 op: BinaryOperator,
481 right: Box<Expression>,
482 },
483 UnaryOp {
485 op: UnaryOperator,
486 expr: Box<Expression>,
487 },
488 Function {
490 name: String,
491 args: Vec<Expression>,
492 distinct: bool,
493 },
494 Aggregate {
496 func: AggregateFunction,
497 expr: Box<Expression>,
498 distinct: bool,
499 },
500 Case {
502 operand: Option<Box<Expression>>,
503 when_clauses: Vec<(Expression, Expression)>,
504 else_clause: Option<Box<Expression>>,
505 },
506 Subquery(Box<SelectStatement>),
508 InList {
510 expr: Box<Expression>,
511 list: Vec<Expression>,
512 negated: bool,
513 },
514 Between {
516 expr: Box<Expression>,
517 low: Box<Expression>,
518 high: Box<Expression>,
519 negated: bool,
520 },
521 IsNull {
523 expr: Box<Expression>,
524 negated: bool,
525 },
526 Star,
528}
529
530#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
532pub enum SqlValue {
533 Null,
534 Integer(i64),
535 Float(f64),
536 String(String),
537 Boolean(bool),
538 Timestamp(DateTime<Utc>),
539}
540
541#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
543pub enum BinaryOperator {
544 Plus,
545 Minus,
546 Multiply,
547 Divide,
548 Modulo,
549 Equal,
550 NotEqual,
551 LessThan,
552 LessThanOrEqual,
553 GreaterThan,
554 GreaterThanOrEqual,
555 And,
556 Or,
557 Like,
558}
559
560#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
562pub enum UnaryOperator {
563 Not,
564 Minus,
565}
566
567#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
569pub enum AggregateFunction {
570 Count,
571 Sum,
572 Avg,
573 Min,
574 Max,
575 StdDev,
576 Variance,
577}
578
579#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
581pub struct WindowSpec {
582 pub window_type: WindowType,
584 pub size: Duration,
586 pub slide: Option<Duration>,
588 pub gap: Option<Duration>,
590 pub time_attribute: Option<String>,
592}
593
594#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
596pub enum WindowType {
597 Tumbling,
598 Sliding,
599 Session,
600}
601
602#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
604pub struct SelectItem {
605 pub expr: Expression,
607 pub alias: Option<String>,
609}
610
611#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
613pub enum FromClause {
614 Table { name: String, alias: Option<String> },
616 Join {
618 left: Box<FromClause>,
619 right: Box<FromClause>,
620 join_type: JoinType,
621 condition: Option<Expression>,
622 },
623 Subquery {
625 query: Box<SelectStatement>,
626 alias: String,
627 },
628}
629
630#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
632pub enum JoinType {
633 Inner,
634 Left,
635 Right,
636 Full,
637 Cross,
638}
639
640#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
642pub struct OrderByItem {
643 pub expr: Expression,
645 pub ascending: bool,
647 pub nulls_first: Option<bool>,
649}
650
651#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
653pub struct SelectStatement {
654 pub distinct: bool,
656 pub columns: Vec<SelectItem>,
658 pub from: Option<FromClause>,
660 pub where_clause: Option<Expression>,
662 pub group_by: Vec<Expression>,
664 pub having: Option<Expression>,
666 pub order_by: Vec<OrderByItem>,
668 pub limit: Option<usize>,
670 pub offset: Option<usize>,
672 pub window: Option<WindowSpec>,
674}
675
676#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
678pub struct CreateStreamStatement {
679 pub name: String,
681 pub columns: Vec<ColumnDefinition>,
683 pub properties: HashMap<String, String>,
685}
686
687#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
689pub struct ColumnDefinition {
690 pub name: String,
692 pub data_type: DataType,
694 pub not_null: bool,
696 pub default: Option<Expression>,
698}
699
700#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
702pub enum DataType {
703 Integer,
704 Float,
705 String,
706 Boolean,
707 Timestamp,
708 Array(Box<DataType>),
709 Map(Box<DataType>, Box<DataType>),
710}
711
712pub struct Parser {
714 tokens: Vec<Token>,
715 position: usize,
716}
717
718impl Parser {
719 pub fn new(tokens: Vec<Token>) -> Self {
721 Self {
722 tokens,
723 position: 0,
724 }
725 }
726
727 fn current_token(&self) -> &Token {
729 self.tokens.get(self.position).unwrap_or(&Token::Eof)
730 }
731
732 fn peek_token(&self) -> &Token {
734 self.tokens.get(self.position + 1).unwrap_or(&Token::Eof)
735 }
736
737 fn advance(&mut self) {
739 self.position += 1;
740 }
741
742 fn expect(&mut self, expected: Token) -> Result<()> {
744 if self.current_token() == &expected {
745 self.advance();
746 Ok(())
747 } else {
748 Err(anyhow!(
749 "Expected {:?}, got {:?}",
750 expected,
751 self.current_token()
752 ))
753 }
754 }
755
756 pub fn parse_select(&mut self) -> Result<SelectStatement> {
758 self.expect(Token::Select)?;
759
760 let distinct = if self.current_token() == &Token::Distinct {
762 self.advance();
763 true
764 } else {
765 false
766 };
767
768 let columns = self.parse_select_list()?;
770
771 let from = if self.current_token() == &Token::From {
773 self.advance();
774 Some(self.parse_from_clause()?)
775 } else {
776 None
777 };
778
779 let window = if self.current_token() == &Token::Window {
781 self.advance();
782 Some(self.parse_window_spec()?)
783 } else {
784 None
785 };
786
787 let where_clause = if self.current_token() == &Token::Where {
789 self.advance();
790 Some(self.parse_expression()?)
791 } else {
792 None
793 };
794
795 let group_by = if self.current_token() == &Token::Group {
797 self.advance();
798 if self.current_token() == &Token::By {
800 self.advance();
801 }
802 self.parse_expression_list()?
803 } else {
804 Vec::new()
805 };
806
807 let having = if self.current_token() == &Token::Having {
809 self.advance();
810 Some(self.parse_expression()?)
811 } else {
812 None
813 };
814
815 let order_by = if self.current_token() == &Token::Order {
817 self.advance();
818 if self.current_token() == &Token::By {
820 self.advance();
821 }
822 self.parse_order_by_list()?
823 } else {
824 Vec::new()
825 };
826
827 let limit = if self.current_token() == &Token::Limit {
829 self.advance();
830 if let Token::NumberLiteral(n) = self.current_token() {
831 let limit = *n as usize;
832 self.advance();
833 Some(limit)
834 } else {
835 None
836 }
837 } else {
838 None
839 };
840
841 Ok(SelectStatement {
842 distinct,
843 columns,
844 from,
845 where_clause,
846 group_by,
847 having,
848 order_by,
849 limit,
850 offset: None,
851 window,
852 })
853 }
854
855 fn parse_select_list(&mut self) -> Result<Vec<SelectItem>> {
857 let mut items = Vec::new();
858
859 loop {
860 let expr = self.parse_expression()?;
861
862 let alias = if self.current_token() == &Token::As {
864 self.advance();
865 if let Token::Identifier(name) = self.current_token().clone() {
866 self.advance();
867 Some(name)
868 } else {
869 None
870 }
871 } else if let Token::Identifier(name) = self.current_token().clone() {
872 if name.to_uppercase() != "FROM"
874 && name.to_uppercase() != "WHERE"
875 && name.to_uppercase() != "GROUP"
876 && name.to_uppercase() != "ORDER"
877 && name.to_uppercase() != "WINDOW"
878 {
879 self.advance();
880 Some(name)
881 } else {
882 None
883 }
884 } else {
885 None
886 };
887
888 items.push(SelectItem { expr, alias });
889
890 if self.current_token() != &Token::Comma {
891 break;
892 }
893 self.advance(); }
895
896 Ok(items)
897 }
898
899 fn parse_from_clause(&mut self) -> Result<FromClause> {
901 let mut from = self.parse_table_reference()?;
902
903 while matches!(
905 self.current_token(),
906 Token::Join | Token::Inner | Token::Left | Token::Right | Token::Full
907 ) {
908 let join_type = self.parse_join_type()?;
909 let right = self.parse_table_reference()?;
910
911 let condition = if self.current_token() == &Token::On {
912 self.advance();
913 Some(self.parse_expression()?)
914 } else {
915 None
916 };
917
918 from = FromClause::Join {
919 left: Box::new(from),
920 right: Box::new(right),
921 join_type,
922 condition,
923 };
924 }
925
926 Ok(from)
927 }
928
929 fn parse_table_reference(&mut self) -> Result<FromClause> {
931 if let Token::Identifier(name) = self.current_token().clone() {
932 self.advance();
933
934 let alias = if self.current_token() == &Token::As {
935 self.advance();
936 if let Token::Identifier(alias) = self.current_token().clone() {
937 self.advance();
938 Some(alias)
939 } else {
940 None
941 }
942 } else if let Token::Identifier(alias) = self.current_token().clone() {
943 if !matches!(
945 alias.to_uppercase().as_str(),
946 "WHERE"
947 | "GROUP"
948 | "ORDER"
949 | "HAVING"
950 | "LIMIT"
951 | "JOIN"
952 | "INNER"
953 | "LEFT"
954 | "RIGHT"
955 | "FULL"
956 | "ON"
957 | "WINDOW"
958 ) {
959 self.advance();
960 Some(alias)
961 } else {
962 None
963 }
964 } else {
965 None
966 };
967
968 Ok(FromClause::Table { name, alias })
969 } else {
970 Err(anyhow!("Expected table name"))
971 }
972 }
973
974 fn parse_join_type(&mut self) -> Result<JoinType> {
976 let join_type = match self.current_token() {
977 Token::Inner => {
978 self.advance();
979 JoinType::Inner
980 }
981 Token::Left => {
982 self.advance();
983 if self.current_token() == &Token::Outer {
984 self.advance();
985 }
986 JoinType::Left
987 }
988 Token::Right => {
989 self.advance();
990 if self.current_token() == &Token::Outer {
991 self.advance();
992 }
993 JoinType::Right
994 }
995 Token::Full => {
996 self.advance();
997 if self.current_token() == &Token::Outer {
998 self.advance();
999 }
1000 JoinType::Full
1001 }
1002 _ => JoinType::Inner,
1003 };
1004
1005 if self.current_token() == &Token::Join {
1007 self.advance();
1008 }
1009
1010 Ok(join_type)
1011 }
1012
1013 fn parse_window_spec(&mut self) -> Result<WindowSpec> {
1015 let window_type = match self.current_token() {
1016 Token::Tumbling => {
1017 self.advance();
1018 WindowType::Tumbling
1019 }
1020 Token::Sliding => {
1021 self.advance();
1022 WindowType::Sliding
1023 }
1024 Token::Session => {
1025 self.advance();
1026 WindowType::Session
1027 }
1028 _ => WindowType::Tumbling,
1029 };
1030
1031 self.expect(Token::OpenParen)?;
1032
1033 let mut size = Duration::from_secs(60);
1034 let mut slide = None;
1035 let mut gap = None;
1036
1037 while self.current_token() != &Token::CloseParen {
1039 match self.current_token() {
1040 Token::Size => {
1041 self.advance();
1042 size = self.parse_duration()?;
1043 }
1044 Token::Slide => {
1045 self.advance();
1046 slide = Some(self.parse_duration()?);
1047 }
1048 Token::Gap => {
1049 self.advance();
1050 gap = Some(self.parse_duration()?);
1051 }
1052 Token::Comma => {
1053 self.advance();
1054 }
1055 _ => {
1056 self.advance();
1057 }
1058 }
1059 }
1060
1061 self.expect(Token::CloseParen)?;
1062
1063 Ok(WindowSpec {
1064 window_type,
1065 size,
1066 slide,
1067 gap,
1068 time_attribute: None,
1069 })
1070 }
1071
1072 fn parse_duration(&mut self) -> Result<Duration> {
1074 let value = if let Token::NumberLiteral(n) = self.current_token() {
1075 let v = *n as u64;
1076 self.advance();
1077 v
1078 } else {
1079 return Err(anyhow!("Expected number for duration"));
1080 };
1081
1082 let unit = if let Token::Identifier(unit) = self.current_token().clone() {
1083 self.advance();
1084 unit.to_uppercase()
1085 } else {
1086 "SECONDS".to_string()
1087 };
1088
1089 let duration = match unit.as_str() {
1090 "MILLISECONDS" | "MILLIS" | "MS" => Duration::from_millis(value),
1091 "SECONDS" | "SECOND" | "S" => Duration::from_secs(value),
1092 "MINUTES" | "MINUTE" | "M" => Duration::from_secs(value * 60),
1093 "HOURS" | "HOUR" | "H" => Duration::from_secs(value * 3600),
1094 "DAYS" | "DAY" | "D" => Duration::from_secs(value * 86400),
1095 _ => Duration::from_secs(value),
1096 };
1097
1098 Ok(duration)
1099 }
1100
1101 fn parse_expression(&mut self) -> Result<Expression> {
1103 self.parse_or_expression()
1104 }
1105
1106 fn parse_or_expression(&mut self) -> Result<Expression> {
1108 let mut left = self.parse_and_expression()?;
1109
1110 while self.current_token() == &Token::Or {
1111 self.advance();
1112 let right = self.parse_and_expression()?;
1113 left = Expression::BinaryOp {
1114 left: Box::new(left),
1115 op: BinaryOperator::Or,
1116 right: Box::new(right),
1117 };
1118 }
1119
1120 Ok(left)
1121 }
1122
1123 fn parse_and_expression(&mut self) -> Result<Expression> {
1125 let mut left = self.parse_comparison_expression()?;
1126
1127 while self.current_token() == &Token::And {
1128 self.advance();
1129 let right = self.parse_comparison_expression()?;
1130 left = Expression::BinaryOp {
1131 left: Box::new(left),
1132 op: BinaryOperator::And,
1133 right: Box::new(right),
1134 };
1135 }
1136
1137 Ok(left)
1138 }
1139
1140 fn parse_comparison_expression(&mut self) -> Result<Expression> {
1142 let left = self.parse_additive_expression()?;
1143
1144 let op = match self.current_token() {
1145 Token::Equal => Some(BinaryOperator::Equal),
1146 Token::NotEqual => Some(BinaryOperator::NotEqual),
1147 Token::LessThan => Some(BinaryOperator::LessThan),
1148 Token::LessThanOrEqual => Some(BinaryOperator::LessThanOrEqual),
1149 Token::GreaterThan => Some(BinaryOperator::GreaterThan),
1150 Token::GreaterThanOrEqual => Some(BinaryOperator::GreaterThanOrEqual),
1151 Token::Like => Some(BinaryOperator::Like),
1152 _ => None,
1153 };
1154
1155 if let Some(op) = op {
1156 self.advance();
1157 let right = self.parse_additive_expression()?;
1158 Ok(Expression::BinaryOp {
1159 left: Box::new(left),
1160 op,
1161 right: Box::new(right),
1162 })
1163 } else {
1164 Ok(left)
1165 }
1166 }
1167
1168 fn parse_additive_expression(&mut self) -> Result<Expression> {
1170 let mut left = self.parse_multiplicative_expression()?;
1171
1172 loop {
1173 let op = match self.current_token() {
1174 Token::Plus => Some(BinaryOperator::Plus),
1175 Token::Minus => Some(BinaryOperator::Minus),
1176 _ => None,
1177 };
1178
1179 if let Some(op) = op {
1180 self.advance();
1181 let right = self.parse_multiplicative_expression()?;
1182 left = Expression::BinaryOp {
1183 left: Box::new(left),
1184 op,
1185 right: Box::new(right),
1186 };
1187 } else {
1188 break;
1189 }
1190 }
1191
1192 Ok(left)
1193 }
1194
1195 fn parse_multiplicative_expression(&mut self) -> Result<Expression> {
1197 let mut left = self.parse_unary_expression()?;
1198
1199 loop {
1200 let op = match self.current_token() {
1201 Token::Multiply | Token::Star => Some(BinaryOperator::Multiply),
1202 Token::Divide => Some(BinaryOperator::Divide),
1203 Token::Modulo => Some(BinaryOperator::Modulo),
1204 _ => None,
1205 };
1206
1207 if let Some(op) = op {
1208 self.advance();
1209 let right = self.parse_unary_expression()?;
1210 left = Expression::BinaryOp {
1211 left: Box::new(left),
1212 op,
1213 right: Box::new(right),
1214 };
1215 } else {
1216 break;
1217 }
1218 }
1219
1220 Ok(left)
1221 }
1222
1223 fn parse_unary_expression(&mut self) -> Result<Expression> {
1225 match self.current_token() {
1226 Token::Not => {
1227 self.advance();
1228 let expr = self.parse_unary_expression()?;
1229 Ok(Expression::UnaryOp {
1230 op: UnaryOperator::Not,
1231 expr: Box::new(expr),
1232 })
1233 }
1234 Token::Minus => {
1235 self.advance();
1236 let expr = self.parse_unary_expression()?;
1237 Ok(Expression::UnaryOp {
1238 op: UnaryOperator::Minus,
1239 expr: Box::new(expr),
1240 })
1241 }
1242 _ => self.parse_primary_expression(),
1243 }
1244 }
1245
1246 fn parse_primary_expression(&mut self) -> Result<Expression> {
1248 match self.current_token().clone() {
1249 Token::Star => {
1250 self.advance();
1251 Ok(Expression::Star)
1252 }
1253 Token::NumberLiteral(n) => {
1254 self.advance();
1255 if n.fract() == 0.0 {
1256 Ok(Expression::Literal(SqlValue::Integer(n as i64)))
1257 } else {
1258 Ok(Expression::Literal(SqlValue::Float(n)))
1259 }
1260 }
1261 Token::StringLiteral(s) => {
1262 self.advance();
1263 Ok(Expression::Literal(SqlValue::String(s)))
1264 }
1265 Token::BooleanLiteral(b) => {
1266 self.advance();
1267 Ok(Expression::Literal(SqlValue::Boolean(b)))
1268 }
1269 Token::Null => {
1270 self.advance();
1271 Ok(Expression::Literal(SqlValue::Null))
1272 }
1273 Token::Count
1274 | Token::Sum
1275 | Token::Avg
1276 | Token::Min
1277 | Token::Max
1278 | Token::StdDev
1279 | Token::Variance => {
1280 let func = match self.current_token() {
1281 Token::Count => AggregateFunction::Count,
1282 Token::Sum => AggregateFunction::Sum,
1283 Token::Avg => AggregateFunction::Avg,
1284 Token::Min => AggregateFunction::Min,
1285 Token::Max => AggregateFunction::Max,
1286 Token::StdDev => AggregateFunction::StdDev,
1287 Token::Variance => AggregateFunction::Variance,
1288 _ => unreachable!(),
1289 };
1290 self.advance();
1291 self.expect(Token::OpenParen)?;
1292
1293 let distinct = if self.current_token() == &Token::Distinct {
1294 self.advance();
1295 true
1296 } else {
1297 false
1298 };
1299
1300 let expr = self.parse_expression()?;
1301 self.expect(Token::CloseParen)?;
1302
1303 Ok(Expression::Aggregate {
1304 func,
1305 expr: Box::new(expr),
1306 distinct,
1307 })
1308 }
1309 Token::Identifier(name) => {
1310 self.advance();
1311
1312 if self.current_token() == &Token::OpenParen {
1314 self.advance();
1315 let mut args = Vec::new();
1316
1317 if self.current_token() != &Token::CloseParen {
1318 loop {
1319 args.push(self.parse_expression()?);
1320 if self.current_token() != &Token::Comma {
1321 break;
1322 }
1323 self.advance();
1324 }
1325 }
1326
1327 self.expect(Token::CloseParen)?;
1328
1329 Ok(Expression::Function {
1330 name,
1331 args,
1332 distinct: false,
1333 })
1334 } else if self.current_token() == &Token::Dot {
1335 self.advance();
1337 if let Token::Identifier(column) = self.current_token().clone() {
1338 self.advance();
1339 Ok(Expression::QualifiedColumn(name, column))
1340 } else {
1341 Ok(Expression::Column(name))
1342 }
1343 } else {
1344 Ok(Expression::Column(name))
1345 }
1346 }
1347 Token::OpenParen => {
1348 self.advance();
1349 let expr = self.parse_expression()?;
1350 self.expect(Token::CloseParen)?;
1351 Ok(expr)
1352 }
1353 _ => Err(anyhow!("Unexpected token: {:?}", self.current_token())),
1354 }
1355 }
1356
1357 fn parse_expression_list(&mut self) -> Result<Vec<Expression>> {
1359 let mut exprs = Vec::new();
1360
1361 loop {
1362 exprs.push(self.parse_expression()?);
1363 if self.current_token() != &Token::Comma {
1364 break;
1365 }
1366 self.advance();
1367 }
1368
1369 Ok(exprs)
1370 }
1371
1372 fn parse_order_by_list(&mut self) -> Result<Vec<OrderByItem>> {
1374 let mut items = Vec::new();
1375
1376 loop {
1377 let expr = self.parse_expression()?;
1378
1379 let ascending = if let Token::Identifier(dir) = self.current_token().clone() {
1380 match dir.to_uppercase().as_str() {
1381 "ASC" => {
1382 self.advance();
1383 true
1384 }
1385 "DESC" => {
1386 self.advance();
1387 false
1388 }
1389 _ => true,
1390 }
1391 } else {
1392 true
1393 };
1394
1395 items.push(OrderByItem {
1396 expr,
1397 ascending,
1398 nulls_first: None,
1399 });
1400
1401 if self.current_token() != &Token::Comma {
1402 break;
1403 }
1404 self.advance();
1405 }
1406
1407 Ok(items)
1408 }
1409}
1410
1411#[derive(Debug, Clone, Serialize, Deserialize)]
1413pub struct ResultRow {
1414 pub values: Vec<SqlValue>,
1416}
1417
1418#[derive(Debug, Clone, Serialize, Deserialize)]
1420pub struct QueryResult {
1421 pub columns: Vec<String>,
1423 pub rows: Vec<ResultRow>,
1425 pub execution_time: Duration,
1427 pub rows_affected: usize,
1429}
1430
1431pub struct StreamSqlEngine {
1433 config: StreamSqlConfig,
1435 streams: Arc<RwLock<HashMap<String, StreamMetadata>>>,
1437 query_cache: Arc<RwLock<HashMap<String, SelectStatement>>>,
1439 stats: Arc<RwLock<StreamSqlStats>>,
1441}
1442
1443#[derive(Debug, Clone, Serialize, Deserialize)]
1445pub struct StreamMetadata {
1446 pub name: String,
1448 pub columns: Vec<ColumnDefinition>,
1450 pub properties: HashMap<String, String>,
1452 pub created_at: DateTime<Utc>,
1454}
1455
1456#[derive(Debug, Clone, Default, Serialize, Deserialize)]
1458pub struct StreamSqlStats {
1459 pub queries_executed: u64,
1461 pub queries_succeeded: u64,
1463 pub queries_failed: u64,
1465 pub avg_execution_time_ms: f64,
1467 pub cache_hits: u64,
1469 pub cache_misses: u64,
1471}
1472
1473impl StreamSqlEngine {
1474 pub fn new(config: StreamSqlConfig) -> Self {
1476 Self {
1477 config,
1478 streams: Arc::new(RwLock::new(HashMap::new())),
1479 query_cache: Arc::new(RwLock::new(HashMap::new())),
1480 stats: Arc::new(RwLock::new(StreamSqlStats::default())),
1481 }
1482 }
1483
1484 pub fn parse(&self, sql: &str) -> Result<SelectStatement> {
1486 let mut lexer = Lexer::new(sql);
1487 let tokens = lexer.tokenize();
1488 let mut parser = Parser::new(tokens);
1489 parser.parse_select()
1490 }
1491
1492 pub async fn execute(&self, sql: &str) -> Result<QueryResult> {
1494 let start_time = std::time::Instant::now();
1495
1496 if self.config.enable_query_cache {
1498 let cache = self.query_cache.read().await;
1499 if cache.contains_key(sql) {
1500 let mut stats = self.stats.write().await;
1501 stats.cache_hits += 1;
1502 debug!("Query cache hit");
1503 } else {
1504 let mut stats = self.stats.write().await;
1505 stats.cache_misses += 1;
1506 }
1507 }
1508
1509 let statement = self.parse(sql)?;
1511
1512 if self.config.enable_query_cache {
1514 let mut cache = self.query_cache.write().await;
1515 if cache.len() < self.config.cache_size {
1516 cache.insert(sql.to_string(), statement.clone());
1517 }
1518 }
1519
1520 let result = QueryResult {
1522 columns: statement
1523 .columns
1524 .iter()
1525 .map(|c| c.alias.clone().unwrap_or_else(|| format!("column_{}", 0)))
1526 .collect(),
1527 rows: Vec::new(),
1528 execution_time: start_time.elapsed(),
1529 rows_affected: 0,
1530 };
1531
1532 let mut stats = self.stats.write().await;
1534 stats.queries_executed += 1;
1535 stats.queries_succeeded += 1;
1536 stats.avg_execution_time_ms = (stats.avg_execution_time_ms
1537 * (stats.queries_executed - 1) as f64
1538 + result.execution_time.as_millis() as f64)
1539 / stats.queries_executed as f64;
1540
1541 if self.config.enable_query_logging {
1542 info!(
1543 "Executed query in {:?}: {}",
1544 result.execution_time,
1545 &sql[..sql.len().min(100)]
1546 );
1547 }
1548
1549 Ok(result)
1550 }
1551
1552 pub async fn register_stream(&self, metadata: StreamMetadata) -> Result<()> {
1554 let mut streams = self.streams.write().await;
1555 info!("Registering stream: {}", metadata.name);
1556 streams.insert(metadata.name.clone(), metadata);
1557 Ok(())
1558 }
1559
1560 pub async fn unregister_stream(&self, name: &str) -> Result<()> {
1562 let mut streams = self.streams.write().await;
1563 if streams.remove(name).is_some() {
1564 info!("Unregistered stream: {}", name);
1565 Ok(())
1566 } else {
1567 Err(anyhow!("Stream not found: {}", name))
1568 }
1569 }
1570
1571 pub async fn get_stream(&self, name: &str) -> Option<StreamMetadata> {
1573 let streams = self.streams.read().await;
1574 streams.get(name).cloned()
1575 }
1576
1577 pub async fn list_streams(&self) -> Vec<String> {
1579 let streams = self.streams.read().await;
1580 streams.keys().cloned().collect()
1581 }
1582
1583 pub async fn get_stats(&self) -> StreamSqlStats {
1585 self.stats.read().await.clone()
1586 }
1587
1588 pub async fn clear_cache(&self) {
1590 let mut cache = self.query_cache.write().await;
1591 cache.clear();
1592 info!("Query cache cleared");
1593 }
1594
1595 pub fn validate(&self, sql: &str) -> Result<()> {
1597 self.parse(sql)?;
1598 Ok(())
1599 }
1600
1601 pub fn explain(&self, sql: &str) -> Result<String> {
1603 let statement = self.parse(sql)?;
1604 Ok(format!("{:#?}", statement))
1605 }
1606}
1607
#[cfg(test)]
mod tests {
    //! Unit tests for the lexer, the SELECT parser and the engine facade.
    //!
    //! Fallible steps use `expect` so that a failing test prints the
    //! underlying error instead of a bare "assertion failed".

    use super::*;

    /// A bare `SELECT * FROM <ident>` yields four tokens plus a trailing `Eof`.
    #[test]
    fn test_lexer_basic() {
        let mut lexer = Lexer::new("SELECT * FROM events");
        let tokens = lexer.tokenize();

        assert_eq!(tokens.len(), 5);
        assert_eq!(tokens[0], Token::Select);
        assert_eq!(tokens[1], Token::Star);
        assert_eq!(tokens[2], Token::From);
        assert_eq!(tokens[3], Token::Identifier("events".to_string()));
        assert_eq!(tokens[4], Token::Eof);
    }

    /// Identifiers, numbers and quoted strings are lexed as distinct token kinds.
    #[test]
    fn test_lexer_with_literals() {
        let mut lexer = Lexer::new("SELECT name, 42, 'hello' FROM events");
        let tokens = lexer.tokenize();

        // Guard the indexing below so a short token stream fails with a
        // readable message rather than an out-of-bounds panic.
        assert!(
            tokens.len() > 5,
            "expected at least 6 tokens, got {}",
            tokens.len()
        );
        assert!(matches!(tokens[1], Token::Identifier(_)));
        assert!(matches!(tokens[3], Token::NumberLiteral(_)));
        assert!(matches!(tokens[5], Token::StringLiteral(_)));
    }

    /// A two-column projection with a WHERE clause parses into both parts.
    #[test]
    fn test_parser_simple_select() {
        let mut lexer = Lexer::new("SELECT id, name FROM users WHERE id = 1");
        let tokens = lexer.tokenize();
        let mut parser = Parser::new(tokens);
        let stmt = parser
            .parse_select()
            .expect("simple SELECT should parse");

        assert_eq!(stmt.columns.len(), 2);
        assert!(stmt.where_clause.is_some());
    }

    /// Aggregate calls in the projection each count as one column.
    #[test]
    fn test_parser_aggregate() {
        let mut lexer = Lexer::new("SELECT COUNT(*), AVG(value) FROM events");
        let tokens = lexer.tokenize();
        let mut parser = Parser::new(tokens);
        let stmt = parser
            .parse_select()
            .expect("aggregate SELECT should parse");

        assert_eq!(stmt.columns.len(), 2);
    }

    /// `WINDOW TUMBLING (SIZE 5 MINUTES)` produces a 300-second tumbling window.
    #[test]
    fn test_parser_window() {
        let mut lexer = Lexer::new("SELECT * FROM events WINDOW TUMBLING (SIZE 5 MINUTES)");
        let tokens = lexer.tokenize();
        let mut parser = Parser::new(tokens);
        let stmt = parser
            .parse_select()
            .expect("tumbling-window SELECT should parse");

        let window = stmt.window.expect("WINDOW clause should be present");
        assert_eq!(window.window_type, WindowType::Tumbling);
        assert_eq!(window.size, Duration::from_secs(300));
    }

    /// A GROUP BY clause populates `group_by`.
    #[test]
    fn test_parser_group_by() {
        let mut lexer = Lexer::new("SELECT sensor_id, AVG(temp) FROM sensors GROUP BY sensor_id");
        let tokens = lexer.tokenize();
        let mut parser = Parser::new(tokens);
        let stmt = parser
            .parse_select()
            .expect("GROUP BY SELECT should parse");

        assert!(!stmt.group_by.is_empty());
    }

    /// `a JOIN b ON ...` is represented as `FromClause::Join`.
    #[test]
    fn test_parser_join() {
        let mut lexer = Lexer::new("SELECT * FROM a JOIN b ON a.id = b.aid");
        let tokens = lexer.tokenize();
        let mut parser = Parser::new(tokens);
        let stmt = parser.parse_select().expect("JOIN SELECT should parse");

        assert!(matches!(stmt.from, Some(FromClause::Join { .. })));
    }

    /// Executing one query succeeds and is counted once in the statistics.
    #[tokio::test]
    async fn test_engine_basic() {
        let config = StreamSqlConfig::default();
        let engine = StreamSqlEngine::new(config);

        engine
            .execute("SELECT * FROM events")
            .await
            .expect("query should execute");

        let stats = engine.get_stats().await;
        assert_eq!(stats.queries_executed, 1);
        assert_eq!(stats.queries_succeeded, 1);
    }

    /// A stream can be registered, listed, fetched and then unregistered.
    #[tokio::test]
    async fn test_engine_stream_registration() {
        let config = StreamSqlConfig::default();
        let engine = StreamSqlEngine::new(config);

        let metadata = StreamMetadata {
            name: "events".to_string(),
            columns: vec![
                ColumnDefinition {
                    name: "id".to_string(),
                    data_type: DataType::Integer,
                    not_null: true,
                    default: None,
                },
                ColumnDefinition {
                    name: "value".to_string(),
                    data_type: DataType::Float,
                    not_null: false,
                    default: None,
                },
            ],
            properties: HashMap::new(),
            created_at: Utc::now(),
        };

        engine
            .register_stream(metadata)
            .await
            .expect("registering a stream should succeed");

        let streams = engine.list_streams().await;
        assert_eq!(streams.len(), 1);
        assert!(streams.contains(&"events".to_string()));

        let stream = engine
            .get_stream("events")
            .await
            .expect("registered stream should be retrievable");
        assert_eq!(stream.columns.len(), 2);

        engine
            .unregister_stream("events")
            .await
            .expect("unregistering a registered stream should succeed");
        let streams = engine.list_streams().await;
        assert!(streams.is_empty());
    }

    /// `validate` accepts a well-formed SELECT and rejects unparseable text.
    #[test]
    fn test_engine_validate() {
        let config = StreamSqlConfig::default();
        let engine = StreamSqlEngine::new(config);

        engine
            .validate("SELECT * FROM events")
            .expect("well-formed SQL should validate");
        assert!(engine.validate("INVALID SQL").is_err());
    }

    /// `explain` returns a non-empty rendering for a parseable query.
    #[test]
    fn test_engine_explain() {
        let config = StreamSqlConfig::default();
        let engine = StreamSqlEngine::new(config);

        let explanation = engine
            .explain("SELECT COUNT(*) FROM events WHERE value > 10")
            .expect("explain should succeed for a parseable query");
        assert!(!explanation.is_empty());
    }

    /// Running the same query twice records one cache miss and one cache hit.
    #[tokio::test]
    async fn test_engine_caching() {
        let config = StreamSqlConfig {
            enable_query_cache: true,
            cache_size: 100,
            ..Default::default()
        };
        let engine = StreamSqlEngine::new(config);

        // First run: expected to be counted as the miss.
        engine
            .execute("SELECT * FROM events")
            .await
            .expect("first execution should succeed");

        // Second run of the identical text: expected to be counted as the hit.
        engine
            .execute("SELECT * FROM events")
            .await
            .expect("second execution should succeed");

        let stats = engine.get_stats().await;
        assert_eq!(stats.cache_misses, 1);
        assert_eq!(stats.cache_hits, 1);
    }

    /// Parenthesised AND/OR combinations parse into a WHERE clause.
    #[test]
    fn test_parser_complex_expression() {
        let mut lexer = Lexer::new(
            "SELECT * FROM events WHERE (value > 10 AND status = 'active') OR priority = 1",
        );
        let tokens = lexer.tokenize();
        let mut parser = Parser::new(tokens);
        let stmt = parser
            .parse_select()
            .expect("SELECT with a compound WHERE should parse");

        assert!(stmt.where_clause.is_some());
    }

    /// ORDER BY keeps the per-key direction: `DESC` then `ASC`.
    #[test]
    fn test_parser_order_by() {
        let mut lexer = Lexer::new("SELECT * FROM events ORDER BY created_at DESC, id ASC");
        let tokens = lexer.tokenize();
        let mut parser = Parser::new(tokens);
        let stmt = parser.parse_select().expect("Parse failed");

        assert_eq!(stmt.order_by.len(), 2);
        assert!(!stmt.order_by[0].ascending);
        assert!(stmt.order_by[1].ascending);
    }

    /// `SELECT DISTINCT` sets the `distinct` flag.
    #[test]
    fn test_parser_distinct() {
        let mut lexer = Lexer::new("SELECT DISTINCT sensor_id FROM readings");
        let tokens = lexer.tokenize();
        let mut parser = Parser::new(tokens);
        let stmt = parser
            .parse_select()
            .expect("SELECT DISTINCT should parse");

        assert!(stmt.distinct);
    }

    /// A sliding window carries both its size and its slide interval.
    #[test]
    fn test_parser_sliding_window() {
        let mut lexer =
            Lexer::new("SELECT * FROM events WINDOW SLIDING (SIZE 10 SECONDS, SLIDE 5 SECONDS)");
        let tokens = lexer.tokenize();
        let mut parser = Parser::new(tokens);
        let stmt = parser
            .parse_select()
            .expect("sliding-window SELECT should parse");

        let window = stmt.window.expect("WINDOW clause should be present");
        assert_eq!(window.window_type, WindowType::Sliding);
        assert_eq!(window.size, Duration::from_secs(10));
        assert_eq!(window.slide, Some(Duration::from_secs(5)));
    }
}