1use std::{collections::HashMap, time::Duration};
2
3use async_trait::async_trait;
4use serde::{Deserialize, Serialize};
5use thiserror::Error;
6use tokio::{
7 select,
8 sync::{
9 mpsc::{channel, error::SendError, Receiver, Sender},
10 oneshot,
11 },
12 task::JoinHandle,
13 time::{sleep, timeout},
14};
15use tracing::{debug, error, info, instrument, trace, warn};
16use tycho_common::{
17 dto::{
18 BlockChanges, Chain, EntryPointWithTracingParams, ExtractorIdentity, ProtocolComponent,
19 ResponseAccount, ResponseProtocolState, TracingResult,
20 },
21 Bytes,
22};
23
24use crate::{
25 deltas::{DeltasClient, SubscriptionOptions},
26 feed::{
27 component_tracker::{ComponentFilter, ComponentTracker},
28 BlockHeader, HeaderLike,
29 },
30 rpc::{RPCClient, RPCError, SnapshotParameters, RPC_CLIENT_CONCURRENCY},
31 DeltasError,
32};
33
34#[derive(Error, Debug)]
35pub enum SynchronizerError {
36 #[error("RPC error: {0}")]
38 RPCError(#[from] RPCError),
39
40 #[error("{0}")]
42 ChannelError(String),
43
44 #[error("Timeout error: {0}")]
46 Timeout(String),
47
48 #[error("Failed to close synchronizer: {0}")]
50 CloseError(String),
51
52 #[error("Connection error: {0}")]
54 ConnectionError(String),
55
56 #[error("Connection closed")]
58 ConnectionClosed,
59}
60
61pub type SyncResult<T> = Result<T, SynchronizerError>;
62
63impl<T> From<SendError<T>> for SynchronizerError {
64 fn from(err: SendError<T>) -> Self {
65 SynchronizerError::ChannelError(format!("Failed to send message: {err}"))
66 }
67}
68
69impl From<DeltasError> for SynchronizerError {
70 fn from(err: DeltasError) -> Self {
71 match err {
72 DeltasError::NotConnected => SynchronizerError::ConnectionClosed,
73 _ => SynchronizerError::ConnectionError(err.to_string()),
74 }
75 }
76}
77
78pub struct ProtocolStateSynchronizer<R: RPCClient, D: DeltasClient> {
79 extractor_id: ExtractorIdentity,
80 retrieve_balances: bool,
81 rpc_client: R,
82 deltas_client: D,
83 max_retries: u64,
84 retry_cooldown: Duration,
85 include_snapshots: bool,
86 component_tracker: ComponentTracker<R>,
87 last_synced_block: Option<BlockHeader>,
88 timeout: u64,
89 include_tvl: bool,
90 compression: bool,
91}
92
93#[derive(Clone, PartialEq, Debug, Serialize, Deserialize)]
94pub struct ComponentWithState {
95 pub state: ResponseProtocolState,
96 pub component: ProtocolComponent,
97 pub component_tvl: Option<f64>,
98 pub entrypoints: Vec<(EntryPointWithTracingParams, TracingResult)>,
99}
100
101#[derive(Clone, PartialEq, Debug, Default, Serialize, Deserialize)]
102pub struct Snapshot {
103 pub states: HashMap<String, ComponentWithState>,
104 pub vm_storage: HashMap<Bytes, ResponseAccount>,
105}
106
107impl Snapshot {
108 fn extend(&mut self, other: Snapshot) {
109 self.states.extend(other.states);
110 self.vm_storage.extend(other.vm_storage);
111 }
112
113 pub fn get_states(&self) -> &HashMap<String, ComponentWithState> {
114 &self.states
115 }
116
117 pub fn get_vm_storage(&self) -> &HashMap<Bytes, ResponseAccount> {
118 &self.vm_storage
119 }
120}
121
122#[derive(Clone, PartialEq, Debug, Default, Serialize, Deserialize)]
123pub struct StateSyncMessage<H>
124where
125 H: HeaderLike,
126{
127 pub header: H,
129 pub snapshots: Snapshot,
131 pub deltas: Option<BlockChanges>,
135 pub removed_components: HashMap<String, ProtocolComponent>,
137}
138
139impl<H> StateSyncMessage<H>
140where
141 H: HeaderLike,
142{
143 pub fn merge(mut self, other: Self) -> Self {
144 self.removed_components
146 .retain(|k, _| !other.snapshots.states.contains_key(k));
147 self.snapshots
148 .states
149 .retain(|k, _| !other.removed_components.contains_key(k));
150
151 self.snapshots.extend(other.snapshots);
152 let deltas = match (self.deltas, other.deltas) {
153 (Some(l), Some(r)) => Some(l.merge(r)),
154 (None, Some(r)) => Some(r),
155 (Some(l), None) => Some(l),
156 (None, None) => None,
157 };
158 self.removed_components
159 .extend(other.removed_components);
160 Self {
161 header: other.header,
162 snapshots: self.snapshots,
163 deltas,
164 removed_components: self.removed_components,
165 }
166 }
167}
168
169pub struct SynchronizerTaskHandle {
174 join_handle: JoinHandle<()>,
175 close_tx: oneshot::Sender<()>,
176}
177
178impl SynchronizerTaskHandle {
187 pub fn new(join_handle: JoinHandle<()>, close_tx: oneshot::Sender<()>) -> Self {
188 Self { join_handle, close_tx }
189 }
190
191 pub fn split(self) -> (JoinHandle<()>, oneshot::Sender<()>) {
197 (self.join_handle, self.close_tx)
198 }
199}
200
201#[async_trait]
202pub trait StateSynchronizer: Send + Sync + 'static {
203 async fn initialize(&mut self) -> SyncResult<()>;
204 async fn start(
207 mut self,
208 ) -> (SynchronizerTaskHandle, Receiver<SyncResult<StateSyncMessage<BlockHeader>>>);
209}
210
211impl<R, D> ProtocolStateSynchronizer<R, D>
212where
213 R: RPCClient + Clone + Send + Sync + 'static,
216 D: DeltasClient + Clone + Send + Sync + 'static,
217{
218 #[allow(clippy::too_many_arguments)]
220 pub fn new(
221 extractor_id: ExtractorIdentity,
222 retrieve_balances: bool,
223 component_filter: ComponentFilter,
224 max_retries: u64,
225 retry_cooldown: Duration,
226 include_snapshots: bool,
227 include_tvl: bool,
228 compression: bool,
229 rpc_client: R,
230 deltas_client: D,
231 timeout: u64,
232 ) -> Self {
233 Self {
234 extractor_id: extractor_id.clone(),
235 retrieve_balances,
236 rpc_client: rpc_client.clone(),
237 include_snapshots,
238 deltas_client,
239 component_tracker: ComponentTracker::new(
240 extractor_id.chain,
241 extractor_id.name.as_str(),
242 component_filter,
243 rpc_client,
244 ),
245 max_retries,
246 retry_cooldown,
247 last_synced_block: None,
248 timeout,
249 include_tvl,
250 compression,
251 }
252 }
253
254 async fn get_snapshots<'a, I: IntoIterator<Item = &'a String>>(
256 &mut self,
257 header: BlockHeader,
258 ids: Option<I>,
259 ) -> SyncResult<StateSyncMessage<BlockHeader>> {
260 if !self.include_snapshots {
261 return Ok(StateSyncMessage { header, ..Default::default() });
262 }
263
264 let component_ids: Vec<_> = match ids {
266 Some(ids) => ids.into_iter().cloned().collect(),
267 None => self
268 .component_tracker
269 .get_tracked_component_ids(),
270 };
271
272 if component_ids.is_empty() {
273 return Ok(StateSyncMessage { header, ..Default::default() });
274 }
275
276 let entrypoints_result = if self.extractor_id.chain == Chain::Ethereum {
279 let result = self
280 .rpc_client
281 .get_traced_entry_points_paginated(
282 self.extractor_id.chain,
283 &self.extractor_id.name,
284 &component_ids,
285 None,
286 RPC_CLIENT_CONCURRENCY,
287 )
288 .await?;
289 self.component_tracker
290 .process_entrypoints(&result.clone().into());
291 result.traced_entry_points.clone()
292 } else {
293 HashMap::new()
294 };
295
296 let contract_ids: Vec<Bytes> = self
298 .component_tracker
299 .get_contracts_by_component(&component_ids)
300 .into_iter()
301 .collect();
302
303 let request = SnapshotParameters::new(
304 self.extractor_id.chain,
305 &self.extractor_id.name,
306 &self.component_tracker.components,
307 &contract_ids,
308 header.number,
309 )
310 .entrypoints(&entrypoints_result)
311 .include_balances(self.retrieve_balances)
312 .include_tvl(self.include_tvl);
313 let snapshot_response = self
314 .rpc_client
315 .get_snapshots(&request, None, RPC_CLIENT_CONCURRENCY)
316 .await?;
317
318 trace!(states=?&snapshot_response.states, "Retrieved ProtocolStates");
319 trace!(contract_states=?&snapshot_response.vm_storage, "Retrieved ContractState");
320
321 Ok(StateSyncMessage {
322 header,
323 snapshots: snapshot_response,
324 deltas: None,
325 removed_components: HashMap::new(),
326 })
327 }
328
329 #[instrument(skip(self, block_tx, end_rx), fields(extractor_id = %self.extractor_id))]
342 async fn state_sync(
343 &mut self,
344 block_tx: &mut Sender<SyncResult<StateSyncMessage<BlockHeader>>>,
345 mut end_rx: oneshot::Receiver<()>,
346 ) -> Result<(), (SynchronizerError, Option<oneshot::Receiver<()>>)> {
347 let subscription_options = SubscriptionOptions::new()
349 .with_state(self.include_snapshots)
350 .with_compression(self.compression);
351 let (subscription_id, mut msg_rx) = match self
352 .deltas_client
353 .subscribe(self.extractor_id.clone(), subscription_options)
354 .await
355 {
356 Ok(result) => result,
357 Err(e) => return Err((e.into(), Some(end_rx))),
358 };
359
360 let result = async {
361 info!("Waiting for deltas...");
362 let mut warned = false;
363 let mut first_msg = loop {
364 let msg = select! {
365 deltas_result = timeout(Duration::from_secs(self.timeout), msg_rx.recv()) => {
366 deltas_result
367 .map_err(|_| {
368 SynchronizerError::Timeout(format!(
369 "First deltas took longer than {t}s to arrive",
370 t = self.timeout
371 ))
372 })?
373 .ok_or_else(|| {
374 SynchronizerError::ConnectionError(
375 "Deltas channel closed before first message".to_string(),
376 )
377 })?
378 },
379 _ = &mut end_rx => {
380 info!("Received close signal while waiting for first deltas");
381 return Ok(());
382 }
383 };
384
385 let incoming = BlockHeader::from_block(msg.get_block(), msg.is_revert());
386 if let Some(current) = &self.last_synced_block {
387 if current.number >= incoming.number && !self.is_next_expected(&incoming) {
388 if !warned {
389 info!(extractor=%self.extractor_id, from=incoming.number, to=current.number, "Syncing. Skipping messages");
390 warned = true;
391 }
392 continue
393 }
394 }
395 break msg;
396 };
397
398 self.filter_deltas(&mut first_msg);
399
400 let block = first_msg.get_block().clone();
402 info!(height = &block.number, "First deltas received");
403 let header = BlockHeader::from_block(first_msg.get_block(), first_msg.is_revert());
404 let deltas_msg = StateSyncMessage {
405 header: BlockHeader::from_block(first_msg.get_block(), first_msg.is_revert()),
406 snapshots: Default::default(),
407 deltas: Some(first_msg),
408 removed_components: Default::default(),
409 };
410
411 let msg = if !self.is_next_expected(&header) {
413 info!("Retrieving snapshot");
414 let snapshot = self
415 .get_snapshots::<Vec<&String>>(
416 BlockHeader::from_block(&block, false),
417 None,
418 )
419 .await?
420 .merge(deltas_msg);
421 let n_components = self.component_tracker.components.len();
422 let n_snapshots = snapshot.snapshots.states.len();
423 info!(n_components, n_snapshots, "Initial snapshot retrieved, starting delta message feed");
424 snapshot
425 } else {
426 deltas_msg
427 };
428 block_tx.send(Ok(msg)).await?;
429 self.last_synced_block = Some(header.clone());
430 loop {
431 select! {
432 deltas_opt = msg_rx.recv() => {
433 if let Some(mut deltas) = deltas_opt {
434 let header = BlockHeader::from_block(deltas.get_block(), deltas.is_revert());
435 debug!(block_number=?header.number, "Received delta message");
436
437 let (snapshots, removed_components) = {
438 let (to_add, to_remove) = self.component_tracker.filter_updated_components(&deltas);
441
442 let requiring_snapshot: Vec<_> = to_add
444 .iter()
445 .filter(|id| {
446 !self.component_tracker
447 .components
448 .contains_key(id.as_str())
449 })
450 .collect();
451 debug!(components=?requiring_snapshot, "SnapshotRequest");
452 self.component_tracker
453 .start_tracking(requiring_snapshot.as_slice())
454 .await?;
455
456 let snapshots = self
457 .get_snapshots(header.clone(), Some(requiring_snapshot))
458 .await?
459 .snapshots;
460
461 let removed_components = if !to_remove.is_empty() {
462 self.component_tracker.stop_tracking(&to_remove)
463 } else {
464 Default::default()
465 };
466
467 (snapshots, removed_components)
468 };
469
470 self.component_tracker.process_entrypoints(&deltas.dci_update);
472
473 self.filter_deltas(&mut deltas);
475 let n_changes = deltas.n_changes();
476
477 let next = StateSyncMessage {
479 header: header.clone(),
480 snapshots,
481 deltas: Some(deltas),
482 removed_components,
483 };
484 block_tx.send(Ok(next)).await?;
485 self.last_synced_block = Some(header.clone());
486
487 debug!(block_number=?header.number, n_changes, "Finished processing delta message");
488 } else {
489 return Err(SynchronizerError::ConnectionError("Deltas channel closed".to_string()));
490 }
491 },
492 _ = &mut end_rx => {
493 info!("Received close signal during state_sync");
494 return Ok(());
495 }
496 }
497 }
498 }.await;
499
500 warn!(last_synced_block = ?&self.last_synced_block, "Deltas processing ended.");
502 let _ = self
504 .deltas_client
505 .unsubscribe(subscription_id)
506 .await
507 .map_err(|err| {
508 warn!(err=?err, "Unsubscribing from deltas on cleanup failed!");
509 });
510
511 match result {
514 Ok(()) => Ok(()), Err(e) => {
516 Err((e, Some(end_rx)))
520 }
521 }
522 }
523
524 fn is_next_expected(&self, incoming: &BlockHeader) -> bool {
525 if let Some(block) = self.last_synced_block.as_ref() {
526 return incoming.parent_hash == block.hash;
527 }
528 false
529 }
530 fn filter_deltas(&self, deltas: &mut BlockChanges) {
531 deltas.filter_by_component(|id| {
532 self.component_tracker
533 .components
534 .contains_key(id)
535 });
536 deltas.filter_by_contract(|id| {
537 self.component_tracker
538 .contracts
539 .contains(id)
540 });
541 }
542}
543
544#[async_trait]
545impl<R, D> StateSynchronizer for ProtocolStateSynchronizer<R, D>
546where
547 R: RPCClient + Clone + Send + Sync + 'static,
548 D: DeltasClient + Clone + Send + Sync + 'static,
549{
550 async fn initialize(&mut self) -> SyncResult<()> {
551 info!("Retrieving relevant protocol components");
552 self.component_tracker
553 .initialise_components()
554 .await?;
555 info!(
556 n_components = self.component_tracker.components.len(),
557 n_contracts = self.component_tracker.contracts.len(),
558 "Finished retrieving components",
559 );
560
561 Ok(())
562 }
563
564 async fn start(
565 mut self,
566 ) -> (SynchronizerTaskHandle, Receiver<SyncResult<StateSyncMessage<BlockHeader>>>) {
567 let (mut tx, rx) = channel(15);
568 let (end_tx, end_rx) = oneshot::channel::<()>();
569
570 let jh = tokio::spawn(async move {
571 let mut retry_count = 0;
572 let mut current_end_rx = end_rx;
573 let mut final_error = None;
574
575 while retry_count < self.max_retries {
576 info!(extractor_id=%&self.extractor_id, retry_count, "(Re)starting synchronization loop");
577
578 let res = self
579 .state_sync(&mut tx, current_end_rx)
580 .await;
581 match res {
582 Ok(()) => {
583 info!(
584 extractor_id=%&self.extractor_id,
585 retry_count,
586 "State synchronization exited cleanly"
587 );
588 return;
589 }
590 Err((e, maybe_end_rx)) => {
591 warn!(
592 extractor_id=%&self.extractor_id,
593 retry_count,
594 error=%e,
595 "State synchronization errored!"
596 );
597
598 if let Some(recovered_end_rx) = maybe_end_rx {
600 current_end_rx = recovered_end_rx;
601
602 if let SynchronizerError::ConnectionClosed = e {
603 error!(
605 "Websocket connection closed. State synchronization exiting."
606 );
607 let _ = tx.send(Err(e)).await;
608 return;
609 } else {
610 final_error = Some(e);
612 }
613 } else {
614 info!(extractor_id=%&self.extractor_id, "Received close signal, exiting.");
616 return;
617 }
618 }
619 }
620 sleep(self.retry_cooldown).await;
621 retry_count += 1;
622 }
623 if let Some(e) = final_error {
624 warn!(extractor_id=%&self.extractor_id, retry_count, error=%e, "Max retries exceeded");
625 let _ = tx.send(Err(e)).await;
626 }
627 });
628
629 let handle = SynchronizerTaskHandle::new(jh, end_tx);
630 (handle, rx)
631 }
632}
633
634#[cfg(test)]
635mod test {
636 use std::{collections::HashSet, sync::Arc};
655
656 use test_log::test;
657 use tycho_common::dto::{
658 AddressStorageLocation, Block, Chain, ComponentTvlRequestBody, ComponentTvlRequestResponse,
659 DCIUpdate, EntryPoint, PaginationResponse, ProtocolComponentRequestResponse,
660 ProtocolComponentsRequestBody, ProtocolStateRequestBody, ProtocolStateRequestResponse,
661 ProtocolSystemsRequestBody, ProtocolSystemsRequestResponse, RPCTracerParams,
662 StateRequestBody, StateRequestResponse, TokensRequestBody, TokensRequestResponse,
663 TracedEntryPointRequestBody, TracedEntryPointRequestResponse, TracingParams,
664 };
665 use uuid::Uuid;
666
667 use super::*;
668 use crate::{deltas::MockDeltasClient, rpc::MockRPCClient, DeltasError, RPCError};
669
/// Test wrapper that shares an RPC client behind an `Arc`, so it can be cloned even when the
/// wrapped type is not `Clone`.
struct ArcRPCClient<T>(Arc<T>);

impl<T> Clone for ArcRPCClient<T> {
    /// Clones the `Arc`, not the wrapped client.
    fn clone(&self) -> Self {
        let ArcRPCClient(shared) = self;
        ArcRPCClient(Arc::clone(shared))
    }
}
679
680 #[async_trait]
681 impl<T> RPCClient for ArcRPCClient<T>
682 where
683 T: RPCClient + Sync + Send + 'static,
684 {
685 async fn get_tokens(
686 &self,
687 request: &TokensRequestBody,
688 ) -> Result<TokensRequestResponse, RPCError> {
689 self.0.get_tokens(request).await
690 }
691
692 async fn get_contract_state(
693 &self,
694 request: &StateRequestBody,
695 ) -> Result<StateRequestResponse, RPCError> {
696 self.0.get_contract_state(request).await
697 }
698
699 async fn get_protocol_components(
700 &self,
701 request: &ProtocolComponentsRequestBody,
702 ) -> Result<ProtocolComponentRequestResponse, RPCError> {
703 self.0
704 .get_protocol_components(request)
705 .await
706 }
707
708 async fn get_protocol_states(
709 &self,
710 request: &ProtocolStateRequestBody,
711 ) -> Result<ProtocolStateRequestResponse, RPCError> {
712 self.0
713 .get_protocol_states(request)
714 .await
715 }
716
717 async fn get_protocol_systems(
718 &self,
719 request: &ProtocolSystemsRequestBody,
720 ) -> Result<ProtocolSystemsRequestResponse, RPCError> {
721 self.0
722 .get_protocol_systems(request)
723 .await
724 }
725
726 async fn get_component_tvl(
727 &self,
728 request: &ComponentTvlRequestBody,
729 ) -> Result<ComponentTvlRequestResponse, RPCError> {
730 self.0.get_component_tvl(request).await
731 }
732
733 async fn get_traced_entry_points(
734 &self,
735 request: &TracedEntryPointRequestBody,
736 ) -> Result<TracedEntryPointRequestResponse, RPCError> {
737 self.0
738 .get_traced_entry_points(request)
739 .await
740 }
741
742 async fn get_snapshots<'a>(
743 &self,
744 request: &SnapshotParameters<'a>,
745 chunk_size: Option<usize>,
746 concurrency: usize,
747 ) -> Result<Snapshot, RPCError> {
748 self.0
749 .get_snapshots(request, chunk_size, concurrency)
750 .await
751 }
752
753 fn compression(&self) -> bool {
754 self.0.compression()
755 }
756 }
757
/// Test wrapper that shares a deltas client behind an `Arc`, so it can be cloned even when
/// the wrapped type is not `Clone`.
struct ArcDeltasClient<T>(Arc<T>);

