use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::sync::atomic::AtomicU64;
use crate::config::Config;
use crate::error::DbResult;
use crate::shard::Shard;
#[cfg(feature = "encryption")]
use crate::crypto::PageCipher;
const META_SIZE: usize = 4;
/// Creates the `db.meta` file at `path`, recording the layout described by `config`.
///
/// The file is exactly `META_SIZE` bytes long:
///
/// - byte 0: `config.shard_count`
/// - byte 1: `config.shard_prefix_bits`
/// - byte 2: flags; bit 0 is set when `encrypted` is true (only compiled in
///   with the `encryption` feature)
/// - remaining bytes: left as zero
///
/// # Errors
///
/// Returns an error if `shard_count` or `shard_prefix_bits` does not fit in a
/// single byte, or if writing the file fails.
fn write_meta(
    path: &std::path::Path,
    config: &Config,
    #[cfg(feature = "encryption")] encrypted: bool,
) -> DbResult<()> {
    // Explicit import so the checked conversion also builds on pre-2021
    // editions, where `TryFrom` is not part of the prelude.
    use std::convert::TryFrom;

    // Each value occupies one byte. A plain `as u8` cast would wrap silently
    // (e.g. 256 -> 0) and produce a meta file that `validate_meta` rejects on
    // the next open, so out-of-range values are refused before anything is
    // written.
    let to_byte = |name: &str, value: usize| -> Result<u8, std::io::Error> {
        u8::try_from(value).map_err(|_| {
            std::io::Error::new(
                std::io::ErrorKind::InvalidInput,
                format!(
                    "{name} {value} does not fit in one byte of db.meta (max {})",
                    u8::MAX
                ),
            )
        })
    };

    let mut buf = [0u8; META_SIZE];
    buf[0] = to_byte("shard_count", config.shard_count)?;
    buf[1] = to_byte("shard_prefix_bits", config.shard_prefix_bits)?;

    #[cfg(feature = "encryption")]
    if encrypted {
        buf[2] |= 1;
    }

    std::fs::write(path, buf)?;
    Ok(())
}
fn validate_meta(
path: &std::path::Path,
config: &Config,
#[cfg(feature = "encryption")] encrypted: bool,
) -> DbResult<()> {
let meta = std::fs::read(path)?;
if meta.len() != META_SIZE {
return Err(crate::error::DbError::FormatMismatch(format!(
"db.meta has unexpected size: expected {META_SIZE}, got {}",
meta.len()
)));
}
let stored_shards = meta[0] as usize;
if stored_shards != config.shard_count {
return Err(crate::error::DbError::FormatMismatch(format!(
"shard_count mismatch: db has {stored_shards}, config has {}",
config.shard_count,
)));
}
let stored_prefix = meta[1] as usize;
if stored_prefix != config.shard_prefix_bits {
return Err(crate::error::DbError::FormatMismatch(format!(
"shard_prefix_bits mismatch: db has {stored_prefix}, config has {}",
config.shard_prefix_bits,
)));
}
#[cfg(feature = "encryption")]
{
let was_encrypted = meta[2] & 1 != 0;
if was_encrypted != encrypted {
return Err(crate::error::DbError::FormatMismatch(if was_encrypted {
"database is encrypted but no encryption_key provided".into()
} else {
"database is not encrypted but encryption_key was provided".into()
}));
}
}
Ok(())
}
/// State of one open database: its root directory, the configuration it was
/// opened with, its shards, and the counter shared between them.
///
/// Built by `Engine::open`.
// NOTE(review): `dead_code` is presumably allowed because some fields are only
// read in certain feature combinations — confirm and narrow if possible.
#[allow(dead_code)]
pub(crate) struct Engine {
/// Root directory of the database, as passed to `Engine::open`.
path: PathBuf,
/// Configuration the database was opened with.
config: Config,
/// One `Shard` per index in `0..config.shard_count`, in index order.
shards: Arc<Vec<Shard>>,
/// Counter created with the value 1 in `Engine::open`; a clone of this `Arc`
/// is handed to every shard when it is opened.
/// NOTE(review): presumably the global sequence number — confirm.
gsn: Arc<AtomicU64>,
/// Page cipher built from `config.encryption_key` in `Engine::open`;
/// `None` when no key is configured.
#[cfg(feature = "encryption")]
cipher: Option<Arc<PageCipher>>,
}
impl Engine {
    /// Opens the database rooted at `path`, creating the directory and its
    /// `db.meta` file if they do not exist yet.
    ///
    /// In order, this:
    ///
    /// 1. validates `config`;
    /// 2. creates the root directory (and any missing parents);
    /// 3. with the `encryption` feature, builds a `PageCipher` from
    ///    `config.encryption_key` when a key is set;
    /// 4. validates an existing `db.meta` against `config`, or writes a new
    ///    one when the file is absent;
    /// 5. opens one shard per index in `0..config.shard_count`, each in its
    ///    own `shard_NNN` subdirectory and sharing one counter that starts
    ///    at 1.
    ///
    /// # Errors
    ///
    /// Returns the first error produced by any of the steps above, including
    /// a shard index that cannot be represented as a `u8` shard id.
    #[tracing::instrument(skip(path, config), fields(path = %path.as_ref().display()))]
    pub fn open(path: impl AsRef<Path>, config: Config) -> DbResult<Self> {
        // Explicit import so the checked conversion also builds on pre-2021
        // editions, where `TryFrom` is not part of the prelude.
        use std::convert::TryFrom;

        let path = path.as_ref().to_path_buf();
        config.validate()?;
        tracing::info!(shards = config.shard_count, "opening database");
        std::fs::create_dir_all(&path)?;

        #[cfg(feature = "encryption")]
        let cipher = config
            .encryption_key
            .as_ref()
            .map(|key| PageCipher::new(key).map(Arc::new))
            .transpose()?;

        {
            let meta_path = path.join("db.meta");
            if meta_path.exists() {
                validate_meta(
                    &meta_path,
                    &config,
                    #[cfg(feature = "encryption")]
                    cipher.is_some(),
                )?;
            } else {
                write_meta(
                    &meta_path,
                    &config,
                    #[cfg(feature = "encryption")]
                    cipher.is_some(),
                )?;
            }
        }

        let gsn = Arc::new(AtomicU64::new(1));
        let mut shards = Vec::with_capacity(config.shard_count);
        for i in 0..config.shard_count {
            // Shard ids are `u8`. A plain `as u8` cast would wrap for indices
            // above 255 and give two shards the same id, so fail instead.
            let shard_id = u8::try_from(i).map_err(|_| {
                std::io::Error::new(
                    std::io::ErrorKind::InvalidInput,
                    format!("shard index {i} does not fit in a u8 shard id"),
                )
            })?;
            let shard_dir = path.join(format!("shard_{i:03}"));

            #[cfg(feature = "encryption")]
            let shard = Shard::open_encrypted(
                shard_id,
                &shard_dir,
                config.max_file_size,
                config.write_buffer_size,
                config.hints,
                cipher.clone(),
                gsn.clone(),
            )?;
            #[cfg(not(feature = "encryption"))]
            let shard = Shard::open(
                shard_id,
                &shard_dir,
                config.max_file_size,
                config.write_buffer_size,
                config.hints,
                gsn.clone(),
            )?;

            shards.push(shard);
        }

        tracing::info!("database opened");
        Ok(Self {
            path,
            config,
            shards: Arc::new(shards),
            gsn,
            #[cfg(feature = "encryption")]
            cipher,
        })
    }

