manifoldb 0.1.4

A multi-paradigm embedded database for graph, vector, and relational data
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
//! Prepared statements with query plan caching.
//!
//! This module provides [`PreparedStatement`] which caches the parsed AST and
//! logical plan for repeated query execution. Plans are automatically invalidated
//! when the schema changes (e.g., after DDL operations like CREATE/DROP TABLE/INDEX).
//!
//! # Example
//!
//! ```ignore
//! use manifoldb::{Database, Value};
//!
//! let db = Database::in_memory()?;
//!
//! // Prepare a statement once
//! let stmt = db.prepare("SELECT * FROM users WHERE age > $1")?;
//!
//! // Execute multiple times with different parameters
//! let young = stmt.query(&db, &[Value::Int(18)])?;
//! let old = stmt.query(&db, &[Value::Int(65)])?;
//! ```

use std::collections::HashSet;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, RwLock};

use manifoldb_query::ast::Statement;
use manifoldb_query::plan::{LogicalPlan, PhysicalPlan, PhysicalPlanner, PlanBuilder};
use manifoldb_query::ExtendedParser;

use crate::error::{Error, Result};

/// A prepared statement with cached AST and query plan.
///
/// Prepared statements amortize the cost of parsing and planning by caching
/// these results. Each statement records the schema version it was prepared
/// under and is considered stale (see [`PreparedStatement::is_valid`]) once
/// the schema version changes.
///
/// All fields are computed once in [`PreparedStatement::new`] and are never
/// mutated afterwards; every accessor takes `&self`.
///
/// # Thread Safety
///
/// `PreparedStatement` is `Send + Sync` and can be safely shared across threads.
/// Multiple threads can execute the same prepared statement concurrently.
#[derive(Debug)]
pub struct PreparedStatement {
    /// The original SQL text.
    sql: String,
    /// The parsed AST of `sql`.
    ast: Statement,
    /// The logical plan built from `ast`.
    logical_plan: LogicalPlan,
    /// The physical plan built from `logical_plan`.
    physical_plan: PhysicalPlan,
    /// The schema version when this statement was prepared; compared for
    /// equality against the current version in `is_valid`.
    schema_version: u64,
    /// Names of the tables this statement reads or writes, as found in the
    /// logical plan; used for per-table cache invalidation.
    accessed_tables: HashSet<String>,
    /// Whether the root of the logical plan is INSERT/UPDATE/DELETE.
    is_dml: bool,
    /// Whether the root of the logical plan is CREATE/DROP TABLE or CREATE/DROP INDEX.
    is_ddl: bool,
}

impl PreparedStatement {
    /// Parse and plan a SQL statement.
    ///
    /// The AST, logical plan and physical plan are all computed here, together
    /// with the set of tables the statement touches and its statement kind, so
    /// none of that work has to be repeated on later executions.
    ///
    /// # Arguments
    ///
    /// * `sql` - The SQL statement to prepare
    /// * `schema_version` - The current schema version for invalidation tracking
    ///
    /// # Errors
    ///
    /// Returns an error if the SQL cannot be parsed. Failures while building the
    /// logical plan are reported as [`Error::Parse`].
    pub fn new(sql: &str, schema_version: u64) -> Result<Self> {
        let ast = ExtendedParser::parse_single(sql)?;

        let mut builder = PlanBuilder::new();
        let logical_plan =
            builder.build_statement(&ast).map_err(|e| Error::Parse(e.to_string()))?;

        let planner = PhysicalPlanner::new();
        let physical_plan = planner.plan(&logical_plan);

        // Metadata derived from the logical plan: the table set drives
        // per-table cache invalidation, the flags back `is_dml`/`is_ddl`.
        let accessed_tables = Self::extract_tables(&logical_plan);
        let (is_dml, is_ddl) = Self::classify_statement(&logical_plan);

        Ok(Self {
            sql: sql.to_string(),
            ast,
            logical_plan,
            physical_plan,
            schema_version,
            accessed_tables,
            is_dml,
            is_ddl,
        })
    }

    /// Returns the original SQL text.
    #[must_use]
    pub fn sql(&self) -> &str {
        &self.sql
    }

    /// Returns the schema version when this statement was prepared.
    #[must_use]
    pub fn schema_version(&self) -> u64 {
        self.schema_version
    }

    /// Returns the parsed AST.
    #[must_use]
    pub fn ast(&self) -> &Statement {
        &self.ast
    }

