Skip to main content

datum/stream/
error.rs

1use super::*;
2
/// Akka-style supervision directive for a failed user callback.
///
/// `Resume` drops the failing element and continues. `Restart` also drops the
/// failing element, but stateful operators reset their accumulated state before
/// continuing. `Stop` preserves Datum's default behavior: the stream fails with
/// the original [`StreamError`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SupervisionDirective {
    /// Fail the stream with the original [`StreamError`] (Datum's default behavior).
    Stop,
    /// Drop the failing element and continue.
    Resume,
    /// Drop the failing element, let stateful operators reset their accumulated
    /// state, then continue.
    Restart,
}
15
/// Shared supervision decider used by supervised operators.
///
/// A decider receives a reference to the [`StreamError`] produced by a failed
/// callback and answers with the [`SupervisionDirective`] to apply. It is
/// reference-counted (`Arc`) and bounded by `Send + Sync`.
///
/// The decider is user code and is always invoked outside Datum's internal
/// locks. If it panics, the supervised stage treats that as [`SupervisionDirective::Stop`]
/// (see `decide_supervision`, which catches the unwind).
pub type SupervisionDecider = Arc<dyn Fn(&StreamError) -> SupervisionDirective + Send + Sync>;
21
22/// Convenience constructors matching Akka's built-in deciders.
23pub struct Supervision;
24
25impl Supervision {
26    #[must_use]
27    pub fn stopping_decider() -> SupervisionDecider {
28        Arc::new(|_| SupervisionDirective::Stop)
29    }
30
31    #[must_use]
32    pub fn resuming_decider() -> SupervisionDecider {
33        Arc::new(|_| SupervisionDirective::Resume)
34    }
35
36    #[must_use]
37    pub fn restarting_decider() -> SupervisionDecider {
38        Arc::new(|_| SupervisionDirective::Restart)
39    }
40}
41
42pub(crate) fn decide_supervision(
43    decider: &SupervisionDecider,
44    error: &StreamError,
45) -> SupervisionDirective {
46    catch_unwind(AssertUnwindSafe(|| decider(error))).unwrap_or(SupervisionDirective::Stop)
47}
48
49pub(crate) fn panic_stream_error(context: &str) -> StreamError {
50    StreamError::Failed(format!("{context} panicked"))
51}
52
/// Error type carried by [`StreamResult`].
///
/// The `#[error("…")]` attribute on each variant supplies its display message.
/// NOTE(review): the `Error` derive is brought in through `use super::*`;
/// presumably it is `thiserror::Error` — confirm against the parent module.
#[derive(Debug, Clone, PartialEq, Eq, Error)]
pub enum StreamError {
    /// The stream was cancelled.
    #[error("stream was cancelled")]
    Cancelled,
    /// The materializer was shut down.
    #[error("materializer was shut down")]
    AbruptTermination,
    /// A stage is backpressured.
    #[error("stage is backpressured")]
    Backpressured,
    /// The stream completed without an element.
    #[error("stream completed without an element")]
    EmptyStream,
    /// A maybe source was materialized before completion.
    #[error("maybe source was materialized before completion")]
    MaybeIncomplete,
    /// A stream limit was reached; `max` is the limit shown in the message.
    #[error("stream limit of {max} reached")]
    LimitExceeded { max: u64 },
    /// An invalid operation was attempted on a port.
    #[error("invalid port operation {operation} on {port}: {reason}")]
    InvalidPortOperation {
        /// Name of the rejected operation.
        operation: &'static str,
        /// The port the operation was attempted on, as shown in the message.
        port: String,
        /// Why the operation was rejected.
        reason: String,
    },
    /// Graph validation failed; the payload is the detail appended to the message.
    #[error("graph validation failed: {0}")]
    GraphValidation(String),
    /// Fused execution exceeded its event limit; `limit` is the limit shown in the message.
    #[error("fused execution exceeded event limit of {limit}")]
    EventLimitExceeded { limit: usize },
    /// An actor ask timed out; `timeout` is the duration shown in the message.
    #[error("actor ask timed out after {timeout:?}")]
    ActorAskTimeout { timeout: Duration },
    /// The actor ask target terminated before the request could be sent.
    #[error("actor ask target terminated before the request could be sent")]
    ActorTerminated,
    /// The actor ask response channel was dropped.
    #[error("actor ask response channel was dropped")]
    ActorAskResponseDropped,
    /// An actor ask failed to send its request; `reason` carries the detail.
    #[error("actor ask failed to send request: {reason}")]
    ActorAskSendFailed { reason: String },
    /// An actor ask task failed; `reason` carries the detail.
    #[error("actor ask task failed: {reason}")]
    ActorAskTaskFailed { reason: String },
    /// A stage failed; the payload is the failure description.
    ///
    /// `panic_stream_error` also uses this variant, with a `"<context> panicked"` message.
    #[error("stage failed: {0}")]
    Failed(String),
}
90
/// Shorthand for a `Result` whose error type is [`StreamError`].
pub type StreamResult<T> = Result<T, StreamError>;