/// Failure raised by a body serializer, paired with a textual stand-in for
/// the payload that could not be converted.
#[derive(Debug)]
pub struct SerializationError {
    /// Text to keep in place of the typed value that could not be produced.
    pub fallback_data: String,
    /// The underlying error that caused the conversion to fail.
    pub error: Box<dyn std::error::Error + Send + Sync + 'static>,
}
58
59impl SerializationError {
60 pub fn new(
62 fallback_data: String,
63 error: impl std::error::Error + Send + Sync + 'static,
64 ) -> Self {
65 Self {
66 fallback_data,
67 error: Box::new(error),
68 }
69 }
70}
71
72impl std::fmt::Display for SerializationError {
73 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
74 write!(f, "Serialization failed: {}", self.error)
75 }
76}
77
78impl std::error::Error for SerializationError {
79 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
80 Some(self.error.as_ref())
81 }
82}
83
84use chrono::{DateTime, Utc};
85use http::Uri;
86use outlet::{RequestData, RequestHandler, ResponseData};
87use serde::{Deserialize, Serialize};
88use serde_json::Value;
89use sqlx::PgPool;
90use std::collections::HashMap;
91use std::sync::Arc;
92use tracing::{debug, error, instrument};
93use uuid::Uuid;
94
95pub mod error;
96pub mod repository;
97pub use error::PostgresHandlerError;
98pub use repository::{
99 HttpRequest, HttpResponse, RequestFilter, RequestRepository, RequestResponsePair,
100};
101
102pub fn migrator() -> sqlx::migrate::Migrator {
126 sqlx::migrate!("./migrations")
127}
128
129type RequestSerializer<T> =
134 Arc<dyn Fn(&outlet::RequestData) -> Result<T, SerializationError> + Send + Sync>;
135
136type ResponseSerializer<T> = Arc<
142 dyn Fn(&outlet::RequestData, &outlet::ResponseData) -> Result<T, SerializationError>
143 + Send
144 + Sync,
145>;
146
147#[derive(Clone)]
156pub struct PostgresHandler<TReq = Value, TRes = Value>
157where
158 TReq: for<'de> Deserialize<'de> + Serialize + Send + Sync + 'static,
159 TRes: for<'de> Deserialize<'de> + Serialize + Send + Sync + 'static,
160{
161 pool: PgPool,
162 request_serializer: RequestSerializer<TReq>,
163 response_serializer: ResponseSerializer<TRes>,
164 path_filter: Option<PathFilter>,
165 instance_id: Uuid,
166}
167
/// Prefix-based rules deciding which request paths are logged.
///
/// `Default` yields a filter with both lists empty, so callers can use
/// struct-update syntax (`PathFilter { blocked_prefixes: ..., ..Default::default() }`).
#[derive(Clone, Debug, Default)]
pub struct PathFilter {
    /// Path prefixes that may be logged. When empty, every path that is not
    /// blocked is allowed.
    pub allowed_prefixes: Vec<String>,
    /// Path prefixes that are never logged; checked before `allowed_prefixes`.
    pub blocked_prefixes: Vec<String>,
}
178
179impl<TReq, TRes> PostgresHandler<TReq, TRes>
180where
181 TReq: for<'de> Deserialize<'de> + Serialize + Send + Sync + 'static,
182 TRes: for<'de> Deserialize<'de> + Serialize + Send + Sync + 'static,
183{
184 fn default_request_serializer() -> RequestSerializer<TReq> {
187 Arc::new(|request_data| {
188 let bytes = request_data.body.as_deref().unwrap_or(&[]);
189 serde_json::from_slice::<TReq>(bytes).map_err(|error| {
190 let fallback_data = String::from_utf8_lossy(bytes).to_string();
191 SerializationError::new(fallback_data, error)
192 })
193 })
194 }
195
196 fn default_response_serializer() -> ResponseSerializer<TRes> {
199 Arc::new(|_request_data, response_data| {
200 let bytes = response_data.body.as_deref().unwrap_or(&[]);
201 serde_json::from_slice::<TRes>(bytes).map_err(|error| {
202 let fallback_data = String::from_utf8_lossy(bytes).to_string();
203 SerializationError::new(fallback_data, error)
204 })
205 })
206 }
207
208 pub async fn new(database_url: &str) -> Result<Self, PostgresHandlerError> {
242 let pool = PgPool::connect(database_url)
243 .await
244 .map_err(PostgresHandlerError::Connection)?;
245
246 Ok(Self {
247 pool,
248 request_serializer: Self::default_request_serializer(),
249 response_serializer: Self::default_response_serializer(),
250 path_filter: None,
251 instance_id: Uuid::new_v4(),
252 })
253 }
254
255 pub fn with_request_serializer<F>(mut self, serializer: F) -> Self
267 where
268 F: Fn(&outlet::RequestData) -> Result<TReq, SerializationError> + Send + Sync + 'static,
269 {
270 self.request_serializer = Arc::new(serializer);
271 self
272 }
273
274 pub fn with_response_serializer<F>(mut self, serializer: F) -> Self
286 where
287 F: Fn(&outlet::RequestData, &outlet::ResponseData) -> Result<TRes, SerializationError>
288 + Send
289 + Sync
290 + 'static,
291 {
292 self.response_serializer = Arc::new(serializer);
293 self
294 }
295
296 pub fn with_path_filter(mut self, filter: PathFilter) -> Self {
315 self.path_filter = Some(filter);
316 self
317 }
318
319 pub fn with_path_prefix(mut self, prefix: &str) -> Self {
335 self.path_filter = Some(PathFilter {
336 allowed_prefixes: vec![prefix.to_string()],
337 blocked_prefixes: vec![],
338 });
339 self
340 }
341
342 pub async fn from_pool(pool: PgPool) -> Result<Self, PostgresHandlerError> {
377 Ok(Self {
378 pool,
379 request_serializer: Self::default_request_serializer(),
380 response_serializer: Self::default_response_serializer(),
381 path_filter: None,
382 instance_id: Uuid::new_v4(),
383 })
384 }
385
386 fn headers_to_json(headers: &HashMap<String, Vec<bytes::Bytes>>) -> Value {
388 let mut header_map = HashMap::new();
389 for (name, values) in headers {
390 if values.len() == 1 {
391 let value_str = String::from_utf8_lossy(&values[0]).to_string();
392 header_map.insert(name.clone(), Value::String(value_str));
393 } else {
394 let value_array: Vec<Value> = values
395 .iter()
396 .map(|v| Value::String(String::from_utf8_lossy(v).to_string()))
397 .collect();
398 header_map.insert(name.clone(), Value::Array(value_array));
399 }
400 }
401 serde_json::to_value(header_map).unwrap_or(Value::Null)
402 }
403
404 fn request_body_to_json_with_fallback(
406 &self,
407 request_data: &outlet::RequestData,
408 ) -> (Value, bool) {
409 match (self.request_serializer)(request_data) {
410 Ok(typed_value) => {
411 if let Ok(json_value) = serde_json::to_value(&typed_value) {
412 (json_value, true)
413 } else {
414 (
416 Value::String(
417 serde_json::to_string(&typed_value)
418 .expect("Serialized value must be convertible to JSON string"),
419 ),
420 false,
421 )
422 }
423 }
424 Err(serialization_error) => (Value::String(serialization_error.fallback_data), false),
425 }
426 }
427
428 fn response_body_to_json_with_fallback(
430 &self,
431 request_data: &outlet::RequestData,
432 response_data: &outlet::ResponseData,
433 ) -> (Value, bool) {
434 match (self.response_serializer)(request_data, response_data) {
435 Ok(typed_value) => {
436 if let Ok(json_value) = serde_json::to_value(&typed_value) {
437 (json_value, true)
438 } else {
439 (
441 Value::String(
442 serde_json::to_string(&typed_value)
443 .expect("Serialized value must be convertible to JSON string"),
444 ),
445 false,
446 )
447 }
448 }
449 Err(serialization_error) => (Value::String(serialization_error.fallback_data), false),
450 }
451 }
452
453 fn should_log_request(&self, uri: &Uri) -> bool {
455 let path = uri.path();
456 debug!(%path, "Evaluating prefix");
457 let Some(filter) = &self.path_filter else {
458 return true; };
460
461 for blocked_prefix in &filter.blocked_prefixes {
463 if path.starts_with(blocked_prefix) {
464 return false;
465 }
466 }
467
468 if filter.allowed_prefixes.is_empty() {
470 return true;
471 }
472
473 filter
475 .allowed_prefixes
476 .iter()
477 .any(|prefix| path.starts_with(prefix))
478 }
479
480 pub fn repository(&self) -> crate::repository::RequestRepository<TReq, TRes> {
485 crate::repository::RequestRepository::new(self.pool.clone())
486 }
487}
488
489impl<TReq, TRes> RequestHandler for PostgresHandler<TReq, TRes>
490where
491 TReq: for<'de> Deserialize<'de> + Serialize + Send + Sync + 'static,
492 TRes: for<'de> Deserialize<'de> + Serialize + Send + Sync + 'static,
493{
494 #[instrument(skip(self, data), fields(correlation_id = %data.correlation_id))]
495 async fn handle_request(&self, data: RequestData) {
496 if !self.should_log_request(&data.uri) {
498 debug!(correlation_id = %data.correlation_id, uri = %data.uri, "Skipping request due to path filter");
499 return;
500 }
501
502 let headers_json = Self::headers_to_json(&data.headers);
503 let (body_json, parsed) = if data.body.is_some() {
504 let (json, parsed) = self.request_body_to_json_with_fallback(&data);
505 (Some(json), parsed)
506 } else {
507 (None, false)
508 };
509
510 let timestamp: DateTime<Utc> = data.timestamp.into();
511
512 let result = sqlx::query(
513 r#"
514 INSERT INTO http_requests (instance_id, correlation_id, timestamp, method, uri, headers, body, body_parsed)
515 VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
516 "#,
517 )
518 .bind(self.instance_id)
519 .bind(data.correlation_id as i64)
520 .bind(timestamp)
521 .bind(data.method.to_string())
522 .bind(data.uri.to_string())
523 .bind(headers_json)
524 .bind(body_json)
525 .bind(parsed)
526 .execute(&self.pool)
527 .await;
528
529 if let Err(e) = result {
530 error!(correlation_id = %data.correlation_id, error = %e, "Failed to insert request data");
531 } else {
532 debug!(correlation_id = %data.correlation_id, "Request data inserted successfully");
533 }
534 }
535
536 #[instrument(skip(self, request_data, response_data), fields(correlation_id = %request_data.correlation_id))]
537 async fn handle_response(&self, request_data: RequestData, response_data: ResponseData) {
538 let headers_json = Self::headers_to_json(&response_data.headers);
539 let (body_json, parsed) = if response_data.body.is_some() {
540 let (json, parsed) =
541 self.response_body_to_json_with_fallback(&request_data, &response_data);
542 (Some(json), parsed)
543 } else {
544 (None, false)
545 };
546
547 let timestamp: DateTime<Utc> = response_data.timestamp.into();
548 let duration_ms = response_data.duration.as_millis() as i64;
549 let duration_to_first_byte_ms = response_data.duration_to_first_byte.as_millis() as i64;
550
551 let result = sqlx::query(
552 r#"
553 INSERT INTO http_responses (instance_id, correlation_id, timestamp, status_code, headers, body, body_parsed, duration_to_first_byte_ms, duration_ms)
554 SELECT $1, $2, $3, $4, $5, $6, $7, $8, $9
555 WHERE EXISTS (SELECT 1 FROM http_requests WHERE instance_id = $1 AND correlation_id = $2)
556 "#,
557 )
558 .bind(self.instance_id)
559 .bind(request_data.correlation_id as i64)
560 .bind(timestamp)
561 .bind(response_data.status.as_u16() as i32)
562 .bind(headers_json)
563 .bind(body_json)
564 .bind(parsed)
565 .bind(duration_to_first_byte_ms)
566 .bind(duration_ms)
567 .execute(&self.pool)
568 .await;
569
570 match result {
571 Err(e) => {
572 error!(correlation_id = %request_data.correlation_id, error = %e, "Failed to insert response data");
573 }
574 Ok(query_result) => {
575 if query_result.rows_affected() > 0 {
576 debug!(correlation_id = %request_data.correlation_id, "Response data inserted successfully");
577 } else {
578 debug!(correlation_id = %request_data.correlation_id, "No matching request found for response, skipping insert")
579 }
580 }
581 }
582 }
583}
584
#[cfg(test)]
mod tests {
    use super::*;
    use bytes::Bytes;
    use chrono::{DateTime, Utc};
    use outlet::{RequestData, ResponseData};
    use serde::{Deserialize, Serialize};
    use serde_json::Value;
    use sqlx::PgPool;
    use std::collections::HashMap;
    use std::time::{Duration, SystemTime};

