Skip to main content

camel_api/
runtime.rs

1use async_trait::async_trait;
2use serde::{Deserialize, Serialize};
3
4use crate::CamelError;
5use crate::declarative::LanguageExpressionDef;
6
/// Name identifying the canonical contract.
pub const CANONICAL_CONTRACT_NAME: &str = "canonical-v1";
/// Current contract version. `CanonicalRouteSpec::new` stamps it on new specs and
/// `CanonicalRouteSpec::validate_contract` rejects a version of 0 or anything above it.
pub const CANONICAL_CONTRACT_VERSION: u32 = 2;
/// Step names accepted by the canonical contract: one entry per `CanonicalStepSpec`
/// variant, spelled in its snake_case serialized form.
pub const CANONICAL_CONTRACT_SUPPORTED_STEPS: &[&str] = &[
    "to",
    "log",
    "wire_tap",
    "script",
    "filter",
    "choice",
    "split",
    "aggregate",
    "stop",
    "delay",
];
/// Supported steps for which `canonical_contract_rejection_reason` still returns a note:
/// they are canonical only in their declarative/serializable expression form.
pub const CANONICAL_CONTRACT_DECLARATIVE_ONLY_STEPS: &[&str] =
    &["script", "filter", "choice", "split"];
/// Declarative step names that are reported as out-of-scope for the canonical contract.
pub const CANONICAL_CONTRACT_EXCLUDED_DECLARATIVE_STEPS: &[&str] = &[
    "set_header",
    "set_property",
    "set_body",
    "multicast",
    "convert_body_to",
    "bean",
    "marshal",
    "unmarshal",
];
/// Step names reported as rust-only programmable steps, i.e. not representable in the
/// canonical contract.
pub const CANONICAL_CONTRACT_RUST_ONLY_STEPS: &[&str] = &[
    "processor",
    "process",
    "process_fn",
    "map_body",
    "set_body_fn",
    "set_header_fn",
];
41
42pub fn canonical_contract_supports_step(step: &str) -> bool {
43    CANONICAL_CONTRACT_SUPPORTED_STEPS.contains(&step)
44}
45
46pub fn canonical_contract_rejection_reason(step: &str) -> Option<&'static str> {
47    if CANONICAL_CONTRACT_EXCLUDED_DECLARATIVE_STEPS.contains(&step) {
48        return Some(
49            "declared out-of-scope for canonical v1; use declarative route compilation path outside CQRS canonical commands",
50        );
51    }
52
53    if CANONICAL_CONTRACT_RUST_ONLY_STEPS.contains(&step) {
54        return Some("rust-only programmable step; not representable in canonical v1 contract");
55    }
56
57    if canonical_contract_supports_step(step)
58        && CANONICAL_CONTRACT_DECLARATIVE_ONLY_STEPS.contains(&step)
59    {
60        return Some(
61            "supported only as declarative/serializable expression form; closure/processor variants are outside canonical v1",
62        );
63    }
64
65    None
66}
67
68#[derive(
69    Debug,
70    Clone,
71    PartialEq,
72    Eq,
73    serde::Serialize,
74    serde::Deserialize,
75    schemars::JsonSchema,
76    ts_rs::TS,
77)]
78#[serde(rename_all = "snake_case")]
79#[ts(rename_all = "snake_case")]
80pub struct CanonicalRouteSpec {
81    /// Stable minimal route representation for runtime command registration.
82    ///
83    /// Scope notes:
84    /// - This is intentionally a partial model and does not mirror every `BuilderStep`.
85    /// - Version 2 adds: auto_startup, startup_order, concurrency.
86    /// - Still excluded: error_handler, unit_of_work. These are set to defaults
87    ///   when compiling from canonical.
88    /// - Round-trip (YAML → Canonical → YAML) loses these fields.
89    /// - Advanced EIPs continue to use the existing RouteDefinition/BuilderStep path.
90    pub route_id: String,
91    pub from: String,
92    pub steps: Vec<CanonicalStepSpec>,
93    pub circuit_breaker: Option<CanonicalCircuitBreakerSpec>,
94    pub auto_startup: Option<bool>,
95    pub startup_order: Option<i32>,
96    pub concurrency: Option<CanonicalConcurrencySpec>,
97    pub version: u32,
98}
99
100#[derive(
101    Debug,
102    Clone,
103    PartialEq,
104    Eq,
105    serde::Serialize,
106    serde::Deserialize,
107    schemars::JsonSchema,
108    ts_rs::TS,
109)]
110#[serde(tag = "step", content = "config", rename_all = "snake_case")]
111#[ts(rename_all = "snake_case")]
112pub enum CanonicalStepSpec {
113    To {
114        uri: String,
115    },
116    Log {
117        message: String,
118    },
119    WireTap {
120        uri: String,
121    },
122    Script {
123        expression: LanguageExpressionDef,
124    },
125    Filter {
126        predicate: LanguageExpressionDef,
127        steps: Vec<CanonicalStepSpec>,
128    },
129    Choice {
130        whens: Vec<CanonicalWhenSpec>,
131        otherwise: Option<Vec<CanonicalStepSpec>>,
132    },
133    Split {
134        expression: CanonicalSplitExpressionSpec,
135        aggregation: CanonicalSplitAggregationSpec,
136        parallel: bool,
137        parallel_limit: Option<usize>,
138        stop_on_exception: bool,
139        steps: Vec<CanonicalStepSpec>,
140    },
141    Aggregate(CanonicalAggregateSpec),
142    Stop,
143    Delay {
144        #[ts(type = "number")]
145        delay_ms: u64,
146        dynamic_header: Option<String>,
147    },
148}
149
150#[derive(
151    Debug,
152    Clone,
153    PartialEq,
154    Eq,
155    serde::Serialize,
156    serde::Deserialize,
157    schemars::JsonSchema,
158    ts_rs::TS,
159)]
160#[serde(rename_all = "snake_case")]
161#[ts(rename_all = "snake_case")]
162pub struct CanonicalWhenSpec {
163    pub predicate: LanguageExpressionDef,
164    pub steps: Vec<CanonicalStepSpec>,
165}
166
167#[derive(
168    Debug,
169    Clone,
170    PartialEq,
171    Eq,
172    serde::Serialize,
173    serde::Deserialize,
174    schemars::JsonSchema,
175    ts_rs::TS,
176)]
177#[serde(rename_all = "snake_case")]
178#[ts(rename_all = "snake_case")]
179pub enum CanonicalSplitExpressionSpec {
180    BodyLines,
181    BodyJsonArray,
182    Language(LanguageExpressionDef),
183}
184
185#[derive(
186    Debug,
187    Clone,
188    PartialEq,
189    Eq,
190    serde::Serialize,
191    serde::Deserialize,
192    schemars::JsonSchema,
193    ts_rs::TS,
194)]
195#[serde(rename_all = "snake_case")]
196#[ts(rename_all = "snake_case")]
197pub enum CanonicalSplitAggregationSpec {
198    LastWins,
199    CollectAll,
200    Original,
201}
202
203#[derive(
204    Debug,
205    Clone,
206    PartialEq,
207    Eq,
208    serde::Serialize,
209    serde::Deserialize,
210    schemars::JsonSchema,
211    ts_rs::TS,
212)]
213#[serde(rename_all = "snake_case")]
214#[ts(rename_all = "snake_case")]
215pub enum CanonicalAggregateStrategySpec {
216    CollectAll,
217}
218
219#[derive(
220    Debug,
221    Clone,
222    PartialEq,
223    Eq,
224    serde::Serialize,
225    serde::Deserialize,
226    schemars::JsonSchema,
227    ts_rs::TS,
228)]
229#[serde(rename_all = "snake_case")]
230#[ts(rename_all = "snake_case")]
231pub struct CanonicalAggregateSpec {
232    pub header: String,
233    pub completion_size: Option<usize>,
234    #[ts(type = "number")]
235    pub completion_timeout_ms: Option<u64>,
236    pub correlation_key: Option<String>,
237    pub force_completion_on_stop: Option<bool>,
238    pub discard_on_timeout: Option<bool>,
239    pub strategy: CanonicalAggregateStrategySpec,
240    pub max_buckets: Option<usize>,
241    #[ts(type = "number")]
242    pub bucket_ttl_ms: Option<u64>,
243}
244
245#[derive(
246    Debug,
247    Clone,
248    PartialEq,
249    Eq,
250    serde::Serialize,
251    serde::Deserialize,
252    schemars::JsonSchema,
253    ts_rs::TS,
254)]
255#[serde(rename_all = "snake_case")]
256#[ts(rename_all = "snake_case")]
257pub struct CanonicalCircuitBreakerSpec {
258    pub failure_threshold: u32,
259    #[ts(type = "number")]
260    pub open_duration_ms: u64,
261}
262
263#[derive(
264    Debug,
265    Clone,
266    PartialEq,
267    Eq,
268    serde::Serialize,
269    serde::Deserialize,
270    schemars::JsonSchema,
271    ts_rs::TS,
272)]
273#[serde(tag = "mode", rename_all = "snake_case")]
274pub enum CanonicalConcurrencySpec {
275    Sequential,
276    Concurrent { max: usize },
277}
278
279impl CanonicalRouteSpec {
280    pub fn new(route_id: impl Into<String>, from: impl Into<String>) -> Self {
281        Self {
282            route_id: route_id.into(),
283            from: from.into(),
284            steps: Vec::new(),
285            circuit_breaker: None,
286            auto_startup: None,
287            startup_order: None,
288            concurrency: None,
289            version: CANONICAL_CONTRACT_VERSION,
290        }
291    }
292
293    pub fn with_auto_startup(mut self, auto: bool) -> Self {
294        self.auto_startup = Some(auto);
295        self
296    }
297
298    pub fn with_startup_order(mut self, order: i32) -> Self {
299        self.startup_order = Some(order);
300        self
301    }
302
303    pub fn with_concurrency(mut self, concurrency: CanonicalConcurrencySpec) -> Self {
304        self.concurrency = Some(concurrency);
305        self
306    }
307
308    pub fn validate_contract(&self) -> Result<(), CamelError> {
309        if self.route_id.trim().is_empty() {
310            return Err(CamelError::RouteError(
311                "canonical contract violation: route_id cannot be empty".to_string(),
312            ));
313        }
314        if self.from.trim().is_empty() {
315            return Err(CamelError::RouteError(
316                "canonical contract violation: from cannot be empty".to_string(),
317            ));
318        }
319        if self.version == 0 || self.version > CANONICAL_CONTRACT_VERSION {
320            return Err(CamelError::RouteError(format!(
321                "canonical contract violation: expected version {}, got {}",
322                CANONICAL_CONTRACT_VERSION, self.version
323            )));
324        }
325        validate_steps(&self.steps)?;
326        if let Some(cb) = &self.circuit_breaker {
327            if cb.failure_threshold == 0 {
328                return Err(CamelError::RouteError(
329                    "canonical contract violation: circuit_breaker.failure_threshold must be > 0"
330                        .to_string(),
331                ));
332            }
333            if cb.open_duration_ms == 0 {
334                return Err(CamelError::RouteError(
335                    "canonical contract violation: circuit_breaker.open_duration_ms must be > 0"
336                        .to_string(),
337                ));
338            }
339        }
340        if let Some(CanonicalConcurrencySpec::Concurrent { max: 0 }) = &self.concurrency {
341            return Err(CamelError::RouteError(
342                "canonical contract violation: concurrency max must be > 0".to_string(),
343            ));
344        }
345        Ok(())
346    }
347}
348
349#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize)]
350pub struct CanonicalFieldLoss {
351    pub field: &'static str,
352    pub reason: String,
353    pub target_version: u32,
354}
355
356#[derive(Debug, Clone, PartialEq, Eq, Default, serde::Serialize)]
357pub struct CanonicalLossReport {
358    pub dropped_fields: Vec<CanonicalFieldLoss>,
359}
360
361impl CanonicalLossReport {
362    pub fn from_field(field: &'static str, reason: &str, target_version: u32) -> Self {
363        Self {
364            dropped_fields: vec![CanonicalFieldLoss {
365                field,
366                reason: reason.to_string(),
367                target_version,
368            }],
369        }
370    }
371
372    pub fn is_empty(&self) -> bool {
373        self.dropped_fields.is_empty()
374    }
375}
376
377fn validate_steps(steps: &[CanonicalStepSpec]) -> Result<(), CamelError> {
378    for step in steps {
379        match step {
380            CanonicalStepSpec::To { uri } | CanonicalStepSpec::WireTap { uri } => {
381                if uri.trim().is_empty() {
382                    return Err(CamelError::RouteError(
383                        "canonical contract violation: endpoint uri cannot be empty".to_string(),
384                    ));
385                }
386            }
387            CanonicalStepSpec::Filter { steps, .. } => validate_steps(steps)?,
388            CanonicalStepSpec::Choice { whens, otherwise } => {
389                for when in whens {
390                    validate_steps(&when.steps)?;
391                }
392                if let Some(otherwise) = otherwise {
393                    validate_steps(otherwise)?;
394                }
395            }
396            CanonicalStepSpec::Split {
397                parallel_limit,
398                steps,
399                ..
400            } => {
401                if let Some(limit) = parallel_limit
402                    && *limit == 0
403                {
404                    return Err(CamelError::RouteError(
405                        "canonical contract violation: split.parallel_limit must be > 0"
406                            .to_string(),
407                    ));
408                }
409                validate_steps(steps)?;
410            }
411            CanonicalStepSpec::Aggregate(config) => {
412                if config.header.trim().is_empty() {
413                    return Err(CamelError::RouteError(
414                        "canonical contract violation: aggregate.header cannot be empty"
415                            .to_string(),
416                    ));
417                }
418                if let Some(size) = config.completion_size
419                    && size == 0
420                {
421                    return Err(CamelError::RouteError(
422                        "canonical contract violation: aggregate.completion_size must be > 0"
423                            .to_string(),
424                    ));
425                }
426            }
427            CanonicalStepSpec::Log { .. }
428            | CanonicalStepSpec::Script { .. }
429            | CanonicalStepSpec::Stop
430            | CanonicalStepSpec::Delay { .. } => {}
431        }
432    }
433    Ok(())
434}
435
436#[derive(Debug, Clone, PartialEq, Eq)]
437pub enum RuntimeCommand {
438    RegisterRoute {
439        spec: CanonicalRouteSpec,
440        command_id: String,
441        causation_id: Option<String>,
442    },
443    StartRoute {
444        route_id: String,
445        command_id: String,
446        causation_id: Option<String>,
447    },
448    StopRoute {
449        route_id: String,
450        command_id: String,
451        causation_id: Option<String>,
452    },
453    SuspendRoute {
454        route_id: String,
455        command_id: String,
456        causation_id: Option<String>,
457    },
458    ResumeRoute {
459        route_id: String,
460        command_id: String,
461        causation_id: Option<String>,
462    },
463    ReloadRoute {
464        route_id: String,
465        command_id: String,
466        causation_id: Option<String>,
467    },
468    /// Internal lifecycle command emitted by runtime adapters when a route crashes at runtime.
469    ///
470    /// This keeps aggregate/projection state aligned with controller-observed failures.
471    FailRoute {
472        route_id: String,
473        error: String,
474        command_id: String,
475        causation_id: Option<String>,
476    },
477    RemoveRoute {
478        route_id: String,
479        command_id: String,
480        causation_id: Option<String>,
481    },
482}
483
484impl RuntimeCommand {
485    pub fn command_id(&self) -> &str {
486        match self {
487            RuntimeCommand::RegisterRoute { command_id, .. }
488            | RuntimeCommand::StartRoute { command_id, .. }
489            | RuntimeCommand::StopRoute { command_id, .. }
490            | RuntimeCommand::SuspendRoute { command_id, .. }
491            | RuntimeCommand::ResumeRoute { command_id, .. }
492            | RuntimeCommand::ReloadRoute { command_id, .. }
493            | RuntimeCommand::FailRoute { command_id, .. }
494            | RuntimeCommand::RemoveRoute { command_id, .. } => command_id,
495        }
496    }
497
498    pub fn causation_id(&self) -> Option<&str> {
499        match self {
500            RuntimeCommand::RegisterRoute { causation_id, .. }
501            | RuntimeCommand::StartRoute { causation_id, .. }
502            | RuntimeCommand::StopRoute { causation_id, .. }
503            | RuntimeCommand::SuspendRoute { causation_id, .. }
504            | RuntimeCommand::ResumeRoute { causation_id, .. }
505            | RuntimeCommand::ReloadRoute { causation_id, .. }
506            | RuntimeCommand::FailRoute { causation_id, .. }
507            | RuntimeCommand::RemoveRoute { causation_id, .. } => causation_id.as_deref(),
508        }
509    }
510}
511
/// Outcome returned by [`RuntimeCommandBus::execute`].
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum RuntimeCommandResult {
    /// The command was accepted.
    Accepted,
    /// Result variant identifying a duplicate command by its `command_id`.
    Duplicate { command_id: String },
    /// The route `route_id` was registered.
    RouteRegistered { route_id: String },
    /// The route `route_id` changed state; `status` is a free-form status string.
    RouteStateChanged { route_id: String, status: String },
}
519
/// Queries accepted by a [`RuntimeQueryBus`].
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum RuntimeQuery {
    /// Asks for the status of the route `route_id`.
    GetRouteStatus { route_id: String },
    /// Asks for the in-flight count of the route `route_id`.
    ///
    /// **Note:** This variant is intercepted by `RuntimeBus::ask` *before* reaching
    /// `execute_query`. Do not handle it in `execute_query` — it has no access to
    /// the in-flight counter. See `runtime_bus.rs` for the intercept.
    InFlightCount { route_id: String },
    /// Asks for the list of routes.
    ListRoutes,
}
533
/// Answer returned by [`RuntimeQueryBus::ask`].
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum RuntimeQueryResult {
    /// In-flight count of the route `route_id`.
    InFlightCount { route_id: String, count: u64 },
    /// Result variant naming a route that was not found.
    RouteNotFound { route_id: String },
    /// Status of the route `route_id` as a free-form status string.
    RouteStatus { route_id: String, status: String },
    /// Identifiers of the routes.
    Routes { route_ids: Vec<String> },
}
541
542#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
543pub enum RuntimeEvent {
544    RouteRegistered { route_id: String },
545    RouteStartRequested { route_id: String },
546    RouteStarted { route_id: String },
547    RouteFailed { route_id: String, error: String },
548    RouteStopped { route_id: String },
549    RouteSuspended { route_id: String },
550    RouteResumed { route_id: String },
551    RouteReloaded { route_id: String },
552    RouteRemoved { route_id: String },
553}
554
/// Command side of the runtime: takes a [`RuntimeCommand`] and reports its outcome.
///
/// Implementors must be `Send + Sync`.
#[async_trait]
pub trait RuntimeCommandBus: Send + Sync {
    /// Executes `cmd`, yielding a [`RuntimeCommandResult`] on success and a
    /// `CamelError` otherwise.
    async fn execute(&self, cmd: RuntimeCommand) -> Result<RuntimeCommandResult, CamelError>;
}
559
/// Query side of the runtime: answers a [`RuntimeQuery`].
///
/// Implementors must be `Send + Sync`.
#[async_trait]
pub trait RuntimeQueryBus: Send + Sync {
    /// Answers `query`, yielding a [`RuntimeQueryResult`] on success and a
    /// `CamelError` otherwise.
    async fn ask(&self, query: RuntimeQuery) -> Result<RuntimeQueryResult, CamelError>;
}
564
565pub trait RuntimeHandle: RuntimeCommandBus + RuntimeQueryBus {}
566
567impl<T> RuntimeHandle for T where T: RuntimeCommandBus + RuntimeQueryBus {}
568
#[cfg(test)]
mod tests {
    //! Unit tests for the canonical contract helpers, the canonical route spec and its
    //! validation, and the runtime command/query/event types.

