1use std::collections::HashMap;
2use std::sync::Arc;
3use std::sync::atomic::{AtomicUsize, Ordering};
4use std::time::{Duration, Instant};
5
6use dashmap::DashMap;
7use once_cell::sync::Lazy;
8use serde_json::Value;
9
/// A named occurrence delivered to subscribers, carrying a payload and timing data.
#[derive(Debug, Clone)]
pub struct Event {
    /// Name that subscription patterns are matched against.
    pub name: String,
    /// JSON values attached by the publisher, keyed by string.
    pub payload: HashMap<String, Value>,
    /// Start instant; `Notifier::instrument` captures it right before running the block.
    pub time: Instant,
    /// End instant; `Notifier::instrument` captures it right after the block returns.
    pub end: Instant,
    /// Elapsed time; `Notifier::instrument` sets it to the saturating difference `end - time`.
    pub duration: Duration,
}
24
/// Boxed callback accepted by `subscribe`; it is called with a reference to each matching event.
pub type Subscriber = Box<dyn Fn(&Event) + Send + Sync + 'static>;
27
/// Reference-counted form of a subscriber callback, so `publish` can clone a handle to it
/// and invoke it after it has finished iterating the subscription map.
type SharedSubscriber = Arc<dyn Fn(&Event) + Send + Sync + 'static>;
29
/// One registered callback together with the id returned to the caller of `subscribe`.
#[derive(Clone)]
struct Subscription {
    /// Id handed out by `Notifier::subscribe`; `unsubscribe` uses it to find this entry.
    id: usize,
    /// Callback to invoke for matching events.
    callback: SharedSubscriber,
}
35
36#[derive(Default)]
38pub struct Notifier {
39 subscriptions: DashMap<String, Vec<Subscription>>,
40 subscription_keys: DashMap<usize, String>,
41 next_id: AtomicUsize,
42}
43
44impl Notifier {
45 #[must_use]
47 pub fn new() -> Self {
48 Self {
49 subscriptions: DashMap::new(),
50 subscription_keys: DashMap::new(),
51 next_id: AtomicUsize::new(1),
52 }
53 }
54
55 pub fn subscribe(&self, event_name: &str, callback: Subscriber) -> usize {
60 let id = self.next_id.fetch_add(1, Ordering::Relaxed);
61 let pattern = event_name.to_owned();
62 let shared = Arc::<dyn Fn(&Event) + Send + Sync>::from(callback);
63
64 self.subscriptions
65 .entry(pattern.clone())
66 .or_default()
67 .push(Subscription {
68 id,
69 callback: shared,
70 });
71 self.subscription_keys.insert(id, pattern);
72
73 id
74 }
75
76 pub fn unsubscribe(&self, id: usize) {
78 let Some((_, pattern)) = self.subscription_keys.remove(&id) else {
79 return;
80 };
81
82 let should_remove_key =
83 if let Some(mut subscriptions) = self.subscriptions.get_mut(&pattern) {
84 subscriptions.retain(|subscription| subscription.id != id);
85 subscriptions.is_empty()
86 } else {
87 false
88 };
89
90 if should_remove_key {
91 self.subscriptions.remove(&pattern);
92 }
93 }
94
95 pub fn instrument<F, R>(&self, event_name: &str, payload: HashMap<String, Value>, f: F) -> R
97 where
98 F: FnOnce() -> R,
99 {
100 let start = Instant::now();
101 let result = f();
102 let end = Instant::now();
103
104 self.publish(Event {
105 name: event_name.to_owned(),
106 payload,
107 time: start,
108 end,
109 duration: end.saturating_duration_since(start),
110 });
111
112 result
113 }
114
115 pub fn publish(&self, event: Event) {
117 let callbacks: Vec<SharedSubscriber> = self
118 .subscriptions
119 .iter()
120 .filter(|entry| Self::matches(entry.key(), &event.name))
121 .flat_map(|entry| {
122 entry
123 .value()
124 .iter()
125 .map(|subscription| Arc::clone(&subscription.callback))
126 .collect::<Vec<_>>()
127 })
128 .collect();
129
130 for callback in callbacks {
131 callback(&event);
132 }
133 }
134
135 fn matches(pattern: &str, event_name: &str) -> bool {
136 if pattern == "*" {
137 return true;
138 }
139
140 if let Some(prefix) = pattern.strip_suffix('*') {
141 return event_name.starts_with(prefix);
142 }
143
144 pattern == event_name
145 }
146}
147
/// Process-wide notifier behind the free `subscribe`/`instrument` functions; built by
/// `Notifier::new` on first access.
static DEFAULT_NOTIFIER: Lazy<Notifier> = Lazy::new(Notifier::new);
149
150#[must_use]
152pub fn default_notifier() -> &'static Notifier {
153 &DEFAULT_NOTIFIER
154}
155
156pub fn subscribe(event: &str, callback: Subscriber) -> usize {
158 default_notifier().subscribe(event, callback)
159}
160
161pub fn instrument<F, R>(event: &str, payload: HashMap<String, Value>, f: F) -> R
163where
164 F: FnOnce() -> R,
165{
166 default_notifier().instrument(event, payload, f)
167}
168
#[cfg(test)]
mod tests {
    use std::sync::{Arc, Mutex};
    use std::thread;
    use std::time::Duration as StdDuration;

