reddb_server/physical/
shm.rs1use std::fs::{File, OpenOptions};
41use std::io::{self, Read, Seek, SeekFrom, Write};
42use std::path::{Path, PathBuf};
43use std::sync::atomic::{AtomicU8, Ordering};
44use std::time::{SystemTime, UNIX_EPOCH};
45
/// Eight-byte signature written at the very start of every encoded shm header.
pub const SHM_MAGIC: &[u8; 8] = b"RDBSHM01";
/// Layout version stamped into the `version` field of headers written by this module.
pub const SHM_VERSION: u32 = 1;
/// Size in bytes of the encoded header, trailing checksum included.
pub const SHM_HEADER_SIZE: usize = 64;
/// Length in bytes the shm file is sized to when it is created or healed.
pub const SHM_FILE_SIZE: u64 = 4 * 1024;
50
/// `SHM_POLICY` value meaning "no explicit override; consult the environment".
const SHM_POLICY_UNSET: u8 = 0;
/// `SHM_POLICY` value meaning provisioning was explicitly enabled.
const SHM_POLICY_ENABLED: u8 = 1;
/// `SHM_POLICY` value meaning provisioning was explicitly disabled.
const SHM_POLICY_DISABLED: u8 = 2;

/// Process-wide provisioning override, holding one of the `SHM_POLICY_*` values.
static SHM_POLICY: AtomicU8 = AtomicU8::new(SHM_POLICY_UNSET);

/// Records an explicit, process-wide decision on whether shm provisioning is enabled.
///
/// Once called, the decision takes precedence over the `REDDB_SHM_PROVISION`
/// environment variable for every later call to [`shm_provisioning_enabled`].
pub fn set_shm_provisioning_enabled(enabled: bool) {
    let policy = if enabled {
        SHM_POLICY_ENABLED
    } else {
        SHM_POLICY_DISABLED
    };
    SHM_POLICY.store(policy, Ordering::Relaxed);
}

