use std::time::Duration;
use axum::{Router, middleware, routing::get};
use rs_zero::observability::{
HttpMetricLabels, MetricsRegistry, RedisMetricLabels, SqlMetricLabels, metrics_router,
observe_rpc_unary, observe_sql_query, record_metrics_middleware, record_rpc_streaming_snapshot,
};
use rs_zero::rest::{ApiResponse, RestConfig, RestResilienceConfig, RestServer};
use rs_zero::rpc::streaming::RpcStreamingSnapshot;
use tonic::Code;
use tower::ServiceExt;
/// Records one sample for each metric family directly on a fresh registry and
/// checks that the Prometheus text rendering mentions every expected metric
/// name and label set.
#[test]
fn registry_exports_prometheus_text() {
    use rs_zero::observability::{RedisDegradationLabels, RedisEventLabels};

    let metrics = MetricsRegistry::new();

    let http_labels = HttpMetricLabels::new("GET", "/ready", 200);
    metrics.record_http_request(http_labels, Duration::from_millis(5));

    let sql_labels = SqlMetricLabels::new("sqlite", "users", "find_by_id", "select", "success");
    metrics.record_sql_query(sql_labels, Duration::from_millis(1));

    // The command is recorded with an "error" result so the error counter is rendered too.
    let redis_labels = RedisMetricLabels::new("GET", "primary", "error");
    metrics.record_redis_command(redis_labels, Duration::from_millis(2));

    metrics.record_redis_event(RedisEventLabels::new("redirect", "primary", "moved"));
    metrics.record_redis_degradation(RedisDegradationLabels::new("get", "return_miss", "primary"));

    let rendered = metrics.render_prometheus();
    let expected_fragments = [
        "rs_zero_http_requests_total",
        "rs_zero_sql_queries_total",
        "rs_zero_redis_commands_total",
        "rs_zero_redis_command_errors_total",
        "rs_zero_redis_events_total",
        "event=\"redirect\",shard=\"primary\",result=\"moved\"",
        "rs_zero_redis_degradations_total",
        "operation=\"get\",action=\"return_miss\",shard=\"primary\"",
        "route=\"/ready\"",
    ];
    for fragment in expected_fragments {
        assert!(
            rendered.contains(fragment),
            "rendered metrics are missing {fragment:?}"
        );
    }
}
/// Drives the SQL and RPC observation helpers against a single registry and
/// verifies the rendered Prometheus text.
///
/// The positive assertions check the duration metric names and the
/// repository/service/method/code label sets. The negative assertions require
/// that none of the listed substrings (`SELECT`, `/users/42`, `redis://`,
/// `password`, `users:42`) appear in the output.
#[tokio::test]
async fn sql_and_rpc_helpers_record_low_cardinality_metrics() {
    let registry = MetricsRegistry::new();

    // The helper wraps the future and hands its result back unchanged.
    let value: Result<i32, &'static str> = observe_sql_query(
        Some(&registry),
        "sqlite",
        "users",
        "find_by_id",
        "select",
        async { Ok(1) },
    )
    .await;
    assert_eq!(value.expect("sql"), 1);

    let rpc = observe_rpc_unary(Some(&registry), "hello.Hello", "Say", None, async {
        Ok::<_, tonic::Status>("ok")
    })
    .await;
    assert_eq!(rpc.expect("rpc"), "ok");

    // Streaming calls are reported from an already-finished snapshot.
    record_rpc_streaming_snapshot(
        Some(&registry),
        "hello.Hello",
        "Chat",
        &RpcStreamingSnapshot {
            sent_messages: 1,
            received_messages: 2,
            completed: true,
            code: Some(Code::Ok),
            duration: Duration::from_millis(3),
        },
    );

    let text = registry.render_prometheus();
    assert!(text.contains("rs_zero_sql_query_duration_seconds"));
    assert!(text.contains("repository=\"users\",method=\"find_by_id\""));
    assert!(text.contains("rs_zero_rpc_request_duration_seconds"));
    assert!(text.contains("service=\"hello.Hello\",method=\"Say\",code=\"Ok\""));
    assert!(text.contains("service=\"hello.Hello\",method=\"Chat\",code=\"Ok\""));

    // None of these substrings may appear in the rendered output.
    assert!(!text.contains("SELECT"));
    assert!(!text.contains("/users/42"));
    assert!(!text.contains("redis://"));
    assert!(!text.contains("password"));
    assert!(!text.contains("users:42"));
}
/// Sends one request through a `RestServer` configured with the breaker
/// enabled and a concurrency limit, then checks that a breaker `allowed`
/// resilience event is rendered and that none of the forbidden substrings
/// (including the concrete request path) appear in the output.
#[tokio::test]
async fn rest_resilience_records_low_cardinality_events() {
    use axum::body::Body;
    use axum::http::{Request, StatusCode};
    use rs_zero::rest::RestMiddlewareConfig;

    let metrics = MetricsRegistry::new();

    // Build the nested configuration from the inside out.
    let resilience = RestResilienceConfig {
        breaker_enabled: true,
        breaker_failure_threshold: 1,
        breaker_reset_timeout: Duration::from_secs(60),
        max_concurrency: Some(1),
        ..RestResilienceConfig::default()
    };
    let middlewares = RestMiddlewareConfig {
        resilience,
        ..RestMiddlewareConfig::default()
    };
    let config = RestConfig {
        metrics_registry: Some(metrics.clone()),
        middlewares,
        ..RestConfig::default()
    };

    let routes = Router::new().route("/users/{id}", get(|| async { ApiResponse::success("ok") }));
    let app = RestServer::new(config, routes).into_router();

    let request = Request::builder()
        .uri("/users/42")
        .body(Body::empty())
        .expect("request");
    let response = app.oneshot(request).await.expect("response");
    assert_eq!(response.status(), StatusCode::OK);

    let rendered = metrics.render_prometheus();
    for fragment in [
        "rs_zero_resilience_events_total",
        "component=\"breaker\",outcome=\"allowed\"",
    ] {
        assert!(
            rendered.contains(fragment),
            "rendered metrics are missing {fragment:?}"
        );
    }
    for forbidden in ["/users/42", "redis://", "password", "users:42"] {
        assert!(
            !rendered.contains(forbidden),
            "rendered metrics unexpectedly contain {forbidden:?}"
        );
    }
}
/// Requests `/users/42` through a router wrapped in the metrics middleware and
/// checks that the rendered `route` label is the route template
/// `/users/{id}`, not the concrete request path.
#[tokio::test]
async fn rest_metrics_layer_records_matched_path() {
    use axum::body::Body;
    use axum::http::{Request, StatusCode};

    let metrics = MetricsRegistry::new();

    let app = Router::new()
        .route("/users/{id}", get(|| async { "ok" }))
        .merge(metrics_router(metrics.clone()))
        .layer(middleware::from_fn_with_state(
            metrics.clone(),
            record_metrics_middleware,
        ));

    let request = Request::builder()
        .uri("/users/42")
        .body(Body::empty())
        .expect("request");
    let response = app.oneshot(request).await.expect("response");
    assert_eq!(response.status(), StatusCode::OK);

    let rendered = metrics.render_prometheus();
    assert!(rendered.contains("route=\"/users/{id}\""));
    for forbidden in ["/users/42", "redis://", "password", "users:42"] {
        assert!(
            !rendered.contains(forbidden),
            "rendered metrics unexpectedly contain {forbidden:?}"
        );
    }
}
/// Performs a `get_raw` against a Redis store configured with the address
/// `127.0.0.1:1`, short timeouts and the fail-open cache policy. The lookup is
/// expected to yield `Ok(None)`, and the rendered metrics must contain the
/// pool event and `return_miss` degradation labels without the cache key or
/// the connection URL.
#[cfg(all(feature = "cache-redis", feature = "observability"))]
#[tokio::test]
async fn redis_store_exposes_adapter_metrics_without_keys() {
    use rs_zero::cache::{CacheKey, CacheStore};
    use rs_zero::cache_redis::{RedisCacheConfig, RedisCacheStore, RedisUnavailablePolicy};

    let metrics = MetricsRegistry::new();

    let redis_config = RedisCacheConfig {
        url: "redis://127.0.0.1:1".to_string(),
        connect_timeout: Duration::from_millis(5),
        command_timeout: Duration::from_millis(5),
        unavailable_policy: RedisUnavailablePolicy::fail_open_for_cache(),
        ..RedisCacheConfig::default()
    };
    let store = RedisCacheStore::new(redis_config)
        .expect("redis config")
        .with_metrics(metrics.clone())
        .with_shard_name("primary");

    let key = CacheKey::new("users", ["42"]);
    let lookup = store.get_raw(&key).await.expect("fail-open miss");
    assert!(lookup.is_none());

    let rendered = metrics.render_prometheus();
    for fragment in [
        "rs_zero_redis_events_total",
        "event=\"pool\",shard=\"primary\"",
        "rs_zero_redis_degradations_total",
        "operation=\"get\",action=\"return_miss\",shard=\"primary\"",
    ] {
        assert!(
            rendered.contains(fragment),
            "rendered metrics are missing {fragment:?}"
        );
    }
    for forbidden in ["users:42", "redis://"] {
        assert!(
            !rendered.contains(forbidden),
            "rendered metrics unexpectedly contain {forbidden:?}"
        );
    }
}
/// Connects a pool to `sqlite::memory:`, runs `health_check_with_metrics`
/// against it, and checks the rendered output.
///
/// The SQL query counter must appear with the `repository="db"` and
/// `method="health_check"` labels. The literal `SELECT 1` and the connection
/// URL must not appear.
#[cfg(all(feature = "db-sqlite", feature = "observability"))]
#[tokio::test]
async fn sql_pool_health_metrics_cover_adapter_path() {
    let registry = MetricsRegistry::new();
    let config = rs_zero::db::DatabaseConfig {
        url: "sqlite::memory:".to_string(),
        ..rs_zero::db::DatabaseConfig::default()
    };
    let pool = rs_zero::db::connect_pool(&config).await.expect("pool");

    rs_zero::db::health_check_with_metrics(&pool, &registry)
        .await
        .expect("health check");

    let text = registry.render_prometheus();
    assert!(text.contains("rs_zero_sql_queries_total"));
    assert!(text.contains("repository=\"db\",method=\"health_check\""));
    assert!(!text.contains("SELECT 1"));
    assert!(!text.contains("sqlite::memory:"));
}
/// Runs a unary call through `RpcResilienceLayer::run_unary` with a registry
/// attached and checks that RPC request and resilience event metrics are
/// rendered without calling `observe_rpc_unary` directly.
#[cfg(all(feature = "rpc", feature = "resil", feature = "observability"))]
#[tokio::test]
async fn rpc_resilience_layer_records_metrics_without_manual_observe_helper() {
    use rs_zero::rpc::{RpcResilienceConfig, RpcResilienceLayer};

    let metrics = MetricsRegistry::new();
    let resilience = RpcResilienceLayer::new("hello.Hello", RpcResilienceConfig::default())
        .with_metrics(metrics.clone());

    let outcome = resilience
        .run_unary("Say", || async { Ok::<_, tonic::Status>("ok") })
        .await;
    assert_eq!(outcome.expect("rpc"), "ok");

    let rendered = metrics.render_prometheus();
    for fragment in [
        "rs_zero_rpc_requests_total",
        "service=\"hello.Hello\",method=\"Say\",code=\"Ok\"",
        "rs_zero_resilience_events_total",
    ] {
        assert!(
            rendered.contains(fragment),
            "rendered metrics are missing {fragment:?}"
        );
    }
    assert!(!rendered.contains("/hello.Hello/Say?"));
}
/// Runs a unary call through `run_unary_with_metadata`, passing request
/// metadata that carries an `x-request-id` entry, and checks that the call is
/// still recorded under the expected service/method/code labels.
#[cfg(all(feature = "rpc", feature = "resil", feature = "observability"))]
#[tokio::test]
async fn rpc_resilience_layer_uses_metadata_for_observability_context() {
    use rs_zero::rpc::{RpcResilienceConfig, RpcResilienceLayer};

    let metrics = MetricsRegistry::new();
    let resilience = RpcResilienceLayer::new("hello.Hello", RpcResilienceConfig::default())
        .with_metrics(metrics.clone());

    // Only the metadata of this request is handed to the layer.
    let mut request = tonic::Request::new(());
    let request_id = "req-layer-1".parse().expect("request id");
    request.metadata_mut().insert("x-request-id", request_id);

    let outcome = resilience
        .run_unary_with_metadata("Say", request.metadata(), || async {
            Ok::<_, tonic::Status>("ok")
        })
        .await;
    assert_eq!(outcome.expect("rpc"), "ok");

    let rendered = metrics.render_prometheus();
    assert!(rendered.contains("service=\"hello.Hello\",method=\"Say\",code=\"Ok\""));
}
/// Runs `observe_rpc_unary` without a registry inside a scoped tracing setup
/// that writes JSON logs to a temporary file, then checks that the file
/// contains the expected message, service, method, route field, request id
/// and status code text.
#[test]
fn rpc_observe_helper_emits_info_log_with_method_and_trace_fields() {
    use rs_zero::core::logging::{
        LogConfig, LogFormat, LogSpanEvents, LogWriterConfig, with_scoped_tracing,
    };

    // Keep the directory guard bound until the end so the log file is still there when read.
    let log_dir = tempfile::tempdir().expect("tempdir");
    let log_path = log_dir.path().join("rpc.log");

    let file_backed_json = LogConfig::default()
        .with_format(LogFormat::Json)
        .with_writer(LogWriterConfig::file(&log_path));
    let config = LogConfig {
        filter: "info".to_string(),
        ansi: false,
        span_events: LogSpanEvents::None,
        ..file_backed_json
    };

    // This is a plain `#[test]`, so the async helper is driven by a runtime
    // created inside the scoped-tracing closure.
    with_scoped_tracing(config, || {
        let runtime = tokio::runtime::Runtime::new().expect("runtime");
        runtime.block_on(async {
            observe_rpc_unary(
                None,
                "hello.Greeter",
                "say_hello",
                Some("req-rpc-log-1"),
                async { Ok::<_, tonic::Status>(()) },
            )
            .await
            .expect("rpc");
        });
    })
    .expect("scoped tracing");

    let written = std::fs::read_to_string(log_path).expect("logs");
    for fragment in [
        "rpc unary observed",
        "hello.Greeter",
        "say_hello",
        "\"route\":\"say_hello\"",
        "req-rpc-log-1",
        "Ok",
    ] {
        assert!(
            written.contains(fragment),
            "log output is missing {fragment:?}"
        );
    }
}