1use async_trait::async_trait;
2use serde::{Deserialize, Serialize};
3
4use crate::CamelError;
5use crate::declarative::LanguageExpressionDef;
6
/// Name identifying the canonical route contract.
pub const CANONICAL_CONTRACT_NAME: &str = "canonical-v1";

/// Version number of the canonical contract; freshly built specs carry this value.
pub const CANONICAL_CONTRACT_VERSION: u32 = 1;

/// Step names that the canonical contract can represent.
pub const CANONICAL_CONTRACT_SUPPORTED_STEPS: &[&str] = &[
    "to",
    "log",
    "wire_tap",
    "script",
    "filter",
    "choice",
    "split",
    "aggregate",
    "stop",
    "delay",
];

/// Supported step names that are accepted only in their declarative
/// (serializable expression) form.
pub const CANONICAL_CONTRACT_DECLARATIVE_ONLY_STEPS: &[&str] =
    &["script", "filter", "choice", "split"];

/// Declarative step names that are explicitly out of scope for canonical v1.
pub const CANONICAL_CONTRACT_EXCLUDED_DECLARATIVE_STEPS: &[&str] = &[
    "set_header",
    "set_body",
    "multicast",
    "convert_body_to",
    "bean",
    "marshal",
    "unmarshal",
];

/// Programmable, Rust-only step names that the canonical contract cannot represent.
pub const CANONICAL_CONTRACT_RUST_ONLY_STEPS: &[&str] = &[
    "processor",
    "process",
    "process_fn",
    "map_body",
    "set_body_fn",
    "set_header_fn",
];
40
41pub fn canonical_contract_supports_step(step: &str) -> bool {
42 CANONICAL_CONTRACT_SUPPORTED_STEPS.contains(&step)
43}
44
45pub fn canonical_contract_rejection_reason(step: &str) -> Option<&'static str> {
46 if CANONICAL_CONTRACT_EXCLUDED_DECLARATIVE_STEPS.contains(&step) {
47 return Some(
48 "declared out-of-scope for canonical v1; use declarative route compilation path outside CQRS canonical commands",
49 );
50 }
51
52 if CANONICAL_CONTRACT_RUST_ONLY_STEPS.contains(&step) {
53 return Some("rust-only programmable step; not representable in canonical v1 contract");
54 }
55
56 if canonical_contract_supports_step(step)
57 && CANONICAL_CONTRACT_DECLARATIVE_ONLY_STEPS.contains(&step)
58 {
59 return Some(
60 "supported only as declarative/serializable expression form; closure/processor variants are outside canonical v1",
61 );
62 }
63
64 None
65}
66
67#[derive(Debug, Clone, PartialEq, Eq)]
68pub struct CanonicalRouteSpec {
69 pub route_id: String,
75 pub from: String,
76 pub steps: Vec<CanonicalStepSpec>,
77 pub circuit_breaker: Option<CanonicalCircuitBreakerSpec>,
78 pub version: u32,
79}
80
81#[derive(Debug, Clone, PartialEq, Eq)]
82pub enum CanonicalStepSpec {
83 To {
84 uri: String,
85 },
86 Log {
87 message: String,
88 },
89 WireTap {
90 uri: String,
91 },
92 Script {
93 expression: LanguageExpressionDef,
94 },
95 Filter {
96 predicate: LanguageExpressionDef,
97 steps: Vec<CanonicalStepSpec>,
98 },
99 Choice {
100 whens: Vec<CanonicalWhenSpec>,
101 otherwise: Option<Vec<CanonicalStepSpec>>,
102 },
103 Split {
104 expression: CanonicalSplitExpressionSpec,
105 aggregation: CanonicalSplitAggregationSpec,
106 parallel: bool,
107 parallel_limit: Option<usize>,
108 stop_on_exception: bool,
109 steps: Vec<CanonicalStepSpec>,
110 },
111 Aggregate {
112 config: CanonicalAggregateSpec,
113 },
114 Stop,
115 Delay {
116 delay_ms: u64,
117 dynamic_header: Option<String>,
118 },
119}
120
121#[derive(Debug, Clone, PartialEq, Eq)]
122pub struct CanonicalWhenSpec {
123 pub predicate: LanguageExpressionDef,
124 pub steps: Vec<CanonicalStepSpec>,
125}
126
127#[derive(Debug, Clone, PartialEq, Eq)]
128pub enum CanonicalSplitExpressionSpec {
129 BodyLines,
130 BodyJsonArray,
131 Language(LanguageExpressionDef),
132}
133
/// Aggregation mode attached to a `Split` step.
///
/// The meaning of each mode is defined by the consumer of this spec; only
/// the set of available modes is fixed here.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum CanonicalSplitAggregationSpec {
    LastWins,
    CollectAll,
    Original,
}
140
/// Strategy selectable for an `Aggregate` step; canonical v1 offers a single
/// option.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum CanonicalAggregateStrategySpec {
    CollectAll,
}
145
146#[derive(Debug, Clone, PartialEq, Eq)]
147pub struct CanonicalAggregateSpec {
148 pub header: String,
149 pub completion_size: Option<usize>,
150 pub completion_timeout_ms: Option<u64>,
151 pub correlation_key: Option<String>,
152 pub force_completion_on_stop: Option<bool>,
153 pub discard_on_timeout: Option<bool>,
154 pub strategy: CanonicalAggregateStrategySpec,
155 pub max_buckets: Option<usize>,
156 pub bucket_ttl_ms: Option<u64>,
157}
158
/// Circuit breaker settings attached to a route spec.
///
/// Route validation requires both values to be greater than zero.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CanonicalCircuitBreakerSpec {
    /// Failure threshold; must be greater than zero.
    pub failure_threshold: u32,
    /// Open duration in milliseconds, per the field name; must be greater than zero.
    pub open_duration_ms: u64,
}
164
165impl CanonicalRouteSpec {
166 pub fn new(route_id: impl Into<String>, from: impl Into<String>) -> Self {
167 Self {
168 route_id: route_id.into(),
169 from: from.into(),
170 steps: Vec::new(),
171 circuit_breaker: None,
172 version: CANONICAL_CONTRACT_VERSION,
173 }
174 }
175
176 pub fn validate_contract(&self) -> Result<(), CamelError> {
177 if self.route_id.trim().is_empty() {
178 return Err(CamelError::RouteError(
179 "canonical contract violation: route_id cannot be empty".to_string(),
180 ));
181 }
182 if self.from.trim().is_empty() {
183 return Err(CamelError::RouteError(
184 "canonical contract violation: from cannot be empty".to_string(),
185 ));
186 }
187 if self.version != CANONICAL_CONTRACT_VERSION {
188 return Err(CamelError::RouteError(format!(
189 "canonical contract violation: expected version {}, got {}",
190 CANONICAL_CONTRACT_VERSION, self.version
191 )));
192 }
193 validate_steps(&self.steps)?;
194 if let Some(cb) = &self.circuit_breaker {
195 if cb.failure_threshold == 0 {
196 return Err(CamelError::RouteError(
197 "canonical contract violation: circuit_breaker.failure_threshold must be > 0"
198 .to_string(),
199 ));
200 }
201 if cb.open_duration_ms == 0 {
202 return Err(CamelError::RouteError(
203 "canonical contract violation: circuit_breaker.open_duration_ms must be > 0"
204 .to_string(),
205 ));
206 }
207 }
208 Ok(())
209 }
210}
211
212fn validate_steps(steps: &[CanonicalStepSpec]) -> Result<(), CamelError> {
213 for step in steps {
214 match step {
215 CanonicalStepSpec::To { uri } | CanonicalStepSpec::WireTap { uri } => {
216 if uri.trim().is_empty() {
217 return Err(CamelError::RouteError(
218 "canonical contract violation: endpoint uri cannot be empty".to_string(),
219 ));
220 }
221 }
222 CanonicalStepSpec::Filter { steps, .. } => validate_steps(steps)?,
223 CanonicalStepSpec::Choice { whens, otherwise } => {
224 for when in whens {
225 validate_steps(&when.steps)?;
226 }
227 if let Some(otherwise) = otherwise {
228 validate_steps(otherwise)?;
229 }
230 }
231 CanonicalStepSpec::Split {
232 parallel_limit,
233 steps,
234 ..
235 } => {
236 if let Some(limit) = parallel_limit
237 && *limit == 0
238 {
239 return Err(CamelError::RouteError(
240 "canonical contract violation: split.parallel_limit must be > 0"
241 .to_string(),
242 ));
243 }
244 validate_steps(steps)?;
245 }
246 CanonicalStepSpec::Aggregate { config } => {
247 if config.header.trim().is_empty() {
248 return Err(CamelError::RouteError(
249 "canonical contract violation: aggregate.header cannot be empty"
250 .to_string(),
251 ));
252 }
253 if let Some(size) = config.completion_size
254 && size == 0
255 {
256 return Err(CamelError::RouteError(
257 "canonical contract violation: aggregate.completion_size must be > 0"
258 .to_string(),
259 ));
260 }
261 }
262 CanonicalStepSpec::Log { .. }
263 | CanonicalStepSpec::Script { .. }
264 | CanonicalStepSpec::Stop
265 | CanonicalStepSpec::Delay { .. } => {}
266 }
267 }
268 Ok(())
269}
270
271#[derive(Debug, Clone, PartialEq, Eq)]
272pub enum RuntimeCommand {
273 RegisterRoute {
274 spec: CanonicalRouteSpec,
275 command_id: String,
276 causation_id: Option<String>,
277 },
278 StartRoute {
279 route_id: String,
280 command_id: String,
281 causation_id: Option<String>,
282 },
283 StopRoute {
284 route_id: String,
285 command_id: String,
286 causation_id: Option<String>,
287 },
288 SuspendRoute {
289 route_id: String,
290 command_id: String,
291 causation_id: Option<String>,
292 },
293 ResumeRoute {
294 route_id: String,
295 command_id: String,
296 causation_id: Option<String>,
297 },
298 ReloadRoute {
299 route_id: String,
300 command_id: String,
301 causation_id: Option<String>,
302 },
303 FailRoute {
307 route_id: String,
308 error: String,
309 command_id: String,
310 causation_id: Option<String>,
311 },
312 RemoveRoute {
313 route_id: String,
314 command_id: String,
315 causation_id: Option<String>,
316 },
317}
318
319impl RuntimeCommand {
320 pub fn command_id(&self) -> &str {
321 match self {
322 RuntimeCommand::RegisterRoute { command_id, .. }
323 | RuntimeCommand::StartRoute { command_id, .. }
324 | RuntimeCommand::StopRoute { command_id, .. }
325 | RuntimeCommand::SuspendRoute { command_id, .. }
326 | RuntimeCommand::ResumeRoute { command_id, .. }
327 | RuntimeCommand::ReloadRoute { command_id, .. }
328 | RuntimeCommand::FailRoute { command_id, .. }
329 | RuntimeCommand::RemoveRoute { command_id, .. } => command_id,
330 }
331 }
332
333 pub fn causation_id(&self) -> Option<&str> {
334 match self {
335 RuntimeCommand::RegisterRoute { causation_id, .. }
336 | RuntimeCommand::StartRoute { causation_id, .. }
337 | RuntimeCommand::StopRoute { causation_id, .. }
338 | RuntimeCommand::SuspendRoute { causation_id, .. }
339 | RuntimeCommand::ResumeRoute { causation_id, .. }
340 | RuntimeCommand::ReloadRoute { causation_id, .. }
341 | RuntimeCommand::FailRoute { causation_id, .. }
342 | RuntimeCommand::RemoveRoute { causation_id, .. } => causation_id.as_deref(),
343 }
344 }
345}
346
/// Outcome reported for an executed `RuntimeCommand`.
///
/// Which variant a given command yields is decided by the command bus
/// implementation, not by this module.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RuntimeCommandResult {
    /// The command was accepted.
    Accepted,
    /// A command with this `command_id` was presumably seen before —
    /// confirm against the bus implementation.
    Duplicate { command_id: String },
    /// A route was registered under `route_id`.
    RouteRegistered { route_id: String },
    /// The route's state changed; `status` carries the new status as text.
    RouteStateChanged { route_id: String, status: String },
}
354
/// Read-only questions that can be put to the runtime query bus.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RuntimeQuery {
    /// Ask for the status of one route.
    GetRouteStatus {
        route_id: String,
    },
    /// Ask for the in-flight count of one route.
    InFlightCount {
        route_id: String,
    },
    /// Ask for the list of routes.
    ListRoutes,
}
368
/// Answers produced for a `RuntimeQuery`.
///
/// The mapping from query to answer variant is decided by the query bus
/// implementation.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RuntimeQueryResult {
    /// In-flight count reported for a route.
    InFlightCount { route_id: String, count: u64 },
    /// No route is known under `route_id`.
    RouteNotFound { route_id: String },
    /// Status of a route, as text.
    RouteStatus { route_id: String, status: String },
    /// Identifiers of the known routes.
    Routes { route_ids: Vec<String> },
}
376
377#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
378pub enum RuntimeEvent {
379 RouteRegistered { route_id: String },
380 RouteStartRequested { route_id: String },
381 RouteStarted { route_id: String },
382 RouteFailed { route_id: String, error: String },
383 RouteStopped { route_id: String },
384 RouteSuspended { route_id: String },
385 RouteResumed { route_id: String },
386 RouteReloaded { route_id: String },
387 RouteRemoved { route_id: String },
388}
389
390#[async_trait]
391pub trait RuntimeCommandBus: Send + Sync {
392 async fn execute(&self, cmd: RuntimeCommand) -> Result<RuntimeCommandResult, CamelError>;
393}
394
395#[async_trait]
396pub trait RuntimeQueryBus: Send + Sync {
397 async fn ask(&self, query: RuntimeQuery) -> Result<RuntimeQueryResult, CamelError>;
398}
399
400pub trait RuntimeHandle: RuntimeCommandBus + RuntimeQueryBus {}
401
402impl<T> RuntimeHandle for T where T: RuntimeCommandBus + RuntimeQueryBus {}
403
#[cfg(test)]
mod tests {
    use super::*;

