use std::sync::Arc;
use std::time::Duration;
use async_trait::async_trait;
use atrium_api::com::atproto::label::defs::Label;
use futures_util::StreamExt;
use miette::{Diagnostic, NamedSource, SourceSpan};
use serde::{Deserialize, Serialize};
use thiserror::Error;
use tokio::time::Instant;
use url::Url;
/// Header of a subscription frame; `decode_frame` reads it as the first CBOR value of a frame.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FrameHeader {
/// Operation code. `decode_frame` treats `1` as a typed message and `-1` as an error frame.
pub op: i64,
/// Message type tag (`decode_frame` recognises `"#labels"` and `"#info"`).
/// Left out of the serialized form when it is `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub t: Option<String>,
}
/// Payload that `decode_frame` decodes for frames whose header is `op == 1`, `t == "#labels"`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SubscribeLabelsPayload {
/// The frame's `seq` value (presumably the stream sequence number — confirm against the lexicon).
pub seq: i64,
/// Labels carried by this frame.
pub labels: Vec<Label>,
}
/// Payload that `decode_frame` decodes for frames whose header is `op == 1`, `t == "#info"`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SubscribeInfoPayload {
/// The info message's `name` field.
pub name: String,
/// Optional free-form text accompanying the info message.
pub message: Option<String>,
}
/// Payload that `decode_frame` decodes for error frames (header `op == -1`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SubscribeErrorPayload {
/// The error frame's `error` field.
pub error: String,
/// Optional free-form text accompanying the error.
pub message: Option<String>,
}
/// A successfully decoded subscription frame, as returned by `decode_frame`.
#[derive(Debug, Clone)]
pub enum DecodedFrame {
/// A `#labels` message.
Labels(SubscribeLabelsPayload),
/// An `#info` message.
Info(SubscribeInfoPayload),
/// An error frame (`op == -1`).
Error(SubscribeErrorPayload),
}
/// Reasons a frame could not be turned into a `DecodedFrame`.
///
/// Every variant keeps the bytes of the offending frame so it can be shown in a diagnostic.
#[derive(Debug, Clone)]
pub enum FrameDecodeError {
/// The leading CBOR header could not be decoded.
HeaderDecode {
/// Complete bytes of the frame.
raw: Arc<[u8]>,
/// Text of the underlying decoder error.
cause: String,
},
/// The header was recognised but the payload that follows it could not be decoded.
PayloadDecode {
/// The header that was decoded before the payload failed.
header: FrameHeader,
/// Complete bytes of the frame.
raw: Arc<[u8]>,
/// Text of the underlying decoder error.
cause: String,
},
/// The header's `op`/`t` combination is not one `decode_frame` handles.
UnknownMessageType {
/// The header's type tag, or a description of the header when it has no tag.
t: String,
/// Complete bytes of the frame.
raw: Arc<[u8]>,
},
/// A text frame was received where a binary frame was expected.
/// NOTE(review): nothing in the visible code constructs this variant — confirm it is still needed.
TextFrameRejected(Arc<[u8]>),
}
pub fn decode_frame(bytes: &[u8]) -> Result<DecodedFrame, FrameDecodeError> {
let mut cursor = bytes;
let header = ciborium::de::from_reader::<FrameHeader, _>(&mut cursor).map_err(|e| {
FrameDecodeError::HeaderDecode {
raw: Arc::from(bytes),
cause: e.to_string(),
}
})?;
match (header.op, &header.t) {
(1, Some(t)) if t == "#labels" => {
let payload = ciborium::de::from_reader::<SubscribeLabelsPayload, _>(&mut cursor)
.map_err(|e| FrameDecodeError::PayloadDecode {
header: header.clone(),
raw: Arc::from(bytes),
cause: e.to_string(),
})?;
Ok(DecodedFrame::Labels(payload))
}
(1, Some(t)) if t == "#info" => {
let payload = ciborium::de::from_reader::<SubscribeInfoPayload, _>(&mut cursor)
.map_err(|e| FrameDecodeError::PayloadDecode {
header: header.clone(),
raw: Arc::from(bytes),
cause: e.to_string(),
})?;
Ok(DecodedFrame::Info(payload))
}
(-1, _) => {
let payload = ciborium::de::from_reader::<SubscribeErrorPayload, _>(&mut cursor)
.map_err(|e| FrameDecodeError::PayloadDecode {
header: header.clone(),
raw: Arc::from(bytes),
cause: e.to_string(),
})?;
Ok(DecodedFrame::Error(payload))
}
(_, Some(t)) => Err(FrameDecodeError::UnknownMessageType {
t: t.clone(),
raw: Arc::from(bytes),
}),
_ => Err(FrameDecodeError::UnknownMessageType {
t: format!("unknown op={} t={:?}", header.op, header.t),
raw: Arc::from(bytes),
}),
}
}
/// How the backfill phase of `run` ended.
#[derive(Debug, Clone)]
pub enum BackfillOutcome {
/// At least one frame arrived and then none followed for the idle-gap period.
CompletedWithIdleGap {
/// Number of frames received during backfill.
frames_observed: usize,
/// Length of the idle gap that ended backfill, in milliseconds (`run` records 500).
idle_gap_ms: u64,
},
/// Frames were still arriving when the per-connection budget ran out.
ExceededBudget {
/// Number of frames received during backfill.
frames_observed: usize,
},
/// The stream ended after at least one frame had been received.
StreamClosedDuringBackfill {
/// Number of frames received during backfill.
frames_observed: usize,
},
/// No frame was received before the budget ran out or the stream ended.
NoFramesWithinBudget,
}
/// How the live-tail phase of `run` ended.
#[derive(Debug, Clone)]
pub enum LiveTailOutcome {
/// Backfill ended with an idle gap, so no separate live-tail connection was opened.
FromBackfill,
/// A separate live-tail connection was opened and read until its budget ran out or it ended.
CleanHold {
/// Number of stream items counted on the live-tail connection.
frames_observed: usize,
},
/// Backfill saw no frames, so live-tail was not attempted.
SkippedEmpty,
/// Opening the live-tail connection failed.
ConnectFailed,
}
/// Upper bound on how many labels `collect_sample_labels` keeps in the sample buffer.
pub const SAMPLE_LABEL_CAP: usize = 256;
/// Observations gathered by `run` once the backfill connection has been opened.
#[derive(Debug, Clone)]
pub struct SubscriptionFacts {
/// How the backfill phase ended.
pub backfill_outcome: BackfillOutcome,
/// How the live-tail phase ended.
pub live_tail_outcome: LiveTailOutcome,
/// Every frame decode failure seen in either phase, in arrival order.
pub decode_errors: Vec<FrameDecodeError>,
/// Labels sampled from `#labels` frames, at most `SAMPLE_LABEL_CAP` of them.
pub sample_labels: Vec<Label>,
}
/// Diagnostic attached to a `subscription::frame_decode` spec violation.
#[derive(Debug, Error, Diagnostic)]
#[error("{message}")]
#[diagnostic(code = "labeler::subscription::frame_decode")]
pub struct FrameDecodeFailureDiagnostic {
/// Human-readable description of the failure; also used as the error's display text.
pub message: String,
/// Raw bytes of the frame that failed to decode (`run` names this source `"frame"`).
#[source_code]
pub source_code: NamedSource<Arc<[u8]>>,
/// Location of the label within the frame; `run` points it at the start of the frame.
#[label("frame decode failure")]
pub span: SourceSpan,
}
/// Errors surfaced by the WebSocket layer of the subscription stage.
#[derive(Debug, Error)]
pub enum SubscriptionStageError {
/// Connecting to, or reading from, the WebSocket failed.
#[error("Subscription transport error: {message}")]
Transport {
/// Description of the failure, shown in the error's display text.
message: String,
/// Underlying error, when there is one.
#[source]
source: Option<Box<dyn std::error::Error + Send + Sync>>,
},
}
/// An open subscription connection that yields binary frames.
#[async_trait]
pub trait FrameStream: Send {
/// Waits for the next item: `Some(Ok(bytes))` for a frame, `Some(Err(_))` for a transport
/// problem, `None` once the stream has ended.
async fn next_frame(&mut self) -> Option<Result<Vec<u8>, SubscriptionStageError>>;
/// Closes the connection. No result is reported to the caller.
async fn close(&mut self);
}
/// Opens subscription connections; the abstraction through which `run` reaches the network.
#[async_trait]
pub trait WebSocketClient: Send + Sync {
/// Connects to `url` and returns the resulting frame stream, or a transport error.
async fn connect(&self, url: &Url) -> Result<Box<dyn FrameStream>, SubscriptionStageError>;
}
/// `WebSocketClient` implementation that makes real connections with `tokio_tungstenite`.
pub struct RealWebSocketClient;
/// `FrameStream` backed by a live `tokio_tungstenite` WebSocket connection.
pub struct RealFrameStream {
// The underlying WebSocket; it may or may not be TLS-wrapped.
stream: tokio_tungstenite::WebSocketStream<
tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>,
>,
}
#[async_trait]
impl FrameStream for RealFrameStream {
    /// Reads WebSocket messages until one of them settles the result.
    ///
    /// * binary message → `Some(Ok(bytes))`
    /// * text message → `Some(Err(Transport))`, because only binary frames are expected
    /// * close message, or end of the underlying stream → `None`
    /// * read error → `Some(Err(Transport))` wrapping the error
    /// * ping, pong and raw frames are skipped
    async fn next_frame(&mut self) -> Option<Result<Vec<u8>, SubscriptionStageError>> {
        use tokio_tungstenite::tungstenite::Message;

        while let Some(received) = self.stream.next().await {
            let message = match received {
                Ok(message) => message,
                Err(e) => {
                    return Some(Err(SubscriptionStageError::Transport {
                        message: e.to_string(),
                        source: Some(Box::new(e)),
                    }));
                }
            };

            match message {
                Message::Binary(data) => return Some(Ok(data.to_vec())),
                Message::Text(_) => {
                    return Some(Err(SubscriptionStageError::Transport {
                        message: "received text frame, expected binary".to_string(),
                        source: None,
                    }));
                }
                Message::Close(_) => return None,
                // Control and raw frames carry no subscription data; keep reading.
                Message::Ping(_) | Message::Pong(_) | Message::Frame(_) => {}
            }
        }

        None
    }

