vibesql_executor/
session.rs

1//! Session - Prepared Statement Execution Context
2//!
3//! Provides a session-based API for prepared statement execution, offering
4//! significant performance benefits for repeated query patterns by caching
5//! parsed SQL and performing AST-level parameter binding.
6//!
7//! ## Performance Benefits
8//!
9//! For repeated queries, prepared statements can provide 10-100x speedup by:
10//! - Parsing SQL once and caching the AST
11//! - Binding parameters at the AST level (no re-parsing)
12//! - Reusing the same cached statement for different parameter values
13//!
14//! ## Usage
15//!
16//! ```text
17//! use vibesql_executor::Session;
18//! use vibesql_storage::Database;
19//! use vibesql_types::SqlValue;
20//!
21//! let mut db = Database::new();
22//! // ... create tables and insert data ...
23//!
24//! // Create a session for prepared statement execution
25//! let session = Session::new(&db);
26//!
27//! // Prepare a statement (parses SQL once)
28//! let stmt = session.prepare("SELECT * FROM users WHERE id = ?")?;
29//!
30//! // Execute with different parameters (reuses parsed AST - no re-parsing!)
31//! let result1 = session.execute_prepared(&stmt, &[SqlValue::Integer(1)])?;
32//! let result2 = session.execute_prepared(&stmt, &[SqlValue::Integer(2)])?;
33//!
34//! // For DML statements that modify data, use execute_prepared_mut
35//! let mut session = Session::new(&mut db);
36//! let insert_stmt = session.prepare("INSERT INTO users (id, name) VALUES (?, ?)")?;
37//! session.execute_prepared_mut(&insert_stmt, &[SqlValue::Integer(3), SqlValue::Varchar(arcstr::ArcStr::from("Alice"))])?;
38//! ```
39//!
40//! ## Thread Safety
41//!
42//! `PreparedStatement` is `Clone` and can be shared across threads via `Arc`.
43//! The `PreparedStatementCache` uses internal locking for thread-safe access.
44
45use std::sync::Arc;
46
47use vibesql_ast::Statement;
48use vibesql_storage::{Database, Row};
49use vibesql_types::SqlValue;
50
51use crate::{
52    cache::{
53        ArenaParseError, ArenaPreparedStatement, CachedPlan, PkPointLookupPlan, PreparedStatement,
54        PreparedStatementCache, PreparedStatementError, ProjectionPlan, ResolvedProjection,
55    },
56    errors::ExecutorError,
57    DeleteExecutor, InsertExecutor, SelectExecutor, SelectResult, UpdateExecutor,
58};
59
60/// Execution result for prepared statements
61#[derive(Debug)]
62pub enum PreparedExecutionResult {
63    /// Result from a SELECT query
64    Select(SelectResult),
65    /// Number of rows affected by INSERT/UPDATE/DELETE
66    RowsAffected(usize),
67    /// DDL or other statement that doesn't return rows
68    Ok,
69}
70
71impl PreparedExecutionResult {
72    /// Get rows if this is a SELECT result
73    pub fn rows(&self) -> Option<&[Row]> {
74        match self {
75            PreparedExecutionResult::Select(result) => Some(&result.rows),
76            _ => None,
77        }
78    }
79
80    /// Get rows affected if this is a DML result
81    pub fn rows_affected(&self) -> Option<usize> {
82        match self {
83            PreparedExecutionResult::RowsAffected(n) => Some(*n),
84            _ => None,
85        }
86    }
87
88    /// Convert to SelectResult if this is a SELECT result
89    pub fn into_select_result(self) -> Option<SelectResult> {
90        match self {
91            PreparedExecutionResult::Select(result) => Some(result),
92            _ => None,
93        }
94    }
95}
96
97/// Error type for session operations
98#[derive(Debug)]
99pub enum SessionError {
100    /// Error during prepared statement operations
101    PreparedStatement(PreparedStatementError),
102    /// Error during query execution
103    Execution(ExecutorError),
104    /// Statement type not supported for this operation
105    UnsupportedStatement(String),
106}
107
108impl std::fmt::Display for SessionError {
109    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
110        match self {
111            SessionError::PreparedStatement(e) => write!(f, "Prepared statement error: {}", e),
112            SessionError::Execution(e) => write!(f, "Execution error: {:?}", e),
113            SessionError::UnsupportedStatement(msg) => write!(f, "Unsupported statement: {}", msg),
114        }
115    }
116}
117
118impl std::error::Error for SessionError {}
119
120impl From<PreparedStatementError> for SessionError {
121    fn from(e: PreparedStatementError) -> Self {
122        SessionError::PreparedStatement(e)
123    }
124}
125
126impl From<ExecutorError> for SessionError {
127    fn from(e: ExecutorError) -> Self {
128        SessionError::Execution(e)
129    }
130}
131
132/// Session for executing prepared statements
133///
134/// A session holds a reference to the database and a cache of prepared statements.
135/// Use this for executing repeated queries with different parameters.
136pub struct Session<'a> {
137    db: &'a Database,
138    cache: Arc<PreparedStatementCache>,
139}
140
141impl<'a> Session<'a> {
142    /// Create a new session with a reference to the database
143    ///
144    /// Uses a default cache size of 1000 prepared statements.
145    pub fn new(db: &'a Database) -> Self {
146        Self { db, cache: Arc::new(PreparedStatementCache::default_cache()) }
147    }
148
149    /// Create a new session with a custom cache size
150    pub fn with_cache_size(db: &'a Database, cache_size: usize) -> Self {
151        Self { db, cache: Arc::new(PreparedStatementCache::new(cache_size)) }
152    }
153
154    /// Create a new session with a shared cache
155    ///
156    /// This allows multiple sessions to share the same prepared statement cache,
157    /// which is useful for connection pooling scenarios.
158    pub fn with_shared_cache(db: &'a Database, cache: Arc<PreparedStatementCache>) -> Self {
159        Self { db, cache }
160    }
161
162    /// Get the underlying database reference
163    pub fn database(&self) -> &Database {
164        self.db
165    }
166
167    /// Get the prepared statement cache
168    pub fn cache(&self) -> &PreparedStatementCache {
169        &self.cache
170    }
171
172    /// Get the shared cache Arc (for sharing with other sessions)
173    pub fn shared_cache(&self) -> Arc<PreparedStatementCache> {
174        Arc::clone(&self.cache)
175    }
176
177    /// Prepare a SQL statement for execution
178    ///
179    /// Parses the SQL and caches the result. Subsequent calls with the same
180    /// SQL string will return the cached statement without re-parsing.
181    ///
182    /// Supports `?` placeholders for parameter binding.
183    ///
184    /// # Example
185    ///
186    /// ```text
187    /// let stmt = session.prepare("SELECT * FROM users WHERE id = ?")?;
188    /// assert_eq!(stmt.param_count(), 1);
189    /// ```
190    pub fn prepare(&self, sql: &str) -> Result<Arc<PreparedStatement>, SessionError> {
191        self.cache.get_or_prepare(sql).map_err(SessionError::from)
192    }
193
194    /// Prepare a SQL SELECT statement using arena allocation
195    ///
196    /// This is optimized for SELECT statements and provides better cache locality.
197    /// Arena-based statements store the parsed AST in contiguous memory, which
198    /// can improve performance for frequently executed queries.
199    ///
200    /// For non-SELECT statements, this will return an error - use `prepare()` instead.
201    ///
202    /// # Performance Benefits
203    ///
204    /// Arena allocation provides:
205    /// - Better cache locality (contiguous memory layout)
206    /// - Lower allocation overhead (single arena vs multiple heap allocations)
207    /// - Potential for zero-copy parameter binding in future phases
208    ///
209    /// # Example
210    ///
211    /// ```text
212    /// let stmt = session.prepare_arena("SELECT * FROM users WHERE id = ?")?;
213    /// assert_eq!(stmt.param_count(), 1);
214    /// ```
215    pub fn prepare_arena(&self, sql: &str) -> Result<Arc<ArenaPreparedStatement>, ArenaParseError> {
216        self.cache.get_or_prepare_arena(sql)
217    }
218
219    /// Execute a prepared SELECT statement with parameters
220    ///
221    /// Binds the parameters to the prepared statement and executes it.
222    /// This is the fast path for repeated queries - no SQL parsing occurs.
223    ///
224    /// For simple PK point lookups, uses cached execution plan to bypass
225    /// the full query execution pipeline.
226    ///
227    /// # Example
228    ///
229    /// ```text
230    /// let stmt = session.prepare("SELECT * FROM users WHERE id = ?")?;
231    /// let result = session.execute_prepared(&stmt, &[SqlValue::Integer(42)])?;
232    /// ```
233    pub fn execute_prepared(
234        &self,
235        stmt: &PreparedStatement,
236        params: &[SqlValue],
237    ) -> Result<PreparedExecutionResult, SessionError> {
238        // Check parameter count first
239        if params.len() != stmt.param_count() {
240            return Err(SessionError::PreparedStatement(
241                PreparedStatementError::ParameterCountMismatch {
242                    expected: stmt.param_count(),
243                    actual: params.len(),
244                },
245            ));
246        }
247
248        // Try fast-path execution using cached plan
249        match stmt.cached_plan() {
250            CachedPlan::PkPointLookup(plan) => {
251                if let Some(result) = self.try_execute_pk_lookup(plan, params)? {
252                    return Ok(result);
253                }
254                // Fall through to standard execution if fast path fails
255            }
256            CachedPlan::SimpleFastPath(plan) => {
257                // SimpleFastPath caches both the result of is_simple_point_query()
258                // AND the column names derived from the SELECT list (#3780)
259                let bound_stmt = stmt.bind(params)?;
260                if let Statement::Select(select_stmt) = &bound_stmt {
261                    let executor = SelectExecutor::new(self.db);
262
263                    // Use cached column names if available, otherwise derive and cache them
264                    let columns = plan.get_or_resolve_columns(|| {
265                        executor.derive_fast_path_column_names(select_stmt).ok()
266                    });
267
268                    match columns {
269                        Some(cached_columns) => {
270                            // Fast path: use cached column names
271                            let rows = executor.execute_fast_path(select_stmt)?;
272                            return Ok(PreparedExecutionResult::Select(SelectResult {
273                                columns: cached_columns.iter().cloned().collect(),
274                                rows,
275                            }));
276                        }
277                        None => {
278                            // First execution or resolution failed: derive columns
279                            let result = executor.execute_fast_path_with_columns(select_stmt)?;
280                            return Ok(PreparedExecutionResult::Select(result));
281                        }
282                    }
283                }
284                // Fall through for non-SELECT (shouldn't happen for SimpleFastPath)
285            }
286            CachedPlan::PkDelete(_) => {
287                // DELETE requires mutable access, use execute_prepared_mut instead
288                // Fall through to standard execution which will return an error
289            }
290            CachedPlan::Standard => {
291                // Fall through to standard execution
292            }
293        }
294
295        // Bind parameters to get executable statement
296        let bound_stmt = stmt.bind(params)?;
297
298        // Execute based on statement type
299        self.execute_statement(&bound_stmt)
300    }
301
302    /// Try to execute a PK point lookup using the cached plan
303    ///
304    /// Returns `Ok(Some(result))` if execution succeeded via fast path,
305    /// `Ok(None)` if we need to fall back to standard execution,
306    /// or `Err` if a real error occurred.
307    fn try_execute_pk_lookup(
308        &self,
309        plan: &PkPointLookupPlan,
310        params: &[SqlValue],
311    ) -> Result<Option<PreparedExecutionResult>, SessionError> {
312        // Get the table
313        let table = match self.db.get_table(&plan.table_name) {
314            Some(t) => t,
315            None => return Ok(None), // Table doesn't exist or it's a view - fall back
316        };
317
318        // Verify PK columns match what we expected
319        let actual_pk_columns = match &table.schema.primary_key {
320            Some(cols) if cols.len() == plan.pk_columns.len() => cols,
321            _ => return Ok(None), // PK structure changed - fall back
322        };
323
324        // Verify column mappings and extract parameter values
325        // Use borrowing to avoid cloning when possible
326        for (param_idx, pk_col_idx) in &plan.param_to_pk_col {
327            if *param_idx >= params.len() || *pk_col_idx >= plan.pk_columns.len() {
328                return Ok(None); // Invalid mapping - fall back
329            }
330
331            // Verify the column name still matches
332            let expected_col = &plan.pk_columns[*pk_col_idx];
333            let actual_col = &actual_pk_columns[*pk_col_idx];
334            if !expected_col.eq_ignore_ascii_case(actual_col) {
335                return Ok(None); // Column mismatch - fall back
336            }
337        }
338
339        // Get or resolve projection info (cached after first execution)
340        let resolved = match plan
341            .get_or_resolve(|proj| self.resolve_projection(proj, &table.schema.columns))
342        {
343            Some(r) => r,
344            None => return Ok(None), // Resolution failed - fall back
345        };
346
347        // Perform the PK lookup using borrowed parameter
348        // For single-column PK, we can pass the reference directly
349        let row = if plan.param_to_pk_col.len() == 1 {
350            let (param_idx, _) = plan.param_to_pk_col[0];
351            self.db
352                .get_row_by_pk(&plan.table_name, &params[param_idx])
353                .map_err(|e| SessionError::Execution(ExecutorError::StorageError(e.to_string())))?
354        } else {
355            // For composite PK, we need to collect values
356            let pk_values: Vec<SqlValue> = plan
357                .param_to_pk_col
358                .iter()
359                .map(|(param_idx, _)| params[*param_idx].clone())
360                .collect();
361            self.db
362                .get_row_by_composite_pk(&plan.table_name, &pk_values)
363                .map_err(|e| SessionError::Execution(ExecutorError::StorageError(e.to_string())))?
364        };
365
366        // Build result with cached column names
367        let columns: Vec<String> = resolved.column_names.iter().cloned().collect();
368
369        let rows = match row {
370            Some(r) => {
371                // Project directly from source row without cloning entire row first
372                if resolved.column_indices.is_empty() {
373                    // Wildcard - clone entire row
374                    vec![r.clone()]
375                } else {
376                    // Specific columns - only clone needed values
377                    let projected_values: Vec<SqlValue> =
378                        resolved.column_indices.iter().map(|&i| r.values[i].clone()).collect();
379                    vec![Row::new(projected_values)]
380                }
381            }
382            None => vec![],
383        };
384
385        Ok(Some(PreparedExecutionResult::Select(SelectResult { columns, rows })))
386    }
387
388    /// Resolve projection plan to column indices and names
389    ///
390    /// This is called once per plan and the result is cached.
391    fn resolve_projection(
392        &self,
393        proj: &ProjectionPlan,
394        schema_columns: &[vibesql_catalog::ColumnSchema],
395    ) -> Option<ResolvedProjection> {
396        match proj {
397            ProjectionPlan::Wildcard => {
398                // For wildcard, indices are empty (we clone entire row)
399                // but we cache the column names
400                let column_names: Arc<[String]> =
401                    schema_columns.iter().map(|c| c.name.clone()).collect();
402                Some(ResolvedProjection { column_indices: vec![], column_names })
403            }
404            ProjectionPlan::Columns(projections) => {
405                let mut col_indices = Vec::with_capacity(projections.len());
406                let mut column_names = Vec::with_capacity(projections.len());
407
408                for proj in projections {
409                    let idx = schema_columns
410                        .iter()
411                        .position(|c| c.name.eq_ignore_ascii_case(&proj.column_name))?;
412
413                    col_indices.push(idx);
414                    column_names
415                        .push(proj.alias.clone().unwrap_or_else(|| proj.column_name.clone()));
416                }
417
418                Some(ResolvedProjection {
419                    column_indices: col_indices,
420                    column_names: column_names.into(),
421                })
422            }
423        }
424    }
425
426    /// Execute a bound statement (internal helper)
427    fn execute_statement(&self, stmt: &Statement) -> Result<PreparedExecutionResult, SessionError> {
428        match stmt {
429            Statement::Select(select_stmt) => {
430                let executor = SelectExecutor::new(self.db);
431                let result = executor.execute_with_columns(select_stmt)?;
432                Ok(PreparedExecutionResult::Select(result))
433            }
434            _ => Err(SessionError::UnsupportedStatement(
435                "Only SELECT is supported for read-only sessions. Use SessionMut for DML.".into(),
436            )),
437        }
438    }
439}
440
441/// Mutable session for executing prepared statements that modify data
442///
443/// Use this session type when you need to execute INSERT, UPDATE, or DELETE
444/// statements in addition to SELECT.
445pub struct SessionMut<'a> {
446    db: &'a mut Database,
447    cache: Arc<PreparedStatementCache>,
448}
449
450impl<'a> SessionMut<'a> {
451    /// Create a new mutable session with a reference to the database
452    pub fn new(db: &'a mut Database) -> Self {
453        Self { db, cache: Arc::new(PreparedStatementCache::default_cache()) }
454    }
455
456    /// Create a new mutable session with a custom cache size
457    pub fn with_cache_size(db: &'a mut Database, cache_size: usize) -> Self {
458        Self { db, cache: Arc::new(PreparedStatementCache::new(cache_size)) }
459    }
460
461    /// Create a new mutable session with a shared cache
462    pub fn with_shared_cache(db: &'a mut Database, cache: Arc<PreparedStatementCache>) -> Self {
463        Self { db, cache }
464    }
465
466    /// Get the underlying database reference (immutable)
467    pub fn database(&self) -> &Database {
468        self.db
469    }
470
471    /// Get the underlying database reference (mutable)
472    pub fn database_mut(&mut self) -> &mut Database {
473        self.db
474    }
475
476    /// Get the prepared statement cache
477    pub fn cache(&self) -> &PreparedStatementCache {
478        &self.cache
479    }
480
481    /// Get the shared cache Arc
482    pub fn shared_cache(&self) -> Arc<PreparedStatementCache> {
483        Arc::clone(&self.cache)
484    }
485
486    /// Prepare a SQL statement for execution
487    pub fn prepare(&self, sql: &str) -> Result<Arc<PreparedStatement>, SessionError> {
488        self.cache.get_or_prepare(sql).map_err(SessionError::from)
489    }
490
491    /// Prepare a SQL SELECT statement using arena allocation
492    ///
493    /// See [`Session::prepare_arena`] for details.
494    pub fn prepare_arena(&self, sql: &str) -> Result<Arc<ArenaPreparedStatement>, ArenaParseError> {
495        self.cache.get_or_prepare_arena(sql)
496    }
497
498    /// Execute a prepared statement with parameters (read-only)
499    ///
500    /// Use this for SELECT queries.
501    pub fn execute_prepared(
502        &self,
503        stmt: &PreparedStatement,
504        params: &[SqlValue],
505    ) -> Result<PreparedExecutionResult, SessionError> {
506        let bound_stmt = stmt.bind(params)?;
507        self.execute_statement_readonly(&bound_stmt)
508    }
509
510    /// Execute a prepared statement with parameters (read-write)
511    ///
512    /// Use this for INSERT, UPDATE, DELETE statements.
513    pub fn execute_prepared_mut(
514        &mut self,
515        stmt: &PreparedStatement,
516        params: &[SqlValue],
517    ) -> Result<PreparedExecutionResult, SessionError> {
518        // Try fast-path execution using cached plan
519        if let CachedPlan::PkDelete(plan) = stmt.cached_plan() {
520            if let Some(result) = self.try_execute_pk_delete(plan, params)? {
521                return Ok(result);
522            }
523            // Fall through to standard execution if fast path fails
524        }
525
526        let bound_stmt = stmt.bind(params)?;
527        self.execute_statement_mut(&bound_stmt)
528    }
529
530    /// Try to execute a PK delete using the cached plan
531    ///
532    /// Returns `Ok(Some(result))` if execution succeeded via fast path,
533    /// `Ok(None)` if we need to fall back to standard execution.
534    fn try_execute_pk_delete(
535        &mut self,
536        plan: &crate::cache::PkDeletePlan,
537        params: &[SqlValue],
538    ) -> Result<Option<PreparedExecutionResult>, SessionError> {
539        // Check cached validation first (fast path for repeated executions)
540        if let Some(valid) = plan.is_fast_path_valid() {
541            if !valid {
542                return Ok(None); // Cached as invalid, fall back immediately
543            }
544            // Cached as valid, skip expensive checks and execute directly
545        } else {
546            // Not cached yet - do the expensive validation and cache result
547            let valid = self.validate_delete_fast_path(plan);
548            plan.set_fast_path_valid(valid);
549            if !valid {
550                return Ok(None);
551            }
552        }
553
554        // Build PK values from parameters
555        let pk_values = plan.build_pk_values(params);
556
557        // Execute the fast delete
558        match self.db.delete_by_pk_fast(&plan.table_name, &pk_values) {
559            Ok(deleted) => {
560                let rows_affected = if deleted { 1 } else { 0 };
561                // Track changes count for changes() and total_changes() functions
562                self.db.set_last_changes_count(rows_affected);
563                self.db.increment_total_changes_count(rows_affected);
564                Ok(Some(PreparedExecutionResult::RowsAffected(rows_affected)))
565            }
566            Err(_) => Ok(None), // Fall back to standard path on error
567        }
568    }
569
570    /// Validate whether fast delete path can be used for this table
571    /// This is expensive (iterates triggers and FKs) so result is cached
572    fn validate_delete_fast_path(&self, plan: &crate::cache::PkDeletePlan) -> bool {
573        // Check for triggers - if any exist, we must use standard path
574        let has_triggers = self
575            .db
576            .catalog
577            .get_triggers_for_table(&plan.table_name, Some(vibesql_ast::TriggerEvent::Delete))
578            .next()
579            .is_some();
580
581        if has_triggers {
582            return false;
583        }
584
585        // Check for referencing FKs - if any exist, we must use standard path
586        let schema = match self.db.catalog.get_table(&plan.table_name) {
587            Some(s) => s,
588            None => return false, // Table not found
589        };
590
591        let has_pk = schema.get_primary_key_indices().is_some();
592        if has_pk {
593            let has_referencing_fks = self.db.catalog.list_tables().iter().any(|t| {
594                self.db
595                    .catalog
596                    .get_table(t)
597                    .map(|s| {
598                        s.foreign_keys
599                            .iter()
600                            .any(|fk| fk.parent_table.eq_ignore_ascii_case(&plan.table_name))
601                    })
602                    .unwrap_or(false)
603            });
604
605            if has_referencing_fks {
606                return false;
607            }
608        }
609
610        true // No blockers, fast path is valid
611    }
612
613    /// Execute a read-only statement
614    fn execute_statement_readonly(
615        &self,
616        stmt: &Statement,
617    ) -> Result<PreparedExecutionResult, SessionError> {
618        match stmt {
619            Statement::Select(select_stmt) => {
620                let executor = SelectExecutor::new(self.db);
621                let result = executor.execute_with_columns(select_stmt)?;
622                Ok(PreparedExecutionResult::Select(result))
623            }
624            _ => Err(SessionError::UnsupportedStatement(
625                "Use execute_prepared_mut for DML statements".into(),
626            )),
627        }
628    }
629
630    /// Execute a statement that may modify data
631    fn execute_statement_mut(
632        &mut self,
633        stmt: &Statement,
634    ) -> Result<PreparedExecutionResult, SessionError> {
635        match stmt {
636            Statement::Select(select_stmt) => {
637                let executor = SelectExecutor::new(self.db);
638                let result = executor.execute_with_columns(select_stmt)?;
639                Ok(PreparedExecutionResult::Select(result))
640            }
641            Statement::Insert(insert_stmt) => {
642                let rows_affected = InsertExecutor::execute(self.db, insert_stmt)?;
643                // Track changes count for changes() and total_changes() functions
644                self.db.set_last_changes_count(rows_affected);
645                self.db.increment_total_changes_count(rows_affected);
646                // Note: We don't invalidate prepared statement cache for DML operations.
647                // Prepared statements (parsed AST) don't depend on data values.
648                // Only schema changes (DDL) require cache invalidation.
649                // Query result caches are handled separately by IntegrationCache.
650                Ok(PreparedExecutionResult::RowsAffected(rows_affected))
651            }
652            Statement::Update(update_stmt) => {
653                let rows_affected = UpdateExecutor::execute(update_stmt, self.db)?;
654                // Track changes count for changes() and total_changes() functions
655                self.db.set_last_changes_count(rows_affected);
656                self.db.increment_total_changes_count(rows_affected);
657                // Note: We don't invalidate prepared statement cache for DML operations.
658                // Prepared statements (parsed AST) don't depend on data values.
659                // Only schema changes (DDL) require cache invalidation.
660                // Query result caches are handled separately by IntegrationCache.
661                Ok(PreparedExecutionResult::RowsAffected(rows_affected))
662            }
663            Statement::Delete(delete_stmt) => {
664                let rows_affected = DeleteExecutor::execute(delete_stmt, self.db)?;
665                // Track changes count for changes() and total_changes() functions
666                self.db.set_last_changes_count(rows_affected);
667                self.db.increment_total_changes_count(rows_affected);
668                // Note: We don't invalidate prepared statement cache for DML operations.
669                // Prepared statements (parsed AST) don't depend on data values.
670                // Only schema changes (DDL) require cache invalidation.
671                // Query result caches are handled separately by IntegrationCache.
672                Ok(PreparedExecutionResult::RowsAffected(rows_affected))
673            }
674            _ => Err(SessionError::UnsupportedStatement(format!(
675                "Statement type {:?} not supported for prepared execution",
676                std::mem::discriminant(stmt)
677            ))),
678        }
679    }
680}
681
682#[cfg(test)]
683mod tests {
684    use vibesql_catalog::{ColumnSchema, TableSchema};
685    use vibesql_types::DataType;
686
687    use super::*;
688
689    fn create_test_db() -> Database {
690        let mut db = Database::new();
691        // Enable case-insensitive identifiers (default MySQL behavior)
692        db.catalog.set_case_sensitive_identifiers(false);
693
694        // Create users table
695        let columns = vec![
696            ColumnSchema::new("id".to_string(), DataType::Integer, false),
697            ColumnSchema::new(
698                "name".to_string(),
699                DataType::Varchar { max_length: Some(100) },
700                true,
701            ),
702        ];
703        let schema =
704            TableSchema::with_primary_key("users".to_string(), columns, vec!["id".to_string()]);
705        db.create_table(schema).unwrap();
706
707        // Insert test data
708        let row1 =
709            Row::new(vec![SqlValue::Integer(1), SqlValue::Varchar(arcstr::ArcStr::from("Alice"))]);
710        let row2 =
711            Row::new(vec![SqlValue::Integer(2), SqlValue::Varchar(arcstr::ArcStr::from("Bob"))]);
712        let row3 = Row::new(vec![
713            SqlValue::Integer(3),
714            SqlValue::Varchar(arcstr::ArcStr::from("Charlie")),
715        ]);
716
717        db.insert_row("users", row1).unwrap();
718        db.insert_row("users", row2).unwrap();
719        db.insert_row("users", row3).unwrap();
720
721        db
722    }
723
724    #[test]
725    fn test_session_prepare() {
726        let db = create_test_db();
727        let session = Session::new(&db);
728
729        let stmt = session.prepare("SELECT * FROM users WHERE id = ?").unwrap();
730        assert_eq!(stmt.param_count(), 1);
731    }
732
733    #[test]
734    fn test_session_execute_prepared() {
735        let db = create_test_db();
736        let session = Session::new(&db);
737
738        let stmt = session.prepare("SELECT * FROM users WHERE id = ?").unwrap();
739
740        // Execute with id = 1
741        let result = session.execute_prepared(&stmt, &[SqlValue::Integer(1)]).unwrap();
742
743        if let PreparedExecutionResult::Select(select_result) = result {
744            assert_eq!(select_result.rows.len(), 1);
745            assert_eq!(select_result.rows[0].values[0], SqlValue::Integer(1));
746            assert_eq!(
747                select_result.rows[0].values[1],
748                SqlValue::Varchar(arcstr::ArcStr::from("Alice"))
749            );
750        } else {
751            panic!("Expected Select result");
752        }
753    }
754
755    #[test]
756    fn test_session_reuse_prepared() {
757        let db = create_test_db();
758        let session = Session::new(&db);
759
760        let stmt = session.prepare("SELECT * FROM users WHERE id = ?").unwrap();
761
762        // Execute multiple times with different parameters
763        let result1 = session.execute_prepared(&stmt, &[SqlValue::Integer(1)]).unwrap();
764        let result2 = session.execute_prepared(&stmt, &[SqlValue::Integer(2)]).unwrap();
765        let result3 = session.execute_prepared(&stmt, &[SqlValue::Integer(3)]).unwrap();
766
767        // Verify each returned the correct row
768        assert_eq!(
769            result1.rows().unwrap()[0].values[1],
770            SqlValue::Varchar(arcstr::ArcStr::from("Alice"))
771        );
772        assert_eq!(
773            result2.rows().unwrap()[0].values[1],
774            SqlValue::Varchar(arcstr::ArcStr::from("Bob"))
775        );
776        assert_eq!(
777            result3.rows().unwrap()[0].values[1],
778            SqlValue::Varchar(arcstr::ArcStr::from("Charlie"))
779        );
780
781        // Verify cache was used (should have 1 miss, then hits)
782        let stats = session.cache().stats();
783        assert_eq!(stats.misses, 1);
784        // hits is always >= 0 since it's a u64, just ensure it exists
785        let _hits = stats.hits;
786    }
787
788    #[test]
789    fn test_session_param_count_mismatch() {
790        let db = create_test_db();
791        let session = Session::new(&db);
792
793        let stmt = session.prepare("SELECT * FROM users WHERE id = ?").unwrap();
794
795        // Try to execute with wrong number of parameters
796        let result = session.execute_prepared(&stmt, &[]);
797        assert!(result.is_err());
798
799        let result = session.execute_prepared(&stmt, &[SqlValue::Integer(1), SqlValue::Integer(2)]);
800        assert!(result.is_err());
801    }
802
803    #[test]
804    fn test_session_mut_insert() {
805        let mut db = create_test_db();
806        let mut session = SessionMut::new(&mut db);
807
808        let stmt = session.prepare("INSERT INTO users (id, name) VALUES (?, ?)").unwrap();
809
810        let result = session
811            .execute_prepared_mut(
812                &stmt,
813                &[SqlValue::Integer(4), SqlValue::Varchar(arcstr::ArcStr::from("David"))],
814            )
815            .unwrap();
816
817        assert_eq!(result.rows_affected(), Some(1));
818
819        // Verify the row was inserted
820        let select_stmt = session.prepare("SELECT * FROM users WHERE id = ?").unwrap();
821        let select_result =
822            session.execute_prepared(&select_stmt, &[SqlValue::Integer(4)]).unwrap();
823
824        assert_eq!(select_result.rows().unwrap().len(), 1);
825        assert_eq!(
826            select_result.rows().unwrap()[0].values[1],
827            SqlValue::Varchar(arcstr::ArcStr::from("David"))
828        );
829    }
830
831    #[test]
832    fn test_session_mut_update() {
833        let mut db = create_test_db();
834        let mut session = SessionMut::new(&mut db);
835
836        let stmt = session.prepare("UPDATE users SET name = ? WHERE id = ?").unwrap();
837
838        let result = session
839            .execute_prepared_mut(
840                &stmt,
841                &[SqlValue::Varchar(arcstr::ArcStr::from("Alicia")), SqlValue::Integer(1)],
842            )
843            .unwrap();
844
845        assert_eq!(result.rows_affected(), Some(1));
846
847        // Verify the row was updated
848        let select_stmt = session.prepare("SELECT * FROM users WHERE id = ?").unwrap();
849        let select_result =
850            session.execute_prepared(&select_stmt, &[SqlValue::Integer(1)]).unwrap();
851
852        assert_eq!(
853            select_result.rows().unwrap()[0].values[1],
854            SqlValue::Varchar(arcstr::ArcStr::from("Alicia"))
855        );
856    }
857
858    #[test]
859    fn test_session_mut_delete() {
860        let mut db = create_test_db();
861        let mut session = SessionMut::new(&mut db);
862
863        let stmt = session.prepare("DELETE FROM users WHERE id = ?").unwrap();
864
865        let result = session.execute_prepared_mut(&stmt, &[SqlValue::Integer(1)]).unwrap();
866
867        assert_eq!(result.rows_affected(), Some(1));
868
869        // Verify the row was deleted
870        let select_stmt = session.prepare("SELECT * FROM users WHERE id = ?").unwrap();
871        let select_result =
872            session.execute_prepared(&select_stmt, &[SqlValue::Integer(1)]).unwrap();
873
874        assert_eq!(select_result.rows().unwrap().len(), 0);
875    }
876
877    #[test]
878    fn test_shared_cache() {
879        let db = create_test_db();
880
881        // Create first session and prepare a statement
882        let session1 = Session::new(&db);
883        let stmt = session1.prepare("SELECT * FROM users WHERE id = ?").unwrap();
884
885        // Get the shared cache
886        let shared_cache = session1.shared_cache();
887        let initial_misses = session1.cache().stats().misses;
888
889        // Create second session with shared cache
890        let session2 = Session::with_shared_cache(&db, shared_cache);
891
892        // Prepare the same statement - should hit cache
893        let _stmt2 = session2.prepare("SELECT * FROM users WHERE id = ?").unwrap();
894
895        // Verify cache was shared (no additional miss)
896        assert_eq!(session2.cache().stats().misses, initial_misses);
897
898        // Execute on both sessions
899        let result1 = session1.execute_prepared(&stmt, &[SqlValue::Integer(1)]).unwrap();
900        let result2 = session2.execute_prepared(&stmt, &[SqlValue::Integer(2)]).unwrap();
901
902        assert_eq!(
903            result1.rows().unwrap()[0].values[1],
904            SqlValue::Varchar(arcstr::ArcStr::from("Alice"))
905        );
906        assert_eq!(
907            result2.rows().unwrap()[0].values[1],
908            SqlValue::Varchar(arcstr::ArcStr::from("Bob"))
909        );
910    }
911
912    #[test]
913    fn test_no_params_statement() {
914        let db = create_test_db();
915        let session = Session::new(&db);
916
917        let stmt = session.prepare("SELECT * FROM users").unwrap();
918        assert_eq!(stmt.param_count(), 0);
919
920        let result = session.execute_prepared(&stmt, &[]).unwrap();
921        assert_eq!(result.rows().unwrap().len(), 3);
922    }
923
924    #[test]
925    fn test_multiple_placeholders() {
926        let db = create_test_db();
927        let session = Session::new(&db);
928
929        let stmt = session.prepare("SELECT * FROM users WHERE id >= ? AND id <= ?").unwrap();
930        assert_eq!(stmt.param_count(), 2);
931
932        let result =
933            session.execute_prepared(&stmt, &[SqlValue::Integer(1), SqlValue::Integer(2)]).unwrap();
934
935        assert_eq!(result.rows().unwrap().len(), 2);
936    }
937
938    #[test]
939    fn test_session_prepare_arena() {
940        let db = create_test_db();
941        let session = Session::new(&db);
942
943        // Test prepare_arena for SELECT statement
944        let stmt = session.prepare_arena("SELECT * FROM users WHERE id = ?").unwrap();
945        assert_eq!(stmt.param_count(), 1);
946
947        // Verify tables were extracted (arena parser uses uppercase)
948        assert!(stmt.tables().contains("users"));
949
950        // Test caching - second call should hit cache
951        let stmt2 = session.prepare_arena("SELECT * FROM users WHERE id = ?").unwrap();
952        assert_eq!(stmt2.param_count(), 1);
953
954        // Verify it's the same cached statement
955        assert!(std::sync::Arc::ptr_eq(&stmt, &stmt2));
956    }
957
958    #[test]
959    fn test_session_prepare_arena_no_params() {
960        let db = create_test_db();
961        let session = Session::new(&db);
962
963        let stmt = session.prepare_arena("SELECT * FROM users").unwrap();
964        assert_eq!(stmt.param_count(), 0);
965    }
966
967    #[test]
968    fn test_session_prepare_arena_join() {
969        let db = create_test_db();
970        let session = Session::new(&db);
971
972        // Create orders table first
973        use vibesql_catalog::{ColumnSchema, TableSchema};
974        use vibesql_types::DataType;
975
976        let orders_columns = vec![
977            ColumnSchema::new("id".to_string(), DataType::Integer, false),
978            ColumnSchema::new("user_id".to_string(), DataType::Integer, false),
979        ];
980        let _orders_schema = TableSchema::with_primary_key(
981            "orders".to_string(),
982            orders_columns,
983            vec!["id".to_string()],
984        );
985
986        // We need a mutable db for this test, so we'll just test the parse works
987        // without actually querying
988        let stmt = session
989            .prepare_arena("SELECT u.id FROM users u JOIN orders o ON u.id = o.user_id")
990            .unwrap();
991
992        // Both tables should be tracked
993        let tables = stmt.tables();
994        assert!(tables.contains("users"), "Expected USERS in {:?}", tables);
995        assert!(tables.contains("orders"), "Expected orders in {:?}", tables);
996    }
997
998    #[test]
999    fn test_session_mut_prepare_arena() {
1000        let mut db = create_test_db();
1001        let session = SessionMut::new(&mut db);
1002
1003        // Test prepare_arena for SELECT statement
1004        let stmt = session.prepare_arena("SELECT * FROM users WHERE id = ?").unwrap();
1005        assert_eq!(stmt.param_count(), 1);
1006    }
1007
1008    #[test]
1009    fn test_delete_fast_path_plan() {
1010        use crate::cache::CachedPlan;
1011
1012        let mut db = create_test_db();
1013        let mut session = SessionMut::new(&mut db);
1014
1015        // Prepare DELETE statement
1016        let stmt = session.prepare("DELETE FROM users WHERE id = ?").unwrap();
1017
1018        // Verify it creates a PkDelete plan
1019        match stmt.cached_plan() {
1020            CachedPlan::PkDelete(plan) => {
1021                // Table name should be uppercase
1022                assert_eq!(plan.table_name, "users");
1023                // Should have one PK column
1024                assert_eq!(plan.pk_columns, vec!["id"]);
1025                // Param 0 maps to PK column 0
1026                assert_eq!(plan.param_to_pk_col, vec![(0, 0)]);
1027                // Fast path should not be validated yet
1028                assert!(plan.is_fast_path_valid().is_none());
1029            }
1030            other => panic!("Expected PkDelete plan, got {:?}", other),
1031        }
1032
1033        // Execute DELETE - this should validate and use fast path
1034        let result = session.execute_prepared_mut(&stmt, &[SqlValue::Integer(1)]).unwrap();
1035        assert_eq!(result.rows_affected(), Some(1));
1036
1037        // After execution, fast path should be cached as valid (no triggers/FKs on users table)
1038        match stmt.cached_plan() {
1039            CachedPlan::PkDelete(plan) => {
1040                assert_eq!(
1041                    plan.is_fast_path_valid(),
1042                    Some(true),
1043                    "Fast path should be valid after execution"
1044                );
1045            }
1046            _ => panic!("Plan should still be PkDelete"),
1047        }
1048
1049        // Verify the row was deleted
1050        let select_stmt = session.prepare("SELECT * FROM users WHERE id = ?").unwrap();
1051        let select_result =
1052            session.execute_prepared(&select_stmt, &[SqlValue::Integer(1)]).unwrap();
1053        assert_eq!(select_result.rows().unwrap().len(), 0);
1054    }
1055
1056    // =========================================================================
1057    // Concurrent Session Tests (Phase 3: Session with &Database)
1058    // =========================================================================
1059    // These tests verify that multiple Session instances can coexist and
1060    // execute queries concurrently against the same &Database reference.
1061    // This is the foundation for concurrent read queries in the server.
1062
1063    #[test]
1064    fn test_concurrent_sessions_coexist() {
1065        let db = create_test_db();
1066
1067        // Multiple Sessions can be created with the same &Database
1068        let session1 = Session::new(&db);
1069        let session2 = Session::new(&db);
1070        let session3 = Session::new(&db);
1071
1072        // All sessions can prepare statements
1073        let stmt1 = session1.prepare("SELECT * FROM users WHERE id = ?").unwrap();
1074        let stmt2 = session2.prepare("SELECT * FROM users WHERE id = ?").unwrap();
1075        let stmt3 = session3.prepare("SELECT name FROM users WHERE id = ?").unwrap();
1076
1077        // All can execute queries concurrently (sequentially here, but proves they coexist)
1078        let result1 = session1.execute_prepared(&stmt1, &[SqlValue::Integer(1)]).unwrap();
1079        let result2 = session2.execute_prepared(&stmt2, &[SqlValue::Integer(2)]).unwrap();
1080        let result3 = session3.execute_prepared(&stmt3, &[SqlValue::Integer(3)]).unwrap();
1081
1082        // All results should be correct
1083        assert_eq!(
1084            result1.rows().unwrap()[0].values[1],
1085            SqlValue::Varchar(arcstr::ArcStr::from("Alice"))
1086        );
1087        assert_eq!(
1088            result2.rows().unwrap()[0].values[1],
1089            SqlValue::Varchar(arcstr::ArcStr::from("Bob"))
1090        );
1091        assert_eq!(
1092            result3.rows().unwrap()[0].values[0],
1093            SqlValue::Varchar(arcstr::ArcStr::from("Charlie"))
1094        );
1095    }
1096
1097    #[test]
1098    fn test_concurrent_sessions_shared_cache() {
1099        let db = create_test_db();
1100
1101        // Create a shared cache
1102        let shared_cache = Arc::new(PreparedStatementCache::default_cache());
1103
1104        // Multiple sessions sharing the same cache
1105        let session1 = Session::with_shared_cache(&db, Arc::clone(&shared_cache));
1106        let session2 = Session::with_shared_cache(&db, Arc::clone(&shared_cache));
1107
1108        // First session prepares a statement (cache miss)
1109        let stmt = session1.prepare("SELECT * FROM users WHERE id = ?").unwrap();
1110        let stats_after_first = shared_cache.stats();
1111        assert_eq!(stats_after_first.misses, 1);
1112
1113        // Second session uses the same SQL (cache hit)
1114        let _stmt2 = session2.prepare("SELECT * FROM users WHERE id = ?").unwrap();
1115        let stats_after_second = shared_cache.stats();
1116        assert_eq!(stats_after_second.misses, 1); // Still 1 miss
1117        assert!(stats_after_second.hits >= 1); // At least 1 hit
1118
1119        // Both sessions can execute the same prepared statement
1120        let r1 = session1.execute_prepared(&stmt, &[SqlValue::Integer(1)]).unwrap();
1121        let r2 = session2.execute_prepared(&stmt, &[SqlValue::Integer(2)]).unwrap();
1122
1123        assert_eq!(r1.rows().unwrap().len(), 1);
1124        assert_eq!(r2.rows().unwrap().len(), 1);
1125    }
1126
1127    #[test]
1128    fn test_concurrent_sessions_different_queries() {
1129        let db = create_test_db();
1130
1131        let session1 = Session::new(&db);
1132        let session2 = Session::new(&db);
1133
1134        // Different query types
1135        let point_query = session1.prepare("SELECT * FROM users WHERE id = ?").unwrap();
1136        let range_query =
1137            session2.prepare("SELECT * FROM users WHERE id >= ? AND id <= ?").unwrap();
1138        let all_query = session1.prepare("SELECT * FROM users").unwrap();
1139        let projection_query = session2.prepare("SELECT name FROM users WHERE id = ?").unwrap();
1140
1141        // Execute all queries
1142        let r1 = session1.execute_prepared(&point_query, &[SqlValue::Integer(1)]).unwrap();
1143        let r2 = session2
1144            .execute_prepared(&range_query, &[SqlValue::Integer(1), SqlValue::Integer(2)])
1145            .unwrap();
1146        let r3 = session1.execute_prepared(&all_query, &[]).unwrap();
1147        let r4 = session2.execute_prepared(&projection_query, &[SqlValue::Integer(3)]).unwrap();
1148
1149        assert_eq!(r1.rows().unwrap().len(), 1);
1150        assert_eq!(r2.rows().unwrap().len(), 2);
1151        assert_eq!(r3.rows().unwrap().len(), 3);
1152        assert_eq!(r4.rows().unwrap().len(), 1);
1153    }
1154
1155    #[test]
1156    fn test_concurrent_sessions_interleaved_execution() {
1157        let db = create_test_db();
1158
1159        let session1 = Session::new(&db);
1160        let session2 = Session::new(&db);
1161
1162        let stmt = session1.prepare("SELECT * FROM users WHERE id = ?").unwrap();
1163
1164        // Interleave executions between sessions
1165        // This pattern mimics what happens with concurrent connections
1166        let r1 = session1.execute_prepared(&stmt, &[SqlValue::Integer(1)]).unwrap();
1167        let r2 = session2.execute_prepared(&stmt, &[SqlValue::Integer(2)]).unwrap();
1168        let r3 = session1.execute_prepared(&stmt, &[SqlValue::Integer(3)]).unwrap();
1169        let r4 = session2.execute_prepared(&stmt, &[SqlValue::Integer(1)]).unwrap();
1170
1171        // Verify correctness
1172        assert_eq!(r1.rows().unwrap()[0].values[0], SqlValue::Integer(1));
1173        assert_eq!(r2.rows().unwrap()[0].values[0], SqlValue::Integer(2));
1174        assert_eq!(r3.rows().unwrap()[0].values[0], SqlValue::Integer(3));
1175        assert_eq!(r4.rows().unwrap()[0].values[0], SqlValue::Integer(1));
1176    }
1177
1178    #[test]
1179    fn test_session_immutable_borrow_allows_multiple() {
1180        let db = create_test_db();
1181
1182        // This test explicitly verifies the Rust borrow checker allows multiple
1183        // immutable borrows of Database for concurrent Sessions
1184        let session1 = Session::new(&db);
1185        let session2 = Session::new(&db);
1186
1187        // Both sessions hold references to the same database
1188        // This compiles because Session only needs &Database (immutable borrow)
1189        let db_ref1 = session1.database();
1190        let db_ref2 = session2.database();
1191
1192        // Both point to the same database
1193        assert!(std::ptr::eq(db_ref1, db_ref2));
1194
1195        // And both can still execute queries
1196        let stmt = session1.prepare("SELECT COUNT(*) FROM users").unwrap();
1197        let r1 = session1.execute_prepared(&stmt, &[]).unwrap();
1198        let r2 = session2.execute_prepared(&stmt, &[]).unwrap();
1199
1200        assert_eq!(r1.rows().unwrap().len(), 1);
1201        assert_eq!(r2.rows().unwrap().len(), 1);
1202    }
1203
1204    #[test]
1205    fn test_session_execute_uses_immutable_self() {
1206        let db = create_test_db();
1207        let session = Session::new(&db);
1208
1209        // Prepare a statement
1210        let stmt = session.prepare("SELECT * FROM users WHERE id = ?").unwrap();
1211
1212        // execute_prepared takes &self, so we can call it multiple times
1213        // without needing mutable access to session
1214        let _ = session.execute_prepared(&stmt, &[SqlValue::Integer(1)]);
1215        let _ = session.execute_prepared(&stmt, &[SqlValue::Integer(2)]);
1216        let _ = session.execute_prepared(&stmt, &[SqlValue::Integer(3)]);
1217
1218        // We can still access session immutably after all executions
1219        let cache = session.cache();
1220        let _stats = cache.stats();
1221        // Stats accessible - session is still usable after multiple executions
1222    }
1223
1224    #[test]
1225    fn test_concurrent_sessions_with_aggregates() {
1226        let db = create_test_db();
1227
1228        let session1 = Session::new(&db);
1229        let session2 = Session::new(&db);
1230
1231        // Aggregate queries
1232        let count_stmt = session1.prepare("SELECT COUNT(*) FROM users").unwrap();
1233        let sum_stmt = session2.prepare("SELECT COUNT(*) FROM users WHERE id <= ?").unwrap();
1234
1235        let r1 = session1.execute_prepared(&count_stmt, &[]).unwrap();
1236        let r2 = session2.execute_prepared(&sum_stmt, &[SqlValue::Integer(2)]).unwrap();
1237
1238        // Verify aggregate results
1239        assert_eq!(r1.rows().unwrap()[0].values[0], SqlValue::Integer(3));
1240        assert_eq!(r2.rows().unwrap()[0].values[0], SqlValue::Integer(2));
1241    }
1242
1243    #[test]
1244    fn test_concurrent_sessions_with_pk_fast_path() {
1245        let db = create_test_db();
1246
1247        let session1 = Session::new(&db);
1248        let session2 = Session::new(&db);
1249
1250        // PK point lookup queries use the fast path
1251        let stmt = session1.prepare("SELECT * FROM users WHERE id = ?").unwrap();
1252
1253        // Both sessions use the fast path
1254        let r1 = session1.execute_prepared(&stmt, &[SqlValue::Integer(1)]).unwrap();
1255        let r2 = session2.execute_prepared(&stmt, &[SqlValue::Integer(2)]).unwrap();
1256
1257        assert_eq!(
1258            r1.rows().unwrap()[0].values[1],
1259            SqlValue::Varchar(arcstr::ArcStr::from("Alice"))
1260        );
1261        assert_eq!(r2.rows().unwrap()[0].values[1], SqlValue::Varchar(arcstr::ArcStr::from("Bob")));
1262    }
1263}