Skip to main content

disruptor_mp/backend/mmap/
cursor.rs

//! File-backed mmap cursor for multi-process coordination.
//!
//! This mirrors [`crate::SharedCursor`] but uses a file-backed mapping so the
//! mmap backend can share the same atomic coordination model as the shared
//! memory backend.
6
7use crate::{
8    shared_memory_layout::{
9        required_layout_size, validate_layout_bytes, write_layout_bytes, SegmentKind,
10    },
11    MmapCursorConfig, MultiProcessError, MultiProcessResult,
12};
13use memmap2::{MmapMut, MmapOptions};
14use std::fs::OpenOptions;
15use std::mem::align_of;
16use std::path::Path;
17use std::ptr::NonNull;
18use std::sync::atomic::{AtomicI64, Ordering};
19
/// Cache line size for padding (64 bytes on most modern CPUs).
///
/// Attributes cannot reference constants, so the literal in
/// `#[repr(C, align(64))]` below must be kept in sync with this value by hand;
/// the compile-time assertion after the struct catches a mismatch.
const CACHE_LINE_SIZE: usize = 64;

/// Padded atomic to prevent false sharing.
///
/// The value is written into a file-backed mapping that other processes read,
/// so its in-memory layout is part of the on-disk format. `repr(C)` pins the
/// field order (`atomic` at offset 0, padding after it); with the default Rust
/// representation the compiler is free to reorder fields, which would leave
/// the offset of `atomic` unspecified across compiler versions.
#[repr(C, align(64))]
struct PaddedAtomicI64 {
    /// The shared sequence value; always accessed through atomic operations.
    atomic: AtomicI64,
    /// Filler bringing the struct up to one full cache line.
    _padding: [u8; CACHE_LINE_SIZE - std::mem::size_of::<AtomicI64>()],
}

// Compile-time guard: the struct must occupy exactly one cache line and be
// aligned to it, otherwise the padding no longer isolates the atomic.
const _: () = assert!(
    std::mem::size_of::<PaddedAtomicI64>() == CACHE_LINE_SIZE
        && std::mem::align_of::<PaddedAtomicI64>() == CACHE_LINE_SIZE
);

