1use async_trait::async_trait;
2use serde::{Deserialize, Serialize};
3
4use crate::CamelError;
5use crate::declarative::LanguageExpressionDef;
6
/// Name identifying the canonical contract.
// NOTE(review): the name still says `v1` while `CANONICAL_CONTRACT_VERSION` is 2; a test in
// this file pins both values, so this looks deliberate — confirm.
pub const CANONICAL_CONTRACT_NAME: &str = "canonical-v1";
/// Newest contract version known here: `CanonicalRouteSpec::new` stamps it on fresh specs and
/// `CanonicalRouteSpec::validate_contract` rejects any spec whose version is greater (or zero).
pub const CANONICAL_CONTRACT_VERSION: u32 = 2;
/// Step names that `canonical_contract_supports_step` reports as supported.
pub const CANONICAL_CONTRACT_SUPPORTED_STEPS: &[&str] = &[
    "to",
    "log",
    "wire_tap",
    "script",
    "filter",
    "choice",
    "split",
    "aggregate",
    "stop",
    "delay",
];
/// Step names for which `canonical_contract_rejection_reason` reports that only the
/// declarative/serializable expression form belongs to the contract.
pub const CANONICAL_CONTRACT_DECLARATIVE_ONLY_STEPS: &[&str] =
    &["script", "filter", "choice", "split"];
