1use async_trait::async_trait;
2use serde::{Deserialize, Serialize};
3
4use crate::CamelError;
5use crate::declarative::LanguageExpressionDef;
6
/// Name of the canonical contract described by the constants below.
pub const CANONICAL_CONTRACT_NAME: &str = "canonical-v1";

/// Contract version; [`CanonicalRouteSpec::validate_contract`] rejects specs carrying
/// any other value.
pub const CANONICAL_CONTRACT_VERSION: u32 = 1;

/// Step names the canonical contract accepts, one per [`CanonicalStepSpec`] variant.
pub const CANONICAL_CONTRACT_SUPPORTED_STEPS: &[&str] = &[
    "to",
    "log",
    "wire_tap",
    "script",
    "filter",
    "choice",
    "split",
    "aggregate",
    "stop",
];

/// Supported steps that [`canonical_contract_rejection_reason`] reports as available only
/// in their declarative/serializable expression form.
pub const CANONICAL_CONTRACT_DECLARATIVE_ONLY_STEPS: &[&str] =
    &["script", "filter", "choice", "split"];

/// Declarative steps that [`canonical_contract_rejection_reason`] reports as out-of-scope
/// for canonical v1.
pub const CANONICAL_CONTRACT_EXCLUDED_DECLARATIVE_STEPS: &[&str] = &[
    "set_header",
    "set_body",
    "multicast",
    "convert_body_to",
    "bean",
];

