Skip to main content

morb/
lib.rs

1//! `morb` is a lightweight in-process publish/subscribe library for Rust.
2//!
3//! It provides named topics, fixed-size message retention via a ring buffer,
4//! and poll-based notifications built on `mio` and `eventfd`.
5
6use std::collections::HashMap;
7use std::mem::MaybeUninit;
8use std::os::fd::{AsRawFd, FromRawFd, OwnedFd, RawFd};
9use std::sync::atomic::{AtomicU32, AtomicUsize, Ordering};
10use std::sync::{Arc, LazyLock, Mutex, RwLock};
11use std::time::Duration;
12
13use mio::event::Source;
14use mio::{Events, Poll, Token};
15
/// Marker trait for values that can be sent through a topic.
///
/// A message type must be cloneable (subscribers receive clones of retained
/// values), shareable across threads, and free of borrowed data.
pub trait MorbDataType: Clone + Send + Sync + 'static {}

/// Blanket implementation: every type meeting the bounds is a message type.
impl<T: Clone + Send + Sync + 'static> MorbDataType for T {}
18
/// Fixed-capacity slot storage used to retain the most recent messages.
///
/// Slots start out uninitialized. `initialized[i]` is `true` exactly when
/// `slots[i]` holds a live `T` that this buffer is responsible for dropping.
struct RingBuffer<T> {
    slots: Box<[MaybeUninit<T>]>,
    initialized: Box<[bool]>,
}

impl<T> RingBuffer<T> {
    /// Creates a buffer with `size` empty slots.
    fn new(size: usize) -> Self {
        let mut slots = Vec::with_capacity(size);
        slots.resize_with(size, MaybeUninit::uninit);

        Self {
            slots: slots.into_boxed_slice(),
            initialized: vec![false; size].into_boxed_slice(),
        }
    }

    /// Stores `value` in slot `index`, dropping any value already there.
    ///
    /// # Panics
    ///
    /// Panics if `index` is out of bounds, or if dropping the previous value
    /// panics. In the latter case the new value is already stored and the
    /// buffer remains consistent.
    fn write(&mut self, index: usize, value: T) {
        // Move the old value out *before* overwriting and clear its flag, so
        // that a panicking `Drop` can never leave a dropped value marked as
        // live (which would be dropped a second time in `Drop for RingBuffer`).
        let previous = if std::mem::replace(&mut self.initialized[index], false) {
            // SAFETY: the flag was set, so the slot holds a live value, and the
            // flag is now cleared, so nothing else will read or drop that value.
            Some(unsafe { self.slots[index].assume_init_read() })
        } else {
            None
        };

        self.slots[index].write(value);
        self.initialized[index] = true;

        // Dropped last: the buffer is already in its final state if this panics.
        drop(previous);
    }

    /// Returns a clone of the value in slot `index`, or `None` if the slot has
    /// never been written.
    ///
    /// # Panics
    ///
    /// Panics if `index` is out of bounds.
    fn read_cloned(&self, index: usize) -> Option<T>
    where
        T: Clone,
    {
        if !self.initialized[index] {
            return None;
        }

        // SAFETY: the flag guarantees the slot holds a live value.
        Some(unsafe { self.slots[index].assume_init_ref().clone() })
    }
}

