1use async_trait::async_trait;
2use serde::{Deserialize, Serialize};
3
4use crate::CamelError;
5use crate::declarative::LanguageExpressionDef;
6
/// Name identifying this revision of the canonical route contract.
pub const CANONICAL_CONTRACT_NAME: &str = "canonical-v1";

/// Numeric contract version; a spec whose `version` differs from this value
/// fails [`CanonicalRouteSpec::validate_contract`].
pub const CANONICAL_CONTRACT_VERSION: u32 = 1;

/// Step names the canonical contract supports.
///
/// Each entry is the snake_case name of a [`CanonicalStepSpec`] variant.
pub const CANONICAL_CONTRACT_SUPPORTED_STEPS: &[&str] = &[
    "to",
    "log",
    "wire_tap",
    "script",
    "filter",
    "choice",
    "split",
    "aggregate",
    "stop",
    "delay",
];

/// Supported steps that are accepted only in their declarative/serializable
/// expression form (closure/processor variants are outside canonical v1).
pub const CANONICAL_CONTRACT_DECLARATIVE_ONLY_STEPS: &[&str] =
    &["script", "filter", "choice", "split"];

/// Declarative steps that are declared out-of-scope for canonical v1.
pub const CANONICAL_CONTRACT_EXCLUDED_DECLARATIVE_STEPS: &[&str] = &[
    "set_header",
    "set_property",
    "set_body",
    "multicast",
    "convert_body_to",
    "bean",
    "marshal",
    "unmarshal",
];

