/// Error produced when a configured request/response serializer fails.
///
/// Carries the raw payload text alongside the underlying error so the
/// handler can still persist *something* for the failed body.
#[derive(Debug)]
pub struct SerializationError {
    /// Lossy-UTF-8 rendering of the original body bytes; stored as the
    /// fallback value when deserialization fails.
    pub fallback_data: String,
    /// The underlying serializer/deserializer error.
    pub error: Box<dyn std::error::Error + Send + Sync>,
}
58
59impl SerializationError {
60 pub fn new(
62 fallback_data: String,
63 error: impl std::error::Error + Send + Sync + 'static,
64 ) -> Self {
65 Self {
66 fallback_data,
67 error: Box::new(error),
68 }
69 }
70}
71
72impl std::fmt::Display for SerializationError {
73 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
74 write!(f, "Serialization failed: {}", self.error)
75 }
76}
77
78impl std::error::Error for SerializationError {
79 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
80 Some(self.error.as_ref())
81 }
82}
83
84use chrono::{DateTime, Utc};
85use metrics::{counter, histogram};
86use outlet::{RequestData, RequestHandler, ResponseData};
87use serde::{Deserialize, Serialize};
88use serde_json::Value;
89use sqlx::PgPool;
90use std::collections::HashMap;
91use std::sync::Arc;
92use std::time::{Instant, SystemTime};
93use tracing::{debug, error, instrument, warn};
94use uuid::Uuid;
95
96pub mod error;
97pub mod repository;
98pub use error::PostgresHandlerError;
99pub use repository::{
100 HttpRequest, HttpResponse, RequestFilter, RequestRepository, RequestResponsePair,
101};
102
103pub use sqlx_pool_router::{DbPools, PoolProvider, TestDbPools};
105
/// Returns this crate's sqlx migrator.
///
/// The `sqlx::migrate!` macro embeds the contents of `./migrations`
/// into the binary at compile time, so callers can run migrations
/// (`migrator().run(&pool)`) without the source tree present.
pub fn migrator() -> sqlx::migrate::Migrator {
    sqlx::migrate!("./migrations")
}
132
/// Shared closure that converts a captured request into the typed request
/// body `T`, or a `SerializationError` carrying the raw payload on failure.
type RequestSerializer<T> =
    Arc<dyn Fn(&outlet::RequestData) -> Result<T, SerializationError> + Send + Sync>;

/// Shared closure that converts a captured request/response pair into the
/// typed response body `T`. The request is provided too so serializers can
/// key off request attributes (e.g. content negotiation).
type ResponseSerializer<T> = Arc<
    dyn Fn(&outlet::RequestData, &outlet::ResponseData) -> Result<T, SerializationError>
        + Send
        + Sync,
