1use crate::{
9 shared_memory_layout::{required_layout_size, validate_layout, write_layout, SegmentKind},
10 MultiProcessError, MultiProcessResult,
11};
12use shared_memory::{Shmem, ShmemConf};
13use std::mem::align_of;
14use std::ptr::NonNull;
15use std::sync::atomic::{AtomicI64, Ordering};
16
/// Size in bytes of the slot reserved for a single cursor value.
const CACHE_LINE_SIZE: usize = 64;

/// Number of filler bytes that follow the atomic so the struct fills one slot.
const PADDING_LEN: usize = CACHE_LINE_SIZE - std::mem::size_of::<AtomicI64>();

/// An `AtomicI64` padded and aligned so that it occupies a full 64-byte slot.
///
/// The value is placed in shared memory and accessed from several processes,
/// so its layout must not depend on compiler-chosen field ordering. `repr(C)`
/// guarantees that `atomic` sits at offset 0, followed by the padding bytes;
/// the default Rust representation makes no such promise.
#[repr(C, align(64))]
struct PaddedAtomicI64 {
    atomic: AtomicI64,
    _padding: [u8; PADDING_LEN],
}

impl PaddedAtomicI64 {
    /// Builds a padded atomic holding `value`, with zeroed padding bytes.
    fn new(value: i64) -> Self {
        Self {
            atomic: AtomicI64::new(value),
            _padding: [0; PADDING_LEN],
        }
    }
}
35
36pub struct SharedCursor {
42 _shmem: Shmem,
43 cursor_ptr: NonNull<PaddedAtomicI64>,
44 is_owner: bool,
45}
46
47unsafe impl Send for SharedCursor {}
48unsafe impl Sync for SharedCursor {}
49
50impl Clone for SharedCursor {
51 fn clone(&self) -> Self {
52 let os_id = self._shmem.get_os_id();
55
56 Self::attach(os_id)
59 .expect("Failed to clone SharedCursor - shared memory segment should exist")
60 }
61}
62
63impl Drop for SharedCursor {
64 fn drop(&mut self) {
65 if self.is_owner {
66 }
70 }
71}
72
73impl SharedCursor {
/// Guards the constructors against an empty segment name.
///
/// # Panics
///
/// Panics with `"shared cursor name must not be empty"` when `name` is `""`.
fn ensure_name(name: &str) {
    if name.is_empty() {
        panic!("shared cursor name must not be empty");
    }
}
77
78 #[cfg(unix)]
79 fn unlink_shared_segment(name: &str) {
80 use std::ffi::CString;
81 if let Ok(c_str) = CString::new(name) {
82 unsafe {
85 libc::shm_unlink(c_str.as_ptr());
86 }
87 }
88 }
89
90 #[cfg(not(unix))]
91 fn unlink_shared_segment(_name: &str) {
92 }
94
95 pub fn new_auto(initial_value: i64) -> MultiProcessResult<(Self, String)> {
97 let payload_size = std::mem::size_of::<PaddedAtomicI64>();
98 let payload_alignment = align_of::<PaddedAtomicI64>();
99 let shmem = ShmemConf::new()
101 .size(required_layout_size(payload_size, payload_alignment)?)
102 .create() .map_err(|e| MultiProcessError::SharedMemoryError(e.to_string()))?;
104
105 let generated_name = shmem.get_os_id().to_string();
106
107 let contract = write_layout(
108 &shmem,
109 payload_size,
110 payload_size,
111 1,
112 payload_alignment,
113 SegmentKind::Cursor,
114 )?;
115 let ptr = unsafe {
117 shmem.as_ptr().cast::<u8>().add(contract.payload_offset) as *mut PaddedAtomicI64
118 };
119 let cursor_ptr = NonNull::new(ptr)
120 .ok_or_else(|| MultiProcessError::MemoryMapError("Null pointer".to_string()))?;
121
122 unsafe {
124 std::ptr::write(ptr, PaddedAtomicI64::new(initial_value));
125 }
126
127 let cursor = Self {
128 _shmem: shmem,
129 cursor_ptr,
130 is_owner: true, };
132
133 Ok((cursor, generated_name))
134 }
135
136 pub fn new(name: &str, initial_value: i64) -> MultiProcessResult<Self> {
138 Self::ensure_name(name);
139 let shmem = ShmemConf::new()
142 .size(required_layout_size(
143 std::mem::size_of::<PaddedAtomicI64>(),
144 align_of::<PaddedAtomicI64>(),
145 )?)
146 .os_id(name)
147 .create()
148 .map_err(|e| MultiProcessError::SharedMemoryError(e.to_string()))?;
149
150 let contract = write_layout(
151 &shmem,
152 std::mem::size_of::<PaddedAtomicI64>(),
153 std::mem::size_of::<PaddedAtomicI64>(),
154 1,
155 align_of::<PaddedAtomicI64>(),
156 SegmentKind::Cursor,
157 )?;
158 let ptr = unsafe {
160 shmem.as_ptr().cast::<u8>().add(contract.payload_offset) as *mut PaddedAtomicI64
161 };
162 let cursor_ptr = NonNull::new(ptr)
163 .ok_or_else(|| MultiProcessError::MemoryMapError("Null pointer".to_string()))?;
164
165 unsafe {
167 std::ptr::write(ptr, PaddedAtomicI64::new(initial_value));
168 }
169
170 Ok(Self {
171 _shmem: shmem,
172 cursor_ptr,
173 is_owner: true, })
175 }
176
177 pub fn recreate(name: &str, initial_value: i64) -> MultiProcessResult<Self> {
183 Self::ensure_name(name);
184 Self::unlink_shared_segment(name);
185 Self::new(name, initial_value)
186 }
187
188 pub fn attach(name: &str) -> MultiProcessResult<Self> {
190 Self::ensure_name(name);
191 let payload_size = std::mem::size_of::<PaddedAtomicI64>();
192 let payload_alignment = align_of::<PaddedAtomicI64>();
193 let shmem = ShmemConf::new()
194 .os_id(name)
195 .open()
196 .map_err(|e| MultiProcessError::SegmentNotFound(e.to_string()))?;
197
198 let contract = validate_layout(
199 &shmem,
200 payload_size,
201 payload_size,
202 1,
203 payload_alignment,
204 SegmentKind::Cursor,
205 )?;
206 let ptr = unsafe {
207 shmem.as_ptr().cast::<u8>().add(contract.payload_offset) as *mut PaddedAtomicI64
208 };
209 let cursor_ptr = NonNull::new(ptr)
210 .ok_or_else(|| MultiProcessError::MemoryMapError("Null pointer".to_string()))?;
211
212 Ok(Self {
213 _shmem: shmem,
214 cursor_ptr,
215 is_owner: false, })
217 }
218
219 pub fn new_or_attach(name: &str, initial_value: i64) -> MultiProcessResult<Self> {
221 match Self::new(name, initial_value) {
223 Ok(cursor) => Ok(cursor),
224 Err(MultiProcessError::SharedMemoryError(ref msg))
225 if msg.contains("already exists") || msg.contains("File exists") =>
226 {
227 Self::attach(name)
229 }
230 Err(e) => Err(e),
231 }
232 }
233
234 pub fn is_owner(&self) -> bool {
236 self.is_owner
237 }
238
239 pub fn set_owner(&mut self, is_owner: bool) -> bool {
246 let previous = self.is_owner;
247 self._shmem.set_owner(is_owner);
248 self.is_owner = is_owner;
249 previous
250 }
251
252 pub fn load(&self, ordering: Ordering) -> i64 {
254 unsafe { self.cursor_ptr.as_ref().atomic.load(ordering) }
257 }
258
259 pub fn store(&self, value: i64, ordering: Ordering) {
261 unsafe { self.cursor_ptr.as_ref().atomic.store(value, ordering) }
264 }
265
266 pub fn compare_exchange(
268 &self,
269 current: i64,
270 new: i64,
271 success: Ordering,
272 failure: Ordering,
273 ) -> Result<i64, i64> {
274 unsafe {
275 self.cursor_ptr
276 .as_ref()
277 .atomic
278 .compare_exchange(current, new, success, failure)
279 }
280 }
281
282 pub fn compare_exchange_weak(
284 &self,
285 current: i64,
286 new: i64,
287 success: Ordering,
288 failure: Ordering,
289 ) -> Result<i64, i64> {
290 unsafe {
291 self.cursor_ptr
292 .as_ref()
293 .atomic
294 .compare_exchange_weak(current, new, success, failure)
295 }
296 }
297
298 pub fn fetch_add(&self, val: i64, ordering: Ordering) -> i64 {
300 unsafe { self.cursor_ptr.as_ref().atomic.fetch_add(val, ordering) }
303 }
304
305 pub fn swap(&self, val: i64, ordering: Ordering) -> i64 {
307 unsafe { self.cursor_ptr.as_ref().atomic.swap(val, ordering) }
308 }
309}
310
/// Minimal atomic-cell interface over a value of type `T`.
///
/// Implementors expose load, store and compare-and-exchange, each taking
/// explicit memory orderings.
pub trait SharedCursorTrait<T> {
    /// Reads the current value with the given `ordering`.
    fn load(&self, ordering: Ordering) -> T;

