#![allow(dead_code, unused_imports)]
use rust_job_queue_api_worker_system::{connect, migrate, PoolConfig};
use sqlx::query::query;
use sqlx_postgres::PgPool;
use testcontainers::{runners::AsyncRunner, ContainerAsync};
use testcontainers_modules::postgres::Postgres;
use tokio::sync::OnceCell;
/// Handle to the Postgres test container that is shared through the
/// process-wide `SHARED` cell.
pub struct Shared {
/// Host port returned by `get_host_port_ipv4(5432)` for the container.
pub port: u16,
// Never read directly; owning the handle here keeps it alive for as long
// as this `Shared` value exists.
_container: ContainerAsync<Postgres>,
}
/// Lazily initialised [`Shared`] instance; starts out empty and is filled in
/// by the first call to [`shared`].
static SHARED: OnceCell<Shared> = OnceCell::const_new();
/// Returns the shared Postgres container, starting it on first use.
///
/// The container is created through the `SHARED` once-cell, so every caller
/// receives a reference to the same [`Shared`] value.
///
/// # Panics
///
/// Panics if the container fails to start or if the host port mapped to
/// container port 5432 cannot be resolved.
pub async fn shared() -> &'static Shared {
    // Initialiser handed to the once-cell: boot a default Postgres image and
    // record which host port it was published on.
    let boot = || async {
        let pg = Postgres::default()
            .start()
            .await
            .expect("start postgres container");
        let host_port = pg
            .get_host_port_ipv4(5432)
            .await
            .expect("postgres host port");
        Shared {
            port: host_port,
            _container: pg,
        }
    };
    SHARED.get_or_init(boot).await
}
/// Creates a new, uniquely named database on the shared Postgres container,
/// runs the crate's migrations against it, and returns a pool connected to it.
///
/// The database name is `t` followed by a freshly generated UUIDv7 in its
/// `simple()` form, so every call gets its own database.
///
/// # Panics
///
/// Panics if the admin connection, the `CREATE DATABASE` statement, the pool
/// connection, or the migration fails.
pub async fn fresh_pool() -> PgPool {
    let port = shared().await.port;

    // Short-lived connection to the `postgres` database, used only to issue
    // the CREATE DATABASE statement.
    let admin_url = format!("postgres://postgres:postgres@127.0.0.1:{port}/postgres");
    let admin = sqlx_postgres::PgPool::connect(&admin_url)
        .await
        .expect("admin connect");

    let db = format!("t{}", uuid::Uuid::now_v7().simple());
    let create_stmt = format!("CREATE DATABASE {db}");
    query(&create_stmt)
        .execute(&admin)
        .await
        .expect("create db");
    admin.close().await;

    // Connect to the new database through the crate's own pool constructor
    // and bring its schema up to date.
    let db_url = format!("postgres://postgres:postgres@127.0.0.1:{port}/{db}");
    let pool = connect(&PoolConfig::from_url(db_url).with_max_connections(16))
        .await
        .expect("connect");
    migrate(&pool).await.expect("migrate");
    pool
}
// Re-export the HTTP API test helpers at this module's top level; only
// compiled when the `api` feature is enabled.
#[cfg(feature = "api")]
pub use api_helpers::{spawn_api, ApiServer};
#[cfg(feature = "api")]
/// Helpers for running the crate's HTTP API in-process on an ephemeral local
/// port. Only compiled when the `api` feature is enabled.
mod api_helpers {
    use std::sync::Arc;
    use std::time::Duration;

    use metrics_exporter_prometheus::{PrometheusBuilder, PrometheusHandle};
    use rust_job_queue_api_worker_system::api::{build_router, AppState};
    use sqlx_postgres::PgPool;
    use tokio::net::TcpListener;
    use tokio::sync::OnceCell;

    /// How long [`ApiServer::shutdown`] waits for the server task to finish
    /// before aborting it.
    const SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(5);

    /// Prometheus handle built once and then cloned for every server spawned
    /// through [`spawn_api`].
    static METRICS: OnceCell<PrometheusHandle> = OnceCell::const_new();

    /// Returns a clone of the shared Prometheus handle, building the recorder
    /// and attempting to install it globally on first use.
    async fn metrics_handle() -> PrometheusHandle {
        METRICS
            .get_or_init(|| async {
                let recorder = PrometheusBuilder::new().build_recorder();
                let handle = recorder.handle();
                // Best effort: the result is deliberately ignored so that a
                // failed installation (e.g. a recorder was already set
                // elsewhere) does not fail the caller.
                let _ = metrics::set_global_recorder(recorder);
                handle
            })
            .await
            .clone()
    }

    /// A running in-process API server started by [`spawn_api`].
    pub struct ApiServer {
        /// Base URL of the server, in the form `http://<local address>`.
        pub base_url: String,
        /// Signals the server's graceful-shutdown future.
        cancel: tokio::sync::oneshot::Sender<()>,
        /// The spawned task driving `axum::serve`.
        task: tokio::task::JoinHandle<()>,
    }

    impl ApiServer {
        /// Requests a graceful shutdown and waits up to five seconds for the
        /// server task to finish.
        ///
        /// If the task has not finished by then it is aborted, so that it
        /// (and the listener it owns) is not left running detached after
        /// this call returns.
        pub async fn shutdown(self) {
            let Self {
                cancel, mut task, ..
            } = self;
            // A send error only means the receiving side is already gone.
            let _ = cancel.send(());
            // Poll the handle by reference so it is still available for
            // `abort` if the wait times out; moving it into `timeout` would
            // drop it on expiry and merely detach the task.
            if tokio::time::timeout(SHUTDOWN_TIMEOUT, &mut task)
                .await
                .is_err()
            {
                task.abort();
            }
        }
    }

    /// Builds the API router on top of `pool`, binds it to an OS-assigned
    /// port on `127.0.0.1`, and serves it on a background task.
    ///
    /// # Panics
    ///
    /// Panics if the listener cannot be bound or its local address cannot be
    /// read.
    pub async fn spawn_api(pool: PgPool) -> ApiServer {
        let metrics = metrics_handle().await;
        let state = AppState {
            pool,
            metrics: Arc::new(metrics),
        };
        let app = build_router(state);

        // Port 0 lets the OS pick a free port; the actual address is read
        // back from the listener to build `base_url`.
        let listener = TcpListener::bind("127.0.0.1:0").await.expect("bind");
        let local_addr = listener.local_addr().expect("local_addr");

        let (tx, rx) = tokio::sync::oneshot::channel::<()>();
        let task = tokio::spawn(async move {
            let _ = axum::serve(listener, app)
                .with_graceful_shutdown(async move {
                    // The result is ignored, so shutdown begins both on an
                    // explicit signal and when the sender is dropped.
                    let _ = rx.await;
                })
                .await;
        });

        ApiServer {
            base_url: format!("http://{local_addr}"),
            cancel: tx,
            task,
        }
    }
}