1pub mod execution;
4
5use crate::execution::physical_plan::PhysicalPlan;
6use crate::execution::metrics::ExecutionMetrics;
7use kotoba_core::types::Value;
9
10use kotoba_storage::KeyValueStore;
11use std::sync::Arc;
12
/// Module-private result alias: the error side is any boxed error that is
/// `Send + Sync`, so results can cross thread and task boundaries.
type Result<T> = std::result::Result<
    T,
    Box<dyn std::error::Error + Send + Sync>,
>;
15
16#[async_trait::async_trait]
17pub trait QueryExecutor<T: KeyValueStore + 'static>: Send + Sync {
18 async fn execute(&self, plan: PhysicalPlan) -> Result<Vec<Value>>;
19}
20
21pub struct DefaultQueryExecutor<T: KeyValueStore + 'static> {
22 storage: Arc<T>,
23}
24
25impl<T: KeyValueStore + 'static> DefaultQueryExecutor<T> {
26 pub fn new(storage: Arc<T>) -> Self {
27 Self { storage }
28 }
29}
30
31#[async_trait::async_trait]
32impl<T: KeyValueStore + 'static> QueryExecutor<T> for DefaultQueryExecutor<T> {
33 async fn execute(&self, plan: PhysicalPlan) -> Result<Vec<Value>> {
34 let mut metrics = ExecutionMetrics::new();
35 Ok(vec![])
38 }
39}
40
41pub mod planner;
42pub mod prelude {
43 pub use crate::execution::*;
45 pub use crate::planner::*;
46 }
49
#[cfg(test)]
mod tests {
    // Unit tests for the executor defined in the parent module and for the
    // physical-plan types it consumes.

    use super::*;
    use kotoba_core::types::*;
    use kotoba_core::ir::*;
    use kotoba_memory::MemoryKeyValueStore;
    use std::collections::HashMap;
    use std::sync::Arc;

    /// Minimal `KeyValueStore` stand-in: writes and deletes are accepted and
    /// discarded, reads and scans always come back empty.
    struct MockKeyValueStore {
        // Never consulted by the stubbed trait methods below.
        data: HashMap<Vec<u8>, Vec<u8>>,
    }

    impl MockKeyValueStore {
        /// Creates a mock with an empty backing map.
        fn new() -> Self {
            Self {
                data: HashMap::new(),
            }
        }
    }

    #[async_trait::async_trait]
    impl KeyValueStore for MockKeyValueStore {
        /// Accepts the write and discards it.
        async fn put(&self, _key: &[u8], _value: &[u8]) -> anyhow::Result<()> {
            Ok(())
        }

        /// Always reports the key as absent.
        async fn get(&self, _key: &[u8]) -> anyhow::Result<Option<Vec<u8>>> {
            Ok(None)
        }

        /// Accepts the delete and does nothing.
        async fn delete(&self, _key: &[u8]) -> anyhow::Result<()> {
            Ok(())
        }

        /// Always yields an empty scan result.
        async fn scan(&self, _prefix: &[u8]) -> anyhow::Result<Vec<(Vec<u8>, Vec<u8>)>> {
            Ok(vec![])
        }
    }

    // `Column::new` stores the name and data type it is given.
    #[test]
    fn test_column_creation() {
        let column = Column::new("id".to_string(), "String".to_string());
        assert_eq!(column.name, "id");
        assert_eq!(column.data_type, "String");
    }

    // `PhysicalPlan::new` keeps the schema passed alongside the root operator.
    #[test]
    fn test_physical_plan_creation() {
        let column = Column::new("name".to_string(), "String".to_string());
        let scan_op = PhysicalOp::Scan {
            table: "users".to_string(),
            filter: None,
            projection: vec!["name".to_string()],
        };

        let plan = PhysicalPlan::new(scan_op, vec![column]);
        assert_eq!(plan.schema.len(), 1);
        assert_eq!(plan.schema[0].name, "name");
    }

    // A `Scan` operator exposes its table, filter and projection fields.
    #[test]
    fn test_physical_op_scan() {
        let scan_op = PhysicalOp::Scan {
            table: "users".to_string(),
            filter: Some(Expr::Const(Value::Int(1))),
            projection: vec!["id".to_string(), "name".to_string()],
        };

        match scan_op {
            PhysicalOp::Scan { table, filter, projection } => {
                assert_eq!(table, "users");
                assert!(filter.is_some());
                assert_eq!(projection.len(), 2);
            }
            _ => panic!("Expected Scan operation"),
        }
    }

