1use std::{collections::HashMap, time::Duration};
2
3use async_trait::async_trait;
4use serde::{Deserialize, Serialize};
5use thiserror::Error;
6use tokio::{
7 select,
8 sync::{
9 mpsc::{channel, error::SendError, Receiver, Sender},
10 oneshot,
11 },
12 task::JoinHandle,
13 time::{sleep, timeout},
14};
15use tracing::{debug, error, info, instrument, trace, warn};
16use tycho_common::{
17 dto::{
18 BlockChanges, Chain, EntryPointWithTracingParams, ExtractorIdentity, ProtocolComponent,
19 ResponseAccount, ResponseProtocolState, TracingResult,
20 },
21 Bytes,
22};
23
24use crate::{
25 deltas::{DeltasClient, SubscriptionOptions},
26 feed::{
27 component_tracker::{ComponentFilter, ComponentTracker},
28 BlockHeader, HeaderLike,
29 },
30 rpc::{RPCClient, RPCError, SnapshotParameters},
31 DeltasError,
32};
33
/// Errors that can occur while synchronizing protocol state.
#[derive(Error, Debug)]
pub enum SynchronizerError {
    /// A call made through the RPC client failed. Created automatically from [`RPCError`]
    /// via `?` thanks to the `#[from]` conversion.
    #[error("RPC error: {0}")]
    RPCError(#[from] RPCError),

    /// A channel operation failed. The payload is the complete, already formatted message
    /// (see the `From<SendError<T>>` conversion in this file).
    #[error("{0}")]
    ChannelError(String),

    /// An operation did not complete in time; the payload describes what was awaited.
    #[error("Timeout error: {0}")]
    Timeout(String),

    /// The synchronizer could not be closed.
    // NOTE(review): not constructed anywhere in the visible part of this file.
    #[error("Failed to close synchronizer: {0}")]
    CloseError(String),

    /// The deltas connection or subscription failed for a reason other than the client
    /// being disconnected.
    #[error("Connection error: {0}")]
    ConnectionError(String),

    /// The deltas client reported that it is not connected. `start` treats this as fatal
    /// and does not retry.
    #[error("Connection closed")]
    ConnectionClosed,
}
60
/// Convenience alias for results whose error type is [`SynchronizerError`].
pub type SyncResult<T> = Result<T, SynchronizerError>;
62
63impl<T> From<SendError<T>> for SynchronizerError {
64 fn from(err: SendError<T>) -> Self {
65 SynchronizerError::ChannelError(format!("Failed to send message: {err}"))
66 }
67}
68
69impl From<DeltasError> for SynchronizerError {
70 fn from(err: DeltasError) -> Self {
71 match err {
72 DeltasError::NotConnected => SynchronizerError::ConnectionClosed,
73 _ => SynchronizerError::ConnectionError(err.to_string()),
74 }
75 }
76}
77
/// Keeps the state of one extractor's protocol components in sync by combining snapshots
/// fetched over RPC with the stream of deltas messages.
pub struct ProtocolStateSynchronizer<R: RPCClient, D: DeltasClient> {
    /// Extractor (chain + name) whose deltas are subscribed to and whose snapshots are requested.
    extractor_id: ExtractorIdentity,
    /// Forwarded to `SnapshotParameters::include_balances` when requesting snapshots.
    retrieve_balances: bool,
    /// Client used for traced entry point and snapshot requests.
    rpc_client: R,
    /// Client used to subscribe to and unsubscribe from deltas.
    deltas_client: D,
    /// Maximum number of synchronization attempts made by `start` before giving up.
    max_retries: u64,
    /// Pause between two synchronization attempts.
    retry_cooldown: Duration,
    /// When `false`, `get_snapshots` returns an empty message without contacting the RPC
    /// client. Also passed to `SubscriptionOptions::with_state` on subscribe.
    include_snapshots: bool,
    /// Tracks which components and contracts are currently of interest.
    component_tracker: ComponentTracker<R>,
    /// Header of the last block for which a message was sent downstream, if any.
    last_synced_block: Option<BlockHeader>,
    /// Seconds to wait for a deltas message while looking for the first usable one.
    timeout: u64,
    /// Forwarded to `SnapshotParameters::include_tvl` when requesting snapshots.
    include_tvl: bool,
}
91
/// A protocol component bundled with its state and related metadata, as stored in a
/// [`Snapshot`].
#[derive(Clone, PartialEq, Debug, Serialize, Deserialize)]
pub struct ComponentWithState {
    /// Protocol state of the component.
    pub state: ResponseProtocolState,
    /// The component the state belongs to.
    pub component: ProtocolComponent,
    /// TVL of the component, when available.
    // NOTE(review): presumably only populated when TVL was requested — confirm against the RPC client.
    pub component_tvl: Option<f64>,
    /// Entry points of the component, each paired with its tracing result.
    pub entrypoints: Vec<(EntryPointWithTracingParams, TracingResult)>,
}
99
/// Point-in-time view of component states and contract storage.
#[derive(Clone, PartialEq, Debug, Default, Serialize, Deserialize)]
pub struct Snapshot {
    /// Component states; `StateSyncMessage::merge` matches these keys against the keys of
    /// `removed_components`, and this file's tests key the map by component id.
    pub states: HashMap<String, ComponentWithState>,
    /// Contract accounts; this file's tests key the map by the account's address.
    pub vm_storage: HashMap<Bytes, ResponseAccount>,
}
105
106impl Snapshot {
107 fn extend(&mut self, other: Snapshot) {
108 self.states.extend(other.states);
109 self.vm_storage.extend(other.vm_storage);
110 }
111
112 pub fn get_states(&self) -> &HashMap<String, ComponentWithState> {
113 &self.states
114 }
115
116 pub fn get_vm_storage(&self) -> &HashMap<Bytes, ResponseAccount> {
117 &self.vm_storage
118 }
119}
120
/// One update emitted by a synchronizer for a single block.
#[derive(Clone, PartialEq, Debug, Default, Serialize, Deserialize)]
pub struct StateSyncMessage<H>
where
    H: HeaderLike,
{
    /// Header of the block this message refers to.
    pub header: H,
    /// Snapshots retrieved for this block; left at its default (empty) when none were fetched.
    pub snapshots: Snapshot,
    /// Block changes received from the deltas subscription, if any.
    pub deltas: Option<BlockChanges>,
    /// Components that stopped being tracked with this message (in this file, the result of
    /// `ComponentTracker::stop_tracking`).
    pub removed_components: HashMap<String, ProtocolComponent>,
}
137
138impl<H> StateSyncMessage<H>
139where
140 H: HeaderLike,
141{
142 pub fn merge(mut self, other: Self) -> Self {
143 self.removed_components
145 .retain(|k, _| !other.snapshots.states.contains_key(k));
146 self.snapshots
147 .states
148 .retain(|k, _| !other.removed_components.contains_key(k));
149
150 self.snapshots.extend(other.snapshots);
151 let deltas = match (self.deltas, other.deltas) {
152 (Some(l), Some(r)) => Some(l.merge(r)),
153 (None, Some(r)) => Some(r),
154 (Some(l), None) => Some(l),
155 (None, None) => None,
156 };
157 self.removed_components
158 .extend(other.removed_components);
159 Self {
160 header: other.header,
161 snapshots: self.snapshots,
162 deltas,
163 removed_components: self.removed_components,
164 }
165 }
166}
167
/// Handle to a spawned synchronizer task.
pub struct SynchronizerTaskHandle {
    /// Join handle of the spawned task.
    join_handle: JoinHandle<()>,
    /// Sender half of the close channel; the task's synchronization loop listens on the
    /// matching receiver and exits when it fires.
    close_tx: oneshot::Sender<()>,
}
176
177impl SynchronizerTaskHandle {
186 pub fn new(join_handle: JoinHandle<()>, close_tx: oneshot::Sender<()>) -> Self {
187 Self { join_handle, close_tx }
188 }
189
190 pub fn split(self) -> (JoinHandle<()>, oneshot::Sender<()>) {
196 (self.join_handle, self.close_tx)
197 }
198}
199
/// Behaviour common to state synchronizers: a one-off initialization step followed by a
/// long-running task that emits [`StateSyncMessage`]s.
#[async_trait]
pub trait StateSynchronizer: Send + Sync + 'static {
    /// Prepares the synchronizer before it is started.
    async fn initialize(&mut self) -> SyncResult<()>;

