use alloc::sync::Weak;
use alloc::vec::Vec;
use core::fmt;
use core::marker::PhantomData;
use core::sync::atomic::{AtomicBool, Ordering::Relaxed};
#[cfg(doc)]
use alloc::sync::Arc;
use crate::maybe_sync::RwLock;
use crate::{IntoListener, Listen, Listener};
#[cfg(doc)]
use crate::{FromListener, Source, sync};
/// Delivers messages of type `M` to a set of listeners of type `L`.
///
/// All state is kept behind an `RwLock`, so listeners can be registered, messages sent,
/// and the notifier closed through a shared reference.
#[cfg_attr(not(feature = "std-sync"), allow(rustdoc::broken_intra_doc_links))]
pub struct Notifier<M, L> {
// Either the live listener set (`State::Open`) or the marker left behind by `close()`.
state: RwLock<State<M, L>>,
}
/// Lifecycle state stored inside a `Notifier`.
enum State<M, L> {
/// The notifier is usable; the wrapped `RawNotifier` owns the registered listeners.
Open(RawNotifier<M, L>),
/// `Notifier::close()` has been called; the previous listener set was dropped when this
/// value replaced it.
Closed,
}
/// Listener set without any interior locking.
///
/// Sending messages needs only `&self`, while registering listeners needs `&mut self`.
/// `Notifier` wraps this type in a lock.
pub struct RawNotifier<M, L> {
// Registered listeners, each paired with its last-known liveness flag.
listeners: Vec<NotifierEntry<L>>,
// Ties the type to the message type `M` without storing a value of it.
_phantom: PhantomData<fn(&M)>,
}
/// One registered listener together with a cached liveness flag.
pub(crate) struct NotifierEntry<L> {
// The listener that messages are delivered to.
listener: L,
// Starts out `true`; `RawNotifier::notify_many` ANDs each `receive()` result into it,
// so once a listener has reported `false` the flag stays `false`.
was_alive: AtomicBool,
}
impl<M, L> Notifier<M, L> {
    /// Creates a notifier that is open and has no listeners yet.
    #[must_use]
    pub fn new() -> Self {
        let initial = State::Open(RawNotifier::new());
        Self {
            state: RwLock::new(initial),
        }
    }

    /// Closes the notifier, dropping every listener it currently holds.
    ///
    /// Once closed, `listen()`/`listen_raw()` silently discard the listener they are given
    /// and `notify()`/`notify_many()` panic.
    pub fn close(&self) {
        let mut state = self.state.write();
        // Overwriting the state drops the previous `RawNotifier` and its listeners.
        *state = State::Closed;
    }
}
impl<M, L> RawNotifier<M, L> {
    /// Creates a notifier with an empty listener list.
    #[must_use]
    pub fn new() -> Self {
        Self {
            _phantom: PhantomData,
            listeners: Vec::new(),
        }
    }

    /// Number of stored entries, without asking any listener whether it is still alive.
    ///
    /// Entries whose listener has already died are included until they are pruned.
    fn count_approximate(&self) -> usize {
        let Self { listeners, .. } = self;
        listeners.len()
    }
}
impl<M, L: Listener<M>> Notifier<M, L> {
    /// Wraps a weak handle to a notifier in a `NotifierForwarder`: a listener that relays
    /// every batch it receives to that notifier for as long as the notifier still exists.
    ///
    /// The import that names this type depends on the `std-sync` feature:
    ///
    /// ```ignore
    #[cfg_attr(
        feature = "std-sync",
        doc = " use nosy::{Listen, sync::Notifier, Log};"
    )]
    #[cfg_attr(
        not(feature = "std-sync"),
        doc = " use nosy::{Listen, unsync::Notifier, Log};"
    )]
    /// ```
    #[must_use]
    pub fn forwarder(this: Weak<Self>) -> NotifierForwarder<M, L> {
        NotifierForwarder(this)
    }

    /// Delivers a single message to every listener.
    ///
    /// # Panics
    ///
    /// Panics if the notifier has been closed.
    pub fn notify(&self, message: &M) {
        let single: &[M] = core::slice::from_ref(message);
        self.notify_many(single);
    }

    /// Delivers a batch of messages to every listener while holding the read lock.
    ///
    /// # Panics
    ///
    /// Panics if the notifier has been closed.
    pub fn notify_many(&self, messages: &[M]) {
        let state = self.state.read();
        match &*state {
            State::Closed => panic!("cannot send messages after Notifier::close()"),
            State::Open(raw) => raw.notify_many(messages),
        }
    }

    /// Creates a `Buffer` that collects up to `CAPACITY` messages before passing them to
    /// this notifier as one batch.
    pub fn buffer<const CAPACITY: usize>(&self) -> Buffer<'_, M, L, CAPACITY> {
        Buffer::new(self)
    }

    /// Removes dead listeners and returns how many remain.
    ///
    /// Takes the write lock and asks every listener whether it is alive. A closed notifier
    /// reports zero.
    pub fn count(&self) -> usize {
        let mut guard = self.state.write();
        guard.drop_dead_listeners();
        guard.count_approximate()
    }
}
impl<M, L: Listener<M>> RawNotifier<M, L> {
    /// Delivers a single message to every listener.
    pub fn notify(&self, message: &M) {
        let single: &[M] = core::slice::from_ref(message);
        self.notify_many(single);
    }

