use std::{cell::RefCell, error::Error, rc::Rc};
use crate::{
grpc::transport::{GrpcCallId, GrpcCallResponse, GrpcResponseParts},
grpc::GrpcStatusCode,
host::grpc::GrpcHost,
proxy_wasm::{
traits::{Context, HttpContext, RootContext},
types::ContextType,
},
};
use futures::{executor::LocalPool, task::LocalSpawnExt};
use crate::{
client::HttpCallResponse,
extract::context::ConfigureContext,
handler::{Handler, IntoHandlerResult},
host::{clock::Clock, shared_data::SharedData, Host},
middleware::EventHandlerStack,
reactor::root::RootReactor,
types::{HttpCid, RootCid},
};
use super::{error::ErrorContext, http::AsyncHttpContext};
/// Progress of the configure task spawned by `on_configure`.
///
/// `create_http_context` inspects this value to decide whether an incoming
/// request gets a real filter context or an `ErrorContext`.
#[derive(Clone)]
enum ConfigurationState {
/// Initial state assigned by `AsyncRootContext::new`; it stays in place
/// until the configure task completes or fails to spawn.
Started,
/// The configure handler ran to completion and its output converted to `Ok(())`.
Finished,
/// The configure task could not be spawned, the handler call returned an
/// error, or the handler output converted to an error. The error is
/// reference-counted so it can be handed to each `ErrorContext`.
Failed(Rc<dyn Error>),
}
/// Root context that runs a configure handler `C` on a single-threaded
/// executor and creates one HTTP context per incoming request.
pub(crate) struct AsyncRootContext<C> {
// Identifier of this root context; passed to the reactor as the active
// context id whenever the root context drives the executor.
context_id: RootCid,
// Outcome of the configure task; shared with the spawned task, which
// updates it when the configure handler completes.
state: Rc<RefCell<ConfigurationState>>,
// Host handle given to the configure context, to every HTTP context and
// to every `ErrorContext`.
host: Rc<dyn Host>,
// Clock handle given to the configure context.
clock: Rc<dyn Clock>,
// gRPC host handle; used here to read gRPC response bodies and statuses,
// and given to the configure context and to every HTTP context.
grpc_host: Rc<dyn GrpcHost>,
// Shared-data handle given to the configure context.
shared_data: Rc<dyn SharedData>,
// Metrics handle given to the configure context (feature-gated).
#[cfg(feature = "experimental_metrics")]
metrics: Rc<dyn crate::host::metrics::MetricsHost>,
// Local executor that runs the configure task; a clone is handed to every
// HTTP context created by `create_http_context`.
executor: Rc<RefCell<LocalPool>>,
// Root reactor notified of call responses, ticks and completion.
reactor: Rc<RootReactor>,
// Event handler stack handed to every HTTP context.
event_handlers: Rc<RefCell<EventHandlerStack>>,
// The configure handler invoked from `on_configure`.
configure: Rc<C>,
}
impl<C> AsyncRootContext<C> {
#[allow(clippy::too_many_arguments)]
pub(crate) fn new(
context_id: RootCid,
host: Rc<dyn Host>,
clock: Rc<dyn Clock>,
grpc_host: Rc<dyn GrpcHost>,
shared_data: Rc<dyn SharedData>,
#[cfg(feature = "experimental_metrics")] metrics: Rc<dyn crate::host::metrics::MetricsHost>,
event_handlers: EventHandlerStack,
configure: C,
) -> Self {
Self {
context_id,
state: Rc::new(RefCell::new(ConfigurationState::Started)),
host,
clock,
grpc_host,
shared_data,
#[cfg(feature = "experimental_metrics")]
metrics,
executor: Rc::new(RefCell::new(LocalPool::new())),
reactor: Rc::new(RootReactor::new(context_id)),
event_handlers: Rc::new(RefCell::new(event_handlers)),
configure: Rc::new(configure),
}
}
}
impl<C> Context for AsyncRootContext<C> {
    /// Forwards an HTTP call response to the root reactor, then resumes the
    /// executor with the root context marked as active.
    fn on_http_call_response(
        &mut self,
        token_id: u32,
        num_headers: usize,
        body_size: usize,
        num_trailers: usize,
    ) {
        let response = HttpCallResponse {
            request_id: token_id.into(),
            num_headers,
            body_size,
            num_trailers,
        };
        self.reactor.notify_response(response);

        let root_cid = self.context_id;
        self.reactor.set_active_cid(root_cid.into());
        self.executor.borrow_mut().run_until_stalled();
    }

