edgee_components_runtime/edge_function/
mod.rs

1pub mod versions;
2
3use std::convert::Infallible;
4use wasmtime::Store;
5use wasmtime_wasi_http::types::HostIncomingRequest;
6use wasmtime_wasi_http::WasiHttpView;
7
8use crate::config::ComponentsConfiguration;
9
10use crate::context::ComponentsContext;
11
12use bytes::Bytes;
13use http_body_util::combinators::BoxBody;
14use http_body_util::BodyExt;
15use hyper::body::Incoming;
16
17use http_body_util::Full;
18use hyper::Response;
19
20pub struct EdgeFunctionOutput {
21    pub status: http::StatusCode,
22    pub headers: http::HeaderMap,
23    pub body: Vec<u8>,
24}
25
26fn build_response(status: http::StatusCode, body: String) -> EdgeFunctionOutput {
27    EdgeFunctionOutput {
28        status,
29        headers: http::HeaderMap::new(),
30        body: body.into_bytes(),
31    }
32}
33
34impl From<EdgeFunctionOutput> for Response<BoxBody<Bytes, Infallible>> {
35    fn from(output: EdgeFunctionOutput) -> Self {
36        let mut builder = Response::builder().status(output.status);
37        let headers = builder.headers_mut().unwrap();
38        for (name, value) in output.headers.iter() {
39            headers.insert(name, value.clone());
40        }
41        builder
42            .body(Full::from(Bytes::from(output.body)).boxed())
43            .unwrap()
44    }
45}
46
47pub async fn invoke_fn_internal(
48    component_ctx: &ComponentsContext,
49    component_name: &str,
50    request: wasmtime::component::Resource<HostIncomingRequest>,
51    mut store: Store<crate::context::HostState>,
52) -> EdgeFunctionOutput {
53    let data = store.data_mut();
54    let (sender, receiver) = tokio::sync::oneshot::channel();
55    let response = data.new_response_outparam(sender).unwrap();
56    let component = component_ctx
57        .get_edge_function_1_0_0_instance(component_name, &mut store)
58        .await
59        .unwrap();
60
61    // call the WASI HTTP handler
62    tokio::task::spawn(async move {
63        match component
64            .wasi_http_incoming_handler()
65            .call_handle(store, request, response)
66            .await
67        {
68            Ok(()) => {}
69            Err(e) => {
70                println!("WASI HTTP handler failed: {e}");
71            }
72        }
73    });
74
75    // wait for data to stream from the WASI HTTP handler
76    let resp = match receiver.await {
77        // If the client calls `response-outparam::set` then one of these
78        // methods will be called.
79        Ok(Ok(response)) => {
80            let status = response.status();
81            let headers = response.headers().clone();
82            let body = match response.into_body().collect().await {
83                Ok(body) => body.to_bytes().to_vec(),
84                Err(e) => {
85                    println!("Failed to collect response body: {e}");
86                    return build_response(
87                    http::StatusCode::INTERNAL_SERVER_ERROR,
88                    "Failed to collect response body".to_string(),
89                    );
90                }
91            };
92            let mut builder = Response::builder().status(status);
93            let builder_headers = builder.headers_mut().unwrap();
94            for (header_name, header_value) in headers.iter() {
95            builder_headers.insert(header_name, header_value.clone());
96            }
97            // return the response with the body
98            EdgeFunctionOutput {
99                status,
100                headers: builder_headers.clone(),
101                body,
102            }
103        }
104        Ok(Err(_)) => build_response(
105            http::StatusCode::INTERNAL_SERVER_ERROR,
106            "Failed to handle response".to_string(),
107        )
108        ,
109        // Otherwise the `sender` will get dropped along with the `Store`
110        // meaning that the oneshot will get disconnected and here we can
111        // inspect the `task` result to see what happened
112        Err(_) => build_response(
113            http::StatusCode::INTERNAL_SERVER_ERROR,
114            "Failed to receive response from WASI HTTP handler".to_string()
115        )
116        };
117    resp
118}
119
120pub async fn invoke_fn(
121    component_ctx: &ComponentsContext,
122    component_name: &str,
123    component_configs: &ComponentsConfiguration,
124    mut request: http::Request<Incoming>,
125) -> EdgeFunctionOutput {
126    let mut store = component_ctx.empty_store_with_stdout();
127    let data = store.data_mut();
128
129    if component_configs.edge_function.is_empty() {
130        return build_response(
131            http::StatusCode::INTERNAL_SERVER_ERROR,
132            "No edge function configured".to_string(),
133        );
134    }
135
136    // grab the component configuration for the given component name
137    let Some(component_config) = component_configs.edge_function.iter().find_map(|f| {
138        if f.id == component_name {
139            Some(f.clone())
140        } else {
141            None
142        }
143    }) else {
144        return build_response(
145            http::StatusCode::INTERNAL_SERVER_ERROR,
146            format!("Component configuration not found for {component_name}"),
147        );
148    };
149
150    // Set the component settings as a header
151    let settings = serde_json::to_string(&component_config.settings).unwrap_or_default();
152    let Ok(settings_header) = http::HeaderValue::from_str(&settings) else {
153        return build_response(
154            http::StatusCode::INTERNAL_SERVER_ERROR,
155            "Failed to serialize component settings".to_string(),
156        );
157    };
158    request
159        .headers_mut()
160        .insert("x-edgee-component-settings", settings_header);
161
162    let wasi_req = data
163        .new_incoming_request(
164            wasmtime_wasi_http::bindings::http::types::Scheme::Http,
165            request,
166        )
167        .unwrap();
168    // Invoke the WASI HTTP handler
169    invoke_fn_internal(component_ctx, component_name, wasi_req, store).await
170}