1use std::{collections::HashMap, time::Duration};
2
3use async_trait::async_trait;
4use serde::{Deserialize, Serialize};
5use thiserror::Error;
6use tokio::{
7 select,
8 sync::{
9 mpsc::{channel, error::SendError, Receiver, Sender},
10 oneshot,
11 },
12 task::JoinHandle,
13 time::{sleep, timeout},
14};
15use tracing::{debug, error, info, instrument, trace, warn};
16use tycho_common::{
17 dto::{
18 BlockChanges, EntryPointWithTracingParams, ExtractorIdentity, ProtocolComponent,
19 ResponseAccount, ResponseProtocolState, TracingResult,
20 },
21 Bytes,
22};
23
24use crate::{
25 deltas::{DeltasClient, SubscriptionOptions},
26 feed::{
27 component_tracker::{ComponentFilter, ComponentTracker},
28 BlockHeader, HeaderLike,
29 },
30 rpc::{RPCClient, RPCError, SnapshotParameters, RPC_CLIENT_CONCURRENCY},
31 DeltasError,
32};
33
/// Errors surfaced by the state synchronizer.
#[derive(Error, Debug)]
pub enum SynchronizerError {
    /// A call through the RPC client failed; wraps the underlying [`RPCError`].
    #[error("RPC error: {0}")]
    RPCError(#[from] RPCError),

    /// A message could not be pushed through an internal channel.
    ///
    /// Produced by the `From<SendError<T>>` conversion in this file.
    #[error("{0}")]
    ChannelError(String),

    /// An awaited event (e.g. the first deltas message) did not arrive in time.
    #[error("Timeout error: {0}")]
    Timeout(String),

    /// Closing the synchronizer failed.
    ///
    /// NOTE(review): not constructed in the part of the file visible here.
    #[error("Failed to close synchronizer: {0}")]
    CloseError(String),

    /// The deltas subscription failed or its channel closed unexpectedly.
    ///
    /// Also used for every [`DeltasError`] other than `NotConnected`.
    #[error("Connection error: {0}")]
    ConnectionError(String),

    /// The deltas client reported that it is not connected.
    #[error("Connection closed")]
    ConnectionClosed,
}
60
/// Shorthand for a `Result` whose error type is [`SynchronizerError`].
pub type SyncResult<T> = Result<T, SynchronizerError>;
62
63impl<T> From<SendError<T>> for SynchronizerError {
64 fn from(err: SendError<T>) -> Self {
65 SynchronizerError::ChannelError(format!("Failed to send message: {err}"))
66 }
67}
68
69impl From<DeltasError> for SynchronizerError {
70 fn from(err: DeltasError) -> Self {
71 match err {
72 DeltasError::NotConnected => SynchronizerError::ConnectionClosed,
73 _ => SynchronizerError::ConnectionError(err.to_string()),
74 }
75 }
76}
77
/// Synchronizes the state of one extractor's protocol components.
///
/// Snapshots are fetched through the RPC client, live updates arrive through a
/// deltas subscription, and both are forwarded as [`StateSyncMessage`]s.
pub struct ProtocolStateSynchronizer<R: RPCClient, D: DeltasClient> {
    /// Extractor (chain and name) that is subscribed to and queried.
    extractor_id: ExtractorIdentity,
    /// Forwarded to the snapshot request's `include_balances` option.
    retrieve_balances: bool,
    /// Client used for traced entry points and snapshots.
    rpc_client: R,
    /// Client used to subscribe to and unsubscribe from deltas.
    deltas_client: D,
    /// Upper bound on how many times the sync loop is (re)started.
    max_retries: u64,
    /// Pause between two consecutive attempts of the sync loop.
    retry_cooldown: Duration,
    /// When `false`, no snapshots are requested and empty ones are emitted.
    include_snapshots: bool,
    /// Tracks which components and contracts are currently of interest.
    component_tracker: ComponentTracker<R>,
    /// Header of the last block that was sent downstream, if any.
    last_synced_block: Option<BlockHeader>,
    /// Seconds to wait for a deltas message while looking for the first one.
    timeout: u64,
    /// Forwarded to the snapshot request's `include_tvl` option.
    include_tvl: bool,
    /// Forwarded to the deltas subscription's compression option.
    compression: bool,
}
92
/// A protocol component bundled with its state and related metadata.
#[derive(Clone, PartialEq, Debug, Serialize, Deserialize)]
pub struct ComponentWithState {
    /// Protocol state reported for the component.
    pub state: ResponseProtocolState,
    /// The component the state belongs to.
    pub component: ProtocolComponent,
    /// Component TVL, when one was provided with the snapshot.
    pub component_tvl: Option<f64>,
    /// Entry points with their tracing parameters and tracing results.
    pub entrypoints: Vec<(EntryPointWithTracingParams, TracingResult)>,
}
100
/// Snapshot data: component states plus contract (VM) storage.
#[derive(Clone, PartialEq, Debug, Default, Serialize, Deserialize)]
pub struct Snapshot {
    /// Component states; the tests in this file key them by component id.
    pub states: HashMap<String, ComponentWithState>,
    /// Contract accounts; the tests in this file key them by account address.
    pub vm_storage: HashMap<Bytes, ResponseAccount>,
}
106
107impl Snapshot {
108 fn extend(&mut self, other: Snapshot) {
109 self.states.extend(other.states);
110 self.vm_storage.extend(other.vm_storage);
111 }
112
113 pub fn get_states(&self) -> &HashMap<String, ComponentWithState> {
114 &self.states
115 }
116
117 pub fn get_vm_storage(&self) -> &HashMap<Bytes, ResponseAccount> {
118 &self.vm_storage
119 }
120}
121
/// One synchronization update for the block described by `header`.
#[derive(Clone, PartialEq, Debug, Default, Serialize, Deserialize)]
pub struct StateSyncMessage<H>
where
    H: HeaderLike,
{
    /// Header of the block this message refers to.
    pub header: H,
    /// Snapshots retrieved for this message; left empty when none were fetched.
    pub snapshots: Snapshot,
    /// Changes delivered by the deltas subscription, when there are any.
    pub deltas: Option<BlockChanges>,
    /// Components for which tracking was stopped, keyed by id.
    pub removed_components: HashMap<String, ProtocolComponent>,
}
138
139impl<H> StateSyncMessage<H>
140where
141 H: HeaderLike,
142{
143 pub fn merge(mut self, other: Self) -> Self {
144 self.removed_components
146 .retain(|k, _| !other.snapshots.states.contains_key(k));
147 self.snapshots
148 .states
149 .retain(|k, _| !other.removed_components.contains_key(k));
150
151 self.snapshots.extend(other.snapshots);
152 let deltas = match (self.deltas, other.deltas) {
153 (Some(l), Some(r)) => Some(l.merge(r)),
154 (None, Some(r)) => Some(r),
155 (Some(l), None) => Some(l),
156 (None, None) => None,
157 };
158 self.removed_components
159 .extend(other.removed_components);
160 Self {
161 header: other.header,
162 snapshots: self.snapshots,
163 deltas,
164 removed_components: self.removed_components,
165 }
166 }
167}
168
/// Handle to a running synchronizer task.
pub struct SynchronizerTaskHandle {
    /// Join handle of the task.
    join_handle: JoinHandle<()>,
    /// Sender half of the oneshot channel used as the task's close signal.
    close_tx: oneshot::Sender<()>,
}
177
178impl SynchronizerTaskHandle {
187 pub fn new(join_handle: JoinHandle<()>, close_tx: oneshot::Sender<()>) -> Self {
188 Self { join_handle, close_tx }
189 }
190
191 pub fn split(self) -> (JoinHandle<()>, oneshot::Sender<()>) {
197 (self.join_handle, self.close_tx)
198 }
199}
200
/// Interface of a component that produces a stream of [`StateSyncMessage`]s.
#[async_trait]
pub trait StateSynchronizer: Send + Sync + 'static {
    /// Prepares the synchronizer before it is started.
    async fn initialize(&mut self) -> SyncResult<()>;

