1use std::{
16 fmt::Debug,
17 sync::atomic::{AtomicU64, Ordering},
18 time::Duration,
19};
20
21use alloy::{
22 consensus::{BlockHeader, Transaction},
23 eips::BlockNumberOrTag,
24 network::Ethereum,
25 primitives::{utils::format_ether, Address, Bytes, B256, U256},
26 providers::{PendingTransactionBuilder, PendingTransactionError, Provider},
27 rpc::types::{Log, TransactionReceipt},
28 signers::Signer,
29};
30
31use alloy_sol_types::{SolCall, SolEvent, SolInterface};
32use anyhow::{anyhow, Context, Result};
33use thiserror::Error;
34
35use super::{
36 eip712_domain, AssessorReceipt, EIP712DomainSaltless, Fulfillment,
37 IBoundlessMarket::{self, IBoundlessMarketErrors, IBoundlessMarketInstance, ProofDelivered},
38 Offer, ProofRequest, RequestError, RequestId, RequestStatus, TxnErr, TXN_CONFIRM_TIMEOUT,
39};
40use crate::{
41 contracts::token::{IERC20Permit, IHitPoints::IHitPointsErrors, Permit, IERC20},
42 deployments::collateral_token_supports_permit,
43};
44
/// Retry policy applied to event queries.
#[derive(Clone, Debug)]
#[non_exhaustive]
pub struct EventRetryConfig {
    /// Number of additional attempts made after the first attempt fails.
    pub retries: u32,
    /// Pause between attempts, in milliseconds. `None` retries without waiting.
    pub retry_delay_ms: Option<u64>,
}
65
/// Settings that bound how far and how persistently event logs are searched.
#[derive(Clone, Debug)]
#[non_exhaustive]
pub struct EventQueryConfig {
    /// Block span used to size each query iteration.
    pub block_range: u64,
    /// Maximum number of query iterations when the caller gives no lower bound.
    pub max_iterations: u64,
    /// Optional retry policy; `None` means a failed query is not retried.
    pub retry_config: Option<EventRetryConfig>,
}
79
80impl Default for EventQueryConfig {
81 fn default() -> Self {
88 Self {
89 block_range: 499,
90 max_iterations: 500,
91 retry_config: Some(EventRetryConfig { retries: 1, retry_delay_ms: Some(5000) }),
92 }
93 }
94}
95
96impl EventQueryConfig {
97 pub fn default_for_broker() -> Self {
105 Self { block_range: 499, max_iterations: 100, retry_config: None }
106 }
107}
108
// Numerator and denominator of a stake fraction (1/2).
// NOTE(review): the code that consumes this fraction is outside this section — confirm which
// portion of the stake it is meant to describe.
const FRACTION_STAKE_NUMERATOR: u64 = 1;
const FRACTION_STAKE_DENOMINATOR: u64 = 2;
115
116#[derive(Error, Debug)]
118pub enum MarketError {
119 #[error("Transaction error: {0}")]
121 TxnError(#[from] TxnErr),
122
123 #[error("Transaction confirmation error: {0:?}")]
125 TxnConfirmationError(anyhow::Error),
126
127 #[error("Request is not fulfilled 0x{0:x}")]
129 RequestNotFulfilled(U256),
130
131 #[error("Request has expired 0x{0:x}")]
133 RequestHasExpired(U256),
134
135 #[error("Request is slashed 0x{0:x}")]
137 RequestIsSlashed(U256),
138
139 #[error("Request error {0}")]
141 RequestError(#[from] RequestError),
142
143 #[error("Request address does not match with signer {0} - {0}")]
145 AddressMismatch(Address, Address),
146
147 #[error("Proof not found for request 0x{0:x} in events logs after searching from block {1} to block {2}. Consider increasing EventQueryConfig.block_range or EventQueryConfig.max_iterations to increase search depth or specify start/end blocks")]
149 ProofNotFound(U256, u64, u64),
150
151 #[error("Request not found for request 0x{0:x} in event logs after searching from block {1} to block {2}. Consider increasing EventQueryConfig.block_range or EventQueryConfig.max_iterations to increase search depth or specify start/end blocks")]
153 RequestNotFound(U256, u64, u64),
154
155 #[error("Request already locked: 0x{0:x}")]
157 RequestAlreadyLocked(U256),
158
159 #[error("Lock request reverted, possibly outbid: txn_hash: {0}")]
161 LockRevert(B256),
162
163 #[error("Slash request reverted, possibly already slashed: txn_hash: {0}")]
165 SlashRevert(B256),
166
167 #[error("Expected log was not emitted, txn {0} possibly reverted: {1}")]
169 LogNotEmitted(B256, anyhow::Error),
170
171 #[error("Other error: {0:?}")]
173 Error(#[from] anyhow::Error),
174
175 #[error("Timeout: 0x{0:x}")]
177 TimeoutReached(U256),
178
179 #[error("Payment requirements failed during order fulfillment: {0:?}")]
181 PaymentRequirementsFailed(IBoundlessMarketErrors),
182
183 #[error(
185 "Payment requirements failed during order fulfillment: unrecognized error payload {0:?}"
186 )]
187 PaymentRequirementsFailedUnknownError(Bytes),
188}
189
190impl From<alloy::contract::Error> for MarketError {
191 fn from(err: alloy::contract::Error) -> Self {
192 tracing::debug!("raw alloy contract error: {:?}", err);
193 MarketError::Error(TxnErr::from(err).into())
194 }
195}
196
/// Client for interacting with a deployed Boundless market contract.
pub struct BoundlessMarketService<P> {
    /// Contract binding used for all calls and transactions.
    instance: IBoundlessMarketInstance<P, Ethereum>,
    /// Chain ID storage; constructors initialise it to 0.
    // NOTE(review): presumably 0 means "not fetched yet" and get_chain_id fills it in — confirm.
    chain_id: AtomicU64,
    /// Address set as the sender (`from`) on most transactions.
    caller: Address,
    /// Timeout applied when waiting for transaction confirmation.
    timeout: Duration,
    /// Settings for event log searches.
    event_query_config: EventQueryConfig,
    /// Thresholds for balance alerts.
    balance_alert_config: StakeBalanceAlertConfig,
    /// Settings for re-querying a receipt after a null response.
    receipt_query_config: ReceiptQueryConfig,
}
209
/// Settings for re-querying a transaction receipt when the first lookup returns a null
/// response.
#[derive(Clone, Debug)]
struct ReceiptQueryConfig {
    /// Pause after each unsuccessful receipt query.
    retry_interval: Duration,
    /// Maximum number of receipt queries made during the retry phase.
    retry_count: usize,
}
217
218impl Default for ReceiptQueryConfig {
219 fn default() -> Self {
220 Self { retry_count: 10, retry_interval: Duration::from_millis(500) }
221 }
222}
223
/// Optional balance thresholds used for alerting; both default to `None`.
// NOTE(review): the code that compares balances against these thresholds is outside this
// section — confirm the exact comparison and log levels.
#[derive(Clone, Debug, Default)]
struct StakeBalanceAlertConfig {
    /// Threshold associated with a warning-level alert, if any.
    warn_threshold: Option<U256>,
    /// Threshold associated with an error-level alert, if any.
    error_threshold: Option<U256>,
}
231
232impl<P: Clone> Clone for BoundlessMarketService<P> {
233 fn clone(&self) -> Self {
234 Self {
235 instance: self.instance.clone(),
236 chain_id: self.chain_id.load(Ordering::Relaxed).into(),
237 caller: self.caller,
238 timeout: self.timeout,
239 event_query_config: self.event_query_config.clone(),
240 balance_alert_config: self.balance_alert_config.clone(),
241 receipt_query_config: self.receipt_query_config.clone(),
242 }
243 }
244}
245
246fn extract_tx_log<E: SolEvent + Debug + Clone>(
247 receipt: &TransactionReceipt,
248) -> Result<Log<E>, anyhow::Error> {
249 let logs = receipt
250 .inner
251 .logs()
252 .iter()
253 .filter_map(|log| {
254 if log.topic0().map(|topic| E::SIGNATURE_HASH == *topic).unwrap_or(false) {
255 Some(
256 log.log_decode::<E>()
257 .with_context(|| format!("failed to decode event {}", E::SIGNATURE)),
258 )
259 } else {
260 tracing::debug!(
261 "skipping log on receipt; does not match {}: {log:?}",
262 E::SIGNATURE
263 );
264 None
265 }
266 })
267 .collect::<Result<Vec<_>>>()?;
268
269 match &logs[..] {
270 [log] => Ok(log.clone()),
271 [] => Err(anyhow!(
272 "transaction 0x{:x} did not emit event {}",
273 receipt.transaction_hash,
274 E::SIGNATURE
275 )),
276 _ => Err(anyhow!(
277 "transaction emitted more than one event with signature {}, {:#?}",
278 E::SIGNATURE,
279 logs
280 )),
281 }
282}
283
284fn validate_fulfill_receipt(receipt: TransactionReceipt) -> Result<(), MarketError> {
285 for (idx, log) in receipt.inner.logs().iter().enumerate() {
286 if log.topic0().is_some_and(|topic| {
287 *topic == IBoundlessMarket::PaymentRequirementsFailed::SIGNATURE_HASH
288 }) {
289 match log.log_decode::<IBoundlessMarket::PaymentRequirementsFailed>() {
290 Ok(decoded) => {
291 let raw_error = Bytes::copy_from_slice(decoded.inner.data.error.as_ref());
292 match IBoundlessMarketErrors::abi_decode(&raw_error) {
293 Ok(err) => tracing::warn!(
294 tx_hash = ?receipt.transaction_hash,
295 log_index = idx,
296 "Payment requirements failed for at least one fulfillment: {err:?}"
297 ),
298 Err(_) => tracing::warn!(
299 tx_hash = ?receipt.transaction_hash,
300 log_index = idx,
301 raw = ?raw_error,
302 "Payment requirements failed for at least one fulfillment, but error payload was unrecognized"
303 ),
304 }
305 }
306 Err(err) => tracing::warn!(
307 tx_hash = ?receipt.transaction_hash,
308 log_index = idx,
309 "Failed to decode PaymentRequirementsFailed event: {err:?}"
310 ),
311 }
312 }
313 }
314 Ok(())
315}
316
/// Data associated with a request-submitted event.
#[derive(Debug, Clone)]
pub struct RequestSubmittedEventData {
    /// The submitted proof request.
    pub request: ProofRequest,
    /// The client's signature over the request.
    pub client_signature: Bytes,
    /// Block number associated with the event.
    pub block_number: u64,
    /// Hash of the transaction associated with the event.
    pub tx_hash: B256,
}
329
/// A `RequestLocked` event together with where it was observed.
#[derive(Debug, Clone)]
pub struct RequestLockedEventData {
    /// The decoded event.
    pub event: IBoundlessMarket::RequestLocked,
    /// Block number associated with the event.
    pub block_number: u64,
    /// Hash of the transaction associated with the event.
    pub tx_hash: B256,
}
340
/// A `RequestFulfilled` event together with where it was observed.
#[derive(Debug, Clone)]
pub struct RequestFulfilledEventData {
    /// The decoded event.
    pub event: IBoundlessMarket::RequestFulfilled,
    /// Block number associated with the event.
    pub block_number: u64,
    /// Hash of the transaction associated with the event.
    pub tx_hash: B256,
}
351
/// A `ProofDelivered` event together with where it was observed.
#[derive(Debug, Clone)]
pub struct ProofDeliveredEventData {
    /// The decoded event.
    pub event: ProofDelivered,
    /// Block number associated with the event.
    pub block_number: u64,
    /// Hash of the transaction associated with the event.
    pub tx_hash: B256,
}
362
/// A `ProverSlashed` event together with where it was observed.
#[derive(Debug, Clone)]
pub struct ProverSlashedEventData {
    /// The decoded event.
    pub event: IBoundlessMarket::ProverSlashed,
    /// Block number associated with the event.
    pub block_number: u64,
    /// Hash of the transaction associated with the event.
    pub tx_hash: B256,
}
373
374impl<P: Provider> BoundlessMarketService<P> {
375 pub fn new(address: impl Into<Address>, provider: P, caller: impl Into<Address>) -> Self {
380 let instance = IBoundlessMarket::new(address.into(), provider);
381
382 Self {
383 instance,
384 chain_id: AtomicU64::new(0),
385 caller: caller.into(),
386 timeout: TXN_CONFIRM_TIMEOUT,
387 event_query_config: EventQueryConfig::default(),
388 balance_alert_config: StakeBalanceAlertConfig::default(),
389 receipt_query_config: ReceiptQueryConfig::default(),
390 }
391 }
392
393 pub fn new_for_broker(
398 address: impl Into<Address>,
399 provider: P,
400 caller: impl Into<Address>,
401 ) -> Self {
402 let instance = IBoundlessMarket::new(address.into(), provider);
403
404 Self {
405 instance,
406 chain_id: AtomicU64::new(0),
407 caller: caller.into(),
408 timeout: TXN_CONFIRM_TIMEOUT,
409 event_query_config: EventQueryConfig::default_for_broker(),
410 balance_alert_config: StakeBalanceAlertConfig::default(),
411 receipt_query_config: ReceiptQueryConfig::default(),
412 }
413 }
414
415 pub fn with_timeout(self, timeout: Duration) -> Self {
417 Self { timeout, ..self }
418 }
419
420 pub fn with_event_query_config(self, config: EventQueryConfig) -> Self {
422 Self { event_query_config: config, ..self }
423 }
424
425 pub fn with_collateral_balance_alert(
427 self,
428 warn_threshold: &Option<U256>,
429 error_threshold: &Option<U256>,
430 ) -> Self {
431 Self {
432 balance_alert_config: StakeBalanceAlertConfig {
433 warn_threshold: *warn_threshold,
434 error_threshold: *error_threshold,
435 },
436 ..self
437 }
438 }
439
440 pub fn with_receipt_retry_count(mut self, count: usize) -> Self {
442 self.receipt_query_config.retry_count = count;
443 self
444 }
445
446 pub fn with_receipt_retry_interval(mut self, interval: Duration) -> Self {
448 self.receipt_query_config.retry_interval = interval;
449 self
450 }
451
/// Returns a reference to the underlying market contract binding.
pub fn instance(&self) -> &IBoundlessMarketInstance<P, Ethereum> {
    &self.instance
}
456
/// Returns the address this service uses as the sender for transactions.
pub fn caller(&self) -> Address {
    self.caller
}
461
462 pub async fn eip712_domain(&self) -> Result<EIP712DomainSaltless, MarketError> {
466 Ok(eip712_domain(*self.instance.address(), self.get_chain_id().await?))
467 }
468
469 pub async fn deposit(&self, value: U256) -> Result<(), MarketError> {
471 tracing::trace!("Calling deposit() value: {value}");
472 let call = self.instance.deposit().value(value);
473 let pending_tx = call.send().await?;
474 tracing::debug!("Broadcasting deposit tx {}", pending_tx.tx_hash());
475 let tx_hash = pending_tx
476 .with_timeout(Some(self.timeout))
477 .watch()
478 .await
479 .context("failed to confirm tx")?;
480 tracing::debug!("Submitted deposit {}", tx_hash);
481
482 Ok(())
483 }
484
485 pub async fn withdraw(&self, amount: U256) -> Result<(), MarketError> {
487 tracing::trace!("Calling withdraw({amount})");
488 let call = self.instance.withdraw(amount);
489 let pending_tx = call.send().await?;
490 tracing::debug!("Broadcasting withdraw tx {}", pending_tx.tx_hash());
491 let tx_hash = pending_tx
492 .with_timeout(Some(self.timeout))
493 .watch()
494 .await
495 .context("failed to confirm tx")?;
496 tracing::debug!("Submitted withdraw {}", tx_hash);
497
498 Ok(())
499 }
500
501 pub async fn balance_of(&self, account: impl Into<Address>) -> Result<U256, MarketError> {
503 let account = account.into();
504 tracing::trace!("Calling balanceOf({account})");
505 let balance = self.instance.balanceOf(account).call().await?;
506
507 Ok(balance)
508 }
509
510 pub async fn submit_request_with_value(
513 &self,
514 request: &ProofRequest,
515 signer: &impl Signer,
516 value: impl Into<U256>,
517 ) -> Result<U256, MarketError> {
518 tracing::trace!("Calling submitRequest({:x?})", request);
519 tracing::debug!("Sending request ID {:x}", request.id);
520 let client_address = request.client_address();
521 if client_address != signer.address() {
522 return Err(MarketError::AddressMismatch(client_address, signer.address()));
523 };
524 let chain_id = self.get_chain_id().await.context("failed to get chain ID")?;
525 let client_sig = request
526 .sign_request(signer, *self.instance.address(), chain_id)
527 .await
528 .context("failed to sign request")?;
529 let call = self
530 .instance
531 .submitRequest(request.clone(), client_sig.as_bytes().into())
532 .from(self.caller)
533 .value(value.into());
534 let pending_tx = call.send().await?;
535 tracing::debug!(
536 "Broadcasting tx {:x} with request ID {:x}",
537 pending_tx.tx_hash(),
538 request.id
539 );
540
541 let receipt = self.get_receipt_with_retry(pending_tx).await?;
542
543 match extract_tx_log::<IBoundlessMarket::RequestSubmitted>(&receipt) {
545 Ok(log) => Ok(U256::from(log.inner.data.requestId)),
546 Err(e) => Err(MarketError::LogNotEmitted(receipt.transaction_hash, e)),
547 }
548 }
549
550 pub async fn submit_request_with_signature(
553 &self,
554 request: &ProofRequest,
555 signature: impl Into<Bytes>,
556 ) -> Result<U256, MarketError> {
557 tracing::trace!("Calling submitRequest({:x?})", request);
558 tracing::debug!("Sending request ID {:x}", request.id);
559 let call = self.instance.submitRequest(request.clone(), signature.into()).from(self.caller);
560 let pending_tx = call.send().await?;
561 tracing::debug!(
562 "Broadcasting tx {:x} with request ID {:x}",
563 pending_tx.tx_hash(),
564 request.id
565 );
566
567 let receipt = self.get_receipt_with_retry(pending_tx).await?;
568
569 match extract_tx_log::<IBoundlessMarket::RequestSubmitted>(&receipt) {
571 Ok(log) => Ok(U256::from(log.inner.data.requestId)),
572 Err(e) => Err(MarketError::LogNotEmitted(receipt.transaction_hash, e)),
573 }
574 }
575
576 pub async fn submit_request(
580 &self,
581 request: &ProofRequest,
582 signer: &impl Signer,
583 ) -> Result<U256, MarketError> {
584 let balance = self
585 .balance_of(signer.address())
586 .await
587 .context("failed to get whether the client balance can cover the offer max price")?;
588 let max_price = U256::from(request.offer.maxPrice);
589 let value = if balance > max_price { U256::ZERO } else { U256::from(max_price) - balance };
590 if value > U256::ZERO {
591 tracing::debug!("Sending {} ETH with request {:x}", format_ether(value), request.id);
592 }
593 self.submit_request_with_value(request, signer, value).await
594 }
595
596 pub async fn lock_request(
603 &self,
604 request: &ProofRequest,
605 client_sig: impl Into<Bytes>,
606 ) -> Result<u64, MarketError> {
607 tracing::trace!("Calling requestIsLocked({:x})", request.id);
608 let is_locked_in: bool =
609 self.instance.requestIsLocked(request.id).call().await.context("call failed")?;
610 if is_locked_in {
611 return Err(MarketError::RequestAlreadyLocked(request.id));
612 }
613
614 let client_sig_bytes = client_sig.into();
615 tracing::trace!("Calling lockRequest({:x?}, {:x?})", request, client_sig_bytes);
616
617 let call = self.instance.lockRequest(request.clone(), client_sig_bytes).from(self.caller);
618
619 tracing::trace!("Sending tx {}", format!("{:?}", call));
620 let pending_tx = call.send().await?;
621
622 let tx_hash = *pending_tx.tx_hash();
623 tracing::trace!("Broadcasting lock request tx {}", tx_hash);
624
625 let receipt = self.get_receipt_with_retry(pending_tx).await?;
626
627 if !receipt.status() {
628 return Err(MarketError::LockRevert(receipt.transaction_hash));
630 }
631
632 tracing::info!(
633 "Locked request {:x}, transaction hash: {}",
634 request.id,
635 receipt.transaction_hash
636 );
637
638 self.check_collateral_balance().await?;
639
640 Ok(receipt.block_number.context("TXN Receipt missing block number")?)
641 }
642
643 pub async fn lock_request_with_signature(
651 &self,
652 request: &ProofRequest,
653 client_sig: impl Into<Bytes>,
654 prover_sig: impl Into<Bytes>,
655 _priority_gas: Option<u128>,
656 ) -> Result<u64, MarketError> {
657 tracing::trace!("Calling requestIsLocked({:x})", request.id);
658 let is_locked_in: bool =
659 self.instance.requestIsLocked(request.id).call().await.context("call failed")?;
660 if is_locked_in {
661 return Err(MarketError::RequestAlreadyLocked(request.id));
662 }
663
664 let client_sig_bytes = client_sig.into();
665 let prover_sig_bytes = prover_sig.into();
666 tracing::trace!(
667 "Calling lockRequestWithSignature({:x?}, {:x?}, {:x?})",
668 request,
669 client_sig_bytes,
670 prover_sig_bytes
671 );
672
673 let call = self
674 .instance
675 .lockRequestWithSignature(request.clone(), client_sig_bytes.clone(), prover_sig_bytes)
676 .from(self.caller);
677 let pending_tx = call.send().await.context("Failed to lock")?;
678 tracing::trace!("Broadcasting lock request with signature tx {}", pending_tx.tx_hash());
679
680 let receipt = self.get_receipt_with_retry(pending_tx).await?;
681 if !receipt.status() {
682 return Err(MarketError::LockRevert(receipt.transaction_hash));
684 }
685
686 tracing::info!(
687 "Locked request {:x}, transaction hash: {}",
688 request.id,
689 receipt.transaction_hash
690 );
691
692 Ok(receipt.block_number.context("TXN Receipt missing block number")?)
693 }
694
695 async fn get_receipt_with_retry(
696 &self,
697 pending_tx: PendingTransactionBuilder<Ethereum>,
698 ) -> Result<TransactionReceipt, MarketError> {
699 let tx_hash = *pending_tx.tx_hash();
700
701 let tx_result = self.instance.provider().get_transaction_by_hash(tx_hash).await;
704 if let Ok(Some(tx)) = tx_result {
705 let nonce = tx.nonce();
706 tracing::debug!("Tx {} broadcasted with nonce {}", tx_hash, nonce);
707 } else {
708 tracing::debug!(
709 "Tx {} not found immediately after broadcast. Can't get nonce.",
710 tx_hash
711 );
712 }
713
714 match pending_tx.with_timeout(Some(self.timeout)).get_receipt().await {
715 Ok(receipt) => Ok(receipt),
716 Err(PendingTransactionError::TransportError(err)) if err.is_null_resp() => {
717 tracing::debug!("failed to query receipt of confirmed transaction, retrying");
718 for _ in 0..self.receipt_query_config.retry_count {
722 if let Ok(Some(receipt)) =
723 self.instance.provider().get_transaction_receipt(tx_hash).await
724 {
725 return Ok(receipt);
726 }
727
728 tokio::time::sleep(self.receipt_query_config.retry_interval).await;
729 }
730
731 Err(anyhow!(
732 "Transaction {:?} confirmed, but receipt was not found after {} retries.",
733 tx_hash,
734 self.receipt_query_config.retry_count
735 )
736 .into())
737 }
738 Err(e) => Err(MarketError::TxnConfirmationError(anyhow!(
739 "failed to confirm tx {:?} within timeout {:?}: {}",
740 tx_hash,
741 self.timeout,
742 e
743 ))),
744 }
745 }
746
747 pub async fn slash(
750 &self,
751 request_id: U256,
752 ) -> Result<IBoundlessMarket::ProverSlashed, MarketError> {
753 if self.is_slashed(request_id).await? {
754 return Err(MarketError::RequestIsSlashed(request_id));
755 }
756
757 tracing::trace!("Calling slash({:x?})", request_id);
758 let call = self.instance.slash(request_id).from(self.caller);
759 let pending_tx = call.send().await?;
760 tracing::debug!("Broadcasting tx {}", pending_tx.tx_hash());
761
762 let receipt = self.get_receipt_with_retry(pending_tx).await?;
763
764 if !receipt.status() {
765 return Err(MarketError::SlashRevert(receipt.transaction_hash));
766 }
767
768 match extract_tx_log::<IBoundlessMarket::ProverSlashed>(&receipt) {
769 Ok(log) => Ok(log.inner.data),
770 Err(e) => Err(MarketError::LogNotEmitted(receipt.transaction_hash, e)),
771 }
772 }
773
774 pub async fn fulfill(&self, tx: FulfillmentTx) -> Result<(), MarketError> {
776 let FulfillmentTx { root, unlocked_requests, fulfillments, assessor_receipt, withdraw } =
777 tx;
778 let price = !unlocked_requests.is_empty();
779 let request_ids = fulfillments.iter().map(|fill| fill.id).collect::<Vec<_>>();
780
781 match root {
782 None => match (price, withdraw) {
783 (false, false) => {
784 tracing::debug!("Fulfilling requests {:?} with fulfill", request_ids);
785 self._fulfill(fulfillments, assessor_receipt).await
786 }
787 (false, true) => {
788 tracing::debug!(
789 "Fulfilling requests {:?} with fulfill and withdraw",
790 request_ids
791 );
792 self.fulfill_and_withdraw(fulfillments, assessor_receipt).await
793 }
794 (true, false) => {
795 tracing::debug!("Fulfilling requests {:?} with price and fulfill", request_ids);
796 self.price_and_fulfill(unlocked_requests, fulfillments, assessor_receipt).await
797 }
798 (true, true) => {
799 tracing::debug!(
800 "Fulfilling requests {:?} with price and fulfill and withdraw",
801 request_ids
802 );
803 self.price_and_fulfill_and_withdraw(
804 unlocked_requests,
805 fulfillments,
806 assessor_receipt,
807 )
808 .await
809 }
810 },
811 Some(root) => match (price, withdraw) {
812 (false, false) => {
813 tracing::debug!(
814 "Fulfilling requests {:?} with submitting root and fulfill",
815 request_ids
816 );
817 self.submit_root_and_fulfill(root, fulfillments, assessor_receipt).await
818 }
819 (false, true) => {
820 tracing::debug!(
821 "Fulfilling requests {:?} with submitting root and fulfill and withdraw",
822 request_ids
823 );
824 self.submit_root_and_fulfill_and_withdraw(root, fulfillments, assessor_receipt)
825 .await
826 }
827 (true, false) => {
828 tracing::debug!(
829 "Fulfilling requests {:?} with submitting root and price and fulfill",
830 request_ids
831 );
832 self.submit_root_and_price_fulfill(
833 root,
834 unlocked_requests,
835 fulfillments,
836 assessor_receipt,
837 )
838 .await
839 }
840 (true, true) => {
841 tracing::debug!("Fulfilling requests {:?} with submitting root and price and fulfill and withdraw", request_ids);
842 self.submit_root_and_price_fulfill_and_withdraw(
843 root,
844 unlocked_requests,
845 fulfillments,
846 assessor_receipt,
847 )
848 .await
849 }
850 },
851 }
852 }
853
854 async fn _fulfill(
858 &self,
859 fulfillments: Vec<Fulfillment>,
860 assessor_fill: AssessorReceipt,
861 ) -> Result<(), MarketError> {
862 let fill_ids = fulfillments.iter().map(|fill| fill.id).collect::<Vec<_>>();
863 tracing::trace!("Calling fulfill({fulfillments:?}, {assessor_fill:?})");
864 let call = self.instance.fulfill(fulfillments, assessor_fill).from(self.caller);
865 tracing::trace!("Calldata: {:x}", call.calldata());
866 let pending_tx = call.send().await?;
867 tracing::debug!("Broadcasting tx {}", pending_tx.tx_hash());
868
869 let receipt = self.get_receipt_with_retry(pending_tx).await?;
870
871 tracing::info!("Submitted proof for batch {:?}: {}", fill_ids, receipt.transaction_hash);
872
873 validate_fulfill_receipt(receipt)
874 }
875
876 async fn fulfill_and_withdraw(
880 &self,
881 fulfillments: Vec<Fulfillment>,
882 assessor_fill: AssessorReceipt,
883 ) -> Result<(), MarketError> {
884 let fill_ids = fulfillments.iter().map(|fill| fill.id).collect::<Vec<_>>();
885 tracing::trace!("Calling fulfillAndWithdraw({fulfillments:?}, {assessor_fill:?})");
886 let call = self.instance.fulfillAndWithdraw(fulfillments, assessor_fill).from(self.caller);
887 tracing::trace!("Calldata: {:x}", call.calldata());
888 let pending_tx = call.send().await?;
889 tracing::debug!("Broadcasting tx {}", pending_tx.tx_hash());
890
891 let receipt = self.get_receipt_with_retry(pending_tx).await?;
892
893 tracing::info!("Submitted proof for batch {:?}: {}", fill_ids, receipt.transaction_hash);
894
895 validate_fulfill_receipt(receipt)
896 }
897
898 async fn submit_root_and_fulfill(
901 &self,
902 root: Root,
903 fulfillments: Vec<Fulfillment>,
904 assessor_fill: AssessorReceipt,
905 ) -> Result<(), MarketError> {
906 tracing::trace!(
907 "Calling submitRootAndFulfill({:?}, {:x}, {fulfillments:?}, {assessor_fill:?})",
908 root.root,
909 root.seal
910 );
911 let call = self
912 .instance
913 .submitRootAndFulfill(
914 root.verifier_address,
915 root.root,
916 root.seal,
917 fulfillments,
918 assessor_fill,
919 )
920 .from(self.caller);
921 tracing::trace!("Calldata: {}", call.calldata());
922 let pending_tx = call.send().await?;
923 tracing::debug!("Broadcasting tx {}", pending_tx.tx_hash());
924 let tx_receipt = self.get_receipt_with_retry(pending_tx).await?;
925
926 tracing::info!("Submitted merkle root and proof for batch {}", tx_receipt.transaction_hash);
927
928 validate_fulfill_receipt(tx_receipt)
929 }
930
931 async fn submit_root_and_fulfill_and_withdraw(
934 &self,
935 root: Root,
936 fulfillments: Vec<Fulfillment>,
937 assessor_fill: AssessorReceipt,
938 ) -> Result<(), MarketError> {
939 tracing::trace!("Calling submitRootAndFulfillAndWithdraw({:?}, {:x}, {fulfillments:?}, {assessor_fill:?})", root.root, root.seal);
940 let call = self
941 .instance
942 .submitRootAndFulfillAndWithdraw(
943 root.verifier_address,
944 root.root,
945 root.seal,
946 fulfillments,
947 assessor_fill,
948 )
949 .from(self.caller);
950 tracing::trace!("Calldata: {}", call.calldata());
951 let pending_tx = call.send().await?;
952 tracing::debug!("Broadcasting tx {}", pending_tx.tx_hash());
953 let tx_receipt = self.get_receipt_with_retry(pending_tx).await?;
954
955 tracing::info!("Submitted merkle root and proof for batch {}", tx_receipt.transaction_hash);
956
957 validate_fulfill_receipt(tx_receipt)
958 }
959
960 async fn price_and_fulfill(
964 &self,
965 unlocked_requests: Vec<UnlockedRequest>,
966 fulfillments: Vec<Fulfillment>,
967 assessor_fill: AssessorReceipt,
968 ) -> Result<(), MarketError> {
969 tracing::trace!("Calling priceAndFulfill({fulfillments:?}, {assessor_fill:?})");
970
971 let (requests, client_sigs): (Vec<_>, Vec<_>) =
972 unlocked_requests.into_iter().map(|ur| (ur.request, ur.client_sig)).unzip();
973 let call = self
974 .instance
975 .priceAndFulfill(requests, client_sigs, fulfillments, assessor_fill)
976 .from(self.caller);
977 tracing::trace!("Calldata: {}", call.calldata());
978
979 let pending_tx = call.send().await?;
980 tracing::debug!("Broadcasting tx {}", pending_tx.tx_hash());
981
982 let tx_receipt = self.get_receipt_with_retry(pending_tx).await?;
983
984 tracing::info!("Fulfilled proof for batch {}", tx_receipt.transaction_hash);
985
986 validate_fulfill_receipt(tx_receipt)
987 }
988
989 async fn price_and_fulfill_and_withdraw(
993 &self,
994 unlocked_requests: Vec<UnlockedRequest>,
995 fulfillments: Vec<Fulfillment>,
996 assessor_fill: AssessorReceipt,
997 ) -> Result<(), MarketError> {
998 tracing::trace!("Calling priceAndFulfillAndWithdraw({fulfillments:?}, {assessor_fill:?})");
999
1000 let (requests, client_sigs): (Vec<_>, Vec<_>) =
1001 unlocked_requests.into_iter().map(|ur| (ur.request, ur.client_sig)).unzip();
1002 let call = self
1003 .instance
1004 .priceAndFulfillAndWithdraw(requests, client_sigs, fulfillments, assessor_fill)
1005 .from(self.caller);
1006 tracing::trace!("Calldata: {}", call.calldata());
1007
1008 let pending_tx = call.send().await?;
1009 tracing::debug!("Broadcasting tx {}", pending_tx.tx_hash());
1010
1011 let tx_receipt = self.get_receipt_with_retry(pending_tx).await?;
1012
1013 tracing::info!("Fulfilled proof for batch {}", tx_receipt.transaction_hash);
1014
1015 validate_fulfill_receipt(tx_receipt)
1016 }
1017
1018 async fn submit_root_and_price_fulfill(
1021 &self,
1022 root: Root,
1023 unlocked_requests: Vec<UnlockedRequest>,
1024 fulfillments: Vec<Fulfillment>,
1025 assessor_fill: AssessorReceipt,
1026 ) -> Result<(), MarketError> {
1027 let (requests, client_sigs): (Vec<_>, Vec<_>) =
1028 unlocked_requests.into_iter().map(|ur| (ur.request, ur.client_sig)).unzip();
1029 tracing::trace!("Calling submitRootAndPriceAndFulfill({:?}, {:x}, {:?}, {:?}, {fulfillments:?}, {assessor_fill:?})", root.root, root.seal, requests, client_sigs);
1030 let call = self
1031 .instance
1032 .submitRootAndPriceAndFulfill(
1033 root.verifier_address,
1034 root.root,
1035 root.seal,
1036 requests,
1037 client_sigs,
1038 fulfillments,
1039 assessor_fill,
1040 )
1041 .from(self.caller);
1042 tracing::trace!("Calldata: {}", call.calldata());
1043 let pending_tx = call.send().await?;
1044 tracing::debug!("Broadcasting tx {}", pending_tx.tx_hash());
1045 let tx_receipt = pending_tx
1046 .with_timeout(Some(self.timeout))
1047 .get_receipt()
1048 .await
1049 .context("failed to confirm tx")
1050 .map_err(MarketError::TxnConfirmationError)?;
1051
1052 tracing::info!("Submitted merkle root and proof for batch {}", tx_receipt.transaction_hash);
1053
1054 validate_fulfill_receipt(tx_receipt)
1055 }
1056
1057 async fn submit_root_and_price_fulfill_and_withdraw(
1060 &self,
1061 root: Root,
1062 unlocked_requests: Vec<UnlockedRequest>,
1063 fulfillments: Vec<Fulfillment>,
1064 assessor_fill: AssessorReceipt,
1065 ) -> Result<(), MarketError> {
1066 let (requests, client_sigs): (Vec<_>, Vec<_>) =
1067 unlocked_requests.into_iter().map(|ur| (ur.request, ur.client_sig)).unzip();
1068 tracing::trace!("Calling submitRootAndPriceAndFulfillAndWithdraw({:?}, {:x}, {:?}, {:?}, {fulfillments:?}, {assessor_fill:?})", root.root, root.seal, requests, client_sigs);
1069 let call = self
1070 .instance
1071 .submitRootAndPriceAndFulfillAndWithdraw(
1072 root.verifier_address,
1073 root.root,
1074 root.seal,
1075 requests,
1076 client_sigs,
1077 fulfillments,
1078 assessor_fill,
1079 )
1080 .from(self.caller);
1081 tracing::trace!("Calldata: {}", call.calldata());
1082 let pending_tx = call.send().await?;
1083 tracing::debug!("Broadcasting tx {}", pending_tx.tx_hash());
1084 let tx_receipt = pending_tx
1085 .with_timeout(Some(self.timeout))
1086 .get_receipt()
1087 .await
1088 .context("failed to confirm tx")
1089 .map_err(MarketError::TxnConfirmationError)?;
1090
1091 tracing::info!("Submitted merkle root and proof for batch {}", tx_receipt.transaction_hash);
1092
1093 validate_fulfill_receipt(tx_receipt)
1094 }
1095
1096 pub async fn is_locked(&self, request_id: U256) -> Result<bool, MarketError> {
1098 tracing::trace!("Calling requestIsLocked({:x})", request_id);
1099 let res = self.instance.requestIsLocked(request_id).call().await?;
1100
1101 Ok(res)
1102 }
1103
1104 pub async fn is_fulfilled(&self, request_id: U256) -> Result<bool, MarketError> {
1106 tracing::trace!("Calling requestIsFulfilled({:x})", request_id);
1107 let res = self.instance.requestIsFulfilled(request_id).call().await?;
1108
1109 Ok(res)
1110 }
1111
1112 pub async fn is_slashed(&self, request_id: U256) -> Result<bool, MarketError> {
1114 tracing::trace!("Calling requestIsSlashed({:x})", request_id);
1115 let res = self.instance.requestIsSlashed(request_id).call().await?;
1116
1117 Ok(res)
1118 }
1119
1120 pub async fn get_status(
1124 &self,
1125 request_id: U256,
1126 expires_at: Option<u64>,
1127 ) -> Result<RequestStatus, MarketError> {
1128 let timestamp = self.get_latest_block_timestamp().await?;
1129
1130 if self.is_fulfilled(request_id).await.context("Failed to check fulfillment status")? {
1131 return Ok(RequestStatus::Fulfilled);
1132 }
1133
1134 if let Some(expires_at) = expires_at {
1135 if timestamp > expires_at {
1136 return Ok(RequestStatus::Expired);
1137 }
1138 }
1139
1140 if self.is_locked(request_id).await.context("Failed to check locked status")? {
1141 let deadline = self.instance.requestDeadline(request_id).call().await?;
1142 if timestamp > deadline && deadline > 0 {
1143 return Ok(RequestStatus::Expired);
1144 };
1145 return Ok(RequestStatus::Locked);
1146 }
1147
1148 Ok(RequestStatus::Unknown)
1149 }
1150
1151 async fn get_latest_block_number(&self) -> Result<u64, MarketError> {
1152 Ok(self
1153 .instance
1154 .provider()
1155 .get_block_number()
1156 .await
1157 .context("Failed to get latest block number")?)
1158 }
1159
1160 async fn get_latest_block_timestamp(&self) -> Result<u64, MarketError> {
1161 let block = self
1162 .instance
1163 .provider()
1164 .get_block_by_number(BlockNumberOrTag::Latest)
1165 .await
1166 .context("failed to get block")?
1167 .context("failed to get block")?;
1168 Ok(block.header.timestamp())
1169 }
1170
1171 pub(crate) async fn retry_query<F, Fut, T>(
1173 &self,
1174 config: &EventQueryConfig,
1175 query_fn: F,
1176 function_name: &str,
1177 ) -> Result<T, MarketError>
1178 where
1179 F: Fn() -> Fut,
1180 Fut: std::future::Future<Output = Result<T, MarketError>>,
1181 MarketError: std::fmt::Debug,
1182 {
1183 let Some(retry_config) = &config.retry_config else {
1184 return query_fn().await;
1185 };
1186
1187 let retries = retry_config.retries;
1188 if retries == 0 {
1189 return query_fn().await;
1190 }
1191
1192 let mut last_error = None;
1193 for attempt in 0..=retries {
1194 match query_fn().await {
1195 Ok(result) => return Ok(result),
1196 Err(err) => {
1197 if attempt < retries {
1198 tracing::warn!(
1199 "Operation [{}] failed: {err}, starting retry {}/{}",
1200 function_name,
1201 attempt + 1,
1202 retries
1203 );
1204 if let Some(delay_ms) = retry_config.retry_delay_ms {
1205 tokio::time::sleep(Duration::from_millis(delay_ms)).await;
1206 }
1207 last_error = Some(err);
1208 continue;
1209 }
1210 last_error = Some(err);
1211 }
1212 }
1213 }
1214
1215 tracing::warn!(
1216 "Operation [{}] failed after {} retries, returning last error: {:?}",
1217 function_name,
1218 retries,
1219 last_error
1220 );
1221 Err(last_error.unwrap())
1222 }
1223
1224 #[allow(clippy::too_many_arguments)]
1227 pub(crate) async fn query_events_backwards<'a, E>(
1228 &self,
1229 config: &EventQueryConfig,
1230 event_name: &str,
1231 request_id: U256,
1232 filter_builder: impl Fn() -> alloy::contract::Event<&'a P, E, Ethereum>,
1233 lower_bound: Option<u64>,
1234 upper_bound: Option<u64>,
1235 error_fn: impl Fn(U256, u64, u64) -> MarketError,
1236 return_first_only: bool,
1237 ) -> Result<Vec<(E, Log)>, MarketError>
1238 where
1239 E: SolEvent + Clone,
1240 P: 'a,
1241 {
1242 let mut all_events = Vec::new();
1243 let mut upper_block = upper_bound.unwrap_or(self.get_latest_block_number().await?);
1244 let initial_upper_block = upper_block;
1245 let start_block = lower_bound
1246 .unwrap_or(upper_block.saturating_sub(config.block_range * config.max_iterations));
1247
1248 let iterations = if lower_bound.is_some() {
1251 ((upper_block - start_block) / config.block_range).saturating_add(1)
1252 } else {
1253 config.max_iterations
1254 };
1255
1256 tracing::debug!(
1257 "Querying {} events for request ID {:x} in blocks {} to {} [iterations: {}, first_only: {}]",
1258 event_name,
1259 request_id,
1260 start_block,
1261 upper_block,
1262 iterations,
1263 return_first_only
1264 );
1265 let mut final_block_checked = initial_upper_block;
1266 for _ in 0..iterations {
1267 if upper_block <= start_block {
1269 break;
1270 }
1271
1272 let lower_block = upper_block.saturating_sub(config.block_range);
1274
1275 let mut event_filter = filter_builder();
1276 tracing::trace!(
1277 "Querying {} events for request ID {:x} in blocks {} to {}",
1278 event_name,
1279 request_id,
1280 lower_block,
1281 upper_block
1282 );
1283
1284 event_filter.filter = event_filter
1285 .filter
1286 .topic1(request_id)
1287 .from_block(lower_block)
1288 .to_block(upper_block);
1289
1290 let logs = event_filter.query().await?;
1291
1292 if return_first_only {
1293 if let Some((event, log_meta)) = logs.first() {
1295 return Ok(vec![(event.clone(), log_meta.clone())]);
1296 }
1297 } else {
1298 all_events.extend(logs);
1300 }
1301
1302 final_block_checked = lower_block;
1303 upper_block = lower_block.saturating_sub(1);
1305 }
1306
1307 if all_events.is_empty() && return_first_only {
1309 Err(error_fn(request_id, initial_upper_block, final_block_checked))
1310 } else {
1311 Ok(all_events)
1312 }
1313 }
1314
1315 #[allow(clippy::too_many_arguments)]
1317 pub(crate) async fn query_event_backwards<'a, E>(
1318 &self,
1319 config: &EventQueryConfig,
1320 event_name: &str,
1321 request_id: U256,
1322 filter_builder: impl Fn() -> alloy::contract::Event<&'a P, E, Ethereum>,
1323 lower_bound: Option<u64>,
1324 upper_bound: Option<u64>,
1325 error_fn: impl Fn(U256, u64, u64) -> MarketError,
1326 ) -> Result<(E, Log), MarketError>
1327 where
1328 E: SolEvent + Clone,
1329 P: 'a,
1330 {
1331 let events = self
1332 .query_events_backwards(
1333 config,
1334 event_name,
1335 request_id,
1336 filter_builder,
1337 lower_bound,
1338 upper_bound,
1339 error_fn,
1340 true,
1341 )
1342 .await?;
1343
1344 events.into_iter().next().ok_or_else(|| MarketError::RequestNotFound(request_id, 0, 0))
1346 }
1347
1348 async fn query_fulfilled_event(
1356 &self,
1357 request_id: U256,
1358 lower_bound: Option<u64>,
1359 upper_bound: Option<u64>,
1360 ) -> Result<ProofDelivered, MarketError> {
1361 let config = self.event_query_config.clone();
1362 self.retry_query(
1363 &config,
1364 || async {
1365 let (event, _log_meta) = self
1366 .query_event_backwards(
1367 &config,
1368 "ProofDelivered",
1369 request_id,
1370 || self.instance.ProofDelivered_filter(),
1371 lower_bound,
1372 upper_bound,
1373 MarketError::ProofNotFound,
1374 )
1375 .await?;
1376
1377 Ok(event)
1378 },
1379 "query_fulfilled_event",
1380 )
1381 .await
1382 }
1383
1384 pub async fn query_request_submitted_event(
1393 &self,
1394 request_id: U256,
1395 lower_bound: Option<u64>,
1396 upper_bound: Option<u64>,
1397 ) -> Result<RequestSubmittedEventData, MarketError> {
1398 let config = self.event_query_config.clone();
1399 self.retry_query(
1400 &config,
1401 || async {
1402 let (event, log_meta) = self
1403 .query_event_backwards(
1404 &config,
1405 "RequestSubmitted",
1406 request_id,
1407 || self.instance.RequestSubmitted_filter(),
1408 lower_bound,
1409 upper_bound,
1410 MarketError::RequestNotFound,
1411 )
1412 .await?;
1413
1414 Ok(RequestSubmittedEventData {
1415 request: event.request,
1416 client_signature: event.clientSignature,
1417 block_number: log_meta.block_number.unwrap_or(0),
1418 tx_hash: log_meta.transaction_hash.unwrap_or(B256::ZERO),
1419 })
1420 },
1421 "query_request_submitted_event",
1422 )
1423 .await
1424 }
1425
1426 pub async fn query_request_locked_event(
1437 &self,
1438 request_id: U256,
1439 lower_bound: Option<u64>,
1440 upper_bound: Option<u64>,
1441 ) -> Result<RequestLockedEventData, MarketError> {
1442 let config = self.event_query_config.clone();
1443 self.retry_query(
1444 &config,
1445 || async {
1446 let (event, log_meta) = self
1447 .query_event_backwards(
1448 &config,
1449 "RequestLocked",
1450 request_id,
1451 || self.instance.RequestLocked_filter(),
1452 lower_bound,
1453 upper_bound,
1454 MarketError::RequestNotFound,
1455 )
1456 .await?;
1457
1458 Ok(RequestLockedEventData {
1459 event,
1460 block_number: log_meta.block_number.unwrap_or(0),
1461 tx_hash: log_meta.transaction_hash.unwrap_or(B256::ZERO),
1462 })
1463 },
1464 "query_request_locked_event",
1465 )
1466 .await
1467 }
1468
1469 pub async fn query_request_fulfilled_event(
1475 &self,
1476 request_id: U256,
1477 lower_bound: Option<u64>,
1478 upper_bound: Option<u64>,
1479 ) -> Result<RequestFulfilledEventData, MarketError> {
1480 let config = self.event_query_config.clone();
1481 self.retry_query(
1482 &config,
1483 || async {
1484 let (event, log_meta) = self
1485 .query_event_backwards(
1486 &config,
1487 "RequestFulfilled",
1488 request_id,
1489 || self.instance.RequestFulfilled_filter(),
1490 lower_bound,
1491 upper_bound,
1492 MarketError::RequestNotFound,
1493 )
1494 .await?;
1495
1496 Ok(RequestFulfilledEventData {
1497 event,
1498 block_number: log_meta.block_number.unwrap_or(0),
1499 tx_hash: log_meta.transaction_hash.unwrap_or(B256::ZERO),
1500 })
1501 },
1502 "query_request_fulfilled_event",
1503 )
1504 .await
1505 }
1506
1507 pub async fn query_all_proof_delivered_events(
1513 &self,
1514 request_id: U256,
1515 lower_bound: Option<u64>,
1516 upper_bound: Option<u64>,
1517 ) -> Result<Vec<ProofDeliveredEventData>, MarketError> {
1518 let config = self.event_query_config.clone();
1519 self.retry_query(
1520 &config,
1521 || async {
1522 let events = self
1523 .query_events_backwards(
1524 &config,
1525 "ProofDelivered",
1526 request_id,
1527 || self.instance.ProofDelivered_filter(),
1528 lower_bound,
1529 upper_bound,
1530 |_, _, _| unreachable!(),
1531 false,
1532 )
1533 .await?;
1534
1535 Ok(events
1536 .into_iter()
1537 .map(|(event, log_meta)| ProofDeliveredEventData {
1538 event,
1539 block_number: log_meta.block_number.unwrap_or(0),
1540 tx_hash: log_meta.transaction_hash.unwrap_or(B256::ZERO),
1541 })
1542 .collect())
1543 },
1544 "query_all_proof_delivered_events",
1545 )
1546 .await
1547 }
1548
1549 pub async fn query_prover_slashed_event(
1553 &self,
1554 request_id: U256,
1555 lower_bound: Option<u64>,
1556 upper_bound: Option<u64>,
1557 ) -> Result<ProverSlashedEventData, MarketError> {
1558 let config = self.event_query_config.clone();
1559 self.retry_query(
1560 &config,
1561 || async {
1562 let (event, log_meta) = self
1563 .query_event_backwards(
1564 &config,
1565 "ProverSlashed",
1566 request_id,
1567 || self.instance.ProverSlashed_filter(),
1568 lower_bound,
1569 upper_bound,
1570 MarketError::RequestNotFound,
1571 )
1572 .await?;
1573
1574 Ok(ProverSlashedEventData {
1575 event,
1576 block_number: log_meta.block_number.unwrap_or(0),
1577 tx_hash: log_meta.transaction_hash.unwrap_or(B256::ZERO),
1578 })
1579 },
1580 "query_prover_slashed_event",
1581 )
1582 .await
1583 }
1584
1585 pub async fn get_request_fulfillment(
1587 &self,
1588 request_id: U256,
1589 lower_bound: Option<u64>,
1590 upper_bound: Option<u64>,
1591 ) -> Result<Fulfillment, MarketError> {
1592 match self.get_status(request_id, None).await? {
1593 RequestStatus::Expired => Err(MarketError::RequestHasExpired(request_id)),
1594 RequestStatus::Fulfilled => {
1595 let event =
1596 self.query_fulfilled_event(request_id, lower_bound, upper_bound).await?;
1597 Ok(event.fulfillment)
1598 }
1599 _ => Err(MarketError::RequestNotFulfilled(request_id)),
1600 }
1601 }
1602
1603 pub async fn get_request_fulfillment_prover(
1605 &self,
1606 request_id: U256,
1607 lower_bound: Option<u64>,
1608 upper_bound: Option<u64>,
1609 ) -> Result<Address, MarketError> {
1610 match self.get_status(request_id, None).await? {
1611 RequestStatus::Expired => Err(MarketError::RequestHasExpired(request_id)),
1612 RequestStatus::Fulfilled => {
1613 let event =
1614 self.query_fulfilled_event(request_id, lower_bound, upper_bound).await?;
1615 Ok(event.prover)
1616 }
1617 _ => Err(MarketError::RequestNotFulfilled(request_id)),
1618 }
1619 }
1620
1621 pub async fn get_submitted_request(
1623 &self,
1624 request_id: U256,
1625 tx_hash: Option<B256>,
1626 lower_bound: Option<u64>,
1627 upper_bound: Option<u64>,
1628 ) -> Result<(ProofRequest, Bytes), MarketError> {
1629 if let Some(tx_hash) = tx_hash {
1630 let tx_data = self
1631 .instance
1632 .provider()
1633 .get_transaction_by_hash(tx_hash)
1634 .await
1635 .context("Failed to get transaction")?
1636 .context("Transaction not found")?;
1637 let inputs = tx_data.input();
1638 let calldata = IBoundlessMarket::submitRequestCall::abi_decode(inputs)
1640 .context("Failed to decode input")?;
1641 return Ok((calldata.request, calldata.clientSignature));
1642 }
1643
1644 let data = self.query_request_submitted_event(request_id, lower_bound, upper_bound).await?;
1645 Ok((data.request, data.client_signature))
1646 }
1647
1648 pub async fn wait_for_request_fulfillment(
1654 &self,
1655 request_id: U256,
1656 retry_interval: Duration,
1657 expires_at: u64,
1658 ) -> Result<Fulfillment, MarketError> {
1659 loop {
1660 let status = self.get_status(request_id, Some(expires_at)).await?;
1661 match status {
1662 RequestStatus::Expired => return Err(MarketError::RequestHasExpired(request_id)),
1663 RequestStatus::Fulfilled => {
1664 let event = self.query_fulfilled_event(request_id, None, None).await?;
1665 return Ok(event.fulfillment);
1666 }
1667 _ => {
1668 tracing::info!(
1669 "Request {:x} status: {:?}. Retrying in {:?}",
1670 request_id,
1671 status,
1672 retry_interval
1673 );
1674 tokio::time::sleep(retry_interval).await;
1675 continue;
1676 }
1677 }
1678 }
1679 }
1680
1681 pub async fn index_from_nonce(&self) -> Result<u32, MarketError> {
1685 let nonce = self
1686 .instance
1687 .provider()
1688 .get_transaction_count(self.caller)
1689 .await
1690 .context(format!("Failed to get EOA nonce for {:?}", self.caller))?;
1691 let id: u32 = nonce.try_into().context("Failed to convert nonce to u32")?;
1692 let request_id = RequestId::u256(self.caller, id);
1693 match self.get_status(request_id, None).await? {
1694 RequestStatus::Unknown => Ok(id),
1695 _ => Err(MarketError::Error(anyhow!("index already in use"))),
1696 }
1697 }
1698
1699 pub async fn request_id_from_nonce(&self) -> Result<U256, MarketError> {
1703 let index = self.index_from_nonce().await?;
1704 Ok(RequestId::u256(self.caller, index))
1705 }
1706
1707 pub async fn index_from_rand(&self) -> Result<u32, MarketError> {
1712 let attempts = 10usize;
1713 for _ in 0..attempts {
1714 let id: u32 = rand::random();
1715 let request_id = RequestId::u256(self.caller, id);
1716 match self.get_status(request_id, None).await? {
1717 RequestStatus::Unknown => return Ok(id),
1718 _ => continue,
1719 }
1720 }
1721 Err(MarketError::Error(anyhow!(
1722 "failed to generate a unique index after {attempts} attempts"
1723 )))
1724 }
1725
1726 pub async fn request_id_from_rand(&self) -> Result<U256, MarketError> {
1730 let index = self.index_from_rand().await?;
1731 Ok(RequestId::u256(self.caller, index))
1732 }
1733
1734 pub async fn image_info(&self) -> Result<(B256, String)> {
1736 tracing::trace!("Calling imageInfo()");
1737 let (image_id, image_url) =
1738 self.instance.imageInfo().call().await.context("call failed")?.into();
1739
1740 Ok((image_id, image_url))
1741 }
1742
1743 pub async fn get_chain_id(&self) -> Result<u64, MarketError> {
1747 let mut id = self.chain_id.load(Ordering::Relaxed);
1748 if id != 0 {
1749 return Ok(id);
1750 }
1751 id = self.instance.provider().get_chain_id().await.context("failed to get chain ID")?;
1752 self.chain_id.store(id, Ordering::Relaxed);
1753 Ok(id)
1754 }
1755
1756 pub async fn approve_deposit_collateral(&self, value: U256) -> Result<()> {
1758 let spender = *self.instance.address();
1759 tracing::trace!("Calling approve({:?}, {})", spender, value);
1760 let token_address = self
1761 .instance
1762 .COLLATERAL_TOKEN_CONTRACT()
1763 .call()
1764 .await
1765 .context("COLLATERAL_TOKEN_CONTRACT call failed")?
1766 .0;
1767 let contract = IERC20::new(token_address.into(), self.instance.provider());
1768 let call = contract.approve(spender, value).from(self.caller);
1769 let pending_tx = call.send().await.map_err(IHitPointsErrors::decode_error)?;
1770 tracing::debug!("Broadcasting tx {}", pending_tx.tx_hash());
1771 let tx_hash = pending_tx
1772 .with_timeout(Some(self.timeout))
1773 .watch()
1774 .await
1775 .context("failed to confirm tx")?;
1776
1777 tracing::debug!(
1778 "Approved {} to spend {} of token 0x{:x}. Tx hash: {}",
1779 spender,
1780 value,
1781 token_address,
1782 tx_hash
1783 );
1784
1785 Ok(())
1786 }
1787
1788 pub async fn deposit_collateral(&self, value: U256) -> Result<(), MarketError> {
1793 tracing::trace!("Calling depositCollateral({})", value);
1794 let call = self.instance.depositCollateral(value);
1795 tracing::debug!("Sending tx {}", format!("{:?}", call));
1796 tracing::debug!("Market address: {:?}", self.instance.address());
1797 let pending_tx = call.send().await?;
1798 tracing::debug!(
1799 "Broadcasting {} collateral deposit to market {:?}. Tx hash: {}",
1800 value,
1801 self.instance.address(),
1802 pending_tx.tx_hash()
1803 );
1804 let tx_hash = pending_tx
1805 .with_timeout(Some(self.timeout))
1806 .watch()
1807 .await
1808 .context("failed to confirm tx")?;
1809 tracing::debug!(
1810 "Submitted {} collateral deposit to market {:?}. Tx hash: {}",
1811 value,
1812 self.instance.address(),
1813 tx_hash
1814 );
1815 Ok(())
1816 }
1817
1818 pub async fn deposit_collateral_with_permit(
1822 &self,
1823 value: U256,
1824 signer: &impl Signer,
1825 ) -> Result<(), MarketError> {
1826 if !collateral_token_supports_permit(self.get_chain_id().await?) {
1827 return Err(MarketError::Error(anyhow!("Collateral token does not support permit. Use approve_deposit_collateral and deposit_collateral instead.")));
1828 }
1829 let token_address = self
1830 .instance
1831 .COLLATERAL_TOKEN_CONTRACT()
1832 .call()
1833 .await
1834 .context("COLLATERAL_TOKEN_CONTRACT call failed")?
1835 .0;
1836 let contract = IERC20Permit::new(token_address.into(), self.instance.provider());
1837 let call = contract.nonces(self.caller());
1838 let nonce = call.call().await.map_err(IHitPointsErrors::decode_error)?;
1839 let block = self
1840 .instance
1841 .provider()
1842 .get_block_by_number(BlockNumberOrTag::Latest)
1843 .await
1844 .context("failed to get block")?
1845 .context("failed to get block")?;
1846 let deadline = U256::from(block.header.timestamp() + 1000);
1847 let permit = Permit {
1848 owner: self.caller(),
1849 spender: *self.instance().address(),
1850 value,
1851 nonce,
1852 deadline,
1853 };
1854 tracing::debug!("Permit: {:?}", permit);
1855 let domain_separator = contract.DOMAIN_SEPARATOR().call().await?;
1856 let sig = permit.sign(signer, domain_separator).await?.as_bytes();
1857 let r = B256::from_slice(&sig[..32]);
1858 let s = B256::from_slice(&sig[32..64]);
1859 let v: u8 = sig[64];
1860 tracing::trace!("Calling depositStakeWithPermit({})", value);
1861 let call = self.instance.depositCollateralWithPermit(value, deadline, v, r, s);
1862 let pending_tx = call.send().await?;
1863 tracing::debug!(
1864 "Broadcasting {} collateral deposit to market {:?}. Tx hash: {}",
1865 value,
1866 self.instance.address(),
1867 pending_tx.tx_hash()
1868 );
1869 let tx_hash = pending_tx
1870 .with_timeout(Some(self.timeout))
1871 .watch()
1872 .await
1873 .context("failed to confirm tx")?;
1874 tracing::debug!(
1875 "Submitted {} collateral deposit to market {:?}. Tx hash: {}",
1876 value,
1877 self.instance.address(),
1878 tx_hash
1879 );
1880 Ok(())
1881 }
1882
1883 pub async fn withdraw_collateral(&self, value: U256) -> Result<(), MarketError> {
1885 tracing::trace!("Calling withdrawStake({})", value);
1886 let call = self.instance.withdrawCollateral(value);
1887 let pending_tx = call.send().await?;
1888 tracing::debug!(
1889 "Broadcasting {} collateral withdraw to market {:?}. Tx hash: {}",
1890 value,
1891 self.instance.address(),
1892 pending_tx.tx_hash()
1893 );
1894 let tx_hash = pending_tx
1895 .with_timeout(Some(self.timeout))
1896 .watch()
1897 .await
1898 .context("failed to confirm tx")?;
1899 tracing::debug!(
1900 "Submitted {} collateral withdraw to market {:?}. Tx hash: {}",
1901 value,
1902 self.instance.address(),
1903 tx_hash
1904 );
1905 self.check_collateral_balance().await?;
1906 Ok(())
1907 }
1908
1909 pub async fn balance_of_collateral(
1911 &self,
1912 account: impl Into<Address>,
1913 ) -> Result<U256, MarketError> {
1914 let account = account.into();
1915 tracing::trace!("Calling balanceOfCollateral({})", account);
1916 let balance =
1917 self.instance.balanceOfCollateral(account).call().await.context("call failed")?;
1918 Ok(balance)
1919 }
1920
1921 async fn check_collateral_balance(&self) -> Result<(), MarketError> {
1924 let collateral_balance = self.balance_of_collateral(self.caller()).await?;
1925 if collateral_balance < self.balance_alert_config.error_threshold.unwrap_or(U256::ZERO) {
1926 tracing::error!(
1927 "[B-BAL-STK] collateral balance {} for {} < error threshold",
1928 collateral_balance,
1929 self.caller(),
1930 );
1931 } else if collateral_balance
1932 < self.balance_alert_config.warn_threshold.unwrap_or(U256::ZERO)
1933 {
1934 tracing::warn!(
1935 "[B-BAL-STK] collateral balance {} for {} < warning threshold",
1936 collateral_balance,
1937 self.caller(),
1938 );
1939 } else {
1940 tracing::trace!("collateral balance for {} is: {}", self.caller(), collateral_balance);
1941 }
1942 Ok(())
1943 }
1944
1945 pub async fn collateral_token_address(&self) -> Result<Address, MarketError> {
1947 tracing::trace!("Calling COLLATERAL_TOKEN_CONTRACT()");
1948 let address = self
1949 .instance
1950 .COLLATERAL_TOKEN_CONTRACT()
1951 .call()
1952 .await
1953 .context("COLLATERAL_TOKEN_CONTRACT call failed")?
1954 .0;
1955 Ok(address.into())
1956 }
1957
1958 pub async fn collateral_token_symbol(&self) -> Result<String, MarketError> {
1960 let address = self.collateral_token_address().await?;
1961 let contract = IERC20::new(address, self.instance.provider());
1962 let symbol = contract.symbol().call().await.context("Failed to get token symbol")?;
1963 Ok(symbol)
1964 }
1965
1966 pub async fn collateral_token_decimals(&self) -> Result<u8, MarketError> {
1968 let address = self.collateral_token_address().await?;
1969 let contract = IERC20::new(address, self.instance.provider());
1970 let decimals = contract.decimals().call().await.context("Failed to get token decimals")?;
1971 Ok(decimals)
1972 }
1973}
1974
1975impl Offer {
1976 pub fn time_at_price(&self, price: U256) -> Result<u64, MarketError> {
1978 let max_price = U256::from(self.maxPrice);
1979 let min_price = U256::from(self.minPrice);
1980
1981 if price > U256::from(max_price) {
1982 return Err(MarketError::Error(anyhow::anyhow!("Price cannot exceed max price")));
1983 }
1984
1985 if price <= min_price {
1986 return Ok(0);
1987 }
1988
1989 let rise = max_price - min_price;
1990 let run = U256::from(self.rampUpPeriod);
1991 let delta = ((price - min_price) * run).div_ceil(rise);
1992 let delta: u64 = delta.try_into().context("Failed to convert block delta to u64")?;
1993
1994 Ok(self.rampUpStart + delta)
1995 }
1996
1997 pub fn price_at(&self, timestamp: u64) -> Result<U256, MarketError> {
1999 Ok(crate::contracts::pricing::price_at_time(
2000 U256::from(self.minPrice),
2001 U256::from(self.maxPrice),
2002 self.rampUpStart,
2003 self.rampUpPeriod,
2004 self.lockTimeout,
2005 timestamp,
2006 ))
2007 }
2008
2009 pub fn deadline(&self) -> u64 {
2011 self.rampUpStart + (self.timeout as u64)
2012 }
2013
2014 pub fn lock_deadline(&self) -> u64 {
2022 self.rampUpStart + (self.lockTimeout as u64)
2023 }
2024
2025 pub fn collateral_reward_if_locked_and_not_fulfilled(&self) -> U256 {
2028 self.lockCollateral
2029 .checked_mul(U256::from(FRACTION_STAKE_NUMERATOR))
2030 .unwrap()
2031 .checked_div(U256::from(FRACTION_STAKE_DENOMINATOR))
2032 .unwrap()
2033 }
2034}
2035
/// A root to submit alongside a fulfillment: a verifier address, the root value, and its seal.
///
/// Built by [`FulfillmentTx::with_submit_root`].
#[derive(Debug, Clone)]
pub struct Root {
    /// Address of the verifier the root is associated with.
    /// NOTE(review): presumably the contract the root is submitted to — confirm against callers.
    pub verifier_address: Address,
    /// The root value.
    pub root: B256,
    /// Seal bytes accompanying the root.
    /// NOTE(review): presumably the proof for `root` — confirm against callers.
    pub seal: Bytes,
}
2046
/// A proof request paired with a client signature.
///
/// NOTE(review): the name suggests a request being fulfilled without a prior lock —
/// confirm against the code that consumes [`FulfillmentTx::unlocked_requests`].
#[derive(Debug, Clone)]
pub struct UnlockedRequest {
    /// The proof request.
    pub request: ProofRequest,
    /// Client signature bytes (presumably the client's signature over `request` — TODO confirm).
    pub client_sig: Bytes,
}
2055
2056impl UnlockedRequest {
2057 pub fn new(request: ProofRequest, client_sig: impl Into<Bytes>) -> Self {
2059 Self { request, client_sig: client_sig.into() }
2060 }
2061}
2062
/// The pieces of a fulfillment transaction, assembled with the `with_*` builder methods.
///
/// [`FulfillmentTx::new`] sets the fulfillments and assessor receipt; everything else starts
/// out empty or disabled.
#[derive(Clone)]
#[non_exhaustive]
pub struct FulfillmentTx {
    /// Optional root to submit; `None` unless set via [`FulfillmentTx::with_submit_root`].
    pub root: Option<Root>,
    /// Requests added via [`FulfillmentTx::with_unlocked_request`] or
    /// [`FulfillmentTx::with_unlocked_requests`]; empty by default.
    pub unlocked_requests: Vec<UnlockedRequest>,
    /// The fulfillments included in the transaction.
    pub fulfillments: Vec<Fulfillment>,
    /// The assessor receipt supplied together with the fulfillments.
    pub assessor_receipt: AssessorReceipt,
    /// Withdraw flag; `false` by default.
    /// NOTE(review): presumably requests a balance withdrawal as part of the transaction —
    /// confirm against the code that consumes this struct.
    pub withdraw: bool,
}
2082
2083impl FulfillmentTx {
2084 pub fn new(fulfillments: Vec<Fulfillment>, assessor_receipt: AssessorReceipt) -> Self {
2086 Self {
2087 root: None,
2088 unlocked_requests: Vec::new(),
2089 fulfillments,
2090 assessor_receipt,
2091 withdraw: false,
2092 }
2093 }
2094
2095 pub fn with_submit_root(
2097 self,
2098 verifier_address: impl Into<Address>,
2099 root: B256,
2100 seal: impl Into<Bytes>,
2101 ) -> Self {
2102 Self {
2103 root: Some(Root { verifier_address: verifier_address.into(), root, seal: seal.into() }),
2104 ..self
2105 }
2106 }
2107
2108 pub fn with_unlocked_request(self, unlocked_request: UnlockedRequest) -> Self {
2110 let mut requests = self.unlocked_requests;
2111 requests.push(unlocked_request);
2112 Self { unlocked_requests: requests, ..self }
2113 }
2114
2115 pub fn with_unlocked_requests(self, unlocked_requests: Vec<UnlockedRequest>) -> Self {
2117 let mut requests = self.unlocked_requests;
2118 requests.extend(unlocked_requests);
2119 Self { unlocked_requests: requests, ..self }
2120 }
2121
2122 pub fn with_withdraw(self, withdraw: bool) -> Self {
2124 Self { withdraw, ..self }
2125 }
2126}
2127
// Unit tests for the `Offer` pricing helpers and for the retry behavior of event queries.
#[cfg(test)]
mod tests {
    use crate::contracts::Offer;
    use alloy::primitives::{utils::parse_ether, U256};
    use tracing_test::traced_test;
    // Parses a decimal ether string into a `U256` via `parse_ether`; panics on bad input.
    fn ether(value: &str) -> U256 {
        parse_ether(value).unwrap()
    }

