Skip to main content

camel_api/
runtime.rs

1use async_trait::async_trait;
2use serde::{Deserialize, Serialize};
3
4use crate::CamelError;
5use crate::declarative::LanguageExpressionDef;
6
/// Name identifying the canonical route contract defined in this module.
pub const CANONICAL_CONTRACT_NAME: &str = "canonical-v1";

/// Version of the canonical contract.
///
/// `CanonicalRouteSpec::validate_contract` rejects specs carrying any other version.
pub const CANONICAL_CONTRACT_VERSION: u32 = 1;

/// Step names that `canonical_contract_supports_step` reports as supported.
pub const CANONICAL_CONTRACT_SUPPORTED_STEPS: &[&str] = &[
    "to",
    "log",
    "wire_tap",
    "script",
    "filter",
    "choice",
    "split",
    "aggregate",
    "stop",
];

/// Supported steps that are accepted only in their declarative/serializable
/// expression form; closure/processor variants are outside canonical v1.
pub const CANONICAL_CONTRACT_DECLARATIVE_ONLY_STEPS: &[&str] =
    &["script", "filter", "choice", "split"];

/// Declarative steps that are declared out-of-scope for canonical v1.
pub const CANONICAL_CONTRACT_EXCLUDED_DECLARATIVE_STEPS: &[&str] = &[
    "set_header",
    "set_body",
    "multicast",
    "convert_body_to",
    "bean",
    "marshal",
    "unmarshal",
];