impl PaddedAtomicI64 {
    /// Build a padded atomic holding `value`, with the padding zero-filled.
    fn new(value: i64) -> Self {
        Self {
            atomic: AtomicI64::new(value),
            _padding: [0; CACHE_LINE_SIZE - std::mem::size_of::<AtomicI64>()],
        }
    }
}
38
/// File-backed mmap cursor with cache-line padding.
///
/// Each handle owns its own mapping of the backing file and exposes the
/// `AtomicI64` stored inside it through `load`/`store`/`compare_exchange`/
/// `fetch_add`/`swap`.
pub struct MmapCursor {
    /// The mapping itself; held only to keep the memory behind `cursor_ptr`
    /// mapped for the lifetime of the handle.
    _mmap: MmapMut,
    /// Address of the padded atomic, located at the layout's payload offset
    /// inside `_mmap`.
    cursor_ptr: NonNull<PaddedAtomicI64>,
    /// Path of the backing file; `Clone` re-attaches through it.
    path: std::path::PathBuf,
    /// `true` when this handle was opened with `create = true`.
    is_owner: bool,
}
46
// SAFETY: `cursor_ptr` points into the mapping held by `_mmap`, which lives
// exactly as long as the handle, so the pointer stays valid wherever the
// handle is moved. After construction the pointee is only touched through
// `AtomicI64` operations, which are safe to issue from any thread.
unsafe impl Send for MmapCursor {}
// SAFETY: every `&self` method performs a single atomic operation on the
// pointee and reads otherwise immutable fields, so shared references may be
// used from several threads at once.
unsafe impl Sync for MmapCursor {}
49
50impl Clone for MmapCursor {
51    fn clone(&self) -> Self {
52        Self::attach(MmapCursorConfig {
53            path: self.path.clone(),
54            create: false,
55        })
56        .expect("mmap cursor clone must attach to existing backing file")
57    }
58}
59
60impl Drop for MmapCursor {
61    fn drop(&mut self) {
62        if self.is_owner {
63            // File lifecycle remains explicit while the mmap backend is still
64            // stabilizing and attachers may outlive the creator.
65        }
66    }
67}
68
69impl MmapCursor {
70    /// Create or attach a file-backed mmap cursor.
71    pub fn new(config: MmapCursorConfig, initial_value: i64) -> MultiProcessResult<Self> {
72        assert!(
73            !config.path.as_os_str().is_empty(),
74            "mmap cursor path must not be empty"
75        );
76
77        ensure_parent_dir(&config.path)?;
78
79        let payload_size = std::mem::size_of::<PaddedAtomicI64>();
80        let payload_alignment = align_of::<PaddedAtomicI64>();
81        let required_size = required_layout_size(payload_size, payload_alignment)?;
82
83        let file = if config.create {
84            let file = OpenOptions::new()
85                .create(true)
86                .truncate(true)
87                .read(true)
88                .write(true)
89                .open(&config.path)
90                .map_err(|error| MultiProcessError::SharedMemoryError(error.to_string()))?;
91            file.set_len(required_size as u64)
92                .map_err(|error| MultiProcessError::SharedMemoryError(error.to_string()))?;
93            file
94        } else {
95            OpenOptions::new()
96                .read(true)
97                .write(true)
98                .open(&config.path)
99                .map_err(|error| MultiProcessError::SegmentNotFound(error.to_string()))?
100        };
101
102        let mut mmap = unsafe {
103            MmapOptions::new()
104                .map_mut(&file)
105                .map_err(|error| MultiProcessError::MemoryMapError(error.to_string()))?
106        };
107
108        let payload_offset = if config.create {
109            write_layout_bytes(
110                mmap.as_mut_ptr(),
111                mmap.len(),
112                payload_size,
113                payload_size,
114                1,
115                payload_alignment,
116                SegmentKind::Cursor,
117            )?
118            .payload_offset
119        } else {
120            validate_layout_bytes(
121                mmap.as_ptr(),
122                mmap.len(),
123                payload_size,
124                payload_size,
125                1,
126                payload_alignment,
127                SegmentKind::Cursor,
128            )?
129            .payload_offset
130        };
131
132        let ptr = unsafe { mmap.as_mut_ptr().add(payload_offset) as *mut PaddedAtomicI64 };
133        let cursor_ptr = NonNull::new(ptr).ok_or_else(|| {
134            MultiProcessError::MemoryMapError("Null mmap cursor pointer".to_string())
135        })?;
136
137        if config.create {
138            unsafe {
139                std::ptr::write(ptr, PaddedAtomicI64::new(initial_value));
140            }
141        }
142
143        Ok(Self {
144            _mmap: mmap,
145            cursor_ptr,
146            path: config.path,
147            is_owner: config.create,
148        })
149    }
150
151    /// Attach to an existing file-backed mmap cursor.
152    pub fn attach(config: MmapCursorConfig) -> MultiProcessResult<Self> {
153        assert!(!config.create, "MmapCursor::attach requires create = false");
154        Self::new(config, 0)
155    }
156
157    /// Create a new cursor if it does not exist, or attach to the existing one.
158    ///
159    /// This is the mmap equivalent of `SharedCursor::new_or_attach` and is required
160    /// for restart paths where a logical consumer must reattach to its existing
161    /// sequence cursor instead of truncating it back to the initial value.
162    pub fn new_or_attach(
163        mut config: MmapCursorConfig,
164        initial_value: i64,
165    ) -> MultiProcessResult<Self> {
166        if config.path.exists() {
167            config.create = false;
168            return Self::attach(config);
169        }
170        Self::new(config, initial_value)
171    }
172
173    /// Return whether this mapping created the backing file.
174    pub fn is_owner(&self) -> bool {
175        self.is_owner
176    }
177
178    /// Load the current value.
179    pub fn load(&self, ordering: Ordering) -> i64 {
180        unsafe { self.cursor_ptr.as_ref().atomic.load(ordering) }
181    }
182
183    /// Store a new value.
184    pub fn store(&self, value: i64, ordering: Ordering) {
185        unsafe { self.cursor_ptr.as_ref().atomic.store(value, ordering) }
186    }
187
188    /// Compare and swap.
189    pub fn compare_exchange(
190        &self,
191        current: i64,
192        new: i64,
193        success: Ordering,
194        failure: Ordering,
195    ) -> Result<i64, i64> {
196        unsafe {
197            self.cursor_ptr
198                .as_ref()
199                .atomic
200                .compare_exchange(current, new, success, failure)
201        }
202    }
203
204    /// Fetch and add.
205    pub fn fetch_add(&self, value: i64, ordering: Ordering) -> i64 {
206        unsafe { self.cursor_ptr.as_ref().atomic.fetch_add(value, ordering) }
207    }
208
209    /// Atomic swap.
210    pub fn swap(&self, value: i64, ordering: Ordering) -> i64 {
211        unsafe { self.cursor_ptr.as_ref().atomic.swap(value, ordering) }
212    }
213
214    /// Return the backing path for this mmap cursor.
215    pub fn path(&self) -> &Path {
216        &self.path
217    }
218}
219
220fn ensure_parent_dir(path: &Path) -> MultiProcessResult<()> {
221    let Some(parent) = path.parent() else {
222        return Ok(());
223    };
224    if parent.as_os_str().is_empty() {
225        return Ok(());
226    }
227
228    std::fs::create_dir_all(parent)
229        .map_err(|error| MultiProcessError::SharedMemoryError(error.to_string()))
230}
231
#[cfg(test)]
mod tests {
    use super::*;
    use std::path::{Path, PathBuf};
    use std::time::{SystemTime, UNIX_EPOCH};

