
Active Messages are a computing model where messages contain both data (that you want to compute something with) and metadata that tells the message how to process its data when it arrives at its destination, e.g. a function pointer. The Wikipedia Page https://en.wikipedia.org/wiki/Active_message provides a short overview.
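
To make the idea concrete, here is a minimal single-process sketch of the model, not Lamellar code: a message that carries both its data and a handler saying how to process that data on arrival. The names `ToyMessage`, `deliver`, `sum`, and `max` are hypothetical and exist only for this illustration. A raw function pointer is only meaningful inside one process, which is why a real runtime needs another way to name the handler.

```rust
// A toy, single-process model of an active message: the payload travels
// together with a handler that says what to do with it on arrival.
// `ToyMessage` and `deliver` are hypothetical names, not Lamellar types.
struct ToyMessage {
    data: Vec<u64>,
    handler: fn(&[u64]) -> u64, // the "metadata": how to process the data
}

fn sum(data: &[u64]) -> u64 {
    data.iter().sum()
}

fn max(data: &[u64]) -> u64 {
    data.iter().copied().max().unwrap_or(0)
}

// Stand-in for the destination: it runs whatever handler the message carries.
fn deliver(msg: ToyMessage) -> u64 {
    (msg.handler)(&msg.data)
}

fn main() {
    let total = deliver(ToyMessage { data: vec![1, 2, 3], handler: sum });
    let largest = deliver(ToyMessage { data: vec![1, 2, 3], handler: max });
    println!("sum = {total}, max = {largest}");
}
```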

Lamellar is built upon asynchronous active messages, and provides users with an interface to construct their own active messages.

This interface is exposed through multiple Rust procedural macros and APIs.

Further details are provided in the documentation for each macro. At a high level, implementing an active message takes two steps: define the data to be transferred in the message, then define what to do with that data when it arrives at the destination.

The examples below cover the following topics:

  • Constructing your first Active Message
  • Lamellar AM DSL
  • Lamellar AM return types
    • returning plain old data
    • returning active messages
    • returning active messages that return data
  • Nested Active Messages
  • Active Message Groups
    • Generic Active Message Groups
    • ‘Typed’ Active Message Groups
      • static members

§Examples

Let’s implement a simple active message below:

First, let's define the data we would like to transfer:

#[derive(Debug,Clone)]
 struct HelloWorld {
    original_pe: usize, //this will contain the ID of the PE this data originated from
 }

This looks like a pretty normal (if simple) struct. Next we have to let the runtime know that we would like this data to be used in an active message, so we apply the AmData macro. This is done by replacing the derive macro:

 use lamellar::active_messaging::prelude::*;
 #[AmData(Debug,Clone)]
 struct HelloWorld {
    original_pe: usize, //this will contain the ID of the PE this data originated from
 }

This change allows the compiler to implement the proper traits (related to Serialization and Deserialization) that will let this data type be used in an active message.

Next we need to define the processing that should take place when a message arrives at another PE.

For this we use the am macro on an implementation of the LamellarAM trait:

 #[lamellar::am]
 impl LamellarAM for HelloWorld {
     async fn exec(self) {
         println!(
             "Hello World, I'm from PE {:?}",
             self.original_pe,
         );
     }
 }

The am macro parses the provided implementation and performs a number of transformations to the code to enable execution of the active message. This macro is responsible for generating the code which will perform serialization/deserialization of both the active message data and any returned data.

Each active message implementation is assigned a unique ID at runtime initialization. These IDs are then used as keys into a map of specialized deserialization functions, each of which converts a slice of bytes into the appropriate data type on the remote PE.
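
The ID-to-deserializer lookup described above can be sketched with nothing but the standard library. This is an illustration of the general technique, not Lamellar's actual registry or wire format, which are internal to the macro-generated code. `AmId`, `Handler`, `build_registry`, and `dispatch` are hypothetical names, and the 8-byte little-endian encoding is an assumption made for the example.

```rust
use std::collections::HashMap;

// Hypothetical stand-ins: Lamellar's real registry and wire format are
// internal and are not shown here.
type AmId = u32;
type Handler = fn(&[u8]) -> String;

// "Deserialize" 8 little-endian bytes into the original PE and run the body.
fn hello_world(bytes: &[u8]) -> String {
    let mut buf = [0u8; 8];
    buf.copy_from_slice(&bytes[..8]);
    let original_pe = u64::from_le_bytes(buf);
    format!("Hello World, I'm from PE {original_pe}")
}

// Built once at start-up: every AM type gets an ID and a handler.
fn build_registry() -> HashMap<AmId, Handler> {
    let mut registry: HashMap<AmId, Handler> = HashMap::new();
    registry.insert(0, hello_world);
    registry
}

// What a receiving PE does: look up the handler by ID and hand it the bytes.
fn dispatch(registry: &HashMap<AmId, Handler>, id: AmId, bytes: &[u8]) -> Option<String> {
    registry.get(&id).map(|handler| handler(bytes))
}

fn main() {
    let registry = build_registry();
    let wire = 1u64.to_le_bytes(); // sender side: serialize original_pe = 1
    println!("{:?}", dispatch(&registry, 0, &wire));
}
```

Because only the small integer ID crosses the wire, every PE can resolve it against its own copy of the map, as long as all PEs build the map the same way.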

The final step is to actually launch an active message and await its result

 fn main(){
     let world = lamellar::LamellarWorldBuilder::new().build();
     let my_pe = world.my_pe();
     //Send a Hello World Active Message to all pes
     let request = world.exec_am_all(
         HelloWorld {
             original_pe: my_pe,
         }
     );
     //wait for the request to complete
     world.block_on(request);
 }

In this example we simply send a HelloWorld from every PE to every PE (including itself) using exec_am_all (please see the ActiveMessaging trait documentation for further details). exec_am_all returns a Future which we can use to await the completion of our operation.

Sample output for the above example on a 2 PE system may look something like (exact ordering is nondeterministic due to asynchronous behavior)

 Hello World, I'm from PE 0
 Hello World, I'm from PE 1
 Hello World, I'm from PE 0
 Hello World, I'm from PE 1

What if we wanted to actually know where we are currently executing?

§Lamellar AM DSL

The lamellar am macro also parses the provided code block for keywords from a small DSL, specifically searching for the following token streams:

  • lamellar::current_pe - return the world id of the PE this active message is executing on
  • lamellar::num_pes - return the number of PEs in the world
  • lamellar::world - return a reference to the instantiated LamellarWorld
  • lamellar::team - return a reference to the LamellarTeam responsible for launching this AM

Given this functionality, we can adapt the above active message body to this:

 #[lamellar::am]
 impl LamellarAM for HelloWorld {
     async fn exec(self) {
         println!(
             "Hello World on PE {:?} of {:?}, I'm from PE {:?}",
             lamellar::current_pe,
             lamellar::num_pes,
             self.original_pe,
         );
     }
 }

The new sample output for the above example on a 2 PE system may look something like (exact ordering is nondeterministic due to asynchronous behavior)

 Hello World on PE 0 of 2, I'm from PE 0
 Hello World on PE 0 of 2, I'm from PE 1
 Hello World on PE 1 of 2, I'm from PE 0
 Hello World on PE 1 of 2, I'm from PE 1

§Active Messages with return data

In the above examples, we simply launched a remote active message but did not return a result back to the originating PE. Lamellar supports returning both “plain old data” (as long as it impls AmDist) and other active messages.

§Returning normal data

Lamellar Active Messages support returning data and it is as simple as specifying the return type in the implementation of the exec function.

 #[lamellar::am]
 impl LamellarAM for HelloWorld {
     async fn exec(self) -> usize { //specify we are returning a usize
         println!(
             "Hello World on PE {:?} of {:?}, I'm from PE {:?}",
             lamellar::current_pe,
             lamellar::num_pes,
             self.original_pe,
         );
         lamellar::current_pe
     }
 }

Retrieving the result is as simple as assigning a variable to the awaited request

 fn main(){
     let world = lamellar::LamellarWorldBuilder::new().build();
     let my_pe = world.my_pe();
     //Send a Hello World Active Message to all pes
     let request = world.exec_am_all(
         HelloWorld {
             original_pe: my_pe,
         }
     );
     //wait for the request to complete
     let results = world.block_on(request);
     println!("PE {my_pe} {results:?}");
 }

The new Sample output for the above example on a 2 PE system may look something like (exact ordering is nondeterministic due to asynchronous behavior)

 Hello World on PE 0 of 2, I'm from PE 0
 Hello World on PE 0 of 2, I'm from PE 1
 Hello World on PE 1 of 2, I'm from PE 0
 Hello World on PE 1 of 2, I'm from PE 1
 PE 0 [0,1]
 PE 1 [0,1]

§Returning Active Messages

Lamellar also provides the ability to return another active message as a result.

This active message will execute automatically when it arrives back at the originating node, and is intended as a sort of callback mechanism.

Returning an active message requires a few more changes to our code. First we will define our new active message:

 #[AmData(Debug,Clone)]
 struct ReturnAm{
     original_pe: usize,
     remote_pe: usize,
 }

 #[lamellar::am]
 impl LamellarAm for ReturnAm{
     async fn exec(self) {
         println!("initiated on PE {} visited PE {} finishing on PE {}",self.original_pe,self.remote_pe,lamellar::current_pe);
     }
 }

With that defined we can now modify our original Active Message to return this new ReturnAm type. The main change is that we need to explicitly tell the macro we are returning an active message and we provide the name of the active message we are returning

 #[lamellar::am(return_am = "ReturnAm")] //we explicitly tell the macro we are returning an AM
 impl LamellarAM for HelloWorld {
     async fn exec(self) -> ReturnAm { //the return type is now the AM we are returning
         println!(
             "Hello World on PE {:?} of {:?}, I'm from PE {:?}",
             lamellar::current_pe,
             lamellar::num_pes,
             self.original_pe,
         );
         ReturnAm{ //simply return an instance of the AM
             original_pe: self.original_pe,
             remote_pe: lamellar::current_pe,
         }
     }
 }

We do not need to modify any of the code in our main function, so the new Sample output for the above example on a 2 PE system may look something like (exact ordering is nondeterministic due to asynchronous behavior)

 Hello World on PE 0 of 2, I'm from PE 0
 Hello World on PE 0 of 2, I'm from PE 1
 Hello World on PE 1 of 2, I'm from PE 0
 Hello World on PE 1 of 2, I'm from PE 1
 initiated on PE 0 visited PE 0 finishing on PE 0
 initiated on PE 0 visited PE 1 finishing on PE 0
 initiated on PE 1 visited PE 0 finishing on PE 1
 initiated on PE 1 visited PE 1 finishing on PE 1
 PE 0 [(),()]
 PE 1 [(),()]

By examining the above output we can see that printing the results of the request returns the unit type (well a Vector of unit types because of the exec_am_all call). This is because our returned AM does not return any data itself.

§Returning Active Messages which return data

Lamellar does support returning an Active Message which then returns some data. First we need to update ReturnAm to actually return some data


 #[lamellar::am]
 impl LamellarAm for ReturnAm{
     async fn exec(self) -> (usize,usize) {
         println!("initiated on PE {} visited PE {} finishing on PE {}",self.original_pe,self.remote_pe,lamellar::current_pe);
         (self.original_pe,self.remote_pe)
     }
 }

Next we need to make an additional change to the HelloWorld AM to specify that our returned AM will itself return data. We do this in the argument to the am procedural macro:


 #[lamellar::am(return_am = "ReturnAm -> (usize,usize)")] //we explicitly tell the macro we are returning an AM which itself returns data
 impl LamellarAM for HelloWorld {
     async fn exec(self) -> ReturnAm { //the return type is the AM we are returning
         println!(
             "Hello World on PE {:?} of {:?}, I'm from PE {:?}",
             lamellar::current_pe,
             lamellar::num_pes,
             self.original_pe,
         );
         ReturnAm{ //simply return an instance of the AM
             original_pe: self.original_pe,
             remote_pe: lamellar::current_pe,
         }
     }
 }

With those changes, the new Sample output for the above example on a 2 PE system may look something like (exact ordering is nondeterministic due to asynchronous behavior)

 Hello World on PE 0 of 2, I'm from PE 0
 Hello World on PE 0 of 2, I'm from PE 1
 Hello World on PE 1 of 2, I'm from PE 0
 Hello World on PE 1 of 2, I'm from PE 1
 initiated on PE 0 visited PE 0 finishing on PE 0
 initiated on PE 0 visited PE 1 finishing on PE 0
 initiated on PE 1 visited PE 0 finishing on PE 1
 initiated on PE 1 visited PE 1 finishing on PE 1
 PE 0 [(0,0),(0,1)]
 PE 1 [(1,0),(1,1)]
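
The round trip behind these results can be traced with a single-process model. The sketch below is not Lamellar code: PEs are plain integers, "sending" is an ordinary function call, and `hello_world_exec` and this free-standing `exec_am_all` are hypothetical stand-ins for the macro-generated machinery. It only illustrates why each entry in the result is an (original_pe, remote_pe) pair, ordered by destination PE.

```rust
// Single-process model of the HelloWorld -> ReturnAm round trip.
// PEs are plain integers and "sending" is a function call.
struct ReturnAm {
    original_pe: usize,
    remote_pe: usize,
}

impl ReturnAm {
    // Runs back on the originating PE and produces the final result.
    fn exec(self, current_pe: usize) -> (usize, usize) {
        println!(
            "initiated on PE {} visited PE {} finishing on PE {}",
            self.original_pe, self.remote_pe, current_pe
        );
        (self.original_pe, self.remote_pe)
    }
}

// Models HelloWorld::exec running on `current_pe`: it returns the follow-up AM.
fn hello_world_exec(original_pe: usize, current_pe: usize) -> ReturnAm {
    ReturnAm { original_pe, remote_pe: current_pe }
}

// Models exec_am_all launched from `origin`: one result per destination PE.
fn exec_am_all(origin: usize, num_pes: usize) -> Vec<(usize, usize)> {
    (0..num_pes)
        .map(|remote| hello_world_exec(origin, remote).exec(origin))
        .collect()
}

fn main() {
    for pe in 0..2 {
        println!("PE {pe} {:?}", exec_am_all(pe, 2));
    }
}
```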

§Nested Active Messages

Lamellar Active Messages support nested active messages, i.e. launching a new active message from within an executing active message.

This functionality can be used to set up active message dependencies, enable recursive active messages, etc. In the following example we will construct a recursive active message that performs a ring-like communication pattern across PEs and returns the PEs it visited in reverse order.

 use lamellar::active_messaging::prelude::*;
 #[AmData(Debug,Clone)]
 struct RingAm {
    original_pe: usize, //this will be our recursion terminating condition
 }
 #[lamellar::am]
 impl LamellarAm for RingAm{
     async fn exec(self) -> Vec<usize>{
         let cur_pe = lamellar::current_pe;
         if self.original_pe ==  cur_pe{ //terminate the recursion!
             vec![cur_pe] //return a new path with the current_pe as the start
         }
         else { //launch another active message
             let next_pe = (cur_pe + 1 ) % lamellar::num_pes; //account for wrap around
             let req = lamellar::team.exec_am_pe(next_pe, RingAm{original_pe: self.original_pe});//forward the same original_pe; no data needs to be modified
             let mut path = req.await; // exec_am_*() calls return a future that we await to get the result
             path.push(cur_pe); //update the path with the PE and return
             path
         }
     }
 }

 fn main(){
     let world = lamellar::LamellarWorldBuilder::new().build();
     let my_pe = world.my_pe();
     let num_pes = world.num_pes();
     //Send initial message to right neighbor
     let next_pe = (my_pe + 1) % num_pes; //account for wrap around
     let request = world.exec_am_pe(
         next_pe,
         RingAm {
             original_pe: my_pe
         }
     );
     //wait for the request to complete
     let results = world.block_on(request);
     println!("PE {my_pe} {results:?}");
 }

The key thing to notice in this example is that how we wait for a request to finish depends on the context we are executing in. Inside an active message we are already in an asynchronous context, so we can simply await the future returned by the exec_am_pe() call. This is in contrast to the main function, where we must use a block_on call to drive the future and retrieve the result.

The sample output for the above example on a 4 PE system may look something like (exact ordering is nondeterministic due to asynchronous behavior)

 PE 0 [0,3,2,1]
 PE 1 [1,0,3,2]
 PE 2 [2,1,0,3]
 PE 3 [3,2,1,0]
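
To see where these paths come from, the recursion can be modeled as an ordinary recursive function in a single process. This is a sketch of the control flow only, with no asynchrony or communication. `ring_exec` and `launch` are hypothetical names that mirror RingAm's exec body and the main function above.

```rust
// Models RingAm::exec running on `cur_pe` as a plain recursive function.
fn ring_exec(original_pe: usize, cur_pe: usize, num_pes: usize) -> Vec<usize> {
    if original_pe == cur_pe {
        // back where we started: terminate the recursion
        vec![cur_pe]
    } else {
        // "launch" the next AM on the right neighbor and wait for its path
        let next_pe = (cur_pe + 1) % num_pes;
        let mut path = ring_exec(original_pe, next_pe, num_pes);
        path.push(cur_pe);
        path
    }
}

// Models main(): the first message goes to the right neighbor of `my_pe`.
fn launch(my_pe: usize, num_pes: usize) -> Vec<usize> {
    ring_exec(my_pe, (my_pe + 1) % num_pes, num_pes)
}

fn main() {
    for pe in 0..4 {
        println!("PE {pe} {:?}", launch(pe, 4));
    }
}
```

The originating PE is pushed first because it is where the recursion terminates, and every other PE appends itself as the call stack unwinds, which yields the reverse visiting order.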

§Active Message Groups

Up until now, we have seen two extremes with respect to the granularity at which active messages can be awaited: either awaiting all outstanding active messages in the system via wait_all(), or awaiting an individual active message, e.g. req.await. Lamellar also supports active message groups, which are collections of active messages that can be awaited together. Conceptually, an active message group can be represented as a meta active message that contains a list of the actual active messages we want to execute, as illustrated in the pseudocode below:

#[AmData(Debug,Clone)]
struct MetaAm{
    ams: Vec<impl LamellarAm>
}
#[lamellar::am]
impl LamellarAm for MetaAm{
    async fn exec(self) {
        for am in self.ams{
            am.exec().await
        }
    }
}

There are two flavors of active message groups discussed in the following sections:

§Generic Active Message Groups

The first Active Message Group is called AmGroup, which can include any AM that satisfies AM: impl LamellarAm<Output=()>. That is, the active messages in the group can consist of different underlying types as long as they all return (). Future implementations will relax this restriction so that they only need to return the same type.

use lamellar::active_messaging::prelude::*;
#[AmData(Debug,Clone)]
struct Am1 {
   foo: usize,
}
#[lamellar::am]
impl LamellarAm for Am1{
    async fn exec(self) { //AMs in an AmGroup must return ()
        println!("in am1 {:?} on PE{:?}",self.foo,  lamellar::current_pe);
    }
}

#[AmData(Debug,Clone)]
struct Am2 {
   bar: String,
}
#[lamellar::am]
impl LamellarAm for Am2{
    async fn exec(self) { //AMs in an AmGroup must return ()
        println!("in am2 {:?} on PE{:?}",self.bar,lamellar::current_pe);
    }
}

fn main(){
    let world = lamellar::LamellarWorldBuilder::new().build();
    let my_pe = world.my_pe();
    let num_pes = world.num_pes();

    let am1 = Am1{foo: 1};
    let am2 = Am2{bar: "hello".to_string()};
    //create a new AmGroup
    let mut am_group = AmGroup::new(&world);
    // add the AMs to the group
    // we can specify individual PEs to execute on or all PEs
    am_group.add_am_pe(0,am1.clone());
    am_group.add_am_pe(1,am1.clone());
    am_group.add_am_pe(0,am2.clone());
    am_group.add_am_pe(1,am2.clone());
    am_group.add_am_all(am1.clone());
    am_group.add_am_all(am2.clone());

    //execute and await the completion of all AMs in the group
    world.block_on(am_group.exec());
}

Expected output on each PE:

in am1 1 on PE0
in am2 "hello" on PE0
in am1 1 on PE0
in am2 "hello" on PE0
in am1 1 on PE1
in am2 "hello" on PE1
in am1 1 on PE1
in am2 "hello" on PE1

§Typed Active Message Groups

The second Active Message Group is called TypedAmGroup, which can only include AMs of a specific type (but this type can return data). Data is returned in the same order as the AMs were added (you can think of this as similar to Vec<T>). Typed AM Groups are instantiated using the typed_am_group macro, which expects two parameters: the first is the type (name) of the AM and the second is a reference to a Lamellar team.

use lamellar::active_messaging::prelude::*;
use lamellar::darc::prelude::*;
use std::sync::atomic::AtomicUsize;
#[AmData(Debug,Clone)]
struct ExampleAm {
   cnt: Darc<AtomicUsize>,
}
#[lamellar::am]
impl LamellarAm for ExampleAm{
    async fn exec(self) -> usize{
        self.cnt.fetch_add(1, std::sync::atomic::Ordering::SeqCst) //no trailing semicolon: return the previous count
    }
}

fn main(){
    let world = lamellar::LamellarWorldBuilder::new().build();
    let my_pe = world.my_pe();
    let num_pes = world.num_pes();

    if my_pe == 0 { // we only want to run this on PE0 for sake of illustration
        let mut am_group = typed_am_group!{ExampleAm,&world};
        let am = ExampleAm{cnt: 0}; //placeholder: cnt must actually be a Darc<AtomicUsize> whose counter starts at 0
        // add the AMs to the group
        // we can specify individual PEs to execute on or all PEs
        am_group.add_am_pe(0,am.clone());
        am_group.add_am_all(am.clone());
        am_group.add_am_pe(1,am.clone());
        am_group.add_am_all(am.clone());

        //execute and await the completion of all AMs in the group
        let results = world.block_on(am_group.exec()); // we want to process the returned data
        //we can index into the results
        if let AmGroupResult::Pe((pe,val)) = results.at(2){
            assert_eq!(pe, 1); //the third add_am_* call in the group was to execute on PE1
            assert_eq!(val, 1); // this was the second am to execute on PE1 so the fetched value is 1
        }
        //or we can iterate over the results
        for res in results{
            match res{
                AmGroupResult::Pe((pe,val)) => { println!("{} from PE{}",val,pe)},
                AmGroupResult::All(val) => { println!("{} on all PEs",val)},
            }
        }
    }
}

Expected output on PE0:

0 from PE0
[1,0] on all PEs
1 from PE1
[2,2] on all PEs
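
The values above follow from each PE owning its own counter, with fetch_add returning the previous count. The following single-process model reproduces them under the assumption that the AMs run on each PE in the order they were added to the group. `Target`, `ToyResult`, `fetch_add`, and `run_group` are hypothetical names for this sketch and are not Lamellar types.

```rust
// Where a group entry was sent: a single PE or all PEs.
enum Target {
    Pe(usize),
    All,
}

// Mirrors the shape of the results: one value, or one value per PE.
#[derive(Debug, PartialEq)]
enum ToyResult {
    Pe(usize, usize),
    All(Vec<usize>),
}

// Like AtomicUsize::fetch_add: bump the counter, return the previous value.
fn fetch_add(counters: &mut [usize], pe: usize) -> usize {
    let old = counters[pe];
    counters[pe] += 1;
    old
}

// Runs the group entries in insertion order against per-PE counters.
fn run_group(targets: &[Target], num_pes: usize) -> Vec<ToyResult> {
    let mut counters = vec![0usize; num_pes];
    let mut results = Vec::new();
    for target in targets {
        let res = match target {
            Target::Pe(pe) => ToyResult::Pe(*pe, fetch_add(&mut counters, *pe)),
            Target::All => ToyResult::All(
                (0..num_pes).map(|pe| fetch_add(&mut counters, pe)).collect(),
            ),
        };
        results.push(res);
    }
    results
}

fn main() {
    let group = [Target::Pe(0), Target::All, Target::Pe(1), Target::All];
    for res in run_group(&group, 2) {
        println!("{res:?}");
    }
}
```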

§Static Members

In the above code, the ExampleAm struct contains a member that is a Darc (Distributed Arc). In order to properly calculate distributed reference counts, Darcs implement specialized Serialize and Deserialize operations. While the cost of any single serialization/deserialization operation is small, doing this for every active message containing a Darc can become expensive.

In certain cases Typed AM Groups can avoid the repeated serialization/deserialization of Darc members if the user guarantees that every Active Message in the group is using a reference to the same Darc. In this case, we only need to serialize the Darc once for each PE it gets sent to.

This can be accomplished by using the AmData attribute macro with the static keyword passed in as an argument as illustrated below:

use lamellar::active_messaging::prelude::*;
use lamellar::darc::prelude::*;
use std::sync::atomic::AtomicUsize;
#[AmData(Debug,Clone)]
struct ExampleAm {
   #[AmData(static)]
   cnt: Darc<AtomicUsize>,
}

Other than the addition of #[AmData(static)], the rest of the code is the same as in the previous example.
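
As a rough illustration of the potential saving, the sketch below simply counts how many times the shared Darc would be serialized for a given list of destination PEs. It is a counting model based only on the description above ("once for each PE it gets sent to"), not a measurement of Lamellar itself, and `darc_serializations` is a hypothetical helper.

```rust
use std::collections::BTreeSet;

// Counting model only: how many times would the shared Darc be serialized
// for a group whose AMs target `dest_pes`? `darc_serializations` is a
// hypothetical helper, not a Lamellar function.
fn darc_serializations(dest_pes: &[usize], is_static: bool) -> usize {
    if is_static {
        // once per distinct destination PE
        dest_pes.iter().collect::<BTreeSet<_>>().len()
    } else {
        // once per active message
        dest_pes.len()
    }
}

fn main() {
    let dest_pes = [0, 1, 1, 0, 1, 1];
    println!("per message: {}", darc_serializations(&dest_pes, false));
    println!("static:      {}", darc_serializations(&dest_pes, true));
}
```

The gap grows with the number of AMs per destination, so the annotation matters most for large groups that target a small number of PEs.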

Macros§

  • This macro is used to construct an AM group of a single AM type. It creates a new instance of a TypedAmGroup, which is an Active Message Group that can only include AMs of a specific type (but this type can return data). Data is returned in the same order as the AMs were added (you can think of this as similar to Vec<T>). The macro expects two parameters: the first is the type (name) of the AM and the second is a reference to a Lamellar team.

Traits§

  • The interface for launching, executing, and managing Lamellar Active Messages.
  • Supertrait specifying a Type can be used in (remote)ActiveMessages
  • The trait representing an active message that can be executed remotely (AmDist is a blanket impl for serde::Serialize + serde::Deserialize + Sync + Send + ’static)
  • The trait representing an active message that can only be executed locally, i.e. from the PE that initiated it (SyncSend is a blanket impl for Sync + Send)
  • Supertrait specifying serde::ser::Serialize + serde::de::DeserializeOwned
  • Supertrait specifying Sync + Send

Attribute Macros§

  • This macro is used to setup the attributed type so that it can be used within remote active messages.
  • This macro is used to setup the attributed type so that it can be used within local active messages.
  • This macro is used to associate an implementation of LamellarAM with a type that has used the AmData attribute macro
  • This macro is used to associate an implementation of LamellarAM with a data structure that has used the AmLocalData attribute macro