restate-sdk-shared-core 0.10.0

SDK Shared core
Documentation
use crate::error::NotificationMetadata;
use crate::fmt::{display_closed_error, format_do_progress, DiffFormatter};
use crate::service_protocol::messages::{CommandMessageHeaderDiff, RestateMessage};
use crate::service_protocol::{ContentTypeError, DecodingError, MessageType, NotificationId};
use crate::{Error, Version};
use std::borrow::Cow;
use std::collections::{HashMap, HashSet};
use std::fmt;
// Error codes

/// Numeric code attached to an invocation error.
///
/// A thin `u16` newtype; the well-known values are declared in the `codes` module.
#[derive(Copy, Clone, PartialEq, Eq)]
pub struct InvocationErrorCode(u16);

impl InvocationErrorCode {
    /// Wraps a raw `u16` into an error code. Usable in `const` contexts.
    pub const fn new(code: u16) -> Self {
        Self(code)
    }

    /// Returns the raw `u16` value of this code.
    pub const fn code(self) -> u16 {
        let Self(raw) = self;
        raw
    }
}

impl fmt::Debug for InvocationErrorCode {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "{}", self.0)
    }
}

/// Display output is identical to the `Debug` output.
impl fmt::Display for InvocationErrorCode {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        <Self as fmt::Debug>::fmt(self, f)
    }
}

impl From<u16> for InvocationErrorCode {
    fn from(value: u16) -> Self {
        InvocationErrorCode(value)
    }
}

impl From<u32> for InvocationErrorCode {
    fn from(value: u32) -> Self {
        value
            .try_into()
            .map(InvocationErrorCode)
            .unwrap_or(codes::INTERNAL)
    }
}

impl From<InvocationErrorCode> for u16 {
    fn from(value: InvocationErrorCode) -> Self {
        value.0
    }
}

impl From<InvocationErrorCode> for u32 {
    fn from(value: InvocationErrorCode) -> Self {
        value.0 as u32
    }
}

/// Well-known [`InvocationErrorCode`] values, listed in ascending numeric order.
pub mod codes {
    use super::InvocationErrorCode;

    /// `400` - bad request.
    pub const BAD_REQUEST: InvocationErrorCode = InvocationErrorCode::new(400);
    /// `415` - unsupported media type.
    pub const UNSUPPORTED_MEDIA_TYPE: InvocationErrorCode = InvocationErrorCode::new(415);
    /// `500` - internal error; also the fallback when converting an out-of-range `u32`.
    pub const INTERNAL: InvocationErrorCode = InvocationErrorCode::new(500);
    /// `570` - journal mismatch.
    pub const JOURNAL_MISMATCH: InvocationErrorCode = InvocationErrorCode::new(570);
    /// `571` - protocol violation.
    pub const PROTOCOL_VIOLATION: InvocationErrorCode = InvocationErrorCode::new(571);
    /// `572` - awaiting two async results.
    pub const AWAITING_TWO_ASYNC_RESULTS: InvocationErrorCode = InvocationErrorCode::new(572);
    /// `573` - unsupported feature.
    pub const UNSUPPORTED_FEATURE: InvocationErrorCode = InvocationErrorCode::new(573);
    /// `598` - closed.
    pub const CLOSED: InvocationErrorCode = InvocationErrorCode::new(598);
    /// `599` - suspended.
    pub const SUSPENDED: InvocationErrorCode = InvocationErrorCode::new(599);
}

// Const errors

impl Error {
    const fn new_const(code: InvocationErrorCode, message: &'static str) -> Self {
        Error {
            code: code.0,
            message: Cow::Borrowed(message),
            stacktrace: String::new(),
            related_command: None,
            next_retry_delay: None,
        }
    }
}

/// Constant error for a missing content type (code `UNSUPPORTED_MEDIA_TYPE`).
pub const MISSING_CONTENT_TYPE: Error = Error::new_const(
    codes::UNSUPPORTED_MEDIA_TYPE,
    "Missing content type when invoking the service deployment",
);

/// Constant error for an incoming message that was expected to be an entry
/// (code `PROTOCOL_VIOLATION`).
pub const UNEXPECTED_INPUT_MESSAGE: Error = Error::new_const(
    codes::PROTOCOL_VIOLATION,
    "Expected incoming message to be an entry",
);

/// Constant error for a known-entries count of zero where at least one is expected
/// (code `INTERNAL`).
pub const KNOWN_ENTRIES_IS_ZERO: Error =
    Error::new_const(codes::INTERNAL, "Known entries is zero, expected >= 1");

