/// Error produced when a request or response body cannot be serialized
/// into the handler's typed body model.
///
/// Carries a lossy string rendering of the raw payload (`fallback_data`)
/// so the original bytes can still be persisted even though typed parsing
/// failed, alongside the underlying serializer error.
#[derive(Debug)]
pub struct SerializationError {
    // Lossy UTF-8 rendering of the body that failed to parse.
    pub fallback_data: String,
    // The underlying error reported by the serializer.
    pub error: Box<dyn std::error::Error + Send + Sync>,
}

impl SerializationError {
    /// Builds a `SerializationError` from the raw payload string and the
    /// concrete error returned by the serializer.
    pub fn new(
        fallback_data: String,
        error: impl std::error::Error + Send + Sync + 'static,
    ) -> Self {
        let error: Box<dyn std::error::Error + Send + Sync> = Box::new(error);
        Self {
            fallback_data,
            error,
        }
    }
}

impl std::fmt::Display for SerializationError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Message text is kept stable; downstream logs may match on it.
        write!(f, "Serialization failed: {}", self.error)
    }
}

impl std::error::Error for SerializationError {
    // Expose the wrapped serializer error for error-chain walking.
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        Some(&*self.error)
    }
}
83
84use chrono::{DateTime, Utc};
85use metrics::{counter, histogram};
86use outlet::{RequestData, RequestHandler, ResponseData};
87use serde::{Deserialize, Serialize};
88use serde_json::Value;
89use sqlx::PgPool;
90use std::collections::HashMap;
91use std::sync::Arc;
92use std::time::{Instant, SystemTime};
93use tracing::{debug, error, instrument, warn};
94use uuid::Uuid;
95
96pub mod error;
97pub mod repository;
98pub use error::PostgresHandlerError;
99pub use repository::{
100 HttpRequest, HttpResponse, RequestFilter, RequestRepository, RequestResponsePair,
101};
102
103pub use sqlx_pool_router::{DbPools, PoolProvider, TestDbPools};
105
/// Returns the migrator embedded from this crate's `./migrations` directory.
///
/// Run it against a pool (e.g. `migrator().run(&pool).await`) before using
/// the handler so the `http_requests` / `http_responses` tables exist.
pub fn migrator() -> sqlx::migrate::Migrator {
    sqlx::migrate!("./migrations")
}
132
/// Serializer turning raw request data into a typed body `T`, or a
/// [`SerializationError`] carrying the raw payload on failure.
type RequestSerializer<T> =
    Arc<dyn Fn(&outlet::RequestData) -> Result<T, SerializationError> + Send + Sync>;

/// Serializer turning raw response data into a typed body `T`; also receives
/// the originating request so responses can be interpreted in context.
type ResponseSerializer<T> = Arc<
    dyn Fn(&outlet::RequestData, &outlet::ResponseData) -> Result<T, SerializationError>
        + Send
        + Sync,
