Skip to main content

camel_api/
runtime.rs

1use async_trait::async_trait;
2use serde::{Deserialize, Serialize};
3
4use crate::CamelError;
5use crate::declarative::LanguageExpressionDef;
6
/// Name of the canonical route contract.
pub const CANONICAL_CONTRACT_NAME: &str = "canonical-v1";
/// Version of the canonical route contract.
///
/// `CanonicalRouteSpec::new` stamps this value on new specs and
/// `CanonicalRouteSpec::validate_contract` rejects any other version.
pub const CANONICAL_CONTRACT_VERSION: u32 = 1;
/// Step names the canonical contract supports; consulted by
/// `canonical_contract_supports_step`.
pub const CANONICAL_CONTRACT_SUPPORTED_STEPS: &[&str] = &[
    "to",
    "log",
    "wire_tap",
    "script",
    "filter",
    "choice",
    "split",
    "aggregate",
    "stop",
    "delay",
];
/// Supported steps for which `canonical_contract_rejection_reason` reports that only the
/// declarative/serializable expression form is covered by the contract.
pub const CANONICAL_CONTRACT_DECLARATIVE_ONLY_STEPS: &[&str] =
    &["script", "filter", "choice", "split"];
/// Step names that `canonical_contract_rejection_reason` reports as declared out-of-scope
/// for canonical v1.
pub const CANONICAL_CONTRACT_EXCLUDED_DECLARATIVE_STEPS: &[&str] = &[
    "set_header",
    "set_body",
    "multicast",
    "convert_body_to",
    "bean",
    "marshal",
    "unmarshal",
];
/// Step names that `canonical_contract_rejection_reason` reports as rust-only programmable
/// steps, not representable in the canonical v1 contract.
pub const CANONICAL_CONTRACT_RUST_ONLY_STEPS: &[&str] = &[
    "processor",
    "process",
    "process_fn",
    "map_body",
    "set_body_fn",
    "set_header_fn",
];
40
41pub fn canonical_contract_supports_step(step: &str) -> bool {
42    CANONICAL_CONTRACT_SUPPORTED_STEPS.contains(&step)
43}
44
45pub fn canonical_contract_rejection_reason(step: &str) -> Option<&'static str> {
46    if CANONICAL_CONTRACT_EXCLUDED_DECLARATIVE_STEPS.contains(&step) {
47        return Some(
48            "declared out-of-scope for canonical v1; use declarative route compilation path outside CQRS canonical commands",
49        );
50    }
51
52    if CANONICAL_CONTRACT_RUST_ONLY_STEPS.contains(&step) {
53        return Some("rust-only programmable step; not representable in canonical v1 contract");
54    }
55
56    if canonical_contract_supports_step(step)
57        && CANONICAL_CONTRACT_DECLARATIVE_ONLY_STEPS.contains(&step)
58    {
59        return Some(
60            "supported only as declarative/serializable expression form; closure/processor variants are outside canonical v1",
61        );
62    }
63
64    None
65}
66
/// Stable minimal route representation for runtime command registration.
///
/// Scope note:
/// - This is intentionally a partial model (v1) and does not mirror every `BuilderStep`.
/// - Advanced EIPs continue to use the existing RouteDefinition/BuilderStep path.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CanonicalRouteSpec {
    /// Route identifier; `validate_contract` rejects a blank value.
    pub route_id: String,
    /// Value of the route's `from` clause (the tests use `"timer:tick"`);
    /// `validate_contract` rejects a blank value.
    pub from: String,
    /// Steps of the route; empty on a spec created with `new`.
    pub steps: Vec<CanonicalStepSpec>,
    /// Optional circuit-breaker settings; `None` on a spec created with `new`.
    pub circuit_breaker: Option<CanonicalCircuitBreakerSpec>,
    /// Contract version; must equal `CANONICAL_CONTRACT_VERSION` to pass `validate_contract`.
    pub version: u32,
}
80
/// One step of a [`CanonicalRouteSpec`].
///
/// The variants correspond to the step names listed in
/// `CANONICAL_CONTRACT_SUPPORTED_STEPS`. Structural constraints noted on the
/// fields below are the ones enforced by `CanonicalRouteSpec::validate_contract`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum CanonicalStepSpec {
    /// `to` step addressing an endpoint.
    To {
        /// Endpoint URI; must not be blank.
        uri: String,
    },
    /// `log` step.
    Log {
        /// Message carried by the step; not validated here.
        message: String,
    },
    /// `wire_tap` step addressing an endpoint.
    WireTap {
        /// Endpoint URI; must not be blank.
        uri: String,
    },
    /// `script` step in declarative expression form.
    Script {
        /// Expression definition; not validated here.
        expression: LanguageExpressionDef,
    },
    /// `filter` step with nested steps.
    Filter {
        /// Predicate expression definition; not validated here.
        predicate: LanguageExpressionDef,
        /// Nested steps; validated recursively.
        steps: Vec<CanonicalStepSpec>,
    },
    /// `choice` step with `when` branches and an optional fallback.
    Choice {
        /// Branches; each branch's steps are validated recursively.
        whens: Vec<CanonicalWhenSpec>,
        /// Optional fallback steps; validated recursively when present.
        otherwise: Option<Vec<CanonicalStepSpec>>,
    },
    /// `split` step with nested steps.
    Split {
        /// How the split expression is specified.
        expression: CanonicalSplitExpressionSpec,
        /// Aggregation mode for the split.
        aggregation: CanonicalSplitAggregationSpec,
        /// Parallel flag; NOTE(review): runtime semantics are not visible in this file.
        parallel: bool,
        /// Optional parallelism limit; must be > 0 when set.
        parallel_limit: Option<usize>,
        /// Stop-on-exception flag; NOTE(review): runtime semantics are not visible in this file.
        stop_on_exception: bool,
        /// Nested steps; validated recursively.
        steps: Vec<CanonicalStepSpec>,
    },
    /// `aggregate` step.
    Aggregate {
        /// Aggregation settings; see [`CanonicalAggregateSpec`].
        config: CanonicalAggregateSpec,
    },
    /// `stop` step; carries no data.
    Stop,
    /// `delay` step.
    Delay {
        /// Delay in milliseconds (per the field name); not validated here.
        delay_ms: u64,
        /// Optional header name; presumably lets a header supply the delay
        /// dynamically — TODO confirm against the runtime.
        dynamic_header: Option<String>,
    },
}
120
/// One `when` branch of a [`CanonicalStepSpec::Choice`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CanonicalWhenSpec {
    /// Predicate expression definition for this branch; not validated in this file.
    pub predicate: LanguageExpressionDef,
    /// Steps of this branch; validated recursively by `validate_contract`.
    pub steps: Vec<CanonicalStepSpec>,
}
126
/// Split expression selector used by [`CanonicalStepSpec::Split`].
///
/// NOTE(review): the runtime meaning of each variant is defined by the consumer of
/// this spec, not in this file; the notes below follow the variant names only.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum CanonicalSplitExpressionSpec {
    /// Presumably splits the body by lines — TODO confirm.
    BodyLines,
    /// Presumably splits a JSON array body into its elements — TODO confirm.
    BodyJsonArray,
    /// Split driven by a declarative language expression.
    Language(LanguageExpressionDef),
}
133
/// Aggregation mode selector used by [`CanonicalStepSpec::Split`].
///
/// NOTE(review): the runtime meaning of each variant is defined by the consumer of
/// this spec, not in this file.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum CanonicalSplitAggregationSpec {
    LastWins,
    CollectAll,
    Original,
}
140
/// Strategy selector for [`CanonicalAggregateSpec`].
///
/// `CollectAll` is the only strategy the canonical contract currently models.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum CanonicalAggregateStrategySpec {
    CollectAll,
}
145
/// Settings of a [`CanonicalStepSpec::Aggregate`] step.
///
/// `validate_contract` only checks `header` and `completion_size`; the remaining
/// fields are carried as-is and interpreted by the consumer of this spec.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CanonicalAggregateSpec {
    /// Header name; must not be blank.
    pub header: String,
    /// Optional completion size; must be > 0 when set.
    pub completion_size: Option<usize>,
    /// Optional completion timeout in milliseconds (per the field name).
    pub completion_timeout_ms: Option<u64>,
    /// Optional correlation key.
    pub correlation_key: Option<String>,
    /// Optional flag; NOTE(review): the default applied when `None` is not visible here.
    pub force_completion_on_stop: Option<bool>,
    /// Optional flag; NOTE(review): the default applied when `None` is not visible here.
    pub discard_on_timeout: Option<bool>,
    /// Aggregation strategy.
    pub strategy: CanonicalAggregateStrategySpec,
    /// Optional bucket-count limit (per the field name).
    pub max_buckets: Option<usize>,
    /// Optional bucket time-to-live in milliseconds (per the field name).
    pub bucket_ttl_ms: Option<u64>,
}
158
/// Circuit-breaker settings attached to a [`CanonicalRouteSpec`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CanonicalCircuitBreakerSpec {
    /// Failure threshold; `validate_contract` requires it to be > 0.
    pub failure_threshold: u32,
    /// Open duration in milliseconds (per the field name); `validate_contract`
    /// requires it to be > 0.
    pub open_duration_ms: u64,
}
164
165impl CanonicalRouteSpec {
166    pub fn new(route_id: impl Into<String>, from: impl Into<String>) -> Self {
167        Self {
168            route_id: route_id.into(),
169            from: from.into(),
170            steps: Vec::new(),
171            circuit_breaker: None,
172            version: CANONICAL_CONTRACT_VERSION,
173        }
174    }
175
176    pub fn validate_contract(&self) -> Result<(), CamelError> {
177        if self.route_id.trim().is_empty() {
178            return Err(CamelError::RouteError(
179                "canonical contract violation: route_id cannot be empty".to_string(),
180            ));
181        }
182        if self.from.trim().is_empty() {
183            return Err(CamelError::RouteError(
184                "canonical contract violation: from cannot be empty".to_string(),
185            ));
186        }
187        if self.version != CANONICAL_CONTRACT_VERSION {
188            return Err(CamelError::RouteError(format!(
189                "canonical contract violation: expected version {}, got {}",
190                CANONICAL_CONTRACT_VERSION, self.version
191            )));
192        }
193        validate_steps(&self.steps)?;
194        if let Some(cb) = &self.circuit_breaker {
195            if cb.failure_threshold == 0 {
196                return Err(CamelError::RouteError(
197                    "canonical contract violation: circuit_breaker.failure_threshold must be > 0"
198                        .to_string(),
199                ));
200            }
201            if cb.open_duration_ms == 0 {
202                return Err(CamelError::RouteError(
203                    "canonical contract violation: circuit_breaker.open_duration_ms must be > 0"
204                        .to_string(),
205                ));
206            }
207        }
208        Ok(())
209    }
210}
211
212fn validate_steps(steps: &[CanonicalStepSpec]) -> Result<(), CamelError> {
213    for step in steps {
214        match step {
215            CanonicalStepSpec::To { uri } | CanonicalStepSpec::WireTap { uri } => {
216                if uri.trim().is_empty() {
217                    return Err(CamelError::RouteError(
218                        "canonical contract violation: endpoint uri cannot be empty".to_string(),
219                    ));
220                }
221            }
222            CanonicalStepSpec::Filter { steps, .. } => validate_steps(steps)?,
223            CanonicalStepSpec::Choice { whens, otherwise } => {
224                for when in whens {
225                    validate_steps(&when.steps)?;
226                }
227                if let Some(otherwise) = otherwise {
228                    validate_steps(otherwise)?;
229                }
230            }
231            CanonicalStepSpec::Split {
232                parallel_limit,
233                steps,
234                ..
235            } => {
236                if let Some(limit) = parallel_limit
237                    && *limit == 0
238                {
239                    return Err(CamelError::RouteError(
240                        "canonical contract violation: split.parallel_limit must be > 0"
241                            .to_string(),
242                    ));
243                }
244                validate_steps(steps)?;
245            }
246            CanonicalStepSpec::Aggregate { config } => {
247                if config.header.trim().is_empty() {
248                    return Err(CamelError::RouteError(
249                        "canonical contract violation: aggregate.header cannot be empty"
250                            .to_string(),
251                    ));
252                }
253                if let Some(size) = config.completion_size
254                    && size == 0
255                {
256                    return Err(CamelError::RouteError(
257                        "canonical contract violation: aggregate.completion_size must be > 0"
258                            .to_string(),
259                    ));
260                }
261            }
262            CanonicalStepSpec::Log { .. }
263            | CanonicalStepSpec::Script { .. }
264            | CanonicalStepSpec::Stop
265            | CanonicalStepSpec::Delay { .. } => {}
266        }
267    }
268    Ok(())
269}
270
/// Commands accepted by a `RuntimeCommandBus`.
///
/// Every variant carries a `command_id` and an optional `causation_id`; both are
/// exposed uniformly through `RuntimeCommand::command_id` and
/// `RuntimeCommand::causation_id`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RuntimeCommand {
    /// Registers the route described by `spec`.
    RegisterRoute {
        spec: CanonicalRouteSpec,
        command_id: String,
        causation_id: Option<String>,
    },
    /// Requests a start of the route `route_id`.
    StartRoute {
        route_id: String,
        command_id: String,
        causation_id: Option<String>,
    },
    /// Requests a stop of the route `route_id`.
    StopRoute {
        route_id: String,
        command_id: String,
        causation_id: Option<String>,
    },
    /// Requests a suspend of the route `route_id`.
    SuspendRoute {
        route_id: String,
        command_id: String,
        causation_id: Option<String>,
    },
    /// Requests a resume of the route `route_id`.
    ResumeRoute {
        route_id: String,
        command_id: String,
        causation_id: Option<String>,
    },
    /// Requests a reload of the route `route_id`.
    ReloadRoute {
        route_id: String,
        command_id: String,
        causation_id: Option<String>,
    },
    /// Internal lifecycle command emitted by runtime adapters when a route crashes at runtime.
    ///
    /// This keeps aggregate/projection state aligned with controller-observed failures.
    FailRoute {
        route_id: String,
        /// Description of the failure.
        error: String,
        command_id: String,
        causation_id: Option<String>,
    },
    /// Requests removal of the route `route_id`.
    RemoveRoute {
        route_id: String,
        command_id: String,
        causation_id: Option<String>,
    },
}
318
319impl RuntimeCommand {
320    pub fn command_id(&self) -> &str {
321        match self {
322            RuntimeCommand::RegisterRoute { command_id, .. }
323            | RuntimeCommand::StartRoute { command_id, .. }
324            | RuntimeCommand::StopRoute { command_id, .. }
325            | RuntimeCommand::SuspendRoute { command_id, .. }
326            | RuntimeCommand::ResumeRoute { command_id, .. }
327            | RuntimeCommand::ReloadRoute { command_id, .. }
328            | RuntimeCommand::FailRoute { command_id, .. }
329            | RuntimeCommand::RemoveRoute { command_id, .. } => command_id,
330        }
331    }
332
333    pub fn causation_id(&self) -> Option<&str> {
334        match self {
335            RuntimeCommand::RegisterRoute { causation_id, .. }
336            | RuntimeCommand::StartRoute { causation_id, .. }
337            | RuntimeCommand::StopRoute { causation_id, .. }
338            | RuntimeCommand::SuspendRoute { causation_id, .. }
339            | RuntimeCommand::ResumeRoute { causation_id, .. }
340            | RuntimeCommand::ReloadRoute { causation_id, .. }
341            | RuntimeCommand::FailRoute { causation_id, .. }
342            | RuntimeCommand::RemoveRoute { causation_id, .. } => causation_id.as_deref(),
343        }
344    }
345}
346
/// Outcome returned by `RuntimeCommandBus::execute`.
///
/// NOTE(review): which commands produce which variant is decided by the bus
/// implementation, which is not part of this file.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RuntimeCommandResult {
    Accepted,
    /// Presumably signals that `command_id` was already processed — TODO confirm.
    Duplicate { command_id: String },
    RouteRegistered { route_id: String },
    /// Carries the route's new status as a string.
    RouteStateChanged { route_id: String, status: String },
}
354
/// Read-side queries accepted by a `RuntimeQueryBus`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RuntimeQuery {
    /// Asks for the status of the route `route_id`.
    GetRouteStatus {
        route_id: String,
    },
    /// **Note:** This variant is intercepted by `RuntimeBus::ask` *before* reaching
    /// `execute_query`. Do not handle it in `execute_query` — it has no access to
    /// the in-flight counter. See `runtime_bus.rs` for the intercept.
    InFlightCount {
        route_id: String,
    },
    /// Asks for the list of routes.
    ListRoutes,
}
368
/// Answer returned by `RuntimeQueryBus::ask`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RuntimeQueryResult {
    /// In-flight count for the route `route_id`.
    InFlightCount { route_id: String, count: u64 },
    /// The queried route `route_id` was not found.
    RouteNotFound { route_id: String },
    /// Status of the route `route_id`, as a string.
    RouteStatus { route_id: String, status: String },
    /// Identifiers of the listed routes.
    Routes { route_ids: Vec<String> },
}
376
/// Route lifecycle events.
///
/// Unlike the command and query types in this file, this enum also derives
/// `Serialize`/`Deserialize`. Every variant identifies the route by `route_id`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum RuntimeEvent {
    RouteRegistered { route_id: String },
    RouteStartRequested { route_id: String },
    RouteStarted { route_id: String },
    /// Carries a description of the failure in `error`.
    RouteFailed { route_id: String, error: String },
    RouteStopped { route_id: String },
    RouteSuspended { route_id: String },
    RouteResumed { route_id: String },
    RouteReloaded { route_id: String },
    RouteRemoved { route_id: String },
}
389
/// Write-side entry point: executes [`RuntimeCommand`]s.
///
/// Implementors must be `Send + Sync`.
#[async_trait]
pub trait RuntimeCommandBus: Send + Sync {
    /// Executes `cmd` and reports its outcome.
    ///
    /// # Errors
    ///
    /// Returns a `CamelError` when the implementation fails to execute the command.
    async fn execute(&self, cmd: RuntimeCommand) -> Result<RuntimeCommandResult, CamelError>;
}
394
/// Read-side entry point: answers [`RuntimeQuery`]s.
///
/// Implementors must be `Send + Sync`.
#[async_trait]
pub trait RuntimeQueryBus: Send + Sync {
    /// Answers `query`.
    ///
    /// # Errors
    ///
    /// Returns a `CamelError` when the implementation fails to answer the query.
    async fn ask(&self, query: RuntimeQuery) -> Result<RuntimeQueryResult, CamelError>;
}
399
400pub trait RuntimeHandle: RuntimeCommandBus + RuntimeQueryBus {}
401
402impl<T> RuntimeHandle for T where T: RuntimeCommandBus + RuntimeQueryBus {}
403
#[cfg(test)]
mod tests {
    use super::*;

