vibesql_executor/
readonly.rs

1//! Read-only query execution for concurrent access.
2//!
3//! This module provides:
4//! - `ReadOnlyQuery` trait: A `query(&self)` method that enables read-only SQL queries
5//!   on an immutable database reference
6//! - `SharedDatabase` wrapper: A thread-safe wrapper around `Database` that manages
7//!   concurrent read/write access
8//!
9//! ## Usage
10//!
11//! ### Using ReadOnlyQuery trait directly
12//!
13//! ```text
14//! use vibesql_executor::readonly::ReadOnlyQuery;
15//! use vibesql_storage::Database;
16//!
17//! let db = Database::new();
18//! // ... set up tables and data ...
19//!
20//! // Execute read-only query without requiring &mut self
21//! let result = db.query("SELECT * FROM users WHERE id = 1")?;
22//! println!("Found {} rows", result.rows.len());
23//! ```
24//!
25//! ### Using SharedDatabase for concurrent access
26//!
27//! ```text
28//! use vibesql_executor::SharedDatabase;
29//! use vibesql_storage::Database;
30//!
31//! let db = SharedDatabase::new(Database::new());
32//!
33//! // Concurrent read queries - multiple can execute simultaneously
34//! let result = db.query("SELECT * FROM users WHERE id = 1").await?;
35//!
36//! // Write operations - exclusive access
37//! db.write().await.insert_row("users", row)?;
38//! ```
39//!
40//! ## Thread Safety
41//!
42//! The `query()` method takes `&self`, enabling concurrent access. Multiple readers
43//! can execute SELECT queries simultaneously using the read lock, while writers
44//! acquire exclusive access via write lock.
45//!
46//! ## Error Handling
47//!
//! The `query()` method only accepts SELECT statements. Any other statement type (INSERT,
//! UPDATE, DELETE, DDL, transaction control) returns a `ReadOnlyError::NotReadOnly` error.
50
51use std::sync::Arc;
52
53use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
54use vibesql_ast::Statement;
55use vibesql_storage::Database;
56
57use crate::{
58    errors::ExecutorError,
59    select::{SelectExecutor, SelectResult},
60};
61
/// Error type for read-only query operations.
///
/// Produced by the `query` methods in this module when a statement is rejected,
/// cannot be parsed, or fails during execution.
#[derive(Debug)]
pub enum ReadOnlyError {
    /// The query is not a read-only SELECT statement.
    ///
    /// `statement_type` is a label for the rejected statement kind (for example
    /// `"INSERT"`); the `Display` impl interpolates it into the error message.
    NotReadOnly { statement_type: String },
    /// SQL parsing failed; the payload is the parser error rendered as text.
    ParseError(String),
    /// Execution failed; wraps the underlying `ExecutorError`.
    ExecutionError(ExecutorError),
}
72
73impl std::fmt::Display for ReadOnlyError {
74    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
75        match self {
76            ReadOnlyError::NotReadOnly { statement_type } => {
77                write!(
78                    f,
79                    "{} is not allowed in read-only mode. Only SELECT queries are permitted.",
80                    statement_type
81                )
82            }
83            ReadOnlyError::ParseError(msg) => write!(f, "SQL parse error: {}", msg),
84            ReadOnlyError::ExecutionError(e) => write!(f, "Execution error: {:?}", e),
85        }
86    }
87}
88
// Marker impl: relies on the `Debug` derive and the `Display` impl of `ReadOnlyError`
// and keeps every default `Error` method (so `source()` is not overridden).
impl std::error::Error for ReadOnlyError {}
90
91impl From<ExecutorError> for ReadOnlyError {
92    fn from(e: ExecutorError) -> Self {
93        ReadOnlyError::ExecutionError(e)
94    }
95}
96
/// Extension trait for read-only query execution on `Database`.
///
/// This trait provides a `query(&self)` method that enables executing read-only
/// SQL queries without requiring mutable access to the database. Because only a
/// shared reference is needed, several callers can query the same database at once.
pub trait ReadOnlyQuery {
    /// Execute a read-only SQL query.
    ///
    /// The SQL string is parsed and executed only if it is a SELECT statement.
    /// Any other statement type (INSERT, UPDATE, DELETE, DDL, transaction control)
    /// is rejected with an error instead of being executed.
    ///
    /// # Arguments
    ///
    /// * `sql` - The SQL query string to execute
    ///
    /// # Returns
    ///
    /// `Ok(SelectResult)` with the query results, including column names and rows.
    ///
    /// # Errors
    ///
    /// * `ReadOnlyError::NotReadOnly` - If the SQL is not a SELECT statement
    /// * `ReadOnlyError::ParseError` - If the SQL cannot be parsed
    /// * `ReadOnlyError::ExecutionError` - If the query execution fails
    ///
    /// # Example
    ///
    /// ```text
    /// use vibesql_executor::readonly::ReadOnlyQuery;
    ///
    /// let db = Database::new();
    /// // ... create tables and insert data ...
    ///
    /// // Read-only query works with &self (no mutation)
    /// let result = db.query("SELECT * FROM users")?;
    /// for row in &result.rows {
    ///     println!("{:?}", row);
    /// }
    ///
    /// // DML queries are rejected
    /// let err = db.query("INSERT INTO users VALUES (1, 'test')");
    /// assert!(matches!(err, Err(ReadOnlyError::NotReadOnly { .. })));
    /// ```
    fn query(&self, sql: &str) -> Result<SelectResult, ReadOnlyError>;
}
139
140impl ReadOnlyQuery for Database {
141    fn query(&self, sql: &str) -> Result<SelectResult, ReadOnlyError> {
142        // Parse the SQL
143        let statement = vibesql_parser::Parser::parse_sql(sql)
144            .map_err(|e| ReadOnlyError::ParseError(format!("{:?}", e)))?;
145
146        // Only allow SELECT statements
147        match &statement {
148            Statement::Select(select_stmt) => {
149                let executor = SelectExecutor::new(self);
150                executor.execute_with_columns(select_stmt.as_ref()).map_err(ReadOnlyError::from)
151            }
152            Statement::Insert(_) => {
153                Err(ReadOnlyError::NotReadOnly { statement_type: "INSERT".to_string() })
154            }
155            Statement::Update(_) => {
156                Err(ReadOnlyError::NotReadOnly { statement_type: "UPDATE".to_string() })
157            }
158            Statement::Delete(_) => {
159                Err(ReadOnlyError::NotReadOnly { statement_type: "DELETE".to_string() })
160            }
161            Statement::CreateTable(_) => {
162                Err(ReadOnlyError::NotReadOnly { statement_type: "CREATE TABLE".to_string() })
163            }
164            Statement::DropTable(_) => {
165                Err(ReadOnlyError::NotReadOnly { statement_type: "DROP TABLE".to_string() })
166            }
167            Statement::CreateIndex(_) => {
168                Err(ReadOnlyError::NotReadOnly { statement_type: "CREATE INDEX".to_string() })
169            }
170            Statement::DropIndex(_) => {
171                Err(ReadOnlyError::NotReadOnly { statement_type: "DROP INDEX".to_string() })
172            }
173            Statement::CreateView(_) => {
174                Err(ReadOnlyError::NotReadOnly { statement_type: "CREATE VIEW".to_string() })
175            }
176            Statement::DropView(_) => {
177                Err(ReadOnlyError::NotReadOnly { statement_type: "DROP VIEW".to_string() })
178            }
179            Statement::AlterTable(_) => {
180                Err(ReadOnlyError::NotReadOnly { statement_type: "ALTER TABLE".to_string() })
181            }
182            Statement::TruncateTable(_) => {
183                Err(ReadOnlyError::NotReadOnly { statement_type: "TRUNCATE".to_string() })
184            }
185            Statement::BeginTransaction(_) => {
186                Err(ReadOnlyError::NotReadOnly { statement_type: "BEGIN TRANSACTION".to_string() })
187            }
188            Statement::Commit(_) => {
189                Err(ReadOnlyError::NotReadOnly { statement_type: "COMMIT".to_string() })
190            }
191            Statement::Rollback(_) => {
192                Err(ReadOnlyError::NotReadOnly { statement_type: "ROLLBACK".to_string() })
193            }
194            _ => {
195                // Catch-all for other statement types
196                Err(ReadOnlyError::NotReadOnly {
197                    statement_type: format!("{:?}", std::mem::discriminant(&statement)),
198                })
199            }
200        }
201    }
202}
203
#[cfg(test)]
mod tests {
    use vibesql_catalog::{ColumnSchema, TableSchema};
    use vibesql_storage::Row;
    use vibesql_types::{DataType, SqlValue};

