use std::collections::HashMap;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::{Duration, Instant};
use dashmap::DashMap;
use once_cell::sync::Lazy;
use serde_json::Value;
/// A single notification delivered to subscribers.
///
/// `Notifier::instrument` builds one of these around a closure call; callers may
/// also construct one by hand and pass it to `Notifier::publish`, in which case
/// the timing fields hold whatever the caller supplied.
#[derive(Debug, Clone)]
pub struct Event {
/// Name the event was published under; subscription patterns are matched against it.
pub name: String,
/// Arbitrary JSON values attached by the publisher and passed to subscribers as-is.
pub payload: HashMap<String, Value>,
/// Start of the measured span (`instrument` captures it just before running the closure).
pub time: Instant,
/// End of the measured span (`instrument` captures it right after the closure returns).
pub end: Instant,
/// Elapsed time; `instrument` sets it to `end.saturating_duration_since(time)`.
pub duration: Duration,
}
/// Boxed callback accepted by `subscribe`; it receives each matching event by reference.
pub type Subscriber = Box<dyn Fn(&Event) + Send + Sync + 'static>;
/// Reference-counted form of a subscriber callback, so `publish` can clone a handle
/// out of the subscription map and invoke it afterwards.
type SharedSubscriber = Arc<dyn Fn(&Event) + Send + Sync + 'static>;
/// One registered callback together with the id handed back to the subscriber.
#[derive(Clone)]
struct Subscription {
/// Identifier returned by `Notifier::subscribe` and accepted by `Notifier::unsubscribe`.
id: usize,
/// The callback to invoke for matching events.
callback: SharedSubscriber,
}
#[derive(Default)]
pub struct Notifier {
subscriptions: DashMap<String, Vec<Subscription>>,
subscription_keys: DashMap<usize, String>,
next_id: AtomicUsize,
}
impl Notifier {
    /// Creates a notifier with no subscriptions.
    ///
    /// Ids returned by [`Notifier::subscribe`] start at `1`.
    #[must_use]
    pub fn new() -> Self {
        Self {
            subscriptions: DashMap::new(),
            subscription_keys: DashMap::new(),
            next_id: AtomicUsize::new(1),
        }
    }

    /// Registers `callback` for events whose name matches `event_name`.
    ///
    /// `event_name` is a pattern: `"*"` matches every event, a pattern ending in `*`
    /// matches any name starting with the text before the `*`, and anything else must
    /// equal the event name exactly.
    ///
    /// Returns an id that can later be passed to [`Notifier::unsubscribe`].
    pub fn subscribe(&self, event_name: &str, callback: Subscriber) -> usize {
        let id = self.next_id.fetch_add(1, Ordering::Relaxed);
        let pattern = event_name.to_owned();
        let shared = Arc::<dyn Fn(&Event) + Send + Sync>::from(callback);
        self.subscriptions
            .entry(pattern.clone())
            .or_default()
            .push(Subscription {
                id,
                callback: shared,
            });
        self.subscription_keys.insert(id, pattern);
        id
    }

    /// Removes the subscription with the given `id`.
    ///
    /// Unknown ids (never issued, or already removed) are ignored. When the last
    /// subscription for a pattern is removed, the pattern's bucket is dropped too.
    pub fn unsubscribe(&self, id: usize) {
        let Some((_, pattern)) = self.subscription_keys.remove(&id) else {
            return;
        };
        if let Some(mut bucket) = self.subscriptions.get_mut(&pattern) {
            bucket.retain(|subscription| subscription.id != id);
        }
        // Re-check emptiness atomically with the removal: another thread may have
        // subscribed to this pattern after the guard above was released, and an
        // unconditional `remove` would silently discard that new subscription.
        self.subscriptions
            .remove_if(&pattern, |_, bucket| bucket.is_empty());
    }

    /// Runs `f`, then publishes an event named `event_name` describing the call.
    ///
    /// The event carries `payload`, the instants captured immediately before and
    /// after `f`, and the elapsed time between them. Subscribers are invoked after
    /// `f` has returned. If `f` panics, no event is published.
    ///
    /// Returns whatever `f` returns.
    pub fn instrument<F, R>(&self, event_name: &str, payload: HashMap<String, Value>, f: F) -> R
    where
        F: FnOnce() -> R,
    {
        let start = Instant::now();
        let result = f();
        let end = Instant::now();
        self.publish(Event {
            name: event_name.to_owned(),
            payload,
            time: start,
            end,
            duration: end.saturating_duration_since(start),
        });
        result
    }

