Skip to main content

camel_api/
runtime.rs

1use async_trait::async_trait;
2use serde::{Deserialize, Serialize};
3
4use crate::CamelError;
5use crate::declarative::LanguageExpressionDef;
6use crate::splitter::StreamSplitConfig;
7
8pub const CANONICAL_CONTRACT_NAME: &str = "canonical-v1";
9pub const CANONICAL_CONTRACT_VERSION: u32 = 2;
10pub const CANONICAL_CONTRACT_SUPPORTED_STEPS: &[&str] = &[
11    "to",
12    "log",
13    "wire_tap",
14    "script",
15    "filter",
16    "choice",
17    "split",
18    "aggregate",
19    "stop",
20    "delay",
21];
22pub const CANONICAL_CONTRACT_DECLARATIVE_ONLY_STEPS: &[&str] =
23    &["script", "filter", "choice", "split"];
24pub const CANONICAL_CONTRACT_EXCLUDED_DECLARATIVE_STEPS: &[&str] = &[
25    "set_header",
26    "set_property",
27    "set_body",
28    "multicast",
29    "convert_body_to",
30    "bean",
31    "marshal",
32    "unmarshal",
33];
34pub const CANONICAL_CONTRACT_RUST_ONLY_STEPS: &[&str] = &[
35    "processor",
36    "process",
37    "process_fn",
38    "map_body",
39    "set_body_fn",
40    "set_header_fn",
41];
42
43pub fn canonical_contract_supports_step(step: &str) -> bool {
44    CANONICAL_CONTRACT_SUPPORTED_STEPS.contains(&step)
45}
46
47pub fn canonical_contract_rejection_reason(step: &str) -> Option<&'static str> {
48    if CANONICAL_CONTRACT_EXCLUDED_DECLARATIVE_STEPS.contains(&step) {
49        return Some(
50            "declared out-of-scope for canonical v1; use declarative route compilation path outside CQRS canonical commands",
51        );
52    }
53
54    if CANONICAL_CONTRACT_RUST_ONLY_STEPS.contains(&step) {
55        return Some("rust-only programmable step; not representable in canonical v1 contract");
56    }
57
58    if canonical_contract_supports_step(step)
59        && CANONICAL_CONTRACT_DECLARATIVE_ONLY_STEPS.contains(&step)
60    {
61        return Some(
62            "supported only as declarative/serializable expression form; closure/processor variants are outside canonical v1",
63        );
64    }
65
66    None
67}
68
69#[derive(
70    Debug,
71    Clone,
72    PartialEq,
73    Eq,
74    serde::Serialize,
75    serde::Deserialize,
76    schemars::JsonSchema,
77    ts_rs::TS,
78)]
79#[serde(rename_all = "snake_case")]
80#[ts(rename_all = "snake_case")]
81pub struct CanonicalRouteSpec {
82    /// Stable minimal route representation for runtime command registration.
83    ///
84    /// Scope notes:
85    /// - This is intentionally a partial model and does not mirror every `BuilderStep`.
86    /// - Version 2 adds: auto_startup, startup_order, concurrency.
87    /// - Still excluded: error_handler, unit_of_work. These are set to defaults
88    ///   when compiling from canonical.
89    /// - Round-trip (YAML → Canonical → YAML) loses these fields.
90    /// - Advanced EIPs continue to use the existing RouteDefinition/BuilderStep path.
91    pub route_id: String,
92    pub from: String,
93    pub steps: Vec<CanonicalStepSpec>,
94    pub circuit_breaker: Option<CanonicalCircuitBreakerSpec>,
95    pub auto_startup: Option<bool>,
96    pub startup_order: Option<i32>,
97    pub concurrency: Option<CanonicalConcurrencySpec>,
98    pub version: u32,
99}
100
101#[derive(
102    Debug,
103    Clone,
104    PartialEq,
105    Eq,
106    serde::Serialize,
107    serde::Deserialize,
108    schemars::JsonSchema,
109    ts_rs::TS,
110)]
111#[serde(tag = "step", content = "config", rename_all = "snake_case")]
112#[ts(rename_all = "snake_case")]
113pub enum CanonicalStepSpec {
114    To {
115        uri: String,
116    },
117    Log {
118        message: String,
119    },
120    WireTap {
121        uri: String,
122    },
123    Script {
124        expression: LanguageExpressionDef,
125    },
126    Filter {
127        predicate: LanguageExpressionDef,
128        steps: Vec<CanonicalStepSpec>,
129    },
130    Choice {
131        whens: Vec<CanonicalWhenSpec>,
132        otherwise: Option<Vec<CanonicalStepSpec>>,
133    },
134    Split {
135        expression: CanonicalSplitExpressionSpec,
136        aggregation: CanonicalSplitAggregationSpec,
137        parallel: bool,
138        parallel_limit: Option<usize>,
139        stop_on_exception: bool,
140        steps: Vec<CanonicalStepSpec>,
141    },
142    Aggregate(CanonicalAggregateSpec),
143    Stop,
144    Delay {
145        #[ts(type = "number")]
146        delay_ms: u64,
147        dynamic_header: Option<String>,
148    },
149}
150
151#[derive(
152    Debug,
153    Clone,
154    PartialEq,
155    Eq,
156    serde::Serialize,
157    serde::Deserialize,
158    schemars::JsonSchema,
159    ts_rs::TS,
160)]
161#[serde(rename_all = "snake_case")]
162#[ts(rename_all = "snake_case")]
163pub struct CanonicalWhenSpec {
164    pub predicate: LanguageExpressionDef,
165    pub steps: Vec<CanonicalStepSpec>,
166}
167
168#[derive(
169    Debug,
170    Clone,
171    PartialEq,
172    Eq,
173    serde::Serialize,
174    serde::Deserialize,
175    schemars::JsonSchema,
176    ts_rs::TS,
177)]
178#[serde(rename_all = "snake_case")]
179#[ts(rename_all = "snake_case")]
180pub enum CanonicalSplitExpressionSpec {
181    BodyLines,
182    BodyJsonArray,
183    Language(LanguageExpressionDef),
184    Stream(StreamSplitConfig),
185}
186
187#[derive(
188    Debug,
189    Clone,
190    PartialEq,
191    Eq,
192    serde::Serialize,
193    serde::Deserialize,
194    schemars::JsonSchema,
195    ts_rs::TS,
196)]
197#[serde(rename_all = "snake_case")]
198#[ts(rename_all = "snake_case")]
199pub enum CanonicalSplitAggregationSpec {
200    LastWins,
201    CollectAll,
202    Original,
203}
204
205#[derive(
206    Debug,
207    Clone,
208    PartialEq,
209    Eq,
210    serde::Serialize,
211    serde::Deserialize,
212    schemars::JsonSchema,
213    ts_rs::TS,
214)]
215#[serde(rename_all = "snake_case")]
216#[ts(rename_all = "snake_case")]
217pub enum CanonicalAggregateStrategySpec {
218    CollectAll,
219}
220
221#[derive(
222    Debug,
223    Clone,
224    PartialEq,
225    Eq,
226    serde::Serialize,
227    serde::Deserialize,
228    schemars::JsonSchema,
229    ts_rs::TS,
230)]
231#[serde(rename_all = "snake_case")]
232#[ts(rename_all = "snake_case")]
233pub struct CanonicalAggregateSpec {
234    pub header: String,
235    pub completion_size: Option<usize>,
236    #[ts(type = "number")]
237    pub completion_timeout_ms: Option<u64>,
238    pub correlation_key: Option<String>,
239    pub force_completion_on_stop: Option<bool>,
240    pub discard_on_timeout: Option<bool>,
241    pub strategy: CanonicalAggregateStrategySpec,
242    pub max_buckets: Option<usize>,
243    #[ts(type = "number")]
244    pub bucket_ttl_ms: Option<u64>,
245}
246
247#[derive(
248    Debug,
249    Clone,
250    PartialEq,
251    Eq,
252    serde::Serialize,
253    serde::Deserialize,
254    schemars::JsonSchema,
255    ts_rs::TS,
256)]
257#[serde(rename_all = "snake_case")]
258#[ts(rename_all = "snake_case")]
259pub struct CanonicalCircuitBreakerSpec {
260    pub failure_threshold: u32,
261    #[ts(type = "number")]
262    pub open_duration_ms: u64,
263}
264
265#[derive(
266    Debug,
267    Clone,
268    PartialEq,
269    Eq,
270    serde::Serialize,
271    serde::Deserialize,
272    schemars::JsonSchema,
273    ts_rs::TS,
274)]
275#[serde(tag = "mode", rename_all = "snake_case")]
276pub enum CanonicalConcurrencySpec {
277    Sequential,
278    Concurrent { max: usize },
279}
280
281impl CanonicalRouteSpec {
282    pub fn new(route_id: impl Into<String>, from: impl Into<String>) -> Self {
283        Self {
284            route_id: route_id.into(),
285            from: from.into(),
286            steps: Vec::new(),
287            circuit_breaker: None,
288            auto_startup: None,
289            startup_order: None,
290            concurrency: None,
291            version: CANONICAL_CONTRACT_VERSION,
292        }
293    }
294
295    pub fn with_auto_startup(mut self, auto: bool) -> Self {
296        self.auto_startup = Some(auto);
297        self
298    }
299
300    pub fn with_startup_order(mut self, order: i32) -> Self {
301        self.startup_order = Some(order);
302        self
303    }
304
305    pub fn with_concurrency(mut self, concurrency: CanonicalConcurrencySpec) -> Self {
306        self.concurrency = Some(concurrency);
307        self
308    }
309
310    pub fn validate_contract(&self) -> Result<(), CamelError> {
311        if self.route_id.trim().is_empty() {
312            return Err(CamelError::RouteError(
313                "canonical contract violation: route_id cannot be empty".to_string(),
314            ));
315        }
316        if self.from.trim().is_empty() {
317            return Err(CamelError::RouteError(
318                "canonical contract violation: from cannot be empty".to_string(),
319            ));
320        }
321        if self.version == 0 || self.version > CANONICAL_CONTRACT_VERSION {
322            return Err(CamelError::RouteError(format!(
323                "canonical contract violation: expected version {}, got {}",
324                CANONICAL_CONTRACT_VERSION, self.version
325            )));
326        }
327        validate_steps(&self.steps)?;
328        if let Some(cb) = &self.circuit_breaker {
329            if cb.failure_threshold == 0 {
330                return Err(CamelError::RouteError(
331                    "canonical contract violation: circuit_breaker.failure_threshold must be > 0"
332                        .to_string(),
333                ));
334            }
335            if cb.open_duration_ms == 0 {
336                return Err(CamelError::RouteError(
337                    "canonical contract violation: circuit_breaker.open_duration_ms must be > 0"
338                        .to_string(),
339                ));
340            }
341        }
342        if let Some(CanonicalConcurrencySpec::Concurrent { max: 0 }) = &self.concurrency {
343            return Err(CamelError::RouteError(
344                "canonical contract violation: concurrency max must be > 0".to_string(),
345            ));
346        }
347        Ok(())
348    }
349}
350
351#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize)]
352pub struct CanonicalFieldLoss {
353    pub field: &'static str,
354    pub reason: String,
355    pub target_version: u32,
356}
357
358#[derive(Debug, Clone, PartialEq, Eq, Default, serde::Serialize)]
359pub struct CanonicalLossReport {
360    pub dropped_fields: Vec<CanonicalFieldLoss>,
361}
362
363impl CanonicalLossReport {
364    pub fn from_field(field: &'static str, reason: &str, target_version: u32) -> Self {
365        Self {
366            dropped_fields: vec![CanonicalFieldLoss {
367                field,
368                reason: reason.to_string(),
369                target_version,
370            }],
371        }
372    }
373
374    pub fn is_empty(&self) -> bool {
375        self.dropped_fields.is_empty()
376    }
377}
378
379fn validate_steps(steps: &[CanonicalStepSpec]) -> Result<(), CamelError> {
380    for step in steps {
381        match step {
382            CanonicalStepSpec::To { uri } | CanonicalStepSpec::WireTap { uri } => {
383                if uri.trim().is_empty() {
384                    return Err(CamelError::RouteError(
385                        "canonical contract violation: endpoint uri cannot be empty".to_string(),
386                    ));
387                }
388            }
389            CanonicalStepSpec::Filter { steps, .. } => validate_steps(steps)?,
390            CanonicalStepSpec::Choice { whens, otherwise } => {
391                for when in whens {
392                    validate_steps(&when.steps)?;
393                }
394                if let Some(otherwise) = otherwise {
395                    validate_steps(otherwise)?;
396                }
397            }
398            CanonicalStepSpec::Split {
399                parallel_limit,
400                steps,
401                ..
402            } => {
403                if let Some(limit) = parallel_limit
404                    && *limit == 0
405                {
406                    return Err(CamelError::RouteError(
407                        "canonical contract violation: split.parallel_limit must be > 0"
408                            .to_string(),
409                    ));
410                }
411                validate_steps(steps)?;
412            }
413            CanonicalStepSpec::Aggregate(config) => {
414                if config.header.trim().is_empty() {
415                    return Err(CamelError::RouteError(
416                        "canonical contract violation: aggregate.header cannot be empty"
417                            .to_string(),
418                    ));
419                }
420                if let Some(size) = config.completion_size
421                    && size == 0
422                {
423                    return Err(CamelError::RouteError(
424                        "canonical contract violation: aggregate.completion_size must be > 0"
425                            .to_string(),
426                    ));
427                }
428            }
429            CanonicalStepSpec::Log { .. }
430            | CanonicalStepSpec::Script { .. }
431            | CanonicalStepSpec::Stop
432            | CanonicalStepSpec::Delay { .. } => {}
433        }
434    }
435    Ok(())
436}
437
438#[derive(Debug, Clone, PartialEq, Eq)]
439pub enum RuntimeCommand {
440    RegisterRoute {
441        spec: CanonicalRouteSpec,
442        command_id: String,
443        causation_id: Option<String>,
444    },
445    StartRoute {
446        route_id: String,
447        command_id: String,
448        causation_id: Option<String>,
449    },
450    StopRoute {
451        route_id: String,
452        command_id: String,
453        causation_id: Option<String>,
454    },
455    SuspendRoute {
456        route_id: String,
457        command_id: String,
458        causation_id: Option<String>,
459    },
460    ResumeRoute {
461        route_id: String,
462        command_id: String,
463        causation_id: Option<String>,
464    },
465    ReloadRoute {
466        route_id: String,
467        command_id: String,
468        causation_id: Option<String>,
469    },
470    /// Internal lifecycle command emitted by runtime adapters when a route crashes at runtime.
471    ///
472    /// This keeps aggregate/projection state aligned with controller-observed failures.
473    FailRoute {
474        route_id: String,
475        error: String,
476        command_id: String,
477        causation_id: Option<String>,
478    },
479    RemoveRoute {
480        route_id: String,
481        command_id: String,
482        causation_id: Option<String>,
483    },
484}
485
486impl RuntimeCommand {
487    pub fn command_id(&self) -> &str {
488        match self {
489            RuntimeCommand::RegisterRoute { command_id, .. }
490            | RuntimeCommand::StartRoute { command_id, .. }
491            | RuntimeCommand::StopRoute { command_id, .. }
492            | RuntimeCommand::SuspendRoute { command_id, .. }
493            | RuntimeCommand::ResumeRoute { command_id, .. }
494            | RuntimeCommand::ReloadRoute { command_id, .. }
495            | RuntimeCommand::FailRoute { command_id, .. }
496            | RuntimeCommand::RemoveRoute { command_id, .. } => command_id,
497        }
498    }
499
500    pub fn causation_id(&self) -> Option<&str> {
501        match self {
502            RuntimeCommand::RegisterRoute { causation_id, .. }
503            | RuntimeCommand::StartRoute { causation_id, .. }
504            | RuntimeCommand::StopRoute { causation_id, .. }
505            | RuntimeCommand::SuspendRoute { causation_id, .. }
506            | RuntimeCommand::ResumeRoute { causation_id, .. }
507            | RuntimeCommand::ReloadRoute { causation_id, .. }
508            | RuntimeCommand::FailRoute { causation_id, .. }
509            | RuntimeCommand::RemoveRoute { causation_id, .. } => causation_id.as_deref(),
510        }
511    }
512}
513
514#[derive(Debug, Clone, PartialEq, Eq)]
515pub enum RuntimeCommandResult {
516    Accepted,
517    Duplicate { command_id: String },
518    RouteRegistered { route_id: String },
519    RouteStateChanged { route_id: String, status: String },
520}
521
522#[derive(Debug, Clone, PartialEq, Eq)]
523pub enum RuntimeQuery {
524    GetRouteStatus {
525        route_id: String,
526    },
527    /// **Note:** This variant is intercepted by `RuntimeBus::ask` *before* reaching
528    /// `execute_query`. Do not handle it in `execute_query` — it has no access to
529    /// the in-flight counter. See `runtime_bus.rs` for the intercept.
530    InFlightCount {
531        route_id: String,
532    },
533    ListRoutes,
534}
535
536#[derive(Debug, Clone, PartialEq, Eq)]
537pub enum RuntimeQueryResult {
538    InFlightCount { route_id: String, count: u64 },
539    RouteNotFound { route_id: String },
540    RouteStatus { route_id: String, status: String },
541    Routes { route_ids: Vec<String> },
542}
543
544#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
545pub enum RuntimeEvent {
546    RouteRegistered { route_id: String },
547    RouteStartRequested { route_id: String },
548    RouteStarted { route_id: String },
549    RouteFailed { route_id: String, error: String },
550    RouteStopped { route_id: String },
551    RouteSuspended { route_id: String },
552    RouteResumed { route_id: String },
553    RouteReloaded { route_id: String },
554    RouteRemoved { route_id: String },
555}
556
557#[async_trait]
558pub trait RuntimeCommandBus: Send + Sync {
559    async fn execute(&self, cmd: RuntimeCommand) -> Result<RuntimeCommandResult, CamelError>;
560}
561
562#[async_trait]
563pub trait RuntimeQueryBus: Send + Sync {
564    async fn ask(&self, query: RuntimeQuery) -> Result<RuntimeQueryResult, CamelError>;
565}
566
567pub trait RuntimeHandle: RuntimeCommandBus + RuntimeQueryBus {}
568
569impl<T> RuntimeHandle for T where T: RuntimeCommandBus + RuntimeQueryBus {}
570
571#[cfg(test)]
572mod tests {
573    use super::*;
574    use async_trait::async_trait;
575    use futures::executor::block_on;
576
577    struct NoopRuntime;
578
579    #[async_trait]
580    impl RuntimeCommandBus for NoopRuntime {
581        async fn execute(&self, cmd: RuntimeCommand) -> Result<RuntimeCommandResult, CamelError> {
582            Ok(match cmd {
583                RuntimeCommand::RegisterRoute { spec, .. } => {
584                    RuntimeCommandResult::RouteRegistered {
585                        route_id: spec.route_id,
586                    }
587                }
588                RuntimeCommand::StartRoute { route_id, .. }
589                | RuntimeCommand::StopRoute { route_id, .. }
590                | RuntimeCommand::SuspendRoute { route_id, .. }
591                | RuntimeCommand::ResumeRoute { route_id, .. }
592                | RuntimeCommand::ReloadRoute { route_id, .. }
593                | RuntimeCommand::FailRoute { route_id, .. }
594                | RuntimeCommand::RemoveRoute { route_id, .. } => {
595                    RuntimeCommandResult::RouteStateChanged {
596                        route_id,
597                        status: "ok".to_string(),
598                    }
599                }
600            })
601        }
602    }
603
604    #[async_trait]
605    impl RuntimeQueryBus for NoopRuntime {
606        async fn ask(&self, query: RuntimeQuery) -> Result<RuntimeQueryResult, CamelError> {
607            Ok(match query {
608                RuntimeQuery::GetRouteStatus { route_id } => RuntimeQueryResult::RouteStatus {
609                    route_id,
610                    status: "Started".to_string(),
611                },
612                RuntimeQuery::InFlightCount { route_id } => {
613                    RuntimeQueryResult::InFlightCount { route_id, count: 0 }
614                }
615                RuntimeQuery::ListRoutes => RuntimeQueryResult::Routes {
616                    route_ids: vec!["r1".to_string()],
617                },
618            })
619        }
620    }
621
622    #[test]
623    fn command_and_query_ids_are_exposed() {
624        let cmd = RuntimeCommand::StartRoute {
625            route_id: "r1".into(),
626            command_id: "c1".into(),
627            causation_id: None,
628        };
629        assert_eq!(cmd.command_id(), "c1");
630    }
631
632    #[test]
633    fn canonical_spec_requires_route_id_and_from() {
634        let spec = CanonicalRouteSpec::new("r1", "timer:tick");
635        assert_eq!(spec.route_id, "r1");
636        assert_eq!(spec.from, "timer:tick");
637        assert_eq!(spec.version, CANONICAL_CONTRACT_VERSION);
638        assert!(spec.steps.is_empty());
639        assert!(spec.circuit_breaker.is_none());
640    }
641
642    #[test]
643    fn canonical_contract_rejects_invalid_version() {
644        let mut spec = CanonicalRouteSpec::new("r1", "timer:tick");
645        spec.version = 3;
646        let err = spec.validate_contract().unwrap_err().to_string();
647        assert!(err.contains("expected version"));
648    }
649
650    #[test]
651    fn canonical_contract_declares_subset_scope() {
652        assert!(canonical_contract_supports_step("to"));
653        assert!(canonical_contract_supports_step("split"));
654        assert!(!canonical_contract_supports_step("set_header"));
655        assert!(!canonical_contract_supports_step("set_property"));
656
657        assert!(CANONICAL_CONTRACT_DECLARATIVE_ONLY_STEPS.contains(&"split"));
658        assert!(CANONICAL_CONTRACT_EXCLUDED_DECLARATIVE_STEPS.contains(&"set_header"));
659        assert!(CANONICAL_CONTRACT_EXCLUDED_DECLARATIVE_STEPS.contains(&"set_property"));
660        assert!(CANONICAL_CONTRACT_RUST_ONLY_STEPS.contains(&"processor"));
661    }
662
663    #[test]
664    fn canonical_contract_rejection_reason_is_explicit() {
665        let set_header_reason = canonical_contract_rejection_reason("set_header")
666            .expect("set_header should have explicit reason");
667        assert!(set_header_reason.contains("out-of-scope"));
668
669        let set_property_reason = canonical_contract_rejection_reason("set_property")
670            .expect("set_property should have explicit reason");
671        assert!(set_property_reason.contains("out-of-scope"));
672
673        let processor_reason = canonical_contract_rejection_reason("processor")
674            .expect("processor should be rust-only");
675        assert!(processor_reason.contains("rust-only"));
676
677        let split_reason = canonical_contract_rejection_reason("split")
678            .expect("split should require declarative form");
679        assert!(split_reason.contains("declarative"));
680    }
681
682    #[test]
683    fn command_causation_id_is_exposed() {
684        let cmd = RuntimeCommand::StopRoute {
685            route_id: "r1".into(),
686            command_id: "c2".into(),
687            causation_id: Some("c1".into()),
688        };
689        assert_eq!(cmd.command_id(), "c2");
690        assert_eq!(cmd.causation_id(), Some("c1"));
691    }
692
693    #[test]
694    fn canonical_contract_rejects_empty_route_id_and_from() {
695        let spec = CanonicalRouteSpec::new("   ", "timer:tick");
696        let err = spec.validate_contract().unwrap_err().to_string();
697        assert!(err.contains("route_id cannot be empty"));
698
699        let spec = CanonicalRouteSpec::new("r1", "  ");
700        let err = spec.validate_contract().unwrap_err().to_string();
701        assert!(err.contains("from cannot be empty"));
702    }
703
704    #[test]
705    fn canonical_contract_rejects_invalid_nested_steps() {
706        let mut spec = CanonicalRouteSpec::new("r1", "timer:tick");
707        spec.steps = vec![CanonicalStepSpec::Split {
708            expression: CanonicalSplitExpressionSpec::BodyLines,
709            aggregation: CanonicalSplitAggregationSpec::CollectAll,
710            parallel: true,
711            parallel_limit: Some(0),
712            stop_on_exception: false,
713            steps: vec![CanonicalStepSpec::To {
714                uri: "log:ok".to_string(),
715            }],
716        }];
717        let err = spec.validate_contract().unwrap_err().to_string();
718        assert!(err.contains("split.parallel_limit must be > 0"));
719
720        spec.steps = vec![CanonicalStepSpec::To {
721            uri: "   ".to_string(),
722        }];
723        let err = spec.validate_contract().unwrap_err().to_string();
724        assert!(err.contains("endpoint uri cannot be empty"));
725    }
726
727    #[test]
728    fn canonical_contract_rejects_invalid_aggregate_and_circuit_breaker() {
729        let mut spec = CanonicalRouteSpec::new("r1", "timer:tick");
730        spec.steps = vec![CanonicalStepSpec::Aggregate(CanonicalAggregateSpec {
731            header: " ".to_string(),
732            completion_size: Some(1),
733            completion_timeout_ms: None,
734            correlation_key: None,
735            force_completion_on_stop: None,
736            discard_on_timeout: None,
737            strategy: CanonicalAggregateStrategySpec::CollectAll,
738            max_buckets: None,
739            bucket_ttl_ms: None,
740        })];
741        let err = spec.validate_contract().unwrap_err().to_string();
742        assert!(err.contains("aggregate.header cannot be empty"));
743
744        spec.steps = vec![CanonicalStepSpec::Aggregate(CanonicalAggregateSpec {
745            header: "k".to_string(),
746            completion_size: Some(0),
747            completion_timeout_ms: None,
748            correlation_key: None,
749            force_completion_on_stop: None,
750            discard_on_timeout: None,
751            strategy: CanonicalAggregateStrategySpec::CollectAll,
752            max_buckets: None,
753            bucket_ttl_ms: None,
754        })];
755        let err = spec.validate_contract().unwrap_err().to_string();
756        assert!(err.contains("aggregate.completion_size must be > 0"));
757
758        spec.steps = vec![];
759        spec.circuit_breaker = Some(CanonicalCircuitBreakerSpec {
760            failure_threshold: 0,
761            open_duration_ms: 10,
762        });
763        let err = spec.validate_contract().unwrap_err().to_string();
764        assert!(err.contains("failure_threshold must be > 0"));
765
766        spec.circuit_breaker = Some(CanonicalCircuitBreakerSpec {
767            failure_threshold: 1,
768            open_duration_ms: 0,
769        });
770        let err = spec.validate_contract().unwrap_err().to_string();
771        assert!(err.contains("open_duration_ms must be > 0"));
772    }
773
774    #[test]
775    fn canonical_contract_rejection_reason_none_for_regular_steps() {
776        assert!(canonical_contract_rejection_reason("to").is_none());
777        assert!(canonical_contract_rejection_reason("unknown-step").is_none());
778    }
779
780    #[test]
781    fn command_helpers_cover_all_variants() {
782        let spec = CanonicalRouteSpec::new("r1", "timer:tick");
783        let cmds = [
784            RuntimeCommand::RegisterRoute {
785                spec,
786                command_id: "c1".into(),
787                causation_id: Some("root".into()),
788            },
789            RuntimeCommand::StartRoute {
790                route_id: "r1".into(),
791                command_id: "c2".into(),
792                causation_id: None,
793            },
794            RuntimeCommand::StopRoute {
795                route_id: "r1".into(),
796                command_id: "c3".into(),
797                causation_id: None,
798            },
799            RuntimeCommand::SuspendRoute {
800                route_id: "r1".into(),
801                command_id: "c4".into(),
802                causation_id: None,
803            },
804            RuntimeCommand::ResumeRoute {
805                route_id: "r1".into(),
806                command_id: "c5".into(),
807                causation_id: None,
808            },
809            RuntimeCommand::ReloadRoute {
810                route_id: "r1".into(),
811                command_id: "c6".into(),
812                causation_id: None,
813            },
814            RuntimeCommand::FailRoute {
815                route_id: "r1".into(),
816                error: "boom".into(),
817                command_id: "c7".into(),
818                causation_id: None,
819            },
820            RuntimeCommand::RemoveRoute {
821                route_id: "r1".into(),
822                command_id: "c8".into(),
823                causation_id: None,
824            },
825        ];
826
827        let ids: Vec<&str> = cmds.iter().map(RuntimeCommand::command_id).collect();
828        assert_eq!(ids, vec!["c1", "c2", "c3", "c4", "c5", "c6", "c7", "c8"]);
829        assert_eq!(cmds[0].causation_id(), Some("root"));
830        assert_eq!(cmds[1].causation_id(), None);
831    }
832
833    #[test]
834    fn canonical_route_spec_serde_roundtrip() {
835        let mut spec = CanonicalRouteSpec::new("test-route", "timer:tick?period=1000");
836        spec.steps.push(CanonicalStepSpec::Log {
837            message: "Hello".into(),
838        });
839        spec.steps.push(CanonicalStepSpec::To {
840            uri: "log:info".into(),
841        });
842        spec.steps.push(CanonicalStepSpec::Stop);
843
844        let json = serde_json::to_string(&spec).unwrap();
845        let deserialized: CanonicalRouteSpec = serde_json::from_str(&json).unwrap();
846        assert_eq!(spec, deserialized);
847    }
848
849    #[test]
850    fn canonical_step_spec_serde_variants() {
851        let steps = vec![
852            CanonicalStepSpec::To {
853                uri: "direct:a".into(),
854            },
855            CanonicalStepSpec::Log {
856                message: "msg".into(),
857            },
858            CanonicalStepSpec::WireTap {
859                uri: "direct:audit".into(),
860            },
861            CanonicalStepSpec::Stop,
862            CanonicalStepSpec::Delay {
863                delay_ms: 100,
864                dynamic_header: None,
865            },
866        ];
867        let json = serde_json::to_string_pretty(&steps).unwrap();
868        let back: Vec<CanonicalStepSpec> = serde_json::from_str(&json).unwrap();
869        assert_eq!(steps, back);
870    }
871
872    #[test]
873    fn canonical_route_spec_json_schema_generates() {
874        let schema = schemars::schema_for!(CanonicalRouteSpec);
875        let json = serde_json::to_string(&schema).unwrap();
876        assert!(json.contains("CanonicalRouteSpec"));
877        assert!(json.contains("route_id"));
878    }
879
880    #[test]
881    fn canonical_json_schema_has_no_function_step() {
882        let schema = schemars::schema_for!(CanonicalRouteSpec);
883        let json = serde_json::to_string(&schema).unwrap();
884        assert!(
885            !json.contains("\"function\""),
886            "canonical JSON schema must not contain 'function' step"
887        );
888    }
889
890    #[test]
891    fn canonical_contract_does_not_support_function() {
892        assert!(
893            !canonical_contract_supports_step("function"),
894            "function must not be in CANONICAL_CONTRACT_SUPPORTED_STEPS"
895        );
896    }
897
898    #[test]
899    fn runtime_command_result_all_variants_are_distinct() {
900        let accepted = RuntimeCommandResult::Accepted;
901        let dup = RuntimeCommandResult::Duplicate {
902            command_id: "c1".into(),
903        };
904        let registered = RuntimeCommandResult::RouteRegistered {
905            route_id: "r1".into(),
906        };
907        let changed = RuntimeCommandResult::RouteStateChanged {
908            route_id: "r1".into(),
909            status: "Started".into(),
910        };
911
912        assert_ne!(accepted, dup);
913        assert_ne!(dup, registered);
914        assert_ne!(registered, changed);
915
916        let dup2 = RuntimeCommandResult::Duplicate {
917            command_id: "c1".into(),
918        };
919        assert_eq!(dup, dup2);
920    }
921
922    #[test]
923    fn runtime_event_serialization_round_trip() {
924        let event = RuntimeEvent::RouteFailed {
925            route_id: "route-a".to_string(),
926            error: "boom".to_string(),
927        };
928        let json = serde_json::to_string(&event).unwrap();
929        let back: RuntimeEvent = serde_json::from_str(&json).unwrap();
930        assert_eq!(event, back);
931    }
932
933    #[test]
934    fn noop_runtime_execute_and_ask_return_expected_shapes() {
935        let rt = NoopRuntime;
936        let cmd = RuntimeCommand::RegisterRoute {
937            spec: CanonicalRouteSpec::new("r2", "timer:tick"),
938            command_id: "c1".into(),
939            causation_id: None,
940        };
941        let cmd_result = block_on(rt.execute(cmd)).unwrap();
942        assert_eq!(
943            cmd_result,
944            RuntimeCommandResult::RouteRegistered {
945                route_id: "r2".into()
946            }
947        );
948
949        let query_result = block_on(rt.ask(RuntimeQuery::GetRouteStatus {
950            route_id: "r2".into(),
951        }))
952        .unwrap();
953        assert_eq!(
954            query_result,
955            RuntimeQueryResult::RouteStatus {
956                route_id: "r2".into(),
957                status: "Started".into()
958            }
959        );
960    }
961
962    #[test]
963    fn canonical_contract_name_and_version_constants_match() {
964        assert_eq!(CANONICAL_CONTRACT_NAME, "canonical-v1");
965        assert_eq!(CANONICAL_CONTRACT_VERSION, 2);
966    }
967
968    #[test]
969    fn canonical_concurrency_spec_rejects_zero_max() {
970        let spec = CanonicalRouteSpec::new("r1", "timer:tick")
971            .with_concurrency(CanonicalConcurrencySpec::Concurrent { max: 0 });
972        let err = spec.validate_contract().unwrap_err().to_string();
973        assert!(err.contains("concurrency max must be > 0"), "{err}");
974    }
975
976    #[test]
977    fn canonical_v2_round_trip() {
978        let spec = CanonicalRouteSpec::new("r1", "timer:tick")
979            .with_auto_startup(false)
980            .with_startup_order(42)
981            .with_concurrency(CanonicalConcurrencySpec::Concurrent { max: 8 });
982        spec.validate_contract().unwrap();
983    }
984
985    #[test]
986    fn canonical_v2_version_is_2() {
987        assert_eq!(CANONICAL_CONTRACT_VERSION, 2);
988    }
989
990    #[test]
991    fn canonical_loss_report_builder() {
992        let report =
993            CanonicalLossReport::from_field("error_handler", "not supported by canonical path", 2);
994        assert_eq!(report.dropped_fields.len(), 1);
995        assert_eq!(report.dropped_fields[0].field, "error_handler");
996    }
997
998    #[test]
999    fn canonical_v1_json_deserializes_in_v2() {
1000        let json = r#"{"route_id":"r1","from":"timer:tick","steps":[],"version":1}"#;
1001        let spec: CanonicalRouteSpec = serde_json::from_str(json).unwrap();
1002        assert_eq!(spec.route_id, "r1");
1003        assert!(spec.auto_startup.is_none());
1004        assert!(spec.startup_order.is_none());
1005        assert!(spec.concurrency.is_none());
1006        // CRITICAL: v1 specs must pass validation in v2 runtime (backward compat)
1007        spec.validate_contract().unwrap();
1008    }
1009}