    use super::*;

    /// Wraps a string slice as a `SqlValue::Varchar`.
    fn varchar(text: &str) -> SqlValue {
        SqlValue::Varchar(arcstr::ArcStr::from(text))
    }

    /// Builds a case-insensitive database holding a `users(id, name)` table with
    /// three rows: (1, Alice), (2, Bob), (3, Charlie).
    fn create_test_db() -> Database {
        let mut db = Database::new();
        db.catalog.set_case_sensitive_identifiers(false);

        // Schema: integer primary key plus a nullable VARCHAR(100) name.
        let id_column = ColumnSchema::new("id".to_string(), DataType::Integer, false);
        let name_column = ColumnSchema::new(
            "name".to_string(),
            DataType::Varchar { max_length: Some(100) },
            true,
        );
        let users = TableSchema::with_primary_key(
            "users".to_string(),
            vec![id_column, name_column],
            vec!["id".to_string()],
        );
        db.create_table(users).unwrap();

        // Seed rows, inserted in ascending id order.
        for (id, name) in [(1, "Alice"), (2, "Bob"), (3, "Charlie")] {
            let row = Row::new(vec![SqlValue::Integer(id), varchar(name)]);
            db.insert_row("users", row).unwrap();
        }

        db
    }

    /// Asserts that `sql` is refused with `NotReadOnly` carrying the `expected` label.
    fn assert_rejected_as(sql: &str, expected: &str) {
        let db = create_test_db();

        let outcome = db.query(sql);
        assert!(matches!(
            outcome,
            Err(ReadOnlyError::NotReadOnly { statement_type }) if statement_type == expected
        ));
    }

