Skip to main content

restate_sdk_shared_core/vm/
errors.rs

1use crate::error::NotificationMetadata;
2use crate::fmt::{display_closed_error, format_do_progress, DiffFormatter};
3use crate::service_protocol::messages::{CommandMessageHeaderDiff, RestateMessage};
4use crate::service_protocol::{ContentTypeError, DecodingError, MessageType, NotificationId};
5use crate::{Error, Version};
6use std::borrow::Cow;
7use std::collections::{HashMap, HashSet};
8use std::fmt;
9// Error codes
10
11#[derive(Copy, Clone, PartialEq, Eq)]
12pub struct InvocationErrorCode(u16);
13
14impl InvocationErrorCode {
15    pub const fn new(code: u16) -> Self {
16        InvocationErrorCode(code)
17    }
18
19    pub const fn code(self) -> u16 {
20        self.0
21    }
22}
23
24impl fmt::Debug for InvocationErrorCode {
25    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
26        write!(f, "{}", self.0)
27    }
28}
29
30impl fmt::Display for InvocationErrorCode {
31    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
32        fmt::Debug::fmt(self, f)
33    }
34}
35
36impl From<u16> for InvocationErrorCode {
37    fn from(value: u16) -> Self {
38        InvocationErrorCode(value)
39    }
40}
41
42impl From<u32> for InvocationErrorCode {
43    fn from(value: u32) -> Self {
44        value
45            .try_into()
46            .map(InvocationErrorCode)
47            .unwrap_or(codes::INTERNAL)
48    }
49}
50
51impl From<InvocationErrorCode> for u16 {
52    fn from(value: InvocationErrorCode) -> Self {
53        value.0
54    }
55}
56
57impl From<InvocationErrorCode> for u32 {
58    fn from(value: InvocationErrorCode) -> Self {
59        value.0 as u32
60    }
61}
62
63pub mod codes {
64    use super::InvocationErrorCode;
65
66    pub const BAD_REQUEST: InvocationErrorCode = InvocationErrorCode(400);
67    pub const INTERNAL: InvocationErrorCode = InvocationErrorCode(500);
68    pub const UNSUPPORTED_MEDIA_TYPE: InvocationErrorCode = InvocationErrorCode(415);
69    pub const JOURNAL_MISMATCH: InvocationErrorCode = InvocationErrorCode(570);
70    pub const PROTOCOL_VIOLATION: InvocationErrorCode = InvocationErrorCode(571);
71    pub const AWAITING_TWO_ASYNC_RESULTS: InvocationErrorCode = InvocationErrorCode(572);
72    pub const UNSUPPORTED_FEATURE: InvocationErrorCode = InvocationErrorCode(573);
73    pub const CLOSED: InvocationErrorCode = InvocationErrorCode(598);
74    pub const SUSPENDED: InvocationErrorCode = InvocationErrorCode(599);
75}
76
77// Const errors
78
79impl Error {
80    const fn new_const(code: InvocationErrorCode, message: &'static str) -> Self {
81        Error {
82            code: code.0,
83            message: Cow::Borrowed(message),
84            stacktrace: String::new(),
85            related_command: None,
86            next_retry_delay: None,
87        }
88    }
89}
90
91pub const MISSING_CONTENT_TYPE: Error = Error::new_const(
92    codes::UNSUPPORTED_MEDIA_TYPE,
93    "Missing content type when invoking the service deployment",
94);
95
96pub const UNEXPECTED_INPUT_MESSAGE: Error = Error::new_const(
97    codes::PROTOCOL_VIOLATION,
98    "Expected incoming message to be an entry",
99);
100
101pub const KNOWN_ENTRIES_IS_ZERO: Error =
102    Error::new_const(codes::INTERNAL, "Known entries is zero, expected >= 1");
103
104pub const UNEXPECTED_ENTRY_MESSAGE: Error = Error::new_const(
105    codes::PROTOCOL_VIOLATION,
106    "Expected entry messages only when waiting replay entries",
107);
108
109pub const INPUT_CLOSED_WHILE_WAITING_ENTRIES: Error = Error::new_const(
110    codes::PROTOCOL_VIOLATION,
111    "The input was closed while still waiting to receive all journal to replay",
112);
113
114pub const EMPTY_IDEMPOTENCY_KEY: Error = Error::new_const(
115    codes::INTERNAL,
116    "Trying to execute an idempotent request with an empty idempotency key. The idempotency key must be non-empty.",
117);
118
119pub const SUSPENDED: Error = Error::new_const(codes::SUSPENDED, "Suspended invocation");
120
121// Other errors
122
123#[derive(Debug, Clone, thiserror::Error)]
124#[error("The execution replay ended unexpectedly. Expecting to read '{expected}' from the recorded journal, but the buffered messages were already drained.")]
125pub struct UnavailableEntryError {
126    expected: MessageType,
127}
128
129impl UnavailableEntryError {
130    pub fn new(expected: MessageType) -> Self {
131        Self { expected }
132    }
133}
134
135#[derive(Debug, thiserror::Error)]
136#[error("Unexpected state '{state:?}' when invoking '{event:?}'")]
137pub struct UnexpectedStateError {
138    state: &'static str,
139    event: String,
140}
141
142impl UnexpectedStateError {
143    pub fn new(state: &'static str, event: String) -> Self {
144        Self { state, event }
145    }
146}
147
148#[derive(Debug)]
149pub struct ClosedError {
150    event: String,
151}
152
153impl ClosedError {
154    pub fn new(event: String) -> Self {
155        Self { event }
156    }
157}
158
159impl fmt::Display for ClosedError {
160    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
161        display_closed_error(f, &self.event)
162    }
163}
164
165impl std::error::Error for ClosedError {}
166
167#[derive(Debug)]
168pub struct CommandTypeMismatchError {
169    actual: MessageType,
170    command_index: i64,
171    expected: MessageType,
172}
173
174impl CommandTypeMismatchError {
175    pub fn new(
176        command_index: i64,
177        actual: MessageType,
178        expected: MessageType,
179    ) -> CommandTypeMismatchError {
180        Self {
181            command_index,
182            actual,
183            expected,
184        }
185    }
186}
187
188impl fmt::Display for CommandTypeMismatchError {
189    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
190        write!(f,
191               "Found a mismatch between the code paths taken during the previous execution and the paths taken during this execution.
192This typically happens when some parts of the code are non-deterministic.
193 - The previous execution ran and recorded the following: '{}' (index '{}')
194 - The current execution attempts to perform the following: '{}'",
195               self.expected,
196            self.command_index,
197               self.actual,
198        )
199    }
200}
201
202impl std::error::Error for CommandTypeMismatchError {}
203
204#[derive(Debug)]
205pub struct CommandMismatchError<M> {
206    command_index: i64,
207    actual: M,
208    expected: M,
209}
210
211impl<M> CommandMismatchError<M> {
212    pub fn new(command_index: i64, actual: M, expected: M) -> CommandMismatchError<M> {
213        Self {
214            command_index,
215            actual,
216            expected,
217        }
218    }
219}
220
221impl<M: RestateMessage + CommandMessageHeaderDiff> fmt::Display for CommandMismatchError<M> {
222    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
223        write!(f,
224"Found a mismatch between the code paths taken during the previous execution and the paths taken during this execution.
225This typically happens when some parts of the code are non-deterministic.
226- The mismatch happened while executing '{}' (index '{}')
227- Difference:",
228            M::ty(), self.command_index,
229        )?;
230        self.actual
231            .write_diff(&self.expected, DiffFormatter::new(f, "   "))
232    }
233}
234
235impl<M: RestateMessage + CommandMessageHeaderDiff> std::error::Error for CommandMismatchError<M> {}
236
237#[derive(Debug)]
238pub struct UncompletedDoProgressDuringReplay {
239    notification_ids: Vec<NotificationId>,
240    additional_known_metadata: HashMap<NotificationId, NotificationMetadata>,
241}
242
243impl UncompletedDoProgressDuringReplay {
244    pub(crate) fn new(
245        notification_ids: HashSet<NotificationId>,
246        additional_known_metadata: HashMap<NotificationId, NotificationMetadata>,
247    ) -> Self {
248        // Order notifications: completions first (by id), then named signals, then unnamed signals (awakeables by id), then built-in signals last
249        let mut ordered_notification_ids = Vec::from_iter(notification_ids);
250        ordered_notification_ids.sort_by(|a, b| match (a, b) {
251            (NotificationId::CompletionId(a_id), NotificationId::CompletionId(b_id)) => {
252                a_id.cmp(b_id)
253            }
254            (NotificationId::CompletionId(_), _) => std::cmp::Ordering::Less,
255            (_, NotificationId::CompletionId(_)) => std::cmp::Ordering::Greater,
256
257            (NotificationId::SignalName(a_name), NotificationId::SignalName(b_name)) => {
258                a_name.cmp(b_name)
259            }
260            (NotificationId::SignalName(_), NotificationId::SignalId(_)) => {
261                std::cmp::Ordering::Less
262            }
263            (NotificationId::SignalId(_), NotificationId::SignalName(_)) => {
264                std::cmp::Ordering::Greater
265            }
266
267            (NotificationId::SignalId(a_id), NotificationId::SignalId(b_id)) => {
268                let a_is_cancel = *a_id == crate::service_protocol::CANCEL_SIGNAL_ID;
269                let b_is_cancel = *b_id == crate::service_protocol::CANCEL_SIGNAL_ID;
270                match (a_is_cancel, b_is_cancel) {
271                    (true, false) => std::cmp::Ordering::Greater,
272                    (false, true) => std::cmp::Ordering::Less,
273                    _ => a_id.cmp(b_id),
274                }
275            }
276        });
277        Self {
278            notification_ids: ordered_notification_ids,
279            additional_known_metadata,
280        }
281    }
282}
283
284impl fmt::Display for UncompletedDoProgressDuringReplay {
285    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
286        write!(f,
287"Found a mismatch between the code paths taken during the previous execution and the paths taken during this execution.
288'{}' could not be replayed. This usually means the code was mutated adding an 'await' without registering a new service revision.
289Notifications awaited on this {} point:",
290               format_do_progress(),
291               format_do_progress(),
292        )?;
293
294        for notification_id in &self.notification_ids {
295            write!(f, "\n - ")?;
296            if let Some(metadata) = self.additional_known_metadata.get(notification_id) {
297                write!(f, "{}", metadata)?;
298            } else {
299                match notification_id {
300                    NotificationId::CompletionId(completion_id) => {
301                        write!(f, "completion id {}", completion_id)?;
302                    }
303                    NotificationId::SignalId(signal_id) => {
304                        write!(f, "signal [{}]", signal_id)?;
305                    }
306                    NotificationId::SignalName(signal_name) => {
307                        write!(f, "Named signal: {}", signal_name)?;
308                    }
309                }
310            }
311        }
312
313        Ok(())
314    }
315}
316
317impl std::error::Error for UncompletedDoProgressDuringReplay {}
318
319#[derive(Debug, Clone, thiserror::Error)]
320#[error("Cannot convert a eager state key into UTF-8 String: {0:?}")]
321pub struct BadEagerStateKeyError(#[from] pub(crate) std::string::FromUtf8Error);
322
323#[derive(Debug, Clone, thiserror::Error)]
324#[error("Unexpected empty value variant for get eager state")]
325pub struct EmptyGetEagerState;
326
327#[derive(Debug, Clone, thiserror::Error)]
328#[error("Unexpected empty value variant for state keys")]
329pub struct EmptyGetEagerStateKeys;
330
331#[derive(Debug, thiserror::Error)]
332#[error("Feature '{feature}' is not supported by the negotiated protocol version '{current_version}', the minimum required version is '{minimum_required_version}'")]
333pub struct UnsupportedFeatureForNegotiatedVersion {
334    feature: &'static str,
335    current_version: Version,
336    minimum_required_version: Version,
337}
338
339impl UnsupportedFeatureForNegotiatedVersion {
340    pub fn new(
341        feature: &'static str,
342        current_version: Version,
343        minimum_required_version: Version,
344    ) -> Self {
345        Self {
346            feature,
347            current_version,
348            minimum_required_version,
349        }
350    }
351}
352
353// Conversions to VMError
354
355trait WithInvocationErrorCode {
356    fn code(&self) -> InvocationErrorCode;
357}
358
359impl<T: WithInvocationErrorCode + fmt::Display> From<T> for Error {
360    fn from(value: T) -> Self {
361        Error::new(value.code().0, value.to_string())
362    }
363}
364
365macro_rules! impl_error_code {
366    ($error_type:ident, $code:ident) => {
367        impl WithInvocationErrorCode for $error_type {
368            fn code(&self) -> InvocationErrorCode {
369                codes::$code
370            }
371        }
372    };
373}
374
375impl_error_code!(ContentTypeError, UNSUPPORTED_MEDIA_TYPE);
376impl WithInvocationErrorCode for DecodingError {
377    fn code(&self) -> InvocationErrorCode {
378        match self {
379            DecodingError::UnexpectedMessageType { .. } => codes::JOURNAL_MISMATCH,
380            _ => codes::INTERNAL,
381        }
382    }
383}
384impl_error_code!(UnavailableEntryError, PROTOCOL_VIOLATION);
385impl_error_code!(UnexpectedStateError, PROTOCOL_VIOLATION);
386impl_error_code!(ClosedError, CLOSED);
387impl_error_code!(CommandTypeMismatchError, JOURNAL_MISMATCH);
388impl_error_code!(UncompletedDoProgressDuringReplay, JOURNAL_MISMATCH);
389impl<M: RestateMessage + CommandMessageHeaderDiff> WithInvocationErrorCode
390    for CommandMismatchError<M>
391{
392    fn code(&self) -> InvocationErrorCode {
393        codes::JOURNAL_MISMATCH
394    }
395}
396impl_error_code!(BadEagerStateKeyError, INTERNAL);
397impl_error_code!(EmptyGetEagerState, PROTOCOL_VIOLATION);
398impl_error_code!(EmptyGetEagerStateKeys, PROTOCOL_VIOLATION);
399impl_error_code!(UnsupportedFeatureForNegotiatedVersion, UNSUPPORTED_FEATURE);