/// Rust-only programmable steps that cannot be represented in the canonical v1 contract.
pub const CANONICAL_CONTRACT_RUST_ONLY_STEPS: &[&str] = &[
    "processor",
    "process",
    "process_fn",
    "map_body",
    "set_body_fn",
    "set_header_fn",
];
39
40pub fn canonical_contract_supports_step(step: &str) -> bool {
41    CANONICAL_CONTRACT_SUPPORTED_STEPS.contains(&step)
42}
43
44pub fn canonical_contract_rejection_reason(step: &str) -> Option<&'static str> {
45    if CANONICAL_CONTRACT_EXCLUDED_DECLARATIVE_STEPS.contains(&step) {
46        return Some(
47            "declared out-of-scope for canonical v1; use declarative route compilation path outside CQRS canonical commands",
48        );
49    }
50
51    if CANONICAL_CONTRACT_RUST_ONLY_STEPS.contains(&step) {
52        return Some("rust-only programmable step; not representable in canonical v1 contract");
53    }
54
55    if canonical_contract_supports_step(step)
56        && CANONICAL_CONTRACT_DECLARATIVE_ONLY_STEPS.contains(&step)
57    {
58        return Some(
59            "supported only as declarative/serializable expression form; closure/processor variants are outside canonical v1",
60        );
61    }
62
63    None
64}
65
66#[derive(Debug, Clone, PartialEq, Eq)]
67pub struct CanonicalRouteSpec {
68    /// Stable minimal route representation for runtime command registration.
69    ///
70    /// Scope note:
71    /// - This is intentionally a partial model (v1) and does not mirror every `BuilderStep`.
72    /// - Advanced EIPs continue to use the existing RouteDefinition/BuilderStep path.
73    pub route_id: String,
74    pub from: String,
75    pub steps: Vec<CanonicalStepSpec>,
76    pub circuit_breaker: Option<CanonicalCircuitBreakerSpec>,
77    pub version: u32,
78}
79
80#[derive(Debug, Clone, PartialEq, Eq)]
81pub enum CanonicalStepSpec {
82    To {
83        uri: String,
84    },
85    Log {
86        message: String,
87    },
88    WireTap {
89        uri: String,
90    },
91    Script {
92        expression: LanguageExpressionDef,
93    },
94    Filter {
95        predicate: LanguageExpressionDef,
96        steps: Vec<CanonicalStepSpec>,
97    },
98    Choice {
99        whens: Vec<CanonicalWhenSpec>,
100        otherwise: Option<Vec<CanonicalStepSpec>>,
101    },
102    Split {
103        expression: CanonicalSplitExpressionSpec,
104        aggregation: CanonicalSplitAggregationSpec,
105        parallel: bool,
106        parallel_limit: Option<usize>,
107        stop_on_exception: bool,
108        steps: Vec<CanonicalStepSpec>,
109    },
110    Aggregate {
111        config: CanonicalAggregateSpec,
112    },
113    Stop,
114}
115
116#[derive(Debug, Clone, PartialEq, Eq)]
117pub struct CanonicalWhenSpec {
118    pub predicate: LanguageExpressionDef,
119    pub steps: Vec<CanonicalStepSpec>,
120}
121
122#[derive(Debug, Clone, PartialEq, Eq)]
123pub enum CanonicalSplitExpressionSpec {
124    BodyLines,
125    BodyJsonArray,
126    Language(LanguageExpressionDef),
127}
128
/// Aggregation mode selected for a [`CanonicalStepSpec::Split`].
// NOTE(review): variant semantics are implemented elsewhere; the names
// suggest "keep last result", "collect every result" and "keep the
// original" — confirm against the split implementation.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum CanonicalSplitAggregationSpec {
    LastWins,
    CollectAll,
    Original,
}
135
/// Aggregation strategy of a [`CanonicalAggregateSpec`].
///
/// `CollectAll` is the only strategy this contract version declares.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum CanonicalAggregateStrategySpec {
    CollectAll,
}
140
141#[derive(Debug, Clone, PartialEq, Eq)]
142pub struct CanonicalAggregateSpec {
143    pub header: String,
144    pub completion_size: Option<usize>,
145    pub completion_timeout_ms: Option<u64>,
146    pub correlation_key: Option<String>,
147    pub force_completion_on_stop: Option<bool>,
148    pub discard_on_timeout: Option<bool>,
149    pub strategy: CanonicalAggregateStrategySpec,
150    pub max_buckets: Option<usize>,
151    pub bucket_ttl_ms: Option<u64>,
152}
153
/// Circuit-breaker settings attached to a [`CanonicalRouteSpec`].
///
/// Contract validation requires both values to be greater than zero.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CanonicalCircuitBreakerSpec {
    /// Failure threshold; must be > 0.
    pub failure_threshold: u32,
    /// Open duration (milliseconds, per the `_ms` suffix); must be > 0.
    pub open_duration_ms: u64,
}
159
160impl CanonicalRouteSpec {
161    pub fn new(route_id: impl Into<String>, from: impl Into<String>) -> Self {
162        Self {
163            route_id: route_id.into(),
164            from: from.into(),
165            steps: Vec::new(),
166            circuit_breaker: None,
167            version: CANONICAL_CONTRACT_VERSION,
168        }
169    }
170
171    pub fn validate_contract(&self) -> Result<(), CamelError> {
172        if self.route_id.trim().is_empty() {
173            return Err(CamelError::RouteError(
174                "canonical contract violation: route_id cannot be empty".to_string(),
175            ));
176        }
177        if self.from.trim().is_empty() {
178            return Err(CamelError::RouteError(
179                "canonical contract violation: from cannot be empty".to_string(),
180            ));
181        }
182        if self.version != CANONICAL_CONTRACT_VERSION {
183            return Err(CamelError::RouteError(format!(
184                "canonical contract violation: expected version {}, got {}",
185                CANONICAL_CONTRACT_VERSION, self.version
186            )));
187        }
188        validate_steps(&self.steps)?;
189        if let Some(cb) = &self.circuit_breaker {
190            if cb.failure_threshold == 0 {
191                return Err(CamelError::RouteError(
192                    "canonical contract violation: circuit_breaker.failure_threshold must be > 0"
193                        .to_string(),
194                ));
195            }
196            if cb.open_duration_ms == 0 {
197                return Err(CamelError::RouteError(
198                    "canonical contract violation: circuit_breaker.open_duration_ms must be > 0"
199                        .to_string(),
200                ));
201            }
202        }
203        Ok(())
204    }
205}
206
207fn validate_steps(steps: &[CanonicalStepSpec]) -> Result<(), CamelError> {
208    for step in steps {
209        match step {
210            CanonicalStepSpec::To { uri } | CanonicalStepSpec::WireTap { uri } => {
211                if uri.trim().is_empty() {
212                    return Err(CamelError::RouteError(
213                        "canonical contract violation: endpoint uri cannot be empty".to_string(),
214                    ));
215                }
216            }
217            CanonicalStepSpec::Filter { steps, .. } => validate_steps(steps)?,
218            CanonicalStepSpec::Choice { whens, otherwise } => {
219                for when in whens {
220                    validate_steps(&when.steps)?;
221                }
222                if let Some(otherwise) = otherwise {
223                    validate_steps(otherwise)?;
224                }
225            }
226            CanonicalStepSpec::Split {
227                parallel_limit,
228                steps,
229                ..
230            } => {
231                if let Some(limit) = parallel_limit
232                    && *limit == 0
233                {
234                    return Err(CamelError::RouteError(
235                        "canonical contract violation: split.parallel_limit must be > 0"
236                            .to_string(),
237                    ));
238                }
239                validate_steps(steps)?;
240            }
241            CanonicalStepSpec::Aggregate { config } => {
242                if config.header.trim().is_empty() {
243                    return Err(CamelError::RouteError(
244                        "canonical contract violation: aggregate.header cannot be empty"
245                            .to_string(),
246                    ));
247                }
248                if let Some(size) = config.completion_size
249                    && size == 0
250                {
251                    return Err(CamelError::RouteError(
252                        "canonical contract violation: aggregate.completion_size must be > 0"
253                            .to_string(),
254                    ));
255                }
256            }
257            CanonicalStepSpec::Log { .. }
258            | CanonicalStepSpec::Script { .. }
259            | CanonicalStepSpec::Stop => {}
260        }
261    }
262    Ok(())
263}
264
265#[derive(Debug, Clone, PartialEq, Eq)]
266pub enum RuntimeCommand {
267    RegisterRoute {
268        spec: CanonicalRouteSpec,
269        command_id: String,
270        causation_id: Option<String>,
271    },
272    StartRoute {
273        route_id: String,
274        command_id: String,
275        causation_id: Option<String>,
276    },
277    StopRoute {
278        route_id: String,
279        command_id: String,
280        causation_id: Option<String>,
281    },
282    SuspendRoute {
283        route_id: String,
284        command_id: String,
285        causation_id: Option<String>,
286    },
287    ResumeRoute {
288        route_id: String,
289        command_id: String,
290        causation_id: Option<String>,
291    },
292    ReloadRoute {
293        route_id: String,
294        command_id: String,
295        causation_id: Option<String>,
296    },
297    /// Internal lifecycle command emitted by runtime adapters when a route crashes at runtime.
298    ///
299    /// This keeps aggregate/projection state aligned with controller-observed failures.
300    FailRoute {
301        route_id: String,
302        error: String,
303        command_id: String,
304        causation_id: Option<String>,
305    },
306    RemoveRoute {
307        route_id: String,
308        command_id: String,
309        causation_id: Option<String>,
310    },
311}
312
313impl RuntimeCommand {
314    pub fn command_id(&self) -> &str {
315        match self {
316            RuntimeCommand::RegisterRoute { command_id, .. }
317            | RuntimeCommand::StartRoute { command_id, .. }
318            | RuntimeCommand::StopRoute { command_id, .. }
319            | RuntimeCommand::SuspendRoute { command_id, .. }
320            | RuntimeCommand::ResumeRoute { command_id, .. }
321            | RuntimeCommand::ReloadRoute { command_id, .. }
322            | RuntimeCommand::FailRoute { command_id, .. }
323            | RuntimeCommand::RemoveRoute { command_id, .. } => command_id,
324        }
325    }
326
327    pub fn causation_id(&self) -> Option<&str> {
328        match self {
329            RuntimeCommand::RegisterRoute { causation_id, .. }
330            | RuntimeCommand::StartRoute { causation_id, .. }
331            | RuntimeCommand::StopRoute { causation_id, .. }
332            | RuntimeCommand::SuspendRoute { causation_id, .. }
333            | RuntimeCommand::ResumeRoute { causation_id, .. }
334            | RuntimeCommand::ReloadRoute { causation_id, .. }
335            | RuntimeCommand::FailRoute { causation_id, .. }
336            | RuntimeCommand::RemoveRoute { causation_id, .. } => causation_id.as_deref(),
337        }
338    }
339}
340
/// Outcome reported by `RuntimeCommandBus::execute`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RuntimeCommandResult {
    Accepted,
    /// Carries the id of the command reported as a duplicate.
    Duplicate { command_id: String },
    /// Carries the id of the route that was registered.
    RouteRegistered { route_id: String },
    /// Carries the affected route id together with its new status text.
    RouteStateChanged { route_id: String, status: String },
}
348
/// Queries accepted by `RuntimeQueryBus::ask`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RuntimeQuery {
    /// Asks for the status of the route `route_id`.
    GetRouteStatus {
        route_id: String,
    },
    /// **Note:** This variant is intercepted by `RuntimeBus::ask` *before* reaching
    /// `execute_query`. Do not handle it in `execute_query` — it has no access to
    /// the in-flight counter. See `runtime_bus.rs` for the intercept.
    InFlightCount {
        route_id: String,
    },
    /// Asks for the list of routes.
    ListRoutes,
}
362
/// Answer returned by `RuntimeQueryBus::ask`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RuntimeQueryResult {
    /// In-flight count reported for `route_id`.
    InFlightCount { route_id: String, count: u64 },
    /// The queried route is not known.
    RouteNotFound { route_id: String },
    /// Status text reported for `route_id`.
    RouteStatus { route_id: String, status: String },
    /// Ids of the listed routes.
    Routes { route_ids: Vec<String> },
}
370
371#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
372pub enum RuntimeEvent {
373    RouteRegistered { route_id: String },
374    RouteStartRequested { route_id: String },
375    RouteStarted { route_id: String },
376    RouteFailed { route_id: String, error: String },
377    RouteStopped { route_id: String },
378    RouteSuspended { route_id: String },
379    RouteResumed { route_id: String },
380    RouteReloaded { route_id: String },
381    RouteRemoved { route_id: String },
382}
383
384#[async_trait]
385pub trait RuntimeCommandBus: Send + Sync {
386    async fn execute(&self, cmd: RuntimeCommand) -> Result<RuntimeCommandResult, CamelError>;
387}
388
389#[async_trait]
390pub trait RuntimeQueryBus: Send + Sync {
391    async fn ask(&self, query: RuntimeQuery) -> Result<RuntimeQueryResult, CamelError>;
392}
393
394pub trait RuntimeHandle: RuntimeCommandBus + RuntimeQueryBus {}
395
396impl<T> RuntimeHandle for T where T: RuntimeCommandBus + RuntimeQueryBus {}
397
#[cfg(test)]
mod tests {
    use super::*;

