oxirs_stream/
stream_sql_tests.rs1#[cfg(test)]
4mod tests {
5 use chrono::Utc;
6 use std::collections::HashMap;
7 use std::time::Duration;
8
9 use crate::stream_sql_ast::{
10 ColumnDefinition, DataType, FromClause, Lexer, StreamMetadata, StreamSqlConfig, Token,
11 WindowType,
12 };
13 use crate::stream_sql_executor::{Parser, StreamSqlEngine};
14
15 #[test]
16 fn test_lexer_basic() {
17 let mut lexer = Lexer::new("SELECT * FROM events");
18 let tokens = lexer.tokenize();
19
20 assert_eq!(tokens.len(), 5);
21 assert_eq!(tokens[0], Token::Select);
22 assert_eq!(tokens[1], Token::Star);
23 assert_eq!(tokens[2], Token::From);
24 assert_eq!(tokens[3], Token::Identifier("events".to_string()));
25 assert_eq!(tokens[4], Token::Eof);
26 }
27
28 #[test]
29 fn test_lexer_with_literals() {
30 let mut lexer = Lexer::new("SELECT name, 42, 'hello' FROM events");
31 let tokens = lexer.tokenize();
32
33 assert!(matches!(tokens[1], Token::Identifier(_)));
34 assert!(matches!(tokens[3], Token::NumberLiteral(_)));
35 assert!(matches!(tokens[5], Token::StringLiteral(_)));
36 }
37
38 #[test]
39 fn test_parser_simple_select() {
40 let mut lexer = Lexer::new("SELECT id, name FROM users WHERE id = 1");
41 let tokens = lexer.tokenize();
42 let mut parser = Parser::new(tokens);
43 let result = parser.parse_select();
44
45 assert!(result.is_ok());
46 let stmt = result.unwrap();
47 assert_eq!(stmt.columns.len(), 2);
48 assert!(stmt.where_clause.is_some());
49 }
50
51 #[test]
52 fn test_parser_aggregate() {
53 let mut lexer = Lexer::new("SELECT COUNT(*), AVG(value) FROM events");
54 let tokens = lexer.tokenize();
55 let mut parser = Parser::new(tokens);
56 let result = parser.parse_select();
57
58 assert!(result.is_ok());
59 let stmt = result.unwrap();
60 assert_eq!(stmt.columns.len(), 2);
61 }
62
63 #[test]
64 fn test_parser_window() {
65 let mut lexer = Lexer::new("SELECT * FROM events WINDOW TUMBLING (SIZE 5 MINUTES)");
66 let tokens = lexer.tokenize();
67 let mut parser = Parser::new(tokens);
68 let result = parser.parse_select();
69
70 assert!(result.is_ok());
71 let stmt = result.unwrap();
72 assert!(stmt.window.is_some());
73 let window = stmt.window.unwrap();
74 assert_eq!(window.window_type, WindowType::Tumbling);
75 assert_eq!(window.size, Duration::from_secs(300));
76 }
77
78 #[test]
79 fn test_parser_group_by() {
80 let mut lexer = Lexer::new("SELECT sensor_id, AVG(temp) FROM sensors GROUP BY sensor_id");
81 let tokens = lexer.tokenize();
82 let mut parser = Parser::new(tokens);
83 let result = parser.parse_select();
84
85 assert!(result.is_ok());
86 let stmt = result.unwrap();
87 assert!(!stmt.group_by.is_empty());
88 }
89
90 #[test]
91 fn test_parser_join() {
92 let mut lexer = Lexer::new("SELECT * FROM a JOIN b ON a.id = b.aid");
93 let tokens = lexer.tokenize();
94 let mut parser = Parser::new(tokens);
95 let result = parser.parse_select();
96
97 assert!(result.is_ok());
98 let stmt = result.unwrap();
99 assert!(matches!(stmt.from, Some(FromClause::Join { .. })));
100 }
101
102 #[tokio::test]
103 async fn test_engine_basic() {
104 let config = StreamSqlConfig::default();
105 let engine = StreamSqlEngine::new(config);
106
107 let result = engine.execute("SELECT * FROM events").await;
108 assert!(result.is_ok());
109
110 let stats = engine.get_stats().await;
111 assert_eq!(stats.queries_executed, 1);
112 assert_eq!(stats.queries_succeeded, 1);
113 }
114
115 #[tokio::test]
116 async fn test_engine_stream_registration() {
117 let config = StreamSqlConfig::default();
118 let engine = StreamSqlEngine::new(config);
119
120 let metadata = StreamMetadata {
121 name: "events".to_string(),
122 columns: vec![
123 ColumnDefinition {
124 name: "id".to_string(),
125 data_type: DataType::Integer,
126 not_null: true,
127 default: None,
128 },
129 ColumnDefinition {
130 name: "value".to_string(),
131 data_type: DataType::Float,
132 not_null: false,
133 default: None,
134 },
135 ],
136 properties: HashMap::new(),
137 created_at: Utc::now(),
138 };
139
140 engine.register_stream(metadata).await.unwrap();
141
142 let streams = engine.list_streams().await;
143 assert_eq!(streams.len(), 1);
144 assert!(streams.contains(&"events".to_string()));
145
146 let stream = engine.get_stream("events").await;
147 assert!(stream.is_some());
148 assert_eq!(stream.unwrap().columns.len(), 2);
149
150 engine.unregister_stream("events").await.unwrap();
151 let streams = engine.list_streams().await;
152 assert!(streams.is_empty());
153 }
154
155 #[test]
156 fn test_engine_validate() {
157 let config = StreamSqlConfig::default();
158 let engine = StreamSqlEngine::new(config);
159
160 assert!(engine.validate("SELECT * FROM events").is_ok());
161 assert!(engine.validate("INVALID SQL").is_err());
162 }
163
164 #[test]
165 fn test_engine_explain() {
166 let config = StreamSqlConfig::default();
167 let engine = StreamSqlEngine::new(config);
168
169 let result = engine.explain("SELECT COUNT(*) FROM events WHERE value > 10");
170 assert!(result.is_ok());
171 let explanation = result.unwrap();
172 assert!(!explanation.is_empty());
173 }
174
175 #[tokio::test]
176 async fn test_engine_caching() {
177 let config = StreamSqlConfig {
178 enable_query_cache: true,
179 cache_size: 100,
180 ..Default::default()
181 };
182 let engine = StreamSqlEngine::new(config);
183
184 engine.execute("SELECT * FROM events").await.unwrap();
186
187 engine.execute("SELECT * FROM events").await.unwrap();
189
190 let stats = engine.get_stats().await;
191 assert_eq!(stats.cache_misses, 1);
192 assert_eq!(stats.cache_hits, 1);
193 }
194
195 #[test]
196 fn test_parser_complex_expression() {
197 let mut lexer = Lexer::new(
198 "SELECT * FROM events WHERE (value > 10 AND status = 'active') OR priority = 1",
199 );
200 let tokens = lexer.tokenize();
201 let mut parser = Parser::new(tokens);
202 let result = parser.parse_select();
203
204 assert!(result.is_ok());
205 let stmt = result.unwrap();
206 assert!(stmt.where_clause.is_some());
207 }
208
209 #[test]
210 fn test_parser_order_by() {
211 let mut lexer = Lexer::new("SELECT * FROM events ORDER BY created_at DESC, id ASC");
212 let tokens = lexer.tokenize();
213 let mut parser = Parser::new(tokens);
214 let result = parser.parse_select();
215
216 assert!(result.is_ok(), "Parse failed: {:?}", result);
217 let stmt = result.unwrap();
218 assert_eq!(stmt.order_by.len(), 2);
219 assert!(!stmt.order_by[0].ascending);
220 assert!(stmt.order_by[1].ascending);
221 }
222
223 #[test]
224 fn test_parser_distinct() {
225 let mut lexer = Lexer::new("SELECT DISTINCT sensor_id FROM readings");
226 let tokens = lexer.tokenize();
227 let mut parser = Parser::new(tokens);
228 let result = parser.parse_select();
229
230 assert!(result.is_ok());
231 let stmt = result.unwrap();
232 assert!(stmt.distinct);
233 }
234
235 #[test]
236 fn test_parser_sliding_window() {
237 let mut lexer =
238 Lexer::new("SELECT * FROM events WINDOW SLIDING (SIZE 10 SECONDS, SLIDE 5 SECONDS)");
239 let tokens = lexer.tokenize();
240 let mut parser = Parser::new(tokens);
241 let result = parser.parse_select();
242
243 assert!(result.is_ok());
244 let stmt = result.unwrap();
245 assert!(stmt.window.is_some());
246 let window = stmt.window.unwrap();
247 assert_eq!(window.window_type, WindowType::Sliding);
248 assert_eq!(window.size, Duration::from_secs(10));
249 assert_eq!(window.slide, Some(Duration::from_secs(5)));
250 }
251}