// lamellar/lib.rs

#![warn(missing_docs)]
#![warn(unreachable_pub)]
#![doc(test(attr(deny(unused_must_use))))]
//! Lamellar is an investigation of the applicability of the Rust systems programming language for HPC as an alternative to C and C++, with a focus on PGAS approaches.
//!
//! # Some Nomenclature
//! Throughout this documentation and the APIs there are a few terms we reuse frequently; those terms and brief descriptions are provided below:
//! - `PE` - a processing element, typically a multi-threaded process; for those familiar with MPI, it corresponds to a rank.
//!     - Commonly you will create one PE per physical CPU socket on your system, but it is just as valid to have multiple PEs per CPU
//!     - There may be some instances where `Node` (meaning a compute node) is used instead of `PE`; in these cases they are interchangeable
//! - `World` - an abstraction representing your distributed computing system
//!     - consists of N PEs all capable of communicating with one another
//! - `Team` - a subset of the PEs that exist in the world
//! - `AM` - short for [Active Message][crate::active_messaging]
//! - `Collective Operation` - generally means that all PEs (associated with a given distributed object) must explicitly participate in the operation, otherwise deadlock will occur.
//!     - e.g. barriers, construction of new distributed objects
//! - `One-sided Operation` - generally means that only the calling PE is required for the operation to successfully complete.
//!     - e.g. accessing local data, waiting for local work to complete
//!
//! # Features
//!
//! Lamellar provides several different communication patterns and programming models for distributed applications, briefly highlighted below.
//! ## Active Messages
//! Lamellar allows for sending and executing user-defined active messages on remote PEs in a distributed environment.
//! Users first implement a runtime-exported trait (LamellarAM) for their data structures and then call a procedural macro [#\[lamellar::am\]][crate::active_messaging::am] on the implementation.
//! The procedural macro produces all the necessary code to enable remote execution of the active message.
//! More details can be found in the [Active Messaging][crate::active_messaging] module documentation.
//!
//! ## Darcs (Distributed Arcs)
//! Lamellar provides a distributed extension of an [`Arc`][std::sync::Arc] called a [Darc][crate::darc].
//! Darcs provide safe shared access to inner objects in a distributed environment, ensuring lifetimes and read/write accesses are enforced properly.
//! More details can be found in the [Darc][crate::darc] module documentation.
//!
//! ## PGAS abstractions
//!
//! Lamellar also provides PGAS capabilities through multiple interfaces.
//!
//! ### LamellarArrays (Distributed Arrays)
//!
//! The first is a high-level abstraction of distributed arrays, allowing for distributed iteration and data parallel processing of elements.
//! More details can be found in the [LamellarArray][crate::array] module documentation.
//!
//! ### Low-level Memory Regions
//!
//! The second is a low-level (unsafe) interface for constructing memory regions which are readable and writable from remote PEs.
//! Note that unless you are very comfortable/confident with low-level distributed memory (and even then), it is highly recommended you use the LamellarArrays interface.
//! More details can be found in the [Memory Region][crate::memregion] module documentation.
//!
//! # Network Backends
//!
//! Lamellar relies on network providers called Lamellae to perform the transfer of data throughout the system.
//! Currently three such Lamellae exist:
//! - `local` - used for single-PE (single system, single process) development (this is the default)
//! - `shmem` - used for multi-PE (single system, multi-process) development, useful for emulating distributed environments (communicates through shared memory)
//! - `rofi` - used for multi-PE (multi-system, multi-process) distributed development, based on the Rust OpenFabrics Interface Transport Layer (ROFI) (<https://github.com/pnnl/rofi>).
//!     - By default support for Rofi is disabled, as using it relies on both the Rofi C library and the libfabric library, which may not be installed on your system.
//!     - It can be enabled by adding `features = ["enable-rofi"]` or `features = ["enable-rofi-shared"]` to the lamellar entry in your `Cargo.toml` file
//!
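//! For example, a minimal `Cargo.toml` dependency entry enabling the shared Rofi build might look like the following (the version shown matches the one used elsewhere in this documentation):
//!
//! ```toml
//! [dependencies]
//! lamellar = { version = "0.7.0-rc.1", features = ["enable-rofi-shared"] }
//! ```
//!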
//! The long term goal for Lamellar is that you can develop using the `local` backend and then, when you are ready to run distributed, switch to the `rofi` backend with no changes to your code.
//! Currently the inverse is true: if it compiles and runs using `rofi`, it will compile and run when using `local` and `shmem` with no changes.
//!
//! Additional information on using each of the Lamellae backends can be found below in the `Running Lamellar Applications` section.
//!
//! Environment Variables
//! ---------------------
//! Lamellar has a number of environment variables that can be used to configure the runtime.
//! Please see the [Environment Variables][crate::env_var] module documentation for more details.
//!
//! Examples
//! --------
//! Our repository also provides numerous examples highlighting various features of the runtime: <https://github.com/pnnl/lamellar-runtime/tree/master/examples>
//!
//! Additionally, we are compiling a set of benchmarks (some with multiple implementations) that may be helpful to look at as well: <https://github.com/pnnl/lamellar-benchmarks/>
//!
//! Below are a few small examples highlighting some of the features of Lamellar; more in-depth examples can be found in the documentation for the various features.
//! # Selecting a Lamellae and constructing a Lamellar world instance
//! You can select which backend to use at runtime as shown below:
//! ```
//! use lamellar::Backend;
//! fn main(){
//!     let mut world = lamellar::LamellarWorldBuilder::new()
//!         .with_lamellae( Default::default() ) //if the "enable-rofi" feature is active the default is rofi, otherwise the default is `Local`
//!         //.with_lamellae( Backend::Rofi ) //explicitly set the lamellae backend to rofi
//!         //.with_lamellae( Backend::Local ) //explicitly set the lamellae backend to local
//!         //.with_lamellae( Backend::Shmem ) //explicitly set the lamellae backend to use shared memory
//!         .build();
//! }
//! ```
//! or by setting the following environment variable:
//! `LAMELLAE_BACKEND="lamellae"` where `lamellae` is one of `local`, `shmem`, or `rofi`.
//!
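//! For example, assuming a compiled binary named `my_app` (a hypothetical name; substitute your own executable), the backend can be switched at launch time without recompiling:
//!
//! ```text
//! LAMELLAE_BACKEND="shmem" ./target/release/my_app
//! ```
//!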
//! # Creating and executing a Registered Active Message
//! Please refer to the [Active Messaging][crate::active_messaging] documentation for more details and examples.
//! ```
//! use lamellar::active_messaging::prelude::*;
//!
//! #[AmData(Debug, Clone)] // `AmData` is a macro used in place of `derive`
//! struct HelloWorld { //the "input data" we are sending with our active message
//!     my_pe: usize, // "pe" is a processing element == a node
//! }
//!
//! #[lamellar::am] // at a high level, registers this LamellarAM implementation with the runtime for remote execution
//! impl LamellarAM for HelloWorld {
//!     async fn exec(&self) {
//!         println!(
//!             "Hello pe {:?} of {:?}, I'm pe {:?}",
//!             lamellar::current_pe,
//!             lamellar::num_pes,
//!             self.my_pe
//!         );
//!     }
//! }
//!
//! fn main(){
//!     let mut world = lamellar::LamellarWorldBuilder::new().build();
//!     let my_pe = world.my_pe();
//!     let num_pes = world.num_pes();
//!     let am = HelloWorld { my_pe };
//!     for pe in 0..num_pes{
//!         let _ = world.exec_am_pe(pe,am.clone()).spawn(); // explicitly launch on each PE
//!     }
//!     world.wait_all(); // wait for all active messages to finish
//!     world.barrier();  // synchronize with other PEs
//!     let request = world.exec_am_all(am.clone()); //also possible to execute on every PE with a single call
//!     request.block(); //both exec_am_all and exec_am_pe return futures that can be used to wait for completion and access any returned result
//! }
//! ```
//!
//! # Creating, initializing, and iterating through a distributed array
//! Please refer to the [LamellarArray][crate::array] documentation for more details and examples.
//! ```
//! use lamellar::array::prelude::*;
//!
//! fn main(){
//!     let world = lamellar::LamellarWorldBuilder::new().build();
//!     let my_pe = world.my_pe();
//!     let block_array = AtomicArray::<usize>::new(&world, 1000, Distribution::Block).block(); //we also support Cyclic distribution.
//!     let _ = block_array.dist_iter_mut().enumerate().for_each(move |(i,elem)| elem.store(i)).block(); //simultaneously initialize the array across all PEs, each PE only updates its local data
//!     block_array.barrier();
//!     if my_pe == 0{
//!         for (i,elem) in block_array.onesided_iter().into_iter().enumerate(){ //iterate through the entire array on PE 0 (automatically transferring remote data)
//!             println!("i: {} = {}",i,elem);
//!         }
//!     }
//! }
//! ```
//!
//! # Utilizing a Darc within an active message
//! Please refer to the [Darc][crate::darc] documentation for more details and examples.
//! ```
//! use lamellar::active_messaging::prelude::*;
//! use lamellar::darc::prelude::*;
//! use std::sync::atomic::{AtomicUsize,Ordering};
//!
//! #[AmData(Debug, Clone)] // `AmData` is a macro used in place of `derive`
//! struct DarcAm { //the "input data" we are sending with our active message
//!     cnt: Darc<AtomicUsize>, // count how many times each PE executes an active message
//! }
//!
//! #[lamellar::am] // at a high level, registers this LamellarAM implementation with the runtime for remote execution
//! impl LamellarAM for DarcAm {
//!     async fn exec(&self) {
//!         self.cnt.fetch_add(1,Ordering::SeqCst);
//!     }
//! }
//!
//! fn main(){
//!     let mut world = lamellar::LamellarWorldBuilder::new().build();
//!     let my_pe = world.my_pe();
//!     let num_pes = world.num_pes();
//!     let cnt = Darc::new(&world, AtomicUsize::new(0)).block().expect("Current PE is in world team");
//!     for pe in 0..num_pes{
//!         let _ = world.exec_am_pe(pe,DarcAm{cnt: cnt.clone()}).spawn(); // explicitly launch on each PE
//!     }
//!     let _ = world.exec_am_all(DarcAm{cnt: cnt.clone()}).spawn(); //also possible to execute on every PE with a single call
//!     cnt.fetch_add(1,Ordering::SeqCst); //this is valid as well!
//!     world.wait_all(); // wait for all active messages to finish
//!     world.barrier();  // synchronize with other PEs
//!     assert_eq!(cnt.load(Ordering::SeqCst),num_pes*2 + 1);
//! }
//! ```
//! # Using Lamellar
//! Lamellar is capable of running on single-node workstations as well as distributed HPC systems.
//! For a workstation, simply copy the following to the dependency section of your Cargo.toml file:
//!
//! ```lamellar = "0.7.0-rc.1"```
//!
//! If planning to use within a distributed HPC system, copy the following to your Cargo.toml file:
//!
//! ```lamellar = { version = "0.7.0-rc.1", features = ["enable-rofi"]}```
//!
//! NOTE: as of Lamellar 0.6.1 it is no longer necessary to manually install libfabric; the build process will now try to automatically build libfabric for you.
//! If this process fails, it is still possible to pass in a manual libfabric installation via the OFI_DIR environment variable.
//!
//! For both environments, build your application as normal:
//!
//! ```cargo build (--release)```
//! # Running Lamellar Applications
//! There are a number of ways to run Lamellar applications, mostly dictated by the lamellae you want to use.
//! ## local (single-process, single system)
//! 1. directly launch the executable
//!     - ```cargo run --release```
//! ## shmem (multi-process, single system)
//! 1. grab the [lamellar_run.sh](https://github.com/pnnl/lamellar-runtime/blob/master/lamellar_run.sh) script
//! 2. use `lamellar_run.sh` to launch your application
//!     - ```./lamellar_run.sh -N=2 -T=10 <appname>```
//!         - `N` number of PEs (processes) to launch (Default=1)
//!         - `T` number of threads per PE (Default = number of cores / number of PEs)
//!         - assumes the `<appname>` executable is located at `./target/release/<appname>`
//! ## rofi (multi-process, multi-system)
//! 1. allocate compute nodes on the cluster:
//!     - ```salloc -N 2```
//! 2. launch the application using the cluster launcher
//!     - ```srun -N 2 --mpi=pmi2 ./target/release/<appname>```
//!         - the `pmi2` library is required to grab info about the allocated nodes and helps set up initial handshakes
//!
#[macro_use]
extern crate lazy_static;
#[macro_use]
extern crate memoffset;
//#[doc(hidden)]
pub extern crate serde;
//#[doc(hidden)]
pub use serde::*;

pub extern crate serde_bytes;
//#[doc(hidden)]
// pub use serde_bytes::*;

// //#[doc(hidden)]
pub extern crate serde_with;
// pub use serde_with::*;

// //#[doc(hidden)]
// pub extern crate tracing;
//#[doc(hidden)]
pub use parking_lot;
// //#[doc(hidden)]
// pub use tracing::*;

//#[doc(hidden)]
pub use async_trait;

//#[doc(hidden)]
pub use futures_util;

pub mod active_messaging;
// //#[doc(hidden)]
pub use active_messaging::prelude::*;
pub mod array;
// //#[doc(hidden)]
pub use array::prelude::*;
mod barrier;
pub mod darc;
// //#[doc(hidden)]
pub use darc::prelude::*;
mod lamellae;
mod lamellar_alloc;
mod lamellar_arch;
pub mod lamellar_env;
pub use lamellar_env::LamellarEnv;
mod lamellar_request;
mod lamellar_task_group;
mod lamellar_team;
mod lamellar_world;
pub mod memregion;
// //#[doc(hidden)]
pub use memregion::prelude::*;
mod scheduler;
mod utils;
//#[doc(hidden)]
pub use utils::*;

pub(crate) mod warnings;

pub mod env_var;
pub use env_var::config;

pub use crate::lamellae::Backend;
pub use crate::lamellar_arch::{BlockedArch, IdError, LamellarArch, StridedArch};
// //#[doc(hidden)]
pub use crate::lamellar_task_group::{
    AmGroup, AmGroupResult, BaseAmGroupReq, LamellarTaskGroup, TypedAmGroupBatchReq,
    TypedAmGroupBatchResult, TypedAmGroupResult,
};
pub use crate::lamellar_team::LamellarTeam;
// //#[doc(hidden)]
pub use crate::lamellar_team::ArcLamellarTeam;
pub(crate) use crate::lamellar_team::LamellarTeamRT;
pub use crate::lamellar_world::*;
pub use crate::scheduler::ExecutorType;
pub use crate::scheduler::LamellarTask;

extern crate lamellar_impl;
// //#[doc(hidden)]
pub use lamellar_impl::Dist;
// use lamellar_impl;

//#[doc(hidden)]
pub use inventory;

//#[doc(hidden)]
pub use bincode;
use bincode::Options;

// #[macro_use]
// pub extern crate custom_derive;
//#[doc(hidden)]
pub use custom_derive;

// #[macro_use]
// pub extern crate newtype_derive;
//#[doc(hidden)]
pub use newtype_derive;

lazy_static! {
    pub(crate) static ref BINCODE: bincode::config::WithOtherTrailing<bincode::DefaultOptions, bincode::config::AllowTrailing> =
        bincode::DefaultOptions::new().allow_trailing_bytes();
}
// use std::sync::atomic::AtomicUsize;
// use std::sync::atomic::Ordering::SeqCst;
// use std::sync::Arc;
// lazy_static! {
//     pub(crate) static ref SERIALIZE_TIMER: thread_local::ThreadLocal<Arc<AtomicUsize>> =
//         thread_local::ThreadLocal::new();
//     pub(crate) static ref DESERIALIZE_TIMER: thread_local::ThreadLocal<Arc<AtomicUsize>> =
//         thread_local::ThreadLocal::new();
//     pub(crate) static ref SERIALIZE_SIZE_TIMER: thread_local::ThreadLocal<Arc<AtomicUsize>> =
//         thread_local::ThreadLocal::new();
// }

/// Wrapper function for serializing data
pub fn serialize<T: ?Sized>(obj: &T, var: bool) -> Result<Vec<u8>, anyhow::Error>
where
    T: serde::Serialize,
{
    // let start = std::time::Instant::now();
    let res = if var {
        // Ok(BINCODE.serialize(obj)?)
        Ok(bincode::serialize(obj)?)
    } else {
        Ok(bincode::serialize(obj)?)
    };
    // unsafe {
    //     SERIALIZE_TIMER
    //         .get_or(|| Arc::new(AtomicUsize::new(0)))
    //         .fetch_add(start.elapsed().as_micros() as usize, SeqCst);
    // }
    res
}

/// Wrapper function for getting the size of serialized data
pub fn serialized_size<T: ?Sized>(obj: &T, var: bool) -> usize
where
    T: serde::Serialize,
{
    // let start = std::time::Instant::now();
    let res = if var {
        // BINCODE.serialized_size(obj).unwrap() as usize
        bincode::serialized_size(obj).unwrap() as usize
    } else {
        bincode::serialized_size(obj).unwrap() as usize
    };
    // unsafe {
    //     SERIALIZE_SIZE_TIMER
    //         .get_or(|| Arc::new(AtomicUsize::new(0)))
    //         .fetch_add(start.elapsed().as_micros() as usize, SeqCst);
    // }
    res
}

/// Wrapper function for serializing an object into a buffer
pub fn serialize_into<T: ?Sized>(buf: &mut [u8], obj: &T, var: bool) -> Result<(), anyhow::Error>
where
    T: serde::Serialize,
{
    // let start = std::time::Instant::now();
    if var {
        // BINCODE.serialize_into(buf, obj)?;
        bincode::serialize_into(buf, obj)?;
    } else {
        bincode::serialize_into(buf, obj)?;
    }
    // unsafe {
    //     SERIALIZE_TIMER
    //         .get_or(|| Arc::new(AtomicUsize::new(0)))
    //         .fetch_add(start.elapsed().as_micros() as usize, SeqCst);
    // }
    Ok(())
}

/// Wrapper function for deserializing data
pub fn deserialize<'a, T>(bytes: &'a [u8], var: bool) -> Result<T, anyhow::Error>
where
    T: serde::Deserialize<'a>,
{
    // let start = std::time::Instant::now();
    let res = if var {
        // Ok(BINCODE.deserialize(bytes)?)
        Ok(bincode::deserialize(bytes)?)
    } else {
        Ok(bincode::deserialize(bytes)?)
    };
    // unsafe {
    //     DESERIALIZE_TIMER
    //         .get_or(|| Arc::new(AtomicUsize::new(0)))
    //         .fetch_add(start.elapsed().as_micros() as usize, SeqCst);
    // }
    res
}
//#[doc(hidden)]
pub use async_std;
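
// A minimal sanity-check sketch (an illustrative addition, not part of the public API; it
// requires the crate's own `bincode` dependency, so it only runs via `cargo test`): it
// exercises the `serialize`/`serialized_size`/`deserialize` wrappers above on a simple type,
// confirming they round-trip for both the `var == true` and `var == false` paths (which
// currently share the same default bincode options).
#[cfg(test)]
mod serialize_roundtrip_tests {
    use super::{deserialize, serialize, serialized_size};

    #[test]
    fn roundtrip() {
        let data: Vec<usize> = (0..10).collect();
        for var in [false, true] {
            // serialize and check the reported size matches the produced buffer
            let bytes = serialize(&data, var).expect("serialization should succeed");
            assert_eq!(bytes.len(), serialized_size(&data, var));
            // deserialize back and compare with the original
            let decoded: Vec<usize> =
                deserialize(&bytes, var).expect("deserialization should succeed");
            assert_eq!(data, decoded);
        }
    }
}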