    use super::*;
    use async_trait::async_trait;
    use futures::executor::block_on;

    /// Minimal runtime that echoes route ids back; used to exercise the bus traits.
    struct NoopRuntime;

    #[async_trait]
    impl RuntimeCommandBus for NoopRuntime {
        async fn execute(&self, cmd: RuntimeCommand) -> Result<RuntimeCommandResult, CamelError> {
            Ok(match cmd {
                RuntimeCommand::RegisterRoute { spec, .. } => {
                    RuntimeCommandResult::RouteRegistered {
                        route_id: spec.route_id,
                    }
                }
                RuntimeCommand::StartRoute { route_id, .. }
                | RuntimeCommand::StopRoute { route_id, .. }
                | RuntimeCommand::SuspendRoute { route_id, .. }
                | RuntimeCommand::ResumeRoute { route_id, .. }
                | RuntimeCommand::ReloadRoute { route_id, .. }
                | RuntimeCommand::FailRoute { route_id, .. }
                | RuntimeCommand::RemoveRoute { route_id, .. } => {
                    RuntimeCommandResult::RouteStateChanged {
                        route_id,
                        status: "ok".to_string(),
                    }
                }
            })
        }
    }

    #[async_trait]
    impl RuntimeQueryBus for NoopRuntime {
        async fn ask(&self, query: RuntimeQuery) -> Result<RuntimeQueryResult, CamelError> {
            Ok(match query {
                RuntimeQuery::GetRouteStatus { route_id } => RuntimeQueryResult::RouteStatus {
                    route_id,
                    status: "Started".to_string(),
                },
                RuntimeQuery::InFlightCount { route_id } => {
                    RuntimeQueryResult::InFlightCount { route_id, count: 0 }
                }
                RuntimeQuery::ListRoutes => RuntimeQueryResult::Routes {
                    route_ids: vec!["r1".to_string()],
                },
            })
        }
    }

