// alopex_embedded/sql_api.rs

1use alopex_core::kv::KVStore;
2use alopex_core::types::TxnMode;
3use alopex_core::KVTransaction;
4use alopex_sql::catalog::CatalogOverlay;
5use alopex_sql::catalog::TxnCatalogView;
6use alopex_sql::executor::query::execute_query_streaming;
7use alopex_sql::executor::query::RowIterator;
8use alopex_sql::executor::{build_streaming_pipeline, ColumnInfo, Executor, QueryRowIterator, Row};
9use alopex_sql::planner::typed_expr::Projection;
10use alopex_sql::storage::{SqlValue, TxnBridge};
11use alopex_sql::AlopexDialect;
12use alopex_sql::Parser;
13use alopex_sql::Planner;
14use alopex_sql::Statement;
15use alopex_sql::StatementKind;
16
17use crate::Database;
18use crate::Error;
19use crate::Result;
20use crate::SqlResult;
21use crate::Transaction;
22
23/// Streaming row access for FR-7 compliance.
24///
25/// This struct provides access to query results in a streaming fashion,
26/// where the transaction is kept alive for the duration of row iteration.
27/// The lifetime `'a` is tied to the transaction scope.
28pub struct StreamingRows<'a> {
29    columns: Vec<ColumnInfo>,
30    iter: Box<dyn RowIterator + 'a>,
31    projection: Projection,
32    schema: Vec<alopex_sql::catalog::ColumnMetadata>,
33}
34
35impl<'a> StreamingRows<'a> {
36    /// Get column information for the query result.
37    pub fn columns(&self) -> &[ColumnInfo] {
38        &self.columns
39    }
40
41    /// Fetch the next row, returning `None` when exhausted.
42    ///
43    /// Rows are fetched on-demand from storage, enabling true streaming.
44    pub fn next_row(&mut self) -> Result<Option<Vec<SqlValue>>> {
45        match self.iter.next_row() {
46            Some(result) => {
47                let row = result.map_err(|e| Error::Sql(alopex_sql::SqlError::from(e)))?;
48                let projected = self.project_row(&row)?;
49                Ok(Some(projected))
50            }
51            None => Ok(None),
52        }
53    }
54
55    /// Apply projection to a row.
56    fn project_row(&self, row: &Row) -> Result<Vec<SqlValue>> {
57        match &self.projection {
58            Projection::All(names) => {
59                // Return values in the order specified by names
60                let mut result = Vec::with_capacity(names.len());
61                for name in names {
62                    let idx = self
63                        .schema
64                        .iter()
65                        .position(|c| &c.name == name)
66                        .ok_or_else(|| {
67                            Error::Sql(alopex_sql::SqlError::Execution {
68                                message: format!("column not found: {}", name),
69                                code: "ALOPEX-E020",
70                            })
71                        })?;
72                    result.push(row.values.get(idx).cloned().unwrap_or(SqlValue::Null));
73                }
74                Ok(result)
75            }
76            Projection::Columns(cols) => {
77                use alopex_sql::executor::evaluator::{evaluate, EvalContext};
78                let ctx = EvalContext::new(&row.values);
79                let mut result = Vec::with_capacity(cols.len());
80                for col in cols {
81                    let value = evaluate(&col.expr, &ctx)
82                        .map_err(|e| Error::Sql(alopex_sql::SqlError::from(e)))?;
83                    result.push(value);
84                }
85                Ok(result)
86            }
87        }
88    }
89}
90
/// Outcome of `Database::execute_sql_with_rows`.
///
/// `R` is the value produced by the caller-supplied row callback.
pub enum StreamingQueryResult<R> {
    /// The statement finished without a row count (e.g. DDL), or the SQL text
    /// contained no statements.
    Success,
    /// A DML statement finished; carries the number of affected rows.
    RowsAffected(u64),
    /// A `SELECT` was streamed through the callback; carries the callback's
    /// return value.
    QueryProcessed(R),
}
100
101/// Streaming SQL execution result for FR-7 compliance.
102///
103/// This enum enables true streaming output for SELECT queries by returning
104/// an iterator instead of a materialized Vec.
105pub enum SqlStreamingResult {
106    /// DDL operation success (CREATE/DROP TABLE/INDEX).
107    Success,
108    /// DML operation success with affected row count.
109    RowsAffected(u64),
110    /// Query result with streaming row iterator.
111    Query(QueryRowIterator<'static>),
112}
113
114fn parse_sql(sql: &str) -> Result<Vec<Statement>> {
115    let dialect = AlopexDialect;
116    Parser::parse_sql(&dialect, sql).map_err(|e| Error::Sql(alopex_sql::SqlError::from(e)))
117}
118
119fn stmt_requires_write(stmt: &Statement) -> bool {
120    !matches!(stmt.kind, StatementKind::Select(_))
121}
122
123fn plan_stmt<'a, S: KVStore>(
124    catalog: &'a alopex_sql::catalog::PersistentCatalog<S>,
125    overlay: &'a CatalogOverlay,
126    stmt: &Statement,
127) -> Result<alopex_sql::LogicalPlan> {
128    let view = TxnCatalogView::new(catalog, overlay);
129    let planner = Planner::new(&view);
130    planner
131        .plan(stmt)
132        .map_err(|e| Error::Sql(alopex_sql::SqlError::from(e)))
133}
134
135/// Build column info from projection and schema.
136fn build_column_info(
137    projection: &Projection,
138    schema: &[alopex_sql::catalog::ColumnMetadata],
139) -> Result<Vec<ColumnInfo>> {
140    match projection {
141        Projection::All(names) => {
142            let mut cols = Vec::with_capacity(names.len());
143            for name in names {
144                let meta = schema.iter().find(|c| &c.name == name).ok_or_else(|| {
145                    Error::Sql(alopex_sql::SqlError::Execution {
146                        message: format!("column not found: {}", name),
147                        code: "ALOPEX-E020",
148                    })
149                })?;
150                cols.push(ColumnInfo::new(name.clone(), meta.data_type.clone()));
151            }
152            Ok(cols)
153        }
154        Projection::Columns(cols) => {
155            let mut result = Vec::with_capacity(cols.len());
156            for (i, col) in cols.iter().enumerate() {
157                let name = col
158                    .alias
159                    .clone()
160                    .or_else(|| {
161                        if let alopex_sql::planner::typed_expr::TypedExprKind::ColumnRef {
162                            column,
163                            ..
164                        } = &col.expr.kind
165                        {
166                            Some(column.clone())
167                        } else {
168                            None
169                        }
170                    })
171                    .unwrap_or_else(|| format!("col_{}", i));
172                result.push(ColumnInfo::new(name, col.expr.resolved_type.clone()));
173            }
174            Ok(result)
175        }
176    }
177}
178
179impl Database {
180    /// SQL を実行する(auto-commit)。
181    ///
182    /// - DDL/DML は ReadWrite トランザクションで実行し、成功時に自動コミットする。
183    /// - SELECT は ReadOnly トランザクションで実行する。
184    ///
185    /// # Examples
186    ///
187    /// ```
188    /// use alopex_embedded::Database;
189    /// use alopex_sql::ExecutionResult;
190    ///
191    /// let db = Database::new();
192    /// let result = db.execute_sql(
193    ///     "CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT);",
194    /// ).unwrap();
195    /// assert!(matches!(result, ExecutionResult::Success));
196    /// ```
197    pub fn execute_sql(&self, sql: &str) -> Result<SqlResult> {
198        let stmts = parse_sql(sql)?;
199        if stmts.is_empty() {
200            return Ok(alopex_sql::ExecutionResult::Success);
201        }
202
203        let requires_write = stmts.iter().any(stmt_requires_write);
204        let mode = if requires_write {
205            TxnMode::ReadWrite
206        } else {
207            TxnMode::ReadOnly
208        };
209
210        let mut txn = self.store.begin(mode).map_err(Error::Core)?;
211        let mut overlay = CatalogOverlay::new();
212        let mut borrowed =
213            TxnBridge::<alopex_core::kv::AnyKV>::wrap_external(&mut txn, mode, &mut overlay);
214
215        let mut executor: Executor<_, _> =
216            Executor::new(self.store.clone(), self.sql_catalog.clone());
217
218        let mut last = alopex_sql::ExecutionResult::Success;
219        for stmt in &stmts {
220            let plan = {
221                let catalog = self.sql_catalog.read().expect("catalog lock poisoned");
222                let (_, overlay) = borrowed.split_parts();
223                plan_stmt(&*catalog, &*overlay, stmt)?
224            };
225
226            last = executor
227                .execute_in_txn(plan, &mut borrowed)
228                .map_err(|e| Error::Sql(alopex_sql::SqlError::from(e)))?;
229        }
230
231        drop(borrowed);
232
233        // `execute_in_txn()` 成功時に HNSW flush 済み(失敗時は abandon 済み)なので、
234        // ここでは KV commit と overlay 適用のみを行う。
235        //
236        // commit_self は `txn` を消費するため、失敗時に rollback はできない。
237        txn.commit_self().map_err(Error::Core)?;
238        if mode == TxnMode::ReadWrite {
239            let mut catalog = self.sql_catalog.write().expect("catalog lock poisoned");
240            catalog.apply_overlay(overlay);
241        }
242        if requires_write {
243            let mut cache = self.hnsw_cache.write().expect("hnsw cache lock poisoned");
244            cache.clear();
245            let mut vector_cache = self
246                .vector_cache
247                .write()
248                .expect("vector cache lock poisoned");
249            *vector_cache = None;
250        }
251        Ok(last)
252    }
253
254    /// Execute SQL with callback-based streaming for SELECT queries (FR-7).
255    ///
256    /// This method provides true streaming by keeping the transaction alive
257    /// during row iteration. The callback receives a `StreamingRows` that
258    /// yields rows on-demand from storage.
259    ///
260    /// # Type Parameters
261    ///
262    /// * `F` - Callback function that processes the streaming rows
263    /// * `R` - Return type from the callback
264    ///
265    /// # Examples
266    ///
267    /// ```
268    /// use alopex_embedded::Database;
269    ///
270    /// let db = Database::new();
271    /// db.execute_sql("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT);").unwrap();
272    /// db.execute_sql("INSERT INTO users (id, name) VALUES (1, 'Alice'), (2, 'Bob');").unwrap();
273    ///
274    /// // Process rows with streaming - transaction stays alive during callback
275    /// let result = db.execute_sql_with_rows("SELECT * FROM users;", |mut rows| {
276    ///     let mut names = Vec::new();
277    ///     while let Ok(Some(row)) = rows.next_row() {
278    ///         if let Some(alopex_sql::storage::SqlValue::Text(name)) = row.get(1) {
279    ///             names.push(name.clone());
280    ///         }
281    ///     }
282    ///     Ok(names)
283    /// }).unwrap();
284    /// ```
285    pub fn execute_sql_with_rows<F, R>(&self, sql: &str, f: F) -> Result<StreamingQueryResult<R>>
286    where
287        F: FnOnce(StreamingRows<'_>) -> Result<R>,
288    {
289        let stmts = parse_sql(sql)?;
290        if stmts.is_empty() {
291            return Ok(StreamingQueryResult::Success);
292        }
293
294        // For streaming SELECT, use the new pipeline
295        if stmts.len() == 1 && matches!(stmts[0].kind, StatementKind::Select(_)) {
296            let stmt = &stmts[0];
297            let mode = TxnMode::ReadOnly;
298
299            let mut txn = self.store.begin(mode).map_err(Error::Core)?;
300            let mut overlay = CatalogOverlay::new();
301            let mut borrowed =
302                TxnBridge::<alopex_core::kv::AnyKV>::wrap_external(&mut txn, mode, &mut overlay);
303
304            let plan = {
305                let catalog = self.sql_catalog.read().expect("catalog lock poisoned");
306                let (_, overlay_ref) = borrowed.split_parts();
307                plan_stmt(&*catalog, overlay_ref, stmt)?
308            };
309
310            let catalog = self.sql_catalog.read().expect("catalog lock poisoned");
311            let (mut sql_txn, overlay_ref) = borrowed.split_parts();
312            let view = TxnCatalogView::new(&*catalog, overlay_ref);
313
314            // Build streaming pipeline - iterator lifetime tied to sql_txn
315            let (iter, projection, schema) = build_streaming_pipeline(&mut sql_txn, &view, plan)
316                .map_err(|e| Error::Sql(alopex_sql::SqlError::from(e)))?;
317
318            // Build column info from projection and schema
319            let columns = build_column_info(&projection, &schema)?;
320
321            // Create StreamingRows and pass to callback
322            let streaming_rows = StreamingRows {
323                columns,
324                iter,
325                projection,
326                schema,
327            };
328
329            // Execute callback with streaming rows
330            let result = f(streaming_rows)?;
331
332            // Clean up and commit after callback completes
333            drop(catalog);
334            drop(borrowed);
335            txn.commit_self().map_err(Error::Core)?;
336
337            return Ok(StreamingQueryResult::QueryProcessed(result));
338        }
339
340        // Fall back to standard execution for non-SELECT or multi-statement
341        let exec_result = self.execute_sql(sql)?;
342        match exec_result {
343            alopex_sql::ExecutionResult::Success => Ok(StreamingQueryResult::Success),
344            alopex_sql::ExecutionResult::RowsAffected(n) => {
345                Ok(StreamingQueryResult::RowsAffected(n))
346            }
347            alopex_sql::ExecutionResult::Query(_qr) => {
348                // For non-streaming path, we can't provide true streaming
349                // Return an error indicating streaming is not available
350                Err(Error::Sql(alopex_sql::SqlError::Execution {
351                    message: "Streaming not available for multi-statement or complex queries"
352                        .into(),
353                    code: "ALOPEX-E021",
354                }))
355            }
356        }
357    }
358
359    /// Execute SQL and return a streaming result for SELECT queries (FR-7).
360    ///
361    /// This method returns a `SqlStreamingResult` that contains an iterator
362    /// for query results, enabling true streaming output without materializing
363    /// all rows upfront.
364    ///
365    /// # Note
366    ///
367    /// Only single SELECT statements are supported for streaming. Multi-statement
368    /// SQL or non-SELECT statements fall back to the standard execution path.
369    ///
370    /// # Examples
371    ///
372    /// ```
373    /// use alopex_embedded::{Database, SqlStreamingResult};
374    ///
375    /// let db = Database::new();
376    /// db.execute_sql("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT);").unwrap();
377    /// db.execute_sql("INSERT INTO users (id, name) VALUES (1, 'Alice');").unwrap();
378    ///
379    /// let result = db.execute_sql_streaming("SELECT * FROM users;").unwrap();
380    /// if let SqlStreamingResult::Query(mut iter) = result {
381    ///     while let Ok(Some(row)) = iter.next_row() {
382    ///         println!("{:?}", row);
383    ///     }
384    /// }
385    /// ```
386    pub fn execute_sql_streaming(&self, sql: &str) -> Result<SqlStreamingResult> {
387        let stmts = parse_sql(sql)?;
388        if stmts.is_empty() {
389            return Ok(SqlStreamingResult::Success);
390        }
391
392        // For streaming, only support single SELECT statement
393        if stmts.len() == 1 && matches!(stmts[0].kind, StatementKind::Select(_)) {
394            let stmt = &stmts[0];
395            let mode = TxnMode::ReadOnly;
396
397            let mut txn = self.store.begin(mode).map_err(Error::Core)?;
398            let mut overlay = CatalogOverlay::new();
399            let mut borrowed =
400                TxnBridge::<alopex_core::kv::AnyKV>::wrap_external(&mut txn, mode, &mut overlay);
401
402            let plan = {
403                let catalog = self.sql_catalog.read().expect("catalog lock poisoned");
404                let (_, overlay) = borrowed.split_parts();
405                plan_stmt(&*catalog, &*overlay, stmt)?
406            };
407
408            let (mut sql_txn, _overlay) = borrowed.split_parts();
409
410            let catalog = self.sql_catalog.read().expect("catalog lock poisoned");
411            let view = TxnCatalogView::new(&*catalog, _overlay);
412            let iter = execute_query_streaming(&mut sql_txn, &view, plan)
413                .map_err(|e| Error::Sql(alopex_sql::SqlError::from(e)))?;
414
415            drop(catalog);
416            drop(borrowed);
417
418            txn.commit_self().map_err(Error::Core)?;
419
420            return Ok(SqlStreamingResult::Query(iter));
421        }
422
423        // Fall back to standard execution for non-streaming cases
424        let result = self.execute_sql(sql)?;
425        match result {
426            alopex_sql::ExecutionResult::Success => Ok(SqlStreamingResult::Success),
427            alopex_sql::ExecutionResult::RowsAffected(n) => Ok(SqlStreamingResult::RowsAffected(n)),
428            alopex_sql::ExecutionResult::Query(qr) => {
429                // Convert materialized result to streaming iterator
430                use alopex_sql::executor::query::iterator::VecIterator;
431                use alopex_sql::executor::Row;
432                use alopex_sql::planner::typed_expr::Projection;
433
434                let column_names: Vec<String> = qr.columns.iter().map(|c| c.name.clone()).collect();
435                let schema: Vec<alopex_sql::catalog::ColumnMetadata> = qr
436                    .columns
437                    .iter()
438                    .map(|c| alopex_sql::catalog::ColumnMetadata::new(&c.name, c.data_type.clone()))
439                    .collect();
440                let rows: Vec<Row> = qr
441                    .rows
442                    .into_iter()
443                    .enumerate()
444                    .map(|(i, values)| Row::new(i as u64, values))
445                    .collect();
446                let iter = VecIterator::new(rows, schema.clone());
447                let query_iter =
448                    QueryRowIterator::new(Box::new(iter), Projection::All(column_names), schema);
449                Ok(SqlStreamingResult::Query(query_iter))
450            }
451        }
452    }
453}
454
455impl<'a> Transaction<'a> {
456    /// SQL を実行する(外部トランザクション利用)。
457    ///
458    /// 同一トランザクション内の複数回呼び出しでカタログ変更が見えるよう、`CatalogOverlay` は
459    /// `Transaction` が所有して保持する。
460    ///
461    /// # Examples
462    ///
463    /// ```
464    /// use alopex_embedded::{Database, TxnMode};
465    ///
466    /// let db = Database::new();
467    /// let mut txn = db.begin(TxnMode::ReadWrite).unwrap();
468    /// txn.execute_sql("CREATE TABLE t (id INTEGER PRIMARY KEY);").unwrap();
469    /// txn.execute_sql("INSERT INTO t (id) VALUES (1);").unwrap();
470    /// txn.commit().unwrap();
471    /// ```
472    pub fn execute_sql(&mut self, sql: &str) -> Result<SqlResult> {
473        let stmts = parse_sql(sql)?;
474        if stmts.is_empty() {
475            return Ok(alopex_sql::ExecutionResult::Success);
476        }
477
478        if stmts.iter().any(stmt_requires_write) {
479            let mut cache = self
480                .db
481                .hnsw_cache
482                .write()
483                .expect("hnsw cache lock poisoned");
484            cache.clear();
485            let mut vector_cache = self
486                .db
487                .vector_cache
488                .write()
489                .expect("vector cache lock poisoned");
490            *vector_cache = None;
491        }
492
493        let store = self.db.store.clone();
494        let sql_catalog = self.db.sql_catalog.clone();
495
496        let txn = self.inner.as_mut().ok_or(Error::TxnCompleted)?;
497        let mode = txn.mode();
498
499        let mut borrowed =
500            TxnBridge::<alopex_core::kv::AnyKV>::wrap_external(txn, mode, &mut self.overlay);
501        let mut executor: Executor<_, _> = Executor::new(store, sql_catalog.clone());
502
503        let mut last = alopex_sql::ExecutionResult::Success;
504        for stmt in &stmts {
505            let plan = {
506                let catalog = sql_catalog.read().expect("catalog lock poisoned");
507                let (_, overlay) = borrowed.split_parts();
508                plan_stmt(&*catalog, &*overlay, stmt)?
509            };
510
511            last = executor
512                .execute_in_txn(plan, &mut borrowed)
513                .map_err(|e| Error::Sql(alopex_sql::SqlError::from(e)))?;
514        }
515
516        Ok(last)
517    }
518}