reddb_server/physical/
shm.rs1use std::fs::{File, OpenOptions};
41use std::io::{self, Read, Seek, SeekFrom, Write};
42use std::path::{Path, PathBuf};
43use std::sync::atomic::{AtomicU8, Ordering};
44use std::time::{SystemTime, UNIX_EPOCH};
45
46pub const SHM_MAGIC: &[u8; 8] = b"RDBSHM01";
47pub const SHM_VERSION: u32 = 1;
48pub const SHM_HEADER_SIZE: usize = 64;
49pub const SHM_FILE_SIZE: u64 = 4096;
50
51static SHM_POLICY: AtomicU8 = AtomicU8::new(0);
52
53pub fn set_shm_provisioning_enabled(enabled: bool) {
58 SHM_POLICY.store(if enabled { 1 } else { 2 }, Ordering::Relaxed);
59}
60
61pub fn shm_provisioning_enabled() -> bool {
63 match SHM_POLICY.load(Ordering::Relaxed) {
64 1 => true,
65 2 => false,
66 _ => std::env::var("REDDB_SHM_PROVISION")
67 .ok()
68 .map(|v| matches!(v.as_str(), "1" | "true" | "TRUE" | "yes" | "on"))
69 .unwrap_or(false),
70 }
71}
72
73pub fn shm_path_for(data_path: &Path) -> PathBuf {
75 let file_name = data_path
76 .file_name()
77 .map(|n| n.to_string_lossy().into_owned())
78 .unwrap_or_else(|| "data.rdb".to_string());
79 let shm_file = format!("{file_name}-shm");
80 match data_path.parent() {
81 Some(parent) if !parent.as_os_str().is_empty() => parent.join(shm_file),
82 _ => PathBuf::from(shm_file),
83 }
84}
85
86#[derive(Debug, Clone, Copy, PartialEq, Eq)]
89pub enum ShmProvisionState {
90 Created,
92 AttachedToLiveOwner,
94 RecoveredFromCrash,
96 HealedCorruptHeader,
98}
99
100#[derive(Debug, Clone)]
101pub struct ShmHeader {
102 pub version: u32,
103 pub owner_pid: u32,
104 pub generation: u64,
105 pub reader_count: u64,
106 pub last_heartbeat_ms: u64,
107}
108
109impl ShmHeader {
110 fn encode(&self) -> [u8; SHM_HEADER_SIZE] {
111 let mut buf = [0u8; SHM_HEADER_SIZE];
112 buf[0..8].copy_from_slice(SHM_MAGIC);
113 buf[8..12].copy_from_slice(&self.version.to_le_bytes());
114 buf[12..16].copy_from_slice(&self.owner_pid.to_le_bytes());
115 buf[16..24].copy_from_slice(&self.generation.to_le_bytes());
116 buf[24..32].copy_from_slice(&self.reader_count.to_le_bytes());
117 buf[32..40].copy_from_slice(&self.last_heartbeat_ms.to_le_bytes());
118 let checksum = fold_checksum(&buf[..56]);
119 buf[56..64].copy_from_slice(&checksum.to_le_bytes());
120 buf
121 }
122
123 fn decode(buf: &[u8; SHM_HEADER_SIZE]) -> io::Result<Self> {
124 if &buf[0..8] != SHM_MAGIC {
125 return Err(io::Error::new(io::ErrorKind::InvalidData, "shm magic mismatch"));
126 }
127 let stored_checksum = u64::from_le_bytes(buf[56..64].try_into().unwrap());
128 let computed = fold_checksum(&buf[..56]);
129 if stored_checksum != computed {
130 return Err(io::Error::new(io::ErrorKind::InvalidData, "shm checksum mismatch"));
131 }
132 Ok(Self {
133 version: u32::from_le_bytes(buf[8..12].try_into().unwrap()),
134 owner_pid: u32::from_le_bytes(buf[12..16].try_into().unwrap()),
135 generation: u64::from_le_bytes(buf[16..24].try_into().unwrap()),
136 reader_count: u64::from_le_bytes(buf[24..32].try_into().unwrap()),
137 last_heartbeat_ms: u64::from_le_bytes(buf[32..40].try_into().unwrap()),
138 })
139 }
140}
141
142pub struct ShmHandle {
146 pub path: PathBuf,
147 pub header: ShmHeader,
148 pub state: ShmProvisionState,
149 file: File,
150}
151
152impl ShmHandle {
153 pub fn generation(&self) -> u64 {
156 self.header.generation
157 }
158
159 pub fn attach_reader(&mut self) -> io::Result<u64> {
161 self.header.reader_count = self.header.reader_count.saturating_add(1);
162 self.rewrite_header()?;
163 Ok(self.header.reader_count)
164 }
165
166 pub fn detach_reader(&mut self) -> io::Result<u64> {
168 self.header.reader_count = self.header.reader_count.saturating_sub(1);
169 self.rewrite_header()?;
170 Ok(self.header.reader_count)
171 }
172
173 pub fn heartbeat(&mut self) -> io::Result<()> {
175 self.header.last_heartbeat_ms = unix_ms_now();
176 self.rewrite_header()
177 }
178
179 fn rewrite_header(&mut self) -> io::Result<()> {
180 let buf = self.header.encode();
181 self.file.seek(SeekFrom::Start(0))?;
182 self.file.write_all(&buf)?;
183 self.file.sync_data()?;
184 Ok(())
185 }
186}
187
188pub fn provision_shm(data_path: &Path) -> io::Result<ShmHandle> {
191 let path = shm_path_for(data_path);
192 if let Some(parent) = path.parent() {
193 if !parent.as_os_str().is_empty() {
194 std::fs::create_dir_all(parent)?;
195 }
196 }
197
198 let mut file = OpenOptions::new()
199 .read(true)
200 .write(true)
201 .create(true)
202 .truncate(false)
203 .open(&path)?;
204
205 let metadata = file.metadata()?;
206 let fresh = metadata.len() == 0;
207
208 if fresh {
209 file.set_len(SHM_FILE_SIZE)?;
210 let header = ShmHeader {
211 version: SHM_VERSION,
212 owner_pid: current_pid(),
213 generation: 1,
214 reader_count: 0,
215 last_heartbeat_ms: unix_ms_now(),
216 };
217 file.seek(SeekFrom::Start(0))?;
218 file.write_all(&header.encode())?;
219 file.sync_data()?;
220 return Ok(ShmHandle {
221 path,
222 header,
223 state: ShmProvisionState::Created,
224 file,
225 });
226 }
227
228 let mut buf = [0u8; SHM_HEADER_SIZE];
229 file.seek(SeekFrom::Start(0))?;
230 let existing = match file.read_exact(&mut buf) {
231 Ok(()) => ShmHeader::decode(&buf).ok(),
232 Err(_) => None,
233 };
234
235 let (header, state) = match existing {
236 Some(prev) if pid_alive(prev.owner_pid) && prev.owner_pid != current_pid() => {
237 let next = ShmHeader {
239 version: SHM_VERSION,
240 owner_pid: prev.owner_pid,
241 generation: prev.generation,
242 reader_count: prev.reader_count.saturating_add(1),
243 last_heartbeat_ms: prev.last_heartbeat_ms,
244 };
245 (next, ShmProvisionState::AttachedToLiveOwner)
246 }
247 Some(prev) if prev.owner_pid == current_pid() => {
248 let next = ShmHeader {
250 version: SHM_VERSION,
251 owner_pid: prev.owner_pid,
252 generation: prev.generation,
253 reader_count: prev.reader_count,
254 last_heartbeat_ms: unix_ms_now(),
255 };
256 (next, ShmProvisionState::AttachedToLiveOwner)
257 }
258 Some(prev) => {
259 let next = ShmHeader {
261 version: SHM_VERSION,
262 owner_pid: current_pid(),
263 generation: prev.generation.saturating_add(1),
264 reader_count: 0,
265 last_heartbeat_ms: unix_ms_now(),
266 };
267 (next, ShmProvisionState::RecoveredFromCrash)
268 }
269 None => {
270 let next = ShmHeader {
272 version: SHM_VERSION,
273 owner_pid: current_pid(),
274 generation: 1,
275 reader_count: 0,
276 last_heartbeat_ms: unix_ms_now(),
277 };
278 file.set_len(SHM_FILE_SIZE)?;
279 (next, ShmProvisionState::HealedCorruptHeader)
280 }
281 };
282
283 file.seek(SeekFrom::Start(0))?;
284 file.write_all(&header.encode())?;
285 file.sync_data()?;
286
287 Ok(ShmHandle {
288 path,
289 header,
290 state,
291 file,
292 })
293}
294
295pub fn read_shm_header(data_path: &Path) -> io::Result<Option<ShmHeader>> {
299 let path = shm_path_for(data_path);
300 if !path.exists() {
301 return Ok(None);
302 }
303 let mut file = OpenOptions::new().read(true).open(&path)?;
304 let mut buf = [0u8; SHM_HEADER_SIZE];
305 file.read_exact(&mut buf)?;
306 ShmHeader::decode(&buf).map(Some)
307}
308
309fn fold_checksum(bytes: &[u8]) -> u64 {
310 let mut acc: u64 = 0xcbf29ce484222325;
311 for &byte in bytes {
312 acc ^= byte as u64;
313 acc = acc.wrapping_mul(0x100000001b3);
314 }
315 acc
316}
317
318fn unix_ms_now() -> u64 {
319 SystemTime::now()
320 .duration_since(UNIX_EPOCH)
321 .map(|d| d.as_millis() as u64)
322 .unwrap_or(0)
323}
324
325fn current_pid() -> u32 {
326 std::process::id()
327}
328
329#[cfg(unix)]
330fn pid_alive(pid: u32) -> bool {
331 if pid == 0 {
332 return false;
333 }
334 let rc = unsafe { libc::kill(pid as libc::pid_t, 0) };
337 if rc == 0 {
338 return true;
339 }
340 io::Error::last_os_error()
341 .raw_os_error()
342 .map(|e| e == libc::EPERM)
343 .unwrap_or(false)
344}
345
346#[cfg(not(unix))]
347fn pid_alive(_pid: u32) -> bool {
348 true
351}