use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex, PoisonError};
use std::time::Duration;
use tokio::runtime::Handle;
use crate::concurrency::{ChannelMode, ConcurrentRing, ReportChannel};
use crate::error::{CaducusError, CaducusErrorKind};
use crate::reclaimer;
pub struct SpscBuilder<T: Send + 'static> {
capacity: usize,
ttl: Duration,
expiry_channel: Option<Arc<dyn ReportChannel<T>>>,
shutdown_channel: Option<Arc<dyn ReportChannel<T>>>,
runtime: Option<Handle>,
}
impl<T: Send + 'static> SpscBuilder<T> {
pub fn new(capacity: usize, ttl: Duration) -> Self {
Self {
capacity,
ttl,
expiry_channel: None,
shutdown_channel: None,
runtime: None,
}
}
pub fn expiry_channel(mut self, ch: Arc<dyn ReportChannel<T>>) -> Self {
self.expiry_channel = Some(ch);
self
}
pub fn shutdown_channel(mut self, ch: Arc<dyn ReportChannel<T>>) -> Self {
self.shutdown_channel = Some(ch);
self
}
pub fn runtime(mut self, handle: Handle) -> Self {
self.runtime = Some(handle);
self
}
pub fn build(self) -> Result<(SpscSender<T>, super::receiver::Receiver<T>), CaducusError> {
let handle = resolve_runtime(self.runtime)?;
let ring = ConcurrentRing::new(
self.capacity,
self.ttl,
ChannelMode::Spsc,
self.expiry_channel,
self.shutdown_channel,
)?;
let ring = Arc::new(ring);
let notify_reclaimer = ring.notify_reclaimer_handle();
let notify_receiver = ring.notify_receiver_handle();
reclaimer::spawn_reclaimer(
Arc::downgrade(&ring),
notify_reclaimer,
notify_receiver,
&handle,
);
Ok((
SpscSender {
ring: Arc::clone(&ring),
},
super::receiver::Receiver::new(ring),
))
}
}
pub struct MpscBuilder<T: Send + 'static> {
capacity: usize,
ttl: Duration,
expiry_channel: Option<Arc<dyn ReportChannel<T>>>,
shutdown_channel: Option<Arc<dyn ReportChannel<T>>>,
runtime: Option<Handle>,
}
impl<T: Send + 'static> MpscBuilder<T> {
pub fn new(capacity: usize, ttl: Duration) -> Self {
Self {
capacity,
ttl,
expiry_channel: None,
shutdown_channel: None,
runtime: None,
}
}
pub fn expiry_channel(mut self, ch: Arc<dyn ReportChannel<T>>) -> Self {
self.expiry_channel = Some(ch);
self
}
pub fn shutdown_channel(mut self, ch: Arc<dyn ReportChannel<T>>) -> Self {
self.shutdown_channel = Some(ch);
self
}
pub fn runtime(mut self, handle: Handle) -> Self {
self.runtime = Some(handle);
self
}
pub fn build(self) -> Result<(MpscSender<T>, super::receiver::Receiver<T>), CaducusError> {
let handle = resolve_runtime(self.runtime)?;
let ring = ConcurrentRing::new(self.capacity, self.ttl, ChannelMode::Mpsc, None, None)?;
let ring = Arc::new(ring);
let notify_reclaimer = ring.notify_reclaimer_handle();
let notify_receiver = ring.notify_receiver_handle();
reclaimer::spawn_reclaimer(
Arc::downgrade(&ring),
notify_reclaimer,
notify_receiver,
&handle,
);
Ok((
MpscSender {
ring: Arc::clone(&ring),
expiry_channel: Mutex::new(self.expiry_channel),
shutdown_channel: Mutex::new(self.shutdown_channel),
sender_count: Arc::new(AtomicUsize::new(1)),
},
super::receiver::Receiver::new(ring),
))
}
}
fn resolve_runtime(provided: Option<Handle>) -> Result<Handle, CaducusError> {
match provided {
Some(h) => Ok(h),
None => Handle::try_current().map_err(|_| CaducusError {
kind: CaducusErrorKind::NoRuntime,
}),
}
}
pub struct SpscSender<T: Send + 'static> {
ring: Arc<ConcurrentRing<T>>,
}
impl<T: Send + 'static> std::fmt::Debug for SpscSender<T> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("SpscSender").finish_non_exhaustive()
}
}
impl<T: Send + 'static> SpscSender<T> {
pub fn send(&self, item: T) -> Result<(), CaducusError<T>> {
self.ring.send_spsc(item)
}
pub fn update_capacity(&self, new: usize) {
self.ring.update_capacity(new);
}
pub fn update_ttl(&self, duration: Duration) -> Result<(), CaducusError> {
self.ring.update_ttl(duration)
}
pub fn shutdown(&self) {
reclaimer::shutdown_and_report(&self.ring);
}
pub fn is_closed(&self) -> bool {
self.ring.is_shutdown()
}
}
impl<T: Send + 'static> Drop for SpscSender<T> {
fn drop(&mut self) {
reclaimer::shutdown_and_report(&self.ring);
}
}
pub struct MpscSender<T: Send + 'static> {
ring: Arc<ConcurrentRing<T>>,
expiry_channel: Mutex<Option<Arc<dyn ReportChannel<T>>>>,
shutdown_channel: Mutex<Option<Arc<dyn ReportChannel<T>>>>,
sender_count: Arc<AtomicUsize>,
}
impl<T: Send + 'static> std::fmt::Debug for MpscSender<T> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("MpscSender").finish_non_exhaustive()
}
}
impl<T: Send + 'static> MpscSender<T> {
pub fn send(&self, item: T) -> Result<(), CaducusError<T>> {
let exp = self
.expiry_channel
.lock()
.unwrap_or_else(PoisonError::into_inner);
let shut = self
.shutdown_channel
.lock()
.unwrap_or_else(PoisonError::into_inner);
let expiry = exp.clone();
let shutdown = shut.clone();
drop(shut);
drop(exp);
self.ring.send_mpsc(item, expiry, shutdown)
}
pub fn set_expiry_channel(&self, ch: Option<Arc<dyn ReportChannel<T>>>) {
*self
.expiry_channel
.lock()
.unwrap_or_else(PoisonError::into_inner) = ch;
}
pub fn set_shutdown_channel(&self, ch: Option<Arc<dyn ReportChannel<T>>>) {
*self
.shutdown_channel
.lock()
.unwrap_or_else(PoisonError::into_inner) = ch;
}
pub fn set_channels(
&self,
expiry: Option<Arc<dyn ReportChannel<T>>>,
shutdown: Option<Arc<dyn ReportChannel<T>>>,
) {
let mut exp = self
.expiry_channel
.lock()
.unwrap_or_else(PoisonError::into_inner);
let mut shut = self
.shutdown_channel
.lock()
.unwrap_or_else(PoisonError::into_inner);
*exp = expiry;
*shut = shutdown;
}
pub fn update_capacity(&self, new: usize) {
self.ring.update_capacity(new);
}
pub fn update_ttl(&self, duration: Duration) -> Result<(), CaducusError> {
self.ring.update_ttl(duration)
}
pub fn shutdown(&self) {
reclaimer::shutdown_and_report(&self.ring);
}
pub fn is_closed(&self) -> bool {
self.ring.is_shutdown()
}
}
impl<T: Send + 'static> Clone for MpscSender<T> {
fn clone(&self) -> Self {
self.sender_count.fetch_add(1, Ordering::Relaxed);
let exp = self
.expiry_channel
.lock()
.unwrap_or_else(PoisonError::into_inner);
let shut = self
.shutdown_channel
.lock()
.unwrap_or_else(PoisonError::into_inner);
let expiry = exp.clone();
let shutdown = shut.clone();
drop(shut);
drop(exp);
Self {
ring: Arc::clone(&self.ring),
expiry_channel: Mutex::new(expiry),
shutdown_channel: Mutex::new(shutdown),
sender_count: Arc::clone(&self.sender_count),
}
}
}
impl<T: Send + 'static> Drop for MpscSender<T> {
fn drop(&mut self) {
if self.sender_count.fetch_sub(1, Ordering::Release) == 1 {
std::sync::atomic::fence(Ordering::Acquire);
reclaimer::shutdown_and_report(&self.ring);
}
}
}