Skip to main content

reddb_server/physical/
shm.rs

1//! Shared-memory file substrate (gh-475).
2//!
3//! When provisioning is enabled (tier-wired for `Standard` and above in a
4//! later slice), opening a database creates the sibling file selected by
5//! `reddb_file::layout::shm_path` with a deterministic binary header owned by
6//! `reddb-file`. This module owns the runtime lock policy: current owner pid,
7//! generation counter, reader registry, and crash takeover decisions.
8//!
9//! ## Lock protocol
10//!
//! 1. On open, the caller attempts to claim ownership. An empty file is
//!    initialised with the caller's pid and generation 1; a file whose
//!    header cannot be read (for example, missing or invalid magic) is
//!    reinitialised the same way. Otherwise the caller inspects
//!    `owner_pid`: if that pid is no longer alive, this is a crash — the
//!    new owner takes ownership, bumps `generation`, resets
//!    `reader_count` to zero and rewrites the header. Using the stale
//!    count for cleanup decisions is deferred to a later slice.
//! 2. Embedded readers attach by incrementing `reader_count` and
//!    detach by decrementing it. The count survives a writer crash on
//!    disk, so it remains visible to the next opener until the takeover
//!    resets it.
21//! 3. mmap-ing the file is a follow-up slice; the on-disk substrate is
22//!    valid without it. The file size is fixed at one OS page so mmap
23//!    integration is mechanical when wired.
24
25use std::fs::{File, OpenOptions};
26use std::io;
27use std::path::{Path, PathBuf};
28use std::sync::atomic::{AtomicU8, Ordering};
29use std::time::{SystemTime, UNIX_EPOCH};
30
31pub use reddb_file::{ShmHeader, SHM_FILE_SIZE, SHM_HEADER_SIZE, SHM_MAGIC, SHM_VERSION};
32
/// No explicit override installed; consult `REDDB_SHM_PROVISION`.
const POLICY_UNSET: u8 = 0;
/// Provisioning explicitly enabled via `set_shm_provisioning_enabled(true)`.
const POLICY_ENABLED: u8 = 1;
/// Provisioning explicitly disabled via `set_shm_provisioning_enabled(false)`.
const POLICY_DISABLED: u8 = 2;

/// Process-wide tri-state provisioning policy (one of the `POLICY_*` consts).
///
/// `Relaxed` ordering is sufficient: the flag is read and written on its
/// own and does not publish any other memory.
static SHM_POLICY: AtomicU8 = AtomicU8::new(POLICY_UNSET);

/// Process-wide opt-in for SHM provisioning. Default off so
/// existing single-writer flows (`minimal`) keep their current shape.
/// Tier wiring should call this with `true` when `tier >= Standard`.
///
/// Once called (with either value), the `REDDB_SHM_PROVISION` environment
/// variable is no longer consulted for the rest of the process.
/// Escape hatch when never called: `REDDB_SHM_PROVISION=1`.
pub fn set_shm_provisioning_enabled(enabled: bool) {
    let policy = if enabled { POLICY_ENABLED } else { POLICY_DISABLED };
    SHM_POLICY.store(policy, Ordering::Relaxed);
}

/// Whether the open path should provision a SHM file.
///
/// An explicit [`set_shm_provisioning_enabled`] call wins. Otherwise the
/// `REDDB_SHM_PROVISION` environment variable is read: `1`, `true`, `yes`
/// or `on` (ASCII case-insensitive, surrounding whitespace ignored) enable
/// provisioning. Any other value, an unset variable, or a non-UTF-8 value
/// leaves it disabled.
pub fn shm_provisioning_enabled() -> bool {
    match SHM_POLICY.load(Ordering::Relaxed) {
        POLICY_ENABLED => true,
        POLICY_DISABLED => false,
        _ => std::env::var("REDDB_SHM_PROVISION")
            .map(|raw| env_flag_enabled(&raw))
            .unwrap_or(false),
    }
}

