1pub use camel_api::{LanguageExpressionDef, ValueSourceDef};
2
3#[derive(Debug, Clone, PartialEq, Eq)]
4pub enum DeclarativeConcurrency {
5 Sequential,
6 Concurrent { max: Option<usize> },
7}
8
9#[derive(Debug, Clone, PartialEq, Eq)]
10pub struct DeclarativeCircuitBreaker {
11 pub failure_threshold: u32,
12 pub open_duration_ms: u64,
13}
14
15#[derive(Debug, Clone, PartialEq)]
16pub struct DeclarativeRedeliveryPolicy {
17 pub max_attempts: u32,
18 pub initial_delay_ms: u64,
19 pub multiplier: f64,
20 pub max_delay_ms: u64,
21 pub jitter_factor: f64,
22 pub handled_by: Option<String>,
23}
24
25#[derive(Debug, Clone, PartialEq)]
26pub struct DeclarativeOnException {
27 pub kind: Option<String>,
28 pub message_contains: Option<String>,
29 pub retry: Option<DeclarativeRedeliveryPolicy>,
30}
31
32#[derive(Debug, Clone, PartialEq)]
33pub struct DeclarativeErrorHandler {
34 pub dead_letter_channel: Option<String>,
35 pub retry: Option<DeclarativeRedeliveryPolicy>,
36 pub on_exceptions: Option<Vec<DeclarativeOnException>>,
37}
38
39#[derive(Debug, Clone)]
40pub struct DeclarativeRoute {
41 pub from: String,
42 pub route_id: String,
43 pub auto_startup: bool,
44 pub startup_order: i32,
45 pub concurrency: Option<DeclarativeConcurrency>,
46 pub error_handler: Option<DeclarativeErrorHandler>,
47 pub circuit_breaker: Option<DeclarativeCircuitBreaker>,
48 pub unit_of_work: Option<camel_api::UnitOfWorkConfig>,
49 pub steps: Vec<DeclarativeStep>,
50}
51
52#[derive(Debug, Clone, PartialEq, Eq)]
53pub struct ToStepDef {
54 pub uri: String,
55}
56
57impl ToStepDef {
58 pub fn new(uri: impl Into<String>) -> Self {
59 Self { uri: uri.into() }
60 }
61}
62
63#[derive(Debug, Clone, PartialEq, Eq)]
64pub enum LogLevelDef {
65 Trace,
66 Debug,
67 Info,
68 Warn,
69 Error,
70}
71
72#[derive(Debug, Clone, PartialEq)]
75pub struct LogStepDef {
76 pub message: ValueSourceDef,
77 pub level: LogLevelDef,
78}
79
80impl LogStepDef {
81 pub fn info(message: impl Into<String>) -> Self {
82 Self {
83 message: ValueSourceDef::Literal(serde_json::Value::String(message.into())),
84 level: LogLevelDef::Info,
85 }
86 }
87}
88
89#[derive(Debug, Clone, PartialEq)]
90pub struct SetHeaderStepDef {
91 pub key: String,
92 pub value: ValueSourceDef,
93}
94
95impl SetHeaderStepDef {
96 pub fn literal(key: impl Into<String>, value: impl Into<String>) -> Self {
97 Self {
98 key: key.into(),
99 value: ValueSourceDef::Literal(serde_json::Value::String(value.into())),
100 }
101 }
102}
103
104#[derive(Debug, Clone, PartialEq)]
105pub struct SetBodyStepDef {
106 pub value: ValueSourceDef,
107}
108
109#[derive(Debug, Clone, PartialEq)]
110pub struct FilterStepDef {
111 pub predicate: LanguageExpressionDef,
112 pub steps: Vec<DeclarativeStep>,
113}
114
115#[derive(Debug, Clone, PartialEq, Eq)]
116pub struct FunctionStepDef {
117 pub runtime: String,
118 pub source: String,
119 pub timeout_ms: Option<u64>,
120}
121
122#[derive(Debug, Clone, PartialEq)]
123pub struct WhenStepDef {
124 pub predicate: LanguageExpressionDef,
125 pub steps: Vec<DeclarativeStep>,
126}
127
128#[derive(Debug, Clone, PartialEq)]
129pub struct ChoiceStepDef {
130 pub whens: Vec<WhenStepDef>,
131 pub otherwise: Option<Vec<DeclarativeStep>>,
132}
133
134#[derive(Debug, Clone, PartialEq)]
135pub enum SplitExpressionDef {
136 BodyLines,
137 BodyJsonArray,
138 Language(LanguageExpressionDef),
139}
140
141#[derive(Debug, Clone, PartialEq, Eq)]
142pub enum SplitAggregationDef {
143 LastWins,
144 CollectAll,
145 Original,
146}
147
148#[derive(Debug, Clone, PartialEq)]
149pub struct SplitStepDef {
150 pub expression: SplitExpressionDef,
151 pub aggregation: SplitAggregationDef,
152 pub parallel: bool,
153 pub parallel_limit: Option<usize>,
154 pub stop_on_exception: bool,
155 pub steps: Vec<DeclarativeStep>,
156}
157
158#[derive(Debug, Clone, PartialEq, Eq)]
159pub enum AggregateStrategyDef {
160 CollectAll,
161}
162
163#[derive(Debug, Clone, PartialEq)]
164pub struct AggregateStepDef {
165 pub header: String,
166 pub correlation_key: Option<String>,
167 pub completion_size: Option<usize>,
168 pub completion_timeout_ms: Option<u64>,
169 pub completion_predicate: Option<LanguageExpressionDef>,
170 pub strategy: AggregateStrategyDef,
171 pub max_buckets: Option<usize>,
172 pub bucket_ttl_ms: Option<u64>,
173 pub force_completion_on_stop: Option<bool>,
174 pub discard_on_timeout: Option<bool>,
175}
176
177#[derive(Debug, Clone, PartialEq, Eq)]
178pub struct WireTapStepDef {
179 pub uri: String,
180}
181
182#[derive(Debug, Clone, PartialEq, Eq)]
183pub struct BeanStepDef {
184 pub name: String,
185 pub method: String,
186}
187
188impl BeanStepDef {
189 pub fn new(name: impl Into<String>, method: impl Into<String>) -> Self {
190 Self {
191 name: name.into(),
192 method: method.into(),
193 }
194 }
195}
196
197#[derive(Debug, Clone, PartialEq, Eq, Default)]
198pub enum ThrottleStrategyDef {
199 #[default]
200 Delay,
201 Reject,
202 Drop,
203}
204
205#[derive(Debug, Clone, PartialEq)]
206pub struct ThrottleStepDef {
207 pub max_requests: usize,
208 pub period_ms: u64,
209 pub strategy: ThrottleStrategyDef,
210 pub steps: Vec<DeclarativeStep>,
211}
212
213#[derive(Debug, Clone, PartialEq, Eq, Default)]
214pub enum LoadBalanceStrategyDef {
215 #[default]
216 RoundRobin,
217 Random,
218 Failover,
219 Weighted {
220 distribution_ratio: String,
221 },
222}
223
224#[derive(Debug, Clone, PartialEq)]
225pub struct LoadBalanceStepDef {
226 pub strategy: LoadBalanceStrategyDef,
227 pub parallel: bool,
228 pub steps: Vec<DeclarativeStep>,
229}
230
231#[derive(Debug, Clone, PartialEq, Eq)]
232pub struct DynamicRouterStepDef {
233 pub expression: LanguageExpressionDef,
234 pub uri_delimiter: String,
235 pub cache_size: i32,
236 pub ignore_invalid_endpoints: bool,
237 pub max_iterations: usize,
238}
239
240#[derive(Debug, Clone, PartialEq, Eq)]
241pub struct RoutingSlipStepDef {
242 pub expression: LanguageExpressionDef,
243 pub uri_delimiter: String,
244 pub cache_size: i32,
245 pub ignore_invalid_endpoints: bool,
246}
247
248#[derive(Debug, Clone, PartialEq, Eq)]
249pub struct RecipientListStepDef {
250 pub expression: LanguageExpressionDef,
251 pub delimiter: String,
252 pub parallel: bool,
253 pub parallel_limit: Option<usize>,
254 pub stop_on_exception: bool,
255 pub aggregation: MulticastAggregationDef,
256}
257
258#[derive(Debug, Clone, PartialEq, Eq)]
259pub enum MulticastAggregationDef {
260 LastWins,
261 CollectAll,
262 Original,
263}
264
265#[derive(Debug, Clone, PartialEq)]
266pub struct MulticastStepDef {
267 pub steps: Vec<DeclarativeStep>,
268 pub parallel: bool,
269 pub parallel_limit: Option<usize>,
270 pub stop_on_exception: bool,
271 pub timeout_ms: Option<u64>,
272 pub aggregation: MulticastAggregationDef,
273}
274
275#[derive(Debug, Clone, PartialEq, Eq)]
276pub struct ScriptStepDef {
277 pub expression: LanguageExpressionDef,
278}
279
280#[derive(Debug, Clone, PartialEq, Eq)]
281pub enum BodyTypeDef {
282 Text,
283 Json,
284 Bytes,
285 Xml,
286 Empty,
287}
288
289#[derive(Debug, Clone, PartialEq, Eq)]
290pub struct DataFormatDef {
291 pub format: String,
292}
293
294#[derive(Debug, Clone, PartialEq, Eq)]
295pub struct DelayStepDef {
296 pub delay_ms: u64,
297 pub dynamic_header: Option<String>,
298}
299
300#[derive(Debug, Clone, PartialEq)]
301pub struct LoopStepDef {
302 pub count: Option<usize>,
303 pub while_predicate: Option<LanguageExpressionDef>,
304 pub steps: Vec<DeclarativeStep>,
305}
306
307#[derive(Debug, Clone, PartialEq, Eq)]
308pub struct StreamCacheStepDef {
309 pub threshold: Option<usize>,
310}
311
312#[derive(Debug, Clone, PartialEq)]
313pub enum DeclarativeStep {
314 To(ToStepDef),
315 SetHeader(SetHeaderStepDef),
316 SetBody(SetBodyStepDef),
317 ConvertBodyTo(BodyTypeDef),
318 DynamicRouter(DynamicRouterStepDef),
319 Filter(FilterStepDef),
320 Function(FunctionStepDef),
321 LoadBalance(LoadBalanceStepDef),
322 Log(LogStepDef),
323 Choice(ChoiceStepDef),
324 Split(SplitStepDef),
325 Aggregate(AggregateStepDef),
326 WireTap(WireTapStepDef),
327 Multicast(MulticastStepDef),
328 RoutingSlip(RoutingSlipStepDef),
329 RecipientList(RecipientListStepDef),
330 Stop,
331 Throttle(ThrottleStepDef),
332 Script(ScriptStepDef),
333 StreamCache(StreamCacheStepDef),
334 Marshal(DataFormatDef),
335 Unmarshal(DataFormatDef),
336 Bean(BeanStepDef),
337 Delay(DelayStepDef),
338 Loop(LoopStepDef),
339}
340
341#[cfg(test)]
342mod tests {
343 use super::*;
344
345 #[test]
346 fn to_step_def_new() {
347 let def = ToStepDef::new("direct:a");
348 assert_eq!(def.uri, "direct:a");
349 }
350
351 #[test]
352 fn log_step_def_info() {
353 let def = LogStepDef::info("hello");
354 assert_eq!(def.level, LogLevelDef::Info);
355 match def.message {
356 ValueSourceDef::Literal(v) => assert_eq!(v, serde_json::Value::String("hello".into())),
357 _ => panic!("expected literal"),
358 }
359 }
360
361 #[test]
362 fn set_header_literal() {
363 let def = SetHeaderStepDef::literal("key", "value");
364 assert_eq!(def.key, "key");
365 match def.value {
366 ValueSourceDef::Literal(v) => assert_eq!(v, serde_json::Value::String("value".into())),
367 _ => panic!("expected literal"),
368 }
369 }
370
371 #[test]
372 fn bean_step_def_new() {
373 let def = BeanStepDef::new("myBean", "process");
374 assert_eq!(def.name, "myBean");
375 assert_eq!(def.method, "process");
376 }
377
378 #[test]
379 fn throttle_strategy_default() {
380 assert_eq!(ThrottleStrategyDef::default(), ThrottleStrategyDef::Delay);
381 }
382
383 #[test]
384 fn load_balance_strategy_default() {
385 assert_eq!(
386 LoadBalanceStrategyDef::default(),
387 LoadBalanceStrategyDef::RoundRobin
388 );
389 }
390
391 #[test]
392 fn concurrency_variants_equality() {
393 assert_eq!(
394 DeclarativeConcurrency::Sequential,
395 DeclarativeConcurrency::Sequential
396 );
397 assert_ne!(
398 DeclarativeConcurrency::Sequential,
399 DeclarativeConcurrency::Concurrent { max: None }
400 );
401 }
402
403 #[test]
404 fn body_type_variants() {
405 assert_eq!(BodyTypeDef::Text, BodyTypeDef::Text);
406 assert_ne!(BodyTypeDef::Text, BodyTypeDef::Json);
407 }
408
409 #[test]
410 fn data_format_def() {
411 let def = DataFormatDef {
412 format: "protobuf".into(),
413 };
414 assert_eq!(def.format, "protobuf");
415 }
416
417 #[test]
418 fn stream_cache_step_def() {
419 let def = StreamCacheStepDef {
420 threshold: Some(1024),
421 };
422 assert_eq!(def.threshold, Some(1024));
423 }
424
425 #[test]
426 fn delay_step_def() {
427 let def = DelayStepDef {
428 delay_ms: 500,
429 dynamic_header: Some("X-Delay".into()),
430 };
431 assert_eq!(def.delay_ms, 500);
432 assert_eq!(def.dynamic_header.as_deref(), Some("X-Delay"));
433 }
434
435 #[test]
436 fn circuit_breaker_def() {
437 let cb = DeclarativeCircuitBreaker {
438 failure_threshold: 3,
439 open_duration_ms: 5000,
440 };
441 assert_eq!(cb.failure_threshold, 3);
442 assert_eq!(cb.open_duration_ms, 5000);
443 }
444}