    /// Returns the logical plan.
    #[must_use]
    pub fn logical_plan(&self) -> &LogicalPlan {
        &self.logical_plan
    }

    /// Returns the physical plan.
    #[must_use]
    pub fn physical_plan(&self) -> &PhysicalPlan {
        &self.physical_plan
    }

    /// Returns the names of the tables read or written by this statement.
    #[must_use]
    pub fn accessed_tables(&self) -> &HashSet<String> {
        &self.accessed_tables
    }

    /// Returns true if this is a DML statement (INSERT/UPDATE/DELETE).
    #[must_use]
    pub fn is_dml(&self) -> bool {
        self.is_dml
    }

    /// Returns true if this is a DDL statement (CREATE/DROP TABLE/INDEX).
    #[must_use]
    pub fn is_ddl(&self) -> bool {
        self.is_ddl
    }

    /// Returns true if this statement is neither DML nor DDL (e.g. a SELECT).
    #[must_use]
    pub fn is_query(&self) -> bool {
        !self.is_dml && !self.is_ddl
    }

    /// Check if this prepared statement is still valid for the given schema version.
    ///
    /// A prepared statement becomes invalid when the schema version differs
    /// from the one it was prepared under (e.g. after DDL operations).
    #[must_use]
    pub fn is_valid(&self, current_schema_version: u64) -> bool {
        self.schema_version == current_schema_version
    }

    /// Extract the names of all tables referenced anywhere in a logical plan.
    fn extract_tables(plan: &LogicalPlan) -> HashSet<String> {
        let mut tables = HashSet::new();
        Self::extract_tables_recursive(plan, &mut tables);
        tables
    }

    /// Record the tables referenced by `plan` and all of its descendants.
    ///
    /// Nodes that name a table directly contribute that name. Every node's
    /// children are then walked as well, so tables reached through nested
    /// plans (for example the source of an UPDATE/DELETE or of a CREATE) are
    /// not missed. INSERT walks its explicit `input` instead.
    fn extract_tables_recursive(plan: &LogicalPlan, tables: &mut HashSet<String>) {
        match plan {
            LogicalPlan::Scan(scan_node) => {
                tables.insert(scan_node.table_name.clone());
            }
            LogicalPlan::Insert { table, input, .. } => {
                tables.insert(table.clone());
                // `input` is walked explicitly here, so the generic child
                // walk below is skipped for this node.
                Self::extract_tables_recursive(input, tables);
                return;
            }
            LogicalPlan::Update { table, .. } | LogicalPlan::Delete { table, .. } => {
                tables.insert(table.clone());
            }
            LogicalPlan::CreateTable(node) => {
                tables.insert(node.name.clone());
            }
            LogicalPlan::DropTable(node) => {
                tables.extend(node.names.iter().cloned());
            }
            LogicalPlan::CreateIndex(node) => {
                tables.insert(node.table.clone());
            }
            LogicalPlan::DropIndex(_) => {
                // DROP INDEX doesn't directly reference a table in the node.
            }
            _ => {}
        }

        for child in plan.children() {
            Self::extract_tables_recursive(child, tables);
        }
    }

    /// Classify the statement by the root node of its logical plan.
    ///
    /// Returns `(is_dml, is_ddl)`; both are `false` for anything else.
    fn classify_statement(plan: &LogicalPlan) -> (bool, bool) {
        match plan {
            LogicalPlan::Insert { .. }
            | LogicalPlan::Update { .. }
            | LogicalPlan::Delete { .. } => (true, false),
            LogicalPlan::CreateTable(_)
            | LogicalPlan::DropTable(_)
            | LogicalPlan::CreateIndex(_)
            | LogicalPlan::DropIndex(_) => (false, true),
            _ => (false, false),
        }
    }
}

/// A cache of prepared statements.
///
/// Statements are keyed by their exact SQL text, so textually different but
/// equivalent queries are cached separately. Schema-version invalidation is
/// lazy: changing the version does not touch the map, and a stale entry
/// stays in it until it is looked up again (and re-prepared), evicted, or
/// removed by `clear`/`invalidate_tables`.
///
/// When the cache is full, eviction removes entries in `HashMap` iteration
/// order, which is arbitrary; this is not an LRU cache.
#[derive(Debug)]
pub struct PreparedStatementCache {
    /// The cached prepared statements, keyed by SQL text.
    statements: RwLock<std::collections::HashMap<String, Arc<PreparedStatement>>>,
    /// Maximum number of cached statements.
    max_size: usize,
    /// Current schema version; cached statements prepared under any other
    /// version are treated as stale.
    schema_version: AtomicU64,
    /// Cache metrics: lookups served from the cache.
    hits: AtomicU64,
    /// Lookups that had to prepare a new statement.
    misses: AtomicU64,
    /// Number of times `set_schema_version` actually changed the version.
    invalidations: AtomicU64,
}

