Skip to main content

camel_api/
runtime.rs

1use async_trait::async_trait;
2use serde::{Deserialize, Serialize};
3
4use crate::CamelError;
5use crate::declarative::LanguageExpressionDef;
6
/// Name of the canonical route contract defined in this module.
pub const CANONICAL_CONTRACT_NAME: &str = "canonical-v1";

/// Contract version: `CanonicalRouteSpec::new` stamps it on new specs and
/// `CanonicalRouteSpec::validate_contract` rejects any other value.
pub const CANONICAL_CONTRACT_VERSION: u32 = 1;

/// Step names that `canonical_contract_supports_step` reports as supported.
pub const CANONICAL_CONTRACT_SUPPORTED_STEPS: &[&str] = &[
    "to", "log", "wire_tap", "script", "filter", "choice", "split", "aggregate", "stop",
];

/// Supported step names for which `canonical_contract_rejection_reason` states that only the
/// declarative/serializable expression form is part of the contract.
pub const CANONICAL_CONTRACT_DECLARATIVE_ONLY_STEPS: &[&str] = &[
    "script",
    "filter",
    "choice",
    "split",
];

/// Declarative step names that are explicitly out of scope for canonical v1.
pub const CANONICAL_CONTRACT_EXCLUDED_DECLARATIVE_STEPS: &[&str] =
    &["set_header", "set_body", "multicast", "convert_body_to", "bean"];

