use std::{
cell::{Cell, RefCell},
convert::TryFrom,
marker::PhantomData,
ops::Deref,
rc::Rc,
task::{Poll, Waker},
};
use futures::{Stream, StreamExt};
use crate::reactor::http::ExchangePhase;
use crate::{
host::Host,
reactor::http::{HttpReactor, WakerId},
types::HttpCid,
};
// Helper traits kept in a non-`pub` module. `Sealed` is a supertrait of the
// public `Event` trait and `BodyAccessor` of `BodyEvent`, so those traits
// cannot be implemented without access to this module.
mod private {
use crate::host::Host;
/// Marker supertrait of `Event`.
pub trait Sealed {}
/// Per-event-type access to the HTTP body through a `Host`.
pub trait BodyAccessor {
/// Asks `host` for body bytes; `offset` and `max_size` are forwarded as given.
/// Returns `None` when the host returns no data.
fn read_body(host: &dyn Host, offset: usize, max_size: usize) -> Option<Vec<u8>>;
/// Asks `host` to set body bytes; `offset`, `size` and `value` are forwarded as given.
// NOTE(review): presumably replaces `size` bytes starting at `offset` with
// `value` — confirm against the `Host` API.
fn write_body(host: &dyn Host, offset: usize, size: usize, value: &[u8]);
}
}
use private::{BodyAccessor, Sealed};
/// A typed event of an HTTP exchange.
///
/// Every implementor converts into a [`FiniteEvent`] and can be recovered
/// from one via `TryFrom`. Implementations are generated by `finite_events!`.
pub trait Event: Sealed + Clone + Into<FiniteEvent> + TryFrom<FiniteEvent> + Unpin {
/// The [`EventKind`] tag that corresponds to this event type.
fn kind() -> EventKind;
/// Instance-level convenience for [`Event::kind`].
fn event_kind(&self) -> EventKind {
Self::kind()
}
/// Body size carried by the event (`0` for events without a body).
fn body_size(&self) -> usize;
/// Whether processing should stay paused after this event is handled.
fn should_pause(&self) -> bool;
/// Whether this event reports the end of the stream.
fn end_of_stream(&self) -> bool;
}
/// Type-level relation: `Self` comes after `S`.
/// Implementations are generated by the `after!` / `impl_after!` macros.
pub trait After<S: Event>: Event {}
/// Type-level relation: `Self` comes before `S`.
pub trait Before<S: Event>: Event {}
// `Before` is derived from `After`: `A: Before<B>` holds exactly when `B: After<A>`.
impl<A, B> Before<B> for A
where
B: After<A>,
A: Event,
{
}
/// Marker for events that carry a body and can access it through the host.
pub trait BodyEvent: Event + BodyAccessor {}
/// Marker for header-like events (headers and trailers implement it in this file).
pub trait HeadersEvent: Event {}
/// First event kind in the `finite_events!` list and the default state of [`Exchange`].
#[derive(Clone, Debug)]
pub struct Start {
// Underscore-prefixed: not read anywhere in the visible part of this file.
pub(crate) _context_id: HttpCid,
}
/// Request-headers event.
#[derive(Clone, Debug)]
pub struct RequestHeaders {
// Underscore-prefixed: not read anywhere in the visible part of this file.
pub(crate) _num_headers: usize,
// Returned by `Event::end_of_stream` for this type (see `end_of_stream!`).
pub(crate) end_of_stream: bool,
}
impl HeadersEvent for RequestHeaders {}
/// Request-body event.
#[derive(Clone, Debug)]
pub struct RequestBody {
// Returned by `Event::body_size` for this type (see `body_size!`).
pub(crate) body_size: usize,
// Returned by `Event::end_of_stream`; `Event::should_pause` is its negation.
pub(crate) end_of_stream: bool,
}
impl BodyEvent for RequestBody {}
// Body access for request bodies: both methods forward their arguments
// unchanged to the host's HTTP *request* body functions.
impl BodyAccessor for RequestBody {
fn read_body(host: &dyn Host, offset: usize, max_size: usize) -> Option<Vec<u8>> {
host.get_http_request_body(offset, max_size)
}
fn write_body(host: &dyn Host, offset: usize, size: usize, value: &[u8]) {
host.set_http_request_body(offset, size, value)
}
}
// Request-header access directly on a request-headers stream; only compiled
// with the `enable_stop_iteration` feature. Every method forwards to the host.
#[cfg(feature = "enable_stop_iteration")]
impl HeadersAccessor for EventDataStream<'_, RequestHeaders> {
    /// Returns the value of request header `name`, if the host reports one.
    fn header(&self, name: &str) -> Option<String> {
        let host = &self.exchange.host;
        host.get_http_request_header(name)
    }

    /// Returns all request headers as `(name, value)` pairs.
    fn headers(&self) -> Vec<(String, String)> {
        let host = &self.exchange.host;
        host.get_http_request_headers()
    }

    /// Adds request header `name` with `value`.
    fn add_header(&self, name: &str, value: &str) {
        let host = &self.exchange.host;
        host.add_http_request_header(name, value);
    }

    /// Sets request header `name` to `value`.
    fn set_header(&self, name: &str, value: &str) {
        let host = &self.exchange.host;
        host.set_http_request_header(name, Some(value));
    }

    /// Hands the given list to the host's request-headers setter.
    fn set_headers(&self, headers: Vec<(&str, &str)>) {
        let host = &self.exchange.host;
        host.set_http_request_headers(headers);
    }

    /// Removes request header `name` by setting it to `None`.
    fn remove_header(&self, name: &str) {
        let host = &self.exchange.host;
        host.set_http_request_header(name, None);
    }
}
// Request-header access on a request-body stream; only compiled with the
// `enable_stop_iteration` feature. Every method forwards to the host's
// *request header* functions.
#[cfg(feature = "enable_stop_iteration")]
impl HeadersAccessor for EventDataStream<'_, RequestBody> {
    /// Returns the value of request header `name`, if the host reports one.
    fn header(&self, name: &str) -> Option<String> {
        let host = &self.exchange.host;
        host.get_http_request_header(name)
    }

    /// Returns all request headers as `(name, value)` pairs.
    fn headers(&self) -> Vec<(String, String)> {
        let host = &self.exchange.host;
        host.get_http_request_headers()
    }

    /// Adds request header `name` with `value`.
    fn add_header(&self, name: &str, value: &str) {
        let host = &self.exchange.host;
        host.add_http_request_header(name, value);
    }

    /// Sets request header `name` to `value`.
    fn set_header(&self, name: &str, value: &str) {
        let host = &self.exchange.host;
        host.set_http_request_header(name, Some(value));
    }

    /// Hands the given list to the host's request-headers setter.
    fn set_headers(&self, headers: Vec<(&str, &str)>) {
        let host = &self.exchange.host;
        host.set_http_request_headers(headers);
    }

    /// Removes request header `name` by setting it to `None`.
    fn remove_header(&self, name: &str) {
        let host = &self.exchange.host;
        host.set_http_request_header(name, None);
    }
}
// Response-header access directly on a response-headers stream; only compiled
// with the `enable_stop_iteration` feature. Every method forwards to the host.
#[cfg(feature = "enable_stop_iteration")]
impl HeadersAccessor for EventDataStream<'_, ResponseHeaders> {
    /// Returns the value of response header `name`, if the host reports one.
    fn header(&self, name: &str) -> Option<String> {
        let host = &self.exchange.host;
        host.get_http_response_header(name)
    }

    /// Returns all response headers as `(name, value)` pairs.
    fn headers(&self) -> Vec<(String, String)> {
        let host = &self.exchange.host;
        host.get_http_response_headers()
    }

    /// Adds response header `name` with `value`.
    fn add_header(&self, name: &str, value: &str) {
        let host = &self.exchange.host;
        host.add_http_response_header(name, value);
    }

    /// Sets response header `name` to `value`.
    fn set_header(&self, name: &str, value: &str) {
        let host = &self.exchange.host;
        host.set_http_response_header(name, Some(value));
    }

    /// Hands the given list to the host's response-headers setter.
    fn set_headers(&self, headers: Vec<(&str, &str)>) {
        let host = &self.exchange.host;
        host.set_http_response_headers(headers);
    }

    /// Removes response header `name` by setting it to `None`.
    fn remove_header(&self, name: &str) {
        let host = &self.exchange.host;
        host.set_http_response_header(name, None);
    }
}
// Response-header access on a response-body stream; only compiled with the
// `enable_stop_iteration` feature. Every method forwards to the host's
// *response header* functions.
#[cfg(feature = "enable_stop_iteration")]
impl HeadersAccessor for EventDataStream<'_, ResponseBody> {
    /// Returns the value of response header `name`, if the host reports one.
    fn header(&self, name: &str) -> Option<String> {
        let host = &self.exchange.host;
        host.get_http_response_header(name)
    }

    /// Returns all response headers as `(name, value)` pairs.
    fn headers(&self) -> Vec<(String, String)> {
        let host = &self.exchange.host;
        host.get_http_response_headers()
    }

    /// Adds response header `name` with `value`.
    fn add_header(&self, name: &str, value: &str) {
        let host = &self.exchange.host;
        host.add_http_response_header(name, value);
    }

    /// Sets response header `name` to `value`.
    fn set_header(&self, name: &str, value: &str) {
        let host = &self.exchange.host;
        host.set_http_response_header(name, Some(value));
    }

    /// Hands the given list to the host's response-headers setter.
    fn set_headers(&self, headers: Vec<(&str, &str)>) {
        let host = &self.exchange.host;
        host.set_http_response_headers(headers);
    }

    /// Removes response header `name` by setting it to `None`.
    fn remove_header(&self, name: &str) {
        let host = &self.exchange.host;
        host.set_http_response_header(name, None);
    }
}
/// Request-trailers event.
#[derive(Clone, Debug)]
pub struct RequestTrailers {
// Underscore-prefixed: not read anywhere in the visible part of this file.
pub(crate) _num_trailers: usize,
}
impl HeadersEvent for RequestTrailers {}
/// Response-headers event.
#[derive(Clone, Debug)]
pub struct ResponseHeaders {
// Underscore-prefixed: not read anywhere in the visible part of this file.
pub(crate) _num_headers: usize,
// Returned by `Event::end_of_stream` for this type (see `end_of_stream!`).
pub(crate) end_of_stream: bool,
}
impl HeadersEvent for ResponseHeaders {}
/// Response-body event.
#[derive(Clone, Debug)]
pub struct ResponseBody {
// Returned by `Event::body_size` for this type (see `body_size!`).
pub(crate) body_size: usize,
// Returned by `Event::end_of_stream`; `Event::should_pause` is its negation.
pub(crate) end_of_stream: bool,
}
impl BodyEvent for ResponseBody {}
// Body access for response bodies: both methods forward their arguments
// unchanged to the host's HTTP *response* body functions.
impl BodyAccessor for ResponseBody {
fn read_body(host: &dyn Host, offset: usize, max_size: usize) -> Option<Vec<u8>> {
host.get_http_response_body(offset, max_size)
}
fn write_body(host: &dyn Host, offset: usize, size: usize, value: &[u8]) {
host.set_http_response_body(offset, size, value)
}
}
/// Response-trailers event.
#[derive(Clone, Debug)]
pub struct ResponseTrailers {
// Underscore-prefixed: not read anywhere in the visible part of this file.
pub(crate) _num_trailers: usize,
}
impl HeadersEvent for ResponseTrailers {}
/// Last event kind in the `finite_events!` list; carries no data.
#[derive(Clone, Debug)]
pub struct ExchangeComplete {}
// Expands to the body of `Event::should_pause` for the named event type.
// Body events pause until they report end of stream; all other events never pause.
macro_rules! should_pause {
    (RequestBody, $ev:expr) => { !$ev.end_of_stream };
    (ResponseBody, $ev:expr) => { !$ev.end_of_stream };
    // Fallback for every event type without a dedicated arm above.
    ($other:ty, $ev:expr) => { false };
}
// Expands to the body of `Event::end_of_stream` for the named event type.
// Header and body events report their own `end_of_stream` field; every other
// event type expands to `false`.
macro_rules! end_of_stream {
    (RequestBody, $ev:expr) => { $ev.end_of_stream };
    (ResponseBody, $ev:expr) => { $ev.end_of_stream };
    (RequestHeaders, $ev:expr) => { $ev.end_of_stream };
    (ResponseHeaders, $ev:expr) => { $ev.end_of_stream };
    // Fallback for every event type without a dedicated arm above.
    ($other:ty, $ev:expr) => { false };
}
// Expands to the body of `Event::body_size` for the named event type.
// Body events report their `body_size` field; every other event expands to `0`.
macro_rules! body_size {
    (RequestBody, $ev:expr) => { $ev.body_size };
    (ResponseBody, $ev:expr) => { $ev.body_size };
    // Fallback for every event type without a dedicated arm above.
    ($other:ty, $ev:expr) => { 0 };
}
// Given a comma-separated list of types, implements `After<T>` for the first
// type for every `T` that follows it in the list, then recurses on the tail.
// A single remaining type terminates the recursion.
macro_rules! impl_after {
    ($last:ty) => {};
    ($head:ty, $($earlier:ty),*) => {
        $(impl After<$earlier> for $head {})*
        impl_after!($($earlier),*);
    };
}
// Reverses the bracketed token list into the accumulator that follows it and,
// once the bracket is empty, passes the reversed list to `impl_after!`
// comma-separated. The two arms are disjoint (non-empty vs. empty bracket).
macro_rules! after {
    ([$head:tt $($tail:tt)*] $($acc:tt)*) => {
        after!([$($tail)*] $head $($acc)*);
    };
    ([] $($acc:tt)*) => {
        impl_after!($($acc),*);
    };
}
// Generates, from an ordered list of event type names:
//   * `EventKind`  - a field-less enum with one variant per event type;
//   * `FiniteEvent` - an enum wrapping one value of each event type;
//   * the `After` relations between the listed types (via `after!`);
//   * `Sealed`, `Event`, `From<$event> for FiniteEvent` and
//     `TryFrom<FiniteEvent> for $event` for every listed type.
macro_rules! finite_events {
($($event:ident,)+) => {
/// Tag identifying an event type; variants are declared in list order.
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
pub enum EventKind {
$($event),+
}
/// Any one of the listed events, stored by value.
#[derive(Clone, Debug)]
pub enum FiniteEvent {
$($event($event)),+
}
impl FiniteEvent {
/// Returns the `EventKind` matching the wrapped event.
pub fn kind(&self) -> EventKind {
match self {
$(Self::$event(_) => EventKind::$event),+
}
}
/// Delegates to `Event::end_of_stream` of the wrapped event.
pub fn end_of_stream(&self) -> bool {
match self {
$(Self::$event(e) => e.end_of_stream()),+
}
}
}
// Type-level ordering: each listed type is `After` every type listed before it.
after!([$($event)*]);
$(
impl Sealed for $event {}
// The three accessors are specialised per type name by the
// `should_pause!`, `body_size!` and `end_of_stream!` macros.
impl Event for $event {
fn kind() -> EventKind {
EventKind::$event
}
fn should_pause(&self) -> bool {
should_pause!($event, self)
}
fn body_size(&self) -> usize {
body_size!($event, self)
}
fn end_of_stream(&self) -> bool {
end_of_stream!($event, self)
}
}
impl From<$event> for FiniteEvent {
fn from(event: $event) -> Self {
Self::$event(event)
}
}
// On a variant mismatch the original `FiniteEvent` is handed back as the error.
impl TryFrom<FiniteEvent> for $event {
type Error = FiniteEvent;
fn try_from(finite_event: FiniteEvent) -> Result<Self, FiniteEvent> {
match finite_event {
FiniteEvent::$event(e) => Ok(e),
e => Err(e),
}
}
}
)*
};
}
// The order of this list is significant: it fixes the `EventKind` variant
// order (and therefore `EventKind`'s `Ord`, which compares variant indices)
// and the `After` / `Before` relations generated between the event types.
finite_events! {
Start,
RequestHeaders,
RequestBody,
RequestTrailers,
ResponseHeaders,
ResponseBody,
ResponseTrailers,
ExchangeComplete,
}
// Total order on event kinds: kinds compare by their variant index.
impl PartialOrd for EventKind {
    fn partial_cmp(&self, rhs: &Self) -> Option<std::cmp::Ordering> {
        Some(Ord::cmp(self, rhs))
    }
}

