1use anyhow::{anyhow, Result};
7use std::collections::HashMap;
8use std::sync::Arc;
9use std::time::Duration;
10use tokio::sync::RwLock;
11use tracing::{debug, info};
12
13use crate::stream_sql_ast::{
14 AggregateFunction, BinaryOperator, Expression, FromClause, JoinType, Lexer, OrderByItem,
15 QueryResult, SelectItem, SelectStatement, SqlValue, StreamMetadata, StreamSqlConfig,
16 StreamSqlStats, Token, UnaryOperator, WindowSpec, WindowType,
17};
18
/// Parser that walks a vector of tokens and builds a `SelectStatement`.
pub struct Parser {
    /// Tokens to be parsed.
    tokens: Vec<Token>,
    /// Index into `tokens` of the token currently being examined.
    position: usize,
}
24
25impl Parser {
26 pub fn new(tokens: Vec<Token>) -> Self {
28 Self {
29 tokens,
30 position: 0,
31 }
32 }
33
34 fn current_token(&self) -> &Token {
36 self.tokens.get(self.position).unwrap_or(&Token::Eof)
37 }
38
39 fn advance(&mut self) {
41 self.position += 1;
42 }
43
44 fn expect(&mut self, expected: Token) -> Result<()> {
46 if self.current_token() == &expected {
47 self.advance();
48 Ok(())
49 } else {
50 Err(anyhow!(
51 "Expected {:?}, got {:?}",
52 expected,
53 self.current_token()
54 ))
55 }
56 }
57
58 pub fn parse_select(&mut self) -> Result<SelectStatement> {
60 self.expect(Token::Select)?;
61
62 let distinct = if self.current_token() == &Token::Distinct {
64 self.advance();
65 true
66 } else {
67 false
68 };
69
70 let columns = self.parse_select_list()?;
72
73 let from = if self.current_token() == &Token::From {
75 self.advance();
76 Some(self.parse_from_clause()?)
77 } else {
78 None
79 };
80
81 let window = if self.current_token() == &Token::Window {
83 self.advance();
84 Some(self.parse_window_spec()?)
85 } else {
86 None
87 };
88
89 let where_clause = if self.current_token() == &Token::Where {
91 self.advance();
92 Some(self.parse_expression()?)
93 } else {
94 None
95 };
96
97 let group_by = if self.current_token() == &Token::Group {
99 self.advance();
100 if self.current_token() == &Token::By {
102 self.advance();
103 }
104 self.parse_expression_list()?
105 } else {
106 Vec::new()
107 };
108
109 let having = if self.current_token() == &Token::Having {
111 self.advance();
112 Some(self.parse_expression()?)
113 } else {
114 None
115 };
116
117 let order_by = if self.current_token() == &Token::Order {
119 self.advance();
120 if self.current_token() == &Token::By {
122 self.advance();
123 }
124 self.parse_order_by_list()?
125 } else {
126 Vec::new()
127 };
128
129 let limit = if self.current_token() == &Token::Limit {
131 self.advance();
132 if let Token::NumberLiteral(n) = self.current_token() {
133 let limit = *n as usize;
134 self.advance();
135 Some(limit)
136 } else {
137 None
138 }
139 } else {
140 None
141 };
142
143 Ok(SelectStatement {
144 distinct,
145 columns,
146 from,
147 where_clause,
148 group_by,
149 having,
150 order_by,
151 limit,
152 offset: None,
153 window,
154 })
155 }
156
157 fn parse_select_list(&mut self) -> Result<Vec<SelectItem>> {
159 let mut items = Vec::new();
160
161 loop {
162 let expr = self.parse_expression()?;
163
164 let alias = if self.current_token() == &Token::As {
166 self.advance();
167 if let Token::Identifier(name) = self.current_token().clone() {
168 self.advance();
169 Some(name)
170 } else {
171 None
172 }
173 } else if let Token::Identifier(name) = self.current_token().clone() {
174 if name.to_uppercase() != "FROM"
176 && name.to_uppercase() != "WHERE"
177 && name.to_uppercase() != "GROUP"
178 && name.to_uppercase() != "ORDER"
179 && name.to_uppercase() != "WINDOW"
180 {
181 self.advance();
182 Some(name)
183 } else {
184 None
185 }
186 } else {
187 None
188 };
189
190 items.push(SelectItem { expr, alias });
191
192 if self.current_token() != &Token::Comma {
193 break;
194 }
195 self.advance(); }
197
198 Ok(items)
199 }
200
201 fn parse_from_clause(&mut self) -> Result<FromClause> {
203 let mut from = self.parse_table_reference()?;
204
205 while matches!(
207 self.current_token(),
208 Token::Join | Token::Inner | Token::Left | Token::Right | Token::Full
209 ) {
210 let join_type = self.parse_join_type()?;
211 let right = self.parse_table_reference()?;
212
213 let condition = if self.current_token() == &Token::On {
214 self.advance();
215 Some(self.parse_expression()?)
216 } else {
217 None
218 };
219
220 from = FromClause::Join {
221 left: Box::new(from),
222 right: Box::new(right),
223 join_type,
224 condition,
225 };
226 }
227
228 Ok(from)
229 }
230
231 fn parse_table_reference(&mut self) -> Result<FromClause> {
233 if let Token::Identifier(name) = self.current_token().clone() {
234 self.advance();
235
236 let alias = if self.current_token() == &Token::As {
237 self.advance();
238 if let Token::Identifier(alias) = self.current_token().clone() {
239 self.advance();
240 Some(alias)
241 } else {
242 None
243 }
244 } else if let Token::Identifier(alias) = self.current_token().clone() {
245 if !matches!(
247 alias.to_uppercase().as_str(),
248 "WHERE"
249 | "GROUP"
250 | "ORDER"
251 | "HAVING"
252 | "LIMIT"
253 | "JOIN"
254 | "INNER"
255 | "LEFT"
256 | "RIGHT"
257 | "FULL"
258 | "ON"
259 | "WINDOW"
260 ) {
261 self.advance();
262 Some(alias)
263 } else {
264 None
265 }
266 } else {
267 None
268 };
269
270 Ok(FromClause::Table { name, alias })
271 } else {
272 Err(anyhow!("Expected table name"))
273 }
274 }
275
276 fn parse_join_type(&mut self) -> Result<JoinType> {
278 let join_type = match self.current_token() {
279 Token::Inner => {
280 self.advance();
281 JoinType::Inner
282 }
283 Token::Left => {
284 self.advance();
285 if self.current_token() == &Token::Outer {
286 self.advance();
287 }
288 JoinType::Left
289 }
290 Token::Right => {
291 self.advance();
292 if self.current_token() == &Token::Outer {
293 self.advance();
294 }
295 JoinType::Right
296 }
297 Token::Full => {
298 self.advance();
299 if self.current_token() == &Token::Outer {
300 self.advance();
301 }
302 JoinType::Full
303 }
304 _ => JoinType::Inner,
305 };
306
307 if self.current_token() == &Token::Join {
309 self.advance();
310 }
311
312 Ok(join_type)
313 }
314
315 fn parse_window_spec(&mut self) -> Result<WindowSpec> {
317 let window_type = match self.current_token() {
318 Token::Tumbling => {
319 self.advance();
320 WindowType::Tumbling
321 }
322 Token::Sliding => {
323 self.advance();
324 WindowType::Sliding
325 }
326 Token::Session => {
327 self.advance();
328 WindowType::Session
329 }
330 _ => WindowType::Tumbling,
331 };
332
333 self.expect(Token::OpenParen)?;
334
335 let mut size = Duration::from_secs(60);
336 let mut slide = None;
337 let mut gap = None;
338
339 while self.current_token() != &Token::CloseParen {
341 match self.current_token() {
342 Token::Size => {
343 self.advance();
344 size = self.parse_duration()?;
345 }
346 Token::Slide => {
347 self.advance();
348 slide = Some(self.parse_duration()?);
349 }
350 Token::Gap => {
351 self.advance();
352 gap = Some(self.parse_duration()?);
353 }
354 Token::Comma => {
355 self.advance();
356 }
357 _ => {
358 self.advance();
359 }
360 }
361 }
362
363 self.expect(Token::CloseParen)?;
364
365 Ok(WindowSpec {
366 window_type,
367 size,
368 slide,
369 gap,
370 time_attribute: None,
371 })
372 }
373
374 fn parse_duration(&mut self) -> Result<Duration> {
376 let value = if let Token::NumberLiteral(n) = self.current_token() {
377 let v = *n as u64;
378 self.advance();
379 v
380 } else {
381 return Err(anyhow!("Expected number for duration"));
382 };
383
384 let unit = if let Token::Identifier(unit) = self.current_token().clone() {
385 self.advance();
386 unit.to_uppercase()
387 } else {
388 "SECONDS".to_string()
389 };
390
391 let duration = match unit.as_str() {
392 "MILLISECONDS" | "MILLIS" | "MS" => Duration::from_millis(value),
393 "SECONDS" | "SECOND" | "S" => Duration::from_secs(value),
394 "MINUTES" | "MINUTE" | "M" => Duration::from_secs(value * 60),
395 "HOURS" | "HOUR" | "H" => Duration::from_secs(value * 3600),
396 "DAYS" | "DAY" | "D" => Duration::from_secs(value * 86400),
397 _ => Duration::from_secs(value),
398 };
399
400 Ok(duration)
401 }
402
403 fn parse_expression(&mut self) -> Result<Expression> {
405 self.parse_or_expression()
406 }
407
408 fn parse_or_expression(&mut self) -> Result<Expression> {
410 let mut left = self.parse_and_expression()?;
411
412 while self.current_token() == &Token::Or {
413 self.advance();
414 let right = self.parse_and_expression()?;
415 left = Expression::BinaryOp {
416 left: Box::new(left),
417 op: BinaryOperator::Or,
418 right: Box::new(right),
419 };
420 }
421
422 Ok(left)
423 }
424
425 fn parse_and_expression(&mut self) -> Result<Expression> {
427 let mut left = self.parse_comparison_expression()?;
428
429 while self.current_token() == &Token::And {
430 self.advance();
431 let right = self.parse_comparison_expression()?;
432 left = Expression::BinaryOp {
433 left: Box::new(left),
434 op: BinaryOperator::And,
435 right: Box::new(right),
436 };
437 }
438
439 Ok(left)
440 }
441
442 fn parse_comparison_expression(&mut self) -> Result<Expression> {
444 let left = self.parse_additive_expression()?;
445
446 let op = match self.current_token() {
447 Token::Equal => Some(BinaryOperator::Equal),
448 Token::NotEqual => Some(BinaryOperator::NotEqual),
449 Token::LessThan => Some(BinaryOperator::LessThan),
450 Token::LessThanOrEqual => Some(BinaryOperator::LessThanOrEqual),
451 Token::GreaterThan => Some(BinaryOperator::GreaterThan),
452 Token::GreaterThanOrEqual => Some(BinaryOperator::GreaterThanOrEqual),
453 Token::Like => Some(BinaryOperator::Like),
454 _ => None,
455 };
456
457 if let Some(op) = op {
458 self.advance();
459 let right = self.parse_additive_expression()?;
460 Ok(Expression::BinaryOp {
461 left: Box::new(left),
462 op,
463 right: Box::new(right),
464 })
465 } else {
466 Ok(left)
467 }
468 }
469
470 fn parse_additive_expression(&mut self) -> Result<Expression> {
472 let mut left = self.parse_multiplicative_expression()?;
473
474 loop {
475 let op = match self.current_token() {
476 Token::Plus => Some(BinaryOperator::Plus),
477 Token::Minus => Some(BinaryOperator::Minus),
478 _ => None,
479 };
480
481 if let Some(op) = op {
482 self.advance();
483 let right = self.parse_multiplicative_expression()?;
484 left = Expression::BinaryOp {
485 left: Box::new(left),
486 op,
487 right: Box::new(right),
488 };
489 } else {
490 break;
491 }
492 }
493
494 Ok(left)
495 }
496
497 fn parse_multiplicative_expression(&mut self) -> Result<Expression> {
499 let mut left = self.parse_unary_expression()?;
500
501 loop {
502 let op = match self.current_token() {
503 Token::Multiply | Token::Star => Some(BinaryOperator::Multiply),
504 Token::Divide => Some(BinaryOperator::Divide),
505 Token::Modulo => Some(BinaryOperator::Modulo),
506 _ => None,
507 };
508
509 if let Some(op) = op {
510 self.advance();
511 let right = self.parse_unary_expression()?;
512 left = Expression::BinaryOp {
513 left: Box::new(left),
514 op,
515 right: Box::new(right),
516 };
517 } else {
518 break;
519 }
520 }
521
522 Ok(left)
523 }
524
525 fn parse_unary_expression(&mut self) -> Result<Expression> {
527 match self.current_token() {
528 Token::Not => {
529 self.advance();
530 let expr = self.parse_unary_expression()?;
531 Ok(Expression::UnaryOp {
532 op: UnaryOperator::Not,
533 expr: Box::new(expr),
534 })
535 }
536 Token::Minus => {
537 self.advance();
538 let expr = self.parse_unary_expression()?;
539 Ok(Expression::UnaryOp {
540 op: UnaryOperator::Minus,
541 expr: Box::new(expr),
542 })
543 }
544 _ => self.parse_primary_expression(),
545 }
546 }
547
548 fn parse_primary_expression(&mut self) -> Result<Expression> {
550 match self.current_token().clone() {
551 Token::Star => {
552 self.advance();
553 Ok(Expression::Star)
554 }
555 Token::NumberLiteral(n) => {
556 self.advance();
557 if n.fract() == 0.0 {
558 Ok(Expression::Literal(SqlValue::Integer(n as i64)))
559 } else {
560 Ok(Expression::Literal(SqlValue::Float(n)))
561 }
562 }
563 Token::StringLiteral(s) => {
564 self.advance();
565 Ok(Expression::Literal(SqlValue::String(s)))
566 }
567 Token::BooleanLiteral(b) => {
568 self.advance();
569 Ok(Expression::Literal(SqlValue::Boolean(b)))
570 }
571 Token::Null => {
572 self.advance();
573 Ok(Expression::Literal(SqlValue::Null))
574 }
575 Token::Count
576 | Token::Sum
577 | Token::Avg
578 | Token::Min
579 | Token::Max
580 | Token::StdDev
581 | Token::Variance => {
582 let func = match self.current_token() {
583 Token::Count => AggregateFunction::Count,
584 Token::Sum => AggregateFunction::Sum,
585 Token::Avg => AggregateFunction::Avg,
586 Token::Min => AggregateFunction::Min,
587 Token::Max => AggregateFunction::Max,
588 Token::StdDev => AggregateFunction::StdDev,
589 Token::Variance => AggregateFunction::Variance,
590 _ => unreachable!(),
591 };
592 self.advance();
593 self.expect(Token::OpenParen)?;
594
595 let distinct = if self.current_token() == &Token::Distinct {
596 self.advance();
597 true
598 } else {
599 false
600 };
601
602 let expr = self.parse_expression()?;
603 self.expect(Token::CloseParen)?;
604
605 Ok(Expression::Aggregate {
606 func,
607 expr: Box::new(expr),
608 distinct,
609 })
610 }
611 Token::Identifier(name) => {
612 self.advance();
613
614 if self.current_token() == &Token::OpenParen {
616 self.advance();
617 let mut args = Vec::new();
618
619 if self.current_token() != &Token::CloseParen {
620 loop {
621 args.push(self.parse_expression()?);
622 if self.current_token() != &Token::Comma {
623 break;
624 }
625 self.advance();
626 }
627 }
628
629 self.expect(Token::CloseParen)?;
630
631 Ok(Expression::Function {
632 name,
633 args,
634 distinct: false,
635 })
636 } else if self.current_token() == &Token::Dot {
637 self.advance();
639 if let Token::Identifier(column) = self.current_token().clone() {
640 self.advance();
641 Ok(Expression::QualifiedColumn(name, column))
642 } else {
643 Ok(Expression::Column(name))
644 }
645 } else {
646 Ok(Expression::Column(name))
647 }
648 }
649 Token::OpenParen => {
650 self.advance();
651 let expr = self.parse_expression()?;
652 self.expect(Token::CloseParen)?;
653 Ok(expr)
654 }
655 _ => Err(anyhow!("Unexpected token: {:?}", self.current_token())),
656 }
657 }
658
659 fn parse_expression_list(&mut self) -> Result<Vec<Expression>> {
661 let mut exprs = Vec::new();
662
663 loop {
664 exprs.push(self.parse_expression()?);
665 if self.current_token() != &Token::Comma {
666 break;
667 }
668 self.advance();
669 }
670
671 Ok(exprs)
672 }
673
674 fn parse_order_by_list(&mut self) -> Result<Vec<OrderByItem>> {
676 let mut items = Vec::new();
677
678 loop {
679 let expr = self.parse_expression()?;
680
681 let ascending = if let Token::Identifier(dir) = self.current_token().clone() {
682 match dir.to_uppercase().as_str() {
683 "ASC" => {
684 self.advance();
685 true
686 }
687 "DESC" => {
688 self.advance();
689 false
690 }
691 _ => true,
692 }
693 } else {
694 true
695 };
696
697 items.push(OrderByItem {
698 expr,
699 ascending,
700 nulls_first: None,
701 });
702
703 if self.current_token() != &Token::Comma {
704 break;
705 }
706 self.advance();
707 }
708
709 Ok(items)
710 }
711}
712
/// Engine that parses stream SQL text, keeps a registry of streams, caches
/// parsed statements and tracks execution statistics.
pub struct StreamSqlEngine {
    /// Engine settings; read for the query-cache and query-logging switches
    /// and for the cache size limit.
    config: StreamSqlConfig,
    /// Registered streams, keyed by stream name.
    streams: Arc<RwLock<HashMap<String, StreamMetadata>>>,
    /// Parsed statements, keyed by the exact SQL text they were parsed from.
    query_cache: Arc<RwLock<HashMap<String, SelectStatement>>>,
    /// Running counters updated by `execute`.
    stats: Arc<RwLock<StreamSqlStats>>,
}
728
729impl StreamSqlEngine {
730 pub fn new(config: StreamSqlConfig) -> Self {
732 Self {
733 config,
734 streams: Arc::new(RwLock::new(HashMap::new())),
735 query_cache: Arc::new(RwLock::new(HashMap::new())),
736 stats: Arc::new(RwLock::new(StreamSqlStats::default())),
737 }
738 }
739
740 pub fn parse(&self, sql: &str) -> Result<SelectStatement> {
742 let mut lexer = Lexer::new(sql);
743 let tokens = lexer.tokenize();
744 let mut parser = Parser::new(tokens);
745 parser.parse_select()
746 }
747
748 pub async fn execute(&self, sql: &str) -> Result<QueryResult> {
750 let start_time = std::time::Instant::now();
751
752 if self.config.enable_query_cache {
754 let cache = self.query_cache.read().await;
755 if cache.contains_key(sql) {
756 let mut stats = self.stats.write().await;
757 stats.cache_hits += 1;
758 debug!("Query cache hit");
759 } else {
760 let mut stats = self.stats.write().await;
761 stats.cache_misses += 1;
762 }
763 }
764
765 let statement = self.parse(sql)?;
767
768 if self.config.enable_query_cache {
770 let mut cache = self.query_cache.write().await;
771 if cache.len() < self.config.cache_size {
772 cache.insert(sql.to_string(), statement.clone());
773 }
774 }
775
776 let result = QueryResult {
778 columns: statement
779 .columns
780 .iter()
781 .map(|c| c.alias.clone().unwrap_or_else(|| "column_0".to_string()))
782 .collect(),
783 rows: Vec::new(),
784 execution_time: start_time.elapsed(),
785 rows_affected: 0,
786 };
787
788 let mut stats = self.stats.write().await;
790 stats.queries_executed += 1;
791 stats.queries_succeeded += 1;
792 stats.avg_execution_time_ms = (stats.avg_execution_time_ms
793 * (stats.queries_executed - 1) as f64
794 + result.execution_time.as_millis() as f64)
795 / stats.queries_executed as f64;
796
797 if self.config.enable_query_logging {
798 info!(
799 "Executed query in {:?}: {}",
800 result.execution_time,
801 &sql[..sql.len().min(100)]
802 );
803 }
804
805 Ok(result)
806 }
807
808 pub async fn register_stream(&self, metadata: StreamMetadata) -> Result<()> {
810 let mut streams = self.streams.write().await;
811 info!("Registering stream: {}", metadata.name);
812 streams.insert(metadata.name.clone(), metadata);
813 Ok(())
814 }
815
816 pub async fn unregister_stream(&self, name: &str) -> Result<()> {
818 let mut streams = self.streams.write().await;
819 if streams.remove(name).is_some() {
820 info!("Unregistered stream: {}", name);
821 Ok(())
822 } else {
823 Err(anyhow!("Stream not found: {}", name))
824 }
825 }
826
827 pub async fn get_stream(&self, name: &str) -> Option<StreamMetadata> {
829 let streams = self.streams.read().await;
830 streams.get(name).cloned()
831 }
832
833 pub async fn list_streams(&self) -> Vec<String> {
835 let streams = self.streams.read().await;
836 streams.keys().cloned().collect()
837 }
838
839 pub async fn get_stats(&self) -> StreamSqlStats {
841 self.stats.read().await.clone()
842 }
843
844 pub async fn clear_cache(&self) {
846 let mut cache = self.query_cache.write().await;
847 cache.clear();
848 info!("Query cache cleared");
849 }
850
851 pub fn validate(&self, sql: &str) -> Result<()> {
853 self.parse(sql)?;
854 Ok(())
855 }
856
857 pub fn explain(&self, sql: &str) -> Result<String> {
859 let statement = self.parse(sql)?;
860 Ok(format!("{:#?}", statement))
861 }
862}