rtipc/
lib.rs

1mod cache;
2mod channel;
3pub mod error;
4mod header;
5mod shm;
6mod table;
7
8use std::{
9    fmt,
10    marker::PhantomData,
11    mem::size_of,
12    num::NonZeroUsize,
13    os::fd::OwnedFd,
14    path::Path,
15    sync::{atomic::AtomicU32, Arc},
16};
17
18use nix::sys::stat::Mode;
19
20use crate::{
21    cache::cacheline_aligned,
22    channel::{ConsumerChannel, ProducerChannel},
23    header::Header,
24    shm::{Chunk, SharedMemory, Span},
25    table::ChannelTable,
26};
27
28pub use channel::{ConsumeResult, ProduceForceResult, ProduceTryResult};
29pub use error::*;
30
31pub use log;
32
33pub(crate) type AtomicIndex = AtomicU32;
34pub(crate) type Index = u32;
35pub(crate) const MIN_MSGS: usize = 3;
36
37#[derive(Debug, Copy, Clone)]
38pub struct ChannelParam {
39    pub add_msgs: usize,
40    pub msg_size: NonZeroUsize,
41}
42
43impl ChannelParam {
44    fn data_size(&self) -> usize {
45        let n = MIN_MSGS + self.add_msgs;
46
47        n * cacheline_aligned(self.msg_size.get())
48    }
49
50    fn queue_size(&self) -> usize {
51        let n = 2 + MIN_MSGS + self.add_msgs;
52        cacheline_aligned(n * std::mem::size_of::<Index>())
53    }
54
55    pub(crate) fn size(&self) -> NonZeroUsize {
56        NonZeroUsize::new(self.queue_size() + self.data_size()).unwrap()
57    }
58}
59
60pub struct Producer<T> {
61    channel: ProducerChannel,
62    _type: PhantomData<T>,
63}
64
65impl<T> Producer<T> {
66    pub(crate) fn new(channel: ProducerChannel) -> Result<Producer<T>, MemError> {
67        if size_of::<T>() > channel.msg_size().get() {
68            return Err(MemError::Size);
69        }
70        Ok(Producer {
71            channel,
72            _type: PhantomData,
73        })
74    }
75
76    pub fn msg(&mut self) -> &mut T {
77        let ptr: *mut T = self.channel.current().cast();
78        unsafe { &mut *ptr }
79    }
80
81    pub fn force_push(&mut self) -> ProduceForceResult {
82        self.channel.force_push()
83    }
84
85    pub fn try_push(&mut self) -> ProduceTryResult {
86        self.channel.try_push()
87    }
88}
89
90pub struct Consumer<T> {
91    channel: ConsumerChannel,
92    _type: PhantomData<T>,
93}
94
95impl<T> Consumer<T> {
96    pub(crate) fn new(channel: ConsumerChannel) -> Result<Consumer<T>, MemError> {
97        if size_of::<T>() > channel.msg_size().get() {
98            return Err(MemError::Size);
99        }
100        Ok(Consumer {
101            channel,
102            _type: PhantomData,
103        })
104    }
105
106    pub fn msg(&self) -> Option<&T> {
107        let ptr: *const T = self.channel.current()?.cast();
108        Some(unsafe { &*ptr })
109    }
110
111    pub fn pop(&mut self) -> ConsumeResult {
112        self.channel.pop()
113    }
114
115    pub fn flush(&mut self) -> ConsumeResult {
116        self.channel.flush()
117    }
118}
119
120pub struct RtIpc {
121    shm: Arc<SharedMemory>,
122    producers: Vec<Option<ProducerChannel>>,
123    consumers: Vec<Option<ConsumerChannel>>,
124}
125
126impl RtIpc {
127    fn calc_shm_size(
128        consumers: &[ChannelParam],
129        producers: &[ChannelParam],
130    ) -> Result<NonZeroUsize, CreateError> {
131        let num_channels =
132            NonZeroUsize::new(consumers.len() + producers.len()).ok_or(CreateError::Argument)?;
133
134        let mut size = RtIpc::calc_offset_channels(num_channels);
135
136        for chan in consumers {
137            size += chan.size().get();
138        }
139
140        for chan in producers {
141            size += chan.size().get();
142        }
143
144        NonZeroUsize::new(size).ok_or(CreateError::Argument)
145    }
146
147    fn calc_offset_channels(num_channels: NonZeroUsize) -> usize {
148        let mut offset = size_of::<Header>();
149        offset += ChannelTable::calc_size(num_channels).get();
150        offset = cacheline_aligned(offset);
151        offset
152    }
153
154    fn chunk_header(shm: &SharedMemory) -> Result<Chunk, MemError> {
155        let span = Span {
156            offset: 0,
157            size: NonZeroUsize::new(size_of::<Header>()).unwrap(),
158        };
159        shm.alloc(&span)
160    }
161
162    fn chunk_table(shm: &SharedMemory, num_channels: NonZeroUsize) -> Result<Chunk, MemError> {
163        let offset = size_of::<Header>();
164        let size = ChannelTable::calc_size(num_channels);
165        let span = Span { offset, size };
166        shm.alloc(&span)
167    }
168
169    fn construct(
170        shm: Arc<SharedMemory>,
171        table: ChannelTable,
172        init: bool,
173    ) -> Result<RtIpc, CreateError> {
174        let mut consumers: Vec<Option<ConsumerChannel>> = Vec::with_capacity(table.consumers.len());
175        let mut producers: Vec<Option<ProducerChannel>> = Vec::with_capacity(table.producers.len());
176
177        for entry in table.consumers {
178            let chunk = shm.alloc(&entry.span)?;
179            let channel = ConsumerChannel::new(chunk, &entry.param)?;
180            if init {
181                channel.init();
182            }
183            consumers.push(Some(channel));
184        }
185
186        for entry in table.producers {
187            let chunk = shm.alloc(&entry.span)?;
188            let channel = ProducerChannel::new(chunk, &entry.param)?;
189            if init {
190                channel.init();
191            }
192            producers.push(Some(channel));
193        }
194
195        Ok(RtIpc {
196            shm,
197            consumers,
198            producers,
199        })
200    }
201
202    fn from_shm(shm: Arc<SharedMemory>, cookie: u32) -> Result<RtIpc, CreateError> {
203        let chunk_header = RtIpc::chunk_header(&shm)?;
204        let header = Header::from_chunk(&chunk_header, cookie)?;
205
206        let num_producers = header.num_channels[0] as usize;
207        let num_consumers = header.num_channels[1] as usize;
208
209        let num_channels =
210            NonZeroUsize::new(num_consumers + num_producers).ok_or(CreateError::Argument)?;
211
212        let offset: usize = RtIpc::calc_offset_channels(num_channels);
213
214        let chunk_table = RtIpc::chunk_table(&shm, num_channels)?;
215
216        let table = ChannelTable::from_chunk(&chunk_table, num_consumers, num_producers, offset)?;
217
218        RtIpc::construct(shm, table, false)
219    }
220
221    fn new(
222        shm: Arc<SharedMemory>,
223        param_consumers: &[ChannelParam],
224        param_producers: &[ChannelParam],
225        cookie: u32,
226    ) -> Result<RtIpc, CreateError> {
227        let header = Header::new(
228            param_consumers.len() as u32,
229            param_producers.len() as u32,
230            cookie,
231        );
232        let num_channels = NonZeroUsize::new(param_consumers.len() + param_producers.len())
233            .ok_or(CreateError::Argument)?;
234
235        let offset: usize = RtIpc::calc_offset_channels(num_channels);
236        let table = ChannelTable::new(param_consumers, param_producers, offset);
237
238        let chunk_header = RtIpc::chunk_header(&shm)?;
239        let chunk_table = RtIpc::chunk_table(&shm, num_channels)?;
240
241        header.write(&chunk_header)?;
242        table.write(&chunk_table)?;
243
244        RtIpc::construct(shm, table, true)
245    }
246
247    pub fn new_anon_shm(
248        param_consumers: &[ChannelParam],
249        param_producers: &[ChannelParam],
250        cookie: u32,
251    ) -> Result<RtIpc, CreateError> {
252        let shm_size = RtIpc::calc_shm_size(param_consumers, param_producers)?;
253
254        let shm = SharedMemory::new_anon(shm_size)?;
255        RtIpc::new(shm, param_consumers, param_producers, cookie)
256    }
257
258    pub fn new_named_shm(
259        param_consumers: &[ChannelParam],
260        param_producers: &[ChannelParam],
261        cookie: u32,
262        path: &Path,
263        mode: Mode,
264    ) -> Result<RtIpc, CreateError> {
265        let shm_size = RtIpc::calc_shm_size(param_consumers, param_producers)?;
266
267        let shm = SharedMemory::new_named(shm_size, path, mode)?;
268        RtIpc::new(shm, param_consumers, param_producers, cookie)
269    }
270
271    pub fn from_fd(fd: OwnedFd, cookie: u32) -> Result<RtIpc, CreateError> {
272        let shm = SharedMemory::from_fd(fd)?;
273        RtIpc::from_shm(shm, cookie)
274    }
275
276    pub fn take_consumer<T>(&mut self, index: usize) -> Result<Consumer<T>, ChannelError> {
277        let channel_option: &mut Option<ConsumerChannel> =
278            self.consumers.get_mut(index).ok_or(MemError::Index)?;
279        let channel: ConsumerChannel = channel_option.take().ok_or(ChannelError::Index)?;
280        Ok(Consumer::<T>::new(channel)?)
281    }
282
283    pub fn take_producer<T>(&mut self, index: usize) -> Result<Producer<T>, ChannelError> {
284        let channel_option: &mut Option<ProducerChannel> =
285            self.producers.get_mut(index).ok_or(MemError::Index)?;
286        let channel: ProducerChannel = channel_option.take().ok_or(ChannelError::Index)?;
287        Ok(Producer::<T>::new(channel)?)
288    }
289
290    pub fn get_fd(&self) -> &OwnedFd {
291        self.shm.get_fd()
292    }
293}
294
295impl fmt::Display for RtIpc {
296    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
297        write!(f, "shm: {}", self.shm)
298    }
299}