    /// Typed request body used by the typed-handler tests.
    #[derive(Debug, Clone, PartialEq, Deserialize, Serialize)]
    struct TestRequest {
        user_id: u64,
        action: String,
    }

    /// Typed response body used by the typed-handler tests.
    #[derive(Debug, Clone, PartialEq, Deserialize, Serialize)]
    struct TestResponse {
        success: bool,
        message: String,
    }

    /// Header map shape used by both request and response data.
    type Headers = HashMap<String, Vec<Bytes>>;

    /// Headers containing only `content-type: application/json`.
    fn json_headers() -> Headers {
        HashMap::from([(
            "content-type".to_string(),
            vec![Bytes::from("application/json")],
        )])
    }

    /// A body holding the empty JSON object `{}`.
    fn empty_json_body() -> Option<Bytes> {
        Some(Bytes::from_static(b"{}"))
    }

    /// Filter selecting a single correlation id.
    fn correlation_filter(correlation_id: u64) -> RequestFilter {
        RequestFilter {
            correlation_id: Some(correlation_id as i64),
            ..Default::default()
        }
    }

    /// Runs the migrations on `pool` and builds a default handler on top of it.
    async fn migrated_handler<TReq, TRes>(pool: &PgPool) -> PostgresHandler<TReq, TRes>
    where
        TReq: for<'de> Deserialize<'de> + Serialize + Send + Sync + 'static,
        TRes: for<'de> Deserialize<'de> + Serialize + Send + Sync + 'static,
    {
        crate::migrator().run(pool).await.unwrap();
        PostgresHandler::<TReq, TRes>::from_pool(pool.clone())
            .await
            .unwrap()
    }