>;
150
/// An `outlet` request handler that persists HTTP requests/responses to
/// Postgres, with pluggable typed body serializers.
///
/// `P` supplies read/write pools, `TReq`/`TRes` are the typed body shapes
/// (both default to raw `serde_json::Value`).
#[derive(Clone)]
pub struct PostgresHandler<P = PgPool, TReq = Value, TRes = Value>
where
    P: PoolProvider,
    TReq: for<'de> Deserialize<'de> + Serialize + Send + Sync + 'static,
    TRes: for<'de> Deserialize<'de> + Serialize + Send + Sync + 'static,
{
    // Pool provider; writes go to `pool.write()`, repository reads use `pool.read()`.
    pool: P,
    // Converts captured request bodies into `TReq`.
    request_serializer: RequestSerializer<TReq>,
    // Converts captured response bodies into `TRes`.
    response_serializer: ResponseSerializer<TRes>,
    // Random per-handler id; scopes correlation_ids so rows from different
    // process instances never collide.
    instance_id: Uuid,
}
173
174impl<P, TReq, TRes> PostgresHandler<P, TReq, TRes>
175where
176 P: PoolProvider,
177 TReq: for<'de> Deserialize<'de> + Serialize + Send + Sync + 'static,
178 TRes: for<'de> Deserialize<'de> + Serialize + Send + Sync + 'static,
179{
180 fn default_request_serializer() -> RequestSerializer<TReq> {
183 Arc::new(|request_data| {
184 let bytes = request_data.body.as_deref().unwrap_or(&[]);
185 serde_json::from_slice::<TReq>(bytes).map_err(|error| {
186 let fallback_data = String::from_utf8_lossy(bytes).to_string();
187 SerializationError::new(fallback_data, error)
188 })
189 })
190 }
191
192 fn default_response_serializer() -> ResponseSerializer<TRes> {
195 Arc::new(|_request_data, response_data| {
196 let bytes = response_data.body.as_deref().unwrap_or(&[]);
197 serde_json::from_slice::<TRes>(bytes).map_err(|error| {
198 let fallback_data = String::from_utf8_lossy(bytes).to_string();
199 SerializationError::new(fallback_data, error)
200 })
201 })
202 }
203
204 pub fn with_request_serializer<F>(mut self, serializer: F) -> Self
216 where
217 F: Fn(&outlet::RequestData) -> Result<TReq, SerializationError> + Send + Sync + 'static,
218 {
219 self.request_serializer = Arc::new(serializer);
220 self
221 }
222
223 pub fn with_response_serializer<F>(mut self, serializer: F) -> Self
235 where
236 F: Fn(&outlet::RequestData, &outlet::ResponseData) -> Result<TRes, SerializationError>
237 + Send
238 + Sync
239 + 'static,
240 {
241 self.response_serializer = Arc::new(serializer);
242 self
243 }
244
245 pub async fn from_pool_provider(pool_provider: P) -> Result<Self, PostgresHandlerError> {
285 Ok(Self {
286 pool: pool_provider,
287 request_serializer: Self::default_request_serializer(),
288 response_serializer: Self::default_response_serializer(),
289 instance_id: Uuid::new_v4(),
290 })
291 }
292
293 fn headers_to_json(headers: &HashMap<String, Vec<bytes::Bytes>>) -> Value {
295 let mut header_map = HashMap::new();
296 for (name, values) in headers {
297 if values.len() == 1 {
298 let value_str = String::from_utf8_lossy(&values[0]).to_string();
299 header_map.insert(name.clone(), Value::String(value_str));
300 } else {
301 let value_array: Vec<Value> = values
302 .iter()
303 .map(|v| Value::String(String::from_utf8_lossy(v).to_string()))
304 .collect();
305 header_map.insert(name.clone(), Value::Array(value_array));
306 }
307 }
308 serde_json::to_value(header_map).unwrap_or(Value::Null)
309 }
310
311 fn request_body_to_json_with_fallback(
313 &self,
314 request_data: &outlet::RequestData,
315 ) -> (Value, bool) {
316 match (self.request_serializer)(request_data) {
317 Ok(typed_value) => {
318 if let Ok(json_value) = serde_json::to_value(&typed_value) {
319 (json_value, true)
320 } else {
321 (
323 Value::String(
324 serde_json::to_string(&typed_value)
325 .expect("Serialized value must be convertible to JSON string"),
326 ),
327 false,
328 )
329 }
330 }
331 Err(serialization_error) => (Value::String(serialization_error.fallback_data), false),
332 }
333 }
334
335 fn response_body_to_json_with_fallback(
337 &self,
338 request_data: &outlet::RequestData,
339 response_data: &outlet::ResponseData,
340 ) -> (Value, bool) {
341 match (self.response_serializer)(request_data, response_data) {
342 Ok(typed_value) => {
343 if let Ok(json_value) = serde_json::to_value(&typed_value) {
344 (json_value, true)
345 } else {
346 (
348 Value::String(
349 serde_json::to_string(&typed_value)
350 .expect("Serialized value must be convertible to JSON string"),
351 ),
352 false,
353 )
354 }
355 }
356 Err(serialization_error) => (Value::String(serialization_error.fallback_data), false),
357 }
358 }
359
360 pub fn repository(&self) -> crate::repository::RequestRepository<P, TReq, TRes> {
365 crate::repository::RequestRepository::new(self.pool.clone())
366 }
367}
368
369impl<TReq, TRes> PostgresHandler<PgPool, TReq, TRes>
371where
372 TReq: for<'de> Deserialize<'de> + Serialize + Send + Sync + 'static,
373 TRes: for<'de> Deserialize<'de> + Serialize + Send + Sync + 'static,
374{
375 pub async fn new(database_url: &str) -> Result<Self, PostgresHandlerError> {
408 let pool = PgPool::connect(database_url)
409 .await
410 .map_err(PostgresHandlerError::Connection)?;
411
412 Ok(Self {
413 pool,
414 request_serializer: Self::default_request_serializer(),
415 response_serializer: Self::default_response_serializer(),
416 instance_id: Uuid::new_v4(),
417 })
418 }
419
420 pub async fn from_pool(pool: PgPool) -> Result<Self, PostgresHandlerError> {
455 Self::from_pool_provider(pool).await
456 }
457}
458
impl<P, TReq, TRes> RequestHandler for PostgresHandler<P, TReq, TRes>
where
    P: PoolProvider,
    TReq: for<'de> Deserialize<'de> + Serialize + Send + Sync + 'static,
    TRes: for<'de> Deserialize<'de> + Serialize + Send + Sync + 'static,
{
    /// Persists one row into `http_requests`.
    ///
    /// Insert failures are counted and logged but never propagated:
    /// logging must not break the instrumented request path.
    #[instrument(name = "outlet.handle_request", skip(self, data), fields(correlation_id = %data.correlation_id))]
    async fn handle_request(&self, data: RequestData) {
        let headers_json = Self::headers_to_json(&data.headers);
        // Only run the serializer when a body exists; `parsed` records
        // whether the configured serializer fully succeeded.
        let (body_json, parsed) = if data.body.is_some() {
            let (json, parsed) = self.request_body_to_json_with_fallback(&data);
            (Some(json), parsed)
        } else {
            (None, false)
        };

        let timestamp: DateTime<Utc> = data.timestamp.into();

        let query_start = Instant::now();
        let result = sqlx::query(
            r#"
            INSERT INTO http_requests (instance_id, correlation_id, timestamp, method, uri, headers, body, body_parsed, trace_id, span_id)
            VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
            "#,
        )
        .bind(self.instance_id)
        // NOTE(review): u64 -> i64 cast wraps above i64::MAX; assumed
        // unreachable for correlation ids — confirm at the id source.
        .bind(data.correlation_id as i64)
        .bind(timestamp)
        .bind(data.method.to_string())
        .bind(data.uri.to_string())
        .bind(headers_json)
        .bind(body_json)
        .bind(parsed)
        .bind(&data.trace_id)
        .bind(&data.span_id)
        .execute(self.pool.write())
        .await;
        let query_duration = query_start.elapsed();
        // Record DB write latency regardless of success.
        histogram!("outlet_write_duration_seconds", "operation" => "request")
            .record(query_duration.as_secs_f64());

        if let Err(e) = result {
            counter!("outlet_write_errors_total", "operation" => "request").increment(1);
            error!(correlation_id = %data.correlation_id, error = %e, "Failed to insert request data");
        } else {
            // Lag between the request being captured and it landing in the
            // DB; warn when the pipeline falls more than a second behind.
            let processing_lag_ms = SystemTime::now()
                .duration_since(data.timestamp)
                .unwrap_or_default()
                .as_millis();
            if processing_lag_ms > 1000 {
                warn!(correlation_id = %data.correlation_id, method = %data.method, uri = %data.uri, lag_ms = %processing_lag_ms, "Request logged (slow)");
            } else {
                debug!(correlation_id = %data.correlation_id, method = %data.method, uri = %data.uri, lag_ms = %processing_lag_ms, "Request logged");
            }
        }
    }

    /// Persists one row into `http_responses`, but only if the matching
    /// request row (same `instance_id` + `correlation_id`) already exists —
    /// the `WHERE EXISTS` guard prevents orphaned response rows.
    #[instrument(name = "outlet.handle_response", skip(self, request_data, response_data), fields(correlation_id = %request_data.correlation_id))]
    async fn handle_response(&self, request_data: RequestData, response_data: ResponseData) {
        let headers_json = Self::headers_to_json(&response_data.headers);
        let (body_json, parsed) = if response_data.body.is_some() {
            let (json, parsed) =
                self.response_body_to_json_with_fallback(&request_data, &response_data);
            (Some(json), parsed)
        } else {
            (None, false)
        };

        let timestamp: DateTime<Utc> = response_data.timestamp.into();
        let duration_ms = response_data.duration.as_millis() as i64;
        let duration_to_first_byte_ms = response_data.duration_to_first_byte.as_millis() as i64;

        let query_start = Instant::now();
        let result = sqlx::query(
            r#"
            INSERT INTO http_responses (instance_id, correlation_id, timestamp, status_code, headers, body, body_parsed, duration_to_first_byte_ms, duration_ms)
            SELECT $1, $2, $3, $4, $5, $6, $7, $8, $9
            WHERE EXISTS (SELECT 1 FROM http_requests WHERE instance_id = $1 AND correlation_id = $2)
            "#,
        )
        .bind(self.instance_id)
        .bind(request_data.correlation_id as i64)
        .bind(timestamp)
        .bind(response_data.status.as_u16() as i32)
        .bind(headers_json)
        .bind(body_json)
        .bind(parsed)
        .bind(duration_to_first_byte_ms)
        .bind(duration_ms)
        .execute(self.pool.write())
        .await;
        let query_duration = query_start.elapsed();
        histogram!("outlet_write_duration_seconds", "operation" => "response")
            .record(query_duration.as_secs_f64());

        match result {
            Err(e) => {
                counter!("outlet_write_errors_total", "operation" => "response").increment(1);
                error!(correlation_id = %request_data.correlation_id, error = %e, "Failed to insert response data");
            }
            Ok(query_result) => {
                // rows_affected == 0 means the EXISTS guard rejected the
                // insert (no matching request row) — expected, not an error.
                if query_result.rows_affected() > 0 {
                    let processing_lag_ms = SystemTime::now()
                        .duration_since(response_data.timestamp)
                        .unwrap_or_default()
                        .as_millis();
                    if processing_lag_ms > 1000 {
                        warn!(correlation_id = %request_data.correlation_id, status = %response_data.status, duration_ms = %duration_ms, lag_ms = %processing_lag_ms, "Response logged (slow)");
                    } else {
                        debug!(correlation_id = %request_data.correlation_id, status = %response_data.status, duration_ms = %duration_ms, lag_ms = %processing_lag_ms, "Response logged");
                    }
                } else {
                    debug!(correlation_id = %request_data.correlation_id, "No matching request found for response, skipping insert")
                }
            }
        }
    }
}
577
// Integration tests; each `#[sqlx::test]` receives a fresh Postgres pool
// and runs the crate's migrations before exercising the handler.
#[cfg(test)]
mod tests {
    use super::*;
    use bytes::Bytes;
    use chrono::{DateTime, Utc};
    use outlet::{RequestData, ResponseData};
    use serde::{Deserialize, Serialize};
    use serde_json::Value;
    use sqlx::PgPool;
    use std::collections::HashMap;
    use std::time::{Duration, SystemTime};

