sochdb_query/executor/
pipeline.rs1use crate::optimizer_integration::StorageBackend;
13use crate::soch_ql::SochValue;
14use crate::storage_bridge::convert_query_to_core;
15use crate::sql::bridge::ExecutionResult;
16use crate::sql::ast::*;
17use crate::sql::error::{SqlError, SqlResult};
18use crate::sql::parser::Parser;
19use super::explain::ExplainNode;
20use super::node::PlanNode;
21use super::planner::{QueryPlanner, explain_select};
22use std::collections::HashMap;
23use std::sync::Arc;
24
25pub struct ExecutorConfig {
27 pub max_rows: usize,
29 pub explain_mode: bool,
31}
32
33impl Default for ExecutorConfig {
34 fn default() -> Self {
35 Self {
36 max_rows: 1_000_000,
37 explain_mode: false,
38 }
39 }
40}
41
42pub fn execute_sql(
50 sql: &str,
51 storage: &Arc<dyn StorageBackend>,
52) -> SqlResult<ExecutionResult> {
53 let stmt = Parser::parse(sql).map_err(SqlError::from_parse_errors)?;
54 execute_statement(&stmt, storage)
55}
56
57pub fn execute_statement(
59 stmt: &Statement,
60 storage: &Arc<dyn StorageBackend>,
61) -> SqlResult<ExecutionResult> {
62 match stmt {
63 Statement::Select(select) => execute_select(select, storage),
64
65 Statement::Explain(inner) => {
66 match inner.as_ref() {
67 Statement::Select(select) => {
68 let plan_text = explain_select(select, storage);
69 let mut node = ExplainNode::new(plan_text);
70 collect_rows_from_node(&mut node)
71 }
72 _ => Err(SqlError::NotImplemented(
73 "EXPLAIN only supported for SELECT statements".into(),
74 )),
75 }
76 }
77
78 Statement::Insert(_)
81 | Statement::Update(_)
82 | Statement::Delete(_)
83 | Statement::CreateTable(_)
84 | Statement::DropTable(_)
85 | Statement::CreateIndex(_)
86 | Statement::DropIndex(_)
87 | Statement::AlterTable(_)
88 | Statement::Begin(_)
89 | Statement::Commit
90 | Statement::Rollback(_)
91 | Statement::Savepoint(_)
92 | Statement::Release(_)
93 | Statement::DefineScope(_)
94 | Statement::DefineTablePermissions(_)
95 | Statement::RemoveScope(_)
96 | Statement::Relate(_)
97 | Statement::LiveSelect(_)
98 | Statement::DefineEvent(_) => Err(SqlError::NotImplemented(
99 "DML/DDL statements should be routed through SqlBridge".into(),
100 )),
101 }
102}
103
104fn execute_select(
106 select: &SelectStmt,
107 storage: &Arc<dyn StorageBackend>,
108) -> SqlResult<ExecutionResult> {
109 let planner = QueryPlanner::new(storage.clone());
110 let mut node = planner
111 .plan_select(select)
112 .map_err(|e| SqlError::ExecutionError(e.to_string()))?;
113
114 collect_rows_from_node(&mut node)
115}
116
117fn collect_rows_from_node(node: &mut dyn PlanNode) -> SqlResult<ExecutionResult> {
119 let schema = node.schema().clone();
120 let columns = schema.column_names();
121
122 let mut rows: Vec<HashMap<String, sochdb_core::SochValue>> = Vec::new();
123 loop {
124 match node.next() {
125 Ok(Some(row)) => {
126 let mut row_map = HashMap::new();
127 for (i, val) in row.into_iter().enumerate() {
128 let col_name = columns
129 .get(i)
130 .cloned()
131 .unwrap_or_else(|| format!("col{}", i));
132 row_map.insert(col_name, convert_query_to_core(&val));
133 }
134 rows.push(row_map);
135 }
136 Ok(None) => break,
137 Err(e) => return Err(SqlError::ExecutionError(e.to_string())),
138 }
139 }
140
141 Ok(ExecutionResult::Rows { columns, rows })
142}
143
144pub fn execute_sql_rows(
150 sql: &str,
151 storage: &Arc<dyn StorageBackend>,
152) -> SqlResult<(Vec<String>, Vec<Vec<SochValue>>)> {
153 let result = execute_sql(sql, storage)?;
154 match result {
155 ExecutionResult::Rows { columns, rows } => {
156 let typed_rows: Vec<Vec<SochValue>> = rows
157 .into_iter()
158 .map(|row_map| {
159 columns
160 .iter()
161 .map(|col| {
162 row_map
163 .get(col)
164 .map(|v| crate::storage_bridge::convert_core_to_query(v))
165 .unwrap_or(SochValue::Null)
166 })
167 .collect()
168 })
169 .collect();
170 Ok((columns, typed_rows))
171 }
172 _ => Ok((vec![], vec![])),
173 }
174}