Skip to main content

camel_api/
runtime.rs

1use async_trait::async_trait;
2use serde::{Deserialize, Serialize};
3
4use crate::CamelError;
5use crate::declarative::LanguageExpressionDef;
6
/// Name identifying the canonical route contract defined in this module.
pub const CANONICAL_CONTRACT_NAME: &str = "canonical-v1";
/// Contract version stamped by [`CanonicalRouteSpec::new`]; any other value is rejected by
/// [`CanonicalRouteSpec::validate_contract`].
pub const CANONICAL_CONTRACT_VERSION: u32 = 1;
/// Step names accepted by the canonical contract.
///
/// These are the snake_case names of the [`CanonicalStepSpec`] variants.
pub const CANONICAL_CONTRACT_SUPPORTED_STEPS: &[&str] = &[
    "to",
    "log",
    "wire_tap",
    "script",
    "filter",
    "choice",
    "split",
    "aggregate",
    "stop",
    "delay",
];
/// Supported steps that the contract accepts only in their declarative/serializable
/// expression form (see [`canonical_contract_rejection_reason`]).
pub const CANONICAL_CONTRACT_DECLARATIVE_ONLY_STEPS: &[&str] =
    &["script", "filter", "choice", "split"];
/// Step names that [`canonical_contract_rejection_reason`] reports as declared out-of-scope
/// for canonical v1.
pub const CANONICAL_CONTRACT_EXCLUDED_DECLARATIVE_STEPS: &[&str] = &[
    "set_header",
    "set_property",
    "set_body",
    "multicast",
    "convert_body_to",
    "bean",
    "marshal",
    "unmarshal",
];
/// Step names that [`canonical_contract_rejection_reason`] reports as Rust-only programmable
/// steps, not representable in the canonical v1 contract.
pub const CANONICAL_CONTRACT_RUST_ONLY_STEPS: &[&str] = &[
    "processor",
    "process",
    "process_fn",
    "map_body",
    "set_body_fn",
    "set_header_fn",
];
41
42pub fn canonical_contract_supports_step(step: &str) -> bool {
43    CANONICAL_CONTRACT_SUPPORTED_STEPS.contains(&step)
44}
45
46pub fn canonical_contract_rejection_reason(step: &str) -> Option<&'static str> {
47    if CANONICAL_CONTRACT_EXCLUDED_DECLARATIVE_STEPS.contains(&step) {
48        return Some(
49            "declared out-of-scope for canonical v1; use declarative route compilation path outside CQRS canonical commands",
50        );
51    }
52
53    if CANONICAL_CONTRACT_RUST_ONLY_STEPS.contains(&step) {
54        return Some("rust-only programmable step; not representable in canonical v1 contract");
55    }
56
57    if canonical_contract_supports_step(step)
58        && CANONICAL_CONTRACT_DECLARATIVE_ONLY_STEPS.contains(&step)
59    {
60        return Some(
61            "supported only as declarative/serializable expression form; closure/processor variants are outside canonical v1",
62        );
63    }
64
65    None
66}
67
68#[derive(
69    Debug,
70    Clone,
71    PartialEq,
72    Eq,
73    serde::Serialize,
74    serde::Deserialize,
75    schemars::JsonSchema,
76    ts_rs::TS,
77)]
78#[serde(rename_all = "snake_case")]
79#[ts(rename_all = "snake_case")]
80pub struct CanonicalRouteSpec {
81    /// Stable minimal route representation for runtime command registration.
82    ///
83    /// Scope note (v1):
84    /// - This is intentionally a partial model and does not mirror every `BuilderStep`.
85    /// - Not included in v1: auto_startup, startup_order, concurrency, error_handler,
86    ///   unit_of_work. These are set to defaults when compiling from canonical.
87    /// - Round-trip (YAML → Canonical → YAML) loses these fields.
88    /// - Advanced EIPs continue to use the existing RouteDefinition/BuilderStep path.
89    pub route_id: String,
90    pub from: String,
91    pub steps: Vec<CanonicalStepSpec>,
92    pub circuit_breaker: Option<CanonicalCircuitBreakerSpec>,
93    pub version: u32,
94}
95
/// One step of a [`CanonicalRouteSpec`].
///
/// Serialized with serde's adjacent tagging: the snake_case variant name goes under the
/// `step` key and the variant's payload (if any) under `config`.
#[derive(
    Debug,
    Clone,
    PartialEq,
    Eq,
    serde::Serialize,
    serde::Deserialize,
    schemars::JsonSchema,
    ts_rs::TS,
)]
#[serde(tag = "step", content = "config", rename_all = "snake_case")]
#[ts(rename_all = "snake_case")]
pub enum CanonicalStepSpec {
    /// Step addressing an endpoint `uri`; contract validation rejects a blank `uri`.
    To {
        uri: String,
    },
    /// Step carrying a log `message`.
    Log {
        message: String,
    },
    /// Step addressing an endpoint `uri`; contract validation rejects a blank `uri`.
    WireTap {
        uri: String,
    },
    /// Step carrying a language `expression`.
    Script {
        expression: LanguageExpressionDef,
    },
    /// Step pairing a `predicate` with nested `steps`; the nested steps are validated too.
    Filter {
        predicate: LanguageExpressionDef,
        steps: Vec<CanonicalStepSpec>,
    },
    /// Step holding a list of `whens` branches and an optional `otherwise` branch; the steps
    /// of every branch are validated.
    Choice {
        whens: Vec<CanonicalWhenSpec>,
        otherwise: Option<Vec<CanonicalStepSpec>>,
    },
    /// Split step with nested `steps`; contract validation rejects `parallel_limit == Some(0)`
    /// and validates the nested steps.
    Split {
        expression: CanonicalSplitExpressionSpec,
        aggregation: CanonicalSplitAggregationSpec,
        parallel: bool,
        parallel_limit: Option<usize>,
        stop_on_exception: bool,
        steps: Vec<CanonicalStepSpec>,
    },
    /// Aggregate step; see [`CanonicalAggregateSpec`] for the validated fields.
    Aggregate(CanonicalAggregateSpec),
    /// Payload-free step (serialized without a `config` entry).
    Stop,
    /// Delay step; `delay_ms` is exported to TypeScript as `number`.
    Delay {
        #[ts(type = "number")]
        delay_ms: u64,
        dynamic_header: Option<String>,
    },
}
145
/// One branch of a [`CanonicalStepSpec::Choice`]: a `predicate` together with the `steps`
/// associated with it.
#[derive(
    Debug,
    Clone,
    PartialEq,
    Eq,
    serde::Serialize,
    serde::Deserialize,
    schemars::JsonSchema,
    ts_rs::TS,
)]
#[serde(rename_all = "snake_case")]
#[ts(rename_all = "snake_case")]
pub struct CanonicalWhenSpec {
    /// Predicate expression for this branch.
    pub predicate: LanguageExpressionDef,
    /// Steps belonging to this branch; validated recursively as part of the contract check.
    pub steps: Vec<CanonicalStepSpec>,
}
162
/// Value of the `expression` field of a [`CanonicalStepSpec::Split`] step.
///
/// Variant names serialize in snake_case (`body_lines`, `body_json_array`, `language`).
#[derive(
    Debug,
    Clone,
    PartialEq,
    Eq,
    serde::Serialize,
    serde::Deserialize,
    schemars::JsonSchema,
    ts_rs::TS,
)]
#[serde(rename_all = "snake_case")]
#[ts(rename_all = "snake_case")]
pub enum CanonicalSplitExpressionSpec {
    // NOTE(review): presumably splits the body line by line — confirm against the compiler.
    BodyLines,
    // NOTE(review): presumably splits a JSON array body into elements — confirm.
    BodyJsonArray,
    /// Split driven by an explicit language expression.
    Language(LanguageExpressionDef),
}
180
/// Value of the `aggregation` field of a [`CanonicalStepSpec::Split`] step.
///
/// Variant names serialize in snake_case (`last_wins`, `collect_all`, `original`).
// NOTE(review): the exact aggregation semantics of each variant are defined where the spec is
// compiled, not in this file — confirm there before relying on the names.
#[derive(
    Debug,
    Clone,
    PartialEq,
    Eq,
    serde::Serialize,
    serde::Deserialize,
    schemars::JsonSchema,
    ts_rs::TS,
)]
#[serde(rename_all = "snake_case")]
#[ts(rename_all = "snake_case")]
pub enum CanonicalSplitAggregationSpec {
    LastWins,
    CollectAll,
    Original,
}
198
/// Value of the `strategy` field of a [`CanonicalAggregateSpec`].
///
/// Only one strategy is defined; it serializes as `collect_all`.
#[derive(
    Debug,
    Clone,
    PartialEq,
    Eq,
    serde::Serialize,
    serde::Deserialize,
    schemars::JsonSchema,
    ts_rs::TS,
)]
#[serde(rename_all = "snake_case")]
#[ts(rename_all = "snake_case")]
pub enum CanonicalAggregateStrategySpec {
    CollectAll,
}
214
/// Configuration payload of a [`CanonicalStepSpec::Aggregate`] step.
///
/// Contract validation checks only `header` (must not be blank) and `completion_size`
/// (must not be `Some(0)`); the remaining fields are carried through unchecked.
#[derive(
    Debug,
    Clone,
    PartialEq,
    Eq,
    serde::Serialize,
    serde::Deserialize,
    schemars::JsonSchema,
    ts_rs::TS,
)]
#[serde(rename_all = "snake_case")]
#[ts(rename_all = "snake_case")]
pub struct CanonicalAggregateSpec {
    /// Header name; must not be blank.
    pub header: String,
    /// Optional completion size; when set it must be greater than zero.
    pub completion_size: Option<usize>,
    /// Optional completion timeout in milliseconds; exported to TypeScript as `number`.
    #[ts(type = "number")]
    pub completion_timeout_ms: Option<u64>,
    pub correlation_key: Option<String>,
    pub force_completion_on_stop: Option<bool>,
    pub discard_on_timeout: Option<bool>,
    /// Aggregation strategy.
    pub strategy: CanonicalAggregateStrategySpec,
    pub max_buckets: Option<usize>,
    /// Optional bucket time-to-live in milliseconds; exported to TypeScript as `number`.
    #[ts(type = "number")]
    pub bucket_ttl_ms: Option<u64>,
}
240
/// Optional circuit breaker settings of a [`CanonicalRouteSpec`].
///
/// `CanonicalRouteSpec::validate_contract` requires both values to be greater than zero.
#[derive(
    Debug,
    Clone,
    PartialEq,
    Eq,
    serde::Serialize,
    serde::Deserialize,
    schemars::JsonSchema,
    ts_rs::TS,
)]
#[serde(rename_all = "snake_case")]
#[ts(rename_all = "snake_case")]
pub struct CanonicalCircuitBreakerSpec {
    /// Failure threshold; must be greater than zero.
    pub failure_threshold: u32,
    /// Open duration in milliseconds; must be greater than zero. Exported to TypeScript as
    /// `number`.
    #[ts(type = "number")]
    pub open_duration_ms: u64,
}
258
259impl CanonicalRouteSpec {
260    pub fn new(route_id: impl Into<String>, from: impl Into<String>) -> Self {
261        Self {
262            route_id: route_id.into(),
263            from: from.into(),
264            steps: Vec::new(),
265            circuit_breaker: None,
266            version: CANONICAL_CONTRACT_VERSION,
267        }
268    }
269
270    pub fn validate_contract(&self) -> Result<(), CamelError> {
271        if self.route_id.trim().is_empty() {
272            return Err(CamelError::RouteError(
273                "canonical contract violation: route_id cannot be empty".to_string(),
274            ));
275        }
276        if self.from.trim().is_empty() {
277            return Err(CamelError::RouteError(
278                "canonical contract violation: from cannot be empty".to_string(),
279            ));
280        }
281        if self.version != CANONICAL_CONTRACT_VERSION {
282            return Err(CamelError::RouteError(format!(
283                "canonical contract violation: expected version {}, got {}",
284                CANONICAL_CONTRACT_VERSION, self.version
285            )));
286        }
287        validate_steps(&self.steps)?;
288        if let Some(cb) = &self.circuit_breaker {
289            if cb.failure_threshold == 0 {
290                return Err(CamelError::RouteError(
291                    "canonical contract violation: circuit_breaker.failure_threshold must be > 0"
292                        .to_string(),
293                ));
294            }
295            if cb.open_duration_ms == 0 {
296                return Err(CamelError::RouteError(
297                    "canonical contract violation: circuit_breaker.open_duration_ms must be > 0"
298                        .to_string(),
299                ));
300            }
301        }
302        Ok(())
303    }
304}
305
306fn validate_steps(steps: &[CanonicalStepSpec]) -> Result<(), CamelError> {
307    for step in steps {
308        match step {
309            CanonicalStepSpec::To { uri } | CanonicalStepSpec::WireTap { uri } => {
310                if uri.trim().is_empty() {
311                    return Err(CamelError::RouteError(
312                        "canonical contract violation: endpoint uri cannot be empty".to_string(),
313                    ));
314                }
315            }
316            CanonicalStepSpec::Filter { steps, .. } => validate_steps(steps)?,
317            CanonicalStepSpec::Choice { whens, otherwise } => {
318                for when in whens {
319                    validate_steps(&when.steps)?;
320                }
321                if let Some(otherwise) = otherwise {
322                    validate_steps(otherwise)?;
323                }
324            }
325            CanonicalStepSpec::Split {
326                parallel_limit,
327                steps,
328                ..
329            } => {
330                if let Some(limit) = parallel_limit
331                    && *limit == 0
332                {
333                    return Err(CamelError::RouteError(
334                        "canonical contract violation: split.parallel_limit must be > 0"
335                            .to_string(),
336                    ));
337                }
338                validate_steps(steps)?;
339            }
340            CanonicalStepSpec::Aggregate(config) => {
341                if config.header.trim().is_empty() {
342                    return Err(CamelError::RouteError(
343                        "canonical contract violation: aggregate.header cannot be empty"
344                            .to_string(),
345                    ));
346                }
347                if let Some(size) = config.completion_size
348                    && size == 0
349                {
350                    return Err(CamelError::RouteError(
351                        "canonical contract violation: aggregate.completion_size must be > 0"
352                            .to_string(),
353                    ));
354                }
355            }
356            CanonicalStepSpec::Log { .. }
357            | CanonicalStepSpec::Script { .. }
358            | CanonicalStepSpec::Stop
359            | CanonicalStepSpec::Delay { .. } => {}
360        }
361    }
362    Ok(())
363}
364
/// Commands accepted by a [`RuntimeCommandBus`].
///
/// Every variant carries a `command_id` and an optional `causation_id`, both exposed through
/// the accessor methods on this type.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RuntimeCommand {
    /// Registers the route described by `spec`.
    RegisterRoute {
        spec: CanonicalRouteSpec,
        command_id: String,
        causation_id: Option<String>,
    },
    /// Lifecycle command targeting the route identified by `route_id`.
    StartRoute {
        route_id: String,
        command_id: String,
        causation_id: Option<String>,
    },
    /// Lifecycle command targeting the route identified by `route_id`.
    StopRoute {
        route_id: String,
        command_id: String,
        causation_id: Option<String>,
    },
    /// Lifecycle command targeting the route identified by `route_id`.
    SuspendRoute {
        route_id: String,
        command_id: String,
        causation_id: Option<String>,
    },
    /// Lifecycle command targeting the route identified by `route_id`.
    ResumeRoute {
        route_id: String,
        command_id: String,
        causation_id: Option<String>,
    },
    /// Lifecycle command targeting the route identified by `route_id`.
    ReloadRoute {
        route_id: String,
        command_id: String,
        causation_id: Option<String>,
    },
    /// Internal lifecycle command emitted by runtime adapters when a route crashes at runtime.
    ///
    /// This keeps aggregate/projection state aligned with controller-observed failures.
    FailRoute {
        route_id: String,
        error: String,
        command_id: String,
        causation_id: Option<String>,
    },
    /// Lifecycle command targeting the route identified by `route_id`.
    RemoveRoute {
        route_id: String,
        command_id: String,
        causation_id: Option<String>,
    },
}
412
413impl RuntimeCommand {
414    pub fn command_id(&self) -> &str {
415        match self {
416            RuntimeCommand::RegisterRoute { command_id, .. }
417            | RuntimeCommand::StartRoute { command_id, .. }
418            | RuntimeCommand::StopRoute { command_id, .. }
419            | RuntimeCommand::SuspendRoute { command_id, .. }
420            | RuntimeCommand::ResumeRoute { command_id, .. }
421            | RuntimeCommand::ReloadRoute { command_id, .. }
422            | RuntimeCommand::FailRoute { command_id, .. }
423            | RuntimeCommand::RemoveRoute { command_id, .. } => command_id,
424        }
425    }
426
427    pub fn causation_id(&self) -> Option<&str> {
428        match self {
429            RuntimeCommand::RegisterRoute { causation_id, .. }
430            | RuntimeCommand::StartRoute { causation_id, .. }
431            | RuntimeCommand::StopRoute { causation_id, .. }
432            | RuntimeCommand::SuspendRoute { causation_id, .. }
433            | RuntimeCommand::ResumeRoute { causation_id, .. }
434            | RuntimeCommand::ReloadRoute { causation_id, .. }
435            | RuntimeCommand::FailRoute { causation_id, .. }
436            | RuntimeCommand::RemoveRoute { causation_id, .. } => causation_id.as_deref(),
437        }
438    }
439}
440
/// Outcome returned by [`RuntimeCommandBus::execute`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RuntimeCommandResult {
    /// Outcome without any payload.
    Accepted,
    /// Outcome naming a `command_id`.
    // NOTE(review): presumably signals that this command id was already processed — confirm
    // against the bus implementation.
    Duplicate { command_id: String },
    /// Outcome naming the registered route.
    RouteRegistered { route_id: String },
    /// Outcome naming a route together with a `status` string.
    RouteStateChanged { route_id: String, status: String },
}
448
/// Queries accepted by a [`RuntimeQueryBus`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RuntimeQuery {
    /// Asks for the status of the route identified by `route_id`.
    GetRouteStatus {
        route_id: String,
    },
    /// **Note:** This variant is intercepted by `RuntimeBus::ask` *before* reaching
    /// `execute_query`. Do not handle it in `execute_query` — it has no access to
    /// the in-flight counter. See `runtime_bus.rs` for the intercept.
    InFlightCount {
        route_id: String,
    },
    /// Asks for the list of routes; takes no parameters.
    ListRoutes,
}
462
/// Answer returned by [`RuntimeQueryBus::ask`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RuntimeQueryResult {
    /// In-flight `count` reported for `route_id`.
    InFlightCount { route_id: String, count: u64 },
    /// Answer naming a `route_id` for which no route was found.
    RouteNotFound { route_id: String },
    /// `status` string reported for `route_id`.
    RouteStatus { route_id: String, status: String },
    /// List of route ids.
    Routes { route_ids: Vec<String> },
}
470
/// Route lifecycle events.
///
/// Unlike the command and query types in this file, events derive `Serialize` and
/// `Deserialize`. Every variant names the affected `route_id`; `RouteFailed` additionally
/// carries an `error` string.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum RuntimeEvent {
    RouteRegistered { route_id: String },
    RouteStartRequested { route_id: String },
    RouteStarted { route_id: String },
    RouteFailed { route_id: String, error: String },
    RouteStopped { route_id: String },
    RouteSuspended { route_id: String },
    RouteResumed { route_id: String },
    RouteReloaded { route_id: String },
    RouteRemoved { route_id: String },
}
483
/// Asynchronous entry point for executing [`RuntimeCommand`]s.
///
/// Implementors must be `Send + Sync`.
#[async_trait]
pub trait RuntimeCommandBus: Send + Sync {
    /// Executes `cmd`, yielding a [`RuntimeCommandResult`] on success or a `CamelError` on
    /// failure.
    async fn execute(&self, cmd: RuntimeCommand) -> Result<RuntimeCommandResult, CamelError>;
}
488
/// Asynchronous entry point for answering [`RuntimeQuery`]s.
///
/// Implementors must be `Send + Sync`.
#[async_trait]
pub trait RuntimeQueryBus: Send + Sync {
    /// Answers `query`, yielding a [`RuntimeQueryResult`] on success or a `CamelError` on
    /// failure.
    async fn ask(&self, query: RuntimeQuery) -> Result<RuntimeQueryResult, CamelError>;
}
493
/// Combined handle: anything that is both a [`RuntimeCommandBus`] and a [`RuntimeQueryBus`].
pub trait RuntimeHandle: RuntimeCommandBus + RuntimeQueryBus {}

