use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::sync::{Arc, LazyLock, Weak};
use std::thread::sleep;
use std::time::Duration;
use parking_lot::{Mutex, RwLock};
use super::lmdb_env::{LmdbEnv, LmdbEnvConfig, open_lmdb_env};
use super::lmdb_error::LmdbLayerError;
use crate::util::fs::canonicalize;
// Maximum number of open attempts `get_or_open` makes while the open keeps
// failing with an "environment already opened" error.
const REOPEN_RETRIES: u32 = 100;
// Pause between two consecutive open attempts; with the retry cap above the
// total time spent sleeping stays a little under 200 ms.
const REOPEN_RETRY_INTERVAL: Duration = Duration::from_millis(2);
/// Registry of open LMDB environments keyed by filesystem path.
///
/// Only `Weak` references are stored, so the registry itself never keeps an
/// environment alive; an entry stops resolving once every external `Arc` is gone.
#[derive(Default)]
pub struct LmdbEnvRegistry {
// Path -> weak handle of the environment registered under that path.
slots: RwLock<HashMap<PathBuf, Weak<LmdbEnv>>>,
// Held for the whole open-on-miss sequence in `get_or_open`, so only one
// thread at a time attempts to open an environment through this registry.
open_lock: Mutex<()>,
}
impl LmdbEnvRegistry {
    /// Creates an empty registry that tracks no environments.
    pub fn new() -> Self {
        Self::default()
    }

    /// Returns the live environment registered for `path`, opening it on a miss.
    ///
    /// A hit on the fast path needs only the `slots` read lock. On a miss the
    /// call takes `open_lock`, re-checks the registry (another thread may have
    /// opened the environment in the meantime) and only then calls
    /// `open_lmdb_env`.
    ///
    /// # Errors
    ///
    /// Any error from `open_lmdb_env` that is not an "already opened" error is
    /// returned immediately. An "already opened" error is retried, and the last
    /// one is returned once `REOPEN_RETRIES` attempts have been made.
    pub fn get_or_open(
        &self,
        path: &Path,
        config: &LmdbEnvConfig,
    ) -> Result<Arc<LmdbEnv>, LmdbLayerError> {
        // Fast path: no open serialization needed when the env is already tracked.
        if let Some(handle) = self.lookup(path) {
            return Ok(handle);
        }

        // Serialize the open itself; the guard is held until this function returns.
        let _open_guard = self.open_lock.lock();
        if let Some(handle) = self.lookup(path) {
            return Ok(handle);
        }

        let mut attempts = 0;
        loop {
            match open_lmdb_env(path, config) {
                Ok(env) => {
                    let handle = Arc::new(env);
                    self.insert(&handle);
                    return Ok(handle);
                }
                Err(err) if err.is_env_already_opened() => {
                    // NOTE(review): presumably the env is still open elsewhere
                    // (e.g. a previous handle is mid-drop on another thread) —
                    // confirm. Re-check the registry, then back off and retry.
                    if let Some(handle) = self.lookup(path) {
                        return Ok(handle);
                    }
                    attempts += 1;
                    if attempts >= REOPEN_RETRIES {
                        return Err(err);
                    }
                    sleep(REOPEN_RETRY_INTERVAL);
                }
                Err(err) => return Err(err),
            }
        }
    }

    /// Reports whether a live (still strongly referenced) environment is
    /// registered for `path`, under either its given or canonical spelling.
    pub fn is_live(&self, path: &Path) -> bool {
        self.lookup(path).is_some()
    }

    /// Looks up a live environment by `path`, first as given and then by its
    /// canonical form.
    ///
    /// Returns `None` when no entry matches, when the matching entry is dead,
    /// or when `path` cannot be canonicalized and has no verbatim entry.
    fn lookup(&self, path: &Path) -> Option<Arc<LmdbEnv>> {
        // Probe the verbatim spelling first; the read guard is confined to this
        // block so it is released before any path resolution happens.
        let direct = {
            let slots = self.slots.read();
            lookup_key(&slots, path)
        };
        if direct.is_some() {
            return direct;
        }

        // Resolve the path with no lock held, so a slow resolution cannot
        // stall a writer waiting in `insert`.
        let canonical = canonicalize(path).ok()?;
        if canonical.as_path() == path {
            // Same key as the verbatim probe above; nothing new to check.
            return None;
        }

        let slots = self.slots.read();
        lookup_key(&slots, &canonical)
    }

