use std::convert::Infallible;
use std::fmt;
use std::pin::Pin;
use std::task::{Context, Poll};
use crate::bytes::{Buf, Bytes, BytesCursor};
use crate::stream::Stream;
use crate::util::{DetBuildHasher, DetHashMap};
#[derive(Debug)]
pub enum Frame<T> {
Data(T),
Trailers(HeaderMap),
}
impl<T> Frame<T> {
#[must_use]
pub fn data(data: T) -> Self {
Self::Data(data)
}
#[must_use]
pub fn trailers(trailers: HeaderMap) -> Self {
Self::Trailers(trailers)
}
#[must_use]
pub fn is_data(&self) -> bool {
matches!(self, Self::Data(_))
}
#[must_use]
pub fn is_trailers(&self) -> bool {
matches!(self, Self::Trailers(_))
}
pub fn into_data(self) -> Option<T> {
match self {
Self::Data(data) => Some(data),
Self::Trailers(_) => None,
}
}
pub fn into_trailers(self) -> Option<HeaderMap> {
match self {
Self::Data(_) => None,
Self::Trailers(trailers) => Some(trailers),
}
}
#[must_use]
pub fn data_ref(&self) -> Option<&T> {
match self {
Self::Data(data) => Some(data),
Self::Trailers(_) => None,
}
}
pub fn data_mut(&mut self) -> Option<&mut T> {
match self {
Self::Data(data) => Some(data),
Self::Trailers(_) => None,
}
}
pub fn map_data<U, F>(self, f: F) -> Frame<U>
where
F: FnOnce(T) -> U,
{
match self {
Self::Data(data) => Frame::Data(f(data)),
Self::Trailers(trailers) => Frame::Trailers(trailers),
}
}
}
#[derive(Debug, Clone, Default)]
pub struct HeaderMap {
headers: Vec<(HeaderName, HeaderValue)>,
positions: DetHashMap<HeaderName, Vec<usize>>,
}
impl HeaderMap {
#[must_use]
pub fn new() -> Self {
Self {
headers: Vec::new(),
positions: DetHashMap::default(),
}
}
#[must_use]
pub fn with_capacity(capacity: usize) -> Self {
Self {
headers: Vec::with_capacity(capacity),
positions: DetHashMap::with_capacity_and_hasher(capacity, DetBuildHasher),
}
}
pub fn insert(&mut self, name: HeaderName, value: HeaderValue) {
if let Some(indices) = self.positions.remove(&name) {
for index in indices.into_iter().rev() {
self.headers.remove(index);
}
self.rebuild_positions();
}
self.append(name, value);
}
pub fn append(&mut self, name: HeaderName, value: HeaderValue) {
let index = self.headers.len();
self.positions.entry(name.clone()).or_default().push(index);
self.headers.push((name, value));
}
#[must_use]
pub fn get(&self, name: &HeaderName) -> Option<&HeaderValue> {
self.positions
.get(name)
.and_then(|indices| indices.first())
.and_then(|index| self.headers.get(*index))
.map(|(_, value)| value)
}
pub fn iter(&self) -> impl Iterator<Item = (&HeaderName, &HeaderValue)> {
self.headers.iter().map(|(n, v)| (n, v))
}
#[must_use]
pub fn len(&self) -> usize {
self.headers.len()
}
#[must_use]
pub fn is_empty(&self) -> bool {
self.headers.is_empty()
}
fn rebuild_positions(&mut self) {
self.positions.clear();
for (index, (name, _)) in self.headers.iter().enumerate() {
self.positions.entry(name.clone()).or_default().push(index);
}
}
}
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct HeaderName(String);
impl HeaderName {
#[must_use]
pub fn from_static(name: &'static str) -> Self {
Self(name.to_lowercase())
}
#[must_use]
pub fn from_string(name: &str) -> Self {
Self(name.to_lowercase())
}
#[must_use]
pub fn as_str(&self) -> &str {
&self.0
}
}
impl fmt::Display for HeaderName {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}", self.0)
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct HeaderValue(Vec<u8>);
impl HeaderValue {
#[must_use]
pub fn from_bytes(bytes: &[u8]) -> Self {
Self(bytes.to_vec())
}
#[must_use]
pub fn from_static(s: &'static str) -> Self {
Self(s.as_bytes().to_vec())
}
#[must_use]
pub fn from_string(s: String) -> Self {
Self(s.into_bytes())
}
#[must_use]
pub fn as_bytes(&self) -> &[u8] {
&self.0
}
pub fn to_str(&self) -> Result<&str, std::str::Utf8Error> {
std::str::from_utf8(&self.0)
}
}
impl fmt::Display for HeaderValue {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self.to_str() {
Ok(s) => write!(f, "{s}"),
Err(_) => write!(f, "{:?}", self.0),
}
}
}
#[derive(Debug, Clone, Copy, Default)]
pub struct SizeHint {
lower: u64,
upper: Option<u64>,
}
impl SizeHint {
#[must_use]
pub fn new() -> Self {
Self::default()
}
#[must_use]
pub fn with_exact(size: u64) -> Self {
Self {
lower: size,
upper: Some(size),
}
}
#[must_use]
pub fn lower(&self) -> u64 {
self.lower
}
#[must_use]
pub fn upper(&self) -> Option<u64> {
self.upper
}
#[must_use]
pub fn exact(&self) -> Option<u64> {
match self.upper {
Some(upper) if upper == self.lower => Some(upper),
_ => None,
}
}
pub fn set_lower(&mut self, lower: u64) {
self.lower = lower;
}
pub fn set_upper(&mut self, upper: u64) {
self.upper = Some(upper);
}
}
#[allow(clippy::type_complexity)]
pub trait Body {
type Data: Buf;
type Error;
fn poll_frame(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>>;
fn is_end_stream(&self) -> bool {
false
}
fn size_hint(&self) -> SizeHint {
SizeHint::default()
}
}
impl<B: Body + ?Sized> Body for Pin<Box<B>> {
type Data = B::Data;
type Error = B::Error;
fn poll_frame(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
self.get_mut().as_mut().poll_frame(cx)
}
fn is_end_stream(&self) -> bool {
self.as_ref().is_end_stream()
}
fn size_hint(&self) -> SizeHint {
self.as_ref().size_hint()
}
}
impl<B: Body + Unpin + ?Sized> Body for &mut B {
type Data = B::Data;
type Error = B::Error;
fn poll_frame(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
Pin::new(&mut **self).poll_frame(cx)
}
fn is_end_stream(&self) -> bool {
(**self).is_end_stream()
}
fn size_hint(&self) -> SizeHint {
(**self).size_hint()
}
}
impl<B: Body + Unpin + ?Sized> Body for Box<B> {
type Data = B::Data;
type Error = B::Error;
fn poll_frame(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
Pin::new(&mut **self).poll_frame(cx)
}
fn is_end_stream(&self) -> bool {
(**self).is_end_stream()
}
fn size_hint(&self) -> SizeHint {
(**self).size_hint()
}
}
#[derive(Debug, Clone, Copy, Default)]
pub struct Empty;
impl Empty {
#[must_use]
pub fn new() -> Self {
Self
}
}
impl Body for Empty {
type Data = BytesCursor;
type Error = Infallible;
fn poll_frame(
self: Pin<&mut Self>,
_cx: &mut Context<'_>,
) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
Poll::Ready(None)
}
fn is_end_stream(&self) -> bool {
true
}
fn size_hint(&self) -> SizeHint {
SizeHint::with_exact(0)
}
}
#[derive(Debug, Clone)]
pub struct Full<D> {
data: Option<D>,
}
impl<D> Full<D> {
#[must_use]
pub fn new(data: D) -> Self {
Self { data: Some(data) }
}
}
impl<D: Buf + Unpin> Body for Full<D> {
type Data = D;
type Error = Infallible;
fn poll_frame(
self: Pin<&mut Self>,
_cx: &mut Context<'_>,
) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
let this = self.get_mut();
match this.data.take() {
Some(data) if data.remaining() > 0 => Poll::Ready(Some(Ok(Frame::Data(data)))),
_ => Poll::Ready(None),
}
}
fn is_end_stream(&self) -> bool {
self.data.as_ref().is_none_or(|d| d.remaining() == 0)
}
fn size_hint(&self) -> SizeHint {
self.data.as_ref().map_or_else(
|| SizeHint::with_exact(0),
|data| SizeHint::with_exact(data.remaining() as u64),
)
}
}
impl<D> From<D> for Full<D>
where
D: Buf,
{
fn from(data: D) -> Self {
Self::new(data)
}
}
impl From<&'static str> for Full<BytesCursor> {
fn from(s: &'static str) -> Self {
Self::new(BytesCursor::new(Bytes::from_static(s.as_bytes())))
}
}
impl From<String> for Full<BytesCursor> {
fn from(s: String) -> Self {
Self::new(BytesCursor::new(Bytes::from(s.into_bytes())))
}
}
impl From<Vec<u8>> for Full<BytesCursor> {
fn from(v: Vec<u8>) -> Self {
Self::new(BytesCursor::new(Bytes::from(v)))
}
}
#[derive(Debug)]
pub struct StreamBody<S> {
stream: S,
done: bool,
}
impl<S> StreamBody<S> {
#[must_use]
pub fn new(stream: S) -> Self {
Self {
stream,
done: false,
}
}
#[must_use]
pub fn into_inner(self) -> S {
self.stream
}
}
impl<S, D, E> Body for StreamBody<S>
where
S: Stream<Item = Result<Frame<D>, E>> + Unpin,
D: Buf,
{
type Data = D;
type Error = E;
fn poll_frame(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
if self.done {
return Poll::Ready(None);
}
match Pin::new(&mut self.stream).poll_next(cx) {
Poll::Ready(None) => {
self.done = true;
Poll::Ready(None)
}
other => other,
}
}
fn is_end_stream(&self) -> bool {
self.done || matches!(self.stream.size_hint(), (0, Some(0)))
}
fn size_hint(&self) -> SizeHint {
if self.done || matches!(self.stream.size_hint(), (0, Some(0))) {
SizeHint::with_exact(0)
} else {
SizeHint::default()
}
}
}
#[derive(Debug)]
pub struct Collected<B: Body> {
_inner: B,
data: Vec<u8>,
trailers: Option<HeaderMap>,
_done: bool,
}
impl<B: Body> Collected<B>
where
B::Data: Buf,
{
pub fn new(inner: B) -> Self {
Self {
_inner: inner,
data: Vec::new(),
trailers: None,
_done: false,
}
}
#[must_use]
pub fn data(&self) -> &[u8] {
&self.data
}
#[must_use]
pub fn trailers(&self) -> Option<&HeaderMap> {
self.trailers.as_ref()
}
#[must_use]
pub fn into_data(self) -> Vec<u8> {
self.data
}
}
#[derive(Debug)]
pub struct Limited<B> {
inner: B,
remaining: u64,
state: LimitedState,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum LimitedState {
Open,
Completed,
Failed,
}
impl<B> Limited<B> {
pub fn new(inner: B, limit: u64) -> Self {
Self {
inner,
remaining: limit,
state: LimitedState::Open,
}
}
}
#[derive(Debug, Clone, Copy)]
pub struct LengthLimitError;
impl fmt::Display for LengthLimitError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "body length limit exceeded")
}
}
impl std::error::Error for LengthLimitError {}
impl<B: Body + Unpin> Body for Limited<B>
where
B::Data: Buf,
{
type Data = B::Data;
type Error = LimitedError<B::Error>;
fn poll_frame(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
let this = &mut *self;
match this.state {
LimitedState::Completed => return Poll::Ready(None),
LimitedState::Failed => {
return Poll::Ready(Some(Err(LimitedError::PolledAfterCompletion)));
}
LimitedState::Open => {}
}
match Pin::new(&mut this.inner).poll_frame(cx) {
Poll::Ready(Some(Ok(frame))) => {
if let Some(data) = frame.data_ref() {
let len = data.remaining() as u64;
if len > this.remaining {
this.state = LimitedState::Failed;
return Poll::Ready(Some(Err(LimitedError::LengthLimit)));
}
this.remaining -= len;
}
Poll::Ready(Some(Ok(frame)))
}
Poll::Ready(Some(Err(e))) => {
this.state = LimitedState::Failed;
Poll::Ready(Some(Err(LimitedError::Inner(e))))
}
Poll::Ready(None) => {
this.state = LimitedState::Completed;
Poll::Ready(None)
}
Poll::Pending => Poll::Pending,
}
}
fn is_end_stream(&self) -> bool {
self.state != LimitedState::Open || self.inner.is_end_stream()
}
fn size_hint(&self) -> SizeHint {
if self.state != LimitedState::Open {
return SizeHint::with_exact(0);
}
let inner = self.inner.size_hint();
let mut hint = SizeHint::new();
hint.set_lower(inner.lower().min(self.remaining));
if let Some(upper) = inner.upper() {
hint.set_upper(upper.min(self.remaining));
} else {
hint.set_upper(self.remaining);
}
hint
}
}
#[derive(Debug)]
pub enum LimitedError<E> {
LengthLimit,
PolledAfterCompletion,
Inner(E),
}
impl<E: fmt::Display> fmt::Display for LimitedError<E> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::LengthLimit => write!(f, "body length limit exceeded"),
Self::PolledAfterCompletion => write!(f, "limited body polled after completion"),
Self::Inner(e) => write!(f, "{e}"),
}
}
}
impl<E: std::error::Error + 'static> std::error::Error for LimitedError<E> {
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
match self {
Self::LengthLimit | Self::PolledAfterCompletion => None,
Self::Inner(e) => Some(e),
}
}
}
pub type BoxBody<D, E> = Pin<Box<dyn Body<Data = D, Error = E> + Send + 'static>>;
pub fn boxed<B>(body: B) -> BoxBody<B::Data, B::Error>
where
B: Body + Send + 'static,
{
Box::pin(body)
}
#[cfg(test)]
mod tests {
use super::*;
use crate::conformance::{ConformanceTarget, LabRuntimeTarget, TestConfig};
use crate::runtime::yield_now::yield_now;
use crate::stream;
use crate::types::Budget;
use serde_json::Value;
use std::sync::Arc;
use std::sync::Mutex as StdMutex;
use std::sync::atomic::{AtomicBool, Ordering};
use std::task::Waker;
fn noop_waker() -> std::task::Waker {
std::task::Waker::noop().clone()
}
#[allow(clippy::type_complexity)]
fn poll_body<B: Body + Unpin>(body: &mut B) -> Poll<Option<Result<Frame<B::Data>, B::Error>>> {
let waker = noop_waker();
let mut cx = Context::from_waker(&waker);
Pin::new(body).poll_frame(&mut cx)
}
#[test]
fn empty_body_is_end_stream() {
let body = Empty::new();
assert!(body.is_end_stream());
assert_eq!(body.size_hint().exact(), Some(0));
}
#[test]
fn empty_body_returns_none() {
let mut body = Empty::new();
assert!(matches!(poll_body(&mut body), Poll::Ready(None)));
}
#[test]
fn full_body_returns_data_then_none() {
let cursor = BytesCursor::new(Bytes::from_static(b"hello"));
let mut body = Full::new(cursor);
assert!(!body.is_end_stream());
assert_eq!(body.size_hint().exact(), Some(5));
let Poll::Ready(Some(Ok(frame))) = poll_body(&mut body) else {
panic!("expected data frame") };
let data = frame.into_data().expect("expected data frame");
assert_eq!(data.chunk(), b"hello");
assert!(body.is_end_stream());
assert!(matches!(poll_body(&mut body), Poll::Ready(None)));
}
#[test]
fn full_body_from_string() {
let cursor = BytesCursor::new(Bytes::from_static(b"hello world"));
let body = Full::new(cursor);
assert_eq!(body.size_hint().exact(), Some(11));
}
#[test]
fn full_body_from_vec() {
let cursor = BytesCursor::new(Bytes::from(vec![1_u8, 2, 3, 4, 5]));
let body = Full::new(cursor);
assert_eq!(body.size_hint().exact(), Some(5));
}
#[test]
fn size_hint_exact() {
let hint = SizeHint::with_exact(42);
assert_eq!(hint.lower(), 42);
assert_eq!(hint.upper(), Some(42));
assert_eq!(hint.exact(), Some(42));
}
#[test]
fn size_hint_default() {
let hint = SizeHint::default();
assert_eq!(hint.lower(), 0);
assert_eq!(hint.upper(), None);
assert_eq!(hint.exact(), None);
}
#[test]
fn frame_data_methods() {
let frame: Frame<Bytes> = Frame::data(Bytes::from_static(b"test"));
assert!(frame.is_data());
assert!(!frame.is_trailers());
assert_eq!(frame.data_ref().unwrap().as_ref(), b"test");
}
#[test]
fn frame_trailers_methods() {
let mut headers = HeaderMap::new();
headers.insert(
HeaderName::from_static("x-checksum"),
HeaderValue::from_static("abc123"),
);
let frame: Frame<Bytes> = Frame::trailers(headers);
assert!(!frame.is_data());
assert!(frame.is_trailers());
}
#[test]
fn header_map_operations() {
let mut headers = HeaderMap::new();
assert!(headers.is_empty());
let name = HeaderName::from_static("content-type");
let value = HeaderValue::from_static("application/json");
headers.insert(name.clone(), value);
assert_eq!(headers.len(), 1);
assert!(!headers.is_empty());
let retrieved = headers.get(&name).expect("header should exist");
assert_eq!(retrieved.as_bytes(), b"application/json");
}
#[test]
fn header_name_lowercase() {
let name = HeaderName::from_static("Content-Type");
assert_eq!(name.as_str(), "content-type");
}
#[test]
fn frame_into_data_some() {
let frame: Frame<Vec<u8>> = Frame::data(vec![1, 2, 3]);
let data = frame.into_data();
assert_eq!(data, Some(vec![1, 2, 3]));
}
#[test]
fn frame_into_data_none_for_trailers() {
let frame: Frame<Vec<u8>> = Frame::trailers(HeaderMap::new());
assert!(frame.into_data().is_none());
}
#[test]
fn frame_into_trailers_some() {
let mut hm = HeaderMap::new();
hm.insert(
HeaderName::from_static("x-foo"),
HeaderValue::from_static("bar"),
);
let frame: Frame<Vec<u8>> = Frame::trailers(hm);
let trailers = frame.into_trailers().expect("should be trailers");
assert_eq!(trailers.len(), 1);
}
#[test]
fn frame_into_trailers_none_for_data() {
let frame: Frame<Vec<u8>> = Frame::data(vec![]);
assert!(frame.into_trailers().is_none());
}
#[test]
fn frame_data_mut() {
let mut frame: Frame<Vec<u8>> = Frame::data(vec![1]);
if let Some(data) = frame.data_mut() {
data.push(2);
}
assert_eq!(frame.data_ref(), Some(&vec![1, 2]));
}
#[test]
fn frame_data_mut_none_for_trailers() {
let mut frame: Frame<Vec<u8>> = Frame::trailers(HeaderMap::new());
assert!(frame.data_mut().is_none());
}
#[test]
fn frame_map_data() {
let frame: Frame<u32> = Frame::data(5);
let mapped = frame.map_data(|n| n * 2);
assert_eq!(mapped.into_data(), Some(10));
}
#[test]
fn frame_map_data_preserves_trailers() {
let frame: Frame<u32> = Frame::trailers(HeaderMap::new());
let mapped = frame.map_data(|n: u32| n * 2);
assert!(mapped.is_trailers());
}
#[test]
fn frame_debug() {
let frame: Frame<u32> = Frame::data(42);
let dbg = format!("{frame:?}");
assert!(dbg.contains("Data"), "{dbg}");
}
#[test]
fn header_map_with_capacity() {
let hm = HeaderMap::with_capacity(10);
assert!(hm.is_empty());
assert_eq!(hm.len(), 0);
}
#[test]
fn header_map_insert_replaces() {
let mut hm = HeaderMap::new();
let name = HeaderName::from_static("x-key");
hm.insert(name.clone(), HeaderValue::from_static("v1"));
hm.insert(name.clone(), HeaderValue::from_static("v2"));
assert_eq!(hm.len(), 1);
assert_eq!(hm.get(&name).unwrap().to_str().unwrap(), "v2");
}
#[test]
fn header_map_append_allows_duplicates() {
let mut hm = HeaderMap::new();
let name = HeaderName::from_static("x-multi");
hm.append(name.clone(), HeaderValue::from_static("a"));
hm.append(name, HeaderValue::from_static("b"));
assert_eq!(hm.len(), 2);
}
#[test]
fn header_map_append_get_returns_first_duplicate() {
let mut hm = HeaderMap::new();
let name = HeaderName::from_static("x-multi");
hm.append(name.clone(), HeaderValue::from_static("a"));
hm.append(name.clone(), HeaderValue::from_static("b"));
assert_eq!(hm.get(&name).unwrap().to_str().unwrap(), "a");
}
#[test]
fn header_map_iter() {
let mut hm = HeaderMap::new();
hm.insert(HeaderName::from_static("a"), HeaderValue::from_static("1"));
hm.insert(HeaderName::from_static("b"), HeaderValue::from_static("2"));
let count = hm.iter().count();
assert_eq!(count, 2);
}
#[test]
fn header_map_get_missing() {
let hm = HeaderMap::new();
let name = HeaderName::from_static("missing");
assert!(hm.get(&name).is_none());
}
#[test]
fn header_map_insert_rebuilds_indices_after_removal() {
let mut hm = HeaderMap::new();
let a = HeaderName::from_static("a");
let b = HeaderName::from_static("b");
let c = HeaderName::from_static("c");
hm.append(a.clone(), HeaderValue::from_static("a1"));
hm.append(b.clone(), HeaderValue::from_static("b1"));
hm.append(a.clone(), HeaderValue::from_static("a2"));
hm.append(c.clone(), HeaderValue::from_static("c1"));
hm.insert(a.clone(), HeaderValue::from_static("a3"));
assert_eq!(hm.len(), 3);
assert_eq!(hm.get(&a).unwrap().to_str().unwrap(), "a3");
assert_eq!(hm.get(&b).unwrap().to_str().unwrap(), "b1");
assert_eq!(hm.get(&c).unwrap().to_str().unwrap(), "c1");
}
#[test]
fn header_map_debug_clone_default() {
let hm = HeaderMap::default();
assert!(hm.is_empty());
let dbg = format!("{hm:?}");
assert!(dbg.contains("HeaderMap"), "{dbg}");
let mut hm2 = hm;
hm2.insert(HeaderName::from_static("x"), HeaderValue::from_static("y"));
assert_eq!(hm2.len(), 1);
}
#[test]
fn header_name_from_string() {
let name = HeaderName::from_string("X-Custom");
assert_eq!(name.as_str(), "x-custom");
}
#[test]
fn header_name_display() {
let name = HeaderName::from_static("content-type");
assert_eq!(format!("{name}"), "content-type");
}
#[test]
fn header_name_eq_hash() {
use std::collections::HashSet;
let a = HeaderName::from_static("x-foo");
let b = HeaderName::from_string("X-Foo");
assert_eq!(a, b);
let mut set = HashSet::new();
set.insert(a);
assert!(set.contains(&b));
}
#[test]
fn header_value_from_bytes() {
let v = HeaderValue::from_bytes(b"hello");
assert_eq!(v.as_bytes(), b"hello");
assert_eq!(v.to_str().unwrap(), "hello");
}
#[test]
fn header_value_from_string() {
let v = HeaderValue::from_string("world".to_string());
assert_eq!(v.as_bytes(), b"world");
}
#[test]
fn header_value_display_utf8() {
let v = HeaderValue::from_static("text/plain");
assert_eq!(format!("{v}"), "text/plain");
}
#[test]
fn header_value_display_non_utf8() {
let v = HeaderValue::from_bytes(&[0xFF, 0xFE]);
let disp = format!("{v}");
assert!(disp.contains("255"), "{disp}");
}
#[test]
fn header_value_eq_clone() {
let a = HeaderValue::from_static("x");
let b = a.clone();
assert_eq!(a, b);
}
#[test]
fn size_hint_set_lower_upper() {
let mut hint = SizeHint::new();
hint.set_lower(10);
hint.set_upper(100);
assert_eq!(hint.lower(), 10);
assert_eq!(hint.upper(), Some(100));
assert_eq!(hint.exact(), None); }
#[test]
fn size_hint_exact_mismatch() {
let mut hint = SizeHint::new();
hint.set_lower(5);
hint.set_upper(10);
assert_eq!(hint.exact(), None);
}
#[test]
fn size_hint_debug_clone_copy() {
let hint = SizeHint::with_exact(42);
let dbg = format!("{hint:?}");
assert!(dbg.contains("SizeHint"), "{dbg}");
let copied = hint; let cloned = hint;
assert_eq!(copied.exact(), cloned.exact());
}
#[test]
fn empty_debug_clone_copy_default() {
let e = Empty::new();
let dbg = format!("{e:?}");
assert!(dbg.contains("Empty"), "{dbg}");
let copied = e; let cloned = e;
let defaulted = Empty;
let _ = (copied, cloned, defaulted);
}
#[test]
fn full_debug_clone() {
let cursor = BytesCursor::new(Bytes::from_static(b"abc"));
let body = Full::new(cursor);
let dbg = format!("{body:?}");
assert!(dbg.contains("Full"), "{dbg}");
let cloned = body;
assert_eq!(cloned.size_hint().exact(), Some(3));
}
#[test]
fn full_from_static_str() {
let body: Full<BytesCursor> = Full::from("hello");
assert_eq!(body.size_hint().exact(), Some(5));
}
#[test]
fn full_from_string_conversion() {
let body: Full<BytesCursor> = Full::from("world".to_string());
assert_eq!(body.size_hint().exact(), Some(5));
}
#[test]
fn full_from_vec() {
let body: Full<BytesCursor> = Full::from(vec![1u8, 2, 3]);
assert_eq!(body.size_hint().exact(), Some(3));
}
#[test]
fn full_empty_data_is_end_stream() {
let cursor = BytesCursor::new(Bytes::from_static(b""));
let body = Full::new(cursor);
assert!(body.is_end_stream());
}
#[test]
fn stream_body_debug_and_into_inner() {
let stream = vec![1, 2, 3];
let body = StreamBody::new(stream);
let dbg = format!("{body:?}");
assert!(dbg.contains("StreamBody"), "{dbg}");
let inner = body.into_inner();
assert_eq!(inner, vec![1, 2, 3]);
}
#[test]
fn stream_body_polls_frames_and_fuses_eof() {
let mut trailers = HeaderMap::new();
trailers.insert(
HeaderName::from_static("x-checksum"),
HeaderValue::from_static("abc123"),
);
let stream = stream::iter(vec![
Ok::<_, Infallible>(Frame::data(BytesCursor::new(Bytes::from_static(b"abc")))),
Ok(Frame::trailers(trailers)),
]);
let mut body = StreamBody::new(stream);
assert!(!body.is_end_stream());
let Poll::Ready(Some(Ok(frame))) = poll_body(&mut body) else {
panic!("expected first data frame")
};
let data = frame.into_data().expect("expected data frame");
assert_eq!(data.chunk(), b"abc");
assert!(!body.is_end_stream());
let Poll::Ready(Some(Ok(frame))) = poll_body(&mut body) else {
panic!("expected trailers frame") };
let trailers = frame.into_trailers().expect("expected trailers frame");
assert_eq!(
trailers
.get(&HeaderName::from_static("x-checksum"))
.unwrap(),
&HeaderValue::from_static("abc123")
);
assert!(matches!(poll_body(&mut body), Poll::Ready(None)));
assert!(body.is_end_stream());
assert!(matches!(poll_body(&mut body), Poll::Ready(None)));
}
struct PendingThenFrameStream {
yielded_pending: bool,
yielded_frame: bool,
}
impl Stream for PendingThenFrameStream {
type Item = Result<Frame<BytesCursor>, Infallible>;
fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
if !self.yielded_pending {
self.yielded_pending = true;
return Poll::Pending;
}
if !self.yielded_frame {
self.yielded_frame = true;
return Poll::Ready(Some(Ok(Frame::data(BytesCursor::new(Bytes::from_static(
b"later",
))))));
}
Poll::Ready(None)
}
}
#[test]
fn stream_body_propagates_pending() {
let mut body = StreamBody::new(PendingThenFrameStream {
yielded_pending: false,
yielded_frame: false,
});
assert!(matches!(poll_body(&mut body), Poll::Pending));
let Poll::Ready(Some(Ok(frame))) = poll_body(&mut body) else {
panic!("expected data frame after pending") };
let data = frame.into_data().expect("expected data frame");
assert_eq!(data.chunk(), b"later");
assert!(matches!(poll_body(&mut body), Poll::Ready(None)));
assert!(body.is_end_stream());
}
struct GatedFrameStream {
gate: Arc<AtomicBool>,
pending_logged: bool,
yielded_data: bool,
yielded_trailers: bool,
pending_waker: Arc<StdMutex<Option<Waker>>>,
checkpoints: Arc<StdMutex<Vec<Value>>>,
}
impl Stream for GatedFrameStream {
type Item = Result<Frame<BytesCursor>, Infallible>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
if !self.gate.load(Ordering::SeqCst) {
if !self.pending_logged {
self.pending_logged = true;
let event = serde_json::json!({
"phase": "body_pending",
});
tracing::info!(event = %event, "body_lab_checkpoint");
self.checkpoints.lock().unwrap().push(event);
}
*self.pending_waker.lock().unwrap() = Some(cx.waker().clone());
return Poll::Pending;
}
if !self.yielded_data {
self.yielded_data = true;
let event = serde_json::json!({
"phase": "body_data_ready",
"bytes": 5,
});
tracing::info!(event = %event, "body_lab_checkpoint");
self.checkpoints.lock().unwrap().push(event);
return Poll::Ready(Some(Ok(Frame::data(BytesCursor::new(Bytes::from_static(
b"hello",
))))));
}
if !self.yielded_trailers {
self.yielded_trailers = true;
let mut trailers = HeaderMap::new();
trailers.insert(
HeaderName::from_static("x-checksum"),
HeaderValue::from_static("done"),
);
let event = serde_json::json!({
"phase": "body_trailers_ready",
});
tracing::info!(event = %event, "body_lab_checkpoint");
self.checkpoints.lock().unwrap().push(event);
return Poll::Ready(Some(Ok(Frame::trailers(trailers))));
}
let event = serde_json::json!({
"phase": "body_eof",
});
tracing::info!(event = %event, "body_lab_checkpoint");
self.checkpoints.lock().unwrap().push(event);
Poll::Ready(None)
}
}
#[test]
fn stream_body_roundtrip_under_lab_runtime() {
crate::test_utils::init_test_logging();
crate::test_phase!("stream_body_roundtrip_under_lab_runtime");
let config = TestConfig::new()
.with_seed(0xB0D1_5001)
.with_tracing(true)
.with_max_steps(20_000);
let mut runtime = LabRuntimeTarget::create_runtime(config);
let checkpoints = Arc::new(StdMutex::new(Vec::<Value>::new()));
let gate = Arc::new(AtomicBool::new(false));
let pending_waker = Arc::new(StdMutex::new(None::<Waker>));
let (body_bytes, trailer_value, checkpoints) =
LabRuntimeTarget::block_on(&mut runtime, async move {
let cx = crate::cx::Cx::current().expect("lab runtime should install a current Cx");
let body_spawn_cx = cx.clone();
let gate_spawn_cx = cx.clone();
let body_task = LabRuntimeTarget::spawn(&body_spawn_cx, Budget::INFINITE, {
let checkpoints = Arc::clone(&checkpoints);
let gate = Arc::clone(&gate);
let pending_waker = Arc::clone(&pending_waker);
async move {
let mut body = StreamBody::new(GatedFrameStream {
gate,
pending_logged: false,
yielded_data: false,
yielded_trailers: false,
pending_waker,
checkpoints: Arc::clone(&checkpoints),
});
let first = std::future::poll_fn(|cx| Pin::new(&mut body).poll_frame(cx))
.await
.expect("stream body first frame should succeed")
.expect("stream body should yield first frame");
let data = first.into_data().expect("expected data frame");
let data_bytes = data.chunk().to_vec();
let data_event = serde_json::json!({
"phase": "body_data_consumed",
"bytes": data_bytes.len(),
});
tracing::info!(event = %data_event, "body_lab_checkpoint");
checkpoints.lock().unwrap().push(data_event);
let second = std::future::poll_fn(|cx| Pin::new(&mut body).poll_frame(cx))
.await
.expect("stream body trailers should succeed")
.expect("stream body should yield trailers");
let trailers = second.into_trailers().expect("expected trailers frame");
let trailer_value = trailers
.get(&HeaderName::from_static("x-checksum"))
.expect("checksum trailer should exist")
.to_str()
.expect("checksum trailer should be utf-8")
.to_string();
let trailer_event = serde_json::json!({
"phase": "body_trailers_consumed",
"value": trailer_value,
});
tracing::info!(event = %trailer_event, "body_lab_checkpoint");
checkpoints.lock().unwrap().push(trailer_event);
let eof =
std::future::poll_fn(|cx| Pin::new(&mut body).poll_frame(cx)).await;
assert!(eof.is_none(), "body should terminate after trailers");
let eof_event = serde_json::json!({
"phase": "body_complete",
});
tracing::info!(event = %eof_event, "body_lab_checkpoint");
checkpoints.lock().unwrap().push(eof_event);
(data_bytes, trailer_value)
}
});
let gate_task = LabRuntimeTarget::spawn(&gate_spawn_cx, Budget::INFINITE, {
let checkpoints = Arc::clone(&checkpoints);
let gate = Arc::clone(&gate);
let pending_waker = Arc::clone(&pending_waker);
async move {
yield_now().await;
yield_now().await;
gate.store(true, Ordering::SeqCst);
let event = serde_json::json!({
"phase": "gate_opened",
});
tracing::info!(event = %event, "body_lab_checkpoint");
checkpoints.lock().unwrap().push(event);
if let Some(waker) = pending_waker.lock().unwrap().take() {
waker.wake();
}
}
});
let gate_outcome = gate_task.await;
crate::assert_with_log!(
matches!(gate_outcome, crate::types::Outcome::Ok(())),
"gate task completes successfully",
true,
matches!(gate_outcome, crate::types::Outcome::Ok(()))
);
let body_outcome = body_task.await;
crate::assert_with_log!(
matches!(body_outcome, crate::types::Outcome::Ok(_)),
"body task completes successfully",
true,
matches!(body_outcome, crate::types::Outcome::Ok(_))
);
let crate::types::Outcome::Ok(result) = body_outcome else {
panic!("body task should finish successfully"); };
(result.0, result.1, checkpoints.lock().unwrap().clone())
});
assert_eq!(body_bytes, b"hello");
assert_eq!(trailer_value, "done");
assert!(
checkpoints
.iter()
.any(|event| event["phase"] == "body_pending"),
"body should report an initial pending checkpoint"
);
assert!(
checkpoints
.iter()
.any(|event| event["phase"] == "gate_opened"),
"gate opening checkpoint should be recorded"
);
assert!(
checkpoints
.iter()
.any(|event| event["phase"] == "body_trailers_consumed"),
"trailer consumption checkpoint should be recorded"
);
let violations = runtime.oracles.check_all(runtime.now());
assert!(
violations.is_empty(),
"body lab-runtime stream test should leave runtime invariants clean: {violations:?}"
);
}
struct ErrorOnceStream {
emitted_error: bool,
}
impl Stream for ErrorOnceStream {
type Item = Result<Frame<BytesCursor>, &'static str>;
fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
if !self.emitted_error {
self.emitted_error = true;
return Poll::Ready(Some(Err("boom")));
}
Poll::Ready(None)
}
}
#[test]
fn stream_body_propagates_errors() {
let mut body = StreamBody::new(ErrorOnceStream {
emitted_error: false,
});
let Poll::Ready(Some(Err(err))) = poll_body(&mut body) else {
panic!("expected error frame") };
assert_eq!(err, "boom");
assert!(!body.is_end_stream());
assert!(matches!(poll_body(&mut body), Poll::Ready(None)));
assert!(body.is_end_stream());
}
#[test]
fn length_limit_error_display() {
let err = LengthLimitError;
assert_eq!(format!("{err}"), "body length limit exceeded");
}
#[test]
fn length_limit_error_debug_clone_copy() {
let err = LengthLimitError;
let dbg = format!("{err:?}");
assert!(dbg.contains("LengthLimitError"), "{dbg}");
let copied = err; let cloned = err;
let _ = (copied, cloned);
}
#[test]
fn length_limit_error_is_std_error() {
let err = LengthLimitError;
let _: &dyn std::error::Error = &err;
assert!(std::error::Error::source(&err).is_none());
}
#[test]
fn limited_error_display() {
let err: LimitedError<std::io::Error> = LimitedError::LengthLimit;
assert_eq!(format!("{err}"), "body length limit exceeded");
let done: LimitedError<std::io::Error> = LimitedError::PolledAfterCompletion;
assert_eq!(format!("{done}"), "limited body polled after completion");
let inner_err = LimitedError::Inner(std::io::Error::other("inner"));
let disp = format!("{inner_err}");
assert!(disp.contains("inner"), "{disp}");
}
#[test]
fn limited_error_debug() {
let err: LimitedError<&str> = LimitedError::LengthLimit;
let dbg = format!("{err:?}");
assert!(dbg.contains("LengthLimit"), "{dbg}");
let done: LimitedError<&str> = LimitedError::PolledAfterCompletion;
let dbg = format!("{done:?}");
assert!(dbg.contains("PolledAfterCompletion"), "{dbg}");
}
#[test]
fn limited_error_source() {
let err: LimitedError<std::io::Error> = LimitedError::LengthLimit;
assert!(std::error::Error::source(&err).is_none());
let done: LimitedError<std::io::Error> = LimitedError::PolledAfterCompletion;
assert!(std::error::Error::source(&done).is_none());
let inner = LimitedError::Inner(std::io::Error::other("cause"));
assert!(std::error::Error::source(&inner).is_some());
}
#[test]
fn collected_body_initial_state() {
let body = Collected::new(Empty::new());
assert!(body.data().is_empty());
assert!(body.trailers().is_none());
}
#[test]
fn collected_body_into_data() {
let body = Collected::new(Empty::new());
let data = body.into_data();
assert!(data.is_empty());
}
#[test]
fn limited_body_new() {
let inner = Empty::new();
let limited = Limited::new(inner, 1024);
let dbg = format!("{limited:?}");
assert!(dbg.contains("Limited"), "{dbg}");
}
#[derive(Debug)]
struct PanicAfterFirstPollBody {
first_poll: bool,
}
impl PanicAfterFirstPollBody {
fn new() -> Self {
Self { first_poll: true }
}
}
impl Body for PanicAfterFirstPollBody {
type Data = BytesCursor;
type Error = Infallible;
fn poll_frame(
mut self: Pin<&mut Self>,
_cx: &mut Context<'_>,
) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
if self.first_poll {
self.first_poll = false;
let data = BytesCursor::new(Bytes::from_static(b"toolong"));
return Poll::Ready(Some(Ok(Frame::data(data))));
}
panic!("Limited polled inner body after terminal length-limit violation"); }
}
#[test]
fn limited_body_fail_closes_after_length_limit_violation() {
let inner = PanicAfterFirstPollBody::new();
let mut limited = Limited::new(inner, 3);
let first = poll_body(&mut limited);
assert!(matches!(
first,
Poll::Ready(Some(Err(LimitedError::LengthLimit)))
));
let second = poll_body(&mut limited);
assert!(matches!(
second,
Poll::Ready(Some(Err(LimitedError::PolledAfterCompletion)))
));
}
#[derive(Debug)]
struct ErrorThenPanicBody {
first_poll: bool,
}
impl ErrorThenPanicBody {
fn new() -> Self {
Self { first_poll: true }
}
}
impl Body for ErrorThenPanicBody {
type Data = BytesCursor;
type Error = std::io::Error;
fn poll_frame(
mut self: Pin<&mut Self>,
_cx: &mut Context<'_>,
) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
if self.first_poll {
self.first_poll = false;
return Poll::Ready(Some(Err(std::io::Error::other("boom"))));
}
panic!("Limited polled inner body after terminal inner error"); }
}
#[test]
fn limited_body_fail_closes_after_terminal_inner_error() {
let inner = ErrorThenPanicBody::new();
let mut limited = Limited::new(inner, 16);
let first = poll_body(&mut limited);
assert!(matches!(
first,
Poll::Ready(Some(Err(LimitedError::Inner(_))))
));
let second = poll_body(&mut limited);
assert!(matches!(
second,
Poll::Ready(Some(Err(LimitedError::PolledAfterCompletion)))
));
}
#[derive(Debug)]
struct EofThenPanicBody {
first_poll: bool,
}
impl EofThenPanicBody {
fn new() -> Self {
Self { first_poll: true }
}
}
impl Body for EofThenPanicBody {
type Data = BytesCursor;
type Error = Infallible;
fn poll_frame(
mut self: Pin<&mut Self>,
_cx: &mut Context<'_>,
) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
if self.first_poll {
self.first_poll = false;
return Poll::Ready(None);
}
panic!("Limited polled inner body after terminal completion"); }
}
#[test]
fn limited_body_does_not_repoll_completed_inner_body() {
let inner = EofThenPanicBody::new();
let mut limited = Limited::new(inner, 16);
assert!(matches!(poll_body(&mut limited), Poll::Ready(None)));
assert!(matches!(poll_body(&mut limited), Poll::Ready(None)));
}
}