// datum-core 0.3.0
//
// Rust stream-processing library mirroring Akka/Pekko Streams Typed, built on Ractor actors.
// Documentation
use super::*;

/// Akka-style supervision directive for a failed user callback.
///
/// `Resume` drops the failing element and continues. `Restart` also drops the
/// failing element, but stateful operators reset their accumulated state before
/// continuing. `Stop` preserves Datum's default behavior: the stream fails with
/// the original [`StreamError`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SupervisionDirective {
    /// Fail the stream with the original [`StreamError`] (Datum's default behavior).
    Stop,
    /// Drop the failing element and continue.
    Resume,
    /// Drop the failing element, reset the accumulated state of stateful
    /// operators, and continue.
    Restart,
}

/// Shared supervision decider used by supervised operators.
///
/// A decider receives a reference to the [`StreamError`] and returns the
/// [`SupervisionDirective`] to apply. It is wrapped in an `Arc` and bounded by
/// `Send + Sync`, so a single decider can be cloned and shared.
///
/// The decider is user code and is always invoked outside Datum's internal
/// locks. If it panics, the supervised stage treats that as [`SupervisionDirective::Stop`].
pub type SupervisionDecider = Arc<dyn Fn(&StreamError) -> SupervisionDirective + Send + Sync>;

/// Convenience constructors matching Akka's built-in deciders.
pub struct Supervision;

impl Supervision {
    #[must_use]
    pub fn stopping_decider() -> SupervisionDecider {
        Arc::new(|_| SupervisionDirective::Stop)
    }

    #[must_use]
    pub fn resuming_decider() -> SupervisionDecider {
        Arc::new(|_| SupervisionDirective::Resume)
    }

    #[must_use]
    pub fn restarting_decider() -> SupervisionDecider {
        Arc::new(|_| SupervisionDirective::Restart)
    }
}

/// Asks `decider` which directive applies to `error`, shielding the caller
/// from a panicking decider.
///
/// Returns whatever the decider returns. If the decider panics, the panic is
/// caught and [`SupervisionDirective::Stop`] is returned instead.
pub(crate) fn decide_supervision(
    decider: &SupervisionDecider,
    error: &StreamError,
) -> SupervisionDirective {
    // The closure only borrows its inputs; `AssertUnwindSafe` lets it cross
    // the `catch_unwind` boundary.
    let guarded_call = AssertUnwindSafe(|| decider(error));
    match catch_unwind(guarded_call) {
        Ok(directive) => directive,
        // The panic payload is discarded; a panicking decider means "stop".
        Err(_panic_payload) => SupervisionDirective::Stop,
    }
}

/// Builds the [`StreamError::Failed`] value used to report a panic.
///
/// The message is `context` followed by the literal text ` panicked`.
pub(crate) fn panic_stream_error(context: &str) -> StreamError {
    let message = [context, " panicked"].concat();
    StreamError::Failed(message)
}

/// Error value with which a stream fails; the error half of [`StreamResult`].
///
/// The `#[error(...)]` attribute on each variant carries that variant's
/// human-readable message.
#[derive(Debug, Clone, PartialEq, Eq, Error)]
pub enum StreamError {
    /// The stream was cancelled.
    #[error("stream was cancelled")]
    Cancelled,
    /// The materializer was shut down.
    #[error("materializer was shut down")]
    AbruptTermination,
    /// A stage is backpressured.
    #[error("stage is backpressured")]
    Backpressured,
    /// The stream completed without an element.
    #[error("stream completed without an element")]
    EmptyStream,
    /// A maybe source was materialized before completion.
    #[error("maybe source was materialized before completion")]
    MaybeIncomplete,
    /// The stream limit was reached; `max` is that limit.
    #[error("stream limit of {max} reached")]
    LimitExceeded { max: u64 },
    /// A port operation was invalid: `operation` names the operation, `port`
    /// the port it was attempted on, and `reason` says why it was rejected.
    #[error("invalid port operation {operation} on {port}: {reason}")]
    InvalidPortOperation {
        operation: &'static str,
        port: String,
        reason: String,
    },
    /// Graph validation failed; the payload is the detail shown in the message.
    #[error("graph validation failed: {0}")]
    GraphValidation(String),
    /// Fused execution exceeded its event limit; `limit` is that limit.
    #[error("fused execution exceeded event limit of {limit}")]
    EventLimitExceeded { limit: usize },
    /// An actor ask timed out after `timeout`.
    #[error("actor ask timed out after {timeout:?}")]
    ActorAskTimeout { timeout: Duration },
    /// The actor ask target terminated before the request could be sent.
    #[error("actor ask target terminated before the request could be sent")]
    ActorTerminated,
    /// The actor ask response channel was dropped.
    #[error("actor ask response channel was dropped")]
    ActorAskResponseDropped,
    /// An actor ask failed to send its request; `reason` is the detail shown
    /// in the message.
    #[error("actor ask failed to send request: {reason}")]
    ActorAskSendFailed { reason: String },
    /// An actor ask task failed; `reason` is the detail shown in the message.
    #[error("actor ask task failed: {reason}")]
    ActorAskTaskFailed { reason: String },
    /// A stage failed; the payload is the failure message. `panic_stream_error`
    /// also uses this variant to report panics.
    #[error("stage failed: {0}")]
    Failed(String),
}

/// Shorthand for a `Result` whose error type is [`StreamError`].
pub type StreamResult<T> = Result<T, StreamError>;