/// Step names classified as Rust-only programmable steps, not representable in canonical v1.
pub const CANONICAL_CONTRACT_RUST_ONLY_STEPS: &[&str] = &[
    "processor", "process", "process_fn", "map_body", "set_body_fn", "set_header_fn",
];
37
38pub fn canonical_contract_supports_step(step: &str) -> bool {
39    CANONICAL_CONTRACT_SUPPORTED_STEPS.contains(&step)
40}
41
42pub fn canonical_contract_rejection_reason(step: &str) -> Option<&'static str> {
43    if CANONICAL_CONTRACT_EXCLUDED_DECLARATIVE_STEPS.contains(&step) {
44        return Some(
45            "declared out-of-scope for canonical v1; use declarative route compilation path outside CQRS canonical commands",
46        );
47    }
48
49    if CANONICAL_CONTRACT_RUST_ONLY_STEPS.contains(&step) {
50        return Some("rust-only programmable step; not representable in canonical v1 contract");
51    }
52
53    if canonical_contract_supports_step(step)
54        && CANONICAL_CONTRACT_DECLARATIVE_ONLY_STEPS.contains(&step)
55    {
56        return Some(
57            "supported only as declarative/serializable expression form; closure/processor variants are outside canonical v1",
58        );
59    }
60
61    None
62}
63
64#[derive(Debug, Clone, PartialEq, Eq)]
65pub struct CanonicalRouteSpec {
66    /// Stable minimal route representation for runtime command registration.
67    ///
68    /// Scope note:
69    /// - This is intentionally a partial model (v1) and does not mirror every `BuilderStep`.
70    /// - Advanced EIPs continue to use the existing RouteDefinition/BuilderStep path.
71    pub route_id: String,
72    pub from: String,
73    pub steps: Vec<CanonicalStepSpec>,
74    pub circuit_breaker: Option<CanonicalCircuitBreakerSpec>,
75    pub version: u32,
76}
77
78#[derive(Debug, Clone, PartialEq, Eq)]
79pub enum CanonicalStepSpec {
80    To {
81        uri: String,
82    },
83    Log {
84        message: String,
85    },
86    WireTap {
87        uri: String,
88    },
89    Script {
90        expression: LanguageExpressionDef,
91    },
92    Filter {
93        predicate: LanguageExpressionDef,
94        steps: Vec<CanonicalStepSpec>,
95    },
96    Choice {
97        whens: Vec<CanonicalWhenSpec>,
98        otherwise: Option<Vec<CanonicalStepSpec>>,
99    },
100    Split {
101        expression: CanonicalSplitExpressionSpec,
102        aggregation: CanonicalSplitAggregationSpec,
103        parallel: bool,
104        parallel_limit: Option<usize>,
105        stop_on_exception: bool,
106        steps: Vec<CanonicalStepSpec>,
107    },
108    Aggregate {
109        config: CanonicalAggregateSpec,
110    },
111    Stop,
112}
113
114#[derive(Debug, Clone, PartialEq, Eq)]
115pub struct CanonicalWhenSpec {
116    pub predicate: LanguageExpressionDef,
117    pub steps: Vec<CanonicalStepSpec>,
118}
119
120#[derive(Debug, Clone, PartialEq, Eq)]
121pub enum CanonicalSplitExpressionSpec {
122    BodyLines,
123    BodyJsonArray,
124    Language(LanguageExpressionDef),
125}
126
/// Aggregation selector for [`CanonicalStepSpec::Split`].
///
/// NOTE(review): behaviour of each variant is implemented outside this file; the names are the
/// only information available here.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum CanonicalSplitAggregationSpec {
    LastWins,
    CollectAll,
    Original,
}
133
/// Strategy selector used by [`CanonicalAggregateSpec`]; canonical v1 defines a single option.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum CanonicalAggregateStrategySpec {
    CollectAll,
}
138
139#[derive(Debug, Clone, PartialEq, Eq)]
140pub struct CanonicalAggregateSpec {
141    pub header: String,
142    pub completion_size: Option<usize>,
143    pub strategy: CanonicalAggregateStrategySpec,
144    pub max_buckets: Option<usize>,
145    pub bucket_ttl_ms: Option<u64>,
146}
147
/// Circuit breaker settings attached to a [`CanonicalRouteSpec`].
///
/// Contract validation requires both values to be greater than zero.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CanonicalCircuitBreakerSpec {
    /// Failure threshold; must be > 0.
    pub failure_threshold: u32,
    /// Open duration in milliseconds (per the field name); must be > 0.
    pub open_duration_ms: u64,
}
153
154impl CanonicalRouteSpec {
155    pub fn new(route_id: impl Into<String>, from: impl Into<String>) -> Self {
156        Self {
157            route_id: route_id.into(),
158            from: from.into(),
159            steps: Vec::new(),
160            circuit_breaker: None,
161            version: CANONICAL_CONTRACT_VERSION,
162        }
163    }
164
165    pub fn validate_contract(&self) -> Result<(), CamelError> {
166        if self.route_id.trim().is_empty() {
167            return Err(CamelError::RouteError(
168                "canonical contract violation: route_id cannot be empty".to_string(),
169            ));
170        }
171        if self.from.trim().is_empty() {
172            return Err(CamelError::RouteError(
173                "canonical contract violation: from cannot be empty".to_string(),
174            ));
175        }
176        if self.version != CANONICAL_CONTRACT_VERSION {
177            return Err(CamelError::RouteError(format!(
178                "canonical contract violation: expected version {}, got {}",
179                CANONICAL_CONTRACT_VERSION, self.version
180            )));
181        }
182        validate_steps(&self.steps)?;
183        if let Some(cb) = &self.circuit_breaker {
184            if cb.failure_threshold == 0 {
185                return Err(CamelError::RouteError(
186                    "canonical contract violation: circuit_breaker.failure_threshold must be > 0"
187                        .to_string(),
188                ));
189            }
190            if cb.open_duration_ms == 0 {
191                return Err(CamelError::RouteError(
192                    "canonical contract violation: circuit_breaker.open_duration_ms must be > 0"
193                        .to_string(),
194                ));
195            }
196        }
197        Ok(())
198    }
199}
200
201fn validate_steps(steps: &[CanonicalStepSpec]) -> Result<(), CamelError> {
202    for step in steps {
203        match step {
204            CanonicalStepSpec::To { uri } | CanonicalStepSpec::WireTap { uri } => {
205                if uri.trim().is_empty() {
206                    return Err(CamelError::RouteError(
207                        "canonical contract violation: endpoint uri cannot be empty".to_string(),
208                    ));
209                }
210            }
211            CanonicalStepSpec::Filter { steps, .. } => validate_steps(steps)?,
212            CanonicalStepSpec::Choice { whens, otherwise } => {
213                for when in whens {
214                    validate_steps(&when.steps)?;
215                }
216                if let Some(otherwise) = otherwise {
217                    validate_steps(otherwise)?;
218                }
219            }
220            CanonicalStepSpec::Split {
221                parallel_limit,
222                steps,
223                ..
224            } => {
225                if let Some(limit) = parallel_limit
226                    && *limit == 0
227                {
228                    return Err(CamelError::RouteError(
229                        "canonical contract violation: split.parallel_limit must be > 0"
230                            .to_string(),
231                    ));
232                }
233                validate_steps(steps)?;
234            }
235            CanonicalStepSpec::Aggregate { config } => {
236                if config.header.trim().is_empty() {
237                    return Err(CamelError::RouteError(
238                        "canonical contract violation: aggregate.header cannot be empty"
239                            .to_string(),
240                    ));
241                }
242                if let Some(size) = config.completion_size
243                    && size == 0
244                {
245                    return Err(CamelError::RouteError(
246                        "canonical contract violation: aggregate.completion_size must be > 0"
247                            .to_string(),
248                    ));
249                }
250            }
251            CanonicalStepSpec::Log { .. }
252            | CanonicalStepSpec::Script { .. }
253            | CanonicalStepSpec::Stop => {}
254        }
255    }
256    Ok(())
257}
258
259#[derive(Debug, Clone, PartialEq, Eq)]
260pub enum RuntimeCommand {
261    RegisterRoute {
262        spec: CanonicalRouteSpec,
263        command_id: String,
264        causation_id: Option<String>,
265    },
266    StartRoute {
267        route_id: String,
268        command_id: String,
269        causation_id: Option<String>,
270    },
271    StopRoute {
272        route_id: String,
273        command_id: String,
274        causation_id: Option<String>,
275    },
276    SuspendRoute {
277        route_id: String,
278        command_id: String,
279        causation_id: Option<String>,
280    },
281    ResumeRoute {
282        route_id: String,
283        command_id: String,
284        causation_id: Option<String>,
285    },
286    ReloadRoute {
287        route_id: String,
288        command_id: String,
289        causation_id: Option<String>,
290    },
291    /// Internal lifecycle command emitted by runtime adapters when a route crashes at runtime.
292    ///
293    /// This keeps aggregate/projection state aligned with controller-observed failures.
294    FailRoute {
295        route_id: String,
296        error: String,
297        command_id: String,
298        causation_id: Option<String>,
299    },
300    RemoveRoute {
301        route_id: String,
302        command_id: String,
303        causation_id: Option<String>,
304    },
305}
306
307impl RuntimeCommand {
308    pub fn command_id(&self) -> &str {
309        match self {
310            RuntimeCommand::RegisterRoute { command_id, .. }
311            | RuntimeCommand::StartRoute { command_id, .. }
312            | RuntimeCommand::StopRoute { command_id, .. }
313            | RuntimeCommand::SuspendRoute { command_id, .. }
314            | RuntimeCommand::ResumeRoute { command_id, .. }
315            | RuntimeCommand::ReloadRoute { command_id, .. }
316            | RuntimeCommand::FailRoute { command_id, .. }
317            | RuntimeCommand::RemoveRoute { command_id, .. } => command_id,
318        }
319    }
320
321    pub fn causation_id(&self) -> Option<&str> {
322        match self {
323            RuntimeCommand::RegisterRoute { causation_id, .. }
324            | RuntimeCommand::StartRoute { causation_id, .. }
325            | RuntimeCommand::StopRoute { causation_id, .. }
326            | RuntimeCommand::SuspendRoute { causation_id, .. }
327            | RuntimeCommand::ResumeRoute { causation_id, .. }
328            | RuntimeCommand::ReloadRoute { causation_id, .. }
329            | RuntimeCommand::FailRoute { causation_id, .. }
330            | RuntimeCommand::RemoveRoute { causation_id, .. } => causation_id.as_deref(),
331        }
332    }
333}
334
/// Outcome returned by [`RuntimeCommandBus::execute`].
///
/// NOTE(review): which outcome is produced for which command is decided by the bus
/// implementation, which is not part of this file.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RuntimeCommandResult {
    /// Outcome without additional data.
    Accepted,
    /// Outcome carrying a `command_id` (presumably one that had already been processed).
    Duplicate { command_id: String },
    /// Outcome naming the registered route.
    RouteRegistered { route_id: String },
    /// Outcome naming a route together with its `status`, given as a string.
    RouteStateChanged { route_id: String, status: String },
}
342
/// Queries accepted by [`RuntimeQueryBus::ask`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RuntimeQuery {
    /// Query about the status of the route `route_id`.
    GetRouteStatus { route_id: String },
    /// Query without parameters (presumably listing the known routes).
    ListRoutes,
}
348
/// Answers returned by [`RuntimeQueryBus::ask`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RuntimeQueryResult {
    /// A route id together with its `status`, given as a string.
    RouteStatus { route_id: String, status: String },
    /// A list of route ids.
    Routes { route_ids: Vec<String> },
}
354
355#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
356pub enum RuntimeEvent {
357    RouteRegistered { route_id: String },
358    RouteStartRequested { route_id: String },
359    RouteStarted { route_id: String },
360    RouteFailed { route_id: String, error: String },
361    RouteStopped { route_id: String },
362    RouteSuspended { route_id: String },
363    RouteResumed { route_id: String },
364    RouteReloaded { route_id: String },
365    RouteRemoved { route_id: String },
366}
367
368#[async_trait]
369pub trait RuntimeCommandBus: Send + Sync {
370    async fn execute(&self, cmd: RuntimeCommand) -> Result<RuntimeCommandResult, CamelError>;
371}
372
373#[async_trait]
374pub trait RuntimeQueryBus: Send + Sync {
375    async fn ask(&self, query: RuntimeQuery) -> Result<RuntimeQueryResult, CamelError>;
376}
377
378pub trait RuntimeHandle: RuntimeCommandBus + RuntimeQueryBus {}
379
380impl<T> RuntimeHandle for T where T: RuntimeCommandBus + RuntimeQueryBus {}
381
#[cfg(test)]
mod tests {
    use super::*;

