1use async_trait::async_trait;
2use serde::{Deserialize, Serialize};
3
4use crate::CamelError;
5use crate::declarative::LanguageExpressionDef;
6use crate::splitter::StreamSplitConfig;
7
/// Name under which the canonical contract is identified.
// NOTE(review): the name still says "v1" although `CANONICAL_CONTRACT_VERSION` is 2 — confirm
// the name is meant to stay stable across version bumps.
pub const CANONICAL_CONTRACT_NAME: &str = "canonical-v1";
/// Current contract version: stamped on specs built by `CanonicalRouteSpec::new` and the highest
/// version that `CanonicalRouteSpec::validate_contract` accepts.
pub const CANONICAL_CONTRACT_VERSION: u32 = 2;
/// Step names that `canonical_contract_supports_step` reports as supported.
pub const CANONICAL_CONTRACT_SUPPORTED_STEPS: &[&str] = &[
    "to",
    "log",
    "wire_tap",
    "script",
    "filter",
    "choice",
    "split",
    "aggregate",
    "stop",
    "delay",
];
/// Supported step names for which `canonical_contract_rejection_reason` reports that only the
/// declarative/serializable expression form is covered.
pub const CANONICAL_CONTRACT_DECLARATIVE_ONLY_STEPS: &[&str] =
    &["script", "filter", "choice", "split"];
