#[cfg(feature = "metrics")]
use std::time::Duration;
use std::{
fmt::Debug,
marker::PhantomData,
panic::Location,
sync::{
Arc, Mutex, Weak,
atomic::{AtomicBool, Ordering},
},
};
use arc_swap::ArcSwap;
use dashmap::DashMap;
use uuid::Uuid;
#[cfg(feature = "metrics")]
use crate::metrics::CellMetrics;
use crate::{
signal::Signal,
subscription::SubscriptionGuard,
traits::{CellValue, DepNode, Gettable, Mutable, Watchable, WatchableResult},
};
/// Report handed to the slow-subscriber callback when a single subscriber
/// invocation took longer than the configured threshold.
#[cfg(feature = "metrics")]
#[derive(Debug, Clone)]
pub struct SlowSubscriberAlert {
/// Id of the subscriber whose callback exceeded the threshold.
pub subscriber_id: Uuid,
/// Measured duration of that callback invocation, in nanoseconds.
pub duration_ns: u64,
/// The threshold that was exceeded, in nanoseconds.
pub threshold_ns: u64,
}
/// Shared, thread-safe callback that receives a [`SlowSubscriberAlert`].
#[cfg(feature = "metrics")]
type SlowSubscriberCallback = Arc<dyn Fn(SlowSubscriberAlert) + Send + Sync>;
/// Marker type for cell handles that expose the mutating API
/// (`Cell<T, CellMutable>`).
#[derive(Debug, Clone)]
pub struct CellMutable;
/// Marker type for cell handles without the mutating API
/// (`Cell<T, CellImmutable>`), as produced by `Cell::lock`.
#[derive(Debug, Clone)]
pub struct CellImmutable;
/// Shared state behind every [`Cell`] handle; handles hold it in an `Arc`.
pub(crate) struct CellInner<T> {
/// Identifier of this cell, generated with `Uuid::new_v4` by the constructors.
pub(crate) id: Uuid,
/// Current list of infallible subscribers as `(subscriber id, subscriber)`
/// pairs. The list is never edited in place; a new list is stored instead.
pub(crate) subscribers: ArcSwap<Vec<(Uuid, Arc<Subscriber<T>>)>>,
/// Held while a replacement `subscribers` list is built and stored.
pub(crate) subscribers_writer: Mutex<()>,
/// Current list of fallible (`Result`-returning) subscribers.
pub(crate) result_subscribers: ArcSwap<Vec<(Uuid, Arc<ResultSubscriber<T>>)>>,
/// Held while a replacement `result_subscribers` list is built and stored.
pub(crate) result_subscribers_writer: Mutex<()>,
/// The current value of the cell.
pub(crate) value: Mutex<Arc<T>>,
/// Optional name, set through `with_name`.
pub(crate) name: Mutex<Option<Arc<str>>>,
/// Subscription guards handed to this cell via `own` / `own_keyed`.
pub(crate) owned: DashMap<Uuid, SubscriptionGuard>,
/// Set to `true` when a `Signal::Complete` is processed by `notify`.
pub(crate) completed: AtomicBool,
/// Set to `true` when a `Signal::Error` is processed by `notify`.
pub(crate) errored: AtomicBool,
/// The error carried by the `Signal::Error` that was processed, if any.
pub(crate) error: Mutex<Option<Arc<anyhow::Error>>>,
/// Metrics collector; `None` means timing is not recorded for this cell.
#[cfg(feature = "metrics")]
pub(crate) metrics: Option<Arc<CellMetrics>>,
/// Slow-subscriber threshold in nanoseconds, set by `on_slow_subscriber`.
#[cfg(feature = "metrics")]
pub(crate) slow_subscriber_threshold_ns: ArcSwap<Option<u64>>,
/// Callback invoked with an alert when a subscriber exceeds the threshold.
#[cfg(feature = "metrics")]
pub(crate) slow_subscriber_callback: ArcSwap<Option<SlowSubscriberCallback>>,
/// Source location of the constructor call that created this cell.
#[allow(dead_code)]
pub(crate) caller: &'static Location<'static>,
}
/// Handle to a reactive cell holding a value of type `T`.
///
/// Cloning a handle shares the same underlying [`CellInner`]. `M` is a marker
/// type (`CellMutable` or `CellImmutable`) that is only carried as
/// `PhantomData`.
pub struct Cell<T, M> {
/// Shared cell state.
pub(crate) inner: Arc<CellInner<T>>,
/// Zero-sized carrier for the mutability marker `M`.
pub(crate) _marker: PhantomData<M>,
}
/// Non-owning counterpart of [`Cell`]: holds a `Weak` reference to the cell
/// state and can be turned back into a `Cell` with `upgrade`.
pub struct WeakCell<T, M> {
/// Weak reference to the shared cell state.
inner: Weak<CellInner<T>>,
/// Zero-sized carrier for the mutability marker `M`.
_marker: PhantomData<M>,
}
impl<T, M> WeakCell<T, M> {
    /// Tries to turn this weak handle back into a strong [`Cell`].
    ///
    /// Returns `None` when the underlying cell state has already been dropped.
    pub fn upgrade(&self) -> Option<Cell<T, M>> {
        let inner = self.inner.upgrade()?;
        Some(Cell {
            inner,
            _marker: PhantomData,
        })
    }
}
impl<T, M> Clone for WeakCell<T, M> {
fn clone(&self) -> Self {
WeakCell {
inner: self.inner.clone(),
_marker: PhantomData,
}
}
}
/// Shared, thread-safe callback of an infallible subscriber; it receives each
/// signal by reference.
pub(crate) type SubscriberCallback<T> = Arc<dyn Fn(&Signal<T>) + Send + Sync>;
/// An infallible subscriber registered on a cell.
pub(crate) struct Subscriber<T> {
/// Function invoked with every signal delivered to this subscriber.
pub(crate) callback: SubscriberCallback<T>,
}
impl<T> Subscriber<T> {
    /// Builds a subscriber from `callback`, placing it behind an `Arc` so it
    /// can be stored as a trait object.
    pub(crate) fn new(callback: impl Fn(&Signal<T>) + Send + Sync + 'static) -> Self {
        let shared = Arc::new(callback);
        Self { callback: shared }
    }
}
/// Shared, thread-safe callback of a fallible subscriber; it receives each
/// signal by reference and may report a failure as a `String`.
pub(crate) type ResultSubscriberCallback<T> =
Arc<dyn Fn(&Signal<T>) -> Result<(), String> + Send + Sync>;
/// A fallible subscriber registered on a cell.
pub(crate) struct ResultSubscriber<T> {
/// Function invoked with every signal delivered to this subscriber.
pub(crate) callback: ResultSubscriberCallback<T>,
}
impl<T> ResultSubscriber<T> {
    /// Builds a fallible subscriber from `callback`, placing it behind an
    /// `Arc` so it can be stored as a trait object.
    pub(crate) fn new(
        callback: impl Fn(&Signal<T>) -> Result<(), String> + Send + Sync + 'static,
    ) -> Self {
        let shared = Arc::new(callback);
        Self { callback: shared }
    }
}
impl<T: CellValue> Cell<T, CellMutable> {
    /// Creates a mutable cell that starts out holding `initial_value`.
    ///
    /// The cell gets a freshly generated id and starts with no subscribers, no
    /// name, no owned guards and no terminal (complete/error) state. The
    /// caller's source location is captured through `#[track_caller]`. With
    /// the `metrics` feature, the metrics slot is whatever `default_metrics()`
    /// returns.
    #[track_caller]
    pub fn new(initial_value: T) -> Self {
        let inner = Arc::new(CellInner {
            id: Uuid::new_v4(),
            subscribers: ArcSwap::from_pointee(Vec::new()),
            subscribers_writer: Mutex::new(()),
            result_subscribers: ArcSwap::from_pointee(Vec::new()),
            result_subscribers_writer: Mutex::new(()),
            value: Mutex::new(Arc::new(initial_value)),
            name: Mutex::new(None),
            owned: DashMap::new(),
            completed: AtomicBool::new(false),
            errored: AtomicBool::new(false),
            error: Mutex::new(None),
            #[cfg(feature = "metrics")]
            metrics: default_metrics(),
            #[cfg(feature = "metrics")]
            slow_subscriber_threshold_ns: ArcSwap::from_pointee(None),
            #[cfg(feature = "metrics")]
            slow_subscriber_callback: ArcSwap::from_pointee(None),
            caller: Location::caller(),
        });
        #[cfg(all(feature = "inspector", not(target_arch = "wasm32")))]
        crate::registry::registry().register(inner.id, Arc::downgrade(&inner) as Weak<dyn DepNode>);
        #[cfg(feature = "trace")]
        crate::tracing::register_cell(inner.id, Some(Location::caller().to_string()));
        Self {
            inner,
            _marker: PhantomData,
        }
    }

