Skip to main content

reddb_server/physical/
shm.rs

//! `<data>-shm` shared-memory file substrate (gh-475).
//!
//! When provisioning is enabled (tier-wired for `Standard` and above in a
//! later slice), opening a database creates a sibling `<data>-shm` file
//! (the data file's name with `-shm` appended) that carries a
//! deterministic binary header recording the current owner pid, a
//! generation counter, a reader count, and the owner's last heartbeat.
//! The header is the lock protocol substrate that lets multiple embedded
//! readers coexist on the same data file and lets the next opener detect
//! a crashed prior owner.
9//!
10//! ## Binary layout (v1, little-endian, 64-byte fixed header)
11//!
12//! ```text
13//! offset size field             notes
14//!      0    8 magic             ASCII "RDBSHM01"
15//!      8    4 version           u32 = 1
16//!     12    4 owner_pid         u32, host pid of the writer that holds the lease
//!     16    8 generation        u64, bumped on every crash takeover; reset to 1 when a corrupt header is healed
18//!     24    8 reader_count      u64, count of attached embedded readers
19//!     32    8 last_heartbeat_ms u64, owner heartbeat in unix-ms
20//!     40   16 reserved          zeroed, room for v2 fields
//!     56    8 checksum          64-bit FNV-1a hash of bytes [0..56)
22//! ```
23//!
24//! ## Lock protocol
25//!
26//! 1. On open, the writer attempts to claim ownership. If the magic is
27//!    absent or invalid, it initialises the header with its pid and a
28//!    fresh generation. If the magic is present, it inspects
29//!    `owner_pid`: if the pid is no longer alive, this is a crash — the
30//!    new owner bumps `generation`, rewrites the header, and the load
31//!    path treats `reader_count` as authoritative for cleanup decisions
32//!    in a later slice.
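//! 2. Embedded readers attach by incrementing `reader_count` and
//!    detach by decrementing it (saturating at zero). The count is stored
//!    in the on-disk header, so it outlives a writer crash and the next
//!    opener can see how many stale handles were left behind; the current
//!    takeover path then resets it to zero when it claims ownership.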
36//! 3. mmap-ing the file is a follow-up slice; the on-disk substrate is
37//!    valid without it. The file size is fixed at one OS page so mmap
38//!    integration is mechanical when wired.
39
40use std::fs::{File, OpenOptions};
41use std::io::{self, Read, Seek, SeekFrom, Write};
42use std::path::{Path, PathBuf};
43use std::sync::atomic::{AtomicU8, Ordering};
44use std::time::{SystemTime, UNIX_EPOCH};
45
/// Magic bytes stored at offset 0 of the header; decoding rejects any
/// header that does not start with them.
pub const SHM_MAGIC: &[u8; 8] = b"RDBSHM01";
/// Header format version written into every header this module creates.
pub const SHM_VERSION: u32 = 1;
/// Size in bytes of the fixed binary header at the start of the file.
pub const SHM_HEADER_SIZE: usize = 64;
/// Length in bytes the file is set to when it is created or healed.
pub const SHM_FILE_SIZE: u64 = 4096;
50
/// No explicit policy has been set; fall back to the environment.
const SHM_POLICY_UNSET: u8 = 0;
/// Provisioning was explicitly enabled for this process.
const SHM_POLICY_ENABLED: u8 = 1;
/// Provisioning was explicitly disabled for this process.
const SHM_POLICY_DISABLED: u8 = 2;

/// Process-wide provisioning policy; holds one of the `SHM_POLICY_*` states.
static SHM_POLICY: AtomicU8 = AtomicU8::new(SHM_POLICY_UNSET);

/// Process-wide opt-in for `-shm` provisioning. Default off so
/// existing single-writer flows (`minimal`) keep their current shape.
/// Tier wiring should call this with `true` when `tier >= Standard`.
/// Escape hatch: `REDDB_SHM_PROVISION=1`.
///
/// Once called, the explicit choice (enabled or disabled) takes
/// precedence over the environment variable.
pub fn set_shm_provisioning_enabled(enabled: bool) {
    let state = if enabled {
        SHM_POLICY_ENABLED
    } else {
        SHM_POLICY_DISABLED
    };
    SHM_POLICY.store(state, Ordering::Relaxed);
}

/// Whether the open path should provision a `-shm` file.
///
/// An explicit policy set through [`set_shm_provisioning_enabled`] wins.
/// Otherwise the `REDDB_SHM_PROVISION` environment variable is consulted;
/// `1`, `true`, `yes` and `on` (ASCII case-insensitive) enable
/// provisioning, anything else — including an unset or non-Unicode
/// value — leaves it disabled.
pub fn shm_provisioning_enabled() -> bool {
    match SHM_POLICY.load(Ordering::Relaxed) {
        SHM_POLICY_ENABLED => true,
        SHM_POLICY_DISABLED => false,
        _ => std::env::var("REDDB_SHM_PROVISION")
            .map(|raw| env_flag_is_truthy(&raw))
            .unwrap_or(false),
    }
}

/// True when `value` spells an affirmative flag, ignoring ASCII case.
fn env_flag_is_truthy(value: &str) -> bool {
    ["1", "true", "yes", "on"]
        .iter()
        .any(|accepted| value.eq_ignore_ascii_case(accepted))
}
72
/// Sibling path of the `-shm` substrate file for a given data file.
///
/// The result is the data file's name with `-shm` appended, placed in the
/// same directory as `data_path`. When `data_path` has no final component
/// (for example `/` or `..`) the name falls back to `data.rdb-shm`; when it
/// has no non-empty parent the result is a bare relative file name.
///
/// The suffix is appended to the raw OS string, so file names that are not
/// valid UTF-8 are preserved exactly instead of being lossily re-encoded
/// (which would point at a file unrelated to the data file).
pub fn shm_path_for(data_path: &Path) -> PathBuf {
    let mut shm_name = data_path
        .file_name()
        .map(|name| name.to_os_string())
        .unwrap_or_else(|| std::ffi::OsString::from("data.rdb"));
    shm_name.push("-shm");
    match data_path.parent() {
        Some(dir) if !dir.as_os_str().is_empty() => dir.join(shm_name),
        _ => PathBuf::from(shm_name),
    }
}
85
/// Outcome of a provisioning attempt — distinguishes a clean takeover
/// from a crash recovery for diagnostics and tests.
///
/// Reported on the handle returned by `provision_shm`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ShmProvisionState {
    /// File did not exist (or was zero-length); a fresh header with
    /// generation 1 was written.
    Created,
    /// Existing owner pid is still alive; attached as an additional handle.
    /// Also reported when the recorded owner is the current process
    /// (same-process reopen).
    AttachedToLiveOwner,
    /// Existing owner pid is dead; took ownership and bumped generation.
    RecoveredFromCrash,
    /// File existed but the header was unreadable (too short, bad magic or
    /// bad checksum); reinitialised with generation 1.
    HealedCorruptHeader,
}
99
/// In-memory form of the fixed binary header at the start of the `-shm`
/// file. The magic, reserved bytes and checksum are not stored here; they
/// are produced and verified when the header is encoded and decoded.
#[derive(Debug, Clone)]
pub struct ShmHeader {
    /// Header format version (bytes 8..12 on disk).
    pub version: u32,
    /// Pid of the process recorded as the current owner (bytes 12..16).
    pub owner_pid: u32,
    /// Ownership generation counter (bytes 16..24).
    pub generation: u64,
    /// Number of attached readers (bytes 24..32).
    pub reader_count: u64,
    /// Last owner heartbeat in unix milliseconds (bytes 32..40).
    pub last_heartbeat_ms: u64,
}
108
109impl ShmHeader {
110    fn encode(&self) -> [u8; SHM_HEADER_SIZE] {
111        let mut buf = [0u8; SHM_HEADER_SIZE];
112        buf[0..8].copy_from_slice(SHM_MAGIC);
113        buf[8..12].copy_from_slice(&self.version.to_le_bytes());
114        buf[12..16].copy_from_slice(&self.owner_pid.to_le_bytes());
115        buf[16..24].copy_from_slice(&self.generation.to_le_bytes());
116        buf[24..32].copy_from_slice(&self.reader_count.to_le_bytes());
117        buf[32..40].copy_from_slice(&self.last_heartbeat_ms.to_le_bytes());
118        let checksum = fold_checksum(&buf[..56]);
119        buf[56..64].copy_from_slice(&checksum.to_le_bytes());
120        buf
121    }
122
123    fn decode(buf: &[u8; SHM_HEADER_SIZE]) -> io::Result<Self> {
124        if &buf[0..8] != SHM_MAGIC {
125            return Err(io::Error::new(
126                io::ErrorKind::InvalidData,
127                "shm magic mismatch",
128            ));
129        }
130        let stored_checksum = u64::from_le_bytes(buf[56..64].try_into().unwrap());
131        let computed = fold_checksum(&buf[..56]);
132        if stored_checksum != computed {
133            return Err(io::Error::new(
134                io::ErrorKind::InvalidData,
135                "shm checksum mismatch",
136            ));
137        }
138        Ok(Self {
139            version: u32::from_le_bytes(buf[8..12].try_into().unwrap()),
140            owner_pid: u32::from_le_bytes(buf[12..16].try_into().unwrap()),
141            generation: u64::from_le_bytes(buf[16..24].try_into().unwrap()),
142            reader_count: u64::from_le_bytes(buf[24..32].try_into().unwrap()),
143            last_heartbeat_ms: u64::from_le_bytes(buf[32..40].try_into().unwrap()),
144        })
145    }
146}
147
/// Handle to the provisioned `-shm` file. Drop semantics intentionally
/// minimal in this slice — callers must invoke [`Self::detach_reader`]
/// explicitly to mirror the eventual mmap-backed lifecycle.
pub struct ShmHandle {
    /// Path of the `-shm` file this handle was opened on.
    pub path: PathBuf,
    /// This handle's cached copy of the header, as last written by it.
    pub header: ShmHeader,
    /// How provisioning resolved when this handle was created.
    pub state: ShmProvisionState,
    /// Open read/write descriptor used to rewrite the header in place.
    file: File,
}
157
158impl ShmHandle {
159    /// Current generation counter. Bumps on every crash recovery so
160    /// observers can detect that ownership changed between snapshots.
161    pub fn generation(&self) -> u64 {
162        self.header.generation
163    }
164
165    /// Increment the on-disk reader counter. Returns the new count.
166    pub fn attach_reader(&mut self) -> io::Result<u64> {
167        self.header.reader_count = self.header.reader_count.saturating_add(1);
168        self.rewrite_header()?;
169        Ok(self.header.reader_count)
170    }
171
172    /// Decrement the on-disk reader counter (saturating). Returns new count.
173    pub fn detach_reader(&mut self) -> io::Result<u64> {
174        self.header.reader_count = self.header.reader_count.saturating_sub(1);
175        self.rewrite_header()?;
176        Ok(self.header.reader_count)
177    }
178
179    /// Refresh `last_heartbeat_ms` to the current unix-ms.
180    pub fn heartbeat(&mut self) -> io::Result<()> {
181        self.header.last_heartbeat_ms = unix_ms_now();
182        self.rewrite_header()
183    }
184
185    fn rewrite_header(&mut self) -> io::Result<()> {
186        let buf = self.header.encode();
187        self.file.seek(SeekFrom::Start(0))?;
188        self.file.write_all(&buf)?;
189        self.file.sync_data()?;
190        Ok(())
191    }
192}
193
194/// Provision the `-shm` substrate for a data file. Idempotent; safe to
195/// call from every open. See module docs for the lock protocol.
196pub fn provision_shm(data_path: &Path) -> io::Result<ShmHandle> {
197    let path = shm_path_for(data_path);
198    if let Some(parent) = path.parent() {
199        if !parent.as_os_str().is_empty() {
200            std::fs::create_dir_all(parent)?;
201        }
202    }
203
204    let mut file = OpenOptions::new()
205        .read(true)
206        .write(true)
207        .create(true)
208        .truncate(false)
209        .open(&path)?;
210
211    let metadata = file.metadata()?;
212    let fresh = metadata.len() == 0;
213
214    if fresh {
215        file.set_len(SHM_FILE_SIZE)?;
216        let header = ShmHeader {
217            version: SHM_VERSION,
218            owner_pid: current_pid(),
219            generation: 1,
220            reader_count: 0,
221            last_heartbeat_ms: unix_ms_now(),
222        };
223        file.seek(SeekFrom::Start(0))?;
224        file.write_all(&header.encode())?;
225        file.sync_data()?;
226        return Ok(ShmHandle {
227            path,
228            header,
229            state: ShmProvisionState::Created,
230            file,
231        });
232    }
233
234    let mut buf = [0u8; SHM_HEADER_SIZE];
235    file.seek(SeekFrom::Start(0))?;
236    let existing = match file.read_exact(&mut buf) {
237        Ok(()) => ShmHeader::decode(&buf).ok(),
238        Err(_) => None,
239    };
240
241    let (header, state) = match existing {
242        Some(prev) if pid_alive(prev.owner_pid) && prev.owner_pid != current_pid() => {
243            // Attach to live owner — increment reader_count, keep generation.
244            let next = ShmHeader {
245                version: SHM_VERSION,
246                owner_pid: prev.owner_pid,
247                generation: prev.generation,
248                reader_count: prev.reader_count.saturating_add(1),
249                last_heartbeat_ms: prev.last_heartbeat_ms,
250            };
251            (next, ShmProvisionState::AttachedToLiveOwner)
252        }
253        Some(prev) if prev.owner_pid == current_pid() => {
254            // Same-process reopen; refresh heartbeat, keep counters.
255            let next = ShmHeader {
256                version: SHM_VERSION,
257                owner_pid: prev.owner_pid,
258                generation: prev.generation,
259                reader_count: prev.reader_count,
260                last_heartbeat_ms: unix_ms_now(),
261            };
262            (next, ShmProvisionState::AttachedToLiveOwner)
263        }
264        Some(prev) => {
265            // Owner is dead — take over, bump generation, clear reader count.
266            let next = ShmHeader {
267                version: SHM_VERSION,
268                owner_pid: current_pid(),
269                generation: prev.generation.saturating_add(1),
270                reader_count: 0,
271                last_heartbeat_ms: unix_ms_now(),
272            };
273            (next, ShmProvisionState::RecoveredFromCrash)
274        }
275        None => {
276            // File exists but header is unreadable — heal in place.
277            let next = ShmHeader {
278                version: SHM_VERSION,
279                owner_pid: current_pid(),
280                generation: 1,
281                reader_count: 0,
282                last_heartbeat_ms: unix_ms_now(),
283            };
284            file.set_len(SHM_FILE_SIZE)?;
285            (next, ShmProvisionState::HealedCorruptHeader)
286        }
287    };
288
289    file.seek(SeekFrom::Start(0))?;
290    file.write_all(&header.encode())?;
291    file.sync_data()?;
292
293    Ok(ShmHandle {
294        path,
295        header,
296        state,
297        file,
298    })
299}
300
301/// Read the current header without taking ownership. Returns `Ok(None)`
302/// when the file does not exist; surfaces a real I/O error if the file
303/// is present but unreadable.
304pub fn read_shm_header(data_path: &Path) -> io::Result<Option<ShmHeader>> {
305    let path = shm_path_for(data_path);
306    if !path.exists() {
307        return Ok(None);
308    }
309    let mut file = OpenOptions::new().read(true).open(&path)?;
310    let mut buf = [0u8; SHM_HEADER_SIZE];
311    file.read_exact(&mut buf)?;
312    ShmHeader::decode(&buf).map(Some)
313}
314
/// Header checksum: 64-bit FNV-1a over `bytes` (xor each byte into the
/// accumulator, then multiply by the FNV prime with wrapping arithmetic).
/// An empty slice yields the offset basis.
fn fold_checksum(bytes: &[u8]) -> u64 {
    const OFFSET_BASIS: u64 = 0xcbf2_9ce4_8422_2325;
    const PRIME: u64 = 0x0000_0100_0000_01b3;

    bytes.iter().fold(OFFSET_BASIS, |hash, &byte| {
        (hash ^ u64::from(byte)).wrapping_mul(PRIME)
    })
}
323
/// Milliseconds elapsed since the unix epoch according to the system
/// clock, or 0 if the clock reports a time before the epoch.
fn unix_ms_now() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_millis() as u64,
        Err(_) => 0,
    }
}
330
/// OS-assigned pid of the calling process.
fn current_pid() -> u32 {
    use std::process;

    process::id()
}
334
335#[cfg(unix)]
336fn pid_alive(pid: u32) -> bool {
337    if pid == 0 {
338        return false;
339    }
340    // `kill(pid, 0)` returns 0 if the process exists, -1 otherwise.
341    // EPERM still implies the process exists (we just can't signal it).
342    let rc = unsafe { libc::kill(pid as libc::pid_t, 0) };
343    if rc == 0 {
344        return true;
345    }
346    io::Error::last_os_error()
347        .raw_os_error()
348        .map(|e| e == libc::EPERM)
349        .unwrap_or(false)
350}
351
/// Non-unix stand-in for the pid liveness probe: always reports the pid
/// as alive, regardless of its value.
#[cfg(not(unix))]
fn pid_alive(_pid: u32) -> bool {
    // Conservative: assume alive on non-unix until a platform-specific
    // probe is wired. Crash recovery on those platforms is a follow-up.
    true
}