// Blanket impl: every type implementing both buses is automatically a `RuntimeHandle`.
impl<T> RuntimeHandle for T where T: RuntimeCommandBus + RuntimeQueryBus {}
497
// Unit tests for the canonical contract helpers, the spec validation rules, and the
// command/query/event types defined in this file.
#[cfg(test)]
mod tests {
    use super::*;
    use async_trait::async_trait;
    use futures::executor::block_on;

    // Minimal bus implementation that echoes a fixed answer for every command and query.
    struct NoopRuntime;

    #[async_trait]
    impl RuntimeCommandBus for NoopRuntime {
        async fn execute(&self, cmd: RuntimeCommand) -> Result<RuntimeCommandResult, CamelError> {
            Ok(match cmd {
                RuntimeCommand::RegisterRoute { spec, .. } => {
                    RuntimeCommandResult::RouteRegistered {
                        route_id: spec.route_id,
                    }
                }
                RuntimeCommand::StartRoute { route_id, .. }
                | RuntimeCommand::StopRoute { route_id, .. }
                | RuntimeCommand::SuspendRoute { route_id, .. }
                | RuntimeCommand::ResumeRoute { route_id, .. }
                | RuntimeCommand::ReloadRoute { route_id, .. }
                | RuntimeCommand::FailRoute { route_id, .. }
                | RuntimeCommand::RemoveRoute { route_id, .. } => {
                    RuntimeCommandResult::RouteStateChanged {
                        route_id,
                        status: "ok".to_string(),
                    }
                }
            })
        }
    }