    /// Runs `validate_contract` on `spec` and returns the message of the resulting
    /// `CamelError::RouteError`, panicking if validation succeeds or fails differently.
    fn violation_message(spec: &CanonicalRouteSpec) -> String {
        match spec.validate_contract() {
            Err(CamelError::RouteError(message)) => message,
            Err(_) => panic!("expected CamelError::RouteError"),
            Ok(()) => panic!("expected a canonical contract violation"),
        }
    }

    #[test]
    fn command_and_query_ids_are_exposed() {
        let cmd = RuntimeCommand::StartRoute {
            route_id: "r1".into(),
            command_id: "c1".into(),
            causation_id: None,
        };
        assert_eq!(cmd.command_id(), "c1");
        assert_eq!(cmd.causation_id(), None);
    }

    #[test]
    fn canonical_spec_requires_route_id_and_from() {
        let spec = CanonicalRouteSpec::new("r1", "timer:tick");
        assert_eq!(spec.route_id, "r1");
        assert_eq!(spec.from, "timer:tick");
        assert_eq!(spec.version, CANONICAL_CONTRACT_VERSION);
        assert!(spec.steps.is_empty());
        assert!(spec.circuit_breaker.is_none());
        assert!(spec.validate_contract().is_ok());

        // The requirement itself: blank (empty or whitespace-only) values are rejected.
        let blank_id = CanonicalRouteSpec::new("   ", "timer:tick");
        assert!(violation_message(&blank_id).contains("route_id cannot be empty"));

        let blank_from = CanonicalRouteSpec::new("r1", "");
        assert!(violation_message(&blank_from).contains("from cannot be empty"));
    }

