outlet_postgres/
lib.rs

1//! # outlet-postgres
2//!
3//! PostgreSQL logging handler for the outlet HTTP request/response middleware.
4//! This crate implements the `RequestHandler` trait from outlet to log HTTP
5//! requests and responses to PostgreSQL with JSONB serialization for bodies.
6//!
7//! ## Quick Start
8//!
9//! Basic usage:
10//!
11//! ```rust,no_run
12//! use outlet::{RequestLoggerLayer, RequestLoggerConfig};
13//! use outlet_postgres::PostgresHandler;
14//! use axum::{routing::get, Router};
15//! use tower::ServiceBuilder;
16//!
17//! async fn hello() -> &'static str {
18//!     "Hello, World!"
19//! }
20//!
21//! #[tokio::main]
22//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
23//!     let database_url = "postgresql://user:password@localhost/dbname";
24//!     let handler: PostgresHandler = PostgresHandler::new(database_url).await?;
25//!     let layer = RequestLoggerLayer::new(RequestLoggerConfig::default(), handler);
26//!
27//!     let app = Router::new()
28//!         .route("/hello", get(hello))
29//!         .layer(ServiceBuilder::new().layer(layer));
30//!
31//!     let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await?;
32//!     axum::serve(listener, app).await?;
33//!     Ok(())
34//! }
35//! ```
36//!
37//! ## Features
38//!
39//! - **PostgreSQL Integration**: Uses sqlx for async PostgreSQL operations
40//! - **JSONB Bodies**: Serializes request/response bodies to JSONB fields
41//! - **Type-safe Querying**: Query logged data with typed request/response bodies
42//! - **Correlation**: Links requests and responses via correlation IDs
43//! - **Error Handling**: Graceful error handling with logging
44//! - **Flexible Serialization**: Generic error handling for custom serializer types
45
/// Error type for serialization failures with fallback data.
///
/// When serializers fail to parse request/response bodies into structured types,
/// this error provides both the parsing error details and fallback data that
/// can be stored as a string representation.
#[derive(Debug)]
pub struct SerializationError {
    /// The fallback representation of the data (e.g., base64-encoded, raw string)
    /// that is persisted in place of the structured value when parsing fails.
    pub fallback_data: String,
    /// The underlying error that caused serialization to fail.
    pub error: Box<dyn std::error::Error + Send + Sync>,
}
58
59impl SerializationError {
60    /// Create a new serialization error with fallback data
61    pub fn new(
62        fallback_data: String,
63        error: impl std::error::Error + Send + Sync + 'static,
64    ) -> Self {
65        Self {
66            fallback_data,
67            error: Box::new(error),
68        }
69    }
70}
71
72impl std::fmt::Display for SerializationError {
73    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
74        write!(f, "Serialization failed: {}", self.error)
75    }
76}
77
78impl std::error::Error for SerializationError {
79    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
80        Some(self.error.as_ref())
81    }
82}
83
84use chrono::{DateTime, Utc};
85use outlet::{RequestData, RequestHandler, ResponseData};
86use serde::{Deserialize, Serialize};
87use serde_json::Value;
88use sqlx::PgPool;
89use std::collections::HashMap;
90use std::sync::Arc;
91use std::time::{Instant, SystemTime};
92use tracing::{debug, error, instrument, warn};
93use uuid::Uuid;
94use metrics::{counter, histogram};
95
96pub mod error;
97pub mod repository;
98pub use error::PostgresHandlerError;
99pub use repository::{
100    HttpRequest, HttpResponse, RequestFilter, RequestRepository, RequestResponsePair,
101};
102
/// Get the migrator for running outlet-postgres database migrations.
///
/// This returns a SQLx migrator that can be used to set up the required
/// `http_requests` and `http_responses` tables. The consuming application
/// is responsible for running these migrations at the appropriate time
/// and in the appropriate database schema.
///
/// The migration files under `./migrations` are embedded into the binary at
/// compile time by the `sqlx::migrate!` macro, so no migration files need to
/// ship alongside the application.
///
/// # Examples
///
/// ```rust,no_run
/// use outlet_postgres::migrator;
/// use sqlx::PgPool;
///
/// #[tokio::main]
/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
///     let pool = PgPool::connect("postgresql://user:pass@localhost/db").await?;
///     
///     // Run outlet migrations
///     migrator().run(&pool).await?;
///     
///     Ok(())
/// }
/// ```
pub fn migrator() -> sqlx::migrate::Migrator {
    sqlx::migrate!("./migrations")
}
129
/// Type alias for request body serializers.
///
/// Request serializers take full request context including headers and body bytes.
/// On failure, they return a `SerializationError` with fallback data.
/// Wrapped in `Arc` so a cloned handler shares the same serializer.
type RequestSerializer<T> =
    Arc<dyn Fn(&outlet::RequestData) -> Result<T, SerializationError> + Send + Sync>;

