1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162
//! # Wactor //! WASM actor system based on [lunatic](https://github.com/lunatic-solutions/lunatic). //! //! Actors run on isolated green threads. They cannot share memory, and communicate only through //! input and output messages. Consequently messages must be serialized to travel between threads. //! //! ## Example //! ```rust //! use serde::{Deserialize, Serialize}; //! use wactor::*; //! //! struct Counter { //! count: u32, //! } //! //! #[derive(Serialize, Deserialize)] //! enum Input { //! AddOne, //! } //! //! #[derive(Serialize, Deserialize, PartialEq, Debug)] //! enum Output { //! Count(u32), //! } //! //! impl Actor for Counter { //! type Input = Input; //! type Output = Output; //! //! fn create() -> Self { //! Self { count: 0 } //! } //! //! fn handle(&mut self, msg: Self::Input, link: &Link<Self>) { //! match msg { //! Input::AddOne => { //! // Increment count by 1. //! self.count += 1; //! // Respond with new count. This fails if our recipient has been dropped. //! link.respond(Output::Count(self.count)).ok(); //! } //! } //! } //! } //! //! fn main() { //! // Spawn our actor. We get a bridge for sending and receiving messages. Can be cloned for //! // multiple owners. Actor is dropped after all bridges have been dropped. //! let bridge = wactor::spawn::<Counter>(); //! // Send our input message. This fails if our actor has panicked (unrecoverable error). //! bridge.send(Input::AddOne).expect("Dead actor"); //! // Block until a response is received. This also fails if our actor has panicked. //! let result = bridge.receive(); //! // Assert we received the correct value. //! assert_eq!(result, Ok(Output::Count(1))); //! } //! ``` //! //! ### How to run //! Install lunatic then build and run: //! //! cargo build --release --target=wasm32-wasi --example basic //! lunatic target/wasm32-wasi/release/examples/basic.wasm use lunatic::{ channel::{self, Receiver, Sender}, Process, }; use serde::{de::DeserializeOwned, Deserialize, Serialize}; /// Actors run on isolated green threads. The cannot share memory, and communicate only through /// input and output messages. Consequently messages must be serialized to travel between threads. pub trait Actor: Sized { type Input: Serialize + DeserializeOwned; type Output: Serialize + DeserializeOwned; /// Create this actor. fn create() -> Self; /// Handle an input message. fn handle(&mut self, msg: Self::Input, link: &Link<Self>); } /// Spawn a new [Actor], returning its [Bridge]. Actor is dropped when all bridges have been /// dropped. pub fn spawn<A: Actor>() -> Bridge<A> { let (in_sender, in_receiver) = channel::unbounded::<A::Input>(); let (out_sender, out_receiver) = channel::unbounded::<A::Output>(); Process::spawn_with((in_receiver, out_sender), |(receiver, sender)| { Context { link: Link { sender, receiver }, actor: A::create(), } .run() }) .detach(); Bridge { sender: in_sender, receiver: out_receiver, } } /// Bridge to an actor. Can be cloned for multiple owners. Actor is dropped when all bridges have /// been dropped. #[derive(Serialize, Deserialize)] pub struct Bridge<A: Actor> { sender: Sender<A::Input>, receiver: Receiver<A::Output>, } impl<A: Actor> Bridge<A> { /// Send input message. This fails if the actor has panicked. pub fn send(&self, msg: A::Input) -> Result<(), ()> { self.sender.send(msg) } /// Block until a response is received. This fails if the actor has panicked. pub fn receive(&self) -> Result<A::Output, ()> { self.receiver.receive() } } impl<A: Actor> Clone for Bridge<A> { fn clone(&self) -> Self { Self { sender: self.sender.clone(), receiver: self.receiver.clone(), } } } /// Link for responding to input messages. pub struct Link<A: Actor> { sender: Sender<A::Output>, receiver: Receiver<A::Input>, } impl<A: Actor> Link<A> { /// Respond with given output message. Fails if recipient has been dropped. pub fn respond(&self, msg: A::Output) -> Result<(), ()> { self.sender.send(msg) } fn receive(&self) -> Result<A::Input, ()> { self.receiver.receive() } } /// Context for actor execution. struct Context<A: Actor> { link: Link<A>, actor: A, } impl<A: Actor> Context<A> { fn run(mut self) { // Receive messages until we get an error, meaning all recipients have been dropped. while let Ok(msg) = self.link.receive() { self.actor.handle(msg, &self.link); } } }