use std::rc::Rc;
use std::cell::RefCell;
use std::time::Duration;
pub use self::thread::Thread;
pub use self::generic::{Allocator, AllocatorBuilder};
pub mod thread;
pub mod process;
pub mod generic;
pub mod canary;
pub mod counters;
pub mod zero_copy;
use crate::{Bytesable, Push, Pull};
use crate::allocator::process::{Process as TypedProcess, ProcessBuilder as TypedProcessBuilder};
use crate::allocator::zero_copy::allocator_process::{ProcessAllocator as BytesProcess, ProcessBuilder as BytesProcessBuilder};
/// A `Send` builder that is consumed by `build` to produce an allocator.
///
/// NOTE(review): presumably builders exist because allocators may hold
/// non-`Send` state (e.g. the `Rc` handed out by `Allocate::events`), so a
/// builder is moved to its destination thread and completed there — confirm.
pub(crate) trait AllocateBuilder : Send {
/// The allocator type produced by `build`; it must implement `Allocate`.
type Allocator: Allocate;
/// Consumes the builder and returns the allocator.
fn build(self) -> Self::Allocator;
}
use std::any::Any;
/// Bound required of element types handed to `Allocate::allocate`:
/// the type must be `Send`, `Any`, and `Bytesable`.
///
/// The trait has no methods of its own; it only names the combined bound.
pub trait Exchangeable : Send+Any+Bytesable { }
// Blanket implementation: every type meeting the three bounds is `Exchangeable`.
impl<T: Send+Any+Bytesable> Exchangeable for T { }
/// A type capable of allocating channels, plus maintenance hooks with
/// do-nothing defaults that implementors may override.
pub(crate) trait Allocate {
    /// The index reported by this allocator.
    ///
    /// NOTE(review): presumably the worker's position in `0..peers()` — confirm.
    fn index(&self) -> usize;

    /// The number of peers reported by this allocator.
    fn peers(&self) -> usize;

    /// Allocates a channel for `identifier`, returning a vector of push
    /// endpoints together with a single pull endpoint.
    ///
    /// NOTE(review): presumably one pusher per peer — confirm with implementors.
    fn allocate<T: Exchangeable>(
        &mut self,
        identifier: usize,
    ) -> (Vec<Box<dyn Push<T>>>, Box<dyn Pull<T>>);

    /// A shared, interior-mutable list of `usize` events.
    ///
    /// NOTE(review): the entries are presumably channel identifiers that saw
    /// activity — confirm against the implementors that fill the list.
    fn events(&self) -> &Rc<RefCell<Vec<usize>>>;

    /// Hook for waiting on events, optionally bounded by a duration.
    ///
    /// The default implementation ignores its argument and returns at once.
    fn await_events(&self, _duration: Option<Duration>) {}

    /// Maintenance hook; the default implementation does nothing.
    fn receive(&mut self) {}

    /// Maintenance hook; the default implementation does nothing.
    fn release(&mut self) {}

    /// Allocates a channel for `identifier` whose single push endpoint forwards
    /// each pushed element to every pusher returned by `allocate`.
    ///
    /// `T: Clone` is required because all pushers but the first are handed
    /// clones of the element.
    fn broadcast<T: Exchangeable + Clone>(
        &mut self,
        identifier: usize,
    ) -> (Box<dyn Push<T>>, Box<dyn Pull<T>>) {
        let (pushers, puller) = self.allocate::<T>(identifier);
        // The spare slot starts empty; it is filled on the first push.
        let fan_out = Broadcaster { spare: None, pushers };
        (Box::new(fan_out), puller)
    }
}
/// An adapter whose `Push` implementation forwards each pushed element to
/// every pusher in `pushers`.
struct Broadcaster<T> {
/// Scratch slot that receives the clone handed to each pusher after the first.
spare: Option<T>,
/// Destinations to which pushed elements are forwarded.
pushers: Vec<Box<dyn Push<T>>>,
}
impl<T: Clone> Push<T> for Broadcaster<T> {
    /// Forwards `element` to every pusher.
    ///
    /// Pushers after the first are served first, each receiving a clone staged
    /// in `self.spare`; the first pusher is served last and receives `element`
    /// itself. With no pushers at all, nothing is pushed and `element` is left
    /// untouched.
    fn push(&mut self, element: &mut Option<T>) {
        if let Some((head, tail)) = self.pushers.split_first_mut() {
            for target in tail {
                // Re-clone each round: the previous pusher may have altered the spare.
                self.spare.clone_from(element);
                target.push(&mut self.spare);
            }
            // The original goes to the first pusher, so one clone is saved.
            head.push(element);
        }
    }
}
use crate::allocator::zero_copy::bytes_slab::BytesRefill;
/// A factory for vectors of allocator builders.
pub(crate) trait PeerBuilder {
/// The builder type produced; it must implement `AllocateBuilder`.
type Peer: AllocateBuilder + Sized;
/// Creates a vector of `Self::Peer` builders from a peer count and a `BytesRefill`.
///
/// NOTE(review): presumably the result has exactly `peers` entries, and
/// `refill` configures how byte buffers are replenished — confirm against
/// the implementors.
fn new_vector(peers: usize, refill: BytesRefill) -> Vec<Self::Peer>;
}
/// A builder for a `Process` allocator, wrapping one of the two concrete
/// builder types. `ProcessBuilder::build` maps each variant to the `Process`
/// variant of the same name.
#[non_exhaustive]
pub enum ProcessBuilder {
/// Wraps the builder from `crate::allocator::process`.
Typed(TypedProcessBuilder),
/// Wraps the builder from `crate::allocator::zero_copy::allocator_process`.
Bytes(BytesProcessBuilder),
}
impl ProcessBuilder {
    /// Consumes the builder and returns the `Process` variant matching it.
    pub fn build(self) -> Process {
        match self {
            Self::Typed(builder) => Process::Typed(builder.build()),
            Self::Bytes(builder) => Process::Bytes(builder.build()),
        }
    }