/// Step names that `canonical_contract_rejection_reason` reports as declared out-of-scope.
pub const CANONICAL_CONTRACT_EXCLUDED_DECLARATIVE_STEPS: &[&str] = &[
    "set_header",
    "set_property",
    "set_body",
    "multicast",
    "convert_body_to",
    "bean",
    "marshal",
    "unmarshal",
];
/// Step names that `canonical_contract_rejection_reason` reports as rust-only programmable
/// steps.
pub const CANONICAL_CONTRACT_RUST_ONLY_STEPS: &[&str] = &[
    "processor",
    "process",
    "process_fn",
    "map_body",
    "set_body_fn",
    "set_header_fn",
];
41
42pub fn canonical_contract_supports_step(step: &str) -> bool {
43 CANONICAL_CONTRACT_SUPPORTED_STEPS.contains(&step)
44}
45
46pub fn canonical_contract_rejection_reason(step: &str) -> Option<&'static str> {
47 if CANONICAL_CONTRACT_EXCLUDED_DECLARATIVE_STEPS.contains(&step) {
48 return Some(
49 "declared out-of-scope for canonical v1; use declarative route compilation path outside CQRS canonical commands",
50 );
51 }
52
53 if CANONICAL_CONTRACT_RUST_ONLY_STEPS.contains(&step) {
54 return Some("rust-only programmable step; not representable in canonical v1 contract");
55 }
56
57 if canonical_contract_supports_step(step)
58 && CANONICAL_CONTRACT_DECLARATIVE_ONLY_STEPS.contains(&step)
59 {
60 return Some(
61 "supported only as declarative/serializable expression form; closure/processor variants are outside canonical v1",
62 );
63 }
64
65 None
66}
67
68#[derive(
69 Debug,
70 Clone,
71 PartialEq,
72 Eq,
73 serde::Serialize,
74 serde::Deserialize,
75 schemars::JsonSchema,
76 ts_rs::TS,
77)]
78#[serde(rename_all = "snake_case")]
79#[ts(rename_all = "snake_case")]
80pub struct CanonicalRouteSpec {
81 pub route_id: String,
91 pub from: String,
92 pub steps: Vec<CanonicalStepSpec>,
93 pub circuit_breaker: Option<CanonicalCircuitBreakerSpec>,
94 pub auto_startup: Option<bool>,
95 pub startup_order: Option<i32>,
96 pub concurrency: Option<CanonicalConcurrencySpec>,
97 pub version: u32,
98}
99
100#[derive(
101 Debug,
102 Clone,
103 PartialEq,
104 Eq,
105 serde::Serialize,
106 serde::Deserialize,
107 schemars::JsonSchema,
108 ts_rs::TS,
109)]
110#[serde(tag = "step", content = "config", rename_all = "snake_case")]
111#[ts(rename_all = "snake_case")]
112pub enum CanonicalStepSpec {
113 To {
114 uri: String,
115 },
116 Log {
117 message: String,
118 },
119 WireTap {
120 uri: String,
121 },
122 Script {
123 expression: LanguageExpressionDef,
124 },
125 Filter {
126 predicate: LanguageExpressionDef,
127 steps: Vec<CanonicalStepSpec>,
128 },
129 Choice {
130 whens: Vec<CanonicalWhenSpec>,
131 otherwise: Option<Vec<CanonicalStepSpec>>,
132 },
133 Split {
134 expression: CanonicalSplitExpressionSpec,
135 aggregation: CanonicalSplitAggregationSpec,
136 parallel: bool,
137 parallel_limit: Option<usize>,
138 stop_on_exception: bool,
139 steps: Vec<CanonicalStepSpec>,
140 },
141 Aggregate(CanonicalAggregateSpec),
142 Stop,
143 Delay {
144 #[ts(type = "number")]
145 delay_ms: u64,
146 dynamic_header: Option<String>,
147 },
148}
149
150#[derive(
151 Debug,
152 Clone,
153 PartialEq,
154 Eq,
155 serde::Serialize,
156 serde::Deserialize,
157 schemars::JsonSchema,
158 ts_rs::TS,
159)]
160#[serde(rename_all = "snake_case")]
161#[ts(rename_all = "snake_case")]
162pub struct CanonicalWhenSpec {
163 pub predicate: LanguageExpressionDef,
164 pub steps: Vec<CanonicalStepSpec>,
165}
166
167#[derive(
168 Debug,
169 Clone,
170 PartialEq,
171 Eq,
172 serde::Serialize,
173 serde::Deserialize,
174 schemars::JsonSchema,
175 ts_rs::TS,
176)]
177#[serde(rename_all = "snake_case")]
178#[ts(rename_all = "snake_case")]
179pub enum CanonicalSplitExpressionSpec {
180 BodyLines,
181 BodyJsonArray,
182 Language(LanguageExpressionDef),
183}
184
185#[derive(
186 Debug,
187 Clone,
188 PartialEq,
189 Eq,
190 serde::Serialize,
191 serde::Deserialize,
192 schemars::JsonSchema,
193 ts_rs::TS,
194)]
195#[serde(rename_all = "snake_case")]
196#[ts(rename_all = "snake_case")]
197pub enum CanonicalSplitAggregationSpec {
198 LastWins,
199 CollectAll,
200 Original,
201}
202
203#[derive(
204 Debug,
205 Clone,
206 PartialEq,
207 Eq,
208 serde::Serialize,
209 serde::Deserialize,
210 schemars::JsonSchema,
211 ts_rs::TS,
212)]
213#[serde(rename_all = "snake_case")]
214#[ts(rename_all = "snake_case")]
215pub enum CanonicalAggregateStrategySpec {
216 CollectAll,
217}
218
219#[derive(
220 Debug,
221 Clone,
222 PartialEq,
223 Eq,
224 serde::Serialize,
225 serde::Deserialize,
226 schemars::JsonSchema,
227 ts_rs::TS,
228)]
229#[serde(rename_all = "snake_case")]
230#[ts(rename_all = "snake_case")]
231pub struct CanonicalAggregateSpec {
232 pub header: String,
233 pub completion_size: Option<usize>,
234 #[ts(type = "number")]
235 pub completion_timeout_ms: Option<u64>,
236 pub correlation_key: Option<String>,
237 pub force_completion_on_stop: Option<bool>,
238 pub discard_on_timeout: Option<bool>,
239 pub strategy: CanonicalAggregateStrategySpec,
240 pub max_buckets: Option<usize>,
241 #[ts(type = "number")]
242 pub bucket_ttl_ms: Option<u64>,
243}
244
245#[derive(
246 Debug,
247 Clone,
248 PartialEq,
249 Eq,
250 serde::Serialize,
251 serde::Deserialize,
252 schemars::JsonSchema,
253 ts_rs::TS,
254)]
255#[serde(rename_all = "snake_case")]
256#[ts(rename_all = "snake_case")]
257pub struct CanonicalCircuitBreakerSpec {
258 pub failure_threshold: u32,
259 #[ts(type = "number")]
260 pub open_duration_ms: u64,
261}
262
263#[derive(
264 Debug,
265 Clone,
266 PartialEq,
267 Eq,
268 serde::Serialize,
269 serde::Deserialize,
270 schemars::JsonSchema,
271 ts_rs::TS,
272)]
273#[serde(tag = "mode", rename_all = "snake_case")]
274pub enum CanonicalConcurrencySpec {
275 Sequential,
276 Concurrent { max: usize },
277}
278
279impl CanonicalRouteSpec {
280 pub fn new(route_id: impl Into<String>, from: impl Into<String>) -> Self {
281 Self {
282 route_id: route_id.into(),
283 from: from.into(),
284 steps: Vec::new(),
285 circuit_breaker: None,
286 auto_startup: None,
287 startup_order: None,
288 concurrency: None,
289 version: CANONICAL_CONTRACT_VERSION,
290 }
291 }
292
293 pub fn with_auto_startup(mut self, auto: bool) -> Self {
294 self.auto_startup = Some(auto);
295 self
296 }
297
298 pub fn with_startup_order(mut self, order: i32) -> Self {
299 self.startup_order = Some(order);
300 self
301 }
302
303 pub fn with_concurrency(mut self, concurrency: CanonicalConcurrencySpec) -> Self {
304 self.concurrency = Some(concurrency);
305 self
306 }
307
308 pub fn validate_contract(&self) -> Result<(), CamelError> {
309 if self.route_id.trim().is_empty() {
310 return Err(CamelError::RouteError(
311 "canonical contract violation: route_id cannot be empty".to_string(),
312 ));
313 }
314 if self.from.trim().is_empty() {
315 return Err(CamelError::RouteError(
316 "canonical contract violation: from cannot be empty".to_string(),
317 ));
318 }
319 if self.version == 0 || self.version > CANONICAL_CONTRACT_VERSION {
320 return Err(CamelError::RouteError(format!(
321 "canonical contract violation: expected version {}, got {}",
322 CANONICAL_CONTRACT_VERSION, self.version
323 )));
324 }
325 validate_steps(&self.steps)?;
326 if let Some(cb) = &self.circuit_breaker {
327 if cb.failure_threshold == 0 {
328 return Err(CamelError::RouteError(
329 "canonical contract violation: circuit_breaker.failure_threshold must be > 0"
330 .to_string(),
331 ));
332 }
333 if cb.open_duration_ms == 0 {
334 return Err(CamelError::RouteError(
335 "canonical contract violation: circuit_breaker.open_duration_ms must be > 0"
336 .to_string(),
337 ));
338 }
339 }
340 if let Some(CanonicalConcurrencySpec::Concurrent { max: 0 }) = &self.concurrency {
341 return Err(CamelError::RouteError(
342 "canonical contract violation: concurrency max must be > 0".to_string(),
343 ));
344 }
345 Ok(())
346 }
347}
348
349#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize)]
350pub struct CanonicalFieldLoss {
351 pub field: &'static str,
352 pub reason: String,
353 pub target_version: u32,
354}
355
356#[derive(Debug, Clone, PartialEq, Eq, Default, serde::Serialize)]
357pub struct CanonicalLossReport {
358 pub dropped_fields: Vec<CanonicalFieldLoss>,
359}
360
361impl CanonicalLossReport {
362 pub fn from_field(field: &'static str, reason: &str, target_version: u32) -> Self {
363 Self {
364 dropped_fields: vec![CanonicalFieldLoss {
365 field,
366 reason: reason.to_string(),
367 target_version,
368 }],
369 }
370 }
371
372 pub fn is_empty(&self) -> bool {
373 self.dropped_fields.is_empty()
374 }
375}
376
377fn validate_steps(steps: &[CanonicalStepSpec]) -> Result<(), CamelError> {
378 for step in steps {
379 match step {
380 CanonicalStepSpec::To { uri } | CanonicalStepSpec::WireTap { uri } => {
381 if uri.trim().is_empty() {
382 return Err(CamelError::RouteError(
383 "canonical contract violation: endpoint uri cannot be empty".to_string(),
384 ));
385 }
386 }
387 CanonicalStepSpec::Filter { steps, .. } => validate_steps(steps)?,
388 CanonicalStepSpec::Choice { whens, otherwise } => {
389 for when in whens {
390 validate_steps(&when.steps)?;
391 }
392 if let Some(otherwise) = otherwise {
393 validate_steps(otherwise)?;
394 }
395 }
396 CanonicalStepSpec::Split {
397 parallel_limit,
398 steps,
399 ..
400 } => {
401 if let Some(limit) = parallel_limit
402 && *limit == 0
403 {
404 return Err(CamelError::RouteError(
405 "canonical contract violation: split.parallel_limit must be > 0"
406 .to_string(),
407 ));
408 }
409 validate_steps(steps)?;
410 }
411 CanonicalStepSpec::Aggregate(config) => {
412 if config.header.trim().is_empty() {
413 return Err(CamelError::RouteError(
414 "canonical contract violation: aggregate.header cannot be empty"
415 .to_string(),
416 ));
417 }
418 if let Some(size) = config.completion_size
419 && size == 0
420 {
421 return Err(CamelError::RouteError(
422 "canonical contract violation: aggregate.completion_size must be > 0"
423 .to_string(),
424 ));
425 }
426 }
427 CanonicalStepSpec::Log { .. }
428 | CanonicalStepSpec::Script { .. }
429 | CanonicalStepSpec::Stop
430 | CanonicalStepSpec::Delay { .. } => {}
431 }
432 }
433 Ok(())
434}
435
/// Commands accepted by a `RuntimeCommandBus`.
///
/// Every variant carries a `command_id` and an optional `causation_id`, readable through
/// `RuntimeCommand::command_id` and `RuntimeCommand::causation_id`.
// NOTE(review): `causation_id` presumably names the command or event that triggered this
// one — confirm against the bus implementation.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RuntimeCommand {
    /// Registers the route described by `spec`.
    RegisterRoute {
        spec: CanonicalRouteSpec,
        command_id: String,
        causation_id: Option<String>,
    },
    /// Targets the route `route_id` with a start request.
    StartRoute {
        route_id: String,
        command_id: String,
        causation_id: Option<String>,
    },
    /// Targets the route `route_id` with a stop request.
    StopRoute {
        route_id: String,
        command_id: String,
        causation_id: Option<String>,
    },
    /// Targets the route `route_id` with a suspend request.
    SuspendRoute {
        route_id: String,
        command_id: String,
        causation_id: Option<String>,
    },
    /// Targets the route `route_id` with a resume request.
    ResumeRoute {
        route_id: String,
        command_id: String,
        causation_id: Option<String>,
    },
    /// Targets the route `route_id` with a reload request.
    ReloadRoute {
        route_id: String,
        command_id: String,
        causation_id: Option<String>,
    },
    /// Targets the route `route_id` with a failure report described by `error`.
    FailRoute {
        route_id: String,
        error: String,
        command_id: String,
        causation_id: Option<String>,
    },
    /// Targets the route `route_id` with a remove request.
    RemoveRoute {
        route_id: String,
        command_id: String,
        causation_id: Option<String>,
    },
}
483
484impl RuntimeCommand {
485 pub fn command_id(&self) -> &str {
486 match self {
487 RuntimeCommand::RegisterRoute { command_id, .. }
488 | RuntimeCommand::StartRoute { command_id, .. }
489 | RuntimeCommand::StopRoute { command_id, .. }
490 | RuntimeCommand::SuspendRoute { command_id, .. }
491 | RuntimeCommand::ResumeRoute { command_id, .. }
492 | RuntimeCommand::ReloadRoute { command_id, .. }
493 | RuntimeCommand::FailRoute { command_id, .. }
494 | RuntimeCommand::RemoveRoute { command_id, .. } => command_id,
495 }
496 }
497
498 pub fn causation_id(&self) -> Option<&str> {
499 match self {
500 RuntimeCommand::RegisterRoute { causation_id, .. }
501 | RuntimeCommand::StartRoute { causation_id, .. }
502 | RuntimeCommand::StopRoute { causation_id, .. }
503 | RuntimeCommand::SuspendRoute { causation_id, .. }
504 | RuntimeCommand::ResumeRoute { causation_id, .. }
505 | RuntimeCommand::ReloadRoute { causation_id, .. }
506 | RuntimeCommand::FailRoute { causation_id, .. }
507 | RuntimeCommand::RemoveRoute { causation_id, .. } => causation_id.as_deref(),
508 }
509 }
510}
511
/// Outcome returned by `RuntimeCommandBus::execute`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum RuntimeCommandResult {
    /// The command was accepted; no further detail is carried.
    Accepted,
    /// Outcome carrying the id of the command it refers to.
    Duplicate {
        command_id: String,
    },
    /// Outcome naming the route that was registered.
    RouteRegistered {
        route_id: String,
    },
    /// Outcome naming a route together with a `status` string.
    RouteStateChanged {
        route_id: String,
        status: String,
    },
}
519
/// Queries accepted by `RuntimeQueryBus::ask`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum RuntimeQuery {
    /// Asks for the status of the route `route_id`.
    GetRouteStatus { route_id: String },
    /// Asks for the in-flight count of the route `route_id`.
    InFlightCount { route_id: String },
    /// Asks for the list of routes.
    ListRoutes,
}
533
/// Answer returned by `RuntimeQueryBus::ask`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum RuntimeQueryResult {
    /// In-flight `count` for the route `route_id`.
    InFlightCount {
        route_id: String,
        count: u64,
    },
    /// Answer naming a route that was not found.
    RouteNotFound {
        route_id: String,
    },
    /// `status` string for the route `route_id`.
    RouteStatus {
        route_id: String,
        status: String,
    },
    /// List of route ids.
    Routes {
        route_ids: Vec<String>,
    },
}
541
/// Route lifecycle events.
///
/// Unlike the command and query types in this file, events derive `Serialize` and
/// `Deserialize`; no serde attributes are applied, so serde's default enum representation is
/// used. Every variant names the route it concerns.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum RuntimeEvent {
    RouteRegistered { route_id: String },
    RouteStartRequested { route_id: String },
    RouteStarted { route_id: String },
    /// Carries an `error` description in addition to the route id.
    RouteFailed { route_id: String, error: String },
    RouteStopped { route_id: String },
    RouteSuspended { route_id: String },
    RouteResumed { route_id: String },
    RouteReloaded { route_id: String },
    RouteRemoved { route_id: String },
}
554
/// Command side of the runtime interface.
///
/// Implementors must be `Send + Sync`.
#[async_trait]
pub trait RuntimeCommandBus: Send + Sync {
    /// Executes `cmd`, yielding a [`RuntimeCommandResult`] on success and a [`CamelError`]
    /// on failure.
    async fn execute(&self, cmd: RuntimeCommand) -> Result<RuntimeCommandResult, CamelError>;
}
559
/// Query side of the runtime interface.
///
/// Implementors must be `Send + Sync`.
#[async_trait]
pub trait RuntimeQueryBus: Send + Sync {
    /// Answers `query`, yielding a [`RuntimeQueryResult`] on success and a [`CamelError`]
    /// on failure.
    async fn ask(&self, query: RuntimeQuery) -> Result<RuntimeQueryResult, CamelError>;
}
564
565pub trait RuntimeHandle: RuntimeCommandBus + RuntimeQueryBus {}
566
567impl<T> RuntimeHandle for T where T: RuntimeCommandBus + RuntimeQueryBus {}
568
// Unit tests for the canonical contract helpers, the spec validation rules, and the
// command/query/event types defined in this file.
#[cfg(test)]
mod tests {
    use super::*;
    use async_trait::async_trait;
    use futures::executor::block_on;

