// camel_dsl/model.rs

1pub use camel_api::{LanguageExpressionDef, ValueSourceDef};
2
/// Concurrency setting that a [`DeclarativeRoute`] may carry in its `concurrency` field.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum DeclarativeConcurrency {
    /// Sequential mode; carries no parameters.
    Sequential,
    /// Concurrent mode with an optional `max`.
    ///
    /// NOTE(review): `max` is presumably an upper bound on in-flight work, with `None`
    /// meaning "no explicit bound" — confirm against the code that consumes this model.
    Concurrent { max: Option<usize> },
}
8
/// Circuit-breaker settings that a [`DeclarativeRoute`] may carry in its
/// `circuit_breaker` field.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DeclarativeCircuitBreaker {
    /// Failure threshold.
    ///
    /// NOTE(review): presumably the failure count that opens the breaker — confirm.
    pub failure_threshold: u32,
    /// Open duration, in milliseconds per the `_ms` suffix.
    pub open_duration_ms: u64,
}
14
/// Redelivery (retry) policy used by [`DeclarativeOnException`] and
/// [`DeclarativeErrorHandler`] through their `retry` fields.
///
/// `Eq` is not derived because the `f64` fields do not implement it.
#[derive(Debug, Clone, PartialEq)]
pub struct DeclarativeRedeliveryPolicy {
    /// Maximum number of attempts.
    pub max_attempts: u32,
    /// Initial delay, in milliseconds per the `_ms` suffix.
    pub initial_delay_ms: u64,
    /// Delay multiplier.
    ///
    /// NOTE(review): presumably the back-off factor applied between attempts — confirm.
    pub multiplier: f64,
    /// Maximum delay, in milliseconds per the `_ms` suffix.
    pub max_delay_ms: u64,
    /// Jitter factor.
    ///
    /// NOTE(review): valid range is not visible in this file — confirm.
    pub jitter_factor: f64,
    /// Optional `handled_by` target.
    ///
    /// NOTE(review): presumably names the endpoint that handles the failure — confirm.
    pub handled_by: Option<String>,
}
24
25#[derive(Debug, Clone, PartialEq)]
26pub struct DeclarativeOnException {
27    pub kind: Option<String>,
28    pub message_contains: Option<String>,
29    pub retry: Option<DeclarativeRedeliveryPolicy>,
30}
31
32#[derive(Debug, Clone, PartialEq)]
33pub struct DeclarativeErrorHandler {
34    pub dead_letter_channel: Option<String>,
35    pub retry: Option<DeclarativeRedeliveryPolicy>,
36    pub on_exceptions: Option<Vec<DeclarativeOnException>>,
37}
38
39#[derive(Debug, Clone)]
40pub struct DeclarativeRoute {
41    pub from: String,
42    pub route_id: String,
43    pub auto_startup: bool,
44    pub startup_order: i32,
45    pub concurrency: Option<DeclarativeConcurrency>,
46    pub error_handler: Option<DeclarativeErrorHandler>,
47    pub circuit_breaker: Option<DeclarativeCircuitBreaker>,
48    pub unit_of_work: Option<camel_api::UnitOfWorkConfig>,
49    pub steps: Vec<DeclarativeStep>,
50}
51
/// Payload of [`DeclarativeStep::To`]: a single target URI.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ToStepDef {
    /// Target URI.
    pub uri: String,
}

