wireframe 0.3.0

Simplify building servers and clients for custom binary protocols.
Documentation
//! `ClientPoolHandleWorld` fixture for `PoolHandle` behavioural scenarios.

use std::{
    sync::{
        Arc,
        atomic::{AtomicUsize, Ordering},
    },
    time::Duration,
};

use rstest::fixture;
use tokio::{sync::Mutex, time::timeout};
use wireframe::{
    client::{ClientPoolConfig, PoolFairnessPolicy},
    test_helpers::{
        Ping,
        Pong,
        PoolTestServer,
        TestClientPool,
        acquire_and_record,
        build_pooled_client,
    },
};
pub use wireframe_testing::TestResult;

pub struct ClientPoolHandleWorld {
    pool: Option<TestClientPool>,
    server: Option<PoolTestServer>,
    preamble_callback_count: Arc<AtomicUsize>,
    grant_order: Vec<String>,
    blocked_waiter: bool,
    waiter_recovered: bool,
    warm_reuse_then_recycle: bool,
}

impl Default for ClientPoolHandleWorld {
    fn default() -> Self {
        Self {
            pool: None,
            server: None,
            preamble_callback_count: Arc::new(AtomicUsize::new(0)),
            grant_order: Vec::new(),
            blocked_waiter: false,
            waiter_recovered: false,
            warm_reuse_then_recycle: false,
        }
    }
}

#[rustfmt::skip]
#[fixture]
pub fn client_pool_handle_world() -> ClientPoolHandleWorld {
    ClientPoolHandleWorld::default()
}

impl ClientPoolHandleWorld {
    async fn connect_handle_pool(&mut self, fairness_policy: PoolFairnessPolicy) -> TestResult {
        self.start_server().await?;
        self.connect_pool(
            ClientPoolConfig::default()
                .pool_size(1)
                .fairness_policy(fairness_policy),
        )
        .await
    }

    async fn record_grants(&mut self, grants: Arc<Mutex<Vec<&'static str>>>) -> TestResult {
        self.grant_order = grants
            .lock()
            .await
            .iter()
            .map(ToString::to_string)
            .collect();
        Ok(())
    }

    async fn start_server(&mut self) -> TestResult {
        self.server = Some(PoolTestServer::start().await?);
        Ok(())
    }

    async fn connect_pool(&mut self, config: ClientPoolConfig) -> TestResult {
        let addr = self.server.as_ref().ok_or("server missing")?.addr;
        let pool = build_pooled_client(addr, config, self.preamble_callback_count.clone()).await?;
        self.pool = Some(pool);
        Ok(())
    }

    fn pool_preamble_count_is(&self, expected: usize) -> bool {
        self.preamble_callback_count.load(Ordering::SeqCst) == expected
    }

    fn server_state_matches(&self, preamble_count: usize, connection_count: usize) -> bool {
        self.server.as_ref().is_some_and(|s| {
            s.preamble_count() == preamble_count && s.connection_count() == connection_count
        })
    }

    pub async fn run_round_robin_scenario(&mut self) -> TestResult {
        self.connect_handle_pool(PoolFairnessPolicy::RoundRobin)
            .await?;
        let pool = self.pool.as_ref().ok_or("pool missing")?;
        let grants = Arc::new(Mutex::new(Vec::new()));

        let left = tokio::spawn(acquire_and_record(
            pool.handle(),
            "session-a",
            3,
            Arc::clone(&grants),
        ));
        let right = tokio::spawn(acquire_and_record(
            pool.handle(),
            "session-b",
            3,
            Arc::clone(&grants),
        ));
        left.await??;
        right.await??;
        self.record_grants(grants).await
    }