    /// `command_id()` returns the id stored in the command.
    #[test]
    fn command_and_query_ids_are_exposed() {
        let cmd = RuntimeCommand::StartRoute {
            route_id: "r1".into(),
            command_id: "c1".into(),
            causation_id: None,
        };
        assert_eq!(cmd.command_id(), "c1");
        assert_eq!(cmd.causation_id(), None);
    }

    /// `new` fills in the defaults, and validation enforces non-blank
    /// `route_id` and `from`.
    #[test]
    fn canonical_spec_requires_route_id_and_from() {
        let spec = CanonicalRouteSpec::new("r1", "timer:tick");
        assert_eq!(spec.route_id, "r1");
        assert_eq!(spec.from, "timer:tick");
        assert_eq!(spec.version, CANONICAL_CONTRACT_VERSION);
        assert!(spec.steps.is_empty());
        assert!(spec.circuit_breaker.is_none());
        assert!(spec.validate_contract().is_ok());

        // The requirement itself: blank values must be rejected by validation.
        let err = CanonicalRouteSpec::new("   ", "timer:tick")
            .validate_contract()
            .unwrap_err()
            .to_string();
        assert!(err.contains("route_id cannot be empty"));

        let err = CanonicalRouteSpec::new("r1", "")
            .validate_contract()
            .unwrap_err()
            .to_string();
        assert!(err.contains("from cannot be empty"));
    }