    /// Minimal runtime that answers every command and query with a fixed shape.
    struct NoopRuntime;

    #[async_trait]
    impl RuntimeCommandBus for NoopRuntime {
        async fn execute(&self, cmd: RuntimeCommand) -> Result<RuntimeCommandResult, CamelError> {
            Ok(match cmd {
                RuntimeCommand::RegisterRoute { spec, .. } => {
                    RuntimeCommandResult::RouteRegistered {
                        route_id: spec.route_id,
                    }
                }
                RuntimeCommand::StartRoute { route_id, .. }
                | RuntimeCommand::StopRoute { route_id, .. }
                | RuntimeCommand::SuspendRoute { route_id, .. }
                | RuntimeCommand::ResumeRoute { route_id, .. }
                | RuntimeCommand::ReloadRoute { route_id, .. }
                | RuntimeCommand::FailRoute { route_id, .. }
                | RuntimeCommand::RemoveRoute { route_id, .. } => {
                    RuntimeCommandResult::RouteStateChanged {
                        route_id,
                        status: "ok".to_string(),
                    }
                }
            })
        }
    }

    #[async_trait]
    impl RuntimeQueryBus for NoopRuntime {
        async fn ask(&self, query: RuntimeQuery) -> Result<RuntimeQueryResult, CamelError> {
            Ok(match query {
                RuntimeQuery::GetRouteStatus { route_id } => RuntimeQueryResult::RouteStatus {
                    route_id,
                    status: "Started".to_string(),
                },
                RuntimeQuery::InFlightCount { route_id } => {
                    RuntimeQueryResult::InFlightCount { route_id, count: 0 }
                }
                RuntimeQuery::ListRoutes => RuntimeQueryResult::Routes {
                    route_ids: vec!["r1".to_string()],
                },
            })
        }
    }

