/// Error produced when a request/response body serializer fails.
///
/// Alongside the underlying error it carries a lossy-UTF-8 copy of the raw
/// body (`fallback_data`) so the handler can still persist the original
/// payload text instead of dropping the body entirely.
#[derive(Debug)]
pub struct SerializationError {
    // Raw body as lossy UTF-8, stored verbatim when parsing fails.
    pub fallback_data: String,
    // Underlying serializer error; boxed so any error type fits.
    pub error: Box<dyn std::error::Error + Send + Sync>,
}
58
59impl SerializationError {
60 pub fn new(
62 fallback_data: String,
63 error: impl std::error::Error + Send + Sync + 'static,
64 ) -> Self {
65 Self {
66 fallback_data,
67 error: Box::new(error),
68 }
69 }
70}
71
72impl std::fmt::Display for SerializationError {
73 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
74 write!(f, "Serialization failed: {}", self.error)
75 }
76}
77
78impl std::error::Error for SerializationError {
79 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
80 Some(self.error.as_ref())
81 }
82}
83
84use chrono::{DateTime, Utc};
85use outlet::{RequestData, RequestHandler, ResponseData};
86use serde::{Deserialize, Serialize};
87use serde_json::Value;
88use sqlx::PgPool;
89use std::collections::HashMap;
90use std::sync::Arc;
91use std::time::SystemTime;
92use tracing::{debug, error, instrument, warn};
93use uuid::Uuid;
94
95pub mod error;
96pub mod repository;
97pub use error::PostgresHandlerError;
98pub use repository::{
99 HttpRequest, HttpResponse, RequestFilter, RequestRepository, RequestResponsePair,
100};
101
/// Embedded sqlx migrator for this crate's `./migrations` directory.
///
/// The migration SQL is compiled into the binary, so callers can run
/// `migrator().run(&pool)` without shipping the migration files separately.
pub fn migrator() -> sqlx::migrate::Migrator {
    sqlx::migrate!("./migrations")
}
128
/// Converts incoming request data into the typed body `T`, or reports a
/// `SerializationError` carrying the raw body text. Wrapped in `Arc` so the
/// handler stays cheaply `Clone`.
type RequestSerializer<T> =
    Arc<dyn Fn(&outlet::RequestData) -> Result<T, SerializationError> + Send + Sync>;

/// Like `RequestSerializer`, but also receives the originating request so a
/// response body can be interpreted in the context of its request.
type ResponseSerializer<T> = Arc<
    dyn Fn(&outlet::RequestData, &outlet::ResponseData) -> Result<T, SerializationError>
        + Send
        + Sync,
