1use std::{cmp::max, future::Future, str::FromStr, sync::Arc, time::Duration};
16
17use alloy::{
18 network::{Ethereum, EthereumWallet, TxSigner},
19 primitives::{Address, Bytes, U256},
20 providers::{fillers::ChainIdFiller, DynProvider, Provider, ProviderBuilder},
21 rpc::client::RpcClient,
22 signers::{
23 local::{LocalSignerError, PrivateKeySigner},
24 Signer,
25 },
26 transports::{http::Http, layers::FallbackLayer},
27};
28use alloy_primitives::{utils::format_ether, Signature, B256};
29use anyhow::{anyhow, bail, Context, Result};
30use risc0_aggregation::SetInclusionReceipt;
31use risc0_ethereum_contracts::set_verifier::SetVerifierService;
32use risc0_zkvm::{sha::Digest, ReceiptClaim};
33use tower::ServiceBuilder;
34use url::Url;
35
36use crate::{
37 balance_alerts_layer::{BalanceAlertConfig, BalanceAlertLayer},
38 contracts::{
39 boundless_market::{BoundlessMarketService, MarketError},
40 Fulfillment, FulfillmentData, ProofRequest, RequestError,
41 },
42 deployments::Deployment,
43 dynamic_gas_filler::{DynamicGasFiller, PriorityMode},
44 indexer_client::IndexerClient,
45 nonce_layer::NonceProvider,
46 order_stream_client::OrderStreamClient,
47 price_provider::{
48 MarketPricing, MarketPricingConfigBuilder, PriceProviderArc, StandardPriceProvider,
49 },
50 prover_utils::local_executor::LocalExecutor,
51 request_builder::{
52 FinalizerConfigBuilder, OfferLayer, OfferLayerConfigBuilder, ParameterizationMode,
53 PreflightLayer, RequestBuilder, RequestIdLayer, RequestIdLayerConfigBuilder,
54 StandardRequestBuilder, StandardRequestBuilderBuilderError, StorageLayer,
55 StorageLayerConfigBuilder,
56 },
57 storage::{
58 StandardDownloader, StandardUploader, StorageDownloader, StorageError, StorageUploader,
59 StorageUploaderConfig,
60 },
61 util::NotProvided,
62};
63
64#[derive(Debug, Clone, Copy, PartialEq, Eq)]
66#[non_exhaustive]
67pub enum FundingMode {
68 Always,
70
71 Never,
76
77 AvailableBalance,
81
82 BelowThreshold(U256),
87
88 MinMaxBalance {
94 min_balance: U256,
96 max_balance: U256,
98 },
99}
100#[derive(Clone)]
102pub struct ClientBuilder<U, D, S> {
103 deployment: Option<Deployment>,
104 rpc_url: Option<Url>,
105 rpc_urls: Vec<Url>,
106 signer: Option<S>,
107 uploader: Option<U>,
108 downloader: Option<D>,
109 tx_timeout: Option<std::time::Duration>,
110 balance_alerts: Option<BalanceAlertConfig>,
111 price_provider: Option<PriceProviderArc>,
114 pub offer_layer_config: OfferLayerConfigBuilder,
116 pub storage_layer_config: StorageLayerConfigBuilder,
118 pub request_id_layer_config: RequestIdLayerConfigBuilder,
120 pub request_finalizer_config: FinalizerConfigBuilder,
122 pub funding_mode: FundingMode,
130 pub skip_preflight: Option<bool>,
136}
137
138impl<U, D, S> Default for ClientBuilder<U, D, S> {
139 fn default() -> Self {
140 Self {
141 deployment: None,
142 rpc_url: None,
143 rpc_urls: Vec::new(),
144 signer: None,
145 uploader: None,
146 downloader: None,
147 tx_timeout: None,
148 balance_alerts: None,
149 price_provider: None,
150 offer_layer_config: Default::default(),
151 storage_layer_config: Default::default(),
152 request_id_layer_config: Default::default(),
153 request_finalizer_config: Default::default(),
154 funding_mode: FundingMode::Always,
155 skip_preflight: None,
156 }
157 }
158}
159
160impl ClientBuilder<NotProvided, NotProvided, NotProvided> {
161 pub fn new() -> Self {
163 #[cfg(feature = "gcs")]
167 {
168 let _ = rustls::crypto::aws_lc_rs::default_provider().install_default();
169 }
170
171 Self::default()
172 }
173}
174
175pub trait ClientProviderBuilder {
177 type Error;
179
180 fn build_provider(
182 &self,
183 rpc_urls: Vec<Url>,
184 ) -> impl Future<Output = Result<DynProvider, Self::Error>>;
185
186 fn signer_address(&self) -> Option<Address>;
188}
189
190impl<U, D, S> ClientBuilder<U, D, S> {
191 fn collect_rpc_urls(&self) -> Result<Vec<Url>, anyhow::Error> {
194 let mut seen = std::collections::HashSet::new();
196 if let Some(ref rpc_url) = self.rpc_url {
197 seen.insert(rpc_url.clone());
198 }
199 seen.extend(self.rpc_urls.iter().cloned());
200 let all_urls: Vec<Url> = seen.into_iter().collect();
201
202 if all_urls.is_empty() {
203 bail!("no RPC URLs provided, set at least one using with_rpc_url or with_rpc_urls");
204 }
205
206 Ok(all_urls)
207 }
208
209 fn build_fallback_transport(&self, urls: &[Url]) -> Result<RpcClient, anyhow::Error> {
211 let transports: Vec<Http<_>> = urls.iter().map(|url| Http::new(url.clone())).collect();
213
214 let active_count = std::num::NonZeroUsize::new(transports.len())
216 .context("at least one transport is required")?;
217 let fallback_layer = FallbackLayer::default().with_active_transport_count(active_count);
218
219 tracing::info!(
220 "Configuring provider with fallback support: {} URLs: {:?}",
221 urls.len(),
222 urls
223 );
224
225 let transport = ServiceBuilder::new().layer(fallback_layer).service(transports);
227
228 Ok(RpcClient::builder().transport(transport, false))
230 }
231}
232
233impl<U, D, S> ClientProviderBuilder for ClientBuilder<U, D, S>
234where
235 S: TxSigner<Signature> + Send + Sync + Clone + 'static,
236{
237 type Error = anyhow::Error;
238
239 async fn build_provider(&self, rpc_urls: Vec<Url>) -> Result<DynProvider, Self::Error> {
240 let provider = match self.signer.clone() {
241 Some(signer) => {
242 let dynamic_gas_filler = DynamicGasFiller::new(
243 20, PriorityMode::Medium,
245 signer.address(),
246 );
247
248 let balance_alerts = self.balance_alerts.clone().unwrap_or_default();
250
251 let base_provider = if rpc_urls.len() > 1 {
252 let client = self.build_fallback_transport(&rpc_urls)?;
254 ProviderBuilder::new()
255 .disable_recommended_fillers()
256 .filler(ChainIdFiller::default())
257 .filler(dynamic_gas_filler)
258 .layer(BalanceAlertLayer::new(balance_alerts))
259 .connect_client(client)
260 } else {
261 let url = rpc_urls.first().unwrap();
263 ProviderBuilder::new()
264 .disable_recommended_fillers()
265 .filler(ChainIdFiller::default())
266 .filler(dynamic_gas_filler)
267 .layer(BalanceAlertLayer::new(balance_alerts))
268 .connect(url.as_str())
269 .await
270 .with_context(|| format!("failed to connect provider to {url}"))?
271 };
272
273 NonceProvider::new(base_provider, EthereumWallet::from(signer)).erased()
274 }
275 None => {
276 if rpc_urls.len() > 1 {
277 let client = self.build_fallback_transport(&rpc_urls)?;
279 ProviderBuilder::new().connect_client(client).erased()
280 } else {
281 let url = rpc_urls.first().context("no RPC URL provided")?;
283 ProviderBuilder::new()
284 .connect(url.as_str())
285 .await
286 .with_context(|| format!("failed to connect provider to {url}"))?
287 .erased()
288 }
289 }
290 };
291 Ok(provider)
292 }
293
294 fn signer_address(&self) -> Option<Address> {
295 self.signer.as_ref().map(|signer| signer.address())
296 }
297}
298
299impl<U, D> ClientProviderBuilder for ClientBuilder<U, D, NotProvided> {
300 type Error = anyhow::Error;
301
302 async fn build_provider(&self, rpc_urls: Vec<Url>) -> Result<DynProvider, Self::Error> {
303 let provider = if rpc_urls.len() > 1 {
304 let client = self.build_fallback_transport(&rpc_urls)?;
306 ProviderBuilder::new().connect_client(client).erased()
307 } else {
308 let url = rpc_urls.first().unwrap();
310 ProviderBuilder::new()
311 .connect(url.as_str())
312 .await
313 .with_context(|| format!("failed to connect provider to {url}"))?
314 .erased()
315 };
316 Ok(provider)
317 }
318
319 fn signer_address(&self) -> Option<Address> {
320 None
321 }
322}
323
324impl<U, S> ClientBuilder<U, NotProvided, S> {
325 pub async fn build(
327 self,
328 ) -> Result<
329 Client<
330 DynProvider,
331 U,
332 StandardDownloader,
333 StandardRequestBuilder<DynProvider, U, StandardDownloader>,
334 S,
335 >,
336 >
337 where
338 U: Clone,
339 ClientBuilder<U, StandardDownloader, S>: ClientProviderBuilder<Error = anyhow::Error>,
340 {
341 self.with_downloader(StandardDownloader::new().await).build().await
342 }
343}
344
345impl<U, D: StorageDownloader, S> ClientBuilder<U, D, S> {
346 pub async fn build(
348 self,
349 ) -> Result<Client<DynProvider, U, D, StandardRequestBuilder<DynProvider, U, D>, S>>
350 where
351 U: Clone,
352 D: Clone,
353 Self: ClientProviderBuilder<Error = anyhow::Error>,
354 {
355 let all_urls = self.collect_rpc_urls()?;
356 let first_rpc_url = all_urls.first().cloned().unwrap();
358 let provider = self.build_provider(all_urls).await?;
359
360 let chain_id =
362 provider.get_chain_id().await.context("failed to query chain ID from RPC provider")?;
363 let deployment =
364 self.deployment.clone().or_else(|| Deployment::from_chain_id(chain_id)).with_context(
365 || format!("no deployment provided for unknown chain_id {chain_id}"),
366 )?;
367
368 if deployment.market_chain_id.map(|id| id != chain_id).unwrap_or(false) {
370 bail!("RPC url does not match specified Boundless deployment: {chain_id} (RPC) != {} (Boundless)", deployment.market_chain_id.unwrap());
371 }
372
373 let boundless_market = BoundlessMarketService::new(
375 deployment.boundless_market_address,
376 provider.clone(),
377 self.signer_address().unwrap_or(Address::ZERO),
378 );
379 let set_verifier = SetVerifierService::new(
380 deployment.set_verifier_address,
381 provider.clone(),
382 self.signer_address().unwrap_or(Address::ZERO),
383 );
384
385 let downloader = self.downloader.unwrap();
387
388 let offchain_client = deployment
390 .order_stream_url
391 .as_ref()
392 .map(|order_stream_url| {
393 let url = Url::parse(order_stream_url.as_ref())
394 .context("failed to parse order_stream_url")?;
395 anyhow::Ok(OrderStreamClient::new(
396 url,
397 deployment.boundless_market_address,
398 chain_id,
399 ))
400 })
401 .transpose()?;
402
403 let price_provider: Option<PriceProviderArc> =
405 if let Some(provider) = self.price_provider.clone() {
406 Some(provider)
407 } else {
408 let market_pricing = MarketPricing::new(
409 first_rpc_url,
410 MarketPricingConfigBuilder::default()
411 .deployment(deployment.clone())
412 .build()
413 .with_context(|| {
414 format!(
415 "Failed to build MarketPricingConfig for deployment: {deployment:?}",
416 )
417 })?,
418 );
419 if let Some(url_str) = deployment.indexer_url.as_ref() {
420 let url = Url::parse(url_str.as_ref()).with_context(|| {
421 format!("Failed to parse indexer URL from deployment: {}", url_str)
422 })?;
423 let indexer_client = IndexerClient::new(url).with_context(|| {
424 format!(
425 "Failed to create indexer client from deployment indexer URL: {}",
426 url_str
427 )
428 })?;
429 Some(Arc::new(
430 StandardPriceProvider::new(indexer_client).with_fallback(market_pricing),
431 ))
432 } else {
433 Some(Arc::new(StandardPriceProvider::<MarketPricing, MarketPricing>::new(
434 market_pricing,
435 )))
436 }
437 };
438
439 let request_builder = StandardRequestBuilder::builder()
441 .storage_layer(StorageLayer::new(
442 self.uploader.clone(),
443 self.storage_layer_config.build()?,
444 ))
445 .preflight_layer(PreflightLayer::new(
446 LocalExecutor::default(),
447 Some(downloader.clone()),
448 ))
449 .offer_layer(
450 OfferLayer::new(provider.clone(), self.offer_layer_config.build()?)
451 .with_price_provider(price_provider),
452 )
453 .request_id_layer(RequestIdLayer::new(
454 boundless_market.clone(),
455 self.request_id_layer_config.build()?,
456 ))
457 .finalizer(self.request_finalizer_config.build()?)
458 .build()?;
459
460 let mut client = Client {
461 boundless_market,
462 set_verifier,
463 uploader: self.uploader,
464 downloader,
465 offchain_client,
466 signer: self.signer,
467 request_builder: Some(request_builder),
468 deployment,
469 funding_mode: self.funding_mode,
470 };
471
472 if let Some(timeout) = self.tx_timeout {
473 client = client.with_timeout(timeout);
474 }
475
476 if let Some(skip_preflight) = self.skip_preflight {
477 client = client.with_skip_preflight(skip_preflight);
478 }
479
480 Ok(client)
481 }
482}
483
484impl<U, D, S> ClientBuilder<U, D, S> {
485 pub fn with_deployment(self, deployment: impl Into<Option<Deployment>>) -> Self {
489 Self { deployment: deployment.into(), ..self }
490 }
491
492 pub fn with_rpc_url(self, rpc_url: Url) -> Self {
494 Self { rpc_url: Some(rpc_url), ..self }
495 }
496
497 pub fn with_funding_mode(self, funding_mode: FundingMode) -> Self {
499 Self { funding_mode, ..self }
500 }
501
502 pub fn with_parameterization_mode(self, parameterization_mode: ParameterizationMode) -> Self {
516 self.config_offer_layer(|config| config.parameterization_mode(parameterization_mode))
517 }
518
519 pub fn with_rpc_urls(self, rpc_urls: Vec<Url>) -> Self {
542 Self { rpc_urls, ..self }
543 }
544
545 pub fn with_private_key(
553 self,
554 private_key: impl Into<PrivateKeySigner>,
555 ) -> ClientBuilder<U, D, PrivateKeySigner> {
556 self.with_signer(private_key.into())
557 }
558
559 pub fn with_private_key_str(
568 self,
569 private_key: impl AsRef<str>,
570 ) -> Result<ClientBuilder<U, D, PrivateKeySigner>, LocalSignerError> {
571 Ok(self.with_signer(PrivateKeySigner::from_str(private_key.as_ref())?))
572 }
573
574 pub fn with_signer<Zi>(self, signer: impl Into<Option<Zi>>) -> ClientBuilder<U, D, Zi>
576 where
577 Zi: Signer + Clone + TxSigner<Signature> + Send + Sync + 'static,
578 {
579 ClientBuilder {
581 signer: signer.into(),
582 deployment: self.deployment,
583 uploader: self.uploader,
584 downloader: self.downloader,
585 rpc_url: self.rpc_url,
586 rpc_urls: self.rpc_urls,
587 tx_timeout: self.tx_timeout,
588 balance_alerts: self.balance_alerts,
589 price_provider: self.price_provider.clone(),
590 offer_layer_config: self.offer_layer_config,
591 storage_layer_config: self.storage_layer_config,
592 request_id_layer_config: self.request_id_layer_config,
593 request_finalizer_config: self.request_finalizer_config,
594 funding_mode: self.funding_mode,
595 skip_preflight: self.skip_preflight,
596 }
597 }
598
599 pub fn with_timeout(self, tx_timeout: impl Into<Option<Duration>>) -> Self {
601 Self { tx_timeout: tx_timeout.into(), ..self }
602 }
603
604 pub fn with_balance_alerts(self, config: impl Into<Option<BalanceAlertConfig>>) -> Self {
606 Self { balance_alerts: config.into(), ..self }
607 }
608
609 pub fn with_uploader<Z: StorageUploader>(self, uploader: Option<Z>) -> ClientBuilder<Z, D, S> {
613 ClientBuilder {
615 deployment: self.deployment,
616 rpc_url: self.rpc_url,
617 rpc_urls: self.rpc_urls,
618 signer: self.signer,
619 uploader,
620 downloader: self.downloader,
621 tx_timeout: self.tx_timeout,
622 balance_alerts: self.balance_alerts,
623 price_provider: self.price_provider.clone(),
624 request_finalizer_config: self.request_finalizer_config,
625 request_id_layer_config: self.request_id_layer_config,
626 storage_layer_config: self.storage_layer_config,
627 offer_layer_config: self.offer_layer_config,
628 funding_mode: self.funding_mode,
629 skip_preflight: self.skip_preflight,
630 }
631 }
632
633 pub fn with_downloader<Z: StorageDownloader>(self, downloader: Z) -> ClientBuilder<U, Z, S> {
635 ClientBuilder {
637 deployment: self.deployment,
638 rpc_url: self.rpc_url,
639 rpc_urls: self.rpc_urls,
640 signer: self.signer,
641 uploader: self.uploader,
642 downloader: Some(downloader),
643 tx_timeout: self.tx_timeout,
644 balance_alerts: self.balance_alerts,
645 price_provider: self.price_provider,
646 request_finalizer_config: self.request_finalizer_config,
647 request_id_layer_config: self.request_id_layer_config,
648 storage_layer_config: self.storage_layer_config,
649 offer_layer_config: self.offer_layer_config,
650 funding_mode: self.funding_mode,
651 skip_preflight: self.skip_preflight,
652 }
653 }
654
655 pub async fn with_uploader_config(
657 self,
658 config: &StorageUploaderConfig,
659 ) -> Result<ClientBuilder<StandardUploader, D, S>, StorageError> {
660 let storage_uploader = match StandardUploader::from_config(config).await {
661 Ok(storage_uploader) => Some(storage_uploader),
662 Err(StorageError::NoUploader) => None,
663 Err(e) => return Err(e),
664 };
665 Ok(self.with_uploader(storage_uploader))
666 }
667
668 pub fn with_price_provider(
687 mut self,
688 price_provider: impl Into<Option<PriceProviderArc>>,
689 ) -> Self {
690 self.price_provider = price_provider.into();
691 self
692 }
693
694 pub fn config_offer_layer(
708 mut self,
709 f: impl FnOnce(&mut OfferLayerConfigBuilder) -> &mut OfferLayerConfigBuilder,
710 ) -> Self {
711 f(&mut self.offer_layer_config);
712 self
713 }
714
715 pub fn config_request_id_layer(
726 mut self,
727 f: impl FnOnce(&mut RequestIdLayerConfigBuilder) -> &mut RequestIdLayerConfigBuilder,
728 ) -> Self {
729 f(&mut self.request_id_layer_config);
730 self
731 }
732
733 pub fn config_storage_layer(
742 mut self,
743 f: impl FnOnce(&mut StorageLayerConfigBuilder) -> &mut StorageLayerConfigBuilder,
744 ) -> Self {
745 f(&mut self.storage_layer_config);
746 self
747 }
748
749 pub fn config_request_finalizer(
751 mut self,
752 f: impl FnOnce(&mut FinalizerConfigBuilder) -> &mut FinalizerConfigBuilder,
753 ) -> Self {
754 f(&mut self.request_finalizer_config);
755 self
756 }
757
758 pub fn with_skip_preflight(self, skip: bool) -> Self {
764 Self { skip_preflight: Some(skip), ..self }
765 }
766}
767
768#[derive(Clone)]
769#[non_exhaustive]
770pub struct Client<
772 P = DynProvider,
773 U = StandardUploader,
774 D = StandardDownloader,
775 R = StandardRequestBuilder,
776 Si = PrivateKeySigner,
777> {
778 pub boundless_market: BoundlessMarketService<P>,
780 pub set_verifier: SetVerifierService<P>,
782 pub uploader: Option<U>,
786 pub downloader: D,
788 pub offchain_client: Option<OrderStreamClient>,
792 pub signer: Option<Si>,
796 pub request_builder: Option<R>,
800 pub deployment: Deployment,
802 pub funding_mode: FundingMode,
810}
811
812pub type StandardClient = Client<
814 DynProvider,
815 StandardUploader,
816 StandardDownloader,
817 StandardRequestBuilder<DynProvider, StandardUploader, StandardDownloader>,
818 PrivateKeySigner,
819>;
820
821impl<P, U, D, Si> Client<P, U, D, StandardRequestBuilder<P, U, D>, Si> {
822 fn with_skip_preflight(mut self, skip: bool) -> Self {
823 if let Some(ref mut builder) = self.request_builder {
824 builder.skip_preflight = Some(skip);
825 }
826 self
827 }
828}
829
830#[derive(thiserror::Error, Debug)]
831#[non_exhaustive]
832pub enum ClientError {
834 #[error("Storage error {0}")]
836 StorageError(#[from] StorageError),
837 #[error("Market error {0}")]
839 MarketError(#[from] MarketError),
840 #[error("RequestError {0}")]
842 RequestError(#[from] RequestError),
843 #[error("Error building RequestBuilder {0}")]
845 BuilderError(#[from] StandardRequestBuilderBuilderError),
846 #[error("Error {0}")]
848 Error(#[from] anyhow::Error),
849}
850
851impl Client<NotProvided, NotProvided, NotProvided, NotProvided, NotProvided> {
852 pub fn builder() -> ClientBuilder<NotProvided, NotProvided, NotProvided> {
854 ClientBuilder::new()
855 }
856}
857
858impl<P, D> Client<P, NotProvided, D, NotProvided, NotProvided>
859where
860 P: Provider<Ethereum> + 'static + Clone,
861 D: StorageDownloader,
862{
863 pub fn new(
865 boundless_market: BoundlessMarketService<P>,
866 set_verifier: SetVerifierService<P>,
867 downloader: D,
868 ) -> Self {
869 let boundless_market = boundless_market.clone();
870 let set_verifier = set_verifier.clone();
871 Self {
872 deployment: Deployment {
873 boundless_market_address: *boundless_market.instance().address(),
874 set_verifier_address: *set_verifier.instance().address(),
875 market_chain_id: None,
876 order_stream_url: None,
877 collateral_token_address: None,
878 verifier_router_address: None,
879 indexer_url: None,
880 deployment_block: None,
881 },
882 boundless_market,
883 set_verifier,
884 uploader: None,
885 downloader,
886 offchain_client: None,
887 signer: None,
888 request_builder: None,
889 funding_mode: FundingMode::Always,
890 }
891 }
892}
893
894fn funding_value_for_balance(balance: U256, max_price: U256, funding_mode: FundingMode) -> U256 {
897 match funding_mode {
898 FundingMode::Always => max_price,
899
900 FundingMode::Never => U256::ZERO,
901
902 FundingMode::AvailableBalance => {
903 if balance < max_price {
904 max_price.saturating_sub(balance)
905 } else {
906 U256::ZERO
907 }
908 }
909
910 FundingMode::BelowThreshold(threshold) => {
911 if balance < threshold || balance < max_price {
912 max(threshold.saturating_sub(balance), max_price.saturating_sub(balance))
913 } else {
914 U256::ZERO
915 }
916 }
917
918 FundingMode::MinMaxBalance { min_balance, max_balance } => {
919 if balance < min_balance || balance < max_price {
920 let topup = if balance < min_balance {
921 max_balance.saturating_sub(balance)
922 } else {
923 U256::ZERO
924 };
925 max(topup, max_price.saturating_sub(balance))
926 } else {
927 U256::ZERO
928 }
929 }
930 }
931}
932
933impl<P, St, D, R, Si> Client<P, St, D, R, Si>
934where
935 P: Provider<Ethereum> + 'static + Clone,
936{
937 pub fn provider(&self) -> P {
939 self.boundless_market.instance().provider().clone()
940 }
941
942 pub fn caller(&self) -> Address {
944 self.boundless_market.caller()
945 }
946
947 pub fn with_boundless_market(self, boundless_market: BoundlessMarketService<P>) -> Self {
949 Self {
950 deployment: Deployment {
951 boundless_market_address: *boundless_market.instance().address(),
952 ..self.deployment
953 },
954 boundless_market,
955 ..self
956 }
957 }
958
959 pub fn with_set_verifier(self, set_verifier: SetVerifierService<P>) -> Self {
961 Self {
962 deployment: Deployment {
963 set_verifier_address: *set_verifier.instance().address(),
964 ..self.deployment
965 },
966 set_verifier,
967 ..self
968 }
969 }
970
971 pub fn with_offchain_client(self, offchain_client: OrderStreamClient) -> Self {
973 Self {
974 deployment: Deployment {
975 order_stream_url: Some(offchain_client.base_url.to_string().into()),
976 ..self.deployment
977 },
978 offchain_client: Some(offchain_client),
979 ..self
980 }
981 }
982
983 pub fn with_timeout(self, tx_timeout: Duration) -> Self {
985 Self {
986 boundless_market: self.boundless_market.with_timeout(tx_timeout),
987 set_verifier: self.set_verifier.with_timeout(tx_timeout),
988 ..self
989 }
990 }
991
992 pub fn with_funding_mode(self, funding_mode: FundingMode) -> Self {
994 Self { funding_mode, ..self }
995 }
996
997 pub fn with_signer<Zi>(self, signer: Zi) -> Client<P, St, D, R, Zi> {
1010 Client {
1012 signer: Some(signer),
1013 boundless_market: self.boundless_market,
1014 set_verifier: self.set_verifier,
1015 uploader: self.uploader,
1016 downloader: self.downloader,
1017 offchain_client: self.offchain_client,
1018 request_builder: self.request_builder,
1019 deployment: self.deployment,
1020 funding_mode: self.funding_mode,
1021 }
1022 }
1023
1024 pub async fn upload_program(&self, program: &[u8]) -> Result<Url, ClientError>
1026 where
1027 St: StorageUploader,
1028 {
1029 Ok(self
1030 .uploader
1031 .as_ref()
1032 .context("Storage uploader not set")?
1033 .upload_program(program)
1034 .await
1035 .context("Failed to upload program")?)
1036 }
1037
1038 pub async fn upload_input(&self, input: &[u8]) -> Result<Url, ClientError>
1040 where
1041 St: StorageUploader,
1042 {
1043 Ok(self
1044 .uploader
1045 .as_ref()
1046 .context("Storage uploader not set")?
1047 .upload_input(input)
1048 .await
1049 .context("Failed to upload input")?)
1050 }
1051
1052 pub async fn download(&self, url: &str) -> Result<Vec<u8>, ClientError>
1054 where
1055 D: StorageDownloader,
1056 {
1057 Ok(self
1058 .downloader
1059 .download(url)
1060 .await
1061 .with_context(|| format!("Failed to download {}", url))?)
1062 }
1063
1064 pub fn new_request<Params>(&self) -> Params
1066 where
1067 R: RequestBuilder<Params>,
1068 Params: Default,
1069 {
1070 Params::default()
1071 }
1072
1073 pub async fn build_request<Params>(
1083 &self,
1084 params: impl Into<Params>,
1085 ) -> Result<ProofRequest, ClientError>
1086 where
1087 R: RequestBuilder<Params>,
1088 R::Error: Into<anyhow::Error>,
1089 {
1090 let request_builder =
1091 self.request_builder.as_ref().context("request_builder is not set on Client")?;
1092 tracing::debug!("Building request");
1093 let request = request_builder.build(params).await.map_err(Into::into)?;
1094 tracing::debug!("Built request with id {:x}", request.id);
1095
1096 Ok(request)
1097 }
1098}
1099
1100impl<P, U, D, R, Si> Client<P, U, D, R, Si>
1101where
1102 P: Provider<Ethereum> + 'static + Clone,
1103{
1104 async fn compute_funding_value(
1105 &self,
1106 client_address: Address,
1107 max_price: U256,
1108 ) -> Result<U256, ClientError> {
1109 let balance = self.boundless_market.balance_of(client_address).await?;
1110 let value = funding_value_for_balance(balance, max_price, self.funding_mode);
1111
1112 if value > U256::ZERO {
1113 if let FundingMode::BelowThreshold(threshold) = self.funding_mode {
1114 if balance < threshold {
1115 tracing::warn!(
1116 "Client balance is {} ETH < threshold {} ETH. \
1117 Sending additional funds to top up the balance.",
1118 format_ether(balance),
1119 format_ether(threshold),
1120 );
1121 }
1122 } else if let FundingMode::MinMaxBalance { min_balance, max_balance } =
1123 self.funding_mode
1124 {
1125 if balance < min_balance {
1126 tracing::warn!(
1127 "Client balance is {} ETH < min {} ETH. \
1128 Sending {} ETH (max target {}).",
1129 format_ether(balance),
1130 format_ether(min_balance),
1131 format_ether(value),
1132 format_ether(max_balance),
1133 );
1134 }
1135 }
1136 }
1137
1138 if let FundingMode::Always = self.funding_mode {
1139 if balance > max_price.saturating_mul(U256::from(3u8)) {
1140 tracing::warn!(
1141 "Client balance is {} ETH, that is more than 3x the value being sent. \
1142 Consider switching to a different funding mode to avoid overfunding.",
1143 format_ether(balance),
1144 );
1145 }
1146 }
1147
1148 Ok(value)
1149 }
1150
1151 pub async fn submit_onchain<Params>(
1156 &self,
1157 params: impl Into<Params>,
1158 ) -> Result<(U256, u64), ClientError>
1159 where
1160 Si: Signer,
1161 R: RequestBuilder<Params>,
1162 R::Error: Into<anyhow::Error>,
1163 {
1164 let signer = self.signer.as_ref().context("signer is set on Client")?;
1165 self.submit_request_onchain_with_signer(&self.build_request(params).await?, signer).await
1166 }
1167
1168 pub async fn submit_request_onchain(
1172 &self,
1173 request: &ProofRequest,
1174 ) -> Result<(U256, u64), ClientError>
1175 where
1176 Si: Signer,
1177 {
1178 let signer = self.signer.as_ref().context("signer not set")?;
1179 self.submit_request_onchain_with_signer(request, signer).await
1180 }
1181
1182 pub async fn submit_request_onchain_with_signer(
1187 &self,
1188 request: &ProofRequest,
1189 signer: &impl Signer,
1190 ) -> Result<(U256, u64), ClientError> {
1191 let mut request = request.clone();
1192
1193 if request.id == U256::ZERO {
1194 request.id = self.boundless_market.request_id_from_rand().await?;
1195 };
1196 let client_address = request.client_address();
1197 if client_address != signer.address() {
1198 return Err(MarketError::AddressMismatch(client_address, signer.address()))?;
1199 };
1200
1201 request.validate()?;
1202
1203 let max_price = U256::from(request.offer.maxPrice);
1204 let value = self.compute_funding_value(client_address, max_price).await?;
1205
1206 let request_id =
1207 self.boundless_market.submit_request_with_value(&request, signer, value).await?;
1208
1209 Ok((request_id, request.expires_at()))
1210 }
1211
1212 pub async fn submit_request_onchain_with_signature(
1216 &self,
1217 request: &ProofRequest,
1218 signature: impl Into<Bytes>,
1219 ) -> Result<(U256, u64), ClientError> {
1220 let request = request.clone();
1221 request.validate()?;
1222
1223 let request_id =
1224 self.boundless_market.submit_request_with_signature(&request, signature).await?;
1225 Ok((request_id, request.expires_at()))
1226 }
1227
1228 pub async fn submit<Params>(
1236 &self,
1237 params: impl Into<Params>,
1238 ) -> Result<(U256, u64), ClientError>
1239 where
1240 Si: Signer,
1241 R: RequestBuilder<Params>,
1242 R::Error: Into<anyhow::Error>,
1243 {
1244 let request = self.build_request(params).await?;
1245 self.submit_request(&request).await
1246 }
1247
1248 pub async fn submit_request(&self, request: &ProofRequest) -> Result<(U256, u64), ClientError>
1255 where
1256 Si: Signer,
1257 {
1258 let signer = self.signer.as_ref().context("signer not set")?;
1259 self.submit_request_with_signer(request, signer).await
1260 }
1261
1262 pub async fn submit_request_with_signer(
1269 &self,
1270 request: &ProofRequest,
1271 signer: &impl Signer,
1272 ) -> Result<(U256, u64), ClientError>
1273 where
1274 Si: Signer,
1275 {
1276 let mut request = request.clone();
1277
1278 if request.id == U256::ZERO {
1279 request.id = self.boundless_market.request_id_from_rand().await?;
1280 };
1281 let client_address = request.client_address();
1282 if client_address != signer.address() {
1283 return Err(MarketError::AddressMismatch(client_address, signer.address()))?;
1284 };
1285 request.validate()?;
1286
1287 let max_price = U256::from(request.offer.maxPrice);
1288 let mut value = self.compute_funding_value(client_address, max_price).await?;
1289
1290 if let Some(offchain_client) = &self.offchain_client {
1292 if value > 0 {
1294 self.boundless_market.deposit(value).await?;
1295 value = U256::ZERO; }
1297
1298 match offchain_client.submit_request(&request, signer).await {
1299 Ok(order) => return Ok((order.request.id, request.expires_at())),
1300 Err(e) => {
1301 tracing::warn!(
1302 "Failed to submit request offchain: {e:?}, falling back to onchain submission"
1303 );
1304 }
1306 }
1307 }
1308
1309 let request_id =
1311 self.boundless_market.submit_request_with_value(&request, signer, value).await?;
1312 Ok((request_id, request.expires_at()))
1313 }
1314
1315 pub async fn submit_offchain<Params>(
1320 &self,
1321 params: impl Into<Params>,
1322 ) -> Result<(U256, u64), ClientError>
1323 where
1324 Si: Signer,
1325 R: RequestBuilder<Params>,
1326 R::Error: Into<anyhow::Error>,
1327 {
1328 let signer = self.signer.as_ref().context("signer is set on Client")?;
1329 self.submit_request_offchain_with_signer(&self.build_request(params).await?, signer).await
1330 }
1331
1332 pub async fn submit_request_offchain(
1336 &self,
1337 request: &ProofRequest,
1338 ) -> Result<(U256, u64), ClientError>
1339 where
1340 Si: Signer,
1341 {
1342 let signer = self.signer.as_ref().context("signer not set")?;
1343 self.submit_request_offchain_with_signer(request, signer).await
1344 }
1345
1346 pub async fn submit_request_offchain_with_signer(
1350 &self,
1351 request: &ProofRequest,
1352 signer: &impl Signer,
1353 ) -> Result<(U256, u64), ClientError> {
1354 let offchain_client = self
1355 .offchain_client
1356 .as_ref()
1357 .context("Order stream client not available. Please provide an order stream URL")?;
1358 let mut request = request.clone();
1359
1360 if request.id == U256::ZERO {
1361 request.id = self.boundless_market.request_id_from_rand().await?;
1362 };
1363 let client_address = request.client_address();
1364 if client_address != signer.address() {
1365 return Err(MarketError::AddressMismatch(client_address, signer.address()))?;
1366 };
1367
1368 request.validate()?;
1369
1370 let max_price = U256::from(request.offer.maxPrice);
1371 let value = self.compute_funding_value(client_address, max_price).await?;
1372 if value > 0 {
1373 self.boundless_market.deposit(value).await?;
1374 }
1375
1376 let order = offchain_client.submit_request(&request, signer).await?;
1377
1378 Ok((order.request.id, request.expires_at()))
1379 }
1380
1381 pub async fn wait_for_request_fulfillment(
1386 &self,
1387 request_id: U256,
1388 check_interval: std::time::Duration,
1389 expires_at: u64,
1390 ) -> Result<Fulfillment, ClientError> {
1391 Ok(self
1392 .boundless_market
1393 .wait_for_request_fulfillment(request_id, check_interval, expires_at)
1394 .await?)
1395 }
1396
1397 pub async fn fetch_set_inclusion_receipt(
1449 &self,
1450 request_id: U256,
1451 image_id: B256,
1452 search_to_block: Option<u64>,
1453 search_from_block: Option<u64>,
1454 ) -> Result<(Bytes, SetInclusionReceipt<ReceiptClaim>), ClientError> {
1455 let fulfillment = self
1458 .boundless_market
1459 .get_request_fulfillment(request_id, search_to_block, search_from_block)
1460 .await?;
1461 match fulfillment.data().context("failed to decode fulfillment data")? {
1462 FulfillmentData::None => Err(ClientError::Error(anyhow!(
1463 "No fulfillment data found for set inclusion receipt"
1464 ))),
1465 FulfillmentData::ImageIdAndJournal(_, journal) => {
1466 let claim = ReceiptClaim::ok(Digest::from(image_id.0), journal.to_vec());
1467 let receipt = self
1468 .set_verifier
1469 .fetch_receipt_with_claim(fulfillment.seal, claim, journal.to_vec())
1470 .await?;
1471 Ok((journal, receipt))
1472 }
1473 }
1474 }
1475
1476 pub async fn fetch_proof_request(
1536 &self,
1537 request_id: U256,
1538 tx_hash: Option<B256>,
1539 request_digest: Option<B256>,
1540 search_to_block: Option<u64>,
1541 search_from_block: Option<u64>,
1542 ) -> Result<(ProofRequest, Bytes), ClientError> {
1543 if let Some(ref order_stream_client) = self.offchain_client {
1544 tracing::debug!("Querying the order stream for request: 0x{request_id:x} using request_digest {request_digest:?}");
1545 match order_stream_client.fetch_order(request_id, request_digest).await {
1546 Ok(order) => {
1547 tracing::debug!("Found request 0x{request_id:x} offchain");
1548 return Ok((order.request, Bytes::from(order.signature.as_bytes())));
1549 }
1550 Err(err) => {
1551 if err.to_string().contains("No order found") {
1553 tracing::debug!("Request 0x{request_id:x} not found offchain");
1554 } else {
1555 tracing::error!(
1556 "Error querying order stream for request 0x{request_id:x}; err = {err}"
1557 );
1558 }
1559 }
1560 }
1561 } else {
1562 tracing::debug!("Skipping query for request offchain; no order stream client provided");
1563 }
1564
1565 tracing::debug!(
1566 "Querying the blockchain for request: 0x{request_id:x} using tx_hash {tx_hash:?}"
1567 );
1568 match self
1569 .boundless_market
1570 .get_submitted_request(request_id, tx_hash, search_to_block, search_from_block)
1571 .await
1572 {
1573 Ok((proof_request, signature)) => Ok((proof_request, signature)),
1574 Err(err @ MarketError::RequestNotFound(..)) => Err(err.into()),
1575 err @ Err(_) => err
1576 .with_context(|| format!("error querying for 0x{request_id:x} onchain"))
1577 .map_err(Into::into),
1578 }
1579 }
1580}
1581
1582#[cfg(test)]
1583mod tests {
1584 use super::{funding_value_for_balance, FundingMode};
1585 use alloy::primitives::U256;
1586
1587 #[test]
1588 fn funding_always_sends_max_price() {
1589 let max_price = U256::from(20u64);
1590 assert_eq!(
1591 funding_value_for_balance(U256::ZERO, max_price, FundingMode::Always),
1592 max_price
1593 );
1594 assert_eq!(
1595 funding_value_for_balance(U256::from(100u64), max_price, FundingMode::Always),
1596 max_price
1597 );
1598 }
1599
1600 #[test]
1601 fn funding_never_sends_zero() {
1602 let balance = U256::from(5u64);
1603 let max_price = U256::from(20u64);
1604 assert_eq!(funding_value_for_balance(balance, max_price, FundingMode::Never), U256::ZERO);
1605 }
1606
1607 #[test]
1608 fn funding_available_balance_sends_shortfall_when_insufficient() {
1609 let balance = U256::from(5u64);
1610 let max_price = U256::from(20u64);
1611 assert_eq!(
1612 funding_value_for_balance(balance, max_price, FundingMode::AvailableBalance),
1613 U256::from(15u64)
1614 );
1615 }
1616
1617 #[test]
1618 fn funding_available_balance_sends_zero_when_sufficient() {
1619 let balance = U256::from(25u64);
1620 let max_price = U256::from(20u64);
1621 assert_eq!(
1622 funding_value_for_balance(balance, max_price, FundingMode::AvailableBalance),
1623 U256::ZERO
1624 );
1625 }
1626
1627 #[test]
1628 fn funding_below_threshold_balance_above_threshold_below_max_price_sends_shortfall() {
1629 let balance = U256::from(15u64);
1631 let threshold = U256::from(10u64);
1632 let max_price = U256::from(20u64);
1633
1634 let value =
1635 funding_value_for_balance(balance, max_price, FundingMode::BelowThreshold(threshold));
1636
1637 assert_eq!(value, U256::from(5u64), "should send shortfall for this request");
1638 }
1639
1640 #[test]
1641 fn funding_below_threshold_balance_below_threshold_sends_max_of_topup_and_shortfall() {
1642 let balance = U256::from(5u64);
1643 let threshold = U256::from(10u64);
1644 let max_price = U256::from(20u64);
1645
1646 let value =
1647 funding_value_for_balance(balance, max_price, FundingMode::BelowThreshold(threshold));
1648
1649 assert_eq!(value, U256::from(15u64)); }
1651
1652 #[test]
1653 fn funding_below_threshold_balance_above_max_price_sends_zero() {
1654 let balance = U256::from(25u64);
1655 let threshold = U256::from(10u64);
1656 let max_price = U256::from(20u64);
1657
1658 let value =
1659 funding_value_for_balance(balance, max_price, FundingMode::BelowThreshold(threshold));
1660
1661 assert_eq!(value, U256::ZERO);
1662 }
1663
1664 #[test]
1665 fn funding_min_max_balance_above_min_below_max_price_sends_shortfall() {
1666 let balance = U256::from(15u64);
1668 let min_balance = U256::from(10u64);
1669 let max_balance = U256::from(100u64);
1670 let max_price = U256::from(20u64);
1671
1672 let value = funding_value_for_balance(
1673 balance,
1674 max_price,
1675 FundingMode::MinMaxBalance { min_balance, max_balance },
1676 );
1677
1678 assert_eq!(value, U256::from(5u64), "should send shortfall for this request");
1679 }
1680
1681 #[test]
1682 fn funding_min_max_balance_below_min_sends_max_of_topup_and_shortfall() {
1683 let balance = U256::from(5u64);
1684 let min_balance = U256::from(10u64);
1685 let max_balance = U256::from(100u64);
1686 let max_price = U256::from(20u64);
1687
1688 let value = funding_value_for_balance(
1689 balance,
1690 max_price,
1691 FundingMode::MinMaxBalance { min_balance, max_balance },
1692 );
1693
1694 assert_eq!(value, U256::from(95u64)); }
1696
1697 #[test]
1698 fn funding_min_max_balance_above_max_price_sends_zero() {
1699 let balance = U256::from(25u64);
1700 let min_balance = U256::from(10u64);
1701 let max_balance = U256::from(100u64);
1702 let max_price = U256::from(20u64);
1703
1704 let value = funding_value_for_balance(
1705 balance,
1706 max_price,
1707 FundingMode::MinMaxBalance { min_balance, max_balance },
1708 );
1709
1710 assert_eq!(value, U256::ZERO);
1711 }
1712}