Skip to main content

reddb_server/storage/query/
executor.rs

1//! Query Executor for RedDB
2//!
3//! Executes query plans against the storage engine, applying filters,
4//! sorting, and limits to produce results.
5
6use super::filter::Filter;
7use super::filter_compiled::CompiledFilter;
8use super::sort::{OrderBy, QueryLimits};
9use crate::storage::schema::{Row, Value};
10use std::collections::HashMap;
11
12/// Build a `column name → slot index` map from the executor's
13/// columns Vec. Used by [`MemoryExecutor`] to compile filters once
14/// per query against the table's schema instead of doing a linear
15/// `columns.iter().position()` per predicate per row.
16#[inline]
17fn build_schema_index(columns: &[String]) -> HashMap<String, usize> {
18    columns
19        .iter()
20        .enumerate()
21        .map(|(i, c)| (c.clone(), i))
22        .collect()
23}
24
25/// Query result set
26#[derive(Debug, Clone)]
27pub struct QueryResult {
28    /// Column names
29    pub columns: Vec<String>,
30    /// Result rows
31    pub rows: Vec<Row>,
32    /// Total rows before limits (if known)
33    pub total_count: Option<usize>,
34    /// Execution statistics
35    pub stats: QueryStats,
36}
37
38impl QueryResult {
39    /// Create empty result
40    pub fn empty() -> Self {
41        Self {
42            columns: Vec::new(),
43            rows: Vec::new(),
44            total_count: Some(0),
45            stats: QueryStats::default(),
46        }
47    }
48
49    /// Create from rows
50    pub fn from_rows(columns: Vec<String>, rows: Vec<Row>) -> Self {
51        let count = rows.len();
52        Self {
53            columns,
54            rows,
55            total_count: Some(count),
56            stats: QueryStats::default(),
57        }
58    }
59
60    /// Get row count
61    pub fn len(&self) -> usize {
62        self.rows.len()
63    }
64
65    /// Check if empty
66    pub fn is_empty(&self) -> bool {
67        self.rows.is_empty()
68    }
69
70    /// Get a column index by name
71    pub fn column_index(&self, name: &str) -> Option<usize> {
72        self.columns.iter().position(|c| c == name)
73    }
74
75    /// Get value from a row by column name
76    pub fn get_value(&self, row_idx: usize, column: &str) -> Option<&Value> {
77        let col_idx = self.column_index(column)?;
78        self.rows.get(row_idx)?.get(col_idx)
79    }
80
81    /// Iterate over rows with column access
82    pub fn iter_rows(&self) -> impl Iterator<Item = RowView<'_>> {
83        self.rows.iter().map(|row| RowView {
84            columns: &self.columns,
85            row,
86        })
87    }
88}
89
90/// View of a single row with column name access
91pub struct RowView<'a> {
92    columns: &'a [String],
93    row: &'a Row,
94}
95
96impl<'a> RowView<'a> {
97    /// Get value by column name
98    pub fn get(&self, column: &str) -> Option<&Value> {
99        let idx = self.columns.iter().position(|c| c == column)?;
100        self.row.get(idx)
101    }
102
103    /// Get value by index
104    pub fn get_index(&self, idx: usize) -> Option<&Value> {
105        self.row.get(idx)
106    }
107
108    /// Get all values
109    pub fn values(&self) -> &[Value] {
110        self.row.values()
111    }
112}
113
114/// Query execution statistics
115#[derive(Debug, Clone, Default)]
116pub struct QueryStats {
117    /// Rows scanned
118    pub rows_scanned: usize,
119    /// Rows matched filter
120    pub rows_matched: usize,
121    /// Rows returned
122    pub rows_returned: usize,
123    /// Execution time in microseconds
124    pub execution_time_us: u64,
125    /// Index used (if any)
126    pub index_used: Option<String>,
127}
128
129/// Query plan representing a query to execute
130#[derive(Debug, Clone)]
131pub struct QueryPlan {
132    /// Table to query
133    pub table: String,
134    /// Columns to select (empty = all)
135    pub columns: Vec<String>,
136    /// Filter conditions
137    pub filter: Option<Filter>,
138    /// Ordering
139    pub order_by: OrderBy,
140    /// Limits
141    pub limits: QueryLimits,
142    /// Whether to count total rows
143    pub count_total: bool,
144}
145
146impl QueryPlan {
147    /// Create a new query plan for a table
148    pub fn new(table: impl Into<String>) -> Self {
149        Self {
150            table: table.into(),
151            columns: Vec::new(),
152            filter: None,
153            order_by: OrderBy::new(),
154            limits: QueryLimits::none(),
155            count_total: false,
156        }
157    }
158
159    /// Select specific columns
160    pub fn select(mut self, columns: Vec<String>) -> Self {
161        self.columns = columns;
162        self
163    }
164
165    /// Add a filter
166    pub fn filter(mut self, filter: Filter) -> Self {
167        self.filter = Some(match self.filter {
168            Some(existing) => existing.and_filter(filter),
169            None => filter,
170        });
171        self
172    }
173
174    /// Add ordering
175    pub fn order_by(mut self, order: OrderBy) -> Self {
176        self.order_by = order;
177        self
178    }
179
180    /// Set limit
181    pub fn limit(mut self, n: usize) -> Self {
182        self.limits = self.limits.limit(n);
183        self
184    }
185
186    /// Set offset
187    pub fn offset(mut self, n: usize) -> Self {
188        self.limits = self.limits.offset(n);
189        self
190    }
191
192    /// Enable total count
193    pub fn with_total_count(mut self) -> Self {
194        self.count_total = true;
195        self
196    }
197}
198
199/// Query builder for fluent API
200#[derive(Debug, Clone)]
201pub struct Query {
202    plan: QueryPlan,
203}
204
205impl Query {
206    /// Create SELECT query for a table
207    pub fn select(table: impl Into<String>) -> Self {
208        Self {
209            plan: QueryPlan::new(table),
210        }
211    }
212
213    /// Select specific columns
214    pub fn columns(mut self, columns: Vec<impl Into<String>>) -> Self {
215        self.plan.columns = columns.into_iter().map(|c| c.into()).collect();
216        self
217    }
218
219    /// Add WHERE filter
220    pub fn filter(mut self, filter: Filter) -> Self {
221        self.plan = self.plan.filter(filter);
222        self
223    }
224
225    /// Add ORDER BY ascending
226    pub fn order_by_asc(mut self, column: impl Into<String>) -> Self {
227        self.plan.order_by = self.plan.order_by.asc(column);
228        self
229    }
230
231    /// Add ORDER BY descending
232    pub fn order_by_desc(mut self, column: impl Into<String>) -> Self {
233        self.plan.order_by = self.plan.order_by.desc(column);
234        self
235    }
236
237    /// Set LIMIT
238    pub fn limit(mut self, n: usize) -> Self {
239        self.plan = self.plan.limit(n);
240        self
241    }
242
243    /// Set OFFSET
244    pub fn offset(mut self, n: usize) -> Self {
245        self.plan = self.plan.offset(n);
246        self
247    }
248
249    /// Enable total count
250    pub fn with_total_count(mut self) -> Self {
251        self.plan = self.plan.with_total_count();
252        self
253    }
254
255    /// Get the query plan
256    pub fn plan(self) -> QueryPlan {
257        self.plan
258    }
259}
260
261/// Query executor trait
262pub trait QueryExecutor {
263    /// Execute a query plan
264    fn execute(&self, plan: &QueryPlan) -> Result<QueryResult, QueryError>;
265
266    /// Count rows matching filter
267    fn count(&self, table: &str, filter: Option<&Filter>) -> Result<usize, QueryError>;
268
269    /// Check if table exists
270    fn table_exists(&self, table: &str) -> bool;
271
272    /// Get table column names
273    fn table_columns(&self, table: &str) -> Option<Vec<String>>;
274}
275
276/// Query execution error
277#[derive(Debug, Clone)]
278pub enum QueryError {
279    /// Table not found
280    TableNotFound(String),
281    /// Column not found
282    ColumnNotFound(String),
283    /// Invalid filter
284    InvalidFilter(String),
285    /// Type mismatch
286    TypeMismatch(String),
287    /// Storage error
288    StorageError(String),
289    /// Query timeout
290    Timeout,
291}
292
293impl std::fmt::Display for QueryError {
294    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
295        match self {
296            QueryError::TableNotFound(t) => write!(f, "Table not found: {}", t),
297            QueryError::ColumnNotFound(c) => write!(f, "Column not found: {}", c),
298            QueryError::InvalidFilter(msg) => write!(f, "Invalid filter: {}", msg),
299            QueryError::TypeMismatch(msg) => write!(f, "Type mismatch: {}", msg),
300            QueryError::StorageError(msg) => write!(f, "Storage error: {}", msg),
301            QueryError::Timeout => write!(f, "Query timeout"),
302        }
303    }
304}
305
306impl std::error::Error for QueryError {}
307
308/// In-memory query executor for testing and simple use cases
309pub struct MemoryExecutor {
310    /// Tables: name -> (columns, rows)
311    tables: HashMap<String, (Vec<String>, Vec<Row>)>,
312}
313
314impl MemoryExecutor {
315    /// Create a new memory executor
316    pub fn new() -> Self {
317        Self {
318            tables: HashMap::new(),
319        }
320    }
321
322    /// Add a table
323    pub fn add_table(&mut self, name: impl Into<String>, columns: Vec<String>, rows: Vec<Row>) {
324        self.tables.insert(name.into(), (columns, rows));
325    }
326
327    /// Insert a row
328    pub fn insert(&mut self, table: &str, row: Row) -> bool {
329        if let Some((_, rows)) = self.tables.get_mut(table) {
330            rows.push(row);
331            true
332        } else {
333            false
334        }
335    }
336
337    /// Get value from row by column index
338    fn get_row_value(&self, row: &Row, columns: &[String], column: &str) -> Value {
339        if let Some(idx) = columns.iter().position(|c| c == column) {
340            row.get(idx).cloned().unwrap_or(Value::Null)
341        } else {
342            Value::Null
343        }
344    }
345}
346
347impl Default for MemoryExecutor {
348    fn default() -> Self {
349        Self::new()
350    }
351}
352
353impl QueryExecutor for MemoryExecutor {
354    fn execute(&self, plan: &QueryPlan) -> Result<QueryResult, QueryError> {
355        let start = std::time::Instant::now();
356
357        // Get table
358        let (columns, rows) = self
359            .tables
360            .get(&plan.table)
361            .ok_or_else(|| QueryError::TableNotFound(plan.table.clone()))?;
362
363        let mut stats = QueryStats {
364            rows_scanned: rows.len(),
365            ..Default::default()
366        };
367
368        // Compile the filter ONCE before the row loop. This resolves
369        // every `column name → slot index` lookup at compile time
370        // (no per-row `columns.iter().position(|c| c == col)` linear
371        // scan) and produces a flat opcode list that the hot loop
372        // walks without recursion or closure indirection. Falls
373        // back to the legacy walker only when the filter references
374        // a column that isn't in the table — which is a query bug
375        // the executor would have surfaced anyway.
376        let compiled_filter = plan.filter.as_ref().and_then(|f| {
377            let schema = build_schema_index(columns);
378            CompiledFilter::compile(f, &schema).ok()
379        });
380
381        // Filter rows
382        let mut matched_rows: Vec<Row> = rows
383            .iter()
384            .filter(|row| match (&compiled_filter, &plan.filter) {
385                (Some(compiled), _) => compiled.evaluate(row.values()),
386                (None, Some(filter)) => filter.evaluate(&|col| {
387                    if let Some(idx) = columns.iter().position(|c| c == col) {
388                        row.get(idx).cloned()
389                    } else {
390                        None
391                    }
392                }),
393                (None, None) => true,
394            })
395            .cloned()
396            .collect();
397
398        stats.rows_matched = matched_rows.len();
399        let total_count = if plan.count_total {
400            Some(matched_rows.len())
401        } else {
402            None
403        };
404
405        // Sort
406        if !plan.order_by.is_empty() {
407            plan.order_by.sort_rows(&mut matched_rows, |row, col| {
408                self.get_row_value(row, columns, col)
409            });
410        }
411
412        // Apply limits
413        let result_rows = plan.limits.apply(matched_rows);
414        stats.rows_returned = result_rows.len();
415
416        // Select columns
417        let result_columns = if plan.columns.is_empty() {
418            columns.clone()
419        } else {
420            // Validate columns exist
421            for col in &plan.columns {
422                if !columns.contains(col) {
423                    return Err(QueryError::ColumnNotFound(col.clone()));
424                }
425            }
426
427            // Project rows to selected columns
428            let col_indices: Vec<usize> = plan
429                .columns
430                .iter()
431                .filter_map(|c| columns.iter().position(|col| col == c))
432                .collect();
433
434            let projected_rows: Vec<Row> = result_rows
435                .into_iter()
436                .map(|row| {
437                    let projected_values: Vec<Value> = col_indices
438                        .iter()
439                        .map(|&idx| row.get(idx).cloned().unwrap_or(Value::Null))
440                        .collect();
441                    Row::new(projected_values)
442                })
443                .collect();
444
445            stats.rows_returned = projected_rows.len();
446            stats.execution_time_us = start.elapsed().as_micros() as u64;
447
448            return Ok(QueryResult {
449                columns: plan.columns.clone(),
450                rows: projected_rows,
451                total_count,
452                stats,
453            });
454        };
455
456        stats.execution_time_us = start.elapsed().as_micros() as u64;
457
458        Ok(QueryResult {
459            columns: result_columns,
460            rows: result_rows,
461            total_count,
462            stats,
463        })
464    }
465
466    fn count(&self, table: &str, filter: Option<&Filter>) -> Result<usize, QueryError> {
467        let (columns, rows) = self
468            .tables
469            .get(table)
470            .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
471
472        // Compile once before the count loop — same pattern as
473        // execute(). See filter_compiled.rs for the algorithm.
474        let compiled = filter.and_then(|f| {
475            let schema = build_schema_index(columns);
476            CompiledFilter::compile(f, &schema).ok()
477        });
478
479        let count = rows
480            .iter()
481            .filter(|row| match (&compiled, filter) {
482                (Some(c), _) => c.evaluate(row.values()),
483                (None, Some(f)) => f.evaluate(&|col| {
484                    if let Some(idx) = columns.iter().position(|c| c == col) {
485                        row.get(idx).cloned()
486                    } else {
487                        None
488                    }
489                }),
490                (None, None) => true,
491            })
492            .count();
493
494        Ok(count)
495    }
496
497    fn table_exists(&self, table: &str) -> bool {
498        self.tables.contains_key(table)
499    }
500
501    fn table_columns(&self, table: &str) -> Option<Vec<String>> {
502        self.tables.get(table).map(|(cols, _)| cols.clone())
503    }
504}
505
506#[cfg(test)]
507mod tests {
508    use super::*;
509
510    fn create_test_executor() -> MemoryExecutor {
511        let mut executor = MemoryExecutor::new();
512
513        let columns = vec![
514            "id".to_string(),
515            "name".to_string(),
516            "age".to_string(),
517            "department".to_string(),
518        ];
519
520        let rows = vec![
521            Row::new(vec![
522                Value::Integer(1),
523                Value::text("Alice".to_string()),
524                Value::Integer(30),
525                Value::text("Engineering".to_string()),
526            ]),
527            Row::new(vec![
528                Value::Integer(2),
529                Value::text("Bob".to_string()),
530                Value::Integer(25),
531                Value::text("Sales".to_string()),
532            ]),
533            Row::new(vec![
534                Value::Integer(3),
535                Value::text("Charlie".to_string()),
536                Value::Integer(35),
537                Value::text("Engineering".to_string()),
538            ]),
539            Row::new(vec![
540                Value::Integer(4),
541                Value::text("Diana".to_string()),
542                Value::Integer(28),
543                Value::text("HR".to_string()),
544            ]),
545            Row::new(vec![
546                Value::Integer(5),
547                Value::text("Eve".to_string()),
548                Value::Integer(32),
549                Value::text("Engineering".to_string()),
550            ]),
551        ];
552
553        executor.add_table("employees", columns, rows);
554        executor
555    }
556
557    #[test]
558    fn test_simple_select() {
559        let executor = create_test_executor();
560
561        let plan = QueryPlan::new("employees");
562        let result = executor.execute(&plan).unwrap();
563
564        assert_eq!(result.len(), 5);
565        assert_eq!(result.columns.len(), 4);
566    }
567
568    #[test]
569    fn test_select_columns() {
570        let executor = create_test_executor();
571
572        let plan = QueryPlan::new("employees").select(vec!["name".to_string(), "age".to_string()]);
573
574        let result = executor.execute(&plan).unwrap();
575
576        assert_eq!(result.columns, vec!["name", "age"]);
577        assert_eq!(result.rows[0].len(), 2);
578    }
579
580    #[test]
581    fn test_filter_eq() {
582        let executor = create_test_executor();
583
584        let plan = QueryPlan::new("employees").filter(Filter::eq(
585            "department",
586            Value::text("Engineering".to_string()),
587        ));
588
589        let result = executor.execute(&plan).unwrap();
590
591        assert_eq!(result.len(), 3); // Alice, Charlie, Eve
592    }
593
594    #[test]
595    fn test_filter_gt() {
596        let executor = create_test_executor();
597
598        let plan = QueryPlan::new("employees").filter(Filter::gt("age", Value::Integer(30)));
599
600        let result = executor.execute(&plan).unwrap();
601
602        assert_eq!(result.len(), 2); // Charlie (35), Eve (32)
603    }
604
605    #[test]
606    fn test_filter_and() {
607        let executor = create_test_executor();
608
609        let plan = QueryPlan::new("employees")
610            .filter(Filter::eq(
611                "department",
612                Value::text("Engineering".to_string()),
613            ))
614            .filter(Filter::ge("age", Value::Integer(30)));
615
616        let result = executor.execute(&plan).unwrap();
617
618        assert_eq!(result.len(), 3); // Alice (30), Charlie (35), Eve (32)
619    }
620
621    #[test]
622    fn test_order_by() {
623        let executor = create_test_executor();
624
625        let plan = QueryPlan::new("employees").order_by(OrderBy::new().asc("age"));
626
627        let result = executor.execute(&plan).unwrap();
628
629        // Should be ordered: Bob (25), Diana (28), Alice (30), Eve (32), Charlie (35)
630        assert_eq!(
631            result.get_value(0, "name"),
632            Some(&Value::text("Bob".to_string()))
633        );
634        assert_eq!(
635            result.get_value(4, "name"),
636            Some(&Value::text("Charlie".to_string()))
637        );
638    }
639
640    #[test]
641    fn test_order_by_desc() {
642        let executor = create_test_executor();
643
644        let plan = QueryPlan::new("employees").order_by(OrderBy::new().desc("age"));
645
646        let result = executor.execute(&plan).unwrap();
647
648        assert_eq!(
649            result.get_value(0, "name"),
650            Some(&Value::text("Charlie".to_string()))
651        );
652        assert_eq!(
653            result.get_value(4, "name"),
654            Some(&Value::text("Bob".to_string()))
655        );
656    }
657
658    #[test]
659    fn test_limit() {
660        let executor = create_test_executor();
661
662        let plan = QueryPlan::new("employees").limit(2);
663
664        let result = executor.execute(&plan).unwrap();
665
666        assert_eq!(result.len(), 2);
667    }
668
669    #[test]
670    fn test_offset() {
671        let executor = create_test_executor();
672
673        let plan = QueryPlan::new("employees")
674            .order_by(OrderBy::new().asc("id"))
675            .offset(2);
676
677        let result = executor.execute(&plan).unwrap();
678
679        assert_eq!(result.len(), 3); // Skip first 2
680        assert_eq!(result.get_value(0, "id"), Some(&Value::Integer(3)));
681    }
682
683    #[test]
684    fn test_limit_offset() {
685        let executor = create_test_executor();
686
687        let plan = QueryPlan::new("employees")
688            .order_by(OrderBy::new().asc("id"))
689            .offset(1)
690            .limit(2);
691
692        let result = executor.execute(&plan).unwrap();
693
694        assert_eq!(result.len(), 2);
695        assert_eq!(result.get_value(0, "id"), Some(&Value::Integer(2)));
696        assert_eq!(result.get_value(1, "id"), Some(&Value::Integer(3)));
697    }
698
699    #[test]
700    fn test_count() {
701        let executor = create_test_executor();
702
703        let count = executor.count("employees", None).unwrap();
704        assert_eq!(count, 5);
705
706        let filter = Filter::eq("department", Value::text("Engineering".to_string()));
707        let count = executor.count("employees", Some(&filter)).unwrap();
708        assert_eq!(count, 3);
709    }
710
711    #[test]
712    fn test_total_count() {
713        let executor = create_test_executor();
714
715        let plan = QueryPlan::new("employees").limit(2).with_total_count();
716
717        let result = executor.execute(&plan).unwrap();
718
719        assert_eq!(result.len(), 2);
720        assert_eq!(result.total_count, Some(5));
721    }
722
723    #[test]
724    fn test_table_not_found() {
725        let executor = create_test_executor();
726
727        let plan = QueryPlan::new("nonexistent");
728        let result = executor.execute(&plan);
729
730        assert!(matches!(result, Err(QueryError::TableNotFound(_))));
731    }
732
733    #[test]
734    fn test_column_not_found() {
735        let executor = create_test_executor();
736
737        let plan =
738            QueryPlan::new("employees").select(vec!["name".to_string(), "nonexistent".to_string()]);
739
740        let result = executor.execute(&plan);
741
742        assert!(matches!(result, Err(QueryError::ColumnNotFound(_))));
743    }
744
745    #[test]
746    fn test_query_builder() {
747        let executor = create_test_executor();
748
749        let query = Query::select("employees")
750            .columns(vec!["name", "age"])
751            .filter(Filter::gt("age", Value::Integer(28)))
752            .order_by_desc("age")
753            .limit(3);
754
755        let result = executor.execute(&query.plan()).unwrap();
756
757        assert_eq!(result.columns, vec!["name", "age"]);
758        assert_eq!(result.len(), 3);
759    }
760
761    #[test]
762    fn test_row_view() {
763        let executor = create_test_executor();
764
765        let plan = QueryPlan::new("employees")
766            .order_by(OrderBy::new().asc("id"))
767            .limit(1);
768
769        let result = executor.execute(&plan).unwrap();
770
771        for row in result.iter_rows() {
772            assert_eq!(row.get("name"), Some(&Value::text("Alice".to_string())));
773            assert_eq!(row.get("age"), Some(&Value::Integer(30)));
774        }
775    }
776
777    #[test]
778    fn test_stats() {
779        let executor = create_test_executor();
780
781        let plan = QueryPlan::new("employees")
782            .filter(Filter::eq(
783                "department",
784                Value::text("Engineering".to_string()),
785            ))
786            .limit(2);
787
788        let result = executor.execute(&plan).unwrap();
789
790        assert_eq!(result.stats.rows_scanned, 5);
791        assert_eq!(result.stats.rows_matched, 3);
792        assert_eq!(result.stats.rows_returned, 2);
793    }
794
795    #[test]
796    fn test_insert() {
797        let mut executor = create_test_executor();
798
799        let new_row = Row::new(vec![
800            Value::Integer(6),
801            Value::text("Frank".to_string()),
802            Value::Integer(40),
803            Value::text("Legal".to_string()),
804        ]);
805
806        assert!(executor.insert("employees", new_row));
807
808        let count = executor.count("employees", None).unwrap();
809        assert_eq!(count, 6);
810    }
811
812    #[test]
813    fn test_table_exists() {
814        let executor = create_test_executor();
815
816        assert!(executor.table_exists("employees"));
817        assert!(!executor.table_exists("nonexistent"));
818    }
819
820    #[test]
821    fn test_table_columns() {
822        let executor = create_test_executor();
823
824        let columns = executor.table_columns("employees").unwrap();
825        assert_eq!(columns, vec!["id", "name", "age", "department"]);
826
827        assert!(executor.table_columns("nonexistent").is_none());
828    }
829}