Skip to main content

sochdb_query/executor/
pipeline.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2
3//! Unified execution pipeline: SQL text → result.
4//!
5//! This is the top-level entry point that replaces the disconnected
6//! three-way split between `SqlBridge`, `SqlExecutor`, and `SochQlExecutor`.
7//!
8//! ```text
9//! SQL Text → Parser → AST → Planner → Volcano Operator Tree → ExecutionResult
10//! ```
11
12use crate::optimizer_integration::StorageBackend;
13use crate::soch_ql::SochValue;
14use crate::storage_bridge::convert_query_to_core;
15use crate::sql::bridge::ExecutionResult;
16use crate::sql::ast::*;
17use crate::sql::error::{SqlError, SqlResult};
18use crate::sql::parser::Parser;
19use super::explain::ExplainNode;
20use super::node::PlanNode;
21use super::planner::{QueryPlanner, explain_select};
22use std::collections::HashMap;
23use std::sync::Arc;
24
25/// Executor configuration.
26pub struct ExecutorConfig {
27    /// Maximum rows to return (safety limit).
28    pub max_rows: usize,
29    /// Enable EXPLAIN output.
30    pub explain_mode: bool,
31}
32
33impl Default for ExecutorConfig {
34    fn default() -> Self {
35        Self {
36            max_rows: 1_000_000,
37            explain_mode: false,
38        }
39    }
40}
41
42/// Execute a SQL string against a storage backend, returning the result.
43///
44/// This is the unified entry point for all SQL execution:
45///
46/// ```text
47/// let result = execute_sql("SELECT * FROM users WHERE age > 21", &storage)?;
48/// ```
49pub fn execute_sql(
50    sql: &str,
51    storage: &Arc<dyn StorageBackend>,
52) -> SqlResult<ExecutionResult> {
53    let stmt = Parser::parse(sql).map_err(SqlError::from_parse_errors)?;
54    execute_statement(&stmt, storage)
55}
56
57/// Execute a parsed SQL statement against a storage backend.
58pub fn execute_statement(
59    stmt: &Statement,
60    storage: &Arc<dyn StorageBackend>,
61) -> SqlResult<ExecutionResult> {
62    match stmt {
63        Statement::Select(select) => execute_select(select, storage),
64
65        Statement::Explain(inner) => {
66            match inner.as_ref() {
67                Statement::Select(select) => {
68                    let plan_text = explain_select(select, storage);
69                    let mut node = ExplainNode::new(plan_text);
70                    collect_rows_from_node(&mut node)
71                }
72                _ => Err(SqlError::NotImplemented(
73                    "EXPLAIN only supported for SELECT statements".into(),
74                )),
75            }
76        }
77
78        // DML / DDL — these still need to go through the SqlConnection/storage bridge
79        // The Volcano executor handles SELECT; mutations go through the existing path.
80        Statement::Insert(_)
81        | Statement::Update(_)
82        | Statement::Delete(_)
83        | Statement::CreateTable(_)
84        | Statement::DropTable(_)
85        | Statement::CreateIndex(_)
86        | Statement::DropIndex(_)
87        | Statement::AlterTable(_)
88        | Statement::Begin(_)
89        | Statement::Commit
90        | Statement::Rollback(_)
91        | Statement::Savepoint(_)
92        | Statement::Release(_)
93        | Statement::DefineScope(_)
94        | Statement::DefineTablePermissions(_)
95        | Statement::RemoveScope(_)
96        | Statement::Relate(_)
97        | Statement::LiveSelect(_)
98        | Statement::DefineEvent(_) => Err(SqlError::NotImplemented(
99            "DML/DDL statements should be routed through SqlBridge".into(),
100        )),
101    }
102}
103
104/// Execute a SELECT statement and collect results.
105fn execute_select(
106    select: &SelectStmt,
107    storage: &Arc<dyn StorageBackend>,
108) -> SqlResult<ExecutionResult> {
109    let planner = QueryPlanner::new(storage.clone());
110    let mut node = planner
111        .plan_select(select)
112        .map_err(|e| SqlError::ExecutionError(e.to_string()))?;
113
114    collect_rows_from_node(&mut node)
115}
116
117/// Collect all rows from a PlanNode into an ExecutionResult.
118fn collect_rows_from_node(node: &mut dyn PlanNode) -> SqlResult<ExecutionResult> {
119    let schema = node.schema().clone();
120    let columns = schema.column_names();
121
122    let mut rows: Vec<HashMap<String, sochdb_core::SochValue>> = Vec::new();
123    loop {
124        match node.next() {
125            Ok(Some(row)) => {
126                let mut row_map = HashMap::new();
127                for (i, val) in row.into_iter().enumerate() {
128                    let col_name = columns
129                        .get(i)
130                        .cloned()
131                        .unwrap_or_else(|| format!("col{}", i));
132                    row_map.insert(col_name, convert_query_to_core(&val));
133                }
134                rows.push(row_map);
135            }
136            Ok(None) => break,
137            Err(e) => return Err(SqlError::ExecutionError(e.to_string())),
138        }
139    }
140
141    Ok(ExecutionResult::Rows { columns, rows })
142}
143
144// ============================================================================
145// Convenience: execute SQL and get results as vectors
146// ============================================================================
147
148/// Execute SQL and return rows as Vec<Vec<SochValue>> (positional).
149pub fn execute_sql_rows(
150    sql: &str,
151    storage: &Arc<dyn StorageBackend>,
152) -> SqlResult<(Vec<String>, Vec<Vec<SochValue>>)> {
153    let result = execute_sql(sql, storage)?;
154    match result {
155        ExecutionResult::Rows { columns, rows } => {
156            let typed_rows: Vec<Vec<SochValue>> = rows
157                .into_iter()
158                .map(|row_map| {
159                    columns
160                        .iter()
161                        .map(|col| {
162                            row_map
163                                .get(col)
164                                .map(|v| crate::storage_bridge::convert_core_to_query(v))
165                                .unwrap_or(SochValue::Null)
166                        })
167                        .collect()
168                })
169                .collect();
170            Ok((columns, typed_rows))
171        }
172        _ => Ok((vec![], vec![])),
173    }
174}