    #[async_trait]
    impl RuntimeQueryBus for NoopRuntime {
        async fn ask(&self, query: RuntimeQuery) -> Result<RuntimeQueryResult, CamelError> {
            Ok(match query {
                RuntimeQuery::GetRouteStatus { route_id } => RuntimeQueryResult::RouteStatus {
                    route_id,
                    status: "Started".to_string(),
                },
                RuntimeQuery::InFlightCount { route_id } => {
                    RuntimeQueryResult::InFlightCount { route_id, count: 0 }
                }
                RuntimeQuery::ListRoutes => RuntimeQueryResult::Routes {
                    route_ids: vec!["r1".to_string()],
                },
            })
        }
    }

    // `command_id()` returns the id stored in the command (only the command side is checked).
    #[test]
    fn command_and_query_ids_are_exposed() {
        let cmd = RuntimeCommand::StartRoute {
            route_id: "r1".into(),
            command_id: "c1".into(),
            causation_id: None,
        };
        assert_eq!(cmd.command_id(), "c1");
    }

    // `new` stores both arguments and fills in the defaults for the remaining fields.
    #[test]
    fn canonical_spec_requires_route_id_and_from() {
        let spec = CanonicalRouteSpec::new("r1", "timer:tick");
        assert_eq!(spec.route_id, "r1");
        assert_eq!(spec.from, "timer:tick");
        assert_eq!(spec.version, CANONICAL_CONTRACT_VERSION);
        assert!(spec.steps.is_empty());
        assert!(spec.circuit_breaker.is_none());
    }

