#[derive(Debug)]
pub struct SerializationError {
pub fallback_data: String,
pub error: Box<dyn std::error::Error + Send + Sync>,
}
impl SerializationError {
pub fn new(
fallback_data: String,
error: impl std::error::Error + Send + Sync + 'static,
) -> Self {
Self {
fallback_data,
error: Box::new(error),
}
}
}
impl std::fmt::Display for SerializationError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "Serialization failed: {}", self.error)
}
}
impl std::error::Error for SerializationError {
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
Some(self.error.as_ref())
}
}
use chrono::{DateTime, Utc};
use metrics::{counter, histogram};
use outlet::{RequestData, RequestHandler, ResponseData};
use serde::{Deserialize, Serialize};
use serde_json::Value;
use sqlx::PgPool;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::{Instant, SystemTime};
use tracing::{debug, error, instrument, warn};
use uuid::Uuid;
pub mod error;
pub mod repository;
pub use error::PostgresHandlerError;
pub use repository::{
HttpRequest, HttpResponse, RequestFilter, RequestRepository, RequestResponsePair,
};
pub use sqlx_pool_router::{DbPools, PoolProvider, TestDbPools};
pub fn migrator() -> sqlx::migrate::Migrator {
sqlx::migrate!("./migrations")
}
type RequestSerializer<T> =
Arc<dyn Fn(&outlet::RequestData) -> Result<T, SerializationError> + Send + Sync>;
type ResponseSerializer<T> = Arc<
dyn Fn(&outlet::RequestData, &outlet::ResponseData) -> Result<T, SerializationError>
+ Send
+ Sync,
>;
#[derive(Clone)]
pub struct PostgresHandler<P = PgPool, TReq = Value, TRes = Value>
where
P: PoolProvider,
TReq: for<'de> Deserialize<'de> + Serialize + Send + Sync + 'static,
TRes: for<'de> Deserialize<'de> + Serialize + Send + Sync + 'static,
{
pool: P,
request_serializer: RequestSerializer<TReq>,
response_serializer: ResponseSerializer<TRes>,
instance_id: Uuid,
}
impl<P, TReq, TRes> PostgresHandler<P, TReq, TRes>
where
P: PoolProvider,
TReq: for<'de> Deserialize<'de> + Serialize + Send + Sync + 'static,
TRes: for<'de> Deserialize<'de> + Serialize + Send + Sync + 'static,
{
fn default_request_serializer() -> RequestSerializer<TReq> {
Arc::new(|request_data| {
let bytes = request_data.body.as_deref().unwrap_or(&[]);
serde_json::from_slice::<TReq>(bytes).map_err(|error| {
let fallback_data = String::from_utf8_lossy(bytes).to_string();
SerializationError::new(fallback_data, error)
})
})
}
fn default_response_serializer() -> ResponseSerializer<TRes> {
Arc::new(|_request_data, response_data| {
let bytes = response_data.body.as_deref().unwrap_or(&[]);
serde_json::from_slice::<TRes>(bytes).map_err(|error| {
let fallback_data = String::from_utf8_lossy(bytes).to_string();
SerializationError::new(fallback_data, error)
})
})
}
pub fn with_request_serializer<F>(mut self, serializer: F) -> Self
where
F: Fn(&outlet::RequestData) -> Result<TReq, SerializationError> + Send + Sync + 'static,
{
self.request_serializer = Arc::new(serializer);
self
}
pub fn with_response_serializer<F>(mut self, serializer: F) -> Self
where
F: Fn(&outlet::RequestData, &outlet::ResponseData) -> Result<TRes, SerializationError>
+ Send
+ Sync
+ 'static,
{
self.response_serializer = Arc::new(serializer);
self
}
pub async fn from_pool_provider(pool_provider: P) -> Result<Self, PostgresHandlerError> {
Ok(Self {
pool: pool_provider,
request_serializer: Self::default_request_serializer(),
response_serializer: Self::default_response_serializer(),
instance_id: Uuid::new_v4(),
})
}
fn headers_to_json(headers: &HashMap<String, Vec<bytes::Bytes>>) -> Value {
let mut header_map = HashMap::new();
for (name, values) in headers {
if values.len() == 1 {
let value_str = String::from_utf8_lossy(&values[0]).to_string();
header_map.insert(name.clone(), Value::String(value_str));
} else {
let value_array: Vec<Value> = values
.iter()
.map(|v| Value::String(String::from_utf8_lossy(v).to_string()))
.collect();
header_map.insert(name.clone(), Value::Array(value_array));
}
}
serde_json::to_value(header_map).unwrap_or(Value::Null)
}
fn request_body_to_json_with_fallback(
&self,
request_data: &outlet::RequestData,
) -> (Value, bool) {
match (self.request_serializer)(request_data) {
Ok(typed_value) => {
if let Ok(json_value) = serde_json::to_value(&typed_value) {
(json_value, true)
} else {
(
Value::String(
serde_json::to_string(&typed_value)
.expect("Serialized value must be convertible to JSON string"),
),
false,
)
}
}
Err(serialization_error) => (Value::String(serialization_error.fallback_data), false),
}
}
fn response_body_to_json_with_fallback(
&self,
request_data: &outlet::RequestData,
response_data: &outlet::ResponseData,
) -> (Value, bool) {
match (self.response_serializer)(request_data, response_data) {
Ok(typed_value) => {
if let Ok(json_value) = serde_json::to_value(&typed_value) {
(json_value, true)
} else {
(
Value::String(
serde_json::to_string(&typed_value)
.expect("Serialized value must be convertible to JSON string"),
),
false,
)
}
}
Err(serialization_error) => (Value::String(serialization_error.fallback_data), false),
}
}
pub fn repository(&self) -> crate::repository::RequestRepository<P, TReq, TRes> {
crate::repository::RequestRepository::new(self.pool.clone())
}
}
impl<TReq, TRes> PostgresHandler<PgPool, TReq, TRes>
where
TReq: for<'de> Deserialize<'de> + Serialize + Send + Sync + 'static,
TRes: for<'de> Deserialize<'de> + Serialize + Send + Sync + 'static,
{
pub async fn new(database_url: &str) -> Result<Self, PostgresHandlerError> {
let pool = PgPool::connect(database_url)
.await
.map_err(PostgresHandlerError::Connection)?;
Ok(Self {
pool,
request_serializer: Self::default_request_serializer(),
response_serializer: Self::default_response_serializer(),
instance_id: Uuid::new_v4(),
})
}
pub async fn from_pool(pool: PgPool) -> Result<Self, PostgresHandlerError> {
Self::from_pool_provider(pool).await
}
}
impl<P, TReq, TRes> RequestHandler for PostgresHandler<P, TReq, TRes>
where
P: PoolProvider,
TReq: for<'de> Deserialize<'de> + Serialize + Send + Sync + 'static,
TRes: for<'de> Deserialize<'de> + Serialize + Send + Sync + 'static,
{
#[instrument(name = "outlet.handle_request", skip(self, data), fields(correlation_id = %data.correlation_id))]
async fn handle_request(&self, data: RequestData) {
let headers_json = Self::headers_to_json(&data.headers);
let (body_json, parsed) = if data.body.is_some() {
let (json, parsed) = self.request_body_to_json_with_fallback(&data);
(Some(json), parsed)
} else {
(None, false)
};
let timestamp: DateTime<Utc> = data.timestamp.into();
let query_start = Instant::now();
let result = sqlx::query(
r#"
INSERT INTO http_requests (instance_id, correlation_id, timestamp, method, uri, headers, body, body_parsed, trace_id, span_id)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
"#,
)
.bind(self.instance_id)
.bind(data.correlation_id as i64)
.bind(timestamp)
.bind(data.method.to_string())
.bind(data.uri.to_string())
.bind(headers_json)
.bind(body_json)
.bind(parsed)
.bind(&data.trace_id)
.bind(&data.span_id)
.execute(self.pool.write())
.await;
let query_duration = query_start.elapsed();
histogram!("outlet_write_duration_seconds", "operation" => "request")
.record(query_duration.as_secs_f64());
if let Err(e) = result {
counter!("outlet_write_errors_total", "operation" => "request").increment(1);
error!(correlation_id = %data.correlation_id, error = %e, "Failed to insert request data");
} else {
let processing_lag_ms = SystemTime::now()
.duration_since(data.timestamp)
.unwrap_or_default()
.as_millis();
if processing_lag_ms > 1000 {
warn!(correlation_id = %data.correlation_id, method = %data.method, uri = %data.uri, lag_ms = %processing_lag_ms, "Request logged (slow)");
} else {
debug!(correlation_id = %data.correlation_id, method = %data.method, uri = %data.uri, lag_ms = %processing_lag_ms, "Request logged");
}
}
}
#[instrument(name = "outlet.handle_response", skip(self, request_data, response_data), fields(correlation_id = %request_data.correlation_id))]
async fn handle_response(&self, request_data: RequestData, response_data: ResponseData) {
let headers_json = Self::headers_to_json(&response_data.headers);
let (body_json, parsed) = if response_data.body.is_some() {
let (json, parsed) =
self.response_body_to_json_with_fallback(&request_data, &response_data);
(Some(json), parsed)
} else {
(None, false)
};
let timestamp: DateTime<Utc> = response_data.timestamp.into();
let duration_ms = response_data.duration.as_millis() as i64;
let duration_to_first_byte_ms = response_data.duration_to_first_byte.as_millis() as i64;
let query_start = Instant::now();
let result = sqlx::query(
r#"
INSERT INTO http_responses (instance_id, correlation_id, timestamp, status_code, headers, body, body_parsed, duration_to_first_byte_ms, duration_ms)
SELECT $1, $2, $3, $4, $5, $6, $7, $8, $9
WHERE EXISTS (SELECT 1 FROM http_requests WHERE instance_id = $1 AND correlation_id = $2)
"#,
)
.bind(self.instance_id)
.bind(request_data.correlation_id as i64)
.bind(timestamp)
.bind(response_data.status.as_u16() as i32)
.bind(headers_json)
.bind(body_json)
.bind(parsed)
.bind(duration_to_first_byte_ms)
.bind(duration_ms)
.execute(self.pool.write())
.await;
let query_duration = query_start.elapsed();
histogram!("outlet_write_duration_seconds", "operation" => "response")
.record(query_duration.as_secs_f64());
match result {
Err(e) => {
counter!("outlet_write_errors_total", "operation" => "response").increment(1);
error!(correlation_id = %request_data.correlation_id, error = %e, "Failed to insert response data");
}
Ok(query_result) => {
if query_result.rows_affected() > 0 {
let processing_lag_ms = SystemTime::now()
.duration_since(response_data.timestamp)
.unwrap_or_default()
.as_millis();
if processing_lag_ms > 1000 {
warn!(correlation_id = %request_data.correlation_id, status = %response_data.status, duration_ms = %duration_ms, lag_ms = %processing_lag_ms, "Response logged (slow)");
} else {
debug!(correlation_id = %request_data.correlation_id, status = %response_data.status, duration_ms = %duration_ms, lag_ms = %processing_lag_ms, "Response logged");
}
} else {
debug!(correlation_id = %request_data.correlation_id, "No matching request found for response, skipping insert")
}
}
}
}
#[instrument(name = "outlet.handle_request_batch", skip(self, batch), fields(batch_size = batch.len()))]
async fn handle_request_batch(&self, batch: &[RequestData]) {
if batch.is_empty() {
return;
}
let len = batch.len();
let mut instance_ids = Vec::with_capacity(len);
let mut correlation_ids = Vec::with_capacity(len);
let mut timestamps = Vec::with_capacity(len);
let mut methods = Vec::with_capacity(len);
let mut uris = Vec::with_capacity(len);
let mut headers_col: Vec<Value> = Vec::with_capacity(len);
let mut bodies: Vec<Option<Value>> = Vec::with_capacity(len);
let mut body_parsed_col = Vec::with_capacity(len);
let mut trace_ids: Vec<Option<String>> = Vec::with_capacity(len);
let mut span_ids: Vec<Option<String>> = Vec::with_capacity(len);
for data in batch {
instance_ids.push(self.instance_id);
correlation_ids.push(data.correlation_id as i64);
timestamps.push(DateTime::<Utc>::from(data.timestamp));
methods.push(data.method.to_string());
uris.push(data.uri.to_string());
headers_col.push(Self::headers_to_json(&data.headers));
let (body_json, parsed) = if data.body.is_some() {
let (json, parsed) = self.request_body_to_json_with_fallback(data);
(Some(json), parsed)
} else {
(None, false)
};
bodies.push(body_json);
body_parsed_col.push(parsed);
trace_ids.push(data.trace_id.clone());
span_ids.push(data.span_id.clone());
}
let query_start = Instant::now();
let result = sqlx::query(
r#"
INSERT INTO http_requests (instance_id, correlation_id, timestamp, method, uri, headers, body, body_parsed, trace_id, span_id)
SELECT * FROM UNNEST($1::uuid[], $2::bigint[], $3::timestamptz[], $4::varchar[], $5::text[], $6::jsonb[], $7::jsonb[], $8::boolean[], $9::varchar[], $10::varchar[])
"#,
)
.bind(&instance_ids)
.bind(&correlation_ids)
.bind(×tamps)
.bind(&methods)
.bind(&uris)
.bind(&headers_col)
.bind(&bodies)
.bind(&body_parsed_col)
.bind(&trace_ids)
.bind(&span_ids)
.execute(self.pool.write())
.await;
let query_duration = query_start.elapsed();
histogram!("outlet_write_duration_seconds", "operation" => "request_batch")
.record(query_duration.as_secs_f64());
match result {
Ok(r) => {
debug!(
rows = r.rows_affected(),
duration_ms = query_duration.as_millis() as u64,
"Request batch inserted"
);
}
Err(e) => {
counter!("outlet_write_errors_total", "operation" => "request_batch").increment(1);
error!(batch_size = len, error = %e, "Failed to bulk insert request batch");
}
}
}
#[instrument(name = "outlet.handle_response_batch", skip(self, batch), fields(batch_size = batch.len()))]
async fn handle_response_batch(&self, batch: &[(RequestData, ResponseData)]) {
if batch.is_empty() {
return;
}
let len = batch.len();
let mut instance_ids = Vec::with_capacity(len);
let mut correlation_ids = Vec::with_capacity(len);
let mut timestamps = Vec::with_capacity(len);
let mut status_codes = Vec::with_capacity(len);
let mut headers_col: Vec<Value> = Vec::with_capacity(len);
let mut bodies: Vec<Option<Value>> = Vec::with_capacity(len);
let mut body_parsed_col = Vec::with_capacity(len);
let mut duration_to_first_byte_ms_col = Vec::with_capacity(len);
let mut duration_ms_col = Vec::with_capacity(len);
for (request_data, response_data) in batch {
instance_ids.push(self.instance_id);
correlation_ids.push(request_data.correlation_id as i64);
timestamps.push(DateTime::<Utc>::from(response_data.timestamp));
status_codes.push(response_data.status.as_u16() as i32);
headers_col.push(Self::headers_to_json(&response_data.headers));
let (body_json, parsed) = if response_data.body.is_some() {
let (json, parsed) =
self.response_body_to_json_with_fallback(request_data, response_data);
(Some(json), parsed)
} else {
(None, false)
};
bodies.push(body_json);
body_parsed_col.push(parsed);
duration_to_first_byte_ms_col
.push(response_data.duration_to_first_byte.as_millis() as i64);
duration_ms_col.push(response_data.duration.as_millis() as i64);
}
let query_start = Instant::now();
let result = sqlx::query(
r#"
INSERT INTO http_responses (instance_id, correlation_id, timestamp, status_code, headers, body, body_parsed, duration_to_first_byte_ms, duration_ms)
SELECT * FROM UNNEST($1::uuid[], $2::bigint[], $3::timestamptz[], $4::int[], $5::jsonb[], $6::jsonb[], $7::boolean[], $8::bigint[], $9::bigint[])
"#,
)
.bind(&instance_ids)
.bind(&correlation_ids)
.bind(×tamps)
.bind(&status_codes)
.bind(&headers_col)
.bind(&bodies)
.bind(&body_parsed_col)
.bind(&duration_to_first_byte_ms_col)
.bind(&duration_ms_col)
.execute(self.pool.write())
.await;
let query_duration = query_start.elapsed();
histogram!("outlet_write_duration_seconds", "operation" => "response_batch")
.record(query_duration.as_secs_f64());
match result {
Ok(r) => {
debug!(
rows = r.rows_affected(),
duration_ms = query_duration.as_millis() as u64,
"Response batch inserted"
);
}
Err(e) => {
counter!("outlet_write_errors_total", "operation" => "response_batch").increment(1);
error!(batch_size = len, error = %e, "Failed to bulk insert response batch");
}
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use bytes::Bytes;
use chrono::{DateTime, Utc};
use outlet::{RequestData, ResponseData};
use serde::{Deserialize, Serialize};
use serde_json::Value;
use sqlx::PgPool;
use std::collections::HashMap;
use std::time::{Duration, SystemTime};
#[derive(Debug, Clone, PartialEq, Deserialize, Serialize)]
struct TestRequest {
user_id: u64,
action: String,
}
#[derive(Debug, Clone, PartialEq, Deserialize, Serialize)]
struct TestResponse {
success: bool,
message: String,
}
fn create_test_request_data() -> RequestData {
let mut headers = HashMap::new();
headers.insert("content-type".to_string(), vec!["application/json".into()]);
headers.insert("user-agent".to_string(), vec!["test-client/1.0".into()]);
let test_req = TestRequest {
user_id: 123,
action: "create_user".to_string(),
};
let body = serde_json::to_vec(&test_req).unwrap();
RequestData {
method: http::Method::POST,
uri: http::Uri::from_static("/api/users"),
headers,
body: Some(Bytes::from(body)),
timestamp: SystemTime::now(),
correlation_id: 0,
trace_id: None,
span_id: None,
}
}
fn create_test_response_data() -> ResponseData {
let mut headers = HashMap::new();
headers.insert("content-type".to_string(), vec!["application/json".into()]);
let test_res = TestResponse {
success: true,
message: "User created successfully".to_string(),
};
let body = serde_json::to_vec(&test_res).unwrap();
ResponseData {
status: http::StatusCode::CREATED,
headers,
body: Some(Bytes::from(body)),
timestamp: SystemTime::now(),
duration_to_first_byte: Duration::from_millis(100),
duration: Duration::from_millis(150),
correlation_id: 0,
}
}
#[sqlx::test]
async fn test_handler_creation(pool: PgPool) {
crate::migrator().run(&pool).await.unwrap();
let handler = PostgresHandler::<PgPool, TestRequest, TestResponse>::from_pool(pool.clone())
.await
.unwrap();
let repository = handler.repository();
let filter = RequestFilter::default();
let results = repository.query(filter).await.unwrap();
assert!(results.is_empty());
}
#[sqlx::test]
async fn test_handle_request_with_typed_body(pool: PgPool) {
crate::migrator().run(&pool).await.unwrap();
let handler = PostgresHandler::<PgPool, TestRequest, TestResponse>::from_pool(pool.clone())
.await
.unwrap();
let repository = handler.repository();
let mut request_data = create_test_request_data();
let correlation_id = 12345;
request_data.correlation_id = correlation_id;
handler.handle_request(request_data.clone()).await;
let filter = RequestFilter {
correlation_id: Some(correlation_id as i64),
..Default::default()
};
let results = repository.query(filter).await.unwrap();
assert_eq!(results.len(), 1);
let pair = &results[0];
assert_eq!(pair.request.correlation_id, correlation_id as i64);
assert_eq!(pair.request.method, "POST");
assert_eq!(pair.request.uri, "/api/users");
match &pair.request.body {
Some(Ok(parsed_body)) => {
assert_eq!(
*parsed_body,
TestRequest {
user_id: 123,
action: "create_user".to_string(),
}
);
}
_ => panic!("Expected successfully parsed request body"),
}
let headers_value = &pair.request.headers;
assert!(headers_value.get("content-type").is_some());
assert!(headers_value.get("user-agent").is_some());
assert!(pair.response.is_none());
}
#[sqlx::test]
async fn test_handle_response_with_typed_body(pool: PgPool) {
crate::migrator().run(&pool).await.unwrap();
let handler = PostgresHandler::<PgPool, TestRequest, TestResponse>::from_pool(pool.clone())
.await
.unwrap();
let repository = handler.repository();
let mut request_data = create_test_request_data();
let mut response_data = create_test_response_data();
let correlation_id = 54321;
request_data.correlation_id = correlation_id;
response_data.correlation_id = correlation_id;
handler.handle_request(request_data.clone()).await;
handler
.handle_response(request_data, response_data.clone())
.await;
let filter = RequestFilter {
correlation_id: Some(correlation_id as i64),
..Default::default()
};
let results = repository.query(filter).await.unwrap();
assert_eq!(results.len(), 1);
let pair = &results[0];
let response = pair.response.as_ref().expect("Response should be present");
assert_eq!(response.correlation_id, correlation_id as i64);
assert_eq!(response.status_code, 201);
assert_eq!(response.duration_ms, 150);
match &response.body {
Some(Ok(parsed_body)) => {
assert_eq!(
*parsed_body,
TestResponse {
success: true,
message: "User created successfully".to_string(),
}
);
}
_ => panic!("Expected successfully parsed response body"),
}
}
#[sqlx::test]
async fn test_handle_unparseable_body_fallback(pool: PgPool) {
crate::migrator().run(&pool).await.unwrap();
let handler = PostgresHandler::<PgPool, TestRequest, TestResponse>::from_pool(pool.clone())
.await
.unwrap();
let repository = handler.repository();
let mut headers = HashMap::new();
headers.insert("content-type".to_string(), vec!["text/plain".into()]);
let invalid_json_body = b"not valid json for TestRequest";
let correlation_id = 99999;
let request_data = RequestData {
method: http::Method::POST,
uri: http::Uri::from_static("/api/test"),
headers,
body: Some(Bytes::from(invalid_json_body.to_vec())),
timestamp: SystemTime::now(),
correlation_id,
trace_id: None,
span_id: None,
};
handler.handle_request(request_data).await;
let filter = RequestFilter {
correlation_id: Some(correlation_id as i64),
..Default::default()
};
let results = repository.query(filter).await.unwrap();
assert_eq!(results.len(), 1);
let pair = &results[0];
match &pair.request.body {
Some(Err(raw_bytes)) => {
assert_eq!(raw_bytes.as_ref(), invalid_json_body);
}
_ => panic!("Expected raw bytes fallback for unparseable body"),
}
}
#[sqlx::test]
async fn test_query_with_multiple_filters(pool: PgPool) {
crate::migrator().run(&pool).await.unwrap();
let handler = PostgresHandler::<PgPool, Value, Value>::from_pool(pool.clone())
.await
.unwrap();
let repository = handler.repository();
let test_cases = vec![
(1001, "GET", "/api/users", 200, 100),
(1002, "POST", "/api/users", 201, 150),
(1003, "GET", "/api/orders", 404, 50),
(1004, "PUT", "/api/users/123", 200, 300),
];
for (correlation_id, method, uri, status, duration_ms) in test_cases {
let mut headers = HashMap::new();
headers.insert("content-type".to_string(), vec!["application/json".into()]);
let request_data = RequestData {
method: method.parse().unwrap(),
uri: uri.parse().unwrap(),
headers: headers.clone(),
body: Some(Bytes::from(b"{}".to_vec())),
timestamp: SystemTime::now(),
correlation_id,
trace_id: None,
span_id: None,
};
let response_data = ResponseData {
correlation_id,
status: http::StatusCode::from_u16(status).unwrap(),
headers,
body: Some(Bytes::from(b"{}".to_vec())),
timestamp: SystemTime::now(),
duration_to_first_byte: Duration::from_millis(duration_ms / 2),
duration: Duration::from_millis(duration_ms),
};
handler.handle_request(request_data.clone()).await;
handler.handle_response(request_data, response_data).await;
}
let filter = RequestFilter {
method: Some("GET".to_string()),
..Default::default()
};
let results = repository.query(filter).await.unwrap();
assert_eq!(results.len(), 2);
let filter = RequestFilter {
status_code: Some(200),
..Default::default()
};
let results = repository.query(filter).await.unwrap();
assert_eq!(results.len(), 2);
let filter = RequestFilter {
uri_pattern: Some("/api/users%".to_string()),
..Default::default()
};
let results = repository.query(filter).await.unwrap();
assert_eq!(results.len(), 3);
let filter = RequestFilter {
min_duration_ms: Some(100),
max_duration_ms: Some(200),
..Default::default()
};
let results = repository.query(filter).await.unwrap();
assert_eq!(results.len(), 2);
let filter = RequestFilter {
method: Some("GET".to_string()),
status_code: Some(200),
..Default::default()
};
let results = repository.query(filter).await.unwrap();
assert_eq!(results.len(), 1); assert_eq!(results[0].request.correlation_id, 1001);
}
#[sqlx::test]
async fn test_query_with_pagination_and_ordering(pool: PgPool) {
crate::migrator().run(&pool).await.unwrap();
let handler = PostgresHandler::<PgPool, Value, Value>::from_pool(pool.clone())
.await
.unwrap();
let repository = handler.repository();
let now = SystemTime::now();
for i in 0..5 {
let correlation_id = 2000 + i;
let timestamp = now + Duration::from_secs(i * 10);
let mut headers = HashMap::new();
headers.insert("x-test-id".to_string(), vec![i.to_string().into()]);
let request_data = RequestData {
method: http::Method::GET,
uri: "/api/test".parse().unwrap(),
headers,
body: Some(Bytes::from(format!("{{\"id\": {i}}}").into_bytes())),
timestamp,
correlation_id,
trace_id: None,
span_id: None,
};
handler.handle_request(request_data).await;
}
let filter = RequestFilter {
limit: Some(3),
..Default::default()
};
let results = repository.query(filter).await.unwrap();
assert_eq!(results.len(), 3);
for i in 0..2 {
assert!(results[i].request.timestamp <= results[i + 1].request.timestamp);
}
let filter = RequestFilter {
order_by_timestamp_desc: true,
limit: Some(2),
offset: Some(1),
..Default::default()
};
let results = repository.query(filter).await.unwrap();
assert_eq!(results.len(), 2);
assert!(results[0].request.timestamp >= results[1].request.timestamp);
}
#[sqlx::test]
async fn test_headers_conversion(pool: PgPool) {
crate::migrator().run(&pool).await.unwrap();
let handler = PostgresHandler::<PgPool, Value, Value>::from_pool(pool.clone())
.await
.unwrap();
let repository = handler.repository();
let mut headers = HashMap::new();
headers.insert("single-value".to_string(), vec!["test".into()]);
headers.insert(
"multi-value".to_string(),
vec!["val1".into(), "val2".into()],
);
headers.insert("empty-value".to_string(), vec!["".into()]);
let request_data = RequestData {
correlation_id: 3000,
method: http::Method::GET,
uri: "/test".parse().unwrap(),
headers,
body: None,
timestamp: SystemTime::now(),
trace_id: None,
span_id: None,
};
let correlation_id = 3000;
handler.handle_request(request_data).await;
let filter = RequestFilter {
correlation_id: Some(correlation_id as i64),
..Default::default()
};
let results = repository.query(filter).await.unwrap();
assert_eq!(results.len(), 1);
let headers_json = &results[0].request.headers;
assert_eq!(
headers_json["single-value"],
Value::String("test".to_string())
);
match &headers_json["multi-value"] {
Value::Array(arr) => {
assert_eq!(arr.len(), 2);
assert_eq!(arr[0], Value::String("val1".to_string()));
assert_eq!(arr[1], Value::String("val2".to_string()));
}
_ => panic!("Expected array for multi-value header"),
}
assert_eq!(headers_json["empty-value"], Value::String("".to_string()));
}
#[sqlx::test]
async fn test_timestamp_filtering(pool: PgPool) {
crate::migrator().run(&pool).await.unwrap();
let handler = PostgresHandler::<PgPool, Value, Value>::from_pool(pool.clone())
.await
.unwrap();
let repository = handler.repository();
let base_time = SystemTime::UNIX_EPOCH + Duration::from_secs(1_600_000_000);
let times = [
base_time + Duration::from_secs(0), base_time + Duration::from_secs(3600), base_time + Duration::from_secs(7200), ];
for (i, timestamp) in times.iter().enumerate() {
let correlation_id = 4001 + i as u64;
let request_data = RequestData {
method: http::Method::GET,
uri: "/test".parse().unwrap(),
headers: HashMap::new(),
body: None,
timestamp: *timestamp,
correlation_id,
trace_id: None,
span_id: None,
};
handler.handle_request(request_data).await;
}
let after_time: DateTime<Utc> = (base_time + Duration::from_secs(1800)).into(); let filter = RequestFilter {
timestamp_after: Some(after_time),
..Default::default()
};
let results = repository.query(filter).await.unwrap();
assert_eq!(results.len(), 2);
let before_time: DateTime<Utc> = (base_time + Duration::from_secs(5400)).into(); let filter = RequestFilter {
timestamp_before: Some(before_time),
..Default::default()
};
let results = repository.query(filter).await.unwrap();
assert_eq!(results.len(), 2);
let filter = RequestFilter {
timestamp_after: Some(after_time),
timestamp_before: Some(before_time),
..Default::default()
};
let results = repository.query(filter).await.unwrap();
assert_eq!(results.len(), 1); assert_eq!(results[0].request.correlation_id, 4002);
}
#[sqlx::test]
async fn test_no_path_filtering_logs_everything(pool: PgPool) {
crate::migrator().run(&pool).await.unwrap();
let handler = PostgresHandler::<PgPool, Value, Value>::from_pool(pool.clone())
.await
.unwrap();
let repository = handler.repository();
let test_uris = ["/api/users", "/health", "/metrics", "/random/path"];
for (i, uri) in test_uris.iter().enumerate() {
let correlation_id = 3000 + i as u64;
let mut headers = HashMap::new();
headers.insert("content-type".to_string(), vec!["application/json".into()]);
let request_data = RequestData {
method: http::Method::GET,
uri: uri.parse().unwrap(),
headers,
body: Some(Bytes::from(b"{}".to_vec())),
timestamp: SystemTime::now(),
correlation_id,
trace_id: None,
span_id: None,
};
handler.handle_request(request_data).await;
}
let filter = RequestFilter::default();
let results = repository.query(filter).await.unwrap();
assert_eq!(results.len(), 4);
}
#[sqlx::test]
async fn test_write_operations_use_write_pool(pool: PgPool) {
crate::migrator().run(&pool).await.unwrap();
let test_pools = crate::TestDbPools::new(pool).await.unwrap();
let handler = PostgresHandler::<_, Value, Value>::from_pool_provider(test_pools.clone())
.await
.unwrap();
let mut request_data = create_test_request_data();
let correlation_id = 5001;
request_data.correlation_id = correlation_id;
handler.handle_request(request_data.clone()).await;
let count: i64 =
sqlx::query_scalar("SELECT COUNT(*) FROM http_requests WHERE correlation_id = $1")
.bind(correlation_id as i64)
.fetch_one(test_pools.write())
.await
.unwrap();
assert_eq!(count, 1, "Request should be written to primary pool");
}
#[sqlx::test]
async fn test_response_write_uses_write_pool(pool: PgPool) {
crate::migrator().run(&pool).await.unwrap();
let test_pools = crate::TestDbPools::new(pool).await.unwrap();
let handler = PostgresHandler::<_, Value, Value>::from_pool_provider(test_pools.clone())
.await
.unwrap();
let mut request_data = create_test_request_data();
let mut response_data = create_test_response_data();
let correlation_id = 5002;
request_data.correlation_id = correlation_id;
response_data.correlation_id = correlation_id;
handler.handle_request(request_data.clone()).await;
handler.handle_response(request_data, response_data).await;
let count: i64 =
sqlx::query_scalar("SELECT COUNT(*) FROM http_responses WHERE correlation_id = $1")
.bind(correlation_id as i64)
.fetch_one(test_pools.write())
.await
.unwrap();
assert_eq!(count, 1, "Response should be written to primary pool");
}
#[sqlx::test]
async fn test_repository_queries_use_read_pool(pool: PgPool) {
crate::migrator().run(&pool).await.unwrap();
let test_pools = crate::TestDbPools::new(pool).await.unwrap();
let handler = PostgresHandler::<_, Value, Value>::from_pool_provider(test_pools.clone())
.await
.unwrap();
let mut request_data = create_test_request_data();
let correlation_id = 5003;
request_data.correlation_id = correlation_id;
handler.handle_request(request_data).await;
let repository = handler.repository();
let filter = RequestFilter {
correlation_id: Some(correlation_id as i64),
..Default::default()
};
let results = repository.query(filter).await.unwrap();
assert_eq!(results.len(), 1);
assert_eq!(results[0].request.correlation_id, correlation_id as i64);
}
#[sqlx::test]
async fn test_replica_pool_rejects_writes(pool: PgPool) {
crate::migrator().run(&pool).await.unwrap();
let test_pools = crate::TestDbPools::new(pool).await.unwrap();
let result = sqlx::query("INSERT INTO http_requests (instance_id, correlation_id, timestamp, method, uri, headers, body, body_parsed) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)")
.bind(Uuid::new_v4())
.bind(9999i64)
.bind(Utc::now())
.bind("GET")
.bind("/test")
.bind(serde_json::json!({}))
.bind(None::<Value>)
.bind(false)
.execute(test_pools.read())
.await;
assert!(
result.is_err(),
"Replica pool should reject write operations"
);
let err = result.unwrap_err();
let err_msg = err.to_string().to_lowercase();
assert!(
err_msg.contains("read-only") || err_msg.contains("read only"),
"Error should mention read-only: {}",
err
);
}
#[sqlx::test]
async fn test_full_request_response_cycle_with_read_write_separation(pool: PgPool) {
crate::migrator().run(&pool).await.unwrap();
let test_pools = crate::TestDbPools::new(pool).await.unwrap();
let handler =
PostgresHandler::<_, TestRequest, TestResponse>::from_pool_provider(test_pools)
.await
.unwrap();
let mut request_data = create_test_request_data();
let mut response_data = create_test_response_data();
let correlation_id = 5004;
request_data.correlation_id = correlation_id;
response_data.correlation_id = correlation_id;
handler.handle_request(request_data.clone()).await;
handler.handle_response(request_data, response_data).await;
let repository = handler.repository();
let filter = RequestFilter {
correlation_id: Some(correlation_id as i64),
..Default::default()
};
let results = repository.query(filter).await.unwrap();
assert_eq!(results.len(), 1);
let pair = &results[0];
assert_eq!(pair.request.correlation_id, correlation_id as i64);
assert_eq!(pair.request.method, "POST");
assert_eq!(pair.request.uri, "/api/users");
let response = pair.response.as_ref().expect("Response should exist");
assert_eq!(response.correlation_id, correlation_id as i64);
assert_eq!(response.status_code, 201);
match &pair.request.body {
Some(Ok(parsed_body)) => {
assert_eq!(
*parsed_body,
TestRequest {
user_id: 123,
action: "create_user".to_string(),
}
);
}
_ => panic!("Expected successfully parsed request body"),
}
match &response.body {
Some(Ok(parsed_body)) => {
assert_eq!(
*parsed_body,
TestResponse {
success: true,
message: "User created successfully".to_string(),
}
);
}
_ => panic!("Expected successfully parsed response body"),
}
}
#[sqlx::test]
async fn test_request_batch_insert(pool: PgPool) {
crate::migrator().run(&pool).await.unwrap();
let handler = PostgresHandler::<PgPool, TestRequest, TestResponse>::from_pool(pool.clone())
.await
.unwrap();
let mut batch = Vec::new();
for i in 0..5 {
let mut req = create_test_request_data();
req.correlation_id = 1000 + i;
req.uri = format!("/api/batch/{i}").parse().unwrap();
batch.push(req);
}
handler.handle_request_batch(&batch).await;
let count: (i64,) = sqlx::query_as(
"SELECT COUNT(*) FROM http_requests WHERE correlation_id BETWEEN 1000 AND 1004",
)
.fetch_one(&pool)
.await
.unwrap();
assert_eq!(count.0, 5);
}
#[sqlx::test]
async fn test_response_batch_insert(pool: PgPool) {
crate::migrator().run(&pool).await.unwrap();
let handler = PostgresHandler::<PgPool, TestRequest, TestResponse>::from_pool(pool.clone())
.await
.unwrap();
let mut pairs = Vec::new();
for i in 0..3 {
let mut req = create_test_request_data();
req.correlation_id = 2000 + i;
handler.handle_request(req.clone()).await;
let mut res = create_test_response_data();
res.correlation_id = 2000 + i;
pairs.push((req, res));
}
handler.handle_response_batch(&pairs).await;
let count: (i64,) = sqlx::query_as(
"SELECT COUNT(*) FROM http_responses WHERE correlation_id BETWEEN 2000 AND 2002",
)
.fetch_one(&pool)
.await
.unwrap();
assert_eq!(count.0, 3);
}
#[sqlx::test]
async fn test_batch_with_mixed_bodies(pool: PgPool) {
crate::migrator().run(&pool).await.unwrap();
let handler = PostgresHandler::<PgPool, TestRequest, TestResponse>::from_pool(pool.clone())
.await
.unwrap();
let mut batch = Vec::new();
let mut req_with_body = create_test_request_data();
req_with_body.correlation_id = 3000;
batch.push(req_with_body);
let mut req_no_body = create_test_request_data();
req_no_body.correlation_id = 3001;
req_no_body.body = None;
batch.push(req_no_body);
let mut req_bad_body = create_test_request_data();
req_bad_body.correlation_id = 3002;
req_bad_body.body = Some(Bytes::from("not valid json"));
batch.push(req_bad_body);
handler.handle_request_batch(&batch).await;
let count: (i64,) = sqlx::query_as(
"SELECT COUNT(*) FROM http_requests WHERE correlation_id BETWEEN 3000 AND 3002",
)
.fetch_one(&pool)
.await
.unwrap();
assert_eq!(count.0, 3);
let rows: Vec<(i64, Option<bool>)> = sqlx::query_as(
"SELECT correlation_id, body_parsed FROM http_requests WHERE correlation_id BETWEEN 3000 AND 3002 ORDER BY correlation_id",
)
.fetch_all(&pool)
.await
.unwrap();
assert_eq!(rows[0].1, Some(true)); assert_eq!(rows[1].1, Some(false)); assert_eq!(rows[2].1, Some(false)); }
#[sqlx::test]
async fn test_empty_batch_is_noop(pool: PgPool) {
crate::migrator().run(&pool).await.unwrap();
let handler = PostgresHandler::<PgPool, TestRequest, TestResponse>::from_pool(pool.clone())
.await
.unwrap();
handler.handle_request_batch(&[]).await;
handler.handle_response_batch(&[]).await;
}
#[sqlx::test]
async fn test_batch_write_uses_write_pool(pool: PgPool) {
use sqlx_pool_router::TestDbPools;
crate::migrator().run(&pool).await.unwrap();
let test_pools = TestDbPools::new(pool).await.unwrap();
let handler =
PostgresHandler::<TestDbPools, TestRequest, TestResponse>::from_pool_provider(
test_pools,
)
.await
.unwrap();
let mut req = create_test_request_data();
req.correlation_id = 4000;
handler.handle_request_batch(&[req.clone()]).await;
let res = create_test_response_data();
handler.handle_response_batch(&[(req, res)]).await;
}
}