Skip to main content

sochdb_query/
storage_bridge.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// SochDB - LLM-Optimized Embedded Database
3// Copyright (C) 2026 Sushanth Reddy Vanagala (https://github.com/sushanthpy)
4//
5// This program is free software: you can redistribute it and/or modify
6// it under the terms of the GNU Affero General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9//
10// This program is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU Affero General Public License for more details.
14//
15// You should have received a copy of the GNU Affero General Public License
16// along with this program. If not, see <https://www.gnu.org/licenses/>.
17
18//! # Phase 0: Storage Bridge — Wiring SQL Execution to Real Storage
19//!
20//! This module provides concrete implementations of the query layer's
21//! [`StorageBackend`] and [`SqlConnection`] traits, backed by the actual
22//! `sochdb_storage::Database` kernel.
23//!
24//! ## Architecture
25//!
26//! ```text
27//! ┌─────────────────────────────────────────────────┐
28//! │           Query Layer (sochdb-query)             │
29//! │                                                  │
30//! │  ┌──────────────┐    ┌───────────────────────┐  │
31//! │  │ OptimizedExec │    │ SQL AST (SqlBridge)  │  │
32//! │  └──────┬───────┘    └───────────┬───────────┘  │
33//! │         │                        │               │
34//! │  ┌──────▼────────────────────────▼───────────┐  │
35//! │  │        StorageBackend trait                │  │
36//! │  │        SqlConnection trait                 │  │
37//! │  └──────────────────┬────────────────────────┘  │
38//! ├─────────────────────┼────────────────────────────┤
39//! │         ┌───────────▼───────────────┐            │
40//! │         │  DatabaseStorageBackend   │            │
41//! │         │  DatabaseSqlConnection    │            │
42//! │         └───────────┬───────────────┘            │
43//! ├─────────────────────┼────────────────────────────┤
44//! │           Storage Layer (sochdb-storage)         │
45//! │         ┌───────────▼───────────┐                │
46//! │         │   Database kernel     │                │
47//! │         │  (DurableStorage +    │                │
48//! │         │   MVCC + WAL)         │                │
49//! │         └───────────────────────┘                │
50//! └─────────────────────────────────────────────────┘
51//! ```
52//!
53//! ## SochValue Bridging
54//!
55//! The storage layer uses `sochdb_core::SochValue` (10 variants including
56//! `Object` and `Ref`), while the query optimizer uses
57//! `sochdb_query::soch_ql::SochValue` (8 variants, missing `Object`/`Ref`).
58//!
59//! This module provides [`convert_core_to_query()`] to bridge the gap:
60//! - `Object(map)` → serialized as `Text(json_string)`
61//! - `Ref { table, id }` → `Text("table/id")`
62
63use crate::optimizer_integration::StorageBackend;
64use crate::soch_ql::SochValue as QuerySochValue;
65use crate::sql::ast::*;
66use crate::sql::bridge::{ExecutionResult, SqlConnection};
67use crate::sql::error::{SqlError, SqlResult};
68use sochdb_core::SochValue as CoreSochValue;
69use sochdb_storage::{Database, KernelTxnHandle};
70use std::collections::HashMap;
71use std::sync::Arc;
72
73// ============================================================================
74// SochValue Conversion (Core ↔ Query)
75// ============================================================================
76
77/// Convert `sochdb_core::SochValue` to `sochdb_query::soch_ql::SochValue`.
78///
79/// Handles the two extra variants in core:
80/// - `Object(HashMap)` → `Text(json_serialized)` for query-layer consumption
81/// - `Ref { table, id }` → `Text("table/id")` as a foreign-key string
82pub fn convert_core_to_query(value: &CoreSochValue) -> QuerySochValue {
83    match value {
84        CoreSochValue::Null => QuerySochValue::Null,
85        CoreSochValue::Bool(b) => QuerySochValue::Bool(*b),
86        CoreSochValue::Int(i) => QuerySochValue::Int(*i),
87        CoreSochValue::UInt(u) => QuerySochValue::UInt(*u),
88        CoreSochValue::Float(f) => QuerySochValue::Float(*f),
89        CoreSochValue::Text(s) => QuerySochValue::Text(s.clone()),
90        CoreSochValue::Binary(b) => QuerySochValue::Binary(b.clone()),
91        CoreSochValue::Array(arr) => {
92            QuerySochValue::Array(arr.iter().map(convert_core_to_query).collect())
93        }
94        CoreSochValue::Object(map) => {
95            // Serialize to JSON text for query-layer consumption
96            match serde_json::to_string(map) {
97                Ok(json) => QuerySochValue::Text(json),
98                Err(_) => QuerySochValue::Text(format!("{:?}", map)),
99            }
100        }
101        CoreSochValue::Ref { table, id } => {
102            QuerySochValue::Text(format!("{}/{}", table, id))
103        }
104    }
105}
106
107/// Convert `sochdb_query::soch_ql::SochValue` to `sochdb_core::SochValue`.
108pub fn convert_query_to_core(value: &QuerySochValue) -> CoreSochValue {
109    match value {
110        QuerySochValue::Null => CoreSochValue::Null,
111        QuerySochValue::Bool(b) => CoreSochValue::Bool(*b),
112        QuerySochValue::Int(i) => CoreSochValue::Int(*i),
113        QuerySochValue::UInt(u) => CoreSochValue::UInt(*u),
114        QuerySochValue::Float(f) => CoreSochValue::Float(*f),
115        QuerySochValue::Text(s) => CoreSochValue::Text(s.clone()),
116        QuerySochValue::Binary(b) => CoreSochValue::Binary(b.clone()),
117        QuerySochValue::Array(arr) => {
118            CoreSochValue::Array(arr.iter().map(convert_query_to_core).collect())
119        }
120    }
121}
122
123/// Convert a row from core format to query format.
124fn convert_row_core_to_query(
125    row: HashMap<String, CoreSochValue>,
126) -> HashMap<String, QuerySochValue> {
127    row.into_iter()
128        .map(|(k, v)| (k, convert_core_to_query(&v)))
129        .collect()
130}
131
132/// Convert rows from core format to query format.
133fn convert_rows_core_to_query(
134    rows: Vec<HashMap<String, CoreSochValue>>,
135) -> Vec<HashMap<String, QuerySochValue>> {
136    rows.into_iter().map(convert_row_core_to_query).collect()
137}
138
139// ============================================================================
140// DatabaseStorageBackend — Implements StorageBackend for real storage
141// ============================================================================
142
143/// Concrete implementation of [`StorageBackend`] backed by `sochdb_storage::Database`.
144///
145/// This bridges the query optimizer to the actual storage engine, enabling
146/// cost-based query plans to execute against real data.
147///
148/// # Thread Safety
149///
150/// `Database` is always `Arc<Database>` and is `Send + Sync`. This struct
151/// holds an `Arc` clone and can be shared across threads.
152///
153/// # Transaction Management
154///
155/// Each query operation creates a read-only transaction for MVCC snapshot
156/// isolation. Transactions are automatically cleaned up on completion.
157pub struct DatabaseStorageBackend {
158    db: Arc<Database>,
159}
160
161impl DatabaseStorageBackend {
162    /// Create a new storage backend wrapping a `Database` instance.
163    pub fn new(db: Arc<Database>) -> Self {
164        Self { db }
165    }
166
167    /// Get a reference to the underlying database.
168    pub fn database(&self) -> &Arc<Database> {
169        &self.db
170    }
171
172    /// Execute a read-only operation within an MVCC transaction.
173    fn with_read_txn<F, T>(&self, f: F) -> sochdb_core::Result<T>
174    where
175        F: FnOnce(KernelTxnHandle) -> sochdb_core::Result<T>,
176    {
177        let txn = self.db.begin_read_only_fast();
178        let result = f(txn);
179        self.db.abort_read_only_fast(txn);
180        result
181    }
182}
183
184impl StorageBackend for DatabaseStorageBackend {
185    fn table_scan(
186        &self,
187        table: &str,
188        columns: &[String],
189        predicate: Option<&str>,
190    ) -> sochdb_core::Result<Vec<HashMap<String, QuerySochValue>>> {
191        self.with_read_txn(|txn| {
192            let col_refs: Vec<&str> = columns.iter().map(|s| s.as_str()).collect();
193
194            let query = if columns.is_empty() || columns.iter().any(|c| c == "*") {
195                self.db.query(txn, table)
196            } else {
197                self.db.query(txn, table).columns(&col_refs)
198            };
199
200            let result = query.execute()?;
201            let mut rows = convert_rows_core_to_query(result.rows);
202
203            // Post-scan predicate evaluation (simple string-based for now)
204            if let Some(pred) = predicate {
205                rows = apply_simple_predicate(&rows, pred);
206            }
207
208            Ok(rows)
209        })
210    }
211
212    fn primary_key_lookup(
213        &self,
214        table: &str,
215        key: &QuerySochValue,
216    ) -> sochdb_core::Result<Option<HashMap<String, QuerySochValue>>> {
217        let row_id = match key {
218            QuerySochValue::Int(i) => *i as u64,
219            QuerySochValue::UInt(u) => *u,
220            _ => return Ok(None),
221        };
222
223        self.with_read_txn(|txn| {
224            let result = self.db.read_row(txn, table, row_id, None)?;
225            Ok(result.map(convert_row_core_to_query))
226        })
227    }
228
229    fn secondary_index_seek(
230        &self,
231        table: &str,
232        index: &str,
233        key: &QuerySochValue,
234    ) -> sochdb_core::Result<Vec<HashMap<String, QuerySochValue>>> {
235        // Fall back to table scan with filtering on the indexed column.
236        let column_name = index.to_string();
237        let core_key = convert_query_to_core(key);
238
239        self.with_read_txn(|txn| {
240            let result = self.db.query(txn, table).execute()?;
241            let rows: Vec<HashMap<String, QuerySochValue>> = result
242                .rows
243                .into_iter()
244                .filter(|row| {
245                    row.get(&column_name)
246                        .map(|v| v == &core_key)
247                        .unwrap_or(false)
248                })
249                .map(convert_row_core_to_query)
250                .collect();
251            Ok(rows)
252        })
253    }
254
255    fn time_index_scan(
256        &self,
257        table: &str,
258        start_us: u64,
259        end_us: u64,
260    ) -> sochdb_core::Result<Vec<HashMap<String, QuerySochValue>>> {
261        self.with_read_txn(|txn| {
262            let result = self.db.query(txn, table).execute()?;
263            let rows: Vec<HashMap<String, QuerySochValue>> = result
264                .rows
265                .into_iter()
266                .filter(|row| {
267                    if let Some(CoreSochValue::UInt(ts)) = row.get("_timestamp") {
268                        *ts >= start_us && *ts <= end_us
269                    } else if let Some(CoreSochValue::Int(ts)) = row.get("_timestamp") {
270                        let ts = *ts as u64;
271                        ts >= start_us && ts <= end_us
272                    } else {
273                        false
274                    }
275                })
276                .map(convert_row_core_to_query)
277                .collect();
278            Ok(rows)
279        })
280    }
281
282    fn vector_search(
283        &self,
284        table: &str,
285        query: &[f32],
286        k: usize,
287    ) -> sochdb_core::Result<Vec<(f32, HashMap<String, QuerySochValue>)>> {
288        self.with_read_txn(|txn| {
289            let result = self.db.query(txn, table).execute()?;
290
291            let mut scored: Vec<(f32, HashMap<String, CoreSochValue>)> = result
292                .rows
293                .into_iter()
294                .filter_map(|row| {
295                    let vec_col = row.get("_vector").or_else(|| row.get("_embedding"));
296
297                    if let Some(CoreSochValue::Array(arr)) = vec_col {
298                        let vec: Vec<f32> = arr
299                            .iter()
300                            .filter_map(|v| match v {
301                                CoreSochValue::Float(f) => Some(*f as f32),
302                                CoreSochValue::Int(i) => Some(*i as f32),
303                                _ => None,
304                            })
305                            .collect();
306
307                        if vec.len() == query.len() {
308                            let dist = euclidean_distance(&vec, query);
309                            Some((dist, row))
310                        } else {
311                            None
312                        }
313                    } else {
314                        None
315                    }
316                })
317                .collect();
318
319            scored.sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap_or(std::cmp::Ordering::Equal));
320            scored.truncate(k);
321
322            Ok(scored
323                .into_iter()
324                .map(|(dist, row)| (dist, convert_row_core_to_query(row)))
325                .collect())
326        })
327    }
328
329    fn row_count(&self, table: &str) -> usize {
330        let txn = self.db.begin_read_only_fast();
331        let count = self
332            .db
333            .query(txn, table)
334            .execute()
335            .map(|r| r.rows_scanned)
336            .unwrap_or(0);
337        self.db.abort_read_only_fast(txn);
338        count
339    }
340}
341
342// ============================================================================
343// DatabaseSqlConnection — Implements SqlConnection for real storage
344// ============================================================================
345
346/// Concrete implementation of [`SqlConnection`] backed by `sochdb_storage::Database`.
347///
348/// This enables the `SqlBridge` to route parsed SQL AST operations to the
349/// actual storage engine, making `SELECT`, `INSERT`, `UPDATE`, `DELETE`,
350/// and DDL statements work against real persisted data.
351///
352/// # Transaction Model
353///
354/// The connection maintains an optional active write transaction. Read-only
355/// operations (`select`, `table_exists`) use fast read-only transactions.
356/// Write operations auto-begin a transaction if none is active.
357pub struct DatabaseSqlConnection {
358    db: Arc<Database>,
359    active_txn: Option<KernelTxnHandle>,
360    /// Whether the current transaction was explicitly started via BEGIN.
361    explicit_txn: bool,
362    /// Counter for auto-incrementing row IDs per table.
363    next_row_ids: HashMap<String, u64>,
364}
365
366impl DatabaseSqlConnection {
367    /// Create a new SQL connection wrapping a `Database` instance.
368    pub fn new(db: Arc<Database>) -> Self {
369        Self {
370            db,
371            active_txn: None,
372            explicit_txn: false,
373            next_row_ids: HashMap::new(),
374        }
375    }
376
377    /// Get a reference to the underlying database.
378    pub fn database(&self) -> &Arc<Database> {
379        &self.db
380    }
381
382    /// Get or create an active write transaction.
383    fn ensure_write_txn(&mut self) -> SqlResult<KernelTxnHandle> {
384        if let Some(txn) = self.active_txn {
385            Ok(txn)
386        } else {
387            let txn = self
388                .db
389                .begin_transaction()
390                .map_err(|e| SqlError::ExecutionError(format!("Failed to begin txn: {}", e)))?;
391            self.active_txn = Some(txn);
392            Ok(txn)
393        }
394    }
395
396    /// Auto-commit if we created an implicit transaction.
397    /// Explicit transactions (started via BEGIN) are not auto-committed.
398    fn auto_commit_if_implicit(&mut self) -> SqlResult<()> {
399        if self.explicit_txn {
400            return Ok(()); // Let the user COMMIT explicitly
401        }
402        if let Some(txn) = self.active_txn.take() {
403            self.db
404                .commit(txn)
405                .map_err(|e| SqlError::ExecutionError(format!("Commit failed: {}", e)))?;
406        }
407        Ok(())
408    }
409
410    /// Get the next row ID for a table (simple auto-increment).
411    fn next_row_id(&mut self, table: &str) -> u64 {
412        let counter = self.next_row_ids.entry(table.to_string()).or_insert(0);
413        *counter += 1;
414        *counter
415    }
416
417    /// Initialize the row ID counter from existing data.
418    fn init_row_id_counter(&mut self, table: &str) {
419        if self.next_row_ids.contains_key(table) {
420            return;
421        }
422        let txn = self.db.begin_read_only_fast();
423        let max_id = self
424            .db
425            .query(txn, table)
426            .execute()
427            .map(|r| r.rows_scanned as u64)
428            .unwrap_or(0);
429        self.db.abort_read_only_fast(txn);
430        self.next_row_ids.insert(table.to_string(), max_id);
431    }
432
433    /// Evaluate a SQL expression against a row for WHERE clause filtering.
434    fn eval_expr(
435        &self,
436        expr: &Expr,
437        row: &HashMap<String, CoreSochValue>,
438        params: &[CoreSochValue],
439    ) -> Option<CoreSochValue> {
440        match expr {
441            Expr::Column(col_ref) => {
442                // Try qualified lookup first: "table.column"
443                if let Some(ref tbl) = col_ref.table {
444                    let qualified = format!("{}.{}", tbl, col_ref.column);
445                    if let Some(v) = row.get(&qualified) {
446                        return Some(v.clone());
447                    }
448                }
449                // Fall back to unqualified: "column"
450                let col_name = &col_ref.column;
451                row.get(col_name).cloned()
452            }
453            Expr::Literal(lit) => Some(literal_to_core(lit)),
454            Expr::Placeholder(idx) => params.get((*idx as usize).saturating_sub(1)).cloned(),
455            Expr::BinaryOp { left, op, right } => {
456                let lhs = self.eval_expr(left, row, params)?;
457                let rhs = self.eval_expr(right, row, params)?;
458                Some(eval_binary_op(&lhs, op, &rhs))
459            }
460            Expr::UnaryOp { op, expr: inner } => {
461                let val = self.eval_expr(inner, row, params)?;
462                Some(eval_unary_op(op, &val))
463            }
464            Expr::IsNull { expr: inner, negated } => {
465                let val = self.eval_expr(inner, row, params)?;
466                let is_null = matches!(val, CoreSochValue::Null);
467                Some(CoreSochValue::Bool(if *negated { !is_null } else { is_null }))
468            }
469            Expr::Between {
470                expr: inner,
471                low,
472                high,
473                negated,
474            } => {
475                let val = self.eval_expr(inner, row, params)?;
476                let lo = self.eval_expr(low, row, params)?;
477                let hi = self.eval_expr(high, row, params)?;
478                let in_range = compare_values(&val, &lo) != std::cmp::Ordering::Less
479                    && compare_values(&val, &hi) != std::cmp::Ordering::Greater;
480                Some(CoreSochValue::Bool(if *negated {
481                    !in_range
482                } else {
483                    in_range
484                }))
485            }
486            Expr::InList {
487                expr: inner,
488                list,
489                negated,
490            } => {
491                let val = self.eval_expr(inner, row, params)?;
492                let found = list
493                    .iter()
494                    .any(|item| self.eval_expr(item, row, params) == Some(val.clone()));
495                Some(CoreSochValue::Bool(if *negated { !found } else { found }))
496            }
497            Expr::Like {
498                expr: inner,
499                pattern,
500                negated,
501                ..
502            } => {
503                let val = self.eval_expr(inner, row, params)?;
504                let pat = self.eval_expr(pattern, row, params)?;
505                if let (CoreSochValue::Text(s), CoreSochValue::Text(p)) = (&val, &pat) {
506                    let matched = sql_like_match(s, p);
507                    Some(CoreSochValue::Bool(if *negated { !matched } else { matched }))
508                } else {
509                    Some(CoreSochValue::Bool(false))
510                }
511            }
512            Expr::Function(func_call) => {
513                let func_name = func_call.name.name().to_uppercase();
514                match func_name.as_str() {
515                    "UPPER" => {
516                        let val = func_call.args.first().and_then(|a| self.eval_expr(a, row, params))?;
517                        if let CoreSochValue::Text(s) = val {
518                            Some(CoreSochValue::Text(s.to_uppercase()))
519                        } else {
520                            Some(CoreSochValue::Null)
521                        }
522                    }
523                    "LOWER" => {
524                        let val = func_call.args.first().and_then(|a| self.eval_expr(a, row, params))?;
525                        if let CoreSochValue::Text(s) = val {
526                            Some(CoreSochValue::Text(s.to_lowercase()))
527                        } else {
528                            Some(CoreSochValue::Null)
529                        }
530                    }
531                    "LENGTH" | "LEN" => {
532                        let val = func_call.args.first().and_then(|a| self.eval_expr(a, row, params))?;
533                        if let CoreSochValue::Text(s) = val {
534                            Some(CoreSochValue::Int(s.len() as i64))
535                        } else {
536                            Some(CoreSochValue::Null)
537                        }
538                    }
539                    "COALESCE" => {
540                        for arg in &func_call.args {
541                            if let Some(val) = self.eval_expr(arg, row, params) {
542                                if !matches!(val, CoreSochValue::Null) {
543                                    return Some(val);
544                                }
545                            }
546                        }
547                        Some(CoreSochValue::Null)
548                    }
549                    // Aggregate functions pass through for row-level evaluation
550                    _ => {
551                        func_call
552                            .args
553                            .first()
554                            .and_then(|a| self.eval_expr(a, row, params))
555                    }
556                }
557            }
558            _ => None,
559        }
560    }
561
562    /// Check if a WHERE expression evaluates to true for a given row.
563    fn row_matches(
564        &self,
565        expr: &Expr,
566        row: &HashMap<String, CoreSochValue>,
567        params: &[CoreSochValue],
568    ) -> bool {
569        match self.eval_expr(expr, row, params) {
570            Some(CoreSochValue::Bool(b)) => b,
571            _ => false,
572        }
573    }
574
575    /// Find the row_id for a row by scanning the table key space.
576    ///
577    /// This is O(n) and should be optimized with a primary key index.
578    fn find_row_id(
579        &self,
580        table: &str,
581        target_row: &HashMap<String, CoreSochValue>,
582        txn: KernelTxnHandle,
583    ) -> SqlResult<Option<u64>> {
584        let entries = self
585            .db
586            .scan(txn, table.as_bytes())
587            .map_err(|e| SqlError::ExecutionError(format!("Scan failed: {}", e)))?;
588
589        for (key_bytes, _value_bytes) in entries {
590            if let Ok(key_str) = String::from_utf8(key_bytes) {
591                // Keys are "table/row_id"
592                let parts: Vec<&str> = key_str.split('/').collect();
593                if parts.len() == 2 {
594                    if let Ok(row_id) = parts[1].parse::<u64>() {
595                        if let Ok(Some(row)) = self.db.read_row(txn, table, row_id, None) {
596                            if rows_equal(&row, target_row) {
597                                return Ok(Some(row_id));
598                            }
599                        }
600                    }
601                }
602            }
603        }
604
605        Ok(None)
606    }
607}
608
609impl SqlConnection for DatabaseSqlConnection {
610    fn select(
611        &self,
612        table: &str,
613        columns: &[String],
614        where_clause: Option<&Expr>,
615        order_by: &[OrderByItem],
616        limit: Option<usize>,
617        offset: Option<usize>,
618        params: &[CoreSochValue],
619    ) -> SqlResult<ExecutionResult> {
620        let txn = self.db.begin_read_only_fast();
621
622        let query = if columns.is_empty() || columns.iter().any(|c| c == "*") {
623            self.db.query(txn, table)
624        } else {
625            let col_refs: Vec<&str> = columns.iter().map(|s| s.as_str()).collect();
626            self.db.query(txn, table).columns(&col_refs)
627        };
628
629        let result = query
630            .execute()
631            .map_err(|e| SqlError::ExecutionError(format!("Query failed: {}", e)));
632
633        self.db.abort_read_only_fast(txn);
634
635        let result = result?;
636        let mut rows = result.rows;
637
638        // Apply WHERE filter
639        if let Some(expr) = where_clause {
640            rows.retain(|row| self.row_matches(expr, row, params));
641        }
642
643        // Apply ORDER BY
644        if !order_by.is_empty() {
645            rows.sort_by(|a, b| {
646                for item in order_by {
647                    let col_name = extract_order_by_column(&item.expr);
648                    let va = a.get(&col_name);
649                    let vb = b.get(&col_name);
650                    let cmp = compare_optional_values(va, vb);
651                    let cmp = if !item.asc { cmp.reverse() } else { cmp };
652                    if cmp != std::cmp::Ordering::Equal {
653                        return cmp;
654                    }
655                }
656                std::cmp::Ordering::Equal
657            });
658        }
659
660        // Apply OFFSET
661        if let Some(off) = offset {
662            rows = rows.into_iter().skip(off).collect();
663        }
664
665        // Apply LIMIT
666        if let Some(lim) = limit {
667            rows.truncate(lim);
668        }
669
670        // Determine column names
671        let result_columns = if columns.is_empty() || columns.iter().any(|c| c == "*") {
672            rows.first()
673                .map(|r| r.keys().cloned().collect())
674                .unwrap_or_default()
675        } else {
676            columns.to_vec()
677        };
678
679        Ok(ExecutionResult::Rows {
680            columns: result_columns,
681            rows,
682        })
683    }
684
685    fn insert(
686        &mut self,
687        table: &str,
688        columns: Option<&[String]>,
689        rows: &[Vec<Expr>],
690        _on_conflict: Option<&OnConflict>,
691        params: &[CoreSochValue],
692    ) -> SqlResult<ExecutionResult> {
693        let txn = self.ensure_write_txn()?;
694        self.init_row_id_counter(table);
695
696        let schema = self.db.get_table_schema(table);
697        let col_names: Vec<String> = if let Some(cols) = columns {
698            cols.to_vec()
699        } else if let Some(ref s) = schema {
700            s.columns.iter().map(|c| c.name.clone()).collect()
701        } else {
702            return Err(SqlError::InvalidArgument(
703                "INSERT requires column names when table has no schema".into(),
704            ));
705        };
706
707        let mut inserted = 0;
708        for row_exprs in rows {
709            let row_id = self.next_row_id(table);
710            let mut values = HashMap::new();
711
712            for (i, expr) in row_exprs.iter().enumerate() {
713                if i < col_names.len() {
714                    let value = match expr {
715                        Expr::Literal(lit) => literal_to_core(lit),
716                        Expr::Placeholder(idx) => params
717                            .get((*idx as usize).saturating_sub(1))
718                            .cloned()
719                            .unwrap_or(CoreSochValue::Null),
720                        _ => CoreSochValue::Null,
721                    };
722                    values.insert(col_names[i].clone(), value);
723                }
724            }
725
726            self.db
727                .insert_row(txn, table, row_id, &values)
728                .map_err(|e| SqlError::ExecutionError(format!("Insert failed: {}", e)))?;
729            inserted += 1;
730        }
731
732        self.auto_commit_if_implicit()?;
733        Ok(ExecutionResult::RowsAffected(inserted))
734    }
735
736    fn update(
737        &mut self,
738        table: &str,
739        assignments: &[Assignment],
740        where_clause: Option<&Expr>,
741        params: &[CoreSochValue],
742    ) -> SqlResult<ExecutionResult> {
743        let txn = self.ensure_write_txn()?;
744
745        let result = self
746            .db
747            .query(txn, table)
748            .execute()
749            .map_err(|e| SqlError::ExecutionError(format!("Scan for update failed: {}", e)))?;
750
751        let mut updated = 0;
752
753        for row in &result.rows {
754            let matches = match where_clause {
755                Some(expr) => self.row_matches(expr, row, params),
756                None => true,
757            };
758
759            if matches {
760                let row_id = self.find_row_id(table, row, txn)?;
761                if let Some(row_id) = row_id {
762                    let mut new_values = row.clone();
763                    for assignment in assignments {
764                        let col_name = assignment.column.clone();
765                        let value = match &assignment.value {
766                            Expr::Literal(lit) => literal_to_core(lit),
767                            Expr::Placeholder(idx) => params
768                                .get((*idx as usize).saturating_sub(1))
769                                .cloned()
770                                .unwrap_or(CoreSochValue::Null),
771                            _ => self
772                                .eval_expr(&assignment.value, row, params)
773                                .unwrap_or(CoreSochValue::Null),
774                        };
775                        new_values.insert(col_name, value);
776                    }
777
778                    self.db
779                        .insert_row(txn, table, row_id, &new_values)
780                        .map_err(|e| SqlError::ExecutionError(format!("Update failed: {}", e)))?;
781                    updated += 1;
782                }
783            }
784        }
785
786        self.auto_commit_if_implicit()?;
787        Ok(ExecutionResult::RowsAffected(updated))
788    }
789
790    fn delete(
791        &mut self,
792        table: &str,
793        where_clause: Option<&Expr>,
794        params: &[CoreSochValue],
795    ) -> SqlResult<ExecutionResult> {
796        let txn = self.ensure_write_txn()?;
797
798        let result = self
799            .db
800            .query(txn, table)
801            .execute()
802            .map_err(|e| SqlError::ExecutionError(format!("Scan for delete failed: {}", e)))?;
803
804        let mut deleted = 0;
805
806        for row in &result.rows {
807            let matches = match where_clause {
808                Some(expr) => self.row_matches(expr, row, params),
809                None => true,
810            };
811
812            if matches {
813                if let Some(row_id) = self.find_row_id(table, row, txn)? {
814                    let key = format!("{}/{}", table, row_id);
815                    self.db
816                        .delete(txn, key.as_bytes())
817                        .map_err(|e| SqlError::ExecutionError(format!("Delete failed: {}", e)))?;
818                    deleted += 1;
819                }
820            }
821        }
822
823        self.auto_commit_if_implicit()?;
824        Ok(ExecutionResult::RowsAffected(deleted))
825    }
826
827    fn create_table(&mut self, stmt: &CreateTableStmt) -> SqlResult<ExecutionResult> {
828        use sochdb_storage::DbColumnDef;
829        use sochdb_storage::DbTableSchema;
830
831        let table_name = stmt.name.name().to_string();
832
833        if self.db.get_table_schema(&table_name).is_some() {
834            if stmt.if_not_exists {
835                return Ok(ExecutionResult::Ok);
836            }
837            return Err(SqlError::InvalidArgument(format!(
838                "Table '{}' already exists",
839                table_name
840            )));
841        }
842
843        let columns: Vec<DbColumnDef> = stmt
844            .columns
845            .iter()
846            .map(|col| {
847                let col_type = sql_type_to_db_type(&col.data_type);
848                let nullable = !col.constraints.iter().any(|c| matches!(c, ColumnConstraint::NotNull));
849                DbColumnDef {
850                    name: col.name.clone(),
851                    col_type,
852                    nullable,
853                }
854            })
855            .collect();
856
857        let schema = DbTableSchema {
858            name: table_name,
859            columns,
860        };
861
862        self.db
863            .register_table(schema)
864            .map_err(|e| SqlError::ExecutionError(format!("Create table failed: {}", e)))?;
865
866        Ok(ExecutionResult::Ok)
867    }
868
869    fn drop_table(&mut self, stmt: &DropTableStmt) -> SqlResult<ExecutionResult> {
870        let table_name = stmt
871            .names
872            .first()
873            .map(|n| n.name().to_string())
874            .unwrap_or_default();
875
876        if self.db.get_table_schema(&table_name).is_none() {
877            if stmt.if_exists {
878                return Ok(ExecutionResult::Ok);
879            }
880            return Err(SqlError::TableNotFound(table_name));
881        }
882        // Schema removal only for now — full data cleanup is deferred.
883        // TODO: scan and delete all rows under the table prefix
884        Ok(ExecutionResult::Ok)
885    }
886
887    fn create_index(&mut self, _stmt: &CreateIndexStmt) -> SqlResult<ExecutionResult> {
888        // Index creation deferred to TableIndexRegistry integration
889        Ok(ExecutionResult::Ok)
890    }
891
892    fn drop_index(&mut self, _stmt: &DropIndexStmt) -> SqlResult<ExecutionResult> {
893        Ok(ExecutionResult::Ok)
894    }
895
896    fn alter_table(&mut self, stmt: &AlterTableStmt) -> SqlResult<ExecutionResult> {
897        use sochdb_storage::{DbColumnDef, DbColumnType};
898
899        let table_name = stmt.name.name().to_string();
900        let mut schema = self
901            .db
902            .get_table_schema(&table_name)
903            .ok_or_else(|| SqlError::TableNotFound(table_name.clone()))?;
904
905        let original_name = table_name.clone();
906
907        for op in &stmt.operations {
908            match op {
909                AlterTableOp::AddColumn(col_def) => {
910                    // Check for duplicate column
911                    if schema.columns.iter().any(|c| c.name == col_def.name) {
912                        return Err(SqlError::InvalidArgument(format!(
913                            "Column '{}' already exists in table '{}'",
914                            col_def.name, schema.name
915                        )));
916                    }
917                    let col_type = sql_type_to_db_type(&col_def.data_type);
918                    let nullable = !col_def
919                        .constraints
920                        .iter()
921                        .any(|c| matches!(c, ColumnConstraint::NotNull));
922                    schema.columns.push(DbColumnDef {
923                        name: col_def.name.clone(),
924                        col_type,
925                        nullable,
926                    });
927                }
928                AlterTableOp::DropColumn { name, .. } => {
929                    let idx = schema
930                        .columns
931                        .iter()
932                        .position(|c| c.name == *name)
933                        .ok_or_else(|| {
934                            SqlError::InvalidArgument(format!(
935                                "Column '{}' not found in table '{}'",
936                                name, schema.name
937                            ))
938                        })?;
939                    schema.columns.remove(idx);
940                }
941                AlterTableOp::RenameColumn { old_name, new_name } => {
942                    let col = schema
943                        .columns
944                        .iter_mut()
945                        .find(|c| c.name == *old_name)
946                        .ok_or_else(|| {
947                            SqlError::InvalidArgument(format!(
948                                "Column '{}' not found in table '{}'",
949                                old_name, schema.name
950                            ))
951                        })?;
952                    col.name = new_name.clone();
953                }
954                AlterTableOp::RenameTable(new_name) => {
955                    schema.name = new_name.name().to_string();
956                }
957                AlterTableOp::AlterColumn { name, operation } => {
958                    let col = schema
959                        .columns
960                        .iter_mut()
961                        .find(|c| c.name == *name)
962                        .ok_or_else(|| {
963                            SqlError::InvalidArgument(format!(
964                                "Column '{}' not found in table '{}'",
965                                name, schema.name
966                            ))
967                        })?;
968                    match operation {
969                        AlterColumnOp::SetType(data_type) => {
970                            col.col_type = sql_type_to_db_type(data_type);
971                        }
972                        AlterColumnOp::SetNotNull => {
973                            col.nullable = false;
974                        }
975                        AlterColumnOp::DropNotNull => {
976                            col.nullable = true;
977                        }
978                        AlterColumnOp::SetDefault(_) | AlterColumnOp::DropDefault => {
979                            // Defaults are handled at the SQL layer, not stored
980                            // in the storage schema currently. This is a no-op.
981                        }
982                    }
983                }
984                AlterTableOp::AddConstraint(_) | AlterTableOp::DropConstraint { .. } => {
985                    return Err(SqlError::NotImplemented(
986                        "ADD/DROP CONSTRAINT not yet implemented".into(),
987                    ));
988                }
989            }
990        }
991
992        self.db
993            .update_table_schema(&original_name, schema)
994            .map_err(|e| SqlError::ExecutionError(format!("ALTER TABLE failed: {}", e)))?;
995
996        Ok(ExecutionResult::Ok)
997    }
998
999    fn begin(&mut self, _stmt: &BeginStmt) -> SqlResult<ExecutionResult> {
1000        if self.active_txn.is_some() {
1001            return Err(SqlError::TransactionError(
1002                "Transaction already active".into(),
1003            ));
1004        }
1005        let txn = self
1006            .db
1007            .begin_transaction()
1008            .map_err(|e| SqlError::ExecutionError(format!("Begin failed: {}", e)))?;
1009        self.active_txn = Some(txn);
1010        self.explicit_txn = true;
1011        Ok(ExecutionResult::TransactionOk)
1012    }
1013
1014    fn commit(&mut self) -> SqlResult<ExecutionResult> {
1015        if let Some(txn) = self.active_txn.take() {
1016            self.explicit_txn = false;
1017            self.db
1018                .commit(txn)
1019                .map_err(|e| SqlError::TransactionError(format!("Commit failed: {}", e)))?;
1020            Ok(ExecutionResult::TransactionOk)
1021        } else {
1022            Err(SqlError::TransactionError("No active transaction".into()))
1023        }
1024    }
1025
1026    fn rollback(&mut self, _savepoint: Option<&str>) -> SqlResult<ExecutionResult> {
1027        if let Some(txn) = self.active_txn.take() {
1028            self.explicit_txn = false;
1029            self.db
1030                .abort(txn)
1031                .map_err(|e| SqlError::TransactionError(format!("Rollback failed: {}", e)))?;
1032            Ok(ExecutionResult::TransactionOk)
1033        } else {
1034            Err(SqlError::TransactionError("No active transaction".into()))
1035        }
1036    }
1037
1038    fn table_exists(&self, table: &str) -> SqlResult<bool> {
1039        Ok(self.db.get_table_schema(table).is_some())
1040    }
1041
1042    fn index_exists(&self, _index: &str) -> SqlResult<bool> {
1043        // TODO: wire to TableIndexRegistry
1044        Ok(false)
1045    }
1046
1047    fn scan_all(
1048        &self,
1049        table: &str,
1050        columns: &[String],
1051    ) -> SqlResult<Vec<HashMap<String, CoreSochValue>>> {
1052        let txn = self.db.begin_read_only_fast();
1053
1054        let query = if columns.is_empty() || columns.iter().any(|c| c == "*") {
1055            self.db.query(txn, table)
1056        } else {
1057            let col_refs: Vec<&str> = columns.iter().map(|s| s.as_str()).collect();
1058            self.db.query(txn, table).columns(&col_refs)
1059        };
1060
1061        let result = query
1062            .execute()
1063            .map_err(|e| SqlError::ExecutionError(format!("Scan failed: {}", e)));
1064
1065        self.db.abort_read_only_fast(txn);
1066        Ok(result?.rows)
1067    }
1068
1069    fn eval_join_predicate(
1070        &self,
1071        expr: &Expr,
1072        row: &HashMap<String, CoreSochValue>,
1073        params: &[CoreSochValue],
1074    ) -> Option<bool> {
1075        let val = self.eval_expr(expr, row, params)?;
1076        match val {
1077            CoreSochValue::Bool(b) => Some(b),
1078            CoreSochValue::Null => Some(false),
1079            _ => Some(false),
1080        }
1081    }
1082}
1083
1084// ============================================================================
1085// Helper Functions
1086// ============================================================================
1087
1088/// Simple string-based predicate evaluation for `StorageBackend::table_scan()`.
1089///
1090/// Supports: `column = value`, `column != value`, `column > value`,
1091/// `column < value`, `column >= value`, `column <= value`.
1092fn apply_simple_predicate(
1093    rows: &[HashMap<String, QuerySochValue>],
1094    predicate: &str,
1095) -> Vec<HashMap<String, QuerySochValue>> {
1096    let operators = [">=", "<=", "!=", "=", ">", "<"];
1097
1098    for op in &operators {
1099        if let Some(idx) = predicate.find(op) {
1100            let column = predicate[..idx].trim();
1101            let value_str = predicate[idx + op.len()..].trim().trim_matches('\'');
1102
1103            return rows
1104                .iter()
1105                .filter(|row| {
1106                    if let Some(val) = row.get(column) {
1107                        let val_str = match val {
1108                            QuerySochValue::Text(s) => s.clone(),
1109                            QuerySochValue::Int(i) => i.to_string(),
1110                            QuerySochValue::UInt(u) => u.to_string(),
1111                            QuerySochValue::Float(f) => f.to_string(),
1112                            QuerySochValue::Bool(b) => b.to_string(),
1113                            _ => return false,
1114                        };
1115
1116                        match *op {
1117                            "=" => val_str == value_str,
1118                            "!=" => val_str != value_str,
1119                            ">" => val_str.as_str() > value_str,
1120                            "<" => (val_str.as_str()) < value_str,
1121                            ">=" => val_str.as_str() >= value_str,
1122                            "<=" => val_str.as_str() <= value_str,
1123                            _ => false,
1124                        }
1125                    } else {
1126                        false
1127                    }
1128                })
1129                .cloned()
1130                .collect();
1131        }
1132    }
1133
1134    rows.to_vec()
1135}
1136
1137/// Euclidean distance between two vectors.
1138fn euclidean_distance(a: &[f32], b: &[f32]) -> f32 {
1139    a.iter()
1140        .zip(b.iter())
1141        .map(|(x, y)| (x - y) * (x - y))
1142        .sum::<f32>()
1143        .sqrt()
1144}
1145
1146/// Convert a SQL literal to `CoreSochValue`.
1147fn literal_to_core(lit: &Literal) -> CoreSochValue {
1148    match lit {
1149        Literal::Integer(i) => CoreSochValue::Int(*i),
1150        Literal::Float(f) => CoreSochValue::Float(*f),
1151        Literal::String(s) => CoreSochValue::Text(s.clone()),
1152        Literal::Boolean(b) => CoreSochValue::Bool(*b),
1153        Literal::Null => CoreSochValue::Null,
1154        Literal::Blob(b) => CoreSochValue::Binary(b.clone()),
1155    }
1156}
1157
1158/// Evaluate a binary operation on two `CoreSochValue`s.
1159fn eval_binary_op(
1160    lhs: &CoreSochValue,
1161    op: &BinaryOperator,
1162    rhs: &CoreSochValue,
1163) -> CoreSochValue {
1164    match op {
1165        BinaryOperator::Eq => CoreSochValue::Bool(lhs == rhs),
1166        BinaryOperator::Ne => CoreSochValue::Bool(lhs != rhs),
1167        BinaryOperator::Lt => {
1168            CoreSochValue::Bool(compare_values(lhs, rhs) == std::cmp::Ordering::Less)
1169        }
1170        BinaryOperator::Gt => {
1171            CoreSochValue::Bool(compare_values(lhs, rhs) == std::cmp::Ordering::Greater)
1172        }
1173        BinaryOperator::Le => {
1174            CoreSochValue::Bool(compare_values(lhs, rhs) != std::cmp::Ordering::Greater)
1175        }
1176        BinaryOperator::Ge => {
1177            CoreSochValue::Bool(compare_values(lhs, rhs) != std::cmp::Ordering::Less)
1178        }
1179        BinaryOperator::And => {
1180            let a = matches!(lhs, CoreSochValue::Bool(true));
1181            let b = matches!(rhs, CoreSochValue::Bool(true));
1182            CoreSochValue::Bool(a && b)
1183        }
1184        BinaryOperator::Or => {
1185            let a = matches!(lhs, CoreSochValue::Bool(true));
1186            let b = matches!(rhs, CoreSochValue::Bool(true));
1187            CoreSochValue::Bool(a || b)
1188        }
1189        BinaryOperator::Plus => numeric_op(lhs, rhs, |a, b| a + b, |a, b| a + b),
1190        BinaryOperator::Minus => numeric_op(lhs, rhs, |a, b| a - b, |a, b| a - b),
1191        BinaryOperator::Multiply => numeric_op(lhs, rhs, |a, b| a * b, |a, b| a * b),
1192        BinaryOperator::Divide => numeric_op(
1193            lhs,
1194            rhs,
1195            |a, b| if b != 0 { a / b } else { 0 },
1196            |a, b| if b != 0.0 { a / b } else { 0.0 },
1197        ),
1198        BinaryOperator::Modulo => numeric_op(
1199            lhs,
1200            rhs,
1201            |a, b| if b != 0 { a % b } else { 0 },
1202            |a, b| if b != 0.0 { a % b } else { 0.0 },
1203        ),
1204        BinaryOperator::Like => {
1205            if let (CoreSochValue::Text(s), CoreSochValue::Text(pattern)) = (lhs, rhs) {
1206                CoreSochValue::Bool(sql_like_match(s, pattern))
1207            } else {
1208                CoreSochValue::Bool(false)
1209            }
1210        }
1211        BinaryOperator::Concat => {
1212            let a = value_to_string(lhs);
1213            let b = value_to_string(rhs);
1214            CoreSochValue::Text(format!("{}{}", a, b))
1215        }
1216        _ => CoreSochValue::Null,
1217    }
1218}
1219
1220/// Evaluate a unary operation.
1221fn eval_unary_op(op: &UnaryOperator, val: &CoreSochValue) -> CoreSochValue {
1222    match op {
1223        UnaryOperator::Not => match val {
1224            CoreSochValue::Bool(b) => CoreSochValue::Bool(!b),
1225            _ => CoreSochValue::Null,
1226        },
1227        UnaryOperator::Minus => match val {
1228            CoreSochValue::Int(i) => CoreSochValue::Int(-i),
1229            CoreSochValue::Float(f) => CoreSochValue::Float(-f),
1230            _ => CoreSochValue::Null,
1231        },
1232        UnaryOperator::Plus => val.clone(),
1233        _ => CoreSochValue::Null,
1234    }
1235}
1236
1237/// Numeric operation helper.
1238fn numeric_op(
1239    lhs: &CoreSochValue,
1240    rhs: &CoreSochValue,
1241    int_op: impl Fn(i64, i64) -> i64,
1242    float_op: impl Fn(f64, f64) -> f64,
1243) -> CoreSochValue {
1244    match (lhs, rhs) {
1245        (CoreSochValue::Int(a), CoreSochValue::Int(b)) => CoreSochValue::Int(int_op(*a, *b)),
1246        (CoreSochValue::Float(a), CoreSochValue::Float(b)) => {
1247            CoreSochValue::Float(float_op(*a, *b))
1248        }
1249        (CoreSochValue::Int(a), CoreSochValue::Float(b)) => {
1250            CoreSochValue::Float(float_op(*a as f64, *b))
1251        }
1252        (CoreSochValue::Float(a), CoreSochValue::Int(b)) => {
1253            CoreSochValue::Float(float_op(*a, *b as f64))
1254        }
1255        (CoreSochValue::UInt(a), CoreSochValue::UInt(b)) => {
1256            CoreSochValue::Int(int_op(*a as i64, *b as i64))
1257        }
1258        _ => CoreSochValue::Null,
1259    }
1260}
1261
1262/// Compare two `CoreSochValue`s for ordering.
1263fn compare_values(a: &CoreSochValue, b: &CoreSochValue) -> std::cmp::Ordering {
1264    match (a, b) {
1265        (CoreSochValue::Int(a), CoreSochValue::Int(b)) => a.cmp(b),
1266        (CoreSochValue::UInt(a), CoreSochValue::UInt(b)) => a.cmp(b),
1267        (CoreSochValue::Float(a), CoreSochValue::Float(b)) => {
1268            a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal)
1269        }
1270        (CoreSochValue::Text(a), CoreSochValue::Text(b)) => a.cmp(b),
1271        (CoreSochValue::Int(a), CoreSochValue::Float(b)) => (*a as f64)
1272            .partial_cmp(b)
1273            .unwrap_or(std::cmp::Ordering::Equal),
1274        (CoreSochValue::Float(a), CoreSochValue::Int(b)) => a
1275            .partial_cmp(&(*b as f64))
1276            .unwrap_or(std::cmp::Ordering::Equal),
1277        (CoreSochValue::Null, CoreSochValue::Null) => std::cmp::Ordering::Equal,
1278        (CoreSochValue::Null, _) => std::cmp::Ordering::Less,
1279        (_, CoreSochValue::Null) => std::cmp::Ordering::Greater,
1280        _ => std::cmp::Ordering::Equal,
1281    }
1282}
1283
1284/// Compare optional CoreSochValues for ORDER BY.
1285fn compare_optional_values(
1286    a: Option<&CoreSochValue>,
1287    b: Option<&CoreSochValue>,
1288) -> std::cmp::Ordering {
1289    match (a, b) {
1290        (Some(a), Some(b)) => compare_values(a, b),
1291        (None, Some(_)) => std::cmp::Ordering::Less,
1292        (Some(_), None) => std::cmp::Ordering::Greater,
1293        (None, None) => std::cmp::Ordering::Equal,
1294    }
1295}
1296
1297/// Extract column name from an ORDER BY expression.
1298fn extract_order_by_column(expr: &Expr) -> String {
1299    match expr {
1300        Expr::Column(col_ref) => col_ref.column.clone(),
1301        _ => String::new(),
1302    }
1303}
1304
1305/// Check if two rows have the same values.
1306fn rows_equal(
1307    a: &HashMap<String, CoreSochValue>,
1308    b: &HashMap<String, CoreSochValue>,
1309) -> bool {
1310    if a.len() != b.len() {
1311        return false;
1312    }
1313    a.iter().all(|(k, v)| b.get(k) == Some(v))
1314}
1315
1316/// Convert a value to string representation.
1317fn value_to_string(v: &CoreSochValue) -> String {
1318    match v {
1319        CoreSochValue::Text(s) => s.clone(),
1320        CoreSochValue::Int(i) => i.to_string(),
1321        CoreSochValue::UInt(u) => u.to_string(),
1322        CoreSochValue::Float(f) => f.to_string(),
1323        CoreSochValue::Bool(b) => b.to_string(),
1324        CoreSochValue::Null => "NULL".to_string(),
1325        _ => String::new(),
1326    }
1327}
1328
1329/// SQL LIKE pattern matching.
1330///
1331/// Converts SQL LIKE patterns (`%`, `_`) to regex equivalents.
1332fn sql_like_match(s: &str, pattern: &str) -> bool {
1333    // Escape regex special chars first, then convert LIKE wildcards
1334    let mut regex_pattern = String::with_capacity(pattern.len() * 2 + 2);
1335    regex_pattern.push('^');
1336    for ch in pattern.chars() {
1337        match ch {
1338            '%' => regex_pattern.push_str(".*"),
1339            '_' => regex_pattern.push('.'),
1340            '.' | '(' | ')' | '[' | ']' | '{' | '}' | '+' | '*' | '?' | '^' | '$' | '|'
1341            | '\\' => {
1342                regex_pattern.push('\\');
1343                regex_pattern.push(ch);
1344            }
1345            _ => regex_pattern.push(ch),
1346        }
1347    }
1348    regex_pattern.push('$');
1349
1350    regex::Regex::new(&regex_pattern)
1351        .map(|re| re.is_match(s))
1352        .unwrap_or(false)
1353}
1354
1355/// Convert SQL data type to storage column type.
1356fn sql_type_to_db_type(dt: &DataType) -> sochdb_storage::DbColumnType {
1357    use sochdb_storage::DbColumnType;
1358    match dt {
1359        DataType::TinyInt | DataType::SmallInt | DataType::Int | DataType::BigInt => {
1360            DbColumnType::Int64
1361        }
1362        DataType::Float | DataType::Double | DataType::Decimal { .. } => DbColumnType::Float64,
1363        DataType::Boolean => DbColumnType::Bool,
1364        DataType::Binary(_) | DataType::Varbinary(_) | DataType::Blob => DbColumnType::Binary,
1365        // All other types (text, date, json, custom, vector) → Text
1366        _ => DbColumnType::Text,
1367    }
1368}
1369
1370// ============================================================================
1371// NamespacedSqlConnection — Namespace-Prefixed Storage Isolation (P3.2)
1372// ============================================================================
1373
1374/// A wrapper around any `SqlConnection` that prefixes table names with
1375/// `namespace:database:` to provide storage-level isolation between tenants.
1376///
1377/// ## Key Isolation
1378///
1379/// All storage keys are prefixed: `ns:db:table/row_id` instead of `table/row_id`.
1380/// This prevents any cross-namespace data leakage at the storage layer.
1381///
1382/// ## Example
1383///
1384/// ```text
1385/// // Without namespace prefix:
1386/// table key = "users/1"
1387///
1388/// // With namespace prefix (namespace="prod", database="app"):
1389/// table key = "prod:app:users/1"
1390/// ```
1391pub struct NamespacedSqlConnection<C: SqlConnection> {
1392    inner: C,
1393    namespace: String,
1394    database: String,
1395}
1396
1397impl<C: SqlConnection> NamespacedSqlConnection<C> {
1398    /// Create a new namespaced connection.
1399    pub fn new(inner: C, namespace: impl Into<String>, database: impl Into<String>) -> Self {
1400        Self {
1401            inner,
1402            namespace: namespace.into(),
1403            database: database.into(),
1404        }
1405    }
1406
1407    /// Prefix a table name with namespace:database:
1408    fn prefix_table(&self, table: &str) -> String {
1409        format!("{}:{}:{}", self.namespace, self.database, table)
1410    }
1411
1412    /// Get the namespace.
1413    pub fn namespace(&self) -> &str {
1414        &self.namespace
1415    }
1416
1417    /// Get the database.
1418    pub fn database(&self) -> &str {
1419        &self.database
1420    }
1421
1422    /// Get a reference to the inner connection.
1423    pub fn inner(&self) -> &C {
1424        &self.inner
1425    }
1426
1427    /// Get a mutable reference to the inner connection.
1428    pub fn inner_mut(&mut self) -> &mut C {
1429        &mut self.inner
1430    }
1431}
1432
1433impl<C: SqlConnection> SqlConnection for NamespacedSqlConnection<C> {
1434    fn select(
1435        &self,
1436        table: &str,
1437        columns: &[String],
1438        where_clause: Option<&Expr>,
1439        order_by: &[OrderByItem],
1440        limit: Option<usize>,
1441        offset: Option<usize>,
1442        params: &[CoreSochValue],
1443    ) -> SqlResult<ExecutionResult> {
1444        self.inner.select(&self.prefix_table(table), columns, where_clause, order_by, limit, offset, params)
1445    }
1446
1447    fn insert(
1448        &mut self,
1449        table: &str,
1450        columns: Option<&[String]>,
1451        rows: &[Vec<Expr>],
1452        on_conflict: Option<&OnConflict>,
1453        params: &[CoreSochValue],
1454    ) -> SqlResult<ExecutionResult> {
1455        self.inner.insert(&self.prefix_table(table), columns, rows, on_conflict, params)
1456    }
1457
1458    fn update(
1459        &mut self,
1460        table: &str,
1461        assignments: &[Assignment],
1462        where_clause: Option<&Expr>,
1463        params: &[CoreSochValue],
1464    ) -> SqlResult<ExecutionResult> {
1465        self.inner.update(&self.prefix_table(table), assignments, where_clause, params)
1466    }
1467
1468    fn delete(
1469        &mut self,
1470        table: &str,
1471        where_clause: Option<&Expr>,
1472        params: &[CoreSochValue],
1473    ) -> SqlResult<ExecutionResult> {
1474        self.inner.delete(&self.prefix_table(table), where_clause, params)
1475    }
1476
1477    fn create_table(&mut self, stmt: &CreateTableStmt) -> SqlResult<ExecutionResult> {
1478        // Create a modified statement with prefixed table name
1479        let mut prefixed = stmt.clone();
1480        let original_name = stmt.name.name().to_string();
1481        prefixed.name = ObjectName::new(self.prefix_table(&original_name));
1482        self.inner.create_table(&prefixed)
1483    }
1484
1485    fn drop_table(&mut self, stmt: &DropTableStmt) -> SqlResult<ExecutionResult> {
1486        let mut prefixed = stmt.clone();
1487        prefixed.names = stmt.names.iter()
1488            .map(|n| ObjectName::new(self.prefix_table(n.name())))
1489            .collect();
1490        self.inner.drop_table(&prefixed)
1491    }
1492
1493    fn create_index(&mut self, stmt: &CreateIndexStmt) -> SqlResult<ExecutionResult> {
1494        self.inner.create_index(stmt)
1495    }
1496
1497    fn drop_index(&mut self, stmt: &DropIndexStmt) -> SqlResult<ExecutionResult> {
1498        self.inner.drop_index(stmt)
1499    }
1500
1501    fn alter_table(&mut self, stmt: &AlterTableStmt) -> SqlResult<ExecutionResult> {
1502        let mut prefixed = stmt.clone();
1503        let original_name = stmt.name.name().to_string();
1504        prefixed.name = ObjectName::new(self.prefix_table(&original_name));
1505        self.inner.alter_table(&prefixed)
1506    }
1507
1508    fn begin(&mut self, stmt: &BeginStmt) -> SqlResult<ExecutionResult> {
1509        self.inner.begin(stmt)
1510    }
1511
1512    fn commit(&mut self) -> SqlResult<ExecutionResult> {
1513        self.inner.commit()
1514    }
1515
1516    fn rollback(&mut self, savepoint: Option<&str>) -> SqlResult<ExecutionResult> {
1517        self.inner.rollback(savepoint)
1518    }
1519
1520    fn table_exists(&self, table: &str) -> SqlResult<bool> {
1521        self.inner.table_exists(&self.prefix_table(table))
1522    }
1523
1524    fn index_exists(&self, index: &str) -> SqlResult<bool> {
1525        self.inner.index_exists(index)
1526    }
1527
1528    fn scan_all(
1529        &self,
1530        table: &str,
1531        columns: &[String],
1532    ) -> SqlResult<Vec<HashMap<String, CoreSochValue>>> {
1533        self.inner.scan_all(&self.prefix_table(table), columns)
1534    }
1535
1536    fn eval_join_predicate(
1537        &self,
1538        expr: &Expr,
1539        row: &HashMap<String, CoreSochValue>,
1540        params: &[CoreSochValue],
1541    ) -> Option<bool> {
1542        self.inner.eval_join_predicate(expr, row, params)
1543    }
1544}
1545
1546// ============================================================================
1547// Tests
1548// ============================================================================
1549
1550#[cfg(test)]
1551mod tests {
1552    use super::*;
1553
1554    #[test]
1555    fn test_convert_core_to_query_basic_types() {
1556        assert_eq!(
1557            convert_core_to_query(&CoreSochValue::Null),
1558            QuerySochValue::Null
1559        );
1560        assert_eq!(
1561            convert_core_to_query(&CoreSochValue::Bool(true)),
1562            QuerySochValue::Bool(true)
1563        );
1564        assert_eq!(
1565            convert_core_to_query(&CoreSochValue::Int(42)),
1566            QuerySochValue::Int(42)
1567        );
1568        assert_eq!(
1569            convert_core_to_query(&CoreSochValue::UInt(100)),
1570            QuerySochValue::UInt(100)
1571        );
1572        assert_eq!(
1573            convert_core_to_query(&CoreSochValue::Float(3.14)),
1574            QuerySochValue::Float(3.14)
1575        );
1576        assert_eq!(
1577            convert_core_to_query(&CoreSochValue::Text("hello".into())),
1578            QuerySochValue::Text("hello".into())
1579        );
1580    }
1581
1582    #[test]
1583    fn test_convert_core_to_query_object() {
1584        let mut map = HashMap::new();
1585        map.insert("name".to_string(), CoreSochValue::Text("Alice".into()));
1586        let result = convert_core_to_query(&CoreSochValue::Object(map));
1587        match result {
1588            QuerySochValue::Text(s) => assert!(s.contains("Alice")),
1589            _ => panic!("Expected Text for Object conversion"),
1590        }
1591    }
1592
1593    #[test]
1594    fn test_convert_core_to_query_ref() {
1595        let result = convert_core_to_query(&CoreSochValue::Ref {
1596            table: "users".into(),
1597            id: 42,
1598        });
1599        assert_eq!(result, QuerySochValue::Text("users/42".into()));
1600    }
1601
1602    #[test]
1603    fn test_convert_roundtrip() {
1604        let original = QuerySochValue::Int(42);
1605        let core = convert_query_to_core(&original);
1606        let back = convert_core_to_query(&core);
1607        assert_eq!(original, back);
1608    }
1609
1610    #[test]
1611    fn test_apply_simple_predicate_eq() {
1612        let rows = vec![
1613            {
1614                let mut m = HashMap::new();
1615                m.insert("name".into(), QuerySochValue::Text("Alice".into()));
1616                m.insert("age".into(), QuerySochValue::Int(30));
1617                m
1618            },
1619            {
1620                let mut m = HashMap::new();
1621                m.insert("name".into(), QuerySochValue::Text("Bob".into()));
1622                m.insert("age".into(), QuerySochValue::Int(25));
1623                m
1624            },
1625        ];
1626
1627        let filtered = apply_simple_predicate(&rows, "name = Alice");
1628        assert_eq!(filtered.len(), 1);
1629        assert_eq!(
1630            filtered[0].get("name"),
1631            Some(&QuerySochValue::Text("Alice".into()))
1632        );
1633    }
1634
1635    #[test]
1636    fn test_apply_simple_predicate_neq() {
1637        let rows = vec![
1638            {
1639                let mut m = HashMap::new();
1640                m.insert("status".into(), QuerySochValue::Text("active".into()));
1641                m
1642            },
1643            {
1644                let mut m = HashMap::new();
1645                m.insert("status".into(), QuerySochValue::Text("inactive".into()));
1646                m
1647            },
1648        ];
1649
1650        let filtered = apply_simple_predicate(&rows, "status != active");
1651        assert_eq!(filtered.len(), 1);
1652        assert_eq!(
1653            filtered[0].get("status"),
1654            Some(&QuerySochValue::Text("inactive".into()))
1655        );
1656    }
1657
1658    #[test]
1659    fn test_literal_to_core() {
1660        assert_eq!(
1661            literal_to_core(&Literal::Integer(42)),
1662            CoreSochValue::Int(42)
1663        );
1664        assert_eq!(
1665            literal_to_core(&Literal::Float(3.14)),
1666            CoreSochValue::Float(3.14)
1667        );
1668        assert_eq!(
1669            literal_to_core(&Literal::String("hi".into())),
1670            CoreSochValue::Text("hi".into())
1671        );
1672        assert_eq!(
1673            literal_to_core(&Literal::Boolean(true)),
1674            CoreSochValue::Bool(true)
1675        );
1676        assert_eq!(literal_to_core(&Literal::Null), CoreSochValue::Null);
1677    }
1678
1679    #[test]
1680    fn test_euclidean_distance() {
1681        let a = vec![1.0, 0.0, 0.0];
1682        let b = vec![0.0, 1.0, 0.0];
1683        let dist = euclidean_distance(&a, &b);
1684        assert!((dist - std::f32::consts::SQRT_2).abs() < 1e-6);
1685    }
1686
1687    #[test]
1688    fn test_compare_values() {
1689        assert_eq!(
1690            compare_values(&CoreSochValue::Int(1), &CoreSochValue::Int(2)),
1691            std::cmp::Ordering::Less
1692        );
1693        assert_eq!(
1694            compare_values(
1695                &CoreSochValue::Text("a".into()),
1696                &CoreSochValue::Text("b".into())
1697            ),
1698            std::cmp::Ordering::Less
1699        );
1700        assert_eq!(
1701            compare_values(&CoreSochValue::Null, &CoreSochValue::Int(1)),
1702            std::cmp::Ordering::Less
1703        );
1704    }
1705
1706    #[test]
1707    fn test_eval_binary_op() {
1708        assert_eq!(
1709            eval_binary_op(
1710                &CoreSochValue::Int(10),
1711                &BinaryOperator::Plus,
1712                &CoreSochValue::Int(5)
1713            ),
1714            CoreSochValue::Int(15)
1715        );
1716        assert_eq!(
1717            eval_binary_op(
1718                &CoreSochValue::Int(10),
1719                &BinaryOperator::Eq,
1720                &CoreSochValue::Int(10)
1721            ),
1722            CoreSochValue::Bool(true)
1723        );
1724        assert_eq!(
1725            eval_binary_op(
1726                &CoreSochValue::Text("hello".into()),
1727                &BinaryOperator::Concat,
1728                &CoreSochValue::Text(" world".into())
1729            ),
1730            CoreSochValue::Text("hello world".into())
1731        );
1732    }
1733
1734    #[test]
1735    fn test_sql_like_match() {
1736        assert!(sql_like_match("hello", "hello"));
1737        assert!(sql_like_match("hello", "%llo"));
1738        assert!(sql_like_match("hello", "h%o"));
1739        assert!(sql_like_match("hello", "h_llo"));
1740        assert!(!sql_like_match("hello", "world"));
1741        // Test with regex special characters in the data
1742        assert!(sql_like_match("file.txt", "file%"));
1743        assert!(sql_like_match("test(1)", "%(%"));
1744    }
1745
1746    #[test]
1747    fn test_sql_type_to_db_type() {
1748        use sochdb_storage::DbColumnType;
1749        assert_eq!(sql_type_to_db_type(&DataType::Int), DbColumnType::Int64);
1750        assert_eq!(sql_type_to_db_type(&DataType::BigInt), DbColumnType::Int64);
1751        assert_eq!(
1752            sql_type_to_db_type(&DataType::Float),
1753            DbColumnType::Float64
1754        );
1755        assert_eq!(
1756            sql_type_to_db_type(&DataType::Boolean),
1757            DbColumnType::Bool
1758        );
1759        assert_eq!(sql_type_to_db_type(&DataType::Blob), DbColumnType::Binary);
1760        assert_eq!(sql_type_to_db_type(&DataType::Text), DbColumnType::Text);
1761        assert_eq!(
1762            sql_type_to_db_type(&DataType::Varchar(Some(255))),
1763            DbColumnType::Text
1764        );
1765    }
1766
1767    // =========================================================================
1768    // Integration tests: Full round-trip through real Database
1769    // =========================================================================
1770
1771    fn setup_test_db() -> (std::sync::Arc<sochdb_storage::Database>, tempfile::TempDir) {
1772        let tmp = tempfile::tempdir().expect("tmpdir");
1773        let db = sochdb_storage::Database::open(tmp.path()).expect("open db");
1774        (db, tmp)
1775    }
1776
1777    #[test]
1778    fn test_integration_storage_backend_table_scan() {
1779        use sochdb_storage::{DbColumnDef, DbColumnType, DbTableSchema};
1780        let (db, _tmp) = setup_test_db();
1781
1782        // Register table
1783        db.register_table(DbTableSchema {
1784            name: "users".into(),
1785            columns: vec![
1786                DbColumnDef { name: "id".into(), col_type: DbColumnType::Int64, nullable: false },
1787                DbColumnDef { name: "name".into(), col_type: DbColumnType::Text, nullable: false },
1788                DbColumnDef { name: "age".into(), col_type: DbColumnType::Int64, nullable: true },
1789            ],
1790        })
1791        .expect("register table");
1792
1793        // Insert rows via raw storage API
1794        let txn = db.begin_transaction().expect("begin txn");
1795        let mut vals = std::collections::HashMap::new();
1796        vals.insert("id".into(), CoreSochValue::Int(1));
1797        vals.insert("name".into(), CoreSochValue::Text("Alice".into()));
1798        vals.insert("age".into(), CoreSochValue::Int(30));
1799        db.insert_row(txn, "users", 1, &vals).expect("insert 1");
1800
1801        vals.clear();
1802        vals.insert("id".into(), CoreSochValue::Int(2));
1803        vals.insert("name".into(), CoreSochValue::Text("Bob".into()));
1804        vals.insert("age".into(), CoreSochValue::Int(25));
1805        db.insert_row(txn, "users", 2, &vals).expect("insert 2");
1806        db.commit(txn).expect("commit");
1807
1808        // Query via DatabaseStorageBackend
1809        let backend = DatabaseStorageBackend::new(db.clone());
1810        let rows = backend
1811            .table_scan("users", &["id".into(), "name".into(), "age".into()], None)
1812            .expect("table_scan");
1813
1814        assert_eq!(rows.len(), 2);
1815        // Verify data came through the conversion pipeline
1816        let names: Vec<_> = rows
1817            .iter()
1818            .filter_map(|r| match r.get("name") {
1819                Some(crate::soch_ql::SochValue::Text(t)) => Some(t.clone()),
1820                _ => None,
1821            })
1822            .collect();
1823        assert!(names.contains(&"Alice".to_string()));
1824        assert!(names.contains(&"Bob".to_string()));
1825    }
1826
1827    #[test]
1828    fn test_integration_storage_backend_primary_key_lookup() {
1829        use sochdb_storage::{DbColumnDef, DbColumnType, DbTableSchema};
1830        let (db, _tmp) = setup_test_db();
1831
1832        db.register_table(DbTableSchema {
1833            name: "items".into(),
1834            columns: vec![
1835                DbColumnDef { name: "id".into(), col_type: DbColumnType::Int64, nullable: false },
1836                DbColumnDef { name: "label".into(), col_type: DbColumnType::Text, nullable: false },
1837            ],
1838        })
1839        .expect("register");
1840
1841        let txn = db.begin_transaction().expect("txn");
1842        let mut v = std::collections::HashMap::new();
1843        v.insert("id".into(), CoreSochValue::Int(42));
1844        v.insert("label".into(), CoreSochValue::Text("answer".into()));
1845        db.insert_row(txn, "items", 42, &v).expect("insert");
1846        db.commit(txn).expect("commit");
1847
1848        let backend = DatabaseStorageBackend::new(db.clone());
1849        let row = backend
1850            .primary_key_lookup("items", &crate::soch_ql::SochValue::Int(42))
1851            .expect("pk lookup");
1852        assert!(row.is_some());
1853        let row = row.unwrap();
1854        assert_eq!(
1855            row.get("label"),
1856            Some(&crate::soch_ql::SochValue::Text("answer".into()))
1857        );
1858    }
1859
1860    #[test]
1861    fn test_integration_sochql_executor_reads_storage() {
1862        use sochdb_storage::{DbColumnDef, DbColumnType, DbTableSchema};
1863        use sochdb_core::{Catalog, SochSchema, SochType};
1864        let (db, _tmp) = setup_test_db();
1865
1866        // Register table in storage
1867        db.register_table(DbTableSchema {
1868            name: "events".into(),
1869            columns: vec![
1870                DbColumnDef { name: "id".into(), col_type: DbColumnType::Int64, nullable: false },
1871                DbColumnDef { name: "kind".into(), col_type: DbColumnType::Text, nullable: false },
1872                DbColumnDef { name: "score".into(), col_type: DbColumnType::Float64, nullable: true },
1873            ],
1874        })
1875        .expect("register events");
1876
1877        // Insert data
1878        let txn = db.begin_transaction().expect("txn");
1879        for i in 1..=5u64 {
1880            let mut vals = std::collections::HashMap::new();
1881            vals.insert("id".into(), CoreSochValue::Int(i as i64));
1882            vals.insert("kind".into(), CoreSochValue::Text(format!("event_{}", i)));
1883            vals.insert("score".into(), CoreSochValue::Float(i as f64 * 1.5));
1884            db.insert_row(txn, "events", i, &vals).expect("insert");
1885        }
1886        db.commit(txn).expect("commit");
1887
1888        // Build catalog (needed by SochQlExecutor for validation)
1889        let mut catalog = Catalog::new("test");
1890        let schema = SochSchema {
1891            name: "events".into(),
1892            fields: vec![
1893                sochdb_core::SochField { name: "id".into(), field_type: SochType::Int, nullable: false, default: None },
1894                sochdb_core::SochField { name: "kind".into(), field_type: SochType::Text, nullable: false, default: None },
1895                sochdb_core::SochField { name: "score".into(), field_type: SochType::Float, nullable: true, default: None },
1896            ],
1897            primary_key: None,
1898            indexes: vec![],
1899        };
1900        catalog.create_table(schema, 0).expect("register catalog");
1901
1902        // Execute via SochQlExecutor with storage backend
1903        let backend = std::sync::Arc::new(DatabaseStorageBackend::new(db.clone()));
1904        let executor = crate::soch_ql_executor::SochQlExecutor::with_storage(backend);
1905        let result = executor
1906            .execute("SELECT * FROM events", &catalog)
1907            .expect("select *");
1908
1909        // Phase 0 success: we actually get rows from storage!
1910        assert_eq!(result.rows.len(), 5, "Expected 5 rows from storage, got {}", result.rows.len());
1911        assert_eq!(result.columns, vec!["id", "kind", "score"]);
1912
1913        // Verify data integrity
1914        let first_row_kind = &result.rows[0];
1915        // Rows should have 3 values each (id, kind, score)
1916        assert_eq!(first_row_kind.len(), 3);
1917    }
1918
1919    #[test]
1920    fn test_integration_sql_connection_crud() {
1921        use crate::sql::bridge::SqlConnection;
1922
1923        let (db, _tmp) = setup_test_db();
1924        let mut conn = DatabaseSqlConnection::new(db.clone());
1925
1926        // CREATE TABLE via SQL connection
1927        let create = CreateTableStmt {
1928            span: crate::sql::token::Span::new(0, 0, 0, 0),
1929            name: crate::sql::ast::ObjectName::new("products"),
1930            columns: vec![
1931                crate::sql::ast::ColumnDef {
1932                    name: "id".into(),
1933                    data_type: DataType::Int,
1934                    constraints: vec![],
1935                },
1936                crate::sql::ast::ColumnDef {
1937                    name: "name".into(),
1938                    data_type: DataType::Text,
1939                    constraints: vec![],
1940                },
1941                crate::sql::ast::ColumnDef {
1942                    name: "price".into(),
1943                    data_type: DataType::Float,
1944                    constraints: vec![],
1945                },
1946            ],
1947            if_not_exists: false,
1948            constraints: vec![],
1949            options: vec![],
1950        };
1951        let result = conn.create_table(&create).expect("create table");
1952        assert!(matches!(result, crate::sql::bridge::ExecutionResult::Ok));
1953
1954        // BEGIN
1955        let begin_stmt = crate::sql::ast::BeginStmt {
1956            isolation_level: None,
1957            read_only: false,
1958        };
1959        let result = conn.begin(&begin_stmt).expect("begin");
1960        assert!(matches!(result, crate::sql::bridge::ExecutionResult::TransactionOk));
1961
1962        // INSERT via decomposed SqlConnection::insert
1963        let values = vec![vec![
1964            Expr::Literal(Literal::Integer(1)),
1965            Expr::Literal(Literal::String("Widget".into())),
1966            Expr::Literal(Literal::Float(9.99)),
1967        ]];
1968        let cols = vec!["id".into(), "name".into(), "price".into()];
1969        let result = conn
1970            .insert("products", Some(&cols), &values, None, &[])
1971            .expect("insert");
1972        match &result {
1973            crate::sql::bridge::ExecutionResult::RowsAffected(n) => assert_eq!(*n, 1),
1974            _ => panic!("Expected RowsAffected, got {:?}", result),
1975        }
1976
1977        // COMMIT
1978        let result = conn.commit().expect("commit");
1979        assert!(matches!(result, crate::sql::bridge::ExecutionResult::TransactionOk));
1980
1981        // SELECT via decomposed SqlConnection::select
1982        let columns = vec!["name".into()];
1983        let result = conn
1984            .select("products", &columns, None, &[], None, None, &[])
1985            .expect("select");
1986        match result {
1987            crate::sql::bridge::ExecutionResult::Rows { columns, rows } => {
1988                assert!(!rows.is_empty(), "Expected rows from SELECT");
1989                assert_eq!(columns, vec!["name"]);
1990                // Verify data round-trips: rows are Vec<HashMap<String, SochValue>>
1991                let name = rows[0].get("name").expect("name column");
1992                match name {
1993                    CoreSochValue::Text(s) => assert_eq!(s, "Widget"),
1994                    other => panic!("Expected Text, got {:?}", other),
1995                }
1996            }
1997            other => panic!("Expected Rows, got {:?}", other),
1998        }
1999    }
2000
2001    #[test]
2002    fn test_integration_row_count() {
2003        use sochdb_storage::{DbColumnDef, DbColumnType, DbTableSchema};
2004        let (db, _tmp) = setup_test_db();
2005
2006        db.register_table(DbTableSchema {
2007            name: "metrics".into(),
2008            columns: vec![
2009                DbColumnDef { name: "ts".into(), col_type: DbColumnType::UInt64, nullable: false },
2010                DbColumnDef { name: "val".into(), col_type: DbColumnType::Float64, nullable: false },
2011            ],
2012        })
2013        .expect("register");
2014
2015        let txn = db.begin_transaction().expect("txn");
2016        for i in 0..10u64 {
2017            let mut v = std::collections::HashMap::new();
2018            v.insert("ts".into(), CoreSochValue::UInt(i));
2019            v.insert("val".into(), CoreSochValue::Float(i as f64));
2020            db.insert_row(txn, "metrics", i, &v).expect("insert");
2021        }
2022        db.commit(txn).expect("commit");
2023
2024        let backend = DatabaseStorageBackend::new(db.clone());
2025        assert_eq!(backend.row_count("metrics"), 10);
2026        assert_eq!(backend.row_count("nonexistent"), 0);
2027    }
2028
2029    // =======================================================================
2030    // Step 0d: End-to-end SQL text → SqlBridge → DatabaseSqlConnection tests
2031    // =======================================================================
2032
2033    #[test]
2034    fn test_sqlbridge_create_insert_select() {
2035        let (db, _tmp) = setup_test_db();
2036        let conn = DatabaseSqlConnection::new(db.clone());
2037        let mut bridge = crate::sql::bridge::SqlBridge::new(conn);
2038
2039        // CREATE TABLE via raw SQL
2040        let r = bridge.execute("CREATE TABLE cities (id INT, name TEXT, pop FLOAT)").unwrap();
2041        assert!(matches!(r, crate::sql::bridge::ExecutionResult::Ok));
2042
2043        // INSERT via raw SQL
2044        let r = bridge.execute("INSERT INTO cities (id, name, pop) VALUES (1, 'Tokyo', 13.96)").unwrap();
2045        match &r {
2046            crate::sql::bridge::ExecutionResult::RowsAffected(n) => assert_eq!(*n, 1),
2047            other => panic!("Expected RowsAffected, got {:?}", other),
2048        }
2049
2050        let r = bridge.execute("INSERT INTO cities (id, name, pop) VALUES (2, 'Delhi', 11.03)").unwrap();
2051        match &r {
2052            crate::sql::bridge::ExecutionResult::RowsAffected(n) => assert_eq!(*n, 1),
2053            other => panic!("Expected RowsAffected, got {:?}", other),
2054        }
2055
2056        // SELECT via raw SQL
2057        let r = bridge.execute("SELECT name, pop FROM cities").unwrap();
2058        match &r {
2059            crate::sql::bridge::ExecutionResult::Rows { columns, rows } => {
2060                assert_eq!(rows.len(), 2);
2061                assert!(columns.contains(&"name".to_string()));
2062            }
2063            other => panic!("Expected Rows, got {:?}", other),
2064        }
2065    }
2066
2067    #[test]
2068    fn test_sqlbridge_update_delete() {
2069        let (db, _tmp) = setup_test_db();
2070        let conn = DatabaseSqlConnection::new(db.clone());
2071        let mut bridge = crate::sql::bridge::SqlBridge::new(conn);
2072
2073        bridge.execute("CREATE TABLE items (id INT, qty INT)").unwrap();
2074        bridge.execute("INSERT INTO items (id, qty) VALUES (1, 10)").unwrap();
2075        bridge.execute("INSERT INTO items (id, qty) VALUES (2, 20)").unwrap();
2076        bridge.execute("INSERT INTO items (id, qty) VALUES (3, 30)").unwrap();
2077
2078        // UPDATE
2079        let r = bridge.execute("UPDATE items SET qty = 99 WHERE id = 2").unwrap();
2080        match &r {
2081            crate::sql::bridge::ExecutionResult::RowsAffected(n) => assert_eq!(*n, 1),
2082            other => panic!("Expected RowsAffected for UPDATE, got {:?}", other),
2083        }
2084
2085        // DELETE
2086        let r = bridge.execute("DELETE FROM items WHERE id = 3").unwrap();
2087        match &r {
2088            crate::sql::bridge::ExecutionResult::RowsAffected(n) => assert_eq!(*n, 1),
2089            other => panic!("Expected RowsAffected for DELETE, got {:?}", other),
2090        }
2091
2092        // Verify remaining data
2093        let r = bridge.execute("SELECT id, qty FROM items").unwrap();
2094        match &r {
2095            crate::sql::bridge::ExecutionResult::Rows { rows, .. } => {
2096                assert_eq!(rows.len(), 2, "Expected 2 rows after delete");
2097            }
2098            other => panic!("Expected Rows, got {:?}", other),
2099        }
2100    }
2101
2102    #[test]
2103    fn test_sqlbridge_transaction_commit() {
2104        let (db, _tmp) = setup_test_db();
2105        let conn = DatabaseSqlConnection::new(db.clone());
2106        let mut bridge = crate::sql::bridge::SqlBridge::new(conn);
2107
2108        bridge.execute("CREATE TABLE txtest (id INT, val TEXT)").unwrap();
2109
2110        // BEGIN / INSERT / COMMIT
2111        let r = bridge.execute("BEGIN").unwrap();
2112        assert!(matches!(r, crate::sql::bridge::ExecutionResult::TransactionOk));
2113
2114        bridge.execute("INSERT INTO txtest (id, val) VALUES (1, 'committed')").unwrap();
2115
2116        let r = bridge.execute("COMMIT").unwrap();
2117        assert!(matches!(r, crate::sql::bridge::ExecutionResult::TransactionOk));
2118
2119        // Data should be visible
2120        let r = bridge.execute("SELECT val FROM txtest").unwrap();
2121        match &r {
2122            crate::sql::bridge::ExecutionResult::Rows { rows, .. } => {
2123                assert_eq!(rows.len(), 1);
2124            }
2125            other => panic!("Expected Rows, got {:?}", other),
2126        }
2127    }
2128
2129    #[test]
2130    fn test_sqlbridge_drop_table() {
2131        let (db, _tmp) = setup_test_db();
2132        let conn = DatabaseSqlConnection::new(db.clone());
2133        let mut bridge = crate::sql::bridge::SqlBridge::new(conn);
2134
2135        bridge.execute("CREATE TABLE ephemeral (x INT)").unwrap();
2136        let r = bridge.execute("DROP TABLE ephemeral").unwrap();
2137        assert!(matches!(r, crate::sql::bridge::ExecutionResult::Ok));
2138
2139        // Verify DROP TABLE IF EXISTS also works
2140        let r = bridge.execute("DROP TABLE IF EXISTS ephemeral").unwrap();
2141        assert!(matches!(r, crate::sql::bridge::ExecutionResult::Ok));
2142    }
2143
2144    #[test]
2145    fn test_sqlbridge_if_not_exists() {
2146        let (db, _tmp) = setup_test_db();
2147        let conn = DatabaseSqlConnection::new(db.clone());
2148        let mut bridge = crate::sql::bridge::SqlBridge::new(conn);
2149
2150        bridge.execute("CREATE TABLE dup (id INT)").unwrap();
2151        // Should not error with IF NOT EXISTS
2152        let r = bridge.execute("CREATE TABLE IF NOT EXISTS dup (id INT)").unwrap();
2153        assert!(matches!(r, crate::sql::bridge::ExecutionResult::Ok));
2154    }
2155
2156    #[test]
2157    fn test_sqlbridge_alter_add_column() {
2158        let (db, _tmp) = setup_test_db();
2159        let conn = DatabaseSqlConnection::new(db.clone());
2160        let mut bridge = crate::sql::bridge::SqlBridge::new(conn);
2161
2162        bridge
2163            .execute("CREATE TABLE alter_test (id INT, name TEXT)")
2164            .unwrap();
2165
2166        let r = bridge
2167            .execute("ALTER TABLE alter_test ADD COLUMN age INT")
2168            .unwrap();
2169        assert!(matches!(r, crate::sql::bridge::ExecutionResult::Ok));
2170
2171        // Verify schema was updated
2172        let schema = db.get_table_schema("alter_test").unwrap();
2173        assert_eq!(schema.columns.len(), 3);
2174        assert_eq!(schema.columns[2].name, "age");
2175    }
2176
2177    #[test]
2178    fn test_sqlbridge_alter_drop_column() {
2179        let (db, _tmp) = setup_test_db();
2180        let conn = DatabaseSqlConnection::new(db.clone());
2181        let mut bridge = crate::sql::bridge::SqlBridge::new(conn);
2182
2183        bridge
2184            .execute("CREATE TABLE drop_col_test (id INT, name TEXT, age INT)")
2185            .unwrap();
2186
2187        let r = bridge
2188            .execute("ALTER TABLE drop_col_test DROP COLUMN age")
2189            .unwrap();
2190        assert!(matches!(r, crate::sql::bridge::ExecutionResult::Ok));
2191
2192        let schema = db.get_table_schema("drop_col_test").unwrap();
2193        assert_eq!(schema.columns.len(), 2);
2194        assert!(schema.columns.iter().all(|c| c.name != "age"));
2195    }
2196
2197    #[test]
2198    fn test_sqlbridge_alter_rename_column() {
2199        let (db, _tmp) = setup_test_db();
2200        let conn = DatabaseSqlConnection::new(db.clone());
2201        let mut bridge = crate::sql::bridge::SqlBridge::new(conn);
2202
2203        bridge
2204            .execute("CREATE TABLE rename_col_test (id INT, name TEXT)")
2205            .unwrap();
2206
2207        let r = bridge
2208            .execute("ALTER TABLE rename_col_test RENAME COLUMN name TO full_name")
2209            .unwrap();
2210        assert!(matches!(r, crate::sql::bridge::ExecutionResult::Ok));
2211
2212        let schema = db.get_table_schema("rename_col_test").unwrap();
2213        assert_eq!(schema.columns[1].name, "full_name");
2214    }
2215
2216    #[test]
2217    fn test_sqlbridge_alter_rename_table() {
2218        let (db, _tmp) = setup_test_db();
2219        let conn = DatabaseSqlConnection::new(db.clone());
2220        let mut bridge = crate::sql::bridge::SqlBridge::new(conn);
2221
2222        bridge
2223            .execute("CREATE TABLE old_name (id INT)")
2224            .unwrap();
2225
2226        let r = bridge
2227            .execute("ALTER TABLE old_name RENAME TO new_name")
2228            .unwrap();
2229        assert!(matches!(r, crate::sql::bridge::ExecutionResult::Ok));
2230
2231        assert!(db.get_table_schema("old_name").is_none());
2232        assert!(db.get_table_schema("new_name").is_some());
2233    }
2234
2235    #[test]
2236    fn test_sqlbridge_alter_multiple_ops() {
2237        let (db, _tmp) = setup_test_db();
2238        let conn = DatabaseSqlConnection::new(db.clone());
2239        let mut bridge = crate::sql::bridge::SqlBridge::new(conn);
2240
2241        bridge
2242            .execute("CREATE TABLE multi_alter (id INT, a TEXT, b TEXT)")
2243            .unwrap();
2244
2245        // Add column and drop column in one statement
2246        let r = bridge
2247            .execute("ALTER TABLE multi_alter ADD COLUMN c INT, DROP COLUMN b")
2248            .unwrap();
2249        assert!(matches!(r, crate::sql::bridge::ExecutionResult::Ok));
2250
2251        let schema = db.get_table_schema("multi_alter").unwrap();
2252        let col_names: Vec<&str> = schema.columns.iter().map(|c| c.name.as_str()).collect();
2253        assert_eq!(col_names, vec!["id", "a", "c"]);
2254    }
2255
2256    #[test]
2257    fn test_sqlbridge_alter_errors() {
2258        let (db, _tmp) = setup_test_db();
2259        let conn = DatabaseSqlConnection::new(db.clone());
2260        let mut bridge = crate::sql::bridge::SqlBridge::new(conn);
2261
2262        bridge
2263            .execute("CREATE TABLE err_test (id INT, name TEXT)")
2264            .unwrap();
2265
2266        // Add duplicate column
2267        let r = bridge.execute("ALTER TABLE err_test ADD COLUMN name TEXT");
2268        assert!(r.is_err());
2269
2270        // Drop non-existent column
2271        let r = bridge.execute("ALTER TABLE err_test DROP COLUMN nonexistent");
2272        assert!(r.is_err());
2273
2274        // Alter non-existent table
2275        let r = bridge.execute("ALTER TABLE no_such_table ADD COLUMN x INT");
2276        assert!(r.is_err());
2277    }
2278
2279    // =======================================================================
2280    // JOIN Tests (Task 3 — Phase 3)
2281    // =======================================================================
2282
2283    /// Helper: set up two tables for join testing (users + orders)
2284    fn setup_join_tables(bridge: &mut crate::sql::bridge::SqlBridge<DatabaseSqlConnection>) {
2285        bridge.execute("CREATE TABLE users (id INT, name TEXT, dept TEXT)").unwrap();
2286        bridge.execute("INSERT INTO users (id, name, dept) VALUES (1, 'Alice', 'eng')").unwrap();
2287        bridge.execute("INSERT INTO users (id, name, dept) VALUES (2, 'Bob', 'sales')").unwrap();
2288        bridge.execute("INSERT INTO users (id, name, dept) VALUES (3, 'Carol', 'eng')").unwrap();
2289
2290        bridge.execute("CREATE TABLE orders (oid INT, user_id INT, amount FLOAT)").unwrap();
2291        bridge.execute("INSERT INTO orders (oid, user_id, amount) VALUES (10, 1, 99.50)").unwrap();
2292        bridge.execute("INSERT INTO orders (oid, user_id, amount) VALUES (11, 1, 45.00)").unwrap();
2293        bridge.execute("INSERT INTO orders (oid, user_id, amount) VALUES (12, 2, 200.00)").unwrap();
2294        // Note: Carol (id=3) has no orders; order 12 belongs to Bob (id=2)
2295    }
2296
2297    #[test]
2298    fn test_sqlbridge_inner_join() {
2299        let (db, _tmp) = setup_test_db();
2300        let conn = DatabaseSqlConnection::new(db.clone());
2301        let mut bridge = crate::sql::bridge::SqlBridge::new(conn);
2302        setup_join_tables(&mut bridge);
2303
2304        let r = bridge.execute(
2305            "SELECT users.name, orders.amount FROM users INNER JOIN orders ON users.id = orders.user_id"
2306        ).unwrap();
2307
2308        let rows = r.rows().unwrap();
2309        assert_eq!(rows.len(), 3); // Alice×2, Bob×1
2310
2311        // Verify Alice appears twice
2312        let alice_rows: Vec<_> = rows.iter()
2313            .filter(|r| r.get("name") == Some(&CoreSochValue::Text("Alice".into())))
2314            .collect();
2315        assert_eq!(alice_rows.len(), 2);
2316
2317        // Verify Bob appears once
2318        let bob_rows: Vec<_> = rows.iter()
2319            .filter(|r| r.get("name") == Some(&CoreSochValue::Text("Bob".into())))
2320            .collect();
2321        assert_eq!(bob_rows.len(), 1);
2322
2323        // Carol should NOT appear (no matching orders)
2324        let carol_rows: Vec<_> = rows.iter()
2325            .filter(|r| r.get("name") == Some(&CoreSochValue::Text("Carol".into())))
2326            .collect();
2327        assert_eq!(carol_rows.len(), 0);
2328    }
2329
2330    #[test]
2331    fn test_sqlbridge_left_join() {
2332        let (db, _tmp) = setup_test_db();
2333        let conn = DatabaseSqlConnection::new(db.clone());
2334        let mut bridge = crate::sql::bridge::SqlBridge::new(conn);
2335        setup_join_tables(&mut bridge);
2336
2337        let r = bridge.execute(
2338            "SELECT users.name, orders.amount FROM users LEFT JOIN orders ON users.id = orders.user_id"
2339        ).unwrap();
2340
2341        let rows = r.rows().unwrap();
2342        assert_eq!(rows.len(), 4); // Alice×2, Bob×1, Carol×1(NULL)
2343
2344        // Carol appears with NULL amount
2345        let carol_rows: Vec<_> = rows.iter()
2346            .filter(|r| r.get("name") == Some(&CoreSochValue::Text("Carol".into())))
2347            .collect();
2348        assert_eq!(carol_rows.len(), 1);
2349        assert_eq!(carol_rows[0].get("amount"), Some(&CoreSochValue::Null));
2350    }
2351
2352    #[test]
2353    fn test_sqlbridge_right_join() {
2354        let (db, _tmp) = setup_test_db();
2355        let conn = DatabaseSqlConnection::new(db.clone());
2356        let mut bridge = crate::sql::bridge::SqlBridge::new(conn);
2357        setup_join_tables(&mut bridge);
2358
2359        // Right join: all orders appear; users without orders don't
2360        let r = bridge.execute(
2361            "SELECT users.name, orders.oid FROM users RIGHT JOIN orders ON users.id = orders.user_id"
2362        ).unwrap();
2363
2364        let rows = r.rows().unwrap();
2365        assert_eq!(rows.len(), 3); // All 3 orders match a user
2366    }
2367
2368    #[test]
2369    fn test_sqlbridge_cross_join() {
2370        let (db, _tmp) = setup_test_db();
2371        let conn = DatabaseSqlConnection::new(db.clone());
2372        let mut bridge = crate::sql::bridge::SqlBridge::new(conn);
2373        setup_join_tables(&mut bridge);
2374
2375        let r = bridge.execute(
2376            "SELECT users.name, orders.oid FROM users CROSS JOIN orders"
2377        ).unwrap();
2378
2379        let rows = r.rows().unwrap();
2380        assert_eq!(rows.len(), 9); // 3 users × 3 orders = 9
2381    }
2382
2383    #[test]
2384    fn test_sqlbridge_join_with_where() {
2385        let (db, _tmp) = setup_test_db();
2386        let conn = DatabaseSqlConnection::new(db.clone());
2387        let mut bridge = crate::sql::bridge::SqlBridge::new(conn);
2388        setup_join_tables(&mut bridge);
2389
2390        let r = bridge.execute(
2391            "SELECT users.name, orders.amount FROM users INNER JOIN orders ON users.id = orders.user_id WHERE orders.amount > 50"
2392        ).unwrap();
2393
2394        let rows = r.rows().unwrap();
2395        assert_eq!(rows.len(), 2); // Alice's 99.50 + Bob's 200.00
2396    }
2397
2398    #[test]
2399    fn test_sqlbridge_join_with_alias() {
2400        let (db, _tmp) = setup_test_db();
2401        let conn = DatabaseSqlConnection::new(db.clone());
2402        let mut bridge = crate::sql::bridge::SqlBridge::new(conn);
2403        setup_join_tables(&mut bridge);
2404
2405        let r = bridge.execute(
2406            "SELECT u.name, o.amount FROM users u INNER JOIN orders o ON u.id = o.user_id"
2407        ).unwrap();
2408
2409        let rows = r.rows().unwrap();
2410        assert_eq!(rows.len(), 3);
2411    }
2412
2413    #[test]
2414    fn test_sqlbridge_join_with_limit() {
2415        let (db, _tmp) = setup_test_db();
2416        let conn = DatabaseSqlConnection::new(db.clone());
2417        let mut bridge = crate::sql::bridge::SqlBridge::new(conn);
2418        setup_join_tables(&mut bridge);
2419
2420        let r = bridge.execute(
2421            "SELECT users.name, orders.oid FROM users INNER JOIN orders ON users.id = orders.user_id LIMIT 2"
2422        ).unwrap();
2423
2424        let rows = r.rows().unwrap();
2425        assert_eq!(rows.len(), 2);
2426    }
2427
2428    #[test]
2429    fn test_sqlbridge_join_three_tables() {
2430        let (db, _tmp) = setup_test_db();
2431        let conn = DatabaseSqlConnection::new(db.clone());
2432        let mut bridge = crate::sql::bridge::SqlBridge::new(conn);
2433        setup_join_tables(&mut bridge);
2434
2435        // Add a departments table
2436        bridge.execute("CREATE TABLE departments (code TEXT, dname TEXT)").unwrap();
2437        bridge.execute("INSERT INTO departments (code, dname) VALUES ('eng', 'Engineering')").unwrap();
2438        bridge.execute("INSERT INTO departments (code, dname) VALUES ('sales', 'Sales')").unwrap();
2439
2440        let r = bridge.execute(
2441            "SELECT users.name, departments.dname FROM users INNER JOIN departments ON users.dept = departments.code"
2442        ).unwrap();
2443
2444        let rows = r.rows().unwrap();
2445        assert_eq!(rows.len(), 3); // All 3 users match a department
2446    }
2447
2448    #[test]
2449    fn test_namespaced_connection_prefixes_tables() {
2450        let ns_conn = NamespacedSqlConnection::new(
2451            MockConn::default(),
2452            "prod",
2453            "app",
2454        );
2455        assert_eq!(ns_conn.prefix_table("users"), "prod:app:users");
2456        assert_eq!(ns_conn.prefix_table("posts"), "prod:app:posts");
2457        assert_eq!(ns_conn.namespace(), "prod");
2458        assert_eq!(ns_conn.database(), "app");
2459    }
2460
2461    #[test]
2462    fn test_namespaced_connection_isolates_data() {
2463        let (db, _tmp) = setup_test_db();
2464
2465        // Create two namespaced connections to the same underlying DB
2466        let conn_a = NamespacedSqlConnection::new(
2467            DatabaseSqlConnection::new(db.clone()),
2468            "tenant_a",
2469            "db1",
2470        );
2471        let conn_b = NamespacedSqlConnection::new(
2472            DatabaseSqlConnection::new(db.clone()),
2473            "tenant_b",
2474            "db1",
2475        );
2476
2477        let mut bridge_a = crate::sql::bridge::SqlBridge::new(conn_a);
2478        let mut bridge_b = crate::sql::bridge::SqlBridge::new(conn_b);
2479
2480        // Tenant A creates a table and inserts data
2481        bridge_a.execute("CREATE TABLE users (name TEXT, age INTEGER)").unwrap();
2482        bridge_a.execute("INSERT INTO users (name, age) VALUES ('Alice', 30)").unwrap();
2483
2484        // Tenant B creates the same table name and inserts different data
2485        bridge_b.execute("CREATE TABLE users (name TEXT, age INTEGER)").unwrap();
2486        bridge_b.execute("INSERT INTO users (name, age) VALUES ('Bob', 25)").unwrap();
2487
2488        // Tenant A can only see their own data
2489        let result_a = bridge_a.execute("SELECT * FROM users").unwrap();
2490        let rows_a = result_a.rows().unwrap();
2491        assert_eq!(rows_a.len(), 1);
2492
2493        // Tenant B can only see their own data
2494        let result_b = bridge_b.execute("SELECT * FROM users").unwrap();
2495        let rows_b = result_b.rows().unwrap();
2496        assert_eq!(rows_b.len(), 1);
2497    }
2498
2499    /// Minimal mock connection for prefix testing
2500    #[derive(Default)]
2501    struct MockConn;
2502    impl crate::sql::bridge::SqlConnection for MockConn {
2503        fn select(&self, _: &str, _: &[String], _: Option<&Expr>, _: &[OrderByItem], _: Option<usize>, _: Option<usize>, _: &[CoreSochValue]) -> SqlResult<ExecutionResult> {
2504            Ok(ExecutionResult::Rows { columns: vec![], rows: vec![] })
2505        }
2506        fn insert(&mut self, _: &str, _: Option<&[String]>, _: &[Vec<Expr>], _: Option<&OnConflict>, _: &[CoreSochValue]) -> SqlResult<ExecutionResult> {
2507            Ok(ExecutionResult::RowsAffected(0))
2508        }
2509        fn update(&mut self, _: &str, _: &[Assignment], _: Option<&Expr>, _: &[CoreSochValue]) -> SqlResult<ExecutionResult> {
2510            Ok(ExecutionResult::RowsAffected(0))
2511        }
2512        fn delete(&mut self, _: &str, _: Option<&Expr>, _: &[CoreSochValue]) -> SqlResult<ExecutionResult> {
2513            Ok(ExecutionResult::RowsAffected(0))
2514        }
2515        fn create_table(&mut self, _: &CreateTableStmt) -> SqlResult<ExecutionResult> { Ok(ExecutionResult::Ok) }
2516        fn drop_table(&mut self, _: &DropTableStmt) -> SqlResult<ExecutionResult> { Ok(ExecutionResult::Ok) }
2517        fn create_index(&mut self, _: &CreateIndexStmt) -> SqlResult<ExecutionResult> { Ok(ExecutionResult::Ok) }
2518        fn drop_index(&mut self, _: &DropIndexStmt) -> SqlResult<ExecutionResult> { Ok(ExecutionResult::Ok) }
2519        fn alter_table(&mut self, _: &AlterTableStmt) -> SqlResult<ExecutionResult> { Ok(ExecutionResult::Ok) }
2520        fn begin(&mut self, _: &BeginStmt) -> SqlResult<ExecutionResult> { Ok(ExecutionResult::TransactionOk) }
2521        fn commit(&mut self) -> SqlResult<ExecutionResult> { Ok(ExecutionResult::TransactionOk) }
2522        fn rollback(&mut self, _: Option<&str>) -> SqlResult<ExecutionResult> { Ok(ExecutionResult::TransactionOk) }
2523        fn table_exists(&self, _: &str) -> SqlResult<bool> { Ok(false) }
2524        fn index_exists(&self, _: &str) -> SqlResult<bool> { Ok(false) }
2525        fn scan_all(&self, _: &str, _: &[String]) -> SqlResult<Vec<HashMap<String, CoreSochValue>>> { Ok(vec![]) }
2526        fn eval_join_predicate(&self, _: &Expr, _: &HashMap<String, CoreSochValue>, _: &[CoreSochValue]) -> Option<bool> { Some(true) }
2527    }
2528}