Skip to main content

tycho_client/feed/
synchronizer.rs

1use std::{
2    collections::{HashMap, HashSet},
3    time::Duration,
4};
5
6use async_trait::async_trait;
7use thiserror::Error;
8use tokio::{
9    select,
10    sync::{
11        mpsc::{channel, error::SendError, Receiver, Sender},
12        oneshot,
13    },
14    task::JoinHandle,
15    time::{sleep, timeout},
16};
17use tracing::{debug, error, info, instrument, warn};
18use tycho_common::{
19    models::{
20        blockchain::{
21            BlockAggregatedChanges, DCIUpdate, EntryPointWithTracingParams, TracingResult,
22        },
23        contract::Account,
24        protocol::{ProtocolComponent, ProtocolComponentState},
25        Chain, ExtractorIdentity,
26    },
27    Bytes,
28};
29
30use crate::{
31    deltas::{DeltasClient, SubscriptionOptions},
32    feed::{
33        component_tracker::{ComponentFilter, ComponentTracker},
34        BlockHeader, HeaderLike,
35    },
36    rpc::{
37        RPCClient, RPCError, SnapshotParameters, TracedEntryPointsPaginatedParams,
38        RPC_CLIENT_CONCURRENCY,
39    },
40    DeltasError,
41};
42
43#[derive(Error, Debug)]
44pub enum SynchronizerError {
45    /// RPC client failures.
46    #[error("RPC error: {0}")]
47    RPCError(#[from] RPCError),
48
49    /// Issues with the main channel
50    #[error("{0}")]
51    ChannelError(String),
52
53    /// Timeout elapsed errors.
54    #[error("Timeout error: {0}")]
55    Timeout(String),
56
57    /// Failed to close the synchronizer.
58    #[error("Failed to close synchronizer: {0}")]
59    CloseError(String),
60
61    /// Server connection failures or interruptions.
62    #[error("Connection error: {0}")]
63    ConnectionError(String),
64
65    /// Connection closed
66    #[error("Connection closed")]
67    ConnectionClosed,
68
69    /// Internal error that should not happen under normal operation.
70    #[error("Internal error: {0}")]
71    Internal(String),
72}
73
74pub type SyncResult<T> = Result<T, SynchronizerError>;
75
76impl SynchronizerError {
77    /// Returns true if the error is transient and the failing operation can be retried.
78    ///
79    /// Transient: network/HTTP failures, rate limiting, server unavailability. These are
80    /// infrastructure problems that may resolve without any change to the request.
81    ///
82    /// Permanent: malformed data, fatal server errors, invalid requests. Retrying would produce
83    /// the same failure.
84    pub fn is_transient(&self) -> bool {
85        match self {
86            SynchronizerError::RPCError(e) => matches!(
87                e,
88                RPCError::HttpClient(_, _) |
89                    RPCError::RateLimited(_) |
90                    RPCError::ServerUnreachable(_) |
91                    RPCError::StaleBlock(_)
92            ),
93            SynchronizerError::Timeout(_) |
94            SynchronizerError::ConnectionError(_) |
95            SynchronizerError::ConnectionClosed => true,
96            _ => false,
97        }
98    }
99}
100
101impl<T> From<SendError<T>> for SynchronizerError {
102    fn from(err: SendError<T>) -> Self {
103        SynchronizerError::ChannelError(format!("Failed to send message: {err}"))
104    }
105}
106
107impl From<DeltasError> for SynchronizerError {
108    fn from(err: DeltasError) -> Self {
109        match err {
110            DeltasError::NotConnected => SynchronizerError::ConnectionClosed,
111            _ => SynchronizerError::ConnectionError(err.to_string()),
112        }
113    }
114}
115
116pub struct ProtocolStateSynchronizer<R: RPCClient, D: DeltasClient> {
117    extractor_id: ExtractorIdentity,
118    retrieve_balances: bool,
119    rpc_client: R,
120    deltas_client: D,
121    max_retries: u64,
122    retry_cooldown: Duration,
123    include_snapshots: bool,
124    component_tracker: ComponentTracker<R>,
125    last_synced_block: Option<BlockHeader>,
126    timeout: u64,
127    include_tvl: bool,
128    compression: bool,
129    partial_blocks: bool,
130    uses_dci: bool,
131    /// Background snapshot tasks spawned for new components. Each task may be in-flight or
132    /// finished; completed ones are harvested at the start of each delta iteration and their
133    /// results included in that block's message.
134    snapshot_tasks: Vec<SnapshotTask>,
135    /// Unfiltered deltas buffered while any snapshot task is in-flight, starting from the block
136    /// at which the oldest task was spawned. Applied to each snapshot at drain time to reconstruct
137    /// the component's current state.
138    buffered_deltas: Vec<BlockAggregatedChanges>,
139    /// State machine tracking components awaiting their initial snapshot. A component lives here
140    /// from the moment it's queued until its snapshot is successfully applied (at which point it
141    /// moves into `component_tracker.components`).
142    snapshot_queue: HashMap<String, SnapshotStatus>,
143}
144
145#[derive(Clone, PartialEq, Debug)]
146pub struct ComponentWithState {
147    pub state: ProtocolComponentState,
148    pub component: ProtocolComponent,
149    pub component_tvl: Option<f64>,
150    pub entrypoints: Vec<(EntryPointWithTracingParams, TracingResult)>,
151}
152
153#[derive(Clone, PartialEq, Debug, Default)]
154pub struct Snapshot {
155    pub states: HashMap<String, ComponentWithState>,
156    pub vm_storage: HashMap<Bytes, Account>,
157}
158
159impl Snapshot {
160    fn extend(&mut self, other: Snapshot) {
161        self.states.extend(other.states);
162        self.vm_storage.extend(other.vm_storage);
163    }
164
165    pub fn get_states(&self) -> &HashMap<String, ComponentWithState> {
166        &self.states
167    }
168
169    pub fn get_vm_storage(&self) -> &HashMap<Bytes, Account> {
170        &self.vm_storage
171    }
172}
173
174#[derive(Clone, PartialEq, Debug, Default)]
175pub struct StateSyncMessage<H>
176where
177    H: HeaderLike,
178{
179    /// The block information for this update.
180    pub header: H,
181    /// Snapshot for new components.
182    pub snapshots: Snapshot,
183    /// A single delta contains state updates for all tracked components, as well as additional
184    /// information about the system components e.g. newly added components (even below tvl), tvl
185    /// updates, balance updates.
186    pub deltas: Option<BlockAggregatedChanges>,
187    /// Components that stopped being tracked.
188    pub removed_components: HashMap<String, ProtocolComponent>,
189}
190
191impl<H> StateSyncMessage<H>
192where
193    H: HeaderLike,
194{
195    pub fn merge(mut self, other: Self) -> Self {
196        // be careful with removed and snapshots attributes here, these can be ambiguous.
197        self.removed_components
198            .retain(|k, _| !other.snapshots.states.contains_key(k));
199        self.snapshots
200            .states
201            .retain(|k, _| !other.removed_components.contains_key(k));
202
203        self.snapshots.extend(other.snapshots);
204        let deltas = match (self.deltas, other.deltas) {
205            (Some(l), Some(r)) => Some(l.merge(r)),
206            (None, Some(r)) => Some(r),
207            (Some(l), None) => Some(l),
208            (None, None) => None,
209        };
210        self.removed_components
211            .extend(other.removed_components);
212        Self {
213            header: other.header,
214            snapshots: self.snapshots,
215            deltas,
216            removed_components: self.removed_components,
217        }
218    }
219}
220
/// Lifecycle states of a component's initial snapshot request.
#[derive(Debug, Clone, PartialEq)]
enum SnapshotStatus {
    /// The fetch is postponed until the next block boundary. Only used with partial blocks.
    Deferred,
    /// A background snapshot task was spawned and has not been harvested yet.
    InFlight,
    /// The previous attempt failed transiently; the component is re-queued on the next delta.
    RetryNext,
    /// The fetch failed permanently; the component stays excluded until the synchronizer
    /// restarts.
    Blacklisted,
}
233
234struct SnapshotFetchResult {
235    components: HashMap<String, ProtocolComponent>,
236    contract_ids: HashSet<Bytes>,
237    dci_update: DCIUpdate,
238    snapshot: Snapshot,
239    snapshot_block: u64,
240}
241
242struct SnapshotTask {
243    component_ids: Vec<String>,
244    snapshot_block: u64,
245    receiver: oneshot::Receiver<Result<SnapshotFetchResult, SynchronizerError>>,
246}
247
248/// Handle for controlling a running synchronizer task.
249///
250/// This handle provides methods to gracefully shut down the synchronizer
251/// and await its completion with a timeout.
252pub struct SynchronizerTaskHandle {
253    join_handle: JoinHandle<()>,
254    close_tx: oneshot::Sender<()>,
255}
256
257/// StateSynchronizer
258///
259/// Used to synchronize the state of a single protocol. The synchronizer is responsible for
260/// delivering messages to the client that let him reconstruct subsets of the protocol state.
261///
262/// This involves deciding which components to track according to the clients preferences,
263/// retrieving & emitting snapshots of components which the client has not seen yet and subsequently
264/// delivering delta messages for the components that have changed.
265impl SynchronizerTaskHandle {
266    pub fn new(join_handle: JoinHandle<()>, close_tx: oneshot::Sender<()>) -> Self {
267        Self { join_handle, close_tx }
268    }
269
270    /// Splits the handle into its join handle and close sender.
271    ///
272    /// This allows monitoring the task completion separately from controlling shutdown.
273    /// The join handle can be used with FuturesUnordered for monitoring, while the
274    /// close sender can be used to signal graceful shutdown.
275    pub fn split(self) -> (JoinHandle<()>, oneshot::Sender<()>) {
276        (self.join_handle, self.close_tx)
277    }
278}
279
280#[async_trait]
281pub trait StateSynchronizer: Send + Sync + 'static {
282    async fn initialize(&mut self) -> SyncResult<()>;
283    /// Starts the state synchronization, consuming the synchronizer.
284    /// Returns a handle for controlling the running task and a receiver for messages.
285    async fn start(
286        mut self,
287    ) -> (SynchronizerTaskHandle, Receiver<SyncResult<StateSyncMessage<BlockHeader>>>);
288}
289
290struct FetchSnapshotParams {
291    chain: Chain,
292    protocol_system: String,
293    block_number: u64,
294    uses_dci: bool,
295    retrieve_balances: bool,
296    include_tvl: bool,
297}
298
299/// Fetches a snapshot for given components. If DCI is enabled, also traces entry
300/// points and extends `contract_ids` with any contracts they access.
301///
302/// Returns the snapshot, the DCI update, and the complete set of contract IDs (original +
303/// DCI-discovered).
304async fn fetch_snapshot<R: RPCClient>(
305    rpc_client: &R,
306    components: HashMap<String, ProtocolComponent>,
307    mut contract_ids: HashSet<Bytes>,
308    params: &FetchSnapshotParams,
309) -> Result<(Snapshot, DCIUpdate, HashSet<Bytes>), SynchronizerError> {
310    if components.is_empty() {
311        return Ok((Snapshot::default(), DCIUpdate::default(), contract_ids));
312    }
313
314    let component_ids: Vec<String> = components.keys().cloned().collect();
315
316    let (dci_update, entrypoints_result) = if params.uses_dci {
317        let result = rpc_client
318            .get_traced_entry_points_paginated(TracedEntryPointsPaginatedParams::new(
319                params.chain,
320                &params.protocol_system,
321                component_ids.clone(),
322                RPC_CLIENT_CONCURRENCY,
323            ))
324            .await?;
325        let dci_contracts: HashSet<Bytes> = result
326            .values()
327            .flat_map(|traces| {
328                traces
329                    .iter()
330                    .flat_map(|(_, tr)| tr.accessed_slots.keys().cloned())
331            })
332            .collect();
333        contract_ids.extend(dci_contracts);
334        let eps = result.clone();
335        let dci: DCIUpdate = result.into();
336        (dci, eps)
337    } else {
338        (DCIUpdate::default(), HashMap::new())
339    };
340
341    let contract_ids_vec: Vec<Bytes> = contract_ids.iter().cloned().collect();
342    let request = SnapshotParameters::new(
343        params.chain,
344        &params.protocol_system,
345        &components,
346        &contract_ids_vec,
347        params.block_number,
348    )
349    .entrypoints(&entrypoints_result)
350    .include_balances(params.retrieve_balances)
351    .include_tvl(params.include_tvl);
352
353    let snapshot = rpc_client
354        .get_snapshots(&request, None, RPC_CLIENT_CONCURRENCY)
355        .await?;
356
357    Ok((snapshot, dci_update, contract_ids))
358}
359
360/// Fetches a snapshot for new components not yet in the tracker. Calls `get_protocol_components`
361/// to resolve the components, then delegates to `fetch_snapshot`.
362async fn fetch_snapshot_background<R: RPCClient>(
363    rpc_client: R,
364    component_ids: Vec<String>,
365    params: FetchSnapshotParams,
366) -> Result<SnapshotFetchResult, SynchronizerError> {
367    if component_ids.is_empty() {
368        return Ok(SnapshotFetchResult {
369            components: HashMap::new(),
370            contract_ids: HashSet::new(),
371            dci_update: DCIUpdate::default(),
372            snapshot: Snapshot::default(),
373            snapshot_block: params.block_number,
374        });
375    }
376
377    let request = crate::rpc::ProtocolComponentsParams::new(params.chain, &params.protocol_system)
378        .with_component_ids(component_ids);
379    let components: HashMap<String, ProtocolComponent> = rpc_client
380        .get_protocol_components(request)
381        .await?
382        .into_data()
383        .into_iter()
384        .map(|pc| (pc.id.clone(), pc))
385        .collect();
386
387    let contract_ids: HashSet<Bytes> = components
388        .values()
389        .flat_map(|c| c.contract_addresses.iter().cloned())
390        .collect();
391
392    let snapshot_block = params.block_number;
393    let (snapshot, dci_update, contract_ids) =
394        fetch_snapshot(&rpc_client, components.clone(), contract_ids, &params).await?;
395
396    Ok(SnapshotFetchResult { components, contract_ids, dci_update, snapshot, snapshot_block })
397}
398
399impl<R, D> ProtocolStateSynchronizer<R, D>
400where
401    // TODO: Consider moving these constraints directly to the
402    // client...
403    R: RPCClient + Clone + Send + Sync + 'static,
404    D: DeltasClient + Clone + Send + Sync + 'static,
405{
406    /// Creates a new state synchronizer.
407    #[allow(clippy::too_many_arguments)]
408    pub fn new(
409        extractor_id: ExtractorIdentity,
410        retrieve_balances: bool,
411        component_filter: ComponentFilter,
412        max_retries: u64,
413        retry_cooldown: Duration,
414        include_snapshots: bool,
415        include_tvl: bool,
416        compression: bool,
417        rpc_client: R,
418        deltas_client: D,
419        timeout: u64,
420    ) -> Self {
421        Self {
422            extractor_id: extractor_id.clone(),
423            retrieve_balances,
424            rpc_client: rpc_client.clone(),
425            include_snapshots,
426            deltas_client,
427            component_tracker: ComponentTracker::new(
428                extractor_id.chain,
429                extractor_id.name.as_str(),
430                component_filter,
431                rpc_client,
432            ),
433            max_retries,
434            retry_cooldown,
435            last_synced_block: None,
436            timeout,
437            include_tvl,
438            compression,
439            partial_blocks: false,
440            uses_dci: false,
441            snapshot_tasks: Vec::new(),
442            buffered_deltas: Vec::new(),
443            snapshot_queue: HashMap::new(),
444        }
445    }
446
447    /// Sets whether this protocol uses Dynamic Contract Indexing (DCI).
448    /// When true, entrypoints will be fetched during snapshot retrieval.
449    pub fn with_dci(mut self, uses_dci: bool) -> Self {
450        self.uses_dci = uses_dci;
451        self
452    }
453
454    /// Enables receiving partial block updates.
455    pub fn with_partial_blocks(mut self, partial_blocks: bool) -> Self {
456        self.partial_blocks = partial_blocks;
457        self
458    }
459
460    /// Main method that does all the work.
461    ///
462    /// ## Return Value
463    ///
464    /// Returns a `Result` where:
465    /// - `Ok(())` - Synchronization completed successfully (usually due to close signal)
466    /// - `Err((error, None))` - Error occurred AND close signal was received (don't retry)
467    /// - `Err((error, Some(end_rx)))` - Error occurred but close signal was NOT received (can
468    ///   retry)
469    ///
470    /// The returned `end_rx` (if any) should be reused for retry attempts since the close
471    /// signal may still arrive and we want to remain cancellable across retries.
472    #[instrument(skip(self, block_tx, end_rx), fields(extractor_id = %self.extractor_id))]
473    async fn state_sync(
474        &mut self,
475        block_tx: &mut Sender<SyncResult<StateSyncMessage<BlockHeader>>>,
476        mut end_rx: oneshot::Receiver<()>,
477    ) -> Result<(), (SynchronizerError, Option<oneshot::Receiver<()>>)> {
478        // initialisation
479        let subscription_options = SubscriptionOptions::new()
480            .with_state(self.include_snapshots)
481            .with_compression(self.compression)
482            .with_partial_blocks(self.partial_blocks);
483        let (subscription_id, mut msg_rx) = match self
484            .deltas_client
485            .subscribe(self.extractor_id.clone(), subscription_options)
486            .await
487        {
488            Ok(result) => result,
489            Err(e) => return Err((e.into(), Some(end_rx))),
490        };
491
492        let result = async {
493            info!("Waiting for deltas...");
494            // Track the last seen block number such that we know when we get the first partial
495            let mut last_block_number: Option<u64> = None;
496
497            // Outer loop: find a suitable first block and fetch its snapshot. Retries within the
498            // same subscription when the snapshot endpoint rejects the block as too old — this
499            // happens after a server restart whose persisted state is outside the plan retention
500            // window. Consuming the stale delta and waiting for the next one lets the server catch
501            // up without tearing down the WS subscription and rebuilding state from scratch.
502            const MAX_STALE_RETRIES: u32 = 5;
503            let mut stale_retries: u32 = 0;
504            let (msg, header) = 'init: loop {
505                let mut warned_waiting_for_new_block = false;
506                let mut warned_skipping_synced = false;
507                let mut first_msg = loop {
508                    let msg = select! {
509                        deltas_result = timeout(Duration::from_secs(self.timeout), msg_rx.recv()) => {
510                            deltas_result
511                                .map_err(|_| {
512                                    SynchronizerError::Timeout(format!(
513                                        "First deltas took longer than {t}s to arrive",
514                                        t = self.timeout
515                                    ))
516                                })?
517                                .ok_or_else(|| {
518                                    SynchronizerError::ConnectionError(
519                                        "Deltas channel closed before first message".to_string(),
520                                    )
521                                })?
522                        },
523                        _ = &mut end_rx => {
524                            info!("Received close signal while waiting for first deltas");
525                            return Ok(());
526                        }
527                    };
528
529                    let incoming: BlockHeader = (&msg).into();
530
531                    // Determine if this message is a candidate for starting synchronization.
532                    // In partial mode, we wait for a new block to start (block number increases).
533                    // In non-partial mode, all messages are candidates.
534                    let is_new_block_candidate = if self.partial_blocks {
535                        match msg.partial_block_index {
536                            None => {
537                                // If we get a full block, it is a candidate
538                                last_block_number = Some(incoming.number);
539                                true
540                            }
541                            Some(current_partial_idx) => {
542                                let is_new_block = last_block_number
543                                    .map(|prev_block| incoming.number > prev_block)
544                                    .unwrap_or(false);
545
546                                if !warned_waiting_for_new_block {
547                                    info!(
548                                        extractor=%self.extractor_id,
549                                        block=incoming.number,
550                                        partial_idx=current_partial_idx,
551                                        "Syncing. Waiting for new block to start"
552                                    );
553                                    warned_waiting_for_new_block = true;
554                                }
555                                last_block_number = Some(incoming.number);
556                                is_new_block
557                            }
558                        }
559                    } else {
560                        true // Non-partial mode: all messages are candidates
561                    };
562
563                    if !is_new_block_candidate {
564                        continue;
565                    }
566
567                    // Check if we've already synced this block (applies to both modes)
568                    if let Some(current) = &self.last_synced_block {
569                        if current.number >= incoming.number && !self.is_next_expected(&incoming) {
570                            if !warned_skipping_synced {
571                                info!(extractor=%self.extractor_id, from=incoming.number, to=current.number, "Syncing. Skipping already synced block");
572                                warned_skipping_synced = true;
573                            }
574                            continue;
575                        }
576                    }
577                    break msg;
578                };
579
580                self.filter_deltas(&mut first_msg);
581
582                // initial snapshot
583                info!(height = first_msg.get_block().number, "First deltas received");
584                let header: BlockHeader = (&first_msg).into();
585                let deltas_msg = StateSyncMessage {
586                    header: header.clone(),
587                    snapshots: Default::default(),
588                    deltas: Some(first_msg),
589                    removed_components: Default::default(),
590                };
591
592                // If possible skip retrieving snapshots
593                if !self.is_next_expected(&header) {
594                    info!("Retrieving snapshot");
595                    // With partial blocks, the server only has full blocks in its buffer; pass the
596                    // previous block's header so we request state at N-1, then merge with deltas.
597                    let snapshot_header = if self.partial_blocks && header.number > 0 {
598                        BlockHeader {
599                            number: header.number - 1,
600                            hash: header.parent_hash.clone(),
601                            ..Default::default()
602                        }
603                    } else {
604                        BlockHeader { revert: false, ..header.clone() }
605                    };
606                    let component_ids =
607                        self.component_tracker.get_tracked_component_ids();
608                    let init_snapshot = if !self.include_snapshots ||
609                        component_ids.is_empty()
610                    {
611                        Snapshot::default()
612                    } else {
613                        // Fetch initial snapshots
614                        let components: HashMap<_, _> = self
615                            .component_tracker
616                            .components
617                            .iter()
618                            .filter(|(id, _)| component_ids.contains(id))
619                            .map(|(k, v)| (k.clone(), v.clone()))
620                            .collect();
621                        let contract_ids: HashSet<Bytes> = self
622                            .component_tracker
623                            .get_contracts_by_component(&component_ids)
624                            .into_iter()
625                            .collect();
626                        let fetch_params = FetchSnapshotParams {
627                            chain: self.extractor_id.chain,
628                            protocol_system: self.extractor_id.name.clone(),
629                            block_number: snapshot_header.number,
630                            uses_dci: self.uses_dci,
631                            retrieve_balances: self.retrieve_balances,
632                            include_tvl: self.include_tvl,
633                        };
634                        match fetch_snapshot(
635                            &self.rpc_client,
636                            components,
637                            contract_ids,
638                            &fetch_params,
639                        )
640                        .await
641                        {
642                            Ok((snap, dci_update, _)) => {
643                                self.component_tracker
644                                    .process_entrypoints(&dci_update);
645                                snap
646                            }
647                            Err(SynchronizerError::RPCError(
648                                crate::rpc::RPCError::StaleBlock(reason),
649                            )) => {
650                                stale_retries += 1;
651                                if stale_retries > MAX_STALE_RETRIES {
652                                    return Err(SynchronizerError::RPCError(
653                                        crate::rpc::RPCError::StaleBlock(reason),
654                                    ));
655                                }
656                                // The server's persisted state for this block is outside
657                                // the plan retention window. Discard this delta and wait
658                                // for a fresher block from the same subscription rather
659                                // than restarting from scratch.
660                                warn!(
661                                    block = header.number,
662                                    stale_retries,
663                                    %reason,
664                                    "Snapshot block is outside server retention \
665                                     window; waiting for a more recent block"
666                                );
667                                continue 'init;
668                            }
669                            Err(e) => return Err(e),
670                        }
671                    };
672                    let n_components = self.component_tracker.components.len();
673                    let n_snapshots = init_snapshot.states.len();
674                    info!(
675                        n_components,
676                        n_snapshots,
677                        "Initial snapshot retrieved, starting delta message feed"
678                    );
679                    let snapshot_msg = StateSyncMessage {
680                        header: snapshot_header,
681                        snapshots: init_snapshot,
682                        deltas: None,
683                        removed_components: HashMap::new(),
684                    };
685                    break 'init (snapshot_msg.merge(deltas_msg), header);
686                } else {
687                    break 'init (deltas_msg, header);
688                }
689            };
690
691            block_tx.send(Ok(msg)).await?;
692            self.last_synced_block = Some(header);
693            loop {
694                select! {
695                    deltas_opt = msg_rx.recv() => {
696                        if let Some(mut deltas) = deltas_opt {
697                            let header: BlockHeader = (&deltas).into();
698                            debug!(block_number=?header.number, "Received delta message");
699
700                            // Buffer unfiltered delta while any snapshot task is in-flight.
701                            if !self.snapshot_tasks.is_empty() {
702                                self.buffered_deltas.push(deltas.clone());
703                            }
704
705                            let background_snapshots = self.drain_completed_snapshots();
706
707                            // Trim buffered_deltas: discard blocks no longer needed by any pending task.
708                            if self.snapshot_tasks.is_empty() {
709                                self.buffered_deltas.clear();
710                            } else {
711                                let oldest_pending_block = self
712                                    .snapshot_tasks
713                                    .iter()
714                                    .map(|p| p.snapshot_block)
715                                    .min()
716                                    .unwrap_or(u64::MAX);
717                                self.buffered_deltas
718                                    .retain(|d| d.block.number > oldest_pending_block);
719                            }
720
721                            let (snapshots, removed_components) = {
722                                let (to_add, to_remove) =
723                                    self.component_tracker.filter_updated_components(&deltas);
724
725                                // Harvest transient retries now so they feed into truly_new.
726                                // TVL changes are not re-emitted, so without explicit re-queuing
727                                // a transiently failed component would never be retried.
728                                // Remove from the map first so the truly_new filter below treats
729                                // them the same as brand-new components.
730                                let retry_ids: Vec<String> = self
731                                    .snapshot_queue
732                                    .iter()
733                                    .filter(|(_, s)| matches!(s, SnapshotStatus::RetryNext))
734                                    .map(|(id, _)| id.clone())
735                                    .collect();
736                                for id in &retry_ids {
737                                    self.snapshot_queue.remove(id);
738                                }
739
740                                // Components not yet tracked and not in the staged state machine
741                                // (not in-flight, not deferred, not blacklisted). Merges
742                                // delta-triggered new components with transient retries; `seen`
743                                // deduplicates the two sources.
744                                let truly_new: Vec<String> = {
745                                    let mut seen = HashSet::new();
746                                    to_add
747                                        .iter()
748                                        .chain(retry_ids.iter())
749                                        .filter(|id| {
750                                            !self.component_tracker
751                                                .components
752                                                .contains_key(id.as_str())
753                                                && !self
754                                                    .snapshot_queue
755                                                    .contains_key(id.as_str())
756                                                && seen.insert(id.as_str())
757                                        })
758                                        .cloned()
759                                        .collect()
760                                };
761
762                                if self.partial_blocks {
763                                    let is_new_block = self
764                                        .last_synced_block
765                                        .as_ref()
766                                        .map(|b| header.number > b.number)
767                                        .unwrap_or(true);
768
769                                    let has_deferred = self
770                                        .snapshot_queue
771                                        .values()
772                                        .any(|s| matches!(s, SnapshotStatus::Deferred));
773                                    if is_new_block && has_deferred && header.number > 0 {
774                                        // Block number incremented: the previous block is
775                                        // complete. Fire deferred components at that block's
776                                        // height.
777                                        let to_fire: Vec<String> = self
778                                            .snapshot_queue
779                                            .iter()
780                                            .filter(|(_, s)| matches!(s, SnapshotStatus::Deferred))
781                                            .map(|(id, _)| id.clone())
782                                            .collect();
783                                        for id in &to_fire {
784                                            self.snapshot_queue.remove(id);
785                                        }
786                                        let snapshot_header = BlockHeader {
787                                            number: header.number - 1,
788                                            hash: header.parent_hash.clone(),
789                                            ..Default::default()
790                                        };
791                                        debug!(
792                                            components = ?to_fire,
793                                            extractor = %self.extractor_id.name,
794                                            snapshot_block = header.number - 1,
795                                            "snapshot_deferred_to_background"
796                                        );
797                                        self.spawn_snapshot_task(
798                                            to_fire,
799                                            snapshot_header,
800                                            &deltas,
801                                        );
802                                    }
803
804                                    // Accumulate truly_new into the deferred set for the current
805                                    // block; they will be fired when the next block arrives.
806                                    for id in truly_new {
807                                        self.snapshot_queue
808                                            .insert(id, SnapshotStatus::Deferred);
809                                    }
810                                } else if !truly_new.is_empty() {
811                                    debug!(
812                                        components = ?truly_new,
813                                        extractor = %self.extractor_id.name,
814                                        block_number = ?header.number,
815                                        "snapshot_deferred_to_background"
816                                    );
817                                    let snapshot_header =
818                                        BlockHeader { revert: false, ..header.clone() };
819                                    self.spawn_snapshot_task(truly_new, snapshot_header, &deltas);
820                                }
821
822                                let snapshots = background_snapshots;
823
824                                let removed_components = if !to_remove.is_empty() {
825                                    self.component_tracker.stop_tracking(&to_remove)
826                                } else {
827                                    Default::default()
828                                };
829
830                                (snapshots, removed_components)
831                            };
832
833                            // Update entrypoints on the tracker (affects which contracts are tracked for DCI).
834                            self.component_tracker.process_entrypoints(&deltas.dci_update);
835
836                            // Filter deltas by currently tracked components / contracts.
837                            self.filter_deltas(&mut deltas);
838                            let n_changes = deltas.n_changes();
839
840                            let next = StateSyncMessage {
841                                header: header.clone(),
842                                snapshots,
843                                deltas: Some(deltas),
844                                removed_components,
845                            };
846                            block_tx.send(Ok(next)).await?;
847                            self.last_synced_block = Some(header.clone());
848
849                            debug!(block_number=?header.number, n_changes, "Finished processing delta message");
850                        } else {
851                            return Err(SynchronizerError::ConnectionError("Deltas channel closed".to_string()));
852                        }
853                    },
854                    _ = &mut end_rx => {
855                        info!("Received close signal during state_sync");
856                        return Ok(());
857                    }
858                }
859            }
860        }.await;
861
862        // This cleanup code now runs regardless of how the function exits (error or channel close)
863        warn!(last_synced_block = ?&self.last_synced_block, "Deltas processing ended.");
864        //Ignore error
865        let _ = self
866            .deltas_client
867            .unsubscribe(subscription_id)
868            .await
869            .map_err(|err| {
870                warn!(err=?err, "Unsubscribing from deltas on cleanup failed!");
871            });
872
873        // Handle the result: if it succeeded, we're done. If it errored, we need to determine
874        // whether the end_rx was consumed (close signal received) or not
875        match result {
876            Ok(()) => Ok(()), // Success, likely due to close signal
877            Err(e) => {
878                // The error came from the inner async block. Since the async block
879                // can receive close signals (which would return Ok), any error means
880                // the close signal was NOT received, so we can return the end_rx for retry
881                Err((e, Some(end_rx)))
882            }
883        }
884    }
885
886    /// Applies `self.buffered_deltas` to `snapshot`, updating attributes, balances, and contract
887    /// storage for deltas strictly after `snapshot_block`.
888    fn apply_deltas_to_snapshot(
889        &self,
890        snapshot: &mut Snapshot,
891        snapshot_block: u64,
892        contract_ids: &HashSet<Bytes>,
893    ) {
894        for delta in &self.buffered_deltas {
895            if delta.block.number <= snapshot_block {
896                continue;
897            }
898            for (component_id, state_delta) in &delta.state_deltas {
899                if let Some(cws) = snapshot.states.get_mut(component_id) {
900                    cws.state.attributes.extend(
901                        state_delta
902                            .updated_attributes
903                            .iter()
904                            .map(|(k, v)| (k.clone(), v.clone())),
905                    );
906                    for key in &state_delta.deleted_attributes {
907                        cws.state.attributes.remove(key);
908                    }
909                }
910            }
911            for (component_id, token_balances) in &delta.component_balances {
912                if let Some(cws) = snapshot.states.get_mut(component_id) {
913                    for (token, bal) in token_balances {
914                        cws.state
915                            .balances
916                            .insert(token.clone(), bal.balance.clone());
917                    }
918                }
919            }
920            for (address, account_delta) in &delta.account_deltas {
921                if contract_ids.contains(address) {
922                    if let Some(account) = snapshot.vm_storage.get_mut(address) {
923                        account.slots.extend(
924                            account_delta
925                                .slots
926                                .iter()
927                                .filter_map(|(k, v)| {
928                                    v.as_ref()
929                                        .map(|v| (k.clone(), v.clone()))
930                                }),
931                        );
932                        if let Some(balance) = &account_delta.balance {
933                            account.native_balance = balance.clone();
934                        }
935                        if let Some(code) = account_delta.code() {
936                            account.code = code.clone();
937                        }
938                    }
939                }
940            }
941        }
942    }
943
944    /// Spawns a background snapshot task for `component_ids` at `snapshot_header`. If no other
945    /// task is already in-flight, starts buffering deltas from `current_delta` so the snapshot
946    /// can be brought up to date when the task drains.
947    fn spawn_snapshot_task(
948        &mut self,
949        component_ids: Vec<String>,
950        snapshot_header: BlockHeader,
951        current_delta: &BlockAggregatedChanges,
952    ) {
953        let snapshot_block = snapshot_header.number;
954
955        if self.snapshot_tasks.is_empty() {
956            self.buffered_deltas
957                .push(current_delta.clone());
958        }
959
960        let (tx, rx) = oneshot::channel();
961        let rpc = self.rpc_client.clone();
962        let bg_params = FetchSnapshotParams {
963            chain: self.extractor_id.chain,
964            protocol_system: self.extractor_id.name.clone(),
965            block_number: snapshot_block,
966            uses_dci: self.uses_dci,
967            retrieve_balances: self.retrieve_balances,
968            include_tvl: self.include_tvl,
969        };
970        let ids = component_ids.clone();
971        tokio::spawn(async move {
972            let _ = tx.send(fetch_snapshot_background(rpc, ids, bg_params).await);
973        });
974        for id in &component_ids {
975            self.snapshot_queue
976                .insert(id.clone(), SnapshotStatus::InFlight);
977        }
978        self.snapshot_tasks
979            .push(SnapshotTask { component_ids, snapshot_block, receiver: rx });
980    }
981
982    /// Drains any background snapshot tasks that have completed. Returns a `Snapshot` containing
983    /// all ready results, with buffered deltas applied to bring each snapshot up to date.
984    fn drain_completed_snapshots(&mut self) -> Snapshot {
985        let mut result = Snapshot::default();
986        let pending = std::mem::take(&mut self.snapshot_tasks);
987
988        for mut p in pending {
989            match p.receiver.try_recv() {
990                Ok(Ok(fetch_result)) => {
991                    debug!(
992                        components = ?p.component_ids,
993                        extractor = %self.extractor_id.name,
994                        "snapshot_background_ready"
995                    );
996                    for id in &p.component_ids {
997                        self.snapshot_queue.remove(id);
998                    }
999                    let new_component_ids: Vec<String> = fetch_result
1000                        .components
1001                        .keys()
1002                        .cloned()
1003                        .collect();
1004                    self.component_tracker
1005                        .components
1006                        .extend(fetch_result.components);
1007                    self.component_tracker
1008                        .process_entrypoints(&fetch_result.dci_update);
1009                    self.component_tracker
1010                        .update_contracts(new_component_ids);
1011                    let mut snapshot = fetch_result.snapshot;
1012                    self.apply_deltas_to_snapshot(
1013                        &mut snapshot,
1014                        fetch_result.snapshot_block,
1015                        &fetch_result.contract_ids,
1016                    );
1017                    result.extend(snapshot);
1018                }
1019                Ok(Err(e)) => {
1020                    if e.is_transient() {
1021                        warn!(
1022                            components = ?p.component_ids,
1023                            extractor = %self.extractor_id.name,
1024                            err = %e,
1025                            "Background snapshot fetch failed transiently; will retry next block"
1026                        );
1027                        for id in &p.component_ids {
1028                            self.snapshot_queue
1029                                .insert(id.clone(), SnapshotStatus::RetryNext);
1030                        }
1031                    } else {
1032                        warn!(
1033                            components = ?p.component_ids,
1034                            extractor = %self.extractor_id.name,
1035                            err = %e,
1036                            "Background snapshot fetch failed permanently; \
1037                             components blacklisted until restart"
1038                        );
1039                        for id in &p.component_ids {
1040                            self.snapshot_queue
1041                                .insert(id.clone(), SnapshotStatus::Blacklisted);
1042                        }
1043                    }
1044                }
1045                Err(tokio::sync::oneshot::error::TryRecvError::Empty) => {
1046                    self.snapshot_tasks.push(p);
1047                }
1048                Err(tokio::sync::oneshot::error::TryRecvError::Closed) => {
1049                    warn!(
1050                        components = ?p.component_ids,
1051                        extractor = %self.extractor_id.name,
1052                        "Background snapshot task dropped before sending result"
1053                    );
1054                    for id in &p.component_ids {
1055                        self.snapshot_queue.remove(id);
1056                    }
1057                }
1058            }
1059        }
1060
1061        result
1062    }
1063
1064    fn is_next_expected(&self, incoming: &BlockHeader) -> bool {
1065        if let Some(block) = self.last_synced_block.as_ref() {
1066            return incoming.parent_hash == block.hash;
1067        }
1068        false
1069    }
1070    fn filter_deltas(&self, deltas: &mut BlockAggregatedChanges) {
1071        deltas.filter_by_component(|id| {
1072            self.component_tracker
1073                .components
1074                .contains_key(id)
1075        });
1076        deltas.filter_by_contract(|id| {
1077            self.component_tracker
1078                .contracts
1079                .contains(id)
1080        });
1081    }
1082}
1083
1084#[async_trait]
1085impl<R, D> StateSynchronizer for ProtocolStateSynchronizer<R, D>
1086where
1087    R: RPCClient + Clone + Send + Sync + 'static,
1088    D: DeltasClient + Clone + Send + Sync + 'static,
1089{
1090    async fn initialize(&mut self) -> SyncResult<()> {
1091        info!("Retrieving relevant protocol components");
1092        self.component_tracker
1093            .initialise_components()
1094            .await?;
1095        info!(
1096            n_components = self.component_tracker.components.len(),
1097            n_contracts = self.component_tracker.contracts.len(),
1098            extractor = %self.extractor_id,
1099            "Finished retrieving components",
1100        );
1101
1102        Ok(())
1103    }
1104
1105    async fn start(
1106        mut self,
1107    ) -> (SynchronizerTaskHandle, Receiver<SyncResult<StateSyncMessage<BlockHeader>>>) {
1108        let (mut tx, rx) = channel(15);
1109        let (end_tx, end_rx) = oneshot::channel::<()>();
1110
1111        let jh = tokio::spawn(async move {
1112            let mut retry_count = 0;
1113            let mut current_end_rx = end_rx;
1114            let mut final_error = None;
1115
1116            while retry_count < self.max_retries {
1117                info!(extractor_id=%&self.extractor_id, retry_count, "(Re)starting synchronization loop");
1118
1119                let prev_block = self
1120                    .last_synced_block
1121                    .as_ref()
1122                    .map(|h| h.number);
1123                let res = self
1124                    .state_sync(&mut tx, current_end_rx)
1125                    .await;
1126                let made_progress = self
1127                    .last_synced_block
1128                    .as_ref()
1129                    .map(|h| h.number) >
1130                    prev_block;
1131                match res {
1132                    Ok(()) => {
1133                        info!(
1134                            extractor_id=%&self.extractor_id,
1135                            retry_count,
1136                            "State synchronization exited cleanly"
1137                        );
1138                        return;
1139                    }
1140                    Err((e, maybe_end_rx)) => {
1141                        warn!(
1142                            extractor_id=%&self.extractor_id,
1143                            retry_count,
1144                            error=%e,
1145                            "State synchronization errored!"
1146                        );
1147
1148                        // If we have the end_rx back, we can retry
1149                        if let Some(recovered_end_rx) = maybe_end_rx {
1150                            current_end_rx = recovered_end_rx;
1151
1152                            if let SynchronizerError::ConnectionClosed = e {
1153                                // break synchronization loop if websocket client is dead
1154                                error!(
1155                                    "Websocket connection closed. State synchronization exiting."
1156                                );
1157                                let _ = tx.send(Err(e)).await;
1158                                return;
1159                            } else {
1160                                // Store error in case this is our last retry
1161                                final_error = Some(e);
1162                            }
1163                        } else {
1164                            // Close signal was received, exit cleanly
1165                            info!(extractor_id=%&self.extractor_id, "Received close signal, exiting.");
1166                            return;
1167                        }
1168                    }
1169                }
1170                sleep(self.retry_cooldown).await;
1171                // A run that processed blocks is a healthy run — reset the counter so
1172                // transient failures after a long successful period get a fresh retry budget.
1173                if made_progress {
1174                    retry_count = 0;
1175                } else {
1176                    retry_count += 1;
1177                }
1178            }
1179            if let Some(e) = final_error {
1180                warn!(extractor_id=%&self.extractor_id, retry_count, error=%e, "Max retries exceeded");
1181                let _ = tx.send(Err(e)).await;
1182            }
1183        });
1184
1185        let handle = SynchronizerTaskHandle::new(jh, end_tx);
1186        (handle, rx)
1187    }
1188}
1189
1190#[cfg(test)]
1191mod test {
1192    //! Test suite for ProtocolStateSynchronizer shutdown and cleanup behavior.
1193    //!
1194    //! ## Test Coverage Strategy:
1195    //!
1196    //! ### Shutdown & Close Signal Tests:
1197    //! - `test_public_close_api_functionality` - Tests public API (start/close lifecycle)
1198    //! - `test_close_signal_while_waiting_for_first_deltas` - Close during initial wait
1199    //! - `test_close_signal_during_main_processing_loop` - Close during main processing
1200    //!
1201    //! ### Cleanup & Error Handling Tests:
1202    //! - `test_cleanup_runs_when_state_sync_processing_errors` - Cleanup on processing errors
1203    //!
1204    //! ### Coverage Summary:
1205    //! These tests ensure cleanup code (shared state reset + unsubscribe) runs on ALL exit paths:
1206    //! ✓ Close signal before first deltas   ✓ Close signal during processing
1207    //! ✓ Processing errors                  ✓ Channel closure
1208    //! ✓ Public API close operations        ✓ Normal completion
1209
1210    use std::{collections::HashSet, sync::Arc};
1211
1212    use tycho_common::models::{
1213        blockchain::{
1214            AddressStorageLocation, Block, BlockAggregatedChanges, DCIUpdate, EntryPoint,
1215            EntryPointWithTracingParams, RPCTracerParams, TracingParams, TracingResult,
1216        },
1217        protocol::{ProtocolComponent, ProtocolComponentState},
1218        token::Token,
1219        Chain,
1220    };
1221    use uuid::Uuid;
1222
1223    use super::*;
1224    use crate::{
1225        deltas::MockDeltasClient,
1226        rpc::{MockRPCClient, Page},
1227        DeltasError, RPCError,
1228    };
1229
1230    // Required for mock client to implement clone
1231    struct ArcRPCClient<T>(Arc<T>);
1232
1233    // Default derive(Clone) does require T to be Clone as well.
1234    impl<T> Clone for ArcRPCClient<T> {
1235        fn clone(&self) -> Self {
1236            ArcRPCClient(self.0.clone())
1237        }
1238    }
1239
1240    #[async_trait]
1241    impl<T> RPCClient for ArcRPCClient<T>
1242    where
1243        T: RPCClient + Sync + Send + 'static,
1244    {
1245        async fn get_tokens(
1246            &self,
1247            params: crate::rpc::TokensParams,
1248        ) -> Result<crate::rpc::Page<Vec<Token>>, RPCError> {
1249            self.0.get_tokens(params).await
1250        }
1251
1252        async fn get_contract_state(
1253            &self,
1254            params: crate::rpc::ContractStateParams,
1255        ) -> Result<crate::rpc::Page<Vec<Account>>, RPCError> {
1256            self.0.get_contract_state(params).await
1257        }
1258
1259        async fn get_protocol_components(
1260            &self,
1261            params: crate::rpc::ProtocolComponentsParams,
1262        ) -> Result<crate::rpc::Page<Vec<ProtocolComponent>>, RPCError> {
1263            self.0
1264                .get_protocol_components(params)
1265                .await
1266        }
1267
1268        async fn get_protocol_states(
1269            &self,
1270            params: crate::rpc::ProtocolStatesParams,
1271        ) -> Result<crate::rpc::Page<Vec<ProtocolComponentState>>, RPCError> {
1272            self.0.get_protocol_states(params).await
1273        }
1274
1275        async fn get_protocol_systems(
1276            &self,
1277            params: crate::rpc::ProtocolSystemsParams,
1278        ) -> Result<crate::rpc::Page<crate::rpc::ProtocolSystems>, RPCError> {
1279            self.0
1280                .get_protocol_systems(params)
1281                .await
1282        }
1283
1284        async fn get_component_tvl(
1285            &self,
1286            params: crate::rpc::ComponentTvlParams,
1287        ) -> Result<crate::rpc::Page<HashMap<String, f64>>, RPCError> {
1288            self.0.get_component_tvl(params).await
1289        }
1290
1291        async fn get_traced_entry_points(
1292            &self,
1293            params: crate::rpc::TracedEntryPointsParams,
1294        ) -> Result<
1295            crate::rpc::Page<HashMap<String, Vec<(EntryPointWithTracingParams, TracingResult)>>>,
1296            RPCError,
1297        > {
1298            self.0
1299                .get_traced_entry_points(params)
1300                .await
1301        }
1302
1303        async fn get_snapshots<'a>(
1304            &self,
1305            request: &SnapshotParameters<'a>,
1306            chunk_size: Option<usize>,
1307            concurrency: usize,
1308        ) -> Result<Snapshot, RPCError> {
1309            self.0
1310                .get_snapshots(request, chunk_size, concurrency)
1311                .await
1312        }
1313
1314        fn compression(&self) -> bool {
1315            self.0.compression()
1316        }
1317    }
1318
1319    // Required for mock client to implement clone
1320    struct ArcDeltasClient<T>(Arc<T>);
1321
1322    // Default derive(Clone) does require T to be Clone as well.
1323    impl<T> Clone for ArcDeltasClient<T> {
1324        fn clone(&self) -> Self {
1325            ArcDeltasClient(self.0.clone())
1326        }
1327    }
1328
1329    #[async_trait]
1330    impl<T> DeltasClient for ArcDeltasClient<T>
1331    where
1332        T: DeltasClient + Sync + Send + 'static,
1333    {
1334        async fn subscribe(
1335            &self,
1336            extractor_id: tycho_common::models::ExtractorIdentity,
1337            options: SubscriptionOptions,
1338        ) -> Result<(Uuid, Receiver<BlockAggregatedChanges>), DeltasError> {
1339            self.0
1340                .subscribe(extractor_id, options)
1341                .await
1342        }
1343
1344        async fn unsubscribe(&self, subscription_id: Uuid) -> Result<(), DeltasError> {
1345            self.0
1346                .unsubscribe(subscription_id)
1347                .await
1348        }
1349
1350        async fn connect(&self) -> Result<JoinHandle<Result<(), DeltasError>>, DeltasError> {
1351            self.0.connect().await
1352        }
1353
1354        async fn close(&self) -> Result<(), DeltasError> {
1355            self.0.close().await
1356        }
1357    }
1358
1359    fn with_mocked_clients(
1360        native: bool,
1361        include_tvl: bool,
1362        rpc_client: Option<MockRPCClient>,
1363        deltas_client: Option<MockDeltasClient>,
1364    ) -> ProtocolStateSynchronizer<ArcRPCClient<MockRPCClient>, ArcDeltasClient<MockDeltasClient>>
1365    {
1366        let rpc_client = ArcRPCClient(Arc::new(rpc_client.unwrap_or_default()));
1367        let deltas_client = ArcDeltasClient(Arc::new(deltas_client.unwrap_or_default()));
1368
1369        ProtocolStateSynchronizer::new(
1370            ExtractorIdentity::new(Chain::Ethereum, "uniswap-v2"),
1371            native,
1372            ComponentFilter::with_tvl_range(50.0, 50.0),
1373            1,
1374            Duration::from_secs(0),
1375            true,
1376            include_tvl,
1377            true, // Does not matter as we mock the client that never compresses
1378            rpc_client,
1379            deltas_client,
1380            10_u64,
1381        )
1382    }
1383
1384    fn state_snapshot_native() -> Vec<ProtocolComponentState> {
1385        vec![ProtocolComponentState {
1386            component_id: "Component1".to_string(),
1387            attributes: HashMap::new(),
1388            balances: HashMap::new(),
1389        }]
1390    }
1391
1392    fn make_mock_client() -> MockRPCClient {
1393        let mut m = MockRPCClient::new();
1394        m.expect_compression()
1395            .return_const(false);
1396        m
1397    }
1398
1399    #[test_log::test(tokio::test)]
1400    async fn test_get_snapshots_native() {
1401        let header = BlockHeader::default();
1402        let mut rpc = make_mock_client();
1403        let component = ProtocolComponent { id: "Component1".to_string(), ..Default::default() };
1404
1405        let component_clone = component.clone();
1406        rpc.expect_get_snapshots()
1407            .returning(move |_request, _chunk_size, _concurrency| {
1408                Ok(Snapshot {
1409                    states: state_snapshot_native()
1410                        .into_iter()
1411                        .map(|state| {
1412                            (
1413                                state.component_id.clone(),
1414                                ComponentWithState {
1415                                    state,
1416                                    component: component_clone.clone(),
1417                                    entrypoints: vec![],
1418                                    component_tvl: None,
1419                                },
1420                            )
1421                        })
1422                        .collect(),
1423                    vm_storage: HashMap::new(),
1424                })
1425            });
1426
1427        let mut state_sync = with_mocked_clients(true, false, Some(rpc), None);
1428        state_sync
1429            .component_tracker
1430            .components
1431            .insert("Component1".to_string(), component.clone());
1432        let components_arg = ["Component1".to_string()];
1433        let exp = StateSyncMessage {
1434            header: header.clone(),
1435            snapshots: Snapshot {
1436                states: state_snapshot_native()
1437                    .into_iter()
1438                    .map(|state| {
1439                        (
1440                            state.component_id.clone(),
1441                            ComponentWithState {
1442                                state,
1443                                component: component.clone(),
1444                                entrypoints: vec![],
1445                                component_tvl: None,
1446                            },
1447                        )
1448                    })
1449                    .collect(),
1450                vm_storage: HashMap::new(),
1451            },
1452            deltas: None,
1453            removed_components: Default::default(),
1454        };
1455
1456        let req_ids: Vec<String> = components_arg.to_vec();
1457        let components: HashMap<_, _> = state_sync
1458            .component_tracker
1459            .components
1460            .iter()
1461            .filter(|(id, _)| req_ids.contains(id))
1462            .map(|(k, v)| (k.clone(), v.clone()))
1463            .collect();
1464        let contract_ids: HashSet<Bytes> = state_sync
1465            .component_tracker
1466            .get_contracts_by_component(&req_ids)
1467            .into_iter()
1468            .collect();
1469        let params = FetchSnapshotParams {
1470            chain: Chain::Ethereum,
1471            protocol_system: "uniswap-v2".to_string(),
1472            block_number: header.number,
1473            uses_dci: false,
1474            retrieve_balances: true,
1475            include_tvl: false,
1476        };
1477        let (snapshot, _, _) =
1478            fetch_snapshot(&state_sync.rpc_client, components, contract_ids, &params)
1479                .await
1480                .expect("Retrieving snapshot failed");
1481        let snap = StateSyncMessage {
1482            header: header.clone(),
1483            snapshots: snapshot,
1484            deltas: None,
1485            removed_components: Default::default(),
1486        };
1487
1488        assert_eq!(snap, exp);
1489    }
1490
1491    #[test_log::test(tokio::test)]
1492    async fn test_get_snapshots_native_with_tvl() {
1493        let header = BlockHeader::default();
1494        let mut rpc = make_mock_client();
1495        let component = ProtocolComponent { id: "Component1".to_string(), ..Default::default() };
1496
1497        let component_clone = component.clone();
1498        rpc.expect_get_snapshots()
1499            .returning(move |_request, _chunk_size, _concurrency| {
1500                Ok(Snapshot {
1501                    states: state_snapshot_native()
1502                        .into_iter()
1503                        .map(|state| {
1504                            (
1505                                state.component_id.clone(),
1506                                ComponentWithState {
1507                                    state,
1508                                    component: component_clone.clone(),
1509                                    component_tvl: Some(100.0),
1510                                    entrypoints: vec![],
1511                                },
1512                            )
1513                        })
1514                        .collect(),
1515                    vm_storage: HashMap::new(),
1516                })
1517            });
1518
1519        let mut state_sync = with_mocked_clients(true, true, Some(rpc), None);
1520        state_sync
1521            .component_tracker
1522            .components
1523            .insert("Component1".to_string(), component.clone());
1524        let components_arg = ["Component1".to_string()];
1525        let exp = StateSyncMessage {
1526            header: header.clone(),
1527            snapshots: Snapshot {
1528                states: state_snapshot_native()
1529                    .into_iter()
1530                    .map(|state| {
1531                        (
1532                            state.component_id.clone(),
1533                            ComponentWithState {
1534                                state,
1535                                component: component.clone(),
1536                                component_tvl: Some(100.0),
1537                                entrypoints: vec![],
1538                            },
1539                        )
1540                    })
1541                    .collect(),
1542                vm_storage: HashMap::new(),
1543            },
1544            deltas: None,
1545            removed_components: Default::default(),
1546        };
1547
1548        let req_ids: Vec<String> = components_arg.to_vec();
1549        let components: HashMap<_, _> = state_sync
1550            .component_tracker
1551            .components
1552            .iter()
1553            .filter(|(id, _)| req_ids.contains(id))
1554            .map(|(k, v)| (k.clone(), v.clone()))
1555            .collect();
1556        let contract_ids: HashSet<Bytes> = state_sync
1557            .component_tracker
1558            .get_contracts_by_component(&req_ids)
1559            .into_iter()
1560            .collect();
1561        let params = FetchSnapshotParams {
1562            chain: Chain::Ethereum,
1563            protocol_system: "uniswap-v2".to_string(),
1564            block_number: header.number,
1565            uses_dci: false,
1566            retrieve_balances: true,
1567            include_tvl: true,
1568        };
1569        let (snapshot, _, _) =
1570            fetch_snapshot(&state_sync.rpc_client, components, contract_ids, &params)
1571                .await
1572                .expect("Retrieving snapshot failed");
1573        let snap = StateSyncMessage {
1574            header: header.clone(),
1575            snapshots: snapshot,
1576            deltas: None,
1577            removed_components: Default::default(),
1578        };
1579
1580        assert_eq!(snap, exp);
1581    }
1582
1583    fn state_snapshot_vm() -> Vec<Account> {
1584        vec![
1585            Account::new(
1586                Chain::default(),
1587                Bytes::from("0x0badc0ffee"),
1588                String::new(),
1589                HashMap::new(),
1590                Bytes::default(),
1591                HashMap::new(),
1592                Bytes::default(),
1593                Bytes::default(),
1594                Bytes::default(),
1595                Bytes::default(),
1596                None,
1597            ),
1598            Account::new(
1599                Chain::default(),
1600                Bytes::from("0xbabe42"),
1601                String::new(),
1602                HashMap::new(),
1603                Bytes::default(),
1604                HashMap::new(),
1605                Bytes::default(),
1606                Bytes::default(),
1607                Bytes::default(),
1608                Bytes::default(),
1609                None,
1610            ),
1611        ]
1612    }
1613
1614    fn traced_entry_point_response(
1615    ) -> HashMap<String, Vec<(EntryPointWithTracingParams, TracingResult)>> {
1616        HashMap::from([(
1617            "Component1".to_string(),
1618            vec![(
1619                EntryPointWithTracingParams {
1620                    entry_point: EntryPoint {
1621                        external_id: "entrypoint_a".to_string(),
1622                        target: Bytes::from("0x0badc0ffee"),
1623                        signature: "sig()".to_string(),
1624                    },
1625                    params: TracingParams::RPCTracer(RPCTracerParams {
1626                        caller: Some(Bytes::from("0x0badc0ffee")),
1627                        calldata: Bytes::from("0x0badc0ffee"),
1628                        state_overrides: None,
1629                        prune_addresses: None,
1630                    }),
1631                },
1632                TracingResult {
1633                    retriggers: HashSet::from([(
1634                        Bytes::from("0x0badc0ffee"),
1635                        AddressStorageLocation::new(Bytes::from("0x0badc0ffee"), 12),
1636                    )]),
1637                    accessed_slots: HashMap::from([(
1638                        Bytes::from("0x0badc0ffee"),
1639                        HashSet::from([Bytes::from("0xbadbeef0")]),
1640                    )]),
1641                },
1642            )],
1643        )])
1644    }
1645
1646    #[test_log::test(tokio::test)]
1647    async fn test_get_snapshots_vm() {
1648        let header = BlockHeader::default();
1649        let mut rpc = make_mock_client();
1650
1651        let traced_ep_response = traced_entry_point_response();
1652        rpc.expect_get_snapshots()
1653            .returning(move |_request, _chunk_size, _concurrency| {
1654                let vm_storage_accounts = state_snapshot_vm();
1655                Ok(Snapshot {
1656                    states: [(
1657                        "Component1".to_string(),
1658                        ComponentWithState {
1659                            state: ProtocolComponentState {
1660                                component_id: "Component1".to_string(),
1661                                attributes: HashMap::new(),
1662                                balances: HashMap::new(),
1663                            },
1664                            component: ProtocolComponent {
1665                                id: "Component1".to_string(),
1666                                contract_addresses: vec![
1667                                    Bytes::from("0x0badc0ffee"),
1668                                    Bytes::from("0xbabe42"),
1669                                ],
1670                                ..Default::default()
1671                            },
1672                            component_tvl: None,
1673                            entrypoints: traced_ep_response
1674                                .get("Component1")
1675                                .cloned()
1676                                .unwrap_or_default(),
1677                        },
1678                    )]
1679                    .into_iter()
1680                    .collect(),
1681                    vm_storage: vm_storage_accounts
1682                        .into_iter()
1683                        .map(|account| (account.address.clone(), account))
1684                        .collect(),
1685                })
1686            });
1687
1688        let mut state_sync = with_mocked_clients(false, false, Some(rpc), None);
1689        let component = ProtocolComponent {
1690            id: "Component1".to_string(),
1691            contract_addresses: vec![Bytes::from("0x0badc0ffee"), Bytes::from("0xbabe42")],
1692            ..Default::default()
1693        };
1694        state_sync
1695            .component_tracker
1696            .components
1697            .insert("Component1".to_string(), component.clone());
1698        let components_arg = ["Component1".to_string()];
1699        let exp = StateSyncMessage {
1700            header: header.clone(),
1701            snapshots: Snapshot {
1702                states: [(
1703                    component.id.clone(),
1704                    ComponentWithState {
1705                        state: ProtocolComponentState {
1706                            component_id: "Component1".to_string(),
1707                            attributes: HashMap::new(),
1708                            balances: HashMap::new(),
1709                        },
1710                        component: component.clone(),
1711                        component_tvl: None,
1712                        entrypoints: traced_entry_point_response()
1713                            .remove("Component1")
1714                            .unwrap_or_default(),
1715                    },
1716                )]
1717                .into_iter()
1718                .collect(),
1719                vm_storage: state_snapshot_vm()
1720                    .into_iter()
1721                    .map(|account| (account.address.clone(), account))
1722                    .collect(),
1723            },
1724            deltas: None,
1725            removed_components: Default::default(),
1726        };
1727
1728        let req_ids: Vec<String> = components_arg.to_vec();
1729        let components: HashMap<_, _> = state_sync
1730            .component_tracker
1731            .components
1732            .iter()
1733            .filter(|(id, _)| req_ids.contains(id))
1734            .map(|(k, v)| (k.clone(), v.clone()))
1735            .collect();
1736        let contract_ids: HashSet<Bytes> = state_sync
1737            .component_tracker
1738            .get_contracts_by_component(&req_ids)
1739            .into_iter()
1740            .collect();
1741        let params = FetchSnapshotParams {
1742            chain: Chain::Ethereum,
1743            protocol_system: "uniswap-v2".to_string(),
1744            block_number: header.number,
1745            uses_dci: false,
1746            retrieve_balances: false,
1747            include_tvl: false,
1748        };
1749        let (snapshot, _, _) =
1750            fetch_snapshot(&state_sync.rpc_client, components, contract_ids, &params)
1751                .await
1752                .expect("Retrieving snapshot failed");
1753        let snap = StateSyncMessage {
1754            header: header.clone(),
1755            snapshots: snapshot,
1756            deltas: None,
1757            removed_components: Default::default(),
1758        };
1759
1760        assert_eq!(snap, exp);
1761    }
1762
1763    #[test_log::test(tokio::test)]
1764    async fn test_get_snapshots_vm_with_tvl() {
1765        let header = BlockHeader::default();
1766        let mut rpc = make_mock_client();
1767        let component = ProtocolComponent {
1768            id: "Component1".to_string(),
1769            contract_addresses: vec![Bytes::from("0x0badc0ffee"), Bytes::from("0xbabe42")],
1770            ..Default::default()
1771        };
1772
1773        let component_clone = component.clone();
1774        rpc.expect_get_snapshots()
1775            .returning(move |_request, _chunk_size, _concurrency| {
1776                let vm_storage_accounts = state_snapshot_vm();
1777                Ok(Snapshot {
1778                    states: [(
1779                        "Component1".to_string(),
1780                        ComponentWithState {
1781                            state: ProtocolComponentState {
1782                                component_id: "Component1".to_string(),
1783                                attributes: HashMap::new(),
1784                                balances: HashMap::new(),
1785                            },
1786                            component: component_clone.clone(),
1787                            component_tvl: Some(100.0),
1788                            entrypoints: vec![],
1789                        },
1790                    )]
1791                    .into_iter()
1792                    .collect(),
1793                    vm_storage: vm_storage_accounts
1794                        .into_iter()
1795                        .map(|account| (account.address.clone(), account))
1796                        .collect(),
1797                })
1798            });
1799
1800        let mut state_sync = with_mocked_clients(false, true, Some(rpc), None);
1801        state_sync
1802            .component_tracker
1803            .components
1804            .insert("Component1".to_string(), component.clone());
1805        let components_arg = ["Component1".to_string()];
1806        let exp = StateSyncMessage {
1807            header: header.clone(),
1808            snapshots: Snapshot {
1809                states: [(
1810                    component.id.clone(),
1811                    ComponentWithState {
1812                        state: ProtocolComponentState {
1813                            component_id: "Component1".to_string(),
1814                            attributes: HashMap::new(),
1815                            balances: HashMap::new(),
1816                        },
1817                        component: component.clone(),
1818                        component_tvl: Some(100.0),
1819                        entrypoints: vec![],
1820                    },
1821                )]
1822                .into_iter()
1823                .collect(),
1824                vm_storage: state_snapshot_vm()
1825                    .into_iter()
1826                    .map(|account| (account.address.clone(), account))
1827                    .collect(),
1828            },
1829            deltas: None,
1830            removed_components: Default::default(),
1831        };
1832
1833        let req_ids: Vec<String> = components_arg.to_vec();
1834        let components: HashMap<_, _> = state_sync
1835            .component_tracker
1836            .components
1837            .iter()
1838            .filter(|(id, _)| req_ids.contains(id))
1839            .map(|(k, v)| (k.clone(), v.clone()))
1840            .collect();
1841        let contract_ids: HashSet<Bytes> = state_sync
1842            .component_tracker
1843            .get_contracts_by_component(&req_ids)
1844            .into_iter()
1845            .collect();
1846        let params = FetchSnapshotParams {
1847            chain: Chain::Ethereum,
1848            protocol_system: "uniswap-v2".to_string(),
1849            block_number: header.number,
1850            uses_dci: false,
1851            retrieve_balances: false,
1852            include_tvl: true,
1853        };
1854        let (snapshot, _, _) =
1855            fetch_snapshot(&state_sync.rpc_client, components, contract_ids, &params)
1856                .await
1857                .expect("Retrieving snapshot failed");
1858        let snap = StateSyncMessage {
1859            header: header.clone(),
1860            snapshots: snapshot,
1861            deltas: None,
1862            removed_components: Default::default(),
1863        };
1864
1865        assert_eq!(snap, exp);
1866    }
1867
1868    /// Test that get_snapshots only fetches snapshots for requested components,
1869    /// not all tracked components. This prevents returning full snapshots repeatedly
1870    /// when only a subset of components need updates.
1871    #[test_log::test(tokio::test)]
1872    async fn test_get_snapshots_filters_to_requested_components_only() {
1873        let header = BlockHeader::default();
1874        let mut rpc = make_mock_client();
1875
1876        // Create three components
1877        let component1 = ProtocolComponent { id: "Component1".to_string(), ..Default::default() };
1878        let component2 = ProtocolComponent { id: "Component2".to_string(), ..Default::default() };
1879        let component3 = ProtocolComponent { id: "Component3".to_string(), ..Default::default() };
1880
1881        let component2_clone = component2.clone();
1882
1883        // Mock the RPC call and verify it only receives Component2
1884        rpc.expect_get_snapshots()
1885            .withf(
1886                |request: &SnapshotParameters,
1887                 _chunk_size: &Option<usize>,
1888                 _concurrency: &usize| {
1889                    // Verify that the request contains ONLY Component2, not all tracked components
1890                    request.components.len() == 1 &&
1891                        request
1892                            .components
1893                            .contains_key("Component2")
1894                },
1895            )
1896            .times(1)
1897            .returning(move |_request, _chunk_size, _concurrency| {
1898                Ok(Snapshot {
1899                    states: [(
1900                        "Component2".to_string(),
1901                        ComponentWithState {
1902                            state: ProtocolComponentState {
1903                                component_id: "Component2".to_string(),
1904                                attributes: HashMap::new(),
1905                                balances: HashMap::new(),
1906                            },
1907                            component: component2_clone.clone(),
1908                            entrypoints: vec![],
1909                            component_tvl: None,
1910                        },
1911                    )]
1912                    .into_iter()
1913                    .collect(),
1914                    vm_storage: HashMap::new(),
1915                })
1916            });
1917
1918        let mut state_sync = with_mocked_clients(true, false, Some(rpc), None);
1919
1920        // Track all three components
1921        state_sync
1922            .component_tracker
1923            .components
1924            .insert("Component1".to_string(), component1.clone());
1925        state_sync
1926            .component_tracker
1927            .components
1928            .insert("Component2".to_string(), component2.clone());
1929        state_sync
1930            .component_tracker
1931            .components
1932            .insert("Component3".to_string(), component3.clone());
1933
1934        // Request snapshot for ONLY Component2
1935        let components_arg = ["Component2".to_string()];
1936        let req_ids: Vec<String> = components_arg.to_vec();
1937        let components: HashMap<_, _> = state_sync
1938            .component_tracker
1939            .components
1940            .iter()
1941            .filter(|(id, _)| req_ids.contains(id))
1942            .map(|(k, v)| (k.clone(), v.clone()))
1943            .collect();
1944        let contract_ids: HashSet<Bytes> = state_sync
1945            .component_tracker
1946            .get_contracts_by_component(&req_ids)
1947            .into_iter()
1948            .collect();
1949        let params = FetchSnapshotParams {
1950            chain: Chain::Ethereum,
1951            protocol_system: "uniswap-v2".to_string(),
1952            block_number: header.number,
1953            uses_dci: false,
1954            retrieve_balances: true,
1955            include_tvl: false,
1956        };
1957        let (snapshot, _, _) =
1958            fetch_snapshot(&state_sync.rpc_client, components, contract_ids, &params)
1959                .await
1960                .expect("Retrieving snapshot failed");
1961
1962        // Verify we only got Component2 back
1963        assert_eq!(snapshot.states.len(), 1);
1964        assert!(snapshot
1965            .states
1966            .contains_key("Component2"));
1967        assert!(!snapshot
1968            .states
1969            .contains_key("Component1"));
1970        assert!(!snapshot
1971            .states
1972            .contains_key("Component3"));
1973    }
1974
1975    fn mock_clients_for_state_sync(
1976        bg_done: Option<Arc<tokio::sync::Notify>>,
1977    ) -> (MockRPCClient, MockDeltasClient, Sender<BlockAggregatedChanges>) {
1978        let mut rpc_client = make_mock_client();
1979        // Mocks for the start_tracking call, these need to come first because they are more
1980        // specific, see: https://docs.rs/mockall/latest/mockall/#matching-multiple-calls
1981        rpc_client
1982            .expect_get_protocol_components()
1983            .withf(|params: &crate::rpc::ProtocolComponentsParams| {
1984                params
1985                    .component_ids()
1986                    .is_some_and(|ids| ids.contains(&"Component3".to_string()))
1987            })
1988            .returning(|_| {
1989                // return Component3
1990                Ok(Page::new(
1991                    vec![
1992                        // this component shall have a tvl update above threshold
1993                        ProtocolComponent { id: "Component3".to_string(), ..Default::default() },
1994                    ],
1995                    1,
1996                    0,
1997                    100,
1998                ))
1999            });
2000        // Mock get_snapshots for Component3
2001        rpc_client
2002            .expect_get_snapshots()
2003            .withf(
2004                |request: &SnapshotParameters,
2005                 _chunk_size: &Option<usize>,
2006                 _concurrency: &usize| {
2007                    request
2008                        .components
2009                        .contains_key("Component3")
2010                },
2011            )
2012            .returning(move |_request, _chunk_size, _concurrency| {
2013                let snap = Ok(Snapshot {
2014                    states: [(
2015                        "Component3".to_string(),
2016                        ComponentWithState {
2017                            state: ProtocolComponentState::new(
2018                                "Component3",
2019                                Default::default(),
2020                                Default::default(),
2021                            ),
2022                            component: ProtocolComponent {
2023                                id: "Component3".to_string(),
2024                                ..Default::default()
2025                            },
2026                            component_tvl: Some(1000.0),
2027                            entrypoints: vec![],
2028                        },
2029                    )]
2030                    .into_iter()
2031                    .collect(),
2032                    vm_storage: HashMap::new(),
2033                });
2034                if let Some(n) = &bg_done {
2035                    n.notify_one();
2036                }
2037                snap
2038            });
2039
2040        // mock calls for the initial state snapshots
2041        rpc_client
2042            .expect_get_protocol_components()
2043            .returning(|_| {
2044                // Initial sync of components
2045                Ok(Page::new(
2046                    vec![
2047                        // this component shall have a tvl update above threshold
2048                        ProtocolComponent { id: "Component1".to_string(), ..Default::default() },
2049                        // this component shall have a tvl update below threshold.
2050                        ProtocolComponent { id: "Component2".to_string(), ..Default::default() },
2051                        // a third component will have a tvl update above threshold
2052                    ],
2053                    2,
2054                    0,
2055                    100,
2056                ))
2057            });
2058
2059        rpc_client
2060            .expect_get_snapshots()
2061            .returning(|_request, _chunk_size, _concurrency| {
2062                Ok(Snapshot {
2063                    states: [
2064                        (
2065                            "Component1".to_string(),
2066                            ComponentWithState {
2067                                state: ProtocolComponentState::new(
2068                                    "Component1",
2069                                    Default::default(),
2070                                    Default::default(),
2071                                ),
2072                                component: ProtocolComponent {
2073                                    id: "Component1".to_string(),
2074                                    ..Default::default()
2075                                },
2076                                component_tvl: Some(100.0),
2077                                entrypoints: vec![],
2078                            },
2079                        ),
2080                        (
2081                            "Component2".to_string(),
2082                            ComponentWithState {
2083                                state: ProtocolComponentState::new(
2084                                    "Component2",
2085                                    Default::default(),
2086                                    Default::default(),
2087                                ),
2088                                component: ProtocolComponent {
2089                                    id: "Component2".to_string(),
2090                                    ..Default::default()
2091                                },
2092                                component_tvl: Some(0.0),
2093                                entrypoints: vec![],
2094                            },
2095                        ),
2096                    ]
2097                    .into_iter()
2098                    .collect(),
2099                    vm_storage: HashMap::new(),
2100                })
2101            });
2102
2103        // Mock get_traced_entry_points for Ethereum chain
2104        rpc_client
2105            .expect_get_traced_entry_points()
2106            .returning(|_| Ok(Page::new(HashMap::new(), 0, 0, 0)));
2107
2108        // Mock deltas client and messages
2109        let mut deltas_client = MockDeltasClient::new();
2110        let (tx, rx) = channel(1);
2111        deltas_client
2112            .expect_subscribe()
2113            .return_once(move |_, _| {
2114                // Return subscriber id and a channel
2115                Ok((Uuid::default(), rx))
2116            });
2117
2118        // Expect unsubscribe call during cleanup
2119        deltas_client
2120            .expect_unsubscribe()
2121            .return_once(|_| Ok(()));
2122
2123        (rpc_client, deltas_client, tx)
2124    }
2125
2126    /// Test strategy
2127    ///
2128    /// - initial snapshot retrieval returns component1 and component2 as snapshots
2129    /// - block 1: DCI update for Component1; no new components
2130    /// - block 2: Component3 TVL crosses threshold → background snapshot task spawned; snapshot
2131    ///   appears in block 3 after the background task drains
2132    /// - block 3: empty block; drain produces Component3 snapshot
2133    #[test_log::test(tokio::test)]
2134    async fn test_state_sync() {
2135        let bg_done = Arc::new(tokio::sync::Notify::new());
2136        let (rpc_client, deltas_client, tx) = mock_clients_for_state_sync(Some(bg_done.clone()));
2137        let deltas = [
2138            BlockAggregatedChanges {
2139                extractor: "uniswap-v2".to_string(),
2140                chain: Chain::Ethereum,
2141                block: Block {
2142                    number: 1,
2143                    hash: Bytes::from("0x01"),
2144                    parent_hash: Bytes::from("0x00"),
2145                    chain: Chain::Ethereum,
2146                    ts: Default::default(),
2147                },
2148                revert: false,
2149                dci_update: DCIUpdate {
2150                    new_entrypoints: HashMap::from([(
2151                        "Component1".to_string(),
2152                        HashSet::from([EntryPoint {
2153                            external_id: "entrypoint_a".to_string(),
2154                            target: Bytes::from("0x0badc0ffee"),
2155                            signature: "sig()".to_string(),
2156                        }]),
2157                    )]),
2158                    new_entrypoint_params: HashMap::from([(
2159                        "entrypoint_a".to_string(),
2160                        HashSet::from([(
2161                            TracingParams::RPCTracer(RPCTracerParams {
2162                                caller: Some(Bytes::from("0x0badc0ffee")),
2163                                calldata: Bytes::from("0x0badc0ffee"),
2164                                state_overrides: None,
2165                                prune_addresses: None,
2166                            }),
2167                            "Component1".to_string(),
2168                        )]),
2169                    )]),
2170                    trace_results: HashMap::from([(
2171                        "entrypoint_a".to_string(),
2172                        TracingResult {
2173                            retriggers: HashSet::from([(
2174                                Bytes::from("0x0badc0ffee"),
2175                                AddressStorageLocation::new(Bytes::from("0x0badc0ffee"), 12),
2176                            )]),
2177                            accessed_slots: HashMap::from([(
2178                                Bytes::from("0x0badc0ffee"),
2179                                HashSet::from([Bytes::from("0xbadbeef0")]),
2180                            )]),
2181                        },
2182                    )]),
2183                },
2184                ..Default::default()
2185            },
2186            BlockAggregatedChanges {
2187                extractor: "uniswap-v2".to_string(),
2188                chain: Chain::Ethereum,
2189                block: Block {
2190                    number: 2,
2191                    hash: Bytes::from("0x02"),
2192                    parent_hash: Bytes::from("0x01"),
2193                    chain: Chain::Ethereum,
2194                    ts: Default::default(),
2195                },
2196                revert: false,
2197                component_tvl: [
2198                    ("Component1".to_string(), 100.0),
2199                    ("Component2".to_string(), 0.0),
2200                    ("Component3".to_string(), 1000.0),
2201                ]
2202                .into_iter()
2203                .collect(),
2204                ..Default::default()
2205            },
2206            // Block 3: empty block; the background task for Component3 should have completed,
2207            // so drain_completed_snapshots returns the Component3 snapshot.
2208            BlockAggregatedChanges {
2209                extractor: "uniswap-v2".to_string(),
2210                chain: Chain::Ethereum,
2211                block: Block {
2212                    number: 3,
2213                    hash: Bytes::from("0x03"),
2214                    parent_hash: Bytes::from("0x02"),
2215                    chain: Chain::Ethereum,
2216                    ts: Default::default(),
2217                },
2218                revert: false,
2219                ..Default::default()
2220            },
2221        ];
2222        let mut state_sync = with_mocked_clients(true, true, Some(rpc_client), Some(deltas_client));
2223        state_sync
2224            .initialize()
2225            .await
2226            .expect("Init failed");
2227
2228        // Test starts here
2229        let (handle, mut rx) = state_sync.start().await;
2230        let (jh, close_tx) = handle.split();
2231        tx.send(deltas[0].clone())
2232            .await
2233            .expect("deltas channel msg 0 closed!");
2234        let first_msg = timeout(Duration::from_millis(200), rx.recv())
2235            .await
2236            .expect("waiting for first state msg timed out!")
2237            .expect("state sync block sender closed!");
2238        tx.send(deltas[1].clone())
2239            .await
2240            .expect("deltas channel msg 1 closed!");
2241        let second_msg = timeout(Duration::from_millis(200), rx.recv())
2242            .await
2243            .expect("waiting for second state msg timed out!")
2244            .expect("state sync block sender closed!");
2245        // Wait for the background snapshot task to complete before sending block 3.
2246        bg_done.notified().await;
2247        tx.send(deltas[2].clone())
2248            .await
2249            .expect("deltas channel msg 2 closed!");
2250        let third_msg = timeout(Duration::from_millis(200), rx.recv())
2251            .await
2252            .expect("waiting for third state msg timed out!")
2253            .expect("state sync block sender closed!");
2254        let _ = close_tx.send(());
2255        jh.await
2256            .expect("state sync task panicked!");
2257
2258        // assertions
2259        let exp1 = StateSyncMessage {
2260            header: BlockHeader {
2261                number: 1,
2262                hash: Bytes::from("0x01"),
2263                parent_hash: Bytes::from("0x00"),
2264                revert: false,
2265                ..Default::default()
2266            },
2267            snapshots: Snapshot {
2268                states: [
2269                    (
2270                        "Component1".to_string(),
2271                        ComponentWithState {
2272                            state: ProtocolComponentState::new(
2273                                "Component1",
2274                                Default::default(),
2275                                Default::default(),
2276                            ),
2277                            component: ProtocolComponent {
2278                                id: "Component1".to_string(),
2279                                ..Default::default()
2280                            },
2281                            component_tvl: Some(100.0),
2282                            entrypoints: vec![],
2283                        },
2284                    ),
2285                    (
2286                        "Component2".to_string(),
2287                        ComponentWithState {
2288                            state: ProtocolComponentState::new(
2289                                "Component2",
2290                                Default::default(),
2291                                Default::default(),
2292                            ),
2293                            component: ProtocolComponent {
2294                                id: "Component2".to_string(),
2295                                ..Default::default()
2296                            },
2297                            component_tvl: Some(0.0),
2298                            entrypoints: vec![],
2299                        },
2300                    ),
2301                ]
2302                .into_iter()
2303                .collect(),
2304                vm_storage: HashMap::new(),
2305            },
2306            deltas: Some(deltas[0].clone()),
2307            removed_components: Default::default(),
2308        };
2309
2310        // Block 2: Component3 snapshot task is spawned in the background. Component3 is not
2311        // yet tracked, so it is filtered from component_tvl. Snapshot is empty.
2312        let exp2 = StateSyncMessage {
2313            header: BlockHeader {
2314                number: 2,
2315                hash: Bytes::from("0x02"),
2316                parent_hash: Bytes::from("0x01"),
2317                revert: false,
2318                ..Default::default()
2319            },
2320            snapshots: Snapshot::default(),
2321            deltas: Some(BlockAggregatedChanges {
2322                extractor: "uniswap-v2".to_string(),
2323                chain: Chain::Ethereum,
2324                block: Block {
2325                    number: 2,
2326                    hash: Bytes::from("0x02"),
2327                    parent_hash: Bytes::from("0x01"),
2328                    chain: Chain::Ethereum,
2329                    ts: Default::default(),
2330                },
2331                revert: false,
2332                component_tvl: [
2333                    // Component2 removed (tvl=0), Component3 not yet tracked → filtered out.
2334                    ("Component1".to_string(), 100.0),
2335                ]
2336                .into_iter()
2337                .collect(),
2338                ..Default::default()
2339            }),
2340            // "Component2" was removed, because its tvl changed to 0.
2341            removed_components: [(
2342                "Component2".to_string(),
2343                ProtocolComponent { id: "Component2".to_string(), ..Default::default() },
2344            )]
2345            .into_iter()
2346            .collect(),
2347        };
2348
2349        // Block 3: background task has completed; Component3 snapshot is drained and included.
2350        let exp3 = StateSyncMessage {
2351            header: BlockHeader {
2352                number: 3,
2353                hash: Bytes::from("0x03"),
2354                parent_hash: Bytes::from("0x02"),
2355                revert: false,
2356                ..Default::default()
2357            },
2358            snapshots: Snapshot {
2359                states: [(
2360                    "Component3".to_string(),
2361                    ComponentWithState {
2362                        state: ProtocolComponentState::new(
2363                            "Component3",
2364                            Default::default(),
2365                            Default::default(),
2366                        ),
2367                        component: ProtocolComponent {
2368                            id: "Component3".to_string(),
2369                            ..Default::default()
2370                        },
2371                        component_tvl: Some(1000.0),
2372                        entrypoints: vec![],
2373                    },
2374                )]
2375                .into_iter()
2376                .collect(),
2377                vm_storage: HashMap::new(),
2378            },
2379            deltas: Some(deltas[2].clone()),
2380            removed_components: Default::default(),
2381        };
2382        assert_eq!(first_msg.unwrap(), exp1);
2383        assert_eq!(second_msg.unwrap(), exp2);
2384        assert_eq!(third_msg.unwrap(), exp3);
2385    }
2386
2387    #[test_log::test(tokio::test)]
2388    async fn test_state_sync_with_tvl_range() {
2389        let remove_tvl_threshold = 5.0;
2390        let add_tvl_threshold = 7.0;
2391        let bg_done = Arc::new(tokio::sync::Notify::new());
2392
2393        let mut rpc_client = make_mock_client();
2394        let mut deltas_client = MockDeltasClient::new();
2395
2396        rpc_client
2397            .expect_get_protocol_components()
2398            .withf(|params: &crate::rpc::ProtocolComponentsParams| {
2399                params
2400                    .component_ids()
2401                    .is_some_and(|ids| ids.contains(&"Component3".to_string()))
2402            })
2403            .returning(|_| {
2404                Ok(Page::new(
2405                    vec![ProtocolComponent { id: "Component3".to_string(), ..Default::default() }],
2406                    1,
2407                    0,
2408                    100,
2409                ))
2410            });
2411        // Mock get_snapshots for Component3
2412        let bg_done_clone = bg_done.clone();
2413        rpc_client
2414            .expect_get_snapshots()
2415            .withf(
2416                |request: &SnapshotParameters,
2417                 _chunk_size: &Option<usize>,
2418                 _concurrency: &usize| {
2419                    request
2420                        .components
2421                        .contains_key("Component3")
2422                },
2423            )
2424            .returning(move |_request, _chunk_size, _concurrency| {
2425                let snap = Ok(Snapshot {
2426                    states: [(
2427                        "Component3".to_string(),
2428                        ComponentWithState {
2429                            state: ProtocolComponentState::new(
2430                                "Component3",
2431                                Default::default(),
2432                                Default::default(),
2433                            ),
2434                            component: ProtocolComponent {
2435                                id: "Component3".to_string(),
2436                                ..Default::default()
2437                            },
2438                            component_tvl: Some(10.0),
2439                            entrypoints: vec![],
2440                        },
2441                    )]
2442                    .into_iter()
2443                    .collect(),
2444                    vm_storage: HashMap::new(),
2445                });
2446                bg_done_clone.notify_one();
2447                snap
2448            });
2449
2450        // Mock for the initial snapshot retrieval
2451        rpc_client
2452            .expect_get_protocol_components()
2453            .returning(|_| {
2454                Ok(Page::new(
2455                    vec![
2456                        ProtocolComponent { id: "Component1".to_string(), ..Default::default() },
2457                        ProtocolComponent { id: "Component2".to_string(), ..Default::default() },
2458                    ],
2459                    2,
2460                    0,
2461                    100,
2462                ))
2463            });
2464
2465        // Mock get_snapshots for initial snapshot
2466        rpc_client
2467            .expect_get_snapshots()
2468            .returning(|_request, _chunk_size, _concurrency| {
2469                Ok(Snapshot {
2470                    states: [
2471                        (
2472                            "Component1".to_string(),
2473                            ComponentWithState {
2474                                state: ProtocolComponentState::new(
2475                                    "Component1",
2476                                    Default::default(),
2477                                    Default::default(),
2478                                ),
2479                                component: ProtocolComponent {
2480                                    id: "Component1".to_string(),
2481                                    ..Default::default()
2482                                },
2483                                component_tvl: Some(6.0),
2484                                entrypoints: vec![],
2485                            },
2486                        ),
2487                        (
2488                            "Component2".to_string(),
2489                            ComponentWithState {
2490                                state: ProtocolComponentState::new(
2491                                    "Component2",
2492                                    Default::default(),
2493                                    Default::default(),
2494                                ),
2495                                component: ProtocolComponent {
2496                                    id: "Component2".to_string(),
2497                                    ..Default::default()
2498                                },
2499                                component_tvl: Some(2.0),
2500                                entrypoints: vec![],
2501                            },
2502                        ),
2503                    ]
2504                    .into_iter()
2505                    .collect(),
2506                    vm_storage: HashMap::new(),
2507                })
2508            });
2509
2510        // Mock get_traced_entry_points for Ethereum chain
2511        rpc_client
2512            .expect_get_traced_entry_points()
2513            .returning(|_| Ok(Page::new(HashMap::new(), 0, 0, 0)));
2514
2515        let (tx, rx) = channel(1);
2516        deltas_client
2517            .expect_subscribe()
2518            .return_once(move |_, _| Ok((Uuid::default(), rx)));
2519
2520        // Expect unsubscribe call during cleanup
2521        deltas_client
2522            .expect_unsubscribe()
2523            .return_once(|_| Ok(()));
2524
2525        let mut state_sync = ProtocolStateSynchronizer::new(
2526            ExtractorIdentity::new(Chain::Ethereum, "uniswap-v2"),
2527            true,
2528            ComponentFilter::with_tvl_range(remove_tvl_threshold, add_tvl_threshold),
2529            1,
2530            Duration::from_secs(0),
2531            true,
2532            true,
2533            true,
2534            ArcRPCClient(Arc::new(rpc_client)),
2535            ArcDeltasClient(Arc::new(deltas_client)),
2536            10_u64,
2537        );
2538        state_sync
2539            .initialize()
2540            .await
2541            .expect("Init failed");
2542
2543        // Simulate the incoming BlockAggregatedChanges
2544        let deltas = [
2545            BlockAggregatedChanges {
2546                extractor: "uniswap-v2".to_string(),
2547                chain: Chain::Ethereum,
2548                block: Block {
2549                    number: 1,
2550                    hash: Bytes::from("0x01"),
2551                    parent_hash: Bytes::from("0x00"),
2552                    chain: Chain::Ethereum,
2553                    ts: Default::default(),
2554                },
2555                revert: false,
2556                ..Default::default()
2557            },
2558            BlockAggregatedChanges {
2559                extractor: "uniswap-v2".to_string(),
2560                chain: Chain::Ethereum,
2561                block: Block {
2562                    number: 2,
2563                    hash: Bytes::from("0x02"),
2564                    parent_hash: Bytes::from("0x01"),
2565                    chain: Chain::Ethereum,
2566                    ts: Default::default(),
2567                },
2568                revert: false,
2569                component_tvl: [
2570                    ("Component1".to_string(), 6.0), // Within range, should not trigger changes
2571                    ("Component2".to_string(), 2.0), // Below lower threshold, should be removed
2572                    ("Component3".to_string(), 10.0), // Above upper threshold, should be added
2573                ]
2574                .into_iter()
2575                .collect(),
2576                ..Default::default()
2577            },
2578            // Block 3: empty; background task for Component3 should have completed.
2579            BlockAggregatedChanges {
2580                extractor: "uniswap-v2".to_string(),
2581                chain: Chain::Ethereum,
2582                block: Block {
2583                    number: 3,
2584                    hash: Bytes::from("0x03"),
2585                    parent_hash: Bytes::from("0x02"),
2586                    chain: Chain::Ethereum,
2587                    ts: Default::default(),
2588                },
2589                revert: false,
2590                ..Default::default()
2591            },
2592        ];
2593
2594        let (handle, mut rx) = state_sync.start().await;
2595        let (jh, close_tx) = handle.split();
2596
2597        // Simulate sending delta messages
2598        tx.send(deltas[0].clone())
2599            .await
2600            .expect("deltas channel msg 0 closed!");
2601
2602        // Expecting to receive the initial state message
2603        let _ = timeout(Duration::from_millis(200), rx.recv())
2604            .await
2605            .expect("waiting for first state msg timed out!")
2606            .expect("state sync block sender closed!");
2607
2608        // Send the second message, which should trigger TVL-based changes.
2609        // Component3 snapshot is deferred to background; not in this block's message.
2610        tx.send(deltas[1].clone())
2611            .await
2612            .expect("deltas channel msg 1 closed!");
2613        let second_msg = timeout(Duration::from_millis(200), rx.recv())
2614            .await
2615            .expect("waiting for second state msg timed out!")
2616            .expect("state sync block sender closed!")
2617            .expect("no error");
2618
2619        // Wait for the background snapshot task to complete before sending block 3.
2620        bg_done.notified().await;
2621
2622        tx.send(deltas[2].clone())
2623            .await
2624            .expect("deltas channel msg 2 closed!");
2625        let third_msg = timeout(Duration::from_millis(200), rx.recv())
2626            .await
2627            .expect("waiting for third state msg timed out!")
2628            .expect("state sync block sender closed!")
2629            .expect("no error");
2630
2631        let _ = close_tx.send(());
2632        jh.await
2633            .expect("state sync task panicked!");
2634
2635        // Block 2: Component3 task spawned; snapshot is empty, Component3 filtered from deltas.
2636        let expected_second_msg = StateSyncMessage {
2637            header: BlockHeader {
2638                number: 2,
2639                hash: Bytes::from("0x02"),
2640                parent_hash: Bytes::from("0x01"),
2641                revert: false,
2642                ..Default::default()
2643            },
2644            snapshots: Snapshot::default(),
2645            deltas: Some(BlockAggregatedChanges {
2646                extractor: "uniswap-v2".to_string(),
2647                chain: Chain::Ethereum,
2648                block: Block {
2649                    number: 2,
2650                    hash: Bytes::from("0x02"),
2651                    parent_hash: Bytes::from("0x01"),
2652                    chain: Chain::Ethereum,
2653                    ts: Default::default(),
2654                },
2655                revert: false,
2656                component_tvl: [
2657                    ("Component1".to_string(), 6.0), /* Within range, should not trigger changes
2658                                                      * Component3 not yet tracked → filtered
2659                                                      * out */
2660                ]
2661                .into_iter()
2662                .collect(),
2663                ..Default::default()
2664            }),
2665            removed_components: [(
2666                "Component2".to_string(),
2667                ProtocolComponent { id: "Component2".to_string(), ..Default::default() },
2668            )]
2669            .into_iter()
2670            .collect(),
2671        };
2672
2673        // Block 3: background task drained; Component3 snapshot present.
2674        let expected_third_msg = StateSyncMessage {
2675            header: BlockHeader {
2676                number: 3,
2677                hash: Bytes::from("0x03"),
2678                parent_hash: Bytes::from("0x02"),
2679                revert: false,
2680                ..Default::default()
2681            },
2682            snapshots: Snapshot {
2683                states: [(
2684                    "Component3".to_string(),
2685                    ComponentWithState {
2686                        state: ProtocolComponentState::new(
2687                            "Component3",
2688                            Default::default(),
2689                            Default::default(),
2690                        ),
2691                        component: ProtocolComponent {
2692                            id: "Component3".to_string(),
2693                            ..Default::default()
2694                        },
2695                        component_tvl: Some(10.0),
2696                        entrypoints: vec![],
2697                    },
2698                )]
2699                .into_iter()
2700                .collect(),
2701                vm_storage: HashMap::new(),
2702            },
2703            deltas: Some(deltas[2].clone()),
2704            removed_components: Default::default(),
2705        };
2706
2707        assert_eq!(second_msg, expected_second_msg);
2708        assert_eq!(third_msg, expected_third_msg);
2709    }
2710
2711    #[test_log::test(tokio::test)]
2712    async fn test_public_close_api_functionality() {
2713        // Tests the public close() API through the StateSynchronizer trait:
2714        // - close() fails before start() is called
2715        // - close() succeeds while synchronizer is running
2716        // - close() fails after already closed
2717        // This tests the full start/close lifecycle via the public API
2718
2719        let mut rpc_client = make_mock_client();
2720        let mut deltas_client = MockDeltasClient::new();
2721
2722        // Mock the initial components call
2723        rpc_client
2724            .expect_get_protocol_components()
2725            .returning(|_| Ok(Page::new(vec![], 0, 0, 0)));
2726
2727        // Set up deltas client that will wait for messages (blocking in state_sync)
2728        let (_tx, rx) = channel(1);
2729        deltas_client
2730            .expect_subscribe()
2731            .return_once(move |_, _| Ok((Uuid::default(), rx)));
2732
2733        // Expect unsubscribe call during cleanup
2734        deltas_client
2735            .expect_unsubscribe()
2736            .return_once(|_| Ok(()));
2737
2738        let mut state_sync = ProtocolStateSynchronizer::new(
2739            ExtractorIdentity::new(Chain::Ethereum, "test-protocol"),
2740            true,
2741            ComponentFilter::with_tvl_range(0.0, 0.0),
2742            5, // Enough retries
2743            Duration::from_secs(0),
2744            true,
2745            false,
2746            true,
2747            ArcRPCClient(Arc::new(rpc_client)),
2748            ArcDeltasClient(Arc::new(deltas_client)),
2749            10000_u64, // Long timeout so task doesn't exit on its own
2750        );
2751
2752        state_sync
2753            .initialize()
2754            .await
2755            .expect("Init should succeed");
2756
2757        // Start the synchronizer and test the new split-based close mechanism
2758        let (handle, _rx) = state_sync.start().await;
2759        let (jh, close_tx) = handle.split();
2760
2761        // Give it time to start up and enter state_sync
2762        tokio::time::sleep(Duration::from_millis(100)).await;
2763
2764        // Send close signal should succeed
2765        close_tx
2766            .send(())
2767            .expect("Should be able to send close signal");
2768        // Task should stop cleanly
2769        jh.await.expect("Task should not panic");
2770    }
2771
2772    #[test_log::test(tokio::test)]
2773    async fn test_cleanup_runs_when_state_sync_processing_errors() {
2774        // Tests that cleanup code runs when state_sync() errors during delta processing.
2775        // Specifically tests: RPC errors during snapshot retrieval cause proper cleanup.
2776        // Verifies: shared.last_synced_block reset + subscription unsubscribe on errors
2777
2778        let mut rpc_client = make_mock_client();
2779        let mut deltas_client = MockDeltasClient::new();
2780
2781        // Mock the initial components call
2782        rpc_client
2783            .expect_get_protocol_components()
2784            .returning(|_| Ok(Page::new(vec![], 0, 0, 0)));
2785
2786        // Mock to fail during snapshot retrieval (this will cause an error during processing)
2787        rpc_client
2788            .expect_get_protocol_states()
2789            .returning(|_| {
2790                Err(RPCError::ParseResponse("Test error during snapshot retrieval".to_string()))
2791            });
2792
2793        // Set up deltas client to send one message that will trigger snapshot retrieval
2794        let (tx, rx) = channel(10);
2795        deltas_client
2796            .expect_subscribe()
2797            .return_once(move |_, _| {
2798                // Send a delta message that will require a snapshot
2799                let delta = BlockAggregatedChanges {
2800                    extractor: "test".to_string(),
2801                    chain: Chain::Ethereum,
2802                    block: Block {
2803                        hash: Bytes::from("0x0123"),
2804                        number: 1,
2805                        parent_hash: Bytes::from("0x0000"),
2806                        chain: Chain::Ethereum,
2807                        ts: chrono::DateTime::from_timestamp(1234567890, 0)
2808                            .unwrap()
2809                            .naive_utc(),
2810                    },
2811                    revert: false,
2812                    // Add a new component to trigger snapshot request
2813                    new_protocol_components: [(
2814                        "new_component".to_string(),
2815                        ProtocolComponent { id: "new_component".to_string(), ..Default::default() },
2816                    )]
2817                    .into_iter()
2818                    .collect(),
2819                    component_tvl: [("new_component".to_string(), 100.0)]
2820                        .into_iter()
2821                        .collect(),
2822                    ..Default::default()
2823                };
2824
2825                tokio::spawn(async move {
2826                    let _ = tx.send(delta).await;
2827                    // Close the channel after sending one message
2828                });
2829
2830                Ok((Uuid::default(), rx))
2831            });
2832
2833        // Expect unsubscribe call during cleanup
2834        deltas_client
2835            .expect_unsubscribe()
2836            .return_once(|_| Ok(()));
2837
2838        let mut state_sync = ProtocolStateSynchronizer::new(
2839            ExtractorIdentity::new(Chain::Ethereum, "test-protocol"),
2840            true,
2841            ComponentFilter::with_tvl_range(0.0, 1000.0), // Include the component
2842            1,
2843            Duration::from_secs(0),
2844            true,
2845            false,
2846            true,
2847            ArcRPCClient(Arc::new(rpc_client)),
2848            ArcDeltasClient(Arc::new(deltas_client)),
2849            5000_u64,
2850        );
2851
2852        state_sync
2853            .initialize()
2854            .await
2855            .expect("Init should succeed");
2856
2857        // Before calling state_sync, set a value in last_synced_block
2858        state_sync.last_synced_block = Some(BlockHeader {
2859            hash: Bytes::from("0x0badc0ffee"),
2860            number: 42,
2861            parent_hash: Bytes::from("0xbadbeef0"),
2862            revert: false,
2863            timestamp: 123456789,
2864            partial_block_index: None,
2865        });
2866
2867        // Create a channel for state_sync to send messages to
2868        let (mut block_tx, _block_rx) = channel(10);
2869
2870        // Call state_sync directly - this should error during processing
2871        let (_end_tx, end_rx) = oneshot::channel::<()>();
2872        let result = state_sync
2873            .state_sync(&mut block_tx, end_rx)
2874            .await;
2875        // Verify that state_sync returned an error
2876        assert!(result.is_err(), "state_sync should have errored during processing");
2877
2878        // Note: We can't verify internal state cleanup since state_sync consumes self,
2879        // but the cleanup logic is still tested by the fact that the method returns properly.
2880    }
2881
2882    #[test_log::test(tokio::test)]
2883    async fn test_close_signal_while_waiting_for_first_deltas() {
2884        // Tests close signal handling during the initial "waiting for deltas" phase.
2885        // This is the earliest possible close scenario - before any deltas are received.
2886        // Verifies: close signal received while waiting for first message triggers cleanup
2887        let mut rpc_client = make_mock_client();
2888        let mut deltas_client = MockDeltasClient::new();
2889
2890        rpc_client
2891            .expect_get_protocol_components()
2892            .returning(|_| Ok(Page::new(vec![], 0, 0, 0)));
2893
2894        let (_tx, rx) = channel(1);
2895        deltas_client
2896            .expect_subscribe()
2897            .return_once(move |_, _| Ok((Uuid::default(), rx)));
2898
2899        deltas_client
2900            .expect_unsubscribe()
2901            .return_once(|_| Ok(()));
2902
2903        let mut state_sync = ProtocolStateSynchronizer::new(
2904            ExtractorIdentity::new(Chain::Ethereum, "test-protocol"),
2905            true,
2906            ComponentFilter::with_tvl_range(0.0, 0.0),
2907            1,
2908            Duration::from_secs(0),
2909            true,
2910            true,
2911            false,
2912            ArcRPCClient(Arc::new(rpc_client)),
2913            ArcDeltasClient(Arc::new(deltas_client)),
2914            10000_u64,
2915        );
2916
2917        state_sync
2918            .initialize()
2919            .await
2920            .expect("Init should succeed");
2921
2922        let (mut block_tx, _block_rx) = channel(10);
2923        let (end_tx, end_rx) = oneshot::channel::<()>();
2924
2925        // Start state_sync in a task
2926        let state_sync_handle = tokio::spawn(async move {
2927            state_sync
2928                .state_sync(&mut block_tx, end_rx)
2929                .await
2930        });
2931
2932        // Give it a moment to start
2933        tokio::time::sleep(Duration::from_millis(100)).await;
2934
2935        // Send close signal
2936        let _ = end_tx.send(());
2937
2938        // state_sync should exit cleanly
2939        let result = state_sync_handle
2940            .await
2941            .expect("Task should not panic");
2942        assert!(result.is_ok(), "state_sync should exit cleanly when closed: {result:?}");
2943
2944        println!("SUCCESS: Close signal handled correctly while waiting for first deltas");
2945    }
2946
2947    #[test_log::test(tokio::test)]
2948    async fn test_close_signal_during_main_processing_loop() {
2949        // Tests close signal handling during the main delta processing loop.
2950        // This tests the scenario where first message is processed successfully,
2951        // then close signal is received while waiting for subsequent deltas.
2952        // Verifies: close signal in main loop (after initialization) triggers cleanup
2953
2954        let mut rpc_client = make_mock_client();
2955        let mut deltas_client = MockDeltasClient::new();
2956
2957        // Mock the initial components call
2958        rpc_client
2959            .expect_get_protocol_components()
2960            .returning(|_| Ok(Page::new(vec![], 0, 0, 0)));
2961
2962        // Mock the snapshot retrieval that happens after first message
2963        rpc_client
2964            .expect_get_protocol_states()
2965            .returning(|_| Ok(Page::new(vec![], 0, 0, 0)));
2966
2967        rpc_client
2968            .expect_get_component_tvl()
2969            .returning(|_| Ok(Page::new(HashMap::new(), 0, 0, 0)));
2970
2971        rpc_client
2972            .expect_get_traced_entry_points()
2973            .returning(|_| Ok(Page::new(HashMap::new(), 0, 0, 0)));
2974
2975        // Set up deltas client to send one message, then keep channel open
2976        let (tx, rx) = channel(10);
2977        deltas_client
2978            .expect_subscribe()
2979            .return_once(move |_, _| {
2980                // Send first message immediately
2981                let first_delta = BlockAggregatedChanges {
2982                    extractor: "test".to_string(),
2983                    chain: Chain::Ethereum,
2984                    block: Block {
2985                        hash: Bytes::from("0x0123"),
2986                        number: 1,
2987                        parent_hash: Bytes::from("0x0000"),
2988                        chain: Chain::Ethereum,
2989                        ts: chrono::DateTime::from_timestamp(1234567890, 0)
2990                            .unwrap()
2991                            .naive_utc(),
2992                    },
2993                    revert: false,
2994                    ..Default::default()
2995                };
2996
2997                tokio::spawn(async move {
2998                    let _ = tx.send(first_delta).await;
2999                    // Keep the sender alive but don't send more messages
3000                    // This will make the recv() block waiting for the next message
3001                    tokio::time::sleep(Duration::from_secs(30)).await;
3002                });
3003
3004                Ok((Uuid::default(), rx))
3005            });
3006
3007        deltas_client
3008            .expect_unsubscribe()
3009            .return_once(|_| Ok(()));
3010
3011        let mut state_sync = ProtocolStateSynchronizer::new(
3012            ExtractorIdentity::new(Chain::Ethereum, "test-protocol"),
3013            true,
3014            ComponentFilter::with_tvl_range(0.0, 1000.0),
3015            1,
3016            Duration::from_secs(0),
3017            true,
3018            false,
3019            true,
3020            ArcRPCClient(Arc::new(rpc_client)),
3021            ArcDeltasClient(Arc::new(deltas_client)),
3022            10000_u64,
3023        );
3024
3025        state_sync
3026            .initialize()
3027            .await
3028            .expect("Init should succeed");
3029
3030        let (mut block_tx, mut block_rx) = channel(10);
3031        let (end_tx, end_rx) = oneshot::channel::<()>();
3032
3033        // Start state_sync in a task
3034        let state_sync_handle = tokio::spawn(async move {
3035            state_sync
3036                .state_sync(&mut block_tx, end_rx)
3037                .await
3038        });
3039
3040        // Wait for the first message to be processed (snapshot sent)
3041        let first_snapshot = block_rx
3042            .recv()
3043            .await
3044            .expect("Should receive first snapshot")
3045            .expect("Synchronizer error");
3046        assert!(
3047            !first_snapshot
3048                .snapshots
3049                .states
3050                .is_empty() ||
3051                first_snapshot.deltas.is_some()
3052        );
3053        // Now send close signal - this should be handled in the main processing loop
3054        let _ = end_tx.send(());
3055
3056        // state_sync should exit cleanly after receiving close signal in main loop
3057        let result = state_sync_handle
3058            .await
3059            .expect("Task should not panic");
3060        assert!(
3061            result.is_ok(),
3062            "state_sync should exit cleanly when closed after first message: {result:?}"
3063        );
3064    }
3065
3066    #[test_log::test(tokio::test)]
3067    async fn test_max_retries_exceeded_error_propagation() {
3068        // Test that when max_retries is exceeded, the final error is sent through the channel
3069        // to the receiver and the synchronizer task exits cleanly
3070
3071        let mut rpc_client = make_mock_client();
3072        let mut deltas_client = MockDeltasClient::new();
3073
3074        // Mock the initial components call to succeed
3075        rpc_client
3076            .expect_get_protocol_components()
3077            .returning(|_| Ok(Page::new(vec![], 0, 0, 0)));
3078
3079        // Set up deltas client to consistently fail after subscription
3080        // This will cause connection errors and trigger retries
3081        deltas_client
3082            .expect_subscribe()
3083            .returning(|_, _| {
3084                // Return a connection error to trigger retries
3085                Err(DeltasError::NotConnected)
3086            });
3087
3088        // Expect multiple unsubscribe calls during retries
3089        deltas_client
3090            .expect_unsubscribe()
3091            .returning(|_| Ok(()))
3092            .times(0..=5);
3093
3094        // Create synchronizer with only 2 retries and short cooldown
3095        let mut state_sync = ProtocolStateSynchronizer::new(
3096            ExtractorIdentity::new(Chain::Ethereum, "test-protocol"),
3097            true,
3098            ComponentFilter::with_tvl_range(0.0, 1000.0),
3099            2,                         // max_retries = 2
3100            Duration::from_millis(10), // short retry cooldown
3101            true,
3102            false,
3103            true,
3104            ArcRPCClient(Arc::new(rpc_client)),
3105            ArcDeltasClient(Arc::new(deltas_client)),
3106            1000_u64,
3107        );
3108
3109        state_sync
3110            .initialize()
3111            .await
3112            .expect("Init should succeed");
3113
3114        // Start the synchronizer - it should fail to subscribe and retry
3115        let (handle, mut rx) = state_sync.start().await;
3116        let (jh, _close_tx) = handle.split();
3117
3118        let res = tokio::time::timeout(Duration::from_millis(100), rx.recv())
3119            .await
3120            .expect("responsds in time")
3121            .expect("channel open");
3122
3123        // Verify the error is a ConnectionClosed error (converted from DeltasError::NotConnected)
3124        if let Err(err) = res {
3125            assert!(
3126                matches!(err, SynchronizerError::ConnectionClosed),
3127                "Expected ConnectionClosed error, got: {:?}",
3128                err
3129            );
3130        } else {
3131            panic!("Expected an error")
3132        }
3133
3134        // The task should complete (not hang) after max retries
3135        let task_result = tokio::time::timeout(Duration::from_secs(2), jh).await;
3136        assert!(task_result.is_ok(), "Synchronizer task should complete after max retries");
3137    }
3138
3139    #[test_log::test(tokio::test)]
3140    async fn test_is_next_expected() {
3141        // Test the is_next_expected function to ensure it correctly identifies
3142        // when an incoming block is the expected next block in the chain
3143
3144        let mut state_sync = with_mocked_clients(true, false, None, None);
3145
3146        // Test 1: No previous block - should return false
3147        let incoming_header = BlockHeader {
3148            number: 100,
3149            hash: Bytes::from("0x0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"),
3150            parent_hash: Bytes::from(
3151                "0x0000000000000000000000000000000000000000000000000000000000000000",
3152            ),
3153            revert: false,
3154            timestamp: 123456789,
3155            partial_block_index: None,
3156        };
3157        assert!(
3158            !state_sync.is_next_expected(&incoming_header),
3159            "Should return false when no previous block is set"
3160        );
3161
3162        // Test 2: Set a previous block and test with matching parent hash
3163        let previous_header = BlockHeader {
3164            number: 99,
3165            hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000000"),
3166            parent_hash: Bytes::from(
3167                "0xabcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789",
3168            ),
3169            revert: false,
3170            timestamp: 123456788,
3171            partial_block_index: None,
3172        };
3173        state_sync.last_synced_block = Some(previous_header.clone());
3174
3175        assert!(
3176            state_sync.is_next_expected(&incoming_header),
3177            "Should return true when incoming parent_hash matches previous hash"
3178        );
3179
3180        // Test 3: Test with non-matching parent hash
3181        let non_matching_header = BlockHeader {
3182            number: 100,
3183            hash: Bytes::from("0x0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"),
3184            parent_hash: Bytes::from(
3185                "0x1111111111111111111111111111111111111111111111111111111111111111",
3186            ), // Wrong parent hash
3187            revert: false,
3188            timestamp: 123456789,
3189            partial_block_index: None,
3190        };
3191        assert!(
3192            !state_sync.is_next_expected(&non_matching_header),
3193            "Should return false when incoming parent_hash doesn't match previous hash"
3194        );
3195    }
3196
3197    #[test_log::test(tokio::test)]
3198    async fn test_synchronizer_restart_skip_snapshot_on_expected_block() {
3199        // Test that on synchronizer restart with the next expected block,
3200        // get_snapshot is not called and only deltas are sent
3201
3202        let mut rpc_client = make_mock_client();
3203        let mut deltas_client = MockDeltasClient::new();
3204
3205        // Mock the initial components call
3206        rpc_client
3207            .expect_get_protocol_components()
3208            .returning(|_| {
3209                Ok(Page::new(
3210                    vec![ProtocolComponent { id: "Component1".to_string(), ..Default::default() }],
3211                    1,
3212                    0,
3213                    100,
3214                ))
3215            });
3216
3217        // Set up deltas client to send a message that is the next expected block
3218        let (tx, rx) = channel(10);
3219        deltas_client
3220            .expect_subscribe()
3221            .return_once(move |_, _| {
3222                let expected_next_delta = BlockAggregatedChanges {
3223                    extractor: "uniswap-v2".to_string(),
3224                    chain: Chain::Ethereum,
3225                    block: Block {
3226                        hash: Bytes::from(
3227                            "0x0000000000000000000000000000000000000000000000000000000000000002",
3228                        ), // This will be the next expected block
3229                        number: 2,
3230                        parent_hash: Bytes::from(
3231                            "0x0000000000000000000000000000000000000000000000000000000000000001",
3232                        ), // This matches our last synced block hash
3233                        chain: Chain::Ethereum,
3234                        ts: chrono::DateTime::from_timestamp(1234567890, 0)
3235                            .unwrap()
3236                            .naive_utc(),
3237                    },
3238                    revert: false,
3239                    ..Default::default()
3240                };
3241
3242                tokio::spawn(async move {
3243                    let _ = tx.send(expected_next_delta).await;
3244                });
3245
3246                Ok((Uuid::default(), rx))
3247            });
3248
3249        deltas_client
3250            .expect_unsubscribe()
3251            .return_once(|_| Ok(()));
3252
3253        let mut state_sync = ProtocolStateSynchronizer::new(
3254            ExtractorIdentity::new(Chain::Ethereum, "uniswap-v2"),
3255            true,
3256            ComponentFilter::with_tvl_range(0.0, 1000.0),
3257            1,
3258            Duration::from_secs(0),
3259            true, // include_snapshots = true
3260            false,
3261            true,
3262            ArcRPCClient(Arc::new(rpc_client)),
3263            ArcDeltasClient(Arc::new(deltas_client)),
3264            10000_u64,
3265        );
3266
3267        // Initialize and set up the last synced block to simulate a restart scenario
3268        state_sync
3269            .initialize()
3270            .await
3271            .expect("Init should succeed");
3272
3273        // Set last_synced_block to simulate that we've previously synced block 1
3274        state_sync.last_synced_block = Some(BlockHeader {
3275            number: 1,
3276            hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000001"), /* This matches the parent_hash in our delta */
3277            parent_hash: Bytes::from(
3278                "0x0000000000000000000000000000000000000000000000000000000000000000",
3279            ),
3280            revert: false,
3281            timestamp: 123456789,
3282            partial_block_index: None,
3283        });
3284
3285        let (mut block_tx, mut block_rx) = channel(10);
3286        let (end_tx, end_rx) = oneshot::channel::<()>();
3287
3288        // Start state_sync
3289        let state_sync_handle = tokio::spawn(async move {
3290            state_sync
3291                .state_sync(&mut block_tx, end_rx)
3292                .await
3293        });
3294
3295        // Wait for the message - it should be a delta-only message (no snapshots)
3296        let result_msg = timeout(Duration::from_millis(200), block_rx.recv())
3297            .await
3298            .expect("Should receive message within timeout")
3299            .expect("Channel should be open")
3300            .expect("Should not be an error");
3301
3302        // Send close signal
3303        let _ = end_tx.send(());
3304
3305        // Wait for state_sync to finish
3306        let _ = state_sync_handle
3307            .await
3308            .expect("Task should not panic");
3309
3310        // Verify the message contains deltas but no snapshots
3311        // (because we skipped snapshot retrieval)
3312        assert!(result_msg.deltas.is_some(), "Should contain deltas");
3313        assert!(
3314            result_msg.snapshots.states.is_empty(),
3315            "Should not contain snapshots when next expected block is received"
3316        );
3317
3318        // Verify the block details match our expected next block
3319        if let Some(deltas) = &result_msg.deltas {
3320            assert_eq!(deltas.block.number, 2);
3321            assert_eq!(
3322                deltas.block.hash,
3323                Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000002")
3324            );
3325            assert_eq!(
3326                deltas.block.parent_hash,
3327                Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000001")
3328            );
3329        }
3330    }
3331
3332    #[test_log::test(tokio::test)]
3333    async fn test_skip_previously_processed_messages() {
3334        // Test that the synchronizer skips messages for blocks that have already been processed
3335        // This simulates a service restart scenario where old messages are re-emitted
3336
3337        let mut rpc_client = make_mock_client();
3338        let mut deltas_client = MockDeltasClient::new();
3339
3340        // Mock the initial components call
3341        rpc_client
3342            .expect_get_protocol_components()
3343            .returning(|_| {
3344                Ok(Page::new(
3345                    vec![ProtocolComponent { id: "Component1".to_string(), ..Default::default() }],
3346                    1,
3347                    0,
3348                    100,
3349                ))
3350            });
3351
3352        // Mock snapshot calls for when we process the expected next block (block 6)
3353        rpc_client
3354            .expect_get_protocol_states()
3355            .returning(|_| {
3356                Ok(Page::new(
3357                    vec![ProtocolComponentState::new(
3358                        "Component1",
3359                        Default::default(),
3360                        Default::default(),
3361                    )],
3362                    1,
3363                    0,
3364                    100,
3365                ))
3366            });
3367
3368        rpc_client
3369            .expect_get_component_tvl()
3370            .returning(|_| {
3371                Ok(Page::new(
3372                    [("Component1".to_string(), 100.0)]
3373                        .into_iter()
3374                        .collect(),
3375                    1,
3376                    0,
3377                    100,
3378                ))
3379            });
3380
3381        rpc_client
3382            .expect_get_traced_entry_points()
3383            .returning(|_| Ok(Page::new(HashMap::new(), 0, 0, 0)));
3384
3385        // Set up deltas client to send old messages first, then the expected next block
3386        let (tx, rx) = channel(10);
3387        deltas_client
3388            .expect_subscribe()
3389            .return_once(move |_, _| {
3390                // Send messages for blocks 3, 4, 5 (already processed), then block 6 (expected)
3391                let old_messages = vec![
3392                    BlockAggregatedChanges {
3393                        extractor: "uniswap-v2".to_string(),
3394                        chain: Chain::Ethereum,
3395                        block: Block {
3396                            hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000003"),
3397                            number: 3,
3398                            parent_hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000002"),
3399                            chain: Chain::Ethereum,
3400                            ts: chrono::DateTime::from_timestamp(1234567890, 0).unwrap().naive_utc(),
3401                        },
3402                        revert: false,
3403                        ..Default::default()
3404                    },
3405                    BlockAggregatedChanges {
3406                        extractor: "uniswap-v2".to_string(),
3407                        chain: Chain::Ethereum,
3408                        block: Block {
3409                            hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000004"),
3410                            number: 4,
3411                            parent_hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000003"),
3412                            chain: Chain::Ethereum,
3413                            ts: chrono::DateTime::from_timestamp(1234567891, 0).unwrap().naive_utc(),
3414                        },
3415                        revert: false,
3416                        ..Default::default()
3417                    },
3418                    BlockAggregatedChanges {
3419                        extractor: "uniswap-v2".to_string(),
3420                        chain: Chain::Ethereum,
3421                        block: Block {
3422                            hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000005"),
3423                            number: 5,
3424                            parent_hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000004"),
3425                            chain: Chain::Ethereum,
3426                            ts: chrono::DateTime::from_timestamp(1234567892, 0).unwrap().naive_utc(),
3427                        },
3428                        revert: false,
3429                        ..Default::default()
3430                    },
3431                    // This is the expected next block (block 6)
3432                    BlockAggregatedChanges {
3433                        extractor: "uniswap-v2".to_string(),
3434                        chain: Chain::Ethereum,
3435                        block: Block {
3436                            hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000006"),
3437                            number: 6,
3438                            parent_hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000005"),
3439                            chain: Chain::Ethereum,
3440                            ts: chrono::DateTime::from_timestamp(1234567893, 0).unwrap().naive_utc(),
3441                        },
3442                        revert: false,
3443                        ..Default::default()
3444                    },
3445                ];
3446
3447                tokio::spawn(async move {
3448                    for message in old_messages {
3449                        let _ = tx.send(message).await;
3450                        tokio::time::sleep(Duration::from_millis(10)).await;
3451                    }
3452                });
3453
3454                Ok((Uuid::default(), rx))
3455            });
3456
3457        deltas_client
3458            .expect_unsubscribe()
3459            .return_once(|_| Ok(()));
3460
3461        let mut state_sync = ProtocolStateSynchronizer::new(
3462            ExtractorIdentity::new(Chain::Ethereum, "uniswap-v2"),
3463            true,
3464            ComponentFilter::with_tvl_range(0.0, 1000.0),
3465            1,
3466            Duration::from_secs(0),
3467            true,
3468            true,
3469            true,
3470            ArcRPCClient(Arc::new(rpc_client)),
3471            ArcDeltasClient(Arc::new(deltas_client)),
3472            10000_u64,
3473        );
3474
3475        // Initialize and set last_synced_block to simulate we've already processed block 5
3476        state_sync
3477            .initialize()
3478            .await
3479            .expect("Init should succeed");
3480
3481        state_sync.last_synced_block = Some(BlockHeader {
3482            number: 5,
3483            hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000005"),
3484            parent_hash: Bytes::from(
3485                "0x0000000000000000000000000000000000000000000000000000000000000004",
3486            ),
3487            revert: false,
3488            timestamp: 1234567892,
3489            partial_block_index: None,
3490        });
3491
3492        let (mut block_tx, mut block_rx) = channel(10);
3493        let (end_tx, end_rx) = oneshot::channel::<()>();
3494
3495        // Start state_sync
3496        let state_sync_handle = tokio::spawn(async move {
3497            state_sync
3498                .state_sync(&mut block_tx, end_rx)
3499                .await
3500        });
3501
3502        // Wait for the message - it should only be for block 6 (skipping blocks 3, 4, 5)
3503        let result_msg = timeout(Duration::from_millis(500), block_rx.recv())
3504            .await
3505            .expect("Should receive message within timeout")
3506            .expect("Channel should be open")
3507            .expect("Should not be an error");
3508
3509        // Send close signal
3510        let _ = end_tx.send(());
3511
3512        // Wait for state_sync to finish
3513        let _ = state_sync_handle
3514            .await
3515            .expect("Task should not panic");
3516
3517        // Verify we only got the message for block 6 (the expected next block)
3518        assert!(result_msg.deltas.is_some(), "Should contain deltas");
3519        if let Some(deltas) = &result_msg.deltas {
3520            assert_eq!(
3521                deltas.block.number, 6,
3522                "Should only process block 6, skipping earlier blocks"
3523            );
3524            assert_eq!(
3525                deltas.block.hash,
3526                Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000006")
3527            );
3528        }
3529
3530        // Verify that no additional messages are received immediately
3531        // (since the old blocks 3, 4, 5 were skipped and only block 6 was processed)
3532        match timeout(Duration::from_millis(50), block_rx.recv()).await {
3533            Err(_) => {
3534                // Timeout is expected - no more messages should come
3535            }
3536            Ok(Some(Err(_))) => {
3537                // Error received is also acceptable (connection closed)
3538            }
3539            Ok(Some(Ok(_))) => {
3540                panic!("Should not receive additional messages - old blocks should be skipped");
3541            }
3542            Ok(None) => {
3543                // Channel closed is also acceptable
3544            }
3545        }
3546    }
3547
3548    fn make_block_changes(block_num: u64, partial_idx: Option<u32>) -> BlockAggregatedChanges {
3549        // Use vec to create Bytes from block number
3550        let hash = Bytes::from(vec![block_num as u8; 32]);
3551        let parent_hash = Bytes::from(vec![block_num.saturating_sub(1) as u8; 32]);
3552        BlockAggregatedChanges {
3553            extractor: "uniswap-v2".to_string(),
3554            chain: Chain::Ethereum,
3555            block: Block {
3556                number: block_num,
3557                hash,
3558                parent_hash,
3559                chain: Chain::Ethereum,
3560                ts: Default::default(),
3561            },
3562            revert: false,
3563            partial_block_index: partial_idx,
3564            ..Default::default()
3565        }
3566    }
3567
3568    /// Test that full block as first message in partial mode is accepted
3569    #[test_log::test(tokio::test)]
3570    async fn test_partial_mode_accepts_full_block_as_first_message() {
3571        let (rpc_client, deltas_client, tx) = mock_clients_for_state_sync(None);
3572        let mut state_sync = with_mocked_clients(true, true, Some(rpc_client), Some(deltas_client))
3573            .with_partial_blocks(true);
3574        state_sync
3575            .initialize()
3576            .await
3577            .expect("Init failed");
3578
3579        let (handle, mut block_rx) = state_sync.start().await;
3580        let (jh, close_tx) = handle.split();
3581
3582        // Send full block as first message - should be accepted
3583        tx.send(make_block_changes(1, None))
3584            .await
3585            .unwrap();
3586
3587        // Should receive the full block immediately
3588        let msg = timeout(Duration::from_millis(100), block_rx.recv())
3589            .await
3590            .expect("Should receive message")
3591            .expect("Channel open")
3592            .expect("No error");
3593
3594        assert_eq!(msg.header.number, 1, "Should use block 1 (full block)");
3595        assert_eq!(msg.header.partial_block_index, None, "Should be a full block");
3596
3597        let _ = close_tx.send(());
3598        jh.await.expect("Task should not panic");
3599    }
3600
3601    /// Test that block number increase is detected as new block
3602    #[test_log::test(tokio::test)]
3603    async fn test_partial_mode_detects_block_number_increase() {
3604        let (rpc_client, deltas_client, tx) = mock_clients_for_state_sync(None);
3605        let mut state_sync = with_mocked_clients(true, true, Some(rpc_client), Some(deltas_client))
3606            .with_partial_blocks(true);
3607        state_sync
3608            .initialize()
3609            .await
3610            .expect("Init failed");
3611
3612        let (handle, mut block_rx) = state_sync.start().await;
3613        let (jh, close_tx) = handle.split();
3614
3615        // Send partial messages for block 1 (will be skipped - waiting for new block)
3616        tx.send(make_block_changes(1, Some(0)))
3617            .await
3618            .unwrap();
3619        tx.send(make_block_changes(1, Some(3)))
3620            .await
3621            .unwrap();
3622
3623        // Verify no message received yet
3624        match timeout(Duration::from_millis(50), block_rx.recv()).await {
3625            Err(_) => { /* Expected: timeout, no message yet */ }
3626            Ok(_) => panic!("Should not receive message while waiting for new block"),
3627        }
3628
3629        // Send partial for block 2 with HIGHER index (5 > 3) - should still be detected
3630        // because block number increased
3631        tx.send(make_block_changes(2, Some(5)))
3632            .await
3633            .unwrap();
3634
3635        // Should receive the message for block 2
3636        let msg = timeout(Duration::from_millis(100), block_rx.recv())
3637            .await
3638            .expect("Should receive message")
3639            .expect("Channel open")
3640            .expect("No error");
3641
3642        assert_eq!(msg.header.number, 2, "Should use block 2 (block number increased)");
3643        assert_eq!(msg.header.partial_block_index, Some(5));
3644
3645        let _ = close_tx.send(());
3646        jh.await.expect("Task should not panic");
3647    }
3648
3649    /// Test that partial mode skips new blocks that are already synced
3650    #[test_log::test(tokio::test)]
3651    async fn test_partial_mode_skips_already_synced_blocks() {
3652        let (rpc_client, deltas_client, tx) = mock_clients_for_state_sync(None);
3653        let mut state_sync = with_mocked_clients(true, true, Some(rpc_client), Some(deltas_client))
3654            .with_partial_blocks(true);
3655        state_sync
3656            .initialize()
3657            .await
3658            .expect("Init failed");
3659
3660        // Set last_synced_block to block 5 - we've already synced up to here
3661        state_sync.last_synced_block = Some(BlockHeader {
3662            number: 5,
3663            hash: Bytes::from("0x05"),
3664            parent_hash: Bytes::from("0x04"),
3665            revert: false,
3666            timestamp: 0,
3667            partial_block_index: None,
3668        });
3669
3670        let (handle, mut block_rx) = state_sync.start().await;
3671        let (jh, close_tx) = handle.split();
3672
3673        // Send partial for block 3 to establish baseline
3674        tx.send(make_block_changes(3, Some(2)))
3675            .await
3676            .unwrap();
3677
3678        // Send "new block" for block 4 (partial index decreased) - but block 4 < last_synced (5)
3679        tx.send(make_block_changes(4, Some(0)))
3680            .await
3681            .unwrap();
3682
3683        // Should be skipped because block 4 is already synced
3684        match timeout(Duration::from_millis(50), block_rx.recv()).await {
3685            Err(_) => { /* Expected: skipped because already synced */ }
3686            Ok(_) => panic!("Should skip block 4 because it's already synced"),
3687        }
3688
3689        // Now send new block for block 6 (after last_synced)
3690        // First establish new partial index
3691        tx.send(make_block_changes(5, Some(3)))
3692            .await
3693            .unwrap();
3694        // Then trigger new block detection
3695        tx.send(make_block_changes(6, Some(0)))
3696            .await
3697            .unwrap();
3698
3699        let msg = timeout(Duration::from_millis(100), block_rx.recv())
3700            .await
3701            .expect("Should receive message")
3702            .expect("Channel open")
3703            .expect("No error");
3704
3705        assert_eq!(msg.header.number, 6, "Should use block 6 (after last synced)");
3706
3707        let _ = close_tx.send(());
3708        jh.await.expect("Task should not panic");
3709    }
3710
3711    #[test_log::test(tokio::test)]
3712    async fn test_get_snapshots_skips_entrypoints_when_not_dci() {
3713        let header = BlockHeader::default();
3714        let mut rpc = make_mock_client();
3715        let component = ProtocolComponent { id: "Component1".to_string(), ..Default::default() };
3716
3717        let component_clone = component.clone();
3718        rpc.expect_get_snapshots()
3719            .returning(move |_request, _chunk_size, _concurrency| {
3720                Ok(Snapshot {
3721                    states: [(
3722                        "Component1".to_string(),
3723                        ComponentWithState {
3724                            state: ProtocolComponentState::new(
3725                                "Component1",
3726                                Default::default(),
3727                                Default::default(),
3728                            ),
3729                            component: component_clone.clone(),
3730                            entrypoints: vec![],
3731                            component_tvl: None,
3732                        },
3733                    )]
3734                    .into_iter()
3735                    .collect(),
3736                    vm_storage: HashMap::new(),
3737                })
3738            });
3739
3740        // get_traced_entry_points should NOT be called for a non-DCI protocol
3741        rpc.expect_get_traced_entry_points()
3742            .never();
3743
3744        let mut state_sync = with_mocked_clients(true, false, Some(rpc), None);
3745        // uses_dci defaults to false, no .with_dci() call needed
3746        state_sync
3747            .component_tracker
3748            .components
3749            .insert("Component1".to_string(), component);
3750
3751        let components_arg = ["Component1".to_string()];
3752        let req_ids: Vec<String> = components_arg.to_vec();
3753        let components: HashMap<_, _> = state_sync
3754            .component_tracker
3755            .components
3756            .iter()
3757            .filter(|(id, _)| req_ids.contains(id))
3758            .map(|(k, v)| (k.clone(), v.clone()))
3759            .collect();
3760        let contract_ids: HashSet<Bytes> = state_sync
3761            .component_tracker
3762            .get_contracts_by_component(&req_ids)
3763            .into_iter()
3764            .collect();
3765        let params = FetchSnapshotParams {
3766            chain: Chain::Ethereum,
3767            protocol_system: "uniswap-v2".to_string(),
3768            block_number: header.number,
3769            uses_dci: false,
3770            retrieve_balances: true,
3771            include_tvl: false,
3772        };
3773        let (snapshot, _, _) =
3774            fetch_snapshot(&state_sync.rpc_client, components, contract_ids, &params)
3775                .await
3776                .expect("Retrieving snapshot failed");
3777
3778        assert!(snapshot
3779            .states
3780            .contains_key("Component1"));
3781    }
3782
3783    #[test_log::test(tokio::test)]
3784    async fn test_get_snapshots_fetches_entrypoints_when_dci() {
3785        let header = BlockHeader::default();
3786        let mut rpc = make_mock_client();
3787        let component = ProtocolComponent { id: "Component1".to_string(), ..Default::default() };
3788
3789        let component_clone = component.clone();
3790        rpc.expect_get_snapshots()
3791            .returning(move |_request, _chunk_size, _concurrency| {
3792                Ok(Snapshot {
3793                    states: [(
3794                        "Component1".to_string(),
3795                        ComponentWithState {
3796                            state: ProtocolComponentState::new(
3797                                "Component1",
3798                                Default::default(),
3799                                Default::default(),
3800                            ),
3801                            component: component_clone.clone(),
3802                            entrypoints: vec![],
3803                            component_tvl: None,
3804                        },
3805                    )]
3806                    .into_iter()
3807                    .collect(),
3808                    vm_storage: HashMap::new(),
3809                })
3810            });
3811
3812        // get_traced_entry_points SHOULD be called for a DCI protocol
3813        rpc.expect_get_traced_entry_points()
3814            .times(1)
3815            .returning(|_| Ok(Page::new(HashMap::new(), 0, 0, 0)));
3816
3817        let mut state_sync = with_mocked_clients(true, false, Some(rpc), None).with_dci(true);
3818        state_sync
3819            .component_tracker
3820            .components
3821            .insert("Component1".to_string(), component);
3822
3823        let components_arg = ["Component1".to_string()];
3824        let req_ids: Vec<String> = components_arg.to_vec();
3825        let components: HashMap<_, _> = state_sync
3826            .component_tracker
3827            .components
3828            .iter()
3829            .filter(|(id, _)| req_ids.contains(id))
3830            .map(|(k, v)| (k.clone(), v.clone()))
3831            .collect();
3832        let contract_ids: HashSet<Bytes> = state_sync
3833            .component_tracker
3834            .get_contracts_by_component(&req_ids)
3835            .into_iter()
3836            .collect();
3837        let params = FetchSnapshotParams {
3838            chain: Chain::Ethereum,
3839            protocol_system: "uniswap-v2".to_string(),
3840            block_number: header.number,
3841            uses_dci: true,
3842            retrieve_balances: true,
3843            include_tvl: false,
3844        };
3845        let (snapshot, _, _) =
3846            fetch_snapshot(&state_sync.rpc_client, components, contract_ids, &params)
3847                .await
3848                .expect("Retrieving snapshot failed");
3849
3850        assert!(snapshot
3851            .states
3852            .contains_key("Component1"));
3853    }
3854
3855    /// Test that in partial-blocks mode, new components are deferred until the block number
3856    /// increments (confirming the previous block is complete), then fired as a background task at
3857    /// the previous block's height. The snapshots appear in the first message of the block AFTER
3858    /// the one where the task was fired.
3859    ///
3860    /// Timeline:
3861    /// - Block 1 (full): initial sync
3862    /// - Block 2 partial: BrandNew + Preexisting added to deferred set (no task yet)
3863    /// - Block 3 partial: block number increments → task fired at snapshot_block=2; msg3 empty
3864    /// - Block 4 partial: task has completed → drain returns both snapshots in msg4
3865    #[test_log::test(tokio::test)]
3866    async fn test_partial_mode_defers_brand_new_component_snapshot_to_next_block() {
3867        use std::time::Duration;
3868
3869        use tokio::{sync::mpsc::channel, time::timeout};
3870
3871        let bg_done = Arc::new(tokio::sync::Notify::new());
3872        let mut rpc_client = make_mock_client();
3873        // get_protocol_components for BrandNew + Preexisting (background task fires at block 3)
3874        rpc_client
3875            .expect_get_protocol_components()
3876            .withf(|params: &crate::rpc::ProtocolComponentsParams| {
3877                params
3878                    .component_ids()
3879                    .is_some_and(|ids| ids.contains(&"BrandNew".to_string()))
3880            })
3881            .returning(|_| {
3882                Ok(Page::new(
3883                    vec![
3884                        ProtocolComponent { id: "BrandNew".to_string(), ..Default::default() },
3885                        ProtocolComponent { id: "Preexisting".to_string(), ..Default::default() },
3886                    ],
3887                    2,
3888                    0,
3889                    100,
3890                ))
3891            });
3892        // get_protocol_components for initial sync
3893        rpc_client
3894            .expect_get_protocol_components()
3895            .returning(|_| {
3896                Ok(Page::new(
3897                    vec![
3898                        ProtocolComponent { id: "Component1".to_string(), ..Default::default() },
3899                        ProtocolComponent { id: "Component2".to_string(), ..Default::default() },
3900                    ],
3901                    2,
3902                    0,
3903                    100,
3904                ))
3905            });
3906        // Background task fires when block 3 arrives: snapshot at block 2 (3 - 1).
3907        let bg_done_clone = bg_done.clone();
3908        rpc_client
3909            .expect_get_snapshots()
3910            .withf(
3911                |request: &SnapshotParameters,
3912                 _chunk_size: &Option<usize>,
3913                 _concurrency: &usize| {
3914                    request.block_number == 2 &&
3915                        (request
3916                            .components
3917                            .contains_key("BrandNew") ||
3918                            request
3919                                .components
3920                                .contains_key("Preexisting"))
3921                },
3922            )
3923            .returning(move |_request, _chunk_size, _concurrency| {
3924                let snap = Ok(Snapshot {
3925                    states: [
3926                        (
3927                            "BrandNew".to_string(),
3928                            ComponentWithState {
3929                                state: ProtocolComponentState::new(
3930                                    "BrandNew",
3931                                    Default::default(),
3932                                    Default::default(),
3933                                ),
3934                                component: ProtocolComponent {
3935                                    id: "BrandNew".to_string(),
3936                                    ..Default::default()
3937                                },
3938                                component_tvl: Some(100.0),
3939                                entrypoints: vec![],
3940                            },
3941                        ),
3942                        (
3943                            "Preexisting".to_string(),
3944                            ComponentWithState {
3945                                state: ProtocolComponentState::new(
3946                                    "Preexisting",
3947                                    Default::default(),
3948                                    Default::default(),
3949                                ),
3950                                component: ProtocolComponent {
3951                                    id: "Preexisting".to_string(),
3952                                    ..Default::default()
3953                                },
3954                                component_tvl: Some(75.0),
3955                                entrypoints: vec![],
3956                            },
3957                        ),
3958                    ]
3959                    .into_iter()
3960                    .collect(),
3961                    vm_storage: HashMap::new(),
3962                });
3963                bg_done_clone.notify_one();
3964                snap
3965            });
3966        // get_snapshots for initial sync (block 0, Component1+Component2)
3967        rpc_client
3968            .expect_get_snapshots()
3969            .returning(|_request, _chunk_size, _concurrency| {
3970                Ok(Snapshot {
3971                    states: [
3972                        (
3973                            "Component1".to_string(),
3974                            ComponentWithState {
3975                                state: ProtocolComponentState::new(
3976                                    "Component1",
3977                                    Default::default(),
3978                                    Default::default(),
3979                                ),
3980                                component: ProtocolComponent {
3981                                    id: "Component1".to_string(),
3982                                    ..Default::default()
3983                                },
3984                                component_tvl: Some(100.0),
3985                                entrypoints: vec![],
3986                            },
3987                        ),
3988                        (
3989                            "Component2".to_string(),
3990                            ComponentWithState {
3991                                state: ProtocolComponentState::new(
3992                                    "Component2",
3993                                    Default::default(),
3994                                    Default::default(),
3995                                ),
3996                                component: ProtocolComponent {
3997                                    id: "Component2".to_string(),
3998                                    ..Default::default()
3999                                },
4000                                component_tvl: Some(0.0),
4001                                entrypoints: vec![],
4002                            },
4003                        ),
4004                    ]
4005                    .into_iter()
4006                    .collect(),
4007                    vm_storage: HashMap::new(),
4008                })
4009            });
4010        rpc_client
4011            .expect_get_traced_entry_points()
4012            .returning(|_| Ok(Page::new(HashMap::new(), 0, 0, 0)));
4013
4014        let mut deltas_client = MockDeltasClient::new();
4015        let (tx, rx) = channel(4);
4016        deltas_client
4017            .expect_subscribe()
4018            .return_once(move |_, _| Ok((Uuid::default(), rx)));
4019        deltas_client
4020            .expect_unsubscribe()
4021            .return_once(|_| Ok(()));
4022
4023        let mut state_sync = with_mocked_clients(true, true, Some(rpc_client), Some(deltas_client))
4024            .with_partial_blocks(true);
4025        state_sync
4026            .initialize()
4027            .await
4028            .expect("Init failed");
4029
4030        let (handle, mut block_rx) = state_sync.start().await;
4031        let (jh, close_tx) = handle.split();
4032
4033        // Block 1 (full): used for initial sync merge
4034        tx.send(make_block_changes(1, None))
4035            .await
4036            .unwrap();
4037        let _msg1 = timeout(Duration::from_millis(200), block_rx.recv())
4038            .await
4039            .expect("Should receive initial + block 1")
4040            .expect("Channel open")
4041            .expect("No error");
4042
4043        // Block 2 partial: BrandNew and Preexisting both appear. Neither task is fired yet —
4044        // both are added to deferred_snapshot_components.
4045        let mut block2 = make_block_changes(2, Some(2));
4046        block2.new_protocol_components = HashMap::from([(
4047            "BrandNew".to_string(),
4048            ProtocolComponent { id: "BrandNew".to_string(), ..Default::default() },
4049        )]);
4050        block2.component_tvl =
4051            HashMap::from([("BrandNew".to_string(), 100.0), ("Preexisting".to_string(), 75.0)]);
4052        tx.send(block2).await.unwrap();
4053        let msg2 = timeout(Duration::from_millis(200), block_rx.recv())
4054            .await
4055            .expect("Should receive block 2")
4056            .expect("Channel open")
4057            .expect("No error");
4058
4059        assert!(
4060            !msg2
4061                .snapshots
4062                .states
4063                .contains_key("Preexisting"),
4064            "Preexisting should still be deferred in block 2, not yet snapshotted; got: {:?}",
4065            msg2.snapshots
4066                .states
4067                .keys()
4068                .collect::<Vec<_>>()
4069        );
4070        assert!(
4071            !msg2
4072                .snapshots
4073                .states
4074                .contains_key("BrandNew"),
4075            "BrandNew should still be deferred in block 2, not yet snapshotted"
4076        );
4077
4078        // Block 3 partial: block number increments → deferred components fire as background task
4079        // at snapshot_block=2. msg3 has no snapshots (task just spawned).
4080        tx.send(make_block_changes(3, Some(1)))
4081            .await
4082            .unwrap();
4083        let msg3 = timeout(Duration::from_millis(200), block_rx.recv())
4084            .await
4085            .expect("Should receive block 3")
4086            .expect("Channel open")
4087            .expect("No error");
4088
4089        assert_eq!(msg3.header.number, 3);
4090        assert_eq!(msg3.header.partial_block_index, Some(1));
4091        assert!(
4092            !msg3
4093                .snapshots
4094                .states
4095                .contains_key("BrandNew"),
4096            "BrandNew task just fired; snapshot not yet available in msg3"
4097        );
4098        assert!(
4099            !msg3
4100                .snapshots
4101                .states
4102                .contains_key("Preexisting"),
4103            "Preexisting task just fired; snapshot not yet available in msg3"
4104        );
4105
4106        // Wait for the background snapshot task to complete before the next block arrives.
4107        bg_done.notified().await;
4108
4109        // Block 4 partial: drain finds the completed task → both snapshots present in msg4.
4110        tx.send(make_block_changes(4, Some(0)))
4111            .await
4112            .unwrap();
4113        let msg4 = timeout(Duration::from_millis(200), block_rx.recv())
4114            .await
4115            .expect("Should receive block 4")
4116            .expect("Channel open")
4117            .expect("No error");
4118
4119        assert_eq!(msg4.header.number, 4);
4120        assert_eq!(msg4.header.partial_block_index, Some(0));
4121        assert!(
4122            msg4.snapshots
4123                .states
4124                .contains_key("BrandNew"),
4125            "BrandNew snapshot should be in msg4 after background task drains; got keys: {:?}",
4126            msg4.snapshots
4127                .states
4128                .keys()
4129                .collect::<Vec<_>>()
4130        );
4131        assert!(
4132            msg4.snapshots
4133                .states
4134                .contains_key("Preexisting"),
4135            "Preexisting snapshot should be in msg4 after background task drains; got keys: {:?}",
4136            msg4.snapshots
4137                .states
4138                .keys()
4139                .collect::<Vec<_>>()
4140        );
4141
4142        let _ = close_tx.send(());
4143        jh.await.expect("Task should not panic");
4144    }
4145
/// Directly exercises all four mutation paths of `apply_deltas_to_snapshot`:
/// attribute update, attribute deletion, balance merge, and VM slot/balance/code overwrite.
/// Also verifies that deltas at or before `snapshot_block` are skipped.
///
/// The skip check relies on an attribute that only the skipped delta writes. The shared `keep`
/// attribute cannot prove skipping on its own, because the later delta overwrites it either
/// way.
#[test]
fn test_apply_deltas_to_snapshot() {
    use tycho_common::models::{
        contract::{Account, AccountDelta},
        protocol::{ComponentBalance, ProtocolComponentStateDelta},
        ChangeType,
    };

    let contract_addr = Bytes::from("0xc0ffee");
    let token_addr = Bytes::from("0xdeadbeef");

    // Build the snapshot at block 5: one component with attributes and balances,
    // one VM contract with slots and native balance.
    let mut snapshot = Snapshot {
        states: [(
            "comp1".to_string(),
            ComponentWithState {
                state: ProtocolComponentState::new(
                    "comp1",
                    [
                        ("keep".to_string(), Bytes::from("0x01")),
                        ("delete_me".to_string(), Bytes::from("0x02")),
                    ]
                    .into_iter()
                    .collect(),
                    [(token_addr.clone(), Bytes::from("0x64"))]
                        .into_iter()
                        .collect(),
                ),
                component: ProtocolComponent::default(),
                component_tvl: None,
                entrypoints: vec![],
            },
        )]
        .into_iter()
        .collect(),
        vm_storage: [(
            contract_addr.clone(),
            Account {
                chain: Chain::Ethereum,
                address: contract_addr.clone(),
                title: String::new(),
                slots: [(Bytes::from("0x01"), Bytes::from("0xaa"))]
                    .into_iter()
                    .collect(),
                native_balance: Bytes::from("0x10"),
                token_balances: HashMap::new(),
                code: Bytes::from("0x0a0b"),
                code_hash: Default::default(),
                balance_modify_tx: Default::default(),
                code_modify_tx: Default::default(),
                creation_tx: None,
            },
        )]
        .into_iter()
        .collect(),
    };

    // Two buffered deltas: block 5 (at snapshot_block, must be skipped) and block 6
    // (after snapshot_block, must be applied).
    //
    // The skipped delta writes `skipped_only`, a key no other delta touches, so its absence
    // afterwards shows that the block-5 delta was not applied.
    let skipped_delta = BlockAggregatedChanges {
        block: Block { number: 5, ..Default::default() },
        state_deltas: [(
            "comp1".to_string(),
            ProtocolComponentStateDelta::new(
                "comp1",
                [
                    ("keep".to_string(), Bytes::from("0xff")),
                    ("skipped_only".to_string(), Bytes::from("0xff")),
                ]
                .into_iter()
                .collect(),
                HashSet::new(),
            ),
        )]
        .into_iter()
        .collect(),
        ..Default::default()
    };
    let applied_delta = BlockAggregatedChanges {
        block: Block { number: 6, ..Default::default() },
        state_deltas: [(
            "comp1".to_string(),
            ProtocolComponentStateDelta::new(
                "comp1",
                [("keep".to_string(), Bytes::from("0x99"))]
                    .into_iter()
                    .collect(),
                ["delete_me".to_string()]
                    .into_iter()
                    .collect(),
            ),
        )]
        .into_iter()
        .collect(),
        component_balances: [(
            "comp1".to_string(),
            [(
                token_addr.clone(),
                ComponentBalance::new(
                    token_addr.clone(),
                    Bytes::from("0xc8"),
                    200.0,
                    Default::default(),
                    "comp1",
                ),
            )]
            .into_iter()
            .collect(),
        )]
        .into_iter()
        .collect(),
        account_deltas: [(
            contract_addr.clone(),
            AccountDelta::new(
                Chain::Ethereum,
                contract_addr.clone(),
                [
                    (Bytes::from("0x01"), Some(Bytes::from("0xbb"))),
                    (Bytes::from("0x02"), Some(Bytes::from("0xcc"))),
                ]
                .into_iter()
                .collect(),
                Some(Bytes::from("0x20")),
                Some(Bytes::from("0x0c0d")),
                ChangeType::Update,
            ),
        )]
        .into_iter()
        .collect(),
        ..Default::default()
    };

    let mut sync = with_mocked_clients(true, false, None, None);
    sync.buffered_deltas = vec![skipped_delta, applied_delta];

    let contract_ids: HashSet<Bytes> = [contract_addr.clone()]
        .into_iter()
        .collect();
    sync.apply_deltas_to_snapshot(&mut snapshot, 5, &contract_ids);

    let comp = &snapshot.states["comp1"].state;

    // Delta at snapshot_block skipped: its unique attribute never reaches the snapshot
    assert!(
        !comp
            .attributes
            .contains_key("skipped_only"),
        "delta at snapshot_block (5) must be skipped, but its attribute was applied"
    );
    // Attribute update applied
    assert_eq!(comp.attributes["keep"], Bytes::from("0x99"));
    // Attribute deletion applied
    assert!(!comp
        .attributes
        .contains_key("delete_me"));
    // Balance merge applied
    assert_eq!(comp.balances[&token_addr], Bytes::from("0xc8"));

    let account = &snapshot.vm_storage[&contract_addr];
    // Existing slot overwritten, new slot added
    assert_eq!(account.slots[&Bytes::from("0x01")], Bytes::from("0xbb"));
    assert_eq!(account.slots[&Bytes::from("0x02")], Bytes::from("0xcc"));
    // Native balance updated
    assert_eq!(account.native_balance, Bytes::from("0x20"));
    // Code updated
    assert_eq!(account.code, Bytes::from("0x0c0d"));
}
4307}