    // Only commands carry ids; queries have none to expose.
    #[test]
    fn command_and_query_ids_are_exposed() {
        let cmd = RuntimeCommand::StartRoute {
            route_id: "r1".into(),
            command_id: "c1".into(),
            causation_id: None,
        };
        assert_eq!(cmd.command_id(), "c1");
    }

    #[test]
    fn canonical_spec_requires_route_id_and_from() {
        let spec = CanonicalRouteSpec::new("r1", "timer:tick");
        assert_eq!(spec.route_id, "r1");
        assert_eq!(spec.from, "timer:tick");
        assert_eq!(spec.version, CANONICAL_CONTRACT_VERSION);
        assert!(spec.steps.is_empty());
        assert!(spec.circuit_breaker.is_none());
        // The version-2 settings start out unset as well.
        assert!(spec.auto_startup.is_none());
        assert!(spec.startup_order.is_none());
        assert!(spec.concurrency.is_none());
    }

    #[test]
    fn canonical_contract_rejects_invalid_version() {
        let mut spec = CanonicalRouteSpec::new("r1", "timer:tick");
        spec.version = 3;
        let err = spec.validate_contract().unwrap_err().to_string();
        assert!(err.contains("expected version"));
    }

    #[test]
    fn canonical_contract_declares_subset_scope() {
        assert!(canonical_contract_supports_step("to"));
        assert!(canonical_contract_supports_step("split"));
        assert!(!canonical_contract_supports_step("set_header"));
        assert!(!canonical_contract_supports_step("set_property"));

        assert!(CANONICAL_CONTRACT_DECLARATIVE_ONLY_STEPS.contains(&"split"));
        assert!(CANONICAL_CONTRACT_EXCLUDED_DECLARATIVE_STEPS.contains(&"set_header"));
        assert!(CANONICAL_CONTRACT_EXCLUDED_DECLARATIVE_STEPS.contains(&"set_property"));
        assert!(CANONICAL_CONTRACT_RUST_ONLY_STEPS.contains(&"processor"));
    }

