outlet_postgres/
lib.rs

//! # outlet-postgres
//!
//! PostgreSQL logging handler for the outlet HTTP request/response middleware.
//! This crate implements the `RequestHandler` trait from outlet to log HTTP
//! requests and responses to PostgreSQL with JSONB serialization for bodies.
//!
//! ## Quick Start
//!
//! Basic usage:
//!
//! ```rust,no_run
//! use outlet::{RequestLoggerLayer, RequestLoggerConfig};
//! use outlet_postgres::PostgresHandler;
//! use axum::{routing::get, Router};
//! use tower::ServiceBuilder;
//!
//! async fn hello() -> &'static str {
//!     "Hello, World!"
//! }
//!
//! #[tokio::main]
//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
//!     let database_url = "postgresql://user:password@localhost/dbname";
//!     let handler: PostgresHandler = PostgresHandler::new(database_url).await?;
//!     let layer = RequestLoggerLayer::new(RequestLoggerConfig::default(), handler);
//!
//!     let app = Router::new()
//!         .route("/hello", get(hello))
//!         .layer(ServiceBuilder::new().layer(layer));
//!
//!     let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await?;
//!     axum::serve(listener, app).await?;
//!     Ok(())
//! }
//! ```
//!
//! ## Features
//!
//! - **PostgreSQL Integration**: Uses sqlx for async PostgreSQL operations
//! - **JSONB Bodies**: Serializes request/response bodies to JSONB fields
//! - **Type-safe Querying**: Query logged data with typed request/response bodies (see the Typed Bodies sketch below)
//! - **Correlation**: Links requests and responses via correlation IDs
//! - **Error Handling**: Insert failures are logged rather than propagated to the request path
//! - **Flexible Serialization**: Custom request/response serializers with typed fallback errors (`SerializationError`)
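//!
//! ## Typed Bodies
//!
//! A minimal sketch of logging and querying typed bodies, assuming hypothetical
//! `CreateUser` and `CreateUserResult` body types and an already-migrated database:
//!
//! ```rust,no_run
//! use outlet_postgres::{PostgresHandler, RequestFilter};
//! use serde::{Deserialize, Serialize};
//!
//! #[derive(Deserialize, Serialize)]
//! struct CreateUser {
//!     name: String,
//! }
//!
//! #[derive(Deserialize, Serialize)]
//! struct CreateUserResult {
//!     id: u64,
//! }
//!
//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
//! let handler = PostgresHandler::<CreateUser, CreateUserResult>::new(
//!     "postgresql://user:pass@localhost/db",
//! )
//! .await?;
//!
//! // Query logged traffic back with typed bodies via the repository.
//! let repository = handler.repository();
//! let pairs = repository
//!     .query(RequestFilter {
//!         method: Some("POST".to_string()),
//!         ..Default::default()
//!     })
//!     .await?;
//!
//! for pair in pairs {
//!     match &pair.request.body {
//!         Some(Ok(body)) => println!("parsed request body for {}", body.name),
//!         Some(Err(_raw)) => println!("body stored via the raw fallback"),
//!         None => println!("request had no body"),
//!     }
//! }
//! # Ok(())
//! # }
//! ```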

/// Error type for serialization failures with fallback data.
///
/// When serializers fail to parse request/response bodies into structured types,
/// this error provides both the parsing error details and fallback data that
/// can be stored as a string representation.
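///
/// # Examples
///
/// A minimal sketch of building this error from a failed serde parse, keeping
/// the raw body text as the fallback:
///
/// ```rust
/// use outlet_postgres::SerializationError;
///
/// let raw = b"not json";
/// let parse_error = serde_json::from_slice::<serde_json::Value>(raw).unwrap_err();
/// let error = SerializationError::new(String::from_utf8_lossy(raw).to_string(), parse_error);
/// assert!(error.to_string().starts_with("Serialization failed"));
/// ```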
#[derive(Debug)]
pub struct SerializationError {
    /// The fallback representation of the data (e.g., base64-encoded, raw string)
    pub fallback_data: String,
    /// The underlying error that caused serialization to fail
    pub error: Box<dyn std::error::Error + Send + Sync>,
}

impl SerializationError {
    /// Create a new serialization error with fallback data
    pub fn new(
        fallback_data: String,
        error: impl std::error::Error + Send + Sync + 'static,
    ) -> Self {
        Self {
            fallback_data,
            error: Box::new(error),
        }
    }
}

impl std::fmt::Display for SerializationError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "Serialization failed: {}", self.error)
    }
}

impl std::error::Error for SerializationError {
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        Some(self.error.as_ref())
    }
}

use chrono::{DateTime, Utc};
use http::Uri;
use outlet::{RequestData, RequestHandler, ResponseData};
use serde::{Deserialize, Serialize};
use serde_json::Value;
use sqlx::PgPool;
use std::collections::HashMap;
use std::sync::Arc;
use tracing::{debug, error, instrument};
use uuid::Uuid;

pub mod error;
pub mod repository;
pub use error::PostgresHandlerError;
pub use repository::{
    HttpRequest, HttpResponse, RequestFilter, RequestRepository, RequestResponsePair,
};

/// Get the migrator for running outlet-postgres database migrations.
///
/// This returns a SQLx migrator that can be used to set up the required
/// `http_requests` and `http_responses` tables. The consuming application
/// is responsible for running these migrations at the appropriate time
/// and in the appropriate database schema.
///
/// # Examples
///
/// ```rust,no_run
/// use outlet_postgres::migrator;
/// use sqlx::PgPool;
///
/// #[tokio::main]
/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
///     let pool = PgPool::connect("postgresql://user:pass@localhost/db").await?;
///
///     // Run outlet migrations
///     migrator().run(&pool).await?;
///
///     Ok(())
/// }
/// ```
pub fn migrator() -> sqlx::migrate::Migrator {
    sqlx::migrate!("./migrations")
}

/// Type alias for request body serializers.
///
/// Request serializers take full request context including headers and body bytes.
/// On failure, they return a `SerializationError` with fallback data.
type RequestSerializer<T> =
    Arc<dyn Fn(&outlet::RequestData) -> Result<T, SerializationError> + Send + Sync>;

/// Type alias for response body serializers.
///
/// Response serializers take both request and response context, allowing them to
/// make parsing decisions based on request details and response headers (e.g., compression).
/// On failure, they return a `SerializationError` with fallback data.
type ResponseSerializer<T> = Arc<
    dyn Fn(&outlet::RequestData, &outlet::ResponseData) -> Result<T, SerializationError>
        + Send
        + Sync,
>;

/// PostgreSQL handler for outlet middleware.
///
/// Implements the `RequestHandler` trait to log HTTP requests and responses
/// to PostgreSQL. Request and response bodies are serialized to JSONB fields.
///
/// Generic over `TReq` and `TRes` which represent the request and response body types
/// that should implement `Deserialize` for parsing and `Serialize` for database storage as JSONB.
/// Use `serde_json::Value` for flexible JSON storage, or custom structs for typed storage.
#[derive(Clone)]
pub struct PostgresHandler<TReq = Value, TRes = Value>
where
    TReq: for<'de> Deserialize<'de> + Serialize + Send + Sync + 'static,
    TRes: for<'de> Deserialize<'de> + Serialize + Send + Sync + 'static,
{
    pool: PgPool,
    request_serializer: RequestSerializer<TReq>,
    response_serializer: ResponseSerializer<TRes>,
    path_filter: Option<PathFilter>,
    instance_id: Uuid,
}

/// Filter configuration for determining which requests to log.
#[derive(Clone, Debug)]
pub struct PathFilter {
    /// Only log requests whose URI path starts with any of these prefixes.
    /// If empty, all requests are logged.
    pub allowed_prefixes: Vec<String>,
    /// Skip requests whose URI path starts with any of these prefixes.
    /// Takes precedence over `allowed_prefixes`.
    pub blocked_prefixes: Vec<String>,
}

impl<TReq, TRes> PostgresHandler<TReq, TRes>
where
    TReq: for<'de> Deserialize<'de> + Serialize + Send + Sync + 'static,
    TRes: for<'de> Deserialize<'de> + Serialize + Send + Sync + 'static,
{
    /// Default serializer that attempts serde JSON deserialization of the request body.
    /// On failure, returns a `SerializationError` whose fallback data is the body as a
    /// lossy UTF-8 string.
    fn default_request_serializer() -> RequestSerializer<TReq> {
        Arc::new(|request_data| {
            let bytes = request_data.body.as_deref().unwrap_or(&[]);
            serde_json::from_slice::<TReq>(bytes).map_err(|error| {
                let fallback_data = String::from_utf8_lossy(bytes).to_string();
                SerializationError::new(fallback_data, error)
            })
        })
    }

    /// Default serializer that attempts serde JSON deserialization of the response body.
    /// On failure, returns a `SerializationError` whose fallback data is the body as a
    /// lossy UTF-8 string.
    fn default_response_serializer() -> ResponseSerializer<TRes> {
        Arc::new(|_request_data, response_data| {
            let bytes = response_data.body.as_deref().unwrap_or(&[]);
            serde_json::from_slice::<TRes>(bytes).map_err(|error| {
                let fallback_data = String::from_utf8_lossy(bytes).to_string();
                SerializationError::new(fallback_data, error)
            })
        })
    }

    /// Create a new PostgreSQL handler by connecting a new pool to the given database.
    ///
    /// This will NOT run migrations.
    /// Use `migrator()` to get a migrator and run migrations separately.
    ///
    /// # Arguments
    ///
    /// * `database_url` - PostgreSQL connection string
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// use outlet_postgres::{PostgresHandler, migrator};
    /// use serde::{Deserialize, Serialize};
    ///
    /// #[derive(Deserialize, Serialize)]
    /// struct MyBodyType {
    ///     id: u64,
    ///     name: String,
    /// }
    ///
    /// #[tokio::main]
    /// async fn main() -> Result<(), Box<dyn std::error::Error>> {
    ///     // Run migrations first
    ///     let pool = sqlx::PgPool::connect("postgresql://user:pass@localhost/db").await?;
    ///     migrator().run(&pool).await?;
    ///
    ///     // Create handler
    ///     let handler = PostgresHandler::<MyBodyType, MyBodyType>::new("postgresql://user:pass@localhost/db").await?;
    ///     Ok(())
    /// }
    /// ```
    pub async fn new(database_url: &str) -> Result<Self, PostgresHandlerError> {
        let pool = PgPool::connect(database_url)
            .await
            .map_err(PostgresHandlerError::Connection)?;

        Ok(Self {
            pool,
            request_serializer: Self::default_request_serializer(),
            response_serializer: Self::default_response_serializer(),
            path_filter: None,
            instance_id: Uuid::new_v4(),
        })
    }

    /// Add a custom request body serializer.
    ///
    /// The serializer function takes the full `RequestData` (headers, body bytes, and other
    /// request context) and should return a `Result<TReq, SerializationError>`.
    /// If the serializer succeeds, the result will be stored as JSONB and `body_parsed` will be true.
    /// If it fails, the error's fallback data will be stored as a string and `body_parsed` will be false.
    ///
    /// # Panics
    ///
    /// This will panic if the serializer succeeds but the resulting `TReq` value cannot be
    /// converted to JSON via `serde_json::to_value()`. This indicates a bug in the `Serialize`
    /// implementation of `TReq` and should be fixed by the caller.
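    ///
    /// # Examples
    ///
    /// A minimal sketch of a custom serializer, assuming a hypothetical `ApiRequest`
    /// body type; it parses the body bytes as JSON and keeps the lossy UTF-8 text as
    /// fallback data on failure:
    ///
    /// ```rust,no_run
    /// use outlet_postgres::{PostgresHandler, SerializationError};
    /// use serde::{Deserialize, Serialize};
    /// use serde_json::Value;
    ///
    /// // Hypothetical request body type, for illustration only.
    /// #[derive(Deserialize, Serialize)]
    /// struct ApiRequest {
    ///     action: String,
    /// }
    ///
    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
    /// let handler = PostgresHandler::<ApiRequest, Value>::new("postgresql://user:pass@localhost/db")
    ///     .await?
    ///     .with_request_serializer(|request_data| {
    ///         let bytes = request_data.body.as_deref().unwrap_or(&[]);
    ///         serde_json::from_slice::<ApiRequest>(bytes).map_err(|error| {
    ///             SerializationError::new(String::from_utf8_lossy(bytes).to_string(), error)
    ///         })
    ///     });
    /// # Ok(())
    /// # }
    /// ```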
    pub fn with_request_serializer<F>(mut self, serializer: F) -> Self
    where
        F: Fn(&outlet::RequestData) -> Result<TReq, SerializationError> + Send + Sync + 'static,
    {
        self.request_serializer = Arc::new(serializer);
        self
    }

    /// Add a custom response body serializer.
    ///
    /// The serializer function takes both the originating `RequestData` and the `ResponseData`
    /// and should return a `Result<TRes, SerializationError>`.
    /// If the serializer succeeds, the result will be stored as JSONB and `body_parsed` will be true.
    /// If it fails, the error's fallback data will be stored as a string and `body_parsed` will be false.
    ///
    /// # Panics
    ///
    /// This will panic if the serializer succeeds but the resulting `TRes` value cannot be
    /// converted to JSON via `serde_json::to_value()`. This indicates a bug in the `Serialize`
    /// implementation of `TRes` and should be fixed by the caller.
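    ///
    /// # Examples
    ///
    /// A minimal sketch that only attempts JSON parsing when the response advertises
    /// a JSON content type, assuming a hypothetical `ApiResponse` body type; anything
    /// else is stored via the fallback string:
    ///
    /// ```rust,no_run
    /// use outlet_postgres::{PostgresHandler, SerializationError};
    /// use serde::{Deserialize, Serialize};
    /// use serde_json::Value;
    ///
    /// // Hypothetical response body type, for illustration only.
    /// #[derive(Deserialize, Serialize)]
    /// struct ApiResponse {
    ///     message: String,
    /// }
    ///
    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
    /// let handler = PostgresHandler::<Value, ApiResponse>::new("postgresql://user:pass@localhost/db")
    ///     .await?
    ///     .with_response_serializer(|_request_data, response_data| {
    ///         let bytes = response_data.body.as_deref().unwrap_or(&[]);
    ///         // Only attempt typed parsing when the content type looks like JSON.
    ///         let is_json = response_data
    ///             .headers
    ///             .get("content-type")
    ///             .and_then(|values| values.first())
    ///             .map(|value| String::from_utf8_lossy(value).contains("json"))
    ///             .unwrap_or(false);
    ///         if is_json {
    ///             serde_json::from_slice::<ApiResponse>(bytes).map_err(|error| {
    ///                 SerializationError::new(String::from_utf8_lossy(bytes).to_string(), error)
    ///             })
    ///         } else {
    ///             Err(SerializationError::new(
    ///                 String::from_utf8_lossy(bytes).to_string(),
    ///                 std::io::Error::new(std::io::ErrorKind::InvalidData, "non-JSON content type"),
    ///             ))
    ///         }
    ///     });
    /// # Ok(())
    /// # }
    /// ```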
    pub fn with_response_serializer<F>(mut self, serializer: F) -> Self
    where
        F: Fn(&outlet::RequestData, &outlet::ResponseData) -> Result<TRes, SerializationError>
            + Send
            + Sync
            + 'static,
    {
        self.response_serializer = Arc::new(serializer);
        self
    }

    /// Add path filtering to only log requests for specific URI prefixes.
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// use outlet_postgres::{PostgresHandler, PathFilter};
    ///
    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
    /// use serde_json::Value;
    /// let handler = PostgresHandler::<Value, Value>::new("postgresql://user:pass@localhost/db")
    ///     .await?
    ///     .with_path_filter(PathFilter {
    ///         allowed_prefixes: vec!["/api/".to_string(), "/webhook/".to_string()],
    ///         blocked_prefixes: vec!["/api/health".to_string()],
    ///     });
    /// # Ok(())
    /// # }
    /// ```
    pub fn with_path_filter(mut self, filter: PathFilter) -> Self {
        self.path_filter = Some(filter);
        self
    }

    /// Add simple path prefix filtering - only log requests starting with the given prefix.
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// use outlet_postgres::PostgresHandler;
    ///
    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
    /// use serde_json::Value;
    /// let handler = PostgresHandler::<Value, Value>::new("postgresql://user:pass@localhost/db")
    ///     .await?
    ///     .with_path_prefix("/api/");
    /// # Ok(())
    /// # }
    /// ```
    pub fn with_path_prefix(mut self, prefix: &str) -> Self {
        self.path_filter = Some(PathFilter {
            allowed_prefixes: vec![prefix.to_string()],
            blocked_prefixes: vec![],
        });
        self
    }

    /// Create a PostgreSQL handler from an existing connection pool.
    ///
    /// Use this if you already have a connection pool and want to reuse it.
    /// This will NOT run migrations - use `migrator()` to run migrations separately.
    ///
    /// # Arguments
    ///
    /// * `pool` - Existing PostgreSQL connection pool
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// use outlet_postgres::{PostgresHandler, migrator};
    /// use sqlx::PgPool;
    /// use serde::{Deserialize, Serialize};
    ///
    /// #[derive(Deserialize, Serialize)]
    /// struct MyBodyType {
    ///     id: u64,
    ///     name: String,
    /// }
    ///
    /// #[tokio::main]
    /// async fn main() -> Result<(), Box<dyn std::error::Error>> {
    ///     let pool = PgPool::connect("postgresql://user:pass@localhost/db").await?;
    ///
    ///     // Run migrations first
    ///     migrator().run(&pool).await?;
    ///
    ///     // Create handler
    ///     let handler = PostgresHandler::<MyBodyType, MyBodyType>::from_pool(pool).await?;
    ///     Ok(())
    /// }
    /// ```
    pub async fn from_pool(pool: PgPool) -> Result<Self, PostgresHandlerError> {
        Ok(Self {
            pool,
            request_serializer: Self::default_request_serializer(),
            response_serializer: Self::default_response_serializer(),
            path_filter: None,
            instance_id: Uuid::new_v4(),
        })
    }

    /// Convert headers to a JSONB-compatible format.
    fn headers_to_json(headers: &HashMap<String, Vec<bytes::Bytes>>) -> Value {
        let mut header_map = HashMap::new();
        for (name, values) in headers {
            if values.len() == 1 {
                let value_str = String::from_utf8_lossy(&values[0]).to_string();
                header_map.insert(name.clone(), Value::String(value_str));
            } else {
                let value_array: Vec<Value> = values
                    .iter()
                    .map(|v| Value::String(String::from_utf8_lossy(v).to_string()))
                    .collect();
                header_map.insert(name.clone(), Value::Array(value_array));
            }
        }
        serde_json::to_value(header_map).unwrap_or(Value::Null)
    }

    /// Convert request data to a JSONB value using the configured serializer.
    fn request_body_to_json_with_fallback(
        &self,
        request_data: &outlet::RequestData,
    ) -> (Value, bool) {
        match (self.request_serializer)(request_data) {
            Ok(typed_value) => {
                if let Ok(json_value) = serde_json::to_value(&typed_value) {
                    (json_value, true)
                } else {
                    // This should never happen if the type implements Serialize correctly
                    (
                        Value::String(
                            serde_json::to_string(&typed_value)
                                .expect("Serialized value must be convertible to JSON string"),
                        ),
                        false,
                    )
                }
            }
            Err(serialization_error) => (Value::String(serialization_error.fallback_data), false),
        }
    }

    /// Convert response data to a JSONB value using the configured serializer.
    fn response_body_to_json_with_fallback(
        &self,
        request_data: &outlet::RequestData,
        response_data: &outlet::ResponseData,
    ) -> (Value, bool) {
        match (self.response_serializer)(request_data, response_data) {
            Ok(typed_value) => {
                if let Ok(json_value) = serde_json::to_value(&typed_value) {
                    (json_value, true)
                } else {
                    // This should never happen if the type implements Serialize correctly
                    (
                        Value::String(
                            serde_json::to_string(&typed_value)
                                .expect("Serialized value must be convertible to JSON string"),
                        ),
                        false,
                    )
                }
            }
            Err(serialization_error) => (Value::String(serialization_error.fallback_data), false),
        }
    }

    /// Check if a request URI should be logged based on the configured path filter.
    fn should_log_request(&self, uri: &Uri) -> bool {
        let path = uri.path();
        debug!(%path, "Evaluating prefix");
        let Some(filter) = &self.path_filter else {
            return true; // No filter means log everything
        };

        // Check blocked prefixes first (they take precedence)
        for blocked_prefix in &filter.blocked_prefixes {
            if path.starts_with(blocked_prefix) {
                return false;
            }
        }

        // If no allowed prefixes specified, allow everything (after blocked check)
        if filter.allowed_prefixes.is_empty() {
            return true;
        }

        // Check if URI matches any allowed prefix
        filter
            .allowed_prefixes
            .iter()
            .any(|prefix| path.starts_with(prefix))
    }

    /// Get a repository for querying logged requests and responses.
    ///
    /// Returns a `RequestRepository` with the same type parameters as this handler,
    /// allowing for type-safe querying of request and response bodies.
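    ///
    /// # Examples
    ///
    /// A minimal sketch of filtering logged traffic, assuming migrations have
    /// already been run:
    ///
    /// ```rust,no_run
    /// use outlet_postgres::{PostgresHandler, RequestFilter};
    /// use serde_json::Value;
    ///
    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
    /// let handler = PostgresHandler::<Value, Value>::new("postgresql://user:pass@localhost/db").await?;
    /// let repository = handler.repository();
    ///
    /// let slow_requests = repository
    ///     .query(RequestFilter {
    ///         min_duration_ms: Some(500),
    ///         ..Default::default()
    ///     })
    ///     .await?;
    /// println!("{} slow requests", slow_requests.len());
    /// # Ok(())
    /// # }
    /// ```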
    pub fn repository(&self) -> crate::repository::RequestRepository<TReq, TRes> {
        crate::repository::RequestRepository::new(self.pool.clone())
    }
}

impl<TReq, TRes> RequestHandler for PostgresHandler<TReq, TRes>
where
    TReq: for<'de> Deserialize<'de> + Serialize + Send + Sync + 'static,
    TRes: for<'de> Deserialize<'de> + Serialize + Send + Sync + 'static,
{
    #[instrument(skip(self, data), fields(correlation_id = %data.correlation_id))]
    async fn handle_request(&self, data: RequestData) {
        // Check if this request should be logged
        if !self.should_log_request(&data.uri) {
            debug!(correlation_id = %data.correlation_id, uri = %data.uri, "Skipping request due to path filter");
            return;
        }

        let headers_json = Self::headers_to_json(&data.headers);
        let (body_json, parsed) = if data.body.is_some() {
            let (json, parsed) = self.request_body_to_json_with_fallback(&data);
            (Some(json), parsed)
        } else {
            (None, false)
        };

        let timestamp: DateTime<Utc> = data.timestamp.into();

        let result = sqlx::query(
            r#"
            INSERT INTO http_requests (instance_id, correlation_id, timestamp, method, uri, headers, body, body_parsed)
            VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
            "#,
        )
        .bind(self.instance_id)
        .bind(data.correlation_id as i64)
        .bind(timestamp)
        .bind(data.method.to_string())
        .bind(data.uri.to_string())
        .bind(headers_json)
        .bind(body_json)
        .bind(parsed)
        .execute(&self.pool)
        .await;

        if let Err(e) = result {
            error!(correlation_id = %data.correlation_id, error = %e, "Failed to insert request data");
        } else {
            debug!(correlation_id = %data.correlation_id, "Request data inserted successfully");
        }
    }

    #[instrument(skip(self, request_data, response_data), fields(correlation_id = %request_data.correlation_id))]
    async fn handle_response(&self, request_data: RequestData, response_data: ResponseData) {
        if !self.should_log_request(&request_data.uri) {
            debug!(correlation_id = %request_data.correlation_id, uri = %request_data.uri, "Skipping response due to path filter");
            return;
        }
        let headers_json = Self::headers_to_json(&response_data.headers);
        let (body_json, parsed) = if response_data.body.is_some() {
            let (json, parsed) =
                self.response_body_to_json_with_fallback(&request_data, &response_data);
            (Some(json), parsed)
        } else {
            (None, false)
        };

        let timestamp: DateTime<Utc> = response_data.timestamp.into();
        let duration_ms = response_data.duration.as_millis() as i64;
        let duration_to_first_byte_ms = response_data.duration_to_first_byte.as_millis() as i64;

        let result = sqlx::query(
            r#"
            INSERT INTO http_responses (instance_id, correlation_id, timestamp, status_code, headers, body, body_parsed, duration_to_first_byte_ms, duration_ms)
            SELECT $1, $2, $3, $4, $5, $6, $7, $8, $9
            WHERE EXISTS (SELECT 1 FROM http_requests WHERE instance_id = $1 AND correlation_id = $2)
            "#,
        )
        .bind(self.instance_id)
        .bind(request_data.correlation_id as i64)
        .bind(timestamp)
        .bind(response_data.status.as_u16() as i32)
        .bind(headers_json)
        .bind(body_json)
        .bind(parsed)
        .bind(duration_to_first_byte_ms)
        .bind(duration_ms)
        .execute(&self.pool)
        .await;

        match result {
            Err(e) => {
                error!(correlation_id = %request_data.correlation_id, error = %e, "Failed to insert response data");
            }
            Ok(query_result) => {
                if query_result.rows_affected() > 0 {
                    debug!(correlation_id = %request_data.correlation_id, "Response data inserted successfully");
                } else {
                    debug!(correlation_id = %request_data.correlation_id, "No matching request found for response, skipping insert")
                }
            }
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use bytes::Bytes;
    use chrono::{DateTime, Utc};
    use outlet::{RequestData, ResponseData};
    use serde::{Deserialize, Serialize};
    use serde_json::Value;
    use sqlx::PgPool;
    use std::collections::HashMap;
    use std::time::{Duration, SystemTime};

    #[derive(Debug, Clone, PartialEq, Deserialize, Serialize)]
    struct TestRequest {
        user_id: u64,
        action: String,
    }

    #[derive(Debug, Clone, PartialEq, Deserialize, Serialize)]
    struct TestResponse {
        success: bool,
        message: String,
    }

    fn create_test_request_data() -> RequestData {
        let mut headers = HashMap::new();
        headers.insert("content-type".to_string(), vec!["application/json".into()]);
        headers.insert("user-agent".to_string(), vec!["test-client/1.0".into()]);

        let test_req = TestRequest {
            user_id: 123,
            action: "create_user".to_string(),
        };
        let body = serde_json::to_vec(&test_req).unwrap();

        RequestData {
            method: http::Method::POST,
            uri: http::Uri::from_static("/api/users"),
            headers,
            body: Some(Bytes::from(body)),
            timestamp: SystemTime::now(),
            correlation_id: 0,
        }
    }

    fn create_test_response_data() -> ResponseData {
        let mut headers = HashMap::new();
        headers.insert("content-type".to_string(), vec!["application/json".into()]);

        let test_res = TestResponse {
            success: true,
            message: "User created successfully".to_string(),
        };
        let body = serde_json::to_vec(&test_res).unwrap();

        ResponseData {
            status: http::StatusCode::CREATED,
            headers,
            body: Some(Bytes::from(body)),
            timestamp: SystemTime::now(),
            duration_to_first_byte: Duration::from_millis(100),
            duration: Duration::from_millis(150),
            correlation_id: 0,
        }
    }

    #[sqlx::test]
    async fn test_handler_creation(pool: PgPool) {
        // Run migrations first
        crate::migrator().run(&pool).await.unwrap();

        let handler = PostgresHandler::<TestRequest, TestResponse>::from_pool(pool.clone())
            .await
            .unwrap();

        // Verify we can get a repository
        let repository = handler.repository();

        // Test initial state - no requests logged yet
        let filter = RequestFilter::default();
        let results = repository.query(filter).await.unwrap();
        assert!(results.is_empty());
    }

    #[sqlx::test]
    async fn test_handle_request_with_typed_body(pool: PgPool) {
        // Run migrations first
        crate::migrator().run(&pool).await.unwrap();

        let handler = PostgresHandler::<TestRequest, TestResponse>::from_pool(pool.clone())
            .await
            .unwrap();
        let repository = handler.repository();

        let mut request_data = create_test_request_data();
        let correlation_id = 12345;
        request_data.correlation_id = correlation_id;

        // Handle the request
        handler.handle_request(request_data.clone()).await;

        // Query back the request
        let filter = RequestFilter {
            correlation_id: Some(correlation_id as i64),
            ..Default::default()
        };
        let results = repository.query(filter).await.unwrap();

        assert_eq!(results.len(), 1);
        let pair = &results[0];

        assert_eq!(pair.request.correlation_id, correlation_id as i64);
        assert_eq!(pair.request.method, "POST");
        assert_eq!(pair.request.uri, "/api/users");

        // Check that body was parsed successfully
        match &pair.request.body {
            Some(Ok(parsed_body)) => {
                assert_eq!(
                    *parsed_body,
                    TestRequest {
                        user_id: 123,
                        action: "create_user".to_string(),
                    }
                );
            }
            _ => panic!("Expected successfully parsed request body"),
        }

        // Headers should be converted to JSON properly
        let headers_value = &pair.request.headers;
        assert!(headers_value.get("content-type").is_some());
        assert!(headers_value.get("user-agent").is_some());

        // No response yet
        assert!(pair.response.is_none());
    }

    #[sqlx::test]
    async fn test_handle_response_with_typed_body(pool: PgPool) {
        // Run migrations first
        crate::migrator().run(&pool).await.unwrap();

        let handler = PostgresHandler::<TestRequest, TestResponse>::from_pool(pool.clone())
            .await
            .unwrap();
        let repository = handler.repository();

        let mut request_data = create_test_request_data();
        let mut response_data = create_test_response_data();
        let correlation_id = 54321;
        request_data.correlation_id = correlation_id;
        response_data.correlation_id = correlation_id;

        // Handle both request and response
        handler.handle_request(request_data.clone()).await;
        handler
            .handle_response(request_data, response_data.clone())
            .await;

        // Query back the complete pair
        let filter = RequestFilter {
            correlation_id: Some(correlation_id as i64),
            ..Default::default()
        };
        let results = repository.query(filter).await.unwrap();

        assert_eq!(results.len(), 1);
        let pair = &results[0];

        // Check response data
        let response = pair.response.as_ref().expect("Response should be present");
        assert_eq!(response.correlation_id, correlation_id as i64);
        assert_eq!(response.status_code, 201);
        assert_eq!(response.duration_ms, 150);

        // Check that response body was parsed successfully
        match &response.body {
            Some(Ok(parsed_body)) => {
                assert_eq!(
                    *parsed_body,
                    TestResponse {
                        success: true,
                        message: "User created successfully".to_string(),
                    }
                );
            }
            _ => panic!("Expected successfully parsed response body"),
        }
    }

    #[sqlx::test]
    async fn test_handle_unparseable_body_fallback(pool: PgPool) {
        // Run migrations first
        crate::migrator().run(&pool).await.unwrap();

        let handler = PostgresHandler::<TestRequest, TestResponse>::from_pool(pool.clone())
            .await
            .unwrap();
        let repository = handler.repository();

        // Create request with invalid JSON for TestRequest
        let mut headers = HashMap::new();
        headers.insert("content-type".to_string(), vec!["text/plain".into()]);

        let invalid_json_body = b"not valid json for TestRequest";
        let correlation_id = 99999;
        let request_data = RequestData {
            method: http::Method::POST,
            uri: http::Uri::from_static("/api/test"),
            headers,
            body: Some(Bytes::from(invalid_json_body.to_vec())),
            timestamp: SystemTime::now(),
            correlation_id,
        };

        handler.handle_request(request_data).await;

        // Query back and verify the raw-body fallback
        let filter = RequestFilter {
            correlation_id: Some(correlation_id as i64),
            ..Default::default()
        };
        let results = repository.query(filter).await.unwrap();

        assert_eq!(results.len(), 1);
        let pair = &results[0];

        // Should fallback to raw bytes
        match &pair.request.body {
            Some(Err(raw_bytes)) => {
                assert_eq!(raw_bytes.as_ref(), invalid_json_body);
            }
            _ => panic!("Expected raw bytes fallback for unparseable body"),
        }
    }

    #[sqlx::test]
    async fn test_query_with_multiple_filters(pool: PgPool) {
        // Run migrations first
        crate::migrator().run(&pool).await.unwrap();

        let handler = PostgresHandler::<Value, Value>::from_pool(pool.clone())
            .await
            .unwrap();
        let repository = handler.repository();

        // Insert multiple requests with different characteristics
        let test_cases = vec![
            (1001, "GET", "/api/users", 200, 100),
            (1002, "POST", "/api/users", 201, 150),
            (1003, "GET", "/api/orders", 404, 50),
            (1004, "PUT", "/api/users/123", 200, 300),
        ];

        for (correlation_id, method, uri, status, duration_ms) in test_cases {
            let mut headers = HashMap::new();
            headers.insert("content-type".to_string(), vec!["application/json".into()]);

            let request_data = RequestData {
                method: method.parse().unwrap(),
                uri: uri.parse().unwrap(),
                headers: headers.clone(),
                body: Some(Bytes::from(b"{}".to_vec())),
                timestamp: SystemTime::now(),
                correlation_id,
            };

            let response_data = ResponseData {
                correlation_id,
                status: http::StatusCode::from_u16(status).unwrap(),
                headers,
                body: Some(Bytes::from(b"{}".to_vec())),
                timestamp: SystemTime::now(),
                duration_to_first_byte: Duration::from_millis(duration_ms / 2),
                duration: Duration::from_millis(duration_ms),
            };

            handler.handle_request(request_data.clone()).await;
            handler.handle_response(request_data, response_data).await;
        }

        // Test method filter
        let filter = RequestFilter {
            method: Some("GET".to_string()),
            ..Default::default()
        };
        let results = repository.query(filter).await.unwrap();
        assert_eq!(results.len(), 2); // 1001, 1003

        // Test status code filter
        let filter = RequestFilter {
            status_code: Some(200),
            ..Default::default()
        };
        let results = repository.query(filter).await.unwrap();
        assert_eq!(results.len(), 2); // 1001, 1004

        // Test URI pattern filter
        let filter = RequestFilter {
            uri_pattern: Some("/api/users%".to_string()),
            ..Default::default()
        };
        let results = repository.query(filter).await.unwrap();
        assert_eq!(results.len(), 3); // 1001, 1002, 1004

        // Test duration range filter
        let filter = RequestFilter {
            min_duration_ms: Some(100),
            max_duration_ms: Some(200),
            ..Default::default()
        };
        let results = repository.query(filter).await.unwrap();
        assert_eq!(results.len(), 2); // 1001, 1002

        // Test combined filters
        let filter = RequestFilter {
            method: Some("GET".to_string()),
            status_code: Some(200),
            ..Default::default()
        };
        let results = repository.query(filter).await.unwrap();
        assert_eq!(results.len(), 1); // Only 1001
        assert_eq!(results[0].request.correlation_id, 1001);
    }

    #[sqlx::test]
    async fn test_query_with_pagination_and_ordering(pool: PgPool) {
        // Run migrations first
        crate::migrator().run(&pool).await.unwrap();

        let handler = PostgresHandler::<Value, Value>::from_pool(pool.clone())
            .await
            .unwrap();
        let repository = handler.repository();

        // Insert requests with known timestamps
        let now = SystemTime::now();
        for i in 0..5 {
            let correlation_id = 2000 + i;
            let timestamp = now + Duration::from_secs(i * 10); // 10 second intervals

            let mut headers = HashMap::new();
            headers.insert("x-test-id".to_string(), vec![i.to_string().into()]);

            let request_data = RequestData {
                method: http::Method::GET,
                uri: "/api/test".parse().unwrap(),
                headers,
                body: Some(Bytes::from(format!("{{\"id\": {i}}}").into_bytes())),
                timestamp,
                correlation_id,
            };

            handler.handle_request(request_data).await;
        }

        // Test default ordering (ASC) with limit
        let filter = RequestFilter {
            limit: Some(3),
            ..Default::default()
        };
        let results = repository.query(filter).await.unwrap();
        assert_eq!(results.len(), 3);

        // Should be in ascending timestamp order
        for i in 0..2 {
            assert!(results[i].request.timestamp <= results[i + 1].request.timestamp);
        }

        // Test descending order with offset
        let filter = RequestFilter {
            order_by_timestamp_desc: true,
            limit: Some(2),
            offset: Some(1),
            ..Default::default()
        };
        let results = repository.query(filter).await.unwrap();
        assert_eq!(results.len(), 2);

        // Should be in descending order, skipping the first (newest) one
        assert!(results[0].request.timestamp >= results[1].request.timestamp);
    }

    #[sqlx::test]
    async fn test_headers_conversion(pool: PgPool) {
        // Run migrations first
        crate::migrator().run(&pool).await.unwrap();

        let handler = PostgresHandler::<Value, Value>::from_pool(pool.clone())
            .await
            .unwrap();
        let repository = handler.repository();

        // Test various header scenarios
        let mut headers = HashMap::new();
        headers.insert("single-value".to_string(), vec!["test".into()]);
        headers.insert(
            "multi-value".to_string(),
            vec!["val1".into(), "val2".into()],
        );
        headers.insert("empty-value".to_string(), vec!["".into()]);

        let correlation_id = 3000;
        let request_data = RequestData {
            correlation_id,
            method: http::Method::GET,
            uri: "/test".parse().unwrap(),
            headers,
            body: None,
            timestamp: SystemTime::now(),
        };

        handler.handle_request(request_data).await;

        let filter = RequestFilter {
            correlation_id: Some(correlation_id as i64),
            ..Default::default()
        };
        let results = repository.query(filter).await.unwrap();

        assert_eq!(results.len(), 1);
        let headers_json = &results[0].request.headers;

        // Single value should be stored as string
        assert_eq!(
            headers_json["single-value"],
            Value::String("test".to_string())
        );

        // Multi-value should be stored as array
        match &headers_json["multi-value"] {
            Value::Array(arr) => {
                assert_eq!(arr.len(), 2);
                assert_eq!(arr[0], Value::String("val1".to_string()));
                assert_eq!(arr[1], Value::String("val2".to_string()));
            }
            _ => panic!("Expected array for multi-value header"),
        }

        // Empty value should still be a string
        assert_eq!(headers_json["empty-value"], Value::String("".to_string()));
    }

    #[sqlx::test]
    async fn test_timestamp_filtering(pool: PgPool) {
        // Run migrations first
        crate::migrator().run(&pool).await.unwrap();

        let handler = PostgresHandler::<Value, Value>::from_pool(pool.clone())
            .await
            .unwrap();
        let repository = handler.repository();

        let base_time = SystemTime::UNIX_EPOCH + Duration::from_secs(1_600_000_000); // Sept 2020

        // Insert requests at different times
        let times = [
            base_time + Duration::from_secs(0),    // correlation_id 4001
            base_time + Duration::from_secs(3600), // correlation_id 4002 (1 hour later)
            base_time + Duration::from_secs(7200), // correlation_id 4003 (2 hours later)
        ];

        for (i, timestamp) in times.iter().enumerate() {
            let correlation_id = 4001 + i as u64;
            let request_data = RequestData {
                method: http::Method::GET,
                uri: "/test".parse().unwrap(),
                headers: HashMap::new(),
                body: None,
                timestamp: *timestamp,
                correlation_id,
            };

            handler.handle_request(request_data).await;
        }

        // Test timestamp_after filter
        let after_time: DateTime<Utc> = (base_time + Duration::from_secs(1800)).into(); // 30 min after first
        let filter = RequestFilter {
            timestamp_after: Some(after_time),
            ..Default::default()
        };
        let results = repository.query(filter).await.unwrap();
        assert_eq!(results.len(), 2); // Should get 4002 and 4003

        // Test timestamp_before filter
        let before_time: DateTime<Utc> = (base_time + Duration::from_secs(5400)).into(); // 1.5 hours after first
        let filter = RequestFilter {
            timestamp_before: Some(before_time),
            ..Default::default()
        };
        let results = repository.query(filter).await.unwrap();
        assert_eq!(results.len(), 2); // Should get 4001 and 4002

        // Test timestamp range
        let filter = RequestFilter {
            timestamp_after: Some(after_time),
            timestamp_before: Some(before_time),
            ..Default::default()
        };
        let results = repository.query(filter).await.unwrap();
        assert_eq!(results.len(), 1); // Should get only 4002
        assert_eq!(results[0].request.correlation_id, 4002);
    }

    #[sqlx::test]
    async fn test_path_filtering_allowed_prefix(pool: PgPool) {
        // Run migrations first
        crate::migrator().run(&pool).await.unwrap();

        let handler = PostgresHandler::<Value, Value>::from_pool(pool.clone())
            .await
            .unwrap()
            .with_path_prefix("/api/");
        let repository = handler.repository();

        // Test requests - some should be logged, some shouldn't
        let test_cases = vec![
            ("/api/users", 1001, true),  // Should be logged
            ("/api/orders", 1002, true), // Should be logged
            ("/health", 1003, false),    // Should NOT be logged
            ("/metrics", 1004, false),   // Should NOT be logged
            ("/api/health", 1005, true), // Should be logged (starts with /api/)
        ];

        for (uri, correlation_id, _should_log) in &test_cases {
            let mut headers = HashMap::new();
            headers.insert("content-type".to_string(), vec!["application/json".into()]);

            let request_data = RequestData {
                method: http::Method::GET,
                uri: uri.parse().unwrap(),
                headers: headers.clone(),
                body: Some(Bytes::from(b"{}".to_vec())),
                timestamp: SystemTime::now(),
                correlation_id: *correlation_id,
            };

            let response_data = ResponseData {
                correlation_id: *correlation_id,
                status: http::StatusCode::OK,
                headers,
                body: Some(Bytes::from(b"{}".to_vec())),
                timestamp: SystemTime::now(),
                duration_to_first_byte: Duration::from_millis(80),
                duration: Duration::from_millis(100),
            };

            handler.handle_request(request_data.clone()).await;
            handler.handle_response(request_data, response_data).await;
        }

        // Query all requests and verify filtering worked
        let filter = RequestFilter::default();
        let results = repository.query(filter).await.unwrap();

        // Should only have logged the /api/ requests (1001, 1002, 1005)
        assert_eq!(results.len(), 3);
        let logged_ids: Vec<i64> = results.iter().map(|r| r.request.correlation_id).collect();
        assert!(logged_ids.contains(&1001));
        assert!(logged_ids.contains(&1002));
        assert!(logged_ids.contains(&1005));
        assert!(!logged_ids.contains(&1003));
        assert!(!logged_ids.contains(&1004));

        // All logged requests should have corresponding responses
        for result in &results {
            assert!(result.response.is_some());
        }
    }

    #[sqlx::test]
    async fn test_path_filtering_blocked_prefix(pool: PgPool) {
        // Run migrations first
        crate::migrator().run(&pool).await.unwrap();

        let handler = PostgresHandler::<Value, Value>::from_pool(pool.clone())
            .await
            .unwrap()
            .with_path_filter(PathFilter {
                allowed_prefixes: vec!["/api/".to_string()],
                blocked_prefixes: vec!["/api/health".to_string(), "/api/metrics".to_string()],
            });
        let repository = handler.repository();

        // Test requests
        let test_cases = vec![
            ("http://localhost/api/users", 2001, true), // Should be logged
            ("http://localhost/api/health", 2002, false), // Should be BLOCKED
            ("http://localhost/api/metrics", 2003, false), // Should be BLOCKED
            ("http://localhost/api/orders", 2004, true), // Should be logged
            ("http://localhost/health", 2005, false),   // Not in allowed prefixes
        ];

        for (uri, correlation_id, _should_log) in &test_cases {
            let mut headers = HashMap::new();
            headers.insert("content-type".to_string(), vec!["application/json".into()]);

            let request_data = RequestData {
                method: http::Method::GET,
                uri: uri.parse().unwrap(),
                headers: headers.clone(),
                body: Some(Bytes::from(b"{}".to_vec())),
                timestamp: SystemTime::now(),
                correlation_id: *correlation_id,
            };

            let response_data = ResponseData {
                correlation_id: *correlation_id,
                status: http::StatusCode::OK,
                headers,
                body: Some(Bytes::from(b"{}".to_vec())),
                timestamp: SystemTime::now(),
                duration_to_first_byte: Duration::from_millis(80),
                duration: Duration::from_millis(100),
            };

            handler.handle_request(request_data.clone()).await;
            handler.handle_response(request_data, response_data).await;
        }

        // Should only have logged 2001 and 2004 (blocked prefixes take precedence)
        let filter = RequestFilter::default();
        let results = repository.query(filter).await.unwrap();

        assert_eq!(results.len(), 2);
        let logged_ids: Vec<i64> = results.iter().map(|r| r.request.correlation_id).collect();
        assert!(logged_ids.contains(&2001));
        assert!(logged_ids.contains(&2004));
        assert!(!logged_ids.contains(&2002)); // blocked
        assert!(!logged_ids.contains(&2003)); // blocked
        assert!(!logged_ids.contains(&2005)); // not in allowed prefixes
    }

    #[sqlx::test]
    async fn test_no_path_filtering_logs_everything(pool: PgPool) {
        // Run migrations first
        crate::migrator().run(&pool).await.unwrap();

        // Handler without any path filtering
        let handler = PostgresHandler::<Value, Value>::from_pool(pool.clone())
            .await
            .unwrap();
        let repository = handler.repository();

        let test_uris = vec!["/api/users", "/health", "/metrics", "/random/path"];
        for (i, uri) in test_uris.iter().enumerate() {
            let correlation_id = 3000 + i as u64;
            let mut headers = HashMap::new();
            headers.insert("content-type".to_string(), vec!["application/json".into()]);

            let request_data = RequestData {
                method: http::Method::GET,
                uri: uri.parse().unwrap(),
                headers,
                body: Some(Bytes::from(b"{}".to_vec())),
                timestamp: SystemTime::now(),
                correlation_id,
            };

            handler.handle_request(request_data).await;
        }

        // Should have logged all 4 requests
        let filter = RequestFilter::default();
        let results = repository.query(filter).await.unwrap();
        assert_eq!(results.len(), 4);
    }
}