1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
use crate::error::{CommandMetadata, NotificationMetadata};
use crate::service_protocol::{MessageType, NotificationId, CANCEL_SIGNAL_ID};
use crate::vm::context::Context;
use crate::vm::errors::UncompletedDoProgressDuringReplay;
use crate::vm::transitions::{HitSuspensionPoint, Transition, TransitionAndReturn};
use crate::vm::{awakeable_id_str, State};
use crate::{DoProgressResponse, Error, NotificationHandle, Value};
use std::collections::HashMap;
use tracing::trace;
/// Marker used as the `Err` side of a transition output to signal that the state machine
/// suspended instead of handing control back to user code.
pub(crate) struct Suspended;
/// Transition input asking the state machine to make progress on the given notification
/// handles, i.e. the handles user code is currently awaiting on.
pub(crate) struct DoProgress(pub(crate) Vec<NotificationHandle>);
impl TransitionAndReturn<Context, DoProgress> for State {
type Output = Result<DoProgressResponse, Suspended>;
fn transition_and_return(
mut self,
context: &mut Context,
DoProgress(awaiting_on): DoProgress,
) -> Result<(Self, Self::Output), Error> {
match self {
State::Replaying {
ref mut async_results,
ref run_state,
..
} => {
// Check first if any was completed already
if awaiting_on
.iter()
.any(|h| async_results.is_handle_completed(*h))
{
// We're good, let's give back control to user code
return Ok((self, Ok(DoProgressResponse::AnyCompleted)));
}
let notification_ids = async_results.resolve_notification_handles(&awaiting_on);
if notification_ids.is_empty() {
// This can happen if `do_progress` was called while the SDK has all the results already.
return Ok((self, Ok(DoProgressResponse::AnyCompleted)));
}
// Let's try to find it in the notifications we already have
if async_results.process_next_until_any_found(¬ification_ids) {
// We're good, let's give back control to user code
return Ok((self, Ok(DoProgressResponse::AnyCompleted)));
}
// This assertion proves the user mutated the code, adding an await point.
//
// Proof by contradiction:
//
// 1. During replay, we transition to processing AFTER replaying all COMMANDS
// (not after replaying the entire journal, which includes both commands and notifications)
// 2. This code path handles awaits ONLY during replay
// 3. If we reach this point, none of the previous checks succeeded, meaning we don't have enough notifications to complete this await point
// 4. But if this await cannot be completed during replay, then in previous replays/execution attempts, no progress should have been made afterward,
// meaning there should be no more commands to replay in the state machine
// 5. However, we ARE still replaying (as evidenced by being in this code path), which means
// there ARE commands to replay after this await point
//
// This contradiction proves the code was mutated: an await must have been added after
// the journal was originally created.
// Prepare error metadata here, we gotta be nice to make sure users can debug this
let mut known_notification_metadata = HashMap::with_capacity(2);
let mut known_command_metadata: Option<CommandMetadata> = None;
// Collect run info
for handle in awaiting_on {
if let Some((command_index, name)) = run_state.get_run_info(&handle) {
let notification_id =
async_results.must_resolve_notification_handle(&handle);
let command_metadata = CommandMetadata::new_named(
name.to_owned(),
command_index,
MessageType::RunCommand,
);
known_command_metadata = Some(command_metadata.clone());
known_notification_metadata.insert(
notification_id,
NotificationMetadata::RelatedToCommand(command_metadata),
);
}
}
// For awakeables, prep ids
for notification_id in ¬ification_ids {
if let NotificationId::SignalId(id) = notification_id {
if *id == CANCEL_SIGNAL_ID {
known_notification_metadata.insert(
notification_id.clone(),
NotificationMetadata::Cancellation,
);
} else if *id > 16 {
known_notification_metadata.insert(
notification_id.clone(),
NotificationMetadata::Awakeable(awakeable_id_str(
&context.expect_start_info().id,
*id,
)),
);
}
}
}
let mut error = Error::from(UncompletedDoProgressDuringReplay::new(
notification_ids,
known_notification_metadata,
));
if let Some(command_metadata) = known_command_metadata {
error = error.with_related_command_metadata(command_metadata)
}
Err(error)
}
State::Processing {
ref mut async_results,
ref mut run_state,
..
} => {
// Check first if any was completed already
if awaiting_on
.iter()
.any(|h| async_results.is_handle_completed(*h))
{
// We're good, let's give back control to user code
return Ok((self, Ok(DoProgressResponse::AnyCompleted)));
}
let notification_ids = async_results.resolve_notification_handles(&awaiting_on);
if notification_ids.is_empty() {
trace!("Could not resolve any of the {awaiting_on:?} handles");
return Ok((self, Ok(DoProgressResponse::AnyCompleted)));
}
// Let's try to find it in the notifications we already have
if async_results.process_next_until_any_found(¬ification_ids) {
// We're good, let's give back control to user code
return Ok((self, Ok(DoProgressResponse::AnyCompleted)));
}
// We couldn't find any notification for the given ids, let's check if there's some run to execute
if let Some(run_to_execute) =
run_state.try_execute_run(&awaiting_on.iter().cloned().collect())
{
return Ok((self, Ok(DoProgressResponse::ExecuteRun(run_to_execute))));
}
// Check suspension condition
if context.input_is_closed {
// Maybe something is executing and we're awaiting it to complete,
// in this case we don't suspend yet!
if run_state.any_executing(&awaiting_on) {
return Ok((self, Ok(DoProgressResponse::WaitingPendingRun)));
}
let state = self.transition(context, HitSuspensionPoint(notification_ids))?;
return Ok((state, Err(Suspended)));
};
// Nothing else can be done, we need more input
Ok((self, Ok(DoProgressResponse::ReadFromInput)))
}
s => Err(s.as_unexpected_state(crate::fmt::format_do_progress())),
}
}
}
/// Transition input: take the result associated with a notification handle.
pub(crate) struct TakeNotification(pub(crate) NotificationHandle);