impl ToStepDef {
    /// Creates a `ToStepDef` from anything convertible into a `String`.
    pub fn new(uri: impl Into<String>) -> Self {
        Self { uri: uri.into() }
    }
}
62
/// Log level used by [`LogStepDef`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum LogLevelDef {
    /// Trace level.
    Trace,
    /// Debug level.
    Debug,
    /// Info level; the level chosen by `LogStepDef::info`.
    Info,
    /// Warn level.
    Warn,
    /// Error level.
    Error,
}
71
72// Note: `Eq` is not derived because `ValueSourceDef` contains `serde_json::Value`
73// which does not implement `Eq` (due to floating-point fields).
74#[derive(Debug, Clone, PartialEq)]
75pub struct LogStepDef {
76    pub message: ValueSourceDef,
77    pub level: LogLevelDef,
78}
79
80impl LogStepDef {
81    pub fn info(message: impl Into<String>) -> Self {
82        Self {
83            message: ValueSourceDef::Literal(serde_json::Value::String(message.into())),
84            level: LogLevelDef::Info,
85        }
86    }
87}
88
89#[derive(Debug, Clone, PartialEq)]
90pub struct SetHeaderStepDef {
91    pub key: String,
92    pub value: ValueSourceDef,
93}
94
95impl SetHeaderStepDef {
96    pub fn literal(key: impl Into<String>, value: impl Into<String>) -> Self {
97        Self {
98            key: key.into(),
99            value: ValueSourceDef::Literal(serde_json::Value::String(value.into())),
100        }
101    }
102}
103
104#[derive(Debug, Clone, PartialEq)]
105pub struct SetBodyStepDef {
106    pub value: ValueSourceDef,
107}
108
109#[derive(Debug, Clone, PartialEq)]
110pub struct FilterStepDef {
111    pub predicate: LanguageExpressionDef,
112    pub steps: Vec<DeclarativeStep>,
113}
114
/// Payload of [`DeclarativeStep::Function`]: a runtime name, a source string,
/// and an optional timeout.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FunctionStepDef {
    /// Runtime identifier.
    ///
    /// NOTE(review): the set of accepted runtimes is not visible in this file.
    pub runtime: String,
    /// Function source.
    ///
    /// NOTE(review): whether this is inline code or a path is not visible here.
    pub source: String,
    /// Optional timeout, in milliseconds per the `_ms` suffix.
    pub timeout_ms: Option<u64>,
}
121
122#[derive(Debug, Clone, PartialEq)]
123pub struct WhenStepDef {
124    pub predicate: LanguageExpressionDef,
125    pub steps: Vec<DeclarativeStep>,
126}
127
128#[derive(Debug, Clone, PartialEq)]
129pub struct ChoiceStepDef {
130    pub whens: Vec<WhenStepDef>,
131    pub otherwise: Option<Vec<DeclarativeStep>>,
132}
133
134#[derive(Debug, Clone, PartialEq)]
135pub enum SplitExpressionDef {
136    BodyLines,
137    BodyJsonArray,
138    Language(LanguageExpressionDef),
139}
140
/// Aggregation mode used by [`SplitStepDef`].
///
/// NOTE(review): the exact result each mode produces is not visible in this file.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SplitAggregationDef {
    /// `LastWins` mode.
    LastWins,
    /// `CollectAll` mode.
    CollectAll,
    /// `Original` mode.
    Original,
}
147
148#[derive(Debug, Clone, PartialEq)]
149pub struct SplitStepDef {
150    pub expression: SplitExpressionDef,
151    pub aggregation: SplitAggregationDef,
152    pub parallel: bool,
153    pub parallel_limit: Option<usize>,
154    pub stop_on_exception: bool,
155    pub steps: Vec<DeclarativeStep>,
156}
157
/// Aggregation strategy used by [`AggregateStepDef`]; currently a single variant.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum AggregateStrategyDef {
    /// `CollectAll` strategy.
    CollectAll,
}
162
163#[derive(Debug, Clone, PartialEq)]
164pub struct AggregateStepDef {
165    pub header: String,
166    pub correlation_key: Option<String>,
167    pub completion_size: Option<usize>,
168    pub completion_timeout_ms: Option<u64>,
169    pub completion_predicate: Option<LanguageExpressionDef>,
170    pub strategy: AggregateStrategyDef,
171    pub max_buckets: Option<usize>,
172    pub bucket_ttl_ms: Option<u64>,
173    pub force_completion_on_stop: Option<bool>,
174    pub discard_on_timeout: Option<bool>,
175}
176
/// Payload of [`DeclarativeStep::WireTap`]: a single target URI.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct WireTapStepDef {
    /// Target URI.
    pub uri: String,
}
181
/// Payload of [`DeclarativeStep::Bean`]: a bean name and a method name.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BeanStepDef {
    /// Bean name.
    pub name: String,
    /// Method name.
    pub method: String,
}

