1use std::{
2 collections::{HashMap, HashSet},
3 time::Duration,
4};
5
6use async_trait::async_trait;
7use thiserror::Error;
8use tokio::{
9 select,
10 sync::{
11 mpsc::{channel, error::SendError, Receiver, Sender},
12 oneshot,
13 },
14 task::JoinHandle,
15 time::{sleep, timeout},
16};
17use tracing::{debug, error, info, instrument, warn};
18use tycho_common::{
19 models::{
20 blockchain::{
21 BlockAggregatedChanges, DCIUpdate, EntryPointWithTracingParams, TracingResult,
22 },
23 contract::Account,
24 protocol::{ProtocolComponent, ProtocolComponentState},
25 Chain, ExtractorIdentity,
26 },
27 Bytes,
28};
29
30use crate::{
31 deltas::{DeltasClient, SubscriptionOptions},
32 feed::{
33 component_tracker::{ComponentFilter, ComponentTracker},
34 BlockHeader, HeaderLike,
35 },
36 rpc::{
37 RPCClient, RPCError, SnapshotParameters, TracedEntryPointsPaginatedParams,
38 RPC_CLIENT_CONCURRENCY,
39 },
40 DeltasError,
41};
42
43#[derive(Error, Debug)]
44pub enum SynchronizerError {
45 #[error("RPC error: {0}")]
47 RPCError(#[from] RPCError),
48
49 #[error("{0}")]
51 ChannelError(String),
52
53 #[error("Timeout error: {0}")]
55 Timeout(String),
56
57 #[error("Failed to close synchronizer: {0}")]
59 CloseError(String),
60
61 #[error("Connection error: {0}")]
63 ConnectionError(String),
64
65 #[error("Connection closed")]
67 ConnectionClosed,
68
69 #[error("Internal error: {0}")]
71 Internal(String),
72}
73
74pub type SyncResult<T> = Result<T, SynchronizerError>;
75
76impl SynchronizerError {
77 pub fn is_transient(&self) -> bool {
85 match self {
86 SynchronizerError::RPCError(e) => matches!(
87 e,
88 RPCError::HttpClient(_, _) |
89 RPCError::RateLimited(_) |
90 RPCError::ServerUnreachable(_) |
91 RPCError::StaleBlock(_)
92 ),
93 SynchronizerError::Timeout(_) |
94 SynchronizerError::ConnectionError(_) |
95 SynchronizerError::ConnectionClosed => true,
96 _ => false,
97 }
98 }
99}
100
101impl<T> From<SendError<T>> for SynchronizerError {
102 fn from(err: SendError<T>) -> Self {
103 SynchronizerError::ChannelError(format!("Failed to send message: {err}"))
104 }
105}
106
107impl From<DeltasError> for SynchronizerError {
108 fn from(err: DeltasError) -> Self {
109 match err {
110 DeltasError::NotConnected => SynchronizerError::ConnectionClosed,
111 _ => SynchronizerError::ConnectionError(err.to_string()),
112 }
113 }
114}
115
116pub struct ProtocolStateSynchronizer<R: RPCClient, D: DeltasClient> {
117 extractor_id: ExtractorIdentity,
118 retrieve_balances: bool,
119 rpc_client: R,
120 deltas_client: D,
121 max_retries: u64,
122 retry_cooldown: Duration,
123 include_snapshots: bool,
124 component_tracker: ComponentTracker<R>,
125 last_synced_block: Option<BlockHeader>,
126 timeout: u64,
127 include_tvl: bool,
128 compression: bool,
129 partial_blocks: bool,
130 uses_dci: bool,
131 snapshot_tasks: Vec<SnapshotTask>,
135 buffered_deltas: Vec<BlockAggregatedChanges>,
139 snapshot_queue: HashMap<String, SnapshotStatus>,
143}
144
145#[derive(Clone, PartialEq, Debug)]
146pub struct ComponentWithState {
147 pub state: ProtocolComponentState,
148 pub component: ProtocolComponent,
149 pub component_tvl: Option<f64>,
150 pub entrypoints: Vec<(EntryPointWithTracingParams, TracingResult)>,
151}
152
153#[derive(Clone, PartialEq, Debug, Default)]
154pub struct Snapshot {
155 pub states: HashMap<String, ComponentWithState>,
156 pub vm_storage: HashMap<Bytes, Account>,
157}
158
159impl Snapshot {
160 fn extend(&mut self, other: Snapshot) {
161 self.states.extend(other.states);
162 self.vm_storage.extend(other.vm_storage);
163 }
164
165 pub fn get_states(&self) -> &HashMap<String, ComponentWithState> {
166 &self.states
167 }
168
169 pub fn get_vm_storage(&self) -> &HashMap<Bytes, Account> {
170 &self.vm_storage
171 }
172}
173
174#[derive(Clone, PartialEq, Debug, Default)]
175pub struct StateSyncMessage<H>
176where
177 H: HeaderLike,
178{
179 pub header: H,
181 pub snapshots: Snapshot,
183 pub deltas: Option<BlockAggregatedChanges>,
187 pub removed_components: HashMap<String, ProtocolComponent>,
189}
190
191impl<H> StateSyncMessage<H>
192where
193 H: HeaderLike,
194{
195 pub fn merge(mut self, other: Self) -> Self {
196 self.removed_components
198 .retain(|k, _| !other.snapshots.states.contains_key(k));
199 self.snapshots
200 .states
201 .retain(|k, _| !other.removed_components.contains_key(k));
202
203 self.snapshots.extend(other.snapshots);
204 let deltas = match (self.deltas, other.deltas) {
205 (Some(l), Some(r)) => Some(l.merge(r)),
206 (None, Some(r)) => Some(r),
207 (Some(l), None) => Some(l),
208 (None, None) => None,
209 };
210 self.removed_components
211 .extend(other.removed_components);
212 Self {
213 header: other.header,
214 snapshots: self.snapshots,
215 deltas,
216 removed_components: self.removed_components,
217 }
218 }
219}
220
221#[derive(Debug, Clone, PartialEq)]
223enum SnapshotStatus {
224 Deferred,
226 InFlight,
228 RetryNext,
230 Blacklisted,
232}
233
234struct SnapshotFetchResult {
235 components: HashMap<String, ProtocolComponent>,
236 contract_ids: HashSet<Bytes>,
237 dci_update: DCIUpdate,
238 snapshot: Snapshot,
239 snapshot_block: u64,
240}
241
242struct SnapshotTask {
243 component_ids: Vec<String>,
244 snapshot_block: u64,
245 receiver: oneshot::Receiver<Result<SnapshotFetchResult, SynchronizerError>>,
246}
247
248pub struct SynchronizerTaskHandle {
253 join_handle: JoinHandle<()>,
254 close_tx: oneshot::Sender<()>,
255}
256
257impl SynchronizerTaskHandle {
266 pub fn new(join_handle: JoinHandle<()>, close_tx: oneshot::Sender<()>) -> Self {
267 Self { join_handle, close_tx }
268 }
269
270 pub fn split(self) -> (JoinHandle<()>, oneshot::Sender<()>) {
276 (self.join_handle, self.close_tx)
277 }
278}
279
280#[async_trait]
281pub trait StateSynchronizer: Send + Sync + 'static {
282 async fn initialize(&mut self) -> SyncResult<()>;
283 async fn start(
286 mut self,
287 ) -> (SynchronizerTaskHandle, Receiver<SyncResult<StateSyncMessage<BlockHeader>>>);
288}
289
290struct FetchSnapshotParams {
291 chain: Chain,
292 protocol_system: String,
293 block_number: u64,
294 uses_dci: bool,
295 retrieve_balances: bool,
296 include_tvl: bool,
297}
298
299async fn fetch_snapshot<R: RPCClient>(
305 rpc_client: &R,
306 components: HashMap<String, ProtocolComponent>,
307 mut contract_ids: HashSet<Bytes>,
308 params: &FetchSnapshotParams,
309) -> Result<(Snapshot, DCIUpdate, HashSet<Bytes>), SynchronizerError> {
310 if components.is_empty() {
311 return Ok((Snapshot::default(), DCIUpdate::default(), contract_ids));
312 }
313
314 let component_ids: Vec<String> = components.keys().cloned().collect();
315
316 let (dci_update, entrypoints_result) = if params.uses_dci {
317 let result = rpc_client
318 .get_traced_entry_points_paginated(TracedEntryPointsPaginatedParams::new(
319 params.chain,
320 ¶ms.protocol_system,
321 component_ids.clone(),
322 RPC_CLIENT_CONCURRENCY,
323 ))
324 .await?;
325 let dci_contracts: HashSet<Bytes> = result
326 .values()
327 .flat_map(|traces| {
328 traces
329 .iter()
330 .flat_map(|(_, tr)| tr.accessed_slots.keys().cloned())
331 })
332 .collect();
333 contract_ids.extend(dci_contracts);
334 let eps = result.clone();
335 let dci: DCIUpdate = result.into();
336 (dci, eps)
337 } else {
338 (DCIUpdate::default(), HashMap::new())
339 };
340
341 let contract_ids_vec: Vec<Bytes> = contract_ids.iter().cloned().collect();
342 let request = SnapshotParameters::new(
343 params.chain,
344 ¶ms.protocol_system,
345 &components,
346 &contract_ids_vec,
347 params.block_number,
348 )
349 .entrypoints(&entrypoints_result)
350 .include_balances(params.retrieve_balances)
351 .include_tvl(params.include_tvl);
352
353 let snapshot = rpc_client
354 .get_snapshots(&request, None, RPC_CLIENT_CONCURRENCY)
355 .await?;
356
357 Ok((snapshot, dci_update, contract_ids))
358}
359
360async fn fetch_snapshot_background<R: RPCClient>(
363 rpc_client: R,
364 component_ids: Vec<String>,
365 params: FetchSnapshotParams,
366) -> Result<SnapshotFetchResult, SynchronizerError> {
367 if component_ids.is_empty() {
368 return Ok(SnapshotFetchResult {
369 components: HashMap::new(),
370 contract_ids: HashSet::new(),
371 dci_update: DCIUpdate::default(),
372 snapshot: Snapshot::default(),
373 snapshot_block: params.block_number,
374 });
375 }
376
377 let request = crate::rpc::ProtocolComponentsParams::new(params.chain, ¶ms.protocol_system)
378 .with_component_ids(component_ids);
379 let components: HashMap<String, ProtocolComponent> = rpc_client
380 .get_protocol_components(request)
381 .await?
382 .into_data()
383 .into_iter()
384 .map(|pc| (pc.id.clone(), pc))
385 .collect();
386
387 let contract_ids: HashSet<Bytes> = components
388 .values()
389 .flat_map(|c| c.contract_addresses.iter().cloned())
390 .collect();
391
392 let snapshot_block = params.block_number;
393 let (snapshot, dci_update, contract_ids) =
394 fetch_snapshot(&rpc_client, components.clone(), contract_ids, ¶ms).await?;
395
396 Ok(SnapshotFetchResult { components, contract_ids, dci_update, snapshot, snapshot_block })
397}
398
399impl<R, D> ProtocolStateSynchronizer<R, D>
400where
401 R: RPCClient + Clone + Send + Sync + 'static,
404 D: DeltasClient + Clone + Send + Sync + 'static,
405{
406 #[allow(clippy::too_many_arguments)]
408 pub fn new(
409 extractor_id: ExtractorIdentity,
410 retrieve_balances: bool,
411 component_filter: ComponentFilter,
412 max_retries: u64,
413 retry_cooldown: Duration,
414 include_snapshots: bool,
415 include_tvl: bool,
416 compression: bool,
417 rpc_client: R,
418 deltas_client: D,
419 timeout: u64,
420 ) -> Self {
421 Self {
422 extractor_id: extractor_id.clone(),
423 retrieve_balances,
424 rpc_client: rpc_client.clone(),
425 include_snapshots,
426 deltas_client,
427 component_tracker: ComponentTracker::new(
428 extractor_id.chain,
429 extractor_id.name.as_str(),
430 component_filter,
431 rpc_client,
432 ),
433 max_retries,
434 retry_cooldown,
435 last_synced_block: None,
436 timeout,
437 include_tvl,
438 compression,
439 partial_blocks: false,
440 uses_dci: false,
441 snapshot_tasks: Vec::new(),
442 buffered_deltas: Vec::new(),
443 snapshot_queue: HashMap::new(),
444 }
445 }
446
447 pub fn with_dci(mut self, uses_dci: bool) -> Self {
450 self.uses_dci = uses_dci;
451 self
452 }
453
454 pub fn with_partial_blocks(mut self, partial_blocks: bool) -> Self {
456 self.partial_blocks = partial_blocks;
457 self
458 }
459
460 #[allow(clippy::result_large_err)]
473 #[instrument(skip(self, block_tx, end_rx), fields(extractor_id = %self.extractor_id))]
474 async fn state_sync(
475 &mut self,
476 block_tx: &mut Sender<SyncResult<StateSyncMessage<BlockHeader>>>,
477 mut end_rx: oneshot::Receiver<()>,
478 ) -> Result<(), (SynchronizerError, Option<oneshot::Receiver<()>>)> {
479 let subscription_options = SubscriptionOptions::new()
481 .with_state(self.include_snapshots)
482 .with_compression(self.compression)
483 .with_partial_blocks(self.partial_blocks);
484 let (subscription_id, mut msg_rx) = match self
485 .deltas_client
486 .subscribe(self.extractor_id.clone(), subscription_options)
487 .await
488 {
489 Ok(result) => result,
490 Err(e) => return Err((e.into(), Some(end_rx))),
491 };
492
493 let result = async {
494 info!("Waiting for deltas...");
495 let mut last_block_number: Option<u64> = None;
497
498 const MAX_STALE_RETRIES: u32 = 5;
504 let mut stale_retries: u32 = 0;
505 let (msg, header) = 'init: loop {
506 let mut warned_waiting_for_new_block = false;
507 let mut warned_skipping_synced = false;
508 let mut first_msg = loop {
509 let msg = select! {
510 deltas_result = timeout(Duration::from_secs(self.timeout), msg_rx.recv()) => {
511 deltas_result
512 .map_err(|_| {
513 SynchronizerError::Timeout(format!(
514 "First deltas took longer than {t}s to arrive",
515 t = self.timeout
516 ))
517 })?
518 .ok_or_else(|| {
519 SynchronizerError::ConnectionError(
520 "Deltas channel closed before first message".to_string(),
521 )
522 })?
523 },
524 _ = &mut end_rx => {
525 info!("Received close signal while waiting for first deltas");
526 return Ok(());
527 }
528 };
529
530 let incoming: BlockHeader = (&msg).into();
531
532 let is_new_block_candidate = if self.partial_blocks {
536 match msg.partial_block_index {
537 None => {
538 last_block_number = Some(incoming.number);
540 true
541 }
542 Some(current_partial_idx) => {
543 let is_new_block = last_block_number
544 .map(|prev_block| incoming.number > prev_block)
545 .unwrap_or(false);
546
547 if !warned_waiting_for_new_block {
548 info!(
549 extractor=%self.extractor_id,
550 block=incoming.number,
551 partial_idx=current_partial_idx,
552 "Syncing. Waiting for new block to start"
553 );
554 warned_waiting_for_new_block = true;
555 }
556 last_block_number = Some(incoming.number);
557 is_new_block
558 }
559 }
560 } else {
561 true };
563
564 if !is_new_block_candidate {
565 continue;
566 }
567
568 if let Some(current) = &self.last_synced_block {
570 if current.number >= incoming.number && !self.is_next_expected(&incoming) {
571 if !warned_skipping_synced {
572 info!(extractor=%self.extractor_id, from=incoming.number, to=current.number, "Syncing. Skipping already synced block");
573 warned_skipping_synced = true;
574 }
575 continue;
576 }
577 }
578 break msg;
579 };
580
581 self.filter_deltas(&mut first_msg);
582
583 info!(height = first_msg.get_block().number, "First deltas received");
585 let header: BlockHeader = (&first_msg).into();
586 let deltas_msg = StateSyncMessage {
587 header: header.clone(),
588 snapshots: Default::default(),
589 deltas: Some(first_msg),
590 removed_components: Default::default(),
591 };
592
593 if !self.is_next_expected(&header) {
595 info!("Retrieving snapshot");
596 let snapshot_header = if self.partial_blocks && header.number > 0 {
599 BlockHeader {
600 number: header.number - 1,
601 hash: header.parent_hash.clone(),
602 ..Default::default()
603 }
604 } else {
605 BlockHeader { revert: false, ..header.clone() }
606 };
607 let component_ids =
608 self.component_tracker.get_tracked_component_ids();
609 let init_snapshot = if !self.include_snapshots ||
610 component_ids.is_empty()
611 {
612 Snapshot::default()
613 } else {
614 let components: HashMap<_, _> = self
616 .component_tracker
617 .components
618 .iter()
619 .filter(|(id, _)| component_ids.contains(id))
620 .map(|(k, v)| (k.clone(), v.clone()))
621 .collect();
622 let contract_ids: HashSet<Bytes> = self
623 .component_tracker
624 .get_contracts_by_component(&component_ids)
625 .into_iter()
626 .collect();
627 let fetch_params = FetchSnapshotParams {
628 chain: self.extractor_id.chain,
629 protocol_system: self.extractor_id.name.clone(),
630 block_number: snapshot_header.number,
631 uses_dci: self.uses_dci,
632 retrieve_balances: self.retrieve_balances,
633 include_tvl: self.include_tvl,
634 };
635 match fetch_snapshot(
636 &self.rpc_client,
637 components,
638 contract_ids,
639 &fetch_params,
640 )
641 .await
642 {
643 Ok((snap, dci_update, _)) => {
644 self.component_tracker
645 .process_entrypoints(&dci_update);
646 snap
647 }
648 Err(SynchronizerError::RPCError(
649 crate::rpc::RPCError::StaleBlock(reason),
650 )) => {
651 stale_retries += 1;
652 if stale_retries > MAX_STALE_RETRIES {
653 return Err(SynchronizerError::RPCError(
654 crate::rpc::RPCError::StaleBlock(reason),
655 ));
656 }
657 warn!(
662 block = header.number,
663 stale_retries,
664 %reason,
665 "Snapshot block is outside server retention \
666 window; waiting for a more recent block"
667 );
668 continue 'init;
669 }
670 Err(e) => return Err(e),
671 }
672 };
673 let n_components = self.component_tracker.components.len();
674 let n_snapshots = init_snapshot.states.len();
675 info!(
676 n_components,
677 n_snapshots,
678 "Initial snapshot retrieved, starting delta message feed"
679 );
680 let snapshot_msg = StateSyncMessage {
681 header: snapshot_header,
682 snapshots: init_snapshot,
683 deltas: None,
684 removed_components: HashMap::new(),
685 };
686 break 'init (snapshot_msg.merge(deltas_msg), header);
687 } else {
688 break 'init (deltas_msg, header);
689 }
690 };
691
692 block_tx.send(Ok(msg)).await?;
693 self.last_synced_block = Some(header);
694 loop {
695 select! {
696 deltas_opt = msg_rx.recv() => {
697 if let Some(mut deltas) = deltas_opt {
698 let header: BlockHeader = (&deltas).into();
699 debug!(block_number=?header.number, "Received delta message");
700
701 if !self.snapshot_tasks.is_empty() {
703 self.buffered_deltas.push(deltas.clone());
704 }
705
706 let background_snapshots = self.drain_completed_snapshots();
707
708 if self.snapshot_tasks.is_empty() {
710 self.buffered_deltas.clear();
711 } else {
712 let oldest_pending_block = self
713 .snapshot_tasks
714 .iter()
715 .map(|p| p.snapshot_block)
716 .min()
717 .unwrap_or(u64::MAX);
718 self.buffered_deltas
719 .retain(|d| d.block.number > oldest_pending_block);
720 }
721
722 let (snapshots, removed_components) = {
723 let (to_add, to_remove) =
724 self.component_tracker.filter_updated_components(&deltas);
725
726 let retry_ids: Vec<String> = self
732 .snapshot_queue
733 .iter()
734 .filter(|(_, s)| matches!(s, SnapshotStatus::RetryNext))
735 .map(|(id, _)| id.clone())
736 .collect();
737 for id in &retry_ids {
738 self.snapshot_queue.remove(id);
739 }
740
741 let truly_new: Vec<String> = {
746 let mut seen = HashSet::new();
747 to_add
748 .iter()
749 .chain(retry_ids.iter())
750 .filter(|id| {
751 !self.component_tracker
752 .components
753 .contains_key(id.as_str())
754 && !self
755 .snapshot_queue
756 .contains_key(id.as_str())
757 && seen.insert(id.as_str())
758 })
759 .cloned()
760 .collect()
761 };
762
763 if self.partial_blocks {
764 let is_new_block = self
765 .last_synced_block
766 .as_ref()
767 .map(|b| header.number > b.number)
768 .unwrap_or(true);
769
770 let has_deferred = self
771 .snapshot_queue
772 .values()
773 .any(|s| matches!(s, SnapshotStatus::Deferred));
774 if is_new_block && has_deferred && header.number > 0 {
775 let to_fire: Vec<String> = self
779 .snapshot_queue
780 .iter()
781 .filter(|(_, s)| matches!(s, SnapshotStatus::Deferred))
782 .map(|(id, _)| id.clone())
783 .collect();
784 for id in &to_fire {
785 self.snapshot_queue.remove(id);
786 }
787 let snapshot_header = BlockHeader {
788 number: header.number - 1,
789 hash: header.parent_hash.clone(),
790 ..Default::default()
791 };
792 debug!(
793 components = ?to_fire,
794 extractor = %self.extractor_id.name,
795 snapshot_block = header.number - 1,
796 "snapshot_deferred_to_background"
797 );
798 self.spawn_snapshot_task(
799 to_fire,
800 snapshot_header,
801 &deltas,
802 );
803 }
804
805 for id in truly_new {
808 self.snapshot_queue
809 .insert(id, SnapshotStatus::Deferred);
810 }
811 } else if !truly_new.is_empty() {
812 debug!(
813 components = ?truly_new,
814 extractor = %self.extractor_id.name,
815 block_number = ?header.number,
816 "snapshot_deferred_to_background"
817 );
818 let snapshot_header =
819 BlockHeader { revert: false, ..header.clone() };
820 self.spawn_snapshot_task(truly_new, snapshot_header, &deltas);
821 }
822
823 let snapshots = background_snapshots;
824
825 let removed_components = if !to_remove.is_empty() {
826 self.component_tracker.stop_tracking(&to_remove)
827 } else {
828 Default::default()
829 };
830
831 (snapshots, removed_components)
832 };
833
834 self.component_tracker.process_entrypoints(&deltas.dci_update);
836
837 self.filter_deltas(&mut deltas);
839 let n_changes = deltas.n_changes();
840
841 let next = StateSyncMessage {
842 header: header.clone(),
843 snapshots,
844 deltas: Some(deltas),
845 removed_components,
846 };
847 block_tx.send(Ok(next)).await?;
848 self.last_synced_block = Some(header.clone());
849
850 debug!(block_number=?header.number, n_changes, "Finished processing delta message");
851 } else {
852 return Err(SynchronizerError::ConnectionError("Deltas channel closed".to_string()));
853 }
854 },
855 _ = &mut end_rx => {
856 info!("Received close signal during state_sync");
857 return Ok(());
858 }
859 }
860 }
861 }.await;
862
863 warn!(last_synced_block = ?&self.last_synced_block, "Deltas processing ended.");
865 let _ = self
867 .deltas_client
868 .unsubscribe(subscription_id)
869 .await
870 .map_err(|err| {
871 warn!(err=?err, "Unsubscribing from deltas on cleanup failed!");
872 });
873
874 match result {
877 Ok(()) => Ok(()), Err(e) => {
879 Err((e, Some(end_rx)))
883 }
884 }
885 }
886
887 fn apply_deltas_to_snapshot(
890 &self,
891 snapshot: &mut Snapshot,
892 snapshot_block: u64,
893 contract_ids: &HashSet<Bytes>,
894 ) {
895 for delta in &self.buffered_deltas {
896 if delta.block.number <= snapshot_block {
897 continue;
898 }
899 for (component_id, state_delta) in &delta.state_deltas {
900 if let Some(cws) = snapshot.states.get_mut(component_id) {
901 cws.state.attributes.extend(
902 state_delta
903 .updated_attributes
904 .iter()
905 .map(|(k, v)| (k.clone(), v.clone())),
906 );
907 for key in &state_delta.deleted_attributes {
908 cws.state.attributes.remove(key);
909 }
910 }
911 }
912 for (component_id, token_balances) in &delta.component_balances {
913 if let Some(cws) = snapshot.states.get_mut(component_id) {
914 for (token, bal) in token_balances {
915 cws.state
916 .balances
917 .insert(token.clone(), bal.balance.clone());
918 }
919 }
920 }
921 for (address, account_delta) in &delta.account_deltas {
922 if contract_ids.contains(address) {
923 if let Some(account) = snapshot.vm_storage.get_mut(address) {
924 account.slots.extend(
925 account_delta
926 .slots
927 .iter()
928 .filter_map(|(k, v)| {
929 v.as_ref()
930 .map(|v| (k.clone(), v.clone()))
931 }),
932 );
933 if let Some(balance) = &account_delta.balance {
934 account.native_balance = balance.clone();
935 }
936 if let Some(code) = account_delta.code() {
937 account.code = code.clone();
938 }
939 }
940 }
941 }
942 }
943 }
944
945 fn spawn_snapshot_task(
949 &mut self,
950 component_ids: Vec<String>,
951 snapshot_header: BlockHeader,
952 current_delta: &BlockAggregatedChanges,
953 ) {
954 let snapshot_block = snapshot_header.number;
955
956 if self.snapshot_tasks.is_empty() {
957 self.buffered_deltas
958 .push(current_delta.clone());
959 }
960
961 let (tx, rx) = oneshot::channel();
962 let rpc = self.rpc_client.clone();
963 let bg_params = FetchSnapshotParams {
964 chain: self.extractor_id.chain,
965 protocol_system: self.extractor_id.name.clone(),
966 block_number: snapshot_block,
967 uses_dci: self.uses_dci,
968 retrieve_balances: self.retrieve_balances,
969 include_tvl: self.include_tvl,
970 };
971 let ids = component_ids.clone();
972 tokio::spawn(async move {
973 let _ = tx.send(fetch_snapshot_background(rpc, ids, bg_params).await);
974 });
975 for id in &component_ids {
976 self.snapshot_queue
977 .insert(id.clone(), SnapshotStatus::InFlight);
978 }
979 self.snapshot_tasks
980 .push(SnapshotTask { component_ids, snapshot_block, receiver: rx });
981 }
982
983 fn drain_completed_snapshots(&mut self) -> Snapshot {
986 let mut result = Snapshot::default();
987 let pending = std::mem::take(&mut self.snapshot_tasks);
988
989 for mut p in pending {
990 match p.receiver.try_recv() {
991 Ok(Ok(fetch_result)) => {
992 debug!(
993 components = ?p.component_ids,
994 extractor = %self.extractor_id.name,
995 "snapshot_background_ready"
996 );
997 for id in &p.component_ids {
998 self.snapshot_queue.remove(id);
999 }
1000 let new_component_ids: Vec<String> = fetch_result
1001 .components
1002 .keys()
1003 .cloned()
1004 .collect();
1005 self.component_tracker
1006 .components
1007 .extend(fetch_result.components);
1008 self.component_tracker
1009 .process_entrypoints(&fetch_result.dci_update);
1010 self.component_tracker
1011 .update_contracts(new_component_ids);
1012 let mut snapshot = fetch_result.snapshot;
1013 self.apply_deltas_to_snapshot(
1014 &mut snapshot,
1015 fetch_result.snapshot_block,
1016 &fetch_result.contract_ids,
1017 );
1018 result.extend(snapshot);
1019 }
1020 Ok(Err(e)) => {
1021 if e.is_transient() {
1022 warn!(
1023 components = ?p.component_ids,
1024 extractor = %self.extractor_id.name,
1025 err = %e,
1026 "Background snapshot fetch failed transiently; will retry next block"
1027 );
1028 for id in &p.component_ids {
1029 self.snapshot_queue
1030 .insert(id.clone(), SnapshotStatus::RetryNext);
1031 }
1032 } else {
1033 warn!(
1034 components = ?p.component_ids,
1035 extractor = %self.extractor_id.name,
1036 err = %e,
1037 "Background snapshot fetch failed permanently; \
1038 components blacklisted until restart"
1039 );
1040 for id in &p.component_ids {
1041 self.snapshot_queue
1042 .insert(id.clone(), SnapshotStatus::Blacklisted);
1043 }
1044 }
1045 }
1046 Err(tokio::sync::oneshot::error::TryRecvError::Empty) => {
1047 self.snapshot_tasks.push(p);
1048 }
1049 Err(tokio::sync::oneshot::error::TryRecvError::Closed) => {
1050 warn!(
1051 components = ?p.component_ids,
1052 extractor = %self.extractor_id.name,
1053 "Background snapshot task dropped before sending result"
1054 );
1055 for id in &p.component_ids {
1056 self.snapshot_queue.remove(id);
1057 }
1058 }
1059 }
1060 }
1061
1062 result
1063 }
1064
1065 fn is_next_expected(&self, incoming: &BlockHeader) -> bool {
1066 if let Some(block) = self.last_synced_block.as_ref() {
1067 return incoming.parent_hash == block.hash;
1068 }
1069 false
1070 }
1071 fn filter_deltas(&self, deltas: &mut BlockAggregatedChanges) {
1072 deltas.filter_by_component(|id| {
1073 self.component_tracker
1074 .components
1075 .contains_key(id)
1076 });
1077 deltas.filter_by_contract(|id| {
1078 self.component_tracker
1079 .contracts
1080 .contains(id)
1081 });
1082 }
1083}
1084
1085#[async_trait]
1086impl<R, D> StateSynchronizer for ProtocolStateSynchronizer<R, D>
1087where
1088 R: RPCClient + Clone + Send + Sync + 'static,
1089 D: DeltasClient + Clone + Send + Sync + 'static,
1090{
1091 async fn initialize(&mut self) -> SyncResult<()> {
1092 info!("Retrieving relevant protocol components");
1093 self.component_tracker
1094 .initialise_components()
1095 .await?;
1096 info!(
1097 n_components = self.component_tracker.components.len(),
1098 n_contracts = self.component_tracker.contracts.len(),
1099 extractor = %self.extractor_id,
1100 "Finished retrieving components",
1101 );
1102
1103 Ok(())
1104 }
1105
1106 async fn start(
1107 mut self,
1108 ) -> (SynchronizerTaskHandle, Receiver<SyncResult<StateSyncMessage<BlockHeader>>>) {
1109 let (mut tx, rx) = channel(15);
1110 let (end_tx, end_rx) = oneshot::channel::<()>();
1111
1112 let jh = tokio::spawn(async move {
1113 let mut retry_count = 0;
1114 let mut current_end_rx = end_rx;
1115 let mut final_error = None;
1116
1117 while retry_count < self.max_retries {
1118 info!(extractor_id=%&self.extractor_id, retry_count, "(Re)starting synchronization loop");
1119
1120 let prev_block = self
1121 .last_synced_block
1122 .as_ref()
1123 .map(|h| h.number);
1124 let res = self
1125 .state_sync(&mut tx, current_end_rx)
1126 .await;
1127 let made_progress = self
1128 .last_synced_block
1129 .as_ref()
1130 .map(|h| h.number) >
1131 prev_block;
1132 match res {
1133 Ok(()) => {
1134 info!(
1135 extractor_id=%&self.extractor_id,
1136 retry_count,
1137 "State synchronization exited cleanly"
1138 );
1139 return;
1140 }
1141 Err((e, maybe_end_rx)) => {
1142 warn!(
1143 extractor_id=%&self.extractor_id,
1144 retry_count,
1145 error=%e,
1146 "State synchronization errored!"
1147 );
1148
1149 if let Some(recovered_end_rx) = maybe_end_rx {
1151 current_end_rx = recovered_end_rx;
1152
1153 if let SynchronizerError::ConnectionClosed = e {
1154 error!(
1156 "Websocket connection closed. State synchronization exiting."
1157 );
1158 let _ = tx.send(Err(e)).await;
1159 return;
1160 } else {
1161 final_error = Some(e);
1163 }
1164 } else {
1165 info!(extractor_id=%&self.extractor_id, "Received close signal, exiting.");
1167 return;
1168 }
1169 }
1170 }
1171 sleep(self.retry_cooldown).await;
1172 if made_progress {
1175 retry_count = 0;
1176 } else {
1177 retry_count += 1;
1178 }
1179 }
1180 if let Some(e) = final_error {
1181 warn!(extractor_id=%&self.extractor_id, retry_count, error=%e, "Max retries exceeded");
1182 let _ = tx.send(Err(e)).await;
1183 }
1184 });
1185
1186 let handle = SynchronizerTaskHandle::new(jh, end_tx);
1187 (handle, rx)
1188 }
1189}
1190
1191#[cfg(test)]
1192mod test {
1193 use std::{collections::HashSet, sync::Arc};
1212
1213 use tycho_common::models::{
1214 blockchain::{
1215 AddressStorageLocation, Block, BlockAggregatedChanges, DCIUpdate, EntryPoint,
1216 EntryPointWithTracingParams, RPCTracerParams, TracingParams, TracingResult,
1217 },
1218 protocol::{ProtocolComponent, ProtocolComponentState},
1219 token::Token,
1220 Chain,
1221 };
1222 use uuid::Uuid;
1223
1224 use super::*;
1225 use crate::{
1226 deltas::MockDeltasClient,
1227 rpc::{MockRPCClient, Page},
1228 DeltasError, RPCError,
1229 };
1230
1231 struct ArcRPCClient<T>(Arc<T>);
1233
1234 impl<T> Clone for ArcRPCClient<T> {
1236 fn clone(&self) -> Self {
1237 ArcRPCClient(self.0.clone())
1238 }
1239 }
1240
1241 #[async_trait]
1242 impl<T> RPCClient for ArcRPCClient<T>
1243 where
1244 T: RPCClient + Sync + Send + 'static,
1245 {
1246 async fn get_tokens(
1247 &self,
1248 params: crate::rpc::TokensParams,
1249 ) -> Result<crate::rpc::Page<Vec<Token>>, RPCError> {
1250 self.0.get_tokens(params).await
1251 }
1252
1253 async fn get_contract_state(
1254 &self,
1255 params: crate::rpc::ContractStateParams,
1256 ) -> Result<crate::rpc::Page<Vec<Account>>, RPCError> {
1257 self.0.get_contract_state(params).await
1258 }
1259
1260 async fn get_protocol_components(
1261 &self,
1262 params: crate::rpc::ProtocolComponentsParams,
1263 ) -> Result<crate::rpc::Page<Vec<ProtocolComponent>>, RPCError> {
1264 self.0
1265 .get_protocol_components(params)
1266 .await
1267 }
1268
1269 async fn get_protocol_states(
1270 &self,
1271 params: crate::rpc::ProtocolStatesParams,
1272 ) -> Result<crate::rpc::Page<Vec<ProtocolComponentState>>, RPCError> {
1273 self.0.get_protocol_states(params).await
1274 }
1275
1276 async fn get_protocol_systems(
1277 &self,
1278 params: crate::rpc::ProtocolSystemsParams,
1279 ) -> Result<crate::rpc::Page<crate::rpc::ProtocolSystems>, RPCError> {
1280 self.0
1281 .get_protocol_systems(params)
1282 .await
1283 }
1284
1285 async fn get_component_tvl(
1286 &self,
1287 params: crate::rpc::ComponentTvlParams,
1288 ) -> Result<crate::rpc::Page<HashMap<String, f64>>, RPCError> {
1289 self.0.get_component_tvl(params).await
1290 }
1291
1292 async fn get_traced_entry_points(
1293 &self,
1294 params: crate::rpc::TracedEntryPointsParams,
1295 ) -> Result<
1296 crate::rpc::Page<HashMap<String, Vec<(EntryPointWithTracingParams, TracingResult)>>>,
1297 RPCError,
1298 > {
1299 self.0
1300 .get_traced_entry_points(params)
1301 .await
1302 }
1303
1304 #[allow(clippy::extra_unused_lifetimes)]
1305 async fn get_snapshots<'a>(
1306 &self,
1307 request: &SnapshotParameters<'a>,
1308 chunk_size: Option<usize>,
1309 concurrency: usize,
1310 ) -> Result<Snapshot, RPCError> {
1311 self.0
1312 .get_snapshots(request, chunk_size, concurrency)
1313 .await
1314 }
1315
1316 fn compression(&self) -> bool {
1317 self.0.compression()
1318 }
1319 }
1320
1321 struct ArcDeltasClient<T>(Arc<T>);
1323
1324 impl<T> Clone for ArcDeltasClient<T> {
1326 fn clone(&self) -> Self {
1327 ArcDeltasClient(self.0.clone())
1328 }
1329 }
1330
1331 #[async_trait]
1332 impl<T> DeltasClient for ArcDeltasClient<T>
1333 where
1334 T: DeltasClient + Sync + Send + 'static,
1335 {
1336 async fn subscribe(
1337 &self,
1338 extractor_id: tycho_common::models::ExtractorIdentity,
1339 options: SubscriptionOptions,
1340 ) -> Result<(Uuid, Receiver<BlockAggregatedChanges>), DeltasError> {
1341 self.0
1342 .subscribe(extractor_id, options)
1343 .await
1344 }
1345
1346 async fn unsubscribe(&self, subscription_id: Uuid) -> Result<(), DeltasError> {
1347 self.0
1348 .unsubscribe(subscription_id)
1349 .await
1350 }
1351
1352 async fn connect(&self) -> Result<JoinHandle<Result<(), DeltasError>>, DeltasError> {
1353 self.0.connect().await
1354 }
1355
1356 async fn close(&self) -> Result<(), DeltasError> {
1357 self.0.close().await
1358 }
1359 }
1360
1361 fn with_mocked_clients(
1362 native: bool,
1363 include_tvl: bool,
1364 rpc_client: Option<MockRPCClient>,
1365 deltas_client: Option<MockDeltasClient>,
1366 ) -> ProtocolStateSynchronizer<ArcRPCClient<MockRPCClient>, ArcDeltasClient<MockDeltasClient>>
1367 {
1368 let rpc_client = ArcRPCClient(Arc::new(rpc_client.unwrap_or_default()));
1369 let deltas_client = ArcDeltasClient(Arc::new(deltas_client.unwrap_or_default()));
1370
1371 ProtocolStateSynchronizer::new(
1372 ExtractorIdentity::new(Chain::Ethereum, "uniswap-v2"),
1373 native,
1374 ComponentFilter::with_tvl_range(50.0, 50.0),
1375 1,
1376 Duration::from_secs(0),
1377 true,
1378 include_tvl,
1379 true, rpc_client,
1381 deltas_client,
1382 10_u64,
1383 )
1384 }
1385
1386 fn state_snapshot_native() -> Vec<ProtocolComponentState> {
1387 vec![ProtocolComponentState {
1388 component_id: "Component1".to_string(),
1389 attributes: HashMap::new(),
1390 balances: HashMap::new(),
1391 }]
1392 }
1393
1394 fn make_mock_client() -> MockRPCClient {
1395 let mut m = MockRPCClient::new();
1396 m.expect_compression()
1397 .return_const(false);
1398 m
1399 }
1400
1401 #[test_log::test(tokio::test)]
1402 async fn test_get_snapshots_native() {
1403 let header = BlockHeader::default();
1404 let mut rpc = make_mock_client();
1405 let component = ProtocolComponent { id: "Component1".to_string(), ..Default::default() };
1406
1407 let component_clone = component.clone();
1408 rpc.expect_get_snapshots()
1409 .returning(move |_request, _chunk_size, _concurrency| {
1410 Ok(Snapshot {
1411 states: state_snapshot_native()
1412 .into_iter()
1413 .map(|state| {
1414 (
1415 state.component_id.clone(),
1416 ComponentWithState {
1417 state,
1418 component: component_clone.clone(),
1419 entrypoints: vec![],
1420 component_tvl: None,
1421 },
1422 )
1423 })
1424 .collect(),
1425 vm_storage: HashMap::new(),
1426 })
1427 });
1428
1429 let mut state_sync = with_mocked_clients(true, false, Some(rpc), None);
1430 state_sync
1431 .component_tracker
1432 .components
1433 .insert("Component1".to_string(), component.clone());
1434 let components_arg = ["Component1".to_string()];
1435 let exp = StateSyncMessage {
1436 header: header.clone(),
1437 snapshots: Snapshot {
1438 states: state_snapshot_native()
1439 .into_iter()
1440 .map(|state| {
1441 (
1442 state.component_id.clone(),
1443 ComponentWithState {
1444 state,
1445 component: component.clone(),
1446 entrypoints: vec![],
1447 component_tvl: None,
1448 },
1449 )
1450 })
1451 .collect(),
1452 vm_storage: HashMap::new(),
1453 },
1454 deltas: None,
1455 removed_components: Default::default(),
1456 };
1457
1458 let req_ids: Vec<String> = components_arg.to_vec();
1459 let components: HashMap<_, _> = state_sync
1460 .component_tracker
1461 .components
1462 .iter()
1463 .filter(|(id, _)| req_ids.contains(id))
1464 .map(|(k, v)| (k.clone(), v.clone()))
1465 .collect();
1466 let contract_ids: HashSet<Bytes> = state_sync
1467 .component_tracker
1468 .get_contracts_by_component(&req_ids)
1469 .into_iter()
1470 .collect();
1471 let params = FetchSnapshotParams {
1472 chain: Chain::Ethereum,
1473 protocol_system: "uniswap-v2".to_string(),
1474 block_number: header.number,
1475 uses_dci: false,
1476 retrieve_balances: true,
1477 include_tvl: false,
1478 };
1479 let (snapshot, _, _) =
1480 fetch_snapshot(&state_sync.rpc_client, components, contract_ids, ¶ms)
1481 .await
1482 .expect("Retrieving snapshot failed");
1483 let snap = StateSyncMessage {
1484 header: header.clone(),
1485 snapshots: snapshot,
1486 deltas: None,
1487 removed_components: Default::default(),
1488 };
1489
1490 assert_eq!(snap, exp);
1491 }
1492
1493 #[test_log::test(tokio::test)]
1494 async fn test_get_snapshots_native_with_tvl() {
1495 let header = BlockHeader::default();
1496 let mut rpc = make_mock_client();
1497 let component = ProtocolComponent { id: "Component1".to_string(), ..Default::default() };
1498
1499 let component_clone = component.clone();
1500 rpc.expect_get_snapshots()
1501 .returning(move |_request, _chunk_size, _concurrency| {
1502 Ok(Snapshot {
1503 states: state_snapshot_native()
1504 .into_iter()
1505 .map(|state| {
1506 (
1507 state.component_id.clone(),
1508 ComponentWithState {
1509 state,
1510 component: component_clone.clone(),
1511 component_tvl: Some(100.0),
1512 entrypoints: vec![],
1513 },
1514 )
1515 })
1516 .collect(),
1517 vm_storage: HashMap::new(),
1518 })
1519 });
1520
1521 let mut state_sync = with_mocked_clients(true, true, Some(rpc), None);
1522 state_sync
1523 .component_tracker
1524 .components
1525 .insert("Component1".to_string(), component.clone());
1526 let components_arg = ["Component1".to_string()];
1527 let exp = StateSyncMessage {
1528 header: header.clone(),
1529 snapshots: Snapshot {
1530 states: state_snapshot_native()
1531 .into_iter()
1532 .map(|state| {
1533 (
1534 state.component_id.clone(),
1535 ComponentWithState {
1536 state,
1537 component: component.clone(),
1538 component_tvl: Some(100.0),
1539 entrypoints: vec![],
1540 },
1541 )
1542 })
1543 .collect(),
1544 vm_storage: HashMap::new(),
1545 },
1546 deltas: None,
1547 removed_components: Default::default(),
1548 };
1549
1550 let req_ids: Vec<String> = components_arg.to_vec();
1551 let components: HashMap<_, _> = state_sync
1552 .component_tracker
1553 .components
1554 .iter()
1555 .filter(|(id, _)| req_ids.contains(id))
1556 .map(|(k, v)| (k.clone(), v.clone()))
1557 .collect();
1558 let contract_ids: HashSet<Bytes> = state_sync
1559 .component_tracker
1560 .get_contracts_by_component(&req_ids)
1561 .into_iter()
1562 .collect();
1563 let params = FetchSnapshotParams {
1564 chain: Chain::Ethereum,
1565 protocol_system: "uniswap-v2".to_string(),
1566 block_number: header.number,
1567 uses_dci: false,
1568 retrieve_balances: true,
1569 include_tvl: true,
1570 };
1571 let (snapshot, _, _) =
1572 fetch_snapshot(&state_sync.rpc_client, components, contract_ids, ¶ms)
1573 .await
1574 .expect("Retrieving snapshot failed");
1575 let snap = StateSyncMessage {
1576 header: header.clone(),
1577 snapshots: snapshot,
1578 deltas: None,
1579 removed_components: Default::default(),
1580 };
1581
1582 assert_eq!(snap, exp);
1583 }
1584
1585 fn state_snapshot_vm() -> Vec<Account> {
1586 vec![
1587 Account::new(
1588 Chain::default(),
1589 Bytes::from("0x0badc0ffee"),
1590 String::new(),
1591 HashMap::new(),
1592 Bytes::default(),
1593 HashMap::new(),
1594 Bytes::default(),
1595 Bytes::default(),
1596 Bytes::default(),
1597 Bytes::default(),
1598 None,
1599 ),
1600 Account::new(
1601 Chain::default(),
1602 Bytes::from("0xbabe42"),
1603 String::new(),
1604 HashMap::new(),
1605 Bytes::default(),
1606 HashMap::new(),
1607 Bytes::default(),
1608 Bytes::default(),
1609 Bytes::default(),
1610 Bytes::default(),
1611 None,
1612 ),
1613 ]
1614 }
1615
1616 fn traced_entry_point_response(
1617 ) -> HashMap<String, Vec<(EntryPointWithTracingParams, TracingResult)>> {
1618 HashMap::from([(
1619 "Component1".to_string(),
1620 vec![(
1621 EntryPointWithTracingParams {
1622 entry_point: EntryPoint {
1623 external_id: "entrypoint_a".to_string(),
1624 target: Bytes::from("0x0badc0ffee"),
1625 signature: "sig()".to_string(),
1626 },
1627 params: TracingParams::RPCTracer(RPCTracerParams {
1628 caller: Some(Bytes::from("0x0badc0ffee")),
1629 calldata: Bytes::from("0x0badc0ffee"),
1630 state_overrides: None,
1631 prune_addresses: None,
1632 }),
1633 },
1634 TracingResult {
1635 retriggers: HashSet::from([(
1636 Bytes::from("0x0badc0ffee"),
1637 AddressStorageLocation::new(Bytes::from("0x0badc0ffee"), 12),
1638 )]),
1639 accessed_slots: HashMap::from([(
1640 Bytes::from("0x0badc0ffee"),
1641 HashSet::from([Bytes::from("0xbadbeef0")]),
1642 )]),
1643 },
1644 )],
1645 )])
1646 }
1647
1648 #[test_log::test(tokio::test)]
1649 async fn test_get_snapshots_vm() {
1650 let header = BlockHeader::default();
1651 let mut rpc = make_mock_client();
1652
1653 let traced_ep_response = traced_entry_point_response();
1654 rpc.expect_get_snapshots()
1655 .returning(move |_request, _chunk_size, _concurrency| {
1656 let vm_storage_accounts = state_snapshot_vm();
1657 Ok(Snapshot {
1658 states: [(
1659 "Component1".to_string(),
1660 ComponentWithState {
1661 state: ProtocolComponentState {
1662 component_id: "Component1".to_string(),
1663 attributes: HashMap::new(),
1664 balances: HashMap::new(),
1665 },
1666 component: ProtocolComponent {
1667 id: "Component1".to_string(),
1668 contract_addresses: vec![
1669 Bytes::from("0x0badc0ffee"),
1670 Bytes::from("0xbabe42"),
1671 ],
1672 ..Default::default()
1673 },
1674 component_tvl: None,
1675 entrypoints: traced_ep_response
1676 .get("Component1")
1677 .cloned()
1678 .unwrap_or_default(),
1679 },
1680 )]
1681 .into_iter()
1682 .collect(),
1683 vm_storage: vm_storage_accounts
1684 .into_iter()
1685 .map(|account| (account.address.clone(), account))
1686 .collect(),
1687 })
1688 });
1689
1690 let mut state_sync = with_mocked_clients(false, false, Some(rpc), None);
1691 let component = ProtocolComponent {
1692 id: "Component1".to_string(),
1693 contract_addresses: vec![Bytes::from("0x0badc0ffee"), Bytes::from("0xbabe42")],
1694 ..Default::default()
1695 };
1696 state_sync
1697 .component_tracker
1698 .components
1699 .insert("Component1".to_string(), component.clone());
1700 let components_arg = ["Component1".to_string()];
1701 let exp = StateSyncMessage {
1702 header: header.clone(),
1703 snapshots: Snapshot {
1704 states: [(
1705 component.id.clone(),
1706 ComponentWithState {
1707 state: ProtocolComponentState {
1708 component_id: "Component1".to_string(),
1709 attributes: HashMap::new(),
1710 balances: HashMap::new(),
1711 },
1712 component: component.clone(),
1713 component_tvl: None,
1714 entrypoints: traced_entry_point_response()
1715 .remove("Component1")
1716 .unwrap_or_default(),
1717 },
1718 )]
1719 .into_iter()
1720 .collect(),
1721 vm_storage: state_snapshot_vm()
1722 .into_iter()
1723 .map(|account| (account.address.clone(), account))
1724 .collect(),
1725 },
1726 deltas: None,
1727 removed_components: Default::default(),
1728 };
1729
1730 let req_ids: Vec<String> = components_arg.to_vec();
1731 let components: HashMap<_, _> = state_sync
1732 .component_tracker
1733 .components
1734 .iter()
1735 .filter(|(id, _)| req_ids.contains(id))
1736 .map(|(k, v)| (k.clone(), v.clone()))
1737 .collect();
1738 let contract_ids: HashSet<Bytes> = state_sync
1739 .component_tracker
1740 .get_contracts_by_component(&req_ids)
1741 .into_iter()
1742 .collect();
1743 let params = FetchSnapshotParams {
1744 chain: Chain::Ethereum,
1745 protocol_system: "uniswap-v2".to_string(),
1746 block_number: header.number,
1747 uses_dci: false,
1748 retrieve_balances: false,
1749 include_tvl: false,
1750 };
1751 let (snapshot, _, _) =
1752 fetch_snapshot(&state_sync.rpc_client, components, contract_ids, ¶ms)
1753 .await
1754 .expect("Retrieving snapshot failed");
1755 let snap = StateSyncMessage {
1756 header: header.clone(),
1757 snapshots: snapshot,
1758 deltas: None,
1759 removed_components: Default::default(),
1760 };
1761
1762 assert_eq!(snap, exp);
1763 }
1764
1765 #[test_log::test(tokio::test)]
1766 async fn test_get_snapshots_vm_with_tvl() {
1767 let header = BlockHeader::default();
1768 let mut rpc = make_mock_client();
1769 let component = ProtocolComponent {
1770 id: "Component1".to_string(),
1771 contract_addresses: vec![Bytes::from("0x0badc0ffee"), Bytes::from("0xbabe42")],
1772 ..Default::default()
1773 };
1774
1775 let component_clone = component.clone();
1776 rpc.expect_get_snapshots()
1777 .returning(move |_request, _chunk_size, _concurrency| {
1778 let vm_storage_accounts = state_snapshot_vm();
1779 Ok(Snapshot {
1780 states: [(
1781 "Component1".to_string(),
1782 ComponentWithState {
1783 state: ProtocolComponentState {
1784 component_id: "Component1".to_string(),
1785 attributes: HashMap::new(),
1786 balances: HashMap::new(),
1787 },
1788 component: component_clone.clone(),
1789 component_tvl: Some(100.0),
1790 entrypoints: vec![],
1791 },
1792 )]
1793 .into_iter()
1794 .collect(),
1795 vm_storage: vm_storage_accounts
1796 .into_iter()
1797 .map(|account| (account.address.clone(), account))
1798 .collect(),
1799 })
1800 });
1801
1802 let mut state_sync = with_mocked_clients(false, true, Some(rpc), None);
1803 state_sync
1804 .component_tracker
1805 .components
1806 .insert("Component1".to_string(), component.clone());
1807 let components_arg = ["Component1".to_string()];
1808 let exp = StateSyncMessage {
1809 header: header.clone(),
1810 snapshots: Snapshot {
1811 states: [(
1812 component.id.clone(),
1813 ComponentWithState {
1814 state: ProtocolComponentState {
1815 component_id: "Component1".to_string(),
1816 attributes: HashMap::new(),
1817 balances: HashMap::new(),
1818 },
1819 component: component.clone(),
1820 component_tvl: Some(100.0),
1821 entrypoints: vec![],
1822 },
1823 )]
1824 .into_iter()
1825 .collect(),
1826 vm_storage: state_snapshot_vm()
1827 .into_iter()
1828 .map(|account| (account.address.clone(), account))
1829 .collect(),
1830 },
1831 deltas: None,
1832 removed_components: Default::default(),
1833 };
1834
1835 let req_ids: Vec<String> = components_arg.to_vec();
1836 let components: HashMap<_, _> = state_sync
1837 .component_tracker
1838 .components
1839 .iter()
1840 .filter(|(id, _)| req_ids.contains(id))
1841 .map(|(k, v)| (k.clone(), v.clone()))
1842 .collect();
1843 let contract_ids: HashSet<Bytes> = state_sync
1844 .component_tracker
1845 .get_contracts_by_component(&req_ids)
1846 .into_iter()
1847 .collect();
1848 let params = FetchSnapshotParams {
1849 chain: Chain::Ethereum,
1850 protocol_system: "uniswap-v2".to_string(),
1851 block_number: header.number,
1852 uses_dci: false,
1853 retrieve_balances: false,
1854 include_tvl: true,
1855 };
1856 let (snapshot, _, _) =
1857 fetch_snapshot(&state_sync.rpc_client, components, contract_ids, ¶ms)
1858 .await
1859 .expect("Retrieving snapshot failed");
1860 let snap = StateSyncMessage {
1861 header: header.clone(),
1862 snapshots: snapshot,
1863 deltas: None,
1864 removed_components: Default::default(),
1865 };
1866
1867 assert_eq!(snap, exp);
1868 }
1869
1870 #[test_log::test(tokio::test)]
1874 async fn test_get_snapshots_filters_to_requested_components_only() {
1875 let header = BlockHeader::default();
1876 let mut rpc = make_mock_client();
1877
1878 let component1 = ProtocolComponent { id: "Component1".to_string(), ..Default::default() };
1880 let component2 = ProtocolComponent { id: "Component2".to_string(), ..Default::default() };
1881 let component3 = ProtocolComponent { id: "Component3".to_string(), ..Default::default() };
1882
1883 let component2_clone = component2.clone();
1884
1885 rpc.expect_get_snapshots()
1887 .withf(
1888 |request: &SnapshotParameters,
1889 _chunk_size: &Option<usize>,
1890 _concurrency: &usize| {
1891 request.components.len() == 1 &&
1893 request
1894 .components
1895 .contains_key("Component2")
1896 },
1897 )
1898 .times(1)
1899 .returning(move |_request, _chunk_size, _concurrency| {
1900 Ok(Snapshot {
1901 states: [(
1902 "Component2".to_string(),
1903 ComponentWithState {
1904 state: ProtocolComponentState {
1905 component_id: "Component2".to_string(),
1906 attributes: HashMap::new(),
1907 balances: HashMap::new(),
1908 },
1909 component: component2_clone.clone(),
1910 entrypoints: vec![],
1911 component_tvl: None,
1912 },
1913 )]
1914 .into_iter()
1915 .collect(),
1916 vm_storage: HashMap::new(),
1917 })
1918 });
1919
1920 let mut state_sync = with_mocked_clients(true, false, Some(rpc), None);
1921
1922 state_sync
1924 .component_tracker
1925 .components
1926 .insert("Component1".to_string(), component1.clone());
1927 state_sync
1928 .component_tracker
1929 .components
1930 .insert("Component2".to_string(), component2.clone());
1931 state_sync
1932 .component_tracker
1933 .components
1934 .insert("Component3".to_string(), component3.clone());
1935
1936 let components_arg = ["Component2".to_string()];
1938 let req_ids: Vec<String> = components_arg.to_vec();
1939 let components: HashMap<_, _> = state_sync
1940 .component_tracker
1941 .components
1942 .iter()
1943 .filter(|(id, _)| req_ids.contains(id))
1944 .map(|(k, v)| (k.clone(), v.clone()))
1945 .collect();
1946 let contract_ids: HashSet<Bytes> = state_sync
1947 .component_tracker
1948 .get_contracts_by_component(&req_ids)
1949 .into_iter()
1950 .collect();
1951 let params = FetchSnapshotParams {
1952 chain: Chain::Ethereum,
1953 protocol_system: "uniswap-v2".to_string(),
1954 block_number: header.number,
1955 uses_dci: false,
1956 retrieve_balances: true,
1957 include_tvl: false,
1958 };
1959 let (snapshot, _, _) =
1960 fetch_snapshot(&state_sync.rpc_client, components, contract_ids, ¶ms)
1961 .await
1962 .expect("Retrieving snapshot failed");
1963
1964 assert_eq!(snapshot.states.len(), 1);
1966 assert!(snapshot
1967 .states
1968 .contains_key("Component2"));
1969 assert!(!snapshot
1970 .states
1971 .contains_key("Component1"));
1972 assert!(!snapshot
1973 .states
1974 .contains_key("Component3"));
1975 }
1976
1977 fn mock_clients_for_state_sync(
1978 bg_done: Option<Arc<tokio::sync::Notify>>,
1979 ) -> (MockRPCClient, MockDeltasClient, Sender<BlockAggregatedChanges>) {
1980 let mut rpc_client = make_mock_client();
1981 rpc_client
1984 .expect_get_protocol_components()
1985 .withf(|params: &crate::rpc::ProtocolComponentsParams| {
1986 params
1987 .component_ids()
1988 .is_some_and(|ids| ids.contains(&"Component3".to_string()))
1989 })
1990 .returning(|_| {
1991 Ok(Page::new(
1993 vec![
1994 ProtocolComponent { id: "Component3".to_string(), ..Default::default() },
1996 ],
1997 1,
1998 0,
1999 100,
2000 ))
2001 });
2002 rpc_client
2004 .expect_get_snapshots()
2005 .withf(
2006 |request: &SnapshotParameters,
2007 _chunk_size: &Option<usize>,
2008 _concurrency: &usize| {
2009 request
2010 .components
2011 .contains_key("Component3")
2012 },
2013 )
2014 .returning(move |_request, _chunk_size, _concurrency| {
2015 let snap = Ok(Snapshot {
2016 states: [(
2017 "Component3".to_string(),
2018 ComponentWithState {
2019 state: ProtocolComponentState::new(
2020 "Component3",
2021 Default::default(),
2022 Default::default(),
2023 ),
2024 component: ProtocolComponent {
2025 id: "Component3".to_string(),
2026 ..Default::default()
2027 },
2028 component_tvl: Some(1000.0),
2029 entrypoints: vec![],
2030 },
2031 )]
2032 .into_iter()
2033 .collect(),
2034 vm_storage: HashMap::new(),
2035 });
2036 if let Some(n) = &bg_done {
2037 n.notify_one();
2038 }
2039 snap
2040 });
2041
2042 rpc_client
2044 .expect_get_protocol_components()
2045 .returning(|_| {
2046 Ok(Page::new(
2048 vec![
2049 ProtocolComponent { id: "Component1".to_string(), ..Default::default() },
2051 ProtocolComponent { id: "Component2".to_string(), ..Default::default() },
2053 ],
2055 2,
2056 0,
2057 100,
2058 ))
2059 });
2060
2061 rpc_client
2062 .expect_get_snapshots()
2063 .returning(|_request, _chunk_size, _concurrency| {
2064 Ok(Snapshot {
2065 states: [
2066 (
2067 "Component1".to_string(),
2068 ComponentWithState {
2069 state: ProtocolComponentState::new(
2070 "Component1",
2071 Default::default(),
2072 Default::default(),
2073 ),
2074 component: ProtocolComponent {
2075 id: "Component1".to_string(),
2076 ..Default::default()
2077 },
2078 component_tvl: Some(100.0),
2079 entrypoints: vec![],
2080 },
2081 ),
2082 (
2083 "Component2".to_string(),
2084 ComponentWithState {
2085 state: ProtocolComponentState::new(
2086 "Component2",
2087 Default::default(),
2088 Default::default(),
2089 ),
2090 component: ProtocolComponent {
2091 id: "Component2".to_string(),
2092 ..Default::default()
2093 },
2094 component_tvl: Some(0.0),
2095 entrypoints: vec![],
2096 },
2097 ),
2098 ]
2099 .into_iter()
2100 .collect(),
2101 vm_storage: HashMap::new(),
2102 })
2103 });
2104
2105 rpc_client
2107 .expect_get_traced_entry_points()
2108 .returning(|_| Ok(Page::new(HashMap::new(), 0, 0, 0)));
2109
2110 let mut deltas_client = MockDeltasClient::new();
2112 let (tx, rx) = channel(1);
2113 deltas_client
2114 .expect_subscribe()
2115 .return_once(move |_, _| {
2116 Ok((Uuid::default(), rx))
2118 });
2119
2120 deltas_client
2122 .expect_unsubscribe()
2123 .return_once(|_| Ok(()));
2124
2125 (rpc_client, deltas_client, tx)
2126 }
2127
2128 #[test_log::test(tokio::test)]
2136 async fn test_state_sync() {
2137 let bg_done = Arc::new(tokio::sync::Notify::new());
2138 let (rpc_client, deltas_client, tx) = mock_clients_for_state_sync(Some(bg_done.clone()));
2139 let deltas = [
2140 BlockAggregatedChanges {
2141 extractor: "uniswap-v2".to_string(),
2142 chain: Chain::Ethereum,
2143 block: Block {
2144 number: 1,
2145 hash: Bytes::from("0x01"),
2146 parent_hash: Bytes::from("0x00"),
2147 chain: Chain::Ethereum,
2148 ts: Default::default(),
2149 },
2150 revert: false,
2151 dci_update: DCIUpdate {
2152 new_entrypoints: HashMap::from([(
2153 "Component1".to_string(),
2154 HashSet::from([EntryPoint {
2155 external_id: "entrypoint_a".to_string(),
2156 target: Bytes::from("0x0badc0ffee"),
2157 signature: "sig()".to_string(),
2158 }]),
2159 )]),
2160 new_entrypoint_params: HashMap::from([(
2161 "entrypoint_a".to_string(),
2162 HashSet::from([(
2163 TracingParams::RPCTracer(RPCTracerParams {
2164 caller: Some(Bytes::from("0x0badc0ffee")),
2165 calldata: Bytes::from("0x0badc0ffee"),
2166 state_overrides: None,
2167 prune_addresses: None,
2168 }),
2169 "Component1".to_string(),
2170 )]),
2171 )]),
2172 trace_results: HashMap::from([(
2173 "entrypoint_a".to_string(),
2174 TracingResult {
2175 retriggers: HashSet::from([(
2176 Bytes::from("0x0badc0ffee"),
2177 AddressStorageLocation::new(Bytes::from("0x0badc0ffee"), 12),
2178 )]),
2179 accessed_slots: HashMap::from([(
2180 Bytes::from("0x0badc0ffee"),
2181 HashSet::from([Bytes::from("0xbadbeef0")]),
2182 )]),
2183 },
2184 )]),
2185 },
2186 ..Default::default()
2187 },
2188 BlockAggregatedChanges {
2189 extractor: "uniswap-v2".to_string(),
2190 chain: Chain::Ethereum,
2191 block: Block {
2192 number: 2,
2193 hash: Bytes::from("0x02"),
2194 parent_hash: Bytes::from("0x01"),
2195 chain: Chain::Ethereum,
2196 ts: Default::default(),
2197 },
2198 revert: false,
2199 component_tvl: [
2200 ("Component1".to_string(), 100.0),
2201 ("Component2".to_string(), 0.0),
2202 ("Component3".to_string(), 1000.0),
2203 ]
2204 .into_iter()
2205 .collect(),
2206 ..Default::default()
2207 },
2208 BlockAggregatedChanges {
2211 extractor: "uniswap-v2".to_string(),
2212 chain: Chain::Ethereum,
2213 block: Block {
2214 number: 3,
2215 hash: Bytes::from("0x03"),
2216 parent_hash: Bytes::from("0x02"),
2217 chain: Chain::Ethereum,
2218 ts: Default::default(),
2219 },
2220 revert: false,
2221 ..Default::default()
2222 },
2223 ];
2224 let mut state_sync = with_mocked_clients(true, true, Some(rpc_client), Some(deltas_client));
2225 state_sync
2226 .initialize()
2227 .await
2228 .expect("Init failed");
2229
2230 let (handle, mut rx) = state_sync.start().await;
2232 let (jh, close_tx) = handle.split();
2233 tx.send(deltas[0].clone())
2234 .await
2235 .expect("deltas channel msg 0 closed!");
2236 let first_msg = timeout(Duration::from_millis(200), rx.recv())
2237 .await
2238 .expect("waiting for first state msg timed out!")
2239 .expect("state sync block sender closed!");
2240 tx.send(deltas[1].clone())
2241 .await
2242 .expect("deltas channel msg 1 closed!");
2243 let second_msg = timeout(Duration::from_millis(200), rx.recv())
2244 .await
2245 .expect("waiting for second state msg timed out!")
2246 .expect("state sync block sender closed!");
2247 bg_done.notified().await;
2249 tx.send(deltas[2].clone())
2250 .await
2251 .expect("deltas channel msg 2 closed!");
2252 let third_msg = timeout(Duration::from_millis(200), rx.recv())
2253 .await
2254 .expect("waiting for third state msg timed out!")
2255 .expect("state sync block sender closed!");
2256 let _ = close_tx.send(());
2257 jh.await
2258 .expect("state sync task panicked!");
2259
2260 let exp1 = StateSyncMessage {
2262 header: BlockHeader {
2263 number: 1,
2264 hash: Bytes::from("0x01"),
2265 parent_hash: Bytes::from("0x00"),
2266 revert: false,
2267 ..Default::default()
2268 },
2269 snapshots: Snapshot {
2270 states: [
2271 (
2272 "Component1".to_string(),
2273 ComponentWithState {
2274 state: ProtocolComponentState::new(
2275 "Component1",
2276 Default::default(),
2277 Default::default(),
2278 ),
2279 component: ProtocolComponent {
2280 id: "Component1".to_string(),
2281 ..Default::default()
2282 },
2283 component_tvl: Some(100.0),
2284 entrypoints: vec![],
2285 },
2286 ),
2287 (
2288 "Component2".to_string(),
2289 ComponentWithState {
2290 state: ProtocolComponentState::new(
2291 "Component2",
2292 Default::default(),
2293 Default::default(),
2294 ),
2295 component: ProtocolComponent {
2296 id: "Component2".to_string(),
2297 ..Default::default()
2298 },
2299 component_tvl: Some(0.0),
2300 entrypoints: vec![],
2301 },
2302 ),
2303 ]
2304 .into_iter()
2305 .collect(),
2306 vm_storage: HashMap::new(),
2307 },
2308 deltas: Some(deltas[0].clone()),
2309 removed_components: Default::default(),
2310 };
2311
2312 let exp2 = StateSyncMessage {
2315 header: BlockHeader {
2316 number: 2,
2317 hash: Bytes::from("0x02"),
2318 parent_hash: Bytes::from("0x01"),
2319 revert: false,
2320 ..Default::default()
2321 },
2322 snapshots: Snapshot::default(),
2323 deltas: Some(BlockAggregatedChanges {
2324 extractor: "uniswap-v2".to_string(),
2325 chain: Chain::Ethereum,
2326 block: Block {
2327 number: 2,
2328 hash: Bytes::from("0x02"),
2329 parent_hash: Bytes::from("0x01"),
2330 chain: Chain::Ethereum,
2331 ts: Default::default(),
2332 },
2333 revert: false,
2334 component_tvl: [
2335 ("Component1".to_string(), 100.0),
2337 ]
2338 .into_iter()
2339 .collect(),
2340 ..Default::default()
2341 }),
2342 removed_components: [(
2344 "Component2".to_string(),
2345 ProtocolComponent { id: "Component2".to_string(), ..Default::default() },
2346 )]
2347 .into_iter()
2348 .collect(),
2349 };
2350
2351 let exp3 = StateSyncMessage {
2353 header: BlockHeader {
2354 number: 3,
2355 hash: Bytes::from("0x03"),
2356 parent_hash: Bytes::from("0x02"),
2357 revert: false,
2358 ..Default::default()
2359 },
2360 snapshots: Snapshot {
2361 states: [(
2362 "Component3".to_string(),
2363 ComponentWithState {
2364 state: ProtocolComponentState::new(
2365 "Component3",
2366 Default::default(),
2367 Default::default(),
2368 ),
2369 component: ProtocolComponent {
2370 id: "Component3".to_string(),
2371 ..Default::default()
2372 },
2373 component_tvl: Some(1000.0),
2374 entrypoints: vec![],
2375 },
2376 )]
2377 .into_iter()
2378 .collect(),
2379 vm_storage: HashMap::new(),
2380 },
2381 deltas: Some(deltas[2].clone()),
2382 removed_components: Default::default(),
2383 };
2384 assert_eq!(first_msg.unwrap(), exp1);
2385 assert_eq!(second_msg.unwrap(), exp2);
2386 assert_eq!(third_msg.unwrap(), exp3);
2387 }
2388
2389 #[test_log::test(tokio::test)]
2390 async fn test_state_sync_with_tvl_range() {
2391 let remove_tvl_threshold = 5.0;
2392 let add_tvl_threshold = 7.0;
2393 let bg_done = Arc::new(tokio::sync::Notify::new());
2394
2395 let mut rpc_client = make_mock_client();
2396 let mut deltas_client = MockDeltasClient::new();
2397
2398 rpc_client
2399 .expect_get_protocol_components()
2400 .withf(|params: &crate::rpc::ProtocolComponentsParams| {
2401 params
2402 .component_ids()
2403 .is_some_and(|ids| ids.contains(&"Component3".to_string()))
2404 })
2405 .returning(|_| {
2406 Ok(Page::new(
2407 vec![ProtocolComponent { id: "Component3".to_string(), ..Default::default() }],
2408 1,
2409 0,
2410 100,
2411 ))
2412 });
2413 let bg_done_clone = bg_done.clone();
2415 rpc_client
2416 .expect_get_snapshots()
2417 .withf(
2418 |request: &SnapshotParameters,
2419 _chunk_size: &Option<usize>,
2420 _concurrency: &usize| {
2421 request
2422 .components
2423 .contains_key("Component3")
2424 },
2425 )
2426 .returning(move |_request, _chunk_size, _concurrency| {
2427 let snap = Ok(Snapshot {
2428 states: [(
2429 "Component3".to_string(),
2430 ComponentWithState {
2431 state: ProtocolComponentState::new(
2432 "Component3",
2433 Default::default(),
2434 Default::default(),
2435 ),
2436 component: ProtocolComponent {
2437 id: "Component3".to_string(),
2438 ..Default::default()
2439 },
2440 component_tvl: Some(10.0),
2441 entrypoints: vec![],
2442 },
2443 )]
2444 .into_iter()
2445 .collect(),
2446 vm_storage: HashMap::new(),
2447 });
2448 bg_done_clone.notify_one();
2449 snap
2450 });
2451
2452 rpc_client
2454 .expect_get_protocol_components()
2455 .returning(|_| {
2456 Ok(Page::new(
2457 vec![
2458 ProtocolComponent { id: "Component1".to_string(), ..Default::default() },
2459 ProtocolComponent { id: "Component2".to_string(), ..Default::default() },
2460 ],
2461 2,
2462 0,
2463 100,
2464 ))
2465 });
2466
2467 rpc_client
2469 .expect_get_snapshots()
2470 .returning(|_request, _chunk_size, _concurrency| {
2471 Ok(Snapshot {
2472 states: [
2473 (
2474 "Component1".to_string(),
2475 ComponentWithState {
2476 state: ProtocolComponentState::new(
2477 "Component1",
2478 Default::default(),
2479 Default::default(),
2480 ),
2481 component: ProtocolComponent {
2482 id: "Component1".to_string(),
2483 ..Default::default()
2484 },
2485 component_tvl: Some(6.0),
2486 entrypoints: vec![],
2487 },
2488 ),
2489 (
2490 "Component2".to_string(),
2491 ComponentWithState {
2492 state: ProtocolComponentState::new(
2493 "Component2",
2494 Default::default(),
2495 Default::default(),
2496 ),
2497 component: ProtocolComponent {
2498 id: "Component2".to_string(),
2499 ..Default::default()
2500 },
2501 component_tvl: Some(2.0),
2502 entrypoints: vec![],
2503 },
2504 ),
2505 ]
2506 .into_iter()
2507 .collect(),
2508 vm_storage: HashMap::new(),
2509 })
2510 });
2511
2512 rpc_client
2514 .expect_get_traced_entry_points()
2515 .returning(|_| Ok(Page::new(HashMap::new(), 0, 0, 0)));
2516
2517 let (tx, rx) = channel(1);
2518 deltas_client
2519 .expect_subscribe()
2520 .return_once(move |_, _| Ok((Uuid::default(), rx)));
2521
2522 deltas_client
2524 .expect_unsubscribe()
2525 .return_once(|_| Ok(()));
2526
2527 let mut state_sync = ProtocolStateSynchronizer::new(
2528 ExtractorIdentity::new(Chain::Ethereum, "uniswap-v2"),
2529 true,
2530 ComponentFilter::with_tvl_range(remove_tvl_threshold, add_tvl_threshold),
2531 1,
2532 Duration::from_secs(0),
2533 true,
2534 true,
2535 true,
2536 ArcRPCClient(Arc::new(rpc_client)),
2537 ArcDeltasClient(Arc::new(deltas_client)),
2538 10_u64,
2539 );
2540 state_sync
2541 .initialize()
2542 .await
2543 .expect("Init failed");
2544
2545 let deltas = [
2547 BlockAggregatedChanges {
2548 extractor: "uniswap-v2".to_string(),
2549 chain: Chain::Ethereum,
2550 block: Block {
2551 number: 1,
2552 hash: Bytes::from("0x01"),
2553 parent_hash: Bytes::from("0x00"),
2554 chain: Chain::Ethereum,
2555 ts: Default::default(),
2556 },
2557 revert: false,
2558 ..Default::default()
2559 },
2560 BlockAggregatedChanges {
2561 extractor: "uniswap-v2".to_string(),
2562 chain: Chain::Ethereum,
2563 block: Block {
2564 number: 2,
2565 hash: Bytes::from("0x02"),
2566 parent_hash: Bytes::from("0x01"),
2567 chain: Chain::Ethereum,
2568 ts: Default::default(),
2569 },
2570 revert: false,
2571 component_tvl: [
2572 ("Component1".to_string(), 6.0), ("Component2".to_string(), 2.0), ("Component3".to_string(), 10.0), ]
2576 .into_iter()
2577 .collect(),
2578 ..Default::default()
2579 },
2580 BlockAggregatedChanges {
2582 extractor: "uniswap-v2".to_string(),
2583 chain: Chain::Ethereum,
2584 block: Block {
2585 number: 3,
2586 hash: Bytes::from("0x03"),
2587 parent_hash: Bytes::from("0x02"),
2588 chain: Chain::Ethereum,
2589 ts: Default::default(),
2590 },
2591 revert: false,
2592 ..Default::default()
2593 },
2594 ];
2595
2596 let (handle, mut rx) = state_sync.start().await;
2597 let (jh, close_tx) = handle.split();
2598
2599 tx.send(deltas[0].clone())
2601 .await
2602 .expect("deltas channel msg 0 closed!");
2603
2604 let _ = timeout(Duration::from_millis(200), rx.recv())
2606 .await
2607 .expect("waiting for first state msg timed out!")
2608 .expect("state sync block sender closed!");
2609
2610 tx.send(deltas[1].clone())
2613 .await
2614 .expect("deltas channel msg 1 closed!");
2615 let second_msg = timeout(Duration::from_millis(200), rx.recv())
2616 .await
2617 .expect("waiting for second state msg timed out!")
2618 .expect("state sync block sender closed!")
2619 .expect("no error");
2620
2621 bg_done.notified().await;
2623
2624 tx.send(deltas[2].clone())
2625 .await
2626 .expect("deltas channel msg 2 closed!");
2627 let third_msg = timeout(Duration::from_millis(200), rx.recv())
2628 .await
2629 .expect("waiting for third state msg timed out!")
2630 .expect("state sync block sender closed!")
2631 .expect("no error");
2632
2633 let _ = close_tx.send(());
2634 jh.await
2635 .expect("state sync task panicked!");
2636
2637 let expected_second_msg = StateSyncMessage {
2639 header: BlockHeader {
2640 number: 2,
2641 hash: Bytes::from("0x02"),
2642 parent_hash: Bytes::from("0x01"),
2643 revert: false,
2644 ..Default::default()
2645 },
2646 snapshots: Snapshot::default(),
2647 deltas: Some(BlockAggregatedChanges {
2648 extractor: "uniswap-v2".to_string(),
2649 chain: Chain::Ethereum,
2650 block: Block {
2651 number: 2,
2652 hash: Bytes::from("0x02"),
2653 parent_hash: Bytes::from("0x01"),
2654 chain: Chain::Ethereum,
2655 ts: Default::default(),
2656 },
2657 revert: false,
2658 component_tvl: [
2659 ("Component1".to_string(), 6.0), ]
2663 .into_iter()
2664 .collect(),
2665 ..Default::default()
2666 }),
2667 removed_components: [(
2668 "Component2".to_string(),
2669 ProtocolComponent { id: "Component2".to_string(), ..Default::default() },
2670 )]
2671 .into_iter()
2672 .collect(),
2673 };
2674
2675 let expected_third_msg = StateSyncMessage {
2677 header: BlockHeader {
2678 number: 3,
2679 hash: Bytes::from("0x03"),
2680 parent_hash: Bytes::from("0x02"),
2681 revert: false,
2682 ..Default::default()
2683 },
2684 snapshots: Snapshot {
2685 states: [(
2686 "Component3".to_string(),
2687 ComponentWithState {
2688 state: ProtocolComponentState::new(
2689 "Component3",
2690 Default::default(),
2691 Default::default(),
2692 ),
2693 component: ProtocolComponent {
2694 id: "Component3".to_string(),
2695 ..Default::default()
2696 },
2697 component_tvl: Some(10.0),
2698 entrypoints: vec![],
2699 },
2700 )]
2701 .into_iter()
2702 .collect(),
2703 vm_storage: HashMap::new(),
2704 },
2705 deltas: Some(deltas[2].clone()),
2706 removed_components: Default::default(),
2707 };
2708
2709 assert_eq!(second_msg, expected_second_msg);
2710 assert_eq!(third_msg, expected_third_msg);
2711 }
2712
2713 #[test_log::test(tokio::test)]
2714 async fn test_public_close_api_functionality() {
2715 let mut rpc_client = make_mock_client();
2722 let mut deltas_client = MockDeltasClient::new();
2723
2724 rpc_client
2726 .expect_get_protocol_components()
2727 .returning(|_| Ok(Page::new(vec![], 0, 0, 0)));
2728
2729 let (_tx, rx) = channel(1);
2731 deltas_client
2732 .expect_subscribe()
2733 .return_once(move |_, _| Ok((Uuid::default(), rx)));
2734
2735 deltas_client
2737 .expect_unsubscribe()
2738 .return_once(|_| Ok(()));
2739
2740 let mut state_sync = ProtocolStateSynchronizer::new(
2741 ExtractorIdentity::new(Chain::Ethereum, "test-protocol"),
2742 true,
2743 ComponentFilter::with_tvl_range(0.0, 0.0),
2744 5, Duration::from_secs(0),
2746 true,
2747 false,
2748 true,
2749 ArcRPCClient(Arc::new(rpc_client)),
2750 ArcDeltasClient(Arc::new(deltas_client)),
2751 10000_u64, );
2753
2754 state_sync
2755 .initialize()
2756 .await
2757 .expect("Init should succeed");
2758
2759 let (handle, _rx) = state_sync.start().await;
2761 let (jh, close_tx) = handle.split();
2762
2763 tokio::time::sleep(Duration::from_millis(100)).await;
2765
2766 close_tx
2768 .send(())
2769 .expect("Should be able to send close signal");
2770 jh.await.expect("Task should not panic");
2772 }
2773
2774 #[test_log::test(tokio::test)]
2775 async fn test_cleanup_runs_when_state_sync_processing_errors() {
2776 let mut rpc_client = make_mock_client();
2781 let mut deltas_client = MockDeltasClient::new();
2782
2783 rpc_client
2785 .expect_get_protocol_components()
2786 .returning(|_| Ok(Page::new(vec![], 0, 0, 0)));
2787
2788 rpc_client
2790 .expect_get_protocol_states()
2791 .returning(|_| {
2792 Err(RPCError::ParseResponse("Test error during snapshot retrieval".to_string()))
2793 });
2794
2795 let (tx, rx) = channel(10);
2797 deltas_client
2798 .expect_subscribe()
2799 .return_once(move |_, _| {
2800 let delta = BlockAggregatedChanges {
2802 extractor: "test".to_string(),
2803 chain: Chain::Ethereum,
2804 block: Block {
2805 hash: Bytes::from("0x0123"),
2806 number: 1,
2807 parent_hash: Bytes::from("0x0000"),
2808 chain: Chain::Ethereum,
2809 ts: chrono::DateTime::from_timestamp(1234567890, 0)
2810 .unwrap()
2811 .naive_utc(),
2812 },
2813 revert: false,
2814 new_protocol_components: [(
2816 "new_component".to_string(),
2817 ProtocolComponent { id: "new_component".to_string(), ..Default::default() },
2818 )]
2819 .into_iter()
2820 .collect(),
2821 component_tvl: [("new_component".to_string(), 100.0)]
2822 .into_iter()
2823 .collect(),
2824 ..Default::default()
2825 };
2826
2827 tokio::spawn(async move {
2828 let _ = tx.send(delta).await;
2829 });
2831
2832 Ok((Uuid::default(), rx))
2833 });
2834
2835 deltas_client
2837 .expect_unsubscribe()
2838 .return_once(|_| Ok(()));
2839
2840 let mut state_sync = ProtocolStateSynchronizer::new(
2841 ExtractorIdentity::new(Chain::Ethereum, "test-protocol"),
2842 true,
2843 ComponentFilter::with_tvl_range(0.0, 1000.0), 1,
2845 Duration::from_secs(0),
2846 true,
2847 false,
2848 true,
2849 ArcRPCClient(Arc::new(rpc_client)),
2850 ArcDeltasClient(Arc::new(deltas_client)),
2851 5000_u64,
2852 );
2853
2854 state_sync
2855 .initialize()
2856 .await
2857 .expect("Init should succeed");
2858
2859 state_sync.last_synced_block = Some(BlockHeader {
2861 hash: Bytes::from("0x0badc0ffee"),
2862 number: 42,
2863 parent_hash: Bytes::from("0xbadbeef0"),
2864 revert: false,
2865 timestamp: 123456789,
2866 partial_block_index: None,
2867 });
2868
2869 let (mut block_tx, _block_rx) = channel(10);
2871
2872 let (_end_tx, end_rx) = oneshot::channel::<()>();
2874 let result = state_sync
2875 .state_sync(&mut block_tx, end_rx)
2876 .await;
2877 assert!(result.is_err(), "state_sync should have errored during processing");
2879
2880 }
2883
2884 #[test_log::test(tokio::test)]
2885 async fn test_close_signal_while_waiting_for_first_deltas() {
2886 let mut rpc_client = make_mock_client();
2890 let mut deltas_client = MockDeltasClient::new();
2891
2892 rpc_client
2893 .expect_get_protocol_components()
2894 .returning(|_| Ok(Page::new(vec![], 0, 0, 0)));
2895
2896 let (_tx, rx) = channel(1);
2897 deltas_client
2898 .expect_subscribe()
2899 .return_once(move |_, _| Ok((Uuid::default(), rx)));
2900
2901 deltas_client
2902 .expect_unsubscribe()
2903 .return_once(|_| Ok(()));
2904
2905 let mut state_sync = ProtocolStateSynchronizer::new(
2906 ExtractorIdentity::new(Chain::Ethereum, "test-protocol"),
2907 true,
2908 ComponentFilter::with_tvl_range(0.0, 0.0),
2909 1,
2910 Duration::from_secs(0),
2911 true,
2912 true,
2913 false,
2914 ArcRPCClient(Arc::new(rpc_client)),
2915 ArcDeltasClient(Arc::new(deltas_client)),
2916 10000_u64,
2917 );
2918
2919 state_sync
2920 .initialize()
2921 .await
2922 .expect("Init should succeed");
2923
2924 let (mut block_tx, _block_rx) = channel(10);
2925 let (end_tx, end_rx) = oneshot::channel::<()>();
2926
2927 let state_sync_handle = tokio::spawn(async move {
2929 state_sync
2930 .state_sync(&mut block_tx, end_rx)
2931 .await
2932 });
2933
2934 tokio::time::sleep(Duration::from_millis(100)).await;
2936
2937 let _ = end_tx.send(());
2939
2940 let result = state_sync_handle
2942 .await
2943 .expect("Task should not panic");
2944 assert!(result.is_ok(), "state_sync should exit cleanly when closed: {result:?}");
2945
2946 println!("SUCCESS: Close signal handled correctly while waiting for first deltas");
2947 }
2948
2949 #[test_log::test(tokio::test)]
2950 async fn test_close_signal_during_main_processing_loop() {
2951 let mut rpc_client = make_mock_client();
2957 let mut deltas_client = MockDeltasClient::new();
2958
2959 rpc_client
2961 .expect_get_protocol_components()
2962 .returning(|_| Ok(Page::new(vec![], 0, 0, 0)));
2963
2964 rpc_client
2966 .expect_get_protocol_states()
2967 .returning(|_| Ok(Page::new(vec![], 0, 0, 0)));
2968
2969 rpc_client
2970 .expect_get_component_tvl()
2971 .returning(|_| Ok(Page::new(HashMap::new(), 0, 0, 0)));
2972
2973 rpc_client
2974 .expect_get_traced_entry_points()
2975 .returning(|_| Ok(Page::new(HashMap::new(), 0, 0, 0)));
2976
2977 let (tx, rx) = channel(10);
2979 deltas_client
2980 .expect_subscribe()
2981 .return_once(move |_, _| {
2982 let first_delta = BlockAggregatedChanges {
2984 extractor: "test".to_string(),
2985 chain: Chain::Ethereum,
2986 block: Block {
2987 hash: Bytes::from("0x0123"),
2988 number: 1,
2989 parent_hash: Bytes::from("0x0000"),
2990 chain: Chain::Ethereum,
2991 ts: chrono::DateTime::from_timestamp(1234567890, 0)
2992 .unwrap()
2993 .naive_utc(),
2994 },
2995 revert: false,
2996 ..Default::default()
2997 };
2998
2999 tokio::spawn(async move {
3000 let _ = tx.send(first_delta).await;
3001 tokio::time::sleep(Duration::from_secs(30)).await;
3004 });
3005
3006 Ok((Uuid::default(), rx))
3007 });
3008
3009 deltas_client
3010 .expect_unsubscribe()
3011 .return_once(|_| Ok(()));
3012
3013 let mut state_sync = ProtocolStateSynchronizer::new(
3014 ExtractorIdentity::new(Chain::Ethereum, "test-protocol"),
3015 true,
3016 ComponentFilter::with_tvl_range(0.0, 1000.0),
3017 1,
3018 Duration::from_secs(0),
3019 true,
3020 false,
3021 true,
3022 ArcRPCClient(Arc::new(rpc_client)),
3023 ArcDeltasClient(Arc::new(deltas_client)),
3024 10000_u64,
3025 );
3026
3027 state_sync
3028 .initialize()
3029 .await
3030 .expect("Init should succeed");
3031
3032 let (mut block_tx, mut block_rx) = channel(10);
3033 let (end_tx, end_rx) = oneshot::channel::<()>();
3034
3035 let state_sync_handle = tokio::spawn(async move {
3037 state_sync
3038 .state_sync(&mut block_tx, end_rx)
3039 .await
3040 });
3041
3042 let first_snapshot = block_rx
3044 .recv()
3045 .await
3046 .expect("Should receive first snapshot")
3047 .expect("Synchronizer error");
3048 assert!(
3049 !first_snapshot
3050 .snapshots
3051 .states
3052 .is_empty() ||
3053 first_snapshot.deltas.is_some()
3054 );
3055 let _ = end_tx.send(());
3057
3058 let result = state_sync_handle
3060 .await
3061 .expect("Task should not panic");
3062 assert!(
3063 result.is_ok(),
3064 "state_sync should exit cleanly when closed after first message: {result:?}"
3065 );
3066 }
3067
3068 #[test_log::test(tokio::test)]
3069 async fn test_max_retries_exceeded_error_propagation() {
3070 let mut rpc_client = make_mock_client();
3074 let mut deltas_client = MockDeltasClient::new();
3075
3076 rpc_client
3078 .expect_get_protocol_components()
3079 .returning(|_| Ok(Page::new(vec![], 0, 0, 0)));
3080
3081 deltas_client
3084 .expect_subscribe()
3085 .returning(|_, _| {
3086 Err(DeltasError::NotConnected)
3088 });
3089
3090 deltas_client
3092 .expect_unsubscribe()
3093 .returning(|_| Ok(()))
3094 .times(0..=5);
3095
3096 let mut state_sync = ProtocolStateSynchronizer::new(
3098 ExtractorIdentity::new(Chain::Ethereum, "test-protocol"),
3099 true,
3100 ComponentFilter::with_tvl_range(0.0, 1000.0),
3101 2, Duration::from_millis(10), true,
3104 false,
3105 true,
3106 ArcRPCClient(Arc::new(rpc_client)),
3107 ArcDeltasClient(Arc::new(deltas_client)),
3108 1000_u64,
3109 );
3110
3111 state_sync
3112 .initialize()
3113 .await
3114 .expect("Init should succeed");
3115
3116 let (handle, mut rx) = state_sync.start().await;
3118 let (jh, _close_tx) = handle.split();
3119
3120 let res = tokio::time::timeout(Duration::from_millis(100), rx.recv())
3121 .await
3122 .expect("responsds in time")
3123 .expect("channel open");
3124
3125 if let Err(err) = res {
3127 assert!(
3128 matches!(err, SynchronizerError::ConnectionClosed),
3129 "Expected ConnectionClosed error, got: {:?}",
3130 err
3131 );
3132 } else {
3133 panic!("Expected an error")
3134 }
3135
3136 let task_result = tokio::time::timeout(Duration::from_secs(2), jh).await;
3138 assert!(task_result.is_ok(), "Synchronizer task should complete after max retries");
3139 }
3140
3141 #[test_log::test(tokio::test)]
3142 async fn test_is_next_expected() {
3143 let mut state_sync = with_mocked_clients(true, false, None, None);
3147
3148 let incoming_header = BlockHeader {
3150 number: 100,
3151 hash: Bytes::from("0x0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"),
3152 parent_hash: Bytes::from(
3153 "0x0000000000000000000000000000000000000000000000000000000000000000",
3154 ),
3155 revert: false,
3156 timestamp: 123456789,
3157 partial_block_index: None,
3158 };
3159 assert!(
3160 !state_sync.is_next_expected(&incoming_header),
3161 "Should return false when no previous block is set"
3162 );
3163
3164 let previous_header = BlockHeader {
3166 number: 99,
3167 hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000000"),
3168 parent_hash: Bytes::from(
3169 "0xabcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789",
3170 ),
3171 revert: false,
3172 timestamp: 123456788,
3173 partial_block_index: None,
3174 };
3175 state_sync.last_synced_block = Some(previous_header.clone());
3176
3177 assert!(
3178 state_sync.is_next_expected(&incoming_header),
3179 "Should return true when incoming parent_hash matches previous hash"
3180 );
3181
3182 let non_matching_header = BlockHeader {
3184 number: 100,
3185 hash: Bytes::from("0x0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"),
3186 parent_hash: Bytes::from(
3187 "0x1111111111111111111111111111111111111111111111111111111111111111",
3188 ), revert: false,
3190 timestamp: 123456789,
3191 partial_block_index: None,
3192 };
3193 assert!(
3194 !state_sync.is_next_expected(&non_matching_header),
3195 "Should return false when incoming parent_hash doesn't match previous hash"
3196 );
3197 }
3198
3199 #[test_log::test(tokio::test)]
3200 async fn test_synchronizer_restart_skip_snapshot_on_expected_block() {
3201 let mut rpc_client = make_mock_client();
3205 let mut deltas_client = MockDeltasClient::new();
3206
3207 rpc_client
3209 .expect_get_protocol_components()
3210 .returning(|_| {
3211 Ok(Page::new(
3212 vec![ProtocolComponent { id: "Component1".to_string(), ..Default::default() }],
3213 1,
3214 0,
3215 100,
3216 ))
3217 });
3218
3219 let (tx, rx) = channel(10);
3221 deltas_client
3222 .expect_subscribe()
3223 .return_once(move |_, _| {
3224 let expected_next_delta = BlockAggregatedChanges {
3225 extractor: "uniswap-v2".to_string(),
3226 chain: Chain::Ethereum,
3227 block: Block {
3228 hash: Bytes::from(
3229 "0x0000000000000000000000000000000000000000000000000000000000000002",
3230 ), number: 2,
3232 parent_hash: Bytes::from(
3233 "0x0000000000000000000000000000000000000000000000000000000000000001",
3234 ), chain: Chain::Ethereum,
3236 ts: chrono::DateTime::from_timestamp(1234567890, 0)
3237 .unwrap()
3238 .naive_utc(),
3239 },
3240 revert: false,
3241 ..Default::default()
3242 };
3243
3244 tokio::spawn(async move {
3245 let _ = tx.send(expected_next_delta).await;
3246 });
3247
3248 Ok((Uuid::default(), rx))
3249 });
3250
3251 deltas_client
3252 .expect_unsubscribe()
3253 .return_once(|_| Ok(()));
3254
3255 let mut state_sync = ProtocolStateSynchronizer::new(
3256 ExtractorIdentity::new(Chain::Ethereum, "uniswap-v2"),
3257 true,
3258 ComponentFilter::with_tvl_range(0.0, 1000.0),
3259 1,
3260 Duration::from_secs(0),
3261 true, false,
3263 true,
3264 ArcRPCClient(Arc::new(rpc_client)),
3265 ArcDeltasClient(Arc::new(deltas_client)),
3266 10000_u64,
3267 );
3268
3269 state_sync
3271 .initialize()
3272 .await
3273 .expect("Init should succeed");
3274
3275 state_sync.last_synced_block = Some(BlockHeader {
3277 number: 1,
3278 hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000001"), parent_hash: Bytes::from(
3280 "0x0000000000000000000000000000000000000000000000000000000000000000",
3281 ),
3282 revert: false,
3283 timestamp: 123456789,
3284 partial_block_index: None,
3285 });
3286
3287 let (mut block_tx, mut block_rx) = channel(10);
3288 let (end_tx, end_rx) = oneshot::channel::<()>();
3289
3290 let state_sync_handle = tokio::spawn(async move {
3292 state_sync
3293 .state_sync(&mut block_tx, end_rx)
3294 .await
3295 });
3296
3297 let result_msg = timeout(Duration::from_millis(200), block_rx.recv())
3299 .await
3300 .expect("Should receive message within timeout")
3301 .expect("Channel should be open")
3302 .expect("Should not be an error");
3303
3304 let _ = end_tx.send(());
3306
3307 let _ = state_sync_handle
3309 .await
3310 .expect("Task should not panic");
3311
3312 assert!(result_msg.deltas.is_some(), "Should contain deltas");
3315 assert!(
3316 result_msg.snapshots.states.is_empty(),
3317 "Should not contain snapshots when next expected block is received"
3318 );
3319
3320 if let Some(deltas) = &result_msg.deltas {
3322 assert_eq!(deltas.block.number, 2);
3323 assert_eq!(
3324 deltas.block.hash,
3325 Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000002")
3326 );
3327 assert_eq!(
3328 deltas.block.parent_hash,
3329 Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000001")
3330 );
3331 }
3332 }
3333
3334 #[test_log::test(tokio::test)]
3335 async fn test_skip_previously_processed_messages() {
3336 let mut rpc_client = make_mock_client();
3340 let mut deltas_client = MockDeltasClient::new();
3341
3342 rpc_client
3344 .expect_get_protocol_components()
3345 .returning(|_| {
3346 Ok(Page::new(
3347 vec![ProtocolComponent { id: "Component1".to_string(), ..Default::default() }],
3348 1,
3349 0,
3350 100,
3351 ))
3352 });
3353
3354 rpc_client
3356 .expect_get_protocol_states()
3357 .returning(|_| {
3358 Ok(Page::new(
3359 vec![ProtocolComponentState::new(
3360 "Component1",
3361 Default::default(),
3362 Default::default(),
3363 )],
3364 1,
3365 0,
3366 100,
3367 ))
3368 });
3369
3370 rpc_client
3371 .expect_get_component_tvl()
3372 .returning(|_| {
3373 Ok(Page::new(
3374 [("Component1".to_string(), 100.0)]
3375 .into_iter()
3376 .collect(),
3377 1,
3378 0,
3379 100,
3380 ))
3381 });
3382
3383 rpc_client
3384 .expect_get_traced_entry_points()
3385 .returning(|_| Ok(Page::new(HashMap::new(), 0, 0, 0)));
3386
3387 let (tx, rx) = channel(10);
3389 deltas_client
3390 .expect_subscribe()
3391 .return_once(move |_, _| {
3392 let old_messages = vec![
3394 BlockAggregatedChanges {
3395 extractor: "uniswap-v2".to_string(),
3396 chain: Chain::Ethereum,
3397 block: Block {
3398 hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000003"),
3399 number: 3,
3400 parent_hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000002"),
3401 chain: Chain::Ethereum,
3402 ts: chrono::DateTime::from_timestamp(1234567890, 0).unwrap().naive_utc(),
3403 },
3404 revert: false,
3405 ..Default::default()
3406 },
3407 BlockAggregatedChanges {
3408 extractor: "uniswap-v2".to_string(),
3409 chain: Chain::Ethereum,
3410 block: Block {
3411 hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000004"),
3412 number: 4,
3413 parent_hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000003"),
3414 chain: Chain::Ethereum,
3415 ts: chrono::DateTime::from_timestamp(1234567891, 0).unwrap().naive_utc(),
3416 },
3417 revert: false,
3418 ..Default::default()
3419 },
3420 BlockAggregatedChanges {
3421 extractor: "uniswap-v2".to_string(),
3422 chain: Chain::Ethereum,
3423 block: Block {
3424 hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000005"),
3425 number: 5,
3426 parent_hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000004"),
3427 chain: Chain::Ethereum,
3428 ts: chrono::DateTime::from_timestamp(1234567892, 0).unwrap().naive_utc(),
3429 },
3430 revert: false,
3431 ..Default::default()
3432 },
3433 BlockAggregatedChanges {
3435 extractor: "uniswap-v2".to_string(),
3436 chain: Chain::Ethereum,
3437 block: Block {
3438 hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000006"),
3439 number: 6,
3440 parent_hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000005"),
3441 chain: Chain::Ethereum,
3442 ts: chrono::DateTime::from_timestamp(1234567893, 0).unwrap().naive_utc(),
3443 },
3444 revert: false,
3445 ..Default::default()
3446 },
3447 ];
3448
3449 tokio::spawn(async move {
3450 for message in old_messages {
3451 let _ = tx.send(message).await;
3452 tokio::time::sleep(Duration::from_millis(10)).await;
3453 }
3454 });
3455
3456 Ok((Uuid::default(), rx))
3457 });
3458
3459 deltas_client
3460 .expect_unsubscribe()
3461 .return_once(|_| Ok(()));
3462
3463 let mut state_sync = ProtocolStateSynchronizer::new(
3464 ExtractorIdentity::new(Chain::Ethereum, "uniswap-v2"),
3465 true,
3466 ComponentFilter::with_tvl_range(0.0, 1000.0),
3467 1,
3468 Duration::from_secs(0),
3469 true,
3470 true,
3471 true,
3472 ArcRPCClient(Arc::new(rpc_client)),
3473 ArcDeltasClient(Arc::new(deltas_client)),
3474 10000_u64,
3475 );
3476
3477 state_sync
3479 .initialize()
3480 .await
3481 .expect("Init should succeed");
3482
3483 state_sync.last_synced_block = Some(BlockHeader {
3484 number: 5,
3485 hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000005"),
3486 parent_hash: Bytes::from(
3487 "0x0000000000000000000000000000000000000000000000000000000000000004",
3488 ),
3489 revert: false,
3490 timestamp: 1234567892,
3491 partial_block_index: None,
3492 });
3493
3494 let (mut block_tx, mut block_rx) = channel(10);
3495 let (end_tx, end_rx) = oneshot::channel::<()>();
3496
3497 let state_sync_handle = tokio::spawn(async move {
3499 state_sync
3500 .state_sync(&mut block_tx, end_rx)
3501 .await
3502 });
3503
3504 let result_msg = timeout(Duration::from_millis(500), block_rx.recv())
3506 .await
3507 .expect("Should receive message within timeout")
3508 .expect("Channel should be open")
3509 .expect("Should not be an error");
3510
3511 let _ = end_tx.send(());
3513
3514 let _ = state_sync_handle
3516 .await
3517 .expect("Task should not panic");
3518
3519 assert!(result_msg.deltas.is_some(), "Should contain deltas");
3521 if let Some(deltas) = &result_msg.deltas {
3522 assert_eq!(
3523 deltas.block.number, 6,
3524 "Should only process block 6, skipping earlier blocks"
3525 );
3526 assert_eq!(
3527 deltas.block.hash,
3528 Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000006")
3529 );
3530 }
3531
3532 match timeout(Duration::from_millis(50), block_rx.recv()).await {
3535 Err(_) => {
3536 }
3538 Ok(Some(Err(_))) => {
3539 }
3541 Ok(Some(Ok(_))) => {
3542 panic!("Should not receive additional messages - old blocks should be skipped");
3543 }
3544 Ok(None) => {
3545 }
3547 }
3548 }
3549
3550 fn make_block_changes(block_num: u64, partial_idx: Option<u32>) -> BlockAggregatedChanges {
3551 let hash = Bytes::from(vec![block_num as u8; 32]);
3553 let parent_hash = Bytes::from(vec![block_num.saturating_sub(1) as u8; 32]);
3554 BlockAggregatedChanges {
3555 extractor: "uniswap-v2".to_string(),
3556 chain: Chain::Ethereum,
3557 block: Block {
3558 number: block_num,
3559 hash,
3560 parent_hash,
3561 chain: Chain::Ethereum,
3562 ts: Default::default(),
3563 },
3564 revert: false,
3565 partial_block_index: partial_idx,
3566 ..Default::default()
3567 }
3568 }
3569
3570 #[test_log::test(tokio::test)]
3572 async fn test_partial_mode_accepts_full_block_as_first_message() {
3573 let (rpc_client, deltas_client, tx) = mock_clients_for_state_sync(None);
3574 let mut state_sync = with_mocked_clients(true, true, Some(rpc_client), Some(deltas_client))
3575 .with_partial_blocks(true);
3576 state_sync
3577 .initialize()
3578 .await
3579 .expect("Init failed");
3580
3581 let (handle, mut block_rx) = state_sync.start().await;
3582 let (jh, close_tx) = handle.split();
3583
3584 tx.send(make_block_changes(1, None))
3586 .await
3587 .unwrap();
3588
3589 let msg = timeout(Duration::from_millis(100), block_rx.recv())
3591 .await
3592 .expect("Should receive message")
3593 .expect("Channel open")
3594 .expect("No error");
3595
3596 assert_eq!(msg.header.number, 1, "Should use block 1 (full block)");
3597 assert_eq!(msg.header.partial_block_index, None, "Should be a full block");
3598
3599 let _ = close_tx.send(());
3600 jh.await.expect("Task should not panic");
3601 }
3602
3603 #[test_log::test(tokio::test)]
3605 async fn test_partial_mode_detects_block_number_increase() {
3606 let (rpc_client, deltas_client, tx) = mock_clients_for_state_sync(None);
3607 let mut state_sync = with_mocked_clients(true, true, Some(rpc_client), Some(deltas_client))
3608 .with_partial_blocks(true);
3609 state_sync
3610 .initialize()
3611 .await
3612 .expect("Init failed");
3613
3614 let (handle, mut block_rx) = state_sync.start().await;
3615 let (jh, close_tx) = handle.split();
3616
3617 tx.send(make_block_changes(1, Some(0)))
3619 .await
3620 .unwrap();
3621 tx.send(make_block_changes(1, Some(3)))
3622 .await
3623 .unwrap();
3624
3625 match timeout(Duration::from_millis(50), block_rx.recv()).await {
3627 Err(_) => { }
3628 Ok(_) => panic!("Should not receive message while waiting for new block"),
3629 }
3630
3631 tx.send(make_block_changes(2, Some(5)))
3634 .await
3635 .unwrap();
3636
3637 let msg = timeout(Duration::from_millis(100), block_rx.recv())
3639 .await
3640 .expect("Should receive message")
3641 .expect("Channel open")
3642 .expect("No error");
3643
3644 assert_eq!(msg.header.number, 2, "Should use block 2 (block number increased)");
3645 assert_eq!(msg.header.partial_block_index, Some(5));
3646
3647 let _ = close_tx.send(());
3648 jh.await.expect("Task should not panic");
3649 }
3650
3651 #[test_log::test(tokio::test)]
3653 async fn test_partial_mode_skips_already_synced_blocks() {
3654 let (rpc_client, deltas_client, tx) = mock_clients_for_state_sync(None);
3655 let mut state_sync = with_mocked_clients(true, true, Some(rpc_client), Some(deltas_client))
3656 .with_partial_blocks(true);
3657 state_sync
3658 .initialize()
3659 .await
3660 .expect("Init failed");
3661
3662 state_sync.last_synced_block = Some(BlockHeader {
3664 number: 5,
3665 hash: Bytes::from("0x05"),
3666 parent_hash: Bytes::from("0x04"),
3667 revert: false,
3668 timestamp: 0,
3669 partial_block_index: None,
3670 });
3671
3672 let (handle, mut block_rx) = state_sync.start().await;
3673 let (jh, close_tx) = handle.split();
3674
3675 tx.send(make_block_changes(3, Some(2)))
3677 .await
3678 .unwrap();
3679
3680 tx.send(make_block_changes(4, Some(0)))
3682 .await
3683 .unwrap();
3684
3685 match timeout(Duration::from_millis(50), block_rx.recv()).await {
3687 Err(_) => { }
3688 Ok(_) => panic!("Should skip block 4 because it's already synced"),
3689 }
3690
3691 tx.send(make_block_changes(5, Some(3)))
3694 .await
3695 .unwrap();
3696 tx.send(make_block_changes(6, Some(0)))
3698 .await
3699 .unwrap();
3700
3701 let msg = timeout(Duration::from_millis(100), block_rx.recv())
3702 .await
3703 .expect("Should receive message")
3704 .expect("Channel open")
3705 .expect("No error");
3706
3707 assert_eq!(msg.header.number, 6, "Should use block 6 (after last synced)");
3708
3709 let _ = close_tx.send(());
3710 jh.await.expect("Task should not panic");
3711 }
3712
3713 #[test_log::test(tokio::test)]
3714 async fn test_get_snapshots_skips_entrypoints_when_not_dci() {
3715 let header = BlockHeader::default();
3716 let mut rpc = make_mock_client();
3717 let component = ProtocolComponent { id: "Component1".to_string(), ..Default::default() };
3718
3719 let component_clone = component.clone();
3720 rpc.expect_get_snapshots()
3721 .returning(move |_request, _chunk_size, _concurrency| {
3722 Ok(Snapshot {
3723 states: [(
3724 "Component1".to_string(),
3725 ComponentWithState {
3726 state: ProtocolComponentState::new(
3727 "Component1",
3728 Default::default(),
3729 Default::default(),
3730 ),
3731 component: component_clone.clone(),
3732 entrypoints: vec![],
3733 component_tvl: None,
3734 },
3735 )]
3736 .into_iter()
3737 .collect(),
3738 vm_storage: HashMap::new(),
3739 })
3740 });
3741
3742 rpc.expect_get_traced_entry_points()
3744 .never();
3745
3746 let mut state_sync = with_mocked_clients(true, false, Some(rpc), None);
3747 state_sync
3749 .component_tracker
3750 .components
3751 .insert("Component1".to_string(), component);
3752
3753 let components_arg = ["Component1".to_string()];
3754 let req_ids: Vec<String> = components_arg.to_vec();
3755 let components: HashMap<_, _> = state_sync
3756 .component_tracker
3757 .components
3758 .iter()
3759 .filter(|(id, _)| req_ids.contains(id))
3760 .map(|(k, v)| (k.clone(), v.clone()))
3761 .collect();
3762 let contract_ids: HashSet<Bytes> = state_sync
3763 .component_tracker
3764 .get_contracts_by_component(&req_ids)
3765 .into_iter()
3766 .collect();
3767 let params = FetchSnapshotParams {
3768 chain: Chain::Ethereum,
3769 protocol_system: "uniswap-v2".to_string(),
3770 block_number: header.number,
3771 uses_dci: false,
3772 retrieve_balances: true,
3773 include_tvl: false,
3774 };
3775 let (snapshot, _, _) =
3776 fetch_snapshot(&state_sync.rpc_client, components, contract_ids, ¶ms)
3777 .await
3778 .expect("Retrieving snapshot failed");
3779
3780 assert!(snapshot
3781 .states
3782 .contains_key("Component1"));
3783 }
3784
3785 #[test_log::test(tokio::test)]
3786 async fn test_get_snapshots_fetches_entrypoints_when_dci() {
3787 let header = BlockHeader::default();
3788 let mut rpc = make_mock_client();
3789 let component = ProtocolComponent { id: "Component1".to_string(), ..Default::default() };
3790
3791 let component_clone = component.clone();
3792 rpc.expect_get_snapshots()
3793 .returning(move |_request, _chunk_size, _concurrency| {
3794 Ok(Snapshot {
3795 states: [(
3796 "Component1".to_string(),
3797 ComponentWithState {
3798 state: ProtocolComponentState::new(
3799 "Component1",
3800 Default::default(),
3801 Default::default(),
3802 ),
3803 component: component_clone.clone(),
3804 entrypoints: vec![],
3805 component_tvl: None,
3806 },
3807 )]
3808 .into_iter()
3809 .collect(),
3810 vm_storage: HashMap::new(),
3811 })
3812 });
3813
3814 rpc.expect_get_traced_entry_points()
3816 .times(1)
3817 .returning(|_| Ok(Page::new(HashMap::new(), 0, 0, 0)));
3818
3819 let mut state_sync = with_mocked_clients(true, false, Some(rpc), None).with_dci(true);
3820 state_sync
3821 .component_tracker
3822 .components
3823 .insert("Component1".to_string(), component);
3824
3825 let components_arg = ["Component1".to_string()];
3826 let req_ids: Vec<String> = components_arg.to_vec();
3827 let components: HashMap<_, _> = state_sync
3828 .component_tracker
3829 .components
3830 .iter()
3831 .filter(|(id, _)| req_ids.contains(id))
3832 .map(|(k, v)| (k.clone(), v.clone()))
3833 .collect();
3834 let contract_ids: HashSet<Bytes> = state_sync
3835 .component_tracker
3836 .get_contracts_by_component(&req_ids)
3837 .into_iter()
3838 .collect();
3839 let params = FetchSnapshotParams {
3840 chain: Chain::Ethereum,
3841 protocol_system: "uniswap-v2".to_string(),
3842 block_number: header.number,
3843 uses_dci: true,
3844 retrieve_balances: true,
3845 include_tvl: false,
3846 };
3847 let (snapshot, _, _) =
3848 fetch_snapshot(&state_sync.rpc_client, components, contract_ids, ¶ms)
3849 .await
3850 .expect("Retrieving snapshot failed");
3851
3852 assert!(snapshot
3853 .states
3854 .contains_key("Component1"));
3855 }
3856
3857 #[test_log::test(tokio::test)]
3868 async fn test_partial_mode_defers_brand_new_component_snapshot_to_next_block() {
3869 use std::time::Duration;
3870
3871 use tokio::{sync::mpsc::channel, time::timeout};
3872
3873 let bg_done = Arc::new(tokio::sync::Notify::new());
3874 let mut rpc_client = make_mock_client();
3875 rpc_client
3877 .expect_get_protocol_components()
3878 .withf(|params: &crate::rpc::ProtocolComponentsParams| {
3879 params
3880 .component_ids()
3881 .is_some_and(|ids| ids.contains(&"BrandNew".to_string()))
3882 })
3883 .returning(|_| {
3884 Ok(Page::new(
3885 vec![
3886 ProtocolComponent { id: "BrandNew".to_string(), ..Default::default() },
3887 ProtocolComponent { id: "Preexisting".to_string(), ..Default::default() },
3888 ],
3889 2,
3890 0,
3891 100,
3892 ))
3893 });
3894 rpc_client
3896 .expect_get_protocol_components()
3897 .returning(|_| {
3898 Ok(Page::new(
3899 vec![
3900 ProtocolComponent { id: "Component1".to_string(), ..Default::default() },
3901 ProtocolComponent { id: "Component2".to_string(), ..Default::default() },
3902 ],
3903 2,
3904 0,
3905 100,
3906 ))
3907 });
3908 let bg_done_clone = bg_done.clone();
3910 rpc_client
3911 .expect_get_snapshots()
3912 .withf(
3913 |request: &SnapshotParameters,
3914 _chunk_size: &Option<usize>,
3915 _concurrency: &usize| {
3916 request.block_number == 2 &&
3917 (request
3918 .components
3919 .contains_key("BrandNew") ||
3920 request
3921 .components
3922 .contains_key("Preexisting"))
3923 },
3924 )
3925 .returning(move |_request, _chunk_size, _concurrency| {
3926 let snap = Ok(Snapshot {
3927 states: [
3928 (
3929 "BrandNew".to_string(),
3930 ComponentWithState {
3931 state: ProtocolComponentState::new(
3932 "BrandNew",
3933 Default::default(),
3934 Default::default(),
3935 ),
3936 component: ProtocolComponent {
3937 id: "BrandNew".to_string(),
3938 ..Default::default()
3939 },
3940 component_tvl: Some(100.0),
3941 entrypoints: vec![],
3942 },
3943 ),
3944 (
3945 "Preexisting".to_string(),
3946 ComponentWithState {
3947 state: ProtocolComponentState::new(
3948 "Preexisting",
3949 Default::default(),
3950 Default::default(),
3951 ),
3952 component: ProtocolComponent {
3953 id: "Preexisting".to_string(),
3954 ..Default::default()
3955 },
3956 component_tvl: Some(75.0),
3957 entrypoints: vec![],
3958 },
3959 ),
3960 ]
3961 .into_iter()
3962 .collect(),
3963 vm_storage: HashMap::new(),
3964 });
3965 bg_done_clone.notify_one();
3966 snap
3967 });
3968 rpc_client
3970 .expect_get_snapshots()
3971 .returning(|_request, _chunk_size, _concurrency| {
3972 Ok(Snapshot {
3973 states: [
3974 (
3975 "Component1".to_string(),
3976 ComponentWithState {
3977 state: ProtocolComponentState::new(
3978 "Component1",
3979 Default::default(),
3980 Default::default(),
3981 ),
3982 component: ProtocolComponent {
3983 id: "Component1".to_string(),
3984 ..Default::default()
3985 },
3986 component_tvl: Some(100.0),
3987 entrypoints: vec![],
3988 },
3989 ),
3990 (
3991 "Component2".to_string(),
3992 ComponentWithState {
3993 state: ProtocolComponentState::new(
3994 "Component2",
3995 Default::default(),
3996 Default::default(),
3997 ),
3998 component: ProtocolComponent {
3999 id: "Component2".to_string(),
4000 ..Default::default()
4001 },
4002 component_tvl: Some(0.0),
4003 entrypoints: vec![],
4004 },
4005 ),
4006 ]
4007 .into_iter()
4008 .collect(),
4009 vm_storage: HashMap::new(),
4010 })
4011 });
4012 rpc_client
4013 .expect_get_traced_entry_points()
4014 .returning(|_| Ok(Page::new(HashMap::new(), 0, 0, 0)));
4015
4016 let mut deltas_client = MockDeltasClient::new();
4017 let (tx, rx) = channel(4);
4018 deltas_client
4019 .expect_subscribe()
4020 .return_once(move |_, _| Ok((Uuid::default(), rx)));
4021 deltas_client
4022 .expect_unsubscribe()
4023 .return_once(|_| Ok(()));
4024
4025 let mut state_sync = with_mocked_clients(true, true, Some(rpc_client), Some(deltas_client))
4026 .with_partial_blocks(true);
4027 state_sync
4028 .initialize()
4029 .await
4030 .expect("Init failed");
4031
4032 let (handle, mut block_rx) = state_sync.start().await;
4033 let (jh, close_tx) = handle.split();
4034
4035 tx.send(make_block_changes(1, None))
4037 .await
4038 .unwrap();
4039 let _msg1 = timeout(Duration::from_millis(200), block_rx.recv())
4040 .await
4041 .expect("Should receive initial + block 1")
4042 .expect("Channel open")
4043 .expect("No error");
4044
4045 let mut block2 = make_block_changes(2, Some(2));
4048 block2.new_protocol_components = HashMap::from([(
4049 "BrandNew".to_string(),
4050 ProtocolComponent { id: "BrandNew".to_string(), ..Default::default() },
4051 )]);
4052 block2.component_tvl =
4053 HashMap::from([("BrandNew".to_string(), 100.0), ("Preexisting".to_string(), 75.0)]);
4054 tx.send(block2).await.unwrap();
4055 let msg2 = timeout(Duration::from_millis(200), block_rx.recv())
4056 .await
4057 .expect("Should receive block 2")
4058 .expect("Channel open")
4059 .expect("No error");
4060
4061 assert!(
4062 !msg2
4063 .snapshots
4064 .states
4065 .contains_key("Preexisting"),
4066 "Preexisting should still be deferred in block 2, not yet snapshotted; got: {:?}",
4067 msg2.snapshots
4068 .states
4069 .keys()
4070 .collect::<Vec<_>>()
4071 );
4072 assert!(
4073 !msg2
4074 .snapshots
4075 .states
4076 .contains_key("BrandNew"),
4077 "BrandNew should still be deferred in block 2, not yet snapshotted"
4078 );
4079
4080 tx.send(make_block_changes(3, Some(1)))
4083 .await
4084 .unwrap();
4085 let msg3 = timeout(Duration::from_millis(200), block_rx.recv())
4086 .await
4087 .expect("Should receive block 3")
4088 .expect("Channel open")
4089 .expect("No error");
4090
4091 assert_eq!(msg3.header.number, 3);
4092 assert_eq!(msg3.header.partial_block_index, Some(1));
4093 assert!(
4094 !msg3
4095 .snapshots
4096 .states
4097 .contains_key("BrandNew"),
4098 "BrandNew task just fired; snapshot not yet available in msg3"
4099 );
4100 assert!(
4101 !msg3
4102 .snapshots
4103 .states
4104 .contains_key("Preexisting"),
4105 "Preexisting task just fired; snapshot not yet available in msg3"
4106 );
4107
4108 bg_done.notified().await;
4110
4111 tx.send(make_block_changes(4, Some(0)))
4113 .await
4114 .unwrap();
4115 let msg4 = timeout(Duration::from_millis(200), block_rx.recv())
4116 .await
4117 .expect("Should receive block 4")
4118 .expect("Channel open")
4119 .expect("No error");
4120
4121 assert_eq!(msg4.header.number, 4);
4122 assert_eq!(msg4.header.partial_block_index, Some(0));
4123 assert!(
4124 msg4.snapshots
4125 .states
4126 .contains_key("BrandNew"),
4127 "BrandNew snapshot should be in msg4 after background task drains; got keys: {:?}",
4128 msg4.snapshots
4129 .states
4130 .keys()
4131 .collect::<Vec<_>>()
4132 );
4133 assert!(
4134 msg4.snapshots
4135 .states
4136 .contains_key("Preexisting"),
4137 "Preexisting snapshot should be in msg4 after background task drains; got keys: {:?}",
4138 msg4.snapshots
4139 .states
4140 .keys()
4141 .collect::<Vec<_>>()
4142 );
4143
4144 let _ = close_tx.send(());
4145 jh.await.expect("Task should not panic");
4146 }
4147
4148 #[test]
4152 fn test_apply_deltas_to_snapshot() {
4153 use tycho_common::models::{
4154 contract::{Account, AccountDelta},
4155 protocol::{ComponentBalance, ProtocolComponentStateDelta},
4156 ChangeType,
4157 };
4158
4159 let contract_addr = Bytes::from("0xc0ffee");
4160 let token_addr = Bytes::from("0xdeadbeef");
4161
4162 let mut snapshot = Snapshot {
4165 states: [(
4166 "comp1".to_string(),
4167 ComponentWithState {
4168 state: ProtocolComponentState::new(
4169 "comp1",
4170 [
4171 ("keep".to_string(), Bytes::from("0x01")),
4172 ("delete_me".to_string(), Bytes::from("0x02")),
4173 ]
4174 .into_iter()
4175 .collect(),
4176 [(token_addr.clone(), Bytes::from("0x64"))]
4177 .into_iter()
4178 .collect(),
4179 ),
4180 component: ProtocolComponent::default(),
4181 component_tvl: None,
4182 entrypoints: vec![],
4183 },
4184 )]
4185 .into_iter()
4186 .collect(),
4187 vm_storage: [(
4188 contract_addr.clone(),
4189 Account {
4190 chain: Chain::Ethereum,
4191 address: contract_addr.clone(),
4192 title: String::new(),
4193 slots: [(Bytes::from("0x01"), Bytes::from("0xaa"))]
4194 .into_iter()
4195 .collect(),
4196 native_balance: Bytes::from("0x10"),
4197 token_balances: HashMap::new(),
4198 code: Bytes::from("0x0a0b"),
4199 code_hash: Default::default(),
4200 balance_modify_tx: Default::default(),
4201 code_modify_tx: Default::default(),
4202 creation_tx: None,
4203 },
4204 )]
4205 .into_iter()
4206 .collect(),
4207 };
4208
4209 let skipped_delta = BlockAggregatedChanges {
4212 block: Block { number: 5, ..Default::default() },
4213 state_deltas: [(
4214 "comp1".to_string(),
4215 ProtocolComponentStateDelta::new(
4216 "comp1",
4217 [("keep".to_string(), Bytes::from("0xff"))]
4218 .into_iter()
4219 .collect(),
4220 HashSet::new(),
4221 ),
4222 )]
4223 .into_iter()
4224 .collect(),
4225 ..Default::default()
4226 };
4227 let applied_delta = BlockAggregatedChanges {
4228 block: Block { number: 6, ..Default::default() },
4229 state_deltas: [(
4230 "comp1".to_string(),
4231 ProtocolComponentStateDelta::new(
4232 "comp1",
4233 [("keep".to_string(), Bytes::from("0x99"))]
4234 .into_iter()
4235 .collect(),
4236 ["delete_me".to_string()]
4237 .into_iter()
4238 .collect(),
4239 ),
4240 )]
4241 .into_iter()
4242 .collect(),
4243 component_balances: [(
4244 "comp1".to_string(),
4245 [(
4246 token_addr.clone(),
4247 ComponentBalance::new(
4248 token_addr.clone(),
4249 Bytes::from("0xc8"),
4250 200.0,
4251 Default::default(),
4252 "comp1",
4253 ),
4254 )]
4255 .into_iter()
4256 .collect(),
4257 )]
4258 .into_iter()
4259 .collect(),
4260 account_deltas: [(
4261 contract_addr.clone(),
4262 AccountDelta::new(
4263 Chain::Ethereum,
4264 contract_addr.clone(),
4265 [
4266 (Bytes::from("0x01"), Some(Bytes::from("0xbb"))),
4267 (Bytes::from("0x02"), Some(Bytes::from("0xcc"))),
4268 ]
4269 .into_iter()
4270 .collect(),
4271 Some(Bytes::from("0x20")),
4272 Some(Bytes::from("0x0c0d")),
4273 ChangeType::Update,
4274 ),
4275 )]
4276 .into_iter()
4277 .collect(),
4278 ..Default::default()
4279 };
4280
4281 let mut sync = with_mocked_clients(true, false, None, None);
4282 sync.buffered_deltas = vec![skipped_delta, applied_delta];
4283
4284 let contract_ids: HashSet<Bytes> = [contract_addr.clone()]
4285 .into_iter()
4286 .collect();
4287 sync.apply_deltas_to_snapshot(&mut snapshot, 5, &contract_ids);
4288
4289 let comp = &snapshot.states["comp1"].state;
4290
4291 assert_eq!(comp.attributes["keep"], Bytes::from("0x99"));
4293 assert!(!comp
4295 .attributes
4296 .contains_key("delete_me"));
4297 assert_eq!(comp.balances[&token_addr], Bytes::from("0xc8"));
4299
4300 let account = &snapshot.vm_storage[&contract_addr];
4301 assert_eq!(account.slots[&Bytes::from("0x01")], Bytes::from("0xbb"));
4303 assert_eq!(account.slots[&Bytes::from("0x02")], Bytes::from("0xcc"));
4304 assert_eq!(account.native_balance, Bytes::from("0x20"));
4306 assert_eq!(account.code, Bytes::from("0x0c0d"));
4308 }
4309}