>;
146
147#[derive(Clone)]
156pub struct PostgresHandler<TReq = Value, TRes = Value>
157where
158 TReq: for<'de> Deserialize<'de> + Serialize + Send + Sync + 'static,
159 TRes: for<'de> Deserialize<'de> + Serialize + Send + Sync + 'static,
160{
161 pool: PgPool,
162 request_serializer: RequestSerializer<TReq>,
163 response_serializer: ResponseSerializer<TRes>,
164 instance_id: Uuid,
165}
166
167impl<TReq, TRes> PostgresHandler<TReq, TRes>
168where
169 TReq: for<'de> Deserialize<'de> + Serialize + Send + Sync + 'static,
170 TRes: for<'de> Deserialize<'de> + Serialize + Send + Sync + 'static,
171{
172 fn default_request_serializer() -> RequestSerializer<TReq> {
175 Arc::new(|request_data| {
176 let bytes = request_data.body.as_deref().unwrap_or(&[]);
177 serde_json::from_slice::<TReq>(bytes).map_err(|error| {
178 let fallback_data = String::from_utf8_lossy(bytes).to_string();
179 SerializationError::new(fallback_data, error)
180 })
181 })
182 }
183
184 fn default_response_serializer() -> ResponseSerializer<TRes> {
187 Arc::new(|_request_data, response_data| {
188 let bytes = response_data.body.as_deref().unwrap_or(&[]);
189 serde_json::from_slice::<TRes>(bytes).map_err(|error| {
190 let fallback_data = String::from_utf8_lossy(bytes).to_string();
191 SerializationError::new(fallback_data, error)
192 })
193 })
194 }
195
196 pub async fn new(database_url: &str) -> Result<Self, PostgresHandlerError> {
230 let pool = PgPool::connect(database_url)
231 .await
232 .map_err(PostgresHandlerError::Connection)?;
233
234 Ok(Self {
235 pool,
236 request_serializer: Self::default_request_serializer(),
237 response_serializer: Self::default_response_serializer(),
238 instance_id: Uuid::new_v4(),
239 })
240 }
241
242 pub fn with_request_serializer<F>(mut self, serializer: F) -> Self
254 where
255 F: Fn(&outlet::RequestData) -> Result<TReq, SerializationError> + Send + Sync + 'static,
256 {
257 self.request_serializer = Arc::new(serializer);
258 self
259 }
260
261 pub fn with_response_serializer<F>(mut self, serializer: F) -> Self
273 where
274 F: Fn(&outlet::RequestData, &outlet::ResponseData) -> Result<TRes, SerializationError>
275 + Send
276 + Sync
277 + 'static,
278 {
279 self.response_serializer = Arc::new(serializer);
280 self
281 }
282
283 pub async fn from_pool(pool: PgPool) -> Result<Self, PostgresHandlerError> {
318 Ok(Self {
319 pool,
320 request_serializer: Self::default_request_serializer(),
321 response_serializer: Self::default_response_serializer(),
322 instance_id: Uuid::new_v4(),
323 })
324 }
325
326 fn headers_to_json(headers: &HashMap<String, Vec<bytes::Bytes>>) -> Value {
328 let mut header_map = HashMap::new();
329 for (name, values) in headers {
330 if values.len() == 1 {
331 let value_str = String::from_utf8_lossy(&values[0]).to_string();
332 header_map.insert(name.clone(), Value::String(value_str));
333 } else {
334 let value_array: Vec<Value> = values
335 .iter()
336 .map(|v| Value::String(String::from_utf8_lossy(v).to_string()))
337 .collect();
338 header_map.insert(name.clone(), Value::Array(value_array));
339 }
340 }
341 serde_json::to_value(header_map).unwrap_or(Value::Null)
342 }
343
344 fn request_body_to_json_with_fallback(
346 &self,
347 request_data: &outlet::RequestData,
348 ) -> (Value, bool) {
349 match (self.request_serializer)(request_data) {
350 Ok(typed_value) => {
351 if let Ok(json_value) = serde_json::to_value(&typed_value) {
352 (json_value, true)
353 } else {
354 (
356 Value::String(
357 serde_json::to_string(&typed_value)
358 .expect("Serialized value must be convertible to JSON string"),
359 ),
360 false,
361 )
362 }
363 }
364 Err(serialization_error) => (Value::String(serialization_error.fallback_data), false),
365 }
366 }
367
368 fn response_body_to_json_with_fallback(
370 &self,
371 request_data: &outlet::RequestData,
372 response_data: &outlet::ResponseData,
373 ) -> (Value, bool) {
374 match (self.response_serializer)(request_data, response_data) {
375 Ok(typed_value) => {
376 if let Ok(json_value) = serde_json::to_value(&typed_value) {
377 (json_value, true)
378 } else {
379 (
381 Value::String(
382 serde_json::to_string(&typed_value)
383 .expect("Serialized value must be convertible to JSON string"),
384 ),
385 false,
386 )
387 }
388 }
389 Err(serialization_error) => (Value::String(serialization_error.fallback_data), false),
390 }
391 }
392
393 pub fn repository(&self) -> crate::repository::RequestRepository<TReq, TRes> {
398 crate::repository::RequestRepository::new(self.pool.clone())
399 }
400}
401
impl<TReq, TRes> RequestHandler for PostgresHandler<TReq, TRes>
where
    TReq: for<'de> Deserialize<'de> + Serialize + Send + Sync + 'static,
    TRes: for<'de> Deserialize<'de> + Serialize + Send + Sync + 'static,
{
    /// Persists one incoming request into `http_requests`.
    ///
    /// Insert failures are logged and swallowed — logging must never disrupt
    /// the traffic being observed. On success, also logs the lag between the
    /// request timestamp and the write (warns above 1s).
    #[instrument(skip(self, data), fields(correlation_id = %data.correlation_id))]
    async fn handle_request(&self, data: RequestData) {
        let headers_json = Self::headers_to_json(&data.headers);
        // Serialize the body only when present; `parsed` records whether the
        // configured serializer produced structured JSON (vs. raw fallback).
        let (body_json, parsed) = if data.body.is_some() {
            let (json, parsed) = self.request_body_to_json_with_fallback(&data);
            (Some(json), parsed)
        } else {
            (None, false)
        };

        let timestamp: DateTime<Utc> = data.timestamp.into();

        let result = sqlx::query(
            r#"
            INSERT INTO http_requests (instance_id, correlation_id, timestamp, method, uri, headers, body, body_parsed)
            VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
            "#,
        )
        .bind(self.instance_id)
        // NOTE(review): u64 -> i64 cast; correlation ids above i64::MAX would
        // wrap to negative BIGINT values — assumed not to occur in practice.
        .bind(data.correlation_id as i64)
        .bind(timestamp)
        .bind(data.method.to_string())
        .bind(data.uri.to_string())
        .bind(headers_json)
        .bind(body_json)
        .bind(parsed)
        .execute(&self.pool)
        .await;

        if let Err(e) = result {
            error!(correlation_id = %data.correlation_id, error = %e, "Failed to insert request data");
        } else {
            // How far behind real time this write landed.
            let processing_lag_ms = SystemTime::now()
                .duration_since(data.timestamp)
                .unwrap_or_default()
                .as_millis();
            if processing_lag_ms > 1000 {
                warn!(correlation_id = %data.correlation_id, method = %data.method, uri = %data.uri, lag_ms = %processing_lag_ms, "Request logged (slow)");
            } else {
                debug!(correlation_id = %data.correlation_id, method = %data.method, uri = %data.uri, lag_ms = %processing_lag_ms, "Request logged");
            }
        }
    }

    /// Persists a response into `http_responses`, but only when the matching
    /// request row (same instance + correlation id) was stored first — the
    /// `WHERE EXISTS` guard turns orphan responses into a silent no-op.
    #[instrument(skip(self, request_data, response_data), fields(correlation_id = %request_data.correlation_id))]
    async fn handle_response(&self, request_data: RequestData, response_data: ResponseData) {
        let headers_json = Self::headers_to_json(&response_data.headers);
        let (body_json, parsed) = if response_data.body.is_some() {
            let (json, parsed) =
                self.response_body_to_json_with_fallback(&request_data, &response_data);
            (Some(json), parsed)
        } else {
            (None, false)
        };

        let timestamp: DateTime<Utc> = response_data.timestamp.into();
        // Durations are stored as BIGINT milliseconds.
        let duration_ms = response_data.duration.as_millis() as i64;
        let duration_to_first_byte_ms = response_data.duration_to_first_byte.as_millis() as i64;

        let result = sqlx::query(
            r#"
            INSERT INTO http_responses (instance_id, correlation_id, timestamp, status_code, headers, body, body_parsed, duration_to_first_byte_ms, duration_ms)
            SELECT $1, $2, $3, $4, $5, $6, $7, $8, $9
            WHERE EXISTS (SELECT 1 FROM http_requests WHERE instance_id = $1 AND correlation_id = $2)
            "#,
        )
        .bind(self.instance_id)
        // NOTE(review): same u64 -> i64 cast caveat as in handle_request.
        .bind(request_data.correlation_id as i64)
        .bind(timestamp)
        .bind(response_data.status.as_u16() as i32)
        .bind(headers_json)
        .bind(body_json)
        .bind(parsed)
        .bind(duration_to_first_byte_ms)
        .bind(duration_ms)
        .execute(&self.pool)
        .await;

        match result {
            Err(e) => {
                error!(correlation_id = %request_data.correlation_id, error = %e, "Failed to insert response data");
            }
            Ok(query_result) => {
                // rows_affected == 0 means the EXISTS guard filtered the row
                // out: no request was logged for this correlation id.
                if query_result.rows_affected() > 0 {
                    let processing_lag_ms = SystemTime::now()
                        .duration_since(response_data.timestamp)
                        .unwrap_or_default()
                        .as_millis();
                    if processing_lag_ms > 1000 {
                        warn!(correlation_id = %request_data.correlation_id, status = %response_data.status, duration_ms = %duration_ms, lag_ms = %processing_lag_ms, "Response logged (slow)");
                    } else {
                        debug!(correlation_id = %request_data.correlation_id, status = %response_data.status, duration_ms = %duration_ms, lag_ms = %processing_lag_ms, "Response logged");
                    }
                } else {
                    debug!(correlation_id = %request_data.correlation_id, "No matching request found for response, skipping insert")
                }
            }
        }
    }
}
507
#[cfg(test)]
mod tests {
    //! Database-backed tests for `PostgresHandler`.
    //!
    //! Each `#[sqlx::test]` provisions an isolated database; the crate
    //! migrations are applied explicitly at the top of every test.
    use super::*;
    use bytes::Bytes;
    use chrono::{DateTime, Utc};
    use outlet::{RequestData, ResponseData};
    use serde::{Deserialize, Serialize};
    use serde_json::Value;
    use sqlx::PgPool;
    use std::collections::HashMap;
    use std::time::{Duration, SystemTime};

