// camel_dsl/yaml.rs

1//! YAML route definition parser.
2
3use std::path::Path;
4
5use camel_api::{CamelError, CanonicalRouteSpec};
6use camel_core::route::RouteDefinition;
7
8use crate::compile::{compile_declarative_route, compile_declarative_route_to_canonical};
9use crate::contract::{DeclarativeStepKind, assert_contract_coverage};
10use crate::model::{
11    AggregateStepDef, AggregateStrategyDef, BodyTypeDef, ChoiceStepDef, DeclarativeCircuitBreaker,
12    DeclarativeConcurrency, DeclarativeErrorHandler, DeclarativeRedeliveryPolicy, DeclarativeRoute,
13    DeclarativeStep, LanguageExpressionDef, LogLevelDef, LogStepDef, MulticastAggregationDef,
14    MulticastStepDef, ScriptStepDef, SetBodyStepDef, SetHeaderStepDef, SplitAggregationDef,
15    SplitExpressionDef, SplitStepDef, ToStepDef, ValueSourceDef, WhenStepDef, WireTapStepDef,
16};
17pub use crate::yaml_ast::{
18    AggregateData, AggregateStep, ChoiceData, ChoiceStep, FilterStep, LogConfig, LogMessageData,
19    LogMessageExpr, LogStep, MulticastData, MulticastStep, PredicateBlock, ScriptData, ScriptStep,
20    SetBodyConfig, SetBodyData, SetBodyStep, SetHeaderData, SetHeaderStep, SplitData,
21    SplitExpressionConfig, SplitExpressionYaml, SplitStep, StopStep, ToStep, WireTapStep,
22    YamlRoute, YamlRoutes, YamlStep,
23};
24
25const YAML_IMPLEMENTED_MANDATORY_STEPS: [DeclarativeStepKind; 13] = [
26    DeclarativeStepKind::To,
27    DeclarativeStepKind::Log,
28    DeclarativeStepKind::SetHeader,
29    DeclarativeStepKind::SetBody,
30    DeclarativeStepKind::Filter,
31    DeclarativeStepKind::Choice,
32    DeclarativeStepKind::Split,
33    DeclarativeStepKind::Aggregate,
34    DeclarativeStepKind::WireTap,
35    DeclarativeStepKind::Multicast,
36    DeclarativeStepKind::Stop,
37    DeclarativeStepKind::Script,
38    DeclarativeStepKind::ConvertBodyTo,
39];
40
41const _: () = assert_contract_coverage(&YAML_IMPLEMENTED_MANDATORY_STEPS);
42
43pub fn parse_yaml_to_declarative(yaml: &str) -> Result<Vec<DeclarativeRoute>, CamelError> {
44    let routes: YamlRoutes = serde_yaml::from_str(yaml)
45        .map_err(|e| CamelError::RouteError(format!("YAML parse error: {e}")))?;
46
47    routes
48        .routes
49        .into_iter()
50        .map(yaml_route_to_declarative_route)
51        .collect()
52}
53
54pub fn parse_yaml(yaml: &str) -> Result<Vec<RouteDefinition>, CamelError> {
55    parse_yaml_to_declarative(yaml)?
56        .into_iter()
57        .map(compile_declarative_route)
58        .collect()
59}
60
61pub fn parse_yaml_to_canonical(yaml: &str) -> Result<Vec<CanonicalRouteSpec>, CamelError> {
62    parse_yaml_to_declarative(yaml)?
63        .into_iter()
64        .map(compile_declarative_route_to_canonical)
65        .collect()
66}
67
68fn yaml_route_to_declarative_route(route: YamlRoute) -> Result<DeclarativeRoute, CamelError> {
69    if route.id.is_empty() {
70        return Err(CamelError::RouteError(
71            "route 'id' must not be empty".into(),
72        ));
73    }
74
75    if route.sequential && route.concurrent.is_some() {
76        return Err(CamelError::RouteError(
77            "route cannot set both 'sequential' and 'concurrent'".into(),
78        ));
79    }
80
81    let concurrency = if route.sequential {
82        Some(DeclarativeConcurrency::Sequential)
83    } else {
84        route
85            .concurrent
86            .map(|max| DeclarativeConcurrency::Concurrent {
87                max: if max == 0 { None } else { Some(max) },
88            })
89    };
90
91    let error_handler = route.error_handler.map(|eh| DeclarativeErrorHandler {
92        dead_letter_channel: eh.dead_letter_channel,
93        retry: eh.retry.map(|retry| DeclarativeRedeliveryPolicy {
94            max_attempts: retry.max_attempts,
95            initial_delay_ms: retry.initial_delay_ms,
96            multiplier: retry.multiplier,
97            max_delay_ms: retry.max_delay_ms,
98            jitter_factor: retry.jitter_factor,
99            handled_by: retry.handled_by,
100        }),
101    });
102
103    let circuit_breaker = route.circuit_breaker.map(|cb| DeclarativeCircuitBreaker {
104        failure_threshold: cb.failure_threshold,
105        open_duration_ms: cb.open_duration_ms,
106    });
107
108    let steps = route
109        .steps
110        .into_iter()
111        .map(yaml_step_to_declarative_step)
112        .collect::<Result<Vec<_>, _>>()?;
113
114    Ok(DeclarativeRoute {
115        from: route.from,
116        route_id: route.id,
117        auto_startup: route.auto_startup,
118        startup_order: route.startup_order,
119        concurrency,
120        error_handler,
121        circuit_breaker,
122        steps,
123    })
124}
125
126fn yaml_step_to_declarative_step(step: YamlStep) -> Result<DeclarativeStep, CamelError> {
127    match step {
128        YamlStep::To(ToStep { to }) => Ok(DeclarativeStep::To(ToStepDef::new(to))),
129        YamlStep::WireTap(WireTapStep { wire_tap }) => {
130            Ok(DeclarativeStep::WireTap(WireTapStepDef { uri: wire_tap }))
131        }
132        YamlStep::Stop(StopStep { stop }) => {
133            if stop {
134                Ok(DeclarativeStep::Stop)
135            } else {
136                Err(CamelError::RouteError(
137                    "'stop: false' is invalid; remove the step or use 'stop: true'".into(),
138                ))
139            }
140        }
141        YamlStep::Log(LogStep { log }) => {
142            let (message_data, level) = match log {
143                crate::yaml_ast::LogBody::Message(message) => (
144                    crate::yaml_ast::LogMessageData::Literal(message),
145                    LogLevelDef::Info,
146                ),
147                crate::yaml_ast::LogBody::Config(config) => {
148                    let level = match config.level.as_deref().unwrap_or("info") {
149                        "trace" => LogLevelDef::Trace,
150                        "debug" => LogLevelDef::Debug,
151                        "info" => LogLevelDef::Info,
152                        "warn" => LogLevelDef::Warn,
153                        "error" => LogLevelDef::Error,
154                        other => {
155                            return Err(CamelError::RouteError(format!(
156                                "unsupported log level `{other}`"
157                            )));
158                        }
159                    };
160                    (config.message, level)
161                }
162            };
163            let message = match message_data {
164                // A bare string like `log: "Got ${body}"` is always evaluated as Simple Language,
165                // matching Apache Camel behaviour: the message field "uses simple language".
166                crate::yaml_ast::LogMessageData::Literal(s) => {
167                    ValueSourceDef::Expression(LanguageExpressionDef {
168                        language: "simple".to_string(),
169                        source: s,
170                    })
171                }
172                crate::yaml_ast::LogMessageData::Expr(expr) => parse_value_source(
173                    expr.value.map(serde_json::Value::String),
174                    expr.language,
175                    expr.source,
176                    expr.simple,
177                    expr.rhai,
178                    "log.message",
179                )?,
180            };
181            Ok(DeclarativeStep::Log(LogStepDef { message, level }))
182        }
183        YamlStep::SetHeader(SetHeaderStep { set_header }) => {
184            let value = parse_value_source(
185                set_header.value,
186                set_header.language,
187                set_header.source,
188                set_header.simple,
189                set_header.rhai,
190                "set_header",
191            )?;
192            Ok(DeclarativeStep::SetHeader(SetHeaderStepDef {
193                key: set_header.key,
194                value,
195            }))
196        }
197        YamlStep::SetBody(SetBodyStep { set_body }) => {
198            let value = match set_body {
199                SetBodyData::Literal(value) => ValueSourceDef::Literal(value),
200                SetBodyData::Config(SetBodyConfig {
201                    value,
202                    language,
203                    source,
204                    simple,
205                    rhai,
206                }) => parse_value_source(value, language, source, simple, rhai, "set_body")?,
207            };
208            Ok(DeclarativeStep::SetBody(SetBodyStepDef { value }))
209        }
210        YamlStep::Script(ScriptStep {
211            script: ScriptData { language, source },
212        }) => Ok(DeclarativeStep::Script(ScriptStepDef {
213            expression: LanguageExpressionDef { language, source },
214        })),
215        YamlStep::Filter(FilterStep { filter }) => {
216            let predicate = parse_predicate_block(&filter, "filter")?;
217            let steps = filter
218                .steps
219                .into_iter()
220                .map(yaml_step_to_declarative_step)
221                .collect::<Result<Vec<_>, _>>()?;
222            Ok(DeclarativeStep::Filter(crate::model::FilterStepDef {
223                predicate,
224                steps,
225            }))
226        }
227        YamlStep::Choice(ChoiceStep {
228            choice: ChoiceData { when, otherwise },
229        }) => {
230            let whens = when
231                .into_iter()
232                .map(|block| {
233                    let predicate = parse_predicate_block(&block, "choice.when")?;
234                    let steps = block
235                        .steps
236                        .into_iter()
237                        .map(yaml_step_to_declarative_step)
238                        .collect::<Result<Vec<_>, _>>()?;
239                    Ok(WhenStepDef { predicate, steps })
240                })
241                .collect::<Result<Vec<_>, CamelError>>()?;
242
243            let otherwise = match otherwise {
244                Some(steps) => Some(
245                    steps
246                        .into_iter()
247                        .map(yaml_step_to_declarative_step)
248                        .collect::<Result<Vec<_>, _>>()?,
249                ),
250                None => None,
251            };
252
253            Ok(DeclarativeStep::Choice(ChoiceStepDef { whens, otherwise }))
254        }
255        YamlStep::Split(SplitStep { split }) => {
256            let expression = match split.expression {
257                None => SplitExpressionDef::BodyLines,
258                Some(SplitExpressionYaml::Simple(s)) => match s.as_str() {
259                    "body_lines" | "lines" => SplitExpressionDef::BodyLines,
260                    "body_json_array" | "json_array" => SplitExpressionDef::BodyJsonArray,
261                    other => {
262                        return Err(CamelError::RouteError(format!(
263                            "unsupported split.expression `{other}`"
264                        )));
265                    }
266                },
267                Some(SplitExpressionYaml::Config(SplitExpressionConfig {
268                    language,
269                    source,
270                    simple,
271                    rhai,
272                })) => {
273                    let expr = parse_language_expression(
274                        language,
275                        source,
276                        simple,
277                        rhai,
278                        "split.expression",
279                    )?;
280                    SplitExpressionDef::Language(expr)
281                }
282            };
283
284            let aggregation = match split.aggregation.as_str() {
285                "last_wins" => SplitAggregationDef::LastWins,
286                "collect_all" => SplitAggregationDef::CollectAll,
287                "original" => SplitAggregationDef::Original,
288                other => {
289                    return Err(CamelError::RouteError(format!(
290                        "unsupported split.aggregation `{other}`"
291                    )));
292                }
293            };
294
295            let steps = split
296                .steps
297                .into_iter()
298                .map(yaml_step_to_declarative_step)
299                .collect::<Result<Vec<_>, _>>()?;
300
301            Ok(DeclarativeStep::Split(SplitStepDef {
302                expression,
303                aggregation,
304                parallel: split.parallel,
305                parallel_limit: split.parallel_limit,
306                stop_on_exception: split.stop_on_exception,
307                steps,
308            }))
309        }
310        YamlStep::Aggregate(AggregateStep { aggregate }) => {
311            let strategy = match aggregate.strategy.as_str() {
312                "collect_all" => AggregateStrategyDef::CollectAll,
313                other => {
314                    return Err(CamelError::RouteError(format!(
315                        "unsupported aggregate.strategy `{other}`"
316                    )));
317                }
318            };
319
320            let completion_predicate = aggregate
321                .completion_predicate
322                .map(|block| parse_predicate_block(&block, "aggregate.completion_predicate"))
323                .transpose()?;
324
325            Ok(DeclarativeStep::Aggregate(AggregateStepDef {
326                header: aggregate.header,
327                completion_size: aggregate.completion_size,
328                completion_timeout_ms: aggregate.completion_timeout_ms,
329                completion_predicate,
330                strategy,
331                max_buckets: aggregate.max_buckets,
332                bucket_ttl_ms: aggregate.bucket_ttl_ms,
333            }))
334        }
335        YamlStep::Multicast(MulticastStep { multicast }) => {
336            let aggregation = match multicast.aggregation.as_str() {
337                "last_wins" => MulticastAggregationDef::LastWins,
338                "collect_all" => MulticastAggregationDef::CollectAll,
339                "original" => MulticastAggregationDef::Original,
340                other => {
341                    return Err(CamelError::RouteError(format!(
342                        "unsupported multicast.aggregation `{other}`"
343                    )));
344                }
345            };
346
347            let steps = multicast
348                .steps
349                .into_iter()
350                .map(yaml_step_to_declarative_step)
351                .collect::<Result<Vec<_>, _>>()?;
352
353            Ok(DeclarativeStep::Multicast(MulticastStepDef {
354                steps,
355                parallel: multicast.parallel,
356                parallel_limit: multicast.parallel_limit,
357                stop_on_exception: multicast.stop_on_exception,
358                timeout_ms: multicast.timeout_ms,
359                aggregation,
360            }))
361        }
362        YamlStep::ConvertBodyTo(step) => {
363            let def = match step.convert_body_to.to_lowercase().as_str() {
364                "text" => BodyTypeDef::Text,
365                "json" => BodyTypeDef::Json,
366                "bytes" => BodyTypeDef::Bytes,
367                "xml" => BodyTypeDef::Xml,
368                "empty" => BodyTypeDef::Empty,
369                other => {
370                    return Err(CamelError::RouteError(format!(
371                        "unknown convert_body_to target: '{}'. Expected: text, json, bytes, xml, empty",
372                        other
373                    )));
374                }
375            };
376            Ok(DeclarativeStep::ConvertBodyTo(def))
377        }
378    }
379}
380
381fn parse_predicate_block(
382    block: &PredicateBlock,
383    context: &str,
384) -> Result<LanguageExpressionDef, CamelError> {
385    let mut selected = Vec::new();
386
387    if let (Some(language), Some(source)) = (block.language.as_ref(), block.source.as_ref()) {
388        selected.push((language.clone(), source.clone()));
389    } else if block.language.is_some() || block.source.is_some() {
390        return Err(CamelError::RouteError(format!(
391            "{context}: `language` and `source` must be set together"
392        )));
393    }
394
395    if let Some(source) = block.simple.as_ref() {
396        selected.push(("simple".to_string(), source.clone()));
397    }
398    if let Some(source) = block.rhai.as_ref() {
399        selected.push(("rhai".to_string(), source.clone()));
400    }
401
402    if selected.len() != 1 {
403        return Err(CamelError::RouteError(format!(
404            "{context} must define exactly one predicate source: `language+source`, `simple`, or `rhai`"
405        )));
406    }
407
408    let (language, source) = selected.remove(0);
409    Ok(LanguageExpressionDef { language, source })
410}
411
412fn parse_value_source(
413    literal: Option<serde_json::Value>,
414    language: Option<String>,
415    source: Option<String>,
416    simple: Option<String>,
417    rhai: Option<String>,
418    context: &str,
419) -> Result<ValueSourceDef, CamelError> {
420    let mut count = 0;
421    if literal.is_some() {
422        count += 1;
423    }
424    if language.is_some() || source.is_some() {
425        if language.is_some() && source.is_some() {
426            count += 1;
427        } else {
428            return Err(CamelError::RouteError(format!(
429                "{context}: `language` and `source` must be set together"
430            )));
431        }
432    }
433    if simple.is_some() {
434        count += 1;
435    }
436    if rhai.is_some() {
437        count += 1;
438    }
439
440    if count != 1 {
441        return Err(CamelError::RouteError(format!(
442            "{context} must define exactly one value source: `value`, `language+source`, `simple`, or `rhai`"
443        )));
444    }
445
446    if let Some(value) = literal {
447        return Ok(ValueSourceDef::Literal(value));
448    }
449    if let (Some(language), Some(source)) = (language, source) {
450        return Ok(ValueSourceDef::Expression(LanguageExpressionDef {
451            language,
452            source,
453        }));
454    }
455    if let Some(source) = simple {
456        return Ok(ValueSourceDef::Expression(LanguageExpressionDef {
457            language: "simple".to_string(),
458            source,
459        }));
460    }
461    if let Some(source) = rhai {
462        return Ok(ValueSourceDef::Expression(LanguageExpressionDef {
463            language: "rhai".to_string(),
464            source,
465        }));
466    }
467
468    Err(CamelError::RouteError(format!(
469        "{context}: missing value source"
470    )))
471}
472
473fn parse_language_expression(
474    language: Option<String>,
475    source: Option<String>,
476    simple: Option<String>,
477    rhai: Option<String>,
478    context: &str,
479) -> Result<LanguageExpressionDef, CamelError> {
480    let mut count = 0;
481    if language.is_some() || source.is_some() {
482        if language.is_some() && source.is_some() {
483            count += 1;
484        } else {
485            return Err(CamelError::RouteError(format!(
486                "{context}: `language` and `source` must be set together"
487            )));
488        }
489    }
490    if simple.is_some() {
491        count += 1;
492    }
493    if rhai.is_some() {
494        count += 1;
495    }
496
497    if count != 1 {
498        return Err(CamelError::RouteError(format!(
499            "{context} must define exactly one language source: `language+source`, `simple`, or `rhai`"
500        )));
501    }
502
503    if let (Some(language), Some(source)) = (language, source) {
504        return Ok(LanguageExpressionDef { language, source });
505    }
506    if let Some(source) = simple {
507        return Ok(LanguageExpressionDef {
508            language: "simple".to_string(),
509            source,
510        });
511    }
512    if let Some(source) = rhai {
513        return Ok(LanguageExpressionDef {
514            language: "rhai".to_string(),
515            source,
516        });
517    }
518
519    Err(CamelError::RouteError(format!(
520        "{context}: missing language source"
521    )))
522}
523
524pub fn load_from_file(path: &Path) -> Result<Vec<RouteDefinition>, CamelError> {
525    let content = std::fs::read_to_string(path)
526        .map_err(|e| CamelError::Io(format!("Failed to read {}: {e}", path.display())))?;
527    parse_yaml(&content)
528}
529
/// Unit tests for the YAML route parser: route metadata, defaults, canonical
/// compilation, step coverage, and file loading.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_parse_valid_yaml() {
        let yaml = r#"