    // A version other than the contract version fails validation.
    #[test]
    fn canonical_contract_rejects_invalid_version() {
        let mut spec = CanonicalRouteSpec::new("r1", "timer:tick");
        spec.version = 2;
        let err = spec.validate_contract().unwrap_err().to_string();
        assert!(err.contains("expected version"));
    }

    // Spot-checks membership of the contract tables.
    #[test]
    fn canonical_contract_declares_subset_scope() {
        assert!(canonical_contract_supports_step("to"));
        assert!(canonical_contract_supports_step("split"));
        assert!(!canonical_contract_supports_step("set_header"));
        assert!(!canonical_contract_supports_step("set_property"));

        assert!(CANONICAL_CONTRACT_DECLARATIVE_ONLY_STEPS.contains(&"split"));
        assert!(CANONICAL_CONTRACT_EXCLUDED_DECLARATIVE_STEPS.contains(&"set_header"));
        assert!(CANONICAL_CONTRACT_EXCLUDED_DECLARATIVE_STEPS.contains(&"set_property"));
        assert!(CANONICAL_CONTRACT_RUST_ONLY_STEPS.contains(&"processor"));
    }

    // Each restricted category yields its own reason text.
    #[test]
    fn canonical_contract_rejection_reason_is_explicit() {
        let set_header_reason = canonical_contract_rejection_reason("set_header")
            .expect("set_header should have explicit reason");
        assert!(set_header_reason.contains("out-of-scope"));

        let set_property_reason = canonical_contract_rejection_reason("set_property")
            .expect("set_property should have explicit reason");
        assert!(set_property_reason.contains("out-of-scope"));

        let processor_reason = canonical_contract_rejection_reason("processor")
            .expect("processor should be rust-only");
        assert!(processor_reason.contains("rust-only"));

        let split_reason = canonical_contract_rejection_reason("split")
            .expect("split should require declarative form");
        assert!(split_reason.contains("declarative"));
    }