>;
150
/// A request handler that persists HTTP requests and responses to Postgres.
///
/// Type parameters:
/// * `P` — pool provider (defaults to a plain [`PgPool`]); inserts go
///   through `P::write()`.
/// * `TReq` / `TRes` — typed request/response body models (default
///   [`Value`] for schemaless JSON).
#[derive(Clone)]
pub struct PostgresHandler<P = PgPool, TReq = Value, TRes = Value>
where
    P: PoolProvider,
    TReq: for<'de> Deserialize<'de> + Serialize + Send + Sync + 'static,
    TRes: for<'de> Deserialize<'de> + Serialize + Send + Sync + 'static,
{
    pool: P,
    // Converts incoming request bodies into `TReq` before storage.
    request_serializer: RequestSerializer<TReq>,
    // Converts outgoing response bodies into `TRes` before storage.
    response_serializer: ResponseSerializer<TRes>,
    // Identifies rows written by this handler instance; responses are only
    // matched to requests sharing the same (instance_id, correlation_id).
    instance_id: Uuid,
}
173
174impl<P, TReq, TRes> PostgresHandler<P, TReq, TRes>
175where
176 P: PoolProvider,
177 TReq: for<'de> Deserialize<'de> + Serialize + Send + Sync + 'static,
178 TRes: for<'de> Deserialize<'de> + Serialize + Send + Sync + 'static,
179{
180 fn default_request_serializer() -> RequestSerializer<TReq> {
183 Arc::new(|request_data| {
184 let bytes = request_data.body.as_deref().unwrap_or(&[]);
185 serde_json::from_slice::<TReq>(bytes).map_err(|error| {
186 let fallback_data = String::from_utf8_lossy(bytes).to_string();
187 SerializationError::new(fallback_data, error)
188 })
189 })
190 }
191
192 fn default_response_serializer() -> ResponseSerializer<TRes> {
195 Arc::new(|_request_data, response_data| {
196 let bytes = response_data.body.as_deref().unwrap_or(&[]);
197 serde_json::from_slice::<TRes>(bytes).map_err(|error| {
198 let fallback_data = String::from_utf8_lossy(bytes).to_string();
199 SerializationError::new(fallback_data, error)
200 })
201 })
202 }
203
204 pub fn with_request_serializer<F>(mut self, serializer: F) -> Self
216 where
217 F: Fn(&outlet::RequestData) -> Result<TReq, SerializationError> + Send + Sync + 'static,
218 {
219 self.request_serializer = Arc::new(serializer);
220 self
221 }
222
223 pub fn with_response_serializer<F>(mut self, serializer: F) -> Self
235 where
236 F: Fn(&outlet::RequestData, &outlet::ResponseData) -> Result<TRes, SerializationError>
237 + Send
238 + Sync
239 + 'static,
240 {
241 self.response_serializer = Arc::new(serializer);
242 self
243 }
244
245 pub async fn from_pool_provider(pool_provider: P) -> Result<Self, PostgresHandlerError> {
285 Ok(Self {
286 pool: pool_provider,
287 request_serializer: Self::default_request_serializer(),
288 response_serializer: Self::default_response_serializer(),
289 instance_id: Uuid::new_v4(),
290 })
291 }
292
293 fn headers_to_json(headers: &HashMap<String, Vec<bytes::Bytes>>) -> Value {
295 let mut header_map = HashMap::new();
296 for (name, values) in headers {
297 if values.len() == 1 {
298 let value_str = String::from_utf8_lossy(&values[0]).to_string();
299 header_map.insert(name.clone(), Value::String(value_str));
300 } else {
301 let value_array: Vec<Value> = values
302 .iter()
303 .map(|v| Value::String(String::from_utf8_lossy(v).to_string()))
304 .collect();
305 header_map.insert(name.clone(), Value::Array(value_array));
306 }
307 }
308 serde_json::to_value(header_map).unwrap_or(Value::Null)
309 }
310
311 fn request_body_to_json_with_fallback(
313 &self,
314 request_data: &outlet::RequestData,
315 ) -> (Value, bool) {
316 match (self.request_serializer)(request_data) {
317 Ok(typed_value) => {
318 if let Ok(json_value) = serde_json::to_value(&typed_value) {
319 (json_value, true)
320 } else {
321 (
323 Value::String(
324 serde_json::to_string(&typed_value)
325 .expect("Serialized value must be convertible to JSON string"),
326 ),
327 false,
328 )
329 }
330 }
331 Err(serialization_error) => (Value::String(serialization_error.fallback_data), false),
332 }
333 }
334
335 fn response_body_to_json_with_fallback(
337 &self,
338 request_data: &outlet::RequestData,
339 response_data: &outlet::ResponseData,
340 ) -> (Value, bool) {
341 match (self.response_serializer)(request_data, response_data) {
342 Ok(typed_value) => {
343 if let Ok(json_value) = serde_json::to_value(&typed_value) {
344 (json_value, true)
345 } else {
346 (
348 Value::String(
349 serde_json::to_string(&typed_value)
350 .expect("Serialized value must be convertible to JSON string"),
351 ),
352 false,
353 )
354 }
355 }
356 Err(serialization_error) => (Value::String(serialization_error.fallback_data), false),
357 }
358 }
359
360 pub fn repository(&self) -> crate::repository::RequestRepository<P, TReq, TRes> {
365 crate::repository::RequestRepository::new(self.pool.clone())
366 }
367}
368
369impl<TReq, TRes> PostgresHandler<PgPool, TReq, TRes>
371where
372 TReq: for<'de> Deserialize<'de> + Serialize + Send + Sync + 'static,
373 TRes: for<'de> Deserialize<'de> + Serialize + Send + Sync + 'static,
374{
375 pub async fn new(database_url: &str) -> Result<Self, PostgresHandlerError> {
408 let pool = PgPool::connect(database_url)
409 .await
410 .map_err(PostgresHandlerError::Connection)?;
411
412 Ok(Self {
413 pool,
414 request_serializer: Self::default_request_serializer(),
415 response_serializer: Self::default_response_serializer(),
416 instance_id: Uuid::new_v4(),
417 })
418 }
419
420 pub async fn from_pool(pool: PgPool) -> Result<Self, PostgresHandlerError> {
455 Self::from_pool_provider(pool).await
456 }
457}
458
impl<P, TReq, TRes> RequestHandler for PostgresHandler<P, TReq, TRes>
where
    P: PoolProvider,
    TReq: for<'de> Deserialize<'de> + Serialize + Send + Sync + 'static,
    TRes: for<'de> Deserialize<'de> + Serialize + Send + Sync + 'static,
{
    /// Inserts one row into `http_requests` for the incoming request.
    /// Insert failures are counted and logged, never propagated (the trait
    /// method returns `()`), so logging cannot fail the request itself.
    #[instrument(skip(self, data), fields(correlation_id = %data.correlation_id))]
    async fn handle_request(&self, data: RequestData) {
        let headers_json = Self::headers_to_json(&data.headers);
        // Only serialize when a body is present; `parsed` records whether the
        // typed serializer fully understood it.
        let (body_json, parsed) = if data.body.is_some() {
            let (json, parsed) = self.request_body_to_json_with_fallback(&data);
            (Some(json), parsed)
        } else {
            (None, false)
        };

        let timestamp: DateTime<Utc> = data.timestamp.into();

        // Time the insert and record it regardless of success/failure.
        let query_start = Instant::now();
        let result = sqlx::query(
            r#"
            INSERT INTO http_requests (instance_id, correlation_id, timestamp, method, uri, headers, body, body_parsed)
            VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
            "#,
        )
        .bind(self.instance_id)
        // correlation_id is u64 at the source but stored as BIGINT.
        .bind(data.correlation_id as i64)
        .bind(timestamp)
        .bind(data.method.to_string())
        .bind(data.uri.to_string())
        .bind(headers_json)
        .bind(body_json)
        .bind(parsed)
        // Writes always go through the provider's write pool.
        .execute(self.pool.write())
        .await;
        let query_duration = query_start.elapsed();
        histogram!("outlet_write_duration_seconds", "operation" => "request")
            .record(query_duration.as_secs_f64());

        if let Err(e) = result {
            counter!("outlet_write_errors_total", "operation" => "request").increment(1);
            error!(correlation_id = %data.correlation_id, error = %e, "Failed to insert request data");
        } else {
            // Lag between the request's capture time and now; > 1s is logged
            // at warn level to surface a backed-up handler.
            let processing_lag_ms = SystemTime::now()
                .duration_since(data.timestamp)
                .unwrap_or_default()
                .as_millis();
            if processing_lag_ms > 1000 {
                warn!(correlation_id = %data.correlation_id, method = %data.method, uri = %data.uri, lag_ms = %processing_lag_ms, "Request logged (slow)");
            } else {
                debug!(correlation_id = %data.correlation_id, method = %data.method, uri = %data.uri, lag_ms = %processing_lag_ms, "Request logged");
            }
        }
    }

    /// Inserts one row into `http_responses`, but only when the matching
    /// request row (same instance_id + correlation_id) already exists —
    /// enforced by the INSERT's `WHERE EXISTS` guard, so an orphan response
    /// is silently skipped (rows_affected == 0) rather than failing.
    #[instrument(skip(self, request_data, response_data), fields(correlation_id = %request_data.correlation_id))]
    async fn handle_response(&self, request_data: RequestData, response_data: ResponseData) {
        let headers_json = Self::headers_to_json(&response_data.headers);
        // Only serialize when a body is present, mirroring handle_request.
        let (body_json, parsed) = if response_data.body.is_some() {
            let (json, parsed) =
                self.response_body_to_json_with_fallback(&request_data, &response_data);
            (Some(json), parsed)
        } else {
            (None, false)
        };

        let timestamp: DateTime<Utc> = response_data.timestamp.into();
        let duration_ms = response_data.duration.as_millis() as i64;
        let duration_to_first_byte_ms = response_data.duration_to_first_byte.as_millis() as i64;

        let query_start = Instant::now();
        let result = sqlx::query(
            r#"
            INSERT INTO http_responses (instance_id, correlation_id, timestamp, status_code, headers, body, body_parsed, duration_to_first_byte_ms, duration_ms)
            SELECT $1, $2, $3, $4, $5, $6, $7, $8, $9
            WHERE EXISTS (SELECT 1 FROM http_requests WHERE instance_id = $1 AND correlation_id = $2)
            "#,
        )
        .bind(self.instance_id)
        .bind(request_data.correlation_id as i64)
        .bind(timestamp)
        .bind(response_data.status.as_u16() as i32)
        .bind(headers_json)
        .bind(body_json)
        .bind(parsed)
        .bind(duration_to_first_byte_ms)
        .bind(duration_ms)
        .execute(self.pool.write())
        .await;
        let query_duration = query_start.elapsed();
        histogram!("outlet_write_duration_seconds", "operation" => "response")
            .record(query_duration.as_secs_f64());

        match result {
            Err(e) => {
                counter!("outlet_write_errors_total", "operation" => "response").increment(1);
                error!(correlation_id = %request_data.correlation_id, error = %e, "Failed to insert response data");
            }
            Ok(query_result) => {
                if query_result.rows_affected() > 0 {
                    // Same 1s lag threshold as the request path.
                    let processing_lag_ms = SystemTime::now()
                        .duration_since(response_data.timestamp)
                        .unwrap_or_default()
                        .as_millis();
                    if processing_lag_ms > 1000 {
                        warn!(correlation_id = %request_data.correlation_id, status = %response_data.status, duration_ms = %duration_ms, lag_ms = %processing_lag_ms, "Response logged (slow)");
                    } else {
                        debug!(correlation_id = %request_data.correlation_id, status = %response_data.status, duration_ms = %duration_ms, lag_ms = %processing_lag_ms, "Response logged");
                    }
                } else {
                    // rows_affected == 0: the WHERE EXISTS guard found no
                    // matching request row for this instance/correlation id.
                    debug!(correlation_id = %request_data.correlation_id, "No matching request found for response, skipping insert")
                }
            }
        }
    }
}
575
#[cfg(test)]
mod tests {
    use super::*;
    use bytes::Bytes;
    use chrono::{DateTime, Utc};
    use outlet::{RequestData, ResponseData};
    use serde::{Deserialize, Serialize};
    use serde_json::Value;
    use sqlx::PgPool;
    use std::collections::HashMap;
    use std::time::{Duration, SystemTime};