    pub async fn run_fifo_scenario(&mut self) -> TestResult {
        self.connect_handle_pool(PoolFairnessPolicy::Fifo).await?;
        let pool = self.pool.as_ref().ok_or("pool missing")?;
        let blocker = pool.acquire().await?;
        let grants = Arc::new(Mutex::new(Vec::new()));
        let (first, second, third) = spawn_fifo_waiters(pool, Arc::clone(&grants)).await;

        drop(blocker);
        first.await??;
        second.await??;
        third.await??;
        self.record_grants(grants).await
    }

    pub async fn run_back_pressure_scenario(&mut self) -> TestResult {
        self.start_server().await?;
        self.connect_pool(ClientPoolConfig::default().pool_size(1))
            .await?;
        let pool = self.pool.as_ref().ok_or("pool missing")?;
        let mut first = pool.handle();
        let mut second = pool.handle();

        let held_lease = first.acquire().await?;
        self.blocked_waiter = timeout(Duration::from_millis(25), second.acquire())
            .await
            .is_err();

        drop(held_lease);
        self.waiter_recovered = matches!(
            timeout(Duration::from_millis(100), second.acquire()).await,
            Ok(Ok(_))
        );
        Ok(())
    }

    pub async fn run_warm_reuse_then_idle_recycle_scenario(&mut self) -> TestResult {
        tokio::time::pause();
        self.start_server().await?;
        let idle_timeout = Duration::from_millis(50);
        self.connect_pool(
            ClientPoolConfig::default()
                .pool_size(1)
                .idle_timeout(idle_timeout),
        )
        .await?;
        let pool = self.pool.as_ref().ok_or("pool missing")?;
        let mut handle = pool.handle();

        let first: Pong = handle.call(&Ping(1)).await?;
        let second: Pong = handle.call(&Ping(2)).await?;
        if first != Pong(1) || second != Pong(2) {
            return Err("unexpected warm reuse response sequence".into());
        }
        let warm_reuse_preserved =
            self.pool_preamble_count_is(1) && self.server_state_matches(1, 1);

        tokio::time::advance(idle_timeout + idle_timeout).await;
        tokio::task::yield_now().await;

        let third: Pong = handle.call(&Ping(3)).await?;
        self.warm_reuse_then_recycle = third == Pong(3)
            && warm_reuse_preserved
            && self.pool_preamble_count_is(2)
            && self.server_state_matches(2, 2);
        Ok(())
    }

    pub fn sessions_alternate_fairly(&self) -> bool {
        if self.grant_order.len() != 6 {
            return false;
        }

        let first = self.grant_order.first();
        let second = self.grant_order.get(1);

        if first == second {
            return false;
        }

        for (i, grant) in self.grant_order.iter().enumerate() {
            let expected = if i & 1 == 0 { first } else { second };
            if Some(grant) != expected {
                return false;
            }
        }

        true
    }

    pub fn fifo_order_preserved(&self) -> bool { self.grant_order == ["first", "second", "third"] }

    pub fn back_pressure_preserved(&self) -> bool { self.blocked_waiter && self.waiter_recovered }

    pub fn warm_reuse_then_recycle_preserved(&self) -> bool { self.warm_reuse_then_recycle }
}

async fn spawn_fifo_waiters(
    pool: &TestClientPool,
    grants: Arc<Mutex<Vec<&'static str>>>,
) -> (
    tokio::task::JoinHandle<Result<(), wireframe::client::ClientError>>,
    tokio::task::JoinHandle<Result<(), wireframe::client::ClientError>>,
    tokio::task::JoinHandle<Result<(), wireframe::client::ClientError>>,
) {
    let first = tokio::spawn(acquire_and_record(
        pool.handle(),
        "first",
        1,
        Arc::clone(&grants),
    ));
    tokio::task::yield_now().await;
    let second = tokio::spawn(acquire_and_record(
        pool.handle(),
        "second",
        1,
        Arc::clone(&grants),
    ));
    tokio::task::yield_now().await;
    let third = tokio::spawn(acquire_and_record(pool.handle(), "third", 1, grants));
    tokio::task::yield_now().await;
    (first, second, third)
}