1use std::{collections::HashMap, time::Duration};
2
3use async_trait::async_trait;
4use serde::{Deserialize, Serialize};
5use thiserror::Error;
6use tokio::{
7 select,
8 sync::{
9 mpsc::{channel, error::SendError, Receiver, Sender},
10 oneshot,
11 },
12 task::JoinHandle,
13 time::{sleep, timeout},
14};
15use tracing::{debug, error, info, instrument, trace, warn};
16use tycho_common::{
17 dto::{
18 BlockChanges, BlockParam, Chain, ComponentTvlRequestBody, EntryPointWithTracingParams,
19 ExtractorIdentity, ProtocolComponent, ResponseAccount, ResponseProtocolState,
20 TracingResult, VersionParam,
21 },
22 Bytes,
23};
24
25use crate::{
26 deltas::{DeltasClient, SubscriptionOptions},
27 feed::{
28 component_tracker::{ComponentFilter, ComponentTracker},
29 BlockHeader, HeaderLike,
30 },
31 rpc::{RPCClient, RPCError},
32 DeltasError,
33};
34
/// Errors that can be reported while synchronizing protocol state.
#[derive(Error, Debug)]
pub enum SynchronizerError {
    /// An RPC request failed; wraps the underlying [`RPCError`].
    #[error("RPC error: {0}")]
    RPCError(#[from] RPCError),

    /// A channel operation failed. The payload is the complete, already formatted message
    /// (this is what the `From<SendError<T>>` conversion in this file produces).
    #[error("{0}")]
    ChannelError(String),

    /// An operation did not complete in time, e.g. the first deltas message did not arrive
    /// within the configured timeout.
    #[error("Timeout error: {0}")]
    Timeout(String),

    /// Closing the synchronizer failed; the payload describes the reason.
    #[error("Failed to close synchronizer: {0}")]
    CloseError(String),

    /// The deltas connection reported an error or its message channel closed unexpectedly.
    #[error("Connection error: {0}")]
    ConnectionError(String),

    /// The deltas client is not connected (converted from `DeltasError::NotConnected`).
    #[error("Connection closed")]
    ConnectionClosed,
}
61
/// Convenience alias for results whose error type is [`SynchronizerError`].
pub type SyncResult<T> = Result<T, SynchronizerError>;
63
64impl<T> From<SendError<T>> for SynchronizerError {
65 fn from(err: SendError<T>) -> Self {
66 SynchronizerError::ChannelError(format!("Failed to send message: {err}"))
67 }
68}
69
70impl From<DeltasError> for SynchronizerError {
71 fn from(err: DeltasError) -> Self {
72 match err {
73 DeltasError::NotConnected => SynchronizerError::ConnectionClosed,
74 _ => SynchronizerError::ConnectionError(err.to_string()),
75 }
76 }
77}
78
/// Synchronizes the state of a single extractor by combining RPC snapshots with a stream of
/// delta messages.
pub struct ProtocolStateSynchronizer<R: RPCClient, D: DeltasClient> {
    /// Identity (chain and name) of the extractor being synchronized.
    extractor_id: ExtractorIdentity,
    /// Forwarded to the protocol state request when fetching snapshots.
    retrieve_balances: bool,
    /// Client used for all snapshot related RPC requests.
    rpc_client: R,
    /// Client used to subscribe to and unsubscribe from delta messages.
    deltas_client: D,
    /// Maximum number of synchronization attempts made by `start`.
    max_retries: u64,
    /// Pause between two synchronization attempts.
    retry_cooldown: Duration,
    /// When `false`, no snapshots are requested and empty snapshot messages are produced;
    /// the flag is also passed to the subscription options via `with_state`.
    include_snapshots: bool,
    /// Tracks which components and contracts are currently of interest.
    component_tracker: ComponentTracker<R>,
    /// Header of the most recent message that was sent downstream, if any.
    last_synced_block: Option<BlockHeader>,
    /// Number of seconds to wait for the first deltas message after subscribing.
    timeout: u64,
    /// When `true`, component TVL is requested and attached to snapshots.
    include_tvl: bool,
}
92
/// A protocol component bundled with its state as retrieved for a snapshot.
#[derive(Clone, PartialEq, Debug, Serialize, Deserialize)]
pub struct ComponentWithState {
    /// Protocol state returned by the RPC for this component.
    pub state: ResponseProtocolState,
    /// The tracked component the state belongs to.
    pub component: ProtocolComponent,
    /// TVL of the component; `None` when TVL was not requested or the response had no entry
    /// for this component.
    pub component_tvl: Option<f64>,
    /// Traced entry points for this component; empty when none were retrieved.
    pub entrypoints: Vec<(EntryPointWithTracingParams, TracingResult)>,
}
100
/// Snapshot data for a set of components.
#[derive(Clone, PartialEq, Debug, Default, Serialize, Deserialize)]
pub struct Snapshot {
    /// Component states, keyed by component id.
    pub states: HashMap<String, ComponentWithState>,
    /// Contract (account) states, keyed by contract address.
    pub vm_storage: HashMap<Bytes, ResponseAccount>,
}
106
107impl Snapshot {
108 fn extend(&mut self, other: Snapshot) {
109 self.states.extend(other.states);
110 self.vm_storage.extend(other.vm_storage);
111 }
112
113 pub fn get_states(&self) -> &HashMap<String, ComponentWithState> {
114 &self.states
115 }
116
117 pub fn get_vm_storage(&self) -> &HashMap<Bytes, ResponseAccount> {
118 &self.vm_storage
119 }
120}
121
/// Message emitted by a synchronizer for a single block.
#[derive(Clone, PartialEq, Debug, Default, Serialize, Deserialize)]
pub struct StateSyncMessage<H>
where
    H: HeaderLike,
{
    /// Header of the block this message refers to.
    pub header: H,
    /// Snapshots for components that need their full state delivered with this message.
    pub snapshots: Snapshot,
    /// Changes for this block, if any.
    pub deltas: Option<BlockChanges>,
    /// Components that are no longer tracked, keyed by component id.
    pub removed_components: HashMap<String, ProtocolComponent>,
}
138
139impl<H> StateSyncMessage<H>
140where
141 H: HeaderLike,
142{
143 pub fn merge(mut self, other: Self) -> Self {
144 self.removed_components
146 .retain(|k, _| !other.snapshots.states.contains_key(k));
147 self.snapshots
148 .states
149 .retain(|k, _| !other.removed_components.contains_key(k));
150
151 self.snapshots.extend(other.snapshots);
152 let deltas = match (self.deltas, other.deltas) {
153 (Some(l), Some(r)) => Some(l.merge(r)),
154 (None, Some(r)) => Some(r),
155 (Some(l), None) => Some(l),
156 (None, None) => None,
157 };
158 self.removed_components
159 .extend(other.removed_components);
160 Self {
161 header: other.header,
162 snapshots: self.snapshots,
163 deltas,
164 removed_components: self.removed_components,
165 }
166 }
167}
168
/// Handle to a spawned synchronizer task.
pub struct SynchronizerTaskHandle {
    /// Join handle of the spawned task.
    join_handle: JoinHandle<()>,
    /// Sender half of the close signal channel; the task watches the receiving half.
    close_tx: oneshot::Sender<()>,
}
177
178impl SynchronizerTaskHandle {
187 pub fn new(join_handle: JoinHandle<()>, close_tx: oneshot::Sender<()>) -> Self {
188 Self { join_handle, close_tx }
189 }
190
191 pub fn split(self) -> (JoinHandle<()>, oneshot::Sender<()>) {
197 (self.join_handle, self.close_tx)
198 }
199}
200
/// Interface of a component that keeps protocol state in sync and streams it to a consumer.
#[async_trait]
pub trait StateSynchronizer: Send + Sync + 'static {
    /// Prepares the synchronizer before it is started.
    async fn initialize(&mut self) -> SyncResult<()>;