/// Step names that `canonical_contract_rejection_reason` reports as declared out-of-scope for
/// the canonical contract.
pub const CANONICAL_CONTRACT_EXCLUDED_DECLARATIVE_STEPS: &[&str] = &[
    "set_header",
    "set_property",
    "set_body",
    "multicast",
    "convert_body_to",
    "bean",
    "marshal",
    "unmarshal",
];
/// Step names that `canonical_contract_rejection_reason` reports as rust-only programmable
/// steps.
pub const CANONICAL_CONTRACT_RUST_ONLY_STEPS: &[&str] = &[
    "processor",
    "process",
    "process_fn",
    "map_body",
    "set_body_fn",
    "set_header_fn",
];
42
43pub fn canonical_contract_supports_step(step: &str) -> bool {
44 CANONICAL_CONTRACT_SUPPORTED_STEPS.contains(&step)
45}
46
47pub fn canonical_contract_rejection_reason(step: &str) -> Option<&'static str> {
48 if CANONICAL_CONTRACT_EXCLUDED_DECLARATIVE_STEPS.contains(&step) {
49 return Some(
50 "declared out-of-scope for canonical v1; use declarative route compilation path outside CQRS canonical commands",
51 );
52 }
53
54 if CANONICAL_CONTRACT_RUST_ONLY_STEPS.contains(&step) {
55 return Some("rust-only programmable step; not representable in canonical v1 contract");
56 }
57
58 if canonical_contract_supports_step(step)
59 && CANONICAL_CONTRACT_DECLARATIVE_ONLY_STEPS.contains(&step)
60 {
61 return Some(
62 "supported only as declarative/serializable expression form; closure/processor variants are outside canonical v1",
63 );
64 }
65
66 None
67}
68
/// Serializable description of a route under the canonical contract.
///
/// Field names are serialized in snake_case. `CanonicalRouteSpec::validate_contract` checks the
/// invariants mentioned on the individual fields.
#[derive(
    Debug,
    Clone,
    PartialEq,
    Eq,
    serde::Serialize,
    serde::Deserialize,
    schemars::JsonSchema,
    ts_rs::TS,
)]
#[serde(rename_all = "snake_case")]
#[ts(rename_all = "snake_case")]
pub struct CanonicalRouteSpec {
    /// Route identifier; a blank value fails `validate_contract`.
    pub route_id: String,
    /// Source of the route; a blank value fails `validate_contract`.
    // NOTE(review): tests in this file use endpoint URIs such as `timer:tick` here — presumably
    // this is always a consumer endpoint URI; confirm against the route compiler.
    pub from: String,
    /// Steps of the route, in declaration order.
    pub steps: Vec<CanonicalStepSpec>,
    /// Optional circuit-breaker settings; when present both values must be non-zero to pass
    /// `validate_contract`.
    pub circuit_breaker: Option<CanonicalCircuitBreakerSpec>,
    /// Optional auto-startup flag; how `None` is interpreted is decided outside this file.
    pub auto_startup: Option<bool>,
    /// Optional startup order; how `None` is interpreted is decided outside this file.
    pub startup_order: Option<i32>,
    /// Optional concurrency mode; `Concurrent` with `max` of 0 fails `validate_contract`.
    pub concurrency: Option<CanonicalConcurrencySpec>,
    /// Contract version of this spec; `validate_contract` accepts 1 up to and including
    /// `CANONICAL_CONTRACT_VERSION`.
    pub version: u32,
}
100
/// One step of a canonical route.
///
/// Serialized in adjacently tagged form: the snake_case variant name is stored under `step` and
/// the variant's data under `config`.
#[derive(
    Debug,
    Clone,
    PartialEq,
    Eq,
    serde::Serialize,
    serde::Deserialize,
    schemars::JsonSchema,
    ts_rs::TS,
)]
#[serde(tag = "step", content = "config", rename_all = "snake_case")]
#[ts(rename_all = "snake_case")]
pub enum CanonicalStepSpec {
    /// `to` step addressing `uri`; a blank `uri` fails contract validation.
    To {
        uri: String,
    },
    /// `log` step carrying a message; the message is not inspected by contract validation.
    Log {
        message: String,
    },
    /// `wire_tap` step addressing `uri`; a blank `uri` fails contract validation.
    WireTap {
        uri: String,
    },
    /// `script` step carrying a language expression.
    Script {
        expression: LanguageExpressionDef,
    },
    /// `filter` step: a predicate plus nested steps, which are validated recursively.
    Filter {
        predicate: LanguageExpressionDef,
        steps: Vec<CanonicalStepSpec>,
    },
    /// `choice` step: a list of `when` branches plus an optional `otherwise` branch; the steps of
    /// every branch are validated recursively.
    Choice {
        whens: Vec<CanonicalWhenSpec>,
        otherwise: Option<Vec<CanonicalStepSpec>>,
    },
    /// `split` step; `parallel_limit` of `Some(0)` fails contract validation and the nested
    /// steps are validated recursively.
    Split {
        expression: CanonicalSplitExpressionSpec,
        aggregation: CanonicalSplitAggregationSpec,
        parallel: bool,
        parallel_limit: Option<usize>,
        stop_on_exception: bool,
        steps: Vec<CanonicalStepSpec>,
    },
    /// `aggregate` step configured by a `CanonicalAggregateSpec`.
    Aggregate(CanonicalAggregateSpec),
    /// `stop` step; carries no data.
    Stop,
    /// `delay` step; its fields are not inspected by contract validation.
    Delay {
        // Exposed as `number` in the generated TypeScript bindings.
        #[ts(type = "number")]
        delay_ms: u64,
        // NOTE(review): presumably names a header that overrides `delay_ms` per exchange —
        // confirm against the delay step implementation.
        dynamic_header: Option<String>,
    },
}
150
/// One `when` branch of a `CanonicalStepSpec::Choice`: a predicate paired with the steps of the
/// branch.
#[derive(
    Debug,
    Clone,
    PartialEq,
    Eq,
    serde::Serialize,
    serde::Deserialize,
    schemars::JsonSchema,
    ts_rs::TS,
)]
#[serde(rename_all = "snake_case")]
#[ts(rename_all = "snake_case")]
pub struct CanonicalWhenSpec {
    /// Predicate expression attached to the branch.
    pub predicate: LanguageExpressionDef,
    /// Steps of the branch; validated recursively by the contract checks.
    pub steps: Vec<CanonicalStepSpec>,
}
167
/// Selects how a `CanonicalStepSpec::Split` step obtains its parts.
///
/// Variant names are serialized in snake_case.
// NOTE(review): the behavior of each variant is implemented outside this file; the variant
// comments below are inferred from the names and payload types — confirm against the splitter.
#[derive(
    Debug,
    Clone,
    PartialEq,
    Eq,
    serde::Serialize,
    serde::Deserialize,
    schemars::JsonSchema,
    ts_rs::TS,
)]
#[serde(rename_all = "snake_case")]
#[ts(rename_all = "snake_case")]
pub enum CanonicalSplitExpressionSpec {
    // Presumably one part per line of the body.
    BodyLines,
    // Presumably one part per element of a JSON array body.
    BodyJsonArray,
    /// Split driven by a language expression.
    Language(LanguageExpressionDef),
    /// Split driven by a `StreamSplitConfig`.
    Stream(StreamSplitConfig),
}
186
/// Aggregation mode carried by a `CanonicalStepSpec::Split` step.
///
/// Variant names are serialized in snake_case.
// NOTE(review): what each mode does with the split results is implemented outside this file;
// the names suggest "keep the last result", "collect every result" and "keep the original
// message" — confirm against the splitter.
#[derive(
    Debug,
    Clone,
    PartialEq,
    Eq,
    serde::Serialize,
    serde::Deserialize,
    schemars::JsonSchema,
    ts_rs::TS,
)]
#[serde(rename_all = "snake_case")]
#[ts(rename_all = "snake_case")]
pub enum CanonicalSplitAggregationSpec {
    LastWins,
    CollectAll,
    Original,
}
204
/// Aggregation strategy carried by a `CanonicalAggregateSpec`.
///
/// `CollectAll` is the only strategy the canonical contract defines; it is serialized as
/// `collect_all`.
#[derive(
    Debug,
    Clone,
    PartialEq,
    Eq,
    serde::Serialize,
    serde::Deserialize,
    schemars::JsonSchema,
    ts_rs::TS,
)]
#[serde(rename_all = "snake_case")]
#[ts(rename_all = "snake_case")]
pub enum CanonicalAggregateStrategySpec {
    CollectAll,
}
220
/// Configuration of a `CanonicalStepSpec::Aggregate` step.
///
/// Contract validation only inspects `header` and `completion_size`; the remaining fields are
/// carried through unchecked.
#[derive(
    Debug,
    Clone,
    PartialEq,
    Eq,
    serde::Serialize,
    serde::Deserialize,
    schemars::JsonSchema,
    ts_rs::TS,
)]
#[serde(rename_all = "snake_case")]
#[ts(rename_all = "snake_case")]
pub struct CanonicalAggregateSpec {
    /// Header name used by the aggregation; a blank value fails contract validation.
    pub header: String,
    /// Optional completion size; `Some(0)` fails contract validation.
    pub completion_size: Option<usize>,
    /// Optional completion timeout (milliseconds, per the field name); exposed as `number` in
    /// the TypeScript bindings.
    #[ts(type = "number")]
    pub completion_timeout_ms: Option<u64>,
    /// Optional correlation key.
    pub correlation_key: Option<String>,
    /// Optional flag; its effect is defined by the aggregator, outside this file.
    pub force_completion_on_stop: Option<bool>,
    /// Optional flag; its effect is defined by the aggregator, outside this file.
    pub discard_on_timeout: Option<bool>,
    /// Aggregation strategy.
    pub strategy: CanonicalAggregateStrategySpec,
    /// Optional bucket limit; not inspected by contract validation.
    pub max_buckets: Option<usize>,
    /// Optional bucket time-to-live (milliseconds, per the field name); exposed as `number` in
    /// the TypeScript bindings.
    #[ts(type = "number")]
    pub bucket_ttl_ms: Option<u64>,
}
246
/// Circuit-breaker settings of a `CanonicalRouteSpec`.
///
/// `CanonicalRouteSpec::validate_contract` requires both values to be greater than zero.
#[derive(
    Debug,
    Clone,
    PartialEq,
    Eq,
    serde::Serialize,
    serde::Deserialize,
    schemars::JsonSchema,
    ts_rs::TS,
)]
#[serde(rename_all = "snake_case")]
#[ts(rename_all = "snake_case")]
pub struct CanonicalCircuitBreakerSpec {
    /// Failure threshold; must be greater than zero.
    pub failure_threshold: u32,
    /// Open duration (milliseconds, per the field name); must be greater than zero. Exposed as
    /// `number` in the TypeScript bindings.
    #[ts(type = "number")]
    pub open_duration_ms: u64,
}
264
/// Concurrency mode of a `CanonicalRouteSpec`.
///
/// Serialized in internally tagged form: the snake_case variant name is stored under `mode`
/// next to the variant's own fields.
// NOTE(review): unlike the sibling types this enum carries no `#[ts(rename_all = "snake_case")]`
// attribute — presumably ts-rs picks the naming up from the serde attribute; confirm the
// generated bindings.
#[derive(
    Debug,
    Clone,
    PartialEq,
    Eq,
    serde::Serialize,
    serde::Deserialize,
    schemars::JsonSchema,
    ts_rs::TS,
)]
#[serde(tag = "mode", rename_all = "snake_case")]
pub enum CanonicalConcurrencySpec {
    /// Sequential mode; carries no data.
    Sequential,
    /// Concurrent mode with an upper bound; `max` of 0 fails
    /// `CanonicalRouteSpec::validate_contract`.
    Concurrent { max: usize },
}
280
281impl CanonicalRouteSpec {
282 pub fn new(route_id: impl Into<String>, from: impl Into<String>) -> Self {
283 Self {
284 route_id: route_id.into(),
285 from: from.into(),
286 steps: Vec::new(),
287 circuit_breaker: None,
288 auto_startup: None,
289 startup_order: None,
290 concurrency: None,
291 version: CANONICAL_CONTRACT_VERSION,
292 }
293 }
294
295 pub fn with_auto_startup(mut self, auto: bool) -> Self {
296 self.auto_startup = Some(auto);
297 self
298 }
299
300 pub fn with_startup_order(mut self, order: i32) -> Self {
301 self.startup_order = Some(order);
302 self
303 }
304
305 pub fn with_concurrency(mut self, concurrency: CanonicalConcurrencySpec) -> Self {
306 self.concurrency = Some(concurrency);
307 self
308 }
309
310 pub fn validate_contract(&self) -> Result<(), CamelError> {
311 if self.route_id.trim().is_empty() {
312 return Err(CamelError::RouteError(
313 "canonical contract violation: route_id cannot be empty".to_string(),
314 ));
315 }
316 if self.from.trim().is_empty() {
317 return Err(CamelError::RouteError(
318 "canonical contract violation: from cannot be empty".to_string(),
319 ));
320 }
321 if self.version == 0 || self.version > CANONICAL_CONTRACT_VERSION {
322 return Err(CamelError::RouteError(format!(
323 "canonical contract violation: expected version {}, got {}",
324 CANONICAL_CONTRACT_VERSION, self.version
325 )));
326 }
327 validate_steps(&self.steps)?;
328 if let Some(cb) = &self.circuit_breaker {
329 if cb.failure_threshold == 0 {
330 return Err(CamelError::RouteError(
331 "canonical contract violation: circuit_breaker.failure_threshold must be > 0"
332 .to_string(),
333 ));
334 }
335 if cb.open_duration_ms == 0 {
336 return Err(CamelError::RouteError(
337 "canonical contract violation: circuit_breaker.open_duration_ms must be > 0"
338 .to_string(),
339 ));
340 }
341 }
342 if let Some(CanonicalConcurrencySpec::Concurrent { max: 0 }) = &self.concurrency {
343 return Err(CamelError::RouteError(
344 "canonical contract violation: concurrency max must be > 0".to_string(),
345 ));
346 }
347 Ok(())
348 }
349}
350
/// A single entry of a `CanonicalLossReport`: one field that was dropped, and why.
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize)]
pub struct CanonicalFieldLoss {
    /// Name of the dropped field.
    pub field: &'static str,
    /// Explanation of why the field was dropped.
    pub reason: String,
    /// Contract version the loss was reported against.
    pub target_version: u32,
}
357
/// Collection of `CanonicalFieldLoss` entries; the `Default` value is an empty report.
#[derive(Debug, Clone, PartialEq, Eq, Default, serde::Serialize)]
pub struct CanonicalLossReport {
    /// Recorded losses, in insertion order.
    pub dropped_fields: Vec<CanonicalFieldLoss>,
}
362
363impl CanonicalLossReport {
364 pub fn from_field(field: &'static str, reason: &str, target_version: u32) -> Self {
365 Self {
366 dropped_fields: vec![CanonicalFieldLoss {
367 field,
368 reason: reason.to_string(),
369 target_version,
370 }],
371 }
372 }
373
374 pub fn is_empty(&self) -> bool {
375 self.dropped_fields.is_empty()
376 }
377}
378
379fn validate_steps(steps: &[CanonicalStepSpec]) -> Result<(), CamelError> {
380 for step in steps {
381 match step {
382 CanonicalStepSpec::To { uri } | CanonicalStepSpec::WireTap { uri } => {
383 if uri.trim().is_empty() {
384 return Err(CamelError::RouteError(
385 "canonical contract violation: endpoint uri cannot be empty".to_string(),
386 ));
387 }
388 }
389 CanonicalStepSpec::Filter { steps, .. } => validate_steps(steps)?,
390 CanonicalStepSpec::Choice { whens, otherwise } => {
391 for when in whens {
392 validate_steps(&when.steps)?;
393 }
394 if let Some(otherwise) = otherwise {
395 validate_steps(otherwise)?;
396 }
397 }
398 CanonicalStepSpec::Split {
399 parallel_limit,
400 steps,
401 ..
402 } => {
403 if let Some(limit) = parallel_limit
404 && *limit == 0
405 {
406 return Err(CamelError::RouteError(
407 "canonical contract violation: split.parallel_limit must be > 0"
408 .to_string(),
409 ));
410 }
411 validate_steps(steps)?;
412 }
413 CanonicalStepSpec::Aggregate(config) => {
414 if config.header.trim().is_empty() {
415 return Err(CamelError::RouteError(
416 "canonical contract violation: aggregate.header cannot be empty"
417 .to_string(),
418 ));
419 }
420 if let Some(size) = config.completion_size
421 && size == 0
422 {
423 return Err(CamelError::RouteError(
424 "canonical contract violation: aggregate.completion_size must be > 0"
425 .to_string(),
426 ));
427 }
428 }
429 CanonicalStepSpec::Log { .. }
430 | CanonicalStepSpec::Script { .. }
431 | CanonicalStepSpec::Stop
432 | CanonicalStepSpec::Delay { .. } => {}
433 }
434 }
435 Ok(())
436}
437
/// Commands accepted by a `RuntimeCommandBus`.
///
/// Every variant carries a `command_id` and an optional `causation_id`, both exposed through
/// the accessors on this type.
// NOTE(review): presumably `command_id` is used for de-duplication (see
// `RuntimeCommandResult::Duplicate`) and `causation_id` links to the triggering command —
// confirm against the bus implementation.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RuntimeCommand {
    /// Registers the route described by `spec`.
    RegisterRoute {
        spec: CanonicalRouteSpec,
        command_id: String,
        causation_id: Option<String>,
    },
    /// Targets the route `route_id` with a start request.
    StartRoute {
        route_id: String,
        command_id: String,
        causation_id: Option<String>,
    },
    /// Targets the route `route_id` with a stop request.
    StopRoute {
        route_id: String,
        command_id: String,
        causation_id: Option<String>,
    },
    /// Targets the route `route_id` with a suspend request.
    SuspendRoute {
        route_id: String,
        command_id: String,
        causation_id: Option<String>,
    },
    /// Targets the route `route_id` with a resume request.
    ResumeRoute {
        route_id: String,
        command_id: String,
        causation_id: Option<String>,
    },
    /// Targets the route `route_id` with a reload request.
    ReloadRoute {
        route_id: String,
        command_id: String,
        causation_id: Option<String>,
    },
    /// Reports a failure of the route `route_id`, described by `error`.
    FailRoute {
        route_id: String,
        error: String,
        command_id: String,
        causation_id: Option<String>,
    },
    /// Targets the route `route_id` with a remove request.
    RemoveRoute {
        route_id: String,
        command_id: String,
        causation_id: Option<String>,
    },
}
485
486impl RuntimeCommand {
487 pub fn command_id(&self) -> &str {
488 match self {
489 RuntimeCommand::RegisterRoute { command_id, .. }
490 | RuntimeCommand::StartRoute { command_id, .. }
491 | RuntimeCommand::StopRoute { command_id, .. }
492 | RuntimeCommand::SuspendRoute { command_id, .. }
493 | RuntimeCommand::ResumeRoute { command_id, .. }
494 | RuntimeCommand::ReloadRoute { command_id, .. }
495 | RuntimeCommand::FailRoute { command_id, .. }
496 | RuntimeCommand::RemoveRoute { command_id, .. } => command_id,
497 }
498 }
499
500 pub fn causation_id(&self) -> Option<&str> {
501 match self {
502 RuntimeCommand::RegisterRoute { causation_id, .. }
503 | RuntimeCommand::StartRoute { causation_id, .. }
504 | RuntimeCommand::StopRoute { causation_id, .. }
505 | RuntimeCommand::SuspendRoute { causation_id, .. }
506 | RuntimeCommand::ResumeRoute { causation_id, .. }
507 | RuntimeCommand::ReloadRoute { causation_id, .. }
508 | RuntimeCommand::FailRoute { causation_id, .. }
509 | RuntimeCommand::RemoveRoute { causation_id, .. } => causation_id.as_deref(),
510 }
511 }
512}
513
/// Outcome returned by `RuntimeCommandBus::execute`.
// NOTE(review): which variant a given command produces is decided by the bus implementation,
// outside this file.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RuntimeCommandResult {
    /// The command was accepted; carries no further detail.
    Accepted,
    /// Identifies `command_id` as a duplicate.
    Duplicate { command_id: String },
    /// The route `route_id` was registered.
    RouteRegistered { route_id: String },
    /// The route `route_id` changed state; `status` is a free-form string.
    RouteStateChanged { route_id: String, status: String },
}
521
/// Read-only questions accepted by a `RuntimeQueryBus`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RuntimeQuery {
    /// Asks for the status of the route `route_id`.
    GetRouteStatus {
        route_id: String,
    },
    /// Asks for the in-flight count of the route `route_id`.
    InFlightCount {
        route_id: String,
    },
    /// Asks for the known routes.
    ListRoutes,
}
535
/// Answer returned by `RuntimeQueryBus::ask`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RuntimeQueryResult {
    /// In-flight count reported for the route `route_id`.
    InFlightCount { route_id: String, count: u64 },
    /// The route `route_id` is not known.
    RouteNotFound { route_id: String },
    /// Status of the route `route_id`; `status` is a free-form string.
    RouteStatus { route_id: String, status: String },
    /// Identifiers of the known routes.
    Routes { route_ids: Vec<String> },
}
543
/// Route lifecycle events.
///
/// Unlike the command and query types, events derive `Serialize` and `Deserialize`; no serde
/// attributes are applied, so serde's default enum representation is used.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum RuntimeEvent {
    /// The route `route_id` was registered.
    RouteRegistered { route_id: String },
    /// A start of the route `route_id` was requested.
    RouteStartRequested { route_id: String },
    /// The route `route_id` started.
    RouteStarted { route_id: String },
    /// The route `route_id` failed with `error`.
    RouteFailed { route_id: String, error: String },
    /// The route `route_id` stopped.
    RouteStopped { route_id: String },
    /// The route `route_id` was suspended.
    RouteSuspended { route_id: String },
    /// The route `route_id` was resumed.
    RouteResumed { route_id: String },
    /// The route `route_id` was reloaded.
    RouteReloaded { route_id: String },
    /// The route `route_id` was removed.
    RouteRemoved { route_id: String },
}
556
/// Write side of the runtime: executes `RuntimeCommand`s.
///
/// Implementors must be `Send + Sync`.
#[async_trait]
pub trait RuntimeCommandBus: Send + Sync {
    /// Executes `cmd` and reports the outcome.
    ///
    /// # Errors
    ///
    /// Returns a `CamelError` when the implementation cannot carry out the command.
    async fn execute(&self, cmd: RuntimeCommand) -> Result<RuntimeCommandResult, CamelError>;
}
561
/// Read side of the runtime: answers `RuntimeQuery`s.
///
/// Implementors must be `Send + Sync`.
#[async_trait]
pub trait RuntimeQueryBus: Send + Sync {
    /// Answers `query`.
    ///
    /// # Errors
    ///
    /// Returns a `CamelError` when the implementation cannot answer the query.
    async fn ask(&self, query: RuntimeQuery) -> Result<RuntimeQueryResult, CamelError>;
}
566
/// Convenience trait combining `RuntimeCommandBus` and `RuntimeQueryBus`.
///
/// It declares no methods of its own.
pub trait RuntimeHandle: RuntimeCommandBus + RuntimeQueryBus {}

