/// Error returned by a body serializer when a request or response body
/// cannot be turned into its typed representation.
///
/// It pairs the underlying failure with the text that should be stored in
/// place of the typed value.
#[derive(Debug)]
pub struct SerializationError {
    /// Text to persist instead of the typed body.
    pub fallback_data: String,
    /// The failure that prevented conversion, boxed so any error type fits.
    pub error: Box<dyn std::error::Error + Send + Sync>,
}

impl SerializationError {
    /// Creates a `SerializationError` from the fallback text and the error
    /// that caused the failure. The error is boxed on construction.
    pub fn new(
        fallback_data: String,
        error: impl std::error::Error + Send + Sync + 'static,
    ) -> Self {
        let error: Box<dyn std::error::Error + Send + Sync> = Box::new(error);
        Self {
            fallback_data,
            error,
        }
    }
}

impl std::fmt::Display for SerializationError {
    /// Writes `Serialization failed: ` followed by the wrapped error's message.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_fmt(format_args!("Serialization failed: {}", self.error))
    }
}

impl std::error::Error for SerializationError {
    /// Reports the wrapped error as the source of this one.
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        // Unsizing coercion drops the `Send + Sync` auto traits from the object type.
        let inner: &(dyn std::error::Error + 'static) = &*self.error;
        Some(inner)
    }
}
83
84use chrono::{DateTime, Utc};
85use outlet::{RequestData, RequestHandler, ResponseData};
86use serde::{Deserialize, Serialize};
87use serde_json::Value;
88use sqlx::PgPool;
89use std::collections::HashMap;
90use std::sync::Arc;
91use std::time::{Instant, SystemTime};
92use tracing::{debug, error, instrument, warn};
93use uuid::Uuid;
94use metrics::{counter, histogram};
95
96pub mod error;
97pub mod repository;
98pub use error::PostgresHandlerError;
99pub use repository::{
100 HttpRequest, HttpResponse, RequestFilter, RequestRepository, RequestResponsePair,
101};
102
/// Returns the migrator for this crate's database schema.
///
/// The migrator is produced by `sqlx::migrate!` from the `./migrations`
/// directory. Callers apply it to a pool, e.g. `migrator().run(&pool).await`.
pub fn migrator() -> sqlx::migrate::Migrator {
    sqlx::migrate!("./migrations")
}
129
/// Shared, thread-safe callback that converts a captured request into the
/// typed body `T`, or reports a [`SerializationError`] carrying fallback text.
type RequestSerializer<T> =
    Arc<dyn Fn(&outlet::RequestData) -> Result<T, SerializationError> + Send + Sync>;
136
/// Shared, thread-safe callback that converts a captured response into the
/// typed body `T`, or reports a [`SerializationError`] carrying fallback text.
///
/// The originating request is passed as the first argument so a serializer
/// can take it into account when interpreting the response.
type ResponseSerializer<T> = Arc<
    dyn Fn(&outlet::RequestData, &outlet::ResponseData) -> Result<T, SerializationError>
        + Send
        + Sync,
