1use crate::runtime::RuntimeHandle;
7use std::collections::HashMap;
8use std::sync::Arc;
9
10#[derive(Clone)]
18pub struct ProducerContext {
19 runtime: Option<Arc<dyn RuntimeHandle>>,
20 default_headers: HashMap<String, String>,
22 timeout_ms: Option<u64>,
24}
25
26impl ProducerContext {
27 pub fn new() -> Self {
29 Self {
30 runtime: None,
31 default_headers: HashMap::new(),
32 timeout_ms: None,
33 }
34 }
35
36 pub fn with_runtime(mut self, runtime: Arc<dyn RuntimeHandle>) -> Self {
38 self.runtime = Some(runtime);
39 self
40 }
41
42 pub fn runtime(&self) -> Option<&Arc<dyn RuntimeHandle>> {
44 self.runtime.as_ref()
45 }
46
47 pub fn with_default_header(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
49 self.default_headers.insert(key.into(), value.into());
50 self
51 }
52
53 pub fn default_headers(&self) -> &HashMap<String, String> {
55 &self.default_headers
56 }
57
58 pub fn with_timeout_ms(mut self, timeout_ms: u64) -> Self {
60 self.timeout_ms = Some(timeout_ms);
61 self
62 }
63
64 pub fn timeout_ms(&self) -> Option<u64> {
66 self.timeout_ms
67 }
68}
69
70impl Default for ProducerContext {
71 fn default() -> Self {
72 Self::new()
73 }
74}
75
#[cfg(test)]
mod tests {
    use super::*;
    use crate::CamelError;
    use crate::runtime::{
        RuntimeCommand, RuntimeCommandBus, RuntimeCommandResult, RuntimeQuery, RuntimeQueryBus,
        RuntimeQueryResult,
    };
    use async_trait::async_trait;
    use futures::executor::block_on;

    /// Stub runtime: accepts every command and answers every query with an
    /// empty route list.
    struct NoopRuntime;

    #[async_trait]
    impl RuntimeCommandBus for NoopRuntime {
        async fn execute(&self, _cmd: RuntimeCommand) -> Result<RuntimeCommandResult, CamelError> {
            Ok(RuntimeCommandResult::Accepted)
        }
    }

    #[async_trait]
    impl RuntimeQueryBus for NoopRuntime {
        async fn ask(&self, _query: RuntimeQuery) -> Result<RuntimeQueryResult, CamelError> {
            Ok(RuntimeQueryResult::Routes {
                route_ids: Vec::new(),
            })
        }
    }

    /// Builds a fresh stub handle; each call yields a distinct allocation.
    fn noop_handle() -> Arc<dyn RuntimeHandle> {
        Arc::new(NoopRuntime)
    }

    #[test]
    fn producer_context_new_is_empty() {
        let fresh = ProducerContext::new();
        assert!(fresh.runtime().is_none());
    }

    #[test]
    fn producer_context_default_is_empty() {
        let fresh = ProducerContext::default();
        assert!(fresh.runtime().is_none());
    }

    #[test]
    fn producer_context_with_runtime_sets_handle() {
        let handle = noop_handle();
        let ctx = ProducerContext::new().with_runtime(Arc::clone(&handle));

        let attached = ctx.runtime().expect("runtime should be set");
        assert!(Arc::ptr_eq(attached, &handle));
    }

    #[test]
    fn producer_context_clone_keeps_same_runtime_handle() {
        let handle = noop_handle();
        let original = ProducerContext::new().with_runtime(Arc::clone(&handle));
        let copy = original.clone();

        let attached = copy.runtime().unwrap();
        assert!(Arc::ptr_eq(attached, &handle));
    }

    #[test]
    fn producer_context_with_runtime_can_execute_command() {
        let ctx = ProducerContext::new().with_runtime(noop_handle());
        let handle = ctx.runtime().unwrap();
        let command = RuntimeCommand::StartRoute {
            route_id: "r1".into(),
            command_id: "c1".into(),
            causation_id: None,
        };

        let outcome = block_on(handle.execute(command)).unwrap();

        assert_eq!(outcome, RuntimeCommandResult::Accepted);
    }

    #[test]
    fn producer_context_with_runtime_can_execute_query() {
        let ctx = ProducerContext::new().with_runtime(noop_handle());
        let handle = ctx.runtime().unwrap();

        let answer = block_on(handle.ask(RuntimeQuery::ListRoutes)).unwrap();

        assert_eq!(answer, RuntimeQueryResult::Routes { route_ids: vec![] });
    }

    #[test]
    fn producer_context_with_runtime_replaces_previous_runtime() {
        let earlier = noop_handle();
        let later = noop_handle();
        let ctx = ProducerContext::new()
            .with_runtime(earlier)
            .with_runtime(Arc::clone(&later));

        // Only the most recently attached handle must remain.
        assert!(Arc::ptr_eq(ctx.runtime().unwrap(), &later));
    }

    #[test]
    fn producer_context_default_headers_empty() {
        let fresh = ProducerContext::new();
        assert!(fresh.default_headers().is_empty());
    }

    #[test]
    fn producer_context_with_default_header() {
        let ctx = ProducerContext::new()
            .with_default_header("X-Source", "camel")
            .with_default_header("X-Trace", "enabled");
        let headers = ctx.default_headers();

        // Indexing panics on a missing key, so absence still fails the test.
        for (name, expected) in [("X-Source", "camel"), ("X-Trace", "enabled")] {
            assert_eq!(headers[name], expected);
        }
    }

    #[test]
    fn producer_context_timeout_ms() {
        let unset = ProducerContext::new();
        assert_eq!(unset.timeout_ms(), None);

        let configured = unset.with_timeout_ms(5000);
        assert_eq!(configured.timeout_ms(), Some(5000));
    }
}