Skip to main content

oxirs_stream/
stream_sql_tests.rs

1//! # Stream SQL — Tests
2
3#[cfg(test)]
4mod tests {
5    use chrono::Utc;
6    use std::collections::HashMap;
7    use std::time::Duration;
8
9    use crate::stream_sql_ast::{
10        ColumnDefinition, DataType, FromClause, Lexer, StreamMetadata, StreamSqlConfig, Token,
11        WindowType,
12    };
13    use crate::stream_sql_executor::{Parser, StreamSqlEngine};
14
15    #[test]
16    fn test_lexer_basic() {
17        let mut lexer = Lexer::new("SELECT * FROM events");
18        let tokens = lexer.tokenize();
19
20        assert_eq!(tokens.len(), 5);
21        assert_eq!(tokens[0], Token::Select);
22        assert_eq!(tokens[1], Token::Star);
23        assert_eq!(tokens[2], Token::From);
24        assert_eq!(tokens[3], Token::Identifier("events".to_string()));
25        assert_eq!(tokens[4], Token::Eof);
26    }
27
28    #[test]
29    fn test_lexer_with_literals() {
30        let mut lexer = Lexer::new("SELECT name, 42, 'hello' FROM events");
31        let tokens = lexer.tokenize();
32
33        assert!(matches!(tokens[1], Token::Identifier(_)));
34        assert!(matches!(tokens[3], Token::NumberLiteral(_)));
35        assert!(matches!(tokens[5], Token::StringLiteral(_)));
36    }
37
38    #[test]
39    fn test_parser_simple_select() {
40        let mut lexer = Lexer::new("SELECT id, name FROM users WHERE id = 1");
41        let tokens = lexer.tokenize();
42        let mut parser = Parser::new(tokens);
43        let result = parser.parse_select();
44
45        assert!(result.is_ok());
46        let stmt = result.unwrap();
47        assert_eq!(stmt.columns.len(), 2);
48        assert!(stmt.where_clause.is_some());
49    }
50
51    #[test]
52    fn test_parser_aggregate() {
53        let mut lexer = Lexer::new("SELECT COUNT(*), AVG(value) FROM events");
54        let tokens = lexer.tokenize();
55        let mut parser = Parser::new(tokens);
56        let result = parser.parse_select();
57
58        assert!(result.is_ok());
59        let stmt = result.unwrap();
60        assert_eq!(stmt.columns.len(), 2);
61    }
62
63    #[test]
64    fn test_parser_window() {
65        let mut lexer = Lexer::new("SELECT * FROM events WINDOW TUMBLING (SIZE 5 MINUTES)");
66        let tokens = lexer.tokenize();
67        let mut parser = Parser::new(tokens);
68        let result = parser.parse_select();
69
70        assert!(result.is_ok());
71        let stmt = result.unwrap();
72        assert!(stmt.window.is_some());
73        let window = stmt.window.unwrap();
74        assert_eq!(window.window_type, WindowType::Tumbling);
75        assert_eq!(window.size, Duration::from_secs(300));
76    }
77
78    #[test]
79    fn test_parser_group_by() {
80        let mut lexer = Lexer::new("SELECT sensor_id, AVG(temp) FROM sensors GROUP BY sensor_id");
81        let tokens = lexer.tokenize();
82        let mut parser = Parser::new(tokens);
83        let result = parser.parse_select();
84
85        assert!(result.is_ok());
86        let stmt = result.unwrap();
87        assert!(!stmt.group_by.is_empty());
88    }
89
90    #[test]
91    fn test_parser_join() {
92        let mut lexer = Lexer::new("SELECT * FROM a JOIN b ON a.id = b.aid");
93        let tokens = lexer.tokenize();
94        let mut parser = Parser::new(tokens);
95        let result = parser.parse_select();
96
97        assert!(result.is_ok());
98        let stmt = result.unwrap();
99        assert!(matches!(stmt.from, Some(FromClause::Join { .. })));
100    }
101
102    #[tokio::test]
103    async fn test_engine_basic() {
104        let config = StreamSqlConfig::default();
105        let engine = StreamSqlEngine::new(config);
106
107        let result = engine.execute("SELECT * FROM events").await;
108        assert!(result.is_ok());
109
110        let stats = engine.get_stats().await;
111        assert_eq!(stats.queries_executed, 1);
112        assert_eq!(stats.queries_succeeded, 1);
113    }
114
115    #[tokio::test]
116    async fn test_engine_stream_registration() {
117        let config = StreamSqlConfig::default();
118        let engine = StreamSqlEngine::new(config);
119
120        let metadata = StreamMetadata {
121            name: "events".to_string(),
122            columns: vec![
123                ColumnDefinition {
124                    name: "id".to_string(),
125                    data_type: DataType::Integer,
126                    not_null: true,
127                    default: None,
128                },
129                ColumnDefinition {
130                    name: "value".to_string(),
131                    data_type: DataType::Float,
132                    not_null: false,
133                    default: None,
134                },
135            ],
136            properties: HashMap::new(),
137            created_at: Utc::now(),
138        };
139
140        engine.register_stream(metadata).await.unwrap();
141
142        let streams = engine.list_streams().await;
143        assert_eq!(streams.len(), 1);
144        assert!(streams.contains(&"events".to_string()));
145
146        let stream = engine.get_stream("events").await;
147        assert!(stream.is_some());
148        assert_eq!(stream.unwrap().columns.len(), 2);
149
150        engine.unregister_stream("events").await.unwrap();
151        let streams = engine.list_streams().await;
152        assert!(streams.is_empty());
153    }
154
155    #[test]
156    fn test_engine_validate() {
157        let config = StreamSqlConfig::default();
158        let engine = StreamSqlEngine::new(config);
159
160        assert!(engine.validate("SELECT * FROM events").is_ok());
161        assert!(engine.validate("INVALID SQL").is_err());
162    }
163
164    #[test]
165    fn test_engine_explain() {
166        let config = StreamSqlConfig::default();
167        let engine = StreamSqlEngine::new(config);
168
169        let result = engine.explain("SELECT COUNT(*) FROM events WHERE value > 10");
170        assert!(result.is_ok());
171        let explanation = result.unwrap();
172        assert!(!explanation.is_empty());
173    }
174
175    #[tokio::test]
176    async fn test_engine_caching() {
177        let config = StreamSqlConfig {
178            enable_query_cache: true,
179            cache_size: 100,
180            ..Default::default()
181        };
182        let engine = StreamSqlEngine::new(config);
183
184        // First execution - cache miss
185        engine.execute("SELECT * FROM events").await.unwrap();
186
187        // Second execution - cache hit
188        engine.execute("SELECT * FROM events").await.unwrap();
189
190        let stats = engine.get_stats().await;
191        assert_eq!(stats.cache_misses, 1);
192        assert_eq!(stats.cache_hits, 1);
193    }
194
195    #[test]
196    fn test_parser_complex_expression() {
197        let mut lexer = Lexer::new(
198            "SELECT * FROM events WHERE (value > 10 AND status = 'active') OR priority = 1",
199        );
200        let tokens = lexer.tokenize();
201        let mut parser = Parser::new(tokens);
202        let result = parser.parse_select();
203
204        assert!(result.is_ok());
205        let stmt = result.unwrap();
206        assert!(stmt.where_clause.is_some());
207    }
208
209    #[test]
210    fn test_parser_order_by() {
211        let mut lexer = Lexer::new("SELECT * FROM events ORDER BY created_at DESC, id ASC");
212        let tokens = lexer.tokenize();
213        let mut parser = Parser::new(tokens);
214        let result = parser.parse_select();
215
216        assert!(result.is_ok(), "Parse failed: {:?}", result);
217        let stmt = result.unwrap();
218        assert_eq!(stmt.order_by.len(), 2);
219        assert!(!stmt.order_by[0].ascending);
220        assert!(stmt.order_by[1].ascending);
221    }
222
223    #[test]
224    fn test_parser_distinct() {
225        let mut lexer = Lexer::new("SELECT DISTINCT sensor_id FROM readings");
226        let tokens = lexer.tokenize();
227        let mut parser = Parser::new(tokens);
228        let result = parser.parse_select();
229
230        assert!(result.is_ok());
231        let stmt = result.unwrap();
232        assert!(stmt.distinct);
233    }
234
235    #[test]
236    fn test_parser_sliding_window() {
237        let mut lexer =
238            Lexer::new("SELECT * FROM events WINDOW SLIDING (SIZE 10 SECONDS, SLIDE 5 SECONDS)");
239        let tokens = lexer.tokenize();
240        let mut parser = Parser::new(tokens);
241        let result = parser.parse_select();
242
243        assert!(result.is_ok());
244        let stmt = result.unwrap();
245        assert!(stmt.window.is_some());
246        let window = stmt.window.unwrap();
247        assert_eq!(window.window_type, WindowType::Sliding);
248        assert_eq!(window.size, Duration::from_secs(10));
249        assert_eq!(window.slide, Some(Duration::from_secs(5)));
250    }
251}