    // Builds an offer that ramps from 1 to 2 ether over 100 time units starting at
    // `bidding_start`, with timeouts of 500 and 1 ether of lock collateral.
    fn test_offer(bidding_start: u64) -> Offer {
        Offer {
            minPrice: ether("1"),
            maxPrice: ether("2"),
            rampUpStart: bidding_start,
            rampUpPeriod: 100,
            timeout: 500,
            lockTimeout: 500,
            lockCollateral: ether("1"),
        }
    }

    #[test]
    fn test_price_at() {
        let offer = &test_offer(100);

        // Before the ramp-up start the price is the minimum price.
        assert_eq!(offer.price_at(90).unwrap(), ether("1"));

        // At the ramp-up start the price is still the minimum price.
        assert_eq!(offer.price_at(100).unwrap(), ether("1"));

        // During the ramp-up the price rises linearly.
        assert_eq!(offer.price_at(101).unwrap(), ether("1.01"));
        assert_eq!(offer.price_at(125).unwrap(), ether("1.25"));
        assert_eq!(offer.price_at(150).unwrap(), ether("1.5"));
        assert_eq!(offer.price_at(175).unwrap(), ether("1.75"));
        assert_eq!(offer.price_at(199).unwrap(), ether("1.99"));

        // From the end of the ramp-up onwards the price is the maximum price.
        assert_eq!(offer.price_at(200).unwrap(), ether("2"));
        assert_eq!(offer.price_at(500).unwrap(), ether("2"));
    }