/// Constant error for a non-entry message received while waiting for replay entries
/// (code `PROTOCOL_VIOLATION`).
pub const UNEXPECTED_ENTRY_MESSAGE: Error = Error::new_const(
    codes::PROTOCOL_VIOLATION,
    "Expected entry messages only when waiting replay entries",
);

/// Constant error for the input closing before the whole journal to replay was received
/// (code `PROTOCOL_VIOLATION`).
pub const INPUT_CLOSED_WHILE_WAITING_ENTRIES: Error = Error::new_const(
    codes::PROTOCOL_VIOLATION,
    "The input was closed while still waiting to receive all journal to replay",
);

/// Constant error for an idempotent request issued with an empty idempotency key
/// (code `INTERNAL`).
pub const EMPTY_IDEMPOTENCY_KEY: Error = Error::new_const(
    codes::INTERNAL,
    "Trying to execute an idempotent request with an empty idempotency key. The idempotency key must be non-empty.",
);

/// Constant error representing a suspended invocation (code `SUSPENDED`).
pub const SUSPENDED: Error = Error::new_const(codes::SUSPENDED, "Suspended invocation");

// Other errors

/// Replay ended unexpectedly: a message of type `expected` should have been read from the
/// recorded journal, but the buffered messages were already drained.
#[derive(Debug, Clone, thiserror::Error)]
#[error("The execution replay ended unexpectedly. Expecting to read '{expected}' from the recorded journal, but the buffered messages were already drained.")]
pub struct UnavailableEntryError {
    expected: MessageType,
}

impl UnavailableEntryError {
    /// Creates the error for the message type that was expected next.
    pub fn new(expected: MessageType) -> Self {
        UnavailableEntryError { expected }
    }
}

/// An event was invoked while in a state that does not accept it.
///
/// Both `state` and `event` are rendered with their `Debug` formatting in the message.
#[derive(Debug, thiserror::Error)]
#[error("Unexpected state '{state:?}' when invoking '{event:?}'")]
pub struct UnexpectedStateError {
    state: &'static str,
    event: String,
}

impl UnexpectedStateError {
    /// Creates the error from the state name and a description of the offending event.
    pub fn new(state: &'static str, event: String) -> Self {
        UnexpectedStateError { state, event }
    }
}

/// Error carrying a description of the event that was attempted; it maps to `codes::CLOSED`.
#[derive(Debug)]
pub struct ClosedError {
    event: String,
}

impl ClosedError {
    /// Creates the error from a description of the attempted event.
    pub fn new(event: String) -> Self {
        ClosedError { event }
    }
}

/// The message text is produced by the shared `display_closed_error` helper.
impl fmt::Display for ClosedError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let Self { event } = self;
        display_closed_error(f, event)
    }
}

impl std::error::Error for ClosedError {}

/// Journal mismatch where the command type at `command_index` differs between the recorded
/// journal (`expected`) and the current execution (`actual`).
#[derive(Debug)]
pub struct CommandTypeMismatchError {
    actual: MessageType,
    command_index: i64,
    expected: MessageType,
}

impl CommandTypeMismatchError {
    /// Creates the error.
    ///
    /// * `command_index` - index of the mismatching command.
    /// * `actual` - message type the current execution attempts to perform.
    /// * `expected` - message type recorded by the previous execution.
    pub fn new(command_index: i64, actual: MessageType, expected: MessageType) -> Self {
        CommandTypeMismatchError {
            actual,
            command_index,
            expected,
        }
    }
}

/// Renders the recorded command type with its index, followed by the attempted command type.
impl fmt::Display for CommandTypeMismatchError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let Self {
            actual,
            command_index,
            expected,
        } = self;
        // Argument order follows the placeholders: recorded type, index, attempted type.
        write!(
            f,
            "Found a mismatch between the code paths taken during the previous execution and the paths taken during this execution.
This typically happens when some parts of the code are non-deterministic.
 - The previous execution ran and recorded the following: '{}' (index '{}')
 - The current execution attempts to perform the following: '{}'",
            expected, command_index, actual,
        )
    }
}

impl std::error::Error for CommandTypeMismatchError {}

/// Journal mismatch where the command at `command_index` has the same type in both executions
/// but different content; `M` is the command message type.
#[derive(Debug)]
pub struct CommandMismatchError<M> {
    command_index: i64,
    actual: M,
    expected: M,
}

impl<M> CommandMismatchError<M> {
    /// Creates the error from the command index and the two messages being compared.
    pub fn new(command_index: i64, actual: M, expected: M) -> Self {
        CommandMismatchError {
            command_index,
            actual,
            expected,
        }
    }
}