    /// Replaces the current value with `value` using the given `ordering`.
    fn store(&self, value: T, ordering: Ordering);

    /// Stores `new` if the current value equals `current`.
    ///
    /// `success` and `failure` are the orderings applied to the respective
    /// outcomes. The result is `Ok` when the exchange happened and `Err`
    /// otherwise; the meaning of the carried value is up to the implementor.
    fn compare_exchange(
        &self,
        current: T,
        new: T,
        success: Ordering,
        failure: Ordering,
    ) -> Result<T, T>;
}
328
329impl SharedCursorTrait<i64> for SharedCursor {
330 fn load(&self, ordering: Ordering) -> i64 {
331 self.load(ordering)
332 }
333
334 fn store(&self, value: i64, ordering: Ordering) {
335 self.store(value, ordering)
336 }
337
338 fn compare_exchange(
339 &self,
340 current: i64,
341 new: i64,
342 success: Ordering,
343 failure: Ordering,
344 ) -> Result<i64, i64> {
345 self.compare_exchange(current, new, success, failure)
346 }
347}
348
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::Ordering;

    /// Builds a segment name that is unique to this test process.
    ///
    /// Shared-memory names are global to the machine, so a fixed name would
    /// collide with a segment left behind by a killed run or with another
    /// test process running at the same time.
    fn unique_name(tag: &str) -> String {
        format!("test_cursor_{}_{}", tag, std::process::id())
    }

    /// Creating a cursor and exercising load, store and fetch_add.
    #[test]
    fn test_shared_cursor_basic_operations() {
        let name = unique_name("basic");
        let cursor = SharedCursor::new(&name, 42).unwrap();

        assert_eq!(cursor.load(Ordering::Relaxed), 42);

        cursor.store(100, Ordering::Relaxed);
        assert_eq!(cursor.load(Ordering::Relaxed), 100);

        let old = cursor.fetch_add(5, Ordering::Relaxed);
        assert_eq!(old, 100);
        assert_eq!(cursor.load(Ordering::Relaxed), 105);
    }

    /// `new` must reject an empty name before touching the system.
    #[test]
    #[should_panic(expected = "shared cursor name must not be empty")]
    fn test_new_cursor_rejects_empty_name() {
        let _ = SharedCursor::new("", 0).unwrap();
    }

    /// `attach` must reject an empty name before touching the system.
    #[test]
    #[should_panic(expected = "shared cursor name must not be empty")]
    fn test_attach_cursor_rejects_empty_name() {
        let _ = SharedCursor::attach("").unwrap();
    }

    /// `recreate` must reject an empty name before unlinking anything.
    #[test]
    #[should_panic(expected = "shared cursor name must not be empty")]
    fn test_recreate_cursor_rejects_empty_name() {
        let _ = SharedCursor::recreate("", 0).unwrap();
    }
}