    // Typed request body used to exercise the generic `TReq` parameter.
    #[derive(Debug, Clone, PartialEq, Deserialize, Serialize)]
    struct TestRequest {
        user_id: u64,
        action: String,
    }

    // Typed response body used to exercise the generic `TRes` parameter.
    #[derive(Debug, Clone, PartialEq, Deserialize, Serialize)]
    struct TestResponse {
        success: bool,
        message: String,
    }

    // Builds a POST /api/users request carrying a JSON-encoded `TestRequest`.
    fn create_test_request_data() -> RequestData {
        let mut headers = HashMap::new();
        headers.insert("content-type".to_string(), vec!["application/json".into()]);
        headers.insert("user-agent".to_string(), vec!["test-client/1.0".into()]);

        let test_req = TestRequest {
            user_id: 123,
            action: "create_user".to_string(),
        };
        let body = serde_json::to_vec(&test_req).unwrap();

        RequestData {
            method: http::Method::POST,
            uri: http::Uri::from_static("/api/users"),
            headers,
            body: Some(Bytes::from(body)),
            timestamp: SystemTime::now(),
            correlation_id: 0,
            trace_id: None,
            span_id: None,
        }
    }

    // Builds a 201 response carrying a JSON-encoded `TestResponse`.
    fn create_test_response_data() -> ResponseData {
        let mut headers = HashMap::new();
        headers.insert("content-type".to_string(), vec!["application/json".into()]);

        let test_res = TestResponse {
            success: true,
            message: "User created successfully".to_string(),
        };
        let body = serde_json::to_vec(&test_res).unwrap();

        ResponseData {
            status: http::StatusCode::CREATED,
            headers,
            body: Some(Bytes::from(body)),
            timestamp: SystemTime::now(),
            duration_to_first_byte: Duration::from_millis(100),
            duration: Duration::from_millis(150),
            correlation_id: 0,
        }
    }