    /// A spec carrying a version other than the current one is rejected.
    #[test]
    fn canonical_contract_rejects_invalid_version() {
        let mut spec = CanonicalRouteSpec::new("r1", "timer:tick");
        spec.version = 2;
        let err = spec.validate_contract().unwrap_err().to_string();
        assert!(err.contains("expected version"));
    }

    /// Validation descends into nested steps and checks circuit breaker values.
    #[test]
    fn canonical_contract_rejects_invalid_steps_and_circuit_breaker() {
        let mut spec = CanonicalRouteSpec::new("r1", "timer:tick");
        spec.steps.push(CanonicalStepSpec::Choice {
            whens: Vec::new(),
            otherwise: Some(vec![CanonicalStepSpec::To { uri: " ".into() }]),
        });
        let err = spec.validate_contract().unwrap_err().to_string();
        assert!(err.contains("endpoint uri cannot be empty"));

        let mut spec = CanonicalRouteSpec::new("r1", "timer:tick");
        spec.steps.push(CanonicalStepSpec::Split {
            expression: CanonicalSplitExpressionSpec::BodyLines,
            aggregation: CanonicalSplitAggregationSpec::LastWins,
            parallel: true,
            parallel_limit: Some(0),
            stop_on_exception: false,
            steps: Vec::new(),
        });
        let err = spec.validate_contract().unwrap_err().to_string();
        assert!(err.contains("split.parallel_limit must be > 0"));

        let mut spec = CanonicalRouteSpec::new("r1", "timer:tick");
        spec.circuit_breaker = Some(CanonicalCircuitBreakerSpec {
            failure_threshold: 0,
            open_duration_ms: 1_000,
        });
        let err = spec.validate_contract().unwrap_err().to_string();
        assert!(err.contains("circuit_breaker.failure_threshold must be > 0"));

        let mut spec = CanonicalRouteSpec::new("r1", "timer:tick");
        spec.circuit_breaker = Some(CanonicalCircuitBreakerSpec {
            failure_threshold: 3,
            open_duration_ms: 0,
        });
        let err = spec.validate_contract().unwrap_err().to_string();
        assert!(err.contains("circuit_breaker.open_duration_ms must be > 0"));
    }

