1use crate::buffer::RingBufferError;
5use crate::control::ControlRegion;
6use crate::futex::{Futex, FutexError};
7use crate::mmap::MmapSegment;
8use crate::priority::Priority;
9use crate::pring::PriorityRingBuffer;
10use std::sync::atomic::{AtomicBool, Ordering};
11use std::time::Duration;
12
/// Errors surfaced by the shared-memory transport.
///
/// The enum is `Copy` and comparable, so values can be matched, stored, and
/// compared without allocation.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ShmError {
    /// The shared-memory transport is disconnected.
    Disconnected,
    /// An operation timed out.
    Timeout,
    /// The buffer has no room for more data.
    BufferFull,
    /// The protocol version did not match the expected one.
    ProtocolMismatch,
    /// The operation is not valid in the current state.
    InvalidState,
    /// The message exceeds the size the transport can carry.
    MessageTooLarge,
    /// Any other failure, described by a static message.
    Other(&'static str),
}
23
24impl std::fmt::Display for ShmError {
25 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
26 match self {
27 ShmError::Disconnected => write!(f, "Shared memory disconnected"),
28 ShmError::Timeout => write!(f, "Operation timed out"),
29 ShmError::BufferFull => write!(f, "Buffer is full"),
30 ShmError::ProtocolMismatch => write!(f, "Protocol version mismatch"),
31 ShmError::InvalidState => write!(f, "Invalid state for operation"),
32 ShmError::MessageTooLarge => write!(f, "Message too large"),
33 ShmError::Other(msg) => write!(f, "Error: {}", msg),
34 }
35 }
36}
37
38impl std::error::Error for ShmError {}
39
40impl From<RingBufferError> for ShmError {
41 fn from(err: RingBufferError) -> Self {
42 match err {
43 RingBufferError::Full => ShmError::BufferFull,
44 RingBufferError::Empty => ShmError::InvalidState,
45 RingBufferError::InvalidState => ShmError::InvalidState,
46 RingBufferError::DataTooLarge => ShmError::MessageTooLarge,
47 }
48 }
49}
50
51impl From<FutexError> for ShmError {
52 fn from(err: FutexError) -> Self {
53 match err {
54 FutexError::Timeout => ShmError::Timeout,
55 FutexError::Interrupted => ShmError::InvalidState,
56 FutexError::InvalidArgument => ShmError::InvalidState,
57 FutexError::Unsupported => ShmError::Other("Futex unsupported"),
58 FutexError::Other(_) => ShmError::Other("Futex error"),
59 }
60 }
61}
62
63pub type ShmResult<T> = Result<T, ShmError>;
64
65pub trait ShmTransport {
66 fn write(&self, priority: Priority, data: &[u8]) -> ShmResult<()>;
67 fn read(&self) -> ShmResult<(Priority, Vec<u8>)>;
68 fn signal(&self);
69 fn wait(&self, timeout: Option<Duration>) -> ShmResult<()>;
70 fn is_connected(&self) -> bool;
71 fn protocol_version(&self) -> u16;
72}
73
74pub struct NrelayShmTransport {
75 #[allow(dead_code)]
76 mmap: MmapSegment,
77 control: *const ControlRegion,
78 ring: PriorityRingBuffer,
79 futex: Futex,
80 is_daemon: bool,
81 connected: AtomicBool,
82 expected_version: u16,
83}
84
85unsafe impl Send for NrelayShmTransport {}
86unsafe impl Sync for NrelayShmTransport {}
87
88impl NrelayShmTransport {
89 pub fn create(path: &str, size: usize, version: u16) -> ShmResult<Self> {
90 if size < 8192 {
91 return Err(ShmError::InvalidState);
92 }
93
94 let mut mmap = MmapSegment::create(path, size)
95 .map_err(|_| ShmError::Other("Failed to create mmap"))?;
96
97 let control_ptr = mmap.as_slice().as_ptr() as *const ControlRegion;
98 let control_mut = mmap.as_mut_slice().as_mut_ptr() as *mut ControlRegion;
99
100 unsafe {
101 ControlRegion::init(control_mut);
102 (*control_mut).data.version.store(version, Ordering::Release);
103 (*control_mut).data.daemon_alive.store(true, Ordering::Release);
104 let _ = mmap.flush();
105 }
106
107 let ring = PriorityRingBuffer::new(256)
108 .map_err(|_| ShmError::Other("Failed to create ring buffer"))?;
109
110 let futex = Futex::new(0);
111
112 Ok(Self {
113 mmap,
114 control: control_ptr,
115 ring,
116 futex,
117 is_daemon: true,
118 connected: AtomicBool::new(true),
119 expected_version: version,
120 })
121 }
122
123 pub fn connect(path: &str, version: u16) -> ShmResult<Self> {
124 let mmap = MmapSegment::open(path, 8192)
125 .map_err(|_| ShmError::Disconnected)?;
126
127 let control_ptr = mmap.as_slice().as_ptr() as *const ControlRegion;
128
129 unsafe {
130 let actual_version = (*control_ptr).data.version.load(Ordering::Acquire);
131 if actual_version != version {
132 return Err(ShmError::ProtocolMismatch);
133 }
134 }
135
136 let ring = PriorityRingBuffer::new(256)
137 .map_err(|_| ShmError::Other("Failed to create ring buffer"))?;
138
139 let futex = Futex::new(0);
140
141 unsafe {
142 (*control_ptr).data.client_count.fetch_add(1, Ordering::AcqRel);
143 }
144
145 Ok(Self {
146 mmap,
147 control: control_ptr,
148 ring,
149 futex,
150 is_daemon: false,
151 connected: AtomicBool::new(true),
152 expected_version: version,
153 })
154 }
155
156 pub fn backpressure(&self) -> f32 {
157 unsafe {
158 (*self.control).backpressure()
159 }
160 }
161
162 pub fn set_backpressure(&self, value: f32) {
163 unsafe {
164 (*self.control).set_backpressure(value);
165 }
166 }
167
168 pub fn is_daemon_alive(&self) -> bool {
169 unsafe {
170 (*self.control).data.daemon_alive.load(Ordering::Acquire)
171 }
172 }
173
174 pub fn client_count(&self) -> u32 {
175 unsafe {
176 (*self.control).data.client_count.load(Ordering::Acquire)
177 }
178 }
179
180 pub fn shutdown(&self) {
181 if self.is_daemon {
182 unsafe {
183 (*self.control).data.daemon_alive.store(false, Ordering::Release);
184 }
185 }
186 self.connected.store(false, Ordering::Release);
187 }
188}
189
190impl ShmTransport for NrelayShmTransport {
191 fn write(&self, priority: Priority, data: &[u8]) -> ShmResult<()> {
192 if !self.connected.load(Ordering::Acquire) {
193 return Err(ShmError::Disconnected);
194 }
195
196 if data.is_empty() {
197 return Err(ShmError::InvalidState);
198 }
199
200 self.ring.write(priority, data)?;
201 Ok(())
202 }
203
204 fn read(&self) -> ShmResult<(Priority, Vec<u8>)> {
205 if !self.connected.load(Ordering::Acquire) {
206 return Err(ShmError::Disconnected);
207 }
208
209 self.ring.read()
210 .ok_or(ShmError::InvalidState)
211 }
212
213 fn signal(&self) {
214 self.futex.wake_one();
215 }
216
217 fn wait(&self, timeout: Option<Duration>) -> ShmResult<()> {
218 if !self.connected.load(Ordering::Acquire) {
219 return Err(ShmError::Disconnected);
220 }
221
222 self.futex.wait(0, timeout)?;
223 Ok(())
224 }
225
226 fn is_connected(&self) -> bool {
227 self.connected.load(Ordering::Acquire) && self.is_daemon_alive()
228 }
229
230 fn protocol_version(&self) -> u16 {
231 self.expected_version
232 }
233}
234
235impl Drop for NrelayShmTransport {
236 fn drop(&mut self) {
237 if !self.is_daemon {
238 unsafe {
239 (*self.control).data.client_count.fetch_sub(1, Ordering::AcqRel);
240 }
241 }
242 self.connected.store(false, Ordering::Release);
243 }
244}