1use std::{
2 collections::{HashMap, HashSet},
3 time::Duration,
4};
5
6use async_trait::async_trait;
7use thiserror::Error;
8use tokio::{
9 select,
10 sync::{
11 mpsc::{channel, error::SendError, Receiver, Sender},
12 oneshot,
13 },
14 task::JoinHandle,
15 time::{sleep, timeout},
16};
17use tracing::{debug, error, info, instrument, warn};
18use tycho_common::{
19 models::{
20 blockchain::{
21 BlockAggregatedChanges, DCIUpdate, EntryPointWithTracingParams, TracingResult,
22 },
23 contract::Account,
24 protocol::{ProtocolComponent, ProtocolComponentState},
25 Chain, ExtractorIdentity,
26 },
27 Bytes,
28};
29
30use crate::{
31 deltas::{DeltasClient, SubscriptionOptions},
32 feed::{
33 component_tracker::{ComponentFilter, ComponentTracker},
34 BlockHeader, HeaderLike,
35 },
36 rpc::{
37 RPCClient, RPCError, SnapshotParameters, TracedEntryPointsPaginatedParams,
38 RPC_CLIENT_CONCURRENCY,
39 },
40 DeltasError,
41};
42
43#[derive(Error, Debug)]
44pub enum SynchronizerError {
45 #[error("RPC error: {0}")]
47 RPCError(#[from] RPCError),
48
49 #[error("{0}")]
51 ChannelError(String),
52
53 #[error("Timeout error: {0}")]
55 Timeout(String),
56
57 #[error("Failed to close synchronizer: {0}")]
59 CloseError(String),
60
61 #[error("Connection error: {0}")]
63 ConnectionError(String),
64
65 #[error("Connection closed")]
67 ConnectionClosed,
68
69 #[error("Internal error: {0}")]
71 Internal(String),
72}
73
74pub type SyncResult<T> = Result<T, SynchronizerError>;
75
76impl SynchronizerError {
77 pub fn is_transient(&self) -> bool {
85 match self {
86 SynchronizerError::RPCError(e) => matches!(
87 e,
88 RPCError::HttpClient(_, _) |
89 RPCError::RateLimited(_) |
90 RPCError::ServerUnreachable(_) |
91 RPCError::StaleBlock(_)
92 ),
93 SynchronizerError::Timeout(_) |
94 SynchronizerError::ConnectionError(_) |
95 SynchronizerError::ConnectionClosed => true,
96 _ => false,
97 }
98 }
99}
100
101impl<T> From<SendError<T>> for SynchronizerError {
102 fn from(err: SendError<T>) -> Self {
103 SynchronizerError::ChannelError(format!("Failed to send message: {err}"))
104 }
105}
106
107impl From<DeltasError> for SynchronizerError {
108 fn from(err: DeltasError) -> Self {
109 match err {
110 DeltasError::NotConnected => SynchronizerError::ConnectionClosed,
111 _ => SynchronizerError::ConnectionError(err.to_string()),
112 }
113 }
114}
115
116pub struct ProtocolStateSynchronizer<R: RPCClient, D: DeltasClient> {
117 extractor_id: ExtractorIdentity,
118 retrieve_balances: bool,
119 rpc_client: R,
120 deltas_client: D,
121 max_retries: u64,
122 retry_cooldown: Duration,
123 include_snapshots: bool,
124 component_tracker: ComponentTracker<R>,
125 last_synced_block: Option<BlockHeader>,
126 timeout: u64,
127 include_tvl: bool,
128 compression: bool,
129 partial_blocks: bool,
130 uses_dci: bool,
131 snapshot_tasks: Vec<SnapshotTask>,
135 buffered_deltas: Vec<BlockAggregatedChanges>,
139 snapshot_queue: HashMap<String, SnapshotStatus>,
143}
144
145#[derive(Clone, PartialEq, Debug)]
146pub struct ComponentWithState {
147 pub state: ProtocolComponentState,
148 pub component: ProtocolComponent,
149 pub component_tvl: Option<f64>,
150 pub entrypoints: Vec<(EntryPointWithTracingParams, TracingResult)>,
151}
152
153#[derive(Clone, PartialEq, Debug, Default)]
154pub struct Snapshot {
155 pub states: HashMap<String, ComponentWithState>,
156 pub vm_storage: HashMap<Bytes, Account>,
157}
158
159impl Snapshot {
160 fn extend(&mut self, other: Snapshot) {
161 self.states.extend(other.states);
162 self.vm_storage.extend(other.vm_storage);
163 }
164
165 pub fn get_states(&self) -> &HashMap<String, ComponentWithState> {
166 &self.states
167 }
168
169 pub fn get_vm_storage(&self) -> &HashMap<Bytes, Account> {
170 &self.vm_storage
171 }
172}
173
174#[derive(Clone, PartialEq, Debug, Default)]
175pub struct StateSyncMessage<H>
176where
177 H: HeaderLike,
178{
179 pub header: H,
181 pub snapshots: Snapshot,
183 pub deltas: Option<BlockAggregatedChanges>,
187 pub removed_components: HashMap<String, ProtocolComponent>,
189}
190
191impl<H> StateSyncMessage<H>
192where
193 H: HeaderLike,
194{
195 pub fn merge(mut self, other: Self) -> Self {
196 self.removed_components
198 .retain(|k, _| !other.snapshots.states.contains_key(k));
199 self.snapshots
200 .states
201 .retain(|k, _| !other.removed_components.contains_key(k));
202
203 self.snapshots.extend(other.snapshots);
204 let deltas = match (self.deltas, other.deltas) {
205 (Some(l), Some(r)) => Some(l.merge(r)),
206 (None, Some(r)) => Some(r),
207 (Some(l), None) => Some(l),
208 (None, None) => None,
209 };
210 self.removed_components
211 .extend(other.removed_components);
212 Self {
213 header: other.header,
214 snapshots: self.snapshots,
215 deltas,
216 removed_components: self.removed_components,
217 }
218 }
219}
220
221#[derive(Debug, Clone, PartialEq)]
223enum SnapshotStatus {
224 Deferred,
226 InFlight,
228 RetryNext,
230 Blacklisted,
232}
233
234struct SnapshotFetchResult {
235 components: HashMap<String, ProtocolComponent>,
236 contract_ids: HashSet<Bytes>,
237 dci_update: DCIUpdate,
238 snapshot: Snapshot,
239 snapshot_block: u64,
240}
241
242struct SnapshotTask {
243 component_ids: Vec<String>,
244 snapshot_block: u64,
245 receiver: oneshot::Receiver<Result<SnapshotFetchResult, SynchronizerError>>,
246}
247
248pub struct SynchronizerTaskHandle {
253 join_handle: JoinHandle<()>,
254 close_tx: oneshot::Sender<()>,
255}
256
257impl SynchronizerTaskHandle {
266 pub fn new(join_handle: JoinHandle<()>, close_tx: oneshot::Sender<()>) -> Self {
267 Self { join_handle, close_tx }
268 }
269
270 pub fn split(self) -> (JoinHandle<()>, oneshot::Sender<()>) {
276 (self.join_handle, self.close_tx)
277 }
278}
279
280#[async_trait]
281pub trait StateSynchronizer: Send + Sync + 'static {
282 async fn initialize(&mut self) -> SyncResult<()>;
283 async fn start(
286 mut self,
287 ) -> (SynchronizerTaskHandle, Receiver<SyncResult<StateSyncMessage<BlockHeader>>>);
288}
289
290struct FetchSnapshotParams {
291 chain: Chain,
292 protocol_system: String,
293 block_number: u64,
294 uses_dci: bool,
295 retrieve_balances: bool,
296 include_tvl: bool,
297}
298
299async fn fetch_snapshot<R: RPCClient>(
305 rpc_client: &R,
306 components: HashMap<String, ProtocolComponent>,
307 mut contract_ids: HashSet<Bytes>,
308 params: &FetchSnapshotParams,
309) -> Result<(Snapshot, DCIUpdate, HashSet<Bytes>), SynchronizerError> {
310 if components.is_empty() {
311 return Ok((Snapshot::default(), DCIUpdate::default(), contract_ids));
312 }
313
314 let component_ids: Vec<String> = components.keys().cloned().collect();
315
316 let (dci_update, entrypoints_result) = if params.uses_dci {
317 let result = rpc_client
318 .get_traced_entry_points_paginated(TracedEntryPointsPaginatedParams::new(
319 params.chain,
320 ¶ms.protocol_system,
321 component_ids.clone(),
322 RPC_CLIENT_CONCURRENCY,
323 ))
324 .await?;
325 let dci_contracts: HashSet<Bytes> = result
326 .values()
327 .flat_map(|traces| {
328 traces
329 .iter()
330 .flat_map(|(_, tr)| tr.accessed_slots.keys().cloned())
331 })
332 .collect();
333 contract_ids.extend(dci_contracts);
334 let eps = result.clone();
335 let dci: DCIUpdate = result.into();
336 (dci, eps)
337 } else {
338 (DCIUpdate::default(), HashMap::new())
339 };
340
341 let contract_ids_vec: Vec<Bytes> = contract_ids.iter().cloned().collect();
342 let request = SnapshotParameters::new(
343 params.chain,
344 ¶ms.protocol_system,
345 &components,
346 &contract_ids_vec,
347 params.block_number,
348 )
349 .entrypoints(&entrypoints_result)
350 .include_balances(params.retrieve_balances)
351 .include_tvl(params.include_tvl);
352
353 let snapshot = rpc_client
354 .get_snapshots(&request, None, RPC_CLIENT_CONCURRENCY)
355 .await?;
356
357 Ok((snapshot, dci_update, contract_ids))
358}
359
360async fn fetch_snapshot_background<R: RPCClient>(
363 rpc_client: R,
364 component_ids: Vec<String>,
365 params: FetchSnapshotParams,
366) -> Result<SnapshotFetchResult, SynchronizerError> {
367 if component_ids.is_empty() {
368 return Ok(SnapshotFetchResult {
369 components: HashMap::new(),
370 contract_ids: HashSet::new(),
371 dci_update: DCIUpdate::default(),
372 snapshot: Snapshot::default(),
373 snapshot_block: params.block_number,
374 });
375 }
376
377 let request = crate::rpc::ProtocolComponentsParams::new(params.chain, ¶ms.protocol_system)
378 .with_component_ids(component_ids);
379 let components: HashMap<String, ProtocolComponent> = rpc_client
380 .get_protocol_components(request)
381 .await?
382 .into_data()
383 .into_iter()
384 .map(|pc| (pc.id.clone(), pc))
385 .collect();
386
387 let contract_ids: HashSet<Bytes> = components
388 .values()
389 .flat_map(|c| c.contract_addresses.iter().cloned())
390 .collect();
391
392 let snapshot_block = params.block_number;
393 let (snapshot, dci_update, contract_ids) =
394 fetch_snapshot(&rpc_client, components.clone(), contract_ids, ¶ms).await?;
395
396 Ok(SnapshotFetchResult { components, contract_ids, dci_update, snapshot, snapshot_block })
397}
398
399impl<R, D> ProtocolStateSynchronizer<R, D>
400where
401 R: RPCClient + Clone + Send + Sync + 'static,
404 D: DeltasClient + Clone + Send + Sync + 'static,
405{
406 #[allow(clippy::too_many_arguments)]
408 pub fn new(
409 extractor_id: ExtractorIdentity,
410 retrieve_balances: bool,
411 component_filter: ComponentFilter,
412 max_retries: u64,
413 retry_cooldown: Duration,
414 include_snapshots: bool,
415 include_tvl: bool,
416 compression: bool,
417 rpc_client: R,
418 deltas_client: D,
419 timeout: u64,
420 ) -> Self {
421 Self {
422 extractor_id: extractor_id.clone(),
423 retrieve_balances,
424 rpc_client: rpc_client.clone(),
425 include_snapshots,
426 deltas_client,
427 component_tracker: ComponentTracker::new(
428 extractor_id.chain,
429 extractor_id.name.as_str(),
430 component_filter,
431 rpc_client,
432 ),
433 max_retries,
434 retry_cooldown,
435 last_synced_block: None,
436 timeout,
437 include_tvl,
438 compression,
439 partial_blocks: false,
440 uses_dci: false,
441 snapshot_tasks: Vec::new(),
442 buffered_deltas: Vec::new(),
443 snapshot_queue: HashMap::new(),
444 }
445 }
446
447 pub fn with_dci(mut self, uses_dci: bool) -> Self {
450 self.uses_dci = uses_dci;
451 self
452 }
453
454 pub fn with_partial_blocks(mut self, partial_blocks: bool) -> Self {
456 self.partial_blocks = partial_blocks;
457 self
458 }
459
460 #[instrument(skip(self, block_tx, end_rx), fields(extractor_id = %self.extractor_id))]
473 async fn state_sync(
474 &mut self,
475 block_tx: &mut Sender<SyncResult<StateSyncMessage<BlockHeader>>>,
476 mut end_rx: oneshot::Receiver<()>,
477 ) -> Result<(), (SynchronizerError, Option<oneshot::Receiver<()>>)> {
478 let subscription_options = SubscriptionOptions::new()
480 .with_state(self.include_snapshots)
481 .with_compression(self.compression)
482 .with_partial_blocks(self.partial_blocks);
483 let (subscription_id, mut msg_rx) = match self
484 .deltas_client
485 .subscribe(self.extractor_id.clone(), subscription_options)
486 .await
487 {
488 Ok(result) => result,
489 Err(e) => return Err((e.into(), Some(end_rx))),
490 };
491
492 let result = async {
493 info!("Waiting for deltas...");
494 let mut last_block_number: Option<u64> = None;
496
497 const MAX_STALE_RETRIES: u32 = 5;
503 let mut stale_retries: u32 = 0;
504 let (msg, header) = 'init: loop {
505 let mut warned_waiting_for_new_block = false;
506 let mut warned_skipping_synced = false;
507 let mut first_msg = loop {
508 let msg = select! {
509 deltas_result = timeout(Duration::from_secs(self.timeout), msg_rx.recv()) => {
510 deltas_result
511 .map_err(|_| {
512 SynchronizerError::Timeout(format!(
513 "First deltas took longer than {t}s to arrive",
514 t = self.timeout
515 ))
516 })?
517 .ok_or_else(|| {
518 SynchronizerError::ConnectionError(
519 "Deltas channel closed before first message".to_string(),
520 )
521 })?
522 },
523 _ = &mut end_rx => {
524 info!("Received close signal while waiting for first deltas");
525 return Ok(());
526 }
527 };
528
529 let incoming: BlockHeader = (&msg).into();
530
531 let is_new_block_candidate = if self.partial_blocks {
535 match msg.partial_block_index {
536 None => {
537 last_block_number = Some(incoming.number);
539 true
540 }
541 Some(current_partial_idx) => {
542 let is_new_block = last_block_number
543 .map(|prev_block| incoming.number > prev_block)
544 .unwrap_or(false);
545
546 if !warned_waiting_for_new_block {
547 info!(
548 extractor=%self.extractor_id,
549 block=incoming.number,
550 partial_idx=current_partial_idx,
551 "Syncing. Waiting for new block to start"
552 );
553 warned_waiting_for_new_block = true;
554 }
555 last_block_number = Some(incoming.number);
556 is_new_block
557 }
558 }
559 } else {
560 true };
562
563 if !is_new_block_candidate {
564 continue;
565 }
566
567 if let Some(current) = &self.last_synced_block {
569 if current.number >= incoming.number && !self.is_next_expected(&incoming) {
570 if !warned_skipping_synced {
571 info!(extractor=%self.extractor_id, from=incoming.number, to=current.number, "Syncing. Skipping already synced block");
572 warned_skipping_synced = true;
573 }
574 continue;
575 }
576 }
577 break msg;
578 };
579
580 self.filter_deltas(&mut first_msg);
581
582 info!(height = first_msg.get_block().number, "First deltas received");
584 let header: BlockHeader = (&first_msg).into();
585 let deltas_msg = StateSyncMessage {
586 header: header.clone(),
587 snapshots: Default::default(),
588 deltas: Some(first_msg),
589 removed_components: Default::default(),
590 };
591
592 if !self.is_next_expected(&header) {
594 info!("Retrieving snapshot");
595 let snapshot_header = if self.partial_blocks && header.number > 0 {
598 BlockHeader {
599 number: header.number - 1,
600 hash: header.parent_hash.clone(),
601 ..Default::default()
602 }
603 } else {
604 BlockHeader { revert: false, ..header.clone() }
605 };
606 let component_ids =
607 self.component_tracker.get_tracked_component_ids();
608 let init_snapshot = if !self.include_snapshots ||
609 component_ids.is_empty()
610 {
611 Snapshot::default()
612 } else {
613 let components: HashMap<_, _> = self
615 .component_tracker
616 .components
617 .iter()
618 .filter(|(id, _)| component_ids.contains(id))
619 .map(|(k, v)| (k.clone(), v.clone()))
620 .collect();
621 let contract_ids: HashSet<Bytes> = self
622 .component_tracker
623 .get_contracts_by_component(&component_ids)
624 .into_iter()
625 .collect();
626 let fetch_params = FetchSnapshotParams {
627 chain: self.extractor_id.chain,
628 protocol_system: self.extractor_id.name.clone(),
629 block_number: snapshot_header.number,
630 uses_dci: self.uses_dci,
631 retrieve_balances: self.retrieve_balances,
632 include_tvl: self.include_tvl,
633 };
634 match fetch_snapshot(
635 &self.rpc_client,
636 components,
637 contract_ids,
638 &fetch_params,
639 )
640 .await
641 {
642 Ok((snap, dci_update, _)) => {
643 self.component_tracker
644 .process_entrypoints(&dci_update);
645 snap
646 }
647 Err(SynchronizerError::RPCError(
648 crate::rpc::RPCError::StaleBlock(reason),
649 )) => {
650 stale_retries += 1;
651 if stale_retries > MAX_STALE_RETRIES {
652 return Err(SynchronizerError::RPCError(
653 crate::rpc::RPCError::StaleBlock(reason),
654 ));
655 }
656 warn!(
661 block = header.number,
662 stale_retries,
663 %reason,
664 "Snapshot block is outside server retention \
665 window; waiting for a more recent block"
666 );
667 continue 'init;
668 }
669 Err(e) => return Err(e),
670 }
671 };
672 let n_components = self.component_tracker.components.len();
673 let n_snapshots = init_snapshot.states.len();
674 info!(
675 n_components,
676 n_snapshots,
677 "Initial snapshot retrieved, starting delta message feed"
678 );
679 let snapshot_msg = StateSyncMessage {
680 header: snapshot_header,
681 snapshots: init_snapshot,
682 deltas: None,
683 removed_components: HashMap::new(),
684 };
685 break 'init (snapshot_msg.merge(deltas_msg), header);
686 } else {
687 break 'init (deltas_msg, header);
688 }
689 };
690
691 block_tx.send(Ok(msg)).await?;
692 self.last_synced_block = Some(header);
693 loop {
694 select! {
695 deltas_opt = msg_rx.recv() => {
696 if let Some(mut deltas) = deltas_opt {
697 let header: BlockHeader = (&deltas).into();
698 debug!(block_number=?header.number, "Received delta message");
699
700 if !self.snapshot_tasks.is_empty() {
702 self.buffered_deltas.push(deltas.clone());
703 }
704
705 let background_snapshots = self.drain_completed_snapshots();
706
707 if self.snapshot_tasks.is_empty() {
709 self.buffered_deltas.clear();
710 } else {
711 let oldest_pending_block = self
712 .snapshot_tasks
713 .iter()
714 .map(|p| p.snapshot_block)
715 .min()
716 .unwrap_or(u64::MAX);
717 self.buffered_deltas
718 .retain(|d| d.block.number > oldest_pending_block);
719 }
720
721 let (snapshots, removed_components) = {
722 let (to_add, to_remove) =
723 self.component_tracker.filter_updated_components(&deltas);
724
725 let retry_ids: Vec<String> = self
731 .snapshot_queue
732 .iter()
733 .filter(|(_, s)| matches!(s, SnapshotStatus::RetryNext))
734 .map(|(id, _)| id.clone())
735 .collect();
736 for id in &retry_ids {
737 self.snapshot_queue.remove(id);
738 }
739
740 let truly_new: Vec<String> = {
745 let mut seen = HashSet::new();
746 to_add
747 .iter()
748 .chain(retry_ids.iter())
749 .filter(|id| {
750 !self.component_tracker
751 .components
752 .contains_key(id.as_str())
753 && !self
754 .snapshot_queue
755 .contains_key(id.as_str())
756 && seen.insert(id.as_str())
757 })
758 .cloned()
759 .collect()
760 };
761
762 if self.partial_blocks {
763 let is_new_block = self
764 .last_synced_block
765 .as_ref()
766 .map(|b| header.number > b.number)
767 .unwrap_or(true);
768
769 let has_deferred = self
770 .snapshot_queue
771 .values()
772 .any(|s| matches!(s, SnapshotStatus::Deferred));
773 if is_new_block && has_deferred && header.number > 0 {
774 let to_fire: Vec<String> = self
778 .snapshot_queue
779 .iter()
780 .filter(|(_, s)| matches!(s, SnapshotStatus::Deferred))
781 .map(|(id, _)| id.clone())
782 .collect();
783 for id in &to_fire {
784 self.snapshot_queue.remove(id);
785 }
786 let snapshot_header = BlockHeader {
787 number: header.number - 1,
788 hash: header.parent_hash.clone(),
789 ..Default::default()
790 };
791 debug!(
792 components = ?to_fire,
793 extractor = %self.extractor_id.name,
794 snapshot_block = header.number - 1,
795 "snapshot_deferred_to_background"
796 );
797 self.spawn_snapshot_task(
798 to_fire,
799 snapshot_header,
800 &deltas,
801 );
802 }
803
804 for id in truly_new {
807 self.snapshot_queue
808 .insert(id, SnapshotStatus::Deferred);
809 }
810 } else if !truly_new.is_empty() {
811 debug!(
812 components = ?truly_new,
813 extractor = %self.extractor_id.name,
814 block_number = ?header.number,
815 "snapshot_deferred_to_background"
816 );
817 let snapshot_header =
818 BlockHeader { revert: false, ..header.clone() };
819 self.spawn_snapshot_task(truly_new, snapshot_header, &deltas);
820 }
821
822 let snapshots = background_snapshots;
823
824 let removed_components = if !to_remove.is_empty() {
825 self.component_tracker.stop_tracking(&to_remove)
826 } else {
827 Default::default()
828 };
829
830 (snapshots, removed_components)
831 };
832
833 self.component_tracker.process_entrypoints(&deltas.dci_update);
835
836 self.filter_deltas(&mut deltas);
838 let n_changes = deltas.n_changes();
839
840 let next = StateSyncMessage {
841 header: header.clone(),
842 snapshots,
843 deltas: Some(deltas),
844 removed_components,
845 };
846 block_tx.send(Ok(next)).await?;
847 self.last_synced_block = Some(header.clone());
848
849 debug!(block_number=?header.number, n_changes, "Finished processing delta message");
850 } else {
851 return Err(SynchronizerError::ConnectionError("Deltas channel closed".to_string()));
852 }
853 },
854 _ = &mut end_rx => {
855 info!("Received close signal during state_sync");
856 return Ok(());
857 }
858 }
859 }
860 }.await;
861
862 warn!(last_synced_block = ?&self.last_synced_block, "Deltas processing ended.");
864 let _ = self
866 .deltas_client
867 .unsubscribe(subscription_id)
868 .await
869 .map_err(|err| {
870 warn!(err=?err, "Unsubscribing from deltas on cleanup failed!");
871 });
872
873 match result {
876 Ok(()) => Ok(()), Err(e) => {
878 Err((e, Some(end_rx)))
882 }
883 }
884 }
885
886 fn apply_deltas_to_snapshot(
889 &self,
890 snapshot: &mut Snapshot,
891 snapshot_block: u64,
892 contract_ids: &HashSet<Bytes>,
893 ) {
894 for delta in &self.buffered_deltas {
895 if delta.block.number <= snapshot_block {
896 continue;
897 }
898 for (component_id, state_delta) in &delta.state_deltas {
899 if let Some(cws) = snapshot.states.get_mut(component_id) {
900 cws.state.attributes.extend(
901 state_delta
902 .updated_attributes
903 .iter()
904 .map(|(k, v)| (k.clone(), v.clone())),
905 );
906 for key in &state_delta.deleted_attributes {
907 cws.state.attributes.remove(key);
908 }
909 }
910 }
911 for (component_id, token_balances) in &delta.component_balances {
912 if let Some(cws) = snapshot.states.get_mut(component_id) {
913 for (token, bal) in token_balances {
914 cws.state
915 .balances
916 .insert(token.clone(), bal.balance.clone());
917 }
918 }
919 }
920 for (address, account_delta) in &delta.account_deltas {
921 if contract_ids.contains(address) {
922 if let Some(account) = snapshot.vm_storage.get_mut(address) {
923 account.slots.extend(
924 account_delta
925 .slots
926 .iter()
927 .filter_map(|(k, v)| {
928 v.as_ref()
929 .map(|v| (k.clone(), v.clone()))
930 }),
931 );
932 if let Some(balance) = &account_delta.balance {
933 account.native_balance = balance.clone();
934 }
935 if let Some(code) = account_delta.code() {
936 account.code = code.clone();
937 }
938 }
939 }
940 }
941 }
942 }
943
944 fn spawn_snapshot_task(
948 &mut self,
949 component_ids: Vec<String>,
950 snapshot_header: BlockHeader,
951 current_delta: &BlockAggregatedChanges,
952 ) {
953 let snapshot_block = snapshot_header.number;
954
955 if self.snapshot_tasks.is_empty() {
956 self.buffered_deltas
957 .push(current_delta.clone());
958 }
959
960 let (tx, rx) = oneshot::channel();
961 let rpc = self.rpc_client.clone();
962 let bg_params = FetchSnapshotParams {
963 chain: self.extractor_id.chain,
964 protocol_system: self.extractor_id.name.clone(),
965 block_number: snapshot_block,
966 uses_dci: self.uses_dci,
967 retrieve_balances: self.retrieve_balances,
968 include_tvl: self.include_tvl,
969 };
970 let ids = component_ids.clone();
971 tokio::spawn(async move {
972 let _ = tx.send(fetch_snapshot_background(rpc, ids, bg_params).await);
973 });
974 for id in &component_ids {
975 self.snapshot_queue
976 .insert(id.clone(), SnapshotStatus::InFlight);
977 }
978 self.snapshot_tasks
979 .push(SnapshotTask { component_ids, snapshot_block, receiver: rx });
980 }
981
982 fn drain_completed_snapshots(&mut self) -> Snapshot {
985 let mut result = Snapshot::default();
986 let pending = std::mem::take(&mut self.snapshot_tasks);
987
988 for mut p in pending {
989 match p.receiver.try_recv() {
990 Ok(Ok(fetch_result)) => {
991 debug!(
992 components = ?p.component_ids,
993 extractor = %self.extractor_id.name,
994 "snapshot_background_ready"
995 );
996 for id in &p.component_ids {
997 self.snapshot_queue.remove(id);
998 }
999 let new_component_ids: Vec<String> = fetch_result
1000 .components
1001 .keys()
1002 .cloned()
1003 .collect();
1004 self.component_tracker
1005 .components
1006 .extend(fetch_result.components);
1007 self.component_tracker
1008 .process_entrypoints(&fetch_result.dci_update);
1009 self.component_tracker
1010 .update_contracts(new_component_ids);
1011 let mut snapshot = fetch_result.snapshot;
1012 self.apply_deltas_to_snapshot(
1013 &mut snapshot,
1014 fetch_result.snapshot_block,
1015 &fetch_result.contract_ids,
1016 );
1017 result.extend(snapshot);
1018 }
1019 Ok(Err(e)) => {
1020 if e.is_transient() {
1021 warn!(
1022 components = ?p.component_ids,
1023 extractor = %self.extractor_id.name,
1024 err = %e,
1025 "Background snapshot fetch failed transiently; will retry next block"
1026 );
1027 for id in &p.component_ids {
1028 self.snapshot_queue
1029 .insert(id.clone(), SnapshotStatus::RetryNext);
1030 }
1031 } else {
1032 warn!(
1033 components = ?p.component_ids,
1034 extractor = %self.extractor_id.name,
1035 err = %e,
1036 "Background snapshot fetch failed permanently; \
1037 components blacklisted until restart"
1038 );
1039 for id in &p.component_ids {
1040 self.snapshot_queue
1041 .insert(id.clone(), SnapshotStatus::Blacklisted);
1042 }
1043 }
1044 }
1045 Err(tokio::sync::oneshot::error::TryRecvError::Empty) => {
1046 self.snapshot_tasks.push(p);
1047 }
1048 Err(tokio::sync::oneshot::error::TryRecvError::Closed) => {
1049 warn!(
1050 components = ?p.component_ids,
1051 extractor = %self.extractor_id.name,
1052 "Background snapshot task dropped before sending result"
1053 );
1054 for id in &p.component_ids {
1055 self.snapshot_queue.remove(id);
1056 }
1057 }
1058 }
1059 }
1060
1061 result
1062 }
1063
1064 fn is_next_expected(&self, incoming: &BlockHeader) -> bool {
1065 if let Some(block) = self.last_synced_block.as_ref() {
1066 return incoming.parent_hash == block.hash;
1067 }
1068 false
1069 }
1070 fn filter_deltas(&self, deltas: &mut BlockAggregatedChanges) {
1071 deltas.filter_by_component(|id| {
1072 self.component_tracker
1073 .components
1074 .contains_key(id)
1075 });
1076 deltas.filter_by_contract(|id| {
1077 self.component_tracker
1078 .contracts
1079 .contains(id)
1080 });
1081 }
1082}
1083
1084#[async_trait]
1085impl<R, D> StateSynchronizer for ProtocolStateSynchronizer<R, D>
1086where
1087 R: RPCClient + Clone + Send + Sync + 'static,
1088 D: DeltasClient + Clone + Send + Sync + 'static,
1089{
1090 async fn initialize(&mut self) -> SyncResult<()> {
1091 info!("Retrieving relevant protocol components");
1092 self.component_tracker
1093 .initialise_components()
1094 .await?;
1095 info!(
1096 n_components = self.component_tracker.components.len(),
1097 n_contracts = self.component_tracker.contracts.len(),
1098 extractor = %self.extractor_id,
1099 "Finished retrieving components",
1100 );
1101
1102 Ok(())
1103 }
1104
1105 async fn start(
1106 mut self,
1107 ) -> (SynchronizerTaskHandle, Receiver<SyncResult<StateSyncMessage<BlockHeader>>>) {
1108 let (mut tx, rx) = channel(15);
1109 let (end_tx, end_rx) = oneshot::channel::<()>();
1110
1111 let jh = tokio::spawn(async move {
1112 let mut retry_count = 0;
1113 let mut current_end_rx = end_rx;
1114 let mut final_error = None;
1115
1116 while retry_count < self.max_retries {
1117 info!(extractor_id=%&self.extractor_id, retry_count, "(Re)starting synchronization loop");
1118
1119 let prev_block = self
1120 .last_synced_block
1121 .as_ref()
1122 .map(|h| h.number);
1123 let res = self
1124 .state_sync(&mut tx, current_end_rx)
1125 .await;
1126 let made_progress = self
1127 .last_synced_block
1128 .as_ref()
1129 .map(|h| h.number) >
1130 prev_block;
1131 match res {
1132 Ok(()) => {
1133 info!(
1134 extractor_id=%&self.extractor_id,
1135 retry_count,
1136 "State synchronization exited cleanly"
1137 );
1138 return;
1139 }
1140 Err((e, maybe_end_rx)) => {
1141 warn!(
1142 extractor_id=%&self.extractor_id,
1143 retry_count,
1144 error=%e,
1145 "State synchronization errored!"
1146 );
1147
1148 if let Some(recovered_end_rx) = maybe_end_rx {
1150 current_end_rx = recovered_end_rx;
1151
1152 if let SynchronizerError::ConnectionClosed = e {
1153 error!(
1155 "Websocket connection closed. State synchronization exiting."
1156 );
1157 let _ = tx.send(Err(e)).await;
1158 return;
1159 } else {
1160 final_error = Some(e);
1162 }
1163 } else {
1164 info!(extractor_id=%&self.extractor_id, "Received close signal, exiting.");
1166 return;
1167 }
1168 }
1169 }
1170 sleep(self.retry_cooldown).await;
1171 if made_progress {
1174 retry_count = 0;
1175 } else {
1176 retry_count += 1;
1177 }
1178 }
1179 if let Some(e) = final_error {
1180 warn!(extractor_id=%&self.extractor_id, retry_count, error=%e, "Max retries exceeded");
1181 let _ = tx.send(Err(e)).await;
1182 }
1183 });
1184
1185 let handle = SynchronizerTaskHandle::new(jh, end_tx);
1186 (handle, rx)
1187 }
1188}
1189
1190#[cfg(test)]
1191mod test {
1192 use std::{collections::HashSet, sync::Arc};
1211
1212 use tycho_common::models::{
1213 blockchain::{
1214 AddressStorageLocation, Block, BlockAggregatedChanges, DCIUpdate, EntryPoint,
1215 EntryPointWithTracingParams, RPCTracerParams, TracingParams, TracingResult,
1216 },
1217 protocol::{ProtocolComponent, ProtocolComponentState},
1218 token::Token,
1219 Chain,
1220 };
1221 use uuid::Uuid;
1222
1223 use super::*;
1224 use crate::{
1225 deltas::MockDeltasClient,
1226 rpc::{MockRPCClient, Page},
1227 DeltasError, RPCError,
1228 };
1229
1230 struct ArcRPCClient<T>(Arc<T>);
1232
1233 impl<T> Clone for ArcRPCClient<T> {
1235 fn clone(&self) -> Self {
1236 ArcRPCClient(self.0.clone())
1237 }
1238 }
1239
1240 #[async_trait]
1241 impl<T> RPCClient for ArcRPCClient<T>
1242 where
1243 T: RPCClient + Sync + Send + 'static,
1244 {
1245 async fn get_tokens(
1246 &self,
1247 params: crate::rpc::TokensParams,
1248 ) -> Result<crate::rpc::Page<Vec<Token>>, RPCError> {
1249 self.0.get_tokens(params).await
1250 }
1251
1252 async fn get_contract_state(
1253 &self,
1254 params: crate::rpc::ContractStateParams,
1255 ) -> Result<crate::rpc::Page<Vec<Account>>, RPCError> {
1256 self.0.get_contract_state(params).await
1257 }
1258
1259 async fn get_protocol_components(
1260 &self,
1261 params: crate::rpc::ProtocolComponentsParams,
1262 ) -> Result<crate::rpc::Page<Vec<ProtocolComponent>>, RPCError> {
1263 self.0
1264 .get_protocol_components(params)
1265 .await
1266 }
1267
1268 async fn get_protocol_states(
1269 &self,
1270 params: crate::rpc::ProtocolStatesParams,
1271 ) -> Result<crate::rpc::Page<Vec<ProtocolComponentState>>, RPCError> {
1272 self.0.get_protocol_states(params).await
1273 }
1274
1275 async fn get_protocol_systems(
1276 &self,
1277 params: crate::rpc::ProtocolSystemsParams,
1278 ) -> Result<crate::rpc::Page<crate::rpc::ProtocolSystems>, RPCError> {
1279 self.0
1280 .get_protocol_systems(params)
1281 .await
1282 }
1283
1284 async fn get_component_tvl(
1285 &self,
1286 params: crate::rpc::ComponentTvlParams,
1287 ) -> Result<crate::rpc::Page<HashMap<String, f64>>, RPCError> {
1288 self.0.get_component_tvl(params).await
1289 }
1290
1291 async fn get_traced_entry_points(
1292 &self,
1293 params: crate::rpc::TracedEntryPointsParams,
1294 ) -> Result<
1295 crate::rpc::Page<HashMap<String, Vec<(EntryPointWithTracingParams, TracingResult)>>>,
1296 RPCError,
1297 > {
1298 self.0
1299 .get_traced_entry_points(params)
1300 .await
1301 }
1302
1303 async fn get_snapshots<'a>(
1304 &self,
1305 request: &SnapshotParameters<'a>,
1306 chunk_size: Option<usize>,
1307 concurrency: usize,
1308 ) -> Result<Snapshot, RPCError> {
1309 self.0
1310 .get_snapshots(request, chunk_size, concurrency)
1311 .await
1312 }
1313
1314 fn compression(&self) -> bool {
1315 self.0.compression()
1316 }
1317 }
1318
1319 struct ArcDeltasClient<T>(Arc<T>);
1321
1322 impl<T> Clone for ArcDeltasClient<T> {
1324 fn clone(&self) -> Self {
1325 ArcDeltasClient(self.0.clone())
1326 }
1327 }
1328
1329 #[async_trait]
1330 impl<T> DeltasClient for ArcDeltasClient<T>
1331 where
1332 T: DeltasClient + Sync + Send + 'static,
1333 {
1334 async fn subscribe(
1335 &self,
1336 extractor_id: tycho_common::models::ExtractorIdentity,
1337 options: SubscriptionOptions,
1338 ) -> Result<(Uuid, Receiver<BlockAggregatedChanges>), DeltasError> {
1339 self.0
1340 .subscribe(extractor_id, options)
1341 .await
1342 }
1343
1344 async fn unsubscribe(&self, subscription_id: Uuid) -> Result<(), DeltasError> {
1345 self.0
1346 .unsubscribe(subscription_id)
1347 .await
1348 }
1349
1350 async fn connect(&self) -> Result<JoinHandle<Result<(), DeltasError>>, DeltasError> {
1351 self.0.connect().await
1352 }
1353
1354 async fn close(&self) -> Result<(), DeltasError> {
1355 self.0.close().await
1356 }
1357 }
1358
1359 fn with_mocked_clients(
1360 native: bool,
1361 include_tvl: bool,
1362 rpc_client: Option<MockRPCClient>,
1363 deltas_client: Option<MockDeltasClient>,
1364 ) -> ProtocolStateSynchronizer<ArcRPCClient<MockRPCClient>, ArcDeltasClient<MockDeltasClient>>
1365 {
1366 let rpc_client = ArcRPCClient(Arc::new(rpc_client.unwrap_or_default()));
1367 let deltas_client = ArcDeltasClient(Arc::new(deltas_client.unwrap_or_default()));
1368
1369 ProtocolStateSynchronizer::new(
1370 ExtractorIdentity::new(Chain::Ethereum, "uniswap-v2"),
1371 native,
1372 ComponentFilter::with_tvl_range(50.0, 50.0),
1373 1,
1374 Duration::from_secs(0),
1375 true,
1376 include_tvl,
1377 true, rpc_client,
1379 deltas_client,
1380 10_u64,
1381 )
1382 }
1383
1384 fn state_snapshot_native() -> Vec<ProtocolComponentState> {
1385 vec![ProtocolComponentState {
1386 component_id: "Component1".to_string(),
1387 attributes: HashMap::new(),
1388 balances: HashMap::new(),
1389 }]
1390 }
1391
1392 fn make_mock_client() -> MockRPCClient {
1393 let mut m = MockRPCClient::new();
1394 m.expect_compression()
1395 .return_const(false);
1396 m
1397 }
1398
1399 #[test_log::test(tokio::test)]
1400 async fn test_get_snapshots_native() {
1401 let header = BlockHeader::default();
1402 let mut rpc = make_mock_client();
1403 let component = ProtocolComponent { id: "Component1".to_string(), ..Default::default() };
1404
1405 let component_clone = component.clone();
1406 rpc.expect_get_snapshots()
1407 .returning(move |_request, _chunk_size, _concurrency| {
1408 Ok(Snapshot {
1409 states: state_snapshot_native()
1410 .into_iter()
1411 .map(|state| {
1412 (
1413 state.component_id.clone(),
1414 ComponentWithState {
1415 state,
1416 component: component_clone.clone(),
1417 entrypoints: vec![],
1418 component_tvl: None,
1419 },
1420 )
1421 })
1422 .collect(),
1423 vm_storage: HashMap::new(),
1424 })
1425 });
1426
1427 let mut state_sync = with_mocked_clients(true, false, Some(rpc), None);
1428 state_sync
1429 .component_tracker
1430 .components
1431 .insert("Component1".to_string(), component.clone());
1432 let components_arg = ["Component1".to_string()];
1433 let exp = StateSyncMessage {
1434 header: header.clone(),
1435 snapshots: Snapshot {
1436 states: state_snapshot_native()
1437 .into_iter()
1438 .map(|state| {
1439 (
1440 state.component_id.clone(),
1441 ComponentWithState {
1442 state,
1443 component: component.clone(),
1444 entrypoints: vec![],
1445 component_tvl: None,
1446 },
1447 )
1448 })
1449 .collect(),
1450 vm_storage: HashMap::new(),
1451 },
1452 deltas: None,
1453 removed_components: Default::default(),
1454 };
1455
1456 let req_ids: Vec<String> = components_arg.to_vec();
1457 let components: HashMap<_, _> = state_sync
1458 .component_tracker
1459 .components
1460 .iter()
1461 .filter(|(id, _)| req_ids.contains(id))
1462 .map(|(k, v)| (k.clone(), v.clone()))
1463 .collect();
1464 let contract_ids: HashSet<Bytes> = state_sync
1465 .component_tracker
1466 .get_contracts_by_component(&req_ids)
1467 .into_iter()
1468 .collect();
1469 let params = FetchSnapshotParams {
1470 chain: Chain::Ethereum,
1471 protocol_system: "uniswap-v2".to_string(),
1472 block_number: header.number,
1473 uses_dci: false,
1474 retrieve_balances: true,
1475 include_tvl: false,
1476 };
1477 let (snapshot, _, _) =
1478 fetch_snapshot(&state_sync.rpc_client, components, contract_ids, ¶ms)
1479 .await
1480 .expect("Retrieving snapshot failed");
1481 let snap = StateSyncMessage {
1482 header: header.clone(),
1483 snapshots: snapshot,
1484 deltas: None,
1485 removed_components: Default::default(),
1486 };
1487
1488 assert_eq!(snap, exp);
1489 }
1490
1491 #[test_log::test(tokio::test)]
1492 async fn test_get_snapshots_native_with_tvl() {
1493 let header = BlockHeader::default();
1494 let mut rpc = make_mock_client();
1495 let component = ProtocolComponent { id: "Component1".to_string(), ..Default::default() };
1496
1497 let component_clone = component.clone();
1498 rpc.expect_get_snapshots()
1499 .returning(move |_request, _chunk_size, _concurrency| {
1500 Ok(Snapshot {
1501 states: state_snapshot_native()
1502 .into_iter()
1503 .map(|state| {
1504 (
1505 state.component_id.clone(),
1506 ComponentWithState {
1507 state,
1508 component: component_clone.clone(),
1509 component_tvl: Some(100.0),
1510 entrypoints: vec![],
1511 },
1512 )
1513 })
1514 .collect(),
1515 vm_storage: HashMap::new(),
1516 })
1517 });
1518
1519 let mut state_sync = with_mocked_clients(true, true, Some(rpc), None);
1520 state_sync
1521 .component_tracker
1522 .components
1523 .insert("Component1".to_string(), component.clone());
1524 let components_arg = ["Component1".to_string()];
1525 let exp = StateSyncMessage {
1526 header: header.clone(),
1527 snapshots: Snapshot {
1528 states: state_snapshot_native()
1529 .into_iter()
1530 .map(|state| {
1531 (
1532 state.component_id.clone(),
1533 ComponentWithState {
1534 state,
1535 component: component.clone(),
1536 component_tvl: Some(100.0),
1537 entrypoints: vec![],
1538 },
1539 )
1540 })
1541 .collect(),
1542 vm_storage: HashMap::new(),
1543 },
1544 deltas: None,
1545 removed_components: Default::default(),
1546 };
1547
1548 let req_ids: Vec<String> = components_arg.to_vec();
1549 let components: HashMap<_, _> = state_sync
1550 .component_tracker
1551 .components
1552 .iter()
1553 .filter(|(id, _)| req_ids.contains(id))
1554 .map(|(k, v)| (k.clone(), v.clone()))
1555 .collect();
1556 let contract_ids: HashSet<Bytes> = state_sync
1557 .component_tracker
1558 .get_contracts_by_component(&req_ids)
1559 .into_iter()
1560 .collect();
1561 let params = FetchSnapshotParams {
1562 chain: Chain::Ethereum,
1563 protocol_system: "uniswap-v2".to_string(),
1564 block_number: header.number,
1565 uses_dci: false,
1566 retrieve_balances: true,
1567 include_tvl: true,
1568 };
1569 let (snapshot, _, _) =
1570 fetch_snapshot(&state_sync.rpc_client, components, contract_ids, ¶ms)
1571 .await
1572 .expect("Retrieving snapshot failed");
1573 let snap = StateSyncMessage {
1574 header: header.clone(),
1575 snapshots: snapshot,
1576 deltas: None,
1577 removed_components: Default::default(),
1578 };
1579
1580 assert_eq!(snap, exp);
1581 }
1582
1583 fn state_snapshot_vm() -> Vec<Account> {
1584 vec![
1585 Account::new(
1586 Chain::default(),
1587 Bytes::from("0x0badc0ffee"),
1588 String::new(),
1589 HashMap::new(),
1590 Bytes::default(),
1591 HashMap::new(),
1592 Bytes::default(),
1593 Bytes::default(),
1594 Bytes::default(),
1595 Bytes::default(),
1596 None,
1597 ),
1598 Account::new(
1599 Chain::default(),
1600 Bytes::from("0xbabe42"),
1601 String::new(),
1602 HashMap::new(),
1603 Bytes::default(),
1604 HashMap::new(),
1605 Bytes::default(),
1606 Bytes::default(),
1607 Bytes::default(),
1608 Bytes::default(),
1609 None,
1610 ),
1611 ]
1612 }
1613
1614 fn traced_entry_point_response(
1615 ) -> HashMap<String, Vec<(EntryPointWithTracingParams, TracingResult)>> {
1616 HashMap::from([(
1617 "Component1".to_string(),
1618 vec![(
1619 EntryPointWithTracingParams {
1620 entry_point: EntryPoint {
1621 external_id: "entrypoint_a".to_string(),
1622 target: Bytes::from("0x0badc0ffee"),
1623 signature: "sig()".to_string(),
1624 },
1625 params: TracingParams::RPCTracer(RPCTracerParams {
1626 caller: Some(Bytes::from("0x0badc0ffee")),
1627 calldata: Bytes::from("0x0badc0ffee"),
1628 state_overrides: None,
1629 prune_addresses: None,
1630 }),
1631 },
1632 TracingResult {
1633 retriggers: HashSet::from([(
1634 Bytes::from("0x0badc0ffee"),
1635 AddressStorageLocation::new(Bytes::from("0x0badc0ffee"), 12),
1636 )]),
1637 accessed_slots: HashMap::from([(
1638 Bytes::from("0x0badc0ffee"),
1639 HashSet::from([Bytes::from("0xbadbeef0")]),
1640 )]),
1641 },
1642 )],
1643 )])
1644 }
1645
1646 #[test_log::test(tokio::test)]
1647 async fn test_get_snapshots_vm() {
1648 let header = BlockHeader::default();
1649 let mut rpc = make_mock_client();
1650
1651 let traced_ep_response = traced_entry_point_response();
1652 rpc.expect_get_snapshots()
1653 .returning(move |_request, _chunk_size, _concurrency| {
1654 let vm_storage_accounts = state_snapshot_vm();
1655 Ok(Snapshot {
1656 states: [(
1657 "Component1".to_string(),
1658 ComponentWithState {
1659 state: ProtocolComponentState {
1660 component_id: "Component1".to_string(),
1661 attributes: HashMap::new(),
1662 balances: HashMap::new(),
1663 },
1664 component: ProtocolComponent {
1665 id: "Component1".to_string(),
1666 contract_addresses: vec![
1667 Bytes::from("0x0badc0ffee"),
1668 Bytes::from("0xbabe42"),
1669 ],
1670 ..Default::default()
1671 },
1672 component_tvl: None,
1673 entrypoints: traced_ep_response
1674 .get("Component1")
1675 .cloned()
1676 .unwrap_or_default(),
1677 },
1678 )]
1679 .into_iter()
1680 .collect(),
1681 vm_storage: vm_storage_accounts
1682 .into_iter()
1683 .map(|account| (account.address.clone(), account))
1684 .collect(),
1685 })
1686 });
1687
1688 let mut state_sync = with_mocked_clients(false, false, Some(rpc), None);
1689 let component = ProtocolComponent {
1690 id: "Component1".to_string(),
1691 contract_addresses: vec![Bytes::from("0x0badc0ffee"), Bytes::from("0xbabe42")],
1692 ..Default::default()
1693 };
1694 state_sync
1695 .component_tracker
1696 .components
1697 .insert("Component1".to_string(), component.clone());
1698 let components_arg = ["Component1".to_string()];
1699 let exp = StateSyncMessage {
1700 header: header.clone(),
1701 snapshots: Snapshot {
1702 states: [(
1703 component.id.clone(),
1704 ComponentWithState {
1705 state: ProtocolComponentState {
1706 component_id: "Component1".to_string(),
1707 attributes: HashMap::new(),
1708 balances: HashMap::new(),
1709 },
1710 component: component.clone(),
1711 component_tvl: None,
1712 entrypoints: traced_entry_point_response()
1713 .remove("Component1")
1714 .unwrap_or_default(),
1715 },
1716 )]
1717 .into_iter()
1718 .collect(),
1719 vm_storage: state_snapshot_vm()
1720 .into_iter()
1721 .map(|account| (account.address.clone(), account))
1722 .collect(),
1723 },
1724 deltas: None,
1725 removed_components: Default::default(),
1726 };
1727
1728 let req_ids: Vec<String> = components_arg.to_vec();
1729 let components: HashMap<_, _> = state_sync
1730 .component_tracker
1731 .components
1732 .iter()
1733 .filter(|(id, _)| req_ids.contains(id))
1734 .map(|(k, v)| (k.clone(), v.clone()))
1735 .collect();
1736 let contract_ids: HashSet<Bytes> = state_sync
1737 .component_tracker
1738 .get_contracts_by_component(&req_ids)
1739 .into_iter()
1740 .collect();
1741 let params = FetchSnapshotParams {
1742 chain: Chain::Ethereum,
1743 protocol_system: "uniswap-v2".to_string(),
1744 block_number: header.number,
1745 uses_dci: false,
1746 retrieve_balances: false,
1747 include_tvl: false,
1748 };
1749 let (snapshot, _, _) =
1750 fetch_snapshot(&state_sync.rpc_client, components, contract_ids, ¶ms)
1751 .await
1752 .expect("Retrieving snapshot failed");
1753 let snap = StateSyncMessage {
1754 header: header.clone(),
1755 snapshots: snapshot,
1756 deltas: None,
1757 removed_components: Default::default(),
1758 };
1759
1760 assert_eq!(snap, exp);
1761 }
1762
1763 #[test_log::test(tokio::test)]
1764 async fn test_get_snapshots_vm_with_tvl() {
1765 let header = BlockHeader::default();
1766 let mut rpc = make_mock_client();
1767 let component = ProtocolComponent {
1768 id: "Component1".to_string(),
1769 contract_addresses: vec![Bytes::from("0x0badc0ffee"), Bytes::from("0xbabe42")],
1770 ..Default::default()
1771 };
1772
1773 let component_clone = component.clone();
1774 rpc.expect_get_snapshots()
1775 .returning(move |_request, _chunk_size, _concurrency| {
1776 let vm_storage_accounts = state_snapshot_vm();
1777 Ok(Snapshot {
1778 states: [(
1779 "Component1".to_string(),
1780 ComponentWithState {
1781 state: ProtocolComponentState {
1782 component_id: "Component1".to_string(),
1783 attributes: HashMap::new(),
1784 balances: HashMap::new(),
1785 },
1786 component: component_clone.clone(),
1787 component_tvl: Some(100.0),
1788 entrypoints: vec![],
1789 },
1790 )]
1791 .into_iter()
1792 .collect(),
1793 vm_storage: vm_storage_accounts
1794 .into_iter()
1795 .map(|account| (account.address.clone(), account))
1796 .collect(),
1797 })
1798 });
1799
1800 let mut state_sync = with_mocked_clients(false, true, Some(rpc), None);
1801 state_sync
1802 .component_tracker
1803 .components
1804 .insert("Component1".to_string(), component.clone());
1805 let components_arg = ["Component1".to_string()];
1806 let exp = StateSyncMessage {
1807 header: header.clone(),
1808 snapshots: Snapshot {
1809 states: [(
1810 component.id.clone(),
1811 ComponentWithState {
1812 state: ProtocolComponentState {
1813 component_id: "Component1".to_string(),
1814 attributes: HashMap::new(),
1815 balances: HashMap::new(),
1816 },
1817 component: component.clone(),
1818 component_tvl: Some(100.0),
1819 entrypoints: vec![],
1820 },
1821 )]
1822 .into_iter()
1823 .collect(),
1824 vm_storage: state_snapshot_vm()
1825 .into_iter()
1826 .map(|account| (account.address.clone(), account))
1827 .collect(),
1828 },
1829 deltas: None,
1830 removed_components: Default::default(),
1831 };
1832
1833 let req_ids: Vec<String> = components_arg.to_vec();
1834 let components: HashMap<_, _> = state_sync
1835 .component_tracker
1836 .components
1837 .iter()
1838 .filter(|(id, _)| req_ids.contains(id))
1839 .map(|(k, v)| (k.clone(), v.clone()))
1840 .collect();
1841 let contract_ids: HashSet<Bytes> = state_sync
1842 .component_tracker
1843 .get_contracts_by_component(&req_ids)
1844 .into_iter()
1845 .collect();
1846 let params = FetchSnapshotParams {
1847 chain: Chain::Ethereum,
1848 protocol_system: "uniswap-v2".to_string(),
1849 block_number: header.number,
1850 uses_dci: false,
1851 retrieve_balances: false,
1852 include_tvl: true,
1853 };
1854 let (snapshot, _, _) =
1855 fetch_snapshot(&state_sync.rpc_client, components, contract_ids, ¶ms)
1856 .await
1857 .expect("Retrieving snapshot failed");
1858 let snap = StateSyncMessage {
1859 header: header.clone(),
1860 snapshots: snapshot,
1861 deltas: None,
1862 removed_components: Default::default(),
1863 };
1864
1865 assert_eq!(snap, exp);
1866 }
1867
1868 #[test_log::test(tokio::test)]
1872 async fn test_get_snapshots_filters_to_requested_components_only() {
1873 let header = BlockHeader::default();
1874 let mut rpc = make_mock_client();
1875
1876 let component1 = ProtocolComponent { id: "Component1".to_string(), ..Default::default() };
1878 let component2 = ProtocolComponent { id: "Component2".to_string(), ..Default::default() };
1879 let component3 = ProtocolComponent { id: "Component3".to_string(), ..Default::default() };
1880
1881 let component2_clone = component2.clone();
1882
1883 rpc.expect_get_snapshots()
1885 .withf(
1886 |request: &SnapshotParameters,
1887 _chunk_size: &Option<usize>,
1888 _concurrency: &usize| {
1889 request.components.len() == 1 &&
1891 request
1892 .components
1893 .contains_key("Component2")
1894 },
1895 )
1896 .times(1)
1897 .returning(move |_request, _chunk_size, _concurrency| {
1898 Ok(Snapshot {
1899 states: [(
1900 "Component2".to_string(),
1901 ComponentWithState {
1902 state: ProtocolComponentState {
1903 component_id: "Component2".to_string(),
1904 attributes: HashMap::new(),
1905 balances: HashMap::new(),
1906 },
1907 component: component2_clone.clone(),
1908 entrypoints: vec![],
1909 component_tvl: None,
1910 },
1911 )]
1912 .into_iter()
1913 .collect(),
1914 vm_storage: HashMap::new(),
1915 })
1916 });
1917
1918 let mut state_sync = with_mocked_clients(true, false, Some(rpc), None);
1919
1920 state_sync
1922 .component_tracker
1923 .components
1924 .insert("Component1".to_string(), component1.clone());
1925 state_sync
1926 .component_tracker
1927 .components
1928 .insert("Component2".to_string(), component2.clone());
1929 state_sync
1930 .component_tracker
1931 .components
1932 .insert("Component3".to_string(), component3.clone());
1933
1934 let components_arg = ["Component2".to_string()];
1936 let req_ids: Vec<String> = components_arg.to_vec();
1937 let components: HashMap<_, _> = state_sync
1938 .component_tracker
1939 .components
1940 .iter()
1941 .filter(|(id, _)| req_ids.contains(id))
1942 .map(|(k, v)| (k.clone(), v.clone()))
1943 .collect();
1944 let contract_ids: HashSet<Bytes> = state_sync
1945 .component_tracker
1946 .get_contracts_by_component(&req_ids)
1947 .into_iter()
1948 .collect();
1949 let params = FetchSnapshotParams {
1950 chain: Chain::Ethereum,
1951 protocol_system: "uniswap-v2".to_string(),
1952 block_number: header.number,
1953 uses_dci: false,
1954 retrieve_balances: true,
1955 include_tvl: false,
1956 };
1957 let (snapshot, _, _) =
1958 fetch_snapshot(&state_sync.rpc_client, components, contract_ids, ¶ms)
1959 .await
1960 .expect("Retrieving snapshot failed");
1961
1962 assert_eq!(snapshot.states.len(), 1);
1964 assert!(snapshot
1965 .states
1966 .contains_key("Component2"));
1967 assert!(!snapshot
1968 .states
1969 .contains_key("Component1"));
1970 assert!(!snapshot
1971 .states
1972 .contains_key("Component3"));
1973 }
1974
1975 fn mock_clients_for_state_sync(
1976 bg_done: Option<Arc<tokio::sync::Notify>>,
1977 ) -> (MockRPCClient, MockDeltasClient, Sender<BlockAggregatedChanges>) {
1978 let mut rpc_client = make_mock_client();
1979 rpc_client
1982 .expect_get_protocol_components()
1983 .withf(|params: &crate::rpc::ProtocolComponentsParams| {
1984 params
1985 .component_ids()
1986 .is_some_and(|ids| ids.contains(&"Component3".to_string()))
1987 })
1988 .returning(|_| {
1989 Ok(Page::new(
1991 vec![
1992 ProtocolComponent { id: "Component3".to_string(), ..Default::default() },
1994 ],
1995 1,
1996 0,
1997 100,
1998 ))
1999 });
2000 rpc_client
2002 .expect_get_snapshots()
2003 .withf(
2004 |request: &SnapshotParameters,
2005 _chunk_size: &Option<usize>,
2006 _concurrency: &usize| {
2007 request
2008 .components
2009 .contains_key("Component3")
2010 },
2011 )
2012 .returning(move |_request, _chunk_size, _concurrency| {
2013 let snap = Ok(Snapshot {
2014 states: [(
2015 "Component3".to_string(),
2016 ComponentWithState {
2017 state: ProtocolComponentState::new(
2018 "Component3",
2019 Default::default(),
2020 Default::default(),
2021 ),
2022 component: ProtocolComponent {
2023 id: "Component3".to_string(),
2024 ..Default::default()
2025 },
2026 component_tvl: Some(1000.0),
2027 entrypoints: vec![],
2028 },
2029 )]
2030 .into_iter()
2031 .collect(),
2032 vm_storage: HashMap::new(),
2033 });
2034 if let Some(n) = &bg_done {
2035 n.notify_one();
2036 }
2037 snap
2038 });
2039
2040 rpc_client
2042 .expect_get_protocol_components()
2043 .returning(|_| {
2044 Ok(Page::new(
2046 vec![
2047 ProtocolComponent { id: "Component1".to_string(), ..Default::default() },
2049 ProtocolComponent { id: "Component2".to_string(), ..Default::default() },
2051 ],
2053 2,
2054 0,
2055 100,
2056 ))
2057 });
2058
2059 rpc_client
2060 .expect_get_snapshots()
2061 .returning(|_request, _chunk_size, _concurrency| {
2062 Ok(Snapshot {
2063 states: [
2064 (
2065 "Component1".to_string(),
2066 ComponentWithState {
2067 state: ProtocolComponentState::new(
2068 "Component1",
2069 Default::default(),
2070 Default::default(),
2071 ),
2072 component: ProtocolComponent {
2073 id: "Component1".to_string(),
2074 ..Default::default()
2075 },
2076 component_tvl: Some(100.0),
2077 entrypoints: vec![],
2078 },
2079 ),
2080 (
2081 "Component2".to_string(),
2082 ComponentWithState {
2083 state: ProtocolComponentState::new(
2084 "Component2",
2085 Default::default(),
2086 Default::default(),
2087 ),
2088 component: ProtocolComponent {
2089 id: "Component2".to_string(),
2090 ..Default::default()
2091 },
2092 component_tvl: Some(0.0),
2093 entrypoints: vec![],
2094 },
2095 ),
2096 ]
2097 .into_iter()
2098 .collect(),
2099 vm_storage: HashMap::new(),
2100 })
2101 });
2102
2103 rpc_client
2105 .expect_get_traced_entry_points()
2106 .returning(|_| Ok(Page::new(HashMap::new(), 0, 0, 0)));
2107
2108 let mut deltas_client = MockDeltasClient::new();
2110 let (tx, rx) = channel(1);
2111 deltas_client
2112 .expect_subscribe()
2113 .return_once(move |_, _| {
2114 Ok((Uuid::default(), rx))
2116 });
2117
2118 deltas_client
2120 .expect_unsubscribe()
2121 .return_once(|_| Ok(()));
2122
2123 (rpc_client, deltas_client, tx)
2124 }
2125
2126 #[test_log::test(tokio::test)]
2134 async fn test_state_sync() {
2135 let bg_done = Arc::new(tokio::sync::Notify::new());
2136 let (rpc_client, deltas_client, tx) = mock_clients_for_state_sync(Some(bg_done.clone()));
2137 let deltas = [
2138 BlockAggregatedChanges {
2139 extractor: "uniswap-v2".to_string(),
2140 chain: Chain::Ethereum,
2141 block: Block {
2142 number: 1,
2143 hash: Bytes::from("0x01"),
2144 parent_hash: Bytes::from("0x00"),
2145 chain: Chain::Ethereum,
2146 ts: Default::default(),
2147 },
2148 revert: false,
2149 dci_update: DCIUpdate {
2150 new_entrypoints: HashMap::from([(
2151 "Component1".to_string(),
2152 HashSet::from([EntryPoint {
2153 external_id: "entrypoint_a".to_string(),
2154 target: Bytes::from("0x0badc0ffee"),
2155 signature: "sig()".to_string(),
2156 }]),
2157 )]),
2158 new_entrypoint_params: HashMap::from([(
2159 "entrypoint_a".to_string(),
2160 HashSet::from([(
2161 TracingParams::RPCTracer(RPCTracerParams {
2162 caller: Some(Bytes::from("0x0badc0ffee")),
2163 calldata: Bytes::from("0x0badc0ffee"),
2164 state_overrides: None,
2165 prune_addresses: None,
2166 }),
2167 "Component1".to_string(),
2168 )]),
2169 )]),
2170 trace_results: HashMap::from([(
2171 "entrypoint_a".to_string(),
2172 TracingResult {
2173 retriggers: HashSet::from([(
2174 Bytes::from("0x0badc0ffee"),
2175 AddressStorageLocation::new(Bytes::from("0x0badc0ffee"), 12),
2176 )]),
2177 accessed_slots: HashMap::from([(
2178 Bytes::from("0x0badc0ffee"),
2179 HashSet::from([Bytes::from("0xbadbeef0")]),
2180 )]),
2181 },
2182 )]),
2183 },
2184 ..Default::default()
2185 },
2186 BlockAggregatedChanges {
2187 extractor: "uniswap-v2".to_string(),
2188 chain: Chain::Ethereum,
2189 block: Block {
2190 number: 2,
2191 hash: Bytes::from("0x02"),
2192 parent_hash: Bytes::from("0x01"),
2193 chain: Chain::Ethereum,
2194 ts: Default::default(),
2195 },
2196 revert: false,
2197 component_tvl: [
2198 ("Component1".to_string(), 100.0),
2199 ("Component2".to_string(), 0.0),
2200 ("Component3".to_string(), 1000.0),
2201 ]
2202 .into_iter()
2203 .collect(),
2204 ..Default::default()
2205 },
2206 BlockAggregatedChanges {
2209 extractor: "uniswap-v2".to_string(),
2210 chain: Chain::Ethereum,
2211 block: Block {
2212 number: 3,
2213 hash: Bytes::from("0x03"),
2214 parent_hash: Bytes::from("0x02"),
2215 chain: Chain::Ethereum,
2216 ts: Default::default(),
2217 },
2218 revert: false,
2219 ..Default::default()
2220 },
2221 ];
2222 let mut state_sync = with_mocked_clients(true, true, Some(rpc_client), Some(deltas_client));
2223 state_sync
2224 .initialize()
2225 .await
2226 .expect("Init failed");
2227
2228 let (handle, mut rx) = state_sync.start().await;
2230 let (jh, close_tx) = handle.split();
2231 tx.send(deltas[0].clone())
2232 .await
2233 .expect("deltas channel msg 0 closed!");
2234 let first_msg = timeout(Duration::from_millis(200), rx.recv())
2235 .await
2236 .expect("waiting for first state msg timed out!")
2237 .expect("state sync block sender closed!");
2238 tx.send(deltas[1].clone())
2239 .await
2240 .expect("deltas channel msg 1 closed!");
2241 let second_msg = timeout(Duration::from_millis(200), rx.recv())
2242 .await
2243 .expect("waiting for second state msg timed out!")
2244 .expect("state sync block sender closed!");
2245 bg_done.notified().await;
2247 tx.send(deltas[2].clone())
2248 .await
2249 .expect("deltas channel msg 2 closed!");
2250 let third_msg = timeout(Duration::from_millis(200), rx.recv())
2251 .await
2252 .expect("waiting for third state msg timed out!")
2253 .expect("state sync block sender closed!");
2254 let _ = close_tx.send(());
2255 jh.await
2256 .expect("state sync task panicked!");
2257
2258 let exp1 = StateSyncMessage {
2260 header: BlockHeader {
2261 number: 1,
2262 hash: Bytes::from("0x01"),
2263 parent_hash: Bytes::from("0x00"),
2264 revert: false,
2265 ..Default::default()
2266 },
2267 snapshots: Snapshot {
2268 states: [
2269 (
2270 "Component1".to_string(),
2271 ComponentWithState {
2272 state: ProtocolComponentState::new(
2273 "Component1",
2274 Default::default(),
2275 Default::default(),
2276 ),
2277 component: ProtocolComponent {
2278 id: "Component1".to_string(),
2279 ..Default::default()
2280 },
2281 component_tvl: Some(100.0),
2282 entrypoints: vec![],
2283 },
2284 ),
2285 (
2286 "Component2".to_string(),
2287 ComponentWithState {
2288 state: ProtocolComponentState::new(
2289 "Component2",
2290 Default::default(),
2291 Default::default(),
2292 ),
2293 component: ProtocolComponent {
2294 id: "Component2".to_string(),
2295 ..Default::default()
2296 },
2297 component_tvl: Some(0.0),
2298 entrypoints: vec![],
2299 },
2300 ),
2301 ]
2302 .into_iter()
2303 .collect(),
2304 vm_storage: HashMap::new(),
2305 },
2306 deltas: Some(deltas[0].clone()),
2307 removed_components: Default::default(),
2308 };
2309
2310 let exp2 = StateSyncMessage {
2313 header: BlockHeader {
2314 number: 2,
2315 hash: Bytes::from("0x02"),
2316 parent_hash: Bytes::from("0x01"),
2317 revert: false,
2318 ..Default::default()
2319 },
2320 snapshots: Snapshot::default(),
2321 deltas: Some(BlockAggregatedChanges {
2322 extractor: "uniswap-v2".to_string(),
2323 chain: Chain::Ethereum,
2324 block: Block {
2325 number: 2,
2326 hash: Bytes::from("0x02"),
2327 parent_hash: Bytes::from("0x01"),
2328 chain: Chain::Ethereum,
2329 ts: Default::default(),
2330 },
2331 revert: false,
2332 component_tvl: [
2333 ("Component1".to_string(), 100.0),
2335 ]
2336 .into_iter()
2337 .collect(),
2338 ..Default::default()
2339 }),
2340 removed_components: [(
2342 "Component2".to_string(),
2343 ProtocolComponent { id: "Component2".to_string(), ..Default::default() },
2344 )]
2345 .into_iter()
2346 .collect(),
2347 };
2348
2349 let exp3 = StateSyncMessage {
2351 header: BlockHeader {
2352 number: 3,
2353 hash: Bytes::from("0x03"),
2354 parent_hash: Bytes::from("0x02"),
2355 revert: false,
2356 ..Default::default()
2357 },
2358 snapshots: Snapshot {
2359 states: [(
2360 "Component3".to_string(),
2361 ComponentWithState {
2362 state: ProtocolComponentState::new(
2363 "Component3",
2364 Default::default(),
2365 Default::default(),
2366 ),
2367 component: ProtocolComponent {
2368 id: "Component3".to_string(),
2369 ..Default::default()
2370 },
2371 component_tvl: Some(1000.0),
2372 entrypoints: vec![],
2373 },
2374 )]
2375 .into_iter()
2376 .collect(),
2377 vm_storage: HashMap::new(),
2378 },
2379 deltas: Some(deltas[2].clone()),
2380 removed_components: Default::default(),
2381 };
2382 assert_eq!(first_msg.unwrap(), exp1);
2383 assert_eq!(second_msg.unwrap(), exp2);
2384 assert_eq!(third_msg.unwrap(), exp3);
2385 }
2386
2387 #[test_log::test(tokio::test)]
2388 async fn test_state_sync_with_tvl_range() {
2389 let remove_tvl_threshold = 5.0;
2390 let add_tvl_threshold = 7.0;
2391 let bg_done = Arc::new(tokio::sync::Notify::new());
2392
2393 let mut rpc_client = make_mock_client();
2394 let mut deltas_client = MockDeltasClient::new();
2395
2396 rpc_client
2397 .expect_get_protocol_components()
2398 .withf(|params: &crate::rpc::ProtocolComponentsParams| {
2399 params
2400 .component_ids()
2401 .is_some_and(|ids| ids.contains(&"Component3".to_string()))
2402 })
2403 .returning(|_| {
2404 Ok(Page::new(
2405 vec![ProtocolComponent { id: "Component3".to_string(), ..Default::default() }],
2406 1,
2407 0,
2408 100,
2409 ))
2410 });
2411 let bg_done_clone = bg_done.clone();
2413 rpc_client
2414 .expect_get_snapshots()
2415 .withf(
2416 |request: &SnapshotParameters,
2417 _chunk_size: &Option<usize>,
2418 _concurrency: &usize| {
2419 request
2420 .components
2421 .contains_key("Component3")
2422 },
2423 )
2424 .returning(move |_request, _chunk_size, _concurrency| {
2425 let snap = Ok(Snapshot {
2426 states: [(
2427 "Component3".to_string(),
2428 ComponentWithState {
2429 state: ProtocolComponentState::new(
2430 "Component3",
2431 Default::default(),
2432 Default::default(),
2433 ),
2434 component: ProtocolComponent {
2435 id: "Component3".to_string(),
2436 ..Default::default()
2437 },
2438 component_tvl: Some(10.0),
2439 entrypoints: vec![],
2440 },
2441 )]
2442 .into_iter()
2443 .collect(),
2444 vm_storage: HashMap::new(),
2445 });
2446 bg_done_clone.notify_one();
2447 snap
2448 });
2449
2450 rpc_client
2452 .expect_get_protocol_components()
2453 .returning(|_| {
2454 Ok(Page::new(
2455 vec![
2456 ProtocolComponent { id: "Component1".to_string(), ..Default::default() },
2457 ProtocolComponent { id: "Component2".to_string(), ..Default::default() },
2458 ],
2459 2,
2460 0,
2461 100,
2462 ))
2463 });
2464
2465 rpc_client
2467 .expect_get_snapshots()
2468 .returning(|_request, _chunk_size, _concurrency| {
2469 Ok(Snapshot {
2470 states: [
2471 (
2472 "Component1".to_string(),
2473 ComponentWithState {
2474 state: ProtocolComponentState::new(
2475 "Component1",
2476 Default::default(),
2477 Default::default(),
2478 ),
2479 component: ProtocolComponent {
2480 id: "Component1".to_string(),
2481 ..Default::default()
2482 },
2483 component_tvl: Some(6.0),
2484 entrypoints: vec![],
2485 },
2486 ),
2487 (
2488 "Component2".to_string(),
2489 ComponentWithState {
2490 state: ProtocolComponentState::new(
2491 "Component2",
2492 Default::default(),
2493 Default::default(),
2494 ),
2495 component: ProtocolComponent {
2496 id: "Component2".to_string(),
2497 ..Default::default()
2498 },
2499 component_tvl: Some(2.0),
2500 entrypoints: vec![],
2501 },
2502 ),
2503 ]
2504 .into_iter()
2505 .collect(),
2506 vm_storage: HashMap::new(),
2507 })
2508 });
2509
2510 rpc_client
2512 .expect_get_traced_entry_points()
2513 .returning(|_| Ok(Page::new(HashMap::new(), 0, 0, 0)));
2514
2515 let (tx, rx) = channel(1);
2516 deltas_client
2517 .expect_subscribe()
2518 .return_once(move |_, _| Ok((Uuid::default(), rx)));
2519
2520 deltas_client
2522 .expect_unsubscribe()
2523 .return_once(|_| Ok(()));
2524
2525 let mut state_sync = ProtocolStateSynchronizer::new(
2526 ExtractorIdentity::new(Chain::Ethereum, "uniswap-v2"),
2527 true,
2528 ComponentFilter::with_tvl_range(remove_tvl_threshold, add_tvl_threshold),
2529 1,
2530 Duration::from_secs(0),
2531 true,
2532 true,
2533 true,
2534 ArcRPCClient(Arc::new(rpc_client)),
2535 ArcDeltasClient(Arc::new(deltas_client)),
2536 10_u64,
2537 );
2538 state_sync
2539 .initialize()
2540 .await
2541 .expect("Init failed");
2542
2543 let deltas = [
2545 BlockAggregatedChanges {
2546 extractor: "uniswap-v2".to_string(),
2547 chain: Chain::Ethereum,
2548 block: Block {
2549 number: 1,
2550 hash: Bytes::from("0x01"),
2551 parent_hash: Bytes::from("0x00"),
2552 chain: Chain::Ethereum,
2553 ts: Default::default(),
2554 },
2555 revert: false,
2556 ..Default::default()
2557 },
2558 BlockAggregatedChanges {
2559 extractor: "uniswap-v2".to_string(),
2560 chain: Chain::Ethereum,
2561 block: Block {
2562 number: 2,
2563 hash: Bytes::from("0x02"),
2564 parent_hash: Bytes::from("0x01"),
2565 chain: Chain::Ethereum,
2566 ts: Default::default(),
2567 },
2568 revert: false,
2569 component_tvl: [
2570 ("Component1".to_string(), 6.0), ("Component2".to_string(), 2.0), ("Component3".to_string(), 10.0), ]
2574 .into_iter()
2575 .collect(),
2576 ..Default::default()
2577 },
2578 BlockAggregatedChanges {
2580 extractor: "uniswap-v2".to_string(),
2581 chain: Chain::Ethereum,
2582 block: Block {
2583 number: 3,
2584 hash: Bytes::from("0x03"),
2585 parent_hash: Bytes::from("0x02"),
2586 chain: Chain::Ethereum,
2587 ts: Default::default(),
2588 },
2589 revert: false,
2590 ..Default::default()
2591 },
2592 ];
2593
2594 let (handle, mut rx) = state_sync.start().await;
2595 let (jh, close_tx) = handle.split();
2596
2597 tx.send(deltas[0].clone())
2599 .await
2600 .expect("deltas channel msg 0 closed!");
2601
2602 let _ = timeout(Duration::from_millis(200), rx.recv())
2604 .await
2605 .expect("waiting for first state msg timed out!")
2606 .expect("state sync block sender closed!");
2607
2608 tx.send(deltas[1].clone())
2611 .await
2612 .expect("deltas channel msg 1 closed!");
2613 let second_msg = timeout(Duration::from_millis(200), rx.recv())
2614 .await
2615 .expect("waiting for second state msg timed out!")
2616 .expect("state sync block sender closed!")
2617 .expect("no error");
2618
2619 bg_done.notified().await;
2621
2622 tx.send(deltas[2].clone())
2623 .await
2624 .expect("deltas channel msg 2 closed!");
2625 let third_msg = timeout(Duration::from_millis(200), rx.recv())
2626 .await
2627 .expect("waiting for third state msg timed out!")
2628 .expect("state sync block sender closed!")
2629 .expect("no error");
2630
2631 let _ = close_tx.send(());
2632 jh.await
2633 .expect("state sync task panicked!");
2634
2635 let expected_second_msg = StateSyncMessage {
2637 header: BlockHeader {
2638 number: 2,
2639 hash: Bytes::from("0x02"),
2640 parent_hash: Bytes::from("0x01"),
2641 revert: false,
2642 ..Default::default()
2643 },
2644 snapshots: Snapshot::default(),
2645 deltas: Some(BlockAggregatedChanges {
2646 extractor: "uniswap-v2".to_string(),
2647 chain: Chain::Ethereum,
2648 block: Block {
2649 number: 2,
2650 hash: Bytes::from("0x02"),
2651 parent_hash: Bytes::from("0x01"),
2652 chain: Chain::Ethereum,
2653 ts: Default::default(),
2654 },
2655 revert: false,
2656 component_tvl: [
2657 ("Component1".to_string(), 6.0), ]
2661 .into_iter()
2662 .collect(),
2663 ..Default::default()
2664 }),
2665 removed_components: [(
2666 "Component2".to_string(),
2667 ProtocolComponent { id: "Component2".to_string(), ..Default::default() },
2668 )]
2669 .into_iter()
2670 .collect(),
2671 };
2672
2673 let expected_third_msg = StateSyncMessage {
2675 header: BlockHeader {
2676 number: 3,
2677 hash: Bytes::from("0x03"),
2678 parent_hash: Bytes::from("0x02"),
2679 revert: false,
2680 ..Default::default()
2681 },
2682 snapshots: Snapshot {
2683 states: [(
2684 "Component3".to_string(),
2685 ComponentWithState {
2686 state: ProtocolComponentState::new(
2687 "Component3",
2688 Default::default(),
2689 Default::default(),
2690 ),
2691 component: ProtocolComponent {
2692 id: "Component3".to_string(),
2693 ..Default::default()
2694 },
2695 component_tvl: Some(10.0),
2696 entrypoints: vec![],
2697 },
2698 )]
2699 .into_iter()
2700 .collect(),
2701 vm_storage: HashMap::new(),
2702 },
2703 deltas: Some(deltas[2].clone()),
2704 removed_components: Default::default(),
2705 };
2706
2707 assert_eq!(second_msg, expected_second_msg);
2708 assert_eq!(third_msg, expected_third_msg);
2709 }
2710
2711 #[test_log::test(tokio::test)]
2712 async fn test_public_close_api_functionality() {
2713 let mut rpc_client = make_mock_client();
2720 let mut deltas_client = MockDeltasClient::new();
2721
2722 rpc_client
2724 .expect_get_protocol_components()
2725 .returning(|_| Ok(Page::new(vec![], 0, 0, 0)));
2726
2727 let (_tx, rx) = channel(1);
2729 deltas_client
2730 .expect_subscribe()
2731 .return_once(move |_, _| Ok((Uuid::default(), rx)));
2732
2733 deltas_client
2735 .expect_unsubscribe()
2736 .return_once(|_| Ok(()));
2737
2738 let mut state_sync = ProtocolStateSynchronizer::new(
2739 ExtractorIdentity::new(Chain::Ethereum, "test-protocol"),
2740 true,
2741 ComponentFilter::with_tvl_range(0.0, 0.0),
2742 5, Duration::from_secs(0),
2744 true,
2745 false,
2746 true,
2747 ArcRPCClient(Arc::new(rpc_client)),
2748 ArcDeltasClient(Arc::new(deltas_client)),
2749 10000_u64, );
2751
2752 state_sync
2753 .initialize()
2754 .await
2755 .expect("Init should succeed");
2756
2757 let (handle, _rx) = state_sync.start().await;
2759 let (jh, close_tx) = handle.split();
2760
2761 tokio::time::sleep(Duration::from_millis(100)).await;
2763
2764 close_tx
2766 .send(())
2767 .expect("Should be able to send close signal");
2768 jh.await.expect("Task should not panic");
2770 }
2771
2772 #[test_log::test(tokio::test)]
2773 async fn test_cleanup_runs_when_state_sync_processing_errors() {
2774 let mut rpc_client = make_mock_client();
2779 let mut deltas_client = MockDeltasClient::new();
2780
2781 rpc_client
2783 .expect_get_protocol_components()
2784 .returning(|_| Ok(Page::new(vec![], 0, 0, 0)));
2785
2786 rpc_client
2788 .expect_get_protocol_states()
2789 .returning(|_| {
2790 Err(RPCError::ParseResponse("Test error during snapshot retrieval".to_string()))
2791 });
2792
2793 let (tx, rx) = channel(10);
2795 deltas_client
2796 .expect_subscribe()
2797 .return_once(move |_, _| {
2798 let delta = BlockAggregatedChanges {
2800 extractor: "test".to_string(),
2801 chain: Chain::Ethereum,
2802 block: Block {
2803 hash: Bytes::from("0x0123"),
2804 number: 1,
2805 parent_hash: Bytes::from("0x0000"),
2806 chain: Chain::Ethereum,
2807 ts: chrono::DateTime::from_timestamp(1234567890, 0)
2808 .unwrap()
2809 .naive_utc(),
2810 },
2811 revert: false,
2812 new_protocol_components: [(
2814 "new_component".to_string(),
2815 ProtocolComponent { id: "new_component".to_string(), ..Default::default() },
2816 )]
2817 .into_iter()
2818 .collect(),
2819 component_tvl: [("new_component".to_string(), 100.0)]
2820 .into_iter()
2821 .collect(),
2822 ..Default::default()
2823 };
2824
2825 tokio::spawn(async move {
2826 let _ = tx.send(delta).await;
2827 });
2829
2830 Ok((Uuid::default(), rx))
2831 });
2832
2833 deltas_client
2835 .expect_unsubscribe()
2836 .return_once(|_| Ok(()));
2837
2838 let mut state_sync = ProtocolStateSynchronizer::new(
2839 ExtractorIdentity::new(Chain::Ethereum, "test-protocol"),
2840 true,
2841 ComponentFilter::with_tvl_range(0.0, 1000.0), 1,
2843 Duration::from_secs(0),
2844 true,
2845 false,
2846 true,
2847 ArcRPCClient(Arc::new(rpc_client)),
2848 ArcDeltasClient(Arc::new(deltas_client)),
2849 5000_u64,
2850 );
2851
2852 state_sync
2853 .initialize()
2854 .await
2855 .expect("Init should succeed");
2856
2857 state_sync.last_synced_block = Some(BlockHeader {
2859 hash: Bytes::from("0x0badc0ffee"),
2860 number: 42,
2861 parent_hash: Bytes::from("0xbadbeef0"),
2862 revert: false,
2863 timestamp: 123456789,
2864 partial_block_index: None,
2865 });
2866
2867 let (mut block_tx, _block_rx) = channel(10);
2869
2870 let (_end_tx, end_rx) = oneshot::channel::<()>();
2872 let result = state_sync
2873 .state_sync(&mut block_tx, end_rx)
2874 .await;
2875 assert!(result.is_err(), "state_sync should have errored during processing");
2877
2878 }
2881
2882 #[test_log::test(tokio::test)]
2883 async fn test_close_signal_while_waiting_for_first_deltas() {
2884 let mut rpc_client = make_mock_client();
2888 let mut deltas_client = MockDeltasClient::new();
2889
2890 rpc_client
2891 .expect_get_protocol_components()
2892 .returning(|_| Ok(Page::new(vec![], 0, 0, 0)));
2893
2894 let (_tx, rx) = channel(1);
2895 deltas_client
2896 .expect_subscribe()
2897 .return_once(move |_, _| Ok((Uuid::default(), rx)));
2898
2899 deltas_client
2900 .expect_unsubscribe()
2901 .return_once(|_| Ok(()));
2902
2903 let mut state_sync = ProtocolStateSynchronizer::new(
2904 ExtractorIdentity::new(Chain::Ethereum, "test-protocol"),
2905 true,
2906 ComponentFilter::with_tvl_range(0.0, 0.0),
2907 1,
2908 Duration::from_secs(0),
2909 true,
2910 true,
2911 false,
2912 ArcRPCClient(Arc::new(rpc_client)),
2913 ArcDeltasClient(Arc::new(deltas_client)),
2914 10000_u64,
2915 );
2916
2917 state_sync
2918 .initialize()
2919 .await
2920 .expect("Init should succeed");
2921
2922 let (mut block_tx, _block_rx) = channel(10);
2923 let (end_tx, end_rx) = oneshot::channel::<()>();
2924
2925 let state_sync_handle = tokio::spawn(async move {
2927 state_sync
2928 .state_sync(&mut block_tx, end_rx)
2929 .await
2930 });
2931
2932 tokio::time::sleep(Duration::from_millis(100)).await;
2934
2935 let _ = end_tx.send(());
2937
2938 let result = state_sync_handle
2940 .await
2941 .expect("Task should not panic");
2942 assert!(result.is_ok(), "state_sync should exit cleanly when closed: {result:?}");
2943
2944 println!("SUCCESS: Close signal handled correctly while waiting for first deltas");
2945 }
2946
2947 #[test_log::test(tokio::test)]
2948 async fn test_close_signal_during_main_processing_loop() {
2949 let mut rpc_client = make_mock_client();
2955 let mut deltas_client = MockDeltasClient::new();
2956
2957 rpc_client
2959 .expect_get_protocol_components()
2960 .returning(|_| Ok(Page::new(vec![], 0, 0, 0)));
2961
2962 rpc_client
2964 .expect_get_protocol_states()
2965 .returning(|_| Ok(Page::new(vec![], 0, 0, 0)));
2966
2967 rpc_client
2968 .expect_get_component_tvl()
2969 .returning(|_| Ok(Page::new(HashMap::new(), 0, 0, 0)));
2970
2971 rpc_client
2972 .expect_get_traced_entry_points()
2973 .returning(|_| Ok(Page::new(HashMap::new(), 0, 0, 0)));
2974
2975 let (tx, rx) = channel(10);
2977 deltas_client
2978 .expect_subscribe()
2979 .return_once(move |_, _| {
2980 let first_delta = BlockAggregatedChanges {
2982 extractor: "test".to_string(),
2983 chain: Chain::Ethereum,
2984 block: Block {
2985 hash: Bytes::from("0x0123"),
2986 number: 1,
2987 parent_hash: Bytes::from("0x0000"),
2988 chain: Chain::Ethereum,
2989 ts: chrono::DateTime::from_timestamp(1234567890, 0)
2990 .unwrap()
2991 .naive_utc(),
2992 },
2993 revert: false,
2994 ..Default::default()
2995 };
2996
2997 tokio::spawn(async move {
2998 let _ = tx.send(first_delta).await;
2999 tokio::time::sleep(Duration::from_secs(30)).await;
3002 });
3003
3004 Ok((Uuid::default(), rx))
3005 });
3006
3007 deltas_client
3008 .expect_unsubscribe()
3009 .return_once(|_| Ok(()));
3010
3011 let mut state_sync = ProtocolStateSynchronizer::new(
3012 ExtractorIdentity::new(Chain::Ethereum, "test-protocol"),
3013 true,
3014 ComponentFilter::with_tvl_range(0.0, 1000.0),
3015 1,
3016 Duration::from_secs(0),
3017 true,
3018 false,
3019 true,
3020 ArcRPCClient(Arc::new(rpc_client)),
3021 ArcDeltasClient(Arc::new(deltas_client)),
3022 10000_u64,
3023 );
3024
3025 state_sync
3026 .initialize()
3027 .await
3028 .expect("Init should succeed");
3029
3030 let (mut block_tx, mut block_rx) = channel(10);
3031 let (end_tx, end_rx) = oneshot::channel::<()>();
3032
3033 let state_sync_handle = tokio::spawn(async move {
3035 state_sync
3036 .state_sync(&mut block_tx, end_rx)
3037 .await
3038 });
3039
3040 let first_snapshot = block_rx
3042 .recv()
3043 .await
3044 .expect("Should receive first snapshot")
3045 .expect("Synchronizer error");
3046 assert!(
3047 !first_snapshot
3048 .snapshots
3049 .states
3050 .is_empty() ||
3051 first_snapshot.deltas.is_some()
3052 );
3053 let _ = end_tx.send(());
3055
3056 let result = state_sync_handle
3058 .await
3059 .expect("Task should not panic");
3060 assert!(
3061 result.is_ok(),
3062 "state_sync should exit cleanly when closed after first message: {result:?}"
3063 );
3064 }
3065
3066 #[test_log::test(tokio::test)]
3067 async fn test_max_retries_exceeded_error_propagation() {
3068 let mut rpc_client = make_mock_client();
3072 let mut deltas_client = MockDeltasClient::new();
3073
3074 rpc_client
3076 .expect_get_protocol_components()
3077 .returning(|_| Ok(Page::new(vec![], 0, 0, 0)));
3078
3079 deltas_client
3082 .expect_subscribe()
3083 .returning(|_, _| {
3084 Err(DeltasError::NotConnected)
3086 });
3087
3088 deltas_client
3090 .expect_unsubscribe()
3091 .returning(|_| Ok(()))
3092 .times(0..=5);
3093
3094 let mut state_sync = ProtocolStateSynchronizer::new(
3096 ExtractorIdentity::new(Chain::Ethereum, "test-protocol"),
3097 true,
3098 ComponentFilter::with_tvl_range(0.0, 1000.0),
3099 2, Duration::from_millis(10), true,
3102 false,
3103 true,
3104 ArcRPCClient(Arc::new(rpc_client)),
3105 ArcDeltasClient(Arc::new(deltas_client)),
3106 1000_u64,
3107 );
3108
3109 state_sync
3110 .initialize()
3111 .await
3112 .expect("Init should succeed");
3113
3114 let (handle, mut rx) = state_sync.start().await;
3116 let (jh, _close_tx) = handle.split();
3117
3118 let res = tokio::time::timeout(Duration::from_millis(100), rx.recv())
3119 .await
3120 .expect("responsds in time")
3121 .expect("channel open");
3122
3123 if let Err(err) = res {
3125 assert!(
3126 matches!(err, SynchronizerError::ConnectionClosed),
3127 "Expected ConnectionClosed error, got: {:?}",
3128 err
3129 );
3130 } else {
3131 panic!("Expected an error")
3132 }
3133
3134 let task_result = tokio::time::timeout(Duration::from_secs(2), jh).await;
3136 assert!(task_result.is_ok(), "Synchronizer task should complete after max retries");
3137 }
3138
3139 #[test_log::test(tokio::test)]
3140 async fn test_is_next_expected() {
3141 let mut state_sync = with_mocked_clients(true, false, None, None);
3145
3146 let incoming_header = BlockHeader {
3148 number: 100,
3149 hash: Bytes::from("0x0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"),
3150 parent_hash: Bytes::from(
3151 "0x0000000000000000000000000000000000000000000000000000000000000000",
3152 ),
3153 revert: false,
3154 timestamp: 123456789,
3155 partial_block_index: None,
3156 };
3157 assert!(
3158 !state_sync.is_next_expected(&incoming_header),
3159 "Should return false when no previous block is set"
3160 );
3161
3162 let previous_header = BlockHeader {
3164 number: 99,
3165 hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000000"),
3166 parent_hash: Bytes::from(
3167 "0xabcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789",
3168 ),
3169 revert: false,
3170 timestamp: 123456788,
3171 partial_block_index: None,
3172 };
3173 state_sync.last_synced_block = Some(previous_header.clone());
3174
3175 assert!(
3176 state_sync.is_next_expected(&incoming_header),
3177 "Should return true when incoming parent_hash matches previous hash"
3178 );
3179
3180 let non_matching_header = BlockHeader {
3182 number: 100,
3183 hash: Bytes::from("0x0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"),
3184 parent_hash: Bytes::from(
3185 "0x1111111111111111111111111111111111111111111111111111111111111111",
3186 ), revert: false,
3188 timestamp: 123456789,
3189 partial_block_index: None,
3190 };
3191 assert!(
3192 !state_sync.is_next_expected(&non_matching_header),
3193 "Should return false when incoming parent_hash doesn't match previous hash"
3194 );
3195 }
3196
3197 #[test_log::test(tokio::test)]
3198 async fn test_synchronizer_restart_skip_snapshot_on_expected_block() {
3199 let mut rpc_client = make_mock_client();
3203 let mut deltas_client = MockDeltasClient::new();
3204
3205 rpc_client
3207 .expect_get_protocol_components()
3208 .returning(|_| {
3209 Ok(Page::new(
3210 vec![ProtocolComponent { id: "Component1".to_string(), ..Default::default() }],
3211 1,
3212 0,
3213 100,
3214 ))
3215 });
3216
3217 let (tx, rx) = channel(10);
3219 deltas_client
3220 .expect_subscribe()
3221 .return_once(move |_, _| {
3222 let expected_next_delta = BlockAggregatedChanges {
3223 extractor: "uniswap-v2".to_string(),
3224 chain: Chain::Ethereum,
3225 block: Block {
3226 hash: Bytes::from(
3227 "0x0000000000000000000000000000000000000000000000000000000000000002",
3228 ), number: 2,
3230 parent_hash: Bytes::from(
3231 "0x0000000000000000000000000000000000000000000000000000000000000001",
3232 ), chain: Chain::Ethereum,
3234 ts: chrono::DateTime::from_timestamp(1234567890, 0)
3235 .unwrap()
3236 .naive_utc(),
3237 },
3238 revert: false,
3239 ..Default::default()
3240 };
3241
3242 tokio::spawn(async move {
3243 let _ = tx.send(expected_next_delta).await;
3244 });
3245
3246 Ok((Uuid::default(), rx))
3247 });
3248
3249 deltas_client
3250 .expect_unsubscribe()
3251 .return_once(|_| Ok(()));
3252
3253 let mut state_sync = ProtocolStateSynchronizer::new(
3254 ExtractorIdentity::new(Chain::Ethereum, "uniswap-v2"),
3255 true,
3256 ComponentFilter::with_tvl_range(0.0, 1000.0),
3257 1,
3258 Duration::from_secs(0),
3259 true, false,
3261 true,
3262 ArcRPCClient(Arc::new(rpc_client)),
3263 ArcDeltasClient(Arc::new(deltas_client)),
3264 10000_u64,
3265 );
3266
3267 state_sync
3269 .initialize()
3270 .await
3271 .expect("Init should succeed");
3272
3273 state_sync.last_synced_block = Some(BlockHeader {
3275 number: 1,
3276 hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000001"), parent_hash: Bytes::from(
3278 "0x0000000000000000000000000000000000000000000000000000000000000000",
3279 ),
3280 revert: false,
3281 timestamp: 123456789,
3282 partial_block_index: None,
3283 });
3284
3285 let (mut block_tx, mut block_rx) = channel(10);
3286 let (end_tx, end_rx) = oneshot::channel::<()>();
3287
3288 let state_sync_handle = tokio::spawn(async move {
3290 state_sync
3291 .state_sync(&mut block_tx, end_rx)
3292 .await
3293 });
3294
3295 let result_msg = timeout(Duration::from_millis(200), block_rx.recv())
3297 .await
3298 .expect("Should receive message within timeout")
3299 .expect("Channel should be open")
3300 .expect("Should not be an error");
3301
3302 let _ = end_tx.send(());
3304
3305 let _ = state_sync_handle
3307 .await
3308 .expect("Task should not panic");
3309
3310 assert!(result_msg.deltas.is_some(), "Should contain deltas");
3313 assert!(
3314 result_msg.snapshots.states.is_empty(),
3315 "Should not contain snapshots when next expected block is received"
3316 );
3317
3318 if let Some(deltas) = &result_msg.deltas {
3320 assert_eq!(deltas.block.number, 2);
3321 assert_eq!(
3322 deltas.block.hash,
3323 Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000002")
3324 );
3325 assert_eq!(
3326 deltas.block.parent_hash,
3327 Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000001")
3328 );
3329 }
3330 }
3331
3332 #[test_log::test(tokio::test)]
3333 async fn test_skip_previously_processed_messages() {
3334 let mut rpc_client = make_mock_client();
3338 let mut deltas_client = MockDeltasClient::new();
3339
3340 rpc_client
3342 .expect_get_protocol_components()
3343 .returning(|_| {
3344 Ok(Page::new(
3345 vec![ProtocolComponent { id: "Component1".to_string(), ..Default::default() }],
3346 1,
3347 0,
3348 100,
3349 ))
3350 });
3351
3352 rpc_client
3354 .expect_get_protocol_states()
3355 .returning(|_| {
3356 Ok(Page::new(
3357 vec![ProtocolComponentState::new(
3358 "Component1",
3359 Default::default(),
3360 Default::default(),
3361 )],
3362 1,
3363 0,
3364 100,
3365 ))
3366 });
3367
3368 rpc_client
3369 .expect_get_component_tvl()
3370 .returning(|_| {
3371 Ok(Page::new(
3372 [("Component1".to_string(), 100.0)]
3373 .into_iter()
3374 .collect(),
3375 1,
3376 0,
3377 100,
3378 ))
3379 });
3380
3381 rpc_client
3382 .expect_get_traced_entry_points()
3383 .returning(|_| Ok(Page::new(HashMap::new(), 0, 0, 0)));
3384
3385 let (tx, rx) = channel(10);
3387 deltas_client
3388 .expect_subscribe()
3389 .return_once(move |_, _| {
3390 let old_messages = vec![
3392 BlockAggregatedChanges {
3393 extractor: "uniswap-v2".to_string(),
3394 chain: Chain::Ethereum,
3395 block: Block {
3396 hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000003"),
3397 number: 3,
3398 parent_hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000002"),
3399 chain: Chain::Ethereum,
3400 ts: chrono::DateTime::from_timestamp(1234567890, 0).unwrap().naive_utc(),
3401 },
3402 revert: false,
3403 ..Default::default()
3404 },
3405 BlockAggregatedChanges {
3406 extractor: "uniswap-v2".to_string(),
3407 chain: Chain::Ethereum,
3408 block: Block {
3409 hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000004"),
3410 number: 4,
3411 parent_hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000003"),
3412 chain: Chain::Ethereum,
3413 ts: chrono::DateTime::from_timestamp(1234567891, 0).unwrap().naive_utc(),
3414 },
3415 revert: false,
3416 ..Default::default()
3417 },
3418 BlockAggregatedChanges {
3419 extractor: "uniswap-v2".to_string(),
3420 chain: Chain::Ethereum,
3421 block: Block {
3422 hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000005"),
3423 number: 5,
3424 parent_hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000004"),
3425 chain: Chain::Ethereum,
3426 ts: chrono::DateTime::from_timestamp(1234567892, 0).unwrap().naive_utc(),
3427 },
3428 revert: false,
3429 ..Default::default()
3430 },
3431 BlockAggregatedChanges {
3433 extractor: "uniswap-v2".to_string(),
3434 chain: Chain::Ethereum,
3435 block: Block {
3436 hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000006"),
3437 number: 6,
3438 parent_hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000005"),
3439 chain: Chain::Ethereum,
3440 ts: chrono::DateTime::from_timestamp(1234567893, 0).unwrap().naive_utc(),
3441 },
3442 revert: false,
3443 ..Default::default()
3444 },
3445 ];
3446
3447 tokio::spawn(async move {
3448 for message in old_messages {
3449 let _ = tx.send(message).await;
3450 tokio::time::sleep(Duration::from_millis(10)).await;
3451 }
3452 });
3453
3454 Ok((Uuid::default(), rx))
3455 });
3456
3457 deltas_client
3458 .expect_unsubscribe()
3459 .return_once(|_| Ok(()));
3460
3461 let mut state_sync = ProtocolStateSynchronizer::new(
3462 ExtractorIdentity::new(Chain::Ethereum, "uniswap-v2"),
3463 true,
3464 ComponentFilter::with_tvl_range(0.0, 1000.0),
3465 1,
3466 Duration::from_secs(0),
3467 true,
3468 true,
3469 true,
3470 ArcRPCClient(Arc::new(rpc_client)),
3471 ArcDeltasClient(Arc::new(deltas_client)),
3472 10000_u64,
3473 );
3474
3475 state_sync
3477 .initialize()
3478 .await
3479 .expect("Init should succeed");
3480
3481 state_sync.last_synced_block = Some(BlockHeader {
3482 number: 5,
3483 hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000005"),
3484 parent_hash: Bytes::from(
3485 "0x0000000000000000000000000000000000000000000000000000000000000004",
3486 ),
3487 revert: false,
3488 timestamp: 1234567892,
3489 partial_block_index: None,
3490 });
3491
3492 let (mut block_tx, mut block_rx) = channel(10);
3493 let (end_tx, end_rx) = oneshot::channel::<()>();
3494
3495 let state_sync_handle = tokio::spawn(async move {
3497 state_sync
3498 .state_sync(&mut block_tx, end_rx)
3499 .await
3500 });
3501
3502 let result_msg = timeout(Duration::from_millis(500), block_rx.recv())
3504 .await
3505 .expect("Should receive message within timeout")
3506 .expect("Channel should be open")
3507 .expect("Should not be an error");
3508
3509 let _ = end_tx.send(());
3511
3512 let _ = state_sync_handle
3514 .await
3515 .expect("Task should not panic");
3516
3517 assert!(result_msg.deltas.is_some(), "Should contain deltas");
3519 if let Some(deltas) = &result_msg.deltas {
3520 assert_eq!(
3521 deltas.block.number, 6,
3522 "Should only process block 6, skipping earlier blocks"
3523 );
3524 assert_eq!(
3525 deltas.block.hash,
3526 Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000006")
3527 );
3528 }
3529
3530 match timeout(Duration::from_millis(50), block_rx.recv()).await {
3533 Err(_) => {
3534 }
3536 Ok(Some(Err(_))) => {
3537 }
3539 Ok(Some(Ok(_))) => {
3540 panic!("Should not receive additional messages - old blocks should be skipped");
3541 }
3542 Ok(None) => {
3543 }
3545 }
3546 }
3547
3548 fn make_block_changes(block_num: u64, partial_idx: Option<u32>) -> BlockAggregatedChanges {
3549 let hash = Bytes::from(vec![block_num as u8; 32]);
3551 let parent_hash = Bytes::from(vec![block_num.saturating_sub(1) as u8; 32]);
3552 BlockAggregatedChanges {
3553 extractor: "uniswap-v2".to_string(),
3554 chain: Chain::Ethereum,
3555 block: Block {
3556 number: block_num,
3557 hash,
3558 parent_hash,
3559 chain: Chain::Ethereum,
3560 ts: Default::default(),
3561 },
3562 revert: false,
3563 partial_block_index: partial_idx,
3564 ..Default::default()
3565 }
3566 }
3567
3568 #[test_log::test(tokio::test)]
3570 async fn test_partial_mode_accepts_full_block_as_first_message() {
3571 let (rpc_client, deltas_client, tx) = mock_clients_for_state_sync(None);
3572 let mut state_sync = with_mocked_clients(true, true, Some(rpc_client), Some(deltas_client))
3573 .with_partial_blocks(true);
3574 state_sync
3575 .initialize()
3576 .await
3577 .expect("Init failed");
3578
3579 let (handle, mut block_rx) = state_sync.start().await;
3580 let (jh, close_tx) = handle.split();
3581
3582 tx.send(make_block_changes(1, None))
3584 .await
3585 .unwrap();
3586
3587 let msg = timeout(Duration::from_millis(100), block_rx.recv())
3589 .await
3590 .expect("Should receive message")
3591 .expect("Channel open")
3592 .expect("No error");
3593
3594 assert_eq!(msg.header.number, 1, "Should use block 1 (full block)");
3595 assert_eq!(msg.header.partial_block_index, None, "Should be a full block");
3596
3597 let _ = close_tx.send(());
3598 jh.await.expect("Task should not panic");
3599 }
3600
3601 #[test_log::test(tokio::test)]
3603 async fn test_partial_mode_detects_block_number_increase() {
3604 let (rpc_client, deltas_client, tx) = mock_clients_for_state_sync(None);
3605 let mut state_sync = with_mocked_clients(true, true, Some(rpc_client), Some(deltas_client))
3606 .with_partial_blocks(true);
3607 state_sync
3608 .initialize()
3609 .await
3610 .expect("Init failed");
3611
3612 let (handle, mut block_rx) = state_sync.start().await;
3613 let (jh, close_tx) = handle.split();
3614
3615 tx.send(make_block_changes(1, Some(0)))
3617 .await
3618 .unwrap();
3619 tx.send(make_block_changes(1, Some(3)))
3620 .await
3621 .unwrap();
3622
3623 match timeout(Duration::from_millis(50), block_rx.recv()).await {
3625 Err(_) => { }
3626 Ok(_) => panic!("Should not receive message while waiting for new block"),
3627 }
3628
3629 tx.send(make_block_changes(2, Some(5)))
3632 .await
3633 .unwrap();
3634
3635 let msg = timeout(Duration::from_millis(100), block_rx.recv())
3637 .await
3638 .expect("Should receive message")
3639 .expect("Channel open")
3640 .expect("No error");
3641
3642 assert_eq!(msg.header.number, 2, "Should use block 2 (block number increased)");
3643 assert_eq!(msg.header.partial_block_index, Some(5));
3644
3645 let _ = close_tx.send(());
3646 jh.await.expect("Task should not panic");
3647 }
3648
3649 #[test_log::test(tokio::test)]
3651 async fn test_partial_mode_skips_already_synced_blocks() {
3652 let (rpc_client, deltas_client, tx) = mock_clients_for_state_sync(None);
3653 let mut state_sync = with_mocked_clients(true, true, Some(rpc_client), Some(deltas_client))
3654 .with_partial_blocks(true);
3655 state_sync
3656 .initialize()
3657 .await
3658 .expect("Init failed");
3659
3660 state_sync.last_synced_block = Some(BlockHeader {
3662 number: 5,
3663 hash: Bytes::from("0x05"),
3664 parent_hash: Bytes::from("0x04"),
3665 revert: false,
3666 timestamp: 0,
3667 partial_block_index: None,
3668 });
3669
3670 let (handle, mut block_rx) = state_sync.start().await;
3671 let (jh, close_tx) = handle.split();
3672
3673 tx.send(make_block_changes(3, Some(2)))
3675 .await
3676 .unwrap();
3677
3678 tx.send(make_block_changes(4, Some(0)))
3680 .await
3681 .unwrap();
3682
3683 match timeout(Duration::from_millis(50), block_rx.recv()).await {
3685 Err(_) => { }
3686 Ok(_) => panic!("Should skip block 4 because it's already synced"),
3687 }
3688
3689 tx.send(make_block_changes(5, Some(3)))
3692 .await
3693 .unwrap();
3694 tx.send(make_block_changes(6, Some(0)))
3696 .await
3697 .unwrap();
3698
3699 let msg = timeout(Duration::from_millis(100), block_rx.recv())
3700 .await
3701 .expect("Should receive message")
3702 .expect("Channel open")
3703 .expect("No error");
3704
3705 assert_eq!(msg.header.number, 6, "Should use block 6 (after last synced)");
3706
3707 let _ = close_tx.send(());
3708 jh.await.expect("Task should not panic");
3709 }
3710
3711 #[test_log::test(tokio::test)]
3712 async fn test_get_snapshots_skips_entrypoints_when_not_dci() {
3713 let header = BlockHeader::default();
3714 let mut rpc = make_mock_client();
3715 let component = ProtocolComponent { id: "Component1".to_string(), ..Default::default() };
3716
3717 let component_clone = component.clone();
3718 rpc.expect_get_snapshots()
3719 .returning(move |_request, _chunk_size, _concurrency| {
3720 Ok(Snapshot {
3721 states: [(
3722 "Component1".to_string(),
3723 ComponentWithState {
3724 state: ProtocolComponentState::new(
3725 "Component1",
3726 Default::default(),
3727 Default::default(),
3728 ),
3729 component: component_clone.clone(),
3730 entrypoints: vec![],
3731 component_tvl: None,
3732 },
3733 )]
3734 .into_iter()
3735 .collect(),
3736 vm_storage: HashMap::new(),
3737 })
3738 });
3739
3740 rpc.expect_get_traced_entry_points()
3742 .never();
3743
3744 let mut state_sync = with_mocked_clients(true, false, Some(rpc), None);
3745 state_sync
3747 .component_tracker
3748 .components
3749 .insert("Component1".to_string(), component);
3750
3751 let components_arg = ["Component1".to_string()];
3752 let req_ids: Vec<String> = components_arg.to_vec();
3753 let components: HashMap<_, _> = state_sync
3754 .component_tracker
3755 .components
3756 .iter()
3757 .filter(|(id, _)| req_ids.contains(id))
3758 .map(|(k, v)| (k.clone(), v.clone()))
3759 .collect();
3760 let contract_ids: HashSet<Bytes> = state_sync
3761 .component_tracker
3762 .get_contracts_by_component(&req_ids)
3763 .into_iter()
3764 .collect();
3765 let params = FetchSnapshotParams {
3766 chain: Chain::Ethereum,
3767 protocol_system: "uniswap-v2".to_string(),
3768 block_number: header.number,
3769 uses_dci: false,
3770 retrieve_balances: true,
3771 include_tvl: false,
3772 };
3773 let (snapshot, _, _) =
3774 fetch_snapshot(&state_sync.rpc_client, components, contract_ids, ¶ms)
3775 .await
3776 .expect("Retrieving snapshot failed");
3777
3778 assert!(snapshot
3779 .states
3780 .contains_key("Component1"));
3781 }
3782
3783 #[test_log::test(tokio::test)]
3784 async fn test_get_snapshots_fetches_entrypoints_when_dci() {
3785 let header = BlockHeader::default();
3786 let mut rpc = make_mock_client();
3787 let component = ProtocolComponent { id: "Component1".to_string(), ..Default::default() };
3788
3789 let component_clone = component.clone();
3790 rpc.expect_get_snapshots()
3791 .returning(move |_request, _chunk_size, _concurrency| {
3792 Ok(Snapshot {
3793 states: [(
3794 "Component1".to_string(),
3795 ComponentWithState {
3796 state: ProtocolComponentState::new(
3797 "Component1",
3798 Default::default(),
3799 Default::default(),
3800 ),
3801 component: component_clone.clone(),
3802 entrypoints: vec![],
3803 component_tvl: None,
3804 },
3805 )]
3806 .into_iter()
3807 .collect(),
3808 vm_storage: HashMap::new(),
3809 })
3810 });
3811
3812 rpc.expect_get_traced_entry_points()
3814 .times(1)
3815 .returning(|_| Ok(Page::new(HashMap::new(), 0, 0, 0)));
3816
3817 let mut state_sync = with_mocked_clients(true, false, Some(rpc), None).with_dci(true);
3818 state_sync
3819 .component_tracker
3820 .components
3821 .insert("Component1".to_string(), component);
3822
3823 let components_arg = ["Component1".to_string()];
3824 let req_ids: Vec<String> = components_arg.to_vec();
3825 let components: HashMap<_, _> = state_sync
3826 .component_tracker
3827 .components
3828 .iter()
3829 .filter(|(id, _)| req_ids.contains(id))
3830 .map(|(k, v)| (k.clone(), v.clone()))
3831 .collect();
3832 let contract_ids: HashSet<Bytes> = state_sync
3833 .component_tracker
3834 .get_contracts_by_component(&req_ids)
3835 .into_iter()
3836 .collect();
3837 let params = FetchSnapshotParams {
3838 chain: Chain::Ethereum,
3839 protocol_system: "uniswap-v2".to_string(),
3840 block_number: header.number,
3841 uses_dci: true,
3842 retrieve_balances: true,
3843 include_tvl: false,
3844 };
3845 let (snapshot, _, _) =
3846 fetch_snapshot(&state_sync.rpc_client, components, contract_ids, ¶ms)
3847 .await
3848 .expect("Retrieving snapshot failed");
3849
3850 assert!(snapshot
3851 .states
3852 .contains_key("Component1"));
3853 }
3854
3855 #[test_log::test(tokio::test)]
3866 async fn test_partial_mode_defers_brand_new_component_snapshot_to_next_block() {
3867 use std::time::Duration;
3868
3869 use tokio::{sync::mpsc::channel, time::timeout};
3870
3871 let bg_done = Arc::new(tokio::sync::Notify::new());
3872 let mut rpc_client = make_mock_client();
3873 rpc_client
3875 .expect_get_protocol_components()
3876 .withf(|params: &crate::rpc::ProtocolComponentsParams| {
3877 params
3878 .component_ids()
3879 .is_some_and(|ids| ids.contains(&"BrandNew".to_string()))
3880 })
3881 .returning(|_| {
3882 Ok(Page::new(
3883 vec![
3884 ProtocolComponent { id: "BrandNew".to_string(), ..Default::default() },
3885 ProtocolComponent { id: "Preexisting".to_string(), ..Default::default() },
3886 ],
3887 2,
3888 0,
3889 100,
3890 ))
3891 });
3892 rpc_client
3894 .expect_get_protocol_components()
3895 .returning(|_| {
3896 Ok(Page::new(
3897 vec![
3898 ProtocolComponent { id: "Component1".to_string(), ..Default::default() },
3899 ProtocolComponent { id: "Component2".to_string(), ..Default::default() },
3900 ],
3901 2,
3902 0,
3903 100,
3904 ))
3905 });
3906 let bg_done_clone = bg_done.clone();
3908 rpc_client
3909 .expect_get_snapshots()
3910 .withf(
3911 |request: &SnapshotParameters,
3912 _chunk_size: &Option<usize>,
3913 _concurrency: &usize| {
3914 request.block_number == 2 &&
3915 (request
3916 .components
3917 .contains_key("BrandNew") ||
3918 request
3919 .components
3920 .contains_key("Preexisting"))
3921 },
3922 )
3923 .returning(move |_request, _chunk_size, _concurrency| {
3924 let snap = Ok(Snapshot {
3925 states: [
3926 (
3927 "BrandNew".to_string(),
3928 ComponentWithState {
3929 state: ProtocolComponentState::new(
3930 "BrandNew",
3931 Default::default(),
3932 Default::default(),
3933 ),
3934 component: ProtocolComponent {
3935 id: "BrandNew".to_string(),
3936 ..Default::default()
3937 },
3938 component_tvl: Some(100.0),
3939 entrypoints: vec![],
3940 },
3941 ),
3942 (
3943 "Preexisting".to_string(),
3944 ComponentWithState {
3945 state: ProtocolComponentState::new(
3946 "Preexisting",
3947 Default::default(),
3948 Default::default(),
3949 ),
3950 component: ProtocolComponent {
3951 id: "Preexisting".to_string(),
3952 ..Default::default()
3953 },
3954 component_tvl: Some(75.0),
3955 entrypoints: vec![],
3956 },
3957 ),
3958 ]
3959 .into_iter()
3960 .collect(),
3961 vm_storage: HashMap::new(),
3962 });
3963 bg_done_clone.notify_one();
3964 snap
3965 });
3966 rpc_client
3968 .expect_get_snapshots()
3969 .returning(|_request, _chunk_size, _concurrency| {
3970 Ok(Snapshot {
3971 states: [
3972 (
3973 "Component1".to_string(),
3974 ComponentWithState {
3975 state: ProtocolComponentState::new(
3976 "Component1",
3977 Default::default(),
3978 Default::default(),
3979 ),
3980 component: ProtocolComponent {
3981 id: "Component1".to_string(),
3982 ..Default::default()
3983 },
3984 component_tvl: Some(100.0),
3985 entrypoints: vec![],
3986 },
3987 ),
3988 (
3989 "Component2".to_string(),
3990 ComponentWithState {
3991 state: ProtocolComponentState::new(
3992 "Component2",
3993 Default::default(),
3994 Default::default(),
3995 ),
3996 component: ProtocolComponent {
3997 id: "Component2".to_string(),
3998 ..Default::default()
3999 },
4000 component_tvl: Some(0.0),
4001 entrypoints: vec![],
4002 },
4003 ),
4004 ]
4005 .into_iter()
4006 .collect(),
4007 vm_storage: HashMap::new(),
4008 })
4009 });
4010 rpc_client
4011 .expect_get_traced_entry_points()
4012 .returning(|_| Ok(Page::new(HashMap::new(), 0, 0, 0)));
4013
4014 let mut deltas_client = MockDeltasClient::new();
4015 let (tx, rx) = channel(4);
4016 deltas_client
4017 .expect_subscribe()
4018 .return_once(move |_, _| Ok((Uuid::default(), rx)));
4019 deltas_client
4020 .expect_unsubscribe()
4021 .return_once(|_| Ok(()));
4022
4023 let mut state_sync = with_mocked_clients(true, true, Some(rpc_client), Some(deltas_client))
4024 .with_partial_blocks(true);
4025 state_sync
4026 .initialize()
4027 .await
4028 .expect("Init failed");
4029
4030 let (handle, mut block_rx) = state_sync.start().await;
4031 let (jh, close_tx) = handle.split();
4032
4033 tx.send(make_block_changes(1, None))
4035 .await
4036 .unwrap();
4037 let _msg1 = timeout(Duration::from_millis(200), block_rx.recv())
4038 .await
4039 .expect("Should receive initial + block 1")
4040 .expect("Channel open")
4041 .expect("No error");
4042
4043 let mut block2 = make_block_changes(2, Some(2));
4046 block2.new_protocol_components = HashMap::from([(
4047 "BrandNew".to_string(),
4048 ProtocolComponent { id: "BrandNew".to_string(), ..Default::default() },
4049 )]);
4050 block2.component_tvl =
4051 HashMap::from([("BrandNew".to_string(), 100.0), ("Preexisting".to_string(), 75.0)]);
4052 tx.send(block2).await.unwrap();
4053 let msg2 = timeout(Duration::from_millis(200), block_rx.recv())
4054 .await
4055 .expect("Should receive block 2")
4056 .expect("Channel open")
4057 .expect("No error");
4058
4059 assert!(
4060 !msg2
4061 .snapshots
4062 .states
4063 .contains_key("Preexisting"),
4064 "Preexisting should still be deferred in block 2, not yet snapshotted; got: {:?}",
4065 msg2.snapshots
4066 .states
4067 .keys()
4068 .collect::<Vec<_>>()
4069 );
4070 assert!(
4071 !msg2
4072 .snapshots
4073 .states
4074 .contains_key("BrandNew"),
4075 "BrandNew should still be deferred in block 2, not yet snapshotted"
4076 );
4077
4078 tx.send(make_block_changes(3, Some(1)))
4081 .await
4082 .unwrap();
4083 let msg3 = timeout(Duration::from_millis(200), block_rx.recv())
4084 .await
4085 .expect("Should receive block 3")
4086 .expect("Channel open")
4087 .expect("No error");
4088
4089 assert_eq!(msg3.header.number, 3);
4090 assert_eq!(msg3.header.partial_block_index, Some(1));
4091 assert!(
4092 !msg3
4093 .snapshots
4094 .states
4095 .contains_key("BrandNew"),
4096 "BrandNew task just fired; snapshot not yet available in msg3"
4097 );
4098 assert!(
4099 !msg3
4100 .snapshots
4101 .states
4102 .contains_key("Preexisting"),
4103 "Preexisting task just fired; snapshot not yet available in msg3"
4104 );
4105
4106 bg_done.notified().await;
4108
4109 tx.send(make_block_changes(4, Some(0)))
4111 .await
4112 .unwrap();
4113 let msg4 = timeout(Duration::from_millis(200), block_rx.recv())
4114 .await
4115 .expect("Should receive block 4")
4116 .expect("Channel open")
4117 .expect("No error");
4118
4119 assert_eq!(msg4.header.number, 4);
4120 assert_eq!(msg4.header.partial_block_index, Some(0));
4121 assert!(
4122 msg4.snapshots
4123 .states
4124 .contains_key("BrandNew"),
4125 "BrandNew snapshot should be in msg4 after background task drains; got keys: {:?}",
4126 msg4.snapshots
4127 .states
4128 .keys()
4129 .collect::<Vec<_>>()
4130 );
4131 assert!(
4132 msg4.snapshots
4133 .states
4134 .contains_key("Preexisting"),
4135 "Preexisting snapshot should be in msg4 after background task drains; got keys: {:?}",
4136 msg4.snapshots
4137 .states
4138 .keys()
4139 .collect::<Vec<_>>()
4140 );
4141
4142 let _ = close_tx.send(());
4143 jh.await.expect("Task should not panic");
4144 }
4145
4146 #[test]
4150 fn test_apply_deltas_to_snapshot() {
4151 use tycho_common::models::{
4152 contract::{Account, AccountDelta},
4153 protocol::{ComponentBalance, ProtocolComponentStateDelta},
4154 ChangeType,
4155 };
4156
4157 let contract_addr = Bytes::from("0xc0ffee");
4158 let token_addr = Bytes::from("0xdeadbeef");
4159
4160 let mut snapshot = Snapshot {
4163 states: [(
4164 "comp1".to_string(),
4165 ComponentWithState {
4166 state: ProtocolComponentState::new(
4167 "comp1",
4168 [
4169 ("keep".to_string(), Bytes::from("0x01")),
4170 ("delete_me".to_string(), Bytes::from("0x02")),
4171 ]
4172 .into_iter()
4173 .collect(),
4174 [(token_addr.clone(), Bytes::from("0x64"))]
4175 .into_iter()
4176 .collect(),
4177 ),
4178 component: ProtocolComponent::default(),
4179 component_tvl: None,
4180 entrypoints: vec![],
4181 },
4182 )]
4183 .into_iter()
4184 .collect(),
4185 vm_storage: [(
4186 contract_addr.clone(),
4187 Account {
4188 chain: Chain::Ethereum,
4189 address: contract_addr.clone(),
4190 title: String::new(),
4191 slots: [(Bytes::from("0x01"), Bytes::from("0xaa"))]
4192 .into_iter()
4193 .collect(),
4194 native_balance: Bytes::from("0x10"),
4195 token_balances: HashMap::new(),
4196 code: Bytes::from("0x0a0b"),
4197 code_hash: Default::default(),
4198 balance_modify_tx: Default::default(),
4199 code_modify_tx: Default::default(),
4200 creation_tx: None,
4201 },
4202 )]
4203 .into_iter()
4204 .collect(),
4205 };
4206
4207 let skipped_delta = BlockAggregatedChanges {
4210 block: Block { number: 5, ..Default::default() },
4211 state_deltas: [(
4212 "comp1".to_string(),
4213 ProtocolComponentStateDelta::new(
4214 "comp1",
4215 [("keep".to_string(), Bytes::from("0xff"))]
4216 .into_iter()
4217 .collect(),
4218 HashSet::new(),
4219 ),
4220 )]
4221 .into_iter()
4222 .collect(),
4223 ..Default::default()
4224 };
4225 let applied_delta = BlockAggregatedChanges {
4226 block: Block { number: 6, ..Default::default() },
4227 state_deltas: [(
4228 "comp1".to_string(),
4229 ProtocolComponentStateDelta::new(
4230 "comp1",
4231 [("keep".to_string(), Bytes::from("0x99"))]
4232 .into_iter()
4233 .collect(),
4234 ["delete_me".to_string()]
4235 .into_iter()
4236 .collect(),
4237 ),
4238 )]
4239 .into_iter()
4240 .collect(),
4241 component_balances: [(
4242 "comp1".to_string(),
4243 [(
4244 token_addr.clone(),
4245 ComponentBalance::new(
4246 token_addr.clone(),
4247 Bytes::from("0xc8"),
4248 200.0,
4249 Default::default(),
4250 "comp1",
4251 ),
4252 )]
4253 .into_iter()
4254 .collect(),
4255 )]
4256 .into_iter()
4257 .collect(),
4258 account_deltas: [(
4259 contract_addr.clone(),
4260 AccountDelta::new(
4261 Chain::Ethereum,
4262 contract_addr.clone(),
4263 [
4264 (Bytes::from("0x01"), Some(Bytes::from("0xbb"))),
4265 (Bytes::from("0x02"), Some(Bytes::from("0xcc"))),
4266 ]
4267 .into_iter()
4268 .collect(),
4269 Some(Bytes::from("0x20")),
4270 Some(Bytes::from("0x0c0d")),
4271 ChangeType::Update,
4272 ),
4273 )]
4274 .into_iter()
4275 .collect(),
4276 ..Default::default()
4277 };
4278
4279 let mut sync = with_mocked_clients(true, false, None, None);
4280 sync.buffered_deltas = vec![skipped_delta, applied_delta];
4281
4282 let contract_ids: HashSet<Bytes> = [contract_addr.clone()]
4283 .into_iter()
4284 .collect();
4285 sync.apply_deltas_to_snapshot(&mut snapshot, 5, &contract_ids);
4286
4287 let comp = &snapshot.states["comp1"].state;
4288
4289 assert_eq!(comp.attributes["keep"], Bytes::from("0x99"));
4291 assert!(!comp
4293 .attributes
4294 .contains_key("delete_me"));
4295 assert_eq!(comp.balances[&token_addr], Bytes::from("0xc8"));
4297
4298 let account = &snapshot.vm_storage[&contract_addr];
4299 assert_eq!(account.slots[&Bytes::from("0x01")], Bytes::from("0xbb"));
4301 assert_eq!(account.slots[&Bytes::from("0x02")], Bytes::from("0xcc"));
4302 assert_eq!(account.native_balance, Bytes::from("0x20"));
4304 assert_eq!(account.code, Bytes::from("0x0c0d"));
4306 }
4307}