idem_handler/
executor.rs

1use crate::exchange::Exchange;
2use crate::handler::Handler;
3use crate::status::Code;
4use fnv::FnvHashMap;
5use std::sync::Arc;
6
/// Reasons a handler chain run by `HandlerExecutor::handle` can fail.
///
/// The variant order is part of the derived `Ord`/`PartialOrd` ordering, so new
/// variants should be appended rather than inserted.
#[derive(Debug, Clone, Copy, Ord, PartialOrd, Eq, PartialEq)]
pub enum ExecutorError {
    /// A handler returned a status carrying a client-error, server-error,
    /// timeout, or only partially matching request-completed flag.
    HandlerStatusError,
    /// A handler's `exec` call itself returned `Err`.
    HandlerExceptionError,
    /// Every handler in the chain ran, but none finished the request with an
    /// output available.
    HandlerChainError,
    /// The chain referenced a handler id that is not registered.
    MissingHandlerError,
}

impl std::fmt::Display for ExecutorError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let message = match self {
            ExecutorError::HandlerStatusError => "a handler reported a failure status",
            ExecutorError::HandlerExceptionError => "a handler failed while executing",
            ExecutorError::HandlerChainError => {
                "the handler chain finished without completing the request"
            }
            ExecutorError::MissingHandlerError => {
                "the handler chain references a handler that is not registered"
            }
        };
        f.write_str(message)
    }
}

// Lets callers box this error or propagate it with `?` into generic error types.
impl std::error::Error for ExecutorError {}
14
15pub struct HandlerExecutor<Input, Output, Metadata> {
16    pub handlers: FnvHashMap<u64, Arc<dyn Handler<Input, Output, Metadata> + Send>>,
17}
18
19impl<Input, Output, Metadata> HandlerExecutor<Input, Output, Metadata>
20where
21    Input: Default + Send + 'static,
22    Output: Default + Send + 'static,
23    Metadata: Send,
24{
25    pub fn new() -> HandlerExecutor<Input, Output, Metadata> {
26        HandlerExecutor {
27            handlers: FnvHashMap::default(),
28        }
29    }
30
31    pub fn add_handler(
32        &mut self,
33        handler_id: u64,
34        handler: Arc<dyn Handler<Input, Output, Metadata> + Send>,
35    ) -> &mut Self {
36        self.handlers.insert(handler_id, handler);
37        self
38    }
39
40    pub async fn handle(
41        &self,
42        handler_chain: &Vec<u64>,
43        input: Input,
44        metadata: Option<Metadata>,
45    ) -> Result<Output, ExecutorError> {
46        let mut exchange: Exchange<Input, Output, Metadata> = Exchange::new();
47        exchange.save_input(input);
48        if let Some(metadata) = metadata {
49            exchange.add_metadata(metadata);
50        }
51
52        for handler_id in handler_chain {
53            if let Some(handler) = self.handlers.get(&handler_id) {
54                match handler.exec(&mut exchange).await {
55                    Ok(status) => {
56                        if status.code.all_flags(Code::REQUEST_COMPLETED) {
57                           if let Ok(output) = exchange.take_output() {
58                               return Ok(output);
59                           }
60                        } else if status.code.any_flags(
61                            Code::CLIENT_ERROR
62                                | Code::SERVER_ERROR
63                                | Code::TIMEOUT
64                                | Code::REQUEST_COMPLETED,
65                        ) {
66                            return Err(ExecutorError::HandlerStatusError)
67                        }
68                    }
69                    Err(_error) => {
70                        return Err(ExecutorError::HandlerExceptionError);
71                    },
72                }
73            } else {
74                return Err(ExecutorError::MissingHandlerError);
75            }
76        }
77
78       Err(ExecutorError::HandlerChainError)
79    }
80}
81
#[cfg(test)]
mod test {
    use std::sync::Arc;
    use async_trait::async_trait;
    use crate::exchange::Exchange;
    use crate::executor::{ExecutorError, HandlerExecutor};
    use crate::handler::Handler;
    use crate::status::{Code, HandlerExecutionError, HandlerStatus};

    /// Rewrites the input in place, replacing "Hello" with "Goodbye".
    struct ModifyBodyHandler;