/// Rust-only programmable steps that cannot be represented in the canonical
/// v1 contract.
pub const CANONICAL_CONTRACT_RUST_ONLY_STEPS: &[&str] = &[
    "processor",
    "process",
    "process_fn",
    "map_body",
    "set_body_fn",
    "set_header_fn",
];
41
42pub fn canonical_contract_supports_step(step: &str) -> bool {
43 CANONICAL_CONTRACT_SUPPORTED_STEPS.contains(&step)
44}
45
46pub fn canonical_contract_rejection_reason(step: &str) -> Option<&'static str> {
47 if CANONICAL_CONTRACT_EXCLUDED_DECLARATIVE_STEPS.contains(&step) {
48 return Some(
49 "declared out-of-scope for canonical v1; use declarative route compilation path outside CQRS canonical commands",
50 );
51 }
52
53 if CANONICAL_CONTRACT_RUST_ONLY_STEPS.contains(&step) {
54 return Some("rust-only programmable step; not representable in canonical v1 contract");
55 }
56
57 if canonical_contract_supports_step(step)
58 && CANONICAL_CONTRACT_DECLARATIVE_ONLY_STEPS.contains(&step)
59 {
60 return Some(
61 "supported only as declarative/serializable expression form; closure/processor variants are outside canonical v1",
62 );
63 }
64
65 None
66}
67
68#[derive(
69 Debug,
70 Clone,
71 PartialEq,
72 Eq,
73 serde::Serialize,
74 serde::Deserialize,
75 schemars::JsonSchema,
76 ts_rs::TS,
77)]
78#[serde(rename_all = "snake_case")]
79#[ts(rename_all = "snake_case")]
80pub struct CanonicalRouteSpec {
81 pub route_id: String,
90 pub from: String,
91 pub steps: Vec<CanonicalStepSpec>,
92 pub circuit_breaker: Option<CanonicalCircuitBreakerSpec>,
93 pub version: u32,
94}
95
96#[derive(
97 Debug,
98 Clone,
99 PartialEq,
100 Eq,
101 serde::Serialize,
102 serde::Deserialize,
103 schemars::JsonSchema,
104 ts_rs::TS,
105)]
106#[serde(tag = "step", content = "config", rename_all = "snake_case")]
107#[ts(rename_all = "snake_case")]
108pub enum CanonicalStepSpec {
109 To {
110 uri: String,
111 },
112 Log {
113 message: String,
114 },
115 WireTap {
116 uri: String,
117 },
118 Script {
119 expression: LanguageExpressionDef,
120 },
121 Filter {
122 predicate: LanguageExpressionDef,
123 steps: Vec<CanonicalStepSpec>,
124 },
125 Choice {
126 whens: Vec<CanonicalWhenSpec>,
127 otherwise: Option<Vec<CanonicalStepSpec>>,
128 },
129 Split {
130 expression: CanonicalSplitExpressionSpec,
131 aggregation: CanonicalSplitAggregationSpec,
132 parallel: bool,
133 parallel_limit: Option<usize>,
134 stop_on_exception: bool,
135 steps: Vec<CanonicalStepSpec>,
136 },
137 Aggregate(CanonicalAggregateSpec),
138 Stop,
139 Delay {
140 #[ts(type = "number")]
141 delay_ms: u64,
142 dynamic_header: Option<String>,
143 },
144}
145
146#[derive(
147 Debug,
148 Clone,
149 PartialEq,
150 Eq,
151 serde::Serialize,
152 serde::Deserialize,
153 schemars::JsonSchema,
154 ts_rs::TS,
155)]
156#[serde(rename_all = "snake_case")]
157#[ts(rename_all = "snake_case")]
158pub struct CanonicalWhenSpec {
159 pub predicate: LanguageExpressionDef,
160 pub steps: Vec<CanonicalStepSpec>,
161}
162
163#[derive(
164 Debug,
165 Clone,
166 PartialEq,
167 Eq,
168 serde::Serialize,
169 serde::Deserialize,
170 schemars::JsonSchema,
171 ts_rs::TS,
172)]
173#[serde(rename_all = "snake_case")]
174#[ts(rename_all = "snake_case")]
175pub enum CanonicalSplitExpressionSpec {
176 BodyLines,
177 BodyJsonArray,
178 Language(LanguageExpressionDef),
179}
180
181#[derive(
182 Debug,
183 Clone,
184 PartialEq,
185 Eq,
186 serde::Serialize,
187 serde::Deserialize,
188 schemars::JsonSchema,
189 ts_rs::TS,
190)]
191#[serde(rename_all = "snake_case")]
192#[ts(rename_all = "snake_case")]
193pub enum CanonicalSplitAggregationSpec {
194 LastWins,
195 CollectAll,
196 Original,
197}
198
199#[derive(
200 Debug,
201 Clone,
202 PartialEq,
203 Eq,
204 serde::Serialize,
205 serde::Deserialize,
206 schemars::JsonSchema,
207 ts_rs::TS,
208)]
209#[serde(rename_all = "snake_case")]
210#[ts(rename_all = "snake_case")]
211pub enum CanonicalAggregateStrategySpec {
212 CollectAll,
213}
214
215#[derive(
216 Debug,
217 Clone,
218 PartialEq,
219 Eq,
220 serde::Serialize,
221 serde::Deserialize,
222 schemars::JsonSchema,
223 ts_rs::TS,
224)]
225#[serde(rename_all = "snake_case")]
226#[ts(rename_all = "snake_case")]
227pub struct CanonicalAggregateSpec {
228 pub header: String,
229 pub completion_size: Option<usize>,
230 #[ts(type = "number")]
231 pub completion_timeout_ms: Option<u64>,
232 pub correlation_key: Option<String>,
233 pub force_completion_on_stop: Option<bool>,
234 pub discard_on_timeout: Option<bool>,
235 pub strategy: CanonicalAggregateStrategySpec,
236 pub max_buckets: Option<usize>,
237 #[ts(type = "number")]
238 pub bucket_ttl_ms: Option<u64>,
239}
240
241#[derive(
242 Debug,
243 Clone,
244 PartialEq,
245 Eq,
246 serde::Serialize,
247 serde::Deserialize,
248 schemars::JsonSchema,
249 ts_rs::TS,
250)]
251#[serde(rename_all = "snake_case")]
252#[ts(rename_all = "snake_case")]
253pub struct CanonicalCircuitBreakerSpec {
254 pub failure_threshold: u32,
255 #[ts(type = "number")]
256 pub open_duration_ms: u64,
257}
258
259impl CanonicalRouteSpec {
260 pub fn new(route_id: impl Into<String>, from: impl Into<String>) -> Self {
261 Self {
262 route_id: route_id.into(),
263 from: from.into(),
264 steps: Vec::new(),
265 circuit_breaker: None,
266 version: CANONICAL_CONTRACT_VERSION,
267 }
268 }
269
270 pub fn validate_contract(&self) -> Result<(), CamelError> {
271 if self.route_id.trim().is_empty() {
272 return Err(CamelError::RouteError(
273 "canonical contract violation: route_id cannot be empty".to_string(),
274 ));
275 }
276 if self.from.trim().is_empty() {
277 return Err(CamelError::RouteError(
278 "canonical contract violation: from cannot be empty".to_string(),
279 ));
280 }
281 if self.version != CANONICAL_CONTRACT_VERSION {
282 return Err(CamelError::RouteError(format!(
283 "canonical contract violation: expected version {}, got {}",
284 CANONICAL_CONTRACT_VERSION, self.version
285 )));
286 }
287 validate_steps(&self.steps)?;
288 if let Some(cb) = &self.circuit_breaker {
289 if cb.failure_threshold == 0 {
290 return Err(CamelError::RouteError(
291 "canonical contract violation: circuit_breaker.failure_threshold must be > 0"
292 .to_string(),
293 ));
294 }
295 if cb.open_duration_ms == 0 {
296 return Err(CamelError::RouteError(
297 "canonical contract violation: circuit_breaker.open_duration_ms must be > 0"
298 .to_string(),
299 ));
300 }
301 }
302 Ok(())
303 }
304}
305
306fn validate_steps(steps: &[CanonicalStepSpec]) -> Result<(), CamelError> {
307 for step in steps {
308 match step {
309 CanonicalStepSpec::To { uri } | CanonicalStepSpec::WireTap { uri } => {
310 if uri.trim().is_empty() {
311 return Err(CamelError::RouteError(
312 "canonical contract violation: endpoint uri cannot be empty".to_string(),
313 ));
314 }
315 }
316 CanonicalStepSpec::Filter { steps, .. } => validate_steps(steps)?,
317 CanonicalStepSpec::Choice { whens, otherwise } => {
318 for when in whens {
319 validate_steps(&when.steps)?;
320 }
321 if let Some(otherwise) = otherwise {
322 validate_steps(otherwise)?;
323 }
324 }
325 CanonicalStepSpec::Split {
326 parallel_limit,
327 steps,
328 ..
329 } => {
330 if let Some(limit) = parallel_limit
331 && *limit == 0
332 {
333 return Err(CamelError::RouteError(
334 "canonical contract violation: split.parallel_limit must be > 0"
335 .to_string(),
336 ));
337 }
338 validate_steps(steps)?;
339 }
340 CanonicalStepSpec::Aggregate(config) => {
341 if config.header.trim().is_empty() {
342 return Err(CamelError::RouteError(
343 "canonical contract violation: aggregate.header cannot be empty"
344 .to_string(),
345 ));
346 }
347 if let Some(size) = config.completion_size
348 && size == 0
349 {
350 return Err(CamelError::RouteError(
351 "canonical contract violation: aggregate.completion_size must be > 0"
352 .to_string(),
353 ));
354 }
355 }
356 CanonicalStepSpec::Log { .. }
357 | CanonicalStepSpec::Script { .. }
358 | CanonicalStepSpec::Stop
359 | CanonicalStepSpec::Delay { .. } => {}
360 }
361 }
362 Ok(())
363}
364
365#[derive(Debug, Clone, PartialEq, Eq)]
366pub enum RuntimeCommand {
367 RegisterRoute {
368 spec: CanonicalRouteSpec,
369 command_id: String,
370 causation_id: Option<String>,
371 },
372 StartRoute {
373 route_id: String,
374 command_id: String,
375 causation_id: Option<String>,
376 },
377 StopRoute {
378 route_id: String,
379 command_id: String,
380 causation_id: Option<String>,
381 },
382 SuspendRoute {
383 route_id: String,
384 command_id: String,
385 causation_id: Option<String>,
386 },
387 ResumeRoute {
388 route_id: String,
389 command_id: String,
390 causation_id: Option<String>,
391 },
392 ReloadRoute {
393 route_id: String,
394 command_id: String,
395 causation_id: Option<String>,
396 },
397 FailRoute {
401 route_id: String,
402 error: String,
403 command_id: String,
404 causation_id: Option<String>,
405 },
406 RemoveRoute {
407 route_id: String,
408 command_id: String,
409 causation_id: Option<String>,
410 },
411}
412
413impl RuntimeCommand {
414 pub fn command_id(&self) -> &str {
415 match self {
416 RuntimeCommand::RegisterRoute { command_id, .. }
417 | RuntimeCommand::StartRoute { command_id, .. }
418 | RuntimeCommand::StopRoute { command_id, .. }
419 | RuntimeCommand::SuspendRoute { command_id, .. }
420 | RuntimeCommand::ResumeRoute { command_id, .. }
421 | RuntimeCommand::ReloadRoute { command_id, .. }
422 | RuntimeCommand::FailRoute { command_id, .. }
423 | RuntimeCommand::RemoveRoute { command_id, .. } => command_id,
424 }
425 }
426
427 pub fn causation_id(&self) -> Option<&str> {
428 match self {
429 RuntimeCommand::RegisterRoute { causation_id, .. }
430 | RuntimeCommand::StartRoute { causation_id, .. }
431 | RuntimeCommand::StopRoute { causation_id, .. }
432 | RuntimeCommand::SuspendRoute { causation_id, .. }
433 | RuntimeCommand::ResumeRoute { causation_id, .. }
434 | RuntimeCommand::ReloadRoute { causation_id, .. }
435 | RuntimeCommand::FailRoute { causation_id, .. }
436 | RuntimeCommand::RemoveRoute { causation_id, .. } => causation_id.as_deref(),
437 }
438 }
439}
440
/// Outcome reported by [`RuntimeCommandBus::execute`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RuntimeCommandResult {
    /// The command was accepted; carries no further detail.
    Accepted,
    /// Identifies a command by its `command_id`.
    // NOTE(review): presumably returned when that command id was already
    // handled — confirm against the bus implementation.
    Duplicate {
        command_id: String,
    },
    /// Names the route that was registered.
    RouteRegistered {
        route_id: String,
    },
    /// Names a route together with a textual `status`.
    RouteStateChanged {
        route_id: String,
        status: String,
    },
}
448
/// Read-side requests accepted by a [`RuntimeQueryBus`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RuntimeQuery {
    /// Asks for the status of the route named by `route_id`.
    GetRouteStatus { route_id: String },
    /// Asks for the in-flight count of the route named by `route_id`.
    InFlightCount { route_id: String },
    /// Asks for the known routes; takes no arguments.
    ListRoutes,
}
462
/// Answer returned by [`RuntimeQueryBus::ask`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RuntimeQueryResult {
    /// In-flight `count` for the route named by `route_id`.
    InFlightCount {
        route_id: String,
        count: u64,
    },
    /// Names a route that could not be found.
    RouteNotFound {
        route_id: String,
    },
    /// Textual `status` of the route named by `route_id`.
    RouteStatus {
        route_id: String,
        status: String,
    },
    /// Identifiers of the listed routes.
    Routes {
        route_ids: Vec<String>,
    },
}
470
471#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
472pub enum RuntimeEvent {
473 RouteRegistered { route_id: String },
474 RouteStartRequested { route_id: String },
475 RouteStarted { route_id: String },
476 RouteFailed { route_id: String, error: String },
477 RouteStopped { route_id: String },
478 RouteSuspended { route_id: String },
479 RouteResumed { route_id: String },
480 RouteReloaded { route_id: String },
481 RouteRemoved { route_id: String },
482}
483
484#[async_trait]
485pub trait RuntimeCommandBus: Send + Sync {
486 async fn execute(&self, cmd: RuntimeCommand) -> Result<RuntimeCommandResult, CamelError>;
487}
488
489#[async_trait]
490pub trait RuntimeQueryBus: Send + Sync {
491 async fn ask(&self, query: RuntimeQuery) -> Result<RuntimeQueryResult, CamelError>;
492}
493
494pub trait RuntimeHandle: RuntimeCommandBus + RuntimeQueryBus {}
495
496impl<T> RuntimeHandle for T where T: RuntimeCommandBus + RuntimeQueryBus {}
497
#[cfg(test)]
mod tests {
    use super::*;
    use async_trait::async_trait;
    use futures::executor::block_on;