    #[test]
    fn canonical_contract_rejection_reason_is_explicit() {
        let set_header_reason = canonical_contract_rejection_reason("set_header")
            .expect("set_header should have explicit reason");
        assert!(set_header_reason.contains("out-of-scope"));

        let set_property_reason = canonical_contract_rejection_reason("set_property")
            .expect("set_property should have explicit reason");
        assert!(set_property_reason.contains("out-of-scope"));

        let processor_reason = canonical_contract_rejection_reason("processor")
            .expect("processor should be rust-only");
        assert!(processor_reason.contains("rust-only"));

        let split_reason = canonical_contract_rejection_reason("split")
            .expect("split should require declarative form");
        assert!(split_reason.contains("declarative"));
    }

    #[test]
    fn command_causation_id_is_exposed() {
        let cmd = RuntimeCommand::StopRoute {
            route_id: "r1".into(),
            command_id: "c2".into(),
            causation_id: Some("c1".into()),
        };
        assert_eq!(cmd.command_id(), "c2");
        assert_eq!(cmd.causation_id(), Some("c1"));
    }

    #[test]
    fn canonical_contract_rejects_empty_route_id_and_from() {
        let spec = CanonicalRouteSpec::new("   ", "timer:tick");
        let err = spec.validate_contract().unwrap_err().to_string();
        assert!(err.contains("route_id cannot be empty"));

        let spec = CanonicalRouteSpec::new("r1", "  ");
        let err = spec.validate_contract().unwrap_err().to_string();
        assert!(err.contains("from cannot be empty"));
    }

