use std::{sync::{atomic::{fence, AtomicI64, Ordering}, Arc}};
use crossbeam_utils::CachePadded;
use thiserror::Error;
use crate::{cursor::Cursor, barrier::Barrier, ringbuffer::RingBuffer, Sequence};
pub struct EventPoller<E, B> {
ring_buffer: Arc<RingBuffer<E>>,
dependent_barrier: Arc<B>,
shutdown_at_sequence: Arc<CachePadded<AtomicI64>>,
cursor: Arc<Cursor>,
}
pub struct Branch<E, B> {
poller: EventPoller<E, B>,
}
impl<E, B> Branch<E, B> {
pub(crate) fn new(poller: EventPoller<E, B>) -> Self {
Self { poller }
}
pub(crate) fn cursor_for_poller(&self) -> Arc<Cursor> {
Arc::clone(&self.poller.cursor)
}
pub(crate) fn into_poller(self) -> EventPoller<E, B> {
self.poller
}
}
#[derive(Debug, Error, PartialEq)]
pub enum Polling {
#[error("No available events.")]
NoEvents,
#[error("Disruptor is shut down.")]
Shutdown,
}
pub struct EventGuard<'p, E, B> {
parent: &'p mut EventPoller<E, B>,
sequence: Sequence,
available: Sequence,
}
impl<'g, E, B> Iterator for &'g mut EventGuard<'_, E, B> {
type Item = &'g E;
fn next(&mut self) -> Option<Self::Item> {
if self.sequence > self.available {
return None;
}
let event_ptr = self.parent.ring_buffer.get(self.sequence);
let event = unsafe { &*event_ptr };
self.sequence += 1;
Some(event)
}
}
impl<E, B> ExactSizeIterator for &mut EventGuard<'_, E, B> {
fn len(&self) -> usize {
(self.available - self.sequence + 1) as usize
}
}
impl<E, B> Drop for EventGuard<'_, E, B> {
fn drop(&mut self) {
self.parent.cursor.store(self.available);
}
}
impl<E, B> EventPoller<E, B>
where
B: Barrier,
{
pub(crate) fn new(
ring_buffer: Arc<RingBuffer<E>>,
dependent_barrier: Arc<B>,
shutdown_at_sequence: Arc<CachePadded<AtomicI64>>,
cursor: Arc<Cursor>,
) -> Self {
Self {
ring_buffer,
dependent_barrier,
shutdown_at_sequence,
cursor,
}
}
pub fn poll(&mut self) -> Result<EventGuard<'_, E, B>, Polling> {
self.take(u64::MAX)
}
#[inline]
pub fn has_available(&self) -> bool {
let sequence = self.cursor.relaxed_value() + 1;
self.dependent_barrier.get_after(sequence) >= sequence
}
pub fn take(&mut self, limit: u64) -> Result<EventGuard<'_, E, B>, Polling> {
let cursor_at = self.cursor.relaxed_value();
let sequence = cursor_at + 1;
if sequence == self.shutdown_at_sequence.load(Ordering::Relaxed) {
return Err(Polling::Shutdown);
}
let available = self.dependent_barrier.get_after(sequence);
if available < sequence {
return Err(Polling::NoEvents);
}
fence(Ordering::Acquire);
let max_sequence = (cursor_at).saturating_add_unsigned(limit);
let available = std::cmp::min(available, max_sequence);
Ok(EventGuard {
parent: self,
sequence,
available,
})
}
}