reddb_server/physical/
shm.rs1use std::fs::{File, OpenOptions};
26use std::io;
27use std::path::{Path, PathBuf};
28use std::sync::atomic::{AtomicU8, Ordering};
29use std::time::{SystemTime, UNIX_EPOCH};
30
31pub use reddb_file::{ShmHeader, SHM_FILE_SIZE, SHM_HEADER_SIZE, SHM_MAGIC, SHM_VERSION};
32
/// No programmatic override is set; the environment decides.
const SHM_POLICY_UNSET: u8 = 0;
/// Provisioning was explicitly enabled via [`set_shm_provisioning_enabled`].
const SHM_POLICY_ENABLED: u8 = 1;
/// Provisioning was explicitly disabled via [`set_shm_provisioning_enabled`].
const SHM_POLICY_DISABLED: u8 = 2;

/// Process-wide provisioning override, holding one of the `SHM_POLICY_*` codes.
static SHM_POLICY: AtomicU8 = AtomicU8::new(SHM_POLICY_UNSET);

/// Forces SHM provisioning on or off for the whole process.
///
/// Once called, the `REDDB_SHM_PROVISION` environment variable is no longer
/// consulted by [`shm_provisioning_enabled`].
pub fn set_shm_provisioning_enabled(enabled: bool) {
    let code = if enabled {
        SHM_POLICY_ENABLED
    } else {
        SHM_POLICY_DISABLED
    };
    SHM_POLICY.store(code, Ordering::Relaxed);
}

/// Reports whether SHM provisioning is enabled.
///
/// An explicit override set through [`set_shm_provisioning_enabled`] wins.
/// Without one, the `REDDB_SHM_PROVISION` environment variable is read on
/// every call; provisioning is on only when it holds a truthy value
/// (`1`, `true`, `yes`, `on`, in any ASCII case, surrounding whitespace
/// ignored). An unset or non-Unicode variable means "disabled".
pub fn shm_provisioning_enabled() -> bool {
    match SHM_POLICY.load(Ordering::Relaxed) {
        SHM_POLICY_ENABLED => true,
        SHM_POLICY_DISABLED => false,
        // Unset (or any unknown code): fall back to the environment.
        _ => std::env::var("REDDB_SHM_PROVISION")
            .map(|raw| shm_env_value_is_truthy(&raw))
            .unwrap_or(false),
    }
}

