Skip to main content

outlet_postgres/
lib.rs

1//! # outlet-postgres
2//!
3//! PostgreSQL logging handler for the outlet HTTP request/response middleware.
4//! This crate implements the `RequestHandler` trait from outlet to log HTTP
5//! requests and responses to PostgreSQL with JSONB serialization for bodies.
6//!
7//! ## Quick Start
8//!
9//! Basic usage:
10//!
11//! ```rust,no_run
12//! use outlet::{RequestLoggerLayer, RequestLoggerConfig};
13//! use outlet_postgres::PostgresHandler;
14//! use axum::{routing::get, Router};
15//! use tower::ServiceBuilder;
16//!
17//! async fn hello() -> &'static str {
18//!     "Hello, World!"
19//! }
20//!
21//! #[tokio::main]
22//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
23//!     let database_url = "postgresql://user:password@localhost/dbname";
24//!     let handler: PostgresHandler = PostgresHandler::new(database_url).await?;
25//!     let layer = RequestLoggerLayer::new(RequestLoggerConfig::default(), handler);
26//!
27//!     let app = Router::new()
28//!         .route("/hello", get(hello))
29//!         .layer(ServiceBuilder::new().layer(layer));
30//!
31//!     let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await?;
32//!     axum::serve(listener, app).await?;
33//!     Ok(())
34//! }
35//! ```
36//!
37//! ## Features
38//!
39//! - **PostgreSQL Integration**: Uses sqlx for async PostgreSQL operations
40//! - **JSONB Bodies**: Serializes request/response bodies to JSONB fields
41//! - **Type-safe Querying**: Query logged data with typed request/response bodies
42//! - **Correlation**: Links requests and responses via correlation IDs
43//! - **Error Handling**: Graceful error handling with logging
44//! - **Flexible Serialization**: Generic error handling for custom serializer types
45
/// Error type for serialization failures with fallback data.
///
/// When serializers fail to parse request/response bodies into structured types,
/// this error provides both the parsing error details and fallback data that
/// can be stored as a string representation.
#[derive(Debug)]
pub struct SerializationError {
    /// The fallback representation of the data (e.g., base64-encoded, raw string)
    pub fallback_data: String,
    /// The underlying error that caused serialization to fail
    pub error: Box<dyn std::error::Error + Send + Sync>,
}

impl SerializationError {
    /// Create a new serialization error with fallback data.
    ///
    /// # Arguments
    ///
    /// * `fallback_data` - String representation of the body to store in place of
    ///   the structured value.
    /// * `error` - The cause of the failure. Anything convertible into a boxed
    ///   error is accepted: a concrete `std::error::Error` type, a `String` or
    ///   `&str` message, or an already boxed `Box<dyn Error + Send + Sync>`.
    pub fn new(
        fallback_data: String,
        error: impl Into<Box<dyn std::error::Error + Send + Sync>>,
    ) -> Self {
        Self {
            fallback_data,
            // The standard library's `From` impls cover concrete errors as well as
            // plain messages, so custom serializers need no wrapper error type.
            error: error.into(),
        }
    }
}

impl std::fmt::Display for SerializationError {
    /// Writes `Serialization failed: <underlying error>`; the fallback data is
    /// deliberately not part of the message.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "Serialization failed: {}", self.error)
    }
}

impl std::error::Error for SerializationError {
    /// Exposes the underlying error as the source of this one.
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        Some(self.error.as_ref())
    }
}
83
84use chrono::{DateTime, Utc};
85use metrics::{counter, histogram};
86use outlet::{RequestData, RequestHandler, ResponseData};
87use serde::{Deserialize, Serialize};
88use serde_json::Value;
89use sqlx::PgPool;
90use std::collections::HashMap;
91use std::sync::Arc;
92use std::time::{Instant, SystemTime};
93use tracing::{debug, error, instrument, warn};
94use uuid::Uuid;
95
96pub mod error;
97pub mod repository;
98pub use error::PostgresHandlerError;
99pub use repository::{
100    HttpRequest, HttpResponse, RequestFilter, RequestRepository, RequestResponsePair,
101};
102
103// Re-export from sqlx-pool-router
104pub use sqlx_pool_router::{DbPools, PoolProvider, TestDbPools};
105
/// Get the migrator for running outlet-postgres database migrations.
///
/// This returns a SQLx migrator that can be used to set up the required
/// `http_requests` and `http_responses` tables. The consuming application
/// is responsible for running these migrations at the appropriate time
/// and in the appropriate database schema.
///
/// The migrator is built by `sqlx::migrate!` from this crate's `./migrations`
/// directory; none of the handler constructors in this crate run it for you.
///
/// # Examples
///
/// ```rust,no_run
/// use outlet_postgres::migrator;
/// use sqlx::PgPool;
///
/// #[tokio::main]
/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
///     let pool = PgPool::connect("postgresql://user:pass@localhost/db").await?;
///
///     // Run outlet migrations
///     migrator().run(&pool).await?;
///
///     Ok(())
/// }
/// ```
pub fn migrator() -> sqlx::migrate::Migrator {
    sqlx::migrate!("./migrations")
}
132
/// Type alias for request body serializers.
///
/// Request serializers take full request context including headers and body bytes.
/// On failure, they return a `SerializationError` with fallback data.
///
/// The closure is stored behind an `Arc` and must be `Send + Sync`, so one
/// serializer can be shared by every clone of a handler.
type RequestSerializer<T> =
    Arc<dyn Fn(&outlet::RequestData) -> Result<T, SerializationError> + Send + Sync>;
139
/// Type alias for response body serializers.
///
/// Response serializers take both request and response context, allowing them to
/// make parsing decisions based on request details and response headers (e.g., compression).
/// On failure, they return a `SerializationError` with fallback data.
///
/// The closure is stored behind an `Arc` and must be `Send + Sync`, so one
/// serializer can be shared by every clone of a handler.
type ResponseSerializer<T> = Arc<
    dyn Fn(&outlet::RequestData, &outlet::ResponseData) -> Result<T, SerializationError>
        + Send
        + Sync,