    /// Runtime double that answers every command and query with a fixed shape.
    struct NoopRuntime;

    #[async_trait]
    impl RuntimeCommandBus for NoopRuntime {
        async fn execute(&self, cmd: RuntimeCommand) -> Result<RuntimeCommandResult, CamelError> {
            let outcome = match cmd {
                RuntimeCommand::RegisterRoute { spec, .. } => {
                    RuntimeCommandResult::RouteRegistered {
                        route_id: spec.route_id,
                    }
                }
                RuntimeCommand::StartRoute { route_id, .. }
                | RuntimeCommand::StopRoute { route_id, .. }
                | RuntimeCommand::SuspendRoute { route_id, .. }
                | RuntimeCommand::ResumeRoute { route_id, .. }
                | RuntimeCommand::ReloadRoute { route_id, .. }
                | RuntimeCommand::FailRoute { route_id, .. }
                | RuntimeCommand::RemoveRoute { route_id, .. } => {
                    RuntimeCommandResult::RouteStateChanged {
                        route_id,
                        status: String::from("ok"),
                    }
                }
            };
            Ok(outcome)
        }
    }

    #[async_trait]
    impl RuntimeQueryBus for NoopRuntime {
        async fn ask(&self, query: RuntimeQuery) -> Result<RuntimeQueryResult, CamelError> {
            let answer = match query {
                RuntimeQuery::GetRouteStatus { route_id } => RuntimeQueryResult::RouteStatus {
                    route_id,
                    status: String::from("Started"),
                },
                RuntimeQuery::InFlightCount { route_id } => {
                    RuntimeQueryResult::InFlightCount { route_id, count: 0 }
                }
                RuntimeQuery::ListRoutes => RuntimeQueryResult::Routes {
                    route_ids: vec![String::from("r1")],
                },
            };
            Ok(answer)
        }
    }