// Blanket implementation: any type providing both buses is automatically a `RuntimeHandle`.
impl<T> RuntimeHandle for T where T: RuntimeCommandBus + RuntimeQueryBus {}
570
#[cfg(test)]
mod tests {
    use super::*;
    use async_trait::async_trait;
    use futures::executor::block_on;

    /// Minimal runtime that answers every command and query with a fixed, shape-correct result.
    struct NoopRuntime;

    #[async_trait]
    impl RuntimeCommandBus for NoopRuntime {
        async fn execute(&self, cmd: RuntimeCommand) -> Result<RuntimeCommandResult, CamelError> {
            Ok(match cmd {
                RuntimeCommand::RegisterRoute { spec, .. } => {
                    RuntimeCommandResult::RouteRegistered {
                        route_id: spec.route_id,
                    }
                }
                RuntimeCommand::StartRoute { route_id, .. }
                | RuntimeCommand::StopRoute { route_id, .. }
                | RuntimeCommand::SuspendRoute { route_id, .. }
                | RuntimeCommand::ResumeRoute { route_id, .. }
                | RuntimeCommand::ReloadRoute { route_id, .. }
                | RuntimeCommand::FailRoute { route_id, .. }
                | RuntimeCommand::RemoveRoute { route_id, .. } => {
                    RuntimeCommandResult::RouteStateChanged {
                        route_id,
                        status: "ok".to_string(),
                    }
                }
            })
        }
    }

