wireframe 0.3.0

Simplify building servers and clients for custom binary protocols.
Documentation
//! Unit tests for streaming response helper adapters.

use std::io;

use futures::StreamExt;
use rstest::rstest;

use super::{
    streaming_helpers_support::{build_request, collect_typed_items},
    streaming_infra::{
        CorrelationId,
        MessageId,
        Payload,
        correlation_id,
        create_test_client,
        setup_streaming_test,
        spawn_malformed_server,
        spawn_mismatch_server,
        spawn_test_server,
    },
};
use crate::{
    client::{ClientError, StreamingResponseExt},
    correlation::CorrelatableFrame,
};

#[rstest]
#[tokio::test]
async fn typed_response_stream_yields_mapped_items_in_order(
    correlation_id: CorrelationId,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    let cid = correlation_id;
    let frames = vec![
        super::streaming_infra::TestStreamEnvelope::data(
            MessageId::new(1),
            cid,
            Payload::new(vec![10]),
        ),
        super::streaming_infra::TestStreamEnvelope::data(
            MessageId::new(2),
            cid,
            Payload::new(vec![20]),
        ),
        super::streaming_infra::TestStreamEnvelope::data(
            MessageId::new(3),
            cid,
            Payload::new(vec![30]),
        ),
        super::streaming_infra::TestStreamEnvelope::terminator(cid),
    ];
    let items = collect_typed_items(cid, frames, |frame| Ok(Some(frame.payload))).await?;

    assert_eq!(items, vec![vec![10], vec![20], vec![30]]);
    Ok(())
}

#[rstest]
#[tokio::test]
async fn typed_response_stream_skips_control_frames(
    correlation_id: CorrelationId,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    let cid = correlation_id;
    let frames = vec![
        super::streaming_infra::TestStreamEnvelope::data(
            MessageId::new(1),
            cid,
            Payload::new(vec![1]),
        ),
        super::streaming_infra::TestStreamEnvelope::data(
            MessageId::new(2),
            cid,
            Payload::new(vec![200]),
        ),
        super::streaming_infra::TestStreamEnvelope::data(
            MessageId::new(3),
            cid,
            Payload::new(vec![2]),
        ),
        super::streaming_infra::TestStreamEnvelope::terminator(cid),
    ];
    let items = collect_typed_items(cid, frames, |frame| {
        if frame.payload == vec![200] {
            Ok(None)
        } else {
            Ok(Some(frame.payload))
        }
    })
    .await?;

    assert_eq!(items, vec![vec![1], vec![2]]);
    Ok(())
}

#[rstest]
#[tokio::test]
async fn typed_response_stream_all_control_frames_yields_no_items(
    correlation_id: CorrelationId,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    let cid = correlation_id;
    let frames = vec![
        super::streaming_infra::TestStreamEnvelope::data(
            MessageId::new(1),
            cid,
            Payload::new(vec![10]),
        ),
        super::streaming_infra::TestStreamEnvelope::data(
            MessageId::new(2),
            cid,
            Payload::new(vec![20]),
        ),
        super::streaming_infra::TestStreamEnvelope::data(
            MessageId::new(3),
            cid,
            Payload::new(vec![30]),
        ),
        super::streaming_infra::TestStreamEnvelope::terminator(cid),
    ];
    let items = collect_typed_items(cid, frames, |_frame| Ok(None)).await?;

    assert!(
        items.is_empty(),
        "non-empty protocol stream with all-control mapper should yield empty typed stream"
    );
    Ok(())
}

#[rstest]
#[tokio::test]
async fn typed_response_stream_surfaces_mapper_errors(
    correlation_id: CorrelationId,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    let cid = correlation_id;
    let frames = vec![
        super::streaming_infra::TestStreamEnvelope::data(
            MessageId::new(1),
            cid,
            Payload::new(vec![1]),
        ),
        super::streaming_infra::TestStreamEnvelope::data(
            MessageId::new(2),
            cid,
            Payload::new(vec![2]),
        ),
        super::streaming_infra::TestStreamEnvelope::terminator(cid),
    ];
    let (mut client, _server) = setup_streaming_test(frames).await?;

    let mut stream = client
        .call_streaming(build_request(cid))
        .await?
        .typed_with(|frame| {
            if frame.payload == vec![2] {
                Err(ClientError::from(io::Error::new(
                    io::ErrorKind::InvalidData,
                    "mapper rejected frame",
                )))
            } else {
                Ok(Some(frame.payload))
            }
        });

    assert_eq!(stream.next().await.transpose()?, Some(vec![1]));

    let error = stream
        .next()
        .await
        .expect("mapper error should be yielded")
        .expect_err("second item should be a mapper error");
    assert!(
        matches!(error, ClientError::Wireframe(_)),
        "expected mapper error to surface as the supplied ClientError"
    );

    assert!(stream.next().await.is_none());
    Ok(())
}

