Skip to main content

camel_api/
runtime.rs

1use async_trait::async_trait;
2use serde::{Deserialize, Serialize};
3
4use crate::CamelError;
5use crate::declarative::LanguageExpressionDef;
6
/// Name of the canonical route contract described by this module.
pub const CANONICAL_CONTRACT_NAME: &str = "canonical-v1";

/// Contract version; `CanonicalRouteSpec::validate_contract` rejects any other value.
pub const CANONICAL_CONTRACT_VERSION: u32 = 1;

/// Step names that the canonical v1 contract accepts.
pub const CANONICAL_CONTRACT_SUPPORTED_STEPS: &[&str] = &[
    "to",
    "log",
    "wire_tap",
    "script",
    "filter",
    "choice",
    "split",
    "aggregate",
    "stop",
];

/// Supported steps that are accepted only in their declarative/serializable
/// expression form (closure/processor variants are outside canonical v1).
pub const CANONICAL_CONTRACT_DECLARATIVE_ONLY_STEPS: &[&str] = &[
    "script",
    "filter",
    "choice",
    "split",
];

/// Declarative steps that are declared out-of-scope for canonical v1.
pub const CANONICAL_CONTRACT_EXCLUDED_DECLARATIVE_STEPS: &[&str] = &[
    "set_header",
    "set_body",
    "multicast",
    "convert_body_to",
    "bean",
    "marshal",
    "unmarshal",
];