    /// Request with JSON content-type headers and a `{}` body.
    fn json_request(method: http::Method, uri: &str, correlation_id: u64) -> RequestData {
        RequestData {
            method,
            uri: uri.parse().unwrap(),
            headers: json_headers(),
            body: empty_json_body(),
            timestamp: SystemTime::now(),
            correlation_id,
        }
    }

    /// Response with JSON content-type headers and a `{}` body.
    fn json_response(
        status: http::StatusCode,
        first_byte_ms: u64,
        total_ms: u64,
        correlation_id: u64,
    ) -> ResponseData {
        ResponseData {
            correlation_id,
            status,
            headers: json_headers(),
            body: empty_json_body(),
            timestamp: SystemTime::now(),
            duration_to_first_byte: Duration::from_millis(first_byte_ms),
            duration: Duration::from_millis(total_ms),
        }
    }

    /// Sends a GET request and a 200 response for `uri` through `handler`.
    async fn record_ok_exchange(
        handler: &PostgresHandler<Value, Value>,
        uri: &str,
        correlation_id: u64,
    ) {
        let request = json_request(http::Method::GET, uri, correlation_id);
        let response = json_response(http::StatusCode::OK, 80, 100, correlation_id);
        handler.handle_request(request.clone()).await;
        handler.handle_response(request, response).await;
    }

