1use std::{collections::HashMap, time::Duration};
2
3use async_trait::async_trait;
4use serde::{Deserialize, Serialize};
5use thiserror::Error;
6use tokio::{
7 select,
8 sync::{
9 mpsc::{channel, error::SendError, Receiver, Sender},
10 oneshot,
11 },
12 task::JoinHandle,
13 time::{sleep, timeout},
14};
15use tracing::{debug, error, info, instrument, trace, warn};
16use tycho_common::{
17 dto::{
18 BlockChanges, EntryPointWithTracingParams, ExtractorIdentity, ProtocolComponent,
19 ResponseAccount, ResponseProtocolState, TracingResult,
20 },
21 Bytes,
22};
23
24use crate::{
25 deltas::{DeltasClient, SubscriptionOptions},
26 feed::{
27 component_tracker::{ComponentFilter, ComponentTracker},
28 BlockHeader, HeaderLike,
29 },
30 rpc::{RPCClient, RPCError, SnapshotParameters, RPC_CLIENT_CONCURRENCY},
31 DeltasError,
32};
33
/// Errors a state synchronizer can report to its consumer.
#[derive(Error, Debug)]
pub enum SynchronizerError {
    /// A request issued through the RPC client failed.
    #[error("RPC error: {0}")]
    RPCError(#[from] RPCError),

    /// A message could not be handed over an internal channel; the payload is the
    /// already formatted error text.
    #[error("{0}")]
    ChannelError(String),

    /// An awaited event did not happen within the configured time limit.
    #[error("Timeout error: {0}")]
    Timeout(String),

    /// Shutting the synchronizer down failed.
    // NOTE(review): not constructed in this part of the file; meaning taken from the message.
    #[error("Failed to close synchronizer: {0}")]
    CloseError(String),

    /// The deltas stream failed, e.g. the channel closed or the client reported an error
    /// other than "not connected".
    #[error("Connection error: {0}")]
    ConnectionError(String),

    /// The deltas client reported that it is not connected.
    #[error("Connection closed")]
    ConnectionClosed,

    /// Any other internal failure.
    // NOTE(review): not constructed in this part of the file; meaning taken from the message.
    #[error("Internal error: {0}")]
    Internal(String),
}
64
/// Shorthand for results whose error type is [`SynchronizerError`].
pub type SyncResult<T> = Result<T, SynchronizerError>;
66
67impl<T> From<SendError<T>> for SynchronizerError {
68 fn from(err: SendError<T>) -> Self {
69 SynchronizerError::ChannelError(format!("Failed to send message: {err}"))
70 }
71}
72
73impl From<DeltasError> for SynchronizerError {
74 fn from(err: DeltasError) -> Self {
75 match err {
76 DeltasError::NotConnected => SynchronizerError::ConnectionClosed,
77 _ => SynchronizerError::ConnectionError(err.to_string()),
78 }
79 }
80}
81
/// Keeps the state of one extractor's protocol components in sync by combining RPC
/// snapshots with a stream of delta messages.
pub struct ProtocolStateSynchronizer<R: RPCClient, D: DeltasClient> {
    /// Extractor whose deltas are subscribed to and whose snapshots are requested.
    extractor_id: ExtractorIdentity,
    /// Forwarded to snapshot requests as the "include balances" option.
    retrieve_balances: bool,
    /// Client used for snapshot and traced entry point requests.
    rpc_client: R,
    /// Client used to subscribe to and unsubscribe from delta messages.
    deltas_client: D,
    /// Upper bound on the number of synchronization attempts made by `start`.
    max_retries: u64,
    /// Pause between two synchronization attempts.
    retry_cooldown: Duration,
    /// When false, no snapshots are requested and snapshot messages stay empty.
    include_snapshots: bool,
    /// Tracks which components and contracts are currently of interest.
    component_tracker: ComponentTracker<R>,
    /// Header of the last message that was forwarded to the consumer, if any.
    last_synced_block: Option<BlockHeader>,
    /// Seconds to wait for the first delta message of a subscription.
    timeout: u64,
    /// Forwarded to snapshot requests as the "include TVL" option.
    include_tvl: bool,
    /// Forwarded to the deltas subscription options.
    compression: bool,
    /// Whether the deltas subscription delivers partial blocks.
    partial_blocks: bool,
    /// Whether traced entry points are fetched alongside snapshots.
    uses_dci: bool,
    /// Ids of components whose snapshot was postponed while in partial block mode.
    deferred_snapshot_components: Vec<String>,
}
101
/// A protocol component bundled with its state and related metadata, as stored in a
/// [`Snapshot`].
#[derive(Clone, PartialEq, Debug, Serialize, Deserialize)]
pub struct ComponentWithState {
    /// State of the component.
    pub state: ResponseProtocolState,
    /// Static description of the component.
    pub component: ProtocolComponent,
    /// TVL of the component, when available.
    pub component_tvl: Option<f64>,
    /// Entry points of the component paired with their tracing results.
    pub entrypoints: Vec<(EntryPointWithTracingParams, TracingResult)>,
}
109
/// Snapshot data returned by the RPC client: component states plus contract storage.
#[derive(Clone, PartialEq, Debug, Default, Serialize, Deserialize)]
pub struct Snapshot {
    /// Component states keyed by component id.
    pub states: HashMap<String, ComponentWithState>,
    /// Contract accounts keyed by address.
    pub vm_storage: HashMap<Bytes, ResponseAccount>,
}
115
116impl Snapshot {
117 fn extend(&mut self, other: Snapshot) {
118 self.states.extend(other.states);
119 self.vm_storage.extend(other.vm_storage);
120 }
121
122 pub fn get_states(&self) -> &HashMap<String, ComponentWithState> {
123 &self.states
124 }
125
126 pub fn get_vm_storage(&self) -> &HashMap<Bytes, ResponseAccount> {
127 &self.vm_storage
128 }
129}
130
/// One synchronization step forwarded to the consumer.
#[derive(Clone, PartialEq, Debug, Default, Serialize, Deserialize)]
pub struct StateSyncMessage<H>
where
    H: HeaderLike,
{
    /// Header this message refers to.
    pub header: H,
    /// Snapshots of components included with this message; may be empty.
    pub snapshots: Snapshot,
    /// Delta changes carried by this message, if any.
    pub deltas: Option<BlockChanges>,
    /// Components that stopped being tracked, keyed by component id.
    pub removed_components: HashMap<String, ProtocolComponent>,
}
147
148impl<H> StateSyncMessage<H>
149where
150 H: HeaderLike,
151{
152 pub fn merge(mut self, other: Self) -> Self {
153 self.removed_components
155 .retain(|k, _| !other.snapshots.states.contains_key(k));
156 self.snapshots
157 .states
158 .retain(|k, _| !other.removed_components.contains_key(k));
159
160 self.snapshots.extend(other.snapshots);
161 let deltas = match (self.deltas, other.deltas) {
162 (Some(l), Some(r)) => Some(l.merge(r)),
163 (None, Some(r)) => Some(r),
164 (Some(l), None) => Some(l),
165 (None, None) => None,
166 };
167 self.removed_components
168 .extend(other.removed_components);
169 Self {
170 header: other.header,
171 snapshots: self.snapshots,
172 deltas,
173 removed_components: self.removed_components,
174 }
175 }
176}
177
/// Handle to a running synchronizer task.
pub struct SynchronizerTaskHandle {
    /// Join handle of the spawned synchronization task.
    join_handle: JoinHandle<()>,
    /// Sender half of the close signal the task listens on.
    close_tx: oneshot::Sender<()>,
}
186
187impl SynchronizerTaskHandle {
196 pub fn new(join_handle: JoinHandle<()>, close_tx: oneshot::Sender<()>) -> Self {
197 Self { join_handle, close_tx }
198 }
199
200 pub fn split(self) -> (JoinHandle<()>, oneshot::Sender<()>) {
206 (self.join_handle, self.close_tx)
207 }
208}
209
/// Common interface of state synchronizers.
#[async_trait]
pub trait StateSynchronizer: Send + Sync + 'static {
    /// Prepares the synchronizer before it is started.
    async fn initialize(&mut self) -> SyncResult<()>;
    /// Consumes the synchronizer and starts it.
    ///
    /// Returns a handle to the running task together with the receiving end of the channel
    /// on which sync messages (or errors) are delivered.
    async fn start(
        mut self,
    ) -> (SynchronizerTaskHandle, Receiver<SyncResult<StateSyncMessage<BlockHeader>>>);
}
219
220impl<R, D> ProtocolStateSynchronizer<R, D>
221where
222 R: RPCClient + Clone + Send + Sync + 'static,
225 D: DeltasClient + Clone + Send + Sync + 'static,
226{
227 #[allow(clippy::too_many_arguments)]
229 pub fn new(
230 extractor_id: ExtractorIdentity,
231 retrieve_balances: bool,
232 component_filter: ComponentFilter,
233 max_retries: u64,
234 retry_cooldown: Duration,
235 include_snapshots: bool,
236 include_tvl: bool,
237 compression: bool,
238 rpc_client: R,
239 deltas_client: D,
240 timeout: u64,
241 ) -> Self {
242 Self {
243 extractor_id: extractor_id.clone(),
244 retrieve_balances,
245 rpc_client: rpc_client.clone(),
246 include_snapshots,
247 deltas_client,
248 component_tracker: ComponentTracker::new(
249 extractor_id.chain,
250 extractor_id.name.as_str(),
251 component_filter,
252 rpc_client,
253 ),
254 max_retries,
255 retry_cooldown,
256 last_synced_block: None,
257 timeout,
258 include_tvl,
259 compression,
260 partial_blocks: false,
261 uses_dci: false,
262 deferred_snapshot_components: Vec::new(),
263 }
264 }
265
266 pub fn with_dci(mut self, uses_dci: bool) -> Self {
269 self.uses_dci = uses_dci;
270 self
271 }
272
273 pub fn with_partial_blocks(mut self, partial_blocks: bool) -> Self {
275 self.partial_blocks = partial_blocks;
276 self
277 }
278
279 async fn take_flushed_deferred_snapshots(
283 &mut self,
284 header: &BlockHeader,
285 ) -> SyncResult<Snapshot> {
286 if !self.partial_blocks {
287 return Ok(Snapshot::default());
288 }
289 let prev = match self
290 .last_synced_block
291 .as_ref()
292 .filter(|p| header.number > p.number)
293 {
294 Some(p) => p,
295 None => return Ok(Snapshot::default()),
296 };
297 let deferred = std::mem::take(&mut self.deferred_snapshot_components);
298 if deferred.is_empty() {
299 return Ok(Snapshot::default());
300 }
301 let snapshot_header =
302 BlockHeader { number: prev.number, hash: prev.hash.clone(), ..Default::default() };
303 let msg = self
304 .get_snapshots(snapshot_header, Some(&deferred))
305 .await?;
306 Ok(msg.snapshots)
307 }
308
309 async fn get_snapshots<'a, I: IntoIterator<Item = &'a String>>(
311 &mut self,
312 header: BlockHeader,
313 ids: Option<I>,
314 ) -> SyncResult<StateSyncMessage<BlockHeader>> {
315 if !self.include_snapshots {
316 return Ok(StateSyncMessage { header, ..Default::default() });
317 }
318
319 let component_ids: Vec<_> = match ids {
321 Some(ids) => ids.into_iter().cloned().collect(),
322 None => self
323 .component_tracker
324 .get_tracked_component_ids(),
325 };
326
327 if component_ids.is_empty() {
328 return Ok(StateSyncMessage { header, ..Default::default() });
329 }
330
331 let entrypoints_result = if self.uses_dci {
332 let result = self
333 .rpc_client
334 .get_traced_entry_points_paginated(
335 self.extractor_id.chain,
336 &self.extractor_id.name,
337 &component_ids,
338 None,
339 RPC_CLIENT_CONCURRENCY,
340 )
341 .await?;
342 self.component_tracker
343 .process_entrypoints(&result.clone().into());
344 result.traced_entry_points.clone()
345 } else {
346 HashMap::new()
347 };
348
349 let contract_ids: Vec<Bytes> = self
351 .component_tracker
352 .get_contracts_by_component(&component_ids)
353 .into_iter()
354 .collect();
355
356 let filtered_components: HashMap<_, _> = self
358 .component_tracker
359 .components
360 .iter()
361 .filter(|(id, _)| component_ids.contains(id))
362 .map(|(k, v)| (k.clone(), v.clone()))
363 .collect();
364
365 let request = SnapshotParameters::new(
366 self.extractor_id.chain,
367 &self.extractor_id.name,
368 &filtered_components,
369 &contract_ids,
370 header.number,
371 )
372 .entrypoints(&entrypoints_result)
373 .include_balances(self.retrieve_balances)
374 .include_tvl(self.include_tvl);
375 let snapshot_response = self
376 .rpc_client
377 .get_snapshots(&request, None, RPC_CLIENT_CONCURRENCY)
378 .await?;
379
380 trace!(states=?&snapshot_response.states, "Retrieved ProtocolStates");
381 trace!(contract_states=?&snapshot_response.vm_storage, "Retrieved ContractState");
382
383 Ok(StateSyncMessage {
384 header,
385 snapshots: snapshot_response,
386 deltas: None,
387 removed_components: HashMap::new(),
388 })
389 }
390
    /// Runs one synchronization session: subscribe to deltas, emit an initial message and
    /// then forward every further delta message until closed or until an error occurs.
    ///
    /// * `block_tx` - channel on which the produced sync messages are sent.
    /// * `end_rx` - close signal; when it fires the session ends with `Ok(())`.
    ///
    /// On failure the error is returned together with `end_rx`, so the caller can reuse the
    /// close signal for another attempt. The subscription is always unsubscribed before
    /// returning, except when subscribing itself failed.
    #[instrument(skip(self, block_tx, end_rx), fields(extractor_id = %self.extractor_id))]
    async fn state_sync(
        &mut self,
        block_tx: &mut Sender<SyncResult<StateSyncMessage<BlockHeader>>>,
        mut end_rx: oneshot::Receiver<()>,
    ) -> Result<(), (SynchronizerError, Option<oneshot::Receiver<()>>)> {
        let subscription_options = SubscriptionOptions::new()
            .with_state(self.include_snapshots)
            .with_compression(self.compression)
            .with_partial_blocks(self.partial_blocks);
        let (subscription_id, mut msg_rx) = match self
            .deltas_client
            .subscribe(self.extractor_id.clone(), subscription_options)
            .await
        {
            Ok(result) => result,
            Err(e) => return Err((e.into(), Some(end_rx))),
        };

        // The session body is wrapped in an async block so that `?` and `return` end the
        // block only; cleanup (unsubscribe) below runs on every exit path.
        let result = async {
            info!("Waiting for deltas...");
            // Flags that keep the "waiting"/"skipping" logs to one line per session.
            let mut warned_waiting_for_new_block = false;
            let mut warned_skipping_synced = false;
            // Block number of the most recent message seen while waiting (partial mode only).
            let mut last_block_number: Option<u64> = None;
            // Wait for the first message the session can start from.
            let mut first_msg = loop {
                let msg = select! {
                    deltas_result = timeout(Duration::from_secs(self.timeout), msg_rx.recv()) => {
                        deltas_result
                            .map_err(|_| {
                                SynchronizerError::Timeout(format!(
                                    "First deltas took longer than {t}s to arrive",
                                    t = self.timeout
                                ))
                            })?
                            .ok_or_else(|| {
                                SynchronizerError::ConnectionError(
                                    "Deltas channel closed before first message".to_string(),
                                )
                            })?
                    },
                    _ = &mut end_rx => {
                        info!("Received close signal while waiting for first deltas");
                        return Ok(());
                    }
                };

                let incoming: BlockHeader = (&msg).into();

                // In partial block mode a message without a partial index is always
                // accepted; a partial message is accepted only when its block number is
                // higher than that of the previously seen message.
                let is_new_block_candidate = if self.partial_blocks {
                    match msg.partial_block_index {
                        None => {
                            last_block_number = Some(incoming.number);
                            true
                        }
                        Some(current_partial_idx) => {
                            let is_new_block = last_block_number
                                .map(|prev_block| incoming.number > prev_block)
                                .unwrap_or(false);

                            if !warned_waiting_for_new_block {
                                info!(
                                    extractor=%self.extractor_id,
                                    block=incoming.number,
                                    partial_idx=current_partial_idx,
                                    "Syncing. Waiting for new block to start"
                                );
                                warned_waiting_for_new_block = true;
                            }
                            last_block_number = Some(incoming.number);
                            is_new_block
                        }
                    }
                } else {
                    true
                };

                if !is_new_block_candidate {
                    continue;
                }

                // Skip blocks at or below the last synced height unless the message directly
                // follows the last synced block.
                if let Some(current) = &self.last_synced_block {
                    if current.number >= incoming.number && !self.is_next_expected(&incoming) {
                        if !warned_skipping_synced {
                            info!(extractor=%self.extractor_id, from=incoming.number, to=current.number, "Syncing. Skipping already synced block");
                            warned_skipping_synced = true;
                        }
                        continue;
                    }
                }
                break msg;
            };

            self.filter_deltas(&mut first_msg);

            info!(height = first_msg.get_block().number, "First deltas received");
            let header: BlockHeader = (&first_msg).into();
            let deltas_msg = StateSyncMessage {
                header: header.clone(),
                snapshots: Default::default(),
                deltas: Some(first_msg),
                removed_components: Default::default(),
            };

            // A snapshot is only needed when the first message does not directly follow the
            // last synced block (which includes the very first session).
            let msg = if !self.is_next_expected(&header) {
                info!("Retrieving snapshot");
                // In partial block mode the snapshot is taken at the parent block; otherwise
                // at the block of the first message itself.
                let snapshot_header = if self.partial_blocks && header.number > 0 {
                    BlockHeader {
                        number: header.number - 1,
                        hash: header.parent_hash.clone(),
                        ..Default::default()
                    }
                } else {
                    BlockHeader { revert: false, ..header.clone() }
                };
                let snapshot = self
                    .get_snapshots::<Vec<&String>>(snapshot_header, None)
                    .await?
                    .merge(deltas_msg);
                let n_components = self.component_tracker.components.len();
                let n_snapshots = snapshot.snapshots.states.len();
                info!(n_components, n_snapshots, "Initial snapshot retrieved, starting delta message feed");
                snapshot
            } else {
                deltas_msg
            };
            block_tx.send(Ok(msg)).await?;
            self.last_synced_block = Some(header.clone());
            // Steady state: forward every delta message until closed or failed.
            loop {
                select! {
                    deltas_opt = msg_rx.recv() => {
                        if let Some(mut deltas) = deltas_opt {
                            let header: BlockHeader = (&deltas).into();
                            debug!(block_number=?header.number, "Received delta message");

                            // Snapshots postponed during earlier partial blocks, if due now.
                            let flushed_snapshots =
                                self.take_flushed_deferred_snapshots(&header).await?;

                            let (snapshots, removed_components) = {
                                let (to_add, to_remove) = self.component_tracker.filter_updated_components(&deltas);

                                // Only components that are not tracked yet need a snapshot.
                                let requiring_snapshot: Vec<String> = to_add
                                    .iter()
                                    .filter(|id| {
                                        !self.component_tracker
                                            .components
                                            .contains_key(id.as_str())
                                    })
                                    .map(|id| id.to_string())
                                    .collect();
                                debug!(components=?requiring_snapshot, "SnapshotRequest");
                                let requiring_snapshot_refs: Vec<&String> = requiring_snapshot.iter().collect();
                                self.component_tracker
                                    .start_tracking(&requiring_snapshot_refs)
                                    .await?;

                                // In partial block mode, components created in this very
                                // message are deferred; the others are requested right away.
                                let request_now: Vec<String> = if self.partial_blocks {
                                    let (preexisting, brand_new) = requiring_snapshot
                                        .into_iter()
                                        .partition(|id| {
                                            !deltas.new_protocol_components.contains_key(id.as_str())
                                        });
                                    self.deferred_snapshot_components.extend(brand_new);
                                    preexisting
                                } else {
                                    requiring_snapshot
                                };

                                let mut snapshots = if request_now.is_empty() {
                                    Snapshot::default()
                                } else {
                                    // Same snapshot block selection as for the first message.
                                    let snapshot_header = if self.partial_blocks && header.number > 0
                                    {
                                        BlockHeader {
                                            number: header.number - 1,
                                            hash: header.parent_hash.clone(),
                                            ..Default::default()
                                        }
                                    } else {
                                        BlockHeader { revert: false, ..header.clone() }
                                    };
                                    self.get_snapshots(snapshot_header, Some(&request_now))
                                        .await?
                                        .snapshots
                                };

                                snapshots.extend(flushed_snapshots);

                                let removed_components = if !to_remove.is_empty() {
                                    self.component_tracker.stop_tracking(&to_remove)
                                } else {
                                    Default::default()
                                };

                                (snapshots, removed_components)
                            };

                            self.component_tracker.process_entrypoints(&deltas.dci_update);

                            // Drop changes for components and contracts that are not tracked.
                            self.filter_deltas(&mut deltas);
                            let n_changes = deltas.n_changes();

                            let next = StateSyncMessage {
                                header: header.clone(),
                                snapshots,
                                deltas: Some(deltas),
                                removed_components,
                            };
                            block_tx.send(Ok(next)).await?;
                            self.last_synced_block = Some(header.clone());

                            debug!(block_number=?header.number, n_changes, "Finished processing delta message");
                        } else {
                            return Err(SynchronizerError::ConnectionError("Deltas channel closed".to_string()));
                        }
                    },
                    _ = &mut end_rx => {
                        info!("Received close signal during state_sync");
                        return Ok(());
                    }
                }
            }
        }.await;

        warn!(last_synced_block = ?&self.last_synced_block, "Deltas processing ended.");
        // Best-effort cleanup: a failed unsubscribe is logged and otherwise ignored.
        let _ = self
            .deltas_client
            .unsubscribe(subscription_id)
            .await
            .map_err(|err| {
                warn!(err=?err, "Unsubscribing from deltas on cleanup failed!");
            });

        match result {
            Ok(()) => Ok(()),
            Err(e) => {
                // Hand the close receiver back so the caller can retry with it.
                Err((e, Some(end_rx)))
            }
        }
    }
668
669 fn is_next_expected(&self, incoming: &BlockHeader) -> bool {
670 if let Some(block) = self.last_synced_block.as_ref() {
671 return incoming.parent_hash == block.hash;
672 }
673 false
674 }
675 fn filter_deltas(&self, deltas: &mut BlockChanges) {
676 deltas.filter_by_component(|id| {
677 self.component_tracker
678 .components
679 .contains_key(id)
680 });
681 deltas.filter_by_contract(|id| {
682 self.component_tracker
683 .contracts
684 .contains(id)
685 });
686 }
687}
688
689#[async_trait]
690impl<R, D> StateSynchronizer for ProtocolStateSynchronizer<R, D>
691where
692 R: RPCClient + Clone + Send + Sync + 'static,
693 D: DeltasClient + Clone + Send + Sync + 'static,
694{
695 async fn initialize(&mut self) -> SyncResult<()> {
696 info!("Retrieving relevant protocol components");
697 self.component_tracker
698 .initialise_components()
699 .await?;
700 info!(
701 n_components = self.component_tracker.components.len(),
702 n_contracts = self.component_tracker.contracts.len(),
703 extractor = %self.extractor_id,
704 "Finished retrieving components",
705 );
706
707 Ok(())
708 }
709
710 async fn start(
711 mut self,
712 ) -> (SynchronizerTaskHandle, Receiver<SyncResult<StateSyncMessage<BlockHeader>>>) {
713 let (mut tx, rx) = channel(15);
714 let (end_tx, end_rx) = oneshot::channel::<()>();
715
716 let jh = tokio::spawn(async move {
717 let mut retry_count = 0;
718 let mut current_end_rx = end_rx;
719 let mut final_error = None;
720
721 while retry_count < self.max_retries {
722 info!(extractor_id=%&self.extractor_id, retry_count, "(Re)starting synchronization loop");
723
724 let res = self
725 .state_sync(&mut tx, current_end_rx)
726 .await;
727 match res {
728 Ok(()) => {
729 info!(
730 extractor_id=%&self.extractor_id,
731 retry_count,
732 "State synchronization exited cleanly"
733 );
734 return;
735 }
736 Err((e, maybe_end_rx)) => {
737 warn!(
738 extractor_id=%&self.extractor_id,
739 retry_count,
740 error=%e,
741 "State synchronization errored!"
742 );
743
744 if let Some(recovered_end_rx) = maybe_end_rx {
746 current_end_rx = recovered_end_rx;
747
748 if let SynchronizerError::ConnectionClosed = e {
749 error!(
751 "Websocket connection closed. State synchronization exiting."
752 );
753 let _ = tx.send(Err(e)).await;
754 return;
755 } else {
756 final_error = Some(e);
758 }
759 } else {
760 info!(extractor_id=%&self.extractor_id, "Received close signal, exiting.");
762 return;
763 }
764 }
765 }
766 sleep(self.retry_cooldown).await;
767 retry_count += 1;
768 }
769 if let Some(e) = final_error {
770 warn!(extractor_id=%&self.extractor_id, retry_count, error=%e, "Max retries exceeded");
771 let _ = tx.send(Err(e)).await;
772 }
773 });
774
775 let handle = SynchronizerTaskHandle::new(jh, end_tx);
776 (handle, rx)
777 }
778}
779
780#[cfg(test)]
781mod test {
782 use std::{collections::HashSet, sync::Arc};
801
802 use tycho_common::dto::{
803 AddressStorageLocation, Block, Chain, ComponentTvlRequestBody, ComponentTvlRequestResponse,
804 DCIUpdate, EntryPoint, PaginationResponse, ProtocolComponentRequestResponse,
805 ProtocolComponentsRequestBody, ProtocolStateRequestBody, ProtocolStateRequestResponse,
806 ProtocolSystemsRequestBody, ProtocolSystemsRequestResponse, RPCTracerParams,
807 StateRequestBody, StateRequestResponse, TokensRequestBody, TokensRequestResponse,
808 TracedEntryPointRequestBody, TracedEntryPointRequestResponse, TracingParams,
809 };
810 use uuid::Uuid;
811
812 use super::*;
813 use crate::{deltas::MockDeltasClient, rpc::MockRPCClient, DeltasError, RPCError};
814
815 struct ArcRPCClient<T>(Arc<T>);
817
818 impl<T> Clone for ArcRPCClient<T> {
820 fn clone(&self) -> Self {
821 ArcRPCClient(self.0.clone())
822 }
823 }
824
    /// Forwards every `RPCClient` call to the wrapped client.
    #[async_trait]
    impl<T> RPCClient for ArcRPCClient<T>
    where
        T: RPCClient + Sync + Send + 'static,
    {
        /// Delegates to the wrapped client's `get_tokens`.
        async fn get_tokens(
            &self,
            request: &TokensRequestBody,
        ) -> Result<TokensRequestResponse, RPCError> {
            self.0.get_tokens(request).await
        }

        /// Delegates to the wrapped client's `get_contract_state`.
        async fn get_contract_state(
            &self,
            request: &StateRequestBody,
        ) -> Result<StateRequestResponse, RPCError> {
            self.0.get_contract_state(request).await
        }

        /// Delegates to the wrapped client's `get_protocol_components`.
        async fn get_protocol_components(
            &self,
            request: &ProtocolComponentsRequestBody,
        ) -> Result<ProtocolComponentRequestResponse, RPCError> {
            self.0
                .get_protocol_components(request)
                .await
        }

        /// Delegates to the wrapped client's `get_protocol_states`.
        async fn get_protocol_states(
            &self,
            request: &ProtocolStateRequestBody,
        ) -> Result<ProtocolStateRequestResponse, RPCError> {
            self.0
                .get_protocol_states(request)
                .await
        }

        /// Delegates to the wrapped client's `get_protocol_systems`.
        async fn get_protocol_systems(
            &self,
            request: &ProtocolSystemsRequestBody,
        ) -> Result<ProtocolSystemsRequestResponse, RPCError> {
            self.0
                .get_protocol_systems(request)
                .await
        }

        /// Delegates to the wrapped client's `get_component_tvl`.
        async fn get_component_tvl(
            &self,
            request: &ComponentTvlRequestBody,
        ) -> Result<ComponentTvlRequestResponse, RPCError> {
            self.0.get_component_tvl(request).await
        }

        /// Delegates to the wrapped client's `get_traced_entry_points`.
        async fn get_traced_entry_points(
            &self,
            request: &TracedEntryPointRequestBody,
        ) -> Result<TracedEntryPointRequestResponse, RPCError> {
            self.0
                .get_traced_entry_points(request)
                .await
        }

        /// Delegates to the wrapped client's `get_snapshots`.
        async fn get_snapshots<'a>(
            &self,
            request: &SnapshotParameters<'a>,
            chunk_size: Option<usize>,
            concurrency: usize,
        ) -> Result<Snapshot, RPCError> {
            self.0
                .get_snapshots(request, chunk_size, concurrency)
                .await
        }

        /// Delegates to the wrapped client's `compression`.
        fn compression(&self) -> bool {
            self.0.compression()
        }
    }