    #[async_trait]
    impl RuntimeQueryBus for NoopRuntime {
        async fn ask(&self, query: RuntimeQuery) -> Result<RuntimeQueryResult, CamelError> {
            Ok(match query {
                RuntimeQuery::GetRouteStatus { route_id } => RuntimeQueryResult::RouteStatus {
                    route_id,
                    status: "Started".to_string(),
                },
                RuntimeQuery::InFlightCount { route_id } => {
                    RuntimeQueryResult::InFlightCount { route_id, count: 0 }
                }
                RuntimeQuery::ListRoutes => RuntimeQueryResult::Routes {
                    route_ids: vec!["r1".to_string()],
                },
            })
        }
    }

    #[test]
    fn command_and_query_ids_are_exposed() {
        let cmd = RuntimeCommand::StartRoute {
            route_id: "r1".into(),
            command_id: "c1".into(),
            causation_id: None,
        };
        assert_eq!(cmd.command_id(), "c1");
        assert_eq!(cmd.causation_id(), None);
    }

    #[test]
    fn canonical_spec_requires_route_id_and_from() {
        let spec = CanonicalRouteSpec::new("r1", "timer:tick");
        assert_eq!(spec.route_id, "r1");
        assert_eq!(spec.from, "timer:tick");
        assert_eq!(spec.version, CANONICAL_CONTRACT_VERSION);
        assert!(spec.steps.is_empty());
        assert!(spec.circuit_breaker.is_none());
    }

