netsblox_vm/
std_system.rs

1//! An customizable implementation of [`System`] which depends on the standard library.
2//! 
3//! This submodule is only available with the [`std-system`](crate) feature flag.
4
5use alloc::string::ToString;
6use alloc::collections::BTreeMap;
7use alloc::borrow::ToOwned;
8use alloc::vec::Vec;
9use alloc::rc::Rc;
10
11use std::time::{Duration, SystemTime, UNIX_EPOCH};
12use std::sync::{Arc, Mutex};
13use std::sync::mpsc::{Sender, Receiver, channel};
14use std::thread;
15
16use rand::distr::uniform::{SampleUniform, SampleRange};
17use rand_chacha::ChaChaRng;
18use rand::{Rng, SeedableRng};
19use tokio_tungstenite::tungstenite::Message;
20use futures::{StreamExt, SinkExt};
21use uuid::Uuid;
22
23use crate::runtime::*;
24use crate::process::*;
25use crate::json::*;
26use crate::gc::*;
27use crate::std_util::*;
28use crate::vecmap::*;
29use crate::compact_str::*;
30use crate::*;
31
32const MESSAGE_REPLY_TIMEOUT: Duration = Duration::from_millis(1500);
33
34async fn call_rpc_async<C: CustomTypes<S>, S: System<C>>(context: &NetsBloxContext, client: &reqwest::Client, host: Option<&str>, service: &str, rpc: &str, args: &[(&str, &Json)]) -> Result<SimpleValue, CompactString> {
35    let time = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_millis();
36    let url = format!("{service_host}/{service}/{rpc}?clientId={client_id}&t={time}",
37        service_host = host.unwrap_or(context.default_service_host.as_str()), client_id = context.client_id);
38    let args = args.iter().copied().collect::<BTreeMap<_,_>>();
39
40    let res = match client.post(url).json(&args).send().await {
41        Ok(x) => x,
42        Err(_) => return Err(format_compact!("Failed to reach {}", context.base_url)),
43    };
44
45    let content_type = res.headers().get("Content-Type").and_then(|x| CompactString::from_utf8(x.as_bytes().to_owned()).ok()).map(|x| x.to_lowercase()).unwrap_or_else(|| "unknown".into());
46    let status = res.status();
47
48    let res = match res.bytes().await {
49        Ok(res) => (*res).to_owned(),
50        Err(_) => return Err(CompactString::new("Failed to read response body")),
51    };
52
53    if !status.is_success() {
54        return Err(CompactString::from_utf8(res).ok().unwrap_or_else(|| "Received ill-formed error message".into()));
55    }
56
57    if content_type.contains("image/") {
58        Ok(SimpleValue::Image(Image { content: res, center: None, name: "untitled".into() }))
59    } else if content_type.contains("audio/") {
60        Ok(SimpleValue::Audio(Audio { content: res, name: "untitled".into() }))
61    } else if let Some(x) = parse_json_slice::<Json>(&res).ok() {
62        SimpleValue::from_netsblox_json(x).map_err(|e| format_compact!("Received ill-formed success value: {e:?}"))
63    } else if let Ok(x) = CompactString::from_utf8(res) {
64        Ok(SimpleValue::Text(x))
65    } else {
66        Err("Received ill-formed success value".into())
67    }
68}
69
70/// A type implementing the [`System`] trait which supports all features.
71/// 
72/// [`StdSystem`] can be configured with [`CustomTypes`] and [`Config`],
73/// which together allow for the definition of any external features (e.g., defining syscalls),
74/// as well as overriding default behavior (e.g., rpc intercepting).
75pub struct StdSystem<C: CustomTypes<StdSystem<C>>> {
76    config: Config<C, Self>,
77    context: Arc<NetsBloxContext>,
78    client: reqwest::Client,
79    rng: Mutex<ChaChaRng>,
80    clock: Arc<Clock>,
81
82    rpc_request_pipe: Sender<RpcRequest<C, Self>>,
83
84    message_replies: Arc<Mutex<BTreeMap<ExternReplyKey, ReplyEntry>>>,
85    message_sender: Sender<OutgoingMessage>,
86    message_injector: Sender<IncomingMessage>,
87    message_receiver: Receiver<IncomingMessage>,
88}
89impl<C: CustomTypes<StdSystem<C>>> StdSystem<C> {
90    /// Equivalent to [`StdSystem::new_async`] except that it can be executed outside of async context.
91    /// Note that using this from within async context can result in a panic from, e.g., `tokio` trying to create a runtime within a runtime.
92    #[tokio::main(flavor = "current_thread")]
93    pub async fn new_sync(base_url: CompactString, project_name: Option<&str>, config: Config<C, Self>, clock: Arc<Clock>) -> Self {
94        Self::new_async(base_url, project_name, config, clock).await
95    }
96    /// Initializes a new instance of [`StdSystem`] targeting the given NetsBlox server base url, e.g., `https://cloud.netsblox.org`.
97    pub async fn new_async(base_url: CompactString, project_name: Option<&str>, config: Config<C, Self>, clock: Arc<Clock>) -> Self {
98        let client = reqwest::Client::builder().build().unwrap();
99        let default_service_host = {
100            let configuration = client.get(format!("{base_url}/configuration")).send().await.unwrap().json::<BTreeMap<CompactString, Json>>().await.unwrap();
101            let services_hosts = configuration["servicesHosts"].as_array().unwrap();
102            services_hosts[0].as_object().unwrap().get("url").unwrap().as_str().unwrap().into()
103        };
104
105        let mut context = NetsBloxContext {
106            base_url,
107            default_service_host,
108            client_id: format_compact!("_vm-{}", names::Generator::default().next().unwrap()),
109            project_name: project_name.unwrap_or("untitled").into(),
110
111            project_id: CompactString::default(),
112            role_name: CompactString::default(),
113            role_id: CompactString::default(),
114        };
115
116        let message_replies = Arc::new(Mutex::new(Default::default()));
117        let (message_sender, message_receiver, message_injector, ws_finish_flag) = {
118            let (base_url, client_id, project_name, message_replies) = (context.base_url.clone(), context.client_id.clone(), context.project_name.clone(), message_replies.clone());
119            let (out_sender, out_receiver) = channel();
120            let (in_sender, in_receiver) = channel();
121            let finish_flag = Arc::new(());
122
123            #[tokio::main(flavor = "multi_thread", worker_threads = 1)]
124            async fn handler(base_url: CompactString, client_id: CompactString, project_name: CompactString, message_replies: Arc<Mutex<BTreeMap<ExternReplyKey, ReplyEntry>>>, out_receiver: Receiver<OutgoingMessage>, in_sender: Sender<IncomingMessage>, finish_flag: Arc<()>) {
125                let ws_url = format!("{}/network/{client_id}/connect", if let Some(x) = base_url.strip_prefix("http") { format!("ws{x}") } else { format!("wss://{base_url}") });
126                let (ws, _) = tokio_tungstenite::connect_async(ws_url).await.unwrap();
127                let (mut ws_sender, ws_receiver) = ws.split();
128                let (ws_sender_sender, ws_sender_receiver) = async_channel::unbounded();
129
130                tokio::spawn(async move {
131                    while let Ok(msg) = ws_sender_receiver.recv().await {
132                        ws_sender.send(msg).await.unwrap();
133                    }
134                });
135
136                let ws_sender_sender_clone = ws_sender_sender.clone();
137                tokio::spawn(async move {
138                    ws_receiver.for_each(move |packet| {
139                        let ws_sender_sender_clone = ws_sender_sender_clone.clone();
140                        let in_sender = in_sender.clone();
141                        let message_replies = message_replies.clone();
142                        async move {
143                            let mut msg = match packet {
144                                Ok(Message::Text(raw)) => match parse_json::<BTreeMap<CompactString, Json>>(&raw) {
145                                    Ok(x) => x,
146                                    Err(_) => return,
147                                }
148                                _ => return,
149                            };
150                            match msg.get("type").and_then(|x| x.as_str()).unwrap_or("unknown") {
151                                "ping" => ws_sender_sender_clone.send(Message::Text(json!({ "type": "pong" }).to_string().into())).await.unwrap(),
152                                "message" => {
153                                    let (msg_type, values) = match (msg.remove("msgType"), msg.remove("content")) {
154                                        (Some(Json::String(msg_type)), Some(Json::Object(values))) => (msg_type.into(), values),
155                                        _ => return,
156                                    };
157                                    if msg_type == "__reply__" {
158                                        let (value, reply_key) = match ({ values }.remove("body"), msg.remove("requestId")) {
159                                            (Some(value), Some(Json::String(request_id))) => (value, ExternReplyKey { request_id: request_id.into() }),
160                                            _ => return,
161                                        };
162                                        if let Some(entry) = message_replies.lock().unwrap().get_mut(&reply_key) {
163                                            if entry.value.is_none() {
164                                                entry.value = Some(value);
165                                            }
166                                        }
167                                    } else {
168                                        let reply_key = match msg.contains_key("requestId") {
169                                            true => match (msg.remove("srcId"), msg.remove("requestId")) {
170                                                (Some(Json::String(src_id)), Some(Json::String(request_id))) => Some(InternReplyKey { src_id: src_id.into(), request_id: request_id.into() }),
171                                                _ => return,
172                                            }
173                                            false => None,
174                                        };
175                                        let values = values.into_iter().filter_map(|(k, v)| SimpleValue::from_netsblox_json(v).ok().map(|v| (k.into(), v))).collect();
176                                        in_sender.send(IncomingMessage { msg_type, values, reply_key }).unwrap();
177                                    }
178                                }
179                                _ => (),
180                            }
181                        }
182                    }).await;
183                });
184
185                ws_sender_sender.send(Message::Text(json!({ "type": "set-uuid", "clientId": client_id }).to_string().into())).await.unwrap();
186                drop(finish_flag);
187
188                let src_id = format_compact!("{project_name}@{client_id}#vm");
189                fn resolve_targets<'a>(targets: &'a mut [CompactString], src_id: &CompactString) -> &'a mut [CompactString] {
190                    for target in targets.iter_mut() {
191                        if *target == "everyone in room" {
192                            target.clone_from(src_id);
193                        }
194                    }
195                    targets
196                }
197                while let Ok(request) = out_receiver.recv() {
198                    let msg = match request {
199                        OutgoingMessage::Normal { msg_type, values, mut targets } => json!({
200                            "type": "message",
201                            "dstId": resolve_targets(&mut targets, &src_id),
202                            "srcId": src_id,
203                            "msgType": msg_type,
204                            "content": values.into_iter().collect::<BTreeMap<_,_>>(),
205                        }),
206                        OutgoingMessage::Blocking { msg_type, values, mut targets, reply_key } => json!({
207                            "type": "message",
208                            "dstId": resolve_targets(&mut targets, &src_id),
209                            "srcId": src_id,
210                            "msgType": msg_type,
211                            "requestId": reply_key.request_id,
212                            "content": values.into_iter().collect::<BTreeMap<_,_>>(),
213                        }),
214                        OutgoingMessage::Reply { value, reply_key } => json!({
215                            "type": "message",
216                            "dstId": reply_key.src_id,
217                            "msgType": "__reply__",
218                            "requestId": reply_key.request_id,
219                            "content": { "body": value },
220                        }),
221                    };
222                    ws_sender_sender.send(Message::Text(msg.to_string().into())).await.unwrap();
223                }
224            }
225            let in_sender_clone = in_sender.clone();
226            let finish_flag_clone = finish_flag.clone();
227            thread::spawn(move || handler(base_url, client_id, project_name, message_replies, out_receiver, in_sender_clone, finish_flag_clone));
228
229            (out_sender, in_receiver, in_sender, Arc::downgrade(&finish_flag))
230        };
231
232        while ws_finish_flag.upgrade().is_some() {
233            tokio::time::sleep(Duration::from_millis(10)).await;
234        }
235
236        let meta = client.post(format!("{}/projects/", context.base_url))
237            .json(&json!({ "clientId": context.client_id, "name": context.project_name }))
238            .send().await.unwrap()
239            .json::<BTreeMap<CompactString, Json>>().await.unwrap();
240        context.project_id = meta["id"].as_str().unwrap().into();
241
242        let roles = meta["roles"].as_object().unwrap();
243        let (first_role_id, first_role_meta) = roles.get_key_value(roles.keys().next().unwrap()).unwrap();
244        let first_role_meta = first_role_meta.as_object().unwrap();
245        context.role_id = first_role_id.into();
246        context.role_name = first_role_meta.get("name").unwrap().as_str().unwrap().into();
247
248        client.post(format!("{}/network/{}/state", context.base_url, context.client_id))
249            .json(&json!({ "state": { "external": { "address": context.project_name, "appId": "vm" } } }))
250            .send().await.unwrap();
251
252        let context = Arc::new(context);
253        let rpc_request_pipe = {
254            let (client, context) = (client.clone(), context.clone());
255            let (sender, receiver) = channel();
256
257            #[tokio::main(flavor = "multi_thread", worker_threads = 1)]
258            async fn handler<C: CustomTypes<StdSystem<C>>>(client: reqwest::Client, context: Arc<NetsBloxContext>, receiver: Receiver<RpcRequest<C, StdSystem<C>>>) {
259                while let Ok(RpcRequest { key, host, service, rpc, args }) = receiver.recv() {
260                    let (client, context) = (client.clone(), context.clone());
261                    tokio::spawn(async move {
262                        let res = call_rpc_async::<C, StdSystem<C>>(&context, &client, host.as_deref(), &service, &rpc, &args.iter().map(|x| (x.0.as_str(), x.1)).collect::<Vec<_>>()).await;
263                        key.complete(res.map(Into::into));
264                    });
265                }
266            }
267            thread::spawn(move || handler(client, context, receiver));
268
269            sender
270        };
271
272        let mut seed: <ChaChaRng as SeedableRng>::Seed = Default::default();
273        getrandom::fill(&mut seed).expect("failed to generate random seed");
274
275        let context_clone = context.clone();
276        let config = config.fallback(&Config {
277            request: Some(Rc::new(move |_, key, request, proc| {
278                match request {
279                    Request::Rpc { host, service, rpc, args } => match (host.as_deref(), service.as_str(), rpc.as_str(), args.as_slice()) {
280                        (_, "PublicRoles", "getPublicRoleId", []) => {
281                            key.complete(Ok(SimpleValue::Text(format_compact!("{}@{}#vm", context_clone.project_name, context_clone.client_id)).into()));
282                            RequestStatus::Handled
283                        }
284                        _ => {
285                            match args.into_iter().map(|(k, v)| Ok((k, v.to_simple()?.into_netsblox_json()))).collect::<Result<_,ToSimpleError<_,_>>>() {
286                                Ok(args) => proc.global_context.borrow().system.rpc_request_pipe.send(RpcRequest { host, service, rpc, args, key }).unwrap(),
287                                Err(err) => key.complete(Err(format_compact!("failed to convert RPC args to json: {err:?}"))),
288                            }
289                            RequestStatus::Handled
290                        }
291                    }
292                    _ => RequestStatus::UseDefault { key, request },
293                }
294            })),
295            command: None,
296        });
297
298        Self {
299            config, context, client, clock,
300            rng: Mutex::new(ChaChaRng::from_seed(seed)),
301            rpc_request_pipe,
302            message_replies, message_sender, message_receiver, message_injector,
303        }
304    }
305
306    /// Asynchronously calls an RPC and returns the result.
307    /// This function directly makes requests to NetsBlox, bypassing any RPC hook defined by [`Config`].
308    pub async fn call_rpc_async(&self, host: Option<&str>, service: &str, rpc: &str, args: &[(&str, &Json)]) -> Result<SimpleValue, CompactString> {
309        call_rpc_async::<C, Self>(&self.context, &self.client, host, service, rpc, args).await
310    }
311
312    /// Gets the public id of the running system that can be used to send messages to this client.
313    pub fn get_public_id(&self) -> CompactString {
314        format_compact!("{}@{}#vm", self.context.project_name, self.context.client_id)
315    }
316
317    /// Injects a message into the receiving queue as if received over the network.
318    pub fn inject_message(&self, msg_type: CompactString, values: VecMap<CompactString, SimpleValue, false>) {
319        self.message_injector.send(IncomingMessage { msg_type, values, reply_key: None }).unwrap();
320    }
321
322    #[cfg(debug_assertions)]
323    fn check_runtime_borrows<'gc>(mc: &Mutation<'gc>, proc: &mut Process<'gc, C, Self>) {
324        fn check_symbols<'gc, C: CustomTypes<StdSystem<C>>>(mc: &Mutation<'gc>, symbols: &SymbolTable<'gc, C, StdSystem<C>>) {
325            for symbol in symbols.iter() {
326                match &*symbol.1.get() {
327                    Value::Bool(_) | Value::Number(_) | Value::Text(_) | Value::Audio(_) | Value::Image(_) | Value::Native(_) => (),
328                    Value::List(x) => { x.borrow_mut(mc); }
329                    Value::Closure(x) => { x.borrow_mut(mc); }
330                    Value::Entity(x) => { x.borrow_mut(mc); }
331                }
332            }
333        }
334        fn check_entity<'gc, C: CustomTypes<StdSystem<C>>>(mc: &Mutation<'gc>, entity: &mut Entity<'gc, C, StdSystem<C>>) {
335            check_symbols(mc, &entity.fields);
336            if let Some(original) = entity.original {
337                check_entity(mc, &mut *original.borrow_mut(mc));
338            }
339        }
340
341        let global_context = proc.global_context.borrow_mut(mc);
342        check_symbols(mc, &global_context.globals);
343        for entry in proc.get_call_stack() {
344            check_symbols(mc, &entry.locals);
345            check_entity(mc, &mut entry.entity.borrow_mut(mc));
346        }
347        for entity in global_context.entities.iter() {
348            check_entity(mc, &mut *entity.1.borrow_mut(mc));
349        }
350    }
351}
352impl<C: CustomTypes<StdSystem<C>>> System<C> for StdSystem<C> {
353    type RequestKey = AsyncKey<Result<C::Intermediate, CompactString>>;
354    type CommandKey = AsyncKey<Result<(), CompactString>>;
355
356    fn rand<T: SampleUniform, R: SampleRange<T>>(&self, range: R) -> T {
357        self.rng.lock().unwrap().random_range(range)
358    }
359
360    fn time(&self, precision: Precision) -> SysTime {
361        SysTime::Real { local: self.clock.read(precision) }
362    }
363
364    fn perform_request<'gc>(&self, mc: &Mutation<'gc>, request: Request<'gc, C, Self>, proc: &mut Process<'gc, C, Self>) -> Result<Self::RequestKey, ErrorCause<C, Self>> {
365        #[cfg(debug_assertions)]
366        Self::check_runtime_borrows(mc, proc);
367
368        Ok(match self.config.request.as_ref() {
369            Some(handler) => {
370                let key = AsyncKey::new();
371                match handler(mc, key.clone(), request, proc) {
372                    RequestStatus::Handled => key,
373                    RequestStatus::UseDefault { key: _, request } => return Err(ErrorCause::NotSupported { feature: request.feature() }),
374                }
375            }
376            None => return Err(ErrorCause::NotSupported { feature: request.feature() }),
377        })
378    }
379    fn poll_request<'gc>(&self, mc: &Mutation<'gc>, key: &Self::RequestKey, _proc: &mut Process<'gc, C, Self>) -> Result<AsyncResult<Result<Value<'gc, C, Self>, CompactString>>, ErrorCause<C, Self>> {
380        #[cfg(debug_assertions)]
381        Self::check_runtime_borrows(mc, _proc);
382
383        Ok(match key.poll() {
384            AsyncResult::Completed(Ok(x)) => AsyncResult::Completed(Ok(C::from_intermediate(mc, x))),
385            AsyncResult::Completed(Err(x)) => AsyncResult::Completed(Err(x)),
386            AsyncResult::Pending => AsyncResult::Pending,
387            AsyncResult::Consumed => AsyncResult::Consumed,
388        })
389    }
390
391    fn perform_command<'gc>(&self, mc: &Mutation<'gc>, command: Command<'gc, '_, C, Self>, proc: &mut Process<'gc, C, Self>) -> Result<Self::CommandKey, ErrorCause<C, Self>> {
392        #[cfg(debug_assertions)]
393        Self::check_runtime_borrows(mc, proc);
394
395        Ok(match self.config.command.as_ref() {
396            Some(handler) => {
397                let key = AsyncKey::new();
398                match handler(mc, key.clone(), command, proc) {
399                    CommandStatus::Handled => key,
400                    CommandStatus::UseDefault { key: _, command } => return Err(ErrorCause::NotSupported { feature: command.feature() }),
401                }
402            }
403            None => return Err(ErrorCause::NotSupported { feature: command.feature() }),
404        })
405    }
406    fn poll_command<'gc>(&self, _mc: &Mutation<'gc>, key: &Self::CommandKey, _proc: &mut Process<'gc, C, Self>) -> Result<AsyncResult<Result<(), CompactString>>, ErrorCause<C, Self>> {
407        #[cfg(debug_assertions)]
408        Self::check_runtime_borrows(_mc, _proc);
409
410        Ok(key.poll())
411    }
412
413    fn send_message(&self, msg_type: CompactString, values: VecMap<CompactString, Json, false>, targets: Vec<CompactString>, expect_reply: bool) -> Result<Option<ExternReplyKey>, ErrorCause<C, StdSystem<C>>> {
414        let (msg, reply_key) = match expect_reply {
415            false => (OutgoingMessage::Normal { msg_type, values, targets }, None),
416            true => {
417                let reply_key = ExternReplyKey { request_id: format_compact!("{}", Uuid::new_v4()) };
418                let expiry = self.clock.read(Precision::Medium) + MESSAGE_REPLY_TIMEOUT;
419                self.message_replies.lock().unwrap().insert(reply_key.clone(), ReplyEntry { expiry, value: None });
420                (OutgoingMessage::Blocking { msg_type, values, targets, reply_key: reply_key.clone() }, Some(reply_key))
421            }
422        };
423        self.message_sender.send(msg).unwrap();
424        Ok(reply_key)
425    }
426    fn poll_reply(&self, key: &ExternReplyKey) -> AsyncResult<Option<Json>> {
427        let mut message_replies = self.message_replies.lock().unwrap();
428        let entry = message_replies.get(key).unwrap();
429        if entry.value.is_some() {
430            return AsyncResult::Completed(message_replies.remove(key).unwrap().value);
431        }
432        if self.clock.read(Precision::Low) > entry.expiry {
433            message_replies.remove(key).unwrap();
434            return AsyncResult::Completed(None);
435        }
436        AsyncResult::Pending
437    }
438    fn send_reply(&self, key: InternReplyKey, value: Json) -> Result<(), ErrorCause<C, Self>> {
439        self.message_sender.send(OutgoingMessage::Reply { value, reply_key: key }).unwrap();
440        Ok(())
441    }
442    fn receive_message(&self) -> Option<IncomingMessage> {
443        self.message_receiver.try_recv().ok()
444    }
445}