    /// Delivers `event` to every subscriber whose pattern matches `event.name`.
    ///
    /// Callback handles are cloned out of the map first and invoked only after the
    /// map iteration has finished, so a callback may call back into this notifier.
    /// Subscribers registered under the same pattern run in subscription order; the
    /// order between different patterns follows the map's iteration order.
    pub fn publish(&self, event: Event) {
        let mut callbacks: Vec<SharedSubscriber> = Vec::new();
        for entry in self.subscriptions.iter() {
            if Self::matches(entry.key(), &event.name) {
                callbacks.extend(
                    entry
                        .value()
                        .iter()
                        .map(|subscription| Arc::clone(&subscription.callback)),
                );
            }
        }
        for callback in callbacks {
            callback(&event);
        }
    }

    /// Reports whether `event_name` is selected by `pattern`.
    ///
    /// A `*` is only special as the final character of the pattern.
    fn matches(pattern: &str, event_name: &str) -> bool {
        if pattern == "*" {
            return true;
        }
        if let Some(prefix) = pattern.strip_suffix('*') {
            return event_name.starts_with(prefix);
        }
        pattern == event_name
    }
}
/// Process-wide notifier, constructed with `Notifier::new` on first access.
static DEFAULT_NOTIFIER: Lazy<Notifier> = Lazy::new(Notifier::new);
/// Returns the shared, process-wide notifier used by the free `subscribe` and
/// `instrument` functions.
#[must_use]
pub fn default_notifier() -> &'static Notifier {
    let shared: &'static Notifier = &DEFAULT_NOTIFIER;
    shared
}
/// Registers `callback` on the process-wide notifier for events matching `event`.
///
/// Returns the subscription id issued by that notifier.
pub fn subscribe(event: &str, callback: Subscriber) -> usize {
    let notifier = default_notifier();
    notifier.subscribe(event, callback)
}
/// Runs `f` and publishes a timed event named `event` on the process-wide notifier.
///
/// Returns the value produced by `f`.
pub fn instrument<F, R>(event: &str, payload: HashMap<String, Value>, f: F) -> R
where
    F: FnOnce() -> R,
{
    let notifier = default_notifier();
    notifier.instrument(event, payload, f)
}
#[cfg(test)]
mod tests {
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration as StdDuration;
use super::*;
/// Acquires `value`'s lock, panicking if an earlier holder poisoned the mutex.
fn lock<T>(value: &Mutex<T>) -> std::sync::MutexGuard<'_, T> {
    let acquired = value.lock();
    acquired.expect("mutex should not be poisoned")
}
/// A subscriber registered under an exact name sees an event published with that name.
#[test]
fn subscribe_and_receive_published_events() {
    let notifier = Notifier::new();
    let seen_names = Arc::new(Mutex::new(Vec::new()));
    let sink = Arc::clone(&seen_names);
    notifier.subscribe(
        "render",
        Box::new(move |event| lock(&sink).push(event.name.clone())),
    );

    let render_event = Event {
        name: String::from("render"),
        payload: HashMap::new(),
        time: Instant::now(),
        end: Instant::now(),
        duration: Duration::ZERO,
    };
    notifier.publish(render_event);

    assert_eq!(*lock(&seen_names), vec![String::from("render")]);
}
/// The published duration is at least as long as the time the closure slept.
#[test]
fn instrument_measures_duration() {
    let notifier = Notifier::new();
    let measured = Arc::new(Mutex::new(Vec::new()));
    let sink = Arc::clone(&measured);
    notifier.subscribe(
        "slow",
        Box::new(move |event| lock(&sink).push(event.duration)),
    );

    let pause = StdDuration::from_millis(15);
    notifier.instrument("slow", HashMap::new(), || thread::sleep(pause));

    let measured = lock(&measured);
    assert_eq!(measured.len(), 1);
    assert!(measured[0] >= pause);
}
/// After `unsubscribe`, further events no longer reach the removed callback.
#[test]
fn unsubscribe_stops_delivery() {
    let notifier = Notifier::new();
    let counter = Arc::new(Mutex::new(0usize));
    let sink = Arc::clone(&counter);
    let id = notifier.subscribe("render", Box::new(move |_| *lock(&sink) += 1));

    // The same event shape is published once before and once after unsubscribing.
    let render_event = || Event {
        name: "render".to_owned(),
        payload: HashMap::new(),
        time: Instant::now(),
        end: Instant::now(),
        duration: Duration::ZERO,
    };

    notifier.publish(render_event());
    notifier.unsubscribe(id);
    notifier.publish(render_event());

    assert_eq!(*lock(&counter), 1);
}
/// Two callbacks registered under the same name are both invoked for one event.
#[test]
fn multiple_subscribers_receive_the_same_event() {
    let notifier = Notifier::new();
    let labels = Arc::new(Mutex::new(Vec::new()));
    for label in ["first", "second"] {
        let sink = Arc::clone(&labels);
        notifier.subscribe(
            "render",
            Box::new(move |_| lock(&sink).push(label.to_owned())),
        );
    }

    let render_event = Event {
        name: "render".to_owned(),
        payload: HashMap::new(),
        time: Instant::now(),
        end: Instant::now(),
        duration: Duration::ZERO,
    };
    notifier.publish(render_event);

    let labels = lock(&labels);
    assert_eq!(labels.len(), 2);
    for expected in ["first", "second"] {
        assert!(labels.iter().any(|label| label.as_str() == expected));
    }
}
/// An exact-name subscriber ignores events published under a different name.
#[test]
fn exact_subscriptions_do_not_receive_other_events() {
    let notifier = Notifier::new();
    let counter = Arc::new(Mutex::new(0usize));
    let sink = Arc::clone(&counter);
    notifier.subscribe("render", Box::new(move |_| *lock(&sink) += 1));

    let unrelated = Event {
        name: String::from("sql"),
        payload: HashMap::new(),
        time: Instant::now(),
        end: Instant::now(),
        duration: Duration::ZERO,
    };
    notifier.publish(unrelated);

    assert_eq!(*lock(&counter), 0);
}
/// A `*` subscriber receives every published event, in publication order.
#[test]
fn wildcard_subscription_receives_all_events() {
    let notifier = Notifier::new();
    let seen_names = Arc::new(Mutex::new(Vec::new()));
    let sink = Arc::clone(&seen_names);
    notifier.subscribe(
        "*",
        Box::new(move |event| lock(&sink).push(event.name.clone())),
    );

    for event_name in ["render", "sql"] {
        let published = Event {
            name: String::from(event_name),
            payload: HashMap::new(),
            time: Instant::now(),
            end: Instant::now(),
            duration: Duration::ZERO,
        };
        notifier.publish(published);
    }

    assert_eq!(
        *lock(&seen_names),
        vec![String::from("render"), String::from("sql")]
    );
}
/// A `render.*` subscriber sees `render.template` but not `sql.active_record`.
#[test]
fn prefix_subscription_receives_matching_events_only() {
    let notifier = Notifier::new();
    let seen_names = Arc::new(Mutex::new(Vec::new()));
    let sink = Arc::clone(&seen_names);
    notifier.subscribe(
        "render.*",
        Box::new(move |event| lock(&sink).push(event.name.clone())),
    );

    for event_name in ["render.template", "sql.active_record"] {
        let published = Event {
            name: String::from(event_name),
            payload: HashMap::new(),
            time: Instant::now(),
            end: Instant::now(),
            duration: Duration::ZERO,
        };
        notifier.publish(published);
    }

    assert_eq!(*lock(&seen_names), vec![String::from("render.template")]);
}
/// The payload handed to `instrument` arrives unchanged at the subscriber.
#[test]
fn payload_is_passed_through() {
    let notifier = Notifier::new();
    let statuses = Arc::new(Mutex::new(Vec::new()));
    let sink = Arc::clone(&statuses);
    notifier.subscribe(
        "process",
        Box::new(move |event| lock(&sink).push(event.payload.get("status").cloned())),
    );

    let mut payload = HashMap::new();
    payload.insert(String::from("status"), Value::from("ok"));
    notifier.instrument("process", payload, || {});

    assert_eq!(*lock(&statuses), vec![Some(Value::from("ok"))]);
}
/// Nested `instrument` calls publish the inner event first, then the outer one.
#[test]
fn nested_instrumentation_publishes_both_events() {
    let notifier = Notifier::new();
    let order = Arc::new(Mutex::new(Vec::new()));
    let sink = Arc::clone(&order);
    notifier.subscribe(
        "*",
        Box::new(move |event| lock(&sink).push(event.name.clone())),
    );

    let run_inner = || notifier.instrument("inner", HashMap::new(), || {});
    notifier.instrument("outer", HashMap::new(), run_inner);

    assert_eq!(
        *lock(&order),
        vec![String::from("inner"), String::from("outer")]
    );
}
/// `instrument` hands back the closure's return value.
#[test]
fn instrument_returns_block_result() {
    let notifier = Notifier::new();
    let product = notifier.instrument("math", HashMap::new(), || 6 * 7);
    assert_eq!(product, 42);
}
/// The free `subscribe` / `instrument` functions route through the default notifier.
#[test]
fn global_functions_work() {
    let seen_names = Arc::new(Mutex::new(Vec::new()));
    let sink = Arc::clone(&seen_names);
    let id = subscribe(
        "global.test",
        Box::new(move |event| lock(&sink).push(event.name.clone())),
    );

    instrument("global.test", HashMap::new(), || {});
    // The default notifier is shared by every test, so detach before asserting.
    default_notifier().unsubscribe(id);

    assert_eq!(*lock(&seen_names), vec![String::from("global.test")]);
}
/// Unsubscribing an id that was never issued must not panic.
#[test]
fn unsubscribe_unknown_id_is_a_noop() {
    let never_issued = 999;
    let notifier = Notifier::new();
    notifier.unsubscribe(never_issued);
}
/// Two subscriptions under the same pattern still get different ids.
#[test]
fn subscriptions_receive_distinct_ids() {
    let notifier = Notifier::new();
    let ids = [
        notifier.subscribe("a", Box::new(|_| {})),
        notifier.subscribe("a", Box::new(|_| {})),
    ];
    assert_ne!(ids[0], ids[1]);
}
/// Publishing on a notifier with no subscriptions must not panic.
#[test]
fn publish_to_no_subscribers_is_safe() {
    let notifier = Notifier::new();
    let unheard = Event {
        name: String::from("unused"),
        payload: HashMap::new(),
        time: Instant::now(),
        end: Instant::now(),
        duration: Duration::ZERO,
    };
    notifier.publish(unheard);
}
/// Subscribers can read the timing fields; start precedes end and the duration is non-negative.
#[test]
fn subscriber_can_observe_event_timestamps() {
    let notifier = Notifier::new();
    let checks = Arc::new(Mutex::new(Vec::new()));
    let sink = Arc::clone(&checks);
    notifier.subscribe(
        "timed",
        Box::new(move |event| {
            let ordered = event.time <= event.end;
            let non_negative = event.duration >= Duration::ZERO;
            lock(&sink).push((ordered, non_negative));
        }),
    );

    let pause = StdDuration::from_millis(5);
    notifier.instrument("timed", HashMap::new(), || thread::sleep(pause));

    assert_eq!(*lock(&checks), vec![(true, true)]);
}
/// Removing one of two same-pattern subscriptions leaves the other one active.
#[test]
fn unsubscribe_removes_only_targeted_subscription() {
    let notifier = Notifier::new();
    let removed_hits = Arc::new(Mutex::new(0usize));
    let kept_hits = Arc::new(Mutex::new(0usize));

    let removed_sink = Arc::clone(&removed_hits);
    let removed_id = notifier.subscribe(
        "render",
        Box::new(move |_| *lock(&removed_sink) += 1),
    );
    let kept_sink = Arc::clone(&kept_hits);
    notifier.subscribe("render", Box::new(move |_| *lock(&kept_sink) += 1));

    let render_event = || Event {
        name: "render".to_owned(),
        payload: HashMap::new(),
        time: Instant::now(),
        end: Instant::now(),
        duration: Duration::ZERO,
    };

    notifier.publish(render_event());
    notifier.unsubscribe(removed_id);
    notifier.publish(render_event());

    assert_eq!(*lock(&removed_hits), 1);
    assert_eq!(*lock(&kept_hits), 2);
}
/// One event reaches exact, prefix and wildcard subscribers alike.
#[test]
fn matching_subscriptions_fan_out_to_all_matching_patterns() {
    let notifier = Notifier::new();
    let labels = Arc::new(Mutex::new(Vec::new()));
    let registrations = [
        ("render.template", "exact"),
        ("render.*", "prefix"),
        ("*", "wildcard"),
    ];
    for (pattern, label) in registrations {
        let sink = Arc::clone(&labels);
        notifier.subscribe(
            pattern,
            Box::new(move |_| lock(&sink).push(label.to_owned())),
        );
    }

    let template_event = Event {
        name: String::from("render.template"),
        payload: HashMap::new(),
        time: Instant::now(),
        end: Instant::now(),
        duration: Duration::ZERO,
    };
    notifier.publish(template_event);

    // Delivery order across patterns is unspecified, so compare sorted labels.
    let mut delivered = lock(&labels);
    delivered.sort();
    assert_eq!(
        *delivered,
        vec![
            String::from("exact"),
            String::from("prefix"),
            String::from("wildcard"),
        ]
    );
}
// Checks that the instants recorded by `instrument` enclose the closure's own
// execution and that the reported duration equals `end - time`.
#[test]
fn instrument_timestamps_bracket_execution_and_match_duration() {
let notifier = Notifier::new();
let observed = Arc::new(Mutex::new(Vec::new()));
let received = Arc::clone(&observed);
notifier.subscribe(
"timed",
Box::new(move |event| {
lock(&received).push((event.time, event.end, event.duration));
}),
);
// `before`/`after` bound the whole call; `inside_*` bound the closure body.
let before = Instant::now();
let (inside_start, inside_end) = notifier.instrument("timed", HashMap::new(), || {
let inside_start = Instant::now();
thread::sleep(StdDuration::from_millis(5));
let inside_end = Instant::now();
(inside_start, inside_end)
});
let after = Instant::now();
let observed = lock(&observed);
assert_eq!(observed.len(), 1);
let (event_start, event_end, duration) = observed[0];
// Expected ordering: before <= start <= inside_start, inside_end <= end <= after.
assert!(before <= event_start);
assert!(event_start <= inside_start);
assert!(inside_end <= event_end);
assert!(event_end <= after);
assert_eq!(duration, event_end.saturating_duration_since(event_start));
}
// Checks that nested `instrument` calls each deliver their own payload, and that
// the inner event has already been delivered while the outer closure is running.
#[test]
fn nested_instrumentation_keeps_payloads_separate_and_ordered() {
let notifier = Notifier::new();
let observed = Arc::new(Mutex::new(Vec::new()));
let received = Arc::clone(&observed);
notifier.subscribe(
"*",
Box::new(move |event| {
lock(&received).push((event.name.clone(), event.payload.get("kind").cloned()));
}),
);
notifier.instrument(
"outer",
HashMap::from([(String::from("kind"), Value::from("outer"))]),
|| {
notifier.instrument(
"inner",
HashMap::from([(String::from("kind"), Value::from("inner"))]),
|| {},
);
// Still inside the outer closure: only the inner event may have arrived.
// This guard is dropped when the closure returns, before the outer event
// is published and the subscriber locks `observed` again.
let observed = lock(&observed);
assert_eq!(observed.len(), 1);
assert_eq!(
observed[0],
(String::from("inner"), Some(Value::from("inner")))
);
},
);
assert_eq!(
&*lock(&observed),
&[
(String::from("inner"), Some(Value::from("inner"))),
(String::from("outer"), Some(Value::from("outer"))),
]
);
}
/// Builds an event called `name` with an empty payload and a zero duration.
fn event(name: &str) -> Event {
    let name = String::from(name);
    let payload = HashMap::new();
    let time = Instant::now();
    let end = Instant::now();
    Event {
        name,
        payload,
        time,
        end,
        duration: Duration::ZERO,
    }
}
/// Publishes a payload-free event called `name` on `notifier`.
fn publish_named(notifier: &Notifier, name: &str) {
    let named = event(name);
    notifier.publish(named);
}
// Generates a `#[test]` named `$name` that subscribes one counting callback under
// `$pattern`, publishes a single event named `$event_name`, and asserts that the
// callback ran `$expected` times.
macro_rules! pattern_delivery_case {
    ($name:ident, $pattern:expr, $event_name:expr, $expected:expr) => {
        #[test]
        fn $name() {
            let notifier = Notifier::new();
            let hits = Arc::new(Mutex::new(0usize));
            let received = Arc::clone(&hits);
            notifier.subscribe(
                $pattern,
                Box::new(move |_| {
                    *lock(&received) += 1;
                }),
            );
            // The borrow was corrupted to a `¬` character; restore `&notifier`.
            publish_named(&notifier, $event_name);
            assert_eq!(*lock(&hits), $expected);
        }
    };
}
// Table of pattern/event pairs; the last argument is the expected delivery count.
// Exact patterns match only the identical name.
pattern_delivery_case!(exact_pattern_matches_same_name, "render", "render", 1);
pattern_delivery_case!(
exact_pattern_rejects_prefixed_name,
"render",
"render.template",
0
);
// A bare `*` matches any name, including the empty string.
pattern_delivery_case!(wildcard_pattern_matches_empty_name, "*", "", 1);
pattern_delivery_case!(
wildcard_pattern_matches_nested_name,
"*",
"sql.active_record",
1
);
// A trailing `*` matches by prefix, including the bare prefix itself.
pattern_delivery_case!(prefix_pattern_matches_exact_prefix, "render*", "render", 1);
pattern_delivery_case!(
prefix_pattern_matches_extended_name,
"render*",
"render.template",
1
);
pattern_delivery_case!(
prefix_pattern_rejects_other_prefix,
"render*",
"sql.render",
0
);
pattern_delivery_case!(short_prefix_matches_base_name, "sql*", "sql", 1);
pattern_delivery_case!(
short_prefix_matches_extended_name,
"sql*",
"sql.active_record",
1
);
// An exact pattern does not match the empty name.
pattern_delivery_case!(exact_pattern_rejects_empty_name, "render", "", 0);
/// A fresh notifier has neither pattern buckets nor id mappings.
#[test]
fn new_notifier_starts_without_subscriptions() {
    let fresh = Notifier::new();
    let buckets_empty = fresh.subscriptions.is_empty();
    let ids_empty = fresh.subscription_keys.is_empty();
    assert!(buckets_empty);
    assert!(ids_empty);
}
/// The first id issued by a notifier built with `new` is `1`.
#[test]
fn first_subscription_id_is_one() {
    let notifier = Notifier::new();
    let first_id = notifier.subscribe("render", Box::new(|_| {}));
    assert_eq!(first_id, 1);
}
/// Ids grow strictly with each subscription, regardless of the pattern used.
#[test]
fn subscription_ids_increase_monotonically() {
    let notifier = Notifier::new();
    let mut ids = Vec::new();
    for pattern in ["render", "render", "sql"] {
        ids.push(notifier.subscribe(pattern, Box::new(|_| {})));
    }
    assert!(ids.windows(2).all(|pair| pair[0] < pair[1]));
}
/// `subscribe` stores both the id-to-pattern mapping and the bucket entry.
#[test]
fn subscribe_records_pattern_and_id() {
    let notifier = Notifier::new();
    let id = notifier.subscribe("render", Box::new(|_| {}));

    let recorded_pattern = notifier
        .subscription_keys
        .get(&id)
        .map(|entry| entry.value().clone());
    assert_eq!(recorded_pattern, Some(String::from("render")));

    let bucket_len = notifier
        .subscriptions
        .get("render")
        .map(|bucket| bucket.value().len());
    assert_eq!(bucket_len, Some(1));
}
/// `unsubscribe` forgets the id-to-pattern mapping.
#[test]
fn unsubscribe_removes_id_mapping() {
    let notifier = Notifier::new();
    let id = notifier.subscribe("render", Box::new(|_| {}));
    notifier.unsubscribe(id);
    let still_known = notifier.subscription_keys.contains_key(&id);
    assert!(!still_known);
}
/// Removing the only subscription for a pattern drops the pattern's bucket.
#[test]
fn unsubscribe_last_subscription_removes_pattern_bucket() {
    let notifier = Notifier::new();
    let only_id = notifier.subscribe("render", Box::new(|_| {}));
    notifier.unsubscribe(only_id);
    let bucket_exists = notifier.subscriptions.contains_key("render");
    assert!(!bucket_exists);
}
/// Removing one of two subscriptions keeps the bucket and the other id mapping.
#[test]
fn unsubscribe_one_of_many_keeps_pattern_bucket() {
    let notifier = Notifier::new();
    let removed = notifier.subscribe("render", Box::new(|_| {}));
    let kept = notifier.subscribe("render", Box::new(|_| {}));

    notifier.unsubscribe(removed);

    let remaining = notifier
        .subscriptions
        .get("render")
        .map(|bucket| bucket.value().len());
    assert_eq!(remaining, Some(1));
    assert!(notifier.subscription_keys.contains_key(&kept));
}
/// Publishing with no subscribers must not create buckets or id mappings.
#[test]
fn publish_no_subscribers_keeps_state_empty() {
    let notifier = Notifier::new();
    // The borrow was corrupted to a `¬` character; restore `&notifier`.
    publish_named(&notifier, "unused");
    assert!(notifier.subscriptions.is_empty());
    assert!(notifier.subscription_keys.is_empty());
}
/// Every publish of a matching name invokes the subscriber again.
#[test]
fn multiple_publishes_deliver_multiple_times() {
    let notifier = Notifier::new();
    let hits = Arc::new(Mutex::new(0usize));
    let received = Arc::clone(&hits);
    notifier.subscribe(
        "render",
        Box::new(move |_| {
            *lock(&received) += 1;
        }),
    );
    // The borrows were corrupted to `¬` characters; restore `&notifier`.
    for _ in 0..3 {
        publish_named(&notifier, "render");
    }
    assert_eq!(*lock(&hits), 3);
}
/// Two subscriptions sharing one counter are both invoked by a single publish.
#[test]
fn same_callback_subscribed_twice_receives_two_deliveries() {
    let notifier = Notifier::new();
    let hits = Arc::new(Mutex::new(0usize));
    for _ in 0..2 {
        let received = Arc::clone(&hits);
        notifier.subscribe(
            "render",
            Box::new(move |_| {
                *lock(&received) += 1;
            }),
        );
    }
    // The borrow was corrupted to a `¬` character; restore `&notifier`.
    publish_named(&notifier, "render");
    assert_eq!(*lock(&hits), 2);
}
/// Subscribers under one pattern are invoked in the order they subscribed.
#[test]
fn publish_order_is_preserved_within_same_pattern() {
    let notifier = Notifier::new();
    let order = Arc::new(Mutex::new(Vec::new()));
    for label in ["first", "second", "third"] {
        let order = Arc::clone(&order);
        notifier.subscribe(
            "render",
            Box::new(move |_| {
                lock(&order).push(label.to_owned());
            }),
        );
    }
    // The borrow was corrupted to a `¬` character; restore `&notifier`.
    publish_named(&notifier, "render");
    assert_eq!(
        &*lock(&order),
        &[
            String::from("first"),
            String::from("second"),
            String::from("third"),
        ]
    );
}
/// An event named `render` reaches `render`, `render*` and `*` subscribers.
#[test]
fn same_event_reaches_exact_prefix_and_wildcard() {
    let notifier = Notifier::new();
    let deliveries = Arc::new(Mutex::new(Vec::new()));
    for (pattern, label) in [
        ("render", "exact"),
        ("render*", "prefix"),
        ("*", "wildcard"),
    ] {
        let deliveries = Arc::clone(&deliveries);
        notifier.subscribe(
            pattern,
            Box::new(move |_| {
                lock(&deliveries).push(label.to_owned());
            }),
        );
    }
    // The borrow was corrupted to a `¬` character; restore `&notifier`.
    publish_named(&notifier, "render");
    // Delivery order across patterns is unspecified, so compare sorted labels.
    let mut deliveries = lock(&deliveries);
    deliveries.sort();
    assert_eq!(
        &*deliveries,
        &[
            String::from("exact"),
            String::from("prefix"),
            String::from("wildcard"),
        ]
    );
}
/// The empty string is a valid event name and matches an empty exact pattern.
#[test]
fn instrument_with_empty_event_name_notifies_exact_subscriber() {
    let notifier = Notifier::new();
    let counter = Arc::new(Mutex::new(0usize));
    let sink = Arc::clone(&counter);
    notifier.subscribe(
        "",
        Box::new(move |_| {
            *lock(&sink) += 1;
        }),
    );

    notifier.instrument("", HashMap::new(), || {});

    let delivered = *lock(&counter);
    assert_eq!(delivered, 1);
}
/// Non-ASCII event names are matched and delivered unchanged.
#[test]
fn instrument_with_unicode_event_name_notifies_exact_subscriber() {
    let notifier = Notifier::new();
    let seen_names = Arc::new(Mutex::new(Vec::new()));
    let sink = Arc::clone(&seen_names);
    notifier.subscribe(
        "résumé.render",
        Box::new(move |event| {
            lock(&sink).push(event.name.clone());
        }),
    );

    notifier.instrument("résumé.render", HashMap::new(), || {});

    assert_eq!(*lock(&seen_names), vec![String::from("résumé.render")]);
}
/// A numeric payload value reaches the subscriber intact.
#[test]
fn instrument_passes_numeric_payload() {
    let notifier = Notifier::new();
    let answers = Arc::new(Mutex::new(Vec::new()));
    let sink = Arc::clone(&answers);
    notifier.subscribe(
        "math",
        Box::new(move |event| {
            let answer = event.payload.get("answer").cloned();
            lock(&sink).push(answer);
        }),
    );

    let mut payload = HashMap::new();
    payload.insert(String::from("answer"), Value::from(42));
    notifier.instrument("math", payload, || {});

    assert_eq!(*lock(&answers), vec![Some(Value::from(42))]);
}
/// A payload string containing non-ASCII text and emoji reaches the subscriber intact.
#[test]
fn instrument_passes_unicode_payload() {
    let notifier = Notifier::new();
    let messages = Arc::new(Mutex::new(Vec::new()));
    let sink = Arc::clone(&messages);
    notifier.subscribe(
        "greet",
        Box::new(move |event| {
            let message = event.payload.get("message").cloned();
            lock(&sink).push(message);
        }),
    );

    let mut payload = HashMap::new();
    payload.insert(String::from("message"), Value::from("héllø 🌍"));
    notifier.instrument("greet", payload, || {});

    assert_eq!(*lock(&messages), vec![Some(Value::from("héllø 🌍"))]);
}
/// A callback may publish on the same notifier while it is being invoked.
#[test]
fn subscriber_can_publish_reentrant_event() {
    let notifier = Arc::new(Notifier::new());
    let seen = Arc::new(Mutex::new(Vec::new()));
    {
        // The callback owns its own handle to the notifier so it can publish from
        // inside a delivery. (The borrows here were corrupted to `¬` characters.)
        let notifier_for_callback = Arc::clone(&notifier);
        let seen = Arc::clone(&seen);
        notifier.subscribe(
            "outer",
            Box::new(move |_| {
                lock(&seen).push(String::from("outer"));
                publish_named(&notifier_for_callback, "inner");
            }),
        );
    }
    {
        let seen = Arc::clone(&seen);
        notifier.subscribe(
            "inner",
            Box::new(move |_| {
                lock(&seen).push(String::from("inner"));
            }),
        );
    }
    publish_named(&notifier, "outer");
    assert_eq!(
        &*lock(&seen),
        &[String::from("outer"), String::from("inner")]
    );
}
/// Subscribers observe state as it is after the instrumented closure has run.
#[test]
fn callbacks_run_after_block_completes() {
    let notifier = Notifier::new();
    let state = Arc::new(Mutex::new(String::from("before")));
    let snapshots = Arc::new(Mutex::new(Vec::new()));

    let state_for_callback = Arc::clone(&state);
    let sink = Arc::clone(&snapshots);
    notifier.subscribe(
        "render",
        Box::new(move |_| {
            let snapshot = lock(&state_for_callback).clone();
            lock(&sink).push(snapshot);
        }),
    );

    notifier.instrument("render", HashMap::new(), || {
        let mut current = lock(&state);
        *current = String::from("after");
    });

    assert_eq!(*lock(&snapshots), vec![String::from("after")]);
}
/// Repeated calls to `default_notifier` yield the same object.
#[test]
fn default_notifier_returns_same_instance() {
    let first = default_notifier();
    let second = default_notifier();
    assert!(std::ptr::eq(first, second));
}
/// A `*` subscription on the default notifier sees every globally instrumented event.
#[test]
fn global_wildcard_receives_multiple_events() {
    let seen_names = Arc::new(Mutex::new(Vec::new()));
    let sink = Arc::clone(&seen_names);
    let id = subscribe(
        "*",
        Box::new(move |event| {
            lock(&sink).push(event.name.clone());
        }),
    );

    for event_name in ["global.alpha", "global.beta"] {
        instrument(event_name, HashMap::new(), || {});
    }
    default_notifier().unsubscribe(id);

    // Other tests share the default notifier, so only check for membership.
    let seen_names = lock(&seen_names);
    for expected in ["global.alpha", "global.beta"] {
        assert!(seen_names.iter().any(|name| name.as_str() == expected));
    }
}
/// Ids issued through the free `subscribe` function differ from one another.
#[test]
fn global_subscription_ids_are_distinct() {
    let ids = [
        subscribe("global.ids", Box::new(|_| {})),
        subscribe("global.ids", Box::new(|_| {})),
    ];
    // Clean up the shared notifier before asserting.
    for id in ids {
        default_notifier().unsubscribe(id);
    }
    assert_ne!(ids[0], ids[1]);
}
/// Removing a `render` subscription leaves the `sql` subscription untouched.
#[test]
fn unsubscribe_from_one_pattern_does_not_affect_other_pattern() {
    let notifier = Notifier::new();
    let removed = notifier.subscribe("render", Box::new(|_| {}));
    let kept = notifier.subscribe("sql", Box::new(|_| {}));

    notifier.unsubscribe(removed);

    let ids = &notifier.subscription_keys;
    assert!(!ids.contains_key(&removed));
    assert!(ids.contains_key(&kept));
    assert!(notifier.subscriptions.contains_key("sql"));
}
/// Publishing `sql` does not invoke a subscriber registered for `render`.
#[test]
fn publish_to_unmatched_pattern_does_not_invoke_exact_subscriber() {
    let notifier = Notifier::new();
    let hits = Arc::new(Mutex::new(0usize));
    let received = Arc::clone(&hits);
    notifier.subscribe(
        "render",
        Box::new(move |_| {
            *lock(&received) += 1;
        }),
    );
    // The borrow was corrupted to a `¬` character; restore `&notifier`.
    publish_named(&notifier, "sql");
    assert_eq!(*lock(&hits), 0);
}
/// Removing the exact `render` subscription leaves the `render*` one in place.
#[test]
fn prefix_and_exact_subscribers_can_be_removed_independently() {
    let notifier = Notifier::new();
    let exact_id = notifier.subscribe("render", Box::new(|_| {}));
    let prefix_id = notifier.subscribe("render*", Box::new(|_| {}));

    notifier.unsubscribe(exact_id);

    let ids = &notifier.subscription_keys;
    assert!(!ids.contains_key(&exact_id));
    assert!(ids.contains_key(&prefix_id));
    assert!(notifier.subscriptions.contains_key("render*"));
}
/// The outer `instrument` returns its own closure's value, built from the inner result.
#[test]
fn nested_instrument_returns_outer_result() {
    let notifier = Notifier::new();
    let total = notifier.instrument("outer", HashMap::new(), || {
        let inner = notifier.instrument("inner", HashMap::new(), || 21);
        inner + 21
    });
    assert_eq!(total, 42);
}
/// Removing every subscription for a pattern clears both internal maps.
#[test]
fn removing_both_same_pattern_subscriptions_clears_bucket() {
    let notifier = Notifier::new();
    let ids = [
        notifier.subscribe("render", Box::new(|_| {})),
        notifier.subscribe("render", Box::new(|_| {})),
    ];
    for id in ids {
        notifier.unsubscribe(id);
    }
    assert!(!notifier.subscriptions.contains_key("render"));
    assert!(notifier.subscription_keys.is_empty());
}
}