1#[allow(unused_imports)]
7use alloc::boxed::Box;
8use alloc::rc::Rc;
9use alloc::vec::Vec;
10use cynos_core::{Row, Value};
11use cynos_index::KeyRange;
12use cynos_query::context::{ExecutionContext, IndexInfo, QueryIndexType, TableStats};
13use cynos_query::executor::{DataSource, ExecutionError, ExecutionResult, PhysicalPlanRunner};
14use cynos_query::planner::{LogicalPlan, PhysicalPlan, QueryPlanner};
15use cynos_storage::TableCache;
16
17#[cfg(target_arch = "wasm32")]
18use wasm_bindgen::prelude::*;
19
20#[cfg(target_arch = "wasm32")]
21#[wasm_bindgen]
22extern "C" {
23 #[wasm_bindgen(js_namespace = console)]
24 fn log(s: &str);
25}
26
27pub struct TableCacheDataSource<'a> {
31 cache: &'a TableCache,
32}
33
34impl<'a> TableCacheDataSource<'a> {
35 pub fn new(cache: &'a TableCache) -> Self {
37 Self { cache }
38 }
39}
40
41impl<'a> DataSource for TableCacheDataSource<'a> {
42 fn get_table_rows(&self, table: &str) -> ExecutionResult<Vec<Rc<Row>>> {
43 let store = self
44 .cache
45 .get_table(table)
46 .ok_or_else(|| ExecutionError::TableNotFound(table.into()))?;
47 Ok(store.scan().collect())
49 }
50
51 fn get_index_range(
52 &self,
53 table: &str,
54 index: &str,
55 range_start: Option<&Value>,
56 range_end: Option<&Value>,
57 include_start: bool,
58 include_end: bool,
59 ) -> ExecutionResult<Vec<Rc<Row>>> {
60 self.get_index_range_with_limit(
61 table,
62 index,
63 range_start,
64 range_end,
65 include_start,
66 include_end,
67 None,
68 0,
69 false,
70 )
71 }
72
73 fn get_index_range_with_limit(
74 &self,
75 table: &str,
76 index: &str,
77 range_start: Option<&Value>,
78 range_end: Option<&Value>,
79 include_start: bool,
80 include_end: bool,
81 limit: Option<usize>,
82 offset: usize,
83 reverse: bool,
84 ) -> ExecutionResult<Vec<Rc<Row>>> {
85 let store = self
86 .cache
87 .get_table(table)
88 .ok_or_else(|| ExecutionError::TableNotFound(table.into()))?;
89
90 let range = match (range_start, range_end) {
92 (Some(start), Some(end)) => Some(KeyRange::bound(
93 start.clone(),
94 end.clone(),
95 !include_start,
96 !include_end,
97 )),
98 (Some(start), None) => Some(KeyRange::lower_bound(start.clone(), !include_start)),
99 (None, Some(end)) => Some(KeyRange::upper_bound(end.clone(), !include_end)),
100 (None, None) => None,
101 };
102
103 Ok(store.index_scan_with_options(index, range.as_ref(), limit, offset, reverse))
105 }
106
107 fn get_index_point(&self, table: &str, index: &str, key: &Value) -> ExecutionResult<Vec<Rc<Row>>> {
108 let store = self
109 .cache
110 .get_table(table)
111 .ok_or_else(|| ExecutionError::TableNotFound(table.into()))?;
112
113 let range = KeyRange::only(key.clone());
115
116 Ok(store.index_scan(index, Some(&range)))
117 }
118
119 fn get_index_point_with_limit(
120 &self,
121 table: &str,
122 index: &str,
123 key: &Value,
124 limit: Option<usize>,
125 ) -> ExecutionResult<Vec<Rc<Row>>> {
126 let store = self
127 .cache
128 .get_table(table)
129 .ok_or_else(|| ExecutionError::TableNotFound(table.into()))?;
130
131 let range = KeyRange::only(key.clone());
133
134 Ok(store.index_scan_with_limit(index, Some(&range), limit))
135 }
136
137 fn get_column_count(&self, table: &str) -> ExecutionResult<usize> {
138 let store = self
139 .cache
140 .get_table(table)
141 .ok_or_else(|| ExecutionError::TableNotFound(table.into()))?;
142 Ok(store.schema().columns().len())
143 }
144
145 fn get_gin_index_rows(
146 &self,
147 table: &str,
148 index: &str,
149 key: &str,
150 value: &str,
151 ) -> ExecutionResult<Vec<Rc<Row>>> {
152 let store = self
153 .cache
154 .get_table(table)
155 .ok_or_else(|| ExecutionError::TableNotFound(table.into()))?;
156
157 Ok(store.gin_index_get_by_key_value(index, key, value))
158 }
159
160 fn get_gin_index_rows_by_key(
161 &self,
162 table: &str,
163 index: &str,
164 key: &str,
165 ) -> ExecutionResult<Vec<Rc<Row>>> {
166 let store = self
167 .cache
168 .get_table(table)
169 .ok_or_else(|| ExecutionError::TableNotFound(table.into()))?;
170
171 Ok(store.gin_index_get_by_key(index, key))
172 }
173
174 fn get_gin_index_rows_multi(
175 &self,
176 table: &str,
177 index: &str,
178 pairs: &[(&str, &str)],
179 ) -> ExecutionResult<Vec<Rc<Row>>> {
180 let store = self
181 .cache
182 .get_table(table)
183 .ok_or_else(|| ExecutionError::TableNotFound(table.into()))?;
184
185 Ok(store.gin_index_get_by_key_values_all(index, pairs))
186 }
187}
188
189pub fn build_execution_context(cache: &TableCache, table_name: &str) -> ExecutionContext {
191 let mut ctx = ExecutionContext::new();
192
193 if let Some(store) = cache.get_table(table_name) {
194 let schema = store.schema();
195
196 let mut indexes = Vec::new();
198
199 for idx in schema.indices() {
201 let index_type = match idx.get_index_type() {
202 cynos_core::schema::IndexType::Gin => QueryIndexType::Gin,
203 _ => QueryIndexType::BTree,
204 };
205 indexes.push(
206 IndexInfo::new(
207 idx.name(),
208 idx.columns().iter().map(|c| c.name.clone()).collect(),
209 idx.is_unique(),
210 )
211 .with_type(index_type),
212 );
213 }
214
215 let stats = TableStats {
216 row_count: store.len(),
217 is_sorted: false,
218 indexes,
219 };
220
221 ctx.register_table(table_name, stats);
222 }
223
224 ctx
225}
226
227pub fn execute_plan(
235 cache: &TableCache,
236 table_name: &str,
237 plan: LogicalPlan,
238) -> ExecutionResult<Vec<Rc<Row>>> {
239 execute_plan_internal(cache, table_name, plan, false)
240}
241
242pub fn execute_plan_debug(
244 cache: &TableCache,
245 table_name: &str,
246 plan: LogicalPlan,
247) -> ExecutionResult<Vec<Rc<Row>>> {
248 execute_plan_internal(cache, table_name, plan, true)
249}
250
251fn execute_plan_internal(
252 cache: &TableCache,
253 table_name: &str,
254 plan: LogicalPlan,
255 _debug: bool,
256) -> ExecutionResult<Vec<Rc<Row>>> {
257 let ctx = build_execution_context(cache, table_name);
259
260 let planner = QueryPlanner::new(ctx);
262
263 let physical_plan = planner.plan(plan);
265
266 let data_source = TableCacheDataSource::new(cache);
268 let runner = PhysicalPlanRunner::new(&data_source);
269 let relation = runner.execute(&physical_plan)?;
270
271 Ok(relation.entries.into_iter().map(|e| e.row).collect())
273}
274
275pub fn compile_plan(
278 cache: &TableCache,
279 table_name: &str,
280 plan: LogicalPlan,
281) -> PhysicalPlan {
282 let ctx = build_execution_context(cache, table_name);
284
285 let planner = QueryPlanner::new(ctx);
287 planner.plan(plan)
288}
289
290#[derive(Debug)]
292pub struct ExplainResult {
293 pub logical_plan: String,
294 pub optimized_plan: String,
295 pub physical_plan: String,
296}
297
298pub fn explain_plan(
302 cache: &TableCache,
303 table_name: &str,
304 plan: LogicalPlan,
305) -> ExplainResult {
306 let logical_plan = alloc::format!("{:#?}", plan);
307
308 let ctx = build_execution_context(cache, table_name);
310
311 let planner = QueryPlanner::new(ctx);
313
314 let optimized_plan_node = planner.optimize_logical(plan.clone());
316 let optimized_plan = alloc::format!("{:#?}", optimized_plan_node);
317
318 let physical_plan_node = planner.plan(plan);
320 let physical_plan = alloc::format!("{:#?}", physical_plan_node);
321
322 ExplainResult {
323 logical_plan,
324 optimized_plan,
325 physical_plan,
326 }
327}
328
329pub fn execute_physical_plan(
332 cache: &TableCache,
333 physical_plan: &PhysicalPlan,
334) -> ExecutionResult<Vec<Rc<Row>>> {
335 let data_source = TableCacheDataSource::new(cache);
336 let runner = PhysicalPlanRunner::new(&data_source);
337 let relation = runner.execute(physical_plan)?;
338
339 Ok(relation.entries.into_iter().map(|e| e.row).collect())
341}
342
343#[cfg(test)]
344mod tests {
345 use super::*;
346 use cynos_query::ast::Expr as AstExpr;
347 use cynos_query::optimizer::{IndexSelection, OptimizerPass};
348
349 #[test]
350 fn test_table_cache_data_source() {
351 let cache = TableCache::new();
353 let _data_source = TableCacheDataSource::new(&cache);
354 }
355
356 #[test]
357 fn test_index_selection_with_empty_table_name() {
358 let mut ctx = ExecutionContext::new();
361 ctx.register_table(
362 "tasks",
363 TableStats {
364 row_count: 100000,
365 is_sorted: false,
366 indexes: alloc::vec![
367 IndexInfo::new("idx_status", alloc::vec!["status".into()], false),
368 IndexInfo::new("idx_priority", alloc::vec!["priority".into()], false),
369 ],
370 },
371 );
372
373 let pass = IndexSelection::with_context(ctx);
374
375 let plan = LogicalPlan::Filter {
377 input: Box::new(LogicalPlan::Scan {
378 table: "tasks".into(),
379 }),
380 predicate: AstExpr::eq(
381 AstExpr::column("", "status", 2), AstExpr::literal(cynos_core::Value::String("todo".into())),
383 ),
384 };
385
386 let optimized = pass.optimize(plan.clone());
387
388 println!("Input plan: {:?}", plan);
390 println!("Optimized plan: {:?}", optimized);
391
392 assert!(
394 matches!(optimized, LogicalPlan::IndexGet { .. }),
395 "Expected IndexGet but got {:?}",
396 optimized
397 );
398 }
399
400 #[test]
401 fn test_full_optimizer_pipeline() {
402 let mut ctx = ExecutionContext::new();
404 ctx.register_table(
405 "tasks",
406 TableStats {
407 row_count: 100000,
408 is_sorted: false,
409 indexes: alloc::vec![
410 IndexInfo::new("idx_status", alloc::vec!["status".into()], false),
411 IndexInfo::new("idx_priority", alloc::vec!["priority".into()], false),
412 ],
413 },
414 );
415
416 let planner = QueryPlanner::new(ctx);
418
419 let plan = LogicalPlan::Filter {
421 input: Box::new(LogicalPlan::Scan {
422 table: "tasks".into(),
423 }),
424 predicate: AstExpr::eq(
425 AstExpr::column("", "status", 2),
426 AstExpr::literal(cynos_core::Value::String("todo".into())),
427 ),
428 };
429
430 println!("Input plan: {:?}", plan);
431
432 let optimized = planner.optimize_logical(plan.clone());
434 println!("After optimize_logical(): {:?}", optimized);
435
436 let physical = planner.plan(plan);
438 println!("Physical plan: {:?}", physical);
439
440 assert!(
442 matches!(optimized, LogicalPlan::IndexGet { .. }),
443 "Expected IndexGet but got {:?}",
444 optimized
445 );
446 }
447
448 #[test]
449 fn test_end_to_end_with_real_table() {
450 use cynos_core::schema::TableBuilder;
451 use cynos_core::{DataType, Row, Value};
452
453 let table = TableBuilder::new("tasks")
455 .unwrap()
456 .add_column("id", DataType::Int64).unwrap()
457 .add_column("status", DataType::String).unwrap()
458 .add_column("priority", DataType::String).unwrap()
459 .add_primary_key(&["id"], false).unwrap()
460 .add_index("idx_status", &["status"], false).unwrap()
461 .add_index("idx_priority", &["priority"], false).unwrap()
462 .build()
463 .unwrap();
464
465 let mut cache = TableCache::new();
467 cache.create_table(table).unwrap();
468
469 let store = cache.get_table_mut("tasks").unwrap();
471 for i in 0..1000 {
472 let status = if i % 5 == 0 { "todo" } else { "done" };
473 let priority = if i % 4 == 0 { "high" } else { "low" };
474 store.insert(Row::new(
475 i as u64,
476 alloc::vec![
477 Value::Int64(i),
478 Value::String(status.into()),
479 Value::String(priority.into()),
480 ],
481 )).unwrap();
482 }
483
484 let plan = LogicalPlan::Filter {
486 input: Box::new(LogicalPlan::Scan {
487 table: "tasks".into(),
488 }),
489 predicate: AstExpr::eq(
490 AstExpr::column("", "status", 1),
491 AstExpr::literal(Value::String("todo".into())),
492 ),
493 };
494
495 println!("Input plan: {:?}", plan);
496
497 let ctx = build_execution_context(&cache, "tasks");
499 println!("Context indexes: {:?}", ctx.get_stats("tasks").map(|s| &s.indexes));
500
501 let planner = QueryPlanner::new(ctx);
502 let optimized = planner.optimize_logical(plan.clone());
503 println!("Optimized plan: {:?}", optimized);
504
505 let physical = planner.plan(plan.clone());
506 println!("Physical plan: {:?}", physical);
507
508 let result = execute_plan(&cache, "tasks", plan).unwrap();
510
511 println!("Result count: {}", result.len());
512
513 assert_eq!(result.len(), 200, "Expected 200 rows with status='todo'");
515
516 for row in &result {
518 assert_eq!(
519 row.get(1),
520 Some(&Value::String("todo".into())),
521 "All rows should have status='todo'"
522 );
523 }
524 }
525
526 #[test]
527 fn test_execute_plan_with_limit() {
528 use cynos_core::schema::TableBuilder;
529 use cynos_core::{DataType, Row, Value};
530
531 let table = TableBuilder::new("tasks")
533 .unwrap()
534 .add_column("id", DataType::Int64).unwrap()
535 .add_column("status", DataType::String).unwrap()
536 .add_column("priority", DataType::String).unwrap()
537 .add_primary_key(&["id"], false).unwrap()
538 .add_index("idx_status", &["status"], false).unwrap()
539 .build()
540 .unwrap();
541
542 let mut cache = TableCache::new();
544 cache.create_table(table).unwrap();
545
546 let store = cache.get_table_mut("tasks").unwrap();
548 for i in 0..1000 {
549 let status = if i % 5 == 0 { "todo" } else { "done" };
550 store.insert(Row::new(
551 i as u64,
552 alloc::vec![
553 Value::Int64(i),
554 Value::String(status.into()),
555 Value::String("low".into()),
556 ],
557 )).unwrap();
558 }
559
560 let plan = LogicalPlan::Limit {
562 input: Box::new(LogicalPlan::Filter {
563 input: Box::new(LogicalPlan::Scan {
564 table: "tasks".into(),
565 }),
566 predicate: AstExpr::eq(
567 AstExpr::column("", "status", 1),
568 AstExpr::literal(Value::String("todo".into())),
569 ),
570 }),
571 limit: 10,
572 offset: 0,
573 };
574
575 println!("Input plan with LIMIT: {:?}", plan);
576
577 let result = execute_plan(&cache, "tasks", plan).unwrap();
579
580 println!("Result count: {} (expected 10)", result.len());
581
582 assert_eq!(result.len(), 10, "Expected 10 rows due to LIMIT");
584
585 for row in &result {
587 assert_eq!(
588 row.get(1),
589 Some(&Value::String("todo".into())),
590 "All rows should have status='todo'"
591 );
592 }
593 }
594
595 #[test]
596 fn test_order_by_desc_with_index() {
597 use cynos_core::schema::TableBuilder;
598 use cynos_core::{DataType, Row, Value};
599 use cynos_query::ast::SortOrder;
600 use cynos_query::planner::PhysicalPlan;
601
602 let table = TableBuilder::new("scores")
604 .unwrap()
605 .add_column("id", DataType::Int64).unwrap()
606 .add_column("score", DataType::Int64).unwrap()
607 .add_primary_key(&["id"], false).unwrap()
608 .add_index("idx_score", &["score"], false).unwrap()
609 .build()
610 .unwrap();
611
612 let mut cache = TableCache::new();
614 cache.create_table(table).unwrap();
615
616 let store = cache.get_table_mut("scores").unwrap();
618 for i in 1..=5 {
619 store.insert(Row::new(
620 i as u64,
621 alloc::vec![
622 Value::Int64(i),
623 Value::Int64(i * 10),
624 ],
625 )).unwrap();
626 }
627
628 let plan = LogicalPlan::Limit {
630 input: Box::new(LogicalPlan::Sort {
631 input: Box::new(LogicalPlan::Scan {
632 table: "scores".into(),
633 }),
634 order_by: alloc::vec![(AstExpr::column("scores", "score", 1), SortOrder::Desc)],
635 }),
636 limit: 3,
637 offset: 0,
638 };
639
640 println!("Input plan: {:?}", plan);
641
642 let ctx = build_execution_context(&cache, "scores");
644 println!("Context indexes: {:?}", ctx.get_stats("scores").map(|s| &s.indexes));
645
646 let planner = QueryPlanner::new(ctx.clone());
647 let physical = planner.plan(plan.clone());
648 println!("Physical plan (single line): {:?}", physical);
649 println!("Physical plan (pretty): {:#?}", physical);
650 println!("Context indexes: {:?}", ctx.get_stats("scores").map(|s| &s.indexes));
651
652 match &physical {
654 PhysicalPlan::IndexScan { reverse, limit, .. } => {
655 assert!(reverse, "IndexScan should have reverse=true for DESC ordering");
656 assert_eq!(*limit, Some(3), "IndexScan should have limit=3");
657 }
658 _ => panic!("Expected IndexScan, got {:?}", physical),
659 }
660
661 let result = execute_plan(&cache, "scores", plan).unwrap();
663 println!("Result: {:?}", result.iter().map(|r| r.get(1)).collect::<Vec<_>>());
664
665 assert_eq!(result.len(), 3, "Expected 3 rows");
666 assert_eq!(result[0].get(1), Some(&Value::Int64(50)), "First row should have score=50");
667 assert_eq!(result[1].get(1), Some(&Value::Int64(40)), "Second row should have score=40");
668 assert_eq!(result[2].get(1), Some(&Value::Int64(30)), "Third row should have score=30");
669 }
670
671 #[test]
672 fn test_order_by_asc_with_index() {
673 use cynos_core::schema::TableBuilder;
674 use cynos_core::{DataType, Row, Value};
675 use cynos_query::ast::SortOrder;
676 use cynos_query::planner::PhysicalPlan;
677
678 let table = TableBuilder::new("scores_asc")
680 .unwrap()
681 .add_column("id", DataType::Int64).unwrap()
682 .add_column("score", DataType::Int64).unwrap()
683 .add_primary_key(&["id"], false).unwrap()
684 .add_index("idx_score", &["score"], false).unwrap()
685 .build()
686 .unwrap();
687
688 let mut cache = TableCache::new();
690 cache.create_table(table).unwrap();
691
692 let store = cache.get_table_mut("scores_asc").unwrap();
694 for i in 1..=5 {
695 store.insert(Row::new(
696 i as u64,
697 alloc::vec![
698 Value::Int64(i),
699 Value::Int64(i * 10),
700 ],
701 )).unwrap();
702 }
703
704 let plan = LogicalPlan::Limit {
706 input: Box::new(LogicalPlan::Sort {
707 input: Box::new(LogicalPlan::Scan {
708 table: "scores_asc".into(),
709 }),
710 order_by: alloc::vec![(AstExpr::column("scores_asc", "score", 1), SortOrder::Asc)],
711 }),
712 limit: 3,
713 offset: 0,
714 };
715
716 println!("Input plan: {:?}", plan);
717
718 let ctx = build_execution_context(&cache, "scores_asc");
720 println!("Context indexes: {:?}", ctx.get_stats("scores_asc").map(|s| &s.indexes));
721
722 let planner = QueryPlanner::new(ctx);
723 let physical = planner.plan(plan.clone());
724 println!("Physical plan (single line): {:?}", physical);
725 println!("Physical plan (pretty): {:#?}", physical);
726
727 match &physical {
729 PhysicalPlan::IndexScan { reverse, limit, .. } => {
730 assert!(!reverse, "IndexScan should have reverse=false for ASC ordering");
731 assert_eq!(*limit, Some(3), "IndexScan should have limit=3");
732 }
733 _ => panic!("Expected IndexScan, got {:?}", physical),
734 }
735
736 let result = execute_plan(&cache, "scores_asc", plan).unwrap();
738 println!("Result: {:?}", result.iter().map(|r| r.get(1)).collect::<Vec<_>>());
739
740 assert_eq!(result.len(), 3, "Expected 3 rows");
741 assert_eq!(result[0].get(1), Some(&Value::Int64(10)), "First row should have score=10");
742 assert_eq!(result[1].get(1), Some(&Value::Int64(20)), "Second row should have score=20");
743 assert_eq!(result[2].get(1), Some(&Value::Int64(30)), "Third row should have score=30");
744 }
745}