tower 0.4.3

Tower is a library of modular and reusable components for building robust clients and servers.
Documentation
#[path = "../support.rs"]
mod support;
use tokio_test::{assert_pending, assert_ready, assert_ready_ok};
use tower::limit::concurrency::ConcurrencyLimitLayer;
use tower_test::{assert_request_eq, mock};

#[tokio::test(flavor = "current_thread")]
async fn basic_service_limit_functionality_with_poll_ready() {
    let _t = support::trace_init();
    let limit = ConcurrencyLimitLayer::new(2);
    let (mut service, mut handle) = mock::spawn_layer(limit);

    assert_ready_ok!(service.poll_ready());
    let r1 = service.call("hello 1");

    assert_ready_ok!(service.poll_ready());
    let r2 = service.call("hello 2");

    assert_pending!(service.poll_ready());

    assert!(!service.is_woken());

    // The request gets passed through
    assert_request_eq!(handle, "hello 1").send_response("world 1");

    // The next request gets passed through
    assert_request_eq!(handle, "hello 2").send_response("world 2");

    // There are no more requests
    assert_pending!(handle.poll_request());

    assert_eq!(r1.await.unwrap(), "world 1");

    assert!(service.is_woken());

    // Another request can be sent
    assert_ready_ok!(service.poll_ready());

    let r3 = service.call("hello 3");

    assert_pending!(service.poll_ready());

    assert_eq!(r2.await.unwrap(), "world 2");

    // The request gets passed through
    assert_request_eq!(handle, "hello 3").send_response("world 3");

    assert_eq!(r3.await.unwrap(), "world 3");
}

#[tokio::test(flavor = "current_thread")]
async fn basic_service_limit_functionality_without_poll_ready() {
    let _t = support::trace_init();
    let limit = ConcurrencyLimitLayer::new(2);
    let (mut service, mut handle) = mock::spawn_layer(limit);

    assert_ready_ok!(service.poll_ready());
    let r1 = service.call("hello 1");

    assert_ready_ok!(service.poll_ready());
    let r2 = service.call("hello 2");

    assert_pending!(service.poll_ready());

    // The request gets passed through
    assert_request_eq!(handle, "hello 1").send_response("world 1");

    assert!(!service.is_woken());

    // The next request gets passed through
    assert_request_eq!(handle, "hello 2").send_response("world 2");

    assert!(!service.is_woken());

    // There are no more requests
    assert_pending!(handle.poll_request());

    assert_eq!(r1.await.unwrap(), "world 1");

    assert!(service.is_woken());

    // One more request can be sent
    assert_ready_ok!(service.poll_ready());
    let r4 = service.call("hello 4");

    assert_pending!(service.poll_ready());

    assert_eq!(r2.await.unwrap(), "world 2");
    assert!(service.is_woken());

    // The request gets passed through
    assert_request_eq!(handle, "hello 4").send_response("world 4");

    assert_eq!(r4.await.unwrap(), "world 4");
}

#[tokio::test(flavor = "current_thread")]
async fn request_without_capacity() {
    let _t = support::trace_init();
    let limit = ConcurrencyLimitLayer::new(0);
    let (mut service, _) = mock::spawn_layer::<(), (), _>(limit);

    assert_pending!(service.poll_ready());
}

#[tokio::test(flavor = "current_thread")]
async fn reserve_capacity_without_sending_request() {
    let _t = support::trace_init();
    let limit = ConcurrencyLimitLayer::new(1);
    let (mut s1, mut handle) = mock::spawn_layer(limit);

    let mut s2 = s1.clone();

    // Reserve capacity in s1
    assert_ready_ok!(s1.poll_ready());

    // Service 2 cannot get capacity
    assert_pending!(s2.poll_ready());

    // s1 sends the request, then s2 is able to get capacity
    let r1 = s1.call("hello");

    assert_request_eq!(handle, "hello").send_response("world");

    assert_pending!(s2.poll_ready());

    r1.await.unwrap();

    assert_ready_ok!(s2.poll_ready());
}

#[tokio::test(flavor = "current_thread")]
async fn service_drop_frees_capacity() {
    let _t = support::trace_init();
    let limit = ConcurrencyLimitLayer::new(1);
    let (mut s1, _handle) = mock::spawn_layer::<(), (), _>(limit);

    let mut s2 = s1.clone();

    // Reserve capacity in s1
    assert_ready_ok!(s1.poll_ready());

    // Service 2 cannot get capacity
    assert_pending!(s2.poll_ready());

    drop(s1);

    assert!(s2.is_woken());
    assert_ready_ok!(s2.poll_ready());
}

#[tokio::test(flavor = "current_thread")]
async fn response_error_releases_capacity() {
    let _t = support::trace_init();
    let limit = ConcurrencyLimitLayer::new(1);
    let (mut s1, mut handle) = mock::spawn_layer::<_, (), _>(limit);

    let mut s2 = s1.clone();

    // Reserve capacity in s1
    assert_ready_ok!(s1.poll_ready());

    // s1 sends the request, then s2 is able to get capacity
    let r1 = s1.call("hello");

    assert_request_eq!(handle, "hello").send_error("boom");

    r1.await.unwrap_err();

    assert_ready_ok!(s2.poll_ready());
}

#[tokio::test(flavor = "current_thread")]
async fn response_future_drop_releases_capacity() {
    let _t = support::trace_init();
    let limit = ConcurrencyLimitLayer::new(1);
    let (mut s1, _handle) = mock::spawn_layer::<_, (), _>(limit);

    let mut s2 = s1.clone();

    // Reserve capacity in s1
    assert_ready_ok!(s1.poll_ready());

    // s1 sends the request, then s2 is able to get capacity
    let r1 = s1.call("hello");

    assert_pending!(s2.poll_ready());

    drop(r1);

    assert_ready_ok!(s2.poll_ready());
}

#[tokio::test(flavor = "current_thread")]
async fn multi_waiters() {
    let _t = support::trace_init();
    let limit = ConcurrencyLimitLayer::new(1);
    let (mut s1, _handle) = mock::spawn_layer::<(), (), _>(limit);
    let mut s2 = s1.clone();
    let mut s3 = s1.clone();

    // Reserve capacity in s1
    assert_ready_ok!(s1.poll_ready());

    // s2 and s3 are not ready
    assert_pending!(s2.poll_ready());
    assert_pending!(s3.poll_ready());

    drop(s1);

    assert!(s2.is_woken());
    assert!(!s3.is_woken());

    drop(s2);

    assert!(s3.is_woken());
}