    #[test]
    fn canonical_contract_rejects_invalid_nested_steps() {
        let mut spec = CanonicalRouteSpec::new("r1", "timer:tick");
        spec.steps = vec![CanonicalStepSpec::Split {
            expression: CanonicalSplitExpressionSpec::BodyLines,
            aggregation: CanonicalSplitAggregationSpec::CollectAll,
            parallel: true,
            parallel_limit: Some(0),
            stop_on_exception: false,
            steps: vec![CanonicalStepSpec::To {
                uri: "log:ok".to_string(),
            }],
        }];
        let err = spec.validate_contract().unwrap_err().to_string();
        assert!(err.contains("split.parallel_limit must be > 0"));

        spec.steps = vec![CanonicalStepSpec::To {
            uri: "   ".to_string(),
        }];
        let err = spec.validate_contract().unwrap_err().to_string();
        assert!(err.contains("endpoint uri cannot be empty"));
    }

    #[test]
    fn canonical_contract_rejects_invalid_aggregate_and_circuit_breaker() {
        let mut spec = CanonicalRouteSpec::new("r1", "timer:tick");
        spec.steps = vec![CanonicalStepSpec::Aggregate(CanonicalAggregateSpec {
            header: " ".to_string(),
            completion_size: Some(1),
            completion_timeout_ms: None,
            correlation_key: None,
            force_completion_on_stop: None,
            discard_on_timeout: None,
            strategy: CanonicalAggregateStrategySpec::CollectAll,
            max_buckets: None,
            bucket_ttl_ms: None,
        })];
        let err = spec.validate_contract().unwrap_err().to_string();
        assert!(err.contains("aggregate.header cannot be empty"));

        spec.steps = vec![CanonicalStepSpec::Aggregate(CanonicalAggregateSpec {
            header: "k".to_string(),
            completion_size: Some(0),
            completion_timeout_ms: None,
            correlation_key: None,
            force_completion_on_stop: None,
            discard_on_timeout: None,
            strategy: CanonicalAggregateStrategySpec::CollectAll,
            max_buckets: None,
            bucket_ttl_ms: None,
        })];
        let err = spec.validate_contract().unwrap_err().to_string();
        assert!(err.contains("aggregate.completion_size must be > 0"));

        spec.steps = vec![];
        spec.circuit_breaker = Some(CanonicalCircuitBreakerSpec {
            failure_threshold: 0,
            open_duration_ms: 10,
        });
        let err = spec.validate_contract().unwrap_err().to_string();
        assert!(err.contains("failure_threshold must be > 0"));

        spec.circuit_breaker = Some(CanonicalCircuitBreakerSpec {
            failure_threshold: 1,
            open_duration_ms: 0,
        });
        let err = spec.validate_contract().unwrap_err().to_string();
        assert!(err.contains("open_duration_ms must be > 0"));
    }