impl PreparedStatementCache {
    /// Create a new prepared statement cache with the given maximum size.
    #[must_use]
    pub fn new(max_size: usize) -> Self {
        Self {
            statements: RwLock::new(std::collections::HashMap::new()),
            max_size,
            schema_version: AtomicU64::new(0),
            hits: AtomicU64::new(0),
            misses: AtomicU64::new(0),
            invalidations: AtomicU64::new(0),
        }
    }

    /// Create a new cache with default settings.
    #[must_use]
    pub fn default_cache() -> Self {
        Self::new(1000)
    }

    /// Get or create a prepared statement for the given SQL.
    ///
    /// If the statement is already cached and the schema hasn't changed,
    /// returns the cached statement. Otherwise, prepares a new statement.
    ///
    /// # Errors
    ///
    /// Returns an error if the SQL cannot be parsed/planned, or if an internal
    /// lock is poisoned.
    pub fn get_or_prepare(&self, sql: &str) -> Result<Arc<PreparedStatement>> {
        let schema_version = self.schema_version.load(Ordering::Acquire);

        // Try to get from cache
        {
            let cache = self
                .statements
                .read()
                .map_err(|_| Error::lock_poisoned("prepared statement cache read lock"))?;
            if let Some(stmt) = cache.get(sql) {
                if stmt.is_valid(schema_version) {
                    self.hits.fetch_add(1, Ordering::Relaxed);
                    return Ok(Arc::clone(stmt));
                }
            }
        }

        // Cache miss or invalid - prepare a new statement
        self.misses.fetch_add(1, Ordering::Relaxed);
        let stmt = Arc::new(PreparedStatement::new(sql, schema_version)?);

        // Insert into cache
        {
            let mut cache = self
                .statements
                .write()
                .map_err(|_| Error::lock_poisoned("prepared statement cache write lock"))?;

            // If cache is full, remove oldest entries (simple LRU-ish eviction)
            if cache.len() >= self.max_size {
                // Remove 10% of entries (minimum 1)
                let to_remove = (self.max_size / 10).max(1);
                let keys_to_remove: Vec<String> = cache.keys().take(to_remove).cloned().collect();
                for key in keys_to_remove {
                    cache.remove(&key);
                }
            }

            cache.insert(sql.to_string(), Arc::clone(&stmt));
        }

        Ok(stmt)
    }

    /// Prepare a statement (always creates a new one, bypassing cache).
    pub fn prepare(&self, sql: &str) -> Result<Arc<PreparedStatement>> {
        let schema_version = self.schema_version.load(Ordering::Acquire);
        Ok(Arc::new(PreparedStatement::new(sql, schema_version)?))
    }

    /// Update the schema version, invalidating all cached statements.
    pub fn set_schema_version(&self, version: u64) {
        let old_version = self.schema_version.swap(version, Ordering::Release);
        if old_version != version {
            self.invalidations.fetch_add(1, Ordering::Relaxed);
        }
    }

    /// Get the current schema version.
    #[must_use]
    pub fn schema_version(&self) -> u64 {
        self.schema_version.load(Ordering::Acquire)
    }

    /// Clear all cached statements.
    ///
    /// # Errors
    ///
    /// Returns an error if the internal lock is poisoned.
    pub fn clear(&self) -> Result<()> {
        let mut cache = self
            .statements
            .write()
            .map_err(|_| Error::lock_poisoned("prepared statement cache write lock"))?;
        cache.clear();
        Ok(())
    }

    /// Invalidate cached statements that access specific tables.
    ///
    /// # Errors
    ///
    /// Returns an error if the internal lock is poisoned.
    pub fn invalidate_tables(&self, tables: &[String]) -> Result<()> {
        if tables.is_empty() {
            return Ok(());
        }

        let tables_set: HashSet<&String> = tables.iter().collect();
        let mut cache = self
            .statements
            .write()
            .map_err(|_| Error::lock_poisoned("prepared statement cache write lock"))?;

        cache.retain(|_, stmt| !stmt.accessed_tables().iter().any(|t| tables_set.contains(t)));
        Ok(())
    }