    #[test]
    fn canonical_contract_rejects_invalid_version() {
        let mut spec = CanonicalRouteSpec::new("r1", "timer:tick");
        spec.version = 2;
        let err = spec.validate_contract().unwrap_err().to_string();
        assert!(err.contains("expected version"));
    }

    #[test]
    fn canonical_contract_rejects_zero_circuit_breaker_values() {
        let mut spec = CanonicalRouteSpec::new("r1", "timer:tick");

        spec.circuit_breaker = Some(CanonicalCircuitBreakerSpec {
            failure_threshold: 0,
            open_duration_ms: 1_000,
        });
        assert!(violation_message(&spec).contains("failure_threshold must be > 0"));

        spec.circuit_breaker = Some(CanonicalCircuitBreakerSpec {
            failure_threshold: 3,
            open_duration_ms: 0,
        });
        assert!(violation_message(&spec).contains("open_duration_ms must be > 0"));

        spec.circuit_breaker = Some(CanonicalCircuitBreakerSpec {
            failure_threshold: 3,
            open_duration_ms: 1_000,
        });
        assert!(spec.validate_contract().is_ok());
    }

    #[test]
    fn canonical_contract_validates_nested_steps() {
        let mut spec = CanonicalRouteSpec::new("r1", "timer:tick");

        // A blank endpoint URI hidden inside a choice's `otherwise` branch is still found.
        spec.steps = vec![CanonicalStepSpec::Choice {
            whens: Vec::new(),
            otherwise: Some(vec![CanonicalStepSpec::To { uri: " ".into() }]),
        }];
        assert!(violation_message(&spec).contains("endpoint uri cannot be empty"));

        spec.steps = vec![CanonicalStepSpec::Split {
            expression: CanonicalSplitExpressionSpec::BodyLines,
            aggregation: CanonicalSplitAggregationSpec::LastWins,
            parallel: true,
            parallel_limit: Some(0),
            stop_on_exception: false,
            steps: Vec::new(),
        }];
        assert!(violation_message(&spec).contains("split.parallel_limit must be > 0"));

        spec.steps = vec![CanonicalStepSpec::Aggregate {
            config: CanonicalAggregateSpec {
                header: "orderId".into(),
                completion_size: Some(0),
                strategy: CanonicalAggregateStrategySpec::CollectAll,
                max_buckets: None,
                bucket_ttl_ms: None,
            },
        }];
        assert!(violation_message(&spec).contains("aggregate.completion_size must be > 0"));

        spec.steps = vec![
            CanonicalStepSpec::Log {
                message: "tick".into(),
            },
            CanonicalStepSpec::To {
                uri: "log:out".into(),
            },
            CanonicalStepSpec::Stop,
        ];
        assert!(spec.validate_contract().is_ok());
    }

