Skip to main content

fama/
lib.rs

1//! Fama provides a set of functionality that makes it easy to lay out the steps
2//! required to accomplish a task.
3//! Each step in the process is referred to as a pipe. The data or content is passed through
4//! the "pipes". At any stage the flow can be stopped.
5//!
6//! This pattern is usually referred to as the "pipeline" pattern. It is very similar to the
7//! middleware pattern.
8//!
9//! This implementation removes the responsibility of the current "pipe" calling the next pipe, as
10//! in the middleware pattern. A "pipe" can apply its changes/logic or stop the flow. It is the "pipeline"
11//! that initiates the next call.
12//!
13//! The following example illustrates a "New User" flow through the pipeline.
14//!
15//! ```rust
16//!#  #![allow(dead_code)]
17//!
18//! #[tokio::main]
19//! async fn main() {
20//!   // A new user instance is being passed through the pipeline
21//!   let new_user = fama::Pipeline::pass(NewUser::default()) // The input/content
22//!       .await
23//!       .through(ValidateUserName).await  // pipe "ValidateUserName" will stop the flow if the user does not have a "username"
24//!       .through(GenerateUserId).await    // pipe "GenerateUserId" generates and sets the user ID.
25//!       .through(ApplyDefaultRole).await  // pipe "ApplyDefaultRole" will give the user the "Basic" role if the list of roles is empty
26//!       .through(SaveNewUserData).await   // pipe "SaveNewUserData"  saves the data to the database. At this stage, we know all is well
27//!       .through_fn(|_pipe: fama::PipeContent| async {
28//!           println!("yep, you can pass a closure or function too");
29//!       }).await
30//!       .deliver().await;                       // starts the process; alternatively use
31//!       // .confirm()                     // Returns true when the content passes through all the pipes
32//!
33//!   // Fails because "new user" does not have a "username"
34//!   println!("fails validation: {:#?}", &new_user);
35//!
36//!   println!("----------------------------------------");
37//!
38//!   let new_user2 = fama::Pipeline::pass(NewUser {
39//!         username: Some("user1".into()),  // "new user" has a username
40//!         ..NewUser::default()
41//!     })
42//!     .await
43//!     .through(ValidateUserName).await
44//!     .through(GenerateUserId).await
45//!     .through(ApplyDefaultRole).await
46//!     .through(SaveNewUserData).await
47//!     .deliver().await;
48//!
49//!   println!(
50//!         "passes validation and all fields are set : {:#?}",
51//!         &new_user2
52//!    );
53//! }
54//!
55//! // The content for the pipeline input. Can be any type
56//! #[derive(Debug, Clone)]
57//! struct NewUser {
58//!   internal_id: i32,
59//!   id: Option<String>,
60//!   username: Option<String>,
61//!   role: Option<Vec<UserRole>>,
62//! }
63//!
64//! impl Default for NewUser {
65//!   fn default() -> Self {
66//!       Self {
67//!           internal_id: 0,
68//!           id: None,
69//!           username: None,
70//!           role: None,
71//!       }
72//!   }
73//! }
74//!
75//!
76//! // The various roles a user can have
77//! #[derive(Debug, Clone)]
78//! enum UserRole {
79//!   Admin,
80//!   ContentCreator,
81//!   Moderator,
82//!   Basic,
83//! }
84//!
85//! struct ValidateUserName;
86//!
87//! // A struct becomes a pipe when it implements "fama::FamaPipe"
88//! #[fama::async_trait]
89//! impl fama::FamaPipe<(NewUser, fama::PipeContent), ()> for ValidateUserName {
90//!
91//!   // The only requirement is to implement "receive_pipe_content" method
92//!    async fn receive_pipe_content(&self, (new_user, mut content): (NewUser, fama::PipeContent)) {
93//!  
94//!        // When the username is "none", stop the flow
95//!        if new_user.username.is_none() {
96//!            println!("User name cannot be empty");
97//!            content.stop_the_flow(); // Stop the pipeline flow. Pipes below this pipe will not get called
98//!        }
99//!
100//!   }
101//! }
102//!
103//! struct GenerateUserId;
104//!
105//! #[fama::async_trait]
106//! impl fama::FamaPipe<(NewUser, fama::PipeContent), Option<fama::PipeContent>> for GenerateUserId {
107//!     async fn receive_pipe_content(&self, (mut new_user, content): (NewUser,fama::PipeContent)) -> Option<fama::PipeContent> {
108//!
109//!         if new_user.id.is_none() {
110//!             new_user.id = Some(uuid::Uuid::new_v4().to_string()); // Generate and set the ID
111//!             content.store(new_user);
112//!         }
113//!
114//!       None
115//!     }
116//! }
117//!
118//! struct ApplyDefaultRole;
119//!
120//! #[fama::async_trait]
121//! impl fama::FamaPipe<(NewUser, fama::PipeContent), Option<fama::PipeContent>> for ApplyDefaultRole {
122//!     async fn receive_pipe_content(&self, (mut new_user, content): (NewUser,fama::PipeContent)) -> Option<fama::PipeContent> {
123//!
124//!         if new_user.role.is_none() {
125//!             new_user.role = Some(vec![UserRole::Basic]); // Apply default role
126//!             content.store(new_user);
127//!        }
128//!
129//!         Some(content)
130//!     }
131//! }
132//!
133//! struct SaveNewUserData;
134//! #[fama::async_trait]
135//! impl fama::FamaPipe<(NewUser, fama::PipeContent), Option<fama::PipeContent>> for SaveNewUserData {
136//!     async fn receive_pipe_content(&self, (mut new_user, content): (NewUser,fama::PipeContent)) -> Option<fama::PipeContent> {
137//!
138//!         println!(">> saving new user: {:?}", &new_user);
139//!
140//!         new_user.internal_id = 1; // pretend we persisted the data to a database
141//!         content.store(new_user);
142//!
143//!         Some(content)
144//!     }
145//! }
146//! ```
147//!
148mod content;
149mod pipeline;
150mod pipeline_builder;
151
152pub use content::PipeContent;
153pub use pipeline::FamaPipe;
154pub use pipeline::Pipeline;
155
156pub use async_trait::async_trait;
157pub use busybody;
158pub use pipeline_builder::PipelineBuilder;
159pub use pipeline_builder::PipelineBuilderTrait;
160
161#[async_trait::async_trait]
162pub trait PipelineTrait {
163    type Content: Clone + Send + Sync + 'static;
164
165    async fn handle_pipe(&self, pipeline: Pipeline<Self::Content>) -> Pipeline<Self::Content>;
166
167    async fn deliver(&self, subject: Self::Content) -> Self::Content {
168        let pipeline = Pipeline::pass(subject).await;
169        self.handle_pipe(pipeline).await.deliver().await
170    }
171
172    async fn try_to_deliver(&self, subject: Self::Content) -> Option<Self::Content> {
173        let pipeline = Pipeline::pass(subject).await;
174        self.handle_pipe(pipeline).await.try_deliver_as().await
175    }
176
177    async fn deliver_as<R: Clone + Send + Sync + 'static>(&self, subject: Self::Content) -> R
178    where
179        Self: Sized,
180    {
181        let pipeline = Pipeline::pass(subject).await;
182        self.handle_pipe(pipeline).await.deliver_as().await
183    }
184
185    async fn try_deliver_as<R: Clone + Send + Sync + 'static>(
186        &self,
187        subject: Self::Content,
188    ) -> Option<R>
189    where
190        Self: Sized,
191    {
192        let pipeline = Pipeline::pass(subject).await;
193        self.handle_pipe(pipeline).await.try_deliver_as().await
194    }
195
196    async fn confirm(&self, subject: Self::Content) -> bool {
197        let pipeline = Pipeline::pass(subject).await;
198        self.handle_pipe(pipeline).await.confirm()
199    }
200}