#![cfg(feature = "tower-integration")]
use redis_cloud::CloudClient;
use redis_cloud::tower_support::ApiRequest;
use serde_json::json;
use std::sync::Arc;
use std::sync::atomic::{AtomicU32, Ordering};
use std::time::Duration;
use tower::buffer::BufferLayer;
use tower::limit::RateLimitLayer;
use tower::timeout::TimeoutLayer;
use tower::{Service, ServiceBuilder, ServiceExt};
use wiremock::matchers::{method, path};
use wiremock::{Mock, MockServer, ResponseTemplate};
#[tokio::test]
async fn test_tower_with_timeout_middleware() {
let mock_server = MockServer::start().await;
Mock::given(method("GET"))
.and(path("/subscriptions"))
.respond_with(
ResponseTemplate::new(200)
.set_body_json(json!({"subscriptions": []}))
.set_delay(Duration::from_millis(100)),
)
.mount(&mock_server)
.await;
let client = CloudClient::builder()
.api_key("test-key")
.api_secret("test-secret")
.base_url(mock_server.uri())
.build()
.expect("Failed to create client");
let service = ServiceBuilder::new()
.layer(TimeoutLayer::new(Duration::from_millis(500)))
.service(client.into_service());
let response = service
.oneshot(ApiRequest::get("/subscriptions"))
.await
.expect("Request should succeed with sufficient timeout");
assert_eq!(response.status, 200);
}
#[tokio::test]
async fn test_tower_timeout_expires() {
let mock_server = MockServer::start().await;
Mock::given(method("GET"))
.and(path("/subscriptions"))
.respond_with(
ResponseTemplate::new(200)
.set_body_json(json!({"subscriptions": []}))
.set_delay(Duration::from_millis(500)),
)
.mount(&mock_server)
.await;
let client = CloudClient::builder()
.api_key("test-key")
.api_secret("test-secret")
.base_url(mock_server.uri())
.build()
.expect("Failed to create client");
let service = ServiceBuilder::new()
.layer(TimeoutLayer::new(Duration::from_millis(100)))
.service(client.into_service());
let result = service.oneshot(ApiRequest::get("/subscriptions")).await;
assert!(result.is_err(), "Request should timeout");
}
#[tokio::test]
async fn test_tower_with_rate_limiting() {
let mock_server = MockServer::start().await;
Mock::given(method("GET"))
.and(path("/account"))
.respond_with(ResponseTemplate::new(200).set_body_json(json!({"id": 1})))
.mount(&mock_server)
.await;
let client = CloudClient::builder()
.api_key("test-key")
.api_secret("test-secret")
.base_url(mock_server.uri())
.build()
.expect("Failed to create client");
let mut service = ServiceBuilder::new()
.layer(RateLimitLayer::new(2, Duration::from_millis(500)))
.service(client.into_service());
let start = std::time::Instant::now();
service
.ready()
.await
.expect("Service should be ready")
.call(ApiRequest::get("/account"))
.await
.expect("First request should succeed");
service
.ready()
.await
.expect("Service should be ready")
.call(ApiRequest::get("/account"))
.await
.expect("Second request should succeed");
let elapsed = start.elapsed();
assert!(
elapsed < Duration::from_millis(100),
"First two requests should not be rate limited"
);
let start = std::time::Instant::now();
service
.ready()
.await
.expect("Service should be ready")
.call(ApiRequest::get("/account"))
.await
.expect("Third request should succeed after delay");
let elapsed = start.elapsed();
assert!(
elapsed >= Duration::from_millis(300),
"Third request should be rate limited"
);
}
#[tokio::test]
async fn test_tower_with_buffer_layer() {
let mock_server = MockServer::start().await;
Mock::given(method("GET"))
.and(path("/subscriptions"))
.respond_with(ResponseTemplate::new(200).set_body_json(json!({"subscriptions": []})))
.mount(&mock_server)
.await;
let client = CloudClient::builder()
.api_key("test-key")
.api_secret("test-secret")
.base_url(mock_server.uri())
.build()
.expect("Failed to create client");
let service = ServiceBuilder::new()
.layer(BufferLayer::new(10))
.service(client.into_service());
let mut handles = vec![];
for _ in 0..5 {
let mut svc = service.clone();
let handle = tokio::spawn(async move {
svc.ready()
.await
.expect("Service should be ready")
.call(ApiRequest::get("/subscriptions"))
.await
});
handles.push(handle);
}
for handle in handles {
let result = handle.await.expect("Task should not panic");
assert!(result.is_ok(), "Buffered request should succeed");
}
}
#[tokio::test]
async fn test_tower_middleware_composition() {
let mock_server = MockServer::start().await;
Mock::given(method("GET"))
.and(path("/account"))
.respond_with(
ResponseTemplate::new(200)
.set_body_json(json!({"id": 123}))
.set_delay(Duration::from_millis(50)),
)
.mount(&mock_server)
.await;
let client = CloudClient::builder()
.api_key("test-key")
.api_secret("test-secret")
.base_url(mock_server.uri())
.build()
.expect("Failed to create client");
let service = ServiceBuilder::new()
.layer(TimeoutLayer::new(Duration::from_secs(5)))
.layer(RateLimitLayer::new(10, Duration::from_secs(1)))
.layer(BufferLayer::new(100))
.service(client.into_service());
let response = service
.oneshot(ApiRequest::get("/account"))
.await
.expect("Request with composed middleware should succeed");
assert_eq!(response.status, 200);
assert_eq!(response.body["id"], 123);
}
#[tokio::test]
async fn test_tower_custom_middleware_request_counting() {
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use tower::Service;
#[derive(Clone)]
struct RequestCounter<S> {
inner: S,
counter: Arc<AtomicU32>,
}
impl<S, Request> Service<Request> for RequestCounter<S>
where
S: Service<Request>,
S::Future: Send + 'static,
{
type Response = S::Response;
type Error = S::Error;
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.inner.poll_ready(cx)
}
fn call(&mut self, request: Request) -> Self::Future {
self.counter.fetch_add(1, Ordering::SeqCst);
let fut = self.inner.call(request);
Box::pin(fut)
}
}
let mock_server = MockServer::start().await;
Mock::given(method("GET"))
.and(path("/subscriptions"))
.respond_with(ResponseTemplate::new(200).set_body_json(json!({"subscriptions": []})))
.mount(&mock_server)
.await;
Mock::given(method("GET"))
.and(path("/account"))
.respond_with(ResponseTemplate::new(200).set_body_json(json!({"id": 1})))
.mount(&mock_server)
.await;
let client = CloudClient::builder()
.api_key("test-key")
.api_secret("test-secret")
.base_url(mock_server.uri())
.build()
.expect("Failed to create client");
let counter = Arc::new(AtomicU32::new(0));
let mut service = RequestCounter {
inner: client.into_service(),
counter: counter.clone(),
};
service
.ready()
.await
.expect("Service ready")
.call(ApiRequest::get("/subscriptions"))
.await
.expect("Request 1 failed");
service
.ready()
.await
.expect("Service ready")
.call(ApiRequest::get("/account"))
.await
.expect("Request 2 failed");
service
.ready()
.await
.expect("Service ready")
.call(ApiRequest::get("/subscriptions"))
.await
.expect("Request 3 failed");
assert_eq!(counter.load(Ordering::SeqCst), 3);
}
#[tokio::test]
async fn test_tower_error_handling_through_middleware() {
let mock_server = MockServer::start().await;
Mock::given(method("GET"))
.and(path("/subscriptions/999"))
.respond_with(ResponseTemplate::new(404).set_body_json(json!({
"error": {
"type": "SUBSCRIPTION_NOT_FOUND",
"status": "404 NOT_FOUND"
}
})))
.mount(&mock_server)
.await;
let client = CloudClient::builder()
.api_key("test-key")
.api_secret("test-secret")
.base_url(mock_server.uri())
.build()
.expect("Failed to create client");
let service = ServiceBuilder::new()
.layer(TimeoutLayer::new(Duration::from_secs(5)))
.layer(BufferLayer::new(10))
.service(client.into_service());
let result = service.oneshot(ApiRequest::get("/subscriptions/999")).await;
assert!(
result.is_err(),
"404 error should propagate through middleware"
);
}
#[tokio::test]
async fn test_tower_with_concurrent_requests_through_buffer() {
let mock_server = MockServer::start().await;
let counter = Arc::new(AtomicU32::new(0));
let counter_clone = counter.clone();
Mock::given(method("GET"))
.and(path("/account"))
.respond_with(move |_req: &wiremock::Request| {
counter_clone.fetch_add(1, Ordering::SeqCst);
ResponseTemplate::new(200).set_body_json(json!({"id": 1}))
})
.mount(&mock_server)
.await;
let client = CloudClient::builder()
.api_key("test-key")
.api_secret("test-secret")
.base_url(mock_server.uri())
.build()
.expect("Failed to create client");
let service = ServiceBuilder::new()
.layer(BufferLayer::new(50))
.service(client.into_service());
let mut handles = vec![];
for _ in 0..10 {
let mut svc = service.clone();
let handle = tokio::spawn(async move {
svc.ready()
.await
.expect("Service ready")
.call(ApiRequest::get("/account"))
.await
});
handles.push(handle);
}
for handle in handles {
handle.await.expect("Task panic").expect("Request failed");
}
assert_eq!(counter.load(Ordering::SeqCst), 10);
}