Skip to main content

disruptor_mp/lock_free/
cursor.rs

1//! Process-safe cursor operations using shared memory.
2//!
3//! This module provides a [`SharedCursor`] that implements the disruptor pattern's
4//! cursor concept for multi-process scenarios using shared memory. Cursors track
5//! sequence positions and coordinate between producers and consumers across processes
6//! with cache-line padding to prevent false sharing.
7
8use crate::{
9    shared_memory_layout::{required_layout_size, validate_layout, write_layout, SegmentKind},
10    MultiProcessError, MultiProcessResult,
11};
12use shared_memory::{Shmem, ShmemConf};
13use std::mem::align_of;
14use std::ptr::NonNull;
15use std::sync::atomic::{AtomicI64, Ordering};
16
17/// Cache line size for padding (64 bytes on most modern CPUs)
18const CACHE_LINE_SIZE: usize = 64;
19
20/// Padded atomic to prevent false sharing
21#[repr(align(64))]
22struct PaddedAtomicI64 {
23    atomic: AtomicI64,
24    _padding: [u8; CACHE_LINE_SIZE - std::mem::size_of::<AtomicI64>()],
25}
26
27impl PaddedAtomicI64 {
28    fn new(value: i64) -> Self {
29        Self {
30            atomic: AtomicI64::new(value),
31            _padding: [0; CACHE_LINE_SIZE - std::mem::size_of::<AtomicI64>()],
32        }
33    }
34}
35
36/// A shared cursor that can be accessed across processes
37///
38/// This implements the disruptor pattern's cursor concept for multi-process scenarios.
39/// Each cursor tracks a sequence position and can be safely shared between processes
40/// using shared memory with cache-line padding to prevent false sharing.
41pub struct SharedCursor {
42    _shmem: Shmem,
43    cursor_ptr: NonNull<PaddedAtomicI64>,
44    is_owner: bool,
45}
46
47unsafe impl Send for SharedCursor {}
48unsafe impl Sync for SharedCursor {}
49
50impl Clone for SharedCursor {
51    fn clone(&self) -> Self {
52        // Clone by creating a new mapping to the same shared memory
53        // Get the OS ID from the existing shared memory
54        let os_id = self._shmem.get_os_id();
55
56        // Attach to the same shared memory segment
57        // Cloned instances are not owners
58        Self::attach(os_id)
59            .expect("Failed to clone SharedCursor - shared memory segment should exist")
60    }
61}
62
63impl Drop for SharedCursor {
64    fn drop(&mut self) {
65        if self.is_owner {
66            // Cleanup ownership is centralized in shared_memory::Shmem drop.
67            // Avoid explicit unlink here to prevent double-unlink races and
68            // cross-platform backend mismatches.
69        }
70    }
71}
72
73impl SharedCursor {
74    fn ensure_name(name: &str) {
75        assert!(!name.is_empty(), "shared cursor name must not be empty");
76    }
77
78    #[cfg(unix)]
79    fn unlink_shared_segment(name: &str) {
80        use std::ffi::CString;
81        if let Ok(c_str) = CString::new(name) {
82            // Best-effort explicit unlink for crash-recovery workflows.
83            // Ignore errors because segment may not exist.
84            unsafe {
85                libc::shm_unlink(c_str.as_ptr());
86            }
87        }
88    }
89
90    #[cfg(not(unix))]
91    fn unlink_shared_segment(_name: &str) {
92        // Best-effort no-op on non-Unix platforms.
93    }
94
95    /// Create a new shared cursor with automatic naming
96    pub fn new_auto(initial_value: i64) -> MultiProcessResult<(Self, String)> {
97        let payload_size = std::mem::size_of::<PaddedAtomicI64>();
98        let payload_alignment = align_of::<PaddedAtomicI64>();
99        // Let shared_memory crate generate the name automatically
100        let shmem = ShmemConf::new()
101            .size(required_layout_size(payload_size, payload_alignment)?)
102            .create() // No .os_id() = automatic naming
103            .map_err(|e| MultiProcessError::SharedMemoryError(e.to_string()))?;
104
105        let generated_name = shmem.get_os_id().to_string();
106
107        let contract = write_layout(
108            &shmem,
109            payload_size,
110            payload_size,
111            1,
112            payload_alignment,
113            SegmentKind::Cursor,
114        )?;
115        // Map the shared memory
116        let ptr = unsafe {
117            shmem.as_ptr().cast::<u8>().add(contract.payload_offset) as *mut PaddedAtomicI64
118        };
119        let cursor_ptr = NonNull::new(ptr)
120            .ok_or_else(|| MultiProcessError::MemoryMapError("Null pointer".to_string()))?;
121
122        // Initialize the padded atomic value
123        unsafe {
124            std::ptr::write(ptr, PaddedAtomicI64::new(initial_value));
125        }
126
127        let cursor = Self {
128            _shmem: shmem,
129            cursor_ptr,
130            is_owner: true, // Creator is the owner
131        };
132
133        Ok((cursor, generated_name))
134    }
135
136    /// Create a new shared cursor in a new shared memory segment (legacy method)
137    pub fn new(name: &str, initial_value: i64) -> MultiProcessResult<Self> {
138        Self::ensure_name(name);
139        // Do not unlink preemptively: that can replace a live segment and break
140        // existing attachers. Callers should use unique names or explicit cleanup.
141        let shmem = ShmemConf::new()
142            .size(required_layout_size(
143                std::mem::size_of::<PaddedAtomicI64>(),
144                align_of::<PaddedAtomicI64>(),
145            )?)
146            .os_id(name)
147            .create()
148            .map_err(|e| MultiProcessError::SharedMemoryError(e.to_string()))?;
149
150        let contract = write_layout(
151            &shmem,
152            std::mem::size_of::<PaddedAtomicI64>(),
153            std::mem::size_of::<PaddedAtomicI64>(),
154            1,
155            align_of::<PaddedAtomicI64>(),
156            SegmentKind::Cursor,
157        )?;
158        // Map the shared memory
159        let ptr = unsafe {
160            shmem.as_ptr().cast::<u8>().add(contract.payload_offset) as *mut PaddedAtomicI64
161        };
162        let cursor_ptr = NonNull::new(ptr)
163            .ok_or_else(|| MultiProcessError::MemoryMapError("Null pointer".to_string()))?;
164
165        // Initialize the padded atomic value
166        unsafe {
167            std::ptr::write(ptr, PaddedAtomicI64::new(initial_value));
168        }
169
170        Ok(Self {
171            _shmem: shmem,
172            cursor_ptr,
173            is_owner: true, // Creator is the owner
174        })
175    }
176
177    /// Explicitly recreate a shared cursor by unlinking and creating a fresh segment.
178    ///
179    /// This is intended for crash-restart recovery paths when a stale segment may
180    /// remain after an unclean shutdown. This operation is destructive for any
181    /// currently attached process using the same name.
182    pub fn recreate(name: &str, initial_value: i64) -> MultiProcessResult<Self> {
183        Self::ensure_name(name);
184        Self::unlink_shared_segment(name);
185        Self::new(name, initial_value)
186    }
187
188    /// Attach to an existing shared cursor in an existing shared memory segment
189    pub fn attach(name: &str) -> MultiProcessResult<Self> {
190        Self::ensure_name(name);
191        let payload_size = std::mem::size_of::<PaddedAtomicI64>();
192        let payload_alignment = align_of::<PaddedAtomicI64>();
193        let shmem = ShmemConf::new()
194            .os_id(name)
195            .open()
196            .map_err(|e| MultiProcessError::SegmentNotFound(e.to_string()))?;
197
198        let contract = validate_layout(
199            &shmem,
200            payload_size,
201            payload_size,
202            1,
203            payload_alignment,
204            SegmentKind::Cursor,
205        )?;
206        let ptr = unsafe {
207            shmem.as_ptr().cast::<u8>().add(contract.payload_offset) as *mut PaddedAtomicI64
208        };
209        let cursor_ptr = NonNull::new(ptr)
210            .ok_or_else(|| MultiProcessError::MemoryMapError("Null pointer".to_string()))?;
211
212        Ok(Self {
213            _shmem: shmem,
214            cursor_ptr,
215            is_owner: false, // Attacher is not the owner
216        })
217    }
218
219    /// Create a new shared cursor, or attach to existing one if it already exists
220    pub fn new_or_attach(name: &str, initial_value: i64) -> MultiProcessResult<Self> {
221        // First try to create
222        match Self::new(name, initial_value) {
223            Ok(cursor) => Ok(cursor),
224            Err(MultiProcessError::SharedMemoryError(ref msg))
225                if msg.contains("already exists") || msg.contains("File exists") =>
226            {
227                // If creation fails because segment already exists, try to attach
228                Self::attach(name)
229            }
230            Err(e) => Err(e),
231        }
232    }
233
234    /// Return whether this mapping currently owns unlink responsibility.
235    pub fn is_owner(&self) -> bool {
236        self.is_owner
237    }
238
239    /// Transfer or release ownership of the underlying shared-memory name.
240    ///
241    /// Consumer sequence cursors use this to become persistent restart anchors:
242    /// the first attaching consumer may create the cursor segment, but it must not
243    /// unlink that name when the process exits, or subsequent restarts would reset
244    /// the logical consumer position back to the initial value.
245    pub fn set_owner(&mut self, is_owner: bool) -> bool {
246        let previous = self.is_owner;
247        self._shmem.set_owner(is_owner);
248        self.is_owner = is_owner;
249        previous
250    }
251
252    /// Load the current value
253    pub fn load(&self, ordering: Ordering) -> i64 {
254        // Caller-supplied ordering allows callers in producer/consumer layers
255        // to enforce the required synchronizes-with relation in each hot path.
256        unsafe { self.cursor_ptr.as_ref().atomic.load(ordering) }
257    }
258
259    /// Store a new value
260    pub fn store(&self, value: i64, ordering: Ordering) {
261        // Store ordering is selected by the caller because this type is shared
262        // across producers/consumers with different release/acquire needs.
263        unsafe { self.cursor_ptr.as_ref().atomic.store(value, ordering) }
264    }
265
266    /// Compare and swap
267    pub fn compare_exchange(
268        &self,
269        current: i64,
270        new: i64,
271        success: Ordering,
272        failure: Ordering,
273    ) -> Result<i64, i64> {
274        unsafe {
275            self.cursor_ptr
276                .as_ref()
277                .atomic
278                .compare_exchange(current, new, success, failure)
279        }
280    }
281
282    /// Compare and swap (weak version, may fail spuriously)
283    pub fn compare_exchange_weak(
284        &self,
285        current: i64,
286        new: i64,
287        success: Ordering,
288        failure: Ordering,
289    ) -> Result<i64, i64> {
290        unsafe {
291            self.cursor_ptr
292                .as_ref()
293                .atomic
294                .compare_exchange_weak(current, new, success, failure)
295        }
296    }
297
298    /// Fetch and add
299    pub fn fetch_add(&self, val: i64, ordering: Ordering) -> i64 {
300        // Caller controls ordering for backpressure and publication fences
301        // at the ring-buffer protocol level.
302        unsafe { self.cursor_ptr.as_ref().atomic.fetch_add(val, ordering) }
303    }
304
305    /// Atomic exchange
306    pub fn swap(&self, val: i64, ordering: Ordering) -> i64 {
307        unsafe { self.cursor_ptr.as_ref().atomic.swap(val, ordering) }
308    }
309}
310
311/// Generic shared cursor trait
312pub trait SharedCursorTrait<T> {
313    /// Load the current value with the given memory ordering
314    fn load(&self, ordering: Ordering) -> T;
315
316    /// Store a new value with the given memory ordering
317    fn store(&self, value: T, ordering: Ordering);
318
319    /// Compare and exchange operation with success and failure memory orderings
320    fn compare_exchange(
321        &self,
322        current: T,
323        new: T,
324        success: Ordering,
325        failure: Ordering,
326    ) -> Result<T, T>;
327}
328
329impl SharedCursorTrait<i64> for SharedCursor {
330    fn load(&self, ordering: Ordering) -> i64 {
331        self.load(ordering)
332    }
333
334    fn store(&self, value: i64, ordering: Ordering) {
335        self.store(value, ordering)
336    }
337
338    fn compare_exchange(
339        &self,
340        current: i64,
341        new: i64,
342        success: Ordering,
343        failure: Ordering,
344    ) -> Result<i64, i64> {
345        self.compare_exchange(current, new, success, failure)
346    }
347}
348
349#[cfg(test)]
350mod tests {
351    use super::*;
352    use std::sync::atomic::Ordering;
353
354    #[test]
355    fn test_shared_cursor_basic_operations() {
356        let cursor = SharedCursor::new("test_cursor", 42).unwrap();
357
358        assert_eq!(cursor.load(Ordering::Relaxed), 42);
359
360        cursor.store(100, Ordering::Relaxed);
361        assert_eq!(cursor.load(Ordering::Relaxed), 100);
362
363        let old = cursor.fetch_add(5, Ordering::Relaxed);
364        assert_eq!(old, 100);
365        assert_eq!(cursor.load(Ordering::Relaxed), 105);
366    }
367
368    #[test]
369    #[should_panic(expected = "shared cursor name must not be empty")]
370    fn test_new_cursor_rejects_empty_name() {
371        let _ = SharedCursor::new("", 0).unwrap();
372    }
373
374    #[test]
375    #[should_panic(expected = "shared cursor name must not be empty")]
376    fn test_attach_cursor_rejects_empty_name() {
377        let _ = SharedCursor::attach("").unwrap();
378    }
379
380    #[test]
381    #[should_panic(expected = "shared cursor name must not be empty")]
382    fn test_recreate_cursor_rejects_empty_name() {
383        let _ = SharedCursor::recreate("", 0).unwrap();
384    }
385}