    #[test]
    fn command_and_query_ids_are_exposed() {
        let cmd = RuntimeCommand::StartRoute {
            route_id: "r1".into(),
            command_id: "c1".into(),
            causation_id: None,
        };
        assert_eq!(cmd.command_id(), "c1");
    }

    #[test]
    fn canonical_spec_requires_route_id_and_from() {
        let spec = CanonicalRouteSpec::new("r1", "timer:tick");
        assert_eq!(spec.route_id, "r1");
        assert_eq!(spec.from, "timer:tick");
        assert_eq!(spec.version, CANONICAL_CONTRACT_VERSION);
        assert!(spec.steps.is_empty());
        assert!(spec.circuit_breaker.is_none());
    }

    #[test]
    fn canonical_contract_rejects_invalid_version() {
        let mut spec = CanonicalRouteSpec::new("r1", "timer:tick");

        // Upper bound: one past the newest known version.
        spec.version = CANONICAL_CONTRACT_VERSION + 1;
        let err = spec.validate_contract().unwrap_err().to_string();
        assert!(err.contains("expected version"));

        // Lower bound: version 0 is never valid.
        spec.version = 0;
        let err = spec.validate_contract().unwrap_err().to_string();
        assert!(err.contains("expected version"));
    }

    #[test]
    fn canonical_contract_declares_subset_scope() {
        assert!(canonical_contract_supports_step("to"));
        assert!(canonical_contract_supports_step("split"));
        assert!(!canonical_contract_supports_step("set_header"));
        assert!(!canonical_contract_supports_step("set_property"));

        assert!(CANONICAL_CONTRACT_DECLARATIVE_ONLY_STEPS.contains(&"split"));
        assert!(CANONICAL_CONTRACT_EXCLUDED_DECLARATIVE_STEPS.contains(&"set_header"));
        assert!(CANONICAL_CONTRACT_EXCLUDED_DECLARATIVE_STEPS.contains(&"set_property"));
        assert!(CANONICAL_CONTRACT_RUST_ONLY_STEPS.contains(&"processor"));
    }

