eventador 0.0.16

lock-free pub/sub event-bus with sync and async APIs
Documentation
use crate::event::EventEnvelope;
use crate::sequence::sequencer::Sequencer;
use crate::WaitStrategy;
use crossbeam::utils::CachePadded;
use std::sync::Arc;

pub(crate) type EventWrapper = CachePadded<Arc<EventEnvelope>>;

pub struct RingBuffer {
    capacity: u64,
    buffer: Vec<EventWrapper>,
    sequencer: Sequencer,
    wait_strategy: WaitStrategy,
}

impl RingBuffer {
    pub fn new(capacity: u64, wait_strategy: WaitStrategy) -> anyhow::Result<Self> {
        if capacity > 1 && capacity.is_power_of_two() {
            let sequencer = Sequencer::new(capacity, wait_strategy);

            let ucapacity = capacity as usize;
            let mut buffer = Vec::with_capacity(ucapacity);

            for i in 0..ucapacity {
                buffer.insert(i, CachePadded::new(Arc::new(EventEnvelope::new())))
            }

            Ok(Self {
                capacity,
                buffer,
                sequencer,
                wait_strategy,
            })
        } else {
            Err(anyhow::Error::msg("expected capacity as a power of two"))
        }
    }

    pub(crate) fn sequencer(&self) -> &Sequencer {
        &self.sequencer
    }

    pub(crate) fn wait_strategy(&self) -> WaitStrategy {
        self.wait_strategy
    }

    pub(crate) fn next(&self) -> u64 {
        self.sequencer.next()
    }

    #[cfg(feature = "async")]
    pub(crate) async fn async_next(&self) -> u64 {
        self.sequencer.async_next().await
    }

    pub(crate) fn idx_from_sequence(&self, sequence: u64) -> usize {
        (sequence & (self.capacity - 1)) as usize
    }

    pub(crate) fn get_envelope(&self, sequence: u64) -> Option<EventWrapper> {
        let idx = self.idx_from_sequence(sequence);

        if let Some(envelope) = self.buffer.get(idx).clone() {
            Some(envelope.clone())
        } else {
            None
        }
    }
}

impl Default for RingBuffer {
    fn default() -> Self {
        Self::new(256, WaitStrategy::AllSubscribers).unwrap()
    }
}

#[cfg(test)]
mod tests {
    use crate::ring_buffer::RingBuffer;
    use crate::WaitStrategy;

    #[test]
    fn error_if_not_power_of_two() {
        assert!(RingBuffer::new(3, WaitStrategy::AllSubscribers).is_err());
    }

    #[test]
    fn success_if_power_of_two() {
        assert!(RingBuffer::new(16, WaitStrategy::AllSubscribers).is_ok());
    }
}