#[derive(Debug)]
pub struct SerializationError {
    pub fallback_data: String,
    pub error: Box<dyn std::error::Error + Send + Sync>,
}

impl SerializationError {
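    /// Builds a `SerializationError` from the raw payload (kept verbatim as
    /// fallback data) and the underlying error, which is boxed for storage.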
    pub fn new(
        fallback_data: String,
        error: impl std::error::Error + Send + Sync + 'static,
    ) -> Self {
        Self {
            fallback_data,
            error: Box::new(error),
        }
    }
}

impl std::fmt::Display for SerializationError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "Serialization failed: {}", self.error)
    }
}

impl std::error::Error for SerializationError {
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        Some(self.error.as_ref())
    }
}

use chrono::{DateTime, Utc};
use metrics::{counter, histogram};
use outlet::{RequestData, RequestHandler, ResponseData};
use serde::{Deserialize, Serialize};
use serde_json::Value;
use sqlx::PgPool;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::{Instant, SystemTime};
use tracing::{debug, error, instrument, warn};
use uuid::Uuid;

pub mod error;
pub mod repository;
pub use error::PostgresHandlerError;
pub use repository::{
    HttpRequest, HttpResponse, RequestFilter, RequestRepository, RequestResponsePair,
};

pub use sqlx_pool_router::{DbPools, PoolProvider, TestDbPools};

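/// Returns the sqlx migrator for this crate's bundled `./migrations`, so callers
/// can apply the schema before using the handler.
///
/// A minimal sketch, assuming an existing `PgPool`:
///
/// ```ignore
/// migrator().run(&pool).await?;
/// ```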
pub fn migrator() -> sqlx::migrate::Migrator {
    sqlx::migrate!("./migrations")
}

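/// Serializer that converts captured request data into a typed value, or a
/// [`SerializationError`] carrying the raw body as fallback.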
type RequestSerializer<T> =
    Arc<dyn Fn(&outlet::RequestData) -> Result<T, SerializationError> + Send + Sync>;

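/// Serializer that converts captured response data (together with its originating
/// request) into a typed value, or a [`SerializationError`] carrying the raw body.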
type ResponseSerializer<T> = Arc<
    dyn Fn(&outlet::RequestData, &outlet::ResponseData) -> Result<T, SerializationError>
        + Send
        + Sync,
>;

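/// A `RequestHandler` that persists HTTP requests and responses to Postgres,
/// serializing bodies to JSON via configurable serializers and tagging every row
/// with a per-instance UUID.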
#[derive(Clone)]
pub struct PostgresHandler<P = PgPool, TReq = Value, TRes = Value>
where
    P: PoolProvider,
    TReq: for<'de> Deserialize<'de> + Serialize + Send + Sync + 'static,
    TRes: for<'de> Deserialize<'de> + Serialize + Send + Sync + 'static,
{
    pool: P,
    request_serializer: RequestSerializer<TReq>,
    response_serializer: ResponseSerializer<TRes>,
    instance_id: Uuid,
}

impl<P, TReq, TRes> PostgresHandler<P, TReq, TRes>
where
    P: PoolProvider,
    TReq: for<'de> Deserialize<'de> + Serialize + Send + Sync + 'static,
    TRes: for<'de> Deserialize<'de> + Serialize + Send + Sync + 'static,
{
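    /// Default request serializer: parses the body bytes as JSON into `TReq`,
    /// falling back to a lossy UTF-8 copy of the raw bytes on failure.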
    fn default_request_serializer() -> RequestSerializer<TReq> {
        Arc::new(|request_data| {
            let bytes = request_data.body.as_deref().unwrap_or(&[]);
            serde_json::from_slice::<TReq>(bytes).map_err(|error| {
                let fallback_data = String::from_utf8_lossy(bytes).to_string();
                SerializationError::new(fallback_data, error)
            })
        })
    }

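    /// Default response serializer: parses the response body bytes as JSON into
    /// `TRes`, falling back to a lossy UTF-8 copy of the raw bytes on failure.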
    fn default_response_serializer() -> ResponseSerializer<TRes> {
        Arc::new(|_request_data, response_data| {
            let bytes = response_data.body.as_deref().unwrap_or(&[]);
            serde_json::from_slice::<TRes>(bytes).map_err(|error| {
                let fallback_data = String::from_utf8_lossy(bytes).to_string();
                SerializationError::new(fallback_data, error)
            })
        })
    }

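    /// Replaces the request serializer with a custom closure.
    ///
    /// A minimal sketch; `MyRequest` is illustrative only:
    ///
    /// ```ignore
    /// let handler = handler.with_request_serializer(|req| {
    ///     let bytes = req.body.as_deref().unwrap_or(&[]);
    ///     serde_json::from_slice::<MyRequest>(bytes).map_err(|e| {
    ///         SerializationError::new(String::from_utf8_lossy(bytes).to_string(), e)
    ///     })
    /// });
    /// ```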
    pub fn with_request_serializer<F>(mut self, serializer: F) -> Self
    where
        F: Fn(&outlet::RequestData) -> Result<TReq, SerializationError> + Send + Sync + 'static,
    {
        self.request_serializer = Arc::new(serializer);
        self
    }

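    /// Replaces the response serializer with a custom closure that also receives
    /// the originating request, e.g. to vary parsing by route or content type.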
    pub fn with_response_serializer<F>(mut self, serializer: F) -> Self
    where
        F: Fn(&outlet::RequestData, &outlet::ResponseData) -> Result<TRes, SerializationError>
            + Send
            + Sync
            + 'static,
    {
        self.response_serializer = Arc::new(serializer);
        self
    }

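    /// Creates a handler from an existing pool provider (for example, split
    /// read/write pools), using the default JSON serializers and a fresh instance id.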
    pub async fn from_pool_provider(pool_provider: P) -> Result<Self, PostgresHandlerError> {
        Ok(Self {
            pool: pool_provider,
            request_serializer: Self::default_request_serializer(),
            response_serializer: Self::default_response_serializer(),
            instance_id: Uuid::new_v4(),
        })
    }

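    /// Converts a header map to JSON: single-valued headers become strings,
    /// multi-valued headers become arrays; values are decoded as lossy UTF-8.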
    fn headers_to_json(headers: &HashMap<String, Vec<bytes::Bytes>>) -> Value {
        let mut header_map = HashMap::new();
        for (name, values) in headers {
            if values.len() == 1 {
                let value_str = String::from_utf8_lossy(&values[0]).to_string();
                header_map.insert(name.clone(), Value::String(value_str));
            } else {
                let value_array: Vec<Value> = values
                    .iter()
                    .map(|v| Value::String(String::from_utf8_lossy(v).to_string()))
                    .collect();
                header_map.insert(name.clone(), Value::Array(value_array));
            }
        }
        serde_json::to_value(header_map).unwrap_or(Value::Null)
    }

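    /// Serializes the request body via the configured serializer, returning the
    /// JSON value and whether it parsed into `TReq` (`true`) or fell back to the
    /// raw string (`false`).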
    fn request_body_to_json_with_fallback(
        &self,
        request_data: &outlet::RequestData,
    ) -> (Value, bool) {
        match (self.request_serializer)(request_data) {
            Ok(typed_value) => {
                if let Ok(json_value) = serde_json::to_value(&typed_value) {
                    (json_value, true)
                } else {
                    (
                        Value::String(
                            serde_json::to_string(&typed_value)
                                .expect("Serialized value must be convertible to JSON string"),
                        ),
                        false,
                    )
                }
            }
            Err(serialization_error) => (Value::String(serialization_error.fallback_data), false),
        }
    }

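    /// Serializes the response body via the configured serializer, with the same
    /// parsed/fallback semantics as the request variant.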
    fn response_body_to_json_with_fallback(
        &self,
        request_data: &outlet::RequestData,
        response_data: &outlet::ResponseData,
    ) -> (Value, bool) {
        match (self.response_serializer)(request_data, response_data) {
            Ok(typed_value) => {
                if let Ok(json_value) = serde_json::to_value(&typed_value) {
                    (json_value, true)
                } else {
                    (
                        Value::String(
                            serde_json::to_string(&typed_value)
                                .expect("Serialized value must be convertible to JSON string"),
                        ),
                        false,
                    )
                }
            }
            Err(serialization_error) => (Value::String(serialization_error.fallback_data), false),
        }
    }

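    /// Returns a [`RequestRepository`] sharing this handler's pool for querying
    /// stored request/response pairs.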
    pub fn repository(&self) -> crate::repository::RequestRepository<P, TReq, TRes> {
        crate::repository::RequestRepository::new(self.pool.clone())
    }
}

impl<TReq, TRes> PostgresHandler<PgPool, TReq, TRes>
where
    TReq: for<'de> Deserialize<'de> + Serialize + Send + Sync + 'static,
    TRes: for<'de> Deserialize<'de> + Serialize + Send + Sync + 'static,
{
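    /// Connects to Postgres at `database_url` and builds a handler over a single
    /// `PgPool` with the default JSON serializers.
    ///
    /// A minimal sketch; the URL is illustrative:
    ///
    /// ```ignore
    /// let handler =
    ///     PostgresHandler::<PgPool, Value, Value>::new("postgres://localhost/outlet").await?;
    /// ```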
    pub async fn new(database_url: &str) -> Result<Self, PostgresHandlerError> {
        let pool = PgPool::connect(database_url)
            .await
            .map_err(PostgresHandlerError::Connection)?;

        Ok(Self {
            pool,
            request_serializer: Self::default_request_serializer(),
            response_serializer: Self::default_response_serializer(),
            instance_id: Uuid::new_v4(),
        })
    }

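    /// Builds a handler from an existing `PgPool`, delegating to `from_pool_provider`.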
    pub async fn from_pool(pool: PgPool) -> Result<Self, PostgresHandlerError> {
        Self::from_pool_provider(pool).await
    }
}

impl<P, TReq, TRes> RequestHandler for PostgresHandler<P, TReq, TRes>
where
    P: PoolProvider,
    TReq: for<'de> Deserialize<'de> + Serialize + Send + Sync + 'static,
    TRes: for<'de> Deserialize<'de> + Serialize + Send + Sync + 'static,
{
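    // Persists a single request row. Serialization failures fall back to the raw
    // body; database errors are counted and logged rather than propagated.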
    #[instrument(name = "outlet.handle_request", skip(self, data), fields(correlation_id = %data.correlation_id))]
    async fn handle_request(&self, data: RequestData) {
        let headers_json = Self::headers_to_json(&data.headers);
        let (body_json, parsed) = if data.body.is_some() {
            let (json, parsed) = self.request_body_to_json_with_fallback(&data);
            (Some(json), parsed)
        } else {
            (None, false)
        };

        let timestamp: DateTime<Utc> = data.timestamp.into();

        let query_start = Instant::now();
        let result = sqlx::query(
            r#"
            INSERT INTO http_requests (instance_id, correlation_id, timestamp, method, uri, headers, body, body_parsed, trace_id, span_id)
            VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
            "#,
        )
        .bind(self.instance_id)
        .bind(data.correlation_id as i64)
        .bind(timestamp)
        .bind(data.method.to_string())
        .bind(data.uri.to_string())
        .bind(headers_json)
        .bind(body_json)
        .bind(parsed)
        .bind(&data.trace_id)
        .bind(&data.span_id)
        .execute(self.pool.write())
        .await;
        let query_duration = query_start.elapsed();
        histogram!("outlet_write_duration_seconds", "operation" => "request")
            .record(query_duration.as_secs_f64());

        if let Err(e) = result {
            counter!("outlet_write_errors_total", "operation" => "request").increment(1);
            error!(correlation_id = %data.correlation_id, error = %e, "Failed to insert request data");
        } else {
            let processing_lag_ms = SystemTime::now()
                .duration_since(data.timestamp)
                .unwrap_or_default()
                .as_millis();
            if processing_lag_ms > 1000 {
                warn!(correlation_id = %data.correlation_id, method = %data.method, uri = %data.uri, lag_ms = %processing_lag_ms, "Request logged (slow)");
            } else {
                debug!(correlation_id = %data.correlation_id, method = %data.method, uri = %data.uri, lag_ms = %processing_lag_ms, "Request logged");
            }
        }
    }

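    // Persists a single response row, but only when the matching request row for
    // this instance has already been written.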
    #[instrument(name = "outlet.handle_response", skip(self, request_data, response_data), fields(correlation_id = %request_data.correlation_id))]
    async fn handle_response(&self, request_data: RequestData, response_data: ResponseData) {
        let headers_json = Self::headers_to_json(&response_data.headers);
        let (body_json, parsed) = if response_data.body.is_some() {
            let (json, parsed) =
                self.response_body_to_json_with_fallback(&request_data, &response_data);
            (Some(json), parsed)
        } else {
            (None, false)
        };

        let timestamp: DateTime<Utc> = response_data.timestamp.into();
        let duration_ms = response_data.duration.as_millis() as i64;
        let duration_to_first_byte_ms = response_data.duration_to_first_byte.as_millis() as i64;

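        // The WHERE EXISTS guard skips the insert entirely when no matching
        // request row exists for this instance and correlation id.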
        let query_start = Instant::now();
        let result = sqlx::query(
            r#"
            INSERT INTO http_responses (instance_id, correlation_id, timestamp, status_code, headers, body, body_parsed, duration_to_first_byte_ms, duration_ms)
            SELECT $1, $2, $3, $4, $5, $6, $7, $8, $9
            WHERE EXISTS (SELECT 1 FROM http_requests WHERE instance_id = $1 AND correlation_id = $2)
            "#,
        )
        .bind(self.instance_id)
        .bind(request_data.correlation_id as i64)
        .bind(timestamp)
        .bind(response_data.status.as_u16() as i32)
        .bind(headers_json)
        .bind(body_json)
        .bind(parsed)
        .bind(duration_to_first_byte_ms)
        .bind(duration_ms)
        .execute(self.pool.write())
        .await;
        let query_duration = query_start.elapsed();
        histogram!("outlet_write_duration_seconds", "operation" => "response")
            .record(query_duration.as_secs_f64());

        match result {
            Err(e) => {
                counter!("outlet_write_errors_total", "operation" => "response").increment(1);
                error!(correlation_id = %request_data.correlation_id, error = %e, "Failed to insert response data");
            }
            Ok(query_result) => {
                if query_result.rows_affected() > 0 {
                    let processing_lag_ms = SystemTime::now()
                        .duration_since(response_data.timestamp)
                        .unwrap_or_default()
                        .as_millis();
                    if processing_lag_ms > 1000 {
                        warn!(correlation_id = %request_data.correlation_id, status = %response_data.status, duration_ms = %duration_ms, lag_ms = %processing_lag_ms, "Response logged (slow)");
                    } else {
                        debug!(correlation_id = %request_data.correlation_id, status = %response_data.status, duration_ms = %duration_ms, lag_ms = %processing_lag_ms, "Response logged");
                    }
                } else {
                    debug!(correlation_id = %request_data.correlation_id, "No matching request found for response, skipping insert")
                }
            }
        }
    }

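    // Bulk-inserts a batch of requests with one statement by building one column
    // vector per field.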
    #[instrument(name = "outlet.handle_request_batch", skip(self, batch), fields(batch_size = batch.len()))]
    async fn handle_request_batch(&self, batch: &[RequestData]) {
        if batch.is_empty() {
            return;
        }

        let len = batch.len();
        let mut instance_ids = Vec::with_capacity(len);
        let mut correlation_ids = Vec::with_capacity(len);
        let mut timestamps = Vec::with_capacity(len);
        let mut methods = Vec::with_capacity(len);
        let mut uris = Vec::with_capacity(len);
        let mut headers_col: Vec<Value> = Vec::with_capacity(len);
        let mut bodies: Vec<Option<Value>> = Vec::with_capacity(len);
        let mut body_parsed_col = Vec::with_capacity(len);
        let mut trace_ids: Vec<Option<String>> = Vec::with_capacity(len);
        let mut span_ids: Vec<Option<String>> = Vec::with_capacity(len);

        for data in batch {
            instance_ids.push(self.instance_id);
            correlation_ids.push(data.correlation_id as i64);
            timestamps.push(DateTime::<Utc>::from(data.timestamp));
            methods.push(data.method.to_string());
            uris.push(data.uri.to_string());
            headers_col.push(Self::headers_to_json(&data.headers));

            let (body_json, parsed) = if data.body.is_some() {
                let (json, parsed) = self.request_body_to_json_with_fallback(data);
                (Some(json), parsed)
            } else {
                (None, false)
            };
            bodies.push(body_json);
            body_parsed_col.push(parsed);
            trace_ids.push(data.trace_id.clone());
            span_ids.push(data.span_id.clone());
        }

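        // Column-wise bulk insert: each Vec binds as a Postgres array and UNNEST
        // expands the arrays into one row per element.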
        let query_start = Instant::now();
        let result = sqlx::query(
            r#"
            INSERT INTO http_requests (instance_id, correlation_id, timestamp, method, uri, headers, body, body_parsed, trace_id, span_id)
            SELECT * FROM UNNEST($1::uuid[], $2::bigint[], $3::timestamptz[], $4::varchar[], $5::text[], $6::jsonb[], $7::jsonb[], $8::boolean[], $9::varchar[], $10::varchar[])
            "#,
        )
        .bind(&instance_ids)
        .bind(&correlation_ids)
        .bind(&timestamps)
        .bind(&methods)
        .bind(&uris)
        .bind(&headers_col)
        .bind(&bodies)
        .bind(&body_parsed_col)
        .bind(&trace_ids)
        .bind(&span_ids)
        .execute(self.pool.write())
        .await;
        let query_duration = query_start.elapsed();
        histogram!("outlet_write_duration_seconds", "operation" => "request_batch")
            .record(query_duration.as_secs_f64());

        match result {
            Ok(r) => {
                debug!(
                    rows = r.rows_affected(),
                    duration_ms = query_duration.as_millis() as u64,
                    "Request batch inserted"
                );
            }
            Err(e) => {
                counter!("outlet_write_errors_total", "operation" => "request_batch").increment(1);
                error!(batch_size = len, error = %e, "Failed to bulk insert request batch");
            }
        }
    }

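    // Bulk-inserts a batch of responses with one UNNEST statement; unlike the
    // single-row path there is no request-existence guard here.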
    #[instrument(name = "outlet.handle_response_batch", skip(self, batch), fields(batch_size = batch.len()))]
    async fn handle_response_batch(&self, batch: &[(RequestData, ResponseData)]) {
        if batch.is_empty() {
            return;
        }

        let len = batch.len();
        let mut instance_ids = Vec::with_capacity(len);
        let mut correlation_ids = Vec::with_capacity(len);
        let mut timestamps = Vec::with_capacity(len);
        let mut status_codes = Vec::with_capacity(len);
        let mut headers_col: Vec<Value> = Vec::with_capacity(len);
        let mut bodies: Vec<Option<Value>> = Vec::with_capacity(len);
        let mut body_parsed_col = Vec::with_capacity(len);
        let mut duration_to_first_byte_ms_col = Vec::with_capacity(len);
        let mut duration_ms_col = Vec::with_capacity(len);

        for (request_data, response_data) in batch {
            instance_ids.push(self.instance_id);
            correlation_ids.push(request_data.correlation_id as i64);
            timestamps.push(DateTime::<Utc>::from(response_data.timestamp));
            status_codes.push(response_data.status.as_u16() as i32);
            headers_col.push(Self::headers_to_json(&response_data.headers));

            let (body_json, parsed) = if response_data.body.is_some() {
                let (json, parsed) =
                    self.response_body_to_json_with_fallback(request_data, response_data);
                (Some(json), parsed)
            } else {
                (None, false)
            };
            bodies.push(body_json);
            body_parsed_col.push(parsed);
            duration_to_first_byte_ms_col
                .push(response_data.duration_to_first_byte.as_millis() as i64);
            duration_ms_col.push(response_data.duration.as_millis() as i64);
        }

        let query_start = Instant::now();
        let result = sqlx::query(
            r#"
            INSERT INTO http_responses (instance_id, correlation_id, timestamp, status_code, headers, body, body_parsed, duration_to_first_byte_ms, duration_ms)
            SELECT * FROM UNNEST($1::uuid[], $2::bigint[], $3::timestamptz[], $4::int[], $5::jsonb[], $6::jsonb[], $7::boolean[], $8::bigint[], $9::bigint[])
            "#,
        )
        .bind(&instance_ids)
        .bind(&correlation_ids)
        .bind(&timestamps)
        .bind(&status_codes)
        .bind(&headers_col)
        .bind(&bodies)
        .bind(&body_parsed_col)
        .bind(&duration_to_first_byte_ms_col)
        .bind(&duration_ms_col)
        .execute(self.pool.write())
        .await;
        let query_duration = query_start.elapsed();
        histogram!("outlet_write_duration_seconds", "operation" => "response_batch")
            .record(query_duration.as_secs_f64());

        match result {
            Ok(r) => {
                debug!(
                    rows = r.rows_affected(),
                    duration_ms = query_duration.as_millis() as u64,
                    "Response batch inserted"
                );
            }
            Err(e) => {
                counter!("outlet_write_errors_total", "operation" => "response_batch").increment(1);
                error!(batch_size = len, error = %e, "Failed to bulk insert response batch");
            }
        }
    }
}

729#[cfg(test)]
730mod tests {
731 use super::*;
732 use bytes::Bytes;
733 use chrono::{DateTime, Utc};
734 use outlet::{RequestData, ResponseData};
735 use serde::{Deserialize, Serialize};
736 use serde_json::Value;
737 use sqlx::PgPool;
738 use std::collections::HashMap;
739 use std::time::{Duration, SystemTime};
740
741 #[derive(Debug, Clone, PartialEq, Deserialize, Serialize)]
742 struct TestRequest {
743 user_id: u64,
744 action: String,
745 }
746
747 #[derive(Debug, Clone, PartialEq, Deserialize, Serialize)]
748 struct TestResponse {
749 success: bool,
750 message: String,
751 }
752
753 fn create_test_request_data() -> RequestData {
754 let mut headers = HashMap::new();
755 headers.insert("content-type".to_string(), vec!["application/json".into()]);
756 headers.insert("user-agent".to_string(), vec!["test-client/1.0".into()]);
757
758 let test_req = TestRequest {
759 user_id: 123,
760 action: "create_user".to_string(),
761 };
762 let body = serde_json::to_vec(&test_req).unwrap();
763
764 RequestData {
765 method: http::Method::POST,
766 uri: http::Uri::from_static("/api/users"),
767 headers,
768 body: Some(Bytes::from(body)),
769 timestamp: SystemTime::now(),
770 correlation_id: 0,
771 trace_id: None,
772 span_id: None,
773 }
774 }
775
776 fn create_test_response_data() -> ResponseData {
777 let mut headers = HashMap::new();
778 headers.insert("content-type".to_string(), vec!["application/json".into()]);
779
780 let test_res = TestResponse {
781 success: true,
782 message: "User created successfully".to_string(),
783 };
784 let body = serde_json::to_vec(&test_res).unwrap();
785
786 ResponseData {
787 status: http::StatusCode::CREATED,
788 headers,
789 body: Some(Bytes::from(body)),
790 timestamp: SystemTime::now(),
791 duration_to_first_byte: Duration::from_millis(100),
792 duration: Duration::from_millis(150),
793 correlation_id: 0,
794 }
795 }
796
    #[sqlx::test]
    async fn test_handler_creation(pool: PgPool) {
        crate::migrator().run(&pool).await.unwrap();

        let handler = PostgresHandler::<PgPool, TestRequest, TestResponse>::from_pool(pool.clone())
            .await
            .unwrap();

        let repository = handler.repository();

        let filter = RequestFilter::default();
        let results = repository.query(filter).await.unwrap();
        assert!(results.is_empty());
    }

    #[sqlx::test]
    async fn test_handle_request_with_typed_body(pool: PgPool) {
        crate::migrator().run(&pool).await.unwrap();

        let handler = PostgresHandler::<PgPool, TestRequest, TestResponse>::from_pool(pool.clone())
            .await
            .unwrap();
        let repository = handler.repository();

        let mut request_data = create_test_request_data();
        let correlation_id = 12345;
        request_data.correlation_id = correlation_id;

        handler.handle_request(request_data.clone()).await;

        let filter = RequestFilter {
            correlation_id: Some(correlation_id as i64),
            ..Default::default()
        };
        let results = repository.query(filter).await.unwrap();

        assert_eq!(results.len(), 1);
        let pair = &results[0];

        assert_eq!(pair.request.correlation_id, correlation_id as i64);
        assert_eq!(pair.request.method, "POST");
        assert_eq!(pair.request.uri, "/api/users");

        match &pair.request.body {
            Some(Ok(parsed_body)) => {
                assert_eq!(
                    *parsed_body,
                    TestRequest {
                        user_id: 123,
                        action: "create_user".to_string(),
                    }
                );
            }
            _ => panic!("Expected successfully parsed request body"),
        }

        let headers_value = &pair.request.headers;
        assert!(headers_value.get("content-type").is_some());
        assert!(headers_value.get("user-agent").is_some());

        assert!(pair.response.is_none());
    }

    #[sqlx::test]
    async fn test_handle_response_with_typed_body(pool: PgPool) {
        crate::migrator().run(&pool).await.unwrap();

        let handler = PostgresHandler::<PgPool, TestRequest, TestResponse>::from_pool(pool.clone())
            .await
            .unwrap();
        let repository = handler.repository();

        let mut request_data = create_test_request_data();
        let mut response_data = create_test_response_data();
        let correlation_id = 54321;
        request_data.correlation_id = correlation_id;
        response_data.correlation_id = correlation_id;

        handler.handle_request(request_data.clone()).await;
        handler
            .handle_response(request_data, response_data.clone())
            .await;

        let filter = RequestFilter {
            correlation_id: Some(correlation_id as i64),
            ..Default::default()
        };
        let results = repository.query(filter).await.unwrap();

        assert_eq!(results.len(), 1);
        let pair = &results[0];

        let response = pair.response.as_ref().expect("Response should be present");
        assert_eq!(response.correlation_id, correlation_id as i64);
        assert_eq!(response.status_code, 201);
        assert_eq!(response.duration_ms, 150);

        match &response.body {
            Some(Ok(parsed_body)) => {
                assert_eq!(
                    *parsed_body,
                    TestResponse {
                        success: true,
                        message: "User created successfully".to_string(),
                    }
                );
            }
            _ => panic!("Expected successfully parsed response body"),
        }
    }

    #[sqlx::test]
    async fn test_handle_unparseable_body_fallback(pool: PgPool) {
        crate::migrator().run(&pool).await.unwrap();

        let handler = PostgresHandler::<PgPool, TestRequest, TestResponse>::from_pool(pool.clone())
            .await
            .unwrap();
        let repository = handler.repository();

        let mut headers = HashMap::new();
        headers.insert("content-type".to_string(), vec!["text/plain".into()]);

        let invalid_json_body = b"not valid json for TestRequest";
        let correlation_id = 99999;
        let request_data = RequestData {
            method: http::Method::POST,
            uri: http::Uri::from_static("/api/test"),
            headers,
            body: Some(Bytes::from(invalid_json_body.to_vec())),
            timestamp: SystemTime::now(),
            correlation_id,
            trace_id: None,
            span_id: None,
        };

        handler.handle_request(request_data).await;

        let filter = RequestFilter {
            correlation_id: Some(correlation_id as i64),
            ..Default::default()
        };
        let results = repository.query(filter).await.unwrap();

        assert_eq!(results.len(), 1);
        let pair = &results[0];

        match &pair.request.body {
            Some(Err(raw_bytes)) => {
                assert_eq!(raw_bytes.as_ref(), invalid_json_body);
            }
            _ => panic!("Expected raw bytes fallback for unparseable body"),
        }
    }

    #[sqlx::test]
    async fn test_query_with_multiple_filters(pool: PgPool) {
        crate::migrator().run(&pool).await.unwrap();

        let handler = PostgresHandler::<PgPool, Value, Value>::from_pool(pool.clone())
            .await
            .unwrap();
        let repository = handler.repository();

        let test_cases = vec![
            (1001, "GET", "/api/users", 200, 100),
            (1002, "POST", "/api/users", 201, 150),
            (1003, "GET", "/api/orders", 404, 50),
            (1004, "PUT", "/api/users/123", 200, 300),
        ];

        for (correlation_id, method, uri, status, duration_ms) in test_cases {
            let mut headers = HashMap::new();
            headers.insert("content-type".to_string(), vec!["application/json".into()]);

            let request_data = RequestData {
                method: method.parse().unwrap(),
                uri: uri.parse().unwrap(),
                headers: headers.clone(),
                body: Some(Bytes::from(b"{}".to_vec())),
                timestamp: SystemTime::now(),
                correlation_id,
                trace_id: None,
                span_id: None,
            };

            let response_data = ResponseData {
                correlation_id,
                status: http::StatusCode::from_u16(status).unwrap(),
                headers,
                body: Some(Bytes::from(b"{}".to_vec())),
                timestamp: SystemTime::now(),
                duration_to_first_byte: Duration::from_millis(duration_ms / 2),
                duration: Duration::from_millis(duration_ms),
            };

            handler.handle_request(request_data.clone()).await;
            handler.handle_response(request_data, response_data).await;
        }

        let filter = RequestFilter {
            method: Some("GET".to_string()),
            ..Default::default()
        };
        let results = repository.query(filter).await.unwrap();
        assert_eq!(results.len(), 2);

        let filter = RequestFilter {
            status_code: Some(200),
            ..Default::default()
        };
        let results = repository.query(filter).await.unwrap();
        assert_eq!(results.len(), 2);

        let filter = RequestFilter {
            uri_pattern: Some("/api/users%".to_string()),
            ..Default::default()
        };
        let results = repository.query(filter).await.unwrap();
        assert_eq!(results.len(), 3);

        let filter = RequestFilter {
            min_duration_ms: Some(100),
            max_duration_ms: Some(200),
            ..Default::default()
        };
        let results = repository.query(filter).await.unwrap();
        assert_eq!(results.len(), 2);

        let filter = RequestFilter {
            method: Some("GET".to_string()),
            status_code: Some(200),
            ..Default::default()
        };
        let results = repository.query(filter).await.unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].request.correlation_id, 1001);
    }

    #[sqlx::test]
    async fn test_query_with_pagination_and_ordering(pool: PgPool) {
        crate::migrator().run(&pool).await.unwrap();

        let handler = PostgresHandler::<PgPool, Value, Value>::from_pool(pool.clone())
            .await
            .unwrap();
        let repository = handler.repository();

        let now = SystemTime::now();
        for i in 0..5 {
            let correlation_id = 2000 + i;
            let timestamp = now + Duration::from_secs(i * 10);

            let mut headers = HashMap::new();
            headers.insert("x-test-id".to_string(), vec![i.to_string().into()]);

            let request_data = RequestData {
                method: http::Method::GET,
                uri: "/api/test".parse().unwrap(),
                headers,
                body: Some(Bytes::from(format!("{{\"id\": {i}}}").into_bytes())),
                timestamp,
                correlation_id,
                trace_id: None,
                span_id: None,
            };

            handler.handle_request(request_data).await;
        }

        let filter = RequestFilter {
            limit: Some(3),
            ..Default::default()
        };
        let results = repository.query(filter).await.unwrap();
        assert_eq!(results.len(), 3);

        for i in 0..2 {
            assert!(results[i].request.timestamp <= results[i + 1].request.timestamp);
        }

        let filter = RequestFilter {
            order_by_timestamp_desc: true,
            limit: Some(2),
            offset: Some(1),
            ..Default::default()
        };
        let results = repository.query(filter).await.unwrap();
        assert_eq!(results.len(), 2);

        assert!(results[0].request.timestamp >= results[1].request.timestamp);
    }

    #[sqlx::test]
    async fn test_headers_conversion(pool: PgPool) {
        crate::migrator().run(&pool).await.unwrap();

        let handler = PostgresHandler::<PgPool, Value, Value>::from_pool(pool.clone())
            .await
            .unwrap();
        let repository = handler.repository();

        let mut headers = HashMap::new();
        headers.insert("single-value".to_string(), vec!["test".into()]);
        headers.insert(
            "multi-value".to_string(),
            vec!["val1".into(), "val2".into()],
        );
        headers.insert("empty-value".to_string(), vec!["".into()]);

        let request_data = RequestData {
            correlation_id: 3000,
            method: http::Method::GET,
            uri: "/test".parse().unwrap(),
            headers,
            body: None,
            timestamp: SystemTime::now(),
            trace_id: None,
            span_id: None,
        };

        let correlation_id = 3000;
        handler.handle_request(request_data).await;

        let filter = RequestFilter {
            correlation_id: Some(correlation_id as i64),
            ..Default::default()
        };
        let results = repository.query(filter).await.unwrap();

        assert_eq!(results.len(), 1);
        let headers_json = &results[0].request.headers;

        assert_eq!(
            headers_json["single-value"],
            Value::String("test".to_string())
        );

        match &headers_json["multi-value"] {
            Value::Array(arr) => {
                assert_eq!(arr.len(), 2);
                assert_eq!(arr[0], Value::String("val1".to_string()));
                assert_eq!(arr[1], Value::String("val2".to_string()));
            }
            _ => panic!("Expected array for multi-value header"),
        }

        assert_eq!(headers_json["empty-value"], Value::String("".to_string()));
    }

    #[sqlx::test]
    async fn test_timestamp_filtering(pool: PgPool) {
        crate::migrator().run(&pool).await.unwrap();

        let handler = PostgresHandler::<PgPool, Value, Value>::from_pool(pool.clone())
            .await
            .unwrap();
        let repository = handler.repository();

        let base_time = SystemTime::UNIX_EPOCH + Duration::from_secs(1_600_000_000);
        let times = [
            base_time + Duration::from_secs(0),
            base_time + Duration::from_secs(3600),
            base_time + Duration::from_secs(7200),
        ];

        for (i, timestamp) in times.iter().enumerate() {
            let correlation_id = 4001 + i as u64;
            let request_data = RequestData {
                method: http::Method::GET,
                uri: "/test".parse().unwrap(),
                headers: HashMap::new(),
                body: None,
                timestamp: *timestamp,
                correlation_id,
                trace_id: None,
                span_id: None,
            };

            handler.handle_request(request_data).await;
        }

        let after_time: DateTime<Utc> = (base_time + Duration::from_secs(1800)).into();
        let filter = RequestFilter {
            timestamp_after: Some(after_time),
            ..Default::default()
        };
        let results = repository.query(filter).await.unwrap();
        assert_eq!(results.len(), 2);

        let before_time: DateTime<Utc> = (base_time + Duration::from_secs(5400)).into();
        let filter = RequestFilter {
            timestamp_before: Some(before_time),
            ..Default::default()
        };
        let results = repository.query(filter).await.unwrap();
        assert_eq!(results.len(), 2);

        let filter = RequestFilter {
            timestamp_after: Some(after_time),
            timestamp_before: Some(before_time),
            ..Default::default()
        };
        let results = repository.query(filter).await.unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].request.correlation_id, 4002);
    }

    #[sqlx::test]
    async fn test_no_path_filtering_logs_everything(pool: PgPool) {
        crate::migrator().run(&pool).await.unwrap();

        let handler = PostgresHandler::<PgPool, Value, Value>::from_pool(pool.clone())
            .await
            .unwrap();
        let repository = handler.repository();

        let test_uris = ["/api/users", "/health", "/metrics", "/random/path"];
        for (i, uri) in test_uris.iter().enumerate() {
            let correlation_id = 3000 + i as u64;
            let mut headers = HashMap::new();
            headers.insert("content-type".to_string(), vec!["application/json".into()]);

            let request_data = RequestData {
                method: http::Method::GET,
                uri: uri.parse().unwrap(),
                headers,
                body: Some(Bytes::from(b"{}".to_vec())),
                timestamp: SystemTime::now(),
                correlation_id,
                trace_id: None,
                span_id: None,
            };

            handler.handle_request(request_data).await;
        }

        let filter = RequestFilter::default();
        let results = repository.query(filter).await.unwrap();
        assert_eq!(results.len(), 4);
    }

    #[sqlx::test]
    async fn test_write_operations_use_write_pool(pool: PgPool) {
        crate::migrator().run(&pool).await.unwrap();

        let test_pools = crate::TestDbPools::new(pool).await.unwrap();
        let handler = PostgresHandler::<_, Value, Value>::from_pool_provider(test_pools.clone())
            .await
            .unwrap();

        let mut request_data = create_test_request_data();
        let correlation_id = 5001;
        request_data.correlation_id = correlation_id;

        handler.handle_request(request_data.clone()).await;

        let count: i64 =
            sqlx::query_scalar("SELECT COUNT(*) FROM http_requests WHERE correlation_id = $1")
                .bind(correlation_id as i64)
                .fetch_one(test_pools.write())
                .await
                .unwrap();

        assert_eq!(count, 1, "Request should be written to primary pool");
    }

    #[sqlx::test]
    async fn test_response_write_uses_write_pool(pool: PgPool) {
        crate::migrator().run(&pool).await.unwrap();

        let test_pools = crate::TestDbPools::new(pool).await.unwrap();
        let handler = PostgresHandler::<_, Value, Value>::from_pool_provider(test_pools.clone())
            .await
            .unwrap();

        let mut request_data = create_test_request_data();
        let mut response_data = create_test_response_data();
        let correlation_id = 5002;
        request_data.correlation_id = correlation_id;
        response_data.correlation_id = correlation_id;

        handler.handle_request(request_data.clone()).await;

        handler.handle_response(request_data, response_data).await;

        let count: i64 =
            sqlx::query_scalar("SELECT COUNT(*) FROM http_responses WHERE correlation_id = $1")
                .bind(correlation_id as i64)
                .fetch_one(test_pools.write())
                .await
                .unwrap();

        assert_eq!(count, 1, "Response should be written to primary pool");
    }

    #[sqlx::test]
    async fn test_repository_queries_use_read_pool(pool: PgPool) {
        crate::migrator().run(&pool).await.unwrap();

        let test_pools = crate::TestDbPools::new(pool).await.unwrap();
        let handler = PostgresHandler::<_, Value, Value>::from_pool_provider(test_pools.clone())
            .await
            .unwrap();

        let mut request_data = create_test_request_data();
        let correlation_id = 5003;
        request_data.correlation_id = correlation_id;
        handler.handle_request(request_data).await;

        let repository = handler.repository();
        let filter = RequestFilter {
            correlation_id: Some(correlation_id as i64),
            ..Default::default()
        };

        let results = repository.query(filter).await.unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].request.correlation_id, correlation_id as i64);
    }

    #[sqlx::test]
    async fn test_replica_pool_rejects_writes(pool: PgPool) {
        crate::migrator().run(&pool).await.unwrap();

        let test_pools = crate::TestDbPools::new(pool).await.unwrap();

        let result = sqlx::query("INSERT INTO http_requests (instance_id, correlation_id, timestamp, method, uri, headers, body, body_parsed) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)")
            .bind(Uuid::new_v4())
            .bind(9999i64)
            .bind(Utc::now())
            .bind("GET")
            .bind("/test")
            .bind(serde_json::json!({}))
            .bind(None::<Value>)
            .bind(false)
            .execute(test_pools.read())
            .await;

        assert!(
            result.is_err(),
            "Replica pool should reject write operations"
        );

        let err = result.unwrap_err();
        let err_msg = err.to_string().to_lowercase();
        assert!(
            err_msg.contains("read-only") || err_msg.contains("read only"),
            "Error should mention read-only: {}",
            err
        );
    }

    #[sqlx::test]
    async fn test_full_request_response_cycle_with_read_write_separation(pool: PgPool) {
        crate::migrator().run(&pool).await.unwrap();

        let test_pools = crate::TestDbPools::new(pool).await.unwrap();
        let handler =
            PostgresHandler::<_, TestRequest, TestResponse>::from_pool_provider(test_pools)
                .await
                .unwrap();

        let mut request_data = create_test_request_data();
        let mut response_data = create_test_response_data();
        let correlation_id = 5004;
        request_data.correlation_id = correlation_id;
        response_data.correlation_id = correlation_id;

        handler.handle_request(request_data.clone()).await;
        handler.handle_response(request_data, response_data).await;

        let repository = handler.repository();
        let filter = RequestFilter {
            correlation_id: Some(correlation_id as i64),
            ..Default::default()
        };

        let results = repository.query(filter).await.unwrap();
        assert_eq!(results.len(), 1);

        let pair = &results[0];
        assert_eq!(pair.request.correlation_id, correlation_id as i64);
        assert_eq!(pair.request.method, "POST");
        assert_eq!(pair.request.uri, "/api/users");

        let response = pair.response.as_ref().expect("Response should exist");
        assert_eq!(response.correlation_id, correlation_id as i64);
        assert_eq!(response.status_code, 201);

        match &pair.request.body {
            Some(Ok(parsed_body)) => {
                assert_eq!(
                    *parsed_body,
                    TestRequest {
                        user_id: 123,
                        action: "create_user".to_string(),
                    }
                );
            }
            _ => panic!("Expected successfully parsed request body"),
        }

        match &response.body {
            Some(Ok(parsed_body)) => {
                assert_eq!(
                    *parsed_body,
                    TestResponse {
                        success: true,
                        message: "User created successfully".to_string(),
                    }
                );
            }
            _ => panic!("Expected successfully parsed response body"),
        }
    }

    #[sqlx::test]
    async fn test_request_batch_insert(pool: PgPool) {
        crate::migrator().run(&pool).await.unwrap();
        let handler = PostgresHandler::<PgPool, TestRequest, TestResponse>::from_pool(pool.clone())
            .await
            .unwrap();

        let mut batch = Vec::new();
        for i in 0..5 {
            let mut req = create_test_request_data();
            req.correlation_id = 1000 + i;
            req.uri = format!("/api/batch/{i}").parse().unwrap();
            batch.push(req);
        }

        handler.handle_request_batch(&batch).await;

        let count: (i64,) = sqlx::query_as(
            "SELECT COUNT(*) FROM http_requests WHERE correlation_id BETWEEN 1000 AND 1004",
        )
        .fetch_one(&pool)
        .await
        .unwrap();
        assert_eq!(count.0, 5);
    }

    #[sqlx::test]
    async fn test_response_batch_insert(pool: PgPool) {
        crate::migrator().run(&pool).await.unwrap();
        let handler = PostgresHandler::<PgPool, TestRequest, TestResponse>::from_pool(pool.clone())
            .await
            .unwrap();

        let mut pairs = Vec::new();
        for i in 0..3 {
            let mut req = create_test_request_data();
            req.correlation_id = 2000 + i;
            handler.handle_request(req.clone()).await;

            let mut res = create_test_response_data();
            res.correlation_id = 2000 + i;
            pairs.push((req, res));
        }

        handler.handle_response_batch(&pairs).await;

        let count: (i64,) = sqlx::query_as(
            "SELECT COUNT(*) FROM http_responses WHERE correlation_id BETWEEN 2000 AND 2002",
        )
        .fetch_one(&pool)
        .await
        .unwrap();
        assert_eq!(count.0, 3);
    }

    #[sqlx::test]
    async fn test_batch_with_mixed_bodies(pool: PgPool) {
        crate::migrator().run(&pool).await.unwrap();
        let handler = PostgresHandler::<PgPool, TestRequest, TestResponse>::from_pool(pool.clone())
            .await
            .unwrap();

        let mut batch = Vec::new();

        let mut req_with_body = create_test_request_data();
        req_with_body.correlation_id = 3000;
        batch.push(req_with_body);

        let mut req_no_body = create_test_request_data();
        req_no_body.correlation_id = 3001;
        req_no_body.body = None;
        batch.push(req_no_body);

        let mut req_bad_body = create_test_request_data();
        req_bad_body.correlation_id = 3002;
        req_bad_body.body = Some(Bytes::from("not valid json"));
        batch.push(req_bad_body);

        handler.handle_request_batch(&batch).await;

        let count: (i64,) = sqlx::query_as(
            "SELECT COUNT(*) FROM http_requests WHERE correlation_id BETWEEN 3000 AND 3002",
        )
        .fetch_one(&pool)
        .await
        .unwrap();
        assert_eq!(count.0, 3);

        let rows: Vec<(i64, Option<bool>)> = sqlx::query_as(
            "SELECT correlation_id, body_parsed FROM http_requests WHERE correlation_id BETWEEN 3000 AND 3002 ORDER BY correlation_id",
        )
        .fetch_all(&pool)
        .await
        .unwrap();

        assert_eq!(rows[0].1, Some(true));
        assert_eq!(rows[1].1, Some(false));
        assert_eq!(rows[2].1, Some(false));
    }

    #[sqlx::test]
    async fn test_empty_batch_is_noop(pool: PgPool) {
        crate::migrator().run(&pool).await.unwrap();
        let handler = PostgresHandler::<PgPool, TestRequest, TestResponse>::from_pool(pool.clone())
            .await
            .unwrap();

        handler.handle_request_batch(&[]).await;
        handler.handle_response_batch(&[]).await;
    }

    #[sqlx::test]
    async fn test_batch_write_uses_write_pool(pool: PgPool) {
        use sqlx_pool_router::TestDbPools;
        crate::migrator().run(&pool).await.unwrap();
        let test_pools = TestDbPools::new(pool).await.unwrap();
        let handler =
            PostgresHandler::<TestDbPools, TestRequest, TestResponse>::from_pool_provider(
                test_pools,
            )
            .await
            .unwrap();

        let mut req = create_test_request_data();
        req.correlation_id = 4000;
        handler.handle_request_batch(&[req.clone()]).await;

        let res = create_test_response_data();
        handler.handle_response_batch(&[(req, res)]).await;
    }
}