    #[test]
    fn canonical_contract_rejection_reason_is_explicit() {
        let set_header_reason = canonical_contract_rejection_reason("set_header")
            .expect("set_header should have explicit reason");
        assert!(set_header_reason.contains("out-of-scope"));

        let set_property_reason = canonical_contract_rejection_reason("set_property")
            .expect("set_property should have explicit reason");
        assert!(set_property_reason.contains("out-of-scope"));

        let processor_reason = canonical_contract_rejection_reason("processor")
            .expect("processor should be rust-only");
        assert!(processor_reason.contains("rust-only"));

        let split_reason = canonical_contract_rejection_reason("split")
            .expect("split should require declarative form");
        assert!(split_reason.contains("declarative"));
    }

    #[test]
    fn command_causation_id_is_exposed() {
        let cmd = RuntimeCommand::StopRoute {
            route_id: "r1".into(),
            command_id: "c2".into(),
            causation_id: Some("c1".into()),
        };
        assert_eq!(cmd.command_id(), "c2");
        assert_eq!(cmd.causation_id(), Some("c1"));
    }

    #[test]
    fn canonical_contract_rejects_empty_route_id_and_from() {
        let spec = CanonicalRouteSpec::new(" ", "timer:tick");
        let err = spec.validate_contract().unwrap_err().to_string();
        assert!(err.contains("route_id cannot be empty"));

        let spec = CanonicalRouteSpec::new("r1", " ");
        let err = spec.validate_contract().unwrap_err().to_string();
        assert!(err.contains("from cannot be empty"));
    }

