Skip to main content

memlink_shm/
transport.rs

1//! High-level shared memory transport trait with daemon-client architecture.
2//! Provides priority-based messaging, futex signaling, and connection management.
3
4use crate::buffer::RingBufferError;
5use crate::control::ControlRegion;
6use crate::futex::{Futex, FutexError};
7use crate::mmap::MmapSegment;
8use crate::priority::Priority;
9use crate::pring::PriorityRingBuffer;
10use std::sync::atomic::{AtomicBool, Ordering};
11use std::time::Duration;
12
13#[derive(Debug, Clone, Copy, PartialEq, Eq)]
14pub enum ShmError {
15    Disconnected,
16    Timeout,
17    BufferFull,
18    ProtocolMismatch,
19    InvalidState,
20    MessageTooLarge,
21    Other(&'static str),
22}
23
24impl std::fmt::Display for ShmError {
25    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
26        match self {
27            ShmError::Disconnected => write!(f, "Shared memory disconnected"),
28            ShmError::Timeout => write!(f, "Operation timed out"),
29            ShmError::BufferFull => write!(f, "Buffer is full"),
30            ShmError::ProtocolMismatch => write!(f, "Protocol version mismatch"),
31            ShmError::InvalidState => write!(f, "Invalid state for operation"),
32            ShmError::MessageTooLarge => write!(f, "Message too large"),
33            ShmError::Other(msg) => write!(f, "Error: {}", msg),
34        }
35    }
36}
37
38impl std::error::Error for ShmError {}
39
40impl From<RingBufferError> for ShmError {
41    fn from(err: RingBufferError) -> Self {
42        match err {
43            RingBufferError::Full => ShmError::BufferFull,
44            RingBufferError::Empty => ShmError::InvalidState,
45            RingBufferError::InvalidState => ShmError::InvalidState,
46            RingBufferError::DataTooLarge => ShmError::MessageTooLarge,
47        }
48    }
49}
50
51impl From<FutexError> for ShmError {
52    fn from(err: FutexError) -> Self {
53        match err {
54            FutexError::Timeout => ShmError::Timeout,
55            FutexError::Interrupted => ShmError::InvalidState,
56            FutexError::InvalidArgument => ShmError::InvalidState,
57            FutexError::Unsupported => ShmError::Other("Futex unsupported"),
58            FutexError::Other(_) => ShmError::Other("Futex error"),
59        }
60    }
61}
62
63pub type ShmResult<T> = Result<T, ShmError>;
64
65pub trait ShmTransport {
66    fn write(&self, priority: Priority, data: &[u8]) -> ShmResult<()>;
67    fn read(&self) -> ShmResult<(Priority, Vec<u8>)>;
68    fn signal(&self);
69    fn wait(&self, timeout: Option<Duration>) -> ShmResult<()>;
70    fn is_connected(&self) -> bool;
71    fn protocol_version(&self) -> u16;
72}
73
74pub struct NrelayShmTransport {
75    #[allow(dead_code)]
76    mmap: MmapSegment,
77    control: *const ControlRegion,
78    ring: PriorityRingBuffer,
79    futex: Futex,
80    is_daemon: bool,
81    connected: AtomicBool,
82    expected_version: u16,
83}
84
85unsafe impl Send for NrelayShmTransport {}
86unsafe impl Sync for NrelayShmTransport {}
87
88impl NrelayShmTransport {
89    pub fn create(path: &str, size: usize, version: u16) -> ShmResult<Self> {
90        if size < 8192 {
91            return Err(ShmError::InvalidState);
92        }
93
94        let mut mmap = MmapSegment::create(path, size)
95            .map_err(|_| ShmError::Other("Failed to create mmap"))?;
96
97        let control_ptr = mmap.as_slice().as_ptr() as *const ControlRegion;
98        let control_mut = mmap.as_mut_slice().as_mut_ptr() as *mut ControlRegion;
99
100        unsafe {
101            ControlRegion::init(control_mut);
102            (*control_mut).data.version.store(version, Ordering::Release);
103            (*control_mut).data.daemon_alive.store(true, Ordering::Release);
104            let _ = mmap.flush();
105        }
106
107        let ring = PriorityRingBuffer::new(256)
108            .map_err(|_| ShmError::Other("Failed to create ring buffer"))?;
109
110        let futex = Futex::new(0);
111
112        Ok(Self {
113            mmap,
114            control: control_ptr,
115            ring,
116            futex,
117            is_daemon: true,
118            connected: AtomicBool::new(true),
119            expected_version: version,
120        })
121    }
122
123    pub fn connect(path: &str, version: u16) -> ShmResult<Self> {
124        let mmap = MmapSegment::open(path, 8192)
125            .map_err(|_| ShmError::Disconnected)?;
126
127        let control_ptr = mmap.as_slice().as_ptr() as *const ControlRegion;
128
129        unsafe {
130            let actual_version = (*control_ptr).data.version.load(Ordering::Acquire);
131            if actual_version != version {
132                return Err(ShmError::ProtocolMismatch);
133            }
134        }
135
136        let ring = PriorityRingBuffer::new(256)
137            .map_err(|_| ShmError::Other("Failed to create ring buffer"))?;
138
139        let futex = Futex::new(0);
140
141        unsafe {
142            (*control_ptr).data.client_count.fetch_add(1, Ordering::AcqRel);
143        }
144
145        Ok(Self {
146            mmap,
147            control: control_ptr,
148            ring,
149            futex,
150            is_daemon: false,
151            connected: AtomicBool::new(true),
152            expected_version: version,
153        })
154    }
155
156    pub fn backpressure(&self) -> f32 {
157        unsafe {
158            (*self.control).backpressure()
159        }
160    }
161
162    pub fn set_backpressure(&self, value: f32) {
163        unsafe {
164            (*self.control).set_backpressure(value);
165        }
166    }
167
168    pub fn is_daemon_alive(&self) -> bool {
169        unsafe {
170            (*self.control).data.daemon_alive.load(Ordering::Acquire)
171        }
172    }
173
174    pub fn client_count(&self) -> u32 {
175        unsafe {
176            (*self.control).data.client_count.load(Ordering::Acquire)
177        }
178    }
179
180    pub fn shutdown(&self) {
181        if self.is_daemon {
182            unsafe {
183                (*self.control).data.daemon_alive.store(false, Ordering::Release);
184            }
185        }
186        self.connected.store(false, Ordering::Release);
187    }
188}
189
190impl ShmTransport for NrelayShmTransport {
191    fn write(&self, priority: Priority, data: &[u8]) -> ShmResult<()> {
192        if !self.connected.load(Ordering::Acquire) {
193            return Err(ShmError::Disconnected);
194        }
195
196        if data.is_empty() {
197            return Err(ShmError::InvalidState);
198        }
199
200        self.ring.write(priority, data)?;
201        Ok(())
202    }
203
204    fn read(&self) -> ShmResult<(Priority, Vec<u8>)> {
205        if !self.connected.load(Ordering::Acquire) {
206            return Err(ShmError::Disconnected);
207        }
208
209        self.ring.read()
210            .ok_or(ShmError::InvalidState)
211    }
212
213    fn signal(&self) {
214        self.futex.wake_one();
215    }
216
217    fn wait(&self, timeout: Option<Duration>) -> ShmResult<()> {
218        if !self.connected.load(Ordering::Acquire) {
219            return Err(ShmError::Disconnected);
220        }
221
222        self.futex.wait(0, timeout)?;
223        Ok(())
224    }
225
226    fn is_connected(&self) -> bool {
227        self.connected.load(Ordering::Acquire) && self.is_daemon_alive()
228    }
229
230    fn protocol_version(&self) -> u16 {
231        self.expected_version
232    }
233}
234
235impl Drop for NrelayShmTransport {
236    fn drop(&mut self) {
237        if !self.is_daemon {
238            unsafe {
239                (*self.control).data.client_count.fetch_sub(1, Ordering::AcqRel);
240            }
241        }
242        self.connected.store(false, Ordering::Release);
243    }
244}