sp1_prover/worker/prover/
execute.rs1use futures::{stream::FuturesUnordered, StreamExt};
2use slop_futures::pipeline::{AsyncEngine, AsyncWorker, Pipeline, SubmitHandle};
3use sp1_core_executor::{
4 ExecutionError, ExecutionReport, GasEstimatingVM, Program, SP1Context, SP1CoreOpts,
5 SP1RecursionProof,
6};
7use sp1_core_executor_runner::MinimalExecutorRunner;
8use sp1_core_machine::io::SP1Stdin;
9use sp1_hypercube::air::PROOF_NONCE_NUM_WORDS;
10use sp1_hypercube::{MachineVerifyingKey, SP1PcsProofInner, SP1VerifyingKey};
11use sp1_jit::TraceChunkRaw;
12use sp1_primitives::io::SP1PublicValues;
13use sp1_primitives::SP1GlobalContext;
14use std::sync::Arc;
15use tracing::Instrument;
16
17use crate::verify::SP1Verifier;
18
19type DeferredProofInput =
20 (SP1RecursionProof<SP1GlobalContext, SP1PcsProofInner>, MachineVerifyingKey<SP1GlobalContext>);
21use crate::worker::{
22 FinalVmState, FinalVmStateLock, DEFAULT_GAS_EXECUTOR_BUFFER_SIZE,
23 DEFAULT_NUM_GAS_EXECUTOR_WORKERS,
24};
25
/// Settings used when building the gas executor engine.
#[derive(Debug, Clone)]
pub struct SP1ExecutorConfig {
    /// Number of gas executor workers the engine is created with.
    pub num_gas_executors: usize,
    /// Buffer size passed to the gas executor engine.
    pub gas_executor_buffer_size: usize,
}
34
35impl Default for SP1ExecutorConfig {
36 fn default() -> Self {
37 let num_gas_executors = std::env::var("SP1_WORKER_NUMBER_OF_GAS_EXECUTORS")
38 .ok()
39 .and_then(|s| s.parse::<usize>().ok())
40 .unwrap_or(DEFAULT_NUM_GAS_EXECUTOR_WORKERS);
41 let gas_executor_buffer_size = std::env::var("SP1_WORKER_GAS_EXECUTOR_BUFFER_SIZE")
42 .ok()
43 .and_then(|s| s.parse::<usize>().ok())
44 .unwrap_or(DEFAULT_GAS_EXECUTOR_BUFFER_SIZE);
45 Self { num_gas_executors, gas_executor_buffer_size }
46 }
47}
48
49pub fn initialize_gas_engine(
50 config: &SP1ExecutorConfig,
51 program: Arc<Program>,
52 nonce: [u32; PROOF_NONCE_NUM_WORDS],
53 opts: SP1CoreOpts,
54 calculate_gas: bool,
55) -> GasExecutingEngine {
56 let workers = (0..config.num_gas_executors)
57 .map(|_| GasExecutingWorker::new(program.clone(), nonce, opts.clone(), calculate_gas))
58 .collect();
59 AsyncEngine::new(workers, config.gas_executor_buffer_size)
60}
61
62pub type GasExecutingEngine =
63 AsyncEngine<GasExecutingTask, Result<ExecutionReport, ExecutionError>, GasExecutingWorker>;
64
65pub struct GasExecutingTask {
67 pub chunk: TraceChunkRaw,
68 pub final_vm_state: FinalVmStateLock,
70}
71
72#[derive(Debug, Clone)]
73pub struct GasExecutingWorker {
74 program: Arc<Program>,
75 nonce: [u32; PROOF_NONCE_NUM_WORDS],
76 opts: SP1CoreOpts,
77 calculate_gas: bool,
78}
79
80impl GasExecutingWorker {
81 pub fn new(
82 program: Arc<Program>,
83 nonce: [u32; PROOF_NONCE_NUM_WORDS],
84 opts: SP1CoreOpts,
85 calculate_gas: bool,
86 ) -> Self {
87 Self { program, nonce, opts, calculate_gas }
88 }
89}
90
91impl AsyncWorker<GasExecutingTask, Result<ExecutionReport, ExecutionError>> for GasExecutingWorker {
92 async fn call(&self, input: GasExecutingTask) -> Result<ExecutionReport, ExecutionError> {
93 let GasExecutingTask { chunk, final_vm_state } = input;
94 if !self.calculate_gas {
95 return Ok(ExecutionReport::default());
96 }
97 let mut gas_estimating_vm =
98 GasEstimatingVM::new(&chunk, self.program.clone(), self.nonce, self.opts.clone());
99 let report = gas_estimating_vm.execute()?;
100
101 if gas_estimating_vm.core.is_done() {
103 let final_state = FinalVmState::new(&gas_estimating_vm.core);
104 final_vm_state.set(final_state).map_err(|e| {
105 ExecutionError::Other(format!("failed to set final vm state: {}", e))
106 })?;
107 }
108
109 Ok(report)
110 }
111}
112
113fn verify_deferred_proofs(proofs: &[DeferredProofInput]) -> anyhow::Result<()> {
114 if proofs.is_empty() {
115 return Ok(());
116 }
117 let verifier = SP1Verifier::new(crate::verify::VerifierRecursionVks::default());
118 for (index, (proof, vk)) in proofs.iter().enumerate() {
119 let sp1_vk = SP1VerifyingKey { vk: vk.clone() };
120 verifier
121 .verify_compressed(proof, &sp1_vk)
122 .map_err(|e| anyhow::anyhow!("deferred proof {index} failed verification: {e}"))?;
123 }
124 Ok(())
125}
126
127pub async fn execute_with_options(
128 program: Arc<Program>,
129 stdin: SP1Stdin,
130 context: SP1Context<'static>,
131 opts: SP1CoreOpts,
132 executor_config: SP1ExecutorConfig,
133) -> anyhow::Result<(SP1PublicValues, [u8; 32], ExecutionReport)> {
134 enum ExecutorOutput {
136 VerifyDone,
137 Report(ExecutionReport),
138 PublicValues {
139 public_values: SP1PublicValues,
140 #[cfg(feature = "profiling")]
141 cycle_tracker: hashbrown::HashMap<String, u64>,
142 #[cfg(feature = "profiling")]
143 invocation_tracker: hashbrown::HashMap<String, u64>,
144 },
145 }
146
147 let mut join_set = tokio::task::JoinSet::new();
148 let SP1Stdin { buffer, proofs, .. } = stdin;
149
150 let calculate_gas = context.calculate_gas;
151 let nonce = context.proof_nonce;
152 let max_cycles = context.max_cycles;
153 let minimal_trace_chunk_threshold =
154 if context.calculate_gas { Some(opts.minimal_trace_chunk_threshold) } else { None };
155 let memory_limit = opts.memory_limit;
156 let trace_chunk_slots = opts.trace_chunk_slots;
157 let gas_engine =
158 initialize_gas_engine(&executor_config, program.clone(), nonce, opts, calculate_gas);
159
160 if context.deferred_proof_verification {
161 join_set.spawn_blocking(move || {
162 verify_deferred_proofs(&proofs)?;
163 Ok::<_, anyhow::Error>(ExecutorOutput::VerifyDone)
164 });
165 }
166
167 let mut minimal_executor = MinimalExecutorRunner::new(
168 program.clone(),
169 false,
170 minimal_trace_chunk_threshold,
171 memory_limit,
172 trace_chunk_slots,
173 );
174
175 for buf in buffer {
177 minimal_executor.with_input(&buf);
178 }
179
180 let final_vm_state = FinalVmStateLock::new();
182
183 let (handle_sender, mut handle_receiver) = tokio::sync::mpsc::unbounded_channel();
185
186 join_set.spawn(async move {
188 let mut report = ExecutionReport::default();
189 let max_cycles = max_cycles.unwrap_or(u64::MAX);
190 let mut gas_handles: FuturesUnordered<SubmitHandle<GasExecutingEngine>> =
191 FuturesUnordered::new();
192 loop {
193 tokio::select! {
194 Some(result) = handle_receiver.recv() => {
195 let gas_handles_len = gas_handles.len();
196 tracing::debug!(num_gas_handles = %gas_handles_len, "Received gas handle");
197 gas_handles.push(result);
198
199 }
200
201 Some(result) = gas_handles.next() => {
202 let chunk_report = result.map_err(|e| anyhow::anyhow!("gas task panicked: {}", e))??;
203 let gas_handles_len = gas_handles.len();
204 tracing::debug!(num_gas_handles = %gas_handles_len, "Gas task finished.");
205 report += chunk_report;
206
207 let total_instructions = report.total_instruction_count();
208 if total_instructions >= max_cycles {
209 tracing::debug!("Cycle limit reached, stopping execution");
210 return Err(anyhow::anyhow!("cycle limit reached"));
211 }
212 }
213
214 else => {
215 tracing::debug!("No more gas handles to receive");
216 break;
217 }
218 }
219 }
220 while let Some(result) = gas_handles.next().await {
221 let chunk_report = result.map_err(|e| anyhow::anyhow!("gas task panicked: {}", e))??;
222 report += chunk_report;
223 }
224 Ok::<_, anyhow::Error>(ExecutorOutput::Report(report))
225 }.instrument(tracing::debug_span!("report_accumulator")));
226
227 let final_vm_state_clone = final_vm_state.clone();
229 join_set.spawn_blocking(move || {
230 while let Some(chunk) = minimal_executor
231 .try_execute_chunk()
232 .map_err(|e| anyhow::anyhow!("Execute chunk failed: {e}"))?
233 {
234 let handle = gas_engine
235 .blocking_submit(GasExecutingTask {
236 chunk,
237 final_vm_state: final_vm_state_clone.clone(),
238 })
239 .map_err(|e| anyhow::anyhow!("Gas engine submission failed: {}", e))?;
240 handle_sender.send(handle)?;
241 }
242 tracing::debug!("minimal executor finished in {} cycles", minimal_executor.global_clk());
243
244 #[cfg(feature = "profiling")]
246 let cycle_tracker = minimal_executor.take_cycle_tracker_totals();
247 #[cfg(feature = "profiling")]
248 let invocation_tracker = minimal_executor.take_invocation_tracker();
249
250 let public_value_stream = minimal_executor.into_public_values_stream();
251 let public_values = SP1PublicValues::from(&public_value_stream);
252
253 tracing::info!("public_value_stream: {:?}", public_value_stream);
254 Ok::<_, anyhow::Error>(ExecutorOutput::PublicValues {
255 public_values,
256 #[cfg(feature = "profiling")]
257 cycle_tracker,
258 #[cfg(feature = "profiling")]
259 invocation_tracker,
260 })
261 });
262
263 let mut final_report = ExecutionReport::default();
265 let mut public_values = SP1PublicValues::default();
266 #[cfg(feature = "profiling")]
267 let mut cycle_tracker_data: Option<(
268 hashbrown::HashMap<String, u64>,
269 hashbrown::HashMap<String, u64>,
270 )> = None;
271
272 while let Some(result) = join_set.join_next().await {
273 match result {
274 Ok(Ok(output)) => match output {
275 ExecutorOutput::PublicValues {
276 public_values: pv,
277 #[cfg(feature = "profiling")]
278 cycle_tracker,
279 #[cfg(feature = "profiling")]
280 invocation_tracker,
281 } => {
282 public_values = pv;
283 #[cfg(feature = "profiling")]
284 {
285 cycle_tracker_data = Some((cycle_tracker, invocation_tracker));
286 }
287 }
288 ExecutorOutput::Report(report) => final_report = report,
289 ExecutorOutput::VerifyDone => {}
290 },
291 Ok(Err(e)) => {
292 return Err(e);
294 }
295 Err(join_error) => {
296 return Err(join_error.into());
298 }
299 }
300 }
301
302 #[cfg(feature = "profiling")]
305 if let Some((cycle_tracker, invocation_tracker)) = cycle_tracker_data {
306 final_report.cycle_tracker = cycle_tracker;
307 final_report.invocation_tracker = invocation_tracker;
308 }
309
310 let public_value_digest: [u8; 32] = final_vm_state
312 .get()
313 .map(|state| {
314 let mut committed_value_digest = [0u8; 32];
315 state.public_value_digest.iter().enumerate().for_each(|(i, word)| {
316 let bytes = word.to_le_bytes();
317 committed_value_digest[i * 4..(i + 1) * 4].copy_from_slice(&bytes);
318 });
319 committed_value_digest
320 })
321 .ok_or(anyhow::anyhow!("Failed to extract public value digest"))?;
322
323 Ok((public_values, public_value_digest, final_report))
324}
325
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use sp1_core_executor::{Program, SP1Context, SP1CoreOpts};
    use sp1_core_machine::io::SP1Stdin;

    use super::{execute_with_options, SP1ExecutorConfig};

    /// Runs the Fibonacci ELF with default context and options, then checks that the returned
    /// digest matches one of the public value hashes and that the exit code is zero.
    #[tokio::test]
    async fn test_execute_with_optional_gas() {
        let elf = test_artifacts::FIBONACCI_ELF;
        let program = Arc::new(Program::from(&elf).unwrap());

        let mut stdin = SP1Stdin::new();
        stdin.write(&10usize);

        let (pv, digest, report) = execute_with_options(
            program,
            stdin,
            SP1Context::default(),
            SP1CoreOpts::default(),
            SP1ExecutorConfig::default(),
        )
        .await
        .unwrap();

        // The digest may correspond to either hash of the public values.
        let digest_bytes = digest.to_vec();
        let digest_matches = pv.hash() == digest_bytes || pv.blake3_hash() == digest_bytes;
        assert!(digest_matches);
        assert_eq!(report.exit_code, 0);
    }
}