Skip to main content

camel_api/
producer.rs

//! Producer context for dependency injection.
//!
//! This module provides [`ProducerContext`], which holds the shared
//! dependencies and settings that producers need access to: the runtime
//! command/query handle, default message headers, and an optional timeout.
5
6use crate::runtime::RuntimeHandle;
7use std::collections::HashMap;
8use std::sync::Arc;
9
10/// Context provided to producers for dependency injection.
11///
12/// `ProducerContext` holds references to shared infrastructure components
13/// that producers may need access to during message production.
14///
15/// Extensible via `default_headers` (headers injected into every outgoing
16/// message) and `timeout_ms` (per-producer timeout override).
17#[derive(Clone)]
18pub struct ProducerContext {
19    runtime: Option<Arc<dyn RuntimeHandle>>,
20    /// Default headers to inject into every outgoing message produced.
21    default_headers: HashMap<String, String>,
22    /// Optional timeout in milliseconds for producer operations.
23    timeout_ms: Option<u64>,
24}
25
26impl ProducerContext {
27    /// Creates a new empty `ProducerContext`.
28    pub fn new() -> Self {
29        Self {
30            runtime: None,
31            default_headers: HashMap::new(),
32            timeout_ms: None,
33        }
34    }
35
36    /// Attaches a runtime command/query handle.
37    pub fn with_runtime(mut self, runtime: Arc<dyn RuntimeHandle>) -> Self {
38        self.runtime = Some(runtime);
39        self
40    }
41
42    /// Returns the runtime command/query handle, if configured.
43    pub fn runtime(&self) -> Option<&Arc<dyn RuntimeHandle>> {
44        self.runtime.as_ref()
45    }
46
47    /// Sets a default header that will be injected into every outgoing message.
48    pub fn with_default_header(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
49        self.default_headers.insert(key.into(), value.into());
50        self
51    }
52
53    /// Returns the default headers map.
54    pub fn default_headers(&self) -> &HashMap<String, String> {
55        &self.default_headers
56    }
57
58    /// Sets a timeout in milliseconds for producer operations.
59    pub fn with_timeout_ms(mut self, timeout_ms: u64) -> Self {
60        self.timeout_ms = Some(timeout_ms);
61        self
62    }
63
64    /// Returns the configured timeout in milliseconds, if any.
65    pub fn timeout_ms(&self) -> Option<u64> {
66        self.timeout_ms
67    }
68}
69
70impl Default for ProducerContext {
71    fn default() -> Self {
72        Self::new()
73    }
74}
75
#[cfg(test)]
mod tests {
    use super::*;
    use crate::CamelError;
    use crate::runtime::{
        RuntimeCommand, RuntimeCommandBus, RuntimeCommandResult, RuntimeQuery, RuntimeQueryBus,
        RuntimeQueryResult,
    };
    use async_trait::async_trait;
    use futures::executor::block_on;

    /// Minimal runtime stub: accepts every command and answers every query
    /// with an empty route list.
    ///
    /// NOTE(review): it is coerced to `Arc<dyn RuntimeHandle>` below, so
    /// `RuntimeHandle` is presumably provided for any type implementing both
    /// bus traits — confirm against `crate::runtime`.
    struct NoopRuntime;

    #[async_trait]
    impl RuntimeCommandBus for NoopRuntime {
        async fn execute(&self, _cmd: RuntimeCommand) -> Result<RuntimeCommandResult, CamelError> {
            Ok(RuntimeCommandResult::Accepted)
        }
    }

    #[async_trait]
    impl RuntimeQueryBus for NoopRuntime {
        async fn ask(&self, _query: RuntimeQuery) -> Result<RuntimeQueryResult, CamelError> {
            Ok(RuntimeQueryResult::Routes { route_ids: vec![] })
        }
    }

    #[test]
    fn producer_context_new_is_empty() {
        let ctx = ProducerContext::new();
        // "Empty" covers every field, not just the runtime handle.
        assert!(ctx.runtime().is_none());
        assert!(ctx.default_headers().is_empty());
        assert!(ctx.timeout_ms().is_none());
    }

