use std::sync::{atomic::AtomicI64, Arc};
use core_affinity::CoreId;
use crossbeam_utils::CachePadded;
use crate::{Sequence, affinity::cpu_has_core_else_panic, barrier::{Barrier, NONE}, consumer::{event_poller::{EventPoller, Branch}, start_processor, start_processor_with_state}};
use crate::consumer::Consumer;
use crate::cursor::Cursor;
use crate::producer::{single::SingleProducerBarrier, multi::MultiProducerBarrier};
use crate::ringbuffer::RingBuffer;
use crate::wait_strategies::WaitStrategy;
use self::{multi::MPBuilder, single::SPBuilder};
pub mod single;
pub mod multi;
/// Marker type used as the first type parameter of the builders returned by
/// [`build_single_producer`] and [`build_multi_producer`].
// NOTE(review): presumably stands for "no consumers" (builder type state) — confirm in `single`/`multi`.
pub struct NC;
/// Marker type; it carries no data.
// NOTE(review): presumably the "single consumer" builder type state, used by the `single`/`multi` modules — confirm there.
pub struct SC;
/// Marker type; it carries no data.
// NOTE(review): presumably the "multiple consumers" builder type state, used by the `single`/`multi` modules — confirm there.
pub struct MC;
/// Creates the builder for a single-producer setup.
///
/// One [`SingleProducerBarrier`] is allocated and two handles to that same
/// barrier are given to [`SPBuilder::new`]: one in the producer-barrier slot
/// and one in the dependent-barrier slot.
///
/// `size`, `event_factory` and `wait_strategy` are forwarded to
/// [`SPBuilder::new`] unchanged.
// NOTE(review): `size`/`event_factory` presumably end up sizing and pre-filling the ring buffer
// (compare `Shared::new`) — confirm in `single::SPBuilder::new`.
pub fn build_single_producer<E, W, F>(
    size: usize,
    event_factory: F,
    wait_strategy: W,
) -> SPBuilder<NC, E, W, SingleProducerBarrier>
where
    F: FnMut() -> E,
    E: 'static + Send + Sync,
    W: 'static + WaitStrategy,
{
    let barrier = Arc::new(SingleProducerBarrier::new());
    // Arguments are evaluated left to right, so the handle is cloned before
    // `barrier` itself is moved into the last argument.
    SPBuilder::new(size, event_factory, wait_strategy, Arc::clone(&barrier), barrier)
}
/// Creates the builder for a multi-producer setup.
///
/// One [`MultiProducerBarrier`] is created from `size` and two handles to that
/// same barrier are given to [`MPBuilder::new`]: one in the producer-barrier
/// slot and one in the dependent-barrier slot.
///
/// `size`, `event_factory` and `wait_strategy` are forwarded to
/// [`MPBuilder::new`] unchanged.
// NOTE(review): `size`/`event_factory` presumably end up sizing and pre-filling the ring buffer
// (compare `Shared::new`) — confirm in `multi::MPBuilder::new`.
pub fn build_multi_producer<E, W, F>(
    size: usize,
    event_factory: F,
    wait_strategy: W,
) -> MPBuilder<NC, E, W, MultiProducerBarrier>
where
    F: FnMut() -> E,
    E: 'static + Send + Sync,
    W: 'static + WaitStrategy,
{
    let barrier = Arc::new(MultiProducerBarrier::new(size));
    // Arguments are evaluated left to right, so the handle is cloned before
    // `barrier` itself is moved into the last argument.
    MPBuilder::new(size, event_factory, wait_strategy, Arc::clone(&barrier), barrier)
}
/// Chainable thread settings offered by a builder.
///
/// Both setters take the builder by value, store the setting in the builder's
/// [`Shared`] state and hand the builder back so calls can be chained.
// NOTE(review): the stored settings are presumably applied to the next processor thread that is
// started (see `ThreadContext`) — confirm in `consumer::start_processor`.
pub trait ProcessorSettings<E, W>: Sized {
    /// Mutable access to the builder's [`Shared`] state, used by the default methods.
    #[doc(hidden)]
    fn shared(&mut self) -> &mut Shared<E, W>;