/// Rust-only programmable steps that cannot be represented in the canonical v1 contract.
pub const CANONICAL_CONTRACT_RUST_ONLY_STEPS: &[&str] = &[
    "processor",
    "process",
    "process_fn",
    "map_body",
    "set_body_fn",
    "set_header_fn",
];
39
40pub fn canonical_contract_supports_step(step: &str) -> bool {
41    CANONICAL_CONTRACT_SUPPORTED_STEPS.contains(&step)
42}
43
44pub fn canonical_contract_rejection_reason(step: &str) -> Option<&'static str> {
45    if CANONICAL_CONTRACT_EXCLUDED_DECLARATIVE_STEPS.contains(&step) {
46        return Some(
47            "declared out-of-scope for canonical v1; use declarative route compilation path outside CQRS canonical commands",
48        );
49    }
50
51    if CANONICAL_CONTRACT_RUST_ONLY_STEPS.contains(&step) {
52        return Some("rust-only programmable step; not representable in canonical v1 contract");
53    }
54
55    if canonical_contract_supports_step(step)
56        && CANONICAL_CONTRACT_DECLARATIVE_ONLY_STEPS.contains(&step)
57    {
58        return Some(
59            "supported only as declarative/serializable expression form; closure/processor variants are outside canonical v1",
60        );
61    }
62
63    None
64}
65
66#[derive(Debug, Clone, PartialEq, Eq)]
67pub struct CanonicalRouteSpec {
68    /// Stable minimal route representation for runtime command registration.
69    ///
70    /// Scope note:
71    /// - This is intentionally a partial model (v1) and does not mirror every `BuilderStep`.
72    /// - Advanced EIPs continue to use the existing RouteDefinition/BuilderStep path.
73    pub route_id: String,
74    pub from: String,
75    pub steps: Vec<CanonicalStepSpec>,
76    pub circuit_breaker: Option<CanonicalCircuitBreakerSpec>,
77    pub version: u32,
78}
79
80#[derive(Debug, Clone, PartialEq, Eq)]
81pub enum CanonicalStepSpec {
82    To {
83        uri: String,
84    },
85    Log {
86        message: String,
87    },
88    WireTap {
89        uri: String,
90    },
91    Script {
92        expression: LanguageExpressionDef,
93    },
94    Filter {
95        predicate: LanguageExpressionDef,
96        steps: Vec<CanonicalStepSpec>,
97    },
98    Choice {
99        whens: Vec<CanonicalWhenSpec>,
100        otherwise: Option<Vec<CanonicalStepSpec>>,
101    },
102    Split {
103        expression: CanonicalSplitExpressionSpec,
104        aggregation: CanonicalSplitAggregationSpec,
105        parallel: bool,
106        parallel_limit: Option<usize>,
107        stop_on_exception: bool,
108        steps: Vec<CanonicalStepSpec>,
109    },
110    Aggregate {
111        config: CanonicalAggregateSpec,
112    },
113    Stop,
114}
115
116#[derive(Debug, Clone, PartialEq, Eq)]
117pub struct CanonicalWhenSpec {
118    pub predicate: LanguageExpressionDef,
119    pub steps: Vec<CanonicalStepSpec>,
120}
121
122#[derive(Debug, Clone, PartialEq, Eq)]
123pub enum CanonicalSplitExpressionSpec {
124    BodyLines,
125    BodyJsonArray,
126    Language(LanguageExpressionDef),
127}
128
/// Aggregation mode selected for a `CanonicalStepSpec::Split` step.
///
/// This type only names the available choices; their runtime meaning is
/// defined by whatever interprets the spec.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum CanonicalSplitAggregationSpec {
    LastWins,
    CollectAll,
    Original,
}
135
/// Strategy selected for a `CanonicalStepSpec::Aggregate` step.
///
/// `CollectAll` is the only strategy the v1 contract defines.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum CanonicalAggregateStrategySpec {
    CollectAll,
}
140
141#[derive(Debug, Clone, PartialEq, Eq)]
142pub struct CanonicalAggregateSpec {
143    pub header: String,
144    pub completion_size: Option<usize>,
145    pub strategy: CanonicalAggregateStrategySpec,
146    pub max_buckets: Option<usize>,
147    pub bucket_ttl_ms: Option<u64>,
148}
149
/// Circuit-breaker settings attached to a `CanonicalRouteSpec`.
///
/// Both fields must be non-zero for the owning spec to pass
/// `CanonicalRouteSpec::validate_contract`.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct CanonicalCircuitBreakerSpec {
    /// Failure threshold; must be > 0.
    pub failure_threshold: u32,
    /// Open duration, in milliseconds per the field name; must be > 0.
    pub open_duration_ms: u64,
}
155
156impl CanonicalRouteSpec {
157    pub fn new(route_id: impl Into<String>, from: impl Into<String>) -> Self {
158        Self {
159            route_id: route_id.into(),
160            from: from.into(),
161            steps: Vec::new(),
162            circuit_breaker: None,
163            version: CANONICAL_CONTRACT_VERSION,
164        }
165    }
166
167    pub fn validate_contract(&self) -> Result<(), CamelError> {
168        if self.route_id.trim().is_empty() {
169            return Err(CamelError::RouteError(
170                "canonical contract violation: route_id cannot be empty".to_string(),
171            ));
172        }
173        if self.from.trim().is_empty() {
174            return Err(CamelError::RouteError(
175                "canonical contract violation: from cannot be empty".to_string(),
176            ));
177        }
178        if self.version != CANONICAL_CONTRACT_VERSION {
179            return Err(CamelError::RouteError(format!(
180                "canonical contract violation: expected version {}, got {}",
181                CANONICAL_CONTRACT_VERSION, self.version
182            )));
183        }
184        validate_steps(&self.steps)?;
185        if let Some(cb) = &self.circuit_breaker {
186            if cb.failure_threshold == 0 {
187                return Err(CamelError::RouteError(
188                    "canonical contract violation: circuit_breaker.failure_threshold must be > 0"
189                        .to_string(),
190                ));
191            }
192            if cb.open_duration_ms == 0 {
193                return Err(CamelError::RouteError(
194                    "canonical contract violation: circuit_breaker.open_duration_ms must be > 0"
195                        .to_string(),
196                ));
197            }
198        }
199        Ok(())
200    }
201}
202
203fn validate_steps(steps: &[CanonicalStepSpec]) -> Result<(), CamelError> {
204    for step in steps {
205        match step {
206            CanonicalStepSpec::To { uri } | CanonicalStepSpec::WireTap { uri } => {
207                if uri.trim().is_empty() {
208                    return Err(CamelError::RouteError(
209                        "canonical contract violation: endpoint uri cannot be empty".to_string(),
210                    ));
211                }
212            }
213            CanonicalStepSpec::Filter { steps, .. } => validate_steps(steps)?,
214            CanonicalStepSpec::Choice { whens, otherwise } => {
215                for when in whens {
216                    validate_steps(&when.steps)?;
217                }
218                if let Some(otherwise) = otherwise {
219                    validate_steps(otherwise)?;
220                }
221            }
222            CanonicalStepSpec::Split {
223                parallel_limit,
224                steps,
225                ..
226            } => {
227                if let Some(limit) = parallel_limit
228                    && *limit == 0
229                {
230                    return Err(CamelError::RouteError(
231                        "canonical contract violation: split.parallel_limit must be > 0"
232                            .to_string(),
233                    ));
234                }
235                validate_steps(steps)?;
236            }
237            CanonicalStepSpec::Aggregate { config } => {
238                if config.header.trim().is_empty() {
239                    return Err(CamelError::RouteError(
240                        "canonical contract violation: aggregate.header cannot be empty"
241                            .to_string(),
242                    ));
243                }
244                if let Some(size) = config.completion_size
245                    && size == 0
246                {
247                    return Err(CamelError::RouteError(
248                        "canonical contract violation: aggregate.completion_size must be > 0"
249                            .to_string(),
250                    ));
251                }
252            }
253            CanonicalStepSpec::Log { .. }
254            | CanonicalStepSpec::Script { .. }
255            | CanonicalStepSpec::Stop => {}
256        }
257    }
258    Ok(())
259}
260
261#[derive(Debug, Clone, PartialEq, Eq)]
262pub enum RuntimeCommand {
263    RegisterRoute {
264        spec: CanonicalRouteSpec,
265        command_id: String,
266        causation_id: Option<String>,
267    },
268    StartRoute {
269        route_id: String,
270        command_id: String,
271        causation_id: Option<String>,
272    },
273    StopRoute {
274        route_id: String,
275        command_id: String,
276        causation_id: Option<String>,
277    },
278    SuspendRoute {
279        route_id: String,
280        command_id: String,
281        causation_id: Option<String>,
282    },
283    ResumeRoute {
284        route_id: String,
285        command_id: String,
286        causation_id: Option<String>,
287    },
288    ReloadRoute {
289        route_id: String,
290        command_id: String,
291        causation_id: Option<String>,
292    },
293    /// Internal lifecycle command emitted by runtime adapters when a route crashes at runtime.
294    ///
295    /// This keeps aggregate/projection state aligned with controller-observed failures.
296    FailRoute {
297        route_id: String,
298        error: String,
299        command_id: String,
300        causation_id: Option<String>,
301    },
302    RemoveRoute {
303        route_id: String,
304        command_id: String,
305        causation_id: Option<String>,
306    },
307}
308
309impl RuntimeCommand {
310    pub fn command_id(&self) -> &str {
311        match self {
312            RuntimeCommand::RegisterRoute { command_id, .. }
313            | RuntimeCommand::StartRoute { command_id, .. }
314            | RuntimeCommand::StopRoute { command_id, .. }
315            | RuntimeCommand::SuspendRoute { command_id, .. }
316            | RuntimeCommand::ResumeRoute { command_id, .. }
317            | RuntimeCommand::ReloadRoute { command_id, .. }
318            | RuntimeCommand::FailRoute { command_id, .. }
319            | RuntimeCommand::RemoveRoute { command_id, .. } => command_id,
320        }
321    }
322
323    pub fn causation_id(&self) -> Option<&str> {
324        match self {
325            RuntimeCommand::RegisterRoute { causation_id, .. }
326            | RuntimeCommand::StartRoute { causation_id, .. }
327            | RuntimeCommand::StopRoute { causation_id, .. }
328            | RuntimeCommand::SuspendRoute { causation_id, .. }
329            | RuntimeCommand::ResumeRoute { causation_id, .. }
330            | RuntimeCommand::ReloadRoute { causation_id, .. }
331            | RuntimeCommand::FailRoute { causation_id, .. }
332            | RuntimeCommand::RemoveRoute { causation_id, .. } => causation_id.as_deref(),
333        }
334    }
335}
336
/// Outcome returned by `RuntimeCommandBus::execute` for a command.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum RuntimeCommandResult {
    /// Result variant that carries no further detail.
    Accepted,
    /// Result variant that echoes a `command_id`.
    Duplicate { command_id: String },
    /// Result variant naming a route by `route_id`.
    RouteRegistered { route_id: String },
    /// Result variant naming a route together with a textual `status`.
    RouteStateChanged { route_id: String, status: String },
}
344
/// Read-side queries accepted by `RuntimeQueryBus::ask`.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum RuntimeQuery {
    /// Asks for the status of the route `route_id`.
    GetRouteStatus {
        route_id: String,
    },
    /// **Note:** This variant is intercepted by `RuntimeBus::ask` *before* reaching
    /// `execute_query`. Do not handle it in `execute_query` — it has no access to
    /// the in-flight counter. See `runtime_bus.rs` for the intercept.
    InFlightCount {
        route_id: String,
    },
    /// Asks for the list of routes; takes no arguments.
    ListRoutes,
}
358
/// Answers returned by `RuntimeQueryBus::ask`.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum RuntimeQueryResult {
    /// A count reported for the route `route_id`.
    InFlightCount { route_id: String, count: u64 },
    /// Answer naming a `route_id` for which no route was found.
    RouteNotFound { route_id: String },
    /// Textual status reported for the route `route_id`.
    RouteStatus { route_id: String, status: String },
    /// A list of route ids.
    Routes { route_ids: Vec<String> },
}
366
367#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
368pub enum RuntimeEvent {
369    RouteRegistered { route_id: String },
370    RouteStartRequested { route_id: String },
371    RouteStarted { route_id: String },
372    RouteFailed { route_id: String, error: String },
373    RouteStopped { route_id: String },
374    RouteSuspended { route_id: String },
375    RouteResumed { route_id: String },
376    RouteReloaded { route_id: String },
377    RouteRemoved { route_id: String },
378}
379
380#[async_trait]
381pub trait RuntimeCommandBus: Send + Sync {
382    async fn execute(&self, cmd: RuntimeCommand) -> Result<RuntimeCommandResult, CamelError>;
383}
384
385#[async_trait]
386pub trait RuntimeQueryBus: Send + Sync {
387    async fn ask(&self, query: RuntimeQuery) -> Result<RuntimeQueryResult, CamelError>;
388}
389
390pub trait RuntimeHandle: RuntimeCommandBus + RuntimeQueryBus {}
391
392impl<T> RuntimeHandle for T where T: RuntimeCommandBus + RuntimeQueryBus {}
393
#[cfg(test)]
mod tests {
    use super::*;

