1use async_trait::async_trait;
2use serde::{Deserialize, Serialize};
3
4use crate::CamelError;
5use crate::declarative::LanguageExpressionDef;
6
7pub const CANONICAL_CONTRACT_NAME: &str = "canonical-v1";
8pub const CANONICAL_CONTRACT_VERSION: u32 = 1;
9pub const CANONICAL_CONTRACT_SUPPORTED_STEPS: &[&str] = &[
10 "to",
11 "log",
12 "wire_tap",
13 "script",
14 "filter",
15 "choice",
16 "split",
17 "aggregate",
18 "stop",
19];
20pub const CANONICAL_CONTRACT_DECLARATIVE_ONLY_STEPS: &[&str] =
21 &["script", "filter", "choice", "split"];
22pub const CANONICAL_CONTRACT_EXCLUDED_DECLARATIVE_STEPS: &[&str] = &[
23 "set_header",
24 "set_body",
25 "multicast",
26 "convert_body_to",
27 "bean",
28 "marshal",
29 "unmarshal",
30];
31pub const CANONICAL_CONTRACT_RUST_ONLY_STEPS: &[&str] = &[
32 "processor",
33 "process",
34 "process_fn",
35 "map_body",
36 "set_body_fn",
37 "set_header_fn",
38];
39
40pub fn canonical_contract_supports_step(step: &str) -> bool {
41 CANONICAL_CONTRACT_SUPPORTED_STEPS.contains(&step)
42}
43
44pub fn canonical_contract_rejection_reason(step: &str) -> Option<&'static str> {
45 if CANONICAL_CONTRACT_EXCLUDED_DECLARATIVE_STEPS.contains(&step) {
46 return Some(
47 "declared out-of-scope for canonical v1; use declarative route compilation path outside CQRS canonical commands",
48 );
49 }
50
51 if CANONICAL_CONTRACT_RUST_ONLY_STEPS.contains(&step) {
52 return Some("rust-only programmable step; not representable in canonical v1 contract");
53 }
54
55 if canonical_contract_supports_step(step)
56 && CANONICAL_CONTRACT_DECLARATIVE_ONLY_STEPS.contains(&step)
57 {
58 return Some(
59 "supported only as declarative/serializable expression form; closure/processor variants are outside canonical v1",
60 );
61 }
62
63 None
64}
65
66#[derive(Debug, Clone, PartialEq, Eq)]
67pub struct CanonicalRouteSpec {
68 pub route_id: String,
74 pub from: String,
75 pub steps: Vec<CanonicalStepSpec>,
76 pub circuit_breaker: Option<CanonicalCircuitBreakerSpec>,
77 pub version: u32,
78}
79
80#[derive(Debug, Clone, PartialEq, Eq)]
81pub enum CanonicalStepSpec {
82 To {
83 uri: String,
84 },
85 Log {
86 message: String,
87 },
88 WireTap {
89 uri: String,
90 },
91 Script {
92 expression: LanguageExpressionDef,
93 },
94 Filter {
95 predicate: LanguageExpressionDef,
96 steps: Vec<CanonicalStepSpec>,
97 },
98 Choice {
99 whens: Vec<CanonicalWhenSpec>,
100 otherwise: Option<Vec<CanonicalStepSpec>>,
101 },
102 Split {
103 expression: CanonicalSplitExpressionSpec,
104 aggregation: CanonicalSplitAggregationSpec,
105 parallel: bool,
106 parallel_limit: Option<usize>,
107 stop_on_exception: bool,
108 steps: Vec<CanonicalStepSpec>,
109 },
110 Aggregate {
111 config: CanonicalAggregateSpec,
112 },
113 Stop,
114}
115
116#[derive(Debug, Clone, PartialEq, Eq)]
117pub struct CanonicalWhenSpec {
118 pub predicate: LanguageExpressionDef,
119 pub steps: Vec<CanonicalStepSpec>,
120}
121
122#[derive(Debug, Clone, PartialEq, Eq)]
123pub enum CanonicalSplitExpressionSpec {
124 BodyLines,
125 BodyJsonArray,
126 Language(LanguageExpressionDef),
127}
128
129#[derive(Debug, Clone, PartialEq, Eq)]
130pub enum CanonicalSplitAggregationSpec {
131 LastWins,
132 CollectAll,
133 Original,
134}
135
136#[derive(Debug, Clone, PartialEq, Eq)]
137pub enum CanonicalAggregateStrategySpec {
138 CollectAll,
139}
140
141#[derive(Debug, Clone, PartialEq, Eq)]
142pub struct CanonicalAggregateSpec {
143 pub header: String,
144 pub completion_size: Option<usize>,
145 pub completion_timeout_ms: Option<u64>,
146 pub correlation_key: Option<String>,
147 pub force_completion_on_stop: Option<bool>,
148 pub discard_on_timeout: Option<bool>,
149 pub strategy: CanonicalAggregateStrategySpec,
150 pub max_buckets: Option<usize>,
151 pub bucket_ttl_ms: Option<u64>,
152}
153
154#[derive(Debug, Clone, PartialEq, Eq)]
155pub struct CanonicalCircuitBreakerSpec {
156 pub failure_threshold: u32,
157 pub open_duration_ms: u64,
158}
159
160impl CanonicalRouteSpec {
161 pub fn new(route_id: impl Into<String>, from: impl Into<String>) -> Self {
162 Self {
163 route_id: route_id.into(),
164 from: from.into(),
165 steps: Vec::new(),
166 circuit_breaker: None,
167 version: CANONICAL_CONTRACT_VERSION,
168 }
169 }
170
171 pub fn validate_contract(&self) -> Result<(), CamelError> {
172 if self.route_id.trim().is_empty() {
173 return Err(CamelError::RouteError(
174 "canonical contract violation: route_id cannot be empty".to_string(),
175 ));
176 }
177 if self.from.trim().is_empty() {
178 return Err(CamelError::RouteError(
179 "canonical contract violation: from cannot be empty".to_string(),
180 ));
181 }
182 if self.version != CANONICAL_CONTRACT_VERSION {
183 return Err(CamelError::RouteError(format!(
184 "canonical contract violation: expected version {}, got {}",
185 CANONICAL_CONTRACT_VERSION, self.version
186 )));
187 }
188 validate_steps(&self.steps)?;
189 if let Some(cb) = &self.circuit_breaker {
190 if cb.failure_threshold == 0 {
191 return Err(CamelError::RouteError(
192 "canonical contract violation: circuit_breaker.failure_threshold must be > 0"
193 .to_string(),
194 ));
195 }
196 if cb.open_duration_ms == 0 {
197 return Err(CamelError::RouteError(
198 "canonical contract violation: circuit_breaker.open_duration_ms must be > 0"
199 .to_string(),
200 ));
201 }
202 }
203 Ok(())
204 }
205}
206
207fn validate_steps(steps: &[CanonicalStepSpec]) -> Result<(), CamelError> {
208 for step in steps {
209 match step {
210 CanonicalStepSpec::To { uri } | CanonicalStepSpec::WireTap { uri } => {
211 if uri.trim().is_empty() {
212 return Err(CamelError::RouteError(
213 "canonical contract violation: endpoint uri cannot be empty".to_string(),
214 ));
215 }
216 }
217 CanonicalStepSpec::Filter { steps, .. } => validate_steps(steps)?,
218 CanonicalStepSpec::Choice { whens, otherwise } => {
219 for when in whens {
220 validate_steps(&when.steps)?;
221 }
222 if let Some(otherwise) = otherwise {
223 validate_steps(otherwise)?;
224 }
225 }
226 CanonicalStepSpec::Split {
227 parallel_limit,
228 steps,
229 ..
230 } => {
231 if let Some(limit) = parallel_limit
232 && *limit == 0
233 {
234 return Err(CamelError::RouteError(
235 "canonical contract violation: split.parallel_limit must be > 0"
236 .to_string(),
237 ));
238 }
239 validate_steps(steps)?;
240 }
241 CanonicalStepSpec::Aggregate { config } => {
242 if config.header.trim().is_empty() {
243 return Err(CamelError::RouteError(
244 "canonical contract violation: aggregate.header cannot be empty"
245 .to_string(),
246 ));
247 }
248 if let Some(size) = config.completion_size
249 && size == 0
250 {
251 return Err(CamelError::RouteError(
252 "canonical contract violation: aggregate.completion_size must be > 0"
253 .to_string(),
254 ));
255 }
256 }
257 CanonicalStepSpec::Log { .. }
258 | CanonicalStepSpec::Script { .. }
259 | CanonicalStepSpec::Stop => {}
260 }
261 }
262 Ok(())
263}
264
265#[derive(Debug, Clone, PartialEq, Eq)]
266pub enum RuntimeCommand {
267 RegisterRoute {
268 spec: CanonicalRouteSpec,
269 command_id: String,
270 causation_id: Option<String>,
271 },
272 StartRoute {
273 route_id: String,
274 command_id: String,
275 causation_id: Option<String>,
276 },
277 StopRoute {
278 route_id: String,
279 command_id: String,
280 causation_id: Option<String>,
281 },
282 SuspendRoute {
283 route_id: String,
284 command_id: String,
285 causation_id: Option<String>,
286 },
287 ResumeRoute {
288 route_id: String,
289 command_id: String,
290 causation_id: Option<String>,
291 },
292 ReloadRoute {
293 route_id: String,
294 command_id: String,
295 causation_id: Option<String>,
296 },
297 FailRoute {
301 route_id: String,
302 error: String,
303 command_id: String,
304 causation_id: Option<String>,
305 },
306 RemoveRoute {
307 route_id: String,
308 command_id: String,
309 causation_id: Option<String>,
310 },
311}
312
313impl RuntimeCommand {
314 pub fn command_id(&self) -> &str {
315 match self {
316 RuntimeCommand::RegisterRoute { command_id, .. }
317 | RuntimeCommand::StartRoute { command_id, .. }
318 | RuntimeCommand::StopRoute { command_id, .. }
319 | RuntimeCommand::SuspendRoute { command_id, .. }
320 | RuntimeCommand::ResumeRoute { command_id, .. }
321 | RuntimeCommand::ReloadRoute { command_id, .. }
322 | RuntimeCommand::FailRoute { command_id, .. }
323 | RuntimeCommand::RemoveRoute { command_id, .. } => command_id,
324 }
325 }
326
327 pub fn causation_id(&self) -> Option<&str> {
328 match self {
329 RuntimeCommand::RegisterRoute { causation_id, .. }
330 | RuntimeCommand::StartRoute { causation_id, .. }
331 | RuntimeCommand::StopRoute { causation_id, .. }
332 | RuntimeCommand::SuspendRoute { causation_id, .. }
333 | RuntimeCommand::ResumeRoute { causation_id, .. }
334 | RuntimeCommand::ReloadRoute { causation_id, .. }
335 | RuntimeCommand::FailRoute { causation_id, .. }
336 | RuntimeCommand::RemoveRoute { causation_id, .. } => causation_id.as_deref(),
337 }
338 }
339}
340
341#[derive(Debug, Clone, PartialEq, Eq)]
342pub enum RuntimeCommandResult {
343 Accepted,
344 Duplicate { command_id: String },
345 RouteRegistered { route_id: String },
346 RouteStateChanged { route_id: String, status: String },
347}
348
349#[derive(Debug, Clone, PartialEq, Eq)]
350pub enum RuntimeQuery {
351 GetRouteStatus {
352 route_id: String,
353 },
354 InFlightCount {
358 route_id: String,
359 },
360 ListRoutes,
361}
362
363#[derive(Debug, Clone, PartialEq, Eq)]
364pub enum RuntimeQueryResult {
365 InFlightCount { route_id: String, count: u64 },
366 RouteNotFound { route_id: String },
367 RouteStatus { route_id: String, status: String },
368 Routes { route_ids: Vec<String> },
369}
370
371#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
372pub enum RuntimeEvent {
373 RouteRegistered { route_id: String },
374 RouteStartRequested { route_id: String },
375 RouteStarted { route_id: String },
376 RouteFailed { route_id: String, error: String },
377 RouteStopped { route_id: String },
378 RouteSuspended { route_id: String },
379 RouteResumed { route_id: String },
380 RouteReloaded { route_id: String },
381 RouteRemoved { route_id: String },
382}
383
384#[async_trait]
385pub trait RuntimeCommandBus: Send + Sync {
386 async fn execute(&self, cmd: RuntimeCommand) -> Result<RuntimeCommandResult, CamelError>;
387}
388
389#[async_trait]
390pub trait RuntimeQueryBus: Send + Sync {
391 async fn ask(&self, query: RuntimeQuery) -> Result<RuntimeQueryResult, CamelError>;
392}
393
394pub trait RuntimeHandle: RuntimeCommandBus + RuntimeQueryBus {}
395
396impl<T> RuntimeHandle for T where T: RuntimeCommandBus + RuntimeQueryBus {}
397
398#[cfg(test)]
399mod tests {
400 use super::*;
401
402 #[test]
403 fn command_and_query_ids_are_exposed() {
404 let cmd = RuntimeCommand::StartRoute {
405 route_id: "r1".into(),
406 command_id: "c1".into(),
407 causation_id: None,
408 };
409 assert_eq!(cmd.command_id(), "c1");
410 }
411
412 #[test]
413 fn canonical_spec_requires_route_id_and_from() {
414 let spec = CanonicalRouteSpec::new("r1", "timer:tick");
415 assert_eq!(spec.route_id, "r1");
416 assert_eq!(spec.from, "timer:tick");
417 assert_eq!(spec.version, CANONICAL_CONTRACT_VERSION);
418 assert!(spec.steps.is_empty());
419 assert!(spec.circuit_breaker.is_none());
420 }
421
422 #[test]
423 fn canonical_contract_rejects_invalid_version() {
424 let mut spec = CanonicalRouteSpec::new("r1", "timer:tick");
425 spec.version = 2;
426 let err = spec.validate_contract().unwrap_err().to_string();
427 assert!(err.contains("expected version"));
428 }
429
430 #[test]
431 fn canonical_contract_declares_subset_scope() {
432 assert!(canonical_contract_supports_step("to"));
433 assert!(canonical_contract_supports_step("split"));
434 assert!(!canonical_contract_supports_step("set_header"));
435
436 assert!(CANONICAL_CONTRACT_DECLARATIVE_ONLY_STEPS.contains(&"split"));
437 assert!(CANONICAL_CONTRACT_EXCLUDED_DECLARATIVE_STEPS.contains(&"set_header"));
438 assert!(CANONICAL_CONTRACT_RUST_ONLY_STEPS.contains(&"processor"));
439 }
440
441 #[test]
442 fn canonical_contract_rejection_reason_is_explicit() {
443 let set_header_reason = canonical_contract_rejection_reason("set_header")
444 .expect("set_header should have explicit reason");
445 assert!(set_header_reason.contains("out-of-scope"));
446
447 let processor_reason = canonical_contract_rejection_reason("processor")
448 .expect("processor should be rust-only");
449 assert!(processor_reason.contains("rust-only"));
450
451 let split_reason = canonical_contract_rejection_reason("split")
452 .expect("split should require declarative form");
453 assert!(split_reason.contains("declarative"));
454 }
455
456 #[test]
457 fn command_causation_id_is_exposed() {
458 let cmd = RuntimeCommand::StopRoute {
459 route_id: "r1".into(),
460 command_id: "c2".into(),
461 causation_id: Some("c1".into()),
462 };
463 assert_eq!(cmd.command_id(), "c2");
464 assert_eq!(cmd.causation_id(), Some("c1"));
465 }
466}