outlet_postgres/
lib.rs

1//! # outlet-postgres
2//!
3//! PostgreSQL logging handler for the outlet HTTP request/response middleware.
4//! This crate implements the `RequestHandler` trait from outlet to log HTTP
5//! requests and responses to PostgreSQL with JSONB serialization for bodies.
6//!
7//! ## Quick Start
8//!
9//! Basic usage:
10//!
11//! ```rust,no_run
12//! use outlet::{RequestLoggerLayer, RequestLoggerConfig};
13//! use outlet_postgres::PostgresHandler;
14//! use axum::{routing::get, Router};
15//! use tower::ServiceBuilder;
16//!
17//! async fn hello() -> &'static str {
18//!     "Hello, World!"
19//! }
20//!
21//! #[tokio::main]
22//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
23//!     let database_url = "postgresql://user:password@localhost/dbname";
24//!     let handler: PostgresHandler = PostgresHandler::new(database_url).await?;
25//!     let layer = RequestLoggerLayer::new(RequestLoggerConfig::default(), handler);
26//!
27//!     let app = Router::new()
28//!         .route("/hello", get(hello))
29//!         .layer(ServiceBuilder::new().layer(layer));
30//!
31//!     let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await?;
32//!     axum::serve(listener, app).await?;
33//!     Ok(())
34//! }
35//! ```
36//!
37//! ## Features
38//!
39//! - **PostgreSQL Integration**: Uses sqlx for async PostgreSQL operations
40//! - **JSONB Bodies**: Serializes request/response bodies to JSONB fields
41//! - **Type-safe Querying**: Query logged data with typed request/response bodies
42//! - **Correlation**: Links requests and responses via correlation IDs
43//! - **Error Handling**: Graceful error handling with logging
44//! - **Flexible Serialization**: Pluggable request/response serializers with string fallback data on failure
45
46/// Error type for serialization failures with fallback data.
47///
48/// When serializers fail to parse request/response bodies into structured types,
49/// this error provides both the parsing error details and fallback data that
50/// can be stored as a string representation.
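///
/// For example, a custom serializer might fall back to the raw body text when
/// JSON parsing fails. A minimal sketch (`parse_body` is an illustrative helper,
/// not part of this crate):
///
/// ```rust
/// use outlet_postgres::SerializationError;
///
/// fn parse_body(bytes: &[u8]) -> Result<serde_json::Value, SerializationError> {
///     serde_json::from_slice(bytes).map_err(|error| {
///         // Keep a lossy UTF-8 copy of the body so the failure can still be stored.
///         SerializationError::new(String::from_utf8_lossy(bytes).to_string(), error)
///     })
/// }
/// ```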
51#[derive(Debug)]
52pub struct SerializationError {
53    /// The fallback representation of the data (e.g., base64-encoded, raw string)
54    pub fallback_data: String,
55    /// The underlying error that caused serialization to fail
56    pub error: Box<dyn std::error::Error + Send + Sync>,
57}
58
59impl SerializationError {
60    /// Create a new serialization error with fallback data
61    pub fn new(
62        fallback_data: String,
63        error: impl std::error::Error + Send + Sync + 'static,
64    ) -> Self {
65        Self {
66            fallback_data,
67            error: Box::new(error),
68        }
69    }
70}
71
72impl std::fmt::Display for SerializationError {
73    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
74        write!(f, "Serialization failed: {}", self.error)
75    }
76}
77
78impl std::error::Error for SerializationError {
79    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
80        Some(self.error.as_ref())
81    }
82}
83
84use chrono::{DateTime, Utc};
85use outlet::{RequestData, RequestHandler, ResponseData};
86use serde::{Deserialize, Serialize};
87use serde_json::Value;
88use sqlx::PgPool;
89use std::collections::HashMap;
90use std::sync::Arc;
91use tracing::{debug, error, instrument};
92use uuid::Uuid;
93
94pub mod error;
95pub mod repository;
96pub use error::PostgresHandlerError;
97pub use repository::{
98    HttpRequest, HttpResponse, RequestFilter, RequestRepository, RequestResponsePair,
99};
100
101/// Get the migrator for running outlet-postgres database migrations.
102///
103/// This returns a SQLx migrator that can be used to set up the required
104/// `http_requests` and `http_responses` tables. The consuming application
105/// is responsible for running these migrations at the appropriate time
106/// and in the appropriate database schema.
107///
108/// # Examples
109///
110/// ```rust,no_run
111/// use outlet_postgres::migrator;
112/// use sqlx::PgPool;
113///
114/// #[tokio::main]
115/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
116///     let pool = PgPool::connect("postgresql://user:pass@localhost/db").await?;
117///     
118///     // Run outlet migrations
119///     migrator().run(&pool).await?;
120///     
121///     Ok(())
122/// }
123/// ```
124pub fn migrator() -> sqlx::migrate::Migrator {
125    sqlx::migrate!("./migrations")
126}
127
128/// Type alias for request body serializers.
129///
130/// Request serializers take full request context including headers and body bytes.
131/// On failure, they return a `SerializationError` with fallback data.
132type RequestSerializer<T> =
133    Arc<dyn Fn(&outlet::RequestData) -> Result<T, SerializationError> + Send + Sync>;
134
135/// Type alias for response body serializers.
136///
137/// Response serializers take both request and response context, allowing them to
138/// make parsing decisions based on request details and response headers (e.g., compression).
139/// On failure, they return a `SerializationError` with fallback data.
140type ResponseSerializer<T> = Arc<
141    dyn Fn(&outlet::RequestData, &outlet::ResponseData) -> Result<T, SerializationError>
142        + Send
143        + Sync,
144>;
145
146/// PostgreSQL handler for outlet middleware.
147///
148/// Implements the `RequestHandler` trait to log HTTP requests and responses
149/// to PostgreSQL. Request and response bodies are serialized to JSONB fields.
150///
151/// Generic over `TReq` and `TRes`, the request and response body types. Both must implement
152/// `Deserialize` (for parsing body bytes) and `Serialize` (for database storage as JSONB).
153/// Use `serde_json::Value` for flexible JSON storage, or custom structs for typed storage.
154#[derive(Clone)]
155pub struct PostgresHandler<TReq = Value, TRes = Value>
156where
157    TReq: for<'de> Deserialize<'de> + Serialize + Send + Sync + 'static,
158    TRes: for<'de> Deserialize<'de> + Serialize + Send + Sync + 'static,
159{
160    pool: PgPool,
161    request_serializer: RequestSerializer<TReq>,
162    response_serializer: ResponseSerializer<TRes>,
163    instance_id: Uuid,
164}
165
166impl<TReq, TRes> PostgresHandler<TReq, TRes>
167where
168    TReq: for<'de> Deserialize<'de> + Serialize + Send + Sync + 'static,
169    TRes: for<'de> Deserialize<'de> + Serialize + Send + Sync + 'static,
170{
171    /// Default request serializer that attempts serde JSON deserialization.
172    /// On failure, returns a `SerializationError` with the raw body (as a lossy UTF-8 string) as fallback data.
173    fn default_request_serializer() -> RequestSerializer<TReq> {
174        Arc::new(|request_data| {
175            let bytes = request_data.body.as_deref().unwrap_or(&[]);
176            serde_json::from_slice::<TReq>(bytes).map_err(|error| {
177                let fallback_data = String::from_utf8_lossy(bytes).to_string();
178                SerializationError::new(fallback_data, error)
179            })
180        })
181    }
182
183    /// Default response serializer that attempts serde JSON deserialization.
184    /// On failure, returns a `SerializationError` with the raw body (as a lossy UTF-8 string) as fallback data.
185    fn default_response_serializer() -> ResponseSerializer<TRes> {
186        Arc::new(|_request_data, response_data| {
187            let bytes = response_data.body.as_deref().unwrap_or(&[]);
188            serde_json::from_slice::<TRes>(bytes).map_err(|error| {
189                let fallback_data = String::from_utf8_lossy(bytes).to_string();
190                SerializationError::new(fallback_data, error)
191            })
192        })
193    }
194
195    /// Create a new PostgreSQL handler with a connection pool.
196    ///
197    /// This will connect to the database but will NOT run migrations.
198    /// Use `migrator()` to get a migrator and run migrations separately.
199    ///
200    /// # Arguments
201    ///
202    /// * `database_url` - PostgreSQL connection string
203    ///
204    /// # Examples
205    ///
206    /// ```rust,no_run
207    /// use outlet_postgres::{PostgresHandler, migrator};
208    /// use serde::{Deserialize, Serialize};
209    /// use serde_json::Value;
210    ///
211    /// #[derive(Deserialize, Serialize)]
212    /// struct MyBodyType {
213    ///     id: u64,
214    ///     name: String,
215    /// }
216    ///
217    /// #[tokio::main]
218    /// async fn main() -> Result<(), Box<dyn std::error::Error>> {
219    ///     // Run migrations first
220    ///     let pool = sqlx::PgPool::connect("postgresql://user:pass@localhost/db").await?;
221    ///     migrator().run(&pool).await?;
222    ///     
223    ///     // Create handler
224    ///     let handler = PostgresHandler::<MyBodyType, MyBodyType>::new("postgresql://user:pass@localhost/db").await?;
225    ///     Ok(())
226    /// }
227    /// ```
228    pub async fn new(database_url: &str) -> Result<Self, PostgresHandlerError> {
229        let pool = PgPool::connect(database_url)
230            .await
231            .map_err(PostgresHandlerError::Connection)?;
232
233        Ok(Self {
234            pool,
235            request_serializer: Self::default_request_serializer(),
236            response_serializer: Self::default_response_serializer(),
237            instance_id: Uuid::new_v4(),
238        })
239    }
240
241    /// Set a custom request body serializer, replacing the default JSON serializer.
242    ///
243    /// The serializer receives the full `RequestData` and should return a `Result<TReq, SerializationError>`.
244    /// If the serializer succeeds, the result will be stored as JSONB and `body_parsed` will be true.
245    /// If it fails, the error's `fallback_data` string will be stored and `body_parsed` will be false.
246    ///
247    /// # Panics
248    ///
249    /// This will panic if the serializer succeeds but the resulting `TReq` value cannot be
250    /// converted to JSON via `serde_json::to_value()`. This indicates a bug in the `Serialize`
251    /// implementation of `TReq` and should be fixed by the caller.
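    ///
    /// # Examples
    ///
    /// A minimal sketch of a serializer that stores every request body verbatim as a
    /// JSON string rather than parsing it (assumes `TReq = serde_json::Value`; the
    /// connection string is illustrative):
    ///
    /// ```rust,no_run
    /// use outlet_postgres::PostgresHandler;
    /// use serde_json::Value;
    ///
    /// #[tokio::main]
    /// async fn main() -> Result<(), Box<dyn std::error::Error>> {
    ///     let handler = PostgresHandler::<Value, Value>::new("postgresql://user:pass@localhost/db")
    ///         .await?
    ///         .with_request_serializer(|request_data| {
    ///             let bytes = request_data.body.as_deref().unwrap_or(&[]);
    ///             // Wrap the raw body in a JSON string instead of parsing it.
    ///             Ok(Value::String(String::from_utf8_lossy(bytes).to_string()))
    ///         });
    ///     Ok(())
    /// }
    /// ```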
252    pub fn with_request_serializer<F>(mut self, serializer: F) -> Self
253    where
254        F: Fn(&outlet::RequestData) -> Result<TReq, SerializationError> + Send + Sync + 'static,
255    {
256        self.request_serializer = Arc::new(serializer);
257        self
258    }
259
260    /// Set a custom response body serializer, replacing the default JSON serializer.
261    ///
262    /// The serializer receives both the `RequestData` and the `ResponseData` and should return a `Result<TRes, SerializationError>`.
263    /// If the serializer succeeds, the result will be stored as JSONB and `body_parsed` will be true.
264    /// If it fails, the error's `fallback_data` string will be stored and `body_parsed` will be false.
265    ///
266    /// # Panics
267    ///
268    /// This will panic if the serializer succeeds but the resulting `TRes` value cannot be
269    /// converted to JSON via `serde_json::to_value()`. This indicates a bug in the `Serialize`
270    /// implementation of `TRes` and should be fixed by the caller.
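    ///
    /// # Examples
    ///
    /// A minimal sketch of a serializer that only attempts JSON parsing when the response
    /// declares a JSON content type, falling back to the raw text otherwise (assumes
    /// `TRes = serde_json::Value`; the connection string is illustrative and the lowercase
    /// `content-type` key follows the form used in this crate's tests):
    ///
    /// ```rust,no_run
    /// use outlet_postgres::{PostgresHandler, SerializationError};
    /// use serde_json::Value;
    ///
    /// #[tokio::main]
    /// async fn main() -> Result<(), Box<dyn std::error::Error>> {
    ///     let handler = PostgresHandler::<Value, Value>::new("postgresql://user:pass@localhost/db")
    ///         .await?
    ///         .with_response_serializer(|_request_data, response_data| {
    ///             let bytes = response_data.body.as_deref().unwrap_or(&[]);
    ///             let is_json = response_data
    ///                 .headers
    ///                 .get("content-type")
    ///                 .and_then(|values| values.first())
    ///                 .map(|value| value.starts_with(b"application/json"))
    ///                 .unwrap_or(false);
    ///             if is_json {
    ///                 // Parse JSON responses into a structured value.
    ///                 serde_json::from_slice(bytes).map_err(|error| {
    ///                     SerializationError::new(String::from_utf8_lossy(bytes).to_string(), error)
    ///                 })
    ///             } else {
    ///                 // Store non-JSON responses verbatim as a JSON string.
    ///                 Ok(Value::String(String::from_utf8_lossy(bytes).to_string()))
    ///             }
    ///         });
    ///     Ok(())
    /// }
    /// ```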
271    pub fn with_response_serializer<F>(mut self, serializer: F) -> Self
272    where
273        F: Fn(&outlet::RequestData, &outlet::ResponseData) -> Result<TRes, SerializationError>
274            + Send
275            + Sync
276            + 'static,
277    {
278        self.response_serializer = Arc::new(serializer);
279        self
280    }
281
282    /// Create a PostgreSQL handler from an existing connection pool.
283    ///
284    /// Use this if you already have a connection pool and want to reuse it.
285    /// This will NOT run migrations - use `migrator()` to run migrations separately.
286    ///
287    /// # Arguments
288    ///
289    /// * `pool` - Existing PostgreSQL connection pool
290    ///
291    /// # Examples
292    ///
293    /// ```rust,no_run
294    /// use outlet_postgres::{PostgresHandler, migrator};
295    /// use sqlx::PgPool;
296    /// use serde::{Deserialize, Serialize};
297    ///
298    /// #[derive(Deserialize, Serialize)]
299    /// struct MyBodyType {
300    ///     id: u64,
301    ///     name: String,
302    /// }
303    ///
304    /// #[tokio::main]
305    /// async fn main() -> Result<(), Box<dyn std::error::Error>> {
306    ///     let pool = PgPool::connect("postgresql://user:pass@localhost/db").await?;
307    ///     
308    ///     // Run migrations first
309    ///     migrator().run(&pool).await?;
310    ///     
311    ///     // Create handler
312    ///     let handler = PostgresHandler::<MyBodyType, MyBodyType>::from_pool(pool).await?;
313    ///     Ok(())
314    /// }
315    /// ```
316    pub async fn from_pool(pool: PgPool) -> Result<Self, PostgresHandlerError> {
317        Ok(Self {
318            pool,
319            request_serializer: Self::default_request_serializer(),
320            response_serializer: Self::default_response_serializer(),
321            instance_id: Uuid::new_v4(),
322        })
323    }
324
325    /// Convert headers to a JSONB-compatible format: single-valued headers become JSON strings, multi-valued headers become arrays of strings.
326    fn headers_to_json(headers: &HashMap<String, Vec<bytes::Bytes>>) -> Value {
327        let mut header_map = HashMap::new();
328        for (name, values) in headers {
329            if values.len() == 1 {
330                let value_str = String::from_utf8_lossy(&values[0]).to_string();
331                header_map.insert(name.clone(), Value::String(value_str));
332            } else {
333                let value_array: Vec<Value> = values
334                    .iter()
335                    .map(|v| Value::String(String::from_utf8_lossy(v).to_string()))
336                    .collect();
337                header_map.insert(name.clone(), Value::Array(value_array));
338            }
339        }
340        serde_json::to_value(header_map).unwrap_or(Value::Null)
341    }
342
343    /// Convert request data to a JSONB value using the configured serializer, returning the value and whether parsing succeeded.
344    fn request_body_to_json_with_fallback(
345        &self,
346        request_data: &outlet::RequestData,
347    ) -> (Value, bool) {
348        match (self.request_serializer)(request_data) {
349            Ok(typed_value) => {
350                if let Ok(json_value) = serde_json::to_value(&typed_value) {
351                    (json_value, true)
352                } else {
353                    // This should never happen if the type implements Serialize correctly
354                    (
355                        Value::String(
356                            serde_json::to_string(&typed_value)
357                                .expect("Serialized value must be convertible to JSON string"),
358                        ),
359                        false,
360                    )
361                }
362            }
363            Err(serialization_error) => (Value::String(serialization_error.fallback_data), false),
364        }
365    }
366
367    /// Convert response data to a JSONB value using the configured serializer, returning the value and whether parsing succeeded.
368    fn response_body_to_json_with_fallback(
369        &self,
370        request_data: &outlet::RequestData,
371        response_data: &outlet::ResponseData,
372    ) -> (Value, bool) {
373        match (self.response_serializer)(request_data, response_data) {
374            Ok(typed_value) => {
375                if let Ok(json_value) = serde_json::to_value(&typed_value) {
376                    (json_value, true)
377                } else {
378                    // This should never happen if the type implements Serialize correctly
379                    (
380                        Value::String(
381                            serde_json::to_string(&typed_value)
382                                .expect("Serialized value must be convertible to JSON string"),
383                        ),
384                        false,
385                    )
386                }
387            }
388            Err(serialization_error) => (Value::String(serialization_error.fallback_data), false),
389        }
390    }
391
392    /// Get a repository for querying logged requests and responses.
393    ///
394    /// Returns a `RequestRepository` with the same type parameters as this handler,
395    /// allowing for type-safe querying of request and response bodies.
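    ///
    /// # Examples
    ///
    /// A minimal sketch of querying logged traffic (the filter fields used here match
    /// those exercised elsewhere in this crate; the connection string is illustrative):
    ///
    /// ```rust,no_run
    /// use outlet_postgres::{PostgresHandler, RequestFilter};
    /// use serde_json::Value;
    ///
    /// #[tokio::main]
    /// async fn main() -> Result<(), Box<dyn std::error::Error>> {
    ///     let handler = PostgresHandler::<Value, Value>::new("postgresql://user:pass@localhost/db").await?;
    ///     let repository = handler.repository();
    ///
    ///     // Fetch up to ten logged GET requests with their responses, if any.
    ///     let filter = RequestFilter {
    ///         method: Some("GET".to_string()),
    ///         limit: Some(10),
    ///         ..Default::default()
    ///     };
    ///     let pairs = repository.query(filter).await?;
    ///     println!("found {} request/response pairs", pairs.len());
    ///     Ok(())
    /// }
    /// ```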
396    pub fn repository(&self) -> crate::repository::RequestRepository<TReq, TRes> {
397        crate::repository::RequestRepository::new(self.pool.clone())
398    }
399}
400
401impl<TReq, TRes> RequestHandler for PostgresHandler<TReq, TRes>
402where
403    TReq: for<'de> Deserialize<'de> + Serialize + Send + Sync + 'static,
404    TRes: for<'de> Deserialize<'de> + Serialize + Send + Sync + 'static,
405{
406    #[instrument(skip(self, data), fields(correlation_id = %data.correlation_id))]
407    async fn handle_request(&self, data: RequestData) {
408        let headers_json = Self::headers_to_json(&data.headers);
409        let (body_json, parsed) = if data.body.is_some() {
410            let (json, parsed) = self.request_body_to_json_with_fallback(&data);
411            (Some(json), parsed)
412        } else {
413            (None, false)
414        };
415
416        let timestamp: DateTime<Utc> = data.timestamp.into();
417
418        let result = sqlx::query(
419            r#"
420            INSERT INTO http_requests (instance_id, correlation_id, timestamp, method, uri, headers, body, body_parsed)
421            VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
422            "#,
423        )
424        .bind(self.instance_id)
425        .bind(data.correlation_id as i64)
426        .bind(timestamp)
427        .bind(data.method.to_string())
428        .bind(data.uri.to_string())
429        .bind(headers_json)
430        .bind(body_json)
431        .bind(parsed)
432        .execute(&self.pool)
433        .await;
434
435        if let Err(e) = result {
436            error!(correlation_id = %data.correlation_id, error = %e, "Failed to insert request data");
437        } else {
438            debug!(correlation_id = %data.correlation_id, "Request data inserted successfully");
439        }
440    }
441
442    #[instrument(skip(self, request_data, response_data), fields(correlation_id = %request_data.correlation_id))]
443    async fn handle_response(&self, request_data: RequestData, response_data: ResponseData) {
444        let headers_json = Self::headers_to_json(&response_data.headers);
445        let (body_json, parsed) = if response_data.body.is_some() {
446            let (json, parsed) =
447                self.response_body_to_json_with_fallback(&request_data, &response_data);
448            (Some(json), parsed)
449        } else {
450            (None, false)
451        };
452
453        let timestamp: DateTime<Utc> = response_data.timestamp.into();
454        let duration_ms = response_data.duration.as_millis() as i64;
455        let duration_to_first_byte_ms = response_data.duration_to_first_byte.as_millis() as i64;
456
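        // Insert the response only if the matching request row for this handler instance
        // already exists; otherwise no row is written and rows_affected() is 0 below.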
457        let result = sqlx::query(
458            r#"
459            INSERT INTO http_responses (instance_id, correlation_id, timestamp, status_code, headers, body, body_parsed, duration_to_first_byte_ms, duration_ms)
460            SELECT $1, $2, $3, $4, $5, $6, $7, $8, $9
461            WHERE EXISTS (SELECT 1 FROM http_requests WHERE instance_id = $1 AND correlation_id = $2)
462            "#,
463        )
464        .bind(self.instance_id)
465        .bind(request_data.correlation_id as i64)
466        .bind(timestamp)
467        .bind(response_data.status.as_u16() as i32)
468        .bind(headers_json)
469        .bind(body_json)
470        .bind(parsed)
471        .bind(duration_to_first_byte_ms)
472        .bind(duration_ms)
473        .execute(&self.pool)
474        .await;
475
476        match result {
477            Err(e) => {
478                error!(correlation_id = %request_data.correlation_id, error = %e, "Failed to insert response data");
479            }
480            Ok(query_result) => {
481                if query_result.rows_affected() > 0 {
482                    debug!(correlation_id = %request_data.correlation_id, "Response data inserted successfully");
483                } else {
484                    debug!(correlation_id = %request_data.correlation_id, "No matching request found for response, skipping insert")
485                }
486            }
487        }
488    }
489}
490
491#[cfg(test)]
492mod tests {
493    use super::*;
494    use bytes::Bytes;
495    use chrono::{DateTime, Utc};
496    use outlet::{RequestData, ResponseData};
497    use serde::{Deserialize, Serialize};
498    use serde_json::Value;
499    use sqlx::PgPool;
500    use std::collections::HashMap;
501    use std::time::{Duration, SystemTime};
502
503    #[derive(Debug, Clone, PartialEq, Deserialize, Serialize)]
504    struct TestRequest {
505        user_id: u64,
506        action: String,
507    }
508
509    #[derive(Debug, Clone, PartialEq, Deserialize, Serialize)]
510    struct TestResponse {
511        success: bool,
512        message: String,
513    }
514
515    fn create_test_request_data() -> RequestData {
516        let mut headers = HashMap::new();
517        headers.insert("content-type".to_string(), vec!["application/json".into()]);
518        headers.insert("user-agent".to_string(), vec!["test-client/1.0".into()]);
519
520        let test_req = TestRequest {
521            user_id: 123,
522            action: "create_user".to_string(),
523        };
524        let body = serde_json::to_vec(&test_req).unwrap();
525
526        RequestData {
527            method: http::Method::POST,
528            uri: http::Uri::from_static("/api/users"),
529            headers,
530            body: Some(Bytes::from(body)),
531            timestamp: SystemTime::now(),
532            correlation_id: 0,
533        }
534    }
535
536    fn create_test_response_data() -> ResponseData {
537        let mut headers = HashMap::new();
538        headers.insert("content-type".to_string(), vec!["application/json".into()]);
539
540        let test_res = TestResponse {
541            success: true,
542            message: "User created successfully".to_string(),
543        };
544        let body = serde_json::to_vec(&test_res).unwrap();
545
546        ResponseData {
547            status: http::StatusCode::CREATED,
548            headers,
549            body: Some(Bytes::from(body)),
550            timestamp: SystemTime::now(),
551            duration_to_first_byte: Duration::from_millis(100),
552            duration: Duration::from_millis(150),
553            correlation_id: 0,
554        }
555    }
556
557    #[sqlx::test]
558    async fn test_handler_creation(pool: PgPool) {
559        // Run migrations first
560        crate::migrator().run(&pool).await.unwrap();
561
562        let handler = PostgresHandler::<TestRequest, TestResponse>::from_pool(pool.clone())
563            .await
564            .unwrap();
565
566        // Verify we can get a repository
567        let repository = handler.repository();
568
569        // Test initial state - no requests logged yet
570        let filter = RequestFilter::default();
571        let results = repository.query(filter).await.unwrap();
572        assert!(results.is_empty());
573    }
574
575    #[sqlx::test]
576    async fn test_handle_request_with_typed_body(pool: PgPool) {
577        // Run migrations first
578        crate::migrator().run(&pool).await.unwrap();
579
580        let handler = PostgresHandler::<TestRequest, TestResponse>::from_pool(pool.clone())
581            .await
582            .unwrap();
583        let repository = handler.repository();
584
585        let mut request_data = create_test_request_data();
586        let correlation_id = 12345;
587        request_data.correlation_id = correlation_id;
588
589        // Handle the request
590        handler.handle_request(request_data.clone()).await;
591
592        // Query back the request
593        let filter = RequestFilter {
594            correlation_id: Some(correlation_id as i64),
595            ..Default::default()
596        };
597        let results = repository.query(filter).await.unwrap();
598
599        assert_eq!(results.len(), 1);
600        let pair = &results[0];
601
602        assert_eq!(pair.request.correlation_id, correlation_id as i64);
603        assert_eq!(pair.request.method, "POST");
604        assert_eq!(pair.request.uri, "/api/users");
605
606        // Check that body was parsed successfully
607        match &pair.request.body {
608            Some(Ok(parsed_body)) => {
609                assert_eq!(
610                    *parsed_body,
611                    TestRequest {
612                        user_id: 123,
613                        action: "create_user".to_string(),
614                    }
615                );
616            }
617            _ => panic!("Expected successfully parsed request body"),
618        }
619
620        // Headers should be converted to JSON properly
621        let headers_value = &pair.request.headers;
622        assert!(headers_value.get("content-type").is_some());
623        assert!(headers_value.get("user-agent").is_some());
624
625        // No response yet
626        assert!(pair.response.is_none());
627    }
628
629    #[sqlx::test]
630    async fn test_handle_response_with_typed_body(pool: PgPool) {
631        // Run migrations first
632        crate::migrator().run(&pool).await.unwrap();
633
634        let handler = PostgresHandler::<TestRequest, TestResponse>::from_pool(pool.clone())
635            .await
636            .unwrap();
637        let repository = handler.repository();
638
639        let mut request_data = create_test_request_data();
640        let mut response_data = create_test_response_data();
641        let correlation_id = 54321;
642        request_data.correlation_id = correlation_id;
643        response_data.correlation_id = correlation_id;
644
645        // Handle both request and response
646        handler.handle_request(request_data.clone()).await;
647        handler
648            .handle_response(request_data, response_data.clone())
649            .await;
650
651        // Query back the complete pair
652        let filter = RequestFilter {
653            correlation_id: Some(correlation_id as i64),
654            ..Default::default()
655        };
656        let results = repository.query(filter).await.unwrap();
657
658        assert_eq!(results.len(), 1);
659        let pair = &results[0];
660
661        // Check response data
662        let response = pair.response.as_ref().expect("Response should be present");
663        assert_eq!(response.correlation_id, correlation_id as i64);
664        assert_eq!(response.status_code, 201);
665        assert_eq!(response.duration_ms, 150);
666
667        // Check that response body was parsed successfully
668        match &response.body {
669            Some(Ok(parsed_body)) => {
670                assert_eq!(
671                    *parsed_body,
672                    TestResponse {
673                        success: true,
674                        message: "User created successfully".to_string(),
675                    }
676                );
677            }
678            _ => panic!("Expected successfully parsed response body"),
679        }
680    }
681
682    #[sqlx::test]
683    async fn test_handle_unparseable_body_fallback(pool: PgPool) {
684        // Run migrations first
685        crate::migrator().run(&pool).await.unwrap();
686
687        let handler = PostgresHandler::<TestRequest, TestResponse>::from_pool(pool.clone())
688            .await
689            .unwrap();
690        let repository = handler.repository();
691
692        // Create request with invalid JSON for TestRequest
693        let mut headers = HashMap::new();
694        headers.insert("content-type".to_string(), vec!["text/plain".into()]);
695
696        let invalid_json_body = b"not valid json for TestRequest";
697        let correlation_id = 99999;
698        let request_data = RequestData {
699            method: http::Method::POST,
700            uri: http::Uri::from_static("/api/test"),
701            headers,
702            body: Some(Bytes::from(invalid_json_body.to_vec())),
703            timestamp: SystemTime::now(),
704            correlation_id,
705        };
706
707        handler.handle_request(request_data).await;
708
709        // Query back and verify the raw-body string fallback
710        let filter = RequestFilter {
711            correlation_id: Some(correlation_id as i64),
712            ..Default::default()
713        };
714        let results = repository.query(filter).await.unwrap();
715
716        assert_eq!(results.len(), 1);
717        let pair = &results[0];
718
719        // Should fall back to the raw bytes
720        match &pair.request.body {
721            Some(Err(raw_bytes)) => {
722                assert_eq!(raw_bytes.as_ref(), invalid_json_body);
723            }
724            _ => panic!("Expected raw bytes fallback for unparseable body"),
725        }
726    }
727
728    #[sqlx::test]
729    async fn test_query_with_multiple_filters(pool: PgPool) {
730        // Run migrations first
731        crate::migrator().run(&pool).await.unwrap();
732
733        let handler = PostgresHandler::<Value, Value>::from_pool(pool.clone())
734            .await
735            .unwrap();
736        let repository = handler.repository();
737
738        // Insert multiple requests with different characteristics
739        let test_cases = vec![
740            (1001, "GET", "/api/users", 200, 100),
741            (1002, "POST", "/api/users", 201, 150),
742            (1003, "GET", "/api/orders", 404, 50),
743            (1004, "PUT", "/api/users/123", 200, 300),
744        ];
745
746        for (correlation_id, method, uri, status, duration_ms) in test_cases {
747            let mut headers = HashMap::new();
748            headers.insert("content-type".to_string(), vec!["application/json".into()]);
749
750            let request_data = RequestData {
751                method: method.parse().unwrap(),
752                uri: uri.parse().unwrap(),
753                headers: headers.clone(),
754                body: Some(Bytes::from(b"{}".to_vec())),
755                timestamp: SystemTime::now(),
756                correlation_id,
757            };
758
759            let response_data = ResponseData {
760                correlation_id,
761                status: http::StatusCode::from_u16(status).unwrap(),
762                headers,
763                body: Some(Bytes::from(b"{}".to_vec())),
764                timestamp: SystemTime::now(),
765                duration_to_first_byte: Duration::from_millis(duration_ms / 2),
766                duration: Duration::from_millis(duration_ms),
767            };
768
769            handler.handle_request(request_data.clone()).await;
770            handler.handle_response(request_data, response_data).await;
771        }
772
773        // Test method filter
774        let filter = RequestFilter {
775            method: Some("GET".to_string()),
776            ..Default::default()
777        };
778        let results = repository.query(filter).await.unwrap();
779        assert_eq!(results.len(), 2); // 1001, 1003
780
781        // Test status code filter
782        let filter = RequestFilter {
783            status_code: Some(200),
784            ..Default::default()
785        };
786        let results = repository.query(filter).await.unwrap();
787        assert_eq!(results.len(), 2); // 1001, 1004
788
789        // Test URI pattern filter
790        let filter = RequestFilter {
791            uri_pattern: Some("/api/users%".to_string()),
792            ..Default::default()
793        };
794        let results = repository.query(filter).await.unwrap();
795        assert_eq!(results.len(), 3); // 1001, 1002, 1004
796
797        // Test duration range filter
798        let filter = RequestFilter {
799            min_duration_ms: Some(100),
800            max_duration_ms: Some(200),
801            ..Default::default()
802        };
803        let results = repository.query(filter).await.unwrap();
804        assert_eq!(results.len(), 2); // 1001, 1002
805
806        // Test combined filters
807        let filter = RequestFilter {
808            method: Some("GET".to_string()),
809            status_code: Some(200),
810            ..Default::default()
811        };
812        let results = repository.query(filter).await.unwrap();
813        assert_eq!(results.len(), 1); // Only 1001
814        assert_eq!(results[0].request.correlation_id, 1001);
815    }
816
817    #[sqlx::test]
818    async fn test_query_with_pagination_and_ordering(pool: PgPool) {
819        // Run migrations first
820        crate::migrator().run(&pool).await.unwrap();
821
822        let handler = PostgresHandler::<Value, Value>::from_pool(pool.clone())
823            .await
824            .unwrap();
825        let repository = handler.repository();
826
827        // Insert requests with known timestamps
828        let now = SystemTime::now();
829        for i in 0..5 {
830            let correlation_id = 2000 + i;
831            let timestamp = now + Duration::from_secs(i * 10); // 10 second intervals
832
833            let mut headers = HashMap::new();
834            headers.insert("x-test-id".to_string(), vec![i.to_string().into()]);
835
836            let request_data = RequestData {
837                method: http::Method::GET,
838                uri: "/api/test".parse().unwrap(),
839                headers,
840                body: Some(Bytes::from(format!("{{\"id\": {i}}}").into_bytes())),
841                timestamp,
842                correlation_id,
843            };
844
845            handler.handle_request(request_data).await;
846        }
847
848        // Test default ordering (ASC) with limit
849        let filter = RequestFilter {
850            limit: Some(3),
851            ..Default::default()
852        };
853        let results = repository.query(filter).await.unwrap();
854        assert_eq!(results.len(), 3);
855
856        // Should be in ascending timestamp order
857        for i in 0..2 {
858            assert!(results[i].request.timestamp <= results[i + 1].request.timestamp);
859        }
860
861        // Test descending order with offset
862        let filter = RequestFilter {
863            order_by_timestamp_desc: true,
864            limit: Some(2),
865            offset: Some(1),
866            ..Default::default()
867        };
868        let results = repository.query(filter).await.unwrap();
869        assert_eq!(results.len(), 2);
870
871        // Should be in descending order, skipping the first (newest) one
872        assert!(results[0].request.timestamp >= results[1].request.timestamp);
873    }
874
875    #[sqlx::test]
876    async fn test_headers_conversion(pool: PgPool) {
877        // Run migrations first
878        crate::migrator().run(&pool).await.unwrap();
879
880        let handler = PostgresHandler::<Value, Value>::from_pool(pool.clone())
881            .await
882            .unwrap();
883        let repository = handler.repository();
884
885        // Test various header scenarios
886        let mut headers = HashMap::new();
887        headers.insert("single-value".to_string(), vec!["test".into()]);
888        headers.insert(
889            "multi-value".to_string(),
890            vec!["val1".into(), "val2".into()],
891        );
892        headers.insert("empty-value".to_string(), vec!["".into()]);
893
894        let request_data = RequestData {
895            correlation_id: 3000,
896            method: http::Method::GET,
897            uri: "/test".parse().unwrap(),
898            headers,
899            body: None,
900            timestamp: SystemTime::now(),
901        };
902
903        let correlation_id = 3000;
904        handler.handle_request(request_data).await;
905
906        let filter = RequestFilter {
907            correlation_id: Some(correlation_id as i64),
908            ..Default::default()
909        };
910        let results = repository.query(filter).await.unwrap();
911
912        assert_eq!(results.len(), 1);
913        let headers_json = &results[0].request.headers;
914
915        // Single value should be stored as string
916        assert_eq!(
917            headers_json["single-value"],
918            Value::String("test".to_string())
919        );
920
921        // Multi-value should be stored as array
922        match &headers_json["multi-value"] {
923            Value::Array(arr) => {
924                assert_eq!(arr.len(), 2);
925                assert_eq!(arr[0], Value::String("val1".to_string()));
926                assert_eq!(arr[1], Value::String("val2".to_string()));
927            }
928            _ => panic!("Expected array for multi-value header"),
929        }
930
931        // Empty value should still be a string
932        assert_eq!(headers_json["empty-value"], Value::String("".to_string()));
933    }
934
935    #[sqlx::test]
936    async fn test_timestamp_filtering(pool: PgPool) {
937        // Run migrations first
938        crate::migrator().run(&pool).await.unwrap();
939
940        let handler = PostgresHandler::<Value, Value>::from_pool(pool.clone())
941            .await
942            .unwrap();
943        let repository = handler.repository();
944
945        let base_time = SystemTime::UNIX_EPOCH + Duration::from_secs(1_600_000_000); // Sept 2020
946
947        // Insert requests at different times
948        let times = [
949            base_time + Duration::from_secs(0),    // correlation_id 4001
950            base_time + Duration::from_secs(3600), // correlation_id 4002 (1 hour later)
951            base_time + Duration::from_secs(7200), // correlation_id 4003 (2 hours later)
952        ];
953
954        for (i, timestamp) in times.iter().enumerate() {
955            let correlation_id = 4001 + i as u64;
956            let request_data = RequestData {
957                method: http::Method::GET,
958                uri: "/test".parse().unwrap(),
959                headers: HashMap::new(),
960                body: None,
961                timestamp: *timestamp,
962                correlation_id,
963            };
964
965            handler.handle_request(request_data).await;
966        }
967
968        // Test timestamp_after filter
969        let after_time: DateTime<Utc> = (base_time + Duration::from_secs(1800)).into(); // 30 min after first
970        let filter = RequestFilter {
971            timestamp_after: Some(after_time),
972            ..Default::default()
973        };
974        let results = repository.query(filter).await.unwrap();
975        assert_eq!(results.len(), 2); // Should get 4002 and 4003
976
977        // Test timestamp_before filter
978        let before_time: DateTime<Utc> = (base_time + Duration::from_secs(5400)).into(); // 1.5 hours after first
979        let filter = RequestFilter {
980            timestamp_before: Some(before_time),
981            ..Default::default()
982        };
983        let results = repository.query(filter).await.unwrap();
984        assert_eq!(results.len(), 2); // Should get 4001 and 4002
985
986        // Test timestamp range
987        let filter = RequestFilter {
988            timestamp_after: Some(after_time),
989            timestamp_before: Some(before_time),
990            ..Default::default()
991        };
992        let results = repository.query(filter).await.unwrap();
993        assert_eq!(results.len(), 1); // Should get only 4002
994        assert_eq!(results[0].request.correlation_id, 4002);
995    }
996
997    // Note: Path filtering tests have been removed because path filtering
998    // now happens at the outlet middleware layer, not in the PostgresHandler.
999    // The handler now logs everything it receives, with filtering done upstream.
1000
1001    #[sqlx::test]
1002    async fn test_no_path_filtering_logs_everything(pool: PgPool) {
1003        // Run migrations first
1004        crate::migrator().run(&pool).await.unwrap();
1005
1006        // Handler without any path filtering
1007        let handler = PostgresHandler::<Value, Value>::from_pool(pool.clone())
1008            .await
1009            .unwrap();
1010        let repository = handler.repository();
1011
1012        let test_uris = vec!["/api/users", "/health", "/metrics", "/random/path"];
1013        for (i, uri) in test_uris.iter().enumerate() {
1014            let correlation_id = 3000 + i as u64;
1015            let mut headers = HashMap::new();
1016            headers.insert("content-type".to_string(), vec!["application/json".into()]);
1017
1018            let request_data = RequestData {
1019                method: http::Method::GET,
1020                uri: uri.parse().unwrap(),
1021                headers,
1022                body: Some(Bytes::from(b"{}".to_vec())),
1023                timestamp: SystemTime::now(),
1024                correlation_id,
1025            };
1026
1027            handler.handle_request(request_data).await;
1028        }
1029
1030        // Should have logged all 4 requests
1031        let filter = RequestFilter::default();
1032        let results = repository.query(filter).await.unwrap();
1033        assert_eq!(results.len(), 4);
1034    }
1035}