1use super::*;
2
3#[derive(Debug, Clone, Copy, PartialEq, Eq)]
10pub enum SupervisionDirective {
11 Stop,
12 Resume,
13 Restart,
14}
15
16pub type SupervisionDecider = Arc<dyn Fn(&StreamError) -> SupervisionDirective + Send + Sync>;
21
22pub struct Supervision;
24
25impl Supervision {
26 #[must_use]
27 pub fn stopping_decider() -> SupervisionDecider {
28 Arc::new(|_| SupervisionDirective::Stop)
29 }
30
31 #[must_use]
32 pub fn resuming_decider() -> SupervisionDecider {
33 Arc::new(|_| SupervisionDirective::Resume)
34 }
35
36 #[must_use]
37 pub fn restarting_decider() -> SupervisionDecider {
38 Arc::new(|_| SupervisionDirective::Restart)
39 }
40}
41
42pub(crate) fn decide_supervision(
43 decider: &SupervisionDecider,
44 error: &StreamError,
45) -> SupervisionDirective {
46 catch_unwind(AssertUnwindSafe(|| decider(error))).unwrap_or(SupervisionDirective::Stop)
47}
48
49pub(crate) fn panic_stream_error(context: &str) -> StreamError {
50 StreamError::Failed(format!("{context} panicked"))
51}
52
53#[derive(Debug, Clone, PartialEq, Eq, Error)]
54pub enum StreamError {
55 #[error("stream was cancelled")]
56 Cancelled,
57 #[error("materializer was shut down")]
58 AbruptTermination,
59 #[error("stage is backpressured")]
60 Backpressured,
61 #[error("stream completed without an element")]
62 EmptyStream,
63 #[error("maybe source was materialized before completion")]
64 MaybeIncomplete,
65 #[error("stream limit of {max} reached")]
66 LimitExceeded { max: u64 },
67 #[error("invalid port operation {operation} on {port}: {reason}")]
68 InvalidPortOperation {
69 operation: &'static str,
70 port: String,
71 reason: String,
72 },
73 #[error("graph validation failed: {0}")]
74 GraphValidation(String),
75 #[error("fused execution exceeded event limit of {limit}")]
76 EventLimitExceeded { limit: usize },
77 #[error("actor ask timed out after {timeout:?}")]
78 ActorAskTimeout { timeout: Duration },
79 #[error("actor ask target terminated before the request could be sent")]
80 ActorTerminated,
81 #[error("actor ask response channel was dropped")]
82 ActorAskResponseDropped,
83 #[error("actor ask failed to send request: {reason}")]
84 ActorAskSendFailed { reason: String },
85 #[error("actor ask task failed: {reason}")]
86 ActorAskTaskFailed { reason: String },
87 #[error("stage failed: {0}")]
88 Failed(String),
89}
90
91pub type StreamResult<T> = Result<T, StreamError>;