/// Error produced when a request or response body cannot be turned into its
/// typed representation.
///
/// Besides the underlying cause it carries a textual rendition of the payload
/// that callers can keep in place of the typed value.
#[derive(Debug)]
pub struct SerializationError {
    /// Text to fall back to when the typed conversion fails.
    pub fallback_data: String,
    /// The underlying failure, boxed so any error type can be carried.
    pub error: Box<dyn std::error::Error + Send + Sync>,
}

impl SerializationError {
    /// Wraps `error` together with the `fallback_data` to keep for the payload.
    pub fn new(
        fallback_data: String,
        error: impl std::error::Error + Send + Sync + 'static,
    ) -> Self {
        let error: Box<dyn std::error::Error + Send + Sync> = Box::new(error);
        Self {
            fallback_data,
            error,
        }
    }
}

impl std::fmt::Display for SerializationError {
    /// Renders as `Serialization failed: <cause>`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_fmt(format_args!("Serialization failed: {}", self.error))
    }
}

impl std::error::Error for SerializationError {
    /// Exposes the wrapped error as the cause of this one.
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        let cause: &(dyn std::error::Error + 'static) = &*self.error;
        Some(cause)
    }
}
83
84use chrono::{DateTime, Utc};
85use http::Uri;
86use outlet::{RequestData, RequestHandler, ResponseData};
87use serde::{Deserialize, Serialize};
88use serde_json::Value;
89use sqlx::PgPool;
90use std::collections::HashMap;
91use std::sync::Arc;
92use tracing::{debug, error, instrument};
93use uuid::Uuid;
94
95pub mod error;
96pub mod repository;
97pub use error::PostgresHandlerError;
98pub use repository::{
99 HttpRequest, HttpResponse, RequestFilter, RequestRepository, RequestResponsePair,
100};
101
102pub fn migrator() -> sqlx::migrate::Migrator {
126 sqlx::migrate!("./migrations")
127}
128
129type RequestSerializer<T> =
134 Arc<dyn Fn(&outlet::RequestData) -> Result<T, SerializationError> + Send + Sync>;
135
136type ResponseSerializer<T> = Arc<
142 dyn Fn(&outlet::RequestData, &outlet::ResponseData) -> Result<T, SerializationError>
143 + Send
144 + Sync,
145>;
146
147#[derive(Clone)]
156pub struct PostgresHandler<TReq = Value, TRes = Value>
157where
158 TReq: for<'de> Deserialize<'de> + Serialize + Send + Sync + 'static,
159 TRes: for<'de> Deserialize<'de> + Serialize + Send + Sync + 'static,
160{
161 pool: PgPool,
162 request_serializer: RequestSerializer<TReq>,
163 response_serializer: ResponseSerializer<TRes>,
164 path_filter: Option<PathFilter>,
165 instance_id: Uuid,
166}
167
/// Path-prefix rules deciding which requests are logged.
///
/// Blocked prefixes are checked first and always win. If no blocked prefix
/// matches, a path is logged when `allowed_prefixes` is empty or when it
/// starts with at least one allowed prefix. The default value (both lists
/// empty) therefore logs every path.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct PathFilter {
    /// Prefixes a path must start with to be logged; empty means "allow all".
    pub allowed_prefixes: Vec<String>,
    /// Prefixes that are never logged, even if they match an allowed prefix.
    pub blocked_prefixes: Vec<String>,
}
178
179impl<TReq, TRes> PostgresHandler<TReq, TRes>
180where
181 TReq: for<'de> Deserialize<'de> + Serialize + Send + Sync + 'static,
182 TRes: for<'de> Deserialize<'de> + Serialize + Send + Sync + 'static,
183{
184 fn default_request_serializer() -> RequestSerializer<TReq> {
187 Arc::new(|request_data| {
188 let bytes = request_data.body.as_deref().unwrap_or(&[]);
189 serde_json::from_slice::<TReq>(bytes).map_err(|error| {
190 let fallback_data = String::from_utf8_lossy(bytes).to_string();
191 SerializationError::new(fallback_data, error)
192 })
193 })
194 }
195
196 fn default_response_serializer() -> ResponseSerializer<TRes> {
199 Arc::new(|_request_data, response_data| {
200 let bytes = response_data.body.as_deref().unwrap_or(&[]);
201 serde_json::from_slice::<TRes>(bytes).map_err(|error| {
202 let fallback_data = String::from_utf8_lossy(bytes).to_string();
203 SerializationError::new(fallback_data, error)
204 })
205 })
206 }
207
208 pub async fn new(database_url: &str) -> Result<Self, PostgresHandlerError> {
242 let pool = PgPool::connect(database_url)
243 .await
244 .map_err(PostgresHandlerError::Connection)?;
245
246 Ok(Self {
247 pool,
248 request_serializer: Self::default_request_serializer(),
249 response_serializer: Self::default_response_serializer(),
250 path_filter: None,
251 instance_id: Uuid::new_v4(),
252 })
253 }
254
255 pub fn with_request_serializer<F>(mut self, serializer: F) -> Self
267 where
268 F: Fn(&outlet::RequestData) -> Result<TReq, SerializationError> + Send + Sync + 'static,
269 {
270 self.request_serializer = Arc::new(serializer);
271 self
272 }
273
274 pub fn with_response_serializer<F>(mut self, serializer: F) -> Self
286 where
287 F: Fn(&outlet::RequestData, &outlet::ResponseData) -> Result<TRes, SerializationError>
288 + Send
289 + Sync
290 + 'static,
291 {
292 self.response_serializer = Arc::new(serializer);
293 self
294 }
295
296 pub fn with_path_filter(mut self, filter: PathFilter) -> Self {
315 self.path_filter = Some(filter);
316 self
317 }
318
319 pub fn with_path_prefix(mut self, prefix: &str) -> Self {
335 self.path_filter = Some(PathFilter {
336 allowed_prefixes: vec![prefix.to_string()],
337 blocked_prefixes: vec![],
338 });
339 self
340 }
341
342 pub async fn from_pool(pool: PgPool) -> Result<Self, PostgresHandlerError> {
377 Ok(Self {
378 pool,
379 request_serializer: Self::default_request_serializer(),
380 response_serializer: Self::default_response_serializer(),
381 path_filter: None,
382 instance_id: Uuid::new_v4(),
383 })
384 }
385
386 fn headers_to_json(headers: &HashMap<String, Vec<bytes::Bytes>>) -> Value {
388 let mut header_map = HashMap::new();
389 for (name, values) in headers {
390 if values.len() == 1 {
391 let value_str = String::from_utf8_lossy(&values[0]).to_string();
392 header_map.insert(name.clone(), Value::String(value_str));
393 } else {
394 let value_array: Vec<Value> = values
395 .iter()
396 .map(|v| Value::String(String::from_utf8_lossy(v).to_string()))
397 .collect();
398 header_map.insert(name.clone(), Value::Array(value_array));
399 }
400 }
401 serde_json::to_value(header_map).unwrap_or(Value::Null)
402 }
403
404 fn request_body_to_json_with_fallback(
406 &self,
407 request_data: &outlet::RequestData,
408 ) -> (Value, bool) {
409 match (self.request_serializer)(request_data) {
410 Ok(typed_value) => {
411 if let Ok(json_value) = serde_json::to_value(&typed_value) {
412 (json_value, true)
413 } else {
414 (
416 Value::String(
417 serde_json::to_string(&typed_value)
418 .expect("Serialized value must be convertible to JSON string"),
419 ),
420 false,
421 )
422 }
423 }
424 Err(serialization_error) => (Value::String(serialization_error.fallback_data), false),
425 }
426 }
427
428 fn response_body_to_json_with_fallback(
430 &self,
431 request_data: &outlet::RequestData,
432 response_data: &outlet::ResponseData,
433 ) -> (Value, bool) {
434 match (self.response_serializer)(request_data, response_data) {
435 Ok(typed_value) => {
436 if let Ok(json_value) = serde_json::to_value(&typed_value) {
437 (json_value, true)
438 } else {
439 (
441 Value::String(
442 serde_json::to_string(&typed_value)
443 .expect("Serialized value must be convertible to JSON string"),
444 ),
445 false,
446 )
447 }
448 }
449 Err(serialization_error) => (Value::String(serialization_error.fallback_data), false),
450 }
451 }
452
453 fn should_log_request(&self, uri: &Uri) -> bool {
455 let path = uri.path();
456 debug!(%path, "Evaluating prefix");
457 let Some(filter) = &self.path_filter else {
458 return true; };
460
461 for blocked_prefix in &filter.blocked_prefixes {
463 if path.starts_with(blocked_prefix) {
464 return false;
465 }
466 }
467
468 if filter.allowed_prefixes.is_empty() {
470 return true;
471 }
472
473 filter
475 .allowed_prefixes
476 .iter()
477 .any(|prefix| path.starts_with(prefix))
478 }
479
480 pub fn repository(&self) -> crate::repository::RequestRepository<TReq, TRes> {
485 crate::repository::RequestRepository::new(self.pool.clone())
486 }
487}
488
489impl<TReq, TRes> RequestHandler for PostgresHandler<TReq, TRes>
490where
491 TReq: for<'de> Deserialize<'de> + Serialize + Send + Sync + 'static,
492 TRes: for<'de> Deserialize<'de> + Serialize + Send + Sync + 'static,
493{
494 #[instrument(skip(self, data), fields(correlation_id = %data.correlation_id))]
495 async fn handle_request(&self, data: RequestData) {
496 if !self.should_log_request(&data.uri) {
498 debug!(correlation_id = %data.correlation_id, uri = %data.uri, "Skipping request due to path filter");
499 return;
500 }
501
502 let headers_json = Self::headers_to_json(&data.headers);
503 let (body_json, parsed) = if data.body.is_some() {
504 let (json, parsed) = self.request_body_to_json_with_fallback(&data);
505 (Some(json), parsed)
506 } else {
507 (None, false)
508 };
509
510 let timestamp: DateTime<Utc> = data.timestamp.into();
511
512 let result = sqlx::query(
513 r#"
514 INSERT INTO http_requests (instance_id, correlation_id, timestamp, method, uri, headers, body, body_parsed)
515 VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
516 "#,
517 )
518 .bind(self.instance_id)
519 .bind(data.correlation_id as i64)
520 .bind(timestamp)
521 .bind(data.method.to_string())
522 .bind(data.uri.to_string())
523 .bind(headers_json)
524 .bind(body_json)
525 .bind(parsed)
526 .execute(&self.pool)
527 .await;
528
529 if let Err(e) = result {
530 error!(correlation_id = %data.correlation_id, error = %e, "Failed to insert request data");
531 } else {
532 debug!(correlation_id = %data.correlation_id, "Request data inserted successfully");
533 }
534 }
535
536 #[instrument(skip(self, request_data, response_data), fields(correlation_id = %request_data.correlation_id))]
537 async fn handle_response(&self, request_data: RequestData, response_data: ResponseData) {
538 if !self.should_log_request(&request_data.uri) {
539 debug!(correlation_id = %request_data.correlation_id, uri = %request_data.uri, "Skipping response due to path filter");
540 return;
541 }
542 let headers_json = Self::headers_to_json(&response_data.headers);
543 let (body_json, parsed) = if response_data.body.is_some() {
544 let (json, parsed) =
545 self.response_body_to_json_with_fallback(&request_data, &response_data);
546 (Some(json), parsed)
547 } else {
548 (None, false)
549 };
550
551 let timestamp: DateTime<Utc> = response_data.timestamp.into();
552 let duration_ms = response_data.duration.as_millis() as i64;
553 let duration_to_first_byte_ms = response_data.duration_to_first_byte.as_millis() as i64;
554
555 let result = sqlx::query(
556 r#"
557 INSERT INTO http_responses (instance_id, correlation_id, timestamp, status_code, headers, body, body_parsed, duration_to_first_byte_ms, duration_ms)
558 SELECT $1, $2, $3, $4, $5, $6, $7, $8, $9
559 WHERE EXISTS (SELECT 1 FROM http_requests WHERE instance_id = $1 AND correlation_id = $2)
560 "#,
561 )
562 .bind(self.instance_id)
563 .bind(request_data.correlation_id as i64)
564 .bind(timestamp)
565 .bind(response_data.status.as_u16() as i32)
566 .bind(headers_json)
567 .bind(body_json)
568 .bind(parsed)
569 .bind(duration_to_first_byte_ms)
570 .bind(duration_ms)
571 .execute(&self.pool)
572 .await;
573
574 match result {
575 Err(e) => {
576 error!(correlation_id = %request_data.correlation_id, error = %e, "Failed to insert response data");
577 }
578 Ok(query_result) => {
579 if query_result.rows_affected() > 0 {
580 debug!(correlation_id = %request_data.correlation_id, "Response data inserted successfully");
581 } else {
582 debug!(correlation_id = %request_data.correlation_id, "No matching request found for response, skipping insert")
583 }
584 }
585 }
586 }
587}
588
589#[cfg(test)]
590mod tests {
591 use super::*;
592 use bytes::Bytes;
593 use chrono::{DateTime, Utc};
594 use outlet::{RequestData, ResponseData};
595 use serde::{Deserialize, Serialize};
596 use serde_json::Value;
597 use sqlx::PgPool;
598 use std::collections::HashMap;
599 use std::time::{Duration, SystemTime};
600
601 #[derive(Debug, Clone, PartialEq, Deserialize, Serialize)]
602 struct TestRequest {
603 user_id: u64,
604 action: String,
605 }
606
607 #[derive(Debug, Clone, PartialEq, Deserialize, Serialize)]
608 struct TestResponse {
609 success: bool,
610 message: String,
611 }
612
613 fn create_test_request_data() -> RequestData {
614 let mut headers = HashMap::new();
615 headers.insert("content-type".to_string(), vec!["application/json".into()]);
616 headers.insert("user-agent".to_string(), vec!["test-client/1.0".into()]);
617
618 let test_req = TestRequest {
619 user_id: 123,
620 action: "create_user".to_string(),
621 };
622 let body = serde_json::to_vec(&test_req).unwrap();
623
624 RequestData {
625 method: http::Method::POST,
626 uri: http::Uri::from_static("/api/users"),
627 headers,
628 body: Some(Bytes::from(body)),
629 timestamp: SystemTime::now(),
630 correlation_id: 0,
631 }
632 }
633
634 fn create_test_response_data() -> ResponseData {
635 let mut headers = HashMap::new();
636 headers.insert("content-type".to_string(), vec!["application/json".into()]);
637
638 let test_res = TestResponse {
639 success: true,
640 message: "User created successfully".to_string(),
641 };
642 let body = serde_json::to_vec(&test_res).unwrap();
643
644 ResponseData {
645 status: http::StatusCode::CREATED,
646 headers,
647 body: Some(Bytes::from(body)),
648 timestamp: SystemTime::now(),
649 duration_to_first_byte: Duration::from_millis(100),
650 duration: Duration::from_millis(150),
651 correlation_id: 0,
652 }
653 }
654
655 #[sqlx::test]
656 async fn test_handler_creation(pool: PgPool) {
657 crate::migrator().run(&pool).await.unwrap();
659
660 let handler = PostgresHandler::<TestRequest, TestResponse>::from_pool(pool.clone())
661 .await
662 .unwrap();
663
664 let repository = handler.repository();
666
667 let filter = RequestFilter::default();
669 let results = repository.query(filter).await.unwrap();
670 assert!(results.is_empty());
671 }
672
673 #[sqlx::test]
674 async fn test_handle_request_with_typed_body(pool: PgPool) {
675 crate::migrator().run(&pool).await.unwrap();
677
678 let handler = PostgresHandler::<TestRequest, TestResponse>::from_pool(pool.clone())
679 .await
680 .unwrap();
681 let repository = handler.repository();
682
683 let mut request_data = create_test_request_data();
684 let correlation_id = 12345;
685 request_data.correlation_id = correlation_id;
686
687 handler.handle_request(request_data.clone()).await;
689
690 let filter = RequestFilter {
692 correlation_id: Some(correlation_id as i64),
693 ..Default::default()
694 };
695 let results = repository.query(filter).await.unwrap();
696
697 assert_eq!(results.len(), 1);
698 let pair = &results[0];
699
700 assert_eq!(pair.request.correlation_id, correlation_id as i64);
701 assert_eq!(pair.request.method, "POST");
702 assert_eq!(pair.request.uri, "/api/users");
703
704 match &pair.request.body {
706 Some(Ok(parsed_body)) => {
707 assert_eq!(
708 *parsed_body,
709 TestRequest {
710 user_id: 123,
711 action: "create_user".to_string(),
712 }
713 );
714 }
715 _ => panic!("Expected successfully parsed request body"),
716 }
717
718 let headers_value = &pair.request.headers;
720 assert!(headers_value.get("content-type").is_some());
721 assert!(headers_value.get("user-agent").is_some());
722
723 assert!(pair.response.is_none());
725 }
726
727 #[sqlx::test]
728 async fn test_handle_response_with_typed_body(pool: PgPool) {
729 crate::migrator().run(&pool).await.unwrap();
731
732 let handler = PostgresHandler::<TestRequest, TestResponse>::from_pool(pool.clone())
733 .await
734 .unwrap();
735 let repository = handler.repository();
736
737 let mut request_data = create_test_request_data();
738 let mut response_data = create_test_response_data();
739 let correlation_id = 54321;
740 request_data.correlation_id = correlation_id;
741 response_data.correlation_id = correlation_id;
742
743 handler.handle_request(request_data.clone()).await;
745 handler
746 .handle_response(request_data, response_data.clone())
747 .await;
748
749 let filter = RequestFilter {
751 correlation_id: Some(correlation_id as i64),
752 ..Default::default()
753 };
754 let results = repository.query(filter).await.unwrap();
755
756 assert_eq!(results.len(), 1);
757 let pair = &results[0];
758
759 let response = pair.response.as_ref().expect("Response should be present");
761 assert_eq!(response.correlation_id, correlation_id as i64);
762 assert_eq!(response.status_code, 201);
763 assert_eq!(response.duration_ms, 150);
764
765 match &response.body {
767 Some(Ok(parsed_body)) => {
768 assert_eq!(
769 *parsed_body,
770 TestResponse {
771 success: true,
772 message: "User created successfully".to_string(),
773 }
774 );
775 }
776 _ => panic!("Expected successfully parsed response body"),
777 }
778 }
779
780 #[sqlx::test]
781 async fn test_handle_unparseable_body_fallback(pool: PgPool) {
782 crate::migrator().run(&pool).await.unwrap();
784
785 let handler = PostgresHandler::<TestRequest, TestResponse>::from_pool(pool.clone())
786 .await
787 .unwrap();
788 let repository = handler.repository();
789
790 let mut headers = HashMap::new();
792 headers.insert("content-type".to_string(), vec!["text/plain".into()]);
793
794 let invalid_json_body = b"not valid json for TestRequest";
795 let correlation_id = 99999;
796 let request_data = RequestData {
797 method: http::Method::POST,
798 uri: http::Uri::from_static("/api/test"),
799 headers,
800 body: Some(Bytes::from(invalid_json_body.to_vec())),
801 timestamp: SystemTime::now(),
802 correlation_id,
803 };
804
805 handler.handle_request(request_data).await;
806
807 let filter = RequestFilter {
809 correlation_id: Some(correlation_id as i64),
810 ..Default::default()
811 };
812 let results = repository.query(filter).await.unwrap();
813
814 assert_eq!(results.len(), 1);
815 let pair = &results[0];
816
817 match &pair.request.body {
819 Some(Err(raw_bytes)) => {
820 assert_eq!(raw_bytes.as_ref(), invalid_json_body);
821 }
822 _ => panic!("Expected raw bytes fallback for unparseable body"),
823 }
824 }
825
826 #[sqlx::test]
827 async fn test_query_with_multiple_filters(pool: PgPool) {
828 crate::migrator().run(&pool).await.unwrap();
830
831 let handler = PostgresHandler::<Value, Value>::from_pool(pool.clone())
832 .await
833 .unwrap();
834 let repository = handler.repository();
835
836 let test_cases = vec![
838 (1001, "GET", "/api/users", 200, 100),
839 (1002, "POST", "/api/users", 201, 150),
840 (1003, "GET", "/api/orders", 404, 50),
841 (1004, "PUT", "/api/users/123", 200, 300),
842 ];
843
844 for (correlation_id, method, uri, status, duration_ms) in test_cases {
845 let mut headers = HashMap::new();
846 headers.insert("content-type".to_string(), vec!["application/json".into()]);
847
848 let request_data = RequestData {
849 method: method.parse().unwrap(),
850 uri: uri.parse().unwrap(),
851 headers: headers.clone(),
852 body: Some(Bytes::from(b"{}".to_vec())),
853 timestamp: SystemTime::now(),
854 correlation_id,
855 };
856
857 let response_data = ResponseData {
858 correlation_id,
859 status: http::StatusCode::from_u16(status).unwrap(),
860 headers,
861 body: Some(Bytes::from(b"{}".to_vec())),
862 timestamp: SystemTime::now(),
863 duration_to_first_byte: Duration::from_millis(duration_ms / 2),
864 duration: Duration::from_millis(duration_ms),
865 };
866
867 handler.handle_request(request_data.clone()).await;
868 handler.handle_response(request_data, response_data).await;
869 }
870
871 let filter = RequestFilter {
873 method: Some("GET".to_string()),
874 ..Default::default()
875 };
876 let results = repository.query(filter).await.unwrap();
877 assert_eq!(results.len(), 2); let filter = RequestFilter {
881 status_code: Some(200),
882 ..Default::default()
883 };
884 let results = repository.query(filter).await.unwrap();
885 assert_eq!(results.len(), 2); let filter = RequestFilter {
889 uri_pattern: Some("/api/users%".to_string()),
890 ..Default::default()
891 };
892 let results = repository.query(filter).await.unwrap();
893 assert_eq!(results.len(), 3); let filter = RequestFilter {
897 min_duration_ms: Some(100),
898 max_duration_ms: Some(200),
899 ..Default::default()
900 };
901 let results = repository.query(filter).await.unwrap();
902 assert_eq!(results.len(), 2); let filter = RequestFilter {
906 method: Some("GET".to_string()),
907 status_code: Some(200),
908 ..Default::default()
909 };
910 let results = repository.query(filter).await.unwrap();
911 assert_eq!(results.len(), 1); assert_eq!(results[0].request.correlation_id, 1001);
913 }
914
915 #[sqlx::test]
916 async fn test_query_with_pagination_and_ordering(pool: PgPool) {
917 crate::migrator().run(&pool).await.unwrap();
919
920 let handler = PostgresHandler::<Value, Value>::from_pool(pool.clone())
921 .await
922 .unwrap();
923 let repository = handler.repository();
924
925 let now = SystemTime::now();
927 for i in 0..5 {
928 let correlation_id = 2000 + i;
929 let timestamp = now + Duration::from_secs(i * 10); let mut headers = HashMap::new();
932 headers.insert("x-test-id".to_string(), vec![i.to_string().into()]);
933
934 let request_data = RequestData {
935 method: http::Method::GET,
936 uri: "/api/test".parse().unwrap(),
937 headers,
938 body: Some(Bytes::from(format!("{{\"id\": {i}}}").into_bytes())),
939 timestamp,
940 correlation_id,
941 };
942
943 handler.handle_request(request_data).await;
944 }
945
946 let filter = RequestFilter {
948 limit: Some(3),
949 ..Default::default()
950 };
951 let results = repository.query(filter).await.unwrap();
952 assert_eq!(results.len(), 3);
953
954 for i in 0..2 {
956 assert!(results[i].request.timestamp <= results[i + 1].request.timestamp);
957 }
958
959 let filter = RequestFilter {
961 order_by_timestamp_desc: true,
962 limit: Some(2),
963 offset: Some(1),
964 ..Default::default()
965 };
966 let results = repository.query(filter).await.unwrap();
967 assert_eq!(results.len(), 2);
968
969 assert!(results[0].request.timestamp >= results[1].request.timestamp);
971 }
972
973 #[sqlx::test]
974 async fn test_headers_conversion(pool: PgPool) {
975 crate::migrator().run(&pool).await.unwrap();
977
978 let handler = PostgresHandler::<Value, Value>::from_pool(pool.clone())
979 .await
980 .unwrap();
981 let repository = handler.repository();
982
983 let mut headers = HashMap::new();
985 headers.insert("single-value".to_string(), vec!["test".into()]);
986 headers.insert(
987 "multi-value".to_string(),
988 vec!["val1".into(), "val2".into()],
989 );
990 headers.insert("empty-value".to_string(), vec!["".into()]);
991
992 let request_data = RequestData {
993 correlation_id: 3000,
994 method: http::Method::GET,
995 uri: "/test".parse().unwrap(),
996 headers,
997 body: None,
998 timestamp: SystemTime::now(),
999 };
1000
1001 let correlation_id = 3000;
1002 handler.handle_request(request_data).await;
1003
1004 let filter = RequestFilter {
1005 correlation_id: Some(correlation_id as i64),
1006 ..Default::default()
1007 };
1008 let results = repository.query(filter).await.unwrap();
1009
1010 assert_eq!(results.len(), 1);
1011 let headers_json = &results[0].request.headers;
1012
1013 assert_eq!(
1015 headers_json["single-value"],
1016 Value::String("test".to_string())
1017 );
1018
1019 match &headers_json["multi-value"] {
1021 Value::Array(arr) => {
1022 assert_eq!(arr.len(), 2);
1023 assert_eq!(arr[0], Value::String("val1".to_string()));
1024 assert_eq!(arr[1], Value::String("val2".to_string()));
1025 }
1026 _ => panic!("Expected array for multi-value header"),
1027 }
1028
1029 assert_eq!(headers_json["empty-value"], Value::String("".to_string()));
1031 }
1032
1033 #[sqlx::test]
1034 async fn test_timestamp_filtering(pool: PgPool) {
1035 crate::migrator().run(&pool).await.unwrap();
1037
1038 let handler = PostgresHandler::<Value, Value>::from_pool(pool.clone())
1039 .await
1040 .unwrap();
1041 let repository = handler.repository();
1042
1043 let base_time = SystemTime::UNIX_EPOCH + Duration::from_secs(1_600_000_000); let times = [
1047 base_time + Duration::from_secs(0), base_time + Duration::from_secs(3600), base_time + Duration::from_secs(7200), ];
1051
1052 for (i, timestamp) in times.iter().enumerate() {
1053 let correlation_id = 4001 + i as u64;
1054 let request_data = RequestData {
1055 method: http::Method::GET,
1056 uri: "/test".parse().unwrap(),
1057 headers: HashMap::new(),
1058 body: None,
1059 timestamp: *timestamp,
1060 correlation_id,
1061 };
1062
1063 handler.handle_request(request_data).await;
1064 }
1065
1066 let after_time: DateTime<Utc> = (base_time + Duration::from_secs(1800)).into(); let filter = RequestFilter {
1069 timestamp_after: Some(after_time),
1070 ..Default::default()
1071 };
1072 let results = repository.query(filter).await.unwrap();
1073 assert_eq!(results.len(), 2); let before_time: DateTime<Utc> = (base_time + Duration::from_secs(5400)).into(); let filter = RequestFilter {
1078 timestamp_before: Some(before_time),
1079 ..Default::default()
1080 };
1081 let results = repository.query(filter).await.unwrap();
1082 assert_eq!(results.len(), 2); let filter = RequestFilter {
1086 timestamp_after: Some(after_time),
1087 timestamp_before: Some(before_time),
1088 ..Default::default()
1089 };
1090 let results = repository.query(filter).await.unwrap();
1091 assert_eq!(results.len(), 1); assert_eq!(results[0].request.correlation_id, 4002);
1093 }
1094
1095 #[sqlx::test]
1096 async fn test_path_filtering_allowed_prefix(pool: PgPool) {
1097 crate::migrator().run(&pool).await.unwrap();
1099
1100 let handler = PostgresHandler::<Value, Value>::from_pool(pool.clone())
1101 .await
1102 .unwrap()
1103 .with_path_prefix("/api/");
1104 let repository = handler.repository();
1105
1106 let test_cases = vec![
1108 ("/api/users", 1001, true), ("/api/orders", 1002, true), ("/health", 1003, false), ("/metrics", 1004, false), ("/api/health", 1005, true), ];
1114
1115 for (uri, correlation_id, _should_log) in &test_cases {
1116 let mut headers = HashMap::new();
1117 headers.insert("content-type".to_string(), vec!["application/json".into()]);
1118
1119 let request_data = RequestData {
1120 method: http::Method::GET,
1121 uri: uri.parse().unwrap(),
1122 headers: headers.clone(),
1123 body: Some(Bytes::from(b"{}".to_vec())),
1124 timestamp: SystemTime::now(),
1125 correlation_id: *correlation_id,
1126 };
1127
1128 let response_data = ResponseData {
1129 correlation_id: *correlation_id,
1130 status: http::StatusCode::OK,
1131 headers,
1132 body: Some(Bytes::from(b"{}".to_vec())),
1133 timestamp: SystemTime::now(),
1134 duration_to_first_byte: Duration::from_millis(80),
1135 duration: Duration::from_millis(100),
1136 };
1137
1138 handler.handle_request(request_data.clone()).await;
1139 handler.handle_response(request_data, response_data).await;
1140 }
1141
1142 let filter = RequestFilter::default();
1144 let results = repository.query(filter).await.unwrap();
1145
1146 assert_eq!(results.len(), 3);
1148 let logged_ids: Vec<i64> = results.iter().map(|r| r.request.correlation_id).collect();
1149 assert!(logged_ids.contains(&1001));
1150 assert!(logged_ids.contains(&1002));
1151 assert!(logged_ids.contains(&1005));
1152 assert!(!logged_ids.contains(&1003));
1153 assert!(!logged_ids.contains(&1004));
1154
1155 for result in &results {
1157 assert!(result.response.is_some());
1158 }
1159 }
1160
1161 #[sqlx::test]
1162 async fn test_path_filtering_blocked_prefix(pool: PgPool) {
1163 crate::migrator().run(&pool).await.unwrap();
1165
1166 let handler = PostgresHandler::<Value, Value>::from_pool(pool.clone())
1167 .await
1168 .unwrap()
1169 .with_path_filter(PathFilter {
1170 allowed_prefixes: vec!["/api/".to_string()],
1171 blocked_prefixes: vec!["/api/health".to_string(), "/api/metrics".to_string()],
1172 });
1173 let repository = handler.repository();
1174
1175 let test_cases = vec![
1177 ("http://localhost/api/users", 2001, true), ("http://localhost/api/health", 2002, false), ("http://localhost/api/metrics", 2003, false), ("http://localhost/api/orders", 2004, true), ("http://localhost/health", 2005, false), ];
1183
1184 for (uri, correlation_id, _should_log) in &test_cases {
1185 let mut headers = HashMap::new();
1186 headers.insert("content-type".to_string(), vec!["application/json".into()]);
1187
1188 let request_data = RequestData {
1189 method: http::Method::GET,
1190 uri: uri.parse().unwrap(),
1191 headers: headers.clone(),
1192 body: Some(Bytes::from(b"{}".to_vec())),
1193 timestamp: SystemTime::now(),
1194 correlation_id: *correlation_id,
1195 };
1196
1197 let response_data = ResponseData {
1198 correlation_id: *correlation_id,
1199 status: http::StatusCode::OK,
1200 headers,
1201 body: Some(Bytes::from(b"{}".to_vec())),
1202 timestamp: SystemTime::now(),
1203 duration_to_first_byte: Duration::from_millis(80),
1204 duration: Duration::from_millis(100),
1205 };
1206
1207 handler.handle_request(request_data.clone()).await;
1208 handler.handle_response(request_data, response_data).await;
1209 }
1210
1211 let filter = RequestFilter::default();
1213 let results = repository.query(filter).await.unwrap();
1214
1215 assert_eq!(results.len(), 2);
1216 let logged_ids: Vec<i64> = results.iter().map(|r| r.request.correlation_id).collect();
1217 assert!(logged_ids.contains(&2001));
1218 assert!(logged_ids.contains(&2004));
1219 assert!(!logged_ids.contains(&2002)); assert!(!logged_ids.contains(&2003)); assert!(!logged_ids.contains(&2005)); }
1223
1224 #[sqlx::test]
1225 async fn test_no_path_filtering_logs_everything(pool: PgPool) {
1226 crate::migrator().run(&pool).await.unwrap();
1228
1229 let handler = PostgresHandler::<Value, Value>::from_pool(pool.clone())
1231 .await
1232 .unwrap();
1233 let repository = handler.repository();
1234
1235 let test_uris = vec!["/api/users", "/health", "/metrics", "/random/path"];
1236 for (i, uri) in test_uris.iter().enumerate() {
1237 let correlation_id = 3000 + i as u64;
1238 let mut headers = HashMap::new();
1239 headers.insert("content-type".to_string(), vec!["application/json".into()]);
1240
1241 let request_data = RequestData {
1242 method: http::Method::GET,
1243 uri: uri.parse().unwrap(),
1244 headers,
1245 body: Some(Bytes::from(b"{}".to_vec())),
1246 timestamp: SystemTime::now(),
1247 correlation_id,
1248 };
1249
1250 handler.handle_request(request_data).await;
1251 }
1252
1253 let filter = RequestFilter::default();
1255 let results = repository.query(filter).await.unwrap();
1256 assert_eq!(results.len(), 4);
1257 }
1258}