/// Steps that [`canonical_contract_rejection_reason`] reports as rust-only programmable
/// steps, not representable in the canonical v1 contract.
pub const CANONICAL_CONTRACT_RUST_ONLY_STEPS: &[&str] = &[
    "processor",
    "process",
    "process_fn",
    "map_body",
    "set_body_fn",
    "set_header_fn",
];
37
38pub fn canonical_contract_supports_step(step: &str) -> bool {
39 CANONICAL_CONTRACT_SUPPORTED_STEPS.contains(&step)
40}
41
42pub fn canonical_contract_rejection_reason(step: &str) -> Option<&'static str> {
43 if CANONICAL_CONTRACT_EXCLUDED_DECLARATIVE_STEPS.contains(&step) {
44 return Some(
45 "declared out-of-scope for canonical v1; use declarative route compilation path outside CQRS canonical commands",
46 );
47 }
48
49 if CANONICAL_CONTRACT_RUST_ONLY_STEPS.contains(&step) {
50 return Some("rust-only programmable step; not representable in canonical v1 contract");
51 }
52
53 if canonical_contract_supports_step(step)
54 && CANONICAL_CONTRACT_DECLARATIVE_ONLY_STEPS.contains(&step)
55 {
56 return Some(
57 "supported only as declarative/serializable expression form; closure/processor variants are outside canonical v1",
58 );
59 }
60
61 None
62}
63
64#[derive(Debug, Clone, PartialEq, Eq)]
65pub struct CanonicalRouteSpec {
66 pub route_id: String,
72 pub from: String,
73 pub steps: Vec<CanonicalStepSpec>,
74 pub circuit_breaker: Option<CanonicalCircuitBreakerSpec>,
75 pub version: u32,
76}
77
78#[derive(Debug, Clone, PartialEq, Eq)]
79pub enum CanonicalStepSpec {
80 To {
81 uri: String,
82 },
83 Log {
84 message: String,
85 },
86 WireTap {
87 uri: String,
88 },
89 Script {
90 expression: LanguageExpressionDef,
91 },
92 Filter {
93 predicate: LanguageExpressionDef,
94 steps: Vec<CanonicalStepSpec>,
95 },
96 Choice {
97 whens: Vec<CanonicalWhenSpec>,
98 otherwise: Option<Vec<CanonicalStepSpec>>,
99 },
100 Split {
101 expression: CanonicalSplitExpressionSpec,
102 aggregation: CanonicalSplitAggregationSpec,
103 parallel: bool,
104 parallel_limit: Option<usize>,
105 stop_on_exception: bool,
106 steps: Vec<CanonicalStepSpec>,
107 },
108 Aggregate {
109 config: CanonicalAggregateSpec,
110 },
111 Stop,
112}
113
114#[derive(Debug, Clone, PartialEq, Eq)]
115pub struct CanonicalWhenSpec {
116 pub predicate: LanguageExpressionDef,
117 pub steps: Vec<CanonicalStepSpec>,
118}
119
120#[derive(Debug, Clone, PartialEq, Eq)]
121pub enum CanonicalSplitExpressionSpec {
122 BodyLines,
123 BodyJsonArray,
124 Language(LanguageExpressionDef),
125}
126
/// Aggregation mode named by a [`CanonicalStepSpec::Split`] step.
// NOTE(review): how each mode combines the split results is decided by the runtime, not
// by this file — confirm the semantics there before relying on the variant names.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum CanonicalSplitAggregationSpec {
    LastWins,
    CollectAll,
    Original,
}
133
/// Aggregation strategy named by a [`CanonicalAggregateSpec`].
///
/// `CollectAll` is the only strategy the canonical contract defines.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum CanonicalAggregateStrategySpec {
    CollectAll,
}
138
139#[derive(Debug, Clone, PartialEq, Eq)]
140pub struct CanonicalAggregateSpec {
141 pub header: String,
142 pub completion_size: Option<usize>,
143 pub strategy: CanonicalAggregateStrategySpec,
144 pub max_buckets: Option<usize>,
145 pub bucket_ttl_ms: Option<u64>,
146}
147
/// Circuit-breaker settings attached to a [`CanonicalRouteSpec`].
///
/// Contract validation requires both values to be greater than zero.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CanonicalCircuitBreakerSpec {
    /// Failure threshold; must be `> 0`.
    pub failure_threshold: u32,
    /// Open duration, in milliseconds per the field name; must be `> 0`.
    pub open_duration_ms: u64,
}
153
154impl CanonicalRouteSpec {
155 pub fn new(route_id: impl Into<String>, from: impl Into<String>) -> Self {
156 Self {
157 route_id: route_id.into(),
158 from: from.into(),
159 steps: Vec::new(),
160 circuit_breaker: None,
161 version: CANONICAL_CONTRACT_VERSION,
162 }
163 }
164
165 pub fn validate_contract(&self) -> Result<(), CamelError> {
166 if self.route_id.trim().is_empty() {
167 return Err(CamelError::RouteError(
168 "canonical contract violation: route_id cannot be empty".to_string(),
169 ));
170 }
171 if self.from.trim().is_empty() {
172 return Err(CamelError::RouteError(
173 "canonical contract violation: from cannot be empty".to_string(),
174 ));
175 }
176 if self.version != CANONICAL_CONTRACT_VERSION {
177 return Err(CamelError::RouteError(format!(
178 "canonical contract violation: expected version {}, got {}",
179 CANONICAL_CONTRACT_VERSION, self.version
180 )));
181 }
182 validate_steps(&self.steps)?;
183 if let Some(cb) = &self.circuit_breaker {
184 if cb.failure_threshold == 0 {
185 return Err(CamelError::RouteError(
186 "canonical contract violation: circuit_breaker.failure_threshold must be > 0"
187 .to_string(),
188 ));
189 }
190 if cb.open_duration_ms == 0 {
191 return Err(CamelError::RouteError(
192 "canonical contract violation: circuit_breaker.open_duration_ms must be > 0"
193 .to_string(),
194 ));
195 }
196 }
197 Ok(())
198 }
199}
200
201fn validate_steps(steps: &[CanonicalStepSpec]) -> Result<(), CamelError> {
202 for step in steps {
203 match step {
204 CanonicalStepSpec::To { uri } | CanonicalStepSpec::WireTap { uri } => {
205 if uri.trim().is_empty() {
206 return Err(CamelError::RouteError(
207 "canonical contract violation: endpoint uri cannot be empty".to_string(),
208 ));
209 }
210 }
211 CanonicalStepSpec::Filter { steps, .. } => validate_steps(steps)?,
212 CanonicalStepSpec::Choice { whens, otherwise } => {
213 for when in whens {
214 validate_steps(&when.steps)?;
215 }
216 if let Some(otherwise) = otherwise {
217 validate_steps(otherwise)?;
218 }
219 }
220 CanonicalStepSpec::Split {
221 parallel_limit,
222 steps,
223 ..
224 } => {
225 if let Some(limit) = parallel_limit
226 && *limit == 0
227 {
228 return Err(CamelError::RouteError(
229 "canonical contract violation: split.parallel_limit must be > 0"
230 .to_string(),
231 ));
232 }
233 validate_steps(steps)?;
234 }
235 CanonicalStepSpec::Aggregate { config } => {
236 if config.header.trim().is_empty() {
237 return Err(CamelError::RouteError(
238 "canonical contract violation: aggregate.header cannot be empty"
239 .to_string(),
240 ));
241 }
242 if let Some(size) = config.completion_size
243 && size == 0
244 {
245 return Err(CamelError::RouteError(
246 "canonical contract violation: aggregate.completion_size must be > 0"
247 .to_string(),
248 ));
249 }
250 }
251 CanonicalStepSpec::Log { .. }
252 | CanonicalStepSpec::Script { .. }
253 | CanonicalStepSpec::Stop => {}
254 }
255 }
256 Ok(())
257}
258
259#[derive(Debug, Clone, PartialEq, Eq)]
260pub enum RuntimeCommand {
261 RegisterRoute {
262 spec: CanonicalRouteSpec,
263 command_id: String,
264 causation_id: Option<String>,
265 },
266 StartRoute {
267 route_id: String,
268 command_id: String,
269 causation_id: Option<String>,
270 },
271 StopRoute {
272 route_id: String,
273 command_id: String,
274 causation_id: Option<String>,
275 },
276 SuspendRoute {
277 route_id: String,
278 command_id: String,
279 causation_id: Option<String>,
280 },
281 ResumeRoute {
282 route_id: String,
283 command_id: String,
284 causation_id: Option<String>,
285 },
286 ReloadRoute {
287 route_id: String,
288 command_id: String,
289 causation_id: Option<String>,
290 },
291 FailRoute {
295 route_id: String,
296 error: String,
297 command_id: String,
298 causation_id: Option<String>,
299 },
300 RemoveRoute {
301 route_id: String,
302 command_id: String,
303 causation_id: Option<String>,
304 },
305}
306
307impl RuntimeCommand {
308 pub fn command_id(&self) -> &str {
309 match self {
310 RuntimeCommand::RegisterRoute { command_id, .. }
311 | RuntimeCommand::StartRoute { command_id, .. }
312 | RuntimeCommand::StopRoute { command_id, .. }
313 | RuntimeCommand::SuspendRoute { command_id, .. }
314 | RuntimeCommand::ResumeRoute { command_id, .. }
315 | RuntimeCommand::ReloadRoute { command_id, .. }
316 | RuntimeCommand::FailRoute { command_id, .. }
317 | RuntimeCommand::RemoveRoute { command_id, .. } => command_id,
318 }
319 }
320
321 pub fn causation_id(&self) -> Option<&str> {
322 match self {
323 RuntimeCommand::RegisterRoute { causation_id, .. }
324 | RuntimeCommand::StartRoute { causation_id, .. }
325 | RuntimeCommand::StopRoute { causation_id, .. }
326 | RuntimeCommand::SuspendRoute { causation_id, .. }
327 | RuntimeCommand::ResumeRoute { causation_id, .. }
328 | RuntimeCommand::ReloadRoute { causation_id, .. }
329 | RuntimeCommand::FailRoute { causation_id, .. }
330 | RuntimeCommand::RemoveRoute { causation_id, .. } => causation_id.as_deref(),
331 }
332 }
333}
334
/// Outcome returned by [`RuntimeCommandBus::execute`].
// NOTE(review): which commands yield which outcome is decided by the bus implementation;
// the variant descriptions below follow the names and payloads only.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RuntimeCommandResult {
    /// The command was accepted; no further detail is carried.
    Accepted,
    /// Reports `command_id` as a duplicate.
    Duplicate { command_id: String },
    /// The route `route_id` was registered.
    RouteRegistered { route_id: String },
    /// The route `route_id` changed state; `status` is the state as text.
    RouteStateChanged { route_id: String, status: String },
}
342
/// Queries accepted by [`RuntimeQueryBus::ask`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RuntimeQuery {
    /// Asks for the status of the route `route_id`.
    GetRouteStatus { route_id: String },
    /// Asks for the list of routes.
    ListRoutes,
}
348
/// Answer returned by [`RuntimeQueryBus::ask`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RuntimeQueryResult {
    /// Status of a single route; `status` is the state as text.
    RouteStatus { route_id: String, status: String },
    /// Identifiers of the known routes.
    Routes { route_ids: Vec<String> },
}
354
355#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
356pub enum RuntimeEvent {
357 RouteRegistered { route_id: String },
358 RouteStartRequested { route_id: String },
359 RouteStarted { route_id: String },
360 RouteFailed { route_id: String, error: String },
361 RouteStopped { route_id: String },
362 RouteSuspended { route_id: String },
363 RouteResumed { route_id: String },
364 RouteReloaded { route_id: String },
365 RouteRemoved { route_id: String },
366}
367
368#[async_trait]
369pub trait RuntimeCommandBus: Send + Sync {
370 async fn execute(&self, cmd: RuntimeCommand) -> Result<RuntimeCommandResult, CamelError>;
371}
372
373#[async_trait]
374pub trait RuntimeQueryBus: Send + Sync {
375 async fn ask(&self, query: RuntimeQuery) -> Result<RuntimeQueryResult, CamelError>;
376}
377
378pub trait RuntimeHandle: RuntimeCommandBus + RuntimeQueryBus {}
379
380impl<T> RuntimeHandle for T where T: RuntimeCommandBus + RuntimeQueryBus {}
381
#[cfg(test)]
mod tests {
    use super::*;

