Skip to main content

reddb_server/physical/
shm.rs

1//! `<data>.shm` shared-memory file substrate (gh-475).
2//!
3//! When provisioning is enabled (tier-wired for `Standard` and above in a
4//! later slice), opening a database creates a sibling `<data>.shm` file
5//! that carries a deterministic binary header recording the current owner
6//! pid, generation counter, and reader registry. The header is the lock
7//! protocol substrate that lets multiple embedded readers coexist on the
8//! same data file and lets the next opener detect a crashed prior owner.
9//!
10//! ## Binary layout (v1, little-endian, 64-byte fixed header)
11//!
12//! ```text
13//! offset size field             notes
14//!      0    8 magic             ASCII "RDBSHM01"
15//!      8    4 version           u32 = 1
16//!     12    4 owner_pid         u32, host pid of the writer that holds the lease
17//!     16    8 generation        u64, bumped on every owner takeover or heal
18//!     24    8 reader_count      u64, count of attached embedded readers
19//!     32    8 last_heartbeat_ms u64, owner heartbeat in unix-ms
20//!     40   16 reserved          zeroed, room for v2 fields
21//!     56    8 checksum          xxh3-style fold of bytes [0..56)
22//! ```
23//!
24//! ## Lock protocol
25//!
26//! 1. On open, the writer attempts to claim ownership. If the magic is
27//!    absent or invalid, it initialises the header with its pid and a
28//!    fresh generation. If the magic is present, it inspects
29//!    `owner_pid`: if the pid is no longer alive, this is a crash — the
30//!    new owner bumps `generation`, rewrites the header, and the load
31//!    path treats `reader_count` as authoritative for cleanup decisions
32//!    in a later slice.
33//! 2. Embedded readers attach by incrementing `reader_count` and
34//!    detach by decrementing it. The count survives the writer crash
35//!    so the next opener sees how many stale handles must be cleaned.
36//! 3. mmap-ing the file is a follow-up slice; the on-disk substrate is
37//!    valid without it. The file size is fixed at one OS page so mmap
38//!    integration is mechanical when wired.
39
40use std::fs::{File, OpenOptions};
41use std::io::{self, Read, Seek, SeekFrom, Write};
42use std::path::{Path, PathBuf};
43use std::sync::atomic::{AtomicU8, Ordering};
44use std::time::{SystemTime, UNIX_EPOCH};
45
46pub const SHM_MAGIC: &[u8; 8] = b"RDBSHM01";
47pub const SHM_VERSION: u32 = 1;
48pub const SHM_HEADER_SIZE: usize = 64;
49pub const SHM_FILE_SIZE: u64 = 4096;
50
51static SHM_POLICY: AtomicU8 = AtomicU8::new(0);
52
53/// Process-wide opt-in for `<data>.shm` provisioning. Default off so
54/// existing single-writer flows (`minimal`) keep their current shape.
55/// Tier wiring should call this with `true` when `tier >= Standard`.
56/// Escape hatch: `REDDB_SHM_PROVISION=1`.
57pub fn set_shm_provisioning_enabled(enabled: bool) {
58    SHM_POLICY.store(if enabled { 1 } else { 2 }, Ordering::Relaxed);
59}
60
61/// Whether the open path should provision a `<data>.shm` file.
62pub fn shm_provisioning_enabled() -> bool {
63    match SHM_POLICY.load(Ordering::Relaxed) {
64        1 => true,
65        2 => false,
66        _ => std::env::var("REDDB_SHM_PROVISION")
67            .ok()
68            .map(|v| matches!(v.as_str(), "1" | "true" | "TRUE" | "yes" | "on"))
69            .unwrap_or(false),
70    }
71}
72
73/// Sibling path of the `-shm` substrate file for a given data file.
74pub fn shm_path_for(data_path: &Path) -> PathBuf {
75    let file_name = data_path
76        .file_name()
77        .map(|n| n.to_string_lossy().into_owned())
78        .unwrap_or_else(|| "data.rdb".to_string());
79    let shm_file = format!("{file_name}-shm");
80    match data_path.parent() {
81        Some(parent) if !parent.as_os_str().is_empty() => parent.join(shm_file),
82        _ => PathBuf::from(shm_file),
83    }
84}
85
86/// Outcome of a provisioning attempt — distinguishes a clean takeover
87/// from a crash recovery for diagnostics and tests.
88#[derive(Debug, Clone, Copy, PartialEq, Eq)]
89pub enum ShmProvisionState {
90    /// File did not exist; created fresh.
91    Created,
92    /// Existing owner pid is still alive; attached as an additional handle.
93    AttachedToLiveOwner,
94    /// Existing owner pid is dead; took ownership and bumped generation.
95    RecoveredFromCrash,
96    /// File existed but the header was unreadable; reinitialised.
97    HealedCorruptHeader,
98}
99
100#[derive(Debug, Clone)]
101pub struct ShmHeader {
102    pub version: u32,
103    pub owner_pid: u32,
104    pub generation: u64,
105    pub reader_count: u64,
106    pub last_heartbeat_ms: u64,
107}
108
109impl ShmHeader {
110    fn encode(&self) -> [u8; SHM_HEADER_SIZE] {
111        let mut buf = [0u8; SHM_HEADER_SIZE];
112        buf[0..8].copy_from_slice(SHM_MAGIC);
113        buf[8..12].copy_from_slice(&self.version.to_le_bytes());
114        buf[12..16].copy_from_slice(&self.owner_pid.to_le_bytes());
115        buf[16..24].copy_from_slice(&self.generation.to_le_bytes());
116        buf[24..32].copy_from_slice(&self.reader_count.to_le_bytes());
117        buf[32..40].copy_from_slice(&self.last_heartbeat_ms.to_le_bytes());
118        let checksum = fold_checksum(&buf[..56]);
119        buf[56..64].copy_from_slice(&checksum.to_le_bytes());
120        buf
121    }
122
123    fn decode(buf: &[u8; SHM_HEADER_SIZE]) -> io::Result<Self> {
124        if &buf[0..8] != SHM_MAGIC {
125            return Err(io::Error::new(io::ErrorKind::InvalidData, "shm magic mismatch"));
126        }
127        let stored_checksum = u64::from_le_bytes(buf[56..64].try_into().unwrap());
128        let computed = fold_checksum(&buf[..56]);
129        if stored_checksum != computed {
130            return Err(io::Error::new(io::ErrorKind::InvalidData, "shm checksum mismatch"));
131        }
132        Ok(Self {
133            version: u32::from_le_bytes(buf[8..12].try_into().unwrap()),
134            owner_pid: u32::from_le_bytes(buf[12..16].try_into().unwrap()),
135            generation: u64::from_le_bytes(buf[16..24].try_into().unwrap()),
136            reader_count: u64::from_le_bytes(buf[24..32].try_into().unwrap()),
137            last_heartbeat_ms: u64::from_le_bytes(buf[32..40].try_into().unwrap()),
138        })
139    }
140}
141
142/// Handle to the provisioned `-shm` file. Drop semantics intentionally
143/// minimal in this slice — callers must invoke [`Self::detach_reader`]
144/// explicitly to mirror the eventual mmap-backed lifecycle.
145pub struct ShmHandle {
146    pub path: PathBuf,
147    pub header: ShmHeader,
148    pub state: ShmProvisionState,
149    file: File,
150}
151
152impl ShmHandle {
153    /// Current generation counter. Bumps on every crash recovery so
154    /// observers can detect that ownership changed between snapshots.
155    pub fn generation(&self) -> u64 {
156        self.header.generation
157    }
158
159    /// Increment the on-disk reader counter. Returns the new count.
160    pub fn attach_reader(&mut self) -> io::Result<u64> {
161        self.header.reader_count = self.header.reader_count.saturating_add(1);
162        self.rewrite_header()?;
163        Ok(self.header.reader_count)
164    }
165
166    /// Decrement the on-disk reader counter (saturating). Returns new count.
167    pub fn detach_reader(&mut self) -> io::Result<u64> {
168        self.header.reader_count = self.header.reader_count.saturating_sub(1);
169        self.rewrite_header()?;
170        Ok(self.header.reader_count)
171    }
172
173    /// Refresh `last_heartbeat_ms` to the current unix-ms.
174    pub fn heartbeat(&mut self) -> io::Result<()> {
175        self.header.last_heartbeat_ms = unix_ms_now();
176        self.rewrite_header()
177    }
178
179    fn rewrite_header(&mut self) -> io::Result<()> {
180        let buf = self.header.encode();
181        self.file.seek(SeekFrom::Start(0))?;
182        self.file.write_all(&buf)?;
183        self.file.sync_data()?;
184        Ok(())
185    }
186}
187
188/// Provision the `-shm` substrate for a data file. Idempotent; safe to
189/// call from every open. See module docs for the lock protocol.
190pub fn provision_shm(data_path: &Path) -> io::Result<ShmHandle> {
191    let path = shm_path_for(data_path);
192    if let Some(parent) = path.parent() {
193        if !parent.as_os_str().is_empty() {
194            std::fs::create_dir_all(parent)?;
195        }
196    }
197
198    let mut file = OpenOptions::new()
199        .read(true)
200        .write(true)
201        .create(true)
202        .truncate(false)
203        .open(&path)?;
204
205    let metadata = file.metadata()?;
206    let fresh = metadata.len() == 0;
207
208    if fresh {
209        file.set_len(SHM_FILE_SIZE)?;
210        let header = ShmHeader {
211            version: SHM_VERSION,
212            owner_pid: current_pid(),
213            generation: 1,
214            reader_count: 0,
215            last_heartbeat_ms: unix_ms_now(),
216        };
217        file.seek(SeekFrom::Start(0))?;
218        file.write_all(&header.encode())?;
219        file.sync_data()?;
220        return Ok(ShmHandle {
221            path,
222            header,
223            state: ShmProvisionState::Created,
224            file,
225        });
226    }
227
228    let mut buf = [0u8; SHM_HEADER_SIZE];
229    file.seek(SeekFrom::Start(0))?;
230    let existing = match file.read_exact(&mut buf) {
231        Ok(()) => ShmHeader::decode(&buf).ok(),
232        Err(_) => None,
233    };
234
235    let (header, state) = match existing {
236        Some(prev) if pid_alive(prev.owner_pid) && prev.owner_pid != current_pid() => {
237            // Attach to live owner — increment reader_count, keep generation.
238            let next = ShmHeader {
239                version: SHM_VERSION,
240                owner_pid: prev.owner_pid,
241                generation: prev.generation,
242                reader_count: prev.reader_count.saturating_add(1),
243                last_heartbeat_ms: prev.last_heartbeat_ms,
244            };
245            (next, ShmProvisionState::AttachedToLiveOwner)
246        }
247        Some(prev) if prev.owner_pid == current_pid() => {
248            // Same-process reopen; refresh heartbeat, keep counters.
249            let next = ShmHeader {
250                version: SHM_VERSION,
251                owner_pid: prev.owner_pid,
252                generation: prev.generation,
253                reader_count: prev.reader_count,
254                last_heartbeat_ms: unix_ms_now(),
255            };
256            (next, ShmProvisionState::AttachedToLiveOwner)
257        }
258        Some(prev) => {
259            // Owner is dead — take over, bump generation, clear reader count.
260            let next = ShmHeader {
261                version: SHM_VERSION,
262                owner_pid: current_pid(),
263                generation: prev.generation.saturating_add(1),
264                reader_count: 0,
265                last_heartbeat_ms: unix_ms_now(),
266            };
267            (next, ShmProvisionState::RecoveredFromCrash)
268        }
269        None => {
270            // File exists but header is unreadable — heal in place.
271            let next = ShmHeader {
272                version: SHM_VERSION,
273                owner_pid: current_pid(),
274                generation: 1,
275                reader_count: 0,
276                last_heartbeat_ms: unix_ms_now(),
277            };
278            file.set_len(SHM_FILE_SIZE)?;
279            (next, ShmProvisionState::HealedCorruptHeader)
280        }
281    };
282
283    file.seek(SeekFrom::Start(0))?;
284    file.write_all(&header.encode())?;
285    file.sync_data()?;
286
287    Ok(ShmHandle {
288        path,
289        header,
290        state,
291        file,
292    })
293}
294
295/// Read the current header without taking ownership. Returns `Ok(None)`
296/// when the file does not exist; surfaces a real I/O error if the file
297/// is present but unreadable.
298pub fn read_shm_header(data_path: &Path) -> io::Result<Option<ShmHeader>> {
299    let path = shm_path_for(data_path);
300    if !path.exists() {
301        return Ok(None);
302    }
303    let mut file = OpenOptions::new().read(true).open(&path)?;
304    let mut buf = [0u8; SHM_HEADER_SIZE];
305    file.read_exact(&mut buf)?;
306    ShmHeader::decode(&buf).map(Some)
307}
308
309fn fold_checksum(bytes: &[u8]) -> u64 {
310    let mut acc: u64 = 0xcbf29ce484222325;
311    for &byte in bytes {
312        acc ^= byte as u64;
313        acc = acc.wrapping_mul(0x100000001b3);
314    }
315    acc
316}
317
318fn unix_ms_now() -> u64 {
319    SystemTime::now()
320        .duration_since(UNIX_EPOCH)
321        .map(|d| d.as_millis() as u64)
322        .unwrap_or(0)
323}
324
325fn current_pid() -> u32 {
326    std::process::id()
327}
328
329#[cfg(unix)]
330fn pid_alive(pid: u32) -> bool {
331    if pid == 0 {
332        return false;
333    }
334    // `kill(pid, 0)` returns 0 if the process exists, -1 otherwise.
335    // EPERM still implies the process exists (we just can't signal it).
336    let rc = unsafe { libc::kill(pid as libc::pid_t, 0) };
337    if rc == 0 {
338        return true;
339    }
340    io::Error::last_os_error()
341        .raw_os_error()
342        .map(|e| e == libc::EPERM)
343        .unwrap_or(false)
344}
345
346#[cfg(not(unix))]
347fn pid_alive(_pid: u32) -> bool {
348    // Conservative: assume alive on non-unix until a platform-specific
349    // probe is wired. Crash recovery on those platforms is a follow-up.
350    true
351}