    // Typed request body used to exercise the typed-serializer path.
    #[derive(Debug, Clone, PartialEq, Deserialize, Serialize)]
    struct TestRequest {
        user_id: u64,
        action: String,
    }

    // Typed response body counterpart.
    #[derive(Debug, Clone, PartialEq, Deserialize, Serialize)]
    struct TestResponse {
        success: bool,
        message: String,
    }

    /// Builds a POST /api/users request carrying a JSON `TestRequest` body.
    fn create_test_request_data() -> RequestData {
        let mut headers = HashMap::new();
        headers.insert("content-type".to_string(), vec!["application/json".into()]);
        headers.insert("user-agent".to_string(), vec!["test-client/1.0".into()]);

        let test_req = TestRequest {
            user_id: 123,
            action: "create_user".to_string(),
        };
        let body = serde_json::to_vec(&test_req).unwrap();

        RequestData {
            method: http::Method::POST,
            uri: http::Uri::from_static("/api/users"),
            headers,
            body: Some(Bytes::from(body)),
            timestamp: SystemTime::now(),
            correlation_id: 0,
        }
    }

    /// Builds a 201 response carrying a JSON `TestResponse` body.
    fn create_test_response_data() -> ResponseData {
        let mut headers = HashMap::new();
        headers.insert("content-type".to_string(), vec!["application/json".into()]);

        let test_res = TestResponse {
            success: true,
            message: "User created successfully".to_string(),
        };
        let body = serde_json::to_vec(&test_res).unwrap();

        ResponseData {
            status: http::StatusCode::CREATED,
            headers,
            body: Some(Bytes::from(body)),
            timestamp: SystemTime::now(),
            duration_to_first_byte: Duration::from_millis(100),
            duration: Duration::from_millis(150),
            correlation_id: 0,
        }
    }