impl<T> Clone for ArcDeltasClient<T> {
    /// Clones the `Arc`, not the wrapped client.
    fn clone(&self) -> Self {
        let ArcDeltasClient(shared) = self;
        ArcDeltasClient(Arc::clone(shared))
    }
}
767
768 #[async_trait]
769 impl<T> DeltasClient for ArcDeltasClient<T>
770 where
771 T: DeltasClient + Sync + Send + 'static,
772 {
773 async fn subscribe(
774 &self,
775 extractor_id: ExtractorIdentity,
776 options: SubscriptionOptions,
777 ) -> Result<(Uuid, Receiver<BlockChanges>), DeltasError> {
778 self.0
779 .subscribe(extractor_id, options)
780 .await
781 }
782
783 async fn unsubscribe(&self, subscription_id: Uuid) -> Result<(), DeltasError> {
784 self.0
785 .unsubscribe(subscription_id)
786 .await
787 }
788
789 async fn connect(&self) -> Result<JoinHandle<Result<(), DeltasError>>, DeltasError> {
790 self.0.connect().await
791 }
792
793 async fn close(&self) -> Result<(), DeltasError> {
794 self.0.close().await
795 }
796 }
797
798 fn with_mocked_clients(
799 native: bool,
800 include_tvl: bool,
801 rpc_client: Option<MockRPCClient>,
802 deltas_client: Option<MockDeltasClient>,
803 ) -> ProtocolStateSynchronizer<ArcRPCClient<MockRPCClient>, ArcDeltasClient<MockDeltasClient>>
804 {
805 let rpc_client = ArcRPCClient(Arc::new(rpc_client.unwrap_or_default()));
806 let deltas_client = ArcDeltasClient(Arc::new(deltas_client.unwrap_or_default()));
807
808 ProtocolStateSynchronizer::new(
809 ExtractorIdentity::new(Chain::Ethereum, "uniswap-v2"),
810 native,
811 ComponentFilter::with_tvl_range(50.0, 50.0),
812 1,
813 Duration::from_secs(0),
814 true,
815 include_tvl,
816 true, rpc_client,
818 deltas_client,
819 10_u64,
820 )
821 }
822
823 fn state_snapshot_native() -> ProtocolStateRequestResponse {
824 ProtocolStateRequestResponse {
825 states: vec![ResponseProtocolState {
826 component_id: "Component1".to_string(),
827 ..Default::default()
828 }],
829 pagination: PaginationResponse { page: 0, page_size: 20, total: 1 },
830 }
831 }
832
833 fn make_mock_client() -> MockRPCClient {
834 let mut m = MockRPCClient::new();
835 m.expect_compression()
836 .return_const(false);
837 m
838 }
839
840 #[test(tokio::test)]
841 async fn test_get_snapshots_native() {
842 let header = BlockHeader::default();
843 let mut rpc = make_mock_client();
844 let component = ProtocolComponent { id: "Component1".to_string(), ..Default::default() };
845
846 let component_clone = component.clone();
847 rpc.expect_get_snapshots()
848 .returning(move |_request, _chunk_size, _concurrency| {
849 Ok(Snapshot {
850 states: state_snapshot_native()
851 .states
852 .into_iter()
853 .map(|state| {
854 (
855 state.component_id.clone(),
856 ComponentWithState {
857 state,
858 component: component_clone.clone(),
859 entrypoints: vec![],
860 component_tvl: None,
861 },
862 )
863 })
864 .collect(),
865 vm_storage: HashMap::new(),
866 })
867 });
868
869 rpc.expect_get_traced_entry_points()
870 .returning(|_| {
871 Ok(TracedEntryPointRequestResponse {
872 traced_entry_points: HashMap::new(),
873 pagination: PaginationResponse::new(0, 20, 0),
874 })
875 });
876
877 let mut state_sync = with_mocked_clients(true, false, Some(rpc), None);
878 state_sync
879 .component_tracker
880 .components
881 .insert("Component1".to_string(), component.clone());
882 let components_arg = ["Component1".to_string()];
883 let exp = StateSyncMessage {
884 header: header.clone(),
885 snapshots: Snapshot {
886 states: state_snapshot_native()
887 .states
888 .into_iter()
889 .map(|state| {
890 (
891 state.component_id.clone(),
892 ComponentWithState {
893 state,
894 component: component.clone(),
895 entrypoints: vec![],
896 component_tvl: None,
897 },
898 )
899 })
900 .collect(),
901 vm_storage: HashMap::new(),
902 },
903 deltas: None,
904 removed_components: Default::default(),
905 };
906
907 let snap = state_sync
908 .get_snapshots(header, Some(&components_arg))
909 .await
910 .expect("Retrieving snapshot failed");
911
912 assert_eq!(snap, exp);
913 }
914
915 #[test(tokio::test)]
916 async fn test_get_snapshots_native_with_tvl() {
917 let header = BlockHeader::default();
918 let mut rpc = make_mock_client();
919 let component = ProtocolComponent { id: "Component1".to_string(), ..Default::default() };
920
921 let component_clone = component.clone();
922 rpc.expect_get_snapshots()
923 .returning(move |_request, _chunk_size, _concurrency| {
924 Ok(Snapshot {
925 states: state_snapshot_native()
926 .states
927 .into_iter()
928 .map(|state| {
929 (
930 state.component_id.clone(),
931 ComponentWithState {
932 state,
933 component: component_clone.clone(),
934 component_tvl: Some(100.0),
935 entrypoints: vec![],
936 },
937 )
938 })
939 .collect(),
940 vm_storage: HashMap::new(),
941 })
942 });
943
944 rpc.expect_get_traced_entry_points()
945 .returning(|_| {
946 Ok(TracedEntryPointRequestResponse {
947 traced_entry_points: HashMap::new(),
948 pagination: PaginationResponse::new(0, 20, 0),
949 })
950 });
951
952 let mut state_sync = with_mocked_clients(true, true, Some(rpc), None);
953 state_sync
954 .component_tracker
955 .components
956 .insert("Component1".to_string(), component.clone());
957 let components_arg = ["Component1".to_string()];
958 let exp = StateSyncMessage {
959 header: header.clone(),
960 snapshots: Snapshot {
961 states: state_snapshot_native()
962 .states
963 .into_iter()
964 .map(|state| {
965 (
966 state.component_id.clone(),
967 ComponentWithState {
968 state,
969 component: component.clone(),
970 component_tvl: Some(100.0),
971 entrypoints: vec![],
972 },
973 )
974 })
975 .collect(),
976 vm_storage: HashMap::new(),
977 },
978 deltas: None,
979 removed_components: Default::default(),
980 };
981
982 let snap = state_sync
983 .get_snapshots(header, Some(&components_arg))
984 .await
985 .expect("Retrieving snapshot failed");
986
987 assert_eq!(snap, exp);
988 }
989
990 fn state_snapshot_vm() -> StateRequestResponse {
991 StateRequestResponse {
992 accounts: vec![
993 ResponseAccount { address: Bytes::from("0x0badc0ffee"), ..Default::default() },
994 ResponseAccount { address: Bytes::from("0xbabe42"), ..Default::default() },
995 ],
996 pagination: PaginationResponse { page: 0, page_size: 20, total: 1 },
997 }
998 }
999
1000 fn traced_entry_point_response() -> TracedEntryPointRequestResponse {
1001 TracedEntryPointRequestResponse {
1002 traced_entry_points: HashMap::from([(
1003 "Component1".to_string(),
1004 vec![(
1005 EntryPointWithTracingParams {
1006 entry_point: EntryPoint {
1007 external_id: "entrypoint_a".to_string(),
1008 target: Bytes::from("0x0badc0ffee"),
1009 signature: "sig()".to_string(),
1010 },
1011 params: TracingParams::RPCTracer(RPCTracerParams {
1012 caller: Some(Bytes::from("0x0badc0ffee")),
1013 calldata: Bytes::from("0x0badc0ffee"),
1014 state_overrides: None,
1015 prune_addresses: None,
1016 }),
1017 },
1018 TracingResult {
1019 retriggers: HashSet::from([(
1020 Bytes::from("0x0badc0ffee"),
1021 AddressStorageLocation::new(Bytes::from("0x0badc0ffee"), 12),
1022 )]),
1023 accessed_slots: HashMap::from([(
1024 Bytes::from("0x0badc0ffee"),
1025 HashSet::from([Bytes::from("0xbadbeef0")]),
1026 )]),
1027 },
1028 )],
1029 )]),
1030 pagination: PaginationResponse::new(0, 20, 0),
1031 }
1032 }
1033
1034 #[test(tokio::test)]
1035 async fn test_get_snapshots_vm() {
1036 let header = BlockHeader::default();
1037 let mut rpc = make_mock_client();
1038
1039 let traced_ep_response = traced_entry_point_response();
1040 rpc.expect_get_snapshots()
1041 .returning(move |_request, _chunk_size, _concurrency| {
1042 let vm_storage_accounts = state_snapshot_vm();
1043 Ok(Snapshot {
1044 states: [(
1045 "Component1".to_string(),
1046 ComponentWithState {
1047 state: ResponseProtocolState {
1048 component_id: "Component1".to_string(),
1049 ..Default::default()
1050 },
1051 component: ProtocolComponent {
1052 id: "Component1".to_string(),
1053 contract_ids: vec![
1054 Bytes::from("0x0badc0ffee"),
1055 Bytes::from("0xbabe42"),
1056 ],
1057 ..Default::default()
1058 },
1059 component_tvl: None,
1060 entrypoints: traced_ep_response
1061 .traced_entry_points
1062 .get("Component1")
1063 .cloned()
1064 .unwrap_or_default(),
1065 },
1066 )]
1067 .into_iter()
1068 .collect(),
1069 vm_storage: vm_storage_accounts
1070 .accounts
1071 .into_iter()
1072 .map(|state| (state.address.clone(), state))
1073 .collect(),
1074 })
1075 });
1076
1077 rpc.expect_get_traced_entry_points()
1078 .returning(|_| Ok(traced_entry_point_response()));
1079
1080 let mut state_sync = with_mocked_clients(false, false, Some(rpc), None);
1081 let component = ProtocolComponent {
1082 id: "Component1".to_string(),
1083 contract_ids: vec![Bytes::from("0x0badc0ffee"), Bytes::from("0xbabe42")],
1084 ..Default::default()
1085 };
1086 state_sync
1087 .component_tracker
1088 .components
1089 .insert("Component1".to_string(), component.clone());
1090 let components_arg = ["Component1".to_string()];
1091 let exp = StateSyncMessage {
1092 header: header.clone(),
1093 snapshots: Snapshot {
1094 states: [(
1095 component.id.clone(),
1096 ComponentWithState {
1097 state: ResponseProtocolState {
1098 component_id: "Component1".to_string(),
1099 ..Default::default()
1100 },
1101 component: component.clone(),
1102 component_tvl: None,
1103 entrypoints: vec![(
1104 EntryPointWithTracingParams {
1105 entry_point: EntryPoint {
1106 external_id: "entrypoint_a".to_string(),
1107 target: Bytes::from("0x0badc0ffee"),
1108 signature: "sig()".to_string(),
1109 },
1110 params: TracingParams::RPCTracer(RPCTracerParams {
1111 caller: Some(Bytes::from("0x0badc0ffee")),
1112 calldata: Bytes::from("0x0badc0ffee"),
1113 state_overrides: None,
1114 prune_addresses: None,
1115 }),
1116 },
1117 TracingResult {
1118 retriggers: HashSet::from([(
1119 Bytes::from("0x0badc0ffee"),
1120 AddressStorageLocation::new(Bytes::from("0x0badc0ffee"), 12),
1121 )]),
1122 accessed_slots: HashMap::from([(
1123 Bytes::from("0x0badc0ffee"),
1124 HashSet::from([Bytes::from("0xbadbeef0")]),
1125 )]),
1126 },
1127 )],
1128 },
1129 )]
1130 .into_iter()
1131 .collect(),
1132 vm_storage: state_snapshot_vm()
1133 .accounts
1134 .into_iter()
1135 .map(|state| (state.address.clone(), state))
1136 .collect(),
1137 },
1138 deltas: None,
1139 removed_components: Default::default(),
1140 };
1141
1142 let snap = state_sync
1143 .get_snapshots(header, Some(&components_arg))
1144 .await
1145 .expect("Retrieving snapshot failed");
1146
1147 assert_eq!(snap, exp);
1148 }
1149
1150 #[test(tokio::test)]
1151 async fn test_get_snapshots_vm_with_tvl() {
1152 let header = BlockHeader::default();
1153 let mut rpc = make_mock_client();
1154 let component = ProtocolComponent {
1155 id: "Component1".to_string(),
1156 contract_ids: vec![Bytes::from("0x0badc0ffee"), Bytes::from("0xbabe42")],
1157 ..Default::default()
1158 };
1159
1160 let component_clone = component.clone();
1161 rpc.expect_get_snapshots()
1162 .returning(move |_request, _chunk_size, _concurrency| {
1163 let vm_storage_accounts = state_snapshot_vm();
1164 Ok(Snapshot {
1165 states: [(
1166 "Component1".to_string(),
1167 ComponentWithState {
1168 state: ResponseProtocolState {
1169 component_id: "Component1".to_string(),
1170 ..Default::default()
1171 },
1172 component: component_clone.clone(),
1173 component_tvl: Some(100.0),
1174 entrypoints: vec![],
1175 },
1176 )]
1177 .into_iter()
1178 .collect(),
1179 vm_storage: vm_storage_accounts
1180 .accounts
1181 .into_iter()
1182 .map(|state| (state.address.clone(), state))
1183 .collect(),
1184 })
1185 });
1186
1187 rpc.expect_get_traced_entry_points()
1188 .returning(|_| {
1189 Ok(TracedEntryPointRequestResponse {
1190 traced_entry_points: HashMap::new(),
1191 pagination: PaginationResponse::new(0, 20, 0),
1192 })
1193 });
1194
1195 let mut state_sync = with_mocked_clients(false, true, Some(rpc), None);
1196 state_sync
1197 .component_tracker
1198 .components
1199 .insert("Component1".to_string(), component.clone());
1200 let components_arg = ["Component1".to_string()];
1201 let exp = StateSyncMessage {
1202 header: header.clone(),
1203 snapshots: Snapshot {
1204 states: [(
1205 component.id.clone(),
1206 ComponentWithState {
1207 state: ResponseProtocolState {
1208 component_id: "Component1".to_string(),
1209 ..Default::default()
1210 },
1211 component: component.clone(),
1212 component_tvl: Some(100.0),
1213 entrypoints: vec![],
1214 },
1215 )]
1216 .into_iter()
1217 .collect(),
1218 vm_storage: state_snapshot_vm()
1219 .accounts
1220 .into_iter()
1221 .map(|state| (state.address.clone(), state))
1222 .collect(),
1223 },
1224 deltas: None,
1225 removed_components: Default::default(),
1226 };
1227
1228 let snap = state_sync
1229 .get_snapshots(header, Some(&components_arg))
1230 .await
1231 .expect("Retrieving snapshot failed");
1232
1233 assert_eq!(snap, exp);
1234 }
1235
1236 fn mock_clients_for_state_sync() -> (MockRPCClient, MockDeltasClient, Sender<BlockChanges>) {
1237 let mut rpc_client = make_mock_client();
1238 rpc_client
1241 .expect_get_protocol_components()
1242 .with(mockall::predicate::function(
1243 move |request_params: &ProtocolComponentsRequestBody| {
1244 if let Some(ids) = request_params.component_ids.as_ref() {
1245 ids.contains(&"Component3".to_string())
1246 } else {
1247 false
1248 }
1249 },
1250 ))
1251 .returning(|_| {
1252 Ok(ProtocolComponentRequestResponse {
1254 protocol_components: vec![
1255 ProtocolComponent { id: "Component3".to_string(), ..Default::default() },
1257 ],
1258 pagination: PaginationResponse { page: 0, page_size: 20, total: 1 },
1259 })
1260 });
1261 rpc_client
1263 .expect_get_snapshots()
1264 .withf(
1265 |request: &SnapshotParameters,
1266 _chunk_size: &Option<usize>,
1267 _concurrency: &usize| {
1268 request
1269 .components
1270 .contains_key("Component3")
1271 },
1272 )
1273 .returning(|_request, _chunk_size, _concurrency| {
1274 Ok(Snapshot {
1275 states: [(
1276 "Component3".to_string(),
1277 ComponentWithState {
1278 state: ResponseProtocolState {
1279 component_id: "Component3".to_string(),
1280 ..Default::default()
1281 },
1282 component: ProtocolComponent {
1283 id: "Component3".to_string(),
1284 ..Default::default()
1285 },
1286 component_tvl: Some(1000.0),
1287 entrypoints: vec![],
1288 },
1289 )]
1290 .into_iter()
1291 .collect(),
1292 vm_storage: HashMap::new(),
1293 })
1294 });
1295
1296 rpc_client
1298 .expect_get_protocol_components()
1299 .returning(|_| {
1300 Ok(ProtocolComponentRequestResponse {
1302 protocol_components: vec![
1303 ProtocolComponent { id: "Component1".to_string(), ..Default::default() },
1305 ProtocolComponent { id: "Component2".to_string(), ..Default::default() },
1307 ],
1309 pagination: PaginationResponse { page: 0, page_size: 20, total: 1 },
1310 })
1311 });
1312
1313 rpc_client
1314 .expect_get_snapshots()
1315 .returning(|_request, _chunk_size, _concurrency| {
1316 Ok(Snapshot {
1317 states: [
1318 (
1319 "Component1".to_string(),
1320 ComponentWithState {
1321 state: ResponseProtocolState {
1322 component_id: "Component1".to_string(),
1323 ..Default::default()
1324 },
1325 component: ProtocolComponent {
1326 id: "Component1".to_string(),
1327 ..Default::default()
1328 },
1329 component_tvl: Some(100.0),
1330 entrypoints: vec![],
1331 },
1332 ),
1333 (
1334 "Component2".to_string(),
1335 ComponentWithState {
1336 state: ResponseProtocolState {
1337 component_id: "Component2".to_string(),
1338 ..Default::default()
1339 },
1340 component: ProtocolComponent {
1341 id: "Component2".to_string(),
1342 ..Default::default()
1343 },
1344 component_tvl: Some(0.0),
1345 entrypoints: vec![],
1346 },
1347 ),
1348 ]
1349 .into_iter()
1350 .collect(),
1351 vm_storage: HashMap::new(),
1352 })
1353 });
1354
1355 rpc_client
1357 .expect_get_traced_entry_points()
1358 .returning(|_| {
1359 Ok(TracedEntryPointRequestResponse {
1360 traced_entry_points: HashMap::new(),
1361 pagination: PaginationResponse { page: 0, page_size: 100, total: 0 },
1362 })
1363 });
1364
1365 let mut deltas_client = MockDeltasClient::new();
1367 let (tx, rx) = channel(1);
1368 deltas_client
1369 .expect_subscribe()
1370 .return_once(move |_, _| {
1371 Ok((Uuid::default(), rx))
1373 });
1374
1375 deltas_client
1377 .expect_unsubscribe()
1378 .return_once(|_| Ok(()));
1379
1380 (rpc_client, deltas_client, tx)
1381 }
1382
/// Drives the synchronizer with two block-change messages and checks both emitted state sync
/// messages against fully spelled-out expectations.
///
/// The first message is expected to carry snapshots for Component1 and Component2 together
/// with the first delta. The second is expected to carry a snapshot for Component3, the second
/// delta without Component2's TVL entry, and Component2 listed under `removed_components`.
#[test(tokio::test)]
async fn test_state_sync() {
    let (rpc_client, deltas_client, tx) = mock_clients_for_state_sync();
    let deltas = [
        // Block 1: only a DCI update (new entrypoint, its tracing params and a trace result).
        BlockChanges {
            extractor: "uniswap-v2".to_string(),
            chain: Chain::Ethereum,
            block: Block {
                number: 1,
                hash: Bytes::from("0x01"),
                parent_hash: Bytes::from("0x00"),
                chain: Chain::Ethereum,
                ts: Default::default(),
            },
            revert: false,
            dci_update: DCIUpdate {
                new_entrypoints: HashMap::from([(
                    "Component1".to_string(),
                    HashSet::from([EntryPoint {
                        external_id: "entrypoint_a".to_string(),
                        target: Bytes::from("0x0badc0ffee"),
                        signature: "sig()".to_string(),
                    }]),
                )]),
                new_entrypoint_params: HashMap::from([(
                    "entrypoint_a".to_string(),
                    HashSet::from([(
                        TracingParams::RPCTracer(RPCTracerParams {
                            caller: Some(Bytes::from("0x0badc0ffee")),
                            calldata: Bytes::from("0x0badc0ffee"),
                            state_overrides: None,
                            prune_addresses: None,
                        }),
                        Some("Component1".to_string()),
                    )]),
                )]),
                trace_results: HashMap::from([(
                    "entrypoint_a".to_string(),
                    TracingResult {
                        retriggers: HashSet::from([(
                            Bytes::from("0x0badc0ffee"),
                            AddressStorageLocation::new(Bytes::from("0x0badc0ffee"), 12),
                        )]),
                        accessed_slots: HashMap::from([(
                            Bytes::from("0x0badc0ffee"),
                            HashSet::from([Bytes::from("0xbadbeef0")]),
                        )]),
                    },
                )]),
            },
            ..Default::default()
        },
        // Block 2: reports TVL values for all three components.
        BlockChanges {
            extractor: "uniswap-v2".to_string(),
            chain: Chain::Ethereum,
            block: Block {
                number: 2,
                hash: Bytes::from("0x02"),
                parent_hash: Bytes::from("0x01"),
                chain: Chain::Ethereum,
                ts: Default::default(),
            },
            revert: false,
            component_tvl: [
                ("Component1".to_string(), 100.0),
                ("Component2".to_string(), 0.0),
                ("Component3".to_string(), 1000.0),
            ]
            .into_iter()
            .collect(),
            ..Default::default()
        },
    ];
    let mut state_sync = with_mocked_clients(true, true, Some(rpc_client), Some(deltas_client));
    state_sync
        .initialize()
        .await
        .expect("Init failed");

    // Start the background task and keep the join handle and close sender separately.
    let (handle, mut rx) = state_sync.start().await;
    let (jh, close_tx) = handle.split();
    // Feed one delta at a time and wait (at most 100 ms) for the resulting message.
    tx.send(deltas[0].clone())
        .await
        .expect("deltas channel msg 0 closed!");
    let first_msg = timeout(Duration::from_millis(100), rx.recv())
        .await
        .expect("waiting for first state msg timed out!")
        .expect("state sync block sender closed!");
    tx.send(deltas[1].clone())
        .await
        .expect("deltas channel msg 1 closed!");
    let second_msg = timeout(Duration::from_millis(100), rx.recv())
        .await
        .expect("waiting for second state msg timed out!")
        .expect("state sync block sender closed!");
    // Request shutdown and make sure the task ends without panicking.
    let _ = close_tx.send(());
    jh.await
        .expect("state sync task panicked!");

    // Expected first message: snapshots of both initial components plus the untouched delta.
    let exp1 = StateSyncMessage {
        header: BlockHeader {
            number: 1,
            hash: Bytes::from("0x01"),
            parent_hash: Bytes::from("0x00"),
            revert: false,
            ..Default::default()
        },
        snapshots: Snapshot {
            states: [
                (
                    "Component1".to_string(),
                    ComponentWithState {
                        state: ResponseProtocolState {
                            component_id: "Component1".to_string(),
                            ..Default::default()
                        },
                        component: ProtocolComponent {
                            id: "Component1".to_string(),
                            ..Default::default()
                        },
                        component_tvl: Some(100.0),
                        entrypoints: vec![],
                    },
                ),
                (
                    "Component2".to_string(),
                    ComponentWithState {
                        state: ResponseProtocolState {
                            component_id: "Component2".to_string(),
                            ..Default::default()
                        },
                        component: ProtocolComponent {
                            id: "Component2".to_string(),
                            ..Default::default()
                        },
                        component_tvl: Some(0.0),
                        entrypoints: vec![],
                    },
                ),
            ]
            .into_iter()
            .collect(),
            vm_storage: HashMap::new(),
        },
        deltas: Some(deltas[0].clone()),
        removed_components: Default::default(),
    };

    // Expected second message: Component3 shows up with a snapshot, while Component2 is
    // missing from the forwarded TVL map and reported as removed.
    // NOTE(review): presumably driven by the TVL filter configured in `with_mocked_clients`
    // (Component2 reports 0.0) — confirm against that helper.
    let exp2 = StateSyncMessage {
        header: BlockHeader {
            number: 2,
            hash: Bytes::from("0x02"),
            parent_hash: Bytes::from("0x01"),
            revert: false,
            ..Default::default()
        },
        snapshots: Snapshot {
            states: [
                (
                    "Component3".to_string(),
                    ComponentWithState {
                        state: ResponseProtocolState {
                            component_id: "Component3".to_string(),
                            ..Default::default()
                        },
                        component: ProtocolComponent {
                            id: "Component3".to_string(),
                            ..Default::default()
                        },
                        component_tvl: Some(1000.0),
                        entrypoints: vec![],
                    },
                ),
            ]
            .into_iter()
            .collect(),
            vm_storage: HashMap::new(),
        },
        deltas: Some(BlockChanges {
            extractor: "uniswap-v2".to_string(),
            chain: Chain::Ethereum,
            block: Block {
                number: 2,
                hash: Bytes::from("0x02"),
                parent_hash: Bytes::from("0x01"),
                chain: Chain::Ethereum,
                ts: Default::default(),
            },
            revert: false,
            component_tvl: [
                ("Component1".to_string(), 100.0),
                ("Component3".to_string(), 1000.0),
            ]
            .into_iter()
            .collect(),
            ..Default::default()
        }),
        removed_components: [(
            "Component2".to_string(),
            ProtocolComponent { id: "Component2".to_string(), ..Default::default() },
        )]
        .into_iter()
        .collect(),
    };
    // Both received items must be `Ok` and equal to the expectations above.
    assert_eq!(first_msg.unwrap(), exp1);
    assert_eq!(second_msg.unwrap(), exp2);
}
1603
1604 #[test(tokio::test)]
1605 async fn test_state_sync_with_tvl_range() {
1606 let remove_tvl_threshold = 5.0;
1608 let add_tvl_threshold = 7.0;
1609
1610 let mut rpc_client = make_mock_client();
1611 let mut deltas_client = MockDeltasClient::new();
1612
1613 rpc_client
1614 .expect_get_protocol_components()
1615 .with(mockall::predicate::function(
1616 move |request_params: &ProtocolComponentsRequestBody| {
1617 if let Some(ids) = request_params.component_ids.as_ref() {
1618 ids.contains(&"Component3".to_string())
1619 } else {
1620 false
1621 }
1622 },
1623 ))
1624 .returning(|_| {
1625 Ok(ProtocolComponentRequestResponse {
1626 protocol_components: vec![ProtocolComponent {
1627 id: "Component3".to_string(),
1628 ..Default::default()
1629 }],
1630 pagination: PaginationResponse { page: 0, page_size: 20, total: 1 },
1631 })
1632 });
1633 rpc_client
1635 .expect_get_snapshots()
1636 .withf(
1637 |request: &SnapshotParameters,
1638 _chunk_size: &Option<usize>,
1639 _concurrency: &usize| {
1640 request
1641 .components
1642 .contains_key("Component3")
1643 },
1644 )
1645 .returning(|_request, _chunk_size, _concurrency| {
1646 Ok(Snapshot {
1647 states: [(
1648 "Component3".to_string(),
1649 ComponentWithState {
1650 state: ResponseProtocolState {
1651 component_id: "Component3".to_string(),
1652 ..Default::default()
1653 },
1654 component: ProtocolComponent {
1655 id: "Component3".to_string(),
1656 ..Default::default()
1657 },
1658 component_tvl: Some(10.0),
1659 entrypoints: vec![],
1660 },
1661 )]
1662 .into_iter()
1663 .collect(),
1664 vm_storage: HashMap::new(),
1665 })
1666 });
1667
1668 rpc_client
1670 .expect_get_protocol_components()
1671 .returning(|_| {
1672 Ok(ProtocolComponentRequestResponse {
1673 protocol_components: vec![
1674 ProtocolComponent { id: "Component1".to_string(), ..Default::default() },
1675 ProtocolComponent { id: "Component2".to_string(), ..Default::default() },
1676 ],
1677 pagination: PaginationResponse { page: 0, page_size: 20, total: 1 },
1678 })
1679 });
1680
1681 rpc_client
1683 .expect_get_snapshots()
1684 .returning(|_request, _chunk_size, _concurrency| {
1685 Ok(Snapshot {
1686 states: [
1687 (
1688 "Component1".to_string(),
1689 ComponentWithState {
1690 state: ResponseProtocolState {
1691 component_id: "Component1".to_string(),
1692 ..Default::default()
1693 },
1694 component: ProtocolComponent {
1695 id: "Component1".to_string(),
1696 ..Default::default()
1697 },
1698 component_tvl: Some(6.0),
1699 entrypoints: vec![],
1700 },
1701 ),
1702 (
1703 "Component2".to_string(),
1704 ComponentWithState {
1705 state: ResponseProtocolState {
1706 component_id: "Component2".to_string(),
1707 ..Default::default()
1708 },
1709 component: ProtocolComponent {
1710 id: "Component2".to_string(),
1711 ..Default::default()
1712 },
1713 component_tvl: Some(2.0),
1714 entrypoints: vec![],
1715 },
1716 ),
1717 ]
1718 .into_iter()
1719 .collect(),
1720 vm_storage: HashMap::new(),
1721 })
1722 });
1723
1724 rpc_client
1726 .expect_get_traced_entry_points()
1727 .returning(|_| {
1728 Ok(TracedEntryPointRequestResponse {
1729 traced_entry_points: HashMap::new(),
1730 pagination: PaginationResponse { page: 0, page_size: 100, total: 0 },
1731 })
1732 });
1733
1734 let (tx, rx) = channel(1);
1735 deltas_client
1736 .expect_subscribe()
1737 .return_once(move |_, _| Ok((Uuid::default(), rx)));
1738
1739 deltas_client
1741 .expect_unsubscribe()
1742 .return_once(|_| Ok(()));
1743
1744 let mut state_sync = ProtocolStateSynchronizer::new(
1745 ExtractorIdentity::new(Chain::Ethereum, "uniswap-v2"),
1746 true,
1747 ComponentFilter::with_tvl_range(remove_tvl_threshold, add_tvl_threshold),
1748 1,
1749 Duration::from_secs(0),
1750 true,
1751 true,
1752 true,
1753 ArcRPCClient(Arc::new(rpc_client)),
1754 ArcDeltasClient(Arc::new(deltas_client)),
1755 10_u64,
1756 );
1757 state_sync
1758 .initialize()
1759 .await
1760 .expect("Init failed");
1761
1762 let deltas = [
1764 BlockChanges {
1765 extractor: "uniswap-v2".to_string(),
1766 chain: Chain::Ethereum,
1767 block: Block {
1768 number: 1,
1769 hash: Bytes::from("0x01"),
1770 parent_hash: Bytes::from("0x00"),
1771 chain: Chain::Ethereum,
1772 ts: Default::default(),
1773 },
1774 revert: false,
1775 ..Default::default()
1776 },
1777 BlockChanges {
1778 extractor: "uniswap-v2".to_string(),
1779 chain: Chain::Ethereum,
1780 block: Block {
1781 number: 2,
1782 hash: Bytes::from("0x02"),
1783 parent_hash: Bytes::from("0x01"),
1784 chain: Chain::Ethereum,
1785 ts: Default::default(),
1786 },
1787 revert: false,
1788 component_tvl: [
1789 ("Component1".to_string(), 6.0), ("Component2".to_string(), 2.0), ("Component3".to_string(), 10.0), ]
1793 .into_iter()
1794 .collect(),
1795 ..Default::default()
1796 },
1797 ];
1798
1799 let (handle, mut rx) = state_sync.start().await;
1800 let (jh, close_tx) = handle.split();
1801
1802 tx.send(deltas[0].clone())
1804 .await
1805 .expect("deltas channel msg 0 closed!");
1806
1807 let _ = timeout(Duration::from_millis(100), rx.recv())
1809 .await
1810 .expect("waiting for first state msg timed out!")
1811 .expect("state sync block sender closed!");
1812
1813 tx.send(deltas[1].clone())
1815 .await
1816 .expect("deltas channel msg 1 closed!");
1817 let second_msg = timeout(Duration::from_millis(100), rx.recv())
1818 .await
1819 .expect("waiting for second state msg timed out!")
1820 .expect("state sync block sender closed!")
1821 .expect("no error");
1822
1823 let _ = close_tx.send(());
1824 jh.await
1825 .expect("state sync task panicked!");
1826
1827 let expected_second_msg = StateSyncMessage {
1828 header: BlockHeader {
1829 number: 2,
1830 hash: Bytes::from("0x02"),
1831 parent_hash: Bytes::from("0x01"),
1832 revert: false,
1833 ..Default::default()
1834 },
1835 snapshots: Snapshot {
1836 states: [(
1837 "Component3".to_string(),
1838 ComponentWithState {
1839 state: ResponseProtocolState {
1840 component_id: "Component3".to_string(),
1841 ..Default::default()
1842 },
1843 component: ProtocolComponent {
1844 id: "Component3".to_string(),
1845 ..Default::default()
1846 },
1847 component_tvl: Some(10.0),
1848 entrypoints: vec![], },
1850 )]
1851 .into_iter()
1852 .collect(),
1853 vm_storage: HashMap::new(),
1854 },
1855 deltas: Some(BlockChanges {
1856 extractor: "uniswap-v2".to_string(),
1857 chain: Chain::Ethereum,
1858 block: Block {
1859 number: 2,
1860 hash: Bytes::from("0x02"),
1861 parent_hash: Bytes::from("0x01"),
1862 chain: Chain::Ethereum,
1863 ts: Default::default(),
1864 },
1865 revert: false,
1866 component_tvl: [
1867 ("Component1".to_string(), 6.0), ("Component3".to_string(), 10.0), ]
1870 .into_iter()
1871 .collect(),
1872 ..Default::default()
1873 }),
1874 removed_components: [(
1875 "Component2".to_string(),
1876 ProtocolComponent { id: "Component2".to_string(), ..Default::default() },
1877 )]
1878 .into_iter()
1879 .collect(),
1880 };
1881
1882 assert_eq!(second_msg, expected_second_msg);
1883 }
1884
1885 #[test(tokio::test)]
1886 async fn test_public_close_api_functionality() {
1887 let mut rpc_client = make_mock_client();
1894 let mut deltas_client = MockDeltasClient::new();
1895
1896 rpc_client
1898 .expect_get_protocol_components()
1899 .returning(|_| {
1900 Ok(ProtocolComponentRequestResponse {
1901 protocol_components: vec![],
1902 pagination: PaginationResponse { page: 0, page_size: 20, total: 0 },
1903 })
1904 });
1905
1906 let (_tx, rx) = channel(1);
1908 deltas_client
1909 .expect_subscribe()
1910 .return_once(move |_, _| Ok((Uuid::default(), rx)));
1911
1912 deltas_client
1914 .expect_unsubscribe()
1915 .return_once(|_| Ok(()));
1916
1917 let mut state_sync = ProtocolStateSynchronizer::new(
1918 ExtractorIdentity::new(Chain::Ethereum, "test-protocol"),
1919 true,
1920 ComponentFilter::with_tvl_range(0.0, 0.0),
1921 5, Duration::from_secs(0),
1923 true,
1924 false,
1925 true,
1926 ArcRPCClient(Arc::new(rpc_client)),
1927 ArcDeltasClient(Arc::new(deltas_client)),
1928 10000_u64, );
1930
1931 state_sync
1932 .initialize()
1933 .await
1934 .expect("Init should succeed");
1935
1936 let (handle, _rx) = state_sync.start().await;
1938 let (jh, close_tx) = handle.split();
1939
1940 tokio::time::sleep(Duration::from_millis(100)).await;
1942
1943 close_tx
1945 .send(())
1946 .expect("Should be able to send close signal");
1947 jh.await.expect("Task should not panic");
1949 }
1950
1951 #[test(tokio::test)]
1952 async fn test_cleanup_runs_when_state_sync_processing_errors() {
1953 let mut rpc_client = make_mock_client();
1958 let mut deltas_client = MockDeltasClient::new();
1959
1960 rpc_client
1962 .expect_get_protocol_components()
1963 .returning(|_| {
1964 Ok(ProtocolComponentRequestResponse {
1965 protocol_components: vec![],
1966 pagination: PaginationResponse { page: 0, page_size: 20, total: 0 },
1967 })
1968 });
1969
1970 rpc_client
1972 .expect_get_protocol_states()
1973 .returning(|_| {
1974 Err(RPCError::ParseResponse("Test error during snapshot retrieval".to_string()))
1975 });
1976
1977 let (tx, rx) = channel(10);
1979 deltas_client
1980 .expect_subscribe()
1981 .return_once(move |_, _| {
1982 let delta = BlockChanges {
1984 extractor: "test".to_string(),
1985 chain: Chain::Ethereum,
1986 block: Block {
1987 hash: Bytes::from("0x0123"),
1988 number: 1,
1989 parent_hash: Bytes::from("0x0000"),
1990 chain: Chain::Ethereum,
1991 ts: chrono::DateTime::from_timestamp(1234567890, 0)
1992 .unwrap()
1993 .naive_utc(),
1994 },
1995 revert: false,
1996 new_protocol_components: [(
1998 "new_component".to_string(),
1999 ProtocolComponent {
2000 id: "new_component".to_string(),
2001 protocol_system: "test_protocol".to_string(),
2002 protocol_type_name: "test".to_string(),
2003 chain: Chain::Ethereum,
2004 tokens: vec![Bytes::from("0x0badc0ffee")],
2005 contract_ids: vec![Bytes::from("0x0badc0ffee")],
2006 static_attributes: Default::default(),
2007 creation_tx: Default::default(),
2008 created_at: Default::default(),
2009 change: Default::default(),
2010 },
2011 )]
2012 .into_iter()
2013 .collect(),
2014 component_tvl: [("new_component".to_string(), 100.0)]
2015 .into_iter()
2016 .collect(),
2017 ..Default::default()
2018 };
2019
2020 tokio::spawn(async move {
2021 let _ = tx.send(delta).await;
2022 });
2024
2025 Ok((Uuid::default(), rx))
2026 });
2027
2028 deltas_client
2030 .expect_unsubscribe()
2031 .return_once(|_| Ok(()));
2032
2033 let mut state_sync = ProtocolStateSynchronizer::new(
2034 ExtractorIdentity::new(Chain::Ethereum, "test-protocol"),
2035 true,
2036 ComponentFilter::with_tvl_range(0.0, 1000.0), 1,
2038 Duration::from_secs(0),
2039 true,
2040 false,
2041 true,
2042 ArcRPCClient(Arc::new(rpc_client)),
2043 ArcDeltasClient(Arc::new(deltas_client)),
2044 5000_u64,
2045 );
2046
2047 state_sync
2048 .initialize()
2049 .await
2050 .expect("Init should succeed");
2051
2052 state_sync.last_synced_block = Some(BlockHeader {
2054 hash: Bytes::from("0x0badc0ffee"),
2055 number: 42,
2056 parent_hash: Bytes::from("0xbadbeef0"),
2057 revert: false,
2058 timestamp: 123456789,
2059 });
2060
2061 let (mut block_tx, _block_rx) = channel(10);
2063
2064 let (_end_tx, end_rx) = oneshot::channel::<()>();
2066 let result = state_sync
2067 .state_sync(&mut block_tx, end_rx)
2068 .await;
2069 assert!(result.is_err(), "state_sync should have errored during processing");
2071
2072 }
2075
2076 #[test(tokio::test)]
2077 async fn test_close_signal_while_waiting_for_first_deltas() {
2078 let mut rpc_client = make_mock_client();
2082 let mut deltas_client = MockDeltasClient::new();
2083
2084 rpc_client
2085 .expect_get_protocol_components()
2086 .returning(|_| {
2087 Ok(ProtocolComponentRequestResponse {
2088 protocol_components: vec![],
2089 pagination: PaginationResponse { page: 0, page_size: 20, total: 0 },
2090 })
2091 });
2092
2093 let (_tx, rx) = channel(1);
2094 deltas_client
2095 .expect_subscribe()
2096 .return_once(move |_, _| Ok((Uuid::default(), rx)));
2097
2098 deltas_client
2099 .expect_unsubscribe()
2100 .return_once(|_| Ok(()));
2101
2102 let mut state_sync = ProtocolStateSynchronizer::new(
2103 ExtractorIdentity::new(Chain::Ethereum, "test-protocol"),
2104 true,
2105 ComponentFilter::with_tvl_range(0.0, 0.0),
2106 1,
2107 Duration::from_secs(0),
2108 true,
2109 true,
2110 false,
2111 ArcRPCClient(Arc::new(rpc_client)),
2112 ArcDeltasClient(Arc::new(deltas_client)),
2113 10000_u64,
2114 );
2115
2116 state_sync
2117 .initialize()
2118 .await
2119 .expect("Init should succeed");
2120
2121 let (mut block_tx, _block_rx) = channel(10);
2122 let (end_tx, end_rx) = oneshot::channel::<()>();
2123
2124 let state_sync_handle = tokio::spawn(async move {
2126 state_sync
2127 .state_sync(&mut block_tx, end_rx)
2128 .await
2129 });
2130
2131 tokio::time::sleep(Duration::from_millis(100)).await;
2133
2134 let _ = end_tx.send(());
2136
2137 let result = state_sync_handle
2139 .await
2140 .expect("Task should not panic");
2141 assert!(result.is_ok(), "state_sync should exit cleanly when closed: {result:?}");
2142
2143 println!("SUCCESS: Close signal handled correctly while waiting for first deltas");
2144 }
2145
2146 #[test(tokio::test)]
2147 async fn test_close_signal_during_main_processing_loop() {
2148 let mut rpc_client = make_mock_client();
2154 let mut deltas_client = MockDeltasClient::new();
2155
2156 rpc_client
2158 .expect_get_protocol_components()
2159 .returning(|_| {
2160 Ok(ProtocolComponentRequestResponse {
2161 protocol_components: vec![],
2162 pagination: PaginationResponse { page: 0, page_size: 20, total: 0 },
2163 })
2164 });
2165
2166 rpc_client
2168 .expect_get_protocol_states()
2169 .returning(|_| {
2170 Ok(ProtocolStateRequestResponse {
2171 states: vec![],
2172 pagination: PaginationResponse { page: 0, page_size: 20, total: 0 },
2173 })
2174 });
2175
2176 rpc_client
2177 .expect_get_component_tvl()
2178 .returning(|_| {
2179 Ok(ComponentTvlRequestResponse {
2180 tvl: HashMap::new(),
2181 pagination: PaginationResponse { page: 0, page_size: 20, total: 0 },
2182 })
2183 });
2184
2185 rpc_client
2186 .expect_get_traced_entry_points()
2187 .returning(|_| {
2188 Ok(TracedEntryPointRequestResponse {
2189 traced_entry_points: HashMap::new(),
2190 pagination: PaginationResponse::new(0, 20, 0),
2191 })
2192 });
2193
2194 let (tx, rx) = channel(10);
2196 deltas_client
2197 .expect_subscribe()
2198 .return_once(move |_, _| {
2199 let first_delta = BlockChanges {
2201 extractor: "test".to_string(),
2202 chain: Chain::Ethereum,
2203 block: Block {
2204 hash: Bytes::from("0x0123"),
2205 number: 1,
2206 parent_hash: Bytes::from("0x0000"),
2207 chain: Chain::Ethereum,
2208 ts: chrono::DateTime::from_timestamp(1234567890, 0)
2209 .unwrap()
2210 .naive_utc(),
2211 },
2212 revert: false,
2213 ..Default::default()
2214 };
2215
2216 tokio::spawn(async move {
2217 let _ = tx.send(first_delta).await;
2218 tokio::time::sleep(Duration::from_secs(30)).await;
2221 });
2222
2223 Ok((Uuid::default(), rx))
2224 });
2225
2226 deltas_client
2227 .expect_unsubscribe()
2228 .return_once(|_| Ok(()));
2229
2230 let mut state_sync = ProtocolStateSynchronizer::new(
2231 ExtractorIdentity::new(Chain::Ethereum, "test-protocol"),
2232 true,
2233 ComponentFilter::with_tvl_range(0.0, 1000.0),
2234 1,
2235 Duration::from_secs(0),
2236 true,
2237 false,
2238 true,
2239 ArcRPCClient(Arc::new(rpc_client)),
2240 ArcDeltasClient(Arc::new(deltas_client)),
2241 10000_u64,
2242 );
2243
2244 state_sync
2245 .initialize()
2246 .await
2247 .expect("Init should succeed");
2248
2249 let (mut block_tx, mut block_rx) = channel(10);
2250 let (end_tx, end_rx) = oneshot::channel::<()>();
2251
2252 let state_sync_handle = tokio::spawn(async move {
2254 state_sync
2255 .state_sync(&mut block_tx, end_rx)
2256 .await
2257 });
2258
2259 let first_snapshot = block_rx
2261 .recv()
2262 .await
2263 .expect("Should receive first snapshot")
2264 .expect("Synchronizer error");
2265 assert!(
2266 !first_snapshot
2267 .snapshots
2268 .states
2269 .is_empty() ||
2270 first_snapshot.deltas.is_some()
2271 );
2272 let _ = end_tx.send(());
2274
2275 let result = state_sync_handle
2277 .await
2278 .expect("Task should not panic");
2279 assert!(
2280 result.is_ok(),
2281 "state_sync should exit cleanly when closed after first message: {result:?}"
2282 );
2283 }
2284
2285 #[test(tokio::test)]
2286 async fn test_max_retries_exceeded_error_propagation() {
2287 let mut rpc_client = make_mock_client();
2291 let mut deltas_client = MockDeltasClient::new();
2292
2293 rpc_client
2295 .expect_get_protocol_components()
2296 .returning(|_| {
2297 Ok(ProtocolComponentRequestResponse {
2298 protocol_components: vec![],
2299 pagination: PaginationResponse { page: 0, page_size: 20, total: 0 },
2300 })
2301 });
2302
2303 deltas_client
2306 .expect_subscribe()
2307 .returning(|_, _| {
2308 Err(DeltasError::NotConnected)
2310 });
2311
2312 deltas_client
2314 .expect_unsubscribe()
2315 .returning(|_| Ok(()))
2316 .times(0..=5);
2317
2318 let mut state_sync = ProtocolStateSynchronizer::new(
2320 ExtractorIdentity::new(Chain::Ethereum, "test-protocol"),
2321 true,
2322 ComponentFilter::with_tvl_range(0.0, 1000.0),
2323 2, Duration::from_millis(10), true,
2326 false,
2327 true,
2328 ArcRPCClient(Arc::new(rpc_client)),
2329 ArcDeltasClient(Arc::new(deltas_client)),
2330 1000_u64,
2331 );
2332
2333 state_sync
2334 .initialize()
2335 .await
2336 .expect("Init should succeed");
2337
2338 let (handle, mut rx) = state_sync.start().await;
2340 let (jh, _close_tx) = handle.split();
2341
2342 let res = tokio::time::timeout(Duration::from_millis(100), rx.recv())
2343 .await
2344 .expect("responsds in time")
2345 .expect("channel open");
2346
2347 if let Err(err) = res {
2349 assert!(
2350 matches!(err, SynchronizerError::ConnectionClosed),
2351 "Expected ConnectionClosed error, got: {:?}",
2352 err
2353 );
2354 } else {
2355 panic!("Expected an error")
2356 }
2357
2358 let task_result = tokio::time::timeout(Duration::from_secs(2), jh).await;
2360 assert!(task_result.is_ok(), "Synchronizer task should complete after max retries");
2361 }
2362
2363 #[test(tokio::test)]
2364 async fn test_is_next_expected() {
2365 let mut state_sync = with_mocked_clients(true, false, None, None);
2369
2370 let incoming_header = BlockHeader {
2372 number: 100,
2373 hash: Bytes::from("0x0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"),
2374 parent_hash: Bytes::from(
2375 "0x0000000000000000000000000000000000000000000000000000000000000000",
2376 ),
2377 revert: false,
2378 timestamp: 123456789,
2379 };
2380 assert!(
2381 !state_sync.is_next_expected(&incoming_header),
2382 "Should return false when no previous block is set"
2383 );
2384
2385 let previous_header = BlockHeader {
2387 number: 99,
2388 hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000000"),
2389 parent_hash: Bytes::from(
2390 "0xabcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789",
2391 ),
2392 revert: false,
2393 timestamp: 123456788,
2394 };
2395 state_sync.last_synced_block = Some(previous_header.clone());
2396
2397 assert!(
2398 state_sync.is_next_expected(&incoming_header),
2399 "Should return true when incoming parent_hash matches previous hash"
2400 );
2401
2402 let non_matching_header = BlockHeader {
2404 number: 100,
2405 hash: Bytes::from("0x0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"),
2406 parent_hash: Bytes::from(
2407 "0x1111111111111111111111111111111111111111111111111111111111111111",
2408 ), revert: false,
2410 timestamp: 123456789,
2411 };
2412 assert!(
2413 !state_sync.is_next_expected(&non_matching_header),
2414 "Should return false when incoming parent_hash doesn't match previous hash"
2415 );
2416 }
2417
2418 #[test(tokio::test)]
2419 async fn test_synchronizer_restart_skip_snapshot_on_expected_block() {
2420 let mut rpc_client = make_mock_client();
2424 let mut deltas_client = MockDeltasClient::new();
2425
2426 rpc_client
2428 .expect_get_protocol_components()
2429 .returning(|_| {
2430 Ok(ProtocolComponentRequestResponse {
2431 protocol_components: vec![ProtocolComponent {
2432 id: "Component1".to_string(),
2433 ..Default::default()
2434 }],
2435 pagination: PaginationResponse { page: 0, page_size: 20, total: 1 },
2436 })
2437 });
2438
2439 let (tx, rx) = channel(10);
2441 deltas_client
2442 .expect_subscribe()
2443 .return_once(move |_, _| {
2444 let expected_next_delta = BlockChanges {
2445 extractor: "uniswap-v2".to_string(),
2446 chain: Chain::Ethereum,
2447 block: Block {
2448 hash: Bytes::from(
2449 "0x0000000000000000000000000000000000000000000000000000000000000002",
2450 ), number: 2,
2452 parent_hash: Bytes::from(
2453 "0x0000000000000000000000000000000000000000000000000000000000000001",
2454 ), chain: Chain::Ethereum,
2456 ts: chrono::DateTime::from_timestamp(1234567890, 0)
2457 .unwrap()
2458 .naive_utc(),
2459 },
2460 revert: false,
2461 ..Default::default()
2462 };
2463
2464 tokio::spawn(async move {
2465 let _ = tx.send(expected_next_delta).await;
2466 });
2467
2468 Ok((Uuid::default(), rx))
2469 });
2470
2471 deltas_client
2472 .expect_unsubscribe()
2473 .return_once(|_| Ok(()));
2474
2475 let mut state_sync = ProtocolStateSynchronizer::new(
2476 ExtractorIdentity::new(Chain::Ethereum, "uniswap-v2"),
2477 true,
2478 ComponentFilter::with_tvl_range(0.0, 1000.0),
2479 1,
2480 Duration::from_secs(0),
2481 true, false,
2483 true,
2484 ArcRPCClient(Arc::new(rpc_client)),
2485 ArcDeltasClient(Arc::new(deltas_client)),
2486 10000_u64,
2487 );
2488
2489 state_sync
2491 .initialize()
2492 .await
2493 .expect("Init should succeed");
2494
2495 state_sync.last_synced_block = Some(BlockHeader {
2497 number: 1,
2498 hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000001"), parent_hash: Bytes::from(
2500 "0x0000000000000000000000000000000000000000000000000000000000000000",
2501 ),
2502 revert: false,
2503 timestamp: 123456789,
2504 });
2505
2506 let (mut block_tx, mut block_rx) = channel(10);
2507 let (end_tx, end_rx) = oneshot::channel::<()>();
2508
2509 let state_sync_handle = tokio::spawn(async move {
2511 state_sync
2512 .state_sync(&mut block_tx, end_rx)
2513 .await
2514 });
2515
2516 let result_msg = timeout(Duration::from_millis(200), block_rx.recv())
2518 .await
2519 .expect("Should receive message within timeout")
2520 .expect("Channel should be open")
2521 .expect("Should not be an error");
2522
2523 let _ = end_tx.send(());
2525
2526 let _ = state_sync_handle
2528 .await
2529 .expect("Task should not panic");
2530
2531 assert!(result_msg.deltas.is_some(), "Should contain deltas");
2534 assert!(
2535 result_msg.snapshots.states.is_empty(),
2536 "Should not contain snapshots when next expected block is received"
2537 );
2538
2539 if let Some(deltas) = &result_msg.deltas {
2541 assert_eq!(deltas.block.number, 2);
2542 assert_eq!(
2543 deltas.block.hash,
2544 Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000002")
2545 );
2546 assert_eq!(
2547 deltas.block.parent_hash,
2548 Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000001")
2549 );
2550 }
2551 }
2552
2553 #[test(tokio::test)]
2554 async fn test_skip_previously_processed_messages() {
2555 let mut rpc_client = make_mock_client();
2559 let mut deltas_client = MockDeltasClient::new();
2560
2561 rpc_client
2563 .expect_get_protocol_components()
2564 .returning(|_| {
2565 Ok(ProtocolComponentRequestResponse {
2566 protocol_components: vec![ProtocolComponent {
2567 id: "Component1".to_string(),
2568 ..Default::default()
2569 }],
2570 pagination: PaginationResponse { page: 0, page_size: 20, total: 1 },
2571 })
2572 });
2573
2574 rpc_client
2576 .expect_get_protocol_states()
2577 .returning(|_| {
2578 Ok(ProtocolStateRequestResponse {
2579 states: vec![ResponseProtocolState {
2580 component_id: "Component1".to_string(),
2581 ..Default::default()
2582 }],
2583 pagination: PaginationResponse { page: 0, page_size: 20, total: 1 },
2584 })
2585 });
2586
2587 rpc_client
2588 .expect_get_component_tvl()
2589 .returning(|_| {
2590 Ok(ComponentTvlRequestResponse {
2591 tvl: [("Component1".to_string(), 100.0)]
2592 .into_iter()
2593 .collect(),
2594 pagination: PaginationResponse { page: 0, page_size: 20, total: 1 },
2595 })
2596 });
2597
2598 rpc_client
2599 .expect_get_traced_entry_points()
2600 .returning(|_| {
2601 Ok(TracedEntryPointRequestResponse {
2602 traced_entry_points: HashMap::new(),
2603 pagination: PaginationResponse::new(0, 20, 0),
2604 })
2605 });
2606
2607 let (tx, rx) = channel(10);
2609 deltas_client
2610 .expect_subscribe()
2611 .return_once(move |_, _| {
2612 let old_messages = vec![
2614 BlockChanges {
2615 extractor: "uniswap-v2".to_string(),
2616 chain: Chain::Ethereum,
2617 block: Block {
2618 hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000003"),
2619 number: 3,
2620 parent_hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000002"),
2621 chain: Chain::Ethereum,
2622 ts: chrono::DateTime::from_timestamp(1234567890, 0).unwrap().naive_utc(),
2623 },
2624 revert: false,
2625 ..Default::default()
2626 },
2627 BlockChanges {
2628 extractor: "uniswap-v2".to_string(),
2629 chain: Chain::Ethereum,
2630 block: Block {
2631 hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000004"),
2632 number: 4,
2633 parent_hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000003"),
2634 chain: Chain::Ethereum,
2635 ts: chrono::DateTime::from_timestamp(1234567891, 0).unwrap().naive_utc(),
2636 },
2637 revert: false,
2638 ..Default::default()
2639 },
2640 BlockChanges {
2641 extractor: "uniswap-v2".to_string(),
2642 chain: Chain::Ethereum,
2643 block: Block {
2644 hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000005"),
2645 number: 5,
2646 parent_hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000004"),
2647 chain: Chain::Ethereum,
2648 ts: chrono::DateTime::from_timestamp(1234567892, 0).unwrap().naive_utc(),
2649 },
2650 revert: false,
2651 ..Default::default()
2652 },
2653 BlockChanges {
2655 extractor: "uniswap-v2".to_string(),
2656 chain: Chain::Ethereum,
2657 block: Block {
2658 hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000006"),
2659 number: 6,
2660 parent_hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000005"),
2661 chain: Chain::Ethereum,
2662 ts: chrono::DateTime::from_timestamp(1234567893, 0).unwrap().naive_utc(),
2663 },
2664 revert: false,
2665 ..Default::default()
2666 },
2667 ];
2668
2669 tokio::spawn(async move {
2670 for message in old_messages {
2671 let _ = tx.send(message).await;
2672 tokio::time::sleep(Duration::from_millis(10)).await;
2673 }
2674 });
2675
2676 Ok((Uuid::default(), rx))
2677 });
2678
2679 deltas_client
2680 .expect_unsubscribe()
2681 .return_once(|_| Ok(()));
2682
2683 let mut state_sync = ProtocolStateSynchronizer::new(
2684 ExtractorIdentity::new(Chain::Ethereum, "uniswap-v2"),
2685 true,
2686 ComponentFilter::with_tvl_range(0.0, 1000.0),
2687 1,
2688 Duration::from_secs(0),
2689 true,
2690 true,
2691 true,
2692 ArcRPCClient(Arc::new(rpc_client)),
2693 ArcDeltasClient(Arc::new(deltas_client)),
2694 10000_u64,
2695 );
2696
2697 state_sync
2699 .initialize()
2700 .await
2701 .expect("Init should succeed");
2702
2703 state_sync.last_synced_block = Some(BlockHeader {
2704 number: 5,
2705 hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000005"),
2706 parent_hash: Bytes::from(
2707 "0x0000000000000000000000000000000000000000000000000000000000000004",
2708 ),
2709 revert: false,
2710 timestamp: 1234567892,
2711 });
2712
2713 let (mut block_tx, mut block_rx) = channel(10);
2714 let (end_tx, end_rx) = oneshot::channel::<()>();
2715
2716 let state_sync_handle = tokio::spawn(async move {
2718 state_sync
2719 .state_sync(&mut block_tx, end_rx)
2720 .await
2721 });
2722
2723 let result_msg = timeout(Duration::from_millis(500), block_rx.recv())
2725 .await
2726 .expect("Should receive message within timeout")
2727 .expect("Channel should be open")
2728 .expect("Should not be an error");
2729
2730 let _ = end_tx.send(());
2732
2733 let _ = state_sync_handle
2735 .await
2736 .expect("Task should not panic");
2737
2738 assert!(result_msg.deltas.is_some(), "Should contain deltas");
2740 if let Some(deltas) = &result_msg.deltas {
2741 assert_eq!(
2742 deltas.block.number, 6,
2743 "Should only process block 6, skipping earlier blocks"
2744 );
2745 assert_eq!(
2746 deltas.block.hash,
2747 Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000006")
2748 );
2749 }
2750
2751 match timeout(Duration::from_millis(50), block_rx.recv()).await {
2754 Err(_) => {
2755 }
2757 Ok(Some(Err(_))) => {
2758 }
2760 Ok(Some(Ok(_))) => {
2761 panic!("Should not receive additional messages - old blocks should be skipped");
2762 }
2763 Ok(None) => {
2764 }
2766 }
2767 }
2768}