    /// Returns the number of cached statements.
    ///
    /// # Errors
    ///
    /// Returns an error if the internal lock is poisoned.
    pub fn len(&self) -> Result<usize> {
        let cache = self
            .statements
            .read()
            .map_err(|_| Error::lock_poisoned("prepared statement cache read lock"))?;
        Ok(cache.len())
    }

    /// Returns true if the cache is empty.
    ///
    /// # Errors
    ///
    /// Returns an error if the internal lock is poisoned.
    pub fn is_empty(&self) -> Result<bool> {
        let cache = self
            .statements
            .read()
            .map_err(|_| Error::lock_poisoned("prepared statement cache read lock"))?;
        Ok(cache.is_empty())
    }

    /// Returns the cache hit count.
    #[must_use]
    pub fn hits(&self) -> u64 {
        self.hits.load(Ordering::Relaxed)
    }

    /// Returns the cache miss count.
    #[must_use]
    pub fn misses(&self) -> u64 {
        self.misses.load(Ordering::Relaxed)
    }

    /// Returns the number of invalidations.
    #[must_use]
    pub fn invalidations(&self) -> u64 {
        self.invalidations.load(Ordering::Relaxed)
    }

    /// Returns the cache hit rate as a percentage.
    #[must_use]
    pub fn hit_rate(&self) -> Option<f64> {
        let hits = self.hits.load(Ordering::Relaxed);
        let misses = self.misses.load(Ordering::Relaxed);
        let total = hits + misses;
        if total == 0 {
            None
        } else {
            Some((hits as f64 / total as f64) * 100.0)
        }
    }

    /// Reset cache metrics.
    pub fn reset_metrics(&self) {
        self.hits.store(0, Ordering::Relaxed);
        self.misses.store(0, Ordering::Relaxed);
        self.invalidations.store(0, Ordering::Relaxed);
    }
}

impl Default for PreparedStatementCache {
    fn default() -> Self {
        Self::default_cache()
    }
}

// Unit tests for `PreparedStatement` (parsing, classification, table
// extraction, schema-version validity) and `PreparedStatementCache`
// (hit/miss accounting, invalidation, eviction, clearing).
#[cfg(test)]
mod tests {
    use super::*;

    // A SELECT is classified as a query and records the scanned table.
    #[test]
    fn test_prepare_select() {
        let stmt = PreparedStatement::new("SELECT * FROM users WHERE id = $1", 0)
            .expect("should parse SELECT statement");
        assert!(!stmt.is_dml());
        assert!(!stmt.is_ddl());
        assert!(stmt.is_query());
        assert!(stmt.accessed_tables().contains("users"));
    }

    // INSERT is DML and records its target table.
    #[test]
    fn test_prepare_insert() {
        let stmt = PreparedStatement::new("INSERT INTO users (name) VALUES ($1)", 0)
            .expect("should parse INSERT statement");
        assert!(stmt.is_dml());
        assert!(!stmt.is_ddl());
        assert!(!stmt.is_query());
        assert!(stmt.accessed_tables().contains("users"));
    }

    // UPDATE is DML and records its target table.
    #[test]
    fn test_prepare_update() {
        let stmt = PreparedStatement::new("UPDATE users SET name = $1 WHERE id = $2", 0)
            .expect("should parse UPDATE statement");
        assert!(stmt.is_dml());
        assert!(!stmt.is_ddl());
        assert!(stmt.accessed_tables().contains("users"));
    }

    // DELETE is DML and records its target table.
    #[test]
    fn test_prepare_delete() {
        let stmt = PreparedStatement::new("DELETE FROM users WHERE id = $1", 0)
            .expect("should parse DELETE statement");
        assert!(stmt.is_dml());
        assert!(!stmt.is_ddl());
        assert!(stmt.accessed_tables().contains("users"));
    }

    // Validity is exact equality: both older and newer versions are invalid.
    #[test]
    fn test_schema_version_validity() {
        let stmt = PreparedStatement::new("SELECT * FROM users", 5).expect("should parse SELECT");
        assert!(stmt.is_valid(5));
        assert!(!stmt.is_valid(6));
        assert!(!stmt.is_valid(4));
    }

    // The second lookup of identical SQL is a hit that returns the same Arc.
    #[test]
    fn test_cache_basic() {
        let cache = PreparedStatementCache::new(100);

        // First access - cache miss
        let stmt1 = cache.get_or_prepare("SELECT * FROM users").expect("should prepare statement");
        assert_eq!(cache.hits(), 0);
        assert_eq!(cache.misses(), 1);

        // Second access - cache hit
        let stmt2 =
            cache.get_or_prepare("SELECT * FROM users").expect("should get cached statement");
        assert_eq!(cache.hits(), 1);
        assert_eq!(cache.misses(), 1);

        // Same statement
        assert!(Arc::ptr_eq(&stmt1, &stmt2));
    }

