1use std::{collections::HashMap, time::Duration};
2
3use async_trait::async_trait;
4use serde::{Deserialize, Serialize};
5use thiserror::Error;
6use tokio::{
7 select,
8 sync::{
9 mpsc::{channel, error::SendError, Receiver, Sender},
10 oneshot,
11 },
12 task::JoinHandle,
13 time::{sleep, timeout},
14};
15use tracing::{debug, error, info, instrument, trace, warn};
16use tycho_common::{
17 dto::{
18 BlockChanges, EntryPointWithTracingParams, ExtractorIdentity, ProtocolComponent,
19 ResponseAccount, ResponseProtocolState, TracingResult,
20 },
21 Bytes,
22};
23
24use crate::{
25 deltas::{DeltasClient, SubscriptionOptions},
26 feed::{
27 component_tracker::{ComponentFilter, ComponentTracker},
28 BlockHeader, HeaderLike,
29 },
30 rpc::{RPCClient, RPCError, SnapshotParameters, RPC_CLIENT_CONCURRENCY},
31 DeltasError,
32};
33
/// Errors produced while synchronizing protocol state.
#[derive(Error, Debug)]
pub enum SynchronizerError {
    /// An RPC request failed; wraps the underlying [`RPCError`].
    #[error("RPC error: {0}")]
    RPCError(#[from] RPCError),

    /// A channel operation failed, e.g. a message could not be sent to the consumer.
    #[error("{0}")]
    ChannelError(String),

    /// An expected event did not happen in time, e.g. the first deltas did not arrive.
    #[error("Timeout error: {0}")]
    Timeout(String),

    /// The synchronizer could not be closed.
    #[error("Failed to close synchronizer: {0}")]
    CloseError(String),

    /// The deltas connection failed or its message channel was closed unexpectedly.
    #[error("Connection error: {0}")]
    ConnectionError(String),

    /// The deltas client reported that it is not connected.
    ///
    /// The synchronizer task does not retry on this error; it forwards it and exits.
    #[error("Connection closed")]
    ConnectionClosed,

    /// Any other internal failure.
    #[error("Internal error: {0}")]
    Internal(String),
}
64
/// Result type used throughout the synchronizer; the error is always a [`SynchronizerError`].
pub type SyncResult<T> = Result<T, SynchronizerError>;
66
67impl<T> From<SendError<T>> for SynchronizerError {
68 fn from(err: SendError<T>) -> Self {
69 SynchronizerError::ChannelError(format!("Failed to send message: {err}"))
70 }
71}
72
73impl From<DeltasError> for SynchronizerError {
74 fn from(err: DeltasError) -> Self {
75 match err {
76 DeltasError::NotConnected => SynchronizerError::ConnectionClosed,
77 _ => SynchronizerError::ConnectionError(err.to_string()),
78 }
79 }
80}
81
/// Synchronizes the state of the protocol components of a single extractor.
///
/// Snapshots are fetched through the RPC client, block changes are received through the deltas
/// client, and both are forwarded to the consumer as [`StateSyncMessage`]s.
pub struct ProtocolStateSynchronizer<R: RPCClient, D: DeltasClient> {
    /// Identity (chain and name) of the extractor this synchronizer follows.
    extractor_id: ExtractorIdentity,
    /// Passed to the snapshot request as its `include_balances` option.
    retrieve_balances: bool,
    /// Client used for snapshot and traced entry point requests.
    rpc_client: R,
    /// Client used to subscribe to and unsubscribe from delta messages.
    deltas_client: D,
    /// Upper bound on the number of synchronization attempts made by the spawned task.
    max_retries: u64,
    /// Pause between two synchronization attempts.
    retry_cooldown: Duration,
    /// Whether snapshots are requested at all; when `false` only deltas are forwarded.
    include_snapshots: bool,
    /// Keeps track of the components and contracts that are relevant to this synchronizer.
    component_tracker: ComponentTracker<R>,
    /// Header of the last block that was sent to the consumer, if any.
    last_synced_block: Option<BlockHeader>,
    /// Seconds to wait for a delta message while looking for the first usable one.
    timeout: u64,
    /// Passed to the snapshot request as its `include_tvl` option.
    include_tvl: bool,
    /// Passed to the deltas subscription as its compression option.
    compression: bool,
    /// Whether the deltas subscription should deliver partial blocks.
    partial_blocks: bool,
    /// Whether traced entry points are fetched and processed when taking snapshots.
    uses_dci: bool,
}
98
/// A protocol component bundled with its state and related metadata.
#[derive(Clone, PartialEq, Debug, Serialize, Deserialize)]
pub struct ComponentWithState {
    /// State of the component.
    pub state: ResponseProtocolState,
    /// Description of the component itself.
    pub component: ProtocolComponent,
    /// TVL of the component, if it was retrieved.
    pub component_tvl: Option<f64>,
    /// Entry points of the component, each paired with its tracing result.
    pub entrypoints: Vec<(EntryPointWithTracingParams, TracingResult)>,
}
106
/// A set of component states together with the contract storage they refer to.
#[derive(Clone, PartialEq, Debug, Default, Serialize, Deserialize)]
pub struct Snapshot {
    /// Component states keyed by component id.
    pub states: HashMap<String, ComponentWithState>,
    /// Contract accounts keyed by address.
    pub vm_storage: HashMap<Bytes, ResponseAccount>,
}
112
113impl Snapshot {
114 fn extend(&mut self, other: Snapshot) {
115 self.states.extend(other.states);
116 self.vm_storage.extend(other.vm_storage);
117 }
118
119 pub fn get_states(&self) -> &HashMap<String, ComponentWithState> {
120 &self.states
121 }
122
123 pub fn get_vm_storage(&self) -> &HashMap<Bytes, ResponseAccount> {
124 &self.vm_storage
125 }
126}
127
/// Message emitted by a synchronizer for a single header.
#[derive(Clone, PartialEq, Debug, Default, Serialize, Deserialize)]
pub struct StateSyncMessage<H>
where
    H: HeaderLike,
{
    /// Header this message refers to.
    pub header: H,
    /// Snapshots of components included with this message; may be empty.
    pub snapshots: Snapshot,
    /// Block changes for this header, if any.
    pub deltas: Option<BlockChanges>,
    /// Components that are no longer tracked, keyed by component id.
    pub removed_components: HashMap<String, ProtocolComponent>,
}
144
145impl<H> StateSyncMessage<H>
146where
147 H: HeaderLike,
148{
149 pub fn merge(mut self, other: Self) -> Self {
150 self.removed_components
152 .retain(|k, _| !other.snapshots.states.contains_key(k));
153 self.snapshots
154 .states
155 .retain(|k, _| !other.removed_components.contains_key(k));
156
157 self.snapshots.extend(other.snapshots);
158 let deltas = match (self.deltas, other.deltas) {
159 (Some(l), Some(r)) => Some(l.merge(r)),
160 (None, Some(r)) => Some(r),
161 (Some(l), None) => Some(l),
162 (None, None) => None,
163 };
164 self.removed_components
165 .extend(other.removed_components);
166 Self {
167 header: other.header,
168 snapshots: self.snapshots,
169 deltas,
170 removed_components: self.removed_components,
171 }
172 }
173}
174
/// Handle to a running synchronizer task.
pub struct SynchronizerTaskHandle {
    /// Join handle of the spawned task.
    join_handle: JoinHandle<()>,
    /// Sender used to signal the task that it should stop.
    close_tx: oneshot::Sender<()>,
}
183
184impl SynchronizerTaskHandle {
193 pub fn new(join_handle: JoinHandle<()>, close_tx: oneshot::Sender<()>) -> Self {
194 Self { join_handle, close_tx }
195 }
196
197 pub fn split(self) -> (JoinHandle<()>, oneshot::Sender<()>) {
203 (self.join_handle, self.close_tx)
204 }
205}
206
/// Interface of a component that produces a stream of [`StateSyncMessage`]s.
#[async_trait]
pub trait StateSynchronizer: Send + Sync + 'static {
    /// Prepares the synchronizer for use.
    async fn initialize(&mut self) -> SyncResult<()>;
    /// Consumes the synchronizer and starts it.
    ///
    /// Returns a handle for the started work together with the receiver on which messages and
    /// errors are delivered.
    async fn start(
        mut self,
    ) -> (SynchronizerTaskHandle, Receiver<SyncResult<StateSyncMessage<BlockHeader>>>);
}
216
217impl<R, D> ProtocolStateSynchronizer<R, D>
218where
219 R: RPCClient + Clone + Send + Sync + 'static,
222 D: DeltasClient + Clone + Send + Sync + 'static,
223{
224 #[allow(clippy::too_many_arguments)]
226 pub fn new(
227 extractor_id: ExtractorIdentity,
228 retrieve_balances: bool,
229 component_filter: ComponentFilter,
230 max_retries: u64,
231 retry_cooldown: Duration,
232 include_snapshots: bool,
233 include_tvl: bool,
234 compression: bool,
235 rpc_client: R,
236 deltas_client: D,
237 timeout: u64,
238 ) -> Self {
239 Self {
240 extractor_id: extractor_id.clone(),
241 retrieve_balances,
242 rpc_client: rpc_client.clone(),
243 include_snapshots,
244 deltas_client,
245 component_tracker: ComponentTracker::new(
246 extractor_id.chain,
247 extractor_id.name.as_str(),
248 component_filter,
249 rpc_client,
250 ),
251 max_retries,
252 retry_cooldown,
253 last_synced_block: None,
254 timeout,
255 include_tvl,
256 compression,
257 partial_blocks: false,
258 uses_dci: false,
259 }
260 }
261
262 pub fn with_dci(mut self, uses_dci: bool) -> Self {
265 self.uses_dci = uses_dci;
266 self
267 }
268
269 pub fn with_partial_blocks(mut self, partial_blocks: bool) -> Self {
271 self.partial_blocks = partial_blocks;
272 self
273 }
274
275 async fn get_snapshots<'a, I: IntoIterator<Item = &'a String>>(
277 &mut self,
278 header: BlockHeader,
279 ids: Option<I>,
280 ) -> SyncResult<StateSyncMessage<BlockHeader>> {
281 if !self.include_snapshots {
282 return Ok(StateSyncMessage { header, ..Default::default() });
283 }
284
285 let component_ids: Vec<_> = match ids {
287 Some(ids) => ids.into_iter().cloned().collect(),
288 None => self
289 .component_tracker
290 .get_tracked_component_ids(),
291 };
292
293 if component_ids.is_empty() {
294 return Ok(StateSyncMessage { header, ..Default::default() });
295 }
296
297 let entrypoints_result = if self.uses_dci {
298 let result = self
299 .rpc_client
300 .get_traced_entry_points_paginated(
301 self.extractor_id.chain,
302 &self.extractor_id.name,
303 &component_ids,
304 None,
305 RPC_CLIENT_CONCURRENCY,
306 )
307 .await?;
308 self.component_tracker
309 .process_entrypoints(&result.clone().into());
310 result.traced_entry_points.clone()
311 } else {
312 HashMap::new()
313 };
314
315 let contract_ids: Vec<Bytes> = self
317 .component_tracker
318 .get_contracts_by_component(&component_ids)
319 .into_iter()
320 .collect();
321
322 let filtered_components: HashMap<_, _> = self
324 .component_tracker
325 .components
326 .iter()
327 .filter(|(id, _)| component_ids.contains(id))
328 .map(|(k, v)| (k.clone(), v.clone()))
329 .collect();
330
331 let request = SnapshotParameters::new(
332 self.extractor_id.chain,
333 &self.extractor_id.name,
334 &filtered_components,
335 &contract_ids,
336 header.number,
337 )
338 .entrypoints(&entrypoints_result)
339 .include_balances(self.retrieve_balances)
340 .include_tvl(self.include_tvl);
341 let snapshot_response = self
342 .rpc_client
343 .get_snapshots(&request, None, RPC_CLIENT_CONCURRENCY)
344 .await?;
345
346 trace!(states=?&snapshot_response.states, "Retrieved ProtocolStates");
347 trace!(contract_states=?&snapshot_response.vm_storage, "Retrieved ContractState");
348
349 Ok(StateSyncMessage {
350 header,
351 snapshots: snapshot_response,
352 deltas: None,
353 removed_components: HashMap::new(),
354 })
355 }
356
    /// Runs a single synchronization session.
    ///
    /// The session goes through these steps:
    ///
    /// 1. Subscribe to the deltas of the extractor.
    /// 2. Wait for the first usable delta message. Each receive is bounded by `self.timeout`
    ///    seconds. Messages are skipped while waiting for a block boundary (partial blocks only)
    ///    or when they belong to a block that was already synced.
    /// 3. If that message does not directly follow `last_synced_block`, fetch a snapshot of all
    ///    tracked components and merge the first deltas into it; otherwise send the deltas alone.
    /// 4. Forward every following delta message, adding snapshots for components that start
    ///    being tracked and listing components that stop being tracked.
    /// 5. Unsubscribe, regardless of how the session ended.
    ///
    /// # Arguments
    ///
    /// * `block_tx` - Channel on which messages are delivered to the consumer.
    /// * `end_rx` - Receiver of the close signal.
    ///
    /// # Returns
    ///
    /// `Ok(())` when the session ended because `end_rx` resolved. On failure the error is
    /// returned together with `end_rx`, so that the caller can start another session with it.
    #[instrument(skip(self, block_tx, end_rx), fields(extractor_id = %self.extractor_id))]
    async fn state_sync(
        &mut self,
        block_tx: &mut Sender<SyncResult<StateSyncMessage<BlockHeader>>>,
        mut end_rx: oneshot::Receiver<()>,
    ) -> Result<(), (SynchronizerError, Option<oneshot::Receiver<()>>)> {
        let subscription_options = SubscriptionOptions::new()
            .with_state(self.include_snapshots)
            .with_compression(self.compression)
            .with_partial_blocks(self.partial_blocks);
        let (subscription_id, mut msg_rx) = match self
            .deltas_client
            .subscribe(self.extractor_id.clone(), subscription_options)
            .await
        {
            Ok(result) => result,
            // Nothing to clean up yet: hand the close receiver back for a later attempt.
            Err(e) => return Err((e.into(), Some(end_rx))),
        };

        // The session body lives in its own async block so that `?` and `return` only leave the
        // block and the unsubscribe below always runs.
        let result = async {
            info!("Waiting for deltas...");
            // These flags make sure each informational message is logged only once per session.
            let mut warned_waiting_for_new_block = false;
            let mut warned_skipping_synced = false;
            // Block number of the previously inspected message (partial blocks mode only).
            let mut last_block_number: Option<u64> = None;
            let mut first_msg = loop {
                let msg = select! {
                    deltas_result = timeout(Duration::from_secs(self.timeout), msg_rx.recv()) => {
                        deltas_result
                            .map_err(|_| {
                                SynchronizerError::Timeout(format!(
                                    "First deltas took longer than {t}s to arrive",
                                    t = self.timeout
                                ))
                            })?
                            .ok_or_else(|| {
                                SynchronizerError::ConnectionError(
                                    "Deltas channel closed before first message".to_string(),
                                )
                            })?
                    },
                    _ = &mut end_rx => {
                        info!("Received close signal while waiting for first deltas");
                        return Ok(());
                    }
                };

                let incoming: BlockHeader = (&msg).into();

                // With partial blocks, syncing may only start at a block boundary: either on a
                // message without a partial index, or on a partial message whose block number is
                // higher than that of the previously inspected message.
                let is_new_block_candidate = if self.partial_blocks {
                    match msg.partial_block_index {
                        None => {
                            last_block_number = Some(incoming.number);
                            true
                        }
                        Some(current_partial_idx) => {
                            // Without a previously seen block number it is unknown whether this
                            // partial starts a block, so it is not accepted.
                            let is_new_block = last_block_number
                                .map(|prev_block| incoming.number > prev_block)
                                .unwrap_or(false);

                            if !warned_waiting_for_new_block {
                                info!(
                                    extractor=%self.extractor_id,
                                    block=incoming.number,
                                    partial_idx=current_partial_idx,
                                    "Syncing. Waiting for new block to start"
                                );
                                warned_waiting_for_new_block = true;
                            }
                            last_block_number = Some(incoming.number);
                            is_new_block
                        }
                    }
                } else {
                    true
                };

                if !is_new_block_candidate {
                    continue;
                }

                // Skip messages at or below the last synced block number, unless the message
                // builds directly on the last synced block.
                if let Some(current) = &self.last_synced_block {
                    if current.number >= incoming.number && !self.is_next_expected(&incoming) {
                        if !warned_skipping_synced {
                            info!(extractor=%self.extractor_id, from=incoming.number, to=current.number, "Syncing. Skipping already synced block");
                            warned_skipping_synced = true;
                        }
                        continue;
                    }
                }
                break msg;
            };

            // Drop changes that concern components or contracts that are not tracked.
            self.filter_deltas(&mut first_msg);

            info!(height = first_msg.get_block().number, "First deltas received");
            let header: BlockHeader = (&first_msg).into();
            let deltas_msg = StateSyncMessage {
                header: header.clone(),
                snapshots: Default::default(),
                deltas: Some(first_msg),
                removed_components: Default::default(),
            };

            // A snapshot is only needed when the first message does not directly extend the
            // last synced block (which includes the case where nothing was synced yet).
            let msg = if !self.is_next_expected(&header) {
                info!("Retrieving snapshot");
                // The snapshot is requested with the revert flag cleared.
                let snapshot_header = BlockHeader { revert: false, ..header.clone() };
                let snapshot = self
                    .get_snapshots::<Vec<&String>>(
                        snapshot_header,
                        None,
                    )
                    .await?
                    .merge(deltas_msg);
                let n_components = self.component_tracker.components.len();
                let n_snapshots = snapshot.snapshots.states.len();
                info!(n_components, n_snapshots, "Initial snapshot retrieved, starting delta message feed");
                snapshot
            } else {
                deltas_msg
            };
            block_tx.send(Ok(msg)).await?;
            self.last_synced_block = Some(header.clone());
            loop {
                select! {
                    deltas_opt = msg_rx.recv() => {
                        if let Some(mut deltas) = deltas_opt {
                            let header: BlockHeader = (&deltas).into();
                            debug!(block_number=?header.number, "Received delta message");

                            let (snapshots, removed_components) = {
                                let (to_add, to_remove) = self.component_tracker.filter_updated_components(&deltas);

                                // Only components that are not tracked yet need a snapshot.
                                let requiring_snapshot: Vec<_> = to_add
                                    .iter()
                                    .filter(|id| {
                                        !self.component_tracker
                                            .components
                                            .contains_key(id.as_str())
                                    })
                                    .collect();
                                debug!(components=?requiring_snapshot, "SnapshotRequest");
                                self.component_tracker
                                    .start_tracking(requiring_snapshot.as_slice())
                                    .await?;

                                // Returns empty snapshots without a request when the list of
                                // ids is empty.
                                let snapshots = self
                                    .get_snapshots(header.clone(), Some(requiring_snapshot))
                                    .await?
                                    .snapshots;

                                let removed_components = if !to_remove.is_empty() {
                                    self.component_tracker.stop_tracking(&to_remove)
                                } else {
                                    Default::default()
                                };

                                (snapshots, removed_components)
                            };

                            self.component_tracker.process_entrypoints(&deltas.dci_update);

                            // Filtering happens after the tracker was updated, so changes of
                            // newly tracked components are kept and changes of components that
                            // are no longer tracked are dropped.
                            self.filter_deltas(&mut deltas);
                            let n_changes = deltas.n_changes();

                            let next = StateSyncMessage {
                                header: header.clone(),
                                snapshots,
                                deltas: Some(deltas),
                                removed_components,
                            };
                            block_tx.send(Ok(next)).await?;
                            self.last_synced_block = Some(header.clone());

                            debug!(block_number=?header.number, n_changes, "Finished processing delta message");
                        } else {
                            return Err(SynchronizerError::ConnectionError("Deltas channel closed".to_string()));
                        }
                    },
                    _ = &mut end_rx => {
                        info!("Received close signal during state_sync");
                        return Ok(());
                    }
                }
            }
        }.await;

        warn!(last_synced_block = ?&self.last_synced_block, "Deltas processing ended.");
        // Best-effort cleanup: a failure to unsubscribe is logged and otherwise ignored.
        let _ = self
            .deltas_client
            .unsubscribe(subscription_id)
            .await
            .map_err(|err| {
                warn!(err=?err, "Unsubscribing from deltas on cleanup failed!");
            });

        match result {
            // The session ended because `end_rx` resolved.
            Ok(()) => Ok(()),
            Err(e) => {
                // Hand the close receiver back so that the caller can start another session.
                Err((e, Some(end_rx)))
            }
        }
    }
593
594 fn is_next_expected(&self, incoming: &BlockHeader) -> bool {
595 if let Some(block) = self.last_synced_block.as_ref() {
596 return incoming.parent_hash == block.hash;
597 }
598 false
599 }
600 fn filter_deltas(&self, deltas: &mut BlockChanges) {
601 deltas.filter_by_component(|id| {
602 self.component_tracker
603 .components
604 .contains_key(id)
605 });
606 deltas.filter_by_contract(|id| {
607 self.component_tracker
608 .contracts
609 .contains(id)
610 });
611 }
612}
613
614#[async_trait]
615impl<R, D> StateSynchronizer for ProtocolStateSynchronizer<R, D>
616where
617 R: RPCClient + Clone + Send + Sync + 'static,
618 D: DeltasClient + Clone + Send + Sync + 'static,
619{
620 async fn initialize(&mut self) -> SyncResult<()> {
621 info!("Retrieving relevant protocol components");
622 self.component_tracker
623 .initialise_components()
624 .await?;
625 info!(
626 n_components = self.component_tracker.components.len(),
627 n_contracts = self.component_tracker.contracts.len(),
628 extractor = %self.extractor_id,
629 "Finished retrieving components",
630 );
631
632 Ok(())
633 }
634
635 async fn start(
636 mut self,
637 ) -> (SynchronizerTaskHandle, Receiver<SyncResult<StateSyncMessage<BlockHeader>>>) {
638 let (mut tx, rx) = channel(15);
639 let (end_tx, end_rx) = oneshot::channel::<()>();
640
641 let jh = tokio::spawn(async move {
642 let mut retry_count = 0;
643 let mut current_end_rx = end_rx;
644 let mut final_error = None;
645
646 while retry_count < self.max_retries {
647 info!(extractor_id=%&self.extractor_id, retry_count, "(Re)starting synchronization loop");
648
649 let res = self
650 .state_sync(&mut tx, current_end_rx)
651 .await;
652 match res {
653 Ok(()) => {
654 info!(
655 extractor_id=%&self.extractor_id,
656 retry_count,
657 "State synchronization exited cleanly"
658 );
659 return;
660 }
661 Err((e, maybe_end_rx)) => {
662 warn!(
663 extractor_id=%&self.extractor_id,
664 retry_count,
665 error=%e,
666 "State synchronization errored!"
667 );
668
669 if let Some(recovered_end_rx) = maybe_end_rx {
671 current_end_rx = recovered_end_rx;
672
673 if let SynchronizerError::ConnectionClosed = e {
674 error!(
676 "Websocket connection closed. State synchronization exiting."
677 );
678 let _ = tx.send(Err(e)).await;
679 return;
680 } else {
681 final_error = Some(e);
683 }
684 } else {
685 info!(extractor_id=%&self.extractor_id, "Received close signal, exiting.");
687 return;
688 }
689 }
690 }
691 sleep(self.retry_cooldown).await;
692 retry_count += 1;
693 }
694 if let Some(e) = final_error {
695 warn!(extractor_id=%&self.extractor_id, retry_count, error=%e, "Max retries exceeded");
696 let _ = tx.send(Err(e)).await;
697 }
698 });
699
700 let handle = SynchronizerTaskHandle::new(jh, end_tx);
701 (handle, rx)
702 }
703}
704
705#[cfg(test)]
706mod test {
707 use std::{collections::HashSet, sync::Arc};
726
727 use tycho_common::dto::{
728 AddressStorageLocation, Block, Chain, ComponentTvlRequestBody, ComponentTvlRequestResponse,
729 DCIUpdate, EntryPoint, PaginationResponse, ProtocolComponentRequestResponse,
730 ProtocolComponentsRequestBody, ProtocolStateRequestBody, ProtocolStateRequestResponse,
731 ProtocolSystemsRequestBody, ProtocolSystemsRequestResponse, RPCTracerParams,
732 StateRequestBody, StateRequestResponse, TokensRequestBody, TokensRequestResponse,
733 TracedEntryPointRequestBody, TracedEntryPointRequestResponse, TracingParams,
734 };
735 use uuid::Uuid;
736
737 use super::*;
738 use crate::{deltas::MockDeltasClient, rpc::MockRPCClient, DeltasError, RPCError};
739
740 struct ArcRPCClient<T>(Arc<T>);
742
743 impl<T> Clone for ArcRPCClient<T> {
745 fn clone(&self) -> Self {
746 ArcRPCClient(self.0.clone())
747 }
748 }
749
750 #[async_trait]
751 impl<T> RPCClient for ArcRPCClient<T>
752 where
753 T: RPCClient + Sync + Send + 'static,
754 {
755 async fn get_tokens(
756 &self,
757 request: &TokensRequestBody,
758 ) -> Result<TokensRequestResponse, RPCError> {
759 self.0.get_tokens(request).await
760 }
761
762 async fn get_contract_state(
763 &self,
764 request: &StateRequestBody,
765 ) -> Result<StateRequestResponse, RPCError> {
766 self.0.get_contract_state(request).await
767 }
768
769 async fn get_protocol_components(
770 &self,
771 request: &ProtocolComponentsRequestBody,
772 ) -> Result<ProtocolComponentRequestResponse, RPCError> {
773 self.0
774 .get_protocol_components(request)
775 .await
776 }
777
778 async fn get_protocol_states(
779 &self,
780 request: &ProtocolStateRequestBody,
781 ) -> Result<ProtocolStateRequestResponse, RPCError> {
782 self.0
783 .get_protocol_states(request)
784 .await
785 }
786
787 async fn get_protocol_systems(
788 &self,
789 request: &ProtocolSystemsRequestBody,
790 ) -> Result<ProtocolSystemsRequestResponse, RPCError> {
791 self.0
792 .get_protocol_systems(request)
793 .await
794 }
795
796 async fn get_component_tvl(
797 &self,
798 request: &ComponentTvlRequestBody,
799 ) -> Result<ComponentTvlRequestResponse, RPCError> {
800 self.0.get_component_tvl(request).await
801 }
802
803 async fn get_traced_entry_points(
804 &self,
805 request: &TracedEntryPointRequestBody,
806 ) -> Result<TracedEntryPointRequestResponse, RPCError> {
807 self.0
808 .get_traced_entry_points(request)
809 .await
810 }
811
812 async fn get_snapshots<'a>(
813 &self,
814 request: &SnapshotParameters<'a>,
815 chunk_size: Option<usize>,
816 concurrency: usize,
817 ) -> Result<Snapshot, RPCError> {
818 self.0
819 .get_snapshots(request, chunk_size, concurrency)
820 .await
821 }
822
823 fn compression(&self) -> bool {
824 self.0.compression()
825 }
826 }
827
828 struct ArcDeltasClient<T>(Arc<T>);
830
831 impl<T> Clone for ArcDeltasClient<T> {
833 fn clone(&self) -> Self {
834 ArcDeltasClient(self.0.clone())
835 }
836 }
837
838 #[async_trait]
839 impl<T> DeltasClient for ArcDeltasClient<T>
840 where
841 T: DeltasClient + Sync + Send + 'static,
842 {
843 async fn subscribe(
844 &self,
845 extractor_id: ExtractorIdentity,
846 options: SubscriptionOptions,
847 ) -> Result<(Uuid, Receiver<BlockChanges>), DeltasError> {
848 self.0
849 .subscribe(extractor_id, options)
850 .await
851 }
852
853 async fn unsubscribe(&self, subscription_id: Uuid) -> Result<(), DeltasError> {
854 self.0
855 .unsubscribe(subscription_id)
856 .await
857 }
858
859 async fn connect(&self) -> Result<JoinHandle<Result<(), DeltasError>>, DeltasError> {
860 self.0.connect().await
861 }
862
863 async fn close(&self) -> Result<(), DeltasError> {
864 self.0.close().await
865 }
866 }
867
868 fn with_mocked_clients(
869 native: bool,
870 include_tvl: bool,
871 rpc_client: Option<MockRPCClient>,
872 deltas_client: Option<MockDeltasClient>,
873 ) -> ProtocolStateSynchronizer<ArcRPCClient<MockRPCClient>, ArcDeltasClient<MockDeltasClient>>
874 {
875 let rpc_client = ArcRPCClient(Arc::new(rpc_client.unwrap_or_default()));
876 let deltas_client = ArcDeltasClient(Arc::new(deltas_client.unwrap_or_default()));
877
878 ProtocolStateSynchronizer::new(
879 ExtractorIdentity::new(Chain::Ethereum, "uniswap-v2"),
880 native,
881 ComponentFilter::with_tvl_range(50.0, 50.0),
882 1,
883 Duration::from_secs(0),
884 true,
885 include_tvl,
886 true, rpc_client,
888 deltas_client,
889 10_u64,
890 )
891 }
892
893 fn state_snapshot_native() -> ProtocolStateRequestResponse {
894 ProtocolStateRequestResponse {
895 states: vec![ResponseProtocolState {
896 component_id: "Component1".to_string(),
897 ..Default::default()
898 }],
899 pagination: PaginationResponse { page: 0, page_size: 20, total: 1 },
900 }
901 }
902
903 fn make_mock_client() -> MockRPCClient {
904 let mut m = MockRPCClient::new();
905 m.expect_compression()
906 .return_const(false);
907 m
908 }
909
910 #[test_log::test(tokio::test)]
911 async fn test_get_snapshots_native() {
912 let header = BlockHeader::default();
913 let mut rpc = make_mock_client();
914 let component = ProtocolComponent { id: "Component1".to_string(), ..Default::default() };
915
916 let component_clone = component.clone();
917 rpc.expect_get_snapshots()
918 .returning(move |_request, _chunk_size, _concurrency| {
919 Ok(Snapshot {
920 states: state_snapshot_native()
921 .states
922 .into_iter()
923 .map(|state| {
924 (
925 state.component_id.clone(),
926 ComponentWithState {
927 state,
928 component: component_clone.clone(),
929 entrypoints: vec![],
930 component_tvl: None,
931 },
932 )
933 })
934 .collect(),
935 vm_storage: HashMap::new(),
936 })
937 });
938
939 rpc.expect_get_traced_entry_points()
940 .returning(|_| {
941 Ok(TracedEntryPointRequestResponse {
942 traced_entry_points: HashMap::new(),
943 pagination: PaginationResponse::new(0, 20, 0),
944 })
945 });
946
947 let mut state_sync = with_mocked_clients(true, false, Some(rpc), None);
948 state_sync
949 .component_tracker
950 .components
951 .insert("Component1".to_string(), component.clone());
952 let components_arg = ["Component1".to_string()];
953 let exp = StateSyncMessage {
954 header: header.clone(),
955 snapshots: Snapshot {
956 states: state_snapshot_native()
957 .states
958 .into_iter()
959 .map(|state| {
960 (
961 state.component_id.clone(),
962 ComponentWithState {
963 state,
964 component: component.clone(),
965 entrypoints: vec![],
966 component_tvl: None,
967 },
968 )
969 })
970 .collect(),
971 vm_storage: HashMap::new(),
972 },
973 deltas: None,
974 removed_components: Default::default(),
975 };
976
977 let snap = state_sync
978 .get_snapshots(header, Some(&components_arg))
979 .await
980 .expect("Retrieving snapshot failed");
981
982 assert_eq!(snap, exp);
983 }
984
985 #[test_log::test(tokio::test)]
986 async fn test_get_snapshots_native_with_tvl() {
987 let header = BlockHeader::default();
988 let mut rpc = make_mock_client();
989 let component = ProtocolComponent { id: "Component1".to_string(), ..Default::default() };
990
991 let component_clone = component.clone();
992 rpc.expect_get_snapshots()
993 .returning(move |_request, _chunk_size, _concurrency| {
994 Ok(Snapshot {
995 states: state_snapshot_native()
996 .states
997 .into_iter()
998 .map(|state| {
999 (
1000 state.component_id.clone(),
1001 ComponentWithState {
1002 state,
1003 component: component_clone.clone(),
1004 component_tvl: Some(100.0),
1005 entrypoints: vec![],
1006 },
1007 )
1008 })
1009 .collect(),
1010 vm_storage: HashMap::new(),
1011 })
1012 });
1013
1014 rpc.expect_get_traced_entry_points()
1015 .returning(|_| {
1016 Ok(TracedEntryPointRequestResponse {
1017 traced_entry_points: HashMap::new(),
1018 pagination: PaginationResponse::new(0, 20, 0),
1019 })
1020 });
1021
1022 let mut state_sync = with_mocked_clients(true, true, Some(rpc), None);
1023 state_sync
1024 .component_tracker
1025 .components
1026 .insert("Component1".to_string(), component.clone());
1027 let components_arg = ["Component1".to_string()];
1028 let exp = StateSyncMessage {
1029 header: header.clone(),
1030 snapshots: Snapshot {
1031 states: state_snapshot_native()
1032 .states
1033 .into_iter()
1034 .map(|state| {
1035 (
1036 state.component_id.clone(),
1037 ComponentWithState {
1038 state,
1039 component: component.clone(),
1040 component_tvl: Some(100.0),
1041 entrypoints: vec![],
1042 },
1043 )
1044 })
1045 .collect(),
1046 vm_storage: HashMap::new(),
1047 },
1048 deltas: None,
1049 removed_components: Default::default(),
1050 };
1051
1052 let snap = state_sync
1053 .get_snapshots(header, Some(&components_arg))
1054 .await
1055 .expect("Retrieving snapshot failed");
1056
1057 assert_eq!(snap, exp);
1058 }
1059
1060 fn state_snapshot_vm() -> StateRequestResponse {
1061 StateRequestResponse {
1062 accounts: vec![
1063 ResponseAccount { address: Bytes::from("0x0badc0ffee"), ..Default::default() },
1064 ResponseAccount { address: Bytes::from("0xbabe42"), ..Default::default() },
1065 ],
1066 pagination: PaginationResponse { page: 0, page_size: 20, total: 1 },
1067 }
1068 }
1069
1070 fn traced_entry_point_response() -> TracedEntryPointRequestResponse {
1071 TracedEntryPointRequestResponse {
1072 traced_entry_points: HashMap::from([(
1073 "Component1".to_string(),
1074 vec![(
1075 EntryPointWithTracingParams {
1076 entry_point: EntryPoint {
1077 external_id: "entrypoint_a".to_string(),
1078 target: Bytes::from("0x0badc0ffee"),
1079 signature: "sig()".to_string(),
1080 },
1081 params: TracingParams::RPCTracer(RPCTracerParams {
1082 caller: Some(Bytes::from("0x0badc0ffee")),
1083 calldata: Bytes::from("0x0badc0ffee"),
1084 state_overrides: None,
1085 prune_addresses: None,
1086 }),
1087 },
1088 TracingResult {
1089 retriggers: HashSet::from([(
1090 Bytes::from("0x0badc0ffee"),
1091 AddressStorageLocation::new(Bytes::from("0x0badc0ffee"), 12),
1092 )]),
1093 accessed_slots: HashMap::from([(
1094 Bytes::from("0x0badc0ffee"),
1095 HashSet::from([Bytes::from("0xbadbeef0")]),
1096 )]),
1097 },
1098 )],
1099 )]),
1100 pagination: PaginationResponse::new(0, 20, 0),
1101 }
1102 }
1103
1104 #[test_log::test(tokio::test)]
1105 async fn test_get_snapshots_vm() {
1106 let header = BlockHeader::default();
1107 let mut rpc = make_mock_client();
1108
1109 let traced_ep_response = traced_entry_point_response();
1110 rpc.expect_get_snapshots()
1111 .returning(move |_request, _chunk_size, _concurrency| {
1112 let vm_storage_accounts = state_snapshot_vm();
1113 Ok(Snapshot {
1114 states: [(
1115 "Component1".to_string(),
1116 ComponentWithState {
1117 state: ResponseProtocolState {
1118 component_id: "Component1".to_string(),
1119 ..Default::default()
1120 },
1121 component: ProtocolComponent {
1122 id: "Component1".to_string(),
1123 contract_ids: vec![
1124 Bytes::from("0x0badc0ffee"),
1125 Bytes::from("0xbabe42"),
1126 ],
1127 ..Default::default()
1128 },
1129 component_tvl: None,
1130 entrypoints: traced_ep_response
1131 .traced_entry_points
1132 .get("Component1")
1133 .cloned()
1134 .unwrap_or_default(),
1135 },
1136 )]
1137 .into_iter()
1138 .collect(),
1139 vm_storage: vm_storage_accounts
1140 .accounts
1141 .into_iter()
1142 .map(|state| (state.address.clone(), state))
1143 .collect(),
1144 })
1145 });
1146
1147 rpc.expect_get_traced_entry_points()
1148 .returning(|_| Ok(traced_entry_point_response()));
1149
1150 let mut state_sync = with_mocked_clients(false, false, Some(rpc), None);
1151 let component = ProtocolComponent {
1152 id: "Component1".to_string(),
1153 contract_ids: vec![Bytes::from("0x0badc0ffee"), Bytes::from("0xbabe42")],
1154 ..Default::default()
1155 };
1156 state_sync
1157 .component_tracker
1158 .components
1159 .insert("Component1".to_string(), component.clone());
1160 let components_arg = ["Component1".to_string()];
1161 let exp = StateSyncMessage {
1162 header: header.clone(),
1163 snapshots: Snapshot {
1164 states: [(
1165 component.id.clone(),
1166 ComponentWithState {
1167 state: ResponseProtocolState {
1168 component_id: "Component1".to_string(),
1169 ..Default::default()
1170 },
1171 component: component.clone(),
1172 component_tvl: None,
1173 entrypoints: vec![(
1174 EntryPointWithTracingParams {
1175 entry_point: EntryPoint {
1176 external_id: "entrypoint_a".to_string(),
1177 target: Bytes::from("0x0badc0ffee"),
1178 signature: "sig()".to_string(),
1179 },
1180 params: TracingParams::RPCTracer(RPCTracerParams {
1181 caller: Some(Bytes::from("0x0badc0ffee")),
1182 calldata: Bytes::from("0x0badc0ffee"),
1183 state_overrides: None,
1184 prune_addresses: None,
1185 }),
1186 },
1187 TracingResult {
1188 retriggers: HashSet::from([(
1189 Bytes::from("0x0badc0ffee"),
1190 AddressStorageLocation::new(Bytes::from("0x0badc0ffee"), 12),
1191 )]),
1192 accessed_slots: HashMap::from([(
1193 Bytes::from("0x0badc0ffee"),
1194 HashSet::from([Bytes::from("0xbadbeef0")]),
1195 )]),
1196 },
1197 )],
1198 },
1199 )]
1200 .into_iter()
1201 .collect(),
1202 vm_storage: state_snapshot_vm()
1203 .accounts
1204 .into_iter()
1205 .map(|state| (state.address.clone(), state))
1206 .collect(),
1207 },
1208 deltas: None,
1209 removed_components: Default::default(),
1210 };
1211
1212 let snap = state_sync
1213 .get_snapshots(header, Some(&components_arg))
1214 .await
1215 .expect("Retrieving snapshot failed");
1216
1217 assert_eq!(snap, exp);
1218 }
1219
1220 #[test_log::test(tokio::test)]
1221 async fn test_get_snapshots_vm_with_tvl() {
1222 let header = BlockHeader::default();
1223 let mut rpc = make_mock_client();
1224 let component = ProtocolComponent {
1225 id: "Component1".to_string(),
1226 contract_ids: vec![Bytes::from("0x0badc0ffee"), Bytes::from("0xbabe42")],
1227 ..Default::default()
1228 };
1229
1230 let component_clone = component.clone();
1231 rpc.expect_get_snapshots()
1232 .returning(move |_request, _chunk_size, _concurrency| {
1233 let vm_storage_accounts = state_snapshot_vm();
1234 Ok(Snapshot {
1235 states: [(
1236 "Component1".to_string(),
1237 ComponentWithState {
1238 state: ResponseProtocolState {
1239 component_id: "Component1".to_string(),
1240 ..Default::default()
1241 },
1242 component: component_clone.clone(),
1243 component_tvl: Some(100.0),
1244 entrypoints: vec![],
1245 },
1246 )]
1247 .into_iter()
1248 .collect(),
1249 vm_storage: vm_storage_accounts
1250 .accounts
1251 .into_iter()
1252 .map(|state| (state.address.clone(), state))
1253 .collect(),
1254 })
1255 });
1256
1257 rpc.expect_get_traced_entry_points()
1258 .returning(|_| {
1259 Ok(TracedEntryPointRequestResponse {
1260 traced_entry_points: HashMap::new(),
1261 pagination: PaginationResponse::new(0, 20, 0),
1262 })
1263 });
1264
1265 let mut state_sync = with_mocked_clients(false, true, Some(rpc), None);
1266 state_sync
1267 .component_tracker
1268 .components
1269 .insert("Component1".to_string(), component.clone());
1270 let components_arg = ["Component1".to_string()];
1271 let exp = StateSyncMessage {
1272 header: header.clone(),
1273 snapshots: Snapshot {
1274 states: [(
1275 component.id.clone(),
1276 ComponentWithState {
1277 state: ResponseProtocolState {
1278 component_id: "Component1".to_string(),
1279 ..Default::default()
1280 },
1281 component: component.clone(),
1282 component_tvl: Some(100.0),
1283 entrypoints: vec![],
1284 },
1285 )]
1286 .into_iter()
1287 .collect(),
1288 vm_storage: state_snapshot_vm()
1289 .accounts
1290 .into_iter()
1291 .map(|state| (state.address.clone(), state))
1292 .collect(),
1293 },
1294 deltas: None,
1295 removed_components: Default::default(),
1296 };
1297
1298 let snap = state_sync
1299 .get_snapshots(header, Some(&components_arg))
1300 .await
1301 .expect("Retrieving snapshot failed");
1302
1303 assert_eq!(snap, exp);
1304 }
1305
1306 #[test_log::test(tokio::test)]
1310 async fn test_get_snapshots_filters_to_requested_components_only() {
1311 let header = BlockHeader::default();
1312 let mut rpc = make_mock_client();
1313
1314 let component1 = ProtocolComponent { id: "Component1".to_string(), ..Default::default() };
1316 let component2 = ProtocolComponent { id: "Component2".to_string(), ..Default::default() };
1317 let component3 = ProtocolComponent { id: "Component3".to_string(), ..Default::default() };
1318
1319 let component2_clone = component2.clone();
1320
1321 rpc.expect_get_snapshots()
1323 .withf(
1324 |request: &SnapshotParameters,
1325 _chunk_size: &Option<usize>,
1326 _concurrency: &usize| {
1327 request.components.len() == 1 &&
1329 request
1330 .components
1331 .contains_key("Component2")
1332 },
1333 )
1334 .times(1)
1335 .returning(move |_request, _chunk_size, _concurrency| {
1336 Ok(Snapshot {
1337 states: [(
1338 "Component2".to_string(),
1339 ComponentWithState {
1340 state: ResponseProtocolState {
1341 component_id: "Component2".to_string(),
1342 ..Default::default()
1343 },
1344 component: component2_clone.clone(),
1345 entrypoints: vec![],
1346 component_tvl: None,
1347 },
1348 )]
1349 .into_iter()
1350 .collect(),
1351 vm_storage: HashMap::new(),
1352 })
1353 });
1354
1355 rpc.expect_get_traced_entry_points()
1356 .returning(|_| {
1357 Ok(TracedEntryPointRequestResponse {
1358 traced_entry_points: HashMap::new(),
1359 pagination: PaginationResponse::new(0, 20, 0),
1360 })
1361 });
1362
1363 let mut state_sync = with_mocked_clients(true, false, Some(rpc), None);
1364
1365 state_sync
1367 .component_tracker
1368 .components
1369 .insert("Component1".to_string(), component1.clone());
1370 state_sync
1371 .component_tracker
1372 .components
1373 .insert("Component2".to_string(), component2.clone());
1374 state_sync
1375 .component_tracker
1376 .components
1377 .insert("Component3".to_string(), component3.clone());
1378
1379 let components_arg = ["Component2".to_string()];
1381
1382 let snap = state_sync
1383 .get_snapshots(header.clone(), Some(&components_arg))
1384 .await
1385 .expect("Retrieving snapshot failed");
1386
1387 assert_eq!(snap.snapshots.states.len(), 1);
1389 assert!(snap
1390 .snapshots
1391 .states
1392 .contains_key("Component2"));
1393 assert!(!snap
1394 .snapshots
1395 .states
1396 .contains_key("Component1"));
1397 assert!(!snap
1398 .snapshots
1399 .states
1400 .contains_key("Component3"));
1401 }
1402
1403 fn mock_clients_for_state_sync() -> (MockRPCClient, MockDeltasClient, Sender<BlockChanges>) {
1404 let mut rpc_client = make_mock_client();
1405 rpc_client
1408 .expect_get_protocol_components()
1409 .with(mockall::predicate::function(
1410 move |request_params: &ProtocolComponentsRequestBody| {
1411 if let Some(ids) = request_params.component_ids.as_ref() {
1412 ids.contains(&"Component3".to_string())
1413 } else {
1414 false
1415 }
1416 },
1417 ))
1418 .returning(|_| {
1419 Ok(ProtocolComponentRequestResponse {
1421 protocol_components: vec![
1422 ProtocolComponent { id: "Component3".to_string(), ..Default::default() },
1424 ],
1425 pagination: PaginationResponse { page: 0, page_size: 20, total: 1 },
1426 })
1427 });
1428 rpc_client
1430 .expect_get_snapshots()
1431 .withf(
1432 |request: &SnapshotParameters,
1433 _chunk_size: &Option<usize>,
1434 _concurrency: &usize| {
1435 request
1436 .components
1437 .contains_key("Component3")
1438 },
1439 )
1440 .returning(|_request, _chunk_size, _concurrency| {
1441 Ok(Snapshot {
1442 states: [(
1443 "Component3".to_string(),
1444 ComponentWithState {
1445 state: ResponseProtocolState {
1446 component_id: "Component3".to_string(),
1447 ..Default::default()
1448 },
1449 component: ProtocolComponent {
1450 id: "Component3".to_string(),
1451 ..Default::default()
1452 },
1453 component_tvl: Some(1000.0),
1454 entrypoints: vec![],
1455 },
1456 )]
1457 .into_iter()
1458 .collect(),
1459 vm_storage: HashMap::new(),
1460 })
1461 });
1462
1463 rpc_client
1465 .expect_get_protocol_components()
1466 .returning(|_| {
1467 Ok(ProtocolComponentRequestResponse {
1469 protocol_components: vec![
1470 ProtocolComponent { id: "Component1".to_string(), ..Default::default() },
1472 ProtocolComponent { id: "Component2".to_string(), ..Default::default() },
1474 ],
1476 pagination: PaginationResponse { page: 0, page_size: 20, total: 1 },
1477 })
1478 });
1479
1480 rpc_client
1481 .expect_get_snapshots()
1482 .returning(|_request, _chunk_size, _concurrency| {
1483 Ok(Snapshot {
1484 states: [
1485 (
1486 "Component1".to_string(),
1487 ComponentWithState {
1488 state: ResponseProtocolState {
1489 component_id: "Component1".to_string(),
1490 ..Default::default()
1491 },
1492 component: ProtocolComponent {
1493 id: "Component1".to_string(),
1494 ..Default::default()
1495 },
1496 component_tvl: Some(100.0),
1497 entrypoints: vec![],
1498 },
1499 ),
1500 (
1501 "Component2".to_string(),
1502 ComponentWithState {
1503 state: ResponseProtocolState {
1504 component_id: "Component2".to_string(),
1505 ..Default::default()
1506 },
1507 component: ProtocolComponent {
1508 id: "Component2".to_string(),
1509 ..Default::default()
1510 },
1511 component_tvl: Some(0.0),
1512 entrypoints: vec![],
1513 },
1514 ),
1515 ]
1516 .into_iter()
1517 .collect(),
1518 vm_storage: HashMap::new(),
1519 })
1520 });
1521
1522 rpc_client
1524 .expect_get_traced_entry_points()
1525 .returning(|_| {
1526 Ok(TracedEntryPointRequestResponse {
1527 traced_entry_points: HashMap::new(),
1528 pagination: PaginationResponse { page: 0, page_size: 100, total: 0 },
1529 })
1530 });
1531
1532 let mut deltas_client = MockDeltasClient::new();
1534 let (tx, rx) = channel(1);
1535 deltas_client
1536 .expect_subscribe()
1537 .return_once(move |_, _| {
1538 Ok((Uuid::default(), rx))
1540 });
1541
1542 deltas_client
1544 .expect_unsubscribe()
1545 .return_once(|_| Ok(()));
1546
1547 (rpc_client, deltas_client, tx)
1548 }
1549
1550 #[test_log::test(tokio::test)]
1557 async fn test_state_sync() {
1558 let (rpc_client, deltas_client, tx) = mock_clients_for_state_sync();
1559 let deltas = [
1560 BlockChanges {
1561 extractor: "uniswap-v2".to_string(),
1562 chain: Chain::Ethereum,
1563 block: Block {
1564 number: 1,
1565 hash: Bytes::from("0x01"),
1566 parent_hash: Bytes::from("0x00"),
1567 chain: Chain::Ethereum,
1568 ts: Default::default(),
1569 },
1570 revert: false,
1571 dci_update: DCIUpdate {
1572 new_entrypoints: HashMap::from([(
1573 "Component1".to_string(),
1574 HashSet::from([EntryPoint {
1575 external_id: "entrypoint_a".to_string(),
1576 target: Bytes::from("0x0badc0ffee"),
1577 signature: "sig()".to_string(),
1578 }]),
1579 )]),
1580 new_entrypoint_params: HashMap::from([(
1581 "entrypoint_a".to_string(),
1582 HashSet::from([(
1583 TracingParams::RPCTracer(RPCTracerParams {
1584 caller: Some(Bytes::from("0x0badc0ffee")),
1585 calldata: Bytes::from("0x0badc0ffee"),
1586 state_overrides: None,
1587 prune_addresses: None,
1588 }),
1589 "Component1".to_string(),
1590 )]),
1591 )]),
1592 trace_results: HashMap::from([(
1593 "entrypoint_a".to_string(),
1594 TracingResult {
1595 retriggers: HashSet::from([(
1596 Bytes::from("0x0badc0ffee"),
1597 AddressStorageLocation::new(Bytes::from("0x0badc0ffee"), 12),
1598 )]),
1599 accessed_slots: HashMap::from([(
1600 Bytes::from("0x0badc0ffee"),
1601 HashSet::from([Bytes::from("0xbadbeef0")]),
1602 )]),
1603 },
1604 )]),
1605 },
1606 ..Default::default()
1607 },
1608 BlockChanges {
1609 extractor: "uniswap-v2".to_string(),
1610 chain: Chain::Ethereum,
1611 block: Block {
1612 number: 2,
1613 hash: Bytes::from("0x02"),
1614 parent_hash: Bytes::from("0x01"),
1615 chain: Chain::Ethereum,
1616 ts: Default::default(),
1617 },
1618 revert: false,
1619 component_tvl: [
1620 ("Component1".to_string(), 100.0),
1621 ("Component2".to_string(), 0.0),
1622 ("Component3".to_string(), 1000.0),
1623 ]
1624 .into_iter()
1625 .collect(),
1626 ..Default::default()
1627 },
1628 ];
1629 let mut state_sync = with_mocked_clients(true, true, Some(rpc_client), Some(deltas_client));
1630 state_sync
1631 .initialize()
1632 .await
1633 .expect("Init failed");
1634
1635 let (handle, mut rx) = state_sync.start().await;
1637 let (jh, close_tx) = handle.split();
1638 tx.send(deltas[0].clone())
1639 .await
1640 .expect("deltas channel msg 0 closed!");
1641 let first_msg = timeout(Duration::from_millis(100), rx.recv())
1642 .await
1643 .expect("waiting for first state msg timed out!")
1644 .expect("state sync block sender closed!");
1645 tx.send(deltas[1].clone())
1646 .await
1647 .expect("deltas channel msg 1 closed!");
1648 let second_msg = timeout(Duration::from_millis(100), rx.recv())
1649 .await
1650 .expect("waiting for second state msg timed out!")
1651 .expect("state sync block sender closed!");
1652 let _ = close_tx.send(());
1653 jh.await
1654 .expect("state sync task panicked!");
1655
1656 let exp1 = StateSyncMessage {
1658 header: BlockHeader {
1659 number: 1,
1660 hash: Bytes::from("0x01"),
1661 parent_hash: Bytes::from("0x00"),
1662 revert: false,
1663 ..Default::default()
1664 },
1665 snapshots: Snapshot {
1666 states: [
1667 (
1668 "Component1".to_string(),
1669 ComponentWithState {
1670 state: ResponseProtocolState {
1671 component_id: "Component1".to_string(),
1672 ..Default::default()
1673 },
1674 component: ProtocolComponent {
1675 id: "Component1".to_string(),
1676 ..Default::default()
1677 },
1678 component_tvl: Some(100.0),
1679 entrypoints: vec![],
1680 },
1681 ),
1682 (
1683 "Component2".to_string(),
1684 ComponentWithState {
1685 state: ResponseProtocolState {
1686 component_id: "Component2".to_string(),
1687 ..Default::default()
1688 },
1689 component: ProtocolComponent {
1690 id: "Component2".to_string(),
1691 ..Default::default()
1692 },
1693 component_tvl: Some(0.0),
1694 entrypoints: vec![],
1695 },
1696 ),
1697 ]
1698 .into_iter()
1699 .collect(),
1700 vm_storage: HashMap::new(),
1701 },
1702 deltas: Some(deltas[0].clone()),
1703 removed_components: Default::default(),
1704 };
1705
1706 let exp2 = StateSyncMessage {
1707 header: BlockHeader {
1708 number: 2,
1709 hash: Bytes::from("0x02"),
1710 parent_hash: Bytes::from("0x01"),
1711 revert: false,
1712 ..Default::default()
1713 },
1714 snapshots: Snapshot {
1715 states: [
1716 (
1718 "Component3".to_string(),
1719 ComponentWithState {
1720 state: ResponseProtocolState {
1721 component_id: "Component3".to_string(),
1722 ..Default::default()
1723 },
1724 component: ProtocolComponent {
1725 id: "Component3".to_string(),
1726 ..Default::default()
1727 },
1728 component_tvl: Some(1000.0),
1729 entrypoints: vec![],
1730 },
1731 ),
1732 ]
1733 .into_iter()
1734 .collect(),
1735 vm_storage: HashMap::new(),
1736 },
1737 deltas: Some(BlockChanges {
1740 extractor: "uniswap-v2".to_string(),
1741 chain: Chain::Ethereum,
1742 block: Block {
1743 number: 2,
1744 hash: Bytes::from("0x02"),
1745 parent_hash: Bytes::from("0x01"),
1746 chain: Chain::Ethereum,
1747 ts: Default::default(),
1748 },
1749 revert: false,
1750 component_tvl: [
1751 ("Component1".to_string(), 100.0),
1753 ("Component3".to_string(), 1000.0),
1754 ]
1755 .into_iter()
1756 .collect(),
1757 ..Default::default()
1758 }),
1759 removed_components: [(
1761 "Component2".to_string(),
1762 ProtocolComponent { id: "Component2".to_string(), ..Default::default() },
1763 )]
1764 .into_iter()
1765 .collect(),
1766 };
1767 assert_eq!(first_msg.unwrap(), exp1);
1768 assert_eq!(second_msg.unwrap(), exp2);
1769 }
1770
1771 #[test_log::test(tokio::test)]
1772 async fn test_state_sync_with_tvl_range() {
1773 let remove_tvl_threshold = 5.0;
1775 let add_tvl_threshold = 7.0;
1776
1777 let mut rpc_client = make_mock_client();
1778 let mut deltas_client = MockDeltasClient::new();
1779
1780 rpc_client
1781 .expect_get_protocol_components()
1782 .with(mockall::predicate::function(
1783 move |request_params: &ProtocolComponentsRequestBody| {
1784 if let Some(ids) = request_params.component_ids.as_ref() {
1785 ids.contains(&"Component3".to_string())
1786 } else {
1787 false
1788 }
1789 },
1790 ))
1791 .returning(|_| {
1792 Ok(ProtocolComponentRequestResponse {
1793 protocol_components: vec![ProtocolComponent {
1794 id: "Component3".to_string(),
1795 ..Default::default()
1796 }],
1797 pagination: PaginationResponse { page: 0, page_size: 20, total: 1 },
1798 })
1799 });
1800 rpc_client
1802 .expect_get_snapshots()
1803 .withf(
1804 |request: &SnapshotParameters,
1805 _chunk_size: &Option<usize>,
1806 _concurrency: &usize| {
1807 request
1808 .components
1809 .contains_key("Component3")
1810 },
1811 )
1812 .returning(|_request, _chunk_size, _concurrency| {
1813 Ok(Snapshot {
1814 states: [(
1815 "Component3".to_string(),
1816 ComponentWithState {
1817 state: ResponseProtocolState {
1818 component_id: "Component3".to_string(),
1819 ..Default::default()
1820 },
1821 component: ProtocolComponent {
1822 id: "Component3".to_string(),
1823 ..Default::default()
1824 },
1825 component_tvl: Some(10.0),
1826 entrypoints: vec![],
1827 },
1828 )]
1829 .into_iter()
1830 .collect(),
1831 vm_storage: HashMap::new(),
1832 })
1833 });
1834
1835 rpc_client
1837 .expect_get_protocol_components()
1838 .returning(|_| {
1839 Ok(ProtocolComponentRequestResponse {
1840 protocol_components: vec![
1841 ProtocolComponent { id: "Component1".to_string(), ..Default::default() },
1842 ProtocolComponent { id: "Component2".to_string(), ..Default::default() },
1843 ],
1844 pagination: PaginationResponse { page: 0, page_size: 20, total: 1 },
1845 })
1846 });
1847
1848 rpc_client
1850 .expect_get_snapshots()
1851 .returning(|_request, _chunk_size, _concurrency| {
1852 Ok(Snapshot {
1853 states: [
1854 (
1855 "Component1".to_string(),
1856 ComponentWithState {
1857 state: ResponseProtocolState {
1858 component_id: "Component1".to_string(),
1859 ..Default::default()
1860 },
1861 component: ProtocolComponent {
1862 id: "Component1".to_string(),
1863 ..Default::default()
1864 },
1865 component_tvl: Some(6.0),
1866 entrypoints: vec![],
1867 },
1868 ),
1869 (
1870 "Component2".to_string(),
1871 ComponentWithState {
1872 state: ResponseProtocolState {
1873 component_id: "Component2".to_string(),
1874 ..Default::default()
1875 },
1876 component: ProtocolComponent {
1877 id: "Component2".to_string(),
1878 ..Default::default()
1879 },
1880 component_tvl: Some(2.0),
1881 entrypoints: vec![],
1882 },
1883 ),
1884 ]
1885 .into_iter()
1886 .collect(),
1887 vm_storage: HashMap::new(),
1888 })
1889 });
1890
1891 rpc_client
1893 .expect_get_traced_entry_points()
1894 .returning(|_| {
1895 Ok(TracedEntryPointRequestResponse {
1896 traced_entry_points: HashMap::new(),
1897 pagination: PaginationResponse { page: 0, page_size: 100, total: 0 },
1898 })
1899 });
1900
1901 let (tx, rx) = channel(1);
1902 deltas_client
1903 .expect_subscribe()
1904 .return_once(move |_, _| Ok((Uuid::default(), rx)));
1905
1906 deltas_client
1908 .expect_unsubscribe()
1909 .return_once(|_| Ok(()));
1910
1911 let mut state_sync = ProtocolStateSynchronizer::new(
1912 ExtractorIdentity::new(Chain::Ethereum, "uniswap-v2"),
1913 true,
1914 ComponentFilter::with_tvl_range(remove_tvl_threshold, add_tvl_threshold),
1915 1,
1916 Duration::from_secs(0),
1917 true,
1918 true,
1919 true,
1920 ArcRPCClient(Arc::new(rpc_client)),
1921 ArcDeltasClient(Arc::new(deltas_client)),
1922 10_u64,
1923 );
1924 state_sync
1925 .initialize()
1926 .await
1927 .expect("Init failed");
1928
1929 let deltas = [
1931 BlockChanges {
1932 extractor: "uniswap-v2".to_string(),
1933 chain: Chain::Ethereum,
1934 block: Block {
1935 number: 1,
1936 hash: Bytes::from("0x01"),
1937 parent_hash: Bytes::from("0x00"),
1938 chain: Chain::Ethereum,
1939 ts: Default::default(),
1940 },
1941 revert: false,
1942 ..Default::default()
1943 },
1944 BlockChanges {
1945 extractor: "uniswap-v2".to_string(),
1946 chain: Chain::Ethereum,
1947 block: Block {
1948 number: 2,
1949 hash: Bytes::from("0x02"),
1950 parent_hash: Bytes::from("0x01"),
1951 chain: Chain::Ethereum,
1952 ts: Default::default(),
1953 },
1954 revert: false,
1955 component_tvl: [
1956 ("Component1".to_string(), 6.0), ("Component2".to_string(), 2.0), ("Component3".to_string(), 10.0), ]
1960 .into_iter()
1961 .collect(),
1962 ..Default::default()
1963 },
1964 ];
1965
1966 let (handle, mut rx) = state_sync.start().await;
1967 let (jh, close_tx) = handle.split();
1968
1969 tx.send(deltas[0].clone())
1971 .await
1972 .expect("deltas channel msg 0 closed!");
1973
1974 let _ = timeout(Duration::from_millis(100), rx.recv())
1976 .await
1977 .expect("waiting for first state msg timed out!")
1978 .expect("state sync block sender closed!");
1979
1980 tx.send(deltas[1].clone())
1982 .await
1983 .expect("deltas channel msg 1 closed!");
1984 let second_msg = timeout(Duration::from_millis(100), rx.recv())
1985 .await
1986 .expect("waiting for second state msg timed out!")
1987 .expect("state sync block sender closed!")
1988 .expect("no error");
1989
1990 let _ = close_tx.send(());
1991 jh.await
1992 .expect("state sync task panicked!");
1993
1994 let expected_second_msg = StateSyncMessage {
1995 header: BlockHeader {
1996 number: 2,
1997 hash: Bytes::from("0x02"),
1998 parent_hash: Bytes::from("0x01"),
1999 revert: false,
2000 ..Default::default()
2001 },
2002 snapshots: Snapshot {
2003 states: [(
2004 "Component3".to_string(),
2005 ComponentWithState {
2006 state: ResponseProtocolState {
2007 component_id: "Component3".to_string(),
2008 ..Default::default()
2009 },
2010 component: ProtocolComponent {
2011 id: "Component3".to_string(),
2012 ..Default::default()
2013 },
2014 component_tvl: Some(10.0),
2015 entrypoints: vec![], },
2017 )]
2018 .into_iter()
2019 .collect(),
2020 vm_storage: HashMap::new(),
2021 },
2022 deltas: Some(BlockChanges {
2023 extractor: "uniswap-v2".to_string(),
2024 chain: Chain::Ethereum,
2025 block: Block {
2026 number: 2,
2027 hash: Bytes::from("0x02"),
2028 parent_hash: Bytes::from("0x01"),
2029 chain: Chain::Ethereum,
2030 ts: Default::default(),
2031 },
2032 revert: false,
2033 component_tvl: [
2034 ("Component1".to_string(), 6.0), ("Component3".to_string(), 10.0), ]
2037 .into_iter()
2038 .collect(),
2039 ..Default::default()
2040 }),
2041 removed_components: [(
2042 "Component2".to_string(),
2043 ProtocolComponent { id: "Component2".to_string(), ..Default::default() },
2044 )]
2045 .into_iter()
2046 .collect(),
2047 };
2048
2049 assert_eq!(second_msg, expected_second_msg);
2050 }
2051
2052 #[test_log::test(tokio::test)]
2053 async fn test_public_close_api_functionality() {
2054 let mut rpc_client = make_mock_client();
2061 let mut deltas_client = MockDeltasClient::new();
2062
2063 rpc_client
2065 .expect_get_protocol_components()
2066 .returning(|_| {
2067 Ok(ProtocolComponentRequestResponse {
2068 protocol_components: vec![],
2069 pagination: PaginationResponse { page: 0, page_size: 20, total: 0 },
2070 })
2071 });
2072
2073 let (_tx, rx) = channel(1);
2075 deltas_client
2076 .expect_subscribe()
2077 .return_once(move |_, _| Ok((Uuid::default(), rx)));
2078
2079 deltas_client
2081 .expect_unsubscribe()
2082 .return_once(|_| Ok(()));
2083
2084 let mut state_sync = ProtocolStateSynchronizer::new(
2085 ExtractorIdentity::new(Chain::Ethereum, "test-protocol"),
2086 true,
2087 ComponentFilter::with_tvl_range(0.0, 0.0),
2088 5, Duration::from_secs(0),
2090 true,
2091 false,
2092 true,
2093 ArcRPCClient(Arc::new(rpc_client)),
2094 ArcDeltasClient(Arc::new(deltas_client)),
2095 10000_u64, );
2097
2098 state_sync
2099 .initialize()
2100 .await
2101 .expect("Init should succeed");
2102
2103 let (handle, _rx) = state_sync.start().await;
2105 let (jh, close_tx) = handle.split();
2106
2107 tokio::time::sleep(Duration::from_millis(100)).await;
2109
2110 close_tx
2112 .send(())
2113 .expect("Should be able to send close signal");
2114 jh.await.expect("Task should not panic");
2116 }
2117
2118 #[test_log::test(tokio::test)]
2119 async fn test_cleanup_runs_when_state_sync_processing_errors() {
2120 let mut rpc_client = make_mock_client();
2125 let mut deltas_client = MockDeltasClient::new();
2126
2127 rpc_client
2129 .expect_get_protocol_components()
2130 .returning(|_| {
2131 Ok(ProtocolComponentRequestResponse {
2132 protocol_components: vec![],
2133 pagination: PaginationResponse { page: 0, page_size: 20, total: 0 },
2134 })
2135 });
2136
2137 rpc_client
2139 .expect_get_protocol_states()
2140 .returning(|_| {
2141 Err(RPCError::ParseResponse("Test error during snapshot retrieval".to_string()))
2142 });
2143
2144 let (tx, rx) = channel(10);
2146 deltas_client
2147 .expect_subscribe()
2148 .return_once(move |_, _| {
2149 let delta = BlockChanges {
2151 extractor: "test".to_string(),
2152 chain: Chain::Ethereum,
2153 block: Block {
2154 hash: Bytes::from("0x0123"),
2155 number: 1,
2156 parent_hash: Bytes::from("0x0000"),
2157 chain: Chain::Ethereum,
2158 ts: chrono::DateTime::from_timestamp(1234567890, 0)
2159 .unwrap()
2160 .naive_utc(),
2161 },
2162 revert: false,
2163 new_protocol_components: [(
2165 "new_component".to_string(),
2166 ProtocolComponent {
2167 id: "new_component".to_string(),
2168 protocol_system: "test_protocol".to_string(),
2169 protocol_type_name: "test".to_string(),
2170 chain: Chain::Ethereum,
2171 tokens: vec![Bytes::from("0x0badc0ffee")],
2172 contract_ids: vec![Bytes::from("0x0badc0ffee")],
2173 static_attributes: Default::default(),
2174 creation_tx: Default::default(),
2175 created_at: Default::default(),
2176 change: Default::default(),
2177 },
2178 )]
2179 .into_iter()
2180 .collect(),
2181 component_tvl: [("new_component".to_string(), 100.0)]
2182 .into_iter()
2183 .collect(),
2184 ..Default::default()
2185 };
2186
2187 tokio::spawn(async move {
2188 let _ = tx.send(delta).await;
2189 });
2191
2192 Ok((Uuid::default(), rx))
2193 });
2194
2195 deltas_client
2197 .expect_unsubscribe()
2198 .return_once(|_| Ok(()));
2199
2200 let mut state_sync = ProtocolStateSynchronizer::new(
2201 ExtractorIdentity::new(Chain::Ethereum, "test-protocol"),
2202 true,
2203 ComponentFilter::with_tvl_range(0.0, 1000.0), 1,
2205 Duration::from_secs(0),
2206 true,
2207 false,
2208 true,
2209 ArcRPCClient(Arc::new(rpc_client)),
2210 ArcDeltasClient(Arc::new(deltas_client)),
2211 5000_u64,
2212 );
2213
2214 state_sync
2215 .initialize()
2216 .await
2217 .expect("Init should succeed");
2218
2219 state_sync.last_synced_block = Some(BlockHeader {
2221 hash: Bytes::from("0x0badc0ffee"),
2222 number: 42,
2223 parent_hash: Bytes::from("0xbadbeef0"),
2224 revert: false,
2225 timestamp: 123456789,
2226 partial_block_index: None,
2227 });
2228
2229 let (mut block_tx, _block_rx) = channel(10);
2231
2232 let (_end_tx, end_rx) = oneshot::channel::<()>();
2234 let result = state_sync
2235 .state_sync(&mut block_tx, end_rx)
2236 .await;
2237 assert!(result.is_err(), "state_sync should have errored during processing");
2239
2240 }
2243
2244 #[test_log::test(tokio::test)]
2245 async fn test_close_signal_while_waiting_for_first_deltas() {
2246 let mut rpc_client = make_mock_client();
2250 let mut deltas_client = MockDeltasClient::new();
2251
2252 rpc_client
2253 .expect_get_protocol_components()
2254 .returning(|_| {
2255 Ok(ProtocolComponentRequestResponse {
2256 protocol_components: vec![],
2257 pagination: PaginationResponse { page: 0, page_size: 20, total: 0 },
2258 })
2259 });
2260
2261 let (_tx, rx) = channel(1);
2262 deltas_client
2263 .expect_subscribe()
2264 .return_once(move |_, _| Ok((Uuid::default(), rx)));
2265
2266 deltas_client
2267 .expect_unsubscribe()
2268 .return_once(|_| Ok(()));
2269
2270 let mut state_sync = ProtocolStateSynchronizer::new(
2271 ExtractorIdentity::new(Chain::Ethereum, "test-protocol"),
2272 true,
2273 ComponentFilter::with_tvl_range(0.0, 0.0),
2274 1,
2275 Duration::from_secs(0),
2276 true,
2277 true,
2278 false,
2279 ArcRPCClient(Arc::new(rpc_client)),
2280 ArcDeltasClient(Arc::new(deltas_client)),
2281 10000_u64,
2282 );
2283
2284 state_sync
2285 .initialize()
2286 .await
2287 .expect("Init should succeed");
2288
2289 let (mut block_tx, _block_rx) = channel(10);
2290 let (end_tx, end_rx) = oneshot::channel::<()>();
2291
2292 let state_sync_handle = tokio::spawn(async move {
2294 state_sync
2295 .state_sync(&mut block_tx, end_rx)
2296 .await
2297 });
2298
2299 tokio::time::sleep(Duration::from_millis(100)).await;
2301
2302 let _ = end_tx.send(());
2304
2305 let result = state_sync_handle
2307 .await
2308 .expect("Task should not panic");
2309 assert!(result.is_ok(), "state_sync should exit cleanly when closed: {result:?}");
2310
2311 println!("SUCCESS: Close signal handled correctly while waiting for first deltas");
2312 }
2313
2314 #[test_log::test(tokio::test)]
2315 async fn test_close_signal_during_main_processing_loop() {
2316 let mut rpc_client = make_mock_client();
2322 let mut deltas_client = MockDeltasClient::new();
2323
2324 rpc_client
2326 .expect_get_protocol_components()
2327 .returning(|_| {
2328 Ok(ProtocolComponentRequestResponse {
2329 protocol_components: vec![],
2330 pagination: PaginationResponse { page: 0, page_size: 20, total: 0 },
2331 })
2332 });
2333
2334 rpc_client
2336 .expect_get_protocol_states()
2337 .returning(|_| {
2338 Ok(ProtocolStateRequestResponse {
2339 states: vec![],
2340 pagination: PaginationResponse { page: 0, page_size: 20, total: 0 },
2341 })
2342 });
2343
2344 rpc_client
2345 .expect_get_component_tvl()
2346 .returning(|_| {
2347 Ok(ComponentTvlRequestResponse {
2348 tvl: HashMap::new(),
2349 pagination: PaginationResponse { page: 0, page_size: 20, total: 0 },
2350 })
2351 });
2352
2353 rpc_client
2354 .expect_get_traced_entry_points()
2355 .returning(|_| {
2356 Ok(TracedEntryPointRequestResponse {
2357 traced_entry_points: HashMap::new(),
2358 pagination: PaginationResponse::new(0, 20, 0),
2359 })
2360 });
2361
2362 let (tx, rx) = channel(10);
2364 deltas_client
2365 .expect_subscribe()
2366 .return_once(move |_, _| {
2367 let first_delta = BlockChanges {
2369 extractor: "test".to_string(),
2370 chain: Chain::Ethereum,
2371 block: Block {
2372 hash: Bytes::from("0x0123"),
2373 number: 1,
2374 parent_hash: Bytes::from("0x0000"),
2375 chain: Chain::Ethereum,
2376 ts: chrono::DateTime::from_timestamp(1234567890, 0)
2377 .unwrap()
2378 .naive_utc(),
2379 },
2380 revert: false,
2381 ..Default::default()
2382 };
2383
2384 tokio::spawn(async move {
2385 let _ = tx.send(first_delta).await;
2386 tokio::time::sleep(Duration::from_secs(30)).await;
2389 });
2390
2391 Ok((Uuid::default(), rx))
2392 });
2393
2394 deltas_client
2395 .expect_unsubscribe()
2396 .return_once(|_| Ok(()));
2397
2398 let mut state_sync = ProtocolStateSynchronizer::new(
2399 ExtractorIdentity::new(Chain::Ethereum, "test-protocol"),
2400 true,
2401 ComponentFilter::with_tvl_range(0.0, 1000.0),
2402 1,
2403 Duration::from_secs(0),
2404 true,
2405 false,
2406 true,
2407 ArcRPCClient(Arc::new(rpc_client)),
2408 ArcDeltasClient(Arc::new(deltas_client)),
2409 10000_u64,
2410 );
2411
2412 state_sync
2413 .initialize()
2414 .await
2415 .expect("Init should succeed");
2416
2417 let (mut block_tx, mut block_rx) = channel(10);
2418 let (end_tx, end_rx) = oneshot::channel::<()>();
2419
2420 let state_sync_handle = tokio::spawn(async move {
2422 state_sync
2423 .state_sync(&mut block_tx, end_rx)
2424 .await
2425 });
2426
2427 let first_snapshot = block_rx
2429 .recv()
2430 .await
2431 .expect("Should receive first snapshot")
2432 .expect("Synchronizer error");
2433 assert!(
2434 !first_snapshot
2435 .snapshots
2436 .states
2437 .is_empty() ||
2438 first_snapshot.deltas.is_some()
2439 );
2440 let _ = end_tx.send(());
2442
2443 let result = state_sync_handle
2445 .await
2446 .expect("Task should not panic");
2447 assert!(
2448 result.is_ok(),
2449 "state_sync should exit cleanly when closed after first message: {result:?}"
2450 );
2451 }
2452
2453 #[test_log::test(tokio::test)]
2454 async fn test_max_retries_exceeded_error_propagation() {
2455 let mut rpc_client = make_mock_client();
2459 let mut deltas_client = MockDeltasClient::new();
2460
2461 rpc_client
2463 .expect_get_protocol_components()
2464 .returning(|_| {
2465 Ok(ProtocolComponentRequestResponse {
2466 protocol_components: vec![],
2467 pagination: PaginationResponse { page: 0, page_size: 20, total: 0 },
2468 })
2469 });
2470
2471 deltas_client
2474 .expect_subscribe()
2475 .returning(|_, _| {
2476 Err(DeltasError::NotConnected)
2478 });
2479
2480 deltas_client
2482 .expect_unsubscribe()
2483 .returning(|_| Ok(()))
2484 .times(0..=5);
2485
2486 let mut state_sync = ProtocolStateSynchronizer::new(
2488 ExtractorIdentity::new(Chain::Ethereum, "test-protocol"),
2489 true,
2490 ComponentFilter::with_tvl_range(0.0, 1000.0),
2491 2, Duration::from_millis(10), true,
2494 false,
2495 true,
2496 ArcRPCClient(Arc::new(rpc_client)),
2497 ArcDeltasClient(Arc::new(deltas_client)),
2498 1000_u64,
2499 );
2500
2501 state_sync
2502 .initialize()
2503 .await
2504 .expect("Init should succeed");
2505
2506 let (handle, mut rx) = state_sync.start().await;
2508 let (jh, _close_tx) = handle.split();
2509
2510 let res = tokio::time::timeout(Duration::from_millis(100), rx.recv())
2511 .await
2512 .expect("responsds in time")
2513 .expect("channel open");
2514
2515 if let Err(err) = res {
2517 assert!(
2518 matches!(err, SynchronizerError::ConnectionClosed),
2519 "Expected ConnectionClosed error, got: {:?}",
2520 err
2521 );
2522 } else {
2523 panic!("Expected an error")
2524 }
2525
2526 let task_result = tokio::time::timeout(Duration::from_secs(2), jh).await;
2528 assert!(task_result.is_ok(), "Synchronizer task should complete after max retries");
2529 }
2530
2531 #[test_log::test(tokio::test)]
2532 async fn test_is_next_expected() {
2533 let mut state_sync = with_mocked_clients(true, false, None, None);
2537
2538 let incoming_header = BlockHeader {
2540 number: 100,
2541 hash: Bytes::from("0x0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"),
2542 parent_hash: Bytes::from(
2543 "0x0000000000000000000000000000000000000000000000000000000000000000",
2544 ),
2545 revert: false,
2546 timestamp: 123456789,
2547 partial_block_index: None,
2548 };
2549 assert!(
2550 !state_sync.is_next_expected(&incoming_header),
2551 "Should return false when no previous block is set"
2552 );
2553
2554 let previous_header = BlockHeader {
2556 number: 99,
2557 hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000000"),
2558 parent_hash: Bytes::from(
2559 "0xabcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789",
2560 ),
2561 revert: false,
2562 timestamp: 123456788,
2563 partial_block_index: None,
2564 };
2565 state_sync.last_synced_block = Some(previous_header.clone());
2566
2567 assert!(
2568 state_sync.is_next_expected(&incoming_header),
2569 "Should return true when incoming parent_hash matches previous hash"
2570 );
2571
2572 let non_matching_header = BlockHeader {
2574 number: 100,
2575 hash: Bytes::from("0x0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"),
2576 parent_hash: Bytes::from(
2577 "0x1111111111111111111111111111111111111111111111111111111111111111",
2578 ), revert: false,
2580 timestamp: 123456789,
2581 partial_block_index: None,
2582 };
2583 assert!(
2584 !state_sync.is_next_expected(&non_matching_header),
2585 "Should return false when incoming parent_hash doesn't match previous hash"
2586 );
2587 }
2588
2589 #[test_log::test(tokio::test)]
2590 async fn test_synchronizer_restart_skip_snapshot_on_expected_block() {
2591 let mut rpc_client = make_mock_client();
2595 let mut deltas_client = MockDeltasClient::new();
2596
2597 rpc_client
2599 .expect_get_protocol_components()
2600 .returning(|_| {
2601 Ok(ProtocolComponentRequestResponse {
2602 protocol_components: vec![ProtocolComponent {
2603 id: "Component1".to_string(),
2604 ..Default::default()
2605 }],
2606 pagination: PaginationResponse { page: 0, page_size: 20, total: 1 },
2607 })
2608 });
2609
2610 let (tx, rx) = channel(10);
2612 deltas_client
2613 .expect_subscribe()
2614 .return_once(move |_, _| {
2615 let expected_next_delta = BlockChanges {
2616 extractor: "uniswap-v2".to_string(),
2617 chain: Chain::Ethereum,
2618 block: Block {
2619 hash: Bytes::from(
2620 "0x0000000000000000000000000000000000000000000000000000000000000002",
2621 ), number: 2,
2623 parent_hash: Bytes::from(
2624 "0x0000000000000000000000000000000000000000000000000000000000000001",
2625 ), chain: Chain::Ethereum,
2627 ts: chrono::DateTime::from_timestamp(1234567890, 0)
2628 .unwrap()
2629 .naive_utc(),
2630 },
2631 revert: false,
2632 ..Default::default()
2633 };
2634
2635 tokio::spawn(async move {
2636 let _ = tx.send(expected_next_delta).await;
2637 });
2638
2639 Ok((Uuid::default(), rx))
2640 });
2641
2642 deltas_client
2643 .expect_unsubscribe()
2644 .return_once(|_| Ok(()));
2645
2646 let mut state_sync = ProtocolStateSynchronizer::new(
2647 ExtractorIdentity::new(Chain::Ethereum, "uniswap-v2"),
2648 true,
2649 ComponentFilter::with_tvl_range(0.0, 1000.0),
2650 1,
2651 Duration::from_secs(0),
2652 true, false,
2654 true,
2655 ArcRPCClient(Arc::new(rpc_client)),
2656 ArcDeltasClient(Arc::new(deltas_client)),
2657 10000_u64,
2658 );
2659
2660 state_sync
2662 .initialize()
2663 .await
2664 .expect("Init should succeed");
2665
2666 state_sync.last_synced_block = Some(BlockHeader {
2668 number: 1,
2669 hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000001"), parent_hash: Bytes::from(
2671 "0x0000000000000000000000000000000000000000000000000000000000000000",
2672 ),
2673 revert: false,
2674 timestamp: 123456789,
2675 partial_block_index: None,
2676 });
2677
2678 let (mut block_tx, mut block_rx) = channel(10);
2679 let (end_tx, end_rx) = oneshot::channel::<()>();
2680
2681 let state_sync_handle = tokio::spawn(async move {
2683 state_sync
2684 .state_sync(&mut block_tx, end_rx)
2685 .await
2686 });
2687
2688 let result_msg = timeout(Duration::from_millis(200), block_rx.recv())
2690 .await
2691 .expect("Should receive message within timeout")
2692 .expect("Channel should be open")
2693 .expect("Should not be an error");
2694
2695 let _ = end_tx.send(());
2697
2698 let _ = state_sync_handle
2700 .await
2701 .expect("Task should not panic");
2702
2703 assert!(result_msg.deltas.is_some(), "Should contain deltas");
2706 assert!(
2707 result_msg.snapshots.states.is_empty(),
2708 "Should not contain snapshots when next expected block is received"
2709 );
2710
2711 if let Some(deltas) = &result_msg.deltas {
2713 assert_eq!(deltas.block.number, 2);
2714 assert_eq!(
2715 deltas.block.hash,
2716 Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000002")
2717 );
2718 assert_eq!(
2719 deltas.block.parent_hash,
2720 Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000001")
2721 );
2722 }
2723 }
2724
2725 #[test_log::test(tokio::test)]
2726 async fn test_skip_previously_processed_messages() {
2727 let mut rpc_client = make_mock_client();
2731 let mut deltas_client = MockDeltasClient::new();
2732
2733 rpc_client
2735 .expect_get_protocol_components()
2736 .returning(|_| {
2737 Ok(ProtocolComponentRequestResponse {
2738 protocol_components: vec![ProtocolComponent {
2739 id: "Component1".to_string(),
2740 ..Default::default()
2741 }],
2742 pagination: PaginationResponse { page: 0, page_size: 20, total: 1 },
2743 })
2744 });
2745
2746 rpc_client
2748 .expect_get_protocol_states()
2749 .returning(|_| {
2750 Ok(ProtocolStateRequestResponse {
2751 states: vec![ResponseProtocolState {
2752 component_id: "Component1".to_string(),
2753 ..Default::default()
2754 }],
2755 pagination: PaginationResponse { page: 0, page_size: 20, total: 1 },
2756 })
2757 });
2758
2759 rpc_client
2760 .expect_get_component_tvl()
2761 .returning(|_| {
2762 Ok(ComponentTvlRequestResponse {
2763 tvl: [("Component1".to_string(), 100.0)]
2764 .into_iter()
2765 .collect(),
2766 pagination: PaginationResponse { page: 0, page_size: 20, total: 1 },
2767 })
2768 });
2769
2770 rpc_client
2771 .expect_get_traced_entry_points()
2772 .returning(|_| {
2773 Ok(TracedEntryPointRequestResponse {
2774 traced_entry_points: HashMap::new(),
2775 pagination: PaginationResponse::new(0, 20, 0),
2776 })
2777 });
2778
2779 let (tx, rx) = channel(10);
2781 deltas_client
2782 .expect_subscribe()
2783 .return_once(move |_, _| {
2784 let old_messages = vec![
2786 BlockChanges {
2787 extractor: "uniswap-v2".to_string(),
2788 chain: Chain::Ethereum,
2789 block: Block {
2790 hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000003"),
2791 number: 3,
2792 parent_hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000002"),
2793 chain: Chain::Ethereum,
2794 ts: chrono::DateTime::from_timestamp(1234567890, 0).unwrap().naive_utc(),
2795 },
2796 revert: false,
2797 ..Default::default()
2798 },
2799 BlockChanges {
2800 extractor: "uniswap-v2".to_string(),
2801 chain: Chain::Ethereum,
2802 block: Block {
2803 hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000004"),
2804 number: 4,
2805 parent_hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000003"),
2806 chain: Chain::Ethereum,
2807 ts: chrono::DateTime::from_timestamp(1234567891, 0).unwrap().naive_utc(),
2808 },
2809 revert: false,
2810 ..Default::default()
2811 },
2812 BlockChanges {
2813 extractor: "uniswap-v2".to_string(),
2814 chain: Chain::Ethereum,
2815 block: Block {
2816 hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000005"),
2817 number: 5,
2818 parent_hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000004"),
2819 chain: Chain::Ethereum,
2820 ts: chrono::DateTime::from_timestamp(1234567892, 0).unwrap().naive_utc(),
2821 },
2822 revert: false,
2823 ..Default::default()
2824 },
2825 BlockChanges {
2827 extractor: "uniswap-v2".to_string(),
2828 chain: Chain::Ethereum,
2829 block: Block {
2830 hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000006"),
2831 number: 6,
2832 parent_hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000005"),
2833 chain: Chain::Ethereum,
2834 ts: chrono::DateTime::from_timestamp(1234567893, 0).unwrap().naive_utc(),
2835 },
2836 revert: false,
2837 ..Default::default()
2838 },
2839 ];
2840
2841 tokio::spawn(async move {
2842 for message in old_messages {
2843 let _ = tx.send(message).await;
2844 tokio::time::sleep(Duration::from_millis(10)).await;
2845 }
2846 });
2847
2848 Ok((Uuid::default(), rx))
2849 });
2850
2851 deltas_client
2852 .expect_unsubscribe()
2853 .return_once(|_| Ok(()));
2854
2855 let mut state_sync = ProtocolStateSynchronizer::new(
2856 ExtractorIdentity::new(Chain::Ethereum, "uniswap-v2"),
2857 true,
2858 ComponentFilter::with_tvl_range(0.0, 1000.0),
2859 1,
2860 Duration::from_secs(0),
2861 true,
2862 true,
2863 true,
2864 ArcRPCClient(Arc::new(rpc_client)),
2865 ArcDeltasClient(Arc::new(deltas_client)),
2866 10000_u64,
2867 );
2868
2869 state_sync
2871 .initialize()
2872 .await
2873 .expect("Init should succeed");
2874
2875 state_sync.last_synced_block = Some(BlockHeader {
2876 number: 5,
2877 hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000005"),
2878 parent_hash: Bytes::from(
2879 "0x0000000000000000000000000000000000000000000000000000000000000004",
2880 ),
2881 revert: false,
2882 timestamp: 1234567892,
2883 partial_block_index: None,
2884 });
2885
2886 let (mut block_tx, mut block_rx) = channel(10);
2887 let (end_tx, end_rx) = oneshot::channel::<()>();
2888
2889 let state_sync_handle = tokio::spawn(async move {
2891 state_sync
2892 .state_sync(&mut block_tx, end_rx)
2893 .await
2894 });
2895
2896 let result_msg = timeout(Duration::from_millis(500), block_rx.recv())
2898 .await
2899 .expect("Should receive message within timeout")
2900 .expect("Channel should be open")
2901 .expect("Should not be an error");
2902
2903 let _ = end_tx.send(());
2905
2906 let _ = state_sync_handle
2908 .await
2909 .expect("Task should not panic");
2910
2911 assert!(result_msg.deltas.is_some(), "Should contain deltas");
2913 if let Some(deltas) = &result_msg.deltas {
2914 assert_eq!(
2915 deltas.block.number, 6,
2916 "Should only process block 6, skipping earlier blocks"
2917 );
2918 assert_eq!(
2919 deltas.block.hash,
2920 Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000006")
2921 );
2922 }
2923
2924 match timeout(Duration::from_millis(50), block_rx.recv()).await {
2927 Err(_) => {
2928 }
2930 Ok(Some(Err(_))) => {
2931 }
2933 Ok(Some(Ok(_))) => {
2934 panic!("Should not receive additional messages - old blocks should be skipped");
2935 }
2936 Ok(None) => {
2937 }
2939 }
2940 }
2941
2942 fn make_block_changes(block_num: u64, partial_idx: Option<u32>) -> BlockChanges {
2943 let hash = Bytes::from(vec![block_num as u8; 32]);
2945 let parent_hash = Bytes::from(vec![block_num.saturating_sub(1) as u8; 32]);
2946 BlockChanges {
2947 extractor: "uniswap-v2".to_string(),
2948 chain: Chain::Ethereum,
2949 block: Block {
2950 number: block_num,
2951 hash,
2952 parent_hash,
2953 chain: Chain::Ethereum,
2954 ts: Default::default(),
2955 },
2956 revert: false,
2957 partial_block_index: partial_idx,
2958 ..Default::default()
2959 }
2960 }
2961
2962 #[test_log::test(tokio::test)]
2964 async fn test_partial_mode_accepts_full_block_as_first_message() {
2965 let (rpc_client, deltas_client, tx) = mock_clients_for_state_sync();
2966 let mut state_sync = with_mocked_clients(true, true, Some(rpc_client), Some(deltas_client))
2967 .with_partial_blocks(true);
2968 state_sync
2969 .initialize()
2970 .await
2971 .expect("Init failed");
2972
2973 let (handle, mut block_rx) = state_sync.start().await;
2974 let (jh, close_tx) = handle.split();
2975
2976 tx.send(make_block_changes(1, None))
2978 .await
2979 .unwrap();
2980
2981 let msg = timeout(Duration::from_millis(100), block_rx.recv())
2983 .await
2984 .expect("Should receive message")
2985 .expect("Channel open")
2986 .expect("No error");
2987
2988 assert_eq!(msg.header.number, 1, "Should use block 1 (full block)");
2989 assert_eq!(msg.header.partial_block_index, None, "Should be a full block");
2990
2991 let _ = close_tx.send(());
2992 jh.await.expect("Task should not panic");
2993 }
2994
2995 #[test_log::test(tokio::test)]
2997 async fn test_partial_mode_detects_block_number_increase() {
2998 let (rpc_client, deltas_client, tx) = mock_clients_for_state_sync();
2999 let mut state_sync = with_mocked_clients(true, true, Some(rpc_client), Some(deltas_client))
3000 .with_partial_blocks(true);
3001 state_sync
3002 .initialize()
3003 .await
3004 .expect("Init failed");
3005
3006 let (handle, mut block_rx) = state_sync.start().await;
3007 let (jh, close_tx) = handle.split();
3008
3009 tx.send(make_block_changes(1, Some(0)))
3011 .await
3012 .unwrap();
3013 tx.send(make_block_changes(1, Some(3)))
3014 .await
3015 .unwrap();
3016
3017 match timeout(Duration::from_millis(50), block_rx.recv()).await {
3019 Err(_) => { }
3020 Ok(_) => panic!("Should not receive message while waiting for new block"),
3021 }
3022
3023 tx.send(make_block_changes(2, Some(5)))
3026 .await
3027 .unwrap();
3028
3029 let msg = timeout(Duration::from_millis(100), block_rx.recv())
3031 .await
3032 .expect("Should receive message")
3033 .expect("Channel open")
3034 .expect("No error");
3035
3036 assert_eq!(msg.header.number, 2, "Should use block 2 (block number increased)");
3037 assert_eq!(msg.header.partial_block_index, Some(5));
3038
3039 let _ = close_tx.send(());
3040 jh.await.expect("Task should not panic");
3041 }
3042
3043 #[test_log::test(tokio::test)]
3045 async fn test_partial_mode_skips_already_synced_blocks() {
3046 let (rpc_client, deltas_client, tx) = mock_clients_for_state_sync();
3047 let mut state_sync = with_mocked_clients(true, true, Some(rpc_client), Some(deltas_client))
3048 .with_partial_blocks(true);
3049 state_sync
3050 .initialize()
3051 .await
3052 .expect("Init failed");
3053
3054 state_sync.last_synced_block = Some(BlockHeader {
3056 number: 5,
3057 hash: Bytes::from("0x05"),
3058 parent_hash: Bytes::from("0x04"),
3059 revert: false,
3060 timestamp: 0,
3061 partial_block_index: None,
3062 });
3063
3064 let (handle, mut block_rx) = state_sync.start().await;
3065 let (jh, close_tx) = handle.split();
3066
3067 tx.send(make_block_changes(3, Some(2)))
3069 .await
3070 .unwrap();
3071
3072 tx.send(make_block_changes(4, Some(0)))
3074 .await
3075 .unwrap();
3076
3077 match timeout(Duration::from_millis(50), block_rx.recv()).await {
3079 Err(_) => { }
3080 Ok(_) => panic!("Should skip block 4 because it's already synced"),
3081 }
3082
3083 tx.send(make_block_changes(5, Some(3)))
3086 .await
3087 .unwrap();
3088 tx.send(make_block_changes(6, Some(0)))
3090 .await
3091 .unwrap();
3092
3093 let msg = timeout(Duration::from_millis(100), block_rx.recv())
3094 .await
3095 .expect("Should receive message")
3096 .expect("Channel open")
3097 .expect("No error");
3098
3099 assert_eq!(msg.header.number, 6, "Should use block 6 (after last synced)");
3100
3101 let _ = close_tx.send(());
3102 jh.await.expect("Task should not panic");
3103 }
3104
3105 #[test_log::test(tokio::test)]
3106 async fn test_get_snapshots_skips_entrypoints_when_not_dci() {
3107 let header = BlockHeader::default();
3108 let mut rpc = make_mock_client();
3109 let component = ProtocolComponent { id: "Component1".to_string(), ..Default::default() };
3110
3111 let component_clone = component.clone();
3112 rpc.expect_get_snapshots()
3113 .returning(move |_request, _chunk_size, _concurrency| {
3114 Ok(Snapshot {
3115 states: [(
3116 "Component1".to_string(),
3117 ComponentWithState {
3118 state: ResponseProtocolState {
3119 component_id: "Component1".to_string(),
3120 ..Default::default()
3121 },
3122 component: component_clone.clone(),
3123 entrypoints: vec![],
3124 component_tvl: None,
3125 },
3126 )]
3127 .into_iter()
3128 .collect(),
3129 vm_storage: HashMap::new(),
3130 })
3131 });
3132
3133 rpc.expect_get_traced_entry_points()
3135 .never();
3136
3137 let mut state_sync = with_mocked_clients(true, false, Some(rpc), None);
3138 state_sync
3140 .component_tracker
3141 .components
3142 .insert("Component1".to_string(), component);
3143
3144 let components_arg = ["Component1".to_string()];
3145 let snap = state_sync
3146 .get_snapshots(header, Some(&components_arg))
3147 .await
3148 .expect("Retrieving snapshot failed");
3149
3150 assert!(snap
3151 .snapshots
3152 .states
3153 .contains_key("Component1"));
3154 }
3155
3156 #[test_log::test(tokio::test)]
3157 async fn test_get_snapshots_fetches_entrypoints_when_dci() {
3158 let header = BlockHeader::default();
3159 let mut rpc = make_mock_client();
3160 let component = ProtocolComponent { id: "Component1".to_string(), ..Default::default() };
3161
3162 let component_clone = component.clone();
3163 rpc.expect_get_snapshots()
3164 .returning(move |_request, _chunk_size, _concurrency| {
3165 Ok(Snapshot {
3166 states: [(
3167 "Component1".to_string(),
3168 ComponentWithState {
3169 state: ResponseProtocolState {
3170 component_id: "Component1".to_string(),
3171 ..Default::default()
3172 },
3173 component: component_clone.clone(),
3174 entrypoints: vec![],
3175 component_tvl: None,
3176 },
3177 )]
3178 .into_iter()
3179 .collect(),
3180 vm_storage: HashMap::new(),
3181 })
3182 });
3183
3184 rpc.expect_get_traced_entry_points()
3186 .times(1)
3187 .returning(|_| {
3188 Ok(TracedEntryPointRequestResponse {
3189 traced_entry_points: HashMap::new(),
3190 pagination: PaginationResponse::new(0, 20, 0),
3191 })
3192 });
3193
3194 let mut state_sync = with_mocked_clients(true, false, Some(rpc), None).with_dci(true);
3195 state_sync
3196 .component_tracker
3197 .components
3198 .insert("Component1".to_string(), component);
3199
3200 let components_arg = ["Component1".to_string()];
3201 let snap = state_sync
3202 .get_snapshots(header, Some(&components_arg))
3203 .await
3204 .expect("Retrieving snapshot failed");
3205
3206 assert!(snap
3207 .snapshots
3208 .states
3209 .contains_key("Component1"));
3210 }
3211}