rs-zero 0.2.1

Rust-first microservice framework inspired by go-zero engineering practices
Documentation
use std::time::Duration;

use axum::{Router, middleware, routing::get};
use rs_zero::observability::{
    HttpMetricLabels, MetricsRegistry, RedisMetricLabels, SqlMetricLabels, metrics_router,
    observe_rpc_unary, observe_sql_query, record_metrics_middleware, record_rpc_streaming_snapshot,
};
use rs_zero::rest::{ApiResponse, RestConfig, RestResilienceConfig, RestServer};
use rs_zero::rpc::streaming::RpcStreamingSnapshot;
use tonic::Code;
use tower::ServiceExt;

#[test]
fn registry_exports_prometheus_text() {
    let registry = MetricsRegistry::new();
    registry.record_http_request(
        HttpMetricLabels::new("GET", "/ready", 200),
        Duration::from_millis(5),
    );
    registry.record_sql_query(
        SqlMetricLabels::new("sqlite", "users", "find_by_id", "select", "success"),
        Duration::from_millis(1),
    );
    registry.record_redis_command(
        RedisMetricLabels::new("GET", "primary", "error"),
        Duration::from_millis(2),
    );
    registry.record_redis_event(rs_zero::observability::RedisEventLabels::new(
        "redirect", "primary", "moved",
    ));
    registry.record_redis_degradation(rs_zero::observability::RedisDegradationLabels::new(
        "get",
        "return_miss",
        "primary",
    ));
    let text = registry.render_prometheus();
    assert!(text.contains("rs_zero_http_requests_total"));
    assert!(text.contains("rs_zero_sql_queries_total"));
    assert!(text.contains("rs_zero_redis_commands_total"));
    assert!(text.contains("rs_zero_redis_command_errors_total"));
    assert!(text.contains("rs_zero_redis_events_total"));
    assert!(text.contains("event=\"redirect\",shard=\"primary\",result=\"moved\""));
    assert!(text.contains("rs_zero_redis_degradations_total"));
    assert!(text.contains("operation=\"get\",action=\"return_miss\",shard=\"primary\""));
    assert!(text.contains("route=\"/ready\""));
}

#[tokio::test]
async fn sql_and_rpc_helpers_record_low_cardinality_metrics() {
    let registry = MetricsRegistry::new();

    let value: Result<i32, &'static str> = observe_sql_query(
        Some(&registry),
        "sqlite",
        "users",
        "find_by_id",
        "select",
        async { Ok(1) },
    )
    .await;
    assert_eq!(value.expect("sql"), 1);

    let rpc = observe_rpc_unary(Some(&registry), "hello.Hello", "Say", None, async {
        Ok::<_, tonic::Status>("ok")
    })
    .await;
    assert_eq!(rpc.expect("rpc"), "ok");

    record_rpc_streaming_snapshot(
        Some(&registry),
        "hello.Hello",
        "Chat",
        &RpcStreamingSnapshot {
            sent_messages: 1,
            received_messages: 2,
            completed: true,
            code: Some(Code::Ok),
            duration: Duration::from_millis(3),
        },
    );

    let text = registry.render_prometheus();
    assert!(text.contains("rs_zero_sql_query_duration_seconds"));
    assert!(text.contains("repository=\"users\",method=\"find_by_id\""));
    assert!(text.contains("rs_zero_rpc_request_duration_seconds"));
    assert!(text.contains("service=\"hello.Hello\",method=\"Say\",code=\"Ok\""));
    assert!(text.contains("service=\"hello.Hello\",method=\"Chat\",code=\"Ok\""));
    assert!(!text.contains("SELECT"));
    assert!(!text.contains("/users/42"));
    assert!(!text.contains("redis://"));
    assert!(!text.contains("password"));
    assert!(!text.contains("users:42"));
}

#[tokio::test]
async fn rest_resilience_records_low_cardinality_events() {
    let registry = MetricsRegistry::new();
    let config = RestConfig {
        metrics_registry: Some(registry.clone()),
        middlewares: rs_zero::rest::RestMiddlewareConfig {
            resilience: RestResilienceConfig {
                breaker_enabled: true,
                breaker_failure_threshold: 1,
                breaker_reset_timeout: Duration::from_secs(60),
                max_concurrency: Some(1),
                ..RestResilienceConfig::default()
            },
            ..rs_zero::rest::RestMiddlewareConfig::default()
        },
        ..RestConfig::default()
    };
    let app = RestServer::new(
        config,
        Router::new().route("/users/{id}", get(|| async { ApiResponse::success("ok") })),
    )
    .into_router();

    let response = app
        .oneshot(
            axum::http::Request::builder()
                .uri("/users/42")
                .body(axum::body::Body::empty())
                .expect("request"),
        )
        .await
        .expect("response");
    assert_eq!(response.status(), axum::http::StatusCode::OK);

    let text = registry.render_prometheus();
    assert!(text.contains("rs_zero_resilience_events_total"));
    assert!(text.contains("component=\"breaker\",outcome=\"allowed\""));
    assert!(!text.contains("/users/42"));
    assert!(!text.contains("redis://"));
    assert!(!text.contains("password"));
    assert!(!text.contains("users:42"));
}