    #[test]
    fn test_query_select_all() {
        let db = create_test_db();

        let everyone = db.query("SELECT * FROM users").unwrap();
        assert_eq!(everyone.rows.len(), 3);
        assert_eq!(everyone.columns.len(), 2);
    }

    #[test]
    fn test_query_select_with_where() {
        let db = create_test_db();

        let matched = db.query("SELECT * FROM users WHERE id = 1").unwrap();
        assert_eq!(matched.rows.len(), 1);

        let alice = &matched.rows[0];
        assert_eq!(alice.values[0], SqlValue::Integer(1));
        assert_eq!(alice.values[1], varchar("Alice"));
    }

    #[test]
    fn test_query_select_specific_columns() {
        let db = create_test_db();

        let projected = db.query("SELECT name FROM users WHERE id = 2").unwrap();
        assert_eq!(projected.rows.len(), 1);
        assert_eq!(projected.columns.len(), 1);
        // Compared case-insensitively; the column is expected in its short form.
        assert_eq!(projected.columns[0].to_lowercase(), "name");
        assert_eq!(projected.rows[0].values[0], varchar("Bob"));
    }

    #[test]
    fn test_query_select_count() {
        let db = create_test_db();

        let counted = db.query("SELECT COUNT(*) FROM users").unwrap();
        assert_eq!(counted.rows.len(), 1);
        // COUNT is expected to come back as an Integer value.
        assert_eq!(counted.rows[0].values[0], SqlValue::Integer(3));
    }

    #[test]
    fn test_query_rejects_insert() {
        assert_rejected_as("INSERT INTO users (id, name) VALUES (4, 'David')", "INSERT");
    }

