// outlet_postgres/lib.rs

1//! # outlet-postgres
2//!
3//! PostgreSQL logging handler for the outlet HTTP request/response middleware.
4//! This crate implements the `RequestHandler` trait from outlet to log HTTP
5//! requests and responses to PostgreSQL with JSONB serialization for bodies.
6//!
7//! ## Quick Start
8//!
9//! Basic usage:
10//!
11//! ```rust,no_run
12//! use outlet::{RequestLoggerLayer, RequestLoggerConfig};
13//! use outlet_postgres::PostgresHandler;
14//! use axum::{routing::get, Router};
15//! use tower::ServiceBuilder;
16//!
17//! async fn hello() -> &'static str {
18//!     "Hello, World!"
19//! }
20//!
21//! #[tokio::main]
22//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
23//!     let database_url = "postgresql://user:password@localhost/dbname";
24//!     let handler: PostgresHandler = PostgresHandler::new(database_url).await?;
25//!     let layer = RequestLoggerLayer::new(RequestLoggerConfig::default(), handler);
26//!
27//!     let app = Router::new()
28//!         .route("/hello", get(hello))
29//!         .layer(ServiceBuilder::new().layer(layer));
30//!
31//!     let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await?;
32//!     axum::serve(listener, app).await?;
33//!     Ok(())
34//! }
35//! ```
36//!
37//! ## Features
38//!
39//! - **PostgreSQL Integration**: Uses sqlx for async PostgreSQL operations
40//! - **JSONB Bodies**: Serializes request/response bodies to JSONB fields
41//! - **Type-safe Querying**: Query logged data with typed request/response bodies
42//! - **Correlation**: Links requests and responses via correlation IDs
43//! - **Error Handling**: Graceful error handling with logging
44//! - **Flexible Serialization**: Generic error handling for custom serializer types
45
/// Error type for serialization failures with fallback data.
///
/// When serializers fail to parse request/response bodies into structured types,
/// this error provides both the parsing error details and fallback data that
/// can be stored as a string representation.
#[derive(Debug)]
pub struct SerializationError {
    /// The fallback representation of the data (e.g., base64-encoded, raw string)
    /// that is stored in place of the structured body when parsing fails.
    pub fallback_data: String,
    /// The underlying error that caused serialization to fail.
    pub error: Box<dyn std::error::Error + Send + Sync>,
}
58
59impl SerializationError {
60    /// Create a new serialization error with fallback data
61    pub fn new(
62        fallback_data: String,
63        error: impl std::error::Error + Send + Sync + 'static,
64    ) -> Self {
65        Self {
66            fallback_data,
67            error: Box::new(error),
68        }
69    }
70}
71
72impl std::fmt::Display for SerializationError {
73    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
74        write!(f, "Serialization failed: {}", self.error)
75    }
76}
77
78impl std::error::Error for SerializationError {
79    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
80        Some(self.error.as_ref())
81    }
82}
83
84use chrono::{DateTime, Utc};
85use metrics::{counter, histogram};
86use outlet::{RequestData, RequestHandler, ResponseData};
87use serde::{Deserialize, Serialize};
88use serde_json::Value;
89use sqlx::PgPool;
90use std::collections::HashMap;
91use std::sync::Arc;
92use std::time::{Instant, SystemTime};
93use tracing::{debug, error, instrument, warn};
94use uuid::Uuid;
95
96pub mod error;
97pub mod repository;
98pub use error::PostgresHandlerError;
99pub use repository::{
100    HttpRequest, HttpResponse, RequestFilter, RequestRepository, RequestResponsePair,
101};
102
103// Re-export from sqlx-pool-router
104pub use sqlx_pool_router::{DbPools, PoolProvider, TestDbPools};
105
/// Get the migrator for running outlet-postgres database migrations.
///
/// This returns a SQLx migrator that can be used to set up the required
/// `http_requests` and `http_responses` tables. The consuming application
/// is responsible for running these migrations at the appropriate time
/// and in the appropriate database schema.
///
/// The migration files under `./migrations` are embedded into the binary
/// at compile time by the `sqlx::migrate!` macro.
///
/// # Examples
///
/// ```rust,no_run
/// use outlet_postgres::migrator;
/// use sqlx::PgPool;
///
/// #[tokio::main]
/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
///     let pool = PgPool::connect("postgresql://user:pass@localhost/db").await?;
///     
///     // Run outlet migrations
///     migrator().run(&pool).await?;
///     
///     Ok(())
/// }
/// ```
pub fn migrator() -> sqlx::migrate::Migrator {
    sqlx::migrate!("./migrations")
}
132
/// Type alias for request body serializers.
///
/// Request serializers take full request context including headers and body bytes.
/// On failure, they return a `SerializationError` with fallback data.
/// Wrapped in `Arc` so `PostgresHandler` can be cloned cheaply.
type RequestSerializer<T> =
    Arc<dyn Fn(&outlet::RequestData) -> Result<T, SerializationError> + Send + Sync>;

/// Type alias for response body serializers.
///
/// Response serializers take both request and response context, allowing them to
/// make parsing decisions based on request details and response headers (e.g., compression).
/// On failure, they return a `SerializationError` with fallback data.
/// Wrapped in `Arc` so `PostgresHandler` can be cloned cheaply.
type ResponseSerializer<T> = Arc<
    dyn Fn(&outlet::RequestData, &outlet::ResponseData) -> Result<T, SerializationError>
        + Send
        + Sync,
