1use std::collections::HashMap;
2use std::time::{Instant, SystemTime, UNIX_EPOCH};
3
4use axum::response::{IntoResponse, Response as AxumResponse};
5use serde_json::{Map, Value, json};
6
7use crate::params::Params;
8use crate::request::Request;
9
/// Timing figures recorded for one processed action, in the shape they are
/// written into the notification payload.
#[derive(Clone, Copy, Debug)]
struct TimingPayload {
    /// Wall-clock start, written to the payload as `start_time`.
    started_at: u64,
    /// Wall-clock finish, written to the payload as `end_time`.
    finished_at: u64,
    /// Elapsed time in milliseconds, written to the payload as `duration`.
    duration_ms: u128,
}
16
17pub fn process_action<F, R>(
19 controller: &str,
20 action: &str,
21 params: &Params,
22 request: &Request,
23 handler: F,
24) -> AxumResponse
25where
26 F: FnOnce() -> R,
27 R: IntoResponse,
28{
29 let started_at = Instant::now();
30 let started_at_unix_ms = unix_time_ms(SystemTime::now());
31 let response = handler().into_response();
32 let finished_at = Instant::now();
33 let finished_at_unix_ms = unix_time_ms(SystemTime::now());
34 let duration = finished_at.saturating_duration_since(started_at);
35 let timing = TimingPayload {
36 started_at: started_at_unix_ms,
37 finished_at: finished_at_unix_ms,
38 duration_ms: duration.as_millis(),
39 };
40
41 rustrails_support::notifications::default_notifier().publish(
42 rustrails_support::notifications::Event {
43 name: String::from("process_action.action_controller"),
44 payload: build_payload(
45 controller,
46 action,
47 params,
48 request,
49 response.status().as_u16(),
50 timing,
51 ),
52 time: started_at,
53 end: finished_at,
54 duration,
55 },
56 );
57
58 response
59}
60
61fn build_payload(
62 controller: &str,
63 action: &str,
64 params: &Params,
65 request: &Request,
66 status: u16,
67 timing: TimingPayload,
68) -> HashMap<String, Value> {
69 let params = params
70 .to_hash()
71 .iter()
72 .map(|(key, value)| (key.clone(), value.clone()))
73 .collect::<Map<String, Value>>();
74 HashMap::from([
75 (
76 String::from("controller"),
77 Value::String(controller.to_owned()),
78 ),
79 (String::from("action"), Value::String(action.to_owned())),
80 (String::from("params"), Value::Object(params)),
81 (
82 String::from("format"),
83 request
84 .format()
85 .map_or(Value::Null, |format| Value::String(format.to_owned())),
86 ),
87 (
88 String::from("method"),
89 Value::String(request.method().as_str().to_owned()),
90 ),
91 (
92 String::from("path"),
93 Value::String(request.path().to_owned()),
94 ),
95 (String::from("status"), Value::from(status)),
96 (String::from("start_time"), Value::from(timing.started_at)),
97 (String::from("end_time"), Value::from(timing.finished_at)),
98 (String::from("duration"), json!(timing.duration_ms)),
99 ])
100}
101
/// Converts `time` into whole milliseconds elapsed since the Unix epoch.
///
/// Times earlier than the epoch yield `0`; millisecond counts that do not fit
/// in a `u64` saturate to `u64::MAX`.
fn unix_time_ms(time: SystemTime) -> u64 {
    time.duration_since(UNIX_EPOCH).map_or(0, |since_epoch| {
        u64::try_from(since_epoch.as_millis()).unwrap_or(u64::MAX)
    })
}
108
#[cfg(test)]
mod tests {
    use std::sync::{Arc, Mutex};
    use std::thread;
    use std::time::Duration;

    use axum::body::{Body, to_bytes};
    use http::StatusCode;
    use serde_json::json;

    use super::*;

    // Held by every test that calls `process_action`, so events published to the
    // process-wide default notifier by one test are not observed by another
    // test's subscriber.
    static TEST_MUTEX: Mutex<()> = Mutex::new(());

