use super::{format, routes::Routes};
#[cfg(any(feature = "cache_inmem", feature = "cache_redis"))]
use crate::config;
use crate::{app::AppContext, Result};
use axum::{
extract::State,
http::StatusCode,
response::{IntoResponse, Response},
routing::get,
};
use serde::Serialize;
/// JSON payload returned by the monitoring endpoints in this module.
#[derive(Serialize)]
pub struct Health {
/// `true` when the endpoint's check passed; the readiness endpoint sets it
/// to `false` when one of its dependency pings fails.
pub ok: bool,
}
/// Handler for the ping endpoint.
///
/// Performs no dependency checks: it unconditionally renders
/// `Health { ok: true }` through `format::json`.
///
/// # Errors
///
/// Propagates whatever error `format::json` reports while building the
/// response.
pub async fn ping() -> Result<Response> {
    let payload = Health { ok: true };
    format::json(payload)
}
/// Handler for the health endpoint.
///
/// Like `ping`, it inspects nothing and unconditionally renders
/// `Health { ok: true }` through `format::json`.
///
/// # Errors
///
/// Propagates whatever error `format::json` reports while building the
/// response.
pub async fn health() -> Result<Response> {
    let payload = Health { ok: true };
    format::json(payload)
}
pub async fn readiness(State(ctx): State<AppContext>) -> (StatusCode, Response) {
#[cfg(feature = "with-db")]
if let Err(error) = &ctx.db.ping().await {
tracing::error!(err.msg = %error, err.detail = ?error, "readiness_db_ping_error");
return (
StatusCode::SERVICE_UNAVAILABLE,
format::json(Health { ok: false }).into_response(),
);
}
if let Some(queue) = &ctx.queue_provider {
if let Err(error) = queue.ping().await {
tracing::error!(err.msg = %error, err.detail = ?error, "readiness_queue_ping_error");
return (
StatusCode::SERVICE_UNAVAILABLE,
format::json(Health { ok: false }).into_response(),
);
}
}
#[cfg(any(feature = "cache_inmem", feature = "cache_redis"))]
{
match ctx.config.cache {
#[cfg(feature = "cache_inmem")]
config::CacheConfig::InMem(_) => {
if let Err(error) = &ctx.cache.driver.ping().await {
tracing::error!(err.msg = %error, err.detail = ?error, "readiness_cache_ping_error");
return (
StatusCode::SERVICE_UNAVAILABLE,
format::json(Health { ok: false }).into_response(),
);
}
}
#[cfg(feature = "cache_redis")]
config::CacheConfig::Redis(_) => {
if let Err(error) = &ctx.cache.driver.ping().await {
tracing::error!(err.msg = %error, err.detail = ?error, "readiness_cache_ping_error");
return (
StatusCode::SERVICE_UNAVAILABLE,
format::json(Health { ok: false }).into_response(),
);
}
}
config::CacheConfig::Null => (),
}
}
(
StatusCode::OK,
format::json(Health { ok: true }).into_response(),
)
}
/// Builds the route table for the monitoring endpoints.
///
/// Registers, in this order, GET handlers for `/_readiness`, `/_ping` and
/// `/_health`.
pub fn routes() -> Routes {
    let table = Routes::new();
    let table = table.add("/_readiness", get(readiness));
    let table = table.add("/_ping", get(ping));
    table.add("/_health", get(health))
}
#[cfg(test)]
mod tests {
    use axum::routing::get;
    // Gated like their only users below, so builds without these features do
    // not carry unused (or unresolvable) imports.
    #[cfg(any(feature = "cache_inmem", feature = "cache_redis"))]
    use pipi::cache;
    #[cfg(feature = "with-db")]
    use pipi::tests_cfg::db::fail_connection;
    use pipi::{bgworker, config, controller::monitoring, tests_cfg};
    use serde_json::Value;
    use tower::ServiceExt;

    #[cfg(feature = "cache_redis")]
    use crate::tests_cfg::redis::setup_redis_container;

    /// Sends a body-less GET for `uri` through `router` and returns the
    /// response status together with the body decoded as JSON.
    ///
    /// Panics (failing the calling test) if the request cannot be built, the
    /// router errors, or the body is not valid JSON.
    async fn get_json(router: axum::Router, uri: &str) -> (axum::http::StatusCode, Value) {
        let req = axum::http::Request::builder()
            .uri(uri)
            .method("GET")
            .body(axum::body::Body::empty())
            .unwrap();
        let response = router.oneshot(req).await.unwrap();
        let status = response.status();
        let body = axum::body::to_bytes(response.into_body(), usize::MAX)
            .await
            .unwrap();
        let res_json: Value = serde_json::from_slice(&body).expect("Valid JSON response");
        (status, res_json)
    }

    /// Mounts `monitoring::readiness` on a fresh router backed by `ctx` and
    /// queries it once.
    async fn probe_readiness(ctx: crate::app::AppContext) -> (axum::http::StatusCode, Value) {
        let router = axum::Router::new()
            .route("/_readiness", get(monitoring::readiness))
            .with_state(ctx);
        get_json(router, "/_readiness").await
    }

    #[tokio::test]
    async fn ping_works() {
        let ctx = tests_cfg::app::get_app_context().await;
        let router = axum::Router::new()
            .route("/_ping", get(monitoring::ping))
            .with_state(ctx);

        let (status, res_json) = get_json(router, "/_ping").await;

        assert_eq!(status, 200);
        assert_eq!(res_json["ok"], true);
    }