902
903 struct ArcDeltasClient<T>(Arc<T>);
905
906 impl<T> Clone for ArcDeltasClient<T> {
908 fn clone(&self) -> Self {
909 ArcDeltasClient(self.0.clone())
910 }
911 }
912
    /// Forwards every `DeltasClient` call to the wrapped client.
    #[async_trait]
    impl<T> DeltasClient for ArcDeltasClient<T>
    where
        T: DeltasClient + Sync + Send + 'static,
    {
        /// Delegates to the wrapped client's `subscribe`.
        async fn subscribe(
            &self,
            extractor_id: ExtractorIdentity,
            options: SubscriptionOptions,
        ) -> Result<(Uuid, Receiver<BlockChanges>), DeltasError> {
            self.0
                .subscribe(extractor_id, options)
                .await
        }

        /// Delegates to the wrapped client's `unsubscribe`.
        async fn unsubscribe(&self, subscription_id: Uuid) -> Result<(), DeltasError> {
            self.0
                .unsubscribe(subscription_id)
                .await
        }

        /// Delegates to the wrapped client's `connect`.
        async fn connect(&self) -> Result<JoinHandle<Result<(), DeltasError>>, DeltasError> {
            self.0.connect().await
        }

        /// Delegates to the wrapped client's `close`.
        async fn close(&self) -> Result<(), DeltasError> {
            self.0.close().await
        }
    }