    /// Runs `validate_contract` on `spec` and returns the rendered error message.
    fn contract_error(spec: &CanonicalRouteSpec) -> String {
        spec.validate_contract().unwrap_err().to_string()
    }

    #[test]
    fn command_and_query_ids_are_exposed() {
        let cmd = RuntimeCommand::StartRoute {
            route_id: "r1".into(),
            command_id: "c1".into(),
            causation_id: None,
        };
        assert_eq!(cmd.command_id(), "c1");
        assert_eq!(cmd.causation_id(), None);
    }

    #[test]
    fn canonical_spec_requires_route_id_and_from() {
        let spec = CanonicalRouteSpec::new("r1", "timer:tick");
        assert_eq!(spec.route_id, "r1");
        assert_eq!(spec.from, "timer:tick");
        assert_eq!(spec.version, CANONICAL_CONTRACT_VERSION);
        assert!(spec.steps.is_empty());
        assert!(spec.circuit_breaker.is_none());
        assert!(spec.validate_contract().is_ok());

        // Exercise the requirement itself: blank identifiers must be rejected.
        let blank_id = CanonicalRouteSpec::new("  ", "timer:tick");
        assert!(contract_error(&blank_id).contains("route_id cannot be empty"));

        let blank_from = CanonicalRouteSpec::new("r1", "");
        assert!(contract_error(&blank_from).contains("from cannot be empty"));
    }