    fn create_test_request_data() -> RequestData {
        let payload = TestRequest {
            user_id: 123,
            action: "create_user".to_string(),
        };

        let mut headers = json_headers();
        headers.insert(
            "user-agent".to_string(),
            vec![Bytes::from("test-client/1.0")],
        );

        RequestData {
            method: http::Method::POST,
            uri: http::Uri::from_static("/api/users"),
            headers,
            body: Some(Bytes::from(serde_json::to_vec(&payload).unwrap())),
            timestamp: SystemTime::now(),
            correlation_id: 0,
        }
    }

    fn create_test_response_data() -> ResponseData {
        let payload = TestResponse {
            success: true,
            message: "User created successfully".to_string(),
        };

        ResponseData {
            status: http::StatusCode::CREATED,
            headers: json_headers(),
            body: Some(Bytes::from(serde_json::to_vec(&payload).unwrap())),
            timestamp: SystemTime::now(),
            duration_to_first_byte: Duration::from_millis(100),
            duration: Duration::from_millis(150),
            correlation_id: 0,
        }
    }

    #[sqlx::test]
    async fn test_handler_creation(pool: PgPool) {
        let handler = migrated_handler::<TestRequest, TestResponse>(&pool).await;
        let repository = handler.repository();

        // A freshly migrated database holds no logged requests.
        let results = repository.query(RequestFilter::default()).await.unwrap();
        assert!(results.is_empty());
    }