    #[test]
    fn canonical_contract_rejection_reason_none_for_regular_steps() {
        assert!(canonical_contract_rejection_reason("to").is_none());
        assert!(canonical_contract_rejection_reason("unknown-step").is_none());
    }

    #[test]
    fn command_helpers_cover_all_variants() {
        let spec = CanonicalRouteSpec::new("r1", "timer:tick");
        let cmds = [
            RuntimeCommand::RegisterRoute {
                spec,
                command_id: "c1".into(),
                causation_id: Some("root".into()),
            },
            RuntimeCommand::StartRoute {
                route_id: "r1".into(),
                command_id: "c2".into(),
                causation_id: None,
            },
            RuntimeCommand::StopRoute {
                route_id: "r1".into(),
                command_id: "c3".into(),
                causation_id: None,
            },
            RuntimeCommand::SuspendRoute {
                route_id: "r1".into(),
                command_id: "c4".into(),
                causation_id: None,
            },
            RuntimeCommand::ResumeRoute {
                route_id: "r1".into(),
                command_id: "c5".into(),
                causation_id: None,
            },
            RuntimeCommand::ReloadRoute {
                route_id: "r1".into(),
                command_id: "c6".into(),
                causation_id: None,
            },
            RuntimeCommand::FailRoute {
                route_id: "r1".into(),
                error: "boom".into(),
                command_id: "c7".into(),
                causation_id: None,
            },
            RuntimeCommand::RemoveRoute {
                route_id: "r1".into(),
                command_id: "c8".into(),
                causation_id: None,
            },
        ];

        let ids: Vec<&str> = cmds.iter().map(RuntimeCommand::command_id).collect();
        assert_eq!(ids, vec!["c1", "c2", "c3", "c4", "c5", "c6", "c7", "c8"]);
        assert_eq!(cmds[0].causation_id(), Some("root"));
        assert_eq!(cmds[1].causation_id(), None);
    }

