use std::rc::Rc;
use std::{cell::RefCell, convert::TryFrom};
use crate::event::{ExchangeComplete, FiniteEvent};
use crate::grpc::transport::{GrpcCallId, GrpcCallResponse, GrpcResponseParts};
use crate::host::grpc::GrpcHost;
use crate::proxy_wasm::{
traits::{Context, HttpContext},
types::Action,
};
use crate::{
client::HttpCallResponse,
event::{
Event, EventData, Exchange, RequestBody, RequestHeaders, RequestTrailers, ResponseBody,
ResponseHeaders, ResponseTrailers,
},
host::Host,
middleware::{EventHandlerDispatch, EventHandlerStack},
reactor::{http::HttpReactor, root::RootReactor},
types::{Cid, HttpCid},
BoxError,
};
use futures::executor::LocalPool;
pub(crate) struct AsyncHttpContext {
context_id: HttpCid,
executor: Rc<RefCell<LocalPool>>,
host: Rc<dyn Host>,
grpc_host: Rc<dyn GrpcHost>,
config_reactor: Rc<RootReactor>,
reactor: Rc<HttpReactor>,
event_handlers: Rc<RefCell<EventHandlerStack>>,
}
impl AsyncHttpContext {
pub fn new(
context_id: HttpCid,
executor: Rc<RefCell<LocalPool>>,
host: Rc<dyn Host>,
grpc_host: Rc<dyn GrpcHost>,
config_reactor: Rc<RootReactor>,
reactor: Rc<HttpReactor>,
event_handlers: Rc<RefCell<EventHandlerStack>>,
) -> Self {
Self {
context_id,
executor,
host,
grpc_host,
config_reactor,
reactor,
event_handlers,
}
}
fn dispatch<S>(&self, event: S) -> Result<(), BoxError>
where
S: Event,
EventHandlerStack: EventHandlerDispatch<S>,
{
let exchange: Exchange<S> =
Exchange::new(self.reactor.clone(), self.host.clone(), Some(event.clone()));
let event = EventData::new(&exchange, event, 0);
self.event_handlers.borrow_mut().dispatch(&event)
}
fn notify_finite_event(&mut self, finite_event: FiniteEvent) -> Action {
let end_of_stream = finite_event.end_of_stream();
self.config_reactor
.set_active_cid(Cid::Http(self.context_id));
self.reactor.notify(finite_event.clone());
let kind = finite_event.kind();
let event_handler_result = match RequestHeaders::try_from(finite_event) {
Ok(event) => self.dispatch(event),
Err(finite_event) => match ResponseHeaders::try_from(finite_event) {
Ok(event) => self.dispatch(event),
Err(event) => {
log::trace!("No handler dispatched for event '{event:?}'.");
Ok(())
}
},
};
if let Err(err) = event_handler_result {
log::error!("Failed event handler for {kind:?}: {err:?}");
}
self.executor.borrow_mut().run_until_stalled();
if self.reactor.paused() || self.reactor.headers_paused() {
self.reactor.set_eos_paused(end_of_stream);
Action::Pause
} else {
Action::Continue
}
}
fn notify<E: Event>(&mut self, event: E) -> Action {
self.notify_finite_event(event.into())
}
}
impl Context for AsyncHttpContext {
fn on_http_call_response(
&mut self,
token_id: u32,
num_headers: usize,
body_size: usize,
num_trailers: usize,
) {
#[cfg(feature = "debug-logs")]
log::debug!(
"on_http_call_response: {token_id}, {num_headers}, {body_size}, {num_trailers}"
);
self.config_reactor.notify_response(HttpCallResponse {
request_id: token_id.into(),
num_headers,
body_size,
num_trailers,
});
self.config_reactor.set_active_cid(self.context_id.into());
self.executor.borrow_mut().run_until_stalled();
}
fn on_grpc_call_response(&mut self, token_id: u32, status_code: u32, response_size: usize) {
#[cfg(feature = "debug-logs")]
log::debug!("on_grpc_call_response: {token_id}, {status_code}, {response_size}");
let event = GrpcCallResponse {
call_id: GrpcCallId::new(token_id),
status_code,
response_size,
};
let content = self.grpc_host.get_grpc_call_response_body(0, response_size);
let status = self.grpc_host.get_grpc_status();
self.config_reactor
.notify_grpc_call_response(GrpcResponseParts {
event,
content,
status,
});
self.config_reactor.set_active_cid(self.context_id.into());
self.executor.borrow_mut().run_until_stalled();
}
fn on_done(&mut self) -> bool {
#[cfg(feature = "debug-logs")]
log::debug!("on_done");
self.notify(ExchangeComplete {});
self.config_reactor.set_http_context_done(self.context_id);
let result = true;
#[cfg(feature = "debug-logs")]
log::debug!("on_done -> {result}");
result
}
}
impl HttpContext for AsyncHttpContext {
fn on_http_request_headers(&mut self, num_headers: usize, end_of_stream: bool) -> Action {
#[cfg(feature = "debug-logs")]
log::debug!("on_http_request_headers: {num_headers}, {end_of_stream}");
let result = self.notify(RequestHeaders {
_num_headers: num_headers,
end_of_stream,
});
#[cfg(feature = "debug-logs")]
log::debug!("on_http_request_headers -> {result:?}");
result
}
fn on_http_request_body(&mut self, body_size: usize, end_of_stream: bool) -> Action {
#[cfg(feature = "debug-logs")]
log::debug!("on_http_request_body: {body_size}, {end_of_stream}");
let result = self.notify(RequestBody {
body_size,
end_of_stream,
});
#[cfg(feature = "debug-logs")]
log::debug!("on_http_request_body -> {result:?}");
result
}
fn on_http_request_trailers(&mut self, num_trailers: usize) -> Action {
#[cfg(feature = "debug-logs")]
log::debug!("on_http_request_trailers: {num_trailers}");
let result = self.notify(RequestTrailers {
_num_trailers: num_trailers,
});
#[cfg(feature = "debug-logs")]
log::debug!("on_http_request_trailers -> {result:?}");
result
}
fn on_http_response_headers(&mut self, num_headers: usize, end_of_stream: bool) -> Action {
#[cfg(feature = "debug-logs")]
log::debug!("on_http_response_headers: {num_headers}, {end_of_stream}");
let result = self.notify(ResponseHeaders {
_num_headers: num_headers,
end_of_stream,
});
#[cfg(feature = "debug-logs")]
log::debug!("on_http_response_headers -> {result:?}");
result
}
fn on_http_response_body(&mut self, body_size: usize, end_of_stream: bool) -> Action {
#[cfg(feature = "debug-logs")]
log::debug!("on_http_response_body: {body_size}, {end_of_stream}");
let result = self.notify(ResponseBody {
body_size,
end_of_stream,
});
#[cfg(feature = "debug-logs")]
log::debug!("on_http_response_body -> {result:?}");
result
}
fn on_http_response_trailers(&mut self, num_trailers: usize) -> Action {
#[cfg(feature = "debug-logs")]
log::debug!("on_http_response_trailers: {num_trailers}");
let result = self.notify(ResponseTrailers {
_num_trailers: num_trailers,
});
#[cfg(feature = "debug-logs")]
log::debug!("on_http_response_trailers -> {result:?}");
result
}
}
impl Drop for AsyncHttpContext {
fn drop(&mut self) {
self.config_reactor.set_http_context_done(self.context_id);
}
}