Skip to main content

cqlite_core/
lib.rs

1//! CQLite Core Database Engine
2//!
3//! A high-performance, embeddable database engine with SSTable-based storage,
4//! supporting both native and WASM deployments.
5
6pub mod config;
7pub mod cql;
8pub mod error;
9pub mod parser;
10// DISABLED FOR M1: Security and performance modules causing compilation errors
11// pub mod performance;
12// pub mod security; // Security framework for comprehensive protection
13pub mod types;
14pub mod util;
15pub mod version_hints;
16
17pub mod benchmarks;
18pub mod memory;
19pub mod platform;
20#[cfg(feature = "state_machine")]
21pub mod query;
22pub mod schema;
23pub mod storage;
24
25// Embeddable export writers (Epic #682). The module is always present; the
26// Parquet writer inside it is gated behind the optional `parquet` feature.
27pub mod export;
28
29// M5: Write engine and serialization modules (Issue #359)
30// Re-exported at crate level for convenience when write-support is enabled
31#[cfg(feature = "write-support")]
32pub use storage::serialization;
33#[cfg(feature = "write-support")]
34pub use storage::write_engine;
35
36// Ingestion module for one-shot schema & SSTable discovery (Issue #249: CLI-specific)
37#[cfg(feature = "cli-helpers")]
38pub mod ingestion;
39
40// Discovery module for SSTable scanning and coverage analysis
41#[cfg(feature = "state_machine")]
42pub mod discovery;
43
44// Testing utilities - hidden from public docs via #[doc(hidden)] but available for integration tests
45#[doc(hidden)]
46pub mod testing;
47
48// NOTE: memory_safety_runner moved to tools/memory-safety-runner (Issue #245)
49// NOTE: memory_safety_tests disabled - MemTable removed in Issue #175
50
51// Re-export main types for convenience
52pub use crate::{
53    config::Config,
54    error::{Error, Result},
55    platform::Platform,
56    types::*,
57};
58
59// Re-export query types when state_machine feature is enabled
60#[cfg(feature = "state_machine")]
61pub use query::SchemaStatus;
62
63use std::path::Path;
64#[cfg(feature = "state_machine")]
65use std::path::PathBuf;
66use std::sync::Arc;
67
68use crate::{memory::MemoryManager, storage::StorageEngine};
69
70#[cfg(feature = "state_machine")]
71use crate::schema::SchemaManager;
72
73#[cfg(feature = "state_machine")]
74use crate::query::QueryEngine;
75
76/// Main database handle
77///
78/// This is the primary interface for interacting with a CQLite database.
79/// It coordinates between the storage engine, schema manager, and query engine.
80#[derive(Debug)]
81pub struct Database {
82    storage: Arc<StorageEngine>,
83    #[cfg(feature = "state_machine")]
84    query: Arc<QueryEngine>,
85    memory: Arc<MemoryManager>,
86    config: Config,
87}
88
89impl Database {
90    /// Open a database at the given path with the specified configuration
91    ///
92    /// # Arguments
93    ///
94    /// * `path` - The directory path where the database files will be stored
95    /// * `config` - Database configuration options
96    ///
97    /// # Errors
98    ///
99    /// Returns an error if:
100    /// - The path cannot be created or accessed
101    /// - Database files are corrupted
102    /// - Configuration is invalid
103    ///
104    /// # Examples
105    ///
106    /// ```rust,no_run
107    /// use cqlite_core::{Database, Config};
108    /// use std::path::{Path, PathBuf};
109    ///
110    /// # tokio_test::block_on(async {
111    /// let config = Config::default();
112    /// let db = Database::open(Path::new("./data"), config).await?;
113    /// # Ok::<(), Box<dyn std::error::Error>>(())
114    /// # });
115    /// ```
116    pub async fn open(path: &Path, config: Config) -> Result<Self> {
117        // Initialize platform abstraction layer
118        let platform = Arc::new(Platform::new(&config).await?);
119
120        // Initialize memory manager
121        let memory = Arc::new(MemoryManager::new(&config)?);
122
123        // Initialize storage engine (no schema registry for simple open)
124        let storage = Arc::new(
125            StorageEngine::open(
126                path,
127                &config,
128                platform.clone(),
129                #[cfg(feature = "state_machine")]
130                None,
131            )
132            .await?,
133        );
134
135        // Initialize schema manager
136        #[cfg(feature = "state_machine")]
137        let schema = Arc::new(SchemaManager::new_with_storage(storage.clone(), &config).await?);
138
139        // Initialize query engine (only when feature enabled)
140        #[cfg(feature = "state_machine")]
141        let query = Arc::new(QueryEngine::new(
142            storage.clone(),
143            schema.clone(),
144            memory.clone(),
145            &config,
146        )?);
147
148        Ok(Self {
149            storage,
150            #[cfg(feature = "state_machine")]
151            query,
152            memory,
153            config,
154        })
155    }
156
157    /// Open a database with pre-discovered SSTable table directories
158    ///
159    /// This method is used in the ingestion flow where SSTable discovery has been performed
160    /// externally (e.g., via `DiscoveryService`) and the database should be initialized with
161    /// specific SSTable files rather than scanning the storage directory.
162    ///
163    /// # Use Case
164    ///
165    /// This method is designed for the one-shot ingestion workflow:
166    /// 1. `DiscoveryService::discover()` scans external Cassandra data directories
167    /// 2. `SchemaManager` parses schema from discovered files
168    /// 3. `Database::open_with_discovered_sstables()` creates a queryable database instance
169    ///
170    /// # Arguments
171    ///
172    /// * `storage_path` - The directory path for database runtime files (WAL, manifest, memtable)
173    /// * `discovered_table_dirs` - Vector of table directory paths from DiscoveryService
174    ///   (e.g., `/var/lib/cassandra/data/keyspace1/table1-abc123`)
175    /// * `config` - Database configuration options
176    ///
177    /// # Errors
178    ///
179    /// Returns an error if:
180    /// - The storage path cannot be created or accessed
181    /// - Any discovered table directory cannot be read
182    /// - Configuration is invalid
183    /// - Storage engine or query engine initialization fails
184    ///
185    /// # Feature Gates
186    ///
187    /// This method is only available when the `state_machine` feature is enabled (default in M2+).
188    ///
189    /// # Examples
190    ///
191    /// ```rust,no_run
192    /// use cqlite_core::{Database, Config};
193    /// use std::path::{Path, PathBuf};
194    ///
195    /// # tokio_test::block_on(async {
196    /// let config = Config::default();
197    /// let storage_path = Path::new("./runtime");
198    /// let discovered_dirs = vec![
199    ///     PathBuf::from("/var/lib/cassandra/data/keyspace1/table1-abc123"),
200    ///     PathBuf::from("/var/lib/cassandra/data/keyspace1/table2-def456"),
201    /// ];
202    ///
203    /// let db = Database::open_with_discovered_sstables(
204    ///     storage_path,
205    ///     discovered_dirs,
206    ///     config
207    /// ).await?;
208    /// # Ok::<(), Box<dyn std::error::Error>>(())
209    /// # });
210    /// ```
211    #[cfg(feature = "state_machine")]
212    pub async fn open_with_discovered_sstables(
213        storage_path: &Path,
214        discovered_table_dirs: Vec<PathBuf>,
215        config: Config,
216    ) -> Result<Self> {
217        Self::open_with_discovered_sstables_and_registry(
218            storage_path,
219            discovered_table_dirs,
220            config,
221            None,
222        )
223        .await
224    }
225
226    /// Open a database with pre-discovered SSTable table directories and optional schema registry
227    ///
228    /// This is the internal implementation that supports passing a pre-loaded schema registry.
229    /// Public callers should use `open_with_discovered_sstables()` which calls this with None.
230    /// The ingestion module uses this directly to pass loaded schemas.
231    ///
232    /// # Arguments
233    ///
234    /// * `storage_path` - The directory path for database runtime files
235    /// * `discovered_table_dirs` - Vector of table directory paths from DiscoveryService
236    /// * `config` - Database configuration options
237    /// * `schema_registry` - Optional pre-loaded schema registry from ingestion
238    #[cfg(feature = "state_machine")]
239    pub(crate) async fn open_with_discovered_sstables_and_registry(
240        storage_path: &Path,
241        discovered_table_dirs: Vec<PathBuf>,
242        config: Config,
243        schema_registry: Option<Arc<tokio::sync::RwLock<schema::SchemaRegistry>>>,
244    ) -> Result<Self> {
245        // Initialize platform abstraction layer
246        let platform = Arc::new(Platform::new(&config).await?);
247
248        // Initialize memory manager
249        let memory = Arc::new(MemoryManager::new(&config)?);
250
251        // Initialize storage engine with pre-discovered SSTables and schema registry
252        let storage = Arc::new(
253            StorageEngine::open_with_sstables(
254                storage_path,
255                discovered_table_dirs,
256                &config,
257                platform.clone(),
258                schema_registry.clone(),
259            )
260            .await?,
261        );
262
263        // Initialize schema manager - use registry if provided, otherwise create empty
264        let schema = if let Some(registry_rwlock) = schema_registry {
265            Arc::new(
266                SchemaManager::new_with_registry(storage.clone(), registry_rwlock, &config).await?,
267            )
268        } else {
269            Arc::new(SchemaManager::new_with_storage(storage.clone(), &config).await?)
270        };
271
272        // Initialize query engine
273        let query = Arc::new(QueryEngine::new(
274            storage.clone(),
275            schema.clone(),
276            memory.clone(),
277            &config,
278        )?);
279
280        Ok(Self {
281            storage,
282            query,
283            memory,
284            config,
285        })
286    }
287
288    /// Execute a SQL query and return the result
289    ///
290    /// # Arguments
291    ///
292    /// * `sql` - The SQL query string to execute
293    ///
294    /// # Errors
295    ///
296    /// Returns an error if:
297    /// - SQL syntax is invalid
298    /// - Referenced tables/columns don't exist
299    /// - Query execution fails
300    ///
301    /// # Examples
302    ///
303    /// ```rust,no_run
304    /// # use cqlite_core::{Database, Config};
305    /// # use std::path::{Path, PathBuf};
306    /// # tokio_test::block_on(async {
307    /// # let config = Config::default();
308    /// # let db = Database::open(Path::new("./data"), config).await?;
309    /// let result = db.execute("SELECT * FROM users WHERE id = 1").await?;
310    /// # Ok::<(), Box<dyn std::error::Error>>(())
311    /// # });
312    /// ```
313    #[cfg(feature = "state_machine")]
314    pub async fn execute(&self, sql: &str) -> Result<query::result::QueryResult> {
315        let result = self.query.execute(sql).await;
316
317        #[cfg(debug_assertions)]
318        if let Ok(ref query_result) = result {
319            log::debug!(
320                "Database::execute('{}') returning rows_affected: {}",
321                sql,
322                query_result.rows_affected
323            );
324        }
325
326        result
327    }
328
329    /// Execute a SQL query with streaming results (Issue #280)
330    ///
331    /// Returns a `QueryResultIterator` that yields rows incrementally via a bounded
332    /// channel, enabling memory-efficient processing of large result sets.
333    ///
334    /// This is the recommended method for exporting large tables, as it avoids
335    /// materializing all rows in memory at once.
336    ///
337    /// # Arguments
338    ///
339    /// * `sql` - The SQL query to execute (must be a SELECT statement)
340    /// * `config` - Streaming configuration (buffer size, chunk hints)
341    ///
342    /// # Errors
343    ///
344    /// Returns an error if:
345    /// - Query is not a SELECT statement
346    /// - SQL syntax is invalid
347    /// - Query execution fails
348    ///
349    /// # Examples
350    ///
351    /// ```rust,no_run
352    /// # use cqlite_core::{Database, Config};
353    /// # use cqlite_core::query::result::StreamingConfig;
354    /// # use std::path::Path;
355    /// # tokio_test::block_on(async {
356    /// # let db = Database::open(Path::new("./data"), Config::default()).await?;
357    /// let config = StreamingConfig::default();
358    /// let mut iter = db.execute_streaming(
359    ///     "SELECT * FROM large_table",
360    ///     config
361    /// ).await?;
362    ///
363    /// while let Some(row_result) = iter.next_async().await {
364    ///     let row = row_result?;
365    ///     // Process row incrementally
366    /// }
367    /// # Ok::<(), Box<dyn std::error::Error>>(())
368    /// # });
369    /// ```
370    #[cfg(feature = "state_machine")]
371    pub async fn execute_streaming(
372        &self,
373        sql: &str,
374        config: query::result::StreamingConfig,
375    ) -> Result<query::result::QueryResultIterator> {
376        self.query.execute_streaming(sql, config).await
377    }
378
379    /// Prepare a SQL statement for repeated execution
380    ///
381    /// # Arguments
382    ///
383    /// * `sql` - The SQL statement to prepare
384    ///
385    /// # Errors
386    ///
387    /// Returns an error if SQL syntax is invalid or references non-existent objects
388    #[cfg(feature = "state_machine")]
389    pub async fn prepare(&self, sql: &str) -> Result<std::sync::Arc<query::PreparedQuery>> {
390        self.query.prepare(sql).await
391    }
392
393    /// Explain a SQL query without executing it
394    ///
395    /// # Arguments
396    ///
397    /// * `sql` - The SQL query to explain
398    ///
399    /// # Errors
400    ///
401    /// Returns an error if SQL syntax is invalid
402    #[cfg(feature = "state_machine")]
403    pub async fn explain(&self, sql: &str) -> Result<query::ExplainResult> {
404        self.query.explain(sql).await
405    }
406
407    /// Check if schema is available for a table
408    ///
409    /// This is a fast boolean check useful for pre-flight validation.
410    /// For detailed diagnostic information, use `schema_status()`.
411    ///
412    /// # Examples
413    ///
414    /// ```rust,no_run
415    /// # use cqlite_core::{Database, Config};
416    /// # tokio_test::block_on(async {
417    /// let db = Database::open(std::path::Path::new("./data"), Config::default()).await?;
418    ///
419    /// if !db.has_schema_for_table("users").await {
420    ///     eprintln!("Warning: No schema found for 'users' table");
421    /// }
422    /// # Ok::<(), Box<dyn std::error::Error>>(())
423    /// # });
424    /// ```
425    #[cfg(feature = "state_machine")]
426    pub async fn has_schema_for_table(&self, table: &str) -> bool {
427        self.query.has_schema_for_table(table).await
428    }
429
430    /// Get detailed schema status for debugging
431    ///
432    /// Returns diagnostic information about schema availability including
433    /// reasons for missing schemas or extraction failures.
434    ///
435    /// # Examples
436    ///
437    /// ```rust,no_run
438    /// # use cqlite_core::{Database, Config};
439    /// # use cqlite_core::query::SchemaStatus;
440    /// # tokio_test::block_on(async {
441    /// let db = Database::open(std::path::Path::new("./data"), Config::default()).await?;
442    ///
443    /// match db.schema_status("users").await {
444    ///     SchemaStatus::Available { .. } => println!("Schema ready"),
445    ///     SchemaStatus::ExtractionFailed { cause, suggestion, .. } => {
446    ///         eprintln!("Schema extraction failed: {}", cause);
447    ///         eprintln!("Suggestion: {}", suggestion);
448    ///     }
449    ///     _ => {}
450    /// }
451    /// # Ok::<(), Box<dyn std::error::Error>>(())
452    /// # });
453    /// ```
454    #[cfg(feature = "state_machine")]
455    pub async fn schema_status(&self, table: &str) -> query::SchemaStatus {
456        self.query.schema_status(table).await
457    }
458
459    /// Get database statistics
460    pub async fn stats(&self) -> Result<DatabaseStats> {
461        Ok(DatabaseStats {
462            storage_stats: self.storage.stats().await?,
463            memory_stats: self.memory.stats()?,
464            #[cfg(feature = "state_machine")]
465            query_stats: self.query.stats(),
466        })
467    }
468
469    /// Flush all pending writes to disk
470    #[cfg(feature = "experimental")]
471    pub async fn flush(&self) -> Result<()> {
472        self.storage.flush().await
473    }
474
475    /// Perform manual compaction of storage files
476    #[cfg(feature = "experimental")]
477    pub async fn compact(&self) -> Result<()> {
478        self.storage.compact().await
479    }
480
481    /// Shutdown the database storage engine without consuming self.
482    ///
483    /// This is useful for language bindings where the Database is wrapped
484    /// in an Arc and cannot be consumed. The shutdown operation is idempotent.
485    ///
486    /// For consuming close that also drops the Database, use `close()`.
487    pub async fn shutdown(&self) -> Result<()> {
488        self.storage.shutdown().await
489    }
490
491    /// Close the database and release all resources
492    ///
493    /// This method ensures all pending operations are completed and
494    /// all resources are properly cleaned up.
495    pub async fn close(self) -> Result<()> {
496        // Stop background tasks
497        self.storage.shutdown().await?;
498
499        // Flush any remaining data (only with experimental feature)
500        #[cfg(feature = "experimental")]
501        {
502            self.storage.flush().await?;
503        }
504
505        Ok(())
506    }
507
508    /// Get the database configuration
509    pub fn config(&self) -> &Config {
510        &self.config
511    }
512}
513
514impl Clone for Database {
515    fn clone(&self) -> Self {
516        Self {
517            storage: self.storage.clone(),
518            #[cfg(feature = "state_machine")]
519            query: self.query.clone(),
520            memory: self.memory.clone(),
521            config: self.config.clone(),
522        }
523    }
524}
525
526/// Database statistics
527#[derive(Debug, Clone)]
528pub struct DatabaseStats {
529    /// Storage engine statistics
530    pub storage_stats: storage::StorageStats,
531    /// Memory manager statistics
532    pub memory_stats: memory::MemoryStats,
533    /// Query engine statistics
534    #[cfg(feature = "state_machine")]
535    pub query_stats: query::QueryStats,
536}
537
/// A SQL statement prepared once that can be executed multiple times.
///
/// NOTE(review): `Database::prepare` returns `Arc<query::PreparedQuery>` rather
/// than this wrapper, and no constructor for it is visible here; confirm
/// whether this type is still reachable by callers.
#[cfg(feature = "state_machine")]
#[derive(Debug)]
pub struct PreparedStatement {
    /// Underlying prepared query that every execution delegates to.
    statement: query::PreparedQuery,
}
544
#[cfg(feature = "state_machine")]
impl PreparedStatement {
    /// Run the prepared statement with the supplied parameter values.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by the underlying prepared query.
    pub async fn execute(&self, params: &[Value]) -> Result<query::result::QueryResult> {
        let prepared = &self.statement;
        prepared.execute(params).await
    }
}
552
553// Re-export query result types for convenience
554#[cfg(feature = "state_machine")]
555pub use query::result::{QueryResult, QueryRow};
556
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    #[tokio::test]
    async fn test_database_open_close() {
        let temp_dir = TempDir::new().unwrap();
        let config = Config::test_config();

        let db = Database::open(temp_dir.path(), config).await.unwrap();
        db.close().await.unwrap();
    }

    /// Documents that `open_with_discovered_sstables_and_registry` is crate-private.
    ///
    /// Naming the function item below only compiles if the function exists and is
    /// reachable from inside the crate. Its `pub(crate)` visibility (rather than
    /// `pub`) is what keeps integration tests and external crates from calling it.
    #[cfg(feature = "state_machine")]
    #[test]
    fn test_open_with_discovered_sstables_and_registry_is_crate_private() {
        // Reference the fn item without calling it: a call would need an async
        // runtime and real storage, but naming it is enough to pin its existence
        // and in-crate accessibility at compile time.
        let _crate_private_ctor = Database::open_with_discovered_sstables_and_registry;
    }

    #[tokio::test]
    #[cfg(feature = "state_machine")]
    async fn test_database_open_with_discovered_sstables() {
        let temp_dir = TempDir::new().unwrap();
        let config = Config::test_config();

        // Create an empty list of discovered table directories
        let discovered_dirs = Vec::new();

        let db = Database::open_with_discovered_sstables(temp_dir.path(), discovered_dirs, config)
            .await
            .unwrap();

        // Verify database was created successfully
        let stats = db.stats().await.unwrap();
        assert_eq!(stats.storage_stats.sstables.sstable_count, 0);

        db.close().await.unwrap();
    }

    #[tokio::test]
    #[cfg(all(
        feature = "legacy-heuristics",
        feature = "state_machine",
        feature = "experimental"
    ))]
    async fn test_database_basic_operations() {
        let temp_dir = TempDir::new().unwrap();
        let config = Config::test_config();

        let db = Database::open(temp_dir.path(), config).await.unwrap();

        // Create table
        let result = db
            .execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)")
            .await
            .unwrap();
        assert_eq!(result.rows_affected, 0);

        // Insert data
        let result = db
            .execute("INSERT INTO users (id, name) VALUES (1, 'Alice')")
            .await
            .unwrap();

        #[cfg(debug_assertions)]
        log::debug!(
            "Test INSERT assertion - rows_affected: {}",
            result.rows_affected
        );

        assert_eq!(result.rows_affected, 1);

        // Query data - Re-enabled for QA debugging
        let result = db
            .execute("SELECT * FROM users WHERE id = 1")
            .await
            .unwrap();

        #[cfg(debug_assertions)]
        log::debug!("Test SELECT assertion - rows.len(): {}", result.rows.len());

        assert_eq!(result.rows.len(), 1, "SELECT should return 1 row");

        db.close().await.unwrap();
    }
}