    // A freshly constructed handler should see an empty database.
    #[sqlx::test]
    async fn test_handler_creation(pool: PgPool) {
        crate::migrator().run(&pool).await.unwrap();

        let handler = PostgresHandler::<TestRequest, TestResponse>::from_pool(pool.clone())
            .await
            .unwrap();

        let repository = handler.repository();

        let filter = RequestFilter::default();
        let results = repository.query(filter).await.unwrap();
        assert!(results.is_empty());
    }

    // A typed request body should round-trip through the database and come
    // back as a successfully parsed `TestRequest`.
    #[sqlx::test]
    async fn test_handle_request_with_typed_body(pool: PgPool) {
        crate::migrator().run(&pool).await.unwrap();

        let handler = PostgresHandler::<TestRequest, TestResponse>::from_pool(pool.clone())
            .await
            .unwrap();
        let repository = handler.repository();

        let mut request_data = create_test_request_data();
        let correlation_id = 12345;
        request_data.correlation_id = correlation_id;

        handler.handle_request(request_data.clone()).await;

        let filter = RequestFilter {
            correlation_id: Some(correlation_id as i64),
            ..Default::default()
        };
        let results = repository.query(filter).await.unwrap();

        assert_eq!(results.len(), 1);
        let pair = &results[0];

        assert_eq!(pair.request.correlation_id, correlation_id as i64);
        assert_eq!(pair.request.method, "POST");
        assert_eq!(pair.request.uri, "/api/users");

        // Body should have been parsed into the typed representation.
        match &pair.request.body {
            Some(Ok(parsed_body)) => {
                assert_eq!(
                    *parsed_body,
                    TestRequest {
                        user_id: 123,
                        action: "create_user".to_string(),
                    }
                );
            }
            _ => panic!("Expected successfully parsed request body"),
        }

        let headers_value = &pair.request.headers;
        assert!(headers_value.get("content-type").is_some());
        assert!(headers_value.get("user-agent").is_some());

        // No response has been handled yet for this correlation id.
        assert!(pair.response.is_none());
    }

