1#![deny(
13 missing_docs,
14 trivial_numeric_casts,
15 unstable_features,
16 unused_extern_crates,
17 unused_features
18)]
19#![warn(unused_import_braces, unused_parens)]
20#![cfg_attr(feature = "clippy", plugin(clippy(conf_file = "../../clippy.toml")))]
21#![cfg_attr(
22 feature = "cargo-clippy",
23 allow(clippy::new_without_default, clippy::new_without_default)
24)]
25#![cfg_attr(
26 feature = "cargo-clippy",
27 warn(
28 clippy::float_arithmetic,
29 clippy::mut_mut,
30 clippy::nonminimal_bool,
31 clippy::option_map_unwrap_or,
32 clippy::option_map_unwrap_or_else,
33 clippy::unicode_not_nfc,
34 clippy::use_self
35 )
36)]
37
38pub use failure::Error;
39pub mod error;
40pub mod http;
41mod http_proto;
42pub mod kv;
43mod proto;
44mod task;
45
46use crate::prelude::*;
47use std::{
48 future::Future,
49 pin::Pin,
50 task::{Context, Poll},
51 {io, time},
52};
53
54pub mod prelude {
55 pub use std::io::Read as _;
63 pub use std::io::Write as _;
64}
65
66use std::sync::Mutex;
67#[macro_use]
68extern crate lazy_static;
69
70use std::collections::btree_set::BTreeSet;
71lazy_static! {
72 static ref EVENT_REGISTRY: Mutex<BTreeSet<i32>> = { Mutex::new(BTreeSet::new()) };
73 }
75
76#[derive(Debug, Default)]
130pub struct Conn {
131 id: i32,
132 polled: bool,
133}
134
135impl Conn {
136 fn new(id: i32) -> Self {
137 Self { id, polled: false }
138 }
139
140 pub fn bytes(&mut self) -> Result<Vec<u8>, Error> {
142 let mut buffer = Vec::new();
143 self.read_to_end(&mut buffer)?;
144 Ok(buffer)
145 }
146 pub fn string(&mut self) -> Result<String, Error> {
148 let mut buffer = String::new();
149 self.read_to_string(&mut buffer)?;
150 Ok(buffer)
151 }
152 pub fn wait(&self) -> Result<(), Error> {
176 wait_id(self.id)
177 }
178}
179
180impl Copy for Conn {}
181impl Clone for Conn {
182 fn clone(&self) -> Self {
183 Self {
184 id: self.id,
185 polled: self.polled,
186 }
187 }
188}
189
190pub fn spawn_function(name: &str) -> Result<Conn, Error> {
210 Ok(Conn::new(spawn(name)?))
211}
212
213pub fn spawn_and_send(name: &str, payload: &[u8]) -> Result<Conn, Error> {
215 let mut conn = spawn_function(name)?;
216 conn.write(&payload)?;
217 Ok(conn)
218}
219
220fn process_event_ids(ids: Vec<i32>) {
221 let mut er = EVENT_REGISTRY.lock().unwrap();
222 for id in ids {
224 er.insert(id);
228 }
229}
230
231fn has_id(id: i32) -> bool {
232 let er = EVENT_REGISTRY.lock().unwrap();
233 er.contains(&id)
234}
235
236#[allow(dead_code)]
238fn remove_id(id: i32) {
239 let mut er = EVENT_REGISTRY.lock().unwrap();
240 er.remove(&id);
241}
242
243impl Future for Conn {
244 type Output = Result<(), Error>;
245
246 fn poll(mut self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Self::Output> {
247 let timeout = if self.polled {
250 None
251 } else {
252 self.polled = true;
253 Some(time::Duration::new(0, 0))
254 };
255 let ids = events(timeout).expect("how do we handle this error");
256 process_event_ids(ids);
257 if has_id(self.id) {
258 self.polled = false;
259 Poll::Ready(Ok(()))
260 } else {
261 Poll::Pending
262 }
263 }
264}
265
266fn wait_id(id: i32) -> Result<(), Error> {
267 let mut timeout = Some(time::Duration::new(0, 0));
268 loop {
269 let ids = events(timeout)?;
270 process_event_ids(ids);
271 if has_id(id) {
272 break;
273 }
274 timeout = None;
276 }
277 Ok(())
278}
279
280#[cfg(all(target_arch = "wasm32"))]
281impl io::Read for Conn {
282 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
283 remove_id(self.id);
284 read(self.id, buf)
285 }
286}
287
288#[cfg(all(target_arch = "wasm32"))]
289impl io::Write for Conn {
290 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
291 write(self.id, buf)
292 }
293 fn flush(&mut self) -> io::Result<()> {
294 Ok(())
295 }
296}
297
298#[cfg(not(target_arch = "wasm32"))]
299impl io::Read for Conn {
300 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
301 read(self.id, buf)
302 }
303}
304
305#[cfg(not(target_arch = "wasm32"))]
306impl io::Write for Conn {
307 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
308 write(self.id, buf)
309 }
310 fn flush(&mut self) -> io::Result<()> {
311 Ok(())
312 }
313}
314
315#[cfg(all(target_arch = "wasm32"))]
316#[link(wasm_import_module = "embly")]
317extern "C" {
318 fn _read(id: i32, payload: *const u8, payload_len: u32, ln: *mut i32) -> u16;
319 fn _write(id: i32, payload: *const u8, payload_len: u32, ln: *mut i32) -> u16;
320 fn _spawn(name: *const u8, name_len: u32, id: *mut i32) -> u16;
321 fn _events(
322 non_blocking: u8,
323 timeout_s: u64,
324 timeout_ns: u32,
325 ids: *const i32,
326 ids_len: u32,
327 ln: *mut i32,
328 ) -> u16;
329}
330
331#[cfg(not(target_arch = "wasm32"))]
332unsafe fn _events(
333 _non_blocking: u8,
334 _timeout_s: u64,
335 _timeout_ns: u32,
336 _ids: *const i32,
337 _ids_len: u32,
338 _ln: *mut i32,
339) -> u16 {
340 0
341}
342
343#[cfg(not(target_arch = "wasm32"))]
344unsafe fn _read(_id: i32, _payload: *const u8, _payload_len: u32, ln: *mut i32) -> u16 {
345 *ln = 0;
347 0
348}
349
350#[cfg(not(target_arch = "wasm32"))]
351unsafe fn _write(_id: i32, _payload: *const u8, payload_len: u32, ln: *mut i32) -> u16 {
352 *ln = payload_len as i32;
354 0
355}
356
357#[cfg(not(target_arch = "wasm32"))]
358unsafe fn _spawn(_name: *const u8, _name_len: u32, id: *mut i32) -> u16 {
359 *id = 1;
361 0
362}
363
364fn read(id: i32, payload: &mut [u8]) -> io::Result<usize> {
365 let mut ln: i32 = 0;
366 let ln_ptr: *mut i32 = &mut ln;
367 error::wasi_err_to_io_err(unsafe {
368 _read(id, payload.as_ptr(), payload.len() as u32, ln_ptr)
369 })?;
370 Ok(ln as usize)
371}
372
373fn write(id: i32, payload: &[u8]) -> io::Result<usize> {
374 let mut ln: i32 = 0;
375 let ln_ptr: *mut i32 = &mut ln;
376 error::wasi_err_to_io_err(unsafe {
377 _write(id, payload.as_ptr(), payload.len() as u32, ln_ptr)
378 })?;
379 Ok(ln as usize)
380}
381
382fn spawn(name: &str) -> Result<i32, Error> {
383 let mut id: i32 = 0;
384 let id_ptr: *mut i32 = &mut id;
385 error::wasi_err_to_io_err(unsafe { _spawn(name.as_ptr(), name.len() as u32, id_ptr) })?;
386 Ok(id)
387}
388
389fn events(timeout: Option<time::Duration>) -> Result<Vec<i32>, Error> {
390 let mut ln: i32 = 0;
391 let ln_ptr: *mut i32 = &mut ln;
392 let out: [i32; 10] = [0; 10];
393 let mut timeout_s: u64 = 0;
394 let mut timeout_ns: u32 = 0;
395 let mut non_blocking: u8 = 0;
396 if let Some(dur) = timeout {
397 timeout_s = dur.as_secs();
398 timeout_ns = dur.subsec_nanos();
399 } else {
400 non_blocking = 1
401 };
402 error::wasi_err_to_io_err(unsafe {
403 _events(
404 non_blocking,
405 timeout_s,
406 timeout_ns,
407 out.as_ptr(),
408 out.len() as u32,
409 ln_ptr,
410 )
411 })?;
412 Ok(out[..(ln as usize)].to_vec())
413}
414
415pub fn run<F>(to_run: fn(Conn) -> F)
441where
442 F: Future<Output = ()> + 'static,
443{
444 let c = Conn::new(1);
445 task::Task::spawn(Box::pin(to_run(c)));
446}
447
448pub fn run_catch_error<F>(to_run: fn(Conn) -> F)
467where
468 F: Future<Output = Result<(), Error>> + 'static,
469{
470 let c = Conn::new(1);
471 task::Task::spawn(Box::pin(async move {
472 match to_run(c).await {
473 Ok(_) => {}
474 Err(err) => println!("got error: {}", err),
475 };
476 }));
477}