    #[test]
    fn canonical_contract_rejects_invalid_version() {
        let mut spec = CanonicalRouteSpec::new("r1", "timer:tick");
        // Both ends of the accepted range are guarded: 0 is below it, current + 1 is above it.
        for unsupported in [0, CANONICAL_CONTRACT_VERSION + 1] {
            spec.version = unsupported;
            let err = spec.validate_contract().unwrap_err().to_string();
            assert!(err.contains("expected version"), "{err}");
        }
    }

    #[test]
    fn canonical_contract_declares_subset_scope() {
        assert!(canonical_contract_supports_step("to"));
        assert!(canonical_contract_supports_step("split"));
        assert!(!canonical_contract_supports_step("set_header"));
        assert!(!canonical_contract_supports_step("set_property"));

        assert!(CANONICAL_CONTRACT_DECLARATIVE_ONLY_STEPS.contains(&"split"));
        assert!(CANONICAL_CONTRACT_EXCLUDED_DECLARATIVE_STEPS.contains(&"set_header"));
        assert!(CANONICAL_CONTRACT_EXCLUDED_DECLARATIVE_STEPS.contains(&"set_property"));
        assert!(CANONICAL_CONTRACT_RUST_ONLY_STEPS.contains(&"processor"));
    }

    #[test]
    fn canonical_contract_rejection_reason_is_explicit() {
        let set_header_reason = canonical_contract_rejection_reason("set_header")
            .expect("set_header should have explicit reason");
        assert!(set_header_reason.contains("out-of-scope"));

        let set_property_reason = canonical_contract_rejection_reason("set_property")
            .expect("set_property should have explicit reason");
        assert!(set_property_reason.contains("out-of-scope"));

        let processor_reason = canonical_contract_rejection_reason("processor")
            .expect("processor should be rust-only");
        assert!(processor_reason.contains("rust-only"));

        let split_reason = canonical_contract_rejection_reason("split")
            .expect("split should require declarative form");
        assert!(split_reason.contains("declarative"));
    }

