use std::sync::atomic::{fence, Ordering};
use crate::{barrier::Barrier, cursor::Cursor, producer::ProducerBarrier};
use crossbeam_utils::CachePadded;
use crate::{consumer::Consumer, ringbuffer::RingBuffer, Sequence};
use std::sync::{Arc, atomic::AtomicI64};
use super::*;
pub struct SingleProducer<E, C> {
shutdown_at_sequence: Arc<CachePadded<AtomicI64>>,
ring_buffer: Arc<RingBuffer<E>>,
producer_barrier: Arc<SingleProducerBarrier>,
consumers: Vec<Consumer>,
consumer_barrier: C,
sequence: Sequence,
sequence_clear_of_consumers: Sequence,
}
impl<E, C> Producer<E> for SingleProducer<E, C>
where
C: Barrier
{
#[inline]
fn try_publish<F>(&mut self, update: F) -> Result<Sequence, RingBufferFull>
where
F: FnOnce(&mut E)
{
self.next_sequences(1).map_err(|_| RingBufferFull)?;
let sequence = self.apply_update(update);
Ok(sequence)
}
#[inline]
fn publish<F>(&mut self, update: F)
where
F: FnOnce(&mut E)
{
while self.next_sequences(1).is_err() { }
self.apply_update(update);
}
#[inline]
fn try_batch_publish<'a, F>(&'a mut self, n: usize, update: F) -> Result<Sequence, MissingFreeSlots>
where
E: 'a,
F: FnOnce(MutBatchIter<'a, E>)
{
self.next_sequences(n)?;
let sequence = self.apply_updates(n, update);
Ok(sequence)
}
#[inline]
fn batch_publish<'a, F>(&'a mut self, n: usize, update: F)
where
E: 'a,
F: FnOnce(MutBatchIter<'a, E>)
{
while self.next_sequences(n).is_err() { }
self.apply_updates(n, update);
}
}
impl<E, C> SingleProducer<E, C>
where
C: Barrier
{
pub(crate) fn new(
shutdown_at_sequence: Arc<CachePadded<AtomicI64>>,
ring_buffer: Arc<RingBuffer<E>>,
producer_barrier: Arc<SingleProducerBarrier>,
consumers: Vec<Consumer>,
consumer_barrier: C,
) -> Self
{
let sequence_clear_of_consumers = ring_buffer.size() - 1;
Self {
shutdown_at_sequence,
ring_buffer,
producer_barrier,
consumers,
consumer_barrier,
sequence: 0,
sequence_clear_of_consumers,
}
}
#[inline]
fn next_sequences(&mut self, n: usize) -> Result<Sequence, MissingFreeSlots> {
let n = n as i64;
let n_next = self.sequence - 1 + n;
if self.sequence_clear_of_consumers < n_next {
let last_published = self.sequence - 1;
let rear_sequence_read = self.consumer_barrier.get_after(last_published);
let free_slots = self.ring_buffer.free_slots(last_published, rear_sequence_read);
if free_slots < n {
return Err(MissingFreeSlots((n - free_slots) as u64));
}
fence(Ordering::Acquire);
self.sequence_clear_of_consumers = last_published + free_slots;
}
Ok(n_next)
}
#[inline]
fn apply_update<F>(&mut self, update: F) -> Sequence
where
F: FnOnce(&mut E)
{
let sequence = self.sequence;
let event_ptr = self.ring_buffer.get(sequence);
let event = unsafe { &mut *event_ptr };
update(event);
self.producer_barrier.publish(sequence);
self.sequence += 1;
sequence
}
#[inline]
fn apply_updates<'a, F>(&'a mut self, n: usize, updates: F) -> Sequence
where
E: 'a,
F: FnOnce(MutBatchIter<'a, E>)
{
let n = n as i64;
let lower = self.sequence;
let upper = lower + n - 1;
let iter = MutBatchIter::new(lower, upper, &self.ring_buffer);
updates(iter);
self.producer_barrier.publish(upper);
self.sequence += n;
upper
}
}
impl<E, C> Drop for SingleProducer<E, C> {
fn drop(&mut self) {
self.shutdown_at_sequence.store(self.sequence, Ordering::Relaxed);
self.consumers.iter_mut().for_each(|c| c.join());
}
}
pub struct SingleProducerBarrier {
cursor: Cursor
}
impl SingleProducerBarrier {
pub(crate) fn new() -> Self {
Self {
cursor: Cursor::new()
}
}
}
impl Barrier for SingleProducerBarrier {
#[inline]
fn get_after(&self, _prev: Sequence) -> Sequence {
self.cursor.relaxed_value()
}
}
impl ProducerBarrier for SingleProducerBarrier {
#[inline]
fn publish(&self, sequence: Sequence) {
self.cursor.store(sequence);
}
}