    /// Creates typed builders through the `PeerBuilder` implementation of
    /// `TypedProcess`, wrapping each one in `ProcessBuilder::Typed`.
    pub fn new_typed_vector(peers: usize, refill: BytesRefill) -> Vec<Self> {
        let typed = <TypedProcess as PeerBuilder>::new_vector(peers, refill);
        typed.into_iter().map(Self::Typed).collect()
    }

    /// Creates bytes builders through the `PeerBuilder` implementation of
    /// `BytesProcessBuilder`, wrapping each one in `ProcessBuilder::Bytes`.
    pub fn new_bytes_vector(peers: usize, refill: BytesRefill) -> Vec<Self> {
        let bytes = <BytesProcessBuilder as PeerBuilder>::new_vector(peers, refill);
        bytes.into_iter().map(Self::Bytes).collect()
    }
}
/// An allocator wrapping one of the two concrete allocator types. Every
/// method on `Process` forwards to the wrapped allocator.
#[non_exhaustive]
pub enum Process {
/// Wraps the allocator from `crate::allocator::process`.
Typed(TypedProcess),
/// Wraps the allocator from `crate::allocator::zero_copy::allocator_process`.
Bytes(BytesProcess),
}
// Every method below forwards to the same-named method on the wrapped allocator.
impl Process {
    /// The index reported by the wrapped allocator.
    pub(crate) fn index(&self) -> usize {
        match self {
            Self::Typed(inner) => inner.index(),
            Self::Bytes(inner) => inner.index(),
        }
    }

    /// The peer count reported by the wrapped allocator.
    pub(crate) fn peers(&self) -> usize {
        match self {
            Self::Typed(inner) => inner.peers(),
            Self::Bytes(inner) => inner.peers(),
        }
    }

    /// Allocates a channel for `identifier` on the wrapped allocator, returning
    /// its push endpoints and its pull endpoint.
    pub(crate) fn allocate<T: Exchangeable>(
        &mut self,
        identifier: usize,
    ) -> (Vec<Box<dyn Push<T>>>, Box<dyn Pull<T>>) {
        match self {
            Self::Typed(inner) => inner.allocate(identifier),
            Self::Bytes(inner) => inner.allocate(identifier),
        }
    }

    /// Allocates a broadcast channel for `identifier` on the wrapped allocator.
    pub(crate) fn broadcast<T: Exchangeable + Clone>(
        &mut self,
        identifier: usize,
    ) -> (Box<dyn Push<T>>, Box<dyn Pull<T>>) {
        match self {
            Self::Typed(inner) => inner.broadcast(identifier),
            Self::Bytes(inner) => inner.broadcast(identifier),
        }
    }

    /// Runs the wrapped allocator's `receive` step.
    pub(crate) fn receive(&mut self) {
        match self {
            Self::Typed(inner) => inner.receive(),
            Self::Bytes(inner) => inner.receive(),
        }
    }

    /// Runs the wrapped allocator's `release` step.
    pub(crate) fn release(&mut self) {
        match self {
            Self::Typed(inner) => inner.release(),
            Self::Bytes(inner) => inner.release(),
        }
    }

    /// The shared event list of the wrapped allocator.
    pub(crate) fn events(&self) -> &Rc<RefCell<Vec<usize>>> {
        match self {
            Self::Typed(inner) => inner.events(),
            Self::Bytes(inner) => inner.events(),
        }
    }

    /// Forwards to the wrapped allocator's `await_events`, passing `duration` through.
    pub(crate) fn await_events(&self, duration: Option<std::time::Duration>) {
        match self {
            Self::Typed(inner) => inner.await_events(duration),
            Self::Bytes(inner) => inner.await_events(duration),
        }
    }
}