    /// Runs `validate_contract` on `spec` and returns the rendered error, panicking when
    /// the spec unexpectedly passes validation.
    fn violation_of(spec: &CanonicalRouteSpec) -> String {
        spec.validate_contract()
            .expect_err("spec should violate the canonical contract")
            .to_string()
    }

    /// Builds an otherwise valid spec whose only step is `step`.
    fn spec_with_step(step: CanonicalStepSpec) -> CanonicalRouteSpec {
        let mut spec = CanonicalRouteSpec::new("r1", "timer:tick");
        spec.steps.push(step);
        spec
    }

    /// Builds an aggregate step with the given header and completion size.
    fn aggregate_step(header: &str, completion_size: Option<usize>) -> CanonicalStepSpec {
        CanonicalStepSpec::Aggregate {
            config: CanonicalAggregateSpec {
                header: header.to_string(),
                completion_size,
                strategy: CanonicalAggregateStrategySpec::CollectAll,
                max_buckets: None,
                bucket_ttl_ms: None,
            },
        }
    }

    #[test]
    fn command_and_query_ids_are_exposed() {
        let cmd = RuntimeCommand::StartRoute {
            route_id: "r1".into(),
            command_id: "c1".into(),
            causation_id: None,
        };
        assert_eq!(cmd.command_id(), "c1");
        assert_eq!(cmd.causation_id(), None);
    }

