use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use crate::bytes::Bytes;
use super::server::Interceptor;
use super::status::Status;
use super::streaming::{MetadataValue, Request, Response};
#[derive(Clone)]
pub struct InterceptorLayer {
interceptors: Vec<Arc<dyn Interceptor>>,
}
impl std::fmt::Debug for InterceptorLayer {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("InterceptorLayer")
.field(
"interceptors",
&format!("[{} interceptors]", self.interceptors.len()),
)
.finish()
}
}
impl InterceptorLayer {
#[must_use]
pub fn new() -> Self {
Self {
interceptors: Vec::with_capacity(4),
}
}
#[must_use]
pub fn layer<I>(mut self, interceptor: I) -> Self
where
I: Interceptor + 'static,
{
self.interceptors.push(Arc::new(interceptor));
self
}
#[must_use]
pub fn layers<I>(mut self, interceptors: impl IntoIterator<Item = I>) -> Self
where
I: Interceptor + 'static,
{
let interceptors = interceptors.into_iter();
let (lower, upper) = interceptors.size_hint();
self.interceptors.reserve(upper.unwrap_or(lower));
for interceptor in interceptors {
self.interceptors.push(Arc::new(interceptor));
}
self
}
#[must_use]
pub fn is_empty(&self) -> bool {
self.interceptors.is_empty()
}
#[must_use]
pub fn len(&self) -> usize {
self.interceptors.len()
}
}
impl Default for InterceptorLayer {
fn default() -> Self {
Self::new()
}
}
impl Interceptor for InterceptorLayer {
fn intercept_request(&self, request: &mut Request<Bytes>) -> Result<(), Status> {
for interceptor in &self.interceptors {
interceptor.intercept_request(request)?;
}
Ok(())
}
fn intercept_response(&self, response: &mut Response<Bytes>) -> Result<(), Status> {
for interceptor in self.interceptors.iter().rev() {
interceptor.intercept_response(response)?;
}
Ok(())
}
fn intercept_response_with_request(
&self,
request: &Request<Bytes>,
response: &mut Response<Bytes>,
) -> Result<(), Status> {
for interceptor in self.interceptors.iter().rev() {
interceptor.intercept_response_with_request(request, response)?;
}
Ok(())
}
}
#[derive(Clone)]
pub struct FnInterceptor<F> {
f: F,
}
impl<F> FnInterceptor<F>
where
F: Fn(&mut Request<Bytes>) -> Result<(), Status> + Send + Sync,
{
pub fn new(f: F) -> Self {
Self { f }
}
}
impl<F> std::fmt::Debug for FnInterceptor<F> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("FnInterceptor").finish_non_exhaustive()
}
}
impl<F> Interceptor for FnInterceptor<F>
where
F: Fn(&mut Request<Bytes>) -> Result<(), Status> + Send + Sync,
{
fn intercept_request(&self, request: &mut Request<Bytes>) -> Result<(), Status> {
(self.f)(request)
}
fn intercept_response(&self, _response: &mut Response<Bytes>) -> Result<(), Status> {
Ok(())
}
}
pub fn fn_interceptor<F>(f: F) -> FnInterceptor<F>
where
F: Fn(&mut Request<Bytes>) -> Result<(), Status> + Send + Sync,
{
FnInterceptor::new(f)
}
#[derive(Debug, Clone)]
pub struct TracingInterceptor {
generate_request_id: bool,
next_request_id: Arc<AtomicU64>,
}
impl Default for TracingInterceptor {
fn default() -> Self {
Self::new()
}
}
impl TracingInterceptor {
#[must_use]
pub fn new() -> Self {
Self {
generate_request_id: true,
next_request_id: Arc::new(AtomicU64::new(1)),
}
}
#[must_use]
pub fn with_request_id(mut self, enabled: bool) -> Self {
self.generate_request_id = enabled;
self
}
}
impl Interceptor for TracingInterceptor {
fn intercept_request(&self, request: &mut Request<Bytes>) -> Result<(), Status> {
if self.generate_request_id && request.metadata().get("x-request-id").is_none() {
let id = format!(
"req-{:016x}",
self.next_request_id.fetch_add(1, Ordering::Relaxed)
);
request.metadata_mut().insert("x-request-id", id);
}
Ok(())
}
fn intercept_response(&self, _response: &mut Response<Bytes>) -> Result<(), Status> {
Ok(())
}
}
#[must_use]
pub fn trace_interceptor() -> TracingInterceptor {
TracingInterceptor::new()
}
#[derive(Debug, Clone)]
pub struct BearerAuthInterceptor {
token: String,
}
impl BearerAuthInterceptor {
#[must_use]
pub fn new(token: impl Into<String>) -> Self {
Self {
token: token.into(),
}
}
}
impl Interceptor for BearerAuthInterceptor {
fn intercept_request(&self, request: &mut Request<Bytes>) -> Result<(), Status> {
request
.metadata_mut()
.insert("authorization", format!("Bearer {}", self.token));
Ok(())
}
fn intercept_response(&self, _response: &mut Response<Bytes>) -> Result<(), Status> {
Ok(())
}
}
#[must_use]
pub fn auth_bearer_interceptor(token: impl Into<String>) -> BearerAuthInterceptor {
BearerAuthInterceptor::new(token)
}
fn metadata_to_string(value: &MetadataValue) -> Option<&str> {
match value {
MetadataValue::Ascii(s) => Some(s.as_str()),
MetadataValue::Binary(_) => None,
}
}
fn bearer_token(auth: &str) -> Option<&str> {
let (scheme, token) = auth.split_once(' ')?;
if !scheme.eq_ignore_ascii_case("bearer") {
return None;
}
let token = token.trim_start_matches(' ');
if token.is_empty() {
return None;
}
Some(token)
}
fn copy_metadata_value(
metadata: &mut super::streaming::Metadata,
key: &str,
value: &MetadataValue,
) {
match value {
MetadataValue::Ascii(ascii) => {
metadata.insert(key, ascii.clone());
}
MetadataValue::Binary(binary) => {
metadata.insert_bin(key, binary.clone());
}
}
}
#[derive(Debug)]
pub struct BearerAuthValidator<F> {
validator: F,
}
impl<F> BearerAuthValidator<F>
where
F: Fn(&str) -> bool + Send + Sync,
{
pub fn new(validator: F) -> Self {
Self { validator }
}
}
impl<F> Interceptor for BearerAuthValidator<F>
where
F: Fn(&str) -> bool + Send + Sync,
{
fn intercept_request(&self, request: &mut Request<Bytes>) -> Result<(), Status> {
let auth_value = request
.metadata()
.get("authorization")
.ok_or_else(|| Status::unauthenticated("missing authorization header"))?;
let auth_str = metadata_to_string(auth_value)
.ok_or_else(|| Status::unauthenticated("authorization must be ASCII"))?;
let token = bearer_token(auth_str)
.ok_or_else(|| Status::unauthenticated("invalid authorization format"))?;
if (self.validator)(token) {
Ok(())
} else {
Err(Status::unauthenticated("invalid token"))
}
}
fn intercept_response(&self, _response: &mut Response<Bytes>) -> Result<(), Status> {
Ok(())
}
}
pub fn auth_validator<F>(validator: F) -> BearerAuthValidator<F>
where
F: Fn(&str) -> bool + Send + Sync,
{
BearerAuthValidator::new(validator)
}
#[derive(Debug, Clone)]
pub struct MetadataPropagator {
keys: Vec<String>,
}
impl MetadataPropagator {
#[must_use]
pub fn new(keys: impl IntoIterator<Item = impl Into<String>>) -> Self {
let keys = keys.into_iter();
let (lower, upper) = keys.size_hint();
let mut collected_keys = Vec::with_capacity(upper.unwrap_or(lower));
for key in keys {
collected_keys.push(key.into());
}
Self {
keys: collected_keys,
}
}
}
impl Interceptor for MetadataPropagator {
fn intercept_request(&self, request: &mut Request<Bytes>) -> Result<(), Status> {
let mut keys_to_propagate = Vec::with_capacity(self.keys.len());
for key in &self.keys {
if request.metadata().get(key).is_some() {
keys_to_propagate.push(key.clone());
}
}
if !keys_to_propagate.is_empty() {
request
.metadata_mut()
.insert("x-propagate-keys", keys_to_propagate.join(","));
}
Ok(())
}
fn intercept_response(&self, response: &mut Response<Bytes>) -> Result<(), Status> {
let _ = response;
Ok(())
}
fn intercept_response_with_request(
&self,
request: &Request<Bytes>,
response: &mut Response<Bytes>,
) -> Result<(), Status> {
let response_metadata = response.metadata_mut();
response_metadata.reserve(self.keys.len());
for key in &self.keys {
if response_metadata.get(key).is_some() {
continue;
}
if let Some(value) = request.metadata().get(key) {
copy_metadata_value(response_metadata, key, value);
}
}
Ok(())
}
}
#[must_use]
pub fn metadata_propagator(
keys: impl IntoIterator<Item = impl Into<String>>,
) -> MetadataPropagator {
MetadataPropagator::new(keys)
}
#[derive(Debug)]
pub struct RateLimitInterceptor {
max_requests: u32,
current: std::sync::atomic::AtomicU32,
}
impl RateLimitInterceptor {
#[must_use]
pub fn new(max_requests: u32) -> Self {
Self {
max_requests,
current: std::sync::atomic::AtomicU32::new(0),
}
}
fn try_acquire_slot(&self) -> bool {
let mut current = self.current.load(std::sync::atomic::Ordering::Relaxed);
loop {
if current >= self.max_requests {
return false;
}
match self.current.compare_exchange_weak(
current,
current + 1,
std::sync::atomic::Ordering::AcqRel,
std::sync::atomic::Ordering::Relaxed,
) {
Ok(_) => return true,
Err(observed) => current = observed,
}
}
}
fn release_slot(&self) {
let _ = self.current.fetch_update(
std::sync::atomic::Ordering::AcqRel,
std::sync::atomic::Ordering::Relaxed,
|current| current.checked_sub(1),
);
}
pub fn reset(&self) {
self.current.store(0, std::sync::atomic::Ordering::Relaxed);
}
#[must_use]
pub fn current_count(&self) -> u32 {
self.current.load(std::sync::atomic::Ordering::Relaxed)
}
}
impl Interceptor for RateLimitInterceptor {
fn intercept_request(&self, _request: &mut Request<Bytes>) -> Result<(), Status> {
if self.try_acquire_slot() {
Ok(())
} else {
Err(Status::resource_exhausted("rate limit exceeded"))
}
}
fn intercept_response(&self, _response: &mut Response<Bytes>) -> Result<(), Status> {
self.release_slot();
Ok(())
}
fn intercept_response_with_request(
&self,
_request: &Request<Bytes>,
_response: &mut Response<Bytes>,
) -> Result<(), Status> {
self.release_slot();
Ok(())
}
}
#[must_use]
pub fn rate_limiter(max_requests: u32) -> RateLimitInterceptor {
RateLimitInterceptor::new(max_requests)
}
#[derive(Debug, Clone, Default)]
pub struct LoggingInterceptor {
log_requests: bool,
log_responses: bool,
}
impl LoggingInterceptor {
#[must_use]
pub fn new() -> Self {
Self {
log_requests: true,
log_responses: true,
}
}
#[must_use]
pub fn log_requests(mut self, enabled: bool) -> Self {
self.log_requests = enabled;
self
}
#[must_use]
pub fn log_responses(mut self, enabled: bool) -> Self {
self.log_responses = enabled;
self
}
}
impl Interceptor for LoggingInterceptor {
fn intercept_request(&self, request: &mut Request<Bytes>) -> Result<(), Status> {
if self.log_requests {
request.metadata_mut().insert("x-logged", "true");
}
Ok(())
}
fn intercept_response(&self, response: &mut Response<Bytes>) -> Result<(), Status> {
if self.log_responses {
response.metadata_mut().insert("x-logged", "true");
}
Ok(())
}
}
#[must_use]
pub fn logging_interceptor() -> LoggingInterceptor {
LoggingInterceptor::new()
}
#[derive(Debug, Clone)]
pub struct TimeoutInterceptor {
timeout_ms: u64,
}
impl TimeoutInterceptor {
#[must_use]
pub fn new(timeout_ms: u64) -> Self {
Self { timeout_ms }
}
}
impl Interceptor for TimeoutInterceptor {
fn intercept_request(&self, request: &mut Request<Bytes>) -> Result<(), Status> {
if request.metadata().get("grpc-timeout").is_none() {
request
.metadata_mut()
.insert("grpc-timeout", format!("{}m", self.timeout_ms));
}
Ok(())
}
fn intercept_response(&self, _response: &mut Response<Bytes>) -> Result<(), Status> {
Ok(())
}
}
#[must_use]
pub fn timeout_interceptor(timeout_ms: u64) -> TimeoutInterceptor {
TimeoutInterceptor::new(timeout_ms)
}
#[cfg(test)]
mod tests {
use super::*;
use crate::grpc::Code;
fn init_test(name: &str) {
crate::test_utils::init_test_logging();
crate::test_phase!(name);
}
#[derive(Debug)]
struct ResponseOrderInterceptor {
name: &'static str,
calls: std::sync::Arc<std::sync::Mutex<Vec<&'static str>>>,
}
impl Interceptor for ResponseOrderInterceptor {
fn intercept_request(&self, _request: &mut Request<Bytes>) -> Result<(), Status> {
Ok(())
}
fn intercept_response(&self, _response: &mut Response<Bytes>) -> Result<(), Status> {
Ok(())
}
fn intercept_response_with_request(
&self,
_request: &Request<Bytes>,
_response: &mut Response<Bytes>,
) -> Result<(), Status> {
self.calls.lock().unwrap().push(self.name);
Ok(())
}
}
#[test]
fn interceptor_layer_empty() {
init_test("interceptor_layer_empty");
let layer = InterceptorLayer::new();
let empty = layer.is_empty();
crate::assert_with_log!(empty, "empty", true, empty);
let len = layer.len();
crate::assert_with_log!(len == 0, "len", 0, len);
crate::test_complete!("interceptor_layer_empty");
}
#[test]
fn interceptor_layer_chain() {
init_test("interceptor_layer_chain");
let layer = InterceptorLayer::new()
.layer(trace_interceptor())
.layer(logging_interceptor());
let empty = layer.is_empty();
crate::assert_with_log!(!empty, "not empty", false, empty);
let len = layer.len();
crate::assert_with_log!(len == 2, "len", 2, len);
crate::test_complete!("interceptor_layer_chain");
}
#[test]
fn interceptor_layer_request() {
init_test("interceptor_layer_request");
let layer = InterceptorLayer::new().layer(trace_interceptor());
let mut request = Request::new(Bytes::new());
layer.intercept_request(&mut request).unwrap();
let has_id = request.metadata().get("x-request-id").is_some();
crate::assert_with_log!(has_id, "request id", true, has_id);
crate::test_complete!("interceptor_layer_request");
}
#[test]
fn bearer_auth_interceptor() {
init_test("bearer_auth_interceptor");
let interceptor = auth_bearer_interceptor("my-token");
let mut request = Request::new(Bytes::new());
interceptor.intercept_request(&mut request).unwrap();
let auth = request.metadata().get("authorization").unwrap();
let ok = matches!(auth, MetadataValue::Ascii(s) if s == "Bearer my-token");
crate::assert_with_log!(ok, "auth header", true, ok);
crate::test_complete!("bearer_auth_interceptor");
}
#[test]
fn bearer_auth_validator_success() {
init_test("bearer_auth_validator_success");
let interceptor = auth_validator(|token| token == "valid-token");
let mut request = Request::new(Bytes::new());
request
.metadata_mut()
.insert("authorization", "Bearer valid-token");
let ok = interceptor.intercept_request(&mut request).is_ok();
crate::assert_with_log!(ok, "intercept ok", true, ok);
crate::test_complete!("bearer_auth_validator_success");
}
#[test]
fn bearer_auth_validator_invalid() {
init_test("bearer_auth_validator_invalid");
let interceptor = auth_validator(|token| token == "valid-token");
let mut request = Request::new(Bytes::new());
request
.metadata_mut()
.insert("authorization", "Bearer invalid-token");
let err = interceptor.intercept_request(&mut request).unwrap_err();
let code = err.code();
crate::assert_with_log!(
code == Code::Unauthenticated,
"code",
Code::Unauthenticated,
code
);
crate::test_complete!("bearer_auth_validator_invalid");
}
#[test]
fn bearer_auth_validator_missing() {
init_test("bearer_auth_validator_missing");
let interceptor = auth_validator(|_| true);
let mut request = Request::new(Bytes::new());
let err = interceptor.intercept_request(&mut request).unwrap_err();
let code = err.code();
crate::assert_with_log!(
code == Code::Unauthenticated,
"code",
Code::Unauthenticated,
code
);
crate::test_complete!("bearer_auth_validator_missing");
}
#[test]
fn bearer_auth_validator_accepts_case_insensitive_scheme() {
init_test("bearer_auth_validator_accepts_case_insensitive_scheme");
let interceptor = auth_validator(|token| token == "valid-token");
let mut request = Request::new(Bytes::new());
request
.metadata_mut()
.insert("authorization", "bEaReR valid-token");
let ok = interceptor.intercept_request(&mut request).is_ok();
crate::assert_with_log!(ok, "intercept ok", true, ok);
crate::test_complete!("bearer_auth_validator_accepts_case_insensitive_scheme");
}
#[test]
fn bearer_auth_validator_rejects_empty_token() {
init_test("bearer_auth_validator_rejects_empty_token");
let interceptor = auth_validator(|_| true);
let mut request = Request::new(Bytes::new());
request.metadata_mut().insert("authorization", "Bearer ");
let err = interceptor.intercept_request(&mut request).unwrap_err();
let code = err.code();
crate::assert_with_log!(
code == Code::Unauthenticated,
"code",
Code::Unauthenticated,
code
);
crate::test_complete!("bearer_auth_validator_rejects_empty_token");
}
#[test]
fn metadata_propagator_marks_keys() {
init_test("metadata_propagator_marks_keys");
let interceptor = metadata_propagator(["x-request-id", "x-trace-id"]);
let mut request = Request::new(Bytes::new());
request.metadata_mut().insert("x-request-id", "req-123");
request.metadata_mut().insert("x-trace-id", "trace-456");
interceptor.intercept_request(&mut request).unwrap();
let has_keys = request.metadata().get("x-propagate-keys").is_some();
crate::assert_with_log!(has_keys, "propagate keys", true, has_keys);
crate::test_complete!("metadata_propagator_marks_keys");
}
#[test]
fn rate_limiter_allows_under_limit() {
init_test("rate_limiter_allows_under_limit");
let interceptor = rate_limiter(10);
for _ in 0..10 {
let mut request = Request::new(Bytes::new());
let ok = interceptor.intercept_request(&mut request).is_ok();
crate::assert_with_log!(ok, "intercept ok", true, ok);
}
let count = interceptor.current_count();
crate::assert_with_log!(count == 10, "count", 10, count);
crate::test_complete!("rate_limiter_allows_under_limit");
}
#[test]
fn rate_limiter_rejects_over_limit() {
init_test("rate_limiter_rejects_over_limit");
let interceptor = rate_limiter(2);
let mut request = Request::new(Bytes::new());
let ok = interceptor.intercept_request(&mut request).is_ok();
crate::assert_with_log!(ok, "first ok", true, ok);
let mut request = Request::new(Bytes::new());
let ok = interceptor.intercept_request(&mut request).is_ok();
crate::assert_with_log!(ok, "second ok", true, ok);
let mut request = Request::new(Bytes::new());
let err = interceptor.intercept_request(&mut request).unwrap_err();
let code = err.code();
crate::assert_with_log!(
code == Code::ResourceExhausted,
"code",
Code::ResourceExhausted,
code
);
crate::test_complete!("rate_limiter_rejects_over_limit");
}
#[test]
fn rate_limiter_reset() {
init_test("rate_limiter_reset");
let interceptor = rate_limiter(1);
let mut request = Request::new(Bytes::new());
let ok = interceptor.intercept_request(&mut request).is_ok();
crate::assert_with_log!(ok, "first ok", true, ok);
let mut request = Request::new(Bytes::new());
let err = interceptor.intercept_request(&mut request).is_err();
crate::assert_with_log!(err, "second err", true, err);
interceptor.reset();
let count = interceptor.current_count();
crate::assert_with_log!(count == 0, "count", 0, count);
let mut request = Request::new(Bytes::new());
let ok = interceptor.intercept_request(&mut request).is_ok();
crate::assert_with_log!(ok, "after reset ok", true, ok);
crate::test_complete!("rate_limiter_reset");
}
#[test]
fn rate_limiter_releases_slot_on_response() {
init_test("rate_limiter_releases_slot_on_response");
let interceptor = rate_limiter(1);
let mut first_request = Request::new(Bytes::new());
interceptor.intercept_request(&mut first_request).unwrap();
let mut blocked_request = Request::new(Bytes::new());
let blocked = interceptor.intercept_request(&mut blocked_request).is_err();
crate::assert_with_log!(
blocked,
"second blocked while first inflight",
true,
blocked
);
let mut response = Response::new(Bytes::new());
interceptor
.intercept_response_with_request(&first_request, &mut response)
.unwrap();
let count = interceptor.current_count();
crate::assert_with_log!(count == 0, "count after release", 0, count);
let mut retry_request = Request::new(Bytes::new());
let ok = interceptor.intercept_request(&mut retry_request).is_ok();
crate::assert_with_log!(ok, "request after response ok", true, ok);
crate::test_complete!("rate_limiter_releases_slot_on_response");
}
#[test]
fn rate_limiter_response_without_request_does_not_underflow() {
init_test("rate_limiter_response_without_request_does_not_underflow");
let interceptor = rate_limiter(1);
let mut response = Response::new(Bytes::new());
interceptor.intercept_response(&mut response).unwrap();
let count = interceptor.current_count();
crate::assert_with_log!(count == 0, "count stays at zero", 0, count);
let mut request = Request::new(Bytes::new());
let ok = interceptor.intercept_request(&mut request).is_ok();
crate::assert_with_log!(ok, "request after stray response ok", true, ok);
crate::test_complete!("rate_limiter_response_without_request_does_not_underflow");
}
#[test]
fn timeout_interceptor_adds_header() {
init_test("timeout_interceptor_adds_header");
let interceptor = timeout_interceptor(5000);
let mut request = Request::new(Bytes::new());
interceptor.intercept_request(&mut request).unwrap();
let timeout = request.metadata().get("grpc-timeout").unwrap();
let ok = matches!(timeout, MetadataValue::Ascii(s) if s == "5000m");
crate::assert_with_log!(ok, "timeout header", true, ok);
crate::test_complete!("timeout_interceptor_adds_header");
}
#[test]
fn timeout_interceptor_preserves_existing() {
init_test("timeout_interceptor_preserves_existing");
let interceptor = timeout_interceptor(5000);
let mut request = Request::new(Bytes::new());
request.metadata_mut().insert("grpc-timeout", "1000m");
interceptor.intercept_request(&mut request).unwrap();
let timeout = request.metadata().get("grpc-timeout").unwrap();
let ok = matches!(timeout, MetadataValue::Ascii(s) if s == "1000m");
crate::assert_with_log!(ok, "timeout header", true, ok);
crate::test_complete!("timeout_interceptor_preserves_existing");
}
#[test]
fn fn_interceptor_custom() {
init_test("fn_interceptor_custom");
let interceptor = fn_interceptor(|request: &mut Request<Bytes>| {
request.metadata_mut().insert("x-custom", "value");
Ok(())
});
let mut request = Request::new(Bytes::new());
interceptor.intercept_request(&mut request).unwrap();
let value = request.metadata().get("x-custom").unwrap();
let ok = matches!(value, MetadataValue::Ascii(s) if s == "value");
crate::assert_with_log!(ok, "custom header", true, ok);
crate::test_complete!("fn_interceptor_custom");
}
#[test]
fn logging_interceptor_marks_request() {
init_test("logging_interceptor_marks_request");
let interceptor = logging_interceptor();
let mut request = Request::new(Bytes::new());
interceptor.intercept_request(&mut request).unwrap();
let logged = request.metadata().get("x-logged").is_some();
crate::assert_with_log!(logged, "logged header", true, logged);
crate::test_complete!("logging_interceptor_marks_request");
}
#[test]
fn logging_interceptor_marks_response() {
init_test("logging_interceptor_marks_response");
let interceptor = logging_interceptor();
let mut response = Response::new(Bytes::new());
interceptor.intercept_response(&mut response).unwrap();
let logged = response.metadata().get("x-logged").is_some();
crate::assert_with_log!(logged, "logged header", true, logged);
crate::test_complete!("logging_interceptor_marks_response");
}
#[test]
fn metadata_propagator_copies_selected_request_metadata_to_response() {
init_test("metadata_propagator_copies_selected_request_metadata_to_response");
let interceptor = metadata_propagator(["x-request-id", "x-trace-id"]);
let mut request = Request::new(Bytes::new());
request.metadata_mut().insert("x-request-id", "req-123");
request.metadata_mut().insert("x-trace-id", "trace-456");
request.metadata_mut().insert("x-unrelated", "skip-me");
let mut response = Response::new(Bytes::new());
interceptor
.intercept_response_with_request(&request, &mut response)
.unwrap();
let request_id = response.metadata().get("x-request-id");
let trace_id = response.metadata().get("x-trace-id");
let unrelated = response.metadata().get("x-unrelated");
let request_id_ok =
matches!(request_id, Some(MetadataValue::Ascii(value)) if value == "req-123");
let trace_id_ok =
matches!(trace_id, Some(MetadataValue::Ascii(value)) if value == "trace-456");
crate::assert_with_log!(request_id_ok, "request id propagated", true, request_id_ok);
crate::assert_with_log!(trace_id_ok, "trace id propagated", true, trace_id_ok);
crate::assert_with_log!(
unrelated.is_none(),
"unrelated absent",
true,
unrelated.is_none()
);
crate::test_complete!("metadata_propagator_copies_selected_request_metadata_to_response");
}
#[test]
fn metadata_propagator_response_only_hook_is_requestless_noop() {
init_test("metadata_propagator_response_only_hook_is_requestless_noop");
let interceptor = metadata_propagator(["x-request-id"]);
let mut response = Response::new(Bytes::new());
interceptor.intercept_response(&mut response).unwrap();
let propagated = response.metadata().get("x-request-id");
crate::assert_with_log!(
propagated.is_none(),
"response-only hook does not invent request metadata",
true,
propagated.is_none()
);
crate::test_complete!("metadata_propagator_response_only_hook_is_requestless_noop");
}
#[test]
fn interceptor_layer_request_aware_response_hook_preserves_composition() {
init_test("interceptor_layer_request_aware_response_hook_preserves_composition");
let layer = InterceptorLayer::new()
.layer(logging_interceptor())
.layer(metadata_propagator(["x-request-id"]));
let mut request = Request::new(Bytes::new());
request.metadata_mut().insert("x-request-id", "req-123");
let mut response = Response::new(Bytes::new());
layer
.intercept_response_with_request(&request, &mut response)
.unwrap();
let logged = response.metadata().get("x-logged");
let request_id = response.metadata().get("x-request-id");
let logged_ok = matches!(logged, Some(MetadataValue::Ascii(value)) if value == "true");
let request_id_ok =
matches!(request_id, Some(MetadataValue::Ascii(value)) if value == "req-123");
crate::assert_with_log!(logged_ok, "logging interceptor still runs", true, logged_ok);
crate::assert_with_log!(
request_id_ok,
"request-aware propagation copies metadata",
true,
request_id_ok
);
crate::test_complete!(
"interceptor_layer_request_aware_response_hook_preserves_composition"
);
}
#[test]
fn interceptor_layer_request_aware_response_hook_runs_in_reverse_order() {
init_test("interceptor_layer_request_aware_response_hook_runs_in_reverse_order");
let calls = std::sync::Arc::new(std::sync::Mutex::new(Vec::new()));
let layer = InterceptorLayer::new()
.layer(ResponseOrderInterceptor {
name: "outer",
calls: std::sync::Arc::clone(&calls),
})
.layer(ResponseOrderInterceptor {
name: "inner",
calls: std::sync::Arc::clone(&calls),
});
let request = Request::new(Bytes::new());
let mut response = Response::new(Bytes::new());
layer
.intercept_response_with_request(&request, &mut response)
.unwrap();
let calls = calls.lock().unwrap().clone();
crate::assert_with_log!(
calls == vec!["inner", "outer"],
"request-aware response order",
vec!["inner", "outer"],
calls
);
crate::test_complete!(
"interceptor_layer_request_aware_response_hook_runs_in_reverse_order"
);
}
#[test]
fn tracing_interceptor_generates_request_id() {
init_test("tracing_interceptor_generates_request_id");
let interceptor = trace_interceptor();
let mut request = Request::new(Bytes::new());
interceptor.intercept_request(&mut request).unwrap();
let id = request.metadata().get("x-request-id").unwrap();
let ok = matches!(id, MetadataValue::Ascii(s) if s.starts_with("req-"));
crate::assert_with_log!(ok, "request id", true, ok);
crate::test_complete!("tracing_interceptor_generates_request_id");
}
#[test]
fn tracing_interceptor_uses_deterministic_sequential_ids() {
init_test("tracing_interceptor_uses_deterministic_sequential_ids");
let interceptor = trace_interceptor();
let cloned = interceptor.clone();
let mut first = Request::new(Bytes::new());
interceptor.intercept_request(&mut first).unwrap();
let first_id = first.metadata().get("x-request-id").unwrap();
let mut second = Request::new(Bytes::new());
cloned.intercept_request(&mut second).unwrap();
let second_id = second.metadata().get("x-request-id").unwrap();
let ok = matches!(
(first_id, second_id),
(MetadataValue::Ascii(first), MetadataValue::Ascii(second))
if first == "req-0000000000000001" && second == "req-0000000000000002"
);
crate::assert_with_log!(ok, "sequential request ids", true, ok);
crate::test_complete!("tracing_interceptor_uses_deterministic_sequential_ids");
}
#[test]
fn tracing_interceptor_preserves_existing_request_id() {
init_test("tracing_interceptor_preserves_existing_request_id");
let interceptor = trace_interceptor();
let mut request = Request::new(Bytes::new());
request
.metadata_mut()
.insert("x-request-id", "req-custom".to_string());
interceptor.intercept_request(&mut request).unwrap();
let ok = matches!(
request.metadata().get("x-request-id"),
Some(MetadataValue::Ascii(id)) if id == "req-custom"
);
crate::assert_with_log!(ok, "preserved request id", true, ok);
crate::test_complete!("tracing_interceptor_preserves_existing_request_id");
}
}