Skip to main content

rustrails_controller/
instrumentation.rs

1use std::collections::HashMap;
2use std::time::{Instant, SystemTime, UNIX_EPOCH};
3
4use axum::response::{IntoResponse, Response as AxumResponse};
5use serde_json::{Map, Value, json};
6
7use crate::params::Params;
8use crate::request::Request;
9
/// Timing figures for one instrumented action, bundled so they can be handed
/// to `build_payload` as a single argument.
#[derive(Clone, Copy, Debug)]
struct TimingPayload {
    /// Unix-epoch milliseconds marking when the action started.
    started_at: u64,
    /// Unix-epoch milliseconds marking when the action finished.
    finished_at: u64,
    /// Elapsed time of the action in whole milliseconds.
    duration_ms: u128,
}
16
17/// Instruments a controller action and publishes a `process_action.action_controller` event.
18pub fn process_action<F, R>(
19    controller: &str,
20    action: &str,
21    params: &Params,
22    request: &Request,
23    handler: F,
24) -> AxumResponse
25where
26    F: FnOnce() -> R,
27    R: IntoResponse,
28{
29    let started_at = Instant::now();
30    let started_at_unix_ms = unix_time_ms(SystemTime::now());
31    let response = handler().into_response();
32    let finished_at = Instant::now();
33    let finished_at_unix_ms = unix_time_ms(SystemTime::now());
34    let duration = finished_at.saturating_duration_since(started_at);
35    let timing = TimingPayload {
36        started_at: started_at_unix_ms,
37        finished_at: finished_at_unix_ms,
38        duration_ms: duration.as_millis(),
39    };
40
41    rustrails_support::notifications::default_notifier().publish(
42        rustrails_support::notifications::Event {
43            name: String::from("process_action.action_controller"),
44            payload: build_payload(
45                controller,
46                action,
47                params,
48                request,
49                response.status().as_u16(),
50                timing,
51            ),
52            time: started_at,
53            end: finished_at,
54            duration,
55        },
56    );
57
58    response
59}
60
61fn build_payload(
62    controller: &str,
63    action: &str,
64    params: &Params,
65    request: &Request,
66    status: u16,
67    timing: TimingPayload,
68) -> HashMap<String, Value> {
69    let params = params
70        .to_hash()
71        .iter()
72        .map(|(key, value)| (key.clone(), value.clone()))
73        .collect::<Map<String, Value>>();
74    HashMap::from([
75        (
76            String::from("controller"),
77            Value::String(controller.to_owned()),
78        ),
79        (String::from("action"), Value::String(action.to_owned())),
80        (String::from("params"), Value::Object(params)),
81        (
82            String::from("format"),
83            request
84                .format()
85                .map_or(Value::Null, |format| Value::String(format.to_owned())),
86        ),
87        (
88            String::from("method"),
89            Value::String(request.method().as_str().to_owned()),
90        ),
91        (
92            String::from("path"),
93            Value::String(request.path().to_owned()),
94        ),
95        (String::from("status"), Value::from(status)),
96        (String::from("start_time"), Value::from(timing.started_at)),
97        (String::from("end_time"), Value::from(timing.finished_at)),
98        (String::from("duration"), json!(timing.duration_ms)),
99    ])
100}
101
/// Converts `time` into whole milliseconds elapsed since the Unix epoch.
///
/// A `time` earlier than the epoch yields `0`; a millisecond count too large for `u64`
/// saturates at `u64::MAX`.
fn unix_time_ms(time: SystemTime) -> u64 {
    time.duration_since(UNIX_EPOCH).map_or(0, |since_epoch| {
        u64::try_from(since_epoch.as_millis()).unwrap_or(u64::MAX)
    })
}
108
#[cfg(test)]
mod tests {
    use std::sync::{Arc, Mutex, MutexGuard, PoisonError};
    use std::thread;
    use std::time::Duration;

    use axum::body::{Body, to_bytes};
    use http::StatusCode;
    use serde_json::json;

    use super::*;

    // Held by every test that calls `process_action`, so those sections never run
    // concurrently with each other.
    static TEST_MUTEX: Mutex<()> = Mutex::new(());

