Skip to main content

rustrails_support/
log_subscriber.rs

1use crate::notifications;
2use once_cell::sync::{Lazy, OnceCell};
3use parking_lot::RwLock;
4use serde_json::Value;
5use std::collections::HashMap;
6use std::sync::Arc;
7use std::time::{Duration, Instant};
8
9/// A notification event adapted for log subscribers.
10#[derive(Debug, Clone, PartialEq)]
11pub struct LogEvent {
12    /// The event name.
13    pub name: String,
14    /// When the event started.
15    pub started_at: Instant,
16    /// How long the event took.
17    pub duration: Duration,
18    /// Structured payload attached to the event.
19    pub payload: HashMap<String, Value>,
20}
21
22/// Receives adapted notification events.
23pub trait LogSubscriber: Send + Sync {
24    /// Handles a delivered log event.
25    fn call(&self, event: &LogEvent);
26}
27
28static SUBSCRIBERS: Lazy<RwLock<Vec<Arc<dyn LogSubscriber>>>> =
29    Lazy::new(|| RwLock::new(Vec::new()));
30static BRIDGE_SUBSCRIPTION: OnceCell<usize> = OnceCell::new();
31
32/// Attaches a subscriber to the global notifications bridge.
33pub fn attach<S>(subscriber: S)
34where
35    S: LogSubscriber + 'static,
36{
37    ensure_bridge();
38    SUBSCRIBERS.write().push(Arc::new(subscriber));
39}
40
41fn ensure_bridge() {
42    BRIDGE_SUBSCRIPTION.get_or_init(|| {
43        notifications::subscribe(
44            "*",
45            Box::new(|event| {
46                let log_event = LogEvent {
47                    name: event.name.clone(),
48                    started_at: event.time,
49                    duration: event.duration,
50                    payload: event.payload.clone(),
51                };
52                let subscribers = SUBSCRIBERS.read().clone();
53                for subscriber in subscribers {
54                    subscriber.call(&log_event);
55                }
56            }),
57        )
58    });
59}
60
/// Test-only helper that empties the global subscriber list.
///
/// Only the list is cleared; `BRIDGE_SUBSCRIPTION` is left as it is.
#[cfg(test)]
pub(crate) fn reset_subscribers() {
    let mut registry = SUBSCRIBERS.write();
    registry.clear();
}
65
#[cfg(test)]
mod tests {
    use super::{LogEvent, LogSubscriber, attach, reset_subscribers};
    use crate::notifications;
    use parking_lot::Mutex;
    use serde_json::json;
    use std::collections::HashMap;
    use std::sync::{Arc, LazyLock, Mutex as StdMutex, MutexGuard, PoisonError};
    use std::time::{Duration, Instant};

    /// Serializes the tests in this module: they all mutate the global subscriber list.
    static TEST_LOCK: LazyLock<StdMutex<()>> = LazyLock::new(|| StdMutex::new(()));

    /// Shared handle onto the events captured by a [`RecordingSubscriber`].
    type Recorded = Arc<Mutex<Vec<LogEvent>>>;

    /// Subscriber that stores a clone of every event it is given.
    #[derive(Default)]
    struct RecordingSubscriber {
        events: Recorded,
    }

    impl RecordingSubscriber {
        fn new() -> Self {
            Self::default()
        }

        /// Returns a handle that stays valid after the subscriber is moved into `attach`.
        fn handle(&self) -> Recorded {
            Arc::clone(&self.events)
        }
    }

    impl LogSubscriber for RecordingSubscriber {
        fn call(&self, event: &LogEvent) {
            self.events.lock().push(event.clone());
        }
    }

    /// Takes the module-wide test lock and clears previously attached subscribers.
    ///
    /// A poisoned lock is recovered rather than treated as fatal: the mutex guards no
    /// data, so one failing test should not make every later test fail as well.
    fn exclusive_clean_state() -> MutexGuard<'static, ()> {
        let guard = TEST_LOCK.lock().unwrap_or_else(PoisonError::into_inner);
        reset_subscribers();
        guard
    }

    /// Attaches a fresh recording subscriber and returns the handle to its events.
    fn attach_recorder() -> Recorded {
        let subscriber = RecordingSubscriber::new();
        let events = subscriber.handle();
        attach(subscriber);
        events
    }

    /// Counts the captured events whose name equals `name`.
    fn count_named(events: &Recorded, name: &str) -> usize {
        events
            .lock()
            .iter()
            .filter(|event| event.name == name)
            .count()
    }

    /// Returns a clone of the first captured event called `name`, panicking if there is none.
    fn find_named(events: &Recorded, name: &str) -> LogEvent {
        events
            .lock()
            .iter()
            .find(|event| event.name == name)
            .cloned()
            .expect("named event should be captured")
    }

    #[test]
    fn attach_receives_instrumented_notifications() {
        let _guard = exclusive_clean_state();
        let events = attach_recorder();
        let name = "log_subscriber.attach_receives_instrumented_notifications";

        notifications::instrument(name, HashMap::new(), || ());

        assert_eq!(count_named(&events, name), 1);
    }

    #[test]
    fn payload_is_forwarded_to_subscribers() {
        let _guard = exclusive_clean_state();
        let events = attach_recorder();
        let name = "log_subscriber.payload_is_forwarded_to_subscribers";

        notifications::instrument(
            name,
            HashMap::from([(String::from("sql"), json!("select 1"))]),
            || (),
        );

        let event = find_named(&events, name);
        assert_eq!(event.payload.get("sql"), Some(&json!("select 1")));
    }

    #[test]
    fn duration_is_forwarded_to_subscribers() {
        let _guard = exclusive_clean_state();
        let events = attach_recorder();
        let name = "log_subscriber.duration_is_forwarded_to_subscribers";

        notifications::instrument(name, HashMap::new(), || {
            std::thread::sleep(Duration::from_millis(10));
        });

        let event = find_named(&events, name);
        assert!(event.duration >= Duration::from_millis(10));
    }

    #[test]
    fn multiple_attached_subscribers_receive_the_same_event() {
        let _guard = exclusive_clean_state();
        let first_events = attach_recorder();
        let second_events = attach_recorder();
        let name = "log_subscriber.multiple_attached_subscribers_receive_the_same_event";

        notifications::instrument(name, HashMap::new(), || ());

        assert_eq!(count_named(&first_events, name), 1);
        assert_eq!(count_named(&second_events, name), 1);
    }

    #[test]
    fn started_at_is_recorded() {
        let _guard = exclusive_clean_state();
        let events = attach_recorder();
        let name = "log_subscriber.started_at_is_recorded";

        // Assumes the notifications layer stamps the event while `instrument` runs,
        // so the timestamp must fall between the two readings taken around the call.
        let before = Instant::now();
        notifications::instrument(name, HashMap::new(), || ());

        let event = find_named(&events, name);
        assert!(event.started_at >= before);
        assert!(event.started_at <= Instant::now());
    }

    #[test]
    fn reset_subscribers_clears_attached_subscribers() {
        let _guard = exclusive_clean_state();
        let events = attach_recorder();
        reset_subscribers();
        let name = "log_subscriber.reset_subscribers_clears_attached_subscribers";

        notifications::instrument(name, HashMap::new(), || ());

        assert_eq!(count_named(&events, name), 0);
    }
}