restate_sdk_shared_core/vm/
errors.rs1use crate::error::NotificationMetadata;
2use crate::fmt::{display_closed_error, format_do_progress, DiffFormatter};
3use crate::service_protocol::messages::{CommandMessageHeaderDiff, RestateMessage};
4use crate::service_protocol::{ContentTypeError, DecodingError, MessageType, NotificationId};
5use crate::{Error, Version};
6use std::borrow::Cow;
7use std::collections::{HashMap, HashSet};
8use std::fmt;
/// Numeric error code, stored as a plain `u16`.
///
/// Well-known values are declared in the `codes` module.
#[derive(Clone, Copy, Eq, PartialEq)]
pub struct InvocationErrorCode(u16);

impl InvocationErrorCode {
    /// Wraps a raw `u16` value. Usable in `const` contexts.
    pub const fn new(code: u16) -> Self {
        Self(code)
    }

    /// Returns the wrapped `u16` value.
    pub const fn code(self) -> u16 {
        let Self(raw) = self;
        raw
    }
}
23
24impl fmt::Debug for InvocationErrorCode {
25 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
26 write!(f, "{}", self.0)
27 }
28}
29
30impl fmt::Display for InvocationErrorCode {
31 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
32 fmt::Debug::fmt(self, f)
33 }
34}
35
36impl From<u16> for InvocationErrorCode {
37 fn from(value: u16) -> Self {
38 InvocationErrorCode(value)
39 }
40}
41
42impl From<u32> for InvocationErrorCode {
43 fn from(value: u32) -> Self {
44 value
45 .try_into()
46 .map(InvocationErrorCode)
47 .unwrap_or(codes::INTERNAL)
48 }
49}
50
51impl From<InvocationErrorCode> for u16 {
52 fn from(value: InvocationErrorCode) -> Self {
53 value.0
54 }
55}
56
57impl From<InvocationErrorCode> for u32 {
58 fn from(value: InvocationErrorCode) -> Self {
59 value.0 as u32
60 }
61}
62
63pub mod codes {
64 use super::InvocationErrorCode;
65
66 pub const BAD_REQUEST: InvocationErrorCode = InvocationErrorCode(400);
67 pub const INTERNAL: InvocationErrorCode = InvocationErrorCode(500);
68 pub const UNSUPPORTED_MEDIA_TYPE: InvocationErrorCode = InvocationErrorCode(415);
69 pub const JOURNAL_MISMATCH: InvocationErrorCode = InvocationErrorCode(570);
70 pub const PROTOCOL_VIOLATION: InvocationErrorCode = InvocationErrorCode(571);
71 pub const AWAITING_TWO_ASYNC_RESULTS: InvocationErrorCode = InvocationErrorCode(572);
72 pub const UNSUPPORTED_FEATURE: InvocationErrorCode = InvocationErrorCode(573);
73 pub const CLOSED: InvocationErrorCode = InvocationErrorCode(598);
74 pub const SUSPENDED: InvocationErrorCode = InvocationErrorCode(599);
75}
76
77impl Error {
80 const fn new_const(code: InvocationErrorCode, message: &'static str) -> Self {
81 Error {
82 code: code.0,
83 message: Cow::Borrowed(message),
84 stacktrace: String::new(),
85 related_command: None,
86 next_retry_delay: None,
87 }
88 }
89}
90
91pub const MISSING_CONTENT_TYPE: Error = Error::new_const(
92 codes::UNSUPPORTED_MEDIA_TYPE,
93 "Missing content type when invoking the service deployment",
94);
95
96pub const UNEXPECTED_INPUT_MESSAGE: Error = Error::new_const(
97 codes::PROTOCOL_VIOLATION,
98 "Expected incoming message to be an entry",
99);
100
101pub const KNOWN_ENTRIES_IS_ZERO: Error =
102 Error::new_const(codes::INTERNAL, "Known entries is zero, expected >= 1");
103
104pub const UNEXPECTED_ENTRY_MESSAGE: Error = Error::new_const(
105 codes::PROTOCOL_VIOLATION,
106 "Expected entry messages only when waiting replay entries",
107);
108
109pub const INPUT_CLOSED_WHILE_WAITING_ENTRIES: Error = Error::new_const(
110 codes::PROTOCOL_VIOLATION,
111 "The input was closed while still waiting to receive all journal to replay",
112);
113
114pub const EMPTY_IDEMPOTENCY_KEY: Error = Error::new_const(
115 codes::INTERNAL,
116 "Trying to execute an idempotent request with an empty idempotency key. The idempotency key must be non-empty.",
117);
118
119pub const SUSPENDED: Error = Error::new_const(codes::SUSPENDED, "Suspended invocation");
120
121#[derive(Debug, Clone, thiserror::Error)]
124#[error("The execution replay ended unexpectedly. Expecting to read '{expected}' from the recorded journal, but the buffered messages were already drained.")]
125pub struct UnavailableEntryError {
126 expected: MessageType,
127}
128
129impl UnavailableEntryError {
130 pub fn new(expected: MessageType) -> Self {
131 Self { expected }
132 }
133}
134
135#[derive(Debug, thiserror::Error)]
136#[error("Unexpected state '{state:?}' when invoking '{event:?}'")]
137pub struct UnexpectedStateError {
138 state: &'static str,
139 event: String,
140}
141
142impl UnexpectedStateError {
143 pub fn new(state: &'static str, event: String) -> Self {
144 Self { state, event }
145 }
146}
147
148#[derive(Debug)]
149pub struct ClosedError {
150 event: String,
151}
152
153impl ClosedError {
154 pub fn new(event: String) -> Self {
155 Self { event }
156 }
157}
158
159impl fmt::Display for ClosedError {
160 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
161 display_closed_error(f, &self.event)
162 }
163}
164
165impl std::error::Error for ClosedError {}
166
167#[derive(Debug)]
168pub struct CommandTypeMismatchError {
169 actual: MessageType,
170 command_index: i64,
171 expected: MessageType,
172}
173
174impl CommandTypeMismatchError {
175 pub fn new(
176 command_index: i64,
177 actual: MessageType,
178 expected: MessageType,
179 ) -> CommandTypeMismatchError {
180 Self {
181 command_index,
182 actual,
183 expected,
184 }
185 }
186}
187
188impl fmt::Display for CommandTypeMismatchError {
189 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
190 write!(f,
191 "Found a mismatch between the code paths taken during the previous execution and the paths taken during this execution.
192This typically happens when some parts of the code are non-deterministic.
193 - The previous execution ran and recorded the following: '{}' (index '{}')
194 - The current execution attempts to perform the following: '{}'",
195 self.expected,
196 self.command_index,
197 self.actual,
198 )
199 }
200}
201
202impl std::error::Error for CommandTypeMismatchError {}
203
204#[derive(Debug)]
205pub struct CommandMismatchError<M> {
206 command_index: i64,
207 actual: M,
208 expected: M,
209}
210
211impl<M> CommandMismatchError<M> {
212 pub fn new(command_index: i64, actual: M, expected: M) -> CommandMismatchError<M> {
213 Self {
214 command_index,
215 actual,
216 expected,
217 }
218 }
219}
220
221impl<M: RestateMessage + CommandMessageHeaderDiff> fmt::Display for CommandMismatchError<M> {
222 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
223 write!(f,
224"Found a mismatch between the code paths taken during the previous execution and the paths taken during this execution.
225This typically happens when some parts of the code are non-deterministic.
226- The mismatch happened while executing '{}' (index '{}')
227- Difference:",
228 M::ty(), self.command_index,
229 )?;
230 self.actual
231 .write_diff(&self.expected, DiffFormatter::new(f, " "))
232 }
233}
234
235impl<M: RestateMessage + CommandMessageHeaderDiff> std::error::Error for CommandMismatchError<M> {}
236
237#[derive(Debug)]
238pub struct UncompletedDoProgressDuringReplay {
239 notification_ids: Vec<NotificationId>,
240 additional_known_metadata: HashMap<NotificationId, NotificationMetadata>,
241}
242
243impl UncompletedDoProgressDuringReplay {
244 pub(crate) fn new(
245 notification_ids: HashSet<NotificationId>,
246 additional_known_metadata: HashMap<NotificationId, NotificationMetadata>,
247 ) -> Self {
248 let mut ordered_notification_ids = Vec::from_iter(notification_ids);
250 ordered_notification_ids.sort_by(|a, b| match (a, b) {
251 (NotificationId::CompletionId(a_id), NotificationId::CompletionId(b_id)) => {
252 a_id.cmp(b_id)
253 }
254 (NotificationId::CompletionId(_), _) => std::cmp::Ordering::Less,
255 (_, NotificationId::CompletionId(_)) => std::cmp::Ordering::Greater,
256
257 (NotificationId::SignalName(a_name), NotificationId::SignalName(b_name)) => {
258 a_name.cmp(b_name)
259 }
260 (NotificationId::SignalName(_), NotificationId::SignalId(_)) => {
261 std::cmp::Ordering::Less
262 }
263 (NotificationId::SignalId(_), NotificationId::SignalName(_)) => {
264 std::cmp::Ordering::Greater
265 }
266
267 (NotificationId::SignalId(a_id), NotificationId::SignalId(b_id)) => {
268 let a_is_cancel = *a_id == crate::service_protocol::CANCEL_SIGNAL_ID;
269 let b_is_cancel = *b_id == crate::service_protocol::CANCEL_SIGNAL_ID;
270 match (a_is_cancel, b_is_cancel) {
271 (true, false) => std::cmp::Ordering::Greater,
272 (false, true) => std::cmp::Ordering::Less,
273 _ => a_id.cmp(b_id),
274 }
275 }
276 });
277 Self {
278 notification_ids: ordered_notification_ids,
279 additional_known_metadata,
280 }
281 }
282}
283
284impl fmt::Display for UncompletedDoProgressDuringReplay {
285 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
286 write!(f,
287"Found a mismatch between the code paths taken during the previous execution and the paths taken during this execution.
288'{}' could not be replayed. This usually means the code was mutated adding an 'await' without registering a new service revision.
289Notifications awaited on this {} point:",
290 format_do_progress(),
291 format_do_progress(),
292 )?;
293
294 for notification_id in &self.notification_ids {
295 write!(f, "\n - ")?;
296 if let Some(metadata) = self.additional_known_metadata.get(notification_id) {
297 write!(f, "{}", metadata)?;
298 } else {
299 match notification_id {
300 NotificationId::CompletionId(completion_id) => {
301 write!(f, "completion id {}", completion_id)?;
302 }
303 NotificationId::SignalId(signal_id) => {
304 write!(f, "signal [{}]", signal_id)?;
305 }
306 NotificationId::SignalName(signal_name) => {
307 write!(f, "Named signal: {}", signal_name)?;
308 }
309 }
310 }
311 }
312
313 Ok(())
314 }
315}
316
317impl std::error::Error for UncompletedDoProgressDuringReplay {}
318
319#[derive(Debug, Clone, thiserror::Error)]
320#[error("Cannot convert a eager state key into UTF-8 String: {0:?}")]
321pub struct BadEagerStateKeyError(#[from] pub(crate) std::string::FromUtf8Error);
322
323#[derive(Debug, Clone, thiserror::Error)]
324#[error("Unexpected empty value variant for get eager state")]
325pub struct EmptyGetEagerState;
326
327#[derive(Debug, Clone, thiserror::Error)]
328#[error("Unexpected empty value variant for state keys")]
329pub struct EmptyGetEagerStateKeys;
330
331#[derive(Debug, thiserror::Error)]
332#[error("Feature '{feature}' is not supported by the negotiated protocol version '{current_version}', the minimum required version is '{minimum_required_version}'")]
333pub struct UnsupportedFeatureForNegotiatedVersion {
334 feature: &'static str,
335 current_version: Version,
336 minimum_required_version: Version,
337}
338
339impl UnsupportedFeatureForNegotiatedVersion {
340 pub fn new(
341 feature: &'static str,
342 current_version: Version,
343 minimum_required_version: Version,
344 ) -> Self {
345 Self {
346 feature,
347 current_version,
348 minimum_required_version,
349 }
350 }
351}
352
353trait WithInvocationErrorCode {
356 fn code(&self) -> InvocationErrorCode;
357}
358
359impl<T: WithInvocationErrorCode + fmt::Display> From<T> for Error {
360 fn from(value: T) -> Self {
361 Error::new(value.code().0, value.to_string())
362 }
363}
364
// Implements `WithInvocationErrorCode` for a non-generic error type, always
// returning the named constant from the `codes` module.
macro_rules! impl_error_code {
    ($err:ident, $code_const:ident) => {
        impl WithInvocationErrorCode for $err {
            fn code(&self) -> InvocationErrorCode {
                codes::$code_const
            }
        }
    };
}
374
375impl_error_code!(ContentTypeError, UNSUPPORTED_MEDIA_TYPE);
376impl WithInvocationErrorCode for DecodingError {
377 fn code(&self) -> InvocationErrorCode {
378 match self {
379 DecodingError::UnexpectedMessageType { .. } => codes::JOURNAL_MISMATCH,
380 _ => codes::INTERNAL,
381 }
382 }
383}
384impl_error_code!(UnavailableEntryError, PROTOCOL_VIOLATION);
385impl_error_code!(UnexpectedStateError, PROTOCOL_VIOLATION);
386impl_error_code!(ClosedError, CLOSED);
387impl_error_code!(CommandTypeMismatchError, JOURNAL_MISMATCH);
388impl_error_code!(UncompletedDoProgressDuringReplay, JOURNAL_MISMATCH);
389impl<M: RestateMessage + CommandMessageHeaderDiff> WithInvocationErrorCode
390 for CommandMismatchError<M>
391{
392 fn code(&self) -> InvocationErrorCode {
393 codes::JOURNAL_MISMATCH
394 }
395}
396impl_error_code!(BadEagerStateKeyError, INTERNAL);
397impl_error_code!(EmptyGetEagerState, PROTOCOL_VIOLATION);
398impl_error_code!(EmptyGetEagerStateKeys, PROTOCOL_VIOLATION);
399impl_error_code!(UnsupportedFeatureForNegotiatedVersion, UNSUPPORTED_FEATURE);