    /// Typed request body used to exercise typed (de)serialization.
    #[derive(Debug, Clone, PartialEq, Deserialize, Serialize)]
    struct TestRequest {
        user_id: u64,
        action: String,
    }

    /// Typed response body used to exercise typed (de)serialization.
    #[derive(Debug, Clone, PartialEq, Deserialize, Serialize)]
    struct TestResponse {
        success: bool,
        message: String,
    }

    /// Builds a POST /api/users request with a JSON-encoded `TestRequest` body.
    fn create_test_request_data() -> RequestData {
        let mut headers = HashMap::new();
        headers.insert("content-type".to_string(), vec!["application/json".into()]);
        headers.insert("user-agent".to_string(), vec!["test-client/1.0".into()]);

        let test_req = TestRequest {
            user_id: 123,
            action: "create_user".to_string(),
        };
        let body = serde_json::to_vec(&test_req).unwrap();

        RequestData {
            method: http::Method::POST,
            uri: http::Uri::from_static("/api/users"),
            headers,
            body: Some(Bytes::from(body)),
            timestamp: SystemTime::now(),
            correlation_id: 0,
        }
    }

    /// Builds a 201 Created response with a JSON-encoded `TestResponse` body.
    fn create_test_response_data() -> ResponseData {
        let mut headers = HashMap::new();
        headers.insert("content-type".to_string(), vec!["application/json".into()]);

        let test_res = TestResponse {
            success: true,
            message: "User created successfully".to_string(),
        };
        let body = serde_json::to_vec(&test_res).unwrap();

        ResponseData {
            status: http::StatusCode::CREATED,
            headers,
            body: Some(Bytes::from(body)),
            timestamp: SystemTime::now(),
            duration_to_first_byte: Duration::from_millis(100),
            duration: Duration::from_millis(150),
            correlation_id: 0,
        }
    }