    /// Closes the WebSocket, ignoring any error from the close itself.
    async fn close(&mut self) {
        let _ = self.stream.close(None).await;
    }
}
#[async_trait]
impl WebSocketClient for RealWebSocketClient {
    /// Builds a client request from `url`, performs the WebSocket handshake, and wraps the
    /// resulting connection in a [`RealFrameStream`].
    ///
    /// # Errors
    ///
    /// Returns [`SubscriptionStageError::Transport`] if the request cannot be built from
    /// `url` or the connection attempt fails.
    async fn connect(&self, url: &Url) -> Result<Box<dyn FrameStream>, SubscriptionStageError> {
        use tokio_tungstenite::tungstenite::client::IntoClientRequest;

        // Both failure points are reported the same way: display text plus boxed source.
        fn transport<E>(e: E) -> SubscriptionStageError
        where
            E: std::error::Error + Send + Sync + 'static,
        {
            SubscriptionStageError::Transport {
                message: e.to_string(),
                source: Some(Box::new(e)),
            }
        }

        let request = url.as_str().into_client_request().map_err(transport)?;
        let (stream, _response) = tokio_tungstenite::connect_async(request)
            .await
            .map_err(transport)?;

        Ok(Box::new(RealFrameStream { stream }))
    }
}
/// The individual checks the subscription stage can report on.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Check {
/// Reported as a network error when the backfill connection cannot be opened.
EndpointReachable,
/// Reported as a network error when the live-tail connection cannot be opened.
LiveTailEndpointReachable,
/// Result of the backfill phase.
Backfill,
/// Result of the live-tail phase.
LiveTail,
/// Reported as a spec violation when a frame fails to decode.
FrameDecode,
}
impl Check {
    /// Stable identifier of this check, of the form `subscription::<name>`.
    pub fn id(self) -> &'static str {
        match self {
            Check::EndpointReachable => "subscription::endpoint_reachable",
            Check::LiveTailEndpointReachable => "subscription::live_tail_endpoint_reachable",
            Check::Backfill => "subscription::backfill",
            Check::LiveTail => "subscription::live_tail",
            Check::FrameDecode => "subscription::frame_decode",
        }
    }

    /// Builds a subscription-stage result for this check with the given status and summary,
    /// and with neither a diagnostic nor a skip reason.
    fn result_with(
        self,
        status: crate::commands::test::labeler::report::CheckStatus,
        summary: &'static str,
    ) -> crate::commands::test::labeler::report::CheckResult {
        use crate::commands::test::labeler::report::{CheckResult, Stage};

        CheckResult {
            id: self.id(),
            stage: Stage::Subscription,
            status,
            summary: std::borrow::Cow::Borrowed(summary),
            diagnostic: None,
            skipped_reason: None,
        }
    }

    /// Result with status `Pass`.
    pub fn pass(self) -> crate::commands::test::labeler::report::CheckResult {
        use crate::commands::test::labeler::report::CheckStatus;

        let summary = match self {
            Check::Backfill => "Subscription backfill completed",
            Check::LiveTail => "Subscription live-tail connection held",
            _ => "subscription check passed",
        };
        self.result_with(CheckStatus::Pass, summary)
    }

    /// Result with status `SpecViolation`, carrying `diagnostic`.
    pub fn spec_violation(
        self,
        diagnostic: Box<dyn miette::Diagnostic + Send + Sync>,
    ) -> crate::commands::test::labeler::report::CheckResult {
        use crate::commands::test::labeler::report::{CheckResult, CheckStatus};

        let summary = match self {
            Check::FrameDecode => "Subscription frame decode failure",
            _ => "subscription check failed",
        };
        CheckResult {
            diagnostic: Some(diagnostic),
            ..self.result_with(CheckStatus::SpecViolation, summary)
        }
    }

    /// Result with status `NetworkError`.
    pub fn network_error(self) -> crate::commands::test::labeler::report::CheckResult {
        use crate::commands::test::labeler::report::CheckStatus;

        let summary = match self {
            Check::EndpointReachable => "Subscription endpoint reachability",
            Check::LiveTailEndpointReachable => "Subscription live-tail reachability",
            _ => "subscription network error",
        };
        self.result_with(CheckStatus::NetworkError, summary)
    }

    /// Result with status `Advisory`.
    pub fn advisory(self) -> crate::commands::test::labeler::report::CheckResult {
        use crate::commands::test::labeler::report::CheckStatus;

        let summary = match self {
            Check::Backfill => "Subscription backfill advisory",
            _ => "subscription advisory",
        };
        self.result_with(CheckStatus::Advisory, summary)
    }

    /// Result with status `Skipped`, recording `reason` as the skip reason.
    pub fn skip(
        self,
        reason: impl Into<std::borrow::Cow<'static, str>>,
    ) -> crate::commands::test::labeler::report::CheckResult {
        use crate::commands::test::labeler::report::{CheckResult, CheckStatus};

        let summary = match self {
            Check::LiveTail => "Subscription live-tail skipped",
            _ => "subscription check skipped",
        };
        CheckResult {
            skipped_reason: Some(reason.into()),
            ..self.result_with(CheckStatus::Skipped, summary)
        }
    }
}
/// Everything `run` hands back to its caller.
#[derive(Debug)]
pub struct SubscriptionStageOutput {
/// Observations from the stage; `None` when the backfill connection could not be opened.
pub facts: Option<SubscriptionFacts>,
/// Check results produced by the stage, in the order they were recorded.
pub results: Vec<crate::commands::test::labeler::report::CheckResult>,
}
/// Moves labels from one frame into `buffer` without letting it grow past
/// [`SAMPLE_LABEL_CAP`]; labels that do not fit are dropped.
fn collect_sample_labels(buffer: &mut Vec<Label>, frame_labels: Vec<Label>) {
    // Zero once the buffer is full, which turns the extend below into a no-op.
    let room = SAMPLE_LABEL_CAP.saturating_sub(buffer.len());
    buffer.extend(frame_labels.into_iter().take(room));
}
/// Opens a fresh subscription connection (no cursor) and reads it for up to `budget`.
///
/// Labels from `#labels` frames are added to `sample_labels` and decode failures are
/// appended to `decode_errors`. Reading stops when the budget runs out or the stream
/// ends, after which the connection is closed.
///
/// # Errors
///
/// Returns the connect error if the connection cannot be opened. Once connected the
/// result is always [`LiveTailOutcome::CleanHold`].
async fn run_live_tail(
    endpoint: &Url,
    ws: &dyn WebSocketClient,
    budget: Duration,
    decode_errors: &mut Vec<FrameDecodeError>,
    sample_labels: &mut Vec<Label>,
) -> Result<LiveTailOutcome, SubscriptionStageError> {
    let mut live_tail_url = endpoint.clone();
    live_tail_url.set_path("xrpc/com.atproto.label.subscribeLabels");
    if live_tail_url.scheme() == "https" {
        let _ = live_tail_url.set_scheme("wss");
    }

    tracing::debug!(url = %live_tail_url, "subscription stage: connecting for live-tail");
    let mut live_stream = match ws.connect(&live_tail_url).await {
        Ok(connected) => connected,
        Err(e) => {
            tracing::debug!(url = %live_tail_url, "subscription stage: live-tail connect failed");
            return Err(e);
        }
    };

    let mut live_frames_observed = 0;
    let live_deadline = Instant::now() + budget;
    while Instant::now() < live_deadline {
        let time_left = live_deadline.saturating_duration_since(Instant::now());
        // Both end-of-stream and the timeout finish the phase.
        let Ok(Some(item)) = tokio::time::timeout(time_left, live_stream.next_frame()).await
        else {
            break;
        };

        // NOTE(review): transport errors are counted as observed frames here, whereas the
        // backfill loop in `run` does not count them — confirm which is intended.
        live_frames_observed += 1;
        let frame = match item {
            Ok(frame) => frame,
            Err(_) => continue,
        };

        tracing::trace!(
            frame_num = live_frames_observed,
            frame_len = frame.len(),
            "subscription stage: live-tail frame received"
        );
        match decode_frame(&frame) {
            Ok(DecodedFrame::Labels(payload)) => {
                collect_sample_labels(sample_labels, payload.labels);
            }
            Ok(_) => {}
            Err(e) => decode_errors.push(e),
        }
    }

    tracing::debug!(
        live_frames_observed,
        "subscription stage: live-tail phase finished"
    );
    live_stream.close().await;

    Ok(LiveTailOutcome::CleanHold {
        frames_observed: live_frames_observed,
    })
}
pub async fn run(
labeler_endpoint: &Url,
ws: &dyn WebSocketClient,
budget_per_connection: Duration,
) -> SubscriptionStageOutput {
use crate::commands::test::labeler::report::CheckResult;
use std::borrow::Cow;
use std::collections::HashSet;
let backfill_url = {
let mut url = labeler_endpoint.clone();
url.set_path("xrpc/com.atproto.label.subscribeLabels");
{
let mut query = url.query_pairs_mut();
query.append_pair("cursor", "0");
}
if url.scheme() == "https" {
let _ = url.set_scheme("wss");
}
url
};
tracing::debug!(url = %backfill_url, "subscription stage: connecting for backfill");
let mut stream = match ws.connect(&backfill_url).await {
Ok(s) => s,
Err(_e) => {
tracing::debug!(url = %backfill_url, "subscription stage: backfill connect failed");
return SubscriptionStageOutput {
facts: None,
results: vec![Check::EndpointReachable.network_error()],
};
}
};
let mut backfill_outcome = BackfillOutcome::NoFramesWithinBudget;
let mut live_tail_outcome: Option<LiveTailOutcome> = None;
let mut decode_errors: Vec<FrameDecodeError> = vec![];
let mut sample_labels: Vec<Label> = Vec::new();
let mut frames_observed = 0;
let mut last_frame_at: Option<Instant> = None;
let backfill_deadline = Instant::now() + budget_per_connection;
loop {
if Instant::now() >= backfill_deadline {
if frames_observed > 0 {
backfill_outcome = BackfillOutcome::ExceededBudget { frames_observed };
}
break;
}
let idle_gap_deadline = last_frame_at.map(|t| t + Duration::from_millis(500));
let timeout = if let Some(idle_deadline) = idle_gap_deadline {
if idle_deadline <= Instant::now() {
backfill_outcome = BackfillOutcome::CompletedWithIdleGap {
frames_observed,
idle_gap_ms: 500,
};
live_tail_outcome = Some(LiveTailOutcome::FromBackfill);
break;
}
let idle_time_left = idle_deadline.saturating_duration_since(Instant::now());
let budget_time_left = backfill_deadline.saturating_duration_since(Instant::now());
idle_time_left.min(budget_time_left)
} else {
backfill_deadline.saturating_duration_since(Instant::now())
};
match tokio::time::timeout(timeout, stream.next_frame()).await {
Ok(Some(Ok(frame_bytes))) => {
last_frame_at = Some(Instant::now());
frames_observed += 1;
tracing::trace!(
frame_num = frames_observed,
frame_len = frame_bytes.len(),
"subscription stage: backfill frame received"
);
match decode_frame(&frame_bytes) {
Ok(DecodedFrame::Labels(payload)) => {
collect_sample_labels(&mut sample_labels, payload.labels);
}
Ok(_) => {}
Err(e) => decode_errors.push(e),
}
}
Ok(Some(Err(_e))) => {
}
Ok(None) => {
if frames_observed > 0 {
backfill_outcome =
BackfillOutcome::StreamClosedDuringBackfill { frames_observed };
} else {
backfill_outcome = BackfillOutcome::NoFramesWithinBudget;
}
break;
}
Err(_e) => {
if frames_observed > 0 {
if let Some(idle_deadline) = idle_gap_deadline {
if Instant::now() >= idle_deadline {
backfill_outcome = BackfillOutcome::CompletedWithIdleGap {
frames_observed,
idle_gap_ms: 500,
};
live_tail_outcome = Some(LiveTailOutcome::FromBackfill);
} else {
backfill_outcome = BackfillOutcome::ExceededBudget { frames_observed };
}
} else {
backfill_outcome = BackfillOutcome::ExceededBudget { frames_observed };
}
}
break;
}
}
}
tracing::debug!(
frames_observed,
outcome = ?backfill_outcome,
"subscription stage: backfill phase finished"
);
stream.close().await;
let live_tail_outcome = if let Some(outcome) = live_tail_outcome {
outcome
} else {
match &backfill_outcome {
BackfillOutcome::ExceededBudget { .. }
| BackfillOutcome::StreamClosedDuringBackfill { .. } => {
run_live_tail(
labeler_endpoint,
ws,
budget_per_connection,
&mut decode_errors,
&mut sample_labels,
)
.await
.ok()
.unwrap_or(LiveTailOutcome::ConnectFailed)
}
BackfillOutcome::NoFramesWithinBudget => LiveTailOutcome::SkippedEmpty,
BackfillOutcome::CompletedWithIdleGap { .. } => {
unreachable!(
"live_tail_outcome is already Some(FromBackfill) for CompletedWithIdleGap"
);
}
}
};
let mut results = vec![];
if matches!(live_tail_outcome, LiveTailOutcome::ConnectFailed) {
results.push(Check::LiveTailEndpointReachable.network_error());
}
results.push(match &backfill_outcome {
BackfillOutcome::CompletedWithIdleGap { .. } => Check::Backfill.pass(),
BackfillOutcome::ExceededBudget { .. } => CheckResult {
summary: Cow::Borrowed("Subscription backfill exceeded budget"),
..Check::Backfill.advisory()
},
BackfillOutcome::StreamClosedDuringBackfill { .. } => CheckResult {
summary: Cow::Borrowed("Subscription backfill stream closed unexpectedly"),
..Check::Backfill.advisory()
},
BackfillOutcome::NoFramesWithinBudget => CheckResult {
summary: Cow::Borrowed("Subscription backfill had no frames"),
skipped_reason: Some(Cow::Borrowed("labeler has no published labels")),
..Check::Backfill.advisory()
},
});
if !matches!(live_tail_outcome, LiveTailOutcome::ConnectFailed) {
results.push(match live_tail_outcome {
LiveTailOutcome::FromBackfill => CheckResult {
summary: Cow::Borrowed("Subscription live-tail observed after backfill"),
..Check::LiveTail.pass()
},
LiveTailOutcome::CleanHold { .. } => Check::LiveTail.pass(),
LiveTailOutcome::SkippedEmpty => {
Check::LiveTail.skip("labeler has no published labels")
}
LiveTailOutcome::ConnectFailed => {
unreachable!("ConnectFailed case should be filtered by outer guard")
}
});
}
let mut seen_variants = HashSet::new();
for err in decode_errors.iter() {
let variant_key = std::mem::discriminant(err);
if seen_variants.insert(variant_key) {
let (raw_bytes, msg) = match err {
FrameDecodeError::HeaderDecode { raw, cause } => {
(raw.clone(), format!("Header decode failed: {cause}"))
}
FrameDecodeError::PayloadDecode { raw, cause, .. } => {
(raw.clone(), format!("Payload decode failed: {cause}"))
}
FrameDecodeError::UnknownMessageType { t, raw } => {
(raw.clone(), format!("Unknown message type: {t}"))
}
FrameDecodeError::TextFrameRejected(raw) => (
raw.clone(),
"Text frame rejected (expected binary)".to_string(),
),
};
let diagnostic = FrameDecodeFailureDiagnostic {
message: msg,
source_code: NamedSource::new("frame", raw_bytes),
span: SourceSpan::new(0.into(), 1),
};
results.push(Check::FrameDecode.spec_violation(Box::new(diagnostic)));
}
}
let facts = Some(SubscriptionFacts {
backfill_outcome,
live_tail_outcome,
decode_errors,
sample_labels,
});
SubscriptionStageOutput { facts, results }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Serializes `value` as CBOR, panicking if encoding fails.
    fn encode_cbor<T: Serialize>(value: &T) -> Vec<u8> {
        let mut encoded = Vec::new();
        ciborium::ser::into_writer(value, &mut encoded).expect("failed to encode CBOR");
        encoded
    }

    /// Builds a complete frame: the CBOR header immediately followed by the CBOR payload.
    fn encode_frame<P: Serialize>(header: &FrameHeader, payload: &P) -> Vec<u8> {
        let mut frame = encode_cbor(header);
        frame.extend(encode_cbor(payload));
        frame
    }

    /// Builds a frame whose header is valid but whose payload is a lone `0xff` byte.
    fn encode_frame_with_bad_payload(header: &FrameHeader) -> Vec<u8> {
        let mut frame = encode_cbor(header);
        frame.push(0xff);
        frame
    }

    /// Header for a typed message (`op == 1`) with the given type tag.
    fn message_header(t: &str) -> FrameHeader {
        FrameHeader {
            op: 1,
            t: Some(t.to_string()),
        }
    }

    /// Header for an error frame (`op == -1`, no type tag).
    fn error_header() -> FrameHeader {
        FrameHeader { op: -1, t: None }
    }

    #[test]
    fn collect_sample_labels_respects_cap() {
        use atrium_api::com::atproto::label::defs::LabelData;
        use atrium_api::types::string::Datetime;

        fn make_label(i: usize) -> Label {
            let data = LabelData {
                cid: None,
                cts: Datetime::new("2026-01-01T00:00:00.000Z".parse().expect("valid datetime")),
                exp: None,
                neg: None,
                sig: Some(vec![0u8; 64]),
                src: "did:plc:test123456789abcdefghijklmnop"
                    .parse()
                    .expect("valid did"),
                uri: format!("at://did:plc:test123456789abcdefghijklmnop/x/{i}"),
                val: "spam".to_string(),
                ver: Some(1),
            };
            data.into()
        }

        let mut buffer: Vec<Label> = Vec::new();

        // Below the cap: everything is kept.
        let half = SAMPLE_LABEL_CAP / 2;
        collect_sample_labels(&mut buffer, (0..half).map(make_label).collect());
        assert_eq!(buffer.len(), half);

        // More than fits: the buffer is filled exactly to the cap.
        let oversized: Vec<Label> = (0..(SAMPLE_LABEL_CAP * 2)).map(make_label).collect();
        collect_sample_labels(&mut buffer, oversized);
        assert_eq!(buffer.len(), SAMPLE_LABEL_CAP);

        // Already full: nothing more is added.
        collect_sample_labels(&mut buffer, vec![make_label(99999)]);
        assert_eq!(buffer.len(), SAMPLE_LABEL_CAP);
    }

    #[test]
    fn decode_labels_frame_valid() {
        let payload = SubscribeLabelsPayload {
            seq: 0,
            labels: vec![],
        };
        let frame_bytes = encode_frame(&message_header("#labels"), &payload);

        match decode_frame(&frame_bytes) {
            Ok(DecodedFrame::Labels(p)) => {
                assert_eq!(p.seq, 0);
                assert!(p.labels.is_empty());
            }
            other => panic!("expected DecodedFrame::Labels, got {other:?}"),
        }
    }

    #[test]
    fn decode_info_frame_valid() {
        let payload = SubscribeInfoPayload {
            name: "test-service".to_string(),
            message: Some("info message".to_string()),
        };
        let frame_bytes = encode_frame(&message_header("#info"), &payload);

        match decode_frame(&frame_bytes) {
            Ok(DecodedFrame::Info(p)) => {
                assert_eq!(p.name, "test-service");
                assert_eq!(p.message, Some("info message".to_string()));
            }
            other => panic!("expected DecodedFrame::Info, got {other:?}"),
        }
    }

    #[test]
    fn decode_error_frame_valid() {
        let payload = SubscribeErrorPayload {
            error: "TestError".to_string(),
            message: Some("Test error message".to_string()),
        };
        let frame_bytes = encode_frame(&error_header(), &payload);

        match decode_frame(&frame_bytes) {
            Ok(DecodedFrame::Error(p)) => {
                assert_eq!(p.error, "TestError");
                assert_eq!(p.message, Some("Test error message".to_string()));
            }
            other => panic!("expected DecodedFrame::Error, got {other:?}"),
        }
    }

    #[test]
    fn decode_frame_header_decode_failure() {
        let garbage = vec![0x1f, 0x2f, 0x3f];

        match decode_frame(&garbage) {
            Err(FrameDecodeError::HeaderDecode { raw, cause: _ }) => {
                assert_eq!(&raw[..], &garbage[..]);
            }
            other => panic!("expected HeaderDecode error, got {other:?}"),
        }
    }

    #[test]
    fn decode_frame_payload_decode_failure() {
        let frame_bytes = encode_frame_with_bad_payload(&message_header("#labels"));

        match decode_frame(&frame_bytes) {
            Err(FrameDecodeError::PayloadDecode {
                header: _,
                raw,
                cause: _,
            }) => {
                assert_eq!(&raw[..], &frame_bytes[..]);
            }
            other => panic!("expected PayloadDecode error, got {other:?}"),
        }
    }

    #[test]
    fn decode_frame_unknown_message_type() {
        // Header only: an unknown type is rejected before any payload is read.
        let frame_bytes = encode_cbor(&message_header("#futureType"));

        match decode_frame(&frame_bytes) {
            Err(FrameDecodeError::UnknownMessageType { t, raw: _ }) => {
                assert_eq!(t, "#futureType");
            }
            other => panic!("expected UnknownMessageType error, got {other:?}"),
        }
    }

    #[test]
    fn decode_frame_error_payload_malformed() {
        let frame_bytes = encode_frame_with_bad_payload(&error_header());

        match decode_frame(&frame_bytes) {
            Err(FrameDecodeError::PayloadDecode {
                header: _,
                raw,
                cause: _,
            }) => {
                assert_eq!(&raw[..], &frame_bytes[..]);
            }
            other => panic!("expected PayloadDecode error, got {other:?}"),
        }
    }
}