    // A `Filter` operator wraps its input operator and carries a condition.
    #[test]
    fn test_physical_op_filter() {
        let filter_op = PhysicalOp::Filter {
            input: Box::new(PhysicalOp::Scan {
                table: "users".to_string(),
                filter: None,
                projection: vec!["name".to_string()],
            }),
            condition: Expr::Const(Value::Boolean(true)),
        };

        match filter_op {
            PhysicalOp::Filter { input, condition } => {
                match *input {
                    PhysicalOp::Scan { table, .. } => assert_eq!(table, "users"),
                    _ => panic!("Expected nested Scan operation"),
                }
                assert!(matches!(condition, Expr::Const(Value::Boolean(true))));
            }
            _ => panic!("Expected Filter operation"),
        }
    }

    // A `Projection` operator keeps its (expression, alias) pairs in order.
    #[test]
    fn test_physical_op_projection() {
        let projection_op = PhysicalOp::Projection {
            input: Box::new(PhysicalOp::Scan {
                table: "users".to_string(),
                filter: None,
                projection: vec!["*".to_string()],
            }),
            expressions: vec![
                (Expr::Var("name".to_string()), "user_name".to_string()),
                (Expr::Const(Value::String("active".to_string())), "status".to_string()),
            ],
        };

        match projection_op {
            PhysicalOp::Projection { input, expressions } => {
                match *input {
                    PhysicalOp::Scan { table, .. } => assert_eq!(table, "users"),
                    _ => panic!("Expected nested Scan operation"),
                }
                assert_eq!(expressions.len(), 2);
                assert_eq!(expressions[0].1, "user_name");
                assert_eq!(expressions[1].1, "status");
            }
            _ => panic!("Expected Projection operation"),
        }
    }

    // A `Sort` operator keeps its ordering keys and directions in order.
    #[test]
    fn test_physical_op_sort() {
        let sort_op = PhysicalOp::Sort {
            input: Box::new(PhysicalOp::Scan {
                table: "products".to_string(),
                filter: None,
                projection: vec!["name".to_string(), "price".to_string()],
            }),
            order_by: vec![
                ("price".to_string(), SortDirection::Desc),
                ("name".to_string(), SortDirection::Asc),
            ],
        };

        match sort_op {
            PhysicalOp::Sort { input, order_by } => {
                match *input {
                    PhysicalOp::Scan { table, .. } => assert_eq!(table, "products"),
                    _ => panic!("Expected nested Scan operation"),
                }
                assert_eq!(order_by.len(), 2);
                assert_eq!(order_by[0].1, SortDirection::Desc);
                assert_eq!(order_by[1].1, SortDirection::Asc);
            }
            _ => panic!("Expected Sort operation"),
        }
    }

    // A `GroupBy` operator keeps its grouping keys and aggregate expressions.
    #[test]
    fn test_physical_op_group_by() {
        let group_by_op = PhysicalOp::GroupBy {
            input: Box::new(PhysicalOp::Scan {
                table: "orders".to_string(),
                filter: None,
                projection: vec!["*".to_string()],
            }),
            group_by: vec!["customer_id".to_string()],
            aggregates: vec![
                AggregateExpr {
                    function: AggregateFunction::Sum,
                    argument: Expr::Var("amount".to_string()),
                    alias: "total_amount".to_string(),
                },
                AggregateExpr {
                    function: AggregateFunction::Count,
                    argument: Expr::Const(Value::Int(1)),
                    alias: "order_count".to_string(),
                },
            ],
        };

        match group_by_op {
            PhysicalOp::GroupBy { input, group_by, aggregates } => {
                match *input {
                    PhysicalOp::Scan { table, .. } => assert_eq!(table, "orders"),
                    _ => panic!("Expected nested Scan operation"),
                }
                assert_eq!(group_by.len(), 1);
                assert_eq!(group_by[0], "customer_id");
                assert_eq!(aggregates.len(), 2);
                assert_eq!(aggregates[0].function, AggregateFunction::Sum);
                assert_eq!(aggregates[1].function, AggregateFunction::Count);
            }
            _ => panic!("Expected GroupBy operation"),
        }
    }

