Skip to main content

featherdb_query/
prepared.rs

//! Prepared statements with plan caching
//!
//! This module provides prepared statement support for FeatherDB, enabling
//! 5-10x speedup for repeated queries by caching query plans.
//!
//! # Example
//!
//! ```rust,ignore
//! use featherdb::Database;
//!
//! let db = Database::open("myapp.db").unwrap();
//!
//! // Prepare a statement once
//! let stmt = db.prepare("SELECT * FROM users WHERE id = ?").unwrap();
//!
//! // Execute multiple times with different parameters
//! let rows = db.execute_prepared(&stmt, &[1.into()]).unwrap();
//! let rows = db.execute_prepared(&stmt, &[2.into()]).unwrap();
//! ```

21use crate::expr::Expr;
22use crate::planner::LogicalPlan;
23use featherdb_core::{ColumnType, Error, Result, Value};
24use std::collections::HashMap;
25use std::sync::atomic::{AtomicU64, Ordering};
26use std::sync::RwLock;
27use std::time::Instant;
28
29/// A prepared statement with a cached, optimized query plan
30#[derive(Debug, Clone)]
31pub struct PreparedStatement {
32    /// The original SQL string
33    sql: String,
34    /// The optimized logical plan (with parameter placeholders)
35    plan: LogicalPlan,
36    /// Expected types for parameters (inferred from context)
37    param_types: Vec<Option<ColumnType>>,
38    /// Number of parameters in the query
39    param_count: usize,
40    /// Unique statement ID
41    id: u64,
42    /// Subquery plans associated with this statement (subquery_id -> LogicalPlan).
43    /// Stored alongside the plan so that cache hits return matching subquery plans.
44    subquery_plans: HashMap<usize, LogicalPlan>,
45}
46
47impl PreparedStatement {
48    /// Create a new prepared statement
49    pub fn new(
50        sql: String,
51        plan: LogicalPlan,
52        param_types: Vec<Option<ColumnType>>,
53        param_count: usize,
54        id: u64,
55    ) -> Self {
56        PreparedStatement {
57            sql,
58            plan,
59            param_types,
60            param_count,
61            id,
62            subquery_plans: HashMap::new(),
63        }
64    }
65
66    /// Create a new prepared statement with subquery plans
67    pub fn with_subquery_plans(
68        sql: String,
69        plan: LogicalPlan,
70        param_types: Vec<Option<ColumnType>>,
71        param_count: usize,
72        id: u64,
73        subquery_plans: HashMap<usize, LogicalPlan>,
74    ) -> Self {
75        PreparedStatement {
76            sql,
77            plan,
78            param_types,
79            param_count,
80            id,
81            subquery_plans,
82        }
83    }
84
85    /// Get the SQL string
86    pub fn sql(&self) -> &str {
87        &self.sql
88    }
89
90    /// Get the cached plan
91    pub fn plan(&self) -> &LogicalPlan {
92        &self.plan
93    }
94
95    /// Get the expected parameter types
96    pub fn param_types(&self) -> &[Option<ColumnType>] {
97        &self.param_types
98    }
99
100    /// Get the number of parameters
101    pub fn param_count(&self) -> usize {
102        self.param_count
103    }
104
105    /// Get the statement ID
106    pub fn id(&self) -> u64 {
107        self.id
108    }
109
110    /// Get the subquery plans
111    pub fn subquery_plans(&self) -> &HashMap<usize, LogicalPlan> {
112        &self.subquery_plans
113    }
114
115    /// Validate that provided parameters match expected count
116    pub fn validate_params(&self, params: &[Value]) -> Result<()> {
117        if params.len() != self.param_count {
118            return Err(Error::InvalidQuery {
119                message: format!(
120                    "Expected {} parameters, got {}",
121                    self.param_count,
122                    params.len()
123                ),
124            });
125        }
126
127        // Optionally validate types
128        for (i, (param, expected_type)) in params.iter().zip(self.param_types.iter()).enumerate() {
129            if let Some(expected) = expected_type {
130                if !param.is_null() && !expected.is_compatible(param) {
131                    return Err(Error::TypeError {
132                        expected: format!("{:?} for parameter ${}", expected, i + 1),
133                        actual: format!("{:?}", param.column_type()),
134                    });
135                }
136            }
137        }
138
139        Ok(())
140    }
141}
142
143/// Cache entry with metadata
144#[derive(Debug)]
145struct CacheEntry {
146    /// The prepared statement
147    stmt: PreparedStatement,
148    /// Last access time
149    last_access: Instant,
150    /// Access count for frequency tracking
151    access_count: u64,
152}
153
154/// LRU cache for query plans
155///
156/// Maps SQL strings to optimized LogicalPlans with configurable max size.
157/// Supports cache invalidation on schema changes (DDL).
158#[derive(Debug)]
159pub struct PlanCache {
160    /// The cache mapping SQL to prepared statements
161    cache: RwLock<HashMap<String, CacheEntry>>,
162    /// Maximum number of entries
163    max_size: usize,
164    /// Counter for statement IDs
165    next_id: AtomicU64,
166    /// Cache statistics
167    stats: CacheStats,
168    /// Schema version for invalidation
169    schema_version: AtomicU64,
170}
171
/// Counters describing how the plan cache has been used.
///
/// All counters are atomics read and written with relaxed ordering, so they
/// can be updated through a shared reference.
#[derive(Debug, Default)]
pub struct CacheStats {
    /// Lookups that found an entry
    hits: AtomicU64,
    /// Lookups that found nothing
    misses: AtomicU64,
    /// Entries removed to make room
    evictions: AtomicU64,
    /// Entries removed by invalidation
    invalidations: AtomicU64,
}

impl CacheStats {
    /// Read one counter with relaxed ordering.
    fn read(counter: &AtomicU64) -> u64 {
        counter.load(Ordering::Relaxed)
    }

    /// Number of cache hits recorded so far.
    pub fn hits(&self) -> u64 {
        Self::read(&self.hits)
    }

    /// Number of cache misses recorded so far.
    pub fn misses(&self) -> u64 {
        Self::read(&self.misses)
    }

    /// Number of evictions recorded so far.
    pub fn evictions(&self) -> u64 {
        Self::read(&self.evictions)
    }

    /// Number of invalidated entries recorded so far.
    pub fn invalidations(&self) -> u64 {
        Self::read(&self.invalidations)
    }

