#![deny(missing_docs)]
pub use selium_remote_client_protocol::{
AbiParam, AbiScalarType, AbiScalarValue, AbiSignature, Capability, EntrypointArg,
GuestResourceId, GuestUint,
};
use std::{
collections::VecDeque,
fmt::{Debug, Display},
fs,
io::ErrorKind,
net::SocketAddr,
path::{Path, PathBuf},
pin::Pin,
sync::Arc,
task::{Context, Poll},
time::Duration,
};
use futures::{Sink, Stream};
use quinn::{
ClientConfig, ConnectError, Connection, ConnectionError, Endpoint, RecvStream, SendStream,
TransportConfig, WriteError, crypto::rustls::QuicClientConfig, rustls,
};
use rustls::{
RootCertStore,
pki_types::{CertificateDer, PrivateKeyDer},
};
use rustls_pki_types::{PrivatePkcs1KeyDer, PrivatePkcs8KeyDer, pem::SliceIter};
use selium_remote_client_protocol::{
ChannelRef, ProcessStartRequest, Request, Response, decode_response, encode_request,
};
use thiserror::Error;
use tokio::net::lookup_host;
use tracing::debug;
type Result<T> = std::result::Result<T, ClientError>;
pub const DEFAULT_DOMAIN: &str = "localhost";
pub const DEFAULT_PORT: u16 = 7000;
pub const DEFAULT_RESPONSE_LIMIT: usize = 8 * 1024;
#[derive(Debug, Error)]
pub enum ClientError {
#[error("failed to parse certificate: {0}")]
Certificate(String),
#[error("failed to build TLS config: {0}")]
Tls(#[source] rustls::Error),
#[error("failed to resolve {target}: {source}")]
Resolve {
target: String,
#[source]
source: std::io::Error,
},
#[error("failed to open client endpoint: {0}")]
Endpoint(#[source] std::io::Error),
#[error("failed to connect: {0}")]
Connect(#[source] ConnectError),
#[error("connection failed: {0}")]
Connection(#[source] ConnectionError),
#[error("stream write failed: {0}")]
Write(#[source] WriteError),
#[error("stream finish failed: {0}")]
Finish(String),
#[error("stream read failed: {0}")]
Read(String),
#[error("encode request: {0}")]
Encode(String),
#[error("decode response: {0}")]
Decode(String),
#[error("invalid request: {0}")]
InvalidArgument(&'static str),
#[error("remote error: {0}")]
Remote(String),
#[error("unexpected response from remote client")]
UnexpectedResponse,
}
#[derive(Clone, Debug)]
pub struct ClientConfigBuilder {
domain: String,
port: u16,
response_limit: usize,
cert_dir: PathBuf,
}
#[derive(Clone, Debug)]
pub struct Client {
inner: Arc<ClientInner>,
}
#[derive(Debug)]
struct ClientInner {
endpoint: Endpoint,
server_addr: SocketAddr,
server_name: String,
response_limit: usize,
}
#[derive(Clone, Debug)]
pub struct Channel {
client: Client,
handle: GuestResourceId,
}
#[derive(Clone, Debug)]
pub struct Process {
client: Client,
handle: GuestResourceId,
}
#[derive(Clone, Debug, PartialEq)]
pub struct ProcessBuilder {
module_id: String,
entrypoint: String,
log_uri: Option<String>,
capabilities: Vec<Capability>,
signature: AbiSignature,
args: Vec<EntrypointArg>,
}
struct QuicSession {
connection: Connection,
send: SendStream,
recv: RecvStream,
}
enum PublishState {
Ready(SendStream),
Writing(Pin<Box<dyn std::future::Future<Output = Result<SendStream>> + Send>>),
Closed,
}
pub struct Publisher {
_connection: Connection,
state: PublishState,
}
pub struct Subscriber {
inner: Pin<Box<dyn Stream<Item = Result<Vec<u8>>> + Send>>,
}
impl Default for ClientConfigBuilder {
fn default() -> Self {
Self {
domain: DEFAULT_DOMAIN.to_string(),
port: DEFAULT_PORT,
response_limit: DEFAULT_RESPONSE_LIMIT,
cert_dir: default_cert_dir(),
}
}
}
impl ClientConfigBuilder {
pub fn domain(mut self, domain: impl Into<String>) -> Self {
self.domain = domain.into();
self
}
pub fn port(mut self, port: u16) -> Self {
self.port = port;
self
}
pub fn response_limit(mut self, limit: usize) -> Self {
self.response_limit = limit.max(1);
self
}
pub fn certificate_directory(mut self, dir: impl Into<PathBuf>) -> Self {
self.cert_dir = dir.into();
self
}
pub async fn connect(self) -> Result<Client> {
Client::connect_with(self).await
}
}
impl Client {
pub async fn connect() -> Result<Self> {
Client::connect_with(ClientConfigBuilder::default()).await
}
async fn connect_with(config: ClientConfigBuilder) -> Result<Self> {
let server_addr = resolve_socket(&config.domain, config.port).await?;
let bind_addr = match server_addr {
SocketAddr::V4(_) => SocketAddr::from(([0, 0, 0, 0], 0)),
SocketAddr::V6(_) => SocketAddr::from(([0, 0, 0, 0, 0, 0, 0, 0], 0)),
};
let mut endpoint = Endpoint::client(bind_addr).map_err(ClientError::Endpoint)?;
endpoint.set_default_client_config(build_client_config(&config.cert_dir)?);
Ok(Self {
inner: Arc::new(ClientInner {
endpoint,
server_addr,
server_name: config.domain,
response_limit: config.response_limit,
}),
})
}
async fn open_session(&self) -> Result<QuicSession> {
let connecting = self
.inner
.endpoint
.connect(self.inner.server_addr, &self.inner.server_name)
.map_err(ClientError::Connect)?;
let connection = connecting.await.map_err(ClientError::Connection)?;
let (send, recv) = connection
.open_bi()
.await
.map_err(ClientError::Connection)?;
Ok(QuicSession {
connection,
send,
recv,
})
}
async fn request(&self, request: Request) -> Result<Response> {
let mut session = self.open_session().await?;
let payload =
encode_request(&request).map_err(|err| ClientError::Encode(err.to_string()))?;
debug!("sending request of {} bytes", payload.len());
session
.send
.write_all(&payload)
.await
.map_err(ClientError::Write)?;
session
.send
.finish()
.map_err(|_| ClientError::Finish("stream closed".to_string()))?;
debug!("request sent, awaiting response");
let mut buffer = VecDeque::new();
let response =
read_response(&mut session.recv, self.inner.response_limit, &mut buffer).await?;
match response {
Response::Error(message) => Err(ClientError::Remote(message)),
other => Ok(other),
}
}
}
impl Channel {
pub async fn create(client: &Client, capacity: u32) -> Result<Self> {
let response = client.request(Request::ChannelCreate(capacity)).await?;
match response {
Response::ChannelCreate(handle) => Ok(Self::new(client, handle)),
Response::Error(msg) => Err(ClientError::Remote(msg)),
_ => Err(ClientError::UnexpectedResponse),
}
}
pub fn new(client: &Client, handle: GuestResourceId) -> Self {
Self {
client: client.clone(),
handle,
}
}
pub async fn delete(self) -> Result<()> {
let response = self
.client
.request(Request::ChannelDelete(self.handle))
.await?;
match response {
Response::Ok => Ok(()),
Response::Error(msg) => Err(ClientError::Remote(msg)),
_ => Err(ClientError::UnexpectedResponse),
}
}
pub async fn subscribe(&self, chunk_size: u32) -> Result<Subscriber> {
self.subscribe_inner(ChannelRef::Strong(self.handle), chunk_size)
.await
}
pub async fn subscribe_shared(&self, chunk_size: u32) -> Result<Subscriber> {
self.subscribe_inner(ChannelRef::Shared(self.handle), chunk_size)
.await
}
async fn subscribe_inner(&self, target: ChannelRef, chunk_size: u32) -> Result<Subscriber> {
let mut session = self.client.open_session().await?;
let chunk_size = GuestUint::try_from(chunk_size)
.map_err(|_| ClientError::InvalidArgument("chunk size exceeds u32::MAX"))?;
let payload = encode_request(&Request::Subscribe(target, chunk_size))
.map_err(|err| ClientError::Encode(err.to_string()))?;
session
.send
.write_all(&payload)
.await
.map_err(ClientError::Write)?;
session
.send
.finish()
.map_err(|_| ClientError::Finish("stream closed".to_string()))?;
let max_frame = usize::try_from(chunk_size)
.map_err(|_| ClientError::InvalidArgument("chunk size exceeds usize::MAX"))?;
let connection = session.connection.clone();
let stream = futures::stream::unfold(
(session.recv, VecDeque::new(), max_frame, connection),
move |(mut recv, mut buffer, max_frame, connection)| async move {
match read_subscribed_frame(&mut recv, &mut buffer, max_frame).await {
Ok(Some(frame)) => Some((Ok(frame), (recv, buffer, max_frame, connection))),
Ok(None) => None,
Err(err) => Some((Err(err), (recv, buffer, max_frame, connection))),
}
},
);
Ok(Subscriber::new(stream))
}
pub async fn publish(&self) -> Result<Publisher> {
let mut session = self.client.open_session().await?;
let payload = encode_request(&Request::Publish(self.handle))
.map_err(|err| ClientError::Encode(err.to_string()))?;
session
.send
.write_all(&payload)
.await
.map_err(ClientError::Write)?;
let mut buffer = VecDeque::new();
read_response_once(
&mut session.recv,
self.client.inner.response_limit,
&mut buffer,
)
.await?;
Ok(Publisher {
_connection: session.connection,
state: PublishState::Ready(session.send),
})
}
pub fn handle(&self) -> GuestResourceId {
self.handle
}
}
impl ProcessBuilder {
pub fn new(module_id: impl Into<String>, entrypoint: impl Into<String>) -> Self {
Self {
module_id: module_id.into(),
entrypoint: entrypoint.into(),
log_uri: None,
capabilities: vec![Capability::ChannelLifecycle, Capability::ChannelWriter],
signature: AbiSignature::new(Vec::new(), Vec::new()),
args: Vec::new(),
}
}
pub fn capability(mut self, capability: Capability) -> Self {
if !self.capabilities.contains(&capability) {
self.capabilities.push(capability);
}
self
}
pub fn signature(mut self, signature: AbiSignature) -> Self {
self.signature = signature;
self
}
pub fn log_uri(mut self, value: impl Into<String>) -> Self {
self.log_uri = Some(value.into());
self
}
pub fn arg_scalar(mut self, value: AbiScalarValue) -> Self {
self.args.push(EntrypointArg::Scalar(value));
self
}
pub fn arg_utf8(self, value: impl Into<String>) -> Self {
self.arg_buffer(value.into().into_bytes())
}
pub fn arg_buffer(mut self, value: impl Into<Vec<u8>>) -> Self {
self.args.push(EntrypointArg::Buffer(value.into()));
self
}
pub fn arg_resource(mut self, handle: impl Into<GuestResourceId>) -> Self {
self.args.push(EntrypointArg::Resource(handle.into()));
self
}
fn build_request(self) -> Result<ProcessStartRequest> {
validate_entrypoint_args(&self.signature, &self.args)?;
Ok(ProcessStartRequest {
module_id: self.module_id,
entrypoint: self.entrypoint,
log_uri: self.log_uri,
capabilities: self.capabilities,
signature: self.signature,
args: self.args,
})
}
}
fn validate_entrypoint_args(signature: &AbiSignature, args: &[EntrypointArg]) -> Result<()> {
if signature.params().len() != args.len() {
return Err(ClientError::InvalidArgument(
"arguments do not satisfy the signature",
));
}
for (param, arg) in signature.params().iter().zip(args.iter()) {
match (param, arg) {
(AbiParam::Scalar(expected), EntrypointArg::Scalar(actual))
if actual.kind() == *expected => {}
(AbiParam::Scalar(AbiScalarType::I32), EntrypointArg::Resource(_)) => {}
(AbiParam::Scalar(AbiScalarType::U64), EntrypointArg::Resource(_)) => {}
(AbiParam::Buffer, EntrypointArg::Buffer(_)) => {}
_ => {
return Err(ClientError::InvalidArgument(
"arguments do not satisfy the signature",
));
}
}
}
Ok(())
}
impl Process {
pub fn new(client: &Client, handle: GuestResourceId) -> Self {
Self {
client: client.clone(),
handle,
}
}
pub async fn start(client: &Client, builder: ProcessBuilder) -> Result<Self> {
let request = builder.build_request()?;
let response = client.request(Request::ProcessStart(request)).await?;
match response {
Response::ProcessStart(handle) => Ok(Self::new(client, handle)),
Response::Error(msg) => Err(ClientError::Remote(msg)),
_ => Err(ClientError::UnexpectedResponse),
}
}
pub fn handle(&self) -> GuestResourceId {
self.handle
}
pub async fn log_channel(&self) -> Result<Channel> {
let response = self
.client
.request(Request::ProcessLogChannel(self.handle))
.await?;
match response {
Response::ProcessLogChannel(handle) => Ok(Channel::new(&self.client, handle)),
Response::Error(msg) => Err(ClientError::Remote(msg)),
_ => Err(ClientError::UnexpectedResponse),
}
}
pub async fn stop(self) -> Result<()> {
let response = self
.client
.request(Request::ProcessStop(self.handle))
.await?;
match response {
Response::Ok => Ok(()),
Response::Error(msg) => Err(ClientError::Remote(msg)),
_ => Err(ClientError::UnexpectedResponse),
}
}
}
impl Display for Process {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "Process({})", self.handle)
}
}
impl Subscriber {
fn new(stream: impl Stream<Item = Result<Vec<u8>>> + Send + 'static) -> Self {
Self {
inner: Box::pin(stream),
}
}
}
impl Stream for Subscriber {
type Item = Result<Vec<u8>>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.get_mut();
this.inner.as_mut().poll_next(cx)
}
}
impl Sink<Vec<u8>> for Publisher {
type Error = ClientError;
fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
let this = self.get_mut();
match &mut this.state {
PublishState::Ready(_) => Poll::Ready(Ok(())),
PublishState::Writing(fut) => match fut.as_mut().poll(cx) {
Poll::Ready(Ok(stream)) => {
this.state = PublishState::Ready(stream);
Poll::Ready(Ok(()))
}
Poll::Ready(Err(err)) => {
this.state = PublishState::Closed;
Poll::Ready(Err(err))
}
Poll::Pending => Poll::Pending,
},
PublishState::Closed => Poll::Ready(Err(ClientError::UnexpectedResponse)),
}
}
fn start_send(self: Pin<&mut Self>, item: Vec<u8>) -> Result<()> {
let this = self.get_mut();
match std::mem::replace(&mut this.state, PublishState::Closed) {
PublishState::Ready(stream) => {
this.state = PublishState::Writing(Box::pin(write_once(stream, item)));
Ok(())
}
other => {
this.state = other;
Err(ClientError::InvalidArgument("publisher not ready"))
}
}
}
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
self.poll_ready(cx)
}
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
let this = self.get_mut();
loop {
match std::mem::replace(&mut this.state, PublishState::Closed) {
PublishState::Ready(mut stream) => {
stream
.finish()
.map_err(|_| ClientError::Finish("stream closed".to_string()))?;
this.state = PublishState::Closed;
return Poll::Ready(Ok(()));
}
PublishState::Writing(mut fut) => match fut.as_mut().poll(cx) {
Poll::Ready(Ok(stream)) => {
this.state = PublishState::Ready(stream);
}
Poll::Ready(Err(err)) => {
this.state = PublishState::Closed;
return Poll::Ready(Err(err));
}
Poll::Pending => {
this.state = PublishState::Writing(fut);
return Poll::Pending;
}
},
PublishState::Closed => return Poll::Ready(Ok(())),
}
}
}
}
async fn read_subscribed_frame(
recv: &mut RecvStream,
buffer: &mut VecDeque<u8>,
max_frame: usize,
) -> Result<Option<Vec<u8>>> {
loop {
if let Some(frame) = try_parse_frame(
buffer,
max_frame,
"frame length exceeds subscribed chunk size",
)? {
return Ok(Some(frame));
}
let mut chunk = vec![0u8; max_frame.max(4)];
match recv.read(&mut chunk).await {
Ok(Some(len)) if len > 0 => {
chunk.truncate(len);
buffer.extend(chunk);
}
Ok(Some(_)) => {}
Ok(None) => {
if buffer.is_empty() {
return Ok(None);
}
return Err(ClientError::Decode(
"stream ended with a partial frame".to_string(),
));
}
Err(err) => return Err(ClientError::Read(err.to_string())),
}
}
}
fn try_parse_frame(
buffer: &mut VecDeque<u8>,
max_frame: usize,
limit_error: &'static str,
) -> Result<Option<Vec<u8>>> {
const LENGTH_PREFIX: usize = 4;
if buffer.len() < LENGTH_PREFIX {
return Ok(None);
}
let mut len_bytes = [0u8; LENGTH_PREFIX];
for (dst, src) in len_bytes.iter_mut().zip(buffer.iter().take(LENGTH_PREFIX)) {
*dst = *src;
}
let frame_len = u32::from_le_bytes(len_bytes) as usize;
if frame_len > max_frame {
return Err(ClientError::InvalidArgument(limit_error));
}
if buffer.len() < LENGTH_PREFIX + frame_len {
return Ok(None);
}
buffer.drain(..LENGTH_PREFIX);
let payload = buffer.drain(..frame_len).collect();
Ok(Some(payload))
}
async fn read_response_frame(
recv: &mut RecvStream,
limit: usize,
buffer: &mut VecDeque<u8>,
) -> Result<Vec<u8>> {
let max_buffer = limit.saturating_add(4);
loop {
if let Some(frame) = try_parse_frame(
buffer,
limit,
"response frame length exceeds response limit",
)? {
return Ok(frame);
}
let mut chunk = vec![0u8; max_buffer.saturating_sub(buffer.len()).max(4)];
match recv.read(&mut chunk).await {
Ok(Some(len)) if len > 0 => {
debug!("read {len} bytes from response stream");
chunk.truncate(len);
buffer.extend(chunk);
if buffer.len() > max_buffer {
return Err(ClientError::Decode(
"response exceeded maximum size".to_string(),
));
}
}
Ok(Some(_)) => {}
Ok(None) => {
return Err(ClientError::Decode(
"response stream ended before frame".to_string(),
));
}
Err(err) => return Err(ClientError::Read(err.to_string())),
}
}
}
async fn read_response(
recv: &mut RecvStream,
limit: usize,
buffer: &mut VecDeque<u8>,
) -> Result<Response> {
let payload = read_response_frame(recv, limit, buffer).await?;
decode_response(&payload).map_err(|err| ClientError::Decode(err.to_string()))
}
async fn read_response_once(
recv: &mut RecvStream,
limit: usize,
buffer: &mut VecDeque<u8>,
) -> Result<()> {
let response = read_response(recv, limit, buffer).await?;
match response {
Response::Ok => Ok(()),
Response::Error(msg) => Err(ClientError::Remote(msg)),
_ => Err(ClientError::UnexpectedResponse),
}
}
async fn resolve_socket(domain: &str, port: u16) -> Result<SocketAddr> {
let target = format!("{domain}:{port}");
let addrs: Vec<SocketAddr> = lookup_host(&target)
.await
.map_err(|source| ClientError::Resolve {
target: target.clone(),
source,
})?
.collect();
addrs
.iter()
.copied()
.find(SocketAddr::is_ipv4)
.or_else(|| addrs.into_iter().next())
.ok_or_else(|| ClientError::Resolve {
target,
source: std::io::Error::new(
ErrorKind::AddrNotAvailable,
"no socket addresses returned",
),
})
}
fn build_client_config(cert_dir: &Path) -> Result<ClientConfig> {
let provider = rustls::crypto::ring::default_provider();
let tls_builder = rustls::ClientConfig::builder_with_provider(provider.into())
.with_protocol_versions(&[&rustls::version::TLS13])
.map_err(ClientError::Tls)?;
let roots = build_root_store(cert_dir)?;
let client_cert = read_certificate(cert_dir, "client.crt")?;
let client_key = read_certificate(cert_dir, "client.key")?;
let cert_chain = parse_certificates(&client_cert)?;
let key = parse_private_key(&client_key)?;
let tls_config = tls_builder
.with_root_certificates(roots)
.with_client_auth_cert(cert_chain, key)
.map_err(ClientError::Tls)?;
let crypto = QuicClientConfig::try_from(Arc::new(tls_config))
.map_err(|_| ClientError::Certificate("failed to select QUIC cipher suite".to_string()))?;
let mut client_config = ClientConfig::new(Arc::new(crypto));
let mut transport = TransportConfig::default();
transport.keep_alive_interval(Some(Duration::from_secs(10)));
client_config.transport_config(Arc::new(transport));
Ok(client_config)
}
fn build_root_store(cert_dir: &Path) -> Result<RootCertStore> {
let mut store = RootCertStore::empty();
let ca_cert = read_certificate(cert_dir, "ca.crt")?;
for cert in parse_certificates(&ca_cert)? {
store
.add(cert)
.map_err(|_| ClientError::Certificate("add CA certificate".to_string()))?;
}
Ok(store)
}
fn read_certificate(cert_dir: &Path, name: &str) -> Result<Vec<u8>> {
let path = cert_dir.join(name);
fs::read(&path)
.map_err(|err| ClientError::Certificate(format!("read {}: {}", path.display(), err)))
}
fn default_cert_dir() -> PathBuf {
PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("../../../certs")
}
fn parse_certificates(bytes: &[u8]) -> Result<Vec<CertificateDer<'static>>> {
let parsed: std::result::Result<Vec<_>, _> = SliceIter::new(bytes).collect();
let parsed = parsed.map_err(|err| ClientError::Certificate(err.to_string()))?;
if !parsed.is_empty() {
return Ok(parsed);
}
Ok(vec![CertificateDer::from(bytes.to_vec())])
}
fn parse_private_key(bytes: &[u8]) -> Result<PrivateKeyDer<'static>> {
let pkcs8_result: std::result::Result<Vec<PrivatePkcs8KeyDer>, _> =
SliceIter::new(bytes).collect();
let pkcs8 = pkcs8_result.map_err(|err| ClientError::Certificate(err.to_string()))?;
if let Some(key) = pkcs8.into_iter().next() {
return Ok(key.into());
}
let rsa_result: std::result::Result<Vec<PrivatePkcs1KeyDer>, _> =
SliceIter::new(bytes).collect();
let rsa = rsa_result.map_err(|err| ClientError::Certificate(err.to_string()))?;
if let Some(key) = rsa.into_iter().next() {
return Ok(key.into());
}
PrivateKeyDer::try_from(bytes.to_vec()).map_err(|_| {
ClientError::Certificate("no usable private key found in client key material".to_string())
})
}
async fn write_once(mut stream: SendStream, payload: Vec<u8>) -> Result<SendStream> {
if !payload.is_empty() {
stream
.write_all(&payload)
.await
.map_err(ClientError::Write)?;
}
Ok(stream)
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn process_builder_validates_signature() {
let signature = AbiSignature::new(Vec::new(), Vec::new());
let builder = ProcessBuilder::new("module", "entry")
.signature(signature.clone())
.arg_resource(7u64);
let result = builder.build_request();
assert!(matches!(result, Err(ClientError::InvalidArgument(_))));
}
#[test]
fn request_round_trips() {
let req = Request::ChannelCreate(64 * 1024);
let encoded = encode_request(&req).expect("encode");
let decoded = selium_remote_client_protocol::decode_request(&encoded).expect("decode");
match decoded {
Request::ChannelCreate(capacity) => assert_eq!(capacity, 64 * 1024),
other => panic!("unexpected variant: {other:?}"),
}
}
#[test]
fn try_parse_frame_requires_complete_prefix() {
let mut buffer = VecDeque::from([0u8, 1u8, 2u8]);
let frame = try_parse_frame(&mut buffer, 8, "frame length exceeds subscribed chunk size")
.expect("parse result");
assert!(frame.is_none());
assert_eq!(buffer.len(), 3);
}
#[test]
fn try_parse_frame_extracts_payload() {
let mut buffer = VecDeque::new();
buffer.extend([3u8, 0, 0, 0]); buffer.extend([1u8, 2, 3]);
let frame = try_parse_frame(&mut buffer, 8, "frame length exceeds subscribed chunk size")
.expect("parse result")
.expect("frame available");
assert_eq!(frame, vec![1u8, 2, 3]);
assert!(buffer.is_empty());
}
#[test]
fn try_parse_frame_rejects_oversized_frame() {
let mut buffer = VecDeque::from([9u8, 0, 0, 0, 1, 2, 3]);
let result = try_parse_frame(&mut buffer, 4, "frame length exceeds subscribed chunk size");
assert!(matches!(result, Err(ClientError::InvalidArgument(_))));
}
}