use std::fs::{File, OpenOptions};
use std::io;
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicU8, Ordering};
use std::time::{SystemTime, UNIX_EPOCH};
pub use reddb_file::{ShmHeader, SHM_FILE_SIZE, SHM_HEADER_SIZE, SHM_MAGIC, SHM_VERSION};
/// Process-wide override for SHM provisioning.
///
/// Encoding: `0` means no override has been set (the `REDDB_SHM_PROVISION`
/// environment variable decides), `1` forces provisioning on, `2` forces it
/// off.
static SHM_POLICY: AtomicU8 = AtomicU8::new(0);
/// Installs a process-wide override for SHM provisioning.
///
/// Once called, `shm_provisioning_enabled` reports `enabled` and no longer
/// consults the `REDDB_SHM_PROVISION` environment variable.
pub fn set_shm_provisioning_enabled(enabled: bool) {
    // 1 = forced on, 2 = forced off; 0 is left to mean "no override set".
    let policy: u8 = match enabled {
        true => 1,
        false => 2,
    };
    SHM_POLICY.store(policy, Ordering::Relaxed);
}
/// Reports whether SHM provisioning is enabled for this process.
///
/// An override installed through `set_shm_provisioning_enabled` wins. Without
/// one, the `REDDB_SHM_PROVISION` environment variable is read on every call
/// and counts as "enabled" only when it is exactly one of `1`, `true`,
/// `TRUE`, `yes` or `on`. An unset or non-Unicode variable counts as
/// disabled.
pub fn shm_provisioning_enabled() -> bool {
    let policy = SHM_POLICY.load(Ordering::Relaxed);
    if policy == 1 {
        return true;
    }
    if policy == 2 {
        return false;
    }

    // No explicit override: fall back to the environment.
    match std::env::var("REDDB_SHM_PROVISION") {
        Ok(raw) => ["1", "true", "TRUE", "yes", "on"].contains(&raw.as_str()),
        Err(_) => false,
    }
}
/// Returns the path of the SHM file that belongs to `data_path`.
///
/// Thin wrapper: the path derivation itself is delegated to
/// `reddb_file::layout::shm_path`.
pub fn shm_path_for(data_path: &Path) -> PathBuf {
reddb_file::layout::shm_path(data_path)
}
/// How `provision_shm` obtained the SHM file it returned.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ShmProvisionState {
/// The file was empty and has been initialised with a fresh header.
Created,
/// A readable header was found whose owner pid is either this process or
/// another process that is reported alive.
AttachedToLiveOwner,
/// A readable header was found whose owner pid is not alive; this process
/// took ownership and bumped the generation.
RecoveredFromCrash,
/// The file was non-empty but its header could not be read; the file was
/// re-initialised with a fresh header.
HealedCorruptHeader,
}
/// An open SHM file together with this process's copy of its header.
pub struct ShmHandle {
/// Location of the SHM file on disk.
pub path: PathBuf,
/// Header as last built by this handle. It is an in-memory copy and is not
/// re-read from disk by any method visible here.
pub header: ShmHeader,
/// Outcome of the provisioning step that produced this handle.
pub state: ShmProvisionState,
// Open read/write file that header rewrites go to.
file: File,
}
impl ShmHandle {
    /// Returns the generation number stored in this handle's header copy.
    pub fn generation(&self) -> u64 {
        self.header.generation
    }

    /// Increments the reader count (saturating at `u64::MAX`) and writes the
    /// header back to the SHM file.
    ///
    /// Returns the new reader count.
    ///
    /// # Errors
    ///
    /// Returns the I/O error from the header write. The in-memory count is
    /// then restored to its previous value, so retrying does not count the
    /// same reader twice.
    pub fn attach_reader(&mut self) -> io::Result<u64> {
        let next = self.header.reader_count.saturating_add(1);
        self.store_reader_count(next)
    }

    /// Decrements the reader count (saturating at zero) and writes the header
    /// back to the SHM file.
    ///
    /// Returns the new reader count.
    ///
    /// # Errors
    ///
    /// Returns the I/O error from the header write. The in-memory count is
    /// then restored to its previous value, so retrying does not release the
    /// same reader twice.
    pub fn detach_reader(&mut self) -> io::Result<u64> {
        let next = self.header.reader_count.saturating_sub(1);
        self.store_reader_count(next)
    }

    /// Stamps the header with the current wall-clock time in milliseconds and
    /// writes it back to the SHM file.
    ///
    /// # Errors
    ///
    /// Returns the I/O error from the header write; the previous in-memory
    /// heartbeat value is kept in that case.
    pub fn heartbeat(&mut self) -> io::Result<()> {
        let previous = self.header.last_heartbeat_ms;
        self.header.last_heartbeat_ms = unix_ms_now();
        let outcome = self.rewrite_header();
        if outcome.is_err() {
            // Keep the in-memory copy equal to what was last persisted.
            self.header.last_heartbeat_ms = previous;
        }
        outcome
    }

    /// Sets the reader count to `next`, persists the header, and rolls the
    /// field back if the write fails.
    fn store_reader_count(&mut self, next: u64) -> io::Result<u64> {
        let previous = self.header.reader_count;
        self.header.reader_count = next;
        match self.rewrite_header() {
            Ok(()) => Ok(next),
            Err(err) => {
                self.header.reader_count = previous;
                Err(err)
            }
        }
    }