    #[test]
    fn test_query_rejects_update() {
        assert_rejected_as("UPDATE users SET name = 'Alicia' WHERE id = 1", "UPDATE");
    }

    #[test]
    fn test_query_rejects_delete() {
        assert_rejected_as("DELETE FROM users WHERE id = 1", "DELETE");
    }

    #[test]
    fn test_query_rejects_create_table() {
        assert_rejected_as("CREATE TABLE test (id INT)", "CREATE TABLE");
    }

    #[test]
    fn test_query_rejects_drop_table() {
        assert_rejected_as("DROP TABLE users", "DROP TABLE");
    }

    #[test]
    fn test_query_rejects_truncate() {
        assert_rejected_as("TRUNCATE TABLE users", "TRUNCATE");
    }

    #[test]
    fn test_query_parse_error() {
        let db = create_test_db();

        let outcome = db.query("SELEKT * FROM users");
        assert!(matches!(outcome, Err(ReadOnlyError::ParseError(_))));
    }

    #[test]
    fn test_query_execution_error_table_not_found() {
        let db = create_test_db();

        let outcome = db.query("SELECT * FROM nonexistent");
        assert!(matches!(outcome, Err(ReadOnlyError::ExecutionError(_))));
    }

    #[test]
    fn test_query_with_order_by() {
        let db = create_test_db();

        let sorted = db.query("SELECT * FROM users ORDER BY id DESC").unwrap();
        assert_eq!(sorted.rows.len(), 3);
        // Descending order: id=3 (Charlie) leads, id=1 (Alice) trails.
        assert_eq!(sorted.rows[0].values[0], SqlValue::Integer(3));
        assert_eq!(sorted.rows[2].values[0], SqlValue::Integer(1));
    }

    #[test]
    fn test_query_with_limit() {
        let db = create_test_db();

        let limited = db.query("SELECT * FROM users LIMIT 2").unwrap();
        assert_eq!(limited.rows.len(), 2);
    }

    #[test]
    fn test_query_with_aggregation() {
        let db = create_test_db();

        let aggregated = db.query("SELECT COUNT(*), MAX(id), MIN(id) FROM users").unwrap();
        assert_eq!(aggregated.rows.len(), 1);

        let totals = &aggregated.rows[0].values;
        assert_eq!(totals[0], SqlValue::Integer(3)); // COUNT(*)
        assert_eq!(totals[1], SqlValue::Integer(3)); // MAX(id)
        assert_eq!(totals[2], SqlValue::Integer(1)); // MIN(id)
    }