    // `causation_id()` returns the stored id as `Option<&str>`.
    #[test]
    fn command_causation_id_is_exposed() {
        let cmd = RuntimeCommand::StopRoute {
            route_id: "r1".into(),
            command_id: "c2".into(),
            causation_id: Some("c1".into()),
        };
        assert_eq!(cmd.command_id(), "c2");
        assert_eq!(cmd.causation_id(), Some("c1"));
    }

    // Whitespace-only `route_id` / `from` count as empty.
    #[test]
    fn canonical_contract_rejects_empty_route_id_and_from() {
        let spec = CanonicalRouteSpec::new("   ", "timer:tick");
        let err = spec.validate_contract().unwrap_err().to_string();
        assert!(err.contains("route_id cannot be empty"));

        let spec = CanonicalRouteSpec::new("r1", "  ");
        let err = spec.validate_contract().unwrap_err().to_string();
        assert!(err.contains("from cannot be empty"));
    }

    // A zero split limit and a blank endpoint uri are both rejected.
    #[test]
    fn canonical_contract_rejects_invalid_nested_steps() {
        let mut spec = CanonicalRouteSpec::new("r1", "timer:tick");
        spec.steps = vec![CanonicalStepSpec::Split {
            expression: CanonicalSplitExpressionSpec::BodyLines,
            aggregation: CanonicalSplitAggregationSpec::CollectAll,
            parallel: true,
            parallel_limit: Some(0),
            stop_on_exception: false,
            steps: vec![CanonicalStepSpec::To {
                uri: "log:ok".to_string(),
            }],
        }];
        let err = spec.validate_contract().unwrap_err().to_string();
        assert!(err.contains("split.parallel_limit must be > 0"));

        spec.steps = vec![CanonicalStepSpec::To {
            uri: "   ".to_string(),
        }];
        let err = spec.validate_contract().unwrap_err().to_string();
        assert!(err.contains("endpoint uri cannot be empty"));
    }

