wactor/lib.rs
1//! # Wactor
2//! WASM actor system based on [lunatic](https://github.com/lunatic-solutions/lunatic).
3//!
4//! Actors run on isolated green threads. They cannot share memory, and communicate only through
5//! input and output messages. Consequently messages must be serialized to travel between threads.
6//!
7//! ## Example
8//! ```rust
9//! use serde::{Deserialize, Serialize};
10//! use wactor::*;
11//!
12//! struct Counter {
13//! count: u32,
14//! }
15//!
16//! #[derive(Serialize, Deserialize)]
17//! enum Input {
18//! AddOne,
19//! }
20//!
21//! #[derive(Serialize, Deserialize, PartialEq, Debug)]
22//! enum Output {
23//! Count(u32),
24//! }
25//!
26//! impl Actor for Counter {
27//! type Input = Input;
28//! type Output = Output;
29//!
30//! fn create() -> Self {
31//! Self { count: 0 }
32//! }
33//!
34//! fn handle(&mut self, msg: Self::Input, link: &Link<Self>) {
35//! match msg {
36//! Input::AddOne => {
37//! // Increment count by 1.
38//! self.count += 1;
39//! // Respond with new count. This fails if our recipient has been dropped.
40//! link.respond(Output::Count(self.count)).ok();
41//! }
42//! }
43//! }
44//! }
45//!
46//! fn main() {
47//! // Spawn our actor. We get a bridge for sending and receiving messages. Can be cloned for
48//! // multiple owners. Actor is dropped after all bridges have been dropped.
49//! let bridge = wactor::spawn::<Counter>();
50//! // Send our input message. This fails if our actor has panicked (unrecoverable error).
51//! bridge.send(Input::AddOne).expect("Dead actor");
52//! // Block until a response is received. This also fails if our actor has panicked.
53//! let result = bridge.receive();
54//! // Assert we received the correct value.
55//! assert_eq!(result, Ok(Output::Count(1)));
56//! }
57//! ```
58//!
59//! ### How to run
60//! Install lunatic then build and run:
61//!
62//! cargo build --release --target=wasm32-wasi --example basic
63//! lunatic target/wasm32-wasi/release/examples/basic.wasm
64
65use lunatic::{
66 channel::{self, Receiver, Sender},
67 Process,
68};
69use serde::{de::DeserializeOwned, Deserialize, Serialize};
70
71/// Actors run on isolated green threads. The cannot share memory, and communicate only through
72/// input and output messages. Consequently messages must be serialized to travel between threads.
73pub trait Actor: Sized {
74 type Input: Serialize + DeserializeOwned;
75 type Output: Serialize + DeserializeOwned;
76
77 /// Create this actor.
78 fn create() -> Self;
79 /// Handle an input message.
80 fn handle(&mut self, msg: Self::Input, link: &Link<Self>);
81}
82
83/// Spawn a new [Actor], returning its [Bridge]. Actor is dropped when all bridges have been
84/// dropped.
85pub fn spawn<A: Actor>() -> Bridge<A> {
86 let (in_sender, in_receiver) = channel::unbounded::<A::Input>();
87 let (out_sender, out_receiver) = channel::unbounded::<A::Output>();
88
89 Process::spawn_with((in_receiver, out_sender), |(receiver, sender)| {
90 Context {
91 link: Link { sender, receiver },
92 actor: A::create(),
93 }
94 .run()
95 })
96 .detach();
97
98 Bridge {
99 sender: in_sender,
100 receiver: out_receiver,
101 }
102}
103
104/// Bridge to an actor. Can be cloned for multiple owners. Actor is dropped when all bridges have
105/// been dropped.
106#[derive(Serialize, Deserialize)]
107pub struct Bridge<A: Actor> {
108 sender: Sender<A::Input>,
109 receiver: Receiver<A::Output>,
110}
111
112impl<A: Actor> Bridge<A> {
113 /// Send input message. This fails if the actor has panicked.
114 pub fn send(&self, msg: A::Input) -> Result<(), ()> {
115 self.sender.send(msg)
116 }
117 /// Block until a response is received. This fails if the actor has panicked.
118 pub fn receive(&self) -> Result<A::Output, ()> {
119 self.receiver.receive()
120 }
121}
122
123impl<A: Actor> Clone for Bridge<A> {
124 fn clone(&self) -> Self {
125 Self {
126 sender: self.sender.clone(),
127 receiver: self.receiver.clone(),
128 }
129 }
130}
131
132/// Link for responding to input messages.
133pub struct Link<A: Actor> {
134 sender: Sender<A::Output>,
135 receiver: Receiver<A::Input>,
136}
137
138impl<A: Actor> Link<A> {
139 /// Respond with given output message. Fails if recipient has been dropped.
140 pub fn respond(&self, msg: A::Output) -> Result<(), ()> {
141 self.sender.send(msg)
142 }
143
144 fn receive(&self) -> Result<A::Input, ()> {
145 self.receiver.receive()
146 }
147}
148
149/// Context for actor execution.
150struct Context<A: Actor> {
151 link: Link<A>,
152 actor: A,
153}
154
155impl<A: Actor> Context<A> {
156 fn run(mut self) {
157 // Receive messages until we get an error, meaning all recipients have been dropped.
158 while let Ok(msg) = self.link.receive() {
159 self.actor.handle(msg, &self.link);
160 }
161 }
162}