    /// Registers `handle` under the canonical form of the path it reports,
    /// falling back to the reported path when canonicalization fails, and
    /// drops entries whose environment no longer has any strong reference.
    fn insert(&self, handle: &Arc<LmdbEnv>) {
        let reported = handle.path();
        // Resolve the key before taking the write lock.
        let key = canonicalize(reported).unwrap_or_else(|_| reported.to_path_buf());
        let mut slots = self.slots.write();
        // Prune dead entries first; the fresh entry is backed by `handle` and
        // therefore alive, so pruning before or after yields the same map.
        slots.retain(|_, weak| weak.strong_count() > 0);
        slots.insert(key, Arc::downgrade(handle));
    }
}
// Process-wide registry backing `open_shared_env` and `shared_env_is_live`;
// constructed lazily on first access.
static SHARED_REGISTRY: LazyLock<LmdbEnvRegistry> = LazyLock::new(LmdbEnvRegistry::new);
pub fn open_shared_env(dir: &Path, config: &LmdbEnvConfig) -> Result<Arc<LmdbEnv>, LmdbLayerError> {
SHARED_REGISTRY.get_or_open(dir, config)
}
pub fn shared_env_is_live(dir: &Path) -> bool {
SHARED_REGISTRY.is_live(dir)
}
/// Resolves `key` in `slots` to a strong handle.
///
/// Yields `None` both when the key is absent and when its weak reference can
/// no longer be upgraded.
fn lookup_key(slots: &HashMap<PathBuf, Weak<LmdbEnv>>, key: &Path) -> Option<Arc<LmdbEnv>> {
    slots.get(key).and_then(Weak::upgrade)
}
// Unit tests for the registry: handle deduplication, weak-only retention,
// path-alias handling, liveness reporting, and the global wrappers.
#[cfg(test)]
mod tests {
use bytesize::ByteSize;
use super::*;
// Shared environment configuration used by every test.
fn test_config() -> LmdbEnvConfig {
LmdbEnvConfig::new(1, ByteSize::mib(16))
}
// A fresh, private registry so tests do not share state through the global one.
fn test_registry() -> LmdbEnvRegistry {
LmdbEnvRegistry::new()
}
// Two opens of the same path while the first handle is still held must
// return the very same `Arc`.
#[test]
fn get_or_open_dedups_overlapping_opens() {
let registry = test_registry();
let dir = tempfile::tempdir().expect("create temp dir");
let a = registry
.get_or_open(dir.path(), &test_config())
.expect("first open");
let b = registry
.get_or_open(dir.path(), &test_config())
.expect("second open");
assert!(Arc::ptr_eq(&a, &b));
}
// The registry must hold only a weak reference: dropping the sole external
// handle releases the env, and the same path can then be opened again.
#[test]
fn env_closes_when_last_external_arc_drops() {
let registry = test_registry();
let dir = tempfile::tempdir().expect("create temp dir");
let handle = registry
.get_or_open(dir.path(), &test_config())
.expect("open");
let observer = Arc::downgrade(&handle);
drop(handle);
assert!(
observer.upgrade().is_none(),
"registry must not keep the env alive after the last external Arc drops"
);
registry
.get_or_open(dir.path(), &test_config())
.expect("reopen after close");
}
// Opening a not-yet-tracked path through a non-canonical spelling must still
// be found afterwards via the plain path and via the path the env reports.
#[test]
fn open_on_miss_keys_on_canonical_path() {
let registry = test_registry();
let dir = tempfile::tempdir().expect("create temp dir");
let store = dir.path().join("store");
// Same location as `store`, spelled with a redundant `.` component.
let alias = dir.path().join(".").join("store");
let via_alias = registry
.get_or_open(&alias, &test_config())
.expect("open brand-new path via non-canonical spelling");
let via_plain = registry
.get_or_open(&store, &test_config())
.expect("open via plain path");
let via_canonical = registry
.get_or_open(via_alias.path(), &test_config())
.expect("open via canonical path");
assert!(Arc::ptr_eq(&via_alias, &via_plain));
assert!(Arc::ptr_eq(&via_alias, &via_canonical));
}
// `is_live` is false before the open, true while a handle is held (for the
// plain and the aliased spelling alike), and false again after the drop.
#[test]
fn is_live_tracks_external_holders() {
let registry = test_registry();
let dir = tempfile::tempdir().expect("create temp dir");
let store = dir.path().join("store");
let alias = dir.path().join(".").join("store");
assert!(!registry.is_live(&store), "unopened path is not live");
let handle = registry.get_or_open(&store, &test_config()).expect("open");
assert!(registry.is_live(&store), "open env is live");
assert!(
registry.is_live(&alias),
"liveness resolves a non-canonical spelling too"
);
drop(handle);
assert!(!registry.is_live(&store), "closed env is not live");
}
// Same dedup and liveness checks, but through the process-wide registry;
// a unique temp dir keeps this test independent of other users of it.
#[test]
fn open_shared_env_dedups_through_the_global_registry() {
let dir = tempfile::tempdir().expect("create temp dir");
let store = dir.path().join("shared_store");
assert!(!shared_env_is_live(&store));
let a = open_shared_env(&store, &test_config()).expect("first open");
let b = open_shared_env(&store, &test_config()).expect("second open");
assert!(Arc::ptr_eq(&a, &b));
assert!(shared_env_is_live(&store));
// Both handles must go away before the env stops being reported as live.
drop(a);
drop(b);
assert!(!shared_env_is_live(&store), "closed shared env is not live");
}
}