essential_state_read_vm/future.rs
1use crate::{
2 asm::Op,
3 error::{OpAsyncError, OpError, OutOfGasError, StateReadError},
4 state_read::{self, StateReadFuture},
5 step_op_sync, Access, ContentAddress, Gas, GasLimit, OpAccess, OpAsync, OpAsyncResult,
6 OpGasCost, OpKind, StateRead, Vm,
7};
8use core::{
9 future::Future,
10 pin::Pin,
11 task::{Context, Poll},
12};
13
14/// A future that when polled attempts to make progress on VM execution.
15///
16/// This poll implementation steps forward the VM by the stored operations,
17/// handling synchronous and asynchronous operations differently:
18///
19/// - For synchronous operations, it directly steps the VM to execute the
20/// operation.
21/// - For asynchronous operations, it creates a future that will complete
22/// the operation and temporarily takes ownership of the VM. This future
23/// is stored in `pending_op` until it's ready.
24///
25/// This type should not be constructed directly. Instead, it is used as a part
26/// of the implementation of [`Vm::exec`] and exposed publicly for documentation
27/// of its behaviour.
28///
29/// ## Yield Behavior
30///
31/// Execution yields in two scenarios:
32///
33/// - **Asynchronous Operations**: When an async operation is encountered,
34/// the method yields until the operation's future is ready. This allows
35/// other tasks to run while awaiting the asynchronous operation to
36/// complete.
37/// - **Gas Yield Limit Reached**: The method also yields based on a gas
38/// spending limit. If executing an operation causes `gas.spent` to exceed
39/// `gas.next_yield_threshold`, the method yields to allow the scheduler
40/// to run other tasks. This prevents long or complex sequences of
41/// operations from monopolizing CPU time.
42///
43/// Upon yielding, the method ensures that the state of the VM and the
44/// execution context (including gas counters and any pending operations)
45/// are preserved for when the `poll` method is called again.
46///
47/// ## Error Handling
48///
49/// Errors encountered during operation execution result in an immediate
50/// return of `Poll::Ready(Err(...))`, encapsulating the error within a
51/// `StateReadError`. This includes errors from:
52///
53/// - Synchronous operations that fail during their execution.
54/// - Asynchronous operations, where errors are handled once the future
55/// resolves.
56///
57/// The VM's program counter will remain on the operation that caused the
58/// error.
59///
60/// ## Completion
61///
62/// The future completes (`Poll::Ready(Ok(...))`) when all operations have
63/// been executed and no more work remains. At this point, ownership over
64/// the VM is dropped and the total amount of gas spent during execution is
65/// returned. Attempting to poll the future after completion will panic.
66pub struct ExecFuture<'a, S, OA, OG>
67where
68 S: StateRead,
69{
70 /// Access to solution data.
71 access: Access<'a>,
72 /// Access to state reading.
73 state_read: &'a S,
74 /// Access to operations.
75 op_access: OA,
76 /// A function that, given a reference to an op, returns its gas cost.
77 op_gas_cost: &'a OG,
78 /// Store the VM in an `Option` so that we can `take` it upon future completion.
79 vm: Option<&'a mut Vm>,
80 /// Gas spent during execution so far.
81 gas: GasExec,
82 /// In the case that the operation future is pending (i.e a state read is in
83 /// progress), we store the future here.
84 pending_op: Option<PendingOp<'a, S>>,
85}
86
87/// Track gas limits and expenditure for execution.
88struct GasExec {
89 /// The total and yield gas limits.
90 limit: GasLimit,
91 /// The gas threshold at which the future should yield.
92 next_yield_threshold: Gas,
93 /// The total gas limit.
94 spent: Gas,
95}
96
97/// Encapsulates a pending operation.
98struct PendingOp<'a, S>
99where
100 S: StateRead,
101{
102 // The future representing the operation in progress.
103 future: StepOpAsyncFuture<'a, S>,
104 /// Total gas that will have been spent upon completing the op.
105 next_spent: Gas,
106}
107
108/// The future type produced when performing an async operation.
109enum StepOpAsyncFuture<'a, S>
110where
111 S: StateRead,
112{
113 /// The async `StateRead::WordRange` (or `WordRangeExtern`) operation future.
114 StateRead(StateReadFuture<'a, S>),
115}
116
117impl From<GasLimit> for GasExec {
118 /// Initialise gas execution tracking from a given gas limit.
119 fn from(limit: GasLimit) -> Self {
120 GasExec {
121 spent: 0,
122 next_yield_threshold: limit.per_yield,
123 limit,
124 }
125 }
126}
127
128// Allow for consuming the async operation future to retake ownership of the stored `&mut Vm`.
129impl<'a, S> From<StepOpAsyncFuture<'a, S>> for &'a mut Vm
130where
131 S: StateRead,
132{
133 fn from(future: StepOpAsyncFuture<'a, S>) -> Self {
134 match future {
135 StepOpAsyncFuture::StateRead(future) => future.vm,
136 }
137 }
138}
139
140impl<'a, S, OA, OG> Future for ExecFuture<'a, S, OA, OG>
141where
142 S: StateRead,
143 OA: OpAccess<Op = Op> + Unpin,
144 OG: OpGasCost,
145 OA::Error: Into<OpError<S::Error>>,
146{
147 /// Returns a result with the total gas spent.
148 type Output = Result<Gas, StateReadError<S::Error>>;
149
150 fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
151 // Poll the async operation future if there is one pending.
152 let vm = match self.pending_op.as_mut() {
153 None => self.vm.take().expect("future polled after completion"),
154 Some(pending) => {
155 let res = match Pin::new(&mut pending.future).poll(cx) {
156 Poll::Pending => return Poll::Pending,
157 Poll::Ready(ready) => ready,
158 };
159
160 // Drop the future now we've resumed, retake ownership of the `&mut Vm`.
161 let pending = self.pending_op.take().expect("guaranteed `Some`");
162 let next_spent = pending.next_spent;
163 let vm: &'a mut Vm = pending.future.into();
164
165 // Handle the op result.
166 #[cfg(feature = "tracing")]
167 trace_op_res(&mut self.op_access, &*vm, res.as_ref());
168
169 match res {
170 Ok(new_pc) => vm.pc = new_pc,
171 Err(err) => {
172 let err = StateReadError::Op(vm.pc, err.into());
173 return Poll::Ready(Err(err));
174 }
175 };
176
177 // Update gas spent and threshold now that we've resumed.
178 self.gas.spent = next_spent;
179 self.gas.next_yield_threshold =
180 self.gas.spent.saturating_add(self.gas.limit.per_yield);
181 vm
182 }
183 };
184
185 // Step forward the virtual machine by the next operation.
186 while let Some(res) = self.op_access.op_access(vm.pc) {
187 // Handle any potential operation access error.
188 let op = match res {
189 Ok(op) => op,
190 Err(err) => {
191 let err = StateReadError::Op(vm.pc, err.into());
192 return Poll::Ready(Err(err));
193 }
194 };
195
196 let op_gas = self.op_gas_cost.op_gas_cost(&op);
197
198 // Check that the operation wouldn't exceed gas limit.
199 let next_spent = match self
200 .gas
201 .spent
202 .checked_add(op_gas)
203 .filter(|&spent| spent <= self.gas.limit.total)
204 .ok_or_else(|| out_of_gas(&self.gas, op_gas))
205 .map_err(|err| StateReadError::Op(vm.pc, err.into()))
206 {
207 Err(err) => return Poll::Ready(Err(err)),
208 Ok(next_spent) => next_spent,
209 };
210
211 let res = match OpKind::from(op) {
212 OpKind::Sync(op) => step_op_sync(op, self.access, vm),
213 OpKind::Async(op) => {
214 // Async op takes ownership of the VM and returns it upon future completion.
215 let contract_addr = self
216 .access
217 .solution
218 .this_data()
219 .predicate_to_solve
220 .contract
221 .clone();
222 let pc = vm.pc;
223 let future = match step_op_async(op, contract_addr, self.state_read, vm) {
224 Err(err) => {
225 let err = StateReadError::Op(pc, err.into());
226 return Poll::Ready(Err(err));
227 }
228 Ok(fut) => fut,
229 };
230 self.pending_op = Some(PendingOp { future, next_spent });
231 cx.waker().wake_by_ref();
232 return Poll::Pending;
233 }
234 };
235
236 #[cfg(feature = "tracing")]
237 trace_op_res(&mut self.op_access, &*vm, res.as_ref());
238
239 // Handle any errors.
240 let opt_new_pc = match res {
241 Ok(opt) => opt,
242 Err(err) => {
243 return Poll::Ready(Err(StateReadError::Op(vm.pc, err.into())));
244 }
245 };
246
247 // Operation successful, so update gas spent.
248 self.gas.spent = next_spent;
249
250 // Update the program counter, or exit if we're done.
251 match opt_new_pc {
252 Some(new_pc) => vm.pc = new_pc,
253 // `None` is returned after encountering a `Halt` operation.
254 None => return Poll::Ready(Ok(self.gas.spent)),
255 }
256
257 // Yield if we've reached our gas limit.
258 if self.gas.next_yield_threshold <= self.gas.spent {
259 self.gas.next_yield_threshold =
260 self.gas.spent.saturating_add(self.gas.limit.per_yield);
261 self.vm = Some(vm);
262 cx.waker().wake_by_ref();
263 return Poll::Pending;
264 }
265 }
266
267 Poll::Ready(Ok(self.gas.spent))
268 }
269}
270
271impl<'vm, S> Future for StepOpAsyncFuture<'vm, S>
272where
273 S: StateRead,
274{
275 // Future returns a result with the new program counter.
276 type Output = OpAsyncResult<usize, S::Error>;
277 fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
278 let (prev_pc, res) = match *self {
279 Self::StateRead(ref mut future) => {
280 let pc = future.vm.pc;
281 match Pin::new(future).poll(cx) {
282 Poll::Pending => return Poll::Pending,
283 Poll::Ready(res) => (pc, res),
284 }
285 }
286 };
287 // Every operation besides control flow steps forward program counter by 1.
288 let new_pc = prev_pc.checked_add(1).ok_or(OpAsyncError::PcOverflow)?;
289 let res = res.map(|()| new_pc);
290 Poll::Ready(res)
291 }
292}
293
294/// Creates the VM execution future.
295pub(crate) fn exec<'a, S, OA, OG>(
296 vm: &'a mut Vm,
297 access: Access<'a>,
298 state_read: &'a S,
299 op_access: OA,
300 op_gas_cost: &'a OG,
301 gas_limit: GasLimit,
302) -> ExecFuture<'a, S, OA, OG>
303where
304 S: StateRead,
305 OA: OpAccess<Op = Op> + Unpin,
306 OG: OpGasCost,
307 OA::Error: Into<OpError<S::Error>>,
308{
309 ExecFuture {
310 access,
311 state_read,
312 op_access,
313 op_gas_cost,
314 vm: Some(vm),
315 gas: GasExec::from(gas_limit),
316 pending_op: None,
317 }
318}
319
320/// Step forward the given `Vm` with the given asynchronous operation.
321///
322/// Returns a future representing the completion of the operation.
323fn step_op_async<'a, S>(
324 op: OpAsync,
325 contract_addr: ContentAddress,
326 state_read: &'a S,
327 vm: &'a mut Vm,
328) -> OpAsyncResult<StepOpAsyncFuture<'a, S>, S::Error>
329where
330 S: StateRead,
331{
332 match op {
333 OpAsync::StateReadKeyRange => {
334 let future = state_read::key_range(state_read, &contract_addr, &mut *vm)?;
335 Ok(StepOpAsyncFuture::StateRead(future))
336 }
337 OpAsync::StateReadKeyRangeExt => {
338 let future = state_read::key_range_ext(state_read, &mut *vm)?;
339 Ok(StepOpAsyncFuture::StateRead(future))
340 }
341 }
342}
343
344/// Shorthand for constructing an `OutOfGasError`.
345fn out_of_gas(exec: &GasExec, op_gas: Gas) -> OutOfGasError {
346 OutOfGasError {
347 spent: exec.spent,
348 limit: exec.limit.total,
349 op_gas,
350 }
351}
352
/// Emit a trace for the operation at the VM's current program counter.
///
/// On success, the trace also includes the resulting stack and state memory.
///
/// On failure, the operation is traced and the error is emitted as a debug log.
///
/// Panics if the operation accessor yields no operation, or an error, at the
/// current program counter.
#[cfg(feature = "tracing")]
fn trace_op_res<OA, T, E>(oa: &mut OA, vm: &Vm, op_res: Result<T, E>)
where
    OA: OpAccess,
    OA::Op: core::fmt::Debug,
    E: core::fmt::Display,
{
    let op = oa
        .op_access(vm.pc)
        .expect("must exist as retrieved previously")
        .expect("must exist as retrieved previously");
    let pc_op = format!("0x{:02X}: {op:?}", vm.pc);

    if let Err(err) = &op_res {
        tracing::trace!("{pc_op}");
        tracing::debug!("{err}");
        return;
    }

    tracing::trace!(
        "{pc_op}\n ├── {:?}\n └── {:?}",
        &vm.stack,
        &vm.state_memory
    )
}
383}