1mod cache;
2mod channel;
3pub mod error;
4mod header;
5mod shm;
6mod table;
7
8use std::{
9 fmt,
10 marker::PhantomData,
11 mem::size_of,
12 num::NonZeroUsize,
13 os::fd::OwnedFd,
14 path::Path,
15 sync::{atomic::AtomicU32, Arc},
16};
17
18use nix::sys::stat::Mode;
19
20use crate::{
21 cache::cacheline_aligned,
22 channel::{ConsumerChannel, ProducerChannel},
23 header::Header,
24 shm::{Chunk, SharedMemory, Span},
25 table::ChannelTable,
26};
27
28pub use channel::{ConsumeResult, ProduceForceResult, ProduceTryResult};
29pub use error::*;
30
31pub use log;
32
/// Atomic counterpart of [`Index`].
pub(crate) type AtomicIndex = AtomicU32;

/// Integer type of a single queue entry; `ChannelParam::queue_size` reserves
/// `size_of::<Index>()` bytes per entry.
pub(crate) type Index = u32;

/// Number of message slots every channel gets before `ChannelParam::add_msgs`
/// is added on top.
pub(crate) const MIN_MSGS: usize = 3;
36
/// Sizing parameters of a single channel.
#[derive(Debug, Copy, Clone)]
pub struct ChannelParam {
    /// Number of message slots requested in addition to the built-in minimum
    /// (`MIN_MSGS`).
    pub add_msgs: usize,
    /// Size of one message; it is rounded up with `cacheline_aligned` when the
    /// data area of the channel is sized.
    pub msg_size: NonZeroUsize,
}
42
43impl ChannelParam {
44 fn data_size(&self) -> usize {
45 let n = MIN_MSGS + self.add_msgs;
46
47 n * cacheline_aligned(self.msg_size.get())
48 }
49
50 fn queue_size(&self) -> usize {
51 let n = 2 + MIN_MSGS + self.add_msgs;
52 cacheline_aligned(n * std::mem::size_of::<Index>())
53 }
54
55 pub(crate) fn size(&self) -> NonZeroUsize {
56 NonZeroUsize::new(self.queue_size() + self.data_size()).unwrap()
57 }
58}
59
60pub struct Producer<T> {
61 channel: ProducerChannel,
62 _type: PhantomData<T>,
63}
64
65impl<T> Producer<T> {
66 pub(crate) fn new(channel: ProducerChannel) -> Result<Producer<T>, MemError> {
67 if size_of::<T>() > channel.msg_size().get() {
68 return Err(MemError::Size);
69 }
70 Ok(Producer {
71 channel,
72 _type: PhantomData,
73 })
74 }
75
76 pub fn msg(&mut self) -> &mut T {
77 let ptr: *mut T = self.channel.current().cast();
78 unsafe { &mut *ptr }
79 }
80
81 pub fn force_push(&mut self) -> ProduceForceResult {
82 self.channel.force_push()
83 }
84
85 pub fn try_push(&mut self) -> ProduceTryResult {
86 self.channel.try_push()
87 }
88}
89
90pub struct Consumer<T> {
91 channel: ConsumerChannel,
92 _type: PhantomData<T>,
93}
94
95impl<T> Consumer<T> {
96 pub(crate) fn new(channel: ConsumerChannel) -> Result<Consumer<T>, MemError> {
97 if size_of::<T>() > channel.msg_size().get() {
98 return Err(MemError::Size);
99 }
100 Ok(Consumer {
101 channel,
102 _type: PhantomData,
103 })
104 }
105
106 pub fn msg(&self) -> Option<&T> {
107 let ptr: *const T = self.channel.current()?.cast();
108 Some(unsafe { &*ptr })
109 }
110
111 pub fn pop(&mut self) -> ConsumeResult {
112 self.channel.pop()
113 }
114
115 pub fn flush(&mut self) -> ConsumeResult {
116 self.channel.flush()
117 }
118}
119
120pub struct RtIpc {
121 shm: Arc<SharedMemory>,
122 producers: Vec<Option<ProducerChannel>>,
123 consumers: Vec<Option<ConsumerChannel>>,
124}
125
126impl RtIpc {
127 fn calc_shm_size(
128 consumers: &[ChannelParam],
129 producers: &[ChannelParam],
130 ) -> Result<NonZeroUsize, CreateError> {
131 let num_channels =
132 NonZeroUsize::new(consumers.len() + producers.len()).ok_or(CreateError::Argument)?;
133
134 let mut size = RtIpc::calc_offset_channels(num_channels);
135
136 for chan in consumers {
137 size += chan.size().get();
138 }
139
140 for chan in producers {
141 size += chan.size().get();
142 }
143
144 NonZeroUsize::new(size).ok_or(CreateError::Argument)
145 }
146
147 fn calc_offset_channels(num_channels: NonZeroUsize) -> usize {
148 let mut offset = size_of::<Header>();
149 offset += ChannelTable::calc_size(num_channels).get();
150 offset = cacheline_aligned(offset);
151 offset
152 }
153
154 fn chunk_header(shm: &SharedMemory) -> Result<Chunk, MemError> {
155 let span = Span {
156 offset: 0,
157 size: NonZeroUsize::new(size_of::<Header>()).unwrap(),
158 };
159 shm.alloc(&span)
160 }
161
162 fn chunk_table(shm: &SharedMemory, num_channels: NonZeroUsize) -> Result<Chunk, MemError> {
163 let offset = size_of::<Header>();
164 let size = ChannelTable::calc_size(num_channels);
165 let span = Span { offset, size };
166 shm.alloc(&span)
167 }
168
169 fn construct(
170 shm: Arc<SharedMemory>,
171 table: ChannelTable,
172 init: bool,
173 ) -> Result<RtIpc, CreateError> {
174 let mut consumers: Vec<Option<ConsumerChannel>> = Vec::with_capacity(table.consumers.len());
175 let mut producers: Vec<Option<ProducerChannel>> = Vec::with_capacity(table.producers.len());
176
177 for entry in table.consumers {
178 let chunk = shm.alloc(&entry.span)?;
179 let channel = ConsumerChannel::new(chunk, &entry.param)?;
180 if init {
181 channel.init();
182 }
183 consumers.push(Some(channel));
184 }
185
186 for entry in table.producers {
187 let chunk = shm.alloc(&entry.span)?;
188 let channel = ProducerChannel::new(chunk, &entry.param)?;
189 if init {
190 channel.init();
191 }
192 producers.push(Some(channel));
193 }
194
195 Ok(RtIpc {
196 shm,
197 consumers,
198 producers,
199 })
200 }
201
202 fn from_shm(shm: Arc<SharedMemory>, cookie: u32) -> Result<RtIpc, CreateError> {
203 let chunk_header = RtIpc::chunk_header(&shm)?;
204 let header = Header::from_chunk(&chunk_header, cookie)?;
205
206 let num_producers = header.num_channels[0] as usize;
207 let num_consumers = header.num_channels[1] as usize;
208
209 let num_channels =
210 NonZeroUsize::new(num_consumers + num_producers).ok_or(CreateError::Argument)?;
211
212 let offset: usize = RtIpc::calc_offset_channels(num_channels);
213
214 let chunk_table = RtIpc::chunk_table(&shm, num_channels)?;
215
216 let table = ChannelTable::from_chunk(&chunk_table, num_consumers, num_producers, offset)?;
217
218 RtIpc::construct(shm, table, false)
219 }
220
221 fn new(
222 shm: Arc<SharedMemory>,
223 param_consumers: &[ChannelParam],
224 param_producers: &[ChannelParam],
225 cookie: u32,
226 ) -> Result<RtIpc, CreateError> {
227 let header = Header::new(
228 param_consumers.len() as u32,
229 param_producers.len() as u32,
230 cookie,
231 );
232 let num_channels = NonZeroUsize::new(param_consumers.len() + param_producers.len())
233 .ok_or(CreateError::Argument)?;
234
235 let offset: usize = RtIpc::calc_offset_channels(num_channels);
236 let table = ChannelTable::new(param_consumers, param_producers, offset);
237
238 let chunk_header = RtIpc::chunk_header(&shm)?;
239 let chunk_table = RtIpc::chunk_table(&shm, num_channels)?;
240
241 header.write(&chunk_header)?;
242 table.write(&chunk_table)?;
243
244 RtIpc::construct(shm, table, true)
245 }
246
247 pub fn new_anon_shm(
248 param_consumers: &[ChannelParam],
249 param_producers: &[ChannelParam],
250 cookie: u32,
251 ) -> Result<RtIpc, CreateError> {
252 let shm_size = RtIpc::calc_shm_size(param_consumers, param_producers)?;
253
254 let shm = SharedMemory::new_anon(shm_size)?;
255 RtIpc::new(shm, param_consumers, param_producers, cookie)
256 }
257
258 pub fn new_named_shm(
259 param_consumers: &[ChannelParam],
260 param_producers: &[ChannelParam],
261 cookie: u32,
262 path: &Path,
263 mode: Mode,
264 ) -> Result<RtIpc, CreateError> {
265 let shm_size = RtIpc::calc_shm_size(param_consumers, param_producers)?;
266
267 let shm = SharedMemory::new_named(shm_size, path, mode)?;
268 RtIpc::new(shm, param_consumers, param_producers, cookie)
269 }
270
271 pub fn from_fd(fd: OwnedFd, cookie: u32) -> Result<RtIpc, CreateError> {
272 let shm = SharedMemory::from_fd(fd)?;
273 RtIpc::from_shm(shm, cookie)
274 }
275
276 pub fn take_consumer<T>(&mut self, index: usize) -> Result<Consumer<T>, ChannelError> {
277 let channel_option: &mut Option<ConsumerChannel> =
278 self.consumers.get_mut(index).ok_or(MemError::Index)?;
279 let channel: ConsumerChannel = channel_option.take().ok_or(ChannelError::Index)?;
280 Ok(Consumer::<T>::new(channel)?)
281 }
282
283 pub fn take_producer<T>(&mut self, index: usize) -> Result<Producer<T>, ChannelError> {
284 let channel_option: &mut Option<ProducerChannel> =
285 self.producers.get_mut(index).ok_or(MemError::Index)?;
286 let channel: ProducerChannel = channel_option.take().ok_or(ChannelError::Index)?;
287 Ok(Producer::<T>::new(channel)?)
288 }
289
290 pub fn get_fd(&self) -> &OwnedFd {
291 self.shm.get_fd()
292 }
293}
294
295impl fmt::Display for RtIpc {
296 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
297 write!(f, "shm: {}", self.shm)
298 }
299}