942
943 fn with_mocked_clients(
944 native: bool,
945 include_tvl: bool,
946 rpc_client: Option<MockRPCClient>,
947 deltas_client: Option<MockDeltasClient>,
948 ) -> ProtocolStateSynchronizer<ArcRPCClient<MockRPCClient>, ArcDeltasClient<MockDeltasClient>>
949 {
950 let rpc_client = ArcRPCClient(Arc::new(rpc_client.unwrap_or_default()));
951 let deltas_client = ArcDeltasClient(Arc::new(deltas_client.unwrap_or_default()));
952
953 ProtocolStateSynchronizer::new(
954 ExtractorIdentity::new(Chain::Ethereum, "uniswap-v2"),
955 native,
956 ComponentFilter::with_tvl_range(50.0, 50.0),
957 1,
958 Duration::from_secs(0),
959 true,
960 include_tvl,
961 true, rpc_client,
963 deltas_client,
964 10_u64,
965 )
966 }
967
968 fn state_snapshot_native() -> ProtocolStateRequestResponse {
969 ProtocolStateRequestResponse {
970 states: vec![ResponseProtocolState {
971 component_id: "Component1".to_string(),
972 ..Default::default()
973 }],
974 pagination: PaginationResponse { page: 0, page_size: 20, total: 1 },
975 }
976 }
977
978 fn make_mock_client() -> MockRPCClient {
979 let mut m = MockRPCClient::new();
980 m.expect_compression()
981 .return_const(false);
982 m
983 }
984
985 #[test_log::test(tokio::test)]
986 async fn test_get_snapshots_native() {
987 let header = BlockHeader::default();
988 let mut rpc = make_mock_client();
989 let component = ProtocolComponent { id: "Component1".to_string(), ..Default::default() };
990
991 let component_clone = component.clone();
992 rpc.expect_get_snapshots()
993 .returning(move |_request, _chunk_size, _concurrency| {
994 Ok(Snapshot {
995 states: state_snapshot_native()
996 .states
997 .into_iter()
998 .map(|state| {
999 (
1000 state.component_id.clone(),
1001 ComponentWithState {
1002 state,
1003 component: component_clone.clone(),
1004 entrypoints: vec![],
1005 component_tvl: None,
1006 },
1007 )
1008 })
1009 .collect(),
1010 vm_storage: HashMap::new(),
1011 })
1012 });
1013
1014 rpc.expect_get_traced_entry_points()
1015 .returning(|_| {
1016 Ok(TracedEntryPointRequestResponse {
1017 traced_entry_points: HashMap::new(),
1018 pagination: PaginationResponse::new(0, 20, 0),
1019 })
1020 });
1021
1022 let mut state_sync = with_mocked_clients(true, false, Some(rpc), None);
1023 state_sync
1024 .component_tracker
1025 .components
1026 .insert("Component1".to_string(), component.clone());
1027 let components_arg = ["Component1".to_string()];
1028 let exp = StateSyncMessage {
1029 header: header.clone(),
1030 snapshots: Snapshot {
1031 states: state_snapshot_native()
1032 .states
1033 .into_iter()
1034 .map(|state| {
1035 (
1036 state.component_id.clone(),
1037 ComponentWithState {
1038 state,
1039 component: component.clone(),
1040 entrypoints: vec![],
1041 component_tvl: None,
1042 },
1043 )
1044 })
1045 .collect(),
1046 vm_storage: HashMap::new(),
1047 },
1048 deltas: None,
1049 removed_components: Default::default(),
1050 };
1051
1052 let snap = state_sync
1053 .get_snapshots(header, Some(&components_arg))
1054 .await
1055 .expect("Retrieving snapshot failed");
1056
1057 assert_eq!(snap, exp);
1058 }
1059
1060 #[test_log::test(tokio::test)]
1061 async fn test_get_snapshots_native_with_tvl() {
1062 let header = BlockHeader::default();
1063 let mut rpc = make_mock_client();
1064 let component = ProtocolComponent { id: "Component1".to_string(), ..Default::default() };
1065
1066 let component_clone = component.clone();
1067 rpc.expect_get_snapshots()
1068 .returning(move |_request, _chunk_size, _concurrency| {
1069 Ok(Snapshot {
1070 states: state_snapshot_native()
1071 .states
1072 .into_iter()
1073 .map(|state| {
1074 (
1075 state.component_id.clone(),
1076 ComponentWithState {
1077 state,
1078 component: component_clone.clone(),
1079 component_tvl: Some(100.0),
1080 entrypoints: vec![],
1081 },
1082 )
1083 })
1084 .collect(),
1085 vm_storage: HashMap::new(),
1086 })
1087 });
1088
1089 rpc.expect_get_traced_entry_points()
1090 .returning(|_| {
1091 Ok(TracedEntryPointRequestResponse {
1092 traced_entry_points: HashMap::new(),
1093 pagination: PaginationResponse::new(0, 20, 0),
1094 })
1095 });
1096
1097 let mut state_sync = with_mocked_clients(true, true, Some(rpc), None);
1098 state_sync
1099 .component_tracker
1100 .components
1101 .insert("Component1".to_string(), component.clone());
1102 let components_arg = ["Component1".to_string()];
1103 let exp = StateSyncMessage {
1104 header: header.clone(),
1105 snapshots: Snapshot {
1106 states: state_snapshot_native()
1107 .states
1108 .into_iter()
1109 .map(|state| {
1110 (
1111 state.component_id.clone(),
1112 ComponentWithState {
1113 state,
1114 component: component.clone(),
1115 component_tvl: Some(100.0),
1116 entrypoints: vec![],
1117 },
1118 )
1119 })
1120 .collect(),
1121 vm_storage: HashMap::new(),
1122 },
1123 deltas: None,
1124 removed_components: Default::default(),
1125 };
1126
1127 let snap = state_sync
1128 .get_snapshots(header, Some(&components_arg))
1129 .await
1130 .expect("Retrieving snapshot failed");
1131
1132 assert_eq!(snap, exp);
1133 }
1134
1135 fn state_snapshot_vm() -> StateRequestResponse {
1136 StateRequestResponse {
1137 accounts: vec![
1138 ResponseAccount { address: Bytes::from("0x0badc0ffee"), ..Default::default() },
1139 ResponseAccount { address: Bytes::from("0xbabe42"), ..Default::default() },
1140 ],
1141 pagination: PaginationResponse { page: 0, page_size: 20, total: 1 },
1142 }
1143 }
1144
1145 fn traced_entry_point_response() -> TracedEntryPointRequestResponse {
1146 TracedEntryPointRequestResponse {
1147 traced_entry_points: HashMap::from([(
1148 "Component1".to_string(),
1149 vec![(
1150 EntryPointWithTracingParams {
1151 entry_point: EntryPoint {
1152 external_id: "entrypoint_a".to_string(),
1153 target: Bytes::from("0x0badc0ffee"),
1154 signature: "sig()".to_string(),
1155 },
1156 params: TracingParams::RPCTracer(RPCTracerParams {
1157 caller: Some(Bytes::from("0x0badc0ffee")),
1158 calldata: Bytes::from("0x0badc0ffee"),
1159 state_overrides: None,
1160 prune_addresses: None,
1161 }),
1162 },
1163 TracingResult {
1164 retriggers: HashSet::from([(
1165 Bytes::from("0x0badc0ffee"),
1166 AddressStorageLocation::new(Bytes::from("0x0badc0ffee"), 12),
1167 )]),
1168 accessed_slots: HashMap::from([(
1169 Bytes::from("0x0badc0ffee"),
1170 HashSet::from([Bytes::from("0xbadbeef0")]),
1171 )]),
1172 },
1173 )],
1174 )]),
1175 pagination: PaginationResponse::new(0, 20, 0),
1176 }
1177 }
1178
1179 #[test_log::test(tokio::test)]
1180 async fn test_get_snapshots_vm() {
1181 let header = BlockHeader::default();
1182 let mut rpc = make_mock_client();
1183
1184 let traced_ep_response = traced_entry_point_response();
1185 rpc.expect_get_snapshots()
1186 .returning(move |_request, _chunk_size, _concurrency| {
1187 let vm_storage_accounts = state_snapshot_vm();
1188 Ok(Snapshot {
1189 states: [(
1190 "Component1".to_string(),
1191 ComponentWithState {
1192 state: ResponseProtocolState {
1193 component_id: "Component1".to_string(),
1194 ..Default::default()
1195 },
1196 component: ProtocolComponent {
1197 id: "Component1".to_string(),
1198 contract_ids: vec![
1199 Bytes::from("0x0badc0ffee"),
1200 Bytes::from("0xbabe42"),
1201 ],
1202 ..Default::default()
1203 },
1204 component_tvl: None,
1205 entrypoints: traced_ep_response
1206 .traced_entry_points
1207 .get("Component1")
1208 .cloned()
1209 .unwrap_or_default(),
1210 },
1211 )]
1212 .into_iter()
1213 .collect(),
1214 vm_storage: vm_storage_accounts
1215 .accounts
1216 .into_iter()
1217 .map(|state| (state.address.clone(), state))
1218 .collect(),
1219 })
1220 });
1221
1222 rpc.expect_get_traced_entry_points()
1223 .returning(|_| Ok(traced_entry_point_response()));
1224
1225 let mut state_sync = with_mocked_clients(false, false, Some(rpc), None);
1226 let component = ProtocolComponent {
1227 id: "Component1".to_string(),
1228 contract_ids: vec![Bytes::from("0x0badc0ffee"), Bytes::from("0xbabe42")],
1229 ..Default::default()
1230 };
1231 state_sync
1232 .component_tracker
1233 .components
1234 .insert("Component1".to_string(), component.clone());
1235 let components_arg = ["Component1".to_string()];
1236 let exp = StateSyncMessage {
1237 header: header.clone(),
1238 snapshots: Snapshot {
1239 states: [(
1240 component.id.clone(),
1241 ComponentWithState {
1242 state: ResponseProtocolState {
1243 component_id: "Component1".to_string(),
1244 ..Default::default()
1245 },
1246 component: component.clone(),
1247 component_tvl: None,
1248 entrypoints: vec![(
1249 EntryPointWithTracingParams {
1250 entry_point: EntryPoint {
1251 external_id: "entrypoint_a".to_string(),
1252 target: Bytes::from("0x0badc0ffee"),
1253 signature: "sig()".to_string(),
1254 },
1255 params: TracingParams::RPCTracer(RPCTracerParams {
1256 caller: Some(Bytes::from("0x0badc0ffee")),
1257 calldata: Bytes::from("0x0badc0ffee"),
1258 state_overrides: None,
1259 prune_addresses: None,
1260 }),
1261 },
1262 TracingResult {
1263 retriggers: HashSet::from([(
1264 Bytes::from("0x0badc0ffee"),
1265 AddressStorageLocation::new(Bytes::from("0x0badc0ffee"), 12),
1266 )]),
1267 accessed_slots: HashMap::from([(
1268 Bytes::from("0x0badc0ffee"),
1269 HashSet::from([Bytes::from("0xbadbeef0")]),
1270 )]),
1271 },
1272 )],
1273 },
1274 )]
1275 .into_iter()
1276 .collect(),
1277 vm_storage: state_snapshot_vm()
1278 .accounts
1279 .into_iter()
1280 .map(|state| (state.address.clone(), state))
1281 .collect(),
1282 },
1283 deltas: None,
1284 removed_components: Default::default(),
1285 };
1286
1287 let snap = state_sync
1288 .get_snapshots(header, Some(&components_arg))
1289 .await
1290 .expect("Retrieving snapshot failed");
1291
1292 assert_eq!(snap, exp);
1293 }
1294
1295 #[test_log::test(tokio::test)]
1296 async fn test_get_snapshots_vm_with_tvl() {
1297 let header = BlockHeader::default();
1298 let mut rpc = make_mock_client();
1299 let component = ProtocolComponent {
1300 id: "Component1".to_string(),
1301 contract_ids: vec![Bytes::from("0x0badc0ffee"), Bytes::from("0xbabe42")],
1302 ..Default::default()
1303 };
1304
1305 let component_clone = component.clone();
1306 rpc.expect_get_snapshots()
1307 .returning(move |_request, _chunk_size, _concurrency| {
1308 let vm_storage_accounts = state_snapshot_vm();
1309 Ok(Snapshot {
1310 states: [(
1311 "Component1".to_string(),
1312 ComponentWithState {
1313 state: ResponseProtocolState {
1314 component_id: "Component1".to_string(),
1315 ..Default::default()
1316 },
1317 component: component_clone.clone(),
1318 component_tvl: Some(100.0),
1319 entrypoints: vec![],
1320 },
1321 )]
1322 .into_iter()
1323 .collect(),
1324 vm_storage: vm_storage_accounts
1325 .accounts
1326 .into_iter()
1327 .map(|state| (state.address.clone(), state))
1328 .collect(),
1329 })
1330 });
1331
1332 rpc.expect_get_traced_entry_points()
1333 .returning(|_| {
1334 Ok(TracedEntryPointRequestResponse {
1335 traced_entry_points: HashMap::new(),
1336 pagination: PaginationResponse::new(0, 20, 0),
1337 })
1338 });
1339
1340 let mut state_sync = with_mocked_clients(false, true, Some(rpc), None);
1341 state_sync
1342 .component_tracker
1343 .components
1344 .insert("Component1".to_string(), component.clone());
1345 let components_arg = ["Component1".to_string()];
1346 let exp = StateSyncMessage {
1347 header: header.clone(),
1348 snapshots: Snapshot {
1349 states: [(
1350 component.id.clone(),
1351 ComponentWithState {
1352 state: ResponseProtocolState {
1353 component_id: "Component1".to_string(),
1354 ..Default::default()
1355 },
1356 component: component.clone(),
1357 component_tvl: Some(100.0),
1358 entrypoints: vec![],
1359 },
1360 )]
1361 .into_iter()
1362 .collect(),
1363 vm_storage: state_snapshot_vm()
1364 .accounts
1365 .into_iter()
1366 .map(|state| (state.address.clone(), state))
1367 .collect(),
1368 },
1369 deltas: None,
1370 removed_components: Default::default(),
1371 };
1372
1373 let snap = state_sync
1374 .get_snapshots(header, Some(&components_arg))
1375 .await
1376 .expect("Retrieving snapshot failed");
1377
1378 assert_eq!(snap, exp);
1379 }
1380
1381 #[test_log::test(tokio::test)]
1385 async fn test_get_snapshots_filters_to_requested_components_only() {
1386 let header = BlockHeader::default();
1387 let mut rpc = make_mock_client();
1388
1389 let component1 = ProtocolComponent { id: "Component1".to_string(), ..Default::default() };
1391 let component2 = ProtocolComponent { id: "Component2".to_string(), ..Default::default() };
1392 let component3 = ProtocolComponent { id: "Component3".to_string(), ..Default::default() };
1393
1394 let component2_clone = component2.clone();
1395
1396 rpc.expect_get_snapshots()
1398 .withf(
1399 |request: &SnapshotParameters,
1400 _chunk_size: &Option<usize>,
1401 _concurrency: &usize| {
1402 request.components.len() == 1 &&
1404 request
1405 .components
1406 .contains_key("Component2")
1407 },
1408 )
1409 .times(1)
1410 .returning(move |_request, _chunk_size, _concurrency| {
1411 Ok(Snapshot {
1412 states: [(
1413 "Component2".to_string(),
1414 ComponentWithState {
1415 state: ResponseProtocolState {
1416 component_id: "Component2".to_string(),
1417 ..Default::default()
1418 },
1419 component: component2_clone.clone(),
1420 entrypoints: vec![],
1421 component_tvl: None,
1422 },
1423 )]
1424 .into_iter()
1425 .collect(),
1426 vm_storage: HashMap::new(),
1427 })
1428 });
1429
1430 rpc.expect_get_traced_entry_points()
1431 .returning(|_| {
1432 Ok(TracedEntryPointRequestResponse {
1433 traced_entry_points: HashMap::new(),
1434 pagination: PaginationResponse::new(0, 20, 0),
1435 })
1436 });
1437
1438 let mut state_sync = with_mocked_clients(true, false, Some(rpc), None);
1439
1440 state_sync
1442 .component_tracker
1443 .components
1444 .insert("Component1".to_string(), component1.clone());
1445 state_sync
1446 .component_tracker
1447 .components
1448 .insert("Component2".to_string(), component2.clone());
1449 state_sync
1450 .component_tracker
1451 .components
1452 .insert("Component3".to_string(), component3.clone());
1453
1454 let components_arg = ["Component2".to_string()];
1456
1457 let snap = state_sync
1458 .get_snapshots(header.clone(), Some(&components_arg))
1459 .await
1460 .expect("Retrieving snapshot failed");
1461
1462 assert_eq!(snap.snapshots.states.len(), 1);
1464 assert!(snap
1465 .snapshots
1466 .states
1467 .contains_key("Component2"));
1468 assert!(!snap
1469 .snapshots
1470 .states
1471 .contains_key("Component1"));
1472 assert!(!snap
1473 .snapshots
1474 .states
1475 .contains_key("Component3"));
1476 }
1477
1478 fn mock_clients_for_state_sync() -> (MockRPCClient, MockDeltasClient, Sender<BlockChanges>) {
1479 let mut rpc_client = make_mock_client();
1480 rpc_client
1483 .expect_get_protocol_components()
1484 .with(mockall::predicate::function(
1485 move |request_params: &ProtocolComponentsRequestBody| {
1486 if let Some(ids) = request_params.component_ids.as_ref() {
1487 ids.contains(&"Component3".to_string())
1488 } else {
1489 false
1490 }
1491 },
1492 ))
1493 .returning(|_| {
1494 Ok(ProtocolComponentRequestResponse {
1496 protocol_components: vec![
1497 ProtocolComponent { id: "Component3".to_string(), ..Default::default() },
1499 ],
1500 pagination: PaginationResponse { page: 0, page_size: 20, total: 1 },
1501 })
1502 });
1503 rpc_client
1505 .expect_get_snapshots()
1506 .withf(
1507 |request: &SnapshotParameters,
1508 _chunk_size: &Option<usize>,
1509 _concurrency: &usize| {
1510 request
1511 .components
1512 .contains_key("Component3")
1513 },
1514 )
1515 .returning(|_request, _chunk_size, _concurrency| {
1516 Ok(Snapshot {
1517 states: [(
1518 "Component3".to_string(),
1519 ComponentWithState {
1520 state: ResponseProtocolState {
1521 component_id: "Component3".to_string(),
1522 ..Default::default()
1523 },
1524 component: ProtocolComponent {
1525 id: "Component3".to_string(),
1526 ..Default::default()
1527 },
1528 component_tvl: Some(1000.0),
1529 entrypoints: vec![],
1530 },
1531 )]
1532 .into_iter()
1533 .collect(),
1534 vm_storage: HashMap::new(),
1535 })
1536 });
1537
1538 rpc_client
1540 .expect_get_protocol_components()
1541 .returning(|_| {
1542 Ok(ProtocolComponentRequestResponse {
1544 protocol_components: vec![
1545 ProtocolComponent { id: "Component1".to_string(), ..Default::default() },
1547 ProtocolComponent { id: "Component2".to_string(), ..Default::default() },
1549 ],
1551 pagination: PaginationResponse { page: 0, page_size: 20, total: 1 },
1552 })
1553 });
1554
1555 rpc_client
1556 .expect_get_snapshots()
1557 .returning(|_request, _chunk_size, _concurrency| {
1558 Ok(Snapshot {
1559 states: [
1560 (
1561 "Component1".to_string(),
1562 ComponentWithState {
1563 state: ResponseProtocolState {
1564 component_id: "Component1".to_string(),
1565 ..Default::default()
1566 },
1567 component: ProtocolComponent {
1568 id: "Component1".to_string(),
1569 ..Default::default()
1570 },
1571 component_tvl: Some(100.0),
1572 entrypoints: vec![],
1573 },
1574 ),
1575 (
1576 "Component2".to_string(),
1577 ComponentWithState {
1578 state: ResponseProtocolState {
1579 component_id: "Component2".to_string(),
1580 ..Default::default()
1581 },
1582 component: ProtocolComponent {
1583 id: "Component2".to_string(),
1584 ..Default::default()
1585 },
1586 component_tvl: Some(0.0),
1587 entrypoints: vec![],
1588 },
1589 ),
1590 ]
1591 .into_iter()
1592 .collect(),
1593 vm_storage: HashMap::new(),
1594 })
1595 });
1596
1597 rpc_client
1599 .expect_get_traced_entry_points()
1600 .returning(|_| {
1601 Ok(TracedEntryPointRequestResponse {
1602 traced_entry_points: HashMap::new(),
1603 pagination: PaginationResponse { page: 0, page_size: 100, total: 0 },
1604 })
1605 });
1606
1607 let mut deltas_client = MockDeltasClient::new();
1609 let (tx, rx) = channel(1);
1610 deltas_client
1611 .expect_subscribe()
1612 .return_once(move |_, _| {
1613 Ok((Uuid::default(), rx))
1615 });
1616
1617 deltas_client
1619 .expect_unsubscribe()
1620 .return_once(|_| Ok(()));
1621
1622 (rpc_client, deltas_client, tx)
1623 }
1624
1625 #[test_log::test(tokio::test)]
1632 async fn test_state_sync() {
1633 let (rpc_client, deltas_client, tx) = mock_clients_for_state_sync();
1634 let deltas = [
1635 BlockChanges {
1636 extractor: "uniswap-v2".to_string(),
1637 chain: Chain::Ethereum,
1638 block: Block {
1639 number: 1,
1640 hash: Bytes::from("0x01"),
1641 parent_hash: Bytes::from("0x00"),
1642 chain: Chain::Ethereum,
1643 ts: Default::default(),
1644 },
1645 revert: false,
1646 dci_update: DCIUpdate {
1647 new_entrypoints: HashMap::from([(
1648 "Component1".to_string(),
1649 HashSet::from([EntryPoint {
1650 external_id: "entrypoint_a".to_string(),
1651 target: Bytes::from("0x0badc0ffee"),
1652 signature: "sig()".to_string(),
1653 }]),
1654 )]),
1655 new_entrypoint_params: HashMap::from([(
1656 "entrypoint_a".to_string(),
1657 HashSet::from([(
1658 TracingParams::RPCTracer(RPCTracerParams {
1659 caller: Some(Bytes::from("0x0badc0ffee")),
1660 calldata: Bytes::from("0x0badc0ffee"),
1661 state_overrides: None,
1662 prune_addresses: None,
1663 }),
1664 "Component1".to_string(),
1665 )]),
1666 )]),
1667 trace_results: HashMap::from([(
1668 "entrypoint_a".to_string(),
1669 TracingResult {
1670 retriggers: HashSet::from([(
1671 Bytes::from("0x0badc0ffee"),
1672 AddressStorageLocation::new(Bytes::from("0x0badc0ffee"), 12),
1673 )]),
1674 accessed_slots: HashMap::from([(
1675 Bytes::from("0x0badc0ffee"),
1676 HashSet::from([Bytes::from("0xbadbeef0")]),
1677 )]),
1678 },
1679 )]),
1680 },
1681 ..Default::default()
1682 },
1683 BlockChanges {
1684 extractor: "uniswap-v2".to_string(),
1685 chain: Chain::Ethereum,
1686 block: Block {
1687 number: 2,
1688 hash: Bytes::from("0x02"),
1689 parent_hash: Bytes::from("0x01"),
1690 chain: Chain::Ethereum,
1691 ts: Default::default(),
1692 },
1693 revert: false,
1694 component_tvl: [
1695 ("Component1".to_string(), 100.0),
1696 ("Component2".to_string(), 0.0),
1697 ("Component3".to_string(), 1000.0),
1698 ]
1699 .into_iter()
1700 .collect(),
1701 ..Default::default()
1702 },
1703 ];
1704 let mut state_sync = with_mocked_clients(true, true, Some(rpc_client), Some(deltas_client));
1705 state_sync
1706 .initialize()
1707 .await
1708 .expect("Init failed");
1709
1710 let (handle, mut rx) = state_sync.start().await;
1712 let (jh, close_tx) = handle.split();
1713 tx.send(deltas[0].clone())
1714 .await
1715 .expect("deltas channel msg 0 closed!");
1716 let first_msg = timeout(Duration::from_millis(100), rx.recv())
1717 .await
1718 .expect("waiting for first state msg timed out!")
1719 .expect("state sync block sender closed!");
1720 tx.send(deltas[1].clone())
1721 .await
1722 .expect("deltas channel msg 1 closed!");
1723 let second_msg = timeout(Duration::from_millis(100), rx.recv())
1724 .await
1725 .expect("waiting for second state msg timed out!")
1726 .expect("state sync block sender closed!");
1727 let _ = close_tx.send(());
1728 jh.await
1729 .expect("state sync task panicked!");
1730
1731 let exp1 = StateSyncMessage {
1733 header: BlockHeader {
1734 number: 1,
1735 hash: Bytes::from("0x01"),
1736 parent_hash: Bytes::from("0x00"),
1737 revert: false,
1738 ..Default::default()
1739 },
1740 snapshots: Snapshot {
1741 states: [
1742 (
1743 "Component1".to_string(),
1744 ComponentWithState {
1745 state: ResponseProtocolState {
1746 component_id: "Component1".to_string(),
1747 ..Default::default()
1748 },
1749 component: ProtocolComponent {
1750 id: "Component1".to_string(),
1751 ..Default::default()
1752 },
1753 component_tvl: Some(100.0),
1754 entrypoints: vec![],
1755 },
1756 ),
1757 (
1758 "Component2".to_string(),
1759 ComponentWithState {
1760 state: ResponseProtocolState {
1761 component_id: "Component2".to_string(),
1762 ..Default::default()
1763 },
1764 component: ProtocolComponent {
1765 id: "Component2".to_string(),
1766 ..Default::default()
1767 },
1768 component_tvl: Some(0.0),
1769 entrypoints: vec![],
1770 },
1771 ),
1772 ]
1773 .into_iter()
1774 .collect(),
1775 vm_storage: HashMap::new(),
1776 },
1777 deltas: Some(deltas[0].clone()),
1778 removed_components: Default::default(),
1779 };
1780
1781 let exp2 = StateSyncMessage {
1782 header: BlockHeader {
1783 number: 2,
1784 hash: Bytes::from("0x02"),
1785 parent_hash: Bytes::from("0x01"),
1786 revert: false,
1787 ..Default::default()
1788 },
1789 snapshots: Snapshot {
1790 states: [
1791 (
1793 "Component3".to_string(),
1794 ComponentWithState {
1795 state: ResponseProtocolState {
1796 component_id: "Component3".to_string(),
1797 ..Default::default()
1798 },
1799 component: ProtocolComponent {
1800 id: "Component3".to_string(),
1801 ..Default::default()
1802 },
1803 component_tvl: Some(1000.0),
1804 entrypoints: vec![],
1805 },
1806 ),
1807 ]
1808 .into_iter()
1809 .collect(),
1810 vm_storage: HashMap::new(),
1811 },
1812 deltas: Some(BlockChanges {
1815 extractor: "uniswap-v2".to_string(),
1816 chain: Chain::Ethereum,
1817 block: Block {
1818 number: 2,
1819 hash: Bytes::from("0x02"),
1820 parent_hash: Bytes::from("0x01"),
1821 chain: Chain::Ethereum,
1822 ts: Default::default(),
1823 },
1824 revert: false,
1825 component_tvl: [
1826 ("Component1".to_string(), 100.0),
1828 ("Component3".to_string(), 1000.0),
1829 ]
1830 .into_iter()
1831 .collect(),
1832 ..Default::default()
1833 }),
1834 removed_components: [(
1836 "Component2".to_string(),
1837 ProtocolComponent { id: "Component2".to_string(), ..Default::default() },
1838 )]
1839 .into_iter()
1840 .collect(),
1841 };
1842 assert_eq!(first_msg.unwrap(), exp1);
1843 assert_eq!(second_msg.unwrap(), exp2);
1844 }
1845
1846 #[test_log::test(tokio::test)]
1847 async fn test_state_sync_with_tvl_range() {
1848 let remove_tvl_threshold = 5.0;
1850 let add_tvl_threshold = 7.0;
1851
1852 let mut rpc_client = make_mock_client();
1853 let mut deltas_client = MockDeltasClient::new();
1854
1855 rpc_client
1856 .expect_get_protocol_components()
1857 .with(mockall::predicate::function(
1858 move |request_params: &ProtocolComponentsRequestBody| {
1859 if let Some(ids) = request_params.component_ids.as_ref() {
1860 ids.contains(&"Component3".to_string())
1861 } else {
1862 false
1863 }
1864 },
1865 ))
1866 .returning(|_| {
1867 Ok(ProtocolComponentRequestResponse {
1868 protocol_components: vec![ProtocolComponent {
1869 id: "Component3".to_string(),
1870 ..Default::default()
1871 }],
1872 pagination: PaginationResponse { page: 0, page_size: 20, total: 1 },
1873 })
1874 });
1875 rpc_client
1877 .expect_get_snapshots()
1878 .withf(
1879 |request: &SnapshotParameters,
1880 _chunk_size: &Option<usize>,
1881 _concurrency: &usize| {
1882 request
1883 .components
1884 .contains_key("Component3")
1885 },
1886 )
1887 .returning(|_request, _chunk_size, _concurrency| {
1888 Ok(Snapshot {
1889 states: [(
1890 "Component3".to_string(),
1891 ComponentWithState {
1892 state: ResponseProtocolState {
1893 component_id: "Component3".to_string(),
1894 ..Default::default()
1895 },
1896 component: ProtocolComponent {
1897 id: "Component3".to_string(),
1898 ..Default::default()
1899 },
1900 component_tvl: Some(10.0),
1901 entrypoints: vec![],
1902 },
1903 )]
1904 .into_iter()
1905 .collect(),
1906 vm_storage: HashMap::new(),
1907 })
1908 });
1909
1910 rpc_client
1912 .expect_get_protocol_components()
1913 .returning(|_| {
1914 Ok(ProtocolComponentRequestResponse {
1915 protocol_components: vec![
1916 ProtocolComponent { id: "Component1".to_string(), ..Default::default() },
1917 ProtocolComponent { id: "Component2".to_string(), ..Default::default() },
1918 ],
1919 pagination: PaginationResponse { page: 0, page_size: 20, total: 1 },
1920 })
1921 });
1922
1923 rpc_client
1925 .expect_get_snapshots()
1926 .returning(|_request, _chunk_size, _concurrency| {
1927 Ok(Snapshot {
1928 states: [
1929 (
1930 "Component1".to_string(),
1931 ComponentWithState {
1932 state: ResponseProtocolState {
1933 component_id: "Component1".to_string(),
1934 ..Default::default()
1935 },
1936 component: ProtocolComponent {
1937 id: "Component1".to_string(),
1938 ..Default::default()
1939 },
1940 component_tvl: Some(6.0),
1941 entrypoints: vec![],
1942 },
1943 ),
1944 (
1945 "Component2".to_string(),
1946 ComponentWithState {
1947 state: ResponseProtocolState {
1948 component_id: "Component2".to_string(),
1949 ..Default::default()
1950 },
1951 component: ProtocolComponent {
1952 id: "Component2".to_string(),
1953 ..Default::default()
1954 },
1955 component_tvl: Some(2.0),
1956 entrypoints: vec![],
1957 },
1958 ),
1959 ]
1960 .into_iter()
1961 .collect(),
1962 vm_storage: HashMap::new(),
1963 })
1964 });
1965
1966 rpc_client
1968 .expect_get_traced_entry_points()
1969 .returning(|_| {
1970 Ok(TracedEntryPointRequestResponse {
1971 traced_entry_points: HashMap::new(),
1972 pagination: PaginationResponse { page: 0, page_size: 100, total: 0 },
1973 })
1974 });
1975
1976 let (tx, rx) = channel(1);
1977 deltas_client
1978 .expect_subscribe()
1979 .return_once(move |_, _| Ok((Uuid::default(), rx)));
1980
1981 deltas_client
1983 .expect_unsubscribe()
1984 .return_once(|_| Ok(()));
1985
1986 let mut state_sync = ProtocolStateSynchronizer::new(
1987 ExtractorIdentity::new(Chain::Ethereum, "uniswap-v2"),
1988 true,
1989 ComponentFilter::with_tvl_range(remove_tvl_threshold, add_tvl_threshold),
1990 1,
1991 Duration::from_secs(0),
1992 true,
1993 true,
1994 true,
1995 ArcRPCClient(Arc::new(rpc_client)),
1996 ArcDeltasClient(Arc::new(deltas_client)),
1997 10_u64,
1998 );
1999 state_sync
2000 .initialize()
2001 .await
2002 .expect("Init failed");
2003
2004 let deltas = [
2006 BlockChanges {
2007 extractor: "uniswap-v2".to_string(),
2008 chain: Chain::Ethereum,
2009 block: Block {
2010 number: 1,
2011 hash: Bytes::from("0x01"),
2012 parent_hash: Bytes::from("0x00"),
2013 chain: Chain::Ethereum,
2014 ts: Default::default(),
2015 },
2016 revert: false,
2017 ..Default::default()
2018 },
2019 BlockChanges {
2020 extractor: "uniswap-v2".to_string(),
2021 chain: Chain::Ethereum,
2022 block: Block {
2023 number: 2,
2024 hash: Bytes::from("0x02"),
2025 parent_hash: Bytes::from("0x01"),
2026 chain: Chain::Ethereum,
2027 ts: Default::default(),
2028 },
2029 revert: false,
2030 component_tvl: [
2031 ("Component1".to_string(), 6.0), ("Component2".to_string(), 2.0), ("Component3".to_string(), 10.0), ]
2035 .into_iter()
2036 .collect(),
2037 ..Default::default()
2038 },
2039 ];
2040
2041 let (handle, mut rx) = state_sync.start().await;
2042 let (jh, close_tx) = handle.split();
2043
2044 tx.send(deltas[0].clone())
2046 .await
2047 .expect("deltas channel msg 0 closed!");
2048
2049 let _ = timeout(Duration::from_millis(100), rx.recv())
2051 .await
2052 .expect("waiting for first state msg timed out!")
2053 .expect("state sync block sender closed!");
2054
2055 tx.send(deltas[1].clone())
2057 .await
2058 .expect("deltas channel msg 1 closed!");
2059 let second_msg = timeout(Duration::from_millis(100), rx.recv())
2060 .await
2061 .expect("waiting for second state msg timed out!")
2062 .expect("state sync block sender closed!")
2063 .expect("no error");
2064
2065 let _ = close_tx.send(());
2066 jh.await
2067 .expect("state sync task panicked!");
2068
2069 let expected_second_msg = StateSyncMessage {
2070 header: BlockHeader {
2071 number: 2,
2072 hash: Bytes::from("0x02"),
2073 parent_hash: Bytes::from("0x01"),
2074 revert: false,
2075 ..Default::default()
2076 },
2077 snapshots: Snapshot {
2078 states: [(
2079 "Component3".to_string(),
2080 ComponentWithState {
2081 state: ResponseProtocolState {
2082 component_id: "Component3".to_string(),
2083 ..Default::default()
2084 },
2085 component: ProtocolComponent {
2086 id: "Component3".to_string(),
2087 ..Default::default()
2088 },
2089 component_tvl: Some(10.0),
2090 entrypoints: vec![], },
2092 )]
2093 .into_iter()
2094 .collect(),
2095 vm_storage: HashMap::new(),
2096 },
2097 deltas: Some(BlockChanges {
2098 extractor: "uniswap-v2".to_string(),
2099 chain: Chain::Ethereum,
2100 block: Block {
2101 number: 2,
2102 hash: Bytes::from("0x02"),
2103 parent_hash: Bytes::from("0x01"),
2104 chain: Chain::Ethereum,
2105 ts: Default::default(),
2106 },
2107 revert: false,
2108 component_tvl: [
2109 ("Component1".to_string(), 6.0), ("Component3".to_string(), 10.0), ]
2112 .into_iter()
2113 .collect(),
2114 ..Default::default()
2115 }),
2116 removed_components: [(
2117 "Component2".to_string(),
2118 ProtocolComponent { id: "Component2".to_string(), ..Default::default() },
2119 )]
2120 .into_iter()
2121 .collect(),
2122 };
2123
2124 assert_eq!(second_msg, expected_second_msg);
2125 }
2126
2127 #[test_log::test(tokio::test)]
2128 async fn test_public_close_api_functionality() {
2129 let mut rpc_client = make_mock_client();
2136 let mut deltas_client = MockDeltasClient::new();
2137
2138 rpc_client
2140 .expect_get_protocol_components()
2141 .returning(|_| {
2142 Ok(ProtocolComponentRequestResponse {
2143 protocol_components: vec![],
2144 pagination: PaginationResponse { page: 0, page_size: 20, total: 0 },
2145 })
2146 });
2147
2148 let (_tx, rx) = channel(1);
2150 deltas_client
2151 .expect_subscribe()
2152 .return_once(move |_, _| Ok((Uuid::default(), rx)));
2153
2154 deltas_client
2156 .expect_unsubscribe()
2157 .return_once(|_| Ok(()));
2158
2159 let mut state_sync = ProtocolStateSynchronizer::new(
2160 ExtractorIdentity::new(Chain::Ethereum, "test-protocol"),
2161 true,
2162 ComponentFilter::with_tvl_range(0.0, 0.0),
2163 5, Duration::from_secs(0),
2165 true,
2166 false,
2167 true,
2168 ArcRPCClient(Arc::new(rpc_client)),
2169 ArcDeltasClient(Arc::new(deltas_client)),
2170 10000_u64, );
2172
2173 state_sync
2174 .initialize()
2175 .await
2176 .expect("Init should succeed");
2177
2178 let (handle, _rx) = state_sync.start().await;
2180 let (jh, close_tx) = handle.split();
2181
2182 tokio::time::sleep(Duration::from_millis(100)).await;
2184
2185 close_tx
2187 .send(())
2188 .expect("Should be able to send close signal");
2189 jh.await.expect("Task should not panic");
2191 }
2192
2193 #[test_log::test(tokio::test)]
2194 async fn test_cleanup_runs_when_state_sync_processing_errors() {
2195 let mut rpc_client = make_mock_client();
2200 let mut deltas_client = MockDeltasClient::new();
2201
2202 rpc_client
2204 .expect_get_protocol_components()
2205 .returning(|_| {
2206 Ok(ProtocolComponentRequestResponse {
2207 protocol_components: vec![],
2208 pagination: PaginationResponse { page: 0, page_size: 20, total: 0 },
2209 })
2210 });
2211
2212 rpc_client
2214 .expect_get_protocol_states()
2215 .returning(|_| {
2216 Err(RPCError::ParseResponse("Test error during snapshot retrieval".to_string()))
2217 });
2218
2219 let (tx, rx) = channel(10);
2221 deltas_client
2222 .expect_subscribe()
2223 .return_once(move |_, _| {
2224 let delta = BlockChanges {
2226 extractor: "test".to_string(),
2227 chain: Chain::Ethereum,
2228 block: Block {
2229 hash: Bytes::from("0x0123"),
2230 number: 1,
2231 parent_hash: Bytes::from("0x0000"),
2232 chain: Chain::Ethereum,
2233 ts: chrono::DateTime::from_timestamp(1234567890, 0)
2234 .unwrap()
2235 .naive_utc(),
2236 },
2237 revert: false,
2238 new_protocol_components: [(
2240 "new_component".to_string(),
2241 ProtocolComponent {
2242 id: "new_component".to_string(),
2243 protocol_system: "test_protocol".to_string(),
2244 protocol_type_name: "test".to_string(),
2245 chain: Chain::Ethereum,
2246 tokens: vec![Bytes::from("0x0badc0ffee")],
2247 contract_ids: vec![Bytes::from("0x0badc0ffee")],
2248 static_attributes: Default::default(),
2249 creation_tx: Default::default(),
2250 created_at: Default::default(),
2251 change: Default::default(),
2252 },
2253 )]
2254 .into_iter()
2255 .collect(),
2256 component_tvl: [("new_component".to_string(), 100.0)]
2257 .into_iter()
2258 .collect(),
2259 ..Default::default()
2260 };
2261
2262 tokio::spawn(async move {
2263 let _ = tx.send(delta).await;
2264 });
2266
2267 Ok((Uuid::default(), rx))
2268 });
2269
2270 deltas_client
2272 .expect_unsubscribe()
2273 .return_once(|_| Ok(()));
2274
2275 let mut state_sync = ProtocolStateSynchronizer::new(
2276 ExtractorIdentity::new(Chain::Ethereum, "test-protocol"),
2277 true,
2278 ComponentFilter::with_tvl_range(0.0, 1000.0), 1,
2280 Duration::from_secs(0),
2281 true,
2282 false,
2283 true,
2284 ArcRPCClient(Arc::new(rpc_client)),
2285 ArcDeltasClient(Arc::new(deltas_client)),
2286 5000_u64,
2287 );
2288
2289 state_sync
2290 .initialize()
2291 .await
2292 .expect("Init should succeed");
2293
2294 state_sync.last_synced_block = Some(BlockHeader {
2296 hash: Bytes::from("0x0badc0ffee"),
2297 number: 42,
2298 parent_hash: Bytes::from("0xbadbeef0"),
2299 revert: false,
2300 timestamp: 123456789,
2301 partial_block_index: None,
2302 });
2303
2304 let (mut block_tx, _block_rx) = channel(10);
2306
2307 let (_end_tx, end_rx) = oneshot::channel::<()>();
2309 let result = state_sync
2310 .state_sync(&mut block_tx, end_rx)
2311 .await;
2312 assert!(result.is_err(), "state_sync should have errored during processing");
2314
2315 }
2318
2319 #[test_log::test(tokio::test)]
2320 async fn test_close_signal_while_waiting_for_first_deltas() {
2321 let mut rpc_client = make_mock_client();
2325 let mut deltas_client = MockDeltasClient::new();
2326
2327 rpc_client
2328 .expect_get_protocol_components()
2329 .returning(|_| {
2330 Ok(ProtocolComponentRequestResponse {
2331 protocol_components: vec![],
2332 pagination: PaginationResponse { page: 0, page_size: 20, total: 0 },
2333 })
2334 });
2335
2336 let (_tx, rx) = channel(1);
2337 deltas_client
2338 .expect_subscribe()
2339 .return_once(move |_, _| Ok((Uuid::default(), rx)));
2340
2341 deltas_client
2342 .expect_unsubscribe()
2343 .return_once(|_| Ok(()));
2344
2345 let mut state_sync = ProtocolStateSynchronizer::new(
2346 ExtractorIdentity::new(Chain::Ethereum, "test-protocol"),
2347 true,
2348 ComponentFilter::with_tvl_range(0.0, 0.0),
2349 1,
2350 Duration::from_secs(0),
2351 true,
2352 true,
2353 false,
2354 ArcRPCClient(Arc::new(rpc_client)),
2355 ArcDeltasClient(Arc::new(deltas_client)),
2356 10000_u64,
2357 );
2358
2359 state_sync
2360 .initialize()
2361 .await
2362 .expect("Init should succeed");
2363
2364 let (mut block_tx, _block_rx) = channel(10);
2365 let (end_tx, end_rx) = oneshot::channel::<()>();
2366
2367 let state_sync_handle = tokio::spawn(async move {
2369 state_sync
2370 .state_sync(&mut block_tx, end_rx)
2371 .await
2372 });
2373
2374 tokio::time::sleep(Duration::from_millis(100)).await;
2376
2377 let _ = end_tx.send(());
2379
2380 let result = state_sync_handle
2382 .await
2383 .expect("Task should not panic");
2384 assert!(result.is_ok(), "state_sync should exit cleanly when closed: {result:?}");
2385
2386 println!("SUCCESS: Close signal handled correctly while waiting for first deltas");
2387 }
2388
2389 #[test_log::test(tokio::test)]
2390 async fn test_close_signal_during_main_processing_loop() {
2391 let mut rpc_client = make_mock_client();
2397 let mut deltas_client = MockDeltasClient::new();
2398
2399 rpc_client
2401 .expect_get_protocol_components()
2402 .returning(|_| {
2403 Ok(ProtocolComponentRequestResponse {
2404 protocol_components: vec![],
2405 pagination: PaginationResponse { page: 0, page_size: 20, total: 0 },
2406 })
2407 });
2408
2409 rpc_client
2411 .expect_get_protocol_states()
2412 .returning(|_| {
2413 Ok(ProtocolStateRequestResponse {
2414 states: vec![],
2415 pagination: PaginationResponse { page: 0, page_size: 20, total: 0 },
2416 })
2417 });
2418
2419 rpc_client
2420 .expect_get_component_tvl()
2421 .returning(|_| {
2422 Ok(ComponentTvlRequestResponse {
2423 tvl: HashMap::new(),
2424 pagination: PaginationResponse { page: 0, page_size: 20, total: 0 },
2425 })
2426 });
2427
2428 rpc_client
2429 .expect_get_traced_entry_points()
2430 .returning(|_| {
2431 Ok(TracedEntryPointRequestResponse {
2432 traced_entry_points: HashMap::new(),
2433 pagination: PaginationResponse::new(0, 20, 0),
2434 })
2435 });
2436
2437 let (tx, rx) = channel(10);
2439 deltas_client
2440 .expect_subscribe()
2441 .return_once(move |_, _| {
2442 let first_delta = BlockChanges {
2444 extractor: "test".to_string(),
2445 chain: Chain::Ethereum,
2446 block: Block {
2447 hash: Bytes::from("0x0123"),
2448 number: 1,
2449 parent_hash: Bytes::from("0x0000"),
2450 chain: Chain::Ethereum,
2451 ts: chrono::DateTime::from_timestamp(1234567890, 0)
2452 .unwrap()
2453 .naive_utc(),
2454 },
2455 revert: false,
2456 ..Default::default()
2457 };
2458
2459 tokio::spawn(async move {
2460 let _ = tx.send(first_delta).await;
2461 tokio::time::sleep(Duration::from_secs(30)).await;
2464 });
2465
2466 Ok((Uuid::default(), rx))
2467 });
2468
2469 deltas_client
2470 .expect_unsubscribe()
2471 .return_once(|_| Ok(()));
2472
2473 let mut state_sync = ProtocolStateSynchronizer::new(
2474 ExtractorIdentity::new(Chain::Ethereum, "test-protocol"),
2475 true,
2476 ComponentFilter::with_tvl_range(0.0, 1000.0),
2477 1,
2478 Duration::from_secs(0),
2479 true,
2480 false,
2481 true,
2482 ArcRPCClient(Arc::new(rpc_client)),
2483 ArcDeltasClient(Arc::new(deltas_client)),
2484 10000_u64,
2485 );
2486
2487 state_sync
2488 .initialize()
2489 .await
2490 .expect("Init should succeed");
2491
2492 let (mut block_tx, mut block_rx) = channel(10);
2493 let (end_tx, end_rx) = oneshot::channel::<()>();
2494
2495 let state_sync_handle = tokio::spawn(async move {
2497 state_sync
2498 .state_sync(&mut block_tx, end_rx)
2499 .await
2500 });
2501
2502 let first_snapshot = block_rx
2504 .recv()
2505 .await
2506 .expect("Should receive first snapshot")
2507 .expect("Synchronizer error");
2508 assert!(
2509 !first_snapshot
2510 .snapshots
2511 .states
2512 .is_empty() ||
2513 first_snapshot.deltas.is_some()
2514 );
2515 let _ = end_tx.send(());
2517
2518 let result = state_sync_handle
2520 .await
2521 .expect("Task should not panic");
2522 assert!(
2523 result.is_ok(),
2524 "state_sync should exit cleanly when closed after first message: {result:?}"
2525 );
2526 }
2527
2528 #[test_log::test(tokio::test)]
2529 async fn test_max_retries_exceeded_error_propagation() {
2530 let mut rpc_client = make_mock_client();
2534 let mut deltas_client = MockDeltasClient::new();
2535
2536 rpc_client
2538 .expect_get_protocol_components()
2539 .returning(|_| {
2540 Ok(ProtocolComponentRequestResponse {
2541 protocol_components: vec![],
2542 pagination: PaginationResponse { page: 0, page_size: 20, total: 0 },
2543 })
2544 });
2545
2546 deltas_client
2549 .expect_subscribe()
2550 .returning(|_, _| {
2551 Err(DeltasError::NotConnected)
2553 });
2554
2555 deltas_client
2557 .expect_unsubscribe()
2558 .returning(|_| Ok(()))
2559 .times(0..=5);
2560
2561 let mut state_sync = ProtocolStateSynchronizer::new(
2563 ExtractorIdentity::new(Chain::Ethereum, "test-protocol"),
2564 true,
2565 ComponentFilter::with_tvl_range(0.0, 1000.0),
2566 2, Duration::from_millis(10), true,
2569 false,
2570 true,
2571 ArcRPCClient(Arc::new(rpc_client)),
2572 ArcDeltasClient(Arc::new(deltas_client)),
2573 1000_u64,
2574 );
2575
2576 state_sync
2577 .initialize()
2578 .await
2579 .expect("Init should succeed");
2580
2581 let (handle, mut rx) = state_sync.start().await;
2583 let (jh, _close_tx) = handle.split();
2584
2585 let res = tokio::time::timeout(Duration::from_millis(100), rx.recv())
2586 .await
2587 .expect("responsds in time")
2588 .expect("channel open");
2589
2590 if let Err(err) = res {
2592 assert!(
2593 matches!(err, SynchronizerError::ConnectionClosed),
2594 "Expected ConnectionClosed error, got: {:?}",
2595 err
2596 );
2597 } else {
2598 panic!("Expected an error")
2599 }
2600
2601 let task_result = tokio::time::timeout(Duration::from_secs(2), jh).await;
2603 assert!(task_result.is_ok(), "Synchronizer task should complete after max retries");
2604 }
2605
2606 #[test_log::test(tokio::test)]
2607 async fn test_is_next_expected() {
2608 let mut state_sync = with_mocked_clients(true, false, None, None);
2612
2613 let incoming_header = BlockHeader {
2615 number: 100,
2616 hash: Bytes::from("0x0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"),
2617 parent_hash: Bytes::from(
2618 "0x0000000000000000000000000000000000000000000000000000000000000000",
2619 ),
2620 revert: false,
2621 timestamp: 123456789,
2622 partial_block_index: None,
2623 };
2624 assert!(
2625 !state_sync.is_next_expected(&incoming_header),
2626 "Should return false when no previous block is set"
2627 );
2628
2629 let previous_header = BlockHeader {
2631 number: 99,
2632 hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000000"),
2633 parent_hash: Bytes::from(
2634 "0xabcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789",
2635 ),
2636 revert: false,
2637 timestamp: 123456788,
2638 partial_block_index: None,
2639 };
2640 state_sync.last_synced_block = Some(previous_header.clone());
2641
2642 assert!(
2643 state_sync.is_next_expected(&incoming_header),
2644 "Should return true when incoming parent_hash matches previous hash"
2645 );
2646
2647 let non_matching_header = BlockHeader {
2649 number: 100,
2650 hash: Bytes::from("0x0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"),
2651 parent_hash: Bytes::from(
2652 "0x1111111111111111111111111111111111111111111111111111111111111111",
2653 ), revert: false,
2655 timestamp: 123456789,
2656 partial_block_index: None,
2657 };
2658 assert!(
2659 !state_sync.is_next_expected(&non_matching_header),
2660 "Should return false when incoming parent_hash doesn't match previous hash"
2661 );
2662 }
2663
2664 #[test_log::test(tokio::test)]
2665 async fn test_synchronizer_restart_skip_snapshot_on_expected_block() {
2666 let mut rpc_client = make_mock_client();
2670 let mut deltas_client = MockDeltasClient::new();
2671
2672 rpc_client
2674 .expect_get_protocol_components()
2675 .returning(|_| {
2676 Ok(ProtocolComponentRequestResponse {
2677 protocol_components: vec![ProtocolComponent {
2678 id: "Component1".to_string(),
2679 ..Default::default()
2680 }],
2681 pagination: PaginationResponse { page: 0, page_size: 20, total: 1 },
2682 })
2683 });
2684
2685 let (tx, rx) = channel(10);
2687 deltas_client
2688 .expect_subscribe()
2689 .return_once(move |_, _| {
2690 let expected_next_delta = BlockChanges {
2691 extractor: "uniswap-v2".to_string(),
2692 chain: Chain::Ethereum,
2693 block: Block {
2694 hash: Bytes::from(
2695 "0x0000000000000000000000000000000000000000000000000000000000000002",
2696 ), number: 2,
2698 parent_hash: Bytes::from(
2699 "0x0000000000000000000000000000000000000000000000000000000000000001",
2700 ), chain: Chain::Ethereum,
2702 ts: chrono::DateTime::from_timestamp(1234567890, 0)
2703 .unwrap()
2704 .naive_utc(),
2705 },
2706 revert: false,
2707 ..Default::default()
2708 };
2709
2710 tokio::spawn(async move {
2711 let _ = tx.send(expected_next_delta).await;
2712 });
2713
2714 Ok((Uuid::default(), rx))
2715 });
2716
2717 deltas_client
2718 .expect_unsubscribe()
2719 .return_once(|_| Ok(()));
2720
2721 let mut state_sync = ProtocolStateSynchronizer::new(
2722 ExtractorIdentity::new(Chain::Ethereum, "uniswap-v2"),
2723 true,
2724 ComponentFilter::with_tvl_range(0.0, 1000.0),
2725 1,
2726 Duration::from_secs(0),
2727 true, false,
2729 true,
2730 ArcRPCClient(Arc::new(rpc_client)),
2731 ArcDeltasClient(Arc::new(deltas_client)),
2732 10000_u64,
2733 );
2734
2735 state_sync
2737 .initialize()
2738 .await
2739 .expect("Init should succeed");
2740
2741 state_sync.last_synced_block = Some(BlockHeader {
2743 number: 1,
2744 hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000001"), parent_hash: Bytes::from(
2746 "0x0000000000000000000000000000000000000000000000000000000000000000",
2747 ),
2748 revert: false,
2749 timestamp: 123456789,
2750 partial_block_index: None,
2751 });
2752
2753 let (mut block_tx, mut block_rx) = channel(10);
2754 let (end_tx, end_rx) = oneshot::channel::<()>();
2755
2756 let state_sync_handle = tokio::spawn(async move {
2758 state_sync
2759 .state_sync(&mut block_tx, end_rx)
2760 .await
2761 });
2762
2763 let result_msg = timeout(Duration::from_millis(200), block_rx.recv())
2765 .await
2766 .expect("Should receive message within timeout")
2767 .expect("Channel should be open")
2768 .expect("Should not be an error");
2769
2770 let _ = end_tx.send(());
2772
2773 let _ = state_sync_handle
2775 .await
2776 .expect("Task should not panic");
2777
2778 assert!(result_msg.deltas.is_some(), "Should contain deltas");
2781 assert!(
2782 result_msg.snapshots.states.is_empty(),
2783 "Should not contain snapshots when next expected block is received"
2784 );
2785
2786 if let Some(deltas) = &result_msg.deltas {
2788 assert_eq!(deltas.block.number, 2);
2789 assert_eq!(
2790 deltas.block.hash,
2791 Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000002")
2792 );
2793 assert_eq!(
2794 deltas.block.parent_hash,
2795 Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000001")
2796 );
2797 }
2798 }
2799
2800 #[test_log::test(tokio::test)]
2801 async fn test_skip_previously_processed_messages() {
2802 let mut rpc_client = make_mock_client();
2806 let mut deltas_client = MockDeltasClient::new();
2807
2808 rpc_client
2810 .expect_get_protocol_components()
2811 .returning(|_| {
2812 Ok(ProtocolComponentRequestResponse {
2813 protocol_components: vec![ProtocolComponent {
2814 id: "Component1".to_string(),
2815 ..Default::default()
2816 }],
2817 pagination: PaginationResponse { page: 0, page_size: 20, total: 1 },
2818 })
2819 });
2820
2821 rpc_client
2823 .expect_get_protocol_states()
2824 .returning(|_| {
2825 Ok(ProtocolStateRequestResponse {
2826 states: vec![ResponseProtocolState {
2827 component_id: "Component1".to_string(),
2828 ..Default::default()
2829 }],
2830 pagination: PaginationResponse { page: 0, page_size: 20, total: 1 },
2831 })
2832 });
2833
2834 rpc_client
2835 .expect_get_component_tvl()
2836 .returning(|_| {
2837 Ok(ComponentTvlRequestResponse {
2838 tvl: [("Component1".to_string(), 100.0)]
2839 .into_iter()
2840 .collect(),
2841 pagination: PaginationResponse { page: 0, page_size: 20, total: 1 },
2842 })
2843 });
2844
2845 rpc_client
2846 .expect_get_traced_entry_points()
2847 .returning(|_| {
2848 Ok(TracedEntryPointRequestResponse {
2849 traced_entry_points: HashMap::new(),
2850 pagination: PaginationResponse::new(0, 20, 0),
2851 })
2852 });
2853
2854 let (tx, rx) = channel(10);
2856 deltas_client
2857 .expect_subscribe()
2858 .return_once(move |_, _| {
2859 let old_messages = vec![
2861 BlockChanges {
2862 extractor: "uniswap-v2".to_string(),
2863 chain: Chain::Ethereum,
2864 block: Block {
2865 hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000003"),
2866 number: 3,
2867 parent_hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000002"),
2868 chain: Chain::Ethereum,
2869 ts: chrono::DateTime::from_timestamp(1234567890, 0).unwrap().naive_utc(),
2870 },
2871 revert: false,
2872 ..Default::default()
2873 },
2874 BlockChanges {
2875 extractor: "uniswap-v2".to_string(),
2876 chain: Chain::Ethereum,
2877 block: Block {
2878 hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000004"),
2879 number: 4,
2880 parent_hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000003"),
2881 chain: Chain::Ethereum,
2882 ts: chrono::DateTime::from_timestamp(1234567891, 0).unwrap().naive_utc(),
2883 },
2884 revert: false,
2885 ..Default::default()
2886 },
2887 BlockChanges {
2888 extractor: "uniswap-v2".to_string(),
2889 chain: Chain::Ethereum,
2890 block: Block {
2891 hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000005"),
2892 number: 5,
2893 parent_hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000004"),
2894 chain: Chain::Ethereum,
2895 ts: chrono::DateTime::from_timestamp(1234567892, 0).unwrap().naive_utc(),
2896 },
2897 revert: false,
2898 ..Default::default()
2899 },
2900 BlockChanges {
2902 extractor: "uniswap-v2".to_string(),
2903 chain: Chain::Ethereum,
2904 block: Block {
2905 hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000006"),
2906 number: 6,
2907 parent_hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000005"),
2908 chain: Chain::Ethereum,
2909 ts: chrono::DateTime::from_timestamp(1234567893, 0).unwrap().naive_utc(),
2910 },
2911 revert: false,
2912 ..Default::default()
2913 },
2914 ];
2915
2916 tokio::spawn(async move {
2917 for message in old_messages {
2918 let _ = tx.send(message).await;
2919 tokio::time::sleep(Duration::from_millis(10)).await;
2920 }
2921 });
2922
2923 Ok((Uuid::default(), rx))
2924 });
2925
2926 deltas_client
2927 .expect_unsubscribe()
2928 .return_once(|_| Ok(()));
2929
2930 let mut state_sync = ProtocolStateSynchronizer::new(
2931 ExtractorIdentity::new(Chain::Ethereum, "uniswap-v2"),
2932 true,
2933 ComponentFilter::with_tvl_range(0.0, 1000.0),
2934 1,
2935 Duration::from_secs(0),
2936 true,
2937 true,
2938 true,
2939 ArcRPCClient(Arc::new(rpc_client)),
2940 ArcDeltasClient(Arc::new(deltas_client)),
2941 10000_u64,
2942 );
2943
2944 state_sync
2946 .initialize()
2947 .await
2948 .expect("Init should succeed");
2949
2950 state_sync.last_synced_block = Some(BlockHeader {
2951 number: 5,
2952 hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000005"),
2953 parent_hash: Bytes::from(
2954 "0x0000000000000000000000000000000000000000000000000000000000000004",
2955 ),
2956 revert: false,
2957 timestamp: 1234567892,
2958 partial_block_index: None,
2959 });
2960
2961 let (mut block_tx, mut block_rx) = channel(10);
2962 let (end_tx, end_rx) = oneshot::channel::<()>();
2963
2964 let state_sync_handle = tokio::spawn(async move {
2966 state_sync
2967 .state_sync(&mut block_tx, end_rx)
2968 .await
2969 });
2970
2971 let result_msg = timeout(Duration::from_millis(500), block_rx.recv())
2973 .await
2974 .expect("Should receive message within timeout")
2975 .expect("Channel should be open")
2976 .expect("Should not be an error");
2977
2978 let _ = end_tx.send(());
2980
2981 let _ = state_sync_handle
2983 .await
2984 .expect("Task should not panic");
2985
2986 assert!(result_msg.deltas.is_some(), "Should contain deltas");
2988 if let Some(deltas) = &result_msg.deltas {
2989 assert_eq!(
2990 deltas.block.number, 6,
2991 "Should only process block 6, skipping earlier blocks"
2992 );
2993 assert_eq!(
2994 deltas.block.hash,
2995 Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000006")
2996 );
2997 }
2998
2999 match timeout(Duration::from_millis(50), block_rx.recv()).await {
3002 Err(_) => {
3003 }
3005 Ok(Some(Err(_))) => {
3006 }
3008 Ok(Some(Ok(_))) => {
3009 panic!("Should not receive additional messages - old blocks should be skipped");
3010 }
3011 Ok(None) => {
3012 }
3014 }
3015 }
3016
3017 fn make_block_changes(block_num: u64, partial_idx: Option<u32>) -> BlockChanges {
3018 let hash = Bytes::from(vec![block_num as u8; 32]);
3020 let parent_hash = Bytes::from(vec![block_num.saturating_sub(1) as u8; 32]);
3021 BlockChanges {
3022 extractor: "uniswap-v2".to_string(),
3023 chain: Chain::Ethereum,
3024 block: Block {
3025 number: block_num,
3026 hash,
3027 parent_hash,
3028 chain: Chain::Ethereum,
3029 ts: Default::default(),
3030 },
3031 revert: false,
3032 partial_block_index: partial_idx,
3033 ..Default::default()
3034 }
3035 }
3036
3037 #[test_log::test(tokio::test)]
3039 async fn test_partial_mode_accepts_full_block_as_first_message() {
3040 let (rpc_client, deltas_client, tx) = mock_clients_for_state_sync();
3041 let mut state_sync = with_mocked_clients(true, true, Some(rpc_client), Some(deltas_client))
3042 .with_partial_blocks(true);
3043 state_sync
3044 .initialize()
3045 .await
3046 .expect("Init failed");
3047
3048 let (handle, mut block_rx) = state_sync.start().await;
3049 let (jh, close_tx) = handle.split();
3050
3051 tx.send(make_block_changes(1, None))
3053 .await
3054 .unwrap();
3055
3056 let msg = timeout(Duration::from_millis(100), block_rx.recv())
3058 .await
3059 .expect("Should receive message")
3060 .expect("Channel open")
3061 .expect("No error");
3062
3063 assert_eq!(msg.header.number, 1, "Should use block 1 (full block)");
3064 assert_eq!(msg.header.partial_block_index, None, "Should be a full block");
3065
3066 let _ = close_tx.send(());
3067 jh.await.expect("Task should not panic");
3068 }
3069
3070 #[test_log::test(tokio::test)]
3072 async fn test_partial_mode_detects_block_number_increase() {
3073 let (rpc_client, deltas_client, tx) = mock_clients_for_state_sync();
3074 let mut state_sync = with_mocked_clients(true, true, Some(rpc_client), Some(deltas_client))
3075 .with_partial_blocks(true);
3076 state_sync
3077 .initialize()
3078 .await
3079 .expect("Init failed");
3080
3081 let (handle, mut block_rx) = state_sync.start().await;
3082 let (jh, close_tx) = handle.split();
3083
3084 tx.send(make_block_changes(1, Some(0)))
3086 .await
3087 .unwrap();
3088 tx.send(make_block_changes(1, Some(3)))
3089 .await
3090 .unwrap();
3091
3092 match timeout(Duration::from_millis(50), block_rx.recv()).await {
3094 Err(_) => { }
3095 Ok(_) => panic!("Should not receive message while waiting for new block"),
3096 }
3097
3098 tx.send(make_block_changes(2, Some(5)))
3101 .await
3102 .unwrap();
3103
3104 let msg = timeout(Duration::from_millis(100), block_rx.recv())
3106 .await
3107 .expect("Should receive message")
3108 .expect("Channel open")
3109 .expect("No error");
3110
3111 assert_eq!(msg.header.number, 2, "Should use block 2 (block number increased)");
3112 assert_eq!(msg.header.partial_block_index, Some(5));
3113
3114 let _ = close_tx.send(());
3115 jh.await.expect("Task should not panic");
3116 }
3117
3118 #[test_log::test(tokio::test)]
3120 async fn test_partial_mode_skips_already_synced_blocks() {
3121 let (rpc_client, deltas_client, tx) = mock_clients_for_state_sync();
3122 let mut state_sync = with_mocked_clients(true, true, Some(rpc_client), Some(deltas_client))
3123 .with_partial_blocks(true);
3124 state_sync
3125 .initialize()
3126 .await
3127 .expect("Init failed");
3128
3129 state_sync.last_synced_block = Some(BlockHeader {
3131 number: 5,
3132 hash: Bytes::from("0x05"),
3133 parent_hash: Bytes::from("0x04"),
3134 revert: false,
3135 timestamp: 0,
3136 partial_block_index: None,
3137 });
3138
3139 let (handle, mut block_rx) = state_sync.start().await;
3140 let (jh, close_tx) = handle.split();
3141
3142 tx.send(make_block_changes(3, Some(2)))
3144 .await
3145 .unwrap();
3146
3147 tx.send(make_block_changes(4, Some(0)))
3149 .await
3150 .unwrap();
3151
3152 match timeout(Duration::from_millis(50), block_rx.recv()).await {
3154 Err(_) => { }
3155 Ok(_) => panic!("Should skip block 4 because it's already synced"),
3156 }
3157
3158 tx.send(make_block_changes(5, Some(3)))
3161 .await
3162 .unwrap();
3163 tx.send(make_block_changes(6, Some(0)))
3165 .await
3166 .unwrap();
3167
3168 let msg = timeout(Duration::from_millis(100), block_rx.recv())
3169 .await
3170 .expect("Should receive message")
3171 .expect("Channel open")
3172 .expect("No error");
3173
3174 assert_eq!(msg.header.number, 6, "Should use block 6 (after last synced)");
3175
3176 let _ = close_tx.send(());
3177 jh.await.expect("Task should not panic");
3178 }
3179
3180 #[test_log::test(tokio::test)]
3181 async fn test_get_snapshots_skips_entrypoints_when_not_dci() {
3182 let header = BlockHeader::default();
3183 let mut rpc = make_mock_client();
3184 let component = ProtocolComponent { id: "Component1".to_string(), ..Default::default() };
3185
3186 let component_clone = component.clone();
3187 rpc.expect_get_snapshots()
3188 .returning(move |_request, _chunk_size, _concurrency| {
3189 Ok(Snapshot {
3190 states: [(
3191 "Component1".to_string(),
3192 ComponentWithState {
3193 state: ResponseProtocolState {
3194 component_id: "Component1".to_string(),
3195 ..Default::default()
3196 },
3197 component: component_clone.clone(),
3198 entrypoints: vec![],
3199 component_tvl: None,
3200 },
3201 )]
3202 .into_iter()
3203 .collect(),
3204 vm_storage: HashMap::new(),
3205 })
3206 });
3207
3208 rpc.expect_get_traced_entry_points()
3210 .never();
3211
3212 let mut state_sync = with_mocked_clients(true, false, Some(rpc), None);
3213 state_sync
3215 .component_tracker
3216 .components
3217 .insert("Component1".to_string(), component);
3218
3219 let components_arg = ["Component1".to_string()];
3220 let snap = state_sync
3221 .get_snapshots(header, Some(&components_arg))
3222 .await
3223 .expect("Retrieving snapshot failed");
3224
3225 assert!(snap
3226 .snapshots
3227 .states
3228 .contains_key("Component1"));
3229 }
3230
3231 #[test_log::test(tokio::test)]
3232 async fn test_get_snapshots_fetches_entrypoints_when_dci() {
3233 let header = BlockHeader::default();
3234 let mut rpc = make_mock_client();
3235 let component = ProtocolComponent { id: "Component1".to_string(), ..Default::default() };
3236
3237 let component_clone = component.clone();
3238 rpc.expect_get_snapshots()
3239 .returning(move |_request, _chunk_size, _concurrency| {
3240 Ok(Snapshot {
3241 states: [(
3242 "Component1".to_string(),
3243 ComponentWithState {
3244 state: ResponseProtocolState {
3245 component_id: "Component1".to_string(),
3246 ..Default::default()
3247 },
3248 component: component_clone.clone(),
3249 entrypoints: vec![],
3250 component_tvl: None,
3251 },
3252 )]
3253 .into_iter()
3254 .collect(),
3255 vm_storage: HashMap::new(),
3256 })
3257 });
3258
3259 rpc.expect_get_traced_entry_points()
3261 .times(1)
3262 .returning(|_| {
3263 Ok(TracedEntryPointRequestResponse {
3264 traced_entry_points: HashMap::new(),
3265 pagination: PaginationResponse::new(0, 20, 0),
3266 })
3267 });
3268
3269 let mut state_sync = with_mocked_clients(true, false, Some(rpc), None).with_dci(true);
3270 state_sync
3271 .component_tracker
3272 .components
3273 .insert("Component1".to_string(), component);
3274
3275 let components_arg = ["Component1".to_string()];
3276 let snap = state_sync
3277 .get_snapshots(header, Some(&components_arg))
3278 .await
3279 .expect("Retrieving snapshot failed");
3280
3281 assert!(snap
3282 .snapshots
3283 .states
3284 .contains_key("Component1"));
3285 }
3286
3287 #[test_log::test(tokio::test)]
3293 async fn test_partial_mode_defers_brand_new_component_snapshot_to_next_block() {
3294 use std::time::Duration;
3295
3296 use tokio::{sync::mpsc::channel, time::timeout};
3297
3298 let mut rpc_client = make_mock_client();
3299 rpc_client
3302 .expect_get_protocol_components()
3303 .with(mockall::predicate::function(|req: &ProtocolComponentsRequestBody| {
3304 req.component_ids
3305 .as_ref()
3306 .is_some_and(|ids| ids.contains(&"BrandNew".to_string()))
3307 }))
3308 .returning(|_| {
3309 Ok(ProtocolComponentRequestResponse {
3310 protocol_components: vec![
3311 ProtocolComponent { id: "BrandNew".to_string(), ..Default::default() },
3312 ProtocolComponent { id: "Preexisting".to_string(), ..Default::default() },
3313 ],
3314 pagination: PaginationResponse { page: 0, page_size: 20, total: 2 },
3315 })
3316 });
3317 rpc_client
3319 .expect_get_protocol_components()
3320 .returning(|_| {
3321 Ok(ProtocolComponentRequestResponse {
3322 protocol_components: vec![
3323 ProtocolComponent { id: "Component1".to_string(), ..Default::default() },
3324 ProtocolComponent { id: "Component2".to_string(), ..Default::default() },
3325 ],
3326 pagination: PaginationResponse { page: 0, page_size: 20, total: 2 },
3327 })
3328 });
3329 rpc_client
3333 .expect_get_snapshots()
3334 .withf(
3335 |request: &SnapshotParameters,
3336 _chunk_size: &Option<usize>,
3337 _concurrency: &usize| {
3338 request.block_number == 2 &&
3339 request
3340 .components
3341 .contains_key("BrandNew")
3342 },
3343 )
3344 .returning(|_request, _chunk_size, _concurrency| {
3345 Ok(Snapshot {
3346 states: [(
3347 "BrandNew".to_string(),
3348 ComponentWithState {
3349 state: ResponseProtocolState {
3350 component_id: "BrandNew".to_string(),
3351 ..Default::default()
3352 },
3353 component: ProtocolComponent {
3354 id: "BrandNew".to_string(),
3355 ..Default::default()
3356 },
3357 component_tvl: Some(100.0),
3358 entrypoints: vec![],
3359 },
3360 )]
3361 .into_iter()
3362 .collect(),
3363 vm_storage: HashMap::new(),
3364 })
3365 });
3366 rpc_client
3368 .expect_get_snapshots()
3369 .withf(
3370 |request: &SnapshotParameters,
3371 _chunk_size: &Option<usize>,
3372 _concurrency: &usize| {
3373 request.block_number == 1 &&
3374 request
3375 .components
3376 .contains_key("Preexisting")
3377 },
3378 )
3379 .returning(|_request, _chunk_size, _concurrency| {
3380 Ok(Snapshot {
3381 states: [(
3382 "Preexisting".to_string(),
3383 ComponentWithState {
3384 state: ResponseProtocolState {
3385 component_id: "Preexisting".to_string(),
3386 ..Default::default()
3387 },
3388 component: ProtocolComponent {
3389 id: "Preexisting".to_string(),
3390 ..Default::default()
3391 },
3392 component_tvl: Some(75.0),
3393 entrypoints: vec![],
3394 },
3395 )]
3396 .into_iter()
3397 .collect(),
3398 vm_storage: HashMap::new(),
3399 })
3400 });
3401 rpc_client
3403 .expect_get_snapshots()
3404 .returning(|_request, _chunk_size, _concurrency| {
3405 Ok(Snapshot {
3406 states: [
3407 (
3408 "Component1".to_string(),
3409 ComponentWithState {
3410 state: ResponseProtocolState {
3411 component_id: "Component1".to_string(),
3412 ..Default::default()
3413 },
3414 component: ProtocolComponent {
3415 id: "Component1".to_string(),
3416 ..Default::default()
3417 },
3418 component_tvl: Some(100.0),
3419 entrypoints: vec![],
3420 },
3421 ),
3422 (
3423 "Component2".to_string(),
3424 ComponentWithState {
3425 state: ResponseProtocolState {
3426 component_id: "Component2".to_string(),
3427 ..Default::default()
3428 },
3429 component: ProtocolComponent {
3430 id: "Component2".to_string(),
3431 ..Default::default()
3432 },
3433 component_tvl: Some(0.0),
3434 entrypoints: vec![],
3435 },
3436 ),
3437 ]
3438 .into_iter()
3439 .collect(),
3440 vm_storage: HashMap::new(),
3441 })
3442 });
3443 rpc_client
3444 .expect_get_traced_entry_points()
3445 .returning(|_| {
3446 Ok(TracedEntryPointRequestResponse {
3447 traced_entry_points: HashMap::new(),
3448 pagination: PaginationResponse::new(0, 100, 0),
3449 })
3450 });
3451
3452 let mut deltas_client = MockDeltasClient::new();
3453 let (tx, rx) = channel(1);
3454 deltas_client
3455 .expect_subscribe()
3456 .return_once(move |_, _| Ok((Uuid::default(), rx)));
3457 deltas_client
3458 .expect_unsubscribe()
3459 .return_once(|_| Ok(()));
3460
3461 let mut state_sync = with_mocked_clients(true, true, Some(rpc_client), Some(deltas_client))
3462 .with_partial_blocks(true);
3463 state_sync
3464 .initialize()
3465 .await
3466 .expect("Init failed");
3467
3468 let (handle, mut block_rx) = state_sync.start().await;
3469 let (jh, close_tx) = handle.split();
3470
3471 tx.send(make_block_changes(1, None))
3473 .await
3474 .unwrap();
3475 let _msg1 = timeout(Duration::from_millis(200), block_rx.recv())
3476 .await
3477 .expect("Should receive initial + block 1")
3478 .expect("Channel open")
3479 .expect("No error");
3480
3481 let mut block2 = make_block_changes(2, Some(2));
3484 block2.new_protocol_components = HashMap::from([(
3485 "BrandNew".to_string(),
3486 ProtocolComponent { id: "BrandNew".to_string(), ..Default::default() },
3487 )]);
3488 block2.component_tvl = HashMap::from([
3489 ("BrandNew".to_string(), 100.0),
3490 ("Preexisting".to_string(), 75.0), ]);
3492 tx.send(block2).await.unwrap();
3493 let msg2 = timeout(Duration::from_millis(200), block_rx.recv())
3494 .await
3495 .expect("Should receive block 2")
3496 .expect("Channel open")
3497 .expect("No error");
3498
3499 assert!(
3500 msg2.snapshots.states.contains_key("Preexisting"),
3501 "Preexisting component snapshot should be requested immediately in same block; got keys: {:?}",
3502 msg2.snapshots.states.keys().collect::<Vec<_>>()
3503 );
3504 assert!(
3505 !msg2
3506 .snapshots
3507 .states
3508 .contains_key("BrandNew"),
3509 "Brand-new component snapshot should be deferred, not in block 2 message"
3510 );
3511
3512 tx.send(make_block_changes(3, Some(1)))
3515 .await
3516 .unwrap();
3517 let msg3 = timeout(Duration::from_millis(200), block_rx.recv())
3518 .await
3519 .expect("Should receive block 3")
3520 .expect("Channel open")
3521 .expect("No error");
3522
3523 assert_eq!(msg3.header.number, 3);
3524 assert_eq!(msg3.header.partial_block_index, Some(1), "First partial of block 3");
3525 assert!(
3526 msg3.snapshots.states.contains_key("BrandNew"),
3527 "Deferred brand-new component snapshot should be included in next block message; got keys: {:?}",
3528 msg3.snapshots.states.keys().collect::<Vec<_>>()
3529 );
3530
3531 let _ = close_tx.send(());
3532 jh.await.expect("Task should not panic");
3533 }
3534}