Skip to main content

oxirs_stream/
stream_sql_executor.rs

//! # Stream SQL — Parser and Execution Engine
//!
//! Contains the recursive-descent `Parser` that turns token streams into AST nodes,
//! and `StreamSqlEngine`, which drives lexing, parsing, caching, and execution.
5
6use anyhow::{anyhow, Result};
7use std::collections::HashMap;
8use std::sync::Arc;
9use std::time::Duration;
10use tokio::sync::RwLock;
11use tracing::{debug, info};
12
13use crate::stream_sql_ast::{
14    AggregateFunction, BinaryOperator, Expression, FromClause, JoinType, Lexer, OrderByItem,
15    QueryResult, SelectItem, SelectStatement, SqlValue, StreamMetadata, StreamSqlConfig,
16    StreamSqlStats, Token, UnaryOperator, WindowSpec, WindowType,
17};
18
19/// Parser for SQL queries
20pub struct Parser {
21    tokens: Vec<Token>,
22    position: usize,
23}
24
25impl Parser {
26    /// Create a new parser
27    pub fn new(tokens: Vec<Token>) -> Self {
28        Self {
29            tokens,
30            position: 0,
31        }
32    }
33
34    /// Get current token
35    fn current_token(&self) -> &Token {
36        self.tokens.get(self.position).unwrap_or(&Token::Eof)
37    }
38
39    /// Advance to next token
40    fn advance(&mut self) {
41        self.position += 1;
42    }
43
44    /// Expect a specific token
45    fn expect(&mut self, expected: Token) -> Result<()> {
46        if self.current_token() == &expected {
47            self.advance();
48            Ok(())
49        } else {
50            Err(anyhow!(
51                "Expected {:?}, got {:?}",
52                expected,
53                self.current_token()
54            ))
55        }
56    }
57
58    /// Parse a SELECT statement
59    pub fn parse_select(&mut self) -> Result<SelectStatement> {
60        self.expect(Token::Select)?;
61
62        // DISTINCT
63        let distinct = if self.current_token() == &Token::Distinct {
64            self.advance();
65            true
66        } else {
67            false
68        };
69
70        // SELECT list
71        let columns = self.parse_select_list()?;
72
73        // FROM clause
74        let from = if self.current_token() == &Token::From {
75            self.advance();
76            Some(self.parse_from_clause()?)
77        } else {
78            None
79        };
80
81        // WINDOW clause
82        let window = if self.current_token() == &Token::Window {
83            self.advance();
84            Some(self.parse_window_spec()?)
85        } else {
86            None
87        };
88
89        // WHERE clause
90        let where_clause = if self.current_token() == &Token::Where {
91            self.advance();
92            Some(self.parse_expression()?)
93        } else {
94            None
95        };
96
97        // GROUP BY clause
98        let group_by = if self.current_token() == &Token::Group {
99            self.advance();
100            // Expect 'BY'
101            if self.current_token() == &Token::By {
102                self.advance();
103            }
104            self.parse_expression_list()?
105        } else {
106            Vec::new()
107        };
108
109        // HAVING clause
110        let having = if self.current_token() == &Token::Having {
111            self.advance();
112            Some(self.parse_expression()?)
113        } else {
114            None
115        };
116
117        // ORDER BY clause
118        let order_by = if self.current_token() == &Token::Order {
119            self.advance();
120            // Expect 'BY'
121            if self.current_token() == &Token::By {
122                self.advance();
123            }
124            self.parse_order_by_list()?
125        } else {
126            Vec::new()
127        };
128
129        // LIMIT
130        let limit = if self.current_token() == &Token::Limit {
131            self.advance();
132            if let Token::NumberLiteral(n) = self.current_token() {
133                let limit = *n as usize;
134                self.advance();
135                Some(limit)
136            } else {
137                None
138            }
139        } else {
140            None
141        };
142
143        Ok(SelectStatement {
144            distinct,
145            columns,
146            from,
147            where_clause,
148            group_by,
149            having,
150            order_by,
151            limit,
152            offset: None,
153            window,
154        })
155    }
156
157    /// Parse SELECT list
158    fn parse_select_list(&mut self) -> Result<Vec<SelectItem>> {
159        let mut items = Vec::new();
160
161        loop {
162            let expr = self.parse_expression()?;
163
164            // Check for alias
165            let alias = if self.current_token() == &Token::As {
166                self.advance();
167                if let Token::Identifier(name) = self.current_token().clone() {
168                    self.advance();
169                    Some(name)
170                } else {
171                    None
172                }
173            } else if let Token::Identifier(name) = self.current_token().clone() {
174                // Alias without AS
175                if name.to_uppercase() != "FROM"
176                    && name.to_uppercase() != "WHERE"
177                    && name.to_uppercase() != "GROUP"
178                    && name.to_uppercase() != "ORDER"
179                    && name.to_uppercase() != "WINDOW"
180                {
181                    self.advance();
182                    Some(name)
183                } else {
184                    None
185                }
186            } else {
187                None
188            };
189
190            items.push(SelectItem { expr, alias });
191
192            if self.current_token() != &Token::Comma {
193                break;
194            }
195            self.advance(); // Skip comma
196        }
197
198        Ok(items)
199    }
200
201    /// Parse FROM clause
202    fn parse_from_clause(&mut self) -> Result<FromClause> {
203        let mut from = self.parse_table_reference()?;
204
205        // Check for joins
206        while matches!(
207            self.current_token(),
208            Token::Join | Token::Inner | Token::Left | Token::Right | Token::Full
209        ) {
210            let join_type = self.parse_join_type()?;
211            let right = self.parse_table_reference()?;
212
213            let condition = if self.current_token() == &Token::On {
214                self.advance();
215                Some(self.parse_expression()?)
216            } else {
217                None
218            };
219
220            from = FromClause::Join {
221                left: Box::new(from),
222                right: Box::new(right),
223                join_type,
224                condition,
225            };
226        }
227
228        Ok(from)
229    }
230
231    /// Parse table reference
232    fn parse_table_reference(&mut self) -> Result<FromClause> {
233        if let Token::Identifier(name) = self.current_token().clone() {
234            self.advance();
235
236            let alias = if self.current_token() == &Token::As {
237                self.advance();
238                if let Token::Identifier(alias) = self.current_token().clone() {
239                    self.advance();
240                    Some(alias)
241                } else {
242                    None
243                }
244            } else if let Token::Identifier(alias) = self.current_token().clone() {
245                // Check if this is an alias or a keyword
246                if !matches!(
247                    alias.to_uppercase().as_str(),
248                    "WHERE"
249                        | "GROUP"
250                        | "ORDER"
251                        | "HAVING"
252                        | "LIMIT"
253                        | "JOIN"
254                        | "INNER"
255                        | "LEFT"
256                        | "RIGHT"
257                        | "FULL"
258                        | "ON"
259                        | "WINDOW"
260                ) {
261                    self.advance();
262                    Some(alias)
263                } else {
264                    None
265                }
266            } else {
267                None
268            };
269
270            Ok(FromClause::Table { name, alias })
271        } else {
272            Err(anyhow!("Expected table name"))
273        }
274    }
275
276    /// Parse join type
277    fn parse_join_type(&mut self) -> Result<JoinType> {
278        let join_type = match self.current_token() {
279            Token::Inner => {
280                self.advance();
281                JoinType::Inner
282            }
283            Token::Left => {
284                self.advance();
285                if self.current_token() == &Token::Outer {
286                    self.advance();
287                }
288                JoinType::Left
289            }
290            Token::Right => {
291                self.advance();
292                if self.current_token() == &Token::Outer {
293                    self.advance();
294                }
295                JoinType::Right
296            }
297            Token::Full => {
298                self.advance();
299                if self.current_token() == &Token::Outer {
300                    self.advance();
301                }
302                JoinType::Full
303            }
304            _ => JoinType::Inner,
305        };
306
307        // Expect JOIN keyword
308        if self.current_token() == &Token::Join {
309            self.advance();
310        }
311
312        Ok(join_type)
313    }
314
315    /// Parse window specification
316    fn parse_window_spec(&mut self) -> Result<WindowSpec> {
317        let window_type = match self.current_token() {
318            Token::Tumbling => {
319                self.advance();
320                WindowType::Tumbling
321            }
322            Token::Sliding => {
323                self.advance();
324                WindowType::Sliding
325            }
326            Token::Session => {
327                self.advance();
328                WindowType::Session
329            }
330            _ => WindowType::Tumbling,
331        };
332
333        self.expect(Token::OpenParen)?;
334
335        let mut size = Duration::from_secs(60);
336        let mut slide = None;
337        let mut gap = None;
338
339        // Parse window parameters
340        while self.current_token() != &Token::CloseParen {
341            match self.current_token() {
342                Token::Size => {
343                    self.advance();
344                    size = self.parse_duration()?;
345                }
346                Token::Slide => {
347                    self.advance();
348                    slide = Some(self.parse_duration()?);
349                }
350                Token::Gap => {
351                    self.advance();
352                    gap = Some(self.parse_duration()?);
353                }
354                Token::Comma => {
355                    self.advance();
356                }
357                _ => {
358                    self.advance();
359                }
360            }
361        }
362
363        self.expect(Token::CloseParen)?;
364
365        Ok(WindowSpec {
366            window_type,
367            size,
368            slide,
369            gap,
370            time_attribute: None,
371        })
372    }
373
374    /// Parse duration (e.g., "5 MINUTES")
375    fn parse_duration(&mut self) -> Result<Duration> {
376        let value = if let Token::NumberLiteral(n) = self.current_token() {
377            let v = *n as u64;
378            self.advance();
379            v
380        } else {
381            return Err(anyhow!("Expected number for duration"));
382        };
383
384        let unit = if let Token::Identifier(unit) = self.current_token().clone() {
385            self.advance();
386            unit.to_uppercase()
387        } else {
388            "SECONDS".to_string()
389        };
390
391        let duration = match unit.as_str() {
392            "MILLISECONDS" | "MILLIS" | "MS" => Duration::from_millis(value),
393            "SECONDS" | "SECOND" | "S" => Duration::from_secs(value),
394            "MINUTES" | "MINUTE" | "M" => Duration::from_secs(value * 60),
395            "HOURS" | "HOUR" | "H" => Duration::from_secs(value * 3600),
396            "DAYS" | "DAY" | "D" => Duration::from_secs(value * 86400),
397            _ => Duration::from_secs(value),
398        };
399
400        Ok(duration)
401    }
402
403    /// Parse expression
404    fn parse_expression(&mut self) -> Result<Expression> {
405        self.parse_or_expression()
406    }
407
408    /// Parse OR expression
409    fn parse_or_expression(&mut self) -> Result<Expression> {
410        let mut left = self.parse_and_expression()?;
411
412        while self.current_token() == &Token::Or {
413            self.advance();
414            let right = self.parse_and_expression()?;
415            left = Expression::BinaryOp {
416                left: Box::new(left),
417                op: BinaryOperator::Or,
418                right: Box::new(right),
419            };
420        }
421
422        Ok(left)
423    }
424
425    /// Parse AND expression
426    fn parse_and_expression(&mut self) -> Result<Expression> {
427        let mut left = self.parse_comparison_expression()?;
428
429        while self.current_token() == &Token::And {
430            self.advance();
431            let right = self.parse_comparison_expression()?;
432            left = Expression::BinaryOp {
433                left: Box::new(left),
434                op: BinaryOperator::And,
435                right: Box::new(right),
436            };
437        }
438
439        Ok(left)
440    }
441
442    /// Parse comparison expression
443    fn parse_comparison_expression(&mut self) -> Result<Expression> {
444        let left = self.parse_additive_expression()?;
445
446        let op = match self.current_token() {
447            Token::Equal => Some(BinaryOperator::Equal),
448            Token::NotEqual => Some(BinaryOperator::NotEqual),
449            Token::LessThan => Some(BinaryOperator::LessThan),
450            Token::LessThanOrEqual => Some(BinaryOperator::LessThanOrEqual),
451            Token::GreaterThan => Some(BinaryOperator::GreaterThan),
452            Token::GreaterThanOrEqual => Some(BinaryOperator::GreaterThanOrEqual),
453            Token::Like => Some(BinaryOperator::Like),
454            _ => None,
455        };
456
457        if let Some(op) = op {
458            self.advance();
459            let right = self.parse_additive_expression()?;
460            Ok(Expression::BinaryOp {
461                left: Box::new(left),
462                op,
463                right: Box::new(right),
464            })
465        } else {
466            Ok(left)
467        }
468    }
469
470    /// Parse additive expression
471    fn parse_additive_expression(&mut self) -> Result<Expression> {
472        let mut left = self.parse_multiplicative_expression()?;
473
474        loop {
475            let op = match self.current_token() {
476                Token::Plus => Some(BinaryOperator::Plus),
477                Token::Minus => Some(BinaryOperator::Minus),
478                _ => None,
479            };
480
481            if let Some(op) = op {
482                self.advance();
483                let right = self.parse_multiplicative_expression()?;
484                left = Expression::BinaryOp {
485                    left: Box::new(left),
486                    op,
487                    right: Box::new(right),
488                };
489            } else {
490                break;
491            }
492        }
493
494        Ok(left)
495    }
496
497    /// Parse multiplicative expression
498    fn parse_multiplicative_expression(&mut self) -> Result<Expression> {
499        let mut left = self.parse_unary_expression()?;
500
501        loop {
502            let op = match self.current_token() {
503                Token::Multiply | Token::Star => Some(BinaryOperator::Multiply),
504                Token::Divide => Some(BinaryOperator::Divide),
505                Token::Modulo => Some(BinaryOperator::Modulo),
506                _ => None,
507            };
508
509            if let Some(op) = op {
510                self.advance();
511                let right = self.parse_unary_expression()?;
512                left = Expression::BinaryOp {
513                    left: Box::new(left),
514                    op,
515                    right: Box::new(right),
516                };
517            } else {
518                break;
519            }
520        }
521
522        Ok(left)
523    }
524
525    /// Parse unary expression
526    fn parse_unary_expression(&mut self) -> Result<Expression> {
527        match self.current_token() {
528            Token::Not => {
529                self.advance();
530                let expr = self.parse_unary_expression()?;
531                Ok(Expression::UnaryOp {
532                    op: UnaryOperator::Not,
533                    expr: Box::new(expr),
534                })
535            }
536            Token::Minus => {
537                self.advance();
538                let expr = self.parse_unary_expression()?;
539                Ok(Expression::UnaryOp {
540                    op: UnaryOperator::Minus,
541                    expr: Box::new(expr),
542                })
543            }
544            _ => self.parse_primary_expression(),
545        }
546    }
547
548    /// Parse primary expression
549    fn parse_primary_expression(&mut self) -> Result<Expression> {
550        match self.current_token().clone() {
551            Token::Star => {
552                self.advance();
553                Ok(Expression::Star)
554            }
555            Token::NumberLiteral(n) => {
556                self.advance();
557                if n.fract() == 0.0 {
558                    Ok(Expression::Literal(SqlValue::Integer(n as i64)))
559                } else {
560                    Ok(Expression::Literal(SqlValue::Float(n)))
561                }
562            }
563            Token::StringLiteral(s) => {
564                self.advance();
565                Ok(Expression::Literal(SqlValue::String(s)))
566            }
567            Token::BooleanLiteral(b) => {
568                self.advance();
569                Ok(Expression::Literal(SqlValue::Boolean(b)))
570            }
571            Token::Null => {
572                self.advance();
573                Ok(Expression::Literal(SqlValue::Null))
574            }
575            Token::Count
576            | Token::Sum
577            | Token::Avg
578            | Token::Min
579            | Token::Max
580            | Token::StdDev
581            | Token::Variance => {
582                let func = match self.current_token() {
583                    Token::Count => AggregateFunction::Count,
584                    Token::Sum => AggregateFunction::Sum,
585                    Token::Avg => AggregateFunction::Avg,
586                    Token::Min => AggregateFunction::Min,
587                    Token::Max => AggregateFunction::Max,
588                    Token::StdDev => AggregateFunction::StdDev,
589                    Token::Variance => AggregateFunction::Variance,
590                    _ => unreachable!(),
591                };
592                self.advance();
593                self.expect(Token::OpenParen)?;
594
595                let distinct = if self.current_token() == &Token::Distinct {
596                    self.advance();
597                    true
598                } else {
599                    false
600                };
601
602                let expr = self.parse_expression()?;
603                self.expect(Token::CloseParen)?;
604
605                Ok(Expression::Aggregate {
606                    func,
607                    expr: Box::new(expr),
608                    distinct,
609                })
610            }
611            Token::Identifier(name) => {
612                self.advance();
613
614                // Check for function call
615                if self.current_token() == &Token::OpenParen {
616                    self.advance();
617                    let mut args = Vec::new();
618
619                    if self.current_token() != &Token::CloseParen {
620                        loop {
621                            args.push(self.parse_expression()?);
622                            if self.current_token() != &Token::Comma {
623                                break;
624                            }
625                            self.advance();
626                        }
627                    }
628
629                    self.expect(Token::CloseParen)?;
630
631                    Ok(Expression::Function {
632                        name,
633                        args,
634                        distinct: false,
635                    })
636                } else if self.current_token() == &Token::Dot {
637                    // Qualified column name
638                    self.advance();
639                    if let Token::Identifier(column) = self.current_token().clone() {
640                        self.advance();
641                        Ok(Expression::QualifiedColumn(name, column))
642                    } else {
643                        Ok(Expression::Column(name))
644                    }
645                } else {
646                    Ok(Expression::Column(name))
647                }
648            }
649            Token::OpenParen => {
650                self.advance();
651                let expr = self.parse_expression()?;
652                self.expect(Token::CloseParen)?;
653                Ok(expr)
654            }
655            _ => Err(anyhow!("Unexpected token: {:?}", self.current_token())),
656        }
657    }
658
659    /// Parse expression list
660    fn parse_expression_list(&mut self) -> Result<Vec<Expression>> {
661        let mut exprs = Vec::new();
662
663        loop {
664            exprs.push(self.parse_expression()?);
665            if self.current_token() != &Token::Comma {
666                break;
667            }
668            self.advance();
669        }
670
671        Ok(exprs)
672    }
673
674    /// Parse ORDER BY list
675    fn parse_order_by_list(&mut self) -> Result<Vec<OrderByItem>> {
676        let mut items = Vec::new();
677
678        loop {
679            let expr = self.parse_expression()?;
680
681            let ascending = if let Token::Identifier(dir) = self.current_token().clone() {
682                match dir.to_uppercase().as_str() {
683                    "ASC" => {
684                        self.advance();
685                        true
686                    }
687                    "DESC" => {
688                        self.advance();
689                        false
690                    }
691                    _ => true,
692                }
693            } else {
694                true
695            };
696
697            items.push(OrderByItem {
698                expr,
699                ascending,
700                nulls_first: None,
701            });
702
703            if self.current_token() != &Token::Comma {
704                break;
705            }
706            self.advance();
707        }
708
709        Ok(items)
710    }
711}
712
// ---------------------------------------------------------------------------
// Stream SQL Engine
// ---------------------------------------------------------------------------
716
717/// Stream SQL engine
718pub struct StreamSqlEngine {
719    /// Configuration
720    config: StreamSqlConfig,
721    /// Registered streams
722    streams: Arc<RwLock<HashMap<String, StreamMetadata>>>,
723    /// Query cache
724    query_cache: Arc<RwLock<HashMap<String, SelectStatement>>>,
725    /// Statistics
726    stats: Arc<RwLock<StreamSqlStats>>,
727}
728
729impl StreamSqlEngine {
730    /// Create a new Stream SQL engine
731    pub fn new(config: StreamSqlConfig) -> Self {
732        Self {
733            config,
734            streams: Arc::new(RwLock::new(HashMap::new())),
735            query_cache: Arc::new(RwLock::new(HashMap::new())),
736            stats: Arc::new(RwLock::new(StreamSqlStats::default())),
737        }
738    }
739
740    /// Parse a SQL query
741    pub fn parse(&self, sql: &str) -> Result<SelectStatement> {
742        let mut lexer = Lexer::new(sql);
743        let tokens = lexer.tokenize();
744        let mut parser = Parser::new(tokens);
745        parser.parse_select()
746    }
747
748    /// Execute a SQL query
749    pub async fn execute(&self, sql: &str) -> Result<QueryResult> {
750        let start_time = std::time::Instant::now();
751
752        // Check cache
753        if self.config.enable_query_cache {
754            let cache = self.query_cache.read().await;
755            if cache.contains_key(sql) {
756                let mut stats = self.stats.write().await;
757                stats.cache_hits += 1;
758                debug!("Query cache hit");
759            } else {
760                let mut stats = self.stats.write().await;
761                stats.cache_misses += 1;
762            }
763        }
764
765        // Parse query
766        let statement = self.parse(sql)?;
767
768        // Update cache
769        if self.config.enable_query_cache {
770            let mut cache = self.query_cache.write().await;
771            if cache.len() < self.config.cache_size {
772                cache.insert(sql.to_string(), statement.clone());
773            }
774        }
775
776        // Execute query (placeholder - actual execution would process the AST)
777        let result = QueryResult {
778            columns: statement
779                .columns
780                .iter()
781                .map(|c| c.alias.clone().unwrap_or_else(|| "column_0".to_string()))
782                .collect(),
783            rows: Vec::new(),
784            execution_time: start_time.elapsed(),
785            rows_affected: 0,
786        };
787
788        // Update statistics
789        let mut stats = self.stats.write().await;
790        stats.queries_executed += 1;
791        stats.queries_succeeded += 1;
792        stats.avg_execution_time_ms = (stats.avg_execution_time_ms
793            * (stats.queries_executed - 1) as f64
794            + result.execution_time.as_millis() as f64)
795            / stats.queries_executed as f64;
796
797        if self.config.enable_query_logging {
798            info!(
799                "Executed query in {:?}: {}",
800                result.execution_time,
801                &sql[..sql.len().min(100)]
802            );
803        }
804
805        Ok(result)
806    }
807
808    /// Register a stream
809    pub async fn register_stream(&self, metadata: StreamMetadata) -> Result<()> {
810        let mut streams = self.streams.write().await;
811        info!("Registering stream: {}", metadata.name);
812        streams.insert(metadata.name.clone(), metadata);
813        Ok(())
814    }
815
816    /// Unregister a stream
817    pub async fn unregister_stream(&self, name: &str) -> Result<()> {
818        let mut streams = self.streams.write().await;
819        if streams.remove(name).is_some() {
820            info!("Unregistered stream: {}", name);
821            Ok(())
822        } else {
823            Err(anyhow!("Stream not found: {}", name))
824        }
825    }
826
827    /// Get stream metadata
828    pub async fn get_stream(&self, name: &str) -> Option<StreamMetadata> {
829        let streams = self.streams.read().await;
830        streams.get(name).cloned()
831    }
832
833    /// List all streams
834    pub async fn list_streams(&self) -> Vec<String> {
835        let streams = self.streams.read().await;
836        streams.keys().cloned().collect()
837    }
838
839    /// Get statistics
840    pub async fn get_stats(&self) -> StreamSqlStats {
841        self.stats.read().await.clone()
842    }
843
844    /// Clear query cache
845    pub async fn clear_cache(&self) {
846        let mut cache = self.query_cache.write().await;
847        cache.clear();
848        info!("Query cache cleared");
849    }
850
851    /// Validate a query without executing
852    pub fn validate(&self, sql: &str) -> Result<()> {
853        self.parse(sql)?;
854        Ok(())
855    }
856
857    /// Explain a query
858    pub fn explain(&self, sql: &str) -> Result<String> {
859        let statement = self.parse(sql)?;
860        Ok(format!("{:#?}", statement))
861    }
862}