    // Covers the aggregate checks (blank header, zero completion size) and both circuit
    // breaker checks.
    #[test]
    fn canonical_contract_rejects_invalid_aggregate_and_circuit_breaker() {
        let mut spec = CanonicalRouteSpec::new("r1", "timer:tick");
        spec.steps = vec![CanonicalStepSpec::Aggregate(CanonicalAggregateSpec {
            header: " ".to_string(),
            completion_size: Some(1),
            completion_timeout_ms: None,
            correlation_key: None,
            force_completion_on_stop: None,
            discard_on_timeout: None,
            strategy: CanonicalAggregateStrategySpec::CollectAll,
            max_buckets: None,
            bucket_ttl_ms: None,
        })];
        let err = spec.validate_contract().unwrap_err().to_string();
        assert!(err.contains("aggregate.header cannot be empty"));

        spec.steps = vec![CanonicalStepSpec::Aggregate(CanonicalAggregateSpec {
            header: "k".to_string(),
            completion_size: Some(0),
            completion_timeout_ms: None,
            correlation_key: None,
            force_completion_on_stop: None,
            discard_on_timeout: None,
            strategy: CanonicalAggregateStrategySpec::CollectAll,
            max_buckets: None,
            bucket_ttl_ms: None,
        })];
        let err = spec.validate_contract().unwrap_err().to_string();
        assert!(err.contains("aggregate.completion_size must be > 0"));

        spec.steps = vec![];
        spec.circuit_breaker = Some(CanonicalCircuitBreakerSpec {
            failure_threshold: 0,
            open_duration_ms: 10,
        });
        let err = spec.validate_contract().unwrap_err().to_string();
        assert!(err.contains("failure_threshold must be > 0"));

        spec.circuit_breaker = Some(CanonicalCircuitBreakerSpec {
            failure_threshold: 1,
            open_duration_ms: 0,
        });
        let err = spec.validate_contract().unwrap_err().to_string();
        assert!(err.contains("open_duration_ms must be > 0"));
    }