    /// Like [`Cell::new`], but always attaches a metrics collector to the
    /// cell, regardless of what `default_metrics()` would return.
    #[cfg(feature = "metrics")]
    #[track_caller]
    pub fn with_metrics(initial_value: T) -> Self {
        let inner = Arc::new(CellInner {
            id: Uuid::new_v4(),
            subscribers: ArcSwap::from_pointee(Vec::new()),
            subscribers_writer: Mutex::new(()),
            result_subscribers: ArcSwap::from_pointee(Vec::new()),
            result_subscribers_writer: Mutex::new(()),
            value: Mutex::new(Arc::new(initial_value)),
            name: Mutex::new(None),
            owned: DashMap::new(),
            completed: AtomicBool::new(false),
            errored: AtomicBool::new(false),
            error: Mutex::new(None),
            metrics: Some(Arc::new(CellMetrics::new())),
            slow_subscriber_threshold_ns: ArcSwap::from_pointee(None),
            slow_subscriber_callback: ArcSwap::from_pointee(None),
            caller: Location::caller(),
        });
        #[cfg(all(feature = "inspector", not(target_arch = "wasm32")))]
        crate::registry::registry().register(inner.id, Arc::downgrade(&inner) as Weak<dyn DepNode>);
        #[cfg(feature = "trace")]
        crate::tracing::register_cell(inner.id, Some(Location::caller().to_string()));
        Self {
            inner,
            _marker: PhantomData,
        }
    }

