use super::context::{MetricsCollector, SignalContext, SignalMetrics};
use super::core::{AsyncSignalDispatcher, ReceiverFn, SignalDispatcher, SignalName};
use super::error::SignalError;
use super::middleware::{MiddlewareFn, SignalMiddleware};
use parking_lot::RwLock;
use std::any::TypeId;
use std::fmt;
use std::future::Future;
use std::sync::Arc;
use std::time::Instant;
type PredicateFn<T> = Arc<dyn Fn(&T) -> bool + Send + Sync>;
pub(crate) struct ReceiverInfo<T: Send + Sync + 'static> {
pub(crate) receiver: ReceiverFn<T>,
pub(crate) sender_type_id: Option<TypeId>,
pub(crate) dispatch_uid: Option<String>,
pub(crate) priority: i32, pub(crate) predicate: Option<PredicateFn<T>>, }
impl<T: Send + Sync + 'static> Clone for ReceiverInfo<T> {
fn clone(&self) -> Self {
Self {
receiver: Arc::clone(&self.receiver),
sender_type_id: self.sender_type_id,
dispatch_uid: self.dispatch_uid.clone(),
priority: self.priority,
predicate: self.predicate.clone(),
}
}
}
pub struct Signal<T: Send + Sync + 'static> {
receivers: Arc<RwLock<Vec<ReceiverInfo<T>>>>,
middlewares: Arc<RwLock<Vec<MiddlewareFn<T>>>>,
context: SignalContext,
metrics: Arc<MetricsCollector>,
name: String,
}
impl<T: Send + Sync + 'static> Signal<T> {
pub fn new(name: SignalName) -> Self {
Self {
receivers: Arc::new(RwLock::new(Vec::new())),
middlewares: Arc::new(RwLock::new(Vec::new())),
context: SignalContext::new(),
metrics: Arc::new(MetricsCollector::new()),
name: name.as_str().to_string(),
}
}
#[doc(hidden)]
pub fn new_with_string(name: impl Into<String>) -> Self {
Self {
receivers: Arc::new(RwLock::new(Vec::new())),
middlewares: Arc::new(RwLock::new(Vec::new())),
context: SignalContext::new(),
metrics: Arc::new(MetricsCollector::new()),
name: name.into(),
}
}
pub fn metrics(&self) -> SignalMetrics {
self.metrics.snapshot()
}
pub fn reset_metrics(&self) {
self.metrics.reset();
}
pub fn add_middleware<M>(&self, middleware: M)
where
M: SignalMiddleware<T> + 'static,
{
let mut middlewares = self.middlewares.write();
middlewares.push(Arc::new(middleware));
}
pub fn context(&self) -> &SignalContext {
&self.context
}
pub fn connect_with_options<F, Fut>(
&self,
receiver: F,
sender_type_id: Option<TypeId>,
dispatch_uid: Option<String>,
priority: i32,
) where
F: Fn(Arc<T>) -> Fut + Send + Sync + 'static,
Fut: Future<Output = Result<(), SignalError>> + Send + 'static,
{
self.connect_with_full_options::<F, Fut, fn(&T) -> bool>(
receiver,
sender_type_id,
dispatch_uid,
priority,
None,
);
}
pub fn connect_with_full_options<F, Fut, P>(
&self,
receiver: F,
sender_type_id: Option<TypeId>,
dispatch_uid: Option<String>,
priority: i32,
predicate: Option<P>,
) where
F: Fn(Arc<T>) -> Fut + Send + Sync + 'static,
Fut: Future<Output = Result<(), SignalError>> + Send + 'static,
P: Fn(&T) -> bool + Send + Sync + 'static,
{
let boxed: ReceiverFn<T> = Arc::new(move |instance| Box::pin(receiver(instance)));
let pred: Option<PredicateFn<T>> = predicate.map(|p| Arc::new(p) as PredicateFn<T>);
let mut receivers = self.receivers.write();
if let Some(ref uid) = dispatch_uid {
receivers.retain(|r| r.dispatch_uid.as_ref() != Some(uid));
}
receivers.push(ReceiverInfo {
receiver: boxed,
sender_type_id,
dispatch_uid,
priority,
predicate: pred,
});
receivers.sort_by(|a, b| b.priority.cmp(&a.priority));
}
pub fn connect<F, Fut>(&self, receiver: F)
where
F: Fn(Arc<T>) -> Fut + Send + Sync + 'static,
Fut: Future<Output = Result<(), SignalError>> + Send + 'static,
{
self.connect_with_options(receiver, None, None, 0);
}
pub fn connect_with_priority<F, Fut>(&self, receiver: F, priority: i32)
where
F: Fn(Arc<T>) -> Fut + Send + Sync + 'static,
Fut: Future<Output = Result<(), SignalError>> + Send + 'static,
{
self.connect_with_options(receiver, None, None, priority);
}
pub fn chain(&self, next: &Signal<T>)
where
T: Clone,
{
let next_clone = next.clone();
self.connect(move |instance| {
let next = next_clone.clone();
async move {
let value = (*instance).clone();
next.send(value).await
}
});
}
pub fn chain_with<U, F>(&self, next: &Signal<U>, transform: F)
where
U: Send + Sync + 'static,
F: Fn(Arc<T>) -> U + Send + Sync + 'static,
{
let next_clone = next.clone();
let transform = Arc::new(transform);
self.connect(move |instance| {
let next = next_clone.clone();
let transform = transform.clone();
async move {
let transformed = transform(instance);
next.send(transformed).await
}
});
}
pub fn merge(signals: Vec<&Signal<T>>) -> Signal<T>
where
T: Clone,
{
let merged = Signal::new(SignalName::custom("merged_signal"));
for signal in signals {
let merged_clone = merged.clone();
signal.connect(move |instance| {
let merged = merged_clone.clone();
async move {
let value = (*instance).clone();
merged.send(value).await
}
});
}
merged
}
pub fn filter<P>(&self, predicate: P) -> Signal<T>
where
P: Fn(&T) -> bool + Send + Sync + 'static,
T: Clone,
{
let filtered = Signal::new_with_string(format!("{}_filtered", self.name));
let predicate = Arc::new(predicate);
let filtered_clone = filtered.clone();
self.connect(move |instance| {
let filtered = filtered_clone.clone();
let predicate = predicate.clone();
async move {
if predicate(&*instance) {
let value = (*instance).clone();
filtered.send(value).await
} else {
Ok(())
}
}
});
filtered
}
pub fn map<U, F>(&self, transform: F) -> Signal<U>
where
U: Send + Sync + 'static,
F: Fn(Arc<T>) -> U + Send + Sync + 'static,
{
let mapped = Signal::new_with_string(format!("{}_mapped", self.name));
let transform = Arc::new(transform);
let mapped_clone = mapped.clone();
self.connect(move |instance| {
let mapped = mapped_clone.clone();
let transform = transform.clone();
async move {
let transformed = transform(instance);
mapped.send(transformed).await
}
});
mapped
}
pub fn connect_if<F, Fut, P>(&self, receiver: F, predicate: P)
where
F: Fn(Arc<T>) -> Fut + Send + Sync + 'static,
Fut: Future<Output = Result<(), SignalError>> + Send + 'static,
P: Fn(&T) -> bool + Send + Sync + 'static,
{
self.connect_with_full_options(receiver, None, None, 0, Some(predicate));
}
pub fn disconnect(&self, dispatch_uid: &str) -> bool {
let mut receivers = self.receivers.write();
let original_len = receivers.len();
receivers.retain(|r| r.dispatch_uid.as_deref() != Some(dispatch_uid));
receivers.len() < original_len
}
pub async fn send_with_sender(
&self,
instance: T,
sender_type_id: Option<TypeId>,
) -> Result<(), SignalError> {
self.metrics.record_send();
let instance = Arc::new(instance);
let receivers = self.receivers.read().clone();
let middlewares = self.middlewares.read().clone();
for middleware in &middlewares {
let should_continue = middleware.before_send(&instance).await?;
if !should_continue {
return Ok(()); }
}
let mut results = Vec::new();
for receiver_info in receivers {
if let Some(expected_type_id) = receiver_info.sender_type_id {
if let Some(actual_type_id) = sender_type_id {
if expected_type_id != actual_type_id {
continue; }
} else {
continue; }
}
if let Some(ref predicate) = receiver_info.predicate
&& !predicate(&instance)
{
continue; }
let dispatch_uid_ref = receiver_info.dispatch_uid.as_deref();
let mut should_execute = true;
for middleware in &middlewares {
let can_execute = middleware
.before_receiver(&instance, dispatch_uid_ref)
.await?;
if !can_execute {
should_execute = false;
break;
}
}
if !should_execute {
continue; }
let start = Instant::now();
let result = (receiver_info.receiver)(Arc::clone(&instance)).await;
let duration = start.elapsed();
self.metrics
.record_receiver_execution(duration, result.is_ok());
for middleware in &middlewares {
middleware
.after_receiver(&instance, dispatch_uid_ref, &result)
.await?;
}
result?;
results.push(Ok(()));
}
for middleware in &middlewares {
middleware.after_send(&instance, &results).await?;
}
Ok(())
}
pub async fn send(&self, instance: T) -> Result<(), SignalError> {
self.send_with_sender(instance, None).await
}
pub async fn send_robust(
&self,
instance: T,
sender_type_id: Option<TypeId>,
) -> Vec<Result<(), SignalError>> {
self.metrics.record_send();
let instance = Arc::new(instance);
let receivers = self.receivers.read().clone();
let middlewares = self.middlewares.read().clone();
let mut results = Vec::new();
for middleware in &middlewares {
if let Ok(should_continue) = middleware.before_send(&instance).await
&& !should_continue
{
return results; }
}
for receiver_info in receivers {
if let Some(expected_type_id) = receiver_info.sender_type_id {
if let Some(actual_type_id) = sender_type_id {
if expected_type_id != actual_type_id {
continue; }
} else {
continue; }
}
if let Some(ref predicate) = receiver_info.predicate
&& !predicate(&instance)
{
continue; }
let dispatch_uid_ref = receiver_info.dispatch_uid.as_deref();
let mut should_execute = true;
for middleware in &middlewares {
if let Ok(can_execute) = middleware
.before_receiver(&instance, dispatch_uid_ref)
.await && !can_execute
{
should_execute = false;
break;
}
}
if !should_execute {
continue; }
let start = Instant::now();
let result = (receiver_info.receiver)(Arc::clone(&instance)).await;
let duration = start.elapsed();
self.metrics
.record_receiver_execution(duration, result.is_ok());
for middleware in &middlewares {
let _ = middleware
.after_receiver(&instance, dispatch_uid_ref, &result)
.await;
}
results.push(result);
}
for middleware in &middlewares {
if let Err(e) = middleware.after_send(&instance, &results).await {
eprintln!("Signal after_send middleware error: {}", e);
}
}
results
}
#[cfg(native)]
pub fn send_async(&self, instance: T) {
let instance = Arc::new(instance);
let receivers = self.receivers.read().clone();
let middlewares = self.middlewares.read().clone();
let metrics = Arc::clone(&self.metrics);
metrics.record_send();
tokio::spawn(async move {
for middleware in &middlewares {
match middleware.before_send(&instance).await {
Ok(should_continue) => {
if !should_continue {
return;
}
}
Err(_) => return,
}
}
let mut results = Vec::with_capacity(receivers.len());
for receiver_info in receivers {
if let Some(ref predicate) = receiver_info.predicate
&& !predicate(&instance)
{
continue;
}
let dispatch_uid_ref = receiver_info.dispatch_uid.as_deref();
let mut should_execute = true;
for middleware in &middlewares {
if let Ok(can_execute) = middleware
.before_receiver(&instance, dispatch_uid_ref)
.await && !can_execute
{
should_execute = false;
break;
}
}
if !should_execute {
continue;
}
let start = Instant::now();
let result = (receiver_info.receiver)(Arc::clone(&instance)).await;
let duration = start.elapsed();
metrics.record_receiver_execution(duration, result.is_ok());
for middleware in &middlewares {
let _ = middleware
.after_receiver(&instance, dispatch_uid_ref, &result)
.await;
}
results.push(result);
}
for middleware in &middlewares {
let _ = middleware.after_send(&instance, &results).await;
}
});
}
pub fn receiver_count(&self) -> usize {
self.receivers.read().len()
}
pub fn disconnect_all(&self) {
self.receivers.write().clear();
}
}
impl<T: Send + Sync + 'static> Clone for Signal<T> {
fn clone(&self) -> Self {
Self {
receivers: Arc::clone(&self.receivers),
middlewares: Arc::clone(&self.middlewares),
context: self.context.clone(),
metrics: Arc::clone(&self.metrics),
name: self.name.clone(),
}
}
}
impl<T: Send + Sync + 'static> fmt::Debug for Signal<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Signal")
.field("name", &self.name)
.field("receiver_count", &self.receiver_count())
.finish()
}
}
impl<T: Send + Sync + 'static> SignalDispatcher<T> for Signal<T> {
fn receiver_count(&self) -> usize {
Signal::receiver_count(self)
}
fn disconnect_all(&self) {
Signal::disconnect_all(self)
}
fn disconnect(&self, dispatch_uid: &str) -> bool {
Signal::disconnect(self, dispatch_uid)
}
}
#[async_trait::async_trait]
impl<T: Send + Sync + 'static> AsyncSignalDispatcher<T> for Signal<T> {
async fn send(&self, instance: T) -> Result<(), SignalError> {
Signal::send(self, instance).await
}
async fn send_with_sender(
&self,
instance: T,
sender_type_id: Option<TypeId>,
) -> Result<(), SignalError> {
Signal::send_with_sender(self, instance, sender_type_id).await
}
async fn send_robust(
&self,
instance: T,
sender_type_id: Option<TypeId>,
) -> Vec<Result<(), SignalError>> {
Signal::send_robust(self, instance, sender_type_id).await
}
}