    #[test]
    fn canonical_contract_rejects_invalid_version() {
        let mut spec = CanonicalRouteSpec::new("r1", "timer:tick");
        spec.version = 2;
        let err = spec.validate_contract().unwrap_err().to_string();
        assert!(err.contains("expected version"));
    }

    #[test]
    fn canonical_contract_rejects_blank_uri_in_nested_steps() {
        let mut spec = CanonicalRouteSpec::new("r1", "timer:tick");
        spec.steps.push(CanonicalStepSpec::Choice {
            whens: Vec::new(),
            otherwise: Some(vec![CanonicalStepSpec::To { uri: " ".into() }]),
        });
        assert!(contract_error(&spec).contains("endpoint uri cannot be empty"));
    }

    #[test]
    fn canonical_contract_rejects_zeroed_circuit_breaker() {
        let mut spec = CanonicalRouteSpec::new("r1", "timer:tick");

        spec.circuit_breaker = Some(CanonicalCircuitBreakerSpec {
            failure_threshold: 0,
            open_duration_ms: 1_000,
        });
        assert!(contract_error(&spec).contains("failure_threshold must be > 0"));

        spec.circuit_breaker = Some(CanonicalCircuitBreakerSpec {
            failure_threshold: 3,
            open_duration_ms: 0,
        });
        assert!(contract_error(&spec).contains("open_duration_ms must be > 0"));

        spec.circuit_breaker = Some(CanonicalCircuitBreakerSpec {
            failure_threshold: 3,
            open_duration_ms: 1_000,
        });
        assert!(spec.validate_contract().is_ok());
    }