impl Ord for EventKind {
    fn cmp(&self, rhs: &Self) -> std::cmp::Ordering {
        let (lhs_pos, rhs_pos) = (self.index(), rhs.index());
        lhs_pos.cmp(&rhs_pos)
    }
}

impl EventKind {
    /// Position of this variant in the enum declaration (its discriminant).
    fn index(&self) -> usize {
        let kind: Self = *self;
        kind as usize
    }
}
// State shared (through `Rc`) between an `Exchange` and the `EventData` /
// `EventDataStream` values created from it. Interior mutability is used
// because every holder only has shared access.
struct InnerExchange<S: Event> {
reactor: Rc<HttpReactor>,
host: Rc<dyn Host>,
// Event stored ahead of time; the first successful stream poll takes it.
first_event: RefCell<Option<S>>,
// When set, streams over this exchange end immediately with `None`.
empty_stream: Cell<bool>,
// Offset handed to the next `EventData`; updated by `process_event`.
offset: Cell<usize>,
// Last reactor event count observed by a stream; used to detect new events.
event_count: Cell<usize>,
}
impl<S: Event> InnerExchange<S> {
    /// Moves the stored first event out, leaving `None` behind.
    ///
    /// Returns `None` when no first event is stored (any more).
    fn take_first_event(&self) -> Option<S> {
        self.first_event.replace(None)
    }
}
/// Handle to an HTTP exchange, typed by the event `S` it is currently at
/// (`Start` by default).
///
/// The `wait_for_*` methods consume the handle and return one typed with a
/// later event. Dropping a handle may resume the paused request/response
/// (see the `Drop` implementation).
pub struct Exchange<S: Event = Start> {
pub(crate) reactor: Rc<HttpReactor>,
pub(crate) host: Rc<dyn Host>,
// Shared with the `EventData` / `EventDataStream` values created from this handle.
inner: Rc<InnerExchange<S>>,
}
/// One event of type `S` together with access to the exchange it belongs to.
pub struct EventData<'a, S: Event> {
exchange: Rc<InnerExchange<S>>,
pub(crate) event: S,
// Offset recorded for this event when it was produced.
offset: usize,
// Carries the lifetime `'a` in the type; no reference is actually stored.
_lifetime: PhantomData<&'a ()>,
}
impl<'a, S: Event> EventData<'a, S> {
    /// Builds event data bound to the lifetime of the borrowed `exchange`,
    /// sharing that exchange's inner state.
    pub(crate) fn new(exchange: &'a Exchange<S>, event: S, offset: usize) -> Self {
        Self::from_inner(Rc::clone(&exchange.inner), event, offset)
    }