    #[test]
    fn test_query_immutability() {
        let db = create_test_db();

        // Three queries in a row through the same immutable binding.
        let count_result = db.query("SELECT COUNT(*) FROM users").unwrap();
        let filtered_result = db.query("SELECT * FROM users WHERE id = 1").unwrap();
        let names_result = db.query("SELECT name FROM users").unwrap();

        // Each query must return its expected result.
        assert_eq!(count_result.rows[0].values[0], SqlValue::Integer(3));
        assert_eq!(filtered_result.rows.len(), 1);
        assert_eq!(names_result.rows.len(), 3);
    }
}
417
418// ============================================================================
419// SharedDatabase - Thread-safe wrapper for concurrent read/write access
420// ============================================================================
421
/// Thread-safe database wrapper enabling concurrent read queries.
///
/// `SharedDatabase` wraps a `Database` in `Arc<RwLock<...>>` to enable:
/// - **Concurrent reads**: Multiple `query()` calls can execute simultaneously
/// - **Exclusive writes**: Mutations acquire exclusive access via `write()`
///
/// Cloning a `SharedDatabase` clones the `Arc`, so every clone refers to the same
/// underlying database.
///
/// ## Performance
///
/// Using `SharedDatabase` is intended to improve throughput for read-heavy
/// workloads, since read queries do not exclude one another.
///
/// NOTE(review): the figures below are indicative only; no benchmark backing them
/// is visible in this file — confirm before relying on them.
///
/// | Metric | Sequential | Concurrent (4 cores) |
/// |--------|------------|---------------------|
/// | Read QPS | 1x | ~4x |
/// | P99 latency | baseline | ~1.5x baseline |
///
/// ## Example
///
/// ```text
/// use vibesql_executor::SharedDatabase;
/// use vibesql_storage::Database;
///
/// // Create shared database
/// let db = SharedDatabase::new(Database::new());
///
/// // Concurrent reads (acquire read lock internally)
/// let result = db.query("SELECT * FROM users").await?;
///
/// // Exclusive writes
/// let mut guard = db.write().await;
/// guard.insert_row("users", row)?;
/// // guard dropped, releasing write lock
/// ```
#[derive(Clone)]
pub struct SharedDatabase {
    /// Shared, lock-protected handle to the wrapped database.
    inner: Arc<RwLock<Database>>,
}
460
461impl SharedDatabase {
462    /// Create a new `SharedDatabase` wrapping the given database.
463    pub fn new(db: Database) -> Self {
464        Self { inner: Arc::new(RwLock::new(db)) }
465    }
466
467    /// Create a `SharedDatabase` from an existing `Arc<RwLock<Database>>`.
468    ///
469    /// This is useful when integrating with existing code that already uses
470    /// the Arc<RwLock<Database>> pattern.
471    pub fn from_arc(inner: Arc<RwLock<Database>>) -> Self {
472        Self { inner }
473    }
474
475    /// Get the inner `Arc<RwLock<Database>>`.
476    ///
477    /// This is useful when you need to pass the database to code that expects
478    /// the raw `Arc<RwLock<Database>>` type.
479    pub fn into_inner(self) -> Arc<RwLock<Database>> {
480        self.inner
481    }
482
483    /// Get a reference to the inner `Arc<RwLock<Database>>`.
484    pub fn as_arc(&self) -> &Arc<RwLock<Database>> {
485        &self.inner
486    }
487
488    /// Acquire a read lock for concurrent read access.
489    ///
490    /// Multiple readers can hold read locks simultaneously. Use this for
491    /// SELECT queries or any read-only operations.
492    pub async fn read(&self) -> RwLockReadGuard<'_, Database> {
493        self.inner.read().await
494    }
495
496    /// Acquire a write lock for exclusive write access.
497    ///
498    /// Only one writer can hold the write lock at a time, and no readers
499    /// can acquire read locks while a write lock is held.
500    pub async fn write(&self) -> RwLockWriteGuard<'_, Database> {
501        self.inner.write().await
502    }
503
504    /// Execute a read-only SQL query with automatic read lock management.
505    ///
506    /// This is a convenience method that:
507    /// 1. Acquires a read lock on the database
508    /// 2. Parses and executes the SQL query
509    /// 3. Returns the result, releasing the lock
510    ///
511    /// Only SELECT statements are allowed. Other statement types return
512    /// `ReadOnlyError::NotReadOnly`.
513    ///
514    /// ## Example
515    ///
516    /// ```text
517    /// let db = SharedDatabase::new(Database::new());
518    /// // ... setup tables and data ...
519    ///
520    /// // Execute concurrent queries from multiple tasks
521    /// let result = db.query("SELECT * FROM users WHERE active = true").await?;
522    /// ```
523    pub async fn query(&self, sql: &str) -> Result<SelectResult, ReadOnlyError> {
524        let guard = self.read().await;
525        guard.query(sql)
526    }
527}
528
529impl Default for SharedDatabase {
530    fn default() -> Self {
531        Self::new(Database::new())
532    }
533}
534
535impl std::fmt::Debug for SharedDatabase {
536    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
537        f.debug_struct("SharedDatabase").field("inner", &"Arc<RwLock<Database>>").finish()
538    }
539}
540
#[cfg(test)]
mod shared_database_tests {
    use super::*;
    use vibesql_catalog::{ColumnSchema, TableSchema};
    use vibesql_storage::Row;
    use vibesql_types::{DataType, SqlValue};