    use super::*;

    /// Lockable state shared between a test body and the subscriber callbacks it registers.
    type Shared<T> = Arc<Mutex<T>>;

    /// Wraps `value` so it can be shared with subscriber callbacks.
    fn shared<T>(value: T) -> Shared<T> {
        Arc::new(Mutex::new(value))
    }

    /// Locks `value`, failing the test if an earlier panic poisoned the mutex.
    fn lock<T>(value: &Mutex<T>) -> std::sync::MutexGuard<'_, T> {
        value.lock().expect("mutex should not be poisoned")
    }

    /// Builds an event named `name` with an empty payload and zero duration.
    fn event(name: &str) -> Event {
        Event {
            name: name.to_owned(),
            payload: HashMap::new(),
            time: Instant::now(),
            end: Instant::now(),
            duration: Duration::ZERO,
        }
    }

    /// Publishes an empty event named `name` on `notifier`.
    fn publish_named(notifier: &Notifier, name: &str) {
        notifier.publish(event(name));
    }

    /// Subscribes a callback to `pattern` that increments `hits` on every delivery.
    fn count_hits(notifier: &Notifier, pattern: &str, hits: &Shared<usize>) -> usize {
        let hits = Arc::clone(hits);
        notifier.subscribe(
            pattern,
            Box::new(move |_| {
                *lock(&hits) += 1;
            }),
        )
    }

    /// Subscribes a callback to `pattern` that appends each delivered event name to `names`.
    fn record_names(notifier: &Notifier, pattern: &str, names: &Shared<Vec<String>>) -> usize {
        let names = Arc::clone(names);
        notifier.subscribe(
            pattern,
            Box::new(move |event| {
                lock(&names).push(event.name.clone());
            }),
        )
    }

    /// Subscribes a callback to `pattern` that appends the fixed `label` to `labels`.
    fn record_label(
        notifier: &Notifier,
        pattern: &str,
        label: &'static str,
        labels: &Shared<Vec<String>>,
    ) -> usize {
        let labels = Arc::clone(labels);
        notifier.subscribe(
            pattern,
            Box::new(move |_| {
                lock(&labels).push(label.to_owned());
            }),
        )
    }

    /// Subscribes a callback to `pattern` that appends the payload entry for `key` to `values`.
    fn record_payload_value(
        notifier: &Notifier,
        pattern: &str,
        key: &'static str,
        values: &Shared<Vec<Option<Value>>>,
    ) -> usize {
        let values = Arc::clone(values);
        notifier.subscribe(
            pattern,
            Box::new(move |event| {
                lock(&values).push(event.payload.get(key).cloned());
            }),
        )
    }

    #[test]
    fn subscribe_and_receive_published_events() {
        let notifier = Notifier::new();
        let names = shared(Vec::new());
        record_names(&notifier, "render", &names);

        publish_named(&notifier, "render");

        assert_eq!(&*lock(&names), &[String::from("render")]);
    }

    #[test]
    fn instrument_measures_duration() {
        let notifier = Notifier::new();
        let durations = shared(Vec::new());
        let received = Arc::clone(&durations);

        notifier.subscribe(
            "slow",
            Box::new(move |event| {
                lock(&received).push(event.duration);
            }),
        );

        notifier.instrument("slow", HashMap::new(), || {
            thread::sleep(StdDuration::from_millis(15));
        });

        let durations = lock(&durations);
        assert_eq!(durations.len(), 1);
        assert!(durations[0] >= StdDuration::from_millis(15));
    }

    #[test]
    fn unsubscribe_stops_delivery() {
        let notifier = Notifier::new();
        let hits = shared(0usize);
        let id = count_hits(&notifier, "render", &hits);

        publish_named(&notifier, "render");
        notifier.unsubscribe(id);
        publish_named(&notifier, "render");

        assert_eq!(*lock(&hits), 1);
    }

    #[test]
    fn multiple_subscribers_receive_the_same_event() {
        let notifier = Notifier::new();
        let labels = shared(Vec::new());
        for label in ["first", "second"] {
            record_label(&notifier, "render", label, &labels);
        }

        publish_named(&notifier, "render");

        let labels = lock(&labels);
        assert_eq!(labels.len(), 2);
        assert!(labels.contains(&String::from("first")));
        assert!(labels.contains(&String::from("second")));
    }

    #[test]
    fn exact_subscriptions_do_not_receive_other_events() {
        let notifier = Notifier::new();
        let hits = shared(0usize);
        count_hits(&notifier, "render", &hits);

        publish_named(&notifier, "sql");

        assert_eq!(*lock(&hits), 0);
    }

    #[test]
    fn wildcard_subscription_receives_all_events() {
        let notifier = Notifier::new();
        let names = shared(Vec::new());
        record_names(&notifier, "*", &names);

        for name in ["render", "sql"] {
            publish_named(&notifier, name);
        }

        assert_eq!(
            &*lock(&names),
            &[String::from("render"), String::from("sql")]
        );
    }

    #[test]
    fn prefix_subscription_receives_matching_events_only() {
        let notifier = Notifier::new();
        let names = shared(Vec::new());
        record_names(&notifier, "render.*", &names);

        publish_named(&notifier, "render.template");
        publish_named(&notifier, "sql.active_record");

        assert_eq!(&*lock(&names), &[String::from("render.template")]);
    }

    #[test]
    fn payload_is_passed_through() {
        let notifier = Notifier::new();
        let statuses = shared(Vec::new());
        record_payload_value(&notifier, "process", "status", &statuses);

        notifier.instrument(
            "process",
            HashMap::from([(String::from("status"), Value::from("ok"))]),
            || {},
        );

        assert_eq!(&*lock(&statuses), &[Some(Value::from("ok"))]);
    }

    #[test]
    fn nested_instrumentation_publishes_both_events() {
        let notifier = Notifier::new();
        let names = shared(Vec::new());
        record_names(&notifier, "*", &names);

        notifier.instrument("outer", HashMap::new(), || {
            notifier.instrument("inner", HashMap::new(), || {});
        });

        // The inner block finishes first, so its event is published first.
        assert_eq!(
            &*lock(&names),
            &[String::from("inner"), String::from("outer")]
        );
    }

    #[test]
    fn instrument_returns_block_result() {
        let notifier = Notifier::new();

        let result = notifier.instrument("math", HashMap::new(), || 6 * 7);

        assert_eq!(result, 42);
    }

    #[test]
    fn global_functions_work() {
        let names = shared(Vec::new());
        let received = Arc::clone(&names);

        let id = subscribe(
            "global.test",
            Box::new(move |event| {
                lock(&received).push(event.name.clone());
            }),
        );

        instrument("global.test", HashMap::new(), || {});
        default_notifier().unsubscribe(id);

        assert_eq!(&*lock(&names), &[String::from("global.test")]);
    }

    #[test]
    fn unsubscribe_unknown_id_is_a_noop() {
        let notifier = Notifier::new();
        notifier.unsubscribe(999);
    }

    #[test]
    fn subscriptions_receive_distinct_ids() {
        let notifier = Notifier::new();
        let first = notifier.subscribe("a", Box::new(|_| {}));
        let second = notifier.subscribe("a", Box::new(|_| {}));

        assert_ne!(first, second);
    }

    #[test]
    fn publish_to_no_subscribers_is_safe() {
        let notifier = Notifier::new();
        publish_named(&notifier, "unused");
    }

    #[test]
    fn subscriber_can_observe_event_timestamps() {
        let notifier = Notifier::new();
        let observed = shared(Vec::new());
        let received = Arc::clone(&observed);

        notifier.subscribe(
            "timed",
            Box::new(move |event| {
                lock(&received).push((event.time <= event.end, event.duration >= Duration::ZERO));
            }),
        );

        notifier.instrument("timed", HashMap::new(), || {
            thread::sleep(StdDuration::from_millis(5));
        });

        assert_eq!(&*lock(&observed), &[(true, true)]);
    }

    #[test]
    fn unsubscribe_removes_only_targeted_subscription() {
        let notifier = Notifier::new();
        let first_hits = shared(0usize);
        let second_hits = shared(0usize);
        let first_id = count_hits(&notifier, "render", &first_hits);
        count_hits(&notifier, "render", &second_hits);

        publish_named(&notifier, "render");
        notifier.unsubscribe(first_id);
        publish_named(&notifier, "render");

        assert_eq!(*lock(&first_hits), 1);
        assert_eq!(*lock(&second_hits), 2);
    }

    #[test]
    fn matching_subscriptions_fan_out_to_all_matching_patterns() {
        let notifier = Notifier::new();
        let deliveries = shared(Vec::new());
        for (pattern, label) in [
            ("render.template", "exact"),
            ("render.*", "prefix"),
            ("*", "wildcard"),
        ] {
            record_label(&notifier, pattern, label, &deliveries);
        }

        publish_named(&notifier, "render.template");

        // Order across patterns is unspecified, so compare the sorted labels.
        let mut deliveries = lock(&deliveries);
        deliveries.sort();
        assert_eq!(
            &*deliveries,
            &[
                String::from("exact"),
                String::from("prefix"),
                String::from("wildcard"),
            ]
        );
    }

    #[test]
    fn instrument_timestamps_bracket_execution_and_match_duration() {
        let notifier = Notifier::new();
        let observed = shared(Vec::new());
        let received = Arc::clone(&observed);

        notifier.subscribe(
            "timed",
            Box::new(move |event| {
                lock(&received).push((event.time, event.end, event.duration));
            }),
        );

        let before = Instant::now();
        let (inside_start, inside_end) = notifier.instrument("timed", HashMap::new(), || {
            let inside_start = Instant::now();
            thread::sleep(StdDuration::from_millis(5));
            let inside_end = Instant::now();
            (inside_start, inside_end)
        });
        let after = Instant::now();

        let observed = lock(&observed);
        assert_eq!(observed.len(), 1);
        let (event_start, event_end, duration) = observed[0];
        assert!(before <= event_start);
        assert!(event_start <= inside_start);
        assert!(inside_end <= event_end);
        assert!(event_end <= after);
        assert_eq!(duration, event_end.saturating_duration_since(event_start));
    }

    #[test]
    fn nested_instrumentation_keeps_payloads_separate_and_ordered() {
        let notifier = Notifier::new();
        let observed = shared(Vec::new());
        let received = Arc::clone(&observed);

        notifier.subscribe(
            "*",
            Box::new(move |event| {
                lock(&received).push((event.name.clone(), event.payload.get("kind").cloned()));
            }),
        );

        notifier.instrument(
            "outer",
            HashMap::from([(String::from("kind"), Value::from("outer"))]),
            || {
                notifier.instrument(
                    "inner",
                    HashMap::from([(String::from("kind"), Value::from("inner"))]),
                    || {},
                );

                // Only the inner event has been published at this point; the guard is
                // released when this closure returns, before the outer event is delivered.
                let observed = lock(&observed);
                assert_eq!(observed.len(), 1);
                assert_eq!(
                    observed[0],
                    (String::from("inner"), Some(Value::from("inner")))
                );
            },
        );

        assert_eq!(
            &*lock(&observed),
            &[
                (String::from("inner"), Some(Value::from("inner"))),
                (String::from("outer"), Some(Value::from("outer"))),
            ]
        );
    }

    /// Generates a test that subscribes a counter to `$pattern`, publishes one event
    /// named `$event_name`, and expects `$expected` deliveries.
    macro_rules! pattern_delivery_case {
        ($name:ident, $pattern:expr, $event_name:expr, $expected:expr) => {
            #[test]
            fn $name() {
                let notifier = Notifier::new();
                let hits = shared(0usize);
                count_hits(&notifier, $pattern, &hits);

                publish_named(&notifier, $event_name);

                assert_eq!(*lock(&hits), $expected);
            }
        };
    }

    pattern_delivery_case!(exact_pattern_matches_same_name, "render", "render", 1);
    pattern_delivery_case!(
        exact_pattern_rejects_prefixed_name,
        "render",
        "render.template",
        0
    );
    pattern_delivery_case!(wildcard_pattern_matches_empty_name, "*", "", 1);
    pattern_delivery_case!(
        wildcard_pattern_matches_nested_name,
        "*",
        "sql.active_record",
        1
    );
    pattern_delivery_case!(prefix_pattern_matches_exact_prefix, "render*", "render", 1);
    pattern_delivery_case!(
        prefix_pattern_matches_extended_name,
        "render*",
        "render.template",
        1
    );
    pattern_delivery_case!(
        prefix_pattern_rejects_other_prefix,
        "render*",
        "sql.render",
        0
    );
    pattern_delivery_case!(short_prefix_matches_base_name, "sql*", "sql", 1);
    pattern_delivery_case!(
        short_prefix_matches_extended_name,
        "sql*",
        "sql.active_record",
        1
    );
    pattern_delivery_case!(exact_pattern_rejects_empty_name, "render", "", 0);

    #[test]
    fn new_notifier_starts_without_subscriptions() {
        let notifier = Notifier::new();

        assert!(notifier.subscriptions.is_empty());
        assert!(notifier.subscription_keys.is_empty());
    }

    #[test]
    fn first_subscription_id_is_one() {
        let notifier = Notifier::new();

        assert_eq!(notifier.subscribe("render", Box::new(|_| {})), 1);
    }

    #[test]
    fn subscription_ids_increase_monotonically() {
        let notifier = Notifier::new();
        let first = notifier.subscribe("render", Box::new(|_| {}));
        let second = notifier.subscribe("render", Box::new(|_| {}));
        let third = notifier.subscribe("sql", Box::new(|_| {}));

        assert!(first < second && second < third);
    }

    #[test]
    fn subscribe_records_pattern_and_id() {
        let notifier = Notifier::new();
        let id = notifier.subscribe("render", Box::new(|_| {}));

        assert_eq!(
            notifier.subscription_keys.get(&id).as_deref(),
            Some(&"render".to_owned())
        );
        assert_eq!(
            notifier
                .subscriptions
                .get("render")
                .map(|bucket| bucket.len()),
            Some(1)
        );
    }

    #[test]
    fn unsubscribe_removes_id_mapping() {
        let notifier = Notifier::new();
        let id = notifier.subscribe("render", Box::new(|_| {}));
        notifier.unsubscribe(id);

        assert!(!notifier.subscription_keys.contains_key(&id));
    }

    #[test]
    fn unsubscribe_last_subscription_removes_pattern_bucket() {
        let notifier = Notifier::new();
        let id = notifier.subscribe("render", Box::new(|_| {}));
        notifier.unsubscribe(id);

        assert!(!notifier.subscriptions.contains_key("render"));
    }

    #[test]
    fn unsubscribe_one_of_many_keeps_pattern_bucket() {
        let notifier = Notifier::new();
        let first = notifier.subscribe("render", Box::new(|_| {}));
        let second = notifier.subscribe("render", Box::new(|_| {}));
        notifier.unsubscribe(first);

        assert_eq!(
            notifier
                .subscriptions
                .get("render")
                .map(|bucket| bucket.len()),
            Some(1)
        );
        assert!(notifier.subscription_keys.contains_key(&second));
    }

    #[test]
    fn publish_no_subscribers_keeps_state_empty() {
        let notifier = Notifier::new();
        publish_named(&notifier, "unused");

        assert!(notifier.subscriptions.is_empty());
        assert!(notifier.subscription_keys.is_empty());
    }

    #[test]
    fn multiple_publishes_deliver_multiple_times() {
        let notifier = Notifier::new();
        let hits = shared(0usize);
        count_hits(&notifier, "render", &hits);

        publish_named(&notifier, "render");
        publish_named(&notifier, "render");
        publish_named(&notifier, "render");

        assert_eq!(*lock(&hits), 3);
    }

    #[test]
    fn same_callback_subscribed_twice_receives_two_deliveries() {
        let notifier = Notifier::new();
        let hits = shared(0usize);
        for _ in 0..2 {
            count_hits(&notifier, "render", &hits);
        }

        publish_named(&notifier, "render");

        assert_eq!(*lock(&hits), 2);
    }

    #[test]
    fn publish_order_is_preserved_within_same_pattern() {
        let notifier = Notifier::new();
        let order = shared(Vec::new());
        for label in ["first", "second", "third"] {
            record_label(&notifier, "render", label, &order);
        }

        publish_named(&notifier, "render");

        assert_eq!(
            &*lock(&order),
            &[
                String::from("first"),
                String::from("second"),
                String::from("third"),
            ]
        );
    }

    #[test]
    fn same_event_reaches_exact_prefix_and_wildcard() {
        let notifier = Notifier::new();
        let deliveries = shared(Vec::new());
        for (pattern, label) in [
            ("render", "exact"),
            ("render*", "prefix"),
            ("*", "wildcard"),
        ] {
            record_label(&notifier, pattern, label, &deliveries);
        }

        publish_named(&notifier, "render");

        let mut deliveries = lock(&deliveries);
        deliveries.sort();
        assert_eq!(
            &*deliveries,
            &[
                String::from("exact"),
                String::from("prefix"),
                String::from("wildcard"),
            ]
        );
    }

    #[test]
    fn instrument_with_empty_event_name_notifies_exact_subscriber() {
        let notifier = Notifier::new();
        let hits = shared(0usize);
        count_hits(&notifier, "", &hits);

        notifier.instrument("", HashMap::new(), || {});

        assert_eq!(*lock(&hits), 1);
    }

    #[test]
    fn instrument_with_unicode_event_name_notifies_exact_subscriber() {
        let notifier = Notifier::new();
        let names = shared(Vec::new());
        record_names(&notifier, "résumé.render", &names);

        notifier.instrument("résumé.render", HashMap::new(), || {});

        assert_eq!(&*lock(&names), &[String::from("résumé.render")]);
    }

    #[test]
    fn instrument_passes_numeric_payload() {
        let notifier = Notifier::new();
        let values = shared(Vec::new());
        record_payload_value(&notifier, "math", "answer", &values);

        notifier.instrument(
            "math",
            HashMap::from([(String::from("answer"), Value::from(42))]),
            || {},
        );

        assert_eq!(&*lock(&values), &[Some(Value::from(42))]);
    }

    #[test]
    fn instrument_passes_unicode_payload() {
        let notifier = Notifier::new();
        let values = shared(Vec::new());
        record_payload_value(&notifier, "greet", "message", &values);

        notifier.instrument(
            "greet",
            HashMap::from([(String::from("message"), Value::from("héllø 🌍"))]),
            || {},
        );

        assert_eq!(&*lock(&values), &[Some(Value::from("héllø 🌍"))]);
    }

    #[test]
    fn subscriber_can_publish_reentrant_event() {
        let notifier = Arc::new(Notifier::new());
        let seen = shared(Vec::new());

        {
            let reentrant = Arc::clone(&notifier);
            let seen = Arc::clone(&seen);
            notifier.subscribe(
                "outer",
                Box::new(move |_| {
                    // The guard is released at the end of this statement, before the
                    // nested publish makes the "inner" subscriber lock `seen` again.
                    lock(&seen).push(String::from("outer"));
                    publish_named(&reentrant, "inner");
                }),
            );
        }
        record_label(&notifier, "inner", "inner", &seen);

        publish_named(&notifier, "outer");

        assert_eq!(
            &*lock(&seen),
            &[String::from("outer"), String::from("inner")]
        );
    }

    #[test]
    fn callbacks_run_after_block_completes() {
        let notifier = Notifier::new();
        let state = shared(String::from("before"));
        let received_state = shared(Vec::new());

        {
            let state = Arc::clone(&state);
            let received_state = Arc::clone(&received_state);
            notifier.subscribe(
                "render",
                Box::new(move |_| {
                    let snapshot = lock(&state).clone();
                    lock(&received_state).push(snapshot);
                }),
            );
        }

        notifier.instrument("render", HashMap::new(), || {
            *lock(&state) = String::from("after");
        });

        assert_eq!(&*lock(&received_state), &[String::from("after")]);
    }

    #[test]
    fn default_notifier_returns_same_instance() {
        assert!(std::ptr::eq(default_notifier(), default_notifier()));
    }

    #[test]
    fn global_wildcard_receives_multiple_events() {
        let names = shared(Vec::new());
        let received = Arc::clone(&names);
        let id = subscribe(
            "*",
            Box::new(move |event| {
                lock(&received).push(event.name.clone());
            }),
        );

        instrument("global.alpha", HashMap::new(), || {});
        instrument("global.beta", HashMap::new(), || {});
        default_notifier().unsubscribe(id);

        // Other tests may publish on the global notifier concurrently, so only
        // membership is asserted here.
        let names = lock(&names);
        assert!(names.contains(&String::from("global.alpha")));
        assert!(names.contains(&String::from("global.beta")));
    }

    #[test]
    fn global_subscription_ids_are_distinct() {
        let first = subscribe("global.ids", Box::new(|_| {}));
        let second = subscribe("global.ids", Box::new(|_| {}));
        default_notifier().unsubscribe(first);
        default_notifier().unsubscribe(second);

        assert_ne!(first, second);
    }

    #[test]
    fn unsubscribe_from_one_pattern_does_not_affect_other_pattern() {
        let notifier = Notifier::new();
        let render_id = notifier.subscribe("render", Box::new(|_| {}));
        let sql_id = notifier.subscribe("sql", Box::new(|_| {}));
        notifier.unsubscribe(render_id);

        assert!(!notifier.subscription_keys.contains_key(&render_id));
        assert!(notifier.subscription_keys.contains_key(&sql_id));
        assert!(notifier.subscriptions.contains_key("sql"));
    }

    #[test]
    fn publish_to_unmatched_pattern_does_not_invoke_exact_subscriber() {
        let notifier = Notifier::new();
        let hits = shared(0usize);
        count_hits(&notifier, "render", &hits);

        publish_named(&notifier, "sql");

        assert_eq!(*lock(&hits), 0);
    }

    #[test]
    fn prefix_and_exact_subscribers_can_be_removed_independently() {
        let notifier = Notifier::new();
        let exact = notifier.subscribe("render", Box::new(|_| {}));
        let prefix = notifier.subscribe("render*", Box::new(|_| {}));
        notifier.unsubscribe(exact);

        assert!(!notifier.subscription_keys.contains_key(&exact));
        assert!(notifier.subscription_keys.contains_key(&prefix));
        assert!(notifier.subscriptions.contains_key("render*"));
    }

    #[test]
    fn nested_instrument_returns_outer_result() {
        let notifier = Notifier::new();

        let result = notifier.instrument("outer", HashMap::new(), || {
            notifier.instrument("inner", HashMap::new(), || 21) + 21
        });

        assert_eq!(result, 42);
    }

    #[test]
    fn removing_both_same_pattern_subscriptions_clears_bucket() {
        let notifier = Notifier::new();
        let first = notifier.subscribe("render", Box::new(|_| {}));
        let second = notifier.subscribe("render", Box::new(|_| {}));
        notifier.unsubscribe(first);
        notifier.unsubscribe(second);

        assert!(!notifier.subscriptions.contains_key("render"));
        assert!(notifier.subscription_keys.is_empty());
    }
}