1use chrono::{DateTime, Utc};
9use serde::{Deserialize, Serialize};
10use std::collections::HashMap;
11use std::time::Duration;
12
/// Settings bundle for the stream SQL component.
///
/// All fields are public and the struct can be serialized and deserialized
/// through serde. The defaults quoted on each field are the values set by
/// this type's `Default` implementation.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StreamSqlConfig {
    /// Presumably the longest a single query may run; default 60 seconds.
    /// NOTE(review): enforcement is not visible in this part of the file.
    pub max_execution_time: Duration,
    /// Query-optimization switch; default `true`.
    pub enable_optimization: bool,
    /// Memory budget in bytes; default `1024 * 1024 * 1024` (1 GiB).
    pub max_memory_bytes: usize,
    /// Parallel-execution switch; default `true`.
    pub parallel_execution: bool,
    /// Number of worker threads; default 4.
    pub worker_threads: usize,
    /// Query-cache switch; default `true`.
    pub enable_query_cache: bool,
    /// Query-cache capacity; default 1000.
    /// NOTE(review): the unit (entries vs. bytes) is not shown here; confirm.
    pub cache_size: usize,
    /// Query-logging switch; default `true`.
    pub enable_query_logging: bool,
    /// Presumably the window length used when a query names none; default
    /// 60 seconds.
    pub default_window_size: Duration,
}
35
36impl Default for StreamSqlConfig {
37 fn default() -> Self {
38 Self {
39 max_execution_time: Duration::from_secs(60),
40 enable_optimization: true,
41 max_memory_bytes: 1024 * 1024 * 1024, parallel_execution: true,
43 worker_threads: 4,
44 enable_query_cache: true,
45 cache_size: 1000,
46 enable_query_logging: true,
47 default_window_size: Duration::from_secs(60),
48 }
49 }
50}
51
/// Statement categories known to this module.
///
/// The enum carries no data, so it is `Copy` and can be compared with `==`
/// or used as a key in hashed collections.
/// NOTE(review): the code that assigns a `QueryType` to a statement is not
/// visible in this part of the file.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum QueryType {
    /// `SELECT` statement.
    Select,
    /// `CREATE STREAM` statement.
    CreateStream,
    /// `DROP STREAM` statement.
    DropStream,
    /// `INSERT` statement.
    Insert,
    /// `CREATE VIEW` statement.
    CreateView,
    /// `DESCRIBE` statement.
    Describe,
    /// `EXPLAIN` statement.
    Explain,
}
70
/// Lexical token produced by `Lexer::next_token`.
///
/// Keywords are matched case-insensitively by the lexer; any word that is not
/// a keyword becomes [`Token::Identifier`].
#[derive(Debug, Clone, PartialEq)]
pub enum Token {
    // --- SQL keywords (each variant corresponds to the upper-cased word) ---
    Select,
    From,
    Where,
    Group,
    By,
    Having,
    Order,
    Limit,
    Window,
    Tumbling,
    Sliding,
    Session,
    Size,
    Slide,
    Gap,
    Create,
    Stream,
    View,
    Drop,
    Insert,
    Into,
    Values,
    As,
    And,
    Or,
    /// Produced for the word `NOT` and for a lone `!`.
    Not,
    In,
    Like,
    Between,
    Is,
    Null,
    Join,
    Inner,
    Left,
    Right,
    Full,
    Outer,
    On,
    Describe,
    Explain,
    Distinct,
    Case,
    When,
    Then,
    Else,
    End,

    // --- Data-type keywords ---
    /// `INT` or `INTEGER`.
    Int,
    /// `FLOAT` or `DOUBLE`.
    Float,
    /// `STRING`, `VARCHAR` or `TEXT`.
    String,
    /// `BOOLEAN` or `BOOL`.
    Boolean,
    /// `TIMESTAMP` or `DATETIME`.
    Timestamp,

    // --- Operators ---
    /// `+`
    Plus,
    /// `-`
    Minus,
    /// Not emitted by the lexer in this file: `*` is always lexed as
    /// [`Token::Star`].
    Multiply,
    /// `/`
    Divide,
    /// `%`
    Modulo,
    /// `=`
    Equal,
    /// `<>` or `!=`
    NotEqual,
    /// `<`
    LessThan,
    /// `<=`
    LessThanOrEqual,
    /// `>`
    GreaterThan,
    /// `>=`
    GreaterThanOrEqual,

