1use std::collections::HashMap;
7use std::mem::MaybeUninit;
8use std::os::fd::{AsRawFd, FromRawFd, OwnedFd, RawFd};
9use std::sync::atomic::{AtomicU32, AtomicUsize, Ordering};
10use std::sync::{Arc, LazyLock, Mutex, RwLock};
11use std::time::Duration;
12
13use mio::event::Source;
14use mio::{Events, Poll, Token};
15
/// Marker trait for payload types that can be carried by a topic.
///
/// It adds no methods of its own; it only bundles the bounds `Clone`, `Send`,
/// `Sync` and `'static`.
pub trait MorbDataType: Clone + Send + Sync + 'static {}

/// Every type that satisfies the bundled bounds is a `MorbDataType`.
impl<T: Clone + Send + Sync + 'static> MorbDataType for T {}
18
/// Fixed-size slot storage whose slots start out empty.
///
/// `initialized[i]` is `true` exactly when `slots[i]` currently holds a live
/// `T`. Every `unsafe` block below relies on that invariant.
struct RingBuffer<T> {
    slots: Box<[MaybeUninit<T>]>,
    initialized: Box<[bool]>,
}

impl<T> RingBuffer<T> {
    /// Creates a buffer with `size` empty slots.
    fn new(size: usize) -> Self {
        Self {
            slots: (0..size).map(|_| MaybeUninit::uninit()).collect(),
            initialized: vec![false; size].into_boxed_slice(),
        }
    }

    /// Stores `value` in slot `index`, dropping whatever the slot held before.
    ///
    /// # Panics
    ///
    /// Panics if `index` is out of bounds.
    fn write(&mut self, index: usize, value: T) {
        // Mark the slot empty *before* running the old value's destructor. If
        // that destructor panics the slot must not look live any more,
        // otherwise a later read or the buffer's own `Drop` would touch the
        // already-dropped value a second time.
        if std::mem::replace(&mut self.initialized[index], false) {
            // SAFETY: the flag was `true`, so the slot holds a live `T`, and it
            // has just been cleared so nothing else will use this value again.
            unsafe { self.slots[index].assume_init_drop() };
        }

        self.slots[index].write(value);
        self.initialized[index] = true;
    }

    /// Returns a clone of the value in slot `index`, or `None` if the slot has
    /// never been written.
    ///
    /// # Panics
    ///
    /// Panics if `index` is out of bounds.
    fn read_cloned(&self, index: usize) -> Option<T>
    where
        T: Clone,
    {
        if !self.initialized[index] {
            return None;
        }

        // SAFETY: the flag is `true`, so the slot holds a live `T`.
        Some(unsafe { self.slots[index].assume_init_ref() }.clone())
    }
}