    /// Writes this handle's header copy to the open SHM file.
    // NOTE(review): the whole cached header is written, not a fresh read of the
    // file, so fields changed on disk by another process since this handle was
    // created would presumably be overwritten — confirm whether that matters.
    fn rewrite_header(&mut self) -> io::Result<()> {
        reddb_file::write_shm_header_to_file(&mut self.file, &self.header)
    }
}
/// Opens the SHM file for `data_path`, creating it and its parent directory
/// when missing, and claims or joins it.
///
/// The returned handle's `state` records which case applied:
///
/// - `Created`: the file was empty; it is initialised with a header built as
///   `ShmHeader::new(current pid, 1, 0, now)`.
/// - `HealedCorruptHeader`: the file was non-empty but the header could not
///   be read; the file is re-initialised the same way.
/// - `AttachedToLiveOwner`: the header's owner pid is alive and belongs to
///   another process (owner, generation and heartbeat are kept, reader count
///   is incremented), or the owner pid is this process (owner, generation
///   and reader count are kept, heartbeat is refreshed).
/// - `RecoveredFromCrash`: the owner pid is not alive; this process becomes
///   the owner, the generation is incremented and the reader count reset.
///
/// Every non-`Created` case finishes by writing the resulting header to the
/// file.
///
/// # Errors
///
/// Returns any I/O error from creating the parent directory, opening the
/// file, reading its metadata, or initialising / writing the header. A
/// failure to *read* an existing header is not returned; it selects the
/// `HealedCorruptHeader` case instead.
pub fn provision_shm(data_path: &Path) -> io::Result<ShmHandle> {
    let path = shm_path_for(data_path);

    // A bare file name has an empty parent; nothing to create in that case.
    match path.parent() {
        Some(dir) if !dir.as_os_str().is_empty() => std::fs::create_dir_all(dir)?,
        _ => {}
    }

    let mut file = OpenOptions::new()
        .read(true)
        .write(true)
        .create(true)
        .truncate(false)
        .open(&path)?;

    // Zero length: the file has never been initialised.
    if file.metadata()?.len() == 0 {
        let header = ShmHeader::new(current_pid(), 1, 0, unix_ms_now());
        reddb_file::initialize_shm_file(&mut file, &header)?;
        return Ok(ShmHandle {
            path,
            header,
            state: ShmProvisionState::Created,
            file,
        });
    }

    let (header, state) = match reddb_file::read_shm_header_from_file(&mut file) {
        Ok(prev) => {
            let owner_alive = pid_alive(prev.owner_pid);
            let owned_by_us = prev.owner_pid == current_pid();

            if owner_alive && !owned_by_us {
                // Another live process owns the file: join it as a reader.
                let joined = ShmHeader::new(
                    prev.owner_pid,
                    prev.generation,
                    prev.reader_count.saturating_add(1),
                    prev.last_heartbeat_ms,
                );
                (joined, ShmProvisionState::AttachedToLiveOwner)
            } else if owned_by_us {
                // This process already owns the file: only refresh the heartbeat.
                let refreshed = ShmHeader::new(
                    prev.owner_pid,
                    prev.generation,
                    prev.reader_count,
                    unix_ms_now(),
                );
                (refreshed, ShmProvisionState::AttachedToLiveOwner)
            } else {
                // The recorded owner is gone: take over with a new generation.
                let takeover = ShmHeader::new(
                    current_pid(),
                    prev.generation.saturating_add(1),
                    0,
                    unix_ms_now(),
                );
                (takeover, ShmProvisionState::RecoveredFromCrash)
            }
        }
        Err(_) => {
            // Unreadable header: start over as if the file were new.
            let rebuilt = ShmHeader::new(current_pid(), 1, 0, unix_ms_now());
            reddb_file::initialize_shm_file(&mut file, &rebuilt)?;
            (rebuilt, ShmProvisionState::HealedCorruptHeader)
        }
    };

    reddb_file::write_shm_header_to_file(&mut file, &header)?;
    Ok(ShmHandle {
        path,
        header,
        state,
        file,
    })
}
pub fn read_shm_header(data_path: &Path) -> io::Result<Option<ShmHeader>> {
let path = shm_path_for(data_path);
if !path.exists() {
return Ok(None);
}
let mut file = OpenOptions::new().read(true).open(&path)?;
reddb_file::read_shm_header_from_file(&mut file).map(Some)
}
/// Returns the current wall-clock time as milliseconds since the Unix epoch.
///
/// Yields `0` when the system clock reports a time before the epoch.
fn unix_ms_now() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_millis() as u64,
        Err(_) => 0,
    }
}
/// Returns the OS process id of the calling process.
fn current_pid() -> u32 {
    use std::process;

    process::id()
}
#[cfg(unix)]
fn pid_alive(pid: u32) -> bool {
if pid == 0 {
return false;
}
let rc = unsafe { libc::kill(pid as libc::pid_t, 0) };
if rc == 0 {
return true;
}
io::Error::last_os_error()
.raw_os_error()
.map(|e| e == libc::EPERM)
.unwrap_or(false)
}
/// Non-Unix fallback: reports every pid as alive without performing any check.
///
/// Because this always returns `true`, the owner-not-alive branch of
/// `provision_shm` is never taken on these platforms.
// NOTE(review): looks like a placeholder until a platform-specific liveness
// check is added — confirm.
#[cfg(not(unix))]
fn pid_alive(_pid: u32) -> bool {
true
}