Skip to main content

embly/
lib.rs

1//! Embly is a serverless webassembly runtime. It runs small isolated functions.
2//! Functions can do a handful of things:
3//!
4//! - Receive bytes
5//! - Send bytes
6//! - Spawn a new function
7//!
8//! This library is used to access embly functionality from within a program
9//! being run by Embly.
10//!
11
12#![deny(
13    missing_docs,
14    trivial_numeric_casts,
15    unstable_features,
16    unused_extern_crates,
17    unused_features
18)]
19#![warn(unused_import_braces, unused_parens)]
20#![cfg_attr(feature = "clippy", plugin(clippy(conf_file = "../../clippy.toml")))]
21#![cfg_attr(
22    feature = "cargo-clippy",
23    allow(clippy::new_without_default, clippy::new_without_default)
24)]
25#![cfg_attr(
26    feature = "cargo-clippy",
27    warn(
28        clippy::float_arithmetic,
29        clippy::mut_mut,
30        clippy::nonminimal_bool,
31        clippy::option_map_unwrap_or,
32        clippy::option_map_unwrap_or_else,
33        clippy::unicode_not_nfc,
34        clippy::use_self
35    )
36)]
37
38pub use failure::Error;
39pub mod error;
40pub mod http;
41mod http_proto;
42pub mod kv;
43mod proto;
44mod task;
45
46use crate::prelude::*;
47use std::{
48    future::Future,
49    pin::Pin,
50    task::{Context, Poll},
51    {io, time},
52};
53
pub mod prelude {
    //! A "prelude" for crates using the `embly` crate.
    //!
    //! Glob-importing this module brings the `std::io::Read` and
    //! `std::io::Write` traits into scope anonymously, so their methods can be
    //! called without naming the traits. It is equivalent to:
    //! ```
    //! pub use std::io::Read as _;
    //! pub use std::io::Write as _;
    //! ```
    pub use std::io::{Read as _, Write as _};
}
65
66use std::sync::Mutex;
67#[macro_use]
68extern crate lazy_static;
69
70use std::collections::btree_set::BTreeSet;
71lazy_static! {
72    static ref EVENT_REGISTRY: Mutex<BTreeSet<i32>> = { Mutex::new(BTreeSet::new()) };
73    // static ref EVENT_WAKER_REGISTRY: Mutex<HashMap<i32, Waker>> = { Mutex::new(HashMap::new()) };
74}
75
/// Connections that handle communication between functions or gateways
///
/// ## Receive Bytes
///
/// When a function begins execution it can optionally read in any bytes that it might have
/// been sent. Maybe there are bytes ready on startup, maybe it'll receive them later.
///
/// ```rust
/// use embly::{Conn, Error};
/// use embly::prelude::*;
///
/// fn entrypoint(mut conn: Conn) -> Result<(), Error> {
///     let mut buffer = Vec::new();
///     // Conn implements std::io::Read
///     conn.wait()?;
///     conn.read_to_end(&mut buffer)?;
///
///     // a little while later you might get another message
///     conn.wait()?;
///     conn.read_to_end(&mut buffer)?;
///     return Ok(())
/// }
/// ```
///
/// ## Write Bytes
///
/// Bytes can be written back. A function is always executed by something. This could be a
/// command line call, a load balancer or another function. Writing to a connection will send
/// those bytes back to the function runner.
///
/// ```rust
/// use embly::Conn;
/// use embly::prelude::*;
/// use std::io;
///
/// fn entrypoint(mut conn: Conn) -> io::Result<()> {
///     // you can call write_all to send one message
///     conn.write_all("Hello World".as_bytes())?;
///
///     // Or you can make multiple calls with write if you want to construct a
///     // message and then flush the response
///     conn.write(b"Hello")?;
///     conn.write(b"World")?;
///     conn.flush()?;
///     return Ok(())
/// }
/// ```
#[derive(Debug, Default)]
pub struct Conn {
    // Identifier handed to the read/write/event helpers for this connection.
    id: i32,
    // Set on the first `Future::poll` of a wait and cleared again once the
    // connection's id shows up in the event registry.
    polled: bool,
}
134
135impl Conn {
136    fn new(id: i32) -> Self {
137        Self { id, polled: false }
138    }
139
140    /// Read any bytes available on the connection and return them.
141    pub fn bytes(&mut self) -> Result<Vec<u8>, Error> {
142        let mut buffer = Vec::new();
143        self.read_to_end(&mut buffer)?;
144        Ok(buffer)
145    }
146    /// Read bytes available on the connection and cast them to a string
147    pub fn string(&mut self) -> Result<String, Error> {
148        let mut buffer = String::new();
149        self.read_to_string(&mut buffer)?;
150        Ok(buffer)
151    }
152    /// Wait for bytes to be available on the connection
153    /// ```
154    /// use embly::{
155    ///     Error,
156    ///     prelude::*,
157    /// };
158    /// fn run(conn: embly::Conn) -> Result<(), Error> {
159    ///     conn.wait()
160    /// }
161    /// ```
162    ///
163    /// Conn implements `Future` so better to await instead:
164    /// ```
165    /// use embly::{
166    ///     Error,
167    ///     prelude::*,
168    /// };
169    ///
170    /// async fn run(conn: embly::Conn) -> Result<(), Error> {
171    ///     conn.await
172    /// }
173    ///
174    /// ```
175    pub fn wait(&self) -> Result<(), Error> {
176        wait_id(self.id)
177    }
178}
179
180impl Copy for Conn {}
181impl Clone for Conn {
182    fn clone(&self) -> Self {
183        Self {
184            id: self.id,
185            polled: self.polled,
186        }
187    }
188}
189
190/// Spawn a Function
191///
192/// ```
193/// use embly::{Conn, spawn_function};
194/// use embly::prelude::*;
195/// use failure::Error;
196///
197/// fn entrypoint(conn: Conn) -> Result<(), Error> {
198///     let mut foo = spawn_function("github.com/maxmcd/foo")?;
199///     foo.write_all("Hello".as_bytes())?;
200///
201///     // get a response back from  foo
202///     let mut buffer = Vec::new();
203///     foo.read_to_end(&mut buffer)?;
204///     Ok(())
205/// }
206///
207/// ```
208///
209pub fn spawn_function(name: &str) -> Result<Conn, Error> {
210    Ok(Conn::new(spawn(name)?))
211}
212
213/// spawn, but then immediately send bytes along afterward
214pub fn spawn_and_send(name: &str, payload: &[u8]) -> Result<Conn, Error> {
215    let mut conn = spawn_function(name)?;
216    conn.write(&payload)?;
217    Ok(conn)
218}
219
220fn process_event_ids(ids: Vec<i32>) {
221    let mut er = EVENT_REGISTRY.lock().unwrap();
222    // let mut ewr = EVENT_WAKER_REGISTRY.lock().unwrap();
223    for id in ids {
224        // if let Some(waker) = ewr.remove(&id) {
225        //     waker.wake()
226        // }
227        er.insert(id);
228    }
229}
230
231fn has_id(id: i32) -> bool {
232    let er = EVENT_REGISTRY.lock().unwrap();
233    er.contains(&id)
234}
235
236// only used by wasm
237#[allow(dead_code)]
238fn remove_id(id: i32) {
239    let mut er = EVENT_REGISTRY.lock().unwrap();
240    er.remove(&id);
241}
242
243impl Future for Conn {
244    type Output = Result<(), Error>;
245
246    fn poll(mut self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Self::Output> {
247        // Check for available events if we've never polled before, block
248        // indefinitely if we have
249        let timeout = if self.polled {
250            None
251        } else {
252            self.polled = true;
253            Some(time::Duration::new(0, 0))
254        };
255        let ids = events(timeout).expect("how do we handle this error");
256        process_event_ids(ids);
257        if has_id(self.id) {
258            self.polled = false;
259            Poll::Ready(Ok(()))
260        } else {
261            Poll::Pending
262        }
263    }
264}
265
266fn wait_id(id: i32) -> Result<(), Error> {
267    let mut timeout = Some(time::Duration::new(0, 0));
268    loop {
269        let ids = events(timeout)?;
270        process_event_ids(ids);
271        if has_id(id) {
272            break;
273        }
274        // the next call to events should block
275        timeout = None;
276    }
277    Ok(())
278}
279
/// `Read` for wasm32 builds: clears this connection's id from the event
/// registry, then reads through the host.
#[cfg(target_arch = "wasm32")]
impl io::Read for Conn {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        let id = self.id;
        // Remove the recorded event before reading.
        remove_id(id);
        read(id, buf)
    }
}
287
/// `Write` for wasm32 builds: each `write` goes straight to the host.
#[cfg(target_arch = "wasm32")]
impl io::Write for Conn {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        let id = self.id;
        write(id, buf)
    }

    /// Does nothing and always succeeds.
    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}
