1use crate::exchange::Exchange;
2use crate::handler::Handler;
3use crate::status::Code;
4use fnv::FnvHashMap;
5use std::sync::Arc;
6
/// Reasons a handler chain run by `HandlerExecutor::handle` can fail.
#[derive(Debug, Ord, PartialOrd, Eq, PartialEq)]
pub enum ExecutorError {
    /// A handler returned a status that did not mark the request as completed
    /// but carried a client-error, server-error, timeout or (partial)
    /// request-completed flag.
    HandlerStatusError,
    /// A handler's `exec` call itself returned an error.
    HandlerExceptionError,
    /// The chain ran to the end without any handler yielding an output.
    HandlerChainError,
    /// The chain referenced a handler id that is not registered.
    MissingHandlerError,
}

impl std::fmt::Display for ExecutorError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let message = match self {
            ExecutorError::HandlerStatusError => "handler returned an error status",
            ExecutorError::HandlerExceptionError => "handler execution failed",
            ExecutorError::HandlerChainError => {
                "handler chain finished without producing an output"
            }
            ExecutorError::MissingHandlerError => {
                "handler chain references an unregistered handler"
            }
        };
        f.write_str(message)
    }
}

// Lets callers propagate the error through `Box<dyn Error>` and similar
// error-erasing containers.
impl std::error::Error for ExecutorError {}
14
15pub struct HandlerExecutor<Input, Output, Metadata> {
16 pub handlers: FnvHashMap<u64, Arc<dyn Handler<Input, Output, Metadata> + Send>>,
17}
18
19impl<Input, Output, Metadata> HandlerExecutor<Input, Output, Metadata>
20where
21 Input: Default + Send + 'static,
22 Output: Default + Send + 'static,
23 Metadata: Send,
24{
25 pub fn new() -> HandlerExecutor<Input, Output, Metadata> {
26 HandlerExecutor {
27 handlers: FnvHashMap::default(),
28 }
29 }
30
31 pub fn add_handler(
32 &mut self,
33 handler_id: u64,
34 handler: Arc<dyn Handler<Input, Output, Metadata> + Send>,
35 ) -> &mut Self {
36 self.handlers.insert(handler_id, handler);
37 self
38 }
39
40 pub async fn handle(
41 &self,
42 handler_chain: &Vec<u64>,
43 input: Input,
44 metadata: Option<Metadata>,
45 ) -> Result<Output, ExecutorError> {
46 let mut exchange: Exchange<Input, Output, Metadata> = Exchange::new();
47 exchange.save_input(input);
48 if let Some(metadata) = metadata {
49 exchange.add_metadata(metadata);
50 }
51
52 for handler_id in handler_chain {
53 if let Some(handler) = self.handlers.get(&handler_id) {
54 match handler.exec(&mut exchange).await {
55 Ok(status) => {
56 if status.code.all_flags(Code::REQUEST_COMPLETED) {
57 if let Ok(output) = exchange.take_output() {
58 return Ok(output);
59 }
60 } else if status.code.any_flags(
61 Code::CLIENT_ERROR
62 | Code::SERVER_ERROR
63 | Code::TIMEOUT
64 | Code::REQUEST_COMPLETED,
65 ) {
66 return Err(ExecutorError::HandlerStatusError)
67 }
68 }
69 Err(_error) => {
70 return Err(ExecutorError::HandlerExceptionError);
71 },
72 }
73 } else {
74 return Err(ExecutorError::MissingHandlerError);
75 }
76 }
77
78 Err(ExecutorError::HandlerChainError)
79 }
80}
81
#[cfg(test)]
mod test {
    use crate::exchange::Exchange;
    use crate::executor::{ExecutorError, HandlerExecutor};
    use crate::handler::Handler;
    use crate::status::{Code, HandlerExecutionError, HandlerStatus};
    use async_trait::async_trait;
    use std::sync::Arc;

    /// Rewrites the stored input in place, replacing "Hello" with "Goodbye".
    struct ModifyBodyHandler;

