use std::path::{Path, PathBuf};
use std::sync::Arc;
use crate::sync::Mutex;
use crate::error::{DbError, DbResult};
use crate::fixed::config::FixedConfig;
use crate::fixed::shard::FixedShardInner;
/// Exact size in bytes of the `db.meta` file; `validate_meta` rejects any other length.
const META_SIZE: usize = 16;
/// Magic bytes stored at offsets 0..4 of `db.meta`.
const META_MAGIC: &[u8; 4] = b"ARMD";
/// Format version stored at offset 4 of `db.meta`; `validate_meta` rejects any other value.
const META_VERSION: u8 = 2;
/// Backend identifier stored at offset 5 of `db.meta` (named "FixedStore" in the mismatch error).
const META_BACKEND: u8 = 1;
/// One shard of a `FixedEngine`: a shard id paired with its mutex-guarded state.
#[allow(dead_code)]
pub(crate) struct FixedShard {
    /// Index of this shard within the engine (`FixedEngine::open` stores the loop index as `u8`).
    pub id: u8,
    /// The shard's state; it is only reachable by locking this mutex.
    pub inner: Mutex<FixedShardInner>,
}
/// Handle to a sharded fixed-slot database rooted at a directory.
pub(crate) struct FixedEngine {
    /// Database directory given to `open`; `db.meta` and the per-shard paths are derived from it.
    path: PathBuf,
    /// All shards in index order, held behind a shared `Arc`.
    shards: Arc<Vec<FixedShard>>,
    /// Configuration the engine was opened with.
    config: FixedConfig,
}
fn write_meta(path: &Path, config: &FixedConfig) -> DbResult<()> {
let mut buf = [0u8; META_SIZE];
buf[0..4].copy_from_slice(META_MAGIC);
buf[4] = META_VERSION;
buf[5] = META_BACKEND;
buf[6] = config.shard_count as u8;
buf[7] = config.shard_prefix_bits as u8;
buf[8] = 0; std::fs::write(path, buf)?;
Ok(())
}
/// Checks that the `db.meta` file at `path` matches this build and the supplied `config`.
///
/// Checks run in a fixed order and the first failure wins: file size, magic bytes, format
/// version, backend id, shard count, shard prefix bits.
///
/// # Errors
///
/// Propagates the I/O error if the file cannot be read. Every failed check yields
/// `DbError::FormatMismatch` with a message naming the offending field.
fn validate_meta(path: &Path, config: &FixedConfig) -> DbResult<()> {
    // Every rejection below is a `FormatMismatch`; only the message differs.
    let reject = |reason: String| -> DbResult<()> { Err(DbError::FormatMismatch(reason)) };

    let bytes = std::fs::read(path)?;
    if bytes.len() != META_SIZE {
        return reject(format!(
            "db.meta: unexpected size: expected {META_SIZE}, got {}",
            bytes.len()
        ));
    }

    // The length check above keeps every fixed offset below in bounds.
    let (magic, version, backend) = (&bytes[0..4], bytes[4], bytes[5]);

    if magic != META_MAGIC {
        return reject("db.meta: bad magic (expected ARMD)".into());
    }
    if version != META_VERSION {
        return reject(format!(
            "db.meta: version mismatch: stored {}, expected {META_VERSION}",
            version
        ));
    }
    if backend != META_BACKEND {
        return reject(format!(
            "db.meta: backend mismatch: stored {}, expected {META_BACKEND} (FixedStore)",
            backend
        ));
    }

    // The stored sharding parameters must agree with the caller's configuration.
    let stored_shards = usize::from(bytes[6]);
    if stored_shards != config.shard_count {
        return reject(format!(
            "db.meta: shard_count mismatch: stored {stored_shards}, config {}",
            config.shard_count
        ));
    }
    let stored_prefix = usize::from(bytes[7]);
    if stored_prefix != config.shard_prefix_bits {
        return reject(format!(
            "db.meta: shard_prefix_bits mismatch: stored {stored_prefix}, config {}",
            config.shard_prefix_bits
        ));
    }

    Ok(())
}
impl FixedEngine {
    /// Opens the fixed-slot database rooted at `path`, creating it if necessary.
    ///
    /// Steps, in order:
    /// 1. `config.validate()` is called.
    /// 2. The database directory is created (including missing parents).
    /// 3. If `db.meta` already exists it is checked against `config`; if it does not exist it is
    ///    written from `config`.
    /// 4. One shard per index in `0..config.shard_count` is opened from `shard_NNN` under `path`,
    ///    passing `key_len` and `value_len` through to each shard.
    ///
    /// # Errors
    ///
    /// Returns the first error from config validation, directory creation, inspecting, reading or
    /// writing `db.meta`, a `db.meta` mismatch, or opening any shard.
    #[tracing::instrument(skip(path,config), fields(path = %path.as_ref().display()))]
    pub fn open(
        path: impl AsRef<Path>,
        config: FixedConfig,
        key_len: u16,
        value_len: u16,
    ) -> DbResult<Self> {
        let path = path.as_ref().to_path_buf();
        config.validate()?;
        tracing::info!(shards = config.shard_count, "opening fixed-slot database");
        std::fs::create_dir_all(&path)?;

        let meta_path = path.join("db.meta");
        // `Path::exists()` reports `false` for *any* metadata error, not just "not found". That
        // would send a permission or transient stat failure down the "create" branch and could
        // overwrite an existing `db.meta` without validating it. Only a definite `NotFound`
        // creates the file; every other error is surfaced to the caller.
        match std::fs::metadata(&meta_path) {
            Ok(_) => validate_meta(&meta_path, &config)?,
            Err(err) if err.kind() == std::io::ErrorKind::NotFound => {
                write_meta(&meta_path, &config)?;
            }
            Err(err) => return Err(err.into()),
        }

        let mut shards = Vec::with_capacity(config.shard_count);
        for i in 0..config.shard_count {
            let shard_dir = path.join(format!("shard_{i:03}"));
            // NOTE(review): `i as u8` wraps for indices above 255; this presumably relies on
            // `config.validate()` bounding `shard_count` — confirm.
            let inner = FixedShardInner::open(&shard_dir, i as u8, key_len, value_len, &config)?;
            shards.push(FixedShard {
                id: i as u8,
                inner: Mutex::new(inner),
            });
        }

        tracing::info!("fixed-slot database opened");
        Ok(Self {
            path,
            shards: Arc::new(shards),
            config,
        })
    }