    #[tokio::test]
    async fn health_works() {
        let ctx = tests_cfg::app::get_app_context().await;
        let router = axum::Router::new()
            .route("/_health", get(monitoring::health))
            .with_state(ctx);

        let (status, res_json) = get_json(router, "/_health").await;

        assert_eq!(status, 200);
        assert_eq!(res_json["ok"], true);
    }

    #[cfg(not(feature = "with-db"))]
    #[tokio::test]
    async fn readiness_no_features() {
        let ctx = tests_cfg::app::get_app_context().await;

        let (status, res_json) = probe_readiness(ctx).await;

        assert_eq!(status, 200);
        assert_eq!(res_json["ok"], true);
    }

    #[cfg(feature = "with-db")]
    #[tokio::test]
    async fn readiness_with_db_success() {
        let ctx = tests_cfg::app::get_app_context().await;

        let (status, res_json) = probe_readiness(ctx).await;

        assert_eq!(status, 200);
        assert_eq!(res_json["ok"], true);
    }

    #[cfg(feature = "with-db")]
    #[tokio::test]
    async fn readiness_with_db_failure() {
        let mut ctx = tests_cfg::app::get_app_context().await;
        ctx.db = fail_connection().await;

        let (status, res_json) = probe_readiness(ctx).await;

        assert_eq!(status, 503);
        assert_eq!(res_json["ok"], false);
    }

    // NOTE(review): only `ctx.cache` is replaced here. `readiness` decides
    // whether to ping the cache from `ctx.config.cache`, so this exercises the
    // driver only if the test context's config already selects a non-Null
    // cache — confirm against `tests_cfg`.
    #[cfg(feature = "cache_inmem")]
    #[tokio::test]
    async fn readiness_with_cache_inmem() {
        let mut ctx = tests_cfg::app::get_app_context().await;
        ctx.cache = cache::drivers::inmem::new(&config::InMemCacheConfig {
            max_capacity: 32 * 1024 * 1024,
        })
        .into();

        let (status, res_json) = probe_readiness(ctx).await;

        assert_eq!(status, 200);
        assert_eq!(res_json["ok"], true);
    }

    // NOTE(review): as above, `ctx.config.cache` is left untouched, so whether
    // the Redis driver is actually pinged depends on the default test config —
    // confirm.
    #[cfg(feature = "cache_redis")]
    #[tokio::test]
    async fn readiness_with_cache_redis_success() {
        // `_container` is kept alive for the duration of the test.
        let (redis_url, _container) = setup_redis_container().await;
        let mut ctx = tests_cfg::app::get_app_context().await;
        let redis_cache = cache::drivers::redis::new(&config::RedisCacheConfig {
            uri: redis_url,
            max_size: 10,
        })
        .await
        .expect("Failed to create Redis cache");
        ctx.cache = redis_cache.into();

        let (status, res_json) = probe_readiness(ctx).await;

        assert_eq!(status, 200);
        assert_eq!(res_json["ok"], true);
    }

    #[cfg(feature = "cache_redis")]
    #[tokio::test]
    async fn readiness_with_cache_redis_failure() {
        let mut ctx = tests_cfg::app::get_app_context().await;
        let failure_redis_url = "redis://127.0.0.2:0";
        // Built once: borrowed to create the driver, then moved into the config
        // so `readiness` takes the Redis branch.
        let redis_config = config::RedisCacheConfig {
            uri: failure_redis_url.to_string(),
            max_size: 10,
        };
        ctx.cache = cache::drivers::redis::new(&redis_config)
            .await
            .expect("Failed to create Redis cache")
            .into();
        ctx.config.cache = config::CacheConfig::Redis(redis_config);

        let (status, res_json) = probe_readiness(ctx).await;

        assert_eq!(status, 503);
        assert_eq!(res_json["ok"], false);
    }

    #[tokio::test]
    async fn readiness_with_queue_not_present() {
        let mut ctx = tests_cfg::app::get_app_context().await;
        ctx.config.workers.mode = config::WorkerMode::BackgroundQueue;
        ctx.queue_provider = Some(std::sync::Arc::new(bgworker::Queue::None));

        let (status, res_json) = probe_readiness(ctx).await;

        assert_eq!(status, 200);
        assert_eq!(res_json["ok"], true);
    }

    #[cfg(feature = "bg_redis")]
    #[tokio::test]
    async fn readiness_with_queue_present_failure() {
        let mut ctx = tests_cfg::app::get_app_context().await;
        let failure_redis_url = "redis://127.0.0.2:0";
        // Built once: borrowed to create the provider, then moved into the config.
        let queue_config = config::RedisQueueConfig {
            uri: failure_redis_url.to_string(),
            dangerously_flush: false,
            queues: None,
            num_workers: 1,
        };
        let provider = bgworker::redis::create_provider(&queue_config)
            .await
            .expect("Failed to create Redis queue provider");
        ctx.config.workers.mode = config::WorkerMode::BackgroundQueue;
        ctx.config.queue = Some(config::QueueConfig::Redis(queue_config));
        ctx.queue_provider = Some(std::sync::Arc::new(provider));

        let (status, res_json) = probe_readiness(ctx).await;

        assert_eq!(status, 503);
        assert_eq!(res_json["ok"], false);
    }
}