/// Error returned by a serializer that could not turn a body into its typed
/// form.
///
/// Besides the underlying cause it carries a textual stand-in for the body so
/// that a caller can still keep something when typing fails.
#[derive(Debug)]
pub struct SerializationError {
    /// Text to use in place of the typed value.
    pub fallback_data: String,
    /// The failure that prevented the conversion.
    pub error: Box<dyn std::error::Error + Send + Sync>,
}

impl SerializationError {
    /// Creates an error from the fallback text and the causing error, boxing
    /// the cause.
    pub fn new(
        fallback_data: String,
        error: impl std::error::Error + Send + Sync + 'static,
    ) -> Self {
        let error: Box<dyn std::error::Error + Send + Sync> = Box::new(error);
        Self {
            fallback_data,
            error,
        }
    }
}

impl std::fmt::Display for SerializationError {
    /// Writes `Serialization failed: ` followed by the cause's own message.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_fmt(format_args!("Serialization failed: {}", self.error))
    }
}

impl std::error::Error for SerializationError {
    /// Exposes the boxed cause as the error source.
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        let cause: &(dyn std::error::Error + 'static) = &*self.error;
        Some(cause)
    }
}
83
84use chrono::{DateTime, Utc};
85use outlet::{RequestData, RequestHandler, ResponseData};
86use serde::{Deserialize, Serialize};
87use serde_json::Value;
88use sqlx::PgPool;
89use std::collections::HashMap;
90use std::sync::Arc;
91use tracing::{debug, error, instrument};
92use uuid::Uuid;
93
94pub mod error;
95pub mod repository;
96pub use error::PostgresHandlerError;
97pub use repository::{
98 HttpRequest, HttpResponse, RequestFilter, RequestRepository, RequestResponsePair,
99};
100
/// Returns the [`sqlx::migrate::Migrator`] that `sqlx::migrate!` builds from
/// the `./migrations` directory.
///
/// Callers apply it with `migrator().run(&pool).await`.
pub fn migrator() -> sqlx::migrate::Migrator {
    sqlx::migrate!("./migrations")
}
127
128type RequestSerializer<T> =
133 Arc<dyn Fn(&outlet::RequestData) -> Result<T, SerializationError> + Send + Sync>;
134
135type ResponseSerializer<T> = Arc<
141 dyn Fn(&outlet::RequestData, &outlet::ResponseData) -> Result<T, SerializationError>
142 + Send
143 + Sync,
144>;
145
146#[derive(Clone)]
155pub struct PostgresHandler<TReq = Value, TRes = Value>
156where
157 TReq: for<'de> Deserialize<'de> + Serialize + Send + Sync + 'static,
158 TRes: for<'de> Deserialize<'de> + Serialize + Send + Sync + 'static,
159{
160 pool: PgPool,
161 request_serializer: RequestSerializer<TReq>,
162 response_serializer: ResponseSerializer<TRes>,
163 instance_id: Uuid,
164}
165
166impl<TReq, TRes> PostgresHandler<TReq, TRes>
167where
168 TReq: for<'de> Deserialize<'de> + Serialize + Send + Sync + 'static,
169 TRes: for<'de> Deserialize<'de> + Serialize + Send + Sync + 'static,
170{
171 fn default_request_serializer() -> RequestSerializer<TReq> {
174 Arc::new(|request_data| {
175 let bytes = request_data.body.as_deref().unwrap_or(&[]);
176 serde_json::from_slice::<TReq>(bytes).map_err(|error| {
177 let fallback_data = String::from_utf8_lossy(bytes).to_string();
178 SerializationError::new(fallback_data, error)
179 })
180 })
181 }
182
183 fn default_response_serializer() -> ResponseSerializer<TRes> {
186 Arc::new(|_request_data, response_data| {
187 let bytes = response_data.body.as_deref().unwrap_or(&[]);
188 serde_json::from_slice::<TRes>(bytes).map_err(|error| {
189 let fallback_data = String::from_utf8_lossy(bytes).to_string();
190 SerializationError::new(fallback_data, error)
191 })
192 })
193 }
194
195 pub async fn new(database_url: &str) -> Result<Self, PostgresHandlerError> {
229 let pool = PgPool::connect(database_url)
230 .await
231 .map_err(PostgresHandlerError::Connection)?;
232
233 Ok(Self {
234 pool,
235 request_serializer: Self::default_request_serializer(),
236 response_serializer: Self::default_response_serializer(),
237 instance_id: Uuid::new_v4(),
238 })
239 }
240
241 pub fn with_request_serializer<F>(mut self, serializer: F) -> Self
253 where
254 F: Fn(&outlet::RequestData) -> Result<TReq, SerializationError> + Send + Sync + 'static,
255 {
256 self.request_serializer = Arc::new(serializer);
257 self
258 }
259
260 pub fn with_response_serializer<F>(mut self, serializer: F) -> Self
272 where
273 F: Fn(&outlet::RequestData, &outlet::ResponseData) -> Result<TRes, SerializationError>
274 + Send
275 + Sync
276 + 'static,
277 {
278 self.response_serializer = Arc::new(serializer);
279 self
280 }
281
282 pub async fn from_pool(pool: PgPool) -> Result<Self, PostgresHandlerError> {
317 Ok(Self {
318 pool,
319 request_serializer: Self::default_request_serializer(),
320 response_serializer: Self::default_response_serializer(),
321 instance_id: Uuid::new_v4(),
322 })
323 }
324
325 fn headers_to_json(headers: &HashMap<String, Vec<bytes::Bytes>>) -> Value {
327 let mut header_map = HashMap::new();
328 for (name, values) in headers {
329 if values.len() == 1 {
330 let value_str = String::from_utf8_lossy(&values[0]).to_string();
331 header_map.insert(name.clone(), Value::String(value_str));
332 } else {
333 let value_array: Vec<Value> = values
334 .iter()
335 .map(|v| Value::String(String::from_utf8_lossy(v).to_string()))
336 .collect();
337 header_map.insert(name.clone(), Value::Array(value_array));
338 }
339 }
340 serde_json::to_value(header_map).unwrap_or(Value::Null)
341 }
342
343 fn request_body_to_json_with_fallback(
345 &self,
346 request_data: &outlet::RequestData,
347 ) -> (Value, bool) {
348 match (self.request_serializer)(request_data) {
349 Ok(typed_value) => {
350 if let Ok(json_value) = serde_json::to_value(&typed_value) {
351 (json_value, true)
352 } else {
353 (
355 Value::String(
356 serde_json::to_string(&typed_value)
357 .expect("Serialized value must be convertible to JSON string"),
358 ),
359 false,
360 )
361 }
362 }
363 Err(serialization_error) => (Value::String(serialization_error.fallback_data), false),
364 }
365 }
366
367 fn response_body_to_json_with_fallback(
369 &self,
370 request_data: &outlet::RequestData,
371 response_data: &outlet::ResponseData,
372 ) -> (Value, bool) {
373 match (self.response_serializer)(request_data, response_data) {
374 Ok(typed_value) => {
375 if let Ok(json_value) = serde_json::to_value(&typed_value) {
376 (json_value, true)
377 } else {
378 (
380 Value::String(
381 serde_json::to_string(&typed_value)
382 .expect("Serialized value must be convertible to JSON string"),
383 ),
384 false,
385 )
386 }
387 }
388 Err(serialization_error) => (Value::String(serialization_error.fallback_data), false),
389 }
390 }
391
392 pub fn repository(&self) -> crate::repository::RequestRepository<TReq, TRes> {
397 crate::repository::RequestRepository::new(self.pool.clone())
398 }
399}
400
401impl<TReq, TRes> RequestHandler for PostgresHandler<TReq, TRes>
402where
403 TReq: for<'de> Deserialize<'de> + Serialize + Send + Sync + 'static,
404 TRes: for<'de> Deserialize<'de> + Serialize + Send + Sync + 'static,
405{
406 #[instrument(skip(self, data), fields(correlation_id = %data.correlation_id))]
407 async fn handle_request(&self, data: RequestData) {
408 let headers_json = Self::headers_to_json(&data.headers);
409 let (body_json, parsed) = if data.body.is_some() {
410 let (json, parsed) = self.request_body_to_json_with_fallback(&data);
411 (Some(json), parsed)
412 } else {
413 (None, false)
414 };
415
416 let timestamp: DateTime<Utc> = data.timestamp.into();
417
418 let result = sqlx::query(
419 r#"
420 INSERT INTO http_requests (instance_id, correlation_id, timestamp, method, uri, headers, body, body_parsed)
421 VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
422 "#,
423 )
424 .bind(self.instance_id)
425 .bind(data.correlation_id as i64)
426 .bind(timestamp)
427 .bind(data.method.to_string())
428 .bind(data.uri.to_string())
429 .bind(headers_json)
430 .bind(body_json)
431 .bind(parsed)
432 .execute(&self.pool)
433 .await;
434
435 if let Err(e) = result {
436 error!(correlation_id = %data.correlation_id, error = %e, "Failed to insert request data");
437 } else {
438 debug!(correlation_id = %data.correlation_id, "Request data inserted successfully");
439 }
440 }
441
442 #[instrument(skip(self, request_data, response_data), fields(correlation_id = %request_data.correlation_id))]
443 async fn handle_response(&self, request_data: RequestData, response_data: ResponseData) {
444 let headers_json = Self::headers_to_json(&response_data.headers);
445 let (body_json, parsed) = if response_data.body.is_some() {
446 let (json, parsed) =
447 self.response_body_to_json_with_fallback(&request_data, &response_data);
448 (Some(json), parsed)
449 } else {
450 (None, false)
451 };
452
453 let timestamp: DateTime<Utc> = response_data.timestamp.into();
454 let duration_ms = response_data.duration.as_millis() as i64;
455 let duration_to_first_byte_ms = response_data.duration_to_first_byte.as_millis() as i64;
456
457 let result = sqlx::query(
458 r#"
459 INSERT INTO http_responses (instance_id, correlation_id, timestamp, status_code, headers, body, body_parsed, duration_to_first_byte_ms, duration_ms)
460 SELECT $1, $2, $3, $4, $5, $6, $7, $8, $9
461 WHERE EXISTS (SELECT 1 FROM http_requests WHERE instance_id = $1 AND correlation_id = $2)
462 "#,
463 )
464 .bind(self.instance_id)
465 .bind(request_data.correlation_id as i64)
466 .bind(timestamp)
467 .bind(response_data.status.as_u16() as i32)
468 .bind(headers_json)
469 .bind(body_json)
470 .bind(parsed)
471 .bind(duration_to_first_byte_ms)
472 .bind(duration_ms)
473 .execute(&self.pool)
474 .await;
475
476 match result {
477 Err(e) => {
478 error!(correlation_id = %request_data.correlation_id, error = %e, "Failed to insert response data");
479 }
480 Ok(query_result) => {
481 if query_result.rows_affected() > 0 {
482 debug!(correlation_id = %request_data.correlation_id, "Response data inserted successfully");
483 } else {
484 debug!(correlation_id = %request_data.correlation_id, "No matching request found for response, skipping insert")
485 }
486 }
487 }
488 }
489}
490
491#[cfg(test)]
492mod tests {
493 use super::*;
494 use bytes::Bytes;
495 use chrono::{DateTime, Utc};
496 use outlet::{RequestData, ResponseData};
497 use serde::{Deserialize, Serialize};
498 use serde_json::Value;
499 use sqlx::PgPool;
500 use std::collections::HashMap;
501 use std::time::{Duration, SystemTime};
502
    /// Typed request payload used by the tests that exercise typed body
    /// parsing.
    #[derive(Debug, Clone, PartialEq, Deserialize, Serialize)]
    struct TestRequest {
        user_id: u64,
        action: String,
    }
508
    /// Typed response payload used by the tests that exercise typed body
    /// parsing.
    #[derive(Debug, Clone, PartialEq, Deserialize, Serialize)]
    struct TestResponse {
        success: bool,
        message: String,
    }
514
515 fn create_test_request_data() -> RequestData {
516 let mut headers = HashMap::new();
517 headers.insert("content-type".to_string(), vec!["application/json".into()]);
518 headers.insert("user-agent".to_string(), vec!["test-client/1.0".into()]);
519
520 let test_req = TestRequest {
521 user_id: 123,
522 action: "create_user".to_string(),
523 };
524 let body = serde_json::to_vec(&test_req).unwrap();
525
526 RequestData {
527 method: http::Method::POST,
528 uri: http::Uri::from_static("/api/users"),
529 headers,
530 body: Some(Bytes::from(body)),
531 timestamp: SystemTime::now(),
532 correlation_id: 0,
533 }
534 }
535
536 fn create_test_response_data() -> ResponseData {
537 let mut headers = HashMap::new();
538 headers.insert("content-type".to_string(), vec!["application/json".into()]);
539
540 let test_res = TestResponse {
541 success: true,
542 message: "User created successfully".to_string(),
543 };
544 let body = serde_json::to_vec(&test_res).unwrap();
545
546 ResponseData {
547 status: http::StatusCode::CREATED,
548 headers,
549 body: Some(Bytes::from(body)),
550 timestamp: SystemTime::now(),
551 duration_to_first_byte: Duration::from_millis(100),
552 duration: Duration::from_millis(150),
553 correlation_id: 0,
554 }
555 }
556
557 #[sqlx::test]
558 async fn test_handler_creation(pool: PgPool) {
559 crate::migrator().run(&pool).await.unwrap();
561
562 let handler = PostgresHandler::<TestRequest, TestResponse>::from_pool(pool.clone())
563 .await
564 .unwrap();
565
566 let repository = handler.repository();
568
569 let filter = RequestFilter::default();
571 let results = repository.query(filter).await.unwrap();
572 assert!(results.is_empty());
573 }
574
575 #[sqlx::test]
576 async fn test_handle_request_with_typed_body(pool: PgPool) {
577 crate::migrator().run(&pool).await.unwrap();
579
580 let handler = PostgresHandler::<TestRequest, TestResponse>::from_pool(pool.clone())
581 .await
582 .unwrap();
583 let repository = handler.repository();
584
585 let mut request_data = create_test_request_data();
586 let correlation_id = 12345;
587 request_data.correlation_id = correlation_id;
588
589 handler.handle_request(request_data.clone()).await;
591
592 let filter = RequestFilter {
594 correlation_id: Some(correlation_id as i64),
595 ..Default::default()
596 };
597 let results = repository.query(filter).await.unwrap();
598
599 assert_eq!(results.len(), 1);
600 let pair = &results[0];
601
602 assert_eq!(pair.request.correlation_id, correlation_id as i64);
603 assert_eq!(pair.request.method, "POST");
604 assert_eq!(pair.request.uri, "/api/users");
605
606 match &pair.request.body {
608 Some(Ok(parsed_body)) => {
609 assert_eq!(
610 *parsed_body,
611 TestRequest {
612 user_id: 123,
613 action: "create_user".to_string(),
614 }
615 );
616 }
617 _ => panic!("Expected successfully parsed request body"),
618 }
619
620 let headers_value = &pair.request.headers;
622 assert!(headers_value.get("content-type").is_some());
623 assert!(headers_value.get("user-agent").is_some());
624
625 assert!(pair.response.is_none());
627 }
628
629 #[sqlx::test]
630 async fn test_handle_response_with_typed_body(pool: PgPool) {
631 crate::migrator().run(&pool).await.unwrap();
633
634 let handler = PostgresHandler::<TestRequest, TestResponse>::from_pool(pool.clone())
635 .await
636 .unwrap();
637 let repository = handler.repository();
638
639 let mut request_data = create_test_request_data();
640 let mut response_data = create_test_response_data();
641 let correlation_id = 54321;
642 request_data.correlation_id = correlation_id;
643 response_data.correlation_id = correlation_id;
644
645 handler.handle_request(request_data.clone()).await;
647 handler
648 .handle_response(request_data, response_data.clone())
649 .await;
650
651 let filter = RequestFilter {
653 correlation_id: Some(correlation_id as i64),
654 ..Default::default()
655 };
656 let results = repository.query(filter).await.unwrap();
657
658 assert_eq!(results.len(), 1);
659 let pair = &results[0];
660
661 let response = pair.response.as_ref().expect("Response should be present");
663 assert_eq!(response.correlation_id, correlation_id as i64);
664 assert_eq!(response.status_code, 201);
665 assert_eq!(response.duration_ms, 150);
666
667 match &response.body {
669 Some(Ok(parsed_body)) => {
670 assert_eq!(
671 *parsed_body,
672 TestResponse {
673 success: true,
674 message: "User created successfully".to_string(),
675 }
676 );
677 }
678 _ => panic!("Expected successfully parsed response body"),
679 }
680 }
681
682 #[sqlx::test]
683 async fn test_handle_unparseable_body_fallback(pool: PgPool) {
684 crate::migrator().run(&pool).await.unwrap();
686
687 let handler = PostgresHandler::<TestRequest, TestResponse>::from_pool(pool.clone())
688 .await
689 .unwrap();
690 let repository = handler.repository();
691
692 let mut headers = HashMap::new();
694 headers.insert("content-type".to_string(), vec!["text/plain".into()]);
695
696 let invalid_json_body = b"not valid json for TestRequest";
697 let correlation_id = 99999;
698 let request_data = RequestData {
699 method: http::Method::POST,
700 uri: http::Uri::from_static("/api/test"),
701 headers,
702 body: Some(Bytes::from(invalid_json_body.to_vec())),
703 timestamp: SystemTime::now(),
704 correlation_id,
705 };
706
707 handler.handle_request(request_data).await;
708
709 let filter = RequestFilter {
711 correlation_id: Some(correlation_id as i64),
712 ..Default::default()
713 };
714 let results = repository.query(filter).await.unwrap();
715
716 assert_eq!(results.len(), 1);
717 let pair = &results[0];
718
719 match &pair.request.body {
721 Some(Err(raw_bytes)) => {
722 assert_eq!(raw_bytes.as_ref(), invalid_json_body);
723 }
724 _ => panic!("Expected raw bytes fallback for unparseable body"),
725 }
726 }
727
728 #[sqlx::test]
729 async fn test_query_with_multiple_filters(pool: PgPool) {
730 crate::migrator().run(&pool).await.unwrap();
732
733 let handler = PostgresHandler::<Value, Value>::from_pool(pool.clone())
734 .await
735 .unwrap();
736 let repository = handler.repository();
737
738 let test_cases = vec![
740 (1001, "GET", "/api/users", 200, 100),
741 (1002, "POST", "/api/users", 201, 150),
742 (1003, "GET", "/api/orders", 404, 50),
743 (1004, "PUT", "/api/users/123", 200, 300),
744 ];
745
746 for (correlation_id, method, uri, status, duration_ms) in test_cases {
747 let mut headers = HashMap::new();
748 headers.insert("content-type".to_string(), vec!["application/json".into()]);
749
750 let request_data = RequestData {
751 method: method.parse().unwrap(),
752 uri: uri.parse().unwrap(),
753 headers: headers.clone(),
754 body: Some(Bytes::from(b"{}".to_vec())),
755 timestamp: SystemTime::now(),
756 correlation_id,
757 };
758
759 let response_data = ResponseData {
760 correlation_id,
761 status: http::StatusCode::from_u16(status).unwrap(),
762 headers,
763 body: Some(Bytes::from(b"{}".to_vec())),
764 timestamp: SystemTime::now(),
765 duration_to_first_byte: Duration::from_millis(duration_ms / 2),
766 duration: Duration::from_millis(duration_ms),
767 };
768
769 handler.handle_request(request_data.clone()).await;
770 handler.handle_response(request_data, response_data).await;
771 }
772
773 let filter = RequestFilter {
775 method: Some("GET".to_string()),
776 ..Default::default()
777 };
778 let results = repository.query(filter).await.unwrap();
779 assert_eq!(results.len(), 2); let filter = RequestFilter {
783 status_code: Some(200),
784 ..Default::default()
785 };
786 let results = repository.query(filter).await.unwrap();
787 assert_eq!(results.len(), 2); let filter = RequestFilter {
791 uri_pattern: Some("/api/users%".to_string()),
792 ..Default::default()
793 };
794 let results = repository.query(filter).await.unwrap();
795 assert_eq!(results.len(), 3); let filter = RequestFilter {
799 min_duration_ms: Some(100),
800 max_duration_ms: Some(200),
801 ..Default::default()
802 };
803 let results = repository.query(filter).await.unwrap();
804 assert_eq!(results.len(), 2); let filter = RequestFilter {
808 method: Some("GET".to_string()),
809 status_code: Some(200),
810 ..Default::default()
811 };
812 let results = repository.query(filter).await.unwrap();
813 assert_eq!(results.len(), 1); assert_eq!(results[0].request.correlation_id, 1001);
815 }
816
817 #[sqlx::test]
818 async fn test_query_with_pagination_and_ordering(pool: PgPool) {
819 crate::migrator().run(&pool).await.unwrap();
821
822 let handler = PostgresHandler::<Value, Value>::from_pool(pool.clone())
823 .await
824 .unwrap();
825 let repository = handler.repository();
826
827 let now = SystemTime::now();
829 for i in 0..5 {
830 let correlation_id = 2000 + i;
831 let timestamp = now + Duration::from_secs(i * 10); let mut headers = HashMap::new();
834 headers.insert("x-test-id".to_string(), vec![i.to_string().into()]);
835
836 let request_data = RequestData {
837 method: http::Method::GET,
838 uri: "/api/test".parse().unwrap(),
839 headers,
840 body: Some(Bytes::from(format!("{{\"id\": {i}}}").into_bytes())),
841 timestamp,
842 correlation_id,
843 };
844
845 handler.handle_request(request_data).await;
846 }
847
848 let filter = RequestFilter {
850 limit: Some(3),
851 ..Default::default()
852 };
853 let results = repository.query(filter).await.unwrap();
854 assert_eq!(results.len(), 3);
855
856 for i in 0..2 {
858 assert!(results[i].request.timestamp <= results[i + 1].request.timestamp);
859 }
860
861 let filter = RequestFilter {
863 order_by_timestamp_desc: true,
864 limit: Some(2),
865 offset: Some(1),
866 ..Default::default()
867 };
868 let results = repository.query(filter).await.unwrap();
869 assert_eq!(results.len(), 2);
870
871 assert!(results[0].request.timestamp >= results[1].request.timestamp);
873 }
874
875 #[sqlx::test]
876 async fn test_headers_conversion(pool: PgPool) {
877 crate::migrator().run(&pool).await.unwrap();
879
880 let handler = PostgresHandler::<Value, Value>::from_pool(pool.clone())
881 .await
882 .unwrap();
883 let repository = handler.repository();
884
885 let mut headers = HashMap::new();
887 headers.insert("single-value".to_string(), vec!["test".into()]);
888 headers.insert(
889 "multi-value".to_string(),
890 vec!["val1".into(), "val2".into()],
891 );
892 headers.insert("empty-value".to_string(), vec!["".into()]);
893
894 let request_data = RequestData {
895 correlation_id: 3000,
896 method: http::Method::GET,
897 uri: "/test".parse().unwrap(),
898 headers,
899 body: None,
900 timestamp: SystemTime::now(),
901 };
902
903 let correlation_id = 3000;
904 handler.handle_request(request_data).await;
905
906 let filter = RequestFilter {
907 correlation_id: Some(correlation_id as i64),
908 ..Default::default()
909 };
910 let results = repository.query(filter).await.unwrap();
911
912 assert_eq!(results.len(), 1);
913 let headers_json = &results[0].request.headers;
914
915 assert_eq!(
917 headers_json["single-value"],
918 Value::String("test".to_string())
919 );
920
921 match &headers_json["multi-value"] {
923 Value::Array(arr) => {
924 assert_eq!(arr.len(), 2);
925 assert_eq!(arr[0], Value::String("val1".to_string()));
926 assert_eq!(arr[1], Value::String("val2".to_string()));
927 }
928 _ => panic!("Expected array for multi-value header"),
929 }
930
931 assert_eq!(headers_json["empty-value"], Value::String("".to_string()));
933 }
934
935 #[sqlx::test]
936 async fn test_timestamp_filtering(pool: PgPool) {
937 crate::migrator().run(&pool).await.unwrap();
939
940 let handler = PostgresHandler::<Value, Value>::from_pool(pool.clone())
941 .await
942 .unwrap();
943 let repository = handler.repository();
944
945 let base_time = SystemTime::UNIX_EPOCH + Duration::from_secs(1_600_000_000); let times = [
949 base_time + Duration::from_secs(0), base_time + Duration::from_secs(3600), base_time + Duration::from_secs(7200), ];
953
954 for (i, timestamp) in times.iter().enumerate() {
955 let correlation_id = 4001 + i as u64;
956 let request_data = RequestData {
957 method: http::Method::GET,
958 uri: "/test".parse().unwrap(),
959 headers: HashMap::new(),
960 body: None,
961 timestamp: *timestamp,
962 correlation_id,
963 };
964
965 handler.handle_request(request_data).await;
966 }
967
968 let after_time: DateTime<Utc> = (base_time + Duration::from_secs(1800)).into(); let filter = RequestFilter {
971 timestamp_after: Some(after_time),
972 ..Default::default()
973 };
974 let results = repository.query(filter).await.unwrap();
975 assert_eq!(results.len(), 2); let before_time: DateTime<Utc> = (base_time + Duration::from_secs(5400)).into(); let filter = RequestFilter {
980 timestamp_before: Some(before_time),
981 ..Default::default()
982 };
983 let results = repository.query(filter).await.unwrap();
984 assert_eq!(results.len(), 2); let filter = RequestFilter {
988 timestamp_after: Some(after_time),
989 timestamp_before: Some(before_time),
990 ..Default::default()
991 };
992 let results = repository.query(filter).await.unwrap();
993 assert_eq!(results.len(), 1); assert_eq!(results[0].request.correlation_id, 4002);
995 }
996
997 #[sqlx::test]
1002 async fn test_no_path_filtering_logs_everything(pool: PgPool) {
1003 crate::migrator().run(&pool).await.unwrap();
1005
1006 let handler = PostgresHandler::<Value, Value>::from_pool(pool.clone())
1008 .await
1009 .unwrap();
1010 let repository = handler.repository();
1011
1012 let test_uris = vec!["/api/users", "/health", "/metrics", "/random/path"];
1013 for (i, uri) in test_uris.iter().enumerate() {
1014 let correlation_id = 3000 + i as u64;
1015 let mut headers = HashMap::new();
1016 headers.insert("content-type".to_string(), vec!["application/json".into()]);
1017
1018 let request_data = RequestData {
1019 method: http::Method::GET,
1020 uri: uri.parse().unwrap(),
1021 headers,
1022 body: Some(Bytes::from(b"{}".to_vec())),
1023 timestamp: SystemTime::now(),
1024 correlation_id,
1025 };
1026
1027 handler.handle_request(request_data).await;
1028 }
1029
1030 let filter = RequestFilter::default();
1032 let results = repository.query(filter).await.unwrap();
1033 assert_eq!(results.len(), 4);
1034 }
1035}