    /// Installs `callback` to be invoked with a [`SlowSubscriberAlert`]
    /// whenever a single subscriber invocation takes longer than `threshold`.
    ///
    /// Any previously installed threshold and callback are replaced. Alerts
    /// are only produced for cells that have a metrics collector attached,
    /// because subscriber timing is only measured in that case.
    #[cfg(feature = "metrics")]
    pub fn on_slow_subscriber<F>(&self, threshold: Duration, callback: F)
    where
        F: Fn(SlowSubscriberAlert) + Send + Sync + 'static,
    {
        // `as_nanos` yields a `u128`; saturate instead of truncating so that a
        // very large threshold (e.g. `Duration::MAX`) cannot wrap around to a
        // small one.
        let threshold_ns = u64::try_from(threshold.as_nanos()).unwrap_or(u64::MAX);
        self.inner
            .slow_subscriber_threshold_ns
            .store(Arc::new(Some(threshold_ns)));
        self.inner
            .slow_subscriber_callback
            .store(Arc::new(Some(Arc::new(callback))));
    }

    /// Converts this handle into one without the mutating API. The underlying
    /// cell state is shared, not copied.
    pub fn lock(self) -> Cell<T, CellImmutable> {
        Cell {
            inner: self.inner,
            _marker: PhantomData,
        }
    }

    /// Sets the cell's name and returns the handle for chaining.
    ///
    /// With the `trace` feature the new name is also forwarded to the tracing
    /// module.
    pub fn with_name(self, name: impl Into<Arc<str>>) -> Self {
        let name = name.into();
        *self.inner.name.lock().expect("cell name poisoned") = Some(name.clone());
        #[cfg(feature = "trace")]
        crate::tracing::update_name(self.inner.id, name.to_string());
        self
    }

    /// Shorthand for [`Cell::is_backed_up_threshold`] with a threshold of one
    /// millisecond.
    #[cfg(feature = "metrics")]
    pub fn is_backed_up(&self) -> bool {
        self.is_backed_up_threshold(Duration::from_millis(1))
    }

    /// Returns `true` when the metrics collector's last recorded notify time
    /// is strictly greater than `threshold`.
    ///
    /// Always returns `false` for cells without a metrics collector.
    #[cfg(feature = "metrics")]
    pub fn is_backed_up_threshold(&self, threshold: std::time::Duration) -> bool {
        // Saturate rather than truncate the `u128` nanosecond count.
        let threshold_ns = u64::try_from(threshold.as_nanos()).unwrap_or(u64::MAX);
        self.inner
            .metrics
            .as_ref()
            .is_some_and(|m| m.last_notify_time_ns() > threshold_ns)
    }