/// Writes a fixed header naming the command type and index, then appends the diff between
/// the actual and the expected message.
impl<M> fmt::Display for CommandMismatchError<M>
where
    M: RestateMessage + CommandMessageHeaderDiff,
{
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let Self {
            command_index,
            actual,
            expected,
        } = self;
        write!(
            f,
"Found a mismatch between the code paths taken during the previous execution and the paths taken during this execution.
This typically happens when some parts of the code are non-deterministic.
- The mismatch happened while executing '{}' (index '{}')
- Difference:",
            M::ty(),
            command_index,
        )?;
        // The diff is written by the message itself through a DiffFormatter built with "   ".
        actual.write_diff(expected, DiffFormatter::new(f, "   "))
    }
}

impl<M: RestateMessage + CommandMessageHeaderDiff + std::fmt::Debug> std::error::Error
    for CommandMismatchError<M>
{
}

/// Journal mismatch error listing the notifications that were being awaited, together with
/// any metadata known about them.
#[derive(Debug)]
pub struct UncompletedDoProgressDuringReplay {
    notification_ids: Vec<NotificationId>,
    additional_known_metadata: HashMap<NotificationId, NotificationMetadata>,
}

impl UncompletedDoProgressDuringReplay {
    /// Creates the error, turning the unordered id set into a deterministically ordered list.
    ///
    /// Ordering: completions (by id), then named signals (by name), then signal ids
    /// (by id), with `CANCEL_SIGNAL_ID` placed last.
    pub(crate) fn new(
        notification_ids: HashSet<NotificationId>,
        additional_known_metadata: HashMap<NotificationId, NotificationMetadata>,
    ) -> Self {
        use std::cmp::Ordering;

        // Rank of the display group each notification belongs to; lower ranks come first.
        fn group_rank(id: &NotificationId) -> u8 {
            match id {
                NotificationId::CompletionId(_) => 0,
                NotificationId::SignalName(_) => 1,
                NotificationId::SignalId(signal_id)
                    if *signal_id == crate::service_protocol::CANCEL_SIGNAL_ID =>
                {
                    3
                }
                NotificationId::SignalId(_) => 2,
            }
        }

        let mut sorted_ids: Vec<NotificationId> = notification_ids.into_iter().collect();
        sorted_ids.sort_by(|lhs, rhs| {
            group_rank(lhs)
                .cmp(&group_rank(rhs))
                // Same group: order by the id or name carried by the variant.
                .then_with(|| match (lhs, rhs) {
                    (NotificationId::CompletionId(l), NotificationId::CompletionId(r)) => l.cmp(r),
                    (NotificationId::SignalName(l), NotificationId::SignalName(r)) => l.cmp(r),
                    (NotificationId::SignalId(l), NotificationId::SignalId(r)) => l.cmp(r),
                    // Equal ranks imply equal variants, so this arm is never taken.
                    _ => Ordering::Equal,
                })
        });

        Self {
            notification_ids: sorted_ids,
            additional_known_metadata,
        }
    }
}

/// Writes a fixed explanation followed by one bullet per awaited notification, in the order
/// established at construction time.
impl fmt::Display for UncompletedDoProgressDuringReplay {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(
            f,
"Found a mismatch between the code paths taken during the previous execution and the paths taken during this execution.
'{}' could not be replayed. This usually means the code was mutated adding an 'await' without registering a new service revision.
Notifications awaited on this {} point:",
            format_do_progress(),
            format_do_progress(),
        )?;

        for id in self.notification_ids.iter() {
            f.write_str("\n - ")?;
            match self.additional_known_metadata.get(id) {
                // Prefer the richer metadata description when one is known.
                Some(known) => write!(f, "{}", known)?,
                None => match id {
                    NotificationId::CompletionId(completion_id) => {
                        write!(f, "completion id {}", completion_id)?
                    }
                    NotificationId::SignalId(signal_id) => write!(f, "signal [{}]", signal_id)?,
                    NotificationId::SignalName(signal_name) => {
                        write!(f, "Named signal: {}", signal_name)?
                    }
                },
            }
        }

        Ok(())
    }
}

impl std::error::Error for UncompletedDoProgressDuringReplay {}

