disruptor_mp/backend/mmap/
cursor.rs1use crate::{
8 shared_memory_layout::{
9 required_layout_size, validate_layout_bytes, write_layout_bytes, SegmentKind,
10 },
11 MmapCursorConfig, MultiProcessError, MultiProcessResult,
12};
13use memmap2::{MmapMut, MmapOptions};
14use std::fs::OpenOptions;
15use std::mem::align_of;
16use std::path::Path;
17use std::ptr::NonNull;
18use std::sync::atomic::{AtomicI64, Ordering};
19
/// Size in bytes of the slot that holds the cursor; the slot is both padded and
/// aligned to this value.
const CACHE_LINE_SIZE: usize = 64;

/// An `AtomicI64` padded out to occupy one full, 64-byte-aligned slot.
///
/// Values of this type are written into a memory-mapped file and read back by
/// every handle that attaches to the same file, so the in-memory layout is part
/// of the on-disk format. `repr(C)` pins `atomic` at offset 0 and `_padding`
/// directly after it; with the default representation the compiler would be
/// free to order the fields differently from one build to the next.
#[repr(C, align(64))]
struct PaddedAtomicI64 {
    /// The cursor value itself.
    atomic: AtomicI64,
    /// Filler that brings the struct up to `CACHE_LINE_SIZE` bytes.
    _padding: [u8; CACHE_LINE_SIZE - std::mem::size_of::<AtomicI64>()],
}

// Compile-time guards: a layout change would silently break every existing mapping.
const _: () = assert!(std::mem::size_of::<PaddedAtomicI64>() == CACHE_LINE_SIZE);
const _: () = assert!(std::mem::align_of::<PaddedAtomicI64>() == CACHE_LINE_SIZE);