/// Interpret an environment-variable value as a boolean opt-in flag.
fn env_flag_enabled(raw: &str) -> bool {
    let value = raw.trim();
    ["1", "true", "yes", "on"]
        .iter()
        .any(|accepted| value.eq_ignore_ascii_case(accepted))
}
54
55/// Sibling path of the `-shm` substrate file for a given data file.
56pub fn shm_path_for(data_path: &Path) -> PathBuf {
57    reddb_file::layout::shm_path(data_path)
58}
59
/// How [`provision_shm`] obtained its handle.
///
/// Keeps a fresh file, a clean attach, a crash takeover and a header repair
/// apart so diagnostics and tests can tell which path was taken.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ShmProvisionState {
    /// The file was empty (typically just created); a new header was written.
    Created,
    /// The recorded owner pid is alive (another process, or this one
    /// reopening); attached without changing ownership.
    AttachedToLiveOwner,
    /// The recorded owner pid is dead; this process took ownership and
    /// bumped the generation counter.
    RecoveredFromCrash,
    /// The file existed but its header could not be read; it was reinitialised.
    HealedCorruptHeader,
}
73
74/// Handle to the provisioned `-shm` file. Drop semantics intentionally
75/// minimal in this slice — callers must invoke [`Self::detach_reader`]
76/// explicitly to mirror the eventual mmap-backed lifecycle.
77pub struct ShmHandle {
78    pub path: PathBuf,
79    pub header: ShmHeader,
80    pub state: ShmProvisionState,
81    file: File,
82}
83
84impl ShmHandle {
85    /// Current generation counter. Bumps on every crash recovery so
86    /// observers can detect that ownership changed between snapshots.
87    pub fn generation(&self) -> u64 {
88        self.header.generation
89    }
90
91    /// Increment the on-disk reader counter. Returns the new count.
92    pub fn attach_reader(&mut self) -> io::Result<u64> {
93        self.header.reader_count = self.header.reader_count.saturating_add(1);
94        self.rewrite_header()?;
95        Ok(self.header.reader_count)
96    }
97
98    /// Decrement the on-disk reader counter (saturating). Returns new count.
99    pub fn detach_reader(&mut self) -> io::Result<u64> {
100        self.header.reader_count = self.header.reader_count.saturating_sub(1);
101        self.rewrite_header()?;
102        Ok(self.header.reader_count)
103    }
104
105    /// Refresh `last_heartbeat_ms` to the current unix-ms.
106    pub fn heartbeat(&mut self) -> io::Result<()> {
107        self.header.last_heartbeat_ms = unix_ms_now();
108        self.rewrite_header()
109    }
110
111    fn rewrite_header(&mut self) -> io::Result<()> {
112        reddb_file::write_shm_header_to_file(&mut self.file, &self.header)
113    }
114}
115
116/// Provision the `-shm` substrate for a data file. Idempotent; safe to
117/// call from every open. See module docs for the lock protocol.
118pub fn provision_shm(data_path: &Path) -> io::Result<ShmHandle> {
119    let path = shm_path_for(data_path);
120    if let Some(parent) = path.parent() {
121        if !parent.as_os_str().is_empty() {
122            std::fs::create_dir_all(parent)?;
123        }
124    }
125
126    let mut file = OpenOptions::new()
127        .read(true)
128        .write(true)
129        .create(true)
130        .truncate(false)
131        .open(&path)?;
132
133    let metadata = file.metadata()?;
134    let fresh = metadata.len() == 0;
135
136    if fresh {
137        let header = ShmHeader::new(current_pid(), 1, 0, unix_ms_now());
138        reddb_file::initialize_shm_file(&mut file, &header)?;
139        return Ok(ShmHandle {
140            path,
141            header,
142            state: ShmProvisionState::Created,
143            file,
144        });
145    }
146
147    let existing = reddb_file::read_shm_header_from_file(&mut file).ok();
148
149    let (header, state) = match existing {
150        Some(prev) if pid_alive(prev.owner_pid) && prev.owner_pid != current_pid() => {
151            // Attach to live owner — increment reader_count, keep generation.
152            let next = ShmHeader::new(
153                prev.owner_pid,
154                prev.generation,
155                prev.reader_count.saturating_add(1),
156                prev.last_heartbeat_ms,
157            );
158            (next, ShmProvisionState::AttachedToLiveOwner)
159        }
160        Some(prev) if prev.owner_pid == current_pid() => {
161            // Same-process reopen; refresh heartbeat, keep counters.
162            let next = ShmHeader::new(
163                prev.owner_pid,
164                prev.generation,
165                prev.reader_count,
166                unix_ms_now(),
167            );
168            (next, ShmProvisionState::AttachedToLiveOwner)
169        }
170        Some(prev) => {
171            // Owner is dead — take over, bump generation, clear reader count.
172            let next = ShmHeader::new(
173                current_pid(),
174                prev.generation.saturating_add(1),
175                0,
176                unix_ms_now(),
177            );
178            (next, ShmProvisionState::RecoveredFromCrash)
179        }
180        None => {
181            // File exists but header is unreadable — heal in place.
182            let next = ShmHeader::new(current_pid(), 1, 0, unix_ms_now());
183            reddb_file::initialize_shm_file(&mut file, &next)?;
184            (next, ShmProvisionState::HealedCorruptHeader)
185        }
186    };
187
188    reddb_file::write_shm_header_to_file(&mut file, &header)?;
189
190    Ok(ShmHandle {
191        path,
192        header,
193        state,
194        file,
195    })
196}
197
198/// Read the current header without taking ownership. Returns `Ok(None)`
199/// when the file does not exist; surfaces a real I/O error if the file
200/// is present but unreadable.
201pub fn read_shm_header(data_path: &Path) -> io::Result<Option<ShmHeader>> {
202    let path = shm_path_for(data_path);
203    if !path.exists() {
204        return Ok(None);
205    }
206    let mut file = OpenOptions::new().read(true).open(&path)?;
207    reddb_file::read_shm_header_from_file(&mut file).map(Some)
208}
209
/// Milliseconds since the Unix epoch, or 0 if the system clock reads
/// earlier than 1970.
fn unix_ms_now() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_millis() as u64,
        Err(_) => 0,
    }
}
216
/// Pid of the running process, as recorded in `owner_pid`.
fn current_pid() -> u32 {
    let pid: u32 = std::process::id();
    pid
}
220
221#[cfg(unix)]
222fn pid_alive(pid: u32) -> bool {
223    if pid == 0 {
224        return false;
225    }
226    // `kill(pid, 0)` returns 0 if the process exists, -1 otherwise.
227    // EPERM still implies the process exists (we just can't signal it).
228    let rc = unsafe { libc::kill(pid as libc::pid_t, 0) };
229    if rc == 0 {
230        return true;
231    }
232    io::Error::last_os_error()
233        .raw_os_error()
234        .map(|e| e == libc::EPERM)
235        .unwrap_or(false)
236}
237
/// Liveness probe placeholder for non-unix targets.
#[cfg(not(unix))]
fn pid_alive(_: u32) -> bool {
    // No platform probe is wired yet, so every recorded owner counts as
    // alive. As a result, `provision_shm` never performs a crash takeover
    // on these targets; a real probe is follow-up work.
    true
}