    #[test]
    fn canonical_contract_declares_subset_scope() {
        assert!(canonical_contract_supports_step("to"));
        assert!(canonical_contract_supports_step("split"));
        assert!(!canonical_contract_supports_step("set_header"));

        assert!(CANONICAL_CONTRACT_DECLARATIVE_ONLY_STEPS.contains(&"split"));
        assert!(CANONICAL_CONTRACT_EXCLUDED_DECLARATIVE_STEPS.contains(&"set_header"));
        assert!(CANONICAL_CONTRACT_RUST_ONLY_STEPS.contains(&"processor"));
    }

    #[test]
    fn canonical_contract_rejection_reason_is_explicit() {
        let set_header_reason = canonical_contract_rejection_reason("set_header")
            .expect("set_header should have explicit reason");
        assert!(set_header_reason.contains("out-of-scope"));

        let processor_reason = canonical_contract_rejection_reason("processor")
            .expect("processor should be rust-only");
        assert!(processor_reason.contains("rust-only"));

        let split_reason = canonical_contract_rejection_reason("split")
            .expect("split should require declarative form");
        assert!(split_reason.contains("declarative"));

        // Plain supported steps and unknown names carry no reason at all.
        assert!(canonical_contract_rejection_reason("to").is_none());
        assert!(canonical_contract_rejection_reason("no_such_step").is_none());
    }

    #[test]
    fn command_causation_id_is_exposed() {
        let cmd = RuntimeCommand::StopRoute {
            route_id: "r1".into(),
            command_id: "c2".into(),
            causation_id: Some("c1".into()),
        };
        assert_eq!(cmd.command_id(), "c2");
        assert_eq!(cmd.causation_id(), Some("c1"));
    }
}
450}