    /// Delivers a batch of messages to every listener, in storage order.
    ///
    /// Each listener's answer is folded into its `was_alive` flag, so a listener that has
    /// reported `false` once stays marked dead.
    pub fn notify_many(&self, messages: &[M]) {
        for entry in &self.listeners {
            // The listener is always called, whatever the flag says; only its answer is
            // recorded here.
            let still_alive = entry.listener.receive(messages);
            entry.was_alive.fetch_and(still_alive, Relaxed);
        }
    }

    /// Creates a `RawBuffer` that collects up to `CAPACITY` messages before passing them to
    /// this notifier as one batch.
    pub fn buffer<const CAPACITY: usize>(&mut self) -> RawBuffer<'_, M, L, CAPACITY> {
        RawBuffer::new(self)
    }

    /// Registers a listener after converting it to the stored listener type.
    ///
    /// A listener that already reports itself dead (returns `false` for an empty batch) is
    /// discarded instead of being stored.
    pub fn listen<L2: IntoListener<L, M>>(&mut self, listener: L2) {
        if listener.receive(&[]) {
            // Prune before pushing so the list is cleaned up rather than blindly grown.
            self.drop_dead_if_full();
            let entry = NotifierEntry {
                listener: listener.into_listener(),
                was_alive: AtomicBool::new(true),
            };
            self.listeners.push(entry);
        }
    }

    /// Registers a listener that already has the stored listener type.
    ///
    /// A listener that already reports itself dead is discarded instead of being stored.
    fn listen_raw(&mut self, listener: L) {
        if listener.receive(&[]) {
            self.drop_dead_if_full();
            let entry = NotifierEntry {
                listener,
                was_alive: AtomicBool::new(true),
            };
            self.listeners.push(entry);
        }
    }

    /// Removes every entry whose flag is cleared or whose listener now reports itself dead.
    ///
    /// Removal uses `swap_remove`, so the relative order of the surviving listeners may
    /// change.
    #[mutants::skip]
    fn drop_dead_listeners(&mut self) {
        let mut index = 0;
        while let Some(entry) = self.listeners.get(index) {
            let keep = entry.was_alive.load(Relaxed) && entry.listener.receive(&[]);
            if keep {
                index += 1;
            } else {
                // The former last element now sits at `index`; examine it on the next
                // iteration instead of advancing.
                self.listeners.swap_remove(index);
            }
        }
    }

    /// Prunes dead listeners only when the list has no spare capacity, i.e. when the next
    /// push would otherwise have to reallocate.
    fn drop_dead_if_full(&mut self) {
        if self.listeners.len() < self.listeners.capacity() {
            return;
        }
        self.drop_dead_listeners();
    }

    /// Counts the listeners that are alive right now, without removing the dead ones.
    #[must_use]
    pub fn count(&self) -> usize {
        let mut alive = 0;
        for entry in &self.listeners {
            if entry.was_alive.load(Relaxed) && entry.listener.receive(&[]) {
                alive += 1;
            }
        }
        alive
    }
}
impl<M, L: Listener<M>> Listen for Notifier<M, L> {
    type Msg = M;
    type Listener = L;

    /// Registers `listener` under the write lock; does nothing if the notifier is closed.
    fn listen_raw(&self, listener: L) {
        let mut guard = self.state.write();
        match &mut *guard {
            State::Closed => {}
            State::Open(raw) => raw.listen_raw(listener),
        }
    }

