land_runtime/
server.rs

1use crate::host_call::Request as WasmRequest;
2use crate::Context;
3use crate::{create_pool, WorkerPool};
4use anyhow::{anyhow, Result};
5use axum::{
6    body::Body,
7    http::{Request, Response},
8    routing::any,
9    Router,
10};
11use land_core::storage::STORAGE;
12use lazy_static::lazy_static;
13use moka::sync::Cache;
14use once_cell::sync::OnceCell;
15use std::net::SocketAddr;
16use std::{sync::Arc, time::Duration};
17use tracing::{debug, info, info_span, warn, Instrument};
18
19lazy_static! {
20    pub static ref WASM_INSTANCES: Cache<String,Arc<WorkerPool> > = Cache::builder()
21    // Time to live (TTL): 1 hours
22    .time_to_live(Duration::from_secs( 60 * 60))
23    // Time to idle (TTI):  10 minutes
24    .time_to_idle(Duration::from_secs(10 * 60))
25    // Create the cache.
26    .build();
27}
28
29// DEFAULT_WASM_PATH is used to set default wasm path
30pub static DEFAULT_WASM_PATH: OnceCell<String> = OnceCell::new();
31
32pub async fn prepare_worker_pool(key: &str) -> Result<Arc<WorkerPool>> {
33    let mut instances_pool = WASM_INSTANCES.get(key);
34
35    if instances_pool.is_some() {
36        return Ok(instances_pool.unwrap());
37    }
38
39    let storage = STORAGE.get().expect("storage is not initialized");
40    if !storage.is_exist(key).await? {
41        return Err(anyhow!("key not found: {}", key));
42    }
43    let binary = storage.read(key).await?;
44
45    // write binary to local file
46    let mut path = std::env::temp_dir();
47    path.push(key);
48    // create parent dir
49    let parent = path.parent().unwrap();
50    std::fs::create_dir_all(parent)?;
51    std::fs::write(&path, binary)?;
52    debug!("wasm temp binary write to {}", path.display());
53
54    // create wasm worker pool
55    let pool = create_pool(path.to_str().unwrap())?;
56    WASM_INSTANCES.insert(key.to_string(), Arc::new(pool));
57
58    instances_pool = WASM_INSTANCES.get(key);
59    info!("worker pool created");
60
61    Ok(instances_pool.unwrap())
62}
63
64pub async fn wasm_caller_handler(
65    req: Request<Body>,
66    wasm_path: &str,
67    req_id: String,
68) -> Result<Response<Body>> {
69    let pool = prepare_worker_pool(wasm_path)
70        .instrument(info_span!("[WASM]", wasm_path = %wasm_path))
71        .await?;
72    let mut worker = pool.get().await.map_err(|e| anyhow!(e.to_string()))?;
73    debug!("[HTTP] wasm worker pool get worker success: {}", wasm_path);
74
75    // convert request to host-call request
76    let mut headers: Vec<(String, String)> = vec![];
77    let req_headers = req.headers().clone();
78    req_headers.iter().for_each(|(k, v)| {
79        headers.push((k.to_string(), v.to_str().unwrap().to_string()));
80    });
81
82    let uri = req.uri().to_string();
83    let method = req.method().clone();
84    let mut context = Context::new(req_id);
85    let req_id = context.req_id();
86    let body = req.into_body();
87    let body_handle = context.set_body(body);
88    let wasm_req = WasmRequest {
89        method: method.as_str(),
90        uri: uri.as_str(),
91        headers: &headers,
92        body: Some(body_handle),
93    };
94
95    let span = info_span!("[WASM]", wasm_path = %wasm_path, body = ?body_handle);
96    let _enter = span.enter();
97
98    let (wasm_resp, wasm_resp_body) = match worker.handle_request(wasm_req, context).await {
99        Ok((wasm_resp, wasm_resp_body)) => (wasm_resp, wasm_resp_body),
100        Err(e) => {
101            let builder = Response::builder().status(500);
102            return Ok(builder.body(Body::from(e.to_string())).unwrap());
103        }
104    };
105
106    // convert host-call response to response
107    let mut builder = Response::builder().status(wasm_resp.status);
108    for (k, v) in wasm_resp.headers.clone() {
109        builder = builder.header(k, v);
110    }
111    if builder.headers_ref().unwrap().get("x-request-id").is_none() {
112        builder = builder.header("x-request-id", req_id.clone());
113    }
114    if wasm_resp.status >= 400 {
115        warn!( status=%wasm_resp.status, "[Response]");
116    } else {
117        info!( status=%wasm_resp.status, "[Response]");
118    }
119    Ok(builder.body(wasm_resp_body).unwrap())
120}
121
122// basic handler that responds with a static string
123async fn default_handler(req: Request<Body>) -> Response<Body> {
124    let req_id = uuid::Uuid::new_v4().to_string();
125    // get header x-land-wasm
126    let headers = req.headers().clone();
127    let empty_wasm_path = String::new();
128    let moni_wasm = headers
129        .get("x-land-wasm")
130        .and_then(|v| v.to_str().ok())
131        .unwrap_or(DEFAULT_WASM_PATH.get().unwrap_or(&empty_wasm_path));
132
133    let method = req.method().clone();
134    let uri = req.uri().to_string();
135    let span = info_span!("[HTTP]",req_id = %req_id.clone(), method = %method, uri = %uri,);
136
137    if moni_wasm.is_empty() {
138        let _enter = span.enter();
139        let builder = Response::builder().status(404);
140        warn!(status = 404, "[Response] x-land-wasm not found");
141        return builder.body(Body::from("x-land-wasm not found")).unwrap();
142    }
143
144    match wasm_caller_handler(req, moni_wasm, req_id)
145        .instrument(span)
146        .await
147    {
148        Ok(resp) => resp,
149        Err(e) => {
150            let builder = Response::builder().status(500);
151            builder.body(Body::from(e.to_string())).unwrap()
152        }
153    }
154}
155
156pub async fn start(addr: SocketAddr) -> Result<()> {
157    let app = Router::new()
158        .route("/", any(default_handler))
159        .route("/*path", any(default_handler));
160
161    info!("Starting on {}", addr);
162
163    axum::Server::bind(&addr)
164        .serve(app.into_make_service())
165        .await?;
166    Ok(())
167}