/// Returns `true` when `raw` spells an affirmative flag value.
///
/// Matching is ASCII case-insensitive so that `True`, `YES` and `ON` behave
/// like the spellings `true`, `yes` and `on`.
fn shm_env_value_is_truthy(raw: &str) -> bool {
    let value = raw.trim();
    ["1", "true", "yes", "on"]
        .iter()
        .any(|accepted| value.eq_ignore_ascii_case(accepted))
}
54
55pub fn shm_path_for(data_path: &Path) -> PathBuf {
57 reddb_file::layout::shm_path(data_path)
58}
59
/// Describes which path `provision_shm` took to produce a handle.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ShmProvisionState {
    /// The file was empty and has been initialised with a new header
    /// owned by the current process.
    Created,
    /// A readable header already existed and its owner was kept: either
    /// another process that is still alive, or the current process itself.
    AttachedToLiveOwner,
    /// A readable header existed but its owner process is no longer alive;
    /// the current process took ownership with a bumped generation.
    RecoveredFromCrash,
    /// The existing header could not be read, so the file was
    /// re-initialised from scratch.
    HealedCorruptHeader,
}
73
74pub struct ShmHandle {
78 pub path: PathBuf,
79 pub header: ShmHeader,
80 pub state: ShmProvisionState,
81 file: File,
82}
83
84impl ShmHandle {
85 pub fn generation(&self) -> u64 {
88 self.header.generation
89 }
90
91 pub fn attach_reader(&mut self) -> io::Result<u64> {
93 self.header.reader_count = self.header.reader_count.saturating_add(1);
94 self.rewrite_header()?;
95 Ok(self.header.reader_count)
96 }
97
98 pub fn detach_reader(&mut self) -> io::Result<u64> {
100 self.header.reader_count = self.header.reader_count.saturating_sub(1);
101 self.rewrite_header()?;
102 Ok(self.header.reader_count)
103 }
104
105 pub fn heartbeat(&mut self) -> io::Result<()> {
107 self.header.last_heartbeat_ms = unix_ms_now();
108 self.rewrite_header()
109 }
110
111 fn rewrite_header(&mut self) -> io::Result<()> {
112 reddb_file::write_shm_header_to_file(&mut self.file, &self.header)
113 }
114}
115
116pub fn provision_shm(data_path: &Path) -> io::Result<ShmHandle> {
119 let path = shm_path_for(data_path);
120 if let Some(parent) = path.parent() {
121 if !parent.as_os_str().is_empty() {
122 std::fs::create_dir_all(parent)?;
123 }
124 }
125
126 let mut file = OpenOptions::new()
127 .read(true)
128 .write(true)
129 .create(true)
130 .truncate(false)
131 .open(&path)?;
132
133 let metadata = file.metadata()?;
134 let fresh = metadata.len() == 0;
135
136 if fresh {
137 let header = ShmHeader::new(current_pid(), 1, 0, unix_ms_now());
138 reddb_file::initialize_shm_file(&mut file, &header)?;
139 return Ok(ShmHandle {
140 path,
141 header,
142 state: ShmProvisionState::Created,
143 file,
144 });
145 }
146
147 let existing = reddb_file::read_shm_header_from_file(&mut file).ok();
148
149 let (header, state) = match existing {
150 Some(prev) if pid_alive(prev.owner_pid) && prev.owner_pid != current_pid() => {
151 let next = ShmHeader::new(
153 prev.owner_pid,
154 prev.generation,
155 prev.reader_count.saturating_add(1),
156 prev.last_heartbeat_ms,
157 );
158 (next, ShmProvisionState::AttachedToLiveOwner)
159 }
160 Some(prev) if prev.owner_pid == current_pid() => {
161 let next = ShmHeader::new(
163 prev.owner_pid,
164 prev.generation,
165 prev.reader_count,
166 unix_ms_now(),
167 );
168 (next, ShmProvisionState::AttachedToLiveOwner)
169 }
170 Some(prev) => {
171 let next = ShmHeader::new(
173 current_pid(),
174 prev.generation.saturating_add(1),
175 0,
176 unix_ms_now(),
177 );
178 (next, ShmProvisionState::RecoveredFromCrash)
179 }
180 None => {
181 let next = ShmHeader::new(current_pid(), 1, 0, unix_ms_now());
183 reddb_file::initialize_shm_file(&mut file, &next)?;
184 (next, ShmProvisionState::HealedCorruptHeader)
185 }
186 };
187
188 reddb_file::write_shm_header_to_file(&mut file, &header)?;
189
190 Ok(ShmHandle {
191 path,
192 header,
193 state,
194 file,
195 })
196}
197
198pub fn read_shm_header(data_path: &Path) -> io::Result<Option<ShmHeader>> {
202 let path = shm_path_for(data_path);
203 if !path.exists() {
204 return Ok(None);
205 }
206 let mut file = OpenOptions::new().read(true).open(&path)?;
207 reddb_file::read_shm_header_from_file(&mut file).map(Some)
208}
209
/// Returns the current wall-clock time as milliseconds since the Unix epoch.
///
/// Yields `0` if the system clock reports a time before the epoch.
fn unix_ms_now() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_millis() as u64,
        Err(_) => 0,
    }
}
216
/// Returns the operating-system process ID of the calling process.
fn current_pid() -> u32 {
    use std::process;

    process::id()
}
220
221#[cfg(unix)]
222fn pid_alive(pid: u32) -> bool {
223 if pid == 0 {
224 return false;
225 }
226 let rc = unsafe { libc::kill(pid as libc::pid_t, 0) };
229 if rc == 0 {
230 return true;
231 }
232 io::Error::last_os_error()
233 .raw_os_error()
234 .map(|e| e == libc::EPERM)
235 .unwrap_or(false)
236}
237
/// Fallback for non-Unix targets, where no liveness probe is implemented.
///
/// Every PID is reported as alive, so `provision_shm` never takes ownership
/// away from a header written by another process on these platforms.
#[cfg(not(unix))]
fn pid_alive(_pid: u32) -> bool {
    true
}