1use crate::{error::*, JsEngine, JsonValue};
2use std::fmt;
3
4use js::{Context, Function, Object, Promise, Tokio};
5use snafu::ResultExt;
6use tracing::debug;
7impl JsEngine {
8 #[cfg(feature = "dispatcher")]
9 pub fn create() -> Result<(Self, flume::Receiver<crate::MsgChannel>)> {
10 let (tx, rx) = flume::unbounded::<crate::MsgChannel>();
11 let rt = js::Runtime::new().context(JsRuntimeSnafu)?;
12 rt.set_max_stack_size(256 * 1024);
13 rt.set_memory_limit(2 * 1024 * 1024);
14
15 let ctx = Context::full(&rt).context(JsContextSnafu)?;
16 rt.spawn_executor(Tokio);
17
18 let engine = Self {
19 runtime: rt,
20 context: ctx,
21 sender: tx,
22 };
23 engine.init_globals()?;
24 Ok((engine, rx))
25 }
26
27 #[cfg(not(feature = "dispatcher"))]
28 pub fn create() -> Result<Self> {
29 let rt = js::Runtime::new().context(JsRuntimeSnafu)?;
30 rt.set_max_stack_size(256 * 1024);
31 rt.set_memory_limit(2 * 1024 * 1024);
32
33 let ctx = Context::full(&rt).context(JsContextSnafu)?;
34 rt.spawn_executor(Tokio);
35
36 let engine = Self {
37 runtime: rt,
38 context: ctx,
39 };
40 engine.init_globals()?;
41 Ok(engine)
42 }
43
44 #[cfg(feature = "builtin_processor")]
45 pub fn create_with_processors(
46 processors: Vec<(&str, &str, Box<dyn crate::Processor>)>,
47 ) -> Result<Self, Error> {
48 let (engine, rx) = Self::create()?;
49
50 run_processors(rx, processors);
51 Ok(engine)
52 }
53
54 pub async fn run(&self, code: &str, req: JsonValue) -> Result<JsonValue, Error> {
55 let ret: Result<Promise<JsonValue>, js::Error> = self.context.with(|ctx| {
56 let src = format!(r#"export default async function(req) {{ {} }}"#, code);
57 debug!("code to execute: {}", src);
58 let m = ctx.compile("script", src)?;
59 let fun = m.get::<_, Function>("default")?;
60
61 fun.call((req,))
62 });
63 ret.context(JsExecuteSnafu)?.await.context(JsExecuteSnafu)
64 }
65
66 pub fn load_global_js(&self, name: &str, code: &str) -> Result<()> {
67 let ret: Result<(), js::Error> = self.context.with(|ctx| {
68 let global = ctx.globals();
69 let m = ctx.compile(name, code)?;
70 let obj = m.get::<_, Object>("default")?;
71 for item in obj.into_iter() {
72 let (k, v) = item?;
73 global.set(k, v)?;
74 }
75 Ok(())
76 });
77 ret.context(JsExecuteSnafu)
78 }
79
80 fn init_globals(&self) -> Result<(), Error> {
81 let ret: Result<(), js::Error> = self.context.with(|ctx| {
82 let global = ctx.globals();
83 #[cfg(feature = "console")]
84 {
85 use crate::builtins::{con::Console, Con};
86 global.init_def::<Con>()?;
87 global.set("console", Console)?;
88 }
89 #[cfg(feature = "fetch")]
90 {
91 use crate::builtins::Fetch;
92 ctx.globals().init_def::<Fetch>()?;
93 }
94
95 #[cfg(feature = "dispatcher")]
96 {
97 use crate::builtins::{disp::Dispatcher, Disp};
98 global.init_def::<Disp>()?;
99 global.set("dispatcher", Dispatcher::new(self.sender.clone()))?;
100 }
101 Ok(())
102 });
103 ret.context(JsExecuteSnafu)
104 }
105}
106
107impl fmt::Debug for JsEngine {
108 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
109 f.debug_struct("JsEngine").finish()
110 }
111}
112
113#[cfg(all(feature = "builtin_processor", feature = "dispatcher"))]
114fn run_processors(
115 rx: flume::Receiver<crate::MsgChannel>,
116 processors: Vec<(&str, &str, Box<dyn crate::Processor>)>,
117) {
118 let processors: std::collections::HashMap<(String, String), Box<dyn crate::Processor>> =
119 processors
120 .into_iter()
121 .map(|(ns, name, processor)| ((ns.to_owned(), name.to_owned()), processor))
122 .collect();
123
124 tokio::spawn(async move {
125 while let Ok(msg) = rx.recv_async().await {
126 let name = format!("{}.{}", msg.namespace, msg.name);
127 tracing::info!("Received request for {name}: {:#?}", msg.args);
128 let processor = match processors.get(&(msg.namespace, msg.name)) {
129 Some(p) => p,
130 None => {
131 if let Err(e) = msg.res.send(Err(format!("{} not found", name))) {
132 tracing::warn!("send error: {:?}", e);
133 };
134 continue;
135 }
136 };
137 let ret = processor.call(msg.args).await;
138 if let Err(e) = msg.res.send(ret) {
139 tracing::warn!("send error: {:?}", e);
140 }
141 }
142 });
143}
144
145#[cfg(test)]
146mod tests {
147 #[allow(unused_imports)]
148 use super::*;
149 #[allow(unused_imports)]
150 use anyhow::Result;
151 #[allow(unused_imports)]
152 use serde_json::json;
153
154 #[cfg(feature = "builtin_processor")]
155 fn auth_create_token(args: JsonValue) -> std::result::Result<JsonValue, String> {
156 Ok(args)
157 }
158
159 #[cfg(feature = "builtin_processor")]
160 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
161 async fn js_engine_should_work() -> Result<()> {
162 tracing_subscriber::fmt::init();
163
164 let engine = JsEngine::create_with_processors(vec![(
165 "auth",
166 "create_token",
167 Box::new(auth_create_token) as Box<dyn crate::Processor>,
168 )])?;
169 #[cfg(feature = "console")]
170 engine
171 .run(
172 "let a = 1;console.log(`hello world ${a}`, a)",
173 JsonValue::null(),
174 )
175 .await?;
176 let ret = engine
177 .run(
178 "return dispatcher.dispatch('auth', 'create_token', {a: 1})",
179 JsonValue::null(),
180 )
181 .await?;
182
183 assert_eq!(ret.0, json!({"a": 1}));
184 Ok(())
185 }
186
187 #[cfg(feature = "fetch")]
188 #[cfg(not(feature = "dispatcher"))]
189 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
190 async fn fetch_should_work() {
191 let engine = JsEngine::create().expect("valid");
192 let ret = engine
193 .run(
194 "return await fetch('https://httpbin.org/get');",
195 JsonValue::null(),
196 )
197 .await
198 .expect("valid");
199 assert_eq!(ret.0["url"], "https://httpbin.org/get");
200 }
201}