essential_vm/future.rs
1use crate::{
2 asm::{self, Op},
3 error::{ExecError, OpAsyncError, OpAsyncResult, OpError, OutOfGasError},
4 state_read::{self, StateReadFuture},
5 sync::step_op_sync,
6 Access, ContentAddress, Gas, GasLimit, OpAccess, OpAsync, OpGasCost, OpKind, StateRead, Vm,
7};
8use core::{
9 future::Future,
10 pin::Pin,
11 task::{Context, Poll},
12};
13
14/// A future that when polled attempts to make progress on VM execution.
15///
16/// This poll implementation steps forward the VM by the stored operations,
17/// handling synchronous and asynchronous operations differently:
18///
19/// - For synchronous operations, it directly steps the VM to execute the
20/// operation.
21/// - For asynchronous operations, it creates a future that will complete
22/// the operation and temporarily takes ownership of the VM. This future
23/// is stored in `pending_op` until it's ready.
24///
25/// This type should not be constructed directly. Instead, it is used as a part
26/// of the implementation of [`Vm::exec`] and exposed publicly for documentation
27/// of its behaviour.
28///
29/// ## Yield Behavior
30///
31/// Execution yields in two scenarios:
32///
33/// - **Asynchronous Operations**: When an async operation is encountered,
34/// the method yields until the operation's future is ready. This allows
35/// other tasks to run while awaiting the asynchronous operation to
36/// complete.
37/// - **Gas Yield Limit Reached**: The method also yields based on a gas
38/// spending limit. If executing an operation causes `gas.spent` to exceed
39/// `gas.next_yield_threshold`, the method yields to allow the scheduler
40/// to run other tasks. This prevents long or complex sequences of
41/// operations from monopolizing CPU time.
42///
43/// Upon yielding, the method ensures that the state of the VM and the
44/// execution context (including gas counters and any pending operations)
45/// are preserved for when the `poll` method is called again.
46///
47/// ## Error Handling
48///
49/// Errors encountered during operation execution result in an immediate
50/// return of `Poll::Ready(Err(...))`, encapsulating the error within a
51/// `ExecError`. This includes errors from:
52///
53/// - Synchronous operations that fail during their execution.
54/// - Asynchronous operations, where errors are handled once the future
55/// resolves.
56///
57/// The VM's program counter will remain on the operation that caused the
58/// error.
59///
60/// ## Completion
61///
62/// The future completes (`Poll::Ready(Ok(...))`) when all operations have
63/// been executed and no more work remains. At this point, ownership over
64/// the VM is dropped and the total amount of gas spent during execution is
65/// returned. Attempting to poll the future after completion will panic.
66pub struct ExecFuture<'a, S, OA, OG>
67where
68 S: StateRead,
69{
70 /// Access to solution data.
71 access: Access<'a>,
72 /// Access to state reading.
73 state_read: &'a S,
74 /// Access to operations.
75 op_access: OA,
76 /// A function that, given a reference to an op, returns its gas cost.
77 op_gas_cost: &'a OG,
78 /// Store the VM in an `Option` so that we can `take` it upon future completion.
79 vm: Option<&'a mut Vm>,
80 /// Gas spent during execution so far.
81 gas: GasExec,
82 /// In the case that the operation future is pending (i.e a state read is in
83 /// progress), we store the future here.
84 pending_op: Option<PendingOp<'a, S>>,
85}
86
87/// Track gas limits and expenditure for execution.
88struct GasExec {
89 /// The total and yield gas limits.
90 limit: GasLimit,
91 /// The gas threshold at which the future should yield.
92 next_yield_threshold: Gas,
93 /// The total gas limit.
94 spent: Gas,
95}
96
97/// Encapsulates a pending operation.
98struct PendingOp<'a, S>
99where
100 S: StateRead,
101{
102 // The future representing the operation in progress.
103 future: StepOpAsyncFuture<'a, S>,
104 /// Total gas that will have been spent upon completing the op.
105 next_spent: Gas,
106}
107
108/// The future type produced when performing an async operation.
109enum StepOpAsyncFuture<'a, S>
110where
111 S: StateRead,
112{
113 /// The async `StateRead::WordRange` (or `WordRangeExtern`) operation future.
114 StateRead(StateReadFuture<'a, S>),
115}
116
117impl From<GasLimit> for GasExec {
118 /// Initialise gas execution tracking from a given gas limit.
119 fn from(limit: GasLimit) -> Self {
120 GasExec {
121 spent: 0,
122 next_yield_threshold: limit.per_yield,
123 limit,
124 }
125 }
126}
127
128// Allow for consuming the async operation future to retake ownership of the stored `&mut Vm`.
129impl<'a, S> From<StepOpAsyncFuture<'a, S>> for &'a mut Vm
130where
131 S: StateRead,
132{
133 fn from(future: StepOpAsyncFuture<'a, S>) -> Self {
134 match future {
135 StepOpAsyncFuture::StateRead(future) => future.vm,
136 }
137 }
138}
139
140impl<'a, S, OA, OG> Future for ExecFuture<'a, S, OA, OG>
141where
142 S: StateRead,
143 OA: OpAccess<Op = Op> + Unpin,
144 OG: OpGasCost,
145 OA::Error: Into<OpError<S::Error>>,
146{
147 /// Returns a result with the total gas spent.
148 type Output = Result<Gas, ExecError<S::Error>>;
149
150 fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
151 // Poll the async operation future if there is one pending.
152 let vm = match self.pending_op.as_mut() {
153 None => self.vm.take().expect("future polled after completion"),
154 Some(pending) => {
155 let res = match Pin::new(&mut pending.future).poll(cx) {
156 Poll::Pending => return Poll::Pending,
157 Poll::Ready(ready) => ready,
158 };
159
160 // Drop the future now we've resumed, retake ownership of the `&mut Vm`.
161 let pending = self.pending_op.take().expect("guaranteed `Some`");
162 let next_spent = pending.next_spent;
163 let vm: &'a mut Vm = pending.future.into();
164
165 // Handle the op result.
166 #[cfg(feature = "tracing")]
167 crate::trace_op_res(
168 &mut self.op_access,
169 vm.pc,
170 &vm.stack,
171 &vm.memory,
172 res.as_ref(),
173 );
174
175 match res {
176 Ok(new_pc) => vm.pc = new_pc,
177 Err(err) => {
178 let err = ExecError(vm.pc, err.into());
179 return Poll::Ready(Err(err));
180 }
181 };
182
183 // Update gas spent and threshold now that we've resumed.
184 self.gas.spent = next_spent;
185 self.gas.next_yield_threshold =
186 self.gas.spent.saturating_add(self.gas.limit.per_yield);
187 vm
188 }
189 };
190
191 // Step forward the virtual machine by the next operation.
192 while let Some(res) = self.op_access.op_access(vm.pc) {
193 // Handle any potential operation access error.
194 let op = match res {
195 Ok(op) => op,
196 Err(err) => {
197 let err = ExecError(vm.pc, err.into());
198 return Poll::Ready(Err(err));
199 }
200 };
201
202 let op_gas = self.op_gas_cost.op_gas_cost(&op);
203
204 // Check that the operation wouldn't exceed gas limit.
205 let next_spent = match self
206 .gas
207 .spent
208 .checked_add(op_gas)
209 .filter(|&spent| spent <= self.gas.limit.total)
210 .ok_or_else(|| out_of_gas(&self.gas, op_gas))
211 .map_err(|err| ExecError(vm.pc, err.into()))
212 {
213 Err(err) => return Poll::Ready(Err(err)),
214 Ok(next_spent) => next_spent,
215 };
216
217 let res = match OpKind::from(op) {
218 OpKind::Sync(op) => step_op_sync(op, self.access, vm),
219 OpKind::Async(op) => {
220 // Async op takes ownership of the VM and returns it upon future completion.
221 let contract_addr = self
222 .access
223 .this_solution()
224 .predicate_to_solve
225 .contract
226 .clone();
227 let pc = vm.pc;
228 let future = match step_op_async(op, contract_addr, self.state_read, vm) {
229 Err(err) => {
230 let err = ExecError(pc, err.into());
231 return Poll::Ready(Err(err));
232 }
233 Ok(fut) => fut,
234 };
235 self.pending_op = Some(PendingOp { future, next_spent });
236 cx.waker().wake_by_ref();
237 return Poll::Pending;
238 }
239 };
240
241 #[cfg(feature = "tracing")]
242 crate::trace_op_res(
243 &mut self.op_access,
244 vm.pc,
245 &vm.stack,
246 &vm.memory,
247 res.as_ref(),
248 );
249
250 // Handle any errors.
251 let opt_new_pc = match res {
252 Ok(opt) => opt,
253 Err(err) => {
254 return Poll::Ready(Err(ExecError(vm.pc, err.into())));
255 }
256 };
257
258 // Operation successful, so update gas spent.
259 self.gas.spent = next_spent;
260
261 // Update the program counter, or exit if we're done.
262 match opt_new_pc {
263 Some(new_pc) => vm.pc = new_pc,
264 // `None` is returned after encountering a `Halt` operation.
265 None => return Poll::Ready(Ok(self.gas.spent)),
266 }
267
268 // Yield if we've reached our gas limit.
269 if self.gas.next_yield_threshold <= self.gas.spent {
270 self.gas.next_yield_threshold =
271 self.gas.spent.saturating_add(self.gas.limit.per_yield);
272 self.vm = Some(vm);
273 cx.waker().wake_by_ref();
274 return Poll::Pending;
275 }
276 }
277
278 Poll::Ready(Ok(self.gas.spent))
279 }
280}
281
282impl<S> Future for StepOpAsyncFuture<'_, S>
283where
284 S: StateRead,
285{
286 // Future returns a result with the new program counter.
287 type Output = OpAsyncResult<usize, S::Error>;
288 fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
289 let (prev_pc, res) = match *self {
290 Self::StateRead(ref mut future) => {
291 let pc = future.vm.pc;
292 match Pin::new(future).poll(cx) {
293 Poll::Pending => return Poll::Pending,
294 Poll::Ready(res) => (pc, res),
295 }
296 }
297 };
298 // Every operation besides control flow steps forward program counter by 1.
299 let new_pc = prev_pc.checked_add(1).ok_or(OpAsyncError::PcOverflow)?;
300 let res = res.map(|()| new_pc);
301 Poll::Ready(res)
302 }
303}
304
305/// Creates the VM execution future.
306pub(crate) fn exec<'a, S, OA, OG>(
307 vm: &'a mut Vm,
308 access: Access<'a>,
309 state_read: &'a S,
310 op_access: OA,
311 op_gas_cost: &'a OG,
312 gas_limit: GasLimit,
313) -> ExecFuture<'a, S, OA, OG>
314where
315 S: StateRead,
316 OA: OpAccess<Op = Op> + Unpin,
317 OG: OpGasCost,
318 OA::Error: Into<OpError<S::Error>>,
319{
320 ExecFuture {
321 access,
322 state_read,
323 op_access,
324 op_gas_cost,
325 vm: Some(vm),
326 gas: GasExec::from(gas_limit),
327 pending_op: None,
328 }
329}
330
331/// Step forward the given `Vm` with the given asynchronous operation.
332///
333/// Returns a future representing the completion of the operation.
334fn step_op_async<'a, S>(
335 op: OpAsync,
336 contract_addr: ContentAddress,
337 state_read: &'a S,
338 vm: &'a mut Vm,
339) -> OpAsyncResult<StepOpAsyncFuture<'a, S>, S::Error>
340where
341 S: StateRead,
342{
343 match op {
344 OpAsync(asm::StateRead::KeyRange) => {
345 let future = state_read::key_range(state_read, &contract_addr, &mut *vm)?;
346 Ok(StepOpAsyncFuture::StateRead(future))
347 }
348 OpAsync(asm::StateRead::KeyRangeExtern) => {
349 let future = state_read::key_range_ext(state_read, &mut *vm)?;
350 Ok(StepOpAsyncFuture::StateRead(future))
351 }
352 }
353}
354
355/// Shorthand for constructing an `OutOfGasError`.
356fn out_of_gas(exec: &GasExec, op_gas: Gas) -> OutOfGasError {
357 OutOfGasError {
358 spent: exec.spent,
359 limit: exec.limit.total,
360 op_gas,
361 }
362}