easy_qjs/
engine.rs

1use crate::{error::*, JsEngine, JsonValue};
2use std::fmt;
3
4use js::{Context, Function, Object, Promise, Tokio};
5use snafu::ResultExt;
6use tracing::debug;
7impl JsEngine {
8    #[cfg(feature = "dispatcher")]
9    pub fn create() -> Result<(Self, flume::Receiver<crate::MsgChannel>)> {
10        let (tx, rx) = flume::unbounded::<crate::MsgChannel>();
11        let rt = js::Runtime::new().context(JsRuntimeSnafu)?;
12        rt.set_max_stack_size(256 * 1024);
13        rt.set_memory_limit(2 * 1024 * 1024);
14
15        let ctx = Context::full(&rt).context(JsContextSnafu)?;
16        rt.spawn_executor(Tokio);
17
18        let engine = Self {
19            runtime: rt,
20            context: ctx,
21            sender: tx,
22        };
23        engine.init_globals()?;
24        Ok((engine, rx))
25    }
26
27    #[cfg(not(feature = "dispatcher"))]
28    pub fn create() -> Result<Self> {
29        let rt = js::Runtime::new().context(JsRuntimeSnafu)?;
30        rt.set_max_stack_size(256 * 1024);
31        rt.set_memory_limit(2 * 1024 * 1024);
32
33        let ctx = Context::full(&rt).context(JsContextSnafu)?;
34        rt.spawn_executor(Tokio);
35
36        let engine = Self {
37            runtime: rt,
38            context: ctx,
39        };
40        engine.init_globals()?;
41        Ok(engine)
42    }
43
44    #[cfg(feature = "builtin_processor")]
45    pub fn create_with_processors(
46        processors: Vec<(&str, &str, Box<dyn crate::Processor>)>,
47    ) -> Result<Self, Error> {
48        let (engine, rx) = Self::create()?;
49
50        run_processors(rx, processors);
51        Ok(engine)
52    }
53
54    pub async fn run(&self, code: &str, req: JsonValue) -> Result<JsonValue, Error> {
55        let ret: Result<Promise<JsonValue>, js::Error> = self.context.with(|ctx| {
56            let src = format!(r#"export default async function(req) {{ {} }}"#, code);
57            debug!("code to execute: {}", src);
58            let m = ctx.compile("script", src)?;
59            let fun = m.get::<_, Function>("default")?;
60
61            fun.call((req,))
62        });
63        ret.context(JsExecuteSnafu)?.await.context(JsExecuteSnafu)
64    }
65
66    pub fn load_global_js(&self, name: &str, code: &str) -> Result<()> {
67        let ret: Result<(), js::Error> = self.context.with(|ctx| {
68            let global = ctx.globals();
69            let m = ctx.compile(name, code)?;
70            let obj = m.get::<_, Object>("default")?;
71            for item in obj.into_iter() {
72                let (k, v) = item?;
73                global.set(k, v)?;
74            }
75            Ok(())
76        });
77        ret.context(JsExecuteSnafu)
78    }
79
80    fn init_globals(&self) -> Result<(), Error> {
81        let ret: Result<(), js::Error> = self.context.with(|ctx| {
82            let global = ctx.globals();
83            #[cfg(feature = "console")]
84            {
85                use crate::builtins::{con::Console, Con};
86                global.init_def::<Con>()?;
87                global.set("console", Console)?;
88            }
89            #[cfg(feature = "fetch")]
90            {
91                use crate::builtins::Fetch;
92                ctx.globals().init_def::<Fetch>()?;
93            }
94
95            #[cfg(feature = "dispatcher")]
96            {
97                use crate::builtins::{disp::Dispatcher, Disp};
98                global.init_def::<Disp>()?;
99                global.set("dispatcher", Dispatcher::new(self.sender.clone()))?;
100            }
101            Ok(())
102        });
103        ret.context(JsExecuteSnafu)
104    }
105}
106
107impl fmt::Debug for JsEngine {
108    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
109        f.debug_struct("JsEngine").finish()
110    }
111}
112
/// Spawns a Tokio task that answers dispatcher messages received on `rx`.
///
/// `processors` is a list of `(namespace, name, processor)` entries. For each
/// incoming message the processor registered under the message's
/// `(namespace, name)` is called with the message arguments and its result is
/// sent back through the message's `res` sender. When no processor matches,
/// an `Err("<namespace>.<name> not found")` is sent instead. A failed send is
/// logged as a warning. The loop ends when `recv_async` returns an error.
#[cfg(all(feature = "builtin_processor", feature = "dispatcher"))]
fn run_processors(
    rx: flume::Receiver<crate::MsgChannel>,
    processors: Vec<(&str, &str, Box<dyn crate::Processor>)>,
) {
    use std::collections::HashMap;

    // Own the keys so the registry can be moved into the spawned task.
    let mut registry: HashMap<(String, String), Box<dyn crate::Processor>> =
        HashMap::with_capacity(processors.len());
    for (ns, name, processor) in processors {
        registry.insert((ns.to_owned(), name.to_owned()), processor);
    }

    tokio::spawn(async move {
        while let Ok(msg) = rx.recv_async().await {
            let name = format!("{}.{}", msg.namespace, msg.name);
            tracing::info!("Received request for {name}: {:#?}", msg.args);

            let key = (msg.namespace, msg.name);
            let reply = match registry.get(&key) {
                Some(processor) => processor.call(msg.args).await,
                None => Err(format!("{} not found", name)),
            };

            if let Err(e) = msg.res.send(reply) {
                tracing::warn!("send error: {:?}", e);
            }
        }
    });
}
144
#[cfg(test)]
mod tests {
    // Which imports are used depends on the enabled feature set, hence the
    // `unused_imports` allowances.
    #[allow(unused_imports)]
    use super::*;
    #[allow(unused_imports)]
    use anyhow::Result;
    #[allow(unused_imports)]
    use serde_json::json;

