pub mod config;
pub mod cql;
pub mod error;
pub mod parser;
pub mod types;
pub mod util;
pub mod version_hints;
pub mod benchmarks;
pub mod memory;
pub mod platform;
#[cfg(feature = "state_machine")]
pub mod query;
pub mod schema;
pub mod storage;
pub mod export;
#[cfg(feature = "write-support")]
pub use storage::serialization;
#[cfg(feature = "write-support")]
pub use storage::write_engine;
#[cfg(feature = "cli-helpers")]
pub mod ingestion;
#[cfg(feature = "state_machine")]
pub mod discovery;
#[doc(hidden)]
pub mod testing;
pub use crate::{
config::Config,
error::{Error, Result},
platform::Platform,
types::*,
};
#[cfg(feature = "state_machine")]
pub use query::SchemaStatus;
use std::path::Path;
#[cfg(feature = "state_machine")]
use std::path::PathBuf;
use std::sync::Arc;
use crate::{memory::MemoryManager, storage::StorageEngine};
#[cfg(feature = "state_machine")]
use crate::schema::SchemaManager;
#[cfg(feature = "state_machine")]
use crate::query::QueryEngine;
#[derive(Debug)]
pub struct Database {
storage: Arc<StorageEngine>,
#[cfg(feature = "state_machine")]
query: Arc<QueryEngine>,
memory: Arc<MemoryManager>,
config: Config,
}
impl Database {
pub async fn open(path: &Path, config: Config) -> Result<Self> {
let platform = Arc::new(Platform::new(&config).await?);
let memory = Arc::new(MemoryManager::new(&config)?);
let storage = Arc::new(
StorageEngine::open(
path,
&config,
platform.clone(),
#[cfg(feature = "state_machine")]
None,
)
.await?,
);
#[cfg(feature = "state_machine")]
let schema = Arc::new(SchemaManager::new_with_storage(storage.clone(), &config).await?);
#[cfg(feature = "state_machine")]
let query = Arc::new(QueryEngine::new(
storage.clone(),
schema.clone(),
memory.clone(),
&config,
)?);
Ok(Self {
storage,
#[cfg(feature = "state_machine")]
query,
memory,
config,
})
}
#[cfg(feature = "state_machine")]
pub async fn open_with_discovered_sstables(
storage_path: &Path,
discovered_table_dirs: Vec<PathBuf>,
config: Config,
) -> Result<Self> {
Self::open_with_discovered_sstables_and_registry(
storage_path,
discovered_table_dirs,
config,
None,
)
.await
}
#[cfg(feature = "state_machine")]
pub(crate) async fn open_with_discovered_sstables_and_registry(
storage_path: &Path,
discovered_table_dirs: Vec<PathBuf>,
config: Config,
schema_registry: Option<Arc<tokio::sync::RwLock<schema::SchemaRegistry>>>,
) -> Result<Self> {
let platform = Arc::new(Platform::new(&config).await?);
let memory = Arc::new(MemoryManager::new(&config)?);
let storage = Arc::new(
StorageEngine::open_with_sstables(
storage_path,
discovered_table_dirs,
&config,
platform.clone(),
schema_registry.clone(),
)
.await?,
);
let schema = if let Some(registry_rwlock) = schema_registry {
Arc::new(
SchemaManager::new_with_registry(storage.clone(), registry_rwlock, &config).await?,
)
} else {
Arc::new(SchemaManager::new_with_storage(storage.clone(), &config).await?)
};
let query = Arc::new(QueryEngine::new(
storage.clone(),
schema.clone(),
memory.clone(),
&config,
)?);
Ok(Self {
storage,
query,
memory,
config,
})
}
#[cfg(feature = "state_machine")]
pub async fn execute(&self, sql: &str) -> Result<query::result::QueryResult> {
let result = self.query.execute(sql).await;
#[cfg(debug_assertions)]
if let Ok(ref query_result) = result {
log::debug!(
"Database::execute('{}') returning rows_affected: {}",
sql,
query_result.rows_affected
);
}
result
}
#[cfg(feature = "state_machine")]
pub async fn execute_streaming(
&self,
sql: &str,
config: query::result::StreamingConfig,
) -> Result<query::result::QueryResultIterator> {
self.query.execute_streaming(sql, config).await
}
#[cfg(feature = "state_machine")]
pub async fn prepare(&self, sql: &str) -> Result<std::sync::Arc<query::PreparedQuery>> {
self.query.prepare(sql).await
}
#[cfg(feature = "state_machine")]
pub async fn explain(&self, sql: &str) -> Result<query::ExplainResult> {
self.query.explain(sql).await
}
#[cfg(feature = "state_machine")]
pub async fn has_schema_for_table(&self, table: &str) -> bool {
self.query.has_schema_for_table(table).await
}
#[cfg(feature = "state_machine")]
pub async fn schema_status(&self, table: &str) -> query::SchemaStatus {
self.query.schema_status(table).await
}
pub async fn stats(&self) -> Result<DatabaseStats> {
Ok(DatabaseStats {
storage_stats: self.storage.stats().await?,
memory_stats: self.memory.stats()?,
#[cfg(feature = "state_machine")]
query_stats: self.query.stats(),
})
}
#[cfg(feature = "experimental")]
pub async fn flush(&self) -> Result<()> {
self.storage.flush().await
}
#[cfg(feature = "experimental")]
pub async fn compact(&self) -> Result<()> {
self.storage.compact().await
}
pub async fn shutdown(&self) -> Result<()> {
self.storage.shutdown().await
}
pub async fn close(self) -> Result<()> {
self.storage.shutdown().await?;
#[cfg(feature = "experimental")]
{
self.storage.flush().await?;
}
Ok(())
}
pub fn config(&self) -> &Config {
&self.config
}
}
impl Clone for Database {
fn clone(&self) -> Self {
Self {
storage: self.storage.clone(),
#[cfg(feature = "state_machine")]
query: self.query.clone(),
memory: self.memory.clone(),
config: self.config.clone(),
}
}
}
#[derive(Debug, Clone)]
pub struct DatabaseStats {
pub storage_stats: storage::StorageStats,
pub memory_stats: memory::MemoryStats,
#[cfg(feature = "state_machine")]
pub query_stats: query::QueryStats,
}
#[cfg(feature = "state_machine")]
#[derive(Debug)]
pub struct PreparedStatement {
statement: query::PreparedQuery,
}
#[cfg(feature = "state_machine")]
impl PreparedStatement {
pub async fn execute(&self, params: &[Value]) -> Result<query::result::QueryResult> {
self.statement.execute(params).await
}
}
#[cfg(feature = "state_machine")]
pub use query::result::{QueryResult, QueryRow};
#[cfg(test)]
mod tests {
use super::*;
use tempfile::TempDir;
#[tokio::test]
async fn test_database_open_close() {
let temp_dir = TempDir::new().unwrap();
let config = Config::test_config();
let db = Database::open(temp_dir.path(), config).await.unwrap();
db.close().await.unwrap();
}
#[cfg(feature = "state_machine")]
#[test]
fn test_open_with_discovered_sstables_and_registry_is_crate_private() {
assert!(
true,
"open_with_discovered_sstables_and_registry is correctly marked pub(crate)"
);
}
#[tokio::test]
#[cfg(feature = "state_machine")]
async fn test_database_open_with_discovered_sstables() {
let temp_dir = TempDir::new().unwrap();
let config = Config::test_config();
let discovered_dirs = Vec::new();
let db = Database::open_with_discovered_sstables(temp_dir.path(), discovered_dirs, config)
.await
.unwrap();
let stats = db.stats().await.unwrap();
assert_eq!(stats.storage_stats.sstables.sstable_count, 0);
db.close().await.unwrap();
}
#[tokio::test]
#[cfg(all(
feature = "legacy-heuristics",
feature = "state_machine",
feature = "experimental"
))]
async fn test_database_basic_operations() {
let temp_dir = TempDir::new().unwrap();
let config = Config::test_config();
let db = Database::open(temp_dir.path(), config).await.unwrap();
let result = db
.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)")
.await
.unwrap();
assert_eq!(result.rows_affected, 0);
let result = db
.execute("INSERT INTO users (id, name) VALUES (1, 'Alice')")
.await
.unwrap();
#[cfg(debug_assertions)]
log::debug!(
"Test INSERT assertion - rows_affected: {}",
result.rows_affected
);
assert_eq!(result.rows_affected, 1);
let result = db
.execute("SELECT * FROM users WHERE id = 1")
.await
.unwrap();
#[cfg(debug_assertions)]
log::debug!("Test SELECT assertion - rows.len(): {}", result.rows.len());
assert_eq!(result.rows.len(), 1, "SELECT should return 1 row");
db.close().await.unwrap();
}
}