// rs-zero 0.2.8
//
// Rust-first microservice framework inspired by go-zero engineering practices.
// Documentation
use std::time::Duration;

use axum::{Router, middleware, routing::get};
use rs_zero::observability::{
    HttpMetricLabels, MetricsRegistry, RedisMetricLabels, SqlMetricLabels, metrics_router,
    observe_rpc_unary, observe_sql_query, record_metrics_middleware, record_rpc_streaming_snapshot,
};
use rs_zero::rest::{ApiResponse, RestConfig, RestResilienceConfig, RestServer};
use rs_zero::rpc::streaming::RpcStreamingSnapshot;
use tonic::Code;
use tower::ServiceExt;

/// Records one HTTP request, one SQL query, one Redis command, one Redis
/// event and one Redis degradation on a fresh registry, then checks that the
/// Prometheus rendering contains the expected metric names and label sets.
#[test]
fn registry_exports_prometheus_text() {
    use rs_zero::observability::{RedisDegradationLabels, RedisEventLabels};

    let registry = MetricsRegistry::new();

    let http_labels = HttpMetricLabels::new("GET", "/ready", 200);
    registry.record_http_request(http_labels, Duration::from_millis(5));

    let sql_labels = SqlMetricLabels::new("sqlite", "users", "find_by_id", "select", "success");
    registry.record_sql_query(sql_labels, Duration::from_millis(1));

    // The Redis command is recorded with an "error" result; the test below
    // expects the command-errors counter to appear in the output.
    let redis_labels = RedisMetricLabels::new("GET", "primary", "error");
    registry.record_redis_command(redis_labels, Duration::from_millis(2));

    let redirect_event = RedisEventLabels::new("redirect", "primary", "moved");
    registry.record_redis_event(redirect_event);

    let miss_degradation = RedisDegradationLabels::new("get", "return_miss", "primary");
    registry.record_redis_degradation(miss_degradation);

    let rendered = registry.render_prometheus();
    let has = |fragment: &str| rendered.contains(fragment);

    // Metric family names.
    assert!(has("rs_zero_http_requests_total"));
    assert!(has("rs_zero_sql_queries_total"));
    assert!(has("rs_zero_redis_commands_total"));
    assert!(has("rs_zero_redis_command_errors_total"));
    assert!(has("rs_zero_redis_events_total"));
    assert!(has("rs_zero_redis_degradations_total"));

    // Label sets attached to the recorded samples.
    assert!(has("event=\"redirect\",shard=\"primary\",result=\"moved\""));
    assert!(has("operation=\"get\",action=\"return_miss\",shard=\"primary\""));
    assert!(has("route=\"/ready\""));
}

/// Drives the SQL, unary-RPC and streaming-RPC observation helpers against a
/// single registry, then checks that the rendered metrics carry the expected
/// names and labels and contain none of the listed sensitive or
/// high-cardinality fragments.
#[tokio::test]
async fn sql_and_rpc_helpers_record_low_cardinality_metrics() {
    let registry = MetricsRegistry::new();

    // The test expects each helper to hand back the wrapped future's value.
    let sql_outcome: Result<i32, &'static str> = observe_sql_query(
        Some(&registry),
        "sqlite",
        "users",
        "find_by_id",
        "select",
        async { Ok(1) },
    )
    .await;
    assert_eq!(sql_outcome.expect("sql"), 1);

    let say_call = async { Ok::<_, tonic::Status>("ok") };
    let rpc_outcome =
        observe_rpc_unary(Some(&registry), "hello.Hello", "Say", None, say_call).await;
    assert_eq!(rpc_outcome.expect("rpc"), "ok");

    let chat_snapshot = RpcStreamingSnapshot {
        sent_messages: 1,
        received_messages: 2,
        completed: true,
        code: Some(Code::Ok),
        duration: Duration::from_millis(3),
    };
    record_rpc_streaming_snapshot(Some(&registry), "hello.Hello", "Chat", &chat_snapshot);

    let rendered = registry.render_prometheus();
    let has = |fragment: &str| rendered.contains(fragment);
    let lacks = |fragment: &str| !rendered.contains(fragment);

    assert!(has("rs_zero_sql_query_duration_seconds"));
    assert!(has("repository=\"users\",method=\"find_by_id\""));
    assert!(has("rs_zero_rpc_request_duration_seconds"));
    assert!(has("service=\"hello.Hello\",method=\"Say\",code=\"Ok\""));
    assert!(has("service=\"hello.Hello\",method=\"Chat\",code=\"Ok\""));

    // Fragments that must never show up in exported metrics.
    assert!(lacks("SELECT"));
    assert!(lacks("/users/42"));
    assert!(lacks("redis://"));
    assert!(lacks("password"));
    assert!(lacks("users:42"));
}