    #[test]
    fn command_causation_id_is_exposed() {
        let cmd = RuntimeCommand::StopRoute {
            route_id: "r1".into(),
            command_id: "c2".into(),
            causation_id: Some("c1".into()),
        };
        assert_eq!(cmd.command_id(), "c2");
        assert_eq!(cmd.causation_id(), Some("c1"));
    }

    #[test]
    fn canonical_contract_rejects_empty_route_id_and_from() {
        let spec = CanonicalRouteSpec::new(" ", "timer:tick");
        let err = spec.validate_contract().unwrap_err().to_string();
        assert!(err.contains("route_id cannot be empty"));

        let spec = CanonicalRouteSpec::new("r1", " ");
        let err = spec.validate_contract().unwrap_err().to_string();
        assert!(err.contains("from cannot be empty"));
    }

    #[test]
    fn canonical_contract_rejects_invalid_nested_steps() {
        let mut spec = CanonicalRouteSpec::new("r1", "timer:tick");
        spec.steps = vec![CanonicalStepSpec::Split {
            expression: CanonicalSplitExpressionSpec::BodyLines,
            aggregation: CanonicalSplitAggregationSpec::CollectAll,
            parallel: true,
            parallel_limit: Some(0),
            stop_on_exception: false,
            steps: vec![CanonicalStepSpec::To {
                uri: "log:ok".to_string(),
            }],
        }];
        let err = spec.validate_contract().unwrap_err().to_string();
        assert!(err.contains("split.parallel_limit must be > 0"));

        spec.steps = vec![CanonicalStepSpec::To {
            uri: " ".to_string(),
        }];
        let err = spec.validate_contract().unwrap_err().to_string();
        assert!(err.contains("endpoint uri cannot be empty"));
    }

