// outlet_postgres — crate root (lib.rs)
1//! # outlet-postgres
2//!
3//! PostgreSQL logging handler for the outlet HTTP request/response middleware.
4//! This crate implements the `RequestHandler` trait from outlet to log HTTP
5//! requests and responses to PostgreSQL with JSONB serialization for bodies.
6//!
7//! ## Quick Start
8//!
9//! Basic usage:
10//!
11//! ```rust,no_run
12//! use outlet::{RequestLoggerLayer, RequestLoggerConfig};
13//! use outlet_postgres::PostgresHandler;
14//! use axum::{routing::get, Router};
15//! use tower::ServiceBuilder;
16//!
17//! async fn hello() -> &'static str {
18//!     "Hello, World!"
19//! }
20//!
21//! #[tokio::main]
22//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
23//!     let database_url = "postgresql://user:password@localhost/dbname";
24//!     let handler: PostgresHandler = PostgresHandler::new(database_url).await?;
25//!     let layer = RequestLoggerLayer::new(RequestLoggerConfig::default(), handler);
26//!
27//!     let app = Router::new()
28//!         .route("/hello", get(hello))
29//!         .layer(ServiceBuilder::new().layer(layer));
30//!
31//!     let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await?;
32//!     axum::serve(listener, app).await?;
33//!     Ok(())
34//! }
35//! ```
36//!
37//! ## Features
38//!
39//! - **PostgreSQL Integration**: Uses sqlx for async PostgreSQL operations
40//! - **JSONB Bodies**: Serializes request/response bodies to JSONB fields
41//! - **Type-safe Querying**: Query logged data with typed request/response bodies
42//! - **Correlation**: Links requests and responses via correlation IDs
43//! - **Error Handling**: Graceful error handling with logging
44//! - **Flexible Serialization**: Generic error handling for custom serializer types
45
/// Error type for serialization failures with fallback data.
///
/// When serializers fail to parse request/response bodies into structured types,
/// this error provides both the parsing error details and fallback data that
/// can be stored as a string representation (the default serializers use the
/// lossy UTF-8 decoding of the raw body bytes as the fallback).
#[derive(Debug)]
pub struct SerializationError {
    /// The fallback representation of the data (e.g., base64-encoded, raw string)
    pub fallback_data: String,
    /// The underlying error that caused serialization to fail
    pub error: Box<dyn std::error::Error + Send + Sync>,
}
58
59impl SerializationError {
60    /// Create a new serialization error with fallback data
61    pub fn new(
62        fallback_data: String,
63        error: impl std::error::Error + Send + Sync + 'static,
64    ) -> Self {
65        Self {
66            fallback_data,
67            error: Box::new(error),
68        }
69    }
70}
71
72impl std::fmt::Display for SerializationError {
73    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
74        write!(f, "Serialization failed: {}", self.error)
75    }
76}
77
78impl std::error::Error for SerializationError {
79    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
80        Some(self.error.as_ref())
81    }
82}
83
84use chrono::{DateTime, Utc};
85use metrics::{counter, histogram};
86use outlet::{RequestData, RequestHandler, ResponseData};
87use serde::{Deserialize, Serialize};
88use serde_json::Value;
89use sqlx::PgPool;
90use std::collections::HashMap;
91use std::sync::Arc;
92use std::time::{Instant, SystemTime};
93use tracing::{debug, error, instrument, warn};
94use uuid::Uuid;
95
96pub mod error;
97pub mod repository;
98pub use error::PostgresHandlerError;
99pub use repository::{
100    HttpRequest, HttpResponse, RequestFilter, RequestRepository, RequestResponsePair,
101};
102
103// Re-export from sqlx-pool-router
104pub use sqlx_pool_router::{DbPools, PoolProvider, TestDbPools};
105
/// Get the migrator for running outlet-postgres database migrations.
///
/// This returns a SQLx migrator that can be used to set up the required
/// `http_requests` and `http_responses` tables. The consuming application
/// is responsible for running these migrations at the appropriate time
/// and in the appropriate database schema.
///
/// The `sqlx::migrate!` macro embeds the contents of the crate's
/// `./migrations` directory into the binary at compile time, so no files
/// need to ship alongside the executable.
///
/// # Examples
///
/// ```rust,no_run
/// use outlet_postgres::migrator;
/// use sqlx::PgPool;
///
/// #[tokio::main]
/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
///     let pool = PgPool::connect("postgresql://user:pass@localhost/db").await?;
///
///     // Run outlet migrations
///     migrator().run(&pool).await?;
///
///     Ok(())
/// }
/// ```
pub fn migrator() -> sqlx::migrate::Migrator {
    sqlx::migrate!("./migrations")
}
132
/// Type alias for request body serializers.
///
/// Request serializers take full request context including headers and body bytes.
/// On failure, they return a `SerializationError` with fallback data.
/// Stored behind `Arc` so that cloning a `PostgresHandler` is cheap.
type RequestSerializer<T> =
    Arc<dyn Fn(&outlet::RequestData) -> Result<T, SerializationError> + Send + Sync>;

/// Type alias for response body serializers.
///
/// Response serializers take both request and response context, allowing them to
/// make parsing decisions based on request details and response headers (e.g., compression).
/// On failure, they return a `SerializationError` with fallback data.
/// Stored behind `Arc` so that cloning a `PostgresHandler` is cheap.
type ResponseSerializer<T> = Arc<
    dyn Fn(&outlet::RequestData, &outlet::ResponseData) -> Result<T, SerializationError>
        + Send
        + Sync,
