1use std::collections::HashMap;
2use std::future::Future;
3use std::sync::Arc;
4use std::time::{Duration, SystemTime, UNIX_EPOCH};
5
6use backon::{ExponentialBuilder, Retryable};
7use bytes::Bytes;
8use xenith_core::{
9 wire, ChainId, ConflictResolver, KeyMetadata, MessageId, MessagingTransport, ReadStrategy,
10 Result, SendOptions, StateKey, StateStore, StateValue, StateVersion, SyncStatus, SyncedState,
11 XenithError,
12};
13use xenith_read::MultiChainReader;
14
15use crate::subscription::SubscriptionHandle;
16
17pub struct SyncConfig {
19 pub retry_attempts: u32,
21 pub retry_delay_ms: u64,
23 pub default_strategy: ReadStrategy,
25}
26
27impl Default for SyncConfig {
28 fn default() -> Self {
29 Self {
30 retry_attempts: 3,
31 retry_delay_ms: 500,
32 default_strategy: ReadStrategy::Latest,
33 }
34 }
35}
36
37#[derive(Clone, Debug)]
48pub struct SyncReceipt {
49 pub key: StateKey,
50 pub successes: Vec<(ChainId, MessageId)>,
52 pub failures: Vec<(ChainId, XenithError)>,
54 pub status: SyncStatus,
55 pub store_written: bool,
57}
58
59pub struct SyncEngine {
80 pub transport: Arc<dyn MessagingTransport>,
81 pub store: Arc<dyn StateStore>,
82 pub config: SyncConfig,
83 reader: Option<Arc<MultiChainReader>>,
84}
85
86impl SyncEngine {
87 pub fn new(
88 transport: Arc<dyn MessagingTransport>,
89 store: Arc<dyn StateStore>,
90 config: SyncConfig,
91 ) -> Self {
92 Self {
93 transport,
94 store,
95 config,
96 reader: None,
97 }
98 }
99
100 pub fn new_with_reader(
105 transport: Arc<dyn MessagingTransport>,
106 store: Arc<dyn StateStore>,
107 config: SyncConfig,
108 reader: MultiChainReader,
109 ) -> Self {
110 Self {
111 transport,
112 store,
113 config,
114 reader: Some(Arc::new(reader)),
115 }
116 }
117
118 pub async fn push(
131 &self,
132 key: StateKey,
133 value: Bytes,
134 targets: Vec<ChainId>,
135 source: ChainId,
136 metadata: Option<KeyMetadata>,
137 ) -> Result<SyncReceipt> {
138 let ts_ms = unix_now_ms()?;
139
140 let state_value = StateValue {
141 data: value.clone(),
142 version: StateVersion {
143 timestamp_ms: ts_ms,
144 sequence: 0,
145 source_chain: source.0,
146 },
147 updated_at: ts_ms / 1_000,
148 source_chain: source,
149 };
150
151 let payload = wire::encode(&key, &state_value, metadata.as_ref());
152
153 let mut successes: Vec<(ChainId, MessageId)> = Vec::with_capacity(targets.len());
154 let mut failures: Vec<(ChainId, XenithError)> = Vec::new();
155
156 for chain in &targets {
157 match send_with_retry(
158 Arc::clone(&self.transport),
159 *chain,
160 payload.clone(),
161 SendOptions::default(),
162 self.config.retry_attempts,
163 self.config.retry_delay_ms,
164 )
165 .await
166 {
167 Ok(id) => successes.push((*chain, id)),
168 Err(e) => failures.push((*chain, e)),
169 }
170 }
171
172 let store_written = targets.is_empty() || !successes.is_empty();
178 if store_written {
179 self.store.set(&key, state_value.clone()).await?;
180 if let Some(ref m) = metadata {
181 self.store.set_metadata(&key, m.clone()).await?;
182 }
183 }
184
185 let status = if failures.is_empty() {
186 successes
187 .first()
188 .map(|&(_, id)| SyncStatus::Pending { message_id: id })
189 .unwrap_or(SyncStatus::Synced)
190 } else {
191 SyncStatus::PartialFailure {
192 succeeded: successes.iter().map(|&(c, _)| c).collect(),
193 failed: failures.iter().map(|&(c, _)| c).collect(),
194 }
195 };
196
197 Ok(SyncReceipt {
198 key,
199 successes,
200 failures,
201 status,
202 store_written,
203 })
204 }
205
206 pub async fn read(&self, key: StateKey, strategy: ReadStrategy) -> Result<SyncedState> {
208 let value = self
209 .store
210 .get(&key)
211 .await?
212 .ok_or_else(|| XenithError::StoreError("key not found".into()))?;
213
214 let chains = vec![value.source_chain];
215
216 match strategy {
217 ReadStrategy::SourceOfTruth(chain) => {
218 let status = if value.source_chain == chain {
219 SyncStatus::Synced
220 } else {
221 SyncStatus::Diverged {
222 chains: vec![(value.source_chain, value.clone())],
223 }
224 };
225 Ok(SyncedState {
226 key,
227 value,
228 chains,
229 status,
230 })
231 }
232
233 ReadStrategy::Latest => Ok(SyncedState {
234 key,
235 value,
236 chains,
237 status: SyncStatus::Synced,
238 }),
239
240 ReadStrategy::Quorum(n) => {
241 let reader = self.reader.as_ref().ok_or_else(|| {
242 XenithError::StoreError(
243 "Quorum strategy requires a MultiChainReader — \
244 use SyncEngine::new_with_reader"
245 .into(),
246 )
247 })?;
248
249 let meta = self.store.get_metadata(&key).await?.ok_or_else(|| {
250 XenithError::StoreError(
251 "Quorum read requires KeyMetadata (address + slot) — \
252 call store.set_metadata before pushing"
253 .into(),
254 )
255 })?;
256
257 let address = meta
258 .address
259 .ok_or_else(|| XenithError::StoreError("KeyMetadata.address is None".into()))?;
260 let slot = meta
261 .slot
262 .ok_or_else(|| XenithError::StoreError("KeyMetadata.slot is None".into()))?;
263
264 let target_chains: Vec<ChainId> = reader.providers.keys().copied().collect();
265 let all_chains = target_chains.clone();
266 let readings = reader.read_parallel(target_chains, address, slot).await?;
267
268 let mut counts: HashMap<[u8; 32], usize> = HashMap::new();
270 for (_, raw) in &readings {
271 *counts.entry(*raw).or_insert(0) += 1;
272 }
273
274 let max_count = counts.values().copied().max().unwrap_or(0);
275 let status = if max_count >= n {
276 SyncStatus::Synced
277 } else {
278 let diverged: Vec<(ChainId, StateValue)> = readings
280 .iter()
281 .map(|(chain, raw)| {
282 (
283 *chain,
284 StateValue {
285 data: Bytes::copy_from_slice(raw.as_ref()),
286 version: value.version,
287 updated_at: value.updated_at,
288 source_chain: *chain,
289 },
290 )
291 })
292 .collect();
293 SyncStatus::Diverged { chains: diverged }
294 };
295
296 Ok(SyncedState {
297 key,
298 value,
299 chains: all_chains,
300 status,
301 })
302 }
303
304 ReadStrategy::Custom(f) => {
305 let resolved = f(vec![(value.source_chain, value)]);
306 let resolved_chain = resolved.source_chain;
307 Ok(SyncedState {
308 key,
309 chains: vec![resolved_chain],
310 value: resolved,
311 status: SyncStatus::Synced,
312 })
313 }
314 }
315 }
316
317 pub async fn resolve(
320 &self,
321 key: StateKey,
322 resolver: &dyn ConflictResolver,
323 ) -> Result<StateValue> {
324 let value = self
325 .store
326 .get(&key)
327 .await?
328 .ok_or_else(|| XenithError::StoreError("key not found".into()))?;
329
330 let candidates = vec![(value.source_chain, value)];
331 let resolved = resolver.resolve(&key, candidates).await?;
332 self.store.set(&key, resolved.clone()).await?;
333 Ok(resolved)
334 }
335
336 pub async fn subscribe<F, Fut>(
361 &self,
362 key: StateKey,
363 source: ChainId,
364 poll_interval_ms: u64,
365 handler: F,
366 ) -> Result<SubscriptionHandle>
367 where
368 F: Fn(StateValue) -> Fut + Send + 'static,
369 Fut: Future<Output = ()> + Send,
370 {
371 let store = Arc::clone(&self.store);
372 let transport = Arc::clone(&self.transport);
373 let key_clone = key.clone();
374 let interval = tokio::time::Duration::from_millis(poll_interval_ms);
375
376 let join_handle = tokio::spawn(async move {
377 let mut last_seen: Option<StateVersion> = None;
378 loop {
379 if let Ok(messages) = transport.poll_incoming().await {
381 for (incoming_key, incoming_value, incoming_metadata) in messages {
382 if incoming_key == key_clone && incoming_value.source_chain == source {
383 let _ = store.set(&key_clone, incoming_value.clone()).await;
384 if let Some(ref m) = incoming_metadata {
385 let _ = store.set_metadata(&key_clone, m.clone()).await;
386 }
387 last_seen = Some(incoming_value.version);
388 handler(incoming_value).await;
389 }
390 }
391 }
392
393 if let Ok(Some(value)) = store.get(&key_clone).await {
395 let is_newer = last_seen.map(|seen| value.version > seen).unwrap_or(true);
396 if is_newer {
397 last_seen = Some(value.version);
398 handler(value).await;
399 }
400 }
401
402 tokio::time::sleep(interval).await;
403 }
404 });
405
406 Ok(SubscriptionHandle::new(
407 key,
408 source,
409 join_handle.abort_handle(),
410 ))
411 }
412
413 pub async fn unsubscribe(&self, handle: SubscriptionHandle) {
415 handle.cancel();
416 }
417}
418
419async fn send_with_retry(
423 transport: Arc<dyn MessagingTransport>,
424 chain: ChainId,
425 payload: Bytes,
426 options: SendOptions,
427 attempts: u32,
428 delay_ms: u64,
429) -> Result<MessageId> {
430 let backoff = ExponentialBuilder::default()
431 .with_max_times(attempts as usize)
432 .with_min_delay(Duration::from_millis(delay_ms));
433
434 (|| {
435 let t = Arc::clone(&transport);
436 let p = payload.clone();
437 let o = options.clone();
438 async move { t.send_message(chain, p, o).await }
439 })
440 .retry(&backoff)
441 .when(|e| matches!(e, XenithError::Transport { .. }))
442 .notify(|e, dur| {
443 eprintln!(
444 "xenith: transient error on chain {}: {e}; retrying in {dur:?}",
445 chain.0
446 );
447 })
448 .await
449}
450
451fn unix_now_ms() -> Result<u64> {
452 SystemTime::now()
453 .duration_since(UNIX_EPOCH)
454 .map(|d| d.as_millis() as u64)
455 .map_err(|_| XenithError::StoreError("system clock is before the Unix epoch".into()))
456}
457
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
    use tokio::sync::mpsc;
    use xenith_core::{InMemoryStore, LatestVersionResolver, ReadStrategy};
    use xenith_layerzero::LayerZeroTransport;

    /// Batch shape returned by `MessagingTransport::poll_incoming`.
    type IncomingBatch = Vec<(StateKey, StateValue, Option<xenith_core::KeyMetadata>)>;

    /// Builds an engine over a `LayerZeroTransport` configured with the given
    /// `(chain id, endpoint id)` pairs and an empty in-memory store.
    fn make_engine(chains: &[(u64, u32)]) -> SyncEngine {
        let transport = Arc::new(LayerZeroTransport::new(
            [0u8; 20],
            chains
                .iter()
                .map(|&(c, eid)| (ChainId::from(c), eid))
                .collect(),
        ));
        let store = Arc::new(InMemoryStore::default());
        SyncEngine::new(transport, store, SyncConfig::default())
    }

    #[tokio::test]
    async fn push_then_read_source_of_truth() {
        let engine = make_engine(&[(42161, 30110)]);
        let key = StateKey::new("uniswap", "pool", "0xabc");

        let receipt = engine
            .push(
                key.clone(),
                Bytes::from_static(b"price=100"),
                vec![ChainId(42161)],
                ChainId(1),
                None,
            )
            .await
            .unwrap();

        assert_eq!(receipt.successes.len(), 1);
        assert!(receipt.failures.is_empty());
        assert!(matches!(receipt.status, SyncStatus::Pending { .. }));

        let state = engine
            .read(key, ReadStrategy::SourceOfTruth(ChainId(1)))
            .await
            .unwrap();
        assert!(matches!(state.status, SyncStatus::Synced));
        assert_eq!(state.value.data, Bytes::from_static(b"price=100"));
        assert_eq!(state.value.source_chain, ChainId(1));
    }

    #[tokio::test]
    async fn push_then_read_wrong_source_of_truth_is_diverged() {
        let engine = make_engine(&[(42161, 30110)]);
        let key = StateKey::new("uniswap", "pool", "0xdef");

        engine
            .push(
                key.clone(),
                Bytes::from_static(b"v"),
                vec![ChainId(42161)],
                ChainId(1),
                None,
            )
            .await
            .unwrap();

        let state = engine
            .read(key, ReadStrategy::SourceOfTruth(ChainId(42161)))
            .await
            .unwrap();
        assert!(matches!(state.status, SyncStatus::Diverged { .. }));
    }

    #[tokio::test]
    async fn push_then_resolve_latest_version() {
        let engine = make_engine(&[(42161, 30110)]);
        let key = StateKey::new("aave", "reserve", "0x1");

        engine
            .push(
                key.clone(),
                Bytes::from_static(b"ltv=0.8"),
                vec![ChainId(42161)],
                ChainId(1),
                None,
            )
            .await
            .unwrap();

        let resolved = engine
            .resolve(key.clone(), &LatestVersionResolver)
            .await
            .unwrap();
        assert_eq!(resolved.data, Bytes::from_static(b"ltv=0.8"));

        let state = engine.read(key, ReadStrategy::Latest).await.unwrap();
        assert_eq!(state.value.data, Bytes::from_static(b"ltv=0.8"));
    }

    #[tokio::test]
    async fn push_no_targets_yields_synced_receipt() {
        let engine = make_engine(&[]);
        let key = StateKey::new("proto", "x", "1");
        let receipt = engine
            .push(key, Bytes::from_static(b"d"), vec![], ChainId(1), None)
            .await
            .unwrap();
        assert!(receipt.successes.is_empty());
        assert!(receipt.failures.is_empty());
        assert!(matches!(receipt.status, SyncStatus::Synced));
    }

    #[tokio::test]
    async fn read_missing_key_returns_error() {
        let engine = make_engine(&[]);
        let err = engine
            .read(StateKey::new("x", "y", "z"), ReadStrategy::Latest)
            .await
            .unwrap_err();
        assert!(matches!(err, XenithError::StoreError(_)));
    }

    #[tokio::test]
    async fn push_to_unsupported_chain_is_partial_failure() {
        let engine = make_engine(&[]);
        let receipt = engine
            .push(
                StateKey::new("p", "q", "r"),
                Bytes::from_static(b"x"),
                vec![ChainId(42161)],
                ChainId(1),
                None,
            )
            .await
            .unwrap();
        assert_eq!(receipt.failures.len(), 1);
        assert!(matches!(
            receipt.failures[0].1,
            XenithError::UnsupportedChain(_)
        ));
        assert!(matches!(receipt.status, SyncStatus::PartialFailure { .. }));
    }

    #[tokio::test]
    async fn test_subscribe_fires_on_new_value() {
        let store = Arc::new(InMemoryStore::default());
        let transport = Arc::new(LayerZeroTransport::new([0u8; 20], vec![]));
        let engine = SyncEngine::new(
            transport,
            Arc::clone(&store) as Arc<dyn StateStore>,
            SyncConfig::default(),
        );

        let key = StateKey::new("test", "pos", "u1");
        let (tx, mut rx) = mpsc::channel::<StateValue>(1);

        let handle = engine
            .subscribe(key.clone(), ChainId(1), 10, move |value| {
                let tx = tx.clone();
                async move {
                    let _ = tx.send(value).await;
                }
            })
            .await
            .unwrap();

        // Write directly to the store; the subscription's store check should
        // pick it up on a later tick.
        let new_value = StateValue {
            data: Bytes::from_static(b"new_data"),
            version: StateVersion {
                timestamp_ms: 1_000_000,
                sequence: 0,
                source_chain: 1,
            },
            updated_at: 1000,
            source_chain: ChainId(1),
        };
        store.set(&key, new_value.clone()).await.unwrap();

        let received = tokio::time::timeout(tokio::time::Duration::from_millis(200), rx.recv())
            .await
            .expect("timed out waiting for subscription event")
            .expect("channel closed");

        assert_eq!(received, new_value);
        handle.cancel();
    }

    #[tokio::test]
    async fn test_subscribe_fires_on_incoming_message() {
        /// Transport whose first `poll_incoming` yields `message` and every
        /// later poll yields nothing.
        struct MockTransport {
            returned: AtomicBool,
            message: (StateKey, StateValue, Option<xenith_core::KeyMetadata>),
        }

        #[async_trait::async_trait]
        impl xenith_core::MessagingTransport for MockTransport {
            async fn send_message(
                &self,
                destination: ChainId,
                _: Bytes,
                _: SendOptions,
            ) -> xenith_core::Result<MessageId> {
                Err(XenithError::UnsupportedChain(destination))
            }

            async fn estimate_fee(&self, _: ChainId, _: Bytes) -> xenith_core::Result<u128> {
                Ok(0)
            }

            async fn message_status(
                &self,
                _: MessageId,
            ) -> xenith_core::Result<xenith_core::MessageStatus> {
                Ok(xenith_core::MessageStatus::Delivered)
            }

            fn sender_address(&self) -> Option<[u8; 20]> {
                None
            }

            async fn poll_incoming(&self) -> xenith_core::Result<IncomingBatch> {
                if self.returned.swap(true, Ordering::SeqCst) {
                    Ok(vec![])
                } else {
                    Ok(vec![self.message.clone()])
                }
            }
        }

        let key = StateKey::new("proto", "entity", "id1");
        let incoming_value = StateValue {
            data: Bytes::from_static(b"incoming_data"),
            version: StateVersion {
                timestamp_ms: 9_000_000,
                sequence: 0,
                source_chain: 1,
            },
            updated_at: 9000,
            source_chain: ChainId(1),
        };

        let (tx, mut rx) = mpsc::channel::<StateValue>(1);
        let store = Arc::new(InMemoryStore::default());

        let mock_transport = Arc::new(MockTransport {
            returned: AtomicBool::new(false),
            message: (key.clone(), incoming_value.clone(), None),
        });

        let engine = SyncEngine::new(
            mock_transport as Arc<dyn xenith_core::MessagingTransport>,
            Arc::clone(&store) as Arc<dyn StateStore>,
            SyncConfig::default(),
        );

        let handle = engine
            .subscribe(key.clone(), ChainId(1), 10, move |value| {
                let tx = tx.clone();
                async move {
                    let _ = tx.send(value).await;
                }
            })
            .await
            .unwrap();

        let received = tokio::time::timeout(tokio::time::Duration::from_millis(200), rx.recv())
            .await
            .expect("timed out waiting for subscription event from transport")
            .expect("channel closed");

        assert_eq!(received.data, Bytes::from_static(b"incoming_data"));
        assert_eq!(received.source_chain, ChainId(1));
        handle.cancel();
    }

    /// Transport whose first `fail_times` sends fail with a `Transport` error
    /// and whose later sends succeed. `fail_count` counts every send.
    struct FailNTimesTransport {
        fail_count: Arc<AtomicU32>,
        fail_times: u32,
    }

    impl FailNTimesTransport {
        /// Returns the shared call counter together with the transport.
        fn new(fail_times: u32) -> (Arc<AtomicU32>, Arc<Self>) {
            let counter = Arc::new(AtomicU32::new(0));
            let t = Arc::new(Self {
                fail_count: Arc::clone(&counter),
                fail_times,
            });
            (counter, t)
        }
    }

    #[async_trait::async_trait]
    impl xenith_core::MessagingTransport for FailNTimesTransport {
        async fn send_message(
            &self,
            _destination: ChainId,
            _payload: Bytes,
            _options: SendOptions,
        ) -> xenith_core::Result<MessageId> {
            let prev = self.fail_count.fetch_add(1, Ordering::SeqCst);
            if prev < self.fail_times {
                Err(XenithError::Transport {
                    chain: ChainId(1),
                    message: "transient".into(),
                })
            } else {
                Ok(MessageId(prev as u64 + 1))
            }
        }

        async fn estimate_fee(
            &self,
            _destination: ChainId,
            _payload: Bytes,
        ) -> xenith_core::Result<u128> {
            Ok(0)
        }

        async fn message_status(
            &self,
            _message_id: MessageId,
        ) -> xenith_core::Result<xenith_core::MessageStatus> {
            Ok(xenith_core::MessageStatus::Delivered)
        }

        fn sender_address(&self) -> Option<[u8; 20]> {
            None
        }

        async fn poll_incoming(&self) -> xenith_core::Result<IncomingBatch> {
            Ok(vec![])
        }
    }

    #[tokio::test]
    async fn push_retries_on_transport_error() {
        let (call_count, transport) = FailNTimesTransport::new(2);
        let engine = SyncEngine::new(
            transport as Arc<dyn xenith_core::MessagingTransport>,
            Arc::new(InMemoryStore::default()),
            SyncConfig {
                retry_attempts: 3,
                retry_delay_ms: 1,
                ..SyncConfig::default()
            },
        );

        let receipt = engine
            .push(
                StateKey::new("test", "retry", "1"),
                Bytes::from_static(b"v"),
                vec![ChainId(1)],
                ChainId(1),
                None,
            )
            .await
            .unwrap();

        assert_eq!(
            call_count.load(Ordering::SeqCst),
            3,
            "expected 3 calls (2 failures + 1 success)"
        );
        assert_eq!(receipt.successes.len(), 1);
        assert!(receipt.failures.is_empty());
    }

    #[tokio::test]
    async fn push_does_not_retry_unsupported_chain() {
        /// Transport that rejects every send as unsupported and counts how
        /// often it was asked.
        struct UnsupportedTransport {
            calls: AtomicU32,
        }

        #[async_trait::async_trait]
        impl xenith_core::MessagingTransport for UnsupportedTransport {
            async fn send_message(
                &self,
                destination: ChainId,
                _payload: Bytes,
                _options: SendOptions,
            ) -> xenith_core::Result<MessageId> {
                self.calls.fetch_add(1, Ordering::SeqCst);
                Err(XenithError::UnsupportedChain(destination))
            }

            async fn estimate_fee(
                &self,
                _dst: ChainId,
                _payload: Bytes,
            ) -> xenith_core::Result<u128> {
                Ok(0)
            }

            async fn message_status(
                &self,
                _id: MessageId,
            ) -> xenith_core::Result<xenith_core::MessageStatus> {
                Ok(xenith_core::MessageStatus::Delivered)
            }

            fn sender_address(&self) -> Option<[u8; 20]> {
                None
            }

            async fn poll_incoming(&self) -> xenith_core::Result<IncomingBatch> {
                Ok(vec![])
            }
        }

        let transport = Arc::new(UnsupportedTransport {
            calls: AtomicU32::new(0),
        });
        let engine = SyncEngine::new(
            Arc::clone(&transport) as Arc<dyn xenith_core::MessagingTransport>,
            Arc::new(InMemoryStore::default()),
            SyncConfig {
                retry_attempts: 3,
                retry_delay_ms: 1,
                ..SyncConfig::default()
            },
        );

        let receipt = engine
            .push(
                StateKey::new("test", "no-retry", "1"),
                Bytes::from_static(b"v"),
                vec![ChainId(99)],
                ChainId(1),
                None,
            )
            .await
            .unwrap();

        assert_eq!(receipt.failures.len(), 1);
        assert!(matches!(
            receipt.failures[0].1,
            XenithError::UnsupportedChain(_)
        ));
        assert!(!receipt.store_written);
        // The point of this test: a non-transport error must not be retried.
        assert_eq!(
            transport.calls.load(Ordering::SeqCst),
            1,
            "expected exactly 1 call (non-transport errors are not retried)"
        );
    }
}