    /// Build a temp-dir path that is unique per test prefix, process and
    /// call time.
    fn unique_test_path(prefix: &str) -> PathBuf {
        let pid = std::process::id();
        let nanos = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .expect("system time should be valid")
            .as_nanos();
        std::env::temp_dir().join(format!("{prefix}_{pid}_{nanos}.cursor"))
    }

    /// Shrink (or grow) an existing file to exactly `len` bytes.
    fn truncate_file(path: &Path, len: u64) {
        let file = OpenOptions::new()
            .write(true)
            .open(path)
            .expect("test file should be reopenable for truncation");
        file.set_len(len)
            .expect("test file truncation should succeed");
    }

    /// Unique backing-file path that is removed on drop.
    ///
    /// Cleanup runs during unwinding too, so a failing assertion no longer
    /// leaves the cursor file behind in the temp directory. Declare the guard
    /// before any cursor so the cursors (and their mappings) are dropped
    /// first.
    struct TempCursorFile {
        path: PathBuf,
    }

    impl TempCursorFile {
        fn new(prefix: &str) -> Self {
            Self {
                path: unique_test_path(prefix),
            }
        }

        /// Config pointing at this file with the requested `create` mode.
        fn config(&self, create: bool) -> MmapCursorConfig {
            MmapCursorConfig {
                path: self.path.clone(),
                create,
            }
        }
    }

    impl Drop for TempCursorFile {
        fn drop(&mut self) {
            // Best effort: the file may legitimately be missing already.
            let _ = std::fs::remove_file(&self.path);
        }
    }

    #[test]
    fn create_and_attach_share_state() {
        let file = TempCursorFile::new("mmap_cursor");

        let owner = MmapCursor::new(file.config(true), 7).unwrap();
        let attached = MmapCursor::attach(file.config(false)).unwrap();

        assert_eq!(attached.load(Ordering::Acquire), 7);
        owner.store(19, Ordering::Release);
        assert_eq!(attached.load(Ordering::Acquire), 19);
        assert_eq!(attached.fetch_add(2, Ordering::AcqRel), 19);
        assert_eq!(owner.load(Ordering::Acquire), 21);
    }

    #[test]
    fn compare_exchange_round_trip() {
        let file = TempCursorFile::new("mmap_cursor_cas");

        let cursor = MmapCursor::new(file.config(true), 3).unwrap();
        assert_eq!(
            cursor.compare_exchange(3, 8, Ordering::AcqRel, Ordering::Acquire),
            Ok(3)
        );
        assert_eq!(cursor.load(Ordering::Acquire), 8);
        assert_eq!(
            cursor.compare_exchange(3, 9, Ordering::AcqRel, Ordering::Acquire),
            Err(8)
        );
    }

    #[test]
    fn new_or_attach_reuses_existing_value() {
        let file = TempCursorFile::new("mmap_cursor_new_or_attach");

        // First call finds no file and creates it with the initial value.
        let first = MmapCursor::new_or_attach(file.config(true), 5).unwrap();
        assert!(first.is_owner());
        assert_eq!(first.load(Ordering::Acquire), 5);
        first.store(23, Ordering::Release);
        drop(first);

        // Second call must attach and keep the stored value rather than
        // truncating back to the initial value.
        let second = MmapCursor::new_or_attach(file.config(true), 5).unwrap();
        assert!(!second.is_owner());
        assert_eq!(second.load(Ordering::Acquire), 23);
    }

    #[test]
    fn attach_rejects_truncated_layout_header() {
        let file = TempCursorFile::new("mmap_cursor_truncated_header");

        // Create a valid cursor file, then release the mapping before
        // corrupting it.
        drop(MmapCursor::new(file.config(true), 11).unwrap());

        truncate_file(&file.path, 8);
        let error = match MmapCursor::attach(file.config(false)) {
            Ok(_) => panic!("expected truncated header attach to fail"),
            Err(error) => error,
        };
        assert!(matches!(
            error,
            MultiProcessError::IncompatibleLayout(message)
                if message.contains("layout header")
        ));
    }

    #[test]
    fn attach_rejects_truncated_payload_region() {
        let file = TempCursorFile::new("mmap_cursor_truncated_payload");

        drop(MmapCursor::new(file.config(true), 17).unwrap());

        let file_len = std::fs::metadata(&file.path)
            .expect("cursor file metadata should exist")
            .len();
        truncate_file(&file.path, file_len - 1);
        let error = match MmapCursor::attach(file.config(false)) {
            Ok(_) => panic!("expected truncated payload attach to fail"),
            Err(error) => error,
        };
        assert!(matches!(
            error,
            MultiProcessError::IncompatibleLayout(message)
                if message.contains("shared segment too small for layout")
        ));
    }
}