use crate::{
shared_memory_layout::{required_layout_size, validate_layout, write_layout, SegmentKind},
MultiProcessError, MultiProcessResult,
};
use shared_memory::{Shmem, ShmemConf};
use std::mem::align_of;
use std::ptr::NonNull;
use std::sync::atomic::{AtomicI64, Ordering};
/// Size in bytes that the shared cursor slot is padded and aligned to.
const CACHE_LINE_SIZE: usize = 64;

/// An `AtomicI64` padded out to, and aligned on, `CACHE_LINE_SIZE` bytes.
///
/// Values of this type are written into a shared-memory segment and accessed
/// by other handles attached to the same segment, so the layout is pinned
/// with `repr(C)`: the atomic is guaranteed to sit at offset 0, followed by
/// the padding bytes. The default Rust representation leaves field placement
/// unspecified.
#[repr(C, align(64))]
struct PaddedAtomicI64 {
    atomic: AtomicI64,
    _padding: [u8; CACHE_LINE_SIZE - std::mem::size_of::<AtomicI64>()],
}

// `repr(align(..))` only accepts a literal, so keep it tied to the constant:
// compilation fails if the two ever drift apart.
const _: () = assert!(std::mem::size_of::<PaddedAtomicI64>() == CACHE_LINE_SIZE);
const _: () = assert!(std::mem::align_of::<PaddedAtomicI64>() == CACHE_LINE_SIZE);

