1use alloc::string::ToString;
6use alloc::collections::BTreeMap;
7use alloc::borrow::ToOwned;
8use alloc::vec::Vec;
9use alloc::rc::Rc;
10
11use std::time::{Duration, SystemTime, UNIX_EPOCH};
12use std::sync::{Arc, Mutex};
13use std::sync::mpsc::{Sender, Receiver, channel};
14use std::thread;
15
16use rand::distr::uniform::{SampleUniform, SampleRange};
17use rand_chacha::ChaChaRng;
18use rand::{Rng, SeedableRng};
19use tokio_tungstenite::tungstenite::Message;
20use futures::{StreamExt, SinkExt};
21use uuid::Uuid;
22
23use crate::runtime::*;
24use crate::process::*;
25use crate::json::*;
26use crate::gc::*;
27use crate::std_util::*;
28use crate::vecmap::*;
29use crate::compact_str::*;
30use crate::*;
31
32const MESSAGE_REPLY_TIMEOUT: Duration = Duration::from_millis(1500);
33
34async fn call_rpc_async<C: CustomTypes<S>, S: System<C>>(context: &NetsBloxContext, client: &reqwest::Client, host: Option<&str>, service: &str, rpc: &str, args: &[(&str, &Json)]) -> Result<SimpleValue, CompactString> {
35 let time = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_millis();
36 let url = format!("{service_host}/{service}/{rpc}?clientId={client_id}&t={time}",
37 service_host = host.unwrap_or(context.default_service_host.as_str()), client_id = context.client_id);
38 let args = args.iter().copied().collect::<BTreeMap<_,_>>();
39
40 let res = match client.post(url).json(&args).send().await {
41 Ok(x) => x,
42 Err(_) => return Err(format_compact!("Failed to reach {}", context.base_url)),
43 };
44
45 let content_type = res.headers().get("Content-Type").and_then(|x| CompactString::from_utf8(x.as_bytes().to_owned()).ok()).map(|x| x.to_lowercase()).unwrap_or_else(|| "unknown".into());
46 let status = res.status();
47
48 let res = match res.bytes().await {
49 Ok(res) => (*res).to_owned(),
50 Err(_) => return Err(CompactString::new("Failed to read response body")),
51 };
52
53 if !status.is_success() {
54 return Err(CompactString::from_utf8(res).ok().unwrap_or_else(|| "Received ill-formed error message".into()));
55 }
56
57 if content_type.contains("image/") {
58 Ok(SimpleValue::Image(Image { content: res, center: None, name: "untitled".into() }))
59 } else if content_type.contains("audio/") {
60 Ok(SimpleValue::Audio(Audio { content: res, name: "untitled".into() }))
61 } else if let Some(x) = parse_json_slice::<Json>(&res).ok() {
62 SimpleValue::from_netsblox_json(x).map_err(|e| format_compact!("Received ill-formed success value: {e:?}"))
63 } else if let Ok(x) = CompactString::from_utf8(res) {
64 Ok(SimpleValue::Text(x))
65 } else {
66 Err("Received ill-formed success value".into())
67 }
68}
69
70pub struct StdSystem<C: CustomTypes<StdSystem<C>>> {
76 config: Config<C, Self>,
77 context: Arc<NetsBloxContext>,
78 client: reqwest::Client,
79 rng: Mutex<ChaChaRng>,
80 clock: Arc<Clock>,
81
82 rpc_request_pipe: Sender<RpcRequest<C, Self>>,
83
84 message_replies: Arc<Mutex<BTreeMap<ExternReplyKey, ReplyEntry>>>,
85 message_sender: Sender<OutgoingMessage>,
86 message_injector: Sender<IncomingMessage>,
87 message_receiver: Receiver<IncomingMessage>,
88}
89impl<C: CustomTypes<StdSystem<C>>> StdSystem<C> {
90 #[tokio::main(flavor = "current_thread")]
93 pub async fn new_sync(base_url: CompactString, project_name: Option<&str>, config: Config<C, Self>, clock: Arc<Clock>) -> Self {
94 Self::new_async(base_url, project_name, config, clock).await
95 }
96 pub async fn new_async(base_url: CompactString, project_name: Option<&str>, config: Config<C, Self>, clock: Arc<Clock>) -> Self {
98 let client = reqwest::Client::builder().build().unwrap();
99 let default_service_host = {
100 let configuration = client.get(format!("{base_url}/configuration")).send().await.unwrap().json::<BTreeMap<CompactString, Json>>().await.unwrap();
101 let services_hosts = configuration["servicesHosts"].as_array().unwrap();
102 services_hosts[0].as_object().unwrap().get("url").unwrap().as_str().unwrap().into()
103 };
104
105 let mut context = NetsBloxContext {
106 base_url,
107 default_service_host,
108 client_id: format_compact!("_vm-{}", names::Generator::default().next().unwrap()),
109 project_name: project_name.unwrap_or("untitled").into(),
110
111 project_id: CompactString::default(),
112 role_name: CompactString::default(),
113 role_id: CompactString::default(),
114 };
115
116 let message_replies = Arc::new(Mutex::new(Default::default()));
117 let (message_sender, message_receiver, message_injector, ws_finish_flag) = {
118 let (base_url, client_id, project_name, message_replies) = (context.base_url.clone(), context.client_id.clone(), context.project_name.clone(), message_replies.clone());
119 let (out_sender, out_receiver) = channel();
120 let (in_sender, in_receiver) = channel();
121 let finish_flag = Arc::new(());
122
123 #[tokio::main(flavor = "multi_thread", worker_threads = 1)]
124 async fn handler(base_url: CompactString, client_id: CompactString, project_name: CompactString, message_replies: Arc<Mutex<BTreeMap<ExternReplyKey, ReplyEntry>>>, out_receiver: Receiver<OutgoingMessage>, in_sender: Sender<IncomingMessage>, finish_flag: Arc<()>) {
125 let ws_url = format!("{}/network/{client_id}/connect", if let Some(x) = base_url.strip_prefix("http") { format!("ws{x}") } else { format!("wss://{base_url}") });
126 let (ws, _) = tokio_tungstenite::connect_async(ws_url).await.unwrap();
127 let (mut ws_sender, ws_receiver) = ws.split();
128 let (ws_sender_sender, ws_sender_receiver) = async_channel::unbounded();
129
130 tokio::spawn(async move {
131 while let Ok(msg) = ws_sender_receiver.recv().await {
132 ws_sender.send(msg).await.unwrap();
133 }
134 });
135
136 let ws_sender_sender_clone = ws_sender_sender.clone();
137 tokio::spawn(async move {
138 ws_receiver.for_each(move |packet| {
139 let ws_sender_sender_clone = ws_sender_sender_clone.clone();
140 let in_sender = in_sender.clone();
141 let message_replies = message_replies.clone();
142 async move {
143 let mut msg = match packet {
144 Ok(Message::Text(raw)) => match parse_json::<BTreeMap<CompactString, Json>>(&raw) {
145 Ok(x) => x,
146 Err(_) => return,
147 }
148 _ => return,
149 };
150 match msg.get("type").and_then(|x| x.as_str()).unwrap_or("unknown") {
151 "ping" => ws_sender_sender_clone.send(Message::Text(json!({ "type": "pong" }).to_string().into())).await.unwrap(),
152 "message" => {
153 let (msg_type, values) = match (msg.remove("msgType"), msg.remove("content")) {
154 (Some(Json::String(msg_type)), Some(Json::Object(values))) => (msg_type.into(), values),
155 _ => return,
156 };
157 if msg_type == "__reply__" {
158 let (value, reply_key) = match ({ values }.remove("body"), msg.remove("requestId")) {
159 (Some(value), Some(Json::String(request_id))) => (value, ExternReplyKey { request_id: request_id.into() }),
160 _ => return,
161 };
162 if let Some(entry) = message_replies.lock().unwrap().get_mut(&reply_key) {
163 if entry.value.is_none() {
164 entry.value = Some(value);
165 }
166 }
167 } else {
168 let reply_key = match msg.contains_key("requestId") {
169 true => match (msg.remove("srcId"), msg.remove("requestId")) {
170 (Some(Json::String(src_id)), Some(Json::String(request_id))) => Some(InternReplyKey { src_id: src_id.into(), request_id: request_id.into() }),
171 _ => return,
172 }
173 false => None,
174 };
175 let values = values.into_iter().filter_map(|(k, v)| SimpleValue::from_netsblox_json(v).ok().map(|v| (k.into(), v))).collect();
176 in_sender.send(IncomingMessage { msg_type, values, reply_key }).unwrap();
177 }
178 }
179 _ => (),
180 }
181 }
182 }).await;
183 });
184
185 ws_sender_sender.send(Message::Text(json!({ "type": "set-uuid", "clientId": client_id }).to_string().into())).await.unwrap();
186 drop(finish_flag);
187
188 let src_id = format_compact!("{project_name}@{client_id}#vm");
189 fn resolve_targets<'a>(targets: &'a mut [CompactString], src_id: &CompactString) -> &'a mut [CompactString] {
190 for target in targets.iter_mut() {
191 if *target == "everyone in room" {
192 target.clone_from(src_id);
193 }
194 }
195 targets
196 }
197 while let Ok(request) = out_receiver.recv() {
198 let msg = match request {
199 OutgoingMessage::Normal { msg_type, values, mut targets } => json!({
200 "type": "message",
201 "dstId": resolve_targets(&mut targets, &src_id),
202 "srcId": src_id,
203 "msgType": msg_type,
204 "content": values.into_iter().collect::<BTreeMap<_,_>>(),
205 }),
206 OutgoingMessage::Blocking { msg_type, values, mut targets, reply_key } => json!({
207 "type": "message",
208 "dstId": resolve_targets(&mut targets, &src_id),
209 "srcId": src_id,
210 "msgType": msg_type,
211 "requestId": reply_key.request_id,
212 "content": values.into_iter().collect::<BTreeMap<_,_>>(),
213 }),
214 OutgoingMessage::Reply { value, reply_key } => json!({
215 "type": "message",
216 "dstId": reply_key.src_id,
217 "msgType": "__reply__",
218 "requestId": reply_key.request_id,
219 "content": { "body": value },
220 }),
221 };
222 ws_sender_sender.send(Message::Text(msg.to_string().into())).await.unwrap();
223 }
224 }
225 let in_sender_clone = in_sender.clone();
226 let finish_flag_clone = finish_flag.clone();
227 thread::spawn(move || handler(base_url, client_id, project_name, message_replies, out_receiver, in_sender_clone, finish_flag_clone));
228
229 (out_sender, in_receiver, in_sender, Arc::downgrade(&finish_flag))
230 };
231
232 while ws_finish_flag.upgrade().is_some() {
233 tokio::time::sleep(Duration::from_millis(10)).await;
234 }
235
236 let meta = client.post(format!("{}/projects/", context.base_url))
237 .json(&json!({ "clientId": context.client_id, "name": context.project_name }))
238 .send().await.unwrap()
239 .json::<BTreeMap<CompactString, Json>>().await.unwrap();
240 context.project_id = meta["id"].as_str().unwrap().into();
241
242 let roles = meta["roles"].as_object().unwrap();
243 let (first_role_id, first_role_meta) = roles.get_key_value(roles.keys().next().unwrap()).unwrap();
244 let first_role_meta = first_role_meta.as_object().unwrap();
245 context.role_id = first_role_id.into();
246 context.role_name = first_role_meta.get("name").unwrap().as_str().unwrap().into();
247
248 client.post(format!("{}/network/{}/state", context.base_url, context.client_id))
249 .json(&json!({ "state": { "external": { "address": context.project_name, "appId": "vm" } } }))
250 .send().await.unwrap();
251
252 let context = Arc::new(context);
253 let rpc_request_pipe = {
254 let (client, context) = (client.clone(), context.clone());
255 let (sender, receiver) = channel();
256
257 #[tokio::main(flavor = "multi_thread", worker_threads = 1)]
258 async fn handler<C: CustomTypes<StdSystem<C>>>(client: reqwest::Client, context: Arc<NetsBloxContext>, receiver: Receiver<RpcRequest<C, StdSystem<C>>>) {
259 while let Ok(RpcRequest { key, host, service, rpc, args }) = receiver.recv() {
260 let (client, context) = (client.clone(), context.clone());
261 tokio::spawn(async move {
262 let res = call_rpc_async::<C, StdSystem<C>>(&context, &client, host.as_deref(), &service, &rpc, &args.iter().map(|x| (x.0.as_str(), x.1)).collect::<Vec<_>>()).await;
263 key.complete(res.map(Into::into));
264 });
265 }
266 }
267 thread::spawn(move || handler(client, context, receiver));
268
269 sender
270 };
271
272 let mut seed: <ChaChaRng as SeedableRng>::Seed = Default::default();
273 getrandom::fill(&mut seed).expect("failed to generate random seed");
274
275 let context_clone = context.clone();
276 let config = config.fallback(&Config {
277 request: Some(Rc::new(move |_, key, request, proc| {
278 match request {
279 Request::Rpc { host, service, rpc, args } => match (host.as_deref(), service.as_str(), rpc.as_str(), args.as_slice()) {
280 (_, "PublicRoles", "getPublicRoleId", []) => {
281 key.complete(Ok(SimpleValue::Text(format_compact!("{}@{}#vm", context_clone.project_name, context_clone.client_id)).into()));
282 RequestStatus::Handled
283 }
284 _ => {
285 match args.into_iter().map(|(k, v)| Ok((k, v.to_simple()?.into_netsblox_json()))).collect::<Result<_,ToSimpleError<_,_>>>() {
286 Ok(args) => proc.global_context.borrow().system.rpc_request_pipe.send(RpcRequest { host, service, rpc, args, key }).unwrap(),
287 Err(err) => key.complete(Err(format_compact!("failed to convert RPC args to json: {err:?}"))),
288 }
289 RequestStatus::Handled
290 }
291 }
292 _ => RequestStatus::UseDefault { key, request },
293 }
294 })),
295 command: None,
296 });
297
298 Self {
299 config, context, client, clock,
300 rng: Mutex::new(ChaChaRng::from_seed(seed)),
301 rpc_request_pipe,
302 message_replies, message_sender, message_receiver, message_injector,
303 }
304 }
305
306 pub async fn call_rpc_async(&self, host: Option<&str>, service: &str, rpc: &str, args: &[(&str, &Json)]) -> Result<SimpleValue, CompactString> {
309 call_rpc_async::<C, Self>(&self.context, &self.client, host, service, rpc, args).await
310 }
311
312 pub fn get_public_id(&self) -> CompactString {
314 format_compact!("{}@{}#vm", self.context.project_name, self.context.client_id)
315 }
316
317 pub fn inject_message(&self, msg_type: CompactString, values: VecMap<CompactString, SimpleValue, false>) {
319 self.message_injector.send(IncomingMessage { msg_type, values, reply_key: None }).unwrap();
320 }
321
322 #[cfg(debug_assertions)]
323 fn check_runtime_borrows<'gc>(mc: &Mutation<'gc>, proc: &mut Process<'gc, C, Self>) {
324 fn check_symbols<'gc, C: CustomTypes<StdSystem<C>>>(mc: &Mutation<'gc>, symbols: &SymbolTable<'gc, C, StdSystem<C>>) {
325 for symbol in symbols.iter() {
326 match &*symbol.1.get() {
327 Value::Bool(_) | Value::Number(_) | Value::Text(_) | Value::Audio(_) | Value::Image(_) | Value::Native(_) => (),
328 Value::List(x) => { x.borrow_mut(mc); }
329 Value::Closure(x) => { x.borrow_mut(mc); }
330 Value::Entity(x) => { x.borrow_mut(mc); }
331 }
332 }
333 }
334 fn check_entity<'gc, C: CustomTypes<StdSystem<C>>>(mc: &Mutation<'gc>, entity: &mut Entity<'gc, C, StdSystem<C>>) {
335 check_symbols(mc, &entity.fields);
336 if let Some(original) = entity.original {
337 check_entity(mc, &mut *original.borrow_mut(mc));
338 }
339 }
340
341 let global_context = proc.global_context.borrow_mut(mc);
342 check_symbols(mc, &global_context.globals);
343 for entry in proc.get_call_stack() {
344 check_symbols(mc, &entry.locals);
345 check_entity(mc, &mut entry.entity.borrow_mut(mc));
346 }
347 for entity in global_context.entities.iter() {
348 check_entity(mc, &mut *entity.1.borrow_mut(mc));
349 }
350 }
351}
352impl<C: CustomTypes<StdSystem<C>>> System<C> for StdSystem<C> {
353 type RequestKey = AsyncKey<Result<C::Intermediate, CompactString>>;
354 type CommandKey = AsyncKey<Result<(), CompactString>>;
355
356 fn rand<T: SampleUniform, R: SampleRange<T>>(&self, range: R) -> T {
357 self.rng.lock().unwrap().random_range(range)
358 }
359
360 fn time(&self, precision: Precision) -> SysTime {
361 SysTime::Real { local: self.clock.read(precision) }
362 }
363
364 fn perform_request<'gc>(&self, mc: &Mutation<'gc>, request: Request<'gc, C, Self>, proc: &mut Process<'gc, C, Self>) -> Result<Self::RequestKey, ErrorCause<C, Self>> {
365 #[cfg(debug_assertions)]
366 Self::check_runtime_borrows(mc, proc);
367
368 Ok(match self.config.request.as_ref() {
369 Some(handler) => {
370 let key = AsyncKey::new();
371 match handler(mc, key.clone(), request, proc) {
372 RequestStatus::Handled => key,
373 RequestStatus::UseDefault { key: _, request } => return Err(ErrorCause::NotSupported { feature: request.feature() }),
374 }
375 }
376 None => return Err(ErrorCause::NotSupported { feature: request.feature() }),
377 })
378 }
379 fn poll_request<'gc>(&self, mc: &Mutation<'gc>, key: &Self::RequestKey, _proc: &mut Process<'gc, C, Self>) -> Result<AsyncResult<Result<Value<'gc, C, Self>, CompactString>>, ErrorCause<C, Self>> {
380 #[cfg(debug_assertions)]
381 Self::check_runtime_borrows(mc, _proc);
382
383 Ok(match key.poll() {
384 AsyncResult::Completed(Ok(x)) => AsyncResult::Completed(Ok(C::from_intermediate(mc, x))),
385 AsyncResult::Completed(Err(x)) => AsyncResult::Completed(Err(x)),
386 AsyncResult::Pending => AsyncResult::Pending,
387 AsyncResult::Consumed => AsyncResult::Consumed,
388 })
389 }
390
391 fn perform_command<'gc>(&self, mc: &Mutation<'gc>, command: Command<'gc, '_, C, Self>, proc: &mut Process<'gc, C, Self>) -> Result<Self::CommandKey, ErrorCause<C, Self>> {
392 #[cfg(debug_assertions)]
393 Self::check_runtime_borrows(mc, proc);
394
395 Ok(match self.config.command.as_ref() {
396 Some(handler) => {
397 let key = AsyncKey::new();
398 match handler(mc, key.clone(), command, proc) {
399 CommandStatus::Handled => key,
400 CommandStatus::UseDefault { key: _, command } => return Err(ErrorCause::NotSupported { feature: command.feature() }),
401 }
402 }
403 None => return Err(ErrorCause::NotSupported { feature: command.feature() }),
404 })
405 }
406 fn poll_command<'gc>(&self, _mc: &Mutation<'gc>, key: &Self::CommandKey, _proc: &mut Process<'gc, C, Self>) -> Result<AsyncResult<Result<(), CompactString>>, ErrorCause<C, Self>> {
407 #[cfg(debug_assertions)]
408 Self::check_runtime_borrows(_mc, _proc);
409
410 Ok(key.poll())
411 }
412
413 fn send_message(&self, msg_type: CompactString, values: VecMap<CompactString, Json, false>, targets: Vec<CompactString>, expect_reply: bool) -> Result<Option<ExternReplyKey>, ErrorCause<C, StdSystem<C>>> {
414 let (msg, reply_key) = match expect_reply {
415 false => (OutgoingMessage::Normal { msg_type, values, targets }, None),
416 true => {
417 let reply_key = ExternReplyKey { request_id: format_compact!("{}", Uuid::new_v4()) };
418 let expiry = self.clock.read(Precision::Medium) + MESSAGE_REPLY_TIMEOUT;
419 self.message_replies.lock().unwrap().insert(reply_key.clone(), ReplyEntry { expiry, value: None });
420 (OutgoingMessage::Blocking { msg_type, values, targets, reply_key: reply_key.clone() }, Some(reply_key))
421 }
422 };
423 self.message_sender.send(msg).unwrap();
424 Ok(reply_key)
425 }
426 fn poll_reply(&self, key: &ExternReplyKey) -> AsyncResult<Option<Json>> {
427 let mut message_replies = self.message_replies.lock().unwrap();
428 let entry = message_replies.get(key).unwrap();
429 if entry.value.is_some() {
430 return AsyncResult::Completed(message_replies.remove(key).unwrap().value);
431 }
432 if self.clock.read(Precision::Low) > entry.expiry {
433 message_replies.remove(key).unwrap();
434 return AsyncResult::Completed(None);
435 }
436 AsyncResult::Pending
437 }
438 fn send_reply(&self, key: InternReplyKey, value: Json) -> Result<(), ErrorCause<C, Self>> {
439 self.message_sender.send(OutgoingMessage::Reply { value, reply_key: key }).unwrap();
440 Ok(())
441 }
442 fn receive_message(&self) -> Option<IncomingMessage> {
443 self.message_receiver.try_recv().ok()
444 }
445}