Skip to main content

cynos_database/
query_engine.rs

1//! Query engine integration for API layer.
2//!
3//! This module bridges the storage layer with the query engine,
4//! providing optimized query execution using indexes.
5
6#[allow(unused_imports)]
7use alloc::boxed::Box;
8use alloc::rc::Rc;
9use alloc::vec::Vec;
10use cynos_core::{Row, Value};
11use cynos_index::KeyRange;
12use cynos_query::context::{ExecutionContext, IndexInfo, QueryIndexType, TableStats};
13use cynos_query::executor::{DataSource, ExecutionError, ExecutionResult, PhysicalPlanRunner};
14use cynos_query::planner::{LogicalPlan, PhysicalPlan, QueryPlanner};
15use cynos_storage::TableCache;
16
17#[cfg(target_arch = "wasm32")]
18use wasm_bindgen::prelude::*;
19
20#[cfg(target_arch = "wasm32")]
21#[wasm_bindgen]
22extern "C" {
23    #[wasm_bindgen(js_namespace = console)]
24    fn log(s: &str);
25}
26
27/// DataSource implementation for TableCache.
28///
29/// This allows the query engine to access table data and indexes.
30pub struct TableCacheDataSource<'a> {
31    cache: &'a TableCache,
32}
33
34impl<'a> TableCacheDataSource<'a> {
35    /// Creates a new data source from a TableCache reference.
36    pub fn new(cache: &'a TableCache) -> Self {
37        Self { cache }
38    }
39}
40
41impl<'a> DataSource for TableCacheDataSource<'a> {
42    fn get_table_rows(&self, table: &str) -> ExecutionResult<Vec<Rc<Row>>> {
43        let store = self
44            .cache
45            .get_table(table)
46            .ok_or_else(|| ExecutionError::TableNotFound(table.into()))?;
47        // Rc::clone is cheap (just increment ref count)
48        Ok(store.scan().collect())
49    }
50
51    fn get_index_range(
52        &self,
53        table: &str,
54        index: &str,
55        range_start: Option<&Value>,
56        range_end: Option<&Value>,
57        include_start: bool,
58        include_end: bool,
59    ) -> ExecutionResult<Vec<Rc<Row>>> {
60        self.get_index_range_with_limit(
61            table,
62            index,
63            range_start,
64            range_end,
65            include_start,
66            include_end,
67            None,
68            0,
69            false,
70        )
71    }
72
73    fn get_index_range_with_limit(
74        &self,
75        table: &str,
76        index: &str,
77        range_start: Option<&Value>,
78        range_end: Option<&Value>,
79        include_start: bool,
80        include_end: bool,
81        limit: Option<usize>,
82        offset: usize,
83        reverse: bool,
84    ) -> ExecutionResult<Vec<Rc<Row>>> {
85        let store = self
86            .cache
87            .get_table(table)
88            .ok_or_else(|| ExecutionError::TableNotFound(table.into()))?;
89
90        // Build KeyRange from bounds
91        let range = match (range_start, range_end) {
92            (Some(start), Some(end)) => Some(KeyRange::bound(
93                start.clone(),
94                end.clone(),
95                !include_start,
96                !include_end,
97            )),
98            (Some(start), None) => Some(KeyRange::lower_bound(start.clone(), !include_start)),
99            (None, Some(end)) => Some(KeyRange::upper_bound(end.clone(), !include_end)),
100            (None, None) => None,
101        };
102
103        // Push limit, offset, and reverse down to storage layer for early termination
104        Ok(store.index_scan_with_options(index, range.as_ref(), limit, offset, reverse))
105    }
106
107    fn get_index_point(&self, table: &str, index: &str, key: &Value) -> ExecutionResult<Vec<Rc<Row>>> {
108        let store = self
109            .cache
110            .get_table(table)
111            .ok_or_else(|| ExecutionError::TableNotFound(table.into()))?;
112
113        // Use index_scan with a point range (key == key)
114        let range = KeyRange::only(key.clone());
115
116        Ok(store.index_scan(index, Some(&range)))
117    }
118
119    fn get_index_point_with_limit(
120        &self,
121        table: &str,
122        index: &str,
123        key: &Value,
124        limit: Option<usize>,
125    ) -> ExecutionResult<Vec<Rc<Row>>> {
126        let store = self
127            .cache
128            .get_table(table)
129            .ok_or_else(|| ExecutionError::TableNotFound(table.into()))?;
130
131        // Use index_scan_with_limit for early termination
132        let range = KeyRange::only(key.clone());
133
134        Ok(store.index_scan_with_limit(index, Some(&range), limit))
135    }
136
137    fn get_column_count(&self, table: &str) -> ExecutionResult<usize> {
138        let store = self
139            .cache
140            .get_table(table)
141            .ok_or_else(|| ExecutionError::TableNotFound(table.into()))?;
142        Ok(store.schema().columns().len())
143    }
144
145    fn get_gin_index_rows(
146        &self,
147        table: &str,
148        index: &str,
149        key: &str,
150        value: &str,
151    ) -> ExecutionResult<Vec<Rc<Row>>> {
152        let store = self
153            .cache
154            .get_table(table)
155            .ok_or_else(|| ExecutionError::TableNotFound(table.into()))?;
156
157        Ok(store.gin_index_get_by_key_value(index, key, value))
158    }
159
160    fn get_gin_index_rows_by_key(
161        &self,
162        table: &str,
163        index: &str,
164        key: &str,
165    ) -> ExecutionResult<Vec<Rc<Row>>> {
166        let store = self
167            .cache
168            .get_table(table)
169            .ok_or_else(|| ExecutionError::TableNotFound(table.into()))?;
170
171        Ok(store.gin_index_get_by_key(index, key))
172    }
173
174    fn get_gin_index_rows_multi(
175        &self,
176        table: &str,
177        index: &str,
178        pairs: &[(&str, &str)],
179    ) -> ExecutionResult<Vec<Rc<Row>>> {
180        let store = self
181            .cache
182            .get_table(table)
183            .ok_or_else(|| ExecutionError::TableNotFound(table.into()))?;
184
185        Ok(store.gin_index_get_by_key_values_all(index, pairs))
186    }
187}
188
189/// Builds ExecutionContext from TableCache for optimizer.
190pub fn build_execution_context(cache: &TableCache, table_name: &str) -> ExecutionContext {
191    let mut ctx = ExecutionContext::new();
192
193    if let Some(store) = cache.get_table(table_name) {
194        let schema = store.schema();
195
196        // Collect index information
197        let mut indexes = Vec::new();
198
199        // Add secondary indexes
200        for idx in schema.indices() {
201            let index_type = match idx.get_index_type() {
202                cynos_core::schema::IndexType::Gin => QueryIndexType::Gin,
203                _ => QueryIndexType::BTree,
204            };
205            indexes.push(
206                IndexInfo::new(
207                    idx.name(),
208                    idx.columns().iter().map(|c| c.name.clone()).collect(),
209                    idx.is_unique(),
210                )
211                .with_type(index_type),
212            );
213        }
214
215        let stats = TableStats {
216            row_count: store.len(),
217            is_sorted: false,
218            indexes,
219        };
220
221        ctx.register_table(table_name, stats);
222    }
223
224    ctx
225}
226
227/// Executes a logical plan using the query engine.
228///
229/// This function:
230/// 1. Builds execution context with index information
231/// 2. Creates QueryPlanner with unified optimization pipeline
232/// 3. Plans and optimizes the query (logical + physical)
233/// 4. Executes using PhysicalPlanRunner
234pub fn execute_plan(
235    cache: &TableCache,
236    table_name: &str,
237    plan: LogicalPlan,
238) -> ExecutionResult<Vec<Rc<Row>>> {
239    execute_plan_internal(cache, table_name, plan, false)
240}
241
242/// Executes a logical plan with optional debug output.
243pub fn execute_plan_debug(
244    cache: &TableCache,
245    table_name: &str,
246    plan: LogicalPlan,
247) -> ExecutionResult<Vec<Rc<Row>>> {
248    execute_plan_internal(cache, table_name, plan, true)
249}
250
251fn execute_plan_internal(
252    cache: &TableCache,
253    table_name: &str,
254    plan: LogicalPlan,
255    _debug: bool,
256) -> ExecutionResult<Vec<Rc<Row>>> {
257    // Build execution context with index info
258    let ctx = build_execution_context(cache, table_name);
259
260    // Use unified QueryPlanner for complete optimization pipeline
261    let planner = QueryPlanner::new(ctx);
262
263    // Plan: logical optimization + physical conversion + physical optimization
264    let physical_plan = planner.plan(plan);
265
266    // Execute
267    let data_source = TableCacheDataSource::new(cache);
268    let runner = PhysicalPlanRunner::new(&data_source);
269    let relation = runner.execute(&physical_plan)?;
270
271    // Extract rows from relation entries
272    Ok(relation.entries.into_iter().map(|e| e.row).collect())
273}
274
275/// Compiles a logical plan to a physical plan.
276/// The physical plan can be cached and reused for repeated executions.
277pub fn compile_plan(
278    cache: &TableCache,
279    table_name: &str,
280    plan: LogicalPlan,
281) -> PhysicalPlan {
282    // Build execution context with index info
283    let ctx = build_execution_context(cache, table_name);
284
285    // Use unified QueryPlanner for complete optimization pipeline
286    let planner = QueryPlanner::new(ctx);
287    planner.plan(plan)
288}
289
290/// Query plan explanation result.
291#[derive(Debug)]
292pub struct ExplainResult {
293    pub logical_plan: String,
294    pub optimized_plan: String,
295    pub physical_plan: String,
296}
297
298/// Explains a logical plan by showing the optimization stages.
299///
300/// Returns the logical plan, optimized plan, and physical plan as strings.
301pub fn explain_plan(
302    cache: &TableCache,
303    table_name: &str,
304    plan: LogicalPlan,
305) -> ExplainResult {
306    let logical_plan = alloc::format!("{:#?}", plan);
307
308    // Build execution context with index info
309    let ctx = build_execution_context(cache, table_name);
310
311    // Use unified QueryPlanner
312    let planner = QueryPlanner::new(ctx);
313
314    // Get optimized logical plan
315    let optimized_plan_node = planner.optimize_logical(plan.clone());
316    let optimized_plan = alloc::format!("{:#?}", optimized_plan_node);
317
318    // Get physical plan (includes all physical optimizations)
319    let physical_plan_node = planner.plan(plan);
320    let physical_plan = alloc::format!("{:#?}", physical_plan_node);
321
322    ExplainResult {
323        logical_plan,
324        optimized_plan,
325        physical_plan,
326    }
327}
328
329/// Executes a pre-compiled physical plan.
330/// This is faster than execute_plan because it skips optimization.
331pub fn execute_physical_plan(
332    cache: &TableCache,
333    physical_plan: &PhysicalPlan,
334) -> ExecutionResult<Vec<Rc<Row>>> {
335    let data_source = TableCacheDataSource::new(cache);
336    let runner = PhysicalPlanRunner::new(&data_source);
337    let relation = runner.execute(physical_plan)?;
338
339    // Extract rows from relation entries
340    Ok(relation.entries.into_iter().map(|e| e.row).collect())
341}
342
343#[cfg(test)]
344mod tests {
345    use super::*;
346    use cynos_query::ast::Expr as AstExpr;
347    use cynos_query::optimizer::{IndexSelection, OptimizerPass};
348
349    #[test]
350    fn test_table_cache_data_source() {
351        // Basic test to ensure the module compiles
352        let cache = TableCache::new();
353        let _data_source = TableCacheDataSource::new(&cache);
354    }
355
356    #[test]
357    fn test_index_selection_with_empty_table_name() {
358        // Simulate what happens when col('status').eq('todo') is used
359        // The column has empty table name
360        let mut ctx = ExecutionContext::new();
361        ctx.register_table(
362            "tasks",
363            TableStats {
364                row_count: 100000,
365                is_sorted: false,
366                indexes: alloc::vec![
367                    IndexInfo::new("idx_status", alloc::vec!["status".into()], false),
368                    IndexInfo::new("idx_priority", alloc::vec!["priority".into()], false),
369                ],
370            },
371        );
372
373        let pass = IndexSelection::with_context(ctx);
374
375        // Create plan with empty table name in column (simulating col('status'))
376        let plan = LogicalPlan::Filter {
377            input: Box::new(LogicalPlan::Scan {
378                table: "tasks".into(),
379            }),
380            predicate: AstExpr::eq(
381                AstExpr::column("", "status", 2), // Empty table name!
382                AstExpr::literal(cynos_core::Value::String("todo".into())),
383            ),
384        };
385
386        let optimized = pass.optimize(plan.clone());
387
388        // Print for debugging
389        println!("Input plan: {:?}", plan);
390        println!("Optimized plan: {:?}", optimized);
391
392        // Should convert to IndexGet since we have idx_status
393        assert!(
394            matches!(optimized, LogicalPlan::IndexGet { .. }),
395            "Expected IndexGet but got {:?}",
396            optimized
397        );
398    }
399
400    #[test]
401    fn test_full_optimizer_pipeline() {
402        // Test the full optimizer pipeline using QueryPlanner
403        let mut ctx = ExecutionContext::new();
404        ctx.register_table(
405            "tasks",
406            TableStats {
407                row_count: 100000,
408                is_sorted: false,
409                indexes: alloc::vec![
410                    IndexInfo::new("idx_status", alloc::vec!["status".into()], false),
411                    IndexInfo::new("idx_priority", alloc::vec!["priority".into()], false),
412                ],
413            },
414        );
415
416        // Create QueryPlanner with context
417        let planner = QueryPlanner::new(ctx);
418
419        // Create plan with empty table name in column
420        let plan = LogicalPlan::Filter {
421            input: Box::new(LogicalPlan::Scan {
422                table: "tasks".into(),
423            }),
424            predicate: AstExpr::eq(
425                AstExpr::column("", "status", 2),
426                AstExpr::literal(cynos_core::Value::String("todo".into())),
427            ),
428        };
429
430        println!("Input plan: {:?}", plan);
431
432        // Run full optimization using QueryPlanner
433        let optimized = planner.optimize_logical(plan.clone());
434        println!("After optimize_logical(): {:?}", optimized);
435
436        // Convert to physical
437        let physical = planner.plan(plan);
438        println!("Physical plan: {:?}", physical);
439
440        // Should be IndexGet
441        assert!(
442            matches!(optimized, LogicalPlan::IndexGet { .. }),
443            "Expected IndexGet but got {:?}",
444            optimized
445        );
446    }
447
448    #[test]
449    fn test_end_to_end_with_real_table() {
450        use cynos_core::schema::TableBuilder;
451        use cynos_core::{DataType, Row, Value};
452
453        // Create a table with indexes
454        let table = TableBuilder::new("tasks")
455            .unwrap()
456            .add_column("id", DataType::Int64).unwrap()
457            .add_column("status", DataType::String).unwrap()
458            .add_column("priority", DataType::String).unwrap()
459            .add_primary_key(&["id"], false).unwrap()
460            .add_index("idx_status", &["status"], false).unwrap()
461            .add_index("idx_priority", &["priority"], false).unwrap()
462            .build()
463            .unwrap();
464
465        // Create cache and add table
466        let mut cache = TableCache::new();
467        cache.create_table(table).unwrap();
468
469        // Insert some test data
470        let store = cache.get_table_mut("tasks").unwrap();
471        for i in 0..1000 {
472            let status = if i % 5 == 0 { "todo" } else { "done" };
473            let priority = if i % 4 == 0 { "high" } else { "low" };
474            store.insert(Row::new(
475                i as u64,
476                alloc::vec![
477                    Value::Int64(i),
478                    Value::String(status.into()),
479                    Value::String(priority.into()),
480                ],
481            )).unwrap();
482        }
483
484        // Create a filter plan: WHERE status = 'todo'
485        let plan = LogicalPlan::Filter {
486            input: Box::new(LogicalPlan::Scan {
487                table: "tasks".into(),
488            }),
489            predicate: AstExpr::eq(
490                AstExpr::column("", "status", 1),
491                AstExpr::literal(Value::String("todo".into())),
492            ),
493        };
494
495        println!("Input plan: {:?}", plan);
496
497        // Build context and use QueryPlanner
498        let ctx = build_execution_context(&cache, "tasks");
499        println!("Context indexes: {:?}", ctx.get_stats("tasks").map(|s| &s.indexes));
500
501        let planner = QueryPlanner::new(ctx);
502        let optimized = planner.optimize_logical(plan.clone());
503        println!("Optimized plan: {:?}", optimized);
504
505        let physical = planner.plan(plan.clone());
506        println!("Physical plan: {:?}", physical);
507
508        // Execute
509        let result = execute_plan(&cache, "tasks", plan).unwrap();
510
511        println!("Result count: {}", result.len());
512
513        // Should return 200 rows (1000 / 5 = 200 with status = 'todo')
514        assert_eq!(result.len(), 200, "Expected 200 rows with status='todo'");
515
516        // Verify all results have status = 'todo'
517        for row in &result {
518            assert_eq!(
519                row.get(1),
520                Some(&Value::String("todo".into())),
521                "All rows should have status='todo'"
522            );
523        }
524    }
525
526    #[test]
527    fn test_execute_plan_with_limit() {
528        use cynos_core::schema::TableBuilder;
529        use cynos_core::{DataType, Row, Value};
530
531        // Create a table with indexes
532        let table = TableBuilder::new("tasks")
533            .unwrap()
534            .add_column("id", DataType::Int64).unwrap()
535            .add_column("status", DataType::String).unwrap()
536            .add_column("priority", DataType::String).unwrap()
537            .add_primary_key(&["id"], false).unwrap()
538            .add_index("idx_status", &["status"], false).unwrap()
539            .build()
540            .unwrap();
541
542        // Create cache and add table
543        let mut cache = TableCache::new();
544        cache.create_table(table).unwrap();
545
546        // Insert 1000 rows, 200 with status='todo'
547        let store = cache.get_table_mut("tasks").unwrap();
548        for i in 0..1000 {
549            let status = if i % 5 == 0 { "todo" } else { "done" };
550            store.insert(Row::new(
551                i as u64,
552                alloc::vec![
553                    Value::Int64(i),
554                    Value::String(status.into()),
555                    Value::String("low".into()),
556                ],
557            )).unwrap();
558        }
559
560        // Create a filter + limit plan: WHERE status = 'todo' LIMIT 10
561        let plan = LogicalPlan::Limit {
562            input: Box::new(LogicalPlan::Filter {
563                input: Box::new(LogicalPlan::Scan {
564                    table: "tasks".into(),
565                }),
566                predicate: AstExpr::eq(
567                    AstExpr::column("", "status", 1),
568                    AstExpr::literal(Value::String("todo".into())),
569                ),
570            }),
571            limit: 10,
572            offset: 0,
573        };
574
575        println!("Input plan with LIMIT: {:?}", plan);
576
577        // Execute
578        let result = execute_plan(&cache, "tasks", plan).unwrap();
579
580        println!("Result count: {} (expected 10)", result.len());
581
582        // Should return exactly 10 rows due to LIMIT
583        assert_eq!(result.len(), 10, "Expected 10 rows due to LIMIT");
584
585        // Verify all results have status = 'todo'
586        for row in &result {
587            assert_eq!(
588                row.get(1),
589                Some(&Value::String("todo".into())),
590                "All rows should have status='todo'"
591            );
592        }
593    }
594
595    #[test]
596    fn test_order_by_desc_with_index() {
597        use cynos_core::schema::TableBuilder;
598        use cynos_core::{DataType, Row, Value};
599        use cynos_query::ast::SortOrder;
600        use cynos_query::planner::PhysicalPlan;
601
602        // Create a table with an index on 'score'
603        let table = TableBuilder::new("scores")
604            .unwrap()
605            .add_column("id", DataType::Int64).unwrap()
606            .add_column("score", DataType::Int64).unwrap()
607            .add_primary_key(&["id"], false).unwrap()
608            .add_index("idx_score", &["score"], false).unwrap()
609            .build()
610            .unwrap();
611
612        // Create cache and add table
613        let mut cache = TableCache::new();
614        cache.create_table(table).unwrap();
615
616        // Insert rows with scores: 10, 20, 30, 40, 50
617        let store = cache.get_table_mut("scores").unwrap();
618        for i in 1..=5 {
619            store.insert(Row::new(
620                i as u64,
621                alloc::vec![
622                    Value::Int64(i),
623                    Value::Int64(i * 10),
624                ],
625            )).unwrap();
626        }
627
628        // Create a plan: SELECT * FROM scores ORDER BY score DESC LIMIT 3
629        let plan = LogicalPlan::Limit {
630            input: Box::new(LogicalPlan::Sort {
631                input: Box::new(LogicalPlan::Scan {
632                    table: "scores".into(),
633                }),
634                order_by: alloc::vec![(AstExpr::column("scores", "score", 1), SortOrder::Desc)],
635            }),
636            limit: 3,
637            offset: 0,
638        };
639
640        println!("Input plan: {:?}", plan);
641
642        // Build context and use QueryPlanner
643        let ctx = build_execution_context(&cache, "scores");
644        println!("Context indexes: {:?}", ctx.get_stats("scores").map(|s| &s.indexes));
645
646        let planner = QueryPlanner::new(ctx.clone());
647        let physical = planner.plan(plan.clone());
648        println!("Physical plan (single line): {:?}", physical);
649        println!("Physical plan (pretty): {:#?}", physical);
650        println!("Context indexes: {:?}", ctx.get_stats("scores").map(|s| &s.indexes));
651
652        // Verify the physical plan is an IndexScan with reverse=true
653        match &physical {
654            PhysicalPlan::IndexScan { reverse, limit, .. } => {
655                assert!(reverse, "IndexScan should have reverse=true for DESC ordering");
656                assert_eq!(*limit, Some(3), "IndexScan should have limit=3");
657            }
658            _ => panic!("Expected IndexScan, got {:?}", physical),
659        }
660
661        // Execute and verify results are in DESC order
662        let result = execute_plan(&cache, "scores", plan).unwrap();
663        println!("Result: {:?}", result.iter().map(|r| r.get(1)).collect::<Vec<_>>());
664
665        assert_eq!(result.len(), 3, "Expected 3 rows");
666        assert_eq!(result[0].get(1), Some(&Value::Int64(50)), "First row should have score=50");
667        assert_eq!(result[1].get(1), Some(&Value::Int64(40)), "Second row should have score=40");
668        assert_eq!(result[2].get(1), Some(&Value::Int64(30)), "Third row should have score=30");
669    }
670
671    #[test]
672    fn test_order_by_asc_with_index() {
673        use cynos_core::schema::TableBuilder;
674        use cynos_core::{DataType, Row, Value};
675        use cynos_query::ast::SortOrder;
676        use cynos_query::planner::PhysicalPlan;
677
678        // Create a table with an index on 'score'
679        let table = TableBuilder::new("scores_asc")
680            .unwrap()
681            .add_column("id", DataType::Int64).unwrap()
682            .add_column("score", DataType::Int64).unwrap()
683            .add_primary_key(&["id"], false).unwrap()
684            .add_index("idx_score", &["score"], false).unwrap()
685            .build()
686            .unwrap();
687
688        // Create cache and add table
689        let mut cache = TableCache::new();
690        cache.create_table(table).unwrap();
691
692        // Insert rows with scores: 10, 20, 30, 40, 50
693        let store = cache.get_table_mut("scores_asc").unwrap();
694        for i in 1..=5 {
695            store.insert(Row::new(
696                i as u64,
697                alloc::vec![
698                    Value::Int64(i),
699                    Value::Int64(i * 10),
700                ],
701            )).unwrap();
702        }
703
704        // Create a plan: SELECT * FROM scores_asc ORDER BY score ASC LIMIT 3
705        let plan = LogicalPlan::Limit {
706            input: Box::new(LogicalPlan::Sort {
707                input: Box::new(LogicalPlan::Scan {
708                    table: "scores_asc".into(),
709                }),
710                order_by: alloc::vec![(AstExpr::column("scores_asc", "score", 1), SortOrder::Asc)],
711            }),
712            limit: 3,
713            offset: 0,
714        };
715
716        println!("Input plan: {:?}", plan);
717
718        // Build context and use QueryPlanner
719        let ctx = build_execution_context(&cache, "scores_asc");
720        println!("Context indexes: {:?}", ctx.get_stats("scores_asc").map(|s| &s.indexes));
721
722        let planner = QueryPlanner::new(ctx);
723        let physical = planner.plan(plan.clone());
724        println!("Physical plan (single line): {:?}", physical);
725        println!("Physical plan (pretty): {:#?}", physical);
726
727        // Verify the physical plan is an IndexScan with reverse=false
728        match &physical {
729            PhysicalPlan::IndexScan { reverse, limit, .. } => {
730                assert!(!reverse, "IndexScan should have reverse=false for ASC ordering");
731                assert_eq!(*limit, Some(3), "IndexScan should have limit=3");
732            }
733            _ => panic!("Expected IndexScan, got {:?}", physical),
734        }
735
736        // Execute and verify results are in ASC order
737        let result = execute_plan(&cache, "scores_asc", plan).unwrap();
738        println!("Result: {:?}", result.iter().map(|r| r.get(1)).collect::<Vec<_>>());
739
740        assert_eq!(result.len(), 3, "Expected 3 rows");
741        assert_eq!(result[0].get(1), Some(&Value::Int64(10)), "First row should have score=10");
742        assert_eq!(result[1].get(1), Some(&Value::Int64(20)), "Second row should have score=20");
743        assert_eq!(result[2].get(1), Some(&Value::Int64(30)), "Third row should have score=30");
744    }
745}