    // A `Join` operator keeps both inputs, the join type and the condition.
    #[test]
    fn test_physical_op_join() {
        let join_op = PhysicalOp::Join {
            left: Box::new(PhysicalOp::Scan {
                table: "users".to_string(),
                filter: None,
                projection: vec!["id".to_string(), "name".to_string()],
            }),
            right: Box::new(PhysicalOp::Scan {
                table: "orders".to_string(),
                filter: None,
                projection: vec!["user_id".to_string(), "amount".to_string()],
            }),
            join_type: JoinType::Inner,
            condition: Expr::Binary {
                left: Box::new(Expr::Var("id".to_string())),
                op: BinaryOp::Eq,
                right: Box::new(Expr::Var("user_id".to_string())),
            },
        };

        match join_op {
            PhysicalOp::Join { left, right, join_type, condition } => {
                match *left {
                    PhysicalOp::Scan { table, .. } => assert_eq!(table, "users"),
                    _ => panic!("Expected left Scan operation"),
                }
                match *right {
                    PhysicalOp::Scan { table, .. } => assert_eq!(table, "orders"),
                    _ => panic!("Expected right Scan operation"),
                }
                assert_eq!(join_type, JoinType::Inner);
                assert!(matches!(condition, Expr::Binary { .. }));
            }
            _ => panic!("Expected Join operation"),
        }
    }

    // A `Union` operator keeps its left and right inputs distinct.
    #[test]
    fn test_physical_op_union() {
        let union_op = PhysicalOp::Union {
            left: Box::new(PhysicalOp::Scan {
                table: "active_users".to_string(),
                filter: None,
                projection: vec!["id".to_string(), "name".to_string()],
            }),
            right: Box::new(PhysicalOp::Scan {
                table: "inactive_users".to_string(),
                filter: None,
                projection: vec!["id".to_string(), "name".to_string()],
            }),
        };

        match union_op {
            PhysicalOp::Union { left, right } => {
                match *left {
                    PhysicalOp::Scan { table, .. } => assert_eq!(table, "active_users"),
                    _ => panic!("Expected left Scan operation"),
                }
                match *right {
                    PhysicalOp::Scan { table, .. } => assert_eq!(table, "inactive_users"),
                    _ => panic!("Expected right Scan operation"),
                }
            }
            _ => panic!("Expected Union operation"),
        }
    }

    // A `Limit` operator keeps its limit and offset values.
    #[test]
    fn test_physical_op_limit() {
        let limit_op = PhysicalOp::Limit {
            input: Box::new(PhysicalOp::Scan {
                table: "products".to_string(),
                filter: None,
                projection: vec!["*".to_string()],
            }),
            limit: 100,
            offset: 50,
        };

        match limit_op {
            PhysicalOp::Limit { input, limit, offset } => {
                match *input {
                    PhysicalOp::Scan { table, .. } => assert_eq!(table, "products"),
                    _ => panic!("Expected nested Scan operation"),
                }
                assert_eq!(limit, 100);
                assert_eq!(offset, 50);
            }
            _ => panic!("Expected Limit operation"),
        }
    }

    // An `AggregateExpr` exposes its function, alias and argument.
    #[test]
    fn test_aggregate_expr_creation() {
        let aggregate_expr = AggregateExpr {
            function: AggregateFunction::Avg,
            argument: Expr::Var("price".to_string()),
            alias: "avg_price".to_string(),
        };

        assert_eq!(aggregate_expr.function, AggregateFunction::Avg);
        assert_eq!(aggregate_expr.alias, "avg_price");
        match aggregate_expr.argument {
            Expr::Var(var) => assert_eq!(var, "price"),
            _ => panic!("Expected Var expression"),
        }
    }

    // Pins the numeric discriminants of `SortDirection`.
    #[test]
    fn test_sort_direction() {
        assert_eq!(SortDirection::Asc as u8, 0);
        assert_eq!(SortDirection::Desc as u8, 1);
    }

    // Pins the numeric discriminants of `JoinType`.
    #[test]
    fn test_join_type() {
        assert_eq!(JoinType::Inner as u8, 0);
        assert_eq!(JoinType::Left as u8, 1);
        assert_eq!(JoinType::Right as u8, 2);
        assert_eq!(JoinType::Full as u8, 3);
    }