    #[test]
    fn canonical_spec_requires_route_id_and_from() {
        let spec = CanonicalRouteSpec::new("r1", "timer:tick");
        assert_eq!(spec.route_id, "r1");
        assert_eq!(spec.from, "timer:tick");
        assert_eq!(spec.version, CANONICAL_CONTRACT_VERSION);
        assert!(spec.steps.is_empty());
        assert!(spec.circuit_breaker.is_none());
        assert!(spec.validate_contract().is_ok());

        // Blank (whitespace-only) values count as missing.
        let blank_id = CanonicalRouteSpec::new("  ", "timer:tick");
        assert!(violation_of(&blank_id).contains("route_id cannot be empty"));

        let blank_from = CanonicalRouteSpec::new("r1", "");
        assert!(violation_of(&blank_from).contains("from cannot be empty"));
    }

    #[test]
    fn canonical_contract_rejects_invalid_version() {
        let mut spec = CanonicalRouteSpec::new("r1", "timer:tick");
        spec.version = 2;
        let err = spec.validate_contract().unwrap_err().to_string();
        assert!(err.contains("expected version"));
    }

    #[test]
    fn canonical_contract_rejects_blank_endpoint_uris() {
        let to = spec_with_step(CanonicalStepSpec::To { uri: " ".into() });
        assert!(violation_of(&to).contains("endpoint uri cannot be empty"));

        // Nested steps are validated too, here through a `choice` fallback branch.
        let nested = spec_with_step(CanonicalStepSpec::Choice {
            whens: Vec::new(),
            otherwise: Some(vec![CanonicalStepSpec::WireTap { uri: String::new() }]),
        });
        assert!(violation_of(&nested).contains("endpoint uri cannot be empty"));

        let valid = spec_with_step(CanonicalStepSpec::To {
            uri: "log:out".into(),
        });
        assert!(valid.validate_contract().is_ok());
    }

