Skip to main content

camel_api/
runtime.rs

1use async_trait::async_trait;
2use serde::{Deserialize, Serialize};
3
4use crate::CamelError;
5use crate::declarative::LanguageExpressionDef;
6
7pub const CANONICAL_CONTRACT_NAME: &str = "canonical-v1";
8pub const CANONICAL_CONTRACT_VERSION: u32 = 1;
9pub const CANONICAL_CONTRACT_SUPPORTED_STEPS: &[&str] = &[
10    "to",
11    "log",
12    "wire_tap",
13    "script",
14    "filter",
15    "choice",
16    "split",
17    "aggregate",
18    "stop",
19    "delay",
20];
21pub const CANONICAL_CONTRACT_DECLARATIVE_ONLY_STEPS: &[&str] =
22    &["script", "filter", "choice", "split"];
23pub const CANONICAL_CONTRACT_EXCLUDED_DECLARATIVE_STEPS: &[&str] = &[
24    "set_header",
25    "set_body",
26    "multicast",
27    "convert_body_to",
28    "bean",
29    "marshal",
30    "unmarshal",
31];
32pub const CANONICAL_CONTRACT_RUST_ONLY_STEPS: &[&str] = &[
33    "processor",
34    "process",
35    "process_fn",
36    "map_body",
37    "set_body_fn",
38    "set_header_fn",
39];
40
41pub fn canonical_contract_supports_step(step: &str) -> bool {
42    CANONICAL_CONTRACT_SUPPORTED_STEPS.contains(&step)
43}
44
45pub fn canonical_contract_rejection_reason(step: &str) -> Option<&'static str> {
46    if CANONICAL_CONTRACT_EXCLUDED_DECLARATIVE_STEPS.contains(&step) {
47        return Some(
48            "declared out-of-scope for canonical v1; use declarative route compilation path outside CQRS canonical commands",
49        );
50    }
51
52    if CANONICAL_CONTRACT_RUST_ONLY_STEPS.contains(&step) {
53        return Some("rust-only programmable step; not representable in canonical v1 contract");
54    }
55
56    if canonical_contract_supports_step(step)
57        && CANONICAL_CONTRACT_DECLARATIVE_ONLY_STEPS.contains(&step)
58    {
59        return Some(
60            "supported only as declarative/serializable expression form; closure/processor variants are outside canonical v1",
61        );
62    }
63
64    None
65}
66
67#[derive(
68    Debug,
69    Clone,
70    PartialEq,
71    Eq,
72    serde::Serialize,
73    serde::Deserialize,
74    schemars::JsonSchema,
75    ts_rs::TS,
76)]
77#[serde(rename_all = "snake_case")]
78#[ts(rename_all = "snake_case")]
79pub struct CanonicalRouteSpec {
80    /// Stable minimal route representation for runtime command registration.
81    ///
82    /// Scope note (v1):
83    /// - This is intentionally a partial model and does not mirror every `BuilderStep`.
84    /// - Not included in v1: auto_startup, startup_order, concurrency, error_handler,
85    ///   unit_of_work. These are set to defaults when compiling from canonical.
86    /// - Round-trip (YAML → Canonical → YAML) loses these fields.
87    /// - Advanced EIPs continue to use the existing RouteDefinition/BuilderStep path.
88    pub route_id: String,
89    pub from: String,
90    pub steps: Vec<CanonicalStepSpec>,
91    pub circuit_breaker: Option<CanonicalCircuitBreakerSpec>,
92    pub version: u32,
93}
94
95#[derive(
96    Debug,
97    Clone,
98    PartialEq,
99    Eq,
100    serde::Serialize,
101    serde::Deserialize,
102    schemars::JsonSchema,
103    ts_rs::TS,
104)]
105#[serde(tag = "step", content = "config", rename_all = "snake_case")]
106#[ts(rename_all = "snake_case")]
107pub enum CanonicalStepSpec {
108    To {
109        uri: String,
110    },
111    Log {
112        message: String,
113    },
114    WireTap {
115        uri: String,
116    },
117    Script {
118        expression: LanguageExpressionDef,
119    },
120    Filter {
121        predicate: LanguageExpressionDef,
122        steps: Vec<CanonicalStepSpec>,
123    },
124    Choice {
125        whens: Vec<CanonicalWhenSpec>,
126        otherwise: Option<Vec<CanonicalStepSpec>>,
127    },
128    Split {
129        expression: CanonicalSplitExpressionSpec,
130        aggregation: CanonicalSplitAggregationSpec,
131        parallel: bool,
132        parallel_limit: Option<usize>,
133        stop_on_exception: bool,
134        steps: Vec<CanonicalStepSpec>,
135    },
136    Aggregate(CanonicalAggregateSpec),
137    Stop,
138    Delay {
139        #[ts(type = "number")]
140        delay_ms: u64,
141        dynamic_header: Option<String>,
142    },
143}
144
145#[derive(
146    Debug,
147    Clone,
148    PartialEq,
149    Eq,
150    serde::Serialize,
151    serde::Deserialize,
152    schemars::JsonSchema,
153    ts_rs::TS,
154)]
155#[serde(rename_all = "snake_case")]
156#[ts(rename_all = "snake_case")]
157pub struct CanonicalWhenSpec {
158    pub predicate: LanguageExpressionDef,
159    pub steps: Vec<CanonicalStepSpec>,
160}
161
162#[derive(
163    Debug,
164    Clone,
165    PartialEq,
166    Eq,
167    serde::Serialize,
168    serde::Deserialize,
169    schemars::JsonSchema,
170    ts_rs::TS,
171)]
172#[serde(rename_all = "snake_case")]
173#[ts(rename_all = "snake_case")]
174pub enum CanonicalSplitExpressionSpec {
175    BodyLines,
176    BodyJsonArray,
177    Language(LanguageExpressionDef),
178}
179
180#[derive(
181    Debug,
182    Clone,
183    PartialEq,
184    Eq,
185    serde::Serialize,
186    serde::Deserialize,
187    schemars::JsonSchema,
188    ts_rs::TS,
189)]
190#[serde(rename_all = "snake_case")]
191#[ts(rename_all = "snake_case")]
192pub enum CanonicalSplitAggregationSpec {
193    LastWins,
194    CollectAll,
195    Original,
196}
197
198#[derive(
199    Debug,
200    Clone,
201    PartialEq,
202    Eq,
203    serde::Serialize,
204    serde::Deserialize,
205    schemars::JsonSchema,
206    ts_rs::TS,
207)]
208#[serde(rename_all = "snake_case")]
209#[ts(rename_all = "snake_case")]
210pub enum CanonicalAggregateStrategySpec {
211    CollectAll,
212}
213
214#[derive(
215    Debug,
216    Clone,
217    PartialEq,
218    Eq,
219    serde::Serialize,
220    serde::Deserialize,
221    schemars::JsonSchema,
222    ts_rs::TS,
223)]
224#[serde(rename_all = "snake_case")]
225#[ts(rename_all = "snake_case")]
226pub struct CanonicalAggregateSpec {
227    pub header: String,
228    pub completion_size: Option<usize>,
229    #[ts(type = "number")]
230    pub completion_timeout_ms: Option<u64>,
231    pub correlation_key: Option<String>,
232    pub force_completion_on_stop: Option<bool>,
233    pub discard_on_timeout: Option<bool>,
234    pub strategy: CanonicalAggregateStrategySpec,
235    pub max_buckets: Option<usize>,
236    #[ts(type = "number")]
237    pub bucket_ttl_ms: Option<u64>,
238}
239
240#[derive(
241    Debug,
242    Clone,
243    PartialEq,
244    Eq,
245    serde::Serialize,
246    serde::Deserialize,
247    schemars::JsonSchema,
248    ts_rs::TS,
249)]
250#[serde(rename_all = "snake_case")]
251#[ts(rename_all = "snake_case")]
252pub struct CanonicalCircuitBreakerSpec {
253    pub failure_threshold: u32,
254    #[ts(type = "number")]
255    pub open_duration_ms: u64,
256}
257
258impl CanonicalRouteSpec {
259    pub fn new(route_id: impl Into<String>, from: impl Into<String>) -> Self {
260        Self {
261            route_id: route_id.into(),
262            from: from.into(),
263            steps: Vec::new(),
264            circuit_breaker: None,
265            version: CANONICAL_CONTRACT_VERSION,
266        }
267    }
268
269    pub fn validate_contract(&self) -> Result<(), CamelError> {
270        if self.route_id.trim().is_empty() {
271            return Err(CamelError::RouteError(
272                "canonical contract violation: route_id cannot be empty".to_string(),
273            ));
274        }
275        if self.from.trim().is_empty() {
276            return Err(CamelError::RouteError(
277                "canonical contract violation: from cannot be empty".to_string(),
278            ));
279        }
280        if self.version != CANONICAL_CONTRACT_VERSION {
281            return Err(CamelError::RouteError(format!(
282                "canonical contract violation: expected version {}, got {}",
283                CANONICAL_CONTRACT_VERSION, self.version
284            )));
285        }
286        validate_steps(&self.steps)?;
287        if let Some(cb) = &self.circuit_breaker {
288            if cb.failure_threshold == 0 {
289                return Err(CamelError::RouteError(
290                    "canonical contract violation: circuit_breaker.failure_threshold must be > 0"
291                        .to_string(),
292                ));
293            }
294            if cb.open_duration_ms == 0 {
295                return Err(CamelError::RouteError(
296                    "canonical contract violation: circuit_breaker.open_duration_ms must be > 0"
297                        .to_string(),
298                ));
299            }
300        }
301        Ok(())
302    }
303}
304
305fn validate_steps(steps: &[CanonicalStepSpec]) -> Result<(), CamelError> {
306    for step in steps {
307        match step {
308            CanonicalStepSpec::To { uri } | CanonicalStepSpec::WireTap { uri } => {
309                if uri.trim().is_empty() {
310                    return Err(CamelError::RouteError(
311                        "canonical contract violation: endpoint uri cannot be empty".to_string(),
312                    ));
313                }
314            }
315            CanonicalStepSpec::Filter { steps, .. } => validate_steps(steps)?,
316            CanonicalStepSpec::Choice { whens, otherwise } => {
317                for when in whens {
318                    validate_steps(&when.steps)?;
319                }
320                if let Some(otherwise) = otherwise {
321                    validate_steps(otherwise)?;
322                }
323            }
324            CanonicalStepSpec::Split {
325                parallel_limit,
326                steps,
327                ..
328            } => {
329                if let Some(limit) = parallel_limit
330                    && *limit == 0
331                {
332                    return Err(CamelError::RouteError(
333                        "canonical contract violation: split.parallel_limit must be > 0"
334                            .to_string(),
335                    ));
336                }
337                validate_steps(steps)?;
338            }
339            CanonicalStepSpec::Aggregate(config) => {
340                if config.header.trim().is_empty() {
341                    return Err(CamelError::RouteError(
342                        "canonical contract violation: aggregate.header cannot be empty"
343                            .to_string(),
344                    ));
345                }
346                if let Some(size) = config.completion_size
347                    && size == 0
348                {
349                    return Err(CamelError::RouteError(
350                        "canonical contract violation: aggregate.completion_size must be > 0"
351                            .to_string(),
352                    ));
353                }
354            }
355            CanonicalStepSpec::Log { .. }
356            | CanonicalStepSpec::Script { .. }
357            | CanonicalStepSpec::Stop
358            | CanonicalStepSpec::Delay { .. } => {}
359        }
360    }
361    Ok(())
362}
363
364#[derive(Debug, Clone, PartialEq, Eq)]
365pub enum RuntimeCommand {
366    RegisterRoute {
367        spec: CanonicalRouteSpec,
368        command_id: String,
369        causation_id: Option<String>,
370    },
371    StartRoute {
372        route_id: String,
373        command_id: String,
374        causation_id: Option<String>,
375    },
376    StopRoute {
377        route_id: String,
378        command_id: String,
379        causation_id: Option<String>,
380    },
381    SuspendRoute {
382        route_id: String,
383        command_id: String,
384        causation_id: Option<String>,
385    },
386    ResumeRoute {
387        route_id: String,
388        command_id: String,
389        causation_id: Option<String>,
390    },
391    ReloadRoute {
392        route_id: String,
393        command_id: String,
394        causation_id: Option<String>,
395    },
396    /// Internal lifecycle command emitted by runtime adapters when a route crashes at runtime.
397    ///
398    /// This keeps aggregate/projection state aligned with controller-observed failures.
399    FailRoute {
400        route_id: String,
401        error: String,
402        command_id: String,
403        causation_id: Option<String>,
404    },
405    RemoveRoute {
406        route_id: String,
407        command_id: String,
408        causation_id: Option<String>,
409    },
410}
411
412impl RuntimeCommand {
413    pub fn command_id(&self) -> &str {
414        match self {
415            RuntimeCommand::RegisterRoute { command_id, .. }
416            | RuntimeCommand::StartRoute { command_id, .. }
417            | RuntimeCommand::StopRoute { command_id, .. }
418            | RuntimeCommand::SuspendRoute { command_id, .. }
419            | RuntimeCommand::ResumeRoute { command_id, .. }
420            | RuntimeCommand::ReloadRoute { command_id, .. }
421            | RuntimeCommand::FailRoute { command_id, .. }
422            | RuntimeCommand::RemoveRoute { command_id, .. } => command_id,
423        }
424    }
425
426    pub fn causation_id(&self) -> Option<&str> {
427        match self {
428            RuntimeCommand::RegisterRoute { causation_id, .. }
429            | RuntimeCommand::StartRoute { causation_id, .. }
430            | RuntimeCommand::StopRoute { causation_id, .. }
431            | RuntimeCommand::SuspendRoute { causation_id, .. }
432            | RuntimeCommand::ResumeRoute { causation_id, .. }
433            | RuntimeCommand::ReloadRoute { causation_id, .. }
434            | RuntimeCommand::FailRoute { causation_id, .. }
435            | RuntimeCommand::RemoveRoute { causation_id, .. } => causation_id.as_deref(),
436        }
437    }
438}
439
440#[derive(Debug, Clone, PartialEq, Eq)]
441pub enum RuntimeCommandResult {
442    Accepted,
443    Duplicate { command_id: String },
444    RouteRegistered { route_id: String },
445    RouteStateChanged { route_id: String, status: String },
446}
447
448#[derive(Debug, Clone, PartialEq, Eq)]
449pub enum RuntimeQuery {
450    GetRouteStatus {
451        route_id: String,
452    },
453    /// **Note:** This variant is intercepted by `RuntimeBus::ask` *before* reaching
454    /// `execute_query`. Do not handle it in `execute_query` — it has no access to
455    /// the in-flight counter. See `runtime_bus.rs` for the intercept.
456    InFlightCount {
457        route_id: String,
458    },
459    ListRoutes,
460}
461
462#[derive(Debug, Clone, PartialEq, Eq)]
463pub enum RuntimeQueryResult {
464    InFlightCount { route_id: String, count: u64 },
465    RouteNotFound { route_id: String },
466    RouteStatus { route_id: String, status: String },
467    Routes { route_ids: Vec<String> },
468}
469
470#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
471pub enum RuntimeEvent {
472    RouteRegistered { route_id: String },
473    RouteStartRequested { route_id: String },
474    RouteStarted { route_id: String },
475    RouteFailed { route_id: String, error: String },
476    RouteStopped { route_id: String },
477    RouteSuspended { route_id: String },
478    RouteResumed { route_id: String },
479    RouteReloaded { route_id: String },
480    RouteRemoved { route_id: String },
481}
482
483#[async_trait]
484pub trait RuntimeCommandBus: Send + Sync {
485    async fn execute(&self, cmd: RuntimeCommand) -> Result<RuntimeCommandResult, CamelError>;
486}
487
488#[async_trait]
489pub trait RuntimeQueryBus: Send + Sync {
490    async fn ask(&self, query: RuntimeQuery) -> Result<RuntimeQueryResult, CamelError>;
491}
492
493pub trait RuntimeHandle: RuntimeCommandBus + RuntimeQueryBus {}
494
495impl<T> RuntimeHandle for T where T: RuntimeCommandBus + RuntimeQueryBus {}
496
497#[cfg(test)]
498mod tests {
499    use super::*;
500
501    #[test]
502    fn command_and_query_ids_are_exposed() {
503        let cmd = RuntimeCommand::StartRoute {
504            route_id: "r1".into(),
505            command_id: "c1".into(),
506            causation_id: None,
507        };
508        assert_eq!(cmd.command_id(), "c1");
509    }
510
511    #[test]
512    fn canonical_spec_requires_route_id_and_from() {
513        let spec = CanonicalRouteSpec::new("r1", "timer:tick");
514        assert_eq!(spec.route_id, "r1");
515        assert_eq!(spec.from, "timer:tick");
516        assert_eq!(spec.version, CANONICAL_CONTRACT_VERSION);
517        assert!(spec.steps.is_empty());
518        assert!(spec.circuit_breaker.is_none());
519    }
520
521    #[test]
522    fn canonical_contract_rejects_invalid_version() {
523        let mut spec = CanonicalRouteSpec::new("r1", "timer:tick");
524        spec.version = 2;
525        let err = spec.validate_contract().unwrap_err().to_string();
526        assert!(err.contains("expected version"));
527    }
528
529    #[test]
530    fn canonical_contract_declares_subset_scope() {
531        assert!(canonical_contract_supports_step("to"));
532        assert!(canonical_contract_supports_step("split"));
533        assert!(!canonical_contract_supports_step("set_header"));
534
535        assert!(CANONICAL_CONTRACT_DECLARATIVE_ONLY_STEPS.contains(&"split"));
536        assert!(CANONICAL_CONTRACT_EXCLUDED_DECLARATIVE_STEPS.contains(&"set_header"));
537        assert!(CANONICAL_CONTRACT_RUST_ONLY_STEPS.contains(&"processor"));
538    }
539
540    #[test]
541    fn canonical_contract_rejection_reason_is_explicit() {
542        let set_header_reason = canonical_contract_rejection_reason("set_header")
543            .expect("set_header should have explicit reason");
544        assert!(set_header_reason.contains("out-of-scope"));
545
546        let processor_reason = canonical_contract_rejection_reason("processor")
547            .expect("processor should be rust-only");
548        assert!(processor_reason.contains("rust-only"));
549
550        let split_reason = canonical_contract_rejection_reason("split")
551            .expect("split should require declarative form");
552        assert!(split_reason.contains("declarative"));
553    }
554
555    #[test]
556    fn command_causation_id_is_exposed() {
557        let cmd = RuntimeCommand::StopRoute {
558            route_id: "r1".into(),
559            command_id: "c2".into(),
560            causation_id: Some("c1".into()),
561        };
562        assert_eq!(cmd.command_id(), "c2");
563        assert_eq!(cmd.causation_id(), Some("c1"));
564    }
565
566    #[test]
567    fn canonical_contract_rejects_empty_route_id_and_from() {
568        let spec = CanonicalRouteSpec::new("   ", "timer:tick");
569        let err = spec.validate_contract().unwrap_err().to_string();
570        assert!(err.contains("route_id cannot be empty"));
571
572        let spec = CanonicalRouteSpec::new("r1", "  ");
573        let err = spec.validate_contract().unwrap_err().to_string();
574        assert!(err.contains("from cannot be empty"));
575    }
576
577    #[test]
578    fn canonical_contract_rejects_invalid_nested_steps() {
579        let mut spec = CanonicalRouteSpec::new("r1", "timer:tick");
580        spec.steps = vec![CanonicalStepSpec::Split {
581            expression: CanonicalSplitExpressionSpec::BodyLines,
582            aggregation: CanonicalSplitAggregationSpec::CollectAll,
583            parallel: true,
584            parallel_limit: Some(0),
585            stop_on_exception: false,
586            steps: vec![CanonicalStepSpec::To {
587                uri: "log:ok".to_string(),
588            }],
589        }];
590        let err = spec.validate_contract().unwrap_err().to_string();
591        assert!(err.contains("split.parallel_limit must be > 0"));
592
593        spec.steps = vec![CanonicalStepSpec::To {
594            uri: "   ".to_string(),
595        }];
596        let err = spec.validate_contract().unwrap_err().to_string();
597        assert!(err.contains("endpoint uri cannot be empty"));
598    }
599
600    #[test]
601    fn canonical_contract_rejects_invalid_aggregate_and_circuit_breaker() {
602        let mut spec = CanonicalRouteSpec::new("r1", "timer:tick");
603        spec.steps = vec![CanonicalStepSpec::Aggregate(CanonicalAggregateSpec {
604            header: " ".to_string(),
605            completion_size: Some(1),
606            completion_timeout_ms: None,
607            correlation_key: None,
608            force_completion_on_stop: None,
609            discard_on_timeout: None,
610            strategy: CanonicalAggregateStrategySpec::CollectAll,
611            max_buckets: None,
612            bucket_ttl_ms: None,
613        })];
614        let err = spec.validate_contract().unwrap_err().to_string();
615        assert!(err.contains("aggregate.header cannot be empty"));
616
617        spec.steps = vec![CanonicalStepSpec::Aggregate(CanonicalAggregateSpec {
618            header: "k".to_string(),
619            completion_size: Some(0),
620            completion_timeout_ms: None,
621            correlation_key: None,
622            force_completion_on_stop: None,
623            discard_on_timeout: None,
624            strategy: CanonicalAggregateStrategySpec::CollectAll,
625            max_buckets: None,
626            bucket_ttl_ms: None,
627        })];
628        let err = spec.validate_contract().unwrap_err().to_string();
629        assert!(err.contains("aggregate.completion_size must be > 0"));
630
631        spec.steps = vec![];
632        spec.circuit_breaker = Some(CanonicalCircuitBreakerSpec {
633            failure_threshold: 0,
634            open_duration_ms: 10,
635        });
636        let err = spec.validate_contract().unwrap_err().to_string();
637        assert!(err.contains("failure_threshold must be > 0"));
638
639        spec.circuit_breaker = Some(CanonicalCircuitBreakerSpec {
640            failure_threshold: 1,
641            open_duration_ms: 0,
642        });
643        let err = spec.validate_contract().unwrap_err().to_string();
644        assert!(err.contains("open_duration_ms must be > 0"));
645    }
646
647    #[test]
648    fn canonical_contract_rejection_reason_none_for_regular_steps() {
649        assert!(canonical_contract_rejection_reason("to").is_none());
650        assert!(canonical_contract_rejection_reason("unknown-step").is_none());
651    }
652
653    #[test]
654    fn command_helpers_cover_all_variants() {
655        let spec = CanonicalRouteSpec::new("r1", "timer:tick");
656        let cmds = [
657            RuntimeCommand::RegisterRoute {
658                spec,
659                command_id: "c1".into(),
660                causation_id: Some("root".into()),
661            },
662            RuntimeCommand::StartRoute {
663                route_id: "r1".into(),
664                command_id: "c2".into(),
665                causation_id: None,
666            },
667            RuntimeCommand::StopRoute {
668                route_id: "r1".into(),
669                command_id: "c3".into(),
670                causation_id: None,
671            },
672            RuntimeCommand::SuspendRoute {
673                route_id: "r1".into(),
674                command_id: "c4".into(),
675                causation_id: None,
676            },
677            RuntimeCommand::ResumeRoute {
678                route_id: "r1".into(),
679                command_id: "c5".into(),
680                causation_id: None,
681            },
682            RuntimeCommand::ReloadRoute {
683                route_id: "r1".into(),
684                command_id: "c6".into(),
685                causation_id: None,
686            },
687            RuntimeCommand::FailRoute {
688                route_id: "r1".into(),
689                error: "boom".into(),
690                command_id: "c7".into(),
691                causation_id: None,
692            },
693            RuntimeCommand::RemoveRoute {
694                route_id: "r1".into(),
695                command_id: "c8".into(),
696                causation_id: None,
697            },
698        ];
699
700        let ids: Vec<&str> = cmds.iter().map(RuntimeCommand::command_id).collect();
701        assert_eq!(ids, vec!["c1", "c2", "c3", "c4", "c5", "c6", "c7", "c8"]);
702        assert_eq!(cmds[0].causation_id(), Some("root"));
703        assert_eq!(cmds[1].causation_id(), None);
704    }
705
706    #[test]
707    fn canonical_route_spec_serde_roundtrip() {
708        let mut spec = CanonicalRouteSpec::new("test-route", "timer:tick?period=1000");
709        spec.steps.push(CanonicalStepSpec::Log {
710            message: "Hello".into(),
711        });
712        spec.steps.push(CanonicalStepSpec::To {
713            uri: "log:info".into(),
714        });
715        spec.steps.push(CanonicalStepSpec::Stop);
716
717        let json = serde_json::to_string(&spec).unwrap();
718        let deserialized: CanonicalRouteSpec = serde_json::from_str(&json).unwrap();
719        assert_eq!(spec, deserialized);
720    }
721
722    #[test]
723    fn canonical_step_spec_serde_variants() {
724        let steps = vec![
725            CanonicalStepSpec::To {
726                uri: "direct:a".into(),
727            },
728            CanonicalStepSpec::Log {
729                message: "msg".into(),
730            },
731            CanonicalStepSpec::WireTap {
732                uri: "direct:audit".into(),
733            },
734            CanonicalStepSpec::Stop,
735            CanonicalStepSpec::Delay {
736                delay_ms: 100,
737                dynamic_header: None,
738            },
739        ];
740        let json = serde_json::to_string_pretty(&steps).unwrap();
741        let back: Vec<CanonicalStepSpec> = serde_json::from_str(&json).unwrap();
742        assert_eq!(steps, back);
743    }
744
745    #[test]
746    fn canonical_route_spec_json_schema_generates() {
747        let schema = schemars::schema_for!(CanonicalRouteSpec);
748        let json = serde_json::to_string(&schema).unwrap();
749        assert!(json.contains("CanonicalRouteSpec"));
750        assert!(json.contains("route_id"));
751    }
752
753    #[test]
754    fn canonical_json_schema_has_no_function_step() {
755        let schema = schemars::schema_for!(CanonicalRouteSpec);
756        let json = serde_json::to_string(&schema).unwrap();
757        assert!(
758            !json.contains("\"function\""),
759            "canonical JSON schema must not contain 'function' step"
760        );
761    }
762
763    #[test]
764    fn canonical_contract_does_not_support_function() {
765        assert!(
766            !canonical_contract_supports_step("function"),
767            "function must not be in CANONICAL_CONTRACT_SUPPORTED_STEPS"
768        );
769    }
770}