    #[test]
    fn canonical_contract_rejects_invalid_nested_steps() {
        let mut spec = CanonicalRouteSpec::new("r1", "timer:tick");
        spec.steps = vec![CanonicalStepSpec::Split {
            expression: CanonicalSplitExpressionSpec::BodyLines,
            aggregation: CanonicalSplitAggregationSpec::CollectAll,
            parallel: true,
            parallel_limit: Some(0),
            stop_on_exception: false,
            steps: vec![CanonicalStepSpec::To {
                uri: "log:ok".to_string(),
            }],
        }];
        let err = spec.validate_contract().unwrap_err().to_string();
        assert!(err.contains("split.parallel_limit must be > 0"));

        spec.steps = vec![CanonicalStepSpec::To {
            uri: " ".to_string(),
        }];
        let err = spec.validate_contract().unwrap_err().to_string();
        assert!(err.contains("endpoint uri cannot be empty"));
    }

    #[test]
    fn canonical_contract_rejects_invalid_aggregate_and_circuit_breaker() {
        let mut spec = CanonicalRouteSpec::new("r1", "timer:tick");
        spec.steps = vec![CanonicalStepSpec::Aggregate(CanonicalAggregateSpec {
            header: " ".to_string(),
            completion_size: Some(1),
            completion_timeout_ms: None,
            correlation_key: None,
            force_completion_on_stop: None,
            discard_on_timeout: None,
            strategy: CanonicalAggregateStrategySpec::CollectAll,
            max_buckets: None,
            bucket_ttl_ms: None,
        })];
        let err = spec.validate_contract().unwrap_err().to_string();
        assert!(err.contains("aggregate.header cannot be empty"));

        spec.steps = vec![CanonicalStepSpec::Aggregate(CanonicalAggregateSpec {
            header: "k".to_string(),
            completion_size: Some(0),
            completion_timeout_ms: None,
            correlation_key: None,
            force_completion_on_stop: None,
            discard_on_timeout: None,
            strategy: CanonicalAggregateStrategySpec::CollectAll,
            max_buckets: None,
            bucket_ttl_ms: None,
        })];
        let err = spec.validate_contract().unwrap_err().to_string();
        assert!(err.contains("aggregate.completion_size must be > 0"));

        spec.steps = vec![];
        spec.circuit_breaker = Some(CanonicalCircuitBreakerSpec {
            failure_threshold: 0,
            open_duration_ms: 10,
        });
        let err = spec.validate_contract().unwrap_err().to_string();
        assert!(err.contains("failure_threshold must be > 0"));

        spec.circuit_breaker = Some(CanonicalCircuitBreakerSpec {
            failure_threshold: 1,
            open_duration_ms: 0,
        });
        let err = spec.validate_contract().unwrap_err().to_string();
        assert!(err.contains("open_duration_ms must be > 0"));
    }

