1use alopex_core::kv::KVStore;
2use alopex_core::types::TxnMode;
3use alopex_core::KVTransaction;
4use alopex_sql::catalog::CatalogOverlay;
5use alopex_sql::catalog::TxnCatalogView;
6use alopex_sql::executor::query::execute_query_streaming;
7use alopex_sql::executor::query::RowIterator;
8use alopex_sql::executor::{build_streaming_pipeline, ColumnInfo, Executor, QueryRowIterator, Row};
9use alopex_sql::planner::typed_expr::Projection;
10use alopex_sql::storage::{SqlValue, TxnBridge};
11use alopex_sql::AlopexDialect;
12use alopex_sql::Parser;
13use alopex_sql::Planner;
14use alopex_sql::Statement;
15use alopex_sql::StatementKind;
16
17use crate::Database;
18use crate::Error;
19use crate::Result;
20use crate::SqlResult;
21use crate::Transaction;
22
23pub struct StreamingRows<'a> {
29 columns: Vec<ColumnInfo>,
30 iter: Box<dyn RowIterator + 'a>,
31 projection: Projection,
32 schema: Vec<alopex_sql::catalog::ColumnMetadata>,
33}
34
35impl<'a> StreamingRows<'a> {
36 pub fn columns(&self) -> &[ColumnInfo] {
38 &self.columns
39 }
40
41 pub fn next_row(&mut self) -> Result<Option<Vec<SqlValue>>> {
45 match self.iter.next_row() {
46 Some(result) => {
47 let row = result.map_err(|e| Error::Sql(alopex_sql::SqlError::from(e)))?;
48 let projected = self.project_row(&row)?;
49 Ok(Some(projected))
50 }
51 None => Ok(None),
52 }
53 }
54
55 fn project_row(&self, row: &Row) -> Result<Vec<SqlValue>> {
57 match &self.projection {
58 Projection::All(names) => {
59 let mut result = Vec::with_capacity(names.len());
61 for name in names {
62 let idx = self
63 .schema
64 .iter()
65 .position(|c| &c.name == name)
66 .ok_or_else(|| {
67 Error::Sql(alopex_sql::SqlError::Execution {
68 message: format!("column not found: {}", name),
69 code: "ALOPEX-E020",
70 })
71 })?;
72 result.push(row.values.get(idx).cloned().unwrap_or(SqlValue::Null));
73 }
74 Ok(result)
75 }
76 Projection::Columns(cols) => {
77 use alopex_sql::executor::evaluator::{evaluate, EvalContext};
78 let ctx = EvalContext::new(&row.values);
79 let mut result = Vec::with_capacity(cols.len());
80 for col in cols {
81 let value = evaluate(&col.expr, &ctx)
82 .map_err(|e| Error::Sql(alopex_sql::SqlError::from(e)))?;
83 result.push(value);
84 }
85 Ok(result)
86 }
87 }
88 }
89}
90
91pub enum StreamingQueryResult<R> {
93 Success,
95 RowsAffected(u64),
97 QueryProcessed(R),
99}
100
101pub enum SqlStreamingResult {
106 Success,
108 RowsAffected(u64),
110 Query(QueryRowIterator<'static>),
112}
113
114fn parse_sql(sql: &str) -> Result<Vec<Statement>> {
115 let dialect = AlopexDialect;
116 Parser::parse_sql(&dialect, sql).map_err(|e| Error::Sql(alopex_sql::SqlError::from(e)))
117}
118
119fn stmt_requires_write(stmt: &Statement) -> bool {
120 !matches!(stmt.kind, StatementKind::Select(_))
121}
122
123fn plan_stmt<'a, S: KVStore>(
124 catalog: &'a alopex_sql::catalog::PersistentCatalog<S>,
125 overlay: &'a CatalogOverlay,
126 stmt: &Statement,
127) -> Result<alopex_sql::LogicalPlan> {
128 let view = TxnCatalogView::new(catalog, overlay);
129 let planner = Planner::new(&view);
130 planner
131 .plan(stmt)
132 .map_err(|e| Error::Sql(alopex_sql::SqlError::from(e)))
133}
134
135fn build_column_info(
137 projection: &Projection,
138 schema: &[alopex_sql::catalog::ColumnMetadata],
139) -> Result<Vec<ColumnInfo>> {
140 match projection {
141 Projection::All(names) => {
142 let mut cols = Vec::with_capacity(names.len());
143 for name in names {
144 let meta = schema.iter().find(|c| &c.name == name).ok_or_else(|| {
145 Error::Sql(alopex_sql::SqlError::Execution {
146 message: format!("column not found: {}", name),
147 code: "ALOPEX-E020",
148 })
149 })?;
150 cols.push(ColumnInfo::new(name.clone(), meta.data_type.clone()));
151 }
152 Ok(cols)
153 }
154 Projection::Columns(cols) => {
155 let mut result = Vec::with_capacity(cols.len());
156 for (i, col) in cols.iter().enumerate() {
157 let name = col
158 .alias
159 .clone()
160 .or_else(|| {
161 if let alopex_sql::planner::typed_expr::TypedExprKind::ColumnRef {
162 column,
163 ..
164 } = &col.expr.kind
165 {
166 Some(column.clone())
167 } else {
168 None
169 }
170 })
171 .unwrap_or_else(|| format!("col_{}", i));
172 result.push(ColumnInfo::new(name, col.expr.resolved_type.clone()));
173 }
174 Ok(result)
175 }
176 }
177}
178
179impl Database {
180 pub fn execute_sql(&self, sql: &str) -> Result<SqlResult> {
198 let stmts = parse_sql(sql)?;
199 if stmts.is_empty() {
200 return Ok(alopex_sql::ExecutionResult::Success);
201 }
202
203 let requires_write = stmts.iter().any(stmt_requires_write);
204 let mode = if requires_write {
205 TxnMode::ReadWrite
206 } else {
207 TxnMode::ReadOnly
208 };
209
210 let mut txn = self.store.begin(mode).map_err(Error::Core)?;
211 let mut overlay = CatalogOverlay::new();
212 let mut borrowed =
213 TxnBridge::<alopex_core::kv::AnyKV>::wrap_external(&mut txn, mode, &mut overlay);
214
215 let mut executor: Executor<_, _> =
216 Executor::new(self.store.clone(), self.sql_catalog.clone());
217
218 let mut last = alopex_sql::ExecutionResult::Success;
219 for stmt in &stmts {
220 let plan = {
221 let catalog = self.sql_catalog.read().expect("catalog lock poisoned");
222 let (_, overlay) = borrowed.split_parts();
223 plan_stmt(&*catalog, &*overlay, stmt)?
224 };
225
226 last = executor
227 .execute_in_txn(plan, &mut borrowed)
228 .map_err(|e| Error::Sql(alopex_sql::SqlError::from(e)))?;
229 }
230
231 drop(borrowed);
232
233 txn.commit_self().map_err(Error::Core)?;
238 if mode == TxnMode::ReadWrite {
239 let mut catalog = self.sql_catalog.write().expect("catalog lock poisoned");
240 catalog.apply_overlay(overlay);
241 }
242 if requires_write {
243 let mut cache = self.hnsw_cache.write().expect("hnsw cache lock poisoned");
244 cache.clear();
245 let mut vector_cache = self
246 .vector_cache
247 .write()
248 .expect("vector cache lock poisoned");
249 *vector_cache = None;
250 }
251 Ok(last)
252 }
253
254 pub fn execute_sql_with_rows<F, R>(&self, sql: &str, f: F) -> Result<StreamingQueryResult<R>>
286 where
287 F: FnOnce(StreamingRows<'_>) -> Result<R>,
288 {
289 let stmts = parse_sql(sql)?;
290 if stmts.is_empty() {
291 return Ok(StreamingQueryResult::Success);
292 }
293
294 if stmts.len() == 1 && matches!(stmts[0].kind, StatementKind::Select(_)) {
296 let stmt = &stmts[0];
297 let mode = TxnMode::ReadOnly;
298
299 let mut txn = self.store.begin(mode).map_err(Error::Core)?;
300 let mut overlay = CatalogOverlay::new();
301 let mut borrowed =
302 TxnBridge::<alopex_core::kv::AnyKV>::wrap_external(&mut txn, mode, &mut overlay);
303
304 let plan = {
305 let catalog = self.sql_catalog.read().expect("catalog lock poisoned");
306 let (_, overlay_ref) = borrowed.split_parts();
307 plan_stmt(&*catalog, overlay_ref, stmt)?
308 };
309
310 let catalog = self.sql_catalog.read().expect("catalog lock poisoned");
311 let (mut sql_txn, overlay_ref) = borrowed.split_parts();
312 let view = TxnCatalogView::new(&*catalog, overlay_ref);
313
314 let (iter, projection, schema) = build_streaming_pipeline(&mut sql_txn, &view, plan)
316 .map_err(|e| Error::Sql(alopex_sql::SqlError::from(e)))?;
317
318 let columns = build_column_info(&projection, &schema)?;
320
321 let streaming_rows = StreamingRows {
323 columns,
324 iter,
325 projection,
326 schema,
327 };
328
329 let result = f(streaming_rows)?;
331
332 drop(catalog);
334 drop(borrowed);
335 txn.commit_self().map_err(Error::Core)?;
336
337 return Ok(StreamingQueryResult::QueryProcessed(result));
338 }
339
340 let exec_result = self.execute_sql(sql)?;
342 match exec_result {
343 alopex_sql::ExecutionResult::Success => Ok(StreamingQueryResult::Success),
344 alopex_sql::ExecutionResult::RowsAffected(n) => {
345 Ok(StreamingQueryResult::RowsAffected(n))
346 }
347 alopex_sql::ExecutionResult::Query(_qr) => {
348 Err(Error::Sql(alopex_sql::SqlError::Execution {
351 message: "Streaming not available for multi-statement or complex queries"
352 .into(),
353 code: "ALOPEX-E021",
354 }))
355 }
356 }
357 }
358
359 pub fn execute_sql_streaming(&self, sql: &str) -> Result<SqlStreamingResult> {
387 let stmts = parse_sql(sql)?;
388 if stmts.is_empty() {
389 return Ok(SqlStreamingResult::Success);
390 }
391
392 if stmts.len() == 1 && matches!(stmts[0].kind, StatementKind::Select(_)) {
394 let stmt = &stmts[0];
395 let mode = TxnMode::ReadOnly;
396
397 let mut txn = self.store.begin(mode).map_err(Error::Core)?;
398 let mut overlay = CatalogOverlay::new();
399 let mut borrowed =
400 TxnBridge::<alopex_core::kv::AnyKV>::wrap_external(&mut txn, mode, &mut overlay);
401
402 let plan = {
403 let catalog = self.sql_catalog.read().expect("catalog lock poisoned");
404 let (_, overlay) = borrowed.split_parts();
405 plan_stmt(&*catalog, &*overlay, stmt)?
406 };
407
408 let (mut sql_txn, _overlay) = borrowed.split_parts();
409
410 let catalog = self.sql_catalog.read().expect("catalog lock poisoned");
411 let view = TxnCatalogView::new(&*catalog, _overlay);
412 let iter = execute_query_streaming(&mut sql_txn, &view, plan)
413 .map_err(|e| Error::Sql(alopex_sql::SqlError::from(e)))?;
414
415 drop(catalog);
416 drop(borrowed);
417
418 txn.commit_self().map_err(Error::Core)?;
419
420 return Ok(SqlStreamingResult::Query(iter));
421 }
422
423 let result = self.execute_sql(sql)?;
425 match result {
426 alopex_sql::ExecutionResult::Success => Ok(SqlStreamingResult::Success),
427 alopex_sql::ExecutionResult::RowsAffected(n) => Ok(SqlStreamingResult::RowsAffected(n)),
428 alopex_sql::ExecutionResult::Query(qr) => {
429 use alopex_sql::executor::query::iterator::VecIterator;
431 use alopex_sql::executor::Row;
432 use alopex_sql::planner::typed_expr::Projection;
433
434 let column_names: Vec<String> = qr.columns.iter().map(|c| c.name.clone()).collect();
435 let schema: Vec<alopex_sql::catalog::ColumnMetadata> = qr
436 .columns
437 .iter()
438 .map(|c| alopex_sql::catalog::ColumnMetadata::new(&c.name, c.data_type.clone()))
439 .collect();
440 let rows: Vec<Row> = qr
441 .rows
442 .into_iter()
443 .enumerate()
444 .map(|(i, values)| Row::new(i as u64, values))
445 .collect();
446 let iter = VecIterator::new(rows, schema.clone());
447 let query_iter =
448 QueryRowIterator::new(Box::new(iter), Projection::All(column_names), schema);
449 Ok(SqlStreamingResult::Query(query_iter))
450 }
451 }
452 }
453}
454
455impl<'a> Transaction<'a> {
456 pub fn execute_sql(&mut self, sql: &str) -> Result<SqlResult> {
473 let stmts = parse_sql(sql)?;
474 if stmts.is_empty() {
475 return Ok(alopex_sql::ExecutionResult::Success);
476 }
477
478 if stmts.iter().any(stmt_requires_write) {
479 let mut cache = self
480 .db
481 .hnsw_cache
482 .write()
483 .expect("hnsw cache lock poisoned");
484 cache.clear();
485 let mut vector_cache = self
486 .db
487 .vector_cache
488 .write()
489 .expect("vector cache lock poisoned");
490 *vector_cache = None;
491 }
492
493 let store = self.db.store.clone();
494 let sql_catalog = self.db.sql_catalog.clone();
495
496 let txn = self.inner.as_mut().ok_or(Error::TxnCompleted)?;
497 let mode = txn.mode();
498
499 let mut borrowed =
500 TxnBridge::<alopex_core::kv::AnyKV>::wrap_external(txn, mode, &mut self.overlay);
501 let mut executor: Executor<_, _> = Executor::new(store, sql_catalog.clone());
502
503 let mut last = alopex_sql::ExecutionResult::Success;
504 for stmt in &stmts {
505 let plan = {
506 let catalog = sql_catalog.read().expect("catalog lock poisoned");
507 let (_, overlay) = borrowed.split_parts();
508 plan_stmt(&*catalog, &*overlay, stmt)?
509 };
510
511 last = executor
512 .execute_in_txn(plan, &mut borrowed)
513 .map_err(|e| Error::Sql(alopex_sql::SqlError::from(e)))?;
514 }
515
516 Ok(last)
517 }
518}