impl<T> Drop for RingBuffer<T> {
    /// Drops every value that is still stored in the buffer.
    fn drop(&mut self) {
        for (slot, initialized) in self.slots.iter_mut().zip(self.initialized.iter_mut()) {
            if std::mem::replace(initialized, false) {
                // SAFETY: the flag was `true`, so the slot holds a live `T`;
                // the flag is cleared first so the value is dropped only once.
                unsafe { slot.assume_init_drop() };
            }
        }
    }
}
69
/// Registry of topics, looked up by name.
pub struct TopicManager {
    /// Type-erased topic handles keyed by topic name.
    topics: HashMap<String, Box<dyn std::any::Any + Send + Sync>>,
    /// Running counter used to hand out topic ids.
    topic_num: usize,
}
75
76static TOPIC_MANAGER: LazyLock<Arc<RwLock<TopicManager>>> =
77 LazyLock::new(|| Arc::new(RwLock::new(TopicManager::new())));
78
79pub fn create_topic<T: MorbDataType>(
83 name: String,
84 queue_size: u16,
85) -> Result<Arc<Topic<T>>, std::io::Error> {
86 TOPIC_MANAGER
87 .write()
88 .unwrap()
89 .create_topic(name, queue_size)
90}
91
92pub fn get_topic<T: MorbDataType>(name: &str) -> Option<Arc<Topic<T>>> {
94 TOPIC_MANAGER.read().unwrap().get_topic(name)
95}
96
97impl TopicManager {
98 pub fn new() -> Self {
100 Self {
101 topics: HashMap::new(),
102 topic_num: 0,
103 }
104 }
105
106 pub fn create_topic<T: MorbDataType>(
107 &mut self,
108 name: String,
109 queue_size: u16,
110 ) -> Result<Arc<Topic<T>>, std::io::Error> {
111 if self.topics.contains_key(&name) {
112 return Err(std::io::Error::new(
113 std::io::ErrorKind::AlreadyExists,
114 "Topic already exists",
115 ));
116 }
117 self.topic_num += 1;
118 let topic = Arc::new(Topic::new(name.clone(), queue_size, self.topic_num));
119 self.topics.insert(name, Box::new(topic.clone()));
120 Ok(topic)
121 }
122
123 pub fn get_topic<T: MorbDataType>(&self, name: &str) -> Option<Arc<Topic<T>>> {
124 self.topics
125 .get(name)
126 .and_then(|boxed| boxed.downcast_ref::<Arc<Topic<T>>>().cloned())
127 }
128}
129
130impl Default for TopicManager {
131 fn default() -> Self {
132 Self::new()
133 }
134}
135
136pub struct Publisher<T: MorbDataType> {
137 topic: Arc<Topic<T>>,
138}
139
140impl<T: MorbDataType> Publisher<T> {
141 pub fn publish(&self, data: T) {
143 {
144 let mut fifo = self.topic.fifo.lock().unwrap();
145 let index = self.topic.generation.load(Ordering::Acquire) as usize
146 % (self.topic.queue_size as usize);
147 fifo.write(index, data);
148 self.topic.generation.fetch_add(1, Ordering::AcqRel);
149 }
150 self.topic.notify();
151 }
152}
153
154pub struct Subscriber<T: MorbDataType> {
156 topic: Arc<Topic<T>>,
157 sub_generation: u32,
158}
159
160impl<T: MorbDataType> Subscriber<T> {
161 pub fn check_update(&self) -> bool {
163 if self.sub_generation != self.topic.generation.load(Ordering::SeqCst) {
164 return true;
165 }
166 false
167 }
168
169 pub fn check_update_and_copy(&mut self) -> Option<T> {
174 let topic_generation = self.topic.generation.load(Ordering::SeqCst);
175 if self.sub_generation == topic_generation {
176 return None;
177 }
178 if self.sub_generation < topic_generation.saturating_sub(self.topic.queue_size as u32) {
179 self.sub_generation = topic_generation.saturating_sub(self.topic.queue_size as u32);
180 }
181 let index = (self.sub_generation as usize) % (self.topic.queue_size as usize);
182 self.sub_generation += 1;
183 self.topic.fifo.lock().unwrap().read_cloned(index)
184 }
185}
186
187pub struct TopicPoller {
189 poll: Poll,
190 events: Events,
191 manual_events: Vec<Token>,
192 registrations: Vec<TopicPollerRegistration>,
193}
194
195struct TopicPollerRegistration {
196 eventfd: RawFd,
197 token: Token,
198 generation: *const AtomicU32,
199 last_generation: u32,
200 poller_count: Arc<AtomicUsize>,
201 registered: bool,
202}
203
204impl Default for TopicPoller {
205 fn default() -> Self {
206 Self::new()
207 }
208}
209
210impl TopicPoller {
211 pub fn new() -> Self {
213 Self {
214 poll: Poll::new().unwrap(),
215 events: Events::with_capacity(1024),
216 manual_events: Vec::new(),
217 registrations: Vec::new(),
218 }
219 }
220
221 pub fn add_topic<T: MorbDataType>(&mut self, topic: &Topic<T>) -> std::io::Result<()> {
223 self.registrations.push(TopicPollerRegistration {
224 eventfd: topic.eventfd.as_raw_fd(),
225 token: topic.token,
226 generation: &topic.generation,
227 last_generation: topic.generation.load(Ordering::SeqCst),
228 poller_count: Arc::clone(&topic.poller_count),
229 registered: false,
230 });
231
232 Ok(())
233 }
234
235 pub fn remove_topic<T: MorbDataType>(&mut self, topic: &Topic<T>) -> std::io::Result<()> {
237 if let Some(index) = self
238 .registrations
239 .iter()
240 .position(|registration| registration.eventfd == topic.eventfd.as_raw_fd())
241 {
242 let registration = self.registrations.swap_remove(index);
243 if registration.registered {
244 mio::unix::SourceFd(&topic.eventfd.as_raw_fd()).deregister(self.poll.registry())?;
245 registration.poller_count.fetch_sub(1, Ordering::Relaxed);
246 }
247 }
248
249 Ok(())
250 }
251
252 pub fn wait(&mut self, timeout: Option<Duration>) -> std::io::Result<()> {
254 self.manual_events.clear();
255
256 if timeout == Some(Duration::ZERO) {
257 for registration in &mut self.registrations {
258 let generation = unsafe { (*registration.generation).load(Ordering::SeqCst) };
259 if generation != registration.last_generation {
260 registration.last_generation = generation;
261 self.manual_events.push(registration.token);
262 }
263 }
264 return Ok(());
265 }
266
267 for registration in &mut self.registrations {
268 let generation = unsafe { (*registration.generation).load(Ordering::SeqCst) };
269 if generation != registration.last_generation {
270 registration.last_generation = generation;
271 self.manual_events.push(registration.token);
272 }
273 }
274 if !self.manual_events.is_empty() {
275 return Ok(());
276 }
277
278 for registration in &mut self.registrations {
279 if !registration.registered {
280 mio::unix::SourceFd(®istration.eventfd).register(
281 self.poll.registry(),
282 registration.token,
283 mio::Interest::READABLE,
284 )?;
285 registration.poller_count.fetch_add(1, Ordering::Relaxed);
286 registration.registered = true;
287 }
288 }
289
290 self.poll.poll(&mut self.events, timeout)
291 }
292
293 pub fn iter(&self) -> Box<dyn Iterator<Item = Token> + '_> {
295 if !self.manual_events.is_empty() {
296 Box::new(self.manual_events.iter().copied())
297 } else {
298 Box::new(self.events.iter().map(|event| event.token()))
299 }
300 }
301}
302
303impl Drop for TopicPoller {
304 fn drop(&mut self) {
305 for registration in self.registrations.drain(..) {
306 if registration.registered {
307 registration.poller_count.fetch_sub(1, Ordering::Relaxed);
308 }
309 }
310 }
311}
312
313pub struct Topic<T: MorbDataType> {
315 name: String,
316 fifo: Mutex<RingBuffer<T>>,
317 pub(crate) generation: AtomicU32,
318 queue_size: u16,
319 token: mio::Token,
320 eventfd: OwnedFd,
321 poller_count: Arc<AtomicUsize>,
322}
323
324impl<T: MorbDataType> Topic<T> {
325 fn new(name: String, queue_size: u16, topic_id: usize) -> Self {
326 assert!(queue_size > 0, "queue_size must be greater than 0");
327
328 Self {
329 name,
330 fifo: Mutex::new(RingBuffer::new(queue_size as usize)),
331 generation: AtomicU32::new(0),
332 queue_size,
333 token: Token(topic_id),
334 eventfd: unsafe { OwnedFd::from_raw_fd(libc::eventfd(0, libc::EFD_NONBLOCK)) },
335 poller_count: Arc::new(AtomicUsize::new(0)),
336 }
337 }
338
339 fn notify(&self) {
340 if self.poller_count.load(Ordering::Relaxed) == 0 {
341 return;
342 }
343
344 let value = usize::from(self.token) as u64;
345 unsafe {
346 libc::write(
347 self.eventfd.as_raw_fd(),
348 &value as *const u64 as *const libc::c_void,
349 std::mem::size_of::<u64>(),
350 );
351 }
352 }
353
354 pub fn clear_event(&self) {
356 let mut value: u64 = 0;
357 unsafe {
358 libc::read(
359 self.eventfd.as_raw_fd(),
360 &mut value as *mut u64 as *mut libc::c_void,
361 std::mem::size_of::<u64>(),
362 );
363 }
364 }
365
366 pub fn name(&self) -> &str {
368 &self.name
369 }
370
371 pub fn token(&self) -> Token {
373 self.token
374 }
375
376 pub fn create_publisher(self: &Arc<Self>) -> Publisher<T> {
378 Publisher {
379 topic: self.clone(),
380 }
381 }
382
383 pub fn create_subscriber(self: &Arc<Self>) -> Subscriber<T> {
385 Subscriber {
386 topic: self.clone(),
387 sub_generation: 0,
388 }
389 }
390}
391
392#[cfg(test)]
393mod tests;