Skip to main content

camel_bean/
registry.rs

1use crate::{BeanError, BeanProcessor};
2use camel_api::{CamelError, Exchange};
3use std::collections::HashMap;
4use std::sync::{Arc, Mutex};
5use tracing::{debug, warn};
6
7/// A thread-safe registry for named bean processors.
8///
9/// Uses `Arc<Mutex<...>>` internally so that `register()` and `lookup()`
10/// can be called from multiple threads without external synchronisation.
11pub struct BeanRegistry {
12    beans: Arc<Mutex<HashMap<String, Arc<dyn BeanProcessor>>>>,
13}
14
15impl BeanRegistry {
16    pub fn new() -> Self {
17        Self {
18            beans: Arc::new(Mutex::new(HashMap::new())),
19        }
20    }
21
22    /// Register a bean under the given name.
23    ///
24    /// Returns `Err(BeanError::DuplicateName)` if a bean with the same name
25    /// already exists, or `Err(BeanError::InvalidName)` if the name is blank.
26    pub fn register<B>(&self, name: impl Into<String>, bean: B) -> Result<(), BeanError>
27    where
28        B: BeanProcessor + 'static,
29    {
30        let name = name.into();
31        if name.trim().is_empty() {
32            return Err(BeanError::InvalidName(name));
33        }
34        let mut beans = self.beans.lock().unwrap_or_else(|e| e.into_inner());
35        if beans.contains_key(&name) {
36            warn!(bean_name = %name, "bean already registered");
37            return Err(BeanError::DuplicateName(name));
38        }
39        beans.insert(name.clone(), Arc::new(bean));
40        debug!(bean_name = %name, "bean registered");
41        Ok(())
42    }
43
44    pub fn get(&self, name: &str) -> Option<Arc<dyn BeanProcessor>> {
45        self.beans
46            .lock()
47            .unwrap_or_else(|e| e.into_inner())
48            .get(name)
49            .cloned()
50    }
51
52    pub async fn invoke(
53        &self,
54        bean_name: &str,
55        method: &str,
56        exchange: &mut Exchange,
57    ) -> Result<(), CamelError> {
58        debug!(bean_name = %bean_name, method = %method, "invoking bean");
59
60        let bean = self.get(bean_name).ok_or_else(|| {
61            warn!(bean_name = %bean_name, "bean not found");
62            BeanError::NotFound(bean_name.to_string())
63        })?;
64
65        if !bean.methods().iter().any(|m| m == method) {
66            warn!(bean_name = %bean_name, method = %method, "bean method not found");
67            return Err(BeanError::MethodNotFound(method.to_string()).into());
68        }
69
70        match bean.call(method, exchange).await {
71            Ok(()) => {
72                debug!(bean_name = %bean_name, method = %method, "bean invocation succeeded");
73                Ok(())
74            }
75            Err(err) => {
76                warn!(bean_name = %bean_name, method = %method, error = %err, "bean invocation failed");
77                Err(err)
78            }
79        }
80    }
81
82    pub fn len(&self) -> usize {
83        self.beans.lock().unwrap_or_else(|e| e.into_inner()).len()
84    }
85
86    pub fn is_empty(&self) -> bool {
87        self.beans
88            .lock()
89            .unwrap_or_else(|e| e.into_inner())
90            .is_empty()
91    }
92}
93
94impl Default for BeanRegistry {
95    fn default() -> Self {
96        Self::new()
97    }
98}
99
#[cfg(test)]
mod tests {
    use super::*;
    use async_trait::async_trait;
    use camel_api::{Exchange, Message};

    /// Test bean whose only supported method, `"process"`, always succeeds.
    struct MockBean;

    /// Test bean that advertises `"process"` but fails every call.
    struct ErrorBean;

    #[async_trait]
    impl BeanProcessor for MockBean {
        async fn call(&self, method: &str, _exchange: &mut Exchange) -> Result<(), CamelError> {
            if method == "process" {
                Ok(())
            } else {
                Err(BeanError::MethodNotFound(method.to_string()).into())
            }
        }

        fn methods(&self) -> Vec<String> {
            vec![String::from("process")]
        }
    }

    #[async_trait]
    impl BeanProcessor for ErrorBean {
        async fn call(&self, _method: &str, _exchange: &mut Exchange) -> Result<(), CamelError> {
            let failure = CamelError::ProcessorError("boom".to_string());
            Err(failure)
        }

        fn methods(&self) -> Vec<String> {
            vec![String::from("process")]
        }
    }

    /// Builds an exchange wrapping a default message.
    fn fresh_exchange() -> Exchange {
        Exchange::new(Message::default())
    }

    /// Builds a registry that already holds `MockBean` under `name`.
    fn registry_with_mock(name: &str) -> BeanRegistry {
        let registry = BeanRegistry::new();
        registry.register(name, MockBean).unwrap();
        registry
    }

    /// A default-constructed registry holds no beans.
    #[test]
    fn test_default_is_empty() {
        let fresh = BeanRegistry::default();
        assert!(fresh.is_empty());
        assert_eq!(fresh.len(), 0);
    }

