use axum::{
extract::{Query, State},
routing::{get, post},
Json, Router,
};
use outlet::{RequestLoggerConfig, RequestLoggerLayer};
use outlet_postgres::{DbPools, PostgresHandler, RequestFilter};
use serde::{Deserialize, Serialize};
use serde_json::{json, Value};
use sqlx::postgres::PgPoolOptions;
use tower::ServiceBuilder;
#[derive(Serialize, Deserialize)]
struct ApiRequest {
action: String,
data: Value,
}
#[derive(Deserialize)]
struct QueryParams {
limit: Option<i64>,
method: Option<String>,
}
async fn handle_api_request(Json(payload): Json<ApiRequest>) -> Json<Value> {
Json(json!({
"status": "success",
"action": payload.action,
"processed": true
}))
}
async fn get_analytics(
State(handler): State<PostgresHandler<DbPools, Value, Value>>,
Query(params): Query<QueryParams>,
) -> Json<Value> {
let repository = handler.repository();
let filter = RequestFilter {
method: params.method,
limit: params.limit.or(Some(10)),
order_by_timestamp_desc: true,
..Default::default()
};
match repository.query(filter).await {
Ok(pairs) => {
let results: Vec<Value> = pairs
.into_iter()
.map(|pair| {
json!({
"correlation_id": pair.request.correlation_id,
"method": pair.request.method,
"uri": pair.request.uri,
"timestamp": pair.request.timestamp,
"status_code": pair.response.as_ref().map(|r| r.status_code),
"duration_ms": pair.response.as_ref().map(|r| r.duration_ms),
})
})
.collect();
Json(json!({
"total": results.len(),
"requests": results
}))
}
Err(e) => Json(json!({
"error": e.to_string(),
"requests": []
})),
}
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
tracing_subscriber::fmt::init();
let primary_url = std::env::var("DATABASE_URL")
.unwrap_or_else(|_| "postgresql://postgres:password@localhost/outlet_demo".to_string());
let replica_url = std::env::var("REPLICA_DATABASE_URL").unwrap_or_else(|_| primary_url.clone());
println!("🔌 Connecting to databases:");
println!(" Primary (write): {}", primary_url);
println!(" Replica (read): {}", replica_url);
let primary_pool = PgPoolOptions::new()
.max_connections(5)
.connect(&primary_url)
.await?;
let replica_pool = PgPoolOptions::new()
.max_connections(10) .connect(&replica_url)
.await?;
outlet_postgres::migrator().run(&primary_pool).await?;
let pools = if primary_url == replica_url {
println!("⚠️ Using single pool (primary and replica are the same)");
DbPools::new(primary_pool)
} else {
println!("✓ Using separate pools for read/write separation");
DbPools::with_replica(primary_pool, replica_pool)
};
let handler = PostgresHandler::from_pool_provider(pools).await?;
let config = RequestLoggerConfig {
capture_request_body: true,
capture_response_body: true,
..Default::default()
};
let layer = RequestLoggerLayer::new(config, handler.clone());
let app = Router::new()
.route("/api/action", post(handle_api_request))
.route("/analytics", get(get_analytics))
.with_state(handler)
.layer(ServiceBuilder::new().layer(layer));
println!();
println!("🚀 Server starting on http://localhost:3000");
println!();
println!("📝 Try these commands:");
println!();
println!(" # Make some API requests (writes to primary):");
println!(" curl -X POST http://localhost:3000/api/action \\");
println!(" -H 'Content-Type: application/json' \\");
println!(" -d '{{\"action\":\"create\",\"data\":{{\"id\":1}}}}'");
println!();
println!(" # Query analytics (reads from replica):");
println!(" curl 'http://localhost:3000/analytics?limit=10'");
println!(" curl 'http://localhost:3000/analytics?method=POST&limit=5'");
println!();
println!("💡 Benefits of read/write separation:");
println!(" - Write operations (logging) don't impact read performance");
println!(" - Read operations (analytics) can scale independently");
println!(" - Replica can have different connection pool size");
println!();
let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await?;
axum::serve(listener, app).await?;
Ok(())
}