use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::sync::Arc;
use bytes::Bytes;
use tokio::sync::{Mutex, Notify};
use crate::ids::{AureliaError, ErrorId};
use crate::peering::address::DomusAddr;
use crate::peering::codec::{EncodedMessage, MessageCodec};
use crate::peering::config::{DomusConfig, DomusConfigAccess, DomusConfigBuilder};
use crate::peering::message_id::PeerMessageIdAllocator;
use crate::peering::peering::RouteLocalRemoteBuilder;
use crate::peering::routing::RouteResolver;
use crate::peering::taberna::{
TabernaInbox, TabernaInboxHandle, TabernaRegistry, TabernaShutdownReport,
};
use crate::peering::transport::{Transport, TransportBackend};
use crate::peering::wire::PROTOCOL_VERSION;
use crate::peering::wire::{ErrorPayload, WireHeader};
use crate::peering::SimpleResolver;
use crate::peering::{BlobReceiver, MessageType, SendOptions, SendOutcome, TabernaId};
use caducus::MpscBuilder;
use std::time::Duration;
mod blob_unit;
mod faults;
mod observability;
mod ring_buffer;
mod runtime;
mod session;
mod socket_transport;
mod transport;
#[derive(Clone, Debug, PartialEq)]
pub(super) struct TestMessage {
msg_type: MessageType,
payload: Bytes,
}
pub(super) struct TestCodec;
impl MessageCodec for TestCodec {
type AppMessage = TestMessage;
fn encode_app(&self, msg: &Self::AppMessage) -> Result<EncodedMessage, AureliaError> {
Ok(EncodedMessage::new(msg.msg_type, msg.payload.clone()))
}
fn decode_app(&self, msg_type: u32, payload: &[u8]) -> Result<Self::AppMessage, AureliaError> {
Ok(TestMessage {
msg_type,
payload: Bytes::copy_from_slice(payload),
})
}
}
pub(super) fn test_message(msg_type: MessageType, payload: &'static [u8]) -> TestMessage {
TestMessage {
msg_type,
payload: Bytes::from_static(payload),
}
}
type AcceptedMessages = Arc<Mutex<Vec<(MessageType, Bytes, Option<BlobReceiver>)>>>;
struct MockSink {
accepted: AcceptedMessages,
mode: SinkMode,
expected_msg_types: Vec<MessageType>,
}
#[derive(Clone, Copy, Debug)]
enum SinkMode {
Ok,
Err(AcceptErrorKind),
}
#[derive(Clone, Copy, Debug)]
enum AcceptErrorKind {
LocalQueueFull,
RemoteTabernaRejected,
TabernaBusy,
}
impl MockSink {
fn new(expected_msg_types: Vec<MessageType>) -> Self {
Self {
accepted: Arc::new(Mutex::new(Vec::new())),
mode: SinkMode::Ok,
expected_msg_types,
}
}
fn with_mode(expected_msg_types: Vec<MessageType>, mode: SinkMode) -> Self {
Self {
accepted: Arc::new(Mutex::new(Vec::new())),
mode,
expected_msg_types,
}
}
}
#[async_trait::async_trait]
impl TabernaInbox for MockSink {
async fn enqueue(
&self,
msg_type: MessageType,
payload: Bytes,
blob_receiver: Option<BlobReceiver>,
notify: Option<Arc<Notify>>,
) -> Result<tokio::sync::oneshot::Receiver<Result<(), AureliaError>>, AureliaError> {
if !self.expected_msg_types.contains(&msg_type) {
return Err(AureliaError::new(ErrorId::RemoteTabernaRejected));
}
let (tx, rx) = tokio::sync::oneshot::channel();
match self.mode {
SinkMode::Ok => {
self.accepted
.lock()
.await
.push((msg_type, payload, blob_receiver));
let _ = tx.send(Ok(()));
if let Some(notify) = notify.as_ref() {
notify.notify_one();
}
}
SinkMode::Err(err) => {
let err = match err {
AcceptErrorKind::LocalQueueFull => AureliaError::new(ErrorId::LocalQueueFull),
AcceptErrorKind::RemoteTabernaRejected => {
AureliaError::new(ErrorId::RemoteTabernaRejected)
}
AcceptErrorKind::TabernaBusy => AureliaError::new(ErrorId::TabernaBusy),
};
let _ = tx.send(Err(err));
if let Some(notify) = notify.as_ref() {
notify.notify_one();
}
}
}
Ok(rx)
}
}
struct MockResolver {
addr: Option<DomusAddr>,
mode: ResolverMode,
}
#[derive(Clone, Copy, Debug)]
enum ResolverMode {
Ok,
Unknown,
Failed,
Delay(std::time::Duration),
}
#[async_trait::async_trait]
impl RouteResolver for MockResolver {
async fn resolve(&self, _taberna_id: TabernaId) -> Result<DomusAddr, AureliaError> {
match self.mode {
ResolverMode::Ok => self
.addr
.clone()
.ok_or_else(|| AureliaError::new(ErrorId::UnknownTaberna)),
ResolverMode::Unknown => Err(AureliaError::new(ErrorId::UnknownTaberna)),
ResolverMode::Failed => Err(AureliaError::with_message(
ErrorId::PeerUnavailable,
"resolver failed",
)),
ResolverMode::Delay(duration) => {
tokio::time::sleep(duration).await;
self.addr
.clone()
.ok_or_else(|| AureliaError::new(ErrorId::UnknownTaberna))
}
}
}
}
#[derive(Clone, Default)]
struct TestBackend;
#[async_trait::async_trait]
impl TransportBackend for TestBackend {
type Addr = DomusAddr;
type Listener = ();
type Stream = tokio::io::DuplexStream;
async fn bind(&self, _local: &Self::Addr) -> Result<Self::Listener, AureliaError> {
Ok(())
}
async fn accept(
&self,
_listener: &mut Self::Listener,
) -> Result<
crate::peering::transport::backend::AuthenticatedStream<Self::Stream, Self::Addr>,
AureliaError,
> {
Err(AureliaError::new(ErrorId::PeerUnavailable))
}
async fn dial(
&self,
_peer: &Self::Addr,
) -> Result<
crate::peering::transport::backend::AuthenticatedStream<Self::Stream, Self::Addr>,
AureliaError,
> {
Err(AureliaError::new(ErrorId::PeerUnavailable))
}
}
async fn test_transport(
registry: Arc<TabernaRegistry>,
config: DomusConfigAccess,
) -> Transport<TestBackend> {
let backend = Arc::new(TestBackend);
let transport = Transport::bind_with_backend(
DomusAddr::Tcp(SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0)),
registry,
config,
crate::peering::observability::new_observability(tokio::runtime::Handle::current()).1,
tokio::runtime::Handle::current(),
backend,
)
.await
.expect("bind transport");
transport
}
#[test]
fn wire_header_roundtrip() {
let header = WireHeader {
version: PROTOCOL_VERSION,
flags: 0x0102,
msg_type: 42,
peer_msg_id: 7,
src_taberna: 11,
dst_taberna: 22,
payload_len: 256,
};
let encoded = header.encode();
let decoded = WireHeader::decode(&encoded).expect("decode header");
assert_eq!(header, decoded);
}
#[test]
fn wire_header_rejects_invalid_length() {
let buf = [0u8; 4];
let err = WireHeader::decode(&buf).expect_err("expected invalid length error");
assert_eq!(err.kind, ErrorId::DecodeFailure);
}
#[test]
fn wire_header_rejects_unsupported_version() {
let header = WireHeader {
version: PROTOCOL_VERSION + 1,
flags: 0,
msg_type: 1,
peer_msg_id: 1,
src_taberna: 1,
dst_taberna: 1,
payload_len: 0,
};
let encoded = header.encode();
let err = WireHeader::decode(&encoded).expect_err("expected unsupported version error");
assert_eq!(err.kind, ErrorId::UnsupportedVersion);
}
#[test]
fn error_payload_rejects_short_buffer() {
let buf = [0u8; 3];
let err = ErrorPayload::from_bytes(&buf).expect_err("expected invalid length error");
assert_eq!(err.kind, ErrorId::DecodeFailure);
}
#[test]
fn peer_message_id_rollover() {
let allocator = PeerMessageIdAllocator::new(u32::MAX);
assert_eq!(allocator.next(), u32::MAX);
assert_eq!(allocator.next(), 0);
}
#[tokio::test]
async fn taberna_registry_register_and_resolve() {
let registry = TabernaRegistry::new();
let sink = Arc::new(MockSink::new(vec![0]));
let taberna_id = 42;
registry.register(taberna_id, sink.clone()).await.unwrap();
let resolved = registry.resolve_local(taberna_id).await;
assert!(resolved.is_some());
registry.unregister(taberna_id).await;
let resolved = registry.resolve_local(taberna_id).await;
assert!(resolved.is_none());
}
#[tokio::test]
async fn simple_resolver_resolves_and_removes_routes() {
let resolver = SimpleResolver::new();
let taberna_id = 42;
let addr = DomusAddr::Tcp(SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 5555));
resolver.insert(taberna_id, addr.clone()).await;
let resolved = resolver.resolve(taberna_id).await.expect("resolve");
assert_eq!(resolved, addr);
resolver.remove(taberna_id).await;
let err = resolver
.resolve(taberna_id)
.await
.expect_err("unknown taberna");
assert_eq!(err.kind, ErrorId::UnknownTaberna);
}
#[tokio::test]
async fn peering_dispatches_local_and_remote() {
let registry = Arc::new(TabernaRegistry::new());
let sink = Arc::new(MockSink::new(vec![90]));
let taberna_id = 1;
registry.register(taberna_id, sink.clone()).await.unwrap();
let config = DomusConfigBuilder::new()
.send_timeout(std::time::Duration::from_millis(2))
.accept_timeout(std::time::Duration::from_millis(2))
.build()
.expect("valid domus config");
let config = DomusConfigAccess::from_config(config);
let resolver = Arc::new(MockResolver {
addr: Some(DomusAddr::Tcp(SocketAddr::new(
IpAddr::V4(Ipv4Addr::LOCALHOST),
5555,
))),
mode: ResolverMode::Ok,
});
let transport = Arc::new(test_transport(Arc::clone(®istry), config.clone()).await);
let peering = RouteLocalRemoteBuilder::new(config, registry, resolver, transport).build();
let codec = TestCodec;
peering
.send(
&codec,
taberna_id,
&test_message(90, b"local"),
SendOptions::MESSAGE_ONLY,
)
.await
.expect("local send");
{
let accepted = sink.accepted.lock().await;
assert_eq!(accepted.len(), 1);
}
let err = peering
.send(
&codec,
999,
&test_message(10, b"remote"),
SendOptions::MESSAGE_ONLY,
)
.await
.expect_err("remote send timeout");
assert_eq!(err.kind, ErrorId::SendTimeout);
}
#[tokio::test]
async fn peering_local_accept_rejected_maps_error() {
let registry = Arc::new(TabernaRegistry::new());
let sink = Arc::new(MockSink::with_mode(
vec![1],
SinkMode::Err(AcceptErrorKind::RemoteTabernaRejected),
));
let taberna_id = 5;
registry.register(taberna_id, sink).await.unwrap();
let config = DomusConfigAccess::from_config(DomusConfig::default());
let resolver = Arc::new(MockResolver {
addr: None,
mode: ResolverMode::Ok,
});
let transport = Arc::new(test_transport(Arc::clone(®istry), config.clone()).await);
let peering = RouteLocalRemoteBuilder::new(config, registry, resolver, transport).build();
let codec = TestCodec;
let err = peering
.send(
&codec,
taberna_id,
&test_message(1, b"reject"),
SendOptions::MESSAGE_ONLY,
)
.await
.expect_err("expected rejection");
assert_eq!(err.kind, ErrorId::RemoteTabernaRejected);
}
#[tokio::test]
async fn peering_local_accept_ingress_full_maps_error() {
let registry = Arc::new(TabernaRegistry::new());
let sink = Arc::new(MockSink::with_mode(
vec![1],
SinkMode::Err(AcceptErrorKind::LocalQueueFull),
));
let taberna_id = 15;
registry.register(taberna_id, sink).await.unwrap();
let config = DomusConfigAccess::from_config(DomusConfig::default());
let resolver = Arc::new(MockResolver {
addr: None,
mode: ResolverMode::Ok,
});
let transport = Arc::new(test_transport(Arc::clone(®istry), config.clone()).await);
let peering = RouteLocalRemoteBuilder::new(config, registry, resolver, transport).build();
let codec = TestCodec;
let err = peering
.send(
&codec,
taberna_id,
&test_message(1, b"full"),
SendOptions::MESSAGE_ONLY,
)
.await
.expect_err("expected local queue full");
assert_eq!(err.kind, ErrorId::LocalQueueFull);
}
#[tokio::test]
async fn peering_local_accept_timeout_maps_error() {
let registry = Arc::new(TabernaRegistry::new());
let taberna_id = 6;
let domus_config = DomusConfigBuilder::new()
.accept_timeout(std::time::Duration::from_millis(20))
.taberna_accept_queue_size(1)
.build()
.expect("valid domus config");
let config = DomusConfigAccess::from_config(domus_config.clone());
let (sender, receiver) = MpscBuilder::new(
domus_config.taberna_accept_queue_size,
domus_config.accept_timeout,
)
.runtime(tokio::runtime::Handle::current())
.build()
.expect("caducus build");
let inbox = Arc::new(TabernaInboxHandle::new(
TestCodec,
sender,
config.clone(),
domus_config.taberna_accept_queue_size,
domus_config.accept_timeout,
));
registry.register(taberna_id, inbox).await.unwrap();
let _receiver_guard = receiver;
let resolver = Arc::new(MockResolver {
addr: None,
mode: ResolverMode::Ok,
});
let transport = Arc::new(test_transport(Arc::clone(®istry), config.clone()).await);
let peering = RouteLocalRemoteBuilder::new(config, registry, resolver, transport).build();
let codec = TestCodec;
let err = peering
.send(
&codec,
taberna_id,
&test_message(1, b"timeout"),
SendOptions::MESSAGE_ONLY,
)
.await
.expect_err("expected taberna busy");
assert_eq!(err.kind, ErrorId::TabernaBusy);
}
#[tokio::test]
async fn taberna_shutdown_drops_accept_waiters() {
let registry = Arc::new(TabernaRegistry::new());
let taberna_id = 8u64;
let domus_config = DomusConfigBuilder::new()
.accept_timeout(Duration::from_secs(1))
.taberna_accept_queue_size(1)
.build()
.expect("valid domus config");
let config = DomusConfigAccess::from_config(domus_config.clone());
let (sender, receiver) = MpscBuilder::new(
domus_config.taberna_accept_queue_size,
domus_config.accept_timeout,
)
.shutdown_channel(Arc::new(TabernaShutdownReport::<TestCodec>::new()))
.runtime(tokio::runtime::Handle::current())
.build()
.expect("caducus build");
let inbox: Arc<dyn TabernaInbox> = Arc::new(TabernaInboxHandle::new(
TestCodec,
sender,
config.clone(),
domus_config.taberna_accept_queue_size,
domus_config.accept_timeout,
));
registry
.register(taberna_id, Arc::clone(&inbox))
.await
.unwrap();
let _receiver_guard = receiver;
let accept_rx = inbox
.enqueue(1u32, Bytes::from_static(b"shutdown"), None, None)
.await
.expect("enqueue");
registry.shutdown().await;
let result = tokio::time::timeout(Duration::from_millis(100), accept_rx)
.await
.expect("accept wait");
let err = result.expect("accept recv").expect_err("domus closed");
assert_eq!(err.kind, ErrorId::DomusClosed);
}
#[tokio::test]
async fn peering_local_accept_explicit_timeout_maps_error() {
let registry = Arc::new(TabernaRegistry::new());
let sink = Arc::new(MockSink::with_mode(
vec![1],
SinkMode::Err(AcceptErrorKind::TabernaBusy),
));
let taberna_id = 7;
registry.register(taberna_id, sink).await.unwrap();
let config = DomusConfigAccess::from_config(DomusConfig::default());
let resolver = Arc::new(MockResolver {
addr: None,
mode: ResolverMode::Ok,
});
let transport = Arc::new(test_transport(Arc::clone(®istry), config.clone()).await);
let peering = RouteLocalRemoteBuilder::new(config, registry, resolver, transport).build();
let codec = TestCodec;
let err = peering
.send(
&codec,
taberna_id,
&test_message(1, b"timeout"),
SendOptions::MESSAGE_ONLY,
)
.await
.expect_err("expected taberna busy");
assert_eq!(err.kind, ErrorId::TabernaBusy);
}
#[tokio::test]
async fn peering_route_resolution_errors_map() {
let registry = Arc::new(TabernaRegistry::new());
let config = DomusConfigAccess::from_config(DomusConfig::default());
let transport = Arc::new(test_transport(Arc::clone(®istry), config.clone()).await);
let taberna_id = 999;
let resolver = Arc::new(MockResolver {
addr: None,
mode: ResolverMode::Unknown,
});
let peering = RouteLocalRemoteBuilder::new(
config.clone(),
Arc::clone(®istry),
resolver,
Arc::clone(&transport),
)
.build();
let codec = TestCodec;
let err = peering
.send(
&codec,
taberna_id,
&test_message(1, b"x"),
SendOptions::MESSAGE_ONLY,
)
.await
.expect_err("unknown taberna");
assert_eq!(err.kind, ErrorId::UnknownTaberna);
let resolver = Arc::new(MockResolver {
addr: None,
mode: ResolverMode::Failed,
});
let peering = RouteLocalRemoteBuilder::new(config, registry, resolver, transport).build();
let err = peering
.send(
&codec,
taberna_id,
&test_message(1, b"x"),
SendOptions::MESSAGE_ONLY,
)
.await
.expect_err("resolver failed");
assert_eq!(err.kind, ErrorId::PeerUnavailable);
assert!(err
.message
.as_deref()
.unwrap_or_default()
.contains("resolver failed"));
}
#[tokio::test]
async fn peering_remote_dispatch_times_out_without_peer() {
let registry = Arc::new(TabernaRegistry::new());
let config = DomusConfigBuilder::new()
.send_timeout(std::time::Duration::from_millis(2))
.accept_timeout(std::time::Duration::from_millis(2))
.build()
.expect("valid domus config");
let config = DomusConfigAccess::from_config(config);
let resolver = Arc::new(MockResolver {
addr: Some(DomusAddr::Tcp(SocketAddr::new(
IpAddr::V4(Ipv4Addr::LOCALHOST),
5555,
))),
mode: ResolverMode::Ok,
});
let transport = Arc::new(test_transport(Arc::clone(®istry), config.clone()).await);
let peering = RouteLocalRemoteBuilder::new(config, registry, resolver, transport).build();
let codec = TestCodec;
let err = peering
.send(
&codec,
999,
&test_message(1, b"x"),
SendOptions::MESSAGE_ONLY,
)
.await
.expect_err("expected send timeout");
assert_eq!(err.kind, ErrorId::SendTimeout);
}
#[tokio::test]
async fn peering_route_resolution_timeout_maps_send_timeout() {
let registry = Arc::new(TabernaRegistry::new());
let config = DomusConfigBuilder::new()
.send_timeout(std::time::Duration::from_millis(1))
.accept_timeout(std::time::Duration::from_millis(1))
.build()
.expect("valid domus config");
let config = DomusConfigAccess::from_config(config);
let resolver = Arc::new(MockResolver {
addr: Some(DomusAddr::Tcp(SocketAddr::new(
IpAddr::V4(Ipv4Addr::LOCALHOST),
5555,
))),
mode: ResolverMode::Delay(std::time::Duration::from_millis(5)),
});
let transport = Arc::new(test_transport(Arc::clone(®istry), config.clone()).await);
let peering = RouteLocalRemoteBuilder::new(config, registry, resolver, transport).build();
let codec = TestCodec;
let err = peering
.send(
&codec,
500,
&test_message(1, b"timeout"),
SendOptions::MESSAGE_ONLY,
)
.await
.expect_err("expected send timeout");
assert_eq!(err.kind, ErrorId::SendTimeout);
}
#[tokio::test]
async fn peering_send_blob_local_streams_chunks() {
let registry = Arc::new(TabernaRegistry::new());
let sink = Arc::new(MockSink::new(vec![44]));
let taberna_id = 2;
registry.register(taberna_id, sink.clone()).await.unwrap();
let config = DomusConfigAccess::from_config(DomusConfig::default());
let resolver = Arc::new(MockResolver {
addr: None,
mode: ResolverMode::Unknown,
});
let transport = Arc::new(test_transport(Arc::clone(®istry), config.clone()).await);
let peering = RouteLocalRemoteBuilder::new(config, registry, resolver, transport).build();
let codec = TestCodec;
let outcome = peering
.send(
&codec,
taberna_id,
&test_message(44, b"blob"),
SendOptions::BLOB,
)
.await
.expect("send blob");
let mut sender = match outcome {
SendOutcome::Blob { sender } => sender,
SendOutcome::MessageOnly => panic!("expected blob sender"),
};
use tokio::io::AsyncWriteExt;
sender.write_all(b"first").await.expect("write first chunk");
sender
.write_all(b"second")
.await
.expect("write second chunk");
sender.shutdown().await.expect("shutdown sender");
let mut accepted = sink.accepted.lock().await;
assert_eq!(accepted.len(), 1);
let record = accepted.pop().expect("recorded blob");
assert_eq!(record.0, 44);
assert_eq!(record.1, Bytes::from_static(b"blob"));
let mut receiver = record.2.expect("blob receiver");
drop(accepted);
use tokio::io::AsyncReadExt;
let mut buffer = Vec::new();
receiver.read_to_end(&mut buffer).await.expect("read blob");
assert_eq!(buffer, b"firstsecond");
}
#[tokio::test]
async fn peering_send_blob_local_respects_buffer_caps() {
let registry = Arc::new(TabernaRegistry::new());
let sink = Arc::new(MockSink::new(vec![44]));
let taberna_id = 2;
registry.register(taberna_id, sink.clone()).await.unwrap();
let config = DomusConfigBuilder::new()
.blob_chunk_size(4)
.blob_ack_window(2)
.blob_outbound_buffer_bytes(8)
.blob_inbound_buffer_bytes(8)
.build()
.expect("valid domus config");
let config = DomusConfigAccess::from_config(config);
let resolver = Arc::new(MockResolver {
addr: None,
mode: ResolverMode::Unknown,
});
let transport = Arc::new(test_transport(Arc::clone(®istry), config.clone()).await);
let peering = RouteLocalRemoteBuilder::new(config, registry, resolver, transport).build();
let codec = TestCodec;
let outcome = peering
.send(
&codec,
taberna_id,
&test_message(44, b"blob"),
SendOptions::BLOB,
)
.await
.expect("send blob");
let sender = match outcome {
SendOutcome::Blob { sender } => sender,
SendOutcome::MessageOnly => panic!("expected blob sender"),
};
let mut accepted = sink.accepted.lock().await;
assert_eq!(accepted.len(), 1);
let record = accepted.pop().expect("recorded blob");
let receiver = record.2.expect("blob receiver");
drop(accepted);
let err = peering
.send(
&codec,
taberna_id,
&test_message(44, b"blob-two"),
SendOptions::BLOB,
)
.await
.expect_err("expected blob buffer full");
assert_eq!(err.kind, ErrorId::BlobBufferFull);
drop(sender);
drop(receiver);
let outcome = peering
.send(
&codec,
taberna_id,
&test_message(44, b"blob-three"),
SendOptions::BLOB,
)
.await
.expect("send blob after release");
let sender = match outcome {
SendOutcome::Blob { sender } => sender,
SendOutcome::MessageOnly => panic!("expected blob sender"),
};
drop(sender);
let mut accepted = sink.accepted.lock().await;
assert_eq!(accepted.len(), 1);
let record = accepted.pop().expect("recorded blob");
drop(record.2);
}
#[tokio::test]
async fn peering_send_blob_remote_dispatch_times_out_without_peer() {
let registry = Arc::new(TabernaRegistry::new());
let config = DomusConfigBuilder::new()
.send_timeout(std::time::Duration::from_millis(2))
.accept_timeout(std::time::Duration::from_millis(2))
.build()
.expect("valid domus config");
let config = DomusConfigAccess::from_config(config);
let resolver = Arc::new(MockResolver {
addr: Some(DomusAddr::Tcp(SocketAddr::new(
IpAddr::V4(Ipv4Addr::LOCALHOST),
5559,
))),
mode: ResolverMode::Ok,
});
let transport = Arc::new(test_transport(Arc::clone(®istry), config.clone()).await);
let peering = RouteLocalRemoteBuilder::new(config, registry, resolver, transport).build();
let codec = TestCodec;
let err = peering
.send(&codec, 901, &test_message(33, b"remote"), SendOptions::BLOB)
.await
.expect_err("expected send timeout");
assert_eq!(err.kind, ErrorId::SendTimeout);
}