    /// Baseline valid spec most tests start from.
    fn timer_route() -> CanonicalRouteSpec {
        CanonicalRouteSpec::new("r1", "timer:tick")
    }

    /// Validates `spec`, expecting a failure, and returns the rendered error.
    fn contract_violation(spec: &CanonicalRouteSpec) -> String {
        spec.validate_contract()
            .expect_err("spec should violate the canonical contract")
            .to_string()
    }

    /// Aggregate step in which only `header` and `completion_size` vary.
    fn aggregate_step(header: &str, completion_size: Option<usize>) -> CanonicalStepSpec {
        CanonicalStepSpec::Aggregate(CanonicalAggregateSpec {
            header: header.to_string(),
            completion_size,
            completion_timeout_ms: None,
            correlation_key: None,
            force_completion_on_stop: None,
            discard_on_timeout: None,
            strategy: CanonicalAggregateStrategySpec::CollectAll,
            max_buckets: None,
            bucket_ttl_ms: None,
        })
    }

    /// JSON rendering of the schema generated for `CanonicalRouteSpec`.
    fn route_schema_json() -> String {
        let schema = schemars::schema_for!(CanonicalRouteSpec);
        serde_json::to_string(&schema).unwrap()
    }

    #[test]
    fn command_and_query_ids_are_exposed() {
        let start = RuntimeCommand::StartRoute {
            route_id: String::from("r1"),
            command_id: String::from("c1"),
            causation_id: None,
        };
        assert_eq!(start.command_id(), "c1");
    }