    // Runs contract validation and returns the rendered error, panicking when
    // the spec unexpectedly validates.
    fn contract_error(spec: &CanonicalRouteSpec) -> String {
        spec.validate_contract().unwrap_err().to_string()
    }

    #[test]
    fn command_and_query_ids_are_exposed() {
        let cmd = RuntimeCommand::StartRoute {
            route_id: "r1".into(),
            command_id: "c1".into(),
            causation_id: None,
        };
        assert_eq!(cmd.command_id(), "c1");
        assert_eq!(cmd.causation_id(), None);
    }

    #[test]
    fn canonical_spec_requires_route_id_and_from() {
        let spec = CanonicalRouteSpec::new("r1", "timer:tick");
        assert_eq!(spec.route_id, "r1");
        assert_eq!(spec.from, "timer:tick");
        assert_eq!(spec.version, CANONICAL_CONTRACT_VERSION);
        assert!(spec.steps.is_empty());
        assert!(spec.circuit_breaker.is_none());
        assert!(spec.validate_contract().is_ok());

        // Blank (including whitespace-only) identifiers must be rejected.
        let missing_id = CanonicalRouteSpec::new("  ", "timer:tick");
        assert!(contract_error(&missing_id).contains("route_id cannot be empty"));

        let missing_from = CanonicalRouteSpec::new("r1", "");
        assert!(contract_error(&missing_from).contains("from cannot be empty"));
    }