    /// The command id is readable and an absent causation id reads as `None`.
    #[test]
    fn command_and_query_ids_are_exposed() {
        let cmd = RuntimeCommand::StartRoute {
            route_id: "r1".into(),
            command_id: "c1".into(),
            causation_id: None,
        };
        assert_eq!(cmd.command_id(), "c1");
        assert_eq!(cmd.causation_id(), None);
    }

    /// `new` fills in defaults, and validation rejects blank `route_id` / `from`.
    #[test]
    fn canonical_spec_requires_route_id_and_from() {
        let spec = CanonicalRouteSpec::new("r1", "timer:tick");
        assert_eq!(spec.route_id, "r1");
        assert_eq!(spec.from, "timer:tick");
        assert_eq!(spec.version, CANONICAL_CONTRACT_VERSION);
        assert!(spec.steps.is_empty());
        assert!(spec.circuit_breaker.is_none());
        assert!(spec.validate_contract().is_ok());

        // Empty and whitespace-only values must both be rejected, since
        // validation trims before checking.
        for blank in ["", "   ", " \t"] {
            assert!(
                CanonicalRouteSpec::new(blank, "timer:tick")
                    .validate_contract()
                    .is_err(),
                "blank route_id {blank:?} should be rejected"
            );
            assert!(
                CanonicalRouteSpec::new("r1", blank)
                    .validate_contract()
                    .is_err(),
                "blank from {blank:?} should be rejected"
            );
        }
    }