    #[sqlx::test]
    async fn test_handle_request_with_typed_body(pool: PgPool) {
        let handler = migrated_handler::<TestRequest, TestResponse>(&pool).await;
        let repository = handler.repository();

        let correlation_id = 12345;
        let request_data = RequestData {
            correlation_id,
            ..create_test_request_data()
        };

        handler.handle_request(request_data).await;

        let results = repository
            .query(correlation_filter(correlation_id))
            .await
            .unwrap();

        assert_eq!(results.len(), 1);
        let pair = &results[0];

        assert_eq!(pair.request.correlation_id, correlation_id as i64);
        assert_eq!(pair.request.method, "POST");
        assert_eq!(pair.request.uri, "/api/users");

        let Some(Ok(parsed_body)) = &pair.request.body else {
            panic!("Expected successfully parsed request body");
        };
        assert_eq!(
            *parsed_body,
            TestRequest {
                user_id: 123,
                action: "create_user".to_string(),
            }
        );

        for header_name in ["content-type", "user-agent"] {
            assert!(pair.request.headers.get(header_name).is_some());
        }

        // Only the request was handled, so no response may be attached.
        assert!(pair.response.is_none());
    }

    #[sqlx::test]
    async fn test_handle_response_with_typed_body(pool: PgPool) {
        let handler = migrated_handler::<TestRequest, TestResponse>(&pool).await;
        let repository = handler.repository();

        let correlation_id = 54321;
        let request_data = RequestData {
            correlation_id,
            ..create_test_request_data()
        };
        let response_data = ResponseData {
            correlation_id,
            ..create_test_response_data()
        };

        // The request must be stored first; the response insert depends on it.
        handler.handle_request(request_data.clone()).await;
        handler.handle_response(request_data, response_data).await;

        let results = repository
            .query(correlation_filter(correlation_id))
            .await
            .unwrap();

        assert_eq!(results.len(), 1);
        let pair = &results[0];

        let response = pair.response.as_ref().expect("Response should be present");
        assert_eq!(response.correlation_id, correlation_id as i64);
        assert_eq!(response.status_code, 201);
        assert_eq!(response.duration_ms, 150);

        let Some(Ok(parsed_body)) = &response.body else {
            panic!("Expected successfully parsed response body");
        };
        assert_eq!(
            *parsed_body,
            TestResponse {
                success: true,
                message: "User created successfully".to_string(),
            }
        );
    }