    #[test]
    fn canonical_contract_rejects_invalid_aggregate_and_circuit_breaker() {
        let mut spec = CanonicalRouteSpec::new("r1", "timer:tick");
        spec.steps = vec![CanonicalStepSpec::Aggregate(CanonicalAggregateSpec {
            header: " ".to_string(),
            completion_size: Some(1),
            completion_timeout_ms: None,
            correlation_key: None,
            force_completion_on_stop: None,
            discard_on_timeout: None,
            strategy: CanonicalAggregateStrategySpec::CollectAll,
            max_buckets: None,
            bucket_ttl_ms: None,
        })];
        let err = spec.validate_contract().unwrap_err().to_string();
        assert!(err.contains("aggregate.header cannot be empty"));

        spec.steps = vec![CanonicalStepSpec::Aggregate(CanonicalAggregateSpec {
            header: "k".to_string(),
            completion_size: Some(0),
            completion_timeout_ms: None,
            correlation_key: None,
            force_completion_on_stop: None,
            discard_on_timeout: None,
            strategy: CanonicalAggregateStrategySpec::CollectAll,
            max_buckets: None,
            bucket_ttl_ms: None,
        })];
        let err = spec.validate_contract().unwrap_err().to_string();
        assert!(err.contains("aggregate.completion_size must be > 0"));

        spec.steps = vec![];
        spec.circuit_breaker = Some(CanonicalCircuitBreakerSpec {
            failure_threshold: 0,
            open_duration_ms: 10,
        });
        let err = spec.validate_contract().unwrap_err().to_string();
        assert!(err.contains("failure_threshold must be > 0"));

        spec.circuit_breaker = Some(CanonicalCircuitBreakerSpec {
            failure_threshold: 1,
            open_duration_ms: 0,
        });
        let err = spec.validate_contract().unwrap_err().to_string();
        assert!(err.contains("open_duration_ms must be > 0"));
    }

    #[test]
    fn canonical_contract_rejection_reason_none_for_regular_steps() {
        assert!(canonical_contract_rejection_reason("to").is_none());
        assert!(canonical_contract_rejection_reason("unknown-step").is_none());
    }

    #[test]
    fn command_helpers_cover_all_variants() {
        let spec = CanonicalRouteSpec::new("r1", "timer:tick");
        let cmds = [
            RuntimeCommand::RegisterRoute {
                spec,
                command_id: "c1".into(),
                causation_id: Some("root".into()),
            },
            RuntimeCommand::StartRoute {
                route_id: "r1".into(),
                command_id: "c2".into(),
                causation_id: None,
            },
            RuntimeCommand::StopRoute {
                route_id: "r1".into(),
                command_id: "c3".into(),
                causation_id: None,
            },
            RuntimeCommand::SuspendRoute {
                route_id: "r1".into(),
                command_id: "c4".into(),
                causation_id: None,
            },
            RuntimeCommand::ResumeRoute {
                route_id: "r1".into(),
                command_id: "c5".into(),
                causation_id: None,
            },
            RuntimeCommand::ReloadRoute {
                route_id: "r1".into(),
                command_id: "c6".into(),
                causation_id: None,
            },
            RuntimeCommand::FailRoute {
                route_id: "r1".into(),
                error: "boom".into(),
                command_id: "c7".into(),
                causation_id: None,
            },
            RuntimeCommand::RemoveRoute {
                route_id: "r1".into(),
                command_id: "c8".into(),
                causation_id: None,
            },
        ];

        let ids: Vec<&str> = cmds.iter().map(RuntimeCommand::command_id).collect();
        assert_eq!(ids, vec!["c1", "c2", "c3", "c4", "c5", "c6", "c7", "c8"]);
        assert_eq!(cmds[0].causation_id(), Some("root"));
        assert_eq!(cmds[1].causation_id(), None);
    }

    #[test]
    fn canonical_route_spec_serde_roundtrip() {
        let mut spec = CanonicalRouteSpec::new("test-route", "timer:tick?period=1000");
        spec.steps.push(CanonicalStepSpec::Log {
            message: "Hello".into(),
        });
        spec.steps.push(CanonicalStepSpec::To {
            uri: "log:info".into(),
        });
        spec.steps.push(CanonicalStepSpec::Stop);

        let json = serde_json::to_string(&spec).unwrap();
        let deserialized: CanonicalRouteSpec = serde_json::from_str(&json).unwrap();
        assert_eq!(spec, deserialized);
    }

    #[test]
    fn canonical_step_spec_serde_variants() {
        let steps = vec![
            CanonicalStepSpec::To {
                uri: "direct:a".into(),
            },
            CanonicalStepSpec::Log {
                message: "msg".into(),
            },
            CanonicalStepSpec::WireTap {
                uri: "direct:audit".into(),
            },
            CanonicalStepSpec::Stop,
            CanonicalStepSpec::Delay {
                delay_ms: 100,
                dynamic_header: None,
            },
        ];
        let json = serde_json::to_string_pretty(&steps).unwrap();
        let back: Vec<CanonicalStepSpec> = serde_json::from_str(&json).unwrap();
        assert_eq!(steps, back);
    }

    #[test]
    fn canonical_route_spec_json_schema_generates() {
        let schema = schemars::schema_for!(CanonicalRouteSpec);
        let json = serde_json::to_string(&schema).unwrap();
        assert!(json.contains("CanonicalRouteSpec"));
        assert!(json.contains("route_id"));
    }