    #[sqlx::test]
    async fn test_handler_creation(pool: PgPool) {
        crate::migrator().run(&pool).await.unwrap();

        let handler = PostgresHandler::<PgPool, TestRequest, TestResponse>::from_pool(pool.clone())
            .await
            .unwrap();

        let repository = handler.repository();

        // A fresh database should yield no rows for an unfiltered query.
        let filter = RequestFilter::default();
        let results = repository.query(filter).await.unwrap();
        assert!(results.is_empty());
    }

    #[sqlx::test]
    async fn test_handle_request_with_typed_body(pool: PgPool) {
        crate::migrator().run(&pool).await.unwrap();

        let handler = PostgresHandler::<PgPool, TestRequest, TestResponse>::from_pool(pool.clone())
            .await
            .unwrap();
        let repository = handler.repository();

        let mut request_data = create_test_request_data();
        let correlation_id = 12345;
        request_data.correlation_id = correlation_id;

        handler.handle_request(request_data.clone()).await;

        let filter = RequestFilter {
            correlation_id: Some(correlation_id as i64),
            ..Default::default()
        };
        let results = repository.query(filter).await.unwrap();

        assert_eq!(results.len(), 1);
        let pair = &results[0];

        assert_eq!(pair.request.correlation_id, correlation_id as i64);
        assert_eq!(pair.request.method, "POST");
        assert_eq!(pair.request.uri, "/api/users");

        // The stored body should round-trip back into the typed model.
        match &pair.request.body {
            Some(Ok(parsed_body)) => {
                assert_eq!(
                    *parsed_body,
                    TestRequest {
                        user_id: 123,
                        action: "create_user".to_string(),
                    }
                );
            }
            _ => panic!("Expected successfully parsed request body"),
        }

        let headers_value = &pair.request.headers;
        assert!(headers_value.get("content-type").is_some());
        assert!(headers_value.get("user-agent").is_some());

        // No response was recorded for this correlation id.
        assert!(pair.response.is_none());
    }