/// An eager state key could not be converted into a UTF-8 `String`.
///
/// Wraps the underlying [`std::string::FromUtf8Error`], which is printed with `Debug`
/// formatting in the message.
#[derive(Debug, Clone, thiserror::Error)]
#[error("Cannot convert a eager state key into UTF-8 String: {0:?}")]
pub struct BadEagerStateKeyError(#[from] pub(crate) std::string::FromUtf8Error);

/// A "get eager state" value carried an unexpected empty variant.
#[derive(Debug, Clone, thiserror::Error)]
#[error("Unexpected empty value variant for get eager state")]
pub struct EmptyGetEagerState;

/// A "state keys" value carried an unexpected empty variant.
#[derive(Debug, Clone, thiserror::Error)]
#[error("Unexpected empty value variant for state keys")]
pub struct EmptyGetEagerStateKeys;

/// A feature was requested that the negotiated protocol version does not support.
///
/// The message names the feature, the negotiated version and the minimum version required.
#[derive(Debug, thiserror::Error)]
#[error("Feature '{feature}' is not supported by the negotiated protocol version '{current_version}', the minimum required version is '{minimum_required_version}'")]
pub struct UnsupportedFeatureForNegotiatedVersion {
    feature: &'static str,
    current_version: Version,
    minimum_required_version: Version,
}

impl UnsupportedFeatureForNegotiatedVersion {
    /// Creates the error.
    ///
    /// * `feature` - name of the unsupported feature.
    /// * `current_version` - the negotiated protocol version.
    /// * `minimum_required_version` - the lowest version that supports the feature.
    pub fn new(
        feature: &'static str,
        current_version: Version,
        minimum_required_version: Version,
    ) -> Self {
        UnsupportedFeatureForNegotiatedVersion {
            feature,
            current_version,
            minimum_required_version,
        }
    }
}

// Conversions to `Error`

/// Implemented by error types that know which [`InvocationErrorCode`] they map to.
///
/// Any implementor that is also `Display` converts into [`Error`] through the blanket
/// `From` impl in this module.
trait WithInvocationErrorCode {
    /// Returns the invocation error code associated with this error value.
    fn code(&self) -> InvocationErrorCode;
}

impl<T: WithInvocationErrorCode + fmt::Display> From<T> for Error {
    fn from(value: T) -> Self {
        Error::new(value.code().0, value.to_string())
    }
}

/// Implements `WithInvocationErrorCode` for a type that always maps to one fixed code.
///
/// The first argument is the error type, the second the name of a constant in `codes`.
macro_rules! impl_error_code {
    ($ty:ident, $code_const:ident) => {
        impl WithInvocationErrorCode for $ty {
            fn code(&self) -> InvocationErrorCode {
                codes::$code_const
            }
        }
    };
}

// Content-type errors are reported with the UNSUPPORTED_MEDIA_TYPE code.
impl_error_code!(ContentTypeError, UNSUPPORTED_MEDIA_TYPE);
/// An unexpected message type counts as a journal mismatch; every other decoding failure is
/// reported as an internal error.
impl WithInvocationErrorCode for DecodingError {
    fn code(&self) -> InvocationErrorCode {
        if matches!(self, DecodingError::UnexpectedMessageType { .. }) {
            codes::JOURNAL_MISMATCH
        } else {
            codes::INTERNAL
        }
    }
}
// Unavailable-entry and unexpected-state errors are protocol violations.
impl_error_code!(UnavailableEntryError, PROTOCOL_VIOLATION);
impl_error_code!(UnexpectedStateError, PROTOCOL_VIOLATION);
// ClosedError has its own dedicated code.
impl_error_code!(ClosedError, CLOSED);
// Replay divergences are journal mismatches.
impl_error_code!(CommandTypeMismatchError, JOURNAL_MISMATCH);
impl_error_code!(UncompletedDoProgressDuringReplay, JOURNAL_MISMATCH);
/// Written by hand rather than via `impl_error_code!`, whose `ident` matcher cannot take a
/// generic type; the code is always `JOURNAL_MISMATCH`.
impl<M> WithInvocationErrorCode for CommandMismatchError<M>
where
    M: RestateMessage + CommandMessageHeaderDiff,
{
    fn code(&self) -> InvocationErrorCode {
        codes::JOURNAL_MISMATCH
    }
}
// A non-UTF-8 eager state key is reported as an internal error.
impl_error_code!(BadEagerStateKeyError, INTERNAL);
// Empty eager-state value variants are protocol violations.
impl_error_code!(EmptyGetEagerState, PROTOCOL_VIOLATION);
impl_error_code!(EmptyGetEagerStateKeys, PROTOCOL_VIOLATION);
// Features unavailable in the negotiated protocol version have a dedicated code.
impl_error_code!(UnsupportedFeatureForNegotiatedVersion, UNSUPPORTED_FEATURE);