    /// Wraps a string slice as a `SqlValue::Varchar`.
    fn varchar(text: &str) -> SqlValue {
        SqlValue::Varchar(arcstr::ArcStr::from(text))
    }

    /// Builds a `SharedDatabase` holding a `users(id, name)` table with two rows:
    /// (1, Alice) and (2, Bob).
    async fn create_shared_test_db() -> SharedDatabase {
        let mut db = Database::new();
        db.catalog.set_case_sensitive_identifiers(false);

        // Schema: integer primary key plus a nullable VARCHAR(100) name.
        let id_column = ColumnSchema::new("id".to_string(), DataType::Integer, false);
        let name_column = ColumnSchema::new(
            "name".to_string(),
            DataType::Varchar { max_length: Some(100) },
            true,
        );
        let users = TableSchema::with_primary_key(
            "users".to_string(),
            vec![id_column, name_column],
            vec!["id".to_string()],
        );
        db.create_table(users).unwrap();

        // Seed rows, inserted in ascending id order.
        for (id, name) in [(1, "Alice"), (2, "Bob")] {
            let row = Row::new(vec![SqlValue::Integer(id), varchar(name)]);
            db.insert_row("users", row).unwrap();
        }

        SharedDatabase::new(db)
    }

    #[tokio::test]
    async fn test_shared_query() {
        let db = create_shared_test_db().await;

        let everyone = db.query("SELECT * FROM users").await.unwrap();
        assert_eq!(everyone.rows.len(), 2);
    }

    #[tokio::test]
    async fn test_shared_query_with_filter() {
        let db = create_shared_test_db().await;

        let matched = db.query("SELECT * FROM users WHERE id = 1").await.unwrap();
        assert_eq!(matched.rows.len(), 1);
        assert_eq!(matched.rows[0].values[0], SqlValue::Integer(1));
    }

    #[tokio::test]
    async fn test_shared_query_rejects_mutations() {
        let db = create_shared_test_db().await;

        let outcome = db.query("INSERT INTO users VALUES (3, 'Charlie')").await;
        assert!(matches!(outcome, Err(ReadOnlyError::NotReadOnly { .. })));
    }

    #[tokio::test]
    async fn test_concurrent_reads() {
        let db = create_shared_test_db().await;

        // Launch every reader task up front so they can overlap.
        let tasks: Vec<_> = (0..10)
            .map(|task_id| {
                let handle = db.clone();
                tokio::spawn(async move {
                    let counted = handle.query("SELECT COUNT(*) FROM users").await.unwrap();
                    (task_id, counted.rows[0].values[0].clone())
                })
            })
            .collect();

        // Every task must finish and observe both seeded rows.
        for task in tasks {
            let (_task_id, count) = task.await.unwrap();
            assert_eq!(count, SqlValue::Integer(2));
        }
    }

    #[tokio::test]
    async fn test_read_write_isolation() {
        let db = create_shared_test_db().await;

        // Baseline read before any mutation.
        let before = db.query("SELECT COUNT(*) FROM users").await.unwrap();
        assert_eq!(before.rows[0].values[0], SqlValue::Integer(2));

        // Insert under the write lock; the inner scope drops the guard before the
        // next read is attempted.
        {
            let mut writer = db.write().await;
            let charlie = Row::new(vec![SqlValue::Integer(3), varchar("Charlie")]);
            writer.insert_row("users", charlie).unwrap();
        }

        // A subsequent read must observe the inserted row.
        let after = db.query("SELECT COUNT(*) FROM users").await.unwrap();
        assert_eq!(after.rows[0].values[0], SqlValue::Integer(3));
    }

    #[tokio::test]
    async fn test_from_arc() {
        let raw = Arc::new(RwLock::new(Database::new()));
        let db = SharedDatabase::from_arc(Arc::clone(&raw));

        // The wrapper must point at the very same allocation.
        assert!(Arc::ptr_eq(db.as_arc(), &raw));
    }
}