    #[sqlx::test]
    async fn test_handle_response_with_typed_body(pool: PgPool) {
        crate::migrator().run(&pool).await.unwrap();

        let handler = PostgresHandler::<PgPool, TestRequest, TestResponse>::from_pool(pool.clone())
            .await
            .unwrap();
        let repository = handler.repository();

        let mut request_data = create_test_request_data();
        let mut response_data = create_test_response_data();
        let correlation_id = 54321;
        request_data.correlation_id = correlation_id;
        response_data.correlation_id = correlation_id;

        // The request must be inserted first: the response insert is guarded
        // by a WHERE EXISTS on the matching request row.
        handler.handle_request(request_data.clone()).await;
        handler
            .handle_response(request_data, response_data.clone())
            .await;

        let filter = RequestFilter {
            correlation_id: Some(correlation_id as i64),
            ..Default::default()
        };
        let results = repository.query(filter).await.unwrap();

        assert_eq!(results.len(), 1);
        let pair = &results[0];

        let response = pair.response.as_ref().expect("Response should be present");
        assert_eq!(response.correlation_id, correlation_id as i64);
        assert_eq!(response.status_code, 201);
        assert_eq!(response.duration_ms, 150);

        match &response.body {
            Some(Ok(parsed_body)) => {
                assert_eq!(
                    *parsed_body,
                    TestResponse {
                        success: true,
                        message: "User created successfully".to_string(),
                    }
                );
            }
            _ => panic!("Expected successfully parsed response body"),
        }
    }

    #[sqlx::test]
    async fn test_handle_unparseable_body_fallback(pool: PgPool) {
        crate::migrator().run(&pool).await.unwrap();

        let handler = PostgresHandler::<PgPool, TestRequest, TestResponse>::from_pool(pool.clone())
            .await
            .unwrap();
        let repository = handler.repository();

        let mut headers = HashMap::new();
        headers.insert("content-type".to_string(), vec!["text/plain".into()]);

        // This body cannot deserialize into TestRequest, so the handler must
        // store the raw bytes via the fallback path.
        let invalid_json_body = b"not valid json for TestRequest";
        let correlation_id = 99999;
        let request_data = RequestData {
            method: http::Method::POST,
            uri: http::Uri::from_static("/api/test"),
            headers,
            body: Some(Bytes::from(invalid_json_body.to_vec())),
            timestamp: SystemTime::now(),
            correlation_id,
        };

        handler.handle_request(request_data).await;

        let filter = RequestFilter {
            correlation_id: Some(correlation_id as i64),
            ..Default::default()
        };
        let results = repository.query(filter).await.unwrap();

        assert_eq!(results.len(), 1);
        let pair = &results[0];

        match &pair.request.body {
            Some(Err(raw_bytes)) => {
                assert_eq!(raw_bytes.as_ref(), invalid_json_body);
            }
            _ => panic!("Expected raw bytes fallback for unparseable body"),
        }
    }