/// Type alias for response body serializers.
///
/// Response serializers take both request and response context, allowing them to
/// make parsing decisions based on request details and response headers (e.g., compression).
/// On failure, they return a `SerializationError` with fallback data.
/// Wrapped in `Arc` so a cloned handler shares the same serializer.
type ResponseSerializer<T> = Arc<
    dyn Fn(&outlet::RequestData, &outlet::ResponseData) -> Result<T, SerializationError>
        + Send
        + Sync,
>;
147
/// PostgreSQL handler for outlet middleware.
///
/// Implements the `RequestHandler` trait to log HTTP requests and responses
/// to PostgreSQL. Request and response bodies are serialized to JSONB fields.
///
/// Generic over `TReq` and `TRes` which represent the request and response body types
/// that should implement `Deserialize` for parsing and `Serialize` for database storage as JSONB.
/// Use `serde_json::Value` for flexible JSON storage, or custom structs for typed storage.
#[derive(Clone)]
pub struct PostgresHandler<TReq = Value, TRes = Value>
where
    TReq: for<'de> Deserialize<'de> + Serialize + Send + Sync + 'static,
    TRes: for<'de> Deserialize<'de> + Serialize + Send + Sync + 'static,
{
    /// Connection pool; also shared with repositories created via `repository()`.
    pool: PgPool,
    /// Parses request bodies into `TReq` before JSONB storage.
    request_serializer: RequestSerializer<TReq>,
    /// Parses response bodies into `TRes` before JSONB storage.
    response_serializer: ResponseSerializer<TRes>,
    /// Random per-handler UUID; rows are matched on (instance_id, correlation_id).
    instance_id: Uuid,
}
167
168impl<TReq, TRes> PostgresHandler<TReq, TRes>
169where
170    TReq: for<'de> Deserialize<'de> + Serialize + Send + Sync + 'static,
171    TRes: for<'de> Deserialize<'de> + Serialize + Send + Sync + 'static,
172{
173    /// Default serializer that attempts serde JSON deserialization.
174    /// On failure, returns a SerializationError with raw bytes as fallback data.
175    fn default_request_serializer() -> RequestSerializer<TReq> {
176        Arc::new(|request_data| {
177            let bytes = request_data.body.as_deref().unwrap_or(&[]);
178            serde_json::from_slice::<TReq>(bytes).map_err(|error| {
179                let fallback_data = String::from_utf8_lossy(bytes).to_string();
180                SerializationError::new(fallback_data, error)
181            })
182        })
183    }
184
185    /// Default serializer that attempts serde JSON deserialization.
186    /// On failure, returns a SerializationError with raw bytes as fallback data.
187    fn default_response_serializer() -> ResponseSerializer<TRes> {
188        Arc::new(|_request_data, response_data| {
189            let bytes = response_data.body.as_deref().unwrap_or(&[]);
190            serde_json::from_slice::<TRes>(bytes).map_err(|error| {
191                let fallback_data = String::from_utf8_lossy(bytes).to_string();
192                SerializationError::new(fallback_data, error)
193            })
194        })
195    }
196
197    /// Create a new PostgreSQL handler with a connection pool.
198    ///
199    /// This will connect to the database but will NOT run migrations.
200    /// Use `migrator()` to get a migrator and run migrations separately.
201    ///
202    /// # Arguments
203    ///
204    /// * `database_url` - PostgreSQL connection string
205    ///
206    /// # Examples
207    ///
208    /// ```rust,no_run
209    /// use outlet_postgres::{PostgresHandler, migrator};
210    /// use serde::{Deserialize, Serialize};
211    /// use serde_json::Value;
212    ///
213    /// #[derive(Deserialize, Serialize)]
214    /// struct MyBodyType {
215    ///     id: u64,
216    ///     name: String,
217    /// }
218    ///
219    /// #[tokio::main]
220    /// async fn main() -> Result<(), Box<dyn std::error::Error>> {
221    ///     // Run migrations first
222    ///     let pool = sqlx::PgPool::connect("postgresql://user:pass@localhost/db").await?;
223    ///     migrator().run(&pool).await?;
224    ///     
225    ///     // Create handler
226    ///     let handler = PostgresHandler::<MyBodyType, MyBodyType>::new("postgresql://user:pass@localhost/db").await?;
227    ///     Ok(())
228    /// }
229    /// ```
230    pub async fn new(database_url: &str) -> Result<Self, PostgresHandlerError> {
231        let pool = PgPool::connect(database_url)
232            .await
233            .map_err(PostgresHandlerError::Connection)?;
234
235        Ok(Self {
236            pool,
237            request_serializer: Self::default_request_serializer(),
238            response_serializer: Self::default_response_serializer(),
239            instance_id: Uuid::new_v4(),
240        })
241    }
242
243    /// Add a custom request body serializer.
244    ///
245    /// The serializer function takes raw bytes and should return a `Result<TReq, String>`.
246    /// If the serializer succeeds, the result will be stored as JSONB and `body_parsed` will be true.
247    /// If it fails, the raw content will be stored as a UTF-8 string and `body_parsed` will be false.
248    ///
249    /// # Panics
250    ///
251    /// This will panic if the serializer succeeds but the resulting `TReq` value cannot be
252    /// converted to JSON via `serde_json::to_value()`. This indicates a bug in the `Serialize`
253    /// implementation of `TReq` and should be fixed by the caller.
254    pub fn with_request_serializer<F>(mut self, serializer: F) -> Self
255    where
256        F: Fn(&outlet::RequestData) -> Result<TReq, SerializationError> + Send + Sync + 'static,
257    {
258        self.request_serializer = Arc::new(serializer);
259        self
260    }
261
262    /// Add a custom response body serializer.
263    ///
264    /// The serializer function takes raw bytes and should return a `Result<TRes, String>`.
265    /// If the serializer succeeds, the result will be stored as JSONB and `body_parsed` will be true.
266    /// If it fails, the raw content will be stored as a UTF-8 string and `body_parsed` will be false.
267    ///
268    /// # Panics
269    ///
270    /// This will panic if the serializer succeeds but the resulting `TRes` value cannot be
271    /// converted to JSON via `serde_json::to_value()`. This indicates a bug in the `Serialize`
272    /// implementation of `TRes` and should be fixed by the caller.
273    pub fn with_response_serializer<F>(mut self, serializer: F) -> Self
274    where
275        F: Fn(&outlet::RequestData, &outlet::ResponseData) -> Result<TRes, SerializationError>
276            + Send
277            + Sync
278            + 'static,
279    {
280        self.response_serializer = Arc::new(serializer);
281        self
282    }
283
284    /// Create a PostgreSQL handler from an existing connection pool.
285    ///
286    /// Use this if you already have a connection pool and want to reuse it.
287    /// This will NOT run migrations - use `migrator()` to run migrations separately.
288    ///
289    /// # Arguments
290    ///
291    /// * `pool` - Existing PostgreSQL connection pool
292    ///
293    /// # Examples
294    ///
295    /// ```rust,no_run
296    /// use outlet_postgres::{PostgresHandler, migrator};
297    /// use sqlx::PgPool;
298    /// use serde::{Deserialize, Serialize};
299    ///
300    /// #[derive(Deserialize, Serialize)]
301    /// struct MyBodyType {
302    ///     id: u64,
303    ///     name: String,
304    /// }
305    ///
306    /// #[tokio::main]
307    /// async fn main() -> Result<(), Box<dyn std::error::Error>> {
308    ///     let pool = PgPool::connect("postgresql://user:pass@localhost/db").await?;
309    ///     
310    ///     // Run migrations first
311    ///     migrator().run(&pool).await?;
312    ///     
313    ///     // Create handler
314    ///     let handler = PostgresHandler::<MyBodyType, MyBodyType>::from_pool(pool).await?;
315    ///     Ok(())
316    /// }
317    /// ```
318    pub async fn from_pool(pool: PgPool) -> Result<Self, PostgresHandlerError> {
319        Ok(Self {
320            pool,
321            request_serializer: Self::default_request_serializer(),
322            response_serializer: Self::default_response_serializer(),
323            instance_id: Uuid::new_v4(),
324        })
325    }
326
327    /// Convert headers to a JSONB-compatible format.
328    fn headers_to_json(headers: &HashMap<String, Vec<bytes::Bytes>>) -> Value {
329        let mut header_map = HashMap::new();
330        for (name, values) in headers {
331            if values.len() == 1 {
332                let value_str = String::from_utf8_lossy(&values[0]).to_string();
333                header_map.insert(name.clone(), Value::String(value_str));
334            } else {
335                let value_array: Vec<Value> = values
336                    .iter()
337                    .map(|v| Value::String(String::from_utf8_lossy(v).to_string()))
338                    .collect();
339                header_map.insert(name.clone(), Value::Array(value_array));
340            }
341        }
342        serde_json::to_value(header_map).unwrap_or(Value::Null)
343    }
344
345    /// Convert request data to a JSONB value using the configured serializer.
346    fn request_body_to_json_with_fallback(
347        &self,
348        request_data: &outlet::RequestData,
349    ) -> (Value, bool) {
350        match (self.request_serializer)(request_data) {
351            Ok(typed_value) => {
352                if let Ok(json_value) = serde_json::to_value(&typed_value) {
353                    (json_value, true)
354                } else {
355                    // This should never happen if the type implements Serialize correctly
356                    (
357                        Value::String(
358                            serde_json::to_string(&typed_value)
359                                .expect("Serialized value must be convertible to JSON string"),
360                        ),
361                        false,
362                    )
363                }
364            }
365            Err(serialization_error) => (Value::String(serialization_error.fallback_data), false),
366        }
367    }
368
369    /// Convert response data to a JSONB value using the configured serializer.
370    fn response_body_to_json_with_fallback(
371        &self,
372        request_data: &outlet::RequestData,
373        response_data: &outlet::ResponseData,
374    ) -> (Value, bool) {
375        match (self.response_serializer)(request_data, response_data) {
376            Ok(typed_value) => {
377                if let Ok(json_value) = serde_json::to_value(&typed_value) {
378                    (json_value, true)
379                } else {
380                    // This should never happen if the type implements Serialize correctly
381                    (
382                        Value::String(
383                            serde_json::to_string(&typed_value)
384                                .expect("Serialized value must be convertible to JSON string"),
385                        ),
386                        false,
387                    )
388                }
389            }
390            Err(serialization_error) => (Value::String(serialization_error.fallback_data), false),
391        }
392    }
393
394    /// Get a repository for querying logged requests and responses.
395    ///
396    /// Returns a `RequestRepository` with the same type parameters as this handler,
397    /// allowing for type-safe querying of request and response bodies.
398    pub fn repository(&self) -> crate::repository::RequestRepository<TReq, TRes> {
399        crate::repository::RequestRepository::new(self.pool.clone())
400    }
401}
402
impl<TReq, TRes> RequestHandler for PostgresHandler<TReq, TRes>
where
    TReq: for<'de> Deserialize<'de> + Serialize + Send + Sync + 'static,
    TRes: for<'de> Deserialize<'de> + Serialize + Send + Sync + 'static,
{
    /// Persist an incoming request to `http_requests`.
    ///
    /// Insert failures are counted in metrics and logged but never propagated:
    /// logging must not disrupt request handling.
    #[instrument(skip(self, data), fields(correlation_id = %data.correlation_id))]
    async fn handle_request(&self, data: RequestData) {
        let headers_json = Self::headers_to_json(&data.headers);
        // Only run the serializer when a body is present; otherwise store NULL
        // with body_parsed = false.
        let (body_json, parsed) = if data.body.is_some() {
            let (json, parsed) = self.request_body_to_json_with_fallback(&data);
            (Some(json), parsed)
        } else {
            (None, false)
        };

        let timestamp: DateTime<Utc> = data.timestamp.into();

        // Time only the INSERT so DB write latency is observable in isolation.
        let query_start = Instant::now();
        let result = sqlx::query(
            r#"
            INSERT INTO http_requests (instance_id, correlation_id, timestamp, method, uri, headers, body, body_parsed)
            VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
            "#,
        )
        .bind(self.instance_id)
        // NOTE(review): u64 -> i64 cast wraps for correlation ids above
        // i64::MAX — confirm ids stay within signed range.
        .bind(data.correlation_id as i64)
        .bind(timestamp)
        .bind(data.method.to_string())
        .bind(data.uri.to_string())
        .bind(headers_json)
        .bind(body_json)
        .bind(parsed)
        .execute(&self.pool)
        .await;
        let query_duration = query_start.elapsed();
        histogram!("outlet_write_duration_seconds", "operation" => "request").record(query_duration.as_secs_f64());

        if let Err(e) = result {
            counter!("outlet_write_errors_total", "operation" => "request").increment(1);
            error!(correlation_id = %data.correlation_id, error = %e, "Failed to insert request data");
        } else {
            // Lag between when the request was observed and when it was written;
            // more than one second is logged at WARN as a backpressure signal.
            let processing_lag_ms = SystemTime::now()
                .duration_since(data.timestamp)
                .unwrap_or_default()
                .as_millis();
            if processing_lag_ms > 1000 {
                warn!(correlation_id = %data.correlation_id, method = %data.method, uri = %data.uri, lag_ms = %processing_lag_ms, "Request logged (slow)");
            } else {
                debug!(correlation_id = %data.correlation_id, method = %data.method, uri = %data.uri, lag_ms = %processing_lag_ms, "Request logged");
            }
        }
    }

    /// Persist a response to `http_responses`, but only when the matching
    /// request row (same `instance_id` and `correlation_id`) already exists —
    /// otherwise the insert is skipped so the correlation link never dangles.
    #[instrument(skip(self, request_data, response_data), fields(correlation_id = %request_data.correlation_id))]
    async fn handle_response(&self, request_data: RequestData, response_data: ResponseData) {
        let headers_json = Self::headers_to_json(&response_data.headers);
        // Only run the serializer when a body is present; otherwise store NULL
        // with body_parsed = false.
        let (body_json, parsed) = if response_data.body.is_some() {
            let (json, parsed) =
                self.response_body_to_json_with_fallback(&request_data, &response_data);
            (Some(json), parsed)
        } else {
            (None, false)
        };

        let timestamp: DateTime<Utc> = response_data.timestamp.into();
        let duration_ms = response_data.duration.as_millis() as i64;
        let duration_to_first_byte_ms = response_data.duration_to_first_byte.as_millis() as i64;

        // Time only the INSERT so DB write latency is observable in isolation.
        let query_start = Instant::now();
        // INSERT ... SELECT ... WHERE EXISTS: the row is written only if the
        // corresponding request was logged by this handler instance.
        let result = sqlx::query(
            r#"
            INSERT INTO http_responses (instance_id, correlation_id, timestamp, status_code, headers, body, body_parsed, duration_to_first_byte_ms, duration_ms)
            SELECT $1, $2, $3, $4, $5, $6, $7, $8, $9
            WHERE EXISTS (SELECT 1 FROM http_requests WHERE instance_id = $1 AND correlation_id = $2)
            "#,
        )
        .bind(self.instance_id)
        .bind(request_data.correlation_id as i64)
        .bind(timestamp)
        .bind(response_data.status.as_u16() as i32)
        .bind(headers_json)
        .bind(body_json)
        .bind(parsed)
        .bind(duration_to_first_byte_ms)
        .bind(duration_ms)
        .execute(&self.pool)
        .await;
        let query_duration = query_start.elapsed();
        histogram!("outlet_write_duration_seconds", "operation" => "response").record(query_duration.as_secs_f64());

        match result {
            Err(e) => {
                counter!("outlet_write_errors_total", "operation" => "response").increment(1);
                error!(correlation_id = %request_data.correlation_id, error = %e, "Failed to insert response data");
            }
            Ok(query_result) => {
                // rows_affected == 0 means the guarded insert found no matching
                // request row; that is expected (not an error) and logged at DEBUG.
                if query_result.rows_affected() > 0 {
                    let processing_lag_ms = SystemTime::now()
                        .duration_since(response_data.timestamp)
                        .unwrap_or_default()
                        .as_millis();
                    if processing_lag_ms > 1000 {
                        warn!(correlation_id = %request_data.correlation_id, status = %response_data.status, duration_ms = %duration_ms, lag_ms = %processing_lag_ms, "Response logged (slow)");
                    } else {
                        debug!(correlation_id = %request_data.correlation_id, status = %response_data.status, duration_ms = %duration_ms, lag_ms = %processing_lag_ms, "Response logged");
                    }
                } else {
                    debug!(correlation_id = %request_data.correlation_id, "No matching request found for response, skipping insert")
                }
            }
        }
    }
}
516
517#[cfg(test)]
518mod tests {
519    use super::*;
520    use bytes::Bytes;
521    use chrono::{DateTime, Utc};
522    use outlet::{RequestData, ResponseData};
523    use serde::{Deserialize, Serialize};
524    use serde_json::Value;
525    use sqlx::PgPool;
526    use std::collections::HashMap;
527    use std::time::{Duration, SystemTime};
528
    /// Typed request body used to exercise JSONB round-tripping in tests.
    #[derive(Debug, Clone, PartialEq, Deserialize, Serialize)]
    struct TestRequest {
        user_id: u64,
        action: String,
    }
534
    /// Typed response body used to exercise JSONB round-tripping in tests.
    #[derive(Debug, Clone, PartialEq, Deserialize, Serialize)]
    struct TestResponse {
        success: bool,
        message: String,
    }
540
541    fn create_test_request_data() -> RequestData {
542        let mut headers = HashMap::new();
543        headers.insert("content-type".to_string(), vec!["application/json".into()]);
544        headers.insert("user-agent".to_string(), vec!["test-client/1.0".into()]);
545
546        let test_req = TestRequest {
547            user_id: 123,
548            action: "create_user".to_string(),
549        };
550        let body = serde_json::to_vec(&test_req).unwrap();
551
552        RequestData {
553            method: http::Method::POST,
554            uri: http::Uri::from_static("/api/users"),
555            headers,
556            body: Some(Bytes::from(body)),
557            timestamp: SystemTime::now(),
558            correlation_id: 0,
559        }
560    }
561
562    fn create_test_response_data() -> ResponseData {
563        let mut headers = HashMap::new();
564        headers.insert("content-type".to_string(), vec!["application/json".into()]);
565
566        let test_res = TestResponse {
567            success: true,
568            message: "User created successfully".to_string(),
569        };
570        let body = serde_json::to_vec(&test_res).unwrap();
571
572        ResponseData {
573            status: http::StatusCode::CREATED,
574            headers,
575            body: Some(Bytes::from(body)),
576            timestamp: SystemTime::now(),
577            duration_to_first_byte: Duration::from_millis(100),
578            duration: Duration::from_millis(150),
579            correlation_id: 0,
580        }
581    }
582
583    #[sqlx::test]
584    async fn test_handler_creation(pool: PgPool) {
585        // Run migrations first
586        crate::migrator().run(&pool).await.unwrap();
587
588        let handler = PostgresHandler::<TestRequest, TestResponse>::from_pool(pool.clone())
589            .await
590            .unwrap();
591
592        // Verify we can get a repository
593        let repository = handler.repository();
594
595        // Test initial state - no requests logged yet
596        let filter = RequestFilter::default();
597        let results = repository.query(filter).await.unwrap();
598        assert!(results.is_empty());
599    }
600
    /// End-to-end check: a request with a valid JSON body is inserted and can
    /// be queried back with its body parsed into `TestRequest`.
    #[sqlx::test]
    async fn test_handle_request_with_typed_body(pool: PgPool) {
        // Run migrations first
        crate::migrator().run(&pool).await.unwrap();

        let handler = PostgresHandler::<TestRequest, TestResponse>::from_pool(pool.clone())
            .await
            .unwrap();
        let repository = handler.repository();

        let mut request_data = create_test_request_data();
        let correlation_id = 12345;
        request_data.correlation_id = correlation_id;

        // Handle the request
        handler.handle_request(request_data.clone()).await;

        // Query back the request by correlation id
        let filter = RequestFilter {
            correlation_id: Some(correlation_id as i64),
            ..Default::default()
        };
        let results = repository.query(filter).await.unwrap();

        assert_eq!(results.len(), 1);
        let pair = &results[0];

        assert_eq!(pair.request.correlation_id, correlation_id as i64);
        assert_eq!(pair.request.method, "POST");
        assert_eq!(pair.request.uri, "/api/users");

        // Check that body was parsed successfully into the typed struct
        match &pair.request.body {
            Some(Ok(parsed_body)) => {
                assert_eq!(
                    *parsed_body,
                    TestRequest {
                        user_id: 123,
                        action: "create_user".to_string(),
                    }
                );
            }
            _ => panic!("Expected successfully parsed request body"),
        }

        // Headers should be converted to JSON properly
        let headers_value = &pair.request.headers;
        assert!(headers_value.get("content-type").is_some());
        assert!(headers_value.get("user-agent").is_some());

        // No response was handled, so the pair has no response yet
        assert!(pair.response.is_none());
    }
654
    /// End-to-end check: a response is inserted (only after its matching
    /// request) and can be queried back with its body parsed into `TestResponse`.
    #[sqlx::test]
    async fn test_handle_response_with_typed_body(pool: PgPool) {
        // Run migrations first
        crate::migrator().run(&pool).await.unwrap();

        let handler = PostgresHandler::<TestRequest, TestResponse>::from_pool(pool.clone())
            .await
            .unwrap();
        let repository = handler.repository();

        let mut request_data = create_test_request_data();
        let mut response_data = create_test_response_data();
        let correlation_id = 54321;
        request_data.correlation_id = correlation_id;
        response_data.correlation_id = correlation_id;

        // Handle the request first: the response insert is guarded on the
        // matching request row existing.
        handler.handle_request(request_data.clone()).await;
        handler
            .handle_response(request_data, response_data.clone())
            .await;

        // Query back the complete pair
        let filter = RequestFilter {
            correlation_id: Some(correlation_id as i64),
            ..Default::default()
        };
        let results = repository.query(filter).await.unwrap();

        assert_eq!(results.len(), 1);
        let pair = &results[0];

        // Check response data (fixture: 201 Created, 150ms total duration)
        let response = pair.response.as_ref().expect("Response should be present");
        assert_eq!(response.correlation_id, correlation_id as i64);
        assert_eq!(response.status_code, 201);
        assert_eq!(response.duration_ms, 150);

        // Check that response body was parsed successfully into the typed struct
        match &response.body {
            Some(Ok(parsed_body)) => {
                assert_eq!(
                    *parsed_body,
                    TestResponse {
                        success: true,
                        message: "User created successfully".to_string(),
                    }
                );
            }
            _ => panic!("Expected successfully parsed response body"),
        }
    }
707
    /// When the body cannot be parsed into `TestRequest`, the handler must
    /// store the raw content as a fallback instead of failing the insert.
    #[sqlx::test]
    async fn test_handle_unparseable_body_fallback(pool: PgPool) {
        // Run migrations first
        crate::migrator().run(&pool).await.unwrap();

        let handler = PostgresHandler::<TestRequest, TestResponse>::from_pool(pool.clone())
            .await
            .unwrap();
        let repository = handler.repository();

        // Create request with invalid JSON for TestRequest
        let mut headers = HashMap::new();
        headers.insert("content-type".to_string(), vec!["text/plain".into()]);

        let invalid_json_body = b"not valid json for TestRequest";
        let correlation_id = 99999;
        let request_data = RequestData {
            method: http::Method::POST,
            uri: http::Uri::from_static("/api/test"),
            headers,
            body: Some(Bytes::from(invalid_json_body.to_vec())),
            timestamp: SystemTime::now(),
            correlation_id,
        };

        handler.handle_request(request_data).await;

        // Query back and verify the raw-content fallback was stored (the
        // default serializer stores the lossy-UTF-8 body text, not base64).
        let filter = RequestFilter {
            correlation_id: Some(correlation_id as i64),
            ..Default::default()
        };
        let results = repository.query(filter).await.unwrap();

        assert_eq!(results.len(), 1);
        let pair = &results[0];

        // Should fallback to raw bytes exposed via the Err variant
        match &pair.request.body {
            Some(Err(raw_bytes)) => {
                assert_eq!(raw_bytes.as_ref(), invalid_json_body);
            }
            _ => panic!("Expected raw bytes fallback for unparseable body"),
        }
    }
753
754    #[sqlx::test]
755    async fn test_query_with_multiple_filters(pool: PgPool) {
756        // Run migrations first
757        crate::migrator().run(&pool).await.unwrap();
758
759        let handler = PostgresHandler::<Value, Value>::from_pool(pool.clone())
760            .await
761            .unwrap();
762        let repository = handler.repository();
763
764        // Insert multiple requests with different characteristics
765        let test_cases = vec![
766            (1001, "GET", "/api/users", 200, 100),
767            (1002, "POST", "/api/users", 201, 150),
768            (1003, "GET", "/api/orders", 404, 50),
769            (1004, "PUT", "/api/users/123", 200, 300),
770        ];
771
772        for (correlation_id, method, uri, status, duration_ms) in test_cases {
773            let mut headers = HashMap::new();
774            headers.insert("content-type".to_string(), vec!["application/json".into()]);
775
776            let request_data = RequestData {
777                method: method.parse().unwrap(),
778                uri: uri.parse().unwrap(),
779                headers: headers.clone(),
780                body: Some(Bytes::from(b"{}".to_vec())),
781                timestamp: SystemTime::now(),
782                correlation_id,
783            };
784
785            let response_data = ResponseData {
786                correlation_id,
787                status: http::StatusCode::from_u16(status).unwrap(),
788                headers,
789                body: Some(Bytes::from(b"{}".to_vec())),
790                timestamp: SystemTime::now(),
791                duration_to_first_byte: Duration::from_millis(duration_ms / 2),
792                duration: Duration::from_millis(duration_ms),
793            };
794
795            handler.handle_request(request_data.clone()).await;
796            handler.handle_response(request_data, response_data).await;
797        }
798
799        // Test method filter
800        let filter = RequestFilter {
801            method: Some("GET".to_string()),
802            ..Default::default()
803        };
804        let results = repository.query(filter).await.unwrap();
805        assert_eq!(results.len(), 2); // 1001, 1003
806
807        // Test status code filter
808        let filter = RequestFilter {
809            status_code: Some(200),
810            ..Default::default()
811        };
812        let results = repository.query(filter).await.unwrap();
813        assert_eq!(results.len(), 2); // 1001, 1004
814
815        // Test URI pattern filter
816        let filter = RequestFilter {
817            uri_pattern: Some("/api/users%".to_string()),
818            ..Default::default()
819        };
820        let results = repository.query(filter).await.unwrap();
821        assert_eq!(results.len(), 3); // 1001, 1002, 1004
822
823        // Test duration range filter
824        let filter = RequestFilter {
825            min_duration_ms: Some(100),
826            max_duration_ms: Some(200),
827            ..Default::default()
828        };
829        let results = repository.query(filter).await.unwrap();
830        assert_eq!(results.len(), 2); // 1001, 1002
831
832        // Test combined filters
833        let filter = RequestFilter {
834            method: Some("GET".to_string()),
835            status_code: Some(200),
836            ..Default::default()
837        };
838        let results = repository.query(filter).await.unwrap();
839        assert_eq!(results.len(), 1); // Only 1001
840        assert_eq!(results[0].request.correlation_id, 1001);
841    }
842
843    #[sqlx::test]
844    async fn test_query_with_pagination_and_ordering(pool: PgPool) {
845        // Run migrations first
846        crate::migrator().run(&pool).await.unwrap();
847
848        let handler = PostgresHandler::<Value, Value>::from_pool(pool.clone())
849            .await
850            .unwrap();
851        let repository = handler.repository();
852
853        // Insert requests with known timestamps
854        let now = SystemTime::now();
855        for i in 0..5 {
856            let correlation_id = 2000 + i;
857            let timestamp = now + Duration::from_secs(i * 10); // 10 second intervals
858
859            let mut headers = HashMap::new();
860            headers.insert("x-test-id".to_string(), vec![i.to_string().into()]);
861
862            let request_data = RequestData {
863                method: http::Method::GET,
864                uri: "/api/test".parse().unwrap(),
865                headers,
866                body: Some(Bytes::from(format!("{{\"id\": {i}}}").into_bytes())),
867                timestamp,
868                correlation_id,
869            };
870
871            handler.handle_request(request_data).await;
872        }
873
874        // Test default ordering (ASC) with limit
875        let filter = RequestFilter {
876            limit: Some(3),
877            ..Default::default()
878        };
879        let results = repository.query(filter).await.unwrap();
880        assert_eq!(results.len(), 3);
881
882        // Should be in ascending timestamp order
883        for i in 0..2 {
884            assert!(results[i].request.timestamp <= results[i + 1].request.timestamp);
885        }
886
887        // Test descending order with offset
888        let filter = RequestFilter {
889            order_by_timestamp_desc: true,
890            limit: Some(2),
891            offset: Some(1),
892            ..Default::default()
893        };
894        let results = repository.query(filter).await.unwrap();
895        assert_eq!(results.len(), 2);
896
897        // Should be in descending order, skipping the first (newest) one
898        assert!(results[0].request.timestamp >= results[1].request.timestamp);
899    }
900
901    #[sqlx::test]
902    async fn test_headers_conversion(pool: PgPool) {
903        // Run migrations first
904        crate::migrator().run(&pool).await.unwrap();
905
906        let handler = PostgresHandler::<Value, Value>::from_pool(pool.clone())
907            .await
908            .unwrap();
909        let repository = handler.repository();
910
911        // Test various header scenarios
912        let mut headers = HashMap::new();
913        headers.insert("single-value".to_string(), vec!["test".into()]);
914        headers.insert(
915            "multi-value".to_string(),
916            vec!["val1".into(), "val2".into()],
917        );
918        headers.insert("empty-value".to_string(), vec!["".into()]);
919
920        let request_data = RequestData {
921            correlation_id: 3000,
922            method: http::Method::GET,
923            uri: "/test".parse().unwrap(),
924            headers,
925            body: None,
926            timestamp: SystemTime::now(),
927        };
928
929        let correlation_id = 3000;
930        handler.handle_request(request_data).await;
931
932        let filter = RequestFilter {
933            correlation_id: Some(correlation_id as i64),
934            ..Default::default()
935        };
936        let results = repository.query(filter).await.unwrap();
937
938        assert_eq!(results.len(), 1);
939        let headers_json = &results[0].request.headers;
940
941        // Single value should be stored as string
942        assert_eq!(
943            headers_json["single-value"],
944            Value::String("test".to_string())
945        );
946
947        // Multi-value should be stored as array
948        match &headers_json["multi-value"] {
949            Value::Array(arr) => {
950                assert_eq!(arr.len(), 2);
951                assert_eq!(arr[0], Value::String("val1".to_string()));
952                assert_eq!(arr[1], Value::String("val2".to_string()));
953            }
954            _ => panic!("Expected array for multi-value header"),
955        }
956
957        // Empty value should still be a string
958        assert_eq!(headers_json["empty-value"], Value::String("".to_string()));
959    }
960
961    #[sqlx::test]
962    async fn test_timestamp_filtering(pool: PgPool) {
963        // Run migrations first
964        crate::migrator().run(&pool).await.unwrap();
965
966        let handler = PostgresHandler::<Value, Value>::from_pool(pool.clone())
967            .await
968            .unwrap();
969        let repository = handler.repository();
970
971        let base_time = SystemTime::UNIX_EPOCH + Duration::from_secs(1_600_000_000); // Sept 2020
972
973        // Insert requests at different times
974        let times = [
975            base_time + Duration::from_secs(0),    // correlation_id 4001
976            base_time + Duration::from_secs(3600), // correlation_id 4002 (1 hour later)
977            base_time + Duration::from_secs(7200), // correlation_id 4003 (2 hours later)
978        ];
979
980        for (i, timestamp) in times.iter().enumerate() {
981            let correlation_id = 4001 + i as u64;
982            let request_data = RequestData {
983                method: http::Method::GET,
984                uri: "/test".parse().unwrap(),
985                headers: HashMap::new(),
986                body: None,
987                timestamp: *timestamp,
988                correlation_id,
989            };
990
991            handler.handle_request(request_data).await;
992        }
993
994        // Test timestamp_after filter
995        let after_time: DateTime<Utc> = (base_time + Duration::from_secs(1800)).into(); // 30 min after first
996        let filter = RequestFilter {
997            timestamp_after: Some(after_time),
998            ..Default::default()
999        };
1000        let results = repository.query(filter).await.unwrap();
1001        assert_eq!(results.len(), 2); // Should get 4002 and 4003
1002
1003        // Test timestamp_before filter
1004        let before_time: DateTime<Utc> = (base_time + Duration::from_secs(5400)).into(); // 1.5 hours after first
1005        let filter = RequestFilter {
1006            timestamp_before: Some(before_time),
1007            ..Default::default()
1008        };
1009        let results = repository.query(filter).await.unwrap();
1010        assert_eq!(results.len(), 2); // Should get 4001 and 4002
1011
1012        // Test timestamp range
1013        let filter = RequestFilter {
1014            timestamp_after: Some(after_time),
1015            timestamp_before: Some(before_time),
1016            ..Default::default()
1017        };
1018        let results = repository.query(filter).await.unwrap();
1019        assert_eq!(results.len(), 1); // Should get only 4002
1020        assert_eq!(results[0].request.correlation_id, 4002);
1021    }
1022
1023    // Note: Path filtering tests have been removed because path filtering
1024    // now happens at the outlet middleware layer, not in the PostgresHandler.
1025    // The handler now logs everything it receives, with filtering done upstream.
1026
1027    #[sqlx::test]
1028    async fn test_no_path_filtering_logs_everything(pool: PgPool) {
1029        // Run migrations first
1030        crate::migrator().run(&pool).await.unwrap();
1031
1032        // Handler without any path filtering
1033        let handler = PostgresHandler::<Value, Value>::from_pool(pool.clone())
1034            .await
1035            .unwrap();
1036        let repository = handler.repository();
1037
1038        let test_uris = vec!["/api/users", "/health", "/metrics", "/random/path"];
1039        for (i, uri) in test_uris.iter().enumerate() {
1040            let correlation_id = 3000 + i as u64;
1041            let mut headers = HashMap::new();
1042            headers.insert("content-type".to_string(), vec!["application/json".into()]);
1043
1044            let request_data = RequestData {
1045                method: http::Method::GET,
1046                uri: uri.parse().unwrap(),
1047                headers,
1048                body: Some(Bytes::from(b"{}".to_vec())),
1049                timestamp: SystemTime::now(),
1050                correlation_id,
1051            };
1052
1053            handler.handle_request(request_data).await;
1054        }
1055
1056        // Should have logged all 4 requests
1057        let filter = RequestFilter::default();
1058        let results = repository.query(filter).await.unwrap();
1059        assert_eq!(results.len(), 4);
1060    }
1061}