use cxx::UniquePtr;
use std::cell::UnsafeCell;
use std::fmt;
use std::path::Path;
use crate::error::Error;
use crate::ffi::ffi;
/// Handle to a database instance that lives on the C++ side of the FFI bridge.
///
/// The native object is owned through a [`UniquePtr`] and stored inside an [`UnsafeCell`].
/// NOTE(review): the cell is presumably there so other parts of the crate can obtain mutable
/// access to the native object through a shared `&Database` — confirm against the callers.
pub struct Database {
// Owning pointer to the native database object; visible to the rest of the crate.
pub(crate) db: UnsafeCell<UniquePtr<ffi::Database>>,
}
// SAFETY: these impls assert thread-safety that the compiler cannot verify on its own
// (`UnsafeCell` is never `Sync` automatically).
// NOTE(review): soundness relies on the native database object being safe to move between and
// be used from multiple threads concurrently — confirm against the C++ implementation.
unsafe impl Send for Database {}
unsafe impl Sync for Database {}
/// Settings applied when a database is opened.
///
/// Start from [`SystemConfig::default`] and chain the builder-style setters. Every setter
/// consumes the configuration and returns the updated value, so the result must be used.
/// The values are handed unchanged to the native database constructor, which defines their
/// precise semantics.
#[derive(Clone, Debug)]
pub struct SystemConfig {
    // Buffer pool size. NOTE(review): presumably in bytes, with 0 (the default) deferring to
    // the engine's own choice — confirm.
    buffer_pool_size: u64,
    // Upper bound on worker threads. NOTE(review): 0 (the default) presumably lets the engine
    // decide — confirm.
    max_num_threads: u64,
    // Whether compression is enabled (default: true).
    enable_compression: bool,
    // Whether the database is opened read-only (default: false). The test suite expects write
    // queries to fail on a read-only database.
    read_only: bool,
    // Maximum database size. The test suite expects opening with 0 to fail.
    max_db_size: u64,
    // Whether automatic checkpointing is enabled (default: true).
    auto_checkpoint: bool,
    // Checkpoint threshold. With the default of -1 the test suite observes the engine reporting
    // 16777216, so a negative value presumably means "use the engine default" — confirm.
    checkpoint_threshold: i64,
    // Whether a failed WAL replay is reported as an error when opening (default: true).
    throw_on_wal_replay_failure: bool,
    // Whether checksums are enabled (default: true). The test suite expects a mismatch with the
    // setting that produced an existing WAL file to be rejected.
    enable_checksums: bool,
    // Whether multiple write transactions may be open at once (default: false).
    // NOTE(review): exact concurrency semantics are defined by the engine — confirm.
    enable_multi_writes: bool,
}

/// Baseline configuration shared by the unit tests.
///
/// Identical to [`SystemConfig::default`] except that `max_db_size` is 16 GiB.
/// NOTE(review): presumably chosen to keep the per-test resource reservation small — confirm.
#[cfg(test)]
pub(crate) const SYSTEM_CONFIG_FOR_TESTS: SystemConfig = SystemConfig {
    buffer_pool_size: 0,
    max_num_threads: 0,
    enable_compression: true,
    read_only: false,
    max_db_size: 16 * 1024 * 1024 * 1024,
    auto_checkpoint: true,
    checkpoint_threshold: -1_i64,
    throw_on_wal_replay_failure: true,
    enable_checksums: true,
    enable_multi_writes: false,
};

impl Default for SystemConfig {
    /// Returns the configuration used when no option is overridden.
    fn default() -> Self {
        Self {
            buffer_pool_size: 0,
            max_num_threads: 0,
            enable_compression: true,
            read_only: false,
            // NOTE(review): `u32::MAX` looks like a sentinel the engine maps to its own
            // default size rather than a literal limit — confirm.
            max_db_size: u64::from(u32::MAX),
            auto_checkpoint: true,
            checkpoint_threshold: -1_i64,
            throw_on_wal_replay_failure: true,
            enable_checksums: true,
            enable_multi_writes: false,
        }
    }
}

impl SystemConfig {
    /// Returns the configuration with `buffer_pool_size` replaced.
    #[must_use]
    pub fn buffer_pool_size(self, buffer_pool_size: u64) -> Self {
        Self {
            buffer_pool_size,
            ..self
        }
    }

    /// Returns the configuration with `max_num_threads` replaced.
    #[must_use]
    pub fn max_num_threads(self, max_num_threads: u64) -> Self {
        Self {
            max_num_threads,
            ..self
        }
    }

    /// Returns the configuration with `enable_compression` replaced.
    #[must_use]
    pub fn enable_compression(self, enable_compression: bool) -> Self {
        Self {
            enable_compression,
            ..self
        }
    }

    /// Returns the configuration with `read_only` replaced.
    #[must_use]
    pub fn read_only(self, read_only: bool) -> Self {
        Self { read_only, ..self }
    }

    /// Returns the configuration with `max_db_size` replaced.
    #[must_use]
    pub fn max_db_size(self, max_db_size: u64) -> Self {
        Self {
            max_db_size,
            ..self
        }
    }