    /// A version other than `CANONICAL_CONTRACT_VERSION` is a contract violation.
    #[test]
    fn canonical_contract_rejects_invalid_version() {
        let mut spec = CanonicalRouteSpec::new("r1", "timer:tick");
        spec.version = 2;
        let err = spec.validate_contract().unwrap_err().to_string();
        assert!(err.contains("expected version"));
    }

    /// Violations inside nested step lists and in the circuit breaker are detected.
    #[test]
    fn canonical_contract_validates_nested_steps_and_circuit_breaker() {
        let mut nested = CanonicalRouteSpec::new("r1", "timer:tick");
        nested.steps.push(CanonicalStepSpec::Choice {
            whens: Vec::new(),
            otherwise: Some(vec![CanonicalStepSpec::To { uri: " ".into() }]),
        });
        assert!(nested.validate_contract().is_err());

        let mut zero_threshold = CanonicalRouteSpec::new("r1", "timer:tick");
        zero_threshold.circuit_breaker = Some(CanonicalCircuitBreakerSpec {
            failure_threshold: 0,
            open_duration_ms: 1_000,
        });
        assert!(zero_threshold.validate_contract().is_err());

        let mut zero_duration = CanonicalRouteSpec::new("r1", "timer:tick");
        zero_duration.circuit_breaker = Some(CanonicalCircuitBreakerSpec {
            failure_threshold: 1,
            open_duration_ms: 0,
        });
        assert!(zero_duration.validate_contract().is_err());

        let mut valid = CanonicalRouteSpec::new("r1", "timer:tick");
        valid.steps.push(CanonicalStepSpec::To {
            uri: "log:out".into(),
        });
        valid.circuit_breaker = Some(CanonicalCircuitBreakerSpec {
            failure_threshold: 1,
            open_duration_ms: 1_000,
        });
        assert!(valid.validate_contract().is_ok());
    }

