use std::sync::Arc;
use std::sync::atomic::AtomicU64;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
use arc_swap::ArcSwap;
use futures_util::future::join_all;
use once_cell::sync::Lazy;
use scc::HashMap as SccHashMap;
use tokio::sync::broadcast;
use super::signal::RpcHandler;
use super::signal::Signal;
use super::signal::SignalExporter;
use super::signal::SignalHandler;
const DEFAULT_BROADCAST_CAPACITY: usize = 64;
static GLOBAL_BROADCAST_CAPACITY: AtomicUsize = AtomicUsize::new(DEFAULT_BROADCAST_CAPACITY);
static EXPORTER_KEY_COUNTER: AtomicU64 = AtomicU64::new(0);
type HandlerList = Arc<ArcSwap<Vec<SignalHandler>>>;
#[derive(Default)]
pub(crate) struct Inner {
handlers: SccHashMap<String, HandlerList>,
topics: SccHashMap<String, broadcast::Sender<Signal>>,
pub(crate) rpc: SccHashMap<String, RpcHandler>,
exporters: SccHashMap<u64, SignalExporter>,
}
fn new_handler_list() -> HandlerList {
Arc::new(ArcSwap::new(Arc::new(Vec::new())))
}
#[derive(Clone, Default)]
pub struct SignalArbiter {
pub(crate) inner: Arc<Inner>,
}
static APP_SIGNAL_ARBITER: Lazy<SignalArbiter> = Lazy::new(SignalArbiter::new);
pub fn app_signals() -> &'static SignalArbiter {
&APP_SIGNAL_ARBITER
}
pub fn app_events() -> &'static SignalArbiter {
app_signals()
}
impl SignalArbiter {
pub fn new() -> Self {
Self::default()
}
pub fn set_global_broadcast_capacity(capacity: usize) {
let cap = capacity.clamp(1, super::signal::MAX_BROADCAST_CAPACITY);
GLOBAL_BROADCAST_CAPACITY.store(cap, Ordering::SeqCst);
}
pub fn global_broadcast_capacity() -> usize {
GLOBAL_BROADCAST_CAPACITY.load(Ordering::SeqCst)
}
pub(crate) fn topic_sender(&self, id: &str) -> broadcast::Sender<Signal> {
if let Some(existing) = self.inner.topics.get_sync(id) {
existing.clone()
} else {
let cap = GLOBAL_BROADCAST_CAPACITY.load(Ordering::SeqCst);
let (tx, _rx) = broadcast::channel(cap);
let entry = self.inner.topics.entry_sync(id.to_string()).or_insert(tx);
entry.clone()
}
}
pub fn on<F, Fut>(&self, id: impl Into<String>, handler: F)
where
F: Fn(Signal) -> Fut + Send + Sync + 'static,
Fut: std::future::Future<Output = ()> + Send + 'static,
{
let id = id.into();
let handler: SignalHandler = Arc::new(move |signal: Signal| {
let fut = handler(signal);
Box::pin(fut)
});
let list = self.handler_list_for(id);
list.rcu(|current| {
let mut next = Vec::with_capacity(current.len() + 1);
next.extend(current.iter().cloned());
next.push(handler.clone());
Arc::new(next)
});
}
fn handler_list_for(&self, id: String) -> HandlerList {
let entry = self
.inner
.handlers
.entry_sync(id)
.or_insert_with(new_handler_list);
entry.clone()
}
pub fn subscribe(&self, id: impl AsRef<str>) -> broadcast::Receiver<Signal> {
let id_str = id.as_ref();
let sender = self.topic_sender(id_str);
sender.subscribe()
}
pub fn subscribe_prefix(&self, prefix: impl AsRef<str>) -> broadcast::Receiver<Signal> {
let mut key = prefix.as_ref().to_string();
if !key.ends_with('*') {
key.push('*');
}
let sender = self.topic_sender(&key);
sender.subscribe()
}
pub fn subscribe_all(&self) -> broadcast::Receiver<Signal> {
self.subscribe_prefix("")
}
pub(crate) fn broadcast(&self, signal: Signal) {
if let Some(sender) = self.inner.topics.get_sync(&signal.id) {
let _ = sender.send(signal.clone());
}
let mut targets: Vec<broadcast::Sender<Signal>> = Vec::new();
self.inner.topics.iter_sync(|key, v| {
if let Some(prefix) = key.strip_suffix('*')
&& signal.id.starts_with(prefix)
{
targets.push(v.clone());
}
true
});
for sender in targets {
let _ = sender.send(signal.clone());
}
}
pub async fn once(&self, id: impl AsRef<str>) -> Option<Signal> {
let mut rx = self.subscribe(id);
loop {
match rx.recv().await {
Ok(sig) => return Some(sig),
Err(broadcast::error::RecvError::Lagged(_)) => {}
Err(_) => return None,
}
}
}
pub async fn emit(&self, signal: Signal) {
self.broadcast(signal.clone());
self
.inner
.exporters
.iter_async(|_, v| {
v(&signal);
true
})
.await;
if let Some(entry) = self.inner.handlers.get_async(&signal.id).await {
let list = entry.clone();
drop(entry);
let handlers = list.load_full();
let futures = handlers.iter().map(|handler| {
let s = signal.clone();
handler(s)
});
let _ = join_all(futures).await;
}
}
pub async fn emit_app(signal: Signal) {
app_signals().emit(signal).await;
}
pub fn register_exporter<F>(&self, exporter: F)
where
F: Fn(&Signal) + Send + Sync + 'static,
{
let key = EXPORTER_KEY_COUNTER.fetch_add(1, Ordering::Relaxed);
let exporter: SignalExporter = Arc::new(exporter);
self.inner.exporters.upsert_sync(key, exporter);
}
pub(crate) fn merge_from(&self, other: &SignalArbiter) {
other.inner.handlers.iter_sync(|k, other_list| {
let other_handlers = other_list.load_full();
if other_handlers.is_empty() {
return true;
}
let target_list = self.handler_list_for(k.clone());
target_list.rcu(|current| {
let mut next = Vec::with_capacity(current.len() + other_handlers.len());
next.extend(current.iter().cloned());
next.extend(other_handlers.iter().cloned());
Arc::new(next)
});
true
});
other.inner.topics.iter_sync(|k, v| {
self.inner.topics.entry_sync(k.clone()).or_insert(v.clone());
true
});
other.inner.rpc.iter_sync(|k, v| {
self.inner.rpc.upsert_sync(k.clone(), v.clone());
true
});
other.inner.exporters.iter_sync(|k, v| {
self.inner.exporters.upsert_sync(*k, v.clone());
true
});
}
pub fn signal_ids(&self) -> Vec<String> {
let mut ids = Vec::new();
self.inner.topics.iter_sync(|k, _| {
if !k.ends_with('*') {
ids.push(k.clone());
}
true
});
ids
}
pub fn signal_prefixes(&self) -> Vec<String> {
let mut prefixes = Vec::new();
self.inner.topics.iter_sync(|k, _| {
if k.ends_with('*') {
prefixes.push(k.clone());
}
true
});
prefixes
}
}