>;
147
148#[derive(Clone)]
157pub struct PostgresHandler<TReq = Value, TRes = Value>
158where
159 TReq: for<'de> Deserialize<'de> + Serialize + Send + Sync + 'static,
160 TRes: for<'de> Deserialize<'de> + Serialize + Send + Sync + 'static,
161{
162 pool: PgPool,
163 request_serializer: RequestSerializer<TReq>,
164 response_serializer: ResponseSerializer<TRes>,
165 instance_id: Uuid,
166}
167
168impl<TReq, TRes> PostgresHandler<TReq, TRes>
169where
170 TReq: for<'de> Deserialize<'de> + Serialize + Send + Sync + 'static,
171 TRes: for<'de> Deserialize<'de> + Serialize + Send + Sync + 'static,
172{
173 fn default_request_serializer() -> RequestSerializer<TReq> {
176 Arc::new(|request_data| {
177 let bytes = request_data.body.as_deref().unwrap_or(&[]);
178 serde_json::from_slice::<TReq>(bytes).map_err(|error| {
179 let fallback_data = String::from_utf8_lossy(bytes).to_string();
180 SerializationError::new(fallback_data, error)
181 })
182 })
183 }
184
185 fn default_response_serializer() -> ResponseSerializer<TRes> {
188 Arc::new(|_request_data, response_data| {
189 let bytes = response_data.body.as_deref().unwrap_or(&[]);
190 serde_json::from_slice::<TRes>(bytes).map_err(|error| {
191 let fallback_data = String::from_utf8_lossy(bytes).to_string();
192 SerializationError::new(fallback_data, error)
193 })
194 })
195 }
196
197 pub async fn new(database_url: &str) -> Result<Self, PostgresHandlerError> {
231 let pool = PgPool::connect(database_url)
232 .await
233 .map_err(PostgresHandlerError::Connection)?;
234
235 Ok(Self {
236 pool,
237 request_serializer: Self::default_request_serializer(),
238 response_serializer: Self::default_response_serializer(),
239 instance_id: Uuid::new_v4(),
240 })
241 }
242
243 pub fn with_request_serializer<F>(mut self, serializer: F) -> Self
255 where
256 F: Fn(&outlet::RequestData) -> Result<TReq, SerializationError> + Send + Sync + 'static,
257 {
258 self.request_serializer = Arc::new(serializer);
259 self
260 }
261
262 pub fn with_response_serializer<F>(mut self, serializer: F) -> Self
274 where
275 F: Fn(&outlet::RequestData, &outlet::ResponseData) -> Result<TRes, SerializationError>
276 + Send
277 + Sync
278 + 'static,
279 {
280 self.response_serializer = Arc::new(serializer);
281 self
282 }
283
284 pub async fn from_pool(pool: PgPool) -> Result<Self, PostgresHandlerError> {
319 Ok(Self {
320 pool,
321 request_serializer: Self::default_request_serializer(),
322 response_serializer: Self::default_response_serializer(),
323 instance_id: Uuid::new_v4(),
324 })
325 }
326
327 fn headers_to_json(headers: &HashMap<String, Vec<bytes::Bytes>>) -> Value {
329 let mut header_map = HashMap::new();
330 for (name, values) in headers {
331 if values.len() == 1 {
332 let value_str = String::from_utf8_lossy(&values[0]).to_string();
333 header_map.insert(name.clone(), Value::String(value_str));
334 } else {
335 let value_array: Vec<Value> = values
336 .iter()
337 .map(|v| Value::String(String::from_utf8_lossy(v).to_string()))
338 .collect();
339 header_map.insert(name.clone(), Value::Array(value_array));
340 }
341 }
342 serde_json::to_value(header_map).unwrap_or(Value::Null)
343 }
344
345 fn request_body_to_json_with_fallback(
347 &self,
348 request_data: &outlet::RequestData,
349 ) -> (Value, bool) {
350 match (self.request_serializer)(request_data) {
351 Ok(typed_value) => {
352 if let Ok(json_value) = serde_json::to_value(&typed_value) {
353 (json_value, true)
354 } else {
355 (
357 Value::String(
358 serde_json::to_string(&typed_value)
359 .expect("Serialized value must be convertible to JSON string"),
360 ),
361 false,
362 )
363 }
364 }
365 Err(serialization_error) => (Value::String(serialization_error.fallback_data), false),
366 }
367 }
368
369 fn response_body_to_json_with_fallback(
371 &self,
372 request_data: &outlet::RequestData,
373 response_data: &outlet::ResponseData,
374 ) -> (Value, bool) {
375 match (self.response_serializer)(request_data, response_data) {
376 Ok(typed_value) => {
377 if let Ok(json_value) = serde_json::to_value(&typed_value) {
378 (json_value, true)
379 } else {
380 (
382 Value::String(
383 serde_json::to_string(&typed_value)
384 .expect("Serialized value must be convertible to JSON string"),
385 ),
386 false,
387 )
388 }
389 }
390 Err(serialization_error) => (Value::String(serialization_error.fallback_data), false),
391 }
392 }
393
394 pub fn repository(&self) -> crate::repository::RequestRepository<TReq, TRes> {
399 crate::repository::RequestRepository::new(self.pool.clone())
400 }
401}
402
403impl<TReq, TRes> RequestHandler for PostgresHandler<TReq, TRes>
404where
405 TReq: for<'de> Deserialize<'de> + Serialize + Send + Sync + 'static,
406 TRes: for<'de> Deserialize<'de> + Serialize + Send + Sync + 'static,
407{
408 #[instrument(skip(self, data), fields(correlation_id = %data.correlation_id))]
409 async fn handle_request(&self, data: RequestData) {
410 let headers_json = Self::headers_to_json(&data.headers);
411 let (body_json, parsed) = if data.body.is_some() {
412 let (json, parsed) = self.request_body_to_json_with_fallback(&data);
413 (Some(json), parsed)
414 } else {
415 (None, false)
416 };
417
418 let timestamp: DateTime<Utc> = data.timestamp.into();
419
420 let query_start = Instant::now();
421 let result = sqlx::query(
422 r#"
423 INSERT INTO http_requests (instance_id, correlation_id, timestamp, method, uri, headers, body, body_parsed)
424 VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
425 "#,
426 )
427 .bind(self.instance_id)
428 .bind(data.correlation_id as i64)
429 .bind(timestamp)
430 .bind(data.method.to_string())
431 .bind(data.uri.to_string())
432 .bind(headers_json)
433 .bind(body_json)
434 .bind(parsed)
435 .execute(&self.pool)
436 .await;
437 let query_duration = query_start.elapsed();
438 histogram!("outlet_write_duration_seconds", "operation" => "request").record(query_duration.as_secs_f64());
439
440 if let Err(e) = result {
441 counter!("outlet_write_errors_total", "operation" => "request").increment(1);
442 error!(correlation_id = %data.correlation_id, error = %e, "Failed to insert request data");
443 } else {
444 let processing_lag_ms = SystemTime::now()
445 .duration_since(data.timestamp)
446 .unwrap_or_default()
447 .as_millis();
448 if processing_lag_ms > 1000 {
449 warn!(correlation_id = %data.correlation_id, method = %data.method, uri = %data.uri, lag_ms = %processing_lag_ms, "Request logged (slow)");
450 } else {
451 debug!(correlation_id = %data.correlation_id, method = %data.method, uri = %data.uri, lag_ms = %processing_lag_ms, "Request logged");
452 }
453 }
454 }
455
456 #[instrument(skip(self, request_data, response_data), fields(correlation_id = %request_data.correlation_id))]
457 async fn handle_response(&self, request_data: RequestData, response_data: ResponseData) {
458 let headers_json = Self::headers_to_json(&response_data.headers);
459 let (body_json, parsed) = if response_data.body.is_some() {
460 let (json, parsed) =
461 self.response_body_to_json_with_fallback(&request_data, &response_data);
462 (Some(json), parsed)
463 } else {
464 (None, false)
465 };
466
467 let timestamp: DateTime<Utc> = response_data.timestamp.into();
468 let duration_ms = response_data.duration.as_millis() as i64;
469 let duration_to_first_byte_ms = response_data.duration_to_first_byte.as_millis() as i64;
470
471 let query_start = Instant::now();
472 let result = sqlx::query(
473 r#"
474 INSERT INTO http_responses (instance_id, correlation_id, timestamp, status_code, headers, body, body_parsed, duration_to_first_byte_ms, duration_ms)
475 SELECT $1, $2, $3, $4, $5, $6, $7, $8, $9
476 WHERE EXISTS (SELECT 1 FROM http_requests WHERE instance_id = $1 AND correlation_id = $2)
477 "#,
478 )
479 .bind(self.instance_id)
480 .bind(request_data.correlation_id as i64)
481 .bind(timestamp)
482 .bind(response_data.status.as_u16() as i32)
483 .bind(headers_json)
484 .bind(body_json)
485 .bind(parsed)
486 .bind(duration_to_first_byte_ms)
487 .bind(duration_ms)
488 .execute(&self.pool)
489 .await;
490 let query_duration = query_start.elapsed();
491 histogram!("outlet_write_duration_seconds", "operation" => "response").record(query_duration.as_secs_f64());
492
493 match result {
494 Err(e) => {
495 counter!("outlet_write_errors_total", "operation" => "response").increment(1);
496 error!(correlation_id = %request_data.correlation_id, error = %e, "Failed to insert response data");
497 }
498 Ok(query_result) => {
499 if query_result.rows_affected() > 0 {
500 let processing_lag_ms = SystemTime::now()
501 .duration_since(response_data.timestamp)
502 .unwrap_or_default()
503 .as_millis();
504 if processing_lag_ms > 1000 {
505 warn!(correlation_id = %request_data.correlation_id, status = %response_data.status, duration_ms = %duration_ms, lag_ms = %processing_lag_ms, "Response logged (slow)");
506 } else {
507 debug!(correlation_id = %request_data.correlation_id, status = %response_data.status, duration_ms = %duration_ms, lag_ms = %processing_lag_ms, "Response logged");
508 }
509 } else {
510 debug!(correlation_id = %request_data.correlation_id, "No matching request found for response, skipping insert")
511 }
512 }
513 }
514 }
515}
516
517#[cfg(test)]
518mod tests {
519 use super::*;
520 use bytes::Bytes;
521 use chrono::{DateTime, Utc};
522 use outlet::{RequestData, ResponseData};
523 use serde::{Deserialize, Serialize};
524 use serde_json::Value;
525 use sqlx::PgPool;
526 use std::collections::HashMap;
527 use std::time::{Duration, SystemTime};
528
    /// Typed request body used by the tests in this module.
    #[derive(Debug, Clone, PartialEq, Deserialize, Serialize)]
    struct TestRequest {
        user_id: u64,
        action: String,
    }
534
    /// Typed response body used by the tests in this module.
    #[derive(Debug, Clone, PartialEq, Deserialize, Serialize)]
    struct TestResponse {
        success: bool,
        message: String,
    }
540
541 fn create_test_request_data() -> RequestData {
542 let mut headers = HashMap::new();
543 headers.insert("content-type".to_string(), vec!["application/json".into()]);
544 headers.insert("user-agent".to_string(), vec!["test-client/1.0".into()]);
545
546 let test_req = TestRequest {
547 user_id: 123,
548 action: "create_user".to_string(),
549 };
550 let body = serde_json::to_vec(&test_req).unwrap();
551
552 RequestData {
553 method: http::Method::POST,
554 uri: http::Uri::from_static("/api/users"),
555 headers,
556 body: Some(Bytes::from(body)),
557 timestamp: SystemTime::now(),
558 correlation_id: 0,
559 }
560 }
561
562 fn create_test_response_data() -> ResponseData {
563 let mut headers = HashMap::new();
564 headers.insert("content-type".to_string(), vec!["application/json".into()]);
565
566 let test_res = TestResponse {
567 success: true,
568 message: "User created successfully".to_string(),
569 };
570 let body = serde_json::to_vec(&test_res).unwrap();
571
572 ResponseData {
573 status: http::StatusCode::CREATED,
574 headers,
575 body: Some(Bytes::from(body)),
576 timestamp: SystemTime::now(),
577 duration_to_first_byte: Duration::from_millis(100),
578 duration: Duration::from_millis(150),
579 correlation_id: 0,
580 }
581 }
582
583 #[sqlx::test]
584 async fn test_handler_creation(pool: PgPool) {
585 crate::migrator().run(&pool).await.unwrap();
587
588 let handler = PostgresHandler::<TestRequest, TestResponse>::from_pool(pool.clone())
589 .await
590 .unwrap();
591
592 let repository = handler.repository();
594
595 let filter = RequestFilter::default();
597 let results = repository.query(filter).await.unwrap();
598 assert!(results.is_empty());
599 }
600
601 #[sqlx::test]
602 async fn test_handle_request_with_typed_body(pool: PgPool) {
603 crate::migrator().run(&pool).await.unwrap();
605
606 let handler = PostgresHandler::<TestRequest, TestResponse>::from_pool(pool.clone())
607 .await
608 .unwrap();
609 let repository = handler.repository();
610
611 let mut request_data = create_test_request_data();
612 let correlation_id = 12345;
613 request_data.correlation_id = correlation_id;
614
615 handler.handle_request(request_data.clone()).await;
617
618 let filter = RequestFilter {
620 correlation_id: Some(correlation_id as i64),
621 ..Default::default()
622 };
623 let results = repository.query(filter).await.unwrap();
624
625 assert_eq!(results.len(), 1);
626 let pair = &results[0];
627
628 assert_eq!(pair.request.correlation_id, correlation_id as i64);
629 assert_eq!(pair.request.method, "POST");
630 assert_eq!(pair.request.uri, "/api/users");
631
632 match &pair.request.body {
634 Some(Ok(parsed_body)) => {
635 assert_eq!(
636 *parsed_body,
637 TestRequest {
638 user_id: 123,
639 action: "create_user".to_string(),
640 }
641 );
642 }
643 _ => panic!("Expected successfully parsed request body"),
644 }
645
646 let headers_value = &pair.request.headers;
648 assert!(headers_value.get("content-type").is_some());
649 assert!(headers_value.get("user-agent").is_some());
650
651 assert!(pair.response.is_none());
653 }
654
655 #[sqlx::test]
656 async fn test_handle_response_with_typed_body(pool: PgPool) {
657 crate::migrator().run(&pool).await.unwrap();
659
660 let handler = PostgresHandler::<TestRequest, TestResponse>::from_pool(pool.clone())
661 .await
662 .unwrap();
663 let repository = handler.repository();
664
665 let mut request_data = create_test_request_data();
666 let mut response_data = create_test_response_data();
667 let correlation_id = 54321;
668 request_data.correlation_id = correlation_id;
669 response_data.correlation_id = correlation_id;
670
671 handler.handle_request(request_data.clone()).await;
673 handler
674 .handle_response(request_data, response_data.clone())
675 .await;
676
677 let filter = RequestFilter {
679 correlation_id: Some(correlation_id as i64),
680 ..Default::default()
681 };
682 let results = repository.query(filter).await.unwrap();
683
684 assert_eq!(results.len(), 1);
685 let pair = &results[0];
686
687 let response = pair.response.as_ref().expect("Response should be present");
689 assert_eq!(response.correlation_id, correlation_id as i64);
690 assert_eq!(response.status_code, 201);
691 assert_eq!(response.duration_ms, 150);
692
693 match &response.body {
695 Some(Ok(parsed_body)) => {
696 assert_eq!(
697 *parsed_body,
698 TestResponse {
699 success: true,
700 message: "User created successfully".to_string(),
701 }
702 );
703 }
704 _ => panic!("Expected successfully parsed response body"),
705 }
706 }
707
708 #[sqlx::test]
709 async fn test_handle_unparseable_body_fallback(pool: PgPool) {
710 crate::migrator().run(&pool).await.unwrap();
712
713 let handler = PostgresHandler::<TestRequest, TestResponse>::from_pool(pool.clone())
714 .await
715 .unwrap();
716 let repository = handler.repository();
717
718 let mut headers = HashMap::new();
720 headers.insert("content-type".to_string(), vec!["text/plain".into()]);
721
722 let invalid_json_body = b"not valid json for TestRequest";
723 let correlation_id = 99999;
724 let request_data = RequestData {
725 method: http::Method::POST,
726 uri: http::Uri::from_static("/api/test"),
727 headers,
728 body: Some(Bytes::from(invalid_json_body.to_vec())),
729 timestamp: SystemTime::now(),
730 correlation_id,
731 };
732
733 handler.handle_request(request_data).await;
734
735 let filter = RequestFilter {
737 correlation_id: Some(correlation_id as i64),
738 ..Default::default()
739 };
740 let results = repository.query(filter).await.unwrap();
741
742 assert_eq!(results.len(), 1);
743 let pair = &results[0];
744
745 match &pair.request.body {
747 Some(Err(raw_bytes)) => {
748 assert_eq!(raw_bytes.as_ref(), invalid_json_body);
749 }
750 _ => panic!("Expected raw bytes fallback for unparseable body"),
751 }
752 }
753
754 #[sqlx::test]
755 async fn test_query_with_multiple_filters(pool: PgPool) {
756 crate::migrator().run(&pool).await.unwrap();
758
759 let handler = PostgresHandler::<Value, Value>::from_pool(pool.clone())
760 .await
761 .unwrap();
762 let repository = handler.repository();
763
764 let test_cases = vec![
766 (1001, "GET", "/api/users", 200, 100),
767 (1002, "POST", "/api/users", 201, 150),
768 (1003, "GET", "/api/orders", 404, 50),
769 (1004, "PUT", "/api/users/123", 200, 300),
770 ];
771
772 for (correlation_id, method, uri, status, duration_ms) in test_cases {
773 let mut headers = HashMap::new();
774 headers.insert("content-type".to_string(), vec!["application/json".into()]);
775
776 let request_data = RequestData {
777 method: method.parse().unwrap(),
778 uri: uri.parse().unwrap(),
779 headers: headers.clone(),
780 body: Some(Bytes::from(b"{}".to_vec())),
781 timestamp: SystemTime::now(),
782 correlation_id,
783 };
784
785 let response_data = ResponseData {
786 correlation_id,
787 status: http::StatusCode::from_u16(status).unwrap(),
788 headers,
789 body: Some(Bytes::from(b"{}".to_vec())),
790 timestamp: SystemTime::now(),
791 duration_to_first_byte: Duration::from_millis(duration_ms / 2),
792 duration: Duration::from_millis(duration_ms),
793 };
794
795 handler.handle_request(request_data.clone()).await;
796 handler.handle_response(request_data, response_data).await;
797 }
798
799 let filter = RequestFilter {
801 method: Some("GET".to_string()),
802 ..Default::default()
803 };
804 let results = repository.query(filter).await.unwrap();
805 assert_eq!(results.len(), 2); let filter = RequestFilter {
809 status_code: Some(200),
810 ..Default::default()
811 };
812 let results = repository.query(filter).await.unwrap();
813 assert_eq!(results.len(), 2); let filter = RequestFilter {
817 uri_pattern: Some("/api/users%".to_string()),
818 ..Default::default()
819 };
820 let results = repository.query(filter).await.unwrap();
821 assert_eq!(results.len(), 3); let filter = RequestFilter {
825 min_duration_ms: Some(100),
826 max_duration_ms: Some(200),
827 ..Default::default()
828 };
829 let results = repository.query(filter).await.unwrap();
830 assert_eq!(results.len(), 2); let filter = RequestFilter {
834 method: Some("GET".to_string()),
835 status_code: Some(200),
836 ..Default::default()
837 };
838 let results = repository.query(filter).await.unwrap();
839 assert_eq!(results.len(), 1); assert_eq!(results[0].request.correlation_id, 1001);
841 }
842
843 #[sqlx::test]
844 async fn test_query_with_pagination_and_ordering(pool: PgPool) {
845 crate::migrator().run(&pool).await.unwrap();
847
848 let handler = PostgresHandler::<Value, Value>::from_pool(pool.clone())
849 .await
850 .unwrap();
851 let repository = handler.repository();
852
853 let now = SystemTime::now();
855 for i in 0..5 {
856 let correlation_id = 2000 + i;
857 let timestamp = now + Duration::from_secs(i * 10); let mut headers = HashMap::new();
860 headers.insert("x-test-id".to_string(), vec![i.to_string().into()]);
861
862 let request_data = RequestData {
863 method: http::Method::GET,
864 uri: "/api/test".parse().unwrap(),
865 headers,
866 body: Some(Bytes::from(format!("{{\"id\": {i}}}").into_bytes())),
867 timestamp,
868 correlation_id,
869 };
870
871 handler.handle_request(request_data).await;
872 }
873
874 let filter = RequestFilter {
876 limit: Some(3),
877 ..Default::default()
878 };
879 let results = repository.query(filter).await.unwrap();
880 assert_eq!(results.len(), 3);
881
882 for i in 0..2 {
884 assert!(results[i].request.timestamp <= results[i + 1].request.timestamp);
885 }
886
887 let filter = RequestFilter {
889 order_by_timestamp_desc: true,
890 limit: Some(2),
891 offset: Some(1),
892 ..Default::default()
893 };
894 let results = repository.query(filter).await.unwrap();
895 assert_eq!(results.len(), 2);
896
897 assert!(results[0].request.timestamp >= results[1].request.timestamp);
899 }
900
901 #[sqlx::test]
902 async fn test_headers_conversion(pool: PgPool) {
903 crate::migrator().run(&pool).await.unwrap();
905
906 let handler = PostgresHandler::<Value, Value>::from_pool(pool.clone())
907 .await
908 .unwrap();
909 let repository = handler.repository();
910
911 let mut headers = HashMap::new();
913 headers.insert("single-value".to_string(), vec!["test".into()]);
914 headers.insert(
915 "multi-value".to_string(),
916 vec!["val1".into(), "val2".into()],
917 );
918 headers.insert("empty-value".to_string(), vec!["".into()]);
919
920 let request_data = RequestData {
921 correlation_id: 3000,
922 method: http::Method::GET,
923 uri: "/test".parse().unwrap(),
924 headers,
925 body: None,
926 timestamp: SystemTime::now(),
927 };
928
929 let correlation_id = 3000;
930 handler.handle_request(request_data).await;
931
932 let filter = RequestFilter {
933 correlation_id: Some(correlation_id as i64),
934 ..Default::default()
935 };
936 let results = repository.query(filter).await.unwrap();
937
938 assert_eq!(results.len(), 1);
939 let headers_json = &results[0].request.headers;
940
941 assert_eq!(
943 headers_json["single-value"],
944 Value::String("test".to_string())
945 );
946
947 match &headers_json["multi-value"] {
949 Value::Array(arr) => {
950 assert_eq!(arr.len(), 2);
951 assert_eq!(arr[0], Value::String("val1".to_string()));
952 assert_eq!(arr[1], Value::String("val2".to_string()));
953 }
954 _ => panic!("Expected array for multi-value header"),
955 }
956
957 assert_eq!(headers_json["empty-value"], Value::String("".to_string()));
959 }
960
961 #[sqlx::test]
962 async fn test_timestamp_filtering(pool: PgPool) {
963 crate::migrator().run(&pool).await.unwrap();
965
966 let handler = PostgresHandler::<Value, Value>::from_pool(pool.clone())
967 .await
968 .unwrap();
969 let repository = handler.repository();
970
971 let base_time = SystemTime::UNIX_EPOCH + Duration::from_secs(1_600_000_000); let times = [
975 base_time + Duration::from_secs(0), base_time + Duration::from_secs(3600), base_time + Duration::from_secs(7200), ];
979
980 for (i, timestamp) in times.iter().enumerate() {
981 let correlation_id = 4001 + i as u64;
982 let request_data = RequestData {
983 method: http::Method::GET,
984 uri: "/test".parse().unwrap(),
985 headers: HashMap::new(),
986 body: None,
987 timestamp: *timestamp,
988 correlation_id,
989 };
990
991 handler.handle_request(request_data).await;
992 }
993
994 let after_time: DateTime<Utc> = (base_time + Duration::from_secs(1800)).into(); let filter = RequestFilter {
997 timestamp_after: Some(after_time),
998 ..Default::default()
999 };
1000 let results = repository.query(filter).await.unwrap();
1001 assert_eq!(results.len(), 2); let before_time: DateTime<Utc> = (base_time + Duration::from_secs(5400)).into(); let filter = RequestFilter {
1006 timestamp_before: Some(before_time),
1007 ..Default::default()
1008 };
1009 let results = repository.query(filter).await.unwrap();
1010 assert_eq!(results.len(), 2); let filter = RequestFilter {
1014 timestamp_after: Some(after_time),
1015 timestamp_before: Some(before_time),
1016 ..Default::default()
1017 };
1018 let results = repository.query(filter).await.unwrap();
1019 assert_eq!(results.len(), 1); assert_eq!(results[0].request.correlation_id, 4002);
1021 }
1022
1023 #[sqlx::test]
1028 async fn test_no_path_filtering_logs_everything(pool: PgPool) {
1029 crate::migrator().run(&pool).await.unwrap();
1031
1032 let handler = PostgresHandler::<Value, Value>::from_pool(pool.clone())
1034 .await
1035 .unwrap();
1036 let repository = handler.repository();
1037
1038 let test_uris = vec!["/api/users", "/health", "/metrics", "/random/path"];
1039 for (i, uri) in test_uris.iter().enumerate() {
1040 let correlation_id = 3000 + i as u64;
1041 let mut headers = HashMap::new();
1042 headers.insert("content-type".to_string(), vec!["application/json".into()]);
1043
1044 let request_data = RequestData {
1045 method: http::Method::GET,
1046 uri: uri.parse().unwrap(),
1047 headers,
1048 body: Some(Bytes::from(b"{}".to_vec())),
1049 timestamp: SystemTime::now(),
1050 correlation_id,
1051 };
1052
1053 handler.handle_request(request_data).await;
1054 }
1055
1056 let filter = RequestFilter::default();
1058 let results = repository.query(filter).await.unwrap();
1059 assert_eq!(results.len(), 4);
1060 }
1061}