#[tokio::test]
async fn rest_metrics_layer_records_matched_path() {
    let registry = MetricsRegistry::new();
    let app = Router::new()
        .route("/users/{id}", get(|| async { "ok" }))
        .merge(metrics_router(registry.clone()))
        .layer(middleware::from_fn_with_state(
            registry.clone(),
            record_metrics_middleware,
        ));

    let response = app
        .oneshot(
            axum::http::Request::builder()
                .uri("/users/42")
                .body(axum::body::Body::empty())
                .expect("request"),
        )
        .await
        .expect("response");
    assert_eq!(response.status(), axum::http::StatusCode::OK);

    let text = registry.render_prometheus();
    assert!(text.contains("route=\"/users/{id}\""));
    assert!(!text.contains("/users/42"));
    assert!(!text.contains("redis://"));
    assert!(!text.contains("password"));
    assert!(!text.contains("users:42"));
}

#[cfg(all(feature = "cache-redis", feature = "observability"))]
#[tokio::test]
async fn redis_store_exposes_adapter_metrics_without_keys() {
    use rs_zero::cache::{CacheKey, CacheStore};
    use rs_zero::cache_redis::{RedisCacheConfig, RedisCacheStore, RedisUnavailablePolicy};

    let registry = MetricsRegistry::new();
    let store = RedisCacheStore::new(RedisCacheConfig {
        url: "redis://127.0.0.1:1".to_string(),
        connect_timeout: Duration::from_millis(5),
        command_timeout: Duration::from_millis(5),
        unavailable_policy: RedisUnavailablePolicy::fail_open_for_cache(),
        ..RedisCacheConfig::default()
    })
    .expect("redis config")
    .with_metrics(registry.clone())
    .with_shard_name("primary");

    let key = CacheKey::new("users", ["42"]);
    assert!(store.get_raw(&key).await.expect("fail-open miss").is_none());

    let text = registry.render_prometheus();
    assert!(text.contains("rs_zero_redis_events_total"));
    assert!(text.contains("event=\"pool\",shard=\"primary\""));
    assert!(text.contains("rs_zero_redis_degradations_total"));
    assert!(text.contains("operation=\"get\",action=\"return_miss\",shard=\"primary\""));
    assert!(!text.contains("users:42"));
    assert!(!text.contains("redis://"));
}

#[cfg(all(feature = "db-sqlite", feature = "observability"))]
#[tokio::test]
async fn sql_pool_health_metrics_cover_adapter_path() {
    let registry = MetricsRegistry::new();
    let config = rs_zero::db::DatabaseConfig {
        url: "sqlite::memory:".to_string(),
        ..rs_zero::db::DatabaseConfig::default()
    };
    let pool = rs_zero::db::connect_pool(&config).await.expect("pool");

    rs_zero::db::health_check_with_metrics(&pool, &registry)
        .await
        .expect("health check");

    let text = registry.render_prometheus();
    assert!(text.contains("rs_zero_sql_queries_total"));
    assert!(text.contains("repository=\"db\",method=\"health_check\""));
    assert!(!text.contains("SELECT 1"));
    assert!(!text.contains("sqlite::memory:"));
}

#[cfg(all(feature = "rpc", feature = "resil", feature = "observability"))]
#[tokio::test]
async fn rpc_resilience_layer_records_metrics_without_manual_observe_helper() {
    let registry = MetricsRegistry::new();
    let layer = rs_zero::rpc::RpcResilienceLayer::new(
        "hello.Hello",
        rs_zero::rpc::RpcResilienceConfig::default(),
    )
    .with_metrics(registry.clone());

    let value = layer
        .run_unary("Say", || async { Ok::<_, tonic::Status>("ok") })
        .await
        .expect("rpc");
    assert_eq!(value, "ok");

    let text = registry.render_prometheus();
    assert!(text.contains("rs_zero_rpc_requests_total"));
    assert!(text.contains("service=\"hello.Hello\",method=\"Say\",code=\"Ok\""));
    assert!(text.contains("rs_zero_resilience_events_total"));
    assert!(!text.contains("/hello.Hello/Say?"));
}