// outlet_postgres/lib.rs

1//! # outlet-postgres
2//!
3//! PostgreSQL logging handler for the outlet HTTP request/response middleware.
4//! This crate implements the `RequestHandler` trait from outlet to log HTTP
5//! requests and responses to PostgreSQL with JSONB serialization for bodies.
6//!
7//! ## Quick Start
8//!
9//! Basic usage:
10//!
11//! ```rust,no_run
12//! use outlet::{RequestLoggerLayer, RequestLoggerConfig};
13//! use outlet_postgres::PostgresHandler;
14//! use axum::{routing::get, Router};
15//! use tower::ServiceBuilder;
16//!
17//! async fn hello() -> &'static str {
18//!     "Hello, World!"
19//! }
20//!
21//! #[tokio::main]
22//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
23//!     let database_url = "postgresql://user:password@localhost/dbname";
24//!     let handler: PostgresHandler = PostgresHandler::new(database_url).await?;
25//!     let layer = RequestLoggerLayer::new(RequestLoggerConfig::default(), handler);
26//!
27//!     let app = Router::new()
28//!         .route("/hello", get(hello))
29//!         .layer(ServiceBuilder::new().layer(layer));
30//!
31//!     let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await?;
32//!     axum::serve(listener, app).await?;
33//!     Ok(())
34//! }
35//! ```
36//!
37//! ## Features
38//!
39//! - **PostgreSQL Integration**: Uses sqlx for async PostgreSQL operations
40//! - **JSONB Bodies**: Serializes request/response bodies to JSONB fields
41//! - **Type-safe Querying**: Query logged data with typed request/response bodies
42//! - **Correlation**: Links requests and responses via correlation IDs
43//! - **Error Handling**: Graceful error handling with logging
44//! - **Flexible Serialization**: Generic error handling for custom serializer types
45
/// Error type for serialization failures with fallback data.
///
/// When serializers fail to parse request/response bodies into structured types,
/// this error provides both the parsing error details and fallback data that
/// can be stored as a string representation. The fallback is what ends up in
/// the database (with `body_parsed = false`) when parsing fails.
#[derive(Debug)]
pub struct SerializationError {
    /// The fallback representation of the data (e.g., base64-encoded, raw string)
    pub fallback_data: String,
    /// The underlying error that caused serialization to fail
    pub error: Box<dyn std::error::Error + Send + Sync>,
}
58
59impl SerializationError {
60    /// Create a new serialization error with fallback data
61    pub fn new(
62        fallback_data: String,
63        error: impl std::error::Error + Send + Sync + 'static,
64    ) -> Self {
65        Self {
66            fallback_data,
67            error: Box::new(error),
68        }
69    }
70}
71
impl std::fmt::Display for SerializationError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Surface the underlying cause in the user-facing message; the
        // fallback data itself is intentionally not printed (it may be large).
        write!(f, "Serialization failed: {}", self.error)
    }
}
77
impl std::error::Error for SerializationError {
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        // Expose the boxed cause so callers can walk the error chain.
        Some(self.error.as_ref())
    }
}
83
84use chrono::{DateTime, Utc};
85use http::Uri;
86use outlet::{RequestData, RequestHandler, ResponseData};
87use serde::{Deserialize, Serialize};
88use serde_json::Value;
89use sqlx::PgPool;
90use std::collections::HashMap;
91use std::sync::Arc;
92use tracing::{debug, error, instrument};
93use uuid::Uuid;
94
95pub mod error;
96pub mod repository;
97pub use error::PostgresHandlerError;
98pub use repository::{
99    HttpRequest, HttpResponse, RequestFilter, RequestRepository, RequestResponsePair,
100};
101
/// Get the migrator for running outlet-postgres database migrations.
///
/// This returns a SQLx migrator that can be used to set up the required
/// `http_requests` and `http_responses` tables. The consuming application
/// is responsible for running these migrations at the appropriate time
/// and in the appropriate database schema.
///
/// # Examples
///
/// ```rust,no_run
/// use outlet_postgres::migrator;
/// use sqlx::PgPool;
///
/// #[tokio::main]
/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
///     let pool = PgPool::connect("postgresql://user:pass@localhost/db").await?;
///
///     // Run outlet migrations
///     migrator().run(&pool).await?;
///
///     Ok(())
/// }
/// ```
pub fn migrator() -> sqlx::migrate::Migrator {
    // Migrations are embedded at compile time from this crate's ./migrations
    // directory, so no files are needed at runtime.
    sqlx::migrate!("./migrations")
}
128
/// Type alias for request body serializers.
///
/// Request serializers take full request context including headers and body bytes.
/// On failure, they return a `SerializationError` with fallback data.
/// Stored behind `Arc` so the handler (and its serializers) stays cheaply `Clone`.
type RequestSerializer<T> =
    Arc<dyn Fn(&outlet::RequestData) -> Result<T, SerializationError> + Send + Sync>;

/// Type alias for response body serializers.
///
/// Response serializers take both request and response context, allowing them to
/// make parsing decisions based on request details and response headers (e.g., compression).
/// On failure, they return a `SerializationError` with fallback data.
/// Stored behind `Arc` so the handler (and its serializers) stays cheaply `Clone`.
type ResponseSerializer<T> = Arc<
    dyn Fn(&outlet::RequestData, &outlet::ResponseData) -> Result<T, SerializationError>
        + Send
        + Sync,