    #[test]
    fn canonical_route_spec_serde_roundtrip() {
        let mut spec = CanonicalRouteSpec::new("test-route", "timer:tick?period=1000");
        spec.steps.push(CanonicalStepSpec::Log {
            message: "Hello".into(),
        });
        spec.steps.push(CanonicalStepSpec::To {
            uri: "log:info".into(),
        });
        spec.steps.push(CanonicalStepSpec::Stop);

        let json = serde_json::to_string(&spec).unwrap();
        let deserialized: CanonicalRouteSpec = serde_json::from_str(&json).unwrap();
        assert_eq!(spec, deserialized);
    }

    #[test]
    fn canonical_step_spec_serde_variants() {
        let steps = vec![
            CanonicalStepSpec::To {
                uri: "direct:a".into(),
            },
            CanonicalStepSpec::Log {
                message: "msg".into(),
            },
            CanonicalStepSpec::WireTap {
                uri: "direct:audit".into(),
            },
            CanonicalStepSpec::Stop,
            CanonicalStepSpec::Delay {
                delay_ms: 100,
                dynamic_header: None,
            },
        ];
        let json = serde_json::to_string_pretty(&steps).unwrap();
        let back: Vec<CanonicalStepSpec> = serde_json::from_str(&json).unwrap();
        assert_eq!(steps, back);
    }

    #[test]
    fn canonical_route_spec_json_schema_generates() {
        let schema = schemars::schema_for!(CanonicalRouteSpec);
        let json = serde_json::to_string(&schema).unwrap();
        assert!(json.contains("CanonicalRouteSpec"));
        assert!(json.contains("route_id"));
    }

    #[test]
    fn canonical_json_schema_has_no_function_step() {
        let schema = schemars::schema_for!(CanonicalRouteSpec);
        let json = serde_json::to_string(&schema).unwrap();
        assert!(
            !json.contains("\"function\""),
            "canonical JSON schema must not contain 'function' step"
        );
    }

    #[test]
    fn canonical_contract_does_not_support_function() {
        assert!(
            !canonical_contract_supports_step("function"),
            "function must not be in CANONICAL_CONTRACT_SUPPORTED_STEPS"
        );
    }

