essential_state_read_vm/
future.rs

1use crate::{
2    asm::Op,
3    error::{OpAsyncError, OpError, OutOfGasError, StateReadError},
4    state_read::{self, StateReadFuture},
5    step_op_sync, Access, ContentAddress, Gas, GasLimit, OpAccess, OpAsync, OpAsyncResult,
6    OpGasCost, OpKind, StateRead, Vm,
7};
8use core::{
9    future::Future,
10    pin::Pin,
11    task::{Context, Poll},
12};
13
/// A future that when polled attempts to make progress on VM execution.
///
/// This poll implementation steps forward the VM by the stored operations,
/// handling synchronous and asynchronous operations differently:
///
/// - For synchronous operations, it directly steps the VM to execute the
///   operation.
/// - For asynchronous operations, it creates a future that will complete
///   the operation and temporarily takes ownership of the VM. This future
///   is stored in `pending_op` until it's ready.
///
/// This type should not be constructed directly. Instead, it is used as a part
/// of the implementation of [`Vm::exec`] and exposed publicly for documentation
/// of its behaviour.
///
/// ## Yield Behavior
///
/// Execution yields in two scenarios:
///
/// - **Asynchronous Operations**: When an async operation is encountered,
///   the method yields until the operation's future is ready. This allows
///   other tasks to run while awaiting the asynchronous operation to
///   complete.
/// - **Gas Yield Limit Reached**: The method also yields based on a gas
///   spending limit. If executing an operation causes `gas.spent` to reach
///   or exceed `gas.next_yield_threshold`, the method yields to allow the
///   scheduler to run other tasks. This prevents long or complex sequences
///   of operations from monopolizing CPU time.
///
/// Upon yielding, the method ensures that the state of the VM and the
/// execution context (including gas counters and any pending operations)
/// are preserved for when the `poll` method is called again.
///
/// ## Error Handling
///
/// Errors encountered during operation execution result in an immediate
/// return of `Poll::Ready(Err(...))`, encapsulating the error within a
/// `StateReadError::Op` together with the program counter of the failing
/// operation. This includes errors from:
///
/// - Accessing the operation at the current program counter.
/// - Operations whose gas cost would push the total spent past the total
///   gas limit (or overflow the gas counter).
/// - Synchronous operations that fail during their execution.
/// - Asynchronous operations, where errors are handled once the future
///   resolves.
///
/// The VM's program counter will remain on the operation that caused the
/// error.
///
/// ## Completion
///
/// The future completes (`Poll::Ready(Ok(...))`) when a `Halt` operation is
/// encountered or when operation access yields no operation for the current
/// program counter. At this point, ownership over the VM is dropped and the
/// total amount of gas spent during execution is returned. Attempting to
/// poll the future after it has returned `Poll::Ready` (with either a
/// success or an error) will panic.
pub struct ExecFuture<'a, S, OA, OG>
where
    S: StateRead,
{
    /// Access to solution data.
    access: Access<'a>,
    /// Access to state reading.
    state_read: &'a S,
    /// Access to operations.
    op_access: OA,
    /// A function that, given a reference to an op, returns its gas cost.
    op_gas_cost: &'a OG,
    /// Store the VM in an `Option` so that we can `take` it while polling.
    ///
    /// This is `None` while an async operation's future holds the VM and
    /// after the future has returned `Poll::Ready`.
    vm: Option<&'a mut Vm>,
    /// Gas limits, the next yield threshold and the gas spent so far.
    gas: GasExec,
    /// In the case that the operation future is pending (i.e a state read is in
    /// progress), we store the future here.
    pending_op: Option<PendingOp<'a, S>>,
}
86
/// Track gas limits and expenditure for execution.
struct GasExec {
    /// The total and yield gas limits.
    limit: GasLimit,
    /// The gas threshold at which the future should yield.
    next_yield_threshold: Gas,
    /// The total gas spent so far.
    spent: Gas,
}
96
/// Encapsulates a pending operation.
///
/// Pairs the in-progress async operation with the gas total that is only
/// committed once the operation completes successfully.
struct PendingOp<'a, S>
where
    S: StateRead,
{
    /// The future representing the operation in progress.
    future: StepOpAsyncFuture<'a, S>,
    /// Total gas that will have been spent upon completing the op.
    next_spent: Gas,
}
107
/// The future type produced when performing an async operation.
///
/// The wrapped future holds the `&mut Vm` for the duration of the operation;
/// it can be recovered by converting this value into `&mut Vm`.
enum StepOpAsyncFuture<'a, S>
where
    S: StateRead,
{
    /// The async `StateReadKeyRange` (or `StateReadKeyRangeExt`) operation future.
    StateRead(StateReadFuture<'a, S>),
}
116
117impl From<GasLimit> for GasExec {
118    /// Initialise gas execution tracking from a given gas limit.
119    fn from(limit: GasLimit) -> Self {
120        GasExec {
121            spent: 0,
122            next_yield_threshold: limit.per_yield,
123            limit,
124        }
125    }
126}
127
128// Allow for consuming the async operation future to retake ownership of the stored `&mut Vm`.
129impl<'a, S> From<StepOpAsyncFuture<'a, S>> for &'a mut Vm
130where
131    S: StateRead,
132{
133    fn from(future: StepOpAsyncFuture<'a, S>) -> Self {
134        match future {
135            StepOpAsyncFuture::StateRead(future) => future.vm,
136        }
137    }
138}
139
/// Drives VM execution forward each time the future is polled.
///
/// A single call to `poll` first resumes any pending async operation, then
/// executes operations in a loop until one of: an error, a `Halt`, the end
/// of the available operations, a new async operation, or the gas yield
/// threshold being reached.
impl<'a, S, OA, OG> Future for ExecFuture<'a, S, OA, OG>
where
    S: StateRead,
    OA: OpAccess<Op = Op> + Unpin,
    OG: OpGasCost,
    OA::Error: Into<OpError<S::Error>>,
{
    /// Returns a result with the total gas spent.
    type Output = Result<Gas, StateReadError<S::Error>>;

    /// Attempt to make progress on execution.
    ///
    /// Returns `Poll::Pending` when an async operation has not yet completed,
    /// when a new async operation has just been started, or when the gas yield
    /// threshold has been reached. In the latter two cases the waker is
    /// notified immediately so that the future is scheduled to be polled again.
    ///
    /// Returns `Poll::Ready(Err(..))` with the program counter of the failing
    /// operation on any error, and `Poll::Ready(Ok(gas_spent))` on completion.
    ///
    /// # Panics
    ///
    /// Panics if polled again after having returned `Poll::Ready`, as the VM
    /// reference is no longer stored in the future at that point.
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
        // Poll the async operation future if there is one pending.
        let vm = match self.pending_op.as_mut() {
            // No async op in flight: the VM must be stored in `self.vm`.
            None => self.vm.take().expect("future polled after completion"),
            Some(pending) => {
                let res = match Pin::new(&mut pending.future).poll(cx) {
                    Poll::Pending => return Poll::Pending,
                    Poll::Ready(ready) => ready,
                };

                // Drop the future now we've resumed, retake ownership of the `&mut Vm`.
                let pending = self.pending_op.take().expect("guaranteed `Some`");
                let next_spent = pending.next_spent;
                let vm: &'a mut Vm = pending.future.into();

                // Handle the op result.
                #[cfg(feature = "tracing")]
                trace_op_res(&mut self.op_access, &*vm, res.as_ref());

                match res {
                    Ok(new_pc) => vm.pc = new_pc,
                    Err(err) => {
                        // `vm.pc` has not been advanced, so it still refers to the failed op.
                        let err = StateReadError::Op(vm.pc, err.into());
                        return Poll::Ready(Err(err));
                    }
                };

                // Update gas spent and threshold now that we've resumed.
                self.gas.spent = next_spent;
                self.gas.next_yield_threshold =
                    self.gas.spent.saturating_add(self.gas.limit.per_yield);
                vm
            }
        };

        // Step forward the virtual machine by the next operation.
        while let Some(res) = self.op_access.op_access(vm.pc) {
            // Handle any potential operation access error.
            let op = match res {
                Ok(op) => op,
                Err(err) => {
                    let err = StateReadError::Op(vm.pc, err.into());
                    return Poll::Ready(Err(err));
                }
            };

            let op_gas = self.op_gas_cost.op_gas_cost(&op);

            // Check that the operation wouldn't exceed gas limit.
            // Both an overflow of the counter and exceeding `limit.total` are
            // reported as an out-of-gas error before the op is executed.
            let next_spent = match self
                .gas
                .spent
                .checked_add(op_gas)
                .filter(|&spent| spent <= self.gas.limit.total)
                .ok_or_else(|| out_of_gas(&self.gas, op_gas))
                .map_err(|err| StateReadError::Op(vm.pc, err.into()))
            {
                Err(err) => return Poll::Ready(Err(err)),
                Ok(next_spent) => next_spent,
            };

            let res = match OpKind::from(op) {
                OpKind::Sync(op) => step_op_sync(op, self.access, vm),
                OpKind::Async(op) => {
                    // Async op takes ownership of the VM and returns it upon future completion.
                    let contract_addr = self
                        .access
                        .solution
                        .this_data()
                        .predicate_to_solve
                        .contract
                        .clone();
                    // Record the pc up front: `vm` is moved into `step_op_async` below.
                    let pc = vm.pc;
                    let future = match step_op_async(op, contract_addr, self.state_read, vm) {
                        Err(err) => {
                            let err = StateReadError::Op(pc, err.into());
                            return Poll::Ready(Err(err));
                        }
                        Ok(fut) => fut,
                    };
                    // The new future is stored without being polled here; it is first
                    // polled at the top of the next call to `poll`. Waking immediately
                    // requests that next call.
                    self.pending_op = Some(PendingOp { future, next_spent });
                    cx.waker().wake_by_ref();
                    return Poll::Pending;
                }
            };

            #[cfg(feature = "tracing")]
            trace_op_res(&mut self.op_access, &*vm, res.as_ref());

            // Handle any errors.
            let opt_new_pc = match res {
                Ok(opt) => opt,
                Err(err) => {
                    return Poll::Ready(Err(StateReadError::Op(vm.pc, err.into())));
                }
            };

            // Operation successful, so update gas spent.
            self.gas.spent = next_spent;

            // Update the program counter, or exit if we're done.
            match opt_new_pc {
                Some(new_pc) => vm.pc = new_pc,
                // `None` is returned after encountering a `Halt` operation.
                None => return Poll::Ready(Ok(self.gas.spent)),
            }

            // Yield if we've reached our gas limit.
            if self.gas.next_yield_threshold <= self.gas.spent {
                self.gas.next_yield_threshold =
                    self.gas.spent.saturating_add(self.gas.limit.per_yield);
                // Hand the VM back to `self` so the next poll can take it again.
                self.vm = Some(vm);
                cx.waker().wake_by_ref();
                return Poll::Pending;
            }
        }

        // Operation access returned `None` for the current pc: nothing left to execute.
        Poll::Ready(Ok(self.gas.spent))
    }
}
270
271impl<'vm, S> Future for StepOpAsyncFuture<'vm, S>
272where
273    S: StateRead,
274{
275    // Future returns a result with the new program counter.
276    type Output = OpAsyncResult<usize, S::Error>;
277    fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
278        let (prev_pc, res) = match *self {
279            Self::StateRead(ref mut future) => {
280                let pc = future.vm.pc;
281                match Pin::new(future).poll(cx) {
282                    Poll::Pending => return Poll::Pending,
283                    Poll::Ready(res) => (pc, res),
284                }
285            }
286        };
287        // Every operation besides control flow steps forward program counter by 1.
288        let new_pc = prev_pc.checked_add(1).ok_or(OpAsyncError::PcOverflow)?;
289        let res = res.map(|()| new_pc);
290        Poll::Ready(res)
291    }
292}
293
294/// Creates the VM execution future.
295pub(crate) fn exec<'a, S, OA, OG>(
296    vm: &'a mut Vm,
297    access: Access<'a>,
298    state_read: &'a S,
299    op_access: OA,
300    op_gas_cost: &'a OG,
301    gas_limit: GasLimit,
302) -> ExecFuture<'a, S, OA, OG>
303where
304    S: StateRead,
305    OA: OpAccess<Op = Op> + Unpin,
306    OG: OpGasCost,
307    OA::Error: Into<OpError<S::Error>>,
308{
309    ExecFuture {
310        access,
311        state_read,
312        op_access,
313        op_gas_cost,
314        vm: Some(vm),
315        gas: GasExec::from(gas_limit),
316        pending_op: None,
317    }
318}
319
320/// Step forward the given `Vm` with the given asynchronous operation.
321///
322/// Returns a future representing the completion of the operation.
323fn step_op_async<'a, S>(
324    op: OpAsync,
325    contract_addr: ContentAddress,
326    state_read: &'a S,
327    vm: &'a mut Vm,
328) -> OpAsyncResult<StepOpAsyncFuture<'a, S>, S::Error>
329where
330    S: StateRead,
331{
332    match op {
333        OpAsync::StateReadKeyRange => {
334            let future = state_read::key_range(state_read, &contract_addr, &mut *vm)?;
335            Ok(StepOpAsyncFuture::StateRead(future))
336        }
337        OpAsync::StateReadKeyRangeExt => {
338            let future = state_read::key_range_ext(state_read, &mut *vm)?;
339            Ok(StepOpAsyncFuture::StateRead(future))
340        }
341    }
342}
343
344/// Shorthand for constructing an `OutOfGasError`.
345fn out_of_gas(exec: &GasExec, op_gas: Gas) -> OutOfGasError {
346    OutOfGasError {
347        spent: exec.spent,
348        limit: exec.limit.total,
349        op_gas,
350    }
351}
352
/// Emit tracing output for the operation at the VM's current program counter.
///
/// On success, a trace event is emitted with the operation followed by the
/// VM's stack and state memory.
///
/// On error, a trace event is emitted with the operation alone, followed by
/// a debug event carrying the error.
///
/// Panics if the operation at `vm.pc` cannot be retrieved from `oa`.
#[cfg(feature = "tracing")]
fn trace_op_res<OA, T, E>(oa: &mut OA, vm: &Vm, op_res: Result<T, E>)
where
    OA: OpAccess,
    OA::Op: core::fmt::Debug,
    E: core::fmt::Display,
{
    let looked_up = oa.op_access(vm.pc);
    let op = looked_up
        .expect("must exist as retrieved previously")
        .expect("must exist as retrieved previously");
    let pc_op = format!("0x{:02X}: {op:?}", vm.pc);

    // Error case: log the op on its own, then the error, and stop.
    if let Err(err) = &op_res {
        tracing::trace!("{pc_op}");
        tracing::debug!("{err}");
        return;
    }

    tracing::trace!(
        "{pc_op}\n  ├── {:?}\n  └── {:?}",
        &vm.stack,
        &vm.state_memory
    );
}
381        }
382    }
383}