impl PaddedAtomicI64 {
    /// Builds a slot holding `value` with zeroed padding.
    fn new(value: i64) -> Self {
        Self {
            atomic: AtomicI64::new(value),
            _padding: [0; CACHE_LINE_SIZE - std::mem::size_of::<AtomicI64>()],
        }
    }
}
38
39pub struct MmapCursor {
41 _mmap: MmapMut,
42 cursor_ptr: NonNull<PaddedAtomicI64>,
43 path: std::path::PathBuf,
44 is_owner: bool,
45}
46
47unsafe impl Send for MmapCursor {}
48unsafe impl Sync for MmapCursor {}
49
50impl Clone for MmapCursor {
51 fn clone(&self) -> Self {
52 Self::attach(MmapCursorConfig {
53 path: self.path.clone(),
54 create: false,
55 })
56 .expect("mmap cursor clone must attach to existing backing file")
57 }
58}
59
60impl Drop for MmapCursor {
61 fn drop(&mut self) {
62 if self.is_owner {
63 }
66 }
67}
68
69impl MmapCursor {
70 pub fn new(config: MmapCursorConfig, initial_value: i64) -> MultiProcessResult<Self> {
72 assert!(
73 !config.path.as_os_str().is_empty(),
74 "mmap cursor path must not be empty"
75 );
76
77 ensure_parent_dir(&config.path)?;
78
79 let payload_size = std::mem::size_of::<PaddedAtomicI64>();
80 let payload_alignment = align_of::<PaddedAtomicI64>();
81 let required_size = required_layout_size(payload_size, payload_alignment)?;
82
83 let file = if config.create {
84 let file = OpenOptions::new()
85 .create(true)
86 .truncate(true)
87 .read(true)
88 .write(true)
89 .open(&config.path)
90 .map_err(|error| MultiProcessError::SharedMemoryError(error.to_string()))?;
91 file.set_len(required_size as u64)
92 .map_err(|error| MultiProcessError::SharedMemoryError(error.to_string()))?;
93 file
94 } else {
95 OpenOptions::new()
96 .read(true)
97 .write(true)
98 .open(&config.path)
99 .map_err(|error| MultiProcessError::SegmentNotFound(error.to_string()))?
100 };
101
102 let mut mmap = unsafe {
103 MmapOptions::new()
104 .map_mut(&file)
105 .map_err(|error| MultiProcessError::MemoryMapError(error.to_string()))?
106 };
107
108 let payload_offset = if config.create {
109 write_layout_bytes(
110 mmap.as_mut_ptr(),
111 mmap.len(),
112 payload_size,
113 payload_size,
114 1,
115 payload_alignment,
116 SegmentKind::Cursor,
117 )?
118 .payload_offset
119 } else {
120 validate_layout_bytes(
121 mmap.as_ptr(),
122 mmap.len(),
123 payload_size,
124 payload_size,
125 1,
126 payload_alignment,
127 SegmentKind::Cursor,
128 )?
129 .payload_offset
130 };
131
132 let ptr = unsafe { mmap.as_mut_ptr().add(payload_offset) as *mut PaddedAtomicI64 };
133 let cursor_ptr = NonNull::new(ptr).ok_or_else(|| {
134 MultiProcessError::MemoryMapError("Null mmap cursor pointer".to_string())
135 })?;
136
137 if config.create {
138 unsafe {
139 std::ptr::write(ptr, PaddedAtomicI64::new(initial_value));
140 }
141 }
142
143 Ok(Self {
144 _mmap: mmap,
145 cursor_ptr,
146 path: config.path,
147 is_owner: config.create,
148 })
149 }
150
151 pub fn attach(config: MmapCursorConfig) -> MultiProcessResult<Self> {
153 assert!(!config.create, "MmapCursor::attach requires create = false");
154 Self::new(config, 0)
155 }
156
157 pub fn new_or_attach(
163 mut config: MmapCursorConfig,
164 initial_value: i64,
165 ) -> MultiProcessResult<Self> {
166 if config.path.exists() {
167 config.create = false;
168 return Self::attach(config);
169 }
170 Self::new(config, initial_value)
171 }
172
173 pub fn is_owner(&self) -> bool {
175 self.is_owner
176 }
177
178 pub fn load(&self, ordering: Ordering) -> i64 {
180 unsafe { self.cursor_ptr.as_ref().atomic.load(ordering) }
181 }
182
183 pub fn store(&self, value: i64, ordering: Ordering) {
185 unsafe { self.cursor_ptr.as_ref().atomic.store(value, ordering) }
186 }
187
188 pub fn compare_exchange(
190 &self,
191 current: i64,
192 new: i64,
193 success: Ordering,
194 failure: Ordering,
195 ) -> Result<i64, i64> {
196 unsafe {
197 self.cursor_ptr
198 .as_ref()
199 .atomic
200 .compare_exchange(current, new, success, failure)
201 }
202 }
203
204 pub fn fetch_add(&self, value: i64, ordering: Ordering) -> i64 {
206 unsafe { self.cursor_ptr.as_ref().atomic.fetch_add(value, ordering) }
207 }
208
209 pub fn swap(&self, value: i64, ordering: Ordering) -> i64 {
211 unsafe { self.cursor_ptr.as_ref().atomic.swap(value, ordering) }
212 }
213
214 pub fn path(&self) -> &Path {
216 &self.path
217 }
218}
219
220fn ensure_parent_dir(path: &Path) -> MultiProcessResult<()> {
221 let Some(parent) = path.parent() else {
222 return Ok(());
223 };
224 if parent.as_os_str().is_empty() {
225 return Ok(());
226 }
227
228 std::fs::create_dir_all(parent)
229 .map_err(|error| MultiProcessError::SharedMemoryError(error.to_string()))
230}
231
#[cfg(test)]
mod tests {
    use super::*;
    use std::path::Path;
    use std::time::{SystemTime, UNIX_EPOCH};

    /// Builds a path in the temp directory named after `prefix`, the current
    /// process id and the current time in nanoseconds.
    fn unique_test_path(prefix: &str) -> std::path::PathBuf {
        let since_epoch = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .expect("system time should be valid");
        let file_name = format!(
            "{prefix}_{}_{}.cursor",
            std::process::id(),
            since_epoch.as_nanos()
        );
        std::env::temp_dir().join(file_name)
    }

