pub enum OperationError {
Transient {
err: Error,
retry_strategy: RetryStrategy,
fatal_strategy: FatalStrategy,
},
Fatal {
err: Error,
strategy: FatalStrategy,
},
}
Operation error types.
Given the distributed nature of the Paladin system, it is important to be able to specify how the system should respond to different kinds of errors. This type defines the error variants and the strategies for handling each of them.
Paladin recognizes two types of errors: transient and fatal. Transient errors are those that are expected to be resolved by retrying the operation. For example, a transient error might be a network timeout or a database deadlock. Fatal errors are those that are not expected to be resolved by retrying the operation. For example, a fatal error might be a malformed request or a database connection error.
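The distinction can be illustrated with a small, standard-library-only sketch. It is not Paladin's API: `Flavor`, `Cause`, and `classify` are hypothetical names introduced here, and the causes are simply the examples named in the paragraph above.

```rust
/// Simplified stand-in for the two error flavors described above.
/// This is NOT Paladin's `OperationError`; handling strategies are omitted.
#[derive(Debug, PartialEq)]
enum Flavor {
    /// Expected to be resolved by retrying the operation.
    Transient,
    /// Not expected to be resolved by retrying the operation.
    Fatal,
}

/// Hypothetical failure causes, taken from the examples in the text.
#[derive(Debug, Clone, Copy)]
enum Cause {
    NetworkTimeout,
    DatabaseDeadlock,
    MalformedRequest,
    DatabaseConnection,
}

/// Map each cause to the flavor the text assigns to it.
fn classify(cause: Cause) -> Flavor {
    match cause {
        Cause::NetworkTimeout | Cause::DatabaseDeadlock => Flavor::Transient,
        Cause::MalformedRequest | Cause::DatabaseConnection => Flavor::Fatal,
    }
}
```

In real code the classification is made at the point where the error is constructed, by choosing which variant (and which strategies) to return.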
§Example
§An Operation failing with a fatal error:
use paladin::{
    RemoteExecute,
    runtime::Runtime,
    operation::{Operation, Result, FatalError, FatalStrategy},
    directive::{Directive, IndexedStream},
};
use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize, RemoteExecute)]
struct WillFail;

impl Operation for WillFail {
    type Input = i64;
    type Output = i64;

    fn execute(&self, _: Self::Input) -> Result<Self::Output> {
        // Fail unconditionally with a fatal (non-retryable) error.
        FatalError::from_str(
            "This operation will always fail.",
            FatalStrategy::default()
        )
        .into()
    }
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // Assumed setup: the page does not show how `runtime` is created.
    let runtime = Runtime::in_memory().await?;

    let computation = IndexedStream::from([1, 2, 3]).map(&WillFail);
    let result = computation
        .run(&runtime).await?
        .into_values_sorted().await
        .map(|values| values.into_iter().collect::<Vec<_>>());

    assert_eq!(result.unwrap_err().to_string(), "Fatal operation error: This operation will always fail.");
    Ok(())
}
§A Monoid failing with a fatal error:
use paladin::{
    RemoteExecute,
    runtime::Runtime,
    operation::{Operation, Monoid, Result, FatalError, FatalStrategy},
    directive::{Directive, IndexedStream},
};
use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize, RemoteExecute)]
struct WillFail;

impl Monoid for WillFail {
    type Elem = i64;

    fn empty(&self) -> Self::Elem {
        0
    }

    fn combine(&self, _: Self::Elem, _: Self::Elem) -> Result<Self::Elem> {
        // Every combine step fails with a fatal (non-retryable) error.
        FatalError::from_str(
            "This operation will always fail.",
            FatalStrategy::default()
        )
        .into()
    }
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // Assumed setup: the page does not show how `runtime` is created.
    let runtime = Runtime::in_memory().await?;

    let computation = IndexedStream::from([1, 2, 3]).fold(&WillFail);
    let result = computation.run(&runtime).await;

    assert_eq!(result.unwrap_err().to_string(), "Fatal operation error: This operation will always fail.");
    Ok(())
}
§An IndexedStream containing errors:
use paladin::{
    RemoteExecute,
    runtime::Runtime,
    operation::{Operation, Monoid, Result},
    directive::{Directive, IndexedStream, indexed_stream::try_from_into_iterator},
};
use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize, RemoteExecute)]
struct Multiply;

impl Monoid for Multiply {
    type Elem = i64;

    fn empty(&self) -> Self::Elem {
        // Identity element for multiplication.
        1
    }

    fn combine(&self, a: Self::Elem, b: Self::Elem) -> Result<Self::Elem> {
        Ok(a * b)
    }
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // Assumed setup: the page does not show how `runtime` is created.
    let runtime = Runtime::in_memory().await?;

    // The second element of the input stream is itself an error.
    let computation = try_from_into_iterator([
        Ok(1), Err(anyhow::anyhow!("Failure")), Ok(3)
    ])
    .fold(&Multiply);
    let result = computation.run(&runtime).await;

    assert_eq!(result.unwrap_err().to_string(), "Failure");
    Ok(())
}
§A TransientError recovering after retry:
use paladin::{
    RemoteExecute,
    runtime::Runtime,
    operation::{
        Operation,
        Monoid,
        Result,
        OperationError,
        TransientError,
        RetryStrategy,
        FatalStrategy
    },
    directive::{Directive, IndexedStream},
};
use serde::{Deserialize, Serialize};
use std::sync::atomic::{AtomicBool, Ordering};

#[derive(Serialize, Deserialize, Default, RemoteExecute)]
struct Multiply;

// Records whether `combine` has already been attempted once.
static DID_TRY: AtomicBool = AtomicBool::new(false);

impl Monoid for Multiply {
    type Elem = i64;

    fn empty(&self) -> Self::Elem {
        // Identity element for multiplication.
        1
    }

    fn combine(&self, a: Self::Elem, b: Self::Elem) -> Result<Self::Elem> {
        // `swap` returns the previous value, so only the very first call fails.
        if DID_TRY.swap(true, Ordering::SeqCst) {
            return Ok(a * b);
        }

        TransientError::from_str(
            "will retry",
            RetryStrategy::default(),
            FatalStrategy::default()
        )
        .into()
    }
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // Assumed setup: the page does not show how `runtime` is created.
    let runtime = Runtime::in_memory().await?;

    let computation = IndexedStream::from([1, 2, 3]).fold(&Multiply);
    let result = computation.run(&runtime).await?;

    assert_eq!(result, 6);
    Ok(())
}
§A TransientError timing out after exhausting retries:
use std::{num::NonZeroU32, sync::atomic::{AtomicU32, Ordering}};
use paladin::{
    RemoteExecute,
    runtime::Runtime,
    operation::{
        Operation,
        Monoid,
        Result,
        OperationError,
        TransientError,
        RetryStrategy,
        FatalStrategy
    },
    directive::{Directive, IndexedStream},
};
use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize, Default, RemoteExecute)]
struct Multiply;

// Counts every invocation of `combine`, including retries.
static NUM_TRIES: AtomicU32 = AtomicU32::new(0);
const MAX_TRIES: u32 = 3;

impl Monoid for Multiply {
    type Elem = i64;

    fn empty(&self) -> Self::Elem {
        // Identity element for multiplication.
        1
    }

    fn combine(&self, a: Self::Elem, b: Self::Elem) -> Result<Self::Elem> {
        // Always fail with a transient error so the retry budget is exhausted.
        let prev = NUM_TRIES.fetch_add(1, Ordering::SeqCst);
        TransientError::from_str(
            &format!("tried {}/{}", prev, MAX_TRIES + 1),
            RetryStrategy::Immediate {
                max_retries: NonZeroU32::new(MAX_TRIES).unwrap()
            },
            FatalStrategy::default()
        )
        .into()
    }
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // Assumed setup: the page does not show how `runtime` is created.
    let runtime = Runtime::in_memory().await?;

    let computation = IndexedStream::from([1, 2, 3]).fold(&Multiply);
    let result = computation.run(&runtime).await;

    assert_eq!(NUM_TRIES.load(Ordering::SeqCst), MAX_TRIES + 2);
    Ok(())
}
Variants§
Transient
An error that is expected to be resolved by retrying the operation.
Fields
retry_strategy: RetryStrategy
The retry strategy.
fatal_strategy: FatalStrategy
The strategy to employ once the maximum number of retries is exceeded.
Fatal
An error that is not expected to be resolved by retrying the operation.
Implementations§
impl OperationError
pub async fn retry<O, Fut, F>(self, f: F) -> Result<O>
Retry the operation according to the strategy.
If the error is not transient, it is returned unchanged. If it is transient, the operation is retried according to the error's retry strategy; once the retry policy is exhausted, the error is converted into a fatal error.
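The behavior described above can be modeled with a simplified, synchronous sketch that uses only the standard library. It is not Paladin's implementation: `SketchError` is a hypothetical stand-in, the closure returns a plain `Result` rather than a future, the retry strategy is reduced to a `max_retries` count, and returning immediately when a retry yields a fatal error is an assumption of this sketch.

```rust
/// Simplified stand-in for `OperationError`; not Paladin's type.
#[derive(Debug, PartialEq)]
enum SketchError {
    Transient { msg: String, max_retries: u32 },
    Fatal { msg: String },
}

impl SketchError {
    /// Re-run `f` while the error stays transient, at most `max_retries` times.
    fn retry<O>(
        self,
        mut f: impl FnMut() -> Result<O, SketchError>,
    ) -> Result<O, SketchError> {
        match self {
            // A fatal error is returned unchanged; `f` is never called.
            SketchError::Fatal { msg } => Err(SketchError::Fatal { msg }),
            SketchError::Transient { mut msg, max_retries } => {
                for _ in 0..max_retries {
                    match f() {
                        Ok(value) => return Ok(value),
                        // Still transient: remember the latest message and retry.
                        Err(SketchError::Transient { msg: latest, .. }) => msg = latest,
                        // Assumption: a fatal error from a retry ends the loop.
                        Err(fatal) => return Err(fatal),
                    }
                }
                // Retry budget exhausted: convert into a fatal error.
                Err(SketchError::Fatal { msg })
            }
        }
    }
}
```

Calling `retry` on a `Transient` value with a closure that succeeds on its second attempt returns `Ok`, while a closure that keeps failing yields `Fatal` after `max_retries` calls.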
pub async fn retry_trace<O, Fut, F, T>(self, f: F, tracer: T) -> Result<O>
Retry the operation according to the strategy and the provided tracer.
If the error is not transient, it is returned unchanged. If it is transient, the operation is retried according to the error's retry strategy; once the retry policy is exhausted, the error is converted into a fatal error.
pub fn into_fatal(self) -> Self
Convert an error into a fatal error.
If the error is already a fatal error, it is returned unchanged.
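As a rough illustration of this conversion, the sketch below mirrors the shape of the enum at the top of this page using only the standard library. `DemoError` and `DemoFatalStrategy` (with its `Abort` and `Skip` variants) are hypothetical names, and carrying the transient variant's `fatal_strategy` over into the fatal variant's `strategy` is an assumption based on the field descriptions above, not a statement about Paladin's source.

```rust
/// Hypothetical stand-in for `FatalStrategy`; variant names are illustrative.
#[derive(Debug, PartialEq, Clone, Copy)]
enum DemoFatalStrategy {
    Abort,
    Skip,
}

/// Hypothetical stand-in mirroring the shape of `OperationError`.
#[derive(Debug, PartialEq)]
enum DemoError {
    Transient {
        msg: String,
        max_retries: u32,
        fatal_strategy: DemoFatalStrategy,
    },
    Fatal {
        msg: String,
        strategy: DemoFatalStrategy,
    },
}

impl DemoError {
    /// Convert any error into its fatal form.
    fn into_fatal(self) -> Self {
        match self {
            // Assumption: the retry information is dropped and the
            // transient error's fatal strategy becomes the strategy.
            DemoError::Transient { msg, fatal_strategy, .. } => DemoError::Fatal {
                msg,
                strategy: fatal_strategy,
            },
            // Already fatal: returned unchanged.
            fatal => fatal,
        }
    }
}
```

Because the fatal case is returned as-is, calling `into_fatal` more than once has no further effect.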