routes:
  - id: "test-route"
    from: "timer:tick?period=1000"
    steps:
      - set_header:
          key: "source"
          value: "timer"
      - to: "log:info"
"#;
        let defs = parse_yaml(yaml).unwrap();
        assert_eq!(defs.len(), 1);
        assert_eq!(defs[0].route_id(), "test-route");
        assert_eq!(defs[0].from_uri(), "timer:tick?period=1000");
    }

    #[test]
    fn test_parse_missing_id_fails() {
        let yaml = r#"
routes:
  - from: "timer:tick"
    steps:
      - to: "log:info"
"#;
        let result = parse_yaml(yaml);
        assert!(result.is_err());
    }

    #[test]
    fn test_parse_empty_id_fails() {
        let yaml = r#"
routes:
  - id: ""
    from: "timer:tick"
"#;
        let result = parse_yaml(yaml);
        assert!(result.is_err());
    }

    #[test]
    fn test_parse_multiple_routes() {
        let yaml = r#"
routes:
  - id: "route-a"
    from: "timer:tick"
    steps:
      - to: "log:info"
  - id: "route-b"
    from: "timer:tock"
    auto_startup: false
    startup_order: 10
"#;
        let defs = parse_yaml(yaml).unwrap();
        assert_eq!(defs.len(), 2);
        assert_eq!(defs[1].route_id(), "route-b");
    }

    #[test]
    fn test_parse_defaults() {
        let yaml = r#"
routes:
  - id: "default-route"
    from: "timer:tick"
"#;
        let defs = parse_yaml(yaml).unwrap();
        assert!(defs[0].auto_startup());
        assert_eq!(defs[0].startup_order(), 1000);
    }

    #[test]
    fn test_parse_yaml_to_declarative_preserves_route_metadata() {
        let yaml = r#"
routes:
  - id: "declarative-route"
    from: "timer:tick"
    auto_startup: false
    startup_order: 7
    steps:
      - log: "hello"
      - to: "mock:out"
"#;

        let routes = parse_yaml_to_declarative(yaml).unwrap();
        assert_eq!(routes.len(), 1);
        assert_eq!(routes[0].route_id, "declarative-route");
        assert_eq!(routes[0].from, "timer:tick");
        assert!(!routes[0].auto_startup);
        assert_eq!(routes[0].startup_order, 7);
        assert_eq!(routes[0].steps.len(), 2);
    }

    #[test]
    fn test_parse_yaml_to_canonical_supports_to_log_stop_subset() {
        let yaml = r#"
routes:
  - id: "canonical-v1"
    from: "direct:start"
    steps:
      - to: "mock:out"
      - log:
          message: "hello"
      - stop: true
"#;
        let routes = parse_yaml_to_canonical(yaml).unwrap();
        assert_eq!(routes.len(), 1);
        assert_eq!(routes[0].route_id, "canonical-v1");
        assert_eq!(routes[0].from, "direct:start");
        assert_eq!(routes[0].version, 1);
        assert_eq!(routes[0].steps.len(), 3);
    }

    #[test]
    fn test_parse_yaml_to_canonical_supports_advanced_declarative_steps() {
        let yaml = r#"
routes:
  - id: "canonical-v1-advanced"
    from: "direct:start"
    circuit_breaker:
      failure_threshold: 4
      open_duration_ms: 750
    steps:
      - filter:
          simple: "${header.kind} == 'A'"
          steps:
            - to: "mock:filtered"
      - choice:
          when:
            - simple: "${header.kind} == 'A'"
              steps:
                - to: "mock:a"
          otherwise:
            - to: "mock:other"
      - split:
          expression: body_lines
          aggregation: collect_all
          steps:
            - to: "mock:split"
      - aggregate:
          header: "orderId"
          completion_size: 2
      - wire_tap: "mock:tap"
      - script:
          language: "simple"
          source: "${body}"
"#;

        let routes = parse_yaml_to_canonical(yaml).unwrap();
        assert_eq!(routes.len(), 1);
        assert_eq!(routes[0].route_id, "canonical-v1-advanced");
        assert_eq!(routes[0].steps.len(), 6);
        let cb = routes[0]
            .circuit_breaker
            .as_ref()
            .expect("circuit breaker should be present");
        assert_eq!(cb.failure_threshold, 4);
        assert_eq!(cb.open_duration_ms, 750);
    }

    #[test]
    fn test_parse_yaml_to_canonical_rejects_unsupported_steps() {
        let yaml = r#"
routes:
  - id: "canonical-v1-unsupported"
    from: "direct:start"
    steps:
      - set_header:
          key: "k"
          value: "v"
"#;
        let err = parse_yaml_to_canonical(yaml).unwrap_err().to_string();
        assert!(
            err.contains("canonical v1 does not support step `set_header`"),
            "unexpected error: {err}"
        );
        assert!(
            err.contains("out-of-scope"),
            "expected explicit canonical subset reason, got: {err}"
        );
    }

    #[test]
    fn test_parse_yaml_supports_all_declarative_step_kinds() {
        let yaml = r#"
routes:
  - id: "all-steps"
    from: "direct:start"
    sequential: true
    error_handler:
      dead_letter_channel: "log:dlc"
      retry:
        max_attempts: 2
        handled_by: "log:handled"
    circuit_breaker:
      failure_threshold: 3
      open_duration_ms: 500
    steps:
      - log:
          message: "hello"
          level: "debug"
      - set_header:
          key: "kind"
          value: "A"
      - set_body:
          value: "payload"
      - filter:
          simple: "${header.kind} == 'A'"
          steps:
            - to: "mock:filtered"
      - choice:
          when:
            - simple: "${header.kind} == 'A'"
              steps:
                - to: "mock:a"
          otherwise:
            - to: "mock:other"
      - split:
          expression: body_lines
          aggregation: collect_all
          steps:
            - to: "mock:split"
      - aggregate:
          header: "orderId"
          completion_size: 2
      - wire_tap: "mock:tap"
      - multicast:
          steps:
            - to: "mock:left"
            - to: "mock:right"
      - script:
          language: "simple"
          source: "${body}"
      - stop: true
"#;

        let routes = parse_yaml_to_declarative(yaml).unwrap();
        assert_eq!(routes.len(), 1);
        assert_eq!(routes[0].steps.len(), 11);

        let defs = parse_yaml(yaml).unwrap();
        assert_eq!(defs.len(), 1);
        assert_eq!(defs[0].route_id(), "all-steps");
    }

    #[test]
    fn test_load_from_file() {
        // Include the process id in the file name so concurrent test runs that
        // share the system temp directory do not overwrite each other's fixture.
        let file_path = std::env::temp_dir().join(format!(
            "camel_dsl_test_routes_{}.yaml",
            std::process::id()
        ));

        let yaml_content = r#"
routes:
  - id: "file-route"
    from: "timer:tick"
    steps:
      - to: "log:info"
"#;

        std::fs::write(&file_path, yaml_content).unwrap();

        // Load first and clean up before asserting, so a failing assertion
        // does not leave the fixture behind.
        let result = load_from_file(&file_path);
        std::fs::remove_file(&file_path).ok();

        let defs = result.unwrap();
        assert_eq!(defs.len(), 1);
        assert_eq!(defs[0].route_id(), "file-route");
    }

    /// Verifies that `log:` with a Simple Language expression is parsed into a
    /// `DeclarativeStep::Log` that carries a `ValueSourceDef::Expression`, not a bare String.
    /// This test drives the requirement that `LogStepDef.message` becomes a `ValueSourceDef`.
    #[test]
    fn test_log_step_with_simple_expression_parses_as_expression() {
        let yaml = r#"
routes:
  - id: "log-expr"
    from: "timer:tick"
    steps:
      - log:
          message:
            simple: "${header.CamelTimerCounter} World"
"#;
        let routes = parse_yaml_to_declarative(yaml).unwrap();
        assert_eq!(routes.len(), 1);
        let step = &routes[0].steps[0];
        match step {
            DeclarativeStep::Log(def) => match &def.message {
                ValueSourceDef::Expression(expr) => {
                    assert_eq!(expr.language, "simple");
                    assert_eq!(expr.source, "${header.CamelTimerCounter} World");
                }
                ValueSourceDef::Literal(_) => {
                    panic!("expected Expression, got Literal")
                }
            },
            _ => panic!("expected Log step, got {:?}", step),
        }
    }

    #[test]
    fn test_load_from_nonexistent_file() {
        let result = load_from_file(Path::new("/nonexistent/path/routes.yaml"));
        assert!(result.is_err());
    }
}