1use async_trait::async_trait;
2use serde::{Deserialize, Serialize};
3
4use crate::CamelError;
5use crate::declarative::LanguageExpressionDef;
6
7pub const CANONICAL_CONTRACT_NAME: &str = "canonical-v1";
8pub const CANONICAL_CONTRACT_VERSION: u32 = 1;
9pub const CANONICAL_CONTRACT_SUPPORTED_STEPS: &[&str] = &[
10 "to",
11 "log",
12 "wire_tap",
13 "script",
14 "filter",
15 "choice",
16 "split",
17 "aggregate",
18 "stop",
19];
20pub const CANONICAL_CONTRACT_DECLARATIVE_ONLY_STEPS: &[&str] =
21 &["script", "filter", "choice", "split"];
22pub const CANONICAL_CONTRACT_EXCLUDED_DECLARATIVE_STEPS: &[&str] = &[
23 "set_header",
24 "set_body",
25 "multicast",
26 "convert_body_to",
27 "bean",
28 "marshal",
29 "unmarshal",
30];
31pub const CANONICAL_CONTRACT_RUST_ONLY_STEPS: &[&str] = &[
32 "processor",
33 "process",
34 "process_fn",
35 "map_body",
36 "set_body_fn",
37 "set_header_fn",
38];
39
40pub fn canonical_contract_supports_step(step: &str) -> bool {
41 CANONICAL_CONTRACT_SUPPORTED_STEPS.contains(&step)
42}
43
44pub fn canonical_contract_rejection_reason(step: &str) -> Option<&'static str> {
45 if CANONICAL_CONTRACT_EXCLUDED_DECLARATIVE_STEPS.contains(&step) {
46 return Some(
47 "declared out-of-scope for canonical v1; use declarative route compilation path outside CQRS canonical commands",
48 );
49 }
50
51 if CANONICAL_CONTRACT_RUST_ONLY_STEPS.contains(&step) {
52 return Some("rust-only programmable step; not representable in canonical v1 contract");
53 }
54
55 if canonical_contract_supports_step(step)
56 && CANONICAL_CONTRACT_DECLARATIVE_ONLY_STEPS.contains(&step)
57 {
58 return Some(
59 "supported only as declarative/serializable expression form; closure/processor variants are outside canonical v1",
60 );
61 }
62
63 None
64}
65
66#[derive(Debug, Clone, PartialEq, Eq)]
67pub struct CanonicalRouteSpec {
68 pub route_id: String,
74 pub from: String,
75 pub steps: Vec<CanonicalStepSpec>,
76 pub circuit_breaker: Option<CanonicalCircuitBreakerSpec>,
77 pub version: u32,
78}
79
80#[derive(Debug, Clone, PartialEq, Eq)]
81pub enum CanonicalStepSpec {
82 To {
83 uri: String,
84 },
85 Log {
86 message: String,
87 },
88 WireTap {
89 uri: String,
90 },
91 Script {
92 expression: LanguageExpressionDef,
93 },
94 Filter {
95 predicate: LanguageExpressionDef,
96 steps: Vec<CanonicalStepSpec>,
97 },
98 Choice {
99 whens: Vec<CanonicalWhenSpec>,
100 otherwise: Option<Vec<CanonicalStepSpec>>,
101 },
102 Split {
103 expression: CanonicalSplitExpressionSpec,
104 aggregation: CanonicalSplitAggregationSpec,
105 parallel: bool,
106 parallel_limit: Option<usize>,
107 stop_on_exception: bool,
108 steps: Vec<CanonicalStepSpec>,
109 },
110 Aggregate {
111 config: CanonicalAggregateSpec,
112 },
113 Stop,
114}
115
116#[derive(Debug, Clone, PartialEq, Eq)]
117pub struct CanonicalWhenSpec {
118 pub predicate: LanguageExpressionDef,
119 pub steps: Vec<CanonicalStepSpec>,
120}
121
122#[derive(Debug, Clone, PartialEq, Eq)]
123pub enum CanonicalSplitExpressionSpec {
124 BodyLines,
125 BodyJsonArray,
126 Language(LanguageExpressionDef),
127}
128
129#[derive(Debug, Clone, PartialEq, Eq)]
130pub enum CanonicalSplitAggregationSpec {
131 LastWins,
132 CollectAll,
133 Original,
134}
135
136#[derive(Debug, Clone, PartialEq, Eq)]
137pub enum CanonicalAggregateStrategySpec {
138 CollectAll,
139}
140
141#[derive(Debug, Clone, PartialEq, Eq)]
142pub struct CanonicalAggregateSpec {
143 pub header: String,
144 pub completion_size: Option<usize>,
145 pub strategy: CanonicalAggregateStrategySpec,
146 pub max_buckets: Option<usize>,
147 pub bucket_ttl_ms: Option<u64>,
148}
149
150#[derive(Debug, Clone, PartialEq, Eq)]
151pub struct CanonicalCircuitBreakerSpec {
152 pub failure_threshold: u32,
153 pub open_duration_ms: u64,
154}
155
156impl CanonicalRouteSpec {
157 pub fn new(route_id: impl Into<String>, from: impl Into<String>) -> Self {
158 Self {
159 route_id: route_id.into(),
160 from: from.into(),
161 steps: Vec::new(),
162 circuit_breaker: None,
163 version: CANONICAL_CONTRACT_VERSION,
164 }
165 }
166
167 pub fn validate_contract(&self) -> Result<(), CamelError> {
168 if self.route_id.trim().is_empty() {
169 return Err(CamelError::RouteError(
170 "canonical contract violation: route_id cannot be empty".to_string(),
171 ));
172 }
173 if self.from.trim().is_empty() {
174 return Err(CamelError::RouteError(
175 "canonical contract violation: from cannot be empty".to_string(),
176 ));
177 }
178 if self.version != CANONICAL_CONTRACT_VERSION {
179 return Err(CamelError::RouteError(format!(
180 "canonical contract violation: expected version {}, got {}",
181 CANONICAL_CONTRACT_VERSION, self.version
182 )));
183 }
184 validate_steps(&self.steps)?;
185 if let Some(cb) = &self.circuit_breaker {
186 if cb.failure_threshold == 0 {
187 return Err(CamelError::RouteError(
188 "canonical contract violation: circuit_breaker.failure_threshold must be > 0"
189 .to_string(),
190 ));
191 }
192 if cb.open_duration_ms == 0 {
193 return Err(CamelError::RouteError(
194 "canonical contract violation: circuit_breaker.open_duration_ms must be > 0"
195 .to_string(),
196 ));
197 }
198 }
199 Ok(())
200 }
201}
202
203fn validate_steps(steps: &[CanonicalStepSpec]) -> Result<(), CamelError> {
204 for step in steps {
205 match step {
206 CanonicalStepSpec::To { uri } | CanonicalStepSpec::WireTap { uri } => {
207 if uri.trim().is_empty() {
208 return Err(CamelError::RouteError(
209 "canonical contract violation: endpoint uri cannot be empty".to_string(),
210 ));
211 }
212 }
213 CanonicalStepSpec::Filter { steps, .. } => validate_steps(steps)?,
214 CanonicalStepSpec::Choice { whens, otherwise } => {
215 for when in whens {
216 validate_steps(&when.steps)?;
217 }
218 if let Some(otherwise) = otherwise {
219 validate_steps(otherwise)?;
220 }
221 }
222 CanonicalStepSpec::Split {
223 parallel_limit,
224 steps,
225 ..
226 } => {
227 if let Some(limit) = parallel_limit
228 && *limit == 0
229 {
230 return Err(CamelError::RouteError(
231 "canonical contract violation: split.parallel_limit must be > 0"
232 .to_string(),
233 ));
234 }
235 validate_steps(steps)?;
236 }
237 CanonicalStepSpec::Aggregate { config } => {
238 if config.header.trim().is_empty() {
239 return Err(CamelError::RouteError(
240 "canonical contract violation: aggregate.header cannot be empty"
241 .to_string(),
242 ));
243 }
244 if let Some(size) = config.completion_size
245 && size == 0
246 {
247 return Err(CamelError::RouteError(
248 "canonical contract violation: aggregate.completion_size must be > 0"
249 .to_string(),
250 ));
251 }
252 }
253 CanonicalStepSpec::Log { .. }
254 | CanonicalStepSpec::Script { .. }
255 | CanonicalStepSpec::Stop => {}
256 }
257 }
258 Ok(())
259}
260
261#[derive(Debug, Clone, PartialEq, Eq)]
262pub enum RuntimeCommand {
263 RegisterRoute {
264 spec: CanonicalRouteSpec,
265 command_id: String,
266 causation_id: Option<String>,
267 },
268 StartRoute {
269 route_id: String,
270 command_id: String,
271 causation_id: Option<String>,
272 },
273 StopRoute {
274 route_id: String,
275 command_id: String,
276 causation_id: Option<String>,
277 },
278 SuspendRoute {
279 route_id: String,
280 command_id: String,
281 causation_id: Option<String>,
282 },
283 ResumeRoute {
284 route_id: String,
285 command_id: String,
286 causation_id: Option<String>,
287 },
288 ReloadRoute {
289 route_id: String,
290 command_id: String,
291 causation_id: Option<String>,
292 },
293 FailRoute {
297 route_id: String,
298 error: String,
299 command_id: String,
300 causation_id: Option<String>,
301 },
302 RemoveRoute {
303 route_id: String,
304 command_id: String,
305 causation_id: Option<String>,
306 },
307}
308
309impl RuntimeCommand {
310 pub fn command_id(&self) -> &str {
311 match self {
312 RuntimeCommand::RegisterRoute { command_id, .. }
313 | RuntimeCommand::StartRoute { command_id, .. }
314 | RuntimeCommand::StopRoute { command_id, .. }
315 | RuntimeCommand::SuspendRoute { command_id, .. }
316 | RuntimeCommand::ResumeRoute { command_id, .. }
317 | RuntimeCommand::ReloadRoute { command_id, .. }
318 | RuntimeCommand::FailRoute { command_id, .. }
319 | RuntimeCommand::RemoveRoute { command_id, .. } => command_id,
320 }
321 }
322
323 pub fn causation_id(&self) -> Option<&str> {
324 match self {
325 RuntimeCommand::RegisterRoute { causation_id, .. }
326 | RuntimeCommand::StartRoute { causation_id, .. }
327 | RuntimeCommand::StopRoute { causation_id, .. }
328 | RuntimeCommand::SuspendRoute { causation_id, .. }
329 | RuntimeCommand::ResumeRoute { causation_id, .. }
330 | RuntimeCommand::ReloadRoute { causation_id, .. }
331 | RuntimeCommand::FailRoute { causation_id, .. }
332 | RuntimeCommand::RemoveRoute { causation_id, .. } => causation_id.as_deref(),
333 }
334 }
335}
336
337#[derive(Debug, Clone, PartialEq, Eq)]
338pub enum RuntimeCommandResult {
339 Accepted,
340 Duplicate { command_id: String },
341 RouteRegistered { route_id: String },
342 RouteStateChanged { route_id: String, status: String },
343}
344
345#[derive(Debug, Clone, PartialEq, Eq)]
346pub enum RuntimeQuery {
347 GetRouteStatus {
348 route_id: String,
349 },
350 InFlightCount {
354 route_id: String,
355 },
356 ListRoutes,
357}
358
359#[derive(Debug, Clone, PartialEq, Eq)]
360pub enum RuntimeQueryResult {
361 InFlightCount { route_id: String, count: u64 },
362 RouteNotFound { route_id: String },
363 RouteStatus { route_id: String, status: String },
364 Routes { route_ids: Vec<String> },
365}
366
367#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
368pub enum RuntimeEvent {
369 RouteRegistered { route_id: String },
370 RouteStartRequested { route_id: String },
371 RouteStarted { route_id: String },
372 RouteFailed { route_id: String, error: String },
373 RouteStopped { route_id: String },
374 RouteSuspended { route_id: String },
375 RouteResumed { route_id: String },
376 RouteReloaded { route_id: String },
377 RouteRemoved { route_id: String },
378}
379
380#[async_trait]
381pub trait RuntimeCommandBus: Send + Sync {
382 async fn execute(&self, cmd: RuntimeCommand) -> Result<RuntimeCommandResult, CamelError>;
383}
384
385#[async_trait]
386pub trait RuntimeQueryBus: Send + Sync {
387 async fn ask(&self, query: RuntimeQuery) -> Result<RuntimeQueryResult, CamelError>;
388}
389
390pub trait RuntimeHandle: RuntimeCommandBus + RuntimeQueryBus {}
391
392impl<T> RuntimeHandle for T where T: RuntimeCommandBus + RuntimeQueryBus {}
393
394#[cfg(test)]
395mod tests {
396 use super::*;
397
398 #[test]
399 fn command_and_query_ids_are_exposed() {
400 let cmd = RuntimeCommand::StartRoute {
401 route_id: "r1".into(),
402 command_id: "c1".into(),
403 causation_id: None,
404 };
405 assert_eq!(cmd.command_id(), "c1");
406 }
407
408 #[test]
409 fn canonical_spec_requires_route_id_and_from() {
410 let spec = CanonicalRouteSpec::new("r1", "timer:tick");
411 assert_eq!(spec.route_id, "r1");
412 assert_eq!(spec.from, "timer:tick");
413 assert_eq!(spec.version, CANONICAL_CONTRACT_VERSION);
414 assert!(spec.steps.is_empty());
415 assert!(spec.circuit_breaker.is_none());
416 }
417
418 #[test]
419 fn canonical_contract_rejects_invalid_version() {
420 let mut spec = CanonicalRouteSpec::new("r1", "timer:tick");
421 spec.version = 2;
422 let err = spec.validate_contract().unwrap_err().to_string();
423 assert!(err.contains("expected version"));
424 }
425
426 #[test]
427 fn canonical_contract_declares_subset_scope() {
428 assert!(canonical_contract_supports_step("to"));
429 assert!(canonical_contract_supports_step("split"));
430 assert!(!canonical_contract_supports_step("set_header"));
431
432 assert!(CANONICAL_CONTRACT_DECLARATIVE_ONLY_STEPS.contains(&"split"));
433 assert!(CANONICAL_CONTRACT_EXCLUDED_DECLARATIVE_STEPS.contains(&"set_header"));
434 assert!(CANONICAL_CONTRACT_RUST_ONLY_STEPS.contains(&"processor"));
435 }
436
437 #[test]
438 fn canonical_contract_rejection_reason_is_explicit() {
439 let set_header_reason = canonical_contract_rejection_reason("set_header")
440 .expect("set_header should have explicit reason");
441 assert!(set_header_reason.contains("out-of-scope"));
442
443 let processor_reason = canonical_contract_rejection_reason("processor")
444 .expect("processor should be rust-only");
445 assert!(processor_reason.contains("rust-only"));
446
447 let split_reason = canonical_contract_rejection_reason("split")
448 .expect("split should require declarative form");
449 assert!(split_reason.contains("declarative"));
450 }
451
452 #[test]
453 fn command_causation_id_is_exposed() {
454 let cmd = RuntimeCommand::StopRoute {
455 route_id: "r1".into(),
456 command_id: "c2".into(),
457 causation_id: Some("c1".into()),
458 };
459 assert_eq!(cmd.command_id(), "c2");
460 assert_eq!(cmd.causation_id(), Some("c1"));
461 }
462}