    /// Starts synchronization, consuming the synchronizer.
    ///
    /// Returns a handle to the running task and the receiving end of the
    /// channel on which messages (or errors) are delivered.
    async fn start(
        mut self,
    ) -> (SynchronizerTaskHandle, Receiver<SyncResult<StateSyncMessage<BlockHeader>>>);
}
210
211impl<R, D> ProtocolStateSynchronizer<R, D>
212where
213 R: RPCClient + Clone + Send + Sync + 'static,
216 D: DeltasClient + Clone + Send + Sync + 'static,
217{
218 #[allow(clippy::too_many_arguments)]
220 pub fn new(
221 extractor_id: ExtractorIdentity,
222 retrieve_balances: bool,
223 component_filter: ComponentFilter,
224 max_retries: u64,
225 retry_cooldown: Duration,
226 include_snapshots: bool,
227 include_tvl: bool,
228 compression: bool,
229 rpc_client: R,
230 deltas_client: D,
231 timeout: u64,
232 ) -> Self {
233 Self {
234 extractor_id: extractor_id.clone(),
235 retrieve_balances,
236 rpc_client: rpc_client.clone(),
237 include_snapshots,
238 deltas_client,
239 component_tracker: ComponentTracker::new(
240 extractor_id.chain,
241 extractor_id.name.as_str(),
242 component_filter,
243 rpc_client,
244 ),
245 max_retries,
246 retry_cooldown,
247 last_synced_block: None,
248 timeout,
249 include_tvl,
250 compression,
251 }
252 }
253
254 async fn get_snapshots<'a, I: IntoIterator<Item = &'a String>>(
256 &mut self,
257 header: BlockHeader,
258 ids: Option<I>,
259 ) -> SyncResult<StateSyncMessage<BlockHeader>> {
260 if !self.include_snapshots {
261 return Ok(StateSyncMessage { header, ..Default::default() });
262 }
263
264 let component_ids: Vec<_> = match ids {
266 Some(ids) => ids.into_iter().cloned().collect(),
267 None => self
268 .component_tracker
269 .get_tracked_component_ids(),
270 };
271
272 if component_ids.is_empty() {
273 return Ok(StateSyncMessage { header, ..Default::default() });
274 }
275
276 const DCI_PROTOCOLS: &[&str] = &[
278 "uniswap_v4_hooks",
279 "vm:curve",
280 "vm:balancer_v2",
281 "vm:balancer_v3",
282 "fluid_v1",
283 "erc4626",
284 ];
285 let entrypoints_result = if DCI_PROTOCOLS.contains(&self.extractor_id.name.as_str()) {
286 let result = self
287 .rpc_client
288 .get_traced_entry_points_paginated(
289 self.extractor_id.chain,
290 &self.extractor_id.name,
291 &component_ids,
292 None,
293 RPC_CLIENT_CONCURRENCY,
294 )
295 .await?;
296 self.component_tracker
297 .process_entrypoints(&result.clone().into());
298 result.traced_entry_points.clone()
299 } else {
300 HashMap::new()
301 };
302
303 let contract_ids: Vec<Bytes> = self
305 .component_tracker
306 .get_contracts_by_component(&component_ids)
307 .into_iter()
308 .collect();
309
310 let request = SnapshotParameters::new(
311 self.extractor_id.chain,
312 &self.extractor_id.name,
313 &self.component_tracker.components,
314 &contract_ids,
315 header.number,
316 )
317 .entrypoints(&entrypoints_result)
318 .include_balances(self.retrieve_balances)
319 .include_tvl(self.include_tvl);
320 let snapshot_response = self
321 .rpc_client
322 .get_snapshots(&request, None, RPC_CLIENT_CONCURRENCY)
323 .await?;
324
325 trace!(states=?&snapshot_response.states, "Retrieved ProtocolStates");
326 trace!(contract_states=?&snapshot_response.vm_storage, "Retrieved ContractState");
327
328 Ok(StateSyncMessage {
329 header,
330 snapshots: snapshot_response,
331 deltas: None,
332 removed_components: HashMap::new(),
333 })
334 }
335
    /// Runs one synchronization session against the deltas subscription.
    ///
    /// Steps:
    /// 1. Subscribe to deltas for this extractor.
    /// 2. Wait for the first usable deltas message, skipping messages for
    ///    blocks at or below the last synced block unless they directly
    ///    extend it.
    /// 3. Unless that message directly extends the last synced block, fetch a
    ///    snapshot of all tracked components and merge the deltas into it.
    /// 4. Forward every following deltas message, fetching snapshots for
    ///    components that start being tracked and reporting components that
    ///    stop being tracked.
    /// 5. Unsubscribe on the way out; a failure to do so is only logged.
    ///
    /// Returns `Ok(())` when the close signal fired. On error, the close
    /// receiver is handed back next to the error so the caller can retry
    /// with it.
    #[instrument(skip(self, block_tx, end_rx), fields(extractor_id = %self.extractor_id))]
    async fn state_sync(
        &mut self,
        block_tx: &mut Sender<SyncResult<StateSyncMessage<BlockHeader>>>,
        mut end_rx: oneshot::Receiver<()>,
    ) -> Result<(), (SynchronizerError, Option<oneshot::Receiver<()>>)> {
        let subscription_options = SubscriptionOptions::new()
            .with_state(self.include_snapshots)
            .with_compression(self.compression);
        let (subscription_id, mut msg_rx) = match self
            .deltas_client
            .subscribe(self.extractor_id.clone(), subscription_options)
            .await
        {
            Ok(result) => result,
            // Nothing was consumed yet: return the close receiver untouched.
            Err(e) => return Err((e.into(), Some(end_rx))),
        };

        // The session runs in its own async block so that `?` and `return`
        // inside it still fall through to the unsubscribe step below.
        let result = async {
            info!("Waiting for deltas...");
            // Ensures the "skipping" notice is logged at most once.
            let mut warned = false;
            let mut first_msg = loop {
                let msg = select! {
                    // The timeout applies to each receive individually.
                    deltas_result = timeout(Duration::from_secs(self.timeout), msg_rx.recv()) => {
                        deltas_result
                            .map_err(|_| {
                                SynchronizerError::Timeout(format!(
                                    "First deltas took longer than {t}s to arrive",
                                    t = self.timeout
                                ))
                            })?
                            .ok_or_else(|| {
                                SynchronizerError::ConnectionError(
                                    "Deltas channel closed before first message".to_string(),
                                )
                            })?
                    },
                    // `_` matches any outcome of the receiver, not only a sent value.
                    _ = &mut end_rx => {
                        info!("Received close signal while waiting for first deltas");
                        return Ok(());
                    }
                };

                let incoming = BlockHeader::from_block(msg.get_block(), msg.is_revert());
                if let Some(current) = &self.last_synced_block {
                    // Skip blocks we are already at or past, unless the message
                    // directly extends the last synced block.
                    if current.number >= incoming.number && !self.is_next_expected(&incoming) {
                        if !warned {
                            info!(extractor=%self.extractor_id, from=incoming.number, to=current.number, "Syncing. Skipping messages");
                            warned = true;
                        }
                        continue
                    }
                }
                break msg;
            };

            // Drop changes for components and contracts that are not tracked.
            self.filter_deltas(&mut first_msg);

            let block = first_msg.get_block().clone();
            info!(height = &block.number, "First deltas received");
            let header = BlockHeader::from_block(first_msg.get_block(), first_msg.is_revert());
            let deltas_msg = StateSyncMessage {
                header: BlockHeader::from_block(first_msg.get_block(), first_msg.is_revert()),
                snapshots: Default::default(),
                deltas: Some(first_msg),
                removed_components: Default::default(),
            };

            // A snapshot is only needed when the message does not directly
            // extend the last synced block (always the case on the first run).
            let msg = if !self.is_next_expected(&header) {
                info!("Retrieving snapshot");
                let snapshot = self
                    .get_snapshots::<Vec<&String>>(
                        BlockHeader::from_block(&block, false),
                        None,
                    )
                    .await?
                    .merge(deltas_msg);
                let n_components = self.component_tracker.components.len();
                let n_snapshots = snapshot.snapshots.states.len();
                info!(n_components, n_snapshots, "Initial snapshot retrieved, starting delta message feed");
                snapshot
            } else {
                deltas_msg
            };
            block_tx.send(Ok(msg)).await?;
            self.last_synced_block = Some(header.clone());
            loop {
                select! {
                    deltas_opt = msg_rx.recv() => {
                        if let Some(mut deltas) = deltas_opt {
                            let header = BlockHeader::from_block(deltas.get_block(), deltas.is_revert());
                            debug!(block_number=?header.number, "Received delta message");

                            let (snapshots, removed_components) = {
                                // Components to start and to stop tracking.
                                let (to_add, to_remove) = self.component_tracker.filter_updated_components(&deltas);

                                // Only components that are not tracked yet need a snapshot.
                                let requiring_snapshot: Vec<_> = to_add
                                    .iter()
                                    .filter(|id| {
                                        !self.component_tracker
                                            .components
                                            .contains_key(id.as_str())
                                    })
                                    .collect();
                                debug!(components=?requiring_snapshot, "SnapshotRequest");
                                self.component_tracker
                                    .start_tracking(requiring_snapshot.as_slice())
                                    .await?;

                                let snapshots = self
                                    .get_snapshots(header.clone(), Some(requiring_snapshot))
                                    .await?
                                    .snapshots;

                                let removed_components = if !to_remove.is_empty() {
                                    self.component_tracker.stop_tracking(&to_remove)
                                } else {
                                    Default::default()
                                };

                                (snapshots, removed_components)
                            };

                            // Hand the message's entry point update to the tracker.
                            self.component_tracker.process_entrypoints(&deltas.dci_update);

                            // Filtering happens after the tracker was updated above.
                            self.filter_deltas(&mut deltas);
                            let n_changes = deltas.n_changes();

                            let next = StateSyncMessage {
                                header: header.clone(),
                                snapshots,
                                deltas: Some(deltas),
                                removed_components,
                            };
                            block_tx.send(Ok(next)).await?;
                            self.last_synced_block = Some(header.clone());

                            debug!(block_number=?header.number, n_changes, "Finished processing delta message");
                        } else {
                            return Err(SynchronizerError::ConnectionError("Deltas channel closed".to_string()));
                        }
                    },
                    _ = &mut end_rx => {
                        info!("Received close signal during state_sync");
                        return Ok(());
                    }
                }
            }
        }.await;

        warn!(last_synced_block = ?&self.last_synced_block, "Deltas processing ended.");
        // Best-effort cleanup: an unsubscribe failure is logged, not returned.
        let _ = self
            .deltas_client
            .unsubscribe(subscription_id)
            .await
            .map_err(|err| {
                warn!(err=?err, "Unsubscribing from deltas on cleanup failed!");
            });

        match result {
            // The block above only yields `Ok` after the close branch fired.
            Ok(()) => Ok(()),
            Err(e) => {
                // Hand the close receiver back so the caller can retry with it.
                Err((e, Some(end_rx)))
            }
        }
    }
530
531 fn is_next_expected(&self, incoming: &BlockHeader) -> bool {
532 if let Some(block) = self.last_synced_block.as_ref() {
533 return incoming.parent_hash == block.hash;
534 }
535 false
536 }
537 fn filter_deltas(&self, deltas: &mut BlockChanges) {
538 deltas.filter_by_component(|id| {
539 self.component_tracker
540 .components
541 .contains_key(id)
542 });
543 deltas.filter_by_contract(|id| {
544 self.component_tracker
545 .contracts
546 .contains(id)
547 });
548 }
549}
550
551#[async_trait]
552impl<R, D> StateSynchronizer for ProtocolStateSynchronizer<R, D>
553where
554 R: RPCClient + Clone + Send + Sync + 'static,
555 D: DeltasClient + Clone + Send + Sync + 'static,
556{
557 async fn initialize(&mut self) -> SyncResult<()> {
558 info!("Retrieving relevant protocol components");
559 self.component_tracker
560 .initialise_components()
561 .await?;
562 info!(
563 n_components = self.component_tracker.components.len(),
564 n_contracts = self.component_tracker.contracts.len(),
565 "Finished retrieving components",
566 );
567
568 Ok(())
569 }
570
    /// Spawns the synchronization task and returns its handle together with
    /// the receiver on which messages and errors arrive.
    ///
    /// The task runs `state_sync` up to `max_retries` times, sleeping for
    /// `retry_cooldown` after each failed attempt. It exits early when
    /// `state_sync` ends cleanly, when the connection is reported closed, or
    /// when the close receiver is not handed back. If all attempts fail, the
    /// last error is sent on the channel.
    async fn start(
        mut self,
    ) -> (SynchronizerTaskHandle, Receiver<SyncResult<StateSyncMessage<BlockHeader>>>) {
        // Bounded channel towards the consumer of sync messages.
        let (mut tx, rx) = channel(15);
        // Oneshot channel used as the close signal.
        let (end_tx, end_rx) = oneshot::channel::<()>();

        let jh = tokio::spawn(async move {
            let mut retry_count = 0;
            // `state_sync` takes the receiver by value and hands it back on
            // error, so it is threaded through the loop in this variable.
            let mut current_end_rx = end_rx;
            let mut final_error = None;

            while retry_count < self.max_retries {
                info!(extractor_id=%&self.extractor_id, retry_count, "(Re)starting synchronization loop");

                let res = self
                    .state_sync(&mut tx, current_end_rx)
                    .await;
                match res {
                    Ok(()) => {
                        info!(
                            extractor_id=%&self.extractor_id,
                            retry_count,
                            "State synchronization exited cleanly"
                        );
                        return;
                    }
                    Err((e, maybe_end_rx)) => {
                        warn!(
                            extractor_id=%&self.extractor_id,
                            retry_count,
                            error=%e,
                            "State synchronization errored!"
                        );

                        if let Some(recovered_end_rx) = maybe_end_rx {
                            // Keep the receiver for the next attempt.
                            current_end_rx = recovered_end_rx;

                            if let SynchronizerError::ConnectionClosed = e {
                                // Not retried: report the error and stop.
                                error!(
                                    "Websocket connection closed. State synchronization exiting."
                                );
                                let _ = tx.send(Err(e)).await;
                                return;
                            } else {
                                // Remember the error in case this was the last attempt.
                                final_error = Some(e);
                            }
                        } else {
                            // Without the receiver there is no way to retry.
                            info!(extractor_id=%&self.extractor_id, "Received close signal, exiting.");
                            return;
                        }
                    }
                }
                sleep(self.retry_cooldown).await;
                retry_count += 1;
            }
            if let Some(e) = final_error {
                warn!(extractor_id=%&self.extractor_id, retry_count, error=%e, "Max retries exceeded");
                // The send result is ignored on purpose: the task ends either way.
                let _ = tx.send(Err(e)).await;
            }
        });

        let handle = SynchronizerTaskHandle::new(jh, end_tx);
        (handle, rx)
    }
639}
640
641#[cfg(test)]
642mod test {
643 use std::{collections::HashSet, sync::Arc};
662
663 use test_log::test;
664 use tycho_common::dto::{
665 AddressStorageLocation, Block, Chain, ComponentTvlRequestBody, ComponentTvlRequestResponse,
666 DCIUpdate, EntryPoint, PaginationResponse, ProtocolComponentRequestResponse,
667 ProtocolComponentsRequestBody, ProtocolStateRequestBody, ProtocolStateRequestResponse,
668 ProtocolSystemsRequestBody, ProtocolSystemsRequestResponse, RPCTracerParams,
669 StateRequestBody, StateRequestResponse, TokensRequestBody, TokensRequestResponse,
670 TracedEntryPointRequestBody, TracedEntryPointRequestResponse, TracingParams,
671 };
672 use uuid::Uuid;
673
674 use super::*;
675 use crate::{deltas::MockDeltasClient, rpc::MockRPCClient, DeltasError, RPCError};
676
677 struct ArcRPCClient<T>(Arc<T>);
679
680 impl<T> Clone for ArcRPCClient<T> {
682 fn clone(&self) -> Self {
683 ArcRPCClient(self.0.clone())
684 }
685 }
686
687 #[async_trait]
688 impl<T> RPCClient for ArcRPCClient<T>
689 where
690 T: RPCClient + Sync + Send + 'static,
691 {
692 async fn get_tokens(
693 &self,
694 request: &TokensRequestBody,
695 ) -> Result<TokensRequestResponse, RPCError> {
696 self.0.get_tokens(request).await
697 }
698
699 async fn get_contract_state(
700 &self,
701 request: &StateRequestBody,
702 ) -> Result<StateRequestResponse, RPCError> {
703 self.0.get_contract_state(request).await
704 }
705
706 async fn get_protocol_components(
707 &self,
708 request: &ProtocolComponentsRequestBody,
709 ) -> Result<ProtocolComponentRequestResponse, RPCError> {
710 self.0
711 .get_protocol_components(request)
712 .await
713 }
714
715 async fn get_protocol_states(
716 &self,
717 request: &ProtocolStateRequestBody,
718 ) -> Result<ProtocolStateRequestResponse, RPCError> {
719 self.0
720 .get_protocol_states(request)
721 .await
722 }
723
724 async fn get_protocol_systems(
725 &self,
726 request: &ProtocolSystemsRequestBody,
727 ) -> Result<ProtocolSystemsRequestResponse, RPCError> {
728 self.0
729 .get_protocol_systems(request)
730 .await
731 }
732
733 async fn get_component_tvl(
734 &self,
735 request: &ComponentTvlRequestBody,
736 ) -> Result<ComponentTvlRequestResponse, RPCError> {
737 self.0.get_component_tvl(request).await
738 }
739
740 async fn get_traced_entry_points(
741 &self,
742 request: &TracedEntryPointRequestBody,
743 ) -> Result<TracedEntryPointRequestResponse, RPCError> {
744 self.0
745 .get_traced_entry_points(request)
746 .await
747 }
748
749 async fn get_snapshots<'a>(
750 &self,
751 request: &SnapshotParameters<'a>,
752 chunk_size: Option<usize>,
753 concurrency: usize,
754 ) -> Result<Snapshot, RPCError> {
755 self.0
756 .get_snapshots(request, chunk_size, concurrency)
757 .await
758 }
759
760 fn compression(&self) -> bool {
761 self.0.compression()
762 }
763 }
764
765 struct ArcDeltasClient<T>(Arc<T>);
767
768 impl<T> Clone for ArcDeltasClient<T> {
770 fn clone(&self) -> Self {
771 ArcDeltasClient(self.0.clone())
772 }
773 }
774
775 #[async_trait]
776 impl<T> DeltasClient for ArcDeltasClient<T>
777 where
778 T: DeltasClient + Sync + Send + 'static,
779 {
780 async fn subscribe(
781 &self,
782 extractor_id: ExtractorIdentity,
783 options: SubscriptionOptions,
784 ) -> Result<(Uuid, Receiver<BlockChanges>), DeltasError> {
785 self.0
786 .subscribe(extractor_id, options)
787 .await
788 }
789
790 async fn unsubscribe(&self, subscription_id: Uuid) -> Result<(), DeltasError> {
791 self.0
792 .unsubscribe(subscription_id)
793 .await
794 }
795
796 async fn connect(&self) -> Result<JoinHandle<Result<(), DeltasError>>, DeltasError> {
797 self.0.connect().await
798 }
799
800 async fn close(&self) -> Result<(), DeltasError> {
801 self.0.close().await
802 }
803 }
804
805 fn with_mocked_clients(
806 native: bool,
807 include_tvl: bool,
808 rpc_client: Option<MockRPCClient>,
809 deltas_client: Option<MockDeltasClient>,
810 ) -> ProtocolStateSynchronizer<ArcRPCClient<MockRPCClient>, ArcDeltasClient<MockDeltasClient>>
811 {
812 let rpc_client = ArcRPCClient(Arc::new(rpc_client.unwrap_or_default()));
813 let deltas_client = ArcDeltasClient(Arc::new(deltas_client.unwrap_or_default()));
814
815 ProtocolStateSynchronizer::new(
816 ExtractorIdentity::new(Chain::Ethereum, "uniswap-v2"),
817 native,
818 ComponentFilter::with_tvl_range(50.0, 50.0),
819 1,
820 Duration::from_secs(0),
821 true,
822 include_tvl,
823 true, rpc_client,
825 deltas_client,
826 10_u64,
827 )
828 }
829
830 fn state_snapshot_native() -> ProtocolStateRequestResponse {
831 ProtocolStateRequestResponse {
832 states: vec![ResponseProtocolState {
833 component_id: "Component1".to_string(),
834 ..Default::default()
835 }],
836 pagination: PaginationResponse { page: 0, page_size: 20, total: 1 },
837 }
838 }
839
840 fn make_mock_client() -> MockRPCClient {
841 let mut m = MockRPCClient::new();
842 m.expect_compression()
843 .return_const(false);
844 m
845 }
846
847 #[test(tokio::test)]
848 async fn test_get_snapshots_native() {
849 let header = BlockHeader::default();
850 let mut rpc = make_mock_client();
851 let component = ProtocolComponent { id: "Component1".to_string(), ..Default::default() };
852
853 let component_clone = component.clone();
854 rpc.expect_get_snapshots()
855 .returning(move |_request, _chunk_size, _concurrency| {
856 Ok(Snapshot {
857 states: state_snapshot_native()
858 .states
859 .into_iter()
860 .map(|state| {
861 (
862 state.component_id.clone(),
863 ComponentWithState {
864 state,
865 component: component_clone.clone(),
866 entrypoints: vec![],
867 component_tvl: None,
868 },
869 )
870 })
871 .collect(),
872 vm_storage: HashMap::new(),
873 })
874 });
875
876 rpc.expect_get_traced_entry_points()
877 .returning(|_| {
878 Ok(TracedEntryPointRequestResponse {
879 traced_entry_points: HashMap::new(),
880 pagination: PaginationResponse::new(0, 20, 0),
881 })
882 });
883
884 let mut state_sync = with_mocked_clients(true, false, Some(rpc), None);
885 state_sync
886 .component_tracker
887 .components
888 .insert("Component1".to_string(), component.clone());
889 let components_arg = ["Component1".to_string()];
890 let exp = StateSyncMessage {
891 header: header.clone(),
892 snapshots: Snapshot {
893 states: state_snapshot_native()
894 .states
895 .into_iter()
896 .map(|state| {
897 (
898 state.component_id.clone(),
899 ComponentWithState {
900 state,
901 component: component.clone(),
902 entrypoints: vec![],
903 component_tvl: None,
904 },
905 )
906 })
907 .collect(),
908 vm_storage: HashMap::new(),
909 },
910 deltas: None,
911 removed_components: Default::default(),
912 };
913
914 let snap = state_sync
915 .get_snapshots(header, Some(&components_arg))
916 .await
917 .expect("Retrieving snapshot failed");
918
919 assert_eq!(snap, exp);
920 }
921
922 #[test(tokio::test)]
923 async fn test_get_snapshots_native_with_tvl() {
924 let header = BlockHeader::default();
925 let mut rpc = make_mock_client();
926 let component = ProtocolComponent { id: "Component1".to_string(), ..Default::default() };
927
928 let component_clone = component.clone();
929 rpc.expect_get_snapshots()
930 .returning(move |_request, _chunk_size, _concurrency| {
931 Ok(Snapshot {
932 states: state_snapshot_native()
933 .states
934 .into_iter()
935 .map(|state| {
936 (
937 state.component_id.clone(),
938 ComponentWithState {
939 state,
940 component: component_clone.clone(),
941 component_tvl: Some(100.0),
942 entrypoints: vec![],
943 },
944 )
945 })
946 .collect(),
947 vm_storage: HashMap::new(),
948 })
949 });
950
951 rpc.expect_get_traced_entry_points()
952 .returning(|_| {
953 Ok(TracedEntryPointRequestResponse {
954 traced_entry_points: HashMap::new(),
955 pagination: PaginationResponse::new(0, 20, 0),
956 })
957 });
958
959 let mut state_sync = with_mocked_clients(true, true, Some(rpc), None);
960 state_sync
961 .component_tracker
962 .components
963 .insert("Component1".to_string(), component.clone());
964 let components_arg = ["Component1".to_string()];
965 let exp = StateSyncMessage {
966 header: header.clone(),
967 snapshots: Snapshot {
968 states: state_snapshot_native()
969 .states
970 .into_iter()
971 .map(|state| {
972 (
973 state.component_id.clone(),
974 ComponentWithState {
975 state,
976 component: component.clone(),
977 component_tvl: Some(100.0),
978 entrypoints: vec![],
979 },
980 )
981 })
982 .collect(),
983 vm_storage: HashMap::new(),
984 },
985 deltas: None,
986 removed_components: Default::default(),
987 };
988
989 let snap = state_sync
990 .get_snapshots(header, Some(&components_arg))
991 .await
992 .expect("Retrieving snapshot failed");
993
994 assert_eq!(snap, exp);
995 }
996
997 fn state_snapshot_vm() -> StateRequestResponse {
998 StateRequestResponse {
999 accounts: vec![
1000 ResponseAccount { address: Bytes::from("0x0badc0ffee"), ..Default::default() },
1001 ResponseAccount { address: Bytes::from("0xbabe42"), ..Default::default() },
1002 ],
1003 pagination: PaginationResponse { page: 0, page_size: 20, total: 1 },
1004 }
1005 }
1006
1007 fn traced_entry_point_response() -> TracedEntryPointRequestResponse {
1008 TracedEntryPointRequestResponse {
1009 traced_entry_points: HashMap::from([(
1010 "Component1".to_string(),
1011 vec![(
1012 EntryPointWithTracingParams {
1013 entry_point: EntryPoint {
1014 external_id: "entrypoint_a".to_string(),
1015 target: Bytes::from("0x0badc0ffee"),
1016 signature: "sig()".to_string(),
1017 },
1018 params: TracingParams::RPCTracer(RPCTracerParams {
1019 caller: Some(Bytes::from("0x0badc0ffee")),
1020 calldata: Bytes::from("0x0badc0ffee"),
1021 state_overrides: None,
1022 prune_addresses: None,
1023 }),
1024 },
1025 TracingResult {
1026 retriggers: HashSet::from([(
1027 Bytes::from("0x0badc0ffee"),
1028 AddressStorageLocation::new(Bytes::from("0x0badc0ffee"), 12),
1029 )]),
1030 accessed_slots: HashMap::from([(
1031 Bytes::from("0x0badc0ffee"),
1032 HashSet::from([Bytes::from("0xbadbeef0")]),
1033 )]),
1034 },
1035 )],
1036 )]),
1037 pagination: PaginationResponse::new(0, 20, 0),
1038 }
1039 }
1040
1041 #[test(tokio::test)]
1042 async fn test_get_snapshots_vm() {
1043 let header = BlockHeader::default();
1044 let mut rpc = make_mock_client();
1045
1046 let traced_ep_response = traced_entry_point_response();
1047 rpc.expect_get_snapshots()
1048 .returning(move |_request, _chunk_size, _concurrency| {
1049 let vm_storage_accounts = state_snapshot_vm();
1050 Ok(Snapshot {
1051 states: [(
1052 "Component1".to_string(),
1053 ComponentWithState {
1054 state: ResponseProtocolState {
1055 component_id: "Component1".to_string(),
1056 ..Default::default()
1057 },
1058 component: ProtocolComponent {
1059 id: "Component1".to_string(),
1060 contract_ids: vec![
1061 Bytes::from("0x0badc0ffee"),
1062 Bytes::from("0xbabe42"),
1063 ],
1064 ..Default::default()
1065 },
1066 component_tvl: None,
1067 entrypoints: traced_ep_response
1068 .traced_entry_points
1069 .get("Component1")
1070 .cloned()
1071 .unwrap_or_default(),
1072 },
1073 )]
1074 .into_iter()
1075 .collect(),
1076 vm_storage: vm_storage_accounts
1077 .accounts
1078 .into_iter()
1079 .map(|state| (state.address.clone(), state))
1080 .collect(),
1081 })
1082 });
1083
1084 rpc.expect_get_traced_entry_points()
1085 .returning(|_| Ok(traced_entry_point_response()));
1086
1087 let mut state_sync = with_mocked_clients(false, false, Some(rpc), None);
1088 let component = ProtocolComponent {
1089 id: "Component1".to_string(),
1090 contract_ids: vec![Bytes::from("0x0badc0ffee"), Bytes::from("0xbabe42")],
1091 ..Default::default()
1092 };
1093 state_sync
1094 .component_tracker
1095 .components
1096 .insert("Component1".to_string(), component.clone());
1097 let components_arg = ["Component1".to_string()];
1098 let exp = StateSyncMessage {
1099 header: header.clone(),
1100 snapshots: Snapshot {
1101 states: [(
1102 component.id.clone(),
1103 ComponentWithState {
1104 state: ResponseProtocolState {
1105 component_id: "Component1".to_string(),
1106 ..Default::default()
1107 },
1108 component: component.clone(),
1109 component_tvl: None,
1110 entrypoints: vec![(
1111 EntryPointWithTracingParams {
1112 entry_point: EntryPoint {
1113 external_id: "entrypoint_a".to_string(),
1114 target: Bytes::from("0x0badc0ffee"),
1115 signature: "sig()".to_string(),
1116 },
1117 params: TracingParams::RPCTracer(RPCTracerParams {
1118 caller: Some(Bytes::from("0x0badc0ffee")),
1119 calldata: Bytes::from("0x0badc0ffee"),
1120 state_overrides: None,
1121 prune_addresses: None,
1122 }),
1123 },
1124 TracingResult {
1125 retriggers: HashSet::from([(
1126 Bytes::from("0x0badc0ffee"),
1127 AddressStorageLocation::new(Bytes::from("0x0badc0ffee"), 12),
1128 )]),
1129 accessed_slots: HashMap::from([(
1130 Bytes::from("0x0badc0ffee"),
1131 HashSet::from([Bytes::from("0xbadbeef0")]),
1132 )]),
1133 },
1134 )],
1135 },
1136 )]
1137 .into_iter()
1138 .collect(),
1139 vm_storage: state_snapshot_vm()
1140 .accounts
1141 .into_iter()
1142 .map(|state| (state.address.clone(), state))
1143 .collect(),
1144 },
1145 deltas: None,
1146 removed_components: Default::default(),
1147 };
1148
1149 let snap = state_sync
1150 .get_snapshots(header, Some(&components_arg))
1151 .await
1152 .expect("Retrieving snapshot failed");
1153
1154 assert_eq!(snap, exp);
1155 }
1156
1157 #[test(tokio::test)]
1158 async fn test_get_snapshots_vm_with_tvl() {
1159 let header = BlockHeader::default();
1160 let mut rpc = make_mock_client();
1161 let component = ProtocolComponent {
1162 id: "Component1".to_string(),
1163 contract_ids: vec![Bytes::from("0x0badc0ffee"), Bytes::from("0xbabe42")],
1164 ..Default::default()
1165 };
1166
1167 let component_clone = component.clone();
1168 rpc.expect_get_snapshots()
1169 .returning(move |_request, _chunk_size, _concurrency| {
1170 let vm_storage_accounts = state_snapshot_vm();
1171 Ok(Snapshot {
1172 states: [(
1173 "Component1".to_string(),
1174 ComponentWithState {
1175 state: ResponseProtocolState {
1176 component_id: "Component1".to_string(),
1177 ..Default::default()
1178 },
1179 component: component_clone.clone(),
1180 component_tvl: Some(100.0),
1181 entrypoints: vec![],
1182 },
1183 )]
1184 .into_iter()
1185 .collect(),
1186 vm_storage: vm_storage_accounts
1187 .accounts
1188 .into_iter()
1189 .map(|state| (state.address.clone(), state))
1190 .collect(),
1191 })
1192 });
1193
1194 rpc.expect_get_traced_entry_points()
1195 .returning(|_| {
1196 Ok(TracedEntryPointRequestResponse {
1197 traced_entry_points: HashMap::new(),
1198 pagination: PaginationResponse::new(0, 20, 0),
1199 })
1200 });
1201
1202 let mut state_sync = with_mocked_clients(false, true, Some(rpc), None);
1203 state_sync
1204 .component_tracker
1205 .components
1206 .insert("Component1".to_string(), component.clone());
1207 let components_arg = ["Component1".to_string()];
1208 let exp = StateSyncMessage {
1209 header: header.clone(),
1210 snapshots: Snapshot {
1211 states: [(
1212 component.id.clone(),
1213 ComponentWithState {
1214 state: ResponseProtocolState {
1215 component_id: "Component1".to_string(),
1216 ..Default::default()
1217 },
1218 component: component.clone(),
1219 component_tvl: Some(100.0),
1220 entrypoints: vec![],
1221 },
1222 )]
1223 .into_iter()
1224 .collect(),
1225 vm_storage: state_snapshot_vm()
1226 .accounts
1227 .into_iter()
1228 .map(|state| (state.address.clone(), state))
1229 .collect(),
1230 },
1231 deltas: None,
1232 removed_components: Default::default(),
1233 };
1234
1235 let snap = state_sync
1236 .get_snapshots(header, Some(&components_arg))
1237 .await
1238 .expect("Retrieving snapshot failed");
1239
1240 assert_eq!(snap, exp);
1241 }
1242
1243 fn mock_clients_for_state_sync() -> (MockRPCClient, MockDeltasClient, Sender<BlockChanges>) {
1244 let mut rpc_client = make_mock_client();
1245 rpc_client
1248 .expect_get_protocol_components()
1249 .with(mockall::predicate::function(
1250 move |request_params: &ProtocolComponentsRequestBody| {
1251 if let Some(ids) = request_params.component_ids.as_ref() {
1252 ids.contains(&"Component3".to_string())
1253 } else {
1254 false
1255 }
1256 },
1257 ))
1258 .returning(|_| {
1259 Ok(ProtocolComponentRequestResponse {
1261 protocol_components: vec![
1262 ProtocolComponent { id: "Component3".to_string(), ..Default::default() },
1264 ],
1265 pagination: PaginationResponse { page: 0, page_size: 20, total: 1 },
1266 })
1267 });
1268 rpc_client
1270 .expect_get_snapshots()
1271 .withf(
1272 |request: &SnapshotParameters,
1273 _chunk_size: &Option<usize>,
1274 _concurrency: &usize| {
1275 request
1276 .components
1277 .contains_key("Component3")
1278 },
1279 )
1280 .returning(|_request, _chunk_size, _concurrency| {
1281 Ok(Snapshot {
1282 states: [(
1283 "Component3".to_string(),
1284 ComponentWithState {
1285 state: ResponseProtocolState {
1286 component_id: "Component3".to_string(),
1287 ..Default::default()
1288 },
1289 component: ProtocolComponent {
1290 id: "Component3".to_string(),
1291 ..Default::default()
1292 },
1293 component_tvl: Some(1000.0),
1294 entrypoints: vec![],
1295 },
1296 )]
1297 .into_iter()
1298 .collect(),
1299 vm_storage: HashMap::new(),
1300 })
1301 });
1302
1303 rpc_client
1305 .expect_get_protocol_components()
1306 .returning(|_| {
1307 Ok(ProtocolComponentRequestResponse {
1309 protocol_components: vec![
1310 ProtocolComponent { id: "Component1".to_string(), ..Default::default() },
1312 ProtocolComponent { id: "Component2".to_string(), ..Default::default() },
1314 ],
1316 pagination: PaginationResponse { page: 0, page_size: 20, total: 1 },
1317 })
1318 });
1319
1320 rpc_client
1321 .expect_get_snapshots()
1322 .returning(|_request, _chunk_size, _concurrency| {
1323 Ok(Snapshot {
1324 states: [
1325 (
1326 "Component1".to_string(),
1327 ComponentWithState {
1328 state: ResponseProtocolState {
1329 component_id: "Component1".to_string(),
1330 ..Default::default()
1331 },
1332 component: ProtocolComponent {
1333 id: "Component1".to_string(),
1334 ..Default::default()
1335 },
1336 component_tvl: Some(100.0),
1337 entrypoints: vec![],
1338 },
1339 ),
1340 (
1341 "Component2".to_string(),
1342 ComponentWithState {
1343 state: ResponseProtocolState {
1344 component_id: "Component2".to_string(),
1345 ..Default::default()
1346 },
1347 component: ProtocolComponent {
1348 id: "Component2".to_string(),
1349 ..Default::default()
1350 },
1351 component_tvl: Some(0.0),
1352 entrypoints: vec![],
1353 },
1354 ),
1355 ]
1356 .into_iter()
1357 .collect(),
1358 vm_storage: HashMap::new(),
1359 })
1360 });
1361
1362 rpc_client
1364 .expect_get_traced_entry_points()
1365 .returning(|_| {
1366 Ok(TracedEntryPointRequestResponse {
1367 traced_entry_points: HashMap::new(),
1368 pagination: PaginationResponse { page: 0, page_size: 100, total: 0 },
1369 })
1370 });
1371
1372 let mut deltas_client = MockDeltasClient::new();
1374 let (tx, rx) = channel(1);
1375 deltas_client
1376 .expect_subscribe()
1377 .return_once(move |_, _| {
1378 Ok((Uuid::default(), rx))
1380 });
1381
1382 deltas_client
1384 .expect_unsubscribe()
1385 .return_once(|_| Ok(()));
1386
1387 (rpc_client, deltas_client, tx)
1388 }
1389
1390 #[test(tokio::test)]
1397 async fn test_state_sync() {
1398 let (rpc_client, deltas_client, tx) = mock_clients_for_state_sync();
1399 let deltas = [
1400 BlockChanges {
1401 extractor: "uniswap-v2".to_string(),
1402 chain: Chain::Ethereum,
1403 block: Block {
1404 number: 1,
1405 hash: Bytes::from("0x01"),
1406 parent_hash: Bytes::from("0x00"),
1407 chain: Chain::Ethereum,
1408 ts: Default::default(),
1409 },
1410 revert: false,
1411 dci_update: DCIUpdate {
1412 new_entrypoints: HashMap::from([(
1413 "Component1".to_string(),
1414 HashSet::from([EntryPoint {
1415 external_id: "entrypoint_a".to_string(),
1416 target: Bytes::from("0x0badc0ffee"),
1417 signature: "sig()".to_string(),
1418 }]),
1419 )]),
1420 new_entrypoint_params: HashMap::from([(
1421 "entrypoint_a".to_string(),
1422 HashSet::from([(
1423 TracingParams::RPCTracer(RPCTracerParams {
1424 caller: Some(Bytes::from("0x0badc0ffee")),
1425 calldata: Bytes::from("0x0badc0ffee"),
1426 state_overrides: None,
1427 prune_addresses: None,
1428 }),
1429 "Component1".to_string(),
1430 )]),
1431 )]),
1432 trace_results: HashMap::from([(
1433 "entrypoint_a".to_string(),
1434 TracingResult {
1435 retriggers: HashSet::from([(
1436 Bytes::from("0x0badc0ffee"),
1437 AddressStorageLocation::new(Bytes::from("0x0badc0ffee"), 12),
1438 )]),
1439 accessed_slots: HashMap::from([(
1440 Bytes::from("0x0badc0ffee"),
1441 HashSet::from([Bytes::from("0xbadbeef0")]),
1442 )]),
1443 },
1444 )]),
1445 },
1446 ..Default::default()
1447 },
1448 BlockChanges {
1449 extractor: "uniswap-v2".to_string(),
1450 chain: Chain::Ethereum,
1451 block: Block {
1452 number: 2,
1453 hash: Bytes::from("0x02"),
1454 parent_hash: Bytes::from("0x01"),
1455 chain: Chain::Ethereum,
1456 ts: Default::default(),
1457 },
1458 revert: false,
1459 component_tvl: [
1460 ("Component1".to_string(), 100.0),
1461 ("Component2".to_string(), 0.0),
1462 ("Component3".to_string(), 1000.0),
1463 ]
1464 .into_iter()
1465 .collect(),
1466 ..Default::default()
1467 },
1468 ];
1469 let mut state_sync = with_mocked_clients(true, true, Some(rpc_client), Some(deltas_client));
1470 state_sync
1471 .initialize()
1472 .await
1473 .expect("Init failed");
1474
1475 let (handle, mut rx) = state_sync.start().await;
1477 let (jh, close_tx) = handle.split();
1478 tx.send(deltas[0].clone())
1479 .await
1480 .expect("deltas channel msg 0 closed!");
1481 let first_msg = timeout(Duration::from_millis(100), rx.recv())
1482 .await
1483 .expect("waiting for first state msg timed out!")
1484 .expect("state sync block sender closed!");
1485 tx.send(deltas[1].clone())
1486 .await
1487 .expect("deltas channel msg 1 closed!");
1488 let second_msg = timeout(Duration::from_millis(100), rx.recv())
1489 .await
1490 .expect("waiting for second state msg timed out!")
1491 .expect("state sync block sender closed!");
1492 let _ = close_tx.send(());
1493 jh.await
1494 .expect("state sync task panicked!");
1495
1496 let exp1 = StateSyncMessage {
1498 header: BlockHeader {
1499 number: 1,
1500 hash: Bytes::from("0x01"),
1501 parent_hash: Bytes::from("0x00"),
1502 revert: false,
1503 ..Default::default()
1504 },
1505 snapshots: Snapshot {
1506 states: [
1507 (
1508 "Component1".to_string(),
1509 ComponentWithState {
1510 state: ResponseProtocolState {
1511 component_id: "Component1".to_string(),
1512 ..Default::default()
1513 },
1514 component: ProtocolComponent {
1515 id: "Component1".to_string(),
1516 ..Default::default()
1517 },
1518 component_tvl: Some(100.0),
1519 entrypoints: vec![],
1520 },
1521 ),
1522 (
1523 "Component2".to_string(),
1524 ComponentWithState {
1525 state: ResponseProtocolState {
1526 component_id: "Component2".to_string(),
1527 ..Default::default()
1528 },
1529 component: ProtocolComponent {
1530 id: "Component2".to_string(),
1531 ..Default::default()
1532 },
1533 component_tvl: Some(0.0),
1534 entrypoints: vec![],
1535 },
1536 ),
1537 ]
1538 .into_iter()
1539 .collect(),
1540 vm_storage: HashMap::new(),
1541 },
1542 deltas: Some(deltas[0].clone()),
1543 removed_components: Default::default(),
1544 };
1545
1546 let exp2 = StateSyncMessage {
1547 header: BlockHeader {
1548 number: 2,
1549 hash: Bytes::from("0x02"),
1550 parent_hash: Bytes::from("0x01"),
1551 revert: false,
1552 ..Default::default()
1553 },
1554 snapshots: Snapshot {
1555 states: [
1556 (
1558 "Component3".to_string(),
1559 ComponentWithState {
1560 state: ResponseProtocolState {
1561 component_id: "Component3".to_string(),
1562 ..Default::default()
1563 },
1564 component: ProtocolComponent {
1565 id: "Component3".to_string(),
1566 ..Default::default()
1567 },
1568 component_tvl: Some(1000.0),
1569 entrypoints: vec![],
1570 },
1571 ),
1572 ]
1573 .into_iter()
1574 .collect(),
1575 vm_storage: HashMap::new(),
1576 },
1577 deltas: Some(BlockChanges {
1580 extractor: "uniswap-v2".to_string(),
1581 chain: Chain::Ethereum,
1582 block: Block {
1583 number: 2,
1584 hash: Bytes::from("0x02"),
1585 parent_hash: Bytes::from("0x01"),
1586 chain: Chain::Ethereum,
1587 ts: Default::default(),
1588 },
1589 revert: false,
1590 component_tvl: [
1591 ("Component1".to_string(), 100.0),
1593 ("Component3".to_string(), 1000.0),
1594 ]
1595 .into_iter()
1596 .collect(),
1597 ..Default::default()
1598 }),
1599 removed_components: [(
1601 "Component2".to_string(),
1602 ProtocolComponent { id: "Component2".to_string(), ..Default::default() },
1603 )]
1604 .into_iter()
1605 .collect(),
1606 };
1607 assert_eq!(first_msg.unwrap(), exp1);
1608 assert_eq!(second_msg.unwrap(), exp2);
1609 }
1610
1611 #[test(tokio::test)]
1612 async fn test_state_sync_with_tvl_range() {
1613 let remove_tvl_threshold = 5.0;
1615 let add_tvl_threshold = 7.0;
1616
1617 let mut rpc_client = make_mock_client();
1618 let mut deltas_client = MockDeltasClient::new();
1619
1620 rpc_client
1621 .expect_get_protocol_components()
1622 .with(mockall::predicate::function(
1623 move |request_params: &ProtocolComponentsRequestBody| {
1624 if let Some(ids) = request_params.component_ids.as_ref() {
1625 ids.contains(&"Component3".to_string())
1626 } else {
1627 false
1628 }
1629 },
1630 ))
1631 .returning(|_| {
1632 Ok(ProtocolComponentRequestResponse {
1633 protocol_components: vec![ProtocolComponent {
1634 id: "Component3".to_string(),
1635 ..Default::default()
1636 }],
1637 pagination: PaginationResponse { page: 0, page_size: 20, total: 1 },
1638 })
1639 });
1640 rpc_client
1642 .expect_get_snapshots()
1643 .withf(
1644 |request: &SnapshotParameters,
1645 _chunk_size: &Option<usize>,
1646 _concurrency: &usize| {
1647 request
1648 .components
1649 .contains_key("Component3")
1650 },
1651 )
1652 .returning(|_request, _chunk_size, _concurrency| {
1653 Ok(Snapshot {
1654 states: [(
1655 "Component3".to_string(),
1656 ComponentWithState {
1657 state: ResponseProtocolState {
1658 component_id: "Component3".to_string(),
1659 ..Default::default()
1660 },
1661 component: ProtocolComponent {
1662 id: "Component3".to_string(),
1663 ..Default::default()
1664 },
1665 component_tvl: Some(10.0),
1666 entrypoints: vec![],
1667 },
1668 )]
1669 .into_iter()
1670 .collect(),
1671 vm_storage: HashMap::new(),
1672 })
1673 });
1674
1675 rpc_client
1677 .expect_get_protocol_components()
1678 .returning(|_| {
1679 Ok(ProtocolComponentRequestResponse {
1680 protocol_components: vec![
1681 ProtocolComponent { id: "Component1".to_string(), ..Default::default() },
1682 ProtocolComponent { id: "Component2".to_string(), ..Default::default() },
1683 ],
1684 pagination: PaginationResponse { page: 0, page_size: 20, total: 1 },
1685 })
1686 });
1687
1688 rpc_client
1690 .expect_get_snapshots()
1691 .returning(|_request, _chunk_size, _concurrency| {
1692 Ok(Snapshot {
1693 states: [
1694 (
1695 "Component1".to_string(),
1696 ComponentWithState {
1697 state: ResponseProtocolState {
1698 component_id: "Component1".to_string(),
1699 ..Default::default()
1700 },
1701 component: ProtocolComponent {
1702 id: "Component1".to_string(),
1703 ..Default::default()
1704 },
1705 component_tvl: Some(6.0),
1706 entrypoints: vec![],
1707 },
1708 ),
1709 (
1710 "Component2".to_string(),
1711 ComponentWithState {
1712 state: ResponseProtocolState {
1713 component_id: "Component2".to_string(),
1714 ..Default::default()
1715 },
1716 component: ProtocolComponent {
1717 id: "Component2".to_string(),
1718 ..Default::default()
1719 },
1720 component_tvl: Some(2.0),
1721 entrypoints: vec![],
1722 },
1723 ),
1724 ]
1725 .into_iter()
1726 .collect(),
1727 vm_storage: HashMap::new(),
1728 })
1729 });
1730
1731 rpc_client
1733 .expect_get_traced_entry_points()
1734 .returning(|_| {
1735 Ok(TracedEntryPointRequestResponse {
1736 traced_entry_points: HashMap::new(),
1737 pagination: PaginationResponse { page: 0, page_size: 100, total: 0 },
1738 })
1739 });
1740
1741 let (tx, rx) = channel(1);
1742 deltas_client
1743 .expect_subscribe()
1744 .return_once(move |_, _| Ok((Uuid::default(), rx)));
1745
1746 deltas_client
1748 .expect_unsubscribe()
1749 .return_once(|_| Ok(()));
1750
1751 let mut state_sync = ProtocolStateSynchronizer::new(
1752 ExtractorIdentity::new(Chain::Ethereum, "uniswap-v2"),
1753 true,
1754 ComponentFilter::with_tvl_range(remove_tvl_threshold, add_tvl_threshold),
1755 1,
1756 Duration::from_secs(0),
1757 true,
1758 true,
1759 true,
1760 ArcRPCClient(Arc::new(rpc_client)),
1761 ArcDeltasClient(Arc::new(deltas_client)),
1762 10_u64,
1763 );
1764 state_sync
1765 .initialize()
1766 .await
1767 .expect("Init failed");
1768
1769 let deltas = [
1771 BlockChanges {
1772 extractor: "uniswap-v2".to_string(),
1773 chain: Chain::Ethereum,
1774 block: Block {
1775 number: 1,
1776 hash: Bytes::from("0x01"),
1777 parent_hash: Bytes::from("0x00"),
1778 chain: Chain::Ethereum,
1779 ts: Default::default(),
1780 },
1781 revert: false,
1782 ..Default::default()
1783 },
1784 BlockChanges {
1785 extractor: "uniswap-v2".to_string(),
1786 chain: Chain::Ethereum,
1787 block: Block {
1788 number: 2,
1789 hash: Bytes::from("0x02"),
1790 parent_hash: Bytes::from("0x01"),
1791 chain: Chain::Ethereum,
1792 ts: Default::default(),
1793 },
1794 revert: false,
1795 component_tvl: [
1796 ("Component1".to_string(), 6.0), ("Component2".to_string(), 2.0), ("Component3".to_string(), 10.0), ]
1800 .into_iter()
1801 .collect(),
1802 ..Default::default()
1803 },
1804 ];
1805
1806 let (handle, mut rx) = state_sync.start().await;
1807 let (jh, close_tx) = handle.split();
1808
1809 tx.send(deltas[0].clone())
1811 .await
1812 .expect("deltas channel msg 0 closed!");
1813
1814 let _ = timeout(Duration::from_millis(100), rx.recv())
1816 .await
1817 .expect("waiting for first state msg timed out!")
1818 .expect("state sync block sender closed!");
1819
1820 tx.send(deltas[1].clone())
1822 .await
1823 .expect("deltas channel msg 1 closed!");
1824 let second_msg = timeout(Duration::from_millis(100), rx.recv())
1825 .await
1826 .expect("waiting for second state msg timed out!")
1827 .expect("state sync block sender closed!")
1828 .expect("no error");
1829
1830 let _ = close_tx.send(());
1831 jh.await
1832 .expect("state sync task panicked!");
1833
1834 let expected_second_msg = StateSyncMessage {
1835 header: BlockHeader {
1836 number: 2,
1837 hash: Bytes::from("0x02"),
1838 parent_hash: Bytes::from("0x01"),
1839 revert: false,
1840 ..Default::default()
1841 },
1842 snapshots: Snapshot {
1843 states: [(
1844 "Component3".to_string(),
1845 ComponentWithState {
1846 state: ResponseProtocolState {
1847 component_id: "Component3".to_string(),
1848 ..Default::default()
1849 },
1850 component: ProtocolComponent {
1851 id: "Component3".to_string(),
1852 ..Default::default()
1853 },
1854 component_tvl: Some(10.0),
1855 entrypoints: vec![], },
1857 )]
1858 .into_iter()
1859 .collect(),
1860 vm_storage: HashMap::new(),
1861 },
1862 deltas: Some(BlockChanges {
1863 extractor: "uniswap-v2".to_string(),
1864 chain: Chain::Ethereum,
1865 block: Block {
1866 number: 2,
1867 hash: Bytes::from("0x02"),
1868 parent_hash: Bytes::from("0x01"),
1869 chain: Chain::Ethereum,
1870 ts: Default::default(),
1871 },
1872 revert: false,
1873 component_tvl: [
1874 ("Component1".to_string(), 6.0), ("Component3".to_string(), 10.0), ]
1877 .into_iter()
1878 .collect(),
1879 ..Default::default()
1880 }),
1881 removed_components: [(
1882 "Component2".to_string(),
1883 ProtocolComponent { id: "Component2".to_string(), ..Default::default() },
1884 )]
1885 .into_iter()
1886 .collect(),
1887 };
1888
1889 assert_eq!(second_msg, expected_second_msg);
1890 }
1891
1892 #[test(tokio::test)]
1893 async fn test_public_close_api_functionality() {
1894 let mut rpc_client = make_mock_client();
1901 let mut deltas_client = MockDeltasClient::new();
1902
1903 rpc_client
1905 .expect_get_protocol_components()
1906 .returning(|_| {
1907 Ok(ProtocolComponentRequestResponse {
1908 protocol_components: vec![],
1909 pagination: PaginationResponse { page: 0, page_size: 20, total: 0 },
1910 })
1911 });
1912
1913 let (_tx, rx) = channel(1);
1915 deltas_client
1916 .expect_subscribe()
1917 .return_once(move |_, _| Ok((Uuid::default(), rx)));
1918
1919 deltas_client
1921 .expect_unsubscribe()
1922 .return_once(|_| Ok(()));
1923
1924 let mut state_sync = ProtocolStateSynchronizer::new(
1925 ExtractorIdentity::new(Chain::Ethereum, "test-protocol"),
1926 true,
1927 ComponentFilter::with_tvl_range(0.0, 0.0),
1928 5, Duration::from_secs(0),
1930 true,
1931 false,
1932 true,
1933 ArcRPCClient(Arc::new(rpc_client)),
1934 ArcDeltasClient(Arc::new(deltas_client)),
1935 10000_u64, );
1937
1938 state_sync
1939 .initialize()
1940 .await
1941 .expect("Init should succeed");
1942
1943 let (handle, _rx) = state_sync.start().await;
1945 let (jh, close_tx) = handle.split();
1946
1947 tokio::time::sleep(Duration::from_millis(100)).await;
1949
1950 close_tx
1952 .send(())
1953 .expect("Should be able to send close signal");
1954 jh.await.expect("Task should not panic");
1956 }
1957
1958 #[test(tokio::test)]
1959 async fn test_cleanup_runs_when_state_sync_processing_errors() {
1960 let mut rpc_client = make_mock_client();
1965 let mut deltas_client = MockDeltasClient::new();
1966
1967 rpc_client
1969 .expect_get_protocol_components()
1970 .returning(|_| {
1971 Ok(ProtocolComponentRequestResponse {
1972 protocol_components: vec![],
1973 pagination: PaginationResponse { page: 0, page_size: 20, total: 0 },
1974 })
1975 });
1976
1977 rpc_client
1979 .expect_get_protocol_states()
1980 .returning(|_| {
1981 Err(RPCError::ParseResponse("Test error during snapshot retrieval".to_string()))
1982 });
1983
1984 let (tx, rx) = channel(10);
1986 deltas_client
1987 .expect_subscribe()
1988 .return_once(move |_, _| {
1989 let delta = BlockChanges {
1991 extractor: "test".to_string(),
1992 chain: Chain::Ethereum,
1993 block: Block {
1994 hash: Bytes::from("0x0123"),
1995 number: 1,
1996 parent_hash: Bytes::from("0x0000"),
1997 chain: Chain::Ethereum,
1998 ts: chrono::DateTime::from_timestamp(1234567890, 0)
1999 .unwrap()
2000 .naive_utc(),
2001 },
2002 revert: false,
2003 new_protocol_components: [(
2005 "new_component".to_string(),
2006 ProtocolComponent {
2007 id: "new_component".to_string(),
2008 protocol_system: "test_protocol".to_string(),
2009 protocol_type_name: "test".to_string(),
2010 chain: Chain::Ethereum,
2011 tokens: vec![Bytes::from("0x0badc0ffee")],
2012 contract_ids: vec![Bytes::from("0x0badc0ffee")],
2013 static_attributes: Default::default(),
2014 creation_tx: Default::default(),
2015 created_at: Default::default(),
2016 change: Default::default(),
2017 },
2018 )]
2019 .into_iter()
2020 .collect(),
2021 component_tvl: [("new_component".to_string(), 100.0)]
2022 .into_iter()
2023 .collect(),
2024 ..Default::default()
2025 };
2026
2027 tokio::spawn(async move {
2028 let _ = tx.send(delta).await;
2029 });
2031
2032 Ok((Uuid::default(), rx))
2033 });
2034
2035 deltas_client
2037 .expect_unsubscribe()
2038 .return_once(|_| Ok(()));
2039
2040 let mut state_sync = ProtocolStateSynchronizer::new(
2041 ExtractorIdentity::new(Chain::Ethereum, "test-protocol"),
2042 true,
2043 ComponentFilter::with_tvl_range(0.0, 1000.0), 1,
2045 Duration::from_secs(0),
2046 true,
2047 false,
2048 true,
2049 ArcRPCClient(Arc::new(rpc_client)),
2050 ArcDeltasClient(Arc::new(deltas_client)),
2051 5000_u64,
2052 );
2053
2054 state_sync
2055 .initialize()
2056 .await
2057 .expect("Init should succeed");
2058
2059 state_sync.last_synced_block = Some(BlockHeader {
2061 hash: Bytes::from("0x0badc0ffee"),
2062 number: 42,
2063 parent_hash: Bytes::from("0xbadbeef0"),
2064 revert: false,
2065 timestamp: 123456789,
2066 });
2067
2068 let (mut block_tx, _block_rx) = channel(10);
2070
2071 let (_end_tx, end_rx) = oneshot::channel::<()>();
2073 let result = state_sync
2074 .state_sync(&mut block_tx, end_rx)
2075 .await;
2076 assert!(result.is_err(), "state_sync should have errored during processing");
2078
2079 }
2082
2083 #[test(tokio::test)]
2084 async fn test_close_signal_while_waiting_for_first_deltas() {
2085 let mut rpc_client = make_mock_client();
2089 let mut deltas_client = MockDeltasClient::new();
2090
2091 rpc_client
2092 .expect_get_protocol_components()
2093 .returning(|_| {
2094 Ok(ProtocolComponentRequestResponse {
2095 protocol_components: vec![],
2096 pagination: PaginationResponse { page: 0, page_size: 20, total: 0 },
2097 })
2098 });
2099
2100 let (_tx, rx) = channel(1);
2101 deltas_client
2102 .expect_subscribe()
2103 .return_once(move |_, _| Ok((Uuid::default(), rx)));
2104
2105 deltas_client
2106 .expect_unsubscribe()
2107 .return_once(|_| Ok(()));
2108
2109 let mut state_sync = ProtocolStateSynchronizer::new(
2110 ExtractorIdentity::new(Chain::Ethereum, "test-protocol"),
2111 true,
2112 ComponentFilter::with_tvl_range(0.0, 0.0),
2113 1,
2114 Duration::from_secs(0),
2115 true,
2116 true,
2117 false,
2118 ArcRPCClient(Arc::new(rpc_client)),
2119 ArcDeltasClient(Arc::new(deltas_client)),
2120 10000_u64,
2121 );
2122
2123 state_sync
2124 .initialize()
2125 .await
2126 .expect("Init should succeed");
2127
2128 let (mut block_tx, _block_rx) = channel(10);
2129 let (end_tx, end_rx) = oneshot::channel::<()>();
2130
2131 let state_sync_handle = tokio::spawn(async move {
2133 state_sync
2134 .state_sync(&mut block_tx, end_rx)
2135 .await
2136 });
2137
2138 tokio::time::sleep(Duration::from_millis(100)).await;
2140
2141 let _ = end_tx.send(());
2143
2144 let result = state_sync_handle
2146 .await
2147 .expect("Task should not panic");
2148 assert!(result.is_ok(), "state_sync should exit cleanly when closed: {result:?}");
2149
2150 println!("SUCCESS: Close signal handled correctly while waiting for first deltas");
2151 }
2152
2153 #[test(tokio::test)]
2154 async fn test_close_signal_during_main_processing_loop() {
2155 let mut rpc_client = make_mock_client();
2161 let mut deltas_client = MockDeltasClient::new();
2162
2163 rpc_client
2165 .expect_get_protocol_components()
2166 .returning(|_| {
2167 Ok(ProtocolComponentRequestResponse {
2168 protocol_components: vec![],
2169 pagination: PaginationResponse { page: 0, page_size: 20, total: 0 },
2170 })
2171 });
2172
2173 rpc_client
2175 .expect_get_protocol_states()
2176 .returning(|_| {
2177 Ok(ProtocolStateRequestResponse {
2178 states: vec![],
2179 pagination: PaginationResponse { page: 0, page_size: 20, total: 0 },
2180 })
2181 });
2182
2183 rpc_client
2184 .expect_get_component_tvl()
2185 .returning(|_| {
2186 Ok(ComponentTvlRequestResponse {
2187 tvl: HashMap::new(),
2188 pagination: PaginationResponse { page: 0, page_size: 20, total: 0 },
2189 })
2190 });
2191
2192 rpc_client
2193 .expect_get_traced_entry_points()
2194 .returning(|_| {
2195 Ok(TracedEntryPointRequestResponse {
2196 traced_entry_points: HashMap::new(),
2197 pagination: PaginationResponse::new(0, 20, 0),
2198 })
2199 });
2200
2201 let (tx, rx) = channel(10);
2203 deltas_client
2204 .expect_subscribe()
2205 .return_once(move |_, _| {
2206 let first_delta = BlockChanges {
2208 extractor: "test".to_string(),
2209 chain: Chain::Ethereum,
2210 block: Block {
2211 hash: Bytes::from("0x0123"),
2212 number: 1,
2213 parent_hash: Bytes::from("0x0000"),
2214 chain: Chain::Ethereum,
2215 ts: chrono::DateTime::from_timestamp(1234567890, 0)
2216 .unwrap()
2217 .naive_utc(),
2218 },
2219 revert: false,
2220 ..Default::default()
2221 };
2222
2223 tokio::spawn(async move {
2224 let _ = tx.send(first_delta).await;
2225 tokio::time::sleep(Duration::from_secs(30)).await;
2228 });
2229
2230 Ok((Uuid::default(), rx))
2231 });
2232
2233 deltas_client
2234 .expect_unsubscribe()
2235 .return_once(|_| Ok(()));
2236
2237 let mut state_sync = ProtocolStateSynchronizer::new(
2238 ExtractorIdentity::new(Chain::Ethereum, "test-protocol"),
2239 true,
2240 ComponentFilter::with_tvl_range(0.0, 1000.0),
2241 1,
2242 Duration::from_secs(0),
2243 true,
2244 false,
2245 true,
2246 ArcRPCClient(Arc::new(rpc_client)),
2247 ArcDeltasClient(Arc::new(deltas_client)),
2248 10000_u64,
2249 );
2250
2251 state_sync
2252 .initialize()
2253 .await
2254 .expect("Init should succeed");
2255
2256 let (mut block_tx, mut block_rx) = channel(10);
2257 let (end_tx, end_rx) = oneshot::channel::<()>();
2258
2259 let state_sync_handle = tokio::spawn(async move {
2261 state_sync
2262 .state_sync(&mut block_tx, end_rx)
2263 .await
2264 });
2265
2266 let first_snapshot = block_rx
2268 .recv()
2269 .await
2270 .expect("Should receive first snapshot")
2271 .expect("Synchronizer error");
2272 assert!(
2273 !first_snapshot
2274 .snapshots
2275 .states
2276 .is_empty() ||
2277 first_snapshot.deltas.is_some()
2278 );
2279 let _ = end_tx.send(());
2281
2282 let result = state_sync_handle
2284 .await
2285 .expect("Task should not panic");
2286 assert!(
2287 result.is_ok(),
2288 "state_sync should exit cleanly when closed after first message: {result:?}"
2289 );
2290 }
2291
2292 #[test(tokio::test)]
2293 async fn test_max_retries_exceeded_error_propagation() {
2294 let mut rpc_client = make_mock_client();
2298 let mut deltas_client = MockDeltasClient::new();
2299
2300 rpc_client
2302 .expect_get_protocol_components()
2303 .returning(|_| {
2304 Ok(ProtocolComponentRequestResponse {
2305 protocol_components: vec![],
2306 pagination: PaginationResponse { page: 0, page_size: 20, total: 0 },
2307 })
2308 });
2309
2310 deltas_client
2313 .expect_subscribe()
2314 .returning(|_, _| {
2315 Err(DeltasError::NotConnected)
2317 });
2318
2319 deltas_client
2321 .expect_unsubscribe()
2322 .returning(|_| Ok(()))
2323 .times(0..=5);
2324
2325 let mut state_sync = ProtocolStateSynchronizer::new(
2327 ExtractorIdentity::new(Chain::Ethereum, "test-protocol"),
2328 true,
2329 ComponentFilter::with_tvl_range(0.0, 1000.0),
2330 2, Duration::from_millis(10), true,
2333 false,
2334 true,
2335 ArcRPCClient(Arc::new(rpc_client)),
2336 ArcDeltasClient(Arc::new(deltas_client)),
2337 1000_u64,
2338 );
2339
2340 state_sync
2341 .initialize()
2342 .await
2343 .expect("Init should succeed");
2344
2345 let (handle, mut rx) = state_sync.start().await;
2347 let (jh, _close_tx) = handle.split();
2348
2349 let res = tokio::time::timeout(Duration::from_millis(100), rx.recv())
2350 .await
2351 .expect("responsds in time")
2352 .expect("channel open");
2353
2354 if let Err(err) = res {
2356 assert!(
2357 matches!(err, SynchronizerError::ConnectionClosed),
2358 "Expected ConnectionClosed error, got: {:?}",
2359 err
2360 );
2361 } else {
2362 panic!("Expected an error")
2363 }
2364
2365 let task_result = tokio::time::timeout(Duration::from_secs(2), jh).await;
2367 assert!(task_result.is_ok(), "Synchronizer task should complete after max retries");
2368 }
2369
2370 #[test(tokio::test)]
2371 async fn test_is_next_expected() {
2372 let mut state_sync = with_mocked_clients(true, false, None, None);
2376
2377 let incoming_header = BlockHeader {
2379 number: 100,
2380 hash: Bytes::from("0x0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"),
2381 parent_hash: Bytes::from(
2382 "0x0000000000000000000000000000000000000000000000000000000000000000",
2383 ),
2384 revert: false,
2385 timestamp: 123456789,
2386 };
2387 assert!(
2388 !state_sync.is_next_expected(&incoming_header),
2389 "Should return false when no previous block is set"
2390 );
2391
2392 let previous_header = BlockHeader {
2394 number: 99,
2395 hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000000"),
2396 parent_hash: Bytes::from(
2397 "0xabcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789",
2398 ),
2399 revert: false,
2400 timestamp: 123456788,
2401 };
2402 state_sync.last_synced_block = Some(previous_header.clone());
2403
2404 assert!(
2405 state_sync.is_next_expected(&incoming_header),
2406 "Should return true when incoming parent_hash matches previous hash"
2407 );
2408
2409 let non_matching_header = BlockHeader {
2411 number: 100,
2412 hash: Bytes::from("0x0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"),
2413 parent_hash: Bytes::from(
2414 "0x1111111111111111111111111111111111111111111111111111111111111111",
2415 ), revert: false,
2417 timestamp: 123456789,
2418 };
2419 assert!(
2420 !state_sync.is_next_expected(&non_matching_header),
2421 "Should return false when incoming parent_hash doesn't match previous hash"
2422 );
2423 }
2424
2425 #[test(tokio::test)]
2426 async fn test_synchronizer_restart_skip_snapshot_on_expected_block() {
2427 let mut rpc_client = make_mock_client();
2431 let mut deltas_client = MockDeltasClient::new();
2432
2433 rpc_client
2435 .expect_get_protocol_components()
2436 .returning(|_| {
2437 Ok(ProtocolComponentRequestResponse {
2438 protocol_components: vec![ProtocolComponent {
2439 id: "Component1".to_string(),
2440 ..Default::default()
2441 }],
2442 pagination: PaginationResponse { page: 0, page_size: 20, total: 1 },
2443 })
2444 });
2445
2446 let (tx, rx) = channel(10);
2448 deltas_client
2449 .expect_subscribe()
2450 .return_once(move |_, _| {
2451 let expected_next_delta = BlockChanges {
2452 extractor: "uniswap-v2".to_string(),
2453 chain: Chain::Ethereum,
2454 block: Block {
2455 hash: Bytes::from(
2456 "0x0000000000000000000000000000000000000000000000000000000000000002",
2457 ), number: 2,
2459 parent_hash: Bytes::from(
2460 "0x0000000000000000000000000000000000000000000000000000000000000001",
2461 ), chain: Chain::Ethereum,
2463 ts: chrono::DateTime::from_timestamp(1234567890, 0)
2464 .unwrap()
2465 .naive_utc(),
2466 },
2467 revert: false,
2468 ..Default::default()
2469 };
2470
2471 tokio::spawn(async move {
2472 let _ = tx.send(expected_next_delta).await;
2473 });
2474
2475 Ok((Uuid::default(), rx))
2476 });
2477
2478 deltas_client
2479 .expect_unsubscribe()
2480 .return_once(|_| Ok(()));
2481
2482 let mut state_sync = ProtocolStateSynchronizer::new(
2483 ExtractorIdentity::new(Chain::Ethereum, "uniswap-v2"),
2484 true,
2485 ComponentFilter::with_tvl_range(0.0, 1000.0),
2486 1,
2487 Duration::from_secs(0),
2488 true, false,
2490 true,
2491 ArcRPCClient(Arc::new(rpc_client)),
2492 ArcDeltasClient(Arc::new(deltas_client)),
2493 10000_u64,
2494 );
2495
2496 state_sync
2498 .initialize()
2499 .await
2500 .expect("Init should succeed");
2501
2502 state_sync.last_synced_block = Some(BlockHeader {
2504 number: 1,
2505 hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000001"), parent_hash: Bytes::from(
2507 "0x0000000000000000000000000000000000000000000000000000000000000000",
2508 ),
2509 revert: false,
2510 timestamp: 123456789,
2511 });
2512
2513 let (mut block_tx, mut block_rx) = channel(10);
2514 let (end_tx, end_rx) = oneshot::channel::<()>();
2515
2516 let state_sync_handle = tokio::spawn(async move {
2518 state_sync
2519 .state_sync(&mut block_tx, end_rx)
2520 .await
2521 });
2522
2523 let result_msg = timeout(Duration::from_millis(200), block_rx.recv())
2525 .await
2526 .expect("Should receive message within timeout")
2527 .expect("Channel should be open")
2528 .expect("Should not be an error");
2529
2530 let _ = end_tx.send(());
2532
2533 let _ = state_sync_handle
2535 .await
2536 .expect("Task should not panic");
2537
2538 assert!(result_msg.deltas.is_some(), "Should contain deltas");
2541 assert!(
2542 result_msg.snapshots.states.is_empty(),
2543 "Should not contain snapshots when next expected block is received"
2544 );
2545
2546 if let Some(deltas) = &result_msg.deltas {
2548 assert_eq!(deltas.block.number, 2);
2549 assert_eq!(
2550 deltas.block.hash,
2551 Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000002")
2552 );
2553 assert_eq!(
2554 deltas.block.parent_hash,
2555 Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000001")
2556 );
2557 }
2558 }
2559
2560 #[test(tokio::test)]
2561 async fn test_skip_previously_processed_messages() {
2562 let mut rpc_client = make_mock_client();
2566 let mut deltas_client = MockDeltasClient::new();
2567
2568 rpc_client
2570 .expect_get_protocol_components()
2571 .returning(|_| {
2572 Ok(ProtocolComponentRequestResponse {
2573 protocol_components: vec![ProtocolComponent {
2574 id: "Component1".to_string(),
2575 ..Default::default()
2576 }],
2577 pagination: PaginationResponse { page: 0, page_size: 20, total: 1 },
2578 })
2579 });
2580
2581 rpc_client
2583 .expect_get_protocol_states()
2584 .returning(|_| {
2585 Ok(ProtocolStateRequestResponse {
2586 states: vec![ResponseProtocolState {
2587 component_id: "Component1".to_string(),
2588 ..Default::default()
2589 }],
2590 pagination: PaginationResponse { page: 0, page_size: 20, total: 1 },
2591 })
2592 });
2593
2594 rpc_client
2595 .expect_get_component_tvl()
2596 .returning(|_| {
2597 Ok(ComponentTvlRequestResponse {
2598 tvl: [("Component1".to_string(), 100.0)]
2599 .into_iter()
2600 .collect(),
2601 pagination: PaginationResponse { page: 0, page_size: 20, total: 1 },
2602 })
2603 });
2604
2605 rpc_client
2606 .expect_get_traced_entry_points()
2607 .returning(|_| {
2608 Ok(TracedEntryPointRequestResponse {
2609 traced_entry_points: HashMap::new(),
2610 pagination: PaginationResponse::new(0, 20, 0),
2611 })
2612 });
2613
2614 let (tx, rx) = channel(10);
2616 deltas_client
2617 .expect_subscribe()
2618 .return_once(move |_, _| {
2619 let old_messages = vec![
2621 BlockChanges {
2622 extractor: "uniswap-v2".to_string(),
2623 chain: Chain::Ethereum,
2624 block: Block {
2625 hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000003"),
2626 number: 3,
2627 parent_hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000002"),
2628 chain: Chain::Ethereum,
2629 ts: chrono::DateTime::from_timestamp(1234567890, 0).unwrap().naive_utc(),
2630 },
2631 revert: false,
2632 ..Default::default()
2633 },
2634 BlockChanges {
2635 extractor: "uniswap-v2".to_string(),
2636 chain: Chain::Ethereum,
2637 block: Block {
2638 hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000004"),
2639 number: 4,
2640 parent_hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000003"),
2641 chain: Chain::Ethereum,
2642 ts: chrono::DateTime::from_timestamp(1234567891, 0).unwrap().naive_utc(),
2643 },
2644 revert: false,
2645 ..Default::default()
2646 },
2647 BlockChanges {
2648 extractor: "uniswap-v2".to_string(),
2649 chain: Chain::Ethereum,
2650 block: Block {
2651 hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000005"),
2652 number: 5,
2653 parent_hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000004"),
2654 chain: Chain::Ethereum,
2655 ts: chrono::DateTime::from_timestamp(1234567892, 0).unwrap().naive_utc(),
2656 },
2657 revert: false,
2658 ..Default::default()
2659 },
2660 BlockChanges {
2662 extractor: "uniswap-v2".to_string(),
2663 chain: Chain::Ethereum,
2664 block: Block {
2665 hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000006"),
2666 number: 6,
2667 parent_hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000005"),
2668 chain: Chain::Ethereum,
2669 ts: chrono::DateTime::from_timestamp(1234567893, 0).unwrap().naive_utc(),
2670 },
2671 revert: false,
2672 ..Default::default()
2673 },
2674 ];
2675
2676 tokio::spawn(async move {
2677 for message in old_messages {
2678 let _ = tx.send(message).await;
2679 tokio::time::sleep(Duration::from_millis(10)).await;
2680 }
2681 });
2682
2683 Ok((Uuid::default(), rx))
2684 });
2685
2686 deltas_client
2687 .expect_unsubscribe()
2688 .return_once(|_| Ok(()));
2689
2690 let mut state_sync = ProtocolStateSynchronizer::new(
2691 ExtractorIdentity::new(Chain::Ethereum, "uniswap-v2"),
2692 true,
2693 ComponentFilter::with_tvl_range(0.0, 1000.0),
2694 1,
2695 Duration::from_secs(0),
2696 true,
2697 true,
2698 true,
2699 ArcRPCClient(Arc::new(rpc_client)),
2700 ArcDeltasClient(Arc::new(deltas_client)),
2701 10000_u64,
2702 );
2703
2704 state_sync
2706 .initialize()
2707 .await
2708 .expect("Init should succeed");
2709
2710 state_sync.last_synced_block = Some(BlockHeader {
2711 number: 5,
2712 hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000005"),
2713 parent_hash: Bytes::from(
2714 "0x0000000000000000000000000000000000000000000000000000000000000004",
2715 ),
2716 revert: false,
2717 timestamp: 1234567892,
2718 });
2719
2720 let (mut block_tx, mut block_rx) = channel(10);
2721 let (end_tx, end_rx) = oneshot::channel::<()>();
2722
2723 let state_sync_handle = tokio::spawn(async move {
2725 state_sync
2726 .state_sync(&mut block_tx, end_rx)
2727 .await
2728 });
2729
2730 let result_msg = timeout(Duration::from_millis(500), block_rx.recv())
2732 .await
2733 .expect("Should receive message within timeout")
2734 .expect("Channel should be open")
2735 .expect("Should not be an error");
2736
2737 let _ = end_tx.send(());
2739
2740 let _ = state_sync_handle
2742 .await
2743 .expect("Task should not panic");
2744
2745 assert!(result_msg.deltas.is_some(), "Should contain deltas");
2747 if let Some(deltas) = &result_msg.deltas {
2748 assert_eq!(
2749 deltas.block.number, 6,
2750 "Should only process block 6, skipping earlier blocks"
2751 );
2752 assert_eq!(
2753 deltas.block.hash,
2754 Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000006")
2755 );
2756 }
2757
2758 match timeout(Duration::from_millis(50), block_rx.recv()).await {
2761 Err(_) => {
2762 }
2764 Ok(Some(Err(_))) => {
2765 }
2767 Ok(Some(Ok(_))) => {
2768 panic!("Should not receive additional messages - old blocks should be skipped");
2769 }
2770 Ok(None) => {
2771 }
2773 }
2774 }
2775}