    // --- Punctuation ---
    /// `,`
    Comma,
    /// `.`
    Dot,
    /// `;`
    Semicolon,
    /// `(`
    OpenParen,
    /// `)`
    CloseParen,
    /// `[`
    OpenBracket,
    /// `]`
    CloseBracket,

    // --- Identifiers and literals ---
    /// Any non-keyword word, stored with its original casing.
    Identifier(String),
    /// Contents of a single- or double-quoted string, without the quotes.
    StringLiteral(String),
    /// Numeric literal; the lexer stores every number as `f64`.
    NumberLiteral(f64),
    /// `TRUE` or `FALSE`.
    BooleanLiteral(bool),

    // --- Aggregate function names ---
    Count,
    Sum,
    Avg,
    Min,
    Max,
    /// `STDDEV`
    StdDev,
    /// `VARIANCE` or `VAR`
    Variance,

    // --- Special ---
    /// `*`
    Star,
    /// End of input. The lexer in this file also returns it for a character
    /// it does not recognise.
    Eof,
}
170
/// Character-by-character scanner that turns SQL text into [`Token`]s.
///
/// Derives `Debug` and `Clone` so a lexer can be inspected in diagnostics and
/// snapshotted.
#[derive(Debug, Clone)]
pub struct Lexer {
    /// The whole input, decoded into characters up front.
    input: Vec<char>,
    /// Index into `input` of the character currently under the cursor.
    position: usize,
    /// Character at `position`, or `None` once the cursor is past the end.
    current_char: Option<char>,
}
177
178impl Lexer {
179 pub fn new(input: &str) -> Self {
181 let chars: Vec<char> = input.chars().collect();
182 let current_char = chars.first().copied();
183 Self {
184 input: chars,
185 position: 0,
186 current_char,
187 }
188 }
189
190 fn advance(&mut self) {
192 self.position += 1;
193 self.current_char = self.input.get(self.position).copied();
194 }
195
196 fn skip_whitespace(&mut self) {
198 while let Some(c) = self.current_char {
199 if c.is_whitespace() {
200 self.advance();
201 } else {
202 break;
203 }
204 }
205 }
206
207 fn read_identifier(&mut self) -> String {
209 let mut result = String::new();
210 while let Some(c) = self.current_char {
211 if c.is_alphanumeric() || c == '_' {
212 result.push(c);
213 self.advance();
214 } else {
215 break;
216 }
217 }
218 result
219 }
220
221 fn read_number(&mut self) -> f64 {
223 let mut result = String::new();
224 while let Some(c) = self.current_char {
225 if c.is_ascii_digit() || c == '.' {
226 result.push(c);
227 self.advance();
228 } else {
229 break;
230 }
231 }
232 result.parse().unwrap_or(0.0)
233 }
234
235 fn read_string(&mut self) -> String {
237 let quote = self
238 .current_char
239 .expect("current_char should be Some when read_string is called");
240 self.advance(); let mut result = String::new();
242 while let Some(c) = self.current_char {
243 if c == quote {
244 self.advance(); break;
246 } else if c == '\\' {
247 self.advance();
248 if let Some(escaped) = self.current_char {
249 result.push(escaped);
250 self.advance();
251 }
252 } else {
253 result.push(c);
254 self.advance();
255 }
256 }
257 result
258 }
259
260 pub fn next_token(&mut self) -> Token {
262 self.skip_whitespace();
263
264 match self.current_char {
265 None => Token::Eof,
266 Some(c) => {
267 if c.is_alphabetic() || c == '_' {
268 let ident = self.read_identifier();
269 match ident.to_uppercase().as_str() {
270 "SELECT" => Token::Select,
271 "FROM" => Token::From,
272 "WHERE" => Token::Where,
273 "GROUP" => Token::Group,
274 "BY" => Token::By,
275 "HAVING" => Token::Having,
276 "ORDER" => Token::Order,
277 "LIMIT" => Token::Limit,
278 "WINDOW" => Token::Window,
279 "TUMBLING" => Token::Tumbling,
280 "SLIDING" => Token::Sliding,
281 "SESSION" => Token::Session,
282 "SIZE" => Token::Size,
283 "SLIDE" => Token::Slide,
284 "GAP" => Token::Gap,
285 "CREATE" => Token::Create,
286 "STREAM" => Token::Stream,
287 "VIEW" => Token::View,
288 "DROP" => Token::Drop,
289 "INSERT" => Token::Insert,
290 "INTO" => Token::Into,
291 "VALUES" => Token::Values,
292 "AS" => Token::As,
293 "AND" => Token::And,
294 "OR" => Token::Or,
295 "NOT" => Token::Not,
296 "IN" => Token::In,
297 "LIKE" => Token::Like,
298 "BETWEEN" => Token::Between,
299 "IS" => Token::Is,
300 "NULL" => Token::Null,
301 "JOIN" => Token::Join,
302 "INNER" => Token::Inner,
303 "LEFT" => Token::Left,
304 "RIGHT" => Token::Right,
305 "FULL" => Token::Full,
306 "OUTER" => Token::Outer,
307 "ON" => Token::On,
308 "DESCRIBE" => Token::Describe,
309 "EXPLAIN" => Token::Explain,
310 "DISTINCT" => Token::Distinct,
311 "CASE" => Token::Case,
312 "WHEN" => Token::When,
313 "THEN" => Token::Then,
314 "ELSE" => Token::Else,
315 "END" => Token::End,
316 "INT" | "INTEGER" => Token::Int,
317 "FLOAT" | "DOUBLE" => Token::Float,
318 "STRING" | "VARCHAR" | "TEXT" => Token::String,
319 "BOOLEAN" | "BOOL" => Token::Boolean,
320 "TIMESTAMP" | "DATETIME" => Token::Timestamp,
321 "COUNT" => Token::Count,
322 "SUM" => Token::Sum,
323 "AVG" => Token::Avg,
324 "MIN" => Token::Min,
325 "MAX" => Token::Max,
326 "STDDEV" => Token::StdDev,
327 "VARIANCE" | "VAR" => Token::Variance,
328 "TRUE" => Token::BooleanLiteral(true),
329 "FALSE" => Token::BooleanLiteral(false),
330 _ => Token::Identifier(ident),
331 }
332 } else if c.is_ascii_digit() {
333 Token::NumberLiteral(self.read_number())
334 } else if c == '\'' || c == '"' {
335 Token::StringLiteral(self.read_string())
336 } else {
337 match c {
338 '+' => {
339 self.advance();
340 Token::Plus
341 }
342 '-' => {
343 self.advance();
344 Token::Minus
345 }
346 '*' => {
347 self.advance();
348 Token::Star
349 }
350 '/' => {
351 self.advance();
352 Token::Divide
353 }
354 '%' => {
355 self.advance();
356 Token::Modulo
357 }
358 '=' => {
359 self.advance();
360 Token::Equal
361 }
362 '<' => {
363 self.advance();
364 if self.current_char == Some('=') {
365 self.advance();
366 Token::LessThanOrEqual
367 } else if self.current_char == Some('>') {
368 self.advance();
369 Token::NotEqual
370 } else {
371 Token::LessThan
372 }
373 }
374 '>' => {
375 self.advance();
376 if self.current_char == Some('=') {
377 self.advance();
378 Token::GreaterThanOrEqual
379 } else {
380 Token::GreaterThan
381 }
382 }
383 '!' => {
384 self.advance();
385 if self.current_char == Some('=') {
386 self.advance();
387 Token::NotEqual
388 } else {
389 Token::Not
390 }
391 }
392 ',' => {
393 self.advance();
394 Token::Comma
395 }
396 '.' => {
397 self.advance();
398 Token::Dot
399 }
400 ';' => {
401 self.advance();
402 Token::Semicolon
403 }
404 '(' => {
405 self.advance();
406 Token::OpenParen
407 }
408 ')' => {
409 self.advance();
410 Token::CloseParen
411 }
412 '[' => {
413 self.advance();
414 Token::OpenBracket
415 }
416 ']' => {
417 self.advance();
418 Token::CloseBracket
419 }
420 _ => {
421 self.advance();
422 Token::Eof
423 }
424 }
425 }
426 }
427 }
428 }
429
430 pub fn tokenize(&mut self) -> Vec<Token> {
432 let mut tokens = Vec::new();
433 loop {
434 let token = self.next_token();
435 if token == Token::Eof {
436 tokens.push(token);
437 break;
438 }
439 tokens.push(token);
440 }
441 tokens
442 }
443}
444
/// Node of the SQL expression tree.
///
/// Child expressions are boxed so that the type can be recursive.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub enum Expression {
    /// Column reference by a single name.
    Column(String),
    /// Two-part column reference.
    /// NOTE(review): presumably `(qualifier, column)`; confirm the order
    /// against the parser.
    QualifiedColumn(String, String),
    /// Constant value.
    Literal(SqlValue),
    /// Binary operator `op` applied to `left` and `right`.
    BinaryOp {
        left: Box<Expression>,
        op: BinaryOperator,
        right: Box<Expression>,
    },
    /// Unary operator `op` applied to `expr`.
    UnaryOp {
        op: UnaryOperator,
        expr: Box<Expression>,
    },
    /// Call of the function `name` with `args`; `distinct` presumably records
    /// a `DISTINCT` modifier on the arguments.
    Function {
        name: String,
        args: Vec<Expression>,
        distinct: bool,
    },
    /// Aggregate `func` applied to `expr`; `distinct` presumably records a
    /// `DISTINCT` modifier.
    Aggregate {
        func: AggregateFunction,
        expr: Box<Expression>,
        distinct: bool,
    },
    /// `CASE` expression with an optional `operand`, a list of `WHEN` pairs
    /// and an optional `ELSE` branch.
    /// NOTE(review): each pair is presumably `(condition, result)`; confirm.
    Case {
        operand: Option<Box<Expression>>,
        when_clauses: Vec<(Expression, Expression)>,
        else_clause: Option<Box<Expression>>,
    },
    /// Nested `SELECT` used as an expression.
    Subquery(Box<SelectStatement>),
    /// Membership test of `expr` against `list`; `negated` presumably marks
    /// the `NOT IN` form.
    InList {
        expr: Box<Expression>,
        list: Vec<Expression>,
        negated: bool,
    },
    /// Range test of `expr` against `low` and `high`; `negated` presumably
    /// marks the `NOT BETWEEN` form.
    Between {
        expr: Box<Expression>,
        low: Box<Expression>,
        high: Box<Expression>,
        negated: bool,
    },
    /// Null test on `expr`; `negated` presumably marks the `IS NOT NULL` form.
    IsNull {
        expr: Box<Expression>,
        negated: bool,
    },
    /// Presumably the `*` wildcard.
    Star,
}
506
/// Runtime value carried by literals and result rows.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub enum SqlValue {
    /// SQL `NULL`.
    Null,
    /// 64-bit signed integer.
    Integer(i64),
    /// 64-bit floating-point number.
    Float(f64),
    /// Owned text.
    String(String),
    /// Boolean.
    Boolean(bool),
    /// Point in time in UTC.
    Timestamp(DateTime<Utc>),
}
517
518#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
520pub enum BinaryOperator {
521 Plus,
522 Minus,
523 Multiply,
524 Divide,
525 Modulo,
526 Equal,
527 NotEqual,
528 LessThan,
529 LessThanOrEqual,
530 GreaterThan,
531 GreaterThanOrEqual,
532 And,
533 Or,
534 Like,
535}
536
537#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
539pub enum UnaryOperator {
540 Not,
541 Minus,
542}
543
544#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
546pub enum AggregateFunction {
547 Count,
548 Sum,
549 Avg,
550 Min,
551 Max,
552 StdDev,
553 Variance,
554}
555
/// Window description attached to a [`SelectStatement`].
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct WindowSpec {
    /// Kind of window.
    pub window_type: WindowType,
    /// Window length.
    pub size: Duration,
    /// Optional slide interval.
    /// NOTE(review): presumably only meaningful for sliding windows; confirm.
    pub slide: Option<Duration>,
    /// Optional gap.
    /// NOTE(review): presumably only meaningful for session windows; confirm.
    pub gap: Option<Duration>,
    /// Optional name of the time attribute.
    /// NOTE(review): presumably the column that supplies event time; confirm.
    pub time_attribute: Option<String>,
}
570
571#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
573pub enum WindowType {
574 Tumbling,
575 Sliding,
576 Session,
577}
578
/// One entry of a `SELECT` column list: an expression with an optional alias.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct SelectItem {
    /// Expression for this item.
    pub expr: Expression,
    /// Optional alias for the item.
    pub alias: Option<String>,
}
587
/// Source of rows for a [`SelectStatement`].
///
/// The `Join` variant nests two further `FromClause` values, so arbitrary
/// join trees can be represented.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub enum FromClause {
    /// Named source with an optional alias.
    Table { name: String, alias: Option<String> },
    /// Join of `left` and `right` with an optional join condition.
    Join {
        left: Box<FromClause>,
        right: Box<FromClause>,
        join_type: JoinType,
        condition: Option<Expression>,
    },
    /// Nested `SELECT` with a mandatory alias.
    Subquery {
        query: Box<SelectStatement>,
        alias: String,
    },
}
606
607#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
609pub enum JoinType {
610 Inner,
611 Left,
612 Right,
613 Full,
614 Cross,
615}
616
/// One entry of an `ORDER BY` list.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct OrderByItem {
    /// Expression to sort by.
    pub expr: Expression,
    /// `true` for ascending order, `false` for descending.
    pub ascending: bool,
    /// Explicit placement of nulls, if any.
    /// NOTE(review): the behaviour for `None` is decided outside this part of
    /// the file.
    pub nulls_first: Option<bool>,
}
627
/// Parsed form of a `SELECT` statement.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct SelectStatement {
    /// Presumably set when the statement uses `SELECT DISTINCT`.
    pub distinct: bool,
    /// Items of the select list.
    pub columns: Vec<SelectItem>,
    /// Optional `FROM` clause.
    pub from: Option<FromClause>,
    /// Optional `WHERE` predicate.
    pub where_clause: Option<Expression>,
    /// `GROUP BY` expressions; presumably empty when there is no grouping.
    pub group_by: Vec<Expression>,
    /// Optional `HAVING` predicate.
    pub having: Option<Expression>,
    /// `ORDER BY` items; presumably empty when no ordering is requested.
    pub order_by: Vec<OrderByItem>,
    /// Optional row limit.
    pub limit: Option<usize>,
    /// Optional row offset.
    pub offset: Option<usize>,
    /// Optional window specification.
    pub window: Option<WindowSpec>,
}
652
/// Parsed form of a `CREATE STREAM` statement.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct CreateStreamStatement {
    /// Name of the stream.
    pub name: String,
    /// Column definitions of the stream.
    pub columns: Vec<ColumnDefinition>,
    /// String key/value properties attached to the stream.
    /// NOTE(review): the recognised keys are not visible in this part of the
    /// file.
    pub properties: HashMap<String, String>,
}
663
/// Definition of a single stream column.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct ColumnDefinition {
    /// Column name.
    pub name: String,
    /// Declared data type.
    pub data_type: DataType,
    /// Presumably set when the column is declared `NOT NULL`.
    pub not_null: bool,
    /// Optional default-value expression.
    pub default: Option<Expression>,
}
676
/// Column data types.
///
/// `Array` and `Map` box their element types, so types can nest.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub enum DataType {
    Integer,
    Float,
    String,
    Boolean,
    Timestamp,
    /// Array with the given element type.
    Array(Box<DataType>),
    /// Map type.
    /// NOTE(review): the two parameters are presumably `(key, value)`;
    /// confirm.
    Map(Box<DataType>, Box<DataType>),
}
688
/// One row of a [`QueryResult`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ResultRow {
    /// Values of the row.
    /// NOTE(review): presumably in the same order as `QueryResult::columns`;
    /// confirm.
    pub values: Vec<SqlValue>,
}
695
/// Outcome of running a query.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QueryResult {
    /// Names of the result columns.
    pub columns: Vec<String>,
    /// Result rows.
    pub rows: Vec<ResultRow>,
    /// Time reported for executing the query.
    pub execution_time: Duration,
    /// Number of rows affected.
    /// NOTE(review): which statements set this is not visible in this part of
    /// the file.
    pub rows_affected: usize,
}
708
/// Descriptive record for a stream.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StreamMetadata {
    /// Name of the stream.
    pub name: String,
    /// Column definitions of the stream.
    pub columns: Vec<ColumnDefinition>,
    /// String key/value properties attached to the stream.
    pub properties: HashMap<String, String>,
    /// Creation timestamp in UTC.
    pub created_at: DateTime<Utc>,
}
721
/// Counters for query execution and cache use.
///
/// `Default` is derived, so a fresh value has every counter at zero.
/// NOTE(review): the code that updates these counters is not visible in this
/// part of the file.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct StreamSqlStats {
    /// Number of queries executed.
    pub queries_executed: u64,
    /// Number of queries that succeeded.
    pub queries_succeeded: u64,
    /// Number of queries that failed.
    pub queries_failed: u64,
    /// Average execution time in milliseconds.
    pub avg_execution_time_ms: f64,
    /// Number of cache hits.
    pub cache_hits: u64,
    /// Number of cache misses.
    pub cache_misses: u64,
}