1use super::*;
8
9#[derive(Debug, Clone, Copy, PartialEq, Eq)]
16pub enum SupervisionDirective {
17 Stop,
18 Resume,
19 Restart,
20}
21
22pub type SupervisionDecider = Arc<dyn Fn(&StreamError) -> SupervisionDirective + Send + Sync>;
27
28pub struct Supervision;
30
31impl Supervision {
32 #[must_use]
33 pub fn stopping_decider() -> SupervisionDecider {
34 Arc::new(|_| SupervisionDirective::Stop)
35 }
36
37 #[must_use]
38 pub fn resuming_decider() -> SupervisionDecider {
39 Arc::new(|_| SupervisionDirective::Resume)
40 }
41
42 #[must_use]
43 pub fn restarting_decider() -> SupervisionDecider {
44 Arc::new(|_| SupervisionDirective::Restart)
45 }
46}
47
48pub(crate) fn decide_supervision(
49 decider: &SupervisionDecider,
50 error: &StreamError,
51) -> SupervisionDirective {
52 catch_unwind(AssertUnwindSafe(|| decider(error))).unwrap_or(SupervisionDirective::Stop)
53}
54
55pub(crate) fn panic_stream_error(context: &str) -> StreamError {
56 StreamError::Failed(format!("{context} panicked"))
57}
58
59#[derive(Debug, Clone, PartialEq, Eq, Error)]
60pub enum StreamError {
61 #[error("stream was cancelled")]
62 Cancelled,
63 #[error("materializer was shut down")]
64 AbruptTermination,
65 #[error("stage is backpressured")]
66 Backpressured,
67 #[error("stream completed without an element")]
68 EmptyStream,
69 #[error("maybe source was materialized before completion")]
70 MaybeIncomplete,
71 #[error("stream limit of {max} reached")]
72 LimitExceeded { max: u64 },
73 #[error("invalid port operation {operation} on {port}: {reason}")]
74 InvalidPortOperation {
75 operation: &'static str,
76 port: String,
77 reason: String,
78 },
79 #[error("graph validation failed: {0}")]
80 GraphValidation(String),
81 #[error("fused execution exceeded event limit of {limit}")]
82 EventLimitExceeded { limit: usize },
83 #[error("actor ask timed out after {timeout:?}")]
84 ActorAskTimeout { timeout: Duration },
85 #[error("actor ask target terminated before the request could be sent")]
86 ActorTerminated,
87 #[error("actor ask response channel was dropped")]
88 ActorAskResponseDropped,
89 #[error("actor ask failed to send request: {reason}")]
90 ActorAskSendFailed { reason: String },
91 #[error("actor ask task failed: {reason}")]
92 ActorAskTaskFailed { reason: String },
93 #[error("stage failed: {0}")]
94 Failed(String),
95}
96
97pub type StreamResult<T> = Result<T, StreamError>;