    /// The step-name lists classify representative names as expected.
    #[test]
    fn canonical_contract_declares_subset_scope() {
        assert!(canonical_contract_supports_step("to"));
        assert!(canonical_contract_supports_step("split"));
        assert!(!canonical_contract_supports_step("set_header"));

        assert!(CANONICAL_CONTRACT_DECLARATIVE_ONLY_STEPS.contains(&"split"));
        assert!(CANONICAL_CONTRACT_EXCLUDED_DECLARATIVE_STEPS.contains(&"set_header"));
        assert!(CANONICAL_CONTRACT_RUST_ONLY_STEPS.contains(&"processor"));
    }

    /// Each restricted category yields its own reason; unrestricted and
    /// unknown names yield none.
    #[test]
    fn canonical_contract_rejection_reason_is_explicit() {
        let set_header_reason = canonical_contract_rejection_reason("set_header")
            .expect("set_header should have explicit reason");
        assert!(set_header_reason.contains("out-of-scope"));

        let processor_reason = canonical_contract_rejection_reason("processor")
            .expect("processor should be rust-only");
        assert!(processor_reason.contains("rust-only"));

        let split_reason = canonical_contract_rejection_reason("split")
            .expect("split should require declarative form");
        assert!(split_reason.contains("declarative"));

        assert!(canonical_contract_rejection_reason("to").is_none());
        assert!(canonical_contract_rejection_reason("no_such_step").is_none());
    }

    /// `causation_id()` exposes the optional id as a borrowed string.
    #[test]
    fn command_causation_id_is_exposed() {
        let cmd = RuntimeCommand::StopRoute {
            route_id: "r1".into(),
            command_id: "c2".into(),
            causation_id: Some("c1".into()),
        };
        assert_eq!(cmd.command_id(), "c2");
        assert_eq!(cmd.causation_id(), Some("c1"));
    }
}