>;
150
151/// PostgreSQL handler for outlet middleware.
152///
153/// Implements the `RequestHandler` trait to log HTTP requests and responses
154/// to PostgreSQL. Request and response bodies are serialized to JSONB fields.
155///
156/// Generic over:
157/// - `P`: Pool provider implementing `PoolProvider` trait for read/write routing
158/// - `TReq` and `TRes`: Request and response body types for JSONB serialization
159///
160/// Use `serde_json::Value` for flexible JSON storage, or custom structs for typed storage.
161#[derive(Clone)]
162pub struct PostgresHandler<P = PgPool, TReq = Value, TRes = Value>
163where
164    P: PoolProvider,
165    TReq: for<'de> Deserialize<'de> + Serialize + Send + Sync + 'static,
166    TRes: for<'de> Deserialize<'de> + Serialize + Send + Sync + 'static,
167{
168    pool: P,
169    request_serializer: RequestSerializer<TReq>,
170    response_serializer: ResponseSerializer<TRes>,
171    instance_id: Uuid,
172}
173
174impl<P, TReq, TRes> PostgresHandler<P, TReq, TRes>
175where
176    P: PoolProvider,
177    TReq: for<'de> Deserialize<'de> + Serialize + Send + Sync + 'static,
178    TRes: for<'de> Deserialize<'de> + Serialize + Send + Sync + 'static,
179{
180    /// Default serializer that attempts serde JSON deserialization.
181    /// On failure, returns a SerializationError with raw bytes as fallback data.
182    fn default_request_serializer() -> RequestSerializer<TReq> {
183        Arc::new(|request_data| {
184            let bytes = request_data.body.as_deref().unwrap_or(&[]);
185            serde_json::from_slice::<TReq>(bytes).map_err(|error| {
186                let fallback_data = String::from_utf8_lossy(bytes).to_string();
187                SerializationError::new(fallback_data, error)
188            })
189        })
190    }
191
192    /// Default serializer that attempts serde JSON deserialization.
193    /// On failure, returns a SerializationError with raw bytes as fallback data.
194    fn default_response_serializer() -> ResponseSerializer<TRes> {
195        Arc::new(|_request_data, response_data| {
196            let bytes = response_data.body.as_deref().unwrap_or(&[]);
197            serde_json::from_slice::<TRes>(bytes).map_err(|error| {
198                let fallback_data = String::from_utf8_lossy(bytes).to_string();
199                SerializationError::new(fallback_data, error)
200            })
201        })
202    }
203
204    /// Add a custom request body serializer.
205    ///
206    /// The serializer function takes raw bytes and should return a `Result<TReq, String>`.
207    /// If the serializer succeeds, the result will be stored as JSONB and `body_parsed` will be true.
208    /// If it fails, the raw content will be stored as a UTF-8 string and `body_parsed` will be false.
209    ///
210    /// # Panics
211    ///
212    /// This will panic if the serializer succeeds but the resulting `TReq` value cannot be
213    /// converted to JSON via `serde_json::to_value()`. This indicates a bug in the `Serialize`
214    /// implementation of `TReq` and should be fixed by the caller.
215    pub fn with_request_serializer<F>(mut self, serializer: F) -> Self
216    where
217        F: Fn(&outlet::RequestData) -> Result<TReq, SerializationError> + Send + Sync + 'static,
218    {
219        self.request_serializer = Arc::new(serializer);
220        self
221    }
222
223    /// Add a custom response body serializer.
224    ///
225    /// The serializer function takes raw bytes and should return a `Result<TRes, String>`.
226    /// If the serializer succeeds, the result will be stored as JSONB and `body_parsed` will be true.
227    /// If it fails, the raw content will be stored as a UTF-8 string and `body_parsed` will be false.
228    ///
229    /// # Panics
230    ///
231    /// This will panic if the serializer succeeds but the resulting `TRes` value cannot be
232    /// converted to JSON via `serde_json::to_value()`. This indicates a bug in the `Serialize`
233    /// implementation of `TRes` and should be fixed by the caller.
234    pub fn with_response_serializer<F>(mut self, serializer: F) -> Self
235    where
236        F: Fn(&outlet::RequestData, &outlet::ResponseData) -> Result<TRes, SerializationError>
237            + Send
238            + Sync
239            + 'static,
240    {
241        self.response_serializer = Arc::new(serializer);
242        self
243    }
244
245    /// Create a PostgreSQL handler from a pool provider.
246    ///
247    /// Use this if you want to use a custom pool provider implementation
248    /// (such as `DbPools` for read/write separation).
249    /// This will NOT run migrations - use `migrator()` to run migrations separately.
250    ///
251    /// # Arguments
252    ///
253    /// * `pool_provider` - Pool provider implementing `PoolProvider` trait
254    ///
255    /// # Examples
256    ///
257    /// ```rust,no_run
258    /// use outlet_postgres::{PostgresHandler, DbPools, migrator};
259    /// use sqlx::postgres::PgPoolOptions;
260    /// use serde::{Deserialize, Serialize};
261    ///
262    /// #[derive(Deserialize, Serialize)]
263    /// struct MyBodyType {
264    ///     id: u64,
265    ///     name: String,
266    /// }
267    ///
268    /// #[tokio::main]
269    /// async fn main() -> Result<(), Box<dyn std::error::Error>> {
270    ///     let primary = PgPoolOptions::new()
271    ///         .connect("postgresql://user:pass@primary/db").await?;
272    ///     let replica = PgPoolOptions::new()
273    ///         .connect("postgresql://user:pass@replica/db").await?;
274    ///
275    ///     // Run migrations on primary
276    ///     migrator().run(&primary).await?;
277    ///
278    ///     // Create handler with read/write separation
279    ///     let pools = DbPools::with_replica(primary, replica);
280    ///     let handler = PostgresHandler::<_, MyBodyType, MyBodyType>::from_pool_provider(pools).await?;
281    ///     Ok(())
282    /// }
283    /// ```
284    pub async fn from_pool_provider(pool_provider: P) -> Result<Self, PostgresHandlerError> {
285        Ok(Self {
286            pool: pool_provider,
287            request_serializer: Self::default_request_serializer(),
288            response_serializer: Self::default_response_serializer(),
289            instance_id: Uuid::new_v4(),
290        })
291    }
292
293    /// Convert headers to a JSONB-compatible format.
294    fn headers_to_json(headers: &HashMap<String, Vec<bytes::Bytes>>) -> Value {
295        let mut header_map = HashMap::new();
296        for (name, values) in headers {
297            if values.len() == 1 {
298                let value_str = String::from_utf8_lossy(&values[0]).to_string();
299                header_map.insert(name.clone(), Value::String(value_str));
300            } else {
301                let value_array: Vec<Value> = values
302                    .iter()
303                    .map(|v| Value::String(String::from_utf8_lossy(v).to_string()))
304                    .collect();
305                header_map.insert(name.clone(), Value::Array(value_array));
306            }
307        }
308        serde_json::to_value(header_map).unwrap_or(Value::Null)
309    }
310
311    /// Convert request data to a JSONB value using the configured serializer.
312    fn request_body_to_json_with_fallback(
313        &self,
314        request_data: &outlet::RequestData,
315    ) -> (Value, bool) {
316        match (self.request_serializer)(request_data) {
317            Ok(typed_value) => {
318                if let Ok(json_value) = serde_json::to_value(&typed_value) {
319                    (json_value, true)
320                } else {
321                    // This should never happen if the type implements Serialize correctly
322                    (
323                        Value::String(
324                            serde_json::to_string(&typed_value)
325                                .expect("Serialized value must be convertible to JSON string"),
326                        ),
327                        false,
328                    )
329                }
330            }
331            Err(serialization_error) => (Value::String(serialization_error.fallback_data), false),
332        }
333    }
334
335    /// Convert response data to a JSONB value using the configured serializer.
336    fn response_body_to_json_with_fallback(
337        &self,
338        request_data: &outlet::RequestData,
339        response_data: &outlet::ResponseData,
340    ) -> (Value, bool) {
341        match (self.response_serializer)(request_data, response_data) {
342            Ok(typed_value) => {
343                if let Ok(json_value) = serde_json::to_value(&typed_value) {
344                    (json_value, true)
345                } else {
346                    // This should never happen if the type implements Serialize correctly
347                    (
348                        Value::String(
349                            serde_json::to_string(&typed_value)
350                                .expect("Serialized value must be convertible to JSON string"),
351                        ),
352                        false,
353                    )
354                }
355            }
356            Err(serialization_error) => (Value::String(serialization_error.fallback_data), false),
357        }
358    }
359
360    /// Get a repository for querying logged requests and responses.
361    ///
362    /// Returns a `RequestRepository` with the same type parameters as this handler,
363    /// allowing for type-safe querying of request and response bodies.
364    pub fn repository(&self) -> crate::repository::RequestRepository<P, TReq, TRes> {
365        crate::repository::RequestRepository::new(self.pool.clone())
366    }
367}
368
369// Backward-compatible constructors for PgPool
370impl<TReq, TRes> PostgresHandler<PgPool, TReq, TRes>
371where
372    TReq: for<'de> Deserialize<'de> + Serialize + Send + Sync + 'static,
373    TRes: for<'de> Deserialize<'de> + Serialize + Send + Sync + 'static,
374{
375    /// Create a new PostgreSQL handler with a connection pool.
376    ///
377    /// This will connect to the database but will NOT run migrations.
378    /// Use `migrator()` to get a migrator and run migrations separately.
379    ///
380    /// # Arguments
381    ///
382    /// * `database_url` - PostgreSQL connection string
383    ///
384    /// # Examples
385    ///
386    /// ```rust,no_run
387    /// use outlet_postgres::{PostgresHandler, migrator};
388    /// use serde::{Deserialize, Serialize};
389    ///
390    /// #[derive(Deserialize, Serialize)]
391    /// struct MyBodyType {
392    ///     id: u64,
393    ///     name: String,
394    /// }
395    ///
396    /// #[tokio::main]
397    /// async fn main() -> Result<(), Box<dyn std::error::Error>> {
398    ///     // Run migrations first
399    ///     let pool = sqlx::PgPool::connect("postgresql://user:pass@localhost/db").await?;
400    ///     migrator().run(&pool).await?;
401    ///
402    ///     // Create handler
403    ///     let handler = PostgresHandler::<_, MyBodyType, MyBodyType>::new("postgresql://user:pass@localhost/db").await?;
404    ///     Ok(())
405    /// }
406    /// ```
407    pub async fn new(database_url: &str) -> Result<Self, PostgresHandlerError> {
408        let pool = PgPool::connect(database_url)
409            .await
410            .map_err(PostgresHandlerError::Connection)?;
411
412        Ok(Self {
413            pool,
414            request_serializer: Self::default_request_serializer(),
415            response_serializer: Self::default_response_serializer(),
416            instance_id: Uuid::new_v4(),
417        })
418    }
419
    /// Create a PostgreSQL handler from an existing connection pool.
    ///
    /// Use this if you already have a connection pool and want to reuse it.
    /// This will NOT run migrations - use `migrator()` to run migrations separately.
    ///
    /// This is a thin wrapper that forwards the pool to `from_pool_provider`.
    ///
    /// # Arguments
    ///
    /// * `pool` - Existing PostgreSQL connection pool
    ///
    /// # Errors
    ///
    /// Returns whatever `from_pool_provider` returns for the given pool.
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// use outlet_postgres::{PostgresHandler, migrator};
    /// use sqlx::PgPool;
    /// use serde::{Deserialize, Serialize};
    ///
    /// #[derive(Deserialize, Serialize)]
    /// struct MyBodyType {
    ///     id: u64,
    ///     name: String,
    /// }
    ///
    /// #[tokio::main]
    /// async fn main() -> Result<(), Box<dyn std::error::Error>> {
    ///     let pool = PgPool::connect("postgresql://user:pass@localhost/db").await?;
    ///
    ///     // Run migrations first
    ///     migrator().run(&pool).await?;
    ///
    ///     // Create handler
    ///     let handler = PostgresHandler::<_, MyBodyType, MyBodyType>::from_pool(pool).await?;
    ///     Ok(())
    /// }
    /// ```
    pub async fn from_pool(pool: PgPool) -> Result<Self, PostgresHandlerError> {
        Self::from_pool_provider(pool).await
    }
457}
458
459impl<P, TReq, TRes> RequestHandler for PostgresHandler<P, TReq, TRes>
460where
461    P: PoolProvider,
462    TReq: for<'de> Deserialize<'de> + Serialize + Send + Sync + 'static,
463    TRes: for<'de> Deserialize<'de> + Serialize + Send + Sync + 'static,
464{
465    #[instrument(name = "outlet.handle_request", skip(self, data), fields(correlation_id = %data.correlation_id))]
466    async fn handle_request(&self, data: RequestData) {
467        let headers_json = Self::headers_to_json(&data.headers);
468        let (body_json, parsed) = if data.body.is_some() {
469            let (json, parsed) = self.request_body_to_json_with_fallback(&data);
470            (Some(json), parsed)
471        } else {
472            (None, false)
473        };
474
475        let timestamp: DateTime<Utc> = data.timestamp.into();
476
477        let query_start = Instant::now();
478        let result = sqlx::query(
479            r#"
480            INSERT INTO http_requests (instance_id, correlation_id, timestamp, method, uri, headers, body, body_parsed, trace_id, span_id)
481            VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
482            "#,
483        )
484        .bind(self.instance_id)
485        .bind(data.correlation_id as i64)
486        .bind(timestamp)
487        .bind(data.method.to_string())
488        .bind(data.uri.to_string())
489        .bind(headers_json)
490        .bind(body_json)
491        .bind(parsed)
492        .bind(&data.trace_id)
493        .bind(&data.span_id)
494        .execute(self.pool.write())
495        .await;
496        let query_duration = query_start.elapsed();
497        histogram!("outlet_write_duration_seconds", "operation" => "request")
498            .record(query_duration.as_secs_f64());
499
500        if let Err(e) = result {
501            counter!("outlet_write_errors_total", "operation" => "request").increment(1);
502            error!(correlation_id = %data.correlation_id, error = %e, "Failed to insert request data");
503        } else {
504            let processing_lag_ms = SystemTime::now()
505                .duration_since(data.timestamp)
506                .unwrap_or_default()
507                .as_millis();
508            if processing_lag_ms > 1000 {
509                warn!(correlation_id = %data.correlation_id, method = %data.method, uri = %data.uri, lag_ms = %processing_lag_ms, "Request logged (slow)");
510            } else {
511                debug!(correlation_id = %data.correlation_id, method = %data.method, uri = %data.uri, lag_ms = %processing_lag_ms, "Request logged");
512            }
513        }
514    }
515
516    #[instrument(name = "outlet.handle_response", skip(self, request_data, response_data), fields(correlation_id = %request_data.correlation_id))]
517    async fn handle_response(&self, request_data: RequestData, response_data: ResponseData) {
518        let headers_json = Self::headers_to_json(&response_data.headers);
519        let (body_json, parsed) = if response_data.body.is_some() {
520            let (json, parsed) =
521                self.response_body_to_json_with_fallback(&request_data, &response_data);
522            (Some(json), parsed)
523        } else {
524            (None, false)
525        };
526
527        let timestamp: DateTime<Utc> = response_data.timestamp.into();
528        let duration_ms = response_data.duration.as_millis() as i64;
529        let duration_to_first_byte_ms = response_data.duration_to_first_byte.as_millis() as i64;
530
531        let query_start = Instant::now();
532        let result = sqlx::query(
533            r#"
534            INSERT INTO http_responses (instance_id, correlation_id, timestamp, status_code, headers, body, body_parsed, duration_to_first_byte_ms, duration_ms)
535            SELECT $1, $2, $3, $4, $5, $6, $7, $8, $9
536            WHERE EXISTS (SELECT 1 FROM http_requests WHERE instance_id = $1 AND correlation_id = $2)
537            "#,
538        )
539        .bind(self.instance_id)
540        .bind(request_data.correlation_id as i64)
541        .bind(timestamp)
542        .bind(response_data.status.as_u16() as i32)
543        .bind(headers_json)
544        .bind(body_json)
545        .bind(parsed)
546        .bind(duration_to_first_byte_ms)
547        .bind(duration_ms)
548        .execute(self.pool.write())
549        .await;
550        let query_duration = query_start.elapsed();
551        histogram!("outlet_write_duration_seconds", "operation" => "response")
552            .record(query_duration.as_secs_f64());
553
554        match result {
555            Err(e) => {
556                counter!("outlet_write_errors_total", "operation" => "response").increment(1);
557                error!(correlation_id = %request_data.correlation_id, error = %e, "Failed to insert response data");
558            }
559            Ok(query_result) => {
560                if query_result.rows_affected() > 0 {
561                    let processing_lag_ms = SystemTime::now()
562                        .duration_since(response_data.timestamp)
563                        .unwrap_or_default()
564                        .as_millis();
565                    if processing_lag_ms > 1000 {
566                        warn!(correlation_id = %request_data.correlation_id, status = %response_data.status, duration_ms = %duration_ms, lag_ms = %processing_lag_ms, "Response logged (slow)");
567                    } else {
568                        debug!(correlation_id = %request_data.correlation_id, status = %response_data.status, duration_ms = %duration_ms, lag_ms = %processing_lag_ms, "Response logged");
569                    }
570                } else {
571                    debug!(correlation_id = %request_data.correlation_id, "No matching request found for response, skipping insert")
572                }
573            }
574        }
575    }
576
577    #[instrument(name = "outlet.handle_request_batch", skip(self, batch), fields(batch_size = batch.len()))]
578    async fn handle_request_batch(&self, batch: &[RequestData]) {
579        if batch.is_empty() {
580            return;
581        }
582
583        let len = batch.len();
584        let mut instance_ids = Vec::with_capacity(len);
585        let mut correlation_ids = Vec::with_capacity(len);
586        let mut timestamps = Vec::with_capacity(len);
587        let mut methods = Vec::with_capacity(len);
588        let mut uris = Vec::with_capacity(len);
589        let mut headers_col: Vec<Value> = Vec::with_capacity(len);
590        let mut bodies: Vec<Option<Value>> = Vec::with_capacity(len);
591        let mut body_parsed_col = Vec::with_capacity(len);
592        let mut trace_ids: Vec<Option<String>> = Vec::with_capacity(len);
593        let mut span_ids: Vec<Option<String>> = Vec::with_capacity(len);
594
595        for data in batch {
596            instance_ids.push(self.instance_id);
597            correlation_ids.push(data.correlation_id as i64);
598            timestamps.push(DateTime::<Utc>::from(data.timestamp));
599            methods.push(data.method.to_string());
600            uris.push(data.uri.to_string());
601            headers_col.push(Self::headers_to_json(&data.headers));
602
603            let (body_json, parsed) = if data.body.is_some() {
604                let (json, parsed) = self.request_body_to_json_with_fallback(data);
605                (Some(json), parsed)
606            } else {
607                (None, false)
608            };
609            bodies.push(body_json);
610            body_parsed_col.push(parsed);
611            trace_ids.push(data.trace_id.clone());
612            span_ids.push(data.span_id.clone());
613        }
614
615        let query_start = Instant::now();
616        let result = sqlx::query(
617            r#"
618            INSERT INTO http_requests (instance_id, correlation_id, timestamp, method, uri, headers, body, body_parsed, trace_id, span_id)
619            SELECT * FROM UNNEST($1::uuid[], $2::bigint[], $3::timestamptz[], $4::varchar[], $5::text[], $6::jsonb[], $7::jsonb[], $8::boolean[], $9::varchar[], $10::varchar[])
620            "#,
621        )
622        .bind(&instance_ids)
623        .bind(&correlation_ids)
624        .bind(&timestamps)
625        .bind(&methods)
626        .bind(&uris)
627        .bind(&headers_col)
628        .bind(&bodies)
629        .bind(&body_parsed_col)
630        .bind(&trace_ids)
631        .bind(&span_ids)
632        .execute(self.pool.write())
633        .await;
634        let query_duration = query_start.elapsed();
635        histogram!("outlet_write_duration_seconds", "operation" => "request_batch")
636            .record(query_duration.as_secs_f64());
637
638        match result {
639            Ok(r) => {
640                debug!(
641                    rows = r.rows_affected(),
642                    duration_ms = query_duration.as_millis() as u64,
643                    "Request batch inserted"
644                );
645            }
646            Err(e) => {
647                counter!("outlet_write_errors_total", "operation" => "request_batch").increment(1);
648                error!(batch_size = len, error = %e, "Failed to bulk insert request batch");
649            }
650        }
651    }
652
653    #[instrument(name = "outlet.handle_response_batch", skip(self, batch), fields(batch_size = batch.len()))]
654    async fn handle_response_batch(&self, batch: &[(RequestData, ResponseData)]) {
655        if batch.is_empty() {
656            return;
657        }
658
659        let len = batch.len();
660        let mut instance_ids = Vec::with_capacity(len);
661        let mut correlation_ids = Vec::with_capacity(len);
662        let mut timestamps = Vec::with_capacity(len);
663        let mut status_codes = Vec::with_capacity(len);
664        let mut headers_col: Vec<Value> = Vec::with_capacity(len);
665        let mut bodies: Vec<Option<Value>> = Vec::with_capacity(len);
666        let mut body_parsed_col = Vec::with_capacity(len);
667        let mut duration_to_first_byte_ms_col = Vec::with_capacity(len);
668        let mut duration_ms_col = Vec::with_capacity(len);
669
670        for (request_data, response_data) in batch {
671            instance_ids.push(self.instance_id);
672            correlation_ids.push(request_data.correlation_id as i64);
673            timestamps.push(DateTime::<Utc>::from(response_data.timestamp));
674            status_codes.push(response_data.status.as_u16() as i32);
675            headers_col.push(Self::headers_to_json(&response_data.headers));
676
677            let (body_json, parsed) = if response_data.body.is_some() {
678                let (json, parsed) =
679                    self.response_body_to_json_with_fallback(request_data, response_data);
680                (Some(json), parsed)
681            } else {
682                (None, false)
683            };
684            bodies.push(body_json);
685            body_parsed_col.push(parsed);
686            duration_to_first_byte_ms_col
687                .push(response_data.duration_to_first_byte.as_millis() as i64);
688            duration_ms_col.push(response_data.duration.as_millis() as i64);
689        }
690
691        let query_start = Instant::now();
692        let result = sqlx::query(
693            r#"
694            INSERT INTO http_responses (instance_id, correlation_id, timestamp, status_code, headers, body, body_parsed, duration_to_first_byte_ms, duration_ms)
695            SELECT * FROM UNNEST($1::uuid[], $2::bigint[], $3::timestamptz[], $4::int[], $5::jsonb[], $6::jsonb[], $7::boolean[], $8::bigint[], $9::bigint[])
696            "#,
697        )
698        .bind(&instance_ids)
699        .bind(&correlation_ids)
700        .bind(&timestamps)
701        .bind(&status_codes)
702        .bind(&headers_col)
703        .bind(&bodies)
704        .bind(&body_parsed_col)
705        .bind(&duration_to_first_byte_ms_col)
706        .bind(&duration_ms_col)
707        .execute(self.pool.write())
708        .await;
709        let query_duration = query_start.elapsed();
710        histogram!("outlet_write_duration_seconds", "operation" => "response_batch")
711            .record(query_duration.as_secs_f64());
712
713        match result {
714            Ok(r) => {
715                debug!(
716                    rows = r.rows_affected(),
717                    duration_ms = query_duration.as_millis() as u64,
718                    "Response batch inserted"
719                );
720            }
721            Err(e) => {
722                counter!("outlet_write_errors_total", "operation" => "response_batch").increment(1);
723                error!(batch_size = len, error = %e, "Failed to bulk insert response batch");
724            }
725        }
726    }
727}
728
729#[cfg(test)]
730mod tests {
731    use super::*;
732    use bytes::Bytes;
733    use chrono::{DateTime, Utc};
734    use outlet::{RequestData, ResponseData};
735    use serde::{Deserialize, Serialize};
736    use serde_json::Value;
737    use sqlx::PgPool;
738    use std::collections::HashMap;
739    use std::time::{Duration, SystemTime};
740
    /// Typed request body used as `TReq` by the tests in this module.
    #[derive(Debug, Clone, PartialEq, Deserialize, Serialize)]
    struct TestRequest {
        user_id: u64,
        action: String,
    }
746
    /// Typed response body used as `TRes` by the tests in this module.
    #[derive(Debug, Clone, PartialEq, Deserialize, Serialize)]
    struct TestResponse {
        success: bool,
        message: String,
    }
752
753    fn create_test_request_data() -> RequestData {
754        let mut headers = HashMap::new();
755        headers.insert("content-type".to_string(), vec!["application/json".into()]);
756        headers.insert("user-agent".to_string(), vec!["test-client/1.0".into()]);
757
758        let test_req = TestRequest {
759            user_id: 123,
760            action: "create_user".to_string(),
761        };
762        let body = serde_json::to_vec(&test_req).unwrap();
763
764        RequestData {
765            method: http::Method::POST,
766            uri: http::Uri::from_static("/api/users"),
767            headers,
768            body: Some(Bytes::from(body)),
769            timestamp: SystemTime::now(),
770            correlation_id: 0,
771            trace_id: None,
772            span_id: None,
773        }
774    }
775
776    fn create_test_response_data() -> ResponseData {
777        let mut headers = HashMap::new();
778        headers.insert("content-type".to_string(), vec!["application/json".into()]);
779
780        let test_res = TestResponse {
781            success: true,
782            message: "User created successfully".to_string(),
783        };
784        let body = serde_json::to_vec(&test_res).unwrap();
785
786        ResponseData {
787            status: http::StatusCode::CREATED,
788            headers,
789            body: Some(Bytes::from(body)),
790            timestamp: SystemTime::now(),
791            duration_to_first_byte: Duration::from_millis(100),
792            duration: Duration::from_millis(150),
793            correlation_id: 0,
794        }
795    }
796
797    #[sqlx::test]
798    async fn test_handler_creation(pool: PgPool) {
799        // Run migrations first
800        crate::migrator().run(&pool).await.unwrap();
801
802        let handler = PostgresHandler::<PgPool, TestRequest, TestResponse>::from_pool(pool.clone())
803            .await
804            .unwrap();
805
806        // Verify we can get a repository
807        let repository = handler.repository();
808
809        // Test initial state - no requests logged yet
810        let filter = RequestFilter::default();
811        let results = repository.query(filter).await.unwrap();
812        assert!(results.is_empty());
813    }
814
815    #[sqlx::test]
816    async fn test_handle_request_with_typed_body(pool: PgPool) {
817        // Run migrations first
818        crate::migrator().run(&pool).await.unwrap();
819
820        let handler = PostgresHandler::<PgPool, TestRequest, TestResponse>::from_pool(pool.clone())
821            .await
822            .unwrap();
823        let repository = handler.repository();
824
825        let mut request_data = create_test_request_data();
826        let correlation_id = 12345;
827        request_data.correlation_id = correlation_id;
828
829        // Handle the request
830        handler.handle_request(request_data.clone()).await;
831
832        // Query back the request
833        let filter = RequestFilter {
834            correlation_id: Some(correlation_id as i64),
835            ..Default::default()
836        };
837        let results = repository.query(filter).await.unwrap();
838
839        assert_eq!(results.len(), 1);
840        let pair = &results[0];
841
842        assert_eq!(pair.request.correlation_id, correlation_id as i64);
843        assert_eq!(pair.request.method, "POST");
844        assert_eq!(pair.request.uri, "/api/users");
845
846        // Check that body was parsed successfully
847        match &pair.request.body {
848            Some(Ok(parsed_body)) => {
849                assert_eq!(
850                    *parsed_body,
851                    TestRequest {
852                        user_id: 123,
853                        action: "create_user".to_string(),
854                    }
855                );
856            }
857            _ => panic!("Expected successfully parsed request body"),
858        }
859
860        // Headers should be converted to JSON properly
861        let headers_value = &pair.request.headers;
862        assert!(headers_value.get("content-type").is_some());
863        assert!(headers_value.get("user-agent").is_some());
864
865        // No response yet
866        assert!(pair.response.is_none());
867    }
868
869    #[sqlx::test]
870    async fn test_handle_response_with_typed_body(pool: PgPool) {
871        // Run migrations first
872        crate::migrator().run(&pool).await.unwrap();
873
874        let handler = PostgresHandler::<PgPool, TestRequest, TestResponse>::from_pool(pool.clone())
875            .await
876            .unwrap();
877        let repository = handler.repository();
878
879        let mut request_data = create_test_request_data();
880        let mut response_data = create_test_response_data();
881        let correlation_id = 54321;
882        request_data.correlation_id = correlation_id;
883        response_data.correlation_id = correlation_id;
884
885        // Handle both request and response
886        handler.handle_request(request_data.clone()).await;
887        handler
888            .handle_response(request_data, response_data.clone())
889            .await;
890
891        // Query back the complete pair
892        let filter = RequestFilter {
893            correlation_id: Some(correlation_id as i64),
894            ..Default::default()
895        };
896        let results = repository.query(filter).await.unwrap();
897
898        assert_eq!(results.len(), 1);
899        let pair = &results[0];
900
901        // Check response data
902        let response = pair.response.as_ref().expect("Response should be present");
903        assert_eq!(response.correlation_id, correlation_id as i64);
904        assert_eq!(response.status_code, 201);
905        assert_eq!(response.duration_ms, 150);
906
907        // Check that response body was parsed successfully
908        match &response.body {
909            Some(Ok(parsed_body)) => {
910                assert_eq!(
911                    *parsed_body,
912                    TestResponse {
913                        success: true,
914                        message: "User created successfully".to_string(),
915                    }
916                );
917            }
918            _ => panic!("Expected successfully parsed response body"),
919        }
920    }
921
922    #[sqlx::test]
923    async fn test_handle_unparseable_body_fallback(pool: PgPool) {
924        // Run migrations first
925        crate::migrator().run(&pool).await.unwrap();
926
927        let handler = PostgresHandler::<PgPool, TestRequest, TestResponse>::from_pool(pool.clone())
928            .await
929            .unwrap();
930        let repository = handler.repository();
931
932        // Create request with invalid JSON for TestRequest
933        let mut headers = HashMap::new();
934        headers.insert("content-type".to_string(), vec!["text/plain".into()]);
935
936        let invalid_json_body = b"not valid json for TestRequest";
937        let correlation_id = 99999;
938        let request_data = RequestData {
939            method: http::Method::POST,
940            uri: http::Uri::from_static("/api/test"),
941            headers,
942            body: Some(Bytes::from(invalid_json_body.to_vec())),
943            timestamp: SystemTime::now(),
944            correlation_id,
945            trace_id: None,
946            span_id: None,
947        };
948
949        handler.handle_request(request_data).await;
950
951        // Query back and verify fallback to base64
952        let filter = RequestFilter {
953            correlation_id: Some(correlation_id as i64),
954            ..Default::default()
955        };
956        let results = repository.query(filter).await.unwrap();
957
958        assert_eq!(results.len(), 1);
959        let pair = &results[0];
960
961        // Should fallback to raw bytes
962        match &pair.request.body {
963            Some(Err(raw_bytes)) => {
964                assert_eq!(raw_bytes.as_ref(), invalid_json_body);
965            }
966            _ => panic!("Expected raw bytes fallback for unparseable body"),
967        }
968    }
969
970    #[sqlx::test]
971    async fn test_query_with_multiple_filters(pool: PgPool) {
972        // Run migrations first
973        crate::migrator().run(&pool).await.unwrap();
974
975        let handler = PostgresHandler::<PgPool, Value, Value>::from_pool(pool.clone())
976            .await
977            .unwrap();
978        let repository = handler.repository();
979
980        // Insert multiple requests with different characteristics
981        let test_cases = vec![
982            (1001, "GET", "/api/users", 200, 100),
983            (1002, "POST", "/api/users", 201, 150),
984            (1003, "GET", "/api/orders", 404, 50),
985            (1004, "PUT", "/api/users/123", 200, 300),
986        ];
987
988        for (correlation_id, method, uri, status, duration_ms) in test_cases {
989            let mut headers = HashMap::new();
990            headers.insert("content-type".to_string(), vec!["application/json".into()]);
991
992            let request_data = RequestData {
993                method: method.parse().unwrap(),
994                uri: uri.parse().unwrap(),
995                headers: headers.clone(),
996                body: Some(Bytes::from(b"{}".to_vec())),
997                timestamp: SystemTime::now(),
998                correlation_id,
999                trace_id: None,
1000                span_id: None,
1001            };
1002
1003            let response_data = ResponseData {
1004                correlation_id,
1005                status: http::StatusCode::from_u16(status).unwrap(),
1006                headers,
1007                body: Some(Bytes::from(b"{}".to_vec())),
1008                timestamp: SystemTime::now(),
1009                duration_to_first_byte: Duration::from_millis(duration_ms / 2),
1010                duration: Duration::from_millis(duration_ms),
1011            };
1012
1013            handler.handle_request(request_data.clone()).await;
1014            handler.handle_response(request_data, response_data).await;
1015        }
1016
1017        // Test method filter
1018        let filter = RequestFilter {
1019            method: Some("GET".to_string()),
1020            ..Default::default()
1021        };
1022        let results = repository.query(filter).await.unwrap();
1023        assert_eq!(results.len(), 2); // 1001, 1003
1024
1025        // Test status code filter
1026        let filter = RequestFilter {
1027            status_code: Some(200),
1028            ..Default::default()
1029        };
1030        let results = repository.query(filter).await.unwrap();
1031        assert_eq!(results.len(), 2); // 1001, 1004
1032
1033        // Test URI pattern filter
1034        let filter = RequestFilter {
1035            uri_pattern: Some("/api/users%".to_string()),
1036            ..Default::default()
1037        };
1038        let results = repository.query(filter).await.unwrap();
1039        assert_eq!(results.len(), 3); // 1001, 1002, 1004
1040
1041        // Test duration range filter
1042        let filter = RequestFilter {
1043            min_duration_ms: Some(100),
1044            max_duration_ms: Some(200),
1045            ..Default::default()
1046        };
1047        let results = repository.query(filter).await.unwrap();
1048        assert_eq!(results.len(), 2); // 1001, 1002
1049
1050        // Test combined filters
1051        let filter = RequestFilter {
1052            method: Some("GET".to_string()),
1053            status_code: Some(200),
1054            ..Default::default()
1055        };
1056        let results = repository.query(filter).await.unwrap();
1057        assert_eq!(results.len(), 1); // Only 1001
1058        assert_eq!(results[0].request.correlation_id, 1001);
1059    }
1060
1061    #[sqlx::test]
1062    async fn test_query_with_pagination_and_ordering(pool: PgPool) {
1063        // Run migrations first
1064        crate::migrator().run(&pool).await.unwrap();
1065
1066        let handler = PostgresHandler::<PgPool, Value, Value>::from_pool(pool.clone())
1067            .await
1068            .unwrap();
1069        let repository = handler.repository();
1070
1071        // Insert requests with known timestamps
1072        let now = SystemTime::now();
1073        for i in 0..5 {
1074            let correlation_id = 2000 + i;
1075            let timestamp = now + Duration::from_secs(i * 10); // 10 second intervals
1076
1077            let mut headers = HashMap::new();
1078            headers.insert("x-test-id".to_string(), vec![i.to_string().into()]);
1079
1080            let request_data = RequestData {
1081                method: http::Method::GET,
1082                uri: "/api/test".parse().unwrap(),
1083                headers,
1084                body: Some(Bytes::from(format!("{{\"id\": {i}}}").into_bytes())),
1085                timestamp,
1086                correlation_id,
1087                trace_id: None,
1088                span_id: None,
1089            };
1090
1091            handler.handle_request(request_data).await;
1092        }
1093
1094        // Test default ordering (ASC) with limit
1095        let filter = RequestFilter {
1096            limit: Some(3),
1097            ..Default::default()
1098        };
1099        let results = repository.query(filter).await.unwrap();
1100        assert_eq!(results.len(), 3);
1101
1102        // Should be in ascending timestamp order
1103        for i in 0..2 {
1104            assert!(results[i].request.timestamp <= results[i + 1].request.timestamp);
1105        }
1106
1107        // Test descending order with offset
1108        let filter = RequestFilter {
1109            order_by_timestamp_desc: true,
1110            limit: Some(2),
1111            offset: Some(1),
1112            ..Default::default()
1113        };
1114        let results = repository.query(filter).await.unwrap();
1115        assert_eq!(results.len(), 2);
1116
1117        // Should be in descending order, skipping the first (newest) one
1118        assert!(results[0].request.timestamp >= results[1].request.timestamp);
1119    }
1120
1121    #[sqlx::test]
1122    async fn test_headers_conversion(pool: PgPool) {
1123        // Run migrations first
1124        crate::migrator().run(&pool).await.unwrap();
1125
1126        let handler = PostgresHandler::<PgPool, Value, Value>::from_pool(pool.clone())
1127            .await
1128            .unwrap();
1129        let repository = handler.repository();
1130
1131        // Test various header scenarios
1132        let mut headers = HashMap::new();
1133        headers.insert("single-value".to_string(), vec!["test".into()]);
1134        headers.insert(
1135            "multi-value".to_string(),
1136            vec!["val1".into(), "val2".into()],
1137        );
1138        headers.insert("empty-value".to_string(), vec!["".into()]);
1139
1140        let request_data = RequestData {
1141            correlation_id: 3000,
1142            method: http::Method::GET,
1143            uri: "/test".parse().unwrap(),
1144            headers,
1145            body: None,
1146            timestamp: SystemTime::now(),
1147            trace_id: None,
1148            span_id: None,
1149        };
1150
1151        let correlation_id = 3000;
1152        handler.handle_request(request_data).await;
1153
1154        let filter = RequestFilter {
1155            correlation_id: Some(correlation_id as i64),
1156            ..Default::default()
1157        };
1158        let results = repository.query(filter).await.unwrap();
1159
1160        assert_eq!(results.len(), 1);
1161        let headers_json = &results[0].request.headers;
1162
1163        // Single value should be stored as string
1164        assert_eq!(
1165            headers_json["single-value"],
1166            Value::String("test".to_string())
1167        );
1168
1169        // Multi-value should be stored as array
1170        match &headers_json["multi-value"] {
1171            Value::Array(arr) => {
1172                assert_eq!(arr.len(), 2);
1173                assert_eq!(arr[0], Value::String("val1".to_string()));
1174                assert_eq!(arr[1], Value::String("val2".to_string()));
1175            }
1176            _ => panic!("Expected array for multi-value header"),
1177        }
1178
1179        // Empty value should still be a string
1180        assert_eq!(headers_json["empty-value"], Value::String("".to_string()));
1181    }
1182
1183    #[sqlx::test]
1184    async fn test_timestamp_filtering(pool: PgPool) {
1185        // Run migrations first
1186        crate::migrator().run(&pool).await.unwrap();
1187
1188        let handler = PostgresHandler::<PgPool, Value, Value>::from_pool(pool.clone())
1189            .await
1190            .unwrap();
1191        let repository = handler.repository();
1192
1193        let base_time = SystemTime::UNIX_EPOCH + Duration::from_secs(1_600_000_000); // Sept 2020
1194
1195        // Insert requests at different times
1196        let times = [
1197            base_time + Duration::from_secs(0),    // correlation_id 4001
1198            base_time + Duration::from_secs(3600), // correlation_id 4002 (1 hour later)
1199            base_time + Duration::from_secs(7200), // correlation_id 4003 (2 hours later)
1200        ];
1201
1202        for (i, timestamp) in times.iter().enumerate() {
1203            let correlation_id = 4001 + i as u64;
1204            let request_data = RequestData {
1205                method: http::Method::GET,
1206                uri: "/test".parse().unwrap(),
1207                headers: HashMap::new(),
1208                body: None,
1209                timestamp: *timestamp,
1210                correlation_id,
1211                trace_id: None,
1212                span_id: None,
1213            };
1214
1215            handler.handle_request(request_data).await;
1216        }
1217
1218        // Test timestamp_after filter
1219        let after_time: DateTime<Utc> = (base_time + Duration::from_secs(1800)).into(); // 30 min after first
1220        let filter = RequestFilter {
1221            timestamp_after: Some(after_time),
1222            ..Default::default()
1223        };
1224        let results = repository.query(filter).await.unwrap();
1225        assert_eq!(results.len(), 2); // Should get 4002 and 4003
1226
1227        // Test timestamp_before filter
1228        let before_time: DateTime<Utc> = (base_time + Duration::from_secs(5400)).into(); // 1.5 hours after first
1229        let filter = RequestFilter {
1230            timestamp_before: Some(before_time),
1231            ..Default::default()
1232        };
1233        let results = repository.query(filter).await.unwrap();
1234        assert_eq!(results.len(), 2); // Should get 4001 and 4002
1235
1236        // Test timestamp range
1237        let filter = RequestFilter {
1238            timestamp_after: Some(after_time),
1239            timestamp_before: Some(before_time),
1240            ..Default::default()
1241        };
1242        let results = repository.query(filter).await.unwrap();
1243        assert_eq!(results.len(), 1); // Should get only 4002
1244        assert_eq!(results[0].request.correlation_id, 4002);
1245    }
1246
1247    // Note: Path filtering tests have been removed because path filtering
1248    // now happens at the outlet middleware layer, not in the PostgresHandler.
1249    // The handler now logs everything it receives, with filtering done upstream.
1250
1251    #[sqlx::test]
1252    async fn test_no_path_filtering_logs_everything(pool: PgPool) {
1253        // Run migrations first
1254        crate::migrator().run(&pool).await.unwrap();
1255
1256        // Handler without any path filtering
1257        let handler = PostgresHandler::<PgPool, Value, Value>::from_pool(pool.clone())
1258            .await
1259            .unwrap();
1260        let repository = handler.repository();
1261
1262        let test_uris = ["/api/users", "/health", "/metrics", "/random/path"];
1263        for (i, uri) in test_uris.iter().enumerate() {
1264            let correlation_id = 3000 + i as u64;
1265            let mut headers = HashMap::new();
1266            headers.insert("content-type".to_string(), vec!["application/json".into()]);
1267
1268            let request_data = RequestData {
1269                method: http::Method::GET,
1270                uri: uri.parse().unwrap(),
1271                headers,
1272                body: Some(Bytes::from(b"{}".to_vec())),
1273                timestamp: SystemTime::now(),
1274                correlation_id,
1275                trace_id: None,
1276                span_id: None,
1277            };
1278
1279            handler.handle_request(request_data).await;
1280        }
1281
1282        // Should have logged all 4 requests
1283        let filter = RequestFilter::default();
1284        let results = repository.query(filter).await.unwrap();
1285        assert_eq!(results.len(), 4);
1286    }
1287
1288    // Tests for read/write pool separation using TestDbPools
1289    #[sqlx::test]
1290    async fn test_write_operations_use_write_pool(pool: PgPool) {
1291        // Run migrations first
1292        crate::migrator().run(&pool).await.unwrap();
1293
1294        // Create TestDbPools which has a read-only replica
1295        let test_pools = crate::TestDbPools::new(pool).await.unwrap();
1296        let handler = PostgresHandler::<_, Value, Value>::from_pool_provider(test_pools.clone())
1297            .await
1298            .unwrap();
1299
1300        let mut request_data = create_test_request_data();
1301        let correlation_id = 5001;
1302        request_data.correlation_id = correlation_id;
1303
1304        // This should succeed because handle_request uses .write() which goes to primary
1305        handler.handle_request(request_data.clone()).await;
1306
1307        // Verify the write succeeded by reading from the primary pool
1308        let count: i64 =
1309            sqlx::query_scalar("SELECT COUNT(*) FROM http_requests WHERE correlation_id = $1")
1310                .bind(correlation_id as i64)
1311                .fetch_one(test_pools.write())
1312                .await
1313                .unwrap();
1314
1315        assert_eq!(count, 1, "Request should be written to primary pool");
1316    }
1317
1318    #[sqlx::test]
1319    async fn test_response_write_uses_write_pool(pool: PgPool) {
1320        // Run migrations first
1321        crate::migrator().run(&pool).await.unwrap();
1322
1323        let test_pools = crate::TestDbPools::new(pool).await.unwrap();
1324        let handler = PostgresHandler::<_, Value, Value>::from_pool_provider(test_pools.clone())
1325            .await
1326            .unwrap();
1327
1328        let mut request_data = create_test_request_data();
1329        let mut response_data = create_test_response_data();
1330        let correlation_id = 5002;
1331        request_data.correlation_id = correlation_id;
1332        response_data.correlation_id = correlation_id;
1333
1334        // Write request first
1335        handler.handle_request(request_data.clone()).await;
1336
1337        // Write response - should succeed because it uses .write()
1338        handler.handle_response(request_data, response_data).await;
1339
1340        // Verify both were written
1341        let count: i64 =
1342            sqlx::query_scalar("SELECT COUNT(*) FROM http_responses WHERE correlation_id = $1")
1343                .bind(correlation_id as i64)
1344                .fetch_one(test_pools.write())
1345                .await
1346                .unwrap();
1347
1348        assert_eq!(count, 1, "Response should be written to primary pool");
1349    }
1350
1351    #[sqlx::test]
1352    async fn test_repository_queries_use_read_pool(pool: PgPool) {
1353        // Run migrations first
1354        crate::migrator().run(&pool).await.unwrap();
1355
1356        let test_pools = crate::TestDbPools::new(pool).await.unwrap();
1357        let handler = PostgresHandler::<_, Value, Value>::from_pool_provider(test_pools.clone())
1358            .await
1359            .unwrap();
1360
1361        // Write some data using the handler (which uses write pool)
1362        let mut request_data = create_test_request_data();
1363        let correlation_id = 5003;
1364        request_data.correlation_id = correlation_id;
1365        handler.handle_request(request_data).await;
1366
1367        // Query using repository - should succeed because it uses .read()
1368        let repository = handler.repository();
1369        let filter = RequestFilter {
1370            correlation_id: Some(correlation_id as i64),
1371            ..Default::default()
1372        };
1373
1374        // This will succeed if repository.query() correctly uses .read()
1375        let results = repository.query(filter).await.unwrap();
1376        assert_eq!(results.len(), 1);
1377        assert_eq!(results[0].request.correlation_id, correlation_id as i64);
1378    }
1379
1380    #[sqlx::test]
1381    async fn test_replica_pool_rejects_writes(pool: PgPool) {
1382        // Run migrations first
1383        crate::migrator().run(&pool).await.unwrap();
1384
1385        let test_pools = crate::TestDbPools::new(pool).await.unwrap();
1386
1387        // Verify that the replica pool is actually read-only
1388        let result = sqlx::query("INSERT INTO http_requests (instance_id, correlation_id, timestamp, method, uri, headers, body, body_parsed) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)")
1389            .bind(Uuid::new_v4())
1390            .bind(9999i64)
1391            .bind(Utc::now())
1392            .bind("GET")
1393            .bind("/test")
1394            .bind(serde_json::json!({}))
1395            .bind(None::<Value>)
1396            .bind(false)
1397            .execute(test_pools.read())
1398            .await;
1399
1400        // Should fail with a read-only transaction error
1401        assert!(
1402            result.is_err(),
1403            "Replica pool should reject write operations"
1404        );
1405
1406        let err = result.unwrap_err();
1407        let err_msg = err.to_string().to_lowercase();
1408        assert!(
1409            err_msg.contains("read-only") || err_msg.contains("read only"),
1410            "Error should mention read-only: {}",
1411            err
1412        );
1413    }
1414
1415    #[sqlx::test]
1416    async fn test_full_request_response_cycle_with_read_write_separation(pool: PgPool) {
1417        // Run migrations first
1418        crate::migrator().run(&pool).await.unwrap();
1419
1420        let test_pools = crate::TestDbPools::new(pool).await.unwrap();
1421        let handler =
1422            PostgresHandler::<_, TestRequest, TestResponse>::from_pool_provider(test_pools)
1423                .await
1424                .unwrap();
1425
1426        let mut request_data = create_test_request_data();
1427        let mut response_data = create_test_response_data();
1428        let correlation_id = 5004;
1429        request_data.correlation_id = correlation_id;
1430        response_data.correlation_id = correlation_id;
1431
1432        // Write request and response (uses write pool)
1433        handler.handle_request(request_data.clone()).await;
1434        handler.handle_response(request_data, response_data).await;
1435
1436        // Query back using repository (uses read pool)
1437        let repository = handler.repository();
1438        let filter = RequestFilter {
1439            correlation_id: Some(correlation_id as i64),
1440            ..Default::default()
1441        };
1442
1443        let results = repository.query(filter).await.unwrap();
1444        assert_eq!(results.len(), 1);
1445
1446        // Verify request data
1447        let pair = &results[0];
1448        assert_eq!(pair.request.correlation_id, correlation_id as i64);
1449        assert_eq!(pair.request.method, "POST");
1450        assert_eq!(pair.request.uri, "/api/users");
1451
1452        // Verify response data
1453        let response = pair.response.as_ref().expect("Response should exist");
1454        assert_eq!(response.correlation_id, correlation_id as i64);
1455        assert_eq!(response.status_code, 201);
1456
1457        // Verify parsed bodies
1458        match &pair.request.body {
1459            Some(Ok(parsed_body)) => {
1460                assert_eq!(
1461                    *parsed_body,
1462                    TestRequest {
1463                        user_id: 123,
1464                        action: "create_user".to_string(),
1465                    }
1466                );
1467            }
1468            _ => panic!("Expected successfully parsed request body"),
1469        }
1470
1471        match &response.body {
1472            Some(Ok(parsed_body)) => {
1473                assert_eq!(
1474                    *parsed_body,
1475                    TestResponse {
1476                        success: true,
1477                        message: "User created successfully".to_string(),
1478                    }
1479                );
1480            }
1481            _ => panic!("Expected successfully parsed response body"),
1482        }
1483    }
1484
1485    // -----------------------------------------------------------------------
1486    // Batch INSERT tests
1487    // -----------------------------------------------------------------------
1488
1489    #[sqlx::test]
1490    async fn test_request_batch_insert(pool: PgPool) {
1491        crate::migrator().run(&pool).await.unwrap();
1492        let handler = PostgresHandler::<PgPool, TestRequest, TestResponse>::from_pool(pool.clone())
1493            .await
1494            .unwrap();
1495
1496        let mut batch = Vec::new();
1497        for i in 0..5 {
1498            let mut req = create_test_request_data();
1499            req.correlation_id = 1000 + i;
1500            req.uri = format!("/api/batch/{i}").parse().unwrap();
1501            batch.push(req);
1502        }
1503
1504        handler.handle_request_batch(&batch).await;
1505
1506        // Verify all 5 rows were inserted
1507        let count: (i64,) = sqlx::query_as(
1508            "SELECT COUNT(*) FROM http_requests WHERE correlation_id BETWEEN 1000 AND 1004",
1509        )
1510        .fetch_one(&pool)
1511        .await
1512        .unwrap();
1513        assert_eq!(count.0, 5);
1514    }
1515
1516    #[sqlx::test]
1517    async fn test_response_batch_insert(pool: PgPool) {
1518        crate::migrator().run(&pool).await.unwrap();
1519        let handler = PostgresHandler::<PgPool, TestRequest, TestResponse>::from_pool(pool.clone())
1520            .await
1521            .unwrap();
1522
1523        // Insert matching requests first
1524        let mut pairs = Vec::new();
1525        for i in 0..3 {
1526            let mut req = create_test_request_data();
1527            req.correlation_id = 2000 + i;
1528            handler.handle_request(req.clone()).await;
1529
1530            let mut res = create_test_response_data();
1531            res.correlation_id = 2000 + i;
1532            pairs.push((req, res));
1533        }
1534
1535        handler.handle_response_batch(&pairs).await;
1536
1537        // Verify all 3 response rows were inserted
1538        let count: (i64,) = sqlx::query_as(
1539            "SELECT COUNT(*) FROM http_responses WHERE correlation_id BETWEEN 2000 AND 2002",
1540        )
1541        .fetch_one(&pool)
1542        .await
1543        .unwrap();
1544        assert_eq!(count.0, 3);
1545    }
1546
1547    #[sqlx::test]
1548    async fn test_batch_with_mixed_bodies(pool: PgPool) {
1549        crate::migrator().run(&pool).await.unwrap();
1550        let handler = PostgresHandler::<PgPool, TestRequest, TestResponse>::from_pool(pool.clone())
1551            .await
1552            .unwrap();
1553
1554        let mut batch = Vec::new();
1555
1556        // Request with body
1557        let mut req_with_body = create_test_request_data();
1558        req_with_body.correlation_id = 3000;
1559        batch.push(req_with_body);
1560
1561        // Request without body
1562        let mut req_no_body = create_test_request_data();
1563        req_no_body.correlation_id = 3001;
1564        req_no_body.body = None;
1565        batch.push(req_no_body);
1566
1567        // Request with unparseable body
1568        let mut req_bad_body = create_test_request_data();
1569        req_bad_body.correlation_id = 3002;
1570        req_bad_body.body = Some(Bytes::from("not valid json"));
1571        batch.push(req_bad_body);
1572
1573        handler.handle_request_batch(&batch).await;
1574
1575        // All 3 should be inserted
1576        let count: (i64,) = sqlx::query_as(
1577            "SELECT COUNT(*) FROM http_requests WHERE correlation_id BETWEEN 3000 AND 3002",
1578        )
1579        .fetch_one(&pool)
1580        .await
1581        .unwrap();
1582        assert_eq!(count.0, 3);
1583
1584        // Check body_parsed flags
1585        let rows: Vec<(i64, Option<bool>)> = sqlx::query_as(
1586            "SELECT correlation_id, body_parsed FROM http_requests WHERE correlation_id BETWEEN 3000 AND 3002 ORDER BY correlation_id",
1587        )
1588        .fetch_all(&pool)
1589        .await
1590        .unwrap();
1591
1592        assert_eq!(rows[0].1, Some(true)); // parsed JSON
1593        assert_eq!(rows[1].1, Some(false)); // no body
1594        assert_eq!(rows[2].1, Some(false)); // fallback string
1595    }
1596
1597    #[sqlx::test]
1598    async fn test_empty_batch_is_noop(pool: PgPool) {
1599        crate::migrator().run(&pool).await.unwrap();
1600        let handler = PostgresHandler::<PgPool, TestRequest, TestResponse>::from_pool(pool.clone())
1601            .await
1602            .unwrap();
1603
1604        // Should not error
1605        handler.handle_request_batch(&[]).await;
1606        handler.handle_response_batch(&[]).await;
1607    }
1608
1609    #[sqlx::test]
1610    async fn test_batch_write_uses_write_pool(pool: PgPool) {
1611        use sqlx_pool_router::TestDbPools;
1612        crate::migrator().run(&pool).await.unwrap();
1613        let test_pools = TestDbPools::new(pool).await.unwrap();
1614        let handler =
1615            PostgresHandler::<TestDbPools, TestRequest, TestResponse>::from_pool_provider(
1616                test_pools,
1617            )
1618            .await
1619            .unwrap();
1620
1621        let mut req = create_test_request_data();
1622        req.correlation_id = 4000;
1623        handler.handle_request_batch(&[req.clone()]).await;
1624
1625        let res = create_test_response_data();
1626        handler.handle_response_batch(&[(req, res)]).await;
1627    }
1628}