    #[sqlx::test]
    async fn test_query_with_multiple_filters(pool: PgPool) {
        crate::migrator().run(&pool).await.unwrap();

        let handler = PostgresHandler::<PgPool, Value, Value>::from_pool(pool.clone())
            .await
            .unwrap();
        let repository = handler.repository();

        // (correlation_id, method, uri, status, duration_ms)
        let test_cases = vec![
            (1001, "GET", "/api/users", 200, 100),
            (1002, "POST", "/api/users", 201, 150),
            (1003, "GET", "/api/orders", 404, 50),
            (1004, "PUT", "/api/users/123", 200, 300),
        ];

        for (correlation_id, method, uri, status, duration_ms) in test_cases {
            let mut headers = HashMap::new();
            headers.insert("content-type".to_string(), vec!["application/json".into()]);

            let request_data = RequestData {
                method: method.parse().unwrap(),
                uri: uri.parse().unwrap(),
                headers: headers.clone(),
                body: Some(Bytes::from(b"{}".to_vec())),
                timestamp: SystemTime::now(),
                correlation_id,
            };

            let response_data = ResponseData {
                correlation_id,
                status: http::StatusCode::from_u16(status).unwrap(),
                headers,
                body: Some(Bytes::from(b"{}".to_vec())),
                timestamp: SystemTime::now(),
                duration_to_first_byte: Duration::from_millis(duration_ms / 2),
                duration: Duration::from_millis(duration_ms),
            };

            handler.handle_request(request_data.clone()).await;
            handler.handle_response(request_data, response_data).await;
        }

        // Filter by method.
        let filter = RequestFilter {
            method: Some("GET".to_string()),
            ..Default::default()
        };
        let results = repository.query(filter).await.unwrap();
        assert_eq!(results.len(), 2);

        // Filter by status code.
        let filter = RequestFilter {
            status_code: Some(200),
            ..Default::default()
        };
        let results = repository.query(filter).await.unwrap();
        assert_eq!(results.len(), 2);

        // Filter by URI pattern (SQL LIKE syntax).
        let filter = RequestFilter {
            uri_pattern: Some("/api/users%".to_string()),
            ..Default::default()
        };
        let results = repository.query(filter).await.unwrap();
        assert_eq!(results.len(), 3);

        // Filter by duration window.
        let filter = RequestFilter {
            min_duration_ms: Some(100),
            max_duration_ms: Some(200),
            ..Default::default()
        };
        let results = repository.query(filter).await.unwrap();
        assert_eq!(results.len(), 2);

        // Combined filters narrow to a single row.
        let filter = RequestFilter {
            method: Some("GET".to_string()),
            status_code: Some(200),
            ..Default::default()
        };
        let results = repository.query(filter).await.unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].request.correlation_id, 1001);
    }

    #[sqlx::test]
    async fn test_query_with_pagination_and_ordering(pool: PgPool) {
        crate::migrator().run(&pool).await.unwrap();

        let handler = PostgresHandler::<PgPool, Value, Value>::from_pool(pool.clone())
            .await
            .unwrap();
        let repository = handler.repository();

        // Insert 5 requests with timestamps 10 seconds apart.
        let now = SystemTime::now();
        for i in 0..5 {
            let correlation_id = 2000 + i;
            let timestamp = now + Duration::from_secs(i * 10);
            let mut headers = HashMap::new();
            headers.insert("x-test-id".to_string(), vec![i.to_string().into()]);

            let request_data = RequestData {
                method: http::Method::GET,
                uri: "/api/test".parse().unwrap(),
                headers,
                body: Some(Bytes::from(format!("{{\"id\": {i}}}").into_bytes())),
                timestamp,
                correlation_id,
            };

            handler.handle_request(request_data).await;
        }

        // Limit only: expect the first 3 rows in default (ascending) order.
        let filter = RequestFilter {
            limit: Some(3),
            ..Default::default()
        };
        let results = repository.query(filter).await.unwrap();
        assert_eq!(results.len(), 3);

        for i in 0..2 {
            assert!(results[i].request.timestamp <= results[i + 1].request.timestamp);
        }

        // Descending order with offset: skip the newest, take the next two.
        let filter = RequestFilter {
            order_by_timestamp_desc: true,
            limit: Some(2),
            offset: Some(1),
            ..Default::default()
        };
        let results = repository.query(filter).await.unwrap();
        assert_eq!(results.len(), 2);

        assert!(results[0].request.timestamp >= results[1].request.timestamp);
    }

    #[sqlx::test]
    async fn test_headers_conversion(pool: PgPool) {
        crate::migrator().run(&pool).await.unwrap();

        let handler = PostgresHandler::<PgPool, Value, Value>::from_pool(pool.clone())
            .await
            .unwrap();
        let repository = handler.repository();

        // Single-valued, multi-valued, and empty header values.
        let mut headers = HashMap::new();
        headers.insert("single-value".to_string(), vec!["test".into()]);
        headers.insert(
            "multi-value".to_string(),
            vec!["val1".into(), "val2".into()],
        );
        headers.insert("empty-value".to_string(), vec!["".into()]);

        let request_data = RequestData {
            correlation_id: 3000,
            method: http::Method::GET,
            uri: "/test".parse().unwrap(),
            headers,
            body: None,
            timestamp: SystemTime::now(),
        };

        let correlation_id = 3000;
        handler.handle_request(request_data).await;

        let filter = RequestFilter {
            correlation_id: Some(correlation_id as i64),
            ..Default::default()
        };
        let results = repository.query(filter).await.unwrap();

        assert_eq!(results.len(), 1);
        let headers_json = &results[0].request.headers;

        // Single-valued headers are stored as plain JSON strings.
        assert_eq!(
            headers_json["single-value"],
            Value::String("test".to_string())
        );

        // Multi-valued headers are stored as JSON arrays.
        match &headers_json["multi-value"] {
            Value::Array(arr) => {
                assert_eq!(arr.len(), 2);
                assert_eq!(arr[0], Value::String("val1".to_string()));
                assert_eq!(arr[1], Value::String("val2".to_string()));
            }
            _ => panic!("Expected array for multi-value header"),
        }

        assert_eq!(headers_json["empty-value"], Value::String("".to_string()));
    }

    #[sqlx::test]
    async fn test_timestamp_filtering(pool: PgPool) {
        crate::migrator().run(&pool).await.unwrap();

        let handler = PostgresHandler::<PgPool, Value, Value>::from_pool(pool.clone())
            .await
            .unwrap();
        let repository = handler.repository();

        // Three requests at t+0h, t+1h, and t+2h from a fixed base time.
        let base_time = SystemTime::UNIX_EPOCH + Duration::from_secs(1_600_000_000);
        let times = [
            base_time + Duration::from_secs(0),
            base_time + Duration::from_secs(3600),
            base_time + Duration::from_secs(7200),
        ];

        for (i, timestamp) in times.iter().enumerate() {
            let correlation_id = 4001 + i as u64;
            let request_data = RequestData {
                method: http::Method::GET,
                uri: "/test".parse().unwrap(),
                headers: HashMap::new(),
                body: None,
                timestamp: *timestamp,
                correlation_id,
            };

            handler.handle_request(request_data).await;
        }

        // After t+30min: matches the t+1h and t+2h rows.
        let after_time: DateTime<Utc> = (base_time + Duration::from_secs(1800)).into();
        let filter = RequestFilter {
            timestamp_after: Some(after_time),
            ..Default::default()
        };
        let results = repository.query(filter).await.unwrap();
        assert_eq!(results.len(), 2);

        // Before t+90min: matches the t+0h and t+1h rows.
        let before_time: DateTime<Utc> = (base_time + Duration::from_secs(5400)).into();
        let filter = RequestFilter {
            timestamp_before: Some(before_time),
            ..Default::default()
        };
        let results = repository.query(filter).await.unwrap();
        assert_eq!(results.len(), 2);

        // Both bounds: only the t+1h row (correlation id 4002) remains.
        let filter = RequestFilter {
            timestamp_after: Some(after_time),
            timestamp_before: Some(before_time),
            ..Default::default()
        };
        let results = repository.query(filter).await.unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].request.correlation_id, 4002);
    }

    #[sqlx::test]
    async fn test_no_path_filtering_logs_everything(pool: PgPool) {
        crate::migrator().run(&pool).await.unwrap();

        let handler = PostgresHandler::<PgPool, Value, Value>::from_pool(pool.clone())
            .await
            .unwrap();
        let repository = handler.repository();

        // The handler has no path-based exclusions: every URI is persisted.
        let test_uris = ["/api/users", "/health", "/metrics", "/random/path"];
        for (i, uri) in test_uris.iter().enumerate() {
            let correlation_id = 3000 + i as u64;
            let mut headers = HashMap::new();
            headers.insert("content-type".to_string(), vec!["application/json".into()]);

            let request_data = RequestData {
                method: http::Method::GET,
                uri: uri.parse().unwrap(),
                headers,
                body: Some(Bytes::from(b"{}".to_vec())),
                timestamp: SystemTime::now(),
                correlation_id,
            };

            handler.handle_request(request_data).await;
        }

        let filter = RequestFilter::default();
        let results = repository.query(filter).await.unwrap();
        assert_eq!(results.len(), 4);
    }

    #[sqlx::test]
    async fn test_write_operations_use_write_pool(pool: PgPool) {
        crate::migrator().run(&pool).await.unwrap();

        let test_pools = crate::TestDbPools::new(pool).await.unwrap();
        let handler = PostgresHandler::<_, Value, Value>::from_pool_provider(test_pools.clone())
            .await
            .unwrap();

        let mut request_data = create_test_request_data();
        let correlation_id = 5001;
        request_data.correlation_id = correlation_id;

        handler.handle_request(request_data.clone()).await;

        // Verify the row landed via the write pool.
        let count: i64 =
            sqlx::query_scalar("SELECT COUNT(*) FROM http_requests WHERE correlation_id = $1")
                .bind(correlation_id as i64)
                .fetch_one(test_pools.write())
                .await
                .unwrap();

        assert_eq!(count, 1, "Request should be written to primary pool");
    }

    #[sqlx::test]
    async fn test_response_write_uses_write_pool(pool: PgPool) {
        crate::migrator().run(&pool).await.unwrap();

        let test_pools = crate::TestDbPools::new(pool).await.unwrap();
        let handler = PostgresHandler::<_, Value, Value>::from_pool_provider(test_pools.clone())
            .await
            .unwrap();

        let mut request_data = create_test_request_data();
        let mut response_data = create_test_response_data();
        let correlation_id = 5002;
        request_data.correlation_id = correlation_id;
        response_data.correlation_id = correlation_id;

        handler.handle_request(request_data.clone()).await;

        handler.handle_response(request_data, response_data).await;

        let count: i64 =
            sqlx::query_scalar("SELECT COUNT(*) FROM http_responses WHERE correlation_id = $1")
                .bind(correlation_id as i64)
                .fetch_one(test_pools.write())
                .await
                .unwrap();

        assert_eq!(count, 1, "Response should be written to primary pool");
    }

    #[sqlx::test]
    async fn test_repository_queries_use_read_pool(pool: PgPool) {
        crate::migrator().run(&pool).await.unwrap();

        let test_pools = crate::TestDbPools::new(pool).await.unwrap();
        let handler = PostgresHandler::<_, Value, Value>::from_pool_provider(test_pools.clone())
            .await
            .unwrap();

        let mut request_data = create_test_request_data();
        let correlation_id = 5003;
        request_data.correlation_id = correlation_id;
        handler.handle_request(request_data).await;

        // Repository reads should find the row written above.
        let repository = handler.repository();
        let filter = RequestFilter {
            correlation_id: Some(correlation_id as i64),
            ..Default::default()
        };

        let results = repository.query(filter).await.unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].request.correlation_id, correlation_id as i64);
    }

    #[sqlx::test]
    async fn test_replica_pool_rejects_writes(pool: PgPool) {
        crate::migrator().run(&pool).await.unwrap();

        let test_pools = crate::TestDbPools::new(pool).await.unwrap();

        // Attempt a direct INSERT through the read pool; it must fail.
        let result = sqlx::query("INSERT INTO http_requests (instance_id, correlation_id, timestamp, method, uri, headers, body, body_parsed) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)")
            .bind(Uuid::new_v4())
            .bind(9999i64)
            .bind(Utc::now())
            .bind("GET")
            .bind("/test")
            .bind(serde_json::json!({}))
            .bind(None::<Value>)
            .bind(false)
            .execute(test_pools.read())
            .await;

        assert!(
            result.is_err(),
            "Replica pool should reject write operations"
        );

        let err = result.unwrap_err();
        let err_msg = err.to_string().to_lowercase();
        assert!(
            err_msg.contains("read-only") || err_msg.contains("read only"),
            "Error should mention read-only: {}",
            err
        );
    }

    #[sqlx::test]
    async fn test_full_request_response_cycle_with_read_write_separation(pool: PgPool) {
        crate::migrator().run(&pool).await.unwrap();

        let test_pools = crate::TestDbPools::new(pool).await.unwrap();
        let handler =
            PostgresHandler::<_, TestRequest, TestResponse>::from_pool_provider(test_pools)
                .await
                .unwrap();

        let mut request_data = create_test_request_data();
        let mut response_data = create_test_response_data();
        let correlation_id = 5004;
        request_data.correlation_id = correlation_id;
        response_data.correlation_id = correlation_id;

        handler.handle_request(request_data.clone()).await;
        handler.handle_response(request_data, response_data).await;

        let repository = handler.repository();
        let filter = RequestFilter {
            correlation_id: Some(correlation_id as i64),
            ..Default::default()
        };

        let results = repository.query(filter).await.unwrap();
        assert_eq!(results.len(), 1);

        let pair = &results[0];
        assert_eq!(pair.request.correlation_id, correlation_id as i64);
        assert_eq!(pair.request.method, "POST");
        assert_eq!(pair.request.uri, "/api/users");

        let response = pair.response.as_ref().expect("Response should exist");
        assert_eq!(response.correlation_id, correlation_id as i64);
        assert_eq!(response.status_code, 201);

        match &pair.request.body {
            Some(Ok(parsed_body)) => {
                assert_eq!(
                    *parsed_body,
                    TestRequest {
                        user_id: 123,
                        action: "create_user".to_string(),
                    }
                );
            }
            _ => panic!("Expected successfully parsed request body"),
        }

        match &response.body {
            Some(Ok(parsed_body)) => {
                assert_eq!(
                    *parsed_body,
                    TestResponse {
                        success: true,
                        message: "User created successfully".to_string(),
                    }
                );
            }
            _ => panic!("Expected successfully parsed response body"),
        }
    }
}