rustrails_support/
log_subscriber.rs1use crate::notifications;
2use once_cell::sync::{Lazy, OnceCell};
3use parking_lot::RwLock;
4use serde_json::Value;
5use std::collections::HashMap;
6use std::sync::Arc;
7use std::time::{Duration, Instant};
8
9#[derive(Debug, Clone, PartialEq)]
11pub struct LogEvent {
12 pub name: String,
14 pub started_at: Instant,
16 pub duration: Duration,
18 pub payload: HashMap<String, Value>,
20}
21
22pub trait LogSubscriber: Send + Sync {
24 fn call(&self, event: &LogEvent);
26}
27
28static SUBSCRIBERS: Lazy<RwLock<Vec<Arc<dyn LogSubscriber>>>> =
29 Lazy::new(|| RwLock::new(Vec::new()));
30static BRIDGE_SUBSCRIPTION: OnceCell<usize> = OnceCell::new();
31
32pub fn attach<S>(subscriber: S)
34where
35 S: LogSubscriber + 'static,
36{
37 ensure_bridge();
38 SUBSCRIBERS.write().push(Arc::new(subscriber));
39}
40
41fn ensure_bridge() {
42 BRIDGE_SUBSCRIPTION.get_or_init(|| {
43 notifications::subscribe(
44 "*",
45 Box::new(|event| {
46 let log_event = LogEvent {
47 name: event.name.clone(),
48 started_at: event.time,
49 duration: event.duration,
50 payload: event.payload.clone(),
51 };
52 let subscribers = SUBSCRIBERS.read().clone();
53 for subscriber in subscribers {
54 subscriber.call(&log_event);
55 }
56 }),
57 )
58 });
59}
60
/// Test-only helper that empties the subscriber registry.
///
/// The notification bridge itself stays installed; only the list of attached
/// subscribers is cleared.
#[cfg(test)]
pub(crate) fn reset_subscribers() {
    let mut registry = SUBSCRIBERS.write();
    registry.clear();
}
65
#[cfg(test)]
mod tests {
    use super::{LogEvent, LogSubscriber, attach, reset_subscribers};
    use crate::notifications;
    use parking_lot::Mutex;
    use serde_json::json;
    use std::collections::HashMap;
    use std::sync::{Arc, Mutex as StdMutex, MutexGuard, PoisonError};
    use std::time::{Duration, Instant};

    /// Events captured by a [`RecordingSubscriber`], shared with the test body.
    type Recorded = Arc<Mutex<Vec<LogEvent>>>;

    /// Serializes the tests in this module: they all mutate the global
    /// subscriber registry and would otherwise clear each other's subscribers.
    static TEST_LOCK: StdMutex<()> = StdMutex::new(());

    /// Acquires [`TEST_LOCK`], recovering it if an earlier test panicked.
    ///
    /// The mutex protects no data (`()`), and every test resets the registry
    /// before using it, so a poisoned lock carries no broken state. Recovering
    /// keeps one failing test from making every later test fail as well.
    fn serialize_test() -> MutexGuard<'static, ()> {
        TEST_LOCK.lock().unwrap_or_else(PoisonError::into_inner)
    }

    /// Subscriber that stores a clone of every event it is given.
    #[derive(Default)]
    struct RecordingSubscriber {
        events: Recorded,
    }

    impl RecordingSubscriber {
        fn new() -> Self {
            Self::default()
        }

        /// Returns a second handle to the captured events, so they remain
        /// readable after the subscriber itself has been moved into `attach`.
        fn handle(&self) -> Recorded {
            Arc::clone(&self.events)
        }
    }

    impl LogSubscriber for RecordingSubscriber {
        fn call(&self, event: &LogEvent) {
            self.events.lock().push(event.clone());
        }
    }

    /// Attaches a fresh recording subscriber and returns its captured events.
    fn attach_recorder() -> Recorded {
        let subscriber = RecordingSubscriber::new();
        let events = subscriber.handle();
        attach(subscriber);
        events
    }

    /// Counts captured events called `name`.
    ///
    /// Tests match on their own unique name because the bridge subscribes to
    /// `"*"`, so a recorder may also see notifications it did not emit.
    fn count_named(events: &Mutex<Vec<LogEvent>>, name: &str) -> usize {
        events
            .lock()
            .iter()
            .filter(|event| event.name == name)
            .count()
    }

    /// Returns a clone of the first captured event called `name`, panicking if
    /// none was captured.
    fn find_named(events: &Mutex<Vec<LogEvent>>, name: &str) -> LogEvent {
        events
            .lock()
            .iter()
            .find(|event| event.name == name)
            .cloned()
            .expect("named event should be captured")
    }

    #[test]
    fn attach_receives_instrumented_notifications() {
        const NAME: &str = "log_subscriber.attach_receives_instrumented_notifications";
        let _guard = serialize_test();
        reset_subscribers();
        let events = attach_recorder();

        notifications::instrument(NAME, HashMap::new(), || ());

        assert_eq!(count_named(&events, NAME), 1);
    }

    #[test]
    fn payload_is_forwarded_to_subscribers() {
        const NAME: &str = "log_subscriber.payload_is_forwarded_to_subscribers";
        let _guard = serialize_test();
        reset_subscribers();
        let events = attach_recorder();

        notifications::instrument(
            NAME,
            HashMap::from([(String::from("sql"), json!("select 1"))]),
            || (),
        );

        let event = find_named(&events, NAME);
        assert_eq!(event.payload.get("sql"), Some(&json!("select 1")));
    }

    #[test]
    fn duration_is_forwarded_to_subscribers() {
        const NAME: &str = "log_subscriber.duration_is_forwarded_to_subscribers";
        let _guard = serialize_test();
        reset_subscribers();
        let events = attach_recorder();

        notifications::instrument(NAME, HashMap::new(), || {
            std::thread::sleep(Duration::from_millis(10));
        });

        let event = find_named(&events, NAME);
        assert!(event.duration >= Duration::from_millis(10));
    }

    #[test]
    fn multiple_attached_subscribers_receive_the_same_event() {
        const NAME: &str = "log_subscriber.multiple_attached_subscribers_receive_the_same_event";
        let _guard = serialize_test();
        reset_subscribers();
        let first_events = attach_recorder();
        let second_events = attach_recorder();

        notifications::instrument(NAME, HashMap::new(), || ());

        assert_eq!(count_named(&first_events, NAME), 1);
        assert_eq!(count_named(&second_events, NAME), 1);
    }

    #[test]
    fn started_at_is_recorded() {
        const NAME: &str = "log_subscriber.started_at_is_recorded";
        let _guard = serialize_test();
        reset_subscribers();
        let events = attach_recorder();

        notifications::instrument(NAME, HashMap::new(), || ());

        let event = find_named(&events, NAME);
        assert!(event.started_at <= Instant::now());
    }

    #[test]
    fn reset_subscribers_clears_attached_subscribers() {
        const NAME: &str = "log_subscriber.reset_subscribers_clears_attached_subscribers";
        let _guard = serialize_test();
        reset_subscribers();
        let events = attach_recorder();
        // Clearing after attaching must stop delivery to the recorder.
        reset_subscribers();

        notifications::instrument(NAME, HashMap::new(), || ());

        assert_eq!(count_named(&events, NAME), 0);
    }
}