    #[test]
    fn canonical_contract_rejects_invalid_version() {
        let mut spec = CanonicalRouteSpec::new("r1", "timer:tick");
        spec.version = 2;
        let err = spec.validate_contract().unwrap_err().to_string();
        assert!(err.contains("expected version"));
    }

    #[test]
    fn canonical_contract_rejects_invalid_steps() {
        let mut spec = CanonicalRouteSpec::new("r1", "timer:tick");

        spec.steps = vec![CanonicalStepSpec::To { uri: " ".into() }];
        assert!(contract_error(&spec).contains("endpoint uri cannot be empty"));

        spec.steps = vec![CanonicalStepSpec::Split {
            expression: CanonicalSplitExpressionSpec::BodyLines,
            aggregation: CanonicalSplitAggregationSpec::LastWins,
            parallel: true,
            parallel_limit: Some(0),
            stop_on_exception: false,
            steps: Vec::new(),
        }];
        assert!(contract_error(&spec).contains("split.parallel_limit must be > 0"));

        spec.steps = vec![CanonicalStepSpec::Aggregate {
            config: CanonicalAggregateSpec {
                header: "  ".into(),
                completion_size: Some(2),
                strategy: CanonicalAggregateStrategySpec::CollectAll,
                max_buckets: None,
                bucket_ttl_ms: None,
            },
        }];
        assert!(contract_error(&spec).contains("aggregate.header cannot be empty"));

        spec.steps = vec![CanonicalStepSpec::Aggregate {
            config: CanonicalAggregateSpec {
                header: "orderId".into(),
                completion_size: Some(0),
                strategy: CanonicalAggregateStrategySpec::CollectAll,
                max_buckets: None,
                bucket_ttl_ms: None,
            },
        }];
        assert!(contract_error(&spec).contains("aggregate.completion_size must be > 0"));
    }