/// Sends one request through a `RestServer` configured with the breaker
/// enabled and a concurrency limit of one, then checks that a breaker
/// "allowed" resilience event was recorded and that the concrete request path
/// does not appear in the metrics text.
#[tokio::test]
async fn rest_resilience_records_low_cardinality_events() {
    let registry = MetricsRegistry::new();

    // Assemble the configuration from the innermost struct outwards.
    let resilience = RestResilienceConfig {
        breaker_enabled: true,
        breaker_failure_threshold: 1,
        breaker_reset_timeout: Duration::from_secs(60),
        max_concurrency: Some(1),
        ..RestResilienceConfig::default()
    };
    let middlewares = rs_zero::rest::RestMiddlewareConfig {
        resilience,
        ..rs_zero::rest::RestMiddlewareConfig::default()
    };
    let config = RestConfig {
        metrics_registry: Some(registry.clone()),
        middlewares,
        ..RestConfig::default()
    };

    let routes = Router::new().route("/users/{id}", get(|| async { ApiResponse::success("ok") }));
    let app = RestServer::new(config, routes).into_router();

    let request = axum::http::Request::builder()
        .uri("/users/42")
        .body(axum::body::Body::empty())
        .expect("request");
    let response = app.oneshot(request).await.expect("response");
    assert_eq!(response.status(), axum::http::StatusCode::OK);

    let rendered = registry.render_prometheus();
    let has = |fragment: &str| rendered.contains(fragment);
    let lacks = |fragment: &str| !rendered.contains(fragment);

    assert!(has("rs_zero_resilience_events_total"));
    assert!(has("component=\"breaker\",outcome=\"allowed\""));

    // Fragments that must never show up in exported metrics.
    assert!(lacks("/users/42"));
    assert!(lacks("redis://"));
    assert!(lacks("password"));
    assert!(lacks("users:42"));
}

/// Issues a request for `/users/42` against a router wrapped in the metrics
/// middleware and checks that the route label is the matched pattern
/// `/users/{id}` rather than the concrete path.
#[tokio::test]
async fn rest_metrics_layer_records_matched_path() {
    let registry = MetricsRegistry::new();

    let routes = Router::new().route("/users/{id}", get(|| async { "ok" }));
    let app = routes
        .merge(metrics_router(registry.clone()))
        .layer(middleware::from_fn_with_state(
            registry.clone(),
            record_metrics_middleware,
        ));

    let request = axum::http::Request::builder()
        .uri("/users/42")
        .body(axum::body::Body::empty())
        .expect("request");
    let response = app.oneshot(request).await.expect("response");
    assert_eq!(response.status(), axum::http::StatusCode::OK);

    let rendered = registry.render_prometheus();
    let has = |fragment: &str| rendered.contains(fragment);
    let lacks = |fragment: &str| !rendered.contains(fragment);

    assert!(has("route=\"/users/{id}\""));

    // Fragments that must never show up in exported metrics.
    assert!(lacks("/users/42"));
    assert!(lacks("redis://"));
    assert!(lacks("password"));
    assert!(lacks("users:42"));
}

/// Builds a Redis cache store with 5 ms timeouts and a fail-open policy,
/// performs one `get_raw`, and checks that the lookup yields `None` while
/// pool-event and degradation metrics are recorded without the cache key or
/// the connection URL appearing in the output.
#[cfg(all(feature = "cache-redis", feature = "observability"))]
#[tokio::test]
async fn redis_store_exposes_adapter_metrics_without_keys() {
    use rs_zero::cache::{CacheKey, CacheStore};
    use rs_zero::cache_redis::{RedisCacheConfig, RedisCacheStore, RedisUnavailablePolicy};

    let registry = MetricsRegistry::new();

    // NOTE(review): presumably nothing listens on 127.0.0.1:1, so the lookup
    // exercises the unavailable path — confirm against the adapter.
    let config = RedisCacheConfig {
        url: "redis://127.0.0.1:1".to_string(),
        connect_timeout: Duration::from_millis(5),
        command_timeout: Duration::from_millis(5),
        unavailable_policy: RedisUnavailablePolicy::fail_open_for_cache(),
        ..RedisCacheConfig::default()
    };
    let store = RedisCacheStore::new(config)
        .expect("redis config")
        .with_metrics(registry.clone())
        .with_shard_name("primary");

    let key = CacheKey::new("users", ["42"]);
    let lookup = store.get_raw(&key).await.expect("fail-open miss");
    assert!(lookup.is_none());

    let rendered = registry.render_prometheus();
    let has = |fragment: &str| rendered.contains(fragment);
    let lacks = |fragment: &str| !rendered.contains(fragment);

    assert!(has("rs_zero_redis_events_total"));
    assert!(has("event=\"pool\",shard=\"primary\""));
    assert!(has("rs_zero_redis_degradations_total"));
    assert!(has("operation=\"get\",action=\"return_miss\",shard=\"primary\""));

    // Neither the cache key nor the connection URL may leak into metrics.
    assert!(lacks("users:42"));
    assert!(lacks("redis://"));
}