    /// Records the core with index `id` as the requested CPU affinity.
    ///
    /// Delegates to `Shared::pin_at_core`, which first calls
    /// `cpu_has_core_else_panic(id)`.
    // NOTE(review): judging by its name that helper panics for an unknown core — confirm in `affinity`.
    fn pin_at_core(mut self, id: usize) -> Self {
        let shared = self.shared();
        shared.pin_at_core(id);
        self
    }

    /// Records `name` as the requested thread name (delegates to `Shared::thread_named`).
    fn thread_name(mut self, name: &'static str) -> Self {
        let shared = self.shared();
        shared.thread_named(name);
        self
    }
}
/// Internal building blocks shared by the concrete builders.
///
/// Implementors only supply [`Builder::dependent_barrier`] (plus
/// [`ProcessorSettings::shared`]); the default methods then start processors
/// or create pollers against that barrier and register the resulting cursors
/// in the builder's [`Shared`] state.
trait Builder<E, W, B>: ProcessorSettings<E, W>
where
    E: 'static + Send + Sync,
    B: 'static + Barrier,
    W: 'static + WaitStrategy,
{
    /// Starts a processor for `event_handler` via [`start_processor`] and
    /// registers the returned consumer and cursor in the shared state.
    fn add_event_handler<EH>(&mut self, event_handler: EH)
    where
        EH: 'static + Send + FnMut(&E, Sequence, bool),
    {
        // Fetch the barrier first: it needs `&self`, while `self.shared()` below needs `&mut self`.
        let barrier = self.dependent_barrier();
        let (cursor, consumer) = start_processor(event_handler, self.shared(), barrier);
        self.shared().add_consumer_and_cursor(consumer, cursor);
    }

    /// Creates an [`EventPoller`] with a fresh cursor and registers that
    /// cursor in the shared state before the poller is constructed.
    fn build_event_poller(&mut self) -> EventPoller<E, B> {
        let cursor = Arc::new(Cursor::new());
        self.shared().add_cursor(Arc::clone(&cursor));
        EventPoller::new(
            Arc::clone(&self.shared().ring_buffer),
            // `dependent_barrier` already returns an owned handle; no second clone is needed.
            self.dependent_barrier(),
            Arc::clone(&self.shared().shutdown_at_sequence),
            cursor,
        )
    }

    /// Creates a [`Branch`] around a poller with a fresh cursor.
    ///
    /// Unlike [`Builder::build_event_poller`], the cursor is *not* registered
    /// in the shared state here; [`Builder::add_cursor_from_branch`] does that.
    fn build_branch(&mut self) -> Branch<E, B> {
        let poller = EventPoller::new(
            Arc::clone(&self.shared().ring_buffer),
            // `dependent_barrier` already returns an owned handle; no second clone is needed.
            self.dependent_barrier(),
            Arc::clone(&self.shared().shutdown_at_sequence),
            Arc::new(Cursor::new()),
        );
        Branch::new(poller)
    }

    /// Registers the cursor returned by `branch.cursor_for_poller()` in the shared state.
    fn add_cursor_from_branch<B2>(&mut self, branch: &Branch<E, B2>) {
        self.shared().add_cursor(branch.cursor_for_poller());
    }

    /// Like [`Builder::add_event_handler`], but starts the processor via
    /// [`start_processor_with_state`], passing `initialize_state` along so the
    /// handler receives a `&mut S` on every call.
    fn add_event_handler_with_state<EH, S, IS>(&mut self, event_handler: EH, initialize_state: IS)
    where
        EH: 'static + Send + FnMut(&mut S, &E, Sequence, bool),
        IS: 'static + Send + FnOnce() -> S,
    {
        // Fetch the barrier first: it needs `&self`, while `self.shared()` below needs `&mut self`.
        let barrier = self.dependent_barrier();
        let (cursor, consumer) =
            start_processor_with_state(event_handler, self.shared(), barrier, initialize_state);
        self.shared().add_consumer_and_cursor(consumer, cursor);
    }

    /// Returns an owned handle to the barrier that is handed to newly started
    /// processors and newly created pollers.
    fn dependent_barrier(&self) -> Arc<B>;
}
/// State shared by the builders in this module: the ring buffer, the shutdown
/// marker, the consumers started so far and the per-thread settings.
#[doc(hidden)]
pub struct Shared<E, W> {
// Shutdown marker; initialised to `NONE` in `Shared::new` and handed to every event poller.
pub(crate) shutdown_at_sequence: Arc<CachePadded<AtomicI64>>,
// The ring buffer, created in `Shared::new` from `size` and `event_factory`.
pub(crate) ring_buffer: Arc<RingBuffer<E>>,
// Consumers returned by `start_processor` / `start_processor_with_state`.
pub(crate) consumers: Vec<Consumer>,
// Cursors registered so far. Starts as `Some(vec![])`; `add_cursor` unwraps it.
// NOTE(review): the `Option` suggests the list is taken out elsewhere — confirm in `single`/`multi`.
current_consumer_cursors: Option<Vec<Arc<Cursor>>>,
// The wait strategy passed to the builder.
pub(crate) wait_strategy: W,
// Thread name / core affinity set through `ProcessorSettings`.
pub(crate) thread_context: ThreadContext,
}
impl<E, W> Shared<E, W> {
    /// Builds the shared state: a ring buffer created from `size` and
    /// `event_factory`, a shutdown marker initialised to `NONE`, an empty
    /// cursor list, no consumers and a default [`ThreadContext`].
    fn new<F>(size: usize, event_factory: F, wait_strategy: W) -> Self
    where
        F: FnMut() -> E,
    {
        Self {
            // Field initialisers run in the order written; the ring buffer stays first.
            ring_buffer: Arc::new(RingBuffer::new(size, event_factory)),
            shutdown_at_sequence: Arc::new(CachePadded::new(AtomicI64::new(NONE))),
            current_consumer_cursors: Some(Vec::new()),
            consumers: Vec::new(),
            wait_strategy,
            thread_context: ThreadContext::default(),
        }
    }