    /// A registered bean can be fetched by name; unknown names yield `None`.
    #[test]
    fn test_register_and_get() {
        let registry = registry_with_mock("mock");

        let found = registry.get("mock");
        let missing = registry.get("nonexistent");
        assert!(found.is_some());
        assert!(missing.is_none());
    }

    /// `len` and `is_empty` track a single registration.
    #[test]
    fn test_len_and_is_empty() {
        let registry = BeanRegistry::new();
        assert!(registry.is_empty());

        registry.register("mock", MockBean).unwrap();
        assert_eq!(registry.len(), 1);
        assert!(!registry.is_empty());
    }

    /// A second registration under a taken name fails and adds nothing.
    #[test]
    fn test_register_same_name_rejects() {
        let registry = registry_with_mock("dup");

        let second_attempt = registry.register("dup", ErrorBean);
        assert!(second_attempt.is_err());
        assert_eq!(registry.len(), 1);
    }

    /// The duplicate-registration failure is specifically `DuplicateName`.
    #[test]
    fn test_register_duplicate_returns_error() {
        let registry = registry_with_mock("dup");

        let failure = registry.register("dup", ErrorBean).unwrap_err();
        assert!(matches!(failure, BeanError::DuplicateName(_)));
    }

    /// Invoking an advertised method on a registered bean succeeds.
    #[tokio::test]
    async fn test_invoke_success() {
        let registry = registry_with_mock("mock");
        let mut exchange = fresh_exchange();

        let outcome = registry.invoke("mock", "process", &mut exchange).await;
        assert!(outcome.is_ok());
    }

    /// Invoking a bean that was never registered is an error.
    #[tokio::test]
    async fn test_invoke_bean_not_found() {
        let registry = BeanRegistry::new();
        let mut exchange = fresh_exchange();

        let outcome = registry
            .invoke("nonexistent", "process", &mut exchange)
            .await;
        assert!(outcome.is_err());
    }

    /// Invoking a method the bean does not advertise is an error.
    #[tokio::test]
    async fn test_invoke_method_not_found() {
        let registry = registry_with_mock("mock");
        let mut exchange = fresh_exchange();

        let outcome = registry.invoke("mock", "unknown", &mut exchange).await;
        assert!(outcome.is_err());
    }

    /// An empty name is rejected with `InvalidName`.
    #[test]
    fn test_register_empty_name_rejected() {
        let registry = BeanRegistry::new();

        let attempt = registry.register("", MockBean);
        assert!(attempt.is_err());
        assert!(matches!(attempt.unwrap_err(), BeanError::InvalidName(_)));
    }

    /// A whitespace-only name is rejected with `InvalidName`.
    #[test]
    fn test_register_whitespace_name_rejected() {
        let registry = BeanRegistry::new();

        let attempt = registry.register("   ", MockBean);
        assert!(attempt.is_err());
        assert!(matches!(attempt.unwrap_err(), BeanError::InvalidName(_)));
    }

    /// An error returned by the bean's `call` reaches the caller unchanged.
    #[tokio::test]
    async fn test_invoke_propagates_bean_error() {
        let registry = BeanRegistry::new();
        registry.register("err", ErrorBean).unwrap();
        let mut exchange = fresh_exchange();

        let outcome = registry.invoke("err", "process", &mut exchange).await;
        assert!(matches!(outcome, Err(CamelError::ProcessorError(_))));
    }

    /// C-17: a `BeanProcessor` shaped like the one `bean_impl` generates must
    /// answer an unknown method with `Err` rather than panicking.
    #[tokio::test]
    async fn test_macro_generated_call_returns_err_for_unknown_method() {
        // Hand-written stand-in for the #[bean_impl] expansion of a simple
        // handler: the proc-macro emits ::camel_bean paths, which do not
        // resolve from inside the camel_bean crate itself.
        struct Greeter;

        #[crate::async_trait]
        impl crate::BeanProcessor for Greeter {
            async fn call(&self, method: &str, _exchange: &mut Exchange) -> Result<(), CamelError> {
                if method == "hello" {
                    return Ok(());
                }
                Err(CamelError::ProcessorError(format!(
                    "Method '{}' not found",
                    method
                )))
            }

            fn methods(&self) -> Vec<String> {
                vec![String::from("hello")]
            }
        }

        let greeter = Greeter;
        let mut exchange = fresh_exchange();

        // The advertised method goes through.
        let known = greeter.call("hello", &mut exchange).await;
        assert!(known.is_ok(), "known method should succeed");

        // Anything else comes back as an error value, not a panic.
        let unknown = greeter.call("nonexistent", &mut exchange).await;
        assert!(unknown.is_err(), "unknown method must return Err");
        let msg = unknown.unwrap_err().to_string();
        assert!(
            msg.contains("nonexistent"),
            "error should mention method name, got: {msg}"
        );
    }

    /// The lifecycle hooks `on_start` / `on_stop` return `Ok` for a bean that
    /// does not override them.
    #[tokio::test]
    async fn test_bean_lifecycle_defaults_are_noop() {
        let bean = MockBean;
        let started = bean.on_start().await;
        assert!(started.is_ok());
        let stopped = bean.on_stop().await;
        assert!(stopped.is_ok());
    }
}