    /// The contract tables classify steps as supported, declarative-only, excluded or rust-only.
    #[test]
    fn canonical_contract_declares_subset_scope() {
        assert!(canonical_contract_supports_step("to"));
        assert!(canonical_contract_supports_step("split"));
        assert!(!canonical_contract_supports_step("set_header"));

        assert!(CANONICAL_CONTRACT_DECLARATIVE_ONLY_STEPS.contains(&"split"));
        assert!(CANONICAL_CONTRACT_EXCLUDED_DECLARATIVE_STEPS.contains(&"set_header"));
        assert!(CANONICAL_CONTRACT_RUST_ONLY_STEPS.contains(&"processor"));
    }

    /// Each restricted category yields its own reason; unrestricted steps yield none.
    #[test]
    fn canonical_contract_rejection_reason_is_explicit() {
        let set_header_reason = canonical_contract_rejection_reason("set_header")
            .expect("set_header should have explicit reason");
        assert!(set_header_reason.contains("out-of-scope"));

        let processor_reason = canonical_contract_rejection_reason("processor")
            .expect("processor should be rust-only");
        assert!(processor_reason.contains("rust-only"));

        let split_reason = canonical_contract_rejection_reason("split")
            .expect("split should require declarative form");
        assert!(split_reason.contains("declarative"));

        // Plain supported steps and unknown names carry no restriction.
        assert!(canonical_contract_rejection_reason("to").is_none());
        assert!(canonical_contract_rejection_reason("no_such_step").is_none());
    }

    /// A present causation id is exposed alongside the command id.
    #[test]
    fn command_causation_id_is_exposed() {
        let cmd = RuntimeCommand::StopRoute {
            route_id: "r1".into(),
            command_id: "c2".into(),
            causation_id: Some("c1".into()),
        };
        assert_eq!(cmd.command_id(), "c2");
        assert_eq!(cmd.causation_id(), Some("c1"));
    }
}