    /// Stores `consumer` and registers its `cursor`.
    fn add_consumer_and_cursor(&mut self, consumer: Consumer, cursor: Arc<Cursor>) {
        self.consumers.push(consumer);
        self.add_cursor(cursor);
    }

    /// Appends `cursor` to the current cursor list.
    ///
    /// Panics if the list is `None`.
    fn add_cursor(&mut self, cursor: Arc<Cursor>) {
        let cursors = self.current_consumer_cursors.as_mut().unwrap();
        cursors.push(cursor);
    }

    /// Calls `cpu_has_core_else_panic(id)` and then records core `id` as the
    /// requested affinity.
    fn pin_at_core(&mut self, id: usize) {
        cpu_has_core_else_panic(id);
        let core = CoreId { id };
        self.thread_context.affinity = Some(core);
    }

    /// Records an owned copy of `name` as the requested thread name.
    fn thread_named(&mut self, name: &'static str) {
        self.thread_context.name = Some(String::from(name));
    }
}
/// Thread settings collected by the builder: an optional core affinity, an
/// optional explicit name and a counter for generated names.
#[derive(Default)]
pub(crate) struct ThreadContext {
// Requested core; handed out once by `affinity()` and then cleared.
affinity: Option<CoreId>,
// Explicit thread name; handed out once by `name()` and then cleared.
name: Option<String>,
// Counter behind generated names; incremented before use, so the first one is `processor-1`.
id: usize,
}
impl ThreadContext {
    /// Returns the thread name to use and consumes it.
    ///
    /// If an explicit name was set it is returned and cleared. Otherwise the
    /// internal counter is incremented and `processor-<counter>` is returned,
    /// so generated names start at `processor-1`. Returning an explicit name
    /// does not advance the counter.
    pub(crate) fn name(&mut self) -> String {
        self.name.take().unwrap_or_else(|| {
            self.id += 1;
            format!("processor-{}", self.id)
        })
    }

    /// Returns the requested core affinity, if any, and clears it so that a
    /// later call yields `None`.
    pub(crate) fn affinity(&mut self) -> Option<CoreId> {
        self.affinity.take()
    }
}