outlet_postgres/
lib.rs

1//! # outlet-postgres
2//!
3//! PostgreSQL logging handler for the outlet HTTP request/response middleware.
4//! This crate implements the `RequestHandler` trait from outlet to log HTTP
5//! requests and responses to PostgreSQL with JSONB serialization for bodies.
6//!
7//! ## Quick Start
8//!
9//! Basic usage:
10//!
11//! ```rust,no_run
12//! use outlet::{RequestLoggerLayer, RequestLoggerConfig};
13//! use outlet_postgres::PostgresHandler;
14//! use axum::{routing::get, Router};
15//! use tower::ServiceBuilder;
16//!
17//! async fn hello() -> &'static str {
18//!     "Hello, World!"
19//! }
20//!
21//! #[tokio::main]
22//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
23//!     let database_url = "postgresql://user:password@localhost/dbname";
24//!     let handler: PostgresHandler = PostgresHandler::new(database_url).await?;
25//!     let layer = RequestLoggerLayer::new(RequestLoggerConfig::default(), handler);
26//!
27//!     let app = Router::new()
28//!         .route("/hello", get(hello))
29//!         .layer(ServiceBuilder::new().layer(layer));
30//!
31//!     let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await?;
32//!     axum::serve(listener, app).await?;
33//!     Ok(())
34//! }
35//! ```
36//!
37//! ## Features
38//!
39//! - **PostgreSQL Integration**: Uses sqlx for async PostgreSQL operations
40//! - **JSONB Bodies**: Serializes request/response bodies to JSONB fields
41//! - **Type-safe Querying**: Query logged data with typed request/response bodies
42//! - **Correlation**: Links requests and responses via correlation IDs
43//! - **Error Handling**: Graceful error handling with logging
44//! - **Flexible Serialization**: Generic error handling for custom serializer types
45
/// Error type for serialization failures with fallback data.
///
/// When a serializer fails to parse a request/response body into its
/// structured type, this error carries both the underlying parse error and a
/// fallback representation of the data (e.g. the raw body as a lossy UTF-8
/// string) that can still be persisted.
#[derive(Debug)]
pub struct SerializationError {
    /// The fallback representation of the data (e.g., base64-encoded, raw string)
    pub fallback_data: String,
    /// The underlying error that caused serialization to fail
    pub error: Box<dyn std::error::Error + Send + Sync>,
}

impl SerializationError {
    /// Create a new serialization error with fallback data.
    ///
    /// Accepts anything convertible into a boxed error, so callers may pass a
    /// concrete error type (as before, via std's blanket
    /// `From<E: Error + Send + Sync> for Box<dyn Error + Send + Sync>`),
    /// an already-boxed error, or a plain `String`/`&str` message.
    pub fn new(
        fallback_data: String,
        error: impl Into<Box<dyn std::error::Error + Send + Sync>>,
    ) -> Self {
        Self {
            fallback_data,
            error: error.into(),
        }
    }
}

impl std::fmt::Display for SerializationError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "Serialization failed: {}", self.error)
    }
}

impl std::error::Error for SerializationError {
    /// Expose the wrapped error so callers can walk the error chain.
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        Some(self.error.as_ref())
    }
}
83
84use chrono::{DateTime, Utc};
85use outlet::{RequestData, RequestHandler, ResponseData};
86use serde::{Deserialize, Serialize};
87use serde_json::Value;
88use sqlx::PgPool;
89use std::collections::HashMap;
90use std::sync::Arc;
91use std::time::SystemTime;
92use tracing::{debug, error, instrument, warn};
93use uuid::Uuid;
94
95pub mod error;
96pub mod repository;
97pub use error::PostgresHandlerError;
98pub use repository::{
99    HttpRequest, HttpResponse, RequestFilter, RequestRepository, RequestResponsePair,
100};
101
102/// Get the migrator for running outlet-postgres database migrations.
103///
104/// This returns a SQLx migrator that can be used to set up the required
105/// `http_requests` and `http_responses` tables. The consuming application
106/// is responsible for running these migrations at the appropriate time
107/// and in the appropriate database schema.
108///
109/// # Examples
110///
111/// ```rust,no_run
112/// use outlet_postgres::migrator;
113/// use sqlx::PgPool;
114///
115/// #[tokio::main]
116/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
117///     let pool = PgPool::connect("postgresql://user:pass@localhost/db").await?;
118///     
119///     // Run outlet migrations
120///     migrator().run(&pool).await?;
121///     
122///     Ok(())
123/// }
124/// ```
125pub fn migrator() -> sqlx::migrate::Migrator {
126    sqlx::migrate!("./migrations")
127}
128
/// Type alias for request body serializers.
///
/// A request serializer receives the full [`outlet::RequestData`] (headers,
/// URI, body bytes, ...) and either produces a typed body `T` or a
/// [`SerializationError`] carrying fallback data to store instead.
/// Wrapped in `Arc` so `PostgresHandler` (which holds one) stays cheaply `Clone`.
type RequestSerializer<T> =
    Arc<dyn Fn(&outlet::RequestData) -> Result<T, SerializationError> + Send + Sync>;