    /// Consumes the synchronizer and starts synchronizing.
    ///
    /// Returns a handle to the running task together with the receiving end of the channel
    /// on which state messages (or errors) are delivered.
    async fn start(
        mut self,
    ) -> (SynchronizerTaskHandle, Receiver<SyncResult<StateSyncMessage<BlockHeader>>>);
}
210
211impl<R, D> ProtocolStateSynchronizer<R, D>
212where
213 R: RPCClient + Clone + Send + Sync + 'static,
216 D: DeltasClient + Clone + Send + Sync + 'static,
217{
218 #[allow(clippy::too_many_arguments)]
220 pub fn new(
221 extractor_id: ExtractorIdentity,
222 retrieve_balances: bool,
223 component_filter: ComponentFilter,
224 max_retries: u64,
225 retry_cooldown: Duration,
226 include_snapshots: bool,
227 include_tvl: bool,
228 rpc_client: R,
229 deltas_client: D,
230 timeout: u64,
231 ) -> Self {
232 Self {
233 extractor_id: extractor_id.clone(),
234 retrieve_balances,
235 rpc_client: rpc_client.clone(),
236 include_snapshots,
237 deltas_client,
238 component_tracker: ComponentTracker::new(
239 extractor_id.chain,
240 extractor_id.name.as_str(),
241 component_filter,
242 rpc_client,
243 ),
244 max_retries,
245 retry_cooldown,
246 last_synced_block: None,
247 timeout,
248 include_tvl,
249 }
250 }
251
    /// Retrieves snapshots for a set of components at the block described by `header`.
    ///
    /// # Arguments
    /// * `header` - Block the snapshot is requested for; it is returned as the message header.
    /// * `ids` - Component ids to snapshot. `None` means all currently tracked components.
    ///
    /// # Returns
    /// A message containing only snapshots (`deltas` is `None`, `removed_components` is
    /// empty). The message is empty apart from the header if snapshots are disabled or there
    /// are no component ids to query.
    ///
    /// # Errors
    /// Propagates any error returned by the RPC client.
    #[allow(deprecated)]
    async fn get_snapshots<'a, I: IntoIterator<Item = &'a String>>(
        &mut self,
        header: BlockHeader,
        ids: Option<I>,
    ) -> SyncResult<StateSyncMessage<BlockHeader>> {
        if !self.include_snapshots {
            return Ok(StateSyncMessage { header, ..Default::default() });
        }
        // All versioned requests below are pinned to the block number of `header`.
        let version = VersionParam::new(
            None,
            Some(BlockParam {
                chain: Some(self.extractor_id.chain),
                hash: None,
                number: Some(header.number as i64),
            }),
        );

        let component_ids: Vec<_> = match ids {
            Some(ids) => ids.into_iter().cloned().collect(),
            None => self
                .component_tracker
                .get_tracked_component_ids(),
        };

        if component_ids.is_empty() {
            return Ok(StateSyncMessage { header, ..Default::default() });
        }

        // NOTE(review): the trailing `100, 4` arguments of the paginated calls are presumably
        // chunk size and concurrency - confirm against the `RPCClient` trait.
        let component_tvl = if self.include_tvl {
            let body = ComponentTvlRequestBody::id_filtered(
                component_ids.clone(),
                self.extractor_id.chain,
            );
            self.rpc_client
                .get_component_tvl_paginated(&body, 100, 4)
                .await?
                .tvl
        } else {
            HashMap::new()
        };

        // Traced entry points are only requested when the extractor runs on Ethereum.
        let entrypoints_result = if self.extractor_id.chain == Chain::Ethereum {
            let result = self
                .rpc_client
                .get_traced_entry_points_paginated(
                    self.extractor_id.chain,
                    &self.extractor_id.name,
                    &component_ids,
                    100,
                    4,
                )
                .await?;
            // Let the tracker see the tracing results before the contract lookup further down.
            self.component_tracker
                .process_entrypoints(&result.clone().into());
            Some(result)
        } else {
            None
        };

        // Index the returned states by component id so they can be matched to components.
        let mut protocol_states = self
            .rpc_client
            .get_protocol_states_paginated(
                self.extractor_id.chain,
                &component_ids,
                &self.extractor_id.name,
                self.retrieve_balances,
                &version,
                100,
                4,
            )
            .await?
            .states
            .into_iter()
            .map(|state| (state.component_id.clone(), state))
            .collect::<HashMap<_, _>>();

        trace!(states=?&protocol_states, "Retrieved ProtocolStates");
        // Walk all tracked components and keep those for which a state was returned.
        let states = self
            .component_tracker
            .components
            .values()
            .filter_map(|component| {
                if let Some(state) = protocol_states.remove(&component.id) {
                    Some((
                        component.id.clone(),
                        ComponentWithState {
                            state,
                            component: component.clone(),
                            component_tvl: component_tvl
                                .get(&component.id)
                                .cloned(),
                            // Empty when entry points were not requested or none exist for
                            // this component.
                            entrypoints: entrypoints_result
                                .as_ref()
                                .map(|r| {
                                    r.traced_entry_points
                                        .get(&component.id)
                                        .cloned()
                                        .unwrap_or_default()
                                })
                                .unwrap_or_default(),
                        },
                    ))
                } else if component_ids.contains(&component.id) {
                    // The component was requested but the RPC returned no state for it.
                    let component_id = &component.id;
                    error!(?component_id, "Missing state for native component!");
                    None
                } else {
                    None
                }
            })
            .collect();

        let contract_ids = self
            .component_tracker
            .get_contracts_by_component(&component_ids);
        let vm_storage = if !contract_ids.is_empty() {
            let ids: Vec<Bytes> = contract_ids
                .clone()
                .into_iter()
                .collect();
            // Index the returned accounts by address.
            let contract_states = self
                .rpc_client
                .get_contract_state_paginated(
                    self.extractor_id.chain,
                    ids.as_slice(),
                    &self.extractor_id.name,
                    &version,
                    100,
                    4,
                )
                .await?
                .accounts
                .into_iter()
                .map(|acc| (acc.address.clone(), acc))
                .collect::<HashMap<_, _>>();

            trace!(states=?&contract_states, "Retrieved ContractState");

            // Reverse index (contract address -> requested component ids); only used to
            // name the affected components when contract storage is missing.
            let contract_address_to_components = self
                .component_tracker
                .components
                .iter()
                .filter_map(|(id, comp)| {
                    if component_ids.contains(id) {
                        Some(
                            comp.contract_ids
                                .iter()
                                .map(|address| (address.clone(), comp.id.clone())),
                        )
                    } else {
                        None
                    }
                })
                .flatten()
                .fold(HashMap::<Bytes, Vec<String>>::new(), |mut acc, (addr, c_id)| {
                    acc.entry(addr).or_default().push(c_id);
                    acc
                });

            contract_ids
                .iter()
                .filter_map(|address| {
                    if let Some(state) = contract_states.get(address) {
                        Some((address.clone(), state.clone()))
                    } else if let Some(ids) = contract_address_to_components.get(address) {
                        // Storage is missing for a contract listed by a requested component.
                        error!(
                            ?address,
                            ?ids,
                            "Component with lacking contract storage encountered!"
                        );
                        None
                    } else {
                        None
                    }
                })
                .collect()
        } else {
            HashMap::new()
        };

        Ok(StateSyncMessage {
            header,
            snapshots: Snapshot { states, vm_storage },
            deltas: None,
            removed_components: HashMap::new(),
        })
    }
449
    /// Runs one synchronization session: subscribes to deltas, emits an initial message
    /// (with snapshots if required) and then forwards one message per received delta.
    ///
    /// # Arguments
    /// * `block_tx` - Channel on which the produced messages are sent.
    /// * `end_rx` - Close signal; when it resolves the session ends with `Ok(())`.
    ///
    /// # Returns
    /// `Ok(())` if the close signal was received. On failure, the error is returned together
    /// with `Some(end_rx)` so that the caller can reuse the close signal for another attempt.
    ///
    /// Unless subscribing itself failed, the subscription is always released before
    /// returning; a failure to unsubscribe is only logged.
    #[instrument(skip(self, block_tx, end_rx), fields(extractor_id = %self.extractor_id))]
    async fn state_sync(
        &mut self,
        block_tx: &mut Sender<SyncResult<StateSyncMessage<BlockHeader>>>,
        mut end_rx: oneshot::Receiver<()>,
    ) -> Result<(), (SynchronizerError, Option<oneshot::Receiver<()>>)> {
        let subscription_options = SubscriptionOptions::new().with_state(self.include_snapshots);
        let (subscription_id, mut msg_rx) = match self
            .deltas_client
            .subscribe(self.extractor_id.clone(), subscription_options)
            .await
        {
            Ok(result) => result,
            Err(e) => return Err((e.into(), Some(end_rx))),
        };

        // The session runs inside an async block so that `?` and early returns end up in
        // `result` and the unsubscribe below runs on every exit path.
        let result = async {
            info!("Waiting for deltas...");
            let mut warned = false;
            // Wait for the first usable message. The timeout applies to each receive.
            let mut first_msg = loop {
                let msg = select! {
                    deltas_result = timeout(Duration::from_secs(self.timeout), msg_rx.recv()) => {
                        deltas_result
                            .map_err(|_| {
                                SynchronizerError::Timeout(format!(
                                    "First deltas took longer than {t}s to arrive",
                                    t = self.timeout
                                ))
                            })?
                            .ok_or_else(|| {
                                SynchronizerError::ConnectionError(
                                    "Deltas channel closed before first message".to_string(),
                                )
                            })?
                    },
                    _ = &mut end_rx => {
                        info!("Received close signal while waiting for first deltas");
                        return Ok(());
                    }
                };

                let incoming = BlockHeader::from_block(msg.get_block(), msg.is_revert());
                if let Some(current) = &self.last_synced_block {
                    // After a restart: skip messages that are not ahead of the last synced
                    // block, unless the message directly follows it (parent hash matches).
                    if current.number >= incoming.number && !self.is_next_expected(&incoming) {
                        if !warned {
                            info!(extractor=%self.extractor_id, from=incoming.number, to=current.number, "Syncing. Skipping messages");
                            warned = true;
                        }
                        continue
                    }
                }
                break msg;
            };

            self.filter_deltas(&mut first_msg);

            let block = first_msg.get_block().clone();
            info!(height = &block.number, "First deltas received");
            let header = BlockHeader::from_block(first_msg.get_block(), first_msg.is_revert());
            let deltas_msg = StateSyncMessage {
                header: BlockHeader::from_block(first_msg.get_block(), first_msg.is_revert()),
                snapshots: Default::default(),
                deltas: Some(first_msg),
                removed_components: Default::default(),
            };

            // If the first message does not directly follow the last synced block (or nothing
            // was synced yet), fetch a snapshot of all tracked components and merge the first
            // deltas into it.
            let msg = if !self.is_next_expected(&header) {
                info!("Retrieving snapshot");
                let snapshot = self
                    .get_snapshots::<Vec<&String>>(
                        BlockHeader::from_block(&block, false),
                        None,
                    )
                    .await?
                    .merge(deltas_msg);
                let n_components = self.component_tracker.components.len();
                let n_snapshots = snapshot.snapshots.states.len();
                info!(n_components, n_snapshots, "Initial snapshot retrieved, starting delta message feed");
                snapshot
            } else {
                deltas_msg
            };
            block_tx.send(Ok(msg)).await?;
            self.last_synced_block = Some(header.clone());
            // Steady state: one outgoing message per incoming delta until closed or failed.
            loop {
                select! {
                    deltas_opt = msg_rx.recv() => {
                        if let Some(mut deltas) = deltas_opt {
                            let header = BlockHeader::from_block(deltas.get_block(), deltas.is_revert());
                            debug!(block_number=?header.number, "Received delta message");

                            let (snapshots, removed_components) = {
                                let (to_add, to_remove) = self.component_tracker.filter_updated_components(&deltas);

                                // Only components that are not tracked yet need a snapshot.
                                let requiring_snapshot: Vec<_> = to_add
                                    .iter()
                                    .filter(|id| {
                                        !self.component_tracker
                                            .components
                                            .contains_key(id.as_str())
                                    })
                                    .collect();
                                debug!(components=?requiring_snapshot, "SnapshotRequest");
                                self.component_tracker
                                    .start_tracking(requiring_snapshot.as_slice())
                                    .await?;
                                let snapshots = self
                                    .get_snapshots(header.clone(), Some(requiring_snapshot))
                                    .await?
                                    .snapshots;

                                let removed_components = if !to_remove.is_empty() {
                                    self.component_tracker.stop_tracking(&to_remove)
                                } else {
                                    Default::default()
                                };

                                (snapshots, removed_components)
                            };

                            self.component_tracker.process_entrypoints(&deltas.dci_update);

                            // Filter after the tracker was updated so that changes of newly
                            // tracked components are kept and those of removed ones dropped.
                            self.filter_deltas(&mut deltas);
                            let n_changes = deltas.n_changes();

                            let next = StateSyncMessage {
                                header: header.clone(),
                                snapshots,
                                deltas: Some(deltas),
                                removed_components,
                            };
                            block_tx.send(Ok(next)).await?;
                            self.last_synced_block = Some(header.clone());

                            debug!(block_number=?header.number, n_changes, "Finished processing delta message");
                        } else {
                            return Err(SynchronizerError::ConnectionError("Deltas channel closed".to_string()));
                        }
                    },
                    _ = &mut end_rx => {
                        info!("Received close signal during state_sync");
                        return Ok(());
                    }
                }
            }
        }.await;

        warn!(last_synced_block = ?&self.last_synced_block, "Deltas processing ended.");
        // Best-effort cleanup: a failed unsubscribe is logged and otherwise ignored.
        let _ = self
            .deltas_client
            .unsubscribe(subscription_id)
            .await
            .map_err(|err| {
                warn!(err=?err, "Unsubscribing from deltas on cleanup failed!");
            });

        match result {
            Ok(()) => Ok(()),
            // Hand the close receiver back so the caller may retry with it.
            Err(e) => {
                Err((e, Some(end_rx)))
            }
        }
    }
641
642 fn is_next_expected(&self, incoming: &BlockHeader) -> bool {
643 if let Some(block) = self.last_synced_block.as_ref() {
644 return incoming.parent_hash == block.hash;
645 }
646 false
647 }
648 fn filter_deltas(&self, deltas: &mut BlockChanges) {
649 deltas.filter_by_component(|id| {
650 self.component_tracker
651 .components
652 .contains_key(id)
653 });
654 deltas.filter_by_contract(|id| {
655 self.component_tracker
656 .contracts
657 .contains(id)
658 });
659 }
660}
661
662#[async_trait]
663impl<R, D> StateSynchronizer for ProtocolStateSynchronizer<R, D>
664where
665 R: RPCClient + Clone + Send + Sync + 'static,
666 D: DeltasClient + Clone + Send + Sync + 'static,
667{
668 async fn initialize(&mut self) -> SyncResult<()> {
669 info!("Retrieving relevant protocol components");
670 self.component_tracker
671 .initialise_components()
672 .await?;
673 info!(
674 n_components = self.component_tracker.components.len(),
675 n_contracts = self.component_tracker.contracts.len(),
676 "Finished retrieving components",
677 );
678
679 Ok(())
680 }
681
682 async fn start(
683 mut self,
684 ) -> (SynchronizerTaskHandle, Receiver<SyncResult<StateSyncMessage<BlockHeader>>>) {
685 let (mut tx, rx) = channel(15);
686 let (end_tx, end_rx) = oneshot::channel::<()>();
687
688 let jh = tokio::spawn(async move {
689 let mut retry_count = 0;
690 let mut current_end_rx = end_rx;
691 let mut final_error = None;
692
693 while retry_count < self.max_retries {
694 info!(extractor_id=%&self.extractor_id, retry_count, "(Re)starting synchronization loop");
695
696 let res = self
697 .state_sync(&mut tx, current_end_rx)
698 .await;
699 match res {
700 Ok(()) => {
701 info!(
702 extractor_id=%&self.extractor_id,
703 retry_count,
704 "State synchronization exited cleanly"
705 );
706 return;
707 }
708 Err((e, maybe_end_rx)) => {
709 warn!(
710 extractor_id=%&self.extractor_id,
711 retry_count,
712 error=%e,
713 "State synchronization errored!"
714 );
715
716 if let Some(recovered_end_rx) = maybe_end_rx {
718 current_end_rx = recovered_end_rx;
719
720 if let SynchronizerError::ConnectionClosed = e {
721 error!(
723 "Websocket connection closed. State synchronization exiting."
724 );
725 let _ = tx.send(Err(e)).await;
726 return;
727 } else {
728 final_error = Some(e);
730 }
731 } else {
732 info!(extractor_id=%&self.extractor_id, "Received close signal, exiting.");
734 return;
735 }
736 }
737 }
738 sleep(self.retry_cooldown).await;
739 retry_count += 1;
740 }
741 if let Some(e) = final_error {
742 warn!(extractor_id=%&self.extractor_id, retry_count, error=%e, "Max retries exceeded");
743 let _ = tx.send(Err(e)).await;
744 }
745 });
746
747 let handle = SynchronizerTaskHandle::new(jh, end_tx);
748 (handle, rx)
749 }
750}
751
752#[cfg(test)]
753mod test {
754 use std::{collections::HashSet, sync::Arc};
773
774 use test_log::test;
775 use tycho_common::dto::{
776 AddressStorageLocation, Block, Chain, ComponentTvlRequestBody, ComponentTvlRequestResponse,
777 DCIUpdate, EntryPoint, PaginationResponse, ProtocolComponentRequestResponse,
778 ProtocolComponentsRequestBody, ProtocolStateRequestBody, ProtocolStateRequestResponse,
779 ProtocolSystemsRequestBody, ProtocolSystemsRequestResponse, RPCTracerParams,
780 StateRequestBody, StateRequestResponse, TokensRequestBody, TokensRequestResponse,
781 TracedEntryPointRequestBody, TracedEntryPointRequestResponse, TracingParams,
782 };
783 use uuid::Uuid;
784
785 use super::*;
786 use crate::{deltas::MockDeltasClient, rpc::MockRPCClient, DeltasError, RPCError};
787
788 struct ArcRPCClient<T>(Arc<T>);
790
791 impl<T> Clone for ArcRPCClient<T> {
793 fn clone(&self) -> Self {
794 ArcRPCClient(self.0.clone())
795 }
796 }
797
798 #[async_trait]
799 impl<T> RPCClient for ArcRPCClient<T>
800 where
801 T: RPCClient + Sync + Send + 'static,
802 {
803 async fn get_tokens(
804 &self,
805 request: &TokensRequestBody,
806 ) -> Result<TokensRequestResponse, RPCError> {
807 self.0.get_tokens(request).await
808 }
809
810 async fn get_contract_state(
811 &self,
812 request: &StateRequestBody,
813 ) -> Result<StateRequestResponse, RPCError> {
814 self.0.get_contract_state(request).await
815 }
816
817 async fn get_protocol_components(
818 &self,
819 request: &ProtocolComponentsRequestBody,
820 ) -> Result<ProtocolComponentRequestResponse, RPCError> {
821 self.0
822 .get_protocol_components(request)
823 .await
824 }
825
826 async fn get_protocol_states(
827 &self,
828 request: &ProtocolStateRequestBody,
829 ) -> Result<ProtocolStateRequestResponse, RPCError> {
830 self.0
831 .get_protocol_states(request)
832 .await
833 }
834
835 async fn get_protocol_systems(
836 &self,
837 request: &ProtocolSystemsRequestBody,
838 ) -> Result<ProtocolSystemsRequestResponse, RPCError> {
839 self.0
840 .get_protocol_systems(request)
841 .await
842 }
843
844 async fn get_component_tvl(
845 &self,
846 request: &ComponentTvlRequestBody,
847 ) -> Result<ComponentTvlRequestResponse, RPCError> {
848 self.0.get_component_tvl(request).await
849 }
850
851 async fn get_traced_entry_points(
852 &self,
853 request: &TracedEntryPointRequestBody,
854 ) -> Result<TracedEntryPointRequestResponse, RPCError> {
855 self.0
856 .get_traced_entry_points(request)
857 .await
858 }
859 }
860
861 struct ArcDeltasClient<T>(Arc<T>);
863
864 impl<T> Clone for ArcDeltasClient<T> {
866 fn clone(&self) -> Self {
867 ArcDeltasClient(self.0.clone())
868 }
869 }
870
871 #[async_trait]
872 impl<T> DeltasClient for ArcDeltasClient<T>
873 where
874 T: DeltasClient + Sync + Send + 'static,
875 {
876 async fn subscribe(
877 &self,
878 extractor_id: ExtractorIdentity,
879 options: SubscriptionOptions,
880 ) -> Result<(Uuid, Receiver<BlockChanges>), DeltasError> {
881 self.0
882 .subscribe(extractor_id, options)
883 .await
884 }
885
886 async fn unsubscribe(&self, subscription_id: Uuid) -> Result<(), DeltasError> {
887 self.0
888 .unsubscribe(subscription_id)
889 .await
890 }
891
892 async fn connect(&self) -> Result<JoinHandle<Result<(), DeltasError>>, DeltasError> {
893 self.0.connect().await
894 }
895
896 async fn close(&self) -> Result<(), DeltasError> {
897 self.0.close().await
898 }
899 }
900
901 fn with_mocked_clients(
902 native: bool,
903 include_tvl: bool,
904 rpc_client: Option<MockRPCClient>,
905 deltas_client: Option<MockDeltasClient>,
906 ) -> ProtocolStateSynchronizer<ArcRPCClient<MockRPCClient>, ArcDeltasClient<MockDeltasClient>>
907 {
908 let rpc_client = ArcRPCClient(Arc::new(rpc_client.unwrap_or_default()));
909 let deltas_client = ArcDeltasClient(Arc::new(deltas_client.unwrap_or_default()));
910
911 ProtocolStateSynchronizer::new(
912 ExtractorIdentity::new(Chain::Ethereum, "uniswap-v2"),
913 native,
914 ComponentFilter::with_tvl_range(50.0, 50.0),
915 1,
916 Duration::from_secs(0),
917 true,
918 include_tvl,
919 rpc_client,
920 deltas_client,
921 10_u64,
922 )
923 }
924
925 fn state_snapshot_native() -> ProtocolStateRequestResponse {
926 ProtocolStateRequestResponse {
927 states: vec![ResponseProtocolState {
928 component_id: "Component1".to_string(),
929 ..Default::default()
930 }],
931 pagination: PaginationResponse { page: 0, page_size: 20, total: 1 },
932 }
933 }
934
935 fn component_tvl_snapshot() -> ComponentTvlRequestResponse {
936 let tvl = HashMap::from([("Component1".to_string(), 100.0)]);
937
938 ComponentTvlRequestResponse {
939 tvl,
940 pagination: PaginationResponse { page: 0, page_size: 20, total: 1 },
941 }
942 }
943
944 #[test(tokio::test)]
945 async fn test_get_snapshots_native() {
946 let header = BlockHeader::default();
947 let mut rpc = MockRPCClient::new();
948 rpc.expect_get_protocol_states()
949 .returning(|_| Ok(state_snapshot_native()));
950 rpc.expect_get_traced_entry_points()
951 .returning(|_| {
952 Ok(TracedEntryPointRequestResponse {
953 traced_entry_points: HashMap::new(),
954 pagination: PaginationResponse::new(0, 20, 0),
955 })
956 });
957 let mut state_sync = with_mocked_clients(true, false, Some(rpc), None);
958 let component = ProtocolComponent { id: "Component1".to_string(), ..Default::default() };
959 state_sync
960 .component_tracker
961 .components
962 .insert("Component1".to_string(), component.clone());
963 let components_arg = ["Component1".to_string()];
964 let exp = StateSyncMessage {
965 header: header.clone(),
966 snapshots: Snapshot {
967 states: state_snapshot_native()
968 .states
969 .into_iter()
970 .map(|state| {
971 (
972 state.component_id.clone(),
973 ComponentWithState {
974 state,
975 component: component.clone(),
976 entrypoints: vec![],
977 component_tvl: None,
978 },
979 )
980 })
981 .collect(),
982 vm_storage: HashMap::new(),
983 },
984 deltas: None,
985 removed_components: Default::default(),
986 };
987
988 let snap = state_sync
989 .get_snapshots(header, Some(&components_arg))
990 .await
991 .expect("Retrieving snapshot failed");
992
993 assert_eq!(snap, exp);
994 }
995
996 #[test(tokio::test)]
997 async fn test_get_snapshots_native_with_tvl() {
998 let header = BlockHeader::default();
999 let mut rpc = MockRPCClient::new();
1000 rpc.expect_get_protocol_states()
1001 .returning(|_| Ok(state_snapshot_native()));
1002 rpc.expect_get_component_tvl()
1003 .returning(|_| Ok(component_tvl_snapshot()));
1004 rpc.expect_get_traced_entry_points()
1005 .returning(|_| {
1006 Ok(TracedEntryPointRequestResponse {
1007 traced_entry_points: HashMap::new(),
1008 pagination: PaginationResponse::new(0, 20, 0),
1009 })
1010 });
1011 let mut state_sync = with_mocked_clients(true, true, Some(rpc), None);
1012 let component = ProtocolComponent { id: "Component1".to_string(), ..Default::default() };
1013 state_sync
1014 .component_tracker
1015 .components
1016 .insert("Component1".to_string(), component.clone());
1017 let components_arg = ["Component1".to_string()];
1018 let exp = StateSyncMessage {
1019 header: header.clone(),
1020 snapshots: Snapshot {
1021 states: state_snapshot_native()
1022 .states
1023 .into_iter()
1024 .map(|state| {
1025 (
1026 state.component_id.clone(),
1027 ComponentWithState {
1028 state,
1029 component: component.clone(),
1030 component_tvl: Some(100.0),
1031 entrypoints: vec![],
1032 },
1033 )
1034 })
1035 .collect(),
1036 vm_storage: HashMap::new(),
1037 },
1038 deltas: None,
1039 removed_components: Default::default(),
1040 };
1041
1042 let snap = state_sync
1043 .get_snapshots(header, Some(&components_arg))
1044 .await
1045 .expect("Retrieving snapshot failed");
1046
1047 assert_eq!(snap, exp);
1048 }
1049
1050 fn state_snapshot_vm() -> StateRequestResponse {
1051 StateRequestResponse {
1052 accounts: vec![
1053 ResponseAccount { address: Bytes::from("0x0badc0ffee"), ..Default::default() },
1054 ResponseAccount { address: Bytes::from("0xbabe42"), ..Default::default() },
1055 ],
1056 pagination: PaginationResponse { page: 0, page_size: 20, total: 1 },
1057 }
1058 }
1059
1060 fn traced_entry_point_response() -> TracedEntryPointRequestResponse {
1061 TracedEntryPointRequestResponse {
1062 traced_entry_points: HashMap::from([(
1063 "Component1".to_string(),
1064 vec![(
1065 EntryPointWithTracingParams {
1066 entry_point: EntryPoint {
1067 external_id: "entrypoint_a".to_string(),
1068 target: Bytes::from("0x0badc0ffee"),
1069 signature: "sig()".to_string(),
1070 },
1071 params: TracingParams::RPCTracer(RPCTracerParams {
1072 caller: Some(Bytes::from("0x0badc0ffee")),
1073 calldata: Bytes::from("0x0badc0ffee"),
1074 state_overrides: None,
1075 prune_addresses: None,
1076 }),
1077 },
1078 TracingResult {
1079 retriggers: HashSet::from([(
1080 Bytes::from("0x0badc0ffee"),
1081 AddressStorageLocation::new(Bytes::from("0x0badc0ffee"), 12),
1082 )]),
1083 accessed_slots: HashMap::from([(
1084 Bytes::from("0x0badc0ffee"),
1085 HashSet::from([Bytes::from("0xbadbeef0")]),
1086 )]),
1087 },
1088 )],
1089 )]),
1090 pagination: PaginationResponse::new(0, 20, 0),
1091 }
1092 }
1093
1094 #[test(tokio::test)]
1095 async fn test_get_snapshots_vm() {
1096 let header = BlockHeader::default();
1097 let mut rpc = MockRPCClient::new();
1098 rpc.expect_get_protocol_states()
1099 .returning(|_| Ok(state_snapshot_native()));
1100 rpc.expect_get_contract_state()
1101 .returning(|_| Ok(state_snapshot_vm()));
1102 rpc.expect_get_traced_entry_points()
1103 .returning(|_| Ok(traced_entry_point_response()));
1104 let mut state_sync = with_mocked_clients(false, false, Some(rpc), None);
1105 let component = ProtocolComponent {
1106 id: "Component1".to_string(),
1107 contract_ids: vec![Bytes::from("0x0badc0ffee"), Bytes::from("0xbabe42")],
1108 ..Default::default()
1109 };
1110 state_sync
1111 .component_tracker
1112 .components
1113 .insert("Component1".to_string(), component.clone());
1114 let components_arg = ["Component1".to_string()];
1115 let exp = StateSyncMessage {
1116 header: header.clone(),
1117 snapshots: Snapshot {
1118 states: [(
1119 component.id.clone(),
1120 ComponentWithState {
1121 state: ResponseProtocolState {
1122 component_id: "Component1".to_string(),
1123 ..Default::default()
1124 },
1125 component: component.clone(),
1126 component_tvl: None,
1127 entrypoints: vec![(
1128 EntryPointWithTracingParams {
1129 entry_point: EntryPoint {
1130 external_id: "entrypoint_a".to_string(),
1131 target: Bytes::from("0x0badc0ffee"),
1132 signature: "sig()".to_string(),
1133 },
1134 params: TracingParams::RPCTracer(RPCTracerParams {
1135 caller: Some(Bytes::from("0x0badc0ffee")),
1136 calldata: Bytes::from("0x0badc0ffee"),
1137 state_overrides: None,
1138 prune_addresses: None,
1139 }),
1140 },
1141 TracingResult {
1142 retriggers: HashSet::from([(
1143 Bytes::from("0x0badc0ffee"),
1144 AddressStorageLocation::new(Bytes::from("0x0badc0ffee"), 12),
1145 )]),
1146 accessed_slots: HashMap::from([(
1147 Bytes::from("0x0badc0ffee"),
1148 HashSet::from([Bytes::from("0xbadbeef0")]),
1149 )]),
1150 },
1151 )],
1152 },
1153 )]
1154 .into_iter()
1155 .collect(),
1156 vm_storage: state_snapshot_vm()
1157 .accounts
1158 .into_iter()
1159 .map(|state| (state.address.clone(), state))
1160 .collect(),
1161 },
1162 deltas: None,
1163 removed_components: Default::default(),
1164 };
1165
1166 let snap = state_sync
1167 .get_snapshots(header, Some(&components_arg))
1168 .await
1169 .expect("Retrieving snapshot failed");
1170
1171 assert_eq!(snap, exp);
1172 }
1173
1174 #[test(tokio::test)]
1175 async fn test_get_snapshots_vm_with_tvl() {
1176 let header = BlockHeader::default();
1177 let mut rpc = MockRPCClient::new();
1178 rpc.expect_get_protocol_states()
1179 .returning(|_| Ok(state_snapshot_native()));
1180 rpc.expect_get_contract_state()
1181 .returning(|_| Ok(state_snapshot_vm()));
1182 rpc.expect_get_component_tvl()
1183 .returning(|_| Ok(component_tvl_snapshot()));
1184 rpc.expect_get_traced_entry_points()
1185 .returning(|_| {
1186 Ok(TracedEntryPointRequestResponse {
1187 traced_entry_points: HashMap::new(),
1188 pagination: PaginationResponse::new(0, 20, 0),
1189 })
1190 });
1191 let mut state_sync = with_mocked_clients(false, true, Some(rpc), None);
1192 let component = ProtocolComponent {
1193 id: "Component1".to_string(),
1194 contract_ids: vec![Bytes::from("0x0badc0ffee"), Bytes::from("0xbabe42")],
1195 ..Default::default()
1196 };
1197 state_sync
1198 .component_tracker
1199 .components
1200 .insert("Component1".to_string(), component.clone());
1201 let components_arg = ["Component1".to_string()];
1202 let exp = StateSyncMessage {
1203 header: header.clone(),
1204 snapshots: Snapshot {
1205 states: [(
1206 component.id.clone(),
1207 ComponentWithState {
1208 state: ResponseProtocolState {
1209 component_id: "Component1".to_string(),
1210 ..Default::default()
1211 },
1212 component: component.clone(),
1213 component_tvl: Some(100.0),
1214 entrypoints: vec![],
1215 },
1216 )]
1217 .into_iter()
1218 .collect(),
1219 vm_storage: state_snapshot_vm()
1220 .accounts
1221 .into_iter()
1222 .map(|state| (state.address.clone(), state))
1223 .collect(),
1224 },
1225 deltas: None,
1226 removed_components: Default::default(),
1227 };
1228
1229 let snap = state_sync
1230 .get_snapshots(header, Some(&components_arg))
1231 .await
1232 .expect("Retrieving snapshot failed");
1233
1234 assert_eq!(snap, exp);
1235 }
1236
1237 fn mock_clients_for_state_sync() -> (MockRPCClient, MockDeltasClient, Sender<BlockChanges>) {
1238 let mut rpc_client = MockRPCClient::new();
1239 rpc_client
1242 .expect_get_protocol_components()
1243 .with(mockall::predicate::function(
1244 move |request_params: &ProtocolComponentsRequestBody| {
1245 if let Some(ids) = request_params.component_ids.as_ref() {
1246 ids.contains(&"Component3".to_string())
1247 } else {
1248 false
1249 }
1250 },
1251 ))
1252 .returning(|_| {
1253 Ok(ProtocolComponentRequestResponse {
1255 protocol_components: vec![
1256 ProtocolComponent { id: "Component3".to_string(), ..Default::default() },
1258 ],
1259 pagination: PaginationResponse { page: 0, page_size: 20, total: 1 },
1260 })
1261 });
1262 rpc_client
1263 .expect_get_protocol_states()
1264 .with(mockall::predicate::function(move |request_params: &ProtocolStateRequestBody| {
1265 let expected_id = "Component3".to_string();
1266 if let Some(ids) = request_params.protocol_ids.as_ref() {
1267 ids.contains(&expected_id)
1268 } else {
1269 false
1270 }
1271 }))
1272 .returning(|_| {
1273 Ok(ProtocolStateRequestResponse {
1275 states: vec![ResponseProtocolState {
1276 component_id: "Component3".to_string(),
1277 ..Default::default()
1278 }],
1279 pagination: PaginationResponse { page: 0, page_size: 20, total: 1 },
1280 })
1281 });
1282
1283 rpc_client
1285 .expect_get_protocol_components()
1286 .returning(|_| {
1287 Ok(ProtocolComponentRequestResponse {
1289 protocol_components: vec![
1290 ProtocolComponent { id: "Component1".to_string(), ..Default::default() },
1292 ProtocolComponent { id: "Component2".to_string(), ..Default::default() },
1294 ],
1296 pagination: PaginationResponse { page: 0, page_size: 20, total: 1 },
1297 })
1298 });
1299 rpc_client
1300 .expect_get_protocol_states()
1301 .returning(|_| {
1302 Ok(ProtocolStateRequestResponse {
1304 states: vec![
1305 ResponseProtocolState {
1306 component_id: "Component1".to_string(),
1307 ..Default::default()
1308 },
1309 ResponseProtocolState {
1310 component_id: "Component2".to_string(),
1311 ..Default::default()
1312 },
1313 ],
1314 pagination: PaginationResponse { page: 0, page_size: 20, total: 1 },
1315 })
1316 });
1317 rpc_client
1318 .expect_get_component_tvl()
1319 .returning(|_| {
1320 Ok(ComponentTvlRequestResponse {
1321 tvl: [
1322 ("Component1".to_string(), 100.0),
1323 ("Component2".to_string(), 0.0),
1324 ("Component3".to_string(), 1000.0),
1325 ]
1326 .into_iter()
1327 .collect(),
1328 pagination: PaginationResponse { page: 0, page_size: 20, total: 3 },
1329 })
1330 });
1331 rpc_client
1332 .expect_get_traced_entry_points()
1333 .returning(|_| {
1334 Ok(TracedEntryPointRequestResponse {
1335 traced_entry_points: HashMap::new(),
1336 pagination: PaginationResponse::new(0, 20, 0),
1337 })
1338 });
1339
1340 let mut deltas_client = MockDeltasClient::new();
1342 let (tx, rx) = channel(1);
1343 deltas_client
1344 .expect_subscribe()
1345 .return_once(move |_, _| {
1346 Ok((Uuid::default(), rx))
1348 });
1349
1350 deltas_client
1352 .expect_unsubscribe()
1353 .return_once(|_| Ok(()));
1354
1355 (rpc_client, deltas_client, tx)
1356 }
1357
1358 #[test(tokio::test)]
1365 async fn test_state_sync() {
1366 let (rpc_client, deltas_client, tx) = mock_clients_for_state_sync();
1367 let deltas = [
1368 BlockChanges {
1369 extractor: "uniswap-v2".to_string(),
1370 chain: Chain::Ethereum,
1371 block: Block {
1372 number: 1,
1373 hash: Bytes::from("0x01"),
1374 parent_hash: Bytes::from("0x00"),
1375 chain: Chain::Ethereum,
1376 ts: Default::default(),
1377 },
1378 revert: false,
1379 dci_update: DCIUpdate {
1380 new_entrypoints: HashMap::from([(
1381 "Component1".to_string(),
1382 HashSet::from([EntryPoint {
1383 external_id: "entrypoint_a".to_string(),
1384 target: Bytes::from("0x0badc0ffee"),
1385 signature: "sig()".to_string(),
1386 }]),
1387 )]),
1388 new_entrypoint_params: HashMap::from([(
1389 "entrypoint_a".to_string(),
1390 HashSet::from([(
1391 TracingParams::RPCTracer(RPCTracerParams {
1392 caller: Some(Bytes::from("0x0badc0ffee")),
1393 calldata: Bytes::from("0x0badc0ffee"),
1394 state_overrides: None,
1395 prune_addresses: None,
1396 }),
1397 Some("Component1".to_string()),
1398 )]),
1399 )]),
1400 trace_results: HashMap::from([(
1401 "entrypoint_a".to_string(),
1402 TracingResult {
1403 retriggers: HashSet::from([(
1404 Bytes::from("0x0badc0ffee"),
1405 AddressStorageLocation::new(Bytes::from("0x0badc0ffee"), 12),
1406 )]),
1407 accessed_slots: HashMap::from([(
1408 Bytes::from("0x0badc0ffee"),
1409 HashSet::from([Bytes::from("0xbadbeef0")]),
1410 )]),
1411 },
1412 )]),
1413 },
1414 ..Default::default()
1415 },
1416 BlockChanges {
1417 extractor: "uniswap-v2".to_string(),
1418 chain: Chain::Ethereum,
1419 block: Block {
1420 number: 2,
1421 hash: Bytes::from("0x02"),
1422 parent_hash: Bytes::from("0x01"),
1423 chain: Chain::Ethereum,
1424 ts: Default::default(),
1425 },
1426 revert: false,
1427 component_tvl: [
1428 ("Component1".to_string(), 100.0),
1429 ("Component2".to_string(), 0.0),
1430 ("Component3".to_string(), 1000.0),
1431 ]
1432 .into_iter()
1433 .collect(),
1434 ..Default::default()
1435 },
1436 ];
1437 let mut state_sync = with_mocked_clients(true, true, Some(rpc_client), Some(deltas_client));
1438 state_sync
1439 .initialize()
1440 .await
1441 .expect("Init failed");
1442
1443 let (handle, mut rx) = state_sync.start().await;
1445 let (jh, close_tx) = handle.split();
1446 tx.send(deltas[0].clone())
1447 .await
1448 .expect("deltas channel msg 0 closed!");
1449 let first_msg = timeout(Duration::from_millis(100), rx.recv())
1450 .await
1451 .expect("waiting for first state msg timed out!")
1452 .expect("state sync block sender closed!");
1453 tx.send(deltas[1].clone())
1454 .await
1455 .expect("deltas channel msg 1 closed!");
1456 let second_msg = timeout(Duration::from_millis(100), rx.recv())
1457 .await
1458 .expect("waiting for second state msg timed out!")
1459 .expect("state sync block sender closed!");
1460 let _ = close_tx.send(());
1461 jh.await
1462 .expect("state sync task panicked!");
1463
1464 let exp1 = StateSyncMessage {
1466 header: BlockHeader {
1467 number: 1,
1468 hash: Bytes::from("0x01"),
1469 parent_hash: Bytes::from("0x00"),
1470 revert: false,
1471 ..Default::default()
1472 },
1473 snapshots: Snapshot {
1474 states: [
1475 (
1476 "Component1".to_string(),
1477 ComponentWithState {
1478 state: ResponseProtocolState {
1479 component_id: "Component1".to_string(),
1480 ..Default::default()
1481 },
1482 component: ProtocolComponent {
1483 id: "Component1".to_string(),
1484 ..Default::default()
1485 },
1486 component_tvl: Some(100.0),
1487 entrypoints: vec![],
1488 },
1489 ),
1490 (
1491 "Component2".to_string(),
1492 ComponentWithState {
1493 state: ResponseProtocolState {
1494 component_id: "Component2".to_string(),
1495 ..Default::default()
1496 },
1497 component: ProtocolComponent {
1498 id: "Component2".to_string(),
1499 ..Default::default()
1500 },
1501 component_tvl: Some(0.0),
1502 entrypoints: vec![],
1503 },
1504 ),
1505 ]
1506 .into_iter()
1507 .collect(),
1508 vm_storage: HashMap::new(),
1509 },
1510 deltas: Some(deltas[0].clone()),
1511 removed_components: Default::default(),
1512 };
1513
1514 let exp2 = StateSyncMessage {
1515 header: BlockHeader {
1516 number: 2,
1517 hash: Bytes::from("0x02"),
1518 parent_hash: Bytes::from("0x01"),
1519 revert: false,
1520 ..Default::default()
1521 },
1522 snapshots: Snapshot {
1523 states: [
1524 (
1526 "Component3".to_string(),
1527 ComponentWithState {
1528 state: ResponseProtocolState {
1529 component_id: "Component3".to_string(),
1530 ..Default::default()
1531 },
1532 component: ProtocolComponent {
1533 id: "Component3".to_string(),
1534 ..Default::default()
1535 },
1536 component_tvl: Some(1000.0),
1537 entrypoints: vec![],
1538 },
1539 ),
1540 ]
1541 .into_iter()
1542 .collect(),
1543 vm_storage: HashMap::new(),
1544 },
1545 deltas: Some(BlockChanges {
1548 extractor: "uniswap-v2".to_string(),
1549 chain: Chain::Ethereum,
1550 block: Block {
1551 number: 2,
1552 hash: Bytes::from("0x02"),
1553 parent_hash: Bytes::from("0x01"),
1554 chain: Chain::Ethereum,
1555 ts: Default::default(),
1556 },
1557 revert: false,
1558 component_tvl: [
1559 ("Component1".to_string(), 100.0),
1561 ("Component3".to_string(), 1000.0),
1562 ]
1563 .into_iter()
1564 .collect(),
1565 ..Default::default()
1566 }),
1567 removed_components: [(
1569 "Component2".to_string(),
1570 ProtocolComponent { id: "Component2".to_string(), ..Default::default() },
1571 )]
1572 .into_iter()
1573 .collect(),
1574 };
1575 assert_eq!(first_msg.unwrap(), exp1);
1576 assert_eq!(second_msg.unwrap(), exp2);
1577 }
1578
1579 #[test(tokio::test)]
1580 async fn test_state_sync_with_tvl_range() {
1581 let remove_tvl_threshold = 5.0;
1583 let add_tvl_threshold = 7.0;
1584
1585 let mut rpc_client = MockRPCClient::new();
1586 let mut deltas_client = MockDeltasClient::new();
1587
1588 rpc_client
1589 .expect_get_protocol_components()
1590 .with(mockall::predicate::function(
1591 move |request_params: &ProtocolComponentsRequestBody| {
1592 if let Some(ids) = request_params.component_ids.as_ref() {
1593 ids.contains(&"Component3".to_string())
1594 } else {
1595 false
1596 }
1597 },
1598 ))
1599 .returning(|_| {
1600 Ok(ProtocolComponentRequestResponse {
1601 protocol_components: vec![ProtocolComponent {
1602 id: "Component3".to_string(),
1603 ..Default::default()
1604 }],
1605 pagination: PaginationResponse { page: 0, page_size: 20, total: 1 },
1606 })
1607 });
1608 rpc_client
1609 .expect_get_protocol_states()
1610 .with(mockall::predicate::function(move |request_params: &ProtocolStateRequestBody| {
1611 let expected_id = "Component3".to_string();
1612 if let Some(ids) = request_params.protocol_ids.as_ref() {
1613 ids.contains(&expected_id)
1614 } else {
1615 false
1616 }
1617 }))
1618 .returning(|_| {
1619 Ok(ProtocolStateRequestResponse {
1620 states: vec![ResponseProtocolState {
1621 component_id: "Component3".to_string(),
1622 ..Default::default()
1623 }],
1624 pagination: PaginationResponse { page: 0, page_size: 20, total: 1 },
1625 })
1626 });
1627
1628 rpc_client
1630 .expect_get_protocol_components()
1631 .returning(|_| {
1632 Ok(ProtocolComponentRequestResponse {
1633 protocol_components: vec![
1634 ProtocolComponent { id: "Component1".to_string(), ..Default::default() },
1635 ProtocolComponent { id: "Component2".to_string(), ..Default::default() },
1636 ],
1637 pagination: PaginationResponse { page: 0, page_size: 20, total: 1 },
1638 })
1639 });
1640 rpc_client
1641 .expect_get_protocol_states()
1642 .returning(|_| {
1643 Ok(ProtocolStateRequestResponse {
1644 states: vec![
1645 ResponseProtocolState {
1646 component_id: "Component1".to_string(),
1647 ..Default::default()
1648 },
1649 ResponseProtocolState {
1650 component_id: "Component2".to_string(),
1651 ..Default::default()
1652 },
1653 ],
1654 pagination: PaginationResponse { page: 0, page_size: 20, total: 1 },
1655 })
1656 });
1657 rpc_client
1658 .expect_get_traced_entry_points()
1659 .returning(|_| {
1660 Ok(TracedEntryPointRequestResponse {
1661 traced_entry_points: HashMap::new(),
1662 pagination: PaginationResponse::new(0, 20, 0),
1663 })
1664 });
1665
1666 rpc_client
1667 .expect_get_component_tvl()
1668 .returning(|_| {
1669 Ok(ComponentTvlRequestResponse {
1670 tvl: [
1671 ("Component1".to_string(), 6.0),
1672 ("Component2".to_string(), 2.0),
1673 ("Component3".to_string(), 10.0),
1674 ]
1675 .into_iter()
1676 .collect(),
1677 pagination: PaginationResponse { page: 0, page_size: 20, total: 3 },
1678 })
1679 });
1680
1681 rpc_client
1682 .expect_get_component_tvl()
1683 .returning(|_| {
1684 Ok(ComponentTvlRequestResponse {
1685 tvl: [
1686 ("Component1".to_string(), 6.0),
1687 ("Component2".to_string(), 2.0),
1688 ("Component3".to_string(), 10.0),
1689 ]
1690 .into_iter()
1691 .collect(),
1692 pagination: PaginationResponse { page: 0, page_size: 20, total: 3 },
1693 })
1694 });
1695
1696 let (tx, rx) = channel(1);
1697 deltas_client
1698 .expect_subscribe()
1699 .return_once(move |_, _| Ok((Uuid::default(), rx)));
1700
1701 deltas_client
1703 .expect_unsubscribe()
1704 .return_once(|_| Ok(()));
1705
1706 let mut state_sync = ProtocolStateSynchronizer::new(
1707 ExtractorIdentity::new(Chain::Ethereum, "uniswap-v2"),
1708 true,
1709 ComponentFilter::with_tvl_range(remove_tvl_threshold, add_tvl_threshold),
1710 1,
1711 Duration::from_secs(0),
1712 true,
1713 true,
1714 ArcRPCClient(Arc::new(rpc_client)),
1715 ArcDeltasClient(Arc::new(deltas_client)),
1716 10_u64,
1717 );
1718 state_sync
1719 .initialize()
1720 .await
1721 .expect("Init failed");
1722
1723 let deltas = [
1725 BlockChanges {
1726 extractor: "uniswap-v2".to_string(),
1727 chain: Chain::Ethereum,
1728 block: Block {
1729 number: 1,
1730 hash: Bytes::from("0x01"),
1731 parent_hash: Bytes::from("0x00"),
1732 chain: Chain::Ethereum,
1733 ts: Default::default(),
1734 },
1735 revert: false,
1736 ..Default::default()
1737 },
1738 BlockChanges {
1739 extractor: "uniswap-v2".to_string(),
1740 chain: Chain::Ethereum,
1741 block: Block {
1742 number: 2,
1743 hash: Bytes::from("0x02"),
1744 parent_hash: Bytes::from("0x01"),
1745 chain: Chain::Ethereum,
1746 ts: Default::default(),
1747 },
1748 revert: false,
1749 component_tvl: [
1750 ("Component1".to_string(), 6.0), ("Component2".to_string(), 2.0), ("Component3".to_string(), 10.0), ]
1754 .into_iter()
1755 .collect(),
1756 ..Default::default()
1757 },
1758 ];
1759
1760 let (handle, mut rx) = state_sync.start().await;
1761 let (jh, close_tx) = handle.split();
1762
1763 tx.send(deltas[0].clone())
1765 .await
1766 .expect("deltas channel msg 0 closed!");
1767
1768 let _ = timeout(Duration::from_millis(100), rx.recv())
1770 .await
1771 .expect("waiting for first state msg timed out!")
1772 .expect("state sync block sender closed!");
1773
1774 tx.send(deltas[1].clone())
1776 .await
1777 .expect("deltas channel msg 1 closed!");
1778 let second_msg = timeout(Duration::from_millis(100), rx.recv())
1779 .await
1780 .expect("waiting for second state msg timed out!")
1781 .expect("state sync block sender closed!")
1782 .expect("no error");
1783
1784 let _ = close_tx.send(());
1785 jh.await
1786 .expect("state sync task panicked!");
1787
1788 let expected_second_msg = StateSyncMessage {
1789 header: BlockHeader {
1790 number: 2,
1791 hash: Bytes::from("0x02"),
1792 parent_hash: Bytes::from("0x01"),
1793 revert: false,
1794 ..Default::default()
1795 },
1796 snapshots: Snapshot {
1797 states: [(
1798 "Component3".to_string(),
1799 ComponentWithState {
1800 state: ResponseProtocolState {
1801 component_id: "Component3".to_string(),
1802 ..Default::default()
1803 },
1804 component: ProtocolComponent {
1805 id: "Component3".to_string(),
1806 ..Default::default()
1807 },
1808 component_tvl: Some(10.0),
1809 entrypoints: vec![], },
1811 )]
1812 .into_iter()
1813 .collect(),
1814 vm_storage: HashMap::new(),
1815 },
1816 deltas: Some(BlockChanges {
1817 extractor: "uniswap-v2".to_string(),
1818 chain: Chain::Ethereum,
1819 block: Block {
1820 number: 2,
1821 hash: Bytes::from("0x02"),
1822 parent_hash: Bytes::from("0x01"),
1823 chain: Chain::Ethereum,
1824 ts: Default::default(),
1825 },
1826 revert: false,
1827 component_tvl: [
1828 ("Component1".to_string(), 6.0), ("Component3".to_string(), 10.0), ]
1831 .into_iter()
1832 .collect(),
1833 ..Default::default()
1834 }),
1835 removed_components: [(
1836 "Component2".to_string(),
1837 ProtocolComponent { id: "Component2".to_string(), ..Default::default() },
1838 )]
1839 .into_iter()
1840 .collect(),
1841 };
1842
1843 assert_eq!(second_msg, expected_second_msg);
1844 }
1845
1846 #[test(tokio::test)]
1847 async fn test_public_close_api_functionality() {
1848 let mut rpc_client = MockRPCClient::new();
1855 let mut deltas_client = MockDeltasClient::new();
1856
1857 rpc_client
1859 .expect_get_protocol_components()
1860 .returning(|_| {
1861 Ok(ProtocolComponentRequestResponse {
1862 protocol_components: vec![],
1863 pagination: PaginationResponse { page: 0, page_size: 20, total: 0 },
1864 })
1865 });
1866
1867 let (_tx, rx) = channel(1);
1869 deltas_client
1870 .expect_subscribe()
1871 .return_once(move |_, _| Ok((Uuid::default(), rx)));
1872
1873 deltas_client
1875 .expect_unsubscribe()
1876 .return_once(|_| Ok(()));
1877
1878 let mut state_sync = ProtocolStateSynchronizer::new(
1879 ExtractorIdentity::new(Chain::Ethereum, "test-protocol"),
1880 true,
1881 ComponentFilter::with_tvl_range(0.0, 0.0),
1882 5, Duration::from_secs(0),
1884 true,
1885 false,
1886 ArcRPCClient(Arc::new(rpc_client)),
1887 ArcDeltasClient(Arc::new(deltas_client)),
1888 10000_u64, );
1890
1891 state_sync
1892 .initialize()
1893 .await
1894 .expect("Init should succeed");
1895
1896 let (handle, _rx) = state_sync.start().await;
1898 let (jh, close_tx) = handle.split();
1899
1900 tokio::time::sleep(Duration::from_millis(100)).await;
1902
1903 close_tx
1905 .send(())
1906 .expect("Should be able to send close signal");
1907 jh.await.expect("Task should not panic");
1909 }
1910
1911 #[test(tokio::test)]
1912 async fn test_cleanup_runs_when_state_sync_processing_errors() {
1913 let mut rpc_client = MockRPCClient::new();
1918 let mut deltas_client = MockDeltasClient::new();
1919
1920 rpc_client
1922 .expect_get_protocol_components()
1923 .returning(|_| {
1924 Ok(ProtocolComponentRequestResponse {
1925 protocol_components: vec![],
1926 pagination: PaginationResponse { page: 0, page_size: 20, total: 0 },
1927 })
1928 });
1929
1930 rpc_client
1932 .expect_get_protocol_states()
1933 .returning(|_| {
1934 Err(RPCError::ParseResponse("Test error during snapshot retrieval".to_string()))
1935 });
1936
1937 let (tx, rx) = channel(10);
1939 deltas_client
1940 .expect_subscribe()
1941 .return_once(move |_, _| {
1942 let delta = BlockChanges {
1944 extractor: "test".to_string(),
1945 chain: Chain::Ethereum,
1946 block: Block {
1947 hash: Bytes::from("0x0123"),
1948 number: 1,
1949 parent_hash: Bytes::from("0x0000"),
1950 chain: Chain::Ethereum,
1951 ts: chrono::DateTime::from_timestamp(1234567890, 0)
1952 .unwrap()
1953 .naive_utc(),
1954 },
1955 revert: false,
1956 new_protocol_components: [(
1958 "new_component".to_string(),
1959 ProtocolComponent {
1960 id: "new_component".to_string(),
1961 protocol_system: "test_protocol".to_string(),
1962 protocol_type_name: "test".to_string(),
1963 chain: Chain::Ethereum,
1964 tokens: vec![Bytes::from("0x0badc0ffee")],
1965 contract_ids: vec![Bytes::from("0x0badc0ffee")],
1966 static_attributes: Default::default(),
1967 creation_tx: Default::default(),
1968 created_at: Default::default(),
1969 change: Default::default(),
1970 },
1971 )]
1972 .into_iter()
1973 .collect(),
1974 component_tvl: [("new_component".to_string(), 100.0)]
1975 .into_iter()
1976 .collect(),
1977 ..Default::default()
1978 };
1979
1980 tokio::spawn(async move {
1981 let _ = tx.send(delta).await;
1982 });
1984
1985 Ok((Uuid::default(), rx))
1986 });
1987
1988 deltas_client
1990 .expect_unsubscribe()
1991 .return_once(|_| Ok(()));
1992
1993 let mut state_sync = ProtocolStateSynchronizer::new(
1994 ExtractorIdentity::new(Chain::Ethereum, "test-protocol"),
1995 true,
1996 ComponentFilter::with_tvl_range(0.0, 1000.0), 1,
1998 Duration::from_secs(0),
1999 true,
2000 false,
2001 ArcRPCClient(Arc::new(rpc_client)),
2002 ArcDeltasClient(Arc::new(deltas_client)),
2003 5000_u64,
2004 );
2005
2006 state_sync
2007 .initialize()
2008 .await
2009 .expect("Init should succeed");
2010
2011 state_sync.last_synced_block = Some(BlockHeader {
2013 hash: Bytes::from("0x0badc0ffee"),
2014 number: 42,
2015 parent_hash: Bytes::from("0xbadbeef0"),
2016 revert: false,
2017 timestamp: 123456789,
2018 });
2019
2020 let (mut block_tx, _block_rx) = channel(10);
2022
2023 let (_end_tx, end_rx) = oneshot::channel::<()>();
2025 let result = state_sync
2026 .state_sync(&mut block_tx, end_rx)
2027 .await;
2028 assert!(result.is_err(), "state_sync should have errored during processing");
2030
2031 }
2034
2035 #[test(tokio::test)]
2036 async fn test_close_signal_while_waiting_for_first_deltas() {
2037 let mut rpc_client = MockRPCClient::new();
2041 let mut deltas_client = MockDeltasClient::new();
2042
2043 rpc_client
2044 .expect_get_protocol_components()
2045 .returning(|_| {
2046 Ok(ProtocolComponentRequestResponse {
2047 protocol_components: vec![],
2048 pagination: PaginationResponse { page: 0, page_size: 20, total: 0 },
2049 })
2050 });
2051
2052 let (_tx, rx) = channel(1);
2053 deltas_client
2054 .expect_subscribe()
2055 .return_once(move |_, _| Ok((Uuid::default(), rx)));
2056
2057 deltas_client
2058 .expect_unsubscribe()
2059 .return_once(|_| Ok(()));
2060
2061 let mut state_sync = ProtocolStateSynchronizer::new(
2062 ExtractorIdentity::new(Chain::Ethereum, "test-protocol"),
2063 true,
2064 ComponentFilter::with_tvl_range(0.0, 0.0),
2065 1,
2066 Duration::from_secs(0),
2067 true,
2068 false,
2069 ArcRPCClient(Arc::new(rpc_client)),
2070 ArcDeltasClient(Arc::new(deltas_client)),
2071 10000_u64,
2072 );
2073
2074 state_sync
2075 .initialize()
2076 .await
2077 .expect("Init should succeed");
2078
2079 let (mut block_tx, _block_rx) = channel(10);
2080 let (end_tx, end_rx) = oneshot::channel::<()>();
2081
2082 let state_sync_handle = tokio::spawn(async move {
2084 state_sync
2085 .state_sync(&mut block_tx, end_rx)
2086 .await
2087 });
2088
2089 tokio::time::sleep(Duration::from_millis(100)).await;
2091
2092 let _ = end_tx.send(());
2094
2095 let result = state_sync_handle
2097 .await
2098 .expect("Task should not panic");
2099 assert!(result.is_ok(), "state_sync should exit cleanly when closed: {result:?}");
2100
2101 println!("SUCCESS: Close signal handled correctly while waiting for first deltas");
2102 }
2103
2104 #[test(tokio::test)]
2105 async fn test_close_signal_during_main_processing_loop() {
2106 let mut rpc_client = MockRPCClient::new();
2112 let mut deltas_client = MockDeltasClient::new();
2113
2114 rpc_client
2116 .expect_get_protocol_components()
2117 .returning(|_| {
2118 Ok(ProtocolComponentRequestResponse {
2119 protocol_components: vec![],
2120 pagination: PaginationResponse { page: 0, page_size: 20, total: 0 },
2121 })
2122 });
2123
2124 rpc_client
2126 .expect_get_protocol_states()
2127 .returning(|_| {
2128 Ok(ProtocolStateRequestResponse {
2129 states: vec![],
2130 pagination: PaginationResponse { page: 0, page_size: 20, total: 0 },
2131 })
2132 });
2133
2134 rpc_client
2135 .expect_get_component_tvl()
2136 .returning(|_| {
2137 Ok(ComponentTvlRequestResponse {
2138 tvl: HashMap::new(),
2139 pagination: PaginationResponse { page: 0, page_size: 20, total: 0 },
2140 })
2141 });
2142
2143 rpc_client
2144 .expect_get_traced_entry_points()
2145 .returning(|_| {
2146 Ok(TracedEntryPointRequestResponse {
2147 traced_entry_points: HashMap::new(),
2148 pagination: PaginationResponse::new(0, 20, 0),
2149 })
2150 });
2151
2152 let (tx, rx) = channel(10);
2154 deltas_client
2155 .expect_subscribe()
2156 .return_once(move |_, _| {
2157 let first_delta = BlockChanges {
2159 extractor: "test".to_string(),
2160 chain: Chain::Ethereum,
2161 block: Block {
2162 hash: Bytes::from("0x0123"),
2163 number: 1,
2164 parent_hash: Bytes::from("0x0000"),
2165 chain: Chain::Ethereum,
2166 ts: chrono::DateTime::from_timestamp(1234567890, 0)
2167 .unwrap()
2168 .naive_utc(),
2169 },
2170 revert: false,
2171 ..Default::default()
2172 };
2173
2174 tokio::spawn(async move {
2175 let _ = tx.send(first_delta).await;
2176 tokio::time::sleep(Duration::from_secs(30)).await;
2179 });
2180
2181 Ok((Uuid::default(), rx))
2182 });
2183
2184 deltas_client
2185 .expect_unsubscribe()
2186 .return_once(|_| Ok(()));
2187
2188 let mut state_sync = ProtocolStateSynchronizer::new(
2189 ExtractorIdentity::new(Chain::Ethereum, "test-protocol"),
2190 true,
2191 ComponentFilter::with_tvl_range(0.0, 1000.0),
2192 1,
2193 Duration::from_secs(0),
2194 true,
2195 false,
2196 ArcRPCClient(Arc::new(rpc_client)),
2197 ArcDeltasClient(Arc::new(deltas_client)),
2198 10000_u64,
2199 );
2200
2201 state_sync
2202 .initialize()
2203 .await
2204 .expect("Init should succeed");
2205
2206 let (mut block_tx, mut block_rx) = channel(10);
2207 let (end_tx, end_rx) = oneshot::channel::<()>();
2208
2209 let state_sync_handle = tokio::spawn(async move {
2211 state_sync
2212 .state_sync(&mut block_tx, end_rx)
2213 .await
2214 });
2215
2216 let first_snapshot = block_rx
2218 .recv()
2219 .await
2220 .expect("Should receive first snapshot")
2221 .expect("Synchronizer error");
2222 assert!(
2223 !first_snapshot
2224 .snapshots
2225 .states
2226 .is_empty() ||
2227 first_snapshot.deltas.is_some()
2228 );
2229 let _ = end_tx.send(());
2231
2232 let result = state_sync_handle
2234 .await
2235 .expect("Task should not panic");
2236 assert!(
2237 result.is_ok(),
2238 "state_sync should exit cleanly when closed after first message: {result:?}"
2239 );
2240 }
2241
2242 #[test(tokio::test)]
2243 async fn test_max_retries_exceeded_error_propagation() {
2244 let mut rpc_client = MockRPCClient::new();
2248 let mut deltas_client = MockDeltasClient::new();
2249
2250 rpc_client
2252 .expect_get_protocol_components()
2253 .returning(|_| {
2254 Ok(ProtocolComponentRequestResponse {
2255 protocol_components: vec![],
2256 pagination: PaginationResponse { page: 0, page_size: 20, total: 0 },
2257 })
2258 });
2259
2260 deltas_client
2263 .expect_subscribe()
2264 .returning(|_, _| {
2265 Err(DeltasError::NotConnected)
2267 });
2268
2269 deltas_client
2271 .expect_unsubscribe()
2272 .returning(|_| Ok(()))
2273 .times(0..=5);
2274
2275 let mut state_sync = ProtocolStateSynchronizer::new(
2277 ExtractorIdentity::new(Chain::Ethereum, "test-protocol"),
2278 true,
2279 ComponentFilter::with_tvl_range(0.0, 1000.0),
2280 2, Duration::from_millis(10), true,
2283 false,
2284 ArcRPCClient(Arc::new(rpc_client)),
2285 ArcDeltasClient(Arc::new(deltas_client)),
2286 1000_u64,
2287 );
2288
2289 state_sync
2290 .initialize()
2291 .await
2292 .expect("Init should succeed");
2293
2294 let (handle, mut rx) = state_sync.start().await;
2296 let (jh, _close_tx) = handle.split();
2297
2298 let res = tokio::time::timeout(Duration::from_millis(100), rx.recv())
2299 .await
2300 .expect("responsds in time")
2301 .expect("channel open");
2302
2303 if let Err(err) = res {
2305 assert!(
2306 matches!(err, SynchronizerError::ConnectionClosed),
2307 "Expected ConnectionClosed error, got: {:?}",
2308 err
2309 );
2310 } else {
2311 panic!("Expected an error")
2312 }
2313
2314 let task_result = tokio::time::timeout(Duration::from_secs(2), jh).await;
2316 assert!(task_result.is_ok(), "Synchronizer task should complete after max retries");
2317 }
2318
2319 #[test(tokio::test)]
2320 async fn test_is_next_expected() {
2321 let mut state_sync = with_mocked_clients(true, false, None, None);
2325
2326 let incoming_header = BlockHeader {
2328 number: 100,
2329 hash: Bytes::from("0x0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"),
2330 parent_hash: Bytes::from(
2331 "0x0000000000000000000000000000000000000000000000000000000000000000",
2332 ),
2333 revert: false,
2334 timestamp: 123456789,
2335 };
2336 assert!(
2337 !state_sync.is_next_expected(&incoming_header),
2338 "Should return false when no previous block is set"
2339 );
2340
2341 let previous_header = BlockHeader {
2343 number: 99,
2344 hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000000"),
2345 parent_hash: Bytes::from(
2346 "0xabcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789",
2347 ),
2348 revert: false,
2349 timestamp: 123456788,
2350 };
2351 state_sync.last_synced_block = Some(previous_header.clone());
2352
2353 assert!(
2354 state_sync.is_next_expected(&incoming_header),
2355 "Should return true when incoming parent_hash matches previous hash"
2356 );
2357
2358 let non_matching_header = BlockHeader {
2360 number: 100,
2361 hash: Bytes::from("0x0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"),
2362 parent_hash: Bytes::from(
2363 "0x1111111111111111111111111111111111111111111111111111111111111111",
2364 ), revert: false,
2366 timestamp: 123456789,
2367 };
2368 assert!(
2369 !state_sync.is_next_expected(&non_matching_header),
2370 "Should return false when incoming parent_hash doesn't match previous hash"
2371 );
2372 }
2373
2374 #[test(tokio::test)]
2375 async fn test_synchronizer_restart_skip_snapshot_on_expected_block() {
2376 let mut rpc_client = MockRPCClient::new();
2380 let mut deltas_client = MockDeltasClient::new();
2381
2382 rpc_client
2384 .expect_get_protocol_components()
2385 .returning(|_| {
2386 Ok(ProtocolComponentRequestResponse {
2387 protocol_components: vec![ProtocolComponent {
2388 id: "Component1".to_string(),
2389 ..Default::default()
2390 }],
2391 pagination: PaginationResponse { page: 0, page_size: 20, total: 1 },
2392 })
2393 });
2394
2395 let (tx, rx) = channel(10);
2397 deltas_client
2398 .expect_subscribe()
2399 .return_once(move |_, _| {
2400 let expected_next_delta = BlockChanges {
2401 extractor: "uniswap-v2".to_string(),
2402 chain: Chain::Ethereum,
2403 block: Block {
2404 hash: Bytes::from(
2405 "0x0000000000000000000000000000000000000000000000000000000000000002",
2406 ), number: 2,
2408 parent_hash: Bytes::from(
2409 "0x0000000000000000000000000000000000000000000000000000000000000001",
2410 ), chain: Chain::Ethereum,
2412 ts: chrono::DateTime::from_timestamp(1234567890, 0)
2413 .unwrap()
2414 .naive_utc(),
2415 },
2416 revert: false,
2417 ..Default::default()
2418 };
2419
2420 tokio::spawn(async move {
2421 let _ = tx.send(expected_next_delta).await;
2422 });
2423
2424 Ok((Uuid::default(), rx))
2425 });
2426
2427 deltas_client
2428 .expect_unsubscribe()
2429 .return_once(|_| Ok(()));
2430
2431 let mut state_sync = ProtocolStateSynchronizer::new(
2432 ExtractorIdentity::new(Chain::Ethereum, "uniswap-v2"),
2433 true,
2434 ComponentFilter::with_tvl_range(0.0, 1000.0),
2435 1,
2436 Duration::from_secs(0),
2437 true, false,
2439 ArcRPCClient(Arc::new(rpc_client)),
2440 ArcDeltasClient(Arc::new(deltas_client)),
2441 10000_u64,
2442 );
2443
2444 state_sync
2446 .initialize()
2447 .await
2448 .expect("Init should succeed");
2449
2450 state_sync.last_synced_block = Some(BlockHeader {
2452 number: 1,
2453 hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000001"), parent_hash: Bytes::from(
2455 "0x0000000000000000000000000000000000000000000000000000000000000000",
2456 ),
2457 revert: false,
2458 timestamp: 123456789,
2459 });
2460
2461 let (mut block_tx, mut block_rx) = channel(10);
2462 let (end_tx, end_rx) = oneshot::channel::<()>();
2463
2464 let state_sync_handle = tokio::spawn(async move {
2466 state_sync
2467 .state_sync(&mut block_tx, end_rx)
2468 .await
2469 });
2470
2471 let result_msg = timeout(Duration::from_millis(200), block_rx.recv())
2473 .await
2474 .expect("Should receive message within timeout")
2475 .expect("Channel should be open")
2476 .expect("Should not be an error");
2477
2478 let _ = end_tx.send(());
2480
2481 let _ = state_sync_handle
2483 .await
2484 .expect("Task should not panic");
2485
2486 assert!(result_msg.deltas.is_some(), "Should contain deltas");
2489 assert!(
2490 result_msg.snapshots.states.is_empty(),
2491 "Should not contain snapshots when next expected block is received"
2492 );
2493
2494 if let Some(deltas) = &result_msg.deltas {
2496 assert_eq!(deltas.block.number, 2);
2497 assert_eq!(
2498 deltas.block.hash,
2499 Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000002")
2500 );
2501 assert_eq!(
2502 deltas.block.parent_hash,
2503 Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000001")
2504 );
2505 }
2506 }
2507
2508 #[test(tokio::test)]
2509 async fn test_skip_previously_processed_messages() {
2510 let mut rpc_client = MockRPCClient::new();
2514 let mut deltas_client = MockDeltasClient::new();
2515
2516 rpc_client
2518 .expect_get_protocol_components()
2519 .returning(|_| {
2520 Ok(ProtocolComponentRequestResponse {
2521 protocol_components: vec![ProtocolComponent {
2522 id: "Component1".to_string(),
2523 ..Default::default()
2524 }],
2525 pagination: PaginationResponse { page: 0, page_size: 20, total: 1 },
2526 })
2527 });
2528
2529 rpc_client
2531 .expect_get_protocol_states()
2532 .returning(|_| {
2533 Ok(ProtocolStateRequestResponse {
2534 states: vec![ResponseProtocolState {
2535 component_id: "Component1".to_string(),
2536 ..Default::default()
2537 }],
2538 pagination: PaginationResponse { page: 0, page_size: 20, total: 1 },
2539 })
2540 });
2541
2542 rpc_client
2543 .expect_get_component_tvl()
2544 .returning(|_| {
2545 Ok(ComponentTvlRequestResponse {
2546 tvl: [("Component1".to_string(), 100.0)]
2547 .into_iter()
2548 .collect(),
2549 pagination: PaginationResponse { page: 0, page_size: 20, total: 1 },
2550 })
2551 });
2552
2553 rpc_client
2554 .expect_get_traced_entry_points()
2555 .returning(|_| {
2556 Ok(TracedEntryPointRequestResponse {
2557 traced_entry_points: HashMap::new(),
2558 pagination: PaginationResponse::new(0, 20, 0),
2559 })
2560 });
2561
2562 let (tx, rx) = channel(10);
2564 deltas_client
2565 .expect_subscribe()
2566 .return_once(move |_, _| {
2567 let old_messages = vec![
2569 BlockChanges {
2570 extractor: "uniswap-v2".to_string(),
2571 chain: Chain::Ethereum,
2572 block: Block {
2573 hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000003"),
2574 number: 3,
2575 parent_hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000002"),
2576 chain: Chain::Ethereum,
2577 ts: chrono::DateTime::from_timestamp(1234567890, 0).unwrap().naive_utc(),
2578 },
2579 revert: false,
2580 ..Default::default()
2581 },
2582 BlockChanges {
2583 extractor: "uniswap-v2".to_string(),
2584 chain: Chain::Ethereum,
2585 block: Block {
2586 hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000004"),
2587 number: 4,
2588 parent_hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000003"),
2589 chain: Chain::Ethereum,
2590 ts: chrono::DateTime::from_timestamp(1234567891, 0).unwrap().naive_utc(),
2591 },
2592 revert: false,
2593 ..Default::default()
2594 },
2595 BlockChanges {
2596 extractor: "uniswap-v2".to_string(),
2597 chain: Chain::Ethereum,
2598 block: Block {
2599 hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000005"),
2600 number: 5,
2601 parent_hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000004"),
2602 chain: Chain::Ethereum,
2603 ts: chrono::DateTime::from_timestamp(1234567892, 0).unwrap().naive_utc(),
2604 },
2605 revert: false,
2606 ..Default::default()
2607 },
2608 BlockChanges {
2610 extractor: "uniswap-v2".to_string(),
2611 chain: Chain::Ethereum,
2612 block: Block {
2613 hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000006"),
2614 number: 6,
2615 parent_hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000005"),
2616 chain: Chain::Ethereum,
2617 ts: chrono::DateTime::from_timestamp(1234567893, 0).unwrap().naive_utc(),
2618 },
2619 revert: false,
2620 ..Default::default()
2621 },
2622 ];
2623
2624 tokio::spawn(async move {
2625 for message in old_messages {
2626 let _ = tx.send(message).await;
2627 tokio::time::sleep(Duration::from_millis(10)).await;
2628 }
2629 });
2630
2631 Ok((Uuid::default(), rx))
2632 });
2633
2634 deltas_client
2635 .expect_unsubscribe()
2636 .return_once(|_| Ok(()));
2637
2638 let mut state_sync = ProtocolStateSynchronizer::new(
2639 ExtractorIdentity::new(Chain::Ethereum, "uniswap-v2"),
2640 true,
2641 ComponentFilter::with_tvl_range(0.0, 1000.0),
2642 1,
2643 Duration::from_secs(0),
2644 true,
2645 true,
2646 ArcRPCClient(Arc::new(rpc_client)),
2647 ArcDeltasClient(Arc::new(deltas_client)),
2648 10000_u64,
2649 );
2650
2651 state_sync
2653 .initialize()
2654 .await
2655 .expect("Init should succeed");
2656
2657 state_sync.last_synced_block = Some(BlockHeader {
2658 number: 5,
2659 hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000005"),
2660 parent_hash: Bytes::from(
2661 "0x0000000000000000000000000000000000000000000000000000000000000004",
2662 ),
2663 revert: false,
2664 timestamp: 1234567892,
2665 });
2666
2667 let (mut block_tx, mut block_rx) = channel(10);
2668 let (end_tx, end_rx) = oneshot::channel::<()>();
2669
2670 let state_sync_handle = tokio::spawn(async move {
2672 state_sync
2673 .state_sync(&mut block_tx, end_rx)
2674 .await
2675 });
2676
2677 let result_msg = timeout(Duration::from_millis(500), block_rx.recv())
2679 .await
2680 .expect("Should receive message within timeout")
2681 .expect("Channel should be open")
2682 .expect("Should not be an error");
2683
2684 let _ = end_tx.send(());
2686
2687 let _ = state_sync_handle
2689 .await
2690 .expect("Task should not panic");
2691
2692 assert!(result_msg.deltas.is_some(), "Should contain deltas");
2694 if let Some(deltas) = &result_msg.deltas {
2695 assert_eq!(
2696 deltas.block.number, 6,
2697 "Should only process block 6, skipping earlier blocks"
2698 );
2699 assert_eq!(
2700 deltas.block.hash,
2701 Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000006")
2702 );
2703 }
2704
2705 match timeout(Duration::from_millis(50), block_rx.recv()).await {
2708 Err(_) => {
2709 }
2711 Ok(Some(Err(_))) => {
2712 }
2714 Ok(Some(Ok(_))) => {
2715 panic!("Should not receive additional messages - old blocks should be skipped");
2716 }
2717 Ok(None) => {
2718 }
2720 }
2721 }
2722}