morb 0.1.0

A lightweight in-process publish/subscribe library for Rust
Documentation
use std::collections::HashMap;
use std::os::fd::{AsRawFd, FromRawFd, OwnedFd};
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::{Arc, LazyLock, Mutex, RwLock};
use std::time::Duration;

use mio::{Events, Poll, Token};
use mio::event::Source;

pub trait MorbDataType: Send + Sync + 'static + Clone {}
impl<T> MorbDataType for T where T: Send + Sync + 'static + Clone {}

pub struct TopicManager {
    topics: HashMap<String, Box<dyn std::any::Any + Send + Sync>>,
    topic_num: usize,
}

static TOPIC_MANAGER: LazyLock<Arc<RwLock<TopicManager>>> =
    LazyLock::new(|| Arc::new(RwLock::new(TopicManager::new())));

pub fn create_topic<T: MorbDataType>(name: String, queue_size: u16) -> Result<Arc<Topic<T>>,std::io::Error> {
    TOPIC_MANAGER
        .write()
        .unwrap()
        .create_topic(name, queue_size)
}

pub fn get_topic<T: MorbDataType>(name: &str) -> Option<Arc<Topic<T>>> {
    TOPIC_MANAGER.read().unwrap().get_topic(name)
}

impl TopicManager {
    pub fn new() -> Self {
        Self {
            topics: HashMap::new(),
            topic_num: 0,
        }
    }

    pub fn create_topic<T: MorbDataType>(
        &mut self,
        name: String,
        queue_size: u16,
    ) -> Result<Arc<Topic<T>>,std::io::Error> {
        if self.topics.contains_key(&name) {
            return Err(std::io::Error::new(std::io::ErrorKind::AlreadyExists, "Topic already exists"));
        }
        self.topic_num += 1;
        let topic = Arc::new(Topic::new(name.clone(), queue_size, self.topic_num));
        self.topics.insert(name, Box::new(topic.clone()));
        Ok(topic)
    }

    pub fn get_topic<T: MorbDataType>(&self, name: &str) -> Option<Arc<Topic<T>>> {
        self.topics
            .get(name)
            .and_then(|boxed| boxed.downcast_ref::<Arc<Topic<T>>>().cloned())
    }
}

impl Default for TopicManager {
    fn default() -> Self {
        Self::new()
    }
}

pub struct Publisher<T: MorbDataType> {
    topic: Arc<Topic<T>>,
}

impl<T: MorbDataType> Publisher<T> {
    pub fn publish(&self, data: T) {
        {
            let mut fifo = self.topic.fifo.lock().unwrap();
            let index = self.topic.generation.load(Ordering::Acquire) as usize % (self.topic.queue_size as usize);
            fifo[index] = Some(data);
            self.topic.generation.fetch_add(1, Ordering::AcqRel);  
        }
        self.topic.notify();
    }
}

pub struct Subscriber<T: MorbDataType> {
    topic: Arc<Topic<T>>,
    sub_generation: u32,
}

impl<T: MorbDataType> Subscriber<T> {
    pub fn check_update(&self) -> bool {
        if self.sub_generation != self.topic.generation.load(Ordering::SeqCst) {
            return true;
        }
        false
    }

    pub fn check_update_and_copy(&mut self) -> Option<T> {
        let topic_generation = self.topic.generation.load(Ordering::SeqCst);
        if self.sub_generation == topic_generation {
            return None;
        }
        if self.sub_generation < topic_generation.saturating_sub(self.topic.queue_size as u32) {
            self.sub_generation = topic_generation.saturating_sub(self.topic.queue_size as u32);
        }
        let index = (self.sub_generation as usize) % (self.topic.queue_size as usize);
        self.sub_generation += 1;
        self.topic.fifo.lock().unwrap()[index].clone()
    }
}

pub struct TopicPoller {
    poll: Poll,
    events: Events,
}

impl Default for TopicPoller {
    fn default() -> Self {
        Self::new()
    }
}

impl TopicPoller {
    pub fn new() -> Self {
        Self {
            poll: Poll::new().unwrap(),
            events: Events::with_capacity(1024),
        }
    }

    pub fn add_topic<T: MorbDataType>(&mut self, topic: &Topic<T>) -> std::io::Result<()> {
        mio::unix::SourceFd(&topic.eventfd.as_raw_fd()).register(
            self.poll.registry(),
            topic.token,
            mio::Interest::READABLE,
        )
    }

    pub fn remove_topic<T: MorbDataType>(&mut self, topic: &Topic<T>) -> std::io::Result<()> {
        mio::unix::SourceFd(&topic.eventfd.as_raw_fd()).deregister(
            self.poll.registry(),
        )
    }

    pub fn wait(&mut self, timeout: Option<Duration>) -> std::io::Result<()> {
        self.poll.poll(&mut self.events, timeout)
    }

    pub fn iter(&self) -> impl Iterator<Item = Token> + '_ {
        self.events.iter().map(|event| event.token())
    }
}

pub struct Topic<T: MorbDataType> {
    name: String,
    fifo: Mutex<Vec<Option<T>>>,
    pub(crate) generation: AtomicU32,
    queue_size: u16,
    token: mio::Token,
    eventfd: OwnedFd,
}

impl<T: MorbDataType> Topic<T> {
    fn new(name: String, queue_size: u16, topic_id: usize) -> Self {
        assert!(queue_size > 0, "queue_size must be greater than 0");

        Self {
            name,
            fifo: Mutex::new(vec![None; queue_size as usize]),
            generation: AtomicU32::new(0),
            queue_size,
            token: Token(topic_id),
            eventfd: unsafe { OwnedFd::from_raw_fd(libc::eventfd(0, libc::EFD_NONBLOCK)) },
        }
    }

    fn notify(&self) {
        let value = usize::from(self.token) as u64;
        unsafe {
            libc::write(
                self.eventfd.as_raw_fd(),
                &value as *const u64 as *const libc::c_void,
                std::mem::size_of::<u64>(),
            );
        }
    }

    pub fn clear_event(&self) {
        let mut value: u64 = 0;
        unsafe {
            libc::read(
                self.eventfd.as_raw_fd(),
                &mut value as *mut u64 as *mut libc::c_void,
                std::mem::size_of::<u64>(),
            );
        }
    }

    pub fn name(&self) -> &str {
        &self.name
    }

    pub fn token(&self) -> Token {
        self.token
    }

    pub fn create_publisher(self: &Arc<Self>) -> Publisher<T> {
        Publisher {
            topic: self.clone(),
        }
    }

    pub fn create_subscriber(self: &Arc<Self>) -> Subscriber<T> {
        Subscriber {
            topic: self.clone(),
            sub_generation: 0,
        }
    }
}

#[cfg(test)]
mod tests;