    #[test]
    fn canonical_spec_requires_route_id_and_from() {
        let CanonicalRouteSpec {
            route_id,
            from,
            steps,
            circuit_breaker,
            version,
        } = timer_route();

        assert_eq!(route_id, "r1");
        assert_eq!(from, "timer:tick");
        assert_eq!(version, CANONICAL_CONTRACT_VERSION);
        assert!(steps.is_empty());
        assert!(circuit_breaker.is_none());
    }

    #[test]
    fn canonical_contract_rejects_invalid_version() {
        let spec = CanonicalRouteSpec {
            version: 2,
            ..timer_route()
        };
        assert!(contract_violation(&spec).contains("expected version"));
    }

    #[test]
    fn canonical_contract_declares_subset_scope() {
        for supported in ["to", "split"] {
            assert!(canonical_contract_supports_step(supported));
        }
        for excluded in ["set_header", "set_property"] {
            assert!(!canonical_contract_supports_step(excluded));
            assert!(CANONICAL_CONTRACT_EXCLUDED_DECLARATIVE_STEPS.contains(&excluded));
        }

        assert!(CANONICAL_CONTRACT_DECLARATIVE_ONLY_STEPS.contains(&"split"));
        assert!(CANONICAL_CONTRACT_RUST_ONLY_STEPS.contains(&"processor"));
    }

    #[test]
    fn canonical_contract_rejection_reason_is_explicit() {
        let set_header_reason = canonical_contract_rejection_reason("set_header")
            .expect("set_header should have explicit reason");
        assert!(set_header_reason.contains("out-of-scope"));

        let set_property_reason = canonical_contract_rejection_reason("set_property")
            .expect("set_property should have explicit reason");
        assert!(set_property_reason.contains("out-of-scope"));

        let processor_reason = canonical_contract_rejection_reason("processor")
            .expect("processor should be rust-only");
        assert!(processor_reason.contains("rust-only"));

        let split_reason = canonical_contract_rejection_reason("split")
            .expect("split should require declarative form");
        assert!(split_reason.contains("declarative"));
    }

