pub mod versions;
use std::convert::Infallible;
use wasmtime::Store;
use wasmtime_wasi_http::types::HostIncomingRequest;
use wasmtime_wasi_http::WasiHttpView;
use crate::config::ComponentsConfiguration;
use crate::context::ComponentsContext;
use bytes::Bytes;
use http_body_util::combinators::BoxBody;
use http_body_util::BodyExt;
use hyper::body::Incoming;
use http_body_util::Full;
use hyper::Response;
pub struct EdgeFunctionOutput {
pub status: http::StatusCode,
pub headers: http::HeaderMap,
pub body: Vec<u8>,
}
fn build_response(status: http::StatusCode, body: String) -> EdgeFunctionOutput {
EdgeFunctionOutput {
status,
headers: http::HeaderMap::new(),
body: body.into_bytes(),
}
}
impl From<EdgeFunctionOutput> for Response<BoxBody<Bytes, Infallible>> {
fn from(output: EdgeFunctionOutput) -> Self {
let mut builder = Response::builder().status(output.status);
let headers = builder.headers_mut().unwrap();
for (name, value) in output.headers.iter() {
headers.insert(name, value.clone());
}
builder
.body(Full::from(Bytes::from(output.body)).boxed())
.unwrap()
}
}
pub async fn invoke_fn_internal(
component_ctx: &ComponentsContext,
component_name: &str,
request: wasmtime::component::Resource<HostIncomingRequest>,
mut store: Store<crate::context::HostState>,
) -> EdgeFunctionOutput {
let data = store.data_mut();
let (sender, receiver) = tokio::sync::oneshot::channel();
let response = data.new_response_outparam(sender).unwrap();
let component = component_ctx
.get_edge_function_1_0_0_instance(component_name, &mut store)
.await
.unwrap();
tokio::task::spawn(async move {
match component
.wasi_http_incoming_handler()
.call_handle(store, request, response)
.await
{
Ok(()) => {}
Err(e) => {
println!("WASI HTTP handler failed: {e}");
}
}
});
let resp = match receiver.await {
Ok(Ok(response)) => {
let status = response.status();
let headers = response.headers().clone();
let body = match response.into_body().collect().await {
Ok(body) => body.to_bytes().to_vec(),
Err(e) => {
println!("Failed to collect response body: {e}");
return build_response(
http::StatusCode::INTERNAL_SERVER_ERROR,
"Failed to collect response body".to_string(),
);
}
};
let mut builder = Response::builder().status(status);
let builder_headers = builder.headers_mut().unwrap();
for (header_name, header_value) in headers.iter() {
builder_headers.insert(header_name, header_value.clone());
}
EdgeFunctionOutput {
status,
headers: builder_headers.clone(),
body,
}
}
Ok(Err(_)) => build_response(
http::StatusCode::INTERNAL_SERVER_ERROR,
"Failed to handle response".to_string(),
)
,
Err(_) => build_response(
http::StatusCode::INTERNAL_SERVER_ERROR,
"Failed to receive response from WASI HTTP handler".to_string()
)
};
resp
}
pub async fn invoke_fn(
component_ctx: &ComponentsContext,
component_name: &str,
component_configs: &ComponentsConfiguration,
mut request: http::Request<Incoming>,
) -> EdgeFunctionOutput {
let mut store = component_ctx.empty_store_with_stdout();
let data = store.data_mut();
if component_configs.edge_function.is_empty() {
return build_response(
http::StatusCode::INTERNAL_SERVER_ERROR,
"No edge function configured".to_string(),
);
}
let Some(component_config) = component_configs.edge_function.iter().find_map(|f| {
if f.id == component_name {
Some(f.clone())
} else {
None
}
}) else {
return build_response(
http::StatusCode::INTERNAL_SERVER_ERROR,
format!("Component configuration not found for {component_name}"),
);
};
let settings = serde_json::to_string(&component_config.settings).unwrap_or_default();
let Ok(settings_header) = http::HeaderValue::from_str(&settings) else {
return build_response(
http::StatusCode::INTERNAL_SERVER_ERROR,
"Failed to serialize component settings".to_string(),
);
};
request
.headers_mut()
.insert("x-edgee-component-settings", settings_header);
let wasi_req = data
.new_incoming_request(
wasmtime_wasi_http::bindings::http::types::Scheme::Http,
request,
)
.unwrap();
invoke_fn_internal(component_ctx, component_name, wasi_req, store).await
}