    // Pins the numeric discriminants of `AggregateFunction`.
    #[test]
    fn test_aggregate_function() {
        assert_eq!(AggregateFunction::Count as u8, 0);
        assert_eq!(AggregateFunction::Sum as u8, 1);
        assert_eq!(AggregateFunction::Avg as u8, 2);
        assert_eq!(AggregateFunction::Min as u8, 3);
        assert_eq!(AggregateFunction::Max as u8, 4);
        assert_eq!(AggregateFunction::CountDistinct as u8, 5);
    }

    // The executor must hold the very store handle it was constructed with.
    // A clone of the `Arc` is passed in so the original stays available for
    // the pointer-identity comparison.
    #[test]
    fn test_query_executor_creation() {
        let mock_store = Arc::new(MockKeyValueStore::new());
        let executor = DefaultQueryExecutor::new(Arc::clone(&mock_store));
        assert!(Arc::ptr_eq(&executor.storage, &mock_store));
    }

    // Executing a scan plan against the mock store succeeds with no rows.
    #[tokio::test]
    async fn test_query_executor_execute() {
        let mock_store = Arc::new(MockKeyValueStore::new());
        let executor = DefaultQueryExecutor::new(mock_store);

        let scan_op = PhysicalOp::Scan {
            table: "test".to_string(),
            filter: None,
            projection: vec!["id".to_string()],
        };
        let plan = PhysicalPlan::new(scan_op, vec![Column::new("id".to_string(), "String".to_string())]);

        // `expect` surfaces the underlying error if execution ever fails.
        let rows = executor
            .execute(plan)
            .await
            .expect("executing a scan plan against the mock store should succeed");
        assert!(rows.is_empty());
    }

    // A nested Projection -> Sort -> Filter -> Scan plan keeps its root
    // operator and its output schema.
    #[test]
    fn test_physical_plan_complex() {
        let plan = PhysicalPlan::new(
            PhysicalOp::Projection {
                input: Box::new(PhysicalOp::Sort {
                    input: Box::new(PhysicalOp::Filter {
                        input: Box::new(PhysicalOp::Scan {
                            table: "users".to_string(),
                            filter: None,
                            projection: vec!["id".to_string(), "name".to_string(), "age".to_string()],
                        }),
                        condition: Expr::Binary {
                            left: Box::new(Expr::Var("age".to_string())),
                            op: BinaryOp::Gt,
                            right: Box::new(Expr::Const(Value::Int(18))),
                        },
                    }),
                    order_by: vec![("name".to_string(), SortDirection::Asc)],
                }),
                expressions: vec![
                    (Expr::Var("name".to_string()), "full_name".to_string()),
                    (Expr::Var("age".to_string()), "user_age".to_string()),
                ],
            },
            vec![
                Column::new("full_name".to_string(), "String".to_string()),
                Column::new("user_age".to_string(), "Int".to_string()),
            ],
        );

        match plan.root {
            PhysicalOp::Projection { expressions, .. } => {
                assert_eq!(expressions.len(), 2);
                assert_eq!(expressions[0].1, "full_name");
                assert_eq!(expressions[1].1, "user_age");
            }
            _ => panic!("Expected Projection as root operation"),
        }

        assert_eq!(plan.schema.len(), 2);
        assert_eq!(plan.schema[0].name, "full_name");
        assert_eq!(plan.schema[1].name, "user_age");
    }

    // A plan can be serialized to JSON without error.
    #[test]
    fn test_physical_plan_serialization() {
        let column = Column::new("test_col".to_string(), "String".to_string());
        let scan_op = PhysicalOp::Scan {
            table: "test_table".to_string(),
            filter: Some(Expr::Const(Value::Boolean(true))),
            projection: vec!["test_col".to_string()],
        };
        let plan = PhysicalPlan::new(scan_op, vec![column]);

        let json_result = serde_json::to_string(&plan);
        assert!(json_result.is_ok());
    }