    #[test]
    fn command_causation_id_is_exposed() {
        let stop = RuntimeCommand::StopRoute {
            route_id: String::from("r1"),
            command_id: String::from("c2"),
            causation_id: Some(String::from("c1")),
        };
        assert_eq!(stop.command_id(), "c2");
        assert_eq!(stop.causation_id(), Some("c1"));
    }

    #[test]
    fn canonical_contract_rejects_empty_route_id_and_from() {
        let blank_id = CanonicalRouteSpec::new(" ", "timer:tick");
        assert!(contract_violation(&blank_id).contains("route_id cannot be empty"));

        let blank_from = CanonicalRouteSpec::new("r1", " ");
        assert!(contract_violation(&blank_from).contains("from cannot be empty"));
    }

    #[test]
    fn canonical_contract_rejects_invalid_nested_steps() {
        let mut spec = timer_route();

        spec.steps = vec![CanonicalStepSpec::Split {
            expression: CanonicalSplitExpressionSpec::BodyLines,
            aggregation: CanonicalSplitAggregationSpec::CollectAll,
            parallel: true,
            parallel_limit: Some(0),
            stop_on_exception: false,
            steps: vec![CanonicalStepSpec::To {
                uri: String::from("log:ok"),
            }],
        }];
        assert!(contract_violation(&spec).contains("split.parallel_limit must be > 0"));

        spec.steps = vec![CanonicalStepSpec::To {
            uri: String::from(" "),
        }];
        assert!(contract_violation(&spec).contains("endpoint uri cannot be empty"));
    }

    #[test]
    fn canonical_contract_rejects_invalid_aggregate_and_circuit_breaker() {
        let mut spec = timer_route();

        spec.steps = vec![aggregate_step(" ", Some(1))];
        assert!(contract_violation(&spec).contains("aggregate.header cannot be empty"));

        spec.steps = vec![aggregate_step("k", Some(0))];
        assert!(contract_violation(&spec).contains("aggregate.completion_size must be > 0"));

        spec.steps = vec![];
        spec.circuit_breaker = Some(CanonicalCircuitBreakerSpec {
            failure_threshold: 0,
            open_duration_ms: 10,
        });
        assert!(contract_violation(&spec).contains("failure_threshold must be > 0"));

        spec.circuit_breaker = Some(CanonicalCircuitBreakerSpec {
            failure_threshold: 1,
            open_duration_ms: 0,
        });
        assert!(contract_violation(&spec).contains("open_duration_ms must be > 0"));
    }

    #[test]
    fn canonical_contract_rejection_reason_none_for_regular_steps() {
        for unrestricted in ["to", "unknown-step"] {
            assert!(canonical_contract_rejection_reason(unrestricted).is_none());
        }
    }

    #[test]
    fn command_helpers_cover_all_variants() {
        let route_id = || String::from("r1");
        let cmds = [
            RuntimeCommand::RegisterRoute {
                spec: timer_route(),
                command_id: String::from("c1"),
                causation_id: Some(String::from("root")),
            },
            RuntimeCommand::StartRoute {
                route_id: route_id(),
                command_id: String::from("c2"),
                causation_id: None,
            },
            RuntimeCommand::StopRoute {
                route_id: route_id(),
                command_id: String::from("c3"),
                causation_id: None,
            },
            RuntimeCommand::SuspendRoute {
                route_id: route_id(),
                command_id: String::from("c4"),
                causation_id: None,
            },
            RuntimeCommand::ResumeRoute {
                route_id: route_id(),
                command_id: String::from("c5"),
                causation_id: None,
            },
            RuntimeCommand::ReloadRoute {
                route_id: route_id(),
                command_id: String::from("c6"),
                causation_id: None,
            },
            RuntimeCommand::FailRoute {
                route_id: route_id(),
                error: String::from("boom"),
                command_id: String::from("c7"),
                causation_id: None,
            },
            RuntimeCommand::RemoveRoute {
                route_id: route_id(),
                command_id: String::from("c8"),
                causation_id: None,
            },
        ];

        let ids: Vec<&str> = cmds.iter().map(|cmd| cmd.command_id()).collect();
        assert_eq!(ids, ["c1", "c2", "c3", "c4", "c5", "c6", "c7", "c8"]);
        assert_eq!(cmds[0].causation_id(), Some("root"));
        assert_eq!(cmds[1].causation_id(), None);
    }