    #[async_trait]
    impl Handler<String, String, ()> for ModifyBodyHandler {
        async fn exec(
            &self,
            exchange: &mut Exchange<String, String, ()>,
        ) -> Result<HandlerStatus, HandlerExecutionError> {
            let request_body = exchange.input_mut().unwrap();
            let new_body = request_body.replace("Hello", "Goodbye");
            *request_body = new_body;
            Ok(HandlerStatus::new(Code::OK))
        }
    }

    /// Leaves the exchange untouched and reports `Code::OK`.
    struct IdempotentHandler1;

    #[async_trait]
    impl Handler<String, String, ()> for IdempotentHandler1 {
        async fn exec(
            &self,
            _exchange: &mut Exchange<String, String, ()>,
        ) -> Result<HandlerStatus, HandlerExecutionError> {
            println!("Some logging or something");
            Ok(HandlerStatus::new(Code::OK))
        }
    }

    /// Second handler that leaves the exchange untouched and reports `Code::OK`.
    struct IdempotentHandler2;

    #[async_trait]
    impl Handler<String, String, ()> for IdempotentHandler2 {
        async fn exec(
            &self,
            _exchange: &mut Exchange<String, String, ()>,
        ) -> Result<HandlerStatus, HandlerExecutionError> {
            println!("Some other idempotent operation");
            Ok(HandlerStatus::new(Code::OK))
        }
    }

    /// Moves the request out of the exchange, stores it as the output and
    /// reports `Code::REQUEST_COMPLETED`.
    struct RequestCompletingHandler;

    #[async_trait]
    impl Handler<String, String, ()> for RequestCompletingHandler {
        async fn exec(
            &self,
            exchange: &mut Exchange<String, String, ()>,
        ) -> Result<HandlerStatus, HandlerExecutionError> {
            // NOTE(review): the rest of this file uses the `*_input` naming
            // (`save_input`, `input_mut`); confirm `take_request` is still the
            // right accessor on `Exchange`.
            let response = exchange.take_request().unwrap();
            exchange.save_output(response);
            Ok(HandlerStatus::new(Code::REQUEST_COMPLETED))
        }
    }

    /// Builds the executor shared by the tests: ids 1 and 2 are the idempotent
    /// handlers, 3 completes the request and 4 modifies the body.
    fn executor_with_test_handlers() -> HandlerExecutor<String, String, ()> {
        let mut handler_executor: HandlerExecutor<String, String, ()> = HandlerExecutor::new();
        handler_executor
            .add_handler(1, Arc::new(IdempotentHandler1))
            .add_handler(2, Arc::new(IdempotentHandler2))
            .add_handler(3, Arc::new(RequestCompletingHandler))
            .add_handler(4, Arc::new(ModifyBodyHandler));
        handler_executor
    }

    #[tokio::test(flavor = "current_thread")]
    async fn test_valid_handler_chain() {
        let handler_executor = executor_with_test_handlers();

        // The body is modified (4) before the completing handler (3) runs.
        let chain_for_request: Vec<u64> = vec![1, 2, 4, 3];
        let my_input = String::from("Hello Handler!");

        let result = handler_executor
            .handle(&chain_for_request, my_input, None)
            .await;

        // Comparing the whole `Result` reports the error variant on failure.
        assert_eq!(Ok(String::from("Goodbye Handler!")), result);
    }

    #[tokio::test(flavor = "current_thread")]
    async fn test_invalid_handler_chain() {
        let handler_executor = executor_with_test_handlers();

        // No handler in this chain completes the request.
        let chain_for_request: Vec<u64> = vec![1, 1];
        let my_input = String::from("Hello Handler!");

        let result = handler_executor
            .handle(&chain_for_request, my_input, None)
            .await;

        match result {
            Ok(_) => panic!("Execution should have failed!"),
            Err(e) => assert_eq!(ExecutorError::HandlerChainError, e),
        }
    }
}