restate_sdk_shared_core/vm/
errors.rs1use crate::error::NotificationMetadata;
2use crate::fmt::{display_closed_error, format_do_progress, DiffFormatter};
3use crate::service_protocol::messages::{CommandMessageHeaderDiff, RestateMessage};
4use crate::service_protocol::{ContentTypeError, DecodingError, MessageType, NotificationId};
5use crate::{Error, Version};
6use std::borrow::Cow;
7use std::collections::{HashMap, HashSet};
8use std::fmt;
9#[derive(Copy, Clone, PartialEq, Eq)]
12pub struct InvocationErrorCode(u16);
13
14impl InvocationErrorCode {
15 pub const fn new(code: u16) -> Self {
16 InvocationErrorCode(code)
17 }
18
19 pub const fn code(self) -> u16 {
20 self.0
21 }
22}
23
24impl fmt::Debug for InvocationErrorCode {
25 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
26 write!(f, "{}", self.0)
27 }
28}
29
30impl fmt::Display for InvocationErrorCode {
31 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
32 fmt::Debug::fmt(self, f)
33 }
34}
35
36impl From<u16> for InvocationErrorCode {
37 fn from(value: u16) -> Self {
38 InvocationErrorCode(value)
39 }
40}
41
42impl From<u32> for InvocationErrorCode {
43 fn from(value: u32) -> Self {
44 value
45 .try_into()
46 .map(InvocationErrorCode)
47 .unwrap_or(codes::INTERNAL)
48 }
49}
50
51impl From<InvocationErrorCode> for u16 {
52 fn from(value: InvocationErrorCode) -> Self {
53 value.0
54 }
55}
56
57impl From<InvocationErrorCode> for u32 {
58 fn from(value: InvocationErrorCode) -> Self {
59 value.0 as u32
60 }
61}
62
63pub mod codes {
64 use super::InvocationErrorCode;
65
66 pub const BAD_REQUEST: InvocationErrorCode = InvocationErrorCode(400);
67 pub const INTERNAL: InvocationErrorCode = InvocationErrorCode(500);
68 pub const UNSUPPORTED_MEDIA_TYPE: InvocationErrorCode = InvocationErrorCode(415);
69 pub const JOURNAL_MISMATCH: InvocationErrorCode = InvocationErrorCode(570);
70 pub const PROTOCOL_VIOLATION: InvocationErrorCode = InvocationErrorCode(571);
71 pub const AWAITING_TWO_ASYNC_RESULTS: InvocationErrorCode = InvocationErrorCode(572);
72 pub const UNSUPPORTED_FEATURE: InvocationErrorCode = InvocationErrorCode(573);
73 pub const CLOSED: InvocationErrorCode = InvocationErrorCode(598);
74 pub const SUSPENDED: InvocationErrorCode = InvocationErrorCode(599);
75}
76
77impl Error {
80 const fn new_const(code: InvocationErrorCode, message: &'static str) -> Self {
81 Error {
82 code: code.0,
83 message: Cow::Borrowed(message),
84 stacktrace: String::new(),
85 related_command: None,
86 next_retry_delay: None,
87 }
88 }
89}
90
91pub const MISSING_CONTENT_TYPE: Error = Error::new_const(
92 codes::UNSUPPORTED_MEDIA_TYPE,
93 "Missing content type when invoking the service deployment",
94);
95
96pub const UNEXPECTED_INPUT_MESSAGE: Error = Error::new_const(
97 codes::PROTOCOL_VIOLATION,
98 "Expected incoming message to be an entry",
99);
100
101pub const KNOWN_ENTRIES_IS_ZERO: Error =
102 Error::new_const(codes::INTERNAL, "Known entries is zero, expected >= 1");
103
104pub const UNEXPECTED_ENTRY_MESSAGE: Error = Error::new_const(
105 codes::PROTOCOL_VIOLATION,
106 "Expected entry messages only when waiting replay entries",
107);
108
109pub const INPUT_CLOSED_WHILE_WAITING_ENTRIES: Error = Error::new_const(
110 codes::PROTOCOL_VIOLATION,
111 "The input was closed while still waiting to receive all journal to replay",
112);
113
114pub const EMPTY_IDEMPOTENCY_KEY: Error = Error::new_const(
115 codes::INTERNAL,
116 "Trying to execute an idempotent request with an empty idempotency key. The idempotency key must be non-empty.",
117);
118
119pub const SUSPENDED: Error = Error::new_const(codes::SUSPENDED, "Suspended invocation");
120
121#[derive(Debug, Clone, thiserror::Error)]
124#[error("The execution replay ended unexpectedly. Expecting to read '{expected}' from the recorded journal, but the buffered messages were already drained.")]
125pub struct UnavailableEntryError {
126 expected: MessageType,
127}
128
129impl UnavailableEntryError {
130 pub fn new(expected: MessageType) -> Self {
131 Self { expected }
132 }
133}
134
135#[derive(Debug, thiserror::Error)]
136#[error("Unexpected state '{state:?}' when invoking '{event:?}'")]
137pub struct UnexpectedStateError {
138 state: &'static str,
139 event: String,
140}
141
142impl UnexpectedStateError {
143 pub fn new(state: &'static str, event: String) -> Self {
144 Self { state, event }
145 }
146}
147
148#[derive(Debug)]
149pub struct ClosedError {
150 event: String,
151}
152
153impl ClosedError {
154 pub fn new(event: String) -> Self {
155 Self { event }
156 }
157}
158
159impl fmt::Display for ClosedError {
160 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
161 display_closed_error(f, &self.event)
162 }
163}
164
165impl std::error::Error for ClosedError {}
166
167#[derive(Debug)]
168pub struct CommandTypeMismatchError {
169 actual: MessageType,
170 command_index: i64,
171 expected: MessageType,
172}
173
174impl CommandTypeMismatchError {
175 pub fn new(
176 command_index: i64,
177 actual: MessageType,
178 expected: MessageType,
179 ) -> CommandTypeMismatchError {
180 Self {
181 command_index,
182 actual,
183 expected,
184 }
185 }
186}
187
188impl fmt::Display for CommandTypeMismatchError {
189 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
190 write!(f,
191 "Found a mismatch between the code paths taken during the previous execution and the paths taken during this execution.
192This typically happens when some parts of the code are non-deterministic.
193 - The previous execution ran and recorded the following: '{}' (index '{}')
194 - The current execution attempts to perform the following: '{}'",
195 self.expected,
196 self.command_index,
197 self.actual,
198 )
199 }
200}
201
202impl std::error::Error for CommandTypeMismatchError {}
203
204#[derive(Debug)]
205pub struct CommandMismatchError<M> {
206 command_index: i64,
207 actual: M,
208 expected: M,
209}
210
211impl<M> CommandMismatchError<M> {
212 pub fn new(command_index: i64, actual: M, expected: M) -> CommandMismatchError<M> {
213 Self {
214 command_index,
215 actual,
216 expected,
217 }
218 }
219}
220
221impl<M: RestateMessage + CommandMessageHeaderDiff> fmt::Display for CommandMismatchError<M> {
222 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
223 write!(f,
224"Found a mismatch between the code paths taken during the previous execution and the paths taken during this execution.
225This typically happens when some parts of the code are non-deterministic.
226- The mismatch happened while executing '{}' (index '{}')
227- Difference:",
228 M::ty(), self.command_index,
229 )?;
230 self.actual
231 .write_diff(&self.expected, DiffFormatter::new(f, " "))
232 }
233}
234
235impl<M: RestateMessage + CommandMessageHeaderDiff + std::fmt::Debug> std::error::Error
236 for CommandMismatchError<M>
237{
238}
239
240#[derive(Debug)]
241pub struct UncompletedDoProgressDuringReplay {
242 notification_ids: Vec<NotificationId>,
243 additional_known_metadata: HashMap<NotificationId, NotificationMetadata>,
244}
245
246impl UncompletedDoProgressDuringReplay {
247 pub(crate) fn new(
248 notification_ids: HashSet<NotificationId>,
249 additional_known_metadata: HashMap<NotificationId, NotificationMetadata>,
250 ) -> Self {
251 let mut ordered_notification_ids = Vec::from_iter(notification_ids);
253 ordered_notification_ids.sort_by(|a, b| match (a, b) {
254 (NotificationId::CompletionId(a_id), NotificationId::CompletionId(b_id)) => {
255 a_id.cmp(b_id)
256 }
257 (NotificationId::CompletionId(_), _) => std::cmp::Ordering::Less,
258 (_, NotificationId::CompletionId(_)) => std::cmp::Ordering::Greater,
259
260 (NotificationId::SignalName(a_name), NotificationId::SignalName(b_name)) => {
261 a_name.cmp(b_name)
262 }
263 (NotificationId::SignalName(_), NotificationId::SignalId(_)) => {
264 std::cmp::Ordering::Less
265 }
266 (NotificationId::SignalId(_), NotificationId::SignalName(_)) => {
267 std::cmp::Ordering::Greater
268 }
269
270 (NotificationId::SignalId(a_id), NotificationId::SignalId(b_id)) => {
271 let a_is_cancel = *a_id == crate::service_protocol::CANCEL_SIGNAL_ID;
272 let b_is_cancel = *b_id == crate::service_protocol::CANCEL_SIGNAL_ID;
273 match (a_is_cancel, b_is_cancel) {
274 (true, false) => std::cmp::Ordering::Greater,
275 (false, true) => std::cmp::Ordering::Less,
276 _ => a_id.cmp(b_id),
277 }
278 }
279 });
280 Self {
281 notification_ids: ordered_notification_ids,
282 additional_known_metadata,
283 }
284 }
285}
286
287impl fmt::Display for UncompletedDoProgressDuringReplay {
288 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
289 write!(f,
290"Found a mismatch between the code paths taken during the previous execution and the paths taken during this execution.
291'{}' could not be replayed. This usually means the code was mutated adding an 'await' without registering a new service revision.
292Notifications awaited on this {} point:",
293 format_do_progress(),
294 format_do_progress(),
295 )?;
296
297 for notification_id in &self.notification_ids {
298 write!(f, "\n - ")?;
299 if let Some(metadata) = self.additional_known_metadata.get(notification_id) {
300 write!(f, "{}", metadata)?;
301 } else {
302 match notification_id {
303 NotificationId::CompletionId(completion_id) => {
304 write!(f, "completion id {}", completion_id)?;
305 }
306 NotificationId::SignalId(signal_id) => {
307 write!(f, "signal [{}]", signal_id)?;
308 }
309 NotificationId::SignalName(signal_name) => {
310 write!(f, "Named signal: {}", signal_name)?;
311 }
312 }
313 }
314 }
315
316 Ok(())
317 }
318}
319
320impl std::error::Error for UncompletedDoProgressDuringReplay {}
321
322#[derive(Debug, Clone, thiserror::Error)]
323#[error("Cannot convert a eager state key into UTF-8 String: {0:?}")]
324pub struct BadEagerStateKeyError(#[from] pub(crate) std::string::FromUtf8Error);
325
326#[derive(Debug, Clone, thiserror::Error)]
327#[error("Unexpected empty value variant for get eager state")]
328pub struct EmptyGetEagerState;
329
330#[derive(Debug, Clone, thiserror::Error)]
331#[error("Unexpected empty value variant for state keys")]
332pub struct EmptyGetEagerStateKeys;
333
334#[derive(Debug, thiserror::Error)]
335#[error("Feature '{feature}' is not supported by the negotiated protocol version '{current_version}', the minimum required version is '{minimum_required_version}'")]
336pub struct UnsupportedFeatureForNegotiatedVersion {
337 feature: &'static str,
338 current_version: Version,
339 minimum_required_version: Version,
340}
341
342impl UnsupportedFeatureForNegotiatedVersion {
343 pub fn new(
344 feature: &'static str,
345 current_version: Version,
346 minimum_required_version: Version,
347 ) -> Self {
348 Self {
349 feature,
350 current_version,
351 minimum_required_version,
352 }
353 }
354}
355
356trait WithInvocationErrorCode {
359 fn code(&self) -> InvocationErrorCode;
360}
361
362impl<T: WithInvocationErrorCode + fmt::Display> From<T> for Error {
363 fn from(value: T) -> Self {
364 Error::new(value.code().0, value.to_string())
365 }
366}
367
/// Implements `WithInvocationErrorCode` for a non-generic error type, always returning the
/// named constant from the `codes` module.
///
/// Usage: `impl_error_code!(SomeError, SOME_CODE);`
macro_rules! impl_error_code {
    ($ty:ident, $constant:ident) => {
        impl WithInvocationErrorCode for $ty {
            fn code(&self) -> InvocationErrorCode {
                codes::$constant
            }
        }
    };
}
377
378impl_error_code!(ContentTypeError, UNSUPPORTED_MEDIA_TYPE);
379impl WithInvocationErrorCode for DecodingError {
380 fn code(&self) -> InvocationErrorCode {
381 match self {
382 DecodingError::UnexpectedMessageType { .. } => codes::JOURNAL_MISMATCH,
383 _ => codes::INTERNAL,
384 }
385 }
386}
387impl_error_code!(UnavailableEntryError, PROTOCOL_VIOLATION);
388impl_error_code!(UnexpectedStateError, PROTOCOL_VIOLATION);
389impl_error_code!(ClosedError, CLOSED);
390impl_error_code!(CommandTypeMismatchError, JOURNAL_MISMATCH);
391impl_error_code!(UncompletedDoProgressDuringReplay, JOURNAL_MISMATCH);
392impl<M: RestateMessage + CommandMessageHeaderDiff> WithInvocationErrorCode
393 for CommandMismatchError<M>
394{
395 fn code(&self) -> InvocationErrorCode {
396 codes::JOURNAL_MISMATCH
397 }
398}
399impl_error_code!(BadEagerStateKeyError, INTERNAL);
400impl_error_code!(EmptyGetEagerState, PROTOCOL_VIOLATION);
401impl_error_code!(EmptyGetEagerStateKeys, PROTOCOL_VIOLATION);
402impl_error_code!(UnsupportedFeatureForNegotiatedVersion, UNSUPPORTED_FEATURE);