    // Handler constructs cleanly and an empty DB yields an empty query result.
    #[sqlx::test]
    async fn test_handler_creation(pool: PgPool) {
        crate::migrator().run(&pool).await.unwrap();

        let handler = PostgresHandler::<PgPool, TestRequest, TestResponse>::from_pool(pool.clone())
            .await
            .unwrap();

        let repository = handler.repository();

        let filter = RequestFilter::default();
        let results = repository.query(filter).await.unwrap();
        assert!(results.is_empty());
    }

    // A typed request body round-trips through the DB and parses back.
    #[sqlx::test]
    async fn test_handle_request_with_typed_body(pool: PgPool) {
        crate::migrator().run(&pool).await.unwrap();

        let handler = PostgresHandler::<PgPool, TestRequest, TestResponse>::from_pool(pool.clone())
            .await
            .unwrap();
        let repository = handler.repository();

        let mut request_data = create_test_request_data();
        let correlation_id = 12345;
        request_data.correlation_id = correlation_id;

        handler.handle_request(request_data.clone()).await;

        let filter = RequestFilter {
            correlation_id: Some(correlation_id as i64),
            ..Default::default()
        };
        let results = repository.query(filter).await.unwrap();

        assert_eq!(results.len(), 1);
        let pair = &results[0];

        assert_eq!(pair.request.correlation_id, correlation_id as i64);
        assert_eq!(pair.request.method, "POST");
        assert_eq!(pair.request.uri, "/api/users");

        // Body should have parsed into the typed TestRequest.
        match &pair.request.body {
            Some(Ok(parsed_body)) => {
                assert_eq!(
                    *parsed_body,
                    TestRequest {
                        user_id: 123,
                        action: "create_user".to_string(),
                    }
                );
            }
            _ => panic!("Expected successfully parsed request body"),
        }

        let headers_value = &pair.request.headers;
        assert!(headers_value.get("content-type").is_some());
        assert!(headers_value.get("user-agent").is_some());

        // No response has been recorded yet.
        assert!(pair.response.is_none());
    }

