use crate::types::Outcome;
use crate::types::cancel::CancelReason;
use crate::types::outcome::PanicPayload;
use core::fmt;
use std::marker::PhantomData;
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct PipelineConfig {
pub check_cancellation: bool,
pub continue_on_error: bool,
}
impl Default for PipelineConfig {
fn default() -> Self {
Self::new()
}
}
impl PipelineConfig {
#[must_use]
pub const fn new() -> Self {
Self {
check_cancellation: true,
continue_on_error: false,
}
}
#[must_use]
pub const fn with_cancellation_check() -> Self {
Self {
check_cancellation: true,
continue_on_error: false,
}
}
#[must_use]
pub const fn without_cancellation_check() -> Self {
Self {
check_cancellation: false,
continue_on_error: false,
}
}
}
#[derive(Debug)]
pub struct Pipeline<T> {
pub config: PipelineConfig,
_t: PhantomData<T>,
}
impl<T> Pipeline<T> {
#[must_use]
pub const fn new() -> Self {
Self {
config: PipelineConfig::new(),
_t: PhantomData,
}
}
#[must_use]
pub const fn with_config(config: PipelineConfig) -> Self {
Self {
config,
_t: PhantomData,
}
}
}
impl<T> Clone for Pipeline<T> {
fn clone(&self) -> Self {
*self
}
}
impl<T> Copy for Pipeline<T> {}
impl<T> Default for Pipeline<T> {
fn default() -> Self {
Self::new()
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct FailedStage {
pub index: usize,
pub total_stages: usize,
}
impl FailedStage {
#[must_use]
pub const fn new(index: usize, total_stages: usize) -> Self {
Self {
index,
total_stages,
}
}
#[must_use]
pub const fn is_first(&self) -> bool {
self.index == 0
}
#[must_use]
pub const fn is_last(&self) -> bool {
self.index + 1 == self.total_stages
}
#[must_use]
pub const fn stage_number(&self) -> usize {
self.index + 1
}
}
impl fmt::Display for FailedStage {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "stage {}/{}", self.stage_number(), self.total_stages)
}
}
#[derive(Debug, Clone)]
pub enum PipelineResult<T, E> {
Completed {
value: T,
stages_completed: usize,
},
Failed {
error: E,
failed_at: FailedStage,
},
Cancelled {
reason: CancelReason,
cancelled_at: FailedStage,
},
Panicked {
payload: PanicPayload,
panicked_at: FailedStage,
},
}
impl<T, E> PipelineResult<T, E> {
#[must_use]
pub const fn completed(value: T, stages_completed: usize) -> Self {
Self::Completed {
value,
stages_completed,
}
}
#[must_use]
pub const fn failed(error: E, failed_at: FailedStage) -> Self {
Self::Failed { error, failed_at }
}
#[must_use]
pub const fn cancelled(reason: CancelReason, cancelled_at: FailedStage) -> Self {
Self::Cancelled {
reason,
cancelled_at,
}
}
#[must_use]
pub const fn panicked(payload: PanicPayload, panicked_at: FailedStage) -> Self {
Self::Panicked {
payload,
panicked_at,
}
}
#[must_use]
pub const fn is_completed(&self) -> bool {
matches!(self, Self::Completed { .. })
}
#[must_use]
pub const fn is_failed(&self) -> bool {
matches!(self, Self::Failed { .. })
}
#[must_use]
pub const fn is_cancelled(&self) -> bool {
matches!(self, Self::Cancelled { .. })
}
#[must_use]
pub const fn is_panicked(&self) -> bool {
matches!(self, Self::Panicked { .. })
}
pub fn into_outcome(self) -> Outcome<T, E> {
match self {
Self::Completed { value, .. } => Outcome::Ok(value),
Self::Failed { error, .. } => Outcome::Err(error),
Self::Cancelled { reason, .. } => Outcome::Cancelled(reason),
Self::Panicked { payload, .. } => Outcome::Panicked(payload),
}
}
#[must_use]
pub const fn stages_executed(&self) -> usize {
match self {
Self::Completed {
stages_completed, ..
} => *stages_completed,
Self::Failed { failed_at, .. } => failed_at.index + 1,
Self::Cancelled { cancelled_at, .. } => cancelled_at.index,
Self::Panicked { panicked_at, .. } => panicked_at.index + 1,
}
}
}
#[derive(Debug, Clone)]
pub enum PipelineError<E> {
StageError {
error: E,
stage: FailedStage,
},
Cancelled {
reason: CancelReason,
stage: FailedStage,
},
Panicked {
payload: PanicPayload,
stage: FailedStage,
},
}
impl<E: fmt::Display> fmt::Display for PipelineError<E> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::StageError { error, stage } => {
write!(f, "pipeline failed at {stage}: {error}")
}
Self::Cancelled { reason, stage } => {
write!(f, "pipeline cancelled at {stage}: {reason}")
}
Self::Panicked { payload, stage } => {
write!(f, "pipeline panicked at {stage}: {payload}")
}
}
}
}
impl<E: fmt::Debug + fmt::Display> std::error::Error for PipelineError<E> {}
#[must_use]
pub fn stage_outcome_to_result<T, E>(
outcome: Outcome<T, E>,
stage_index: usize,
total_stages: usize,
) -> Option<PipelineResult<T, E>> {
let stage = FailedStage::new(stage_index, total_stages);
match outcome {
Outcome::Ok(_) => None, Outcome::Err(e) => Some(PipelineResult::failed(e, stage)),
Outcome::Cancelled(r) => Some(PipelineResult::cancelled(r, stage)),
Outcome::Panicked(p) => Some(PipelineResult::panicked(p, stage)),
}
}
#[must_use]
pub fn pipeline2_outcomes<T, E>(
o1: Outcome<T, E>,
o2: Option<Outcome<T, E>>,
) -> PipelineResult<T, E> {
const TOTAL_STAGES: usize = 2;
if let Some(result) = stage_outcome_to_result(o1, 0, TOTAL_STAGES) {
return result;
}
match o2 {
Some(Outcome::Ok(v)) => PipelineResult::completed(v, TOTAL_STAGES),
Some(outcome) => {
stage_outcome_to_result(outcome, 1, TOTAL_STAGES)
.expect("non-Ok should return Some result")
}
None => PipelineResult::panicked(
PanicPayload::new("o2 must be provided when o1 succeeds"),
FailedStage::new(1, TOTAL_STAGES),
),
}
}
#[must_use]
pub fn pipeline3_outcomes<T, E>(
o1: Outcome<T, E>,
o2: Option<Outcome<T, E>>,
o3: Option<Outcome<T, E>>,
) -> PipelineResult<T, E> {
const TOTAL_STAGES: usize = 3;
if let Some(result) = stage_outcome_to_result(o1, 0, TOTAL_STAGES) {
return result;
}
match o2 {
Some(outcome) => {
if let Some(result) = stage_outcome_to_result(outcome, 1, TOTAL_STAGES) {
return result;
}
}
None => {
return PipelineResult::panicked(
PanicPayload::new("o2 must be provided when o1 succeeds"),
FailedStage::new(1, TOTAL_STAGES),
);
}
}
match o3 {
Some(Outcome::Ok(v)) => PipelineResult::completed(v, TOTAL_STAGES),
Some(outcome) => {
stage_outcome_to_result(outcome, 2, TOTAL_STAGES)
.expect("non-Ok should return Some result")
}
None => PipelineResult::panicked(
PanicPayload::new("o3 must be provided when o1 and o2 succeed"),
FailedStage::new(2, TOTAL_STAGES),
),
}
}
#[must_use]
pub fn pipeline_n_outcomes<T, E>(
outcomes: Vec<Outcome<T, E>>,
total_stages: usize,
) -> PipelineResult<T, E> {
assert!(!outcomes.is_empty(), "outcomes must not be empty");
assert!(outcomes.len() <= total_stages, "more outcomes than stages");
let num_provided = outcomes.len();
let mut last_ok_value: Option<T> = None;
for (index, outcome) in outcomes.into_iter().enumerate() {
match outcome {
Outcome::Ok(v) => {
last_ok_value = Some(v);
}
Outcome::Err(e) => {
return PipelineResult::failed(e, FailedStage::new(index, total_stages));
}
Outcome::Cancelled(r) => {
return PipelineResult::cancelled(r, FailedStage::new(index, total_stages));
}
Outcome::Panicked(p) => {
return PipelineResult::panicked(p, FailedStage::new(index, total_stages));
}
}
}
if num_provided == total_stages {
PipelineResult::completed(
last_ok_value.expect("at least one outcome was provided"),
total_stages,
)
} else {
PipelineResult::completed(
last_ok_value.expect("at least one outcome was provided"),
num_provided,
)
}
}
#[must_use]
pub fn pipeline_with_final<T, E>(
intermediate_outcomes: Vec<Outcome<T, E>>,
final_outcome: Outcome<T, E>,
total_stages: usize,
) -> PipelineResult<T, E> {
assert!(total_stages > 0, "total_stages must be positive");
assert!(
intermediate_outcomes.len() + 1 == total_stages,
"intermediate_outcomes.len() ({}) + 1 must equal total_stages ({})",
intermediate_outcomes.len(),
total_stages
);
for (index, outcome) in intermediate_outcomes.into_iter().enumerate() {
if let Some(result) = stage_outcome_to_result(outcome, index, total_stages) {
return result;
}
}
let final_index = total_stages - 1;
match final_outcome {
Outcome::Ok(v) => PipelineResult::completed(v, total_stages),
Outcome::Err(e) => PipelineResult::failed(e, FailedStage::new(final_index, total_stages)),
Outcome::Cancelled(r) => {
PipelineResult::cancelled(r, FailedStage::new(final_index, total_stages))
}
Outcome::Panicked(p) => {
PipelineResult::panicked(p, FailedStage::new(final_index, total_stages))
}
}
}
pub fn pipeline_to_result<T, E>(result: PipelineResult<T, E>) -> Result<T, PipelineError<E>> {
match result {
PipelineResult::Completed { value, .. } => Ok(value),
PipelineResult::Failed { error, failed_at } => Err(PipelineError::StageError {
error,
stage: failed_at,
}),
PipelineResult::Cancelled {
reason,
cancelled_at,
} => Err(PipelineError::Cancelled {
reason,
stage: cancelled_at,
}),
PipelineResult::Panicked {
payload,
panicked_at,
} => Err(PipelineError::Panicked {
payload,
stage: panicked_at,
}),
}
}
#[macro_export]
macro_rules! pipeline {
($cx:expr, $input:expr, $($stage:expr),+ $(,)?) => {
{
let __pipeline_cx = &$cx;
async move {
let mut __pipeline_value = $input;
$(
__pipeline_value = ($stage)(__pipeline_cx, __pipeline_value).await;
)+
__pipeline_value
}
}
};
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn pipeline_config_default() {
let config = PipelineConfig::default();
assert!(config.check_cancellation);
assert!(!config.continue_on_error);
}
#[test]
fn pipeline_config_with_cancellation_check() {
let config = PipelineConfig::with_cancellation_check();
assert!(config.check_cancellation);
}
#[test]
fn pipeline_config_without_cancellation_check() {
let config = PipelineConfig::without_cancellation_check();
assert!(!config.check_cancellation);
}
#[test]
fn pipeline_creation() {
let pipeline = Pipeline::<()>::new();
assert!(pipeline.config.check_cancellation);
}
#[test]
fn pipeline_with_config() {
let config = PipelineConfig::without_cancellation_check();
let pipeline = Pipeline::<()>::with_config(config);
assert!(!pipeline.config.check_cancellation);
}
#[test]
fn pipeline_clone_and_copy() {
let p1 = Pipeline::<()>::new();
let p2 = p1; let p3 = p1;
assert_eq!(p1.config.check_cancellation, p2.config.check_cancellation);
assert_eq!(p1.config.check_cancellation, p3.config.check_cancellation);
}
#[test]
fn pipeline_macro_chains_stages_sequentially() {
let cx = crate::cx::Cx::for_testing();
let fut = crate::pipeline!(cx, 2usize, |_, x| async move { x + 3 }, |_, x| async move {
x * 4
});
let out = futures_lite::future::block_on(fut);
assert_eq!(out, 20);
}
#[test]
fn failed_stage_first() {
let stage = FailedStage::new(0, 3);
assert!(stage.is_first());
assert!(!stage.is_last());
assert_eq!(stage.stage_number(), 1);
}
#[test]
fn failed_stage_middle() {
let stage = FailedStage::new(1, 3);
assert!(!stage.is_first());
assert!(!stage.is_last());
assert_eq!(stage.stage_number(), 2);
}
#[test]
fn failed_stage_last() {
let stage = FailedStage::new(2, 3);
assert!(!stage.is_first());
assert!(stage.is_last());
assert_eq!(stage.stage_number(), 3);
}
#[test]
fn failed_stage_display() {
let stage = FailedStage::new(1, 5);
assert_eq!(stage.to_string(), "stage 2/5");
}
#[test]
fn pipeline_result_completed() {
let result: PipelineResult<i32, &str> = PipelineResult::completed(42, 3);
assert!(result.is_completed());
assert!(!result.is_failed());
assert!(!result.is_cancelled());
assert!(!result.is_panicked());
assert_eq!(result.stages_executed(), 3);
}
#[test]
fn pipeline_result_failed() {
let result: PipelineResult<i32, &str> =
PipelineResult::failed("error", FailedStage::new(1, 3));
assert!(!result.is_completed());
assert!(result.is_failed());
assert_eq!(result.stages_executed(), 2); }
#[test]
fn pipeline_result_cancelled() {
let result: PipelineResult<i32, &str> =
PipelineResult::cancelled(CancelReason::shutdown(), FailedStage::new(1, 3));
assert!(!result.is_completed());
assert!(result.is_cancelled());
assert_eq!(result.stages_executed(), 1); }
#[test]
fn pipeline_result_panicked() {
let result: PipelineResult<i32, &str> =
PipelineResult::panicked(PanicPayload::new("boom"), FailedStage::new(2, 3));
assert!(!result.is_completed());
assert!(result.is_panicked());
assert_eq!(result.stages_executed(), 3); }
#[test]
fn pipeline_result_into_outcome() {
let completed: PipelineResult<i32, &str> = PipelineResult::completed(42, 3);
assert!(matches!(completed.into_outcome(), Outcome::Ok(42)));
let failed: PipelineResult<i32, &str> =
PipelineResult::failed("error", FailedStage::new(0, 1));
assert!(matches!(failed.into_outcome(), Outcome::Err("error")));
let cancelled: PipelineResult<i32, &str> =
PipelineResult::cancelled(CancelReason::shutdown(), FailedStage::new(0, 1));
assert!(cancelled.into_outcome().is_cancelled());
let panicked: PipelineResult<i32, &str> =
PipelineResult::panicked(PanicPayload::new("oops"), FailedStage::new(0, 1));
assert!(panicked.into_outcome().is_panicked());
}
#[test]
fn stage_outcome_ok_returns_none() {
let result = stage_outcome_to_result::<i32, &str>(Outcome::Ok(42), 0, 3);
assert!(result.is_none());
}
#[test]
fn stage_outcome_err_returns_failed() {
let result = stage_outcome_to_result::<i32, &str>(Outcome::Err("error"), 1, 3);
assert!(result.is_some());
assert!(result.unwrap().is_failed());
}
#[test]
fn stage_outcome_cancelled_returns_cancelled() {
let result = stage_outcome_to_result::<i32, &str>(
Outcome::Cancelled(CancelReason::shutdown()),
2,
3,
);
assert!(result.is_some());
assert!(result.unwrap().is_cancelled());
}
#[test]
fn stage_outcome_panicked_returns_panicked() {
let result = stage_outcome_to_result::<i32, &str>(
Outcome::Panicked(PanicPayload::new("boom")),
0,
3,
);
assert!(result.is_some());
assert!(result.unwrap().is_panicked());
}
#[test]
fn pipeline2_both_ok() {
let result = pipeline2_outcomes::<i32, &str>(Outcome::Ok(1), Some(Outcome::Ok(2)));
assert!(result.is_completed());
if let PipelineResult::Completed {
value,
stages_completed,
} = result
{
assert_eq!(value, 2);
assert_eq!(stages_completed, 2);
} else {
unreachable!("Expected Completed");
}
}
#[test]
fn pipeline2_first_fails() {
let result = pipeline2_outcomes::<i32, &str>(Outcome::Err("stage1 error"), None);
assert!(result.is_failed());
if let PipelineResult::Failed { error, failed_at } = result {
assert_eq!(error, "stage1 error");
assert!(failed_at.is_first());
} else {
unreachable!("Expected Failed");
}
}
#[test]
fn pipeline2_second_fails() {
let result = pipeline2_outcomes(Outcome::Ok(1), Some(Outcome::Err("stage2 error")));
assert!(result.is_failed());
if let PipelineResult::Failed { error, failed_at } = result {
assert_eq!(error, "stage2 error");
assert!(failed_at.is_last());
assert_eq!(failed_at.index, 1);
} else {
unreachable!("Expected Failed");
}
}
#[test]
fn pipeline2_first_cancelled() {
let result =
pipeline2_outcomes::<i32, &str>(Outcome::Cancelled(CancelReason::shutdown()), None);
assert!(result.is_cancelled());
}
#[test]
fn pipeline2_panicked_when_o2_missing() {
let result = pipeline2_outcomes::<i32, &str>(Outcome::Ok(1), None);
assert!(result.is_panicked());
if let PipelineResult::Panicked {
payload,
panicked_at,
} = result
{
assert_eq!(payload.message(), "o2 must be provided when o1 succeeds");
assert_eq!(panicked_at.index, 1);
} else {
panic!("Expected Panicked");
}
}
#[test]
fn pipeline3_all_ok() {
let result = pipeline3_outcomes::<i32, &str>(
Outcome::<i32, &str>::Ok(1),
Some(Outcome::Ok(2)),
Some(Outcome::Ok(3)),
);
assert!(result.is_completed());
if let PipelineResult::Completed {
value,
stages_completed,
} = result
{
assert_eq!(value, 3);
assert_eq!(stages_completed, 3);
} else {
unreachable!("Expected Completed");
}
}
#[test]
fn pipeline3_first_fails() {
let result = pipeline3_outcomes::<i32, &str>(Outcome::Err("s1"), None, None);
assert!(result.is_failed());
if let PipelineResult::Failed { failed_at, .. } = result {
assert_eq!(failed_at.index, 0);
}
}
#[test]
fn pipeline3_second_fails() {
let result =
pipeline3_outcomes::<i32, &str>(Outcome::Ok(1), Some(Outcome::Err("s2")), None);
assert!(result.is_failed());
if let PipelineResult::Failed { failed_at, .. } = result {
assert_eq!(failed_at.index, 1);
}
}
#[test]
fn pipeline3_third_fails() {
let result = pipeline3_outcomes(
Outcome::Ok(1),
Some(Outcome::Ok(2)),
Some(Outcome::Err("s3")),
);
assert!(result.is_failed());
if let PipelineResult::Failed { failed_at, .. } = result {
assert_eq!(failed_at.index, 2);
assert!(failed_at.is_last());
}
}
#[test]
fn pipeline3_panicked_when_o2_missing() {
let result = pipeline3_outcomes::<i32, &str>(Outcome::Ok(1), None, None);
assert!(result.is_panicked());
if let PipelineResult::Panicked {
payload,
panicked_at,
} = result
{
assert_eq!(payload.message(), "o2 must be provided when o1 succeeds");
assert_eq!(panicked_at.index, 1);
} else {
panic!("Expected Panicked");
}
}
#[test]
fn pipeline3_panicked_when_o3_missing() {
let result = pipeline3_outcomes::<i32, &str>(Outcome::Ok(1), Some(Outcome::Ok(2)), None);
assert!(result.is_panicked());
if let PipelineResult::Panicked {
payload,
panicked_at,
} = result
{
assert_eq!(
payload.message(),
"o3 must be provided when o1 and o2 succeed"
);
assert_eq!(panicked_at.index, 2);
} else {
panic!("Expected Panicked");
}
}
#[test]
fn pipeline_with_final_all_ok() {
let intermediates: Vec<Outcome<i32, &str>> = vec![Outcome::Ok(1), Outcome::Ok(2)];
let result = pipeline_with_final(intermediates, Outcome::Ok(42), 3);
assert!(result.is_completed());
if let PipelineResult::Completed { value, .. } = result {
assert_eq!(value, 42);
}
}
#[test]
fn pipeline_with_final_intermediate_fails() {
let intermediates: Vec<Outcome<i32, &str>> = vec![Outcome::Ok(1), Outcome::Err("mid fail")];
let result = pipeline_with_final(intermediates, Outcome::Ok(42), 3);
assert!(result.is_failed());
if let PipelineResult::Failed { failed_at, .. } = result {
assert_eq!(failed_at.index, 1);
}
}
#[test]
fn pipeline_with_final_final_fails() {
let intermediates: Vec<Outcome<i32, &str>> = vec![Outcome::Ok(1), Outcome::Ok(2)];
let result = pipeline_with_final(intermediates, Outcome::Err("final fail"), 3);
assert!(result.is_failed());
if let PipelineResult::Failed { failed_at, .. } = result {
assert_eq!(failed_at.index, 2);
assert!(failed_at.is_last());
}
}
#[test]
fn pipeline_to_result_completed() {
let result: PipelineResult<i32, &str> = PipelineResult::completed(42, 3);
assert_eq!(pipeline_to_result(result).unwrap(), 42);
}
#[test]
fn pipeline_to_result_failed() {
let result: PipelineResult<i32, &str> =
PipelineResult::failed("error", FailedStage::new(1, 3));
let err = pipeline_to_result(result).unwrap_err();
assert!(matches!(err, PipelineError::StageError { .. }));
}
#[test]
fn pipeline_to_result_cancelled() {
let result: PipelineResult<i32, &str> =
PipelineResult::cancelled(CancelReason::shutdown(), FailedStage::new(0, 3));
let err = pipeline_to_result(result).unwrap_err();
assert!(matches!(err, PipelineError::Cancelled { .. }));
}
#[test]
fn pipeline_to_result_panicked() {
let result: PipelineResult<i32, &str> =
PipelineResult::panicked(PanicPayload::new("boom"), FailedStage::new(2, 3));
let err = pipeline_to_result(result).unwrap_err();
assert!(matches!(err, PipelineError::Panicked { .. }));
}
#[test]
fn pipeline_error_display_stage_error() {
let err: PipelineError<&str> = PipelineError::StageError {
error: "test error",
stage: FailedStage::new(1, 3),
};
let display = err.to_string();
assert!(display.contains("stage 2/3"));
assert!(display.contains("test error"));
}
#[test]
fn pipeline_error_display_cancelled() {
let err: PipelineError<&str> = PipelineError::Cancelled {
reason: CancelReason::shutdown(),
stage: FailedStage::new(0, 2),
};
let display = err.to_string();
assert!(display.contains("cancelled"));
assert!(display.contains("stage 1/2"));
}
#[test]
fn pipeline_error_display_panicked() {
let err: PipelineError<&str> = PipelineError::Panicked {
payload: PanicPayload::new("boom"),
stage: FailedStage::new(2, 3),
};
let display = err.to_string();
assert!(display.contains("panicked"));
assert!(display.contains("boom"));
}
#[test]
fn pipeline_n_all_ok() {
let outcomes: Vec<Outcome<i32, &str>> =
vec![Outcome::Ok(1), Outcome::Ok(2), Outcome::Ok(3)];
let result = pipeline_n_outcomes(outcomes, 3);
assert!(result.is_completed());
if let PipelineResult::Completed {
value,
stages_completed,
} = result
{
assert_eq!(value, 3);
assert_eq!(stages_completed, 3);
} else {
unreachable!("Expected Completed");
}
}
#[test]
fn pipeline_n_first_error() {
let outcomes: Vec<Outcome<i32, &str>> = vec![Outcome::Err("fail"), Outcome::Ok(2)];
let result = pipeline_n_outcomes(outcomes, 3);
assert!(result.is_failed());
if let PipelineResult::Failed { error, failed_at } = result {
assert_eq!(error, "fail");
assert_eq!(failed_at.index, 0);
assert_eq!(failed_at.total_stages, 3);
} else {
unreachable!("Expected Failed");
}
}
#[test]
fn pipeline_n_middle_cancel() {
let outcomes: Vec<Outcome<i32, &str>> =
vec![Outcome::Ok(1), Outcome::Cancelled(CancelReason::shutdown())];
let result = pipeline_n_outcomes(outcomes, 4);
assert!(result.is_cancelled());
if let PipelineResult::Cancelled { cancelled_at, .. } = result {
assert_eq!(cancelled_at.index, 1);
assert_eq!(cancelled_at.total_stages, 4);
} else {
panic!("Expected Cancelled");
}
}
#[test]
fn pipeline_n_partial_completion() {
let outcomes: Vec<Outcome<i32, &str>> = vec![Outcome::Ok(10), Outcome::Ok(20)];
let result = pipeline_n_outcomes(outcomes, 5);
assert!(result.is_completed());
if let PipelineResult::Completed {
value,
stages_completed,
} = result
{
assert_eq!(value, 20);
assert_eq!(stages_completed, 2); } else {
unreachable!("Expected Completed");
}
}
#[test]
fn pipeline_n_single_ok() {
let outcomes: Vec<Outcome<i32, &str>> = vec![Outcome::Ok(42)];
let result = pipeline_n_outcomes(outcomes, 1);
assert!(result.is_completed());
if let PipelineResult::Completed {
value,
stages_completed,
} = result
{
assert_eq!(value, 42);
assert_eq!(stages_completed, 1);
} else {
unreachable!("Expected Completed");
}
}
#[test]
fn pipeline_n_single_error() {
let outcomes: Vec<Outcome<i32, &str>> = vec![Outcome::Err("only stage fails")];
let result = pipeline_n_outcomes(outcomes, 1);
assert!(result.is_failed());
if let PipelineResult::Failed { failed_at, .. } = result {
assert_eq!(failed_at.index, 0);
assert!(failed_at.is_first());
assert!(failed_at.is_last());
} else {
unreachable!("Expected Failed");
}
}
#[test]
fn pipeline_n_panic_mid_pipeline() {
let outcomes: Vec<Outcome<i32, &str>> = vec![
Outcome::Ok(1),
Outcome::Ok(2),
Outcome::Panicked(PanicPayload::new("stage 3 panicked")),
];
let result = pipeline_n_outcomes(outcomes, 4);
assert!(result.is_panicked());
if let PipelineResult::Panicked { panicked_at, .. } = result {
assert_eq!(panicked_at.index, 2);
assert_eq!(panicked_at.total_stages, 4);
} else {
panic!("Expected Panicked");
}
}
#[test]
#[should_panic(expected = "outcomes must not be empty")]
fn pipeline_n_empty_outcomes_panics() {
let outcomes: Vec<Outcome<i32, &str>> = vec![];
let _ = pipeline_n_outcomes(outcomes, 3);
}
#[test]
#[should_panic(expected = "more outcomes than stages")]
fn pipeline_n_too_many_outcomes_panics() {
let outcomes: Vec<Outcome<i32, &str>> =
vec![Outcome::Ok(1), Outcome::Ok(2), Outcome::Ok(3)];
let _ = pipeline_n_outcomes(outcomes, 2);
}
#[test]
#[should_panic(expected = "total_stages must be positive")]
fn pipeline_with_final_zero_stages_panics() {
let intermediates: Vec<Outcome<i32, &str>> = vec![];
let _ = pipeline_with_final(intermediates, Outcome::Ok(42), 0);
}
#[test]
#[should_panic(expected = "must equal total_stages")]
fn pipeline_with_final_mismatched_stages_panics() {
let intermediates: Vec<Outcome<i32, &str>> = vec![Outcome::Ok(1)];
let _ = pipeline_with_final(intermediates, Outcome::Ok(42), 5);
}
#[test]
fn pipeline_with_final_cancelled_final() {
let intermediates: Vec<Outcome<i32, &str>> = vec![Outcome::Ok(1), Outcome::Ok(2)];
let result = pipeline_with_final(
intermediates,
Outcome::Cancelled(CancelReason::shutdown()),
3,
);
assert!(result.is_cancelled());
if let PipelineResult::Cancelled { cancelled_at, .. } = result {
assert_eq!(cancelled_at.index, 2);
assert!(cancelled_at.is_last());
} else {
panic!("Expected Cancelled");
}
}
#[test]
fn pipeline_with_final_panicked_final() {
let intermediates: Vec<Outcome<i32, &str>> = vec![Outcome::Ok(1)];
let result = pipeline_with_final(
intermediates,
Outcome::Panicked(PanicPayload::new("final boom")),
2,
);
assert!(result.is_panicked());
if let PipelineResult::Panicked { panicked_at, .. } = result {
assert_eq!(panicked_at.index, 1);
assert!(panicked_at.is_last());
} else {
panic!("Expected Panicked");
}
}
#[test]
fn pipeline_with_final_single_stage() {
let intermediates: Vec<Outcome<i32, &str>> = vec![];
let result = pipeline_with_final(intermediates, Outcome::Ok(99), 1);
assert!(result.is_completed());
if let PipelineResult::Completed { value, .. } = result {
assert_eq!(value, 99);
} else {
unreachable!("Expected Completed");
}
}
#[test]
fn error_short_circuits_at_first_failure() {
let intermediates: Vec<Outcome<i32, &str>> = vec![
Outcome::Ok(1),
Outcome::Ok(2),
Outcome::Err("stage 3 failed"),
Outcome::Ok(4), ];
let result = pipeline_with_final(intermediates, Outcome::Ok(999), 5);
assert!(result.is_failed());
assert_eq!(result.stages_executed(), 3); if let PipelineResult::Failed { failed_at, .. } = result {
assert_eq!(failed_at.index, 2);
assert_eq!(failed_at.total_stages, 5);
}
}
#[test]
fn cancelled_stops_at_boundary() {
let intermediates: Vec<Outcome<i32, &str>> =
vec![Outcome::Ok(1), Outcome::Cancelled(CancelReason::shutdown())];
let result = pipeline_with_final(intermediates, Outcome::Ok(42), 3);
assert!(result.is_cancelled());
if let PipelineResult::Cancelled { cancelled_at, .. } = result {
assert_eq!(cancelled_at.index, 1);
}
}
#[test]
fn stages_executed_reflects_actual_execution() {
let completed: PipelineResult<i32, &str> = PipelineResult::completed(42, 5);
assert_eq!(completed.stages_executed(), 5);
let failed: PipelineResult<i32, &str> =
PipelineResult::failed("err", FailedStage::new(1, 5));
assert_eq!(failed.stages_executed(), 2);
let cancelled: PipelineResult<i32, &str> =
PipelineResult::cancelled(CancelReason::shutdown(), FailedStage::new(2, 5));
assert_eq!(cancelled.stages_executed(), 2); }
#[test]
fn pipeline_config_debug_clone_copy_eq_default() {
let cfg = PipelineConfig::default();
let dbg = format!("{cfg:?}");
assert!(dbg.contains("PipelineConfig"), "{dbg}");
let copied = cfg;
let cloned = cfg;
assert_eq!(copied, cloned);
}
#[test]
fn failed_stage_debug_clone_copy_eq() {
let fs = FailedStage::new(2, 5);
let dbg = format!("{fs:?}");
assert!(dbg.contains("FailedStage"), "{dbg}");
let copied = fs;
let cloned = fs;
assert_eq!(copied, cloned);
assert_ne!(fs, FailedStage::new(3, 5));
}
}