    /// Builds event data directly from shared inner exchange state.
    fn from_inner(exchange: Rc<InnerExchange<S>>, event: S, offset: usize) -> Self {
        let _lifetime = PhantomData;
        Self {
            exchange,
            event,
            offset,
            _lifetime,
        }
    }
}
/// Read/write access to a set of HTTP headers.
pub trait HeadersAccessor {
/// Returns the value of header `name`, or `None` if there is none.
fn header(&self, name: &str) -> Option<String>;
/// Returns all headers as `(name, value)` pairs.
fn headers(&self) -> Vec<(String, String)>;
/// Adds a header `name` with `value`.
fn add_header(&self, name: &str, value: &str);
/// Sets header `name` to `value`.
fn set_header(&self, name: &str, value: &str);
/// Sets the headers from the given `(name, value)` list.
fn set_headers(&self, headers: Vec<(&str, &str)>);
/// Removes header `name`.
fn remove_header(&self, name: &str);
}
// Request-header access for request-headers event data; every method
// forwards to the host's request-header functions.
impl HeadersAccessor for EventData<'_, RequestHeaders> {
    /// Returns the value of request header `name`, if the host reports one.
    fn header(&self, name: &str) -> Option<String> {
        let host = &self.exchange.host;
        host.get_http_request_header(name)
    }

    /// Returns all request headers as `(name, value)` pairs.
    fn headers(&self) -> Vec<(String, String)> {
        let host = &self.exchange.host;
        host.get_http_request_headers()
    }

    /// Adds request header `name` with `value`.
    fn add_header(&self, name: &str, value: &str) {
        let host = &self.exchange.host;
        host.add_http_request_header(name, value);
    }

    /// Sets request header `name` to `value`.
    fn set_header(&self, name: &str, value: &str) {
        let host = &self.exchange.host;
        host.set_http_request_header(name, Some(value));
    }

    /// Hands the given list to the host's request-headers setter.
    fn set_headers(&self, headers: Vec<(&str, &str)>) {
        let host = &self.exchange.host;
        host.set_http_request_headers(headers);
    }

    /// Removes request header `name` by setting it to `None`.
    fn remove_header(&self, name: &str) {
        let host = &self.exchange.host;
        host.set_http_request_header(name, None);
    }
}
// Read-only access to request trailers; both methods forward to the host.
impl EventData<'_, RequestTrailers> {
    /// Returns the value of request trailer `name`, if the host reports one.
    pub fn header(&self, name: &str) -> Option<String> {
        let host = &self.exchange.host;
        host.get_http_request_trailer(name)
    }

    /// Returns all request trailers as `(name, value)` pairs.
    pub fn headers(&self) -> Vec<(String, String)> {
        let host = &self.exchange.host;
        host.get_http_request_trailers()
    }
}
// Response-header access for response-headers event data; every method
// forwards to the host's response-header functions.
impl HeadersAccessor for EventData<'_, ResponseHeaders> {
    /// Returns the value of response header `name`, if the host reports one.
    fn header(&self, name: &str) -> Option<String> {
        let host = &self.exchange.host;
        host.get_http_response_header(name)
    }

    /// Returns all response headers as `(name, value)` pairs.
    fn headers(&self) -> Vec<(String, String)> {
        let host = &self.exchange.host;
        host.get_http_response_headers()
    }

    /// Adds response header `name` with `value`.
    fn add_header(&self, name: &str, value: &str) {
        let host = &self.exchange.host;
        host.add_http_response_header(name, value);
    }

    /// Sets response header `name` to `value`.
    fn set_header(&self, name: &str, value: &str) {
        let host = &self.exchange.host;
        host.set_http_response_header(name, Some(value));
    }

    /// Hands the given list to the host's response-headers setter.
    fn set_headers(&self, headers: Vec<(&str, &str)>) {
        let host = &self.exchange.host;
        host.set_http_response_headers(headers);
    }

    /// Removes response header `name` by setting it to `None`.
    fn remove_header(&self, name: &str) {
        let host = &self.exchange.host;
        host.set_http_response_header(name, None);
    }
}
impl<S: Event> Exchange<S> {
/// Creates an exchange handle over `reactor` and `host`.
///
/// `first_event`, when given, is stored so that the first stream poll can
/// hand it out. The stream-exhausted flag starts out `true` only when a
/// first event is supplied and that event does not ask for a pause.
pub(crate) fn new(
    reactor: Rc<HttpReactor>,
    host: Rc<dyn Host>,
    first_event: Option<S>,
) -> Self {
    let empty_stream = match first_event.as_ref() {
        Some(event) => !event.should_pause(),
        None => false,
    };
    let inner = InnerExchange {
        reactor: Rc::clone(&reactor),
        host: Rc::clone(&host),
        first_event: RefCell::new(first_event),
        empty_stream: Cell::new(empty_stream),
        offset: Cell::new(0),
        event_count: Cell::new(0),
    };
    Self {
        reactor,
        host,
        inner: Rc::new(inner),
    }
}
/// Returns data for the reactor's current event if that event is of type `S`.
///
/// Yields `None` when the reactor's current event is of a different type.
/// The returned data always has offset `0`.
pub fn event_data(&self) -> Option<EventData<S>>
where
    S: HeadersEvent,
{
    match S::try_from(self.reactor.cloned_finite_event()) {
        Ok(event) => Some(EventData::new(self, event, 0)),
        Err(_) => None,
    }
}
#[must_use]
pub fn event_data_stream(&self) -> EventDataStream<S> {
EventDataStream {
id_and_waker: None,
exchange: self.inner.clone(),
buffering: true,
_lifetime: PhantomData,
}
}
pub(crate) fn static_event_data_stream(&self) -> EventDataStream<'static, S>
where
S: BodyEvent,
{
EventDataStream {
id_and_waker: None,
exchange: self.inner.clone(),
buffering: false,
_lifetime: PhantomData,
}
}
/// Consumes this handle and waits for the later event `E`, with buffering
/// enabled. Shorthand for `wait_for_event_buffering(true)`.
pub(crate) async fn wait_for_event<E>(self) -> Exchange<E>
where
E: Event,
S: Before<E>,
{
self.wait_for_event_buffering(true).await
}
/// Consumes this handle and waits for the first event of the later type `E`.
///
/// `buffering` is forwarded to the temporary stream used for waiting. The
/// returned exchange holds the awaited event as its stored first event
/// (or none, if the stream ended without producing one).
pub(crate) async fn wait_for_event_buffering<E>(self, buffering: bool) -> Exchange<E>
where
E: Event,
S: Before<E>,
{
// Next-state handle sharing the same reactor and host; no first event yet.
let exchange: Exchange<E> =
Exchange::new(Rc::clone(&self.reactor), Rc::clone(&self.host), None);
// Run `Drop` for the current state *before* waiting: that is where a
// paused request/response may get resumed.
drop(self);
let mut stream = EventDataStream {
id_and_waker: None,
exchange: exchange.inner.clone(),
buffering,
_lifetime: PhantomData,
};
let first_event = stream.next().await.map(|ed| ed.event);
// Put the event back and reset the bookkeeping touched while polling, so
// that streams created from the returned exchange start from this event.
*exchange.inner.first_event.borrow_mut() = first_event;
exchange.inner.offset.set(0);
exchange.inner.empty_stream.set(false);
exchange
}
/// Consumes this handle and waits for the request headers event.
/// Only callable from states that come before `RequestHeaders`.
pub async fn wait_for_request_headers(self) -> Exchange<RequestHeaders>
where
S: Before<RequestHeaders>,
{
self.wait_for_event().await
}
/// Consumes this handle and waits for the first request body event.
/// Only callable from states that come before `RequestBody`.
pub async fn wait_for_request_body(self) -> Exchange<RequestBody>
where
S: Before<RequestBody>,
{
self.wait_for_event().await
}
/// Consumes this handle and waits for the request trailers event.
/// Only callable from states that come before `RequestTrailers`.
pub(crate) async fn _wait_for_request_trailers(self) -> Exchange<RequestTrailers>
where
S: Before<RequestTrailers>,
{
self.wait_for_event().await
}
/// Consumes this handle and waits for the response headers event.
/// Only callable from states that come before `ResponseHeaders`.
pub async fn wait_for_response_headers(self) -> Exchange<ResponseHeaders>
where
S: Before<ResponseHeaders>,
{
self.wait_for_event().await
}
/// Consumes this handle and waits for the first response body event.
/// Only callable from states that come before `ResponseBody`.
pub async fn wait_for_response_body(self) -> Exchange<ResponseBody>
where
S: Before<ResponseBody>,
{
self.wait_for_event().await
}
/// Consumes this handle and waits for the response trailers event.
/// Only callable from states that come before `ResponseTrailers`.
pub(crate) async fn _wait_for_response_trailers(self) -> Exchange<ResponseTrailers>
where
S: Before<ResponseTrailers>,
{
self.wait_for_event().await
}
/// Consumes this handle and waits for the exchange-complete event.
/// Only callable from states that come before `ExchangeComplete`.
pub(crate) async fn _wait_for_exchange_complete(self) -> Exchange<ExchangeComplete>
where
S: Before<ExchangeComplete>,
{
self.wait_for_event().await
}
/// Consumes this handle and sends a response through the host with the given
/// status code, headers and optional body.
///
/// Only callable from states after `Start` and before `ResponseBody`.
pub fn send_response(self, status_code: u32, headers: Vec<(&str, &str)>, body: Option<&[u8]>)
where
S: After<Start> + Before<ResponseBody>,
{
// Make the host operate on this exchange's context before touching it.
self.host
.set_effective_context(self.reactor.context_id().into());
// Mark the exchange paused and the request cancelled before sending.
// `self` is dropped when this function returns, and `Drop` checks
// `cancelled_request()` before resuming a paused request.
self.reactor.set_paused(true);
self.reactor.cancel_request();
self.host.send_http_response(status_code, headers, body);
}
/// Decides whether dropping this handle should resume the host.
///
/// Resuming is skipped only when all three hold: the
/// `enable_stop_iteration` feature is compiled in, the stored first event
/// is of kind `current`, and the reactor's current event is `next`.
fn should_resume(&self, current: EventKind, next: EventKind) -> bool {
    let stop_iteration = cfg!(feature = "enable_stop_iteration");
    let on_current = self.is_event(current);
    let reactor_at_next = self.reactor.current_event() == next;
    !(stop_iteration && on_current && reactor_at_next)
}
/// `should_resume` for the request side: headers handle, reactor at the body event.
fn should_resume_request(&self) -> bool {
self.should_resume(EventKind::RequestHeaders, EventKind::RequestBody)
}
/// `should_resume` for the response side: headers handle, reactor at the body event.
fn should_resume_response(&self) -> bool {
self.should_resume(EventKind::ResponseHeaders, EventKind::ResponseBody)
}
/// Returns `true` when a first event is stored and its kind equals `kind`.
fn is_event(&self, kind: EventKind) -> bool {
    match self.inner.first_event.borrow().as_ref() {
        Some(first) => first.event_kind() == kind,
        None => false,
    }
}
}
// Dropping an exchange handle resumes the host when the exchange is still
// running, currently paused, not cancelled for the active phase, and
// `should_resume_*` allows it.
impl<S: Event> Drop for Exchange<S> {
    fn drop(&mut self) {
        let (reactor, host) = (&self.reactor, &self.host);

        // Nothing to resume once the exchange is done or if it is not paused.
        if reactor.is_done() || !reactor.paused() {
            return;
        }

        match reactor.phase() {
            ExchangePhase::Request => {
                if reactor.cancelled_request() || !self.should_resume_request() {
                    return;
                }
                reactor.set_paused(false);
                reactor.set_eos_paused(false);
                host.set_effective_context(reactor.context_id().into());
                host.resume_http_request();
            }
            ExchangePhase::Response => {
                if reactor.cancelled_response() || !self.should_resume_response() {
                    return;
                }
                reactor.set_paused(false);
                reactor.set_eos_paused(false);
                host.set_effective_context(reactor.context_id().into());
                host.resume_http_response();
            }
        }
    }
}
impl<S> EventData<'_, S>
where
S: BodyEvent,
{
pub fn offset(&self) -> usize {
self.offset
}
pub fn chunk_size(&self) -> usize {
self.event.body_size()
}
pub fn read_body(&self, offset: usize, max_size: usize) -> Vec<u8> {
S::read_body(self.exchange.host.deref(), offset, max_size).unwrap_or_default()
}
pub fn read_chunk(&self) -> Vec<u8> {
self.read_body(self.offset, self.event.body_size())
}
pub fn read_payload(&self) -> Vec<u8> {
self.read_body(0, self.event.body_size())
}
}
/// Asynchronous stream of `EventData<S>` items for one exchange.
pub struct EventDataStream<'e, S: Event> {
exchange: Rc<InnerExchange<S>>,
// Waker currently registered with the reactor, with the id needed to remove it.
id_and_waker: Option<(WakerId, Waker)>,
// When `true`, an event that asks for a pause keeps the reactor paused.
buffering: bool,
// Carries the lifetime `'e` in the type; no reference is actually stored.
_lifetime: PhantomData<&'e ()>,
}
impl<'e, S: Event> EventDataStream<'e, S> {
fn process_event(&mut self, event: S) -> EventData<'e, S> {
let exchange = self.exchange.clone();
let reactor = exchange.reactor.as_ref();
let pause = self.buffering && event.should_pause() || reactor.eos_paused();
reactor.set_paused(pause);
let offset = exchange.offset.get();
exchange.offset.set(event.body_size());
exchange.empty_stream.set(!event.should_pause());
EventData::from_inner(exchange, event, offset)
}
}
// Polling proceeds in three steps:
//   1. If the exchange is flagged as exhausted, end the stream.
//   2. If the reactor has reached (or passed) events of kind `S`, hand out the
//      stored first event, or else the reactor's current event if it is new.
//   3. Otherwise register a waker for kind `S` and wait.
impl<'e, S: Event> Stream for EventDataStream<'e, S> {
type Item = EventData<'e, S>;
fn poll_next(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> Poll<Option<Self::Item>> {
let reactor = self.exchange.reactor.clone();
let exchange = self.exchange.clone();
// Step 1: exhausted stream - unregister any waker and finish.
if exchange.empty_stream.get() {
if let Some((id, _)) = self.id_and_waker.take() {
reactor.remove_waker(S::kind(), id);
}
return Poll::Ready(None);
}
// Step 2: `EventKind` ordering follows the event declaration order.
if reactor.current_event() >= S::kind() {
let event_data = if let Some(event) = exchange.take_first_event() {
// A stored first event takes priority; the waker is no longer needed.
if let Some((id, _)) = self.id_and_waker.take() {
reactor.remove_waker(S::kind(), id);
}
Some(self.process_event(event))
} else {
// Compare the reactor's event counter with the last value seen here
// to find out whether a new event arrived since the previous poll.
let event_count = reactor.event_count();
if event_count > exchange.event_count.get() {
exchange.event_count.set(event_count);
// A new event of another type ends the stream (`None`).
if let Ok(event) = S::try_from(reactor.cloned_finite_event()) {
Some(self.process_event(event))
} else {
None
}
} else {
// NOTE(review): this path returns `Pending` without registering a
// waker - presumably the reactor re-polls on every new event;
// confirm, otherwise the task may never be woken.
return Poll::Pending;
}
};
Poll::Ready(event_data)
} else {
// Step 3: make sure the reactor holds a waker that wakes the current task.
match &self.id_and_waker {
None => {
let id = reactor.insert_waker(S::kind(), cx.waker().clone());
self.id_and_waker = Some((id, cx.waker().clone()));
}
// The task's waker changed since registration: replace the stale one.
Some((id, w)) if !w.will_wake(cx.waker()) => {
reactor.remove_waker(S::kind(), *id);
let id = reactor.insert_waker(S::kind(), cx.waker().clone());
self.id_and_waker = Some((id, cx.waker().clone()));
}
Some(_) => {}
}
Poll::Pending
}
}
}