essential_vm/
future.rs

1use crate::{
2    asm::{self, Op},
3    error::{ExecError, OpAsyncError, OpAsyncResult, OpError, OutOfGasError},
4    state_read::{self, StateReadFuture},
5    sync::step_op_sync,
6    Access, ContentAddress, Gas, GasLimit, OpAccess, OpAsync, OpGasCost, OpKind, StateRead, Vm,
7};
8use core::{
9    future::Future,
10    pin::Pin,
11    task::{Context, Poll},
12};
13
/// A future that when polled attempts to make progress on VM execution.
///
/// This poll implementation steps forward the VM by the stored operations,
/// handling synchronous and asynchronous operations differently:
///
/// - For synchronous operations, it directly steps the VM to execute the
///   operation.
/// - For asynchronous operations, it creates a future that will complete
///   the operation and temporarily takes ownership of the VM. This future
///   is stored in `pending_op` until it's ready.
///
/// This type should not be constructed directly. Instead, it is used as a part
/// of the implementation of [`Vm::exec`] and exposed publicly for documentation
/// of its behaviour.
///
/// ## Yield Behavior
///
/// Execution yields in two scenarios:
///
/// - **Asynchronous Operations**: When an async operation is encountered,
///   the method yields until the operation's future is ready. This allows
///   other tasks to run while awaiting the asynchronous operation to
///   complete.
/// - **Gas Yield Limit Reached**: The method also yields based on a gas
///   spending limit. If executing an operation causes `gas.spent` to reach
///   or exceed `gas.next_yield_threshold`, the method yields to allow the
///   scheduler to run other tasks, and the threshold is advanced by the
///   per-yield limit. This prevents long or complex sequences of operations
///   from monopolizing CPU time.
///
/// Upon yielding, the method ensures that the state of the VM and the
/// execution context (including gas counters and any pending operations)
/// are preserved for when the `poll` method is called again.
///
/// ## Error Handling
///
/// Errors encountered during operation execution result in an immediate
/// return of `Poll::Ready(Err(...))`, encapsulating the error within a
/// `ExecError`. This includes errors from:
///
/// - Accessing the operation at the current program counter.
/// - Exceeding the total gas limit.
/// - Synchronous operations that fail during their execution.
/// - Asynchronous operations, where errors are handled once the future
///   resolves.
///
/// The VM's program counter will remain on the operation that caused the
/// error.
///
/// ## Completion
///
/// The future completes (`Poll::Ready(Ok(...))`) when all operations have
/// been executed and no more work remains. At this point, ownership over
/// the VM is dropped and the total amount of gas spent during execution is
/// returned. Attempting to poll the future after completion will panic.
pub struct ExecFuture<'a, S, OA, OG>
where
    S: StateRead,
{
    /// Access to solution data.
    access: Access<'a>,
    /// Access to state reading.
    state_read: &'a S,
    /// Access to operations.
    op_access: OA,
    /// A function that, given a reference to an op, returns its gas cost.
    op_gas_cost: &'a OG,
    /// Store the VM in an `Option` so that we can `take` it upon future completion.
    ///
    /// This is `None` while an async operation's future holds the `&mut Vm`
    /// and after the future has completed; it is restored to `Some` when
    /// yielding at the gas threshold.
    vm: Option<&'a mut Vm>,
    /// Gas limits, the next yield threshold and the gas spent during
    /// execution so far.
    gas: GasExec,
    /// In the case that the operation future is pending (i.e a state read is in
    /// progress), we store the future here.
    pending_op: Option<PendingOp<'a, S>>,
}
86
/// Track gas limits and expenditure for execution.
struct GasExec {
    /// The total and yield gas limits.
    limit: GasLimit,
    /// The gas threshold at which the future should yield.
    next_yield_threshold: Gas,
    /// The total gas spent so far by operations that completed successfully.
    spent: Gas,
}
96
/// Encapsulates a pending operation.
struct PendingOp<'a, S>
where
    S: StateRead,
{
    /// The future representing the operation in progress.
    future: StepOpAsyncFuture<'a, S>,
    /// Total gas that will have been spent upon completing the op.
    ///
    /// Only written to the executor's gas counter if the op succeeds.
    next_spent: Gas,
}
107
/// The future type produced when performing an async operation.
enum StepOpAsyncFuture<'a, S>
where
    S: StateRead,
{
    /// The async `StateRead::KeyRange` (or `KeyRangeExtern`) operation future.
    StateRead(StateReadFuture<'a, S>),
}
116
117impl From<GasLimit> for GasExec {
118    /// Initialise gas execution tracking from a given gas limit.
119    fn from(limit: GasLimit) -> Self {
120        GasExec {
121            spent: 0,
122            next_yield_threshold: limit.per_yield,
123            limit,
124        }
125    }
126}
127
128// Allow for consuming the async operation future to retake ownership of the stored `&mut Vm`.
129impl<'a, S> From<StepOpAsyncFuture<'a, S>> for &'a mut Vm
130where
131    S: StateRead,
132{
133    fn from(future: StepOpAsyncFuture<'a, S>) -> Self {
134        match future {
135            StepOpAsyncFuture::StateRead(future) => future.vm,
136        }
137    }
138}
139
// Drives VM execution: first resumes any in-flight async operation, then steps
// through operations until completion, an error, a new async operation, or the
// gas yield threshold is reached.
impl<'a, S, OA, OG> Future for ExecFuture<'a, S, OA, OG>
where
    S: StateRead,
    OA: OpAccess<Op = Op> + Unpin,
    OG: OpGasCost,
    OA::Error: Into<OpError<S::Error>>,
{
    /// Returns a result with the total gas spent.
    type Output = Result<Gas, ExecError<S::Error>>;

    /// Attempt to make progress on execution.
    ///
    /// Returns `Poll::Pending` when an async operation is in flight or when
    /// the gas yield threshold has been reached, `Poll::Ready(Err(..))` on
    /// the first error (tagged with the program counter of the failing op),
    /// and `Poll::Ready(Ok(gas_spent))` once a halt is signalled or there is
    /// no operation at the current program counter.
    ///
    /// # Panics
    ///
    /// Panics if polled again after returning `Poll::Ready`, as the stored
    /// `&mut Vm` is not put back on those paths.
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
        // Poll the async operation future if there is one pending.
        let vm = match self.pending_op.as_mut() {
            // No async op in flight: the VM must be stored in `self.vm`.
            None => self.vm.take().expect("future polled after completion"),
            Some(pending) => {
                let res = match Pin::new(&mut pending.future).poll(cx) {
                    Poll::Pending => return Poll::Pending,
                    Poll::Ready(ready) => ready,
                };

                // Drop the future now we've resumed, retake ownership of the `&mut Vm`.
                let pending = self.pending_op.take().expect("guaranteed `Some`");
                let next_spent = pending.next_spent;
                let vm: &'a mut Vm = pending.future.into();

                // Handle the op result.
                #[cfg(feature = "tracing")]
                crate::trace_op_res(
                    &mut self.op_access,
                    vm.pc,
                    &vm.stack,
                    &vm.memory,
                    res.as_ref(),
                );

                match res {
                    Ok(new_pc) => vm.pc = new_pc,
                    Err(err) => {
                        // The program counter is left on the failing op.
                        let err = ExecError(vm.pc, err.into());
                        return Poll::Ready(Err(err));
                    }
                };

                // Update gas spent and threshold now that we've resumed.
                self.gas.spent = next_spent;
                self.gas.next_yield_threshold =
                    self.gas.spent.saturating_add(self.gas.limit.per_yield);
                vm
            }
        };

        // Step forward the virtual machine by the next operation.
        while let Some(res) = self.op_access.op_access(vm.pc) {
            // Handle any potential operation access error.
            let op = match res {
                Ok(op) => op,
                Err(err) => {
                    let err = ExecError(vm.pc, err.into());
                    return Poll::Ready(Err(err));
                }
            };

            let op_gas = self.op_gas_cost.op_gas_cost(&op);

            // Check that the operation wouldn't exceed gas limit.
            // Arithmetic overflow of the counter is also reported as out-of-gas.
            let next_spent = match self
                .gas
                .spent
                .checked_add(op_gas)
                .filter(|&spent| spent <= self.gas.limit.total)
                .ok_or_else(|| out_of_gas(&self.gas, op_gas))
                .map_err(|err| ExecError(vm.pc, err.into()))
            {
                Err(err) => return Poll::Ready(Err(err)),
                Ok(next_spent) => next_spent,
            };

            let res = match OpKind::from(op) {
                OpKind::Sync(op) => step_op_sync(op, self.access, vm),
                OpKind::Async(op) => {
                    // Async op takes ownership of the VM and returns it upon future completion.
                    let contract_addr = self
                        .access
                        .this_solution()
                        .predicate_to_solve
                        .contract
                        .clone();
                    // Capture the pc before `vm` is moved into `step_op_async`.
                    let pc = vm.pc;
                    let future = match step_op_async(op, contract_addr, self.state_read, vm) {
                        Err(err) => {
                            let err = ExecError(pc, err.into());
                            return Poll::Ready(Err(err));
                        }
                        Ok(fut) => fut,
                    };
                    // The new future is not polled here; waking ourselves requests another
                    // `poll`, where the `Some(pending)` arm above drives it.
                    self.pending_op = Some(PendingOp { future, next_spent });
                    cx.waker().wake_by_ref();
                    return Poll::Pending;
                }
            };

            #[cfg(feature = "tracing")]
            crate::trace_op_res(
                &mut self.op_access,
                vm.pc,
                &vm.stack,
                &vm.memory,
                res.as_ref(),
            );

            // Handle any errors.
            let opt_new_pc = match res {
                Ok(opt) => opt,
                Err(err) => {
                    return Poll::Ready(Err(ExecError(vm.pc, err.into())));
                }
            };

            // Operation successful, so update gas spent.
            self.gas.spent = next_spent;

            // Update the program counter, or exit if we're done.
            match opt_new_pc {
                Some(new_pc) => vm.pc = new_pc,
                // `None` is returned after encountering a `Halt` operation.
                None => return Poll::Ready(Ok(self.gas.spent)),
            }

            // Yield if we've reached our gas limit.
            // Note `<=`: we yield as soon as the threshold is reached, not only once exceeded.
            if self.gas.next_yield_threshold <= self.gas.spent {
                self.gas.next_yield_threshold =
                    self.gas.spent.saturating_add(self.gas.limit.per_yield);
                // Put the VM back so the next `poll` can take it again.
                self.vm = Some(vm);
                cx.waker().wake_by_ref();
                return Poll::Pending;
            }
        }

        // `op_access` returned `None`: there is no operation at the current program counter.
        Poll::Ready(Ok(self.gas.spent))
    }
}
281
282impl<S> Future for StepOpAsyncFuture<'_, S>
283where
284    S: StateRead,
285{
286    // Future returns a result with the new program counter.
287    type Output = OpAsyncResult<usize, S::Error>;
288    fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
289        let (prev_pc, res) = match *self {
290            Self::StateRead(ref mut future) => {
291                let pc = future.vm.pc;
292                match Pin::new(future).poll(cx) {
293                    Poll::Pending => return Poll::Pending,
294                    Poll::Ready(res) => (pc, res),
295                }
296            }
297        };
298        // Every operation besides control flow steps forward program counter by 1.
299        let new_pc = prev_pc.checked_add(1).ok_or(OpAsyncError::PcOverflow)?;
300        let res = res.map(|()| new_pc);
301        Poll::Ready(res)
302    }
303}
304
305/// Creates the VM execution future.
306pub(crate) fn exec<'a, S, OA, OG>(
307    vm: &'a mut Vm,
308    access: Access<'a>,
309    state_read: &'a S,
310    op_access: OA,
311    op_gas_cost: &'a OG,
312    gas_limit: GasLimit,
313) -> ExecFuture<'a, S, OA, OG>
314where
315    S: StateRead,
316    OA: OpAccess<Op = Op> + Unpin,
317    OG: OpGasCost,
318    OA::Error: Into<OpError<S::Error>>,
319{
320    ExecFuture {
321        access,
322        state_read,
323        op_access,
324        op_gas_cost,
325        vm: Some(vm),
326        gas: GasExec::from(gas_limit),
327        pending_op: None,
328    }
329}
330
331/// Step forward the given `Vm` with the given asynchronous operation.
332///
333/// Returns a future representing the completion of the operation.
334fn step_op_async<'a, S>(
335    op: OpAsync,
336    contract_addr: ContentAddress,
337    state_read: &'a S,
338    vm: &'a mut Vm,
339) -> OpAsyncResult<StepOpAsyncFuture<'a, S>, S::Error>
340where
341    S: StateRead,
342{
343    match op {
344        OpAsync(asm::StateRead::KeyRange) => {
345            let future = state_read::key_range(state_read, &contract_addr, &mut *vm)?;
346            Ok(StepOpAsyncFuture::StateRead(future))
347        }
348        OpAsync(asm::StateRead::KeyRangeExtern) => {
349            let future = state_read::key_range_ext(state_read, &mut *vm)?;
350            Ok(StepOpAsyncFuture::StateRead(future))
351        }
352    }
353}
354
355/// Shorthand for constructing an `OutOfGasError`.
356fn out_of_gas(exec: &GasExec, op_gas: Gas) -> OutOfGasError {
357    OutOfGasError {
358        spent: exec.spent,
359        limit: exec.limit.total,
360        op_gas,
361    }
362}