    /// Sets `value` unless the cell is backed up (see [`Cell::is_backed_up`]).
    ///
    /// # Errors
    ///
    /// Hands `value` back as `Err(value)` when the cell is backed up; nothing
    /// is changed in that case.
    #[cfg(feature = "metrics")]
    pub fn try_set(&self, value: T) -> Result<(), T> {
        if self.is_backed_up() {
            Err(value)
        } else {
            self.set(value);
            Ok(())
        }
    }

    /// Same as [`Cell::try_set`], with a caller-supplied backlog `threshold`.
    ///
    /// # Errors
    ///
    /// Hands `value` back as `Err(value)` when the cell is backed up according
    /// to `threshold`; nothing is changed in that case.
    #[cfg(feature = "metrics")]
    pub fn try_set_threshold(&self, value: T, threshold: std::time::Duration) -> Result<(), T> {
        if self.is_backed_up_threshold(threshold) {
            Err(value)
        } else {
            self.set(value);
            Ok(())
        }
    }
}
impl<T, M> Clone for Cell<T, M> {
fn clone(&self) -> Self {
Cell {
inner: Arc::clone(&self.inner),
_marker: PhantomData,
}
}
}
impl<T, M> Cell<T, M> {
    /// Creates a [`WeakCell`] that refers to this cell without keeping its
    /// state alive.
    pub fn downgrade(&self) -> WeakCell<T, M> {
        let inner = Arc::downgrade(&self.inner);
        WeakCell {
            inner,
            _marker: PhantomData,
        }
    }

    /// Returns the cell's metrics collector, or `None` if the cell has none.
    #[cfg(feature = "metrics")]
    pub fn metrics(&self) -> Option<&CellMetrics> {
        self.inner.metrics.as_deref()
    }

    /// Hands `guard` to this cell, which stores it under a freshly generated
    /// key.
    pub fn own(&self, guard: SubscriptionGuard) {
        #[cfg(all(feature = "inspector", not(target_arch = "wasm32")))]
        crate::registry::registry().mark_owned(guard.source().id(), self.inner.id);
        let key = Uuid::new_v4();
        self.inner.owned.insert(key, guard);
        #[cfg(feature = "trace")]
        crate::tracing::update_owned_count(self.inner.id, self.inner.owned.len());
    }

    /// Hands `guard` to this cell under the caller-chosen `key`, replacing any
    /// guard previously stored under the same key.
    pub fn own_keyed(&self, key: Uuid, guard: SubscriptionGuard) {
        #[cfg(all(feature = "inspector", not(target_arch = "wasm32")))]
        {
            // Take the previous guard out first so the registry can be told
            // that its source is no longer owned.
            let previous = self.inner.owned.remove(&key);
            if let Some((_, stale)) = previous {
                crate::registry::registry().unmark_owned(stale.source().id());
            }
            crate::registry::registry().mark_owned(guard.source().id(), self.inner.id);
        }
        self.inner.owned.insert(key, guard);
        #[cfg(feature = "trace")]
        crate::tracing::update_owned_count(self.inner.id, self.inner.owned.len());
    }
}
impl<T: Send + Sync, M: Send + Sync> DepNode for Cell<T, M> {
    /// Id of the underlying cell.
    fn id(&self) -> Uuid {
        self.inner.id
    }

    /// Owned copy of the cell's name, if one has been set.
    fn name(&self) -> Option<String> {
        let slot = self.inner.name.lock().expect("cell name poisoned");
        slot.as_deref().map(str::to_string)
    }

    /// Sources of the guards owned by this cell, each distinct source id
    /// reported once.
    fn deps(&self) -> Vec<Arc<dyn DepNode>> {
        let mut seen = std::collections::HashSet::new();
        let mut sources = Vec::new();
        for entry in self.inner.owned.iter() {
            let source = entry.value().source();
            // `insert` returns false for ids that were already recorded.
            if seen.insert(source.id()) {
                sources.push(Arc::clone(source));
            }
        }
        sources
    }

    /// Total number of infallible and fallible subscribers.
    fn subscriber_count(&self) -> usize {
        let plain = self.inner.subscribers.load().len();
        let fallible = self.inner.result_subscribers.load().len();
        plain + fallible
    }

