use {
super::{IntoBytes, IntoStreamingBody, StreamingBody},
futures_util::{Stream, StreamExt},
hyper::body::Bytes,
parking_lot::RwLock,
std::{
fmt::Debug,
marker::PhantomData,
pin::Pin,
sync::Arc,
task::{Context, Poll},
},
};
pub trait FrameLen {
fn encode_len(len: usize) -> Vec<u8>;
fn decode_len(buf: &[u8]) -> Option<usize>;
}
impl FrameLen for u64 {
fn encode_len(len: usize) -> Vec<u8> {
(len as Self).to_be_bytes().to_vec()
}
fn decode_len(buf: &[u8]) -> Option<usize> {
Some(Self::from_be_bytes(buf.get(..size_of::<Self>())?.try_into().ok()?) as _)
}
}
impl FrameLen for u32 {
fn encode_len(len: usize) -> Vec<u8> {
(len as Self).to_be_bytes().to_vec()
}
fn decode_len(buf: &[u8]) -> Option<usize> {
Some(Self::from_be_bytes(buf.get(..size_of::<Self>())?.try_into().ok()?) as _)
}
}
impl FrameLen for u16 {
fn encode_len(len: usize) -> Vec<u8> {
(len as Self).to_be_bytes().to_vec()
}
fn decode_len(buf: &[u8]) -> Option<usize> {
Some(Self::from_be_bytes(buf.get(..size_of::<Self>())?.try_into().ok()?) as _)
}
}
impl FrameLen for u8 {
fn encode_len(len: usize) -> Vec<u8> {
vec![len as u8]
}
fn decode_len(buf: &[u8]) -> Option<usize> {
buf.first().map(|&b| b as _)
}
}
impl FrameLen for () {
fn encode_len(_len: usize) -> Vec<u8> {
Vec::new()
}
fn decode_len(_buf: &[u8]) -> Option<usize> {
Some(0)
}
}
pub struct FramedBox<S, N = u16>
where
S: ?Sized,
{
inner: Pin<Box<S>>,
_len: PhantomData<fn() -> N>,
}
impl<S, N> FramedBox<S, N>
where
S: ?Sized,
{
pub(super) fn new(inner: Pin<Box<S>>) -> Self {
Self {
inner,
_len: Default::default(),
}
}
}
impl<S, N> FramedBox<S, N> {
pub fn pin(stream: S) -> Self {
Self {
inner: Box::pin(stream),
_len: PhantomData,
}
}
}
impl<S, N> Debug for FramedBox<S, N>
where
S: Debug,
{
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("FramedBox")
.field("inner", &self.inner)
.finish()
}
}
impl<S: ?Sized, N> Stream for FramedBox<S, N>
where
S: Stream,
{
type Item = S::Item;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = unsafe { self.get_unchecked_mut() };
this.inner.as_mut().poll_next(cx)
}
}
impl<S, T, N> IntoStreamingBody for FramedBox<S, N>
where
S: Stream<Item = T> + Send + Sync + 'static,
T: IntoBytes + 'static,
N: FrameLen,
{
fn into_streaming_body(self) -> StreamingBody {
if size_of::<N>() == 0 {
StreamingBody::Stream {
stream: Arc::new(RwLock::new(Box::pin(self.inner.map(IntoBytes::into)))),
}
} else {
StreamingBody::Stream {
stream: Arc::new(RwLock::new(Box::pin(self.inner.map(|i| {
let data: Bytes = i.into();
let mut framed = Vec::with_capacity(size_of::<N>() + data.len());
framed.extend_from_slice(&N::encode_len(data.len()));
framed.extend_from_slice(&data);
Bytes::from(framed)
})))),
}
}
}
}