    #[test]
    fn runtime_command_result_all_variants_are_distinct() {
        let accepted = RuntimeCommandResult::Accepted;
        let dup = RuntimeCommandResult::Duplicate {
            command_id: "c1".into(),
        };
        let registered = RuntimeCommandResult::RouteRegistered {
            route_id: "r1".into(),
        };
        let changed = RuntimeCommandResult::RouteStateChanged {
            route_id: "r1".into(),
            status: "Started".into(),
        };

        assert_ne!(accepted, dup);
        assert_ne!(dup, registered);
        assert_ne!(registered, changed);

        let dup2 = RuntimeCommandResult::Duplicate {
            command_id: "c1".into(),
        };
        assert_eq!(dup, dup2);
    }

    #[test]
    fn runtime_event_serialization_round_trip() {
        let event = RuntimeEvent::RouteFailed {
            route_id: "route-a".to_string(),
            error: "boom".to_string(),
        };
        let json = serde_json::to_string(&event).unwrap();
        let back: RuntimeEvent = serde_json::from_str(&json).unwrap();
        assert_eq!(event, back);
    }

    #[test]
    fn noop_runtime_execute_and_ask_return_expected_shapes() {
        let rt = NoopRuntime;
        let cmd = RuntimeCommand::RegisterRoute {
            spec: CanonicalRouteSpec::new("r2", "timer:tick"),
            command_id: "c1".into(),
            causation_id: None,
        };
        let cmd_result = block_on(rt.execute(cmd)).unwrap();
        assert_eq!(
            cmd_result,
            RuntimeCommandResult::RouteRegistered {
                route_id: "r2".into()
            }
        );

        let query_result = block_on(rt.ask(RuntimeQuery::GetRouteStatus {
            route_id: "r2".into(),
        }))
        .unwrap();
        assert_eq!(
            query_result,
            RuntimeQueryResult::RouteStatus {
                route_id: "r2".into(),
                status: "Started".into()
            }
        );
    }

    #[test]
    fn canonical_contract_name_and_version_constants_match() {
        assert_eq!(CANONICAL_CONTRACT_NAME, "canonical-v1");
        assert_eq!(CANONICAL_CONTRACT_VERSION, 2);
    }

    #[test]
    fn canonical_concurrency_spec_rejects_zero_max() {
        let spec = CanonicalRouteSpec::new("r1", "timer:tick")
            .with_concurrency(CanonicalConcurrencySpec::Concurrent { max: 0 });
        let err = spec.validate_contract().unwrap_err().to_string();
        assert!(err.contains("concurrency max must be > 0"), "{err}");
    }

    #[test]
    fn canonical_v2_round_trip() {
        let spec = CanonicalRouteSpec::new("r1", "timer:tick")
            .with_auto_startup(false)
            .with_startup_order(42)
            .with_concurrency(CanonicalConcurrencySpec::Concurrent { max: 8 });
        spec.validate_contract().unwrap();

        // Actually round-trip through JSON so the version-2 fields are proven to survive
        // serialization, not merely validation.
        let json = serde_json::to_string(&spec).unwrap();
        let back: CanonicalRouteSpec = serde_json::from_str(&json).unwrap();
        assert_eq!(spec, back);
        assert_eq!(back.auto_startup, Some(false));
        assert_eq!(back.startup_order, Some(42));
        assert_eq!(
            back.concurrency,
            Some(CanonicalConcurrencySpec::Concurrent { max: 8 })
        );
        back.validate_contract().unwrap();
    }

    #[test]
    fn canonical_v2_version_is_2() {
        assert_eq!(CANONICAL_CONTRACT_VERSION, 2);
    }

    #[test]
    fn canonical_loss_report_builder() {
        let report =
            CanonicalLossReport::from_field("error_handler", "not supported by canonical path", 2);
        assert_eq!(report.dropped_fields.len(), 1);
        assert_eq!(report.dropped_fields[0].field, "error_handler");
        assert_eq!(
            report.dropped_fields[0].reason,
            "not supported by canonical path"
        );
        assert_eq!(report.dropped_fields[0].target_version, 2);
        assert!(!report.is_empty());
        assert!(CanonicalLossReport::default().is_empty());
    }

    #[test]
    fn canonical_v1_json_deserializes_in_v2() {
        let json = r#"{"route_id":"r1","from":"timer:tick","steps":[],"version":1}"#;
        let spec: CanonicalRouteSpec = serde_json::from_str(json).unwrap();
        assert_eq!(spec.route_id, "r1");
        assert!(spec.auto_startup.is_none());
        assert!(spec.startup_order.is_none());
        assert!(spec.concurrency.is_none());
        // CRITICAL: v1 specs must pass validation in v2 runtime (backward compat)
        spec.validate_contract().unwrap();
    }
}