1#![allow(missing_docs)]
15
16pub mod config;
17pub(crate) mod local_executor;
18pub mod prover;
19pub(crate) mod requestor_pricing;
20pub mod storage;
21
22#[cfg(not(feature = "prover_utils"))]
23pub use config::MarketConfig;
24#[cfg(feature = "prover_utils")]
25pub use config::{
26 defaults as config_defaults, BatcherConfig, Config, MarketConfig, OrderCommitmentPriority,
27 OrderPricingPriority, ProverConfig,
28};
29
30use crate::{
31 contracts::{
32 FulfillmentData, Predicate, PredicateType, ProofRequest, RequestError, RequestInputType,
33 },
34 input::GuestEnv,
35 selector::{is_blake3_groth16_selector, SupportedSelectors},
36 storage::StorageDownloader,
37 util::now_timestamp,
38};
39use alloy::{
40 primitives::{
41 utils::{format_ether, format_units, parse_units},
42 Address, Bytes, FixedBytes, U256,
43 },
44 uint,
45};
46use anyhow::Context;
47use hex::FromHex;
48use moka::future::Cache;
49use prover::ProverObj;
50use risc0_zkvm::sha::Digest as Risc0Digest;
51use serde::{Deserialize, Serialize};
52use sha2::{Digest, Sha256};
53use std::{
54 collections::HashSet,
55 fmt,
56 sync::{Arc, OnceLock},
57};
58use thiserror::Error;
59use OrderPricingOutcome::Skip;
60
61const ONE_MILLION: U256 = uint!(1_000_000_U256);
62
63#[derive(Clone, Debug, PartialEq, Eq)]
65pub enum ProveLimitReason {
66 EthPricing { max_price: U256, gas_cost: U256, mcycle_price_eth: U256 },
67 CollateralPricing { collateral_reward: String, mcycle_price_collateral: String },
68 ConfigCap { max_mcycles: u64 },
69 DeadlineCap { time_remaining_secs: u64, peak_prove_khz: u64 },
70}
71
72impl fmt::Display for ProveLimitReason {
73 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
74 match self {
75 ProveLimitReason::CollateralPricing { collateral_reward, mcycle_price_collateral } => {
76 write!(
77 f,
78 "collateral pricing: order collateral reward {} / {} mcycle_price_collateral_token config",
79 collateral_reward,
80 mcycle_price_collateral,
81 )
82 }
83 ProveLimitReason::EthPricing { max_price, gas_cost, mcycle_price_eth } => {
84 write!(
85 f,
86 "ETH pricing: (order maxPrice {} - gas {}) / mcycle_price config ({} per Mcycle)",
87 format_ether(*max_price),
88 format_ether(*gas_cost),
89 format_ether(*mcycle_price_eth)
90 )
91 }
92 ProveLimitReason::ConfigCap { max_mcycles } => {
93 write!(f, "broker max_mcycle_limit config setting ({} Mcycles)", max_mcycles)
94 }
95 ProveLimitReason::DeadlineCap { time_remaining_secs, peak_prove_khz } => {
96 write!(
97 f,
98 "deadline: {}s remaining with peak_prove_khz config ({} kHz)",
99 time_remaining_secs, peak_prove_khz
100 )
101 }
102 }
103 }
104}
105
106#[derive(Error, Debug, Clone)]
108#[non_exhaustive]
109pub enum OrderPricingError {
110 #[error("failed to fetch / push input: {0:#}")]
112 FetchInputErr(#[source] Arc<anyhow::Error>),
113 #[error("failed to fetch / push image: {0:#}")]
115 FetchImageErr(#[source] Arc<anyhow::Error>),
116 #[error("invalid request: {0}")]
118 RequestError(Arc<RequestError>),
119 #[error("RPC error: {0:#}")]
121 RpcErr(Arc<anyhow::Error>),
122 #[error("Unexpected error: {0:#}")]
124 UnexpectedErr(Arc<anyhow::Error>),
125}
126
127impl From<anyhow::Error> for OrderPricingError {
128 fn from(err: anyhow::Error) -> Self {
129 OrderPricingError::UnexpectedErr(Arc::new(err))
130 }
131}
132
133impl From<RequestError> for OrderPricingError {
134 fn from(err: RequestError) -> Self {
135 OrderPricingError::RequestError(Arc::new(err))
136 }
137}
138
139#[derive(Clone, Copy, Debug, PartialEq, Serialize, Deserialize)]
141pub enum FulfillmentType {
142 LockAndFulfill,
143 FulfillAfterLockExpire,
144 FulfillWithoutLocking,
146}
147
148#[derive(Serialize, Deserialize, Debug, Clone)]
150pub struct OrderRequest {
151 pub request: ProofRequest,
152 pub client_sig: Bytes,
153 pub fulfillment_type: FulfillmentType,
154 pub boundless_market_address: Address,
155 pub chain_id: u64,
156 pub image_id: Option<String>,
157 pub input_id: Option<String>,
158 pub total_cycles: Option<u64>,
159 pub target_timestamp: Option<u64>,
160 pub expire_timestamp: Option<u64>,
161 #[serde(skip)]
162 cached_id: OnceLock<String>,
163}
164
165impl OrderRequest {
166 pub fn new(
167 request: ProofRequest,
168 client_sig: Bytes,
169 fulfillment_type: FulfillmentType,
170 boundless_market_address: Address,
171 chain_id: u64,
172 ) -> Self {
173 Self {
174 request,
175 client_sig,
176 fulfillment_type,
177 boundless_market_address,
178 chain_id,
179 image_id: None,
180 input_id: None,
181 total_cycles: None,
182 target_timestamp: None,
183 expire_timestamp: None,
184 cached_id: OnceLock::new(),
185 }
186 }
187
188 pub fn id(&self) -> String {
189 self.cached_id
190 .get_or_init(|| {
191 let signing_hash = self
192 .request
193 .signing_hash(self.boundless_market_address, self.chain_id)
194 .unwrap_or(FixedBytes::ZERO);
195 format!("0x{:x}-{}-{:?}", self.request.id, signing_hash, self.fulfillment_type)
196 })
197 .clone()
198 }
199
200 pub fn expiry(&self) -> u64 {
201 match self.fulfillment_type {
202 FulfillmentType::LockAndFulfill => self.request.lock_expires_at(),
203 FulfillmentType::FulfillAfterLockExpire => self.request.expires_at(),
204 FulfillmentType::FulfillWithoutLocking => self.request.expires_at(),
205 }
206 }
207}
208
209impl std::fmt::Display for OrderRequest {
210 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
211 let total_mcycles = if self.total_cycles.is_some() {
212 format!(" ({} mcycles)", self.total_cycles.unwrap() / 1_000_000)
213 } else {
214 "".to_string()
215 };
216 write!(f, "{}{} [{}]", self.id(), total_mcycles, format_expiries(&self.request))
217 }
218}
219
220#[derive(Debug)]
222#[cfg_attr(not(feature = "prover_utils"), allow(dead_code))]
223pub enum OrderPricingOutcome {
224 Lock {
226 total_cycles: u64,
227 target_timestamp_secs: u64,
228 expiry_secs: u64,
229 target_mcycle_price: U256,
230 max_mcycle_price: U256,
231 config_min_mcycle_price: U256,
232 current_mcycle_price: U256,
233 },
234 ProveAfterLockExpire {
236 total_cycles: u64,
237 lock_expire_timestamp_secs: u64,
238 expiry_secs: u64,
239 mcycle_price: U256,
240 config_min_mcycle_price: U256,
241 },
242 Skip { reason: String },
244}
245
246#[derive(Clone, Debug)]
248pub enum PreflightCacheValue {
249 Success { exec_session_id: String, cycle_count: u64, image_id: String, input_id: String },
250 Skip { cached_limit: u64 },
251}
252
253#[derive(Hash, Eq, PartialEq, Clone, Debug)]
255pub enum InputCacheKey {
256 Url(String),
258 Hash([u8; 32]),
260}
261
262#[derive(Hash, Eq, PartialEq, Clone, Debug)]
264pub struct PreflightCacheKey {
265 pub predicate_data: Vec<u8>,
267 pub input: InputCacheKey,
269}
270
271pub type PreflightCache = Arc<Cache<PreflightCacheKey, PreflightCacheValue>>;
273
274async fn upload_image_with_downloader(
279 prover: &ProverObj,
280 image_url: &str,
281 predicate: &crate::contracts::RequestPredicate,
282 downloader: &(dyn StorageDownloader + Send + Sync),
283) -> anyhow::Result<String> {
284 let predicate = Predicate::try_from(predicate.clone()).context("Failed to parse predicate")?;
285
286 let image_id_str = predicate.image_id().map(|image_id| image_id.to_string());
287
288 if let Some(ref image_id_str) = image_id_str {
290 if prover.has_image(image_id_str).await? {
291 tracing::debug!("Skipping program upload for cached image ID: {image_id_str}");
292 return Ok(image_id_str.clone());
293 }
294 }
295
296 tracing::debug!("Fetching program from URI {image_url}");
297 let image_data = downloader
298 .download(image_url)
299 .await
300 .with_context(|| format!("Failed to fetch image URI: {image_url}"))?;
301
302 let image_id =
303 risc0_zkvm::compute_image_id(&image_data).context("Failed to compute image ID")?;
304
305 if let Some(ref expected_image_id_str) = image_id_str {
306 let expected_image_id = risc0_zkvm::sha::Digest::from_hex(expected_image_id_str)?;
307 if image_id != expected_image_id {
308 anyhow::bail!(
309 "image ID does not match requirements; expect {}, got {}",
310 expected_image_id,
311 image_id
312 );
313 }
314 }
315
316 let image_id_str = image_id.to_string();
317
318 tracing::debug!("Uploading program with image ID {image_id_str} to prover");
319 prover.upload_image(&image_id_str, image_data).await?;
320
321 Ok(image_id_str)
322}
323
324async fn upload_input_with_downloader(
331 prover: &ProverObj,
332 input_type: crate::contracts::RequestInputType,
333 input_data: &Bytes,
334 downloader: &(dyn StorageDownloader + Send + Sync),
335 is_priority_requestor: bool,
336) -> anyhow::Result<String> {
337 match input_type {
338 crate::contracts::RequestInputType::Inline => {
339 let stdin = GuestEnv::decode(input_data).context("Failed to decode input")?.stdin;
340 prover.upload_input(stdin).await.map_err(|e| anyhow::anyhow!("{}", e))
341 }
342 crate::contracts::RequestInputType::Url => {
343 let input_url =
344 std::str::from_utf8(input_data).context("input url is not valid utf8")?;
345
346 tracing::debug!("Fetching input from URI {input_url}");
347 let raw_input = if is_priority_requestor {
348 downloader.download_with_limit(input_url, usize::MAX).await
349 } else {
350 downloader.download(input_url).await
351 }
352 .with_context(|| format!("Failed to fetch input URI: {input_url}"))?;
353
354 let stdin =
355 GuestEnv::decode(&raw_input).context("Failed to decode input from URL")?.stdin;
356
357 prover.upload_input(stdin).await.map_err(|e| anyhow::anyhow!("{}", e))
358 }
359 crate::contracts::RequestInputType::__Invalid => {
360 anyhow::bail!("Invalid input type")
361 }
362 }
363}
364
365#[allow(async_fn_in_trait)]
367pub trait OrderPricingContext {
368 fn market_config(&self) -> Result<MarketConfig, OrderPricingError>;
369 fn supported_selectors(&self) -> &SupportedSelectors;
370 fn preflight_cache(&self) -> &PreflightCache;
371 fn collateral_token_decimals(&self) -> u8;
372 fn format_collateral(&self, value: U256) -> String {
373 format_units(value, self.collateral_token_decimals()).unwrap_or_else(|_| "?".to_string())
374 }
375 fn denied_requestor_addresses(&self) -> Result<Option<HashSet<Address>>, OrderPricingError> {
377 Ok(None)
378 }
379 fn check_requestor_allowed(
380 &self,
381 order: &OrderRequest,
382 denied_addresses_opt: Option<&HashSet<Address>>,
383 ) -> Result<Option<OrderPricingOutcome>, OrderPricingError>;
384 async fn check_request_available(
385 &self,
386 order: &OrderRequest,
387 ) -> Result<Option<OrderPricingOutcome>, OrderPricingError>;
388 #[cfg(feature = "prover_utils")]
389 async fn estimate_gas_to_fulfill_pending(&self) -> Result<u64, OrderPricingError>;
390 fn is_priority_requestor(&self, client_addr: &Address) -> bool;
391 async fn check_available_balances(
392 &self,
393 order: &OrderRequest,
394 order_gas_cost: U256,
395 lock_expired: bool,
396 lockin_collateral: U256,
397 ) -> Result<Option<OrderPricingOutcome>, OrderPricingError>;
398 async fn current_gas_price(&self) -> Result<u128, OrderPricingError>;
399
400 fn prover(&self) -> &ProverObj;
402
403 fn downloader(&self) -> Arc<dyn StorageDownloader + Send + Sync>;
406
407 #[cfg(feature = "prover_utils")]
409 async fn fetch_url(&self, url: &str) -> Result<Vec<u8>, OrderPricingError> {
410 self.downloader()
411 .download(url)
412 .await
413 .map_err(|e| OrderPricingError::FetchInputErr(Arc::new(e.into())))
414 }
415
416 #[cfg(feature = "prover_utils")]
418 async fn upload_image(
419 &self,
420 image_url: &str,
421 predicate: &crate::contracts::RequestPredicate,
422 ) -> Result<String, OrderPricingError> {
423 upload_image_with_downloader(
424 self.prover(),
425 image_url,
426 predicate,
427 self.downloader().as_ref(),
428 )
429 .await
430 .map_err(|e| OrderPricingError::FetchImageErr(Arc::new(e)))
431 }
432
433 #[cfg(feature = "prover_utils")]
435 async fn upload_input(&self, order: &OrderRequest) -> Result<String, OrderPricingError> {
436 let is_priority = self.is_priority_requestor(&order.request.client_address());
437 upload_input_with_downloader(
438 self.prover(),
439 order.request.input.inputType,
440 &order.request.input.data,
441 self.downloader().as_ref(),
442 is_priority,
443 )
444 .await
445 .map_err(|e| OrderPricingError::FetchInputErr(Arc::new(e)))
446 }
447
448 async fn preflight_execute(
450 &self,
451 order: &OrderRequest,
452 exec_limit_cycles: u64,
453 cache_key: PreflightCacheKey,
454 ) -> Result<PreflightCacheValue, OrderPricingError> {
455 let order_id = order.id();
456
457 let prover = self.prover().clone();
459 let downloader = self.downloader();
460 let cache = self.preflight_cache().clone();
461 let image_url = order.request.imageUrl.clone();
462 let predicate = order.request.requirements.predicate.clone();
463 let input_type = order.request.input.inputType;
464 let input_data = order.request.input.data.clone();
465 let order_id_clone = order_id.clone();
466 let is_priority = self.is_priority_requestor(&order.request.client_address());
467
468 let result = cache
471 .try_get_with(cache_key, async move {
472 tracing::trace!(
473 "Starting preflight execution of {order_id_clone} with limit of {exec_limit_cycles} cycles"
474 );
475
476 let image_id =
478 upload_image_with_downloader(&prover, &image_url, &predicate, downloader.as_ref())
479 .await
480 .map_err(|e| OrderPricingError::FetchImageErr(Arc::new(e)))?;
481
482 let input_id =
484 upload_input_with_downloader(&prover, input_type, &input_data, downloader.as_ref(), is_priority)
485 .await
486 .map_err(|e| OrderPricingError::FetchInputErr(Arc::new(e)))?;
487
488 match prover
489 .preflight(&image_id, &input_id, vec![], Some(exec_limit_cycles), &order_id_clone)
490 .await
491 {
492 Ok(res) => {
493 tracing::debug!(
494 "Preflight execution of {order_id_clone} with session id {} and {} mcycles completed",
495 res.id,
496 res.stats.total_cycles / 1_000_000
497 );
498 Ok(PreflightCacheValue::Success {
499 exec_session_id: res.id,
500 cycle_count: res.stats.total_cycles,
501 image_id,
502 input_id,
503 })
504 }
505 Err(err) => {
506 let err_msg = err.to_string();
507 if err_msg.contains("Session limit exceeded")
508 || err_msg.contains("Execution stopped intentionally due to session limit")
509 {
510 tracing::debug!(
511 "Skipping order {order_id_clone} due to intentional execution limit of {exec_limit_cycles}"
512 );
513 Ok(PreflightCacheValue::Skip { cached_limit: exec_limit_cycles })
514 } else if err_msg.contains("Guest panicked") || err_msg.contains("GuestPanic") {
515 tracing::debug!(
516 "Skipping order {order_id_clone} due to guest panic: {}",
517 err_msg
518 );
519 Ok(PreflightCacheValue::Skip { cached_limit: u64::MAX })
520 } else {
521 Err(OrderPricingError::UnexpectedErr(Arc::new(err.into())))
522 }
523 }
524 }
525 })
526 .await
527 .map_err(|e| (*e).clone())?;
528
529 Ok(result)
530 }
531
532 async fn preflight_journal(&self, exec_session_id: &str) -> Result<Vec<u8>, OrderPricingError> {
534 self.prover()
535 .get_preflight_journal(exec_session_id)
536 .await
537 .map_err(|e| OrderPricingError::UnexpectedErr(Arc::new(e.into())))?
538 .ok_or_else(|| {
539 OrderPricingError::UnexpectedErr(Arc::new(anyhow::anyhow!(
540 "Preflight journal not found"
541 )))
542 })
543 }
544 async fn price_order(
545 &self,
546 order: &mut OrderRequest,
547 ) -> Result<OrderPricingOutcome, OrderPricingError> {
548 let order_id = order.id();
549 tracing::debug!("Pricing order {order_id}");
550
551 let now = now_timestamp();
552
553 let lock_expired = order.fulfillment_type == FulfillmentType::FulfillAfterLockExpire;
556
557 let expiration = order.expiry();
558 let lockin_collateral =
559 if lock_expired { U256::ZERO } else { U256::from(order.request.offer.lockCollateral) };
560
561 if expiration <= now {
562 return Ok(Skip { reason: "order has expired".to_string() });
563 };
564
565 let config = self.market_config()?;
566 let min_deadline = config.min_deadline;
567 let denied_addresses_opt = self.denied_requestor_addresses()?;
568
569 let seconds_left = expiration.saturating_sub(now);
571 if seconds_left <= min_deadline {
572 return Ok(Skip {
573 reason: format!(
574 "order expires in {seconds_left} seconds with min_deadline {min_deadline}"
575 ),
576 });
577 }
578
579 if let Some(outcome) = self.check_requestor_allowed(order, denied_addresses_opt.as_ref())? {
581 return Ok(outcome);
582 }
583
584 if !self.supported_selectors().is_supported(order.request.requirements.selector) {
585 return Ok(Skip {
586 reason: format!(
587 "unsupported selector requirement. Requested: {:x}. Supported: {:?}",
588 order.request.requirements.selector,
589 self.supported_selectors()
590 .selectors
591 .iter()
592 .map(|(k, v)| format!("{k:x} ({v:?})"))
593 .collect::<Vec<_>>()
594 ),
595 });
596 };
597
598 let max_collateral: U256 =
601 parse_units(&config.max_collateral, self.collateral_token_decimals())
602 .context("Failed to parse max_collateral")?
603 .into();
604
605 if !lock_expired && lockin_collateral > max_collateral {
606 return Ok(Skip {
607 reason: format!(
608 "order collateral requirement exceeds max_collateral config {} > {}",
609 self.format_collateral(lockin_collateral),
610 self.format_collateral(max_collateral),
611 ),
612 });
613 }
614
615 if let Some(outcome) = self.check_request_available(order).await? {
617 return Ok(outcome);
618 }
619
620 let gas_price = self.current_gas_price().await?;
625 let order_gas = if lock_expired {
626 U256::from(
628 estimate_gas_to_fulfill(&config, self.supported_selectors(), &order.request)
629 .await?,
630 )
631 } else {
632 U256::from(
633 estimate_gas_to_lock(&config, order).await?
634 + estimate_gas_to_fulfill(&config, self.supported_selectors(), &order.request)
635 .await?,
636 )
637 };
638 let order_gas_cost = U256::from(gas_price) * order_gas;
639 tracing::debug!(
640 "Estimated {order_gas} gas to {} order {order_id}; {} ether @ {} gwei",
641 if lock_expired { "fulfill" } else { "lock and fulfill" },
642 format_ether(order_gas_cost),
643 format_units(gas_price, "gwei").unwrap()
644 );
645
646 if let Some(outcome) = self
647 .check_available_balances(order, order_gas_cost, lock_expired, lockin_collateral)
648 .await?
649 {
650 return Ok(outcome);
651 }
652
653 let (exec_limit_cycles, prove_limit, prove_limit_reason) =
655 self.calculate_exec_limits(order, order_gas_cost)?;
656
657 if prove_limit < 2 {
658 return Ok(Skip {
662 reason: format!("cycle limit hit from max reward: {} cycles", prove_limit),
663 });
664 }
665
666 tracing::debug!(
667 "Starting preflight execution of {order_id} with limit of {} cycles (~{} mcycles)",
668 exec_limit_cycles,
669 exec_limit_cycles / 1_000_000
670 );
671
672 let predicate_data = order.request.requirements.predicate.data.to_vec();
674 let cache_key = match order.request.input.inputType {
675 RequestInputType::Url => {
676 let input_url = std::str::from_utf8(&order.request.input.data)
677 .context("input url is not utf8")
678 .map_err(|e| OrderPricingError::FetchInputErr(Arc::new(e)))?
679 .to_string();
680 PreflightCacheKey { predicate_data, input: InputCacheKey::Url(input_url) }
681 }
682 RequestInputType::Inline => {
683 let mut hasher = Sha256::new();
685 sha2::Digest::update(&mut hasher, &order.request.input.data);
686 let input_hash: [u8; 32] = hasher.finalize().into();
687 PreflightCacheKey { predicate_data, input: InputCacheKey::Hash(input_hash) }
688 }
689 RequestInputType::__Invalid => {
690 return Err(OrderPricingError::UnexpectedErr(Arc::new(anyhow::anyhow!(
691 "Unknown input type: {:?}",
692 order.request.input.inputType
693 ))));
694 }
695 };
696
697 let preflight_result = loop {
698 let result =
699 self.preflight_execute(order, exec_limit_cycles, cache_key.clone()).await?;
700 if let PreflightCacheValue::Skip { cached_limit } = result {
701 if cached_limit < exec_limit_cycles {
702 self.preflight_cache().invalidate(&cache_key).await;
703 continue;
704 }
705 }
706 break result;
707 };
708
709 let (exec_session_id, cycle_count, image_id) = match preflight_result {
710 PreflightCacheValue::Success { exec_session_id, cycle_count, image_id, input_id } => {
711 tracing::debug!(
712 "Using preflight result for {order_id}: session id {} with {} mcycles",
713 exec_session_id,
714 cycle_count / 1_000_000
715 );
716
717 order.image_id = Some(image_id.clone());
719 order.input_id = Some(input_id.clone());
720
721 (exec_session_id, cycle_count, image_id)
722 }
723 PreflightCacheValue::Skip { cached_limit } => {
724 return Ok(Skip {
725 reason: format!(
726 "order preflight execution limit hit with {cached_limit} cycles - limited by {prove_limit_reason}"
727 ),
728 });
729 }
730 };
731
732 if cycle_count > prove_limit {
734 let config_info = match &prove_limit_reason {
737 ProveLimitReason::EthPricing { max_price, gas_cost, mcycle_price_eth } => {
738 let max_price_gas_adjusted = max_price.saturating_sub(*gas_cost);
739 let required_price_per_mcycle = max_price_gas_adjusted
740 .saturating_mul(ONE_MILLION)
741 / U256::from(cycle_count);
742 let required_price_per_mcycle_ignore_gas =
743 max_price.saturating_mul(ONE_MILLION) / U256::from(cycle_count);
744 format!(
745 "min_mcycle_price set to {} ETH/Mcycle in config, order requires min_mcycle_price <= {} ETH/Mcycle to be considered (gas cost: {} ETH, ignoring gas requires min {} ETH/Mcycle)",
746 format_ether(*mcycle_price_eth),
747 format_ether(required_price_per_mcycle),
748 format_ether(*gas_cost),
749 format_ether(required_price_per_mcycle_ignore_gas)
750 )
751 }
752 ProveLimitReason::CollateralPricing { mcycle_price_collateral, .. } => {
753 let reward =
754 order.request.offer.collateral_reward_if_locked_and_not_fulfilled();
755 let required_collateral_price =
756 reward.saturating_mul(ONE_MILLION) / U256::from(cycle_count);
757 format!(
758 "min_mcycle_price_collateral_token set to {} ZKC/Mcycle in config, order requires min_mcycle_price_collateral_token <= {} ZKC/Mcycle to be considered",
759 mcycle_price_collateral,
760 self.format_collateral(required_collateral_price)
761 )
762 }
763 ProveLimitReason::ConfigCap { max_mcycles } => {
764 let required_mcycles = cycle_count.div_ceil(1_000_000);
765 format!(
766 "max_mcycle_limit set to {} Mcycles in config, order requires max_mcycle_limit >= {} Mcycles to be considered",
767 max_mcycles, required_mcycles
768 )
769 }
770 ProveLimitReason::DeadlineCap { time_remaining_secs, peak_prove_khz } => {
771 let denom = time_remaining_secs.saturating_mul(1_000);
772 let required_khz = cycle_count.div_ceil(denom);
773 format!(
774 "peak_prove_khz set to {} kHz in config, order requires peak_prove_khz >= {} kHz to be considered",
775 peak_prove_khz, required_khz
776 )
777 }
778 };
779
780 return Ok(Skip {
781 reason: format!(
782 "order with {cycle_count} cycles above limit of {prove_limit} cycles - {config_info}"
783 ),
784 });
785 }
786
787 order.total_cycles = Some(cycle_count);
788
789 if config.min_mcycle_limit > 0 {
790 let min_cycles = config.min_mcycle_limit.saturating_mul(1_000_000);
791 if cycle_count < min_cycles {
792 tracing::debug!(
793 "Order {order_id} skipped due to min_mcycle_limit config: {} cycles < {} cycles",
794 cycle_count,
795 min_cycles
796 );
797 return Ok(Skip {
798 reason: format!(
799 "order with {} cycles below min limit of {} cycles - min_mcycle_limit set to {} Mcycles in config",
800 cycle_count,
801 min_cycles,
802 config.min_mcycle_limit,
803 ),
804 });
805 }
806 }
807
808 let journal = self.preflight_journal(&exec_session_id).await?;
809 let order_predicate_type = order.request.requirements.predicate.predicateType;
810 if matches!(order_predicate_type, PredicateType::PrefixMatch | PredicateType::DigestMatch)
811 && journal.len() > config.max_journal_bytes
812 {
813 return Ok(OrderPricingOutcome::Skip {
814 reason: format!(
815 "order journal larger than set limit ({} > {})",
816 journal.len(),
817 config.max_journal_bytes
818 ),
819 });
820 }
821
822 if is_blake3_groth16_selector(order.request.requirements.selector) && journal.len() != 32 {
824 tracing::info!(
825 "Order {order_id} journal is not 32 bytes for blake3 groth16 selector, skipping",
826 );
827 return Ok(Skip {
828 reason: "blake3 groth16 selector requires 32 byte journal".to_string(),
829 });
830 }
831
832 let predicate = Predicate::try_from(order.request.requirements.predicate.clone())
834 .map_err(|e| OrderPricingError::RequestError(Arc::new(e.into())))?;
835 let eval_data = if is_blake3_groth16_selector(order.request.requirements.selector) {
836 FulfillmentData::None
838 } else {
839 FulfillmentData::from_image_id_and_journal(
840 Risc0Digest::from_hex(image_id).context("Failed to parse image ID")?,
841 journal,
842 )
843 };
844 if predicate.eval(&eval_data).is_none() {
845 return Ok(Skip { reason: "order predicate check failed".to_string() });
846 }
847
848 if lock_expired {
850 let price = order.request.offer.collateral_reward_if_locked_and_not_fulfilled();
852 let mcycle_price_in_collateral_tokens =
853 price.saturating_mul(ONE_MILLION) / U256::from(cycle_count);
854
855 let config_min_mcycle_price_collateral_tokens = parse_units(
856 &config.min_mcycle_price_collateral_token,
857 self.collateral_token_decimals(),
858 )
859 .context("Failed to parse min_mcycle_price_collateral_token")?
860 .into();
861
862 tracing::debug!(
863 "Order price: {} (collateral tokens) - cycles: {} - mcycle price: {} (collateral tokens), config_min_mcycle_price_collateral_tokens: {} (collateral tokens)",
864 format_ether(price),
865 cycle_count,
866 self.format_collateral(mcycle_price_in_collateral_tokens),
867 self.format_collateral(config_min_mcycle_price_collateral_tokens),
868 );
869
870 if mcycle_price_in_collateral_tokens < config_min_mcycle_price_collateral_tokens {
872 return Ok(Skip {
873 reason: format!(
874 "slashed collateral reward too low. {} (collateral reward) < config mcycle_price_collateral_token {}",
875 self.format_collateral(mcycle_price_in_collateral_tokens),
876 self.format_collateral(config_min_mcycle_price_collateral_tokens),
877 ),
878 });
879 }
880
881 Ok(OrderPricingOutcome::ProveAfterLockExpire {
882 total_cycles: cycle_count,
883 lock_expire_timestamp_secs: order.request.offer.rampUpStart
884 + order.request.offer.lockTimeout as u64,
885 expiry_secs: order.request.offer.rampUpStart + order.request.offer.timeout as u64,
886 mcycle_price: mcycle_price_in_collateral_tokens,
887 config_min_mcycle_price: config_min_mcycle_price_collateral_tokens,
888 })
889 } else {
890 let config_min_mcycle_price: U256 = parse_units(&config.min_mcycle_price, 18)
892 .context("Failed to parse min_mcycle_price")?
893 .into();
894
895 let order_id = order.id();
896
897 let mcycle_price_min = U256::from(order.request.offer.minPrice)
898 .saturating_sub(order_gas_cost)
899 .saturating_mul(ONE_MILLION)
900 / U256::from(cycle_count);
901 let mcycle_price_max = U256::from(order.request.offer.maxPrice)
902 .saturating_sub(order_gas_cost)
903 .saturating_mul(ONE_MILLION)
904 / U256::from(cycle_count);
905
906 tracing::debug!(
907 "Order {order_id} price: {}-{} ETH, {}-{} ETH per mcycle, {} collateral required, {} ETH gas cost",
908 format_ether(U256::from(order.request.offer.minPrice)),
909 format_ether(U256::from(order.request.offer.maxPrice)),
910 format_ether(mcycle_price_min),
911 format_ether(mcycle_price_max),
912 self.format_collateral(order.request.offer.lockCollateral),
913 format_ether(order_gas_cost),
914 );
915
916 if mcycle_price_max < config_min_mcycle_price {
918 return Ok(OrderPricingOutcome::Skip {
919 reason: format!(
920 "order max price {} is less than mcycle_price config {}",
921 format_ether(U256::from(order.request.offer.maxPrice)),
922 format_ether(config_min_mcycle_price),
923 ),
924 });
925 }
926
927 let current_mcycle_price = order
928 .request
929 .offer
930 .price_at(now_timestamp())
931 .context("Failed to get current mcycle price")?;
932 let (target_mcycle_price, target_timestamp_secs) =
933 if mcycle_price_min >= config_min_mcycle_price {
934 tracing::info!(
935 "Selecting order {order_id} at price {} - ASAP",
936 format_ether(current_mcycle_price)
937 );
938 (mcycle_price_min, 0) } else {
940 let target_min_price = config_min_mcycle_price
941 .saturating_mul(U256::from(cycle_count))
942 .div_ceil(ONE_MILLION)
943 + order_gas_cost;
944 tracing::debug!(
945 "Order {order_id} minimum profitable price: {} ETH",
946 format_ether(target_min_price)
947 );
948
949 let target_time = order
950 .request
951 .offer
952 .time_at_price(target_min_price)
953 .context("Failed to get target price timestamp")?;
954 (target_min_price, target_time)
955 };
956
957 let expiry_secs =
958 order.request.offer.rampUpStart + order.request.offer.lockTimeout as u64;
959
960 Ok(OrderPricingOutcome::Lock {
961 total_cycles: cycle_count,
962 target_timestamp_secs,
963 expiry_secs,
964 target_mcycle_price,
965 max_mcycle_price: mcycle_price_max,
966 config_min_mcycle_price,
967 current_mcycle_price,
968 })
969 }
970 }
971
972 fn calculate_exec_limits(
981 &self,
982 order: &OrderRequest,
983 order_gas_cost: U256,
984 ) -> Result<(u64, u64, ProveLimitReason), OrderPricingError> {
985 let order_id = order.id();
987 let is_fulfill_after_lock_expire =
988 order.fulfillment_type == FulfillmentType::FulfillAfterLockExpire;
989 let now = now_timestamp();
990 let request_expiration = order.expiry();
991 let lock_expiry = order.request.lock_expires_at();
992 let order_expiry = order.request.expires_at();
993 let config = self.market_config()?;
994 let min_mcycle_price = parse_units(&config.min_mcycle_price, 18)
995 .context("Failed to parse min_mcycle_price")?
996 .into();
997 let min_mcycle_price_collateral_tokens: U256 = parse_units(
998 &config.min_mcycle_price_collateral_token,
999 self.collateral_token_decimals(),
1000 )
1001 .context("Failed to parse min_mcycle_price_collateral_token")?
1002 .into();
1003
1004 let collateral_based_limit = if min_mcycle_price_collateral_tokens == U256::ZERO {
1006 tracing::info!("min_mcycle_price_collateral_token is 0, setting unlimited exec limit");
1007 u64::MAX
1008 } else {
1009 let price = order.request.offer.collateral_reward_if_locked_and_not_fulfilled();
1010
1011 let initial_collateral_based_limit =
1012 (price.saturating_mul(ONE_MILLION).div_ceil(min_mcycle_price_collateral_tokens))
1013 .try_into()
1014 .unwrap_or(u64::MAX);
1015
1016 tracing::trace!(
1017 "Order {order_id} initial collateral based limit: {initial_collateral_based_limit}"
1018 );
1019 initial_collateral_based_limit
1020 };
1021
1022 let mut preflight_limit = collateral_based_limit;
1023 let mut prove_limit = collateral_based_limit;
1024 let mut prove_limit_reason = ProveLimitReason::CollateralPricing {
1025 mcycle_price_collateral: self.format_collateral(min_mcycle_price_collateral_tokens),
1026 collateral_reward: self.format_collateral(
1027 order.request.offer.collateral_reward_if_locked_and_not_fulfilled(),
1028 ),
1029 };
1030
1031 if !is_fulfill_after_lock_expire {
1033 let eth_based_limit: u64 = if min_mcycle_price == U256::ZERO {
1035 tracing::info!("min_mcycle_price is 0, setting unlimited exec limit");
1036 u64::MAX
1037 } else {
1038 let limit: U256 = U256::from(order.request.offer.maxPrice)
1039 .saturating_sub(order_gas_cost)
1040 .saturating_mul(ONE_MILLION)
1041 / min_mcycle_price;
1042 limit.try_into().unwrap_or(u64::MAX)
1043 };
1044
1045 if eth_based_limit > collateral_based_limit {
1046 tracing::debug!("Order {order_id} eth based limit ({eth_based_limit}) > collateral based limit ({collateral_based_limit}), using eth based limit for both preflight and prove");
1048 preflight_limit = eth_based_limit;
1049 prove_limit = eth_based_limit;
1050 } else {
1051 tracing::debug!("Order {order_id} eth based limit ({eth_based_limit}) < collateral based limit ({collateral_based_limit}), using eth based limit for prove");
1053 prove_limit = eth_based_limit;
1054 }
1055 prove_limit_reason = ProveLimitReason::EthPricing {
1056 max_price: U256::from(order.request.offer.maxPrice),
1057 gas_cost: order_gas_cost,
1058 mcycle_price_eth: min_mcycle_price,
1059 };
1060 }
1061
1062 debug_assert!(
1063 preflight_limit >= prove_limit,
1064 "preflight_limit ({preflight_limit}) < prove_limit ({prove_limit})",
1065 );
1066
1067 let client_addr = order.request.client_address();
1070 let skip_mcycle_limit = self.is_priority_requestor(&client_addr);
1071 if skip_mcycle_limit {
1072 tracing::debug!("Order {order_id} exec limit config ignored due to client {} being part of priority requestors.", client_addr);
1073 }
1074
1075 if !skip_mcycle_limit {
1076 let config_cycle_limit = config.max_mcycle_limit.saturating_mul(1_000_000);
1077 if prove_limit > config_cycle_limit {
1078 tracing::debug!(
1079 "Order {order_id} prove limit capped by max_mcycle_limit config: {} -> {} cycles",
1080 prove_limit,
1081 config_cycle_limit
1082 );
1083 prove_limit = config_cycle_limit;
1084 preflight_limit = config_cycle_limit;
1085 prove_limit_reason =
1086 ProveLimitReason::ConfigCap { max_mcycles: config.max_mcycle_limit };
1087 } else if preflight_limit > config_cycle_limit {
1088 preflight_limit = config_cycle_limit;
1089 }
1090 }
1091
1092 if let Some(peak_prove_khz) = config.peak_prove_khz {
1094 let prove_window = request_expiration.saturating_sub(now);
1095 let prove_deadline_limit = calculate_max_cycles_for_time(peak_prove_khz, prove_window);
1096 if prove_limit > prove_deadline_limit {
1097 tracing::debug!("Order {order_id} prove limit capped by deadline: {} -> {} cycles ({:.1}s at {} peak_prove_khz)", prove_limit, prove_deadline_limit, prove_window, peak_prove_khz);
1098 prove_limit = prove_deadline_limit;
1099 prove_limit_reason = ProveLimitReason::DeadlineCap {
1100 time_remaining_secs: prove_window,
1101 peak_prove_khz,
1102 };
1103 }
1104
1105 let new_preflight_limit = if !is_fulfill_after_lock_expire {
1107 let fulfill_after_expiry_window = order_expiry.saturating_sub(lock_expiry);
1108 let fulfill_after_expiry_limit =
1109 calculate_max_cycles_for_time(peak_prove_khz, fulfill_after_expiry_window);
1110 std::cmp::max(prove_deadline_limit, fulfill_after_expiry_limit)
1111 } else {
1112 prove_deadline_limit
1113 };
1114
1115 if preflight_limit > new_preflight_limit {
1116 tracing::debug!("Order {order_id} preflight limit capped by deadline: {} -> {} cycles ({:.1}s at {} peak_prove_khz)", preflight_limit, new_preflight_limit, prove_window, peak_prove_khz);
1117 preflight_limit = new_preflight_limit;
1118 }
1119 }
1120
1121 tracing::trace!(
1122 "Order {order_id} final limits - preflight: {} cycles, prove: {} cycles (reason: {})",
1123 preflight_limit,
1124 prove_limit,
1125 prove_limit_reason
1126 );
1127
1128 debug_assert!(
1129 preflight_limit >= prove_limit,
1130 "preflight_limit ({preflight_limit}) < prove_limit ({prove_limit})",
1131 );
1132
1133 Ok((preflight_limit, prove_limit, prove_limit_reason))
1134 }
1135}
1136
1137fn calculate_max_cycles_for_time(peak_prove_khz: u64, time_remaining_secs: u64) -> u64 {
1138 peak_prove_khz.saturating_mul(time_remaining_secs) * 1_000
1139}
1140
1141fn format_expiries(request: &ProofRequest) -> String {
1142 let now: i64 = now_timestamp().try_into().unwrap();
1143 let lock_expires_at: i64 = request.lock_expires_at().try_into().unwrap();
1144 let lock_expires_delta = lock_expires_at - now;
1145 let lock_expires_delta_str = if lock_expires_delta > 0 {
1146 format!("{lock_expires_delta} seconds from now")
1147 } else {
1148 format!("{} seconds ago", lock_expires_delta.abs())
1149 };
1150 let expires_at: i64 = request.expires_at().try_into().unwrap();
1151 let expires_delta = expires_at - now;
1152 let expires_delta_str = if expires_delta > 0 {
1153 format!("{expires_delta} seconds from now")
1154 } else {
1155 format!("{} seconds ago", expires_delta.abs())
1156 };
1157 format!("Lock expires {lock_expires_delta_str}, expires {expires_delta_str}")
1158}
1159
1160async fn estimate_gas_to_fulfill(
1161 config: &MarketConfig,
1162 supported_selectors: &SupportedSelectors,
1163 request: &ProofRequest,
1164) -> anyhow::Result<u64> {
1165 use crate::selector::ProofType;
1166
1167 let selector = request.requirements.selector;
1168 if !supported_selectors.is_supported(selector) {
1169 anyhow::bail!("unsupported selector requirement: {selector:x}");
1170 }
1171
1172 let mut estimate = config.fulfill_gas_estimate;
1173
1174 estimate += u64::try_from(
1176 request
1177 .requirements
1178 .callback
1179 .as_option()
1180 .map(|callback| callback.gasLimit)
1181 .unwrap_or(alloy::primitives::aliases::U96::ZERO),
1182 )?;
1183
1184 estimate += match supported_selectors.proof_type(selector).context("unsupported selector")? {
1185 ProofType::Any | ProofType::Inclusion => 0,
1186 ProofType::Groth16 | ProofType::Blake3Groth16 => config.groth16_verify_gas_estimate,
1187 };
1188
1189 Ok(estimate)
1190}
1191
1192const ERC1271_MAX_GAS_FOR_CHECK: u64 = 100000;
1194
1195async fn estimate_gas_to_lock(config: &MarketConfig, order: &OrderRequest) -> anyhow::Result<u64> {
1196 let mut estimate = config.lockin_gas_estimate;
1197
1198 if order.request.is_smart_contract_signed() {
1199 estimate += ERC1271_MAX_GAS_FOR_CHECK;
1200 }
1201
1202 Ok(estimate)
1203}