1use crate::host_call::Request as WasmRequest;
2use crate::Context;
3use crate::{create_pool, WorkerPool};
4use anyhow::{anyhow, Result};
5use axum::{
6 body::Body,
7 http::{Request, Response},
8 routing::any,
9 Router,
10};
11use land_core::storage::STORAGE;
12use lazy_static::lazy_static;
13use moka::sync::Cache;
14use once_cell::sync::OnceCell;
15use std::net::SocketAddr;
16use std::{sync::Arc, time::Duration};
17use tracing::{debug, info, info_span, warn, Instrument};
18
19lazy_static! {
20 pub static ref WASM_INSTANCES: Cache<String,Arc<WorkerPool> > = Cache::builder()
21 .time_to_live(Duration::from_secs( 60 * 60))
23 .time_to_idle(Duration::from_secs(10 * 60))
25 .build();
27}
28
29pub static DEFAULT_WASM_PATH: OnceCell<String> = OnceCell::new();
31
32pub async fn prepare_worker_pool(key: &str) -> Result<Arc<WorkerPool>> {
33 let mut instances_pool = WASM_INSTANCES.get(key);
34
35 if instances_pool.is_some() {
36 return Ok(instances_pool.unwrap());
37 }
38
39 let storage = STORAGE.get().expect("storage is not initialized");
40 if !storage.is_exist(key).await? {
41 return Err(anyhow!("key not found: {}", key));
42 }
43 let binary = storage.read(key).await?;
44
45 let mut path = std::env::temp_dir();
47 path.push(key);
48 let parent = path.parent().unwrap();
50 std::fs::create_dir_all(parent)?;
51 std::fs::write(&path, binary)?;
52 debug!("wasm temp binary write to {}", path.display());
53
54 let pool = create_pool(path.to_str().unwrap())?;
56 WASM_INSTANCES.insert(key.to_string(), Arc::new(pool));
57
58 instances_pool = WASM_INSTANCES.get(key);
59 info!("worker pool created");
60
61 Ok(instances_pool.unwrap())
62}
63
64pub async fn wasm_caller_handler(
65 req: Request<Body>,
66 wasm_path: &str,
67 req_id: String,
68) -> Result<Response<Body>> {
69 let pool = prepare_worker_pool(wasm_path)
70 .instrument(info_span!("[WASM]", wasm_path = %wasm_path))
71 .await?;
72 let mut worker = pool.get().await.map_err(|e| anyhow!(e.to_string()))?;
73 debug!("[HTTP] wasm worker pool get worker success: {}", wasm_path);
74
75 let mut headers: Vec<(String, String)> = vec![];
77 let req_headers = req.headers().clone();
78 req_headers.iter().for_each(|(k, v)| {
79 headers.push((k.to_string(), v.to_str().unwrap().to_string()));
80 });
81
82 let uri = req.uri().to_string();
83 let method = req.method().clone();
84 let mut context = Context::new(req_id);
85 let req_id = context.req_id();
86 let body = req.into_body();
87 let body_handle = context.set_body(body);
88 let wasm_req = WasmRequest {
89 method: method.as_str(),
90 uri: uri.as_str(),
91 headers: &headers,
92 body: Some(body_handle),
93 };
94
95 let span = info_span!("[WASM]", wasm_path = %wasm_path, body = ?body_handle);
96 let _enter = span.enter();
97
98 let (wasm_resp, wasm_resp_body) = match worker.handle_request(wasm_req, context).await {
99 Ok((wasm_resp, wasm_resp_body)) => (wasm_resp, wasm_resp_body),
100 Err(e) => {
101 let builder = Response::builder().status(500);
102 return Ok(builder.body(Body::from(e.to_string())).unwrap());
103 }
104 };
105
106 let mut builder = Response::builder().status(wasm_resp.status);
108 for (k, v) in wasm_resp.headers.clone() {
109 builder = builder.header(k, v);
110 }
111 if builder.headers_ref().unwrap().get("x-request-id").is_none() {
112 builder = builder.header("x-request-id", req_id.clone());
113 }
114 if wasm_resp.status >= 400 {
115 warn!( status=%wasm_resp.status, "[Response]");
116 } else {
117 info!( status=%wasm_resp.status, "[Response]");
118 }
119 Ok(builder.body(wasm_resp_body).unwrap())
120}
121
122async fn default_handler(req: Request<Body>) -> Response<Body> {
124 let req_id = uuid::Uuid::new_v4().to_string();
125 let headers = req.headers().clone();
127 let empty_wasm_path = String::new();
128 let moni_wasm = headers
129 .get("x-land-wasm")
130 .and_then(|v| v.to_str().ok())
131 .unwrap_or(DEFAULT_WASM_PATH.get().unwrap_or(&empty_wasm_path));
132
133 let method = req.method().clone();
134 let uri = req.uri().to_string();
135 let span = info_span!("[HTTP]",req_id = %req_id.clone(), method = %method, uri = %uri,);
136
137 if moni_wasm.is_empty() {
138 let _enter = span.enter();
139 let builder = Response::builder().status(404);
140 warn!(status = 404, "[Response] x-land-wasm not found");
141 return builder.body(Body::from("x-land-wasm not found")).unwrap();
142 }
143
144 match wasm_caller_handler(req, moni_wasm, req_id)
145 .instrument(span)
146 .await
147 {
148 Ok(resp) => resp,
149 Err(e) => {
150 let builder = Response::builder().status(500);
151 builder.body(Body::from(e.to_string())).unwrap()
152 }
153 }
154}
155
156pub async fn start(addr: SocketAddr) -> Result<()> {
157 let app = Router::new()
158 .route("/", any(default_handler))
159 .route("/*path", any(default_handler));
160
161 info!("Starting on {}", addr);
162
163 axum::Server::bind(&addr)
164 .serve(app.into_make_service())
165 .await?;
166 Ok(())
167}