Skip to main content

datum/stream/
error.rs

//! Stream error model: `StreamError` (the failure enum), `StreamResult<T>`, and
//! Akka-style supervision (`Supervision`, `SupervisionDecider`,
//! `SupervisionDirective`). Failures are values here, never panics across the API
//! boundary; a panicking decider or user closure is treated as
//! `SupervisionDirective::Stop`.
6
7use super::*;
8
/// What a supervised stage should do after a user callback has failed
/// (modelled on Akka's supervision directives).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SupervisionDirective {
    /// Fail the stream with the original [`StreamError`]. This preserves
    /// Datum's default behavior.
    Stop,
    /// Drop the failing element and continue.
    Resume,
    /// Drop the failing element and continue, but stateful operators reset
    /// their accumulated state first.
    Restart,
}
21
22/// Shared supervision decider used by supervised operators.
23///
24/// The decider is user code and is always invoked outside Datum's internal
25/// locks. If it panics, the supervised stage treats that as [`SupervisionDirective::Stop`].
26pub type SupervisionDecider = Arc<dyn Fn(&StreamError) -> SupervisionDirective + Send + Sync>;
27
28/// Convenience constructors matching Akka's built-in deciders.
29pub struct Supervision;
30
31impl Supervision {
32    #[must_use]
33    pub fn stopping_decider() -> SupervisionDecider {
34        Arc::new(|_| SupervisionDirective::Stop)
35    }
36
37    #[must_use]
38    pub fn resuming_decider() -> SupervisionDecider {
39        Arc::new(|_| SupervisionDirective::Resume)
40    }
41
42    #[must_use]
43    pub fn restarting_decider() -> SupervisionDecider {
44        Arc::new(|_| SupervisionDirective::Restart)
45    }
46}
47
48pub(crate) fn decide_supervision(
49    decider: &SupervisionDecider,
50    error: &StreamError,
51) -> SupervisionDirective {
52    catch_unwind(AssertUnwindSafe(|| decider(error))).unwrap_or(SupervisionDirective::Stop)
53}
54
55pub(crate) fn panic_stream_error(context: &str) -> StreamError {
56    StreamError::Failed(format!("{context} panicked"))
57}
58
59#[derive(Debug, Clone, PartialEq, Eq, Error)]
60pub enum StreamError {
61    #[error("stream was cancelled")]
62    Cancelled,
63    #[error("materializer was shut down")]
64    AbruptTermination,
65    #[error("stage is backpressured")]
66    Backpressured,
67    #[error("stream completed without an element")]
68    EmptyStream,
69    #[error("maybe source was materialized before completion")]
70    MaybeIncomplete,
71    #[error("stream limit of {max} reached")]
72    LimitExceeded { max: u64 },
73    #[error("invalid port operation {operation} on {port}: {reason}")]
74    InvalidPortOperation {
75        operation: &'static str,
76        port: String,
77        reason: String,
78    },
79    #[error("graph validation failed: {0}")]
80    GraphValidation(String),
81    #[error("fused execution exceeded event limit of {limit}")]
82    EventLimitExceeded { limit: usize },
83    #[error("actor ask timed out after {timeout:?}")]
84    ActorAskTimeout { timeout: Duration },
85    #[error("actor ask target terminated before the request could be sent")]
86    ActorTerminated,
87    #[error("actor ask response channel was dropped")]
88    ActorAskResponseDropped,
89    #[error("actor ask failed to send request: {reason}")]
90    ActorAskSendFailed { reason: String },
91    #[error("actor ask task failed: {reason}")]
92    ActorAskTaskFailed { reason: String },
93    #[error("stage failed: {0}")]
94    Failed(String),
95}
96
97pub type StreamResult<T> = Result<T, StreamError>;