    // Unrestricted and unknown step names yield no reason.
    #[test]
    fn canonical_contract_rejection_reason_none_for_regular_steps() {
        assert!(canonical_contract_rejection_reason("to").is_none());
        assert!(canonical_contract_rejection_reason("unknown-step").is_none());
    }

    // Exercises the accessors once per command variant.
    #[test]
    fn command_helpers_cover_all_variants() {
        let spec = CanonicalRouteSpec::new("r1", "timer:tick");
        let cmds = [
            RuntimeCommand::RegisterRoute {
                spec,
                command_id: "c1".into(),
                causation_id: Some("root".into()),
            },
            RuntimeCommand::StartRoute {
                route_id: "r1".into(),
                command_id: "c2".into(),
                causation_id: None,
            },
            RuntimeCommand::StopRoute {
                route_id: "r1".into(),
                command_id: "c3".into(),
                causation_id: None,
            },
            RuntimeCommand::SuspendRoute {
                route_id: "r1".into(),
                command_id: "c4".into(),
                causation_id: None,
            },
            RuntimeCommand::ResumeRoute {
                route_id: "r1".into(),
                command_id: "c5".into(),
                causation_id: None,
            },
            RuntimeCommand::ReloadRoute {
                route_id: "r1".into(),
                command_id: "c6".into(),
                causation_id: None,
            },
            RuntimeCommand::FailRoute {
                route_id: "r1".into(),
                error: "boom".into(),
                command_id: "c7".into(),
                causation_id: None,
            },
            RuntimeCommand::RemoveRoute {
                route_id: "r1".into(),
                command_id: "c8".into(),
                causation_id: None,
            },
        ];

        let ids: Vec<&str> = cmds.iter().map(RuntimeCommand::command_id).collect();
        assert_eq!(ids, vec!["c1", "c2", "c3", "c4", "c5", "c6", "c7", "c8"]);
        assert_eq!(cmds[0].causation_id(), Some("root"));
        assert_eq!(cmds[1].causation_id(), None);
    }