    // Cloning a plan preserves its root operator and schema.
    #[test]
    fn test_physical_plan_clone() {
        let original_plan = PhysicalPlan::new(
            PhysicalOp::Scan {
                table: "users".to_string(),
                filter: None,
                projection: vec!["id".to_string()],
            },
            vec![Column::new("id".to_string(), "String".to_string())],
        );

        let cloned_plan = original_plan.clone();

        match cloned_plan.root {
            PhysicalOp::Scan { table, .. } => assert_eq!(table, "users"),
            _ => panic!("Expected Scan operation in cloned plan"),
        }

        assert_eq!(cloned_plan.schema.len(), 1);
        assert_eq!(cloned_plan.schema[0].name, "id");
    }

    // The `Debug` rendering of a plan mentions its operators and table name.
    #[test]
    fn test_physical_plan_debug() {
        let plan = PhysicalPlan::new(
            PhysicalOp::Limit {
                input: Box::new(PhysicalOp::Scan {
                    table: "test".to_string(),
                    filter: None,
                    projection: vec!["col".to_string()],
                }),
                limit: 10,
                offset: 0,
            },
            vec![Column::new("col".to_string(), "String".to_string())],
        );

        let debug_str = format!("{:?}", plan);
        assert!(debug_str.contains("Limit"));
        assert!(debug_str.contains("Scan"));
        assert!(debug_str.contains("test"));
    }

    // The `Debug` rendering of a column mentions its name and data type.
    #[test]
    fn test_column_debug() {
        let column = Column::new("test_column".to_string(), "Integer".to_string());
        let debug_str = format!("{:?}", column);
        assert!(debug_str.contains("test_column"));
        assert!(debug_str.contains("Integer"));
    }

    // Cloning a column preserves its name and data type.
    #[test]
    fn test_column_clone() {
        let original = Column::new("original".to_string(), "String".to_string());
        let cloned = original.clone();
        assert_eq!(original.name, cloned.name);
        assert_eq!(original.data_type, cloned.data_type);
    }

    // Columns compare equal when built from the same arguments, and unequal
    // when the name differs.
    #[test]
    fn test_column_equality() {
        let col1 = Column::new("test".to_string(), "String".to_string());
        let col2 = Column::new("test".to_string(), "String".to_string());
        let col3 = Column::new("different".to_string(), "String".to_string());

        assert_eq!(col1, col2);
        assert_ne!(col1, col3);
    }

    // Executing a scan plan against the in-memory store succeeds with no rows.
    #[tokio::test]
    async fn test_query_executor_with_memory_store() {
        let memory_store = Arc::new(MemoryKeyValueStore::new());
        let executor = DefaultQueryExecutor::new(memory_store);

        let scan_op = PhysicalOp::Scan {
            table: "test_table".to_string(),
            filter: None,
            projection: vec!["id".to_string()],
        };
        let plan = PhysicalPlan::new(scan_op, vec![Column::new("id".to_string(), "String".to_string())]);

        // `expect` surfaces the underlying error if execution ever fails.
        let rows = executor
            .execute(plan)
            .await
            .expect("executing a scan plan against the memory store should succeed");
        assert!(rows.is_empty());
    }

    // Edge cases: a plan with an empty schema, and a Limit wrapping a Union.
    #[test]
    fn test_physical_plan_edge_cases() {
        let plan_empty_schema = PhysicalPlan::new(
            PhysicalOp::Scan {
                table: "test".to_string(),
                filter: None,
                projection: vec![],
            },
            vec![],
        );
        assert_eq!(plan_empty_schema.schema.len(), 0);

        let complex_op = PhysicalOp::Limit {
            input: Box::new(PhysicalOp::Union {
                left: Box::new(PhysicalOp::Scan {
                    table: "table1".to_string(),
                    filter: None,
                    projection: vec!["col".to_string()],
                }),
                right: Box::new(PhysicalOp::Scan {
                    table: "table2".to_string(),
                    filter: None,
                    projection: vec!["col".to_string()],
                }),
            }),
            limit: 1000,
            offset: 0,
        };

        match complex_op {
            PhysicalOp::Limit { input, limit, offset } => {
                assert_eq!(limit, 1000);
                assert_eq!(offset, 0);
                match *input {
                    // Reaching this arm is the success condition.
                    PhysicalOp::Union { .. } => {}
                    _ => panic!("Expected Union operation"),
                }
            }
            _ => panic!("Expected Limit operation"),
        }
    }

