1use std::{collections::HashMap, time::Duration};
2
3use async_trait::async_trait;
4use serde::{Deserialize, Serialize};
5use thiserror::Error;
6use tokio::{
7 select,
8 sync::{
9 mpsc::{channel, error::SendError, Receiver, Sender},
10 oneshot,
11 },
12 task::JoinHandle,
13 time::{sleep, timeout},
14};
15use tracing::{debug, error, info, instrument, trace, warn};
16use tycho_common::{
17 dto::{
18 BlockChanges, EntryPointWithTracingParams, ExtractorIdentity, ProtocolComponent,
19 ResponseAccount, ResponseProtocolState, TracingResult,
20 },
21 Bytes,
22};
23
24use crate::{
25 deltas::{DeltasClient, SubscriptionOptions},
26 feed::{
27 component_tracker::{ComponentFilter, ComponentTracker},
28 BlockHeader, HeaderLike,
29 },
30 rpc::{RPCClient, RPCError, SnapshotParameters, RPC_CLIENT_CONCURRENCY},
31 DeltasError,
32};
33
34#[derive(Error, Debug)]
35pub enum SynchronizerError {
36 #[error("RPC error: {0}")]
38 RPCError(#[from] RPCError),
39
40 #[error("{0}")]
42 ChannelError(String),
43
44 #[error("Timeout error: {0}")]
46 Timeout(String),
47
48 #[error("Failed to close synchronizer: {0}")]
50 CloseError(String),
51
52 #[error("Connection error: {0}")]
54 ConnectionError(String),
55
56 #[error("Connection closed")]
58 ConnectionClosed,
59
60 #[error("Internal error: {0}")]
62 Internal(String),
63}
64
65pub type SyncResult<T> = Result<T, SynchronizerError>;
66
67impl<T> From<SendError<T>> for SynchronizerError {
68 fn from(err: SendError<T>) -> Self {
69 SynchronizerError::ChannelError(format!("Failed to send message: {err}"))
70 }
71}
72
73impl From<DeltasError> for SynchronizerError {
74 fn from(err: DeltasError) -> Self {
75 match err {
76 DeltasError::NotConnected => SynchronizerError::ConnectionClosed,
77 _ => SynchronizerError::ConnectionError(err.to_string()),
78 }
79 }
80}
81
82pub struct ProtocolStateSynchronizer<R: RPCClient, D: DeltasClient> {
83 extractor_id: ExtractorIdentity,
84 retrieve_balances: bool,
85 rpc_client: R,
86 deltas_client: D,
87 max_retries: u64,
88 retry_cooldown: Duration,
89 include_snapshots: bool,
90 component_tracker: ComponentTracker<R>,
91 last_synced_block: Option<BlockHeader>,
92 timeout: u64,
93 include_tvl: bool,
94 compression: bool,
95 partial_blocks: bool,
96}
97
98#[derive(Clone, PartialEq, Debug, Serialize, Deserialize)]
99pub struct ComponentWithState {
100 pub state: ResponseProtocolState,
101 pub component: ProtocolComponent,
102 pub component_tvl: Option<f64>,
103 pub entrypoints: Vec<(EntryPointWithTracingParams, TracingResult)>,
104}
105
106#[derive(Clone, PartialEq, Debug, Default, Serialize, Deserialize)]
107pub struct Snapshot {
108 pub states: HashMap<String, ComponentWithState>,
109 pub vm_storage: HashMap<Bytes, ResponseAccount>,
110}
111
112impl Snapshot {
113 fn extend(&mut self, other: Snapshot) {
114 self.states.extend(other.states);
115 self.vm_storage.extend(other.vm_storage);
116 }
117
118 pub fn get_states(&self) -> &HashMap<String, ComponentWithState> {
119 &self.states
120 }
121
122 pub fn get_vm_storage(&self) -> &HashMap<Bytes, ResponseAccount> {
123 &self.vm_storage
124 }
125}
126
127#[derive(Clone, PartialEq, Debug, Default, Serialize, Deserialize)]
128pub struct StateSyncMessage<H>
129where
130 H: HeaderLike,
131{
132 pub header: H,
134 pub snapshots: Snapshot,
136 pub deltas: Option<BlockChanges>,
140 pub removed_components: HashMap<String, ProtocolComponent>,
142}
143
144impl<H> StateSyncMessage<H>
145where
146 H: HeaderLike,
147{
148 pub fn merge(mut self, other: Self) -> Self {
149 self.removed_components
151 .retain(|k, _| !other.snapshots.states.contains_key(k));
152 self.snapshots
153 .states
154 .retain(|k, _| !other.removed_components.contains_key(k));
155
156 self.snapshots.extend(other.snapshots);
157 let deltas = match (self.deltas, other.deltas) {
158 (Some(l), Some(r)) => Some(l.merge(r)),
159 (None, Some(r)) => Some(r),
160 (Some(l), None) => Some(l),
161 (None, None) => None,
162 };
163 self.removed_components
164 .extend(other.removed_components);
165 Self {
166 header: other.header,
167 snapshots: self.snapshots,
168 deltas,
169 removed_components: self.removed_components,
170 }
171 }
172}
173
174pub struct SynchronizerTaskHandle {
179 join_handle: JoinHandle<()>,
180 close_tx: oneshot::Sender<()>,
181}
182
183impl SynchronizerTaskHandle {
192 pub fn new(join_handle: JoinHandle<()>, close_tx: oneshot::Sender<()>) -> Self {
193 Self { join_handle, close_tx }
194 }
195
196 pub fn split(self) -> (JoinHandle<()>, oneshot::Sender<()>) {
202 (self.join_handle, self.close_tx)
203 }
204}
205
206#[async_trait]
207pub trait StateSynchronizer: Send + Sync + 'static {
208 async fn initialize(&mut self) -> SyncResult<()>;
209 async fn start(
212 mut self,
213 ) -> (SynchronizerTaskHandle, Receiver<SyncResult<StateSyncMessage<BlockHeader>>>);
214}
215
216impl<R, D> ProtocolStateSynchronizer<R, D>
217where
218 R: RPCClient + Clone + Send + Sync + 'static,
221 D: DeltasClient + Clone + Send + Sync + 'static,
222{
223 #[allow(clippy::too_many_arguments)]
225 pub fn new(
226 extractor_id: ExtractorIdentity,
227 retrieve_balances: bool,
228 component_filter: ComponentFilter,
229 max_retries: u64,
230 retry_cooldown: Duration,
231 include_snapshots: bool,
232 include_tvl: bool,
233 compression: bool,
234 rpc_client: R,
235 deltas_client: D,
236 timeout: u64,
237 ) -> Self {
238 Self {
239 extractor_id: extractor_id.clone(),
240 retrieve_balances,
241 rpc_client: rpc_client.clone(),
242 include_snapshots,
243 deltas_client,
244 component_tracker: ComponentTracker::new(
245 extractor_id.chain,
246 extractor_id.name.as_str(),
247 component_filter,
248 rpc_client,
249 ),
250 max_retries,
251 retry_cooldown,
252 last_synced_block: None,
253 timeout,
254 include_tvl,
255 compression,
256 partial_blocks: false,
257 }
258 }
259
260 pub fn with_partial_blocks(mut self, partial_blocks: bool) -> Self {
262 self.partial_blocks = partial_blocks;
263 self
264 }
265
266 async fn get_snapshots<'a, I: IntoIterator<Item = &'a String>>(
268 &mut self,
269 header: BlockHeader,
270 ids: Option<I>,
271 ) -> SyncResult<StateSyncMessage<BlockHeader>> {
272 if !self.include_snapshots {
273 return Ok(StateSyncMessage { header, ..Default::default() });
274 }
275
276 let component_ids: Vec<_> = match ids {
278 Some(ids) => ids.into_iter().cloned().collect(),
279 None => self
280 .component_tracker
281 .get_tracked_component_ids(),
282 };
283
284 if component_ids.is_empty() {
285 return Ok(StateSyncMessage { header, ..Default::default() });
286 }
287
288 const DCI_PROTOCOLS: &[&str] = &[
290 "uniswap_v4_hooks",
291 "vm:curve",
292 "vm:balancer_v2",
293 "vm:balancer_v3",
294 "fluid_v1",
295 "erc4626",
296 ];
297 let entrypoints_result = if DCI_PROTOCOLS.contains(&self.extractor_id.name.as_str()) {
298 let result = self
299 .rpc_client
300 .get_traced_entry_points_paginated(
301 self.extractor_id.chain,
302 &self.extractor_id.name,
303 &component_ids,
304 None,
305 RPC_CLIENT_CONCURRENCY,
306 )
307 .await?;
308 self.component_tracker
309 .process_entrypoints(&result.clone().into());
310 result.traced_entry_points.clone()
311 } else {
312 HashMap::new()
313 };
314
315 let contract_ids: Vec<Bytes> = self
317 .component_tracker
318 .get_contracts_by_component(&component_ids)
319 .into_iter()
320 .collect();
321
322 let request = SnapshotParameters::new(
323 self.extractor_id.chain,
324 &self.extractor_id.name,
325 &self.component_tracker.components,
326 &contract_ids,
327 header.number,
328 )
329 .entrypoints(&entrypoints_result)
330 .include_balances(self.retrieve_balances)
331 .include_tvl(self.include_tvl);
332 let snapshot_response = self
333 .rpc_client
334 .get_snapshots(&request, None, RPC_CLIENT_CONCURRENCY)
335 .await?;
336
337 trace!(states=?&snapshot_response.states, "Retrieved ProtocolStates");
338 trace!(contract_states=?&snapshot_response.vm_storage, "Retrieved ContractState");
339
340 Ok(StateSyncMessage {
341 header,
342 snapshots: snapshot_response,
343 deltas: None,
344 removed_components: HashMap::new(),
345 })
346 }
347
348 #[instrument(skip(self, block_tx, end_rx), fields(extractor_id = %self.extractor_id))]
361 async fn state_sync(
362 &mut self,
363 block_tx: &mut Sender<SyncResult<StateSyncMessage<BlockHeader>>>,
364 mut end_rx: oneshot::Receiver<()>,
365 ) -> Result<(), (SynchronizerError, Option<oneshot::Receiver<()>>)> {
366 let subscription_options = SubscriptionOptions::new()
368 .with_state(self.include_snapshots)
369 .with_compression(self.compression)
370 .with_partial_blocks(self.partial_blocks);
371 let (subscription_id, mut msg_rx) = match self
372 .deltas_client
373 .subscribe(self.extractor_id.clone(), subscription_options)
374 .await
375 {
376 Ok(result) => result,
377 Err(e) => return Err((e.into(), Some(end_rx))),
378 };
379
380 let result = async {
381 info!("Waiting for deltas...");
382 let mut warned_waiting_for_new_block = false;
383 let mut warned_skipping_synced = false;
384 let mut last_block_number: Option<u64> = None;
386 let mut first_msg = loop {
387 let msg = select! {
388 deltas_result = timeout(Duration::from_secs(self.timeout), msg_rx.recv()) => {
389 deltas_result
390 .map_err(|_| {
391 SynchronizerError::Timeout(format!(
392 "First deltas took longer than {t}s to arrive",
393 t = self.timeout
394 ))
395 })?
396 .ok_or_else(|| {
397 SynchronizerError::ConnectionError(
398 "Deltas channel closed before first message".to_string(),
399 )
400 })?
401 },
402 _ = &mut end_rx => {
403 info!("Received close signal while waiting for first deltas");
404 return Ok(());
405 }
406 };
407
408 let incoming: BlockHeader = (&msg).into();
409
410 let is_new_block_candidate = if self.partial_blocks {
414 match msg.partial_block_index {
415 None => {
416 last_block_number = Some(incoming.number);
418 true
419 }
420 Some(current_partial_idx) => {
421 let is_new_block = last_block_number
422 .map(|prev_block| incoming.number > prev_block)
423 .unwrap_or(false);
424
425 if !warned_waiting_for_new_block {
426 info!(
427 extractor=%self.extractor_id,
428 block=incoming.number,
429 partial_idx=current_partial_idx,
430 "Syncing. Waiting for new block to start"
431 );
432 warned_waiting_for_new_block = true;
433 }
434 last_block_number = Some(incoming.number);
435 is_new_block
436 }
437 }
438 } else {
439 true };
441
442 if !is_new_block_candidate {
443 continue;
444 }
445
446 if let Some(current) = &self.last_synced_block {
448 if current.number >= incoming.number && !self.is_next_expected(&incoming) {
449 if !warned_skipping_synced {
450 info!(extractor=%self.extractor_id, from=incoming.number, to=current.number, "Syncing. Skipping already synced block");
451 warned_skipping_synced = true;
452 }
453 continue;
454 }
455 }
456 break msg;
457 };
458
459 self.filter_deltas(&mut first_msg);
460
461 info!(height = first_msg.get_block().number, "First deltas received");
463 let header: BlockHeader = (&first_msg).into();
464 let deltas_msg = StateSyncMessage {
465 header: header.clone(),
466 snapshots: Default::default(),
467 deltas: Some(first_msg),
468 removed_components: Default::default(),
469 };
470
471 let msg = if !self.is_next_expected(&header) {
473 info!("Retrieving snapshot");
474 let snapshot_header = BlockHeader { revert: false, ..header.clone() };
475 let snapshot = self
476 .get_snapshots::<Vec<&String>>(
477 snapshot_header,
478 None,
479 )
480 .await?
481 .merge(deltas_msg);
482 let n_components = self.component_tracker.components.len();
483 let n_snapshots = snapshot.snapshots.states.len();
484 info!(n_components, n_snapshots, "Initial snapshot retrieved, starting delta message feed");
485 snapshot
486 } else {
487 deltas_msg
488 };
489 block_tx.send(Ok(msg)).await?;
490 self.last_synced_block = Some(header.clone());
491 loop {
492 select! {
493 deltas_opt = msg_rx.recv() => {
494 if let Some(mut deltas) = deltas_opt {
495 let header: BlockHeader = (&deltas).into();
496 debug!(block_number=?header.number, "Received delta message");
497
498 let (snapshots, removed_components) = {
499 let (to_add, to_remove) = self.component_tracker.filter_updated_components(&deltas);
502
503 let requiring_snapshot: Vec<_> = to_add
505 .iter()
506 .filter(|id| {
507 !self.component_tracker
508 .components
509 .contains_key(id.as_str())
510 })
511 .collect();
512 debug!(components=?requiring_snapshot, "SnapshotRequest");
513 self.component_tracker
514 .start_tracking(requiring_snapshot.as_slice())
515 .await?;
516
517 let snapshots = self
518 .get_snapshots(header.clone(), Some(requiring_snapshot))
519 .await?
520 .snapshots;
521
522 let removed_components = if !to_remove.is_empty() {
523 self.component_tracker.stop_tracking(&to_remove)
524 } else {
525 Default::default()
526 };
527
528 (snapshots, removed_components)
529 };
530
531 self.component_tracker.process_entrypoints(&deltas.dci_update);
533
534 self.filter_deltas(&mut deltas);
536 let n_changes = deltas.n_changes();
537
538 let next = StateSyncMessage {
540 header: header.clone(),
541 snapshots,
542 deltas: Some(deltas),
543 removed_components,
544 };
545 block_tx.send(Ok(next)).await?;
546 self.last_synced_block = Some(header.clone());
547
548 debug!(block_number=?header.number, n_changes, "Finished processing delta message");
549 } else {
550 return Err(SynchronizerError::ConnectionError("Deltas channel closed".to_string()));
551 }
552 },
553 _ = &mut end_rx => {
554 info!("Received close signal during state_sync");
555 return Ok(());
556 }
557 }
558 }
559 }.await;
560
561 warn!(last_synced_block = ?&self.last_synced_block, "Deltas processing ended.");
563 let _ = self
565 .deltas_client
566 .unsubscribe(subscription_id)
567 .await
568 .map_err(|err| {
569 warn!(err=?err, "Unsubscribing from deltas on cleanup failed!");
570 });
571
572 match result {
575 Ok(()) => Ok(()), Err(e) => {
577 Err((e, Some(end_rx)))
581 }
582 }
583 }
584
585 fn is_next_expected(&self, incoming: &BlockHeader) -> bool {
586 if let Some(block) = self.last_synced_block.as_ref() {
587 return incoming.parent_hash == block.hash;
588 }
589 false
590 }
591 fn filter_deltas(&self, deltas: &mut BlockChanges) {
592 deltas.filter_by_component(|id| {
593 self.component_tracker
594 .components
595 .contains_key(id)
596 });
597 deltas.filter_by_contract(|id| {
598 self.component_tracker
599 .contracts
600 .contains(id)
601 });
602 }
603}
604
605#[async_trait]
606impl<R, D> StateSynchronizer for ProtocolStateSynchronizer<R, D>
607where
608 R: RPCClient + Clone + Send + Sync + 'static,
609 D: DeltasClient + Clone + Send + Sync + 'static,
610{
611 async fn initialize(&mut self) -> SyncResult<()> {
612 info!("Retrieving relevant protocol components");
613 self.component_tracker
614 .initialise_components()
615 .await?;
616 info!(
617 n_components = self.component_tracker.components.len(),
618 n_contracts = self.component_tracker.contracts.len(),
619 "Finished retrieving components",
620 );
621
622 Ok(())
623 }
624
625 async fn start(
626 mut self,
627 ) -> (SynchronizerTaskHandle, Receiver<SyncResult<StateSyncMessage<BlockHeader>>>) {
628 let (mut tx, rx) = channel(15);
629 let (end_tx, end_rx) = oneshot::channel::<()>();
630
631 let jh = tokio::spawn(async move {
632 let mut retry_count = 0;
633 let mut current_end_rx = end_rx;
634 let mut final_error = None;
635
636 while retry_count < self.max_retries {
637 info!(extractor_id=%&self.extractor_id, retry_count, "(Re)starting synchronization loop");
638
639 let res = self
640 .state_sync(&mut tx, current_end_rx)
641 .await;
642 match res {
643 Ok(()) => {
644 info!(
645 extractor_id=%&self.extractor_id,
646 retry_count,
647 "State synchronization exited cleanly"
648 );
649 return;
650 }
651 Err((e, maybe_end_rx)) => {
652 warn!(
653 extractor_id=%&self.extractor_id,
654 retry_count,
655 error=%e,
656 "State synchronization errored!"
657 );
658
659 if let Some(recovered_end_rx) = maybe_end_rx {
661 current_end_rx = recovered_end_rx;
662
663 if let SynchronizerError::ConnectionClosed = e {
664 error!(
666 "Websocket connection closed. State synchronization exiting."
667 );
668 let _ = tx.send(Err(e)).await;
669 return;
670 } else {
671 final_error = Some(e);
673 }
674 } else {
675 info!(extractor_id=%&self.extractor_id, "Received close signal, exiting.");
677 return;
678 }
679 }
680 }
681 sleep(self.retry_cooldown).await;
682 retry_count += 1;
683 }
684 if let Some(e) = final_error {
685 warn!(extractor_id=%&self.extractor_id, retry_count, error=%e, "Max retries exceeded");
686 let _ = tx.send(Err(e)).await;
687 }
688 });
689
690 let handle = SynchronizerTaskHandle::new(jh, end_tx);
691 (handle, rx)
692 }
693}
694
695#[cfg(test)]
696mod test {
697 use std::{collections::HashSet, sync::Arc};
716
717 use tycho_common::dto::{
718 AddressStorageLocation, Block, Chain, ComponentTvlRequestBody, ComponentTvlRequestResponse,
719 DCIUpdate, EntryPoint, PaginationResponse, ProtocolComponentRequestResponse,
720 ProtocolComponentsRequestBody, ProtocolStateRequestBody, ProtocolStateRequestResponse,
721 ProtocolSystemsRequestBody, ProtocolSystemsRequestResponse, RPCTracerParams,
722 StateRequestBody, StateRequestResponse, TokensRequestBody, TokensRequestResponse,
723 TracedEntryPointRequestBody, TracedEntryPointRequestResponse, TracingParams,
724 };
725 use uuid::Uuid;
726
727 use super::*;
728 use crate::{deltas::MockDeltasClient, rpc::MockRPCClient, DeltasError, RPCError};
729
730 struct ArcRPCClient<T>(Arc<T>);
732
733 impl<T> Clone for ArcRPCClient<T> {
735 fn clone(&self) -> Self {
736 ArcRPCClient(self.0.clone())
737 }
738 }
739
740 #[async_trait]
741 impl<T> RPCClient for ArcRPCClient<T>
742 where
743 T: RPCClient + Sync + Send + 'static,
744 {
745 async fn get_tokens(
746 &self,
747 request: &TokensRequestBody,
748 ) -> Result<TokensRequestResponse, RPCError> {
749 self.0.get_tokens(request).await
750 }
751
752 async fn get_contract_state(
753 &self,
754 request: &StateRequestBody,
755 ) -> Result<StateRequestResponse, RPCError> {
756 self.0.get_contract_state(request).await
757 }
758
759 async fn get_protocol_components(
760 &self,
761 request: &ProtocolComponentsRequestBody,
762 ) -> Result<ProtocolComponentRequestResponse, RPCError> {
763 self.0
764 .get_protocol_components(request)
765 .await
766 }
767
768 async fn get_protocol_states(
769 &self,
770 request: &ProtocolStateRequestBody,
771 ) -> Result<ProtocolStateRequestResponse, RPCError> {
772 self.0
773 .get_protocol_states(request)
774 .await
775 }
776
777 async fn get_protocol_systems(
778 &self,
779 request: &ProtocolSystemsRequestBody,
780 ) -> Result<ProtocolSystemsRequestResponse, RPCError> {
781 self.0
782 .get_protocol_systems(request)
783 .await
784 }
785
786 async fn get_component_tvl(
787 &self,
788 request: &ComponentTvlRequestBody,
789 ) -> Result<ComponentTvlRequestResponse, RPCError> {
790 self.0.get_component_tvl(request).await
791 }
792
793 async fn get_traced_entry_points(
794 &self,
795 request: &TracedEntryPointRequestBody,
796 ) -> Result<TracedEntryPointRequestResponse, RPCError> {
797 self.0
798 .get_traced_entry_points(request)
799 .await
800 }
801
802 async fn get_snapshots<'a>(
803 &self,
804 request: &SnapshotParameters<'a>,
805 chunk_size: Option<usize>,
806 concurrency: usize,
807 ) -> Result<Snapshot, RPCError> {
808 self.0
809 .get_snapshots(request, chunk_size, concurrency)
810 .await
811 }
812
813 fn compression(&self) -> bool {
814 self.0.compression()
815 }
816 }
817
818 struct ArcDeltasClient<T>(Arc<T>);
820
821 impl<T> Clone for ArcDeltasClient<T> {
823 fn clone(&self) -> Self {
824 ArcDeltasClient(self.0.clone())
825 }
826 }
827
828 #[async_trait]
829 impl<T> DeltasClient for ArcDeltasClient<T>
830 where
831 T: DeltasClient + Sync + Send + 'static,
832 {
833 async fn subscribe(
834 &self,
835 extractor_id: ExtractorIdentity,
836 options: SubscriptionOptions,
837 ) -> Result<(Uuid, Receiver<BlockChanges>), DeltasError> {
838 self.0
839 .subscribe(extractor_id, options)
840 .await
841 }
842
843 async fn unsubscribe(&self, subscription_id: Uuid) -> Result<(), DeltasError> {
844 self.0
845 .unsubscribe(subscription_id)
846 .await
847 }
848
849 async fn connect(&self) -> Result<JoinHandle<Result<(), DeltasError>>, DeltasError> {
850 self.0.connect().await
851 }
852
853 async fn close(&self) -> Result<(), DeltasError> {
854 self.0.close().await
855 }
856 }
857
858 fn with_mocked_clients(
859 native: bool,
860 include_tvl: bool,
861 rpc_client: Option<MockRPCClient>,
862 deltas_client: Option<MockDeltasClient>,
863 ) -> ProtocolStateSynchronizer<ArcRPCClient<MockRPCClient>, ArcDeltasClient<MockDeltasClient>>
864 {
865 let rpc_client = ArcRPCClient(Arc::new(rpc_client.unwrap_or_default()));
866 let deltas_client = ArcDeltasClient(Arc::new(deltas_client.unwrap_or_default()));
867
868 ProtocolStateSynchronizer::new(
869 ExtractorIdentity::new(Chain::Ethereum, "uniswap-v2"),
870 native,
871 ComponentFilter::with_tvl_range(50.0, 50.0),
872 1,
873 Duration::from_secs(0),
874 true,
875 include_tvl,
876 true, rpc_client,
878 deltas_client,
879 10_u64,
880 )
881 }
882
883 fn state_snapshot_native() -> ProtocolStateRequestResponse {
884 ProtocolStateRequestResponse {
885 states: vec![ResponseProtocolState {
886 component_id: "Component1".to_string(),
887 ..Default::default()
888 }],
889 pagination: PaginationResponse { page: 0, page_size: 20, total: 1 },
890 }
891 }
892
893 fn make_mock_client() -> MockRPCClient {
894 let mut m = MockRPCClient::new();
895 m.expect_compression()
896 .return_const(false);
897 m
898 }
899
900 #[test_log::test(tokio::test)]
901 async fn test_get_snapshots_native() {
902 let header = BlockHeader::default();
903 let mut rpc = make_mock_client();
904 let component = ProtocolComponent { id: "Component1".to_string(), ..Default::default() };
905
906 let component_clone = component.clone();
907 rpc.expect_get_snapshots()
908 .returning(move |_request, _chunk_size, _concurrency| {
909 Ok(Snapshot {
910 states: state_snapshot_native()
911 .states
912 .into_iter()
913 .map(|state| {
914 (
915 state.component_id.clone(),
916 ComponentWithState {
917 state,
918 component: component_clone.clone(),
919 entrypoints: vec![],
920 component_tvl: None,
921 },
922 )
923 })
924 .collect(),
925 vm_storage: HashMap::new(),
926 })
927 });
928
929 rpc.expect_get_traced_entry_points()
930 .returning(|_| {
931 Ok(TracedEntryPointRequestResponse {
932 traced_entry_points: HashMap::new(),
933 pagination: PaginationResponse::new(0, 20, 0),
934 })
935 });
936
937 let mut state_sync = with_mocked_clients(true, false, Some(rpc), None);
938 state_sync
939 .component_tracker
940 .components
941 .insert("Component1".to_string(), component.clone());
942 let components_arg = ["Component1".to_string()];
943 let exp = StateSyncMessage {
944 header: header.clone(),
945 snapshots: Snapshot {
946 states: state_snapshot_native()
947 .states
948 .into_iter()
949 .map(|state| {
950 (
951 state.component_id.clone(),
952 ComponentWithState {
953 state,
954 component: component.clone(),
955 entrypoints: vec![],
956 component_tvl: None,
957 },
958 )
959 })
960 .collect(),
961 vm_storage: HashMap::new(),
962 },
963 deltas: None,
964 removed_components: Default::default(),
965 };
966
967 let snap = state_sync
968 .get_snapshots(header, Some(&components_arg))
969 .await
970 .expect("Retrieving snapshot failed");
971
972 assert_eq!(snap, exp);
973 }
974
975 #[test_log::test(tokio::test)]
976 async fn test_get_snapshots_native_with_tvl() {
977 let header = BlockHeader::default();
978 let mut rpc = make_mock_client();
979 let component = ProtocolComponent { id: "Component1".to_string(), ..Default::default() };
980
981 let component_clone = component.clone();
982 rpc.expect_get_snapshots()
983 .returning(move |_request, _chunk_size, _concurrency| {
984 Ok(Snapshot {
985 states: state_snapshot_native()
986 .states
987 .into_iter()
988 .map(|state| {
989 (
990 state.component_id.clone(),
991 ComponentWithState {
992 state,
993 component: component_clone.clone(),
994 component_tvl: Some(100.0),
995 entrypoints: vec![],
996 },
997 )
998 })
999 .collect(),
1000 vm_storage: HashMap::new(),
1001 })
1002 });
1003
1004 rpc.expect_get_traced_entry_points()
1005 .returning(|_| {
1006 Ok(TracedEntryPointRequestResponse {
1007 traced_entry_points: HashMap::new(),
1008 pagination: PaginationResponse::new(0, 20, 0),
1009 })
1010 });
1011
1012 let mut state_sync = with_mocked_clients(true, true, Some(rpc), None);
1013 state_sync
1014 .component_tracker
1015 .components
1016 .insert("Component1".to_string(), component.clone());
1017 let components_arg = ["Component1".to_string()];
1018 let exp = StateSyncMessage {
1019 header: header.clone(),
1020 snapshots: Snapshot {
1021 states: state_snapshot_native()
1022 .states
1023 .into_iter()
1024 .map(|state| {
1025 (
1026 state.component_id.clone(),
1027 ComponentWithState {
1028 state,
1029 component: component.clone(),
1030 component_tvl: Some(100.0),
1031 entrypoints: vec![],
1032 },
1033 )
1034 })
1035 .collect(),
1036 vm_storage: HashMap::new(),
1037 },
1038 deltas: None,
1039 removed_components: Default::default(),
1040 };
1041
1042 let snap = state_sync
1043 .get_snapshots(header, Some(&components_arg))
1044 .await
1045 .expect("Retrieving snapshot failed");
1046
1047 assert_eq!(snap, exp);
1048 }
1049
1050 fn state_snapshot_vm() -> StateRequestResponse {
1051 StateRequestResponse {
1052 accounts: vec![
1053 ResponseAccount { address: Bytes::from("0x0badc0ffee"), ..Default::default() },
1054 ResponseAccount { address: Bytes::from("0xbabe42"), ..Default::default() },
1055 ],
1056 pagination: PaginationResponse { page: 0, page_size: 20, total: 1 },
1057 }
1058 }
1059
1060 fn traced_entry_point_response() -> TracedEntryPointRequestResponse {
1061 TracedEntryPointRequestResponse {
1062 traced_entry_points: HashMap::from([(
1063 "Component1".to_string(),
1064 vec![(
1065 EntryPointWithTracingParams {
1066 entry_point: EntryPoint {
1067 external_id: "entrypoint_a".to_string(),
1068 target: Bytes::from("0x0badc0ffee"),
1069 signature: "sig()".to_string(),
1070 },
1071 params: TracingParams::RPCTracer(RPCTracerParams {
1072 caller: Some(Bytes::from("0x0badc0ffee")),
1073 calldata: Bytes::from("0x0badc0ffee"),
1074 state_overrides: None,
1075 prune_addresses: None,
1076 }),
1077 },
1078 TracingResult {
1079 retriggers: HashSet::from([(
1080 Bytes::from("0x0badc0ffee"),
1081 AddressStorageLocation::new(Bytes::from("0x0badc0ffee"), 12),
1082 )]),
1083 accessed_slots: HashMap::from([(
1084 Bytes::from("0x0badc0ffee"),
1085 HashSet::from([Bytes::from("0xbadbeef0")]),
1086 )]),
1087 },
1088 )],
1089 )]),
1090 pagination: PaginationResponse::new(0, 20, 0),
1091 }
1092 }
1093
1094 #[test_log::test(tokio::test)]
1095 async fn test_get_snapshots_vm() {
1096 let header = BlockHeader::default();
1097 let mut rpc = make_mock_client();
1098
1099 let traced_ep_response = traced_entry_point_response();
1100 rpc.expect_get_snapshots()
1101 .returning(move |_request, _chunk_size, _concurrency| {
1102 let vm_storage_accounts = state_snapshot_vm();
1103 Ok(Snapshot {
1104 states: [(
1105 "Component1".to_string(),
1106 ComponentWithState {
1107 state: ResponseProtocolState {
1108 component_id: "Component1".to_string(),
1109 ..Default::default()
1110 },
1111 component: ProtocolComponent {
1112 id: "Component1".to_string(),
1113 contract_ids: vec![
1114 Bytes::from("0x0badc0ffee"),
1115 Bytes::from("0xbabe42"),
1116 ],
1117 ..Default::default()
1118 },
1119 component_tvl: None,
1120 entrypoints: traced_ep_response
1121 .traced_entry_points
1122 .get("Component1")
1123 .cloned()
1124 .unwrap_or_default(),
1125 },
1126 )]
1127 .into_iter()
1128 .collect(),
1129 vm_storage: vm_storage_accounts
1130 .accounts
1131 .into_iter()
1132 .map(|state| (state.address.clone(), state))
1133 .collect(),
1134 })
1135 });
1136
1137 rpc.expect_get_traced_entry_points()
1138 .returning(|_| Ok(traced_entry_point_response()));
1139
1140 let mut state_sync = with_mocked_clients(false, false, Some(rpc), None);
1141 let component = ProtocolComponent {
1142 id: "Component1".to_string(),
1143 contract_ids: vec![Bytes::from("0x0badc0ffee"), Bytes::from("0xbabe42")],
1144 ..Default::default()
1145 };
1146 state_sync
1147 .component_tracker
1148 .components
1149 .insert("Component1".to_string(), component.clone());
1150 let components_arg = ["Component1".to_string()];
1151 let exp = StateSyncMessage {
1152 header: header.clone(),
1153 snapshots: Snapshot {
1154 states: [(
1155 component.id.clone(),
1156 ComponentWithState {
1157 state: ResponseProtocolState {
1158 component_id: "Component1".to_string(),
1159 ..Default::default()
1160 },
1161 component: component.clone(),
1162 component_tvl: None,
1163 entrypoints: vec![(
1164 EntryPointWithTracingParams {
1165 entry_point: EntryPoint {
1166 external_id: "entrypoint_a".to_string(),
1167 target: Bytes::from("0x0badc0ffee"),
1168 signature: "sig()".to_string(),
1169 },
1170 params: TracingParams::RPCTracer(RPCTracerParams {
1171 caller: Some(Bytes::from("0x0badc0ffee")),
1172 calldata: Bytes::from("0x0badc0ffee"),
1173 state_overrides: None,
1174 prune_addresses: None,
1175 }),
1176 },
1177 TracingResult {
1178 retriggers: HashSet::from([(
1179 Bytes::from("0x0badc0ffee"),
1180 AddressStorageLocation::new(Bytes::from("0x0badc0ffee"), 12),
1181 )]),
1182 accessed_slots: HashMap::from([(
1183 Bytes::from("0x0badc0ffee"),
1184 HashSet::from([Bytes::from("0xbadbeef0")]),
1185 )]),
1186 },
1187 )],
1188 },
1189 )]
1190 .into_iter()
1191 .collect(),
1192 vm_storage: state_snapshot_vm()
1193 .accounts
1194 .into_iter()
1195 .map(|state| (state.address.clone(), state))
1196 .collect(),
1197 },
1198 deltas: None,
1199 removed_components: Default::default(),
1200 };
1201
1202 let snap = state_sync
1203 .get_snapshots(header, Some(&components_arg))
1204 .await
1205 .expect("Retrieving snapshot failed");
1206
1207 assert_eq!(snap, exp);
1208 }
1209
1210 #[test_log::test(tokio::test)]
1211 async fn test_get_snapshots_vm_with_tvl() {
1212 let header = BlockHeader::default();
1213 let mut rpc = make_mock_client();
1214 let component = ProtocolComponent {
1215 id: "Component1".to_string(),
1216 contract_ids: vec![Bytes::from("0x0badc0ffee"), Bytes::from("0xbabe42")],
1217 ..Default::default()
1218 };
1219
1220 let component_clone = component.clone();
1221 rpc.expect_get_snapshots()
1222 .returning(move |_request, _chunk_size, _concurrency| {
1223 let vm_storage_accounts = state_snapshot_vm();
1224 Ok(Snapshot {
1225 states: [(
1226 "Component1".to_string(),
1227 ComponentWithState {
1228 state: ResponseProtocolState {
1229 component_id: "Component1".to_string(),
1230 ..Default::default()
1231 },
1232 component: component_clone.clone(),
1233 component_tvl: Some(100.0),
1234 entrypoints: vec![],
1235 },
1236 )]
1237 .into_iter()
1238 .collect(),
1239 vm_storage: vm_storage_accounts
1240 .accounts
1241 .into_iter()
1242 .map(|state| (state.address.clone(), state))
1243 .collect(),
1244 })
1245 });
1246
1247 rpc.expect_get_traced_entry_points()
1248 .returning(|_| {
1249 Ok(TracedEntryPointRequestResponse {
1250 traced_entry_points: HashMap::new(),
1251 pagination: PaginationResponse::new(0, 20, 0),
1252 })
1253 });
1254
1255 let mut state_sync = with_mocked_clients(false, true, Some(rpc), None);
1256 state_sync
1257 .component_tracker
1258 .components
1259 .insert("Component1".to_string(), component.clone());
1260 let components_arg = ["Component1".to_string()];
1261 let exp = StateSyncMessage {
1262 header: header.clone(),
1263 snapshots: Snapshot {
1264 states: [(
1265 component.id.clone(),
1266 ComponentWithState {
1267 state: ResponseProtocolState {
1268 component_id: "Component1".to_string(),
1269 ..Default::default()
1270 },
1271 component: component.clone(),
1272 component_tvl: Some(100.0),
1273 entrypoints: vec![],
1274 },
1275 )]
1276 .into_iter()
1277 .collect(),
1278 vm_storage: state_snapshot_vm()
1279 .accounts
1280 .into_iter()
1281 .map(|state| (state.address.clone(), state))
1282 .collect(),
1283 },
1284 deltas: None,
1285 removed_components: Default::default(),
1286 };
1287
1288 let snap = state_sync
1289 .get_snapshots(header, Some(&components_arg))
1290 .await
1291 .expect("Retrieving snapshot failed");
1292
1293 assert_eq!(snap, exp);
1294 }
1295
1296 fn mock_clients_for_state_sync() -> (MockRPCClient, MockDeltasClient, Sender<BlockChanges>) {
1297 let mut rpc_client = make_mock_client();
1298 rpc_client
1301 .expect_get_protocol_components()
1302 .with(mockall::predicate::function(
1303 move |request_params: &ProtocolComponentsRequestBody| {
1304 if let Some(ids) = request_params.component_ids.as_ref() {
1305 ids.contains(&"Component3".to_string())
1306 } else {
1307 false
1308 }
1309 },
1310 ))
1311 .returning(|_| {
1312 Ok(ProtocolComponentRequestResponse {
1314 protocol_components: vec![
1315 ProtocolComponent { id: "Component3".to_string(), ..Default::default() },
1317 ],
1318 pagination: PaginationResponse { page: 0, page_size: 20, total: 1 },
1319 })
1320 });
1321 rpc_client
1323 .expect_get_snapshots()
1324 .withf(
1325 |request: &SnapshotParameters,
1326 _chunk_size: &Option<usize>,
1327 _concurrency: &usize| {
1328 request
1329 .components
1330 .contains_key("Component3")
1331 },
1332 )
1333 .returning(|_request, _chunk_size, _concurrency| {
1334 Ok(Snapshot {
1335 states: [(
1336 "Component3".to_string(),
1337 ComponentWithState {
1338 state: ResponseProtocolState {
1339 component_id: "Component3".to_string(),
1340 ..Default::default()
1341 },
1342 component: ProtocolComponent {
1343 id: "Component3".to_string(),
1344 ..Default::default()
1345 },
1346 component_tvl: Some(1000.0),
1347 entrypoints: vec![],
1348 },
1349 )]
1350 .into_iter()
1351 .collect(),
1352 vm_storage: HashMap::new(),
1353 })
1354 });
1355
1356 rpc_client
1358 .expect_get_protocol_components()
1359 .returning(|_| {
1360 Ok(ProtocolComponentRequestResponse {
1362 protocol_components: vec![
1363 ProtocolComponent { id: "Component1".to_string(), ..Default::default() },
1365 ProtocolComponent { id: "Component2".to_string(), ..Default::default() },
1367 ],
1369 pagination: PaginationResponse { page: 0, page_size: 20, total: 1 },
1370 })
1371 });
1372
1373 rpc_client
1374 .expect_get_snapshots()
1375 .returning(|_request, _chunk_size, _concurrency| {
1376 Ok(Snapshot {
1377 states: [
1378 (
1379 "Component1".to_string(),
1380 ComponentWithState {
1381 state: ResponseProtocolState {
1382 component_id: "Component1".to_string(),
1383 ..Default::default()
1384 },
1385 component: ProtocolComponent {
1386 id: "Component1".to_string(),
1387 ..Default::default()
1388 },
1389 component_tvl: Some(100.0),
1390 entrypoints: vec![],
1391 },
1392 ),
1393 (
1394 "Component2".to_string(),
1395 ComponentWithState {
1396 state: ResponseProtocolState {
1397 component_id: "Component2".to_string(),
1398 ..Default::default()
1399 },
1400 component: ProtocolComponent {
1401 id: "Component2".to_string(),
1402 ..Default::default()
1403 },
1404 component_tvl: Some(0.0),
1405 entrypoints: vec![],
1406 },
1407 ),
1408 ]
1409 .into_iter()
1410 .collect(),
1411 vm_storage: HashMap::new(),
1412 })
1413 });
1414
1415 rpc_client
1417 .expect_get_traced_entry_points()
1418 .returning(|_| {
1419 Ok(TracedEntryPointRequestResponse {
1420 traced_entry_points: HashMap::new(),
1421 pagination: PaginationResponse { page: 0, page_size: 100, total: 0 },
1422 })
1423 });
1424
1425 let mut deltas_client = MockDeltasClient::new();
1427 let (tx, rx) = channel(1);
1428 deltas_client
1429 .expect_subscribe()
1430 .return_once(move |_, _| {
1431 Ok((Uuid::default(), rx))
1433 });
1434
1435 deltas_client
1437 .expect_unsubscribe()
1438 .return_once(|_| Ok(()));
1439
1440 (rpc_client, deltas_client, tx)
1441 }
1442
1443 #[test_log::test(tokio::test)]
1450 async fn test_state_sync() {
1451 let (rpc_client, deltas_client, tx) = mock_clients_for_state_sync();
1452 let deltas = [
1453 BlockChanges {
1454 extractor: "uniswap-v2".to_string(),
1455 chain: Chain::Ethereum,
1456 block: Block {
1457 number: 1,
1458 hash: Bytes::from("0x01"),
1459 parent_hash: Bytes::from("0x00"),
1460 chain: Chain::Ethereum,
1461 ts: Default::default(),
1462 },
1463 revert: false,
1464 dci_update: DCIUpdate {
1465 new_entrypoints: HashMap::from([(
1466 "Component1".to_string(),
1467 HashSet::from([EntryPoint {
1468 external_id: "entrypoint_a".to_string(),
1469 target: Bytes::from("0x0badc0ffee"),
1470 signature: "sig()".to_string(),
1471 }]),
1472 )]),
1473 new_entrypoint_params: HashMap::from([(
1474 "entrypoint_a".to_string(),
1475 HashSet::from([(
1476 TracingParams::RPCTracer(RPCTracerParams {
1477 caller: Some(Bytes::from("0x0badc0ffee")),
1478 calldata: Bytes::from("0x0badc0ffee"),
1479 state_overrides: None,
1480 prune_addresses: None,
1481 }),
1482 "Component1".to_string(),
1483 )]),
1484 )]),
1485 trace_results: HashMap::from([(
1486 "entrypoint_a".to_string(),
1487 TracingResult {
1488 retriggers: HashSet::from([(
1489 Bytes::from("0x0badc0ffee"),
1490 AddressStorageLocation::new(Bytes::from("0x0badc0ffee"), 12),
1491 )]),
1492 accessed_slots: HashMap::from([(
1493 Bytes::from("0x0badc0ffee"),
1494 HashSet::from([Bytes::from("0xbadbeef0")]),
1495 )]),
1496 },
1497 )]),
1498 },
1499 ..Default::default()
1500 },
1501 BlockChanges {
1502 extractor: "uniswap-v2".to_string(),
1503 chain: Chain::Ethereum,
1504 block: Block {
1505 number: 2,
1506 hash: Bytes::from("0x02"),
1507 parent_hash: Bytes::from("0x01"),
1508 chain: Chain::Ethereum,
1509 ts: Default::default(),
1510 },
1511 revert: false,
1512 component_tvl: [
1513 ("Component1".to_string(), 100.0),
1514 ("Component2".to_string(), 0.0),
1515 ("Component3".to_string(), 1000.0),
1516 ]
1517 .into_iter()
1518 .collect(),
1519 ..Default::default()
1520 },
1521 ];
1522 let mut state_sync = with_mocked_clients(true, true, Some(rpc_client), Some(deltas_client));
1523 state_sync
1524 .initialize()
1525 .await
1526 .expect("Init failed");
1527
1528 let (handle, mut rx) = state_sync.start().await;
1530 let (jh, close_tx) = handle.split();
1531 tx.send(deltas[0].clone())
1532 .await
1533 .expect("deltas channel msg 0 closed!");
1534 let first_msg = timeout(Duration::from_millis(100), rx.recv())
1535 .await
1536 .expect("waiting for first state msg timed out!")
1537 .expect("state sync block sender closed!");
1538 tx.send(deltas[1].clone())
1539 .await
1540 .expect("deltas channel msg 1 closed!");
1541 let second_msg = timeout(Duration::from_millis(100), rx.recv())
1542 .await
1543 .expect("waiting for second state msg timed out!")
1544 .expect("state sync block sender closed!");
1545 let _ = close_tx.send(());
1546 jh.await
1547 .expect("state sync task panicked!");
1548
1549 let exp1 = StateSyncMessage {
1551 header: BlockHeader {
1552 number: 1,
1553 hash: Bytes::from("0x01"),
1554 parent_hash: Bytes::from("0x00"),
1555 revert: false,
1556 ..Default::default()
1557 },
1558 snapshots: Snapshot {
1559 states: [
1560 (
1561 "Component1".to_string(),
1562 ComponentWithState {
1563 state: ResponseProtocolState {
1564 component_id: "Component1".to_string(),
1565 ..Default::default()
1566 },
1567 component: ProtocolComponent {
1568 id: "Component1".to_string(),
1569 ..Default::default()
1570 },
1571 component_tvl: Some(100.0),
1572 entrypoints: vec![],
1573 },
1574 ),
1575 (
1576 "Component2".to_string(),
1577 ComponentWithState {
1578 state: ResponseProtocolState {
1579 component_id: "Component2".to_string(),
1580 ..Default::default()
1581 },
1582 component: ProtocolComponent {
1583 id: "Component2".to_string(),
1584 ..Default::default()
1585 },
1586 component_tvl: Some(0.0),
1587 entrypoints: vec![],
1588 },
1589 ),
1590 ]
1591 .into_iter()
1592 .collect(),
1593 vm_storage: HashMap::new(),
1594 },
1595 deltas: Some(deltas[0].clone()),
1596 removed_components: Default::default(),
1597 };
1598
1599 let exp2 = StateSyncMessage {
1600 header: BlockHeader {
1601 number: 2,
1602 hash: Bytes::from("0x02"),
1603 parent_hash: Bytes::from("0x01"),
1604 revert: false,
1605 ..Default::default()
1606 },
1607 snapshots: Snapshot {
1608 states: [
1609 (
1611 "Component3".to_string(),
1612 ComponentWithState {
1613 state: ResponseProtocolState {
1614 component_id: "Component3".to_string(),
1615 ..Default::default()
1616 },
1617 component: ProtocolComponent {
1618 id: "Component3".to_string(),
1619 ..Default::default()
1620 },
1621 component_tvl: Some(1000.0),
1622 entrypoints: vec![],
1623 },
1624 ),
1625 ]
1626 .into_iter()
1627 .collect(),
1628 vm_storage: HashMap::new(),
1629 },
1630 deltas: Some(BlockChanges {
1633 extractor: "uniswap-v2".to_string(),
1634 chain: Chain::Ethereum,
1635 block: Block {
1636 number: 2,
1637 hash: Bytes::from("0x02"),
1638 parent_hash: Bytes::from("0x01"),
1639 chain: Chain::Ethereum,
1640 ts: Default::default(),
1641 },
1642 revert: false,
1643 component_tvl: [
1644 ("Component1".to_string(), 100.0),
1646 ("Component3".to_string(), 1000.0),
1647 ]
1648 .into_iter()
1649 .collect(),
1650 ..Default::default()
1651 }),
1652 removed_components: [(
1654 "Component2".to_string(),
1655 ProtocolComponent { id: "Component2".to_string(), ..Default::default() },
1656 )]
1657 .into_iter()
1658 .collect(),
1659 };
1660 assert_eq!(first_msg.unwrap(), exp1);
1661 assert_eq!(second_msg.unwrap(), exp2);
1662 }
1663
1664 #[test_log::test(tokio::test)]
1665 async fn test_state_sync_with_tvl_range() {
1666 let remove_tvl_threshold = 5.0;
1668 let add_tvl_threshold = 7.0;
1669
1670 let mut rpc_client = make_mock_client();
1671 let mut deltas_client = MockDeltasClient::new();
1672
1673 rpc_client
1674 .expect_get_protocol_components()
1675 .with(mockall::predicate::function(
1676 move |request_params: &ProtocolComponentsRequestBody| {
1677 if let Some(ids) = request_params.component_ids.as_ref() {
1678 ids.contains(&"Component3".to_string())
1679 } else {
1680 false
1681 }
1682 },
1683 ))
1684 .returning(|_| {
1685 Ok(ProtocolComponentRequestResponse {
1686 protocol_components: vec![ProtocolComponent {
1687 id: "Component3".to_string(),
1688 ..Default::default()
1689 }],
1690 pagination: PaginationResponse { page: 0, page_size: 20, total: 1 },
1691 })
1692 });
1693 rpc_client
1695 .expect_get_snapshots()
1696 .withf(
1697 |request: &SnapshotParameters,
1698 _chunk_size: &Option<usize>,
1699 _concurrency: &usize| {
1700 request
1701 .components
1702 .contains_key("Component3")
1703 },
1704 )
1705 .returning(|_request, _chunk_size, _concurrency| {
1706 Ok(Snapshot {
1707 states: [(
1708 "Component3".to_string(),
1709 ComponentWithState {
1710 state: ResponseProtocolState {
1711 component_id: "Component3".to_string(),
1712 ..Default::default()
1713 },
1714 component: ProtocolComponent {
1715 id: "Component3".to_string(),
1716 ..Default::default()
1717 },
1718 component_tvl: Some(10.0),
1719 entrypoints: vec![],
1720 },
1721 )]
1722 .into_iter()
1723 .collect(),
1724 vm_storage: HashMap::new(),
1725 })
1726 });
1727
1728 rpc_client
1730 .expect_get_protocol_components()
1731 .returning(|_| {
1732 Ok(ProtocolComponentRequestResponse {
1733 protocol_components: vec![
1734 ProtocolComponent { id: "Component1".to_string(), ..Default::default() },
1735 ProtocolComponent { id: "Component2".to_string(), ..Default::default() },
1736 ],
1737 pagination: PaginationResponse { page: 0, page_size: 20, total: 1 },
1738 })
1739 });
1740
1741 rpc_client
1743 .expect_get_snapshots()
1744 .returning(|_request, _chunk_size, _concurrency| {
1745 Ok(Snapshot {
1746 states: [
1747 (
1748 "Component1".to_string(),
1749 ComponentWithState {
1750 state: ResponseProtocolState {
1751 component_id: "Component1".to_string(),
1752 ..Default::default()
1753 },
1754 component: ProtocolComponent {
1755 id: "Component1".to_string(),
1756 ..Default::default()
1757 },
1758 component_tvl: Some(6.0),
1759 entrypoints: vec![],
1760 },
1761 ),
1762 (
1763 "Component2".to_string(),
1764 ComponentWithState {
1765 state: ResponseProtocolState {
1766 component_id: "Component2".to_string(),
1767 ..Default::default()
1768 },
1769 component: ProtocolComponent {
1770 id: "Component2".to_string(),
1771 ..Default::default()
1772 },
1773 component_tvl: Some(2.0),
1774 entrypoints: vec![],
1775 },
1776 ),
1777 ]
1778 .into_iter()
1779 .collect(),
1780 vm_storage: HashMap::new(),
1781 })
1782 });
1783
1784 rpc_client
1786 .expect_get_traced_entry_points()
1787 .returning(|_| {
1788 Ok(TracedEntryPointRequestResponse {
1789 traced_entry_points: HashMap::new(),
1790 pagination: PaginationResponse { page: 0, page_size: 100, total: 0 },
1791 })
1792 });
1793
1794 let (tx, rx) = channel(1);
1795 deltas_client
1796 .expect_subscribe()
1797 .return_once(move |_, _| Ok((Uuid::default(), rx)));
1798
1799 deltas_client
1801 .expect_unsubscribe()
1802 .return_once(|_| Ok(()));
1803
1804 let mut state_sync = ProtocolStateSynchronizer::new(
1805 ExtractorIdentity::new(Chain::Ethereum, "uniswap-v2"),
1806 true,
1807 ComponentFilter::with_tvl_range(remove_tvl_threshold, add_tvl_threshold),
1808 1,
1809 Duration::from_secs(0),
1810 true,
1811 true,
1812 true,
1813 ArcRPCClient(Arc::new(rpc_client)),
1814 ArcDeltasClient(Arc::new(deltas_client)),
1815 10_u64,
1816 );
1817 state_sync
1818 .initialize()
1819 .await
1820 .expect("Init failed");
1821
1822 let deltas = [
1824 BlockChanges {
1825 extractor: "uniswap-v2".to_string(),
1826 chain: Chain::Ethereum,
1827 block: Block {
1828 number: 1,
1829 hash: Bytes::from("0x01"),
1830 parent_hash: Bytes::from("0x00"),
1831 chain: Chain::Ethereum,
1832 ts: Default::default(),
1833 },
1834 revert: false,
1835 ..Default::default()
1836 },
1837 BlockChanges {
1838 extractor: "uniswap-v2".to_string(),
1839 chain: Chain::Ethereum,
1840 block: Block {
1841 number: 2,
1842 hash: Bytes::from("0x02"),
1843 parent_hash: Bytes::from("0x01"),
1844 chain: Chain::Ethereum,
1845 ts: Default::default(),
1846 },
1847 revert: false,
1848 component_tvl: [
1849 ("Component1".to_string(), 6.0), ("Component2".to_string(), 2.0), ("Component3".to_string(), 10.0), ]
1853 .into_iter()
1854 .collect(),
1855 ..Default::default()
1856 },
1857 ];
1858
1859 let (handle, mut rx) = state_sync.start().await;
1860 let (jh, close_tx) = handle.split();
1861
1862 tx.send(deltas[0].clone())
1864 .await
1865 .expect("deltas channel msg 0 closed!");
1866
1867 let _ = timeout(Duration::from_millis(100), rx.recv())
1869 .await
1870 .expect("waiting for first state msg timed out!")
1871 .expect("state sync block sender closed!");
1872
1873 tx.send(deltas[1].clone())
1875 .await
1876 .expect("deltas channel msg 1 closed!");
1877 let second_msg = timeout(Duration::from_millis(100), rx.recv())
1878 .await
1879 .expect("waiting for second state msg timed out!")
1880 .expect("state sync block sender closed!")
1881 .expect("no error");
1882
1883 let _ = close_tx.send(());
1884 jh.await
1885 .expect("state sync task panicked!");
1886
1887 let expected_second_msg = StateSyncMessage {
1888 header: BlockHeader {
1889 number: 2,
1890 hash: Bytes::from("0x02"),
1891 parent_hash: Bytes::from("0x01"),
1892 revert: false,
1893 ..Default::default()
1894 },
1895 snapshots: Snapshot {
1896 states: [(
1897 "Component3".to_string(),
1898 ComponentWithState {
1899 state: ResponseProtocolState {
1900 component_id: "Component3".to_string(),
1901 ..Default::default()
1902 },
1903 component: ProtocolComponent {
1904 id: "Component3".to_string(),
1905 ..Default::default()
1906 },
1907 component_tvl: Some(10.0),
1908 entrypoints: vec![], },
1910 )]
1911 .into_iter()
1912 .collect(),
1913 vm_storage: HashMap::new(),
1914 },
1915 deltas: Some(BlockChanges {
1916 extractor: "uniswap-v2".to_string(),
1917 chain: Chain::Ethereum,
1918 block: Block {
1919 number: 2,
1920 hash: Bytes::from("0x02"),
1921 parent_hash: Bytes::from("0x01"),
1922 chain: Chain::Ethereum,
1923 ts: Default::default(),
1924 },
1925 revert: false,
1926 component_tvl: [
1927 ("Component1".to_string(), 6.0), ("Component3".to_string(), 10.0), ]
1930 .into_iter()
1931 .collect(),
1932 ..Default::default()
1933 }),
1934 removed_components: [(
1935 "Component2".to_string(),
1936 ProtocolComponent { id: "Component2".to_string(), ..Default::default() },
1937 )]
1938 .into_iter()
1939 .collect(),
1940 };
1941
1942 assert_eq!(second_msg, expected_second_msg);
1943 }
1944
1945 #[test_log::test(tokio::test)]
1946 async fn test_public_close_api_functionality() {
1947 let mut rpc_client = make_mock_client();
1954 let mut deltas_client = MockDeltasClient::new();
1955
1956 rpc_client
1958 .expect_get_protocol_components()
1959 .returning(|_| {
1960 Ok(ProtocolComponentRequestResponse {
1961 protocol_components: vec![],
1962 pagination: PaginationResponse { page: 0, page_size: 20, total: 0 },
1963 })
1964 });
1965
1966 let (_tx, rx) = channel(1);
1968 deltas_client
1969 .expect_subscribe()
1970 .return_once(move |_, _| Ok((Uuid::default(), rx)));
1971
1972 deltas_client
1974 .expect_unsubscribe()
1975 .return_once(|_| Ok(()));
1976
1977 let mut state_sync = ProtocolStateSynchronizer::new(
1978 ExtractorIdentity::new(Chain::Ethereum, "test-protocol"),
1979 true,
1980 ComponentFilter::with_tvl_range(0.0, 0.0),
1981 5, Duration::from_secs(0),
1983 true,
1984 false,
1985 true,
1986 ArcRPCClient(Arc::new(rpc_client)),
1987 ArcDeltasClient(Arc::new(deltas_client)),
1988 10000_u64, );
1990
1991 state_sync
1992 .initialize()
1993 .await
1994 .expect("Init should succeed");
1995
1996 let (handle, _rx) = state_sync.start().await;
1998 let (jh, close_tx) = handle.split();
1999
2000 tokio::time::sleep(Duration::from_millis(100)).await;
2002
2003 close_tx
2005 .send(())
2006 .expect("Should be able to send close signal");
2007 jh.await.expect("Task should not panic");
2009 }
2010
2011 #[test_log::test(tokio::test)]
2012 async fn test_cleanup_runs_when_state_sync_processing_errors() {
2013 let mut rpc_client = make_mock_client();
2018 let mut deltas_client = MockDeltasClient::new();
2019
2020 rpc_client
2022 .expect_get_protocol_components()
2023 .returning(|_| {
2024 Ok(ProtocolComponentRequestResponse {
2025 protocol_components: vec![],
2026 pagination: PaginationResponse { page: 0, page_size: 20, total: 0 },
2027 })
2028 });
2029
2030 rpc_client
2032 .expect_get_protocol_states()
2033 .returning(|_| {
2034 Err(RPCError::ParseResponse("Test error during snapshot retrieval".to_string()))
2035 });
2036
2037 let (tx, rx) = channel(10);
2039 deltas_client
2040 .expect_subscribe()
2041 .return_once(move |_, _| {
2042 let delta = BlockChanges {
2044 extractor: "test".to_string(),
2045 chain: Chain::Ethereum,
2046 block: Block {
2047 hash: Bytes::from("0x0123"),
2048 number: 1,
2049 parent_hash: Bytes::from("0x0000"),
2050 chain: Chain::Ethereum,
2051 ts: chrono::DateTime::from_timestamp(1234567890, 0)
2052 .unwrap()
2053 .naive_utc(),
2054 },
2055 revert: false,
2056 new_protocol_components: [(
2058 "new_component".to_string(),
2059 ProtocolComponent {
2060 id: "new_component".to_string(),
2061 protocol_system: "test_protocol".to_string(),
2062 protocol_type_name: "test".to_string(),
2063 chain: Chain::Ethereum,
2064 tokens: vec![Bytes::from("0x0badc0ffee")],
2065 contract_ids: vec![Bytes::from("0x0badc0ffee")],
2066 static_attributes: Default::default(),
2067 creation_tx: Default::default(),
2068 created_at: Default::default(),
2069 change: Default::default(),
2070 },
2071 )]
2072 .into_iter()
2073 .collect(),
2074 component_tvl: [("new_component".to_string(), 100.0)]
2075 .into_iter()
2076 .collect(),
2077 ..Default::default()
2078 };
2079
2080 tokio::spawn(async move {
2081 let _ = tx.send(delta).await;
2082 });
2084
2085 Ok((Uuid::default(), rx))
2086 });
2087
2088 deltas_client
2090 .expect_unsubscribe()
2091 .return_once(|_| Ok(()));
2092
2093 let mut state_sync = ProtocolStateSynchronizer::new(
2094 ExtractorIdentity::new(Chain::Ethereum, "test-protocol"),
2095 true,
2096 ComponentFilter::with_tvl_range(0.0, 1000.0), 1,
2098 Duration::from_secs(0),
2099 true,
2100 false,
2101 true,
2102 ArcRPCClient(Arc::new(rpc_client)),
2103 ArcDeltasClient(Arc::new(deltas_client)),
2104 5000_u64,
2105 );
2106
2107 state_sync
2108 .initialize()
2109 .await
2110 .expect("Init should succeed");
2111
2112 state_sync.last_synced_block = Some(BlockHeader {
2114 hash: Bytes::from("0x0badc0ffee"),
2115 number: 42,
2116 parent_hash: Bytes::from("0xbadbeef0"),
2117 revert: false,
2118 timestamp: 123456789,
2119 partial_block_index: None,
2120 });
2121
2122 let (mut block_tx, _block_rx) = channel(10);
2124
2125 let (_end_tx, end_rx) = oneshot::channel::<()>();
2127 let result = state_sync
2128 .state_sync(&mut block_tx, end_rx)
2129 .await;
2130 assert!(result.is_err(), "state_sync should have errored during processing");
2132
2133 }
2136
2137 #[test_log::test(tokio::test)]
2138 async fn test_close_signal_while_waiting_for_first_deltas() {
2139 let mut rpc_client = make_mock_client();
2143 let mut deltas_client = MockDeltasClient::new();
2144
2145 rpc_client
2146 .expect_get_protocol_components()
2147 .returning(|_| {
2148 Ok(ProtocolComponentRequestResponse {
2149 protocol_components: vec![],
2150 pagination: PaginationResponse { page: 0, page_size: 20, total: 0 },
2151 })
2152 });
2153
2154 let (_tx, rx) = channel(1);
2155 deltas_client
2156 .expect_subscribe()
2157 .return_once(move |_, _| Ok((Uuid::default(), rx)));
2158
2159 deltas_client
2160 .expect_unsubscribe()
2161 .return_once(|_| Ok(()));
2162
2163 let mut state_sync = ProtocolStateSynchronizer::new(
2164 ExtractorIdentity::new(Chain::Ethereum, "test-protocol"),
2165 true,
2166 ComponentFilter::with_tvl_range(0.0, 0.0),
2167 1,
2168 Duration::from_secs(0),
2169 true,
2170 true,
2171 false,
2172 ArcRPCClient(Arc::new(rpc_client)),
2173 ArcDeltasClient(Arc::new(deltas_client)),
2174 10000_u64,
2175 );
2176
2177 state_sync
2178 .initialize()
2179 .await
2180 .expect("Init should succeed");
2181
2182 let (mut block_tx, _block_rx) = channel(10);
2183 let (end_tx, end_rx) = oneshot::channel::<()>();
2184
2185 let state_sync_handle = tokio::spawn(async move {
2187 state_sync
2188 .state_sync(&mut block_tx, end_rx)
2189 .await
2190 });
2191
2192 tokio::time::sleep(Duration::from_millis(100)).await;
2194
2195 let _ = end_tx.send(());
2197
2198 let result = state_sync_handle
2200 .await
2201 .expect("Task should not panic");
2202 assert!(result.is_ok(), "state_sync should exit cleanly when closed: {result:?}");
2203
2204 println!("SUCCESS: Close signal handled correctly while waiting for first deltas");
2205 }
2206
2207 #[test_log::test(tokio::test)]
2208 async fn test_close_signal_during_main_processing_loop() {
2209 let mut rpc_client = make_mock_client();
2215 let mut deltas_client = MockDeltasClient::new();
2216
2217 rpc_client
2219 .expect_get_protocol_components()
2220 .returning(|_| {
2221 Ok(ProtocolComponentRequestResponse {
2222 protocol_components: vec![],
2223 pagination: PaginationResponse { page: 0, page_size: 20, total: 0 },
2224 })
2225 });
2226
2227 rpc_client
2229 .expect_get_protocol_states()
2230 .returning(|_| {
2231 Ok(ProtocolStateRequestResponse {
2232 states: vec![],
2233 pagination: PaginationResponse { page: 0, page_size: 20, total: 0 },
2234 })
2235 });
2236
2237 rpc_client
2238 .expect_get_component_tvl()
2239 .returning(|_| {
2240 Ok(ComponentTvlRequestResponse {
2241 tvl: HashMap::new(),
2242 pagination: PaginationResponse { page: 0, page_size: 20, total: 0 },
2243 })
2244 });
2245
2246 rpc_client
2247 .expect_get_traced_entry_points()
2248 .returning(|_| {
2249 Ok(TracedEntryPointRequestResponse {
2250 traced_entry_points: HashMap::new(),
2251 pagination: PaginationResponse::new(0, 20, 0),
2252 })
2253 });
2254
2255 let (tx, rx) = channel(10);
2257 deltas_client
2258 .expect_subscribe()
2259 .return_once(move |_, _| {
2260 let first_delta = BlockChanges {
2262 extractor: "test".to_string(),
2263 chain: Chain::Ethereum,
2264 block: Block {
2265 hash: Bytes::from("0x0123"),
2266 number: 1,
2267 parent_hash: Bytes::from("0x0000"),
2268 chain: Chain::Ethereum,
2269 ts: chrono::DateTime::from_timestamp(1234567890, 0)
2270 .unwrap()
2271 .naive_utc(),
2272 },
2273 revert: false,
2274 ..Default::default()
2275 };
2276
2277 tokio::spawn(async move {
2278 let _ = tx.send(first_delta).await;
2279 tokio::time::sleep(Duration::from_secs(30)).await;
2282 });
2283
2284 Ok((Uuid::default(), rx))
2285 });
2286
2287 deltas_client
2288 .expect_unsubscribe()
2289 .return_once(|_| Ok(()));
2290
2291 let mut state_sync = ProtocolStateSynchronizer::new(
2292 ExtractorIdentity::new(Chain::Ethereum, "test-protocol"),
2293 true,
2294 ComponentFilter::with_tvl_range(0.0, 1000.0),
2295 1,
2296 Duration::from_secs(0),
2297 true,
2298 false,
2299 true,
2300 ArcRPCClient(Arc::new(rpc_client)),
2301 ArcDeltasClient(Arc::new(deltas_client)),
2302 10000_u64,
2303 );
2304
2305 state_sync
2306 .initialize()
2307 .await
2308 .expect("Init should succeed");
2309
2310 let (mut block_tx, mut block_rx) = channel(10);
2311 let (end_tx, end_rx) = oneshot::channel::<()>();
2312
2313 let state_sync_handle = tokio::spawn(async move {
2315 state_sync
2316 .state_sync(&mut block_tx, end_rx)
2317 .await
2318 });
2319
2320 let first_snapshot = block_rx
2322 .recv()
2323 .await
2324 .expect("Should receive first snapshot")
2325 .expect("Synchronizer error");
2326 assert!(
2327 !first_snapshot
2328 .snapshots
2329 .states
2330 .is_empty() ||
2331 first_snapshot.deltas.is_some()
2332 );
2333 let _ = end_tx.send(());
2335
2336 let result = state_sync_handle
2338 .await
2339 .expect("Task should not panic");
2340 assert!(
2341 result.is_ok(),
2342 "state_sync should exit cleanly when closed after first message: {result:?}"
2343 );
2344 }
2345
2346 #[test_log::test(tokio::test)]
2347 async fn test_max_retries_exceeded_error_propagation() {
2348 let mut rpc_client = make_mock_client();
2352 let mut deltas_client = MockDeltasClient::new();
2353
2354 rpc_client
2356 .expect_get_protocol_components()
2357 .returning(|_| {
2358 Ok(ProtocolComponentRequestResponse {
2359 protocol_components: vec![],
2360 pagination: PaginationResponse { page: 0, page_size: 20, total: 0 },
2361 })
2362 });
2363
2364 deltas_client
2367 .expect_subscribe()
2368 .returning(|_, _| {
2369 Err(DeltasError::NotConnected)
2371 });
2372
2373 deltas_client
2375 .expect_unsubscribe()
2376 .returning(|_| Ok(()))
2377 .times(0..=5);
2378
2379 let mut state_sync = ProtocolStateSynchronizer::new(
2381 ExtractorIdentity::new(Chain::Ethereum, "test-protocol"),
2382 true,
2383 ComponentFilter::with_tvl_range(0.0, 1000.0),
2384 2, Duration::from_millis(10), true,
2387 false,
2388 true,
2389 ArcRPCClient(Arc::new(rpc_client)),
2390 ArcDeltasClient(Arc::new(deltas_client)),
2391 1000_u64,
2392 );
2393
2394 state_sync
2395 .initialize()
2396 .await
2397 .expect("Init should succeed");
2398
2399 let (handle, mut rx) = state_sync.start().await;
2401 let (jh, _close_tx) = handle.split();
2402
2403 let res = tokio::time::timeout(Duration::from_millis(100), rx.recv())
2404 .await
2405 .expect("responsds in time")
2406 .expect("channel open");
2407
2408 if let Err(err) = res {
2410 assert!(
2411 matches!(err, SynchronizerError::ConnectionClosed),
2412 "Expected ConnectionClosed error, got: {:?}",
2413 err
2414 );
2415 } else {
2416 panic!("Expected an error")
2417 }
2418
2419 let task_result = tokio::time::timeout(Duration::from_secs(2), jh).await;
2421 assert!(task_result.is_ok(), "Synchronizer task should complete after max retries");
2422 }
2423
2424 #[test_log::test(tokio::test)]
2425 async fn test_is_next_expected() {
2426 let mut state_sync = with_mocked_clients(true, false, None, None);
2430
2431 let incoming_header = BlockHeader {
2433 number: 100,
2434 hash: Bytes::from("0x0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"),
2435 parent_hash: Bytes::from(
2436 "0x0000000000000000000000000000000000000000000000000000000000000000",
2437 ),
2438 revert: false,
2439 timestamp: 123456789,
2440 partial_block_index: None,
2441 };
2442 assert!(
2443 !state_sync.is_next_expected(&incoming_header),
2444 "Should return false when no previous block is set"
2445 );
2446
2447 let previous_header = BlockHeader {
2449 number: 99,
2450 hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000000"),
2451 parent_hash: Bytes::from(
2452 "0xabcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789",
2453 ),
2454 revert: false,
2455 timestamp: 123456788,
2456 partial_block_index: None,
2457 };
2458 state_sync.last_synced_block = Some(previous_header.clone());
2459
2460 assert!(
2461 state_sync.is_next_expected(&incoming_header),
2462 "Should return true when incoming parent_hash matches previous hash"
2463 );
2464
2465 let non_matching_header = BlockHeader {
2467 number: 100,
2468 hash: Bytes::from("0x0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"),
2469 parent_hash: Bytes::from(
2470 "0x1111111111111111111111111111111111111111111111111111111111111111",
2471 ), revert: false,
2473 timestamp: 123456789,
2474 partial_block_index: None,
2475 };
2476 assert!(
2477 !state_sync.is_next_expected(&non_matching_header),
2478 "Should return false when incoming parent_hash doesn't match previous hash"
2479 );
2480 }
2481
2482 #[test_log::test(tokio::test)]
2483 async fn test_synchronizer_restart_skip_snapshot_on_expected_block() {
2484 let mut rpc_client = make_mock_client();
2488 let mut deltas_client = MockDeltasClient::new();
2489
2490 rpc_client
2492 .expect_get_protocol_components()
2493 .returning(|_| {
2494 Ok(ProtocolComponentRequestResponse {
2495 protocol_components: vec![ProtocolComponent {
2496 id: "Component1".to_string(),
2497 ..Default::default()
2498 }],
2499 pagination: PaginationResponse { page: 0, page_size: 20, total: 1 },
2500 })
2501 });
2502
2503 let (tx, rx) = channel(10);
2505 deltas_client
2506 .expect_subscribe()
2507 .return_once(move |_, _| {
2508 let expected_next_delta = BlockChanges {
2509 extractor: "uniswap-v2".to_string(),
2510 chain: Chain::Ethereum,
2511 block: Block {
2512 hash: Bytes::from(
2513 "0x0000000000000000000000000000000000000000000000000000000000000002",
2514 ), number: 2,
2516 parent_hash: Bytes::from(
2517 "0x0000000000000000000000000000000000000000000000000000000000000001",
2518 ), chain: Chain::Ethereum,
2520 ts: chrono::DateTime::from_timestamp(1234567890, 0)
2521 .unwrap()
2522 .naive_utc(),
2523 },
2524 revert: false,
2525 ..Default::default()
2526 };
2527
2528 tokio::spawn(async move {
2529 let _ = tx.send(expected_next_delta).await;
2530 });
2531
2532 Ok((Uuid::default(), rx))
2533 });
2534
2535 deltas_client
2536 .expect_unsubscribe()
2537 .return_once(|_| Ok(()));
2538
2539 let mut state_sync = ProtocolStateSynchronizer::new(
2540 ExtractorIdentity::new(Chain::Ethereum, "uniswap-v2"),
2541 true,
2542 ComponentFilter::with_tvl_range(0.0, 1000.0),
2543 1,
2544 Duration::from_secs(0),
2545 true, false,
2547 true,
2548 ArcRPCClient(Arc::new(rpc_client)),
2549 ArcDeltasClient(Arc::new(deltas_client)),
2550 10000_u64,
2551 );
2552
2553 state_sync
2555 .initialize()
2556 .await
2557 .expect("Init should succeed");
2558
2559 state_sync.last_synced_block = Some(BlockHeader {
2561 number: 1,
2562 hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000001"), parent_hash: Bytes::from(
2564 "0x0000000000000000000000000000000000000000000000000000000000000000",
2565 ),
2566 revert: false,
2567 timestamp: 123456789,
2568 partial_block_index: None,
2569 });
2570
2571 let (mut block_tx, mut block_rx) = channel(10);
2572 let (end_tx, end_rx) = oneshot::channel::<()>();
2573
2574 let state_sync_handle = tokio::spawn(async move {
2576 state_sync
2577 .state_sync(&mut block_tx, end_rx)
2578 .await
2579 });
2580
2581 let result_msg = timeout(Duration::from_millis(200), block_rx.recv())
2583 .await
2584 .expect("Should receive message within timeout")
2585 .expect("Channel should be open")
2586 .expect("Should not be an error");
2587
2588 let _ = end_tx.send(());
2590
2591 let _ = state_sync_handle
2593 .await
2594 .expect("Task should not panic");
2595
2596 assert!(result_msg.deltas.is_some(), "Should contain deltas");
2599 assert!(
2600 result_msg.snapshots.states.is_empty(),
2601 "Should not contain snapshots when next expected block is received"
2602 );
2603
2604 if let Some(deltas) = &result_msg.deltas {
2606 assert_eq!(deltas.block.number, 2);
2607 assert_eq!(
2608 deltas.block.hash,
2609 Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000002")
2610 );
2611 assert_eq!(
2612 deltas.block.parent_hash,
2613 Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000001")
2614 );
2615 }
2616 }
2617
2618 #[test_log::test(tokio::test)]
2619 async fn test_skip_previously_processed_messages() {
2620 let mut rpc_client = make_mock_client();
2624 let mut deltas_client = MockDeltasClient::new();
2625
2626 rpc_client
2628 .expect_get_protocol_components()
2629 .returning(|_| {
2630 Ok(ProtocolComponentRequestResponse {
2631 protocol_components: vec![ProtocolComponent {
2632 id: "Component1".to_string(),
2633 ..Default::default()
2634 }],
2635 pagination: PaginationResponse { page: 0, page_size: 20, total: 1 },
2636 })
2637 });
2638
2639 rpc_client
2641 .expect_get_protocol_states()
2642 .returning(|_| {
2643 Ok(ProtocolStateRequestResponse {
2644 states: vec![ResponseProtocolState {
2645 component_id: "Component1".to_string(),
2646 ..Default::default()
2647 }],
2648 pagination: PaginationResponse { page: 0, page_size: 20, total: 1 },
2649 })
2650 });
2651
2652 rpc_client
2653 .expect_get_component_tvl()
2654 .returning(|_| {
2655 Ok(ComponentTvlRequestResponse {
2656 tvl: [("Component1".to_string(), 100.0)]
2657 .into_iter()
2658 .collect(),
2659 pagination: PaginationResponse { page: 0, page_size: 20, total: 1 },
2660 })
2661 });
2662
2663 rpc_client
2664 .expect_get_traced_entry_points()
2665 .returning(|_| {
2666 Ok(TracedEntryPointRequestResponse {
2667 traced_entry_points: HashMap::new(),
2668 pagination: PaginationResponse::new(0, 20, 0),
2669 })
2670 });
2671
2672 let (tx, rx) = channel(10);
2674 deltas_client
2675 .expect_subscribe()
2676 .return_once(move |_, _| {
2677 let old_messages = vec![
2679 BlockChanges {
2680 extractor: "uniswap-v2".to_string(),
2681 chain: Chain::Ethereum,
2682 block: Block {
2683 hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000003"),
2684 number: 3,
2685 parent_hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000002"),
2686 chain: Chain::Ethereum,
2687 ts: chrono::DateTime::from_timestamp(1234567890, 0).unwrap().naive_utc(),
2688 },
2689 revert: false,
2690 ..Default::default()
2691 },
2692 BlockChanges {
2693 extractor: "uniswap-v2".to_string(),
2694 chain: Chain::Ethereum,
2695 block: Block {
2696 hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000004"),
2697 number: 4,
2698 parent_hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000003"),
2699 chain: Chain::Ethereum,
2700 ts: chrono::DateTime::from_timestamp(1234567891, 0).unwrap().naive_utc(),
2701 },
2702 revert: false,
2703 ..Default::default()
2704 },
2705 BlockChanges {
2706 extractor: "uniswap-v2".to_string(),
2707 chain: Chain::Ethereum,
2708 block: Block {
2709 hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000005"),
2710 number: 5,
2711 parent_hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000004"),
2712 chain: Chain::Ethereum,
2713 ts: chrono::DateTime::from_timestamp(1234567892, 0).unwrap().naive_utc(),
2714 },
2715 revert: false,
2716 ..Default::default()
2717 },
2718 BlockChanges {
2720 extractor: "uniswap-v2".to_string(),
2721 chain: Chain::Ethereum,
2722 block: Block {
2723 hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000006"),
2724 number: 6,
2725 parent_hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000005"),
2726 chain: Chain::Ethereum,
2727 ts: chrono::DateTime::from_timestamp(1234567893, 0).unwrap().naive_utc(),
2728 },
2729 revert: false,
2730 ..Default::default()
2731 },
2732 ];
2733
2734 tokio::spawn(async move {
2735 for message in old_messages {
2736 let _ = tx.send(message).await;
2737 tokio::time::sleep(Duration::from_millis(10)).await;
2738 }
2739 });
2740
2741 Ok((Uuid::default(), rx))
2742 });
2743
2744 deltas_client
2745 .expect_unsubscribe()
2746 .return_once(|_| Ok(()));
2747
2748 let mut state_sync = ProtocolStateSynchronizer::new(
2749 ExtractorIdentity::new(Chain::Ethereum, "uniswap-v2"),
2750 true,
2751 ComponentFilter::with_tvl_range(0.0, 1000.0),
2752 1,
2753 Duration::from_secs(0),
2754 true,
2755 true,
2756 true,
2757 ArcRPCClient(Arc::new(rpc_client)),
2758 ArcDeltasClient(Arc::new(deltas_client)),
2759 10000_u64,
2760 );
2761
2762 state_sync
2764 .initialize()
2765 .await
2766 .expect("Init should succeed");
2767
2768 state_sync.last_synced_block = Some(BlockHeader {
2769 number: 5,
2770 hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000005"),
2771 parent_hash: Bytes::from(
2772 "0x0000000000000000000000000000000000000000000000000000000000000004",
2773 ),
2774 revert: false,
2775 timestamp: 1234567892,
2776 partial_block_index: None,
2777 });
2778
2779 let (mut block_tx, mut block_rx) = channel(10);
2780 let (end_tx, end_rx) = oneshot::channel::<()>();
2781
2782 let state_sync_handle = tokio::spawn(async move {
2784 state_sync
2785 .state_sync(&mut block_tx, end_rx)
2786 .await
2787 });
2788
2789 let result_msg = timeout(Duration::from_millis(500), block_rx.recv())
2791 .await
2792 .expect("Should receive message within timeout")
2793 .expect("Channel should be open")
2794 .expect("Should not be an error");
2795
2796 let _ = end_tx.send(());
2798
2799 let _ = state_sync_handle
2801 .await
2802 .expect("Task should not panic");
2803
2804 assert!(result_msg.deltas.is_some(), "Should contain deltas");
2806 if let Some(deltas) = &result_msg.deltas {
2807 assert_eq!(
2808 deltas.block.number, 6,
2809 "Should only process block 6, skipping earlier blocks"
2810 );
2811 assert_eq!(
2812 deltas.block.hash,
2813 Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000006")
2814 );
2815 }
2816
2817 match timeout(Duration::from_millis(50), block_rx.recv()).await {
2820 Err(_) => {
2821 }
2823 Ok(Some(Err(_))) => {
2824 }
2826 Ok(Some(Ok(_))) => {
2827 panic!("Should not receive additional messages - old blocks should be skipped");
2828 }
2829 Ok(None) => {
2830 }
2832 }
2833 }
2834
2835 fn make_block_changes(block_num: u64, partial_idx: Option<u32>) -> BlockChanges {
2836 let hash = Bytes::from(vec![block_num as u8; 32]);
2838 let parent_hash = Bytes::from(vec![block_num.saturating_sub(1) as u8; 32]);
2839 BlockChanges {
2840 extractor: "uniswap-v2".to_string(),
2841 chain: Chain::Ethereum,
2842 block: Block {
2843 number: block_num,
2844 hash,
2845 parent_hash,
2846 chain: Chain::Ethereum,
2847 ts: Default::default(),
2848 },
2849 revert: false,
2850 partial_block_index: partial_idx,
2851 ..Default::default()
2852 }
2853 }
2854
2855 #[test_log::test(tokio::test)]
2857 async fn test_partial_mode_accepts_full_block_as_first_message() {
2858 let (rpc_client, deltas_client, tx) = mock_clients_for_state_sync();
2859 let mut state_sync = with_mocked_clients(true, true, Some(rpc_client), Some(deltas_client))
2860 .with_partial_blocks(true);
2861 state_sync
2862 .initialize()
2863 .await
2864 .expect("Init failed");
2865
2866 let (handle, mut block_rx) = state_sync.start().await;
2867 let (jh, close_tx) = handle.split();
2868
2869 tx.send(make_block_changes(1, None))
2871 .await
2872 .unwrap();
2873
2874 let msg = timeout(Duration::from_millis(100), block_rx.recv())
2876 .await
2877 .expect("Should receive message")
2878 .expect("Channel open")
2879 .expect("No error");
2880
2881 assert_eq!(msg.header.number, 1, "Should use block 1 (full block)");
2882 assert_eq!(msg.header.partial_block_index, None, "Should be a full block");
2883
2884 let _ = close_tx.send(());
2885 jh.await.expect("Task should not panic");
2886 }
2887
2888 #[test_log::test(tokio::test)]
2890 async fn test_partial_mode_detects_block_number_increase() {
2891 let (rpc_client, deltas_client, tx) = mock_clients_for_state_sync();
2892 let mut state_sync = with_mocked_clients(true, true, Some(rpc_client), Some(deltas_client))
2893 .with_partial_blocks(true);
2894 state_sync
2895 .initialize()
2896 .await
2897 .expect("Init failed");
2898
2899 let (handle, mut block_rx) = state_sync.start().await;
2900 let (jh, close_tx) = handle.split();
2901
2902 tx.send(make_block_changes(1, Some(0)))
2904 .await
2905 .unwrap();
2906 tx.send(make_block_changes(1, Some(3)))
2907 .await
2908 .unwrap();
2909
2910 match timeout(Duration::from_millis(50), block_rx.recv()).await {
2912 Err(_) => { }
2913 Ok(_) => panic!("Should not receive message while waiting for new block"),
2914 }
2915
2916 tx.send(make_block_changes(2, Some(5)))
2919 .await
2920 .unwrap();
2921
2922 let msg = timeout(Duration::from_millis(100), block_rx.recv())
2924 .await
2925 .expect("Should receive message")
2926 .expect("Channel open")
2927 .expect("No error");
2928
2929 assert_eq!(msg.header.number, 2, "Should use block 2 (block number increased)");
2930 assert_eq!(msg.header.partial_block_index, Some(5));
2931
2932 let _ = close_tx.send(());
2933 jh.await.expect("Task should not panic");
2934 }
2935
2936 #[test_log::test(tokio::test)]
2938 async fn test_partial_mode_skips_already_synced_blocks() {
2939 let (rpc_client, deltas_client, tx) = mock_clients_for_state_sync();
2940 let mut state_sync = with_mocked_clients(true, true, Some(rpc_client), Some(deltas_client))
2941 .with_partial_blocks(true);
2942 state_sync
2943 .initialize()
2944 .await
2945 .expect("Init failed");
2946
2947 state_sync.last_synced_block = Some(BlockHeader {
2949 number: 5,
2950 hash: Bytes::from("0x05"),
2951 parent_hash: Bytes::from("0x04"),
2952 revert: false,
2953 timestamp: 0,
2954 partial_block_index: None,
2955 });
2956
2957 let (handle, mut block_rx) = state_sync.start().await;
2958 let (jh, close_tx) = handle.split();
2959
2960 tx.send(make_block_changes(3, Some(2)))
2962 .await
2963 .unwrap();
2964
2965 tx.send(make_block_changes(4, Some(0)))
2967 .await
2968 .unwrap();
2969
2970 match timeout(Duration::from_millis(50), block_rx.recv()).await {
2972 Err(_) => { }
2973 Ok(_) => panic!("Should skip block 4 because it's already synced"),
2974 }
2975
2976 tx.send(make_block_changes(5, Some(3)))
2979 .await
2980 .unwrap();
2981 tx.send(make_block_changes(6, Some(0)))
2983 .await
2984 .unwrap();
2985
2986 let msg = timeout(Duration::from_millis(100), block_rx.recv())
2987 .await
2988 .expect("Should receive message")
2989 .expect("Channel open")
2990 .expect("No error");
2991
2992 assert_eq!(msg.header.number, 6, "Should use block 6 (after last synced)");
2993
2994 let _ = close_tx.send(());
2995 jh.await.expect("Task should not panic");
2996 }
2997}