    /// Locks `value`, recovering the guard if the mutex was poisoned.
    ///
    /// A test that panics while holding `TEST_MUTEX` poisons it; panicking here as well
    /// would make every later test fail with an unrelated "poisoned" message and hide the
    /// real failure, so the poison flag is deliberately ignored.
    fn lock<T>(value: &Mutex<T>) -> MutexGuard<'_, T> {
        value.lock().unwrap_or_else(PoisonError::into_inner)
    }

    /// Parameters shared by most tests: `{"id": 7, "page": 2}`.
    fn sample_params() -> Params {
        Params::from_map(HashMap::from([
            (String::from("id"), json!(7)),
            (String::from("page"), json!(2)),
        ]))
    }

    /// A `GET /posts/7.json` request.
    fn sample_request() -> Request {
        Request::new(http::Method::GET, "/posts/7.json")
    }

    /// Subscribes to `process_action.action_controller` and returns the subscription id
    /// together with a shared vector that collects a clone of every received event.
    ///
    /// Callers are expected to unsubscribe with the returned id once they are done.
    fn subscribe_once() -> (
        usize,
        Arc<Mutex<Vec<rustrails_support::notifications::Event>>>,
    ) {
        let events = Arc::new(Mutex::new(Vec::new()));
        let received = Arc::clone(&events);
        let id = rustrails_support::notifications::subscribe(
            "process_action.action_controller",
            Box::new(move |event| {
                lock(&received).push(event.clone());
            }),
        );
        (id, events)
    }

    #[tokio::test]
    async fn process_action_returns_original_response() {
        // The guard lives only inside this block so it is released before the `.await`.
        let response = {
            let _guard = lock(&TEST_MUTEX);
            let request = sample_request();
            process_action(
                "PostsController",
                "show",
                &sample_params(),
                &request,
                || (StatusCode::CREATED, Body::from("created")),
            )
        };
        assert_eq!(response.status(), StatusCode::CREATED);
        let body = to_bytes(response.into_body(), usize::MAX)
            .await
            .expect("body should read");
        assert_eq!(body, "created");
    }

    #[test]
    fn process_action_publishes_expected_event_name() {
        let _guard = lock(&TEST_MUTEX);
        let request = sample_request();
        let (id, events) = subscribe_once();
        let _ = process_action(
            "PostsController",
            "show",
            &sample_params(),
            &request,
            || StatusCode::OK,
        );
        rustrails_support::notifications::default_notifier().unsubscribe(id);

        let events = lock(&events);
        assert_eq!(events.len(), 1);
        assert_eq!(events[0].name, "process_action.action_controller");
    }

    #[test]
    fn process_action_payload_includes_controller_and_action() {
        let _guard = lock(&TEST_MUTEX);
        let request = sample_request();
        let (id, events) = subscribe_once();
        let _ = process_action(
            "PostsController",
            "show",
            &sample_params(),
            &request,
            || StatusCode::OK,
        );
        rustrails_support::notifications::default_notifier().unsubscribe(id);

        let payload = &lock(&events)[0].payload;
        assert_eq!(
            payload.get("controller"),
            Some(&Value::String(String::from("PostsController")))
        );
        assert_eq!(
            payload.get("action"),
            Some(&Value::String(String::from("show")))
        );
    }

    #[test]
    fn process_action_payload_includes_params_method_and_path() {
        let _guard = lock(&TEST_MUTEX);
        let request = sample_request();
        let (id, events) = subscribe_once();
        let _ = process_action(
            "PostsController",
            "show",
            &sample_params(),
            &request,
            || StatusCode::OK,
        );
        rustrails_support::notifications::default_notifier().unsubscribe(id);

        let payload = &lock(&events)[0].payload;
        assert_eq!(
            payload.get("method"),
            Some(&Value::String(String::from("GET")))
        );
        assert_eq!(
            payload.get("path"),
            Some(&Value::String(String::from("/posts/7.json")))
        );
        assert_eq!(payload.get("params"), Some(&json!({"id": 7, "page": 2})));
    }

    #[test]
    fn process_action_payload_includes_format_and_status() {
        let _guard = lock(&TEST_MUTEX);
        let request = sample_request();
        let (id, events) = subscribe_once();
        let _ = process_action(
            "PostsController",
            "show",
            &sample_params(),
            &request,
            || StatusCode::ACCEPTED,
        );
        rustrails_support::notifications::default_notifier().unsubscribe(id);

        let payload = &lock(&events)[0].payload;
        assert_eq!(
            payload.get("format"),
            Some(&Value::String(String::from("json")))
        );
        assert_eq!(payload.get("status"), Some(&Value::from(202_u16)));
    }

    #[test]
    fn process_action_records_start_end_and_duration_payload() {
        let _guard = lock(&TEST_MUTEX);
        let request = sample_request();
        let (id, events) = subscribe_once();
        let _ = process_action(
            "PostsController",
            "show",
            &sample_params(),
            &request,
            || {
                thread::sleep(Duration::from_millis(5));
                StatusCode::OK
            },
        );
        rustrails_support::notifications::default_notifier().unsubscribe(id);

        let payload = &lock(&events)[0].payload;
        let start = payload
            .get("start_time")
            .and_then(Value::as_u64)
            .expect("start time should exist");
        let end = payload
            .get("end_time")
            .and_then(Value::as_u64)
            .expect("end time should exist");
        let duration = payload
            .get("duration")
            .and_then(Value::as_u64)
            .expect("duration should exist");
        assert!(end >= start);
        assert!(duration >= 5);
    }

    #[test]
    fn process_action_event_duration_tracks_elapsed_time() {
        let _guard = lock(&TEST_MUTEX);
        let request = sample_request();
        let (id, events) = subscribe_once();
        let _ = process_action(
            "PostsController",
            "show",
            &sample_params(),
            &request,
            || {
                thread::sleep(Duration::from_millis(5));
                StatusCode::OK
            },
        );
        rustrails_support::notifications::default_notifier().unsubscribe(id);

        let event = &lock(&events)[0];
        assert!(event.duration >= Duration::from_millis(5));
        assert!(event.end >= event.time);
    }

    #[test]
    fn process_action_supports_missing_request_format() {
        let _guard = lock(&TEST_MUTEX);
        let request = Request::new(http::Method::GET, "/posts/7");
        let (id, events) = subscribe_once();
        let _ = process_action(
            "PostsController",
            "show",
            &sample_params(),
            &request,
            || StatusCode::OK,
        );
        rustrails_support::notifications::default_notifier().unsubscribe(id);

        let payload = &lock(&events)[0].payload;
        assert_eq!(payload.get("format"), Some(&Value::Null));
    }

    #[test]
    fn process_action_payload_captures_non_success_statuses() {
        let _guard = lock(&TEST_MUTEX);
        let request = sample_request();
        let (id, events) = subscribe_once();
        let _ = process_action(
            "PostsController",
            "show",
            &sample_params(),
            &request,
            || StatusCode::NOT_FOUND,
        );
        rustrails_support::notifications::default_notifier().unsubscribe(id);

        let payload = &lock(&events)[0].payload;
        assert_eq!(payload.get("status"), Some(&Value::from(404_u16)));
    }

    #[test]
    fn process_action_build_payload_preserves_param_values() {
        let request = sample_request();
        let payload = build_payload(
            "PostsController",
            "show",
            &sample_params(),
            &request,
            200,
            TimingPayload {
                started_at: 10,
                finished_at: 15,
                duration_ms: 5,
            },
        );
        assert_eq!(payload.get("params"), Some(&json!({"id": 7, "page": 2})));
    }

    #[test]
    fn unix_time_ms_returns_zero_before_epoch() {
        assert_eq!(unix_time_ms(UNIX_EPOCH - Duration::from_secs(1)), 0);
    }

    #[test]
    fn process_action_handles_empty_params() {
        let _guard = lock(&TEST_MUTEX);
        let request = sample_request();
        let params = Params::new();
        let (id, events) = subscribe_once();
        let _ = process_action("PostsController", "index", &params, &request, || {
            StatusCode::OK
        });
        rustrails_support::notifications::default_notifier().unsubscribe(id);
        let payload = &lock(&events)[0].payload;
        assert_eq!(payload.get("params"), Some(&json!({})));
    }

    #[test]
    fn process_action_can_publish_multiple_events() {
        let _guard = lock(&TEST_MUTEX);
        let request = sample_request();
        let (id, events) = subscribe_once();
        let _ = process_action(
            "PostsController",
            "show",
            &sample_params(),
            &request,
            || StatusCode::OK,
        );
        let _ = process_action("PostsController", "index", &Params::new(), &request, || {
            StatusCode::OK
        });
        rustrails_support::notifications::default_notifier().unsubscribe(id);
        assert_eq!(lock(&events).len(), 2);
    }

    #[test]
    fn process_action_allows_json_bodies_via_into_response() {
        let _guard = lock(&TEST_MUTEX);
        let request = sample_request();
        let response = process_action(
            "PostsController",
            "show",
            &sample_params(),
            &request,
            || axum::Json(json!({"ok": true})),
        );
        assert_eq!(response.status(), StatusCode::OK);
    }
}