1use std::{num::NonZeroUsize, sync::Arc};
7
8use lru::LruCache;
9use parking_lot::Mutex;
10use pyo3::{
11 prelude::*,
12 types::{PyList, PyTuple},
13};
14
15use crate::{
16 conversions::{convert_params_to_sql_values, sqlvalue_to_py, substitute_placeholders},
17 profiling, OperationalError, ProgrammingError,
18};
19
/// Outcome of the most recently executed statement, stored on the cursor so
/// the fetch methods, `rowcount` and `description` can read it afterwards.
enum QueryResultData {
    /// Result of a SELECT: the column names and the rows still held by the
    /// cursor (`fetchone`/`fetchmany` remove rows from the front as they go).
    Select { columns: Vec<String>, rows: Vec<vibesql_storage::Row> },
    /// Result of a statement that produces no row set (DDL or DML).
    Execute {
        /// Number of rows the statement reported as affected (0 for DDL).
        rows_affected: usize,
        // Kept for diagnostics only; it is never read back in the visible
        // code, which is why the dead-code lint is silenced here.
        #[allow(dead_code)]
        message: String,
    },
}
34
/// Python-visible cursor that executes SQL against a shared database handle.
///
/// Besides the last statement's result it keeps two LRU caches (parsed
/// statements and table schemas) together with hit/miss counters for each.
#[pyclass]
pub struct Cursor {
    /// Shared, mutex-guarded database this cursor executes against.
    db: Arc<Mutex<vibesql_storage::Database>>,
    /// Result of the most recent successful statement; `None` on a fresh cursor.
    last_result: Option<QueryResultData>,
    /// LRU cache of parsed statements, keyed by SQL text.
    stmt_cache: Arc<Mutex<LruCache<String, vibesql_ast::Statement>>>,
    /// LRU cache of table schemas, keyed by table name.
    schema_cache: Arc<Mutex<LruCache<String, vibesql_catalog::TableSchema>>>,
    /// Number of statement-cache lookups that found an entry.
    cache_hits: Arc<Mutex<usize>>,
    /// Number of statement-cache lookups that required a parse.
    cache_misses: Arc<Mutex<usize>>,
    /// Number of schema-cache lookups that found an entry.
    schema_cache_hits: Arc<Mutex<usize>>,
    /// Number of schema-cache lookups that had to read the catalog.
    schema_cache_misses: Arc<Mutex<usize>>,
}
62
63impl Cursor {
64 pub(crate) fn new(db: Arc<Mutex<vibesql_storage::Database>>) -> PyResult<Self> {
66 Ok(Cursor {
67 db,
68 last_result: None,
69 stmt_cache: Arc::new(Mutex::new(LruCache::new(NonZeroUsize::new(1000).unwrap()))),
70 schema_cache: Arc::new(Mutex::new(LruCache::new(NonZeroUsize::new(100).unwrap()))),
71 cache_hits: Arc::new(Mutex::new(0)),
72 cache_misses: Arc::new(Mutex::new(0)),
73 schema_cache_hits: Arc::new(Mutex::new(0)),
74 schema_cache_misses: Arc::new(Mutex::new(0)),
75 })
76 }
77}
78
79#[pymethods]
80impl Cursor {
81 #[pyo3(signature = (sql, params=None))]
93 fn execute(
94 &mut self,
95 py: Python,
96 sql: &str,
97 params: Option<&Bound<'_, PyTuple>>,
98 ) -> PyResult<()> {
99 let mut profiler = profiling::DetailedProfiler::new("execute()");
100
101 let cache_key = sql.to_string();
103 profiler.checkpoint("SQL string copied");
104
105 let mut cache = self.stmt_cache.lock();
107 profiler.checkpoint("Acquired cache lock");
108 let stmt = if let Some(cached_stmt) = cache.get(&cache_key) {
109 let cloned_stmt = cached_stmt.clone();
111 drop(cache); *self.cache_hits.lock() += 1;
113 profiler.checkpoint("Cache HIT - stmt cloned");
114 cloned_stmt
115 } else {
116 drop(cache); *self.cache_misses.lock() += 1;
119 profiler.checkpoint("Cache MISS - need to parse");
120
121 let processed_sql = if let Some(params_tuple) = params {
123 Self::bind_parameters(py, sql, params_tuple)?
124 } else {
125 sql.to_string()
127 };
128
129 let stmt = vibesql_parser::Parser::parse_sql(&processed_sql)
131 .map_err(|e| ProgrammingError::new_err(format!("Parse error: {:?}", e)))?;
132 profiler.checkpoint("SQL parsed to AST");
133
134 let mut cache = self.stmt_cache.lock();
136 cache.put(cache_key.clone(), stmt.clone());
137 drop(cache);
138 profiler.checkpoint("AST cached");
139
140 stmt
141 };
142
143 profiler.checkpoint("Statement ready for execution");
144
145 match stmt {
147 vibesql_ast::Statement::Select(select_stmt) => {
148 let db = self.db.lock();
149 profiler.checkpoint("Database lock acquired (SELECT)");
150 let select_executor = vibesql_executor::SelectExecutor::new(&db);
151 profiler.checkpoint("SelectExecutor created");
152 let result = select_executor
153 .execute_with_columns(&select_stmt)
154 .map_err(|e| OperationalError::new_err(format!("Execution error: {:?}", e)))?;
155 profiler.checkpoint("SELECT executed in Rust");
156
157 self.last_result =
158 Some(QueryResultData::Select { columns: result.columns, rows: result.rows });
159 profiler.checkpoint("Result stored");
160
161 Ok(())
162 }
163 vibesql_ast::Statement::CreateTable(create_stmt) => {
164 let mut db = self.db.lock();
165 vibesql_executor::CreateTableExecutor::execute(&create_stmt, &mut db)
166 .map_err(|e| OperationalError::new_err(format!("Execution error: {:?}", e)))?;
167
168 let mut cache = self.stmt_cache.lock();
170 cache.clear();
171 drop(cache);
172 self.clear_schema_cache();
173
174 self.last_result = Some(QueryResultData::Execute {
175 rows_affected: 0,
176 message: format!("Table '{}' created successfully", create_stmt.table_name),
177 });
178
179 Ok(())
180 }
181 vibesql_ast::Statement::DropTable(drop_stmt) => {
182 let mut db = self.db.lock();
183 let message = vibesql_executor::DropTableExecutor::execute(&drop_stmt, &mut db)
184 .map_err(|e| OperationalError::new_err(format!("Execution error: {:?}", e)))?;
185
186 let mut cache = self.stmt_cache.lock();
188 cache.clear();
189 drop(cache);
190 self.clear_schema_cache();
191
192 self.last_result = Some(QueryResultData::Execute { rows_affected: 0, message });
193
194 Ok(())
195 }
196 vibesql_ast::Statement::Insert(insert_stmt) => {
197 let mut db = self.db.lock();
198 profiler.checkpoint("Database lock acquired (INSERT)");
199 let row_count = vibesql_executor::InsertExecutor::execute(&mut db, &insert_stmt)
200 .map_err(|e| OperationalError::new_err(format!("Execution error: {:?}", e)))?;
201 profiler.checkpoint("INSERT executed in Rust");
202
203 self.last_result = Some(QueryResultData::Execute {
204 rows_affected: row_count,
205 message: format!(
206 "{} row{} inserted into '{}'",
207 row_count,
208 if row_count == 1 { "" } else { "s" },
209 insert_stmt.table_name
210 ),
211 });
212 profiler.checkpoint("Result message created");
213
214 Ok(())
215 }
216 vibesql_ast::Statement::Update(update_stmt) => {
217 let cached_schema = self.get_cached_schema(&update_stmt.table_name)?;
219 profiler.checkpoint("Schema cache lookup (UPDATE)");
220
221 let mut db = self.db.lock();
222 profiler.checkpoint("Database lock acquired (UPDATE)");
223 let row_count = vibesql_executor::UpdateExecutor::execute_with_schema(
224 &update_stmt,
225 &mut db,
226 Some(&cached_schema),
227 )
228 .map_err(|e| OperationalError::new_err(format!("Execution error: {:?}", e)))?;
229 profiler.checkpoint("UPDATE executed in Rust");
230
231 self.last_result = Some(QueryResultData::Execute {
232 rows_affected: row_count,
233 message: format!(
234 "{} row{} updated in '{}'",
235 row_count,
236 if row_count == 1 { "" } else { "s" },
237 update_stmt.table_name
238 ),
239 });
240 profiler.checkpoint("Result message created");
241
242 Ok(())
243 }
244 vibesql_ast::Statement::Delete(delete_stmt) => {
245 let mut db = self.db.lock();
246 profiler.checkpoint("Database lock acquired (DELETE)");
247 let row_count = vibesql_executor::DeleteExecutor::execute(&delete_stmt, &mut db)
248 .map_err(|e| OperationalError::new_err(format!("Execution error: {:?}", e)))?;
249 profiler.checkpoint("DELETE executed in Rust");
250
251 self.last_result = Some(QueryResultData::Execute {
252 rows_affected: row_count,
253 message: format!(
254 "{} row{} deleted from '{}'",
255 row_count,
256 if row_count == 1 { "" } else { "s" },
257 delete_stmt.table_name
258 ),
259 });
260 profiler.checkpoint("Result message created");
261
262 Ok(())
263 }
264 vibesql_ast::Statement::CreateView(view_stmt) => {
265 let mut db = self.db.lock();
266 vibesql_executor::advanced_objects::execute_create_view(&view_stmt, &mut db)
267 .map_err(|e| OperationalError::new_err(format!("Execution error: {:?}", e)))?;
268
269 let mut cache = self.stmt_cache.lock();
271 cache.clear();
272 drop(cache);
273 self.clear_schema_cache();
274
275 self.last_result = Some(QueryResultData::Execute {
276 rows_affected: 0,
277 message: format!("View '{}' created successfully", view_stmt.view_name),
278 });
279
280 Ok(())
281 }
282 vibesql_ast::Statement::DropView(drop_stmt) => {
283 let mut db = self.db.lock();
284 vibesql_executor::advanced_objects::execute_drop_view(&drop_stmt, &mut db)
285 .map_err(|e| OperationalError::new_err(format!("Execution error: {:?}", e)))?;
286
287 let mut cache = self.stmt_cache.lock();
289 cache.clear();
290 drop(cache);
291 self.clear_schema_cache();
292
293 self.last_result = Some(QueryResultData::Execute {
294 rows_affected: 0,
295 message: format!("View '{}' dropped successfully", drop_stmt.view_name),
296 });
297
298 Ok(())
299 }
300 _ => Err(ProgrammingError::new_err(format!(
301 "Statement type not yet supported: {:?}",
302 stmt
303 ))),
304 }
305 }
306
307 fn fetchall(&mut self, py: Python) -> PyResult<Py<PyAny>> {
315 let mut profiler = profiling::DetailedProfiler::new("fetchall()");
316 match &self.last_result {
317 Some(QueryResultData::Select { rows, .. }) => {
318 profiler.checkpoint(&format!("Starting fetch of {} rows", rows.len()));
319 let result_list = PyList::empty(py);
320 profiler.checkpoint("Empty PyList created");
321 for row in rows {
322 let py_values: Vec<Py<PyAny>> =
323 row.values.iter().map(|v| sqlvalue_to_py(py, v).unwrap()).collect();
324 let py_row = PyTuple::new(py, py_values)?;
325 result_list.append(py_row)?;
326 }
327 profiler.checkpoint("All rows converted to Python");
328 Ok(result_list.into())
329 }
330 Some(QueryResultData::Execute { .. }) => {
331 Err(ProgrammingError::new_err("No SELECT result to fetch"))
332 }
333 None => Err(ProgrammingError::new_err("No query has been executed")),
334 }
335 }
336
337 fn fetchone(&mut self, py: Python) -> PyResult<Py<PyAny>> {
345 match &mut self.last_result {
346 Some(QueryResultData::Select { rows, .. }) => {
347 if rows.is_empty() {
348 Ok(py.None())
349 } else {
350 let row = rows.remove(0);
351 let py_values: Vec<Py<PyAny>> =
352 row.values.iter().map(|v| sqlvalue_to_py(py, v).unwrap()).collect();
353 let py_row = PyTuple::new(py, py_values)?;
354 Ok(py_row.into())
355 }
356 }
357 Some(QueryResultData::Execute { .. }) => {
358 Err(ProgrammingError::new_err("No SELECT result to fetch"))
359 }
360 None => Err(ProgrammingError::new_err("No query has been executed")),
361 }
362 }
363
364 fn fetchmany(&mut self, py: Python, size: usize) -> PyResult<Py<PyAny>> {
375 match &mut self.last_result {
376 Some(QueryResultData::Select { rows, .. }) => {
377 let fetch_count = size.min(rows.len());
378 let result_list = PyList::empty(py);
379
380 for _ in 0..fetch_count {
381 if rows.is_empty() {
382 break;
383 }
384 let row = rows.remove(0);
385 let py_values: Vec<Py<PyAny>> =
386 row.values.iter().map(|v| sqlvalue_to_py(py, v).unwrap()).collect();
387 let py_row = PyTuple::new(py, py_values)?;
388 result_list.append(py_row)?;
389 }
390
391 Ok(result_list.into())
392 }
393 Some(QueryResultData::Execute { .. }) => {
394 Err(ProgrammingError::new_err("No SELECT result to fetch"))
395 }
396 None => Err(ProgrammingError::new_err("No query has been executed")),
397 }
398 }
399
400 #[getter]
406 fn rowcount(&self) -> PyResult<i64> {
407 match &self.last_result {
408 Some(QueryResultData::Execute { rows_affected, .. }) => Ok(*rows_affected as i64),
409 Some(QueryResultData::Select { rows, .. }) => Ok(rows.len() as i64),
410 None => Ok(-1),
411 }
412 }
413
414 #[getter]
421 fn description(&self, py: Python) -> PyResult<Py<PyAny>> {
422 match &self.last_result {
423 Some(QueryResultData::Select { columns, .. }) => {
424 let desc_list = PyList::empty(py);
425 for col_name in columns {
426 let col_str = col_name.as_str().into_pyobject(py)?.into_any().unbind();
428 let col_tuple = PyTuple::new(
429 py,
430 [
431 col_str,
432 py.None(), py.None(), py.None(), py.None(), py.None(), py.None(), ],
439 )?;
440 desc_list.append(col_tuple)?;
441 }
442 Ok(desc_list.into())
443 }
444 Some(QueryResultData::Execute { .. }) => Ok(py.None()),
445 None => Ok(py.None()),
446 }
447 }
448
449 fn schema_cache_stats(&self) -> PyResult<(usize, usize, f64)> {
454 let hits = *self.schema_cache_hits.lock();
455 let misses = *self.schema_cache_misses.lock();
456 let total = hits + misses;
457 let hit_rate = if total > 0 { (hits as f64) / (total as f64) } else { 0.0 };
458 Ok((hits, misses, hit_rate))
459 }
460
461 fn cache_stats(&self) -> PyResult<(usize, usize, f64)> {
466 let hits = *self.cache_hits.lock();
467 let misses = *self.cache_misses.lock();
468 let total = hits + misses;
469 let hit_rate = if total > 0 { (hits as f64) / (total as f64) } else { 0.0 };
470 Ok((hits, misses, hit_rate))
471 }
472
473 fn clear_cache(&mut self) -> PyResult<()> {
477 let mut cache = self.stmt_cache.lock();
478 cache.clear();
479 drop(cache);
480
481 self.clear_schema_cache();
482 Ok(())
483 }
484
    /// Closes the cursor.
    ///
    /// Currently a no-op: it returns `Ok(())` without touching any cursor state.
    fn close(&self) -> PyResult<()> {
        Ok(())
    }
492
493 fn executemany(
505 &mut self,
506 py: Python,
507 sql: &str,
508 seq_of_params: &Bound<'_, PyList>,
509 ) -> PyResult<()> {
510 let mut total_rows_affected = 0;
511
512 for item in seq_of_params.iter() {
513 let params_tuple = item.cast::<PyTuple>()?;
514 let processed_sql = Self::bind_parameters(py, sql, params_tuple)?;
516
517 let stmt = vibesql_parser::Parser::parse_sql(&processed_sql)
519 .map_err(|e| ProgrammingError::new_err(format!("Parse error: {:?}", e)))?;
520
521 match stmt {
523 vibesql_ast::Statement::Insert(insert_stmt) => {
524 let mut db = self.db.lock();
525 let row_count =
526 vibesql_executor::InsertExecutor::execute(&mut db, &insert_stmt).map_err(
527 |e| OperationalError::new_err(format!("Execution error: {:?}", e)),
528 )?;
529 total_rows_affected += row_count;
530 }
531 vibesql_ast::Statement::Update(update_stmt) => {
532 let mut db = self.db.lock();
533 let row_count =
534 vibesql_executor::UpdateExecutor::execute(&update_stmt, &mut db).map_err(
535 |e| OperationalError::new_err(format!("Execution error: {:?}", e)),
536 )?;
537 total_rows_affected += row_count;
538 }
539 vibesql_ast::Statement::Delete(delete_stmt) => {
540 let mut db = self.db.lock();
541 let row_count =
542 vibesql_executor::DeleteExecutor::execute(&delete_stmt, &mut db).map_err(
543 |e| OperationalError::new_err(format!("Execution error: {:?}", e)),
544 )?;
545 total_rows_affected += row_count;
546 }
547 _ => {
548 return Err(ProgrammingError::new_err(
549 "executemany only supports INSERT, UPDATE, and DELETE statements"
550 .to_string(),
551 ))
552 }
553 }
554 }
555
556 self.last_result = Some(QueryResultData::Execute {
558 rows_affected: total_rows_affected,
559 message: format!("{} rows affected", total_rows_affected),
560 });
561
562 Ok(())
563 }
564}
565
566impl Cursor {
567 fn bind_parameters(py: Python, sql: &str, params: &Bound<'_, PyTuple>) -> PyResult<String> {
572 let placeholder_count = sql.matches('?').count();
574 let param_count = params.len();
575
576 if placeholder_count != param_count {
578 return Err(ProgrammingError::new_err(format!(
579 "Parameter count mismatch: SQL has {} placeholders but {} parameters provided",
580 placeholder_count, param_count
581 )));
582 }
583
584 let sql_values = convert_params_to_sql_values(py, params)?;
586
587 Ok(substitute_placeholders(sql, &sql_values))
589 }
590
591 fn get_cached_schema(&self, table_name: &str) -> PyResult<vibesql_catalog::TableSchema> {
596 let mut cache = self.schema_cache.lock();
598 if let Some(schema) = cache.get(table_name) {
599 let schema = schema.clone();
601 drop(cache);
602 *self.schema_cache_hits.lock() += 1;
603 return Ok(schema);
604 }
605 drop(cache);
606
607 *self.schema_cache_misses.lock() += 1;
609 let db = self.db.lock();
610 let schema = db
611 .catalog
612 .get_table(table_name)
613 .ok_or_else(|| OperationalError::new_err(format!("Table not found: {}", table_name)))?
614 .clone();
615 drop(db);
616
617 let mut cache = self.schema_cache.lock();
619 cache.put(table_name.to_string(), schema.clone());
620 drop(cache);
621
622 Ok(schema)
623 }
624
625 #[allow(dead_code)]
629 fn invalidate_schema_cache(&self, table_name: &str) {
630 let mut cache = self.schema_cache.lock();
631 cache.pop(table_name);
632 }
633
634 fn clear_schema_cache(&self) {
638 let mut cache = self.schema_cache.lock();
639 cache.clear();
640 }
641}