    /// Returns the shared list of shards, in index order.
    #[allow(dead_code)]
    pub fn shards(&self) -> &Arc<Vec<FixedShard>> {
        &self.shards
    }

    /// Returns the configuration the engine was opened with.
    #[allow(dead_code)]
    pub fn config(&self) -> &FixedConfig {
        &self.config
    }

    /// Returns the database directory the engine was opened on.
    #[allow(dead_code)]
    pub fn path(&self) -> &Path {
        &self.path
    }

    /// Calls `sync()` on every shard in index order, locking each shard while it runs.
    ///
    /// # Errors
    ///
    /// Stops at and returns the first shard error; later shards are not synced in that case.
    pub fn flush(&self) -> DbResult<()> {
        tracing::debug!("flushing all fixed-slot shards");
        for shard in self.shards.iter() {
            shard.inner.lock().sync()?;
        }
        Ok(())
    }

    /// Calls `clean_shutdown()` on every shard in index order, locking each shard while it runs.
    ///
    /// # Errors
    ///
    /// Stops at and returns the first shard error; later shards are not shut down in that case.
    pub fn close(&self) -> DbResult<()> {
        tracing::info!("closing fixed-slot database");
        for shard in self.shards.iter() {
            shard.inner.lock().clean_shutdown()?;
        }
        tracing::info!("fixed-slot database closed");
        Ok(())
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    /// Key width passed to every `FixedEngine::open` call in this module.
    const KEY_LEN: u16 = 8;
    /// Value width passed to every `FixedEngine::open` call in this module.
    const VALUE_LEN: u16 = 32;

    /// Builds the small test configuration with the requested number of shards.
    fn config_with_shards(shard_count: usize) -> FixedConfig {
        FixedConfig {
            shard_count,
            grow_step: 64,
            ..FixedConfig::test()
        }
    }

    /// Default two-shard configuration used by most tests.
    fn test_config() -> FixedConfig {
        config_with_shards(2)
    }

    /// Opening a fresh path creates `db.meta`, one directory per shard, and sequential shard ids.
    #[test]
    fn test_engine_open_creates_shards() {
        let tmp = tempdir().unwrap();
        let root = tmp.path().join("testdb");
        let cfg = test_config();

        let engine = FixedEngine::open(&root, cfg.clone(), KEY_LEN, VALUE_LEN).unwrap();

        assert!(root.join("db.meta").exists());
        (0..cfg.shard_count).for_each(|i| {
            assert!(
                root.join(format!("shard_{i:03}")).exists(),
                "shard_{i:03} directory should exist"
            );
        });

        let shards = engine.shards();
        assert_eq!(shards.len(), cfg.shard_count);
        for (idx, shard) in shards.iter().enumerate() {
            assert_eq!(shard.id, idx as u8);
        }
    }

    /// A database that was closed cleanly can be opened again with the same configuration.
    #[test]
    fn test_engine_reopen() {
        let tmp = tempdir().unwrap();
        let root = tmp.path().join("testdb");
        let cfg = test_config();

        // First session: create, then shut down cleanly.
        FixedEngine::open(&root, cfg.clone(), KEY_LEN, VALUE_LEN)
            .unwrap()
            .close()
            .unwrap();

        // Second session: the existing database must open with the same shard layout.
        let reopened = FixedEngine::open(&root, cfg.clone(), KEY_LEN, VALUE_LEN).unwrap();
        assert_eq!(reopened.shards().len(), cfg.shard_count);
        reopened.close().unwrap();
    }

    /// Reopening with a different shard count is rejected with a `shard_count mismatch` error.
    #[test]
    fn test_engine_shard_count_mismatch() {
        let tmp = tempdir().unwrap();
        let root = tmp.path().join("testdb");

        // Create the database with two shards.
        FixedEngine::open(&root, config_with_shards(2), KEY_LEN, VALUE_LEN)
            .unwrap()
            .close()
            .unwrap();

        // Ask for four shards on the same path.
        let result = FixedEngine::open(&root, config_with_shards(4), KEY_LEN, VALUE_LEN);
        assert!(result.is_err());
        // `.err().unwrap()` rather than `unwrap_err()`: the latter needs `FixedEngine: Debug`.
        let msg = result.err().unwrap().to_string();
        assert!(
            msg.contains("shard_count mismatch"),
            "expected shard_count mismatch error, got: {msg}"
        );
    }
}