#[rstest]
#[tokio::test]
async fn typed_response_stream_surfaces_mapper_errors_after_skipped_frames(
    correlation_id: CorrelationId,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    let cid = correlation_id;
    let frames = vec![
        super::streaming_infra::TestStreamEnvelope::data(
            MessageId::new(1),
            cid,
            Payload::new(vec![200]),
        ),
        super::streaming_infra::TestStreamEnvelope::data(
            MessageId::new(2),
            cid,
            Payload::new(vec![1]),
        ),
        super::streaming_infra::TestStreamEnvelope::data(
            MessageId::new(3),
            cid,
            Payload::new(vec![2]),
        ),
        super::streaming_infra::TestStreamEnvelope::terminator(cid),
    ];
    let (mut client, _server) = setup_streaming_test(frames).await?;

    let mut stream = client
        .call_streaming(build_request(cid))
        .await?
        .typed_with(|frame| {
            if frame.payload == vec![200] {
                Ok(None)
            } else if frame.payload == vec![2] {
                Err(ClientError::from(io::Error::new(
                    io::ErrorKind::InvalidData,
                    "mapper rejected frame after skip",
                )))
            } else {
                Ok(Some(frame.payload))
            }
        });

    assert_eq!(
        stream.next().await.transpose()?,
        Some(vec![1]),
        "first data frame after skipped control frame should be yielded"
    );

    let error = stream
        .next()
        .await
        .expect("mapper error should be yielded")
        .expect_err("second item should be a mapper error");
    assert!(
        matches!(error, ClientError::Wireframe(_)),
        "expected mapper error to surface as the supplied ClientError"
    );

    assert!(stream.next().await.is_none());
    Ok(())
}

#[rstest]
#[tokio::test]
async fn typed_response_stream_propagates_correlation_mismatch(
    correlation_id: CorrelationId,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    let wrong_cid = CorrelationId::new(correlation_id.get() + 999);
    let server = spawn_mismatch_server(wrong_cid).await?;
    let mut client = create_test_client(server.addr).await?;

    let mut request = build_request(correlation_id);
    request.set_correlation_id(Some(correlation_id.get()));

    let error = client
        .call_streaming(request)
        .await?
        .typed_with(|frame| Ok(Some(frame.payload)))
        .next()
        .await
        .expect("stream error should be yielded")
        .expect_err("expected mismatch error");

    match error {
        ClientError::StreamCorrelationMismatch { expected, received } => {
            assert_eq!(expected, Some(correlation_id.get()));
            assert_eq!(received, Some(wrong_cid.get()));
        }
        other => panic!("expected StreamCorrelationMismatch, got {other:?}"),
    }
    Ok(())
}

#[rstest]
#[tokio::test]
async fn typed_response_stream_propagates_disconnects(
    correlation_id: CorrelationId,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    let cid = correlation_id;
    let frames = vec![
        super::streaming_infra::TestStreamEnvelope::data(
            MessageId::new(1),
            cid,
            Payload::new(vec![10]),
        ),
        super::streaming_infra::TestStreamEnvelope::data(
            MessageId::new(2),
            cid,
            Payload::new(vec![20]),
        ),
    ];
    let server = spawn_test_server(frames, true).await?;
    let mut client = create_test_client(server.addr).await?;

    let mut stream = client
        .call_streaming(build_request(cid))
        .await?
        .typed_with(|frame| Ok(Some(frame.payload)));

    assert_eq!(stream.next().await.transpose()?, Some(vec![10]));
    assert_eq!(stream.next().await.transpose()?, Some(vec![20]));
    assert!(
        matches!(stream.next().await, Some(Err(ClientError::Wireframe(_)))),
        "disconnect should propagate unchanged"
    );
    Ok(())
}

#[rstest]
#[tokio::test]
async fn typed_response_stream_propagates_decode_failures(
    correlation_id: CorrelationId,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    let server = spawn_malformed_server().await?;
    let mut client = create_test_client(server.addr).await?;

    let error = client
        .call_streaming(build_request(correlation_id))
        .await?
        .typed_with(|frame| Ok(Some(frame.payload)))
        .next()
        .await
        .expect("decode failure should be yielded")
        .expect_err("expected decode failure");

    assert!(
        matches!(error, ClientError::Wireframe(_)),
        "decode failure should propagate unchanged"
    );
    Ok(())
}

#[tokio::test]
async fn typed_response_stream_fuses_after_mapper_error() {
    let inner = futures::stream::iter(vec![Ok(1u8), Ok(2u8)]);
    let mut stream = inner.typed_with(|_frame| -> Result<Option<u8>, ClientError> {
        Err(ClientError::from(io::Error::new(
            io::ErrorKind::InvalidData,
            "mapper always fails",
        )))
    });

    let first = stream.next().await;
    assert!(
        matches!(first, Some(Err(ClientError::Wireframe(_)))),
        "first poll should yield the mapper error"
    );

    let second = stream.next().await;
    assert!(
        second.is_none(),
        "stream must be fused: polling after a mapper error should return None"
    );

    let third = stream.next().await;
    assert!(
        third.is_none(),
        "stream must remain fused on subsequent polls"
    );
}

#[rstest]
#[tokio::test]
async fn typed_response_stream_preserves_empty_streams(
    correlation_id: CorrelationId,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    let frames = vec![super::streaming_infra::TestStreamEnvelope::terminator(
        correlation_id,
    )];
    let (mut client, _server) = setup_streaming_test(frames).await?;

    let next = client
        .call_streaming(build_request(correlation_id))
        .await?
        .typed_with(|frame| Ok(Some(frame.payload)))
        .next()
        .await;

    assert!(next.is_none(), "empty streams should remain empty");
    Ok(())
}