    // Constructs each `PhysicalOp` variant used in this file and checks that
    // it pattern-matches as itself.
    #[test]
    fn test_physical_op_variants_coverage() {
        let scan = PhysicalOp::Scan {
            table: "t".to_string(),
            filter: None,
            projection: vec![],
        };
        assert!(matches!(scan, PhysicalOp::Scan { .. }));

        let filter = PhysicalOp::Filter {
            input: Box::new(scan),
            condition: Expr::Const(Value::Boolean(true)),
        };
        assert!(matches!(filter, PhysicalOp::Filter { .. }));

        let projection = PhysicalOp::Projection {
            input: Box::new(PhysicalOp::Scan {
                table: "t".to_string(),
                filter: None,
                projection: vec![],
            }),
            expressions: vec![],
        };
        assert!(matches!(projection, PhysicalOp::Projection { .. }));

        let sort = PhysicalOp::Sort {
            input: Box::new(PhysicalOp::Scan {
                table: "t".to_string(),
                filter: None,
                projection: vec![],
            }),
            order_by: vec![],
        };
        assert!(matches!(sort, PhysicalOp::Sort { .. }));

        let group_by = PhysicalOp::GroupBy {
            input: Box::new(PhysicalOp::Scan {
                table: "t".to_string(),
                filter: None,
                projection: vec![],
            }),
            group_by: vec![],
            aggregates: vec![],
        };
        assert!(matches!(group_by, PhysicalOp::GroupBy { .. }));

        let join = PhysicalOp::Join {
            left: Box::new(PhysicalOp::Scan {
                table: "t1".to_string(),
                filter: None,
                projection: vec![],
            }),
            right: Box::new(PhysicalOp::Scan {
                table: "t2".to_string(),
                filter: None,
                projection: vec![],
            }),
            join_type: JoinType::Inner,
            condition: Expr::Const(Value::Boolean(true)),
        };
        assert!(matches!(join, PhysicalOp::Join { .. }));

        let union = PhysicalOp::Union {
            left: Box::new(PhysicalOp::Scan {
                table: "t1".to_string(),
                filter: None,
                projection: vec![],
            }),
            right: Box::new(PhysicalOp::Scan {
                table: "t2".to_string(),
                filter: None,
                projection: vec![],
            }),
        };
        assert!(matches!(union, PhysicalOp::Union { .. }));

        let limit = PhysicalOp::Limit {
            input: Box::new(PhysicalOp::Scan {
                table: "t".to_string(),
                filter: None,
                projection: vec![],
            }),
            limit: 10,
            offset: 0,
        };
        assert!(matches!(limit, PhysicalOp::Limit { .. }));
    }

    // The `Debug` rendering of each aggregate function contains its name.
    #[test]
    fn test_aggregate_function_coverage() {
        let count = AggregateFunction::Count;
        let sum = AggregateFunction::Sum;
        let avg = AggregateFunction::Avg;
        let min = AggregateFunction::Min;
        let max = AggregateFunction::Max;
        let count_distinct = AggregateFunction::CountDistinct;

        assert!(format!("{:?}", count).contains("Count"));
        assert!(format!("{:?}", sum).contains("Sum"));
        assert!(format!("{:?}", avg).contains("Avg"));
        assert!(format!("{:?}", min).contains("Min"));
        assert!(format!("{:?}", max).contains("Max"));
        assert!(format!("{:?}", count_distinct).contains("CountDistinct"));
    }

    // The `Debug` rendering of each sort direction contains its name.
    #[test]
    fn test_sort_direction_coverage() {
        let asc = SortDirection::Asc;
        let desc = SortDirection::Desc;

        assert!(format!("{:?}", asc).contains("Asc"));
        assert!(format!("{:?}", desc).contains("Desc"));
    }

    // The `Debug` rendering of each join type contains its name.
    #[test]
    fn test_join_type_coverage() {
        let inner = JoinType::Inner;
        let left = JoinType::Left;
        let right = JoinType::Right;
        let full = JoinType::Full;

        assert!(format!("{:?}", inner).contains("Inner"));
        assert!(format!("{:?}", left).contains("Left"));
        assert!(format!("{:?}", right).contains("Right"));
        assert!(format!("{:?}", full).contains("Full"));
    }
}