    /// Resizes the file at `path` to exactly `len` bytes.
    fn truncate_file(path: &Path, len: u64) {
        OpenOptions::new()
            .write(true)
            .open(path)
            .expect("test file should be reopenable for truncation")
            .set_len(len)
            .expect("test file truncation should succeed");
    }

    /// Cursor configuration for `path` with the given `create` flag.
    fn config_for(path: &Path, create: bool) -> MmapCursorConfig {
        MmapCursorConfig {
            path: path.to_path_buf(),
            create,
        }
    }

    /// Attaches to `path` and returns the resulting error, panicking with
    /// `unexpected_success` if the attach succeeds.
    ///
    /// Written as a `match` because `MmapCursor` does not implement `Debug`,
    /// which `expect_err` would require.
    fn attach_error(path: &Path, unexpected_success: &str) -> MultiProcessError {
        match MmapCursor::attach(config_for(path, false)) {
            Ok(_) => panic!("{}", unexpected_success),
            Err(error) => error,
        }
    }

    #[test]
    fn create_and_attach_share_state() {
        let path = unique_test_path("mmap_cursor");

        let owner = MmapCursor::new(config_for(&path, true), 7).unwrap();
        let attached = MmapCursor::attach(config_for(&path, false)).unwrap();

        // A write through either handle must be visible through the other.
        assert_eq!(attached.load(Ordering::Acquire), 7);
        owner.store(19, Ordering::Release);
        assert_eq!(attached.load(Ordering::Acquire), 19);
        assert_eq!(attached.fetch_add(2, Ordering::AcqRel), 19);
        assert_eq!(owner.load(Ordering::Acquire), 21);

        // Release both mappings before deleting the backing file.
        drop(attached);
        drop(owner);

        let _ = std::fs::remove_file(path);
    }

    #[test]
    fn compare_exchange_round_trip() {
        let path = unique_test_path("mmap_cursor_cas");
        let cursor = MmapCursor::new(config_for(&path, true), 3).unwrap();

        let first_attempt = cursor.compare_exchange(3, 8, Ordering::AcqRel, Ordering::Acquire);
        assert_eq!(first_attempt, Ok(3));
        assert_eq!(cursor.load(Ordering::Acquire), 8);

        // The expected value is stale now, so the exchange reports the actual one.
        let second_attempt = cursor.compare_exchange(3, 9, Ordering::AcqRel, Ordering::Acquire);
        assert_eq!(second_attempt, Err(8));

        drop(cursor);
        let _ = std::fs::remove_file(path);
    }

    #[test]
    fn attach_rejects_truncated_layout_header() {
        let path = unique_test_path("mmap_cursor_truncated_header");

        // Create the segment, then release it so the file can be cut down.
        drop(MmapCursor::new(config_for(&path, true), 11).unwrap());

        truncate_file(&path, 8);
        let error = attach_error(&path, "expected truncated header attach to fail");
        assert!(matches!(
            error,
            MultiProcessError::IncompatibleLayout(message)
                if message.contains("layout header")
        ));

        let _ = std::fs::remove_file(path);
    }

    #[test]
    fn attach_rejects_truncated_payload_region() {
        let path = unique_test_path("mmap_cursor_truncated_payload");

        // Create the segment, then release it so the file can be cut down.
        drop(MmapCursor::new(config_for(&path, true), 17).unwrap());

        // Removing a single byte leaves the header intact but the payload short.
        let full_len = std::fs::metadata(&path)
            .expect("cursor file metadata should exist")
            .len();
        truncate_file(&path, full_len - 1);

        let error = attach_error(&path, "expected truncated payload attach to fail");
        assert!(matches!(
            error,
            MultiProcessError::IncompatibleLayout(message)
                if message.contains("shared segment too small for layout")
        ));

        let _ = std::fs::remove_file(path);
    }
}