[−][src]Module mogwai::txrx
Instant channels.
Just add water! ;)
Creating channels
There are a number of ways to create a channel in this module. The most straight forward is to use the function txrx. This will create a linked Transmitter + Receiver pair:
extern crate mogwai; use mogwai::prelude::*; let (tx, rx):(Transmitter<()>, Receiver<()>) = txrx();
Or maybe you prefer an alternative syntax:
extern crate mogwai; use mogwai::prelude::*; let (tx, rx) = txrx::<()>();
Or simply let the compiler try to figure it out:
extern crate mogwai; use mogwai::prelude::*; let (tx, rx) = txrx(); // ...
This pair makes a linked channel. Messages you send on the Transmitter will be sent directly to the Receiver on the other end.
You can create separate terminals using the trns and recv functions. Then later in your code you can spawn new linked partners from them:
extern crate mogwai; use mogwai::prelude::*; let mut tx = trns(); let rx = tx.spawn_recv(); tx.send(&()); // rx will receive the message
extern crate mogwai; use mogwai::prelude::*; let rx = recv(); let tx = rx.new_trns(); tx.send(&()); // rx will receive the message
Note that Transmitter::spawn_recv mutates the transmitter its called on, while Receiver::new_trns requires no such mutation.
Sending messages
Once you have a txrx pair you can start sending messages:
extern crate mogwai; use mogwai::prelude::*; let (tx, rx) = txrx(); tx.send(&()); tx.send(&()); tx.send(&());
Notice that we send references. This is because neither the transmitter nor the receiver own the messages.
It's also possible to send asynchronous messages! We can do this with
Transmitter::send_async, which takes a type that implements Future. Here
is an example of running an async web request to send some text from an
async
block:
extern crate mogwai; extern crate web_sys; use mogwai::prelude::*; use web_sys::{Request, RequestMode, RequestInit, Response}; // Here's our async function that fetches a text response from a server, // or returns an error string. async fn request_to_text(req:Request) -> Result<String, String> { let resp:Response = JsFuture::from( window() .fetch_with_request(&req) ) .await .map_err(|_| "request failed".to_string())? .dyn_into() .map_err(|_| "response is malformed")?; let text:String = JsFuture::from( resp .text() .map_err(|_| "could not get response text")? ) .await .map_err(|_| "getting text failed")? .as_string() .ok_or("couldn't get text as string".to_string())?; Ok(text) } let (tx, rx) = txrx(); tx.send_async(async { let mut opts = RequestInit::new(); opts.method("GET"); opts.mode(RequestMode::Cors); let req = Request::new_with_str_and_init( "https://worldtimeapi.org/api/timezone/Europe/London.txt", &opts ) .unwrap_throw(); request_to_text(req) .await .unwrap_or_else(|e| e) });
Responding to messages
Receivers can respond immediately to the messages that are sent to them. There is no polling and no internal message buffer. These channels are instant! Receivers do this by invoking their response function when they receive a message. The response function can be set using Receiver::respond:
extern crate mogwai; use mogwai::prelude::*; let (tx, rx) = txrx(); rx.respond(|&()| { println!("Message received!"); }); tx.send(&());
For convenience we also have the Receiver::respond_shared method and the new_shared function that together allow you to respond using a shared mutable variable. Inside your fold function you can simply mutate this shared variable as normal. This makes it easy to encapsulate a little bit of shared state in your responder without requiring much knowledge about thread-safe asynchronous programming:
extern crate mogwai; use mogwai::prelude::*; let shared_count = new_shared(0); let (tx, rx) = txrx(); rx.respond_shared(shared_count.clone(), |count: &mut i32, &()| { *count += 1; println!("{} messages received!", *count); }); tx.send(&()); tx.send(&()); tx.send(&()); assert_eq!(*shared_count.borrow(), 3);
Composing channels
Sending messages into a transmitter and having it pop out automatically is
great, but wait, there's more! What if we have a tx_a:Transmitter<A>
and a
rx_b:Receiver<B>
, but we want to send A
s on tx_a
and have B
s pop out
of rx_b
? We could use the machinery we have and write something like:
extern crate mogwai; use mogwai::prelude::*; let (tx_a, rx_b) = { let (tx_a, rx_a) = txrx(); let (tx_b, rx_b) = txrx(); let f = |a| { a.turn_into_b() }; rx_a.respond(move |a| { tx_b.send(f(a)); }); (tx_a, rx_b) };
And indeed, it works! But that's an awful lot of boilerplate just to get a
channel of A
s to B
s. Instead we can use the txrx_map
function, which
does all of this for us given the map function. Here's an example using
a Transmitter<()>
that sends to a Receiver<i32>
:
extern crate mogwai; use mogwai::prelude::*; // For every unit that gets sent, map it to `1:i32`. let (tx_a, rx_b) = txrx_map(|&()| 1 ); let shared_count = new_shared(0); rx_b.respond_shared(shared_count.clone(), |count: &mut i32, n: &i32| { *count += n; println!("Current count is {}", *count); }); tx_a.send(&()); tx_a.send(&()); tx_a.send(&()); assert_eq!(*shared_count.borrow(), 3);
That is useful, but we can also do much more than simple maps! We can fold
over an internal state or a shared state, we can filter some of the sent
messages and we can do all those things together! Check out the txrx_*
family of functions:
You'll also find functions with these flavors in Transmitter and Receiver.
Wiring Transmitters and forwading Receivers
Another way to get a txrx pair of different types is to create each side separately using trns and recv and then wire them together:
extern crate mogwai; use mogwai::prelude::*; let mut tx = trns::<()>(); let rx = recv::<i32>(); tx.wire_map(&rx, |&()| 1);
The following make up the wire_*
family of functions on Transmitter:
- Transmitter::wire_filter_fold
- Transmitter::wire_filter_fold_async
- Transmitter::wire_filter_fold_shared
- Transmitter::wire_filter_map
- Transmitter::wire_fold
- Transmitter::wire_fold_shared
- Transmitter::wire_map
Note that they all mutate the Transmitter they are called on.
Conversely, if you would like to forward messages from a receiver into a transmitter of a different type you can "forward" messages from the receiver to the transmitter:
extern crate mogwai; use mogwai::prelude::*; let (tx, rx) = txrx::<()>(); let (mut tx_i32, rx_i32) = txrx::<i32>(); rx.forward_map(&tx_i32, |&()| 1); let shared_got_it = new_shared(false); rx_i32.respond_shared(shared_got_it.clone(), |got_it: &mut bool, n: &i32| { println!("Got {}", *n); *got_it = true; }); tx.send(&()); assert_eq!(*shared_got_it.borrow(), true);
These make up the forward_*
family of functions on Receiver:
- Receiver::forward_filter_fold
- Receiver::forward_filter_fold_async
- Receiver::forward_filter_fold_shared
- Receiver::forward_filter_map
- Receiver::forward_fold
- Receiver::forward_fold_shared
- Receiver::forward_map
Note that they all consume the Receiver they are called on.
Cloning, branching, etc
Transmitters may be cloned. Once a transmitter is cloned a message sent on either the clone or the original will pop out on any linked receivers:
extern crate mogwai; use mogwai::prelude::*; let (tx1, rx) = txrx(); let tx2 = tx1.clone(); let shared_count = new_shared(0); rx.respond_shared(shared_count.clone(), |count: &mut i32, &()| { *count += 1; }); tx1.send(&()); tx2.send(&()); assert_eq!(*shared_count.borrow(), 2);
Receivers are a bit different from Transmitters, though. They are not clonable because they house a responder, which must be unique. Instead we can use Receiver::branch to create a new receiver that is linked to the same transmitters as the original, but owns its own unique response to messages:
extern crate mogwai; use mogwai::prelude::*; let (tx, rx1) = txrx(); let rx2 = rx1.branch(); let shared_count = new_shared(0); rx1.respond_shared(shared_count.clone(), |count: &mut i32, &()| { *count += 1; }); rx2.respond_shared(shared_count.clone(), |count: &mut i32, &()| { *count += 1; }); tx.send(&()); assert_eq!(*shared_count.borrow(), 2);
Both Transmitters and Receivers can be "branched" so that multiple
transmitters may send to the same receiver and multiple receivers may respond
to the same transmitter. These use the contra_*
family of functions on
Transmitter and the branch_*
family of functions on Receiver.
Transmitter's contra_* family
This family of functions are named after Haskell's contramap. That's
because these functions take a transmitter of B
s, some flavor of function
that transforms B
s into A
s and returns a new transmitter of A
s.
Essentially - the newly created transmitter extends the original backward,
allowing you to send A
s into it and have B
s automatically sent on the
original.
- Transmitter::contra_filter_fold
- Transmitter::contra_filter_fold_shared
- Transmitter::contra_filter_map
- Transmitter::contra_fold
- Transmitter::contra_map
Receiver's branch_* family
This family of functions all extend new receivers off of an original and
can transform messages of A
s received on the original into messages of B
s
received on the newly created receiver. This is analogous to Haskell's
fmap.
- Receiver::branch
- Receiver::branch_filter_fold
- Receiver::branch_filter_fold_shared
- Receiver::branch_filter_map
- Receiver::branch_fold
- Receiver::branch_fold_shared
- Receiver::branch_map
Receiver::merge
If you have many receivers that you would like to merge you can use the Receiver::merge function.
Done!
The channels defined here are the backbone of this library. Getting to know the many constructors and combinators may seem like a daunting task but don't worry - the patterns of branching, mapping and folding are functional programming's bread and butter. Once you get a taste for this flavor of development you'll want more (and it will get easier). But remember, no matter how much it begs, no matter how much it cries, NEVER feed Mogwai after midnight ;)
Structs
Receiver | Receive messages instantly. |
Transmitter | Send messages instantly. |
Functions
hand_clone | Clone a receiver. |
new_shared | Helper for making thread-safe shared mutable variables. |
recv | Create a new unlinked |
trns | Create a new unlinked |
txrx | Create a linked |
txrx_filter_fold | Create a linked, filtering |
txrx_filter_fold_shared | Create a linked, filtering |
txrx_filter_map | Create a linked, filtering |
txrx_fold | Create a linked |
txrx_fold_shared | Create a linked |
txrx_map | Create a linked |
wrap_future |
Type Definitions
RecvFuture |