impl PaddedAtomicI64 {
    /// Builds a padded slot holding `value`, with zeroed padding bytes.
    fn new(value: i64) -> Self {
        Self {
            atomic: AtomicI64::new(value),
            _padding: [0; CACHE_LINE_SIZE - std::mem::size_of::<AtomicI64>()],
        }
    }
}
/// Handle to a single `i64` cursor stored inside a shared-memory segment.
///
/// Every accessor goes through the `AtomicI64` inside the mapped
/// `PaddedAtomicI64`, which is why the value can be changed through `&self`.
pub struct SharedCursor {
// Mapping the cursor lives in. `cursor_ptr` is derived from
// `_shmem.as_ptr()`, so the handle is stored next to the pointer
// (presumably so the mapping outlives it - confirm against `Shmem`'s drop).
_shmem: Shmem,
// Address of the padded atomic inside the mapping.
cursor_ptr: NonNull<PaddedAtomicI64>,
// Set to `true` by `new`/`new_auto` and to `false` by `attach`; can be
// changed later through `set_owner`.
is_owner: bool,
}
// SAFETY (review note): `NonNull` makes the struct `!Send`/`!Sync` by default.
// The pointee is only touched through `AtomicI64` operations in this file.
// Whether `Shmem` itself may be moved to or shared with other threads is not
// visible here - TODO confirm against the `shared_memory` crate.
unsafe impl Send for SharedCursor {}
unsafe impl Sync for SharedCursor {}
impl Clone for SharedCursor {
    /// Produces another handle by attaching to the same segment via its OS id.
    ///
    /// The returned handle comes from `attach`, so its `is_owner` flag is
    /// `false` regardless of this handle's flag.
    ///
    /// # Panics
    ///
    /// Panics if attaching fails, e.g. because the segment can no longer be
    /// opened or its layout does not validate.
    fn clone(&self) -> Self {
        Self::attach(self._shmem.get_os_id())
            .expect("Failed to clone SharedCursor - shared memory segment should exist")
    }
}
impl Drop for SharedCursor {
    /// Performs no cursor-specific teardown, for owners and non-owners alike.
    ///
    /// The fields, including the `Shmem` handle, are dropped right after this
    /// returns. Any segment cleanup is therefore whatever `Shmem`'s own drop
    /// does (NOTE(review): expected to depend on its owner flag - confirm).
    fn drop(&mut self) {}
}
impl SharedCursor {
/// Precondition check shared by the name-taking constructors.
///
/// # Panics
///
/// Panics with `"shared cursor name must not be empty"` when `name` is empty.
fn ensure_name(name: &str) {
    if name.is_empty() {
        panic!("shared cursor name must not be empty");
    }
}
/// Best-effort removal of the named shared-memory object (unix only).
///
/// Nothing is reported back: a name containing an interior NUL byte is
/// skipped, and the result of `shm_unlink` is discarded.
#[cfg(unix)]
fn unlink_shared_segment(name: &str) {
    use std::ffi::CString;
    match CString::new(name) {
        Ok(c_name) => {
            // SAFETY: `c_name` is a valid NUL-terminated string that stays
            // alive for the duration of the call.
            let _ = unsafe { libc::shm_unlink(c_name.as_ptr()) };
        }
        // Names with an interior NUL cannot be passed to the C API; ignore.
        Err(_) => {}
    }
}

/// No-op on non-unix targets.
#[cfg(not(unix))]
fn unlink_shared_segment(_name: &str) {}
/// Creates a cursor in a fresh segment whose id is chosen by the
/// shared-memory backend, and returns the handle together with that id.
///
/// The returned handle has `is_owner == true`. The id string is what other
/// handles pass to [`SharedCursor::attach`].
///
/// # Errors
///
/// * whatever `required_layout_size` / `write_layout` report,
/// * `MultiProcessError::SharedMemoryError` if the segment cannot be created,
/// * `MultiProcessError::MemoryMapError` if the mapping address is null.
pub fn new_auto(initial_value: i64) -> MultiProcessResult<(Self, String)> {
    let payload_size = std::mem::size_of::<PaddedAtomicI64>();
    let payload_alignment = align_of::<PaddedAtomicI64>();

    let shmem = ShmemConf::new()
        .size(required_layout_size(payload_size, payload_alignment)?)
        .create()
        .map_err(|e| MultiProcessError::SharedMemoryError(e.to_string()))?;

    // Validate the mapping base before anything offsets or writes through it.
    // Checking only the offset pointer cannot detect a null base.
    let base = NonNull::new(shmem.as_ptr().cast::<u8>())
        .ok_or_else(|| MultiProcessError::MemoryMapError("Null pointer".to_string()))?;

    let generated_name = shmem.get_os_id().to_string();

    let contract = write_layout(
        &shmem,
        payload_size,
        payload_size,
        1,
        payload_alignment,
        SegmentKind::Cursor,
    )?;

    // SAFETY: `base` is non-null and `payload_offset` was produced by
    // `write_layout` for this same mapping. Staying in bounds relies on that
    // helper's contract, which is not visible here.
    let slot = unsafe { base.as_ptr().add(contract.payload_offset) }.cast::<PaddedAtomicI64>();
    let cursor_ptr = NonNull::new(slot)
        .ok_or_else(|| MultiProcessError::MemoryMapError("Null pointer".to_string()))?;

    // SAFETY: same contract as above (in bounds, suitably aligned). The slot
    // is initialised in place without reading the previous bytes.
    unsafe {
        std::ptr::write(cursor_ptr.as_ptr(), PaddedAtomicI64::new(initial_value));
    }

    let cursor = Self {
        _shmem: shmem,
        cursor_ptr,
        is_owner: true,
    };
    Ok((cursor, generated_name))
}
/// Creates a cursor in a new segment called `name`, initialised to
/// `initial_value`. The returned handle has `is_owner == true`.
///
/// # Errors
///
/// * whatever `required_layout_size` / `write_layout` report,
/// * `MultiProcessError::SharedMemoryError` if the segment cannot be created
///   (see `new_or_attach` for the "already exists" case),
/// * `MultiProcessError::MemoryMapError` if the mapping address is null.
///
/// # Panics
///
/// Panics if `name` is empty.
pub fn new(name: &str, initial_value: i64) -> MultiProcessResult<Self> {
    Self::ensure_name(name);

    let payload_size = std::mem::size_of::<PaddedAtomicI64>();
    let payload_alignment = align_of::<PaddedAtomicI64>();

    let shmem = ShmemConf::new()
        .size(required_layout_size(payload_size, payload_alignment)?)
        .os_id(name)
        .create()
        .map_err(|e| MultiProcessError::SharedMemoryError(e.to_string()))?;

    // Validate the mapping base before anything offsets or writes through it.
    // Checking only the offset pointer cannot detect a null base.
    let base = NonNull::new(shmem.as_ptr().cast::<u8>())
        .ok_or_else(|| MultiProcessError::MemoryMapError("Null pointer".to_string()))?;

    // NOTE(review): the layout is written before the cursor value is stored.
    // If `write_layout` is what makes the segment pass `validate_layout`, an
    // attacher could observe the slot before it is initialised - confirm.
    let contract = write_layout(
        &shmem,
        payload_size,
        payload_size,
        1,
        payload_alignment,
        SegmentKind::Cursor,
    )?;

    // SAFETY: `base` is non-null and `payload_offset` was produced by
    // `write_layout` for this same mapping. Staying in bounds relies on that
    // helper's contract, which is not visible here.
    let slot = unsafe { base.as_ptr().add(contract.payload_offset) }.cast::<PaddedAtomicI64>();
    let cursor_ptr = NonNull::new(slot)
        .ok_or_else(|| MultiProcessError::MemoryMapError("Null pointer".to_string()))?;

    // SAFETY: same contract as above (in bounds, suitably aligned). The slot
    // is initialised in place without reading the previous bytes.
    unsafe {
        std::ptr::write(cursor_ptr.as_ptr(), PaddedAtomicI64::new(initial_value));
    }

    Ok(Self {
        _shmem: shmem,
        cursor_ptr,
        is_owner: true,
    })
}
/// Removes any existing segment called `name`, then creates a fresh cursor
/// with `Self::new(name, initial_value)`.
///
/// Returns whatever `new` returns. Panics if `name` is empty.
pub fn recreate(name: &str, initial_value: i64) -> MultiProcessResult<Self> {
Self::ensure_name(name);
// Best effort: the unlink result is ignored, and on non-unix targets the
// helper is a no-op, so `new` may still fail if the segment exists.
Self::unlink_shared_segment(name);
Self::new(name, initial_value)
}
/// Opens the existing segment called `name` and returns a handle to the
/// cursor stored in it. The returned handle has `is_owner == false`, and the
/// stored value is left untouched.
///
/// # Errors
///
/// * `MultiProcessError::SegmentNotFound` if the segment cannot be opened,
/// * whatever `validate_layout` reports for a segment that does not match
///   the expected cursor layout,
/// * `MultiProcessError::MemoryMapError` if the mapping address is null.
///
/// # Panics
///
/// Panics if `name` is empty.
pub fn attach(name: &str) -> MultiProcessResult<Self> {
    Self::ensure_name(name);

    let payload_size = std::mem::size_of::<PaddedAtomicI64>();
    let payload_alignment = align_of::<PaddedAtomicI64>();

    let shmem = ShmemConf::new()
        .os_id(name)
        .open()
        .map_err(|e| MultiProcessError::SegmentNotFound(e.to_string()))?;

    // Validate the mapping base before anything reads or offsets through it.
    // Checking only the offset pointer cannot detect a null base.
    let base = NonNull::new(shmem.as_ptr().cast::<u8>())
        .ok_or_else(|| MultiProcessError::MemoryMapError("Null pointer".to_string()))?;

    let contract = validate_layout(
        &shmem,
        payload_size,
        payload_size,
        1,
        payload_alignment,
        SegmentKind::Cursor,
    )?;

    // SAFETY: `base` is non-null and `payload_offset` was produced by
    // `validate_layout` for this same mapping. Staying in bounds relies on
    // that helper's contract, which is not visible here.
    let slot = unsafe { base.as_ptr().add(contract.payload_offset) }.cast::<PaddedAtomicI64>();
    let cursor_ptr = NonNull::new(slot)
        .ok_or_else(|| MultiProcessError::MemoryMapError("Null pointer".to_string()))?;

    Ok(Self {
        _shmem: shmem,
        cursor_ptr,
        is_owner: false,
    })
}
/// Creates the named cursor, or attaches to it when creation fails because
/// the segment is already there.
///
/// "Already there" is detected from the text of the `SharedMemoryError`
/// message (`"already exists"` or `"File exists"`). Any other creation error
/// is returned unchanged. When attaching, `initial_value` is not applied.
///
/// # Panics
///
/// Panics if `name` is empty.
pub fn new_or_attach(name: &str, initial_value: i64) -> MultiProcessResult<Self> {
    let failure = match Self::new(name, initial_value) {
        Ok(cursor) => return Ok(cursor),
        Err(failure) => failure,
    };

    let name_taken = match &failure {
        MultiProcessError::SharedMemoryError(msg) => {
            msg.contains("already exists") || msg.contains("File exists")
        }
        _ => false,
    };

    if name_taken {
        Self::attach(name)
    } else {
        Err(failure)
    }
}
/// Returns this handle's owner flag: `true` after `new`/`new_auto`, `false`
/// after `attach`, or whatever was last passed to `set_owner`.
pub fn is_owner(&self) -> bool {
self.is_owner
}
/// Sets the owner flag on both the underlying `Shmem` handle and this
/// wrapper, returning the wrapper's previous flag.
pub fn set_owner(&mut self, is_owner: bool) -> bool {
    // The value returned by `Shmem::set_owner` is intentionally unused; the
    // wrapper reports its own previous flag instead.
    self._shmem.set_owner(is_owner);
    let was_owner = self.is_owner;
    self.is_owner = is_owner;
    was_owner
}
/// Atomically reads the cursor value using `ordering`.
pub fn load(&self, ordering: Ordering) -> i64 {
    // SAFETY: `cursor_ptr` is only set by this type's constructors, from the
    // mapping held in `_shmem` for the lifetime of this handle.
    let cell = unsafe { self.cursor_ptr.as_ref() };
    cell.atomic.load(ordering)
}
/// Atomically overwrites the cursor value with `value` using `ordering`.
pub fn store(&self, value: i64, ordering: Ordering) {
    // SAFETY: `cursor_ptr` is only set by this type's constructors, from the
    // mapping held in `_shmem` for the lifetime of this handle.
    let cell = unsafe { self.cursor_ptr.as_ref() };
    cell.atomic.store(value, ordering)
}
/// Forwards to `AtomicI64::compare_exchange` on the shared cursor.
///
/// Returns `Ok(previous)` when the value equalled `current` and was replaced
/// by `new`, otherwise `Err(actual)`.
pub fn compare_exchange(
    &self,
    current: i64,
    new: i64,
    success: Ordering,
    failure: Ordering,
) -> Result<i64, i64> {
    // SAFETY: `cursor_ptr` is only set by this type's constructors, from the
    // mapping held in `_shmem` for the lifetime of this handle.
    let cell = unsafe { self.cursor_ptr.as_ref() };
    cell.atomic.compare_exchange(current, new, success, failure)
}
/// Forwards to `AtomicI64::compare_exchange_weak` on the shared cursor.
///
/// Like `compare_exchange`, but the underlying operation is allowed to fail
/// spuriously, so callers normally retry in a loop.
pub fn compare_exchange_weak(
    &self,
    current: i64,
    new: i64,
    success: Ordering,
    failure: Ordering,
) -> Result<i64, i64> {
    // SAFETY: `cursor_ptr` is only set by this type's constructors, from the
    // mapping held in `_shmem` for the lifetime of this handle.
    let cell = unsafe { self.cursor_ptr.as_ref() };
    cell.atomic
        .compare_exchange_weak(current, new, success, failure)
}
/// Atomically adds `val` to the cursor and returns the value it held before.
pub fn fetch_add(&self, val: i64, ordering: Ordering) -> i64 {
    // SAFETY: `cursor_ptr` is only set by this type's constructors, from the
    // mapping held in `_shmem` for the lifetime of this handle.
    let cell = unsafe { self.cursor_ptr.as_ref() };
    cell.atomic.fetch_add(val, ordering)
}
/// Atomically replaces the cursor with `val` and returns the previous value.
pub fn swap(&self, val: i64, ordering: Ordering) -> i64 {
    // SAFETY: `cursor_ptr` is only set by this type's constructors, from the
    // mapping held in `_shmem` for the lifetime of this handle.
    let cell = unsafe { self.cursor_ptr.as_ref() };
    cell.atomic.swap(val, ordering)
}
}
/// Minimal atomic-cursor interface, generic over the value type `T`.
///
/// `SharedCursor` implements it for `i64` by forwarding to its inherent
/// methods of the same names.
pub trait SharedCursorTrait<T> {
/// Reads the current value using the given memory ordering.
fn load(&self, ordering: Ordering) -> T;
/// Replaces the current value using the given memory ordering.
fn store(&self, value: T, ordering: Ordering);
/// Compare-and-swap taking separate success and failure orderings.
///
/// For the `SharedCursor` implementation the result follows
/// `AtomicI64::compare_exchange`: `Ok(previous)` on success, `Err(actual)`
/// when the stored value differed from `current`.
fn compare_exchange(
&self,
current: T,
new: T,
success: Ordering,
failure: Ordering,
) -> Result<T, T>;
}
impl SharedCursorTrait<i64> for SharedCursor {
fn load(&self, ordering: Ordering) -> i64 {
self.load(ordering)
}
fn store(&self, value: i64, ordering: Ordering) {
self.store(value, ordering)
}
fn compare_exchange(
&self,
current: i64,
new: i64,
success: Ordering,
failure: Ordering,
) -> Result<i64, i64> {
self.compare_exchange(current, new, success, failure)
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::Ordering;

    /// Exercises create / load / store / fetch_add on a named cursor.
    #[test]
    fn test_shared_cursor_basic_operations() {
        // `recreate` instead of `new`: the segment name is fixed, so a
        // segment left behind by an aborted earlier run would make `new` fail
        // with "already exists" on every later run. `recreate` unlinks any
        // stale segment first (on unix) and then goes through `new`.
        let cursor = SharedCursor::recreate("test_cursor", 42).unwrap();
        assert!(cursor.is_owner());
        assert_eq!(cursor.load(Ordering::Relaxed), 42);

        cursor.store(100, Ordering::Relaxed);
        assert_eq!(cursor.load(Ordering::Relaxed), 100);

        let old = cursor.fetch_add(5, Ordering::Relaxed);
        assert_eq!(old, 100);
        assert_eq!(cursor.load(Ordering::Relaxed), 105);
    }

    /// `new` must reject an empty name before touching shared memory.
    #[test]
    #[should_panic(expected = "shared cursor name must not be empty")]
    fn test_new_cursor_rejects_empty_name() {
        let _ = SharedCursor::new("", 0).unwrap();
    }

    /// `attach` must reject an empty name before touching shared memory.
    #[test]
    #[should_panic(expected = "shared cursor name must not be empty")]
    fn test_attach_cursor_rejects_empty_name() {
        let _ = SharedCursor::attach("").unwrap();
    }

    /// `recreate` must reject an empty name before unlinking anything.
    #[test]
    #[should_panic(expected = "shared cursor name must not be empty")]
    fn test_recreate_cursor_rejects_empty_name() {
        let _ = SharedCursor::recreate("", 0).unwrap();
    }
}