    /// Number of guards currently owned by this cell.
    fn owned_count(&self) -> usize {
        self.inner.owned.len()
    }
}
impl<T: CellValue> Cell<T, CellImmutable> {
    /// Sets the cell's name and returns the handle for chaining.
    ///
    /// With the `trace` feature the new name is also forwarded to the tracing
    /// module.
    pub fn with_name(self, name: impl Into<Arc<str>>) -> Self {
        let label: Arc<str> = name.into();
        {
            let mut slot = self.inner.name.lock().expect("cell name poisoned");
            *slot = Some(Arc::clone(&label));
        }
        #[cfg(feature = "trace")]
        crate::tracing::update_name(self.inner.id, label.to_string());
        self
    }
}
impl<T: CellValue, M: Send + Sync + 'static> Cell<T, M> {
    /// Applies `signal` to the cell state and delivers it to every subscriber.
    ///
    /// * A cell that is already completed or errored ignores the signal.
    /// * `Signal::Value` replaces the stored value, `Signal::Complete` marks
    ///   the cell completed, and `Signal::Error` stores the error and marks
    ///   the cell errored.
    /// * Infallible subscribers are invoked first, then fallible ones; an
    ///   `Err` returned by a fallible subscriber is logged and otherwise
    ///   ignored.
    ///
    /// With the `metrics` feature, and only when the cell has a metrics
    /// collector, each subscriber call and the whole notification are timed
    /// and the slow-subscriber callback is invoked for calls that exceed the
    /// configured threshold.
    #[doc(hidden)]
    pub fn notify(&self, signal: Signal<T>) {
        // Terminal cells do not accept any further signals.
        if self.inner.completed.load(Ordering::SeqCst) || self.inner.errored.load(Ordering::SeqCst)
        {
            return;
        }
        #[cfg(feature = "metrics")]
        let notify_start = self
            .inner
            .metrics
            .as_ref()
            .map(|_| std::time::Instant::now());
        match &signal {
            Signal::Value(arc_value) => {
                *self.inner.value.lock().expect("cell value poisoned") = arc_value.clone();
            }
            Signal::Complete => {
                self.inner.completed.store(true, Ordering::SeqCst);
            }
            Signal::Error(err) => {
                // Store the error payload BEFORE raising the flag, so that any
                // thread observing `errored == true` is guaranteed to find the
                // error as well (a new subscriber replays it on subscribe).
                *self.inner.error.lock().expect("cell error poisoned") = Some(err.clone());
                self.inner.errored.store(true, Ordering::SeqCst);
            }
        }
        // Iterate over a snapshot of the list; subscriptions added or removed
        // while notifying do not affect this round.
        let subs = self.inner.subscribers.load();
        #[cfg(feature = "metrics")]
        let metrics = &self.inner.metrics;
        // Slow-subscriber settings are only needed when timing is active.
        #[cfg(feature = "metrics")]
        let (slow_threshold, slow_callback) = if metrics.is_some() {
            (
                **self.inner.slow_subscriber_threshold_ns.load(),
                (**self.inner.slow_subscriber_callback.load()).clone(),
            )
        } else {
            (None, None)
        };
        for (_subscriber_id, sub) in subs.iter() {
            #[cfg(feature = "metrics")]
            let sub_start = metrics.as_ref().map(|_| std::time::Instant::now());
            (sub.callback)(&signal);
            #[cfg(feature = "metrics")]
            if let (Some(m), Some(start)) = (metrics, sub_start) {
                let elapsed = start.elapsed().as_nanos() as u64;
                m.update_slowest_subscriber(elapsed);
                if let (Some(threshold), Some(cb)) = (&slow_threshold, &slow_callback)
                    && elapsed > *threshold
                {
                    let alert = SlowSubscriberAlert {
                        subscriber_id: *_subscriber_id,
                        duration_ns: elapsed,
                        threshold_ns: *threshold,
                    };
                    cb(alert);
                }
            }
        }
        let result_subs = self.inner.result_subscribers.load();
        for (subscriber_id, sub) in result_subs.iter() {
            #[cfg(feature = "metrics")]
            let sub_start = metrics.as_ref().map(|_| std::time::Instant::now());
            // A failing subscriber must not prevent delivery to the others.
            if let Err(err) = (sub.callback)(&signal) {
                log::error!(
                    "hyphae: fallible subscriber {} on cell {} returned error: {}",
                    subscriber_id,
                    self.inner.id,
                    err
                );
            }
            #[cfg(feature = "metrics")]
            if let (Some(m), Some(start)) = (metrics, sub_start) {
                let elapsed = start.elapsed().as_nanos() as u64;
                m.update_slowest_subscriber(elapsed);
                if let (Some(threshold), Some(cb)) = (&slow_threshold, &slow_callback)
                    && elapsed > *threshold
                {
                    let alert = SlowSubscriberAlert {
                        subscriber_id: *subscriber_id,
                        duration_ns: elapsed,
                        threshold_ns: *threshold,
                    };
                    cb(alert);
                }
            }
        }
        #[cfg(feature = "metrics")]
        if let (Some(metrics), Some(start)) = (&self.inner.metrics, notify_start) {
            let duration_ns = start.elapsed().as_nanos() as u64;
            metrics.record_notify(duration_ns);
            #[cfg(feature = "trace")]
            crate::tracing::record_notify(
                self.inner.id,
                duration_ns,
                subs.len() + result_subs.len(),
                self.inner.owned.len(),
                metrics.slowest_subscriber_ns(),
            );
        }
    }
}
impl<T: CellValue, U: Send + Sync + 'static> Gettable<T> for Cell<T, U> {
fn get(&self) -> T {
let arc = self
.inner
.value
.lock()
.expect("cell value poisoned")
.clone();
(*arc).clone()
}
}
impl<T: CellValue, U: Send + Sync + 'static> Watchable<T> for Cell<T, U> {
    /// Registers `callback` as an infallible subscriber and returns the guard
    /// whose cleanup closure removes it again.
    ///
    /// Before it is registered, the callback is invoked synchronously with the
    /// current value, followed by `Signal::Complete` or the stored
    /// `Signal::Error` if the cell is already in that terminal state.
    fn subscribe(
        &self,
        callback: impl Fn(&Signal<T>) + Send + Sync + 'static,
    ) -> SubscriptionGuard {
        // Replay the current state to the new subscriber.
        let snapshot = Arc::clone(&*self.inner.value.lock().expect("cell value poisoned"));
        callback(&Signal::Value(snapshot));
        if self.is_complete() {
            callback(&Signal::Complete);
        } else if self.is_error()
            && let Some(err) = self.error()
        {
            callback(&Signal::Error(err));
        }

        let id = Uuid::new_v4();
        let entry = (id, Arc::new(Subscriber::new(callback)));
        {
            // Writers are serialized by the mutex; the list itself is replaced
            // with an extended copy rather than edited in place.
            let _writer = self
                .inner
                .subscribers_writer
                .lock()
                .expect("cell subscribers_writer poisoned");
            let current = self.inner.subscribers.load();
            let mut updated = current.to_vec();
            updated.push(entry);
            self.inner.subscribers.store(Arc::new(updated));
        }
        #[cfg(feature = "metrics")]
        if let Some(metrics) = &self.inner.metrics {
            metrics.record_subscriber_added();
        }
        #[cfg(feature = "trace")]
        crate::tracing::update_subscriber_count(
            self.inner.id,
            self.inner.subscribers.load().len() + self.inner.result_subscribers.load().len(),
        );

        let source: Arc<dyn DepNode> = Arc::new(self.clone());
        let cell = self.clone();
        #[cfg(feature = "metrics")]
        let metrics = self.inner.metrics.clone();
        SubscriptionGuard::new(id, source, move || {
            let removed = {
                let _writer = cell
                    .inner
                    .subscribers_writer
                    .lock()
                    .expect("cell subscribers_writer poisoned");
                let current = cell.inner.subscribers.load();
                let mut remaining = current.to_vec();
                remaining.retain(|(sub_id, _)| *sub_id != id);
                // Only publish a new list when the subscriber was still present.
                let removed = remaining.len() != current.len();
                if removed {
                    cell.inner.subscribers.store(Arc::new(remaining));
                }
                removed
            };
            #[cfg(feature = "metrics")]
            if removed && let Some(m) = &metrics {
                m.record_subscriber_removed();
            }
            #[cfg(not(feature = "metrics"))]
            let _ = removed;
            #[cfg(feature = "trace")]
            crate::tracing::update_subscriber_count(
                cell.inner.id,
                cell.inner.subscribers.load().len() + cell.inner.result_subscribers.load().len(),
            );
        })
    }

    /// Removes the subscriber registered under `id`.
    ///
    /// The infallible list is searched first; the fallible list is only
    /// touched when the id was not found there. Unknown ids are ignored.
    fn unsubscribe(&self, id: Uuid) {
        let mut removed = {
            let _writer = self
                .inner
                .subscribers_writer
                .lock()
                .expect("cell subscribers_writer poisoned");
            let current = self.inner.subscribers.load();
            let mut remaining = current.to_vec();
            remaining.retain(|(sub_id, _)| *sub_id != id);
            let found = remaining.len() != current.len();
            if found {
                self.inner.subscribers.store(Arc::new(remaining));
            }
            found
        };
        if !removed {
            let _writer = self
                .inner
                .result_subscribers_writer
                .lock()
                .expect("cell result_subscribers_writer poisoned");
            let current = self.inner.result_subscribers.load();
            let mut remaining = current.to_vec();
            remaining.retain(|(sub_id, _)| *sub_id != id);
            removed = remaining.len() != current.len();
            if removed {
                self.inner.result_subscribers.store(Arc::new(remaining));
            }
        }
        if removed {
            #[cfg(feature = "metrics")]
            if let Some(metrics) = &self.inner.metrics {
                metrics.record_subscriber_removed();
            }
            #[cfg(feature = "trace")]
            crate::tracing::update_subscriber_count(
                self.inner.id,
                self.inner.subscribers.load().len() + self.inner.result_subscribers.load().len(),
            );
        }
    }

    /// `true` once the completed flag of the cell has been set.
    fn is_complete(&self) -> bool {
        self.inner.completed.load(Ordering::SeqCst)
    }

    /// `true` once the errored flag of the cell has been set.
    fn is_error(&self) -> bool {
        self.inner.errored.load(Ordering::SeqCst)
    }

    /// Returns the stored error, if any.
    fn error(&self) -> Option<Arc<anyhow::Error>> {
        let slot = self.inner.error.lock().expect("cell error poisoned");
        slot.clone()
    }
}
impl<T: CellValue, U: Send + Sync + 'static> WatchableResult<T> for Cell<T, U> {
    /// Registers `callback` as a fallible subscriber and returns the guard
    /// whose cleanup closure removes it again.
    ///
    /// Before it is registered, the callback is invoked synchronously with the
    /// current value, followed by `Signal::Complete` or the stored
    /// `Signal::Error` if the cell is already in that terminal state. An `Err`
    /// returned by the callback is logged and otherwise ignored.
    fn subscribe_result(
        &self,
        callback: impl Fn(&Signal<T>) -> Result<(), String> + Send + Sync + 'static,
    ) -> SubscriptionGuard {
        let cell_id = self.inner.id;
        let log_err = |id: &Uuid, err: &str| {
            log::error!(
                "hyphae: fallible subscriber {} on cell {} returned error: {}",
                id,
                cell_id,
                err
            );
        };
        let id = Uuid::new_v4();
        let current = self
            .inner
            .value
            .lock()
            .expect("cell value poisoned")
            .clone();
        if let Err(err) = callback(&Signal::Value(current)) {
            log_err(&id, &err);
        }
        if self.inner.completed.load(Ordering::SeqCst) {
            if let Err(err) = callback(&Signal::Complete) {
                log_err(&id, &err);
            }
        } else if self.inner.errored.load(Ordering::SeqCst) {
            // Clone the error in its own statement so the mutex guard is
            // dropped BEFORE the callback runs. Locking inside the `if let`
            // condition would keep the guard alive for the whole chain, and a
            // callback that reads the cell's error would deadlock.
            let stored = self
                .inner
                .error
                .lock()
                .expect("cell error poisoned")
                .clone();
            if let Some(e) = stored
                && let Err(err) = callback(&Signal::Error(e))
            {
                log_err(&id, &err);
            }
        }
        let sub = Arc::new(ResultSubscriber::new(callback));
        {
            // Writers are serialized by the mutex; the list itself is replaced
            // with an extended copy rather than edited in place.
            let _w = self
                .inner
                .result_subscribers_writer
                .lock()
                .expect("cell result_subscribers_writer poisoned");
            let current = self.inner.result_subscribers.load();
            let mut next = (**current).clone();
            next.push((id, sub));
            self.inner.result_subscribers.store(Arc::new(next));
        }
        #[cfg(feature = "metrics")]
        if let Some(metrics) = &self.inner.metrics {
            metrics.record_subscriber_added();
        }
        #[cfg(feature = "trace")]
        crate::tracing::update_subscriber_count(
            self.inner.id,
            self.inner.subscribers.load().len() + self.inner.result_subscribers.load().len(),
        );
        let source: Arc<dyn DepNode> = Arc::new(self.clone());
        let cell = self.clone();
        #[cfg(feature = "metrics")]
        let metrics = self.inner.metrics.clone();
        SubscriptionGuard::new(id, source, move || {
            let removed = {
                let _w = cell
                    .inner
                    .result_subscribers_writer
                    .lock()
                    .expect("cell result_subscribers_writer poisoned");
                let current = cell.inner.result_subscribers.load();
                let prev_len = current.len();
                let next: Vec<(Uuid, Arc<ResultSubscriber<T>>)> = (**current)
                    .iter()
                    .filter(|(i, _)| *i != id)
                    .cloned()
                    .collect();
                // Only publish a new list when the subscriber was still present.
                let removed = next.len() != prev_len;
                if removed {
                    cell.inner.result_subscribers.store(Arc::new(next));
                }
                removed
            };
            #[cfg(feature = "metrics")]
            if removed && let Some(m) = &metrics {
                m.record_subscriber_removed();
            }
            #[cfg(not(feature = "metrics"))]
            let _ = removed;
            #[cfg(feature = "trace")]
            crate::tracing::update_subscriber_count(
                cell.inner.id,
                cell.inner.subscribers.load().len() + cell.inner.result_subscribers.load().len(),
            );
        })
    }
}
impl<T: CellValue> Mutable<T> for Cell<T, CellMutable> {
fn set(&self, value: T) {
self.notify(Signal::value(value)); }
fn complete(&self) {
self.notify(Signal::Complete);
}
fn fail(&self, error: impl Into<anyhow::Error>) {
self.notify(Signal::error(error));
}
}
#[cfg(all(feature = "inspector", not(target_arch = "wasm32")))]
impl<T: CellValue> DepNode for CellInner<T> {
    /// Id of this cell.
    fn id(&self) -> Uuid {
        self.id
    }

