wireframe 0.3.0

Simplify building servers and clients for custom binary protocols.
Documentation
//! Step definitions for client streaming response behavioural tests.
//!
//! Steps run async world methods on the shared runtime stored in
//! `ClientStreamingWorld` rather than constructing a per-step runtime.

use std::{future::Future, pin::Pin};

use rstest_bdd_macros::{given, then, when};

use crate::fixtures::client_streaming::{ClientStreamingWorld, TestResult};

/// Helper to abort existing server, restart with new configuration, and
/// reconnect client.
///
/// The `server_starter` closure receives the world and returns a pinned
/// future so the borrow can outlive the closure boundary.
fn with_server_restart(
    world: &mut ClientStreamingWorld,
    server_starter: impl for<'a> FnOnce(
        &'a mut ClientStreamingWorld,
    ) -> Pin<Box<dyn Future<Output = TestResult> + 'a>>,
) -> TestResult {
    world.abort_server();
    world.block_on(|w| server_starter(w))??;
    world.block_on(|w| Box::pin(w.connect_client()))?
}

#[given("a streaming echo server")]
fn given_streaming_server(client_streaming_world: &mut ClientStreamingWorld) -> TestResult {
    client_streaming_world.block_on(|w| {
        Box::pin(async {
            // Default server sends frames based on the scenario's When step.
            // Start with a normal 3-frame server as default; specific scenarios
            // override via their own Given steps.
            w.start_normal_server(3).await?;
            w.connect_client().await
        })
    })?
}

#[given("a streaming server that returns mismatched correlation IDs")]
fn given_mismatch_server(client_streaming_world: &mut ClientStreamingWorld) -> TestResult {
    with_server_restart(client_streaming_world, |world| {
        Box::pin(world.start_mismatch_server())
    })
}

#[given("a streaming server that disconnects after {count:usize} frames")]
fn given_disconnect_server(
    client_streaming_world: &mut ClientStreamingWorld,
    count: usize,
) -> TestResult {
    with_server_restart(client_streaming_world, |world| {
        Box::pin(world.start_disconnect_server(count))
    })
}

#[given("a streaming server that emits interleaved high- and low-priority pushes")]
fn given_interleaved_priority_server(
    client_streaming_world: &mut ClientStreamingWorld,
) -> TestResult {
    with_server_restart(client_streaming_world, |world| {
        Box::pin(world.start_interleaved_priority_server())
    })
}

#[given("a streaming server with shared cross-priority push rate limiting")]
fn given_shared_rate_limit_server(client_streaming_world: &mut ClientStreamingWorld) -> TestResult {
    with_server_restart(client_streaming_world, |world| {
        Box::pin(world.start_shared_rate_limit_server())
    })
}

#[given("a streaming server that interleaves control and data frames")]
fn given_control_interleaved_server(
    client_streaming_world: &mut ClientStreamingWorld,
) -> TestResult {
    with_server_restart(client_streaming_world, |world| {
        Box::pin(world.start_control_interleaved_server())
    })
}

#[when("the client sends a streaming request with {count:usize} data frames")]
fn when_streaming_request_with_count(
    client_streaming_world: &mut ClientStreamingWorld,
    count: usize,
) -> TestResult {
    with_server_restart(client_streaming_world, |world| {
        Box::pin(world.start_normal_server(count))
    })?;
    client_streaming_world.block_on(|w| Box::pin(w.send_streaming_request()))?
}

#[when("the client sends a streaming request")]
fn when_streaming_request(client_streaming_world: &mut ClientStreamingWorld) -> TestResult {
    client_streaming_world.block_on(|w| Box::pin(w.send_streaming_request()))?
}

#[when("the client consumes the stream through the typed helper")]
fn when_typed_streaming_request(client_streaming_world: &mut ClientStreamingWorld) -> TestResult {
    client_streaming_world.block_on(|w| Box::pin(w.send_typed_streaming_request()))?
}

#[then("all {count:usize} data frames are received in order")]
fn then_frames_received_in_order(
    client_streaming_world: &mut ClientStreamingWorld,
    count: usize,
) -> TestResult {
    client_streaming_world.verify_frame_count(count)?;
    client_streaming_world.verify_frame_order()
}

#[then("no data frames are received")]
fn then_no_frames(client_streaming_world: &mut ClientStreamingWorld) -> TestResult {
    client_streaming_world.verify_frame_count(0)
}

#[then("the stream terminates cleanly")]
fn then_clean_termination(client_streaming_world: &mut ClientStreamingWorld) -> TestResult {
    client_streaming_world.verify_clean_termination()
}

#[then("a StreamCorrelationMismatch error is returned")]
fn then_correlation_mismatch(client_streaming_world: &mut ClientStreamingWorld) -> TestResult {
    client_streaming_world.verify_correlation_mismatch_error()
}

#[then("{count:usize} data frames are received")]
fn then_n_frames_received(
    client_streaming_world: &mut ClientStreamingWorld,
    count: usize,
) -> TestResult {
    client_streaming_world.verify_frame_count(count)
}

#[then("a disconnection error is returned")]
fn then_disconnect_error(client_streaming_world: &mut ClientStreamingWorld) -> TestResult {
    client_streaming_world.verify_disconnect_error()
}

#[then("interleaved priority frames are received without low-priority starvation")]
fn then_interleaved_priority_fairness(
    client_streaming_world: &mut ClientStreamingWorld,
) -> TestResult {
    client_streaming_world.verify_interleaved_priority_order()
}

#[then("the shared limiter blocks cross-priority bursts before refill")]
fn then_shared_rate_limit_symmetry(
    client_streaming_world: &mut ClientStreamingWorld,
) -> TestResult {
    client_streaming_world.verify_shared_rate_limit_symmetry()
}

#[then("typed items are received in order as {expected}")]
fn then_typed_items_received(
    client_streaming_world: &mut ClientStreamingWorld,
    expected: String,
) -> TestResult {
    let expected_items = expected
        .split(',')
        .filter(|value| !value.is_empty())
        .map(|value| {
            value
                .trim()
                .parse::<u8>()
                .map_err(|e| format!("invalid typed item {value:?}: {e}"))
        })
        .collect::<Result<Vec<_>, _>>()?;
    client_streaming_world.verify_typed_item_order(&expected_items)
}