use crate::formdata::FormData;
use bytes::Bytes;
use flate2::read::GzDecoder;
use http::HeaderMap;
use rong::*;
use rong_buffer::{Blob, File};
use rong_stream::{ReadableStream, readable_stream_take_receiver};
use rong_url::URLSearchParams;
use std::fmt;
use std::io::Read;
use std::sync::{Arc, Mutex};
type HostBodyChunk = Result<Bytes, String>;
type HostBodyReceiver = tokio::sync::mpsc::Receiver<HostBodyChunk>;
pub struct HostBodyStream {
inner: Arc<Mutex<Option<HostBodyReceiver>>>,
}
impl HostBodyStream {
pub fn from_receiver(receiver: tokio::sync::mpsc::Receiver<Result<Bytes, String>>) -> Self {
Self {
inner: Arc::new(Mutex::new(Some(receiver))),
}
}
pub fn into_receiver(
self,
) -> Result<tokio::sync::mpsc::Receiver<Result<Bytes, String>>, String> {
self.inner
.lock()
.map_err(|_| "failed to lock streaming body".to_owned())?
.take()
.ok_or_else(|| "streaming body already consumed".to_owned())
}
pub(crate) fn shared_slot(&self) -> Arc<Mutex<Option<HostBodyReceiver>>> {
self.inner.clone()
}
pub(crate) fn from_shared_slot(inner: Arc<Mutex<Option<HostBodyReceiver>>>) -> Self {
Self { inner }
}
pub(crate) fn is_consumed(&self) -> Result<bool, String> {
self.inner
.lock()
.map_err(|_| "failed to lock streaming body".to_owned())
.map(|guard| guard.is_none())
}
pub(crate) fn try_take_receiver(&self) -> Result<Option<HostBodyReceiver>, String> {
self.inner
.lock()
.map_err(|_| "failed to lock streaming body".to_owned())
.map(|mut guard| guard.take())
}
}
impl fmt::Debug for HostBodyStream {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str("HostBodyStream(..)")
}
}
pub enum HostBody {
Empty,
Bytes(Bytes),
Stream(HostBodyStream),
}
impl HostBody {
pub fn empty() -> Self {
Self::Empty
}
pub fn from_bytes(bytes: impl Into<Bytes>) -> Self {
Self::Bytes(bytes.into())
}
pub fn from_stream(receiver: tokio::sync::mpsc::Receiver<Result<Bytes, String>>) -> Self {
Self::Stream(HostBodyStream::from_receiver(receiver))
}
pub fn as_bytes(&self) -> Option<Vec<u8>> {
match self {
Self::Empty => Some(Vec::new()),
Self::Bytes(bytes) => Some(bytes.to_vec()),
Self::Stream(_) => None,
}
}
pub fn is_definitely_empty(&self) -> bool {
match self {
Self::Empty => true,
Self::Bytes(bytes) => bytes.is_empty(),
Self::Stream(_) => false,
}
}
pub async fn into_bytes(self) -> Result<Bytes, String> {
match self {
Self::Empty => Ok(Bytes::new()),
Self::Bytes(bytes) => Ok(bytes),
Self::Stream(stream) => {
let mut receiver = stream.into_receiver()?;
let mut collected = Vec::new();
while let Some(chunk) = receiver.recv().await {
match chunk {
Ok(bytes) => collected.extend_from_slice(&bytes),
Err(error) => return Err(error),
}
}
Ok(Bytes::from(collected))
}
}
}
}
impl fmt::Debug for HostBody {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::Empty => f.write_str("HostBody::Empty"),
Self::Bytes(bytes) => f
.debug_tuple("HostBody::Bytes")
.field(&format_args!("{} bytes", bytes.len()))
.finish(),
Self::Stream(_) => f.write_str("HostBody::Stream(..)"),
}
}
}
impl From<Bytes> for HostBody {
fn from(value: Bytes) -> Self {
Self::Bytes(value)
}
}
impl From<Vec<u8>> for HostBody {
fn from(value: Vec<u8>) -> Self {
Self::Bytes(Bytes::from(value))
}
}
pub(crate) enum BodyKind {
Buffered(Bytes),
Channel(HostBodyStream),
JS(HttpBody),
}
impl Clone for BodyKind {
fn clone(&self) -> Self {
match self {
Self::Buffered(bytes) => Self::Buffered(bytes.clone()),
Self::Channel(stream) => {
Self::Channel(HostBodyStream::from_shared_slot(stream.shared_slot()))
}
Self::JS(body) => Self::JS(body.clone()),
}
}
}
#[derive(Clone)]
pub(crate) struct HttpBody(pub JSValue);
impl HttpBody {
pub async fn to_bytes(&self) -> JSResult<(Bytes, Option<String>)> {
if let Some(obj) = self.0.clone().into_object() {
let ctx = obj.context();
if let Ok(params) = obj.borrow::<URLSearchParams>() {
return Ok((Bytes::from(params.to_string()), None));
}
if let Some(typed_array) = AnyJSTypedArray::from_object(obj.clone())
&& let Some(bytes) = typed_array.as_bytes()
{
return Ok((Bytes::from(bytes.to_vec()), None));
}
if let Some(buffer) = JSArrayBuffer::from_object(obj.clone()) {
return Ok((Bytes::from(buffer.as_bytes().to_vec()), None));
}
if let Ok(blob) = obj.borrow::<Blob>() {
return Ok((blob.bytes_ref().clone(), None));
}
if let Ok(file) = obj.borrow::<File>() {
return Ok((file.bytes_ref().clone(), None));
}
let formdata = if let Ok(formdata) = obj.borrow::<FormData>() {
Some(formdata.clone())
} else {
None
};
if let Some(formdata) = formdata {
let (body, boundary) = formdata.serialize(ctx.clone()).await?;
return Ok((Bytes::from(body), Some(boundary)));
}
let stream_receiver = if let Ok(stream) = obj.borrow::<ReadableStream>() {
Some(readable_stream_take_receiver(&stream))
} else {
None
};
if let Some(receiver) = stream_receiver {
let mut receiver = receiver.ok_or_else(|| {
HostError::new(
rong::error::E_INVALID_STATE,
"ReadableStream body already used",
)
.with_name("TypeError")
})?;
let mut collected = Vec::new();
while let Some(chunk) = receiver.recv().await {
match chunk {
Ok(bytes) => collected.extend_from_slice(&bytes),
Err(error) => {
return Err(HostError::new(rong::error::E_STREAM, error)
.with_name("TypeError")
.into());
}
}
}
return Ok((Bytes::from(collected), None));
}
return Ok((Bytes::new(), None));
}
if let Ok(s) = self.0.clone().to_rust::<String>() {
return Ok((Bytes::from(s), None));
}
Ok((Bytes::new(), None))
}
pub async fn text(&self) -> JSResult<String> {
let (bytes, _) = self.to_bytes().await?;
Ok(String::from_utf8_lossy(&bytes).into_owned())
}
pub async fn bytes(&self) -> JSResult<Bytes> {
Ok(self.to_bytes().await?.0)
}
}
pub(crate) fn decompress_bytes(bytes: Bytes, headers: &HeaderMap) -> JSResult<Bytes> {
if let Some(encoding) = headers.get(http::header::CONTENT_ENCODING) {
match encoding.to_str() {
Ok("gzip") => {
let mut decoder = GzDecoder::new(&bytes[..]);
let mut decompressed = Vec::new();
decoder.read_to_end(&mut decompressed).map_err(|e| {
HostError::new(
rong::error::E_IO,
format!("Failed to decompress gzip: {}", e),
)
})?;
Ok(Bytes::from(decompressed))
}
Ok(encoding) => Err(HostError::new(
rong::error::E_NOT_SUPPORTED,
format!("Unsupported content-encoding: {}", encoding),
)
.into()),
Err(_) => Ok(bytes),
}
} else {
Ok(bytes)
}
}