/// Type alias for response body serializers.
///
/// A response serializer receives both the originating request and the
/// response, allowing parsing decisions based on request details and response
/// headers (e.g., compression / content-encoding).
/// On failure it returns a [`SerializationError`] with fallback data.
type ResponseSerializer<T> = Arc<
    dyn Fn(&outlet::RequestData, &outlet::ResponseData) -> Result<T, SerializationError>
        + Send
        + Sync,
>;
146
/// PostgreSQL handler for outlet middleware.
///
/// Implements the `RequestHandler` trait to log HTTP requests and responses
/// to PostgreSQL. Request and response bodies are serialized to JSONB fields.
///
/// Generic over `TReq` and `TRes` which represent the request and response body types
/// that should implement `Deserialize` for parsing and `Serialize` for database storage as JSONB.
/// Use `serde_json::Value` for flexible JSON storage, or custom structs for typed storage.
#[derive(Clone)]
pub struct PostgresHandler<TReq = Value, TRes = Value>
where
    TReq: for<'de> Deserialize<'de> + Serialize + Send + Sync + 'static,
    TRes: for<'de> Deserialize<'de> + Serialize + Send + Sync + 'static,
{
    /// Connection pool used for all inserts and repository queries.
    pool: PgPool,
    /// Converts raw request data into `TReq` for JSONB storage.
    request_serializer: RequestSerializer<TReq>,
    /// Converts request + response data into `TRes` for JSONB storage.
    response_serializer: ResponseSerializer<TRes>,
    /// Random id generated per handler instance; rows are matched on
    /// (instance_id, correlation_id) so ids from different handler instances
    /// sharing one database don't collide.
    instance_id: Uuid,
}
166
impl<TReq, TRes> PostgresHandler<TReq, TRes>
where
    TReq: for<'de> Deserialize<'de> + Serialize + Send + Sync + 'static,
    TRes: for<'de> Deserialize<'de> + Serialize + Send + Sync + 'static,
{
    /// Default request serializer: attempts serde JSON deserialization of the
    /// body bytes (a missing body is treated as empty input).
    /// On failure, returns a `SerializationError` whose fallback data is the
    /// body decoded as a lossy UTF-8 string.
    fn default_request_serializer() -> RequestSerializer<TReq> {
        Arc::new(|request_data| {
            let bytes = request_data.body.as_deref().unwrap_or(&[]);
            serde_json::from_slice::<TReq>(bytes).map_err(|error| {
                let fallback_data = String::from_utf8_lossy(bytes).to_string();
                SerializationError::new(fallback_data, error)
            })
        })
    }

    /// Default response serializer: same JSON-with-fallback strategy as
    /// [`Self::default_request_serializer`]; the request context is available
    /// to custom serializers but unused by the default.
    fn default_response_serializer() -> ResponseSerializer<TRes> {
        Arc::new(|_request_data, response_data| {
            let bytes = response_data.body.as_deref().unwrap_or(&[]);
            serde_json::from_slice::<TRes>(bytes).map_err(|error| {
                let fallback_data = String::from_utf8_lossy(bytes).to_string();
                SerializationError::new(fallback_data, error)
            })
        })
    }

    /// Create a new PostgreSQL handler with a connection pool.
    ///
    /// This will connect to the database but will NOT run migrations.
    /// Use `migrator()` to get a migrator and run migrations separately.
    ///
    /// # Arguments
    ///
    /// * `database_url` - PostgreSQL connection string
    ///
    /// # Errors
    ///
    /// Returns `PostgresHandlerError::Connection` if the pool cannot connect.
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// use outlet_postgres::{PostgresHandler, migrator};
    /// use serde::{Deserialize, Serialize};
    /// use serde_json::Value;
    ///
    /// #[derive(Deserialize, Serialize)]
    /// struct MyBodyType {
    ///     id: u64,
    ///     name: String,
    /// }
    ///
    /// #[tokio::main]
    /// async fn main() -> Result<(), Box<dyn std::error::Error>> {
    ///     // Run migrations first
    ///     let pool = sqlx::PgPool::connect("postgresql://user:pass@localhost/db").await?;
    ///     migrator().run(&pool).await?;
    ///     
    ///     // Create handler
    ///     let handler = PostgresHandler::<MyBodyType, MyBodyType>::new("postgresql://user:pass@localhost/db").await?;
    ///     Ok(())
    /// }
    /// ```
    pub async fn new(database_url: &str) -> Result<Self, PostgresHandlerError> {
        let pool = PgPool::connect(database_url)
            .await
            .map_err(PostgresHandlerError::Connection)?;

        Ok(Self {
            pool,
            request_serializer: Self::default_request_serializer(),
            response_serializer: Self::default_response_serializer(),
            // Fresh id per handler; inserts are scoped to this id.
            instance_id: Uuid::new_v4(),
        })
    }

    /// Add a custom request body serializer.
    ///
    /// The serializer receives the full [`outlet::RequestData`] (headers, URI,
    /// body bytes, ...) and returns `Result<TReq, SerializationError>`.
    /// If the serializer succeeds, the result will be stored as JSONB and `body_parsed` will be true.
    /// If it fails, the error's `fallback_data` will be stored as a string and `body_parsed` will be false.
    ///
    /// # Panics
    ///
    /// This will panic if the serializer succeeds but the resulting `TReq` value cannot be
    /// converted to JSON via `serde_json::to_value()`. This indicates a bug in the `Serialize`
    /// implementation of `TReq` and should be fixed by the caller.
    pub fn with_request_serializer<F>(mut self, serializer: F) -> Self
    where
        F: Fn(&outlet::RequestData) -> Result<TReq, SerializationError> + Send + Sync + 'static,
    {
        self.request_serializer = Arc::new(serializer);
        self
    }

    /// Add a custom response body serializer.
    ///
    /// The serializer receives both the originating [`outlet::RequestData`] and
    /// the [`outlet::ResponseData`] and returns `Result<TRes, SerializationError>`.
    /// If the serializer succeeds, the result will be stored as JSONB and `body_parsed` will be true.
    /// If it fails, the error's `fallback_data` will be stored as a string and `body_parsed` will be false.
    ///
    /// # Panics
    ///
    /// This will panic if the serializer succeeds but the resulting `TRes` value cannot be
    /// converted to JSON via `serde_json::to_value()`. This indicates a bug in the `Serialize`
    /// implementation of `TRes` and should be fixed by the caller.
    pub fn with_response_serializer<F>(mut self, serializer: F) -> Self
    where
        F: Fn(&outlet::RequestData, &outlet::ResponseData) -> Result<TRes, SerializationError>
            + Send
            + Sync
            + 'static,
    {
        self.response_serializer = Arc::new(serializer);
        self
    }

    /// Create a PostgreSQL handler from an existing connection pool.
    ///
    /// Use this if you already have a connection pool and want to reuse it.
    /// This will NOT run migrations - use `migrator()` to run migrations separately.
    ///
    /// # Arguments
    ///
    /// * `pool` - Existing PostgreSQL connection pool
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// use outlet_postgres::{PostgresHandler, migrator};
    /// use sqlx::PgPool;
    /// use serde::{Deserialize, Serialize};
    ///
    /// #[derive(Deserialize, Serialize)]
    /// struct MyBodyType {
    ///     id: u64,
    ///     name: String,
    /// }
    ///
    /// #[tokio::main]
    /// async fn main() -> Result<(), Box<dyn std::error::Error>> {
    ///     let pool = PgPool::connect("postgresql://user:pass@localhost/db").await?;
    ///     
    ///     // Run migrations first
    ///     migrator().run(&pool).await?;
    ///     
    ///     // Create handler
    ///     let handler = PostgresHandler::<MyBodyType, MyBodyType>::from_pool(pool).await?;
    ///     Ok(())
    /// }
    /// ```
    pub async fn from_pool(pool: PgPool) -> Result<Self, PostgresHandlerError> {
        // Currently infallible; kept async + Result for interface stability
        // alongside `new`.
        Ok(Self {
            pool,
            request_serializer: Self::default_request_serializer(),
            response_serializer: Self::default_response_serializer(),
            instance_id: Uuid::new_v4(),
        })
    }

    /// Convert headers to a JSONB-compatible format.
    ///
    /// Single-valued headers become a JSON string; multi-valued headers become
    /// a JSON array of strings. Header bytes are decoded lossily as UTF-8.
    fn headers_to_json(headers: &HashMap<String, Vec<bytes::Bytes>>) -> Value {
        let mut header_map = HashMap::new();
        for (name, values) in headers {
            if values.len() == 1 {
                let value_str = String::from_utf8_lossy(&values[0]).to_string();
                header_map.insert(name.clone(), Value::String(value_str));
            } else {
                let value_array: Vec<Value> = values
                    .iter()
                    .map(|v| Value::String(String::from_utf8_lossy(v).to_string()))
                    .collect();
                header_map.insert(name.clone(), Value::Array(value_array));
            }
        }
        serde_json::to_value(header_map).unwrap_or(Value::Null)
    }

    /// Convert request data to a JSONB value using the configured serializer.
    ///
    /// Returns the JSON value plus a flag: `true` when the body parsed into the
    /// typed representation, `false` when the serializer's fallback string was
    /// stored instead.
    fn request_body_to_json_with_fallback(
        &self,
        request_data: &outlet::RequestData,
    ) -> (Value, bool) {
        match (self.request_serializer)(request_data) {
            Ok(typed_value) => {
                if let Ok(json_value) = serde_json::to_value(&typed_value) {
                    (json_value, true)
                } else {
                    // This should never happen if the type implements Serialize correctly
                    (
                        Value::String(
                            serde_json::to_string(&typed_value)
                                .expect("Serialized value must be convertible to JSON string"),
                        ),
                        false,
                    )
                }
            }
            Err(serialization_error) => (Value::String(serialization_error.fallback_data), false),
        }
    }

    /// Convert response data to a JSONB value using the configured serializer.
    ///
    /// Same contract as [`Self::request_body_to_json_with_fallback`], but the
    /// serializer also sees the originating request.
    fn response_body_to_json_with_fallback(
        &self,
        request_data: &outlet::RequestData,
        response_data: &outlet::ResponseData,
    ) -> (Value, bool) {
        match (self.response_serializer)(request_data, response_data) {
            Ok(typed_value) => {
                if let Ok(json_value) = serde_json::to_value(&typed_value) {
                    (json_value, true)
                } else {
                    // This should never happen if the type implements Serialize correctly
                    (
                        Value::String(
                            serde_json::to_string(&typed_value)
                                .expect("Serialized value must be convertible to JSON string"),
                        ),
                        false,
                    )
                }
            }
            Err(serialization_error) => (Value::String(serialization_error.fallback_data), false),
        }
    }

    /// Get a repository for querying logged requests and responses.
    ///
    /// Returns a `RequestRepository` with the same type parameters as this handler,
    /// allowing for type-safe querying of request and response bodies.
    pub fn repository(&self) -> crate::repository::RequestRepository<TReq, TRes> {
        crate::repository::RequestRepository::new(self.pool.clone())
    }
}
401
impl<TReq, TRes> RequestHandler for PostgresHandler<TReq, TRes>
where
    TReq: for<'de> Deserialize<'de> + Serialize + Send + Sync + 'static,
    TRes: for<'de> Deserialize<'de> + Serialize + Send + Sync + 'static,
{
    /// Persist an incoming request as a row in `http_requests`.
    ///
    /// Insert failures are logged via `tracing` and never propagated, so
    /// logging problems cannot affect request handling.
    #[instrument(skip(self, data), fields(correlation_id = %data.correlation_id))]
    async fn handle_request(&self, data: RequestData) {
        let headers_json = Self::headers_to_json(&data.headers);
        // Only run the serializer when a body is present; otherwise store NULL
        // with body_parsed = false.
        let (body_json, parsed) = if data.body.is_some() {
            let (json, parsed) = self.request_body_to_json_with_fallback(&data);
            (Some(json), parsed)
        } else {
            (None, false)
        };

        let timestamp: DateTime<Utc> = data.timestamp.into();

        let result = sqlx::query(
            r#"
            INSERT INTO http_requests (instance_id, correlation_id, timestamp, method, uri, headers, body, body_parsed)
            VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
            "#,
        )
        .bind(self.instance_id)
        // NOTE(review): u64 -> i64 cast; ids above i64::MAX would wrap to
        // negative — presumably unreachable in practice, but worth confirming.
        .bind(data.correlation_id as i64)
        .bind(timestamp)
        .bind(data.method.to_string())
        .bind(data.uri.to_string())
        .bind(headers_json)
        .bind(body_json)
        .bind(parsed)
        .execute(&self.pool)
        .await;

        if let Err(e) = result {
            error!(correlation_id = %data.correlation_id, error = %e, "Failed to insert request data");
        } else {
            // How far behind real time this handler ran, relative to the
            // request timestamp; warn when more than a second behind.
            let processing_lag_ms = SystemTime::now()
                .duration_since(data.timestamp)
                .unwrap_or_default()
                .as_millis();
            if processing_lag_ms > 1000 {
                warn!(correlation_id = %data.correlation_id, method = %data.method, uri = %data.uri, lag_ms = %processing_lag_ms, "Request logged (slow)");
            } else {
                debug!(correlation_id = %data.correlation_id, method = %data.method, uri = %data.uri, lag_ms = %processing_lag_ms, "Request logged");
            }
        }
    }

    /// Persist a response as a row in `http_responses`, linked to its request.
    ///
    /// The SQL's `WHERE EXISTS` guard only inserts the response when a matching
    /// `http_requests` row (same instance_id + correlation_id) exists; when it
    /// does not (zero rows affected), the response is skipped at debug level.
    /// Insert failures are logged and never propagated.
    #[instrument(skip(self, request_data, response_data), fields(correlation_id = %request_data.correlation_id))]
    async fn handle_response(&self, request_data: RequestData, response_data: ResponseData) {
        let headers_json = Self::headers_to_json(&response_data.headers);
        // Only run the serializer when a body is present; otherwise store NULL
        // with body_parsed = false.
        let (body_json, parsed) = if response_data.body.is_some() {
            let (json, parsed) =
                self.response_body_to_json_with_fallback(&request_data, &response_data);
            (Some(json), parsed)
        } else {
            (None, false)
        };

        let timestamp: DateTime<Utc> = response_data.timestamp.into();
        let duration_ms = response_data.duration.as_millis() as i64;
        let duration_to_first_byte_ms = response_data.duration_to_first_byte.as_millis() as i64;

        let result = sqlx::query(
            r#"
            INSERT INTO http_responses (instance_id, correlation_id, timestamp, status_code, headers, body, body_parsed, duration_to_first_byte_ms, duration_ms)
            SELECT $1, $2, $3, $4, $5, $6, $7, $8, $9
            WHERE EXISTS (SELECT 1 FROM http_requests WHERE instance_id = $1 AND correlation_id = $2)
            "#,
        )
        .bind(self.instance_id)
        // Same u64 -> i64 cast caveat as in handle_request.
        .bind(request_data.correlation_id as i64)
        .bind(timestamp)
        .bind(response_data.status.as_u16() as i32)
        .bind(headers_json)
        .bind(body_json)
        .bind(parsed)
        .bind(duration_to_first_byte_ms)
        .bind(duration_ms)
        .execute(&self.pool)
        .await;

        match result {
            Err(e) => {
                error!(correlation_id = %request_data.correlation_id, error = %e, "Failed to insert response data");
            }
            Ok(query_result) => {
                if query_result.rows_affected() > 0 {
                    // Lag between the response timestamp and this handler running.
                    let processing_lag_ms = SystemTime::now()
                        .duration_since(response_data.timestamp)
                        .unwrap_or_default()
                        .as_millis();
                    if processing_lag_ms > 1000 {
                        warn!(correlation_id = %request_data.correlation_id, status = %response_data.status, duration_ms = %duration_ms, lag_ms = %processing_lag_ms, "Response logged (slow)");
                    } else {
                        debug!(correlation_id = %request_data.correlation_id, status = %response_data.status, duration_ms = %duration_ms, lag_ms = %processing_lag_ms, "Response logged");
                    }
                } else {
                    // WHERE EXISTS matched nothing: the request row was never
                    // inserted (or belongs to another instance).
                    debug!(correlation_id = %request_data.correlation_id, "No matching request found for response, skipping insert")
                }
            }
        }
    }
}
507
508#[cfg(test)]
509mod tests {
510    use super::*;
511    use bytes::Bytes;
512    use chrono::{DateTime, Utc};
513    use outlet::{RequestData, ResponseData};
514    use serde::{Deserialize, Serialize};
515    use serde_json::Value;
516    use sqlx::PgPool;
517    use std::collections::HashMap;
518    use std::time::{Duration, SystemTime};
519
    /// Typed request body used to exercise typed JSONB (de)serialization.
    #[derive(Debug, Clone, PartialEq, Deserialize, Serialize)]
    struct TestRequest {
        user_id: u64,
        action: String,
    }

    /// Typed response body counterpart to `TestRequest`.
    #[derive(Debug, Clone, PartialEq, Deserialize, Serialize)]
    struct TestResponse {
        success: bool,
        message: String,
    }
531
532    fn create_test_request_data() -> RequestData {
533        let mut headers = HashMap::new();
534        headers.insert("content-type".to_string(), vec!["application/json".into()]);
535        headers.insert("user-agent".to_string(), vec!["test-client/1.0".into()]);
536
537        let test_req = TestRequest {
538            user_id: 123,
539            action: "create_user".to_string(),
540        };
541        let body = serde_json::to_vec(&test_req).unwrap();
542
543        RequestData {
544            method: http::Method::POST,
545            uri: http::Uri::from_static("/api/users"),
546            headers,
547            body: Some(Bytes::from(body)),
548            timestamp: SystemTime::now(),
549            correlation_id: 0,
550        }
551    }
552
553    fn create_test_response_data() -> ResponseData {
554        let mut headers = HashMap::new();
555        headers.insert("content-type".to_string(), vec!["application/json".into()]);
556
557        let test_res = TestResponse {
558            success: true,
559            message: "User created successfully".to_string(),
560        };
561        let body = serde_json::to_vec(&test_res).unwrap();
562
563        ResponseData {
564            status: http::StatusCode::CREATED,
565            headers,
566            body: Some(Bytes::from(body)),
567            timestamp: SystemTime::now(),
568            duration_to_first_byte: Duration::from_millis(100),
569            duration: Duration::from_millis(150),
570            correlation_id: 0,
571        }
572    }
573
    /// Handler can be built from an existing pool and starts with an empty log.
    #[sqlx::test]
    async fn test_handler_creation(pool: PgPool) {
        // Run migrations first
        crate::migrator().run(&pool).await.unwrap();

        let handler = PostgresHandler::<TestRequest, TestResponse>::from_pool(pool.clone())
            .await
            .unwrap();

        // Verify we can get a repository
        let repository = handler.repository();

        // Test initial state - no requests logged yet
        let filter = RequestFilter::default();
        let results = repository.query(filter).await.unwrap();
        assert!(results.is_empty());
    }
591
    /// A logged request round-trips: its typed body, headers, method and URI
    /// can all be queried back, and no response is attached yet.
    #[sqlx::test]
    async fn test_handle_request_with_typed_body(pool: PgPool) {
        // Run migrations first
        crate::migrator().run(&pool).await.unwrap();

        let handler = PostgresHandler::<TestRequest, TestResponse>::from_pool(pool.clone())
            .await
            .unwrap();
        let repository = handler.repository();

        let mut request_data = create_test_request_data();
        let correlation_id = 12345;
        request_data.correlation_id = correlation_id;

        // Handle the request
        handler.handle_request(request_data.clone()).await;

        // Query back the request
        let filter = RequestFilter {
            correlation_id: Some(correlation_id as i64),
            ..Default::default()
        };
        let results = repository.query(filter).await.unwrap();

        assert_eq!(results.len(), 1);
        let pair = &results[0];

        assert_eq!(pair.request.correlation_id, correlation_id as i64);
        assert_eq!(pair.request.method, "POST");
        assert_eq!(pair.request.uri, "/api/users");

        // Check that body was parsed successfully (Ok = typed path was taken)
        match &pair.request.body {
            Some(Ok(parsed_body)) => {
                assert_eq!(
                    *parsed_body,
                    TestRequest {
                        user_id: 123,
                        action: "create_user".to_string(),
                    }
                );
            }
            _ => panic!("Expected successfully parsed request body"),
        }

        // Headers should be converted to JSON properly
        let headers_value = &pair.request.headers;
        assert!(headers_value.get("content-type").is_some());
        assert!(headers_value.get("user-agent").is_some());

        // No response yet
        assert!(pair.response.is_none());
    }
645
    /// End-to-end: a request and its response are linked by correlation id,
    /// and the typed response body round-trips through JSONB.
    #[sqlx::test]
    async fn test_handle_response_with_typed_body(pool: PgPool) {
        // Run migrations first
        crate::migrator().run(&pool).await.unwrap();

        let handler = PostgresHandler::<TestRequest, TestResponse>::from_pool(pool.clone())
            .await
            .unwrap();
        let repository = handler.repository();

        let mut request_data = create_test_request_data();
        let mut response_data = create_test_response_data();
        let correlation_id = 54321;
        request_data.correlation_id = correlation_id;
        response_data.correlation_id = correlation_id;

        // Handle both request and response
        handler.handle_request(request_data.clone()).await;
        handler
            .handle_response(request_data, response_data.clone())
            .await;

        // Query back the complete pair
        let filter = RequestFilter {
            correlation_id: Some(correlation_id as i64),
            ..Default::default()
        };
        let results = repository.query(filter).await.unwrap();

        assert_eq!(results.len(), 1);
        let pair = &results[0];

        // Check response data (201 and 150ms match create_test_response_data)
        let response = pair.response.as_ref().expect("Response should be present");
        assert_eq!(response.correlation_id, correlation_id as i64);
        assert_eq!(response.status_code, 201);
        assert_eq!(response.duration_ms, 150);

        // Check that response body was parsed successfully
        match &response.body {
            Some(Ok(parsed_body)) => {
                assert_eq!(
                    *parsed_body,
                    TestResponse {
                        success: true,
                        message: "User created successfully".to_string(),
                    }
                );
            }
            _ => panic!("Expected successfully parsed response body"),
        }
    }
698
    /// A body that fails typed deserialization is stored via the fallback path
    /// and read back as `Err(raw_bytes)` rather than a typed `Ok` value.
    #[sqlx::test]
    async fn test_handle_unparseable_body_fallback(pool: PgPool) {
        // Run migrations first
        crate::migrator().run(&pool).await.unwrap();

        let handler = PostgresHandler::<TestRequest, TestResponse>::from_pool(pool.clone())
            .await
            .unwrap();
        let repository = handler.repository();

        // Create request with invalid JSON for TestRequest
        let mut headers = HashMap::new();
        headers.insert("content-type".to_string(), vec!["text/plain".into()]);

        let invalid_json_body = b"not valid json for TestRequest";
        let correlation_id = 99999;
        let request_data = RequestData {
            method: http::Method::POST,
            uri: http::Uri::from_static("/api/test"),
            headers,
            body: Some(Bytes::from(invalid_json_body.to_vec())),
            timestamp: SystemTime::now(),
            correlation_id,
        };

        handler.handle_request(request_data).await;

        // Query back and verify the raw-content fallback was stored
        let filter = RequestFilter {
            correlation_id: Some(correlation_id as i64),
            ..Default::default()
        };
        let results = repository.query(filter).await.unwrap();

        assert_eq!(results.len(), 1);
        let pair = &results[0];

        // Should fallback to raw bytes
        match &pair.request.body {
            Some(Err(raw_bytes)) => {
                assert_eq!(raw_bytes.as_ref(), invalid_json_body);
            }
            _ => panic!("Expected raw bytes fallback for unparseable body"),
        }
    }
744
    /// Filters (method, status code, URI pattern, duration range) work both
    /// individually and combined.
    #[sqlx::test]
    async fn test_query_with_multiple_filters(pool: PgPool) {
        // Run migrations first
        crate::migrator().run(&pool).await.unwrap();

        let handler = PostgresHandler::<Value, Value>::from_pool(pool.clone())
            .await
            .unwrap();
        let repository = handler.repository();

        // Insert multiple requests with different characteristics:
        // (correlation_id, method, uri, status, duration_ms)
        let test_cases = vec![
            (1001, "GET", "/api/users", 200, 100),
            (1002, "POST", "/api/users", 201, 150),
            (1003, "GET", "/api/orders", 404, 50),
            (1004, "PUT", "/api/users/123", 200, 300),
        ];

        for (correlation_id, method, uri, status, duration_ms) in test_cases {
            let mut headers = HashMap::new();
            headers.insert("content-type".to_string(), vec!["application/json".into()]);

            let request_data = RequestData {
                method: method.parse().unwrap(),
                uri: uri.parse().unwrap(),
                headers: headers.clone(),
                body: Some(Bytes::from(b"{}".to_vec())),
                timestamp: SystemTime::now(),
                correlation_id,
            };

            let response_data = ResponseData {
                correlation_id,
                status: http::StatusCode::from_u16(status).unwrap(),
                headers,
                body: Some(Bytes::from(b"{}".to_vec())),
                timestamp: SystemTime::now(),
                duration_to_first_byte: Duration::from_millis(duration_ms / 2),
                duration: Duration::from_millis(duration_ms),
            };

            handler.handle_request(request_data.clone()).await;
            handler.handle_response(request_data, response_data).await;
        }

        // Test method filter
        let filter = RequestFilter {
            method: Some("GET".to_string()),
            ..Default::default()
        };
        let results = repository.query(filter).await.unwrap();
        assert_eq!(results.len(), 2); // 1001, 1003

        // Test status code filter
        let filter = RequestFilter {
            status_code: Some(200),
            ..Default::default()
        };
        let results = repository.query(filter).await.unwrap();
        assert_eq!(results.len(), 2); // 1001, 1004

        // Test URI pattern filter (`%` wildcard — presumably SQL LIKE; confirm
        // against RequestRepository::query)
        let filter = RequestFilter {
            uri_pattern: Some("/api/users%".to_string()),
            ..Default::default()
        };
        let results = repository.query(filter).await.unwrap();
        assert_eq!(results.len(), 3); // 1001, 1002, 1004

        // Test duration range filter (inclusive bounds: 100 and 150 match)
        let filter = RequestFilter {
            min_duration_ms: Some(100),
            max_duration_ms: Some(200),
            ..Default::default()
        };
        let results = repository.query(filter).await.unwrap();
        assert_eq!(results.len(), 2); // 1001, 1002

        // Test combined filters
        let filter = RequestFilter {
            method: Some("GET".to_string()),
            status_code: Some(200),
            ..Default::default()
        };
        let results = repository.query(filter).await.unwrap();
        assert_eq!(results.len(), 1); // Only 1001
        assert_eq!(results[0].request.correlation_id, 1001);
    }
833
834    #[sqlx::test]
835    async fn test_query_with_pagination_and_ordering(pool: PgPool) {
836        // Run migrations first
837        crate::migrator().run(&pool).await.unwrap();
838
839        let handler = PostgresHandler::<Value, Value>::from_pool(pool.clone())
840            .await
841            .unwrap();
842        let repository = handler.repository();
843
844        // Insert requests with known timestamps
845        let now = SystemTime::now();
846        for i in 0..5 {
847            let correlation_id = 2000 + i;
848            let timestamp = now + Duration::from_secs(i * 10); // 10 second intervals
849
850            let mut headers = HashMap::new();
851            headers.insert("x-test-id".to_string(), vec![i.to_string().into()]);
852
853            let request_data = RequestData {
854                method: http::Method::GET,
855                uri: "/api/test".parse().unwrap(),
856                headers,
857                body: Some(Bytes::from(format!("{{\"id\": {i}}}").into_bytes())),
858                timestamp,
859                correlation_id,
860            };
861
862            handler.handle_request(request_data).await;
863        }
864
865        // Test default ordering (ASC) with limit
866        let filter = RequestFilter {
867            limit: Some(3),
868            ..Default::default()
869        };
870        let results = repository.query(filter).await.unwrap();
871        assert_eq!(results.len(), 3);
872
873        // Should be in ascending timestamp order
874        for i in 0..2 {
875            assert!(results[i].request.timestamp <= results[i + 1].request.timestamp);
876        }
877
878        // Test descending order with offset
879        let filter = RequestFilter {
880            order_by_timestamp_desc: true,
881            limit: Some(2),
882            offset: Some(1),
883            ..Default::default()
884        };
885        let results = repository.query(filter).await.unwrap();
886        assert_eq!(results.len(), 2);
887
888        // Should be in descending order, skipping the first (newest) one
889        assert!(results[0].request.timestamp >= results[1].request.timestamp);
890    }
891
892    #[sqlx::test]
893    async fn test_headers_conversion(pool: PgPool) {
894        // Run migrations first
895        crate::migrator().run(&pool).await.unwrap();
896
897        let handler = PostgresHandler::<Value, Value>::from_pool(pool.clone())
898            .await
899            .unwrap();
900        let repository = handler.repository();
901
902        // Test various header scenarios
903        let mut headers = HashMap::new();
904        headers.insert("single-value".to_string(), vec!["test".into()]);
905        headers.insert(
906            "multi-value".to_string(),
907            vec!["val1".into(), "val2".into()],
908        );
909        headers.insert("empty-value".to_string(), vec!["".into()]);
910
911        let request_data = RequestData {
912            correlation_id: 3000,
913            method: http::Method::GET,
914            uri: "/test".parse().unwrap(),
915            headers,
916            body: None,
917            timestamp: SystemTime::now(),
918        };
919
920        let correlation_id = 3000;
921        handler.handle_request(request_data).await;
922
923        let filter = RequestFilter {
924            correlation_id: Some(correlation_id as i64),
925            ..Default::default()
926        };
927        let results = repository.query(filter).await.unwrap();
928
929        assert_eq!(results.len(), 1);
930        let headers_json = &results[0].request.headers;
931
932        // Single value should be stored as string
933        assert_eq!(
934            headers_json["single-value"],
935            Value::String("test".to_string())
936        );
937
938        // Multi-value should be stored as array
939        match &headers_json["multi-value"] {
940            Value::Array(arr) => {
941                assert_eq!(arr.len(), 2);
942                assert_eq!(arr[0], Value::String("val1".to_string()));
943                assert_eq!(arr[1], Value::String("val2".to_string()));
944            }
945            _ => panic!("Expected array for multi-value header"),
946        }
947
948        // Empty value should still be a string
949        assert_eq!(headers_json["empty-value"], Value::String("".to_string()));
950    }
951
952    #[sqlx::test]
953    async fn test_timestamp_filtering(pool: PgPool) {
954        // Run migrations first
955        crate::migrator().run(&pool).await.unwrap();
956
957        let handler = PostgresHandler::<Value, Value>::from_pool(pool.clone())
958            .await
959            .unwrap();
960        let repository = handler.repository();
961
962        let base_time = SystemTime::UNIX_EPOCH + Duration::from_secs(1_600_000_000); // Sept 2020
963
964        // Insert requests at different times
965        let times = [
966            base_time + Duration::from_secs(0),    // correlation_id 4001
967            base_time + Duration::from_secs(3600), // correlation_id 4002 (1 hour later)
968            base_time + Duration::from_secs(7200), // correlation_id 4003 (2 hours later)
969        ];
970
971        for (i, timestamp) in times.iter().enumerate() {
972            let correlation_id = 4001 + i as u64;
973            let request_data = RequestData {
974                method: http::Method::GET,
975                uri: "/test".parse().unwrap(),
976                headers: HashMap::new(),
977                body: None,
978                timestamp: *timestamp,
979                correlation_id,
980            };
981
982            handler.handle_request(request_data).await;
983        }
984
985        // Test timestamp_after filter
986        let after_time: DateTime<Utc> = (base_time + Duration::from_secs(1800)).into(); // 30 min after first
987        let filter = RequestFilter {
988            timestamp_after: Some(after_time),
989            ..Default::default()
990        };
991        let results = repository.query(filter).await.unwrap();
992        assert_eq!(results.len(), 2); // Should get 4002 and 4003
993
994        // Test timestamp_before filter
995        let before_time: DateTime<Utc> = (base_time + Duration::from_secs(5400)).into(); // 1.5 hours after first
996        let filter = RequestFilter {
997            timestamp_before: Some(before_time),
998            ..Default::default()
999        };
1000        let results = repository.query(filter).await.unwrap();
1001        assert_eq!(results.len(), 2); // Should get 4001 and 4002
1002
1003        // Test timestamp range
1004        let filter = RequestFilter {
1005            timestamp_after: Some(after_time),
1006            timestamp_before: Some(before_time),
1007            ..Default::default()
1008        };
1009        let results = repository.query(filter).await.unwrap();
1010        assert_eq!(results.len(), 1); // Should get only 4002
1011        assert_eq!(results[0].request.correlation_id, 4002);
1012    }
1013
1014    // Note: Path filtering tests have been removed because path filtering
1015    // now happens at the outlet middleware layer, not in the PostgresHandler.
1016    // The handler now logs everything it receives, with filtering done upstream.
1017
1018    #[sqlx::test]
1019    async fn test_no_path_filtering_logs_everything(pool: PgPool) {
1020        // Run migrations first
1021        crate::migrator().run(&pool).await.unwrap();
1022
1023        // Handler without any path filtering
1024        let handler = PostgresHandler::<Value, Value>::from_pool(pool.clone())
1025            .await
1026            .unwrap();
1027        let repository = handler.repository();
1028
1029        let test_uris = vec!["/api/users", "/health", "/metrics", "/random/path"];
1030        for (i, uri) in test_uris.iter().enumerate() {
1031            let correlation_id = 3000 + i as u64;
1032            let mut headers = HashMap::new();
1033            headers.insert("content-type".to_string(), vec!["application/json".into()]);
1034
1035            let request_data = RequestData {
1036                method: http::Method::GET,
1037                uri: uri.parse().unwrap(),
1038                headers,
1039                body: Some(Bytes::from(b"{}".to_vec())),
1040                timestamp: SystemTime::now(),
1041                correlation_id,
1042            };
1043
1044            handler.handle_request(request_data).await;
1045        }
1046
1047        // Should have logged all 4 requests
1048        let filter = RequestFilter::default();
1049        let results = repository.query(filter).await.unwrap();
1050        assert_eq!(results.len(), 4);
1051    }
1052}