    /// Consumes the synchronizer and starts it.
    ///
    /// Returns a handle to the running task together with the receiving end of the channel
    /// on which messages (or errors) are delivered.
    async fn start(
        mut self,
    ) -> (SynchronizerTaskHandle, Receiver<SyncResult<StateSyncMessage<BlockHeader>>>);
}
209
210impl<R, D> ProtocolStateSynchronizer<R, D>
211where
212 R: RPCClient + Clone + Send + Sync + 'static,
215 D: DeltasClient + Clone + Send + Sync + 'static,
216{
217 #[allow(clippy::too_many_arguments)]
219 pub fn new(
220 extractor_id: ExtractorIdentity,
221 retrieve_balances: bool,
222 component_filter: ComponentFilter,
223 max_retries: u64,
224 retry_cooldown: Duration,
225 include_snapshots: bool,
226 include_tvl: bool,
227 rpc_client: R,
228 deltas_client: D,
229 timeout: u64,
230 ) -> Self {
231 Self {
232 extractor_id: extractor_id.clone(),
233 retrieve_balances,
234 rpc_client: rpc_client.clone(),
235 include_snapshots,
236 deltas_client,
237 component_tracker: ComponentTracker::new(
238 extractor_id.chain,
239 extractor_id.name.as_str(),
240 component_filter,
241 rpc_client,
242 ),
243 max_retries,
244 retry_cooldown,
245 last_synced_block: None,
246 timeout,
247 include_tvl,
248 }
249 }
250
251 async fn get_snapshots<'a, I: IntoIterator<Item = &'a String>>(
253 &mut self,
254 header: BlockHeader,
255 ids: Option<I>,
256 ) -> SyncResult<StateSyncMessage<BlockHeader>> {
257 if !self.include_snapshots {
258 return Ok(StateSyncMessage { header, ..Default::default() });
259 }
260
261 let component_ids: Vec<_> = match ids {
263 Some(ids) => ids.into_iter().cloned().collect(),
264 None => self
265 .component_tracker
266 .get_tracked_component_ids(),
267 };
268
269 if component_ids.is_empty() {
270 return Ok(StateSyncMessage { header, ..Default::default() });
271 }
272
273 let entrypoints_result = if self.extractor_id.chain == Chain::Ethereum {
276 let result = self
277 .rpc_client
278 .get_traced_entry_points_paginated(
279 self.extractor_id.chain,
280 &self.extractor_id.name,
281 &component_ids,
282 100,
283 4,
284 )
285 .await?;
286 self.component_tracker
287 .process_entrypoints(&result.clone().into());
288 result.traced_entry_points.clone()
289 } else {
290 HashMap::new()
291 };
292
293 let contract_ids: Vec<Bytes> = self
295 .component_tracker
296 .get_contracts_by_component(&component_ids)
297 .into_iter()
298 .collect();
299
300 let request = SnapshotParameters::new(
301 self.extractor_id.chain,
302 &self.extractor_id.name,
303 &self.component_tracker.components,
304 &contract_ids,
305 header.number,
306 )
307 .entrypoints(&entrypoints_result)
308 .include_balances(self.retrieve_balances)
309 .include_tvl(self.include_tvl);
310 let snapshot_response = self
311 .rpc_client
312 .get_snapshots(&request, 100, 4)
313 .await?;
314
315 trace!(states=?&snapshot_response.states, "Retrieved ProtocolStates");
316 trace!(contract_states=?&snapshot_response.vm_storage, "Retrieved ContractState");
317
318 Ok(StateSyncMessage {
319 header,
320 snapshots: snapshot_response,
321 deltas: None,
322 removed_components: HashMap::new(),
323 })
324 }
325
    /// Runs one synchronization session: subscribe to deltas, emit an initial message, then
    /// forward every subsequent deltas message until an error or a close signal occurs.
    ///
    /// # Arguments
    /// * `block_tx` - Channel on which the produced messages are sent.
    /// * `end_rx` - Close signal; when it resolves, the session ends with `Ok(())`.
    ///
    /// # Returns
    /// * `Ok(())` when the close signal was received.
    /// * `Err((error, Some(end_rx)))` on any failure. The close receiver is handed back so
    ///   the caller can start another session with it.
    ///
    /// Whatever the outcome after a successful subscribe, the subscription is cancelled
    /// before returning; a failure to unsubscribe is only logged.
    #[instrument(skip(self, block_tx, end_rx), fields(extractor_id = %self.extractor_id))]
    async fn state_sync(
        &mut self,
        block_tx: &mut Sender<SyncResult<StateSyncMessage<BlockHeader>>>,
        mut end_rx: oneshot::Receiver<()>,
    ) -> Result<(), (SynchronizerError, Option<oneshot::Receiver<()>>)> {
        let subscription_options = SubscriptionOptions::new().with_state(self.include_snapshots);
        let (subscription_id, mut msg_rx) = match self
            .deltas_client
            .subscribe(self.extractor_id.clone(), subscription_options)
            .await
        {
            Ok(result) => result,
            // Nothing to clean up yet: return the error together with the untouched receiver.
            Err(e) => return Err((e.into(), Some(end_rx))),
        };

        // The session body lives in its own async block so that `?` and `return` inside it
        // only leave the block, which lets the unsubscribe below run on every exit path.
        let result = async {
            info!("Waiting for deltas...");
            let mut warned = false;
            // Find the first message worth processing.
            let mut first_msg = loop {
                let msg = select! {
                    // The timeout applies to each individual receive attempt.
                    deltas_result = timeout(Duration::from_secs(self.timeout), msg_rx.recv()) => {
                        deltas_result
                            .map_err(|_| {
                                SynchronizerError::Timeout(format!(
                                    "First deltas took longer than {t}s to arrive",
                                    t = self.timeout
                                ))
                            })?
                            .ok_or_else(|| {
                                SynchronizerError::ConnectionError(
                                    "Deltas channel closed before first message".to_string(),
                                )
                            })?
                    },
                    _ = &mut end_rx => {
                        info!("Received close signal while waiting for first deltas");
                        return Ok(());
                    }
                };

                let incoming = BlockHeader::from_block(msg.get_block(), msg.is_revert());
                if let Some(current) = &self.last_synced_block {
                    // After a restart, skip blocks at or below the last synced height unless
                    // the block builds directly on the last synced one.
                    if current.number >= incoming.number && !self.is_next_expected(&incoming) {
                        // Log the skip only once per session.
                        if !warned {
                            info!(extractor=%self.extractor_id, from=incoming.number, to=current.number, "Syncing. Skipping messages");
                            warned = true;
                        }
                        continue
                    }
                }
                break msg;
            };

            self.filter_deltas(&mut first_msg);

            let block = first_msg.get_block().clone();
            info!(height = &block.number, "First deltas received");
            let header = BlockHeader::from_block(first_msg.get_block(), first_msg.is_revert());
            let deltas_msg = StateSyncMessage {
                header: BlockHeader::from_block(first_msg.get_block(), first_msg.is_revert()),
                snapshots: Default::default(),
                deltas: Some(first_msg),
                removed_components: Default::default(),
            };

            // If the first block does not directly follow the last synced block (or nothing
            // was synced yet), a snapshot of all tracked components is fetched and the first
            // deltas are merged into it. Otherwise the deltas alone are enough.
            let msg = if !self.is_next_expected(&header) {
                info!("Retrieving snapshot");
                let snapshot = self
                    .get_snapshots::<Vec<&String>>(
                        // The snapshot header is built with the revert flag set to `false`.
                        BlockHeader::from_block(&block, false),
                        None,
                    )
                    .await?
                    .merge(deltas_msg);
                let n_components = self.component_tracker.components.len();
                let n_snapshots = snapshot.snapshots.states.len();
                info!(n_components, n_snapshots, "Initial snapshot retrieved, starting delta message feed");
                snapshot
            } else {
                deltas_msg
            };
            block_tx.send(Ok(msg)).await?;
            self.last_synced_block = Some(header.clone());
            // Steady state: one outgoing message per incoming deltas message.
            loop {
                select! {
                    deltas_opt = msg_rx.recv() => {
                        if let Some(mut deltas) = deltas_opt {
                            let header = BlockHeader::from_block(deltas.get_block(), deltas.is_revert());
                            debug!(block_number=?header.number, "Received delta message");

                            let (snapshots, removed_components) = {
                                // Ask the tracker which components to start and stop
                                // tracking as a result of this message.
                                let (to_add, to_remove) = self.component_tracker.filter_updated_components(&deltas);

                                // Only components that are not tracked yet need a snapshot.
                                let requiring_snapshot: Vec<_> = to_add
                                    .iter()
                                    .filter(|id| {
                                        !self.component_tracker
                                            .components
                                            .contains_key(id.as_str())
                                    })
                                    .collect();
                                debug!(components=?requiring_snapshot, "SnapshotRequest");
                                self.component_tracker
                                    .start_tracking(requiring_snapshot.as_slice())
                                    .await?;

                                let snapshots = self
                                    .get_snapshots(header.clone(), Some(requiring_snapshot))
                                    .await?
                                    .snapshots;

                                let removed_components = if !to_remove.is_empty() {
                                    self.component_tracker.stop_tracking(&to_remove)
                                } else {
                                    Default::default()
                                };

                                (snapshots, removed_components)
                            };

                            self.component_tracker.process_entrypoints(&deltas.dci_update);

                            // Filter after the tracker was updated, so the message only keeps
                            // changes for components and contracts tracked as of this block.
                            self.filter_deltas(&mut deltas);
                            let n_changes = deltas.n_changes();

                            let next = StateSyncMessage {
                                header: header.clone(),
                                snapshots,
                                deltas: Some(deltas),
                                removed_components,
                            };
                            block_tx.send(Ok(next)).await?;
                            self.last_synced_block = Some(header.clone());

                            debug!(block_number=?header.number, n_changes, "Finished processing delta message");
                        } else {
                            return Err(SynchronizerError::ConnectionError("Deltas channel closed".to_string()));
                        }
                    },
                    _ = &mut end_rx => {
                        info!("Received close signal during state_sync");
                        return Ok(());
                    }
                }
            }
        }.await;

        // Logged at warn level for both the error and the clean-close path.
        warn!(last_synced_block = ?&self.last_synced_block, "Deltas processing ended.");
        // Best-effort cleanup: an unsubscribe failure is logged and otherwise ignored.
        let _ = self
            .deltas_client
            .unsubscribe(subscription_id)
            .await
            .map_err(|err| {
                warn!(err=?err, "Unsubscribing from deltas on cleanup failed!");
            });

        match result {
            Ok(()) => Ok(()),
            Err(e) => {
                // Hand the close receiver back so the caller can retry with it.
                Err((e, Some(end_rx)))
            }
        }
    }
518
519 fn is_next_expected(&self, incoming: &BlockHeader) -> bool {
520 if let Some(block) = self.last_synced_block.as_ref() {
521 return incoming.parent_hash == block.hash;
522 }
523 false
524 }
525 fn filter_deltas(&self, deltas: &mut BlockChanges) {
526 deltas.filter_by_component(|id| {
527 self.component_tracker
528 .components
529 .contains_key(id)
530 });
531 deltas.filter_by_contract(|id| {
532 self.component_tracker
533 .contracts
534 .contains(id)
535 });
536 }
537}
538
539#[async_trait]
540impl<R, D> StateSynchronizer for ProtocolStateSynchronizer<R, D>
541where
542 R: RPCClient + Clone + Send + Sync + 'static,
543 D: DeltasClient + Clone + Send + Sync + 'static,
544{
545 async fn initialize(&mut self) -> SyncResult<()> {
546 info!("Retrieving relevant protocol components");
547 self.component_tracker
548 .initialise_components()
549 .await?;
550 info!(
551 n_components = self.component_tracker.components.len(),
552 n_contracts = self.component_tracker.contracts.len(),
553 "Finished retrieving components",
554 );
555
556 Ok(())
557 }
558
559 async fn start(
560 mut self,
561 ) -> (SynchronizerTaskHandle, Receiver<SyncResult<StateSyncMessage<BlockHeader>>>) {
562 let (mut tx, rx) = channel(15);
563 let (end_tx, end_rx) = oneshot::channel::<()>();
564
565 let jh = tokio::spawn(async move {
566 let mut retry_count = 0;
567 let mut current_end_rx = end_rx;
568 let mut final_error = None;
569
570 while retry_count < self.max_retries {
571 info!(extractor_id=%&self.extractor_id, retry_count, "(Re)starting synchronization loop");
572
573 let res = self
574 .state_sync(&mut tx, current_end_rx)
575 .await;
576 match res {
577 Ok(()) => {
578 info!(
579 extractor_id=%&self.extractor_id,
580 retry_count,
581 "State synchronization exited cleanly"
582 );
583 return;
584 }
585 Err((e, maybe_end_rx)) => {
586 warn!(
587 extractor_id=%&self.extractor_id,
588 retry_count,
589 error=%e,
590 "State synchronization errored!"
591 );
592
593 if let Some(recovered_end_rx) = maybe_end_rx {
595 current_end_rx = recovered_end_rx;
596
597 if let SynchronizerError::ConnectionClosed = e {
598 error!(
600 "Websocket connection closed. State synchronization exiting."
601 );
602 let _ = tx.send(Err(e)).await;
603 return;
604 } else {
605 final_error = Some(e);
607 }
608 } else {
609 info!(extractor_id=%&self.extractor_id, "Received close signal, exiting.");
611 return;
612 }
613 }
614 }
615 sleep(self.retry_cooldown).await;
616 retry_count += 1;
617 }
618 if let Some(e) = final_error {
619 warn!(extractor_id=%&self.extractor_id, retry_count, error=%e, "Max retries exceeded");
620 let _ = tx.send(Err(e)).await;
621 }
622 });
623
624 let handle = SynchronizerTaskHandle::new(jh, end_tx);
625 (handle, rx)
626 }
627}
628
629#[cfg(test)]
630mod test {
631 use std::{collections::HashSet, sync::Arc};
650
651 use test_log::test;
652 use tycho_common::dto::{
653 AddressStorageLocation, Block, Chain, ComponentTvlRequestBody, ComponentTvlRequestResponse,
654 DCIUpdate, EntryPoint, PaginationResponse, ProtocolComponentRequestResponse,
655 ProtocolComponentsRequestBody, ProtocolStateRequestBody, ProtocolStateRequestResponse,
656 ProtocolSystemsRequestBody, ProtocolSystemsRequestResponse, RPCTracerParams,
657 StateRequestBody, StateRequestResponse, TokensRequestBody, TokensRequestResponse,
658 TracedEntryPointRequestBody, TracedEntryPointRequestResponse, TracingParams,
659 };
660 use uuid::Uuid;
661
662 use super::*;
663 use crate::{deltas::MockDeltasClient, rpc::MockRPCClient, DeltasError, RPCError};
664
665 struct ArcRPCClient<T>(Arc<T>);
667
668 impl<T> Clone for ArcRPCClient<T> {
670 fn clone(&self) -> Self {
671 ArcRPCClient(self.0.clone())
672 }
673 }
674
675 #[async_trait]
676 impl<T> RPCClient for ArcRPCClient<T>
677 where
678 T: RPCClient + Sync + Send + 'static,
679 {
680 async fn get_tokens(
681 &self,
682 request: &TokensRequestBody,
683 ) -> Result<TokensRequestResponse, RPCError> {
684 self.0.get_tokens(request).await
685 }
686
687 async fn get_contract_state(
688 &self,
689 request: &StateRequestBody,
690 ) -> Result<StateRequestResponse, RPCError> {
691 self.0.get_contract_state(request).await
692 }
693
694 async fn get_protocol_components(
695 &self,
696 request: &ProtocolComponentsRequestBody,
697 ) -> Result<ProtocolComponentRequestResponse, RPCError> {
698 self.0
699 .get_protocol_components(request)
700 .await
701 }
702
703 async fn get_protocol_states(
704 &self,
705 request: &ProtocolStateRequestBody,
706 ) -> Result<ProtocolStateRequestResponse, RPCError> {
707 self.0
708 .get_protocol_states(request)
709 .await
710 }
711
712 async fn get_protocol_systems(
713 &self,
714 request: &ProtocolSystemsRequestBody,
715 ) -> Result<ProtocolSystemsRequestResponse, RPCError> {
716 self.0
717 .get_protocol_systems(request)
718 .await
719 }
720
721 async fn get_component_tvl(
722 &self,
723 request: &ComponentTvlRequestBody,
724 ) -> Result<ComponentTvlRequestResponse, RPCError> {
725 self.0.get_component_tvl(request).await
726 }
727
728 async fn get_traced_entry_points(
729 &self,
730 request: &TracedEntryPointRequestBody,
731 ) -> Result<TracedEntryPointRequestResponse, RPCError> {
732 self.0
733 .get_traced_entry_points(request)
734 .await
735 }
736
737 async fn get_snapshots<'a>(
738 &self,
739 request: &SnapshotParameters<'a>,
740 chunk_size: usize,
741 concurrency: usize,
742 ) -> Result<Snapshot, RPCError> {
743 self.0
744 .get_snapshots(request, chunk_size, concurrency)
745 .await
746 }
747 }
748
749 struct ArcDeltasClient<T>(Arc<T>);
751
752 impl<T> Clone for ArcDeltasClient<T> {
754 fn clone(&self) -> Self {
755 ArcDeltasClient(self.0.clone())
756 }
757 }
758
759 #[async_trait]
760 impl<T> DeltasClient for ArcDeltasClient<T>
761 where
762 T: DeltasClient + Sync + Send + 'static,
763 {
764 async fn subscribe(
765 &self,
766 extractor_id: ExtractorIdentity,
767 options: SubscriptionOptions,
768 ) -> Result<(Uuid, Receiver<BlockChanges>), DeltasError> {
769 self.0
770 .subscribe(extractor_id, options)
771 .await
772 }
773
774 async fn unsubscribe(&self, subscription_id: Uuid) -> Result<(), DeltasError> {
775 self.0
776 .unsubscribe(subscription_id)
777 .await
778 }
779
780 async fn connect(&self) -> Result<JoinHandle<Result<(), DeltasError>>, DeltasError> {
781 self.0.connect().await
782 }
783
784 async fn close(&self) -> Result<(), DeltasError> {
785 self.0.close().await
786 }
787 }
788
789 fn with_mocked_clients(
790 native: bool,
791 include_tvl: bool,
792 rpc_client: Option<MockRPCClient>,
793 deltas_client: Option<MockDeltasClient>,
794 ) -> ProtocolStateSynchronizer<ArcRPCClient<MockRPCClient>, ArcDeltasClient<MockDeltasClient>>
795 {
796 let rpc_client = ArcRPCClient(Arc::new(rpc_client.unwrap_or_default()));
797 let deltas_client = ArcDeltasClient(Arc::new(deltas_client.unwrap_or_default()));
798
799 ProtocolStateSynchronizer::new(
800 ExtractorIdentity::new(Chain::Ethereum, "uniswap-v2"),
801 native,
802 ComponentFilter::with_tvl_range(50.0, 50.0),
803 1,
804 Duration::from_secs(0),
805 true,
806 include_tvl,
807 rpc_client,
808 deltas_client,
809 10_u64,
810 )
811 }
812
813 fn state_snapshot_native() -> ProtocolStateRequestResponse {
814 ProtocolStateRequestResponse {
815 states: vec![ResponseProtocolState {
816 component_id: "Component1".to_string(),
817 ..Default::default()
818 }],
819 pagination: PaginationResponse { page: 0, page_size: 20, total: 1 },
820 }
821 }
822
823 #[test(tokio::test)]
824 async fn test_get_snapshots_native() {
825 let header = BlockHeader::default();
826 let mut rpc = MockRPCClient::new();
827 let component = ProtocolComponent { id: "Component1".to_string(), ..Default::default() };
828
829 let component_clone = component.clone();
830 rpc.expect_get_snapshots()
831 .returning(move |_request, _chunk_size, _concurrency| {
832 Ok(Snapshot {
833 states: state_snapshot_native()
834 .states
835 .into_iter()
836 .map(|state| {
837 (
838 state.component_id.clone(),
839 ComponentWithState {
840 state,
841 component: component_clone.clone(),
842 entrypoints: vec![],
843 component_tvl: None,
844 },
845 )
846 })
847 .collect(),
848 vm_storage: HashMap::new(),
849 })
850 });
851
852 rpc.expect_get_traced_entry_points()
853 .returning(|_| {
854 Ok(TracedEntryPointRequestResponse {
855 traced_entry_points: HashMap::new(),
856 pagination: PaginationResponse::new(0, 20, 0),
857 })
858 });
859
860 let mut state_sync = with_mocked_clients(true, false, Some(rpc), None);
861 state_sync
862 .component_tracker
863 .components
864 .insert("Component1".to_string(), component.clone());
865 let components_arg = ["Component1".to_string()];
866 let exp = StateSyncMessage {
867 header: header.clone(),
868 snapshots: Snapshot {
869 states: state_snapshot_native()
870 .states
871 .into_iter()
872 .map(|state| {
873 (
874 state.component_id.clone(),
875 ComponentWithState {
876 state,
877 component: component.clone(),
878 entrypoints: vec![],
879 component_tvl: None,
880 },
881 )
882 })
883 .collect(),
884 vm_storage: HashMap::new(),
885 },
886 deltas: None,
887 removed_components: Default::default(),
888 };
889
890 let snap = state_sync
891 .get_snapshots(header, Some(&components_arg))
892 .await
893 .expect("Retrieving snapshot failed");
894
895 assert_eq!(snap, exp);
896 }
897
898 #[test(tokio::test)]
899 async fn test_get_snapshots_native_with_tvl() {
900 let header = BlockHeader::default();
901 let mut rpc = MockRPCClient::new();
902 let component = ProtocolComponent { id: "Component1".to_string(), ..Default::default() };
903
904 let component_clone = component.clone();
905 rpc.expect_get_snapshots()
906 .returning(move |_request, _chunk_size, _concurrency| {
907 Ok(Snapshot {
908 states: state_snapshot_native()
909 .states
910 .into_iter()
911 .map(|state| {
912 (
913 state.component_id.clone(),
914 ComponentWithState {
915 state,
916 component: component_clone.clone(),
917 component_tvl: Some(100.0),
918 entrypoints: vec![],
919 },
920 )
921 })
922 .collect(),
923 vm_storage: HashMap::new(),
924 })
925 });
926
927 rpc.expect_get_traced_entry_points()
928 .returning(|_| {
929 Ok(TracedEntryPointRequestResponse {
930 traced_entry_points: HashMap::new(),
931 pagination: PaginationResponse::new(0, 20, 0),
932 })
933 });
934
935 let mut state_sync = with_mocked_clients(true, true, Some(rpc), None);
936 state_sync
937 .component_tracker
938 .components
939 .insert("Component1".to_string(), component.clone());
940 let components_arg = ["Component1".to_string()];
941 let exp = StateSyncMessage {
942 header: header.clone(),
943 snapshots: Snapshot {
944 states: state_snapshot_native()
945 .states
946 .into_iter()
947 .map(|state| {
948 (
949 state.component_id.clone(),
950 ComponentWithState {
951 state,
952 component: component.clone(),
953 component_tvl: Some(100.0),
954 entrypoints: vec![],
955 },
956 )
957 })
958 .collect(),
959 vm_storage: HashMap::new(),
960 },
961 deltas: None,
962 removed_components: Default::default(),
963 };
964
965 let snap = state_sync
966 .get_snapshots(header, Some(&components_arg))
967 .await
968 .expect("Retrieving snapshot failed");
969
970 assert_eq!(snap, exp);
971 }
972
973 fn state_snapshot_vm() -> StateRequestResponse {
974 StateRequestResponse {
975 accounts: vec![
976 ResponseAccount { address: Bytes::from("0x0badc0ffee"), ..Default::default() },
977 ResponseAccount { address: Bytes::from("0xbabe42"), ..Default::default() },
978 ],
979 pagination: PaginationResponse { page: 0, page_size: 20, total: 1 },
980 }
981 }
982
983 fn traced_entry_point_response() -> TracedEntryPointRequestResponse {
984 TracedEntryPointRequestResponse {
985 traced_entry_points: HashMap::from([(
986 "Component1".to_string(),
987 vec![(
988 EntryPointWithTracingParams {
989 entry_point: EntryPoint {
990 external_id: "entrypoint_a".to_string(),
991 target: Bytes::from("0x0badc0ffee"),
992 signature: "sig()".to_string(),
993 },
994 params: TracingParams::RPCTracer(RPCTracerParams {
995 caller: Some(Bytes::from("0x0badc0ffee")),
996 calldata: Bytes::from("0x0badc0ffee"),
997 state_overrides: None,
998 prune_addresses: None,
999 }),
1000 },
1001 TracingResult {
1002 retriggers: HashSet::from([(
1003 Bytes::from("0x0badc0ffee"),
1004 AddressStorageLocation::new(Bytes::from("0x0badc0ffee"), 12),
1005 )]),
1006 accessed_slots: HashMap::from([(
1007 Bytes::from("0x0badc0ffee"),
1008 HashSet::from([Bytes::from("0xbadbeef0")]),
1009 )]),
1010 },
1011 )],
1012 )]),
1013 pagination: PaginationResponse::new(0, 20, 0),
1014 }
1015 }
1016
1017 #[test(tokio::test)]
1018 async fn test_get_snapshots_vm() {
1019 let header = BlockHeader::default();
1020 let mut rpc = MockRPCClient::new();
1021
1022 let traced_ep_response = traced_entry_point_response();
1023 rpc.expect_get_snapshots()
1024 .returning(move |_request, _chunk_size, _concurrency| {
1025 let vm_storage_accounts = state_snapshot_vm();
1026 Ok(Snapshot {
1027 states: [(
1028 "Component1".to_string(),
1029 ComponentWithState {
1030 state: ResponseProtocolState {
1031 component_id: "Component1".to_string(),
1032 ..Default::default()
1033 },
1034 component: ProtocolComponent {
1035 id: "Component1".to_string(),
1036 contract_ids: vec![
1037 Bytes::from("0x0badc0ffee"),
1038 Bytes::from("0xbabe42"),
1039 ],
1040 ..Default::default()
1041 },
1042 component_tvl: None,
1043 entrypoints: traced_ep_response
1044 .traced_entry_points
1045 .get("Component1")
1046 .cloned()
1047 .unwrap_or_default(),
1048 },
1049 )]
1050 .into_iter()
1051 .collect(),
1052 vm_storage: vm_storage_accounts
1053 .accounts
1054 .into_iter()
1055 .map(|state| (state.address.clone(), state))
1056 .collect(),
1057 })
1058 });
1059
1060 rpc.expect_get_traced_entry_points()
1061 .returning(|_| Ok(traced_entry_point_response()));
1062
1063 let mut state_sync = with_mocked_clients(false, false, Some(rpc), None);
1064 let component = ProtocolComponent {
1065 id: "Component1".to_string(),
1066 contract_ids: vec![Bytes::from("0x0badc0ffee"), Bytes::from("0xbabe42")],
1067 ..Default::default()
1068 };
1069 state_sync
1070 .component_tracker
1071 .components
1072 .insert("Component1".to_string(), component.clone());
1073 let components_arg = ["Component1".to_string()];
1074 let exp = StateSyncMessage {
1075 header: header.clone(),
1076 snapshots: Snapshot {
1077 states: [(
1078 component.id.clone(),
1079 ComponentWithState {
1080 state: ResponseProtocolState {
1081 component_id: "Component1".to_string(),
1082 ..Default::default()
1083 },
1084 component: component.clone(),
1085 component_tvl: None,
1086 entrypoints: vec![(
1087 EntryPointWithTracingParams {
1088 entry_point: EntryPoint {
1089 external_id: "entrypoint_a".to_string(),
1090 target: Bytes::from("0x0badc0ffee"),
1091 signature: "sig()".to_string(),
1092 },
1093 params: TracingParams::RPCTracer(RPCTracerParams {
1094 caller: Some(Bytes::from("0x0badc0ffee")),
1095 calldata: Bytes::from("0x0badc0ffee"),
1096 state_overrides: None,
1097 prune_addresses: None,
1098 }),
1099 },
1100 TracingResult {
1101 retriggers: HashSet::from([(
1102 Bytes::from("0x0badc0ffee"),
1103 AddressStorageLocation::new(Bytes::from("0x0badc0ffee"), 12),
1104 )]),
1105 accessed_slots: HashMap::from([(
1106 Bytes::from("0x0badc0ffee"),
1107 HashSet::from([Bytes::from("0xbadbeef0")]),
1108 )]),
1109 },
1110 )],
1111 },
1112 )]
1113 .into_iter()
1114 .collect(),
1115 vm_storage: state_snapshot_vm()
1116 .accounts
1117 .into_iter()
1118 .map(|state| (state.address.clone(), state))
1119 .collect(),
1120 },
1121 deltas: None,
1122 removed_components: Default::default(),
1123 };
1124
1125 let snap = state_sync
1126 .get_snapshots(header, Some(&components_arg))
1127 .await
1128 .expect("Retrieving snapshot failed");
1129
1130 assert_eq!(snap, exp);
1131 }
1132
1133 #[test(tokio::test)]
1134 async fn test_get_snapshots_vm_with_tvl() {
1135 let header = BlockHeader::default();
1136 let mut rpc = MockRPCClient::new();
1137 let component = ProtocolComponent {
1138 id: "Component1".to_string(),
1139 contract_ids: vec![Bytes::from("0x0badc0ffee"), Bytes::from("0xbabe42")],
1140 ..Default::default()
1141 };
1142
1143 let component_clone = component.clone();
1144 rpc.expect_get_snapshots()
1145 .returning(move |_request, _chunk_size, _concurrency| {
1146 let vm_storage_accounts = state_snapshot_vm();
1147 Ok(Snapshot {
1148 states: [(
1149 "Component1".to_string(),
1150 ComponentWithState {
1151 state: ResponseProtocolState {
1152 component_id: "Component1".to_string(),
1153 ..Default::default()
1154 },
1155 component: component_clone.clone(),
1156 component_tvl: Some(100.0),
1157 entrypoints: vec![],
1158 },
1159 )]
1160 .into_iter()
1161 .collect(),
1162 vm_storage: vm_storage_accounts
1163 .accounts
1164 .into_iter()
1165 .map(|state| (state.address.clone(), state))
1166 .collect(),
1167 })
1168 });
1169
1170 rpc.expect_get_traced_entry_points()
1171 .returning(|_| {
1172 Ok(TracedEntryPointRequestResponse {
1173 traced_entry_points: HashMap::new(),
1174 pagination: PaginationResponse::new(0, 20, 0),
1175 })
1176 });
1177
1178 let mut state_sync = with_mocked_clients(false, true, Some(rpc), None);
1179 state_sync
1180 .component_tracker
1181 .components
1182 .insert("Component1".to_string(), component.clone());
1183 let components_arg = ["Component1".to_string()];
1184 let exp = StateSyncMessage {
1185 header: header.clone(),
1186 snapshots: Snapshot {
1187 states: [(
1188 component.id.clone(),
1189 ComponentWithState {
1190 state: ResponseProtocolState {
1191 component_id: "Component1".to_string(),
1192 ..Default::default()
1193 },
1194 component: component.clone(),
1195 component_tvl: Some(100.0),
1196 entrypoints: vec![],
1197 },
1198 )]
1199 .into_iter()
1200 .collect(),
1201 vm_storage: state_snapshot_vm()
1202 .accounts
1203 .into_iter()
1204 .map(|state| (state.address.clone(), state))
1205 .collect(),
1206 },
1207 deltas: None,
1208 removed_components: Default::default(),
1209 };
1210
1211 let snap = state_sync
1212 .get_snapshots(header, Some(&components_arg))
1213 .await
1214 .expect("Retrieving snapshot failed");
1215
1216 assert_eq!(snap, exp);
1217 }
1218
1219 fn mock_clients_for_state_sync() -> (MockRPCClient, MockDeltasClient, Sender<BlockChanges>) {
1220 let mut rpc_client = MockRPCClient::new();
1221 rpc_client
1224 .expect_get_protocol_components()
1225 .with(mockall::predicate::function(
1226 move |request_params: &ProtocolComponentsRequestBody| {
1227 if let Some(ids) = request_params.component_ids.as_ref() {
1228 ids.contains(&"Component3".to_string())
1229 } else {
1230 false
1231 }
1232 },
1233 ))
1234 .returning(|_| {
1235 Ok(ProtocolComponentRequestResponse {
1237 protocol_components: vec![
1238 ProtocolComponent { id: "Component3".to_string(), ..Default::default() },
1240 ],
1241 pagination: PaginationResponse { page: 0, page_size: 20, total: 1 },
1242 })
1243 });
1244 rpc_client
1246 .expect_get_snapshots()
1247 .withf(|request: &SnapshotParameters, _chunk_size: &usize, _concurrency: &usize| {
1248 request
1249 .components
1250 .contains_key("Component3")
1251 })
1252 .returning(|_request, _chunk_size, _concurrency| {
1253 Ok(Snapshot {
1254 states: [(
1255 "Component3".to_string(),
1256 ComponentWithState {
1257 state: ResponseProtocolState {
1258 component_id: "Component3".to_string(),
1259 ..Default::default()
1260 },
1261 component: ProtocolComponent {
1262 id: "Component3".to_string(),
1263 ..Default::default()
1264 },
1265 component_tvl: Some(1000.0),
1266 entrypoints: vec![],
1267 },
1268 )]
1269 .into_iter()
1270 .collect(),
1271 vm_storage: HashMap::new(),
1272 })
1273 });
1274
1275 rpc_client
1277 .expect_get_protocol_components()
1278 .returning(|_| {
1279 Ok(ProtocolComponentRequestResponse {
1281 protocol_components: vec![
1282 ProtocolComponent { id: "Component1".to_string(), ..Default::default() },
1284 ProtocolComponent { id: "Component2".to_string(), ..Default::default() },
1286 ],
1288 pagination: PaginationResponse { page: 0, page_size: 20, total: 1 },
1289 })
1290 });
1291
1292 rpc_client
1293 .expect_get_snapshots()
1294 .returning(|_request, _chunk_size, _concurrency| {
1295 Ok(Snapshot {
1296 states: [
1297 (
1298 "Component1".to_string(),
1299 ComponentWithState {
1300 state: ResponseProtocolState {
1301 component_id: "Component1".to_string(),
1302 ..Default::default()
1303 },
1304 component: ProtocolComponent {
1305 id: "Component1".to_string(),
1306 ..Default::default()
1307 },
1308 component_tvl: Some(100.0),
1309 entrypoints: vec![],
1310 },
1311 ),
1312 (
1313 "Component2".to_string(),
1314 ComponentWithState {
1315 state: ResponseProtocolState {
1316 component_id: "Component2".to_string(),
1317 ..Default::default()
1318 },
1319 component: ProtocolComponent {
1320 id: "Component2".to_string(),
1321 ..Default::default()
1322 },
1323 component_tvl: Some(0.0),
1324 entrypoints: vec![],
1325 },
1326 ),
1327 ]
1328 .into_iter()
1329 .collect(),
1330 vm_storage: HashMap::new(),
1331 })
1332 });
1333
1334 rpc_client
1336 .expect_get_traced_entry_points()
1337 .returning(|_| {
1338 Ok(TracedEntryPointRequestResponse {
1339 traced_entry_points: HashMap::new(),
1340 pagination: PaginationResponse { page: 0, page_size: 100, total: 0 },
1341 })
1342 });
1343
1344 let mut deltas_client = MockDeltasClient::new();
1346 let (tx, rx) = channel(1);
1347 deltas_client
1348 .expect_subscribe()
1349 .return_once(move |_, _| {
1350 Ok((Uuid::default(), rx))
1352 });
1353
1354 deltas_client
1356 .expect_unsubscribe()
1357 .return_once(|_| Ok(()));
1358
1359 (rpc_client, deltas_client, tx)
1360 }
1361
1362 #[test(tokio::test)]
1369 async fn test_state_sync() {
1370 let (rpc_client, deltas_client, tx) = mock_clients_for_state_sync();
1371 let deltas = [
1372 BlockChanges {
1373 extractor: "uniswap-v2".to_string(),
1374 chain: Chain::Ethereum,
1375 block: Block {
1376 number: 1,
1377 hash: Bytes::from("0x01"),
1378 parent_hash: Bytes::from("0x00"),
1379 chain: Chain::Ethereum,
1380 ts: Default::default(),
1381 },
1382 revert: false,
1383 dci_update: DCIUpdate {
1384 new_entrypoints: HashMap::from([(
1385 "Component1".to_string(),
1386 HashSet::from([EntryPoint {
1387 external_id: "entrypoint_a".to_string(),
1388 target: Bytes::from("0x0badc0ffee"),
1389 signature: "sig()".to_string(),
1390 }]),
1391 )]),
1392 new_entrypoint_params: HashMap::from([(
1393 "entrypoint_a".to_string(),
1394 HashSet::from([(
1395 TracingParams::RPCTracer(RPCTracerParams {
1396 caller: Some(Bytes::from("0x0badc0ffee")),
1397 calldata: Bytes::from("0x0badc0ffee"),
1398 state_overrides: None,
1399 prune_addresses: None,
1400 }),
1401 Some("Component1".to_string()),
1402 )]),
1403 )]),
1404 trace_results: HashMap::from([(
1405 "entrypoint_a".to_string(),
1406 TracingResult {
1407 retriggers: HashSet::from([(
1408 Bytes::from("0x0badc0ffee"),
1409 AddressStorageLocation::new(Bytes::from("0x0badc0ffee"), 12),
1410 )]),
1411 accessed_slots: HashMap::from([(
1412 Bytes::from("0x0badc0ffee"),
1413 HashSet::from([Bytes::from("0xbadbeef0")]),
1414 )]),
1415 },
1416 )]),
1417 },
1418 ..Default::default()
1419 },
1420 BlockChanges {
1421 extractor: "uniswap-v2".to_string(),
1422 chain: Chain::Ethereum,
1423 block: Block {
1424 number: 2,
1425 hash: Bytes::from("0x02"),
1426 parent_hash: Bytes::from("0x01"),
1427 chain: Chain::Ethereum,
1428 ts: Default::default(),
1429 },
1430 revert: false,
1431 component_tvl: [
1432 ("Component1".to_string(), 100.0),
1433 ("Component2".to_string(), 0.0),
1434 ("Component3".to_string(), 1000.0),
1435 ]
1436 .into_iter()
1437 .collect(),
1438 ..Default::default()
1439 },
1440 ];
1441 let mut state_sync = with_mocked_clients(true, true, Some(rpc_client), Some(deltas_client));
1442 state_sync
1443 .initialize()
1444 .await
1445 .expect("Init failed");
1446
1447 let (handle, mut rx) = state_sync.start().await;
1449 let (jh, close_tx) = handle.split();
1450 tx.send(deltas[0].clone())
1451 .await
1452 .expect("deltas channel msg 0 closed!");
1453 let first_msg = timeout(Duration::from_millis(100), rx.recv())
1454 .await
1455 .expect("waiting for first state msg timed out!")
1456 .expect("state sync block sender closed!");
1457 tx.send(deltas[1].clone())
1458 .await
1459 .expect("deltas channel msg 1 closed!");
1460 let second_msg = timeout(Duration::from_millis(100), rx.recv())
1461 .await
1462 .expect("waiting for second state msg timed out!")
1463 .expect("state sync block sender closed!");
1464 let _ = close_tx.send(());
1465 jh.await
1466 .expect("state sync task panicked!");
1467
1468 let exp1 = StateSyncMessage {
1470 header: BlockHeader {
1471 number: 1,
1472 hash: Bytes::from("0x01"),
1473 parent_hash: Bytes::from("0x00"),
1474 revert: false,
1475 ..Default::default()
1476 },
1477 snapshots: Snapshot {
1478 states: [
1479 (
1480 "Component1".to_string(),
1481 ComponentWithState {
1482 state: ResponseProtocolState {
1483 component_id: "Component1".to_string(),
1484 ..Default::default()
1485 },
1486 component: ProtocolComponent {
1487 id: "Component1".to_string(),
1488 ..Default::default()
1489 },
1490 component_tvl: Some(100.0),
1491 entrypoints: vec![],
1492 },
1493 ),
1494 (
1495 "Component2".to_string(),
1496 ComponentWithState {
1497 state: ResponseProtocolState {
1498 component_id: "Component2".to_string(),
1499 ..Default::default()
1500 },
1501 component: ProtocolComponent {
1502 id: "Component2".to_string(),
1503 ..Default::default()
1504 },
1505 component_tvl: Some(0.0),
1506 entrypoints: vec![],
1507 },
1508 ),
1509 ]
1510 .into_iter()
1511 .collect(),
1512 vm_storage: HashMap::new(),
1513 },
1514 deltas: Some(deltas[0].clone()),
1515 removed_components: Default::default(),
1516 };
1517
1518 let exp2 = StateSyncMessage {
1519 header: BlockHeader {
1520 number: 2,
1521 hash: Bytes::from("0x02"),
1522 parent_hash: Bytes::from("0x01"),
1523 revert: false,
1524 ..Default::default()
1525 },
1526 snapshots: Snapshot {
1527 states: [
1528 (
1530 "Component3".to_string(),
1531 ComponentWithState {
1532 state: ResponseProtocolState {
1533 component_id: "Component3".to_string(),
1534 ..Default::default()
1535 },
1536 component: ProtocolComponent {
1537 id: "Component3".to_string(),
1538 ..Default::default()
1539 },
1540 component_tvl: Some(1000.0),
1541 entrypoints: vec![],
1542 },
1543 ),
1544 ]
1545 .into_iter()
1546 .collect(),
1547 vm_storage: HashMap::new(),
1548 },
1549 deltas: Some(BlockChanges {
1552 extractor: "uniswap-v2".to_string(),
1553 chain: Chain::Ethereum,
1554 block: Block {
1555 number: 2,
1556 hash: Bytes::from("0x02"),
1557 parent_hash: Bytes::from("0x01"),
1558 chain: Chain::Ethereum,
1559 ts: Default::default(),
1560 },
1561 revert: false,
1562 component_tvl: [
1563 ("Component1".to_string(), 100.0),
1565 ("Component3".to_string(), 1000.0),
1566 ]
1567 .into_iter()
1568 .collect(),
1569 ..Default::default()
1570 }),
1571 removed_components: [(
1573 "Component2".to_string(),
1574 ProtocolComponent { id: "Component2".to_string(), ..Default::default() },
1575 )]
1576 .into_iter()
1577 .collect(),
1578 };
1579 assert_eq!(first_msg.unwrap(), exp1);
1580 assert_eq!(second_msg.unwrap(), exp2);
1581 }
1582
1583 #[test(tokio::test)]
1584 async fn test_state_sync_with_tvl_range() {
1585 let remove_tvl_threshold = 5.0;
1587 let add_tvl_threshold = 7.0;
1588
1589 let mut rpc_client = MockRPCClient::new();
1590 let mut deltas_client = MockDeltasClient::new();
1591
1592 rpc_client
1593 .expect_get_protocol_components()
1594 .with(mockall::predicate::function(
1595 move |request_params: &ProtocolComponentsRequestBody| {
1596 if let Some(ids) = request_params.component_ids.as_ref() {
1597 ids.contains(&"Component3".to_string())
1598 } else {
1599 false
1600 }
1601 },
1602 ))
1603 .returning(|_| {
1604 Ok(ProtocolComponentRequestResponse {
1605 protocol_components: vec![ProtocolComponent {
1606 id: "Component3".to_string(),
1607 ..Default::default()
1608 }],
1609 pagination: PaginationResponse { page: 0, page_size: 20, total: 1 },
1610 })
1611 });
1612 rpc_client
1614 .expect_get_snapshots()
1615 .withf(|request: &SnapshotParameters, _chunk_size: &usize, _concurrency: &usize| {
1616 request
1617 .components
1618 .contains_key("Component3")
1619 })
1620 .returning(|_request, _chunk_size, _concurrency| {
1621 Ok(Snapshot {
1622 states: [(
1623 "Component3".to_string(),
1624 ComponentWithState {
1625 state: ResponseProtocolState {
1626 component_id: "Component3".to_string(),
1627 ..Default::default()
1628 },
1629 component: ProtocolComponent {
1630 id: "Component3".to_string(),
1631 ..Default::default()
1632 },
1633 component_tvl: Some(10.0),
1634 entrypoints: vec![],
1635 },
1636 )]
1637 .into_iter()
1638 .collect(),
1639 vm_storage: HashMap::new(),
1640 })
1641 });
1642
1643 rpc_client
1645 .expect_get_protocol_components()
1646 .returning(|_| {
1647 Ok(ProtocolComponentRequestResponse {
1648 protocol_components: vec![
1649 ProtocolComponent { id: "Component1".to_string(), ..Default::default() },
1650 ProtocolComponent { id: "Component2".to_string(), ..Default::default() },
1651 ],
1652 pagination: PaginationResponse { page: 0, page_size: 20, total: 1 },
1653 })
1654 });
1655
1656 rpc_client
1658 .expect_get_snapshots()
1659 .returning(|_request, _chunk_size, _concurrency| {
1660 Ok(Snapshot {
1661 states: [
1662 (
1663 "Component1".to_string(),
1664 ComponentWithState {
1665 state: ResponseProtocolState {
1666 component_id: "Component1".to_string(),
1667 ..Default::default()
1668 },
1669 component: ProtocolComponent {
1670 id: "Component1".to_string(),
1671 ..Default::default()
1672 },
1673 component_tvl: Some(6.0),
1674 entrypoints: vec![],
1675 },
1676 ),
1677 (
1678 "Component2".to_string(),
1679 ComponentWithState {
1680 state: ResponseProtocolState {
1681 component_id: "Component2".to_string(),
1682 ..Default::default()
1683 },
1684 component: ProtocolComponent {
1685 id: "Component2".to_string(),
1686 ..Default::default()
1687 },
1688 component_tvl: Some(2.0),
1689 entrypoints: vec![],
1690 },
1691 ),
1692 ]
1693 .into_iter()
1694 .collect(),
1695 vm_storage: HashMap::new(),
1696 })
1697 });
1698
1699 rpc_client
1701 .expect_get_traced_entry_points()
1702 .returning(|_| {
1703 Ok(TracedEntryPointRequestResponse {
1704 traced_entry_points: HashMap::new(),
1705 pagination: PaginationResponse { page: 0, page_size: 100, total: 0 },
1706 })
1707 });
1708
1709 let (tx, rx) = channel(1);
1710 deltas_client
1711 .expect_subscribe()
1712 .return_once(move |_, _| Ok((Uuid::default(), rx)));
1713
1714 deltas_client
1716 .expect_unsubscribe()
1717 .return_once(|_| Ok(()));
1718
1719 let mut state_sync = ProtocolStateSynchronizer::new(
1720 ExtractorIdentity::new(Chain::Ethereum, "uniswap-v2"),
1721 true,
1722 ComponentFilter::with_tvl_range(remove_tvl_threshold, add_tvl_threshold),
1723 1,
1724 Duration::from_secs(0),
1725 true,
1726 true,
1727 ArcRPCClient(Arc::new(rpc_client)),
1728 ArcDeltasClient(Arc::new(deltas_client)),
1729 10_u64,
1730 );
1731 state_sync
1732 .initialize()
1733 .await
1734 .expect("Init failed");
1735
1736 let deltas = [
1738 BlockChanges {
1739 extractor: "uniswap-v2".to_string(),
1740 chain: Chain::Ethereum,
1741 block: Block {
1742 number: 1,
1743 hash: Bytes::from("0x01"),
1744 parent_hash: Bytes::from("0x00"),
1745 chain: Chain::Ethereum,
1746 ts: Default::default(),
1747 },
1748 revert: false,
1749 ..Default::default()
1750 },
1751 BlockChanges {
1752 extractor: "uniswap-v2".to_string(),
1753 chain: Chain::Ethereum,
1754 block: Block {
1755 number: 2,
1756 hash: Bytes::from("0x02"),
1757 parent_hash: Bytes::from("0x01"),
1758 chain: Chain::Ethereum,
1759 ts: Default::default(),
1760 },
1761 revert: false,
1762 component_tvl: [
1763 ("Component1".to_string(), 6.0), ("Component2".to_string(), 2.0), ("Component3".to_string(), 10.0), ]
1767 .into_iter()
1768 .collect(),
1769 ..Default::default()
1770 },
1771 ];
1772
1773 let (handle, mut rx) = state_sync.start().await;
1774 let (jh, close_tx) = handle.split();
1775
1776 tx.send(deltas[0].clone())
1778 .await
1779 .expect("deltas channel msg 0 closed!");
1780
1781 let _ = timeout(Duration::from_millis(100), rx.recv())
1783 .await
1784 .expect("waiting for first state msg timed out!")
1785 .expect("state sync block sender closed!");
1786
1787 tx.send(deltas[1].clone())
1789 .await
1790 .expect("deltas channel msg 1 closed!");
1791 let second_msg = timeout(Duration::from_millis(100), rx.recv())
1792 .await
1793 .expect("waiting for second state msg timed out!")
1794 .expect("state sync block sender closed!")
1795 .expect("no error");
1796
1797 let _ = close_tx.send(());
1798 jh.await
1799 .expect("state sync task panicked!");
1800
1801 let expected_second_msg = StateSyncMessage {
1802 header: BlockHeader {
1803 number: 2,
1804 hash: Bytes::from("0x02"),
1805 parent_hash: Bytes::from("0x01"),
1806 revert: false,
1807 ..Default::default()
1808 },
1809 snapshots: Snapshot {
1810 states: [(
1811 "Component3".to_string(),
1812 ComponentWithState {
1813 state: ResponseProtocolState {
1814 component_id: "Component3".to_string(),
1815 ..Default::default()
1816 },
1817 component: ProtocolComponent {
1818 id: "Component3".to_string(),
1819 ..Default::default()
1820 },
1821 component_tvl: Some(10.0),
1822 entrypoints: vec![], },
1824 )]
1825 .into_iter()
1826 .collect(),
1827 vm_storage: HashMap::new(),
1828 },
1829 deltas: Some(BlockChanges {
1830 extractor: "uniswap-v2".to_string(),
1831 chain: Chain::Ethereum,
1832 block: Block {
1833 number: 2,
1834 hash: Bytes::from("0x02"),
1835 parent_hash: Bytes::from("0x01"),
1836 chain: Chain::Ethereum,
1837 ts: Default::default(),
1838 },
1839 revert: false,
1840 component_tvl: [
1841 ("Component1".to_string(), 6.0), ("Component3".to_string(), 10.0), ]
1844 .into_iter()
1845 .collect(),
1846 ..Default::default()
1847 }),
1848 removed_components: [(
1849 "Component2".to_string(),
1850 ProtocolComponent { id: "Component2".to_string(), ..Default::default() },
1851 )]
1852 .into_iter()
1853 .collect(),
1854 };
1855
1856 assert_eq!(second_msg, expected_second_msg);
1857 }
1858
1859 #[test(tokio::test)]
1860 async fn test_public_close_api_functionality() {
1861 let mut rpc_client = MockRPCClient::new();
1868 let mut deltas_client = MockDeltasClient::new();
1869
1870 rpc_client
1872 .expect_get_protocol_components()
1873 .returning(|_| {
1874 Ok(ProtocolComponentRequestResponse {
1875 protocol_components: vec![],
1876 pagination: PaginationResponse { page: 0, page_size: 20, total: 0 },
1877 })
1878 });
1879
1880 let (_tx, rx) = channel(1);
1882 deltas_client
1883 .expect_subscribe()
1884 .return_once(move |_, _| Ok((Uuid::default(), rx)));
1885
1886 deltas_client
1888 .expect_unsubscribe()
1889 .return_once(|_| Ok(()));
1890
1891 let mut state_sync = ProtocolStateSynchronizer::new(
1892 ExtractorIdentity::new(Chain::Ethereum, "test-protocol"),
1893 true,
1894 ComponentFilter::with_tvl_range(0.0, 0.0),
1895 5, Duration::from_secs(0),
1897 true,
1898 false,
1899 ArcRPCClient(Arc::new(rpc_client)),
1900 ArcDeltasClient(Arc::new(deltas_client)),
1901 10000_u64, );
1903
1904 state_sync
1905 .initialize()
1906 .await
1907 .expect("Init should succeed");
1908
1909 let (handle, _rx) = state_sync.start().await;
1911 let (jh, close_tx) = handle.split();
1912
1913 tokio::time::sleep(Duration::from_millis(100)).await;
1915
1916 close_tx
1918 .send(())
1919 .expect("Should be able to send close signal");
1920 jh.await.expect("Task should not panic");
1922 }
1923
1924 #[test(tokio::test)]
1925 async fn test_cleanup_runs_when_state_sync_processing_errors() {
1926 let mut rpc_client = MockRPCClient::new();
1931 let mut deltas_client = MockDeltasClient::new();
1932
1933 rpc_client
1935 .expect_get_protocol_components()
1936 .returning(|_| {
1937 Ok(ProtocolComponentRequestResponse {
1938 protocol_components: vec![],
1939 pagination: PaginationResponse { page: 0, page_size: 20, total: 0 },
1940 })
1941 });
1942
1943 rpc_client
1945 .expect_get_protocol_states()
1946 .returning(|_| {
1947 Err(RPCError::ParseResponse("Test error during snapshot retrieval".to_string()))
1948 });
1949
1950 let (tx, rx) = channel(10);
1952 deltas_client
1953 .expect_subscribe()
1954 .return_once(move |_, _| {
1955 let delta = BlockChanges {
1957 extractor: "test".to_string(),
1958 chain: Chain::Ethereum,
1959 block: Block {
1960 hash: Bytes::from("0x0123"),
1961 number: 1,
1962 parent_hash: Bytes::from("0x0000"),
1963 chain: Chain::Ethereum,
1964 ts: chrono::DateTime::from_timestamp(1234567890, 0)
1965 .unwrap()
1966 .naive_utc(),
1967 },
1968 revert: false,
1969 new_protocol_components: [(
1971 "new_component".to_string(),
1972 ProtocolComponent {
1973 id: "new_component".to_string(),
1974 protocol_system: "test_protocol".to_string(),
1975 protocol_type_name: "test".to_string(),
1976 chain: Chain::Ethereum,
1977 tokens: vec![Bytes::from("0x0badc0ffee")],
1978 contract_ids: vec![Bytes::from("0x0badc0ffee")],
1979 static_attributes: Default::default(),
1980 creation_tx: Default::default(),
1981 created_at: Default::default(),
1982 change: Default::default(),
1983 },
1984 )]
1985 .into_iter()
1986 .collect(),
1987 component_tvl: [("new_component".to_string(), 100.0)]
1988 .into_iter()
1989 .collect(),
1990 ..Default::default()
1991 };
1992
1993 tokio::spawn(async move {
1994 let _ = tx.send(delta).await;
1995 });
1997
1998 Ok((Uuid::default(), rx))
1999 });
2000
2001 deltas_client
2003 .expect_unsubscribe()
2004 .return_once(|_| Ok(()));
2005
2006 let mut state_sync = ProtocolStateSynchronizer::new(
2007 ExtractorIdentity::new(Chain::Ethereum, "test-protocol"),
2008 true,
2009 ComponentFilter::with_tvl_range(0.0, 1000.0), 1,
2011 Duration::from_secs(0),
2012 true,
2013 false,
2014 ArcRPCClient(Arc::new(rpc_client)),
2015 ArcDeltasClient(Arc::new(deltas_client)),
2016 5000_u64,
2017 );
2018
2019 state_sync
2020 .initialize()
2021 .await
2022 .expect("Init should succeed");
2023
2024 state_sync.last_synced_block = Some(BlockHeader {
2026 hash: Bytes::from("0x0badc0ffee"),
2027 number: 42,
2028 parent_hash: Bytes::from("0xbadbeef0"),
2029 revert: false,
2030 timestamp: 123456789,
2031 });
2032
2033 let (mut block_tx, _block_rx) = channel(10);
2035
2036 let (_end_tx, end_rx) = oneshot::channel::<()>();
2038 let result = state_sync
2039 .state_sync(&mut block_tx, end_rx)
2040 .await;
2041 assert!(result.is_err(), "state_sync should have errored during processing");
2043
2044 }
2047
2048 #[test(tokio::test)]
2049 async fn test_close_signal_while_waiting_for_first_deltas() {
2050 let mut rpc_client = MockRPCClient::new();
2054 let mut deltas_client = MockDeltasClient::new();
2055
2056 rpc_client
2057 .expect_get_protocol_components()
2058 .returning(|_| {
2059 Ok(ProtocolComponentRequestResponse {
2060 protocol_components: vec![],
2061 pagination: PaginationResponse { page: 0, page_size: 20, total: 0 },
2062 })
2063 });
2064
2065 let (_tx, rx) = channel(1);
2066 deltas_client
2067 .expect_subscribe()
2068 .return_once(move |_, _| Ok((Uuid::default(), rx)));
2069
2070 deltas_client
2071 .expect_unsubscribe()
2072 .return_once(|_| Ok(()));
2073
2074 let mut state_sync = ProtocolStateSynchronizer::new(
2075 ExtractorIdentity::new(Chain::Ethereum, "test-protocol"),
2076 true,
2077 ComponentFilter::with_tvl_range(0.0, 0.0),
2078 1,
2079 Duration::from_secs(0),
2080 true,
2081 false,
2082 ArcRPCClient(Arc::new(rpc_client)),
2083 ArcDeltasClient(Arc::new(deltas_client)),
2084 10000_u64,
2085 );
2086
2087 state_sync
2088 .initialize()
2089 .await
2090 .expect("Init should succeed");
2091
2092 let (mut block_tx, _block_rx) = channel(10);
2093 let (end_tx, end_rx) = oneshot::channel::<()>();
2094
2095 let state_sync_handle = tokio::spawn(async move {
2097 state_sync
2098 .state_sync(&mut block_tx, end_rx)
2099 .await
2100 });
2101
2102 tokio::time::sleep(Duration::from_millis(100)).await;
2104
2105 let _ = end_tx.send(());
2107
2108 let result = state_sync_handle
2110 .await
2111 .expect("Task should not panic");
2112 assert!(result.is_ok(), "state_sync should exit cleanly when closed: {result:?}");
2113
2114 println!("SUCCESS: Close signal handled correctly while waiting for first deltas");
2115 }
2116
2117 #[test(tokio::test)]
2118 async fn test_close_signal_during_main_processing_loop() {
2119 let mut rpc_client = MockRPCClient::new();
2125 let mut deltas_client = MockDeltasClient::new();
2126
2127 rpc_client
2129 .expect_get_protocol_components()
2130 .returning(|_| {
2131 Ok(ProtocolComponentRequestResponse {
2132 protocol_components: vec![],
2133 pagination: PaginationResponse { page: 0, page_size: 20, total: 0 },
2134 })
2135 });
2136
2137 rpc_client
2139 .expect_get_protocol_states()
2140 .returning(|_| {
2141 Ok(ProtocolStateRequestResponse {
2142 states: vec![],
2143 pagination: PaginationResponse { page: 0, page_size: 20, total: 0 },
2144 })
2145 });
2146
2147 rpc_client
2148 .expect_get_component_tvl()
2149 .returning(|_| {
2150 Ok(ComponentTvlRequestResponse {
2151 tvl: HashMap::new(),
2152 pagination: PaginationResponse { page: 0, page_size: 20, total: 0 },
2153 })
2154 });
2155
2156 rpc_client
2157 .expect_get_traced_entry_points()
2158 .returning(|_| {
2159 Ok(TracedEntryPointRequestResponse {
2160 traced_entry_points: HashMap::new(),
2161 pagination: PaginationResponse::new(0, 20, 0),
2162 })
2163 });
2164
2165 let (tx, rx) = channel(10);
2167 deltas_client
2168 .expect_subscribe()
2169 .return_once(move |_, _| {
2170 let first_delta = BlockChanges {
2172 extractor: "test".to_string(),
2173 chain: Chain::Ethereum,
2174 block: Block {
2175 hash: Bytes::from("0x0123"),
2176 number: 1,
2177 parent_hash: Bytes::from("0x0000"),
2178 chain: Chain::Ethereum,
2179 ts: chrono::DateTime::from_timestamp(1234567890, 0)
2180 .unwrap()
2181 .naive_utc(),
2182 },
2183 revert: false,
2184 ..Default::default()
2185 };
2186
2187 tokio::spawn(async move {
2188 let _ = tx.send(first_delta).await;
2189 tokio::time::sleep(Duration::from_secs(30)).await;
2192 });
2193
2194 Ok((Uuid::default(), rx))
2195 });
2196
2197 deltas_client
2198 .expect_unsubscribe()
2199 .return_once(|_| Ok(()));
2200
2201 let mut state_sync = ProtocolStateSynchronizer::new(
2202 ExtractorIdentity::new(Chain::Ethereum, "test-protocol"),
2203 true,
2204 ComponentFilter::with_tvl_range(0.0, 1000.0),
2205 1,
2206 Duration::from_secs(0),
2207 true,
2208 false,
2209 ArcRPCClient(Arc::new(rpc_client)),
2210 ArcDeltasClient(Arc::new(deltas_client)),
2211 10000_u64,
2212 );
2213
2214 state_sync
2215 .initialize()
2216 .await
2217 .expect("Init should succeed");
2218
2219 let (mut block_tx, mut block_rx) = channel(10);
2220 let (end_tx, end_rx) = oneshot::channel::<()>();
2221
2222 let state_sync_handle = tokio::spawn(async move {
2224 state_sync
2225 .state_sync(&mut block_tx, end_rx)
2226 .await
2227 });
2228
2229 let first_snapshot = block_rx
2231 .recv()
2232 .await
2233 .expect("Should receive first snapshot")
2234 .expect("Synchronizer error");
2235 assert!(
2236 !first_snapshot
2237 .snapshots
2238 .states
2239 .is_empty() ||
2240 first_snapshot.deltas.is_some()
2241 );
2242 let _ = end_tx.send(());
2244
2245 let result = state_sync_handle
2247 .await
2248 .expect("Task should not panic");
2249 assert!(
2250 result.is_ok(),
2251 "state_sync should exit cleanly when closed after first message: {result:?}"
2252 );
2253 }
2254
2255 #[test(tokio::test)]
2256 async fn test_max_retries_exceeded_error_propagation() {
2257 let mut rpc_client = MockRPCClient::new();
2261 let mut deltas_client = MockDeltasClient::new();
2262
2263 rpc_client
2265 .expect_get_protocol_components()
2266 .returning(|_| {
2267 Ok(ProtocolComponentRequestResponse {
2268 protocol_components: vec![],
2269 pagination: PaginationResponse { page: 0, page_size: 20, total: 0 },
2270 })
2271 });
2272
2273 deltas_client
2276 .expect_subscribe()
2277 .returning(|_, _| {
2278 Err(DeltasError::NotConnected)
2280 });
2281
2282 deltas_client
2284 .expect_unsubscribe()
2285 .returning(|_| Ok(()))
2286 .times(0..=5);
2287
2288 let mut state_sync = ProtocolStateSynchronizer::new(
2290 ExtractorIdentity::new(Chain::Ethereum, "test-protocol"),
2291 true,
2292 ComponentFilter::with_tvl_range(0.0, 1000.0),
2293 2, Duration::from_millis(10), true,
2296 false,
2297 ArcRPCClient(Arc::new(rpc_client)),
2298 ArcDeltasClient(Arc::new(deltas_client)),
2299 1000_u64,
2300 );
2301
2302 state_sync
2303 .initialize()
2304 .await
2305 .expect("Init should succeed");
2306
2307 let (handle, mut rx) = state_sync.start().await;
2309 let (jh, _close_tx) = handle.split();
2310
2311 let res = tokio::time::timeout(Duration::from_millis(100), rx.recv())
2312 .await
2313 .expect("responsds in time")
2314 .expect("channel open");
2315
2316 if let Err(err) = res {
2318 assert!(
2319 matches!(err, SynchronizerError::ConnectionClosed),
2320 "Expected ConnectionClosed error, got: {:?}",
2321 err
2322 );
2323 } else {
2324 panic!("Expected an error")
2325 }
2326
2327 let task_result = tokio::time::timeout(Duration::from_secs(2), jh).await;
2329 assert!(task_result.is_ok(), "Synchronizer task should complete after max retries");
2330 }
2331
2332 #[test(tokio::test)]
2333 async fn test_is_next_expected() {
2334 let mut state_sync = with_mocked_clients(true, false, None, None);
2338
2339 let incoming_header = BlockHeader {
2341 number: 100,
2342 hash: Bytes::from("0x0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"),
2343 parent_hash: Bytes::from(
2344 "0x0000000000000000000000000000000000000000000000000000000000000000",
2345 ),
2346 revert: false,
2347 timestamp: 123456789,
2348 };
2349 assert!(
2350 !state_sync.is_next_expected(&incoming_header),
2351 "Should return false when no previous block is set"
2352 );
2353
2354 let previous_header = BlockHeader {
2356 number: 99,
2357 hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000000"),
2358 parent_hash: Bytes::from(
2359 "0xabcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789",
2360 ),
2361 revert: false,
2362 timestamp: 123456788,
2363 };
2364 state_sync.last_synced_block = Some(previous_header.clone());
2365
2366 assert!(
2367 state_sync.is_next_expected(&incoming_header),
2368 "Should return true when incoming parent_hash matches previous hash"
2369 );
2370
2371 let non_matching_header = BlockHeader {
2373 number: 100,
2374 hash: Bytes::from("0x0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"),
2375 parent_hash: Bytes::from(
2376 "0x1111111111111111111111111111111111111111111111111111111111111111",
2377 ), revert: false,
2379 timestamp: 123456789,
2380 };
2381 assert!(
2382 !state_sync.is_next_expected(&non_matching_header),
2383 "Should return false when incoming parent_hash doesn't match previous hash"
2384 );
2385 }
2386
2387 #[test(tokio::test)]
2388 async fn test_synchronizer_restart_skip_snapshot_on_expected_block() {
2389 let mut rpc_client = MockRPCClient::new();
2393 let mut deltas_client = MockDeltasClient::new();
2394
2395 rpc_client
2397 .expect_get_protocol_components()
2398 .returning(|_| {
2399 Ok(ProtocolComponentRequestResponse {
2400 protocol_components: vec![ProtocolComponent {
2401 id: "Component1".to_string(),
2402 ..Default::default()
2403 }],
2404 pagination: PaginationResponse { page: 0, page_size: 20, total: 1 },
2405 })
2406 });
2407
2408 let (tx, rx) = channel(10);
2410 deltas_client
2411 .expect_subscribe()
2412 .return_once(move |_, _| {
2413 let expected_next_delta = BlockChanges {
2414 extractor: "uniswap-v2".to_string(),
2415 chain: Chain::Ethereum,
2416 block: Block {
2417 hash: Bytes::from(
2418 "0x0000000000000000000000000000000000000000000000000000000000000002",
2419 ), number: 2,
2421 parent_hash: Bytes::from(
2422 "0x0000000000000000000000000000000000000000000000000000000000000001",
2423 ), chain: Chain::Ethereum,
2425 ts: chrono::DateTime::from_timestamp(1234567890, 0)
2426 .unwrap()
2427 .naive_utc(),
2428 },
2429 revert: false,
2430 ..Default::default()
2431 };
2432
2433 tokio::spawn(async move {
2434 let _ = tx.send(expected_next_delta).await;
2435 });
2436
2437 Ok((Uuid::default(), rx))
2438 });
2439
2440 deltas_client
2441 .expect_unsubscribe()
2442 .return_once(|_| Ok(()));
2443
2444 let mut state_sync = ProtocolStateSynchronizer::new(
2445 ExtractorIdentity::new(Chain::Ethereum, "uniswap-v2"),
2446 true,
2447 ComponentFilter::with_tvl_range(0.0, 1000.0),
2448 1,
2449 Duration::from_secs(0),
2450 true, false,
2452 ArcRPCClient(Arc::new(rpc_client)),
2453 ArcDeltasClient(Arc::new(deltas_client)),
2454 10000_u64,
2455 );
2456
2457 state_sync
2459 .initialize()
2460 .await
2461 .expect("Init should succeed");
2462
2463 state_sync.last_synced_block = Some(BlockHeader {
2465 number: 1,
2466 hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000001"), parent_hash: Bytes::from(
2468 "0x0000000000000000000000000000000000000000000000000000000000000000",
2469 ),
2470 revert: false,
2471 timestamp: 123456789,
2472 });
2473
2474 let (mut block_tx, mut block_rx) = channel(10);
2475 let (end_tx, end_rx) = oneshot::channel::<()>();
2476
2477 let state_sync_handle = tokio::spawn(async move {
2479 state_sync
2480 .state_sync(&mut block_tx, end_rx)
2481 .await
2482 });
2483
2484 let result_msg = timeout(Duration::from_millis(200), block_rx.recv())
2486 .await
2487 .expect("Should receive message within timeout")
2488 .expect("Channel should be open")
2489 .expect("Should not be an error");
2490
2491 let _ = end_tx.send(());
2493
2494 let _ = state_sync_handle
2496 .await
2497 .expect("Task should not panic");
2498
2499 assert!(result_msg.deltas.is_some(), "Should contain deltas");
2502 assert!(
2503 result_msg.snapshots.states.is_empty(),
2504 "Should not contain snapshots when next expected block is received"
2505 );
2506
2507 if let Some(deltas) = &result_msg.deltas {
2509 assert_eq!(deltas.block.number, 2);
2510 assert_eq!(
2511 deltas.block.hash,
2512 Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000002")
2513 );
2514 assert_eq!(
2515 deltas.block.parent_hash,
2516 Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000001")
2517 );
2518 }
2519 }
2520
2521 #[test(tokio::test)]
2522 async fn test_skip_previously_processed_messages() {
2523 let mut rpc_client = MockRPCClient::new();
2527 let mut deltas_client = MockDeltasClient::new();
2528
2529 rpc_client
2531 .expect_get_protocol_components()
2532 .returning(|_| {
2533 Ok(ProtocolComponentRequestResponse {
2534 protocol_components: vec![ProtocolComponent {
2535 id: "Component1".to_string(),
2536 ..Default::default()
2537 }],
2538 pagination: PaginationResponse { page: 0, page_size: 20, total: 1 },
2539 })
2540 });
2541
2542 rpc_client
2544 .expect_get_protocol_states()
2545 .returning(|_| {
2546 Ok(ProtocolStateRequestResponse {
2547 states: vec![ResponseProtocolState {
2548 component_id: "Component1".to_string(),
2549 ..Default::default()
2550 }],
2551 pagination: PaginationResponse { page: 0, page_size: 20, total: 1 },
2552 })
2553 });
2554
2555 rpc_client
2556 .expect_get_component_tvl()
2557 .returning(|_| {
2558 Ok(ComponentTvlRequestResponse {
2559 tvl: [("Component1".to_string(), 100.0)]
2560 .into_iter()
2561 .collect(),
2562 pagination: PaginationResponse { page: 0, page_size: 20, total: 1 },
2563 })
2564 });
2565
2566 rpc_client
2567 .expect_get_traced_entry_points()
2568 .returning(|_| {
2569 Ok(TracedEntryPointRequestResponse {
2570 traced_entry_points: HashMap::new(),
2571 pagination: PaginationResponse::new(0, 20, 0),
2572 })
2573 });
2574
2575 let (tx, rx) = channel(10);
2577 deltas_client
2578 .expect_subscribe()
2579 .return_once(move |_, _| {
2580 let old_messages = vec![
2582 BlockChanges {
2583 extractor: "uniswap-v2".to_string(),
2584 chain: Chain::Ethereum,
2585 block: Block {
2586 hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000003"),
2587 number: 3,
2588 parent_hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000002"),
2589 chain: Chain::Ethereum,
2590 ts: chrono::DateTime::from_timestamp(1234567890, 0).unwrap().naive_utc(),
2591 },
2592 revert: false,
2593 ..Default::default()
2594 },
2595 BlockChanges {
2596 extractor: "uniswap-v2".to_string(),
2597 chain: Chain::Ethereum,
2598 block: Block {
2599 hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000004"),
2600 number: 4,
2601 parent_hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000003"),
2602 chain: Chain::Ethereum,
2603 ts: chrono::DateTime::from_timestamp(1234567891, 0).unwrap().naive_utc(),
2604 },
2605 revert: false,
2606 ..Default::default()
2607 },
2608 BlockChanges {
2609 extractor: "uniswap-v2".to_string(),
2610 chain: Chain::Ethereum,
2611 block: Block {
2612 hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000005"),
2613 number: 5,
2614 parent_hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000004"),
2615 chain: Chain::Ethereum,
2616 ts: chrono::DateTime::from_timestamp(1234567892, 0).unwrap().naive_utc(),
2617 },
2618 revert: false,
2619 ..Default::default()
2620 },
2621 BlockChanges {
2623 extractor: "uniswap-v2".to_string(),
2624 chain: Chain::Ethereum,
2625 block: Block {
2626 hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000006"),
2627 number: 6,
2628 parent_hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000005"),
2629 chain: Chain::Ethereum,
2630 ts: chrono::DateTime::from_timestamp(1234567893, 0).unwrap().naive_utc(),
2631 },
2632 revert: false,
2633 ..Default::default()
2634 },
2635 ];
2636
2637 tokio::spawn(async move {
2638 for message in old_messages {
2639 let _ = tx.send(message).await;
2640 tokio::time::sleep(Duration::from_millis(10)).await;
2641 }
2642 });
2643
2644 Ok((Uuid::default(), rx))
2645 });
2646
2647 deltas_client
2648 .expect_unsubscribe()
2649 .return_once(|_| Ok(()));
2650
2651 let mut state_sync = ProtocolStateSynchronizer::new(
2652 ExtractorIdentity::new(Chain::Ethereum, "uniswap-v2"),
2653 true,
2654 ComponentFilter::with_tvl_range(0.0, 1000.0),
2655 1,
2656 Duration::from_secs(0),
2657 true,
2658 true,
2659 ArcRPCClient(Arc::new(rpc_client)),
2660 ArcDeltasClient(Arc::new(deltas_client)),
2661 10000_u64,
2662 );
2663
2664 state_sync
2666 .initialize()
2667 .await
2668 .expect("Init should succeed");
2669
2670 state_sync.last_synced_block = Some(BlockHeader {
2671 number: 5,
2672 hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000005"),
2673 parent_hash: Bytes::from(
2674 "0x0000000000000000000000000000000000000000000000000000000000000004",
2675 ),
2676 revert: false,
2677 timestamp: 1234567892,
2678 });
2679
2680 let (mut block_tx, mut block_rx) = channel(10);
2681 let (end_tx, end_rx) = oneshot::channel::<()>();
2682
2683 let state_sync_handle = tokio::spawn(async move {
2685 state_sync
2686 .state_sync(&mut block_tx, end_rx)
2687 .await
2688 });
2689
2690 let result_msg = timeout(Duration::from_millis(500), block_rx.recv())
2692 .await
2693 .expect("Should receive message within timeout")
2694 .expect("Channel should be open")
2695 .expect("Should not be an error");
2696
2697 let _ = end_tx.send(());
2699
2700 let _ = state_sync_handle
2702 .await
2703 .expect("Task should not panic");
2704
2705 assert!(result_msg.deltas.is_some(), "Should contain deltas");
2707 if let Some(deltas) = &result_msg.deltas {
2708 assert_eq!(
2709 deltas.block.number, 6,
2710 "Should only process block 6, skipping earlier blocks"
2711 );
2712 assert_eq!(
2713 deltas.block.hash,
2714 Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000006")
2715 );
2716 }
2717
2718 match timeout(Duration::from_millis(50), block_rx.recv()).await {
2721 Err(_) => {
2722 }
2724 Ok(Some(Err(_))) => {
2725 }
2727 Ok(Some(Ok(_))) => {
2728 panic!("Should not receive additional messages - old blocks should be skipped");
2729 }
2730 Ok(None) => {
2731 }
2733 }
2734 }
2735}