1use crate::runtime::RuntimeHandle;
7use std::collections::HashMap;
8use std::sync::Arc;
9
10#[derive(Clone)]
18pub struct ProducerContext {
19 runtime: Option<Arc<dyn RuntimeHandle>>,
20 route_id: Option<String>,
26 default_headers: HashMap<String, String>,
28 timeout_ms: Option<u64>,
30}
31
32impl ProducerContext {
33 pub fn new() -> Self {
35 Self {
36 runtime: None,
37 route_id: None,
38 default_headers: HashMap::new(),
39 timeout_ms: None,
40 }
41 }
42
43 pub fn with_runtime(mut self, runtime: Arc<dyn RuntimeHandle>) -> Self {
45 self.runtime = Some(runtime);
46 self
47 }
48
49 pub fn with_route_id(mut self, route_id: impl Into<String>) -> Self {
51 self.route_id = Some(route_id.into());
52 self
53 }
54
55 pub fn route_id(&self) -> Option<&str> {
57 self.route_id.as_deref()
58 }
59
60 pub fn runtime(&self) -> Option<&Arc<dyn RuntimeHandle>> {
62 self.runtime.as_ref()
63 }
64
65 pub fn with_default_header(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
67 self.default_headers.insert(key.into(), value.into());
68 self
69 }
70
71 pub fn default_headers(&self) -> &HashMap<String, String> {
73 &self.default_headers
74 }
75
76 pub fn with_timeout_ms(mut self, timeout_ms: u64) -> Self {
78 self.timeout_ms = Some(timeout_ms);
79 self
80 }
81
82 pub fn timeout_ms(&self) -> Option<u64> {
84 self.timeout_ms
85 }
86}
87
88impl Default for ProducerContext {
89 fn default() -> Self {
90 Self::new()
91 }
92}
93
94#[cfg(test)]
95mod tests {
96 use super::*;
97 use crate::CamelError;
98 use crate::runtime::{
99 RuntimeCommand, RuntimeCommandBus, RuntimeCommandResult, RuntimeQuery, RuntimeQueryBus,
100 RuntimeQueryResult,
101 };
102 use async_trait::async_trait;
103 use futures::executor::block_on;
104
105 struct NoopRuntime;
106
107 #[async_trait]
108 impl RuntimeCommandBus for NoopRuntime {
109 async fn execute(&self, _cmd: RuntimeCommand) -> Result<RuntimeCommandResult, CamelError> {
110 Ok(RuntimeCommandResult::Accepted)
111 }
112 }
113
114 #[async_trait]
115 impl RuntimeQueryBus for NoopRuntime {
116 async fn ask(&self, _query: RuntimeQuery) -> Result<RuntimeQueryResult, CamelError> {
117 Ok(RuntimeQueryResult::Routes { route_ids: vec![] })
118 }
119 }
120
121 #[test]
122 fn producer_context_new_is_empty() {
123 let ctx = ProducerContext::new();
124 assert!(ctx.runtime().is_none());
125 }
126
127 #[test]
128 fn producer_context_default_is_empty() {
129 let ctx = ProducerContext::default();
130 assert!(ctx.runtime().is_none());
131 }
132
133 #[test]
134 fn producer_context_with_runtime_sets_handle() {
135 let runtime: Arc<dyn RuntimeHandle> = Arc::new(NoopRuntime);
136 let ctx = ProducerContext::new().with_runtime(runtime.clone());
137
138 let attached = ctx.runtime().expect("runtime should be set");
139 assert!(Arc::ptr_eq(attached, &runtime));
140 }
141
142 #[test]
143 fn producer_context_clone_keeps_same_runtime_handle() {
144 let runtime: Arc<dyn RuntimeHandle> = Arc::new(NoopRuntime);
145 let ctx = ProducerContext::new().with_runtime(runtime.clone());
146 let cloned = ctx.clone();
147
148 assert!(Arc::ptr_eq(cloned.runtime().unwrap(), &runtime));
149 }
150
151 #[test]
152 fn producer_context_with_runtime_can_execute_command() {
153 let runtime: Arc<dyn RuntimeHandle> = Arc::new(NoopRuntime);
154 let ctx = ProducerContext::new().with_runtime(runtime);
155 let result = block_on(ctx.runtime().unwrap().execute(RuntimeCommand::StartRoute {
156 route_id: "r1".into(),
157 command_id: "c1".into(),
158 causation_id: None,
159 }))
160 .unwrap();
161
162 assert_eq!(result, RuntimeCommandResult::Accepted);
163 }
164
165 #[test]
166 fn producer_context_with_runtime_can_execute_query() {
167 let runtime: Arc<dyn RuntimeHandle> = Arc::new(NoopRuntime);
168 let ctx = ProducerContext::new().with_runtime(runtime);
169 let result = block_on(ctx.runtime().unwrap().ask(RuntimeQuery::ListRoutes)).unwrap();
170
171 assert_eq!(result, RuntimeQueryResult::Routes { route_ids: vec![] });
172 }
173
174 #[test]
175 fn producer_context_with_runtime_replaces_previous_runtime() {
176 let first: Arc<dyn RuntimeHandle> = Arc::new(NoopRuntime);
177 let second: Arc<dyn RuntimeHandle> = Arc::new(NoopRuntime);
178 let ctx = ProducerContext::new()
179 .with_runtime(first)
180 .with_runtime(second.clone());
181
182 assert!(Arc::ptr_eq(ctx.runtime().unwrap(), &second));
183 }
184
185 #[test]
186 fn producer_context_route_id_is_set_via_builder() {
187 let ctx = ProducerContext::new().with_route_id("my-route");
188 assert_eq!(ctx.route_id(), Some("my-route"));
189 }
190
191 #[test]
192 fn producer_context_route_id_none_by_default() {
193 let ctx = ProducerContext::new();
194 assert_eq!(ctx.route_id(), None);
195 }
196
197 #[test]
198 fn producer_context_default_headers_empty() {
199 let ctx = ProducerContext::new();
200 assert!(ctx.default_headers().is_empty());
201 }
202
203 #[test]
204 fn producer_context_with_default_header() {
205 let ctx = ProducerContext::new()
206 .with_default_header("X-Source", "camel")
207 .with_default_header("X-Trace", "enabled");
208 let headers = ctx.default_headers();
209 assert_eq!(headers.get("X-Source").unwrap(), "camel");
210 assert_eq!(headers.get("X-Trace").unwrap(), "enabled");
211 }
212
213 #[test]
214 fn producer_context_timeout_ms() {
215 let ctx = ProducerContext::new();
216 assert!(ctx.timeout_ms().is_none());
217
218 let ctx = ctx.with_timeout_ms(5000);
219 assert_eq!(ctx.timeout_ms(), Some(5000));
220 }
221}