    #[test]
    fn canonical_contract_rejects_zero_split_parallel_limit() {
        let spec = spec_with_step(CanonicalStepSpec::Split {
            expression: CanonicalSplitExpressionSpec::BodyLines,
            aggregation: CanonicalSplitAggregationSpec::LastWins,
            parallel: true,
            parallel_limit: Some(0),
            stop_on_exception: false,
            steps: Vec::new(),
        });
        assert!(violation_of(&spec).contains("split.parallel_limit must be > 0"));
    }

    #[test]
    fn canonical_contract_rejects_invalid_aggregate() {
        let blank_header = spec_with_step(aggregate_step("", None));
        assert!(violation_of(&blank_header).contains("aggregate.header cannot be empty"));

        let zero_size = spec_with_step(aggregate_step("orderId", Some(0)));
        assert!(violation_of(&zero_size).contains("aggregate.completion_size must be > 0"));

        let valid = spec_with_step(aggregate_step("orderId", Some(2)));
        assert!(valid.validate_contract().is_ok());
    }

    #[test]
    fn canonical_contract_rejects_invalid_circuit_breaker() {
        let mut spec = CanonicalRouteSpec::new("r1", "timer:tick");

        spec.circuit_breaker = Some(CanonicalCircuitBreakerSpec {
            failure_threshold: 0,
            open_duration_ms: 1_000,
        });
        assert!(violation_of(&spec).contains("failure_threshold must be > 0"));

        spec.circuit_breaker = Some(CanonicalCircuitBreakerSpec {
            failure_threshold: 3,
            open_duration_ms: 0,
        });
        assert!(violation_of(&spec).contains("open_duration_ms must be > 0"));

        spec.circuit_breaker = Some(CanonicalCircuitBreakerSpec {
            failure_threshold: 3,
            open_duration_ms: 1_000,
        });
        assert!(spec.validate_contract().is_ok());
    }

    #[test]
    fn canonical_contract_declares_subset_scope() {
        assert!(canonical_contract_supports_step("to"));
        assert!(canonical_contract_supports_step("split"));
        assert!(!canonical_contract_supports_step("set_header"));

        assert!(CANONICAL_CONTRACT_DECLARATIVE_ONLY_STEPS.contains(&"split"));
        assert!(CANONICAL_CONTRACT_EXCLUDED_DECLARATIVE_STEPS.contains(&"set_header"));
        assert!(CANONICAL_CONTRACT_RUST_ONLY_STEPS.contains(&"processor"));
    }

    #[test]
    fn canonical_contract_rejection_reason_is_explicit() {
        let set_header_reason = canonical_contract_rejection_reason("set_header")
            .expect("set_header should have explicit reason");
        assert!(set_header_reason.contains("out-of-scope"));

        let processor_reason = canonical_contract_rejection_reason("processor")
            .expect("processor should be rust-only");
        assert!(processor_reason.contains("rust-only"));

        let split_reason = canonical_contract_rejection_reason("split")
            .expect("split should require declarative form");
        assert!(split_reason.contains("declarative"));

        // Unrestricted supported steps and unknown names carry no reason.
        assert_eq!(canonical_contract_rejection_reason("to"), None);
        assert_eq!(canonical_contract_rejection_reason("no_such_step"), None);
    }

    #[test]
    fn command_causation_id_is_exposed() {
        let cmd = RuntimeCommand::StopRoute {
            route_id: "r1".into(),
            command_id: "c2".into(),
            causation_id: Some("c1".into()),
        };
        assert_eq!(cmd.command_id(), "c2");
        assert_eq!(cmd.causation_id(), Some("c1"));
    }
}