Skip to main content

boundless_market/prover_utils/
mod.rs

1// Copyright 2026 Boundless Foundation, Inc.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14#![allow(missing_docs)]
15
16pub mod config;
17pub(crate) mod local_executor;
18pub mod prover;
19pub(crate) mod requestor_pricing;
20pub mod storage;
21
22#[cfg(not(feature = "prover_utils"))]
23pub use config::MarketConfig;
24#[cfg(feature = "prover_utils")]
25pub use config::{
26    defaults as config_defaults, BatcherConfig, Config, MarketConfig, OrderCommitmentPriority,
27    OrderPricingPriority, ProverConfig,
28};
29
30use crate::{
31    contracts::{
32        FulfillmentData, Predicate, PredicateType, ProofRequest, RequestError, RequestInputType,
33    },
34    input::GuestEnv,
35    selector::{is_blake3_groth16_selector, SupportedSelectors},
36    storage::StorageDownloader,
37    util::now_timestamp,
38};
39use alloy::{
40    primitives::{
41        utils::{format_ether, format_units, parse_units},
42        Address, Bytes, FixedBytes, U256,
43    },
44    uint,
45};
46use anyhow::Context;
47use hex::FromHex;
48use moka::future::Cache;
49use prover::ProverObj;
50use risc0_zkvm::sha::Digest as Risc0Digest;
51use serde::{Deserialize, Serialize};
52use sha2::{Digest, Sha256};
53use std::{
54    collections::HashSet,
55    fmt,
56    sync::{Arc, OnceLock},
57};
58use thiserror::Error;
59use OrderPricingOutcome::Skip;
60
/// One million as a `U256`, i.e. the number of cycles in one Mcycle.
///
/// Used in this file to turn a total amount and a cycle count into a per-Mcycle price
/// (`amount * ONE_MILLION / cycle_count`).
const ONE_MILLION: U256 = uint!(1_000_000_U256);
62
/// Execution limit reasoning details.
///
/// Identifies which constraint produced the proving cycle limit for an order and carries the
/// inputs of that constraint, so skip messages can name the config value involved.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum ProveLimitReason {
    /// Limit derived from ETH pricing: `(max_price - gas_cost) / mcycle_price_eth`.
    /// All three fields are rendered with `format_ether`, so they are presumably wei amounts.
    EthPricing { max_price: U256, gas_cost: U256, mcycle_price_eth: U256 },
    /// Limit derived from collateral pricing: the order's collateral reward divided by the
    /// `mcycle_price_collateral_token` config. Both fields are printed verbatim by `Display`
    /// (presumably already formatted in token units — confirm against the producer).
    CollateralPricing { collateral_reward: String, mcycle_price_collateral: String },
    /// Limit imposed by the broker's `max_mcycle_limit` config setting, in Mcycles.
    ConfigCap { max_mcycles: u64 },
    /// Limit imposed by the time remaining before the deadline, combined with the
    /// `peak_prove_khz` config value.
    DeadlineCap { time_remaining_secs: u64, peak_prove_khz: u64 },
}
71
72impl fmt::Display for ProveLimitReason {
73    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
74        match self {
75            ProveLimitReason::CollateralPricing { collateral_reward, mcycle_price_collateral } => {
76                write!(
77                    f,
78                    "collateral pricing: order collateral reward {} / {} mcycle_price_collateral_token config",
79                    collateral_reward,
80                    mcycle_price_collateral,
81                )
82            }
83            ProveLimitReason::EthPricing { max_price, gas_cost, mcycle_price_eth } => {
84                write!(
85                    f,
86                    "ETH pricing: (order maxPrice {} - gas {}) / mcycle_price config ({} per Mcycle)",
87                    format_ether(*max_price),
88                    format_ether(*gas_cost),
89                    format_ether(*mcycle_price_eth)
90                )
91            }
92            ProveLimitReason::ConfigCap { max_mcycles } => {
93                write!(f, "broker max_mcycle_limit config setting ({} Mcycles)", max_mcycles)
94            }
95            ProveLimitReason::DeadlineCap { time_remaining_secs, peak_prove_khz } => {
96                write!(
97                    f,
98                    "deadline: {}s remaining with peak_prove_khz config ({} kHz)",
99                    time_remaining_secs, peak_prove_khz
100                )
101            }
102        }
103    }
104}
105
/// Errors returned by pricing logic.
///
/// Every payload is wrapped in an `Arc` so the enum can derive `Clone`; the preflight cache
/// path clones the error out of a shared `Arc<OrderPricingError>`.
#[derive(Error, Debug, Clone)]
#[non_exhaustive]
pub enum OrderPricingError {
    /// Failed to fetch or upload input.
    #[error("failed to fetch / push input: {0:#}")]
    FetchInputErr(#[source] Arc<anyhow::Error>),
    /// Failed to fetch or upload image.
    #[error("failed to fetch / push image: {0:#}")]
    FetchImageErr(#[source] Arc<anyhow::Error>),
    /// Invalid request.
    #[error("invalid request: {0}")]
    RequestError(Arc<RequestError>),
    /// RPC error.
    #[error("RPC error: {0:#}")]
    RpcErr(Arc<anyhow::Error>),
    /// Unexpected error. This is also the variant produced by `From<anyhow::Error>`.
    #[error("Unexpected error: {0:#}")]
    UnexpectedErr(Arc<anyhow::Error>),
}
126
127impl From<anyhow::Error> for OrderPricingError {
128    fn from(err: anyhow::Error) -> Self {
129        OrderPricingError::UnexpectedErr(Arc::new(err))
130    }
131}
132
133impl From<RequestError> for OrderPricingError {
134    fn from(err: RequestError) -> Self {
135        OrderPricingError::RequestError(Arc::new(err))
136    }
137}
138
/// Order fulfillment mode.
///
/// The mode decides which deadline `OrderRequest::expiry` reports for the order.
#[derive(Clone, Copy, Debug, PartialEq, Serialize, Deserialize)]
pub enum FulfillmentType {
    /// Lock the order, then fulfill it; the order's expiry is the request's lock expiry.
    LockAndFulfill,
    /// Fulfill after the lock has expired; the order's expiry is the request's overall expiry.
    FulfillAfterLockExpire,
    // Currently not supported
    /// Fulfill without locking; the order's expiry is the request's overall expiry.
    FulfillWithoutLocking,
}
147
/// Order request from the network.
///
/// Bundles a proof request with the data needed to identify it (market address, chain id,
/// fulfillment mode) plus fields that are filled in as the order is priced.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct OrderRequest {
    /// The proof request being considered.
    pub request: ProofRequest,
    /// Signature bytes accompanying the request (presumably the client's signature — confirm).
    pub client_sig: Bytes,
    /// How the order would be fulfilled; selects which expiry applies.
    pub fulfillment_type: FulfillmentType,
    /// Market contract address; passed to `signing_hash` when deriving the order id.
    pub boundless_market_address: Address,
    /// Chain id; passed to `signing_hash` when deriving the order id.
    pub chain_id: u64,
    /// Image id reported by preflight; `None` until pricing has set it.
    pub image_id: Option<String>,
    /// Input id reported by preflight; `None` until pricing has set it.
    pub input_id: Option<String>,
    /// Cycle count reported by preflight; `None` until pricing has set it.
    pub total_cycles: Option<u64>,
    /// NOTE(review): not written in the visible part of this file — confirm who sets it.
    pub target_timestamp: Option<u64>,
    /// NOTE(review): not written in the visible part of this file — confirm who sets it.
    pub expire_timestamp: Option<u64>,
    /// Memoized result of `id()`; excluded from (de)serialization.
    #[serde(skip)]
    cached_id: OnceLock<String>,
}
164
165impl OrderRequest {
166    pub fn new(
167        request: ProofRequest,
168        client_sig: Bytes,
169        fulfillment_type: FulfillmentType,
170        boundless_market_address: Address,
171        chain_id: u64,
172    ) -> Self {
173        Self {
174            request,
175            client_sig,
176            fulfillment_type,
177            boundless_market_address,
178            chain_id,
179            image_id: None,
180            input_id: None,
181            total_cycles: None,
182            target_timestamp: None,
183            expire_timestamp: None,
184            cached_id: OnceLock::new(),
185        }
186    }
187
188    pub fn id(&self) -> String {
189        self.cached_id
190            .get_or_init(|| {
191                let signing_hash = self
192                    .request
193                    .signing_hash(self.boundless_market_address, self.chain_id)
194                    .unwrap_or(FixedBytes::ZERO);
195                format!("0x{:x}-{}-{:?}", self.request.id, signing_hash, self.fulfillment_type)
196            })
197            .clone()
198    }
199
200    pub fn expiry(&self) -> u64 {
201        match self.fulfillment_type {
202            FulfillmentType::LockAndFulfill => self.request.lock_expires_at(),
203            FulfillmentType::FulfillAfterLockExpire => self.request.expires_at(),
204            FulfillmentType::FulfillWithoutLocking => self.request.expires_at(),
205        }
206    }
207}
208
209impl std::fmt::Display for OrderRequest {
210    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
211        let total_mcycles = if self.total_cycles.is_some() {
212            format!(" ({} mcycles)", self.total_cycles.unwrap() / 1_000_000)
213        } else {
214            "".to_string()
215        };
216        write!(f, "{}{} [{}]", self.id(), total_mcycles, format_expiries(&self.request))
217    }
218}
219
/// Outcome of order pricing.
///
/// NOTE(review): the code that builds `Lock` and `ProveAfterLockExpire` is outside the visible
/// part of this file, so their fields are described from their names only — confirm.
#[derive(Debug)]
#[cfg_attr(not(feature = "prover_utils"), allow(dead_code))]
pub enum OrderPricingOutcome {
    /// Order should be locked and proving commence after lock is secured.
    Lock {
        total_cycles: u64,
        target_timestamp_secs: u64,
        expiry_secs: u64,
        target_mcycle_price: U256,
        max_mcycle_price: U256,
        config_min_mcycle_price: U256,
        current_mcycle_price: U256,
    },
    /// Do not lock the order, but consider proving and fulfilling it after the lock expires.
    ProveAfterLockExpire {
        total_cycles: u64,
        lock_expire_timestamp_secs: u64,
        expiry_secs: u64,
        mcycle_price: U256,
        config_min_mcycle_price: U256,
    },
    /// Do not engage with the order; `reason` is a human-readable explanation of why.
    Skip { reason: String },
}
245
/// Value type for preflight cache.
#[derive(Clone, Debug)]
pub enum PreflightCacheValue {
    /// Preflight completed: the prover's session id, the total cycle count it reported, and
    /// the ids under which the image and input were uploaded.
    Success { exec_session_id: String, cycle_count: u64, image_id: String, input_id: String },
    /// The order should be skipped. `cached_limit` is the execution limit (in cycles) that was
    /// in force when the session limit was hit, or `u64::MAX` when the guest panicked, so that
    /// a later attempt with a higher limit never re-runs a panicking guest.
    Skip { cached_limit: u64 },
}
252
/// Input type for preflight cache.
///
/// Identifies the input of a request without storing the input itself.
#[derive(Hash, Eq, PartialEq, Clone, Debug)]
pub enum InputCacheKey {
    /// URL-based input: the input URL as a string.
    Url(String),
    /// Hash-based input (for inline data): the SHA-256 digest of the inline input bytes.
    Hash([u8; 32]),
}
261
/// Key type for the preflight cache.
///
/// Two orders share a cache entry when both their predicate data and their input key match.
#[derive(Hash, Eq, PartialEq, Clone, Debug)]
pub struct PreflightCacheKey {
    /// The predicate data (the raw bytes of the request's requirement predicate).
    pub predicate_data: Vec<u8>,
    /// The input cache key.
    pub input: InputCacheKey,
}
270
/// Cache for preflight results to avoid duplicate computations.
///
/// A shared (`Arc`) moka async cache keyed by [`PreflightCacheKey`].
pub type PreflightCache = Arc<Cache<PreflightCacheKey, PreflightCacheValue>>;
273
274/// Upload an image to the prover using the provided downloader.
275///
276/// This is a standalone function (not a trait method) so it can be called from inside
277/// async closures like `try_get_with` without capturing `&self`.
278async fn upload_image_with_downloader(
279    prover: &ProverObj,
280    image_url: &str,
281    predicate: &crate::contracts::RequestPredicate,
282    downloader: &(dyn StorageDownloader + Send + Sync),
283) -> anyhow::Result<String> {
284    let predicate = Predicate::try_from(predicate.clone()).context("Failed to parse predicate")?;
285
286    let image_id_str = predicate.image_id().map(|image_id| image_id.to_string());
287
288    // Check if prover already has the image cached
289    if let Some(ref image_id_str) = image_id_str {
290        if prover.has_image(image_id_str).await? {
291            tracing::debug!("Skipping program upload for cached image ID: {image_id_str}");
292            return Ok(image_id_str.clone());
293        }
294    }
295
296    tracing::debug!("Fetching program from URI {image_url}");
297    let image_data = downloader
298        .download(image_url)
299        .await
300        .with_context(|| format!("Failed to fetch image URI: {image_url}"))?;
301
302    let image_id =
303        risc0_zkvm::compute_image_id(&image_data).context("Failed to compute image ID")?;
304
305    if let Some(ref expected_image_id_str) = image_id_str {
306        let expected_image_id = risc0_zkvm::sha::Digest::from_hex(expected_image_id_str)?;
307        if image_id != expected_image_id {
308            anyhow::bail!(
309                "image ID does not match requirements; expect {}, got {}",
310                expected_image_id,
311                image_id
312            );
313        }
314    }
315
316    let image_id_str = image_id.to_string();
317
318    tracing::debug!("Uploading program with image ID {image_id_str} to prover");
319    prover.upload_image(&image_id_str, image_data).await?;
320
321    Ok(image_id_str)
322}
323
324/// Upload input data to the prover (from inline data or URL) using the provided downloader.
325///
326/// This is a standalone function (not a trait method) so it can be called from inside
327/// async closures like `try_get_with` without capturing `&self`.
328///
329/// If `is_priority_requestor` is true, size limits are bypassed when fetching from URLs.
330async fn upload_input_with_downloader(
331    prover: &ProverObj,
332    input_type: crate::contracts::RequestInputType,
333    input_data: &Bytes,
334    downloader: &(dyn StorageDownloader + Send + Sync),
335    is_priority_requestor: bool,
336) -> anyhow::Result<String> {
337    match input_type {
338        crate::contracts::RequestInputType::Inline => {
339            let stdin = GuestEnv::decode(input_data).context("Failed to decode input")?.stdin;
340            prover.upload_input(stdin).await.map_err(|e| anyhow::anyhow!("{}", e))
341        }
342        crate::contracts::RequestInputType::Url => {
343            let input_url =
344                std::str::from_utf8(input_data).context("input url is not valid utf8")?;
345
346            tracing::debug!("Fetching input from URI {input_url}");
347            let raw_input = if is_priority_requestor {
348                downloader.download_with_limit(input_url, usize::MAX).await
349            } else {
350                downloader.download(input_url).await
351            }
352            .with_context(|| format!("Failed to fetch input URI: {input_url}"))?;
353
354            let stdin =
355                GuestEnv::decode(&raw_input).context("Failed to decode input from URL")?.stdin;
356
357            prover.upload_input(stdin).await.map_err(|e| anyhow::anyhow!("{}", e))
358        }
359        crate::contracts::RequestInputType::__Invalid => {
360            anyhow::bail!("Invalid input type")
361        }
362    }
363}
364
365/// Context required by order pricing.
366#[allow(async_fn_in_trait)]
367pub trait OrderPricingContext {
    /// Returns the market configuration used for pricing decisions (e.g. `min_deadline`,
    /// `max_collateral`, `min_mcycle_limit`).
    fn market_config(&self) -> Result<MarketConfig, OrderPricingError>;
    /// Returns the selectors this prover supports; orders requiring any other selector are
    /// skipped during pricing.
    fn supported_selectors(&self) -> &SupportedSelectors;
    /// Returns the cache of preflight results shared across pricing calls.
    fn preflight_cache(&self) -> &PreflightCache;
    /// Returns the number of decimals of the collateral token, used when parsing and
    /// formatting collateral amounts.
    fn collateral_token_decimals(&self) -> u8;
372    fn format_collateral(&self, value: U256) -> String {
373        format_units(value, self.collateral_token_decimals()).unwrap_or_else(|_| "?".to_string())
374    }
    /// Returns the set of denied requestor addresses, if any are configured.
    ///
    /// The default implementation reports that no deny list is configured (`Ok(None)`).
    /// During pricing the result is handed to `check_requestor_allowed`.
    fn denied_requestor_addresses(&self) -> Result<Option<HashSet<Address>>, OrderPricingError> {
        Ok(None)
    }
    /// Checks whether the order's requestor may be served, given the optional deny list.
    ///
    /// Returning `Ok(Some(outcome))` makes pricing stop and return that outcome; `Ok(None)`
    /// lets pricing continue.
    fn check_requestor_allowed(
        &self,
        order: &OrderRequest,
        denied_addresses_opt: Option<&HashSet<Address>>,
    ) -> Result<Option<OrderPricingOutcome>, OrderPricingError>;
    /// Checks whether the order can still be acted on.
    ///
    /// Returning `Ok(Some(outcome))` makes pricing stop and return that outcome; `Ok(None)`
    /// lets pricing continue.
    async fn check_request_available(
        &self,
        order: &OrderRequest,
    ) -> Result<Option<OrderPricingOutcome>, OrderPricingError>;
    /// NOTE(review): not called in the visible part of this file; presumably estimates the gas
    /// needed to fulfill orders that are already pending — confirm against implementors.
    #[cfg(feature = "prover_utils")]
    async fn estimate_gas_to_fulfill_pending(&self) -> Result<u64, OrderPricingError>;
    /// Returns whether `client_addr` is a priority requestor. For priority requestors, input
    /// downloads are performed without a size limit.
    fn is_priority_requestor(&self, client_addr: &Address) -> bool;
    /// Checks that available balances cover the order, given its estimated gas cost, whether
    /// its lock has expired, and the collateral that would be locked in.
    ///
    /// Returning `Ok(Some(outcome))` makes pricing stop and return that outcome; `Ok(None)`
    /// lets pricing continue.
    async fn check_available_balances(
        &self,
        order: &OrderRequest,
        order_gas_cost: U256,
        lock_expired: bool,
        lockin_collateral: U256,
    ) -> Result<Option<OrderPricingOutcome>, OrderPricingError>;
    /// Returns the current gas price. Pricing multiplies it by the estimated gas to obtain
    /// the order's gas cost and logs it in gwei (so the value is presumably in wei).
    async fn current_gas_price(&self) -> Result<u128, OrderPricingError>;

    /// Access to the prover for preflight operations.
    fn prover(&self) -> &ProverObj;

    /// Access to the downloader for fetching images and inputs.
    /// Returns an Arc so it can be cloned into async closures.
    fn downloader(&self) -> Arc<dyn StorageDownloader + Send + Sync>;
406
407    /// Fetch data from a URI using the downloader.
408    #[cfg(feature = "prover_utils")]
409    async fn fetch_url(&self, url: &str) -> Result<Vec<u8>, OrderPricingError> {
410        self.downloader()
411            .download(url)
412            .await
413            .map_err(|e| OrderPricingError::FetchInputErr(Arc::new(e.into())))
414    }
415
416    /// Upload an image from a URL to the prover using the downloader.
417    #[cfg(feature = "prover_utils")]
418    async fn upload_image(
419        &self,
420        image_url: &str,
421        predicate: &crate::contracts::RequestPredicate,
422    ) -> Result<String, OrderPricingError> {
423        upload_image_with_downloader(
424            self.prover(),
425            image_url,
426            predicate,
427            self.downloader().as_ref(),
428        )
429        .await
430        .map_err(|e| OrderPricingError::FetchImageErr(Arc::new(e)))
431    }
432
433    /// Upload input data to the prover (from inline data or URL) using the downloader.
434    #[cfg(feature = "prover_utils")]
435    async fn upload_input(&self, order: &OrderRequest) -> Result<String, OrderPricingError> {
436        let is_priority = self.is_priority_requestor(&order.request.client_address());
437        upload_input_with_downloader(
438            self.prover(),
439            order.request.input.inputType,
440            &order.request.input.data,
441            self.downloader().as_ref(),
442            is_priority,
443        )
444        .await
445        .map_err(|e| OrderPricingError::FetchInputErr(Arc::new(e)))
446    }
447
448    /// Execute preflight for an order.
449    async fn preflight_execute(
450        &self,
451        order: &OrderRequest,
452        exec_limit_cycles: u64,
453        cache_key: PreflightCacheKey,
454    ) -> Result<PreflightCacheValue, OrderPricingError> {
455        let order_id = order.id();
456
457        // Clone all data needed inside the closure
458        let prover = self.prover().clone();
459        let downloader = self.downloader();
460        let cache = self.preflight_cache().clone();
461        let image_url = order.request.imageUrl.clone();
462        let predicate = order.request.requirements.predicate.clone();
463        let input_type = order.request.input.inputType;
464        let input_data = order.request.input.data.clone();
465        let order_id_clone = order_id.clone();
466        let is_priority = self.is_priority_requestor(&order.request.client_address());
467
468        // Multiple concurrent calls of this coalesce into a single execution.
469        // https://docs.rs/moka/latest/moka/future/struct.Cache.html#concurrent-calls-on-the-same-key
470        let result = cache
471            .try_get_with(cache_key, async move {
472                tracing::trace!(
473                    "Starting preflight execution of {order_id_clone} with limit of {exec_limit_cycles} cycles"
474                );
475
476                // Upload image from URL using downloader
477                let image_id =
478                    upload_image_with_downloader(&prover, &image_url, &predicate, downloader.as_ref())
479                        .await
480                        .map_err(|e| OrderPricingError::FetchImageErr(Arc::new(e)))?;
481
482                // Upload input using downloader
483                let input_id =
484                    upload_input_with_downloader(&prover, input_type, &input_data, downloader.as_ref(), is_priority)
485                        .await
486                        .map_err(|e| OrderPricingError::FetchInputErr(Arc::new(e)))?;
487
488                match prover
489                    .preflight(&image_id, &input_id, vec![], Some(exec_limit_cycles), &order_id_clone)
490                    .await
491                {
492                    Ok(res) => {
493                        tracing::debug!(
494                            "Preflight execution of {order_id_clone} with session id {} and {} mcycles completed",
495                            res.id,
496                            res.stats.total_cycles / 1_000_000
497                        );
498                        Ok(PreflightCacheValue::Success {
499                            exec_session_id: res.id,
500                            cycle_count: res.stats.total_cycles,
501                            image_id,
502                            input_id,
503                        })
504                    }
505                    Err(err) => {
506                        let err_msg = err.to_string();
507                        if err_msg.contains("Session limit exceeded")
508                            || err_msg.contains("Execution stopped intentionally due to session limit")
509                        {
510                            tracing::debug!(
511                                "Skipping order {order_id_clone} due to intentional execution limit of {exec_limit_cycles}"
512                            );
513                            Ok(PreflightCacheValue::Skip { cached_limit: exec_limit_cycles })
514                        } else if err_msg.contains("Guest panicked") || err_msg.contains("GuestPanic") {
515                            tracing::debug!(
516                                "Skipping order {order_id_clone} due to guest panic: {}",
517                                err_msg
518                            );
519                            Ok(PreflightCacheValue::Skip { cached_limit: u64::MAX })
520                        } else {
521                            Err(OrderPricingError::UnexpectedErr(Arc::new(err.into())))
522                        }
523                    }
524                }
525            })
526            .await
527            .map_err(|e| (*e).clone())?;
528
529        Ok(result)
530    }
531
532    /// Fetch the journal from a preflight execution.
533    async fn preflight_journal(&self, exec_session_id: &str) -> Result<Vec<u8>, OrderPricingError> {
534        self.prover()
535            .get_preflight_journal(exec_session_id)
536            .await
537            .map_err(|e| OrderPricingError::UnexpectedErr(Arc::new(e.into())))?
538            .ok_or_else(|| {
539                OrderPricingError::UnexpectedErr(Arc::new(anyhow::anyhow!(
540                    "Preflight journal not found"
541                )))
542            })
543    }
544    async fn price_order(
545        &self,
546        order: &mut OrderRequest,
547    ) -> Result<OrderPricingOutcome, OrderPricingError> {
548        let order_id = order.id();
549        tracing::debug!("Pricing order {order_id}");
550
551        let now = now_timestamp();
552
553        // If order_expiration > lock_expiration the period in-between is when order can be filled
554        // by anyone without staking to partially claim the slashed collateral
555        let lock_expired = order.fulfillment_type == FulfillmentType::FulfillAfterLockExpire;
556
557        let expiration = order.expiry();
558        let lockin_collateral =
559            if lock_expired { U256::ZERO } else { U256::from(order.request.offer.lockCollateral) };
560
561        if expiration <= now {
562            return Ok(Skip { reason: "order has expired".to_string() });
563        };
564
565        let config = self.market_config()?;
566        let min_deadline = config.min_deadline;
567        let denied_addresses_opt = self.denied_requestor_addresses()?;
568
569        // Does the order expire within the min deadline
570        let seconds_left = expiration.saturating_sub(now);
571        if seconds_left <= min_deadline {
572            return Ok(Skip {
573                reason: format!(
574                    "order expires in {seconds_left} seconds with min_deadline {min_deadline}"
575                ),
576            });
577        }
578
579        // Check if requestor is allowed (from both static config and dynamic lists)
580        if let Some(outcome) = self.check_requestor_allowed(order, denied_addresses_opt.as_ref())? {
581            return Ok(outcome);
582        }
583
584        if !self.supported_selectors().is_supported(order.request.requirements.selector) {
585            return Ok(Skip {
586                reason: format!(
587                    "unsupported selector requirement. Requested: {:x}. Supported: {:?}",
588                    order.request.requirements.selector,
589                    self.supported_selectors()
590                        .selectors
591                        .iter()
592                        .map(|(k, v)| format!("{k:x} ({v:?})"))
593                        .collect::<Vec<_>>()
594                ),
595            });
596        };
597
598        // Check if the collateral is sane and if we can afford it
599        // For lock expired orders, we don't check the max collateral because we can't lock those orders.
600        let max_collateral: U256 =
601            parse_units(&config.max_collateral, self.collateral_token_decimals())
602                .context("Failed to parse max_collateral")?
603                .into();
604
605        if !lock_expired && lockin_collateral > max_collateral {
606            return Ok(Skip {
607                reason: format!(
608                    "order collateral requirement exceeds max_collateral config {} > {}",
609                    self.format_collateral(lockin_collateral),
610                    self.format_collateral(max_collateral),
611                ),
612            });
613        }
614
615        // Short circuit if the order cannot be acted on.
616        if let Some(outcome) = self.check_request_available(order).await? {
617            return Ok(outcome);
618        }
619
620        // Check that we have both enough staking tokens to collateral, and enough gas tokens to lock and fulfil
621        // NOTE: We use the current gas price and a rough heuristic on gas costs. Its possible that
622        // gas prices may go up (or down) by the time its time to fulfill. This does not aim to be
623        // a tight estimate, although improving this estimate will allow for a more profit.
624        let gas_price = self.current_gas_price().await?;
625        let order_gas = if lock_expired {
626            // No need to include lock gas if its a lock expired order
627            U256::from(
628                estimate_gas_to_fulfill(&config, self.supported_selectors(), &order.request)
629                    .await?,
630            )
631        } else {
632            U256::from(
633                estimate_gas_to_lock(&config, order).await?
634                    + estimate_gas_to_fulfill(&config, self.supported_selectors(), &order.request)
635                        .await?,
636            )
637        };
638        let order_gas_cost = U256::from(gas_price) * order_gas;
639        tracing::debug!(
640            "Estimated {order_gas} gas to {} order {order_id}; {} ether @ {} gwei",
641            if lock_expired { "fulfill" } else { "lock and fulfill" },
642            format_ether(order_gas_cost),
643            format_units(gas_price, "gwei").unwrap()
644        );
645
646        if let Some(outcome) = self
647            .check_available_balances(order, order_gas_cost, lock_expired, lockin_collateral)
648            .await?
649        {
650            return Ok(outcome);
651        }
652
653        // Calculate exec limit (handles priority requestors and config internally)
654        let (exec_limit_cycles, prove_limit, prove_limit_reason) =
655            self.calculate_exec_limits(order, order_gas_cost)?;
656
657        if prove_limit < 2 {
658            // Exec limit is based on user cycles, and 2 is the minimum number of user cycles for a
659            // provable execution.
660            // TODO when/if total cycle limit is allowed in future, update this to be total cycle min
661            return Ok(Skip {
662                reason: format!("cycle limit hit from max reward: {} cycles", prove_limit),
663            });
664        }
665
666        tracing::debug!(
667            "Starting preflight execution of {order_id} with limit of {} cycles (~{} mcycles)",
668            exec_limit_cycles,
669            exec_limit_cycles / 1_000_000
670        );
671
672        // Create cache key based on input type
673        let predicate_data = order.request.requirements.predicate.data.to_vec();
674        let cache_key = match order.request.input.inputType {
675            RequestInputType::Url => {
676                let input_url = std::str::from_utf8(&order.request.input.data)
677                    .context("input url is not utf8")
678                    .map_err(|e| OrderPricingError::FetchInputErr(Arc::new(e)))?
679                    .to_string();
680                PreflightCacheKey { predicate_data, input: InputCacheKey::Url(input_url) }
681            }
682            RequestInputType::Inline => {
683                // For inline inputs, use SHA256 hash of the data
684                let mut hasher = Sha256::new();
685                sha2::Digest::update(&mut hasher, &order.request.input.data);
686                let input_hash: [u8; 32] = hasher.finalize().into();
687                PreflightCacheKey { predicate_data, input: InputCacheKey::Hash(input_hash) }
688            }
689            RequestInputType::__Invalid => {
690                return Err(OrderPricingError::UnexpectedErr(Arc::new(anyhow::anyhow!(
691                    "Unknown input type: {:?}",
692                    order.request.input.inputType
693                ))));
694            }
695        };
696
697        let preflight_result = loop {
698            let result =
699                self.preflight_execute(order, exec_limit_cycles, cache_key.clone()).await?;
700            if let PreflightCacheValue::Skip { cached_limit } = result {
701                if cached_limit < exec_limit_cycles {
702                    self.preflight_cache().invalidate(&cache_key).await;
703                    continue;
704                }
705            }
706            break result;
707        };
708
709        let (exec_session_id, cycle_count, image_id) = match preflight_result {
710            PreflightCacheValue::Success { exec_session_id, cycle_count, image_id, input_id } => {
711                tracing::debug!(
712                    "Using preflight result for {order_id}: session id {} with {} mcycles",
713                    exec_session_id,
714                    cycle_count / 1_000_000
715                );
716
717                // Update order with the uploaded IDs
718                order.image_id = Some(image_id.clone());
719                order.input_id = Some(input_id.clone());
720
721                (exec_session_id, cycle_count, image_id)
722            }
723            PreflightCacheValue::Skip { cached_limit } => {
724                return Ok(Skip {
725                    reason: format!(
726                        "order preflight execution limit hit with {cached_limit} cycles - limited by {prove_limit_reason}"
727                    ),
728                });
729            }
730        };
731
732        // If a max_mcycle_limit is configured check if the order is over that limit
733        if cycle_count > prove_limit {
734            // If the preflight execution has completed, but for the variant is rejected,
735            // provide the config value that needs to be updated in order to have accepted.
736            let config_info = match &prove_limit_reason {
737                ProveLimitReason::EthPricing { max_price, gas_cost, mcycle_price_eth } => {
738                    let max_price_gas_adjusted = max_price.saturating_sub(*gas_cost);
739                    let required_price_per_mcycle = max_price_gas_adjusted
740                        .saturating_mul(ONE_MILLION)
741                        / U256::from(cycle_count);
742                    let required_price_per_mcycle_ignore_gas =
743                        max_price.saturating_mul(ONE_MILLION) / U256::from(cycle_count);
744                    format!(
745                        "min_mcycle_price set to {} ETH/Mcycle in config, order requires min_mcycle_price <= {} ETH/Mcycle to be considered (gas cost: {} ETH, ignoring gas requires min {} ETH/Mcycle)",
746                        format_ether(*mcycle_price_eth),
747                        format_ether(required_price_per_mcycle),
748                        format_ether(*gas_cost),
749                        format_ether(required_price_per_mcycle_ignore_gas)
750                    )
751                }
752                ProveLimitReason::CollateralPricing { mcycle_price_collateral, .. } => {
753                    let reward =
754                        order.request.offer.collateral_reward_if_locked_and_not_fulfilled();
755                    let required_collateral_price =
756                        reward.saturating_mul(ONE_MILLION) / U256::from(cycle_count);
757                    format!(
758                        "min_mcycle_price_collateral_token set to {} ZKC/Mcycle in config, order requires min_mcycle_price_collateral_token <= {} ZKC/Mcycle to be considered",
759                        mcycle_price_collateral,
760                        self.format_collateral(required_collateral_price)
761                    )
762                }
763                ProveLimitReason::ConfigCap { max_mcycles } => {
764                    let required_mcycles = cycle_count.div_ceil(1_000_000);
765                    format!(
766                        "max_mcycle_limit set to {} Mcycles in config, order requires max_mcycle_limit >= {} Mcycles to be considered",
767                        max_mcycles, required_mcycles
768                    )
769                }
770                ProveLimitReason::DeadlineCap { time_remaining_secs, peak_prove_khz } => {
771                    let denom = time_remaining_secs.saturating_mul(1_000);
772                    let required_khz = cycle_count.div_ceil(denom);
773                    format!(
774                        "peak_prove_khz set to {} kHz in config, order requires peak_prove_khz >= {} kHz to be considered",
775                        peak_prove_khz, required_khz
776                    )
777                }
778            };
779
780            return Ok(Skip {
781                reason: format!(
782                    "order with {cycle_count} cycles above limit of {prove_limit} cycles - {config_info}"
783               ),
784            });
785        }
786
787        order.total_cycles = Some(cycle_count);
788
789        if config.min_mcycle_limit > 0 {
790            let min_cycles = config.min_mcycle_limit.saturating_mul(1_000_000);
791            if cycle_count < min_cycles {
792                tracing::debug!(
793                        "Order {order_id} skipped due to min_mcycle_limit config: {} cycles < {} cycles",
794                        cycle_count,
795                        min_cycles
796                    );
797                return Ok(Skip {
798                        reason: format!(
799                            "order with {} cycles below min limit of {} cycles - min_mcycle_limit set to {} Mcycles in config",
800                            cycle_count,
801                            min_cycles,
802                            config.min_mcycle_limit,
803                        ),
804                    });
805            }
806        }
807
808        let journal = self.preflight_journal(&exec_session_id).await?;
809        let order_predicate_type = order.request.requirements.predicate.predicateType;
810        if matches!(order_predicate_type, PredicateType::PrefixMatch | PredicateType::DigestMatch)
811            && journal.len() > config.max_journal_bytes
812        {
813            return Ok(OrderPricingOutcome::Skip {
814                reason: format!(
815                    "order journal larger than set limit ({} > {})",
816                    journal.len(),
817                    config.max_journal_bytes
818                ),
819            });
820        }
821
822        // If the selector is a blake3 groth16 selector, ensure the journal is exactly 32 bytes
823        if is_blake3_groth16_selector(order.request.requirements.selector) && journal.len() != 32 {
824            tracing::info!(
825                "Order {order_id} journal is not 32 bytes for blake3 groth16 selector, skipping",
826            );
827            return Ok(Skip {
828                reason: "blake3 groth16 selector requires 32 byte journal".to_string(),
829            });
830        }
831
832        // Validate the predicates:
833        let predicate = Predicate::try_from(order.request.requirements.predicate.clone())
834            .map_err(|e| OrderPricingError::RequestError(Arc::new(e.into())))?;
835        let eval_data = if is_blake3_groth16_selector(order.request.requirements.selector) {
836            // These proofs must have no journal delivery because they cannot be authenticated on chain.
837            FulfillmentData::None
838        } else {
839            FulfillmentData::from_image_id_and_journal(
840                Risc0Digest::from_hex(image_id).context("Failed to parse image ID")?,
841                journal,
842            )
843        };
844        if predicate.eval(&eval_data).is_none() {
845            return Ok(Skip { reason: "order predicate check failed".to_string() });
846        }
847
848        // For lock_expired orders, evaluate based on collateral
849        if lock_expired {
850            // Reward for the order is a fraction of the collateral once the lock has expired
851            let price = order.request.offer.collateral_reward_if_locked_and_not_fulfilled();
852            let mcycle_price_in_collateral_tokens =
853                price.saturating_mul(ONE_MILLION) / U256::from(cycle_count);
854
855            let config_min_mcycle_price_collateral_tokens = parse_units(
856                &config.min_mcycle_price_collateral_token,
857                self.collateral_token_decimals(),
858            )
859            .context("Failed to parse min_mcycle_price_collateral_token")?
860            .into();
861
862            tracing::debug!(
863                "Order price: {} (collateral tokens) - cycles: {} - mcycle price: {} (collateral tokens), config_min_mcycle_price_collateral_tokens: {} (collateral tokens)",
864                format_ether(price),
865                cycle_count,
866                self.format_collateral(mcycle_price_in_collateral_tokens),
867                self.format_collateral(config_min_mcycle_price_collateral_tokens),
868            );
869
870            // Skip the order if it will never be worth it
871            if mcycle_price_in_collateral_tokens < config_min_mcycle_price_collateral_tokens {
872                return Ok(Skip {
873                    reason: format!(
874                        "slashed collateral reward too low. {} (collateral reward) < config mcycle_price_collateral_token {}",
875                        self.format_collateral(mcycle_price_in_collateral_tokens),
876                        self.format_collateral(config_min_mcycle_price_collateral_tokens),
877                    ),
878                });
879            }
880
881            Ok(OrderPricingOutcome::ProveAfterLockExpire {
882                total_cycles: cycle_count,
883                lock_expire_timestamp_secs: order.request.offer.rampUpStart
884                    + order.request.offer.lockTimeout as u64,
885                expiry_secs: order.request.offer.rampUpStart + order.request.offer.timeout as u64,
886                mcycle_price: mcycle_price_in_collateral_tokens,
887                config_min_mcycle_price: config_min_mcycle_price_collateral_tokens,
888            })
889        } else {
890            // For lockable orders, evaluate based on ETH price
891            let config_min_mcycle_price: U256 = parse_units(&config.min_mcycle_price, 18)
892                .context("Failed to parse min_mcycle_price")?
893                .into();
894
895            let order_id = order.id();
896
897            let mcycle_price_min = U256::from(order.request.offer.minPrice)
898                .saturating_sub(order_gas_cost)
899                .saturating_mul(ONE_MILLION)
900                / U256::from(cycle_count);
901            let mcycle_price_max = U256::from(order.request.offer.maxPrice)
902                .saturating_sub(order_gas_cost)
903                .saturating_mul(ONE_MILLION)
904                / U256::from(cycle_count);
905
906            tracing::debug!(
907                "Order {order_id} price: {}-{} ETH, {}-{} ETH per mcycle, {} collateral required, {} ETH gas cost",
908                format_ether(U256::from(order.request.offer.minPrice)),
909                format_ether(U256::from(order.request.offer.maxPrice)),
910                format_ether(mcycle_price_min),
911                format_ether(mcycle_price_max),
912                self.format_collateral(order.request.offer.lockCollateral),
913                format_ether(order_gas_cost),
914            );
915
916            // Skip the order if it will never be worth it
917            if mcycle_price_max < config_min_mcycle_price {
918                return Ok(OrderPricingOutcome::Skip {
919                    reason: format!(
920                        "order max price {} is less than mcycle_price config {}",
921                        format_ether(U256::from(order.request.offer.maxPrice)),
922                        format_ether(config_min_mcycle_price),
923                    ),
924                });
925            }
926
927            let current_mcycle_price = order
928                .request
929                .offer
930                .price_at(now_timestamp())
931                .context("Failed to get current mcycle price")?;
932            let (target_mcycle_price, target_timestamp_secs) =
933                if mcycle_price_min >= config_min_mcycle_price {
934                    tracing::info!(
935                        "Selecting order {order_id} at price {} - ASAP",
936                        format_ether(current_mcycle_price)
937                    );
938                    (mcycle_price_min, 0) // Schedule the lock ASAP
939                } else {
940                    let target_min_price = config_min_mcycle_price
941                        .saturating_mul(U256::from(cycle_count))
942                        .div_ceil(ONE_MILLION)
943                        + order_gas_cost;
944                    tracing::debug!(
945                        "Order {order_id} minimum profitable price: {} ETH",
946                        format_ether(target_min_price)
947                    );
948
949                    let target_time = order
950                        .request
951                        .offer
952                        .time_at_price(target_min_price)
953                        .context("Failed to get target price timestamp")?;
954                    (target_min_price, target_time)
955                };
956
957            let expiry_secs =
958                order.request.offer.rampUpStart + order.request.offer.lockTimeout as u64;
959
960            Ok(OrderPricingOutcome::Lock {
961                total_cycles: cycle_count,
962                target_timestamp_secs,
963                expiry_secs,
964                target_mcycle_price,
965                max_mcycle_price: mcycle_price_max,
966                config_min_mcycle_price,
967                current_mcycle_price,
968            })
969        }
970    }
971
972    /// Calculates the cycle limit for the preflight and also for the max cycles that this specific
973    /// order variant will consider proving for.
974    ///
975    /// The reason for calculating both preflight and prove limits is to execute the order with
976    /// a large enough cycle limit for both lock and fulfill orders as well as for if the order
977    /// expires and to prove after lock expiry so that the execution can be cached and only happen
978    /// once. The prove limit is the limit for this specific order variant and decides the max
979    /// cycles the order can be for the prover to decide to commit to proving it.
980    fn calculate_exec_limits(
981        &self,
982        order: &OrderRequest,
983        order_gas_cost: U256,
984    ) -> Result<(u64, u64, ProveLimitReason), OrderPricingError> {
985        // Derive parameters from order
986        let order_id = order.id();
987        let is_fulfill_after_lock_expire =
988            order.fulfillment_type == FulfillmentType::FulfillAfterLockExpire;
989        let now = now_timestamp();
990        let request_expiration = order.expiry();
991        let lock_expiry = order.request.lock_expires_at();
992        let order_expiry = order.request.expires_at();
993        let config = self.market_config()?;
994        let min_mcycle_price = parse_units(&config.min_mcycle_price, 18)
995            .context("Failed to parse min_mcycle_price")?
996            .into();
997        let min_mcycle_price_collateral_tokens: U256 = parse_units(
998            &config.min_mcycle_price_collateral_token,
999            self.collateral_token_decimals(),
1000        )
1001        .context("Failed to parse min_mcycle_price_collateral_token")?
1002        .into();
1003
1004        // Pricing based cycle limits: Calculate the cycle limit based on collateral price
1005        let collateral_based_limit = if min_mcycle_price_collateral_tokens == U256::ZERO {
1006            tracing::info!("min_mcycle_price_collateral_token is 0, setting unlimited exec limit");
1007            u64::MAX
1008        } else {
1009            let price = order.request.offer.collateral_reward_if_locked_and_not_fulfilled();
1010
1011            let initial_collateral_based_limit =
1012                (price.saturating_mul(ONE_MILLION).div_ceil(min_mcycle_price_collateral_tokens))
1013                    .try_into()
1014                    .unwrap_or(u64::MAX);
1015
1016            tracing::trace!(
1017                "Order {order_id} initial collateral based limit: {initial_collateral_based_limit}"
1018            );
1019            initial_collateral_based_limit
1020        };
1021
1022        let mut preflight_limit = collateral_based_limit;
1023        let mut prove_limit = collateral_based_limit;
1024        let mut prove_limit_reason = ProveLimitReason::CollateralPricing {
1025            mcycle_price_collateral: self.format_collateral(min_mcycle_price_collateral_tokens),
1026            collateral_reward: self.format_collateral(
1027                order.request.offer.collateral_reward_if_locked_and_not_fulfilled(),
1028            ),
1029        };
1030
1031        // If lock and fulfill, potentially increase that to ETH-based value if higher
1032        if !is_fulfill_after_lock_expire {
1033            // Calculate eth-based limit for lock and fulfill orders
1034            let eth_based_limit: u64 = if min_mcycle_price == U256::ZERO {
1035                tracing::info!("min_mcycle_price is 0, setting unlimited exec limit");
1036                u64::MAX
1037            } else {
1038                let limit: U256 = U256::from(order.request.offer.maxPrice)
1039                    .saturating_sub(order_gas_cost)
1040                    .saturating_mul(ONE_MILLION)
1041                    / min_mcycle_price;
1042                limit.try_into().unwrap_or(u64::MAX)
1043            };
1044
1045            if eth_based_limit > collateral_based_limit {
1046                // Eth based limit is higher, use that for both preflight and prove
1047                tracing::debug!("Order {order_id} eth based limit ({eth_based_limit}) > collateral based limit ({collateral_based_limit}), using eth based limit for both preflight and prove");
1048                preflight_limit = eth_based_limit;
1049                prove_limit = eth_based_limit;
1050            } else {
1051                // Otherwise lower the prove cycle limit for this order variant
1052                tracing::debug!("Order {order_id} eth based limit ({eth_based_limit}) < collateral based limit ({collateral_based_limit}), using eth based limit for prove");
1053                prove_limit = eth_based_limit;
1054            }
1055            prove_limit_reason = ProveLimitReason::EthPricing {
1056                max_price: U256::from(order.request.offer.maxPrice),
1057                gas_cost: order_gas_cost,
1058                mcycle_price_eth: min_mcycle_price,
1059            };
1060        }
1061
1062        debug_assert!(
1063            preflight_limit >= prove_limit,
1064            "preflight_limit ({preflight_limit}) < prove_limit ({prove_limit})",
1065        );
1066
1067        // Apply max mcycle limit cap
1068        // Check if priority requestor address - skip all exec limit calculations
1069        let client_addr = order.request.client_address();
1070        let skip_mcycle_limit = self.is_priority_requestor(&client_addr);
1071        if skip_mcycle_limit {
1072            tracing::debug!("Order {order_id} exec limit config ignored due to client {} being part of priority requestors.", client_addr);
1073        }
1074
1075        if !skip_mcycle_limit {
1076            let config_cycle_limit = config.max_mcycle_limit.saturating_mul(1_000_000);
1077            if prove_limit > config_cycle_limit {
1078                tracing::debug!(
1079                    "Order {order_id} prove limit capped by max_mcycle_limit config: {} -> {} cycles",
1080                    prove_limit,
1081                    config_cycle_limit
1082                );
1083                prove_limit = config_cycle_limit;
1084                preflight_limit = config_cycle_limit;
1085                prove_limit_reason =
1086                    ProveLimitReason::ConfigCap { max_mcycles: config.max_mcycle_limit };
1087            } else if preflight_limit > config_cycle_limit {
1088                preflight_limit = config_cycle_limit;
1089            }
1090        }
1091
1092        // Apply timing constraints based on peak prove khz
1093        if let Some(peak_prove_khz) = config.peak_prove_khz {
1094            let prove_window = request_expiration.saturating_sub(now);
1095            let prove_deadline_limit = calculate_max_cycles_for_time(peak_prove_khz, prove_window);
1096            if prove_limit > prove_deadline_limit {
1097                tracing::debug!("Order {order_id} prove limit capped by deadline: {} -> {} cycles ({:.1}s at {} peak_prove_khz)", prove_limit, prove_deadline_limit, prove_window, peak_prove_khz);
1098                prove_limit = prove_deadline_limit;
1099                prove_limit_reason = ProveLimitReason::DeadlineCap {
1100                    time_remaining_secs: prove_window,
1101                    peak_prove_khz,
1102                };
1103            }
1104
1105            // For preflight, also check fulfill-after-expiry window
1106            let new_preflight_limit = if !is_fulfill_after_lock_expire {
1107                let fulfill_after_expiry_window = order_expiry.saturating_sub(lock_expiry);
1108                let fulfill_after_expiry_limit =
1109                    calculate_max_cycles_for_time(peak_prove_khz, fulfill_after_expiry_window);
1110                std::cmp::max(prove_deadline_limit, fulfill_after_expiry_limit)
1111            } else {
1112                prove_deadline_limit
1113            };
1114
1115            if preflight_limit > new_preflight_limit {
1116                tracing::debug!("Order {order_id} preflight limit capped by deadline: {} -> {} cycles ({:.1}s at {} peak_prove_khz)", preflight_limit, new_preflight_limit, prove_window, peak_prove_khz);
1117                preflight_limit = new_preflight_limit;
1118            }
1119        }
1120
1121        tracing::trace!(
1122            "Order {order_id} final limits - preflight: {} cycles, prove: {} cycles (reason: {})",
1123            preflight_limit,
1124            prove_limit,
1125            prove_limit_reason
1126        );
1127
1128        debug_assert!(
1129            preflight_limit >= prove_limit,
1130            "preflight_limit ({preflight_limit}) < prove_limit ({prove_limit})",
1131        );
1132
1133        Ok((preflight_limit, prove_limit, prove_limit_reason))
1134    }
1135}
1136
/// Returns the maximum number of cycles that can be proven in `time_remaining_secs` seconds
/// at a rate of `peak_prove_khz` kHz (1 kHz = 1,000 cycles per second).
///
/// Both multiplications saturate, so very large inputs clamp to `u64::MAX` instead of
/// panicking (debug builds) or wrapping (release builds).
fn calculate_max_cycles_for_time(peak_prove_khz: u64, time_remaining_secs: u64) -> u64 {
    peak_prove_khz.saturating_mul(time_remaining_secs).saturating_mul(1_000)
}
1140
1141fn format_expiries(request: &ProofRequest) -> String {
1142    let now: i64 = now_timestamp().try_into().unwrap();
1143    let lock_expires_at: i64 = request.lock_expires_at().try_into().unwrap();
1144    let lock_expires_delta = lock_expires_at - now;
1145    let lock_expires_delta_str = if lock_expires_delta > 0 {
1146        format!("{lock_expires_delta} seconds from now")
1147    } else {
1148        format!("{} seconds ago", lock_expires_delta.abs())
1149    };
1150    let expires_at: i64 = request.expires_at().try_into().unwrap();
1151    let expires_delta = expires_at - now;
1152    let expires_delta_str = if expires_delta > 0 {
1153        format!("{expires_delta} seconds from now")
1154    } else {
1155        format!("{} seconds ago", expires_delta.abs())
1156    };
1157    format!("Lock expires {lock_expires_delta_str}, expires {expires_delta_str}")
1158}
1159
1160async fn estimate_gas_to_fulfill(
1161    config: &MarketConfig,
1162    supported_selectors: &SupportedSelectors,
1163    request: &ProofRequest,
1164) -> anyhow::Result<u64> {
1165    use crate::selector::ProofType;
1166
1167    let selector = request.requirements.selector;
1168    if !supported_selectors.is_supported(selector) {
1169        anyhow::bail!("unsupported selector requirement: {selector:x}");
1170    }
1171
1172    let mut estimate = config.fulfill_gas_estimate;
1173
1174    // Add gas for orders that make use of the callbacks feature.
1175    estimate += u64::try_from(
1176        request
1177            .requirements
1178            .callback
1179            .as_option()
1180            .map(|callback| callback.gasLimit)
1181            .unwrap_or(alloy::primitives::aliases::U96::ZERO),
1182    )?;
1183
1184    estimate += match supported_selectors.proof_type(selector).context("unsupported selector")? {
1185        ProofType::Any | ProofType::Inclusion => 0,
1186        ProofType::Groth16 | ProofType::Blake3Groth16 => config.groth16_verify_gas_estimate,
1187    };
1188
1189    Ok(estimate)
1190}
1191
/// Gas budget reserved for verifying a smart contract (ERC-1271) signature.
/// Copied from BoundlessMarket.sol.
const ERC1271_MAX_GAS_FOR_CHECK: u64 = 100_000;
1194
1195async fn estimate_gas_to_lock(config: &MarketConfig, order: &OrderRequest) -> anyhow::Result<u64> {
1196    let mut estimate = config.lockin_gas_estimate;
1197
1198    if order.request.is_smart_contract_signed() {
1199        estimate += ERC1271_MAX_GAS_FOR_CHECK;
1200    }
1201
1202    Ok(estimate)
1203}