    #[test]
    fn canonical_contract_validates_nested_steps() {
        let mut spec = CanonicalRouteSpec::new("r1", "timer:tick");
        spec.steps = vec![CanonicalStepSpec::Choice {
            whens: Vec::new(),
            otherwise: Some(vec![CanonicalStepSpec::WireTap { uri: String::new() }]),
        }];
        assert!(contract_error(&spec).contains("endpoint uri cannot be empty"));

        spec.steps = vec![
            CanonicalStepSpec::Log {
                message: "hello".into(),
            },
            CanonicalStepSpec::To {
                uri: "log:out".into(),
            },
            CanonicalStepSpec::Stop,
        ];
        assert!(spec.validate_contract().is_ok());
    }

    #[test]
    fn canonical_contract_rejects_zero_circuit_breaker_values() {
        let mut spec = CanonicalRouteSpec::new("r1", "timer:tick");

        spec.circuit_breaker = Some(CanonicalCircuitBreakerSpec {
            failure_threshold: 0,
            open_duration_ms: 1_000,
        });
        assert!(contract_error(&spec).contains("failure_threshold must be > 0"));

        spec.circuit_breaker = Some(CanonicalCircuitBreakerSpec {
            failure_threshold: 3,
            open_duration_ms: 0,
        });
        assert!(contract_error(&spec).contains("open_duration_ms must be > 0"));

        spec.circuit_breaker = Some(CanonicalCircuitBreakerSpec {
            failure_threshold: 3,
            open_duration_ms: 1_000,
        });
        assert!(spec.validate_contract().is_ok());
    }

    #[test]
    fn canonical_contract_declares_subset_scope() {
        assert!(canonical_contract_supports_step("to"));
        assert!(canonical_contract_supports_step("split"));
        assert!(!canonical_contract_supports_step("set_header"));

        assert!(CANONICAL_CONTRACT_DECLARATIVE_ONLY_STEPS.contains(&"split"));
        assert!(CANONICAL_CONTRACT_EXCLUDED_DECLARATIVE_STEPS.contains(&"set_header"));
        assert!(CANONICAL_CONTRACT_RUST_ONLY_STEPS.contains(&"processor"));
    }

    #[test]
    fn canonical_contract_rejection_reason_is_explicit() {
        let set_header_reason = canonical_contract_rejection_reason("set_header")
            .expect("set_header should have explicit reason");
        assert!(set_header_reason.contains("out-of-scope"));

        let processor_reason = canonical_contract_rejection_reason("processor")
            .expect("processor should be rust-only");
        assert!(processor_reason.contains("rust-only"));

        let split_reason = canonical_contract_rejection_reason("split")
            .expect("split should require declarative form");
        assert!(split_reason.contains("declarative"));

        // Steps with no restriction, and unknown names, yield no reason.
        assert!(canonical_contract_rejection_reason("to").is_none());
        assert!(canonical_contract_rejection_reason("no_such_step").is_none());
    }

    #[test]
    fn command_causation_id_is_exposed() {
        let cmd = RuntimeCommand::StopRoute {
            route_id: "r1".into(),
            command_id: "c2".into(),
            causation_id: Some("c1".into()),
        };
        assert_eq!(cmd.command_id(), "c2");
        assert_eq!(cmd.causation_id(), Some("c1"));
    }
}