/// Connects a pool to an in-memory SQLite URL, runs the metrics-aware health
/// check, and checks that a SQL query metric labelled `db` / `health_check`
/// is exported without the statement text or the connection URL.
#[cfg(all(feature = "db-sqlite", feature = "observability"))]
#[tokio::test]
async fn sql_pool_health_metrics_cover_adapter_path() {
    use rs_zero::db::{self, DatabaseConfig};

    let registry = MetricsRegistry::new();

    let config = DatabaseConfig {
        url: "sqlite::memory:".to_string(),
        ..DatabaseConfig::default()
    };
    let pool = db::connect_pool(&config).await.expect("pool");

    let health = db::health_check_with_metrics(&pool, &registry).await;
    health.expect("health check");

    let rendered = registry.render_prometheus();
    let has = |fragment: &str| rendered.contains(fragment);
    let lacks = |fragment: &str| !rendered.contains(fragment);

    assert!(has("rs_zero_sql_queries_total"));
    assert!(has("repository=\"db\",method=\"health_check\""));

    // Neither the statement nor the connection URL may leak into metrics.
    assert!(lacks("SELECT 1"));
    assert!(lacks("sqlite::memory:"));
}

/// Runs one unary call through an `RpcResilienceLayer` that has a metrics
/// registry attached and checks that RPC request and resilience event
/// metrics are exported, without calling `observe_rpc_unary` directly.
#[cfg(all(feature = "rpc", feature = "resil", feature = "observability"))]
#[tokio::test]
async fn rpc_resilience_layer_records_metrics_without_manual_observe_helper() {
    use rs_zero::rpc::{RpcResilienceConfig, RpcResilienceLayer};

    let registry = MetricsRegistry::new();
    let layer = RpcResilienceLayer::new("hello.Hello", RpcResilienceConfig::default())
        .with_metrics(registry.clone());

    let reply = layer
        .run_unary("Say", || async { Ok::<_, tonic::Status>("ok") })
        .await;
    assert_eq!(reply.expect("rpc"), "ok");

    let rendered = registry.render_prometheus();
    let has = |fragment: &str| rendered.contains(fragment);

    assert!(has("rs_zero_rpc_requests_total"));
    assert!(has("service=\"hello.Hello\",method=\"Say\",code=\"Ok\""));
    assert!(has("rs_zero_resilience_events_total"));
    assert!(!has("/hello.Hello/Say?"));
}

/// Runs one unary call through `run_unary_with_metadata`, passing request
/// metadata that carries an `x-request-id` entry, and checks that the call
/// succeeds and the usual service/method/code labels are exported.
#[cfg(all(feature = "rpc", feature = "resil", feature = "observability"))]
#[tokio::test]
async fn rpc_resilience_layer_uses_metadata_for_observability_context() {
    use rs_zero::rpc::{RpcResilienceConfig, RpcResilienceLayer};

    let registry = MetricsRegistry::new();
    let layer = RpcResilienceLayer::new("hello.Hello", RpcResilienceConfig::default())
        .with_metrics(registry.clone());

    // Only the metadata of this request is used; its body is the unit value.
    let mut request = tonic::Request::new(());
    request
        .metadata_mut()
        .insert("x-request-id", "req-layer-1".parse().expect("request id"));
    let metadata = request.metadata();

    let reply = layer
        .run_unary_with_metadata("Say", metadata, || async { Ok::<_, tonic::Status>("ok") })
        .await;
    assert_eq!(reply.expect("rpc"), "ok");

    let rendered = registry.render_prometheus();
    assert!(rendered.contains("service=\"hello.Hello\",method=\"Say\",code=\"Ok\""));
}

/// Runs `observe_rpc_unary` with no metrics registry under a scoped tracing
/// configuration that writes JSON to a temporary file, then checks that the
/// file contains the log message, service, method, route field, request id
/// and status code text.
#[test]
fn rpc_observe_helper_emits_info_log_with_method_and_trace_fields() {
    use rs_zero::core::logging::with_scoped_tracing;
    use rs_zero::core::logging::{LogConfig, LogFormat, LogSpanEvents, LogWriterConfig};

    let temp = tempfile::tempdir().expect("tempdir");
    let path = temp.path().join("rpc.log");

    // JSON-formatted, file-backed defaults; the remaining fields are
    // overridden in the struct expression below.
    let json_file_base = LogConfig::default()
        .with_format(LogFormat::Json)
        .with_writer(LogWriterConfig::file(&path));
    let config = LogConfig {
        filter: String::from("info"),
        ansi: false,
        span_events: LogSpanEvents::None,
        ..json_file_base
    };

    with_scoped_tracing(config, || {
        // This is a synchronous test, so the async helper is driven by a
        // runtime created inside the tracing scope.
        let runtime = tokio::runtime::Runtime::new().expect("runtime");
        runtime.block_on(async {
            let call = observe_rpc_unary(
                None,
                "hello.Greeter",
                "say_hello",
                Some("req-rpc-log-1"),
                async { Ok::<_, tonic::Status>(()) },
            );
            call.await.expect("rpc");
        });
    })
    .expect("scoped tracing");

    let logs = std::fs::read_to_string(path).expect("logs");
    let has = |fragment: &str| logs.contains(fragment);

    assert!(has("rpc unary observed"));
    assert!(has("hello.Greeter"));
    assert!(has("say_hello"));
    assert!(has("\"route\":\"say_hello\""));
    assert!(has("req-rpc-log-1"));
    assert!(has("Ok"));
}