    #[async_trait]
    impl Handler<String, String, ()> for ModifyBodyHandler {
        async fn exec(&self, exchange: &mut Exchange<String, String, ()>) -> Result<HandlerStatus, HandlerExecutionError> {
            let request_body = exchange.input_mut().unwrap();
            *request_body = request_body.replace("Hello", "Goodbye");
            Ok(HandlerStatus::new(Code::OK))
        }
    }

    /// Leaves the exchange untouched and reports `Code::OK`.
    struct IdempotentHandler1;

    #[async_trait]
    impl Handler<String, String, ()> for IdempotentHandler1 {
        async fn exec(&self, _exchange: &mut Exchange<String, String, ()>) -> Result<HandlerStatus, HandlerExecutionError> {
            println!("Some logging or something");
            Ok(HandlerStatus::new(Code::OK))
        }
    }

    /// Leaves the exchange untouched and reports `Code::OK`.
    struct IdempotentHandler2;

    #[async_trait]
    impl Handler<String, String, ()> for IdempotentHandler2 {
        async fn exec(&self, _exchange: &mut Exchange<String, String, ()>) -> Result<HandlerStatus, HandlerExecutionError> {
            println!("Some other idempotent operation");
            Ok(HandlerStatus::new(Code::OK))
        }
    }

    /// Moves the request into the output and reports the request as completed.
    struct RequestCompletingHandler;

    #[async_trait]
    impl Handler<String, String, ()> for RequestCompletingHandler {
        async fn exec(&self, exchange: &mut Exchange<String, String, ()>) -> Result<HandlerStatus, HandlerExecutionError> {
            let response = exchange.take_request().unwrap();
            exchange.save_output(response);
            Ok(HandlerStatus::new(Code::REQUEST_COMPLETED))
        }
    }

    /// Builds an executor with all four test handlers registered under ids 1-4.
    /// Registration order is irrelevant; only the chain passed to `handle` fixes
    /// the execution order.
    fn executor_with_all_handlers() -> HandlerExecutor<String, String, ()> {
        let mut handler_executor: HandlerExecutor<String, String, ()> = HandlerExecutor::new();
        handler_executor.add_handler(1, Arc::new(IdempotentHandler1));
        handler_executor.add_handler(2, Arc::new(IdempotentHandler2));
        handler_executor.add_handler(3, Arc::new(RequestCompletingHandler));
        handler_executor.add_handler(4, Arc::new(ModifyBodyHandler));
        handler_executor
    }

    #[tokio::test(flavor = "current_thread")]
    async fn test_valid_handler_chain() {
        let handler_executor = executor_with_all_handlers();

        // choose the handlers to be executed in order.
        let chain_for_request: Vec<u64> = vec![1, 2, 4, 3];
        let my_input = String::from("Hello Handler!");

        let result = handler_executor.handle(&chain_for_request, my_input, None).await;
        assert_eq!(Ok(String::from("Goodbye Handler!")), result);
    }

    #[tokio::test(flavor = "current_thread")]
    async fn test_invalid_handler_chain() {
        let handler_executor = executor_with_all_handlers();

        // no handler in this chain ever completes the request.
        let chain_for_request: Vec<u64> = vec![1, 1];
        let my_input = String::from("Hello Handler!");

        let result = handler_executor.handle(&chain_for_request, my_input, None).await;
        assert_eq!(
            Err(ExecutorError::HandlerChainError),
            result,
            "Execution should have failed!"
        );
    }

    #[tokio::test(flavor = "current_thread")]
    async fn test_missing_handler_in_chain() {
        let handler_executor = executor_with_all_handlers();

        // id 99 was never registered.
        let chain_for_request: Vec<u64> = vec![1, 99];
        let my_input = String::from("Hello Handler!");

        let result = handler_executor.handle(&chain_for_request, my_input, None).await;
        assert_eq!(Err(ExecutorError::MissingHandlerError), result);
    }

    #[tokio::test(flavor = "current_thread")]
    async fn test_empty_handler_chain() {
        let handler_executor = executor_with_all_handlers();

        // with nothing to run, the request can never be completed.
        let chain_for_request: Vec<u64> = Vec::new();
        let my_input = String::from("Hello Handler!");

        let result = handler_executor.handle(&chain_for_request, my_input, None).await;
        assert_eq!(Err(ExecutorError::HandlerChainError), result);
    }
}