use super::*;
/// Outcome chosen by a [`SupervisionDecider`] when it is shown a [`StreamError`].
///
/// The enum is a plain `Copy` value, so it can be returned and compared cheaply.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SupervisionDirective {
/// Stop in response to the error.
///
/// This is also the fallback that `decide_supervision` returns when the
/// decider itself panics.
Stop,
/// Resume in response to the error.
///
/// NOTE(review): presumably the failing element is dropped and processing
/// continues with existing state — confirm against the stages that consume
/// this directive.
Resume,
/// Restart in response to the error.
///
/// NOTE(review): presumably the failing element is dropped and the stage's
/// state is reset before continuing — confirm against the stages that
/// consume this directive.
Restart,
}
/// Shared callback that inspects a [`StreamError`] and picks a
/// [`SupervisionDirective`] for it.
///
/// The callback is reference-counted (`Arc`) and required to be `Send + Sync`,
/// so one decider can be cloned and invoked from several threads.
pub type SupervisionDecider = Arc<dyn Fn(&StreamError) -> SupervisionDirective + Send + Sync>;
pub struct Supervision;
impl Supervision {
#[must_use]
pub fn stopping_decider() -> SupervisionDecider {
Arc::new(|_| SupervisionDirective::Stop)
}
#[must_use]
pub fn resuming_decider() -> SupervisionDecider {
Arc::new(|_| SupervisionDirective::Resume)
}
#[must_use]
pub fn restarting_decider() -> SupervisionDecider {
Arc::new(|_| SupervisionDirective::Restart)
}
}
/// Asks `decider` how to react to `error`, shielding the caller from a
/// panicking decider.
///
/// Returns whatever directive the decider produces. If the decider panics
/// (and the panic unwinds), the panic is caught, its payload is discarded, and
/// [`SupervisionDirective::Stop`] is returned instead.
pub(crate) fn decide_supervision(
    decider: &SupervisionDecider,
    error: &StreamError,
) -> SupervisionDirective {
    // The closure only borrows `decider` and `error`, and neither is used
    // again by this function after a panic, hence `AssertUnwindSafe`.
    let outcome = catch_unwind(AssertUnwindSafe(|| decider(error)));
    match outcome {
        Ok(directive) => directive,
        // A decider that panics cannot be trusted with the decision.
        Err(_) => SupervisionDirective::Stop,
    }
}
/// Builds the [`StreamError::Failed`] value used to report a panic.
///
/// The resulting message is `context` followed by the literal text
/// `" panicked"`.
pub(crate) fn panic_stream_error(context: &str) -> StreamError {
    let message = [context, " panicked"].concat();
    StreamError::Failed(message)
}
/// Errors reported by streams, their stages, and the actor-ask integration.
///
/// Each variant carries an `#[error(...)]` attribute holding its
/// human-readable message (the `Error` derive is presumably
/// `thiserror::Error`, brought into scope by the parent module — confirm).
/// The type is `Clone` and `Eq`, so errors can be duplicated and compared.
#[derive(Debug, Clone, PartialEq, Eq, Error)]
pub enum StreamError {
/// The stream was cancelled.
#[error("stream was cancelled")]
Cancelled,
/// The materializer was shut down.
#[error("materializer was shut down")]
AbruptTermination,
/// A stage is backpressured.
#[error("stage is backpressured")]
Backpressured,
/// The stream completed without producing an element.
#[error("stream completed without an element")]
EmptyStream,
/// A "maybe" source was materialized before completion.
///
/// NOTE(review): the exact lifecycle this refers to is not visible in this
/// file — confirm against the maybe-source implementation.
#[error("maybe source was materialized before completion")]
MaybeIncomplete,
/// A stream limit was reached.
#[error("stream limit of {max} reached")]
LimitExceeded {
/// The limit that was reached.
max: u64,
},
/// An operation was attempted on a port for which it is not valid.
#[error("invalid port operation {operation} on {port}: {reason}")]
InvalidPortOperation {
/// Name of the attempted operation.
operation: &'static str,
/// Identification of the port the operation targeted.
port: String,
/// Explanation of why the operation was rejected.
reason: String,
},
/// Graph validation failed; the payload is the explanatory message.
#[error("graph validation failed: {0}")]
GraphValidation(String),
/// Fused execution exceeded its event limit.
#[error("fused execution exceeded event limit of {limit}")]
EventLimitExceeded {
/// The event limit that was exceeded.
limit: usize,
},
/// An actor ask did not complete within its timeout.
#[error("actor ask timed out after {timeout:?}")]
ActorAskTimeout {
/// The timeout that elapsed (rendered with `Debug` formatting in the message).
timeout: Duration,
},
/// The target of an actor ask terminated before the request could be sent.
#[error("actor ask target terminated before the request could be sent")]
ActorTerminated,
/// The response channel of an actor ask was dropped.
#[error("actor ask response channel was dropped")]
ActorAskResponseDropped,
/// Sending the request of an actor ask failed.
#[error("actor ask failed to send request: {reason}")]
ActorAskSendFailed {
/// Explanation of the send failure.
reason: String,
},
/// The task carrying out an actor ask failed.
#[error("actor ask task failed: {reason}")]
ActorAskTaskFailed {
/// Explanation of the task failure.
reason: String,
},
/// A stage failed; the payload is the failure message.
///
/// `panic_stream_error` also produces this variant, with a message of the
/// form `"<context> panicked"`.
#[error("stage failed: {0}")]
Failed(String),
}
/// Convenience alias for a `Result` whose error type is [`StreamError`].
pub type StreamResult<T> = Result<T, StreamError>;