1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
//! Outcome of executing a full route pipeline (multiple steps).
//!
//! `PipelineOutcome` lives ONE LAYER ABOVE Tower. Individual processors and
//! producers continue to return `Result<Exchange, CamelError>`; `PipelineOutcome`
//! is produced only by `run_steps` (the pipeline executor) and consumed by the
//! pipeline's `Service<Exchange>` impl, which translates it back to
//! `Result<Exchange, CamelError>`. See ADR-0024.
use crate::error::CamelError;
use crate::exchange::Exchange;
/// Terminal state of one full run through a route pipeline.
///
/// `run_steps` produces this value; the pipeline's `Service<Exchange>` impl
/// consumes it, and through that impl so do the route controller and the
/// consumer reply channels.
#[derive(Clone, Debug)]
pub enum PipelineOutcome {
    /// The pipeline ran to its normal end: every step completed, or an error
    /// handler returned `Handled`.
    Completed(Exchange),
    /// A `CompiledStep::Stop` step halted the pipeline. The carried Exchange
    /// is the response state and is kept (NOT discarded): Stop is successful
    /// control flow, not an error.
    Stopped(Exchange),
    /// An error left the pipeline unhandled: either the handler returned
    /// `Propagate`, or a step errored while no handler was configured.
    Failed(CamelError),
}
impl PipelineOutcome {
/// Translate to the Tower-layer `Result<Exchange, CamelError>`.
///
/// This is the canonical PUBLIC reply-channel adapter from ADR-0024 ยง3.5:
/// `Completed(ex)` and `Stopped(ex)` both become `Ok(ex)` โ indistinguishable
/// to the consumer, which is the core fix for Bug B. `Failed(err)` becomes
/// `Err(err)`.
///
/// This is the ONLY public `PipelineOutcome โ Result` translation site for
/// `Service<Exchange>::Response`. Code review MUST reject any new public
/// translation sites. (The private `eip_outcome_to_result` in camel-core is
/// the documented oracle Option E interim for nested sub-pipeline boundaries;
/// it is internal-only and removed in bd rc-5uv once EIPs become outcome-aware.)
pub fn into_tower_result(self) -> Result<Exchange, CamelError> {
match self {
PipelineOutcome::Completed(ex) | PipelineOutcome::Stopped(ex) => Ok(ex),
PipelineOutcome::Failed(err) => Err(err),
}
}
/// `true` if the pipeline reached a successful end state (`Completed` or `Stopped`).
pub fn is_success(&self) -> bool {
matches!(
self,
PipelineOutcome::Completed(_) | PipelineOutcome::Stopped(_)
)
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::message::Message;

    /// Builds an exchange whose input message is created from `body`.
    fn exchange_with(body: &str) -> Exchange {
        let message = Message::new(body);
        Exchange::new(message)
    }

    #[test]
    fn completed_maps_to_ok() {
        let result = PipelineOutcome::Completed(exchange_with("done")).into_tower_result();
        assert!(result.is_ok());
    }

    #[test]
    fn stopped_maps_to_ok_indistinguishable_from_completed() {
        // Core Bug B invariant: the consumer cannot tell Completed from Stopped.
        let from_stopped = PipelineOutcome::Stopped(exchange_with("halted")).into_tower_result();
        let from_completed =
            PipelineOutcome::Completed(exchange_with("halted")).into_tower_result();

        assert!(from_stopped.is_ok());
        assert!(from_completed.is_ok());

        // Identical bodies — a single reply path covers both outcomes.
        let stopped_exchange = from_stopped.unwrap();
        let completed_exchange = from_completed.unwrap();
        assert_eq!(
            stopped_exchange.input.body.as_text(),
            completed_exchange.input.body.as_text()
        );
    }

    #[test]
    fn failed_maps_to_err() {
        let error = CamelError::ProcessorError("boom".into());
        let result = PipelineOutcome::Failed(error).into_tower_result();
        assert!(result.is_err());
    }

    #[test]
    fn is_success_true_for_completed_and_stopped() {
        let completed = PipelineOutcome::Completed(exchange_with("x"));
        let stopped = PipelineOutcome::Stopped(exchange_with("x"));
        let failed = PipelineOutcome::Failed(CamelError::ChannelClosed);

        assert!(completed.is_success());
        assert!(stopped.is_success());
        assert!(!failed.is_success());
    }
}