    // A spec survives a JSON serialize/deserialize round trip unchanged.
    #[test]
    fn canonical_route_spec_serde_roundtrip() {
        let mut spec = CanonicalRouteSpec::new("test-route", "timer:tick?period=1000");
        spec.steps.push(CanonicalStepSpec::Log {
            message: "Hello".into(),
        });
        spec.steps.push(CanonicalStepSpec::To {
            uri: "log:info".into(),
        });
        spec.steps.push(CanonicalStepSpec::Stop);

        let json = serde_json::to_string(&spec).unwrap();
        let deserialized: CanonicalRouteSpec = serde_json::from_str(&json).unwrap();
        assert_eq!(spec, deserialized);
    }

    // Several step variants, including the payload-free `Stop`, round-trip through JSON.
    #[test]
    fn canonical_step_spec_serde_variants() {
        let steps = vec![
            CanonicalStepSpec::To {
                uri: "direct:a".into(),
            },
            CanonicalStepSpec::Log {
                message: "msg".into(),
            },
            CanonicalStepSpec::WireTap {
                uri: "direct:audit".into(),
            },
            CanonicalStepSpec::Stop,
            CanonicalStepSpec::Delay {
                delay_ms: 100,
                dynamic_header: None,
            },
        ];
        let json = serde_json::to_string_pretty(&steps).unwrap();
        let back: Vec<CanonicalStepSpec> = serde_json::from_str(&json).unwrap();
        assert_eq!(steps, back);
    }

    // The generated JSON schema mentions the type name and a known field.
    #[test]
    fn canonical_route_spec_json_schema_generates() {
        let schema = schemars::schema_for!(CanonicalRouteSpec);
        let json = serde_json::to_string(&schema).unwrap();
        assert!(json.contains("CanonicalRouteSpec"));
        assert!(json.contains("route_id"));
    }

    // Guards against a `function` step appearing in the generated schema.
    #[test]
    fn canonical_json_schema_has_no_function_step() {
        let schema = schemars::schema_for!(CanonicalRouteSpec);
        let json = serde_json::to_string(&schema).unwrap();
        assert!(
            !json.contains("\"function\""),
            "canonical JSON schema must not contain 'function' step"
        );
    }

    // Guards against `function` being listed as a supported step.
    #[test]
    fn canonical_contract_does_not_support_function() {
        assert!(
            !canonical_contract_supports_step("function"),
            "function must not be in CANONICAL_CONTRACT_SUPPORTED_STEPS"
        );
    }

    // Different result variants compare unequal; identical ones compare equal.
    #[test]
    fn runtime_command_result_all_variants_are_distinct() {
        let accepted = RuntimeCommandResult::Accepted;
        let dup = RuntimeCommandResult::Duplicate {
            command_id: "c1".into(),
        };
        let registered = RuntimeCommandResult::RouteRegistered {
            route_id: "r1".into(),
        };
        let changed = RuntimeCommandResult::RouteStateChanged {
            route_id: "r1".into(),
            status: "Started".into(),
        };

        assert_ne!(accepted, dup);
        assert_ne!(dup, registered);
        assert_ne!(registered, changed);

        let dup2 = RuntimeCommandResult::Duplicate {
            command_id: "c1".into(),
        };
        assert_eq!(dup, dup2);
    }

    // An event survives a JSON round trip unchanged.
    #[test]
    fn runtime_event_serialization_round_trip() {
        let event = RuntimeEvent::RouteFailed {
            route_id: "route-a".to_string(),
            error: "boom".to_string(),
        };
        let json = serde_json::to_string(&event).unwrap();
        let back: RuntimeEvent = serde_json::from_str(&json).unwrap();
        assert_eq!(event, back);
    }

    // Drives both bus traits through `NoopRuntime` and checks the returned shapes.
    #[test]
    fn noop_runtime_execute_and_ask_return_expected_shapes() {
        let rt = NoopRuntime;
        let cmd = RuntimeCommand::RegisterRoute {
            spec: CanonicalRouteSpec::new("r2", "timer:tick"),
            command_id: "c1".into(),
            causation_id: None,
        };
        let cmd_result = block_on(rt.execute(cmd)).unwrap();
        assert_eq!(
            cmd_result,
            RuntimeCommandResult::RouteRegistered {
                route_id: "r2".into()
            }
        );

        let query_result = block_on(rt.ask(RuntimeQuery::GetRouteStatus {
            route_id: "r2".into(),
        }))
        .unwrap();
        assert_eq!(
            query_result,
            RuntimeQueryResult::RouteStatus {
                route_id: "r2".into(),
                status: "Started".into()
            }
        );
    }

    // Pins the published contract name and version.
    #[test]
    fn canonical_contract_name_and_version_constants_match() {
        assert_eq!(CANONICAL_CONTRACT_NAME, "canonical-v1");
        assert_eq!(CANONICAL_CONTRACT_VERSION, 1);
    }
}
894}