    /// Forwards a gRPC call response to the root reactor, then resumes the
    /// executor with the root context marked as active.
    ///
    /// When `status_code` equals the `Ok` code, the response body is read
    /// from the gRPC host and no status is attached; otherwise no body is
    /// read and the gRPC status is fetched from the host instead.
    fn on_grpc_call_response(&mut self, token_id: u32, status_code: u32, response_size: usize) {
        let event = GrpcCallResponse {
            call_id: GrpcCallId::new(token_id),
            status_code,
            response_size,
        };

        // Exactly one host call is made, depending on the outcome.
        let succeeded = status_code == GrpcStatusCode::Ok.code();
        let content = if succeeded {
            self.grpc_host.get_grpc_call_response_body(0, response_size)
        } else {
            None
        };
        let status = if succeeded {
            None
        } else {
            self.grpc_host.get_grpc_status()
        };

        let parts = GrpcResponseParts {
            event,
            content,
            status,
        };
        self.reactor.notify_grpc_call_response(parts);

        let root_cid = self.context_id;
        self.reactor.set_active_cid(root_cid.into());
        self.executor.borrow_mut().run_until_stalled();
    }

    /// Marks the root reactor as done and always reports `true`.
    fn on_done(&mut self) -> bool {
        self.reactor.set_done();
        true
    }
}
impl<C> RootContext for AsyncRootContext<C>
where
C: Handler<ConfigureContext> + 'static,
C::Output: IntoHandlerResult,
{
fn get_type(&self) -> Option<ContextType> {
Some(ContextType::HttpContext)
}
fn create_http_context(&self, context_id: u32) -> Option<Box<dyn HttpContext>> {
match *self.state.borrow() {
ConfigurationState::Failed(ref error) => {
let error_context = ErrorContext::new(self.host.clone(), error.clone());
Some(Box::new(error_context))
}
ConfigurationState::Finished => {
log::error!("Inconsistent filter state.");
let error: Box<dyn Error> =
Box::from("Incoming request after the configure function finished");
let error_context = ErrorContext::new(self.host.clone(), error.into());
Some(Box::new(error_context))
}
ConfigurationState::Started => {
let context_id = HttpCid::from(context_id);
let executor = self.executor.clone();
let host = self.host.clone();
let grpc_host = self.grpc_host.clone();
let config_reactor = self.reactor.clone();
let reactor = if let Some(reactor) = config_reactor.create_http_context(context_id)
{
reactor
} else {
log::warn!(
"There is no current HttpContext available. \
A filter function was not launched in configure function or \
an async operation is still in progress."
);
let error: Box<dyn Error> = Box::from("Configuration in pending state");
let error_context = ErrorContext::new(self.host.clone(), error.into());
return Some(Box::new(error_context));
};
config_reactor.set_active_cid(context_id.into());
self.executor.borrow_mut().run_until_stalled();
let filter = AsyncHttpContext::new(
context_id,
executor,
host,
grpc_host,
config_reactor.clone(),
reactor,
self.event_handlers.clone(),
);
config_reactor.set_active_cid(self.context_id.into());
Some(Box::new(filter))
}
}
}
fn on_configure(&mut self, _plugin_configuration_size: usize) -> bool {
let host = self.host.clone();
let clock = self.clock.clone();
let grpc_host = self.grpc_host.clone();
let shared_data = self.shared_data.clone();
#[cfg(feature = "experimental_metrics")]
let metrics = self.metrics.clone();
let reactor = self.reactor.clone();
reactor.set_active_cid(self.context_id.into());
let context = ConfigureContext {
host: host.clone(),
clock: clock.clone(),
grpc_host: grpc_host.clone(),
shared_data: shared_data.clone(),
#[cfg(feature = "experimental_metrics")]
metrics: metrics.clone(),
root_reactor: reactor,
unique_extractions: Default::default(),
};
let state = self.state.clone();
let configure = self.configure.clone();
let task = async move {
match configure.call(context).await {
Ok(result) => {
let handler_output = result.into_handler_result();
match handler_output {
Ok(()) => *state.borrow_mut() = ConfigurationState::Finished,
Err(error) => {
log::error!("Launcher problem: {error}");
*state.borrow_mut() = ConfigurationState::Failed(error.into());
}
}
}
Err(error) => {
log::error!("Configuration problem in extraction: {error}");
*state.borrow_mut() = ConfigurationState::Failed(Rc::new(error));
}
}
};
let spawn_result = self.executor.borrow().spawner().spawn_local(task);
if let Err(error) = spawn_result {
log::error!("Configuration problem: {error}");
*self.state.borrow_mut() = ConfigurationState::Failed(Rc::new(error));
}
self.executor.borrow_mut().run_until_stalled();
true
}
fn on_tick(&mut self) {
if self.reactor.notify_tick() {
self.executor.borrow_mut().run_until_stalled();
}
}
}