use event_listener::EventListener;
use std::{
any::Any,
fmt::Debug,
sync::{atomic::Ordering, Arc},
};
use crate::*;
/// Channel operations that do not depend on the channel's message type.
///
/// The notes on each method describe what the `Channel<M>` implementation in
/// this file does; other implementors are not visible from here.
pub trait DynChannel {
/// Closes the channel.
///
/// `Channel<M>` returns the result of closing its queue, and when that result
/// is `true` it also notifies all listeners of its receive and send events.
/// NOTE(review): presumably `true` means "this call closed it" and `false`
/// means "already closed" — confirm against the queue type.
fn close(&self) -> bool;
/// Adds `n` to the channel's halt count and notifies all receive listeners.
///
/// `Channel<M>` clamps `n` to `i32::MAX`, treats a negative stored count as
/// zero, and saturates on overflow.
/// NOTE(review): presumably each unit asks one process to halt — confirm
/// against the code that consumes the halt count.
fn halt_some(&self, n: u32);
/// Closes the channel and then calls `halt_some(u32::MAX)`.
fn halt(&self);
/// Returns the current inbox count of the channel.
fn process_count(&self) -> usize;
/// Returns the number of messages currently in the channel's queue.
fn msg_count(&self) -> usize;
/// Returns the current address count of the channel.
fn address_count(&self) -> usize;
/// Returns whether the channel's queue reports itself as closed.
fn is_closed(&self) -> bool;
/// Returns a reference to the channel's `Capacity`.
fn capacity(&self) -> &Capacity;
/// Returns `true` when the channel's inbox count is zero.
fn has_exited(&self) -> bool;
/// Increments the address count by one and returns the count from *before*
/// the increment.
fn add_address(&self) -> usize;
/// Decrements the address count by one and returns the count from *before*
/// the decrement.
///
/// # Panics
///
/// `Channel<M>` panics if the previous count was zero.
fn remove_address(&self) -> usize;
/// Returns an `EventListener` obtained from the channel.
/// NOTE(review): presumably it is notified when the actor exits — confirm
/// against `Channel::get_exit_listener`.
fn get_exit_listener(&self) -> EventListener;
/// Returns the id of the actor this channel belongs to.
fn actor_id(&self) -> u64;
/// Returns whether the channel's capacity reports itself as bounded.
fn is_bounded(&self) -> bool;
}
/// A [`DynChannel`] that is also `Debug + Send + Sync + 'static` and can be
/// converted into a type-erased `Arc<dyn Any + Send + Sync>`.
pub trait AnyChannel: DynChannel + Debug + Send + Sync + 'static {
/// Converts this shared channel into an `Arc<dyn Any + Send + Sync>`.
/// NOTE(review): presumably so callers can downcast back to the concrete
/// channel type — confirm at the call sites.
fn into_any(self: Arc<Self>) -> Arc<dyn Any + Send + Sync>;
}
/// Lets any `Channel<M>` with a sendable, `'static` message type be erased to
/// `Arc<dyn Any + Send + Sync>`.
impl<M: Send + 'static> AnyChannel for Channel<M> {
    /// Erases the concrete channel type behind `dyn Any`, keeping the same
    /// `Arc` allocation.
    fn into_any(self: Arc<Self>) -> Arc<dyn Any + Send + Sync> {
        // The typed binding spells out the unsizing coercion that would
        // otherwise happen implicitly at the return site.
        let erased: Arc<dyn Any + Send + Sync> = self;
        erased
    }
}
/// Message-type-independent operations for `Channel<M>`.
impl<M> DynChannel for Channel<M> {
    /// Closes the queue and returns what the queue reported.
    ///
    /// When the queue reports `true`, every listener of the receive and send
    /// events is notified.
    fn close(&self) -> bool {
        let closed_now = self.queue.close();
        if closed_now {
            self.recv_event.notify(usize::MAX);
            self.send_event.notify(usize::MAX);
        }
        closed_now
    }

    /// Adds `n` (clamped to `i32::MAX`) to the halt count, then notifies every
    /// listener of the receive event.
    fn halt_some(&self, n: u32) {
        // The counter is signed, so requests above `i32::MAX` are clamped.
        let requested = i32::try_from(n).unwrap_or(i32::MAX);

        self.halt_count
            .fetch_update(Ordering::AcqRel, Ordering::Acquire, |current| {
                // A negative count is treated as zero; the sum saturates.
                Some(current.max(0).saturating_add(requested))
            })
            // The closure always returns `Some`, so the update cannot fail.
            .unwrap();

        self.recv_event.notify(usize::MAX);
    }

    /// Closes the channel, then adds the maximum amount to the halt count.
    fn halt(&self) {
        self.close();
        self.halt_some(u32::MAX);
    }

    /// Current inbox count.
    fn process_count(&self) -> usize {
        self.inbox_count.load(Ordering::Acquire)
    }

    /// Number of messages currently held by the queue.
    fn msg_count(&self) -> usize {
        self.queue.len()
    }

    /// Current address count.
    fn address_count(&self) -> usize {
        self.address_count.load(Ordering::Acquire)
    }

    /// Whether the queue reports itself as closed.
    fn is_closed(&self) -> bool {
        self.queue.is_closed()
    }

    /// Borrowed capacity of this channel.
    fn capacity(&self) -> &Capacity {
        &self.capacity
    }

    /// Whether the capacity reports itself as bounded.
    fn is_bounded(&self) -> bool {
        self.capacity.is_bounded()
    }

    /// `true` once the inbox count has dropped to zero.
    fn has_exited(&self) -> bool {
        let live_inboxes = self.inbox_count.load(Ordering::Acquire);
        live_inboxes == 0
    }

    /// Increments the address count; returns the count before the increment.
    fn add_address(&self) -> usize {
        self.address_count.fetch_add(1, Ordering::AcqRel)
    }

    /// Decrements the address count; returns the count before the decrement.
    ///
    /// # Panics
    ///
    /// Panics if the count was already zero before this call.
    fn remove_address(&self) -> usize {
        let prev_address_count = self.address_count.fetch_sub(1, Ordering::AcqRel);
        assert!(prev_address_count >= 1);
        prev_address_count
    }

    /// Returns an exit listener for this channel.
    fn get_exit_listener(&self) -> EventListener {
        // NOTE(review): method-call syntax prefers an inherent
        // `Channel::get_exit_listener` over this trait method; this assumes
        // one is defined elsewhere, otherwise the call would recurse.
        self.get_exit_listener()
    }

    /// Id of the actor this channel belongs to.
    fn actor_id(&self) -> u64 {
        self.actor_id
    }
}