Skip to main content

tycho_client/feed/
synchronizer.rs

1use std::{
2    collections::{HashMap, HashSet},
3    time::Duration,
4};
5
6use async_trait::async_trait;
7use thiserror::Error;
8use tokio::{
9    select,
10    sync::{
11        mpsc::{channel, error::SendError, Receiver, Sender},
12        oneshot,
13    },
14    task::JoinHandle,
15    time::{sleep, timeout},
16};
17use tracing::{debug, error, info, instrument, warn};
18use tycho_common::{
19    models::{
20        blockchain::{
21            BlockAggregatedChanges, DCIUpdate, EntryPointWithTracingParams, TracingResult,
22        },
23        contract::Account,
24        protocol::{ProtocolComponent, ProtocolComponentState},
25        Chain, ExtractorIdentity,
26    },
27    Bytes,
28};
29
30use crate::{
31    deltas::{DeltasClient, SubscriptionOptions},
32    feed::{
33        component_tracker::{ComponentFilter, ComponentTracker},
34        BlockHeader, HeaderLike,
35    },
36    rpc::{
37        RPCClient, RPCError, SnapshotParameters, TracedEntryPointsPaginatedParams,
38        RPC_CLIENT_CONCURRENCY,
39    },
40    DeltasError,
41};
42
43#[derive(Error, Debug)]
44pub enum SynchronizerError {
45    /// RPC client failures.
46    #[error("RPC error: {0}")]
47    RPCError(#[from] RPCError),
48
49    /// Issues with the main channel
50    #[error("{0}")]
51    ChannelError(String),
52
53    /// Timeout elapsed errors.
54    #[error("Timeout error: {0}")]
55    Timeout(String),
56
57    /// Failed to close the synchronizer.
58    #[error("Failed to close synchronizer: {0}")]
59    CloseError(String),
60
61    /// Server connection failures or interruptions.
62    #[error("Connection error: {0}")]
63    ConnectionError(String),
64
65    /// Connection closed
66    #[error("Connection closed")]
67    ConnectionClosed,
68
69    /// Internal error that should not happen under normal operation.
70    #[error("Internal error: {0}")]
71    Internal(String),
72}
73
74pub type SyncResult<T> = Result<T, SynchronizerError>;
75
76impl SynchronizerError {
77    /// Returns true if the error is transient and the failing operation can be retried.
78    ///
79    /// Transient: network/HTTP failures, rate limiting, server unavailability. These are
80    /// infrastructure problems that may resolve without any change to the request.
81    ///
82    /// Permanent: malformed data, fatal server errors, invalid requests. Retrying would produce
83    /// the same failure.
84    pub fn is_transient(&self) -> bool {
85        match self {
86            SynchronizerError::RPCError(e) => matches!(
87                e,
88                RPCError::HttpClient(_, _) |
89                    RPCError::RateLimited(_) |
90                    RPCError::ServerUnreachable(_) |
91                    RPCError::StaleBlock(_)
92            ),
93            SynchronizerError::Timeout(_) |
94            SynchronizerError::ConnectionError(_) |
95            SynchronizerError::ConnectionClosed => true,
96            _ => false,
97        }
98    }
99}
100
101impl<T> From<SendError<T>> for SynchronizerError {
102    fn from(err: SendError<T>) -> Self {
103        SynchronizerError::ChannelError(format!("Failed to send message: {err}"))
104    }
105}
106
107impl From<DeltasError> for SynchronizerError {
108    fn from(err: DeltasError) -> Self {
109        match err {
110            DeltasError::NotConnected => SynchronizerError::ConnectionClosed,
111            _ => SynchronizerError::ConnectionError(err.to_string()),
112        }
113    }
114}
115
116pub struct ProtocolStateSynchronizer<R: RPCClient, D: DeltasClient> {
117    extractor_id: ExtractorIdentity,
118    retrieve_balances: bool,
119    rpc_client: R,
120    deltas_client: D,
121    max_retries: u64,
122    retry_cooldown: Duration,
123    include_snapshots: bool,
124    component_tracker: ComponentTracker<R>,
125    last_synced_block: Option<BlockHeader>,
126    timeout: u64,
127    include_tvl: bool,
128    compression: bool,
129    partial_blocks: bool,
130    uses_dci: bool,
131    /// Background snapshot tasks spawned for new components. Each task may be in-flight or
132    /// finished; completed ones are harvested at the start of each delta iteration and their
133    /// results included in that block's message.
134    snapshot_tasks: Vec<SnapshotTask>,
135    /// Unfiltered deltas buffered while any snapshot task is in-flight, starting from the block
136    /// at which the oldest task was spawned. Applied to each snapshot at drain time to reconstruct
137    /// the component's current state.
138    buffered_deltas: Vec<BlockAggregatedChanges>,
139    /// State machine tracking components awaiting their initial snapshot. A component lives here
140    /// from the moment it's queued until its snapshot is successfully applied (at which point it
141    /// moves into `component_tracker.components`).
142    snapshot_queue: HashMap<String, SnapshotStatus>,
143}
144
145#[derive(Clone, PartialEq, Debug)]
146pub struct ComponentWithState {
147    pub state: ProtocolComponentState,
148    pub component: ProtocolComponent,
149    pub component_tvl: Option<f64>,
150    pub entrypoints: Vec<(EntryPointWithTracingParams, TracingResult)>,
151}
152
153#[derive(Clone, PartialEq, Debug, Default)]
154pub struct Snapshot {
155    pub states: HashMap<String, ComponentWithState>,
156    pub vm_storage: HashMap<Bytes, Account>,
157}
158
159impl Snapshot {
160    fn extend(&mut self, other: Snapshot) {
161        self.states.extend(other.states);
162        self.vm_storage.extend(other.vm_storage);
163    }
164
165    pub fn get_states(&self) -> &HashMap<String, ComponentWithState> {
166        &self.states
167    }
168
169    pub fn get_vm_storage(&self) -> &HashMap<Bytes, Account> {
170        &self.vm_storage
171    }
172}
173
174#[derive(Clone, PartialEq, Debug, Default)]
175pub struct StateSyncMessage<H>
176where
177    H: HeaderLike,
178{
179    /// The block information for this update.
180    pub header: H,
181    /// Snapshot for new components.
182    pub snapshots: Snapshot,
183    /// A single delta contains state updates for all tracked components, as well as additional
184    /// information about the system components e.g. newly added components (even below tvl), tvl
185    /// updates, balance updates.
186    pub deltas: Option<BlockAggregatedChanges>,
187    /// Components that stopped being tracked.
188    pub removed_components: HashMap<String, ProtocolComponent>,
189}
190
191impl<H> StateSyncMessage<H>
192where
193    H: HeaderLike,
194{
195    pub fn merge(mut self, other: Self) -> Self {
196        // be careful with removed and snapshots attributes here, these can be ambiguous.
197        self.removed_components
198            .retain(|k, _| !other.snapshots.states.contains_key(k));
199        self.snapshots
200            .states
201            .retain(|k, _| !other.removed_components.contains_key(k));
202
203        self.snapshots.extend(other.snapshots);
204        let deltas = match (self.deltas, other.deltas) {
205            (Some(l), Some(r)) => Some(l.merge(r)),
206            (None, Some(r)) => Some(r),
207            (Some(l), None) => Some(l),
208            (None, None) => None,
209        };
210        self.removed_components
211            .extend(other.removed_components);
212        Self {
213            header: other.header,
214            snapshots: self.snapshots,
215            deltas,
216            removed_components: self.removed_components,
217        }
218    }
219}
220
221/// Tracks the lifecycle of a component's initial snapshot request.
222#[derive(Debug, Clone, PartialEq)]
223enum SnapshotStatus {
224    /// Waiting for the next block boundary before fetching. Only used with partial blocks.
225    Deferred,
226    /// A background snapshot task has been spawned and is in-flight.
227    InFlight,
228    /// The last fetch attempt failed transiently; will be re-queued on the next delta.
229    RetryNext,
230    /// The fetch failed permanently; component is excluded until the synchronizer restarts.
231    Blacklisted,
232}
233
234struct SnapshotFetchResult {
235    components: HashMap<String, ProtocolComponent>,
236    contract_ids: HashSet<Bytes>,
237    dci_update: DCIUpdate,
238    snapshot: Snapshot,
239    snapshot_block: u64,
240}
241
242struct SnapshotTask {
243    component_ids: Vec<String>,
244    snapshot_block: u64,
245    receiver: oneshot::Receiver<Result<SnapshotFetchResult, SynchronizerError>>,
246}
247
248/// Handle for controlling a running synchronizer task.
249///
250/// This handle provides methods to gracefully shut down the synchronizer
251/// and await its completion with a timeout.
252pub struct SynchronizerTaskHandle {
253    join_handle: JoinHandle<()>,
254    close_tx: oneshot::Sender<()>,
255}
256
257/// StateSynchronizer
258///
259/// Used to synchronize the state of a single protocol. The synchronizer is responsible for
260/// delivering messages to the client that let him reconstruct subsets of the protocol state.
261///
262/// This involves deciding which components to track according to the clients preferences,
263/// retrieving & emitting snapshots of components which the client has not seen yet and subsequently
264/// delivering delta messages for the components that have changed.
265impl SynchronizerTaskHandle {
266    pub fn new(join_handle: JoinHandle<()>, close_tx: oneshot::Sender<()>) -> Self {
267        Self { join_handle, close_tx }
268    }
269
270    /// Splits the handle into its join handle and close sender.
271    ///
272    /// This allows monitoring the task completion separately from controlling shutdown.
273    /// The join handle can be used with FuturesUnordered for monitoring, while the
274    /// close sender can be used to signal graceful shutdown.
275    pub fn split(self) -> (JoinHandle<()>, oneshot::Sender<()>) {
276        (self.join_handle, self.close_tx)
277    }
278}
279
280#[async_trait]
281pub trait StateSynchronizer: Send + Sync + 'static {
282    async fn initialize(&mut self) -> SyncResult<()>;
283    /// Starts the state synchronization, consuming the synchronizer.
284    /// Returns a handle for controlling the running task and a receiver for messages.
285    async fn start(
286        mut self,
287    ) -> (SynchronizerTaskHandle, Receiver<SyncResult<StateSyncMessage<BlockHeader>>>);
288}
289
290struct FetchSnapshotParams {
291    chain: Chain,
292    protocol_system: String,
293    block_number: u64,
294    uses_dci: bool,
295    retrieve_balances: bool,
296    include_tvl: bool,
297}
298
299/// Fetches a snapshot for given components. If DCI is enabled, also traces entry
300/// points and extends `contract_ids` with any contracts they access.
301///
302/// Returns the snapshot, the DCI update, and the complete set of contract IDs (original +
303/// DCI-discovered).
304async fn fetch_snapshot<R: RPCClient>(
305    rpc_client: &R,
306    components: HashMap<String, ProtocolComponent>,
307    mut contract_ids: HashSet<Bytes>,
308    params: &FetchSnapshotParams,
309) -> Result<(Snapshot, DCIUpdate, HashSet<Bytes>), SynchronizerError> {
310    if components.is_empty() {
311        return Ok((Snapshot::default(), DCIUpdate::default(), contract_ids));
312    }
313
314    let component_ids: Vec<String> = components.keys().cloned().collect();
315
316    let (dci_update, entrypoints_result) = if params.uses_dci {
317        let result = rpc_client
318            .get_traced_entry_points_paginated(TracedEntryPointsPaginatedParams::new(
319                params.chain,
320                &params.protocol_system,
321                component_ids.clone(),
322                RPC_CLIENT_CONCURRENCY,
323            ))
324            .await?;
325        let dci_contracts: HashSet<Bytes> = result
326            .values()
327            .flat_map(|traces| {
328                traces
329                    .iter()
330                    .flat_map(|(_, tr)| tr.accessed_slots.keys().cloned())
331            })
332            .collect();
333        contract_ids.extend(dci_contracts);
334        let eps = result.clone();
335        let dci: DCIUpdate = result.into();
336        (dci, eps)
337    } else {
338        (DCIUpdate::default(), HashMap::new())
339    };
340
341    let contract_ids_vec: Vec<Bytes> = contract_ids.iter().cloned().collect();
342    let request = SnapshotParameters::new(
343        params.chain,
344        &params.protocol_system,
345        &components,
346        &contract_ids_vec,
347        params.block_number,
348    )
349    .entrypoints(&entrypoints_result)
350    .include_balances(params.retrieve_balances)
351    .include_tvl(params.include_tvl);
352
353    let snapshot = rpc_client
354        .get_snapshots(&request, None, RPC_CLIENT_CONCURRENCY)
355        .await?;
356
357    Ok((snapshot, dci_update, contract_ids))
358}
359
360/// Fetches a snapshot for new components not yet in the tracker. Calls `get_protocol_components`
361/// to resolve the components, then delegates to `fetch_snapshot`.
362async fn fetch_snapshot_background<R: RPCClient>(
363    rpc_client: R,
364    component_ids: Vec<String>,
365    params: FetchSnapshotParams,
366) -> Result<SnapshotFetchResult, SynchronizerError> {
367    if component_ids.is_empty() {
368        return Ok(SnapshotFetchResult {
369            components: HashMap::new(),
370            contract_ids: HashSet::new(),
371            dci_update: DCIUpdate::default(),
372            snapshot: Snapshot::default(),
373            snapshot_block: params.block_number,
374        });
375    }
376
377    let request = crate::rpc::ProtocolComponentsParams::new(params.chain, &params.protocol_system)
378        .with_component_ids(component_ids);
379    let components: HashMap<String, ProtocolComponent> = rpc_client
380        .get_protocol_components(request)
381        .await?
382        .into_data()
383        .into_iter()
384        .map(|pc| (pc.id.clone(), pc))
385        .collect();
386
387    let contract_ids: HashSet<Bytes> = components
388        .values()
389        .flat_map(|c| c.contract_addresses.iter().cloned())
390        .collect();
391
392    let snapshot_block = params.block_number;
393    let (snapshot, dci_update, contract_ids) =
394        fetch_snapshot(&rpc_client, components.clone(), contract_ids, &params).await?;
395
396    Ok(SnapshotFetchResult { components, contract_ids, dci_update, snapshot, snapshot_block })
397}
398
399impl<R, D> ProtocolStateSynchronizer<R, D>
400where
401    // TODO: Consider moving these constraints directly to the
402    // client...
403    R: RPCClient + Clone + Send + Sync + 'static,
404    D: DeltasClient + Clone + Send + Sync + 'static,
405{
406    /// Creates a new state synchronizer.
407    #[allow(clippy::too_many_arguments)]
408    pub fn new(
409        extractor_id: ExtractorIdentity,
410        retrieve_balances: bool,
411        component_filter: ComponentFilter,
412        max_retries: u64,
413        retry_cooldown: Duration,
414        include_snapshots: bool,
415        include_tvl: bool,
416        compression: bool,
417        rpc_client: R,
418        deltas_client: D,
419        timeout: u64,
420    ) -> Self {
421        Self {
422            extractor_id: extractor_id.clone(),
423            retrieve_balances,
424            rpc_client: rpc_client.clone(),
425            include_snapshots,
426            deltas_client,
427            component_tracker: ComponentTracker::new(
428                extractor_id.chain,
429                extractor_id.name.as_str(),
430                component_filter,
431                rpc_client,
432            ),
433            max_retries,
434            retry_cooldown,
435            last_synced_block: None,
436            timeout,
437            include_tvl,
438            compression,
439            partial_blocks: false,
440            uses_dci: false,
441            snapshot_tasks: Vec::new(),
442            buffered_deltas: Vec::new(),
443            snapshot_queue: HashMap::new(),
444        }
445    }
446
447    /// Sets whether this protocol uses Dynamic Contract Indexing (DCI).
448    /// When true, entrypoints will be fetched during snapshot retrieval.
449    pub fn with_dci(mut self, uses_dci: bool) -> Self {
450        self.uses_dci = uses_dci;
451        self
452    }
453
454    /// Enables receiving partial block updates.
455    pub fn with_partial_blocks(mut self, partial_blocks: bool) -> Self {
456        self.partial_blocks = partial_blocks;
457        self
458    }
459
460    /// Main method that does all the work.
461    ///
462    /// ## Return Value
463    ///
464    /// Returns a `Result` where:
465    /// - `Ok(())` - Synchronization completed successfully (usually due to close signal)
466    /// - `Err((error, None))` - Error occurred AND close signal was received (don't retry)
467    /// - `Err((error, Some(end_rx)))` - Error occurred but close signal was NOT received (can
468    ///   retry)
469    ///
470    /// The returned `end_rx` (if any) should be reused for retry attempts since the close
471    /// signal may still arrive and we want to remain cancellable across retries.
472    #[allow(clippy::result_large_err)]
473    #[instrument(skip(self, block_tx, end_rx), fields(extractor_id = %self.extractor_id))]
474    async fn state_sync(
475        &mut self,
476        block_tx: &mut Sender<SyncResult<StateSyncMessage<BlockHeader>>>,
477        mut end_rx: oneshot::Receiver<()>,
478    ) -> Result<(), (SynchronizerError, Option<oneshot::Receiver<()>>)> {
479        // initialisation
480        let subscription_options = SubscriptionOptions::new()
481            .with_state(self.include_snapshots)
482            .with_compression(self.compression)
483            .with_partial_blocks(self.partial_blocks);
484        let (subscription_id, mut msg_rx) = match self
485            .deltas_client
486            .subscribe(self.extractor_id.clone(), subscription_options)
487            .await
488        {
489            Ok(result) => result,
490            Err(e) => return Err((e.into(), Some(end_rx))),
491        };
492
493        let result = async {
494            info!("Waiting for deltas...");
495            // Track the last seen block number such that we know when we get the first partial
496            let mut last_block_number: Option<u64> = None;
497
498            // Outer loop: find a suitable first block and fetch its snapshot. Retries within the
499            // same subscription when the snapshot endpoint rejects the block as too old — this
500            // happens after a server restart whose persisted state is outside the plan retention
501            // window. Consuming the stale delta and waiting for the next one lets the server catch
502            // up without tearing down the WS subscription and rebuilding state from scratch.
503            const MAX_STALE_RETRIES: u32 = 5;
504            let mut stale_retries: u32 = 0;
505            let (msg, header) = 'init: loop {
506                let mut warned_waiting_for_new_block = false;
507                let mut warned_skipping_synced = false;
508                let mut first_msg = loop {
509                    let msg = select! {
510                        deltas_result = timeout(Duration::from_secs(self.timeout), msg_rx.recv()) => {
511                            deltas_result
512                                .map_err(|_| {
513                                    SynchronizerError::Timeout(format!(
514                                        "First deltas took longer than {t}s to arrive",
515                                        t = self.timeout
516                                    ))
517                                })?
518                                .ok_or_else(|| {
519                                    SynchronizerError::ConnectionError(
520                                        "Deltas channel closed before first message".to_string(),
521                                    )
522                                })?
523                        },
524                        _ = &mut end_rx => {
525                            info!("Received close signal while waiting for first deltas");
526                            return Ok(());
527                        }
528                    };
529
530                    let incoming: BlockHeader = (&msg).into();
531
532                    // Determine if this message is a candidate for starting synchronization.
533                    // In partial mode, we wait for a new block to start (block number increases).
534                    // In non-partial mode, all messages are candidates.
535                    let is_new_block_candidate = if self.partial_blocks {
536                        match msg.partial_block_index {
537                            None => {
538                                // If we get a full block, it is a candidate
539                                last_block_number = Some(incoming.number);
540                                true
541                            }
542                            Some(current_partial_idx) => {
543                                let is_new_block = last_block_number
544                                    .map(|prev_block| incoming.number > prev_block)
545                                    .unwrap_or(false);
546
547                                if !warned_waiting_for_new_block {
548                                    info!(
549                                        extractor=%self.extractor_id,
550                                        block=incoming.number,
551                                        partial_idx=current_partial_idx,
552                                        "Syncing. Waiting for new block to start"
553                                    );
554                                    warned_waiting_for_new_block = true;
555                                }
556                                last_block_number = Some(incoming.number);
557                                is_new_block
558                            }
559                        }
560                    } else {
561                        true // Non-partial mode: all messages are candidates
562                    };
563
564                    if !is_new_block_candidate {
565                        continue;
566                    }
567
568                    // Check if we've already synced this block (applies to both modes)
569                    if let Some(current) = &self.last_synced_block {
570                        if current.number >= incoming.number && !self.is_next_expected(&incoming) {
571                            if !warned_skipping_synced {
572                                info!(extractor=%self.extractor_id, from=incoming.number, to=current.number, "Syncing. Skipping already synced block");
573                                warned_skipping_synced = true;
574                            }
575                            continue;
576                        }
577                    }
578                    break msg;
579                };
580
581                self.filter_deltas(&mut first_msg);
582
583                // initial snapshot
584                info!(height = first_msg.get_block().number, "First deltas received");
585                let header: BlockHeader = (&first_msg).into();
586                let deltas_msg = StateSyncMessage {
587                    header: header.clone(),
588                    snapshots: Default::default(),
589                    deltas: Some(first_msg),
590                    removed_components: Default::default(),
591                };
592
593                // If possible skip retrieving snapshots
594                if !self.is_next_expected(&header) {
595                    info!("Retrieving snapshot");
596                    // With partial blocks, the server only has full blocks in its buffer; pass the
597                    // previous block's header so we request state at N-1, then merge with deltas.
598                    let snapshot_header = if self.partial_blocks && header.number > 0 {
599                        BlockHeader {
600                            number: header.number - 1,
601                            hash: header.parent_hash.clone(),
602                            ..Default::default()
603                        }
604                    } else {
605                        BlockHeader { revert: false, ..header.clone() }
606                    };
607                    let component_ids =
608                        self.component_tracker.get_tracked_component_ids();
609                    let init_snapshot = if !self.include_snapshots ||
610                        component_ids.is_empty()
611                    {
612                        Snapshot::default()
613                    } else {
614                        // Fetch initial snapshots
615                        let components: HashMap<_, _> = self
616                            .component_tracker
617                            .components
618                            .iter()
619                            .filter(|(id, _)| component_ids.contains(id))
620                            .map(|(k, v)| (k.clone(), v.clone()))
621                            .collect();
622                        let contract_ids: HashSet<Bytes> = self
623                            .component_tracker
624                            .get_contracts_by_component(&component_ids)
625                            .into_iter()
626                            .collect();
627                        let fetch_params = FetchSnapshotParams {
628                            chain: self.extractor_id.chain,
629                            protocol_system: self.extractor_id.name.clone(),
630                            block_number: snapshot_header.number,
631                            uses_dci: self.uses_dci,
632                            retrieve_balances: self.retrieve_balances,
633                            include_tvl: self.include_tvl,
634                        };
635                        match fetch_snapshot(
636                            &self.rpc_client,
637                            components,
638                            contract_ids,
639                            &fetch_params,
640                        )
641                        .await
642                        {
643                            Ok((snap, dci_update, _)) => {
644                                self.component_tracker
645                                    .process_entrypoints(&dci_update);
646                                snap
647                            }
648                            Err(SynchronizerError::RPCError(
649                                crate::rpc::RPCError::StaleBlock(reason),
650                            )) => {
651                                stale_retries += 1;
652                                if stale_retries > MAX_STALE_RETRIES {
653                                    return Err(SynchronizerError::RPCError(
654                                        crate::rpc::RPCError::StaleBlock(reason),
655                                    ));
656                                }
657                                // The server's persisted state for this block is outside
658                                // the plan retention window. Discard this delta and wait
659                                // for a fresher block from the same subscription rather
660                                // than restarting from scratch.
661                                warn!(
662                                    block = header.number,
663                                    stale_retries,
664                                    %reason,
665                                    "Snapshot block is outside server retention \
666                                     window; waiting for a more recent block"
667                                );
668                                continue 'init;
669                            }
670                            Err(e) => return Err(e),
671                        }
672                    };
673                    let n_components = self.component_tracker.components.len();
674                    let n_snapshots = init_snapshot.states.len();
675                    info!(
676                        n_components,
677                        n_snapshots,
678                        "Initial snapshot retrieved, starting delta message feed"
679                    );
680                    let snapshot_msg = StateSyncMessage {
681                        header: snapshot_header,
682                        snapshots: init_snapshot,
683                        deltas: None,
684                        removed_components: HashMap::new(),
685                    };
686                    break 'init (snapshot_msg.merge(deltas_msg), header);
687                } else {
688                    break 'init (deltas_msg, header);
689                }
690            };
691
692            block_tx.send(Ok(msg)).await?;
693            self.last_synced_block = Some(header);
694            loop {
695                select! {
696                    deltas_opt = msg_rx.recv() => {
697                        if let Some(mut deltas) = deltas_opt {
698                            let header: BlockHeader = (&deltas).into();
699                            debug!(block_number=?header.number, "Received delta message");
700
701                            // Buffer unfiltered delta while any snapshot task is in-flight.
702                            if !self.snapshot_tasks.is_empty() {
703                                self.buffered_deltas.push(deltas.clone());
704                            }
705
706                            let background_snapshots = self.drain_completed_snapshots();
707
708                            // Trim buffered_deltas: discard blocks no longer needed by any pending task.
709                            if self.snapshot_tasks.is_empty() {
710                                self.buffered_deltas.clear();
711                            } else {
712                                let oldest_pending_block = self
713                                    .snapshot_tasks
714                                    .iter()
715                                    .map(|p| p.snapshot_block)
716                                    .min()
717                                    .unwrap_or(u64::MAX);
718                                self.buffered_deltas
719                                    .retain(|d| d.block.number > oldest_pending_block);
720                            }
721
722                            let (snapshots, removed_components) = {
723                                let (to_add, to_remove) =
724                                    self.component_tracker.filter_updated_components(&deltas);
725
726                                // Harvest transient retries now so they feed into truly_new.
727                                // TVL changes are not re-emitted, so without explicit re-queuing
728                                // a transiently failed component would never be retried.
729                                // Remove from the map first so the truly_new filter below treats
730                                // them the same as brand-new components.
731                                let retry_ids: Vec<String> = self
732                                    .snapshot_queue
733                                    .iter()
734                                    .filter(|(_, s)| matches!(s, SnapshotStatus::RetryNext))
735                                    .map(|(id, _)| id.clone())
736                                    .collect();
737                                for id in &retry_ids {
738                                    self.snapshot_queue.remove(id);
739                                }
740
741                                // Components not yet tracked and not in the staged state machine
742                                // (not in-flight, not deferred, not blacklisted). Merges
743                                // delta-triggered new components with transient retries; `seen`
744                                // deduplicates the two sources.
745                                let truly_new: Vec<String> = {
746                                    let mut seen = HashSet::new();
747                                    to_add
748                                        .iter()
749                                        .chain(retry_ids.iter())
750                                        .filter(|id| {
751                                            !self.component_tracker
752                                                .components
753                                                .contains_key(id.as_str())
754                                                && !self
755                                                    .snapshot_queue
756                                                    .contains_key(id.as_str())
757                                                && seen.insert(id.as_str())
758                                        })
759                                        .cloned()
760                                        .collect()
761                                };
762
763                                if self.partial_blocks {
764                                    let is_new_block = self
765                                        .last_synced_block
766                                        .as_ref()
767                                        .map(|b| header.number > b.number)
768                                        .unwrap_or(true);
769
770                                    let has_deferred = self
771                                        .snapshot_queue
772                                        .values()
773                                        .any(|s| matches!(s, SnapshotStatus::Deferred));
774                                    if is_new_block && has_deferred && header.number > 0 {
775                                        // Block number incremented: the previous block is
776                                        // complete. Fire deferred components at that block's
777                                        // height.
778                                        let to_fire: Vec<String> = self
779                                            .snapshot_queue
780                                            .iter()
781                                            .filter(|(_, s)| matches!(s, SnapshotStatus::Deferred))
782                                            .map(|(id, _)| id.clone())
783                                            .collect();
784                                        for id in &to_fire {
785                                            self.snapshot_queue.remove(id);
786                                        }
787                                        let snapshot_header = BlockHeader {
788                                            number: header.number - 1,
789                                            hash: header.parent_hash.clone(),
790                                            ..Default::default()
791                                        };
792                                        debug!(
793                                            components = ?to_fire,
794                                            extractor = %self.extractor_id.name,
795                                            snapshot_block = header.number - 1,
796                                            "snapshot_deferred_to_background"
797                                        );
798                                        self.spawn_snapshot_task(
799                                            to_fire,
800                                            snapshot_header,
801                                            &deltas,
802                                        );
803                                    }
804
805                                    // Accumulate truly_new into the deferred set for the current
806                                    // block; they will be fired when the next block arrives.
807                                    for id in truly_new {
808                                        self.snapshot_queue
809                                            .insert(id, SnapshotStatus::Deferred);
810                                    }
811                                } else if !truly_new.is_empty() {
812                                    debug!(
813                                        components = ?truly_new,
814                                        extractor = %self.extractor_id.name,
815                                        block_number = ?header.number,
816                                        "snapshot_deferred_to_background"
817                                    );
818                                    let snapshot_header =
819                                        BlockHeader { revert: false, ..header.clone() };
820                                    self.spawn_snapshot_task(truly_new, snapshot_header, &deltas);
821                                }
822
823                                let snapshots = background_snapshots;
824
825                                let removed_components = if !to_remove.is_empty() {
826                                    self.component_tracker.stop_tracking(&to_remove)
827                                } else {
828                                    Default::default()
829                                };
830
831                                (snapshots, removed_components)
832                            };
833
834                            // Update entrypoints on the tracker (affects which contracts are tracked for DCI).
835                            self.component_tracker.process_entrypoints(&deltas.dci_update);
836
837                            // Filter deltas by currently tracked components / contracts.
838                            self.filter_deltas(&mut deltas);
839                            let n_changes = deltas.n_changes();
840
841                            let next = StateSyncMessage {
842                                header: header.clone(),
843                                snapshots,
844                                deltas: Some(deltas),
845                                removed_components,
846                            };
847                            block_tx.send(Ok(next)).await?;
848                            self.last_synced_block = Some(header.clone());
849
850                            debug!(block_number=?header.number, n_changes, "Finished processing delta message");
851                        } else {
852                            return Err(SynchronizerError::ConnectionError("Deltas channel closed".to_string()));
853                        }
854                    },
855                    _ = &mut end_rx => {
856                        info!("Received close signal during state_sync");
857                        return Ok(());
858                    }
859                }
860            }
861        }.await;
862
863        // This cleanup code now runs regardless of how the function exits (error or channel close)
864        warn!(last_synced_block = ?&self.last_synced_block, "Deltas processing ended.");
865        //Ignore error
866        let _ = self
867            .deltas_client
868            .unsubscribe(subscription_id)
869            .await
870            .map_err(|err| {
871                warn!(err=?err, "Unsubscribing from deltas on cleanup failed!");
872            });
873
874        // Handle the result: if it succeeded, we're done. If it errored, we need to determine
875        // whether the end_rx was consumed (close signal received) or not
876        match result {
877            Ok(()) => Ok(()), // Success, likely due to close signal
878            Err(e) => {
879                // The error came from the inner async block. Since the async block
880                // can receive close signals (which would return Ok), any error means
881                // the close signal was NOT received, so we can return the end_rx for retry
882                Err((e, Some(end_rx)))
883            }
884        }
885    }
886
887    /// Applies `self.buffered_deltas` to `snapshot`, updating attributes, balances, and contract
888    /// storage for deltas strictly after `snapshot_block`.
889    fn apply_deltas_to_snapshot(
890        &self,
891        snapshot: &mut Snapshot,
892        snapshot_block: u64,
893        contract_ids: &HashSet<Bytes>,
894    ) {
895        for delta in &self.buffered_deltas {
896            if delta.block.number <= snapshot_block {
897                continue;
898            }
899            for (component_id, state_delta) in &delta.state_deltas {
900                if let Some(cws) = snapshot.states.get_mut(component_id) {
901                    cws.state.attributes.extend(
902                        state_delta
903                            .updated_attributes
904                            .iter()
905                            .map(|(k, v)| (k.clone(), v.clone())),
906                    );
907                    for key in &state_delta.deleted_attributes {
908                        cws.state.attributes.remove(key);
909                    }
910                }
911            }
912            for (component_id, token_balances) in &delta.component_balances {
913                if let Some(cws) = snapshot.states.get_mut(component_id) {
914                    for (token, bal) in token_balances {
915                        cws.state
916                            .balances
917                            .insert(token.clone(), bal.balance.clone());
918                    }
919                }
920            }
921            for (address, account_delta) in &delta.account_deltas {
922                if contract_ids.contains(address) {
923                    if let Some(account) = snapshot.vm_storage.get_mut(address) {
924                        account.slots.extend(
925                            account_delta
926                                .slots
927                                .iter()
928                                .filter_map(|(k, v)| {
929                                    v.as_ref()
930                                        .map(|v| (k.clone(), v.clone()))
931                                }),
932                        );
933                        if let Some(balance) = &account_delta.balance {
934                            account.native_balance = balance.clone();
935                        }
936                        if let Some(code) = account_delta.code() {
937                            account.code = code.clone();
938                        }
939                    }
940                }
941            }
942        }
943    }
944
945    /// Spawns a background snapshot task for `component_ids` at `snapshot_header`. If no other
946    /// task is already in-flight, starts buffering deltas from `current_delta` so the snapshot
947    /// can be brought up to date when the task drains.
948    fn spawn_snapshot_task(
949        &mut self,
950        component_ids: Vec<String>,
951        snapshot_header: BlockHeader,
952        current_delta: &BlockAggregatedChanges,
953    ) {
954        let snapshot_block = snapshot_header.number;
955
956        if self.snapshot_tasks.is_empty() {
957            self.buffered_deltas
958                .push(current_delta.clone());
959        }
960
961        let (tx, rx) = oneshot::channel();
962        let rpc = self.rpc_client.clone();
963        let bg_params = FetchSnapshotParams {
964            chain: self.extractor_id.chain,
965            protocol_system: self.extractor_id.name.clone(),
966            block_number: snapshot_block,
967            uses_dci: self.uses_dci,
968            retrieve_balances: self.retrieve_balances,
969            include_tvl: self.include_tvl,
970        };
971        let ids = component_ids.clone();
972        tokio::spawn(async move {
973            let _ = tx.send(fetch_snapshot_background(rpc, ids, bg_params).await);
974        });
975        for id in &component_ids {
976            self.snapshot_queue
977                .insert(id.clone(), SnapshotStatus::InFlight);
978        }
979        self.snapshot_tasks
980            .push(SnapshotTask { component_ids, snapshot_block, receiver: rx });
981    }
982
983    /// Drains any background snapshot tasks that have completed. Returns a `Snapshot` containing
984    /// all ready results, with buffered deltas applied to bring each snapshot up to date.
985    fn drain_completed_snapshots(&mut self) -> Snapshot {
986        let mut result = Snapshot::default();
987        let pending = std::mem::take(&mut self.snapshot_tasks);
988
989        for mut p in pending {
990            match p.receiver.try_recv() {
991                Ok(Ok(fetch_result)) => {
992                    debug!(
993                        components = ?p.component_ids,
994                        extractor = %self.extractor_id.name,
995                        "snapshot_background_ready"
996                    );
997                    for id in &p.component_ids {
998                        self.snapshot_queue.remove(id);
999                    }
1000                    let new_component_ids: Vec<String> = fetch_result
1001                        .components
1002                        .keys()
1003                        .cloned()
1004                        .collect();
1005                    self.component_tracker
1006                        .components
1007                        .extend(fetch_result.components);
1008                    self.component_tracker
1009                        .process_entrypoints(&fetch_result.dci_update);
1010                    self.component_tracker
1011                        .update_contracts(new_component_ids);
1012                    let mut snapshot = fetch_result.snapshot;
1013                    self.apply_deltas_to_snapshot(
1014                        &mut snapshot,
1015                        fetch_result.snapshot_block,
1016                        &fetch_result.contract_ids,
1017                    );
1018                    result.extend(snapshot);
1019                }
1020                Ok(Err(e)) => {
1021                    if e.is_transient() {
1022                        warn!(
1023                            components = ?p.component_ids,
1024                            extractor = %self.extractor_id.name,
1025                            err = %e,
1026                            "Background snapshot fetch failed transiently; will retry next block"
1027                        );
1028                        for id in &p.component_ids {
1029                            self.snapshot_queue
1030                                .insert(id.clone(), SnapshotStatus::RetryNext);
1031                        }
1032                    } else {
1033                        warn!(
1034                            components = ?p.component_ids,
1035                            extractor = %self.extractor_id.name,
1036                            err = %e,
1037                            "Background snapshot fetch failed permanently; \
1038                             components blacklisted until restart"
1039                        );
1040                        for id in &p.component_ids {
1041                            self.snapshot_queue
1042                                .insert(id.clone(), SnapshotStatus::Blacklisted);
1043                        }
1044                    }
1045                }
1046                Err(tokio::sync::oneshot::error::TryRecvError::Empty) => {
1047                    self.snapshot_tasks.push(p);
1048                }
1049                Err(tokio::sync::oneshot::error::TryRecvError::Closed) => {
1050                    warn!(
1051                        components = ?p.component_ids,
1052                        extractor = %self.extractor_id.name,
1053                        "Background snapshot task dropped before sending result"
1054                    );
1055                    for id in &p.component_ids {
1056                        self.snapshot_queue.remove(id);
1057                    }
1058                }
1059            }
1060        }
1061
1062        result
1063    }
1064
1065    fn is_next_expected(&self, incoming: &BlockHeader) -> bool {
1066        if let Some(block) = self.last_synced_block.as_ref() {
1067            return incoming.parent_hash == block.hash;
1068        }
1069        false
1070    }
1071    fn filter_deltas(&self, deltas: &mut BlockAggregatedChanges) {
1072        deltas.filter_by_component(|id| {
1073            self.component_tracker
1074                .components
1075                .contains_key(id)
1076        });
1077        deltas.filter_by_contract(|id| {
1078            self.component_tracker
1079                .contracts
1080                .contains(id)
1081        });
1082    }
1083}
1084
1085#[async_trait]
1086impl<R, D> StateSynchronizer for ProtocolStateSynchronizer<R, D>
1087where
1088    R: RPCClient + Clone + Send + Sync + 'static,
1089    D: DeltasClient + Clone + Send + Sync + 'static,
1090{
1091    async fn initialize(&mut self) -> SyncResult<()> {
1092        info!("Retrieving relevant protocol components");
1093        self.component_tracker
1094            .initialise_components()
1095            .await?;
1096        info!(
1097            n_components = self.component_tracker.components.len(),
1098            n_contracts = self.component_tracker.contracts.len(),
1099            extractor = %self.extractor_id,
1100            "Finished retrieving components",
1101        );
1102
1103        Ok(())
1104    }
1105
1106    async fn start(
1107        mut self,
1108    ) -> (SynchronizerTaskHandle, Receiver<SyncResult<StateSyncMessage<BlockHeader>>>) {
1109        let (mut tx, rx) = channel(15);
1110        let (end_tx, end_rx) = oneshot::channel::<()>();
1111
1112        let jh = tokio::spawn(async move {
1113            let mut retry_count = 0;
1114            let mut current_end_rx = end_rx;
1115            let mut final_error = None;
1116
1117            while retry_count < self.max_retries {
1118                info!(extractor_id=%&self.extractor_id, retry_count, "(Re)starting synchronization loop");
1119
1120                let prev_block = self
1121                    .last_synced_block
1122                    .as_ref()
1123                    .map(|h| h.number);
1124                let res = self
1125                    .state_sync(&mut tx, current_end_rx)
1126                    .await;
1127                let made_progress = self
1128                    .last_synced_block
1129                    .as_ref()
1130                    .map(|h| h.number) >
1131                    prev_block;
1132                match res {
1133                    Ok(()) => {
1134                        info!(
1135                            extractor_id=%&self.extractor_id,
1136                            retry_count,
1137                            "State synchronization exited cleanly"
1138                        );
1139                        return;
1140                    }
1141                    Err((e, maybe_end_rx)) => {
1142                        warn!(
1143                            extractor_id=%&self.extractor_id,
1144                            retry_count,
1145                            error=%e,
1146                            "State synchronization errored!"
1147                        );
1148
1149                        // If we have the end_rx back, we can retry
1150                        if let Some(recovered_end_rx) = maybe_end_rx {
1151                            current_end_rx = recovered_end_rx;
1152
1153                            if let SynchronizerError::ConnectionClosed = e {
1154                                // break synchronization loop if websocket client is dead
1155                                error!(
1156                                    "Websocket connection closed. State synchronization exiting."
1157                                );
1158                                let _ = tx.send(Err(e)).await;
1159                                return;
1160                            } else {
1161                                // Store error in case this is our last retry
1162                                final_error = Some(e);
1163                            }
1164                        } else {
1165                            // Close signal was received, exit cleanly
1166                            info!(extractor_id=%&self.extractor_id, "Received close signal, exiting.");
1167                            return;
1168                        }
1169                    }
1170                }
1171                sleep(self.retry_cooldown).await;
1172                // A run that processed blocks is a healthy run — reset the counter so
1173                // transient failures after a long successful period get a fresh retry budget.
1174                if made_progress {
1175                    retry_count = 0;
1176                } else {
1177                    retry_count += 1;
1178                }
1179            }
1180            if let Some(e) = final_error {
1181                warn!(extractor_id=%&self.extractor_id, retry_count, error=%e, "Max retries exceeded");
1182                let _ = tx.send(Err(e)).await;
1183            }
1184        });
1185
1186        let handle = SynchronizerTaskHandle::new(jh, end_tx);
1187        (handle, rx)
1188    }
1189}
1190
1191#[cfg(test)]
1192mod test {
1193    //! Test suite for ProtocolStateSynchronizer shutdown and cleanup behavior.
1194    //!
1195    //! ## Test Coverage Strategy:
1196    //!
1197    //! ### Shutdown & Close Signal Tests:
1198    //! - `test_public_close_api_functionality` - Tests public API (start/close lifecycle)
1199    //! - `test_close_signal_while_waiting_for_first_deltas` - Close during initial wait
1200    //! - `test_close_signal_during_main_processing_loop` - Close during main processing
1201    //!
1202    //! ### Cleanup & Error Handling Tests:
1203    //! - `test_cleanup_runs_when_state_sync_processing_errors` - Cleanup on processing errors
1204    //!
1205    //! ### Coverage Summary:
1206    //! These tests ensure cleanup code (shared state reset + unsubscribe) runs on ALL exit paths:
1207    //! ✓ Close signal before first deltas   ✓ Close signal during processing
1208    //! ✓ Processing errors                  ✓ Channel closure
1209    //! ✓ Public API close operations        ✓ Normal completion
1210
1211    use std::{collections::HashSet, sync::Arc};
1212
1213    use tycho_common::models::{
1214        blockchain::{
1215            AddressStorageLocation, Block, BlockAggregatedChanges, DCIUpdate, EntryPoint,
1216            EntryPointWithTracingParams, RPCTracerParams, TracingParams, TracingResult,
1217        },
1218        protocol::{ProtocolComponent, ProtocolComponentState},
1219        token::Token,
1220        Chain,
1221    };
1222    use uuid::Uuid;
1223
1224    use super::*;
1225    use crate::{
1226        deltas::MockDeltasClient,
1227        rpc::{MockRPCClient, Page},
1228        DeltasError, RPCError,
1229    };
1230
1231    // Required for mock client to implement clone
1232    struct ArcRPCClient<T>(Arc<T>);
1233
1234    // Default derive(Clone) does require T to be Clone as well.
1235    impl<T> Clone for ArcRPCClient<T> {
1236        fn clone(&self) -> Self {
1237            ArcRPCClient(self.0.clone())
1238        }
1239    }
1240
1241    #[async_trait]
1242    impl<T> RPCClient for ArcRPCClient<T>
1243    where
1244        T: RPCClient + Sync + Send + 'static,
1245    {
1246        async fn get_tokens(
1247            &self,
1248            params: crate::rpc::TokensParams,
1249        ) -> Result<crate::rpc::Page<Vec<Token>>, RPCError> {
1250            self.0.get_tokens(params).await
1251        }
1252
1253        async fn get_contract_state(
1254            &self,
1255            params: crate::rpc::ContractStateParams,
1256        ) -> Result<crate::rpc::Page<Vec<Account>>, RPCError> {
1257            self.0.get_contract_state(params).await
1258        }
1259
1260        async fn get_protocol_components(
1261            &self,
1262            params: crate::rpc::ProtocolComponentsParams,
1263        ) -> Result<crate::rpc::Page<Vec<ProtocolComponent>>, RPCError> {
1264            self.0
1265                .get_protocol_components(params)
1266                .await
1267        }
1268
1269        async fn get_protocol_states(
1270            &self,
1271            params: crate::rpc::ProtocolStatesParams,
1272        ) -> Result<crate::rpc::Page<Vec<ProtocolComponentState>>, RPCError> {
1273            self.0.get_protocol_states(params).await
1274        }
1275
1276        async fn get_protocol_systems(
1277            &self,
1278            params: crate::rpc::ProtocolSystemsParams,
1279        ) -> Result<crate::rpc::Page<crate::rpc::ProtocolSystems>, RPCError> {
1280            self.0
1281                .get_protocol_systems(params)
1282                .await
1283        }
1284
1285        async fn get_component_tvl(
1286            &self,
1287            params: crate::rpc::ComponentTvlParams,
1288        ) -> Result<crate::rpc::Page<HashMap<String, f64>>, RPCError> {
1289            self.0.get_component_tvl(params).await
1290        }
1291
1292        async fn get_traced_entry_points(
1293            &self,
1294            params: crate::rpc::TracedEntryPointsParams,
1295        ) -> Result<
1296            crate::rpc::Page<HashMap<String, Vec<(EntryPointWithTracingParams, TracingResult)>>>,
1297            RPCError,
1298        > {
1299            self.0
1300                .get_traced_entry_points(params)
1301                .await
1302        }
1303
1304        #[allow(clippy::extra_unused_lifetimes)]
1305        async fn get_snapshots<'a>(
1306            &self,
1307            request: &SnapshotParameters<'a>,
1308            chunk_size: Option<usize>,
1309            concurrency: usize,
1310        ) -> Result<Snapshot, RPCError> {
1311            self.0
1312                .get_snapshots(request, chunk_size, concurrency)
1313                .await
1314        }
1315
1316        fn compression(&self) -> bool {
1317            self.0.compression()
1318        }
1319    }
1320
1321    // Required for mock client to implement clone
1322    struct ArcDeltasClient<T>(Arc<T>);
1323
1324    // Default derive(Clone) does require T to be Clone as well.
1325    impl<T> Clone for ArcDeltasClient<T> {
1326        fn clone(&self) -> Self {
1327            ArcDeltasClient(self.0.clone())
1328        }
1329    }
1330
1331    #[async_trait]
1332    impl<T> DeltasClient for ArcDeltasClient<T>
1333    where
1334        T: DeltasClient + Sync + Send + 'static,
1335    {
1336        async fn subscribe(
1337            &self,
1338            extractor_id: tycho_common::models::ExtractorIdentity,
1339            options: SubscriptionOptions,
1340        ) -> Result<(Uuid, Receiver<BlockAggregatedChanges>), DeltasError> {
1341            self.0
1342                .subscribe(extractor_id, options)
1343                .await
1344        }
1345
1346        async fn unsubscribe(&self, subscription_id: Uuid) -> Result<(), DeltasError> {
1347            self.0
1348                .unsubscribe(subscription_id)
1349                .await
1350        }
1351
1352        async fn connect(&self) -> Result<JoinHandle<Result<(), DeltasError>>, DeltasError> {
1353            self.0.connect().await
1354        }
1355
1356        async fn close(&self) -> Result<(), DeltasError> {
1357            self.0.close().await
1358        }
1359    }
1360
1361    fn with_mocked_clients(
1362        native: bool,
1363        include_tvl: bool,
1364        rpc_client: Option<MockRPCClient>,
1365        deltas_client: Option<MockDeltasClient>,
1366    ) -> ProtocolStateSynchronizer<ArcRPCClient<MockRPCClient>, ArcDeltasClient<MockDeltasClient>>
1367    {
1368        let rpc_client = ArcRPCClient(Arc::new(rpc_client.unwrap_or_default()));
1369        let deltas_client = ArcDeltasClient(Arc::new(deltas_client.unwrap_or_default()));
1370
1371        ProtocolStateSynchronizer::new(
1372            ExtractorIdentity::new(Chain::Ethereum, "uniswap-v2"),
1373            native,
1374            ComponentFilter::with_tvl_range(50.0, 50.0),
1375            1,
1376            Duration::from_secs(0),
1377            true,
1378            include_tvl,
1379            true, // Does not matter as we mock the client that never compresses
1380            rpc_client,
1381            deltas_client,
1382            10_u64,
1383        )
1384    }
1385
1386    fn state_snapshot_native() -> Vec<ProtocolComponentState> {
1387        vec![ProtocolComponentState {
1388            component_id: "Component1".to_string(),
1389            attributes: HashMap::new(),
1390            balances: HashMap::new(),
1391        }]
1392    }
1393
1394    fn make_mock_client() -> MockRPCClient {
1395        let mut m = MockRPCClient::new();
1396        m.expect_compression()
1397            .return_const(false);
1398        m
1399    }
1400
1401    #[test_log::test(tokio::test)]
1402    async fn test_get_snapshots_native() {
1403        let header = BlockHeader::default();
1404        let mut rpc = make_mock_client();
1405        let component = ProtocolComponent { id: "Component1".to_string(), ..Default::default() };
1406
1407        let component_clone = component.clone();
1408        rpc.expect_get_snapshots()
1409            .returning(move |_request, _chunk_size, _concurrency| {
1410                Ok(Snapshot {
1411                    states: state_snapshot_native()
1412                        .into_iter()
1413                        .map(|state| {
1414                            (
1415                                state.component_id.clone(),
1416                                ComponentWithState {
1417                                    state,
1418                                    component: component_clone.clone(),
1419                                    entrypoints: vec![],
1420                                    component_tvl: None,
1421                                },
1422                            )
1423                        })
1424                        .collect(),
1425                    vm_storage: HashMap::new(),
1426                })
1427            });
1428
1429        let mut state_sync = with_mocked_clients(true, false, Some(rpc), None);
1430        state_sync
1431            .component_tracker
1432            .components
1433            .insert("Component1".to_string(), component.clone());
1434        let components_arg = ["Component1".to_string()];
1435        let exp = StateSyncMessage {
1436            header: header.clone(),
1437            snapshots: Snapshot {
1438                states: state_snapshot_native()
1439                    .into_iter()
1440                    .map(|state| {
1441                        (
1442                            state.component_id.clone(),
1443                            ComponentWithState {
1444                                state,
1445                                component: component.clone(),
1446                                entrypoints: vec![],
1447                                component_tvl: None,
1448                            },
1449                        )
1450                    })
1451                    .collect(),
1452                vm_storage: HashMap::new(),
1453            },
1454            deltas: None,
1455            removed_components: Default::default(),
1456        };
1457
1458        let req_ids: Vec<String> = components_arg.to_vec();
1459        let components: HashMap<_, _> = state_sync
1460            .component_tracker
1461            .components
1462            .iter()
1463            .filter(|(id, _)| req_ids.contains(id))
1464            .map(|(k, v)| (k.clone(), v.clone()))
1465            .collect();
1466        let contract_ids: HashSet<Bytes> = state_sync
1467            .component_tracker
1468            .get_contracts_by_component(&req_ids)
1469            .into_iter()
1470            .collect();
1471        let params = FetchSnapshotParams {
1472            chain: Chain::Ethereum,
1473            protocol_system: "uniswap-v2".to_string(),
1474            block_number: header.number,
1475            uses_dci: false,
1476            retrieve_balances: true,
1477            include_tvl: false,
1478        };
1479        let (snapshot, _, _) =
1480            fetch_snapshot(&state_sync.rpc_client, components, contract_ids, &params)
1481                .await
1482                .expect("Retrieving snapshot failed");
1483        let snap = StateSyncMessage {
1484            header: header.clone(),
1485            snapshots: snapshot,
1486            deltas: None,
1487            removed_components: Default::default(),
1488        };
1489
1490        assert_eq!(snap, exp);
1491    }
1492
1493    #[test_log::test(tokio::test)]
1494    async fn test_get_snapshots_native_with_tvl() {
1495        let header = BlockHeader::default();
1496        let mut rpc = make_mock_client();
1497        let component = ProtocolComponent { id: "Component1".to_string(), ..Default::default() };
1498
1499        let component_clone = component.clone();
1500        rpc.expect_get_snapshots()
1501            .returning(move |_request, _chunk_size, _concurrency| {
1502                Ok(Snapshot {
1503                    states: state_snapshot_native()
1504                        .into_iter()
1505                        .map(|state| {
1506                            (
1507                                state.component_id.clone(),
1508                                ComponentWithState {
1509                                    state,
1510                                    component: component_clone.clone(),
1511                                    component_tvl: Some(100.0),
1512                                    entrypoints: vec![],
1513                                },
1514                            )
1515                        })
1516                        .collect(),
1517                    vm_storage: HashMap::new(),
1518                })
1519            });
1520
1521        let mut state_sync = with_mocked_clients(true, true, Some(rpc), None);
1522        state_sync
1523            .component_tracker
1524            .components
1525            .insert("Component1".to_string(), component.clone());
1526        let components_arg = ["Component1".to_string()];
1527        let exp = StateSyncMessage {
1528            header: header.clone(),
1529            snapshots: Snapshot {
1530                states: state_snapshot_native()
1531                    .into_iter()
1532                    .map(|state| {
1533                        (
1534                            state.component_id.clone(),
1535                            ComponentWithState {
1536                                state,
1537                                component: component.clone(),
1538                                component_tvl: Some(100.0),
1539                                entrypoints: vec![],
1540                            },
1541                        )
1542                    })
1543                    .collect(),
1544                vm_storage: HashMap::new(),
1545            },
1546            deltas: None,
1547            removed_components: Default::default(),
1548        };
1549
1550        let req_ids: Vec<String> = components_arg.to_vec();
1551        let components: HashMap<_, _> = state_sync
1552            .component_tracker
1553            .components
1554            .iter()
1555            .filter(|(id, _)| req_ids.contains(id))
1556            .map(|(k, v)| (k.clone(), v.clone()))
1557            .collect();
1558        let contract_ids: HashSet<Bytes> = state_sync
1559            .component_tracker
1560            .get_contracts_by_component(&req_ids)
1561            .into_iter()
1562            .collect();
1563        let params = FetchSnapshotParams {
1564            chain: Chain::Ethereum,
1565            protocol_system: "uniswap-v2".to_string(),
1566            block_number: header.number,
1567            uses_dci: false,
1568            retrieve_balances: true,
1569            include_tvl: true,
1570        };
1571        let (snapshot, _, _) =
1572            fetch_snapshot(&state_sync.rpc_client, components, contract_ids, &params)
1573                .await
1574                .expect("Retrieving snapshot failed");
1575        let snap = StateSyncMessage {
1576            header: header.clone(),
1577            snapshots: snapshot,
1578            deltas: None,
1579            removed_components: Default::default(),
1580        };
1581
1582        assert_eq!(snap, exp);
1583    }
1584
1585    fn state_snapshot_vm() -> Vec<Account> {
1586        vec![
1587            Account::new(
1588                Chain::default(),
1589                Bytes::from("0x0badc0ffee"),
1590                String::new(),
1591                HashMap::new(),
1592                Bytes::default(),
1593                HashMap::new(),
1594                Bytes::default(),
1595                Bytes::default(),
1596                Bytes::default(),
1597                Bytes::default(),
1598                None,
1599            ),
1600            Account::new(
1601                Chain::default(),
1602                Bytes::from("0xbabe42"),
1603                String::new(),
1604                HashMap::new(),
1605                Bytes::default(),
1606                HashMap::new(),
1607                Bytes::default(),
1608                Bytes::default(),
1609                Bytes::default(),
1610                Bytes::default(),
1611                None,
1612            ),
1613        ]
1614    }
1615
1616    fn traced_entry_point_response(
1617    ) -> HashMap<String, Vec<(EntryPointWithTracingParams, TracingResult)>> {
1618        HashMap::from([(
1619            "Component1".to_string(),
1620            vec![(
1621                EntryPointWithTracingParams {
1622                    entry_point: EntryPoint {
1623                        external_id: "entrypoint_a".to_string(),
1624                        target: Bytes::from("0x0badc0ffee"),
1625                        signature: "sig()".to_string(),
1626                    },
1627                    params: TracingParams::RPCTracer(RPCTracerParams {
1628                        caller: Some(Bytes::from("0x0badc0ffee")),
1629                        calldata: Bytes::from("0x0badc0ffee"),
1630                        state_overrides: None,
1631                        prune_addresses: None,
1632                    }),
1633                },
1634                TracingResult {
1635                    retriggers: HashSet::from([(
1636                        Bytes::from("0x0badc0ffee"),
1637                        AddressStorageLocation::new(Bytes::from("0x0badc0ffee"), 12),
1638                    )]),
1639                    accessed_slots: HashMap::from([(
1640                        Bytes::from("0x0badc0ffee"),
1641                        HashSet::from([Bytes::from("0xbadbeef0")]),
1642                    )]),
1643                },
1644            )],
1645        )])
1646    }
1647
1648    #[test_log::test(tokio::test)]
1649    async fn test_get_snapshots_vm() {
1650        let header = BlockHeader::default();
1651        let mut rpc = make_mock_client();
1652
1653        let traced_ep_response = traced_entry_point_response();
1654        rpc.expect_get_snapshots()
1655            .returning(move |_request, _chunk_size, _concurrency| {
1656                let vm_storage_accounts = state_snapshot_vm();
1657                Ok(Snapshot {
1658                    states: [(
1659                        "Component1".to_string(),
1660                        ComponentWithState {
1661                            state: ProtocolComponentState {
1662                                component_id: "Component1".to_string(),
1663                                attributes: HashMap::new(),
1664                                balances: HashMap::new(),
1665                            },
1666                            component: ProtocolComponent {
1667                                id: "Component1".to_string(),
1668                                contract_addresses: vec![
1669                                    Bytes::from("0x0badc0ffee"),
1670                                    Bytes::from("0xbabe42"),
1671                                ],
1672                                ..Default::default()
1673                            },
1674                            component_tvl: None,
1675                            entrypoints: traced_ep_response
1676                                .get("Component1")
1677                                .cloned()
1678                                .unwrap_or_default(),
1679                        },
1680                    )]
1681                    .into_iter()
1682                    .collect(),
1683                    vm_storage: vm_storage_accounts
1684                        .into_iter()
1685                        .map(|account| (account.address.clone(), account))
1686                        .collect(),
1687                })
1688            });
1689
1690        let mut state_sync = with_mocked_clients(false, false, Some(rpc), None);
1691        let component = ProtocolComponent {
1692            id: "Component1".to_string(),
1693            contract_addresses: vec![Bytes::from("0x0badc0ffee"), Bytes::from("0xbabe42")],
1694            ..Default::default()
1695        };
1696        state_sync
1697            .component_tracker
1698            .components
1699            .insert("Component1".to_string(), component.clone());
1700        let components_arg = ["Component1".to_string()];
1701        let exp = StateSyncMessage {
1702            header: header.clone(),
1703            snapshots: Snapshot {
1704                states: [(
1705                    component.id.clone(),
1706                    ComponentWithState {
1707                        state: ProtocolComponentState {
1708                            component_id: "Component1".to_string(),
1709                            attributes: HashMap::new(),
1710                            balances: HashMap::new(),
1711                        },
1712                        component: component.clone(),
1713                        component_tvl: None,
1714                        entrypoints: traced_entry_point_response()
1715                            .remove("Component1")
1716                            .unwrap_or_default(),
1717                    },
1718                )]
1719                .into_iter()
1720                .collect(),
1721                vm_storage: state_snapshot_vm()
1722                    .into_iter()
1723                    .map(|account| (account.address.clone(), account))
1724                    .collect(),
1725            },
1726            deltas: None,
1727            removed_components: Default::default(),
1728        };
1729
1730        let req_ids: Vec<String> = components_arg.to_vec();
1731        let components: HashMap<_, _> = state_sync
1732            .component_tracker
1733            .components
1734            .iter()
1735            .filter(|(id, _)| req_ids.contains(id))
1736            .map(|(k, v)| (k.clone(), v.clone()))
1737            .collect();
1738        let contract_ids: HashSet<Bytes> = state_sync
1739            .component_tracker
1740            .get_contracts_by_component(&req_ids)
1741            .into_iter()
1742            .collect();
1743        let params = FetchSnapshotParams {
1744            chain: Chain::Ethereum,
1745            protocol_system: "uniswap-v2".to_string(),
1746            block_number: header.number,
1747            uses_dci: false,
1748            retrieve_balances: false,
1749            include_tvl: false,
1750        };
1751        let (snapshot, _, _) =
1752            fetch_snapshot(&state_sync.rpc_client, components, contract_ids, &params)
1753                .await
1754                .expect("Retrieving snapshot failed");
1755        let snap = StateSyncMessage {
1756            header: header.clone(),
1757            snapshots: snapshot,
1758            deltas: None,
1759            removed_components: Default::default(),
1760        };
1761
1762        assert_eq!(snap, exp);
1763    }
1764
1765    #[test_log::test(tokio::test)]
1766    async fn test_get_snapshots_vm_with_tvl() {
1767        let header = BlockHeader::default();
1768        let mut rpc = make_mock_client();
1769        let component = ProtocolComponent {
1770            id: "Component1".to_string(),
1771            contract_addresses: vec![Bytes::from("0x0badc0ffee"), Bytes::from("0xbabe42")],
1772            ..Default::default()
1773        };
1774
1775        let component_clone = component.clone();
1776        rpc.expect_get_snapshots()
1777            .returning(move |_request, _chunk_size, _concurrency| {
1778                let vm_storage_accounts = state_snapshot_vm();
1779                Ok(Snapshot {
1780                    states: [(
1781                        "Component1".to_string(),
1782                        ComponentWithState {
1783                            state: ProtocolComponentState {
1784                                component_id: "Component1".to_string(),
1785                                attributes: HashMap::new(),
1786                                balances: HashMap::new(),
1787                            },
1788                            component: component_clone.clone(),
1789                            component_tvl: Some(100.0),
1790                            entrypoints: vec![],
1791                        },
1792                    )]
1793                    .into_iter()
1794                    .collect(),
1795                    vm_storage: vm_storage_accounts
1796                        .into_iter()
1797                        .map(|account| (account.address.clone(), account))
1798                        .collect(),
1799                })
1800            });
1801
1802        let mut state_sync = with_mocked_clients(false, true, Some(rpc), None);
1803        state_sync
1804            .component_tracker
1805            .components
1806            .insert("Component1".to_string(), component.clone());
1807        let components_arg = ["Component1".to_string()];
1808        let exp = StateSyncMessage {
1809            header: header.clone(),
1810            snapshots: Snapshot {
1811                states: [(
1812                    component.id.clone(),
1813                    ComponentWithState {
1814                        state: ProtocolComponentState {
1815                            component_id: "Component1".to_string(),
1816                            attributes: HashMap::new(),
1817                            balances: HashMap::new(),
1818                        },
1819                        component: component.clone(),
1820                        component_tvl: Some(100.0),
1821                        entrypoints: vec![],
1822                    },
1823                )]
1824                .into_iter()
1825                .collect(),
1826                vm_storage: state_snapshot_vm()
1827                    .into_iter()
1828                    .map(|account| (account.address.clone(), account))
1829                    .collect(),
1830            },
1831            deltas: None,
1832            removed_components: Default::default(),
1833        };
1834
1835        let req_ids: Vec<String> = components_arg.to_vec();
1836        let components: HashMap<_, _> = state_sync
1837            .component_tracker
1838            .components
1839            .iter()
1840            .filter(|(id, _)| req_ids.contains(id))
1841            .map(|(k, v)| (k.clone(), v.clone()))
1842            .collect();
1843        let contract_ids: HashSet<Bytes> = state_sync
1844            .component_tracker
1845            .get_contracts_by_component(&req_ids)
1846            .into_iter()
1847            .collect();
1848        let params = FetchSnapshotParams {
1849            chain: Chain::Ethereum,
1850            protocol_system: "uniswap-v2".to_string(),
1851            block_number: header.number,
1852            uses_dci: false,
1853            retrieve_balances: false,
1854            include_tvl: true,
1855        };
1856        let (snapshot, _, _) =
1857            fetch_snapshot(&state_sync.rpc_client, components, contract_ids, &params)
1858                .await
1859                .expect("Retrieving snapshot failed");
1860        let snap = StateSyncMessage {
1861            header: header.clone(),
1862            snapshots: snapshot,
1863            deltas: None,
1864            removed_components: Default::default(),
1865        };
1866
1867        assert_eq!(snap, exp);
1868    }
1869
1870    /// Test that get_snapshots only fetches snapshots for requested components,
1871    /// not all tracked components. This prevents returning full snapshots repeatedly
1872    /// when only a subset of components need updates.
1873    #[test_log::test(tokio::test)]
1874    async fn test_get_snapshots_filters_to_requested_components_only() {
1875        let header = BlockHeader::default();
1876        let mut rpc = make_mock_client();
1877
1878        // Create three components
1879        let component1 = ProtocolComponent { id: "Component1".to_string(), ..Default::default() };
1880        let component2 = ProtocolComponent { id: "Component2".to_string(), ..Default::default() };
1881        let component3 = ProtocolComponent { id: "Component3".to_string(), ..Default::default() };
1882
1883        let component2_clone = component2.clone();
1884
1885        // Mock the RPC call and verify it only receives Component2
1886        rpc.expect_get_snapshots()
1887            .withf(
1888                |request: &SnapshotParameters,
1889                 _chunk_size: &Option<usize>,
1890                 _concurrency: &usize| {
1891                    // Verify that the request contains ONLY Component2, not all tracked components
1892                    request.components.len() == 1 &&
1893                        request
1894                            .components
1895                            .contains_key("Component2")
1896                },
1897            )
1898            .times(1)
1899            .returning(move |_request, _chunk_size, _concurrency| {
1900                Ok(Snapshot {
1901                    states: [(
1902                        "Component2".to_string(),
1903                        ComponentWithState {
1904                            state: ProtocolComponentState {
1905                                component_id: "Component2".to_string(),
1906                                attributes: HashMap::new(),
1907                                balances: HashMap::new(),
1908                            },
1909                            component: component2_clone.clone(),
1910                            entrypoints: vec![],
1911                            component_tvl: None,
1912                        },
1913                    )]
1914                    .into_iter()
1915                    .collect(),
1916                    vm_storage: HashMap::new(),
1917                })
1918            });
1919
1920        let mut state_sync = with_mocked_clients(true, false, Some(rpc), None);
1921
1922        // Track all three components
1923        state_sync
1924            .component_tracker
1925            .components
1926            .insert("Component1".to_string(), component1.clone());
1927        state_sync
1928            .component_tracker
1929            .components
1930            .insert("Component2".to_string(), component2.clone());
1931        state_sync
1932            .component_tracker
1933            .components
1934            .insert("Component3".to_string(), component3.clone());
1935
1936        // Request snapshot for ONLY Component2
1937        let components_arg = ["Component2".to_string()];
1938        let req_ids: Vec<String> = components_arg.to_vec();
1939        let components: HashMap<_, _> = state_sync
1940            .component_tracker
1941            .components
1942            .iter()
1943            .filter(|(id, _)| req_ids.contains(id))
1944            .map(|(k, v)| (k.clone(), v.clone()))
1945            .collect();
1946        let contract_ids: HashSet<Bytes> = state_sync
1947            .component_tracker
1948            .get_contracts_by_component(&req_ids)
1949            .into_iter()
1950            .collect();
1951        let params = FetchSnapshotParams {
1952            chain: Chain::Ethereum,
1953            protocol_system: "uniswap-v2".to_string(),
1954            block_number: header.number,
1955            uses_dci: false,
1956            retrieve_balances: true,
1957            include_tvl: false,
1958        };
1959        let (snapshot, _, _) =
1960            fetch_snapshot(&state_sync.rpc_client, components, contract_ids, &params)
1961                .await
1962                .expect("Retrieving snapshot failed");
1963
1964        // Verify we only got Component2 back
1965        assert_eq!(snapshot.states.len(), 1);
1966        assert!(snapshot
1967            .states
1968            .contains_key("Component2"));
1969        assert!(!snapshot
1970            .states
1971            .contains_key("Component1"));
1972        assert!(!snapshot
1973            .states
1974            .contains_key("Component3"));
1975    }
1976
1977    fn mock_clients_for_state_sync(
1978        bg_done: Option<Arc<tokio::sync::Notify>>,
1979    ) -> (MockRPCClient, MockDeltasClient, Sender<BlockAggregatedChanges>) {
1980        let mut rpc_client = make_mock_client();
1981        // Mocks for the start_tracking call, these need to come first because they are more
1982        // specific, see: https://docs.rs/mockall/latest/mockall/#matching-multiple-calls
1983        rpc_client
1984            .expect_get_protocol_components()
1985            .withf(|params: &crate::rpc::ProtocolComponentsParams| {
1986                params
1987                    .component_ids()
1988                    .is_some_and(|ids| ids.contains(&"Component3".to_string()))
1989            })
1990            .returning(|_| {
1991                // return Component3
1992                Ok(Page::new(
1993                    vec![
1994                        // this component shall have a tvl update above threshold
1995                        ProtocolComponent { id: "Component3".to_string(), ..Default::default() },
1996                    ],
1997                    1,
1998                    0,
1999                    100,
2000                ))
2001            });
2002        // Mock get_snapshots for Component3
2003        rpc_client
2004            .expect_get_snapshots()
2005            .withf(
2006                |request: &SnapshotParameters,
2007                 _chunk_size: &Option<usize>,
2008                 _concurrency: &usize| {
2009                    request
2010                        .components
2011                        .contains_key("Component3")
2012                },
2013            )
2014            .returning(move |_request, _chunk_size, _concurrency| {
2015                let snap = Ok(Snapshot {
2016                    states: [(
2017                        "Component3".to_string(),
2018                        ComponentWithState {
2019                            state: ProtocolComponentState::new(
2020                                "Component3",
2021                                Default::default(),
2022                                Default::default(),
2023                            ),
2024                            component: ProtocolComponent {
2025                                id: "Component3".to_string(),
2026                                ..Default::default()
2027                            },
2028                            component_tvl: Some(1000.0),
2029                            entrypoints: vec![],
2030                        },
2031                    )]
2032                    .into_iter()
2033                    .collect(),
2034                    vm_storage: HashMap::new(),
2035                });
2036                if let Some(n) = &bg_done {
2037                    n.notify_one();
2038                }
2039                snap
2040            });
2041
2042        // mock calls for the initial state snapshots
2043        rpc_client
2044            .expect_get_protocol_components()
2045            .returning(|_| {
2046                // Initial sync of components
2047                Ok(Page::new(
2048                    vec![
2049                        // this component shall have a tvl update above threshold
2050                        ProtocolComponent { id: "Component1".to_string(), ..Default::default() },
2051                        // this component shall have a tvl update below threshold.
2052                        ProtocolComponent { id: "Component2".to_string(), ..Default::default() },
2053                        // a third component will have a tvl update above threshold
2054                    ],
2055                    2,
2056                    0,
2057                    100,
2058                ))
2059            });
2060
2061        rpc_client
2062            .expect_get_snapshots()
2063            .returning(|_request, _chunk_size, _concurrency| {
2064                Ok(Snapshot {
2065                    states: [
2066                        (
2067                            "Component1".to_string(),
2068                            ComponentWithState {
2069                                state: ProtocolComponentState::new(
2070                                    "Component1",
2071                                    Default::default(),
2072                                    Default::default(),
2073                                ),
2074                                component: ProtocolComponent {
2075                                    id: "Component1".to_string(),
2076                                    ..Default::default()
2077                                },
2078                                component_tvl: Some(100.0),
2079                                entrypoints: vec![],
2080                            },
2081                        ),
2082                        (
2083                            "Component2".to_string(),
2084                            ComponentWithState {
2085                                state: ProtocolComponentState::new(
2086                                    "Component2",
2087                                    Default::default(),
2088                                    Default::default(),
2089                                ),
2090                                component: ProtocolComponent {
2091                                    id: "Component2".to_string(),
2092                                    ..Default::default()
2093                                },
2094                                component_tvl: Some(0.0),
2095                                entrypoints: vec![],
2096                            },
2097                        ),
2098                    ]
2099                    .into_iter()
2100                    .collect(),
2101                    vm_storage: HashMap::new(),
2102                })
2103            });
2104
2105        // Mock get_traced_entry_points for Ethereum chain
2106        rpc_client
2107            .expect_get_traced_entry_points()
2108            .returning(|_| Ok(Page::new(HashMap::new(), 0, 0, 0)));
2109
2110        // Mock deltas client and messages
2111        let mut deltas_client = MockDeltasClient::new();
2112        let (tx, rx) = channel(1);
2113        deltas_client
2114            .expect_subscribe()
2115            .return_once(move |_, _| {
2116                // Return subscriber id and a channel
2117                Ok((Uuid::default(), rx))
2118            });
2119
2120        // Expect unsubscribe call during cleanup
2121        deltas_client
2122            .expect_unsubscribe()
2123            .return_once(|_| Ok(()));
2124
2125        (rpc_client, deltas_client, tx)
2126    }
2127
2128    /// Test strategy
2129    ///
2130    /// - initial snapshot retrieval returns component1 and component2 as snapshots
2131    /// - block 1: DCI update for Component1; no new components
2132    /// - block 2: Component3 TVL crosses threshold → background snapshot task spawned; snapshot
2133    ///   appears in block 3 after the background task drains
2134    /// - block 3: empty block; drain produces Component3 snapshot
2135    #[test_log::test(tokio::test)]
2136    async fn test_state_sync() {
2137        let bg_done = Arc::new(tokio::sync::Notify::new());
2138        let (rpc_client, deltas_client, tx) = mock_clients_for_state_sync(Some(bg_done.clone()));
2139        let deltas = [
2140            BlockAggregatedChanges {
2141                extractor: "uniswap-v2".to_string(),
2142                chain: Chain::Ethereum,
2143                block: Block {
2144                    number: 1,
2145                    hash: Bytes::from("0x01"),
2146                    parent_hash: Bytes::from("0x00"),
2147                    chain: Chain::Ethereum,
2148                    ts: Default::default(),
2149                },
2150                revert: false,
2151                dci_update: DCIUpdate {
2152                    new_entrypoints: HashMap::from([(
2153                        "Component1".to_string(),
2154                        HashSet::from([EntryPoint {
2155                            external_id: "entrypoint_a".to_string(),
2156                            target: Bytes::from("0x0badc0ffee"),
2157                            signature: "sig()".to_string(),
2158                        }]),
2159                    )]),
2160                    new_entrypoint_params: HashMap::from([(
2161                        "entrypoint_a".to_string(),
2162                        HashSet::from([(
2163                            TracingParams::RPCTracer(RPCTracerParams {
2164                                caller: Some(Bytes::from("0x0badc0ffee")),
2165                                calldata: Bytes::from("0x0badc0ffee"),
2166                                state_overrides: None,
2167                                prune_addresses: None,
2168                            }),
2169                            "Component1".to_string(),
2170                        )]),
2171                    )]),
2172                    trace_results: HashMap::from([(
2173                        "entrypoint_a".to_string(),
2174                        TracingResult {
2175                            retriggers: HashSet::from([(
2176                                Bytes::from("0x0badc0ffee"),
2177                                AddressStorageLocation::new(Bytes::from("0x0badc0ffee"), 12),
2178                            )]),
2179                            accessed_slots: HashMap::from([(
2180                                Bytes::from("0x0badc0ffee"),
2181                                HashSet::from([Bytes::from("0xbadbeef0")]),
2182                            )]),
2183                        },
2184                    )]),
2185                },
2186                ..Default::default()
2187            },
2188            BlockAggregatedChanges {
2189                extractor: "uniswap-v2".to_string(),
2190                chain: Chain::Ethereum,
2191                block: Block {
2192                    number: 2,
2193                    hash: Bytes::from("0x02"),
2194                    parent_hash: Bytes::from("0x01"),
2195                    chain: Chain::Ethereum,
2196                    ts: Default::default(),
2197                },
2198                revert: false,
2199                component_tvl: [
2200                    ("Component1".to_string(), 100.0),
2201                    ("Component2".to_string(), 0.0),
2202                    ("Component3".to_string(), 1000.0),
2203                ]
2204                .into_iter()
2205                .collect(),
2206                ..Default::default()
2207            },
2208            // Block 3: empty block; the background task for Component3 should have completed,
2209            // so drain_completed_snapshots returns the Component3 snapshot.
2210            BlockAggregatedChanges {
2211                extractor: "uniswap-v2".to_string(),
2212                chain: Chain::Ethereum,
2213                block: Block {
2214                    number: 3,
2215                    hash: Bytes::from("0x03"),
2216                    parent_hash: Bytes::from("0x02"),
2217                    chain: Chain::Ethereum,
2218                    ts: Default::default(),
2219                },
2220                revert: false,
2221                ..Default::default()
2222            },
2223        ];
2224        let mut state_sync = with_mocked_clients(true, true, Some(rpc_client), Some(deltas_client));
2225        state_sync
2226            .initialize()
2227            .await
2228            .expect("Init failed");
2229
2230        // Test starts here
2231        let (handle, mut rx) = state_sync.start().await;
2232        let (jh, close_tx) = handle.split();
2233        tx.send(deltas[0].clone())
2234            .await
2235            .expect("deltas channel msg 0 closed!");
2236        let first_msg = timeout(Duration::from_millis(200), rx.recv())
2237            .await
2238            .expect("waiting for first state msg timed out!")
2239            .expect("state sync block sender closed!");
2240        tx.send(deltas[1].clone())
2241            .await
2242            .expect("deltas channel msg 1 closed!");
2243        let second_msg = timeout(Duration::from_millis(200), rx.recv())
2244            .await
2245            .expect("waiting for second state msg timed out!")
2246            .expect("state sync block sender closed!");
2247        // Wait for the background snapshot task to complete before sending block 3.
2248        bg_done.notified().await;
2249        tx.send(deltas[2].clone())
2250            .await
2251            .expect("deltas channel msg 2 closed!");
2252        let third_msg = timeout(Duration::from_millis(200), rx.recv())
2253            .await
2254            .expect("waiting for third state msg timed out!")
2255            .expect("state sync block sender closed!");
2256        let _ = close_tx.send(());
2257        jh.await
2258            .expect("state sync task panicked!");
2259
2260        // assertions
2261        let exp1 = StateSyncMessage {
2262            header: BlockHeader {
2263                number: 1,
2264                hash: Bytes::from("0x01"),
2265                parent_hash: Bytes::from("0x00"),
2266                revert: false,
2267                ..Default::default()
2268            },
2269            snapshots: Snapshot {
2270                states: [
2271                    (
2272                        "Component1".to_string(),
2273                        ComponentWithState {
2274                            state: ProtocolComponentState::new(
2275                                "Component1",
2276                                Default::default(),
2277                                Default::default(),
2278                            ),
2279                            component: ProtocolComponent {
2280                                id: "Component1".to_string(),
2281                                ..Default::default()
2282                            },
2283                            component_tvl: Some(100.0),
2284                            entrypoints: vec![],
2285                        },
2286                    ),
2287                    (
2288                        "Component2".to_string(),
2289                        ComponentWithState {
2290                            state: ProtocolComponentState::new(
2291                                "Component2",
2292                                Default::default(),
2293                                Default::default(),
2294                            ),
2295                            component: ProtocolComponent {
2296                                id: "Component2".to_string(),
2297                                ..Default::default()
2298                            },
2299                            component_tvl: Some(0.0),
2300                            entrypoints: vec![],
2301                        },
2302                    ),
2303                ]
2304                .into_iter()
2305                .collect(),
2306                vm_storage: HashMap::new(),
2307            },
2308            deltas: Some(deltas[0].clone()),
2309            removed_components: Default::default(),
2310        };
2311
2312        // Block 2: Component3 snapshot task is spawned in the background. Component3 is not
2313        // yet tracked, so it is filtered from component_tvl. Snapshot is empty.
2314        let exp2 = StateSyncMessage {
2315            header: BlockHeader {
2316                number: 2,
2317                hash: Bytes::from("0x02"),
2318                parent_hash: Bytes::from("0x01"),
2319                revert: false,
2320                ..Default::default()
2321            },
2322            snapshots: Snapshot::default(),
2323            deltas: Some(BlockAggregatedChanges {
2324                extractor: "uniswap-v2".to_string(),
2325                chain: Chain::Ethereum,
2326                block: Block {
2327                    number: 2,
2328                    hash: Bytes::from("0x02"),
2329                    parent_hash: Bytes::from("0x01"),
2330                    chain: Chain::Ethereum,
2331                    ts: Default::default(),
2332                },
2333                revert: false,
2334                component_tvl: [
2335                    // Component2 removed (tvl=0), Component3 not yet tracked → filtered out.
2336                    ("Component1".to_string(), 100.0),
2337                ]
2338                .into_iter()
2339                .collect(),
2340                ..Default::default()
2341            }),
2342            // "Component2" was removed, because its tvl changed to 0.
2343            removed_components: [(
2344                "Component2".to_string(),
2345                ProtocolComponent { id: "Component2".to_string(), ..Default::default() },
2346            )]
2347            .into_iter()
2348            .collect(),
2349        };
2350
2351        // Block 3: background task has completed; Component3 snapshot is drained and included.
2352        let exp3 = StateSyncMessage {
2353            header: BlockHeader {
2354                number: 3,
2355                hash: Bytes::from("0x03"),
2356                parent_hash: Bytes::from("0x02"),
2357                revert: false,
2358                ..Default::default()
2359            },
2360            snapshots: Snapshot {
2361                states: [(
2362                    "Component3".to_string(),
2363                    ComponentWithState {
2364                        state: ProtocolComponentState::new(
2365                            "Component3",
2366                            Default::default(),
2367                            Default::default(),
2368                        ),
2369                        component: ProtocolComponent {
2370                            id: "Component3".to_string(),
2371                            ..Default::default()
2372                        },
2373                        component_tvl: Some(1000.0),
2374                        entrypoints: vec![],
2375                    },
2376                )]
2377                .into_iter()
2378                .collect(),
2379                vm_storage: HashMap::new(),
2380            },
2381            deltas: Some(deltas[2].clone()),
2382            removed_components: Default::default(),
2383        };
2384        assert_eq!(first_msg.unwrap(), exp1);
2385        assert_eq!(second_msg.unwrap(), exp2);
2386        assert_eq!(third_msg.unwrap(), exp3);
2387    }
2388
2389    #[test_log::test(tokio::test)]
2390    async fn test_state_sync_with_tvl_range() {
2391        let remove_tvl_threshold = 5.0;
2392        let add_tvl_threshold = 7.0;
2393        let bg_done = Arc::new(tokio::sync::Notify::new());
2394
2395        let mut rpc_client = make_mock_client();
2396        let mut deltas_client = MockDeltasClient::new();
2397
2398        rpc_client
2399            .expect_get_protocol_components()
2400            .withf(|params: &crate::rpc::ProtocolComponentsParams| {
2401                params
2402                    .component_ids()
2403                    .is_some_and(|ids| ids.contains(&"Component3".to_string()))
2404            })
2405            .returning(|_| {
2406                Ok(Page::new(
2407                    vec![ProtocolComponent { id: "Component3".to_string(), ..Default::default() }],
2408                    1,
2409                    0,
2410                    100,
2411                ))
2412            });
2413        // Mock get_snapshots for Component3
2414        let bg_done_clone = bg_done.clone();
2415        rpc_client
2416            .expect_get_snapshots()
2417            .withf(
2418                |request: &SnapshotParameters,
2419                 _chunk_size: &Option<usize>,
2420                 _concurrency: &usize| {
2421                    request
2422                        .components
2423                        .contains_key("Component3")
2424                },
2425            )
2426            .returning(move |_request, _chunk_size, _concurrency| {
2427                let snap = Ok(Snapshot {
2428                    states: [(
2429                        "Component3".to_string(),
2430                        ComponentWithState {
2431                            state: ProtocolComponentState::new(
2432                                "Component3",
2433                                Default::default(),
2434                                Default::default(),
2435                            ),
2436                            component: ProtocolComponent {
2437                                id: "Component3".to_string(),
2438                                ..Default::default()
2439                            },
2440                            component_tvl: Some(10.0),
2441                            entrypoints: vec![],
2442                        },
2443                    )]
2444                    .into_iter()
2445                    .collect(),
2446                    vm_storage: HashMap::new(),
2447                });
2448                bg_done_clone.notify_one();
2449                snap
2450            });
2451
2452        // Mock for the initial snapshot retrieval
2453        rpc_client
2454            .expect_get_protocol_components()
2455            .returning(|_| {
2456                Ok(Page::new(
2457                    vec![
2458                        ProtocolComponent { id: "Component1".to_string(), ..Default::default() },
2459                        ProtocolComponent { id: "Component2".to_string(), ..Default::default() },
2460                    ],
2461                    2,
2462                    0,
2463                    100,
2464                ))
2465            });
2466
2467        // Mock get_snapshots for initial snapshot
2468        rpc_client
2469            .expect_get_snapshots()
2470            .returning(|_request, _chunk_size, _concurrency| {
2471                Ok(Snapshot {
2472                    states: [
2473                        (
2474                            "Component1".to_string(),
2475                            ComponentWithState {
2476                                state: ProtocolComponentState::new(
2477                                    "Component1",
2478                                    Default::default(),
2479                                    Default::default(),
2480                                ),
2481                                component: ProtocolComponent {
2482                                    id: "Component1".to_string(),
2483                                    ..Default::default()
2484                                },
2485                                component_tvl: Some(6.0),
2486                                entrypoints: vec![],
2487                            },
2488                        ),
2489                        (
2490                            "Component2".to_string(),
2491                            ComponentWithState {
2492                                state: ProtocolComponentState::new(
2493                                    "Component2",
2494                                    Default::default(),
2495                                    Default::default(),
2496                                ),
2497                                component: ProtocolComponent {
2498                                    id: "Component2".to_string(),
2499                                    ..Default::default()
2500                                },
2501                                component_tvl: Some(2.0),
2502                                entrypoints: vec![],
2503                            },
2504                        ),
2505                    ]
2506                    .into_iter()
2507                    .collect(),
2508                    vm_storage: HashMap::new(),
2509                })
2510            });
2511
2512        // Mock get_traced_entry_points for Ethereum chain
2513        rpc_client
2514            .expect_get_traced_entry_points()
2515            .returning(|_| Ok(Page::new(HashMap::new(), 0, 0, 0)));
2516
2517        let (tx, rx) = channel(1);
2518        deltas_client
2519            .expect_subscribe()
2520            .return_once(move |_, _| Ok((Uuid::default(), rx)));
2521
2522        // Expect unsubscribe call during cleanup
2523        deltas_client
2524            .expect_unsubscribe()
2525            .return_once(|_| Ok(()));
2526
2527        let mut state_sync = ProtocolStateSynchronizer::new(
2528            ExtractorIdentity::new(Chain::Ethereum, "uniswap-v2"),
2529            true,
2530            ComponentFilter::with_tvl_range(remove_tvl_threshold, add_tvl_threshold),
2531            1,
2532            Duration::from_secs(0),
2533            true,
2534            true,
2535            true,
2536            ArcRPCClient(Arc::new(rpc_client)),
2537            ArcDeltasClient(Arc::new(deltas_client)),
2538            10_u64,
2539        );
2540        state_sync
2541            .initialize()
2542            .await
2543            .expect("Init failed");
2544
2545        // Simulate the incoming BlockAggregatedChanges
2546        let deltas = [
2547            BlockAggregatedChanges {
2548                extractor: "uniswap-v2".to_string(),
2549                chain: Chain::Ethereum,
2550                block: Block {
2551                    number: 1,
2552                    hash: Bytes::from("0x01"),
2553                    parent_hash: Bytes::from("0x00"),
2554                    chain: Chain::Ethereum,
2555                    ts: Default::default(),
2556                },
2557                revert: false,
2558                ..Default::default()
2559            },
2560            BlockAggregatedChanges {
2561                extractor: "uniswap-v2".to_string(),
2562                chain: Chain::Ethereum,
2563                block: Block {
2564                    number: 2,
2565                    hash: Bytes::from("0x02"),
2566                    parent_hash: Bytes::from("0x01"),
2567                    chain: Chain::Ethereum,
2568                    ts: Default::default(),
2569                },
2570                revert: false,
2571                component_tvl: [
2572                    ("Component1".to_string(), 6.0), // Within range, should not trigger changes
2573                    ("Component2".to_string(), 2.0), // Below lower threshold, should be removed
2574                    ("Component3".to_string(), 10.0), // Above upper threshold, should be added
2575                ]
2576                .into_iter()
2577                .collect(),
2578                ..Default::default()
2579            },
2580            // Block 3: empty; background task for Component3 should have completed.
2581            BlockAggregatedChanges {
2582                extractor: "uniswap-v2".to_string(),
2583                chain: Chain::Ethereum,
2584                block: Block {
2585                    number: 3,
2586                    hash: Bytes::from("0x03"),
2587                    parent_hash: Bytes::from("0x02"),
2588                    chain: Chain::Ethereum,
2589                    ts: Default::default(),
2590                },
2591                revert: false,
2592                ..Default::default()
2593            },
2594        ];
2595
2596        let (handle, mut rx) = state_sync.start().await;
2597        let (jh, close_tx) = handle.split();
2598
2599        // Simulate sending delta messages
2600        tx.send(deltas[0].clone())
2601            .await
2602            .expect("deltas channel msg 0 closed!");
2603
2604        // Expecting to receive the initial state message
2605        let _ = timeout(Duration::from_millis(200), rx.recv())
2606            .await
2607            .expect("waiting for first state msg timed out!")
2608            .expect("state sync block sender closed!");
2609
2610        // Send the second message, which should trigger TVL-based changes.
2611        // Component3 snapshot is deferred to background; not in this block's message.
2612        tx.send(deltas[1].clone())
2613            .await
2614            .expect("deltas channel msg 1 closed!");
2615        let second_msg = timeout(Duration::from_millis(200), rx.recv())
2616            .await
2617            .expect("waiting for second state msg timed out!")
2618            .expect("state sync block sender closed!")
2619            .expect("no error");
2620
2621        // Wait for the background snapshot task to complete before sending block 3.
2622        bg_done.notified().await;
2623
2624        tx.send(deltas[2].clone())
2625            .await
2626            .expect("deltas channel msg 2 closed!");
2627        let third_msg = timeout(Duration::from_millis(200), rx.recv())
2628            .await
2629            .expect("waiting for third state msg timed out!")
2630            .expect("state sync block sender closed!")
2631            .expect("no error");
2632
2633        let _ = close_tx.send(());
2634        jh.await
2635            .expect("state sync task panicked!");
2636
2637        // Block 2: Component3 task spawned; snapshot is empty, Component3 filtered from deltas.
2638        let expected_second_msg = StateSyncMessage {
2639            header: BlockHeader {
2640                number: 2,
2641                hash: Bytes::from("0x02"),
2642                parent_hash: Bytes::from("0x01"),
2643                revert: false,
2644                ..Default::default()
2645            },
2646            snapshots: Snapshot::default(),
2647            deltas: Some(BlockAggregatedChanges {
2648                extractor: "uniswap-v2".to_string(),
2649                chain: Chain::Ethereum,
2650                block: Block {
2651                    number: 2,
2652                    hash: Bytes::from("0x02"),
2653                    parent_hash: Bytes::from("0x01"),
2654                    chain: Chain::Ethereum,
2655                    ts: Default::default(),
2656                },
2657                revert: false,
2658                component_tvl: [
2659                    ("Component1".to_string(), 6.0), /* Within range, should not trigger changes
2660                                                      * Component3 not yet tracked → filtered
2661                                                      * out */
2662                ]
2663                .into_iter()
2664                .collect(),
2665                ..Default::default()
2666            }),
2667            removed_components: [(
2668                "Component2".to_string(),
2669                ProtocolComponent { id: "Component2".to_string(), ..Default::default() },
2670            )]
2671            .into_iter()
2672            .collect(),
2673        };
2674
2675        // Block 3: background task drained; Component3 snapshot present.
2676        let expected_third_msg = StateSyncMessage {
2677            header: BlockHeader {
2678                number: 3,
2679                hash: Bytes::from("0x03"),
2680                parent_hash: Bytes::from("0x02"),
2681                revert: false,
2682                ..Default::default()
2683            },
2684            snapshots: Snapshot {
2685                states: [(
2686                    "Component3".to_string(),
2687                    ComponentWithState {
2688                        state: ProtocolComponentState::new(
2689                            "Component3",
2690                            Default::default(),
2691                            Default::default(),
2692                        ),
2693                        component: ProtocolComponent {
2694                            id: "Component3".to_string(),
2695                            ..Default::default()
2696                        },
2697                        component_tvl: Some(10.0),
2698                        entrypoints: vec![],
2699                    },
2700                )]
2701                .into_iter()
2702                .collect(),
2703                vm_storage: HashMap::new(),
2704            },
2705            deltas: Some(deltas[2].clone()),
2706            removed_components: Default::default(),
2707        };
2708
2709        assert_eq!(second_msg, expected_second_msg);
2710        assert_eq!(third_msg, expected_third_msg);
2711    }
2712
2713    #[test_log::test(tokio::test)]
2714    async fn test_public_close_api_functionality() {
2715        // Tests the public close() API through the StateSynchronizer trait:
2716        // - close() fails before start() is called
2717        // - close() succeeds while synchronizer is running
2718        // - close() fails after already closed
2719        // This tests the full start/close lifecycle via the public API
2720
2721        let mut rpc_client = make_mock_client();
2722        let mut deltas_client = MockDeltasClient::new();
2723
2724        // Mock the initial components call
2725        rpc_client
2726            .expect_get_protocol_components()
2727            .returning(|_| Ok(Page::new(vec![], 0, 0, 0)));
2728
2729        // Set up deltas client that will wait for messages (blocking in state_sync)
2730        let (_tx, rx) = channel(1);
2731        deltas_client
2732            .expect_subscribe()
2733            .return_once(move |_, _| Ok((Uuid::default(), rx)));
2734
2735        // Expect unsubscribe call during cleanup
2736        deltas_client
2737            .expect_unsubscribe()
2738            .return_once(|_| Ok(()));
2739
2740        let mut state_sync = ProtocolStateSynchronizer::new(
2741            ExtractorIdentity::new(Chain::Ethereum, "test-protocol"),
2742            true,
2743            ComponentFilter::with_tvl_range(0.0, 0.0),
2744            5, // Enough retries
2745            Duration::from_secs(0),
2746            true,
2747            false,
2748            true,
2749            ArcRPCClient(Arc::new(rpc_client)),
2750            ArcDeltasClient(Arc::new(deltas_client)),
2751            10000_u64, // Long timeout so task doesn't exit on its own
2752        );
2753
2754        state_sync
2755            .initialize()
2756            .await
2757            .expect("Init should succeed");
2758
2759        // Start the synchronizer and test the new split-based close mechanism
2760        let (handle, _rx) = state_sync.start().await;
2761        let (jh, close_tx) = handle.split();
2762
2763        // Give it time to start up and enter state_sync
2764        tokio::time::sleep(Duration::from_millis(100)).await;
2765
2766        // Send close signal should succeed
2767        close_tx
2768            .send(())
2769            .expect("Should be able to send close signal");
2770        // Task should stop cleanly
2771        jh.await.expect("Task should not panic");
2772    }
2773
2774    #[test_log::test(tokio::test)]
2775    async fn test_cleanup_runs_when_state_sync_processing_errors() {
2776        // Tests that cleanup code runs when state_sync() errors during delta processing.
2777        // Specifically tests: RPC errors during snapshot retrieval cause proper cleanup.
2778        // Verifies: shared.last_synced_block reset + subscription unsubscribe on errors
2779
2780        let mut rpc_client = make_mock_client();
2781        let mut deltas_client = MockDeltasClient::new();
2782
2783        // Mock the initial components call
2784        rpc_client
2785            .expect_get_protocol_components()
2786            .returning(|_| Ok(Page::new(vec![], 0, 0, 0)));
2787
2788        // Mock to fail during snapshot retrieval (this will cause an error during processing)
2789        rpc_client
2790            .expect_get_protocol_states()
2791            .returning(|_| {
2792                Err(RPCError::ParseResponse("Test error during snapshot retrieval".to_string()))
2793            });
2794
2795        // Set up deltas client to send one message that will trigger snapshot retrieval
2796        let (tx, rx) = channel(10);
2797        deltas_client
2798            .expect_subscribe()
2799            .return_once(move |_, _| {
2800                // Send a delta message that will require a snapshot
2801                let delta = BlockAggregatedChanges {
2802                    extractor: "test".to_string(),
2803                    chain: Chain::Ethereum,
2804                    block: Block {
2805                        hash: Bytes::from("0x0123"),
2806                        number: 1,
2807                        parent_hash: Bytes::from("0x0000"),
2808                        chain: Chain::Ethereum,
2809                        ts: chrono::DateTime::from_timestamp(1234567890, 0)
2810                            .unwrap()
2811                            .naive_utc(),
2812                    },
2813                    revert: false,
2814                    // Add a new component to trigger snapshot request
2815                    new_protocol_components: [(
2816                        "new_component".to_string(),
2817                        ProtocolComponent { id: "new_component".to_string(), ..Default::default() },
2818                    )]
2819                    .into_iter()
2820                    .collect(),
2821                    component_tvl: [("new_component".to_string(), 100.0)]
2822                        .into_iter()
2823                        .collect(),
2824                    ..Default::default()
2825                };
2826
2827                tokio::spawn(async move {
2828                    let _ = tx.send(delta).await;
2829                    // Close the channel after sending one message
2830                });
2831
2832                Ok((Uuid::default(), rx))
2833            });
2834
2835        // Expect unsubscribe call during cleanup
2836        deltas_client
2837            .expect_unsubscribe()
2838            .return_once(|_| Ok(()));
2839
2840        let mut state_sync = ProtocolStateSynchronizer::new(
2841            ExtractorIdentity::new(Chain::Ethereum, "test-protocol"),
2842            true,
2843            ComponentFilter::with_tvl_range(0.0, 1000.0), // Include the component
2844            1,
2845            Duration::from_secs(0),
2846            true,
2847            false,
2848            true,
2849            ArcRPCClient(Arc::new(rpc_client)),
2850            ArcDeltasClient(Arc::new(deltas_client)),
2851            5000_u64,
2852        );
2853
2854        state_sync
2855            .initialize()
2856            .await
2857            .expect("Init should succeed");
2858
2859        // Before calling state_sync, set a value in last_synced_block
2860        state_sync.last_synced_block = Some(BlockHeader {
2861            hash: Bytes::from("0x0badc0ffee"),
2862            number: 42,
2863            parent_hash: Bytes::from("0xbadbeef0"),
2864            revert: false,
2865            timestamp: 123456789,
2866            partial_block_index: None,
2867        });
2868
2869        // Create a channel for state_sync to send messages to
2870        let (mut block_tx, _block_rx) = channel(10);
2871
2872        // Call state_sync directly - this should error during processing
2873        let (_end_tx, end_rx) = oneshot::channel::<()>();
2874        let result = state_sync
2875            .state_sync(&mut block_tx, end_rx)
2876            .await;
2877        // Verify that state_sync returned an error
2878        assert!(result.is_err(), "state_sync should have errored during processing");
2879
2880        // Note: We can't verify internal state cleanup since state_sync consumes self,
2881        // but the cleanup logic is still tested by the fact that the method returns properly.
2882    }
2883
2884    #[test_log::test(tokio::test)]
2885    async fn test_close_signal_while_waiting_for_first_deltas() {
2886        // Tests close signal handling during the initial "waiting for deltas" phase.
2887        // This is the earliest possible close scenario - before any deltas are received.
2888        // Verifies: close signal received while waiting for first message triggers cleanup
2889        let mut rpc_client = make_mock_client();
2890        let mut deltas_client = MockDeltasClient::new();
2891
2892        rpc_client
2893            .expect_get_protocol_components()
2894            .returning(|_| Ok(Page::new(vec![], 0, 0, 0)));
2895
2896        let (_tx, rx) = channel(1);
2897        deltas_client
2898            .expect_subscribe()
2899            .return_once(move |_, _| Ok((Uuid::default(), rx)));
2900
2901        deltas_client
2902            .expect_unsubscribe()
2903            .return_once(|_| Ok(()));
2904
2905        let mut state_sync = ProtocolStateSynchronizer::new(
2906            ExtractorIdentity::new(Chain::Ethereum, "test-protocol"),
2907            true,
2908            ComponentFilter::with_tvl_range(0.0, 0.0),
2909            1,
2910            Duration::from_secs(0),
2911            true,
2912            true,
2913            false,
2914            ArcRPCClient(Arc::new(rpc_client)),
2915            ArcDeltasClient(Arc::new(deltas_client)),
2916            10000_u64,
2917        );
2918
2919        state_sync
2920            .initialize()
2921            .await
2922            .expect("Init should succeed");
2923
2924        let (mut block_tx, _block_rx) = channel(10);
2925        let (end_tx, end_rx) = oneshot::channel::<()>();
2926
2927        // Start state_sync in a task
2928        let state_sync_handle = tokio::spawn(async move {
2929            state_sync
2930                .state_sync(&mut block_tx, end_rx)
2931                .await
2932        });
2933
2934        // Give it a moment to start
2935        tokio::time::sleep(Duration::from_millis(100)).await;
2936
2937        // Send close signal
2938        let _ = end_tx.send(());
2939
2940        // state_sync should exit cleanly
2941        let result = state_sync_handle
2942            .await
2943            .expect("Task should not panic");
2944        assert!(result.is_ok(), "state_sync should exit cleanly when closed: {result:?}");
2945
2946        println!("SUCCESS: Close signal handled correctly while waiting for first deltas");
2947    }
2948
2949    #[test_log::test(tokio::test)]
2950    async fn test_close_signal_during_main_processing_loop() {
2951        // Tests close signal handling during the main delta processing loop.
2952        // This tests the scenario where first message is processed successfully,
2953        // then close signal is received while waiting for subsequent deltas.
2954        // Verifies: close signal in main loop (after initialization) triggers cleanup
2955
2956        let mut rpc_client = make_mock_client();
2957        let mut deltas_client = MockDeltasClient::new();
2958
2959        // Mock the initial components call
2960        rpc_client
2961            .expect_get_protocol_components()
2962            .returning(|_| Ok(Page::new(vec![], 0, 0, 0)));
2963
2964        // Mock the snapshot retrieval that happens after first message
2965        rpc_client
2966            .expect_get_protocol_states()
2967            .returning(|_| Ok(Page::new(vec![], 0, 0, 0)));
2968
2969        rpc_client
2970            .expect_get_component_tvl()
2971            .returning(|_| Ok(Page::new(HashMap::new(), 0, 0, 0)));
2972
2973        rpc_client
2974            .expect_get_traced_entry_points()
2975            .returning(|_| Ok(Page::new(HashMap::new(), 0, 0, 0)));
2976
2977        // Set up deltas client to send one message, then keep channel open
2978        let (tx, rx) = channel(10);
2979        deltas_client
2980            .expect_subscribe()
2981            .return_once(move |_, _| {
2982                // Send first message immediately
2983                let first_delta = BlockAggregatedChanges {
2984                    extractor: "test".to_string(),
2985                    chain: Chain::Ethereum,
2986                    block: Block {
2987                        hash: Bytes::from("0x0123"),
2988                        number: 1,
2989                        parent_hash: Bytes::from("0x0000"),
2990                        chain: Chain::Ethereum,
2991                        ts: chrono::DateTime::from_timestamp(1234567890, 0)
2992                            .unwrap()
2993                            .naive_utc(),
2994                    },
2995                    revert: false,
2996                    ..Default::default()
2997                };
2998
2999                tokio::spawn(async move {
3000                    let _ = tx.send(first_delta).await;
3001                    // Keep the sender alive but don't send more messages
3002                    // This will make the recv() block waiting for the next message
3003                    tokio::time::sleep(Duration::from_secs(30)).await;
3004                });
3005
3006                Ok((Uuid::default(), rx))
3007            });
3008
3009        deltas_client
3010            .expect_unsubscribe()
3011            .return_once(|_| Ok(()));
3012
3013        let mut state_sync = ProtocolStateSynchronizer::new(
3014            ExtractorIdentity::new(Chain::Ethereum, "test-protocol"),
3015            true,
3016            ComponentFilter::with_tvl_range(0.0, 1000.0),
3017            1,
3018            Duration::from_secs(0),
3019            true,
3020            false,
3021            true,
3022            ArcRPCClient(Arc::new(rpc_client)),
3023            ArcDeltasClient(Arc::new(deltas_client)),
3024            10000_u64,
3025        );
3026
3027        state_sync
3028            .initialize()
3029            .await
3030            .expect("Init should succeed");
3031
3032        let (mut block_tx, mut block_rx) = channel(10);
3033        let (end_tx, end_rx) = oneshot::channel::<()>();
3034
3035        // Start state_sync in a task
3036        let state_sync_handle = tokio::spawn(async move {
3037            state_sync
3038                .state_sync(&mut block_tx, end_rx)
3039                .await
3040        });
3041
3042        // Wait for the first message to be processed (snapshot sent)
3043        let first_snapshot = block_rx
3044            .recv()
3045            .await
3046            .expect("Should receive first snapshot")
3047            .expect("Synchronizer error");
3048        assert!(
3049            !first_snapshot
3050                .snapshots
3051                .states
3052                .is_empty() ||
3053                first_snapshot.deltas.is_some()
3054        );
3055        // Now send close signal - this should be handled in the main processing loop
3056        let _ = end_tx.send(());
3057
3058        // state_sync should exit cleanly after receiving close signal in main loop
3059        let result = state_sync_handle
3060            .await
3061            .expect("Task should not panic");
3062        assert!(
3063            result.is_ok(),
3064            "state_sync should exit cleanly when closed after first message: {result:?}"
3065        );
3066    }
3067
3068    #[test_log::test(tokio::test)]
3069    async fn test_max_retries_exceeded_error_propagation() {
3070        // Test that when max_retries is exceeded, the final error is sent through the channel
3071        // to the receiver and the synchronizer task exits cleanly
3072
3073        let mut rpc_client = make_mock_client();
3074        let mut deltas_client = MockDeltasClient::new();
3075
3076        // Mock the initial components call to succeed
3077        rpc_client
3078            .expect_get_protocol_components()
3079            .returning(|_| Ok(Page::new(vec![], 0, 0, 0)));
3080
3081        // Set up deltas client to consistently fail after subscription
3082        // This will cause connection errors and trigger retries
3083        deltas_client
3084            .expect_subscribe()
3085            .returning(|_, _| {
3086                // Return a connection error to trigger retries
3087                Err(DeltasError::NotConnected)
3088            });
3089
3090        // Expect multiple unsubscribe calls during retries
3091        deltas_client
3092            .expect_unsubscribe()
3093            .returning(|_| Ok(()))
3094            .times(0..=5);
3095
3096        // Create synchronizer with only 2 retries and short cooldown
3097        let mut state_sync = ProtocolStateSynchronizer::new(
3098            ExtractorIdentity::new(Chain::Ethereum, "test-protocol"),
3099            true,
3100            ComponentFilter::with_tvl_range(0.0, 1000.0),
3101            2,                         // max_retries = 2
3102            Duration::from_millis(10), // short retry cooldown
3103            true,
3104            false,
3105            true,
3106            ArcRPCClient(Arc::new(rpc_client)),
3107            ArcDeltasClient(Arc::new(deltas_client)),
3108            1000_u64,
3109        );
3110
3111        state_sync
3112            .initialize()
3113            .await
3114            .expect("Init should succeed");
3115
3116        // Start the synchronizer - it should fail to subscribe and retry
3117        let (handle, mut rx) = state_sync.start().await;
3118        let (jh, _close_tx) = handle.split();
3119
3120        let res = tokio::time::timeout(Duration::from_millis(100), rx.recv())
3121            .await
3122            .expect("responsds in time")
3123            .expect("channel open");
3124
3125        // Verify the error is a ConnectionClosed error (converted from DeltasError::NotConnected)
3126        if let Err(err) = res {
3127            assert!(
3128                matches!(err, SynchronizerError::ConnectionClosed),
3129                "Expected ConnectionClosed error, got: {:?}",
3130                err
3131            );
3132        } else {
3133            panic!("Expected an error")
3134        }
3135
3136        // The task should complete (not hang) after max retries
3137        let task_result = tokio::time::timeout(Duration::from_secs(2), jh).await;
3138        assert!(task_result.is_ok(), "Synchronizer task should complete after max retries");
3139    }
3140
3141    #[test_log::test(tokio::test)]
3142    async fn test_is_next_expected() {
3143        // Test the is_next_expected function to ensure it correctly identifies
3144        // when an incoming block is the expected next block in the chain
3145
3146        let mut state_sync = with_mocked_clients(true, false, None, None);
3147
3148        // Test 1: No previous block - should return false
3149        let incoming_header = BlockHeader {
3150            number: 100,
3151            hash: Bytes::from("0x0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"),
3152            parent_hash: Bytes::from(
3153                "0x0000000000000000000000000000000000000000000000000000000000000000",
3154            ),
3155            revert: false,
3156            timestamp: 123456789,
3157            partial_block_index: None,
3158        };
3159        assert!(
3160            !state_sync.is_next_expected(&incoming_header),
3161            "Should return false when no previous block is set"
3162        );
3163
3164        // Test 2: Set a previous block and test with matching parent hash
3165        let previous_header = BlockHeader {
3166            number: 99,
3167            hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000000"),
3168            parent_hash: Bytes::from(
3169                "0xabcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789",
3170            ),
3171            revert: false,
3172            timestamp: 123456788,
3173            partial_block_index: None,
3174        };
3175        state_sync.last_synced_block = Some(previous_header.clone());
3176
3177        assert!(
3178            state_sync.is_next_expected(&incoming_header),
3179            "Should return true when incoming parent_hash matches previous hash"
3180        );
3181
3182        // Test 3: Test with non-matching parent hash
3183        let non_matching_header = BlockHeader {
3184            number: 100,
3185            hash: Bytes::from("0x0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"),
3186            parent_hash: Bytes::from(
3187                "0x1111111111111111111111111111111111111111111111111111111111111111",
3188            ), // Wrong parent hash
3189            revert: false,
3190            timestamp: 123456789,
3191            partial_block_index: None,
3192        };
3193        assert!(
3194            !state_sync.is_next_expected(&non_matching_header),
3195            "Should return false when incoming parent_hash doesn't match previous hash"
3196        );
3197    }
3198
3199    #[test_log::test(tokio::test)]
3200    async fn test_synchronizer_restart_skip_snapshot_on_expected_block() {
3201        // Test that on synchronizer restart with the next expected block,
3202        // get_snapshot is not called and only deltas are sent
3203
3204        let mut rpc_client = make_mock_client();
3205        let mut deltas_client = MockDeltasClient::new();
3206
3207        // Mock the initial components call
3208        rpc_client
3209            .expect_get_protocol_components()
3210            .returning(|_| {
3211                Ok(Page::new(
3212                    vec![ProtocolComponent { id: "Component1".to_string(), ..Default::default() }],
3213                    1,
3214                    0,
3215                    100,
3216                ))
3217            });
3218
3219        // Set up deltas client to send a message that is the next expected block
3220        let (tx, rx) = channel(10);
3221        deltas_client
3222            .expect_subscribe()
3223            .return_once(move |_, _| {
3224                let expected_next_delta = BlockAggregatedChanges {
3225                    extractor: "uniswap-v2".to_string(),
3226                    chain: Chain::Ethereum,
3227                    block: Block {
3228                        hash: Bytes::from(
3229                            "0x0000000000000000000000000000000000000000000000000000000000000002",
3230                        ), // This will be the next expected block
3231                        number: 2,
3232                        parent_hash: Bytes::from(
3233                            "0x0000000000000000000000000000000000000000000000000000000000000001",
3234                        ), // This matches our last synced block hash
3235                        chain: Chain::Ethereum,
3236                        ts: chrono::DateTime::from_timestamp(1234567890, 0)
3237                            .unwrap()
3238                            .naive_utc(),
3239                    },
3240                    revert: false,
3241                    ..Default::default()
3242                };
3243
3244                tokio::spawn(async move {
3245                    let _ = tx.send(expected_next_delta).await;
3246                });
3247
3248                Ok((Uuid::default(), rx))
3249            });
3250
3251        deltas_client
3252            .expect_unsubscribe()
3253            .return_once(|_| Ok(()));
3254
3255        let mut state_sync = ProtocolStateSynchronizer::new(
3256            ExtractorIdentity::new(Chain::Ethereum, "uniswap-v2"),
3257            true,
3258            ComponentFilter::with_tvl_range(0.0, 1000.0),
3259            1,
3260            Duration::from_secs(0),
3261            true, // include_snapshots = true
3262            false,
3263            true,
3264            ArcRPCClient(Arc::new(rpc_client)),
3265            ArcDeltasClient(Arc::new(deltas_client)),
3266            10000_u64,
3267        );
3268
3269        // Initialize and set up the last synced block to simulate a restart scenario
3270        state_sync
3271            .initialize()
3272            .await
3273            .expect("Init should succeed");
3274
3275        // Set last_synced_block to simulate that we've previously synced block 1
3276        state_sync.last_synced_block = Some(BlockHeader {
3277            number: 1,
3278            hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000001"), /* This matches the parent_hash in our delta */
3279            parent_hash: Bytes::from(
3280                "0x0000000000000000000000000000000000000000000000000000000000000000",
3281            ),
3282            revert: false,
3283            timestamp: 123456789,
3284            partial_block_index: None,
3285        });
3286
3287        let (mut block_tx, mut block_rx) = channel(10);
3288        let (end_tx, end_rx) = oneshot::channel::<()>();
3289
3290        // Start state_sync
3291        let state_sync_handle = tokio::spawn(async move {
3292            state_sync
3293                .state_sync(&mut block_tx, end_rx)
3294                .await
3295        });
3296
3297        // Wait for the message - it should be a delta-only message (no snapshots)
3298        let result_msg = timeout(Duration::from_millis(200), block_rx.recv())
3299            .await
3300            .expect("Should receive message within timeout")
3301            .expect("Channel should be open")
3302            .expect("Should not be an error");
3303
3304        // Send close signal
3305        let _ = end_tx.send(());
3306
3307        // Wait for state_sync to finish
3308        let _ = state_sync_handle
3309            .await
3310            .expect("Task should not panic");
3311
3312        // Verify the message contains deltas but no snapshots
3313        // (because we skipped snapshot retrieval)
3314        assert!(result_msg.deltas.is_some(), "Should contain deltas");
3315        assert!(
3316            result_msg.snapshots.states.is_empty(),
3317            "Should not contain snapshots when next expected block is received"
3318        );
3319
3320        // Verify the block details match our expected next block
3321        if let Some(deltas) = &result_msg.deltas {
3322            assert_eq!(deltas.block.number, 2);
3323            assert_eq!(
3324                deltas.block.hash,
3325                Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000002")
3326            );
3327            assert_eq!(
3328                deltas.block.parent_hash,
3329                Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000001")
3330            );
3331        }
3332    }
3333
3334    #[test_log::test(tokio::test)]
3335    async fn test_skip_previously_processed_messages() {
3336        // Test that the synchronizer skips messages for blocks that have already been processed
3337        // This simulates a service restart scenario where old messages are re-emitted
3338
3339        let mut rpc_client = make_mock_client();
3340        let mut deltas_client = MockDeltasClient::new();
3341
3342        // Mock the initial components call
3343        rpc_client
3344            .expect_get_protocol_components()
3345            .returning(|_| {
3346                Ok(Page::new(
3347                    vec![ProtocolComponent { id: "Component1".to_string(), ..Default::default() }],
3348                    1,
3349                    0,
3350                    100,
3351                ))
3352            });
3353
3354        // Mock snapshot calls for when we process the expected next block (block 6)
3355        rpc_client
3356            .expect_get_protocol_states()
3357            .returning(|_| {
3358                Ok(Page::new(
3359                    vec![ProtocolComponentState::new(
3360                        "Component1",
3361                        Default::default(),
3362                        Default::default(),
3363                    )],
3364                    1,
3365                    0,
3366                    100,
3367                ))
3368            });
3369
3370        rpc_client
3371            .expect_get_component_tvl()
3372            .returning(|_| {
3373                Ok(Page::new(
3374                    [("Component1".to_string(), 100.0)]
3375                        .into_iter()
3376                        .collect(),
3377                    1,
3378                    0,
3379                    100,
3380                ))
3381            });
3382
3383        rpc_client
3384            .expect_get_traced_entry_points()
3385            .returning(|_| Ok(Page::new(HashMap::new(), 0, 0, 0)));
3386
3387        // Set up deltas client to send old messages first, then the expected next block
3388        let (tx, rx) = channel(10);
3389        deltas_client
3390            .expect_subscribe()
3391            .return_once(move |_, _| {
3392                // Send messages for blocks 3, 4, 5 (already processed), then block 6 (expected)
3393                let old_messages = vec![
3394                    BlockAggregatedChanges {
3395                        extractor: "uniswap-v2".to_string(),
3396                        chain: Chain::Ethereum,
3397                        block: Block {
3398                            hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000003"),
3399                            number: 3,
3400                            parent_hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000002"),
3401                            chain: Chain::Ethereum,
3402                            ts: chrono::DateTime::from_timestamp(1234567890, 0).unwrap().naive_utc(),
3403                        },
3404                        revert: false,
3405                        ..Default::default()
3406                    },
3407                    BlockAggregatedChanges {
3408                        extractor: "uniswap-v2".to_string(),
3409                        chain: Chain::Ethereum,
3410                        block: Block {
3411                            hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000004"),
3412                            number: 4,
3413                            parent_hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000003"),
3414                            chain: Chain::Ethereum,
3415                            ts: chrono::DateTime::from_timestamp(1234567891, 0).unwrap().naive_utc(),
3416                        },
3417                        revert: false,
3418                        ..Default::default()
3419                    },
3420                    BlockAggregatedChanges {
3421                        extractor: "uniswap-v2".to_string(),
3422                        chain: Chain::Ethereum,
3423                        block: Block {
3424                            hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000005"),
3425                            number: 5,
3426                            parent_hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000004"),
3427                            chain: Chain::Ethereum,
3428                            ts: chrono::DateTime::from_timestamp(1234567892, 0).unwrap().naive_utc(),
3429                        },
3430                        revert: false,
3431                        ..Default::default()
3432                    },
3433                    // This is the expected next block (block 6)
3434                    BlockAggregatedChanges {
3435                        extractor: "uniswap-v2".to_string(),
3436                        chain: Chain::Ethereum,
3437                        block: Block {
3438                            hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000006"),
3439                            number: 6,
3440                            parent_hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000005"),
3441                            chain: Chain::Ethereum,
3442                            ts: chrono::DateTime::from_timestamp(1234567893, 0).unwrap().naive_utc(),
3443                        },
3444                        revert: false,
3445                        ..Default::default()
3446                    },
3447                ];
3448
3449                tokio::spawn(async move {
3450                    for message in old_messages {
3451                        let _ = tx.send(message).await;
3452                        tokio::time::sleep(Duration::from_millis(10)).await;
3453                    }
3454                });
3455
3456                Ok((Uuid::default(), rx))
3457            });
3458
3459        deltas_client
3460            .expect_unsubscribe()
3461            .return_once(|_| Ok(()));
3462
3463        let mut state_sync = ProtocolStateSynchronizer::new(
3464            ExtractorIdentity::new(Chain::Ethereum, "uniswap-v2"),
3465            true,
3466            ComponentFilter::with_tvl_range(0.0, 1000.0),
3467            1,
3468            Duration::from_secs(0),
3469            true,
3470            true,
3471            true,
3472            ArcRPCClient(Arc::new(rpc_client)),
3473            ArcDeltasClient(Arc::new(deltas_client)),
3474            10000_u64,
3475        );
3476
3477        // Initialize and set last_synced_block to simulate we've already processed block 5
3478        state_sync
3479            .initialize()
3480            .await
3481            .expect("Init should succeed");
3482
3483        state_sync.last_synced_block = Some(BlockHeader {
3484            number: 5,
3485            hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000005"),
3486            parent_hash: Bytes::from(
3487                "0x0000000000000000000000000000000000000000000000000000000000000004",
3488            ),
3489            revert: false,
3490            timestamp: 1234567892,
3491            partial_block_index: None,
3492        });
3493
3494        let (mut block_tx, mut block_rx) = channel(10);
3495        let (end_tx, end_rx) = oneshot::channel::<()>();
3496
3497        // Start state_sync
3498        let state_sync_handle = tokio::spawn(async move {
3499            state_sync
3500                .state_sync(&mut block_tx, end_rx)
3501                .await
3502        });
3503
3504        // Wait for the message - it should only be for block 6 (skipping blocks 3, 4, 5)
3505        let result_msg = timeout(Duration::from_millis(500), block_rx.recv())
3506            .await
3507            .expect("Should receive message within timeout")
3508            .expect("Channel should be open")
3509            .expect("Should not be an error");
3510
3511        // Send close signal
3512        let _ = end_tx.send(());
3513
3514        // Wait for state_sync to finish
3515        let _ = state_sync_handle
3516            .await
3517            .expect("Task should not panic");
3518
3519        // Verify we only got the message for block 6 (the expected next block)
3520        assert!(result_msg.deltas.is_some(), "Should contain deltas");
3521        if let Some(deltas) = &result_msg.deltas {
3522            assert_eq!(
3523                deltas.block.number, 6,
3524                "Should only process block 6, skipping earlier blocks"
3525            );
3526            assert_eq!(
3527                deltas.block.hash,
3528                Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000006")
3529            );
3530        }
3531
3532        // Verify that no additional messages are received immediately
3533        // (since the old blocks 3, 4, 5 were skipped and only block 6 was processed)
3534        match timeout(Duration::from_millis(50), block_rx.recv()).await {
3535            Err(_) => {
3536                // Timeout is expected - no more messages should come
3537            }
3538            Ok(Some(Err(_))) => {
3539                // Error received is also acceptable (connection closed)
3540            }
3541            Ok(Some(Ok(_))) => {
3542                panic!("Should not receive additional messages - old blocks should be skipped");
3543            }
3544            Ok(None) => {
3545                // Channel closed is also acceptable
3546            }
3547        }
3548    }
3549
3550    fn make_block_changes(block_num: u64, partial_idx: Option<u32>) -> BlockAggregatedChanges {
3551        // Use vec to create Bytes from block number
3552        let hash = Bytes::from(vec![block_num as u8; 32]);
3553        let parent_hash = Bytes::from(vec![block_num.saturating_sub(1) as u8; 32]);
3554        BlockAggregatedChanges {
3555            extractor: "uniswap-v2".to_string(),
3556            chain: Chain::Ethereum,
3557            block: Block {
3558                number: block_num,
3559                hash,
3560                parent_hash,
3561                chain: Chain::Ethereum,
3562                ts: Default::default(),
3563            },
3564            revert: false,
3565            partial_block_index: partial_idx,
3566            ..Default::default()
3567        }
3568    }
3569
3570    /// Test that full block as first message in partial mode is accepted
3571    #[test_log::test(tokio::test)]
3572    async fn test_partial_mode_accepts_full_block_as_first_message() {
3573        let (rpc_client, deltas_client, tx) = mock_clients_for_state_sync(None);
3574        let mut state_sync = with_mocked_clients(true, true, Some(rpc_client), Some(deltas_client))
3575            .with_partial_blocks(true);
3576        state_sync
3577            .initialize()
3578            .await
3579            .expect("Init failed");
3580
3581        let (handle, mut block_rx) = state_sync.start().await;
3582        let (jh, close_tx) = handle.split();
3583
3584        // Send full block as first message - should be accepted
3585        tx.send(make_block_changes(1, None))
3586            .await
3587            .unwrap();
3588
3589        // Should receive the full block immediately
3590        let msg = timeout(Duration::from_millis(100), block_rx.recv())
3591            .await
3592            .expect("Should receive message")
3593            .expect("Channel open")
3594            .expect("No error");
3595
3596        assert_eq!(msg.header.number, 1, "Should use block 1 (full block)");
3597        assert_eq!(msg.header.partial_block_index, None, "Should be a full block");
3598
3599        let _ = close_tx.send(());
3600        jh.await.expect("Task should not panic");
3601    }
3602
3603    /// Test that block number increase is detected as new block
3604    #[test_log::test(tokio::test)]
3605    async fn test_partial_mode_detects_block_number_increase() {
3606        let (rpc_client, deltas_client, tx) = mock_clients_for_state_sync(None);
3607        let mut state_sync = with_mocked_clients(true, true, Some(rpc_client), Some(deltas_client))
3608            .with_partial_blocks(true);
3609        state_sync
3610            .initialize()
3611            .await
3612            .expect("Init failed");
3613
3614        let (handle, mut block_rx) = state_sync.start().await;
3615        let (jh, close_tx) = handle.split();
3616
3617        // Send partial messages for block 1 (will be skipped - waiting for new block)
3618        tx.send(make_block_changes(1, Some(0)))
3619            .await
3620            .unwrap();
3621        tx.send(make_block_changes(1, Some(3)))
3622            .await
3623            .unwrap();
3624
3625        // Verify no message received yet
3626        match timeout(Duration::from_millis(50), block_rx.recv()).await {
3627            Err(_) => { /* Expected: timeout, no message yet */ }
3628            Ok(_) => panic!("Should not receive message while waiting for new block"),
3629        }
3630
3631        // Send partial for block 2 with HIGHER index (5 > 3) - should still be detected
3632        // because block number increased
3633        tx.send(make_block_changes(2, Some(5)))
3634            .await
3635            .unwrap();
3636
3637        // Should receive the message for block 2
3638        let msg = timeout(Duration::from_millis(100), block_rx.recv())
3639            .await
3640            .expect("Should receive message")
3641            .expect("Channel open")
3642            .expect("No error");
3643
3644        assert_eq!(msg.header.number, 2, "Should use block 2 (block number increased)");
3645        assert_eq!(msg.header.partial_block_index, Some(5));
3646
3647        let _ = close_tx.send(());
3648        jh.await.expect("Task should not panic");
3649    }
3650
3651    /// Test that partial mode skips new blocks that are already synced
3652    #[test_log::test(tokio::test)]
3653    async fn test_partial_mode_skips_already_synced_blocks() {
3654        let (rpc_client, deltas_client, tx) = mock_clients_for_state_sync(None);
3655        let mut state_sync = with_mocked_clients(true, true, Some(rpc_client), Some(deltas_client))
3656            .with_partial_blocks(true);
3657        state_sync
3658            .initialize()
3659            .await
3660            .expect("Init failed");
3661
3662        // Set last_synced_block to block 5 - we've already synced up to here
3663        state_sync.last_synced_block = Some(BlockHeader {
3664            number: 5,
3665            hash: Bytes::from("0x05"),
3666            parent_hash: Bytes::from("0x04"),
3667            revert: false,
3668            timestamp: 0,
3669            partial_block_index: None,
3670        });
3671
3672        let (handle, mut block_rx) = state_sync.start().await;
3673        let (jh, close_tx) = handle.split();
3674
3675        // Send partial for block 3 to establish baseline
3676        tx.send(make_block_changes(3, Some(2)))
3677            .await
3678            .unwrap();
3679
3680        // Send "new block" for block 4 (partial index decreased) - but block 4 < last_synced (5)
3681        tx.send(make_block_changes(4, Some(0)))
3682            .await
3683            .unwrap();
3684
3685        // Should be skipped because block 4 is already synced
3686        match timeout(Duration::from_millis(50), block_rx.recv()).await {
3687            Err(_) => { /* Expected: skipped because already synced */ }
3688            Ok(_) => panic!("Should skip block 4 because it's already synced"),
3689        }
3690
3691        // Now send new block for block 6 (after last_synced)
3692        // First establish new partial index
3693        tx.send(make_block_changes(5, Some(3)))
3694            .await
3695            .unwrap();
3696        // Then trigger new block detection
3697        tx.send(make_block_changes(6, Some(0)))
3698            .await
3699            .unwrap();
3700
3701        let msg = timeout(Duration::from_millis(100), block_rx.recv())
3702            .await
3703            .expect("Should receive message")
3704            .expect("Channel open")
3705            .expect("No error");
3706
3707        assert_eq!(msg.header.number, 6, "Should use block 6 (after last synced)");
3708
3709        let _ = close_tx.send(());
3710        jh.await.expect("Task should not panic");
3711    }
3712
3713    #[test_log::test(tokio::test)]
3714    async fn test_get_snapshots_skips_entrypoints_when_not_dci() {
3715        let header = BlockHeader::default();
3716        let mut rpc = make_mock_client();
3717        let component = ProtocolComponent { id: "Component1".to_string(), ..Default::default() };
3718
3719        let component_clone = component.clone();
3720        rpc.expect_get_snapshots()
3721            .returning(move |_request, _chunk_size, _concurrency| {
3722                Ok(Snapshot {
3723                    states: [(
3724                        "Component1".to_string(),
3725                        ComponentWithState {
3726                            state: ProtocolComponentState::new(
3727                                "Component1",
3728                                Default::default(),
3729                                Default::default(),
3730                            ),
3731                            component: component_clone.clone(),
3732                            entrypoints: vec![],
3733                            component_tvl: None,
3734                        },
3735                    )]
3736                    .into_iter()
3737                    .collect(),
3738                    vm_storage: HashMap::new(),
3739                })
3740            });
3741
3742        // get_traced_entry_points should NOT be called for a non-DCI protocol
3743        rpc.expect_get_traced_entry_points()
3744            .never();
3745
3746        let mut state_sync = with_mocked_clients(true, false, Some(rpc), None);
3747        // uses_dci defaults to false, no .with_dci() call needed
3748        state_sync
3749            .component_tracker
3750            .components
3751            .insert("Component1".to_string(), component);
3752
3753        let components_arg = ["Component1".to_string()];
3754        let req_ids: Vec<String> = components_arg.to_vec();
3755        let components: HashMap<_, _> = state_sync
3756            .component_tracker
3757            .components
3758            .iter()
3759            .filter(|(id, _)| req_ids.contains(id))
3760            .map(|(k, v)| (k.clone(), v.clone()))
3761            .collect();
3762        let contract_ids: HashSet<Bytes> = state_sync
3763            .component_tracker
3764            .get_contracts_by_component(&req_ids)
3765            .into_iter()
3766            .collect();
3767        let params = FetchSnapshotParams {
3768            chain: Chain::Ethereum,
3769            protocol_system: "uniswap-v2".to_string(),
3770            block_number: header.number,
3771            uses_dci: false,
3772            retrieve_balances: true,
3773            include_tvl: false,
3774        };
3775        let (snapshot, _, _) =
3776            fetch_snapshot(&state_sync.rpc_client, components, contract_ids, &params)
3777                .await
3778                .expect("Retrieving snapshot failed");
3779
3780        assert!(snapshot
3781            .states
3782            .contains_key("Component1"));
3783    }
3784
3785    #[test_log::test(tokio::test)]
3786    async fn test_get_snapshots_fetches_entrypoints_when_dci() {
3787        let header = BlockHeader::default();
3788        let mut rpc = make_mock_client();
3789        let component = ProtocolComponent { id: "Component1".to_string(), ..Default::default() };
3790
3791        let component_clone = component.clone();
3792        rpc.expect_get_snapshots()
3793            .returning(move |_request, _chunk_size, _concurrency| {
3794                Ok(Snapshot {
3795                    states: [(
3796                        "Component1".to_string(),
3797                        ComponentWithState {
3798                            state: ProtocolComponentState::new(
3799                                "Component1",
3800                                Default::default(),
3801                                Default::default(),
3802                            ),
3803                            component: component_clone.clone(),
3804                            entrypoints: vec![],
3805                            component_tvl: None,
3806                        },
3807                    )]
3808                    .into_iter()
3809                    .collect(),
3810                    vm_storage: HashMap::new(),
3811                })
3812            });
3813
3814        // get_traced_entry_points SHOULD be called for a DCI protocol
3815        rpc.expect_get_traced_entry_points()
3816            .times(1)
3817            .returning(|_| Ok(Page::new(HashMap::new(), 0, 0, 0)));
3818
3819        let mut state_sync = with_mocked_clients(true, false, Some(rpc), None).with_dci(true);
3820        state_sync
3821            .component_tracker
3822            .components
3823            .insert("Component1".to_string(), component);
3824
3825        let components_arg = ["Component1".to_string()];
3826        let req_ids: Vec<String> = components_arg.to_vec();
3827        let components: HashMap<_, _> = state_sync
3828            .component_tracker
3829            .components
3830            .iter()
3831            .filter(|(id, _)| req_ids.contains(id))
3832            .map(|(k, v)| (k.clone(), v.clone()))
3833            .collect();
3834        let contract_ids: HashSet<Bytes> = state_sync
3835            .component_tracker
3836            .get_contracts_by_component(&req_ids)
3837            .into_iter()
3838            .collect();
3839        let params = FetchSnapshotParams {
3840            chain: Chain::Ethereum,
3841            protocol_system: "uniswap-v2".to_string(),
3842            block_number: header.number,
3843            uses_dci: true,
3844            retrieve_balances: true,
3845            include_tvl: false,
3846        };
3847        let (snapshot, _, _) =
3848            fetch_snapshot(&state_sync.rpc_client, components, contract_ids, &params)
3849                .await
3850                .expect("Retrieving snapshot failed");
3851
3852        assert!(snapshot
3853            .states
3854            .contains_key("Component1"));
3855    }
3856
3857    /// Test that in partial-blocks mode, new components are deferred until the block number
3858    /// increments (confirming the previous block is complete), then fired as a background task at
3859    /// the previous block's height. The snapshots appear in the first message of the block AFTER
3860    /// the one where the task was fired.
3861    ///
3862    /// Timeline:
3863    /// - Block 1 (full): initial sync
3864    /// - Block 2 partial: BrandNew + Preexisting added to deferred set (no task yet)
3865    /// - Block 3 partial: block number increments → task fired at snapshot_block=2; msg3 empty
3866    /// - Block 4 partial: task has completed → drain returns both snapshots in msg4
3867    #[test_log::test(tokio::test)]
3868    async fn test_partial_mode_defers_brand_new_component_snapshot_to_next_block() {
3869        use std::time::Duration;
3870
3871        use tokio::{sync::mpsc::channel, time::timeout};
3872
3873        let bg_done = Arc::new(tokio::sync::Notify::new());
3874        let mut rpc_client = make_mock_client();
3875        // get_protocol_components for BrandNew + Preexisting (background task fires at block 3)
3876        rpc_client
3877            .expect_get_protocol_components()
3878            .withf(|params: &crate::rpc::ProtocolComponentsParams| {
3879                params
3880                    .component_ids()
3881                    .is_some_and(|ids| ids.contains(&"BrandNew".to_string()))
3882            })
3883            .returning(|_| {
3884                Ok(Page::new(
3885                    vec![
3886                        ProtocolComponent { id: "BrandNew".to_string(), ..Default::default() },
3887                        ProtocolComponent { id: "Preexisting".to_string(), ..Default::default() },
3888                    ],
3889                    2,
3890                    0,
3891                    100,
3892                ))
3893            });
3894        // get_protocol_components for initial sync
3895        rpc_client
3896            .expect_get_protocol_components()
3897            .returning(|_| {
3898                Ok(Page::new(
3899                    vec![
3900                        ProtocolComponent { id: "Component1".to_string(), ..Default::default() },
3901                        ProtocolComponent { id: "Component2".to_string(), ..Default::default() },
3902                    ],
3903                    2,
3904                    0,
3905                    100,
3906                ))
3907            });
3908        // Background task fires when block 3 arrives: snapshot at block 2 (3 - 1).
3909        let bg_done_clone = bg_done.clone();
3910        rpc_client
3911            .expect_get_snapshots()
3912            .withf(
3913                |request: &SnapshotParameters,
3914                 _chunk_size: &Option<usize>,
3915                 _concurrency: &usize| {
3916                    request.block_number == 2 &&
3917                        (request
3918                            .components
3919                            .contains_key("BrandNew") ||
3920                            request
3921                                .components
3922                                .contains_key("Preexisting"))
3923                },
3924            )
3925            .returning(move |_request, _chunk_size, _concurrency| {
3926                let snap = Ok(Snapshot {
3927                    states: [
3928                        (
3929                            "BrandNew".to_string(),
3930                            ComponentWithState {
3931                                state: ProtocolComponentState::new(
3932                                    "BrandNew",
3933                                    Default::default(),
3934                                    Default::default(),
3935                                ),
3936                                component: ProtocolComponent {
3937                                    id: "BrandNew".to_string(),
3938                                    ..Default::default()
3939                                },
3940                                component_tvl: Some(100.0),
3941                                entrypoints: vec![],
3942                            },
3943                        ),
3944                        (
3945                            "Preexisting".to_string(),
3946                            ComponentWithState {
3947                                state: ProtocolComponentState::new(
3948                                    "Preexisting",
3949                                    Default::default(),
3950                                    Default::default(),
3951                                ),
3952                                component: ProtocolComponent {
3953                                    id: "Preexisting".to_string(),
3954                                    ..Default::default()
3955                                },
3956                                component_tvl: Some(75.0),
3957                                entrypoints: vec![],
3958                            },
3959                        ),
3960                    ]
3961                    .into_iter()
3962                    .collect(),
3963                    vm_storage: HashMap::new(),
3964                });
3965                bg_done_clone.notify_one();
3966                snap
3967            });
3968        // get_snapshots for initial sync (block 0, Component1+Component2)
3969        rpc_client
3970            .expect_get_snapshots()
3971            .returning(|_request, _chunk_size, _concurrency| {
3972                Ok(Snapshot {
3973                    states: [
3974                        (
3975                            "Component1".to_string(),
3976                            ComponentWithState {
3977                                state: ProtocolComponentState::new(
3978                                    "Component1",
3979                                    Default::default(),
3980                                    Default::default(),
3981                                ),
3982                                component: ProtocolComponent {
3983                                    id: "Component1".to_string(),
3984                                    ..Default::default()
3985                                },
3986                                component_tvl: Some(100.0),
3987                                entrypoints: vec![],
3988                            },
3989                        ),
3990                        (
3991                            "Component2".to_string(),
3992                            ComponentWithState {
3993                                state: ProtocolComponentState::new(
3994                                    "Component2",
3995                                    Default::default(),
3996                                    Default::default(),
3997                                ),
3998                                component: ProtocolComponent {
3999                                    id: "Component2".to_string(),
4000                                    ..Default::default()
4001                                },
4002                                component_tvl: Some(0.0),
4003                                entrypoints: vec![],
4004                            },
4005                        ),
4006                    ]
4007                    .into_iter()
4008                    .collect(),
4009                    vm_storage: HashMap::new(),
4010                })
4011            });
4012        rpc_client
4013            .expect_get_traced_entry_points()
4014            .returning(|_| Ok(Page::new(HashMap::new(), 0, 0, 0)));
4015
4016        let mut deltas_client = MockDeltasClient::new();
4017        let (tx, rx) = channel(4);
4018        deltas_client
4019            .expect_subscribe()
4020            .return_once(move |_, _| Ok((Uuid::default(), rx)));
4021        deltas_client
4022            .expect_unsubscribe()
4023            .return_once(|_| Ok(()));
4024
4025        let mut state_sync = with_mocked_clients(true, true, Some(rpc_client), Some(deltas_client))
4026            .with_partial_blocks(true);
4027        state_sync
4028            .initialize()
4029            .await
4030            .expect("Init failed");
4031
4032        let (handle, mut block_rx) = state_sync.start().await;
4033        let (jh, close_tx) = handle.split();
4034
4035        // Block 1 (full): used for initial sync merge
4036        tx.send(make_block_changes(1, None))
4037            .await
4038            .unwrap();
4039        let _msg1 = timeout(Duration::from_millis(200), block_rx.recv())
4040            .await
4041            .expect("Should receive initial + block 1")
4042            .expect("Channel open")
4043            .expect("No error");
4044
4045        // Block 2 partial: BrandNew and Preexisting both appear. Neither task is fired yet —
4046        // both are added to deferred_snapshot_components.
4047        let mut block2 = make_block_changes(2, Some(2));
4048        block2.new_protocol_components = HashMap::from([(
4049            "BrandNew".to_string(),
4050            ProtocolComponent { id: "BrandNew".to_string(), ..Default::default() },
4051        )]);
4052        block2.component_tvl =
4053            HashMap::from([("BrandNew".to_string(), 100.0), ("Preexisting".to_string(), 75.0)]);
4054        tx.send(block2).await.unwrap();
4055        let msg2 = timeout(Duration::from_millis(200), block_rx.recv())
4056            .await
4057            .expect("Should receive block 2")
4058            .expect("Channel open")
4059            .expect("No error");
4060
4061        assert!(
4062            !msg2
4063                .snapshots
4064                .states
4065                .contains_key("Preexisting"),
4066            "Preexisting should still be deferred in block 2, not yet snapshotted; got: {:?}",
4067            msg2.snapshots
4068                .states
4069                .keys()
4070                .collect::<Vec<_>>()
4071        );
4072        assert!(
4073            !msg2
4074                .snapshots
4075                .states
4076                .contains_key("BrandNew"),
4077            "BrandNew should still be deferred in block 2, not yet snapshotted"
4078        );
4079
4080        // Block 3 partial: block number increments → deferred components fire as background task
4081        // at snapshot_block=2. msg3 has no snapshots (task just spawned).
4082        tx.send(make_block_changes(3, Some(1)))
4083            .await
4084            .unwrap();
4085        let msg3 = timeout(Duration::from_millis(200), block_rx.recv())
4086            .await
4087            .expect("Should receive block 3")
4088            .expect("Channel open")
4089            .expect("No error");
4090
4091        assert_eq!(msg3.header.number, 3);
4092        assert_eq!(msg3.header.partial_block_index, Some(1));
4093        assert!(
4094            !msg3
4095                .snapshots
4096                .states
4097                .contains_key("BrandNew"),
4098            "BrandNew task just fired; snapshot not yet available in msg3"
4099        );
4100        assert!(
4101            !msg3
4102                .snapshots
4103                .states
4104                .contains_key("Preexisting"),
4105            "Preexisting task just fired; snapshot not yet available in msg3"
4106        );
4107
4108        // Wait for the background snapshot task to complete before the next block arrives.
4109        bg_done.notified().await;
4110
4111        // Block 4 partial: drain finds the completed task → both snapshots present in msg4.
4112        tx.send(make_block_changes(4, Some(0)))
4113            .await
4114            .unwrap();
4115        let msg4 = timeout(Duration::from_millis(200), block_rx.recv())
4116            .await
4117            .expect("Should receive block 4")
4118            .expect("Channel open")
4119            .expect("No error");
4120
4121        assert_eq!(msg4.header.number, 4);
4122        assert_eq!(msg4.header.partial_block_index, Some(0));
4123        assert!(
4124            msg4.snapshots
4125                .states
4126                .contains_key("BrandNew"),
4127            "BrandNew snapshot should be in msg4 after background task drains; got keys: {:?}",
4128            msg4.snapshots
4129                .states
4130                .keys()
4131                .collect::<Vec<_>>()
4132        );
4133        assert!(
4134            msg4.snapshots
4135                .states
4136                .contains_key("Preexisting"),
4137            "Preexisting snapshot should be in msg4 after background task drains; got keys: {:?}",
4138            msg4.snapshots
4139                .states
4140                .keys()
4141                .collect::<Vec<_>>()
4142        );
4143
4144        let _ = close_tx.send(());
4145        jh.await.expect("Task should not panic");
4146    }
4147
4148    /// Directly exercises all four mutation paths of `apply_deltas_to_snapshot`:
4149    /// attribute update, attribute deletion, balance merge, and VM slot/balance/code overwrite.
4150    /// Also verifies that deltas at or before `snapshot_block` are skipped.
4151    #[test]
4152    fn test_apply_deltas_to_snapshot() {
4153        use tycho_common::models::{
4154            contract::{Account, AccountDelta},
4155            protocol::{ComponentBalance, ProtocolComponentStateDelta},
4156            ChangeType,
4157        };
4158
4159        let contract_addr = Bytes::from("0xc0ffee");
4160        let token_addr = Bytes::from("0xdeadbeef");
4161
4162        // Build the snapshot at block 5: one component with attributes and balances,
4163        // one VM contract with slots and native balance.
4164        let mut snapshot = Snapshot {
4165            states: [(
4166                "comp1".to_string(),
4167                ComponentWithState {
4168                    state: ProtocolComponentState::new(
4169                        "comp1",
4170                        [
4171                            ("keep".to_string(), Bytes::from("0x01")),
4172                            ("delete_me".to_string(), Bytes::from("0x02")),
4173                        ]
4174                        .into_iter()
4175                        .collect(),
4176                        [(token_addr.clone(), Bytes::from("0x64"))]
4177                            .into_iter()
4178                            .collect(),
4179                    ),
4180                    component: ProtocolComponent::default(),
4181                    component_tvl: None,
4182                    entrypoints: vec![],
4183                },
4184            )]
4185            .into_iter()
4186            .collect(),
4187            vm_storage: [(
4188                contract_addr.clone(),
4189                Account {
4190                    chain: Chain::Ethereum,
4191                    address: contract_addr.clone(),
4192                    title: String::new(),
4193                    slots: [(Bytes::from("0x01"), Bytes::from("0xaa"))]
4194                        .into_iter()
4195                        .collect(),
4196                    native_balance: Bytes::from("0x10"),
4197                    token_balances: HashMap::new(),
4198                    code: Bytes::from("0x0a0b"),
4199                    code_hash: Default::default(),
4200                    balance_modify_tx: Default::default(),
4201                    code_modify_tx: Default::default(),
4202                    creation_tx: None,
4203                },
4204            )]
4205            .into_iter()
4206            .collect(),
4207        };
4208
4209        // Two buffered deltas: block 5 (at snapshot_block, must be skipped) and block 6
4210        // (after snapshot_block, must be applied).
4211        let skipped_delta = BlockAggregatedChanges {
4212            block: Block { number: 5, ..Default::default() },
4213            state_deltas: [(
4214                "comp1".to_string(),
4215                ProtocolComponentStateDelta::new(
4216                    "comp1",
4217                    [("keep".to_string(), Bytes::from("0xff"))]
4218                        .into_iter()
4219                        .collect(),
4220                    HashSet::new(),
4221                ),
4222            )]
4223            .into_iter()
4224            .collect(),
4225            ..Default::default()
4226        };
4227        let applied_delta = BlockAggregatedChanges {
4228            block: Block { number: 6, ..Default::default() },
4229            state_deltas: [(
4230                "comp1".to_string(),
4231                ProtocolComponentStateDelta::new(
4232                    "comp1",
4233                    [("keep".to_string(), Bytes::from("0x99"))]
4234                        .into_iter()
4235                        .collect(),
4236                    ["delete_me".to_string()]
4237                        .into_iter()
4238                        .collect(),
4239                ),
4240            )]
4241            .into_iter()
4242            .collect(),
4243            component_balances: [(
4244                "comp1".to_string(),
4245                [(
4246                    token_addr.clone(),
4247                    ComponentBalance::new(
4248                        token_addr.clone(),
4249                        Bytes::from("0xc8"),
4250                        200.0,
4251                        Default::default(),
4252                        "comp1",
4253                    ),
4254                )]
4255                .into_iter()
4256                .collect(),
4257            )]
4258            .into_iter()
4259            .collect(),
4260            account_deltas: [(
4261                contract_addr.clone(),
4262                AccountDelta::new(
4263                    Chain::Ethereum,
4264                    contract_addr.clone(),
4265                    [
4266                        (Bytes::from("0x01"), Some(Bytes::from("0xbb"))),
4267                        (Bytes::from("0x02"), Some(Bytes::from("0xcc"))),
4268                    ]
4269                    .into_iter()
4270                    .collect(),
4271                    Some(Bytes::from("0x20")),
4272                    Some(Bytes::from("0x0c0d")),
4273                    ChangeType::Update,
4274                ),
4275            )]
4276            .into_iter()
4277            .collect(),
4278            ..Default::default()
4279        };
4280
4281        let mut sync = with_mocked_clients(true, false, None, None);
4282        sync.buffered_deltas = vec![skipped_delta, applied_delta];
4283
4284        let contract_ids: HashSet<Bytes> = [contract_addr.clone()]
4285            .into_iter()
4286            .collect();
4287        sync.apply_deltas_to_snapshot(&mut snapshot, 5, &contract_ids);
4288
4289        let comp = &snapshot.states["comp1"].state;
4290
4291        // Attribute update applied
4292        assert_eq!(comp.attributes["keep"], Bytes::from("0x99"));
4293        // Attribute deletion applied
4294        assert!(!comp
4295            .attributes
4296            .contains_key("delete_me"));
4297        // Balance merge applied
4298        assert_eq!(comp.balances[&token_addr], Bytes::from("0xc8"));
4299
4300        let account = &snapshot.vm_storage[&contract_addr];
4301        // Existing slot overwritten, new slot added
4302        assert_eq!(account.slots[&Bytes::from("0x01")], Bytes::from("0xbb"));
4303        assert_eq!(account.slots[&Bytes::from("0x02")], Bytes::from("0xcc"));
4304        // Native balance updated
4305        assert_eq!(account.native_balance, Bytes::from("0x20"));
4306        // Code updated
4307        assert_eq!(account.code, Bytes::from("0x0c0d"));
4308    }
4309}