1use std::{collections::HashMap, time::Duration};
2
3use async_trait::async_trait;
4use serde::{Deserialize, Serialize};
5use thiserror::Error;
6use tokio::{
7 select,
8 sync::{
9 mpsc::{channel, error::SendError, Receiver, Sender},
10 oneshot,
11 },
12 task::JoinHandle,
13 time::{sleep, timeout},
14};
15use tracing::{debug, error, info, instrument, trace, warn};
16use tycho_common::{
17 dto::{
18 BlockChanges, EntryPointWithTracingParams, ExtractorIdentity, ProtocolComponent,
19 ResponseAccount, ResponseProtocolState, TracingResult,
20 },
21 Bytes,
22};
23
24use crate::{
25 deltas::{DeltasClient, SubscriptionOptions},
26 feed::{
27 component_tracker::{ComponentFilter, ComponentTracker},
28 BlockHeader, HeaderLike,
29 },
30 rpc::{RPCClient, RPCError, SnapshotParameters, RPC_CLIENT_CONCURRENCY},
31 DeltasError,
32};
33
34#[derive(Error, Debug)]
35pub enum SynchronizerError {
36 #[error("RPC error: {0}")]
38 RPCError(#[from] RPCError),
39
40 #[error("{0}")]
42 ChannelError(String),
43
44 #[error("Timeout error: {0}")]
46 Timeout(String),
47
48 #[error("Failed to close synchronizer: {0}")]
50 CloseError(String),
51
52 #[error("Connection error: {0}")]
54 ConnectionError(String),
55
56 #[error("Connection closed")]
58 ConnectionClosed,
59
60 #[error("Internal error: {0}")]
62 Internal(String),
63}
64
65pub type SyncResult<T> = Result<T, SynchronizerError>;
66
67impl<T> From<SendError<T>> for SynchronizerError {
68 fn from(err: SendError<T>) -> Self {
69 SynchronizerError::ChannelError(format!("Failed to send message: {err}"))
70 }
71}
72
73impl From<DeltasError> for SynchronizerError {
74 fn from(err: DeltasError) -> Self {
75 match err {
76 DeltasError::NotConnected => SynchronizerError::ConnectionClosed,
77 _ => SynchronizerError::ConnectionError(err.to_string()),
78 }
79 }
80}
81
82pub struct ProtocolStateSynchronizer<R: RPCClient, D: DeltasClient> {
83 extractor_id: ExtractorIdentity,
84 retrieve_balances: bool,
85 rpc_client: R,
86 deltas_client: D,
87 max_retries: u64,
88 retry_cooldown: Duration,
89 include_snapshots: bool,
90 component_tracker: ComponentTracker<R>,
91 last_synced_block: Option<BlockHeader>,
92 timeout: u64,
93 include_tvl: bool,
94 compression: bool,
95 partial_blocks: bool,
96}
97
98#[derive(Clone, PartialEq, Debug, Serialize, Deserialize)]
99pub struct ComponentWithState {
100 pub state: ResponseProtocolState,
101 pub component: ProtocolComponent,
102 pub component_tvl: Option<f64>,
103 pub entrypoints: Vec<(EntryPointWithTracingParams, TracingResult)>,
104}
105
106#[derive(Clone, PartialEq, Debug, Default, Serialize, Deserialize)]
107pub struct Snapshot {
108 pub states: HashMap<String, ComponentWithState>,
109 pub vm_storage: HashMap<Bytes, ResponseAccount>,
110}
111
112impl Snapshot {
113 fn extend(&mut self, other: Snapshot) {
114 self.states.extend(other.states);
115 self.vm_storage.extend(other.vm_storage);
116 }
117
118 pub fn get_states(&self) -> &HashMap<String, ComponentWithState> {
119 &self.states
120 }
121
122 pub fn get_vm_storage(&self) -> &HashMap<Bytes, ResponseAccount> {
123 &self.vm_storage
124 }
125}
126
127#[derive(Clone, PartialEq, Debug, Default, Serialize, Deserialize)]
128pub struct StateSyncMessage<H>
129where
130 H: HeaderLike,
131{
132 pub header: H,
134 pub snapshots: Snapshot,
136 pub deltas: Option<BlockChanges>,
140 pub removed_components: HashMap<String, ProtocolComponent>,
142}
143
144impl<H> StateSyncMessage<H>
145where
146 H: HeaderLike,
147{
148 pub fn merge(mut self, other: Self) -> Self {
149 self.removed_components
151 .retain(|k, _| !other.snapshots.states.contains_key(k));
152 self.snapshots
153 .states
154 .retain(|k, _| !other.removed_components.contains_key(k));
155
156 self.snapshots.extend(other.snapshots);
157 let deltas = match (self.deltas, other.deltas) {
158 (Some(l), Some(r)) => Some(l.merge(r)),
159 (None, Some(r)) => Some(r),
160 (Some(l), None) => Some(l),
161 (None, None) => None,
162 };
163 self.removed_components
164 .extend(other.removed_components);
165 Self {
166 header: other.header,
167 snapshots: self.snapshots,
168 deltas,
169 removed_components: self.removed_components,
170 }
171 }
172}
173
174pub struct SynchronizerTaskHandle {
179 join_handle: JoinHandle<()>,
180 close_tx: oneshot::Sender<()>,
181}
182
183impl SynchronizerTaskHandle {
192 pub fn new(join_handle: JoinHandle<()>, close_tx: oneshot::Sender<()>) -> Self {
193 Self { join_handle, close_tx }
194 }
195
196 pub fn split(self) -> (JoinHandle<()>, oneshot::Sender<()>) {
202 (self.join_handle, self.close_tx)
203 }
204}
205
206#[async_trait]
207pub trait StateSynchronizer: Send + Sync + 'static {
208 async fn initialize(&mut self) -> SyncResult<()>;
209 async fn start(
212 mut self,
213 ) -> (SynchronizerTaskHandle, Receiver<SyncResult<StateSyncMessage<BlockHeader>>>);
214}
215
216impl<R, D> ProtocolStateSynchronizer<R, D>
217where
218 R: RPCClient + Clone + Send + Sync + 'static,
221 D: DeltasClient + Clone + Send + Sync + 'static,
222{
223 #[allow(clippy::too_many_arguments)]
225 pub fn new(
226 extractor_id: ExtractorIdentity,
227 retrieve_balances: bool,
228 component_filter: ComponentFilter,
229 max_retries: u64,
230 retry_cooldown: Duration,
231 include_snapshots: bool,
232 include_tvl: bool,
233 compression: bool,
234 rpc_client: R,
235 deltas_client: D,
236 timeout: u64,
237 ) -> Self {
238 Self {
239 extractor_id: extractor_id.clone(),
240 retrieve_balances,
241 rpc_client: rpc_client.clone(),
242 include_snapshots,
243 deltas_client,
244 component_tracker: ComponentTracker::new(
245 extractor_id.chain,
246 extractor_id.name.as_str(),
247 component_filter,
248 rpc_client,
249 ),
250 max_retries,
251 retry_cooldown,
252 last_synced_block: None,
253 timeout,
254 include_tvl,
255 compression,
256 partial_blocks: false,
257 }
258 }
259
260 pub fn with_partial_blocks(mut self, partial_blocks: bool) -> Self {
262 self.partial_blocks = partial_blocks;
263 self
264 }
265
266 async fn get_snapshots<'a, I: IntoIterator<Item = &'a String>>(
268 &mut self,
269 header: BlockHeader,
270 ids: Option<I>,
271 ) -> SyncResult<StateSyncMessage<BlockHeader>> {
272 if !self.include_snapshots {
273 return Ok(StateSyncMessage { header, ..Default::default() });
274 }
275
276 let component_ids: Vec<_> = match ids {
278 Some(ids) => ids.into_iter().cloned().collect(),
279 None => self
280 .component_tracker
281 .get_tracked_component_ids(),
282 };
283
284 if component_ids.is_empty() {
285 return Ok(StateSyncMessage { header, ..Default::default() });
286 }
287
288 const DCI_PROTOCOLS: &[&str] = &[
290 "uniswap_v4_hooks",
291 "vm:curve",
292 "vm:balancer_v2",
293 "vm:balancer_v3",
294 "fluid_v1",
295 "erc4626",
296 ];
297 let entrypoints_result = if DCI_PROTOCOLS.contains(&self.extractor_id.name.as_str()) {
298 let result = self
299 .rpc_client
300 .get_traced_entry_points_paginated(
301 self.extractor_id.chain,
302 &self.extractor_id.name,
303 &component_ids,
304 None,
305 RPC_CLIENT_CONCURRENCY,
306 )
307 .await?;
308 self.component_tracker
309 .process_entrypoints(&result.clone().into());
310 result.traced_entry_points.clone()
311 } else {
312 HashMap::new()
313 };
314
315 let contract_ids: Vec<Bytes> = self
317 .component_tracker
318 .get_contracts_by_component(&component_ids)
319 .into_iter()
320 .collect();
321
322 let filtered_components: HashMap<_, _> = self
324 .component_tracker
325 .components
326 .iter()
327 .filter(|(id, _)| component_ids.contains(id))
328 .map(|(k, v)| (k.clone(), v.clone()))
329 .collect();
330
331 let request = SnapshotParameters::new(
332 self.extractor_id.chain,
333 &self.extractor_id.name,
334 &filtered_components,
335 &contract_ids,
336 header.number,
337 )
338 .entrypoints(&entrypoints_result)
339 .include_balances(self.retrieve_balances)
340 .include_tvl(self.include_tvl);
341 let snapshot_response = self
342 .rpc_client
343 .get_snapshots(&request, None, RPC_CLIENT_CONCURRENCY)
344 .await?;
345
346 trace!(states=?&snapshot_response.states, "Retrieved ProtocolStates");
347 trace!(contract_states=?&snapshot_response.vm_storage, "Retrieved ContractState");
348
349 Ok(StateSyncMessage {
350 header,
351 snapshots: snapshot_response,
352 deltas: None,
353 removed_components: HashMap::new(),
354 })
355 }
356
357 #[instrument(skip(self, block_tx, end_rx), fields(extractor_id = %self.extractor_id))]
370 async fn state_sync(
371 &mut self,
372 block_tx: &mut Sender<SyncResult<StateSyncMessage<BlockHeader>>>,
373 mut end_rx: oneshot::Receiver<()>,
374 ) -> Result<(), (SynchronizerError, Option<oneshot::Receiver<()>>)> {
375 let subscription_options = SubscriptionOptions::new()
377 .with_state(self.include_snapshots)
378 .with_compression(self.compression)
379 .with_partial_blocks(self.partial_blocks);
380 let (subscription_id, mut msg_rx) = match self
381 .deltas_client
382 .subscribe(self.extractor_id.clone(), subscription_options)
383 .await
384 {
385 Ok(result) => result,
386 Err(e) => return Err((e.into(), Some(end_rx))),
387 };
388
389 let result = async {
390 info!("Waiting for deltas...");
391 let mut warned_waiting_for_new_block = false;
392 let mut warned_skipping_synced = false;
393 let mut last_block_number: Option<u64> = None;
395 let mut first_msg = loop {
396 let msg = select! {
397 deltas_result = timeout(Duration::from_secs(self.timeout), msg_rx.recv()) => {
398 deltas_result
399 .map_err(|_| {
400 SynchronizerError::Timeout(format!(
401 "First deltas took longer than {t}s to arrive",
402 t = self.timeout
403 ))
404 })?
405 .ok_or_else(|| {
406 SynchronizerError::ConnectionError(
407 "Deltas channel closed before first message".to_string(),
408 )
409 })?
410 },
411 _ = &mut end_rx => {
412 info!("Received close signal while waiting for first deltas");
413 return Ok(());
414 }
415 };
416
417 let incoming: BlockHeader = (&msg).into();
418
419 let is_new_block_candidate = if self.partial_blocks {
423 match msg.partial_block_index {
424 None => {
425 last_block_number = Some(incoming.number);
427 true
428 }
429 Some(current_partial_idx) => {
430 let is_new_block = last_block_number
431 .map(|prev_block| incoming.number > prev_block)
432 .unwrap_or(false);
433
434 if !warned_waiting_for_new_block {
435 info!(
436 extractor=%self.extractor_id,
437 block=incoming.number,
438 partial_idx=current_partial_idx,
439 "Syncing. Waiting for new block to start"
440 );
441 warned_waiting_for_new_block = true;
442 }
443 last_block_number = Some(incoming.number);
444 is_new_block
445 }
446 }
447 } else {
448 true };
450
451 if !is_new_block_candidate {
452 continue;
453 }
454
455 if let Some(current) = &self.last_synced_block {
457 if current.number >= incoming.number && !self.is_next_expected(&incoming) {
458 if !warned_skipping_synced {
459 info!(extractor=%self.extractor_id, from=incoming.number, to=current.number, "Syncing. Skipping already synced block");
460 warned_skipping_synced = true;
461 }
462 continue;
463 }
464 }
465 break msg;
466 };
467
468 self.filter_deltas(&mut first_msg);
469
470 info!(height = first_msg.get_block().number, "First deltas received");
472 let header: BlockHeader = (&first_msg).into();
473 let deltas_msg = StateSyncMessage {
474 header: header.clone(),
475 snapshots: Default::default(),
476 deltas: Some(first_msg),
477 removed_components: Default::default(),
478 };
479
480 let msg = if !self.is_next_expected(&header) {
482 info!("Retrieving snapshot");
483 let snapshot_header = BlockHeader { revert: false, ..header.clone() };
484 let snapshot = self
485 .get_snapshots::<Vec<&String>>(
486 snapshot_header,
487 None,
488 )
489 .await?
490 .merge(deltas_msg);
491 let n_components = self.component_tracker.components.len();
492 let n_snapshots = snapshot.snapshots.states.len();
493 info!(n_components, n_snapshots, "Initial snapshot retrieved, starting delta message feed");
494 snapshot
495 } else {
496 deltas_msg
497 };
498 block_tx.send(Ok(msg)).await?;
499 self.last_synced_block = Some(header.clone());
500 loop {
501 select! {
502 deltas_opt = msg_rx.recv() => {
503 if let Some(mut deltas) = deltas_opt {
504 let header: BlockHeader = (&deltas).into();
505 debug!(block_number=?header.number, "Received delta message");
506
507 let (snapshots, removed_components) = {
508 let (to_add, to_remove) = self.component_tracker.filter_updated_components(&deltas);
511
512 let requiring_snapshot: Vec<_> = to_add
514 .iter()
515 .filter(|id| {
516 !self.component_tracker
517 .components
518 .contains_key(id.as_str())
519 })
520 .collect();
521 debug!(components=?requiring_snapshot, "SnapshotRequest");
522 self.component_tracker
523 .start_tracking(requiring_snapshot.as_slice())
524 .await?;
525
526 let snapshots = self
527 .get_snapshots(header.clone(), Some(requiring_snapshot))
528 .await?
529 .snapshots;
530
531 let removed_components = if !to_remove.is_empty() {
532 self.component_tracker.stop_tracking(&to_remove)
533 } else {
534 Default::default()
535 };
536
537 (snapshots, removed_components)
538 };
539
540 self.component_tracker.process_entrypoints(&deltas.dci_update);
542
543 self.filter_deltas(&mut deltas);
545 let n_changes = deltas.n_changes();
546
547 let next = StateSyncMessage {
549 header: header.clone(),
550 snapshots,
551 deltas: Some(deltas),
552 removed_components,
553 };
554 block_tx.send(Ok(next)).await?;
555 self.last_synced_block = Some(header.clone());
556
557 debug!(block_number=?header.number, n_changes, "Finished processing delta message");
558 } else {
559 return Err(SynchronizerError::ConnectionError("Deltas channel closed".to_string()));
560 }
561 },
562 _ = &mut end_rx => {
563 info!("Received close signal during state_sync");
564 return Ok(());
565 }
566 }
567 }
568 }.await;
569
570 warn!(last_synced_block = ?&self.last_synced_block, "Deltas processing ended.");
572 let _ = self
574 .deltas_client
575 .unsubscribe(subscription_id)
576 .await
577 .map_err(|err| {
578 warn!(err=?err, "Unsubscribing from deltas on cleanup failed!");
579 });
580
581 match result {
584 Ok(()) => Ok(()), Err(e) => {
586 Err((e, Some(end_rx)))
590 }
591 }
592 }
593
594 fn is_next_expected(&self, incoming: &BlockHeader) -> bool {
595 if let Some(block) = self.last_synced_block.as_ref() {
596 return incoming.parent_hash == block.hash;
597 }
598 false
599 }
600 fn filter_deltas(&self, deltas: &mut BlockChanges) {
601 deltas.filter_by_component(|id| {
602 self.component_tracker
603 .components
604 .contains_key(id)
605 });
606 deltas.filter_by_contract(|id| {
607 self.component_tracker
608 .contracts
609 .contains(id)
610 });
611 }
612}
613
614#[async_trait]
615impl<R, D> StateSynchronizer for ProtocolStateSynchronizer<R, D>
616where
617 R: RPCClient + Clone + Send + Sync + 'static,
618 D: DeltasClient + Clone + Send + Sync + 'static,
619{
620 async fn initialize(&mut self) -> SyncResult<()> {
621 info!("Retrieving relevant protocol components");
622 self.component_tracker
623 .initialise_components()
624 .await?;
625 info!(
626 n_components = self.component_tracker.components.len(),
627 n_contracts = self.component_tracker.contracts.len(),
628 "Finished retrieving components",
629 );
630
631 Ok(())
632 }
633
634 async fn start(
635 mut self,
636 ) -> (SynchronizerTaskHandle, Receiver<SyncResult<StateSyncMessage<BlockHeader>>>) {
637 let (mut tx, rx) = channel(15);
638 let (end_tx, end_rx) = oneshot::channel::<()>();
639
640 let jh = tokio::spawn(async move {
641 let mut retry_count = 0;
642 let mut current_end_rx = end_rx;
643 let mut final_error = None;
644
645 while retry_count < self.max_retries {
646 info!(extractor_id=%&self.extractor_id, retry_count, "(Re)starting synchronization loop");
647
648 let res = self
649 .state_sync(&mut tx, current_end_rx)
650 .await;
651 match res {
652 Ok(()) => {
653 info!(
654 extractor_id=%&self.extractor_id,
655 retry_count,
656 "State synchronization exited cleanly"
657 );
658 return;
659 }
660 Err((e, maybe_end_rx)) => {
661 warn!(
662 extractor_id=%&self.extractor_id,
663 retry_count,
664 error=%e,
665 "State synchronization errored!"
666 );
667
668 if let Some(recovered_end_rx) = maybe_end_rx {
670 current_end_rx = recovered_end_rx;
671
672 if let SynchronizerError::ConnectionClosed = e {
673 error!(
675 "Websocket connection closed. State synchronization exiting."
676 );
677 let _ = tx.send(Err(e)).await;
678 return;
679 } else {
680 final_error = Some(e);
682 }
683 } else {
684 info!(extractor_id=%&self.extractor_id, "Received close signal, exiting.");
686 return;
687 }
688 }
689 }
690 sleep(self.retry_cooldown).await;
691 retry_count += 1;
692 }
693 if let Some(e) = final_error {
694 warn!(extractor_id=%&self.extractor_id, retry_count, error=%e, "Max retries exceeded");
695 let _ = tx.send(Err(e)).await;
696 }
697 });
698
699 let handle = SynchronizerTaskHandle::new(jh, end_tx);
700 (handle, rx)
701 }
702}
703
704#[cfg(test)]
705mod test {
706 use std::{collections::HashSet, sync::Arc};
725
726 use tycho_common::dto::{
727 AddressStorageLocation, Block, Chain, ComponentTvlRequestBody, ComponentTvlRequestResponse,
728 DCIUpdate, EntryPoint, PaginationResponse, ProtocolComponentRequestResponse,
729 ProtocolComponentsRequestBody, ProtocolStateRequestBody, ProtocolStateRequestResponse,
730 ProtocolSystemsRequestBody, ProtocolSystemsRequestResponse, RPCTracerParams,
731 StateRequestBody, StateRequestResponse, TokensRequestBody, TokensRequestResponse,
732 TracedEntryPointRequestBody, TracedEntryPointRequestResponse, TracingParams,
733 };
734 use uuid::Uuid;
735
736 use super::*;
737 use crate::{deltas::MockDeltasClient, rpc::MockRPCClient, DeltasError, RPCError};
738
739 struct ArcRPCClient<T>(Arc<T>);
741
742 impl<T> Clone for ArcRPCClient<T> {
744 fn clone(&self) -> Self {
745 ArcRPCClient(self.0.clone())
746 }
747 }
748
749 #[async_trait]
750 impl<T> RPCClient for ArcRPCClient<T>
751 where
752 T: RPCClient + Sync + Send + 'static,
753 {
754 async fn get_tokens(
755 &self,
756 request: &TokensRequestBody,
757 ) -> Result<TokensRequestResponse, RPCError> {
758 self.0.get_tokens(request).await
759 }
760
761 async fn get_contract_state(
762 &self,
763 request: &StateRequestBody,
764 ) -> Result<StateRequestResponse, RPCError> {
765 self.0.get_contract_state(request).await
766 }
767
768 async fn get_protocol_components(
769 &self,
770 request: &ProtocolComponentsRequestBody,
771 ) -> Result<ProtocolComponentRequestResponse, RPCError> {
772 self.0
773 .get_protocol_components(request)
774 .await
775 }
776
777 async fn get_protocol_states(
778 &self,
779 request: &ProtocolStateRequestBody,
780 ) -> Result<ProtocolStateRequestResponse, RPCError> {
781 self.0
782 .get_protocol_states(request)
783 .await
784 }
785
786 async fn get_protocol_systems(
787 &self,
788 request: &ProtocolSystemsRequestBody,
789 ) -> Result<ProtocolSystemsRequestResponse, RPCError> {
790 self.0
791 .get_protocol_systems(request)
792 .await
793 }
794
795 async fn get_component_tvl(
796 &self,
797 request: &ComponentTvlRequestBody,
798 ) -> Result<ComponentTvlRequestResponse, RPCError> {
799 self.0.get_component_tvl(request).await
800 }
801
802 async fn get_traced_entry_points(
803 &self,
804 request: &TracedEntryPointRequestBody,
805 ) -> Result<TracedEntryPointRequestResponse, RPCError> {
806 self.0
807 .get_traced_entry_points(request)
808 .await
809 }
810
811 async fn get_snapshots<'a>(
812 &self,
813 request: &SnapshotParameters<'a>,
814 chunk_size: Option<usize>,
815 concurrency: usize,
816 ) -> Result<Snapshot, RPCError> {
817 self.0
818 .get_snapshots(request, chunk_size, concurrency)
819 .await
820 }
821
822 fn compression(&self) -> bool {
823 self.0.compression()
824 }
825 }
826
827 struct ArcDeltasClient<T>(Arc<T>);
829
830 impl<T> Clone for ArcDeltasClient<T> {
832 fn clone(&self) -> Self {
833 ArcDeltasClient(self.0.clone())
834 }
835 }
836
837 #[async_trait]
838 impl<T> DeltasClient for ArcDeltasClient<T>
839 where
840 T: DeltasClient + Sync + Send + 'static,
841 {
842 async fn subscribe(
843 &self,
844 extractor_id: ExtractorIdentity,
845 options: SubscriptionOptions,
846 ) -> Result<(Uuid, Receiver<BlockChanges>), DeltasError> {
847 self.0
848 .subscribe(extractor_id, options)
849 .await
850 }
851
852 async fn unsubscribe(&self, subscription_id: Uuid) -> Result<(), DeltasError> {
853 self.0
854 .unsubscribe(subscription_id)
855 .await
856 }
857
858 async fn connect(&self) -> Result<JoinHandle<Result<(), DeltasError>>, DeltasError> {
859 self.0.connect().await
860 }
861
862 async fn close(&self) -> Result<(), DeltasError> {
863 self.0.close().await
864 }
865 }
866
867 fn with_mocked_clients(
868 native: bool,
869 include_tvl: bool,
870 rpc_client: Option<MockRPCClient>,
871 deltas_client: Option<MockDeltasClient>,
872 ) -> ProtocolStateSynchronizer<ArcRPCClient<MockRPCClient>, ArcDeltasClient<MockDeltasClient>>
873 {
874 let rpc_client = ArcRPCClient(Arc::new(rpc_client.unwrap_or_default()));
875 let deltas_client = ArcDeltasClient(Arc::new(deltas_client.unwrap_or_default()));
876
877 ProtocolStateSynchronizer::new(
878 ExtractorIdentity::new(Chain::Ethereum, "uniswap-v2"),
879 native,
880 ComponentFilter::with_tvl_range(50.0, 50.0),
881 1,
882 Duration::from_secs(0),
883 true,
884 include_tvl,
885 true, rpc_client,
887 deltas_client,
888 10_u64,
889 )
890 }
891
892 fn state_snapshot_native() -> ProtocolStateRequestResponse {
893 ProtocolStateRequestResponse {
894 states: vec![ResponseProtocolState {
895 component_id: "Component1".to_string(),
896 ..Default::default()
897 }],
898 pagination: PaginationResponse { page: 0, page_size: 20, total: 1 },
899 }
900 }
901
902 fn make_mock_client() -> MockRPCClient {
903 let mut m = MockRPCClient::new();
904 m.expect_compression()
905 .return_const(false);
906 m
907 }
908
909 #[test_log::test(tokio::test)]
910 async fn test_get_snapshots_native() {
911 let header = BlockHeader::default();
912 let mut rpc = make_mock_client();
913 let component = ProtocolComponent { id: "Component1".to_string(), ..Default::default() };
914
915 let component_clone = component.clone();
916 rpc.expect_get_snapshots()
917 .returning(move |_request, _chunk_size, _concurrency| {
918 Ok(Snapshot {
919 states: state_snapshot_native()
920 .states
921 .into_iter()
922 .map(|state| {
923 (
924 state.component_id.clone(),
925 ComponentWithState {
926 state,
927 component: component_clone.clone(),
928 entrypoints: vec![],
929 component_tvl: None,
930 },
931 )
932 })
933 .collect(),
934 vm_storage: HashMap::new(),
935 })
936 });
937
938 rpc.expect_get_traced_entry_points()
939 .returning(|_| {
940 Ok(TracedEntryPointRequestResponse {
941 traced_entry_points: HashMap::new(),
942 pagination: PaginationResponse::new(0, 20, 0),
943 })
944 });
945
946 let mut state_sync = with_mocked_clients(true, false, Some(rpc), None);
947 state_sync
948 .component_tracker
949 .components
950 .insert("Component1".to_string(), component.clone());
951 let components_arg = ["Component1".to_string()];
952 let exp = StateSyncMessage {
953 header: header.clone(),
954 snapshots: Snapshot {
955 states: state_snapshot_native()
956 .states
957 .into_iter()
958 .map(|state| {
959 (
960 state.component_id.clone(),
961 ComponentWithState {
962 state,
963 component: component.clone(),
964 entrypoints: vec![],
965 component_tvl: None,
966 },
967 )
968 })
969 .collect(),
970 vm_storage: HashMap::new(),
971 },
972 deltas: None,
973 removed_components: Default::default(),
974 };
975
976 let snap = state_sync
977 .get_snapshots(header, Some(&components_arg))
978 .await
979 .expect("Retrieving snapshot failed");
980
981 assert_eq!(snap, exp);
982 }
983
984 #[test_log::test(tokio::test)]
985 async fn test_get_snapshots_native_with_tvl() {
986 let header = BlockHeader::default();
987 let mut rpc = make_mock_client();
988 let component = ProtocolComponent { id: "Component1".to_string(), ..Default::default() };
989
990 let component_clone = component.clone();
991 rpc.expect_get_snapshots()
992 .returning(move |_request, _chunk_size, _concurrency| {
993 Ok(Snapshot {
994 states: state_snapshot_native()
995 .states
996 .into_iter()
997 .map(|state| {
998 (
999 state.component_id.clone(),
1000 ComponentWithState {
1001 state,
1002 component: component_clone.clone(),
1003 component_tvl: Some(100.0),
1004 entrypoints: vec![],
1005 },
1006 )
1007 })
1008 .collect(),
1009 vm_storage: HashMap::new(),
1010 })
1011 });
1012
1013 rpc.expect_get_traced_entry_points()
1014 .returning(|_| {
1015 Ok(TracedEntryPointRequestResponse {
1016 traced_entry_points: HashMap::new(),
1017 pagination: PaginationResponse::new(0, 20, 0),
1018 })
1019 });
1020
1021 let mut state_sync = with_mocked_clients(true, true, Some(rpc), None);
1022 state_sync
1023 .component_tracker
1024 .components
1025 .insert("Component1".to_string(), component.clone());
1026 let components_arg = ["Component1".to_string()];
1027 let exp = StateSyncMessage {
1028 header: header.clone(),
1029 snapshots: Snapshot {
1030 states: state_snapshot_native()
1031 .states
1032 .into_iter()
1033 .map(|state| {
1034 (
1035 state.component_id.clone(),
1036 ComponentWithState {
1037 state,
1038 component: component.clone(),
1039 component_tvl: Some(100.0),
1040 entrypoints: vec![],
1041 },
1042 )
1043 })
1044 .collect(),
1045 vm_storage: HashMap::new(),
1046 },
1047 deltas: None,
1048 removed_components: Default::default(),
1049 };
1050
1051 let snap = state_sync
1052 .get_snapshots(header, Some(&components_arg))
1053 .await
1054 .expect("Retrieving snapshot failed");
1055
1056 assert_eq!(snap, exp);
1057 }
1058
1059 fn state_snapshot_vm() -> StateRequestResponse {
1060 StateRequestResponse {
1061 accounts: vec![
1062 ResponseAccount { address: Bytes::from("0x0badc0ffee"), ..Default::default() },
1063 ResponseAccount { address: Bytes::from("0xbabe42"), ..Default::default() },
1064 ],
1065 pagination: PaginationResponse { page: 0, page_size: 20, total: 1 },
1066 }
1067 }
1068
1069 fn traced_entry_point_response() -> TracedEntryPointRequestResponse {
1070 TracedEntryPointRequestResponse {
1071 traced_entry_points: HashMap::from([(
1072 "Component1".to_string(),
1073 vec![(
1074 EntryPointWithTracingParams {
1075 entry_point: EntryPoint {
1076 external_id: "entrypoint_a".to_string(),
1077 target: Bytes::from("0x0badc0ffee"),
1078 signature: "sig()".to_string(),
1079 },
1080 params: TracingParams::RPCTracer(RPCTracerParams {
1081 caller: Some(Bytes::from("0x0badc0ffee")),
1082 calldata: Bytes::from("0x0badc0ffee"),
1083 state_overrides: None,
1084 prune_addresses: None,
1085 }),
1086 },
1087 TracingResult {
1088 retriggers: HashSet::from([(
1089 Bytes::from("0x0badc0ffee"),
1090 AddressStorageLocation::new(Bytes::from("0x0badc0ffee"), 12),
1091 )]),
1092 accessed_slots: HashMap::from([(
1093 Bytes::from("0x0badc0ffee"),
1094 HashSet::from([Bytes::from("0xbadbeef0")]),
1095 )]),
1096 },
1097 )],
1098 )]),
1099 pagination: PaginationResponse::new(0, 20, 0),
1100 }
1101 }
1102
1103 #[test_log::test(tokio::test)]
1104 async fn test_get_snapshots_vm() {
1105 let header = BlockHeader::default();
1106 let mut rpc = make_mock_client();
1107
1108 let traced_ep_response = traced_entry_point_response();
1109 rpc.expect_get_snapshots()
1110 .returning(move |_request, _chunk_size, _concurrency| {
1111 let vm_storage_accounts = state_snapshot_vm();
1112 Ok(Snapshot {
1113 states: [(
1114 "Component1".to_string(),
1115 ComponentWithState {
1116 state: ResponseProtocolState {
1117 component_id: "Component1".to_string(),
1118 ..Default::default()
1119 },
1120 component: ProtocolComponent {
1121 id: "Component1".to_string(),
1122 contract_ids: vec![
1123 Bytes::from("0x0badc0ffee"),
1124 Bytes::from("0xbabe42"),
1125 ],
1126 ..Default::default()
1127 },
1128 component_tvl: None,
1129 entrypoints: traced_ep_response
1130 .traced_entry_points
1131 .get("Component1")
1132 .cloned()
1133 .unwrap_or_default(),
1134 },
1135 )]
1136 .into_iter()
1137 .collect(),
1138 vm_storage: vm_storage_accounts
1139 .accounts
1140 .into_iter()
1141 .map(|state| (state.address.clone(), state))
1142 .collect(),
1143 })
1144 });
1145
1146 rpc.expect_get_traced_entry_points()
1147 .returning(|_| Ok(traced_entry_point_response()));
1148
1149 let mut state_sync = with_mocked_clients(false, false, Some(rpc), None);
1150 let component = ProtocolComponent {
1151 id: "Component1".to_string(),
1152 contract_ids: vec![Bytes::from("0x0badc0ffee"), Bytes::from("0xbabe42")],
1153 ..Default::default()
1154 };
1155 state_sync
1156 .component_tracker
1157 .components
1158 .insert("Component1".to_string(), component.clone());
1159 let components_arg = ["Component1".to_string()];
1160 let exp = StateSyncMessage {
1161 header: header.clone(),
1162 snapshots: Snapshot {
1163 states: [(
1164 component.id.clone(),
1165 ComponentWithState {
1166 state: ResponseProtocolState {
1167 component_id: "Component1".to_string(),
1168 ..Default::default()
1169 },
1170 component: component.clone(),
1171 component_tvl: None,
1172 entrypoints: vec![(
1173 EntryPointWithTracingParams {
1174 entry_point: EntryPoint {
1175 external_id: "entrypoint_a".to_string(),
1176 target: Bytes::from("0x0badc0ffee"),
1177 signature: "sig()".to_string(),
1178 },
1179 params: TracingParams::RPCTracer(RPCTracerParams {
1180 caller: Some(Bytes::from("0x0badc0ffee")),
1181 calldata: Bytes::from("0x0badc0ffee"),
1182 state_overrides: None,
1183 prune_addresses: None,
1184 }),
1185 },
1186 TracingResult {
1187 retriggers: HashSet::from([(
1188 Bytes::from("0x0badc0ffee"),
1189 AddressStorageLocation::new(Bytes::from("0x0badc0ffee"), 12),
1190 )]),
1191 accessed_slots: HashMap::from([(
1192 Bytes::from("0x0badc0ffee"),
1193 HashSet::from([Bytes::from("0xbadbeef0")]),
1194 )]),
1195 },
1196 )],
1197 },
1198 )]
1199 .into_iter()
1200 .collect(),
1201 vm_storage: state_snapshot_vm()
1202 .accounts
1203 .into_iter()
1204 .map(|state| (state.address.clone(), state))
1205 .collect(),
1206 },
1207 deltas: None,
1208 removed_components: Default::default(),
1209 };
1210
1211 let snap = state_sync
1212 .get_snapshots(header, Some(&components_arg))
1213 .await
1214 .expect("Retrieving snapshot failed");
1215
1216 assert_eq!(snap, exp);
1217 }
1218
1219 #[test_log::test(tokio::test)]
1220 async fn test_get_snapshots_vm_with_tvl() {
1221 let header = BlockHeader::default();
1222 let mut rpc = make_mock_client();
1223 let component = ProtocolComponent {
1224 id: "Component1".to_string(),
1225 contract_ids: vec![Bytes::from("0x0badc0ffee"), Bytes::from("0xbabe42")],
1226 ..Default::default()
1227 };
1228
1229 let component_clone = component.clone();
1230 rpc.expect_get_snapshots()
1231 .returning(move |_request, _chunk_size, _concurrency| {
1232 let vm_storage_accounts = state_snapshot_vm();
1233 Ok(Snapshot {
1234 states: [(
1235 "Component1".to_string(),
1236 ComponentWithState {
1237 state: ResponseProtocolState {
1238 component_id: "Component1".to_string(),
1239 ..Default::default()
1240 },
1241 component: component_clone.clone(),
1242 component_tvl: Some(100.0),
1243 entrypoints: vec![],
1244 },
1245 )]
1246 .into_iter()
1247 .collect(),
1248 vm_storage: vm_storage_accounts
1249 .accounts
1250 .into_iter()
1251 .map(|state| (state.address.clone(), state))
1252 .collect(),
1253 })
1254 });
1255
1256 rpc.expect_get_traced_entry_points()
1257 .returning(|_| {
1258 Ok(TracedEntryPointRequestResponse {
1259 traced_entry_points: HashMap::new(),
1260 pagination: PaginationResponse::new(0, 20, 0),
1261 })
1262 });
1263
1264 let mut state_sync = with_mocked_clients(false, true, Some(rpc), None);
1265 state_sync
1266 .component_tracker
1267 .components
1268 .insert("Component1".to_string(), component.clone());
1269 let components_arg = ["Component1".to_string()];
1270 let exp = StateSyncMessage {
1271 header: header.clone(),
1272 snapshots: Snapshot {
1273 states: [(
1274 component.id.clone(),
1275 ComponentWithState {
1276 state: ResponseProtocolState {
1277 component_id: "Component1".to_string(),
1278 ..Default::default()
1279 },
1280 component: component.clone(),
1281 component_tvl: Some(100.0),
1282 entrypoints: vec![],
1283 },
1284 )]
1285 .into_iter()
1286 .collect(),
1287 vm_storage: state_snapshot_vm()
1288 .accounts
1289 .into_iter()
1290 .map(|state| (state.address.clone(), state))
1291 .collect(),
1292 },
1293 deltas: None,
1294 removed_components: Default::default(),
1295 };
1296
1297 let snap = state_sync
1298 .get_snapshots(header, Some(&components_arg))
1299 .await
1300 .expect("Retrieving snapshot failed");
1301
1302 assert_eq!(snap, exp);
1303 }
1304
1305 #[test_log::test(tokio::test)]
1309 async fn test_get_snapshots_filters_to_requested_components_only() {
1310 let header = BlockHeader::default();
1311 let mut rpc = make_mock_client();
1312
1313 let component1 = ProtocolComponent { id: "Component1".to_string(), ..Default::default() };
1315 let component2 = ProtocolComponent { id: "Component2".to_string(), ..Default::default() };
1316 let component3 = ProtocolComponent { id: "Component3".to_string(), ..Default::default() };
1317
1318 let component2_clone = component2.clone();
1319
1320 rpc.expect_get_snapshots()
1322 .withf(
1323 |request: &SnapshotParameters,
1324 _chunk_size: &Option<usize>,
1325 _concurrency: &usize| {
1326 request.components.len() == 1 &&
1328 request
1329 .components
1330 .contains_key("Component2")
1331 },
1332 )
1333 .times(1)
1334 .returning(move |_request, _chunk_size, _concurrency| {
1335 Ok(Snapshot {
1336 states: [(
1337 "Component2".to_string(),
1338 ComponentWithState {
1339 state: ResponseProtocolState {
1340 component_id: "Component2".to_string(),
1341 ..Default::default()
1342 },
1343 component: component2_clone.clone(),
1344 entrypoints: vec![],
1345 component_tvl: None,
1346 },
1347 )]
1348 .into_iter()
1349 .collect(),
1350 vm_storage: HashMap::new(),
1351 })
1352 });
1353
1354 rpc.expect_get_traced_entry_points()
1355 .returning(|_| {
1356 Ok(TracedEntryPointRequestResponse {
1357 traced_entry_points: HashMap::new(),
1358 pagination: PaginationResponse::new(0, 20, 0),
1359 })
1360 });
1361
1362 let mut state_sync = with_mocked_clients(true, false, Some(rpc), None);
1363
1364 state_sync
1366 .component_tracker
1367 .components
1368 .insert("Component1".to_string(), component1.clone());
1369 state_sync
1370 .component_tracker
1371 .components
1372 .insert("Component2".to_string(), component2.clone());
1373 state_sync
1374 .component_tracker
1375 .components
1376 .insert("Component3".to_string(), component3.clone());
1377
1378 let components_arg = ["Component2".to_string()];
1380
1381 let snap = state_sync
1382 .get_snapshots(header.clone(), Some(&components_arg))
1383 .await
1384 .expect("Retrieving snapshot failed");
1385
1386 assert_eq!(snap.snapshots.states.len(), 1);
1388 assert!(snap
1389 .snapshots
1390 .states
1391 .contains_key("Component2"));
1392 assert!(!snap
1393 .snapshots
1394 .states
1395 .contains_key("Component1"));
1396 assert!(!snap
1397 .snapshots
1398 .states
1399 .contains_key("Component3"));
1400 }
1401
1402 fn mock_clients_for_state_sync() -> (MockRPCClient, MockDeltasClient, Sender<BlockChanges>) {
1403 let mut rpc_client = make_mock_client();
1404 rpc_client
1407 .expect_get_protocol_components()
1408 .with(mockall::predicate::function(
1409 move |request_params: &ProtocolComponentsRequestBody| {
1410 if let Some(ids) = request_params.component_ids.as_ref() {
1411 ids.contains(&"Component3".to_string())
1412 } else {
1413 false
1414 }
1415 },
1416 ))
1417 .returning(|_| {
1418 Ok(ProtocolComponentRequestResponse {
1420 protocol_components: vec![
1421 ProtocolComponent { id: "Component3".to_string(), ..Default::default() },
1423 ],
1424 pagination: PaginationResponse { page: 0, page_size: 20, total: 1 },
1425 })
1426 });
1427 rpc_client
1429 .expect_get_snapshots()
1430 .withf(
1431 |request: &SnapshotParameters,
1432 _chunk_size: &Option<usize>,
1433 _concurrency: &usize| {
1434 request
1435 .components
1436 .contains_key("Component3")
1437 },
1438 )
1439 .returning(|_request, _chunk_size, _concurrency| {
1440 Ok(Snapshot {
1441 states: [(
1442 "Component3".to_string(),
1443 ComponentWithState {
1444 state: ResponseProtocolState {
1445 component_id: "Component3".to_string(),
1446 ..Default::default()
1447 },
1448 component: ProtocolComponent {
1449 id: "Component3".to_string(),
1450 ..Default::default()
1451 },
1452 component_tvl: Some(1000.0),
1453 entrypoints: vec![],
1454 },
1455 )]
1456 .into_iter()
1457 .collect(),
1458 vm_storage: HashMap::new(),
1459 })
1460 });
1461
1462 rpc_client
1464 .expect_get_protocol_components()
1465 .returning(|_| {
1466 Ok(ProtocolComponentRequestResponse {
1468 protocol_components: vec![
1469 ProtocolComponent { id: "Component1".to_string(), ..Default::default() },
1471 ProtocolComponent { id: "Component2".to_string(), ..Default::default() },
1473 ],
1475 pagination: PaginationResponse { page: 0, page_size: 20, total: 1 },
1476 })
1477 });
1478
1479 rpc_client
1480 .expect_get_snapshots()
1481 .returning(|_request, _chunk_size, _concurrency| {
1482 Ok(Snapshot {
1483 states: [
1484 (
1485 "Component1".to_string(),
1486 ComponentWithState {
1487 state: ResponseProtocolState {
1488 component_id: "Component1".to_string(),
1489 ..Default::default()
1490 },
1491 component: ProtocolComponent {
1492 id: "Component1".to_string(),
1493 ..Default::default()
1494 },
1495 component_tvl: Some(100.0),
1496 entrypoints: vec![],
1497 },
1498 ),
1499 (
1500 "Component2".to_string(),
1501 ComponentWithState {
1502 state: ResponseProtocolState {
1503 component_id: "Component2".to_string(),
1504 ..Default::default()
1505 },
1506 component: ProtocolComponent {
1507 id: "Component2".to_string(),
1508 ..Default::default()
1509 },
1510 component_tvl: Some(0.0),
1511 entrypoints: vec![],
1512 },
1513 ),
1514 ]
1515 .into_iter()
1516 .collect(),
1517 vm_storage: HashMap::new(),
1518 })
1519 });
1520
1521 rpc_client
1523 .expect_get_traced_entry_points()
1524 .returning(|_| {
1525 Ok(TracedEntryPointRequestResponse {
1526 traced_entry_points: HashMap::new(),
1527 pagination: PaginationResponse { page: 0, page_size: 100, total: 0 },
1528 })
1529 });
1530
1531 let mut deltas_client = MockDeltasClient::new();
1533 let (tx, rx) = channel(1);
1534 deltas_client
1535 .expect_subscribe()
1536 .return_once(move |_, _| {
1537 Ok((Uuid::default(), rx))
1539 });
1540
1541 deltas_client
1543 .expect_unsubscribe()
1544 .return_once(|_| Ok(()));
1545
1546 (rpc_client, deltas_client, tx)
1547 }
1548
1549 #[test_log::test(tokio::test)]
1556 async fn test_state_sync() {
1557 let (rpc_client, deltas_client, tx) = mock_clients_for_state_sync();
1558 let deltas = [
1559 BlockChanges {
1560 extractor: "uniswap-v2".to_string(),
1561 chain: Chain::Ethereum,
1562 block: Block {
1563 number: 1,
1564 hash: Bytes::from("0x01"),
1565 parent_hash: Bytes::from("0x00"),
1566 chain: Chain::Ethereum,
1567 ts: Default::default(),
1568 },
1569 revert: false,
1570 dci_update: DCIUpdate {
1571 new_entrypoints: HashMap::from([(
1572 "Component1".to_string(),
1573 HashSet::from([EntryPoint {
1574 external_id: "entrypoint_a".to_string(),
1575 target: Bytes::from("0x0badc0ffee"),
1576 signature: "sig()".to_string(),
1577 }]),
1578 )]),
1579 new_entrypoint_params: HashMap::from([(
1580 "entrypoint_a".to_string(),
1581 HashSet::from([(
1582 TracingParams::RPCTracer(RPCTracerParams {
1583 caller: Some(Bytes::from("0x0badc0ffee")),
1584 calldata: Bytes::from("0x0badc0ffee"),
1585 state_overrides: None,
1586 prune_addresses: None,
1587 }),
1588 "Component1".to_string(),
1589 )]),
1590 )]),
1591 trace_results: HashMap::from([(
1592 "entrypoint_a".to_string(),
1593 TracingResult {
1594 retriggers: HashSet::from([(
1595 Bytes::from("0x0badc0ffee"),
1596 AddressStorageLocation::new(Bytes::from("0x0badc0ffee"), 12),
1597 )]),
1598 accessed_slots: HashMap::from([(
1599 Bytes::from("0x0badc0ffee"),
1600 HashSet::from([Bytes::from("0xbadbeef0")]),
1601 )]),
1602 },
1603 )]),
1604 },
1605 ..Default::default()
1606 },
1607 BlockChanges {
1608 extractor: "uniswap-v2".to_string(),
1609 chain: Chain::Ethereum,
1610 block: Block {
1611 number: 2,
1612 hash: Bytes::from("0x02"),
1613 parent_hash: Bytes::from("0x01"),
1614 chain: Chain::Ethereum,
1615 ts: Default::default(),
1616 },
1617 revert: false,
1618 component_tvl: [
1619 ("Component1".to_string(), 100.0),
1620 ("Component2".to_string(), 0.0),
1621 ("Component3".to_string(), 1000.0),
1622 ]
1623 .into_iter()
1624 .collect(),
1625 ..Default::default()
1626 },
1627 ];
1628 let mut state_sync = with_mocked_clients(true, true, Some(rpc_client), Some(deltas_client));
1629 state_sync
1630 .initialize()
1631 .await
1632 .expect("Init failed");
1633
1634 let (handle, mut rx) = state_sync.start().await;
1636 let (jh, close_tx) = handle.split();
1637 tx.send(deltas[0].clone())
1638 .await
1639 .expect("deltas channel msg 0 closed!");
1640 let first_msg = timeout(Duration::from_millis(100), rx.recv())
1641 .await
1642 .expect("waiting for first state msg timed out!")
1643 .expect("state sync block sender closed!");
1644 tx.send(deltas[1].clone())
1645 .await
1646 .expect("deltas channel msg 1 closed!");
1647 let second_msg = timeout(Duration::from_millis(100), rx.recv())
1648 .await
1649 .expect("waiting for second state msg timed out!")
1650 .expect("state sync block sender closed!");
1651 let _ = close_tx.send(());
1652 jh.await
1653 .expect("state sync task panicked!");
1654
1655 let exp1 = StateSyncMessage {
1657 header: BlockHeader {
1658 number: 1,
1659 hash: Bytes::from("0x01"),
1660 parent_hash: Bytes::from("0x00"),
1661 revert: false,
1662 ..Default::default()
1663 },
1664 snapshots: Snapshot {
1665 states: [
1666 (
1667 "Component1".to_string(),
1668 ComponentWithState {
1669 state: ResponseProtocolState {
1670 component_id: "Component1".to_string(),
1671 ..Default::default()
1672 },
1673 component: ProtocolComponent {
1674 id: "Component1".to_string(),
1675 ..Default::default()
1676 },
1677 component_tvl: Some(100.0),
1678 entrypoints: vec![],
1679 },
1680 ),
1681 (
1682 "Component2".to_string(),
1683 ComponentWithState {
1684 state: ResponseProtocolState {
1685 component_id: "Component2".to_string(),
1686 ..Default::default()
1687 },
1688 component: ProtocolComponent {
1689 id: "Component2".to_string(),
1690 ..Default::default()
1691 },
1692 component_tvl: Some(0.0),
1693 entrypoints: vec![],
1694 },
1695 ),
1696 ]
1697 .into_iter()
1698 .collect(),
1699 vm_storage: HashMap::new(),
1700 },
1701 deltas: Some(deltas[0].clone()),
1702 removed_components: Default::default(),
1703 };
1704
1705 let exp2 = StateSyncMessage {
1706 header: BlockHeader {
1707 number: 2,
1708 hash: Bytes::from("0x02"),
1709 parent_hash: Bytes::from("0x01"),
1710 revert: false,
1711 ..Default::default()
1712 },
1713 snapshots: Snapshot {
1714 states: [
1715 (
1717 "Component3".to_string(),
1718 ComponentWithState {
1719 state: ResponseProtocolState {
1720 component_id: "Component3".to_string(),
1721 ..Default::default()
1722 },
1723 component: ProtocolComponent {
1724 id: "Component3".to_string(),
1725 ..Default::default()
1726 },
1727 component_tvl: Some(1000.0),
1728 entrypoints: vec![],
1729 },
1730 ),
1731 ]
1732 .into_iter()
1733 .collect(),
1734 vm_storage: HashMap::new(),
1735 },
1736 deltas: Some(BlockChanges {
1739 extractor: "uniswap-v2".to_string(),
1740 chain: Chain::Ethereum,
1741 block: Block {
1742 number: 2,
1743 hash: Bytes::from("0x02"),
1744 parent_hash: Bytes::from("0x01"),
1745 chain: Chain::Ethereum,
1746 ts: Default::default(),
1747 },
1748 revert: false,
1749 component_tvl: [
1750 ("Component1".to_string(), 100.0),
1752 ("Component3".to_string(), 1000.0),
1753 ]
1754 .into_iter()
1755 .collect(),
1756 ..Default::default()
1757 }),
1758 removed_components: [(
1760 "Component2".to_string(),
1761 ProtocolComponent { id: "Component2".to_string(), ..Default::default() },
1762 )]
1763 .into_iter()
1764 .collect(),
1765 };
1766 assert_eq!(first_msg.unwrap(), exp1);
1767 assert_eq!(second_msg.unwrap(), exp2);
1768 }
1769
1770 #[test_log::test(tokio::test)]
1771 async fn test_state_sync_with_tvl_range() {
1772 let remove_tvl_threshold = 5.0;
1774 let add_tvl_threshold = 7.0;
1775
1776 let mut rpc_client = make_mock_client();
1777 let mut deltas_client = MockDeltasClient::new();
1778
1779 rpc_client
1780 .expect_get_protocol_components()
1781 .with(mockall::predicate::function(
1782 move |request_params: &ProtocolComponentsRequestBody| {
1783 if let Some(ids) = request_params.component_ids.as_ref() {
1784 ids.contains(&"Component3".to_string())
1785 } else {
1786 false
1787 }
1788 },
1789 ))
1790 .returning(|_| {
1791 Ok(ProtocolComponentRequestResponse {
1792 protocol_components: vec![ProtocolComponent {
1793 id: "Component3".to_string(),
1794 ..Default::default()
1795 }],
1796 pagination: PaginationResponse { page: 0, page_size: 20, total: 1 },
1797 })
1798 });
1799 rpc_client
1801 .expect_get_snapshots()
1802 .withf(
1803 |request: &SnapshotParameters,
1804 _chunk_size: &Option<usize>,
1805 _concurrency: &usize| {
1806 request
1807 .components
1808 .contains_key("Component3")
1809 },
1810 )
1811 .returning(|_request, _chunk_size, _concurrency| {
1812 Ok(Snapshot {
1813 states: [(
1814 "Component3".to_string(),
1815 ComponentWithState {
1816 state: ResponseProtocolState {
1817 component_id: "Component3".to_string(),
1818 ..Default::default()
1819 },
1820 component: ProtocolComponent {
1821 id: "Component3".to_string(),
1822 ..Default::default()
1823 },
1824 component_tvl: Some(10.0),
1825 entrypoints: vec![],
1826 },
1827 )]
1828 .into_iter()
1829 .collect(),
1830 vm_storage: HashMap::new(),
1831 })
1832 });
1833
1834 rpc_client
1836 .expect_get_protocol_components()
1837 .returning(|_| {
1838 Ok(ProtocolComponentRequestResponse {
1839 protocol_components: vec![
1840 ProtocolComponent { id: "Component1".to_string(), ..Default::default() },
1841 ProtocolComponent { id: "Component2".to_string(), ..Default::default() },
1842 ],
1843 pagination: PaginationResponse { page: 0, page_size: 20, total: 1 },
1844 })
1845 });
1846
1847 rpc_client
1849 .expect_get_snapshots()
1850 .returning(|_request, _chunk_size, _concurrency| {
1851 Ok(Snapshot {
1852 states: [
1853 (
1854 "Component1".to_string(),
1855 ComponentWithState {
1856 state: ResponseProtocolState {
1857 component_id: "Component1".to_string(),
1858 ..Default::default()
1859 },
1860 component: ProtocolComponent {
1861 id: "Component1".to_string(),
1862 ..Default::default()
1863 },
1864 component_tvl: Some(6.0),
1865 entrypoints: vec![],
1866 },
1867 ),
1868 (
1869 "Component2".to_string(),
1870 ComponentWithState {
1871 state: ResponseProtocolState {
1872 component_id: "Component2".to_string(),
1873 ..Default::default()
1874 },
1875 component: ProtocolComponent {
1876 id: "Component2".to_string(),
1877 ..Default::default()
1878 },
1879 component_tvl: Some(2.0),
1880 entrypoints: vec![],
1881 },
1882 ),
1883 ]
1884 .into_iter()
1885 .collect(),
1886 vm_storage: HashMap::new(),
1887 })
1888 });
1889
1890 rpc_client
1892 .expect_get_traced_entry_points()
1893 .returning(|_| {
1894 Ok(TracedEntryPointRequestResponse {
1895 traced_entry_points: HashMap::new(),
1896 pagination: PaginationResponse { page: 0, page_size: 100, total: 0 },
1897 })
1898 });
1899
1900 let (tx, rx) = channel(1);
1901 deltas_client
1902 .expect_subscribe()
1903 .return_once(move |_, _| Ok((Uuid::default(), rx)));
1904
1905 deltas_client
1907 .expect_unsubscribe()
1908 .return_once(|_| Ok(()));
1909
1910 let mut state_sync = ProtocolStateSynchronizer::new(
1911 ExtractorIdentity::new(Chain::Ethereum, "uniswap-v2"),
1912 true,
1913 ComponentFilter::with_tvl_range(remove_tvl_threshold, add_tvl_threshold),
1914 1,
1915 Duration::from_secs(0),
1916 true,
1917 true,
1918 true,
1919 ArcRPCClient(Arc::new(rpc_client)),
1920 ArcDeltasClient(Arc::new(deltas_client)),
1921 10_u64,
1922 );
1923 state_sync
1924 .initialize()
1925 .await
1926 .expect("Init failed");
1927
1928 let deltas = [
1930 BlockChanges {
1931 extractor: "uniswap-v2".to_string(),
1932 chain: Chain::Ethereum,
1933 block: Block {
1934 number: 1,
1935 hash: Bytes::from("0x01"),
1936 parent_hash: Bytes::from("0x00"),
1937 chain: Chain::Ethereum,
1938 ts: Default::default(),
1939 },
1940 revert: false,
1941 ..Default::default()
1942 },
1943 BlockChanges {
1944 extractor: "uniswap-v2".to_string(),
1945 chain: Chain::Ethereum,
1946 block: Block {
1947 number: 2,
1948 hash: Bytes::from("0x02"),
1949 parent_hash: Bytes::from("0x01"),
1950 chain: Chain::Ethereum,
1951 ts: Default::default(),
1952 },
1953 revert: false,
1954 component_tvl: [
1955 ("Component1".to_string(), 6.0), ("Component2".to_string(), 2.0), ("Component3".to_string(), 10.0), ]
1959 .into_iter()
1960 .collect(),
1961 ..Default::default()
1962 },
1963 ];
1964
1965 let (handle, mut rx) = state_sync.start().await;
1966 let (jh, close_tx) = handle.split();
1967
1968 tx.send(deltas[0].clone())
1970 .await
1971 .expect("deltas channel msg 0 closed!");
1972
1973 let _ = timeout(Duration::from_millis(100), rx.recv())
1975 .await
1976 .expect("waiting for first state msg timed out!")
1977 .expect("state sync block sender closed!");
1978
1979 tx.send(deltas[1].clone())
1981 .await
1982 .expect("deltas channel msg 1 closed!");
1983 let second_msg = timeout(Duration::from_millis(100), rx.recv())
1984 .await
1985 .expect("waiting for second state msg timed out!")
1986 .expect("state sync block sender closed!")
1987 .expect("no error");
1988
1989 let _ = close_tx.send(());
1990 jh.await
1991 .expect("state sync task panicked!");
1992
1993 let expected_second_msg = StateSyncMessage {
1994 header: BlockHeader {
1995 number: 2,
1996 hash: Bytes::from("0x02"),
1997 parent_hash: Bytes::from("0x01"),
1998 revert: false,
1999 ..Default::default()
2000 },
2001 snapshots: Snapshot {
2002 states: [(
2003 "Component3".to_string(),
2004 ComponentWithState {
2005 state: ResponseProtocolState {
2006 component_id: "Component3".to_string(),
2007 ..Default::default()
2008 },
2009 component: ProtocolComponent {
2010 id: "Component3".to_string(),
2011 ..Default::default()
2012 },
2013 component_tvl: Some(10.0),
2014 entrypoints: vec![], },
2016 )]
2017 .into_iter()
2018 .collect(),
2019 vm_storage: HashMap::new(),
2020 },
2021 deltas: Some(BlockChanges {
2022 extractor: "uniswap-v2".to_string(),
2023 chain: Chain::Ethereum,
2024 block: Block {
2025 number: 2,
2026 hash: Bytes::from("0x02"),
2027 parent_hash: Bytes::from("0x01"),
2028 chain: Chain::Ethereum,
2029 ts: Default::default(),
2030 },
2031 revert: false,
2032 component_tvl: [
2033 ("Component1".to_string(), 6.0), ("Component3".to_string(), 10.0), ]
2036 .into_iter()
2037 .collect(),
2038 ..Default::default()
2039 }),
2040 removed_components: [(
2041 "Component2".to_string(),
2042 ProtocolComponent { id: "Component2".to_string(), ..Default::default() },
2043 )]
2044 .into_iter()
2045 .collect(),
2046 };
2047
2048 assert_eq!(second_msg, expected_second_msg);
2049 }
2050
2051 #[test_log::test(tokio::test)]
2052 async fn test_public_close_api_functionality() {
2053 let mut rpc_client = make_mock_client();
2060 let mut deltas_client = MockDeltasClient::new();
2061
2062 rpc_client
2064 .expect_get_protocol_components()
2065 .returning(|_| {
2066 Ok(ProtocolComponentRequestResponse {
2067 protocol_components: vec![],
2068 pagination: PaginationResponse { page: 0, page_size: 20, total: 0 },
2069 })
2070 });
2071
2072 let (_tx, rx) = channel(1);
2074 deltas_client
2075 .expect_subscribe()
2076 .return_once(move |_, _| Ok((Uuid::default(), rx)));
2077
2078 deltas_client
2080 .expect_unsubscribe()
2081 .return_once(|_| Ok(()));
2082
2083 let mut state_sync = ProtocolStateSynchronizer::new(
2084 ExtractorIdentity::new(Chain::Ethereum, "test-protocol"),
2085 true,
2086 ComponentFilter::with_tvl_range(0.0, 0.0),
2087 5, Duration::from_secs(0),
2089 true,
2090 false,
2091 true,
2092 ArcRPCClient(Arc::new(rpc_client)),
2093 ArcDeltasClient(Arc::new(deltas_client)),
2094 10000_u64, );
2096
2097 state_sync
2098 .initialize()
2099 .await
2100 .expect("Init should succeed");
2101
2102 let (handle, _rx) = state_sync.start().await;
2104 let (jh, close_tx) = handle.split();
2105
2106 tokio::time::sleep(Duration::from_millis(100)).await;
2108
2109 close_tx
2111 .send(())
2112 .expect("Should be able to send close signal");
2113 jh.await.expect("Task should not panic");
2115 }
2116
2117 #[test_log::test(tokio::test)]
2118 async fn test_cleanup_runs_when_state_sync_processing_errors() {
2119 let mut rpc_client = make_mock_client();
2124 let mut deltas_client = MockDeltasClient::new();
2125
2126 rpc_client
2128 .expect_get_protocol_components()
2129 .returning(|_| {
2130 Ok(ProtocolComponentRequestResponse {
2131 protocol_components: vec![],
2132 pagination: PaginationResponse { page: 0, page_size: 20, total: 0 },
2133 })
2134 });
2135
2136 rpc_client
2138 .expect_get_protocol_states()
2139 .returning(|_| {
2140 Err(RPCError::ParseResponse("Test error during snapshot retrieval".to_string()))
2141 });
2142
2143 let (tx, rx) = channel(10);
2145 deltas_client
2146 .expect_subscribe()
2147 .return_once(move |_, _| {
2148 let delta = BlockChanges {
2150 extractor: "test".to_string(),
2151 chain: Chain::Ethereum,
2152 block: Block {
2153 hash: Bytes::from("0x0123"),
2154 number: 1,
2155 parent_hash: Bytes::from("0x0000"),
2156 chain: Chain::Ethereum,
2157 ts: chrono::DateTime::from_timestamp(1234567890, 0)
2158 .unwrap()
2159 .naive_utc(),
2160 },
2161 revert: false,
2162 new_protocol_components: [(
2164 "new_component".to_string(),
2165 ProtocolComponent {
2166 id: "new_component".to_string(),
2167 protocol_system: "test_protocol".to_string(),
2168 protocol_type_name: "test".to_string(),
2169 chain: Chain::Ethereum,
2170 tokens: vec![Bytes::from("0x0badc0ffee")],
2171 contract_ids: vec![Bytes::from("0x0badc0ffee")],
2172 static_attributes: Default::default(),
2173 creation_tx: Default::default(),
2174 created_at: Default::default(),
2175 change: Default::default(),
2176 },
2177 )]
2178 .into_iter()
2179 .collect(),
2180 component_tvl: [("new_component".to_string(), 100.0)]
2181 .into_iter()
2182 .collect(),
2183 ..Default::default()
2184 };
2185
2186 tokio::spawn(async move {
2187 let _ = tx.send(delta).await;
2188 });
2190
2191 Ok((Uuid::default(), rx))
2192 });
2193
2194 deltas_client
2196 .expect_unsubscribe()
2197 .return_once(|_| Ok(()));
2198
2199 let mut state_sync = ProtocolStateSynchronizer::new(
2200 ExtractorIdentity::new(Chain::Ethereum, "test-protocol"),
2201 true,
2202 ComponentFilter::with_tvl_range(0.0, 1000.0), 1,
2204 Duration::from_secs(0),
2205 true,
2206 false,
2207 true,
2208 ArcRPCClient(Arc::new(rpc_client)),
2209 ArcDeltasClient(Arc::new(deltas_client)),
2210 5000_u64,
2211 );
2212
2213 state_sync
2214 .initialize()
2215 .await
2216 .expect("Init should succeed");
2217
2218 state_sync.last_synced_block = Some(BlockHeader {
2220 hash: Bytes::from("0x0badc0ffee"),
2221 number: 42,
2222 parent_hash: Bytes::from("0xbadbeef0"),
2223 revert: false,
2224 timestamp: 123456789,
2225 partial_block_index: None,
2226 });
2227
2228 let (mut block_tx, _block_rx) = channel(10);
2230
2231 let (_end_tx, end_rx) = oneshot::channel::<()>();
2233 let result = state_sync
2234 .state_sync(&mut block_tx, end_rx)
2235 .await;
2236 assert!(result.is_err(), "state_sync should have errored during processing");
2238
2239 }
2242
2243 #[test_log::test(tokio::test)]
2244 async fn test_close_signal_while_waiting_for_first_deltas() {
2245 let mut rpc_client = make_mock_client();
2249 let mut deltas_client = MockDeltasClient::new();
2250
2251 rpc_client
2252 .expect_get_protocol_components()
2253 .returning(|_| {
2254 Ok(ProtocolComponentRequestResponse {
2255 protocol_components: vec![],
2256 pagination: PaginationResponse { page: 0, page_size: 20, total: 0 },
2257 })
2258 });
2259
2260 let (_tx, rx) = channel(1);
2261 deltas_client
2262 .expect_subscribe()
2263 .return_once(move |_, _| Ok((Uuid::default(), rx)));
2264
2265 deltas_client
2266 .expect_unsubscribe()
2267 .return_once(|_| Ok(()));
2268
2269 let mut state_sync = ProtocolStateSynchronizer::new(
2270 ExtractorIdentity::new(Chain::Ethereum, "test-protocol"),
2271 true,
2272 ComponentFilter::with_tvl_range(0.0, 0.0),
2273 1,
2274 Duration::from_secs(0),
2275 true,
2276 true,
2277 false,
2278 ArcRPCClient(Arc::new(rpc_client)),
2279 ArcDeltasClient(Arc::new(deltas_client)),
2280 10000_u64,
2281 );
2282
2283 state_sync
2284 .initialize()
2285 .await
2286 .expect("Init should succeed");
2287
2288 let (mut block_tx, _block_rx) = channel(10);
2289 let (end_tx, end_rx) = oneshot::channel::<()>();
2290
2291 let state_sync_handle = tokio::spawn(async move {
2293 state_sync
2294 .state_sync(&mut block_tx, end_rx)
2295 .await
2296 });
2297
2298 tokio::time::sleep(Duration::from_millis(100)).await;
2300
2301 let _ = end_tx.send(());
2303
2304 let result = state_sync_handle
2306 .await
2307 .expect("Task should not panic");
2308 assert!(result.is_ok(), "state_sync should exit cleanly when closed: {result:?}");
2309
2310 println!("SUCCESS: Close signal handled correctly while waiting for first deltas");
2311 }
2312
2313 #[test_log::test(tokio::test)]
2314 async fn test_close_signal_during_main_processing_loop() {
2315 let mut rpc_client = make_mock_client();
2321 let mut deltas_client = MockDeltasClient::new();
2322
2323 rpc_client
2325 .expect_get_protocol_components()
2326 .returning(|_| {
2327 Ok(ProtocolComponentRequestResponse {
2328 protocol_components: vec![],
2329 pagination: PaginationResponse { page: 0, page_size: 20, total: 0 },
2330 })
2331 });
2332
2333 rpc_client
2335 .expect_get_protocol_states()
2336 .returning(|_| {
2337 Ok(ProtocolStateRequestResponse {
2338 states: vec![],
2339 pagination: PaginationResponse { page: 0, page_size: 20, total: 0 },
2340 })
2341 });
2342
2343 rpc_client
2344 .expect_get_component_tvl()
2345 .returning(|_| {
2346 Ok(ComponentTvlRequestResponse {
2347 tvl: HashMap::new(),
2348 pagination: PaginationResponse { page: 0, page_size: 20, total: 0 },
2349 })
2350 });
2351
2352 rpc_client
2353 .expect_get_traced_entry_points()
2354 .returning(|_| {
2355 Ok(TracedEntryPointRequestResponse {
2356 traced_entry_points: HashMap::new(),
2357 pagination: PaginationResponse::new(0, 20, 0),
2358 })
2359 });
2360
2361 let (tx, rx) = channel(10);
2363 deltas_client
2364 .expect_subscribe()
2365 .return_once(move |_, _| {
2366 let first_delta = BlockChanges {
2368 extractor: "test".to_string(),
2369 chain: Chain::Ethereum,
2370 block: Block {
2371 hash: Bytes::from("0x0123"),
2372 number: 1,
2373 parent_hash: Bytes::from("0x0000"),
2374 chain: Chain::Ethereum,
2375 ts: chrono::DateTime::from_timestamp(1234567890, 0)
2376 .unwrap()
2377 .naive_utc(),
2378 },
2379 revert: false,
2380 ..Default::default()
2381 };
2382
2383 tokio::spawn(async move {
2384 let _ = tx.send(first_delta).await;
2385 tokio::time::sleep(Duration::from_secs(30)).await;
2388 });
2389
2390 Ok((Uuid::default(), rx))
2391 });
2392
2393 deltas_client
2394 .expect_unsubscribe()
2395 .return_once(|_| Ok(()));
2396
2397 let mut state_sync = ProtocolStateSynchronizer::new(
2398 ExtractorIdentity::new(Chain::Ethereum, "test-protocol"),
2399 true,
2400 ComponentFilter::with_tvl_range(0.0, 1000.0),
2401 1,
2402 Duration::from_secs(0),
2403 true,
2404 false,
2405 true,
2406 ArcRPCClient(Arc::new(rpc_client)),
2407 ArcDeltasClient(Arc::new(deltas_client)),
2408 10000_u64,
2409 );
2410
2411 state_sync
2412 .initialize()
2413 .await
2414 .expect("Init should succeed");
2415
2416 let (mut block_tx, mut block_rx) = channel(10);
2417 let (end_tx, end_rx) = oneshot::channel::<()>();
2418
2419 let state_sync_handle = tokio::spawn(async move {
2421 state_sync
2422 .state_sync(&mut block_tx, end_rx)
2423 .await
2424 });
2425
2426 let first_snapshot = block_rx
2428 .recv()
2429 .await
2430 .expect("Should receive first snapshot")
2431 .expect("Synchronizer error");
2432 assert!(
2433 !first_snapshot
2434 .snapshots
2435 .states
2436 .is_empty() ||
2437 first_snapshot.deltas.is_some()
2438 );
2439 let _ = end_tx.send(());
2441
2442 let result = state_sync_handle
2444 .await
2445 .expect("Task should not panic");
2446 assert!(
2447 result.is_ok(),
2448 "state_sync should exit cleanly when closed after first message: {result:?}"
2449 );
2450 }
2451
2452 #[test_log::test(tokio::test)]
2453 async fn test_max_retries_exceeded_error_propagation() {
2454 let mut rpc_client = make_mock_client();
2458 let mut deltas_client = MockDeltasClient::new();
2459
2460 rpc_client
2462 .expect_get_protocol_components()
2463 .returning(|_| {
2464 Ok(ProtocolComponentRequestResponse {
2465 protocol_components: vec![],
2466 pagination: PaginationResponse { page: 0, page_size: 20, total: 0 },
2467 })
2468 });
2469
2470 deltas_client
2473 .expect_subscribe()
2474 .returning(|_, _| {
2475 Err(DeltasError::NotConnected)
2477 });
2478
2479 deltas_client
2481 .expect_unsubscribe()
2482 .returning(|_| Ok(()))
2483 .times(0..=5);
2484
2485 let mut state_sync = ProtocolStateSynchronizer::new(
2487 ExtractorIdentity::new(Chain::Ethereum, "test-protocol"),
2488 true,
2489 ComponentFilter::with_tvl_range(0.0, 1000.0),
2490 2, Duration::from_millis(10), true,
2493 false,
2494 true,
2495 ArcRPCClient(Arc::new(rpc_client)),
2496 ArcDeltasClient(Arc::new(deltas_client)),
2497 1000_u64,
2498 );
2499
2500 state_sync
2501 .initialize()
2502 .await
2503 .expect("Init should succeed");
2504
2505 let (handle, mut rx) = state_sync.start().await;
2507 let (jh, _close_tx) = handle.split();
2508
2509 let res = tokio::time::timeout(Duration::from_millis(100), rx.recv())
2510 .await
2511 .expect("responsds in time")
2512 .expect("channel open");
2513
2514 if let Err(err) = res {
2516 assert!(
2517 matches!(err, SynchronizerError::ConnectionClosed),
2518 "Expected ConnectionClosed error, got: {:?}",
2519 err
2520 );
2521 } else {
2522 panic!("Expected an error")
2523 }
2524
2525 let task_result = tokio::time::timeout(Duration::from_secs(2), jh).await;
2527 assert!(task_result.is_ok(), "Synchronizer task should complete after max retries");
2528 }
2529
2530 #[test_log::test(tokio::test)]
2531 async fn test_is_next_expected() {
2532 let mut state_sync = with_mocked_clients(true, false, None, None);
2536
2537 let incoming_header = BlockHeader {
2539 number: 100,
2540 hash: Bytes::from("0x0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"),
2541 parent_hash: Bytes::from(
2542 "0x0000000000000000000000000000000000000000000000000000000000000000",
2543 ),
2544 revert: false,
2545 timestamp: 123456789,
2546 partial_block_index: None,
2547 };
2548 assert!(
2549 !state_sync.is_next_expected(&incoming_header),
2550 "Should return false when no previous block is set"
2551 );
2552
2553 let previous_header = BlockHeader {
2555 number: 99,
2556 hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000000"),
2557 parent_hash: Bytes::from(
2558 "0xabcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789",
2559 ),
2560 revert: false,
2561 timestamp: 123456788,
2562 partial_block_index: None,
2563 };
2564 state_sync.last_synced_block = Some(previous_header.clone());
2565
2566 assert!(
2567 state_sync.is_next_expected(&incoming_header),
2568 "Should return true when incoming parent_hash matches previous hash"
2569 );
2570
2571 let non_matching_header = BlockHeader {
2573 number: 100,
2574 hash: Bytes::from("0x0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"),
2575 parent_hash: Bytes::from(
2576 "0x1111111111111111111111111111111111111111111111111111111111111111",
2577 ), revert: false,
2579 timestamp: 123456789,
2580 partial_block_index: None,
2581 };
2582 assert!(
2583 !state_sync.is_next_expected(&non_matching_header),
2584 "Should return false when incoming parent_hash doesn't match previous hash"
2585 );
2586 }
2587
2588 #[test_log::test(tokio::test)]
2589 async fn test_synchronizer_restart_skip_snapshot_on_expected_block() {
2590 let mut rpc_client = make_mock_client();
2594 let mut deltas_client = MockDeltasClient::new();
2595
2596 rpc_client
2598 .expect_get_protocol_components()
2599 .returning(|_| {
2600 Ok(ProtocolComponentRequestResponse {
2601 protocol_components: vec![ProtocolComponent {
2602 id: "Component1".to_string(),
2603 ..Default::default()
2604 }],
2605 pagination: PaginationResponse { page: 0, page_size: 20, total: 1 },
2606 })
2607 });
2608
2609 let (tx, rx) = channel(10);
2611 deltas_client
2612 .expect_subscribe()
2613 .return_once(move |_, _| {
2614 let expected_next_delta = BlockChanges {
2615 extractor: "uniswap-v2".to_string(),
2616 chain: Chain::Ethereum,
2617 block: Block {
2618 hash: Bytes::from(
2619 "0x0000000000000000000000000000000000000000000000000000000000000002",
2620 ), number: 2,
2622 parent_hash: Bytes::from(
2623 "0x0000000000000000000000000000000000000000000000000000000000000001",
2624 ), chain: Chain::Ethereum,
2626 ts: chrono::DateTime::from_timestamp(1234567890, 0)
2627 .unwrap()
2628 .naive_utc(),
2629 },
2630 revert: false,
2631 ..Default::default()
2632 };
2633
2634 tokio::spawn(async move {
2635 let _ = tx.send(expected_next_delta).await;
2636 });
2637
2638 Ok((Uuid::default(), rx))
2639 });
2640
2641 deltas_client
2642 .expect_unsubscribe()
2643 .return_once(|_| Ok(()));
2644
2645 let mut state_sync = ProtocolStateSynchronizer::new(
2646 ExtractorIdentity::new(Chain::Ethereum, "uniswap-v2"),
2647 true,
2648 ComponentFilter::with_tvl_range(0.0, 1000.0),
2649 1,
2650 Duration::from_secs(0),
2651 true, false,
2653 true,
2654 ArcRPCClient(Arc::new(rpc_client)),
2655 ArcDeltasClient(Arc::new(deltas_client)),
2656 10000_u64,
2657 );
2658
2659 state_sync
2661 .initialize()
2662 .await
2663 .expect("Init should succeed");
2664
2665 state_sync.last_synced_block = Some(BlockHeader {
2667 number: 1,
2668 hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000001"), parent_hash: Bytes::from(
2670 "0x0000000000000000000000000000000000000000000000000000000000000000",
2671 ),
2672 revert: false,
2673 timestamp: 123456789,
2674 partial_block_index: None,
2675 });
2676
2677 let (mut block_tx, mut block_rx) = channel(10);
2678 let (end_tx, end_rx) = oneshot::channel::<()>();
2679
2680 let state_sync_handle = tokio::spawn(async move {
2682 state_sync
2683 .state_sync(&mut block_tx, end_rx)
2684 .await
2685 });
2686
2687 let result_msg = timeout(Duration::from_millis(200), block_rx.recv())
2689 .await
2690 .expect("Should receive message within timeout")
2691 .expect("Channel should be open")
2692 .expect("Should not be an error");
2693
2694 let _ = end_tx.send(());
2696
2697 let _ = state_sync_handle
2699 .await
2700 .expect("Task should not panic");
2701
2702 assert!(result_msg.deltas.is_some(), "Should contain deltas");
2705 assert!(
2706 result_msg.snapshots.states.is_empty(),
2707 "Should not contain snapshots when next expected block is received"
2708 );
2709
2710 if let Some(deltas) = &result_msg.deltas {
2712 assert_eq!(deltas.block.number, 2);
2713 assert_eq!(
2714 deltas.block.hash,
2715 Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000002")
2716 );
2717 assert_eq!(
2718 deltas.block.parent_hash,
2719 Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000001")
2720 );
2721 }
2722 }
2723
2724 #[test_log::test(tokio::test)]
2725 async fn test_skip_previously_processed_messages() {
2726 let mut rpc_client = make_mock_client();
2730 let mut deltas_client = MockDeltasClient::new();
2731
2732 rpc_client
2734 .expect_get_protocol_components()
2735 .returning(|_| {
2736 Ok(ProtocolComponentRequestResponse {
2737 protocol_components: vec![ProtocolComponent {
2738 id: "Component1".to_string(),
2739 ..Default::default()
2740 }],
2741 pagination: PaginationResponse { page: 0, page_size: 20, total: 1 },
2742 })
2743 });
2744
2745 rpc_client
2747 .expect_get_protocol_states()
2748 .returning(|_| {
2749 Ok(ProtocolStateRequestResponse {
2750 states: vec![ResponseProtocolState {
2751 component_id: "Component1".to_string(),
2752 ..Default::default()
2753 }],
2754 pagination: PaginationResponse { page: 0, page_size: 20, total: 1 },
2755 })
2756 });
2757
2758 rpc_client
2759 .expect_get_component_tvl()
2760 .returning(|_| {
2761 Ok(ComponentTvlRequestResponse {
2762 tvl: [("Component1".to_string(), 100.0)]
2763 .into_iter()
2764 .collect(),
2765 pagination: PaginationResponse { page: 0, page_size: 20, total: 1 },
2766 })
2767 });
2768
2769 rpc_client
2770 .expect_get_traced_entry_points()
2771 .returning(|_| {
2772 Ok(TracedEntryPointRequestResponse {
2773 traced_entry_points: HashMap::new(),
2774 pagination: PaginationResponse::new(0, 20, 0),
2775 })
2776 });
2777
2778 let (tx, rx) = channel(10);
2780 deltas_client
2781 .expect_subscribe()
2782 .return_once(move |_, _| {
2783 let old_messages = vec![
2785 BlockChanges {
2786 extractor: "uniswap-v2".to_string(),
2787 chain: Chain::Ethereum,
2788 block: Block {
2789 hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000003"),
2790 number: 3,
2791 parent_hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000002"),
2792 chain: Chain::Ethereum,
2793 ts: chrono::DateTime::from_timestamp(1234567890, 0).unwrap().naive_utc(),
2794 },
2795 revert: false,
2796 ..Default::default()
2797 },
2798 BlockChanges {
2799 extractor: "uniswap-v2".to_string(),
2800 chain: Chain::Ethereum,
2801 block: Block {
2802 hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000004"),
2803 number: 4,
2804 parent_hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000003"),
2805 chain: Chain::Ethereum,
2806 ts: chrono::DateTime::from_timestamp(1234567891, 0).unwrap().naive_utc(),
2807 },
2808 revert: false,
2809 ..Default::default()
2810 },
2811 BlockChanges {
2812 extractor: "uniswap-v2".to_string(),
2813 chain: Chain::Ethereum,
2814 block: Block {
2815 hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000005"),
2816 number: 5,
2817 parent_hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000004"),
2818 chain: Chain::Ethereum,
2819 ts: chrono::DateTime::from_timestamp(1234567892, 0).unwrap().naive_utc(),
2820 },
2821 revert: false,
2822 ..Default::default()
2823 },
2824 BlockChanges {
2826 extractor: "uniswap-v2".to_string(),
2827 chain: Chain::Ethereum,
2828 block: Block {
2829 hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000006"),
2830 number: 6,
2831 parent_hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000005"),
2832 chain: Chain::Ethereum,
2833 ts: chrono::DateTime::from_timestamp(1234567893, 0).unwrap().naive_utc(),
2834 },
2835 revert: false,
2836 ..Default::default()
2837 },
2838 ];
2839
2840 tokio::spawn(async move {
2841 for message in old_messages {
2842 let _ = tx.send(message).await;
2843 tokio::time::sleep(Duration::from_millis(10)).await;
2844 }
2845 });
2846
2847 Ok((Uuid::default(), rx))
2848 });
2849
2850 deltas_client
2851 .expect_unsubscribe()
2852 .return_once(|_| Ok(()));
2853
2854 let mut state_sync = ProtocolStateSynchronizer::new(
2855 ExtractorIdentity::new(Chain::Ethereum, "uniswap-v2"),
2856 true,
2857 ComponentFilter::with_tvl_range(0.0, 1000.0),
2858 1,
2859 Duration::from_secs(0),
2860 true,
2861 true,
2862 true,
2863 ArcRPCClient(Arc::new(rpc_client)),
2864 ArcDeltasClient(Arc::new(deltas_client)),
2865 10000_u64,
2866 );
2867
2868 state_sync
2870 .initialize()
2871 .await
2872 .expect("Init should succeed");
2873
2874 state_sync.last_synced_block = Some(BlockHeader {
2875 number: 5,
2876 hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000005"),
2877 parent_hash: Bytes::from(
2878 "0x0000000000000000000000000000000000000000000000000000000000000004",
2879 ),
2880 revert: false,
2881 timestamp: 1234567892,
2882 partial_block_index: None,
2883 });
2884
2885 let (mut block_tx, mut block_rx) = channel(10);
2886 let (end_tx, end_rx) = oneshot::channel::<()>();
2887
2888 let state_sync_handle = tokio::spawn(async move {
2890 state_sync
2891 .state_sync(&mut block_tx, end_rx)
2892 .await
2893 });
2894
2895 let result_msg = timeout(Duration::from_millis(500), block_rx.recv())
2897 .await
2898 .expect("Should receive message within timeout")
2899 .expect("Channel should be open")
2900 .expect("Should not be an error");
2901
2902 let _ = end_tx.send(());
2904
2905 let _ = state_sync_handle
2907 .await
2908 .expect("Task should not panic");
2909
2910 assert!(result_msg.deltas.is_some(), "Should contain deltas");
2912 if let Some(deltas) = &result_msg.deltas {
2913 assert_eq!(
2914 deltas.block.number, 6,
2915 "Should only process block 6, skipping earlier blocks"
2916 );
2917 assert_eq!(
2918 deltas.block.hash,
2919 Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000006")
2920 );
2921 }
2922
2923 match timeout(Duration::from_millis(50), block_rx.recv()).await {
2926 Err(_) => {
2927 }
2929 Ok(Some(Err(_))) => {
2930 }
2932 Ok(Some(Ok(_))) => {
2933 panic!("Should not receive additional messages - old blocks should be skipped");
2934 }
2935 Ok(None) => {
2936 }
2938 }
2939 }
2940
2941 fn make_block_changes(block_num: u64, partial_idx: Option<u32>) -> BlockChanges {
2942 let hash = Bytes::from(vec![block_num as u8; 32]);
2944 let parent_hash = Bytes::from(vec![block_num.saturating_sub(1) as u8; 32]);
2945 BlockChanges {
2946 extractor: "uniswap-v2".to_string(),
2947 chain: Chain::Ethereum,
2948 block: Block {
2949 number: block_num,
2950 hash,
2951 parent_hash,
2952 chain: Chain::Ethereum,
2953 ts: Default::default(),
2954 },
2955 revert: false,
2956 partial_block_index: partial_idx,
2957 ..Default::default()
2958 }
2959 }
2960
2961 #[test_log::test(tokio::test)]
2963 async fn test_partial_mode_accepts_full_block_as_first_message() {
2964 let (rpc_client, deltas_client, tx) = mock_clients_for_state_sync();
2965 let mut state_sync = with_mocked_clients(true, true, Some(rpc_client), Some(deltas_client))
2966 .with_partial_blocks(true);
2967 state_sync
2968 .initialize()
2969 .await
2970 .expect("Init failed");
2971
2972 let (handle, mut block_rx) = state_sync.start().await;
2973 let (jh, close_tx) = handle.split();
2974
2975 tx.send(make_block_changes(1, None))
2977 .await
2978 .unwrap();
2979
2980 let msg = timeout(Duration::from_millis(100), block_rx.recv())
2982 .await
2983 .expect("Should receive message")
2984 .expect("Channel open")
2985 .expect("No error");
2986
2987 assert_eq!(msg.header.number, 1, "Should use block 1 (full block)");
2988 assert_eq!(msg.header.partial_block_index, None, "Should be a full block");
2989
2990 let _ = close_tx.send(());
2991 jh.await.expect("Task should not panic");
2992 }
2993
2994 #[test_log::test(tokio::test)]
2996 async fn test_partial_mode_detects_block_number_increase() {
2997 let (rpc_client, deltas_client, tx) = mock_clients_for_state_sync();
2998 let mut state_sync = with_mocked_clients(true, true, Some(rpc_client), Some(deltas_client))
2999 .with_partial_blocks(true);
3000 state_sync
3001 .initialize()
3002 .await
3003 .expect("Init failed");
3004
3005 let (handle, mut block_rx) = state_sync.start().await;
3006 let (jh, close_tx) = handle.split();
3007
3008 tx.send(make_block_changes(1, Some(0)))
3010 .await
3011 .unwrap();
3012 tx.send(make_block_changes(1, Some(3)))
3013 .await
3014 .unwrap();
3015
3016 match timeout(Duration::from_millis(50), block_rx.recv()).await {
3018 Err(_) => { }
3019 Ok(_) => panic!("Should not receive message while waiting for new block"),
3020 }
3021
3022 tx.send(make_block_changes(2, Some(5)))
3025 .await
3026 .unwrap();
3027
3028 let msg = timeout(Duration::from_millis(100), block_rx.recv())
3030 .await
3031 .expect("Should receive message")
3032 .expect("Channel open")
3033 .expect("No error");
3034
3035 assert_eq!(msg.header.number, 2, "Should use block 2 (block number increased)");
3036 assert_eq!(msg.header.partial_block_index, Some(5));
3037
3038 let _ = close_tx.send(());
3039 jh.await.expect("Task should not panic");
3040 }
3041
3042 #[test_log::test(tokio::test)]
3044 async fn test_partial_mode_skips_already_synced_blocks() {
3045 let (rpc_client, deltas_client, tx) = mock_clients_for_state_sync();
3046 let mut state_sync = with_mocked_clients(true, true, Some(rpc_client), Some(deltas_client))
3047 .with_partial_blocks(true);
3048 state_sync
3049 .initialize()
3050 .await
3051 .expect("Init failed");
3052
3053 state_sync.last_synced_block = Some(BlockHeader {
3055 number: 5,
3056 hash: Bytes::from("0x05"),
3057 parent_hash: Bytes::from("0x04"),
3058 revert: false,
3059 timestamp: 0,
3060 partial_block_index: None,
3061 });
3062
3063 let (handle, mut block_rx) = state_sync.start().await;
3064 let (jh, close_tx) = handle.split();
3065
3066 tx.send(make_block_changes(3, Some(2)))
3068 .await
3069 .unwrap();
3070
3071 tx.send(make_block_changes(4, Some(0)))
3073 .await
3074 .unwrap();
3075
3076 match timeout(Duration::from_millis(50), block_rx.recv()).await {
3078 Err(_) => { }
3079 Ok(_) => panic!("Should skip block 4 because it's already synced"),
3080 }
3081
3082 tx.send(make_block_changes(5, Some(3)))
3085 .await
3086 .unwrap();
3087 tx.send(make_block_changes(6, Some(0)))
3089 .await
3090 .unwrap();
3091
3092 let msg = timeout(Duration::from_millis(100), block_rx.recv())
3093 .await
3094 .expect("Should receive message")
3095 .expect("Channel open")
3096 .expect("No error");
3097
3098 assert_eq!(msg.header.number, 6, "Should use block 6 (after last synced)");
3099
3100 let _ = close_tx.send(());
3101 jh.await.expect("Task should not panic");
3102 }
3103}