    #[sqlx::test]
    async fn test_handle_unparseable_body_fallback(pool: PgPool) {
        let handler = migrated_handler::<TestRequest, TestResponse>(&pool).await;
        let repository = handler.repository();

        let headers: Headers = HashMap::from([(
            "content-type".to_string(),
            vec![Bytes::from("text/plain")],
        )]);

        let invalid_json_body = b"not valid json for TestRequest";
        let correlation_id = 99999;
        let request_data = RequestData {
            method: http::Method::POST,
            uri: http::Uri::from_static("/api/test"),
            headers,
            body: Some(Bytes::from(invalid_json_body.to_vec())),
            timestamp: SystemTime::now(),
            correlation_id,
        };

        handler.handle_request(request_data).await;

        let results = repository
            .query(correlation_filter(correlation_id))
            .await
            .unwrap();

        assert_eq!(results.len(), 1);
        let pair = &results[0];

        match &pair.request.body {
            Some(Err(raw_bytes)) => {
                assert_eq!(raw_bytes.as_ref(), invalid_json_body);
            }
            _ => panic!("Expected raw bytes fallback for unparseable body"),
        }
    }

    #[sqlx::test]
    async fn test_query_with_multiple_filters(pool: PgPool) {
        let handler = migrated_handler::<Value, Value>(&pool).await;
        let repository = handler.repository();

        // (correlation id, method, uri, status, total duration in ms)
        let test_cases = [
            (1001, "GET", "/api/users", 200, 100),
            (1002, "POST", "/api/users", 201, 150),
            (1003, "GET", "/api/orders", 404, 50),
            (1004, "PUT", "/api/users/123", 200, 300),
        ];

        for (correlation_id, method, uri, status, duration_ms) in test_cases {
            let request_data = json_request(method.parse().unwrap(), uri, correlation_id);
            let response_data = json_response(
                http::StatusCode::from_u16(status).unwrap(),
                duration_ms / 2,
                duration_ms,
                correlation_id,
            );

            handler.handle_request(request_data.clone()).await;
            handler.handle_response(request_data, response_data).await;
        }

        // Method only.
        let by_method = RequestFilter {
            method: Some("GET".to_string()),
            ..Default::default()
        };
        let results = repository.query(by_method).await.unwrap();
        assert_eq!(results.len(), 2);

        // Status code only.
        let by_status = RequestFilter {
            status_code: Some(200),
            ..Default::default()
        };
        let results = repository.query(by_status).await.unwrap();
        assert_eq!(results.len(), 2);

        // URI pattern.
        let by_uri = RequestFilter {
            uri_pattern: Some("/api/users%".to_string()),
            ..Default::default()
        };
        let results = repository.query(by_uri).await.unwrap();
        assert_eq!(results.len(), 3);

        // Duration range.
        let by_duration = RequestFilter {
            min_duration_ms: Some(100),
            max_duration_ms: Some(200),
            ..Default::default()
        };
        let results = repository.query(by_duration).await.unwrap();
        assert_eq!(results.len(), 2);

        // Method and status code combined.
        let combined = RequestFilter {
            method: Some("GET".to_string()),
            status_code: Some(200),
            ..Default::default()
        };
        let results = repository.query(combined).await.unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].request.correlation_id, 1001);
    }

    #[sqlx::test]
    async fn test_query_with_pagination_and_ordering(pool: PgPool) {
        let handler = migrated_handler::<Value, Value>(&pool).await;
        let repository = handler.repository();

        // Five requests spaced ten seconds apart.
        let now = SystemTime::now();
        for i in 0..5u64 {
            let headers: Headers = HashMap::from([(
                "x-test-id".to_string(),
                vec![Bytes::from(i.to_string())],
            )]);

            let request_data = RequestData {
                method: http::Method::GET,
                uri: "/api/test".parse().unwrap(),
                headers,
                body: Some(Bytes::from(format!("{{\"id\": {i}}}"))),
                timestamp: now + Duration::from_secs(i * 10),
                correlation_id: 2000 + i,
            };

            handler.handle_request(request_data).await;
        }

        let first_page = RequestFilter {
            limit: Some(3),
            ..Default::default()
        };
        let results = repository.query(first_page).await.unwrap();
        assert_eq!(results.len(), 3);

        // Default ordering is expected to be ascending by timestamp.
        for neighbours in results.windows(2) {
            assert!(neighbours[0].request.timestamp <= neighbours[1].request.timestamp);
        }

        let descending_page = RequestFilter {
            order_by_timestamp_desc: true,
            limit: Some(2),
            offset: Some(1),
            ..Default::default()
        };
        let results = repository.query(descending_page).await.unwrap();
        assert_eq!(results.len(), 2);

        assert!(results[0].request.timestamp >= results[1].request.timestamp);
    }

    #[sqlx::test]
    async fn test_headers_conversion(pool: PgPool) {
        let handler = migrated_handler::<Value, Value>(&pool).await;
        let repository = handler.repository();

        let headers: Headers = HashMap::from([
            ("single-value".to_string(), vec![Bytes::from("test")]),
            (
                "multi-value".to_string(),
                vec![Bytes::from("val1"), Bytes::from("val2")],
            ),
            ("empty-value".to_string(), vec![Bytes::from("")]),
        ]);

        let correlation_id = 3000;
        let request_data = RequestData {
            correlation_id,
            method: http::Method::GET,
            uri: "/test".parse().unwrap(),
            headers,
            body: None,
            timestamp: SystemTime::now(),
        };

        handler.handle_request(request_data).await;

        let results = repository
            .query(correlation_filter(correlation_id))
            .await
            .unwrap();

        assert_eq!(results.len(), 1);
        let headers_json = &results[0].request.headers;

        // A single value is stored as a plain string.
        assert_eq!(
            headers_json["single-value"],
            Value::String("test".to_string())
        );

        // Several values are stored as an array, in order.
        let Value::Array(values) = &headers_json["multi-value"] else {
            panic!("Expected array for multi-value header");
        };
        assert_eq!(values.len(), 2);
        assert_eq!(values[0], Value::String("val1".to_string()));
        assert_eq!(values[1], Value::String("val2".to_string()));

        // An empty value is kept as an empty string.
        assert_eq!(headers_json["empty-value"], Value::String("".to_string()));
    }

    #[sqlx::test]
    async fn test_timestamp_filtering(pool: PgPool) {
        let handler = migrated_handler::<Value, Value>(&pool).await;
        let repository = handler.repository();

        // Three requests at +0h, +1h and +2h from a fixed base time.
        let base_time = SystemTime::UNIX_EPOCH + Duration::from_secs(1_600_000_000);
        let offsets_secs = [0, 3600, 7200];

        for (correlation_id, offset_secs) in (4001u64..).zip(offsets_secs) {
            let request_data = RequestData {
                method: http::Method::GET,
                uri: "/test".parse().unwrap(),
                headers: HashMap::new(),
                body: None,
                timestamp: base_time + Duration::from_secs(offset_secs),
                correlation_id,
            };

            handler.handle_request(request_data).await;
        }

        // Cut-offs at +30min and +90min.
        let after_time: DateTime<Utc> = (base_time + Duration::from_secs(1800)).into();
        let before_time: DateTime<Utc> = (base_time + Duration::from_secs(5400)).into();

        let only_after = RequestFilter {
            timestamp_after: Some(after_time),
            ..Default::default()
        };
        let results = repository.query(only_after).await.unwrap();
        assert_eq!(results.len(), 2);

        let only_before = RequestFilter {
            timestamp_before: Some(before_time),
            ..Default::default()
        };
        let results = repository.query(only_before).await.unwrap();
        assert_eq!(results.len(), 2);

        let window = RequestFilter {
            timestamp_after: Some(after_time),
            timestamp_before: Some(before_time),
            ..Default::default()
        };
        let results = repository.query(window).await.unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].request.correlation_id, 4002);
    }

    #[sqlx::test]
    async fn test_path_filtering_allowed_prefix(pool: PgPool) {
        let handler = migrated_handler::<Value, Value>(&pool)
            .await
            .with_path_prefix("/api/");
        let repository = handler.repository();

        // (uri, correlation id, whether the request is expected to be logged)
        let test_cases = [
            ("/api/users", 1001, true),
            ("/api/orders", 1002, true),
            ("/health", 1003, false),
            ("/metrics", 1004, false),
            ("/api/health", 1005, true),
        ];

        for (uri, correlation_id, _should_log) in test_cases {
            record_ok_exchange(&handler, uri, correlation_id).await;
        }

        let results = repository.query(RequestFilter::default()).await.unwrap();

        assert_eq!(results.len(), 3);
        let logged_ids: Vec<i64> = results.iter().map(|r| r.request.correlation_id).collect();
        for expected in [1001, 1002, 1005] {
            assert!(logged_ids.contains(&expected));
        }
        for unexpected in [1003, 1004] {
            assert!(!logged_ids.contains(&unexpected));
        }

        // Every logged request must have its response attached.
        assert!(results.iter().all(|result| result.response.is_some()));
    }

    #[sqlx::test]
    async fn test_path_filtering_blocked_prefix(pool: PgPool) {
        let handler = migrated_handler::<Value, Value>(&pool)
            .await
            .with_path_filter(PathFilter {
                allowed_prefixes: vec!["/api/".to_string()],
                blocked_prefixes: vec!["/api/health".to_string(), "/api/metrics".to_string()],
            });
        let repository = handler.repository();

        // (uri, correlation id, whether the request is expected to be logged)
        let test_cases = [
            ("http://localhost/api/users", 2001, true),
            ("http://localhost/api/health", 2002, false),
            ("http://localhost/api/metrics", 2003, false),
            ("http://localhost/api/orders", 2004, true),
            ("http://localhost/health", 2005, false),
        ];

        for (uri, correlation_id, _should_log) in test_cases {
            record_ok_exchange(&handler, uri, correlation_id).await;
        }

        let results = repository.query(RequestFilter::default()).await.unwrap();

        assert_eq!(results.len(), 2);
        let logged_ids: Vec<i64> = results.iter().map(|r| r.request.correlation_id).collect();
        for expected in [2001, 2004] {
            assert!(logged_ids.contains(&expected));
        }
        for unexpected in [2002, 2003, 2005] {
            assert!(!logged_ids.contains(&unexpected));
        }
    }

    #[sqlx::test]
    async fn test_no_path_filtering_logs_everything(pool: PgPool) {
        let handler = migrated_handler::<Value, Value>(&pool).await;
        let repository = handler.repository();

        let test_uris = ["/api/users", "/health", "/metrics", "/random/path"];
        for (correlation_id, uri) in (3000u64..).zip(test_uris) {
            let request_data = json_request(http::Method::GET, uri, correlation_id);
            handler.handle_request(request_data).await;
        }

        let results = repository.query(RequestFilter::default()).await.unwrap();
        assert_eq!(results.len(), 4);
    }
}