impl BeanStepDef {
    /// Creates a `BeanStepDef` from anything convertible into `String`s.
    pub fn new(name: impl Into<String>, method: impl Into<String>) -> Self {
        Self {
            name: name.into(),
            method: method.into(),
        }
    }
}
196
/// Strategy used by [`ThrottleStepDef`]; defaults to [`ThrottleStrategyDef::Delay`].
///
/// NOTE(review): what each strategy does when the limit is hit is not visible here.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub enum ThrottleStrategyDef {
    /// `Delay` strategy (the default).
    #[default]
    Delay,
    /// `Reject` strategy.
    Reject,
    /// `Drop` strategy.
    Drop,
}
204
205#[derive(Debug, Clone, PartialEq)]
206pub struct ThrottleStepDef {
207    pub max_requests: usize,
208    pub period_ms: u64,
209    pub strategy: ThrottleStrategyDef,
210    pub steps: Vec<DeclarativeStep>,
211}
212
/// Strategy used by [`LoadBalanceStepDef`]; defaults to
/// [`LoadBalanceStrategyDef::RoundRobin`].
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub enum LoadBalanceStrategyDef {
    /// `RoundRobin` strategy (the default).
    #[default]
    RoundRobin,
    /// `Random` strategy.
    Random,
    /// `Failover` strategy.
    Failover,
    /// `Weighted` strategy with a distribution ratio kept as a raw string.
    ///
    /// NOTE(review): the ratio string format is not visible in this file.
    Weighted {
        distribution_ratio: String,
    },
}
223
224#[derive(Debug, Clone, PartialEq)]
225pub struct LoadBalanceStepDef {
226    pub strategy: LoadBalanceStrategyDef,
227    pub parallel: bool,
228    pub steps: Vec<DeclarativeStep>,
229}
230
231#[derive(Debug, Clone, PartialEq, Eq)]
232pub struct DynamicRouterStepDef {
233    pub expression: LanguageExpressionDef,
234    pub uri_delimiter: String,
235    pub cache_size: i32,
236    pub ignore_invalid_endpoints: bool,
237    pub max_iterations: usize,
238}
239
240#[derive(Debug, Clone, PartialEq, Eq)]
241pub struct RoutingSlipStepDef {
242    pub expression: LanguageExpressionDef,
243    pub uri_delimiter: String,
244    pub cache_size: i32,
245    pub ignore_invalid_endpoints: bool,
246}
247
248#[derive(Debug, Clone, PartialEq, Eq)]
249pub struct RecipientListStepDef {
250    pub expression: LanguageExpressionDef,
251    pub delimiter: String,
252    pub parallel: bool,
253    pub parallel_limit: Option<usize>,
254    pub stop_on_exception: bool,
255    pub aggregation: MulticastAggregationDef,
256}
257
/// Aggregation mode used by [`MulticastStepDef`] and [`RecipientListStepDef`].
///
/// NOTE(review): the exact result each mode produces is not visible in this file.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum MulticastAggregationDef {
    /// `LastWins` mode.
    LastWins,
    /// `CollectAll` mode.
    CollectAll,
    /// `Original` mode.
    Original,
}
264
265#[derive(Debug, Clone, PartialEq)]
266pub struct MulticastStepDef {
267    pub steps: Vec<DeclarativeStep>,
268    pub parallel: bool,
269    pub parallel_limit: Option<usize>,
270    pub stop_on_exception: bool,
271    pub timeout_ms: Option<u64>,
272    pub aggregation: MulticastAggregationDef,
273}
274
275#[derive(Debug, Clone, PartialEq, Eq)]
276pub struct ScriptStepDef {
277    pub expression: LanguageExpressionDef,
278}
279
/// Body type carried by [`DeclarativeStep::ConvertBodyTo`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum BodyTypeDef {
    /// Text body.
    Text,
    /// JSON body.
    Json,
    /// Bytes body.
    Bytes,
    /// XML body.
    Xml,
    /// Empty body.
    Empty,
}
288
/// Data format carried by both [`DeclarativeStep::Marshal`] and
/// [`DeclarativeStep::Unmarshal`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DataFormatDef {
    /// Format name, kept as a free-form string.
    ///
    /// NOTE(review): the set of accepted names is not visible in this file.
    pub format: String,
}
293
/// Payload of [`DeclarativeStep::Delay`]: a delay and an optional header name.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DelayStepDef {
    /// Delay, in milliseconds per the `_ms` suffix.
    pub delay_ms: u64,
    /// Optional header name.
    ///
    /// NOTE(review): presumably a header that can supply the delay at runtime — confirm.
    pub dynamic_header: Option<String>,
}
299
300#[derive(Debug, Clone, PartialEq)]
301pub struct LoopStepDef {
302    pub count: Option<usize>,
303    pub while_predicate: Option<LanguageExpressionDef>,
304    pub steps: Vec<DeclarativeStep>,
305}
306
/// Payload of [`DeclarativeStep::StreamCache`]: an optional threshold.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct StreamCacheStepDef {
    /// Optional threshold.
    ///
    /// NOTE(review): unit (presumably bytes) and the meaning of `None` are not
    /// visible in this file.
    pub threshold: Option<usize>,
}
311
312#[derive(Debug, Clone, PartialEq)]
313pub enum DeclarativeStep {
314    To(ToStepDef),
315    SetHeader(SetHeaderStepDef),
316    SetBody(SetBodyStepDef),
317    ConvertBodyTo(BodyTypeDef),
318    DynamicRouter(DynamicRouterStepDef),
319    Filter(FilterStepDef),
320    Function(FunctionStepDef),
321    LoadBalance(LoadBalanceStepDef),
322    Log(LogStepDef),
323    Choice(ChoiceStepDef),
324    Split(SplitStepDef),
325    Aggregate(AggregateStepDef),
326    WireTap(WireTapStepDef),
327    Multicast(MulticastStepDef),
328    RoutingSlip(RoutingSlipStepDef),
329    RecipientList(RecipientListStepDef),
330    Stop,
331    Throttle(ThrottleStepDef),
332    Script(ScriptStepDef),
333    StreamCache(StreamCacheStepDef),
334    Marshal(DataFormatDef),
335    Unmarshal(DataFormatDef),
336    Bean(BeanStepDef),
337    Delay(DelayStepDef),
338    Loop(LoopStepDef),
339}
340
// Unit tests for the constructors, `Default` impls, and derived equality of the
// model types in this file.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn to_step_def_new() {
        let def = ToStepDef::new("direct:a");
        assert_eq!(def.uri, "direct:a");
    }

    #[test]
    fn log_step_def_info() {
        let def = LogStepDef::info("hello");
        assert_eq!(def.level, LogLevelDef::Info);
        match def.message {
            ValueSourceDef::Literal(v) => assert_eq!(v, serde_json::Value::String("hello".into())),
            // Print the unexpected value so a failure is diagnosable from the test log.
            other => panic!("expected literal, got {other:?}"),
        }
    }

    #[test]
    fn set_header_literal() {
        let def = SetHeaderStepDef::literal("key", "value");
        assert_eq!(def.key, "key");
        match def.value {
            ValueSourceDef::Literal(v) => assert_eq!(v, serde_json::Value::String("value".into())),
            // Print the unexpected value so a failure is diagnosable from the test log.
            other => panic!("expected literal, got {other:?}"),
        }
    }

    #[test]
    fn bean_step_def_new() {
        let def = BeanStepDef::new("myBean", "process");
        assert_eq!(def.name, "myBean");
        assert_eq!(def.method, "process");
    }

    #[test]
    fn throttle_strategy_default() {
        assert_eq!(ThrottleStrategyDef::default(), ThrottleStrategyDef::Delay);
    }

    #[test]
    fn load_balance_strategy_default() {
        assert_eq!(
            LoadBalanceStrategyDef::default(),
            LoadBalanceStrategyDef::RoundRobin
        );
    }

    #[test]
    fn concurrency_variants_equality() {
        assert_eq!(
            DeclarativeConcurrency::Sequential,
            DeclarativeConcurrency::Sequential
        );
        assert_ne!(
            DeclarativeConcurrency::Sequential,
            DeclarativeConcurrency::Concurrent { max: None }
        );
    }

    #[test]
    fn body_type_variants() {
        assert_eq!(BodyTypeDef::Text, BodyTypeDef::Text);
        assert_ne!(BodyTypeDef::Text, BodyTypeDef::Json);
    }

    #[test]
    fn data_format_def() {
        let def = DataFormatDef {
            format: "protobuf".into(),
        };
        assert_eq!(def.format, "protobuf");
    }

    #[test]
    fn stream_cache_step_def() {
        let def = StreamCacheStepDef {
            threshold: Some(1024),
        };
        assert_eq!(def.threshold, Some(1024));
    }

    #[test]
    fn delay_step_def() {
        let def = DelayStepDef {
            delay_ms: 500,
            dynamic_header: Some("X-Delay".into()),
        };
        assert_eq!(def.delay_ms, 500);
        assert_eq!(def.dynamic_header.as_deref(), Some("X-Delay"));
    }

    #[test]
    fn circuit_breaker_def() {
        let cb = DeclarativeCircuitBreaker {
            failure_threshold: 3,
            open_duration_ms: 5000,
        };
        assert_eq!(cb.failure_threshold, 3);
        assert_eq!(cb.open_duration_ms, 5000);
    }
}
444}