Skip to main content

sp1_prover/worker/prover/
execute.rs

1use futures::{stream::FuturesUnordered, StreamExt};
2use slop_futures::pipeline::{AsyncEngine, AsyncWorker, Pipeline, SubmitHandle};
3use sp1_core_executor::{
4    ExecutionError, ExecutionReport, GasEstimatingVM, Program, SP1Context, SP1CoreOpts,
5    SP1RecursionProof,
6};
7use sp1_core_executor_runner::MinimalExecutorRunner;
8use sp1_core_machine::io::SP1Stdin;
9use sp1_hypercube::air::PROOF_NONCE_NUM_WORDS;
10use sp1_hypercube::{MachineVerifyingKey, SP1PcsProofInner, SP1VerifyingKey};
11use sp1_jit::TraceChunkRaw;
12use sp1_primitives::io::SP1PublicValues;
13use sp1_primitives::SP1GlobalContext;
14use std::sync::Arc;
15use tracing::Instrument;
16
17use crate::verify::SP1Verifier;
18
19type DeferredProofInput =
20    (SP1RecursionProof<SP1GlobalContext, SP1PcsProofInner>, MachineVerifyingKey<SP1GlobalContext>);
21use crate::worker::{
22    FinalVmState, FinalVmStateLock, DEFAULT_GAS_EXECUTOR_BUFFER_SIZE,
23    DEFAULT_NUM_GAS_EXECUTOR_WORKERS,
24};
25
26/// Configuration for the executor.
27#[derive(Debug, Clone)]
28pub struct SP1ExecutorConfig {
29    /// The number of gas executors.
30    pub num_gas_executors: usize,
31    /// The buffer size for the gas executor.
32    pub gas_executor_buffer_size: usize,
33}
34
35impl Default for SP1ExecutorConfig {
36    fn default() -> Self {
37        let num_gas_executors = std::env::var("SP1_WORKER_NUMBER_OF_GAS_EXECUTORS")
38            .ok()
39            .and_then(|s| s.parse::<usize>().ok())
40            .unwrap_or(DEFAULT_NUM_GAS_EXECUTOR_WORKERS);
41        let gas_executor_buffer_size = std::env::var("SP1_WORKER_GAS_EXECUTOR_BUFFER_SIZE")
42            .ok()
43            .and_then(|s| s.parse::<usize>().ok())
44            .unwrap_or(DEFAULT_GAS_EXECUTOR_BUFFER_SIZE);
45        Self { num_gas_executors, gas_executor_buffer_size }
46    }
47}
48
49pub fn initialize_gas_engine(
50    config: &SP1ExecutorConfig,
51    program: Arc<Program>,
52    nonce: [u32; PROOF_NONCE_NUM_WORDS],
53    opts: SP1CoreOpts,
54    calculate_gas: bool,
55) -> GasExecutingEngine {
56    let workers = (0..config.num_gas_executors)
57        .map(|_| GasExecutingWorker::new(program.clone(), nonce, opts.clone(), calculate_gas))
58        .collect();
59    AsyncEngine::new(workers, config.gas_executor_buffer_size)
60}
61
62pub type GasExecutingEngine =
63    AsyncEngine<GasExecutingTask, Result<ExecutionReport, ExecutionError>, GasExecutingWorker>;
64
65/// A task for gas estimation on a trace chunk.
66pub struct GasExecutingTask {
67    pub chunk: TraceChunkRaw,
68    /// Lock to store the final VM state when execution completes.
69    pub final_vm_state: FinalVmStateLock,
70}
71
72#[derive(Debug, Clone)]
73pub struct GasExecutingWorker {
74    program: Arc<Program>,
75    nonce: [u32; PROOF_NONCE_NUM_WORDS],
76    opts: SP1CoreOpts,
77    calculate_gas: bool,
78}
79
80impl GasExecutingWorker {
81    pub fn new(
82        program: Arc<Program>,
83        nonce: [u32; PROOF_NONCE_NUM_WORDS],
84        opts: SP1CoreOpts,
85        calculate_gas: bool,
86    ) -> Self {
87        Self { program, nonce, opts, calculate_gas }
88    }
89}
90
91impl AsyncWorker<GasExecutingTask, Result<ExecutionReport, ExecutionError>> for GasExecutingWorker {
92    async fn call(&self, input: GasExecutingTask) -> Result<ExecutionReport, ExecutionError> {
93        let GasExecutingTask { chunk, final_vm_state } = input;
94        if !self.calculate_gas {
95            return Ok(ExecutionReport::default());
96        }
97        let mut gas_estimating_vm =
98            GasEstimatingVM::new(&chunk, self.program.clone(), self.nonce, self.opts.clone());
99        let report = gas_estimating_vm.execute()?;
100
101        // If the VM has completed execution, set the final state.
102        if gas_estimating_vm.core.is_done() {
103            let final_state = FinalVmState::new(&gas_estimating_vm.core);
104            final_vm_state.set(final_state).map_err(|e| {
105                ExecutionError::Other(format!("failed to set final vm state: {}", e))
106            })?;
107        }
108
109        Ok(report)
110    }
111}
112
113fn verify_deferred_proofs(proofs: &[DeferredProofInput]) -> anyhow::Result<()> {
114    if proofs.is_empty() {
115        return Ok(());
116    }
117    let verifier = SP1Verifier::new(crate::verify::VerifierRecursionVks::default());
118    for (index, (proof, vk)) in proofs.iter().enumerate() {
119        let sp1_vk = SP1VerifyingKey { vk: vk.clone() };
120        verifier
121            .verify_compressed(proof, &sp1_vk)
122            .map_err(|e| anyhow::anyhow!("deferred proof {index} failed verification: {e}"))?;
123    }
124    Ok(())
125}
126
127pub async fn execute_with_options(
128    program: Arc<Program>,
129    stdin: SP1Stdin,
130    context: SP1Context<'static>,
131    opts: SP1CoreOpts,
132    executor_config: SP1ExecutorConfig,
133) -> anyhow::Result<(SP1PublicValues, [u8; 32], ExecutionReport)> {
134    // The return values of the spawned tasks.
135    enum ExecutorOutput {
136        VerifyDone,
137        Report(ExecutionReport),
138        PublicValues {
139            public_values: SP1PublicValues,
140            #[cfg(feature = "profiling")]
141            cycle_tracker: hashbrown::HashMap<String, u64>,
142            #[cfg(feature = "profiling")]
143            invocation_tracker: hashbrown::HashMap<String, u64>,
144        },
145    }
146
147    let mut join_set = tokio::task::JoinSet::new();
148    let SP1Stdin { buffer, proofs, .. } = stdin;
149
150    let calculate_gas = context.calculate_gas;
151    let nonce = context.proof_nonce;
152    let max_cycles = context.max_cycles;
153    let minimal_trace_chunk_threshold =
154        if context.calculate_gas { Some(opts.minimal_trace_chunk_threshold) } else { None };
155    let memory_limit = opts.memory_limit;
156    let trace_chunk_slots = opts.trace_chunk_slots;
157    let gas_engine =
158        initialize_gas_engine(&executor_config, program.clone(), nonce, opts, calculate_gas);
159
160    if context.deferred_proof_verification {
161        join_set.spawn_blocking(move || {
162            verify_deferred_proofs(&proofs)?;
163            Ok::<_, anyhow::Error>(ExecutorOutput::VerifyDone)
164        });
165    }
166
167    let mut minimal_executor = MinimalExecutorRunner::new(
168        program.clone(),
169        false,
170        minimal_trace_chunk_threshold,
171        memory_limit,
172        trace_chunk_slots,
173    );
174
175    // Feed stdin buffers to the executor
176    for buf in buffer {
177        minimal_executor.with_input(&buf);
178    }
179
180    // Create a shared final VM state lock that will be set when execution completes.
181    let final_vm_state = FinalVmStateLock::new();
182
183    // Execute the program to completion, collecting all trace chunks
184    let (handle_sender, mut handle_receiver) = tokio::sync::mpsc::unbounded_channel();
185
186    // Spawn a task that runs gas executors.
187    join_set.spawn(async move {
188        let mut report = ExecutionReport::default();
189        let max_cycles = max_cycles.unwrap_or(u64::MAX);
190        let mut gas_handles: FuturesUnordered<SubmitHandle<GasExecutingEngine>> =
191            FuturesUnordered::new();
192        loop {
193            tokio::select! {
194                Some(result) = handle_receiver.recv() => {
195                    let gas_handles_len = gas_handles.len();
196                    tracing::debug!(num_gas_handles = %gas_handles_len, "Received gas handle");
197                    gas_handles.push(result);
198
199                }
200
201                Some(result) = gas_handles.next() => {
202                    let chunk_report = result.map_err(|e| anyhow::anyhow!("gas task panicked: {}", e))??;
203                    let gas_handles_len = gas_handles.len();
204                    tracing::debug!(num_gas_handles = %gas_handles_len, "Gas task finished.");
205                    report += chunk_report;
206
207                    let total_instructions = report.total_instruction_count();
208                    if total_instructions >= max_cycles {
209                        tracing::debug!("Cycle limit reached, stopping execution");
210                        return Err(anyhow::anyhow!("cycle limit reached"));
211                    }
212                }
213
214                else => {
215                    tracing::debug!("No more gas handles to receive");
216                    break;
217                }
218            }
219        }
220        while let Some(result) = gas_handles.next().await {
221            let chunk_report = result.map_err(|e| anyhow::anyhow!("gas task panicked: {}", e))??;
222            report += chunk_report;
223        }
224        Ok::<_, anyhow::Error>(ExecutorOutput::Report(report))
225    }.instrument(tracing::debug_span!("report_accumulator")));
226
227    // Spawn a blocking task to run the minimal executor.
228    let final_vm_state_clone = final_vm_state.clone();
229    join_set.spawn_blocking(move || {
230        while let Some(chunk) = minimal_executor
231            .try_execute_chunk()
232            .map_err(|e| anyhow::anyhow!("Execute chunk failed: {e}"))?
233        {
234            let handle = gas_engine
235                .blocking_submit(GasExecutingTask {
236                    chunk,
237                    final_vm_state: final_vm_state_clone.clone(),
238                })
239                .map_err(|e| anyhow::anyhow!("Gas engine submission failed: {}", e))?;
240            handle_sender.send(handle)?;
241        }
242        tracing::debug!("minimal executor finished in {} cycles", minimal_executor.global_clk());
243
244        // Extract cycle tracker data before consuming the executor
245        #[cfg(feature = "profiling")]
246        let cycle_tracker = minimal_executor.take_cycle_tracker_totals();
247        #[cfg(feature = "profiling")]
248        let invocation_tracker = minimal_executor.take_invocation_tracker();
249
250        let public_value_stream = minimal_executor.into_public_values_stream();
251        let public_values = SP1PublicValues::from(&public_value_stream);
252
253        tracing::info!("public_value_stream: {:?}", public_value_stream);
254        Ok::<_, anyhow::Error>(ExecutorOutput::PublicValues {
255            public_values,
256            #[cfg(feature = "profiling")]
257            cycle_tracker,
258            #[cfg(feature = "profiling")]
259            invocation_tracker,
260        })
261    });
262
263    // Wait for all gas calculations to complete.
264    let mut final_report = ExecutionReport::default();
265    let mut public_values = SP1PublicValues::default();
266    #[cfg(feature = "profiling")]
267    let mut cycle_tracker_data: Option<(
268        hashbrown::HashMap<String, u64>,
269        hashbrown::HashMap<String, u64>,
270    )> = None;
271
272    while let Some(result) = join_set.join_next().await {
273        match result {
274            Ok(Ok(output)) => match output {
275                ExecutorOutput::PublicValues {
276                    public_values: pv,
277                    #[cfg(feature = "profiling")]
278                    cycle_tracker,
279                    #[cfg(feature = "profiling")]
280                    invocation_tracker,
281                } => {
282                    public_values = pv;
283                    #[cfg(feature = "profiling")]
284                    {
285                        cycle_tracker_data = Some((cycle_tracker, invocation_tracker));
286                    }
287                }
288                ExecutorOutput::Report(report) => final_report = report,
289                ExecutorOutput::VerifyDone => {}
290            },
291            Ok(Err(e)) => {
292                // Task returned an error.
293                return Err(e);
294            }
295            Err(join_error) => {
296                // Task panicked or was cancelled.
297                return Err(join_error.into());
298            }
299        }
300    }
301
302    // Merge cycle tracker data from MinimalExecutorRunner into the final report
303    // This must happen after all tasks complete to avoid race conditions
304    #[cfg(feature = "profiling")]
305    if let Some((cycle_tracker, invocation_tracker)) = cycle_tracker_data {
306        final_report.cycle_tracker = cycle_tracker;
307        final_report.invocation_tracker = invocation_tracker;
308    }
309
310    // Extract the public value digest from the final VM state.
311    let public_value_digest: [u8; 32] = final_vm_state
312        .get()
313        .map(|state| {
314            let mut committed_value_digest = [0u8; 32];
315            state.public_value_digest.iter().enumerate().for_each(|(i, word)| {
316                let bytes = word.to_le_bytes();
317                committed_value_digest[i * 4..(i + 1) * 4].copy_from_slice(&bytes);
318            });
319            committed_value_digest
320        })
321        .ok_or(anyhow::anyhow!("Failed to extract public value digest"))?;
322
323    Ok((public_values, public_value_digest, final_report))
324}
325
326#[cfg(test)]
327mod tests {
328    use std::sync::Arc;
329
330    use sp1_core_executor::{Program, SP1Context, SP1CoreOpts};
331    use sp1_core_machine::io::SP1Stdin;
332
333    use super::{execute_with_options, SP1ExecutorConfig};
334
335    #[tokio::test]
336    async fn test_execute_with_optional_gas() {
337        let elf = test_artifacts::FIBONACCI_ELF;
338        let program = Arc::new(Program::from(&elf).unwrap());
339        let mut stdin = SP1Stdin::new();
340        stdin.write(&10usize);
341        let opts = SP1CoreOpts::default();
342        let executor_config = SP1ExecutorConfig::default();
343
344        let context = SP1Context::default();
345        let (pv, digest, report) =
346            execute_with_options(program, stdin, context, opts, executor_config).await.unwrap();
347
348        assert!(pv.hash() == digest.to_vec() || pv.blake3_hash() == digest.to_vec());
349        assert_eq!(report.exit_code, 0);
350    }
351}