vibesql/
cursor.rs

1//! Query cursor implementation
2//!
3//! This module provides the Cursor class for executing SQL statements,
4//! managing cached results, and fetching query results following DB-API 2.0 conventions.
5
6use std::{num::NonZeroUsize, sync::Arc};
7
8use lru::LruCache;
9use parking_lot::Mutex;
10use pyo3::{
11    prelude::*,
12    types::{PyList, PyTuple},
13};
14
15use crate::{
16    conversions::{convert_params_to_sql_values, sqlvalue_to_py, substitute_placeholders},
17    profiling, OperationalError, ProgrammingError,
18};
19
20/// Query result storage
21///
22/// Represents the result of a SQL query execution, either a SELECT result
23/// with rows and columns, or a DML/DDL result with rows affected count.
24enum QueryResultData {
25    /// SELECT query result with columns and rows
26    Select { columns: Vec<String>, rows: Vec<vibesql_storage::Row> },
27    /// DML/DDL result with rows affected and message
28    Execute {
29        rows_affected: usize,
30        #[allow(dead_code)]
31        message: String,
32    },
33}
34
35/// Cursor object for executing SQL statements
36///
37/// Follows DB-API 2.0 conventions for query execution and result fetching.
38/// Maintains caches for parsed statements and table schemas to improve performance.
39///
40/// # Caches
41///
42/// - **Statement Cache**: LRU cache of up to 1000 parsed SQL statements
43/// - **Schema Cache**: LRU cache of up to 100 table schemas
44/// - **Statistics**: Tracks hit/miss ratios for both caches
45#[pyclass]
46pub struct Cursor {
47    db: Arc<Mutex<vibesql_storage::Database>>,
48    last_result: Option<QueryResultData>,
49    /// LRU cache for parsed SQL statements (max 1000 entries)
50    /// Key: SQL string with ? placeholders, Value: parsed AST
51    stmt_cache: Arc<Mutex<LruCache<String, vibesql_ast::Statement>>>,
52    /// Cache for table schemas (max 100 tables per cursor)
53    /// Key: table name, Value: cached schema
54    schema_cache: Arc<Mutex<LruCache<String, vibesql_catalog::TableSchema>>>,
55    /// Cache statistics for monitoring
56    cache_hits: Arc<Mutex<usize>>,
57    cache_misses: Arc<Mutex<usize>>,
58    /// Schema cache statistics
59    schema_cache_hits: Arc<Mutex<usize>>,
60    schema_cache_misses: Arc<Mutex<usize>>,
61}
62
63impl Cursor {
64    /// Create a new cursor for the given database
65    pub(crate) fn new(db: Arc<Mutex<vibesql_storage::Database>>) -> PyResult<Self> {
66        Ok(Cursor {
67            db,
68            last_result: None,
69            stmt_cache: Arc::new(Mutex::new(LruCache::new(NonZeroUsize::new(1000).unwrap()))),
70            schema_cache: Arc::new(Mutex::new(LruCache::new(NonZeroUsize::new(100).unwrap()))),
71            cache_hits: Arc::new(Mutex::new(0)),
72            cache_misses: Arc::new(Mutex::new(0)),
73            schema_cache_hits: Arc::new(Mutex::new(0)),
74            schema_cache_misses: Arc::new(Mutex::new(0)),
75        })
76    }
77}
78
79#[pymethods]
80impl Cursor {
81    /// Execute a SQL statement with optional parameter binding
82    ///
83    /// # Arguments
84    /// * `sql` - The SQL statement to execute (may contain ? placeholders)
85    /// * `params` - Optional tuple of parameter values to bind to ? placeholders
86    ///
87    /// # Returns
88    /// None
89    ///
90    /// # Errors
91    /// Returns ProgrammingError for SQL parse errors or OperationalError for execution errors.
92    #[pyo3(signature = (sql, params=None))]
93    fn execute(
94        &mut self,
95        py: Python,
96        sql: &str,
97        params: Option<&Bound<'_, PyTuple>>,
98    ) -> PyResult<()> {
99        let mut profiler = profiling::DetailedProfiler::new("execute()");
100
101        // Use the original SQL (with ? placeholders) as the cache key
102        let cache_key = sql.to_string();
103        profiler.checkpoint("SQL string copied");
104
105        // Check if we have a cached parsed AST for this SQL
106        let mut cache = self.stmt_cache.lock();
107        profiler.checkpoint("Acquired cache lock");
108        let stmt = if let Some(cached_stmt) = cache.get(&cache_key) {
109            // Cache hit! Clone the cached AST before releasing lock
110            let cloned_stmt = cached_stmt.clone();
111            drop(cache); // Release cache lock before updating stats
112            *self.cache_hits.lock() += 1;
113            profiler.checkpoint("Cache HIT - stmt cloned");
114            cloned_stmt
115        } else {
116            // Cache miss - need to parse
117            drop(cache); // Release cache lock before parsing
118            *self.cache_misses.lock() += 1;
119            profiler.checkpoint("Cache MISS - need to parse");
120
121            // Process SQL with parameter substitution if params are provided
122            let processed_sql = if let Some(params_tuple) = params {
123                Self::bind_parameters(py, sql, params_tuple)?
124            } else {
125                // No parameters provided, use SQL as-is
126                sql.to_string()
127            };
128
129            // Parse the processed SQL
130            let stmt = vibesql_parser::Parser::parse_sql(&processed_sql)
131                .map_err(|e| ProgrammingError::new_err(format!("Parse error: {:?}", e)))?;
132            profiler.checkpoint("SQL parsed to AST");
133
134            // Store the parsed AST in cache for future reuse
135            let mut cache = self.stmt_cache.lock();
136            cache.put(cache_key.clone(), stmt.clone());
137            drop(cache);
138            profiler.checkpoint("AST cached");
139
140            stmt
141        };
142
143        profiler.checkpoint("Statement ready for execution");
144
145        // Execute based on statement type
146        match stmt {
147            vibesql_ast::Statement::Select(select_stmt) => {
148                let db = self.db.lock();
149                profiler.checkpoint("Database lock acquired (SELECT)");
150                let select_executor = vibesql_executor::SelectExecutor::new(&db);
151                profiler.checkpoint("SelectExecutor created");
152                let result = select_executor
153                    .execute_with_columns(&select_stmt)
154                    .map_err(|e| OperationalError::new_err(format!("Execution error: {:?}", e)))?;
155                profiler.checkpoint("SELECT executed in Rust");
156
157                self.last_result =
158                    Some(QueryResultData::Select { columns: result.columns, rows: result.rows });
159                profiler.checkpoint("Result stored");
160
161                Ok(())
162            }
163            vibesql_ast::Statement::CreateTable(create_stmt) => {
164                let mut db = self.db.lock();
165                vibesql_executor::CreateTableExecutor::execute(&create_stmt, &mut db)
166                    .map_err(|e| OperationalError::new_err(format!("Execution error: {:?}", e)))?;
167
168                // Clear both statement and schema caches on schema change
169                let mut cache = self.stmt_cache.lock();
170                cache.clear();
171                drop(cache);
172                self.clear_schema_cache();
173
174                self.last_result = Some(QueryResultData::Execute {
175                    rows_affected: 0,
176                    message: format!("Table '{}' created successfully", create_stmt.table_name),
177                });
178
179                Ok(())
180            }
181            vibesql_ast::Statement::DropTable(drop_stmt) => {
182                let mut db = self.db.lock();
183                let message = vibesql_executor::DropTableExecutor::execute(&drop_stmt, &mut db)
184                    .map_err(|e| OperationalError::new_err(format!("Execution error: {:?}", e)))?;
185
186                // Clear both statement and schema caches on schema change
187                let mut cache = self.stmt_cache.lock();
188                cache.clear();
189                drop(cache);
190                self.clear_schema_cache();
191
192                self.last_result = Some(QueryResultData::Execute { rows_affected: 0, message });
193
194                Ok(())
195            }
196            vibesql_ast::Statement::Insert(insert_stmt) => {
197                let mut db = self.db.lock();
198                profiler.checkpoint("Database lock acquired (INSERT)");
199                let row_count = vibesql_executor::InsertExecutor::execute(&mut db, &insert_stmt)
200                    .map_err(|e| OperationalError::new_err(format!("Execution error: {:?}", e)))?;
201                profiler.checkpoint("INSERT executed in Rust");
202
203                self.last_result = Some(QueryResultData::Execute {
204                    rows_affected: row_count,
205                    message: format!(
206                        "{} row{} inserted into '{}'",
207                        row_count,
208                        if row_count == 1 { "" } else { "s" },
209                        insert_stmt.table_name
210                    ),
211                });
212                profiler.checkpoint("Result message created");
213
214                Ok(())
215            }
216            vibesql_ast::Statement::Update(update_stmt) => {
217                // Use cached schema to reduce catalog lookups
218                let cached_schema = self.get_cached_schema(&update_stmt.table_name)?;
219                profiler.checkpoint("Schema cache lookup (UPDATE)");
220
221                let mut db = self.db.lock();
222                profiler.checkpoint("Database lock acquired (UPDATE)");
223                let row_count = vibesql_executor::UpdateExecutor::execute_with_schema(
224                    &update_stmt,
225                    &mut db,
226                    Some(&cached_schema),
227                )
228                .map_err(|e| OperationalError::new_err(format!("Execution error: {:?}", e)))?;
229                profiler.checkpoint("UPDATE executed in Rust");
230
231                self.last_result = Some(QueryResultData::Execute {
232                    rows_affected: row_count,
233                    message: format!(
234                        "{} row{} updated in '{}'",
235                        row_count,
236                        if row_count == 1 { "" } else { "s" },
237                        update_stmt.table_name
238                    ),
239                });
240                profiler.checkpoint("Result message created");
241
242                Ok(())
243            }
244            vibesql_ast::Statement::Delete(delete_stmt) => {
245                let mut db = self.db.lock();
246                profiler.checkpoint("Database lock acquired (DELETE)");
247                let row_count = vibesql_executor::DeleteExecutor::execute(&delete_stmt, &mut db)
248                    .map_err(|e| OperationalError::new_err(format!("Execution error: {:?}", e)))?;
249                profiler.checkpoint("DELETE executed in Rust");
250
251                self.last_result = Some(QueryResultData::Execute {
252                    rows_affected: row_count,
253                    message: format!(
254                        "{} row{} deleted from '{}'",
255                        row_count,
256                        if row_count == 1 { "" } else { "s" },
257                        delete_stmt.table_name
258                    ),
259                });
260                profiler.checkpoint("Result message created");
261
262                Ok(())
263            }
264            vibesql_ast::Statement::CreateView(view_stmt) => {
265                let mut db = self.db.lock();
266                vibesql_executor::advanced_objects::execute_create_view(&view_stmt, &mut db)
267                    .map_err(|e| OperationalError::new_err(format!("Execution error: {:?}", e)))?;
268
269                // Clear both statement and schema caches on schema change
270                let mut cache = self.stmt_cache.lock();
271                cache.clear();
272                drop(cache);
273                self.clear_schema_cache();
274
275                self.last_result = Some(QueryResultData::Execute {
276                    rows_affected: 0,
277                    message: format!("View '{}' created successfully", view_stmt.view_name),
278                });
279
280                Ok(())
281            }
282            vibesql_ast::Statement::DropView(drop_stmt) => {
283                let mut db = self.db.lock();
284                vibesql_executor::advanced_objects::execute_drop_view(&drop_stmt, &mut db)
285                    .map_err(|e| OperationalError::new_err(format!("Execution error: {:?}", e)))?;
286
287                // Clear both statement and schema caches on schema change
288                let mut cache = self.stmt_cache.lock();
289                cache.clear();
290                drop(cache);
291                self.clear_schema_cache();
292
293                self.last_result = Some(QueryResultData::Execute {
294                    rows_affected: 0,
295                    message: format!("View '{}' dropped successfully", drop_stmt.view_name),
296                });
297
298                Ok(())
299            }
300            _ => Err(ProgrammingError::new_err(format!(
301                "Statement type not yet supported: {:?}",
302                stmt
303            ))),
304        }
305    }
306
307    /// Fetch all rows from the last query result
308    ///
309    /// # Returns
310    /// A list of tuples, each representing a row from the result set.
311    ///
312    /// # Errors
313    /// Returns ProgrammingError if the last result was not a SELECT query.
314    fn fetchall(&mut self, py: Python) -> PyResult<Py<PyAny>> {
315        let mut profiler = profiling::DetailedProfiler::new("fetchall()");
316        match &self.last_result {
317            Some(QueryResultData::Select { rows, .. }) => {
318                profiler.checkpoint(&format!("Starting fetch of {} rows", rows.len()));
319                let result_list = PyList::empty(py);
320                profiler.checkpoint("Empty PyList created");
321                for row in rows {
322                    let py_values: Vec<Py<PyAny>> =
323                        row.values.iter().map(|v| sqlvalue_to_py(py, v).unwrap()).collect();
324                    let py_row = PyTuple::new(py, py_values)?;
325                    result_list.append(py_row)?;
326                }
327                profiler.checkpoint("All rows converted to Python");
328                Ok(result_list.into())
329            }
330            Some(QueryResultData::Execute { .. }) => {
331                Err(ProgrammingError::new_err("No SELECT result to fetch"))
332            }
333            None => Err(ProgrammingError::new_err("No query has been executed")),
334        }
335    }
336
337    /// Fetch one row from the last query result
338    ///
339    /// # Returns
340    /// A tuple representing the next row, or None if no more rows are available.
341    ///
342    /// # Errors
343    /// Returns ProgrammingError if the last result was not a SELECT query.
344    fn fetchone(&mut self, py: Python) -> PyResult<Py<PyAny>> {
345        match &mut self.last_result {
346            Some(QueryResultData::Select { rows, .. }) => {
347                if rows.is_empty() {
348                    Ok(py.None())
349                } else {
350                    let row = rows.remove(0);
351                    let py_values: Vec<Py<PyAny>> =
352                        row.values.iter().map(|v| sqlvalue_to_py(py, v).unwrap()).collect();
353                    let py_row = PyTuple::new(py, py_values)?;
354                    Ok(py_row.into())
355                }
356            }
357            Some(QueryResultData::Execute { .. }) => {
358                Err(ProgrammingError::new_err("No SELECT result to fetch"))
359            }
360            None => Err(ProgrammingError::new_err("No query has been executed")),
361        }
362    }
363
364    /// Fetch multiple rows from the last query result
365    ///
366    /// # Arguments
367    /// * `size` - Number of rows to fetch
368    ///
369    /// # Returns
370    /// A list of tuples, each representing a row from the result set.
371    ///
372    /// # Errors
373    /// Returns ProgrammingError if the last result was not a SELECT query.
374    fn fetchmany(&mut self, py: Python, size: usize) -> PyResult<Py<PyAny>> {
375        match &mut self.last_result {
376            Some(QueryResultData::Select { rows, .. }) => {
377                let fetch_count = size.min(rows.len());
378                let result_list = PyList::empty(py);
379
380                for _ in 0..fetch_count {
381                    if rows.is_empty() {
382                        break;
383                    }
384                    let row = rows.remove(0);
385                    let py_values: Vec<Py<PyAny>> =
386                        row.values.iter().map(|v| sqlvalue_to_py(py, v).unwrap()).collect();
387                    let py_row = PyTuple::new(py, py_values)?;
388                    result_list.append(py_row)?;
389                }
390
391                Ok(result_list.into())
392            }
393            Some(QueryResultData::Execute { .. }) => {
394                Err(ProgrammingError::new_err("No SELECT result to fetch"))
395            }
396            None => Err(ProgrammingError::new_err("No query has been executed")),
397        }
398    }
399
400    /// Get the number of rows affected by the last DML operation
401    ///
402    /// # Returns
403    /// The number of rows affected by the last INSERT, UPDATE, or DELETE operation,
404    /// the number of rows in the result set for a SELECT, or -1 if not applicable.
405    #[getter]
406    fn rowcount(&self) -> PyResult<i64> {
407        match &self.last_result {
408            Some(QueryResultData::Execute { rows_affected, .. }) => Ok(*rows_affected as i64),
409            Some(QueryResultData::Select { rows, .. }) => Ok(rows.len() as i64),
410            None => Ok(-1),
411        }
412    }
413
414    /// Get description of the columns in the last SELECT result
415    ///
416    /// # Returns
417    /// A sequence of 7-item sequences describing each column:
418    /// (name, type_code, display_size, internal_size, precision, scale, null_ok)
419    /// Returns None if the last result was not a SELECT query.
420    #[getter]
421    fn description(&self, py: Python) -> PyResult<Py<PyAny>> {
422        match &self.last_result {
423            Some(QueryResultData::Select { columns, .. }) => {
424                let desc_list = PyList::empty(py);
425                for col_name in columns {
426                    // Each column is a 7-tuple: (name, type_code, None, None, None, None, None)
427                    let col_str = col_name.as_str().into_pyobject(py)?.into_any().unbind();
428                    let col_tuple = PyTuple::new(
429                        py,
430                        [
431                            col_str,
432                            py.None(), // type_code - we don't have detailed type info
433                            py.None(), // display_size
434                            py.None(), // internal_size
435                            py.None(), // precision
436                            py.None(), // scale
437                            py.None(), // null_ok
438                        ],
439                    )?;
440                    desc_list.append(col_tuple)?;
441                }
442                Ok(desc_list.into())
443            }
444            Some(QueryResultData::Execute { .. }) => Ok(py.None()),
445            None => Ok(py.None()),
446        }
447    }
448
449    /// Get schema cache statistics
450    ///
451    /// # Returns
452    /// A tuple of (hits, misses, hit_rate) for the schema cache.
453    fn schema_cache_stats(&self) -> PyResult<(usize, usize, f64)> {
454        let hits = *self.schema_cache_hits.lock();
455        let misses = *self.schema_cache_misses.lock();
456        let total = hits + misses;
457        let hit_rate = if total > 0 { (hits as f64) / (total as f64) } else { 0.0 };
458        Ok((hits, misses, hit_rate))
459    }
460
461    /// Get statement cache statistics
462    ///
463    /// # Returns
464    /// A tuple of (hits, misses, hit_rate) for the statement cache.
465    fn cache_stats(&self) -> PyResult<(usize, usize, f64)> {
466        let hits = *self.cache_hits.lock();
467        let misses = *self.cache_misses.lock();
468        let total = hits + misses;
469        let hit_rate = if total > 0 { (hits as f64) / (total as f64) } else { 0.0 };
470        Ok((hits, misses, hit_rate))
471    }
472
473    /// Clear both statement and schema caches
474    ///
475    /// Useful for testing or when schema changes occur outside this cursor.
476    fn clear_cache(&mut self) -> PyResult<()> {
477        let mut cache = self.stmt_cache.lock();
478        cache.clear();
479        drop(cache);
480
481        self.clear_schema_cache();
482        Ok(())
483    }
484
485    /// Close the cursor
486    ///
487    /// For now, this is a no-op but provided for DB-API 2.0 compatibility.
488    fn close(&self) -> PyResult<()> {
489        // No cleanup needed
490        Ok(())
491    }
492
493    /// Execute a SQL statement multiple times with different parameter sets
494    ///
495    /// # Arguments
496    /// * `sql` - The SQL statement to execute (may contain ? placeholders)
497    /// * `seq_of_params` - A sequence of parameter sequences (list of tuples)
498    ///
499    /// # Returns
500    /// None
501    ///
502    /// # Errors
503    /// Returns ProgrammingError for SQL parse errors or OperationalError for execution errors.
504    fn executemany(
505        &mut self,
506        py: Python,
507        sql: &str,
508        seq_of_params: &Bound<'_, PyList>,
509    ) -> PyResult<()> {
510        let mut total_rows_affected = 0;
511
512        for item in seq_of_params.iter() {
513            let params_tuple = item.cast::<PyTuple>()?;
514            // Bind parameters for this iteration
515            let processed_sql = Self::bind_parameters(py, sql, params_tuple)?;
516
517            // Parse the SQL
518            let stmt = vibesql_parser::Parser::parse_sql(&processed_sql)
519                .map_err(|e| ProgrammingError::new_err(format!("Parse error: {:?}", e)))?;
520
521            // Execute the statement
522            match stmt {
523                vibesql_ast::Statement::Insert(insert_stmt) => {
524                    let mut db = self.db.lock();
525                    let row_count =
526                        vibesql_executor::InsertExecutor::execute(&mut db, &insert_stmt).map_err(
527                            |e| OperationalError::new_err(format!("Execution error: {:?}", e)),
528                        )?;
529                    total_rows_affected += row_count;
530                }
531                vibesql_ast::Statement::Update(update_stmt) => {
532                    let mut db = self.db.lock();
533                    let row_count =
534                        vibesql_executor::UpdateExecutor::execute(&update_stmt, &mut db).map_err(
535                            |e| OperationalError::new_err(format!("Execution error: {:?}", e)),
536                        )?;
537                    total_rows_affected += row_count;
538                }
539                vibesql_ast::Statement::Delete(delete_stmt) => {
540                    let mut db = self.db.lock();
541                    let row_count =
542                        vibesql_executor::DeleteExecutor::execute(&delete_stmt, &mut db).map_err(
543                            |e| OperationalError::new_err(format!("Execution error: {:?}", e)),
544                        )?;
545                    total_rows_affected += row_count;
546                }
547                _ => {
548                    return Err(ProgrammingError::new_err(
549                        "executemany only supports INSERT, UPDATE, and DELETE statements"
550                            .to_string(),
551                    ))
552                }
553            }
554        }
555
556        // Set the final result to the total rows affected
557        self.last_result = Some(QueryResultData::Execute {
558            rows_affected: total_rows_affected,
559            message: format!("{} rows affected", total_rows_affected),
560        });
561
562        Ok(())
563    }
564}
565
566impl Cursor {
567    /// Bind parameters to SQL statement
568    ///
569    /// Takes SQL with ? placeholders and a tuple of parameters, validates
570    /// parameter count, and returns SQL with parameters substituted as literals.
571    fn bind_parameters(py: Python, sql: &str, params: &Bound<'_, PyTuple>) -> PyResult<String> {
572        // Count placeholders in SQL
573        let placeholder_count = sql.matches('?').count();
574        let param_count = params.len();
575
576        // Validate parameter count matches placeholder count
577        if placeholder_count != param_count {
578            return Err(ProgrammingError::new_err(format!(
579                "Parameter count mismatch: SQL has {} placeholders but {} parameters provided",
580                placeholder_count, param_count
581            )));
582        }
583
584        // Convert Python parameters to SQL values
585        let sql_values = convert_params_to_sql_values(py, params)?;
586
587        // Replace placeholders with SQL literal values
588        Ok(substitute_placeholders(sql, &sql_values))
589    }
590
591    /// Get table schema with caching
592    ///
593    /// First checks the schema cache, and only queries the database catalog on cache miss.
594    /// This reduces redundant catalog lookups during repeated operations on the same table.
595    fn get_cached_schema(&self, table_name: &str) -> PyResult<vibesql_catalog::TableSchema> {
596        // Check cache first
597        let mut cache = self.schema_cache.lock();
598        if let Some(schema) = cache.get(table_name) {
599            // Cache hit! Clone and return
600            let schema = schema.clone();
601            drop(cache);
602            *self.schema_cache_hits.lock() += 1;
603            return Ok(schema);
604        }
605        drop(cache);
606
607        // Cache miss - look up in database catalog
608        *self.schema_cache_misses.lock() += 1;
609        let db = self.db.lock();
610        let schema = db
611            .catalog
612            .get_table(table_name)
613            .ok_or_else(|| OperationalError::new_err(format!("Table not found: {}", table_name)))?
614            .clone();
615        drop(db);
616
617        // Store in cache for future use
618        let mut cache = self.schema_cache.lock();
619        cache.put(table_name.to_string(), schema.clone());
620        drop(cache);
621
622        Ok(schema)
623    }
624
625    /// Invalidate schema cache for a specific table
626    ///
627    /// Call this after DDL operations that modify table schema.
628    #[allow(dead_code)]
629    fn invalidate_schema_cache(&self, table_name: &str) {
630        let mut cache = self.schema_cache.lock();
631        cache.pop(table_name);
632    }
633
634    /// Clear all schema caches
635    ///
636    /// Call this after any DDL operation that could affect multiple tables.
637    fn clear_schema_cache(&self) {
638        let mut cache = self.schema_cache.lock();
639        cache.clear();
640    }
641}