    /// Converts and registers `listener` under the write lock; does nothing if the notifier
    /// is closed.
    fn listen<L2: IntoListener<Self::Listener, Self::Msg>>(&self, listener: L2) {
        let mut guard = self.state.write();
        match &mut *guard {
            State::Closed => {}
            State::Open(raw) => raw.listen(listener),
        }
    }
}
impl<M, L: Listener<M>> Default for Notifier<M, L> {
fn default() -> Self {
Self::new()
}
}
impl<M, L: Listener<M>> Default for RawNotifier<M, L> {
fn default() -> Self {
Self::new()
}
}
impl<M, L> fmt::Debug for Notifier<M, L> {
    /// Prints `Notifier(N)` with the approximate listener count, `Notifier(closed)`, or
    /// `Notifier(?)` when the state cannot be read right now.
    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
        // `try_read` keeps formatting from waiting on the lock.
        match self.state.try_read().as_deref() {
            Err(_) => fmt.write_str("Notifier(?)"),
            Ok(State::Closed) => fmt.write_str("Notifier(closed)"),
            Ok(State::Open(raw)) => {
                let listener_count = raw.count_approximate();
                write!(fmt, "Notifier({})", listener_count)
            }
        }
    }
}
impl<M, L> fmt::Debug for RawNotifier<M, L> {
    /// Prints `RawNotifier(N)`, where `N` is the number of stored entries (dead listeners
    /// that have not been pruned yet are included).
    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
        let listener_count = self.count_approximate();
        write!(fmt, "RawNotifier({})", listener_count)
    }
}
impl<M, L> State<M, L> {
    /// Number of stored listener entries; a closed state has none.
    fn count_approximate(&self) -> usize {
        match self {
            State::Closed => 0,
            State::Open(raw) => raw.count_approximate(),
        }
    }
}
impl<M, L: Listener<M>> State<M, L> {
    /// Prunes dead listeners from an open state; a closed state has nothing to prune.
    fn drop_dead_listeners(&mut self) {
        match self {
            State::Open(raw) => raw.drop_dead_listeners(),
            State::Closed => {}
        }
    }
}
/// Collects up to `CAPACITY` messages and hands them to a `Notifier` as one batch.
///
/// Messages are sent when the buffer becomes full and when the buffer is dropped.
#[derive(Debug)]
pub struct Buffer<'notifier, M, L, const CAPACITY: usize>
where
L: Listener<M>,
{
// Messages that have been pushed but not sent yet.
pub(crate) buffer: arrayvec::ArrayVec<M, CAPACITY>,
// Destination of every flushed batch.
pub(crate) notifier: &'notifier Notifier<M, L>,
}
/// Collects up to `CAPACITY` messages and hands them to a `RawNotifier` as one batch.
///
/// Messages are sent when the buffer becomes full and when the buffer is dropped.
#[derive(Debug)]
pub struct RawBuffer<'notifier, M, L, const CAPACITY: usize>
where
L: Listener<M>,
{
// Messages that have been pushed but not sent yet.
pub(crate) buffer: arrayvec::ArrayVec<M, CAPACITY>,
// Destination of every flushed batch; borrowed exclusively for the buffer's lifetime.
pub(crate) notifier: &'notifier mut RawNotifier<M, L>,
}
impl<'notifier, M, L, const CAPACITY: usize> Buffer<'notifier, M, L, CAPACITY>
where
    L: Listener<M>,
{
    /// Creates an empty buffer that sends its batches to `notifier`.
    pub(crate) fn new(notifier: &'notifier Notifier<M, L>) -> Self {
        Self {
            buffer: arrayvec::ArrayVec::new(),
            notifier,
        }
    }

    /// Queues `message`, sending the whole batch as soon as the buffer is full.
    ///
    /// With `CAPACITY == 0` there is nowhere to queue, so the message is delivered
    /// immediately instead of panicking.
    ///
    /// # Panics
    ///
    /// Panics if a batch has to be sent and the notifier has been closed.
    pub fn push(&mut self, message: M) {
        match self.buffer.try_push(message) {
            Ok(()) => {
                if self.buffer.is_full() {
                    self.flush();
                }
            }
            Err(rejected) => {
                // No room at all. Send anything already queued first so ordering is
                // preserved, then deliver the new message on its own.
                if !self.buffer.is_empty() {
                    self.flush();
                }
                self.notifier.notify(&rejected.element());
            }
        }
    }

    /// Sends all queued messages as one batch and empties the buffer.
    ///
    /// The buffer is emptied even if delivery panics. Otherwise the `Drop` impl would try
    /// to send the same messages again while unwinding, and that second panic would abort
    /// the process.
    #[cold]
    pub(crate) fn flush(&mut self) {
        /// Clears the borrowed buffer when dropped, on normal exit and on unwind.
        struct ClearOnExit<'a, T, const N: usize>(&'a mut arrayvec::ArrayVec<T, N>);

        impl<'a, T, const N: usize> Drop for ClearOnExit<'a, T, N> {
            fn drop(&mut self) {
                self.0.clear();
            }
        }

        let pending = ClearOnExit(&mut self.buffer);
        self.notifier.notify_many(&*pending.0);
    }
}
impl<'notifier, M, L, const CAPACITY: usize> RawBuffer<'notifier, M, L, CAPACITY>
where
    L: Listener<M>,
{
    /// Creates an empty buffer that sends its batches to `notifier`.
    pub(crate) fn new(notifier: &'notifier mut RawNotifier<M, L>) -> Self {
        Self {
            buffer: arrayvec::ArrayVec::new(),
            notifier,
        }
    }

    /// Queues `message`, sending the whole batch as soon as the buffer is full.
    ///
    /// With `CAPACITY == 0` there is nowhere to queue, so the message is delivered
    /// immediately instead of panicking.
    pub fn push(&mut self, message: M) {
        match self.buffer.try_push(message) {
            Ok(()) => {
                if self.buffer.is_full() {
                    self.flush();
                }
            }
            Err(rejected) => {
                // No room at all. Send anything already queued first so ordering is
                // preserved, then deliver the new message on its own.
                if !self.buffer.is_empty() {
                    self.flush();
                }
                self.notifier.notify(&rejected.element());
            }
        }
    }

    /// Sends all queued messages as one batch and empties the buffer.
    ///
    /// The buffer is emptied even if a listener panics during delivery. Otherwise the
    /// `Drop` impl would try to send the same messages again while unwinding, and a second
    /// panic there would abort the process.
    #[cold]
    pub(crate) fn flush(&mut self) {
        /// Clears the borrowed buffer when dropped, on normal exit and on unwind.
        struct ClearOnExit<'a, T, const N: usize>(&'a mut arrayvec::ArrayVec<T, N>);

        impl<'a, T, const N: usize> Drop for ClearOnExit<'a, T, N> {
            fn drop(&mut self) {
                self.0.clear();
            }
        }

        let pending = ClearOnExit(&mut self.buffer);
        self.notifier.notify_many(&*pending.0);
    }
}
impl<M, L, const CAPACITY: usize> Drop for Buffer<'_, M, L, CAPACITY>
where
    L: Listener<M>,
{
    /// Sends any messages that are still queued so that none are lost when the buffer
    /// goes away.
    fn drop(&mut self) {
        if self.buffer.is_empty() {
            return;
        }
        self.flush();
    }
}
impl<M, L, const CAPACITY: usize> Drop for RawBuffer<'_, M, L, CAPACITY>
where
    L: Listener<M>,
{
    /// Sends any messages that are still queued so that none are lost when the buffer
    /// goes away.
    fn drop(&mut self) {
        if self.buffer.is_empty() {
            return;
        }
        self.flush();
    }
}
/// Listener that relays everything it receives to a `Notifier`.
///
/// Only a weak handle is held, so the forwarder does not keep its target alive.
pub struct NotifierForwarder<M, L>(pub(super) Weak<Notifier<M, L>>);
impl<M, L> fmt::Debug for NotifierForwarder<M, L> {
    /// Prints whether the target notifier still has at least one strong reference.
    ///
    /// The check is shallow: it does not ask the target's own listeners whether they are
    /// alive.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let target_exists = self.0.strong_count() > 0;
        let mut out = f.debug_struct("NotifierForwarder");
        out.field("alive(shallow)", &target_exists);
        out.finish_non_exhaustive()
    }
}
impl<M, L> fmt::Pointer for NotifierForwarder<M, L> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
self.0.as_ptr().fmt(f)
}
}
impl<M, L: Listener<M>> Listener<M> for NotifierForwarder<M, L> {
fn receive(&self, messages: &[M]) -> bool {
if let Some(notifier) = self.0.upgrade() {
notifier.notify_many(messages);
true
} else {
false
}
}
}
/// Identity conversion: a `NotifierForwarder` can be used wherever a listener convertible
/// into a `NotifierForwarder` is expected.
impl<L: Listener<M>, M> crate::FromListener<NotifierForwarder<M, L>, M>
for NotifierForwarder<M, L>
{
/// Returns `listener` unchanged.
fn from_listener(listener: NotifierForwarder<M, L>) -> Self {
listener
}
}
impl<M, L> Clone for NotifierForwarder<M, L> {
fn clone(&self) -> Self {
Self(self.0.clone())
}
}