# `wireframe_testing`: testing helpers for Wireframe
`wireframe_testing` is the companion crate for exercising Wireframe
applications and codecs in unit, integration, and behavioural tests. It
provides in-memory drivers for `WireframeApp` instances, codec-aware framing
helpers, and a scoped observability harness for log and metrics assertions.
## Motivation
Wireframe now supports pluggable protocol codecs (ADR 004). The test harness
must encode and decode frames using the selected `FrameCodec`, preserving
protocol metadata and correlation identifiers. It must also allow malformed
wire bytes for negative codec tests. ADR 006 proposes a unified test
observability harness, so tests can assert on logs and metrics
deterministically without external exporters.
## Design goals
- Work with any `FrameCodec`, including codecs that carry metadata and
correlation identifiers.
- Preserve frame metadata when driving handlers or asserting responses.
- Allow raw byte injection for malformed frame and recovery tests.
- Provide per-test log and metrics capture without leaking global state.
- Keep helpers fast by using in-memory duplex streams instead of sockets.
## Crate layout
- `src/lib.rs` re-exports the public API.
- `src/helpers.rs` provides in-memory drivers and codec-aware encode/decode
helpers.
- `src/integration_helpers.rs` exposes shared helpers for integration tests.
- `src/reassembly/` contains deterministic assertion helpers for fragment and
message-assembly outcomes.
- `src/observability/` implements the observability harness from ADR 006,
split into `mod.rs`, `labels.rs`, and `assertions.rs`.
- `src/logging.rs` retains the standalone `LoggerHandle` fixture.
- `src/fixtures/` (or `src/codec_fixtures.rs`) stores reusable frame fixtures
for the default codec and example codecs.
- `src/multi_packet.rs` keeps the `collect_multi_packet` helper.
- `src/client_pair.rs` provides the in-process server/client pair harness for
loopback integration tests (roadmap `12.3.2`).
## Dependencies
```toml
[dependencies]
tokio = { version = "1", features = ["macros", "rt", "io-util"] }
wireframe = { version = "0.1.0", path = ".." }
bincode = "2.0"
bytes = "1.0"
futures = "0.3"
tokio-util = { version = "0.7", features = ["codec"] }
log = "0.4"
logtest = "2"
metrics = "0.24.3"
metrics-util = "0.20.0"
rstest = "0.18.2"
```
## Codec-aware drivers
The helpers remain centred on a single in-memory driver that runs
`WireframeApp::handle_connection` against a `tokio::io::duplex` stream. The
driver is responsible for framing inbound and outbound data using the selected
`FrameCodec` and for surfacing server panics as `io::Error` values prefixed
with `server task failed`.
`wireframe_testing` retains the `TestSerializer` trait alias to keep bounds
readable:
```rust,no_run
use wireframe::app::Envelope;
use wireframe::frame::FrameMetadata;
use wireframe::serializer::Serializer;
pub trait TestSerializer:
Serializer + FrameMetadata<Frame = Envelope> + Send + Sync + 'static
{
}
```
### Driver entry points
Length-delimited helpers match the current `wireframe_testing` API and accept
raw frame bytes. Use `drive_with_frames` for pre-framed input (including
malformed frames) and `drive_with_payloads` to wrap payloads with the default
length-delimited framing.
```rust,no_run
use std::io;
use wireframe::app::{Packet, WireframeApp};
pub async fn drive_with_frames<S, C, E>(
app: WireframeApp<S, C, E>,
frames: Vec<Vec<u8>>,
) -> io::Result<Vec<u8>>
where
S: TestSerializer,
C: Send + 'static,
E: Packet;
pub async fn drive_with_payloads<S, C, E>(
app: WireframeApp<S, C, E>,
payloads: Vec<Vec<u8>>,
) -> io::Result<Vec<u8>>
where
S: TestSerializer,
C: Send + 'static,
E: Packet;
pub async fn drive_with_frames_mut<S, C, E>(
app: &mut WireframeApp<S, C, E>,
frames: Vec<Vec<u8>>,
) -> io::Result<Vec<u8>>
where
S: TestSerializer,
C: Send + 'static,
E: Packet;
pub async fn drive_with_payloads_mut<S, C, E>(
app: &mut WireframeApp<S, C, E>,
payloads: Vec<Vec<u8>>,
) -> io::Result<Vec<u8>>
where
S: TestSerializer,
C: Send + 'static,
E: Packet;
```
Codec-aware helpers should be added as non-breaking extensions, so tests can
pass `FrameCodec` values and inspect protocol-specific frame metadata. Prefer
distinct names (for example, `drive_with_codec_frames`) so existing tests that
use raw byte frames continue to compile unchanged.
Behavioural details:
- `drive_with_frames` writes the provided bytes verbatim, making it the
preferred helper for malformed frame and recovery tests.
- `drive_with_payloads` length-prefixes payload bytes before delegating to
`drive_with_frames`.
- `drive_with_bincode` encodes a message with bincode and then length-prefixes
the output before driving the app.
- Mutable variants (`drive_with_frames_mut` and `drive_with_payloads_mut`)
accept `&mut WireframeApp` so tests can reuse a configured instance.
- I/O failures, framing errors, and server task panics are all returned as
`io::Error` values, so tests can assert on error handling.
### Slow-I/O drivers
Roadmap item `8.5.2` extends the in-memory harness with explicit slow reader
and slow writer simulation. The public surface uses one pacing type plus one
driver config:
```rust,no_run
# fn example() -> Result<(), Box<dyn std::error::Error>> {
use std::{num::NonZeroUsize, time::Duration};
use wireframe_testing::{SlowIoConfig, SlowIoPacing};
let writer = SlowIoPacing::new(
NonZeroUsize::new(8).ok_or_else(|| std::io::Error::other("chunk size must be non-zero"))?,
Duration::from_millis(5),
);
let reader = SlowIoPacing::new(
NonZeroUsize::new(32).ok_or_else(|| std::io::Error::other("chunk size must be non-zero"))?,
Duration::from_millis(5),
);
let config = SlowIoConfig::new()
.with_writer_pacing(writer)
.with_reader_pacing(reader)
.with_capacity(64);
# let _ = config;
# Ok(())
# }
```
The pacing applies to the client side of the in-memory duplex stream:
- `writer_pacing` throttles bytes written into the app.
- `reader_pacing` throttles bytes drained from the app.
- `capacity` controls how quickly the duplex buffer saturates, which is useful
when asserting back-pressure behaviour.
Accessibility caption: Sequence diagram showing a test spawning an async
runtime task that drives slow-I/O helpers, optionally pacing writes into the
app and reads back out of it through Tokio time delays before returning the
captured bytes for back-pressure assertions.
```mermaid
sequenceDiagram
actor Test as Test
participant Runtime as TokioRuntime
participant Helper as SlowIoHelpers
participant App as WireframeApp
participant Writer as SlowWriterPacer
participant Reader as SlowReaderPacer
participant Time as TokioTime
Test->>Runtime: spawn async test
Runtime->>Helper: drive_with_slow_payloads(app, payloads, config)
alt writer_pacing configured
Helper->>Writer: start_paced_writes(payloads, config.writer_pacing)
Writer->>App: send_first_chunk()
loop for each remaining chunk
Writer->>Time: sleep(config.writer_pacing.delay)
Time-->>Writer: wake
Writer->>App: send_chunk()
end
else no writer pacing
Helper->>App: send_all_payloads()
end
alt reader_pacing configured
Helper->>Reader: start_paced_reads(config.reader_pacing)
Reader->>App: read_first_chunk()
App-->>Reader: first_chunk_bytes
Reader-->>Helper: append_to_output(first_chunk_bytes)
loop while app_has_more_output
Reader->>Time: sleep(config.reader_pacing.delay)
Time-->>Reader: wake
Reader->>App: read_chunk()
App-->>Reader: chunk_bytes
Reader-->>Helper: append_to_output(chunk_bytes)
end
else no reader pacing
Helper->>App: read_all_output()
App-->>Helper: all_bytes
end
Helper-->>Runtime: Result<Vec<u8>>
Runtime-->>Test: assert_backpressure_behaviour()
```
Public entry points:
- `drive_with_slow_frames` for pre-framed raw bytes.
- `drive_with_slow_payloads` for default length-delimited payloads.
- `drive_with_slow_codec_payloads` for codec-aware payload round trips.
- `drive_with_slow_codec_frames` for codec-aware frame assertions.
These helpers are intentionally additive rather than replacing the existing
drivers. Existing tests keep the simpler fast-path helpers, while
back-pressure-focused tests opt into explicit pacing.
### Reassembly assertion helpers
Roadmap item `8.5.3` adds a public `wireframe_testing::reassembly` module for
deterministic assertions over two related but independent domains:
- transport fragment reassembly driven by `wireframe::fragment::Reassembler`;
and
- protocol message assembly driven by
`wireframe::message_assembler::MessageAssemblyState`.
The API uses typed snapshots plus typed expectation enums rather than macros.
That design keeps the helpers usable from ordinary integration tests and from
`rstest-bdd` step definitions, both of which prefer `Result`-returning helper
functions over panic-only assertions.
```rust,no_run
use wireframe::message_assembler::{FrameSequence, MessageKey};
use wireframe_testing::reassembly::{
MessageAssemblyErrorExpectation,
MessageAssemblySnapshot,
assert_message_assembly_error,
};
# fn check(snapshot: MessageAssemblySnapshot<'_>) -> wireframe_testing::TestResult {
assert_message_assembly_error(
snapshot,
MessageAssemblyErrorExpectation::SequenceMismatch {
expected: FrameSequence(2),
found: FrameSequence(3),
},
)?;
# Ok(())
# }
```
Message-assembly helpers cover incomplete, completed, errored, buffered, and
evicted outcomes. Fragment helpers cover absent, completed, errored, buffered,
and evicted outcomes. The snapshots are intentionally caller-built views over
current state, which keeps the helper module independent of any one test world
or fixture shape.
### Buffer capacity and limits
The duplex stream buffer defaults to `TEST_MAX_FRAME`, matching the shared
length-delimited framing guardrail. Use `run_app` or the `*_with_capacity`
helpers to override this value; they reject a `capacity` of zero or above the
maximum ceiling with `io::ErrorKind::InvalidInput`.
Codec-aware helpers should instead read `FrameCodec::max_frame_length()` to
align buffer sizing with protocol framing rules.
### Frame encoding and decoding helpers
The current helpers focus on the default length-delimited framing used by
Wireframe tests:
```rust,no_run
use tokio_util::codec::LengthDelimitedCodec;
pub fn decode_frames(bytes: Vec<u8>) -> Vec<Vec<u8>>;
pub fn decode_frames_with_max(bytes: Vec<u8>, max_len: usize) -> Vec<Vec<u8>>;
pub fn encode_frame(codec: &mut LengthDelimitedCodec, bytes: Vec<u8>) -> Vec<u8>;
```
### Bincode convenience wrapper
Most tests still send a single request encoded with bincode. Keep a small
wrapper that performs `bincode::encode_to_vec` with
`bincode::config::standard()` and drives the app:
```rust,no_run
use std::io;
use wireframe::app::{Packet, WireframeApp};
pub async fn drive_with_bincode<M, S, C, E>(
app: WireframeApp<S, C, E>,
msg: M,
) -> io::Result<Vec<u8>>
where
M: bincode::Encode,
S: TestSerializer,
C: Send + 'static,
E: Packet;
```
```rust,no_run
use wireframe_testing::{decode_frames, drive_with_bincode};
#[derive(bincode::Encode)]
struct Ping(u8);
let bytes = drive_with_bincode(app, Ping(1)).await?;
let frames = decode_frames(bytes);
assert!(!frames.is_empty(), "expected at least one response frame");
```
## Codec fixtures
Add reusable fixtures to avoid duplicated framing logic in tests:
- Default codec fixtures that yield `Bytes` frames, oversized payloads, and
truncated prefixes for negative tests.
- Example codec fixtures that mirror `wireframe::codec::examples` (for example
Hotline and MySQL), including helpers for correlation identifiers.
- Invalid frame builders (bad lengths, missing headers, truncated payloads) for
codec error and recovery assertions.
Fixtures should return `F::Frame` where possible and provide explicit
`wire_bytes()` helpers for malformed cases.
## Test observability harness
Introduce `wireframe_testing::observability`, providing an
`ObservabilityHandle` that combines log capture with metrics recording.
Key behaviours:
- Acquisition installs log capture via `LoggerHandle` and a scoped metrics
recorder using `metrics_util::debugging::DebuggingRecorder`.
- Access is serialized with a global lock, so concurrent tests do not
interfere, but the harness will reduce parallelism for the affected suite.
- Observability-heavy suites should run in a single-threaded test runner (for
example, pass `--test-threads=1` for the affected test binary), or share a
single `ObservabilityHandle` via a per-suite fixture to amortize setup costs.
- When partial parallelism is needed, group observability assertions into a
dedicated test binary that runs serially, and keep the remaining test suite
in the default parallel runner.
- Metrics snapshots should consume the captured values (matching
`DebuggingRecorder` semantics) so `clear()` can be implemented by draining a
snapshot.
- The handle should restore the previous recorder on drop by swapping the
active recorder back into a global delegating recorder. This keeps the global
recorder stable while still providing per-test isolation.
- When the `metrics` feature is disabled, the handle should still capture logs
and return empty metric snapshots.
Implemented public API:
```rust,no_run
use log::Level;
use wireframe_testing::LoggerHandle;
use wireframe_testing::observability::Labels;
pub struct ObservabilityHandle { /* fields omitted */ }
impl ObservabilityHandle {
pub fn new() -> Self;
pub fn logs(&mut self) -> &mut LoggerHandle;
pub fn recorder(&self) -> &metrics_util::debugging::DebuggingRecorder;
pub fn snapshot(&mut self);
pub fn clear(&mut self);
pub fn counter(&self, name: &str, labels: impl Into<Labels>) -> u64;
pub fn counter_without_labels(&self, name: &str) -> u64;
pub fn codec_error_counter(
&self, error_type: &str, recovery_policy: &str,
) -> u64;
pub fn assert_counter(
&self, name: &str, labels: impl Into<Labels>, expected: u64,
) -> Result<(), String>;
pub fn assert_no_metric(&self, name: &str) -> Result<(), String>;
pub fn assert_codec_error_counter(
&self, error_type: &str, recovery_policy: &str, expected: u64,
) -> Result<(), String>;
pub fn assert_log_contains(
&mut self, substring: &str,
) -> Result<(), String>;
pub fn assert_log_at_level(
&mut self, level: Level, substring: &str,
) -> Result<(), String>;
}
pub fn obs_handle() -> ObservabilityHandle;
```
Tests using `ObservabilityHandle` should not run concurrently; the global lock
serializes access, so favour a shared fixture or a dedicated serial test binary
for observability assertions.
The codec-error regression coverage added for roadmap item 9.7.4 establishes a
recommended pattern for taxonomy assertions:
- keep executable regressions in the repository-root `tests/` directory,
because `wireframe_testing` is a dev-dependency rather than a workspace
member;
- use the default `LengthDelimitedFrameCodec` directly when the test must
retain typed `EofError` information, because the generic helper surface
intentionally normalizes decoder failures to `io::Error`; and
- use `ObservabilityHandle` to assert that the chosen
`error_type`/`recovery_policy` label pair is what the regression records.
## In-process server/client pair harness
`wireframe_testing::client_pair` provides a reusable harness for starting a
bound `WireframeServer` and a connected `WireframeClient` inside one test
process. Both sides communicate over a real loopback TCP socket so
compatibility assertions exercise the full network path.
### Public API
- `spawn_wireframe_pair(app_factory, configure_client)` — reserves a loopback
listener, spawns a single-worker server, waits for readiness, connects a
client through the supplied builder closure, and returns a `WireframePair`.
- `spawn_wireframe_pair_default(app_factory)` — convenience wrapper that
connects a client with default builder settings.
- `WireframePair::client_mut()` — returns a `TestResult` containing a mutable
reference to the connected client for request/response operations. Returns an
error if called after shutdown. Streaming responses borrow the client
exclusively, preserving Rust's ownership rules at the call site.
- `WireframePair::local_addr()` — returns the loopback address the server is
bound to.
- `WireframePair::shutdown().await` — signals the server to stop, drops the
client, and joins the server task.
### Shared echo app factory
`wireframe_testing::echo_app_factory` provides a ready-made app factory for
pair-harness tests. It accepts an `Arc<AtomicUsize>` counter and returns a
fallible closure (`Fn() -> TestResult<CommonTestApp>`) that builds a
`CommonTestApp` with a single route (message id `1`) whose handler only records
invocations. A lower-level `echo_handler` is also exported for custom app
construction.
### Lifecycle
The harness reserves the TCP listener through `unused_listener()` before
spawning the server, eliminating address-race flakiness. The server is bound
through `WireframeServer::bind_existing_listener` and runs with a one-shot
shutdown channel. If the client connection fails after the server has started,
the server task is torn down before the error is returned. A `Drop`
implementation sends the shutdown signal and immediately aborts the server task
as a safety net if explicit shutdown is skipped.
### Usage
```rust,no_run
use wireframe::app::WireframeApp;
use wireframe_testing::client_pair::spawn_wireframe_pair;
use wireframe_testing::TestResult;
async fn example() -> TestResult<()> {
let mut pair = spawn_wireframe_pair(
|| WireframeApp::default(),
|builder| builder.max_frame_length(2048),
)
.await?;
let addr = pair.local_addr();
pair.shutdown().await?;
Ok(())
}
```
### Rationale
The harness uses real loopback TCP rather than `tokio::io::duplex` because the
purpose of `12.3.2` is client/server compatibility, not in-memory app driving.
It lives in `wireframe_testing` rather than `wireframe::testkit` to avoid
widening the optional production feature surface for a purely test-facing
capability.
## Helper macros
Keep `push_expect!` and `recv_expect!` for concise async assertions, with error
messages that include call-site information in debug builds.
## Example usage
```rust,no_run
use std::sync::Arc;
use wireframe::app::{Envelope, WireframeApp};
use wireframe_testing::{decode_frames, drive_with_bincode, obs_handle};
#[tokio::test]
async fn round_trips_with_codec() -> std::io::Result<()> {
let app = WireframeApp::new()?
.route(1, Arc::new(|_: &Envelope| Box::pin(async {})))?;
let env = Envelope::new(1, Some(5), vec![1, 2, 3]);
let out = drive_with_bincode(app, env).await?;
let frames = decode_frames(out);
assert_eq!(frames.len(), 1);
Ok(())
}
#[tokio::test]
async fn captures_metrics() -> std::io::Result<()> {
use wireframe::metrics::{Direction, FRAMES_PROCESSED, inc_frames};
let mut obs = obs_handle();
obs.clear();
metrics::with_local_recorder(obs.recorder(), || {
inc_frames(Direction::Inbound);
});
obs.snapshot();
assert_eq!(
obs.counter(FRAMES_PROCESSED, &[("direction", "inbound")]),
1
);
Ok(())
}
```
## Implementation notes
- Add codec-aware helpers (for example, `drive_with_codec_frames`) that accept
`F: FrameCodec` and return `F::Frame` values for tests that need protocol
metadata.
- Provide codec-aware `encode_frames` and `decode_frames` helpers that return
`io::Result` on failures instead of panicking.
- Keep the length-delimited helpers so existing tests that use raw frame bytes
remain compatible.
- Add fixture helpers for default and example codecs, including invalid frames
used by codec error tests.
- Implement the observability harness and expose it via an `rstest` fixture in
`wireframe_testing::observability`.
## Proposed enhancements
### Codec-aware frame encoding and decoding
Proposed codec-aware helpers make it easy to build fixtures or inspect raw
bytes:
```rust,no_run
use std::io;
use wireframe::codec::FrameCodec;
pub fn encode_frames<F>(codec: &F, frames: Vec<F::Frame>) -> io::Result<Vec<u8>>
where
F: FrameCodec;
pub fn decode_frames<F>(codec: &F, bytes: Vec<u8>) -> io::Result<Vec<F::Frame>>
where
F: FrameCodec;
```
These helpers should return an error when trailing bytes remain in the buffer
after the last frame, so tests can detect partial or malformed streams.