Skip to main content

camel_api/
producer.rs

1//! Producer context for dependency injection.
2//!
3//! This module provides [`ProducerContext`] for holding shared dependencies
4//! that producers need access to, such as the runtime command/query handle.
5
6use crate::runtime::RuntimeHandle;
7use std::collections::HashMap;
8use std::sync::Arc;
9
10/// Context provided to producers for dependency injection.
11///
12/// `ProducerContext` holds references to shared infrastructure components
13/// that producers may need access to during message production.
14///
15/// Extensible via `default_headers` (headers injected into every outgoing
16/// message) and `timeout_ms` (per-producer timeout override).
17#[derive(Clone)]
18pub struct ProducerContext {
19    runtime: Option<Arc<dyn RuntimeHandle>>,
20    /// The route_id this producer is bound to, if any.
21    ///
22    /// Available for ADR-0012 metrics/health calls that require a route_id
23    /// (categories (b′), (e), (g)). Set at producer creation time by the
24    /// route build pipeline (step_resolution.rs).
25    route_id: Option<String>,
26    /// Default headers to inject into every outgoing message produced.
27    default_headers: HashMap<String, String>,
28    /// Optional timeout in milliseconds for producer operations.
29    timeout_ms: Option<u64>,
30}
31
32impl ProducerContext {
33    /// Creates a new empty `ProducerContext`.
34    pub fn new() -> Self {
35        Self {
36            runtime: None,
37            route_id: None,
38            default_headers: HashMap::new(),
39            timeout_ms: None,
40        }
41    }
42
43    /// Attaches a runtime command/query handle.
44    pub fn with_runtime(mut self, runtime: Arc<dyn RuntimeHandle>) -> Self {
45        self.runtime = Some(runtime);
46        self
47    }
48
49    /// Attaches the route_id this producer is bound to.
50    pub fn with_route_id(mut self, route_id: impl Into<String>) -> Self {
51        self.route_id = Some(route_id.into());
52        self
53    }
54
55    /// Returns the route_id this producer is bound to, if set.
56    pub fn route_id(&self) -> Option<&str> {
57        self.route_id.as_deref()
58    }
59
60    /// Returns the runtime command/query handle, if configured.
61    pub fn runtime(&self) -> Option<&Arc<dyn RuntimeHandle>> {
62        self.runtime.as_ref()
63    }
64
65    /// Sets a default header that will be injected into every outgoing message.
66    pub fn with_default_header(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
67        self.default_headers.insert(key.into(), value.into());
68        self
69    }
70
71    /// Returns the default headers map.
72    pub fn default_headers(&self) -> &HashMap<String, String> {
73        &self.default_headers
74    }
75
76    /// Sets a timeout in milliseconds for producer operations.
77    pub fn with_timeout_ms(mut self, timeout_ms: u64) -> Self {
78        self.timeout_ms = Some(timeout_ms);
79        self
80    }
81
82    /// Returns the configured timeout in milliseconds, if any.
83    pub fn timeout_ms(&self) -> Option<u64> {
84        self.timeout_ms
85    }
86}
87
88impl Default for ProducerContext {
89    fn default() -> Self {
90        Self::new()
91    }
92}
93
94#[cfg(test)]
95mod tests {
96    use super::*;
97    use crate::CamelError;
98    use crate::runtime::{
99        RuntimeCommand, RuntimeCommandBus, RuntimeCommandResult, RuntimeQuery, RuntimeQueryBus,
100        RuntimeQueryResult,
101    };
102    use async_trait::async_trait;
103    use futures::executor::block_on;
104
105    struct NoopRuntime;
106
107    #[async_trait]
108    impl RuntimeCommandBus for NoopRuntime {
109        async fn execute(&self, _cmd: RuntimeCommand) -> Result<RuntimeCommandResult, CamelError> {
110            Ok(RuntimeCommandResult::Accepted)
111        }
112    }
113
114    #[async_trait]
115    impl RuntimeQueryBus for NoopRuntime {
116        async fn ask(&self, _query: RuntimeQuery) -> Result<RuntimeQueryResult, CamelError> {
117            Ok(RuntimeQueryResult::Routes { route_ids: vec![] })
118        }
119    }
120
121    #[test]
122    fn producer_context_new_is_empty() {
123        let ctx = ProducerContext::new();
124        assert!(ctx.runtime().is_none());
125    }
126
127    #[test]
128    fn producer_context_default_is_empty() {
129        let ctx = ProducerContext::default();
130        assert!(ctx.runtime().is_none());
131    }
132
133    #[test]
134    fn producer_context_with_runtime_sets_handle() {
135        let runtime: Arc<dyn RuntimeHandle> = Arc::new(NoopRuntime);
136        let ctx = ProducerContext::new().with_runtime(runtime.clone());
137
138        let attached = ctx.runtime().expect("runtime should be set");
139        assert!(Arc::ptr_eq(attached, &runtime));
140    }
141
142    #[test]
143    fn producer_context_clone_keeps_same_runtime_handle() {
144        let runtime: Arc<dyn RuntimeHandle> = Arc::new(NoopRuntime);
145        let ctx = ProducerContext::new().with_runtime(runtime.clone());
146        let cloned = ctx.clone();
147
148        assert!(Arc::ptr_eq(cloned.runtime().unwrap(), &runtime));
149    }
150
151    #[test]
152    fn producer_context_with_runtime_can_execute_command() {
153        let runtime: Arc<dyn RuntimeHandle> = Arc::new(NoopRuntime);
154        let ctx = ProducerContext::new().with_runtime(runtime);
155        let result = block_on(ctx.runtime().unwrap().execute(RuntimeCommand::StartRoute {
156            route_id: "r1".into(),
157            command_id: "c1".into(),
158            causation_id: None,
159        }))
160        .unwrap();
161
162        assert_eq!(result, RuntimeCommandResult::Accepted);
163    }
164
165    #[test]
166    fn producer_context_with_runtime_can_execute_query() {
167        let runtime: Arc<dyn RuntimeHandle> = Arc::new(NoopRuntime);
168        let ctx = ProducerContext::new().with_runtime(runtime);
169        let result = block_on(ctx.runtime().unwrap().ask(RuntimeQuery::ListRoutes)).unwrap();
170
171        assert_eq!(result, RuntimeQueryResult::Routes { route_ids: vec![] });
172    }
173
174    #[test]
175    fn producer_context_with_runtime_replaces_previous_runtime() {
176        let first: Arc<dyn RuntimeHandle> = Arc::new(NoopRuntime);
177        let second: Arc<dyn RuntimeHandle> = Arc::new(NoopRuntime);
178        let ctx = ProducerContext::new()
179            .with_runtime(first)
180            .with_runtime(second.clone());
181
182        assert!(Arc::ptr_eq(ctx.runtime().unwrap(), &second));
183    }
184
185    #[test]
186    fn producer_context_route_id_is_set_via_builder() {
187        let ctx = ProducerContext::new().with_route_id("my-route");
188        assert_eq!(ctx.route_id(), Some("my-route"));
189    }
190
191    #[test]
192    fn producer_context_route_id_none_by_default() {
193        let ctx = ProducerContext::new();
194        assert_eq!(ctx.route_id(), None);
195    }
196
197    #[test]
198    fn producer_context_default_headers_empty() {
199        let ctx = ProducerContext::new();
200        assert!(ctx.default_headers().is_empty());
201    }
202
203    #[test]
204    fn producer_context_with_default_header() {
205        let ctx = ProducerContext::new()
206            .with_default_header("X-Source", "camel")
207            .with_default_header("X-Trace", "enabled");
208        let headers = ctx.default_headers();
209        assert_eq!(headers.get("X-Source").unwrap(), "camel");
210        assert_eq!(headers.get("X-Trace").unwrap(), "enabled");
211    }
212
213    #[test]
214    fn producer_context_timeout_ms() {
215        let ctx = ProducerContext::new();
216        assert!(ctx.timeout_ms().is_none());
217
218        let ctx = ctx.with_timeout_ms(5000);
219        assert_eq!(ctx.timeout_ms(), Some(5000));
220    }
221}