use serde::{Deserialize, Serialize};
pub mod combine;
pub mod error;
pub mod wait;
pub use combine::{join_all, join_any, zip};
pub use error::{ErrorKind, OutcomeError};
pub use wait::{CompensationAction, WaitCondition};
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(tag = "type", content = "data")]
#[non_exhaustive]
pub enum Outcome<T> {
Ok(T),
Err(OutcomeError),
Retry {
after_ms: u64,
attempt: u32,
max_attempts: u32,
reason: String,
},
Pending {
condition: WaitCondition,
#[serde(with = "crate::wire::u128_bytes")]
resume_token: u128,
},
Cancelled {
reason: String,
},
Batch(Vec<Outcome<T>>),
}
impl<T> Outcome<T> {
pub fn ok(val: T) -> Self {
Self::Ok(val)
}
pub fn err(e: OutcomeError) -> Self {
Self::Err(e)
}
pub fn cancelled(reason: impl Into<String>) -> Self {
Self::Cancelled {
reason: reason.into(),
}
}
pub fn retry(
after_ms: u64,
attempt: u32,
max_attempts: u32,
reason: impl Into<String>,
) -> Self {
Self::Retry {
after_ms,
attempt,
max_attempts,
reason: reason.into(),
}
}
pub fn pending(condition: WaitCondition, resume_token: u128) -> Self {
Self::Pending {
condition,
resume_token,
}
}
pub fn is_ok(&self) -> bool {
matches!(self, Self::Ok(_))
}
pub fn is_err(&self) -> bool {
matches!(self, Self::Err(_))
}
pub fn is_retry(&self) -> bool {
matches!(self, Self::Retry { .. })
}
pub fn is_pending(&self) -> bool {
matches!(self, Self::Pending { .. })
}
pub fn is_cancelled(&self) -> bool {
matches!(self, Self::Cancelled { .. })
}
pub fn is_batch(&self) -> bool {
matches!(self, Self::Batch(_))
}
pub fn is_terminal(&self) -> bool {
matches!(self, Self::Ok(_) | Self::Err(_) | Self::Cancelled { .. })
}
pub fn map<U, F: FnOnce(T) -> U + Clone>(self, f: F) -> Outcome<U> {
match self {
Self::Ok(v) => Outcome::Ok(f(v)),
Self::Err(e) => Outcome::Err(e),
Self::Retry {
after_ms,
attempt,
max_attempts,
reason,
} => Outcome::Retry {
after_ms,
attempt,
max_attempts,
reason,
},
Self::Pending {
condition,
resume_token,
} => Outcome::Pending {
condition,
resume_token,
},
Self::Cancelled { reason } => Outcome::Cancelled { reason },
Self::Batch(items) => {
Outcome::Batch(items.into_iter().map(|o| o.map(f.clone())).collect())
}
}
}
pub fn and_then<U, F: FnOnce(T) -> Outcome<U> + Clone>(self, f: F) -> Outcome<U> {
match self {
Self::Ok(v) => f(v),
Self::Err(e) => Outcome::Err(e),
Self::Retry {
after_ms,
attempt,
max_attempts,
reason,
} => Outcome::Retry {
after_ms,
attempt,
max_attempts,
reason,
},
Self::Pending {
condition,
resume_token,
} => Outcome::Pending {
condition,
resume_token,
},
Self::Cancelled { reason } => Outcome::Cancelled { reason },
Self::Batch(items) => {
Outcome::Batch(items.into_iter().map(|o| o.and_then(f.clone())).collect())
}
}
}
pub fn map_err<F: FnOnce(OutcomeError) -> OutcomeError + Clone>(self, f: F) -> Self {
match self {
Self::Err(e) => Self::Err(f(e)),
Self::Batch(items) => {
Self::Batch(items.into_iter().map(|o| o.map_err(f.clone())).collect())
}
Self::Ok(_) | Self::Retry { .. } | Self::Pending { .. } | Self::Cancelled { .. } => {
self
}
}
}
pub fn or_else<F: FnOnce(OutcomeError) -> Outcome<T> + Clone>(self, f: F) -> Outcome<T> {
match self {
Self::Err(e) => f(e),
Self::Batch(items) => {
Self::Batch(items.into_iter().map(|o| o.or_else(f.clone())).collect())
}
Self::Ok(_) | Self::Retry { .. } | Self::Pending { .. } | Self::Cancelled { .. } => {
self
}
}
}
pub fn inspect<F: FnOnce(&T) + Clone>(self, f: F) -> Self {
match self {
Self::Ok(v) => {
f(&v);
Self::Ok(v)
}
Self::Batch(items) => {
Self::Batch(items.into_iter().map(|o| o.inspect(f.clone())).collect())
}
Self::Err(_) | Self::Retry { .. } | Self::Pending { .. } | Self::Cancelled { .. } => {
self
}
}
}
pub fn inspect_err<F: FnOnce(&OutcomeError) + Clone>(self, f: F) -> Self {
match self {
Self::Err(e) => {
f(&e);
Self::Err(e)
}
Self::Batch(items) => Self::Batch(
items
.into_iter()
.map(|o| o.inspect_err(f.clone()))
.collect(),
),
Self::Ok(v) => Self::Ok(v),
Self::Retry {
after_ms,
attempt,
max_attempts,
reason,
} => Self::Retry {
after_ms,
attempt,
max_attempts,
reason,
},
Self::Pending {
condition,
resume_token,
} => Self::Pending {
condition,
resume_token,
},
Self::Cancelled { reason } => Self::Cancelled { reason },
}
}
pub fn and_then_if<F: Fn(&T) -> bool + Clone, G: FnOnce(T) -> Outcome<T> + Clone>(
self,
pred: F,
f: G,
) -> Outcome<T> {
match self {
Self::Ok(v) => {
if pred(&v) {
f(v)
} else {
Self::Ok(v)
}
}
Self::Batch(items) => Self::Batch(
items
.into_iter()
.map(|o| o.and_then_if(pred.clone(), f.clone()))
.collect(),
),
Self::Err(error) => Self::Err(error),
Self::Retry {
after_ms,
attempt,
max_attempts,
reason,
} => Self::Retry {
after_ms,
attempt,
max_attempts,
reason,
},
Self::Pending {
condition,
resume_token,
} => Self::Pending {
condition,
resume_token,
},
Self::Cancelled { reason } => Self::Cancelled { reason },
}
}
pub fn into_result(self) -> Result<T, OutcomeError> {
match self {
Self::Ok(v) => Ok(v),
Self::Err(error) => Err(error),
Self::Cancelled { reason } => Err(OutcomeError::new(
ErrorKind::Cancelled,
format!("cancelled: {reason}"),
)),
Self::Retry {
after_ms,
attempt,
max_attempts,
reason,
} => Err(OutcomeError::new(
ErrorKind::Timeout,
format!(
"retry after {}ms (attempt {}/{}) - {}",
after_ms, attempt, max_attempts, reason
),
)),
Self::Pending {
condition,
resume_token,
} => Err(OutcomeError::new(
ErrorKind::Pending,
format!(
"pending outcome cannot collapse into Result: {:?} (resume {:032x})",
condition, resume_token
),
)),
Self::Batch(items) => Err(OutcomeError::new(
ErrorKind::BatchCollapse,
format!(
"batch outcome cannot collapse into Result without dropping {} item(s)",
items.len()
),
)),
}
}
pub fn unwrap_or(self, default: T) -> T {
match self {
Self::Ok(v) => v,
Self::Err(_)
| Self::Retry { .. }
| Self::Pending { .. }
| Self::Cancelled { .. }
| Self::Batch(_) => default,
}
}
pub fn unwrap_or_else<F: FnOnce() -> T>(self, f: F) -> T {
match self {
Self::Ok(v) => v,
Self::Err(_)
| Self::Retry { .. }
| Self::Pending { .. }
| Self::Cancelled { .. }
| Self::Batch(_) => f(),
}
}
}
impl<T> Outcome<Outcome<T>> {
pub fn flatten(self) -> Outcome<T> {
self.and_then(|inner| inner)
}
}