    #[test]
    fn canonical_contract_rejection_reason_none_for_regular_steps() {
        assert!(canonical_contract_rejection_reason("to").is_none());
        assert!(canonical_contract_rejection_reason("unknown-step").is_none());
    }

    #[test]
    fn command_helpers_cover_all_variants() {
        let spec = CanonicalRouteSpec::new("r1", "timer:tick");
        let cmds = [
            RuntimeCommand::RegisterRoute {
                spec,
                command_id: "c1".into(),
                causation_id: Some("root".into()),
            },
            RuntimeCommand::StartRoute {
                route_id: "r1".into(),
                command_id: "c2".into(),
                causation_id: None,
            },
            RuntimeCommand::StopRoute {
                route_id: "r1".into(),
                command_id: "c3".into(),
                causation_id: None,
            },
            RuntimeCommand::SuspendRoute {
                route_id: "r1".into(),
                command_id: "c4".into(),
                causation_id: None,
            },
            RuntimeCommand::ResumeRoute {
                route_id: "r1".into(),
                command_id: "c5".into(),
                causation_id: None,
            },
            RuntimeCommand::ReloadRoute {
                route_id: "r1".into(),
                command_id: "c6".into(),
                causation_id: None,
            },
            RuntimeCommand::FailRoute {
                route_id: "r1".into(),
                error: "boom".into(),
                command_id: "c7".into(),
                causation_id: None,
            },
            RuntimeCommand::RemoveRoute {
                route_id: "r1".into(),
                command_id: "c8".into(),
                causation_id: None,
            },
        ];

        let ids: Vec<&str> = cmds.iter().map(RuntimeCommand::command_id).collect();
        assert_eq!(ids, vec!["c1", "c2", "c3", "c4", "c5", "c6", "c7", "c8"]);
        assert_eq!(cmds[0].causation_id(), Some("root"));
        assert_eq!(cmds[1].causation_id(), None);
    }

    #[test]
    fn canonical_route_spec_serde_roundtrip() {
        let mut spec = CanonicalRouteSpec::new("test-route", "timer:tick?period=1000");
        spec.steps.push(CanonicalStepSpec::Log {
            message: "Hello".into(),
        });
        spec.steps.push(CanonicalStepSpec::To {
            uri: "log:info".into(),
        });
        spec.steps.push(CanonicalStepSpec::Stop);

        let json = serde_json::to_string(&spec).unwrap();
        let deserialized: CanonicalRouteSpec = serde_json::from_str(&json).unwrap();
        assert_eq!(spec, deserialized);
    }

    #[test]
    fn canonical_step_spec_serde_variants() {
        let steps = vec![
            CanonicalStepSpec::To {
                uri: "direct:a".into(),
            },
            CanonicalStepSpec::Log {
                message: "msg".into(),
            },
            CanonicalStepSpec::WireTap {
                uri: "direct:audit".into(),
            },
            CanonicalStepSpec::Stop,
            CanonicalStepSpec::Delay {
                delay_ms: 100,
                dynamic_header: None,
            },
        ];
        let json = serde_json::to_string_pretty(&steps).unwrap();
        let back: Vec<CanonicalStepSpec> = serde_json::from_str(&json).unwrap();
        assert_eq!(steps, back);
    }

    #[test]
    fn canonical_route_spec_json_schema_generates() {
        let schema = schemars::schema_for!(CanonicalRouteSpec);
        let json = serde_json::to_string(&schema).unwrap();
        assert!(json.contains("CanonicalRouteSpec"));
        assert!(json.contains("route_id"));
    }

    #[test]
    fn canonical_json_schema_has_no_function_step() {
        let schema = schemars::schema_for!(CanonicalRouteSpec);
        let json = serde_json::to_string(&schema).unwrap();
        assert!(
            !json.contains("\"function\""),
            "canonical JSON schema must not contain 'function' step"
        );
    }