>;
146
/// PostgreSQL handler for outlet middleware.
///
/// Implements the `RequestHandler` trait to log HTTP requests and responses
/// to PostgreSQL. Request and response bodies are serialized to JSONB fields.
///
/// Generic over `TReq` and `TRes` which represent the request and response body types
/// that should implement `Deserialize` for parsing and `Serialize` for database storage as JSONB.
/// Use `serde_json::Value` for flexible JSON storage, or custom structs for typed storage.
#[derive(Clone)]
pub struct PostgresHandler<TReq = Value, TRes = Value>
where
    TReq: for<'de> Deserialize<'de> + Serialize + Send + Sync + 'static,
    TRes: for<'de> Deserialize<'de> + Serialize + Send + Sync + 'static,
{
    // Connection pool used for all inserts and repository queries.
    pool: PgPool,
    // Parses request bodies into `TReq`; falls back to a raw string on failure.
    request_serializer: RequestSerializer<TReq>,
    // Parses response bodies into `TRes`, with access to the request context.
    response_serializer: ResponseSerializer<TRes>,
    // Optional prefix-based filter; `None` means every request is logged.
    path_filter: Option<PathFilter>,
    // Random id generated per handler instance; written with every row and
    // used to scope correlation ids so different instances don't collide.
    instance_id: Uuid,
}
167
/// Filter configuration for determining which requests to log.
///
/// Matching is plain string-prefix comparison against the request URI path.
#[derive(Clone, Debug)]
pub struct PathFilter {
    /// Only log requests whose URI path starts with any of these prefixes.
    /// If empty, all requests are logged.
    pub allowed_prefixes: Vec<String>,
    /// Skip requests whose URI path starts with any of these prefixes.
    /// Takes precedence over allowed_prefixes.
    pub blocked_prefixes: Vec<String>,
}
178
179impl<TReq, TRes> PostgresHandler<TReq, TRes>
180where
181    TReq: for<'de> Deserialize<'de> + Serialize + Send + Sync + 'static,
182    TRes: for<'de> Deserialize<'de> + Serialize + Send + Sync + 'static,
183{
184    /// Default serializer that attempts serde JSON deserialization.
185    /// On failure, returns a SerializationError with raw bytes as fallback data.
186    fn default_request_serializer() -> RequestSerializer<TReq> {
187        Arc::new(|request_data| {
188            let bytes = request_data.body.as_deref().unwrap_or(&[]);
189            serde_json::from_slice::<TReq>(bytes).map_err(|error| {
190                let fallback_data = String::from_utf8_lossy(bytes).to_string();
191                SerializationError::new(fallback_data, error)
192            })
193        })
194    }
195
196    /// Default serializer that attempts serde JSON deserialization.
197    /// On failure, returns a SerializationError with raw bytes as fallback data.
198    fn default_response_serializer() -> ResponseSerializer<TRes> {
199        Arc::new(|_request_data, response_data| {
200            let bytes = response_data.body.as_deref().unwrap_or(&[]);
201            serde_json::from_slice::<TRes>(bytes).map_err(|error| {
202                let fallback_data = String::from_utf8_lossy(bytes).to_string();
203                SerializationError::new(fallback_data, error)
204            })
205        })
206    }
207
    /// Create a new PostgreSQL handler with a connection pool.
    ///
    /// This will connect to the database but will NOT run migrations.
    /// Use `migrator()` to get a migrator and run migrations separately.
    ///
    /// # Arguments
    ///
    /// * `database_url` - PostgreSQL connection string
    ///
    /// # Errors
    ///
    /// Returns `PostgresHandlerError::Connection` if the pool cannot connect.
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// use outlet_postgres::{PostgresHandler, migrator};
    /// use serde::{Deserialize, Serialize};
    /// use serde_json::Value;
    ///
    /// #[derive(Deserialize, Serialize)]
    /// struct MyBodyType {
    ///     id: u64,
    ///     name: String,
    /// }
    ///
    /// #[tokio::main]
    /// async fn main() -> Result<(), Box<dyn std::error::Error>> {
    ///     // Run migrations first
    ///     let pool = sqlx::PgPool::connect("postgresql://user:pass@localhost/db").await?;
    ///     migrator().run(&pool).await?;
    ///
    ///     // Create handler
    ///     let handler = PostgresHandler::<MyBodyType, MyBodyType>::new("postgresql://user:pass@localhost/db").await?;
    ///     Ok(())
    /// }
    /// ```
    pub async fn new(database_url: &str) -> Result<Self, PostgresHandlerError> {
        let pool = PgPool::connect(database_url)
            .await
            .map_err(PostgresHandlerError::Connection)?;

        // Defaults: JSON serializers, no path filter, fresh per-instance id.
        Ok(Self {
            pool,
            request_serializer: Self::default_request_serializer(),
            response_serializer: Self::default_response_serializer(),
            path_filter: None,
            instance_id: Uuid::new_v4(),
        })
    }
254
    /// Add a custom request body serializer.
    ///
    /// The serializer receives the full `outlet::RequestData` (headers, body bytes, etc.)
    /// and should return a `Result<TReq, SerializationError>`.
    /// If the serializer succeeds, the result will be stored as JSONB and `body_parsed` will be true.
    /// If it fails, the error's `fallback_data` will be stored as a string and `body_parsed` will be false.
    ///
    /// # Panics
    ///
    /// This will panic if the serializer succeeds but the resulting `TReq` value cannot be
    /// converted to JSON via `serde_json::to_value()`. This indicates a bug in the `Serialize`
    /// implementation of `TReq` and should be fixed by the caller.
    pub fn with_request_serializer<F>(mut self, serializer: F) -> Self
    where
        F: Fn(&outlet::RequestData) -> Result<TReq, SerializationError> + Send + Sync + 'static,
    {
        self.request_serializer = Arc::new(serializer);
        self
    }
273
    /// Add a custom response body serializer.
    ///
    /// The serializer receives both the originating `outlet::RequestData` and the
    /// `outlet::ResponseData` and should return a `Result<TRes, SerializationError>`.
    /// If the serializer succeeds, the result will be stored as JSONB and `body_parsed` will be true.
    /// If it fails, the error's `fallback_data` will be stored as a string and `body_parsed` will be false.
    ///
    /// # Panics
    ///
    /// This will panic if the serializer succeeds but the resulting `TRes` value cannot be
    /// converted to JSON via `serde_json::to_value()`. This indicates a bug in the `Serialize`
    /// implementation of `TRes` and should be fixed by the caller.
    pub fn with_response_serializer<F>(mut self, serializer: F) -> Self
    where
        F: Fn(&outlet::RequestData, &outlet::ResponseData) -> Result<TRes, SerializationError>
            + Send
            + Sync
            + 'static,
    {
        self.response_serializer = Arc::new(serializer);
        self
    }
295
    /// Add path filtering to only log requests for specific URI prefixes.
    ///
    /// Blocked prefixes take precedence over allowed prefixes.
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// use outlet_postgres::{PostgresHandler, PathFilter};
    ///
    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
    /// use serde_json::Value;
    /// let handler = PostgresHandler::<Value, Value>::new("postgresql://user:pass@localhost/db")
    ///     .await?
    ///     .with_path_filter(PathFilter {
    ///         allowed_prefixes: vec!["/api/".to_string(), "/webhook/".to_string()],
    ///         blocked_prefixes: vec!["/api/health".to_string()],
    ///     });
    /// # Ok(())
    /// # }
    /// ```
    pub fn with_path_filter(mut self, filter: PathFilter) -> Self {
        self.path_filter = Some(filter);
        self
    }
318
319    /// Add simple path prefix filtering - only log requests starting with the given prefix.
320    ///
321    /// # Examples
322    ///
323    /// ```rust,no_run
324    /// use outlet_postgres::PostgresHandler;
325    ///
326    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
327    /// use serde_json::Value;
328    /// let handler = PostgresHandler::<Value, Value>::new("postgresql://user:pass@localhost/db")
329    ///     .await?
330    ///     .with_path_prefix("/api/");
331    /// # Ok(())
332    /// # }
333    /// ```
334    pub fn with_path_prefix(mut self, prefix: &str) -> Self {
335        self.path_filter = Some(PathFilter {
336            allowed_prefixes: vec![prefix.to_string()],
337            blocked_prefixes: vec![],
338        });
339        self
340    }
341
    /// Create a PostgreSQL handler from an existing connection pool.
    ///
    /// Use this if you already have a connection pool and want to reuse it.
    /// This will NOT run migrations - use `migrator()` to run migrations separately.
    ///
    /// # Arguments
    ///
    /// * `pool` - Existing PostgreSQL connection pool
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// use outlet_postgres::{PostgresHandler, migrator};
    /// use sqlx::PgPool;
    /// use serde::{Deserialize, Serialize};
    ///
    /// #[derive(Deserialize, Serialize)]
    /// struct MyBodyType {
    ///     id: u64,
    ///     name: String,
    /// }
    ///
    /// #[tokio::main]
    /// async fn main() -> Result<(), Box<dyn std::error::Error>> {
    ///     let pool = PgPool::connect("postgresql://user:pass@localhost/db").await?;
    ///
    ///     // Run migrations first
    ///     migrator().run(&pool).await?;
    ///
    ///     // Create handler
    ///     let handler = PostgresHandler::<MyBodyType, MyBodyType>::from_pool(pool).await?;
    ///     Ok(())
    /// }
    /// ```
    pub async fn from_pool(pool: PgPool) -> Result<Self, PostgresHandlerError> {
        // Same defaults as `new`: JSON serializers, no filter, fresh instance id.
        Ok(Self {
            pool,
            request_serializer: Self::default_request_serializer(),
            response_serializer: Self::default_response_serializer(),
            path_filter: None,
            instance_id: Uuid::new_v4(),
        })
    }
385
386    /// Convert headers to a JSONB-compatible format.
387    fn headers_to_json(headers: &HashMap<String, Vec<bytes::Bytes>>) -> Value {
388        let mut header_map = HashMap::new();
389        for (name, values) in headers {
390            if values.len() == 1 {
391                let value_str = String::from_utf8_lossy(&values[0]).to_string();
392                header_map.insert(name.clone(), Value::String(value_str));
393            } else {
394                let value_array: Vec<Value> = values
395                    .iter()
396                    .map(|v| Value::String(String::from_utf8_lossy(v).to_string()))
397                    .collect();
398                header_map.insert(name.clone(), Value::Array(value_array));
399            }
400        }
401        serde_json::to_value(header_map).unwrap_or(Value::Null)
402    }
403
    /// Convert request data to a JSONB value using the configured serializer.
    ///
    /// Returns the JSON value together with a flag: `true` when the body was
    /// parsed into `TReq`, `false` when the serializer's fallback string (or
    /// a stringified value) is stored instead.
    fn request_body_to_json_with_fallback(
        &self,
        request_data: &outlet::RequestData,
    ) -> (Value, bool) {
        match (self.request_serializer)(request_data) {
            Ok(typed_value) => {
                if let Ok(json_value) = serde_json::to_value(&typed_value) {
                    (json_value, true)
                } else {
                    // This should never happen if the type implements Serialize correctly
                    (
                        Value::String(
                            serde_json::to_string(&typed_value)
                                .expect("Serialized value must be convertible to JSON string"),
                        ),
                        false,
                    )
                }
            }
            Err(serialization_error) => (Value::String(serialization_error.fallback_data), false),
        }
    }
427
    /// Convert response data to a JSONB value using the configured serializer.
    ///
    /// Returns the JSON value together with a flag: `true` when the body was
    /// parsed into `TRes`, `false` when the serializer's fallback string (or
    /// a stringified value) is stored instead.
    fn response_body_to_json_with_fallback(
        &self,
        request_data: &outlet::RequestData,
        response_data: &outlet::ResponseData,
    ) -> (Value, bool) {
        match (self.response_serializer)(request_data, response_data) {
            Ok(typed_value) => {
                if let Ok(json_value) = serde_json::to_value(&typed_value) {
                    (json_value, true)
                } else {
                    // This should never happen if the type implements Serialize correctly
                    (
                        Value::String(
                            serde_json::to_string(&typed_value)
                                .expect("Serialized value must be convertible to JSON string"),
                        ),
                        false,
                    )
                }
            }
            Err(serialization_error) => (Value::String(serialization_error.fallback_data), false),
        }
    }
452
453    /// Check if a request URI should be logged based on the configured path filter.
454    fn should_log_request(&self, uri: &Uri) -> bool {
455        let path = uri.path();
456        debug!(%path, "Evaluating prefix");
457        let Some(filter) = &self.path_filter else {
458            return true; // No filter means log everything
459        };
460
461        // Check blocked prefixes first (they take precedence)
462        for blocked_prefix in &filter.blocked_prefixes {
463            if path.starts_with(blocked_prefix) {
464                return false;
465            }
466        }
467
468        // If no allowed prefixes specified, allow everything (after blocked check)
469        if filter.allowed_prefixes.is_empty() {
470            return true;
471        }
472
473        // Check if URI matches any allowed prefix
474        filter
475            .allowed_prefixes
476            .iter()
477            .any(|prefix| path.starts_with(prefix))
478    }
479
    /// Get a repository for querying logged requests and responses.
    ///
    /// Returns a `RequestRepository` with the same type parameters as this handler,
    /// allowing for type-safe querying of request and response bodies.
    /// The repository shares this handler's connection pool (pools are cheap to clone).
    pub fn repository(&self) -> crate::repository::RequestRepository<TReq, TRes> {
        crate::repository::RequestRepository::new(self.pool.clone())
    }
487}
488
489impl<TReq, TRes> RequestHandler for PostgresHandler<TReq, TRes>
490where
491    TReq: for<'de> Deserialize<'de> + Serialize + Send + Sync + 'static,
492    TRes: for<'de> Deserialize<'de> + Serialize + Send + Sync + 'static,
493{
    #[instrument(skip(self, data), fields(correlation_id = %data.correlation_id))]
    async fn handle_request(&self, data: RequestData) {
        // Check if this request should be logged
        if !self.should_log_request(&data.uri) {
            debug!(correlation_id = %data.correlation_id, uri = %data.uri, "Skipping request due to path filter");
            return;
        }

        let headers_json = Self::headers_to_json(&data.headers);
        // Only invoke the serializer when a body is present. `parsed` is false
        // both for absent bodies and for bodies that failed to parse.
        let (body_json, parsed) = if data.body.is_some() {
            let (json, parsed) = self.request_body_to_json_with_fallback(&data);
            (Some(json), parsed)
        } else {
            (None, false)
        };

        let timestamp: DateTime<Utc> = data.timestamp.into();

        let result = sqlx::query(
            r#"
            INSERT INTO http_requests (instance_id, correlation_id, timestamp, method, uri, headers, body, body_parsed)
            VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
            "#,
        )
        .bind(self.instance_id)
        // NOTE(review): unsigned -> i64 `as` cast; a correlation id above
        // i64::MAX would wrap negative. Presumably unreachable — confirm.
        .bind(data.correlation_id as i64)
        .bind(timestamp)
        .bind(data.method.to_string())
        .bind(data.uri.to_string())
        .bind(headers_json)
        .bind(body_json)
        .bind(parsed)
        .execute(&self.pool)
        .await;

        // Insert failures are logged but never propagated: logging must not
        // affect request handling.
        if let Err(e) = result {
            error!(correlation_id = %data.correlation_id, error = %e, "Failed to insert request data");
        } else {
            debug!(correlation_id = %data.correlation_id, "Request data inserted successfully");
        }
    }
535
    #[instrument(skip(self, request_data, response_data), fields(correlation_id = %request_data.correlation_id))]
    async fn handle_response(&self, request_data: RequestData, response_data: ResponseData) {
        // No explicit path-filter check here: the WHERE EXISTS guard in the SQL
        // below only inserts a response when the matching request row was
        // logged, so filtered-out requests drop their responses too.
        let headers_json = Self::headers_to_json(&response_data.headers);
        let (body_json, parsed) = if response_data.body.is_some() {
            let (json, parsed) =
                self.response_body_to_json_with_fallback(&request_data, &response_data);
            (Some(json), parsed)
        } else {
            (None, false)
        };

        let timestamp: DateTime<Utc> = response_data.timestamp.into();
        // `as_millis()` is u128; the `as i64` casts truncate only for
        // astronomically long durations.
        let duration_ms = response_data.duration.as_millis() as i64;
        let duration_to_first_byte_ms = response_data.duration_to_first_byte.as_millis() as i64;

        let result = sqlx::query(
            r#"
            INSERT INTO http_responses (instance_id, correlation_id, timestamp, status_code, headers, body, body_parsed, duration_to_first_byte_ms, duration_ms)
            SELECT $1, $2, $3, $4, $5, $6, $7, $8, $9
            WHERE EXISTS (SELECT 1 FROM http_requests WHERE instance_id = $1 AND correlation_id = $2)
            "#,
        )
        .bind(self.instance_id)
        .bind(request_data.correlation_id as i64)
        .bind(timestamp)
        .bind(response_data.status.as_u16() as i32)
        .bind(headers_json)
        .bind(body_json)
        .bind(parsed)
        .bind(duration_to_first_byte_ms)
        .bind(duration_ms)
        .execute(&self.pool)
        .await;

        match result {
            Err(e) => {
                error!(correlation_id = %request_data.correlation_id, error = %e, "Failed to insert response data");
            }
            Ok(query_result) => {
                // rows_affected == 0 means the request row was never logged
                // (e.g. filtered out), so the response is deliberately skipped.
                if query_result.rows_affected() > 0 {
                    debug!(correlation_id = %request_data.correlation_id, "Response data inserted successfully");
                } else {
                    debug!(correlation_id = %request_data.correlation_id, "No matching request found for response, skipping insert")
                }
            }
        }
    }
583}
584
585#[cfg(test)]
586mod tests {
587    use super::*;
588    use bytes::Bytes;
589    use chrono::{DateTime, Utc};
590    use outlet::{RequestData, ResponseData};
591    use serde::{Deserialize, Serialize};
592    use serde_json::Value;
593    use sqlx::PgPool;
594    use std::collections::HashMap;
595    use std::time::{Duration, SystemTime};
596
    /// Typed request body used by the integration tests below.
    #[derive(Debug, Clone, PartialEq, Deserialize, Serialize)]
    struct TestRequest {
        user_id: u64,
        action: String,
    }
602
    /// Typed response body used by the integration tests below.
    #[derive(Debug, Clone, PartialEq, Deserialize, Serialize)]
    struct TestResponse {
        success: bool,
        message: String,
    }
608
    /// Builds a POST /api/users request whose body is a JSON-encoded
    /// `TestRequest`. `correlation_id` is 0; tests overwrite it as needed.
    fn create_test_request_data() -> RequestData {
        let mut headers = HashMap::new();
        headers.insert("content-type".to_string(), vec!["application/json".into()]);
        headers.insert("user-agent".to_string(), vec!["test-client/1.0".into()]);

        let test_req = TestRequest {
            user_id: 123,
            action: "create_user".to_string(),
        };
        let body = serde_json::to_vec(&test_req).unwrap();

        RequestData {
            method: http::Method::POST,
            uri: http::Uri::from_static("/api/users"),
            headers,
            body: Some(Bytes::from(body)),
            timestamp: SystemTime::now(),
            correlation_id: 0,
        }
    }
629
    /// Builds a 201 Created response whose body is a JSON-encoded
    /// `TestResponse`. `correlation_id` is 0; tests overwrite it as needed.
    fn create_test_response_data() -> ResponseData {
        let mut headers = HashMap::new();
        headers.insert("content-type".to_string(), vec!["application/json".into()]);

        let test_res = TestResponse {
            success: true,
            message: "User created successfully".to_string(),
        };
        let body = serde_json::to_vec(&test_res).unwrap();

        ResponseData {
            status: http::StatusCode::CREATED,
            headers,
            body: Some(Bytes::from(body)),
            timestamp: SystemTime::now(),
            duration_to_first_byte: Duration::from_millis(100),
            duration: Duration::from_millis(150),
            correlation_id: 0,
        }
    }
650
    // Smoke test: a handler built from an existing pool yields a working,
    // initially-empty repository.
    #[sqlx::test]
    async fn test_handler_creation(pool: PgPool) {
        // Run migrations first
        crate::migrator().run(&pool).await.unwrap();

        let handler = PostgresHandler::<TestRequest, TestResponse>::from_pool(pool.clone())
            .await
            .unwrap();

        // Verify we can get a repository
        let repository = handler.repository();

        // Test initial state - no requests logged yet
        let filter = RequestFilter::default();
        let results = repository.query(filter).await.unwrap();
        assert!(results.is_empty());
    }
668
    // Round-trip: a JSON request body is stored as JSONB and read back as a
    // successfully parsed `TestRequest`, with headers preserved and no response.
    #[sqlx::test]
    async fn test_handle_request_with_typed_body(pool: PgPool) {
        // Run migrations first
        crate::migrator().run(&pool).await.unwrap();

        let handler = PostgresHandler::<TestRequest, TestResponse>::from_pool(pool.clone())
            .await
            .unwrap();
        let repository = handler.repository();

        let mut request_data = create_test_request_data();
        let correlation_id = 12345;
        request_data.correlation_id = correlation_id;

        // Handle the request
        handler.handle_request(request_data.clone()).await;

        // Query back the request
        let filter = RequestFilter {
            correlation_id: Some(correlation_id as i64),
            ..Default::default()
        };
        let results = repository.query(filter).await.unwrap();

        assert_eq!(results.len(), 1);
        let pair = &results[0];

        assert_eq!(pair.request.correlation_id, correlation_id as i64);
        assert_eq!(pair.request.method, "POST");
        assert_eq!(pair.request.uri, "/api/users");

        // Check that body was parsed successfully
        match &pair.request.body {
            Some(Ok(parsed_body)) => {
                assert_eq!(
                    *parsed_body,
                    TestRequest {
                        user_id: 123,
                        action: "create_user".to_string(),
                    }
                );
            }
            _ => panic!("Expected successfully parsed request body"),
        }

        // Headers should be converted to JSON properly
        let headers_value = &pair.request.headers;
        assert!(headers_value.get("content-type").is_some());
        assert!(headers_value.get("user-agent").is_some());

        // No response yet
        assert!(pair.response.is_none());
    }
722
    // Round-trip for a full request/response pair: the response row is linked
    // by correlation id and its body parses back into `TestResponse`.
    #[sqlx::test]
    async fn test_handle_response_with_typed_body(pool: PgPool) {
        // Run migrations first
        crate::migrator().run(&pool).await.unwrap();

        let handler = PostgresHandler::<TestRequest, TestResponse>::from_pool(pool.clone())
            .await
            .unwrap();
        let repository = handler.repository();

        let mut request_data = create_test_request_data();
        let mut response_data = create_test_response_data();
        let correlation_id = 54321;
        request_data.correlation_id = correlation_id;
        response_data.correlation_id = correlation_id;

        // Handle both request and response
        handler.handle_request(request_data.clone()).await;
        handler
            .handle_response(request_data, response_data.clone())
            .await;

        // Query back the complete pair
        let filter = RequestFilter {
            correlation_id: Some(correlation_id as i64),
            ..Default::default()
        };
        let results = repository.query(filter).await.unwrap();

        assert_eq!(results.len(), 1);
        let pair = &results[0];

        // Check response data
        let response = pair.response.as_ref().expect("Response should be present");
        assert_eq!(response.correlation_id, correlation_id as i64);
        assert_eq!(response.status_code, 201);
        assert_eq!(response.duration_ms, 150);

        // Check that response body was parsed successfully
        match &response.body {
            Some(Ok(parsed_body)) => {
                assert_eq!(
                    *parsed_body,
                    TestResponse {
                        success: true,
                        message: "User created successfully".to_string(),
                    }
                );
            }
            _ => panic!("Expected successfully parsed response body"),
        }
    }
775
    // A body that fails JSON parsing must still be stored, surfacing through
    // the repository as the raw bytes (the `Err` side of `body`).
    #[sqlx::test]
    async fn test_handle_unparseable_body_fallback(pool: PgPool) {
        // Run migrations first
        crate::migrator().run(&pool).await.unwrap();

        let handler = PostgresHandler::<TestRequest, TestResponse>::from_pool(pool.clone())
            .await
            .unwrap();
        let repository = handler.repository();

        // Create request with invalid JSON for TestRequest
        let mut headers = HashMap::new();
        headers.insert("content-type".to_string(), vec!["text/plain".into()]);

        let invalid_json_body = b"not valid json for TestRequest";
        let correlation_id = 99999;
        let request_data = RequestData {
            method: http::Method::POST,
            uri: http::Uri::from_static("/api/test"),
            headers,
            body: Some(Bytes::from(invalid_json_body.to_vec())),
            timestamp: SystemTime::now(),
            correlation_id,
        };

        handler.handle_request(request_data).await;

        // Query back and verify fallback to the raw (unparsed) body
        let filter = RequestFilter {
            correlation_id: Some(correlation_id as i64),
            ..Default::default()
        };
        let results = repository.query(filter).await.unwrap();

        assert_eq!(results.len(), 1);
        let pair = &results[0];

        // Should fallback to raw bytes
        match &pair.request.body {
            Some(Err(raw_bytes)) => {
                assert_eq!(raw_bytes.as_ref(), invalid_json_body);
            }
            _ => panic!("Expected raw bytes fallback for unparseable body"),
        }
    }
821
822    #[sqlx::test]
823    async fn test_query_with_multiple_filters(pool: PgPool) {
824        // Run migrations first
825        crate::migrator().run(&pool).await.unwrap();
826
827        let handler = PostgresHandler::<Value, Value>::from_pool(pool.clone())
828            .await
829            .unwrap();
830        let repository = handler.repository();
831
832        // Insert multiple requests with different characteristics
833        let test_cases = vec![
834            (1001, "GET", "/api/users", 200, 100),
835            (1002, "POST", "/api/users", 201, 150),
836            (1003, "GET", "/api/orders", 404, 50),
837            (1004, "PUT", "/api/users/123", 200, 300),
838        ];
839
840        for (correlation_id, method, uri, status, duration_ms) in test_cases {
841            let mut headers = HashMap::new();
842            headers.insert("content-type".to_string(), vec!["application/json".into()]);
843
844            let request_data = RequestData {
845                method: method.parse().unwrap(),
846                uri: uri.parse().unwrap(),
847                headers: headers.clone(),
848                body: Some(Bytes::from(b"{}".to_vec())),
849                timestamp: SystemTime::now(),
850                correlation_id,
851            };
852
853            let response_data = ResponseData {
854                correlation_id,
855                status: http::StatusCode::from_u16(status).unwrap(),
856                headers,
857                body: Some(Bytes::from(b"{}".to_vec())),
858                timestamp: SystemTime::now(),
859                duration_to_first_byte: Duration::from_millis(duration_ms / 2),
860                duration: Duration::from_millis(duration_ms),
861            };
862
863            handler.handle_request(request_data.clone()).await;
864            handler.handle_response(request_data, response_data).await;
865        }
866
867        // Test method filter
868        let filter = RequestFilter {
869            method: Some("GET".to_string()),
870            ..Default::default()
871        };
872        let results = repository.query(filter).await.unwrap();
873        assert_eq!(results.len(), 2); // 1001, 1003
874
875        // Test status code filter
876        let filter = RequestFilter {
877            status_code: Some(200),
878            ..Default::default()
879        };
880        let results = repository.query(filter).await.unwrap();
881        assert_eq!(results.len(), 2); // 1001, 1004
882
883        // Test URI pattern filter
884        let filter = RequestFilter {
885            uri_pattern: Some("/api/users%".to_string()),
886            ..Default::default()
887        };
888        let results = repository.query(filter).await.unwrap();
889        assert_eq!(results.len(), 3); // 1001, 1002, 1004
890
891        // Test duration range filter
892        let filter = RequestFilter {
893            min_duration_ms: Some(100),
894            max_duration_ms: Some(200),
895            ..Default::default()
896        };
897        let results = repository.query(filter).await.unwrap();
898        assert_eq!(results.len(), 2); // 1001, 1002
899
900        // Test combined filters
901        let filter = RequestFilter {
902            method: Some("GET".to_string()),
903            status_code: Some(200),
904            ..Default::default()
905        };
906        let results = repository.query(filter).await.unwrap();
907        assert_eq!(results.len(), 1); // Only 1001
908        assert_eq!(results[0].request.correlation_id, 1001);
909    }
910
911    #[sqlx::test]
912    async fn test_query_with_pagination_and_ordering(pool: PgPool) {
913        // Run migrations first
914        crate::migrator().run(&pool).await.unwrap();
915
916        let handler = PostgresHandler::<Value, Value>::from_pool(pool.clone())
917            .await
918            .unwrap();
919        let repository = handler.repository();
920
921        // Insert requests with known timestamps
922        let now = SystemTime::now();
923        for i in 0..5 {
924            let correlation_id = 2000 + i;
925            let timestamp = now + Duration::from_secs(i * 10); // 10 second intervals
926
927            let mut headers = HashMap::new();
928            headers.insert("x-test-id".to_string(), vec![i.to_string().into()]);
929
930            let request_data = RequestData {
931                method: http::Method::GET,
932                uri: "/api/test".parse().unwrap(),
933                headers,
934                body: Some(Bytes::from(format!("{{\"id\": {i}}}").into_bytes())),
935                timestamp,
936                correlation_id,
937            };
938
939            handler.handle_request(request_data).await;
940        }
941
942        // Test default ordering (ASC) with limit
943        let filter = RequestFilter {
944            limit: Some(3),
945            ..Default::default()
946        };
947        let results = repository.query(filter).await.unwrap();
948        assert_eq!(results.len(), 3);
949
950        // Should be in ascending timestamp order
951        for i in 0..2 {
952            assert!(results[i].request.timestamp <= results[i + 1].request.timestamp);
953        }
954
955        // Test descending order with offset
956        let filter = RequestFilter {
957            order_by_timestamp_desc: true,
958            limit: Some(2),
959            offset: Some(1),
960            ..Default::default()
961        };
962        let results = repository.query(filter).await.unwrap();
963        assert_eq!(results.len(), 2);
964
965        // Should be in descending order, skipping the first (newest) one
966        assert!(results[0].request.timestamp >= results[1].request.timestamp);
967    }
968
969    #[sqlx::test]
970    async fn test_headers_conversion(pool: PgPool) {
971        // Run migrations first
972        crate::migrator().run(&pool).await.unwrap();
973
974        let handler = PostgresHandler::<Value, Value>::from_pool(pool.clone())
975            .await
976            .unwrap();
977        let repository = handler.repository();
978
979        // Test various header scenarios
980        let mut headers = HashMap::new();
981        headers.insert("single-value".to_string(), vec!["test".into()]);
982        headers.insert(
983            "multi-value".to_string(),
984            vec!["val1".into(), "val2".into()],
985        );
986        headers.insert("empty-value".to_string(), vec!["".into()]);
987
988        let request_data = RequestData {
989            correlation_id: 3000,
990            method: http::Method::GET,
991            uri: "/test".parse().unwrap(),
992            headers,
993            body: None,
994            timestamp: SystemTime::now(),
995        };
996
997        let correlation_id = 3000;
998        handler.handle_request(request_data).await;
999
1000        let filter = RequestFilter {
1001            correlation_id: Some(correlation_id as i64),
1002            ..Default::default()
1003        };
1004        let results = repository.query(filter).await.unwrap();
1005
1006        assert_eq!(results.len(), 1);
1007        let headers_json = &results[0].request.headers;
1008
1009        // Single value should be stored as string
1010        assert_eq!(
1011            headers_json["single-value"],
1012            Value::String("test".to_string())
1013        );
1014
1015        // Multi-value should be stored as array
1016        match &headers_json["multi-value"] {
1017            Value::Array(arr) => {
1018                assert_eq!(arr.len(), 2);
1019                assert_eq!(arr[0], Value::String("val1".to_string()));
1020                assert_eq!(arr[1], Value::String("val2".to_string()));
1021            }
1022            _ => panic!("Expected array for multi-value header"),
1023        }
1024
1025        // Empty value should still be a string
1026        assert_eq!(headers_json["empty-value"], Value::String("".to_string()));
1027    }
1028
1029    #[sqlx::test]
1030    async fn test_timestamp_filtering(pool: PgPool) {
1031        // Run migrations first
1032        crate::migrator().run(&pool).await.unwrap();
1033
1034        let handler = PostgresHandler::<Value, Value>::from_pool(pool.clone())
1035            .await
1036            .unwrap();
1037        let repository = handler.repository();
1038
1039        let base_time = SystemTime::UNIX_EPOCH + Duration::from_secs(1_600_000_000); // Sept 2020
1040
1041        // Insert requests at different times
1042        let times = [
1043            base_time + Duration::from_secs(0),    // correlation_id 4001
1044            base_time + Duration::from_secs(3600), // correlation_id 4002 (1 hour later)
1045            base_time + Duration::from_secs(7200), // correlation_id 4003 (2 hours later)
1046        ];
1047
1048        for (i, timestamp) in times.iter().enumerate() {
1049            let correlation_id = 4001 + i as u64;
1050            let request_data = RequestData {
1051                method: http::Method::GET,
1052                uri: "/test".parse().unwrap(),
1053                headers: HashMap::new(),
1054                body: None,
1055                timestamp: *timestamp,
1056                correlation_id,
1057            };
1058
1059            handler.handle_request(request_data).await;
1060        }
1061
1062        // Test timestamp_after filter
1063        let after_time: DateTime<Utc> = (base_time + Duration::from_secs(1800)).into(); // 30 min after first
1064        let filter = RequestFilter {
1065            timestamp_after: Some(after_time),
1066            ..Default::default()
1067        };
1068        let results = repository.query(filter).await.unwrap();
1069        assert_eq!(results.len(), 2); // Should get 4002 and 4003
1070
1071        // Test timestamp_before filter
1072        let before_time: DateTime<Utc> = (base_time + Duration::from_secs(5400)).into(); // 1.5 hours after first
1073        let filter = RequestFilter {
1074            timestamp_before: Some(before_time),
1075            ..Default::default()
1076        };
1077        let results = repository.query(filter).await.unwrap();
1078        assert_eq!(results.len(), 2); // Should get 4001 and 4002
1079
1080        // Test timestamp range
1081        let filter = RequestFilter {
1082            timestamp_after: Some(after_time),
1083            timestamp_before: Some(before_time),
1084            ..Default::default()
1085        };
1086        let results = repository.query(filter).await.unwrap();
1087        assert_eq!(results.len(), 1); // Should get only 4002
1088        assert_eq!(results[0].request.correlation_id, 4002);
1089    }
1090
1091    #[sqlx::test]
1092    async fn test_path_filtering_allowed_prefix(pool: PgPool) {
1093        // Run migrations first
1094        crate::migrator().run(&pool).await.unwrap();
1095
1096        let handler = PostgresHandler::<Value, Value>::from_pool(pool.clone())
1097            .await
1098            .unwrap()
1099            .with_path_prefix("/api/");
1100        let repository = handler.repository();
1101
1102        // Test requests - some should be logged, some shouldn't
1103        let test_cases = vec![
1104            ("/api/users", 1001, true),  // Should be logged
1105            ("/api/orders", 1002, true), // Should be logged
1106            ("/health", 1003, false),    // Should NOT be logged
1107            ("/metrics", 1004, false),   // Should NOT be logged
1108            ("/api/health", 1005, true), // Should be logged (starts with /api/)
1109        ];
1110
1111        for (uri, correlation_id, _should_log) in &test_cases {
1112            let mut headers = HashMap::new();
1113            headers.insert("content-type".to_string(), vec!["application/json".into()]);
1114
1115            let request_data = RequestData {
1116                method: http::Method::GET,
1117                uri: uri.parse().unwrap(),
1118                headers: headers.clone(),
1119                body: Some(Bytes::from(b"{}".to_vec())),
1120                timestamp: SystemTime::now(),
1121                correlation_id: *correlation_id,
1122            };
1123
1124            let response_data = ResponseData {
1125                correlation_id: *correlation_id,
1126                status: http::StatusCode::OK,
1127                headers,
1128                body: Some(Bytes::from(b"{}".to_vec())),
1129                timestamp: SystemTime::now(),
1130                duration_to_first_byte: Duration::from_millis(80),
1131                duration: Duration::from_millis(100),
1132            };
1133
1134            handler.handle_request(request_data.clone()).await;
1135            handler.handle_response(request_data, response_data).await;
1136        }
1137
1138        // Query all requests and verify filtering worked
1139        let filter = RequestFilter::default();
1140        let results = repository.query(filter).await.unwrap();
1141
1142        // Should only have logged the /api/ requests (1001, 1002, 1005)
1143        assert_eq!(results.len(), 3);
1144        let logged_ids: Vec<i64> = results.iter().map(|r| r.request.correlation_id).collect();
1145        assert!(logged_ids.contains(&1001));
1146        assert!(logged_ids.contains(&1002));
1147        assert!(logged_ids.contains(&1005));
1148        assert!(!logged_ids.contains(&1003));
1149        assert!(!logged_ids.contains(&1004));
1150
1151        // All logged requests should have corresponding responses
1152        for result in &results {
1153            assert!(result.response.is_some());
1154        }
1155    }
1156
1157    #[sqlx::test]
1158    async fn test_path_filtering_blocked_prefix(pool: PgPool) {
1159        // Run migrations first
1160        crate::migrator().run(&pool).await.unwrap();
1161
1162        let handler = PostgresHandler::<Value, Value>::from_pool(pool.clone())
1163            .await
1164            .unwrap()
1165            .with_path_filter(PathFilter {
1166                allowed_prefixes: vec!["/api/".to_string()],
1167                blocked_prefixes: vec!["/api/health".to_string(), "/api/metrics".to_string()],
1168            });
1169        let repository = handler.repository();
1170
1171        // Test requests
1172        let test_cases = vec![
1173            ("http://localhost/api/users", 2001, true), // Should be logged
1174            ("http://localhost/api/health", 2002, false), // Should be BLOCKED
1175            ("http://localhost/api/metrics", 2003, false), // Should be BLOCKED
1176            ("http://localhost/api/orders", 2004, true), // Should be logged
1177            ("http://localhost/health", 2005, false),   // Not in allowed prefixes
1178        ];
1179
1180        for (uri, correlation_id, _should_log) in &test_cases {
1181            let mut headers = HashMap::new();
1182            headers.insert("content-type".to_string(), vec!["application/json".into()]);
1183
1184            let request_data = RequestData {
1185                method: http::Method::GET,
1186                uri: uri.parse().unwrap(),
1187                headers: headers.clone(),
1188                body: Some(Bytes::from(b"{}".to_vec())),
1189                timestamp: SystemTime::now(),
1190                correlation_id: *correlation_id,
1191            };
1192
1193            let response_data = ResponseData {
1194                correlation_id: *correlation_id,
1195                status: http::StatusCode::OK,
1196                headers,
1197                body: Some(Bytes::from(b"{}".to_vec())),
1198                timestamp: SystemTime::now(),
1199                duration_to_first_byte: Duration::from_millis(80),
1200                duration: Duration::from_millis(100),
1201            };
1202
1203            handler.handle_request(request_data.clone()).await;
1204            handler.handle_response(request_data, response_data).await;
1205        }
1206
1207        // Should only have logged 2001 and 2004 (blocked prefixes take precedence)
1208        let filter = RequestFilter::default();
1209        let results = repository.query(filter).await.unwrap();
1210
1211        assert_eq!(results.len(), 2);
1212        let logged_ids: Vec<i64> = results.iter().map(|r| r.request.correlation_id).collect();
1213        assert!(logged_ids.contains(&2001));
1214        assert!(logged_ids.contains(&2004));
1215        assert!(!logged_ids.contains(&2002)); // blocked
1216        assert!(!logged_ids.contains(&2003)); // blocked
1217        assert!(!logged_ids.contains(&2005)); // not in allowed prefixes
1218    }
1219
1220    #[sqlx::test]
1221    async fn test_no_path_filtering_logs_everything(pool: PgPool) {
1222        // Run migrations first
1223        crate::migrator().run(&pool).await.unwrap();
1224
1225        // Handler without any path filtering
1226        let handler = PostgresHandler::<Value, Value>::from_pool(pool.clone())
1227            .await
1228            .unwrap();
1229        let repository = handler.repository();
1230
1231        let test_uris = vec!["/api/users", "/health", "/metrics", "/random/path"];
1232        for (i, uri) in test_uris.iter().enumerate() {
1233            let correlation_id = 3000 + i as u64;
1234            let mut headers = HashMap::new();
1235            headers.insert("content-type".to_string(), vec!["application/json".into()]);
1236
1237            let request_data = RequestData {
1238                method: http::Method::GET,
1239                uri: uri.parse().unwrap(),
1240                headers,
1241                body: Some(Bytes::from(b"{}".to_vec())),
1242                timestamp: SystemTime::now(),
1243                correlation_id,
1244            };
1245
1246            handler.handle_request(request_data).await;
1247        }
1248
1249        // Should have logged all 4 requests
1250        let filter = RequestFilter::default();
1251        let results = repository.query(filter).await.unwrap();
1252        assert_eq!(results.len(), 4);
1253    }
1254}