impl<T> Drop for RingBuffer<T> {
    /// Drops every value that is still stored in the buffer.
    fn drop(&mut self) {
        for (slot, live) in self.slots.iter_mut().zip(self.initialized.iter_mut()) {
            // Clear the flag first so each live value is dropped at most once.
            if std::mem::replace(live, false) {
                // SAFETY: the flag was set, so the slot holds a live value.
                unsafe {
                    slot.assume_init_drop();
                }
            }
        }
    }
}
69
/// Registry of topics, keyed by topic name.
///
/// Each entry is a type-erased `Arc<Topic<T>>`; the concrete message type is
/// recovered by downcasting when the topic is looked up.
pub struct TopicManager {
    /// Type-erased topic handles indexed by name.
    topics: HashMap<String, Box<dyn std::any::Any + Send + Sync>>,
    /// Number of topics created so far; also the source of topic ids.
    topic_num: usize,
}
75
76static TOPIC_MANAGER: LazyLock<Arc<RwLock<TopicManager>>> =
77    LazyLock::new(|| Arc::new(RwLock::new(TopicManager::new())));
78
79/// Creates a new topic with the given name and queue size.
80///
81/// Returns an error if a topic with the same name already exists.
82pub fn create_topic<T: MorbDataType>(
83    name: String,
84    queue_size: u16,
85) -> Result<Arc<Topic<T>>, std::io::Error> {
86    TOPIC_MANAGER
87        .write()
88        .unwrap()
89        .create_topic(name, queue_size)
90}
91
92/// Returns a previously created topic by name and type.
93pub fn get_topic<T: MorbDataType>(name: &str) -> Option<Arc<Topic<T>>> {
94    TOPIC_MANAGER.read().unwrap().get_topic(name)
95}
96
97impl TopicManager {
98    /// Creates an empty topic manager.
99    pub fn new() -> Self {
100        Self {
101            topics: HashMap::new(),
102            topic_num: 0,
103        }
104    }
105
106    pub fn create_topic<T: MorbDataType>(
107        &mut self,
108        name: String,
109        queue_size: u16,
110    ) -> Result<Arc<Topic<T>>, std::io::Error> {
111        if self.topics.contains_key(&name) {
112            return Err(std::io::Error::new(
113                std::io::ErrorKind::AlreadyExists,
114                "Topic already exists",
115            ));
116        }
117        self.topic_num += 1;
118        let topic = Arc::new(Topic::new(name.clone(), queue_size, self.topic_num));
119        self.topics.insert(name, Box::new(topic.clone()));
120        Ok(topic)
121    }
122
123    pub fn get_topic<T: MorbDataType>(&self, name: &str) -> Option<Arc<Topic<T>>> {
124        self.topics
125            .get(name)
126            .and_then(|boxed| boxed.downcast_ref::<Arc<Topic<T>>>().cloned())
127    }
128}
129
130impl Default for TopicManager {
131    fn default() -> Self {
132        Self::new()
133    }
134}
135
136pub struct Publisher<T: MorbDataType> {
137    topic: Arc<Topic<T>>,
138}
139
140impl<T: MorbDataType> Publisher<T> {
141    /// Publishes a value to the topic.
142    pub fn publish(&self, data: T) {
143        {
144            let mut fifo = self.topic.fifo.lock().unwrap();
145            let index = self.topic.generation.load(Ordering::Acquire) as usize
146                % (self.topic.queue_size as usize);
147            fifo.write(index, data);
148            self.topic.generation.fetch_add(1, Ordering::AcqRel);
149        }
150        self.topic.notify();
151    }
152}
153
154/// Reads messages from a topic in publish order.
155pub struct Subscriber<T: MorbDataType> {
156    topic: Arc<Topic<T>>,
157    sub_generation: u32,
158}
159
160impl<T: MorbDataType> Subscriber<T> {
161    /// Returns `true` if the subscriber has unread data.
162    pub fn check_update(&self) -> bool {
163        if self.sub_generation != self.topic.generation.load(Ordering::SeqCst) {
164            return true;
165        }
166        false
167    }
168
169    /// Returns the next unread value, if any.
170    ///
171    /// If the subscriber has fallen behind the ring buffer, only retained
172    /// messages are returned.
173    pub fn check_update_and_copy(&mut self) -> Option<T> {
174        let topic_generation = self.topic.generation.load(Ordering::SeqCst);
175        if self.sub_generation == topic_generation {
176            return None;
177        }
178        if self.sub_generation < topic_generation.saturating_sub(self.topic.queue_size as u32) {
179            self.sub_generation = topic_generation.saturating_sub(self.topic.queue_size as u32);
180        }
181        let index = (self.sub_generation as usize) % (self.topic.queue_size as usize);
182        self.sub_generation += 1;
183        self.topic.fifo.lock().unwrap().read_cloned(index)
184    }
185}
186
187/// Waits for updates from one or more topics.
188pub struct TopicPoller {
189    poll: Poll,
190    events: Events,
191    manual_events: Vec<Token>,
192    registrations: Vec<TopicPollerRegistration>,
193}
194
195struct TopicPollerRegistration {
196    eventfd: RawFd,
197    token: Token,
198    generation: *const AtomicU32,
199    last_generation: u32,
200    poller_count: Arc<AtomicUsize>,
201    registered: bool,
202}
203
204impl Default for TopicPoller {
205    fn default() -> Self {
206        Self::new()
207    }
208}
209
210impl TopicPoller {
211    /// Creates a new topic poller.
212    pub fn new() -> Self {
213        Self {
214            poll: Poll::new().unwrap(),
215            events: Events::with_capacity(1024),
216            manual_events: Vec::new(),
217            registrations: Vec::new(),
218        }
219    }
220
221    /// Registers a topic with the poller.
222    pub fn add_topic<T: MorbDataType>(&mut self, topic: &Topic<T>) -> std::io::Result<()> {
223        self.registrations.push(TopicPollerRegistration {
224            eventfd: topic.eventfd.as_raw_fd(),
225            token: topic.token,
226            generation: &topic.generation,
227            last_generation: topic.generation.load(Ordering::SeqCst),
228            poller_count: Arc::clone(&topic.poller_count),
229            registered: false,
230        });
231
232        Ok(())
233    }
234
235    /// Removes a topic from the poller.
236    pub fn remove_topic<T: MorbDataType>(&mut self, topic: &Topic<T>) -> std::io::Result<()> {
237        if let Some(index) = self
238            .registrations
239            .iter()
240            .position(|registration| registration.eventfd == topic.eventfd.as_raw_fd())
241        {
242            let registration = self.registrations.swap_remove(index);
243            if registration.registered {
244                mio::unix::SourceFd(&topic.eventfd.as_raw_fd()).deregister(self.poll.registry())?;
245                registration.poller_count.fetch_sub(1, Ordering::Relaxed);
246            }
247        }
248
249        Ok(())
250    }
251
252    /// Waits until at least one registered topic becomes readable or the timeout expires.
253    pub fn wait(&mut self, timeout: Option<Duration>) -> std::io::Result<()> {
254        self.manual_events.clear();
255
256        if timeout == Some(Duration::ZERO) {
257            for registration in &mut self.registrations {
258                let generation = unsafe { (*registration.generation).load(Ordering::SeqCst) };
259                if generation != registration.last_generation {
260                    registration.last_generation = generation;
261                    self.manual_events.push(registration.token);
262                }
263            }
264            return Ok(());
265        }
266
267        for registration in &mut self.registrations {
268            let generation = unsafe { (*registration.generation).load(Ordering::SeqCst) };
269            if generation != registration.last_generation {
270                registration.last_generation = generation;
271                self.manual_events.push(registration.token);
272            }
273        }
274        if !self.manual_events.is_empty() {
275            return Ok(());
276        }
277
278        for registration in &mut self.registrations {
279            if !registration.registered {
280                mio::unix::SourceFd(&registration.eventfd).register(
281                    self.poll.registry(),
282                    registration.token,
283                    mio::Interest::READABLE,
284                )?;
285                registration.poller_count.fetch_add(1, Ordering::Relaxed);
286                registration.registered = true;
287            }
288        }
289
290        self.poll.poll(&mut self.events, timeout)
291    }
292
293    /// Iterates over ready topic tokens from the last `wait` call.
294    pub fn iter(&self) -> Box<dyn Iterator<Item = Token> + '_> {
295        if !self.manual_events.is_empty() {
296            Box::new(self.manual_events.iter().copied())
297        } else {
298            Box::new(self.events.iter().map(|event| event.token()))
299        }
300    }
301}
302
303impl Drop for TopicPoller {
304    fn drop(&mut self) {
305        for registration in self.registrations.drain(..) {
306            if registration.registered {
307                registration.poller_count.fetch_sub(1, Ordering::Relaxed);
308            }
309        }
310    }
311}
312
313/// A named message channel with fixed-size retention and poll notifications.
314pub struct Topic<T: MorbDataType> {
315    name: String,
316    fifo: Mutex<RingBuffer<T>>,
317    pub(crate) generation: AtomicU32,
318    queue_size: u16,
319    token: mio::Token,
320    eventfd: OwnedFd,
321    poller_count: Arc<AtomicUsize>,
322}
323
324impl<T: MorbDataType> Topic<T> {
325    fn new(name: String, queue_size: u16, topic_id: usize) -> Self {
326        assert!(queue_size > 0, "queue_size must be greater than 0");
327
328        Self {
329            name,
330            fifo: Mutex::new(RingBuffer::new(queue_size as usize)),
331            generation: AtomicU32::new(0),
332            queue_size,
333            token: Token(topic_id),
334            eventfd: unsafe { OwnedFd::from_raw_fd(libc::eventfd(0, libc::EFD_NONBLOCK)) },
335            poller_count: Arc::new(AtomicUsize::new(0)),
336        }
337    }
338
339    fn notify(&self) {
340        if self.poller_count.load(Ordering::Relaxed) == 0 {
341            return;
342        }
343
344        let value = usize::from(self.token) as u64;
345        unsafe {
346            libc::write(
347                self.eventfd.as_raw_fd(),
348                &value as *const u64 as *const libc::c_void,
349                std::mem::size_of::<u64>(),
350            );
351        }
352    }
353
354    /// Clears the pending poll notification for this topic.
355    pub fn clear_event(&self) {
356        let mut value: u64 = 0;
357        unsafe {
358            libc::read(
359                self.eventfd.as_raw_fd(),
360                &mut value as *mut u64 as *mut libc::c_void,
361                std::mem::size_of::<u64>(),
362            );
363        }
364    }
365
366    /// Returns the topic name.
367    pub fn name(&self) -> &str {
368        &self.name
369    }
370
371    /// Returns the poll token associated with this topic.
372    pub fn token(&self) -> Token {
373        self.token
374    }
375
376    /// Creates a publisher for this topic.
377    pub fn create_publisher(self: &Arc<Self>) -> Publisher<T> {
378        Publisher {
379            topic: self.clone(),
380        }
381    }
382
383    /// Creates a subscriber for this topic.
384    pub fn create_subscriber(self: &Arc<Self>) -> Subscriber<T> {
385        Subscriber {
386            topic: self.clone(),
387            sub_generation: 0,
388        }
389    }
390}
391
392#[cfg(test)]
393mod tests;