    #[test]
    fn canonical_contract_does_not_support_function() {
        assert!(
            !canonical_contract_supports_step("function"),
            "function must not be in CANONICAL_CONTRACT_SUPPORTED_STEPS"
        );
    }

    #[test]
    fn runtime_command_result_all_variants_are_distinct() {
        let accepted = RuntimeCommandResult::Accepted;
        let dup = RuntimeCommandResult::Duplicate {
            command_id: "c1".into(),
        };
        let registered = RuntimeCommandResult::RouteRegistered {
            route_id: "r1".into(),
        };
        let changed = RuntimeCommandResult::RouteStateChanged {
            route_id: "r1".into(),
            status: "Started".into(),
        };

        assert_ne!(accepted, dup);
        assert_ne!(dup, registered);
        assert_ne!(registered, changed);

        let dup2 = RuntimeCommandResult::Duplicate {
            command_id: "c1".into(),
        };
        assert_eq!(dup, dup2);
    }

    #[test]
    fn runtime_event_serialization_round_trip() {
        let event = RuntimeEvent::RouteFailed {
            route_id: "route-a".to_string(),
            error: "boom".to_string(),
        };
        let json = serde_json::to_string(&event).unwrap();
        let back: RuntimeEvent = serde_json::from_str(&json).unwrap();
        assert_eq!(event, back);
    }

    #[test]
    fn noop_runtime_execute_and_ask_return_expected_shapes() {
        let rt = NoopRuntime;
        let cmd = RuntimeCommand::RegisterRoute {
            spec: CanonicalRouteSpec::new("r2", "timer:tick"),
            command_id: "c1".into(),
            causation_id: None,
        };
        let cmd_result = block_on(rt.execute(cmd)).unwrap();
        assert_eq!(
            cmd_result,
            RuntimeCommandResult::RouteRegistered {
                route_id: "r2".into()
            }
        );

        let query_result = block_on(rt.ask(RuntimeQuery::GetRouteStatus {
            route_id: "r2".into(),
        }))
        .unwrap();
        assert_eq!(
            query_result,
            RuntimeQueryResult::RouteStatus {
                route_id: "r2".into(),
                status: "Started".into()
            }
        );
    }

    #[test]
    fn canonical_contract_name_and_version_constants_match() {
        assert_eq!(CANONICAL_CONTRACT_NAME, "canonical-v1");
        assert_eq!(CANONICAL_CONTRACT_VERSION, 2);
    }

    #[test]
    fn canonical_concurrency_spec_rejects_zero_max() {
        let spec = CanonicalRouteSpec::new("r1", "timer:tick")
            .with_concurrency(CanonicalConcurrencySpec::Concurrent { max: 0 });
        let err = spec.validate_contract().unwrap_err().to_string();
        assert!(err.contains("concurrency max must be > 0"), "{err}");
    }

    #[test]
    fn canonical_v2_round_trip() {
        let spec = CanonicalRouteSpec::new("r1", "timer:tick")
            .with_auto_startup(false)
            .with_startup_order(42)
            .with_concurrency(CanonicalConcurrencySpec::Concurrent { max: 8 });
        spec.validate_contract().unwrap();

        // Actually round-trip through JSON so the v2-only fields are proven to survive
        // serialization, which is what the test name promises.
        let json = serde_json::to_string(&spec).unwrap();
        let restored: CanonicalRouteSpec = serde_json::from_str(&json).unwrap();
        assert_eq!(restored, spec);
        restored.validate_contract().unwrap();
    }

    #[test]
    fn canonical_v2_version_is_2() {
        assert_eq!(CANONICAL_CONTRACT_VERSION, 2);
    }

    #[test]
    fn canonical_loss_report_builder() {
        let report =
            CanonicalLossReport::from_field("error_handler", "not supported by canonical path", 2);
        assert_eq!(report.dropped_fields.len(), 1);
        assert_eq!(report.dropped_fields[0].field, "error_handler");
    }

    #[test]
    fn canonical_v1_json_deserializes_in_v2() {
        // A v1 payload omits the v2-only optional fields; they must come back as `None`.
        let json = r#"{"route_id":"r1","from":"timer:tick","steps":[],"version":1}"#;
        let spec: CanonicalRouteSpec = serde_json::from_str(json).unwrap();
        assert_eq!(spec.route_id, "r1");
        assert!(spec.auto_startup.is_none());
        assert!(spec.startup_order.is_none());
        assert!(spec.concurrency.is_none());
        spec.validate_contract().unwrap();
    }
}