Module lamellar::active_messaging
Active Messages are a computing model in which messages contain both data (that you want to compute something with) and metadata that tells the message how to process that data when it arrives at its destination, e.g. a function pointer. The Wikipedia page https://en.wikipedia.org/wiki/Active_message provides a short overview.
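As a rough illustration of the model (not Lamellar's actual API, and all names here are invented for the sketch), an active message can be thought of as a payload bundled with a tag identifying the handler to run on arrival:

```rust
// Conceptual sketch of an active message: payload + handler metadata.
// The payload we want to ship to another process (PE).
struct HelloWorld {
    original_pe: usize,
}

// The "metadata": which handler to invoke when the message arrives.
enum Handler {
    HelloWorld,
}

struct ActiveMessage {
    handler: Handler,
    payload: HelloWorld,
}

// On the receiving side, dispatch on the handler tag to process the payload.
fn handle(msg: &ActiveMessage) -> String {
    match msg.handler {
        Handler::HelloWorld => {
            format!("Hello World, I'm from PE {}", msg.payload.original_pe)
        }
    }
}

fn main() {
    let msg = ActiveMessage {
        handler: Handler::HelloWorld,
        payload: HelloWorld { original_pe: 0 },
    };
    println!("{}", handle(&msg));
}
```

In a real runtime the message is serialized, sent over the network, and the handler tag is resolved on the remote side; Lamellar's macros generate all of this for you, as shown below.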
Lamellar is built upon asynchronous active messages, and provides users with an interface to construct their own active messages.
This interface is exposed through multiple Rust procedural macros and APIs.
Further details are provided in the documentation for each macro, but at a high level, to implement an active message we need to define the data to be transferred in a message and then define what to do with that data when it arrives at its destination.
The following examples will cover the following topics
- Constructing your first Active Message
- Lamellar AM DSL
- Lamellar AM return types
- returning plain old data
- returning active messages
- returning active messages that return data
- Nested Active Messages
- Active Message Groups
- Generic Active Message Groups
- ‘Typed’ Active Message Groups
- static members
§Examples
Let’s implement a simple active message below:
First, let's define the data we would like to transfer:
#[derive(Debug,Clone)]
struct HelloWorld {
original_pe: usize, //this will contain the ID of the PE this data originated from
}
This looks like a pretty normal (if simple) struct. We next have to let the runtime know we would like this data
to be used in an active message, so we need to apply the AmData macro; this is done by replacing the derive
macro:
use lamellar::active_messaging::prelude::*;
#[AmData(Debug,Clone)]
struct HelloWorld {
original_pe: usize, //this will contain the ID of the PE this data originated from
}
This change allows the compiler to implement the proper traits (related to serialization and deserialization) that let this data type be used in an active message.
Next, we need to define the processing that should take place when a message arrives at another PE.
For this we use the am macro on an implementation of the LamellarAM trait:
#[lamellar::am]
impl LamellarAM for HelloWorld {
async fn exec(self) {
println!(
"Hello World, I'm from PE {:?}",
self.original_pe,
);
}
}
The am macro parses the provided implementation and performs a number of transformations to the code to enable execution of the active message. This macro is responsible for generating the code which will perform serialization/deserialization of both the active message data and any returned data.
Each active message implementation is assigned a unique ID at runtime initialization; these IDs are then used as keys into a map of specialized deserialization functions that convert a slice of bytes into the appropriate data type on the remote PE.
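The dispatch map described above can be sketched in miniature as follows. This is not Lamellar's internal implementation, just an illustration of the ID-to-deserializer pattern; the toy "wire format" here is simply a little-endian integer:

```rust
// Illustrative sketch of an ID -> deserialize-and-execute registry,
// the pattern the am macro's generated code relies on. All names and
// the byte format are assumptions made for this example.
use std::collections::HashMap;

// A registered handler: takes the raw message bytes, reconstructs the
// typed data, and runs the AM body.
type ExecFn = fn(&[u8]) -> String;

// Toy "deserializer" for a HelloWorld-style AM whose payload is one u64.
fn exec_hello_world(bytes: &[u8]) -> String {
    let mut buf = [0u8; 8];
    buf.copy_from_slice(&bytes[..8]);
    let original_pe = u64::from_le_bytes(buf);
    format!("Hello World, I'm from PE {}", original_pe)
}

fn build_registry() -> HashMap<u32, ExecFn> {
    let mut registry: HashMap<u32, ExecFn> = HashMap::new();
    registry.insert(0, exec_hello_world); // ID 0 -> HelloWorld handler
    registry
}

fn main() {
    let registry = build_registry();
    // A remote PE receives (id, bytes) and looks up the right handler.
    let (id, bytes) = (0u32, 2u64.to_le_bytes());
    println!("{}", registry[&id](&bytes));
}
```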
The final step is to actually launch an active message and await its result
fn main(){
let world = lamellar::LamellarWorldBuilder::new().build();
let my_pe = world.my_pe();
//Send a Hello World Active Message to all pes
let request = world.exec_am_all(
HelloWorld {
original_pe: my_pe,
}
);
//wait for the request to complete
world.block_on(request);
}
In this example we simply send a HelloWorld from every PE to every other PE using exec_am_all (please see the ActiveMessaging trait documentation for further details). exec_am_all returns a Future which we can use to await the completion of our operation.
Sample output for the above example on a 2 PE system may look something like the following (exact ordering is nondeterministic due to asynchronous execution):
Hello World, I'm from PE 0
Hello World, I'm from PE 1
Hello World, I'm from PE 0
Hello World, I'm from PE 1
What if we wanted to actually know where we are currently executing?
§Lamellar AM DSL
The lamellar am macro also parses the provided code block for the presence of keywords from a small DSL, specifically searching for the following token streams:
- lamellar::current_pe - returns the world id of the PE this active message is executing on
- lamellar::num_pes - returns the number of PEs in the world
- lamellar::world - returns a reference to the instantiated LamellarWorld
- lamellar::team - returns a reference to the LamellarTeam responsible for launching this AM
Given this functionality, we can adapt the above active message body to this:
#[lamellar::am]
impl LamellarAM for HelloWorld {
async fn exec(self) {
println!(
"Hello World on PE {:?} of {:?}, I'm from PE {:?}",
lamellar::current_pe,
lamellar::num_pes,
self.original_pe,
);
}
}
The new sample output for the above example on a 2 PE system may look something like the following (exact ordering is nondeterministic due to asynchronous execution):
Hello World on PE 0 of 2, I'm from PE 0
Hello World on PE 0 of 2, I'm from PE 1
Hello World on PE 1 of 2, I'm from PE 0
Hello World on PE 1 of 2, I'm from PE 1
§Active Messages with return data
In the above examples, we simply launched a remote active message but did not return a result back to the originating PE. Lamellar supports returning both “plain old data” (as long as it impls AmDist) and other active messages.
§Returning normal data
Lamellar Active Messages support returning data, and it is as simple as specifying the return type in the implementation of the exec function:
#[lamellar::am]
impl LamellarAM for HelloWorld {
async fn exec(self) -> usize { //specify we are returning a usize
println!(
"Hello World on PE {:?} of {:?}, I'm from PE {:?}",
lamellar::current_pe,
lamellar::num_pes,
self.original_pe,
);
lamellar::current_pe
}
}
Retrieving the result is as simple as assigning the awaited request to a variable:
fn main(){
let world = lamellar::LamellarWorldBuilder::new().build();
let my_pe = world.my_pe();
//Send a Hello World Active Message to all pes
let request = world.exec_am_all(
HelloWorld {
original_pe: my_pe,
}
);
//wait for the request to complete
let results = world.block_on(request);
println!("PE {my_pe} {results:?}");
}
The new sample output for the above example on a 2 PE system may look something like the following (exact ordering is nondeterministic due to asynchronous execution):
Hello World on PE 0 of 2, I'm from PE 0
Hello World on PE 0 of 2, I'm from PE 1
Hello World on PE 1 of 2, I'm from PE 0
Hello World on PE 1 of 2, I'm from PE 1
PE 0 [0,1]
PE 1 [0,1]
§Returning Active Messages
Lamellar also provides the ability to return another active message as a result.
This active message will execute automatically when it arrives back at the originating PE, acting as a sort of callback mechanism.
Returning an active message requires a few more changes to our code. First we will define our new active message:
#[AmData(Debug,Clone)]
struct ReturnAm{
original_pe: usize,
remote_pe: usize,
}
#[lamellar::am]
impl LamellarAm for ReturnAm{
async fn exec(self) {
println!("initiated on PE {} visited PE {} finishing on PE {}",self.original_pe,self.remote_pe,lamellar::current_pe);
}
}
With that defined we can now modify our original active message to return this new ReturnAm type.
The main change is that we need to explicitly tell the macro we are returning an active message, providing the name of the active message we are returning:
#[lamellar::am(return_am = "ReturnAm")] //we explicitly tell the macro we are returning an AM
impl LamellarAM for HelloWorld {
async fn exec(self) -> ReturnAm { //specify we are returning an active message
println!(
"Hello World on PE {:?} of {:?}, I'm from PE {:?}",
lamellar::current_pe,
lamellar::num_pes,
self.original_pe,
);
ReturnAm{ //simply return an instance of the AM
original_pe: self.original_pe,
remote_pe: lamellar::current_pe,
}
}
}
We do not need to modify any of the code in our main function, so the new sample output for the above example on a 2 PE system may look something like the following (exact ordering is nondeterministic due to asynchronous execution):
Hello World on PE 0 of 2, I'm from PE 0
Hello World on PE 0 of 2, I'm from PE 1
Hello World on PE 1 of 2, I'm from PE 0
Hello World on PE 1 of 2, I'm from PE 1
initiated on PE 0 visited PE 0 finishing on PE 0
initiated on PE 0 visited PE 1 finishing on PE 0
initiated on PE 1 visited PE 0 finishing on PE 1
initiated on PE 1 visited PE 1 finishing on PE 1
PE 0 [(),()]
PE 1 [(),()]
By examining the above output we can see that printing the results of the request returns the unit type (well, a Vec of unit types, because of the exec_am_all call).
This is because our returned AM does not itself return any data.
§Returning Active Messages which return data
Lamellar does support returning an Active Message which then returns some data.
First we need to update ReturnAm to actually return some data:
#[lamellar::am]
impl LamellarAm for ReturnAm{
async fn exec(self) -> (usize,usize) {
println!("initiated on PE {} visited PE {} finishing on PE {}",self.original_pe,self.remote_pe,lamellar::current_pe);
(self.original_pe,self.remote_pe)
}
}
Next we need to make an additional change to the HelloWorld AM to specify that our returned AM will itself return data.
We do this in the argument to the am procedural macro:
#[lamellar::am(return_am = "ReturnAm -> (usize,usize)")] //we explicitly tell the macro we are returning an AM which itself returns data
impl LamellarAM for HelloWorld {
async fn exec(self) -> ReturnAm { //specify we are returning an active message
println!(
"Hello World on PE {:?} of {:?}, I'm from PE {:?}",
lamellar::current_pe,
lamellar::num_pes,
self.original_pe,
);
ReturnAm{ //simply return an instance of the AM
original_pe: self.original_pe,
remote_pe: lamellar::current_pe,
}
}
}
With those changes, the new sample output for the above example on a 2 PE system may look something like the following (exact ordering is nondeterministic due to asynchronous execution):
Hello World on PE 0 of 2, I'm from PE 0
Hello World on PE 0 of 2, I'm from PE 1
Hello World on PE 1 of 2, I'm from PE 0
Hello World on PE 1 of 2, I'm from PE 1
initiated on PE 0 visited PE 0 finishing on PE 0
initiated on PE 0 visited PE 1 finishing on PE 0
initiated on PE 1 visited PE 0 finishing on PE 1
initiated on PE 1 visited PE 1 finishing on PE 1
PE 0 [(0,0),(0,1)]
PE 1 [(1,0),(1,1)]
§Nested Active Messages
Lamellar Active Messages support nested active messages, i.e. launching a new active message from within an executing active message.
This functionality can be used to set up active message dependencies, enable recursive active messages, etc. In the following example we will construct a recursive active message that performs a ring-like communication pattern across PEs, returning the reverse order in which it visited the PEs.
use lamellar::active_messaging::prelude::*;
#[AmData(Debug,Clone)]
struct RingAm {
original_pe: usize, //this will be our recursion-terminating condition
}
#[lamellar::am]
impl LamellarAm for RingAm{
async fn exec(self) -> Vec<usize>{
let cur_pe = lamellar::current_pe;
if self.original_pe == cur_pe{ //terminate the recursion!
vec![cur_pe] //return a new path with the current_pe as the start
}
else { //launch another active message
let next_pe = (cur_pe + 1) % lamellar::num_pes; //account for wraparound
let req = lamellar::team.exec_am_pe(next_pe, RingAm{original_pe: self.original_pe}); //forward the original PE so the recursion can terminate
let mut path = req.await; // exec_am_*() calls return a future we can await to get the result
path.push(cur_pe); //update the path with the PE and return
path
}
}
}
fn main(){
let world = lamellar::LamellarWorldBuilder::new().build();
let my_pe = world.my_pe();
let num_pes = world.num_pes();
//Send initial message to right neighbor
let next_pe = (my_pe + 1) % num_pes; //account for wraparound
let request = world.exec_am_pe(
next_pe,
RingAm {
original_pe: my_pe
}
);
//wait for the request to complete
let results = world.block_on(request);
println!("PE {my_pe} {results:?}");
}
The key thing to notice in this example is that how we wait for a request to finish changes depending on the context we are executing in.
When we are in the active message we are already in an asynchronous context, so we can simply await the future returned to us by the exec_am_pe() call.
This is in contrast to the main function, where we must use a block_on call to drive the future and retrieve the result.
The sample output for the above example on a 4 PE system may look something like the following (exact ordering is nondeterministic due to asynchronous execution):
PE 0 [0,3,2,1]
PE 1 [1,0,3,2]
PE 2 [2,1,0,3]
PE 3 [3,2,1,0]
§Active Message Groups
Up until now, we have seen two extremes with respect to the granularity at which active messages can be awaited: either awaiting all outstanding active messages in the system via wait_all(), or awaiting an individual active message, e.g. req.await.
Lamellar also supports active message groups: collections of active messages that can be awaited together.
Conceptually, an active message group can be represented as a meta active message that contains a list of the actual active messages we want to execute, as illustrated in the pseudocode below:
#[AmData(Debug,Clone)]
struct MetaAm{
ams: Vec<impl LamellarAm>
}
#[lamellar::am]
impl LamellarAm for MetaAm{
async fn exec(self) {
for am in self.ams{
am.exec().await
}
}
}
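Since Vec<impl LamellarAm> in the pseudocode above is not a real Rust type, one way to hold heterogeneous messages in a single list is an enum with a variant per message type, which is roughly the idea behind the typed AM groups discussed below. This is a standalone illustration with invented names, not Lamellar code:

```rust
// Enum dispatch sketch: one list holding two different "message" types.
enum GroupedAm {
    Am1 { foo: usize },
    Am2 { bar: String },
}

impl GroupedAm {
    // A stand-in for an AM's exec body.
    fn exec(&self) -> String {
        match self {
            GroupedAm::Am1 { foo } => format!("am1: {foo}"),
            GroupedAm::Am2 { bar } => format!("am2: {bar}"),
        }
    }
}

// Execute every message in the group in order, collecting the results.
fn exec_group(ams: &[GroupedAm]) -> Vec<String> {
    ams.iter().map(|am| am.exec()).collect()
}

fn main() {
    let group = vec![
        GroupedAm::Am1 { foo: 1 },
        GroupedAm::Am2 { bar: "hello".to_string() },
    ];
    for out in exec_group(&group) {
        println!("{out}");
    }
}
```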
There are two flavors of active message groups discussed in the following sections:
§Generic Active Message Groups
The first Active Message Group is called AmGroup, which can include any AM satisfying AM: impl LamellarAm<Output=()>.
That is, the active messages in the group can consist of different underlying types as long as they all return ().
Future implementations will relax this restriction so that the AMs only need to return the same type.
use lamellar::active_messaging::prelude::*;
#[AmData(Debug,Clone)]
struct Am1 {
foo: usize,
}
#[lamellar::am]
impl LamellarAm for Am1{
async fn exec(self) {
println!("in am1 {:?} on PE{:?}",self.foo, lamellar::current_pe);
}
}
#[AmData(Debug,Clone)]
struct Am2 {
bar: String,
}
#[lamellar::am]
impl LamellarAm for Am2{
async fn exec(self) {
println!("in am2 {:?} on PE{:?}",self.bar,lamellar::current_pe);
}
}
fn main(){
let world = lamellar::LamellarWorldBuilder::new().build();
let my_pe = world.my_pe();
let num_pes = world.num_pes();
let am1 = Am1{foo: 1};
let am2 = Am2{bar: "hello".to_string()};
//create a new AMGroup
let am_group = AMGroup::new(&world);
// add the AMs to the group
// we can specify individual PEs to execute on or all PEs
am_group.add_am_pe(0,am1.clone());
am_group.add_am_pe(1,am1.clone());
am_group.add_am_pe(0,am2.clone());
am_group.add_am_pe(1,am2.clone());
am_group.add_am_all(am1.clone());
am_group.add_am_all(am2.clone());
//execute and await the completion of all AMs in the group
world.block_on(am_group.exec());
}
Expected output on each PE:
in am1 1 on PE0
in am2 hello on PE0
in am1 1 on PE0
in am2 hello on PE0
in am1 1 on PE1
in am2 hello on PE1
in am1 1 on PE1
in am2 hello on PE1
§Typed Active Message Groups
The second Active Message Group is called TypedAmGroup
which can only include AMs of a specific type (but this type can return data).
Data is returned in the same order as the AMs were added (you can think of this as similar to Vec<T>).
Typed AM groups are instantiated using the typed_am_group macro, which expects two parameters: the first being the type (name) of the AM and the second being a reference to a Lamellar team.
use lamellar::active_messaging::prelude::*;
use lamellar::darc::prelude::*;
use std::sync::atomic::AtomicUsize;
#[AmData(Debug,Clone)]
struct ExampleAm {
cnt: Darc<AtomicUsize>,
}
#[lamellar::am]
impl LamellarAm for ExampleAm{
async fn exec(self) -> usize{
self.cnt.fetch_add(1, std::sync::atomic::Ordering::SeqCst) //return the previous count
}
}
fn main(){
let world = lamellar::LamellarWorldBuilder::new().build();
let my_pe = world.my_pe();
let num_pes = world.num_pes();
if my_pe == 0 { // we only want to run this on PE0 for sake of illustration
let am_group = typed_am_group!{ExampleAm,&world};
let am = ExampleAm{cnt: Darc::new(&world, AtomicUsize::new(0)).expect("PE in world team")};
// add the AMs to the group
// we can specify individual PEs to execute on or all PEs
am_group.add_am_pe(0,am.clone());
am_group.add_am_all(am.clone());
am_group.add_am_pe(1,am.clone());
am_group.add_am_all(am.clone());
//execute and await the completion of all AMs in the group
let results = world.block_on(am_group.exec()); // we want to process the returned data
//we can index into the results
if let AmGroupResult::Pe((pe,val)) = results.at(2){
assert_eq!(pe, 1); //the third add_am_* call in the group was to execute on PE1
assert_eq!(val, 1); // this was the second am to execute on PE1 so the fetched value is 1
}
//or we can iterate over the results
for res in results{
match res{
AmGroupResult::Pe((pe,val)) => { println!("{} from PE{}",val,pe)},
AmGroupResult::All(val) => { println!("{} on all PEs",val)},
}
}
}
}
Expected output on PE1:
0 from PE0
[1,0] on all PEs
1 from PE1
[2,2] on all PEs
§Static Members
In the above code, the ExampleAm struct contains a member that is a Darc (Distributed Arc).
In order to properly calculate distributed reference counts, Darcs implement specialized Serialize and Deserialize operations.
While the cost of any single serialization/deserialization operation is small, doing this for every active message containing a Darc can become expensive.
In certain cases, Typed Am Groups can avoid the repeated serialization/deserialization of Darc members if the user guarantees that every active message in the group uses a reference to the same Darc. In this case, the Darc only needs to be serialized once for each PE it gets sent to.
This can be accomplished by using the AmData attribute macro with the static
keyword passed in as an argument as illustrated below:
use lamellar::active_messaging::prelude::*;
use lamellar::darc::prelude::*;
use std::sync::atomic::AtomicUsize;
#[AmData(Debug,Clone)]
struct ExampleAm {
#[AmData(static)]
cnt: Darc<AtomicUsize>,
}
Other than the addition of #[AmData(static)], the rest of the code would be the same as in the previous example.
Macros§
- This macro is used to construct an AM group of a single AM type. It creates a new instance of a TypedAmGroup, which is an Active Message Group that can only include AMs of a specific type (but this type can return data). Data is returned in the same order as the AMs were added (you can think of this as similar to Vec<T>). The macro expects two parameters: the first being the type (name) of the AM and the second being a reference to a Lamellar team.
Traits§
- The interface for launching, executing, and managing Lamellar Active Messages.
- Supertrait specifying that a type can be used in (remote) active messages.
- The trait representing an active message that can be executed remotely (AmDist is a blanket impl for serde::Serialize + serde::Deserialize + Sync + Send + 'static).
- The trait representing an active message that can only be executed locally, i.e. from the PE that initiated it (SyncSend is a blanket impl for Sync + Send).
- Supertrait specifying serde::ser::Serialize + serde::de::DeserializeOwned.
- Supertrait specifying Sync + Send.
Attribute Macros§
- This macro is used to set up the attributed type so that it can be used within remote active messages.
- This macro is used to set up the attributed type so that it can be used within local active messages.
- This macro is used to associate an implementation of LamellarAM with a type that has used the AmData attribute macro.
- This macro is used to associate an implementation of LamellarAM with a data structure that has used the AmLocalData attribute macro.