    /// Locks `value`, recovering the guard when the mutex is poisoned.
    ///
    /// Tests assert while holding `TEST_MUTEX`; without recovery a single failed
    /// assertion would poison it and make every later test fail inside `lock`,
    /// hiding the real failure.
    fn lock<T>(value: &Mutex<T>) -> std::sync::MutexGuard<'_, T> {
        value
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner)
    }

    /// Params fixture containing `id = 7` and `page = 2`.
    fn sample_params() -> Params {
        Params::from_map(HashMap::from([
            (String::from("id"), json!(7)),
            (String::from("page"), json!(2)),
        ]))
    }

    /// Request fixture: `GET /posts/7.json`.
    fn sample_request() -> Request {
        Request::new(http::Method::GET, "/posts/7.json")
    }

    /// Subscribes to `process_action.action_controller` and returns the
    /// subscription id together with the shared vector that collects a clone of
    /// every received event.
    fn subscribe_once() -> (
        usize,
        Arc<Mutex<Vec<rustrails_support::notifications::Event>>>,
    ) {
        let events = Arc::new(Mutex::new(Vec::new()));
        let received = Arc::clone(&events);
        let id = rustrails_support::notifications::subscribe(
            "process_action.action_controller",
            Box::new(move |event| {
                lock(&received).push(event.clone());
            }),
        );
        (id, events)
    }

    #[tokio::test]
    async fn process_action_returns_original_response() {
        // The guard lives only inside this block so it is released before the
        // `.await` below.
        let response = {
            let _guard = lock(&TEST_MUTEX);
            let request = sample_request();
            process_action(
                "PostsController",
                "show",
                &sample_params(),
                &request,
                || (StatusCode::CREATED, Body::from("created")),
            )
        };
        assert_eq!(response.status(), StatusCode::CREATED);
        let body = to_bytes(response.into_body(), usize::MAX)
            .await
            .expect("body should read");
        assert_eq!(body, "created");
    }

    #[test]
    fn process_action_publishes_expected_event_name() {
        let _guard = lock(&TEST_MUTEX);
        let request = sample_request();
        let (id, events) = subscribe_once();
        let _ = process_action(
            "PostsController",
            "show",
            &sample_params(),
            &request,
            || StatusCode::OK,
        );
        rustrails_support::notifications::default_notifier().unsubscribe(id);

        let events = lock(&events);
        assert_eq!(events.len(), 1);
        assert_eq!(events[0].name, "process_action.action_controller");
    }

    #[test]
    fn process_action_payload_includes_controller_and_action() {
        let _guard = lock(&TEST_MUTEX);
        let request = sample_request();
        let (id, events) = subscribe_once();
        let _ = process_action(
            "PostsController",
            "show",
            &sample_params(),
            &request,
            || StatusCode::OK,
        );
        rustrails_support::notifications::default_notifier().unsubscribe(id);

        let payload = &lock(&events)[0].payload;
        assert_eq!(
            payload.get("controller"),
            Some(&Value::String(String::from("PostsController")))
        );
        assert_eq!(
            payload.get("action"),
            Some(&Value::String(String::from("show")))
        );
    }

    #[test]
    fn process_action_payload_includes_params_method_and_path() {
        let _guard = lock(&TEST_MUTEX);
        let request = sample_request();
        let (id, events) = subscribe_once();
        let _ = process_action(
            "PostsController",
            "show",
            &sample_params(),
            &request,
            || StatusCode::OK,
        );
        rustrails_support::notifications::default_notifier().unsubscribe(id);

        let payload = &lock(&events)[0].payload;
        assert_eq!(
            payload.get("method"),
            Some(&Value::String(String::from("GET")))
        );
        assert_eq!(
            payload.get("path"),
            Some(&Value::String(String::from("/posts/7.json")))
        );
        assert_eq!(payload.get("params"), Some(&json!({"id": 7, "page": 2})));
    }

    #[test]
    fn process_action_payload_includes_format_and_status() {
        let _guard = lock(&TEST_MUTEX);
        let request = sample_request();
        let (id, events) = subscribe_once();
        let _ = process_action(
            "PostsController",
            "show",
            &sample_params(),
            &request,
            || StatusCode::ACCEPTED,
        );
        rustrails_support::notifications::default_notifier().unsubscribe(id);

        let payload = &lock(&events)[0].payload;
        assert_eq!(
            payload.get("format"),
            Some(&Value::String(String::from("json")))
        );
        assert_eq!(payload.get("status"), Some(&Value::from(202_u16)));
    }

    #[test]
    fn process_action_records_start_end_and_duration_payload() {
        let _guard = lock(&TEST_MUTEX);
        let request = sample_request();
        let (id, events) = subscribe_once();
        let _ = process_action(
            "PostsController",
            "show",
            &sample_params(),
            &request,
            || {
                // Short sleep so the recorded duration has a known lower bound.
                thread::sleep(Duration::from_millis(5));
                StatusCode::OK
            },
        );
        rustrails_support::notifications::default_notifier().unsubscribe(id);

        let payload = &lock(&events)[0].payload;
        let start = payload
            .get("start_time")
            .and_then(Value::as_u64)
            .expect("start time should exist");
        let end = payload
            .get("end_time")
            .and_then(Value::as_u64)
            .expect("end time should exist");
        let duration = payload
            .get("duration")
            .and_then(Value::as_u64)
            .expect("duration should exist");
        assert!(end >= start);
        assert!(duration >= 5);
    }

    #[test]
    fn process_action_event_duration_tracks_elapsed_time() {
        let _guard = lock(&TEST_MUTEX);
        let request = sample_request();
        let (id, events) = subscribe_once();
        let _ = process_action(
            "PostsController",
            "show",
            &sample_params(),
            &request,
            || {
                thread::sleep(Duration::from_millis(5));
                StatusCode::OK
            },
        );
        rustrails_support::notifications::default_notifier().unsubscribe(id);

        let event = &lock(&events)[0];
        assert!(event.duration >= Duration::from_millis(5));
        assert!(event.end >= event.time);
    }

    #[test]
    fn process_action_supports_missing_request_format() {
        let _guard = lock(&TEST_MUTEX);
        // Same path as the sample request but without the `.json` suffix.
        let request = Request::new(http::Method::GET, "/posts/7");
        let (id, events) = subscribe_once();
        let _ = process_action(
            "PostsController",
            "show",
            &sample_params(),
            &request,
            || StatusCode::OK,
        );
        rustrails_support::notifications::default_notifier().unsubscribe(id);

        let payload = &lock(&events)[0].payload;
        assert_eq!(payload.get("format"), Some(&Value::Null));
    }

    #[test]
    fn process_action_payload_captures_non_success_statuses() {
        let _guard = lock(&TEST_MUTEX);
        let request = sample_request();
        let (id, events) = subscribe_once();
        let _ = process_action(
            "PostsController",
            "show",
            &sample_params(),
            &request,
            || StatusCode::NOT_FOUND,
        );
        rustrails_support::notifications::default_notifier().unsubscribe(id);

        let payload = &lock(&events)[0].payload;
        assert_eq!(payload.get("status"), Some(&Value::from(404_u16)));
    }

    #[test]
    fn process_action_build_payload_preserves_param_values() {
        // Calls `build_payload` directly; nothing is published, so no lock is taken.
        let request = sample_request();
        let payload = build_payload(
            "PostsController",
            "show",
            &sample_params(),
            &request,
            200,
            TimingPayload {
                started_at: 10,
                finished_at: 15,
                duration_ms: 5,
            },
        );
        assert_eq!(payload.get("params"), Some(&json!({"id": 7, "page": 2})));
    }

    #[test]
    fn unix_time_ms_returns_zero_before_epoch() {
        assert_eq!(unix_time_ms(UNIX_EPOCH - Duration::from_secs(1)), 0);
    }

    #[test]
    fn process_action_handles_empty_params() {
        let _guard = lock(&TEST_MUTEX);
        let request = sample_request();
        let params = Params::new();
        let (id, events) = subscribe_once();
        let _ = process_action("PostsController", "index", &params, &request, || {
            StatusCode::OK
        });
        rustrails_support::notifications::default_notifier().unsubscribe(id);
        let payload = &lock(&events)[0].payload;
        assert_eq!(payload.get("params"), Some(&json!({})));
    }

    #[test]
    fn process_action_can_publish_multiple_events() {
        let _guard = lock(&TEST_MUTEX);
        let request = sample_request();
        let (id, events) = subscribe_once();
        let _ = process_action(
            "PostsController",
            "show",
            &sample_params(),
            &request,
            || StatusCode::OK,
        );
        let _ = process_action("PostsController", "index", &Params::new(), &request, || {
            StatusCode::OK
        });
        rustrails_support::notifications::default_notifier().unsubscribe(id);
        assert_eq!(lock(&events).len(), 2);
    }

    #[test]
    fn process_action_allows_json_bodies_via_into_response() {
        let _guard = lock(&TEST_MUTEX);
        let request = sample_request();
        let response = process_action(
            "PostsController",
            "show",
            &sample_params(),
            &request,
            || axum::Json(json!({"ok": true})),
        );
        assert_eq!(response.status(), StatusCode::OK);
    }
}