use super::{
dynamic_exchange::{DynamicExchange, ExchangeEvent},
entity::{BodyError, BodyHandler, HeadersHandler},
};
use crate::event::{BodyEvent, EventData, EventDataStream, HeadersAccessor, HeadersEvent};
use crate::hl::entity::HeadersBodyHandler;
use crate::host::Host;
use futures::StreamExt;
use std::future::ready;
use std::{cell::RefCell, marker::PhantomData, rc::Rc};
/// Body-related state captured by `HeadersBodyExchange::new` once it has
/// finished waiting for the body.
struct Inner<B> {
/// Host handle cloned from the body exchange; handed to `B::read_body` and
/// `B::write_body`.
host: Rc<dyn Host>,
/// First body event for which `end_of_stream()` returned `true`; its
/// `body_size()` is used when reading the body back.
event: B,
}
/// Exchange wrapper that exposes both header access and body access.
///
/// `B` is the body event type and `H` is the headers event type that is
/// consulted when no `B` exchange is available.
pub struct HeadersBodyExchange<B, H> {
/// Shared exchange that `with_headers_body` and `with_headers` query.
exchange: Rc<RefCell<DynamicExchange>>,
/// `None` when the exchange was built with `contains_body == false`;
/// otherwise the host handle and end-of-stream body event.
inner: Option<Inner<B>>,
/// Marker for the body event type.
_events: PhantomData<B>,
/// Marker for the headers event type; no `H` value is stored.
_previous_event: PhantomData<H>,
}
impl<B, H> HeadersBodyExchange<B, H>
where
    B: BodyEvent + ExchangeEvent,
    H: HeadersEvent + ExchangeEvent,
{
    /// Builds the exchange, waiting for the end of the body when one is expected.
    ///
    /// When `contains_body` is `false` the exchange is stored untouched and no
    /// body state is captured. Otherwise this:
    ///
    /// 1. sets `headers_paused` to `true` on the reactor, if the exchange has one;
    /// 2. waits for the `B` event;
    /// 3. skips body events until one reports `end_of_stream()`;
    /// 4. keeps that event together with a clone of the body exchange's host.
    ///
    /// # Panics
    ///
    /// * `"Must contain body"` if waiting for the `B` event yields nothing.
    /// * `"End of stream"` if the event stream finishes before an event with
    ///   `end_of_stream()` set is seen.
    /// * If the exchange is already borrowed when `borrow_mut` is called.
    //
    // The lint is silenced because the mutable borrow of the exchange has to
    // stay alive across both `.await`s below.
    // NOTE(review): presumably nothing else borrows this exchange while the
    // future is suspended; any such borrow would panic — confirm with callers.
    #[allow(clippy::await_holding_refcell_ref)]
    pub(super) async fn new(exchange: Rc<RefCell<DynamicExchange>>, contains_body: bool) -> Self {
        let inner = if contains_body {
            let mut exchange_ref = exchange.borrow_mut();
            if let Some(reactor) = exchange_ref.get_reactor() {
                reactor.set_headers_paused(true);
            }

            let body_exchange = exchange_ref
                .wait_for_event::<B>()
                .await
                .expect("Must contain body");

            // Discard intermediate chunks; only the end-of-stream event is kept.
            let event = body_exchange
                .event_data_stream()
                .map(|e| e.event)
                .skip_while(|e| ready(!e.end_of_stream()))
                .next()
                .await
                .expect("End of stream");

            Some(Inner {
                host: body_exchange.host.clone(),
                event,
            })
        } else {
            None
        };

        // The `RefCell` borrow ended with the block above, so the `Rc` can be
        // moved into the struct instead of being cloned.
        Self {
            exchange,
            inner,
            _events: PhantomData,
            _previous_event: PhantomData,
        }
    }

    /// Returns `true` when body state was captured during construction.
    pub(super) fn contains_body(&self) -> bool {
        self.inner.is_some()
    }

    /// Runs `op` against the event data stream of the `B` exchange.
    ///
    /// Returns `None` (without calling `op`) when the underlying exchange has
    /// no `B` exchange. The exchange stays immutably borrowed while `op` runs.
    pub fn with_headers_body<F, T>(&self, op: F) -> Option<T>
    where
        for<'e> F: FnOnce(&EventDataStream<'e, B>) -> T,
    {
        self.exchange
            .borrow()
            .get::<B>()
            .map(|ex| op(&ex.event_data_stream()))
    }

    /// Runs `op` against the event data of the `H` exchange.
    ///
    /// Returns `None` (without calling `op`) when the underlying exchange has
    /// no `H` exchange or that exchange has no event data. The exchange stays
    /// immutably borrowed while `op` runs.
    pub fn with_headers<F, T>(&self, op: F) -> Option<T>
    where
        for<'f> F: FnOnce(&EventData<'f, H>) -> T,
    {
        self.exchange
            .borrow()
            .get::<H>()
            .and_then(|exchange| exchange.event_data())
            .map(|event_data| op(&event_data))
    }
}
impl<B, H> HeadersBodyExchange<B, H>
where
B: BodyEvent + ExchangeEvent + 'static,
H: HeadersEvent + ExchangeEvent + 'static,
for<'e> EventDataStream<'e, B>: HeadersAccessor,
for<'f> EventData<'f, H>: HeadersAccessor,
{
/// Returns this exchange as a `HeadersBodyHandler` trait object borrowed
/// from `self`.
pub fn handler(&self) -> &dyn HeadersBodyHandler {
self
}
}
// Calls `$method($arg, ...)` on the accessor handed out by
// `$self.with_headers_body`; if that yields `None`, repeats the call through
// `$self.with_headers` and panics when that yields `None` as well.
macro_rules! with_headers_fallback {
    ($self:expr, $method:ident $(, $arg:expr)*) => {
        match $self.with_headers_body(|accessor| accessor.$method($($arg),*)) {
            Some(result) => result,
            None => $self
                .with_headers(|accessor| accessor.$method($($arg),*))
                .expect("Exchange should be in Body event or header event"),
        }
    };
}
// Same dispatch as `with_headers_fallback!`, for arguments that are consumed
// by the call: the first attempt receives clones so the originals remain
// available to be moved into the fallback call.
macro_rules! with_headers_fallback_cloned {
    ($self:expr, $method:ident $(, $arg:expr)*) => {
        match $self.with_headers_body(|accessor| accessor.$method($($arg.clone()),*)) {
            Some(result) => result,
            None => $self
                .with_headers(|accessor| accessor.$method($($arg),*))
                .expect("Exchange should be in Body event or header event"),
        }
    };
}
/// Header access for `HeadersBodyExchange`.
///
/// Every method calls the same-named method on the body event data stream
/// (via `with_headers_body`) and, if no body exchange is available, on the
/// headers event data (via `with_headers`).
///
/// # Panics
///
/// Every method panics with "Exchange should be in Body event or header event"
/// when neither source is available.
impl<B, H> HeadersHandler for HeadersBodyExchange<B, H>
where
B: BodyEvent + ExchangeEvent + 'static,
H: HeadersEvent + ExchangeEvent + 'static,
for<'e> EventDataStream<'e, B>: HeadersAccessor,
for<'f> EventData<'f, H>: HeadersAccessor,
{
/// Returns the headers as `(name, value)` pairs.
fn headers(&self) -> Vec<(String, String)> {
with_headers_fallback!(self, headers)
}
/// Looks up the header `name`, returning whatever the accessor's `header` returns.
fn header(&self, name: &str) -> Option<String> {
with_headers_fallback!(self, header, name)
}
/// Forwards `name` and `value` to the accessor's `add_header`.
fn add_header(&self, name: &str, value: &str) {
with_headers_fallback!(self, add_header, name, value)
}
/// Forwards `name` and `value` to the accessor's `set_header`.
fn set_header(&self, name: &str, value: &str) {
with_headers_fallback!(self, set_header, name, value)
}
/// Forwards `headers` to the accessor's `set_headers`.
// Uses the cloning macro: the body attempt gets a clone of the vector so
// the original can still be moved into the headers fallback.
fn set_headers(&self, headers: Vec<(&str, &str)>) {
with_headers_fallback_cloned!(self, set_headers, headers)
}
/// Forwards `name` to the accessor's `remove_header`.
fn remove_header(&self, name: &str) {
with_headers_fallback!(self, remove_header, name)
}
}
/// Body access backed by the host handle and end-of-stream event captured at
/// construction time.
impl<B, H> BodyHandler for HeadersBodyExchange<B, H>
where
    B: BodyEvent + ExchangeEvent + 'static,
    H: HeadersEvent + ExchangeEvent + 'static,
    for<'e> EventDataStream<'e, B>: HeadersAccessor,
    for<'f> EventData<'f, H>: HeadersAccessor,
{
    /// Reads the body from offset `0` up to the stored event's `body_size()`.
    ///
    /// Returns an empty vector when no body state was captured, and the
    /// default (empty) value when `B::read_body` does not produce bytes.
    fn body(&self) -> Vec<u8> {
        match &self.inner {
            None => Vec::new(),
            Some(state) => {
                let size = state.event.body_size();
                B::read_body(state.host.as_ref(), 0, size).unwrap_or_default()
            }
        }
    }

    /// Writes `body` through `B::write_body`.
    ///
    /// # Errors
    ///
    /// * `BodyError::BodyNotSent` when no body state was captured.
    /// * `BodyError::ExceededBodySize` when `body.len()` is greater than or
    ///   equal to `MAX_BODY_SIZE`; this check is compiled out when the
    ///   `experimental_disable_body_limit_check` feature is enabled.
    fn set_body(&self, body: &[u8]) -> Result<(), BodyError> {
        let state = self.inner.as_ref().ok_or(BodyError::BodyNotSent)?;

        // Note the comparison is inclusive: a body of exactly
        // `MAX_BODY_SIZE` bytes is rejected as well.
        #[cfg(not(feature = "experimental_disable_body_limit_check"))]
        if body.len() >= crate::hl::body::MAX_BODY_SIZE {
            return Err(BodyError::ExceededBodySize(body.len()));
        }

        // NOTE(review): offset 0 with `usize::MAX` presumably replaces the
        // whole buffered body — confirm against `write_body`.
        B::write_body(state.host.as_ref(), 0, usize::MAX, body);
        Ok(())
    }
}
/// Marks `HeadersBodyExchange` as a `HeadersBodyHandler`, which is what
/// `handler()` returns as a trait object. The impl defines no items.
// NOTE(review): presumably `HeadersBodyHandler` only combines `HeadersHandler`
// and `BodyHandler` — confirm against the trait definition.
impl<B, H> HeadersBodyHandler for HeadersBodyExchange<B, H>
where
B: BodyEvent + ExchangeEvent + 'static,
H: HeadersEvent + ExchangeEvent + 'static,
for<'e> EventDataStream<'e, B>: HeadersAccessor,
for<'f> EventData<'f, H>: HeadersAccessor,
{
}