#![allow(clippy::unwrap_used)] #![allow(clippy::missing_panics_doc)] #![allow(missing_docs)]
use std::sync::Arc;
use fraiseql_server::usage::{
aggregator::{PostgresBackend, UsageAggregator},
events::MutationAuditEvent,
};
use sqlx::PgPool;
use testcontainers::runners::AsyncRunner;
use testcontainers_modules::postgres::Postgres;
/// Starts a default `Postgres` test container and connects a [`PgPool`] to it.
///
/// The connection goes to `127.0.0.1` on whichever host port is mapped to the
/// container's port 5432, using the `postgres`/`postgres` credentials and the
/// `postgres` database.
///
/// Returns the pool together with the container handle, type-erased as
/// `impl Any`. Callers should keep the handle bound for as long as they use
/// the pool — presumably the container is torn down when the handle is
/// dropped (TODO confirm against the testcontainers docs).
///
/// # Panics
///
/// Panics if the container cannot be started, the mapped port cannot be
/// resolved, or the pool cannot connect.
async fn setup_pg() -> (PgPool, impl std::any::Any) {
    let pg = Postgres::default().start().await.unwrap();
    let port = pg.get_host_port_ipv4(5432).await.unwrap();
    let pool = PgPool::connect(&format!(
        "postgres://postgres:postgres@127.0.0.1:{port}/postgres"
    ))
    .await
    .unwrap();
    (pool, pg)
}
/// Builds a `MutationAuditEvent` describing a `"create"` on `entity`.
///
/// The first constructor argument is derived from the entity as
/// `create_{entity}`; `entity`, the literal `"create"`, `tenant` and `period`
/// are passed through unchanged, in that order.
fn event(tenant: &str, period: &str, entity: &str) -> MutationAuditEvent {
    let name = format!("create_{entity}");
    MutationAuditEvent::new(name, entity, "create", tenant, period)
}
/// Counts flushed by one aggregator are visible to a second aggregator that
/// loads from the same backend.
#[tokio::test]
async fn test_postgres_backend_flush_and_load_round_trip() {
    let (pool, _container) = setup_pg().await;
    let backend = Arc::new(PostgresBackend::new(pool.clone()).await.unwrap());
    let tenant = "acme";
    let period = "2026-05";

    // Writer side: two `User` events and one `Order` event, then persist.
    let writer = UsageAggregator::new_with_backend(backend.clone());
    for entity in ["User", "User", "Order"] {
        writer.record(&event(tenant, period, entity));
    }
    writer.flush_to_backend().await.unwrap();

    // Reader side: a fresh aggregator populated only from the backend.
    let reader = UsageAggregator::new_with_backend(backend.clone());
    reader.load_from_backend().await.unwrap();

    assert_eq!(reader.query(tenant, period).mutations["User"], 2);
    assert_eq!(reader.query(tenant, period).mutations["Order"], 1);
}
/// Flushing twice without recording anything in between must not change the
/// count that a fresh aggregator loads back.
#[tokio::test]
async fn test_postgres_backend_flush_is_idempotent() {
    let (pool, _container) = setup_pg().await;
    let backend = Arc::new(PostgresBackend::new(pool.clone()).await.unwrap());
    let tenant = "t1";
    let period = "2026-05";

    let writer = UsageAggregator::new_with_backend(backend.clone());
    for _ in 0..2 {
        writer.record(&event(tenant, period, "Widget"));
    }

    // First flush persists the two events; the second has nothing new to add.
    writer.flush_to_backend().await.unwrap();
    writer.flush_to_backend().await.unwrap();

    let reader = UsageAggregator::new_with_backend(backend.clone());
    reader.load_from_backend().await.unwrap();

    // Still 2, not 4.
    assert_eq!(reader.query(tenant, period).mutations["Widget"], 2);
}
/// Loading from the backend adds the persisted count to events the
/// aggregator has already recorded but not flushed.
#[tokio::test]
async fn test_postgres_backend_load_merges_with_inflight() {
    let (pool, _container) = setup_pg().await;
    let backend = Arc::new(PostgresBackend::new(pool.clone()).await.unwrap());
    let tenant = "tenant";
    let period = "2026-05";

    // Persist three events through a first aggregator.
    let writer = UsageAggregator::new_with_backend(backend.clone());
    for _ in 0..3 {
        writer.record(&event(tenant, period, "Thing"));
    }
    writer.flush_to_backend().await.unwrap();

    // Record two more on a second aggregator *before* it loads.
    let reader = UsageAggregator::new_with_backend(backend.clone());
    for _ in 0..2 {
        reader.record(&event(tenant, period, "Thing"));
    }
    reader.load_from_backend().await.unwrap();

    // 3 persisted + 2 in-flight.
    assert_eq!(reader.query(tenant, period).mutations["Thing"], 5);
}
/// Counts for different tenants in the same period stay separate across a
/// flush/load cycle.
#[tokio::test]
async fn test_postgres_backend_tenant_isolation() {
    let (pool, _container) = setup_pg().await;
    let backend = Arc::new(PostgresBackend::new(pool.clone()).await.unwrap());
    let period = "2026-05";

    // One `User` event for tenant_a, two for tenant_b.
    let writer = UsageAggregator::new_with_backend(backend.clone());
    for tenant in ["tenant_a", "tenant_b", "tenant_b"] {
        writer.record(&event(tenant, period, "User"));
    }
    writer.flush_to_backend().await.unwrap();

    let reader = UsageAggregator::new_with_backend(backend.clone());
    reader.load_from_backend().await.unwrap();

    assert_eq!(reader.query("tenant_a", period).mutations["User"], 1);
    assert_eq!(reader.query("tenant_b", period).mutations["User"], 2);
}
/// Loading from a backend that nothing has been flushed to succeeds and
/// leaves the aggregator with zero entries.
#[tokio::test]
async fn test_postgres_backend_empty_load_is_noop() {
    let (pool, _container) = setup_pg().await;
    let backend = Arc::new(PostgresBackend::new(pool.clone()).await.unwrap());

    let fresh = UsageAggregator::new_with_backend(backend);
    fresh.load_from_backend().await.unwrap();

    let entries = fresh.entry_count();
    assert_eq!(entries, 0);
}