    /// Returns the root directory this database was opened at.
    // NOTE(review): `dead_code` allowed presumably because no caller exists
    // in every build configuration — confirm.
    #[allow(dead_code)]
    pub fn path(&self) -> &Path {
        &self.path
    }

    /// Returns the configuration this database was opened with.
    // NOTE(review): `dead_code` allowed presumably because no caller exists
    // in every build configuration — confirm.
    #[allow(dead_code)]
    pub fn config(&self) -> &Config {
        &self.config
    }

    /// Returns the counter that is shared with every shard.
    pub fn gsn(&self) -> &AtomicU64 {
        &self.gsn
    }

    /// Returns the shared list of shards, in shard-index order.
    pub fn shards(&self) -> &Arc<Vec<Shard>> {
        &self.shards
    }

    /// Returns an owned copy of every shard's directory, in shard order.
    pub fn shard_dirs(&self) -> Vec<std::path::PathBuf> {
        self.shards.iter().map(|s| s.dir().to_path_buf()).collect()
    }

    /// Borrows each entry of `dirs` as a `&Path`, preserving order.
    pub fn shard_dir_refs(dirs: &[std::path::PathBuf]) -> Vec<&Path> {
        dirs.iter().map(|p| p.as_path()).collect()
    }

    /// Returns the id of every shard, in shard order.
    pub fn shard_ids(&self) -> Vec<u8> {
        self.shards.iter().map(|s| s.id).collect()
    }

    /// Returns the `hints` setting from the configuration.
    pub fn hints(&self) -> bool {
        self.config.hints
    }

    /// Returns a new handle to the page cipher, or `None` when the database
    /// was opened without an encryption key.
    #[cfg(feature = "encryption")]
    pub fn cipher(&self) -> Option<Arc<PageCipher>> {
        self.cipher.clone()
    }

    /// Calls `flush_buf` on every shard in order.
    ///
    /// # Errors
    ///
    /// Stops at, and returns, the first error; later shards are not visited.
    pub fn flush_buffers(&self) -> DbResult<()> {
        tracing::debug!("flushing all buffers");
        for shard in self.shards.iter() {
            shard.flush_buf()?;
        }
        Ok(())
    }

    /// Calls `flush` on every shard in order, logging the operation as the
    /// database being closed.
    ///
    /// # Errors
    ///
    /// Stops at, and returns, the first error; later shards are not visited
    /// and the final "database closed" message is not logged.
    pub fn flush(&self) -> DbResult<()> {
        tracing::info!("closing database");
        for shard in self.shards.iter() {
            shard.flush()?;
        }
        tracing::info!("database closed");
        Ok(())
    }
}