    #[test]
    fn canonical_json_schema_has_no_function_step() {
        let schema = schemars::schema_for!(CanonicalRouteSpec);
        let json = serde_json::to_string(&schema).unwrap();
        assert!(
            !json.contains("\"function\""),
            "canonical JSON schema must not contain 'function' step"
        );
    }

    #[test]
    fn canonical_contract_does_not_support_function() {
        assert!(
            !canonical_contract_supports_step("function"),
            "function must not be in CANONICAL_CONTRACT_SUPPORTED_STEPS"
        );
    }

    #[test]
    fn runtime_command_result_all_variants_are_distinct() {
        let accepted = RuntimeCommandResult::Accepted;
        let dup = RuntimeCommandResult::Duplicate {
            command_id: "c1".into(),
        };
        let registered = RuntimeCommandResult::RouteRegistered {
            route_id: "r1".into(),
        };
        let changed = RuntimeCommandResult::RouteStateChanged {
            route_id: "r1".into(),
            status: "Started".into(),
        };

        assert_ne!(accepted, dup);
        assert_ne!(dup, registered);
        assert_ne!(registered, changed);

        let dup2 = RuntimeCommandResult::Duplicate {
            command_id: "c1".into(),
        };
        assert_eq!(dup, dup2);
    }

    #[test]
    fn runtime_event_serialization_round_trip() {
        let event = RuntimeEvent::RouteFailed {
            route_id: "route-a".to_string(),
            error: "boom".to_string(),
        };
        let json = serde_json::to_string(&event).unwrap();
        let back: RuntimeEvent = serde_json::from_str(&json).unwrap();
        assert_eq!(event, back);
    }

    #[test]
    fn noop_runtime_execute_and_ask_return_expected_shapes() {
        let rt = NoopRuntime;
        let cmd = RuntimeCommand::RegisterRoute {
            spec: CanonicalRouteSpec::new("r2", "timer:tick"),
            command_id: "c1".into(),
            causation_id: None,
        };
        let cmd_result = block_on(rt.execute(cmd)).unwrap();
        assert_eq!(
            cmd_result,
            RuntimeCommandResult::RouteRegistered {
                route_id: "r2".into()
            }
        );

        let query_result = block_on(rt.ask(RuntimeQuery::GetRouteStatus {
            route_id: "r2".into(),
        }))
        .unwrap();
        assert_eq!(
            query_result,
            RuntimeQueryResult::RouteStatus {
                route_id: "r2".into(),
                status: "Started".into()
            }
        );

        // The remaining query arms of the noop runtime are exercised as well.
        let in_flight = block_on(rt.ask(RuntimeQuery::InFlightCount {
            route_id: "r2".into(),
        }))
        .unwrap();
        assert_eq!(
            in_flight,
            RuntimeQueryResult::InFlightCount {
                route_id: "r2".into(),
                count: 0
            }
        );

        let routes = block_on(rt.ask(RuntimeQuery::ListRoutes)).unwrap();
        assert_eq!(
            routes,
            RuntimeQueryResult::Routes {
                route_ids: vec!["r1".to_string()]
            }
        );
    }

    #[test]
    fn canonical_contract_name_and_version_constants_match() {
        assert_eq!(CANONICAL_CONTRACT_NAME, "canonical-v1");
        assert_eq!(CANONICAL_CONTRACT_VERSION, 2);
    }

    #[test]
    fn canonical_concurrency_spec_rejects_zero_max() {
        let spec = CanonicalRouteSpec::new("r1", "timer:tick")
            .with_concurrency(CanonicalConcurrencySpec::Concurrent { max: 0 });
        let err = spec.validate_contract().unwrap_err().to_string();
        assert!(err.contains("concurrency max must be > 0"), "{err}");
    }

    #[test]
    fn canonical_v2_round_trip() {
        let spec = CanonicalRouteSpec::new("r1", "timer:tick")
            .with_auto_startup(false)
            .with_startup_order(42)
            .with_concurrency(CanonicalConcurrencySpec::Concurrent { max: 8 });
        spec.validate_contract().unwrap();

        // Actually round-trip the v2-only fields through JSON so the wire format is covered.
        let json = serde_json::to_string(&spec).unwrap();
        let back: CanonicalRouteSpec = serde_json::from_str(&json).unwrap();
        assert_eq!(spec, back);
        back.validate_contract().unwrap();
    }

    #[test]
    fn canonical_v2_version_is_2() {
        assert_eq!(CANONICAL_CONTRACT_VERSION, 2);
    }

    #[test]
    fn canonical_loss_report_builder() {
        let report =
            CanonicalLossReport::from_field("error_handler", "not supported by canonical path", 2);
        assert_eq!(report.dropped_fields.len(), 1);
        assert_eq!(report.dropped_fields[0].field, "error_handler");
    }

    #[test]
    fn canonical_v1_json_deserializes_in_v2() {
        let json = r#"{"route_id":"r1","from":"timer:tick","steps":[],"version":1}"#;
        let spec: CanonicalRouteSpec = serde_json::from_str(json).unwrap();
        assert_eq!(spec.route_id, "r1");
        assert!(spec.auto_startup.is_none());
        assert!(spec.startup_order.is_none());
        assert!(spec.concurrency.is_none());
        spec.validate_contract().unwrap();
    }
}