    // A typed response body round-trips and is paired with its request.
    #[sqlx::test]
    async fn test_handle_response_with_typed_body(pool: PgPool) {
        crate::migrator().run(&pool).await.unwrap();

        let handler = PostgresHandler::<PgPool, TestRequest, TestResponse>::from_pool(pool.clone())
            .await
            .unwrap();
        let repository = handler.repository();

        let mut request_data = create_test_request_data();
        let mut response_data = create_test_response_data();
        let correlation_id = 54321;
        request_data.correlation_id = correlation_id;
        response_data.correlation_id = correlation_id;

        handler.handle_request(request_data.clone()).await;
        handler
            .handle_response(request_data, response_data.clone())
            .await;

        let filter = RequestFilter {
            correlation_id: Some(correlation_id as i64),
            ..Default::default()
        };
        let results = repository.query(filter).await.unwrap();

        assert_eq!(results.len(), 1);
        let pair = &results[0];

        let response = pair.response.as_ref().expect("Response should be present");
        assert_eq!(response.correlation_id, correlation_id as i64);
        assert_eq!(response.status_code, 201);
        assert_eq!(response.duration_ms, 150);

        match &response.body {
            Some(Ok(parsed_body)) => {
                assert_eq!(
                    *parsed_body,
                    TestResponse {
                        success: true,
                        message: "User created successfully".to_string(),
                    }
                );
            }
            _ => panic!("Expected successfully parsed response body"),
        }
    }

    // A body that fails the typed serializer falls back to raw bytes.
    #[sqlx::test]
    async fn test_handle_unparseable_body_fallback(pool: PgPool) {
        crate::migrator().run(&pool).await.unwrap();

        let handler = PostgresHandler::<PgPool, TestRequest, TestResponse>::from_pool(pool.clone())
            .await
            .unwrap();
        let repository = handler.repository();

        let mut headers = HashMap::new();
        headers.insert("content-type".to_string(), vec!["text/plain".into()]);

        let invalid_json_body = b"not valid json for TestRequest";
        let correlation_id = 99999;
        let request_data = RequestData {
            method: http::Method::POST,
            uri: http::Uri::from_static("/api/test"),
            headers,
            body: Some(Bytes::from(invalid_json_body.to_vec())),
            timestamp: SystemTime::now(),
            correlation_id,
            trace_id: None,
            span_id: None,
        };

        handler.handle_request(request_data).await;

        let filter = RequestFilter {
            correlation_id: Some(correlation_id as i64),
            ..Default::default()
        };
        let results = repository.query(filter).await.unwrap();

        assert_eq!(results.len(), 1);
        let pair = &results[0];

        // The repository surfaces the fallback as Err(raw bytes).
        match &pair.request.body {
            Some(Err(raw_bytes)) => {
                assert_eq!(raw_bytes.as_ref(), invalid_json_body);
            }
            _ => panic!("Expected raw bytes fallback for unparseable body"),
        }
    }