    // Request + response with matching correlation ids should produce a
    // complete pair, with the response body parsed into `TestResponse`.
    #[sqlx::test]
    async fn test_handle_response_with_typed_body(pool: PgPool) {
        crate::migrator().run(&pool).await.unwrap();

        let handler = PostgresHandler::<TestRequest, TestResponse>::from_pool(pool.clone())
            .await
            .unwrap();
        let repository = handler.repository();

        let mut request_data = create_test_request_data();
        let mut response_data = create_test_response_data();
        let correlation_id = 54321;
        request_data.correlation_id = correlation_id;
        response_data.correlation_id = correlation_id;

        // Request must be stored first; the response insert requires it.
        handler.handle_request(request_data.clone()).await;
        handler
            .handle_response(request_data, response_data.clone())
            .await;

        let filter = RequestFilter {
            correlation_id: Some(correlation_id as i64),
            ..Default::default()
        };
        let results = repository.query(filter).await.unwrap();

        assert_eq!(results.len(), 1);
        let pair = &results[0];

        let response = pair.response.as_ref().expect("Response should be present");
        assert_eq!(response.correlation_id, correlation_id as i64);
        assert_eq!(response.status_code, 201);
        assert_eq!(response.duration_ms, 150);

        match &response.body {
            Some(Ok(parsed_body)) => {
                assert_eq!(
                    *parsed_body,
                    TestResponse {
                        success: true,
                        message: "User created successfully".to_string(),
                    }
                );
            }
            _ => panic!("Expected successfully parsed response body"),
        }
    }

    // A body that does not deserialize into `TestRequest` should fall back
    // to storing the raw bytes rather than being dropped.
    #[sqlx::test]
    async fn test_handle_unparseable_body_fallback(pool: PgPool) {
        crate::migrator().run(&pool).await.unwrap();

        let handler = PostgresHandler::<TestRequest, TestResponse>::from_pool(pool.clone())
            .await
            .unwrap();
        let repository = handler.repository();

        let mut headers = HashMap::new();
        headers.insert("content-type".to_string(), vec!["text/plain".into()]);

        let invalid_json_body = b"not valid json for TestRequest";
        let correlation_id = 99999;
        let request_data = RequestData {
            method: http::Method::POST,
            uri: http::Uri::from_static("/api/test"),
            headers,
            body: Some(Bytes::from(invalid_json_body.to_vec())),
            timestamp: SystemTime::now(),
            correlation_id,
        };

        handler.handle_request(request_data).await;

        let filter = RequestFilter {
            correlation_id: Some(correlation_id as i64),
            ..Default::default()
        };
        let results = repository.query(filter).await.unwrap();

        assert_eq!(results.len(), 1);
        let pair = &results[0];

        // The fallback path stores the original bytes verbatim.
        match &pair.request.body {
            Some(Err(raw_bytes)) => {
                assert_eq!(raw_bytes.as_ref(), invalid_json_body);
            }
            _ => panic!("Expected raw bytes fallback for unparseable body"),
        }
    }