>;
150
/// PostgreSQL handler for outlet middleware.
///
/// Implements the `RequestHandler` trait to log HTTP requests and responses
/// to PostgreSQL. Request and response bodies are serialized to JSONB fields.
///
/// Generic over:
/// - `P`: Pool provider implementing `PoolProvider` trait for read/write routing
/// - `TReq` and `TRes`: Request and response body types for JSONB serialization
///
/// Use `serde_json::Value` for flexible JSON storage, or custom structs for typed storage.
#[derive(Clone)]
pub struct PostgresHandler<P = PgPool, TReq = Value, TRes = Value>
where
    P: PoolProvider,
    TReq: for<'de> Deserialize<'de> + Serialize + Send + Sync + 'static,
    TRes: for<'de> Deserialize<'de> + Serialize + Send + Sync + 'static,
{
    // Pool (or pool set) used for database writes and repository reads.
    pool: P,
    // Parses request bodies into `TReq` before JSONB storage.
    request_serializer: RequestSerializer<TReq>,
    // Parses response bodies into `TRes` before JSONB storage.
    response_serializer: ResponseSerializer<TRes>,
    // Random per-handler id; rows are keyed by (instance_id, correlation_id)
    // so correlation ids from different handler instances don't collide.
    instance_id: Uuid,
}
173
impl<P, TReq, TRes> PostgresHandler<P, TReq, TRes>
where
    P: PoolProvider,
    TReq: for<'de> Deserialize<'de> + Serialize + Send + Sync + 'static,
    TRes: for<'de> Deserialize<'de> + Serialize + Send + Sync + 'static,
{
    /// Default request serializer: attempts serde JSON deserialization of the body.
    /// On failure, returns a `SerializationError` carrying the raw body
    /// (lossy UTF-8) as fallback data.
    fn default_request_serializer() -> RequestSerializer<TReq> {
        Arc::new(|request_data| {
            // A missing body is treated as an empty byte slice.
            let bytes = request_data.body.as_deref().unwrap_or(&[]);
            serde_json::from_slice::<TReq>(bytes).map_err(|error| {
                let fallback_data = String::from_utf8_lossy(bytes).to_string();
                SerializationError::new(fallback_data, error)
            })
        })
    }

    /// Default response serializer: attempts serde JSON deserialization of the body.
    /// On failure, returns a `SerializationError` carrying the raw body
    /// (lossy UTF-8) as fallback data. The request context is ignored here.
    fn default_response_serializer() -> ResponseSerializer<TRes> {
        Arc::new(|_request_data, response_data| {
            let bytes = response_data.body.as_deref().unwrap_or(&[]);
            serde_json::from_slice::<TRes>(bytes).map_err(|error| {
                let fallback_data = String::from_utf8_lossy(bytes).to_string();
                SerializationError::new(fallback_data, error)
            })
        })
    }

    /// Add a custom request body serializer.
    ///
    /// The serializer receives the full request context (`&outlet::RequestData`,
    /// including headers and body bytes) and returns `Result<TReq, SerializationError>`.
    /// If the serializer succeeds, the result will be stored as JSONB and `body_parsed` will be true.
    /// If it fails, the error's `fallback_data` will be stored as a string and `body_parsed` will be false.
    ///
    /// # Panics
    ///
    /// Request handling may panic if the serializer succeeds but the resulting `TReq` value cannot be
    /// converted to JSON via `serde_json::to_value()`. This indicates a bug in the `Serialize`
    /// implementation of `TReq` and should be fixed by the caller.
    pub fn with_request_serializer<F>(mut self, serializer: F) -> Self
    where
        F: Fn(&outlet::RequestData) -> Result<TReq, SerializationError> + Send + Sync + 'static,
    {
        self.request_serializer = Arc::new(serializer);
        self
    }

    /// Add a custom response body serializer.
    ///
    /// The serializer receives both the request and response context
    /// (`&outlet::RequestData` and `&outlet::ResponseData`), allowing parsing
    /// decisions based on request details and response headers (e.g., compression),
    /// and returns `Result<TRes, SerializationError>`.
    /// If the serializer succeeds, the result will be stored as JSONB and `body_parsed` will be true.
    /// If it fails, the error's `fallback_data` will be stored as a string and `body_parsed` will be false.
    ///
    /// # Panics
    ///
    /// Response handling may panic if the serializer succeeds but the resulting `TRes` value cannot be
    /// converted to JSON via `serde_json::to_value()`. This indicates a bug in the `Serialize`
    /// implementation of `TRes` and should be fixed by the caller.
    pub fn with_response_serializer<F>(mut self, serializer: F) -> Self
    where
        F: Fn(&outlet::RequestData, &outlet::ResponseData) -> Result<TRes, SerializationError>
            + Send
            + Sync
            + 'static,
    {
        self.response_serializer = Arc::new(serializer);
        self
    }

    /// Create a PostgreSQL handler from a pool provider.
    ///
    /// Use this if you want to use a custom pool provider implementation
    /// (such as `DbPools` for read/write separation).
    /// This will NOT run migrations - use `migrator()` to run migrations separately.
    ///
    /// # Arguments
    ///
    /// * `pool_provider` - Pool provider implementing `PoolProvider` trait
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// use outlet_postgres::{PostgresHandler, DbPools, migrator};
    /// use sqlx::postgres::PgPoolOptions;
    /// use serde::{Deserialize, Serialize};
    ///
    /// #[derive(Deserialize, Serialize)]
    /// struct MyBodyType {
    ///     id: u64,
    ///     name: String,
    /// }
    ///
    /// #[tokio::main]
    /// async fn main() -> Result<(), Box<dyn std::error::Error>> {
    ///     let primary = PgPoolOptions::new()
    ///         .connect("postgresql://user:pass@primary/db").await?;
    ///     let replica = PgPoolOptions::new()
    ///         .connect("postgresql://user:pass@replica/db").await?;
    ///
    ///     // Run migrations on primary
    ///     migrator().run(&primary).await?;
    ///
    ///     // Create handler with read/write separation
    ///     let pools = DbPools::with_replica(primary, replica);
    ///     let handler = PostgresHandler::<_, MyBodyType, MyBodyType>::from_pool_provider(pools).await?;
    ///     Ok(())
    /// }
    /// ```
    pub async fn from_pool_provider(pool_provider: P) -> Result<Self, PostgresHandlerError> {
        Ok(Self {
            pool: pool_provider,
            request_serializer: Self::default_request_serializer(),
            response_serializer: Self::default_response_serializer(),
            // Fresh random id per handler; scopes correlation ids to this instance.
            instance_id: Uuid::new_v4(),
        })
    }

    /// Convert headers to a JSONB-compatible format.
    ///
    /// Single-valued headers become JSON strings; multi-valued headers become
    /// JSON arrays of strings. Header bytes are decoded with lossy UTF-8.
    fn headers_to_json(headers: &HashMap<String, Vec<bytes::Bytes>>) -> Value {
        let mut header_map = HashMap::new();
        for (name, values) in headers {
            if values.len() == 1 {
                let value_str = String::from_utf8_lossy(&values[0]).to_string();
                header_map.insert(name.clone(), Value::String(value_str));
            } else {
                let value_array: Vec<Value> = values
                    .iter()
                    .map(|v| Value::String(String::from_utf8_lossy(v).to_string()))
                    .collect();
                header_map.insert(name.clone(), Value::Array(value_array));
            }
        }
        serde_json::to_value(header_map).unwrap_or(Value::Null)
    }

    /// Convert request data to a JSONB value using the configured serializer.
    ///
    /// Returns the JSON value plus a flag for the `body_parsed` column:
    /// `true` when the serializer produced a structured value, `false` when the
    /// fallback string representation is stored instead.
    fn request_body_to_json_with_fallback(
        &self,
        request_data: &outlet::RequestData,
    ) -> (Value, bool) {
        match (self.request_serializer)(request_data) {
            Ok(typed_value) => {
                if let Ok(json_value) = serde_json::to_value(&typed_value) {
                    (json_value, true)
                } else {
                    // This should never happen if the type implements Serialize correctly
                    (
                        Value::String(
                            serde_json::to_string(&typed_value)
                                .expect("Serialized value must be convertible to JSON string"),
                        ),
                        false,
                    )
                }
            }
            Err(serialization_error) => (Value::String(serialization_error.fallback_data), false),
        }
    }

    /// Convert response data to a JSONB value using the configured serializer.
    ///
    /// Same contract as `request_body_to_json_with_fallback`, but the serializer
    /// also sees the originating request context.
    fn response_body_to_json_with_fallback(
        &self,
        request_data: &outlet::RequestData,
        response_data: &outlet::ResponseData,
    ) -> (Value, bool) {
        match (self.response_serializer)(request_data, response_data) {
            Ok(typed_value) => {
                if let Ok(json_value) = serde_json::to_value(&typed_value) {
                    (json_value, true)
                } else {
                    // This should never happen if the type implements Serialize correctly
                    (
                        Value::String(
                            serde_json::to_string(&typed_value)
                                .expect("Serialized value must be convertible to JSON string"),
                        ),
                        false,
                    )
                }
            }
            Err(serialization_error) => (Value::String(serialization_error.fallback_data), false),
        }
    }

    /// Get a repository for querying logged requests and responses.
    ///
    /// Returns a `RequestRepository` with the same type parameters as this handler,
    /// allowing for type-safe querying of request and response bodies.
    pub fn repository(&self) -> crate::repository::RequestRepository<P, TReq, TRes> {
        crate::repository::RequestRepository::new(self.pool.clone())
    }
}
368
369// Backward-compatible constructors for PgPool
370impl<TReq, TRes> PostgresHandler<PgPool, TReq, TRes>
371where
372    TReq: for<'de> Deserialize<'de> + Serialize + Send + Sync + 'static,
373    TRes: for<'de> Deserialize<'de> + Serialize + Send + Sync + 'static,
374{
375    /// Create a new PostgreSQL handler with a connection pool.
376    ///
377    /// This will connect to the database but will NOT run migrations.
378    /// Use `migrator()` to get a migrator and run migrations separately.
379    ///
380    /// # Arguments
381    ///
382    /// * `database_url` - PostgreSQL connection string
383    ///
384    /// # Examples
385    ///
386    /// ```rust,no_run
387    /// use outlet_postgres::{PostgresHandler, migrator};
388    /// use serde::{Deserialize, Serialize};
389    ///
390    /// #[derive(Deserialize, Serialize)]
391    /// struct MyBodyType {
392    ///     id: u64,
393    ///     name: String,
394    /// }
395    ///
396    /// #[tokio::main]
397    /// async fn main() -> Result<(), Box<dyn std::error::Error>> {
398    ///     // Run migrations first
399    ///     let pool = sqlx::PgPool::connect("postgresql://user:pass@localhost/db").await?;
400    ///     migrator().run(&pool).await?;
401    ///
402    ///     // Create handler
403    ///     let handler = PostgresHandler::<_, MyBodyType, MyBodyType>::new("postgresql://user:pass@localhost/db").await?;
404    ///     Ok(())
405    /// }
406    /// ```
407    pub async fn new(database_url: &str) -> Result<Self, PostgresHandlerError> {
408        let pool = PgPool::connect(database_url)
409            .await
410            .map_err(PostgresHandlerError::Connection)?;
411
412        Ok(Self {
413            pool,
414            request_serializer: Self::default_request_serializer(),
415            response_serializer: Self::default_response_serializer(),
416            instance_id: Uuid::new_v4(),
417        })
418    }
419
420    /// Create a PostgreSQL handler from an existing connection pool.
421    ///
422    /// Use this if you already have a connection pool and want to reuse it.
423    /// This will NOT run migrations - use `migrator()` to run migrations separately.
424    ///
425    /// # Arguments
426    ///
427    /// * `pool` - Existing PostgreSQL connection pool
428    ///
429    /// # Examples
430    ///
431    /// ```rust,no_run
432    /// use outlet_postgres::{PostgresHandler, migrator};
433    /// use sqlx::PgPool;
434    /// use serde::{Deserialize, Serialize};
435    ///
436    /// #[derive(Deserialize, Serialize)]
437    /// struct MyBodyType {
438    ///     id: u64,
439    ///     name: String,
440    /// }
441    ///
442    /// #[tokio::main]
443    /// async fn main() -> Result<(), Box<dyn std::error::Error>> {
444    ///     let pool = PgPool::connect("postgresql://user:pass@localhost/db").await?;
445    ///
446    ///     // Run migrations first
447    ///     migrator().run(&pool).await?;
448    ///
449    ///     // Create handler
450    ///     let handler = PostgresHandler::<_, MyBodyType, MyBodyType>::from_pool(pool).await?;
451    ///     Ok(())
452    /// }
453    /// ```
454    pub async fn from_pool(pool: PgPool) -> Result<Self, PostgresHandlerError> {
455        Self::from_pool_provider(pool).await
456    }
457}
458
impl<P, TReq, TRes> RequestHandler for PostgresHandler<P, TReq, TRes>
where
    P: PoolProvider,
    TReq: for<'de> Deserialize<'de> + Serialize + Send + Sync + 'static,
    TRes: for<'de> Deserialize<'de> + Serialize + Send + Sync + 'static,
{
    /// Persist an incoming request to the `http_requests` table.
    ///
    /// Serializes headers and (if present) the body, then inserts one row keyed
    /// by `(instance_id, correlation_id)`. Database failures are logged and
    /// counted in metrics rather than propagated to the caller.
    #[instrument(skip(self, data), fields(correlation_id = %data.correlation_id))]
    async fn handle_request(&self, data: RequestData) {
        let headers_json = Self::headers_to_json(&data.headers);
        // `parsed` becomes the `body_parsed` column; bodiless requests store
        // NULL with `body_parsed = false`.
        let (body_json, parsed) = if data.body.is_some() {
            let (json, parsed) = self.request_body_to_json_with_fallback(&data);
            (Some(json), parsed)
        } else {
            (None, false)
        };

        let timestamp: DateTime<Utc> = data.timestamp.into();

        // Time the INSERT so write latency is observable via the histogram below.
        let query_start = Instant::now();
        let result = sqlx::query(
            r#"
            INSERT INTO http_requests (instance_id, correlation_id, timestamp, method, uri, headers, body, body_parsed)
            VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
            "#,
        )
        .bind(self.instance_id)
        // Stored as a signed BIGINT; large unsigned ids reinterpret as negative
        // -- NOTE(review): confirm the correlation id range fits i64.
        .bind(data.correlation_id as i64)
        .bind(timestamp)
        .bind(data.method.to_string())
        .bind(data.uri.to_string())
        .bind(headers_json)
        .bind(body_json)
        .bind(parsed)
        .execute(self.pool.write())
        .await;
        let query_duration = query_start.elapsed();
        histogram!("outlet_write_duration_seconds", "operation" => "request")
            .record(query_duration.as_secs_f64());

        if let Err(e) = result {
            counter!("outlet_write_errors_total", "operation" => "request").increment(1);
            error!(correlation_id = %data.correlation_id, error = %e, "Failed to insert request data");
        } else {
            // Lag between request capture and DB write; escalate to `warn`
            // when the handler falls more than a second behind.
            let processing_lag_ms = SystemTime::now()
                .duration_since(data.timestamp)
                .unwrap_or_default()
                .as_millis();
            if processing_lag_ms > 1000 {
                warn!(correlation_id = %data.correlation_id, method = %data.method, uri = %data.uri, lag_ms = %processing_lag_ms, "Request logged (slow)");
            } else {
                debug!(correlation_id = %data.correlation_id, method = %data.method, uri = %data.uri, lag_ms = %processing_lag_ms, "Request logged");
            }
        }
    }

    /// Persist a response to the `http_responses` table.
    ///
    /// The row is inserted only when a matching request row exists for the same
    /// `(instance_id, correlation_id)` (see the `WHERE EXISTS` guard), so
    /// responses are never orphaned. Failures are logged and counted in metrics.
    #[instrument(skip(self, request_data, response_data), fields(correlation_id = %request_data.correlation_id))]
    async fn handle_response(&self, request_data: RequestData, response_data: ResponseData) {
        let headers_json = Self::headers_to_json(&response_data.headers);
        // `parsed` becomes the `body_parsed` column; bodiless responses store
        // NULL with `body_parsed = false`.
        let (body_json, parsed) = if response_data.body.is_some() {
            let (json, parsed) =
                self.response_body_to_json_with_fallback(&request_data, &response_data);
            (Some(json), parsed)
        } else {
            (None, false)
        };

        let timestamp: DateTime<Utc> = response_data.timestamp.into();
        let duration_ms = response_data.duration.as_millis() as i64;
        let duration_to_first_byte_ms = response_data.duration_to_first_byte.as_millis() as i64;

        // Time the INSERT so write latency is observable via the histogram below.
        let query_start = Instant::now();
        let result = sqlx::query(
            r#"
            INSERT INTO http_responses (instance_id, correlation_id, timestamp, status_code, headers, body, body_parsed, duration_to_first_byte_ms, duration_ms)
            SELECT $1, $2, $3, $4, $5, $6, $7, $8, $9
            WHERE EXISTS (SELECT 1 FROM http_requests WHERE instance_id = $1 AND correlation_id = $2)
            "#,
        )
        .bind(self.instance_id)
        .bind(request_data.correlation_id as i64)
        .bind(timestamp)
        .bind(response_data.status.as_u16() as i32)
        .bind(headers_json)
        .bind(body_json)
        .bind(parsed)
        .bind(duration_to_first_byte_ms)
        .bind(duration_ms)
        .execute(self.pool.write())
        .await;
        let query_duration = query_start.elapsed();
        histogram!("outlet_write_duration_seconds", "operation" => "response")
            .record(query_duration.as_secs_f64());

        match result {
            Err(e) => {
                counter!("outlet_write_errors_total", "operation" => "response").increment(1);
                error!(correlation_id = %request_data.correlation_id, error = %e, "Failed to insert response data");
            }
            Ok(query_result) => {
                // rows_affected == 0 means the WHERE EXISTS guard filtered the
                // insert out (no matching request row); that's expected, not an error.
                if query_result.rows_affected() > 0 {
                    let processing_lag_ms = SystemTime::now()
                        .duration_since(response_data.timestamp)
                        .unwrap_or_default()
                        .as_millis();
                    if processing_lag_ms > 1000 {
                        warn!(correlation_id = %request_data.correlation_id, status = %response_data.status, duration_ms = %duration_ms, lag_ms = %processing_lag_ms, "Response logged (slow)");
                    } else {
                        debug!(correlation_id = %request_data.correlation_id, status = %response_data.status, duration_ms = %duration_ms, lag_ms = %processing_lag_ms, "Response logged");
                    }
                } else {
                    debug!(correlation_id = %request_data.correlation_id, "No matching request found for response, skipping insert")
                }
            }
        }
    }
}
575
576#[cfg(test)]
577mod tests {
578    use super::*;
579    use bytes::Bytes;
580    use chrono::{DateTime, Utc};
581    use outlet::{RequestData, ResponseData};
582    use serde::{Deserialize, Serialize};
583    use serde_json::Value;
584    use sqlx::PgPool;
585    use std::collections::HashMap;
586    use std::time::{Duration, SystemTime};
587
588    #[derive(Debug, Clone, PartialEq, Deserialize, Serialize)]
589    struct TestRequest {
590        user_id: u64,
591        action: String,
592    }
593
594    #[derive(Debug, Clone, PartialEq, Deserialize, Serialize)]
595    struct TestResponse {
596        success: bool,
597        message: String,
598    }
599
600    fn create_test_request_data() -> RequestData {
601        let mut headers = HashMap::new();
602        headers.insert("content-type".to_string(), vec!["application/json".into()]);
603        headers.insert("user-agent".to_string(), vec!["test-client/1.0".into()]);
604
605        let test_req = TestRequest {
606            user_id: 123,
607            action: "create_user".to_string(),
608        };
609        let body = serde_json::to_vec(&test_req).unwrap();
610
611        RequestData {
612            method: http::Method::POST,
613            uri: http::Uri::from_static("/api/users"),
614            headers,
615            body: Some(Bytes::from(body)),
616            timestamp: SystemTime::now(),
617            correlation_id: 0,
618        }
619    }
620
621    fn create_test_response_data() -> ResponseData {
622        let mut headers = HashMap::new();
623        headers.insert("content-type".to_string(), vec!["application/json".into()]);
624
625        let test_res = TestResponse {
626            success: true,
627            message: "User created successfully".to_string(),
628        };
629        let body = serde_json::to_vec(&test_res).unwrap();
630
631        ResponseData {
632            status: http::StatusCode::CREATED,
633            headers,
634            body: Some(Bytes::from(body)),
635            timestamp: SystemTime::now(),
636            duration_to_first_byte: Duration::from_millis(100),
637            duration: Duration::from_millis(150),
638            correlation_id: 0,
639        }
640    }
641
642    #[sqlx::test]
643    async fn test_handler_creation(pool: PgPool) {
644        // Run migrations first
645        crate::migrator().run(&pool).await.unwrap();
646
647        let handler = PostgresHandler::<PgPool, TestRequest, TestResponse>::from_pool(pool.clone())
648            .await
649            .unwrap();
650
651        // Verify we can get a repository
652        let repository = handler.repository();
653
654        // Test initial state - no requests logged yet
655        let filter = RequestFilter::default();
656        let results = repository.query(filter).await.unwrap();
657        assert!(results.is_empty());
658    }
659
660    #[sqlx::test]
661    async fn test_handle_request_with_typed_body(pool: PgPool) {
662        // Run migrations first
663        crate::migrator().run(&pool).await.unwrap();
664
665        let handler = PostgresHandler::<PgPool, TestRequest, TestResponse>::from_pool(pool.clone())
666            .await
667            .unwrap();
668        let repository = handler.repository();
669
670        let mut request_data = create_test_request_data();
671        let correlation_id = 12345;
672        request_data.correlation_id = correlation_id;
673
674        // Handle the request
675        handler.handle_request(request_data.clone()).await;
676
677        // Query back the request
678        let filter = RequestFilter {
679            correlation_id: Some(correlation_id as i64),
680            ..Default::default()
681        };
682        let results = repository.query(filter).await.unwrap();
683
684        assert_eq!(results.len(), 1);
685        let pair = &results[0];
686
687        assert_eq!(pair.request.correlation_id, correlation_id as i64);
688        assert_eq!(pair.request.method, "POST");
689        assert_eq!(pair.request.uri, "/api/users");
690
691        // Check that body was parsed successfully
692        match &pair.request.body {
693            Some(Ok(parsed_body)) => {
694                assert_eq!(
695                    *parsed_body,
696                    TestRequest {
697                        user_id: 123,
698                        action: "create_user".to_string(),
699                    }
700                );
701            }
702            _ => panic!("Expected successfully parsed request body"),
703        }
704
705        // Headers should be converted to JSON properly
706        let headers_value = &pair.request.headers;
707        assert!(headers_value.get("content-type").is_some());
708        assert!(headers_value.get("user-agent").is_some());
709
710        // No response yet
711        assert!(pair.response.is_none());
712    }
713
714    #[sqlx::test]
715    async fn test_handle_response_with_typed_body(pool: PgPool) {
716        // Run migrations first
717        crate::migrator().run(&pool).await.unwrap();
718
719        let handler = PostgresHandler::<PgPool, TestRequest, TestResponse>::from_pool(pool.clone())
720            .await
721            .unwrap();
722        let repository = handler.repository();
723
724        let mut request_data = create_test_request_data();
725        let mut response_data = create_test_response_data();
726        let correlation_id = 54321;
727        request_data.correlation_id = correlation_id;
728        response_data.correlation_id = correlation_id;
729
730        // Handle both request and response
731        handler.handle_request(request_data.clone()).await;
732        handler
733            .handle_response(request_data, response_data.clone())
734            .await;
735
736        // Query back the complete pair
737        let filter = RequestFilter {
738            correlation_id: Some(correlation_id as i64),
739            ..Default::default()
740        };
741        let results = repository.query(filter).await.unwrap();
742
743        assert_eq!(results.len(), 1);
744        let pair = &results[0];
745
746        // Check response data
747        let response = pair.response.as_ref().expect("Response should be present");
748        assert_eq!(response.correlation_id, correlation_id as i64);
749        assert_eq!(response.status_code, 201);
750        assert_eq!(response.duration_ms, 150);
751
752        // Check that response body was parsed successfully
753        match &response.body {
754            Some(Ok(parsed_body)) => {
755                assert_eq!(
756                    *parsed_body,
757                    TestResponse {
758                        success: true,
759                        message: "User created successfully".to_string(),
760                    }
761                );
762            }
763            _ => panic!("Expected successfully parsed response body"),
764        }
765    }
766
767    #[sqlx::test]
768    async fn test_handle_unparseable_body_fallback(pool: PgPool) {
769        // Run migrations first
770        crate::migrator().run(&pool).await.unwrap();
771
772        let handler = PostgresHandler::<PgPool, TestRequest, TestResponse>::from_pool(pool.clone())
773            .await
774            .unwrap();
775        let repository = handler.repository();
776
777        // Create request with invalid JSON for TestRequest
778        let mut headers = HashMap::new();
779        headers.insert("content-type".to_string(), vec!["text/plain".into()]);
780
781        let invalid_json_body = b"not valid json for TestRequest";
782        let correlation_id = 99999;
783        let request_data = RequestData {
784            method: http::Method::POST,
785            uri: http::Uri::from_static("/api/test"),
786            headers,
787            body: Some(Bytes::from(invalid_json_body.to_vec())),
788            timestamp: SystemTime::now(),
789            correlation_id,
790        };
791
792        handler.handle_request(request_data).await;
793
794        // Query back and verify fallback to base64
795        let filter = RequestFilter {
796            correlation_id: Some(correlation_id as i64),
797            ..Default::default()
798        };
799        let results = repository.query(filter).await.unwrap();
800
801        assert_eq!(results.len(), 1);
802        let pair = &results[0];
803
804        // Should fallback to raw bytes
805        match &pair.request.body {
806            Some(Err(raw_bytes)) => {
807                assert_eq!(raw_bytes.as_ref(), invalid_json_body);
808            }
809            _ => panic!("Expected raw bytes fallback for unparseable body"),
810        }
811    }
812
813    #[sqlx::test]
814    async fn test_query_with_multiple_filters(pool: PgPool) {
815        // Run migrations first
816        crate::migrator().run(&pool).await.unwrap();
817
818        let handler = PostgresHandler::<PgPool, Value, Value>::from_pool(pool.clone())
819            .await
820            .unwrap();
821        let repository = handler.repository();
822
823        // Insert multiple requests with different characteristics
824        let test_cases = vec![
825            (1001, "GET", "/api/users", 200, 100),
826            (1002, "POST", "/api/users", 201, 150),
827            (1003, "GET", "/api/orders", 404, 50),
828            (1004, "PUT", "/api/users/123", 200, 300),
829        ];
830
831        for (correlation_id, method, uri, status, duration_ms) in test_cases {
832            let mut headers = HashMap::new();
833            headers.insert("content-type".to_string(), vec!["application/json".into()]);
834
835            let request_data = RequestData {
836                method: method.parse().unwrap(),
837                uri: uri.parse().unwrap(),
838                headers: headers.clone(),
839                body: Some(Bytes::from(b"{}".to_vec())),
840                timestamp: SystemTime::now(),
841                correlation_id,
842            };
843
844            let response_data = ResponseData {
845                correlation_id,
846                status: http::StatusCode::from_u16(status).unwrap(),
847                headers,
848                body: Some(Bytes::from(b"{}".to_vec())),
849                timestamp: SystemTime::now(),
850                duration_to_first_byte: Duration::from_millis(duration_ms / 2),
851                duration: Duration::from_millis(duration_ms),
852            };
853
854            handler.handle_request(request_data.clone()).await;
855            handler.handle_response(request_data, response_data).await;
856        }
857
858        // Test method filter
859        let filter = RequestFilter {
860            method: Some("GET".to_string()),
861            ..Default::default()
862        };
863        let results = repository.query(filter).await.unwrap();
864        assert_eq!(results.len(), 2); // 1001, 1003
865
866        // Test status code filter
867        let filter = RequestFilter {
868            status_code: Some(200),
869            ..Default::default()
870        };
871        let results = repository.query(filter).await.unwrap();
872        assert_eq!(results.len(), 2); // 1001, 1004
873
874        // Test URI pattern filter
875        let filter = RequestFilter {
876            uri_pattern: Some("/api/users%".to_string()),
877            ..Default::default()
878        };
879        let results = repository.query(filter).await.unwrap();
880        assert_eq!(results.len(), 3); // 1001, 1002, 1004
881
882        // Test duration range filter
883        let filter = RequestFilter {
884            min_duration_ms: Some(100),
885            max_duration_ms: Some(200),
886            ..Default::default()
887        };
888        let results = repository.query(filter).await.unwrap();
889        assert_eq!(results.len(), 2); // 1001, 1002
890
891        // Test combined filters
892        let filter = RequestFilter {
893            method: Some("GET".to_string()),
894            status_code: Some(200),
895            ..Default::default()
896        };
897        let results = repository.query(filter).await.unwrap();
898        assert_eq!(results.len(), 1); // Only 1001
899        assert_eq!(results[0].request.correlation_id, 1001);
900    }
901
902    #[sqlx::test]
903    async fn test_query_with_pagination_and_ordering(pool: PgPool) {
904        // Run migrations first
905        crate::migrator().run(&pool).await.unwrap();
906
907        let handler = PostgresHandler::<PgPool, Value, Value>::from_pool(pool.clone())
908            .await
909            .unwrap();
910        let repository = handler.repository();
911
912        // Insert requests with known timestamps
913        let now = SystemTime::now();
914        for i in 0..5 {
915            let correlation_id = 2000 + i;
916            let timestamp = now + Duration::from_secs(i * 10); // 10 second intervals
917
918            let mut headers = HashMap::new();
919            headers.insert("x-test-id".to_string(), vec![i.to_string().into()]);
920
921            let request_data = RequestData {
922                method: http::Method::GET,
923                uri: "/api/test".parse().unwrap(),
924                headers,
925                body: Some(Bytes::from(format!("{{\"id\": {i}}}").into_bytes())),
926                timestamp,
927                correlation_id,
928            };
929
930            handler.handle_request(request_data).await;
931        }
932
933        // Test default ordering (ASC) with limit
934        let filter = RequestFilter {
935            limit: Some(3),
936            ..Default::default()
937        };
938        let results = repository.query(filter).await.unwrap();
939        assert_eq!(results.len(), 3);
940
941        // Should be in ascending timestamp order
942        for i in 0..2 {
943            assert!(results[i].request.timestamp <= results[i + 1].request.timestamp);
944        }
945
946        // Test descending order with offset
947        let filter = RequestFilter {
948            order_by_timestamp_desc: true,
949            limit: Some(2),
950            offset: Some(1),
951            ..Default::default()
952        };
953        let results = repository.query(filter).await.unwrap();
954        assert_eq!(results.len(), 2);
955
956        // Should be in descending order, skipping the first (newest) one
957        assert!(results[0].request.timestamp >= results[1].request.timestamp);
958    }
959
960    #[sqlx::test]
961    async fn test_headers_conversion(pool: PgPool) {
962        // Run migrations first
963        crate::migrator().run(&pool).await.unwrap();
964
965        let handler = PostgresHandler::<PgPool, Value, Value>::from_pool(pool.clone())
966            .await
967            .unwrap();
968        let repository = handler.repository();
969
970        // Test various header scenarios
971        let mut headers = HashMap::new();
972        headers.insert("single-value".to_string(), vec!["test".into()]);
973        headers.insert(
974            "multi-value".to_string(),
975            vec!["val1".into(), "val2".into()],
976        );
977        headers.insert("empty-value".to_string(), vec!["".into()]);
978
979        let request_data = RequestData {
980            correlation_id: 3000,
981            method: http::Method::GET,
982            uri: "/test".parse().unwrap(),
983            headers,
984            body: None,
985            timestamp: SystemTime::now(),
986        };
987
988        let correlation_id = 3000;
989        handler.handle_request(request_data).await;
990
991        let filter = RequestFilter {
992            correlation_id: Some(correlation_id as i64),
993            ..Default::default()
994        };
995        let results = repository.query(filter).await.unwrap();
996
997        assert_eq!(results.len(), 1);
998        let headers_json = &results[0].request.headers;
999
1000        // Single value should be stored as string
1001        assert_eq!(
1002            headers_json["single-value"],
1003            Value::String("test".to_string())
1004        );
1005
1006        // Multi-value should be stored as array
1007        match &headers_json["multi-value"] {
1008            Value::Array(arr) => {
1009                assert_eq!(arr.len(), 2);
1010                assert_eq!(arr[0], Value::String("val1".to_string()));
1011                assert_eq!(arr[1], Value::String("val2".to_string()));
1012            }
1013            _ => panic!("Expected array for multi-value header"),
1014        }
1015
1016        // Empty value should still be a string
1017        assert_eq!(headers_json["empty-value"], Value::String("".to_string()));
1018    }
1019
1020    #[sqlx::test]
1021    async fn test_timestamp_filtering(pool: PgPool) {
1022        // Run migrations first
1023        crate::migrator().run(&pool).await.unwrap();
1024
1025        let handler = PostgresHandler::<PgPool, Value, Value>::from_pool(pool.clone())
1026            .await
1027            .unwrap();
1028        let repository = handler.repository();
1029
1030        let base_time = SystemTime::UNIX_EPOCH + Duration::from_secs(1_600_000_000); // Sept 2020
1031
1032        // Insert requests at different times
1033        let times = [
1034            base_time + Duration::from_secs(0),    // correlation_id 4001
1035            base_time + Duration::from_secs(3600), // correlation_id 4002 (1 hour later)
1036            base_time + Duration::from_secs(7200), // correlation_id 4003 (2 hours later)
1037        ];
1038
1039        for (i, timestamp) in times.iter().enumerate() {
1040            let correlation_id = 4001 + i as u64;
1041            let request_data = RequestData {
1042                method: http::Method::GET,
1043                uri: "/test".parse().unwrap(),
1044                headers: HashMap::new(),
1045                body: None,
1046                timestamp: *timestamp,
1047                correlation_id,
1048            };
1049
1050            handler.handle_request(request_data).await;
1051        }
1052
1053        // Test timestamp_after filter
1054        let after_time: DateTime<Utc> = (base_time + Duration::from_secs(1800)).into(); // 30 min after first
1055        let filter = RequestFilter {
1056            timestamp_after: Some(after_time),
1057            ..Default::default()
1058        };
1059        let results = repository.query(filter).await.unwrap();
1060        assert_eq!(results.len(), 2); // Should get 4002 and 4003
1061
1062        // Test timestamp_before filter
1063        let before_time: DateTime<Utc> = (base_time + Duration::from_secs(5400)).into(); // 1.5 hours after first
1064        let filter = RequestFilter {
1065            timestamp_before: Some(before_time),
1066            ..Default::default()
1067        };
1068        let results = repository.query(filter).await.unwrap();
1069        assert_eq!(results.len(), 2); // Should get 4001 and 4002
1070
1071        // Test timestamp range
1072        let filter = RequestFilter {
1073            timestamp_after: Some(after_time),
1074            timestamp_before: Some(before_time),
1075            ..Default::default()
1076        };
1077        let results = repository.query(filter).await.unwrap();
1078        assert_eq!(results.len(), 1); // Should get only 4002
1079        assert_eq!(results[0].request.correlation_id, 4002);
1080    }
1081
1082    // Note: Path filtering tests have been removed because path filtering
1083    // now happens at the outlet middleware layer, not in the PostgresHandler.
1084    // The handler now logs everything it receives, with filtering done upstream.
1085
1086    #[sqlx::test]
1087    async fn test_no_path_filtering_logs_everything(pool: PgPool) {
1088        // Run migrations first
1089        crate::migrator().run(&pool).await.unwrap();
1090
1091        // Handler without any path filtering
1092        let handler = PostgresHandler::<PgPool, Value, Value>::from_pool(pool.clone())
1093            .await
1094            .unwrap();
1095        let repository = handler.repository();
1096
1097        let test_uris = ["/api/users", "/health", "/metrics", "/random/path"];
1098        for (i, uri) in test_uris.iter().enumerate() {
1099            let correlation_id = 3000 + i as u64;
1100            let mut headers = HashMap::new();
1101            headers.insert("content-type".to_string(), vec!["application/json".into()]);
1102
1103            let request_data = RequestData {
1104                method: http::Method::GET,
1105                uri: uri.parse().unwrap(),
1106                headers,
1107                body: Some(Bytes::from(b"{}".to_vec())),
1108                timestamp: SystemTime::now(),
1109                correlation_id,
1110            };
1111
1112            handler.handle_request(request_data).await;
1113        }
1114
1115        // Should have logged all 4 requests
1116        let filter = RequestFilter::default();
1117        let results = repository.query(filter).await.unwrap();
1118        assert_eq!(results.len(), 4);
1119    }
1120
1121    // Tests for read/write pool separation using TestDbPools
1122    #[sqlx::test]
1123    async fn test_write_operations_use_write_pool(pool: PgPool) {
1124        // Run migrations first
1125        crate::migrator().run(&pool).await.unwrap();
1126
1127        // Create TestDbPools which has a read-only replica
1128        let test_pools = crate::TestDbPools::new(pool).await.unwrap();
1129        let handler = PostgresHandler::<_, Value, Value>::from_pool_provider(test_pools.clone())
1130            .await
1131            .unwrap();
1132
1133        let mut request_data = create_test_request_data();
1134        let correlation_id = 5001;
1135        request_data.correlation_id = correlation_id;
1136
1137        // This should succeed because handle_request uses .write() which goes to primary
1138        handler.handle_request(request_data.clone()).await;
1139
1140        // Verify the write succeeded by reading from the primary pool
1141        let count: i64 =
1142            sqlx::query_scalar("SELECT COUNT(*) FROM http_requests WHERE correlation_id = $1")
1143                .bind(correlation_id as i64)
1144                .fetch_one(test_pools.write())
1145                .await
1146                .unwrap();
1147
1148        assert_eq!(count, 1, "Request should be written to primary pool");
1149    }
1150
1151    #[sqlx::test]
1152    async fn test_response_write_uses_write_pool(pool: PgPool) {
1153        // Run migrations first
1154        crate::migrator().run(&pool).await.unwrap();
1155
1156        let test_pools = crate::TestDbPools::new(pool).await.unwrap();
1157        let handler = PostgresHandler::<_, Value, Value>::from_pool_provider(test_pools.clone())
1158            .await
1159            .unwrap();
1160
1161        let mut request_data = create_test_request_data();
1162        let mut response_data = create_test_response_data();
1163        let correlation_id = 5002;
1164        request_data.correlation_id = correlation_id;
1165        response_data.correlation_id = correlation_id;
1166
1167        // Write request first
1168        handler.handle_request(request_data.clone()).await;
1169
1170        // Write response - should succeed because it uses .write()
1171        handler.handle_response(request_data, response_data).await;
1172
1173        // Verify both were written
1174        let count: i64 =
1175            sqlx::query_scalar("SELECT COUNT(*) FROM http_responses WHERE correlation_id = $1")
1176                .bind(correlation_id as i64)
1177                .fetch_one(test_pools.write())
1178                .await
1179                .unwrap();
1180
1181        assert_eq!(count, 1, "Response should be written to primary pool");
1182    }
1183
1184    #[sqlx::test]
1185    async fn test_repository_queries_use_read_pool(pool: PgPool) {
1186        // Run migrations first
1187        crate::migrator().run(&pool).await.unwrap();
1188
1189        let test_pools = crate::TestDbPools::new(pool).await.unwrap();
1190        let handler = PostgresHandler::<_, Value, Value>::from_pool_provider(test_pools.clone())
1191            .await
1192            .unwrap();
1193
1194        // Write some data using the handler (which uses write pool)
1195        let mut request_data = create_test_request_data();
1196        let correlation_id = 5003;
1197        request_data.correlation_id = correlation_id;
1198        handler.handle_request(request_data).await;
1199
1200        // Query using repository - should succeed because it uses .read()
1201        let repository = handler.repository();
1202        let filter = RequestFilter {
1203            correlation_id: Some(correlation_id as i64),
1204            ..Default::default()
1205        };
1206
1207        // This will succeed if repository.query() correctly uses .read()
1208        let results = repository.query(filter).await.unwrap();
1209        assert_eq!(results.len(), 1);
1210        assert_eq!(results[0].request.correlation_id, correlation_id as i64);
1211    }
1212
1213    #[sqlx::test]
1214    async fn test_replica_pool_rejects_writes(pool: PgPool) {
1215        // Run migrations first
1216        crate::migrator().run(&pool).await.unwrap();
1217
1218        let test_pools = crate::TestDbPools::new(pool).await.unwrap();
1219
1220        // Verify that the replica pool is actually read-only
1221        let result = sqlx::query("INSERT INTO http_requests (instance_id, correlation_id, timestamp, method, uri, headers, body, body_parsed) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)")
1222            .bind(Uuid::new_v4())
1223            .bind(9999i64)
1224            .bind(Utc::now())
1225            .bind("GET")
1226            .bind("/test")
1227            .bind(serde_json::json!({}))
1228            .bind(None::<Value>)
1229            .bind(false)
1230            .execute(test_pools.read())
1231            .await;
1232
1233        // Should fail with a read-only transaction error
1234        assert!(
1235            result.is_err(),
1236            "Replica pool should reject write operations"
1237        );
1238
1239        let err = result.unwrap_err();
1240        let err_msg = err.to_string().to_lowercase();
1241        assert!(
1242            err_msg.contains("read-only") || err_msg.contains("read only"),
1243            "Error should mention read-only: {}",
1244            err
1245        );
1246    }
1247
1248    #[sqlx::test]
1249    async fn test_full_request_response_cycle_with_read_write_separation(pool: PgPool) {
1250        // Run migrations first
1251        crate::migrator().run(&pool).await.unwrap();
1252
1253        let test_pools = crate::TestDbPools::new(pool).await.unwrap();
1254        let handler =
1255            PostgresHandler::<_, TestRequest, TestResponse>::from_pool_provider(test_pools)
1256                .await
1257                .unwrap();
1258
1259        let mut request_data = create_test_request_data();
1260        let mut response_data = create_test_response_data();
1261        let correlation_id = 5004;
1262        request_data.correlation_id = correlation_id;
1263        response_data.correlation_id = correlation_id;
1264
1265        // Write request and response (uses write pool)
1266        handler.handle_request(request_data.clone()).await;
1267        handler.handle_response(request_data, response_data).await;
1268
1269        // Query back using repository (uses read pool)
1270        let repository = handler.repository();
1271        let filter = RequestFilter {
1272            correlation_id: Some(correlation_id as i64),
1273            ..Default::default()
1274        };
1275
1276        let results = repository.query(filter).await.unwrap();
1277        assert_eq!(results.len(), 1);
1278
1279        // Verify request data
1280        let pair = &results[0];
1281        assert_eq!(pair.request.correlation_id, correlation_id as i64);
1282        assert_eq!(pair.request.method, "POST");
1283        assert_eq!(pair.request.uri, "/api/users");
1284
1285        // Verify response data
1286        let response = pair.response.as_ref().expect("Response should exist");
1287        assert_eq!(response.correlation_id, correlation_id as i64);
1288        assert_eq!(response.status_code, 201);
1289
1290        // Verify parsed bodies
1291        match &pair.request.body {
1292            Some(Ok(parsed_body)) => {
1293                assert_eq!(
1294                    *parsed_body,
1295                    TestRequest {
1296                        user_id: 123,
1297                        action: "create_user".to_string(),
1298                    }
1299                );
1300            }
1301            _ => panic!("Expected successfully parsed request body"),
1302        }
1303
1304        match &response.body {
1305            Some(Ok(parsed_body)) => {
1306                assert_eq!(
1307                    *parsed_body,
1308                    TestResponse {
1309                        success: true,
1310                        message: "User created successfully".to_string(),
1311                    }
1312                );
1313            }
1314            _ => panic!("Expected successfully parsed response body"),
1315        }
1316    }
1317}