wactor/
lib.rs

1//! # Wactor
2//! WASM actor system based on [lunatic](https://github.com/lunatic-solutions/lunatic).
3//!
4//! Actors run on isolated green threads. They cannot share memory, and communicate only through
5//! input and output messages. Consequently messages must be serialized to travel between threads.
6//!
7//! ## Example
8//! ```rust
9//! use serde::{Deserialize, Serialize};
10//! use wactor::*;
11//!
12//! struct Counter {
13//!     count: u32,
14//! }
15//!
16//! #[derive(Serialize, Deserialize)]
17//! enum Input {
18//!     AddOne,
19//! }
20//!
21//! #[derive(Serialize, Deserialize, PartialEq, Debug)]
22//! enum Output {
23//!     Count(u32),
24//! }
25//!
26//! impl Actor for Counter {
27//!     type Input = Input;
28//!     type Output = Output;
29//!
30//!     fn create() -> Self {
31//!         Self { count: 0 }
32//!     }
33//!
34//!     fn handle(&mut self, msg: Self::Input, link: &Link<Self>) {
35//!         match msg {
36//!             Input::AddOne => {
37//!                 // Increment count by 1.
38//!                 self.count += 1;
39//!                 // Respond with new count. This fails if our recipient has been dropped.
40//!                 link.respond(Output::Count(self.count)).ok();
41//!             }
42//!         }
43//!     }
44//! }
45//!
46//! fn main() {
47//!     // Spawn our actor. We get a bridge for sending and receiving messages. Can be cloned for
48//!     // multiple owners. Actor is dropped after all bridges have been dropped.
49//!     let bridge = wactor::spawn::<Counter>();
50//!     // Send our input message. This fails if our actor has panicked (unrecoverable error).
51//!     bridge.send(Input::AddOne).expect("Dead actor");
52//!     // Block until a response is received. This also fails if our actor has panicked.
53//!     let result = bridge.receive();
54//!     // Assert we received the correct value.
55//!     assert_eq!(result, Ok(Output::Count(1)));
56//! }
57//! ```
58//!
59//! ### How to run
60//! Install lunatic then build and run:
61//!
62//!     cargo build --release --target=wasm32-wasi --example basic
63//!     lunatic target/wasm32-wasi/release/examples/basic.wasm
64
65use lunatic::{
66    channel::{self, Receiver, Sender},
67    Process,
68};
69use serde::{de::DeserializeOwned, Deserialize, Serialize};
70
71/// Actors run on isolated green threads. The cannot share memory, and communicate only through
72/// input and output messages. Consequently messages must be serialized to travel between threads.
73pub trait Actor: Sized {
74    type Input: Serialize + DeserializeOwned;
75    type Output: Serialize + DeserializeOwned;
76
77    /// Create this actor.
78    fn create() -> Self;
79    /// Handle an input message.
80    fn handle(&mut self, msg: Self::Input, link: &Link<Self>);
81}
82
83/// Spawn a new [Actor], returning its [Bridge]. Actor is dropped when all bridges have been
84/// dropped.
85pub fn spawn<A: Actor>() -> Bridge<A> {
86    let (in_sender, in_receiver) = channel::unbounded::<A::Input>();
87    let (out_sender, out_receiver) = channel::unbounded::<A::Output>();
88
89    Process::spawn_with((in_receiver, out_sender), |(receiver, sender)| {
90        Context {
91            link: Link { sender, receiver },
92            actor: A::create(),
93        }
94        .run()
95    })
96    .detach();
97
98    Bridge {
99        sender: in_sender,
100        receiver: out_receiver,
101    }
102}
103
104/// Bridge to an actor. Can be cloned for multiple owners. Actor is dropped when all bridges have
105/// been dropped.
106#[derive(Serialize, Deserialize)]
107pub struct Bridge<A: Actor> {
108    sender: Sender<A::Input>,
109    receiver: Receiver<A::Output>,
110}
111
112impl<A: Actor> Bridge<A> {
113    /// Send input message. This fails if the actor has panicked.
114    pub fn send(&self, msg: A::Input) -> Result<(), ()> {
115        self.sender.send(msg)
116    }
117    /// Block until a response is received. This fails if the actor has panicked.
118    pub fn receive(&self) -> Result<A::Output, ()> {
119        self.receiver.receive()
120    }
121}
122
123impl<A: Actor> Clone for Bridge<A> {
124    fn clone(&self) -> Self {
125        Self {
126            sender: self.sender.clone(),
127            receiver: self.receiver.clone(),
128        }
129    }
130}
131
132/// Link for responding to input messages.
133pub struct Link<A: Actor> {
134    sender: Sender<A::Output>,
135    receiver: Receiver<A::Input>,
136}
137
138impl<A: Actor> Link<A> {
139    /// Respond with given output message. Fails if recipient has been dropped.
140    pub fn respond(&self, msg: A::Output) -> Result<(), ()> {
141        self.sender.send(msg)
142    }
143
144    fn receive(&self) -> Result<A::Input, ()> {
145        self.receiver.receive()
146    }
147}
148
149/// Context for actor execution.
150struct Context<A: Actor> {
151    link: Link<A>,
152    actor: A,
153}
154
155impl<A: Actor> Context<A> {
156    fn run(mut self) {
157        // Receive messages until we get an error, meaning all recipients have been dropped.
158        while let Ok(msg) = self.link.receive() {
159            self.actor.handle(msg, &self.link);
160        }
161    }
162}