    #[test]
    fn producer_context_default_is_empty() {
        let ctx = ProducerContext::default();
        // `Default` must agree with `new()` on every field.
        assert!(ctx.runtime().is_none());
        assert!(ctx.default_headers().is_empty());
        assert!(ctx.timeout_ms().is_none());
    }

    #[test]
    fn producer_context_with_runtime_sets_handle() {
        let runtime: Arc<dyn RuntimeHandle> = Arc::new(NoopRuntime);
        let ctx = ProducerContext::new().with_runtime(runtime.clone());

        let attached = ctx.runtime().expect("runtime should be set");
        assert!(Arc::ptr_eq(attached, &runtime));
    }

    #[test]
    fn producer_context_clone_keeps_same_runtime_handle() {
        let runtime: Arc<dyn RuntimeHandle> = Arc::new(NoopRuntime);
        let ctx = ProducerContext::new().with_runtime(runtime.clone());
        let cloned = ctx.clone();

        // Cloning the context shares the handle instead of duplicating it.
        assert!(Arc::ptr_eq(cloned.runtime().unwrap(), &runtime));
    }

    #[test]
    fn producer_context_with_runtime_can_execute_command() {
        let runtime: Arc<dyn RuntimeHandle> = Arc::new(NoopRuntime);
        let ctx = ProducerContext::new().with_runtime(runtime);
        let result = block_on(ctx.runtime().unwrap().execute(RuntimeCommand::StartRoute {
            route_id: "r1".into(),
            command_id: "c1".into(),
            causation_id: None,
        }))
        .unwrap();

        assert_eq!(result, RuntimeCommandResult::Accepted);
    }

    #[test]
    fn producer_context_with_runtime_can_execute_query() {
        let runtime: Arc<dyn RuntimeHandle> = Arc::new(NoopRuntime);
        let ctx = ProducerContext::new().with_runtime(runtime);
        let result = block_on(ctx.runtime().unwrap().ask(RuntimeQuery::ListRoutes)).unwrap();

        assert_eq!(result, RuntimeQueryResult::Routes { route_ids: vec![] });
    }

    #[test]
    fn producer_context_with_runtime_replaces_previous_runtime() {
        let first: Arc<dyn RuntimeHandle> = Arc::new(NoopRuntime);
        let second: Arc<dyn RuntimeHandle> = Arc::new(NoopRuntime);
        let ctx = ProducerContext::new()
            .with_runtime(first)
            .with_runtime(second.clone());

        assert!(Arc::ptr_eq(ctx.runtime().unwrap(), &second));
    }

    #[test]
    fn producer_context_default_headers_empty() {
        let ctx = ProducerContext::new();
        assert!(ctx.default_headers().is_empty());
    }

    #[test]
    fn producer_context_with_default_header() {
        let ctx = ProducerContext::new()
            .with_default_header("X-Source", "camel")
            .with_default_header("X-Trace", "enabled");
        let headers = ctx.default_headers();
        assert_eq!(headers.get("X-Source").unwrap(), "camel");
        assert_eq!(headers.get("X-Trace").unwrap(), "enabled");
    }

    #[test]
    fn producer_context_with_default_header_replaces_existing_key() {
        // Setting the same key twice keeps a single entry with the last value.
        let ctx = ProducerContext::new()
            .with_default_header("X-Source", "camel")
            .with_default_header("X-Source", "override");
        let headers = ctx.default_headers();
        assert_eq!(headers.len(), 1);
        assert_eq!(headers.get("X-Source").unwrap(), "override");
    }

    #[test]
    fn producer_context_timeout_ms() {
        let ctx = ProducerContext::new();
        assert!(ctx.timeout_ms().is_none());

        let ctx = ctx.with_timeout_ms(5000);
        assert_eq!(ctx.timeout_ms(), Some(5000));
    }
}