    // Bumping the schema version forces a re-prepare stamped with the new version.
    #[test]
    fn test_cache_invalidation_on_schema_change() {
        let cache = PreparedStatementCache::new(100);

        // Prepare at version 0
        let stmt1 = cache.get_or_prepare("SELECT * FROM users").expect("should prepare statement");
        assert_eq!(stmt1.schema_version(), 0);

        // Change schema version
        cache.set_schema_version(1);

        // Access again - should re-prepare due to schema change
        let stmt2 = cache
            .get_or_prepare("SELECT * FROM users")
            .expect("should re-prepare after schema change");
        assert_eq!(stmt2.schema_version(), 1);

        // Different statement instance
        assert!(!Arc::ptr_eq(&stmt1, &stmt2));
    }

    // Per-table invalidation removes only statements touching that table.
    #[test]
    fn test_cache_table_invalidation() {
        let cache = PreparedStatementCache::new(100);

        // Prepare statements for different tables
        cache.get_or_prepare("SELECT * FROM users").expect("should prepare users query");
        cache.get_or_prepare("SELECT * FROM orders").expect("should prepare orders query");
        assert_eq!(cache.len().expect("should get cache len"), 2);

        // Invalidate users table
        cache.invalidate_tables(&["users".to_string()]).expect("should invalidate tables");
        assert_eq!(cache.len().expect("should get cache len"), 1);

        // Orders should still be cached
        cache.get_or_prepare("SELECT * FROM orders").expect("should get cached orders query");
        assert_eq!(cache.hits(), 1);
    }

    // `clear` empties the cache.
    #[test]
    fn test_cache_clear() {
        let cache = PreparedStatementCache::new(100);

        cache.get_or_prepare("SELECT * FROM users").expect("should prepare users query");
        cache.get_or_prepare("SELECT * FROM orders").expect("should prepare orders query");
        assert_eq!(cache.len().expect("should get cache len"), 2);

        cache.clear().expect("should clear cache");
        assert!(cache.is_empty().expect("should check if empty"));
    }

    // Inserting more distinct statements than `max_size` triggers eviction.
    #[test]
    fn test_cache_eviction() {
        let cache = PreparedStatementCache::new(5);

        // Fill the cache
        for i in 0..10 {
            cache
                .get_or_prepare(&format!("SELECT * FROM table{i}"))
                .expect("should prepare statement");
        }

        // Cache should not exceed max size (though it may temporarily during eviction)
        // NOTE: eviction and insertion happen under the same write lock, so the
        // size observed through `len()` never exceeds `max_size`.
        assert!(cache.len().expect("should get cache len") <= 5);
    }

    // A schema-version change made on another thread is observed by later lookups.
    #[test]
    fn test_concurrent_schema_version_change() {
        use std::sync::Arc;
        use std::thread;

        let cache = Arc::new(PreparedStatementCache::new(100));

        // Prepare initial statement
        cache.get_or_prepare("SELECT * FROM users").expect("should prepare statement");

        let cache_clone = Arc::clone(&cache);
        let handle = thread::spawn(move || {
            // Simulate DDL changing the schema
            cache_clone.set_schema_version(1);
        });

        handle.join().expect("thread should complete");

        // After schema change, should re-prepare
        let stmt = cache
            .get_or_prepare("SELECT * FROM users")
            .expect("should re-prepare after concurrent schema change");
        assert_eq!(stmt.schema_version(), 1);
    }

    // Passing no tables leaves the cache untouched.
    #[test]
    fn test_invalidate_empty_tables_is_noop() {
        let cache = PreparedStatementCache::new(100);

        cache.get_or_prepare("SELECT * FROM users").expect("should prepare statement");
        assert_eq!(cache.len().expect("should get cache len"), 1);

        // Invalidating empty list should be a no-op
        cache.invalidate_tables(&[]).expect("should handle empty table list");
        assert_eq!(cache.len().expect("should get cache len"), 1);
    }

    // Unparseable SQL surfaces as an error rather than a panic.
    #[test]
    fn test_parse_error_returns_error() {
        let result = PreparedStatement::new("INVALID SQL SYNTAX HERE", 0);
        assert!(result.is_err(), "invalid SQL should return error");
    }
}