    /// Fraction of lookups that were hits: `hits / (hits + misses)`.
    ///
    /// Returns `1.0` when no lookup has been recorded yet.
    pub fn hit_ratio(&self) -> f64 {
        let hit_count = self.hits();
        let miss_count = self.misses();
        if hit_count == 0 && miss_count == 0 {
            return 1.0;
        }
        let hit_count = hit_count as f64;
        hit_count / (hit_count + miss_count as f64)
    }
}
217
218impl PlanCache {
219    /// Create a new plan cache with the specified maximum size
220    pub fn new(max_size: usize) -> Self {
221        PlanCache {
222            cache: RwLock::new(HashMap::new()),
223            max_size,
224            next_id: AtomicU64::new(1),
225            stats: CacheStats::default(),
226            schema_version: AtomicU64::new(0),
227        }
228    }
229
230    /// Create a plan cache with default size (1000 entries)
231    pub fn default_size() -> Self {
232        Self::new(1000)
233    }
234
235    /// Get a cached plan, if available
236    pub fn get(&self, sql: &str) -> Option<PreparedStatement> {
237        let mut cache = self.cache.write().ok()?;
238
239        if let Some(entry) = cache.get_mut(sql) {
240            entry.last_access = Instant::now();
241            entry.access_count += 1;
242            self.stats.hits.fetch_add(1, Ordering::Relaxed);
243            Some(entry.stmt.clone())
244        } else {
245            self.stats.misses.fetch_add(1, Ordering::Relaxed);
246            None
247        }
248    }
249
250    /// Insert a prepared statement into the cache
251    pub fn insert(&self, sql: String, stmt: PreparedStatement) {
252        let mut cache = match self.cache.write() {
253            Ok(c) => c,
254            Err(_) => return,
255        };
256
257        // Evict if necessary
258        if cache.len() >= self.max_size && !cache.contains_key(&sql) {
259            self.evict_lru(&mut cache);
260        }
261
262        cache.insert(
263            sql,
264            CacheEntry {
265                stmt,
266                last_access: Instant::now(),
267                access_count: 1,
268            },
269        );
270    }
271
272    /// Evict the least recently used entry
273    fn evict_lru(&self, cache: &mut HashMap<String, CacheEntry>) {
274        let oldest = cache
275            .iter()
276            .min_by_key(|(_, entry)| entry.last_access)
277            .map(|(k, _)| k.clone());
278
279        if let Some(key) = oldest {
280            cache.remove(&key);
281            self.stats.evictions.fetch_add(1, Ordering::Relaxed);
282        }
283    }
284
285    /// Invalidate all cached plans (called on schema changes)
286    pub fn invalidate_all(&self) {
287        if let Ok(mut cache) = self.cache.write() {
288            let count = cache.len() as u64;
289            cache.clear();
290            self.stats.invalidations.fetch_add(count, Ordering::Relaxed);
291            self.schema_version.fetch_add(1, Ordering::Release);
292        }
293    }
294
295    /// Invalidate plans for a specific table
296    pub fn invalidate_table(&self, table_name: &str) {
297        if let Ok(mut cache) = self.cache.write() {
298            // Remove entries that reference this table
299            // This is a simple substring check; could be made more precise
300            let table_lower = table_name.to_lowercase();
301            let keys_to_remove: Vec<String> = cache
302                .keys()
303                .filter(|sql| sql.to_lowercase().contains(&table_lower))
304                .cloned()
305                .collect();
306
307            for key in keys_to_remove {
308                cache.remove(&key);
309                self.stats.invalidations.fetch_add(1, Ordering::Relaxed);
310            }
311            self.schema_version.fetch_add(1, Ordering::Release);
312        }
313    }
314
315    /// Get the next statement ID
316    pub fn next_id(&self) -> u64 {
317        self.next_id.fetch_add(1, Ordering::Relaxed)
318    }
319
320    /// Get the current schema version
321    pub fn schema_version(&self) -> u64 {
322        self.schema_version.load(Ordering::Acquire)
323    }
324
325    /// Get cache statistics
326    pub fn stats(&self) -> &CacheStats {
327        &self.stats
328    }
329
330    /// Get the number of cached entries
331    pub fn len(&self) -> usize {
332        self.cache.read().map(|c| c.len()).unwrap_or(0)
333    }
334
335    /// Check if the cache is empty
336    pub fn is_empty(&self) -> bool {
337        self.len() == 0
338    }
339
340    /// Get the maximum cache size
341    pub fn max_size(&self) -> usize {
342        self.max_size
343    }
344}
345
346impl Default for PlanCache {
347    fn default() -> Self {
348        Self::default_size()
349    }
350}
351
352/// Substitute parameters in an expression
353pub fn substitute_params(expr: &Expr, params: &[Value]) -> Result<Expr> {
354    match expr {
355        Expr::Parameter { index } => {
356            if *index >= params.len() {
357                return Err(Error::InvalidQuery {
358                    message: format!(
359                        "Parameter ${} referenced but only {} parameters provided",
360                        index + 1,
361                        params.len()
362                    ),
363                });
364            }
365            Ok(Expr::Literal(params[*index].clone()))
366        }
367
368        Expr::BinaryOp { left, op, right } => Ok(Expr::BinaryOp {
369            left: Box::new(substitute_params(left, params)?),
370            op: *op,
371            right: Box::new(substitute_params(right, params)?),
372        }),
373
374        Expr::UnaryOp { op, expr } => Ok(Expr::UnaryOp {
375            op: *op,
376            expr: Box::new(substitute_params(expr, params)?),
377        }),
378
379        Expr::Function { name, args } => {
380            let substituted_args: Vec<Expr> = args
381                .iter()
382                .map(|a| substitute_params(a, params))
383                .collect::<Result<_>>()?;
384            Ok(Expr::Function {
385                name: name.clone(),
386                args: substituted_args,
387            })
388        }
389
390        Expr::Aggregate {
391            func,
392            arg,
393            distinct,
394        } => {
395            let substituted_arg = arg
396                .as_ref()
397                .map(|a| substitute_params(a, params).map(Box::new))
398                .transpose()?;
399            Ok(Expr::Aggregate {
400                func: func.clone(),
401                arg: substituted_arg,
402                distinct: *distinct,
403            })
404        }
405
406        Expr::Case {
407            operand,
408            when_clauses,
409            else_result,
410        } => {
411            let substituted_operand = operand
412                .as_ref()
413                .map(|o| substitute_params(o, params).map(Box::new))
414                .transpose()?;
415
416            let substituted_whens: Vec<(Expr, Expr)> = when_clauses
417                .iter()
418                .map(|(w, t)| Ok((substitute_params(w, params)?, substitute_params(t, params)?)))
419                .collect::<Result<_>>()?;
420
421            let substituted_else = else_result
422                .as_ref()
423                .map(|e| substitute_params(e, params).map(Box::new))
424                .transpose()?;
425
426            Ok(Expr::Case {
427                operand: substituted_operand,
428                when_clauses: substituted_whens,
429                else_result: substituted_else,
430            })
431        }
432
433        Expr::Between {
434            expr,
435            low,
436            high,
437            negated,
438        } => Ok(Expr::Between {
439            expr: Box::new(substitute_params(expr, params)?),
440            low: Box::new(substitute_params(low, params)?),
441            high: Box::new(substitute_params(high, params)?),
442            negated: *negated,
443        }),
444
445        Expr::InList {
446            expr,
447            list,
448            negated,
449        } => {
450            let substituted_list: Vec<Expr> = list
451                .iter()
452                .map(|i| substitute_params(i, params))
453                .collect::<Result<_>>()?;
454            Ok(Expr::InList {
455                expr: Box::new(substitute_params(expr, params)?),
456                list: substituted_list,
457                negated: *negated,
458            })
459        }
460
461        Expr::Like {
462            expr,
463            pattern,
464            negated,
465        } => Ok(Expr::Like {
466            expr: Box::new(substitute_params(expr, params)?),
467            pattern: pattern.clone(),
468            negated: *negated,
469        }),
470
471        // Expressions without sub-expressions
472        Expr::Column { .. } | Expr::Literal(_) | Expr::Wildcard | Expr::QualifiedWildcard(_) => {
473            Ok(expr.clone())
474        }
475
476        // Window functions
477        Expr::Window(wf) => {
478            use crate::expr::WindowOrderByExpr;
479
480            let substituted_partition_by = wf
481                .partition_by
482                .iter()
483                .map(|e| substitute_params(e, params))
484                .collect::<Result<Vec<_>>>()?;
485            let substituted_order_by = wf
486                .order_by
487                .iter()
488                .map(|window_order_expr| {
489                    Ok(WindowOrderByExpr {
490                        expr: substitute_params(&window_order_expr.expr, params)?,
491                        order: window_order_expr.order,
492                        nulls_first: window_order_expr.nulls_first,
493                    })
494                })
495                .collect::<Result<Vec<_>>>()?;
496
497            Ok(Expr::Window(crate::expr::WindowFunction {
498                function: wf.function.clone(),
499                partition_by: substituted_partition_by,
500                order_by: substituted_order_by,
501                frame: wf.frame.clone(),
502            }))
503        }
504
505        // Subquery expressions - these reference subquery IDs, not inline subqueries
506        Expr::ScalarSubquery {
507            subquery_id,
508            correlated,
509        } => Ok(Expr::ScalarSubquery {
510            subquery_id: *subquery_id,
511            correlated: *correlated,
512        }),
513
514        Expr::Exists {
515            subquery_id,
516            negated,
517        } => Ok(Expr::Exists {
518            subquery_id: *subquery_id,
519            negated: *negated,
520        }),
521
522        Expr::InSubquery {
523            expr,
524            subquery_id,
525            negated,
526        } => Ok(Expr::InSubquery {
527            expr: Box::new(substitute_params(expr, params)?),
528            subquery_id: *subquery_id,
529            negated: *negated,
530        }),
531
532        Expr::AnySubquery {
533            expr,
534            op,
535            subquery_id,
536        } => Ok(Expr::AnySubquery {
537            expr: Box::new(substitute_params(expr, params)?),
538            op: *op,
539            subquery_id: *subquery_id,
540        }),
541
542        Expr::AllSubquery {
543            expr,
544            op,
545            subquery_id,
546        } => Ok(Expr::AllSubquery {
547            expr: Box::new(substitute_params(expr, params)?),
548            op: *op,
549            subquery_id: *subquery_id,
550        }),
551    }
552}
553
554/// Substitute parameters in a logical plan
555pub fn substitute_plan_params(plan: &LogicalPlan, params: &[Value]) -> Result<LogicalPlan> {
556    match plan {
557        LogicalPlan::Scan {
558            table,
559            alias,
560            projection,
561            filter,
562        } => {
563            let substituted_filter = filter
564                .as_ref()
565                .map(|f| substitute_params(f, params))
566                .transpose()?;
567            Ok(LogicalPlan::Scan {
568                table: table.clone(),
569                alias: alias.clone(),
570                projection: projection.clone(),
571                filter: substituted_filter,
572            })
573        }
574
575        LogicalPlan::IndexScan {
576            table,
577            alias,
578            index,
579            index_column,
580            range,
581            residual_filter,
582            projection,
583        } => {
584            let substituted_filter = residual_filter
585                .as_ref()
586                .map(|f| substitute_params(f, params))
587                .transpose()?;
588            Ok(LogicalPlan::IndexScan {
589                table: table.clone(),
590                alias: alias.clone(),
591                index: index.clone(),
592                index_column: *index_column,
593                range: range.clone(),
594                residual_filter: substituted_filter,
595                projection: projection.clone(),
596            })
597        }
598
599        LogicalPlan::PkSeek {
600            table,
601            alias,
602            key_values,
603            residual_filter,
604            projection,
605        } => {
606            let substituted_keys: Vec<Expr> = key_values
607                .iter()
608                .map(|e| substitute_params(e, params))
609                .collect::<Result<_>>()?;
610            let substituted_filter = residual_filter
611                .as_ref()
612                .map(|f| substitute_params(f, params))
613                .transpose()?;
614            Ok(LogicalPlan::PkSeek {
615                table: table.clone(),
616                alias: alias.clone(),
617                key_values: substituted_keys,
618                residual_filter: substituted_filter,
619                projection: projection.clone(),
620            })
621        }
622
623        LogicalPlan::PkRangeScan {
624            table,
625            alias,
626            range,
627            residual_filter,
628            projection,
629        } => {
630            let substituted_filter = residual_filter
631                .as_ref()
632                .map(|f| substitute_params(f, params))
633                .transpose()?;
634            Ok(LogicalPlan::PkRangeScan {
635                table: table.clone(),
636                alias: alias.clone(),
637                range: range.clone(),
638                residual_filter: substituted_filter,
639                projection: projection.clone(),
640            })
641        }
642
643        LogicalPlan::Filter { input, predicate } => Ok(LogicalPlan::Filter {
644            input: Box::new(substitute_plan_params(input, params)?),
645            predicate: substitute_params(predicate, params)?,
646        }),
647
648        LogicalPlan::Project { input, exprs } => {
649            let substituted_exprs: Vec<(Expr, String)> = exprs
650                .iter()
651                .map(|(e, name)| Ok((substitute_params(e, params)?, name.clone())))
652                .collect::<Result<_>>()?;
653            Ok(LogicalPlan::Project {
654                input: Box::new(substitute_plan_params(input, params)?),
655                exprs: substituted_exprs,
656            })
657        }
658
659        LogicalPlan::Join {
660            left,
661            right,
662            join_type,
663            condition,
664        } => {
665            let substituted_condition = condition
666                .as_ref()
667                .map(|c| substitute_params(c, params))
668                .transpose()?;
669            Ok(LogicalPlan::Join {
670                left: Box::new(substitute_plan_params(left, params)?),
671                right: Box::new(substitute_plan_params(right, params)?),
672                join_type: *join_type,
673                condition: substituted_condition,
674            })
675        }
676
677        LogicalPlan::Aggregate {
678            input,
679            group_by,
680            aggregates,
681        } => {
682            let substituted_group_by: Vec<Expr> = group_by
683                .iter()
684                .map(|e| substitute_params(e, params))
685                .collect::<Result<_>>()?;
686            let substituted_aggregates: Vec<(Expr, String)> = aggregates
687                .iter()
688                .map(|(e, name)| Ok((substitute_params(e, params)?, name.clone())))
689                .collect::<Result<_>>()?;
690            Ok(LogicalPlan::Aggregate {
691                input: Box::new(substitute_plan_params(input, params)?),
692                group_by: substituted_group_by,
693                aggregates: substituted_aggregates,
694            })
695        }
696
697        LogicalPlan::Sort { input, order_by } => {
698            let substituted_order_by: Vec<_> = order_by
699                .iter()
700                .map(|(e, order)| Ok((substitute_params(e, params)?, *order)))
701                .collect::<Result<_>>()?;
702            Ok(LogicalPlan::Sort {
703                input: Box::new(substitute_plan_params(input, params)?),
704                order_by: substituted_order_by,
705            })
706        }
707
708        LogicalPlan::Limit {
709            input,
710            limit,
711            offset,
712        } => Ok(LogicalPlan::Limit {
713            input: Box::new(substitute_plan_params(input, params)?),
714            limit: *limit,
715            offset: *offset,
716        }),
717
718        LogicalPlan::Distinct { input } => Ok(LogicalPlan::Distinct {
719            input: Box::new(substitute_plan_params(input, params)?),
720        }),
721
722        LogicalPlan::Insert {
723            table,
724            columns,
725            values,
726        } => {
727            let substituted_values: Vec<Vec<Expr>> = values
728                .iter()
729                .map(|row| {
730                    row.iter()
731                        .map(|e| substitute_params(e, params))
732                        .collect::<Result<_>>()
733                })
734                .collect::<Result<_>>()?;
735            Ok(LogicalPlan::Insert {
736                table: table.clone(),
737                columns: columns.clone(),
738                values: substituted_values,
739            })
740        }
741
742        LogicalPlan::Update {
743            table,
744            assignments,
745            filter,
746        } => {
747            let substituted_assignments: Vec<(String, Expr)> = assignments
748                .iter()
749                .map(|(name, e)| Ok((name.clone(), substitute_params(e, params)?)))
750                .collect::<Result<_>>()?;
751            let substituted_filter = filter
752                .as_ref()
753                .map(|f| substitute_params(f, params))
754                .transpose()?;
755            Ok(LogicalPlan::Update {
756                table: table.clone(),
757                assignments: substituted_assignments,
758                filter: substituted_filter,
759            })
760        }
761
762        LogicalPlan::Delete { table, filter } => {
763            let substituted_filter = filter
764                .as_ref()
765                .map(|f| substitute_params(f, params))
766                .transpose()?;
767            Ok(LogicalPlan::Delete {
768                table: table.clone(),
769                filter: substituted_filter,
770            })
771        }
772
773        // DDL plans don't have parameters
774        LogicalPlan::CreateTable { .. }
775        | LogicalPlan::DropTable { .. }
776        | LogicalPlan::AlterTable { .. }
777        | LogicalPlan::EmptyRelation => Ok(plan.clone()),
778
779        LogicalPlan::Explain {
780            input,
781            verbose,
782            analyze,
783        } => {
784            let substituted_input = substitute_plan_params(input, params)?;
785            Ok(LogicalPlan::Explain {
786                input: Box::new(substituted_input),
787                verbose: *verbose,
788                analyze: *analyze,
789            })
790        }
791
792        LogicalPlan::Window {
793            input,
794            window_exprs,
795        } => {
796            let substituted_input = substitute_plan_params(input, params)?;
797            let substituted_exprs: Vec<(Expr, String)> = window_exprs
798                .iter()
799                .map(|(e, name)| Ok((substitute_params(e, params)?, name.clone())))
800                .collect::<Result<_>>()?;
801            Ok(LogicalPlan::Window {
802                input: Box::new(substituted_input),
803                window_exprs: substituted_exprs,
804            })
805        }
806
807        LogicalPlan::WithCte { ctes, query } => {
808            // Substitute params in CTE definitions and main query
809            let substituted_ctes = ctes
810                .iter()
811                .map(|cte| {
812                    Ok(crate::planner::CommonTableExpression {
813                        name: cte.name.clone(),
814                        columns: cte.columns.clone(),
815                        query: Box::new(substitute_plan_params(&cte.query, params)?),
816                        recursive: cte.recursive,
817                    })
818                })
819                .collect::<Result<_>>()?;
820            Ok(LogicalPlan::WithCte {
821                ctes: substituted_ctes,
822                query: Box::new(substitute_plan_params(query, params)?),
823            })
824        }
825
826        LogicalPlan::CteScan {
827            cte_name,
828            alias,
829            columns,
830        } => Ok(LogicalPlan::CteScan {
831            cte_name: cte_name.clone(),
832            alias: alias.clone(),
833            columns: columns.clone(),
834        }),
835
836        LogicalPlan::SubqueryScan {
837            subquery,
838            subquery_id,
839            alias,
840        } => Ok(LogicalPlan::SubqueryScan {
841            subquery: Box::new(substitute_plan_params(subquery, params)?),
842            subquery_id: *subquery_id,
843            alias: alias.clone(),
844        }),
845
846        LogicalPlan::SemiJoin {
847            left,
848            right,
849            condition,
850            positive,
851        } => {
852            let substituted_condition = condition
853                .as_ref()
854                .map(|c| substitute_params(c, params))
855                .transpose()?;
856            Ok(LogicalPlan::SemiJoin {
857                left: Box::new(substitute_plan_params(left, params)?),
858                right: Box::new(substitute_plan_params(right, params)?),
859                condition: substituted_condition,
860                positive: *positive,
861            })
862        }
863
864        LogicalPlan::AntiJoin {
865            left,
866            right,
867            condition,
868        } => {
869            let substituted_condition = condition
870                .as_ref()
871                .map(|c| substitute_params(c, params))
872                .transpose()?;
873            Ok(LogicalPlan::AntiJoin {
874                left: Box::new(substitute_plan_params(left, params)?),
875                right: Box::new(substitute_plan_params(right, params)?),
876                condition: substituted_condition,
877            })
878        }
879
880        // Permission statements don't have parameters
881        LogicalPlan::Grant {
882            privileges,
883            table,
884            grantee,
885        } => Ok(LogicalPlan::Grant {
886            privileges: *privileges,
887            table: table.clone(),
888            grantee: grantee.clone(),
889        }),
890
891        LogicalPlan::Revoke {
892            privileges,
893            table,
894            grantee,
895        } => Ok(LogicalPlan::Revoke {
896            privileges: *privileges,
897            table: table.clone(),
898            grantee: grantee.clone(),
899        }),
900
901        LogicalPlan::ShowGrants { target } => Ok(LogicalPlan::ShowGrants {
902            target: target.clone(),
903        }),
904
905        LogicalPlan::Union { left, right, all } => Ok(LogicalPlan::Union {
906            left: Box::new(substitute_plan_params(left, params)?),
907            right: Box::new(substitute_plan_params(right, params)?),
908            all: *all,
909        }),
910
911        LogicalPlan::Intersect { left, right, all } => Ok(LogicalPlan::Intersect {
912            left: Box::new(substitute_plan_params(left, params)?),
913            right: Box::new(substitute_plan_params(right, params)?),
914            all: *all,
915        }),
916
917        LogicalPlan::Except { left, right, all } => Ok(LogicalPlan::Except {
918            left: Box::new(substitute_plan_params(left, params)?),
919            right: Box::new(substitute_plan_params(right, params)?),
920            all: *all,
921        }),
922
923        LogicalPlan::CreateView {
924            name,
925            query,
926            query_sql,
927            columns,
928        } => Ok(LogicalPlan::CreateView {
929            name: name.clone(),
930            query: Box::new(substitute_plan_params(query, params)?),
931            query_sql: query_sql.clone(),
932            columns: columns.clone(),
933        }),
934
935        LogicalPlan::DropView { name, if_exists } => Ok(LogicalPlan::DropView {
936            name: name.clone(),
937            if_exists: *if_exists,
938        }),
939
940        LogicalPlan::CreateIndex {
941            index_name,
942            table_name,
943            columns,
944            unique,
945        } => Ok(LogicalPlan::CreateIndex {
946            index_name: index_name.clone(),
947            table_name: table_name.clone(),
948            columns: columns.clone(),
949            unique: *unique,
950        }),
951    }
952}
953
954/// Count parameters in an expression
955pub fn count_params(expr: &Expr) -> usize {
956    match expr {
957        Expr::Parameter { index } => *index + 1,
958
959        Expr::BinaryOp { left, right, .. } => count_params(left).max(count_params(right)),
960
961        Expr::UnaryOp { expr, .. } => count_params(expr),
962
963        Expr::Function { args, .. } => args.iter().map(count_params).max().unwrap_or(0),
964
965        Expr::Aggregate { arg, .. } => arg.as_ref().map(|a| count_params(a)).unwrap_or(0),
966
967        Expr::Case {
968            operand,
969            when_clauses,
970            else_result,
971        } => {
972            let operand_count = operand.as_ref().map(|o| count_params(o)).unwrap_or(0);
973            let when_count = when_clauses
974                .iter()
975                .map(|(w, t)| count_params(w).max(count_params(t)))
976                .max()
977                .unwrap_or(0);
978            let else_count = else_result.as_ref().map(|e| count_params(e)).unwrap_or(0);
979            operand_count.max(when_count).max(else_count)
980        }
981
982        Expr::Between {
983            expr, low, high, ..
984        } => count_params(expr)
985            .max(count_params(low))
986            .max(count_params(high)),
987
988        Expr::InList { expr, list, .. } => {
989            let list_max = list.iter().map(count_params).max().unwrap_or(0);
990            count_params(expr).max(list_max)
991        }
992
993        Expr::Like { expr, .. } => count_params(expr),
994
995        // Expressions without sub-expressions that contain parameters
996        Expr::Column { .. } | Expr::Literal(_) | Expr::Wildcard | Expr::QualifiedWildcard(_) => 0,
997
998        // Window function - count params in the window function expression
999        Expr::Window(wf) => {
1000            // Count parameters in partition_by and order_by expressions
1001            let partition_max = wf.partition_by.iter().map(count_params).max().unwrap_or(0);
1002            let order_max = wf
1003                .order_by
1004                .iter()
1005                .map(|window_order_expr| count_params(&window_order_expr.expr))
1006                .max()
1007                .unwrap_or(0);
1008            partition_max.max(order_max)
1009        }
1010
1011        // Subquery expressions - subqueries don't contain parameters in the expr itself
1012        // The actual subquery plan is handled separately
1013        Expr::ScalarSubquery { .. } | Expr::Exists { .. } => 0,
1014
1015        Expr::InSubquery { expr, .. }
1016        | Expr::AnySubquery { expr, .. }
1017        | Expr::AllSubquery { expr, .. } => count_params(expr),
1018    }
1019}
1020
1021/// Count parameters in a logical plan
1022pub fn count_plan_params(plan: &LogicalPlan) -> usize {
1023    match plan {
1024        LogicalPlan::Scan { filter, .. } => filter.as_ref().map(count_params).unwrap_or(0),
1025
1026        LogicalPlan::IndexScan {
1027            residual_filter, ..
1028        } => residual_filter.as_ref().map(count_params).unwrap_or(0),
1029
1030        LogicalPlan::PkSeek {
1031            key_values,
1032            residual_filter,
1033            ..
1034        } => {
1035            let key_max = key_values.iter().map(count_params).max().unwrap_or(0);
1036            let filter_max = residual_filter.as_ref().map(count_params).unwrap_or(0);
1037            key_max.max(filter_max)
1038        }
1039
1040        LogicalPlan::PkRangeScan {
1041            residual_filter, ..
1042        } => residual_filter.as_ref().map(count_params).unwrap_or(0),
1043
1044        LogicalPlan::Filter { input, predicate } => {
1045            count_plan_params(input).max(count_params(predicate))
1046        }
1047
1048        LogicalPlan::Project { input, exprs } => {
1049            let expr_max = exprs
1050                .iter()
1051                .map(|(e, _)| count_params(e))
1052                .max()
1053                .unwrap_or(0);
1054            count_plan_params(input).max(expr_max)
1055        }
1056
1057        LogicalPlan::Join {
1058            left,
1059            right,
1060            condition,
1061            ..
1062        } => {
1063            let cond_count = condition.as_ref().map(count_params).unwrap_or(0);
1064            count_plan_params(left)
1065                .max(count_plan_params(right))
1066                .max(cond_count)
1067        }
1068
1069        LogicalPlan::Aggregate {
1070            input,
1071            group_by,
1072            aggregates,
1073        } => {
1074            let group_max = group_by.iter().map(count_params).max().unwrap_or(0);
1075            let agg_max = aggregates
1076                .iter()
1077                .map(|(e, _)| count_params(e))
1078                .max()
1079                .unwrap_or(0);
1080            count_plan_params(input).max(group_max).max(agg_max)
1081        }
1082
1083        LogicalPlan::Sort { input, order_by } => {
1084            let order_max = order_by
1085                .iter()
1086                .map(|(e, _)| count_params(e))
1087                .max()
1088                .unwrap_or(0);
1089            count_plan_params(input).max(order_max)
1090        }
1091
1092        LogicalPlan::Limit { input, .. } | LogicalPlan::Distinct { input } => {
1093            count_plan_params(input)
1094        }
1095
1096        LogicalPlan::Insert { values, .. } => values
1097            .iter()
1098            .flat_map(|row| row.iter())
1099            .map(count_params)
1100            .max()
1101            .unwrap_or(0),
1102
1103        LogicalPlan::Update {
1104            assignments,
1105            filter,
1106            ..
1107        } => {
1108            let assign_max = assignments
1109                .iter()
1110                .map(|(_, e)| count_params(e))
1111                .max()
1112                .unwrap_or(0);
1113            let filter_max = filter.as_ref().map(count_params).unwrap_or(0);
1114            assign_max.max(filter_max)
1115        }
1116
1117        LogicalPlan::Delete { filter, .. } => filter.as_ref().map(count_params).unwrap_or(0),
1118
1119        LogicalPlan::CreateTable { .. }
1120        | LogicalPlan::DropTable { .. }
1121        | LogicalPlan::AlterTable { .. }
1122        | LogicalPlan::EmptyRelation
1123        | LogicalPlan::CteScan { .. } => 0,
1124
1125        LogicalPlan::Explain { input, .. } => count_plan_params(input),
1126
1127        LogicalPlan::Window {
1128            input,
1129            window_exprs,
1130        } => {
1131            let expr_max = window_exprs
1132                .iter()
1133                .map(|(e, _)| count_params(e))
1134                .max()
1135                .unwrap_or(0);
1136            count_plan_params(input).max(expr_max)
1137        }
1138
1139        LogicalPlan::WithCte { ctes, query } => {
1140            let cte_max = ctes
1141                .iter()
1142                .map(|cte| count_plan_params(&cte.query))
1143                .max()
1144                .unwrap_or(0);
1145            count_plan_params(query).max(cte_max)
1146        }
1147
1148        LogicalPlan::SubqueryScan { subquery, .. } => count_plan_params(subquery),
1149
1150        LogicalPlan::SemiJoin {
1151            left,
1152            right,
1153            condition,
1154            ..
1155        } => {
1156            let cond_count = condition.as_ref().map(count_params).unwrap_or(0);
1157            count_plan_params(left)
1158                .max(count_plan_params(right))
1159                .max(cond_count)
1160        }
1161
1162        LogicalPlan::AntiJoin {
1163            left,
1164            right,
1165            condition,
1166        } => {
1167            let cond_count = condition.as_ref().map(count_params).unwrap_or(0);
1168            count_plan_params(left)
1169                .max(count_plan_params(right))
1170                .max(cond_count)
1171        }
1172
1173        // Permission statements don't have parameters
1174        LogicalPlan::Grant { .. } | LogicalPlan::Revoke { .. } | LogicalPlan::ShowGrants { .. } => {
1175            0
1176        }
1177
1178        LogicalPlan::Union { left, right, .. }
1179        | LogicalPlan::Intersect { left, right, .. }
1180        | LogicalPlan::Except { left, right, .. } => {
1181            count_plan_params(left).max(count_plan_params(right))
1182        }
1183
1184        LogicalPlan::CreateView { query, .. } => count_plan_params(query),
1185
1186        LogicalPlan::DropView { .. } => 0,
1187        LogicalPlan::CreateIndex { .. } => 0,
1188    }
1189}
1190
1191#[cfg(test)]
1192mod tests {
1193    use super::*;
1194    use crate::expr::{AggregateFunction, BinaryOp, UnaryOp};
1195
1196    #[test]
1197    fn test_plan_cache_basic() {
1198        let cache = PlanCache::new(10);
1199
1200        let stmt = PreparedStatement::new(
1201            "SELECT 1".to_string(),
1202            LogicalPlan::EmptyRelation,
1203            vec![],
1204            0,
1205            1,
1206        );
1207
1208        cache.insert("SELECT 1".to_string(), stmt.clone());
1209
1210        let retrieved = cache.get("SELECT 1");
1211        assert!(retrieved.is_some());
1212        assert_eq!(retrieved.unwrap().sql(), "SELECT 1");
1213    }
1214
1215    #[test]
1216    fn test_plan_cache_eviction() {
1217        let cache = PlanCache::new(2);
1218
1219        for i in 0..3 {
1220            let sql = format!("SELECT {}", i);
1221            let stmt = PreparedStatement::new(
1222                sql.clone(),
1223                LogicalPlan::EmptyRelation,
1224                vec![],
1225                0,
1226                i as u64,
1227            );
1228            cache.insert(sql, stmt);
1229        }
1230
1231        // Should have evicted the oldest entry
1232        assert_eq!(cache.len(), 2);
1233        assert_eq!(cache.stats().evictions(), 1);
1234    }
1235
1236    #[test]
1237    fn test_plan_cache_invalidation() {
1238        let cache = PlanCache::new(10);
1239
1240        cache.insert(
1241            "SELECT * FROM users".to_string(),
1242            PreparedStatement::new(
1243                "SELECT * FROM users".to_string(),
1244                LogicalPlan::EmptyRelation,
1245                vec![],
1246                0,
1247                1,
1248            ),
1249        );
1250
1251        cache.insert(
1252            "SELECT * FROM posts".to_string(),
1253            PreparedStatement::new(
1254                "SELECT * FROM posts".to_string(),
1255                LogicalPlan::EmptyRelation,
1256                vec![],
1257                0,
1258                2,
1259            ),
1260        );
1261
1262        assert_eq!(cache.len(), 2);
1263
1264        // Invalidate plans for users table
1265        cache.invalidate_table("users");
1266
1267        assert_eq!(cache.len(), 1);
1268        assert!(cache.get("SELECT * FROM users").is_none());
1269        assert!(cache.get("SELECT * FROM posts").is_some());
1270    }
1271
1272    #[test]
1273    fn test_substitute_params() {
1274        let expr = Expr::BinaryOp {
1275            left: Box::new(Expr::Column {
1276                table: None,
1277                name: "id".to_string(),
1278                index: None,
1279            }),
1280            op: BinaryOp::Eq,
1281            right: Box::new(Expr::Parameter { index: 0 }),
1282        };
1283
1284        let params = vec![Value::Integer(42)];
1285        let substituted = substitute_params(&expr, &params).unwrap();
1286
1287        if let Expr::BinaryOp { right, .. } = substituted {
1288            assert!(matches!(*right, Expr::Literal(Value::Integer(42))));
1289        } else {
1290            panic!("Expected BinaryOp");
1291        }
1292    }
1293
1294    #[test]
1295    fn test_count_params() {
1296        let expr = Expr::BinaryOp {
1297            left: Box::new(Expr::Parameter { index: 0 }),
1298            op: BinaryOp::And,
1299            right: Box::new(Expr::BinaryOp {
1300                left: Box::new(Expr::Column {
1301                    table: None,
1302                    name: "x".to_string(),
1303                    index: None,
1304                }),
1305                op: BinaryOp::Eq,
1306                right: Box::new(Expr::Parameter { index: 1 }),
1307            }),
1308        };
1309
1310        assert_eq!(count_params(&expr), 2);
1311    }
1312
1313    #[test]
1314    fn test_validate_params() {
1315        let stmt = PreparedStatement::new(
1316            "SELECT * FROM users WHERE id = ?".to_string(),
1317            LogicalPlan::EmptyRelation,
1318            vec![Some(ColumnType::Integer)],
1319            1,
1320            1,
1321        );
1322
1323        // Valid params
1324        assert!(stmt.validate_params(&[Value::Integer(1)]).is_ok());
1325
1326        // Wrong count
1327        assert!(stmt.validate_params(&[]).is_err());
1328        assert!(stmt
1329            .validate_params(&[Value::Integer(1), Value::Integer(2)])
1330            .is_err());
1331
1332        // Wrong type (if type checking is enabled)
1333        assert!(stmt
1334            .validate_params(&[Value::Text("hello".to_string())])
1335            .is_err());
1336    }
1337
1338    // ========== substitute_params for all expr types ==========
1339
1340    #[test]
1341    fn test_substitute_unary_op() {
1342        // NOT $1
1343        let expr = Expr::UnaryOp {
1344            op: UnaryOp::Not,
1345            expr: Box::new(Expr::Parameter { index: 0 }),
1346        };
1347
1348        let params = vec![Value::Boolean(true)];
1349        let substituted = substitute_params(&expr, &params).unwrap();
1350
1351        match substituted {
1352            Expr::UnaryOp {
1353                op: UnaryOp::Not,
1354                expr,
1355            } => {
1356                assert!(matches!(*expr, Expr::Literal(Value::Boolean(true))));
1357            }
1358            _ => panic!("Expected UnaryOp"),
1359        }
1360    }
1361
1362    #[test]
1363    fn test_substitute_function_params() {
1364        // UPPER($1)
1365        let expr = Expr::Function {
1366            name: "UPPER".to_string(),
1367            args: vec![Expr::Parameter { index: 0 }],
1368        };
1369
1370        let params = vec![Value::Text("hello".to_string())];
1371        let substituted = substitute_params(&expr, &params).unwrap();
1372
1373        match substituted {
1374            Expr::Function { name, args } => {
1375                assert_eq!(name, "UPPER");
1376                assert!(matches!(args[0], Expr::Literal(Value::Text(_))));
1377            }
1378            _ => panic!("Expected Function"),
1379        }
1380    }
1381
1382    #[test]
1383    fn test_substitute_aggregate_params() {
1384        // SUM($1)
1385        let expr = Expr::Aggregate {
1386            func: AggregateFunction::Sum,
1387            arg: Some(Box::new(Expr::Parameter { index: 0 })),
1388            distinct: false,
1389        };
1390
1391        let params = vec![Value::Integer(100)];
1392        let substituted = substitute_params(&expr, &params).unwrap();
1393
1394        match substituted {
1395            Expr::Aggregate {
1396                func,
1397                arg,
1398                distinct,
1399            } => {
1400                assert_eq!(func, AggregateFunction::Sum);
1401                assert!(!distinct);
1402                assert!(matches!(*arg.unwrap(), Expr::Literal(Value::Integer(100))));
1403            }
1404            _ => panic!("Expected Aggregate"),
1405        }
1406    }
1407
1408    #[test]
1409    fn test_substitute_case_expr() {
1410        // CASE WHEN $1 THEN $2 ELSE $3 END
1411        let expr = Expr::Case {
1412            operand: None,
1413            when_clauses: vec![(Expr::Parameter { index: 0 }, Expr::Parameter { index: 1 })],
1414            else_result: Some(Box::new(Expr::Parameter { index: 2 })),
1415        };
1416
1417        let params = vec![Value::Boolean(true), Value::Integer(1), Value::Integer(0)];
1418        let substituted = substitute_params(&expr, &params).unwrap();
1419
1420        match substituted {
1421            Expr::Case {
1422                when_clauses,
1423                else_result,
1424                ..
1425            } => {
1426                assert!(matches!(
1427                    when_clauses[0].0,
1428                    Expr::Literal(Value::Boolean(true))
1429                ));
1430                assert!(matches!(
1431                    when_clauses[0].1,
1432                    Expr::Literal(Value::Integer(1))
1433                ));
1434                assert!(matches!(
1435                    *else_result.unwrap(),
1436                    Expr::Literal(Value::Integer(0))
1437                ));
1438            }
1439            _ => panic!("Expected Case"),
1440        }
1441    }
1442
1443    #[test]
1444    fn test_substitute_case_with_operand() {
1445        // CASE $1 WHEN 'active' THEN 1 END
1446        let expr = Expr::Case {
1447            operand: Some(Box::new(Expr::Parameter { index: 0 })),
1448            when_clauses: vec![(
1449                Expr::Literal(Value::Text("active".to_string())),
1450                Expr::Literal(Value::Integer(1)),
1451            )],
1452            else_result: None,
1453        };
1454
1455        let params = vec![Value::Text("active".to_string())];
1456        let substituted = substitute_params(&expr, &params).unwrap();
1457
1458        match substituted {
1459            Expr::Case { operand, .. } => {
1460                assert!(matches!(*operand.unwrap(), Expr::Literal(Value::Text(_))));
1461            }
1462            _ => panic!("Expected Case"),
1463        }
1464    }
1465
1466    #[test]
1467    fn test_substitute_between_expr() {
1468        // x BETWEEN $1 AND $2
1469        let expr = Expr::Between {
1470            expr: Box::new(Expr::Column {
1471                table: None,
1472                name: "x".to_string(),
1473                index: None,
1474            }),
1475            low: Box::new(Expr::Parameter { index: 0 }),
1476            high: Box::new(Expr::Parameter { index: 1 }),
1477            negated: false,
1478        };
1479
1480        let params = vec![Value::Integer(1), Value::Integer(10)];
1481        let substituted = substitute_params(&expr, &params).unwrap();
1482
1483        match substituted {
1484            Expr::Between { low, high, .. } => {
1485                assert!(matches!(*low, Expr::Literal(Value::Integer(1))));
1486                assert!(matches!(*high, Expr::Literal(Value::Integer(10))));
1487            }
1488            _ => panic!("Expected Between"),
1489        }
1490    }
1491
1492    #[test]
1493    fn test_substitute_in_list() {
1494        // x IN ($1, $2, $3)
1495        let expr = Expr::InList {
1496            expr: Box::new(Expr::Column {
1497                table: None,
1498                name: "x".to_string(),
1499                index: None,
1500            }),
1501            list: vec![
1502                Expr::Parameter { index: 0 },
1503                Expr::Parameter { index: 1 },
1504                Expr::Parameter { index: 2 },
1505            ],
1506            negated: false,
1507        };
1508
1509        let params = vec![Value::Integer(1), Value::Integer(2), Value::Integer(3)];
1510        let substituted = substitute_params(&expr, &params).unwrap();
1511
1512        match substituted {
1513            Expr::InList { list, .. } => {
1514                assert!(matches!(list[0], Expr::Literal(Value::Integer(1))));
1515                assert!(matches!(list[1], Expr::Literal(Value::Integer(2))));
1516                assert!(matches!(list[2], Expr::Literal(Value::Integer(3))));
1517            }
1518            _ => panic!("Expected InList"),
1519        }
1520    }
1521
1522    #[test]
1523    fn test_substitute_like_expr() {
1524        // $1 LIKE 'pattern'
1525        let expr = Expr::Like {
1526            expr: Box::new(Expr::Parameter { index: 0 }),
1527            pattern: "%test%".to_string(),
1528            negated: false,
1529        };
1530
1531        let params = vec![Value::Text("this is a test".to_string())];
1532        let substituted = substitute_params(&expr, &params).unwrap();
1533
1534        match substituted {
1535            Expr::Like {
1536                expr,
1537                pattern,
1538                negated,
1539            } => {
1540                assert!(matches!(*expr, Expr::Literal(Value::Text(_))));
1541                assert_eq!(pattern, "%test%");
1542                assert!(!negated);
1543            }
1544            _ => panic!("Expected Like"),
1545        }
1546    }
1547
1548    #[test]
1549    fn test_substitute_nested_expressions() {
1550        // ($1 + $2) * ($3 - $4)
1551        let expr = Expr::BinaryOp {
1552            left: Box::new(Expr::BinaryOp {
1553                left: Box::new(Expr::Parameter { index: 0 }),
1554                op: BinaryOp::Add,
1555                right: Box::new(Expr::Parameter { index: 1 }),
1556            }),
1557            op: BinaryOp::Mul,
1558            right: Box::new(Expr::BinaryOp {
1559                left: Box::new(Expr::Parameter { index: 2 }),
1560                op: BinaryOp::Sub,
1561                right: Box::new(Expr::Parameter { index: 3 }),
1562            }),
1563        };
1564
1565        let params = vec![
1566            Value::Integer(1),
1567            Value::Integer(2),
1568            Value::Integer(10),
1569            Value::Integer(3),
1570        ];
1571        let substituted = substitute_params(&expr, &params).unwrap();
1572
1573        // Verify structure is preserved
1574        match substituted {
1575            Expr::BinaryOp {
1576                op: BinaryOp::Mul, ..
1577            } => {}
1578            _ => panic!("Expected BinaryOp with Mul"),
1579        }
1580    }
1581
1582    #[test]
1583    fn test_substitute_params_missing_param() {
1584        let expr = Expr::Parameter { index: 5 };
1585        let params = vec![Value::Integer(1)]; // Only 1 param, but index 5 requested
1586
1587        let result = substitute_params(&expr, &params);
1588        assert!(result.is_err());
1589    }
1590
1591    #[test]
1592    fn test_substitute_column_literal_unchanged() {
1593        let expr = Expr::Column {
1594            table: None,
1595            name: "x".to_string(),
1596            index: None,
1597        };
1598        let params = vec![];
1599        let substituted = substitute_params(&expr, &params).unwrap();
1600        assert!(matches!(substituted, Expr::Column { .. }));
1601
1602        let expr = Expr::Literal(Value::Integer(42));
1603        let substituted = substitute_params(&expr, &params).unwrap();
1604        assert!(matches!(substituted, Expr::Literal(Value::Integer(42))));
1605    }
1606
1607    // ========== substitute_plan_params tests ==========
1608
1609    #[test]
1610    fn test_substitute_plan_filter() {
1611        let plan = LogicalPlan::Filter {
1612            input: Box::new(LogicalPlan::EmptyRelation),
1613            predicate: Expr::BinaryOp {
1614                left: Box::new(Expr::Column {
1615                    table: None,
1616                    name: "x".to_string(),
1617                    index: None,
1618                }),
1619                op: BinaryOp::Eq,
1620                right: Box::new(Expr::Parameter { index: 0 }),
1621            },
1622        };
1623
1624        let params = vec![Value::Integer(42)];
1625        let substituted = substitute_plan_params(&plan, &params).unwrap();
1626
1627        match substituted {
1628            LogicalPlan::Filter { predicate, .. } => match predicate {
1629                Expr::BinaryOp { right, .. } => {
1630                    assert!(matches!(*right, Expr::Literal(Value::Integer(42))));
1631                }
1632                _ => panic!("Expected BinaryOp predicate"),
1633            },
1634            _ => panic!("Expected Filter"),
1635        }
1636    }
1637
1638    // Note: LogicalPlan::Scan requires Arc<Table> which is complex to construct in tests.
1639    // We test the substitution logic through Filter which wraps simpler plans.
1640
1641    #[test]
1642    fn test_substitute_plan_project() {
1643        let plan = LogicalPlan::Project {
1644            input: Box::new(LogicalPlan::EmptyRelation),
1645            exprs: vec![(Expr::Parameter { index: 0 }, "col1".to_string())],
1646        };
1647
1648        let params = vec![Value::Integer(100)];
1649        let substituted = substitute_plan_params(&plan, &params).unwrap();
1650
1651        match substituted {
1652            LogicalPlan::Project { exprs, .. } => {
1653                assert!(matches!(exprs[0].0, Expr::Literal(Value::Integer(100))));
1654            }
1655            _ => panic!("Expected Project"),
1656        }
1657    }
1658
1659    #[test]
1660    fn test_substitute_plan_join() {
1661        let plan = LogicalPlan::Join {
1662            left: Box::new(LogicalPlan::EmptyRelation),
1663            right: Box::new(LogicalPlan::EmptyRelation),
1664            join_type: crate::planner::JoinType::Inner,
1665            condition: Some(Expr::BinaryOp {
1666                left: Box::new(Expr::Column {
1667                    table: None,
1668                    name: "a".to_string(),
1669                    index: None,
1670                }),
1671                op: BinaryOp::Eq,
1672                right: Box::new(Expr::Parameter { index: 0 }),
1673            }),
1674        };
1675
1676        let params = vec![Value::Integer(1)];
1677        let substituted = substitute_plan_params(&plan, &params).unwrap();
1678
1679        match substituted {
1680            LogicalPlan::Join {
1681                condition: Some(c), ..
1682            } => match c {
1683                Expr::BinaryOp { right, .. } => {
1684                    assert!(matches!(*right, Expr::Literal(Value::Integer(1))));
1685                }
1686                _ => panic!("Expected BinaryOp"),
1687            },
1688            _ => panic!("Expected Join"),
1689        }
1690    }
1691
1692    // Note: LogicalPlan::Insert/Update/Delete require Arc<Table> which is complex to construct.
1693    // These operations are tested through integration tests instead.
1694
1695    #[test]
1696    fn test_substitute_plan_aggregate() {
1697        let plan = LogicalPlan::Aggregate {
1698            input: Box::new(LogicalPlan::EmptyRelation),
1699            group_by: vec![Expr::Parameter { index: 0 }],
1700            aggregates: vec![(Expr::Parameter { index: 1 }, "agg".to_string())],
1701        };
1702
1703        let params = vec![Value::Integer(1), Value::Integer(2)];
1704        let substituted = substitute_plan_params(&plan, &params).unwrap();
1705
1706        match substituted {
1707            LogicalPlan::Aggregate {
1708                group_by,
1709                aggregates,
1710                ..
1711            } => {
1712                assert!(matches!(group_by[0], Expr::Literal(Value::Integer(1))));
1713                assert!(matches!(aggregates[0].0, Expr::Literal(Value::Integer(2))));
1714            }
1715            _ => panic!("Expected Aggregate"),
1716        }
1717    }
1718
1719    #[test]
1720    fn test_substitute_plan_sort() {
1721        use crate::planner::SortOrder;
1722
1723        let plan = LogicalPlan::Sort {
1724            input: Box::new(LogicalPlan::EmptyRelation),
1725            order_by: vec![(Expr::Parameter { index: 0 }, SortOrder::Asc)],
1726        };
1727
1728        let params = vec![Value::Integer(1)];
1729        let substituted = substitute_plan_params(&plan, &params).unwrap();
1730
1731        match substituted {
1732            LogicalPlan::Sort { order_by, .. } => {
1733                assert!(matches!(order_by[0].0, Expr::Literal(Value::Integer(1))));
1734                assert_eq!(order_by[0].1, SortOrder::Asc);
1735            }
1736            _ => panic!("Expected Sort"),
1737        }
1738    }
1739
1740    #[test]
1741    fn test_substitute_plan_limit_distinct() {
1742        let plan = LogicalPlan::Limit {
1743            input: Box::new(LogicalPlan::Distinct {
1744                input: Box::new(LogicalPlan::EmptyRelation),
1745            }),
1746            limit: Some(10),
1747            offset: 5,
1748        };
1749
1750        let params = vec![];
1751        let substituted = substitute_plan_params(&plan, &params).unwrap();
1752
1753        match substituted {
1754            LogicalPlan::Limit {
1755                input,
1756                limit,
1757                offset,
1758            } => {
1759                assert!(matches!(*input, LogicalPlan::Distinct { .. }));
1760                assert_eq!(limit, Some(10));
1761                assert_eq!(offset, 5);
1762            }
1763            _ => panic!("Expected Limit"),
1764        }
1765    }
1766
1767    // ========== count_plan_params tests ==========
1768
1769    #[test]
1770    fn test_count_params_in_plan_filter() {
1771        let plan = LogicalPlan::Filter {
1772            input: Box::new(LogicalPlan::EmptyRelation),
1773            predicate: Expr::BinaryOp {
1774                left: Box::new(Expr::Parameter { index: 0 }),
1775                op: BinaryOp::And,
1776                right: Box::new(Expr::Parameter { index: 2 }), // Gap in indices
1777            },
1778        };
1779
1780        assert_eq!(count_plan_params(&plan), 3); // Max index + 1
1781    }
1782
1783    // Note: count_plan_params for Insert is tested via integration tests
1784    // since LogicalPlan::Insert requires Arc<Table>.
1785
1786    #[test]
1787    fn test_count_params_nested_plans() {
1788        let plan = LogicalPlan::Filter {
1789            input: Box::new(LogicalPlan::Project {
1790                input: Box::new(LogicalPlan::EmptyRelation),
1791                exprs: vec![(Expr::Parameter { index: 1 }, "col".to_string())],
1792            }),
1793            predicate: Expr::Parameter { index: 0 },
1794        };
1795
1796        // Max should be 2 (index 1 + 1)
1797        assert_eq!(count_plan_params(&plan), 2);
1798    }
1799
1800    // ========== PreparedStatement methods ==========
1801
1802    #[test]
1803    fn test_prepared_statement_accessors() {
1804        let stmt = PreparedStatement::new(
1805            "SELECT * FROM users WHERE id = ?".to_string(),
1806            LogicalPlan::EmptyRelation,
1807            vec![Some(ColumnType::Integer)],
1808            1,
1809            42,
1810        );
1811
1812        assert_eq!(stmt.sql(), "SELECT * FROM users WHERE id = ?");
1813        assert!(matches!(stmt.plan(), LogicalPlan::EmptyRelation));
1814        assert_eq!(stmt.param_types().len(), 1);
1815        assert_eq!(stmt.param_count(), 1);
1816        assert_eq!(stmt.id(), 42);
1817    }
1818
1819    #[test]
1820    fn test_validate_params_with_nulls() {
1821        let stmt = PreparedStatement::new(
1822            "SELECT * FROM users WHERE id = ?".to_string(),
1823            LogicalPlan::EmptyRelation,
1824            vec![Some(ColumnType::Integer)],
1825            1,
1826            1,
1827        );
1828
1829        // NULL should be accepted for any type
1830        assert!(stmt.validate_params(&[Value::Null]).is_ok());
1831    }
1832
1833    #[test]
1834    fn test_validate_params_no_type_check() {
1835        let stmt = PreparedStatement::new(
1836            "SELECT * FROM users WHERE id = ?".to_string(),
1837            LogicalPlan::EmptyRelation,
1838            vec![None], // No type specified
1839            1,
1840            1,
1841        );
1842
1843        // Any type should be accepted when no type constraint
1844        assert!(stmt.validate_params(&[Value::Integer(1)]).is_ok());
1845        assert!(stmt
1846            .validate_params(&[Value::Text("hello".to_string())])
1847            .is_ok());
1848    }
1849
1850    // ========== PlanCache edge cases ==========
1851
1852    #[test]
1853    fn test_cache_invalidate_all() {
1854        let cache = PlanCache::new(10);
1855
1856        for i in 0..5 {
1857            let sql = format!("SELECT {}", i);
1858            let stmt = PreparedStatement::new(
1859                sql.clone(),
1860                LogicalPlan::EmptyRelation,
1861                vec![],
1862                0,
1863                i as u64,
1864            );
1865            cache.insert(sql, stmt);
1866        }
1867
1868        assert_eq!(cache.len(), 5);
1869
1870        cache.invalidate_all();
1871
1872        assert_eq!(cache.len(), 0);
1873        assert!(cache.is_empty());
1874        assert_eq!(cache.stats().invalidations(), 5);
1875    }
1876
1877    #[test]
1878    fn test_cache_stats_accuracy() {
1879        let cache = PlanCache::new(10);
1880
1881        // Initial state
1882        assert_eq!(cache.stats().hits(), 0);
1883        assert_eq!(cache.stats().misses(), 0);
1884        assert_eq!(cache.stats().evictions(), 0);
1885        assert_eq!(cache.stats().invalidations(), 0);
1886        assert_eq!(cache.stats().hit_ratio(), 1.0); // No accesses yet = 100%
1887
1888        // Miss
1889        assert!(cache.get("SELECT 1").is_none());
1890        assert_eq!(cache.stats().misses(), 1);
1891
1892        // Insert and hit
1893        cache.insert(
1894            "SELECT 1".to_string(),
1895            PreparedStatement::new(
1896                "SELECT 1".to_string(),
1897                LogicalPlan::EmptyRelation,
1898                vec![],
1899                0,
1900                1,
1901            ),
1902        );
1903        assert!(cache.get("SELECT 1").is_some());
1904        assert_eq!(cache.stats().hits(), 1);
1905
1906        // Hit ratio: 1 hit / (1 hit + 1 miss) = 0.5
1907        assert!((cache.stats().hit_ratio() - 0.5).abs() < 0.001);
1908    }
1909
1910    #[test]
1911    fn test_cache_next_id() {
1912        let cache = PlanCache::new(10);
1913
1914        let id1 = cache.next_id();
1915        let id2 = cache.next_id();
1916        let id3 = cache.next_id();
1917
1918        assert_eq!(id1, 1);
1919        assert_eq!(id2, 2);
1920        assert_eq!(id3, 3);
1921    }
1922
1923    #[test]
1924    fn test_cache_schema_version() {
1925        let cache = PlanCache::new(10);
1926
1927        let v1 = cache.schema_version();
1928        cache.invalidate_all();
1929        let v2 = cache.schema_version();
1930
1931        assert!(v2 > v1);
1932    }
1933
1934    #[test]
1935    fn test_cache_max_size() {
1936        let cache = PlanCache::new(100);
1937        assert_eq!(cache.max_size(), 100);
1938    }
1939
1940    #[test]
1941    fn test_cache_default_size() {
1942        let cache = PlanCache::default_size();
1943        assert_eq!(cache.max_size(), 1000);
1944    }
1945
1946    #[test]
1947    fn test_cache_default_trait() {
1948        let cache = PlanCache::default();
1949        assert_eq!(cache.max_size(), 1000);
1950    }
1951
1952    // ========== count_params for edge cases ==========
1953
1954    #[test]
1955    fn test_count_params_case_expr() {
1956        let expr = Expr::Case {
1957            operand: Some(Box::new(Expr::Parameter { index: 0 })),
1958            when_clauses: vec![(Expr::Parameter { index: 1 }, Expr::Parameter { index: 2 })],
1959            else_result: Some(Box::new(Expr::Parameter { index: 3 })),
1960        };
1961
1962        assert_eq!(count_params(&expr), 4);
1963    }
1964
1965    #[test]
1966    fn test_count_params_between() {
1967        let expr = Expr::Between {
1968            expr: Box::new(Expr::Parameter { index: 0 }),
1969            low: Box::new(Expr::Parameter { index: 1 }),
1970            high: Box::new(Expr::Parameter { index: 2 }),
1971            negated: false,
1972        };
1973
1974        assert_eq!(count_params(&expr), 3);
1975    }
1976
1977    #[test]
1978    fn test_count_params_in_list() {
1979        let expr = Expr::InList {
1980            expr: Box::new(Expr::Column {
1981                table: None,
1982                name: "x".to_string(),
1983                index: None,
1984            }),
1985            list: vec![
1986                Expr::Parameter { index: 0 },
1987                Expr::Parameter { index: 3 }, // Gap
1988            ],
1989            negated: false,
1990        };
1991
1992        assert_eq!(count_params(&expr), 4); // Max index + 1
1993    }
1994
1995    #[test]
1996    fn test_count_params_aggregate() {
1997        let expr = Expr::Aggregate {
1998            func: AggregateFunction::Sum,
1999            arg: Some(Box::new(Expr::Parameter { index: 2 })),
2000            distinct: false,
2001        };
2002
2003        assert_eq!(count_params(&expr), 3);
2004    }
2005
2006    #[test]
2007    fn test_count_params_function() {
2008        let expr = Expr::Function {
2009            name: "CONCAT".to_string(),
2010            args: vec![Expr::Parameter { index: 0 }, Expr::Parameter { index: 1 }],
2011        };
2012
2013        assert_eq!(count_params(&expr), 2);
2014    }
2015
2016    #[test]
2017    fn test_count_params_like() {
2018        let expr = Expr::Like {
2019            expr: Box::new(Expr::Parameter { index: 0 }),
2020            pattern: "%test%".to_string(),
2021            negated: false,
2022        };
2023
2024        assert_eq!(count_params(&expr), 1);
2025    }
2026
2027    #[test]
2028    fn test_count_params_no_params() {
2029        let expr = Expr::Column {
2030            table: None,
2031            name: "x".to_string(),
2032            index: None,
2033        };
2034        assert_eq!(count_params(&expr), 0);
2035
2036        let expr = Expr::Literal(Value::Integer(42));
2037        assert_eq!(count_params(&expr), 0);
2038
2039        let expr = Expr::Wildcard;
2040        assert_eq!(count_params(&expr), 0);
2041    }
2042}