impl TransitionAndReturn<Context, TakeNotification> for State {
    type Output = Result<Option<Value>, Suspended>;

    /// Fetches `handle` through `async_results.take_handle` and converts the result into a
    /// [`Value`]. It is only valid while `Processing` or `Replaying`. The state variant is
    /// returned unchanged, and this transition never produces `Err(Suspended)`.
    ///
    /// # Errors
    ///
    /// Returns the unexpected-state error labelled `"TakeNotification"` in any other state.
    fn transition_and_return(
        mut self,
        _: &mut Context,
        TakeNotification(handle): TakeNotification,
    ) -> Result<(Self, Self::Output), Error> {
        let taken: Option<Value> = match &mut self {
            State::Processing { async_results, .. } | State::Replaying { async_results, .. } => {
                async_results.take_handle(handle).map(Into::into)
            }
            _ => return Err(self.as_unexpected_state("TakeNotification")),
        };
        Ok((self, Ok(taken)))
    }
}
/// Transition input: copy the result associated with a notification handle.
pub(crate) struct CopyNotification(pub(crate) NotificationHandle);

impl TransitionAndReturn<Context, CopyNotification> for State {
    type Output = Result<Option<Value>, Suspended>;

    /// Fetches `handle` through `async_results.copy_handle` and converts the result into a
    /// [`Value`]. It is only valid while `Processing` or `Replaying`. The state variant is
    /// returned unchanged, and this transition never produces `Err(Suspended)`.
    ///
    /// # Errors
    ///
    /// Returns the unexpected-state error labelled `"CopyNotification"` in any other state.
    fn transition_and_return(
        mut self,
        _: &mut Context,
        CopyNotification(handle): CopyNotification,
    ) -> Result<(Self, Self::Output), Error> {
        let copied: Option<Value> = match &mut self {
            State::Processing { async_results, .. } | State::Replaying { async_results, .. } => {
                async_results.copy_handle(handle).map(Into::into)
            }
            _ => return Err(self.as_unexpected_state("CopyNotification")),
        };
        Ok((self, Ok(copied)))
    }
}