Skip to main content

sochdb_query/
storage_bridge.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// SochDB - LLM-Optimized Embedded Database
3// Copyright (C) 2026 Sushanth Reddy Vanagala (https://github.com/sushanthpy)
4//
5// This program is free software: you can redistribute it and/or modify
6// it under the terms of the GNU Affero General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9//
10// This program is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU Affero General Public License for more details.
14//
15// You should have received a copy of the GNU Affero General Public License
16// along with this program. If not, see <https://www.gnu.org/licenses/>.
17
18//! # Phase 0: Storage Bridge — Wiring SQL Execution to Real Storage
19//!
20//! This module provides concrete implementations of the query layer's
21//! [`StorageBackend`] and [`SqlConnection`] traits, backed by the actual
22//! `sochdb_storage::Database` kernel.
23//!
24//! ## Architecture
25//!
26//! ```text
27//! ┌─────────────────────────────────────────────────┐
28//! │           Query Layer (sochdb-query)             │
29//! │                                                  │
30//! │  ┌──────────────┐    ┌───────────────────────┐  │
31//! │  │ OptimizedExec │    │ SQL AST (SqlBridge)  │  │
32//! │  └──────┬───────┘    └───────────┬───────────┘  │
33//! │         │                        │               │
34//! │  ┌──────▼────────────────────────▼───────────┐  │
35//! │  │        StorageBackend trait                │  │
36//! │  │        SqlConnection trait                 │  │
37//! │  └──────────────────┬────────────────────────┘  │
38//! ├─────────────────────┼────────────────────────────┤
39//! │         ┌───────────▼───────────────┐            │
40//! │         │  DatabaseStorageBackend   │            │
41//! │         │  DatabaseSqlConnection    │            │
42//! │         └───────────┬───────────────┘            │
43//! ├─────────────────────┼────────────────────────────┤
44//! │           Storage Layer (sochdb-storage)         │
45//! │         ┌───────────▼───────────┐                │
46//! │         │   Database kernel     │                │
47//! │         │  (DurableStorage +    │                │
48//! │         │   MVCC + WAL)         │                │
49//! │         └───────────────────────┘                │
50//! └─────────────────────────────────────────────────┘
51//! ```
52//!
53//! ## SochValue Bridging
54//!
55//! The storage layer uses `sochdb_core::SochValue` (10 variants including
56//! `Object` and `Ref`), while the query optimizer uses
57//! `sochdb_query::soch_ql::SochValue` (8 variants, missing `Object`/`Ref`).
58//!
59//! This module provides [`convert_core_to_query()`] to bridge the gap:
60//! - `Object(map)` → serialized as `Text(json_string)`
61//! - `Ref { table, id }` → `Text("table/id")`
62
63use crate::optimizer_integration::StorageBackend;
64use crate::soch_ql::SochValue as QuerySochValue;
65use crate::sql::ast::*;
66use crate::sql::bridge::{ExecutionResult, SqlConnection};
67use crate::sql::error::{SqlError, SqlResult};
68use sochdb_core::SochValue as CoreSochValue;
69use sochdb_storage::{Database, KernelTxnHandle};
70use std::collections::HashMap;
71use std::sync::Arc;
72
73// ============================================================================
74// SochValue Conversion (Core ↔ Query)
75// ============================================================================
76
77/// Convert `sochdb_core::SochValue` to `sochdb_query::soch_ql::SochValue`.
78///
79/// Handles the two extra variants in core:
80/// - `Object(HashMap)` → `Text(json_serialized)` for query-layer consumption
81/// - `Ref { table, id }` → `Text("table/id")` as a foreign-key string
82pub fn convert_core_to_query(value: &CoreSochValue) -> QuerySochValue {
83    match value {
84        CoreSochValue::Null => QuerySochValue::Null,
85        CoreSochValue::Bool(b) => QuerySochValue::Bool(*b),
86        CoreSochValue::Int(i) => QuerySochValue::Int(*i),
87        CoreSochValue::UInt(u) => QuerySochValue::UInt(*u),
88        CoreSochValue::Float(f) => QuerySochValue::Float(*f),
89        CoreSochValue::Text(s) => QuerySochValue::Text(s.clone()),
90        CoreSochValue::Binary(b) => QuerySochValue::Binary(b.clone()),
91        CoreSochValue::Array(arr) => {
92            QuerySochValue::Array(arr.iter().map(convert_core_to_query).collect())
93        }
94        CoreSochValue::Object(map) => {
95            // Serialize to JSON text for query-layer consumption
96            match serde_json::to_string(map) {
97                Ok(json) => QuerySochValue::Text(json),
98                Err(_) => QuerySochValue::Text(format!("{:?}", map)),
99            }
100        }
101        CoreSochValue::Ref { table, id } => QuerySochValue::Text(format!("{}/{}", table, id)),
102    }
103}
104
105/// Convert `sochdb_query::soch_ql::SochValue` to `sochdb_core::SochValue`.
106pub fn convert_query_to_core(value: &QuerySochValue) -> CoreSochValue {
107    match value {
108        QuerySochValue::Null => CoreSochValue::Null,
109        QuerySochValue::Bool(b) => CoreSochValue::Bool(*b),
110        QuerySochValue::Int(i) => CoreSochValue::Int(*i),
111        QuerySochValue::UInt(u) => CoreSochValue::UInt(*u),
112        QuerySochValue::Float(f) => CoreSochValue::Float(*f),
113        QuerySochValue::Text(s) => CoreSochValue::Text(s.clone()),
114        QuerySochValue::Binary(b) => CoreSochValue::Binary(b.clone()),
115        QuerySochValue::Array(arr) => {
116            CoreSochValue::Array(arr.iter().map(convert_query_to_core).collect())
117        }
118    }
119}
120
121/// Convert a row from core format to query format.
122fn convert_row_core_to_query(
123    row: HashMap<String, CoreSochValue>,
124) -> HashMap<String, QuerySochValue> {
125    row.into_iter()
126        .map(|(k, v)| (k, convert_core_to_query(&v)))
127        .collect()
128}
129
130/// Convert rows from core format to query format.
131fn convert_rows_core_to_query(
132    rows: Vec<HashMap<String, CoreSochValue>>,
133) -> Vec<HashMap<String, QuerySochValue>> {
134    rows.into_iter().map(convert_row_core_to_query).collect()
135}
136
137// ============================================================================
138// DatabaseStorageBackend — Implements StorageBackend for real storage
139// ============================================================================
140
141/// Concrete implementation of [`StorageBackend`] backed by `sochdb_storage::Database`.
142///
143/// This bridges the query optimizer to the actual storage engine, enabling
144/// cost-based query plans to execute against real data.
145///
146/// # Thread Safety
147///
148/// `Database` is always `Arc<Database>` and is `Send + Sync`. This struct
149/// holds an `Arc` clone and can be shared across threads.
150///
151/// # Transaction Management
152///
153/// Each query operation creates a read-only transaction for MVCC snapshot
154/// isolation. Transactions are automatically cleaned up on completion.
155pub struct DatabaseStorageBackend {
156    db: Arc<Database>,
157}
158
159impl DatabaseStorageBackend {
160    /// Create a new storage backend wrapping a `Database` instance.
161    pub fn new(db: Arc<Database>) -> Self {
162        Self { db }
163    }
164
165    /// Get a reference to the underlying database.
166    pub fn database(&self) -> &Arc<Database> {
167        &self.db
168    }
169
170    /// Execute a read-only operation within an MVCC transaction.
171    fn with_read_txn<F, T>(&self, f: F) -> sochdb_core::Result<T>
172    where
173        F: FnOnce(KernelTxnHandle) -> sochdb_core::Result<T>,
174    {
175        let txn = self.db.begin_read_only_fast();
176        let result = f(txn);
177        self.db.abort_read_only_fast(txn);
178        result
179    }
180}
181
182impl StorageBackend for DatabaseStorageBackend {
183    fn table_scan(
184        &self,
185        table: &str,
186        columns: &[String],
187        predicate: Option<&str>,
188    ) -> sochdb_core::Result<Vec<HashMap<String, QuerySochValue>>> {
189        self.with_read_txn(|txn| {
190            let col_refs: Vec<&str> = columns.iter().map(|s| s.as_str()).collect();
191
192            let query = if columns.is_empty() || columns.iter().any(|c| c == "*") {
193                self.db.query(txn, table)
194            } else {
195                self.db.query(txn, table).columns(&col_refs)
196            };
197
198            let result = query.execute()?;
199            let mut rows = convert_rows_core_to_query(result.rows);
200
201            // Post-scan predicate evaluation (simple string-based for now)
202            if let Some(pred) = predicate {
203                rows = apply_simple_predicate(&rows, pred);
204            }
205
206            Ok(rows)
207        })
208    }
209
210    fn primary_key_lookup(
211        &self,
212        table: &str,
213        key: &QuerySochValue,
214    ) -> sochdb_core::Result<Option<HashMap<String, QuerySochValue>>> {
215        let row_id = match key {
216            QuerySochValue::Int(i) => *i as u64,
217            QuerySochValue::UInt(u) => *u,
218            _ => return Ok(None),
219        };
220
221        self.with_read_txn(|txn| {
222            let result = self.db.read_row(txn, table, row_id, None)?;
223            Ok(result.map(convert_row_core_to_query))
224        })
225    }
226
227    fn secondary_index_seek(
228        &self,
229        table: &str,
230        index: &str,
231        key: &QuerySochValue,
232    ) -> sochdb_core::Result<Vec<HashMap<String, QuerySochValue>>> {
233        // Fall back to table scan with filtering on the indexed column.
234        let column_name = index.to_string();
235        let core_key = convert_query_to_core(key);
236
237        self.with_read_txn(|txn| {
238            let result = self.db.query(txn, table).execute()?;
239            let rows: Vec<HashMap<String, QuerySochValue>> = result
240                .rows
241                .into_iter()
242                .filter(|row| {
243                    row.get(&column_name)
244                        .map(|v| v == &core_key)
245                        .unwrap_or(false)
246                })
247                .map(convert_row_core_to_query)
248                .collect();
249            Ok(rows)
250        })
251    }
252
253    fn time_index_scan(
254        &self,
255        table: &str,
256        start_us: u64,
257        end_us: u64,
258    ) -> sochdb_core::Result<Vec<HashMap<String, QuerySochValue>>> {
259        self.with_read_txn(|txn| {
260            let result = self.db.query(txn, table).execute()?;
261            let rows: Vec<HashMap<String, QuerySochValue>> = result
262                .rows
263                .into_iter()
264                .filter(|row| {
265                    if let Some(CoreSochValue::UInt(ts)) = row.get("_timestamp") {
266                        *ts >= start_us && *ts <= end_us
267                    } else if let Some(CoreSochValue::Int(ts)) = row.get("_timestamp") {
268                        let ts = *ts as u64;
269                        ts >= start_us && ts <= end_us
270                    } else {
271                        false
272                    }
273                })
274                .map(convert_row_core_to_query)
275                .collect();
276            Ok(rows)
277        })
278    }
279
280    fn vector_search(
281        &self,
282        table: &str,
283        query: &[f32],
284        k: usize,
285    ) -> sochdb_core::Result<Vec<(f32, HashMap<String, QuerySochValue>)>> {
286        self.with_read_txn(|txn| {
287            let result = self.db.query(txn, table).execute()?;
288
289            let mut scored: Vec<(f32, HashMap<String, CoreSochValue>)> = result
290                .rows
291                .into_iter()
292                .filter_map(|row| {
293                    let vec_col = row.get("_vector").or_else(|| row.get("_embedding"));
294
295                    if let Some(CoreSochValue::Array(arr)) = vec_col {
296                        let vec: Vec<f32> = arr
297                            .iter()
298                            .filter_map(|v| match v {
299                                CoreSochValue::Float(f) => Some(*f as f32),
300                                CoreSochValue::Int(i) => Some(*i as f32),
301                                _ => None,
302                            })
303                            .collect();
304
305                        if vec.len() == query.len() {
306                            let dist = euclidean_distance(&vec, query);
307                            Some((dist, row))
308                        } else {
309                            None
310                        }
311                    } else {
312                        None
313                    }
314                })
315                .collect();
316
317            scored.sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap_or(std::cmp::Ordering::Equal));
318            scored.truncate(k);
319
320            Ok(scored
321                .into_iter()
322                .map(|(dist, row)| (dist, convert_row_core_to_query(row)))
323                .collect())
324        })
325    }
326
327    fn row_count(&self, table: &str) -> usize {
328        let txn = self.db.begin_read_only_fast();
329        let count = self
330            .db
331            .query(txn, table)
332            .execute()
333            .map(|r| r.rows_scanned)
334            .unwrap_or(0);
335        self.db.abort_read_only_fast(txn);
336        count
337    }
338}
339
340// ============================================================================
341// DatabaseSqlConnection — Implements SqlConnection for real storage
342// ============================================================================
343
344/// Concrete implementation of [`SqlConnection`] backed by `sochdb_storage::Database`.
345///
346/// This enables the `SqlBridge` to route parsed SQL AST operations to the
347/// actual storage engine, making `SELECT`, `INSERT`, `UPDATE`, `DELETE`,
348/// and DDL statements work against real persisted data.
349///
350/// # Transaction Model
351///
352/// The connection maintains an optional active write transaction. Read-only
353/// operations (`select`, `table_exists`) use fast read-only transactions.
354/// Write operations auto-begin a transaction if none is active.
355pub struct DatabaseSqlConnection {
356    db: Arc<Database>,
357    active_txn: Option<KernelTxnHandle>,
358    /// Whether the current transaction was explicitly started via BEGIN.
359    explicit_txn: bool,
360    /// Counter for auto-incrementing row IDs per table.
361    next_row_ids: HashMap<String, u64>,
362}
363
364impl DatabaseSqlConnection {
365    /// Create a new SQL connection wrapping a `Database` instance.
366    pub fn new(db: Arc<Database>) -> Self {
367        Self {
368            db,
369            active_txn: None,
370            explicit_txn: false,
371            next_row_ids: HashMap::new(),
372        }
373    }
374
375    /// Get a reference to the underlying database.
376    pub fn database(&self) -> &Arc<Database> {
377        &self.db
378    }
379
380    /// Get or create an active write transaction.
381    fn ensure_write_txn(&mut self) -> SqlResult<KernelTxnHandle> {
382        if let Some(txn) = self.active_txn {
383            Ok(txn)
384        } else {
385            let txn = self
386                .db
387                .begin_transaction()
388                .map_err(|e| SqlError::ExecutionError(format!("Failed to begin txn: {}", e)))?;
389            self.active_txn = Some(txn);
390            Ok(txn)
391        }
392    }
393
394    /// Auto-commit if we created an implicit transaction.
395    /// Explicit transactions (started via BEGIN) are not auto-committed.
396    fn auto_commit_if_implicit(&mut self) -> SqlResult<()> {
397        if self.explicit_txn {
398            return Ok(()); // Let the user COMMIT explicitly
399        }
400        if let Some(txn) = self.active_txn.take() {
401            self.db
402                .commit(txn)
403                .map_err(|e| SqlError::ExecutionError(format!("Commit failed: {}", e)))?;
404        }
405        Ok(())
406    }
407
408    /// Get the next row ID for a table (simple auto-increment).
409    fn next_row_id(&mut self, table: &str) -> u64 {
410        let counter = self.next_row_ids.entry(table.to_string()).or_insert(0);
411        *counter += 1;
412        *counter
413    }
414
415    /// Initialize the row ID counter from existing data.
416    fn init_row_id_counter(&mut self, table: &str) {
417        if self.next_row_ids.contains_key(table) {
418            return;
419        }
420        let txn = self.db.begin_read_only_fast();
421        let max_id = self
422            .db
423            .query(txn, table)
424            .execute()
425            .map(|r| r.rows_scanned as u64)
426            .unwrap_or(0);
427        self.db.abort_read_only_fast(txn);
428        self.next_row_ids.insert(table.to_string(), max_id);
429    }
430
431    /// Evaluate a SQL expression against a row for WHERE clause filtering.
432    fn eval_expr(
433        &self,
434        expr: &Expr,
435        row: &HashMap<String, CoreSochValue>,
436        params: &[CoreSochValue],
437    ) -> Option<CoreSochValue> {
438        match expr {
439            Expr::Column(col_ref) => {
440                // Try qualified lookup first: "table.column"
441                if let Some(ref tbl) = col_ref.table {
442                    let qualified = format!("{}.{}", tbl, col_ref.column);
443                    if let Some(v) = row.get(&qualified) {
444                        return Some(v.clone());
445                    }
446                }
447                // Fall back to unqualified: "column"
448                let col_name = &col_ref.column;
449                row.get(col_name).cloned()
450            }
451            Expr::Literal(lit) => Some(literal_to_core(lit)),
452            Expr::Placeholder(idx) => params.get((*idx as usize).saturating_sub(1)).cloned(),
453            Expr::BinaryOp { left, op, right } => {
454                let lhs = self.eval_expr(left, row, params)?;
455                let rhs = self.eval_expr(right, row, params)?;
456                Some(eval_binary_op(&lhs, op, &rhs))
457            }
458            Expr::UnaryOp { op, expr: inner } => {
459                let val = self.eval_expr(inner, row, params)?;
460                Some(eval_unary_op(op, &val))
461            }
462            Expr::IsNull {
463                expr: inner,
464                negated,
465            } => {
466                let val = self.eval_expr(inner, row, params)?;
467                let is_null = matches!(val, CoreSochValue::Null);
468                Some(CoreSochValue::Bool(if *negated {
469                    !is_null
470                } else {
471                    is_null
472                }))
473            }
474            Expr::Between {
475                expr: inner,
476                low,
477                high,
478                negated,
479            } => {
480                let val = self.eval_expr(inner, row, params)?;
481                let lo = self.eval_expr(low, row, params)?;
482                let hi = self.eval_expr(high, row, params)?;
483                let in_range = compare_values(&val, &lo) != std::cmp::Ordering::Less
484                    && compare_values(&val, &hi) != std::cmp::Ordering::Greater;
485                Some(CoreSochValue::Bool(if *negated {
486                    !in_range
487                } else {
488                    in_range
489                }))
490            }
491            Expr::InList {
492                expr: inner,
493                list,
494                negated,
495            } => {
496                let val = self.eval_expr(inner, row, params)?;
497                let found = list
498                    .iter()
499                    .any(|item| self.eval_expr(item, row, params) == Some(val.clone()));
500                Some(CoreSochValue::Bool(if *negated { !found } else { found }))
501            }
502            Expr::Like {
503                expr: inner,
504                pattern,
505                negated,
506                ..
507            } => {
508                let val = self.eval_expr(inner, row, params)?;
509                let pat = self.eval_expr(pattern, row, params)?;
510                if let (CoreSochValue::Text(s), CoreSochValue::Text(p)) = (&val, &pat) {
511                    let matched = sql_like_match(s, p);
512                    Some(CoreSochValue::Bool(if *negated {
513                        !matched
514                    } else {
515                        matched
516                    }))
517                } else {
518                    Some(CoreSochValue::Bool(false))
519                }
520            }
521            Expr::Function(func_call) => {
522                let func_name = func_call.name.name().to_uppercase();
523                match func_name.as_str() {
524                    "UPPER" => {
525                        let val = func_call
526                            .args
527                            .first()
528                            .and_then(|a| self.eval_expr(a, row, params))?;
529                        if let CoreSochValue::Text(s) = val {
530                            Some(CoreSochValue::Text(s.to_uppercase()))
531                        } else {
532                            Some(CoreSochValue::Null)
533                        }
534                    }
535                    "LOWER" => {
536                        let val = func_call
537                            .args
538                            .first()
539                            .and_then(|a| self.eval_expr(a, row, params))?;
540                        if let CoreSochValue::Text(s) = val {
541                            Some(CoreSochValue::Text(s.to_lowercase()))
542                        } else {
543                            Some(CoreSochValue::Null)
544                        }
545                    }
546                    "LENGTH" | "LEN" => {
547                        let val = func_call
548                            .args
549                            .first()
550                            .and_then(|a| self.eval_expr(a, row, params))?;
551                        if let CoreSochValue::Text(s) = val {
552                            Some(CoreSochValue::Int(s.len() as i64))
553                        } else {
554                            Some(CoreSochValue::Null)
555                        }
556                    }
557                    "COALESCE" => {
558                        for arg in &func_call.args {
559                            if let Some(val) = self.eval_expr(arg, row, params) {
560                                if !matches!(val, CoreSochValue::Null) {
561                                    return Some(val);
562                                }
563                            }
564                        }
565                        Some(CoreSochValue::Null)
566                    }
567                    // Aggregate functions pass through for row-level evaluation
568                    _ => func_call
569                        .args
570                        .first()
571                        .and_then(|a| self.eval_expr(a, row, params)),
572                }
573            }
574            _ => None,
575        }
576    }
577
578    /// Check if a WHERE expression evaluates to true for a given row.
579    fn row_matches(
580        &self,
581        expr: &Expr,
582        row: &HashMap<String, CoreSochValue>,
583        params: &[CoreSochValue],
584    ) -> bool {
585        match self.eval_expr(expr, row, params) {
586            Some(CoreSochValue::Bool(b)) => b,
587            _ => false,
588        }
589    }
590
591    /// Find the row_id for a row by scanning the table key space.
592    ///
593    /// This is O(n) and should be optimized with a primary key index.
594    fn find_row_id(
595        &self,
596        table: &str,
597        target_row: &HashMap<String, CoreSochValue>,
598        txn: KernelTxnHandle,
599    ) -> SqlResult<Option<u64>> {
600        let entries = self
601            .db
602            .scan(txn, table.as_bytes())
603            .map_err(|e| SqlError::ExecutionError(format!("Scan failed: {}", e)))?;
604
605        for (key_bytes, _value_bytes) in entries {
606            if let Ok(key_str) = String::from_utf8(key_bytes) {
607                // Keys are "table/row_id"
608                let parts: Vec<&str> = key_str.split('/').collect();
609                if parts.len() == 2 {
610                    if let Ok(row_id) = parts[1].parse::<u64>() {
611                        if let Ok(Some(row)) = self.db.read_row(txn, table, row_id, None) {
612                            if rows_equal(&row, target_row) {
613                                return Ok(Some(row_id));
614                            }
615                        }
616                    }
617                }
618            }
619        }
620
621        Ok(None)
622    }
623}
624
625impl SqlConnection for DatabaseSqlConnection {
626    fn select(
627        &self,
628        table: &str,
629        columns: &[String],
630        where_clause: Option<&Expr>,
631        order_by: &[OrderByItem],
632        limit: Option<usize>,
633        offset: Option<usize>,
634        params: &[CoreSochValue],
635    ) -> SqlResult<ExecutionResult> {
636        let txn = self.db.begin_read_only_fast();
637
638        let query = if columns.is_empty() || columns.iter().any(|c| c == "*") {
639            self.db.query(txn, table)
640        } else {
641            let col_refs: Vec<&str> = columns.iter().map(|s| s.as_str()).collect();
642            self.db.query(txn, table).columns(&col_refs)
643        };
644
645        let result = query
646            .execute()
647            .map_err(|e| SqlError::ExecutionError(format!("Query failed: {}", e)));
648
649        self.db.abort_read_only_fast(txn);
650
651        let result = result?;
652        let mut rows = result.rows;
653
654        // Apply WHERE filter
655        if let Some(expr) = where_clause {
656            rows.retain(|row| self.row_matches(expr, row, params));
657        }
658
659        // Apply ORDER BY
660        if !order_by.is_empty() {
661            rows.sort_by(|a, b| {
662                for item in order_by {
663                    let col_name = extract_order_by_column(&item.expr);
664                    let va = a.get(&col_name);
665                    let vb = b.get(&col_name);
666                    let cmp = compare_optional_values(va, vb);
667                    let cmp = if !item.asc { cmp.reverse() } else { cmp };
668                    if cmp != std::cmp::Ordering::Equal {
669                        return cmp;
670                    }
671                }
672                std::cmp::Ordering::Equal
673            });
674        }
675
676        // Apply OFFSET
677        if let Some(off) = offset {
678            rows = rows.into_iter().skip(off).collect();
679        }
680
681        // Apply LIMIT
682        if let Some(lim) = limit {
683            rows.truncate(lim);
684        }
685
686        // Determine column names
687        let result_columns = if columns.is_empty() || columns.iter().any(|c| c == "*") {
688            rows.first()
689                .map(|r| r.keys().cloned().collect())
690                .unwrap_or_default()
691        } else {
692            columns.to_vec()
693        };
694
695        Ok(ExecutionResult::Rows {
696            columns: result_columns,
697            rows,
698        })
699    }
700
701    fn insert(
702        &mut self,
703        table: &str,
704        columns: Option<&[String]>,
705        rows: &[Vec<Expr>],
706        _on_conflict: Option<&OnConflict>,
707        params: &[CoreSochValue],
708    ) -> SqlResult<ExecutionResult> {
709        let txn = self.ensure_write_txn()?;
710        self.init_row_id_counter(table);
711
712        let schema = self.db.get_table_schema(table);
713        let col_names: Vec<String> = if let Some(cols) = columns {
714            cols.to_vec()
715        } else if let Some(ref s) = schema {
716            s.columns.iter().map(|c| c.name.clone()).collect()
717        } else {
718            return Err(SqlError::InvalidArgument(
719                "INSERT requires column names when table has no schema".into(),
720            ));
721        };
722
723        let mut inserted = 0;
724        for row_exprs in rows {
725            let row_id = self.next_row_id(table);
726            let mut values = HashMap::new();
727
728            for (i, expr) in row_exprs.iter().enumerate() {
729                if i < col_names.len() {
730                    let value = match expr {
731                        Expr::Literal(lit) => literal_to_core(lit),
732                        Expr::Placeholder(idx) => params
733                            .get((*idx as usize).saturating_sub(1))
734                            .cloned()
735                            .unwrap_or(CoreSochValue::Null),
736                        _ => CoreSochValue::Null,
737                    };
738                    values.insert(col_names[i].clone(), value);
739                }
740            }
741
742            self.db
743                .insert_row(txn, table, row_id, &values)
744                .map_err(|e| SqlError::ExecutionError(format!("Insert failed: {}", e)))?;
745            inserted += 1;
746        }
747
748        self.auto_commit_if_implicit()?;
749        Ok(ExecutionResult::RowsAffected(inserted))
750    }
751
752    fn update(
753        &mut self,
754        table: &str,
755        assignments: &[Assignment],
756        where_clause: Option<&Expr>,
757        params: &[CoreSochValue],
758    ) -> SqlResult<ExecutionResult> {
759        let txn = self.ensure_write_txn()?;
760
761        let result = self
762            .db
763            .query(txn, table)
764            .execute()
765            .map_err(|e| SqlError::ExecutionError(format!("Scan for update failed: {}", e)))?;
766
767        let mut updated = 0;
768
769        for row in &result.rows {
770            let matches = match where_clause {
771                Some(expr) => self.row_matches(expr, row, params),
772                None => true,
773            };
774
775            if matches {
776                let row_id = self.find_row_id(table, row, txn)?;
777                if let Some(row_id) = row_id {
778                    let mut new_values = row.clone();
779                    for assignment in assignments {
780                        let col_name = assignment.column.clone();
781                        let value = match &assignment.value {
782                            Expr::Literal(lit) => literal_to_core(lit),
783                            Expr::Placeholder(idx) => params
784                                .get((*idx as usize).saturating_sub(1))
785                                .cloned()
786                                .unwrap_or(CoreSochValue::Null),
787                            _ => self
788                                .eval_expr(&assignment.value, row, params)
789                                .unwrap_or(CoreSochValue::Null),
790                        };
791                        new_values.insert(col_name, value);
792                    }
793
794                    self.db
795                        .insert_row(txn, table, row_id, &new_values)
796                        .map_err(|e| SqlError::ExecutionError(format!("Update failed: {}", e)))?;
797                    updated += 1;
798                }
799            }
800        }
801
802        self.auto_commit_if_implicit()?;
803        Ok(ExecutionResult::RowsAffected(updated))
804    }
805
806    fn delete(
807        &mut self,
808        table: &str,
809        where_clause: Option<&Expr>,
810        params: &[CoreSochValue],
811    ) -> SqlResult<ExecutionResult> {
812        let txn = self.ensure_write_txn()?;
813
814        let result = self
815            .db
816            .query(txn, table)
817            .execute()
818            .map_err(|e| SqlError::ExecutionError(format!("Scan for delete failed: {}", e)))?;
819
820        let mut deleted = 0;
821
822        for row in &result.rows {
823            let matches = match where_clause {
824                Some(expr) => self.row_matches(expr, row, params),
825                None => true,
826            };
827
828            if matches {
829                if let Some(row_id) = self.find_row_id(table, row, txn)? {
830                    let key = format!("{}/{}", table, row_id);
831                    self.db
832                        .delete(txn, key.as_bytes())
833                        .map_err(|e| SqlError::ExecutionError(format!("Delete failed: {}", e)))?;
834                    deleted += 1;
835                }
836            }
837        }
838
839        self.auto_commit_if_implicit()?;
840        Ok(ExecutionResult::RowsAffected(deleted))
841    }
842
843    fn create_table(&mut self, stmt: &CreateTableStmt) -> SqlResult<ExecutionResult> {
844        use sochdb_storage::DbColumnDef;
845        use sochdb_storage::DbTableSchema;
846
847        let table_name = stmt.name.name().to_string();
848
849        if self.db.get_table_schema(&table_name).is_some() {
850            if stmt.if_not_exists {
851                return Ok(ExecutionResult::Ok);
852            }
853            return Err(SqlError::InvalidArgument(format!(
854                "Table '{}' already exists",
855                table_name
856            )));
857        }
858
859        let columns: Vec<DbColumnDef> = stmt
860            .columns
861            .iter()
862            .map(|col| {
863                let col_type = sql_type_to_db_type(&col.data_type);
864                let nullable = !col
865                    .constraints
866                    .iter()
867                    .any(|c| matches!(c, ColumnConstraint::NotNull));
868                DbColumnDef {
869                    name: col.name.clone(),
870                    col_type,
871                    nullable,
872                }
873            })
874            .collect();
875
876        let schema = DbTableSchema {
877            name: table_name,
878            columns,
879        };
880
881        self.db
882            .register_table(schema)
883            .map_err(|e| SqlError::ExecutionError(format!("Create table failed: {}", e)))?;
884
885        Ok(ExecutionResult::Ok)
886    }
887
888    fn drop_table(&mut self, stmt: &DropTableStmt) -> SqlResult<ExecutionResult> {
889        let table_name = stmt
890            .names
891            .first()
892            .map(|n| n.name().to_string())
893            .unwrap_or_default();
894
895        if self.db.get_table_schema(&table_name).is_none() {
896            if stmt.if_exists {
897                return Ok(ExecutionResult::Ok);
898            }
899            return Err(SqlError::TableNotFound(table_name));
900        }
901        // Schema removal only for now — full data cleanup is deferred.
902        // TODO: scan and delete all rows under the table prefix
903        Ok(ExecutionResult::Ok)
904    }
905
906    fn create_index(&mut self, _stmt: &CreateIndexStmt) -> SqlResult<ExecutionResult> {
907        // Index creation deferred to TableIndexRegistry integration
908        Ok(ExecutionResult::Ok)
909    }
910
911    fn drop_index(&mut self, _stmt: &DropIndexStmt) -> SqlResult<ExecutionResult> {
912        Ok(ExecutionResult::Ok)
913    }
914
915    fn alter_table(&mut self, stmt: &AlterTableStmt) -> SqlResult<ExecutionResult> {
916        use sochdb_storage::DbColumnDef;
917
918        let table_name = stmt.name.name().to_string();
919        let mut schema = self
920            .db
921            .get_table_schema(&table_name)
922            .ok_or_else(|| SqlError::TableNotFound(table_name.clone()))?;
923
924        let original_name = table_name.clone();
925
926        for op in &stmt.operations {
927            match op {
928                AlterTableOp::AddColumn(col_def) => {
929                    // Check for duplicate column
930                    if schema.columns.iter().any(|c| c.name == col_def.name) {
931                        return Err(SqlError::InvalidArgument(format!(
932                            "Column '{}' already exists in table '{}'",
933                            col_def.name, schema.name
934                        )));
935                    }
936                    let col_type = sql_type_to_db_type(&col_def.data_type);
937                    let nullable = !col_def
938                        .constraints
939                        .iter()
940                        .any(|c| matches!(c, ColumnConstraint::NotNull));
941                    schema.columns.push(DbColumnDef {
942                        name: col_def.name.clone(),
943                        col_type,
944                        nullable,
945                    });
946                }
947                AlterTableOp::DropColumn { name, .. } => {
948                    let idx = schema
949                        .columns
950                        .iter()
951                        .position(|c| c.name == *name)
952                        .ok_or_else(|| {
953                            SqlError::InvalidArgument(format!(
954                                "Column '{}' not found in table '{}'",
955                                name, schema.name
956                            ))
957                        })?;
958                    schema.columns.remove(idx);
959                }
960                AlterTableOp::RenameColumn { old_name, new_name } => {
961                    let col = schema
962                        .columns
963                        .iter_mut()
964                        .find(|c| c.name == *old_name)
965                        .ok_or_else(|| {
966                            SqlError::InvalidArgument(format!(
967                                "Column '{}' not found in table '{}'",
968                                old_name, schema.name
969                            ))
970                        })?;
971                    col.name = new_name.clone();
972                }
973                AlterTableOp::RenameTable(new_name) => {
974                    schema.name = new_name.name().to_string();
975                }
976                AlterTableOp::AlterColumn { name, operation } => {
977                    let col = schema
978                        .columns
979                        .iter_mut()
980                        .find(|c| c.name == *name)
981                        .ok_or_else(|| {
982                            SqlError::InvalidArgument(format!(
983                                "Column '{}' not found in table '{}'",
984                                name, schema.name
985                            ))
986                        })?;
987                    match operation {
988                        AlterColumnOp::SetType(data_type) => {
989                            col.col_type = sql_type_to_db_type(data_type);
990                        }
991                        AlterColumnOp::SetNotNull => {
992                            col.nullable = false;
993                        }
994                        AlterColumnOp::DropNotNull => {
995                            col.nullable = true;
996                        }
997                        AlterColumnOp::SetDefault(_) | AlterColumnOp::DropDefault => {
998                            // Defaults are handled at the SQL layer, not stored
999                            // in the storage schema currently. This is a no-op.
1000                        }
1001                    }
1002                }
1003                AlterTableOp::AddConstraint(_) | AlterTableOp::DropConstraint { .. } => {
1004                    return Err(SqlError::NotImplemented(
1005                        "ADD/DROP CONSTRAINT not yet implemented".into(),
1006                    ));
1007                }
1008            }
1009        }
1010
1011        self.db
1012            .update_table_schema(&original_name, schema)
1013            .map_err(|e| SqlError::ExecutionError(format!("ALTER TABLE failed: {}", e)))?;
1014
1015        Ok(ExecutionResult::Ok)
1016    }
1017
1018    fn begin(&mut self, _stmt: &BeginStmt) -> SqlResult<ExecutionResult> {
1019        if self.active_txn.is_some() {
1020            return Err(SqlError::TransactionError(
1021                "Transaction already active".into(),
1022            ));
1023        }
1024        let txn = self
1025            .db
1026            .begin_transaction()
1027            .map_err(|e| SqlError::ExecutionError(format!("Begin failed: {}", e)))?;
1028        self.active_txn = Some(txn);
1029        self.explicit_txn = true;
1030        Ok(ExecutionResult::TransactionOk)
1031    }
1032
1033    fn commit(&mut self) -> SqlResult<ExecutionResult> {
1034        if let Some(txn) = self.active_txn.take() {
1035            self.explicit_txn = false;
1036            self.db
1037                .commit(txn)
1038                .map_err(|e| SqlError::TransactionError(format!("Commit failed: {}", e)))?;
1039            Ok(ExecutionResult::TransactionOk)
1040        } else {
1041            Err(SqlError::TransactionError("No active transaction".into()))
1042        }
1043    }
1044
1045    fn rollback(&mut self, _savepoint: Option<&str>) -> SqlResult<ExecutionResult> {
1046        if let Some(txn) = self.active_txn.take() {
1047            self.explicit_txn = false;
1048            self.db
1049                .abort(txn)
1050                .map_err(|e| SqlError::TransactionError(format!("Rollback failed: {}", e)))?;
1051            Ok(ExecutionResult::TransactionOk)
1052        } else {
1053            Err(SqlError::TransactionError("No active transaction".into()))
1054        }
1055    }
1056
1057    fn table_exists(&self, table: &str) -> SqlResult<bool> {
1058        Ok(self.db.get_table_schema(table).is_some())
1059    }
1060
1061    fn index_exists(&self, _index: &str) -> SqlResult<bool> {
1062        // TODO: wire to TableIndexRegistry
1063        Ok(false)
1064    }
1065
1066    fn scan_all(
1067        &self,
1068        table: &str,
1069        columns: &[String],
1070    ) -> SqlResult<Vec<HashMap<String, CoreSochValue>>> {
1071        let txn = self.db.begin_read_only_fast();
1072
1073        let query = if columns.is_empty() || columns.iter().any(|c| c == "*") {
1074            self.db.query(txn, table)
1075        } else {
1076            let col_refs: Vec<&str> = columns.iter().map(|s| s.as_str()).collect();
1077            self.db.query(txn, table).columns(&col_refs)
1078        };
1079
1080        let result = query
1081            .execute()
1082            .map_err(|e| SqlError::ExecutionError(format!("Scan failed: {}", e)));
1083
1084        self.db.abort_read_only_fast(txn);
1085        Ok(result?.rows)
1086    }
1087
1088    fn eval_join_predicate(
1089        &self,
1090        expr: &Expr,
1091        row: &HashMap<String, CoreSochValue>,
1092        params: &[CoreSochValue],
1093    ) -> Option<bool> {
1094        let val = self.eval_expr(expr, row, params)?;
1095        match val {
1096            CoreSochValue::Bool(b) => Some(b),
1097            CoreSochValue::Null => Some(false),
1098            _ => Some(false),
1099        }
1100    }
1101}
1102
1103// ============================================================================
1104// Helper Functions
1105// ============================================================================
1106
1107/// Simple string-based predicate evaluation for `StorageBackend::table_scan()`.
1108///
1109/// Supports: `column = value`, `column != value`, `column > value`,
1110/// `column < value`, `column >= value`, `column <= value`.
1111fn apply_simple_predicate(
1112    rows: &[HashMap<String, QuerySochValue>],
1113    predicate: &str,
1114) -> Vec<HashMap<String, QuerySochValue>> {
1115    let operators = [">=", "<=", "!=", "=", ">", "<"];
1116
1117    for op in &operators {
1118        if let Some(idx) = predicate.find(op) {
1119            let column = predicate[..idx].trim();
1120            let value_str = predicate[idx + op.len()..].trim().trim_matches('\'');
1121
1122            return rows
1123                .iter()
1124                .filter(|row| {
1125                    if let Some(val) = row.get(column) {
1126                        let val_str = match val {
1127                            QuerySochValue::Text(s) => s.clone(),
1128                            QuerySochValue::Int(i) => i.to_string(),
1129                            QuerySochValue::UInt(u) => u.to_string(),
1130                            QuerySochValue::Float(f) => f.to_string(),
1131                            QuerySochValue::Bool(b) => b.to_string(),
1132                            _ => return false,
1133                        };
1134
1135                        match *op {
1136                            "=" => val_str == value_str,
1137                            "!=" => val_str != value_str,
1138                            ">" => val_str.as_str() > value_str,
1139                            "<" => (val_str.as_str()) < value_str,
1140                            ">=" => val_str.as_str() >= value_str,
1141                            "<=" => val_str.as_str() <= value_str,
1142                            _ => false,
1143                        }
1144                    } else {
1145                        false
1146                    }
1147                })
1148                .cloned()
1149                .collect();
1150        }
1151    }
1152
1153    rows.to_vec()
1154}
1155
1156/// Euclidean distance between two vectors.
1157fn euclidean_distance(a: &[f32], b: &[f32]) -> f32 {
1158    a.iter()
1159        .zip(b.iter())
1160        .map(|(x, y)| (x - y) * (x - y))
1161        .sum::<f32>()
1162        .sqrt()
1163}
1164
1165/// Convert a SQL literal to `CoreSochValue`.
1166fn literal_to_core(lit: &Literal) -> CoreSochValue {
1167    match lit {
1168        Literal::Integer(i) => CoreSochValue::Int(*i),
1169        Literal::Float(f) => CoreSochValue::Float(*f),
1170        Literal::String(s) => CoreSochValue::Text(s.clone()),
1171        Literal::Boolean(b) => CoreSochValue::Bool(*b),
1172        Literal::Null => CoreSochValue::Null,
1173        Literal::Blob(b) => CoreSochValue::Binary(b.clone()),
1174    }
1175}
1176
1177/// Evaluate a binary operation on two `CoreSochValue`s.
1178fn eval_binary_op(lhs: &CoreSochValue, op: &BinaryOperator, rhs: &CoreSochValue) -> CoreSochValue {
1179    match op {
1180        BinaryOperator::Eq => CoreSochValue::Bool(lhs == rhs),
1181        BinaryOperator::Ne => CoreSochValue::Bool(lhs != rhs),
1182        BinaryOperator::Lt => {
1183            CoreSochValue::Bool(compare_values(lhs, rhs) == std::cmp::Ordering::Less)
1184        }
1185        BinaryOperator::Gt => {
1186            CoreSochValue::Bool(compare_values(lhs, rhs) == std::cmp::Ordering::Greater)
1187        }
1188        BinaryOperator::Le => {
1189            CoreSochValue::Bool(compare_values(lhs, rhs) != std::cmp::Ordering::Greater)
1190        }
1191        BinaryOperator::Ge => {
1192            CoreSochValue::Bool(compare_values(lhs, rhs) != std::cmp::Ordering::Less)
1193        }
1194        BinaryOperator::And => {
1195            let a = matches!(lhs, CoreSochValue::Bool(true));
1196            let b = matches!(rhs, CoreSochValue::Bool(true));
1197            CoreSochValue::Bool(a && b)
1198        }
1199        BinaryOperator::Or => {
1200            let a = matches!(lhs, CoreSochValue::Bool(true));
1201            let b = matches!(rhs, CoreSochValue::Bool(true));
1202            CoreSochValue::Bool(a || b)
1203        }
1204        BinaryOperator::Plus => numeric_op(lhs, rhs, |a, b| a + b, |a, b| a + b),
1205        BinaryOperator::Minus => numeric_op(lhs, rhs, |a, b| a - b, |a, b| a - b),
1206        BinaryOperator::Multiply => numeric_op(lhs, rhs, |a, b| a * b, |a, b| a * b),
1207        // Division/modulo by zero yields SQL NULL, not a silent 0. Returning 0
1208        // here corrupted results: `WHERE x / 0 > 1` evaluated to `0 > 1`
1209        // (false) and `UPDATE SET z = a / 0` stored 0. NULL also matches the
1210        // aggregate-path evaluator (sql/aggregate.rs eval_binary).
1211        BinaryOperator::Divide => {
1212            if is_numeric_zero(rhs) {
1213                CoreSochValue::Null
1214            } else {
1215                numeric_op(lhs, rhs, |a, b| a / b, |a, b| a / b)
1216            }
1217        }
1218        BinaryOperator::Modulo => {
1219            if is_numeric_zero(rhs) {
1220                CoreSochValue::Null
1221            } else {
1222                numeric_op(lhs, rhs, |a, b| a % b, |a, b| a % b)
1223            }
1224        }
1225        BinaryOperator::Like => {
1226            if let (CoreSochValue::Text(s), CoreSochValue::Text(pattern)) = (lhs, rhs) {
1227                CoreSochValue::Bool(sql_like_match(s, pattern))
1228            } else {
1229                CoreSochValue::Bool(false)
1230            }
1231        }
1232        BinaryOperator::Concat => {
1233            let a = value_to_string(lhs);
1234            let b = value_to_string(rhs);
1235            CoreSochValue::Text(format!("{}{}", a, b))
1236        }
1237        _ => CoreSochValue::Null,
1238    }
1239}
1240
1241/// Evaluate a unary operation.
1242fn eval_unary_op(op: &UnaryOperator, val: &CoreSochValue) -> CoreSochValue {
1243    match op {
1244        UnaryOperator::Not => match val {
1245            CoreSochValue::Bool(b) => CoreSochValue::Bool(!b),
1246            _ => CoreSochValue::Null,
1247        },
1248        UnaryOperator::Minus => match val {
1249            CoreSochValue::Int(i) => CoreSochValue::Int(-i),
1250            CoreSochValue::Float(f) => CoreSochValue::Float(-f),
1251            _ => CoreSochValue::Null,
1252        },
1253        UnaryOperator::Plus => val.clone(),
1254        _ => CoreSochValue::Null,
1255    }
1256}
1257
1258/// Numeric operation helper.
1259/// True if the value is a numeric zero (used to guard divide/modulo, which
1260/// must yield SQL NULL rather than a silent 0 on a zero divisor).
1261fn is_numeric_zero(v: &CoreSochValue) -> bool {
1262    match v {
1263        CoreSochValue::Int(0) | CoreSochValue::UInt(0) => true,
1264        CoreSochValue::Float(f) => *f == 0.0,
1265        _ => false,
1266    }
1267}
1268
1269fn numeric_op(
1270    lhs: &CoreSochValue,
1271    rhs: &CoreSochValue,
1272    int_op: impl Fn(i64, i64) -> i64,
1273    float_op: impl Fn(f64, f64) -> f64,
1274) -> CoreSochValue {
1275    match (lhs, rhs) {
1276        (CoreSochValue::Int(a), CoreSochValue::Int(b)) => CoreSochValue::Int(int_op(*a, *b)),
1277        (CoreSochValue::Float(a), CoreSochValue::Float(b)) => {
1278            CoreSochValue::Float(float_op(*a, *b))
1279        }
1280        (CoreSochValue::Int(a), CoreSochValue::Float(b)) => {
1281            CoreSochValue::Float(float_op(*a as f64, *b))
1282        }
1283        (CoreSochValue::Float(a), CoreSochValue::Int(b)) => {
1284            CoreSochValue::Float(float_op(*a, *b as f64))
1285        }
1286        (CoreSochValue::UInt(a), CoreSochValue::UInt(b)) => {
1287            CoreSochValue::Int(int_op(*a as i64, *b as i64))
1288        }
1289        _ => CoreSochValue::Null,
1290    }
1291}
1292
1293/// Compare two `CoreSochValue`s for ordering.
1294fn compare_values(a: &CoreSochValue, b: &CoreSochValue) -> std::cmp::Ordering {
1295    match (a, b) {
1296        (CoreSochValue::Int(a), CoreSochValue::Int(b)) => a.cmp(b),
1297        (CoreSochValue::UInt(a), CoreSochValue::UInt(b)) => a.cmp(b),
1298        (CoreSochValue::Float(a), CoreSochValue::Float(b)) => {
1299            a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal)
1300        }
1301        (CoreSochValue::Text(a), CoreSochValue::Text(b)) => a.cmp(b),
1302        (CoreSochValue::Int(a), CoreSochValue::Float(b)) => (*a as f64)
1303            .partial_cmp(b)
1304            .unwrap_or(std::cmp::Ordering::Equal),
1305        (CoreSochValue::Float(a), CoreSochValue::Int(b)) => a
1306            .partial_cmp(&(*b as f64))
1307            .unwrap_or(std::cmp::Ordering::Equal),
1308        (CoreSochValue::Null, CoreSochValue::Null) => std::cmp::Ordering::Equal,
1309        (CoreSochValue::Null, _) => std::cmp::Ordering::Less,
1310        (_, CoreSochValue::Null) => std::cmp::Ordering::Greater,
1311        _ => std::cmp::Ordering::Equal,
1312    }
1313}
1314
1315/// Compare optional CoreSochValues for ORDER BY.
1316fn compare_optional_values(
1317    a: Option<&CoreSochValue>,
1318    b: Option<&CoreSochValue>,
1319) -> std::cmp::Ordering {
1320    match (a, b) {
1321        (Some(a), Some(b)) => compare_values(a, b),
1322        (None, Some(_)) => std::cmp::Ordering::Less,
1323        (Some(_), None) => std::cmp::Ordering::Greater,
1324        (None, None) => std::cmp::Ordering::Equal,
1325    }
1326}
1327
1328/// Extract column name from an ORDER BY expression.
1329fn extract_order_by_column(expr: &Expr) -> String {
1330    match expr {
1331        Expr::Column(col_ref) => col_ref.column.clone(),
1332        _ => String::new(),
1333    }
1334}
1335
1336/// Check if two rows have the same values.
1337fn rows_equal(a: &HashMap<String, CoreSochValue>, b: &HashMap<String, CoreSochValue>) -> bool {
1338    if a.len() != b.len() {
1339        return false;
1340    }
1341    a.iter().all(|(k, v)| b.get(k) == Some(v))
1342}
1343
1344/// Convert a value to string representation.
1345fn value_to_string(v: &CoreSochValue) -> String {
1346    match v {
1347        CoreSochValue::Text(s) => s.clone(),
1348        CoreSochValue::Int(i) => i.to_string(),
1349        CoreSochValue::UInt(u) => u.to_string(),
1350        CoreSochValue::Float(f) => f.to_string(),
1351        CoreSochValue::Bool(b) => b.to_string(),
1352        CoreSochValue::Null => "NULL".to_string(),
1353        _ => String::new(),
1354    }
1355}
1356
1357/// SQL LIKE pattern matching.
1358///
1359/// Delegates to the canonical [`crate::like::like_match`] so that `LIKE`
1360/// behaves identically across every query path.
1361fn sql_like_match(s: &str, pattern: &str) -> bool {
1362    crate::like::like_match(s, pattern)
1363}
1364
1365/// Convert SQL data type to storage column type.
1366fn sql_type_to_db_type(dt: &DataType) -> sochdb_storage::DbColumnType {
1367    use sochdb_storage::DbColumnType;
1368    match dt {
1369        DataType::TinyInt | DataType::SmallInt | DataType::Int | DataType::BigInt => {
1370            DbColumnType::Int64
1371        }
1372        DataType::Float | DataType::Double | DataType::Decimal { .. } => DbColumnType::Float64,
1373        DataType::Boolean => DbColumnType::Bool,
1374        DataType::Binary(_) | DataType::Varbinary(_) | DataType::Blob => DbColumnType::Binary,
1375        // All other types (text, date, json, custom, vector) → Text
1376        _ => DbColumnType::Text,
1377    }
1378}
1379
1380// ============================================================================
1381// NamespacedSqlConnection — Namespace-Prefixed Storage Isolation (P3.2)
1382// ============================================================================
1383
1384/// A wrapper around any `SqlConnection` that prefixes table names with
1385/// `namespace:database:` to provide storage-level isolation between tenants.
1386///
1387/// ## Key Isolation
1388///
1389/// All storage keys are prefixed: `ns:db:table/row_id` instead of `table/row_id`.
1390/// This prevents any cross-namespace data leakage at the storage layer.
1391///
1392/// ## Example
1393///
1394/// ```text
1395/// // Without namespace prefix:
1396/// table key = "users/1"
1397///
1398/// // With namespace prefix (namespace="prod", database="app"):
1399/// table key = "prod:app:users/1"
1400/// ```
1401pub struct NamespacedSqlConnection<C: SqlConnection> {
1402    inner: C,
1403    namespace: String,
1404    database: String,
1405}
1406
1407impl<C: SqlConnection> NamespacedSqlConnection<C> {
1408    /// Create a new namespaced connection.
1409    pub fn new(inner: C, namespace: impl Into<String>, database: impl Into<String>) -> Self {
1410        Self {
1411            inner,
1412            namespace: namespace.into(),
1413            database: database.into(),
1414        }
1415    }
1416
1417    /// Prefix a table name with namespace:database:
1418    fn prefix_table(&self, table: &str) -> String {
1419        format!("{}:{}:{}", self.namespace, self.database, table)
1420    }
1421
1422    /// Get the namespace.
1423    pub fn namespace(&self) -> &str {
1424        &self.namespace
1425    }
1426
1427    /// Get the database.
1428    pub fn database(&self) -> &str {
1429        &self.database
1430    }
1431
1432    /// Get a reference to the inner connection.
1433    pub fn inner(&self) -> &C {
1434        &self.inner
1435    }
1436
1437    /// Get a mutable reference to the inner connection.
1438    pub fn inner_mut(&mut self) -> &mut C {
1439        &mut self.inner
1440    }
1441}
1442
1443impl<C: SqlConnection> SqlConnection for NamespacedSqlConnection<C> {
1444    fn select(
1445        &self,
1446        table: &str,
1447        columns: &[String],
1448        where_clause: Option<&Expr>,
1449        order_by: &[OrderByItem],
1450        limit: Option<usize>,
1451        offset: Option<usize>,
1452        params: &[CoreSochValue],
1453    ) -> SqlResult<ExecutionResult> {
1454        self.inner.select(
1455            &self.prefix_table(table),
1456            columns,
1457            where_clause,
1458            order_by,
1459            limit,
1460            offset,
1461            params,
1462        )
1463    }
1464
1465    fn insert(
1466        &mut self,
1467        table: &str,
1468        columns: Option<&[String]>,
1469        rows: &[Vec<Expr>],
1470        on_conflict: Option<&OnConflict>,
1471        params: &[CoreSochValue],
1472    ) -> SqlResult<ExecutionResult> {
1473        self.inner.insert(
1474            &self.prefix_table(table),
1475            columns,
1476            rows,
1477            on_conflict,
1478            params,
1479        )
1480    }
1481
1482    fn update(
1483        &mut self,
1484        table: &str,
1485        assignments: &[Assignment],
1486        where_clause: Option<&Expr>,
1487        params: &[CoreSochValue],
1488    ) -> SqlResult<ExecutionResult> {
1489        self.inner
1490            .update(&self.prefix_table(table), assignments, where_clause, params)
1491    }
1492
1493    fn delete(
1494        &mut self,
1495        table: &str,
1496        where_clause: Option<&Expr>,
1497        params: &[CoreSochValue],
1498    ) -> SqlResult<ExecutionResult> {
1499        self.inner
1500            .delete(&self.prefix_table(table), where_clause, params)
1501    }
1502
1503    fn create_table(&mut self, stmt: &CreateTableStmt) -> SqlResult<ExecutionResult> {
1504        // Create a modified statement with prefixed table name
1505        let mut prefixed = stmt.clone();
1506        let original_name = stmt.name.name().to_string();
1507        prefixed.name = ObjectName::new(self.prefix_table(&original_name));
1508        self.inner.create_table(&prefixed)
1509    }
1510
1511    fn drop_table(&mut self, stmt: &DropTableStmt) -> SqlResult<ExecutionResult> {
1512        let mut prefixed = stmt.clone();
1513        prefixed.names = stmt
1514            .names
1515            .iter()
1516            .map(|n| ObjectName::new(self.prefix_table(n.name())))
1517            .collect();
1518        self.inner.drop_table(&prefixed)
1519    }
1520
1521    fn create_index(&mut self, stmt: &CreateIndexStmt) -> SqlResult<ExecutionResult> {
1522        self.inner.create_index(stmt)
1523    }
1524
1525    fn drop_index(&mut self, stmt: &DropIndexStmt) -> SqlResult<ExecutionResult> {
1526        self.inner.drop_index(stmt)
1527    }
1528
1529    fn alter_table(&mut self, stmt: &AlterTableStmt) -> SqlResult<ExecutionResult> {
1530        let mut prefixed = stmt.clone();
1531        let original_name = stmt.name.name().to_string();
1532        prefixed.name = ObjectName::new(self.prefix_table(&original_name));
1533        self.inner.alter_table(&prefixed)
1534    }
1535
1536    fn begin(&mut self, stmt: &BeginStmt) -> SqlResult<ExecutionResult> {
1537        self.inner.begin(stmt)
1538    }
1539
1540    fn commit(&mut self) -> SqlResult<ExecutionResult> {
1541        self.inner.commit()
1542    }
1543
1544    fn rollback(&mut self, savepoint: Option<&str>) -> SqlResult<ExecutionResult> {
1545        self.inner.rollback(savepoint)
1546    }
1547
1548    fn table_exists(&self, table: &str) -> SqlResult<bool> {
1549        self.inner.table_exists(&self.prefix_table(table))
1550    }
1551
1552    fn index_exists(&self, index: &str) -> SqlResult<bool> {
1553        self.inner.index_exists(index)
1554    }
1555
1556    fn scan_all(
1557        &self,
1558        table: &str,
1559        columns: &[String],
1560    ) -> SqlResult<Vec<HashMap<String, CoreSochValue>>> {
1561        self.inner.scan_all(&self.prefix_table(table), columns)
1562    }
1563
1564    fn eval_join_predicate(
1565        &self,
1566        expr: &Expr,
1567        row: &HashMap<String, CoreSochValue>,
1568        params: &[CoreSochValue],
1569    ) -> Option<bool> {
1570        self.inner.eval_join_predicate(expr, row, params)
1571    }
1572}
1573
1574// ============================================================================
1575// Tests
1576// ============================================================================
1577
1578#[cfg(test)]
1579mod tests {
1580    use super::*;
1581
1582    #[test]
1583    #[allow(clippy::approx_constant)] // 3.14 is arbitrary test data, not PI
1584    fn test_convert_core_to_query_basic_types() {
1585        assert_eq!(
1586            convert_core_to_query(&CoreSochValue::Null),
1587            QuerySochValue::Null
1588        );
1589        assert_eq!(
1590            convert_core_to_query(&CoreSochValue::Bool(true)),
1591            QuerySochValue::Bool(true)
1592        );
1593        assert_eq!(
1594            convert_core_to_query(&CoreSochValue::Int(42)),
1595            QuerySochValue::Int(42)
1596        );
1597        assert_eq!(
1598            convert_core_to_query(&CoreSochValue::UInt(100)),
1599            QuerySochValue::UInt(100)
1600        );
1601        assert_eq!(
1602            convert_core_to_query(&CoreSochValue::Float(3.14)),
1603            QuerySochValue::Float(3.14)
1604        );
1605        assert_eq!(
1606            convert_core_to_query(&CoreSochValue::Text("hello".into())),
1607            QuerySochValue::Text("hello".into())
1608        );
1609    }
1610
1611    #[test]
1612    fn test_convert_core_to_query_object() {
1613        let mut map = HashMap::new();
1614        map.insert("name".to_string(), CoreSochValue::Text("Alice".into()));
1615        let result = convert_core_to_query(&CoreSochValue::Object(map));
1616        match result {
1617            QuerySochValue::Text(s) => assert!(s.contains("Alice")),
1618            _ => panic!("Expected Text for Object conversion"),
1619        }
1620    }
1621
1622    #[test]
1623    fn test_convert_core_to_query_ref() {
1624        let result = convert_core_to_query(&CoreSochValue::Ref {
1625            table: "users".into(),
1626            id: 42,
1627        });
1628        assert_eq!(result, QuerySochValue::Text("users/42".into()));
1629    }
1630
1631    #[test]
1632    fn test_convert_roundtrip() {
1633        let original = QuerySochValue::Int(42);
1634        let core = convert_query_to_core(&original);
1635        let back = convert_core_to_query(&core);
1636        assert_eq!(original, back);
1637    }
1638
1639    #[test]
1640    fn test_apply_simple_predicate_eq() {
1641        let rows = vec![
1642            {
1643                let mut m = HashMap::new();
1644                m.insert("name".into(), QuerySochValue::Text("Alice".into()));
1645                m.insert("age".into(), QuerySochValue::Int(30));
1646                m
1647            },
1648            {
1649                let mut m = HashMap::new();
1650                m.insert("name".into(), QuerySochValue::Text("Bob".into()));
1651                m.insert("age".into(), QuerySochValue::Int(25));
1652                m
1653            },
1654        ];
1655
1656        let filtered = apply_simple_predicate(&rows, "name = Alice");
1657        assert_eq!(filtered.len(), 1);
1658        assert_eq!(
1659            filtered[0].get("name"),
1660            Some(&QuerySochValue::Text("Alice".into()))
1661        );
1662    }
1663
1664    #[test]
1665    fn test_apply_simple_predicate_neq() {
1666        let rows = vec![
1667            {
1668                let mut m = HashMap::new();
1669                m.insert("status".into(), QuerySochValue::Text("active".into()));
1670                m
1671            },
1672            {
1673                let mut m = HashMap::new();
1674                m.insert("status".into(), QuerySochValue::Text("inactive".into()));
1675                m
1676            },
1677        ];
1678
1679        let filtered = apply_simple_predicate(&rows, "status != active");
1680        assert_eq!(filtered.len(), 1);
1681        assert_eq!(
1682            filtered[0].get("status"),
1683            Some(&QuerySochValue::Text("inactive".into()))
1684        );
1685    }
1686
1687    #[test]
1688    #[allow(clippy::approx_constant)] // 3.14 is arbitrary test data, not PI
1689    fn test_literal_to_core() {
1690        assert_eq!(
1691            literal_to_core(&Literal::Integer(42)),
1692            CoreSochValue::Int(42)
1693        );
1694        assert_eq!(
1695            literal_to_core(&Literal::Float(3.14)),
1696            CoreSochValue::Float(3.14)
1697        );
1698        assert_eq!(
1699            literal_to_core(&Literal::String("hi".into())),
1700            CoreSochValue::Text("hi".into())
1701        );
1702        assert_eq!(
1703            literal_to_core(&Literal::Boolean(true)),
1704            CoreSochValue::Bool(true)
1705        );
1706        assert_eq!(literal_to_core(&Literal::Null), CoreSochValue::Null);
1707    }
1708
1709    #[test]
1710    fn test_euclidean_distance() {
1711        let a = vec![1.0, 0.0, 0.0];
1712        let b = vec![0.0, 1.0, 0.0];
1713        let dist = euclidean_distance(&a, &b);
1714        assert!((dist - std::f32::consts::SQRT_2).abs() < 1e-6);
1715    }
1716
1717    #[test]
1718    fn test_compare_values() {
1719        assert_eq!(
1720            compare_values(&CoreSochValue::Int(1), &CoreSochValue::Int(2)),
1721            std::cmp::Ordering::Less
1722        );
1723        assert_eq!(
1724            compare_values(
1725                &CoreSochValue::Text("a".into()),
1726                &CoreSochValue::Text("b".into())
1727            ),
1728            std::cmp::Ordering::Less
1729        );
1730        assert_eq!(
1731            compare_values(&CoreSochValue::Null, &CoreSochValue::Int(1)),
1732            std::cmp::Ordering::Less
1733        );
1734    }
1735
1736    #[test]
1737    fn test_eval_binary_op() {
1738        assert_eq!(
1739            eval_binary_op(
1740                &CoreSochValue::Int(10),
1741                &BinaryOperator::Plus,
1742                &CoreSochValue::Int(5)
1743            ),
1744            CoreSochValue::Int(15)
1745        );
1746        assert_eq!(
1747            eval_binary_op(
1748                &CoreSochValue::Int(10),
1749                &BinaryOperator::Eq,
1750                &CoreSochValue::Int(10)
1751            ),
1752            CoreSochValue::Bool(true)
1753        );
1754        assert_eq!(
1755            eval_binary_op(
1756                &CoreSochValue::Text("hello".into()),
1757                &BinaryOperator::Concat,
1758                &CoreSochValue::Text(" world".into())
1759            ),
1760            CoreSochValue::Text("hello world".into())
1761        );
1762    }
1763
1764    #[test]
1765    fn test_sql_like_match() {
1766        assert!(sql_like_match("hello", "hello"));
1767        assert!(sql_like_match("hello", "%llo"));
1768        assert!(sql_like_match("hello", "h%o"));
1769        assert!(sql_like_match("hello", "h_llo"));
1770        assert!(!sql_like_match("hello", "world"));
1771        // Test with regex special characters in the data
1772        assert!(sql_like_match("file.txt", "file%"));
1773        assert!(sql_like_match("test(1)", "%(%"));
1774    }
1775
1776    #[test]
1777    fn test_sql_type_to_db_type() {
1778        use sochdb_storage::DbColumnType;
1779        assert_eq!(sql_type_to_db_type(&DataType::Int), DbColumnType::Int64);
1780        assert_eq!(sql_type_to_db_type(&DataType::BigInt), DbColumnType::Int64);
1781        assert_eq!(sql_type_to_db_type(&DataType::Float), DbColumnType::Float64);
1782        assert_eq!(sql_type_to_db_type(&DataType::Boolean), DbColumnType::Bool);
1783        assert_eq!(sql_type_to_db_type(&DataType::Blob), DbColumnType::Binary);
1784        assert_eq!(sql_type_to_db_type(&DataType::Text), DbColumnType::Text);
1785        assert_eq!(
1786            sql_type_to_db_type(&DataType::Varchar(Some(255))),
1787            DbColumnType::Text
1788        );
1789    }
1790
1791    // =========================================================================
1792    // Integration tests: Full round-trip through real Database
1793    // =========================================================================
1794
1795    fn setup_test_db() -> (std::sync::Arc<sochdb_storage::Database>, tempfile::TempDir) {
1796        let tmp = tempfile::tempdir().expect("tmpdir");
1797        let db = sochdb_storage::Database::open(tmp.path()).expect("open db");
1798        (db, tmp)
1799    }
1800
1801    #[test]
1802    fn test_integration_storage_backend_table_scan() {
1803        use sochdb_storage::{DbColumnDef, DbColumnType, DbTableSchema};
1804        let (db, _tmp) = setup_test_db();
1805
1806        // Register table
1807        db.register_table(DbTableSchema {
1808            name: "users".into(),
1809            columns: vec![
1810                DbColumnDef {
1811                    name: "id".into(),
1812                    col_type: DbColumnType::Int64,
1813                    nullable: false,
1814                },
1815                DbColumnDef {
1816                    name: "name".into(),
1817                    col_type: DbColumnType::Text,
1818                    nullable: false,
1819                },
1820                DbColumnDef {
1821                    name: "age".into(),
1822                    col_type: DbColumnType::Int64,
1823                    nullable: true,
1824                },
1825            ],
1826        })
1827        .expect("register table");
1828
1829        // Insert rows via raw storage API
1830        let txn = db.begin_transaction().expect("begin txn");
1831        let mut vals = std::collections::HashMap::new();
1832        vals.insert("id".into(), CoreSochValue::Int(1));
1833        vals.insert("name".into(), CoreSochValue::Text("Alice".into()));
1834        vals.insert("age".into(), CoreSochValue::Int(30));
1835        db.insert_row(txn, "users", 1, &vals).expect("insert 1");
1836
1837        vals.clear();
1838        vals.insert("id".into(), CoreSochValue::Int(2));
1839        vals.insert("name".into(), CoreSochValue::Text("Bob".into()));
1840        vals.insert("age".into(), CoreSochValue::Int(25));
1841        db.insert_row(txn, "users", 2, &vals).expect("insert 2");
1842        db.commit(txn).expect("commit");
1843
1844        // Query via DatabaseStorageBackend
1845        let backend = DatabaseStorageBackend::new(db.clone());
1846        let rows = backend
1847            .table_scan("users", &["id".into(), "name".into(), "age".into()], None)
1848            .expect("table_scan");
1849
1850        assert_eq!(rows.len(), 2);
1851        // Verify data came through the conversion pipeline
1852        let names: Vec<_> = rows
1853            .iter()
1854            .filter_map(|r| match r.get("name") {
1855                Some(crate::soch_ql::SochValue::Text(t)) => Some(t.clone()),
1856                _ => None,
1857            })
1858            .collect();
1859        assert!(names.contains(&"Alice".to_string()));
1860        assert!(names.contains(&"Bob".to_string()));
1861    }
1862
1863    #[test]
1864    fn test_integration_storage_backend_primary_key_lookup() {
1865        use sochdb_storage::{DbColumnDef, DbColumnType, DbTableSchema};
1866        let (db, _tmp) = setup_test_db();
1867
1868        db.register_table(DbTableSchema {
1869            name: "items".into(),
1870            columns: vec![
1871                DbColumnDef {
1872                    name: "id".into(),
1873                    col_type: DbColumnType::Int64,
1874                    nullable: false,
1875                },
1876                DbColumnDef {
1877                    name: "label".into(),
1878                    col_type: DbColumnType::Text,
1879                    nullable: false,
1880                },
1881            ],
1882        })
1883        .expect("register");
1884
1885        let txn = db.begin_transaction().expect("txn");
1886        let mut v = std::collections::HashMap::new();
1887        v.insert("id".into(), CoreSochValue::Int(42));
1888        v.insert("label".into(), CoreSochValue::Text("answer".into()));
1889        db.insert_row(txn, "items", 42, &v).expect("insert");
1890        db.commit(txn).expect("commit");
1891
1892        let backend = DatabaseStorageBackend::new(db.clone());
1893        let row = backend
1894            .primary_key_lookup("items", &crate::soch_ql::SochValue::Int(42))
1895            .expect("pk lookup");
1896        assert!(row.is_some());
1897        let row = row.unwrap();
1898        assert_eq!(
1899            row.get("label"),
1900            Some(&crate::soch_ql::SochValue::Text("answer".into()))
1901        );
1902    }
1903
1904    #[test]
1905    fn test_integration_sochql_executor_reads_storage() {
1906        use sochdb_core::{Catalog, SochSchema, SochType};
1907        use sochdb_storage::{DbColumnDef, DbColumnType, DbTableSchema};
1908        let (db, _tmp) = setup_test_db();
1909
1910        // Register table in storage
1911        db.register_table(DbTableSchema {
1912            name: "events".into(),
1913            columns: vec![
1914                DbColumnDef {
1915                    name: "id".into(),
1916                    col_type: DbColumnType::Int64,
1917                    nullable: false,
1918                },
1919                DbColumnDef {
1920                    name: "kind".into(),
1921                    col_type: DbColumnType::Text,
1922                    nullable: false,
1923                },
1924                DbColumnDef {
1925                    name: "score".into(),
1926                    col_type: DbColumnType::Float64,
1927                    nullable: true,
1928                },
1929            ],
1930        })
1931        .expect("register events");
1932
1933        // Insert data
1934        let txn = db.begin_transaction().expect("txn");
1935        for i in 1..=5u64 {
1936            let mut vals = std::collections::HashMap::new();
1937            vals.insert("id".into(), CoreSochValue::Int(i as i64));
1938            vals.insert("kind".into(), CoreSochValue::Text(format!("event_{}", i)));
1939            vals.insert("score".into(), CoreSochValue::Float(i as f64 * 1.5));
1940            db.insert_row(txn, "events", i, &vals).expect("insert");
1941        }
1942        db.commit(txn).expect("commit");
1943
1944        // Build catalog (needed by SochQlExecutor for validation)
1945        let mut catalog = Catalog::new("test");
1946        let schema = SochSchema {
1947            name: "events".into(),
1948            fields: vec![
1949                sochdb_core::SochField {
1950                    name: "id".into(),
1951                    field_type: SochType::Int,
1952                    nullable: false,
1953                    default: None,
1954                },
1955                sochdb_core::SochField {
1956                    name: "kind".into(),
1957                    field_type: SochType::Text,
1958                    nullable: false,
1959                    default: None,
1960                },
1961                sochdb_core::SochField {
1962                    name: "score".into(),
1963                    field_type: SochType::Float,
1964                    nullable: true,
1965                    default: None,
1966                },
1967            ],
1968            primary_key: None,
1969            indexes: vec![],
1970        };
1971        catalog.create_table(schema, 0).expect("register catalog");
1972
1973        // Execute via SochQlExecutor with storage backend
1974        let backend = std::sync::Arc::new(DatabaseStorageBackend::new(db.clone()));
1975        let executor = crate::soch_ql_executor::SochQlExecutor::with_storage(backend);
1976        let result = executor
1977            .execute("SELECT * FROM events", &catalog)
1978            .expect("select *");
1979
1980        // Phase 0 success: we actually get rows from storage!
1981        assert_eq!(
1982            result.rows.len(),
1983            5,
1984            "Expected 5 rows from storage, got {}",
1985            result.rows.len()
1986        );
1987        assert_eq!(result.columns, vec!["id", "kind", "score"]);
1988
1989        // Verify data integrity
1990        let first_row_kind = &result.rows[0];
1991        // Rows should have 3 values each (id, kind, score)
1992        assert_eq!(first_row_kind.len(), 3);
1993    }
1994
1995    #[test]
1996    fn test_integration_sql_connection_crud() {
1997        use crate::sql::bridge::SqlConnection;
1998
1999        let (db, _tmp) = setup_test_db();
2000        let mut conn = DatabaseSqlConnection::new(db.clone());
2001
2002        // CREATE TABLE via SQL connection
2003        let create = CreateTableStmt {
2004            span: crate::sql::token::Span::new(0, 0, 0, 0),
2005            name: crate::sql::ast::ObjectName::new("products"),
2006            columns: vec![
2007                crate::sql::ast::ColumnDef {
2008                    name: "id".into(),
2009                    data_type: DataType::Int,
2010                    constraints: vec![],
2011                },
2012                crate::sql::ast::ColumnDef {
2013                    name: "name".into(),
2014                    data_type: DataType::Text,
2015                    constraints: vec![],
2016                },
2017                crate::sql::ast::ColumnDef {
2018                    name: "price".into(),
2019                    data_type: DataType::Float,
2020                    constraints: vec![],
2021                },
2022            ],
2023            if_not_exists: false,
2024            constraints: vec![],
2025            options: vec![],
2026        };
2027        let result = conn.create_table(&create).expect("create table");
2028        assert!(matches!(result, crate::sql::bridge::ExecutionResult::Ok));
2029
2030        // BEGIN
2031        let begin_stmt = crate::sql::ast::BeginStmt {
2032            isolation_level: None,
2033            read_only: false,
2034        };
2035        let result = conn.begin(&begin_stmt).expect("begin");
2036        assert!(matches!(
2037            result,
2038            crate::sql::bridge::ExecutionResult::TransactionOk
2039        ));
2040
2041        // INSERT via decomposed SqlConnection::insert
2042        let values = vec![vec![
2043            Expr::Literal(Literal::Integer(1)),
2044            Expr::Literal(Literal::String("Widget".into())),
2045            Expr::Literal(Literal::Float(9.99)),
2046        ]];
2047        let cols = vec!["id".into(), "name".into(), "price".into()];
2048        let result = conn
2049            .insert("products", Some(&cols), &values, None, &[])
2050            .expect("insert");
2051        match &result {
2052            crate::sql::bridge::ExecutionResult::RowsAffected(n) => assert_eq!(*n, 1),
2053            _ => panic!("Expected RowsAffected, got {:?}", result),
2054        }
2055
2056        // COMMIT
2057        let result = conn.commit().expect("commit");
2058        assert!(matches!(
2059            result,
2060            crate::sql::bridge::ExecutionResult::TransactionOk
2061        ));
2062
2063        // SELECT via decomposed SqlConnection::select
2064        let columns = vec!["name".into()];
2065        let result = conn
2066            .select("products", &columns, None, &[], None, None, &[])
2067            .expect("select");
2068        match result {
2069            crate::sql::bridge::ExecutionResult::Rows { columns, rows } => {
2070                assert!(!rows.is_empty(), "Expected rows from SELECT");
2071                assert_eq!(columns, vec!["name"]);
2072                // Verify data round-trips: rows are Vec<HashMap<String, SochValue>>
2073                let name = rows[0].get("name").expect("name column");
2074                match name {
2075                    CoreSochValue::Text(s) => assert_eq!(s, "Widget"),
2076                    other => panic!("Expected Text, got {:?}", other),
2077                }
2078            }
2079            other => panic!("Expected Rows, got {:?}", other),
2080        }
2081    }
2082
2083    #[test]
2084    fn test_integration_row_count() {
2085        use sochdb_storage::{DbColumnDef, DbColumnType, DbTableSchema};
2086        let (db, _tmp) = setup_test_db();
2087
2088        db.register_table(DbTableSchema {
2089            name: "metrics".into(),
2090            columns: vec![
2091                DbColumnDef {
2092                    name: "ts".into(),
2093                    col_type: DbColumnType::UInt64,
2094                    nullable: false,
2095                },
2096                DbColumnDef {
2097                    name: "val".into(),
2098                    col_type: DbColumnType::Float64,
2099                    nullable: false,
2100                },
2101            ],
2102        })
2103        .expect("register");
2104
2105        let txn = db.begin_transaction().expect("txn");
2106        for i in 0..10u64 {
2107            let mut v = std::collections::HashMap::new();
2108            v.insert("ts".into(), CoreSochValue::UInt(i));
2109            v.insert("val".into(), CoreSochValue::Float(i as f64));
2110            db.insert_row(txn, "metrics", i, &v).expect("insert");
2111        }
2112        db.commit(txn).expect("commit");
2113
2114        let backend = DatabaseStorageBackend::new(db.clone());
2115        assert_eq!(backend.row_count("metrics"), 10);
2116        assert_eq!(backend.row_count("nonexistent"), 0);
2117    }
2118
2119    // =======================================================================
2120    // Step 0d: End-to-end SQL text → SqlBridge → DatabaseSqlConnection tests
2121    // =======================================================================
2122
2123    #[test]
2124    fn test_sqlbridge_create_insert_select() {
2125        let (db, _tmp) = setup_test_db();
2126        let conn = DatabaseSqlConnection::new(db.clone());
2127        let mut bridge = crate::sql::bridge::SqlBridge::new(conn);
2128
2129        // CREATE TABLE via raw SQL
2130        let r = bridge
2131            .execute("CREATE TABLE cities (id INT, name TEXT, pop FLOAT)")
2132            .unwrap();
2133        assert!(matches!(r, crate::sql::bridge::ExecutionResult::Ok));
2134
2135        // INSERT via raw SQL
2136        let r = bridge
2137            .execute("INSERT INTO cities (id, name, pop) VALUES (1, 'Tokyo', 13.96)")
2138            .unwrap();
2139        match &r {
2140            crate::sql::bridge::ExecutionResult::RowsAffected(n) => assert_eq!(*n, 1),
2141            other => panic!("Expected RowsAffected, got {:?}", other),
2142        }
2143
2144        let r = bridge
2145            .execute("INSERT INTO cities (id, name, pop) VALUES (2, 'Delhi', 11.03)")
2146            .unwrap();
2147        match &r {
2148            crate::sql::bridge::ExecutionResult::RowsAffected(n) => assert_eq!(*n, 1),
2149            other => panic!("Expected RowsAffected, got {:?}", other),
2150        }
2151
2152        // SELECT via raw SQL
2153        let r = bridge.execute("SELECT name, pop FROM cities").unwrap();
2154        match &r {
2155            crate::sql::bridge::ExecutionResult::Rows { columns, rows } => {
2156                assert_eq!(rows.len(), 2);
2157                assert!(columns.contains(&"name".to_string()));
2158            }
2159            other => panic!("Expected Rows, got {:?}", other),
2160        }
2161    }
2162
2163    #[test]
2164    fn test_sqlbridge_update_delete() {
2165        let (db, _tmp) = setup_test_db();
2166        let conn = DatabaseSqlConnection::new(db.clone());
2167        let mut bridge = crate::sql::bridge::SqlBridge::new(conn);
2168
2169        bridge
2170            .execute("CREATE TABLE items (id INT, qty INT)")
2171            .unwrap();
2172        bridge
2173            .execute("INSERT INTO items (id, qty) VALUES (1, 10)")
2174            .unwrap();
2175        bridge
2176            .execute("INSERT INTO items (id, qty) VALUES (2, 20)")
2177            .unwrap();
2178        bridge
2179            .execute("INSERT INTO items (id, qty) VALUES (3, 30)")
2180            .unwrap();
2181
2182        // UPDATE
2183        let r = bridge
2184            .execute("UPDATE items SET qty = 99 WHERE id = 2")
2185            .unwrap();
2186        match &r {
2187            crate::sql::bridge::ExecutionResult::RowsAffected(n) => assert_eq!(*n, 1),
2188            other => panic!("Expected RowsAffected for UPDATE, got {:?}", other),
2189        }
2190
2191        // DELETE
2192        let r = bridge.execute("DELETE FROM items WHERE id = 3").unwrap();
2193        match &r {
2194            crate::sql::bridge::ExecutionResult::RowsAffected(n) => assert_eq!(*n, 1),
2195            other => panic!("Expected RowsAffected for DELETE, got {:?}", other),
2196        }
2197
2198        // Verify remaining data
2199        let r = bridge.execute("SELECT id, qty FROM items").unwrap();
2200        match &r {
2201            crate::sql::bridge::ExecutionResult::Rows { rows, .. } => {
2202                assert_eq!(rows.len(), 2, "Expected 2 rows after delete");
2203            }
2204            other => panic!("Expected Rows, got {:?}", other),
2205        }
2206    }
2207
2208    #[test]
2209    fn test_sqlbridge_transaction_commit() {
2210        let (db, _tmp) = setup_test_db();
2211        let conn = DatabaseSqlConnection::new(db.clone());
2212        let mut bridge = crate::sql::bridge::SqlBridge::new(conn);
2213
2214        bridge
2215            .execute("CREATE TABLE txtest (id INT, val TEXT)")
2216            .unwrap();
2217
2218        // BEGIN / INSERT / COMMIT
2219        let r = bridge.execute("BEGIN").unwrap();
2220        assert!(matches!(
2221            r,
2222            crate::sql::bridge::ExecutionResult::TransactionOk
2223        ));
2224
2225        bridge
2226            .execute("INSERT INTO txtest (id, val) VALUES (1, 'committed')")
2227            .unwrap();
2228
2229        let r = bridge.execute("COMMIT").unwrap();
2230        assert!(matches!(
2231            r,
2232            crate::sql::bridge::ExecutionResult::TransactionOk
2233        ));
2234
2235        // Data should be visible
2236        let r = bridge.execute("SELECT val FROM txtest").unwrap();
2237        match &r {
2238            crate::sql::bridge::ExecutionResult::Rows { rows, .. } => {
2239                assert_eq!(rows.len(), 1);
2240            }
2241            other => panic!("Expected Rows, got {:?}", other),
2242        }
2243    }
2244
2245    #[test]
2246    fn test_sqlbridge_drop_table() {
2247        let (db, _tmp) = setup_test_db();
2248        let conn = DatabaseSqlConnection::new(db.clone());
2249        let mut bridge = crate::sql::bridge::SqlBridge::new(conn);
2250
2251        bridge.execute("CREATE TABLE ephemeral (x INT)").unwrap();
2252        let r = bridge.execute("DROP TABLE ephemeral").unwrap();
2253        assert!(matches!(r, crate::sql::bridge::ExecutionResult::Ok));
2254
2255        // Verify DROP TABLE IF EXISTS also works
2256        let r = bridge.execute("DROP TABLE IF EXISTS ephemeral").unwrap();
2257        assert!(matches!(r, crate::sql::bridge::ExecutionResult::Ok));
2258    }
2259
2260    #[test]
2261    fn test_sqlbridge_if_not_exists() {
2262        let (db, _tmp) = setup_test_db();
2263        let conn = DatabaseSqlConnection::new(db.clone());
2264        let mut bridge = crate::sql::bridge::SqlBridge::new(conn);
2265
2266        bridge.execute("CREATE TABLE dup (id INT)").unwrap();
2267        // Should not error with IF NOT EXISTS
2268        let r = bridge
2269            .execute("CREATE TABLE IF NOT EXISTS dup (id INT)")
2270            .unwrap();
2271        assert!(matches!(r, crate::sql::bridge::ExecutionResult::Ok));
2272    }
2273
2274    #[test]
2275    fn test_sqlbridge_alter_add_column() {
2276        let (db, _tmp) = setup_test_db();
2277        let conn = DatabaseSqlConnection::new(db.clone());
2278        let mut bridge = crate::sql::bridge::SqlBridge::new(conn);
2279
2280        bridge
2281            .execute("CREATE TABLE alter_test (id INT, name TEXT)")
2282            .unwrap();
2283
2284        let r = bridge
2285            .execute("ALTER TABLE alter_test ADD COLUMN age INT")
2286            .unwrap();
2287        assert!(matches!(r, crate::sql::bridge::ExecutionResult::Ok));
2288
2289        // Verify schema was updated
2290        let schema = db.get_table_schema("alter_test").unwrap();
2291        assert_eq!(schema.columns.len(), 3);
2292        assert_eq!(schema.columns[2].name, "age");
2293    }
2294
2295    #[test]
2296    fn test_sqlbridge_alter_drop_column() {
2297        let (db, _tmp) = setup_test_db();
2298        let conn = DatabaseSqlConnection::new(db.clone());
2299        let mut bridge = crate::sql::bridge::SqlBridge::new(conn);
2300
2301        bridge
2302            .execute("CREATE TABLE drop_col_test (id INT, name TEXT, age INT)")
2303            .unwrap();
2304
2305        let r = bridge
2306            .execute("ALTER TABLE drop_col_test DROP COLUMN age")
2307            .unwrap();
2308        assert!(matches!(r, crate::sql::bridge::ExecutionResult::Ok));
2309
2310        let schema = db.get_table_schema("drop_col_test").unwrap();
2311        assert_eq!(schema.columns.len(), 2);
2312        assert!(schema.columns.iter().all(|c| c.name != "age"));
2313    }
2314
2315    #[test]
2316    fn test_sqlbridge_alter_rename_column() {
2317        let (db, _tmp) = setup_test_db();
2318        let conn = DatabaseSqlConnection::new(db.clone());
2319        let mut bridge = crate::sql::bridge::SqlBridge::new(conn);
2320
2321        bridge
2322            .execute("CREATE TABLE rename_col_test (id INT, name TEXT)")
2323            .unwrap();
2324
2325        let r = bridge
2326            .execute("ALTER TABLE rename_col_test RENAME COLUMN name TO full_name")
2327            .unwrap();
2328        assert!(matches!(r, crate::sql::bridge::ExecutionResult::Ok));
2329
2330        let schema = db.get_table_schema("rename_col_test").unwrap();
2331        assert_eq!(schema.columns[1].name, "full_name");
2332    }
2333
2334    #[test]
2335    fn test_sqlbridge_alter_rename_table() {
2336        let (db, _tmp) = setup_test_db();
2337        let conn = DatabaseSqlConnection::new(db.clone());
2338        let mut bridge = crate::sql::bridge::SqlBridge::new(conn);
2339
2340        bridge.execute("CREATE TABLE old_name (id INT)").unwrap();
2341
2342        let r = bridge
2343            .execute("ALTER TABLE old_name RENAME TO new_name")
2344            .unwrap();
2345        assert!(matches!(r, crate::sql::bridge::ExecutionResult::Ok));
2346
2347        assert!(db.get_table_schema("old_name").is_none());
2348        assert!(db.get_table_schema("new_name").is_some());
2349    }
2350
2351    #[test]
2352    fn test_sqlbridge_alter_multiple_ops() {
2353        let (db, _tmp) = setup_test_db();
2354        let conn = DatabaseSqlConnection::new(db.clone());
2355        let mut bridge = crate::sql::bridge::SqlBridge::new(conn);
2356
2357        bridge
2358            .execute("CREATE TABLE multi_alter (id INT, a TEXT, b TEXT)")
2359            .unwrap();
2360
2361        // Add column and drop column in one statement
2362        let r = bridge
2363            .execute("ALTER TABLE multi_alter ADD COLUMN c INT, DROP COLUMN b")
2364            .unwrap();
2365        assert!(matches!(r, crate::sql::bridge::ExecutionResult::Ok));
2366
2367        let schema = db.get_table_schema("multi_alter").unwrap();
2368        let col_names: Vec<&str> = schema.columns.iter().map(|c| c.name.as_str()).collect();
2369        assert_eq!(col_names, vec!["id", "a", "c"]);
2370    }
2371
2372    #[test]
2373    fn test_sqlbridge_alter_errors() {
2374        let (db, _tmp) = setup_test_db();
2375        let conn = DatabaseSqlConnection::new(db.clone());
2376        let mut bridge = crate::sql::bridge::SqlBridge::new(conn);
2377
2378        bridge
2379            .execute("CREATE TABLE err_test (id INT, name TEXT)")
2380            .unwrap();
2381
2382        // Add duplicate column
2383        let r = bridge.execute("ALTER TABLE err_test ADD COLUMN name TEXT");
2384        assert!(r.is_err());
2385
2386        // Drop non-existent column
2387        let r = bridge.execute("ALTER TABLE err_test DROP COLUMN nonexistent");
2388        assert!(r.is_err());
2389
2390        // Alter non-existent table
2391        let r = bridge.execute("ALTER TABLE no_such_table ADD COLUMN x INT");
2392        assert!(r.is_err());
2393    }
2394
2395    // =======================================================================
2396    // JOIN Tests (Task 3 — Phase 3)
2397    // =======================================================================
2398
2399    /// Helper: set up two tables for join testing (users + orders)
2400    fn setup_join_tables(bridge: &mut crate::sql::bridge::SqlBridge<DatabaseSqlConnection>) {
2401        bridge
2402            .execute("CREATE TABLE users (id INT, name TEXT, dept TEXT)")
2403            .unwrap();
2404        bridge
2405            .execute("INSERT INTO users (id, name, dept) VALUES (1, 'Alice', 'eng')")
2406            .unwrap();
2407        bridge
2408            .execute("INSERT INTO users (id, name, dept) VALUES (2, 'Bob', 'sales')")
2409            .unwrap();
2410        bridge
2411            .execute("INSERT INTO users (id, name, dept) VALUES (3, 'Carol', 'eng')")
2412            .unwrap();
2413
2414        bridge
2415            .execute("CREATE TABLE orders (oid INT, user_id INT, amount FLOAT)")
2416            .unwrap();
2417        bridge
2418            .execute("INSERT INTO orders (oid, user_id, amount) VALUES (10, 1, 99.50)")
2419            .unwrap();
2420        bridge
2421            .execute("INSERT INTO orders (oid, user_id, amount) VALUES (11, 1, 45.00)")
2422            .unwrap();
2423        bridge
2424            .execute("INSERT INTO orders (oid, user_id, amount) VALUES (12, 2, 200.00)")
2425            .unwrap();
2426        // Note: Carol (id=3) has no orders; order 12 belongs to Bob (id=2)
2427    }
2428
2429    #[test]
2430    fn test_sqlbridge_inner_join() {
2431        let (db, _tmp) = setup_test_db();
2432        let conn = DatabaseSqlConnection::new(db.clone());
2433        let mut bridge = crate::sql::bridge::SqlBridge::new(conn);
2434        setup_join_tables(&mut bridge);
2435
2436        let r = bridge.execute(
2437            "SELECT users.name, orders.amount FROM users INNER JOIN orders ON users.id = orders.user_id"
2438        ).unwrap();
2439
2440        let rows = r.rows().unwrap();
2441        assert_eq!(rows.len(), 3); // Alice×2, Bob×1
2442
2443        // Verify Alice appears twice
2444        let alice_rows: Vec<_> = rows
2445            .iter()
2446            .filter(|r| r.get("name") == Some(&CoreSochValue::Text("Alice".into())))
2447            .collect();
2448        assert_eq!(alice_rows.len(), 2);
2449
2450        // Verify Bob appears once
2451        let bob_rows: Vec<_> = rows
2452            .iter()
2453            .filter(|r| r.get("name") == Some(&CoreSochValue::Text("Bob".into())))
2454            .collect();
2455        assert_eq!(bob_rows.len(), 1);
2456
2457        // Carol should NOT appear (no matching orders)
2458        let carol_rows: Vec<_> = rows
2459            .iter()
2460            .filter(|r| r.get("name") == Some(&CoreSochValue::Text("Carol".into())))
2461            .collect();
2462        assert_eq!(carol_rows.len(), 0);
2463    }
2464
2465    #[test]
2466    fn test_sqlbridge_left_join() {
2467        let (db, _tmp) = setup_test_db();
2468        let conn = DatabaseSqlConnection::new(db.clone());
2469        let mut bridge = crate::sql::bridge::SqlBridge::new(conn);
2470        setup_join_tables(&mut bridge);
2471
2472        let r = bridge.execute(
2473            "SELECT users.name, orders.amount FROM users LEFT JOIN orders ON users.id = orders.user_id"
2474        ).unwrap();
2475
2476        let rows = r.rows().unwrap();
2477        assert_eq!(rows.len(), 4); // Alice×2, Bob×1, Carol×1(NULL)
2478
2479        // Carol appears with NULL amount
2480        let carol_rows: Vec<_> = rows
2481            .iter()
2482            .filter(|r| r.get("name") == Some(&CoreSochValue::Text("Carol".into())))
2483            .collect();
2484        assert_eq!(carol_rows.len(), 1);
2485        assert_eq!(carol_rows[0].get("amount"), Some(&CoreSochValue::Null));
2486    }
2487
2488    #[test]
2489    fn test_sqlbridge_right_join() {
2490        let (db, _tmp) = setup_test_db();
2491        let conn = DatabaseSqlConnection::new(db.clone());
2492        let mut bridge = crate::sql::bridge::SqlBridge::new(conn);
2493        setup_join_tables(&mut bridge);
2494
2495        // Right join: all orders appear; users without orders don't
2496        let r = bridge.execute(
2497            "SELECT users.name, orders.oid FROM users RIGHT JOIN orders ON users.id = orders.user_id"
2498        ).unwrap();
2499
2500        let rows = r.rows().unwrap();
2501        assert_eq!(rows.len(), 3); // All 3 orders match a user
2502    }
2503
2504    #[test]
2505    fn test_sqlbridge_cross_join() {
2506        let (db, _tmp) = setup_test_db();
2507        let conn = DatabaseSqlConnection::new(db.clone());
2508        let mut bridge = crate::sql::bridge::SqlBridge::new(conn);
2509        setup_join_tables(&mut bridge);
2510
2511        let r = bridge
2512            .execute("SELECT users.name, orders.oid FROM users CROSS JOIN orders")
2513            .unwrap();
2514
2515        let rows = r.rows().unwrap();
2516        assert_eq!(rows.len(), 9); // 3 users × 3 orders = 9
2517    }
2518
2519    #[test]
2520    fn test_sqlbridge_join_with_where() {
2521        let (db, _tmp) = setup_test_db();
2522        let conn = DatabaseSqlConnection::new(db.clone());
2523        let mut bridge = crate::sql::bridge::SqlBridge::new(conn);
2524        setup_join_tables(&mut bridge);
2525
2526        let r = bridge.execute(
2527            "SELECT users.name, orders.amount FROM users INNER JOIN orders ON users.id = orders.user_id WHERE orders.amount > 50"
2528        ).unwrap();
2529
2530        let rows = r.rows().unwrap();
2531        assert_eq!(rows.len(), 2); // Alice's 99.50 + Bob's 200.00
2532    }
2533
2534    #[test]
2535    fn test_sqlbridge_join_with_alias() {
2536        let (db, _tmp) = setup_test_db();
2537        let conn = DatabaseSqlConnection::new(db.clone());
2538        let mut bridge = crate::sql::bridge::SqlBridge::new(conn);
2539        setup_join_tables(&mut bridge);
2540
2541        let r = bridge
2542            .execute("SELECT u.name, o.amount FROM users u INNER JOIN orders o ON u.id = o.user_id")
2543            .unwrap();
2544
2545        let rows = r.rows().unwrap();
2546        assert_eq!(rows.len(), 3);
2547    }
2548
2549    #[test]
2550    fn test_sqlbridge_join_with_limit() {
2551        let (db, _tmp) = setup_test_db();
2552        let conn = DatabaseSqlConnection::new(db.clone());
2553        let mut bridge = crate::sql::bridge::SqlBridge::new(conn);
2554        setup_join_tables(&mut bridge);
2555
2556        let r = bridge.execute(
2557            "SELECT users.name, orders.oid FROM users INNER JOIN orders ON users.id = orders.user_id LIMIT 2"
2558        ).unwrap();
2559
2560        let rows = r.rows().unwrap();
2561        assert_eq!(rows.len(), 2);
2562    }
2563
2564    #[test]
2565    fn test_sqlbridge_join_three_tables() {
2566        let (db, _tmp) = setup_test_db();
2567        let conn = DatabaseSqlConnection::new(db.clone());
2568        let mut bridge = crate::sql::bridge::SqlBridge::new(conn);
2569        setup_join_tables(&mut bridge);
2570
2571        // Add a departments table
2572        bridge
2573            .execute("CREATE TABLE departments (code TEXT, dname TEXT)")
2574            .unwrap();
2575        bridge
2576            .execute("INSERT INTO departments (code, dname) VALUES ('eng', 'Engineering')")
2577            .unwrap();
2578        bridge
2579            .execute("INSERT INTO departments (code, dname) VALUES ('sales', 'Sales')")
2580            .unwrap();
2581
2582        let r = bridge.execute(
2583            "SELECT users.name, departments.dname FROM users INNER JOIN departments ON users.dept = departments.code"
2584        ).unwrap();
2585
2586        let rows = r.rows().unwrap();
2587        assert_eq!(rows.len(), 3); // All 3 users match a department
2588    }
2589
2590    #[test]
2591    fn test_namespaced_connection_prefixes_tables() {
2592        let ns_conn = NamespacedSqlConnection::new(MockConn::default(), "prod", "app");
2593        assert_eq!(ns_conn.prefix_table("users"), "prod:app:users");
2594        assert_eq!(ns_conn.prefix_table("posts"), "prod:app:posts");
2595        assert_eq!(ns_conn.namespace(), "prod");
2596        assert_eq!(ns_conn.database(), "app");
2597    }
2598
2599    #[test]
2600    fn test_namespaced_connection_isolates_data() {
2601        let (db, _tmp) = setup_test_db();
2602
2603        // Create two namespaced connections to the same underlying DB
2604        let conn_a =
2605            NamespacedSqlConnection::new(DatabaseSqlConnection::new(db.clone()), "tenant_a", "db1");
2606        let conn_b =
2607            NamespacedSqlConnection::new(DatabaseSqlConnection::new(db.clone()), "tenant_b", "db1");
2608
2609        let mut bridge_a = crate::sql::bridge::SqlBridge::new(conn_a);
2610        let mut bridge_b = crate::sql::bridge::SqlBridge::new(conn_b);
2611
2612        // Tenant A creates a table and inserts data
2613        bridge_a
2614            .execute("CREATE TABLE users (name TEXT, age INTEGER)")
2615            .unwrap();
2616        bridge_a
2617            .execute("INSERT INTO users (name, age) VALUES ('Alice', 30)")
2618            .unwrap();
2619
2620        // Tenant B creates the same table name and inserts different data
2621        bridge_b
2622            .execute("CREATE TABLE users (name TEXT, age INTEGER)")
2623            .unwrap();
2624        bridge_b
2625            .execute("INSERT INTO users (name, age) VALUES ('Bob', 25)")
2626            .unwrap();
2627
2628        // Tenant A can only see their own data
2629        let result_a = bridge_a.execute("SELECT * FROM users").unwrap();
2630        let rows_a = result_a.rows().unwrap();
2631        assert_eq!(rows_a.len(), 1);
2632
2633        // Tenant B can only see their own data
2634        let result_b = bridge_b.execute("SELECT * FROM users").unwrap();
2635        let rows_b = result_b.rows().unwrap();
2636        assert_eq!(rows_b.len(), 1);
2637    }
2638
2639    /// Minimal mock connection for prefix testing
2640    #[derive(Default)]
2641    struct MockConn;
2642    impl crate::sql::bridge::SqlConnection for MockConn {
2643        fn select(
2644            &self,
2645            _: &str,
2646            _: &[String],
2647            _: Option<&Expr>,
2648            _: &[OrderByItem],
2649            _: Option<usize>,
2650            _: Option<usize>,
2651            _: &[CoreSochValue],
2652        ) -> SqlResult<ExecutionResult> {
2653            Ok(ExecutionResult::Rows {
2654                columns: vec![],
2655                rows: vec![],
2656            })
2657        }
2658        fn insert(
2659            &mut self,
2660            _: &str,
2661            _: Option<&[String]>,
2662            _: &[Vec<Expr>],
2663            _: Option<&OnConflict>,
2664            _: &[CoreSochValue],
2665        ) -> SqlResult<ExecutionResult> {
2666            Ok(ExecutionResult::RowsAffected(0))
2667        }
2668        fn update(
2669            &mut self,
2670            _: &str,
2671            _: &[Assignment],
2672            _: Option<&Expr>,
2673            _: &[CoreSochValue],
2674        ) -> SqlResult<ExecutionResult> {
2675            Ok(ExecutionResult::RowsAffected(0))
2676        }
2677        fn delete(
2678            &mut self,
2679            _: &str,
2680            _: Option<&Expr>,
2681            _: &[CoreSochValue],
2682        ) -> SqlResult<ExecutionResult> {
2683            Ok(ExecutionResult::RowsAffected(0))
2684        }
2685        fn create_table(&mut self, _: &CreateTableStmt) -> SqlResult<ExecutionResult> {
2686            Ok(ExecutionResult::Ok)
2687        }
2688        fn drop_table(&mut self, _: &DropTableStmt) -> SqlResult<ExecutionResult> {
2689            Ok(ExecutionResult::Ok)
2690        }
2691        fn create_index(&mut self, _: &CreateIndexStmt) -> SqlResult<ExecutionResult> {
2692            Ok(ExecutionResult::Ok)
2693        }
2694        fn drop_index(&mut self, _: &DropIndexStmt) -> SqlResult<ExecutionResult> {
2695            Ok(ExecutionResult::Ok)
2696        }
2697        fn alter_table(&mut self, _: &AlterTableStmt) -> SqlResult<ExecutionResult> {
2698            Ok(ExecutionResult::Ok)
2699        }
2700        fn begin(&mut self, _: &BeginStmt) -> SqlResult<ExecutionResult> {
2701            Ok(ExecutionResult::TransactionOk)
2702        }
2703        fn commit(&mut self) -> SqlResult<ExecutionResult> {
2704            Ok(ExecutionResult::TransactionOk)
2705        }
2706        fn rollback(&mut self, _: Option<&str>) -> SqlResult<ExecutionResult> {
2707            Ok(ExecutionResult::TransactionOk)
2708        }
2709        fn table_exists(&self, _: &str) -> SqlResult<bool> {
2710            Ok(false)
2711        }
2712        fn index_exists(&self, _: &str) -> SqlResult<bool> {
2713            Ok(false)
2714        }
2715        fn scan_all(
2716            &self,
2717            _: &str,
2718            _: &[String],
2719        ) -> SqlResult<Vec<HashMap<String, CoreSochValue>>> {
2720            Ok(vec![])
2721        }
2722        fn eval_join_predicate(
2723            &self,
2724            _: &Expr,
2725            _: &HashMap<String, CoreSochValue>,
2726            _: &[CoreSochValue],
2727        ) -> Option<bool> {
2728            Some(true)
2729        }
2730    }
2731}