>;
150
/// PostgreSQL handler for outlet middleware.
///
/// Implements the `RequestHandler` trait to log HTTP requests and responses
/// to PostgreSQL. Request and response bodies are serialized to JSONB fields.
///
/// Generic over:
/// - `P`: Pool provider implementing `PoolProvider` trait for read/write routing
/// - `TReq` and `TRes`: Request and response body types for JSONB serialization
///
/// Use `serde_json::Value` for flexible JSON storage, or custom structs for typed storage.
#[derive(Clone)]
pub struct PostgresHandler<P = PgPool, TReq = Value, TRes = Value>
where
    P: PoolProvider,
    TReq: for<'de> Deserialize<'de> + Serialize + Send + Sync + 'static,
    TRes: for<'de> Deserialize<'de> + Serialize + Send + Sync + 'static,
{
    /// Pool provider; inserts go through its write pool.
    pool: P,
    /// Converts raw request data into `TReq` for JSONB storage.
    request_serializer: RequestSerializer<TReq>,
    /// Converts request + response data into `TRes` for JSONB storage.
    response_serializer: ResponseSerializer<TRes>,
    /// Random UUID generated per handler instance; bound into every inserted
    /// row so correlation ids from different handler instances don't collide.
    instance_id: Uuid,
}
173
174impl<P, TReq, TRes> PostgresHandler<P, TReq, TRes>
175where
176    P: PoolProvider,
177    TReq: for<'de> Deserialize<'de> + Serialize + Send + Sync + 'static,
178    TRes: for<'de> Deserialize<'de> + Serialize + Send + Sync + 'static,
179{
180    /// Default serializer that attempts serde JSON deserialization.
181    /// On failure, returns a SerializationError with raw bytes as fallback data.
182    fn default_request_serializer() -> RequestSerializer<TReq> {
183        Arc::new(|request_data| {
184            let bytes = request_data.body.as_deref().unwrap_or(&[]);
185            serde_json::from_slice::<TReq>(bytes).map_err(|error| {
186                let fallback_data = String::from_utf8_lossy(bytes).to_string();
187                SerializationError::new(fallback_data, error)
188            })
189        })
190    }
191
192    /// Default serializer that attempts serde JSON deserialization.
193    /// On failure, returns a SerializationError with raw bytes as fallback data.
194    fn default_response_serializer() -> ResponseSerializer<TRes> {
195        Arc::new(|_request_data, response_data| {
196            let bytes = response_data.body.as_deref().unwrap_or(&[]);
197            serde_json::from_slice::<TRes>(bytes).map_err(|error| {
198                let fallback_data = String::from_utf8_lossy(bytes).to_string();
199                SerializationError::new(fallback_data, error)
200            })
201        })
202    }
203
204    /// Add a custom request body serializer.
205    ///
206    /// The serializer function takes raw bytes and should return a `Result<TReq, String>`.
207    /// If the serializer succeeds, the result will be stored as JSONB and `body_parsed` will be true.
208    /// If it fails, the raw content will be stored as a UTF-8 string and `body_parsed` will be false.
209    ///
210    /// # Panics
211    ///
212    /// This will panic if the serializer succeeds but the resulting `TReq` value cannot be
213    /// converted to JSON via `serde_json::to_value()`. This indicates a bug in the `Serialize`
214    /// implementation of `TReq` and should be fixed by the caller.
215    pub fn with_request_serializer<F>(mut self, serializer: F) -> Self
216    where
217        F: Fn(&outlet::RequestData) -> Result<TReq, SerializationError> + Send + Sync + 'static,
218    {
219        self.request_serializer = Arc::new(serializer);
220        self
221    }
222
223    /// Add a custom response body serializer.
224    ///
225    /// The serializer function takes raw bytes and should return a `Result<TRes, String>`.
226    /// If the serializer succeeds, the result will be stored as JSONB and `body_parsed` will be true.
227    /// If it fails, the raw content will be stored as a UTF-8 string and `body_parsed` will be false.
228    ///
229    /// # Panics
230    ///
231    /// This will panic if the serializer succeeds but the resulting `TRes` value cannot be
232    /// converted to JSON via `serde_json::to_value()`. This indicates a bug in the `Serialize`
233    /// implementation of `TRes` and should be fixed by the caller.
234    pub fn with_response_serializer<F>(mut self, serializer: F) -> Self
235    where
236        F: Fn(&outlet::RequestData, &outlet::ResponseData) -> Result<TRes, SerializationError>
237            + Send
238            + Sync
239            + 'static,
240    {
241        self.response_serializer = Arc::new(serializer);
242        self
243    }
244
245    /// Create a PostgreSQL handler from a pool provider.
246    ///
247    /// Use this if you want to use a custom pool provider implementation
248    /// (such as `DbPools` for read/write separation).
249    /// This will NOT run migrations - use `migrator()` to run migrations separately.
250    ///
251    /// # Arguments
252    ///
253    /// * `pool_provider` - Pool provider implementing `PoolProvider` trait
254    ///
255    /// # Examples
256    ///
257    /// ```rust,no_run
258    /// use outlet_postgres::{PostgresHandler, DbPools, migrator};
259    /// use sqlx::postgres::PgPoolOptions;
260    /// use serde::{Deserialize, Serialize};
261    ///
262    /// #[derive(Deserialize, Serialize)]
263    /// struct MyBodyType {
264    ///     id: u64,
265    ///     name: String,
266    /// }
267    ///
268    /// #[tokio::main]
269    /// async fn main() -> Result<(), Box<dyn std::error::Error>> {
270    ///     let primary = PgPoolOptions::new()
271    ///         .connect("postgresql://user:pass@primary/db").await?;
272    ///     let replica = PgPoolOptions::new()
273    ///         .connect("postgresql://user:pass@replica/db").await?;
274    ///
275    ///     // Run migrations on primary
276    ///     migrator().run(&primary).await?;
277    ///
278    ///     // Create handler with read/write separation
279    ///     let pools = DbPools::with_replica(primary, replica);
280    ///     let handler = PostgresHandler::<_, MyBodyType, MyBodyType>::from_pool_provider(pools).await?;
281    ///     Ok(())
282    /// }
283    /// ```
284    pub async fn from_pool_provider(pool_provider: P) -> Result<Self, PostgresHandlerError> {
285        Ok(Self {
286            pool: pool_provider,
287            request_serializer: Self::default_request_serializer(),
288            response_serializer: Self::default_response_serializer(),
289            instance_id: Uuid::new_v4(),
290        })
291    }
292
293    /// Convert headers to a JSONB-compatible format.
294    fn headers_to_json(headers: &HashMap<String, Vec<bytes::Bytes>>) -> Value {
295        let mut header_map = HashMap::new();
296        for (name, values) in headers {
297            if values.len() == 1 {
298                let value_str = String::from_utf8_lossy(&values[0]).to_string();
299                header_map.insert(name.clone(), Value::String(value_str));
300            } else {
301                let value_array: Vec<Value> = values
302                    .iter()
303                    .map(|v| Value::String(String::from_utf8_lossy(v).to_string()))
304                    .collect();
305                header_map.insert(name.clone(), Value::Array(value_array));
306            }
307        }
308        serde_json::to_value(header_map).unwrap_or(Value::Null)
309    }
310
311    /// Convert request data to a JSONB value using the configured serializer.
312    fn request_body_to_json_with_fallback(
313        &self,
314        request_data: &outlet::RequestData,
315    ) -> (Value, bool) {
316        match (self.request_serializer)(request_data) {
317            Ok(typed_value) => {
318                if let Ok(json_value) = serde_json::to_value(&typed_value) {
319                    (json_value, true)
320                } else {
321                    // This should never happen if the type implements Serialize correctly
322                    (
323                        Value::String(
324                            serde_json::to_string(&typed_value)
325                                .expect("Serialized value must be convertible to JSON string"),
326                        ),
327                        false,
328                    )
329                }
330            }
331            Err(serialization_error) => (Value::String(serialization_error.fallback_data), false),
332        }
333    }
334
335    /// Convert response data to a JSONB value using the configured serializer.
336    fn response_body_to_json_with_fallback(
337        &self,
338        request_data: &outlet::RequestData,
339        response_data: &outlet::ResponseData,
340    ) -> (Value, bool) {
341        match (self.response_serializer)(request_data, response_data) {
342            Ok(typed_value) => {
343                if let Ok(json_value) = serde_json::to_value(&typed_value) {
344                    (json_value, true)
345                } else {
346                    // This should never happen if the type implements Serialize correctly
347                    (
348                        Value::String(
349                            serde_json::to_string(&typed_value)
350                                .expect("Serialized value must be convertible to JSON string"),
351                        ),
352                        false,
353                    )
354                }
355            }
356            Err(serialization_error) => (Value::String(serialization_error.fallback_data), false),
357        }
358    }
359
360    /// Get a repository for querying logged requests and responses.
361    ///
362    /// Returns a `RequestRepository` with the same type parameters as this handler,
363    /// allowing for type-safe querying of request and response bodies.
364    pub fn repository(&self) -> crate::repository::RequestRepository<P, TReq, TRes> {
365        crate::repository::RequestRepository::new(self.pool.clone())
366    }
367}
368
369// Backward-compatible constructors for PgPool
370impl<TReq, TRes> PostgresHandler<PgPool, TReq, TRes>
371where
372    TReq: for<'de> Deserialize<'de> + Serialize + Send + Sync + 'static,
373    TRes: for<'de> Deserialize<'de> + Serialize + Send + Sync + 'static,
374{
375    /// Create a new PostgreSQL handler with a connection pool.
376    ///
377    /// This will connect to the database but will NOT run migrations.
378    /// Use `migrator()` to get a migrator and run migrations separately.
379    ///
380    /// # Arguments
381    ///
382    /// * `database_url` - PostgreSQL connection string
383    ///
384    /// # Examples
385    ///
386    /// ```rust,no_run
387    /// use outlet_postgres::{PostgresHandler, migrator};
388    /// use serde::{Deserialize, Serialize};
389    ///
390    /// #[derive(Deserialize, Serialize)]
391    /// struct MyBodyType {
392    ///     id: u64,
393    ///     name: String,
394    /// }
395    ///
396    /// #[tokio::main]
397    /// async fn main() -> Result<(), Box<dyn std::error::Error>> {
398    ///     // Run migrations first
399    ///     let pool = sqlx::PgPool::connect("postgresql://user:pass@localhost/db").await?;
400    ///     migrator().run(&pool).await?;
401    ///
402    ///     // Create handler
403    ///     let handler = PostgresHandler::<_, MyBodyType, MyBodyType>::new("postgresql://user:pass@localhost/db").await?;
404    ///     Ok(())
405    /// }
406    /// ```
407    pub async fn new(database_url: &str) -> Result<Self, PostgresHandlerError> {
408        let pool = PgPool::connect(database_url)
409            .await
410            .map_err(PostgresHandlerError::Connection)?;
411
412        Ok(Self {
413            pool,
414            request_serializer: Self::default_request_serializer(),
415            response_serializer: Self::default_response_serializer(),
416            instance_id: Uuid::new_v4(),
417        })
418    }
419
420    /// Create a PostgreSQL handler from an existing connection pool.
421    ///
422    /// Use this if you already have a connection pool and want to reuse it.
423    /// This will NOT run migrations - use `migrator()` to run migrations separately.
424    ///
425    /// # Arguments
426    ///
427    /// * `pool` - Existing PostgreSQL connection pool
428    ///
429    /// # Examples
430    ///
431    /// ```rust,no_run
432    /// use outlet_postgres::{PostgresHandler, migrator};
433    /// use sqlx::PgPool;
434    /// use serde::{Deserialize, Serialize};
435    ///
436    /// #[derive(Deserialize, Serialize)]
437    /// struct MyBodyType {
438    ///     id: u64,
439    ///     name: String,
440    /// }
441    ///
442    /// #[tokio::main]
443    /// async fn main() -> Result<(), Box<dyn std::error::Error>> {
444    ///     let pool = PgPool::connect("postgresql://user:pass@localhost/db").await?;
445    ///
446    ///     // Run migrations first
447    ///     migrator().run(&pool).await?;
448    ///
449    ///     // Create handler
450    ///     let handler = PostgresHandler::<_, MyBodyType, MyBodyType>::from_pool(pool).await?;
451    ///     Ok(())
452    /// }
453    /// ```
454    pub async fn from_pool(pool: PgPool) -> Result<Self, PostgresHandlerError> {
455        Self::from_pool_provider(pool).await
456    }
457}
458
impl<P, TReq, TRes> RequestHandler for PostgresHandler<P, TReq, TRes>
where
    P: PoolProvider,
    TReq: for<'de> Deserialize<'de> + Serialize + Send + Sync + 'static,
    TRes: for<'de> Deserialize<'de> + Serialize + Send + Sync + 'static,
{
    /// Persist one request row in `http_requests`.
    ///
    /// Database errors are logged and counted in metrics rather than
    /// propagated, so a failed insert never disturbs request handling.
    #[instrument(name = "outlet.handle_request", skip(self, data), fields(correlation_id = %data.correlation_id))]
    async fn handle_request(&self, data: RequestData) {
        let headers_json = Self::headers_to_json(&data.headers);
        // Only run the body serializer when a body is present; body-less
        // requests store NULL with body_parsed = false.
        let (body_json, parsed) = if data.body.is_some() {
            let (json, parsed) = self.request_body_to_json_with_fallback(&data);
            (Some(json), parsed)
        } else {
            (None, false)
        };

        let timestamp: DateTime<Utc> = data.timestamp.into();

        // Time the INSERT itself so slow database writes surface in metrics.
        let query_start = Instant::now();
        let result = sqlx::query(
            r#"
            INSERT INTO http_requests (instance_id, correlation_id, timestamp, method, uri, headers, body, body_parsed, trace_id, span_id)
            VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
            "#,
        )
        .bind(self.instance_id)
        // correlation_id is stored in a signed 64-bit column, hence the cast.
        .bind(data.correlation_id as i64)
        .bind(timestamp)
        .bind(data.method.to_string())
        .bind(data.uri.to_string())
        .bind(headers_json)
        .bind(body_json)
        .bind(parsed)
        .bind(&data.trace_id)
        .bind(&data.span_id)
        .execute(self.pool.write())
        .await;
        let query_duration = query_start.elapsed();
        histogram!("outlet_write_duration_seconds", "operation" => "request")
            .record(query_duration.as_secs_f64());

        if let Err(e) = result {
            counter!("outlet_write_errors_total", "operation" => "request").increment(1);
            error!(correlation_id = %data.correlation_id, error = %e, "Failed to insert request data");
        } else {
            // Lag between the request's own timestamp and when we finished
            // writing it; over one second is logged at warn level.
            let processing_lag_ms = SystemTime::now()
                .duration_since(data.timestamp)
                .unwrap_or_default()
                .as_millis();
            if processing_lag_ms > 1000 {
                warn!(correlation_id = %data.correlation_id, method = %data.method, uri = %data.uri, lag_ms = %processing_lag_ms, "Request logged (slow)");
            } else {
                debug!(correlation_id = %data.correlation_id, method = %data.method, uri = %data.uri, lag_ms = %processing_lag_ms, "Request logged");
            }
        }
    }

    /// Persist one response row in `http_responses`, but only when the
    /// matching request row (same `instance_id` + `correlation_id`) exists.
    ///
    /// Database errors are logged and counted in metrics rather than
    /// propagated.
    #[instrument(name = "outlet.handle_response", skip(self, request_data, response_data), fields(correlation_id = %request_data.correlation_id))]
    async fn handle_response(&self, request_data: RequestData, response_data: ResponseData) {
        let headers_json = Self::headers_to_json(&response_data.headers);
        // Only run the body serializer when a body is present; body-less
        // responses store NULL with body_parsed = false.
        let (body_json, parsed) = if response_data.body.is_some() {
            let (json, parsed) =
                self.response_body_to_json_with_fallback(&request_data, &response_data);
            (Some(json), parsed)
        } else {
            (None, false)
        };

        let timestamp: DateTime<Utc> = response_data.timestamp.into();
        let duration_ms = response_data.duration.as_millis() as i64;
        let duration_to_first_byte_ms = response_data.duration_to_first_byte.as_millis() as i64;

        let query_start = Instant::now();
        // INSERT ... SELECT ... WHERE EXISTS guards against orphan responses:
        // if the request row was never written, the insert affects zero rows.
        let result = sqlx::query(
            r#"
            INSERT INTO http_responses (instance_id, correlation_id, timestamp, status_code, headers, body, body_parsed, duration_to_first_byte_ms, duration_ms)
            SELECT $1, $2, $3, $4, $5, $6, $7, $8, $9
            WHERE EXISTS (SELECT 1 FROM http_requests WHERE instance_id = $1 AND correlation_id = $2)
            "#,
        )
        .bind(self.instance_id)
        .bind(request_data.correlation_id as i64)
        .bind(timestamp)
        .bind(response_data.status.as_u16() as i32)
        .bind(headers_json)
        .bind(body_json)
        .bind(parsed)
        .bind(duration_to_first_byte_ms)
        .bind(duration_ms)
        .execute(self.pool.write())
        .await;
        let query_duration = query_start.elapsed();
        histogram!("outlet_write_duration_seconds", "operation" => "response")
            .record(query_duration.as_secs_f64());

        match result {
            Err(e) => {
                counter!("outlet_write_errors_total", "operation" => "response").increment(1);
                error!(correlation_id = %request_data.correlation_id, error = %e, "Failed to insert response data");
            }
            Ok(query_result) => {
                if query_result.rows_affected() > 0 {
                    // Lag between the response's timestamp and the finished write;
                    // over one second is logged at warn level.
                    let processing_lag_ms = SystemTime::now()
                        .duration_since(response_data.timestamp)
                        .unwrap_or_default()
                        .as_millis();
                    if processing_lag_ms > 1000 {
                        warn!(correlation_id = %request_data.correlation_id, status = %response_data.status, duration_ms = %duration_ms, lag_ms = %processing_lag_ms, "Response logged (slow)");
                    } else {
                        debug!(correlation_id = %request_data.correlation_id, status = %response_data.status, duration_ms = %duration_ms, lag_ms = %processing_lag_ms, "Response logged");
                    }
                } else {
                    // Zero rows affected means the EXISTS guard filtered the insert.
                    debug!(correlation_id = %request_data.correlation_id, "No matching request found for response, skipping insert")
                }
            }
        }
    }
}
577
578#[cfg(test)]
579mod tests {
580    use super::*;
581    use bytes::Bytes;
582    use chrono::{DateTime, Utc};
583    use outlet::{RequestData, ResponseData};
584    use serde::{Deserialize, Serialize};
585    use serde_json::Value;
586    use sqlx::PgPool;
587    use std::collections::HashMap;
588    use std::time::{Duration, SystemTime};
589
    /// Typed request body used to exercise JSONB serialization in tests.
    #[derive(Debug, Clone, PartialEq, Deserialize, Serialize)]
    struct TestRequest {
        user_id: u64,
        action: String,
    }
595
    /// Typed response body used to exercise JSONB serialization in tests.
    #[derive(Debug, Clone, PartialEq, Deserialize, Serialize)]
    struct TestResponse {
        success: bool,
        message: String,
    }
601
602    fn create_test_request_data() -> RequestData {
603        let mut headers = HashMap::new();
604        headers.insert("content-type".to_string(), vec!["application/json".into()]);
605        headers.insert("user-agent".to_string(), vec!["test-client/1.0".into()]);
606
607        let test_req = TestRequest {
608            user_id: 123,
609            action: "create_user".to_string(),
610        };
611        let body = serde_json::to_vec(&test_req).unwrap();
612
613        RequestData {
614            method: http::Method::POST,
615            uri: http::Uri::from_static("/api/users"),
616            headers,
617            body: Some(Bytes::from(body)),
618            timestamp: SystemTime::now(),
619            correlation_id: 0,
620            trace_id: None,
621            span_id: None,
622        }
623    }
624
625    fn create_test_response_data() -> ResponseData {
626        let mut headers = HashMap::new();
627        headers.insert("content-type".to_string(), vec!["application/json".into()]);
628
629        let test_res = TestResponse {
630            success: true,
631            message: "User created successfully".to_string(),
632        };
633        let body = serde_json::to_vec(&test_res).unwrap();
634
635        ResponseData {
636            status: http::StatusCode::CREATED,
637            headers,
638            body: Some(Bytes::from(body)),
639            timestamp: SystemTime::now(),
640            duration_to_first_byte: Duration::from_millis(100),
641            duration: Duration::from_millis(150),
642            correlation_id: 0,
643        }
644    }
645
    /// Smoke test: the handler can be built from a pool and hands out a
    /// repository that queries an empty database without error.
    #[sqlx::test]
    async fn test_handler_creation(pool: PgPool) {
        // Run migrations first
        crate::migrator().run(&pool).await.unwrap();

        let handler = PostgresHandler::<PgPool, TestRequest, TestResponse>::from_pool(pool.clone())
            .await
            .unwrap();

        // Verify we can get a repository
        let repository = handler.repository();

        // Test initial state - no requests logged yet
        let filter = RequestFilter::default();
        let results = repository.query(filter).await.unwrap();
        assert!(results.is_empty());
    }
663
    /// End-to-end: a request with a valid JSON body is stored, its body is
    /// parsed into `TestRequest`, and it can be queried back by correlation id.
    #[sqlx::test]
    async fn test_handle_request_with_typed_body(pool: PgPool) {
        // Run migrations first
        crate::migrator().run(&pool).await.unwrap();

        let handler = PostgresHandler::<PgPool, TestRequest, TestResponse>::from_pool(pool.clone())
            .await
            .unwrap();
        let repository = handler.repository();

        let mut request_data = create_test_request_data();
        let correlation_id = 12345;
        request_data.correlation_id = correlation_id;

        // Handle the request
        handler.handle_request(request_data.clone()).await;

        // Query back the request
        let filter = RequestFilter {
            correlation_id: Some(correlation_id as i64),
            ..Default::default()
        };
        let results = repository.query(filter).await.unwrap();

        assert_eq!(results.len(), 1);
        let pair = &results[0];

        assert_eq!(pair.request.correlation_id, correlation_id as i64);
        assert_eq!(pair.request.method, "POST");
        assert_eq!(pair.request.uri, "/api/users");

        // Check that body was parsed successfully (Ok branch = body_parsed)
        match &pair.request.body {
            Some(Ok(parsed_body)) => {
                assert_eq!(
                    *parsed_body,
                    TestRequest {
                        user_id: 123,
                        action: "create_user".to_string(),
                    }
                );
            }
            _ => panic!("Expected successfully parsed request body"),
        }

        // Headers should be converted to JSON properly
        let headers_value = &pair.request.headers;
        assert!(headers_value.get("content-type").is_some());
        assert!(headers_value.get("user-agent").is_some());

        // No response yet
        assert!(pair.response.is_none());
    }
717
    /// End-to-end: a request/response pair sharing a correlation id is stored,
    /// and the response's status, duration, and parsed body round-trip.
    #[sqlx::test]
    async fn test_handle_response_with_typed_body(pool: PgPool) {
        // Run migrations first
        crate::migrator().run(&pool).await.unwrap();

        let handler = PostgresHandler::<PgPool, TestRequest, TestResponse>::from_pool(pool.clone())
            .await
            .unwrap();
        let repository = handler.repository();

        let mut request_data = create_test_request_data();
        let mut response_data = create_test_response_data();
        let correlation_id = 54321;
        request_data.correlation_id = correlation_id;
        response_data.correlation_id = correlation_id;

        // Handle both request and response (request first, so the response's
        // EXISTS guard finds its matching row)
        handler.handle_request(request_data.clone()).await;
        handler
            .handle_response(request_data, response_data.clone())
            .await;

        // Query back the complete pair
        let filter = RequestFilter {
            correlation_id: Some(correlation_id as i64),
            ..Default::default()
        };
        let results = repository.query(filter).await.unwrap();

        assert_eq!(results.len(), 1);
        let pair = &results[0];

        // Check response data
        let response = pair.response.as_ref().expect("Response should be present");
        assert_eq!(response.correlation_id, correlation_id as i64);
        assert_eq!(response.status_code, 201);
        assert_eq!(response.duration_ms, 150);

        // Check that response body was parsed successfully
        match &response.body {
            Some(Ok(parsed_body)) => {
                assert_eq!(
                    *parsed_body,
                    TestResponse {
                        success: true,
                        message: "User created successfully".to_string(),
                    }
                );
            }
            _ => panic!("Expected successfully parsed response body"),
        }
    }
770
    /// A body that is not valid JSON for `TestRequest` must still be stored,
    /// falling back to the raw content with `body_parsed = false`.
    #[sqlx::test]
    async fn test_handle_unparseable_body_fallback(pool: PgPool) {
        // Run migrations first
        crate::migrator().run(&pool).await.unwrap();

        let handler = PostgresHandler::<PgPool, TestRequest, TestResponse>::from_pool(pool.clone())
            .await
            .unwrap();
        let repository = handler.repository();

        // Create request with invalid JSON for TestRequest
        let mut headers = HashMap::new();
        headers.insert("content-type".to_string(), vec!["text/plain".into()]);

        let invalid_json_body = b"not valid json for TestRequest";
        let correlation_id = 99999;
        let request_data = RequestData {
            method: http::Method::POST,
            uri: http::Uri::from_static("/api/test"),
            headers,
            body: Some(Bytes::from(invalid_json_body.to_vec())),
            timestamp: SystemTime::now(),
            correlation_id,
            trace_id: None,
            span_id: None,
        };

        handler.handle_request(request_data).await;

        // Query back and verify the raw-content fallback path (body_parsed = false)
        let filter = RequestFilter {
            correlation_id: Some(correlation_id as i64),
            ..Default::default()
        };
        let results = repository.query(filter).await.unwrap();

        assert_eq!(results.len(), 1);
        let pair = &results[0];

        // Should fallback to raw bytes (Err branch of the typed body)
        match &pair.request.body {
            Some(Err(raw_bytes)) => {
                assert_eq!(raw_bytes.as_ref(), invalid_json_body);
            }
            _ => panic!("Expected raw bytes fallback for unparseable body"),
        }
    }
818
819    #[sqlx::test]
820    async fn test_query_with_multiple_filters(pool: PgPool) {
821        // Run migrations first
822        crate::migrator().run(&pool).await.unwrap();
823
824        let handler = PostgresHandler::<PgPool, Value, Value>::from_pool(pool.clone())
825            .await
826            .unwrap();
827        let repository = handler.repository();
828
829        // Insert multiple requests with different characteristics
830        let test_cases = vec![
831            (1001, "GET", "/api/users", 200, 100),
832            (1002, "POST", "/api/users", 201, 150),
833            (1003, "GET", "/api/orders", 404, 50),
834            (1004, "PUT", "/api/users/123", 200, 300),
835        ];
836
837        for (correlation_id, method, uri, status, duration_ms) in test_cases {
838            let mut headers = HashMap::new();
839            headers.insert("content-type".to_string(), vec!["application/json".into()]);
840
841            let request_data = RequestData {
842                method: method.parse().unwrap(),
843                uri: uri.parse().unwrap(),
844                headers: headers.clone(),
845                body: Some(Bytes::from(b"{}".to_vec())),
846                timestamp: SystemTime::now(),
847                correlation_id,
848                trace_id: None,
849                span_id: None,
850            };
851
852            let response_data = ResponseData {
853                correlation_id,
854                status: http::StatusCode::from_u16(status).unwrap(),
855                headers,
856                body: Some(Bytes::from(b"{}".to_vec())),
857                timestamp: SystemTime::now(),
858                duration_to_first_byte: Duration::from_millis(duration_ms / 2),
859                duration: Duration::from_millis(duration_ms),
860            };
861
862            handler.handle_request(request_data.clone()).await;
863            handler.handle_response(request_data, response_data).await;
864        }
865
866        // Test method filter
867        let filter = RequestFilter {
868            method: Some("GET".to_string()),
869            ..Default::default()
870        };
871        let results = repository.query(filter).await.unwrap();
872        assert_eq!(results.len(), 2); // 1001, 1003
873
874        // Test status code filter
875        let filter = RequestFilter {
876            status_code: Some(200),
877            ..Default::default()
878        };
879        let results = repository.query(filter).await.unwrap();
880        assert_eq!(results.len(), 2); // 1001, 1004
881
882        // Test URI pattern filter
883        let filter = RequestFilter {
884            uri_pattern: Some("/api/users%".to_string()),
885            ..Default::default()
886        };
887        let results = repository.query(filter).await.unwrap();
888        assert_eq!(results.len(), 3); // 1001, 1002, 1004
889
890        // Test duration range filter
891        let filter = RequestFilter {
892            min_duration_ms: Some(100),
893            max_duration_ms: Some(200),
894            ..Default::default()
895        };
896        let results = repository.query(filter).await.unwrap();
897        assert_eq!(results.len(), 2); // 1001, 1002
898
899        // Test combined filters
900        let filter = RequestFilter {
901            method: Some("GET".to_string()),
902            status_code: Some(200),
903            ..Default::default()
904        };
905        let results = repository.query(filter).await.unwrap();
906        assert_eq!(results.len(), 1); // Only 1001
907        assert_eq!(results[0].request.correlation_id, 1001);
908    }
909
910    #[sqlx::test]
911    async fn test_query_with_pagination_and_ordering(pool: PgPool) {
912        // Run migrations first
913        crate::migrator().run(&pool).await.unwrap();
914
915        let handler = PostgresHandler::<PgPool, Value, Value>::from_pool(pool.clone())
916            .await
917            .unwrap();
918        let repository = handler.repository();
919
920        // Insert requests with known timestamps
921        let now = SystemTime::now();
922        for i in 0..5 {
923            let correlation_id = 2000 + i;
924            let timestamp = now + Duration::from_secs(i * 10); // 10 second intervals
925
926            let mut headers = HashMap::new();
927            headers.insert("x-test-id".to_string(), vec![i.to_string().into()]);
928
929            let request_data = RequestData {
930                method: http::Method::GET,
931                uri: "/api/test".parse().unwrap(),
932                headers,
933                body: Some(Bytes::from(format!("{{\"id\": {i}}}").into_bytes())),
934                timestamp,
935                correlation_id,
936                trace_id: None,
937                span_id: None,
938            };
939
940            handler.handle_request(request_data).await;
941        }
942
943        // Test default ordering (ASC) with limit
944        let filter = RequestFilter {
945            limit: Some(3),
946            ..Default::default()
947        };
948        let results = repository.query(filter).await.unwrap();
949        assert_eq!(results.len(), 3);
950
951        // Should be in ascending timestamp order
952        for i in 0..2 {
953            assert!(results[i].request.timestamp <= results[i + 1].request.timestamp);
954        }
955
956        // Test descending order with offset
957        let filter = RequestFilter {
958            order_by_timestamp_desc: true,
959            limit: Some(2),
960            offset: Some(1),
961            ..Default::default()
962        };
963        let results = repository.query(filter).await.unwrap();
964        assert_eq!(results.len(), 2);
965
966        // Should be in descending order, skipping the first (newest) one
967        assert!(results[0].request.timestamp >= results[1].request.timestamp);
968    }
969
970    #[sqlx::test]
971    async fn test_headers_conversion(pool: PgPool) {
972        // Run migrations first
973        crate::migrator().run(&pool).await.unwrap();
974
975        let handler = PostgresHandler::<PgPool, Value, Value>::from_pool(pool.clone())
976            .await
977            .unwrap();
978        let repository = handler.repository();
979
980        // Test various header scenarios
981        let mut headers = HashMap::new();
982        headers.insert("single-value".to_string(), vec!["test".into()]);
983        headers.insert(
984            "multi-value".to_string(),
985            vec!["val1".into(), "val2".into()],
986        );
987        headers.insert("empty-value".to_string(), vec!["".into()]);
988
989        let request_data = RequestData {
990            correlation_id: 3000,
991            method: http::Method::GET,
992            uri: "/test".parse().unwrap(),
993            headers,
994            body: None,
995            timestamp: SystemTime::now(),
996            trace_id: None,
997            span_id: None,
998        };
999
1000        let correlation_id = 3000;
1001        handler.handle_request(request_data).await;
1002
1003        let filter = RequestFilter {
1004            correlation_id: Some(correlation_id as i64),
1005            ..Default::default()
1006        };
1007        let results = repository.query(filter).await.unwrap();
1008
1009        assert_eq!(results.len(), 1);
1010        let headers_json = &results[0].request.headers;
1011
1012        // Single value should be stored as string
1013        assert_eq!(
1014            headers_json["single-value"],
1015            Value::String("test".to_string())
1016        );
1017
1018        // Multi-value should be stored as array
1019        match &headers_json["multi-value"] {
1020            Value::Array(arr) => {
1021                assert_eq!(arr.len(), 2);
1022                assert_eq!(arr[0], Value::String("val1".to_string()));
1023                assert_eq!(arr[1], Value::String("val2".to_string()));
1024            }
1025            _ => panic!("Expected array for multi-value header"),
1026        }
1027
1028        // Empty value should still be a string
1029        assert_eq!(headers_json["empty-value"], Value::String("".to_string()));
1030    }
1031
1032    #[sqlx::test]
1033    async fn test_timestamp_filtering(pool: PgPool) {
1034        // Run migrations first
1035        crate::migrator().run(&pool).await.unwrap();
1036
1037        let handler = PostgresHandler::<PgPool, Value, Value>::from_pool(pool.clone())
1038            .await
1039            .unwrap();
1040        let repository = handler.repository();
1041
1042        let base_time = SystemTime::UNIX_EPOCH + Duration::from_secs(1_600_000_000); // Sept 2020
1043
1044        // Insert requests at different times
1045        let times = [
1046            base_time + Duration::from_secs(0),    // correlation_id 4001
1047            base_time + Duration::from_secs(3600), // correlation_id 4002 (1 hour later)
1048            base_time + Duration::from_secs(7200), // correlation_id 4003 (2 hours later)
1049        ];
1050
1051        for (i, timestamp) in times.iter().enumerate() {
1052            let correlation_id = 4001 + i as u64;
1053            let request_data = RequestData {
1054                method: http::Method::GET,
1055                uri: "/test".parse().unwrap(),
1056                headers: HashMap::new(),
1057                body: None,
1058                timestamp: *timestamp,
1059                correlation_id,
1060                trace_id: None,
1061                span_id: None,
1062            };
1063
1064            handler.handle_request(request_data).await;
1065        }
1066
1067        // Test timestamp_after filter
1068        let after_time: DateTime<Utc> = (base_time + Duration::from_secs(1800)).into(); // 30 min after first
1069        let filter = RequestFilter {
1070            timestamp_after: Some(after_time),
1071            ..Default::default()
1072        };
1073        let results = repository.query(filter).await.unwrap();
1074        assert_eq!(results.len(), 2); // Should get 4002 and 4003
1075
1076        // Test timestamp_before filter
1077        let before_time: DateTime<Utc> = (base_time + Duration::from_secs(5400)).into(); // 1.5 hours after first
1078        let filter = RequestFilter {
1079            timestamp_before: Some(before_time),
1080            ..Default::default()
1081        };
1082        let results = repository.query(filter).await.unwrap();
1083        assert_eq!(results.len(), 2); // Should get 4001 and 4002
1084
1085        // Test timestamp range
1086        let filter = RequestFilter {
1087            timestamp_after: Some(after_time),
1088            timestamp_before: Some(before_time),
1089            ..Default::default()
1090        };
1091        let results = repository.query(filter).await.unwrap();
1092        assert_eq!(results.len(), 1); // Should get only 4002
1093        assert_eq!(results[0].request.correlation_id, 4002);
1094    }
1095
1096    // Note: Path filtering tests have been removed because path filtering
1097    // now happens at the outlet middleware layer, not in the PostgresHandler.
1098    // The handler now logs everything it receives, with filtering done upstream.
1099
1100    #[sqlx::test]
1101    async fn test_no_path_filtering_logs_everything(pool: PgPool) {
1102        // Run migrations first
1103        crate::migrator().run(&pool).await.unwrap();
1104
1105        // Handler without any path filtering
1106        let handler = PostgresHandler::<PgPool, Value, Value>::from_pool(pool.clone())
1107            .await
1108            .unwrap();
1109        let repository = handler.repository();
1110
1111        let test_uris = ["/api/users", "/health", "/metrics", "/random/path"];
1112        for (i, uri) in test_uris.iter().enumerate() {
1113            let correlation_id = 3000 + i as u64;
1114            let mut headers = HashMap::new();
1115            headers.insert("content-type".to_string(), vec!["application/json".into()]);
1116
1117            let request_data = RequestData {
1118                method: http::Method::GET,
1119                uri: uri.parse().unwrap(),
1120                headers,
1121                body: Some(Bytes::from(b"{}".to_vec())),
1122                timestamp: SystemTime::now(),
1123                correlation_id,
1124                trace_id: None,
1125                span_id: None,
1126            };
1127
1128            handler.handle_request(request_data).await;
1129        }
1130
1131        // Should have logged all 4 requests
1132        let filter = RequestFilter::default();
1133        let results = repository.query(filter).await.unwrap();
1134        assert_eq!(results.len(), 4);
1135    }
1136
1137    // Tests for read/write pool separation using TestDbPools
1138    #[sqlx::test]
1139    async fn test_write_operations_use_write_pool(pool: PgPool) {
1140        // Run migrations first
1141        crate::migrator().run(&pool).await.unwrap();
1142
1143        // Create TestDbPools which has a read-only replica
1144        let test_pools = crate::TestDbPools::new(pool).await.unwrap();
1145        let handler = PostgresHandler::<_, Value, Value>::from_pool_provider(test_pools.clone())
1146            .await
1147            .unwrap();
1148
1149        let mut request_data = create_test_request_data();
1150        let correlation_id = 5001;
1151        request_data.correlation_id = correlation_id;
1152
1153        // This should succeed because handle_request uses .write() which goes to primary
1154        handler.handle_request(request_data.clone()).await;
1155
1156        // Verify the write succeeded by reading from the primary pool
1157        let count: i64 =
1158            sqlx::query_scalar("SELECT COUNT(*) FROM http_requests WHERE correlation_id = $1")
1159                .bind(correlation_id as i64)
1160                .fetch_one(test_pools.write())
1161                .await
1162                .unwrap();
1163
1164        assert_eq!(count, 1, "Request should be written to primary pool");
1165    }
1166
1167    #[sqlx::test]
1168    async fn test_response_write_uses_write_pool(pool: PgPool) {
1169        // Run migrations first
1170        crate::migrator().run(&pool).await.unwrap();
1171
1172        let test_pools = crate::TestDbPools::new(pool).await.unwrap();
1173        let handler = PostgresHandler::<_, Value, Value>::from_pool_provider(test_pools.clone())
1174            .await
1175            .unwrap();
1176
1177        let mut request_data = create_test_request_data();
1178        let mut response_data = create_test_response_data();
1179        let correlation_id = 5002;
1180        request_data.correlation_id = correlation_id;
1181        response_data.correlation_id = correlation_id;
1182
1183        // Write request first
1184        handler.handle_request(request_data.clone()).await;
1185
1186        // Write response - should succeed because it uses .write()
1187        handler.handle_response(request_data, response_data).await;
1188
1189        // Verify both were written
1190        let count: i64 =
1191            sqlx::query_scalar("SELECT COUNT(*) FROM http_responses WHERE correlation_id = $1")
1192                .bind(correlation_id as i64)
1193                .fetch_one(test_pools.write())
1194                .await
1195                .unwrap();
1196
1197        assert_eq!(count, 1, "Response should be written to primary pool");
1198    }
1199
1200    #[sqlx::test]
1201    async fn test_repository_queries_use_read_pool(pool: PgPool) {
1202        // Run migrations first
1203        crate::migrator().run(&pool).await.unwrap();
1204
1205        let test_pools = crate::TestDbPools::new(pool).await.unwrap();
1206        let handler = PostgresHandler::<_, Value, Value>::from_pool_provider(test_pools.clone())
1207            .await
1208            .unwrap();
1209
1210        // Write some data using the handler (which uses write pool)
1211        let mut request_data = create_test_request_data();
1212        let correlation_id = 5003;
1213        request_data.correlation_id = correlation_id;
1214        handler.handle_request(request_data).await;
1215
1216        // Query using repository - should succeed because it uses .read()
1217        let repository = handler.repository();
1218        let filter = RequestFilter {
1219            correlation_id: Some(correlation_id as i64),
1220            ..Default::default()
1221        };
1222
1223        // This will succeed if repository.query() correctly uses .read()
1224        let results = repository.query(filter).await.unwrap();
1225        assert_eq!(results.len(), 1);
1226        assert_eq!(results[0].request.correlation_id, correlation_id as i64);
1227    }
1228
    #[sqlx::test]
    async fn test_replica_pool_rejects_writes(pool: PgPool) {
        // Run migrations first
        crate::migrator().run(&pool).await.unwrap();

        // TestDbPools pairs the primary with a read-only replica session.
        let test_pools = crate::TestDbPools::new(pool).await.unwrap();

        // Verify that the replica pool is actually read-only by attempting a
        // direct INSERT through the replica handle; PostgreSQL should reject
        // it rather than silently routing the write to the primary.
        let result = sqlx::query("INSERT INTO http_requests (instance_id, correlation_id, timestamp, method, uri, headers, body, body_parsed) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)")
            .bind(Uuid::new_v4())
            .bind(9999i64)
            .bind(Utc::now())
            .bind("GET")
            .bind("/test")
            .bind(serde_json::json!({}))
            .bind(None::<Value>)
            .bind(false)
            .execute(test_pools.read())
            .await;

        // Should fail with a read-only transaction error
        assert!(
            result.is_err(),
            "Replica pool should reject write operations"
        );

        // Check the message case-insensitively so the test fails loudly if
        // the error is something unrelated (e.g. a missing table).
        let err = result.unwrap_err();
        let err_msg = err.to_string().to_lowercase();
        assert!(
            err_msg.contains("read-only") || err_msg.contains("read only"),
            "Error should mention read-only: {}",
            err
        );
    }
1263
1264    #[sqlx::test]
1265    async fn test_full_request_response_cycle_with_read_write_separation(pool: PgPool) {
1266        // Run migrations first
1267        crate::migrator().run(&pool).await.unwrap();
1268
1269        let test_pools = crate::TestDbPools::new(pool).await.unwrap();
1270        let handler =
1271            PostgresHandler::<_, TestRequest, TestResponse>::from_pool_provider(test_pools)
1272                .await
1273                .unwrap();
1274
1275        let mut request_data = create_test_request_data();
1276        let mut response_data = create_test_response_data();
1277        let correlation_id = 5004;
1278        request_data.correlation_id = correlation_id;
1279        response_data.correlation_id = correlation_id;
1280
1281        // Write request and response (uses write pool)
1282        handler.handle_request(request_data.clone()).await;
1283        handler.handle_response(request_data, response_data).await;
1284
1285        // Query back using repository (uses read pool)
1286        let repository = handler.repository();
1287        let filter = RequestFilter {
1288            correlation_id: Some(correlation_id as i64),
1289            ..Default::default()
1290        };
1291
1292        let results = repository.query(filter).await.unwrap();
1293        assert_eq!(results.len(), 1);
1294
1295        // Verify request data
1296        let pair = &results[0];
1297        assert_eq!(pair.request.correlation_id, correlation_id as i64);
1298        assert_eq!(pair.request.method, "POST");
1299        assert_eq!(pair.request.uri, "/api/users");
1300
1301        // Verify response data
1302        let response = pair.response.as_ref().expect("Response should exist");
1303        assert_eq!(response.correlation_id, correlation_id as i64);
1304        assert_eq!(response.status_code, 201);
1305
1306        // Verify parsed bodies
1307        match &pair.request.body {
1308            Some(Ok(parsed_body)) => {
1309                assert_eq!(
1310                    *parsed_body,
1311                    TestRequest {
1312                        user_id: 123,
1313                        action: "create_user".to_string(),
1314                    }
1315                );
1316            }
1317            _ => panic!("Expected successfully parsed request body"),
1318        }
1319
1320        match &response.body {
1321            Some(Ok(parsed_body)) => {
1322                assert_eq!(
1323                    *parsed_body,
1324                    TestResponse {
1325                        success: true,
1326                        message: "User created successfully".to_string(),
1327                    }
1328                );
1329            }
1330            _ => panic!("Expected successfully parsed response body"),
1331        }
1332    }
1333}