    /// Test processor that echoes its arguments back unchanged.
    #[cfg(all(feature = "builtin_processor", feature = "dispatcher"))]
    fn auth_create_token(args: JsonValue) -> std::result::Result<JsonValue, String> {
        Ok(args)
    }

    /// Checks that a script can reach a registered processor through the
    /// `dispatcher` global and gets the processor's result back.
    #[cfg(all(feature = "builtin_processor", feature = "dispatcher"))]
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn js_engine_should_work() -> Result<()> {
        // `init()` panics when a global subscriber is already installed;
        // `try_init()` reports that as an error, which is safe to ignore here.
        let _ = tracing_subscriber::fmt::try_init();

        let engine = JsEngine::create_with_processors(vec![(
            "auth",
            "create_token",
            Box::new(auth_create_token) as Box<dyn crate::Processor>,
        )])?;
        #[cfg(feature = "console")]
        engine
            .run(
                "let a = 1;console.log(`hello world ${a}`, a)",
                JsonValue::null(),
            )
            .await?;
        let ret = engine
            .run(
                "return dispatcher.dispatch('auth', 'create_token', {a: 1})",
                JsonValue::null(),
            )
            .await?;

        assert_eq!(ret.0, json!({"a": 1}));
        Ok(())
    }

    /// Checks that the `fetch` global returns the response body.
    ///
    /// NOTE: this test needs network access to `httpbin.org`.
    #[cfg(all(feature = "fetch", not(feature = "dispatcher")))]
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn fetch_should_work() {
        let engine = JsEngine::create().expect("valid");
        let ret = engine
            .run(
                "return await fetch('https://httpbin.org/get');",
                JsonValue::null(),
            )
            .await
            .expect("valid");
        assert_eq!(ret.0["url"], "https://httpbin.org/get");
    }
}
201}