    /// Owned copy of the cell's name, if one has been set.
    fn name(&self) -> Option<String> {
        let slot = self.name.lock().expect("cell name poisoned");
        slot.as_deref().map(str::to_string)
    }

    /// Sources of the guards owned by this cell, each distinct source id
    /// reported once.
    fn deps(&self) -> Vec<Arc<dyn DepNode>> {
        let mut seen = std::collections::HashSet::new();
        let mut sources = Vec::new();
        for entry in self.owned.iter() {
            let source = entry.value().source();
            // `insert` returns false for ids that were already recorded.
            if seen.insert(source.id()) {
                sources.push(Arc::clone(source));
            }
        }
        sources
    }

    /// Total number of infallible and fallible subscribers.
    fn subscriber_count(&self) -> usize {
        let plain = self.subscribers.load().len();
        let fallible = self.result_subscribers.load().len();
        plain + fallible
    }

    /// Number of guards currently owned by this cell.
    fn owned_count(&self) -> usize {
        self.owned.len()
    }

    /// `Debug` rendering of the current value.
    fn value_debug(&self) -> Option<String> {
        // Copy the shared pointer first so the value mutex is not held while
        // the value is being formatted.
        let snapshot = Arc::clone(&*self.value.lock().expect("cell value poisoned"));
        let rendered = format!("{:?}", &*snapshot);
        Some(rendered)
    }

    /// Source location recorded when the cell was constructed.
    fn caller(&self) -> Option<&'static Location<'static>> {
        Some(self.caller)
    }
}
// When the cell state is dropped, remove its id from the tracing module and
// from the inspector registry (each only when the respective feature is
// enabled).
impl<T> Drop for CellInner<T> {
fn drop(&mut self) {
#[cfg(feature = "trace")]
crate::tracing::deregister_cell(&self.id);
#[cfg(all(feature = "inspector", not(target_arch = "wasm32")))]
crate::registry::registry().deregister(&self.id);
}
}
/// Metrics collector given to cells created with `Cell::new`: a fresh
/// collector when the `trace` feature is enabled, `None` otherwise.
#[cfg(feature = "metrics")]
fn default_metrics() -> Option<Arc<CellMetrics>> {
    // `cfg!` is resolved at compile time, so this is a constant branch.
    cfg!(feature = "trace").then(|| Arc::new(CellMetrics::new()))
}