    /// Returns the configuration with `auto_checkpoint` replaced.
    #[must_use]
    pub fn auto_checkpoint(self, auto_checkpoint: bool) -> Self {
        Self {
            auto_checkpoint,
            ..self
        }
    }

    /// Returns the configuration with `checkpoint_threshold` replaced.
    #[must_use]
    pub fn checkpoint_threshold(self, checkpoint_threshold: i64) -> Self {
        Self {
            checkpoint_threshold,
            ..self
        }
    }

    /// Returns the configuration with `throw_on_wal_replay_failure` replaced.
    #[must_use]
    pub fn throw_on_wal_replay_failure(self, throw_on_wal_replay_failure: bool) -> Self {
        Self {
            throw_on_wal_replay_failure,
            ..self
        }
    }

    /// Returns the configuration with `enable_checksums` replaced.
    #[must_use]
    pub fn enable_checksums(self, enable_checksums: bool) -> Self {
        Self {
            enable_checksums,
            ..self
        }
    }

    /// Returns the configuration with `enable_multi_writes` replaced.
    #[must_use]
    pub fn enable_multi_writes(self, enable_multi_writes: bool) -> Self {
        Self {
            enable_multi_writes,
            ..self
        }
    }
}
pub(crate) const IN_MEMORY_DB_NAME: &str = ":memory:";
impl Database {
pub fn new<P: AsRef<Path>>(path: P, config: SystemConfig) -> Result<Self, Error> {
Ok(Database {
db: UnsafeCell::new(ffi::new_database(
ffi::StringView::new(&path.as_ref().display().to_string()),
config.buffer_pool_size,
config.max_num_threads,
config.enable_compression,
config.read_only,
config.max_db_size,
config.auto_checkpoint,
config.checkpoint_threshold,
config.throw_on_wal_replay_failure,
config.enable_checksums,
config.enable_multi_writes,
)?),
})
}
pub fn in_memory(config: SystemConfig) -> Result<Self, Error> {
Self::new(IN_MEMORY_DB_NAME, config)
}
}
impl fmt::Debug for Database {
    /// Formats the handle as a `Database` struct with a fixed placeholder for the `db` field.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // A borrowed literal renders exactly like an owned `String` under `Debug`, so no
        // allocation is needed on each format call.
        f.debug_struct("Database")
            .field("db", &"Opaque C++ data")
            .finish()
    }
}
// Tests that open real databases (in temporary directories or in memory) and check that each
// `SystemConfig` option is accepted by, or observable through, the engine.
#[cfg(test)]
mod tests {
use anyhow::{Error, Result};
use crate::connection::Connection;
use crate::database::SYSTEM_CONFIG_FOR_TESTS;
use crate::database::{Database, SystemConfig};
use crate::value::Value;
use std::collections::HashSet;
use std::fs::File;
use std::io::Write;
// Opening a database at a fresh path with the test configuration succeeds.
#[test]
fn create_database() -> Result<()> {
let temp_dir = tempfile::tempdir()?;
let _db = Database::new(temp_dir.path().join("test"), SYSTEM_CONFIG_FOR_TESTS)?;
temp_dir.close()?;
Ok(())
}
// Opening a database with compression turned off succeeds.
#[test]
fn create_database_no_compression() -> Result<()> {
let temp_dir = tempfile::tempdir()?;
let _ = Database::new(
temp_dir.path().join("test"),
SYSTEM_CONFIG_FOR_TESTS.enable_compression(false),
)?;
temp_dir.close()?;
Ok(())
}
// A write query issued against a database reopened read-only fails with the expected message.
#[test]
fn test_database_read_only() -> Result<()> {
let temp_dir = tempfile::tempdir()?;
{
// Create the database first; the handle is dropped right away so it can be reopened below.
Database::new(temp_dir.path().join("test"), SYSTEM_CONFIG_FOR_TESTS)?;
}
let db = Database::new(
temp_dir.path().join("test"),
SYSTEM_CONFIG_FOR_TESTS.read_only(true),
)?;
let conn = Connection::new(&db)?;
let result: Error = conn
.query("CREATE NODE TABLE Person(name STRING, age INT64, PRIMARY KEY(name));")
.expect_err("Write query should produce an error on a read-only DB")
.into();
assert_eq!(
result.to_string(),
"Query execution failed: Connection exception: Cannot execute write operations in a read-only database!"
);
Ok(())
}
// A `max_db_size` of `1 << 32` is accepted, whereas 0 must be rejected.
#[test]
fn test_database_max_size() -> Result<()> {
let temp_dir = tempfile::tempdir()?;
{
Database::new(
temp_dir.path().join("test"),
SystemConfig::default().max_db_size(1 << 32),
)?;
}
{
Database::new(
temp_dir.path().join("test"),
SystemConfig::default().max_db_size(0),
)
.expect_err("0 is not a valid max DB size");
}
Ok(())
}
// Disabling `auto_checkpoint` is visible through `current_setting`.
#[test]
fn test_database_auto_checkpoint() -> Result<()> {
let temp_dir = tempfile::tempdir()?;
let db = Database::new(
temp_dir.path().join("test"),
SYSTEM_CONFIG_FOR_TESTS.auto_checkpoint(false),
)?;
let conn = Connection::new(&db)?;
let result = conn.query("CALL current_setting('auto_checkpoint') RETURN *");
assert_eq!(
result?.next().unwrap()[0],
Value::String("False".to_string())
);
// The test configuration leaves `checkpoint_threshold` at -1; the engine reports 16777216.
let result = conn.query("CALL current_setting('checkpoint_threshold') RETURN *");
assert_eq!(
result?.next().unwrap()[0],
Value::String("16777216".to_string())
);
Ok(())
}
// An explicit `checkpoint_threshold` is reported back unchanged by `current_setting`.
#[test]
fn test_database_checkpoint_threshold() -> Result<()> {
let temp_dir = tempfile::tempdir()?;
let db = Database::new(
temp_dir.path().join("test"),
SYSTEM_CONFIG_FOR_TESTS.checkpoint_threshold(1234),
)?;
let conn = Connection::new(&db)?;
let result = conn.query("CALL current_setting('checkpoint_threshold') RETURN *");
assert_eq!(
result?.next().unwrap()[0],
Value::String("1234".to_string())
);
Ok(())
}
// With `throw_on_wal_replay_failure` disabled, the database still opens and answers a query
// even though a `.wal` file with arbitrary content sits next to the database path.
#[test]
fn test_database_throw_on_wal_replay_failure() -> Result<()> {
let temp_dir = tempfile::tempdir()?;
let db_path = temp_dir.path().join("test");
let wal_path = db_path.with_extension("wal");
// Presumably not a valid WAL payload, so replay is expected to fail — TODO confirm.
let mut wal_file = File::create(wal_path)?;
wal_file.write_all(b"aaaaaaaaaaaaaaaaaaaaaaaa")?;
let db = Database::new(
db_path,
SYSTEM_CONFIG_FOR_TESTS.throw_on_wal_replay_failure(false),
)?;
let conn = Connection::new(&db)?;
let result = conn.query("return 1");
assert_eq!(result?.next().unwrap()[0], Value::Int64(1));
Ok(())
}
// Reopening with `enable_checksums` disabled must fail when the earlier session ran with
// checksums enabled.
#[test]
fn test_database_enable_checksums() -> Result<()> {
let temp_dir = tempfile::tempdir()?;
let db_path = temp_dir.path().join("test");
{
// Auto-checkpoint and checkpoint-on-close are both turned off here, presumably so the
// table creation remains in the WAL after this scope ends — TODO confirm.
let db = Database::new(
db_path.clone(),
SYSTEM_CONFIG_FOR_TESTS
.auto_checkpoint(false)
.enable_checksums(true),
)?;
let conn = Connection::new(&db)?;
conn.query("call force_checkpoint_on_close=false")?;
conn.query("create node table testtest1(id int64 primary key)")?;
}
{
Database::new(
db_path.clone(),
SYSTEM_CONFIG_FOR_TESTS.enable_checksums(false),
)
.expect_err("An error should be thrown if the enable_checksums config doesn't match the one used to produce a WAL file");
}
Ok(())
}
// An in-memory database creates no file named after the in-memory marker and supports a
// basic create / insert / query round trip.
#[test]
fn test_database_in_memory() -> Result<()> {
use crate::Value;
let db = Database::in_memory(SYSTEM_CONFIG_FOR_TESTS)?;
// The marker is a relative path, so this checks the current working directory.
assert!(!std::path::Path::new(crate::database::IN_MEMORY_DB_NAME).exists());
let conn = Connection::new(&db)?;
conn.query("CREATE NODE TABLE Person(name STRING, age INT64, PRIMARY KEY(name))")?;
conn.query("BEGIN TRANSACTION")?;
conn.query("CREATE (:Person {name: 'Alice', age: 25})")?;
conn.query("CREATE (:Person {name: 'Bob', age: 30})")?;
conn.query("COMMIT")?;
// Collect into a set so the assertion does not depend on row order.
let results: HashSet<String> = conn
.query("MATCH (p:Person) RETURN p.name")?
.map(|tuple| {
assert!(tuple.len() == 1);
if let Value::String(value) = &tuple[0] {
value.clone()
} else {
panic!(
"Expected query values to be strings, but got {:?} instead",
tuple[0]
)
}
})
.collect();
assert!(results == HashSet::from(["Alice".to_string(), "Bob".to_string()]));
Ok(())
}
// With `enable_multi_writes` on, a second connection can begin a transaction while the first
// connection's transaction (which has already created a table) is still open.
#[test]
fn test_database_enable_multi_writes() -> Result<()> {
let temp_dir = tempfile::tempdir()?;
let db = Database::new(
temp_dir.path().join("test"),
SYSTEM_CONFIG_FOR_TESTS.enable_multi_writes(true),
)?;
let conn1 = Connection::new(&db)?;
conn1.query("BEGIN TRANSACTION")?;
conn1.query("CREATE NODE TABLE t(a INT, b INT, PRIMARY KEY(a))")?;
let conn2 = Connection::new(&db)?;
conn2.query("BEGIN TRANSACTION")?;
Ok(())
}
}