    // Each RequestFilter field narrows results; combined filters intersect.
    #[sqlx::test]
    async fn test_query_with_multiple_filters(pool: PgPool) {
        crate::migrator().run(&pool).await.unwrap();

        let handler = PostgresHandler::<PgPool, Value, Value>::from_pool(pool.clone())
            .await
            .unwrap();
        let repository = handler.repository();

        let test_cases = vec![
            (1001, "GET", "/api/users", 200, 100),
            (1002, "POST", "/api/users", 201, 150),
            (1003, "GET", "/api/orders", 404, 50),
            (1004, "PUT", "/api/users/123", 200, 300),
        ];

        for (correlation_id, method, uri, status, duration_ms) in test_cases {
            let mut headers = HashMap::new();
            headers.insert("content-type".to_string(), vec!["application/json".into()]);

            let request_data = RequestData {
                method: method.parse().unwrap(),
                uri: uri.parse().unwrap(),
                headers: headers.clone(),
                body: Some(Bytes::from(b"{}".to_vec())),
                timestamp: SystemTime::now(),
                correlation_id,
                trace_id: None,
                span_id: None,
            };

            let response_data = ResponseData {
                correlation_id,
                status: http::StatusCode::from_u16(status).unwrap(),
                headers,
                body: Some(Bytes::from(b"{}".to_vec())),
                timestamp: SystemTime::now(),
                duration_to_first_byte: Duration::from_millis(duration_ms / 2),
                duration: Duration::from_millis(duration_ms),
            };

            handler.handle_request(request_data.clone()).await;
            handler.handle_response(request_data, response_data).await;
        }

        // By HTTP method.
        let filter = RequestFilter {
            method: Some("GET".to_string()),
            ..Default::default()
        };
        let results = repository.query(filter).await.unwrap();
        assert_eq!(results.len(), 2);

        // By status code.
        let filter = RequestFilter {
            status_code: Some(200),
            ..Default::default()
        };
        let results = repository.query(filter).await.unwrap();
        assert_eq!(results.len(), 2);

        // By URI pattern (SQL LIKE syntax).
        let filter = RequestFilter {
            uri_pattern: Some("/api/users%".to_string()),
            ..Default::default()
        };
        let results = repository.query(filter).await.unwrap();
        assert_eq!(results.len(), 3);

        // By duration window.
        let filter = RequestFilter {
            min_duration_ms: Some(100),
            max_duration_ms: Some(200),
            ..Default::default()
        };
        let results = repository.query(filter).await.unwrap();
        assert_eq!(results.len(), 2);

        // Combined filters intersect down to a single row.
        let filter = RequestFilter {
            method: Some("GET".to_string()),
            status_code: Some(200),
            ..Default::default()
        };
        let results = repository.query(filter).await.unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].request.correlation_id, 1001);
    }

    // limit/offset and timestamp ordering behave as expected.
    #[sqlx::test]
    async fn test_query_with_pagination_and_ordering(pool: PgPool) {
        crate::migrator().run(&pool).await.unwrap();

        let handler = PostgresHandler::<PgPool, Value, Value>::from_pool(pool.clone())
            .await
            .unwrap();
        let repository = handler.repository();

        let now = SystemTime::now();
        for i in 0..5 {
            let correlation_id = 2000 + i;
            // Space timestamps 10s apart so ordering is unambiguous.
            let timestamp = now + Duration::from_secs(i * 10);
            let mut headers = HashMap::new();
            headers.insert("x-test-id".to_string(), vec![i.to_string().into()]);

            let request_data = RequestData {
                method: http::Method::GET,
                uri: "/api/test".parse().unwrap(),
                headers,
                body: Some(Bytes::from(format!("{{\"id\": {i}}}").into_bytes())),
                timestamp,
                correlation_id,
                trace_id: None,
                span_id: None,
            };

            handler.handle_request(request_data).await;
        }

        let filter = RequestFilter {
            limit: Some(3),
            ..Default::default()
        };
        let results = repository.query(filter).await.unwrap();
        assert_eq!(results.len(), 3);

        // Default ordering is ascending by timestamp.
        for i in 0..2 {
            assert!(results[i].request.timestamp <= results[i + 1].request.timestamp);
        }

        let filter = RequestFilter {
            order_by_timestamp_desc: true,
            limit: Some(2),
            offset: Some(1),
            ..Default::default()
        };
        let results = repository.query(filter).await.unwrap();
        assert_eq!(results.len(), 2);

        assert!(results[0].request.timestamp >= results[1].request.timestamp);
    }

    // Single-valued headers become strings, multi-valued become arrays.
    #[sqlx::test]
    async fn test_headers_conversion(pool: PgPool) {
        crate::migrator().run(&pool).await.unwrap();

        let handler = PostgresHandler::<PgPool, Value, Value>::from_pool(pool.clone())
            .await
            .unwrap();
        let repository = handler.repository();

        let mut headers = HashMap::new();
        headers.insert("single-value".to_string(), vec!["test".into()]);
        headers.insert(
            "multi-value".to_string(),
            vec!["val1".into(), "val2".into()],
        );
        headers.insert("empty-value".to_string(), vec!["".into()]);

        let request_data = RequestData {
            correlation_id: 3000,
            method: http::Method::GET,
            uri: "/test".parse().unwrap(),
            headers,
            body: None,
            timestamp: SystemTime::now(),
            trace_id: None,
            span_id: None,
        };

        let correlation_id = 3000;
        handler.handle_request(request_data).await;

        let filter = RequestFilter {
            correlation_id: Some(correlation_id as i64),
            ..Default::default()
        };
        let results = repository.query(filter).await.unwrap();

        assert_eq!(results.len(), 1);
        let headers_json = &results[0].request.headers;

        assert_eq!(
            headers_json["single-value"],
            Value::String("test".to_string())
        );

        match &headers_json["multi-value"] {
            Value::Array(arr) => {
                assert_eq!(arr.len(), 2);
                assert_eq!(arr[0], Value::String("val1".to_string()));
                assert_eq!(arr[1], Value::String("val2".to_string()));
            }
            _ => panic!("Expected array for multi-value header"),
        }

        assert_eq!(headers_json["empty-value"], Value::String("".to_string()));
    }

    // timestamp_after / timestamp_before bound the result set correctly.
    #[sqlx::test]
    async fn test_timestamp_filtering(pool: PgPool) {
        crate::migrator().run(&pool).await.unwrap();

        let handler = PostgresHandler::<PgPool, Value, Value>::from_pool(pool.clone())
            .await
            .unwrap();
        let repository = handler.repository();

        // Fixed epoch keeps the test deterministic.
        let base_time = SystemTime::UNIX_EPOCH + Duration::from_secs(1_600_000_000);
        let times = [
            base_time + Duration::from_secs(0),
            base_time + Duration::from_secs(3600),
            base_time + Duration::from_secs(7200),
        ];

        for (i, timestamp) in times.iter().enumerate() {
            let correlation_id = 4001 + i as u64;
            let request_data = RequestData {
                method: http::Method::GET,
                uri: "/test".parse().unwrap(),
                headers: HashMap::new(),
                body: None,
                timestamp: *timestamp,
                correlation_id,
                trace_id: None,
                span_id: None,
            };

            handler.handle_request(request_data).await;
        }

        // After 30 minutes past base: two later requests remain.
        let after_time: DateTime<Utc> = (base_time + Duration::from_secs(1800)).into();
        let filter = RequestFilter {
            timestamp_after: Some(after_time),
            ..Default::default()
        };
        let results = repository.query(filter).await.unwrap();
        assert_eq!(results.len(), 2);

        // Before 90 minutes past base: two earlier requests remain.
        let before_time: DateTime<Utc> = (base_time + Duration::from_secs(5400)).into();
        let filter = RequestFilter {
            timestamp_before: Some(before_time),
            ..Default::default()
        };
        let results = repository.query(filter).await.unwrap();
        assert_eq!(results.len(), 2);

        // The window between bounds contains only the middle request.
        let filter = RequestFilter {
            timestamp_after: Some(after_time),
            timestamp_before: Some(before_time),
            ..Default::default()
        };
        let results = repository.query(filter).await.unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].request.correlation_id, 4002);
    }

    // The handler records every path — there is no built-in path filtering.
    #[sqlx::test]
    async fn test_no_path_filtering_logs_everything(pool: PgPool) {
        crate::migrator().run(&pool).await.unwrap();

        let handler = PostgresHandler::<PgPool, Value, Value>::from_pool(pool.clone())
            .await
            .unwrap();
        let repository = handler.repository();

        let test_uris = ["/api/users", "/health", "/metrics", "/random/path"];
        for (i, uri) in test_uris.iter().enumerate() {
            let correlation_id = 3000 + i as u64;
            let mut headers = HashMap::new();
            headers.insert("content-type".to_string(), vec!["application/json".into()]);

            let request_data = RequestData {
                method: http::Method::GET,
                uri: uri.parse().unwrap(),
                headers,
                body: Some(Bytes::from(b"{}".to_vec())),
                timestamp: SystemTime::now(),
                correlation_id,
                trace_id: None,
                span_id: None,
            };

            handler.handle_request(request_data).await;
        }

        let filter = RequestFilter::default();
        let results = repository.query(filter).await.unwrap();
        assert_eq!(results.len(), 4);
    }

    // Request inserts go through the write pool of a split provider.
    #[sqlx::test]
    async fn test_write_operations_use_write_pool(pool: PgPool) {
        crate::migrator().run(&pool).await.unwrap();

        let test_pools = crate::TestDbPools::new(pool).await.unwrap();
        let handler = PostgresHandler::<_, Value, Value>::from_pool_provider(test_pools.clone())
            .await
            .unwrap();

        let mut request_data = create_test_request_data();
        let correlation_id = 5001;
        request_data.correlation_id = correlation_id;

        handler.handle_request(request_data.clone()).await;

        let count: i64 =
            sqlx::query_scalar("SELECT COUNT(*) FROM http_requests WHERE correlation_id = $1")
                .bind(correlation_id as i64)
                .fetch_one(test_pools.write())
                .await
                .unwrap();

        assert_eq!(count, 1, "Request should be written to primary pool");
    }

    // Response inserts also go through the write pool.
    #[sqlx::test]
    async fn test_response_write_uses_write_pool(pool: PgPool) {
        crate::migrator().run(&pool).await.unwrap();

        let test_pools = crate::TestDbPools::new(pool).await.unwrap();
        let handler = PostgresHandler::<_, Value, Value>::from_pool_provider(test_pools.clone())
            .await
            .unwrap();

        let mut request_data = create_test_request_data();
        let mut response_data = create_test_response_data();
        let correlation_id = 5002;
        request_data.correlation_id = correlation_id;
        response_data.correlation_id = correlation_id;

        handler.handle_request(request_data.clone()).await;

        handler.handle_response(request_data, response_data).await;

        let count: i64 =
            sqlx::query_scalar("SELECT COUNT(*) FROM http_responses WHERE correlation_id = $1")
                .bind(correlation_id as i64)
                .fetch_one(test_pools.write())
                .await
                .unwrap();

        assert_eq!(count, 1, "Response should be written to primary pool");
    }

    // Repository reads succeed against a split read/write provider.
    #[sqlx::test]
    async fn test_repository_queries_use_read_pool(pool: PgPool) {
        crate::migrator().run(&pool).await.unwrap();

        let test_pools = crate::TestDbPools::new(pool).await.unwrap();
        let handler = PostgresHandler::<_, Value, Value>::from_pool_provider(test_pools.clone())
            .await
            .unwrap();

        let mut request_data = create_test_request_data();
        let correlation_id = 5003;
        request_data.correlation_id = correlation_id;
        handler.handle_request(request_data).await;

        let repository = handler.repository();
        let filter = RequestFilter {
            correlation_id: Some(correlation_id as i64),
            ..Default::default()
        };

        let results = repository.query(filter).await.unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].request.correlation_id, correlation_id as i64);
    }

    // Sanity: the replica side of TestDbPools refuses INSERTs.
    #[sqlx::test]
    async fn test_replica_pool_rejects_writes(pool: PgPool) {
        crate::migrator().run(&pool).await.unwrap();

        let test_pools = crate::TestDbPools::new(pool).await.unwrap();

        let result = sqlx::query("INSERT INTO http_requests (instance_id, correlation_id, timestamp, method, uri, headers, body, body_parsed) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)")
            .bind(Uuid::new_v4())
            .bind(9999i64)
            .bind(Utc::now())
            .bind("GET")
            .bind("/test")
            .bind(serde_json::json!({}))
            .bind(None::<Value>)
            .bind(false)
            .execute(test_pools.read())
            .await;

        assert!(
            result.is_err(),
            "Replica pool should reject write operations"
        );

        let err = result.unwrap_err();
        let err_msg = err.to_string().to_lowercase();
        assert!(
            err_msg.contains("read-only") || err_msg.contains("read only"),
            "Error should mention read-only: {}",
            err
        );
    }

    // End-to-end: typed request + response written via the split provider
    // and read back through the repository.
    #[sqlx::test]
    async fn test_full_request_response_cycle_with_read_write_separation(pool: PgPool) {
        crate::migrator().run(&pool).await.unwrap();

        let test_pools = crate::TestDbPools::new(pool).await.unwrap();
        let handler =
            PostgresHandler::<_, TestRequest, TestResponse>::from_pool_provider(test_pools)
                .await
                .unwrap();

        let mut request_data = create_test_request_data();
        let mut response_data = create_test_response_data();
        let correlation_id = 5004;
        request_data.correlation_id = correlation_id;
        response_data.correlation_id = correlation_id;

        handler.handle_request(request_data.clone()).await;
        handler.handle_response(request_data, response_data).await;

        let repository = handler.repository();
        let filter = RequestFilter {
            correlation_id: Some(correlation_id as i64),
            ..Default::default()
        };

        let results = repository.query(filter).await.unwrap();
        assert_eq!(results.len(), 1);

        let pair = &results[0];
        assert_eq!(pair.request.correlation_id, correlation_id as i64);
        assert_eq!(pair.request.method, "POST");
        assert_eq!(pair.request.uri, "/api/users");

        let response = pair.response.as_ref().expect("Response should exist");
        assert_eq!(response.correlation_id, correlation_id as i64);
        assert_eq!(response.status_code, 201);

        match &pair.request.body {
            Some(Ok(parsed_body)) => {
                assert_eq!(
                    *parsed_body,
                    TestRequest {
                        user_id: 123,
                        action: "create_user".to_string(),
                    }
                );
            }
            _ => panic!("Expected successfully parsed request body"),
        }

        match &response.body {
            Some(Ok(parsed_body)) => {
                assert_eq!(
                    *parsed_body,
                    TestResponse {
                        success: true,
                        message: "User created successfully".to_string(),
                    }
                );
            }
            _ => panic!("Expected successfully parsed response body"),
        }
    }
}