use futures::StreamExt;
use std::{cell::RefCell, future::ready, rc::Rc};
use super::{
dynamic_exchange::{DynamicExchange, ExchangeEvent},
entity::{BodyError, BodyHandler},
};
use crate::{event::BodyEvent, host::Host};
pub(super) const MAX_BODY_SIZE: usize = 1024 * 1024;
struct Inner<B> {
host: Rc<dyn Host>,
event: B,
}
pub struct BodyExchange<B> {
inner: Option<Inner<B>>,
}
impl<B> BodyExchange<B>
where
B: BodyEvent + ExchangeEvent,
{
#[allow(clippy::await_holding_refcell_ref)]
pub(super) async fn new(exchange: Rc<RefCell<DynamicExchange>>, contains_body: bool) -> Self {
if !contains_body {
return Self { inner: None };
}
let mut exchange = exchange.borrow_mut();
let exchange = exchange
.wait_for_event::<B>()
.await
.expect("Must contain body");
let event = exchange
.event_data_stream()
.map(|e| e.event)
.skip_while(|e| ready(!e.end_of_stream()))
.next()
.await
.expect("End of stream");
Self {
inner: Some(Inner {
host: exchange.host.clone(),
event,
}),
}
}
pub(super) fn contains_body(&self) -> bool {
self.inner.is_some()
}
}
impl<B: BodyEvent> BodyHandler for BodyExchange<B> {
fn body(&self) -> Vec<u8> {
let Some(inner) = self.inner.as_ref() else {
return Vec::new();
};
B::read_body(inner.host.as_ref(), 0, inner.event.body_size()).unwrap_or_default()
}
fn set_body(&self, body: &[u8]) -> Result<(), BodyError> {
let Some(inner) = self.inner.as_ref() else {
return Err(BodyError::BodyNotSent);
};
#[cfg(not(feature = "experimental_disable_body_limit_check"))]
if body.len() >= MAX_BODY_SIZE {
return Err(BodyError::ExceededBodySize(body.len()));
}
B::write_body(inner.host.as_ref(), 0, usize::MAX, body);
Ok(())
}
}