Expand description
Fama provides a series of functionalities that makes it easy to layout the steps require to accomplish a task. Each step in the process is refer to as a pipe. The data or content is passed through the “pipes”. At any stage the flow can be stopped.
This pattern is usually refer to as the “pipeline” pattern. It is very similar to the middleware pattern.
This implementation remove the responsibility of the current “pipe” calling the next pipe like in the middleware patten. A “pipe” can apply it’s changes/logic or stop the flow. It is the “pipeline” that initiates the next call.
The following example is illustrating a “New User” flow through the pipeline.
#[tokio::main]
async fn main() {
// A new user instance is being passed through the pipeline
let new_user = fama::Pipeline::pass(NewUser::default()) // The input/content
.through(ValidateUserName).await // pipe "ValidateUserName" will stop the flow if the user does not have a "username"
.through(GenerateUserId).await // pipe "GenerateUserId" generates and set the user ID.
.through(ApplyDefaultRole).await // pipe "ApplyDefaultRole" will give the user the "Basic" role if the list of roles is empty
.through(SaveNewUserData).await // pipe "SaveNewUserData" saves the data to the database. At this stage, we know all is well
.through_fn(|_pipe: fama::PipeContent| async {
println!("yep, you can pass a closure or function too");
}).await
.deliver(); // starts the process or use
// .confirm() // Return true when the content passes throug all the pipes
// Fails because "new user" does not have a "username"
println!("fails validation: {:#?}", &new_user);
println!("----------------------------------------");
let new_user2 = fama::Pipeline::pass(NewUser {
username: Some("user1".into()), // "new user" has a username
..NewUser::default()
})
.through(ValidateUserName).await
.through(GenerateUserId).await
.through(ApplyDefaultRole).await
.through(SaveNewUserData).await
.deliver();
println!(
"passes validation and all fields are set : {:#?}",
&new_user2
);
}
// The content for the pipeline input. Can be any type
#[derive(Debug, Clone)]
struct NewUser {
internal_id: i32,
id: Option<String>,
username: Option<String>,
role: Option<Vec<UserRole>>,
}
impl Default for NewUser {
fn default() -> Self {
Self {
internal_id: 0,
id: None,
username: None,
role: None,
}
}
}
// making the pipe input type injectable
#[fama::async_trait]
impl busybody::Injectable for NewUser {
async fn inject(c: &busybody::ServiceContainer) -> Self {
// get the instance of the type in the current scope or
// create a new instance
c.get_type().unwrap_or_else(|| Self::default())
}
}
// The various roles a user can have
#[derive(Debug, Clone)]
enum UserRole {
Admin,
ContentCreator,
Moderator,
Basic,
}
struct ValidateUserName;
// A struct becomes a pipe when it implements "fama::FamaPipe"
#[fama::async_trait]
impl fama::FamaPipe<(NewUser, fama::PipeContent), ()> for ValidateUserName {
// The only requirement is to implement "receive_pipe_content" method
async fn receive_pipe_content(&self, (new_user, mut content): (NewUser, fama::PipeContent)) {
if new_user.username.is_none() {
println!("User name cannot be empty");
content.stop_the_flow(); // Stop the pipeline flow. Pipes below this pipe will not get call
}
}
}
struct GenerateUserId;
#[fama::async_trait]
impl fama::FamaPipe<(NewUser, fama::PipeContent), Option<fama::PipeContent>> for GenerateUserId {
async fn receive_pipe_content(&self, (mut new_user, content): (NewUser,fama::PipeContent)) -> Option<fama::PipeContent> {
if new_user.id.is_none() {
new_user.id = Some(uuid::Uuid::new_v4().to_string()); // Generate and set the ID
content.store(new_user);
}
None
}
}
struct ApplyDefaultRole;
#[fama::async_trait]
impl fama::FamaPipe<(NewUser, fama::PipeContent), Option<fama::PipeContent>> for ApplyDefaultRole {
async fn receive_pipe_content(&self, (mut new_user, content): (NewUser,fama::PipeContent)) -> Option<fama::PipeContent> {
if new_user.role.is_none() {
new_user.role = Some(vec![UserRole::Basic]); // Apply default role
content.store(new_user);
}
Some(content)
}
}
struct SaveNewUserData;
#[fama::async_trait]
impl fama::FamaPipe<(NewUser, fama::PipeContent), Option<fama::PipeContent>> for SaveNewUserData {
async fn receive_pipe_content(&self, (mut new_user, content): (NewUser,fama::PipeContent)) -> Option<fama::PipeContent> {
println!(">> saving new user: {:?}", &new_user);
new_user.internal_id = 1; // pretend we persisted the data to a database
content.store(new_user);
Some(content)
}
}
Re-exports§
pub use busybody;
Structs§
- The pipes manager
- PipelineBuilder provides flexibility and extendibility to your pipelines