    #[test]
    fn canonical_contract_declares_subset_scope() {
        assert!(canonical_contract_supports_step("to"));
        assert!(canonical_contract_supports_step("split"));
        assert!(!canonical_contract_supports_step("set_header"));

        assert!(CANONICAL_CONTRACT_DECLARATIVE_ONLY_STEPS.contains(&"split"));
        assert!(CANONICAL_CONTRACT_EXCLUDED_DECLARATIVE_STEPS.contains(&"set_header"));
        assert!(CANONICAL_CONTRACT_RUST_ONLY_STEPS.contains(&"processor"));
    }

    #[test]
    fn canonical_contract_rejection_reason_is_explicit() {
        let set_header_reason = canonical_contract_rejection_reason("set_header")
            .expect("set_header should have explicit reason");
        assert!(set_header_reason.contains("out-of-scope"));

        let processor_reason = canonical_contract_rejection_reason("processor")
            .expect("processor should be rust-only");
        assert!(processor_reason.contains("rust-only"));

        let split_reason = canonical_contract_rejection_reason("split")
            .expect("split should require declarative form");
        assert!(split_reason.contains("declarative"));

        // Unrestricted supported steps and unknown names carry no reason.
        assert!(canonical_contract_rejection_reason("to").is_none());
        assert!(canonical_contract_rejection_reason("no_such_step").is_none());
    }

    #[test]
    fn command_causation_id_is_exposed() {
        let cmd = RuntimeCommand::StopRoute {
            route_id: "r1".into(),
            command_id: "c2".into(),
            causation_id: Some("c1".into()),
        };
        assert_eq!(cmd.command_id(), "c2");
        assert_eq!(cmd.causation_id(), Some("c1"));
    }
}