/// Reports whether shm provisioning is enabled for this process.
///
/// An explicit decision made through [`set_shm_provisioning_enabled`] wins.
/// Otherwise the `REDDB_SHM_PROVISION` environment variable is consulted: the
/// values `1`, `true`, `yes` and `on` enable provisioning, compared
/// case-insensitively and ignoring surrounding whitespace. A missing,
/// non-Unicode or unrecognised value leaves provisioning disabled.
pub fn shm_provisioning_enabled() -> bool {
    match SHM_POLICY.load(Ordering::Relaxed) {
        SHM_POLICY_ENABLED => true,
        SHM_POLICY_DISABLED => false,
        // Any other stored value is treated as "unset" and defers to the environment.
        _ => std::env::var("REDDB_SHM_PROVISION")
            .map(|raw| {
                let value = raw.trim();
                ["1", "true", "yes", "on"]
                    .iter()
                    .any(|truthy| value.eq_ignore_ascii_case(truthy))
            })
            .unwrap_or(false),
    }
}
72
/// Derives the shm sidecar path for a data file: the same directory, with
/// `-shm` appended to the data file's name.
///
/// When `data_path` has no final file-name component (for example `""`, `/`
/// or a path ending in `..`), the name `data.rdb` is used instead. When it has
/// no non-empty parent, the result is a bare relative file name.
///
/// The file name is handled as an OS string, so names that are not valid
/// UTF-8 are preserved byte-for-byte rather than being lossily converted.
pub fn shm_path_for(data_path: &Path) -> PathBuf {
    let mut shm_file = data_path
        .file_name()
        .map(std::ffi::OsStr::to_os_string)
        .unwrap_or_else(|| std::ffi::OsString::from("data.rdb"));
    shm_file.push("-shm");

    match data_path.parent() {
        Some(parent) if !parent.as_os_str().is_empty() => parent.join(shm_file),
        _ => PathBuf::from(shm_file),
    }
}
85
/// Outcome of a `provision_shm` call, describing how the shm file was obtained.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ShmProvisionState {
    /// The file was empty (or newly created) and a fresh header was written.
    Created,
    /// A valid header was found whose owner pid is still alive, or is this
    /// very process; ownership and generation were kept.
    AttachedToLiveOwner,
    /// A valid header was found but its owner pid is no longer alive; this
    /// process took ownership and bumped the generation.
    RecoveredFromCrash,
    /// The existing header could not be read or failed validation and was
    /// replaced by a fresh one.
    HealedCorruptHeader,
}
99
/// Decoded contents of the fixed-size header at the start of the shm file.
#[derive(Clone, Debug)]
pub struct ShmHeader {
    /// Layout version of the header.
    pub version: u32,
    /// Process id recorded as the owner of the shm file.
    pub owner_pid: u32,
    /// Ownership generation; starts at 1 and is incremented when a new
    /// process takes over from an owner that is no longer alive.
    pub generation: u64,
    /// Number of readers currently registered in the header.
    pub reader_count: u64,
    /// Time of the last heartbeat, in milliseconds since the Unix epoch.
    pub last_heartbeat_ms: u64,
}
108
109impl ShmHeader {
110 fn encode(&self) -> [u8; SHM_HEADER_SIZE] {
111 let mut buf = [0u8; SHM_HEADER_SIZE];
112 buf[0..8].copy_from_slice(SHM_MAGIC);
113 buf[8..12].copy_from_slice(&self.version.to_le_bytes());
114 buf[12..16].copy_from_slice(&self.owner_pid.to_le_bytes());
115 buf[16..24].copy_from_slice(&self.generation.to_le_bytes());
116 buf[24..32].copy_from_slice(&self.reader_count.to_le_bytes());
117 buf[32..40].copy_from_slice(&self.last_heartbeat_ms.to_le_bytes());
118 let checksum = fold_checksum(&buf[..56]);
119 buf[56..64].copy_from_slice(&checksum.to_le_bytes());
120 buf
121 }
122
123 fn decode(buf: &[u8; SHM_HEADER_SIZE]) -> io::Result<Self> {
124 if &buf[0..8] != SHM_MAGIC {
125 return Err(io::Error::new(
126 io::ErrorKind::InvalidData,
127 "shm magic mismatch",
128 ));
129 }
130 let stored_checksum = u64::from_le_bytes(buf[56..64].try_into().unwrap());
131 let computed = fold_checksum(&buf[..56]);
132 if stored_checksum != computed {
133 return Err(io::Error::new(
134 io::ErrorKind::InvalidData,
135 "shm checksum mismatch",
136 ));
137 }
138 Ok(Self {
139 version: u32::from_le_bytes(buf[8..12].try_into().unwrap()),
140 owner_pid: u32::from_le_bytes(buf[12..16].try_into().unwrap()),
141 generation: u64::from_le_bytes(buf[16..24].try_into().unwrap()),
142 reader_count: u64::from_le_bytes(buf[24..32].try_into().unwrap()),
143 last_heartbeat_ms: u64::from_le_bytes(buf[32..40].try_into().unwrap()),
144 })
145 }
146}
147
148pub struct ShmHandle {
152 pub path: PathBuf,
153 pub header: ShmHeader,
154 pub state: ShmProvisionState,
155 file: File,
156}
157
158impl ShmHandle {
159 pub fn generation(&self) -> u64 {
162 self.header.generation
163 }
164
165 pub fn attach_reader(&mut self) -> io::Result<u64> {
167 self.header.reader_count = self.header.reader_count.saturating_add(1);
168 self.rewrite_header()?;
169 Ok(self.header.reader_count)
170 }
171
172 pub fn detach_reader(&mut self) -> io::Result<u64> {
174 self.header.reader_count = self.header.reader_count.saturating_sub(1);
175 self.rewrite_header()?;
176 Ok(self.header.reader_count)
177 }
178
179 pub fn heartbeat(&mut self) -> io::Result<()> {
181 self.header.last_heartbeat_ms = unix_ms_now();
182 self.rewrite_header()
183 }
184
185 fn rewrite_header(&mut self) -> io::Result<()> {
186 let buf = self.header.encode();
187 self.file.seek(SeekFrom::Start(0))?;
188 self.file.write_all(&buf)?;
189 self.file.sync_data()?;
190 Ok(())
191 }
192}
193
194pub fn provision_shm(data_path: &Path) -> io::Result<ShmHandle> {
197 let path = shm_path_for(data_path);
198 if let Some(parent) = path.parent() {
199 if !parent.as_os_str().is_empty() {
200 std::fs::create_dir_all(parent)?;
201 }
202 }
203
204 let mut file = OpenOptions::new()
205 .read(true)
206 .write(true)
207 .create(true)
208 .truncate(false)
209 .open(&path)?;
210
211 let metadata = file.metadata()?;
212 let fresh = metadata.len() == 0;
213
214 if fresh {
215 file.set_len(SHM_FILE_SIZE)?;
216 let header = ShmHeader {
217 version: SHM_VERSION,
218 owner_pid: current_pid(),
219 generation: 1,
220 reader_count: 0,
221 last_heartbeat_ms: unix_ms_now(),
222 };
223 file.seek(SeekFrom::Start(0))?;
224 file.write_all(&header.encode())?;
225 file.sync_data()?;
226 return Ok(ShmHandle {
227 path,
228 header,
229 state: ShmProvisionState::Created,
230 file,
231 });
232 }
233
234 let mut buf = [0u8; SHM_HEADER_SIZE];
235 file.seek(SeekFrom::Start(0))?;
236 let existing = match file.read_exact(&mut buf) {
237 Ok(()) => ShmHeader::decode(&buf).ok(),
238 Err(_) => None,
239 };
240
241 let (header, state) = match existing {
242 Some(prev) if pid_alive(prev.owner_pid) && prev.owner_pid != current_pid() => {
243 let next = ShmHeader {
245 version: SHM_VERSION,
246 owner_pid: prev.owner_pid,
247 generation: prev.generation,
248 reader_count: prev.reader_count.saturating_add(1),
249 last_heartbeat_ms: prev.last_heartbeat_ms,
250 };
251 (next, ShmProvisionState::AttachedToLiveOwner)
252 }
253 Some(prev) if prev.owner_pid == current_pid() => {
254 let next = ShmHeader {
256 version: SHM_VERSION,
257 owner_pid: prev.owner_pid,
258 generation: prev.generation,
259 reader_count: prev.reader_count,
260 last_heartbeat_ms: unix_ms_now(),
261 };
262 (next, ShmProvisionState::AttachedToLiveOwner)
263 }
264 Some(prev) => {
265 let next = ShmHeader {
267 version: SHM_VERSION,
268 owner_pid: current_pid(),
269 generation: prev.generation.saturating_add(1),
270 reader_count: 0,
271 last_heartbeat_ms: unix_ms_now(),
272 };
273 (next, ShmProvisionState::RecoveredFromCrash)
274 }
275 None => {
276 let next = ShmHeader {
278 version: SHM_VERSION,
279 owner_pid: current_pid(),
280 generation: 1,
281 reader_count: 0,
282 last_heartbeat_ms: unix_ms_now(),
283 };
284 file.set_len(SHM_FILE_SIZE)?;
285 (next, ShmProvisionState::HealedCorruptHeader)
286 }
287 };
288
289 file.seek(SeekFrom::Start(0))?;
290 file.write_all(&header.encode())?;
291 file.sync_data()?;
292
293 Ok(ShmHandle {
294 path,
295 header,
296 state,
297 file,
298 })
299}
300
301pub fn read_shm_header(data_path: &Path) -> io::Result<Option<ShmHeader>> {
305 let path = shm_path_for(data_path);
306 if !path.exists() {
307 return Ok(None);
308 }
309 let mut file = OpenOptions::new().read(true).open(&path)?;
310 let mut buf = [0u8; SHM_HEADER_SIZE];
311 file.read_exact(&mut buf)?;
312 ShmHeader::decode(&buf).map(Some)
313}
314
/// Computes the 64-bit FNV-1a hash of `bytes`: starting from the offset
/// basis, each byte is XOR-ed in and the result multiplied (wrapping) by the
/// FNV prime.
fn fold_checksum(bytes: &[u8]) -> u64 {
    const OFFSET_BASIS: u64 = 0xcbf2_9ce4_8422_2325;
    const PRIME: u64 = 0x0000_0100_0000_01b3;

    bytes.iter().fold(OFFSET_BASIS, |hash, &byte| {
        (hash ^ u64::from(byte)).wrapping_mul(PRIME)
    })
}
323
/// Returns the current wall-clock time as milliseconds since the Unix epoch,
/// or 0 when the system clock reads earlier than the epoch.
fn unix_ms_now() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis() as u64,
        Err(_) => 0,
    }
}
330
/// Returns the operating-system process id of the calling process.
fn current_pid() -> u32 {
    use std::process;

    process::id()
}
334
335#[cfg(unix)]
336fn pid_alive(pid: u32) -> bool {
337 if pid == 0 {
338 return false;
339 }
340 let rc = unsafe { libc::kill(pid as libc::pid_t, 0) };
343 if rc == 0 {
344 return true;
345 }
346 io::Error::last_os_error()
347 .raw_os_error()
348 .map(|e| e == libc::EPERM)
349 .unwrap_or(false)
350}
351
352#[cfg(not(unix))]
353fn pid_alive(_pid: u32) -> bool {
354 true
357}