cqlite_core/lib.rs
1//! CQLite Core Database Engine
2//!
3//! A high-performance, embeddable database engine with SSTable-based storage,
4//! supporting both native and WASM deployments.
5
6pub mod config;
7pub mod cql;
8pub mod error;
9pub mod parser;
10// DISABLED FOR M1: Security and performance modules causing compilation errors
11// pub mod performance;
12// pub mod security; // Security framework for comprehensive protection
13pub mod types;
14pub mod util;
15pub mod version_hints;
16
17pub mod benchmarks;
18pub mod memory;
19pub mod platform;
20#[cfg(feature = "state_machine")]
21pub mod query;
22pub mod schema;
23pub mod storage;
24
25// Embeddable export writers (Epic #682). The module is always present; the
26// Parquet writer inside it is gated behind the optional `parquet` feature.
27pub mod export;
28
29// M5: Write engine and serialization modules (Issue #359)
30// Re-exported at crate level for convenience when write-support is enabled
31#[cfg(feature = "write-support")]
32pub use storage::serialization;
33#[cfg(feature = "write-support")]
34pub use storage::write_engine;
35
36// Ingestion module for one-shot schema & SSTable discovery (Issue #249: CLI-specific)
37#[cfg(feature = "cli-helpers")]
38pub mod ingestion;
39
40// Discovery module for SSTable scanning and coverage analysis
41#[cfg(feature = "state_machine")]
42pub mod discovery;
43
44// Testing utilities - hidden from public docs via #[doc(hidden)] but available for integration tests
45#[doc(hidden)]
46pub mod testing;
47
48// NOTE: memory_safety_runner moved to tools/memory-safety-runner (Issue #245)
49// NOTE: memory_safety_tests disabled - MemTable removed in Issue #175
50
51// Re-export main types for convenience
52pub use crate::{
53 config::Config,
54 error::{Error, Result},
55 platform::Platform,
56 types::*,
57};
58
59// Re-export query types when state_machine feature is enabled
60#[cfg(feature = "state_machine")]
61pub use query::SchemaStatus;
62
63use std::path::Path;
64#[cfg(feature = "state_machine")]
65use std::path::PathBuf;
66use std::sync::Arc;
67
68use crate::{memory::MemoryManager, storage::StorageEngine};
69
70#[cfg(feature = "state_machine")]
71use crate::schema::SchemaManager;
72
73#[cfg(feature = "state_machine")]
74use crate::query::QueryEngine;
75
76/// Main database handle
77///
78/// This is the primary interface for interacting with a CQLite database.
79/// It coordinates between the storage engine, schema manager, and query engine.
80#[derive(Debug)]
81pub struct Database {
82 storage: Arc<StorageEngine>,
83 #[cfg(feature = "state_machine")]
84 query: Arc<QueryEngine>,
85 memory: Arc<MemoryManager>,
86 config: Config,
87}
88
89impl Database {
90 /// Open a database at the given path with the specified configuration
91 ///
92 /// # Arguments
93 ///
94 /// * `path` - The directory path where the database files will be stored
95 /// * `config` - Database configuration options
96 ///
97 /// # Errors
98 ///
99 /// Returns an error if:
100 /// - The path cannot be created or accessed
101 /// - Database files are corrupted
102 /// - Configuration is invalid
103 ///
104 /// # Examples
105 ///
106 /// ```rust,no_run
107 /// use cqlite_core::{Database, Config};
108 /// use std::path::{Path, PathBuf};
109 ///
110 /// # tokio_test::block_on(async {
111 /// let config = Config::default();
112 /// let db = Database::open(Path::new("./data"), config).await?;
113 /// # Ok::<(), Box<dyn std::error::Error>>(())
114 /// # });
115 /// ```
116 pub async fn open(path: &Path, config: Config) -> Result<Self> {
117 // Initialize platform abstraction layer
118 let platform = Arc::new(Platform::new(&config).await?);
119
120 // Initialize memory manager
121 let memory = Arc::new(MemoryManager::new(&config)?);
122
123 // Initialize storage engine (no schema registry for simple open)
124 let storage = Arc::new(
125 StorageEngine::open(
126 path,
127 &config,
128 platform.clone(),
129 #[cfg(feature = "state_machine")]
130 None,
131 )
132 .await?,
133 );
134
135 // Initialize schema manager
136 #[cfg(feature = "state_machine")]
137 let schema = Arc::new(SchemaManager::new_with_storage(storage.clone(), &config).await?);
138
139 // Initialize query engine (only when feature enabled)
140 #[cfg(feature = "state_machine")]
141 let query = Arc::new(QueryEngine::new(
142 storage.clone(),
143 schema.clone(),
144 memory.clone(),
145 &config,
146 )?);
147
148 Ok(Self {
149 storage,
150 #[cfg(feature = "state_machine")]
151 query,
152 memory,
153 config,
154 })
155 }
156
157 /// Open a database with pre-discovered SSTable table directories
158 ///
159 /// This method is used in the ingestion flow where SSTable discovery has been performed
160 /// externally (e.g., via `DiscoveryService`) and the database should be initialized with
161 /// specific SSTable files rather than scanning the storage directory.
162 ///
163 /// # Use Case
164 ///
165 /// This method is designed for the one-shot ingestion workflow:
166 /// 1. `DiscoveryService::discover()` scans external Cassandra data directories
167 /// 2. `SchemaManager` parses schema from discovered files
168 /// 3. `Database::open_with_discovered_sstables()` creates a queryable database instance
169 ///
170 /// # Arguments
171 ///
172 /// * `storage_path` - The directory path for database runtime files (WAL, manifest, memtable)
173 /// * `discovered_table_dirs` - Vector of table directory paths from DiscoveryService
174 /// (e.g., `/var/lib/cassandra/data/keyspace1/table1-abc123`)
175 /// * `config` - Database configuration options
176 ///
177 /// # Errors
178 ///
179 /// Returns an error if:
180 /// - The storage path cannot be created or accessed
181 /// - Any discovered table directory cannot be read
182 /// - Configuration is invalid
183 /// - Storage engine or query engine initialization fails
184 ///
185 /// # Feature Gates
186 ///
187 /// This method is only available when the `state_machine` feature is enabled (default in M2+).
188 ///
189 /// # Examples
190 ///
191 /// ```rust,no_run
192 /// use cqlite_core::{Database, Config};
193 /// use std::path::{Path, PathBuf};
194 ///
195 /// # tokio_test::block_on(async {
196 /// let config = Config::default();
197 /// let storage_path = Path::new("./runtime");
198 /// let discovered_dirs = vec![
199 /// PathBuf::from("/var/lib/cassandra/data/keyspace1/table1-abc123"),
200 /// PathBuf::from("/var/lib/cassandra/data/keyspace1/table2-def456"),
201 /// ];
202 ///
203 /// let db = Database::open_with_discovered_sstables(
204 /// storage_path,
205 /// discovered_dirs,
206 /// config
207 /// ).await?;
208 /// # Ok::<(), Box<dyn std::error::Error>>(())
209 /// # });
210 /// ```
211 #[cfg(feature = "state_machine")]
212 pub async fn open_with_discovered_sstables(
213 storage_path: &Path,
214 discovered_table_dirs: Vec<PathBuf>,
215 config: Config,
216 ) -> Result<Self> {
217 Self::open_with_discovered_sstables_and_registry(
218 storage_path,
219 discovered_table_dirs,
220 config,
221 None,
222 )
223 .await
224 }
225
226 /// Open a database with pre-discovered SSTable table directories and optional schema registry
227 ///
228 /// This is the internal implementation that supports passing a pre-loaded schema registry.
229 /// Public callers should use `open_with_discovered_sstables()` which calls this with None.
230 /// The ingestion module uses this directly to pass loaded schemas.
231 ///
232 /// # Arguments
233 ///
234 /// * `storage_path` - The directory path for database runtime files
235 /// * `discovered_table_dirs` - Vector of table directory paths from DiscoveryService
236 /// * `config` - Database configuration options
237 /// * `schema_registry` - Optional pre-loaded schema registry from ingestion
238 #[cfg(feature = "state_machine")]
239 pub(crate) async fn open_with_discovered_sstables_and_registry(
240 storage_path: &Path,
241 discovered_table_dirs: Vec<PathBuf>,
242 config: Config,
243 schema_registry: Option<Arc<tokio::sync::RwLock<schema::SchemaRegistry>>>,
244 ) -> Result<Self> {
245 // Initialize platform abstraction layer
246 let platform = Arc::new(Platform::new(&config).await?);
247
248 // Initialize memory manager
249 let memory = Arc::new(MemoryManager::new(&config)?);
250
251 // Initialize storage engine with pre-discovered SSTables and schema registry
252 let storage = Arc::new(
253 StorageEngine::open_with_sstables(
254 storage_path,
255 discovered_table_dirs,
256 &config,
257 platform.clone(),
258 schema_registry.clone(),
259 )
260 .await?,
261 );
262
263 // Initialize schema manager - use registry if provided, otherwise create empty
264 let schema = if let Some(registry_rwlock) = schema_registry {
265 Arc::new(
266 SchemaManager::new_with_registry(storage.clone(), registry_rwlock, &config).await?,
267 )
268 } else {
269 Arc::new(SchemaManager::new_with_storage(storage.clone(), &config).await?)
270 };
271
272 // Initialize query engine
273 let query = Arc::new(QueryEngine::new(
274 storage.clone(),
275 schema.clone(),
276 memory.clone(),
277 &config,
278 )?);
279
280 Ok(Self {
281 storage,
282 query,
283 memory,
284 config,
285 })
286 }
287
288 /// Execute a SQL query and return the result
289 ///
290 /// # Arguments
291 ///
292 /// * `sql` - The SQL query string to execute
293 ///
294 /// # Errors
295 ///
296 /// Returns an error if:
297 /// - SQL syntax is invalid
298 /// - Referenced tables/columns don't exist
299 /// - Query execution fails
300 ///
301 /// # Examples
302 ///
303 /// ```rust,no_run
304 /// # use cqlite_core::{Database, Config};
305 /// # use std::path::{Path, PathBuf};
306 /// # tokio_test::block_on(async {
307 /// # let config = Config::default();
308 /// # let db = Database::open(Path::new("./data"), config).await?;
309 /// let result = db.execute("SELECT * FROM users WHERE id = 1").await?;
310 /// # Ok::<(), Box<dyn std::error::Error>>(())
311 /// # });
312 /// ```
313 #[cfg(feature = "state_machine")]
314 pub async fn execute(&self, sql: &str) -> Result<query::result::QueryResult> {
315 let result = self.query.execute(sql).await;
316
317 #[cfg(debug_assertions)]
318 if let Ok(ref query_result) = result {
319 log::debug!(
320 "Database::execute('{}') returning rows_affected: {}",
321 sql,
322 query_result.rows_affected
323 );
324 }
325
326 result
327 }
328
329 /// Execute a SQL query with streaming results (Issue #280)
330 ///
331 /// Returns a `QueryResultIterator` that yields rows incrementally via a bounded
332 /// channel, enabling memory-efficient processing of large result sets.
333 ///
334 /// This is the recommended method for exporting large tables, as it avoids
335 /// materializing all rows in memory at once.
336 ///
337 /// # Arguments
338 ///
339 /// * `sql` - The SQL query to execute (must be a SELECT statement)
340 /// * `config` - Streaming configuration (buffer size, chunk hints)
341 ///
342 /// # Errors
343 ///
344 /// Returns an error if:
345 /// - Query is not a SELECT statement
346 /// - SQL syntax is invalid
347 /// - Query execution fails
348 ///
349 /// # Examples
350 ///
351 /// ```rust,no_run
352 /// # use cqlite_core::{Database, Config};
353 /// # use cqlite_core::query::result::StreamingConfig;
354 /// # use std::path::Path;
355 /// # tokio_test::block_on(async {
356 /// # let db = Database::open(Path::new("./data"), Config::default()).await?;
357 /// let config = StreamingConfig::default();
358 /// let mut iter = db.execute_streaming(
359 /// "SELECT * FROM large_table",
360 /// config
361 /// ).await?;
362 ///
363 /// while let Some(row_result) = iter.next_async().await {
364 /// let row = row_result?;
365 /// // Process row incrementally
366 /// }
367 /// # Ok::<(), Box<dyn std::error::Error>>(())
368 /// # });
369 /// ```
370 #[cfg(feature = "state_machine")]
371 pub async fn execute_streaming(
372 &self,
373 sql: &str,
374 config: query::result::StreamingConfig,
375 ) -> Result<query::result::QueryResultIterator> {
376 self.query.execute_streaming(sql, config).await
377 }
378
379 /// Prepare a SQL statement for repeated execution
380 ///
381 /// # Arguments
382 ///
383 /// * `sql` - The SQL statement to prepare
384 ///
385 /// # Errors
386 ///
387 /// Returns an error if SQL syntax is invalid or references non-existent objects
388 #[cfg(feature = "state_machine")]
389 pub async fn prepare(&self, sql: &str) -> Result<std::sync::Arc<query::PreparedQuery>> {
390 self.query.prepare(sql).await
391 }
392
393 /// Explain a SQL query without executing it
394 ///
395 /// # Arguments
396 ///
397 /// * `sql` - The SQL query to explain
398 ///
399 /// # Errors
400 ///
401 /// Returns an error if SQL syntax is invalid
402 #[cfg(feature = "state_machine")]
403 pub async fn explain(&self, sql: &str) -> Result<query::ExplainResult> {
404 self.query.explain(sql).await
405 }
406
407 /// Check if schema is available for a table
408 ///
409 /// This is a fast boolean check useful for pre-flight validation.
410 /// For detailed diagnostic information, use `schema_status()`.
411 ///
412 /// # Examples
413 ///
414 /// ```rust,no_run
415 /// # use cqlite_core::{Database, Config};
416 /// # tokio_test::block_on(async {
417 /// let db = Database::open(std::path::Path::new("./data"), Config::default()).await?;
418 ///
419 /// if !db.has_schema_for_table("users").await {
420 /// eprintln!("Warning: No schema found for 'users' table");
421 /// }
422 /// # Ok::<(), Box<dyn std::error::Error>>(())
423 /// # });
424 /// ```
425 #[cfg(feature = "state_machine")]
426 pub async fn has_schema_for_table(&self, table: &str) -> bool {
427 self.query.has_schema_for_table(table).await
428 }
429
430 /// Get detailed schema status for debugging
431 ///
432 /// Returns diagnostic information about schema availability including
433 /// reasons for missing schemas or extraction failures.
434 ///
435 /// # Examples
436 ///
437 /// ```rust,no_run
438 /// # use cqlite_core::{Database, Config};
439 /// # use cqlite_core::query::SchemaStatus;
440 /// # tokio_test::block_on(async {
441 /// let db = Database::open(std::path::Path::new("./data"), Config::default()).await?;
442 ///
443 /// match db.schema_status("users").await {
444 /// SchemaStatus::Available { .. } => println!("Schema ready"),
445 /// SchemaStatus::ExtractionFailed { cause, suggestion, .. } => {
446 /// eprintln!("Schema extraction failed: {}", cause);
447 /// eprintln!("Suggestion: {}", suggestion);
448 /// }
449 /// _ => {}
450 /// }
451 /// # Ok::<(), Box<dyn std::error::Error>>(())
452 /// # });
453 /// ```
454 #[cfg(feature = "state_machine")]
455 pub async fn schema_status(&self, table: &str) -> query::SchemaStatus {
456 self.query.schema_status(table).await
457 }
458
459 /// Get database statistics
460 pub async fn stats(&self) -> Result<DatabaseStats> {
461 Ok(DatabaseStats {
462 storage_stats: self.storage.stats().await?,
463 memory_stats: self.memory.stats()?,
464 #[cfg(feature = "state_machine")]
465 query_stats: self.query.stats(),
466 })
467 }
468
469 /// Flush all pending writes to disk
470 #[cfg(feature = "experimental")]
471 pub async fn flush(&self) -> Result<()> {
472 self.storage.flush().await
473 }
474
475 /// Perform manual compaction of storage files
476 #[cfg(feature = "experimental")]
477 pub async fn compact(&self) -> Result<()> {
478 self.storage.compact().await
479 }
480
481 /// Shutdown the database storage engine without consuming self.
482 ///
483 /// This is useful for language bindings where the Database is wrapped
484 /// in an Arc and cannot be consumed. The shutdown operation is idempotent.
485 ///
486 /// For consuming close that also drops the Database, use `close()`.
487 pub async fn shutdown(&self) -> Result<()> {
488 self.storage.shutdown().await
489 }
490
491 /// Close the database and release all resources
492 ///
493 /// This method ensures all pending operations are completed and
494 /// all resources are properly cleaned up.
495 pub async fn close(self) -> Result<()> {
496 // Stop background tasks
497 self.storage.shutdown().await?;
498
499 // Flush any remaining data (only with experimental feature)
500 #[cfg(feature = "experimental")]
501 {
502 self.storage.flush().await?;
503 }
504
505 Ok(())
506 }
507
508 /// Get the database configuration
509 pub fn config(&self) -> &Config {
510 &self.config
511 }
512}
513
514impl Clone for Database {
515 fn clone(&self) -> Self {
516 Self {
517 storage: self.storage.clone(),
518 #[cfg(feature = "state_machine")]
519 query: self.query.clone(),
520 memory: self.memory.clone(),
521 config: self.config.clone(),
522 }
523 }
524}
525
526/// Database statistics
527#[derive(Debug, Clone)]
528pub struct DatabaseStats {
529 /// Storage engine statistics
530 pub storage_stats: storage::StorageStats,
531 /// Memory manager statistics
532 pub memory_stats: memory::MemoryStats,
533 /// Query engine statistics
534 #[cfg(feature = "state_machine")]
535 pub query_stats: query::QueryStats,
536}
537
/// A prepared SQL statement that can be executed multiple times.
///
/// Thin wrapper around a `query::PreparedQuery`. Only available with the
/// `state_machine` feature. Nothing in this file constructs the wrapper;
/// `Database::prepare()` returns the underlying prepared query directly.
#[cfg(feature = "state_machine")]
#[derive(Debug)]
pub struct PreparedStatement {
    /// The wrapped prepared query that `execute()` delegates to.
    statement: query::PreparedQuery,
}
544
#[cfg(feature = "state_machine")]
impl PreparedStatement {
    /// Run the prepared statement, binding `params` as its parameter values.
    ///
    /// Delegates to the wrapped prepared query and returns its result unchanged.
    pub async fn execute(&self, params: &[Value]) -> Result<query::result::QueryResult> {
        let prepared = &self.statement;
        prepared.execute(params).await
    }
}
552
553// Re-export query result types for convenience
554#[cfg(feature = "state_machine")]
555pub use query::result::{QueryResult, QueryRow};
556
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Opening a database in a fresh temporary directory and closing it succeeds.
    #[tokio::test]
    async fn test_database_open_close() {
        let temp_dir = TempDir::new().unwrap();
        let config = Config::test_config();

        let db = Database::open(temp_dir.path(), config).await.unwrap();
        db.close().await.unwrap();
    }

    /// Documents that open_with_discovered_sstables_and_registry is crate-private.
    /// This test exists to document the API contract - the function should NOT be
    /// callable from integration tests or external crates.
    #[cfg(feature = "state_machine")]
    #[test]
    fn test_open_with_discovered_sstables_and_registry_is_crate_private() {
        // Naming the function item makes the compiler resolve it, so this test
        // compiling really does prove the function exists and is reachable from
        // inside the crate. The `pub(crate)` keyword on its signature is what
        // keeps integration tests and external crates from calling it.
        //
        // Note: the function is not called here since that requires async setup.
        let _entry_point = Database::open_with_discovered_sstables_and_registry;
    }

    /// Opening with an empty list of discovered directories yields a database
    /// that reports zero SSTables.
    #[tokio::test]
    #[cfg(feature = "state_machine")]
    async fn test_database_open_with_discovered_sstables() {
        let temp_dir = TempDir::new().unwrap();
        let config = Config::test_config();

        // Create an empty list of discovered table directories
        let discovered_dirs = Vec::new();

        let db = Database::open_with_discovered_sstables(temp_dir.path(), discovered_dirs, config)
            .await
            .unwrap();

        // Verify database was created successfully
        let stats = db.stats().await.unwrap();
        assert_eq!(stats.storage_stats.sstables.sstable_count, 0);

        db.close().await.unwrap();
    }

    /// End-to-end CREATE / INSERT / SELECT round trip; only built when the
    /// `legacy-heuristics`, `state_machine` and `experimental` features are all on.
    #[tokio::test]
    #[cfg(all(
        feature = "legacy-heuristics",
        feature = "state_machine",
        feature = "experimental"
    ))]
    async fn test_database_basic_operations() {
        let temp_dir = TempDir::new().unwrap();
        let config = Config::test_config();

        let db = Database::open(temp_dir.path(), config).await.unwrap();

        // Create table
        let result = db
            .execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)")
            .await
            .unwrap();
        assert_eq!(result.rows_affected, 0);

        // Insert data
        let result = db
            .execute("INSERT INTO users (id, name) VALUES (1, 'Alice')")
            .await
            .unwrap();

        #[cfg(debug_assertions)]
        log::debug!(
            "Test INSERT assertion - rows_affected: {}",
            result.rows_affected
        );

        assert_eq!(result.rows_affected, 1);

        // Query data - Re-enabled for QA debugging
        let result = db
            .execute("SELECT * FROM users WHERE id = 1")
            .await
            .unwrap();

        #[cfg(debug_assertions)]
        log::debug!("Test SELECT assertion - rows.len(): {}", result.rows.len());

        assert_eq!(result.rows.len(), 1, "SELECT should return 1 row");

        db.close().await.unwrap();
    }
}
655}