    #[test]
    fn test_time_at_price() {
        let offer = &test_offer(100);

        // A price at (or below) the minimum maps to time 0.
        assert_eq!(offer.time_at_price(ether("1")).unwrap(), 0);

        // Prices between two steps round up to the next time unit.
        assert_eq!(offer.time_at_price(ether("1.01")).unwrap(), 101);
        assert_eq!(offer.time_at_price(ether("1.001")).unwrap(), 101);

        assert_eq!(offer.time_at_price(ether("1.25")).unwrap(), 125);
        assert_eq!(offer.time_at_price(ether("1.5")).unwrap(), 150);
        assert_eq!(offer.time_at_price(ether("1.75")).unwrap(), 175);
        assert_eq!(offer.time_at_price(ether("1.99")).unwrap(), 199);
        assert_eq!(offer.time_at_price(ether("2")).unwrap(), 200);

        // A price above the maximum is rejected.
        assert!(offer.time_at_price(ether("3")).is_err());
    }

    #[test]
    fn test_collateral_reward_if_locked_and_not_fulfilled() {
        let offer = &test_offer(100);
        // Half of the 1 ether lock collateral.
        assert_eq!(offer.collateral_reward_if_locked_and_not_fulfilled(), ether("0.5"));
    }

    // The query fails twice and succeeds on the third call; with two retries configured the
    // overall result must be the successful value, and both retries must be logged.
    #[tokio::test]
    #[traced_test]
    async fn test_retry_query_success_after_retry() {
        use super::*;
        use std::sync::{
            atomic::{AtomicU32, Ordering},
            Arc,
        };

        // Two retries with no delay between attempts.
        let config = EventQueryConfig {
            block_range: 1000,
            max_iterations: 100,
            retry_config: Some(EventRetryConfig { retries: 2, retry_delay_ms: None }), };

        let counter = Arc::new(AtomicU32::new(0));
        let counter_clone = counter.clone();

        // A service instance is only needed to call `retry_query`; the closure below never
        // touches the chain.
        let anvil = alloy::node_bindings::Anvil::new().spawn();
        let provider = alloy::providers::ProviderBuilder::new().connect_http(anvil.endpoint_url());
        let service = BoundlessMarketService::new(Address::ZERO, provider, Address::ZERO);

        let result = service
            .retry_query(
                &config,
                || {
                    let counter = counter_clone.clone();
                    async move {
                        // Calls 0 and 1 fail; call 2 succeeds and returns its call index.
                        let current = counter.fetch_add(1, Ordering::SeqCst);
                        if current == 0 || current == 1 {
                            Err(MarketError::Error(anyhow::anyhow!("Attempt {} failed", current)))
                        } else {
                            Ok(current)
                        }
                    }
                },
                "test_operation",
            )
            .await;

        assert!(result.is_ok());
        assert_eq!(result.unwrap(), 2);
        // Three calls in total (initial attempt plus two retries), each retry logged.
        assert_eq!(counter.load(Ordering::SeqCst), 3); assert!(logs_contain("Operation [test_operation] failed"));
        assert!(logs_contain("starting retry 1/2"));
        assert!(logs_contain("starting retry 2/2"));
    }

    // Querying a request ID that was never used must fail after the configured single retry,
    // logging both the retry and the final failure.
    #[tokio::test]
    #[traced_test]
    async fn test_query_fulfilled_event_retries() {
        use super::*;
        use alloy::node_bindings::Anvil;
        use boundless_test_utils::market::create_test_ctx;

        let anvil = Anvil::new().spawn();
        let ctx = create_test_ctx(&anvil).await.unwrap();

        let mut market_service = BoundlessMarketService::new(
            ctx.deployment.boundless_market_address,
            ctx.prover_provider.clone(),
            ctx.prover_signer.address(),
        );
        // One retry with no delay between attempts.
        market_service = market_service.with_event_query_config(EventQueryConfig {
            block_range: 1000,
            max_iterations: 10,
            retry_config: Some(EventRetryConfig { retries: 1, retry_delay_ms: None }), });

        // No request with this ID is submitted anywhere in this test.
        let non_existent_id = U256::from(999999);
        let result = market_service.query_fulfilled_event(non_existent_id, None, None).await;

        assert!(result.is_err());

        assert!(logs_contain("Operation [query_fulfilled_event] failed"));
        assert!(logs_contain("starting retry 1/1"));
        assert!(logs_contain("failed after 1 retries"));
    }
}