297
// `Read` for non-wasm32 builds. Unlike the wasm32 impl it does not touch the
// event registry; it only delegates to the file-level `read` helper.
#[cfg(not(target_arch = "wasm32"))]
impl io::Read for Conn {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        // On this target `read` ends up in the `_read` stub, which reports
        // zero bytes.
        read(self.id, buf)
    }
}
304
// `Write` for non-wasm32 builds; delegates to the file-level `write` helper.
#[cfg(not(target_arch = "wasm32"))]
impl io::Write for Conn {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        // On this target `write` ends up in the `_write` stub, which reports
        // the whole buffer as written.
        write(self.id, buf)
    }
    // Does nothing and always succeeds.
    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}
314
// Host functions imported from the `embly` wasm module (wasm32 builds only).
// Each returns a `u16` status code that the callers in this file pass to
// `error::wasi_err_to_io_err`.
#[cfg(all(target_arch = "wasm32"))]
#[link(wasm_import_module = "embly")]
extern "C" {
    // `ln` is an out-parameter; the caller returns its value as the byte count.
    // NOTE(review): `payload` is declared `*const` although the caller passes
    // a buffer it expects to be filled — confirm against the host.
    fn _read(id: i32, payload: *const u8, payload_len: u32, ln: *mut i32) -> u16;
    // `ln` is an out-parameter; the caller returns its value as the byte count.
    fn _write(id: i32, payload: *const u8, payload_len: u32, ln: *mut i32) -> u16;
    // `id` is an out-parameter; the caller wraps its value in a `Conn`.
    fn _spawn(name: *const u8, name_len: u32, id: *mut i32) -> u16;
    // `ids`/`ids_len` describe a caller-owned buffer and `ln` is an
    // out-parameter the caller uses as the number of ids to take from it.
    // NOTE(review): the exact meaning of `non_blocking` and the timeout pair
    // is defined by the host — confirm there.
    fn _events(
        non_blocking: u8,
        timeout_s: u64,
        timeout_ns: u32,
        ids: *const i32,
        ids_len: u32,
        ln: *mut i32,
    ) -> u16;
}
330
// Non-wasm32 stand-in for the host `_events` import. It ignores every
// argument, writes nothing through `_ln` or `_ids`, and returns status 0, so
// the caller sees whatever it initialised `ln` to.
#[cfg(not(target_arch = "wasm32"))]
unsafe fn _events(
    _non_blocking: u8,
    _timeout_s: u64,
    _timeout_ns: u32,
    _ids: *const i32,
    _ids_len: u32,
    _ln: *mut i32,
) -> u16 {
    0
}
342
/// Non-wasm32 stand-in for the host `_read` import: reports zero bytes read
/// and returns status 0 without touching the payload buffer.
///
/// # Safety
///
/// `ln` must be valid for a write of one `i32`.
#[cfg(not(target_arch = "wasm32"))]
unsafe fn _read(_id: i32, _payload: *const u8, _payload_len: u32, ln: *mut i32) -> u16 {
    // lie and say no bytes
    ln.write(0);
    0
}
349
/// Non-wasm32 stand-in for the host `_write` import: reports the full
/// `payload_len` as written and returns status 0 without reading the payload.
///
/// # Safety
///
/// `ln` must be valid for a write of one `i32`.
#[cfg(not(target_arch = "wasm32"))]
unsafe fn _write(_id: i32, _payload: *const u8, payload_len: u32, ln: *mut i32) -> u16 {
    // lie and say we write things
    ln.write(payload_len as i32);
    0
}
356
/// Non-wasm32 stand-in for the host `_spawn` import: always reports id 1 and
/// returns status 0 without reading the name.
///
/// # Safety
///
/// `id` must be valid for a write of one `i32`.
#[cfg(not(target_arch = "wasm32"))]
unsafe fn _spawn(_name: *const u8, _name_len: u32, id: *mut i32) -> u16 {
    // lie and say the spawn id is 1
    id.write(1);
    0
}
363
364fn read(id: i32, payload: &mut [u8]) -> io::Result<usize> {
365    let mut ln: i32 = 0;
366    let ln_ptr: *mut i32 = &mut ln;
367    error::wasi_err_to_io_err(unsafe {
368        _read(id, payload.as_ptr(), payload.len() as u32, ln_ptr)
369    })?;
370    Ok(ln as usize)
371}
372
373fn write(id: i32, payload: &[u8]) -> io::Result<usize> {
374    let mut ln: i32 = 0;
375    let ln_ptr: *mut i32 = &mut ln;
376    error::wasi_err_to_io_err(unsafe {
377        _write(id, payload.as_ptr(), payload.len() as u32, ln_ptr)
378    })?;
379    Ok(ln as usize)
380}
381
382fn spawn(name: &str) -> Result<i32, Error> {
383    let mut id: i32 = 0;
384    let id_ptr: *mut i32 = &mut id;
385    error::wasi_err_to_io_err(unsafe { _spawn(name.as_ptr(), name.len() as u32, id_ptr) })?;
386    Ok(id)
387}
388
389fn events(timeout: Option<time::Duration>) -> Result<Vec<i32>, Error> {
390    let mut ln: i32 = 0;
391    let ln_ptr: *mut i32 = &mut ln;
392    let out: [i32; 10] = [0; 10];
393    let mut timeout_s: u64 = 0;
394    let mut timeout_ns: u32 = 0;
395    let mut non_blocking: u8 = 0;
396    if let Some(dur) = timeout {
397        timeout_s = dur.as_secs();
398        timeout_ns = dur.subsec_nanos();
399    } else {
400        non_blocking = 1
401    };
402    error::wasi_err_to_io_err(unsafe {
403        _events(
404            non_blocking,
405            timeout_s,
406            timeout_ns,
407            out.as_ptr(),
408            out.len() as u32,
409            ln_ptr,
410        )
411    })?;
412    Ok(out[..(ln as usize)].to_vec())
413}
414
415/// Run a Function
416///
417/// ```
418/// use embly::{
419///     Error,
420///     prelude::*,
421/// };
422///
423/// fn execute(mut conn: embly::Conn) -> Result<(), Error> {
424///     conn.write_all(b"Hello\n")?;
425///     let mut out = Vec::new();
426///     conn.read_to_end(&mut out)?;
427///     println!("{:?}", out);
428///     Ok(())
429/// }
430/// async fn run(conn: embly::Conn) {
431///     match execute(conn) {
432///        Ok(_) => {}
433///        Err(err) => {println!("got error: {}", err)}
434///     }
435/// }
436/// fn main() {
437///     embly::run(run);
438/// }
439/// ```
440pub fn run<F>(to_run: fn(Conn) -> F)
441where
442    F: Future<Output = ()> + 'static,
443{
444    let c = Conn::new(1);
445    task::Task::spawn(Box::pin(to_run(c)));
446}
447
448/// Run a function and panic on error
449/// ```
450/// use embly::{
451///     Error,
452///     prelude::*,
453/// };
454///
455/// async fn execute(mut conn: embly::Conn) -> Result<(), Error> {
456///     conn.write_all(b"Hello\n")?;
457///     let mut out = Vec::new();
458///     conn.read_to_end(&mut out)?;
459///     println!("{:?}", out);
460///     Ok(())
461/// }
462/// fn main() {
463///     embly::run_catch_error(execute);
464/// }
465/// ```
466pub fn run_catch_error<F>(to_run: fn(Conn) -> F)
467where
468    F: Future<Output = Result<(), Error>> + 'static,
469{
470    let c = Conn::new(1);
471    task::Task::spawn(Box::pin(async move {
472        match to_run(c).await {
473            Ok(_) => {}
474            Err(err) => println!("got error: {}", err),
475        };
476    }));
477}