    #[test]
    fn canonical_route_spec_serde_roundtrip() {
        let mut spec = CanonicalRouteSpec::new("test-route", "timer:tick?period=1000");
        spec.steps.extend([
            CanonicalStepSpec::Log {
                message: String::from("Hello"),
            },
            CanonicalStepSpec::To {
                uri: String::from("log:info"),
            },
            CanonicalStepSpec::Stop,
        ]);

        let json = serde_json::to_string(&spec).unwrap();
        let deserialized: CanonicalRouteSpec = serde_json::from_str(&json).unwrap();
        assert_eq!(spec, deserialized);
    }

    #[test]
    fn canonical_step_spec_serde_variants() {
        let steps = vec![
            CanonicalStepSpec::To {
                uri: String::from("direct:a"),
            },
            CanonicalStepSpec::Log {
                message: String::from("msg"),
            },
            CanonicalStepSpec::WireTap {
                uri: String::from("direct:audit"),
            },
            CanonicalStepSpec::Stop,
            CanonicalStepSpec::Delay {
                delay_ms: 100,
                dynamic_header: None,
            },
        ];

        let json = serde_json::to_string_pretty(&steps).unwrap();
        let back: Vec<CanonicalStepSpec> = serde_json::from_str(&json).unwrap();
        assert_eq!(steps, back);
    }

    #[test]
    fn canonical_route_spec_json_schema_generates() {
        let json = route_schema_json();
        assert!(json.contains("CanonicalRouteSpec"));
        assert!(json.contains("route_id"));
    }

    #[test]
    fn canonical_json_schema_has_no_function_step() {
        let json = route_schema_json();
        assert!(
            !json.contains("\"function\""),
            "canonical JSON schema must not contain 'function' step"
        );
    }

    #[test]
    fn canonical_contract_does_not_support_function() {
        assert!(
            !canonical_contract_supports_step("function"),
            "function must not be in CANONICAL_CONTRACT_SUPPORTED_STEPS"
        );
    }

    #[test]
    fn runtime_command_result_all_variants_are_distinct() {
        let accepted = RuntimeCommandResult::Accepted;
        let dup = RuntimeCommandResult::Duplicate {
            command_id: String::from("c1"),
        };
        let registered = RuntimeCommandResult::RouteRegistered {
            route_id: String::from("r1"),
        };
        let changed = RuntimeCommandResult::RouteStateChanged {
            route_id: String::from("r1"),
            status: String::from("Started"),
        };

        assert_ne!(accepted, dup);
        assert_ne!(dup, registered);
        assert_ne!(registered, changed);

        // Equality is by value, not by identity.
        let dup2 = RuntimeCommandResult::Duplicate {
            command_id: String::from("c1"),
        };
        assert_eq!(dup, dup2);
    }

    #[test]
    fn runtime_event_serialization_round_trip() {
        let event = RuntimeEvent::RouteFailed {
            route_id: String::from("route-a"),
            error: String::from("boom"),
        };
        let json = serde_json::to_string(&event).unwrap();
        let back: RuntimeEvent = serde_json::from_str(&json).unwrap();
        assert_eq!(event, back);
    }

    #[test]
    fn noop_runtime_execute_and_ask_return_expected_shapes() {
        let rt = NoopRuntime;

        let register = RuntimeCommand::RegisterRoute {
            spec: CanonicalRouteSpec::new("r2", "timer:tick"),
            command_id: String::from("c1"),
            causation_id: None,
        };
        let cmd_result = block_on(rt.execute(register)).unwrap();
        assert_eq!(
            cmd_result,
            RuntimeCommandResult::RouteRegistered {
                route_id: String::from("r2")
            }
        );

        let status_query = RuntimeQuery::GetRouteStatus {
            route_id: String::from("r2"),
        };
        let query_result = block_on(rt.ask(status_query)).unwrap();
        assert_eq!(
            query_result,
            RuntimeQueryResult::RouteStatus {
                route_id: String::from("r2"),
                status: String::from("Started")
            }
        );
    }

    #[test]
    fn canonical_contract_name_and_version_constants_match() {
        assert_eq!(CANONICAL_CONTRACT_NAME, "canonical-v1");
        assert_eq!(CANONICAL_CONTRACT_VERSION, 1);
    }
}