    // Exercises each filter dimension (method, status, uri pattern, duration
    // bounds) and a combined filter against a small fixed dataset.
    #[sqlx::test]
    async fn test_query_with_multiple_filters(pool: PgPool) {
        crate::migrator().run(&pool).await.unwrap();

        let handler = PostgresHandler::<Value, Value>::from_pool(pool.clone())
            .await
            .unwrap();
        let repository = handler.repository();

        // (correlation_id, method, uri, status, duration_ms)
        let test_cases = vec![
            (1001, "GET", "/api/users", 200, 100),
            (1002, "POST", "/api/users", 201, 150),
            (1003, "GET", "/api/orders", 404, 50),
            (1004, "PUT", "/api/users/123", 200, 300),
        ];

        for (correlation_id, method, uri, status, duration_ms) in test_cases {
            let mut headers = HashMap::new();
            headers.insert("content-type".to_string(), vec!["application/json".into()]);

            let request_data = RequestData {
                method: method.parse().unwrap(),
                uri: uri.parse().unwrap(),
                headers: headers.clone(),
                body: Some(Bytes::from(b"{}".to_vec())),
                timestamp: SystemTime::now(),
                correlation_id,
            };

            let response_data = ResponseData {
                correlation_id,
                status: http::StatusCode::from_u16(status).unwrap(),
                headers,
                body: Some(Bytes::from(b"{}".to_vec())),
                timestamp: SystemTime::now(),
                duration_to_first_byte: Duration::from_millis(duration_ms / 2),
                duration: Duration::from_millis(duration_ms),
            };

            handler.handle_request(request_data.clone()).await;
            handler.handle_response(request_data, response_data).await;
        }

        // Filter by method: two GETs in the dataset.
        let filter = RequestFilter {
            method: Some("GET".to_string()),
            ..Default::default()
        };
        let results = repository.query(filter).await.unwrap();
        assert_eq!(results.len(), 2);

        // Filter by status code: two 200s.
        let filter = RequestFilter {
            status_code: Some(200),
            ..Default::default()
        };
        let results = repository.query(filter).await.unwrap();
        assert_eq!(results.len(), 2);

        // SQL LIKE pattern: three URIs start with /api/users.
        let filter = RequestFilter {
            uri_pattern: Some("/api/users%".to_string()),
            ..Default::default()
        };
        let results = repository.query(filter).await.unwrap();
        assert_eq!(results.len(), 3);

        // Duration window [100, 200] ms: matches 100 and 150.
        let filter = RequestFilter {
            min_duration_ms: Some(100),
            max_duration_ms: Some(200),
            ..Default::default()
        };
        let results = repository.query(filter).await.unwrap();
        assert_eq!(results.len(), 2);

        // Combined filters: only the GET /api/users 200 row remains.
        let filter = RequestFilter {
            method: Some("GET".to_string()),
            status_code: Some(200),
            ..Default::default()
        };
        let results = repository.query(filter).await.unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].request.correlation_id, 1001);
    }

    // Verifies limit/offset and timestamp ordering (ascending by default,
    // descending when requested).
    #[sqlx::test]
    async fn test_query_with_pagination_and_ordering(pool: PgPool) {
        crate::migrator().run(&pool).await.unwrap();

        let handler = PostgresHandler::<Value, Value>::from_pool(pool.clone())
            .await
            .unwrap();
        let repository = handler.repository();

        // Five requests spaced 10s apart so ordering is unambiguous.
        let now = SystemTime::now();
        for i in 0..5 {
            let correlation_id = 2000 + i;
            let timestamp = now + Duration::from_secs(i * 10);

            let mut headers = HashMap::new();
            headers.insert("x-test-id".to_string(), vec![i.to_string().into()]);

            let request_data = RequestData {
                method: http::Method::GET,
                uri: "/api/test".parse().unwrap(),
                headers,
                body: Some(Bytes::from(format!("{{\"id\": {i}}}").into_bytes())),
                timestamp,
                correlation_id,
            };

            handler.handle_request(request_data).await;
        }

        // Limit caps the result count.
        let filter = RequestFilter {
            limit: Some(3),
            ..Default::default()
        };
        let results = repository.query(filter).await.unwrap();
        assert_eq!(results.len(), 3);

        // Default ordering is ascending by timestamp.
        for i in 0..2 {
            assert!(results[i].request.timestamp <= results[i + 1].request.timestamp);
        }

        // Descending order with offset skips the newest row.
        let filter = RequestFilter {
            order_by_timestamp_desc: true,
            limit: Some(2),
            offset: Some(1),
            ..Default::default()
        };
        let results = repository.query(filter).await.unwrap();
        assert_eq!(results.len(), 2);

        assert!(results[0].request.timestamp >= results[1].request.timestamp);
    }

    // Single-valued headers should come back as JSON strings; multi-valued
    // headers as arrays; empty values are preserved.
    #[sqlx::test]
    async fn test_headers_conversion(pool: PgPool) {
        crate::migrator().run(&pool).await.unwrap();

        let handler = PostgresHandler::<Value, Value>::from_pool(pool.clone())
            .await
            .unwrap();
        let repository = handler.repository();

        let mut headers = HashMap::new();
        headers.insert("single-value".to_string(), vec!["test".into()]);
        headers.insert(
            "multi-value".to_string(),
            vec!["val1".into(), "val2".into()],
        );
        headers.insert("empty-value".to_string(), vec!["".into()]);

        let request_data = RequestData {
            correlation_id: 3000,
            method: http::Method::GET,
            uri: "/test".parse().unwrap(),
            headers,
            body: None,
            timestamp: SystemTime::now(),
        };

        let correlation_id = 3000;
        handler.handle_request(request_data).await;

        let filter = RequestFilter {
            correlation_id: Some(correlation_id as i64),
            ..Default::default()
        };
        let results = repository.query(filter).await.unwrap();

        assert_eq!(results.len(), 1);
        let headers_json = &results[0].request.headers;

        assert_eq!(
            headers_json["single-value"],
            Value::String("test".to_string())
        );

        match &headers_json["multi-value"] {
            Value::Array(arr) => {
                assert_eq!(arr.len(), 2);
                assert_eq!(arr[0], Value::String("val1".to_string()));
                assert_eq!(arr[1], Value::String("val2".to_string()));
            }
            _ => panic!("Expected array for multi-value header"),
        }

        assert_eq!(headers_json["empty-value"], Value::String("".to_string()));
    }

    // Verifies timestamp_after / timestamp_before bounds, separately and
    // combined, against requests spaced one hour apart.
    #[sqlx::test]
    async fn test_timestamp_filtering(pool: PgPool) {
        crate::migrator().run(&pool).await.unwrap();

        let handler = PostgresHandler::<Value, Value>::from_pool(pool.clone())
            .await
            .unwrap();
        let repository = handler.repository();

        // Fixed base time (2020-09-13) keeps the test deterministic.
        let base_time = SystemTime::UNIX_EPOCH + Duration::from_secs(1_600_000_000);
        let times = [
            base_time + Duration::from_secs(0),
            base_time + Duration::from_secs(3600),
            base_time + Duration::from_secs(7200),
        ];

        for (i, timestamp) in times.iter().enumerate() {
            let correlation_id = 4001 + i as u64;
            let request_data = RequestData {
                method: http::Method::GET,
                uri: "/test".parse().unwrap(),
                headers: HashMap::new(),
                body: None,
                timestamp: *timestamp,
                correlation_id,
            };

            handler.handle_request(request_data).await;
        }

        // After +30min: the two later requests match.
        let after_time: DateTime<Utc> = (base_time + Duration::from_secs(1800)).into();
        let filter = RequestFilter {
            timestamp_after: Some(after_time),
            ..Default::default()
        };
        let results = repository.query(filter).await.unwrap();
        assert_eq!(results.len(), 2);

        // Before +90min: the two earlier requests match.
        let before_time: DateTime<Utc> = (base_time + Duration::from_secs(5400)).into();
        let filter = RequestFilter {
            timestamp_before: Some(before_time),
            ..Default::default()
        };
        let results = repository.query(filter).await.unwrap();
        assert_eq!(results.len(), 2);

        // Both bounds: only the middle request (+1h) remains.
        let filter = RequestFilter {
            timestamp_after: Some(after_time),
            timestamp_before: Some(before_time),
            ..Default::default()
        };
        let results = repository.query(filter).await.unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].request.correlation_id, 4002);
    }

    // The handler applies no URI-based filtering: every request is logged.
    #[sqlx::test]
    async fn test_no_path_filtering_logs_everything(pool: PgPool) {
        crate::migrator().run(&pool).await.unwrap();

        let handler = PostgresHandler::<Value, Value>::from_pool(pool.clone())
            .await
            .unwrap();
        let repository = handler.repository();

        let test_uris = vec!["/api/users", "/health", "/metrics", "/random/path"];
        for (i, uri) in test_uris.iter().enumerate() {
            let correlation_id = 3000 + i as u64;
            let mut headers = HashMap::new();
            headers.insert("content-type".to_string(), vec!["application/json".into()]);

            let request_data = RequestData {
                method: http::Method::GET,
                uri: uri.parse().unwrap(),
                headers,
                body: Some(Bytes::from(b"{}".to_vec())),
                timestamp: SystemTime::now(),
                correlation_id,
            };

            handler.handle_request(request_data).await;
        }

        let filter = RequestFilter::default();
        let results = repository.query(filter).await.unwrap();
        assert_eq!(results.len(), 4);
    }
}