Skip to main content

rustrails_support/
notifications.rs

1use std::collections::HashMap;
2use std::sync::Arc;
3use std::sync::atomic::{AtomicUsize, Ordering};
4use std::time::{Duration, Instant};
5
6use dashmap::DashMap;
7use once_cell::sync::Lazy;
8use serde_json::Value;
9
10/// An instrumentation event published to subscribers.
11#[derive(Debug, Clone)]
12pub struct Event {
13    /// Event name.
14    pub name: String,
15    /// Arbitrary event payload.
16    pub payload: HashMap<String, Value>,
17    /// Instant when instrumentation started.
18    pub time: Instant,
19    /// Instant when instrumentation finished.
20    pub end: Instant,
21    /// Measured event duration.
22    pub duration: Duration,
23}
24
25/// Subscriber callback type accepted by the notifier.
26pub type Subscriber = Box<dyn Fn(&Event) + Send + Sync + 'static>;
27
28type SharedSubscriber = Arc<dyn Fn(&Event) + Send + Sync + 'static>;
29
30#[derive(Clone)]
31struct Subscription {
32    id: usize,
33    callback: SharedSubscriber,
34}
35
36/// A synchronous pub/sub notification bus for instrumentation events.
37#[derive(Default)]
38pub struct Notifier {
39    subscriptions: DashMap<String, Vec<Subscription>>,
40    subscription_keys: DashMap<usize, String>,
41    next_id: AtomicUsize,
42}
43
44impl Notifier {
45    /// Creates an empty notifier.
46    #[must_use]
47    pub fn new() -> Self {
48        Self {
49            subscriptions: DashMap::new(),
50            subscription_keys: DashMap::new(),
51            next_id: AtomicUsize::new(1),
52        }
53    }
54
55    /// Subscribes to `event_name` and returns a subscription id.
56    ///
57    /// `event_name` matches exactly. The special pattern `"*"` receives all events,
58    /// and patterns ending with `'*'` act as prefix matches.
59    pub fn subscribe(&self, event_name: &str, callback: Subscriber) -> usize {
60        let id = self.next_id.fetch_add(1, Ordering::Relaxed);
61        let pattern = event_name.to_owned();
62        let shared = Arc::<dyn Fn(&Event) + Send + Sync>::from(callback);
63
64        self.subscriptions
65            .entry(pattern.clone())
66            .or_default()
67            .push(Subscription {
68                id,
69                callback: shared,
70            });
71        self.subscription_keys.insert(id, pattern);
72
73        id
74    }
75
76    /// Removes a subscription by id.
77    pub fn unsubscribe(&self, id: usize) {
78        let Some((_, pattern)) = self.subscription_keys.remove(&id) else {
79            return;
80        };
81
82        let should_remove_key =
83            if let Some(mut subscriptions) = self.subscriptions.get_mut(&pattern) {
84                subscriptions.retain(|subscription| subscription.id != id);
85                subscriptions.is_empty()
86            } else {
87                false
88            };
89
90        if should_remove_key {
91            self.subscriptions.remove(&pattern);
92        }
93    }
94
95    /// Instruments a block, publishing its event after the block completes.
96    pub fn instrument<F, R>(&self, event_name: &str, payload: HashMap<String, Value>, f: F) -> R
97    where
98        F: FnOnce() -> R,
99    {
100        let start = Instant::now();
101        let result = f();
102        let end = Instant::now();
103
104        self.publish(Event {
105            name: event_name.to_owned(),
106            payload,
107            time: start,
108            end,
109            duration: end.saturating_duration_since(start),
110        });
111
112        result
113    }
114
115    /// Publishes an event to all matching subscribers.
116    pub fn publish(&self, event: Event) {
117        let callbacks: Vec<SharedSubscriber> = self
118            .subscriptions
119            .iter()
120            .filter(|entry| Self::matches(entry.key(), &event.name))
121            .flat_map(|entry| {
122                entry
123                    .value()
124                    .iter()
125                    .map(|subscription| Arc::clone(&subscription.callback))
126                    .collect::<Vec<_>>()
127            })
128            .collect();
129
130        for callback in callbacks {
131            callback(&event);
132        }
133    }
134
135    fn matches(pattern: &str, event_name: &str) -> bool {
136        if pattern == "*" {
137            return true;
138        }
139
140        if let Some(prefix) = pattern.strip_suffix('*') {
141            return event_name.starts_with(prefix);
142        }
143
144        pattern == event_name
145    }
146}
147
148static DEFAULT_NOTIFIER: Lazy<Notifier> = Lazy::new(Notifier::new);
149
150/// Returns the global default notifier.
151#[must_use]
152pub fn default_notifier() -> &'static Notifier {
153    &DEFAULT_NOTIFIER
154}
155
156/// Subscribes to the global notifier.
157pub fn subscribe(event: &str, callback: Subscriber) -> usize {
158    default_notifier().subscribe(event, callback)
159}
160
161/// Instruments an event on the global notifier.
162pub fn instrument<F, R>(event: &str, payload: HashMap<String, Value>, f: F) -> R
163where
164    F: FnOnce() -> R,
165{
166    default_notifier().instrument(event, payload, f)
167}
168
169#[cfg(test)]
170mod tests {
171    use std::sync::{Arc, Mutex};
172    use std::thread;
173    use std::time::Duration as StdDuration;
174
175    use super::*;
176
177    fn lock<T>(value: &Mutex<T>) -> std::sync::MutexGuard<'_, T> {
178        value.lock().expect("mutex should not be poisoned")
179    }
180
181    #[test]
182    fn subscribe_and_receive_published_events() {
183        let notifier = Notifier::new();
184        let events = Arc::new(Mutex::new(Vec::new()));
185        let received = Arc::clone(&events);
186
187        notifier.subscribe(
188            "render",
189            Box::new(move |event| {
190                lock(&received).push(event.name.clone());
191            }),
192        );
193
194        notifier.publish(Event {
195            name: "render".to_owned(),
196            payload: HashMap::new(),
197            time: Instant::now(),
198            end: Instant::now(),
199            duration: Duration::ZERO,
200        });
201
202        assert_eq!(&*lock(&events), &[String::from("render")]);
203    }
204
205    #[test]
206    fn instrument_measures_duration() {
207        let notifier = Notifier::new();
208        let durations = Arc::new(Mutex::new(Vec::new()));
209        let received = Arc::clone(&durations);
210
211        notifier.subscribe(
212            "slow",
213            Box::new(move |event| {
214                lock(&received).push(event.duration);
215            }),
216        );
217
218        notifier.instrument("slow", HashMap::new(), || {
219            thread::sleep(StdDuration::from_millis(15));
220        });
221
222        let durations = lock(&durations);
223        assert_eq!(durations.len(), 1);
224        assert!(durations[0] >= StdDuration::from_millis(15));
225    }
226
227    #[test]
228    fn unsubscribe_stops_delivery() {
229        let notifier = Notifier::new();
230        let hits = Arc::new(Mutex::new(0usize));
231        let received = Arc::clone(&hits);
232
233        let id = notifier.subscribe(
234            "render",
235            Box::new(move |_| {
236                *lock(&received) += 1;
237            }),
238        );
239
240        notifier.publish(Event {
241            name: "render".to_owned(),
242            payload: HashMap::new(),
243            time: Instant::now(),
244            end: Instant::now(),
245            duration: Duration::ZERO,
246        });
247        notifier.unsubscribe(id);
248        notifier.publish(Event {
249            name: "render".to_owned(),
250            payload: HashMap::new(),
251            time: Instant::now(),
252            end: Instant::now(),
253            duration: Duration::ZERO,
254        });
255
256        assert_eq!(*lock(&hits), 1);
257    }
258
259    #[test]
260    fn multiple_subscribers_receive_the_same_event() {
261        let notifier = Notifier::new();
262        let hits = Arc::new(Mutex::new(Vec::new()));
263
264        for name in ["first", "second"] {
265            let hits = Arc::clone(&hits);
266            notifier.subscribe(
267                "render",
268                Box::new(move |_| {
269                    lock(&hits).push(name.to_owned());
270                }),
271            );
272        }
273
274        notifier.publish(Event {
275            name: "render".to_owned(),
276            payload: HashMap::new(),
277            time: Instant::now(),
278            end: Instant::now(),
279            duration: Duration::ZERO,
280        });
281
282        let hits = lock(&hits);
283        assert_eq!(hits.len(), 2);
284        assert!(hits.contains(&String::from("first")));
285        assert!(hits.contains(&String::from("second")));
286    }
287
288    #[test]
289    fn exact_subscriptions_do_not_receive_other_events() {
290        let notifier = Notifier::new();
291        let hits = Arc::new(Mutex::new(0usize));
292        let received = Arc::clone(&hits);
293
294        notifier.subscribe(
295            "render",
296            Box::new(move |_| {
297                *lock(&received) += 1;
298            }),
299        );
300
301        notifier.publish(Event {
302            name: "sql".to_owned(),
303            payload: HashMap::new(),
304            time: Instant::now(),
305            end: Instant::now(),
306            duration: Duration::ZERO,
307        });
308
309        assert_eq!(*lock(&hits), 0);
310    }
311
312    #[test]
313    fn wildcard_subscription_receives_all_events() {
314        let notifier = Notifier::new();
315        let hits = Arc::new(Mutex::new(Vec::new()));
316        let received = Arc::clone(&hits);
317
318        notifier.subscribe(
319            "*",
320            Box::new(move |event| {
321                lock(&received).push(event.name.clone());
322            }),
323        );
324
325        for name in ["render", "sql"] {
326            notifier.publish(Event {
327                name: name.to_owned(),
328                payload: HashMap::new(),
329                time: Instant::now(),
330                end: Instant::now(),
331                duration: Duration::ZERO,
332            });
333        }
334
335        assert_eq!(
336            &*lock(&hits),
337            &[String::from("render"), String::from("sql")]
338        );
339    }
340
341    #[test]
342    fn prefix_subscription_receives_matching_events_only() {
343        let notifier = Notifier::new();
344        let hits = Arc::new(Mutex::new(Vec::new()));
345        let received = Arc::clone(&hits);
346
347        notifier.subscribe(
348            "render.*",
349            Box::new(move |event| {
350                lock(&received).push(event.name.clone());
351            }),
352        );
353
354        notifier.publish(Event {
355            name: "render.template".to_owned(),
356            payload: HashMap::new(),
357            time: Instant::now(),
358            end: Instant::now(),
359            duration: Duration::ZERO,
360        });
361        notifier.publish(Event {
362            name: "sql.active_record".to_owned(),
363            payload: HashMap::new(),
364            time: Instant::now(),
365            end: Instant::now(),
366            duration: Duration::ZERO,
367        });
368
369        assert_eq!(&*lock(&hits), &[String::from("render.template")]);
370    }
371
372    #[test]
373    fn payload_is_passed_through() {
374        let notifier = Notifier::new();
375        let payloads = Arc::new(Mutex::new(Vec::new()));
376        let received = Arc::clone(&payloads);
377
378        notifier.subscribe(
379            "process",
380            Box::new(move |event| {
381                lock(&received).push(event.payload.get("status").cloned());
382            }),
383        );
384
385        notifier.instrument(
386            "process",
387            HashMap::from([(String::from("status"), Value::from("ok"))]),
388            || {},
389        );
390
391        assert_eq!(&*lock(&payloads), &[Some(Value::from("ok"))]);
392    }
393
394    #[test]
395    fn nested_instrumentation_publishes_both_events() {
396        let notifier = Notifier::new();
397        let names = Arc::new(Mutex::new(Vec::new()));
398        let received = Arc::clone(&names);
399
400        notifier.subscribe(
401            "*",
402            Box::new(move |event| {
403                lock(&received).push(event.name.clone());
404            }),
405        );
406
407        notifier.instrument("outer", HashMap::new(), || {
408            notifier.instrument("inner", HashMap::new(), || {});
409        });
410
411        assert_eq!(
412            &*lock(&names),
413            &[String::from("inner"), String::from("outer")]
414        );
415    }
416
417    #[test]
418    fn instrument_returns_block_result() {
419        let notifier = Notifier::new();
420
421        let result = notifier.instrument("math", HashMap::new(), || 6 * 7);
422
423        assert_eq!(result, 42);
424    }
425
426    #[test]
427    fn global_functions_work() {
428        let hits = Arc::new(Mutex::new(Vec::new()));
429        let received = Arc::clone(&hits);
430
431        let id = subscribe(
432            "global.test",
433            Box::new(move |event| {
434                lock(&received).push(event.name.clone());
435            }),
436        );
437
438        instrument("global.test", HashMap::new(), || {});
439        default_notifier().unsubscribe(id);
440
441        assert_eq!(&*lock(&hits), &[String::from("global.test")]);
442    }
443
444    #[test]
445    fn unsubscribe_unknown_id_is_a_noop() {
446        let notifier = Notifier::new();
447        notifier.unsubscribe(999);
448    }
449
450    #[test]
451    fn subscriptions_receive_distinct_ids() {
452        let notifier = Notifier::new();
453        let first = notifier.subscribe("a", Box::new(|_| {}));
454        let second = notifier.subscribe("a", Box::new(|_| {}));
455
456        assert_ne!(first, second);
457    }
458
459    #[test]
460    fn publish_to_no_subscribers_is_safe() {
461        let notifier = Notifier::new();
462        notifier.publish(Event {
463            name: "unused".to_owned(),
464            payload: HashMap::new(),
465            time: Instant::now(),
466            end: Instant::now(),
467            duration: Duration::ZERO,
468        });
469    }
470
471    #[test]
472    fn subscriber_can_observe_event_timestamps() {
473        let notifier = Notifier::new();
474        let observed = Arc::new(Mutex::new(Vec::new()));
475        let received = Arc::clone(&observed);
476
477        notifier.subscribe(
478            "timed",
479            Box::new(move |event| {
480                lock(&received).push((event.time <= event.end, event.duration >= Duration::ZERO));
481            }),
482        );
483
484        notifier.instrument("timed", HashMap::new(), || {
485            thread::sleep(StdDuration::from_millis(5));
486        });
487
488        assert_eq!(&*lock(&observed), &[(true, true)]);
489    }
490
491    #[test]
492    fn unsubscribe_removes_only_targeted_subscription() {
493        let notifier = Notifier::new();
494        let first_hits = Arc::new(Mutex::new(0usize));
495        let second_hits = Arc::new(Mutex::new(0usize));
496        let first_received = Arc::clone(&first_hits);
497        let second_received = Arc::clone(&second_hits);
498
499        let first_id = notifier.subscribe(
500            "render",
501            Box::new(move |_| {
502                *lock(&first_received) += 1;
503            }),
504        );
505        notifier.subscribe(
506            "render",
507            Box::new(move |_| {
508                *lock(&second_received) += 1;
509            }),
510        );
511
512        notifier.publish(Event {
513            name: "render".to_owned(),
514            payload: HashMap::new(),
515            time: Instant::now(),
516            end: Instant::now(),
517            duration: Duration::ZERO,
518        });
519        notifier.unsubscribe(first_id);
520        notifier.publish(Event {
521            name: "render".to_owned(),
522            payload: HashMap::new(),
523            time: Instant::now(),
524            end: Instant::now(),
525            duration: Duration::ZERO,
526        });
527
528        assert_eq!(*lock(&first_hits), 1);
529        assert_eq!(*lock(&second_hits), 2);
530    }
531
532    #[test]
533    fn matching_subscriptions_fan_out_to_all_matching_patterns() {
534        let notifier = Notifier::new();
535        let deliveries = Arc::new(Mutex::new(Vec::new()));
536
537        for (pattern, label) in [
538            ("render.template", "exact"),
539            ("render.*", "prefix"),
540            ("*", "wildcard"),
541        ] {
542            let deliveries = Arc::clone(&deliveries);
543            notifier.subscribe(
544                pattern,
545                Box::new(move |_| {
546                    lock(&deliveries).push(label.to_owned());
547                }),
548            );
549        }
550
551        notifier.publish(Event {
552            name: "render.template".to_owned(),
553            payload: HashMap::new(),
554            time: Instant::now(),
555            end: Instant::now(),
556            duration: Duration::ZERO,
557        });
558
559        let mut deliveries = lock(&deliveries);
560        deliveries.sort();
561        assert_eq!(
562            &*deliveries,
563            &[
564                String::from("exact"),
565                String::from("prefix"),
566                String::from("wildcard"),
567            ]
568        );
569    }
570
571    #[test]
572    fn instrument_timestamps_bracket_execution_and_match_duration() {
573        let notifier = Notifier::new();
574        let observed = Arc::new(Mutex::new(Vec::new()));
575        let received = Arc::clone(&observed);
576
577        notifier.subscribe(
578            "timed",
579            Box::new(move |event| {
580                lock(&received).push((event.time, event.end, event.duration));
581            }),
582        );
583
584        let before = Instant::now();
585        let (inside_start, inside_end) = notifier.instrument("timed", HashMap::new(), || {
586            let inside_start = Instant::now();
587            thread::sleep(StdDuration::from_millis(5));
588            let inside_end = Instant::now();
589            (inside_start, inside_end)
590        });
591        let after = Instant::now();
592
593        let observed = lock(&observed);
594        assert_eq!(observed.len(), 1);
595        let (event_start, event_end, duration) = observed[0];
596        assert!(before <= event_start);
597        assert!(event_start <= inside_start);
598        assert!(inside_end <= event_end);
599        assert!(event_end <= after);
600        assert_eq!(duration, event_end.saturating_duration_since(event_start));
601    }
602
603    #[test]
604    fn nested_instrumentation_keeps_payloads_separate_and_ordered() {
605        let notifier = Notifier::new();
606        let observed = Arc::new(Mutex::new(Vec::new()));
607        let received = Arc::clone(&observed);
608
609        notifier.subscribe(
610            "*",
611            Box::new(move |event| {
612                lock(&received).push((event.name.clone(), event.payload.get("kind").cloned()));
613            }),
614        );
615
616        notifier.instrument(
617            "outer",
618            HashMap::from([(String::from("kind"), Value::from("outer"))]),
619            || {
620                notifier.instrument(
621                    "inner",
622                    HashMap::from([(String::from("kind"), Value::from("inner"))]),
623                    || {},
624                );
625
626                let observed = lock(&observed);
627                assert_eq!(observed.len(), 1);
628                assert_eq!(
629                    observed[0],
630                    (String::from("inner"), Some(Value::from("inner")))
631                );
632            },
633        );
634
635        assert_eq!(
636            &*lock(&observed),
637            &[
638                (String::from("inner"), Some(Value::from("inner"))),
639                (String::from("outer"), Some(Value::from("outer"))),
640            ]
641        );
642    }
643
644    fn event(name: &str) -> Event {
645        Event {
646            name: name.to_owned(),
647            payload: HashMap::new(),
648            time: Instant::now(),
649            end: Instant::now(),
650            duration: Duration::ZERO,
651        }
652    }
653
654    fn publish_named(notifier: &Notifier, name: &str) {
655        notifier.publish(event(name));
656    }
657
658    macro_rules! pattern_delivery_case {
659        ($name:ident, $pattern:expr, $event_name:expr, $expected:expr) => {
660            #[test]
661            fn $name() {
662                let notifier = Notifier::new();
663                let hits = Arc::new(Mutex::new(0usize));
664                let received = Arc::clone(&hits);
665
666                notifier.subscribe(
667                    $pattern,
668                    Box::new(move |_| {
669                        *lock(&received) += 1;
670                    }),
671                );
672                publish_named(&notifier, $event_name);
673
674                assert_eq!(*lock(&hits), $expected);
675            }
676        };
677    }
678
679    pattern_delivery_case!(exact_pattern_matches_same_name, "render", "render", 1);
680    pattern_delivery_case!(
681        exact_pattern_rejects_prefixed_name,
682        "render",
683        "render.template",
684        0
685    );
686    pattern_delivery_case!(wildcard_pattern_matches_empty_name, "*", "", 1);
687    pattern_delivery_case!(
688        wildcard_pattern_matches_nested_name,
689        "*",
690        "sql.active_record",
691        1
692    );
693    pattern_delivery_case!(prefix_pattern_matches_exact_prefix, "render*", "render", 1);
694    pattern_delivery_case!(
695        prefix_pattern_matches_extended_name,
696        "render*",
697        "render.template",
698        1
699    );
700    pattern_delivery_case!(
701        prefix_pattern_rejects_other_prefix,
702        "render*",
703        "sql.render",
704        0
705    );
706    pattern_delivery_case!(short_prefix_matches_base_name, "sql*", "sql", 1);
707    pattern_delivery_case!(
708        short_prefix_matches_extended_name,
709        "sql*",
710        "sql.active_record",
711        1
712    );
713    pattern_delivery_case!(exact_pattern_rejects_empty_name, "render", "", 0);
714
715    #[test]
716    fn new_notifier_starts_without_subscriptions() {
717        let notifier = Notifier::new();
718
719        assert!(notifier.subscriptions.is_empty());
720        assert!(notifier.subscription_keys.is_empty());
721    }
722
723    #[test]
724    fn first_subscription_id_is_one() {
725        let notifier = Notifier::new();
726
727        assert_eq!(notifier.subscribe("render", Box::new(|_| {})), 1);
728    }
729
730    #[test]
731    fn subscription_ids_increase_monotonically() {
732        let notifier = Notifier::new();
733        let first = notifier.subscribe("render", Box::new(|_| {}));
734        let second = notifier.subscribe("render", Box::new(|_| {}));
735        let third = notifier.subscribe("sql", Box::new(|_| {}));
736
737        assert!(first < second && second < third);
738    }
739
740    #[test]
741    fn subscribe_records_pattern_and_id() {
742        let notifier = Notifier::new();
743        let id = notifier.subscribe("render", Box::new(|_| {}));
744
745        assert_eq!(
746            notifier.subscription_keys.get(&id).as_deref(),
747            Some(&"render".to_owned())
748        );
749        assert_eq!(
750            notifier
751                .subscriptions
752                .get("render")
753                .map(|value| value.len()),
754            Some(1)
755        );
756    }
757
758    #[test]
759    fn unsubscribe_removes_id_mapping() {
760        let notifier = Notifier::new();
761        let id = notifier.subscribe("render", Box::new(|_| {}));
762        notifier.unsubscribe(id);
763
764        assert!(!notifier.subscription_keys.contains_key(&id));
765    }
766
767    #[test]
768    fn unsubscribe_last_subscription_removes_pattern_bucket() {
769        let notifier = Notifier::new();
770        let id = notifier.subscribe("render", Box::new(|_| {}));
771        notifier.unsubscribe(id);
772
773        assert!(!notifier.subscriptions.contains_key("render"));
774    }
775
776    #[test]
777    fn unsubscribe_one_of_many_keeps_pattern_bucket() {
778        let notifier = Notifier::new();
779        let first = notifier.subscribe("render", Box::new(|_| {}));
780        let second = notifier.subscribe("render", Box::new(|_| {}));
781        notifier.unsubscribe(first);
782
783        assert_eq!(
784            notifier
785                .subscriptions
786                .get("render")
787                .map(|value| value.len()),
788            Some(1)
789        );
790        assert!(notifier.subscription_keys.contains_key(&second));
791    }
792
793    #[test]
794    fn publish_no_subscribers_keeps_state_empty() {
795        let notifier = Notifier::new();
796        publish_named(&notifier, "unused");
797
798        assert!(notifier.subscriptions.is_empty());
799        assert!(notifier.subscription_keys.is_empty());
800    }
801
802    #[test]
803    fn multiple_publishes_deliver_multiple_times() {
804        let notifier = Notifier::new();
805        let hits = Arc::new(Mutex::new(0usize));
806        let received = Arc::clone(&hits);
807
808        notifier.subscribe(
809            "render",
810            Box::new(move |_| {
811                *lock(&received) += 1;
812            }),
813        );
814        publish_named(&notifier, "render");
815        publish_named(&notifier, "render");
816        publish_named(&notifier, "render");
817
818        assert_eq!(*lock(&hits), 3);
819    }
820
821    #[test]
822    fn same_callback_subscribed_twice_receives_two_deliveries() {
823        let notifier = Notifier::new();
824        let hits = Arc::new(Mutex::new(0usize));
825
826        for _ in 0..2 {
827            let received = Arc::clone(&hits);
828            notifier.subscribe(
829                "render",
830                Box::new(move |_| {
831                    *lock(&received) += 1;
832                }),
833            );
834        }
835        publish_named(&notifier, "render");
836
837        assert_eq!(*lock(&hits), 2);
838    }
839
840    #[test]
841    fn publish_order_is_preserved_within_same_pattern() {
842        let notifier = Notifier::new();
843        let order = Arc::new(Mutex::new(Vec::new()));
844
845        for label in ["first", "second", "third"] {
846            let order = Arc::clone(&order);
847            notifier.subscribe(
848                "render",
849                Box::new(move |_| {
850                    lock(&order).push(label.to_owned());
851                }),
852            );
853        }
854        publish_named(&notifier, "render");
855
856        assert_eq!(
857            &*lock(&order),
858            &[
859                String::from("first"),
860                String::from("second"),
861                String::from("third"),
862            ]
863        );
864    }
865
866    #[test]
867    fn same_event_reaches_exact_prefix_and_wildcard() {
868        let notifier = Notifier::new();
869        let deliveries = Arc::new(Mutex::new(Vec::new()));
870
871        for (pattern, label) in [
872            ("render", "exact"),
873            ("render*", "prefix"),
874            ("*", "wildcard"),
875        ] {
876            let deliveries = Arc::clone(&deliveries);
877            notifier.subscribe(
878                pattern,
879                Box::new(move |_| {
880                    lock(&deliveries).push(label.to_owned());
881                }),
882            );
883        }
884        publish_named(&notifier, "render");
885
886        let mut deliveries = lock(&deliveries);
887        deliveries.sort();
888        assert_eq!(
889            &*deliveries,
890            &[
891                String::from("exact"),
892                String::from("prefix"),
893                String::from("wildcard"),
894            ]
895        );
896    }
897
898    #[test]
899    fn instrument_with_empty_event_name_notifies_exact_subscriber() {
900        let notifier = Notifier::new();
901        let hits = Arc::new(Mutex::new(0usize));
902        let received = Arc::clone(&hits);
903        notifier.subscribe("", Box::new(move |_| *lock(&received) += 1));
904
905        notifier.instrument("", HashMap::new(), || {});
906
907        assert_eq!(*lock(&hits), 1);
908    }
909
910    #[test]
911    fn instrument_with_unicode_event_name_notifies_exact_subscriber() {
912        let notifier = Notifier::new();
913        let names = Arc::new(Mutex::new(Vec::new()));
914        let received = Arc::clone(&names);
915        notifier.subscribe(
916            "résumé.render",
917            Box::new(move |event| lock(&received).push(event.name.clone())),
918        );
919
920        notifier.instrument("résumé.render", HashMap::new(), || {});
921
922        assert_eq!(&*lock(&names), &[String::from("résumé.render")]);
923    }
924
925    #[test]
926    fn instrument_passes_numeric_payload() {
927        let notifier = Notifier::new();
928        let values = Arc::new(Mutex::new(Vec::new()));
929        let received = Arc::clone(&values);
930        notifier.subscribe(
931            "math",
932            Box::new(move |event| lock(&received).push(event.payload.get("answer").cloned())),
933        );
934
935        notifier.instrument(
936            "math",
937            HashMap::from([(String::from("answer"), Value::from(42))]),
938            || {},
939        );
940
941        assert_eq!(&*lock(&values), &[Some(Value::from(42))]);
942    }
943
944    #[test]
945    fn instrument_passes_unicode_payload() {
946        let notifier = Notifier::new();
947        let values = Arc::new(Mutex::new(Vec::new()));
948        let received = Arc::clone(&values);
949        notifier.subscribe(
950            "greet",
951            Box::new(move |event| lock(&received).push(event.payload.get("message").cloned())),
952        );
953
954        notifier.instrument(
955            "greet",
956            HashMap::from([(String::from("message"), Value::from("héllø 🌍"))]),
957            || {},
958        );
959
960        assert_eq!(&*lock(&values), &[Some(Value::from("héllø 🌍"))]);
961    }
962
963    #[test]
964    fn subscriber_can_publish_reentrant_event() {
965        let notifier = Arc::new(Notifier::new());
966        let seen = Arc::new(Mutex::new(Vec::new()));
967
968        {
969            let notifier = Arc::clone(&notifier);
970            let notifier_for_callback = Arc::clone(&notifier);
971            let seen = Arc::clone(&seen);
972            notifier.subscribe(
973                "outer",
974                Box::new(move |_| {
975                    lock(&seen).push(String::from("outer"));
976                    publish_named(&notifier_for_callback, "inner");
977                }),
978            );
979        }
980        {
981            let seen = Arc::clone(&seen);
982            notifier.subscribe(
983                "inner",
984                Box::new(move |_| {
985                    lock(&seen).push(String::from("inner"));
986                }),
987            );
988        }
989
990        publish_named(&notifier, "outer");
991
992        assert_eq!(
993            &*lock(&seen),
994            &[String::from("outer"), String::from("inner")]
995        );
996    }
997
998    #[test]
999    fn callbacks_run_after_block_completes() {
1000        let notifier = Notifier::new();
1001        let state = Arc::new(Mutex::new(String::from("before")));
1002        let received_state = Arc::new(Mutex::new(Vec::new()));
1003
1004        {
1005            let state = Arc::clone(&state);
1006            let received_state = Arc::clone(&received_state);
1007            notifier.subscribe(
1008                "render",
1009                Box::new(move |_| {
1010                    lock(&received_state).push(lock(&state).clone());
1011                }),
1012            );
1013        }
1014
1015        notifier.instrument("render", HashMap::new(), || {
1016            *lock(&state) = String::from("after");
1017        });
1018
1019        assert_eq!(&*lock(&received_state), &[String::from("after")]);
1020    }
1021
1022    #[test]
1023    fn default_notifier_returns_same_instance() {
1024        assert!(std::ptr::eq(default_notifier(), default_notifier()));
1025    }
1026
1027    #[test]
1028    fn global_wildcard_receives_multiple_events() {
1029        let hits = Arc::new(Mutex::new(Vec::new()));
1030        let received = Arc::clone(&hits);
1031        let id = subscribe(
1032            "*",
1033            Box::new(move |event| lock(&received).push(event.name.clone())),
1034        );
1035
1036        instrument("global.alpha", HashMap::new(), || {});
1037        instrument("global.beta", HashMap::new(), || {});
1038        default_notifier().unsubscribe(id);
1039
1040        let hits = lock(&hits);
1041        assert!(hits.contains(&String::from("global.alpha")));
1042        assert!(hits.contains(&String::from("global.beta")));
1043    }
1044
1045    #[test]
1046    fn global_subscription_ids_are_distinct() {
1047        let first = subscribe("global.ids", Box::new(|_| {}));
1048        let second = subscribe("global.ids", Box::new(|_| {}));
1049        default_notifier().unsubscribe(first);
1050        default_notifier().unsubscribe(second);
1051
1052        assert_ne!(first, second);
1053    }
1054
1055    #[test]
1056    fn unsubscribe_from_one_pattern_does_not_affect_other_pattern() {
1057        let notifier = Notifier::new();
1058        let render_id = notifier.subscribe("render", Box::new(|_| {}));
1059        let sql_id = notifier.subscribe("sql", Box::new(|_| {}));
1060        notifier.unsubscribe(render_id);
1061
1062        assert!(!notifier.subscription_keys.contains_key(&render_id));
1063        assert!(notifier.subscription_keys.contains_key(&sql_id));
1064        assert!(notifier.subscriptions.contains_key("sql"));
1065    }
1066
1067    #[test]
1068    fn publish_to_unmatched_pattern_does_not_invoke_exact_subscriber() {
1069        let notifier = Notifier::new();
1070        let hits = Arc::new(Mutex::new(0usize));
1071        let received = Arc::clone(&hits);
1072        notifier.subscribe(
1073            "render",
1074            Box::new(move |_| {
1075                *lock(&received) += 1;
1076            }),
1077        );
1078
1079        publish_named(&notifier, "sql");
1080
1081        assert_eq!(*lock(&hits), 0);
1082    }
1083
1084    #[test]
1085    fn prefix_and_exact_subscribers_can_be_removed_independently() {
1086        let notifier = Notifier::new();
1087        let exact = notifier.subscribe("render", Box::new(|_| {}));
1088        let prefix = notifier.subscribe("render*", Box::new(|_| {}));
1089        notifier.unsubscribe(exact);
1090
1091        assert!(!notifier.subscription_keys.contains_key(&exact));
1092        assert!(notifier.subscription_keys.contains_key(&prefix));
1093        assert!(notifier.subscriptions.contains_key("render*"));
1094    }
1095
1096    #[test]
1097    fn nested_instrument_returns_outer_result() {
1098        let notifier = Notifier::new();
1099
1100        let result = notifier.instrument("outer", HashMap::new(), || {
1101            notifier.instrument("inner", HashMap::new(), || 21) + 21
1102        });
1103
1104        assert_eq!(result, 42);
1105    }
1106
1107    #[test]
1108    fn removing_both_same_pattern_subscriptions_clears_bucket() {
1109        let notifier = Notifier::new();
1110        let first = notifier.subscribe("render", Box::new(|_| {}));
1111        let second = notifier.subscribe("render", Box::new(|_| {}));
1112        notifier.unsubscribe(first);
1113        notifier.unsubscribe(second);
1114
1115        assert!(!notifier.subscriptions.contains_key("render"));
1116        assert!(notifier.subscription_keys.is_empty());
1117    }
1118}