use heed::{Database, Env, EnvOpenOptions};
use std::fs;
use std::path::Path;
use std::sync::Arc;
use std::sync::RwLock;
use std::sync::atomic::{AtomicU8, Ordering};
use std::thread;
use std::time::Duration;
use crate::error::{Error, Result};
pub(crate) fn is_map_full(err: &heed::Error) -> bool {
matches!(err, heed::Error::Mdb(heed::MdbError::MapFull))
}
#[repr(u8)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum DbHealthState {
Pending = 0,
Healthy = 1,
Degraded = 2,
}
impl DbHealthState {
fn from_u8(v: u8) -> Self {
debug_assert!(v <= 2);
match v {
0 => Self::Pending,
1 => Self::Healthy,
_ => Self::Degraded,
}
}
}
#[derive(Debug, Clone, Default)]
pub(crate) struct DbHealth(Arc<AtomicU8>);
impl DbHealth {
pub(crate) fn new() -> Self {
Self(Arc::new(AtomicU8::new(DbHealthState::Pending as u8)))
}
pub(crate) fn is_healthy(&self) -> bool {
DbHealthState::from_u8(self.0.load(Ordering::Acquire)) == DbHealthState::Healthy
}
pub(crate) fn mark_healthy(&self) {
let _ = self.0.compare_exchange(
DbHealthState::Pending as u8,
DbHealthState::Healthy as u8,
Ordering::AcqRel,
Ordering::Acquire,
);
}
pub(crate) fn mark_unhealthy(&self, reason: &'static str) {
let prev = self.0.swap(DbHealthState::Degraded as u8, Ordering::AcqRel);
if DbHealthState::from_u8(prev) != DbHealthState::Degraded {
tracing::error!(reason, "LMDB tracker marked unhealthy");
}
}
}
pub(crate) fn spawn_lmdb_gc<T: LmdbStore>(shared: Arc<RwLock<Option<T>>>) {
let thread_shared = shared.clone();
let spawn_result = thread::Builder::new()
.name("fff-lmdb-gc".into())
.spawn(move || {
let guard = match thread_shared.read() {
Ok(g) => g,
Err(e) => {
tracing::debug!("gc: read lock poisoned: {e}");
return;
}
};
let Some(ref tracker) = *guard else {
return; };
let env = tracker.env();
if let Err(e) = T::purge_stale_data(env) {
tracing::debug!("purge_stale_data failed: {e}");
}
tracker.health().mark_healthy();
});
if let Err(e) = spawn_result {
tracing::debug!(?e, "failed to spawn fff-lmdb-gc thread");
if let Ok(guard) = shared.read()
&& let Some(ref tracker) = *guard
{
tracker.health().mark_healthy();
}
}
}
fn is_transient_env_open_error(err: &heed::Error) -> bool {
match err {
heed::Error::Io(io) => matches!(
io.kind(),
std::io::ErrorKind::InvalidInput | std::io::ErrorKind::NotFound
),
_ => false,
}
}
pub(crate) trait LmdbStore: Sized + Send + Sync + 'static {
const LABEL: &'static str;
const MAP_SIZE: usize;
const MAX_DBS: u32;
const SIZE_CAP_BYTES: u64;
fn env(&self) -> &Env;
fn health(&self) -> &DbHealth;
fn purge_stale_data(_env: &Env) -> Result<()> {
Ok(())
}
#[tracing::instrument]
fn open_env(db_path: &Path) -> Result<(Env, DbHealth)> {
Self::erase_if_oversized(db_path);
fs::create_dir_all(db_path).map_err(Error::CreateDir)?;
let db = Self::LABEL;
const MAX_ATTEMPTS: u32 = 8;
let mut attempt = 0u32;
let env = loop {
let result = unsafe {
let mut opts = EnvOpenOptions::new();
opts.map_size(Self::MAP_SIZE);
if Self::MAX_DBS > 0 {
opts.max_dbs(Self::MAX_DBS);
}
opts.open(db_path)
};
match result {
Ok(env) => break env,
Err(e) if is_transient_env_open_error(&e) && attempt + 1 < MAX_ATTEMPTS => {
attempt += 1;
tracing::debug!(
path = %db_path.display(),
attempt,
error = ?e,
"transient LMDB env open error, retrying"
);
thread::sleep(Duration::from_millis(50));
}
Err(e) => return Err(Error::EnvOpen { db, source: e }),
}
};
match env.clear_stale_readers() {
Ok(cleared) if cleared > 0 => {
tracing::warn!(cleared, "reclaimed stale LMDB reader slots at open");
}
Ok(_) => {}
Err(e) => tracing::debug!("clear_stale_readers at open failed: {e}"),
}
Ok((env, DbHealth::new()))
}
fn open_database_safe<KC, DC>(env: &Env, name: Option<&str>) -> Result<Database<KC, DC>>
where
KC: 'static,
DC: 'static,
{
let db = Self::LABEL;
let rtxn = env
.read_txn()
.map_err(|source| Error::DbStartReadTxn { db, source })?;
let maybe_db: Option<Database<KC, DC>> = env
.open_database(&rtxn, name)
.map_err(|source| Error::DbOpen { db, source })?;
rtxn.commit()
.map_err(|source| Error::DbCommit { db, source })?;
match maybe_db {
Some(handle) => Ok(handle),
None => {
let mut wtxn = env
.write_txn()
.map_err(|source| Error::DbStartWriteTxn { db, source })?;
let handle = env
.create_database(&mut wtxn, name)
.map_err(|source| Error::DbCreate { db, source })?;
wtxn.commit()
.map_err(|source| Error::DbCommit { db, source })?;
Ok(handle)
}
}
}
fn erase_if_oversized(db_path: &Path) {
let data = db_path.join("data.mdb");
let Ok(meta) = fs::metadata(&data) else {
return;
};
if meta.len() <= Self::SIZE_CAP_BYTES {
return;
}
tracing::error!(
path = %db_path.display(),
size = meta.len(),
cap = Self::SIZE_CAP_BYTES,
"LMDB db exceeds size cap, erasing"
);
let _ = fs::remove_file(&data);
let _ = fs::remove_file(db_path.join("lock.mdb"));
}
}