Skip to main content

tycho_client/feed/
synchronizer.rs

1use std::{collections::HashMap, time::Duration};
2
3use async_trait::async_trait;
4use serde::{Deserialize, Serialize};
5use thiserror::Error;
6use tokio::{
7    select,
8    sync::{
9        mpsc::{channel, error::SendError, Receiver, Sender},
10        oneshot,
11    },
12    task::JoinHandle,
13    time::{sleep, timeout},
14};
15use tracing::{debug, error, info, instrument, trace, warn};
16use tycho_common::{
17    dto::{
18        BlockChanges, EntryPointWithTracingParams, ExtractorIdentity, ProtocolComponent,
19        ResponseAccount, ResponseProtocolState, TracingResult,
20    },
21    Bytes,
22};
23
24use crate::{
25    deltas::{DeltasClient, SubscriptionOptions},
26    feed::{
27        component_tracker::{ComponentFilter, ComponentTracker},
28        BlockHeader, HeaderLike,
29    },
30    rpc::{RPCClient, RPCError, SnapshotParameters, RPC_CLIENT_CONCURRENCY},
31    DeltasError,
32};
33
34#[derive(Error, Debug)]
35pub enum SynchronizerError {
36    /// RPC client failures.
37    #[error("RPC error: {0}")]
38    RPCError(#[from] RPCError),
39
40    /// Issues with the main channel
41    #[error("{0}")]
42    ChannelError(String),
43
44    /// Timeout elapsed errors.
45    #[error("Timeout error: {0}")]
46    Timeout(String),
47
48    /// Failed to close the synchronizer.
49    #[error("Failed to close synchronizer: {0}")]
50    CloseError(String),
51
52    /// Server connection failures or interruptions.
53    #[error("Connection error: {0}")]
54    ConnectionError(String),
55
56    /// Connection closed
57    #[error("Connection closed")]
58    ConnectionClosed,
59
60    /// Internal error that should not happen under normal operation.
61    #[error("Internal error: {0}")]
62    Internal(String),
63}
64
65pub type SyncResult<T> = Result<T, SynchronizerError>;
66
67impl<T> From<SendError<T>> for SynchronizerError {
68    fn from(err: SendError<T>) -> Self {
69        SynchronizerError::ChannelError(format!("Failed to send message: {err}"))
70    }
71}
72
73impl From<DeltasError> for SynchronizerError {
74    fn from(err: DeltasError) -> Self {
75        match err {
76            DeltasError::NotConnected => SynchronizerError::ConnectionClosed,
77            _ => SynchronizerError::ConnectionError(err.to_string()),
78        }
79    }
80}
81
82pub struct ProtocolStateSynchronizer<R: RPCClient, D: DeltasClient> {
83    extractor_id: ExtractorIdentity,
84    retrieve_balances: bool,
85    rpc_client: R,
86    deltas_client: D,
87    max_retries: u64,
88    retry_cooldown: Duration,
89    include_snapshots: bool,
90    component_tracker: ComponentTracker<R>,
91    last_synced_block: Option<BlockHeader>,
92    timeout: u64,
93    include_tvl: bool,
94    compression: bool,
95    partial_blocks: bool,
96    uses_dci: bool,
97    /// Brand-new components (in new_protocol_components) whose snapshot we deferred until the
98    /// first message of the next block. Only used when partial_blocks is true.
99    deferred_snapshot_components: Vec<String>,
100}
101
102#[derive(Clone, PartialEq, Debug, Serialize, Deserialize)]
103pub struct ComponentWithState {
104    pub state: ResponseProtocolState,
105    pub component: ProtocolComponent,
106    pub component_tvl: Option<f64>,
107    pub entrypoints: Vec<(EntryPointWithTracingParams, TracingResult)>,
108}
109
110#[derive(Clone, PartialEq, Debug, Default, Serialize, Deserialize)]
111pub struct Snapshot {
112    pub states: HashMap<String, ComponentWithState>,
113    pub vm_storage: HashMap<Bytes, ResponseAccount>,
114}
115
116impl Snapshot {
117    fn extend(&mut self, other: Snapshot) {
118        self.states.extend(other.states);
119        self.vm_storage.extend(other.vm_storage);
120    }
121
122    pub fn get_states(&self) -> &HashMap<String, ComponentWithState> {
123        &self.states
124    }
125
126    pub fn get_vm_storage(&self) -> &HashMap<Bytes, ResponseAccount> {
127        &self.vm_storage
128    }
129}
130
131#[derive(Clone, PartialEq, Debug, Default, Serialize, Deserialize)]
132pub struct StateSyncMessage<H>
133where
134    H: HeaderLike,
135{
136    /// The block information for this update.
137    pub header: H,
138    /// Snapshot for new components.
139    pub snapshots: Snapshot,
140    /// A single delta contains state updates for all tracked components, as well as additional
141    /// information about the system components e.g. newly added components (even below tvl), tvl
142    /// updates, balance updates.
143    pub deltas: Option<BlockChanges>,
144    /// Components that stopped being tracked.
145    pub removed_components: HashMap<String, ProtocolComponent>,
146}
147
148impl<H> StateSyncMessage<H>
149where
150    H: HeaderLike,
151{
152    pub fn merge(mut self, other: Self) -> Self {
153        // be careful with removed and snapshots attributes here, these can be ambiguous.
154        self.removed_components
155            .retain(|k, _| !other.snapshots.states.contains_key(k));
156        self.snapshots
157            .states
158            .retain(|k, _| !other.removed_components.contains_key(k));
159
160        self.snapshots.extend(other.snapshots);
161        let deltas = match (self.deltas, other.deltas) {
162            (Some(l), Some(r)) => Some(l.merge(r)),
163            (None, Some(r)) => Some(r),
164            (Some(l), None) => Some(l),
165            (None, None) => None,
166        };
167        self.removed_components
168            .extend(other.removed_components);
169        Self {
170            header: other.header,
171            snapshots: self.snapshots,
172            deltas,
173            removed_components: self.removed_components,
174        }
175    }
176}
177
178/// Handle for controlling a running synchronizer task.
179///
180/// This handle provides methods to gracefully shut down the synchronizer
181/// and await its completion with a timeout.
182pub struct SynchronizerTaskHandle {
183    join_handle: JoinHandle<()>,
184    close_tx: oneshot::Sender<()>,
185}
186
187/// StateSynchronizer
188///
189/// Used to synchronize the state of a single protocol. The synchronizer is responsible for
190/// delivering messages to the client that let him reconstruct subsets of the protocol state.
191///
192/// This involves deciding which components to track according to the clients preferences,
193/// retrieving & emitting snapshots of components which the client has not seen yet and subsequently
194/// delivering delta messages for the components that have changed.
195impl SynchronizerTaskHandle {
196    pub fn new(join_handle: JoinHandle<()>, close_tx: oneshot::Sender<()>) -> Self {
197        Self { join_handle, close_tx }
198    }
199
200    /// Splits the handle into its join handle and close sender.
201    ///
202    /// This allows monitoring the task completion separately from controlling shutdown.
203    /// The join handle can be used with FuturesUnordered for monitoring, while the
204    /// close sender can be used to signal graceful shutdown.
205    pub fn split(self) -> (JoinHandle<()>, oneshot::Sender<()>) {
206        (self.join_handle, self.close_tx)
207    }
208}
209
210#[async_trait]
211pub trait StateSynchronizer: Send + Sync + 'static {
212    async fn initialize(&mut self) -> SyncResult<()>;
213    /// Starts the state synchronization, consuming the synchronizer.
214    /// Returns a handle for controlling the running task and a receiver for messages.
215    async fn start(
216        mut self,
217    ) -> (SynchronizerTaskHandle, Receiver<SyncResult<StateSyncMessage<BlockHeader>>>);
218}
219
220impl<R, D> ProtocolStateSynchronizer<R, D>
221where
222    // TODO: Consider moving these constraints directly to the
223    // client...
224    R: RPCClient + Clone + Send + Sync + 'static,
225    D: DeltasClient + Clone + Send + Sync + 'static,
226{
227    /// Creates a new state synchronizer.
228    #[allow(clippy::too_many_arguments)]
229    pub fn new(
230        extractor_id: ExtractorIdentity,
231        retrieve_balances: bool,
232        component_filter: ComponentFilter,
233        max_retries: u64,
234        retry_cooldown: Duration,
235        include_snapshots: bool,
236        include_tvl: bool,
237        compression: bool,
238        rpc_client: R,
239        deltas_client: D,
240        timeout: u64,
241    ) -> Self {
242        Self {
243            extractor_id: extractor_id.clone(),
244            retrieve_balances,
245            rpc_client: rpc_client.clone(),
246            include_snapshots,
247            deltas_client,
248            component_tracker: ComponentTracker::new(
249                extractor_id.chain,
250                extractor_id.name.as_str(),
251                component_filter,
252                rpc_client,
253            ),
254            max_retries,
255            retry_cooldown,
256            last_synced_block: None,
257            timeout,
258            include_tvl,
259            compression,
260            partial_blocks: false,
261            uses_dci: false,
262            deferred_snapshot_components: Vec::new(),
263        }
264    }
265
266    /// Sets whether this protocol uses Dynamic Contract Indexing (DCI).
267    /// When true, entrypoints will be fetched during snapshot retrieval.
268    pub fn with_dci(mut self, uses_dci: bool) -> Self {
269        self.uses_dci = uses_dci;
270        self
271    }
272
273    /// Enables receiving partial block updates.
274    pub fn with_partial_blocks(mut self, partial_blocks: bool) -> Self {
275        self.partial_blocks = partial_blocks;
276        self
277    }
278
279    /// When using partial blocks, brand-new components are deferred until the first message of the
280    /// next block. This returns snapshots for those deferred components if we just advanced to a
281    /// new block; otherwise returns an empty snapshot. For full blocks this always returns empty.
282    async fn take_flushed_deferred_snapshots(
283        &mut self,
284        header: &BlockHeader,
285    ) -> SyncResult<Snapshot> {
286        if !self.partial_blocks {
287            return Ok(Snapshot::default());
288        }
289        let prev = match self
290            .last_synced_block
291            .as_ref()
292            .filter(|p| header.number > p.number)
293        {
294            Some(p) => p,
295            None => return Ok(Snapshot::default()),
296        };
297        let deferred = std::mem::take(&mut self.deferred_snapshot_components);
298        if deferred.is_empty() {
299            return Ok(Snapshot::default());
300        }
301        let snapshot_header =
302            BlockHeader { number: prev.number, hash: prev.hash.clone(), ..Default::default() };
303        let msg = self
304            .get_snapshots(snapshot_header, Some(&deferred))
305            .await?;
306        Ok(msg.snapshots)
307    }
308
309    /// Retrieves state snapshots of the requested components
310    async fn get_snapshots<'a, I: IntoIterator<Item = &'a String>>(
311        &mut self,
312        header: BlockHeader,
313        ids: Option<I>,
314    ) -> SyncResult<StateSyncMessage<BlockHeader>> {
315        if !self.include_snapshots {
316            return Ok(StateSyncMessage { header, ..Default::default() });
317        }
318
319        // Use given ids or use all if not passed
320        let component_ids: Vec<_> = match ids {
321            Some(ids) => ids.into_iter().cloned().collect(),
322            None => self
323                .component_tracker
324                .get_tracked_component_ids(),
325        };
326
327        if component_ids.is_empty() {
328            return Ok(StateSyncMessage { header, ..Default::default() });
329        }
330
331        let entrypoints_result = if self.uses_dci {
332            let result = self
333                .rpc_client
334                .get_traced_entry_points_paginated(
335                    self.extractor_id.chain,
336                    &self.extractor_id.name,
337                    &component_ids,
338                    None,
339                    RPC_CLIENT_CONCURRENCY,
340                )
341                .await?;
342            self.component_tracker
343                .process_entrypoints(&result.clone().into());
344            result.traced_entry_points.clone()
345        } else {
346            HashMap::new()
347        };
348
349        // Get contract IDs from component tracker
350        let contract_ids: Vec<Bytes> = self
351            .component_tracker
352            .get_contracts_by_component(&component_ids)
353            .into_iter()
354            .collect();
355
356        // Filter components to only include the requested ones
357        let filtered_components: HashMap<_, _> = self
358            .component_tracker
359            .components
360            .iter()
361            .filter(|(id, _)| component_ids.contains(id))
362            .map(|(k, v)| (k.clone(), v.clone()))
363            .collect();
364
365        let request = SnapshotParameters::new(
366            self.extractor_id.chain,
367            &self.extractor_id.name,
368            &filtered_components,
369            &contract_ids,
370            header.number,
371        )
372        .entrypoints(&entrypoints_result)
373        .include_balances(self.retrieve_balances)
374        .include_tvl(self.include_tvl);
375        let snapshot_response = self
376            .rpc_client
377            .get_snapshots(&request, None, RPC_CLIENT_CONCURRENCY)
378            .await?;
379
380        trace!(states=?&snapshot_response.states, "Retrieved ProtocolStates");
381        trace!(contract_states=?&snapshot_response.vm_storage, "Retrieved ContractState");
382
383        Ok(StateSyncMessage {
384            header,
385            snapshots: snapshot_response,
386            deltas: None,
387            removed_components: HashMap::new(),
388        })
389    }
390
391    /// Main method that does all the work.
392    ///
393    /// ## Return Value
394    ///
395    /// Returns a `Result` where:
396    /// - `Ok(())` - Synchronization completed successfully (usually due to close signal)
397    /// - `Err((error, None))` - Error occurred AND close signal was received (don't retry)
398    /// - `Err((error, Some(end_rx)))` - Error occurred but close signal was NOT received (can
399    ///   retry)
400    ///
401    /// The returned `end_rx` (if any) should be reused for retry attempts since the close
402    /// signal may still arrive and we want to remain cancellable across retries.
403    #[instrument(skip(self, block_tx, end_rx), fields(extractor_id = %self.extractor_id))]
404    async fn state_sync(
405        &mut self,
406        block_tx: &mut Sender<SyncResult<StateSyncMessage<BlockHeader>>>,
407        mut end_rx: oneshot::Receiver<()>,
408    ) -> Result<(), (SynchronizerError, Option<oneshot::Receiver<()>>)> {
409        // initialisation
410        let subscription_options = SubscriptionOptions::new()
411            .with_state(self.include_snapshots)
412            .with_compression(self.compression)
413            .with_partial_blocks(self.partial_blocks);
414        let (subscription_id, mut msg_rx) = match self
415            .deltas_client
416            .subscribe(self.extractor_id.clone(), subscription_options)
417            .await
418        {
419            Ok(result) => result,
420            Err(e) => return Err((e.into(), Some(end_rx))),
421        };
422
423        let result = async {
424            info!("Waiting for deltas...");
425            let mut warned_waiting_for_new_block = false;
426            let mut warned_skipping_synced = false;
427            // Track the last seen block number such that we know when we get the first partial
428            let mut last_block_number: Option<u64> = None;
429            let mut first_msg = loop {
430                let msg = select! {
431                    deltas_result = timeout(Duration::from_secs(self.timeout), msg_rx.recv()) => {
432                        deltas_result
433                            .map_err(|_| {
434                                SynchronizerError::Timeout(format!(
435                                    "First deltas took longer than {t}s to arrive",
436                                    t = self.timeout
437                                ))
438                            })?
439                            .ok_or_else(|| {
440                                SynchronizerError::ConnectionError(
441                                    "Deltas channel closed before first message".to_string(),
442                                )
443                            })?
444                    },
445                    _ = &mut end_rx => {
446                        info!("Received close signal while waiting for first deltas");
447                        return Ok(());
448                    }
449                };
450
451                let incoming: BlockHeader = (&msg).into();
452
453                // Determine if this message is a candidate for starting synchronization.
454                // In partial mode, we wait for a new block to start (block number increases).
455                // In non-partial mode, all messages are candidates.
456                let is_new_block_candidate = if self.partial_blocks {
457                    match msg.partial_block_index {
458                        None => {
459                            // If we get a full block, it is a candidate
460                            last_block_number = Some(incoming.number);
461                            true
462                        }
463                        Some(current_partial_idx) => {
464                            let is_new_block = last_block_number
465                                .map(|prev_block| incoming.number > prev_block)
466                                .unwrap_or(false);
467
468                            if !warned_waiting_for_new_block {
469                                info!(
470                                    extractor=%self.extractor_id,
471                                    block=incoming.number,
472                                    partial_idx=current_partial_idx,
473                                    "Syncing. Waiting for new block to start"
474                                );
475                                warned_waiting_for_new_block = true;
476                            }
477                            last_block_number = Some(incoming.number);
478                            is_new_block
479                        }
480                    }
481                } else {
482                    true // Non-partial mode: all messages are candidates
483                };
484
485                if !is_new_block_candidate {
486                    continue;
487                }
488
489                // Check if we've already synced this block (applies to both modes)
490                if let Some(current) = &self.last_synced_block {
491                    if current.number >= incoming.number && !self.is_next_expected(&incoming) {
492                        if !warned_skipping_synced {
493                            info!(extractor=%self.extractor_id, from=incoming.number, to=current.number, "Syncing. Skipping already synced block");
494                            warned_skipping_synced = true;
495                        }
496                        continue;
497                    }
498                }
499                break msg;
500            };
501
502            self.filter_deltas(&mut first_msg);
503
504            // initial snapshot
505            info!(height = first_msg.get_block().number, "First deltas received");
506            let header: BlockHeader = (&first_msg).into();
507            let deltas_msg = StateSyncMessage {
508                header: header.clone(),
509                snapshots: Default::default(),
510                deltas: Some(first_msg),
511                removed_components: Default::default(),
512            };
513
514            // If possible skip retrieving snapshots
515            let msg = if !self.is_next_expected(&header) {
516                info!("Retrieving snapshot");
517                // With partial blocks, the server only has full blocks in its buffer; pass the
518                // previous block's header so we request state at N-1, then merge with deltas.
519                let snapshot_header = if self.partial_blocks && header.number > 0 {
520                    BlockHeader {
521                        number: header.number - 1,
522                        hash: header.parent_hash.clone(),
523                        ..Default::default()
524                    }
525                } else {
526                    BlockHeader { revert: false, ..header.clone() }
527                };
528                let snapshot = self
529                    .get_snapshots::<Vec<&String>>(snapshot_header, None)
530                    .await?
531                    .merge(deltas_msg);
532                let n_components = self.component_tracker.components.len();
533                let n_snapshots = snapshot.snapshots.states.len();
534                info!(n_components, n_snapshots, "Initial snapshot retrieved, starting delta message feed");
535                snapshot
536            } else {
537                deltas_msg
538            };
539            block_tx.send(Ok(msg)).await?;
540            self.last_synced_block = Some(header.clone());
541            loop {
542                select! {
543                    deltas_opt = msg_rx.recv() => {
544                        if let Some(mut deltas) = deltas_opt {
545                            let header: BlockHeader = (&deltas).into();
546                            debug!(block_number=?header.number, "Received delta message");
547
548                            let flushed_snapshots =
549                                self.take_flushed_deferred_snapshots(&header).await?;
550
551                            let (snapshots, removed_components) = {
552                                // 1. Remove components based on latest changes
553                                // 2. Add components based on latest changes, query those for snapshots
554                                let (to_add, to_remove) = self.component_tracker.filter_updated_components(&deltas);
555
556                                // Only components we don't track yet need a snapshot.
557                                let requiring_snapshot: Vec<String> = to_add
558                                    .iter()
559                                    .filter(|id| {
560                                        !self.component_tracker
561                                            .components
562                                            .contains_key(id.as_str())
563                                    })
564                                    .map(|id| id.to_string())
565                                    .collect();
566                                debug!(components=?requiring_snapshot, "SnapshotRequest");
567                                let requiring_snapshot_refs: Vec<&String> = requiring_snapshot.iter().collect();
568                                self.component_tracker
569                                    .start_tracking(&requiring_snapshot_refs)
570                                    .await?;
571
572                                // When partial_blocks: defer brand-new to next block's first message; only preexisting get snapshot now.
573                                let request_now: Vec<String> = if self.partial_blocks {
574                                    let (preexisting, brand_new) = requiring_snapshot
575                                        .into_iter()
576                                        .partition(|id| {
577                                            !deltas.new_protocol_components.contains_key(id.as_str())
578                                        });
579                                    self.deferred_snapshot_components.extend(brand_new);
580                                    preexisting
581                                } else {
582                                    requiring_snapshot
583                                };
584
585                                let mut snapshots = if request_now.is_empty() {
586                                    Snapshot::default()
587                                } else {
588                                    let snapshot_header = if self.partial_blocks && header.number > 0
589                                    {
590                                        // get snapshot at the previous block (current block is only partial and not available for querying)
591                                        BlockHeader {
592                                            number: header.number - 1,
593                                            hash: header.parent_hash.clone(),
594                                            ..Default::default()
595                                        }
596                                    } else {
597                                        BlockHeader { revert: false, ..header.clone() }
598                                    };
599                                    self.get_snapshots(snapshot_header, Some(&request_now))
600                                        .await?
601                                        .snapshots
602                                };
603
604                                snapshots.extend(flushed_snapshots);
605
606                                let removed_components = if !to_remove.is_empty() {
607                                    self.component_tracker.stop_tracking(&to_remove)
608                                } else {
609                                    Default::default()
610                                };
611
612                                (snapshots, removed_components)
613                            };
614
615                            // 3. Update entrypoints on the tracker (affects which contracts are tracked)
616                            self.component_tracker.process_entrypoints(&deltas.dci_update);
617
618                            // 4. Filter deltas by currently tracked components / contracts
619                            self.filter_deltas(&mut deltas);
620                            let n_changes = deltas.n_changes();
621
622                            // 5. Send the message
623                            let next = StateSyncMessage {
624                                header: header.clone(),
625                                snapshots,
626                                deltas: Some(deltas),
627                                removed_components,
628                            };
629                            block_tx.send(Ok(next)).await?;
630                            self.last_synced_block = Some(header.clone());
631
632                            debug!(block_number=?header.number, n_changes, "Finished processing delta message");
633                        } else {
634                            return Err(SynchronizerError::ConnectionError("Deltas channel closed".to_string()));
635                        }
636                    },
637                    _ = &mut end_rx => {
638                        info!("Received close signal during state_sync");
639                        return Ok(());
640                    }
641                }
642            }
643        }.await;
644
645        // This cleanup code now runs regardless of how the function exits (error or channel close)
646        warn!(last_synced_block = ?&self.last_synced_block, "Deltas processing ended.");
647        //Ignore error
648        let _ = self
649            .deltas_client
650            .unsubscribe(subscription_id)
651            .await
652            .map_err(|err| {
653                warn!(err=?err, "Unsubscribing from deltas on cleanup failed!");
654            });
655
656        // Handle the result: if it succeeded, we're done. If it errored, we need to determine
657        // whether the end_rx was consumed (close signal received) or not
658        match result {
659            Ok(()) => Ok(()), // Success, likely due to close signal
660            Err(e) => {
661                // The error came from the inner async block. Since the async block
662                // can receive close signals (which would return Ok), any error means
663                // the close signal was NOT received, so we can return the end_rx for retry
664                Err((e, Some(end_rx)))
665            }
666        }
667    }
668
669    fn is_next_expected(&self, incoming: &BlockHeader) -> bool {
670        if let Some(block) = self.last_synced_block.as_ref() {
671            return incoming.parent_hash == block.hash;
672        }
673        false
674    }
675    fn filter_deltas(&self, deltas: &mut BlockChanges) {
676        deltas.filter_by_component(|id| {
677            self.component_tracker
678                .components
679                .contains_key(id)
680        });
681        deltas.filter_by_contract(|id| {
682            self.component_tracker
683                .contracts
684                .contains(id)
685        });
686    }
687}
688
689#[async_trait]
690impl<R, D> StateSynchronizer for ProtocolStateSynchronizer<R, D>
691where
692    R: RPCClient + Clone + Send + Sync + 'static,
693    D: DeltasClient + Clone + Send + Sync + 'static,
694{
695    async fn initialize(&mut self) -> SyncResult<()> {
696        info!("Retrieving relevant protocol components");
697        self.component_tracker
698            .initialise_components()
699            .await?;
700        info!(
701            n_components = self.component_tracker.components.len(),
702            n_contracts = self.component_tracker.contracts.len(),
703            extractor = %self.extractor_id,
704            "Finished retrieving components",
705        );
706
707        Ok(())
708    }
709
710    async fn start(
711        mut self,
712    ) -> (SynchronizerTaskHandle, Receiver<SyncResult<StateSyncMessage<BlockHeader>>>) {
713        let (mut tx, rx) = channel(15);
714        let (end_tx, end_rx) = oneshot::channel::<()>();
715
716        let jh = tokio::spawn(async move {
717            let mut retry_count = 0;
718            let mut current_end_rx = end_rx;
719            let mut final_error = None;
720
721            while retry_count < self.max_retries {
722                info!(extractor_id=%&self.extractor_id, retry_count, "(Re)starting synchronization loop");
723
724                let res = self
725                    .state_sync(&mut tx, current_end_rx)
726                    .await;
727                match res {
728                    Ok(()) => {
729                        info!(
730                            extractor_id=%&self.extractor_id,
731                            retry_count,
732                            "State synchronization exited cleanly"
733                        );
734                        return;
735                    }
736                    Err((e, maybe_end_rx)) => {
737                        warn!(
738                            extractor_id=%&self.extractor_id,
739                            retry_count,
740                            error=%e,
741                            "State synchronization errored!"
742                        );
743
744                        // If we have the end_rx back, we can retry
745                        if let Some(recovered_end_rx) = maybe_end_rx {
746                            current_end_rx = recovered_end_rx;
747
748                            if let SynchronizerError::ConnectionClosed = e {
749                                // break synchronization loop if websocket client is dead
750                                error!(
751                                    "Websocket connection closed. State synchronization exiting."
752                                );
753                                let _ = tx.send(Err(e)).await;
754                                return;
755                            } else {
756                                // Store error in case this is our last retry
757                                final_error = Some(e);
758                            }
759                        } else {
760                            // Close signal was received, exit cleanly
761                            info!(extractor_id=%&self.extractor_id, "Received close signal, exiting.");
762                            return;
763                        }
764                    }
765                }
766                sleep(self.retry_cooldown).await;
767                retry_count += 1;
768            }
769            if let Some(e) = final_error {
770                warn!(extractor_id=%&self.extractor_id, retry_count, error=%e, "Max retries exceeded");
771                let _ = tx.send(Err(e)).await;
772            }
773        });
774
775        let handle = SynchronizerTaskHandle::new(jh, end_tx);
776        (handle, rx)
777    }
778}
779
780#[cfg(test)]
781mod test {
782    //! Test suite for ProtocolStateSynchronizer shutdown and cleanup behavior.
783    //!
784    //! ## Test Coverage Strategy:
785    //!
786    //! ### Shutdown & Close Signal Tests:
787    //! - `test_public_close_api_functionality` - Tests public API (start/close lifecycle)
788    //! - `test_close_signal_while_waiting_for_first_deltas` - Close during initial wait
789    //! - `test_close_signal_during_main_processing_loop` - Close during main processing
790    //!
791    //! ### Cleanup & Error Handling Tests:
792    //! - `test_cleanup_runs_when_state_sync_processing_errors` - Cleanup on processing errors
793    //!
794    //! ### Coverage Summary:
795    //! These tests ensure cleanup code (shared state reset + unsubscribe) runs on ALL exit paths:
796    //! ✓ Close signal before first deltas   ✓ Close signal during processing
797    //! ✓ Processing errors                  ✓ Channel closure
798    //! ✓ Public API close operations        ✓ Normal completion
799
800    use std::{collections::HashSet, sync::Arc};
801
802    use tycho_common::dto::{
803        AddressStorageLocation, Block, Chain, ComponentTvlRequestBody, ComponentTvlRequestResponse,
804        DCIUpdate, EntryPoint, PaginationResponse, ProtocolComponentRequestResponse,
805        ProtocolComponentsRequestBody, ProtocolStateRequestBody, ProtocolStateRequestResponse,
806        ProtocolSystemsRequestBody, ProtocolSystemsRequestResponse, RPCTracerParams,
807        StateRequestBody, StateRequestResponse, TokensRequestBody, TokensRequestResponse,
808        TracedEntryPointRequestBody, TracedEntryPointRequestResponse, TracingParams,
809    };
810    use uuid::Uuid;
811
812    use super::*;
813    use crate::{deltas::MockDeltasClient, rpc::MockRPCClient, DeltasError, RPCError};
814
815    // Required for mock client to implement clone
816    struct ArcRPCClient<T>(Arc<T>);
817
818    // Default derive(Clone) does require T to be Clone as well.
819    impl<T> Clone for ArcRPCClient<T> {
820        fn clone(&self) -> Self {
821            ArcRPCClient(self.0.clone())
822        }
823    }
824
825    #[async_trait]
826    impl<T> RPCClient for ArcRPCClient<T>
827    where
828        T: RPCClient + Sync + Send + 'static,
829    {
830        async fn get_tokens(
831            &self,
832            request: &TokensRequestBody,
833        ) -> Result<TokensRequestResponse, RPCError> {
834            self.0.get_tokens(request).await
835        }
836
837        async fn get_contract_state(
838            &self,
839            request: &StateRequestBody,
840        ) -> Result<StateRequestResponse, RPCError> {
841            self.0.get_contract_state(request).await
842        }
843
844        async fn get_protocol_components(
845            &self,
846            request: &ProtocolComponentsRequestBody,
847        ) -> Result<ProtocolComponentRequestResponse, RPCError> {
848            self.0
849                .get_protocol_components(request)
850                .await
851        }
852
853        async fn get_protocol_states(
854            &self,
855            request: &ProtocolStateRequestBody,
856        ) -> Result<ProtocolStateRequestResponse, RPCError> {
857            self.0
858                .get_protocol_states(request)
859                .await
860        }
861
862        async fn get_protocol_systems(
863            &self,
864            request: &ProtocolSystemsRequestBody,
865        ) -> Result<ProtocolSystemsRequestResponse, RPCError> {
866            self.0
867                .get_protocol_systems(request)
868                .await
869        }
870
871        async fn get_component_tvl(
872            &self,
873            request: &ComponentTvlRequestBody,
874        ) -> Result<ComponentTvlRequestResponse, RPCError> {
875            self.0.get_component_tvl(request).await
876        }
877
878        async fn get_traced_entry_points(
879            &self,
880            request: &TracedEntryPointRequestBody,
881        ) -> Result<TracedEntryPointRequestResponse, RPCError> {
882            self.0
883                .get_traced_entry_points(request)
884                .await
885        }
886
887        async fn get_snapshots<'a>(
888            &self,
889            request: &SnapshotParameters<'a>,
890            chunk_size: Option<usize>,
891            concurrency: usize,
892        ) -> Result<Snapshot, RPCError> {
893            self.0
894                .get_snapshots(request, chunk_size, concurrency)
895                .await
896        }
897
898        fn compression(&self) -> bool {
899            self.0.compression()
900        }
901    }
902
903    // Required for mock client to implement clone
904    struct ArcDeltasClient<T>(Arc<T>);
905
906    // Default derive(Clone) does require T to be Clone as well.
907    impl<T> Clone for ArcDeltasClient<T> {
908        fn clone(&self) -> Self {
909            ArcDeltasClient(self.0.clone())
910        }
911    }
912
913    #[async_trait]
914    impl<T> DeltasClient for ArcDeltasClient<T>
915    where
916        T: DeltasClient + Sync + Send + 'static,
917    {
918        async fn subscribe(
919            &self,
920            extractor_id: ExtractorIdentity,
921            options: SubscriptionOptions,
922        ) -> Result<(Uuid, Receiver<BlockChanges>), DeltasError> {
923            self.0
924                .subscribe(extractor_id, options)
925                .await
926        }
927
928        async fn unsubscribe(&self, subscription_id: Uuid) -> Result<(), DeltasError> {
929            self.0
930                .unsubscribe(subscription_id)
931                .await
932        }
933
934        async fn connect(&self) -> Result<JoinHandle<Result<(), DeltasError>>, DeltasError> {
935            self.0.connect().await
936        }
937
938        async fn close(&self) -> Result<(), DeltasError> {
939            self.0.close().await
940        }
941    }
942
943    fn with_mocked_clients(
944        native: bool,
945        include_tvl: bool,
946        rpc_client: Option<MockRPCClient>,
947        deltas_client: Option<MockDeltasClient>,
948    ) -> ProtocolStateSynchronizer<ArcRPCClient<MockRPCClient>, ArcDeltasClient<MockDeltasClient>>
949    {
950        let rpc_client = ArcRPCClient(Arc::new(rpc_client.unwrap_or_default()));
951        let deltas_client = ArcDeltasClient(Arc::new(deltas_client.unwrap_or_default()));
952
953        ProtocolStateSynchronizer::new(
954            ExtractorIdentity::new(Chain::Ethereum, "uniswap-v2"),
955            native,
956            ComponentFilter::with_tvl_range(50.0, 50.0),
957            1,
958            Duration::from_secs(0),
959            true,
960            include_tvl,
961            true, // Does not matter as we mock the client that never compresses
962            rpc_client,
963            deltas_client,
964            10_u64,
965        )
966    }
967
968    fn state_snapshot_native() -> ProtocolStateRequestResponse {
969        ProtocolStateRequestResponse {
970            states: vec![ResponseProtocolState {
971                component_id: "Component1".to_string(),
972                ..Default::default()
973            }],
974            pagination: PaginationResponse { page: 0, page_size: 20, total: 1 },
975        }
976    }
977
978    fn make_mock_client() -> MockRPCClient {
979        let mut m = MockRPCClient::new();
980        m.expect_compression()
981            .return_const(false);
982        m
983    }
984
985    #[test_log::test(tokio::test)]
986    async fn test_get_snapshots_native() {
987        let header = BlockHeader::default();
988        let mut rpc = make_mock_client();
989        let component = ProtocolComponent { id: "Component1".to_string(), ..Default::default() };
990
991        let component_clone = component.clone();
992        rpc.expect_get_snapshots()
993            .returning(move |_request, _chunk_size, _concurrency| {
994                Ok(Snapshot {
995                    states: state_snapshot_native()
996                        .states
997                        .into_iter()
998                        .map(|state| {
999                            (
1000                                state.component_id.clone(),
1001                                ComponentWithState {
1002                                    state,
1003                                    component: component_clone.clone(),
1004                                    entrypoints: vec![],
1005                                    component_tvl: None,
1006                                },
1007                            )
1008                        })
1009                        .collect(),
1010                    vm_storage: HashMap::new(),
1011                })
1012            });
1013
1014        rpc.expect_get_traced_entry_points()
1015            .returning(|_| {
1016                Ok(TracedEntryPointRequestResponse {
1017                    traced_entry_points: HashMap::new(),
1018                    pagination: PaginationResponse::new(0, 20, 0),
1019                })
1020            });
1021
1022        let mut state_sync = with_mocked_clients(true, false, Some(rpc), None);
1023        state_sync
1024            .component_tracker
1025            .components
1026            .insert("Component1".to_string(), component.clone());
1027        let components_arg = ["Component1".to_string()];
1028        let exp = StateSyncMessage {
1029            header: header.clone(),
1030            snapshots: Snapshot {
1031                states: state_snapshot_native()
1032                    .states
1033                    .into_iter()
1034                    .map(|state| {
1035                        (
1036                            state.component_id.clone(),
1037                            ComponentWithState {
1038                                state,
1039                                component: component.clone(),
1040                                entrypoints: vec![],
1041                                component_tvl: None,
1042                            },
1043                        )
1044                    })
1045                    .collect(),
1046                vm_storage: HashMap::new(),
1047            },
1048            deltas: None,
1049            removed_components: Default::default(),
1050        };
1051
1052        let snap = state_sync
1053            .get_snapshots(header, Some(&components_arg))
1054            .await
1055            .expect("Retrieving snapshot failed");
1056
1057        assert_eq!(snap, exp);
1058    }
1059
1060    #[test_log::test(tokio::test)]
1061    async fn test_get_snapshots_native_with_tvl() {
1062        let header = BlockHeader::default();
1063        let mut rpc = make_mock_client();
1064        let component = ProtocolComponent { id: "Component1".to_string(), ..Default::default() };
1065
1066        let component_clone = component.clone();
1067        rpc.expect_get_snapshots()
1068            .returning(move |_request, _chunk_size, _concurrency| {
1069                Ok(Snapshot {
1070                    states: state_snapshot_native()
1071                        .states
1072                        .into_iter()
1073                        .map(|state| {
1074                            (
1075                                state.component_id.clone(),
1076                                ComponentWithState {
1077                                    state,
1078                                    component: component_clone.clone(),
1079                                    component_tvl: Some(100.0),
1080                                    entrypoints: vec![],
1081                                },
1082                            )
1083                        })
1084                        .collect(),
1085                    vm_storage: HashMap::new(),
1086                })
1087            });
1088
1089        rpc.expect_get_traced_entry_points()
1090            .returning(|_| {
1091                Ok(TracedEntryPointRequestResponse {
1092                    traced_entry_points: HashMap::new(),
1093                    pagination: PaginationResponse::new(0, 20, 0),
1094                })
1095            });
1096
1097        let mut state_sync = with_mocked_clients(true, true, Some(rpc), None);
1098        state_sync
1099            .component_tracker
1100            .components
1101            .insert("Component1".to_string(), component.clone());
1102        let components_arg = ["Component1".to_string()];
1103        let exp = StateSyncMessage {
1104            header: header.clone(),
1105            snapshots: Snapshot {
1106                states: state_snapshot_native()
1107                    .states
1108                    .into_iter()
1109                    .map(|state| {
1110                        (
1111                            state.component_id.clone(),
1112                            ComponentWithState {
1113                                state,
1114                                component: component.clone(),
1115                                component_tvl: Some(100.0),
1116                                entrypoints: vec![],
1117                            },
1118                        )
1119                    })
1120                    .collect(),
1121                vm_storage: HashMap::new(),
1122            },
1123            deltas: None,
1124            removed_components: Default::default(),
1125        };
1126
1127        let snap = state_sync
1128            .get_snapshots(header, Some(&components_arg))
1129            .await
1130            .expect("Retrieving snapshot failed");
1131
1132        assert_eq!(snap, exp);
1133    }
1134
1135    fn state_snapshot_vm() -> StateRequestResponse {
1136        StateRequestResponse {
1137            accounts: vec![
1138                ResponseAccount { address: Bytes::from("0x0badc0ffee"), ..Default::default() },
1139                ResponseAccount { address: Bytes::from("0xbabe42"), ..Default::default() },
1140            ],
1141            pagination: PaginationResponse { page: 0, page_size: 20, total: 1 },
1142        }
1143    }
1144
1145    fn traced_entry_point_response() -> TracedEntryPointRequestResponse {
1146        TracedEntryPointRequestResponse {
1147            traced_entry_points: HashMap::from([(
1148                "Component1".to_string(),
1149                vec![(
1150                    EntryPointWithTracingParams {
1151                        entry_point: EntryPoint {
1152                            external_id: "entrypoint_a".to_string(),
1153                            target: Bytes::from("0x0badc0ffee"),
1154                            signature: "sig()".to_string(),
1155                        },
1156                        params: TracingParams::RPCTracer(RPCTracerParams {
1157                            caller: Some(Bytes::from("0x0badc0ffee")),
1158                            calldata: Bytes::from("0x0badc0ffee"),
1159                            state_overrides: None,
1160                            prune_addresses: None,
1161                        }),
1162                    },
1163                    TracingResult {
1164                        retriggers: HashSet::from([(
1165                            Bytes::from("0x0badc0ffee"),
1166                            AddressStorageLocation::new(Bytes::from("0x0badc0ffee"), 12),
1167                        )]),
1168                        accessed_slots: HashMap::from([(
1169                            Bytes::from("0x0badc0ffee"),
1170                            HashSet::from([Bytes::from("0xbadbeef0")]),
1171                        )]),
1172                    },
1173                )],
1174            )]),
1175            pagination: PaginationResponse::new(0, 20, 0),
1176        }
1177    }
1178
1179    #[test_log::test(tokio::test)]
1180    async fn test_get_snapshots_vm() {
1181        let header = BlockHeader::default();
1182        let mut rpc = make_mock_client();
1183
1184        let traced_ep_response = traced_entry_point_response();
1185        rpc.expect_get_snapshots()
1186            .returning(move |_request, _chunk_size, _concurrency| {
1187                let vm_storage_accounts = state_snapshot_vm();
1188                Ok(Snapshot {
1189                    states: [(
1190                        "Component1".to_string(),
1191                        ComponentWithState {
1192                            state: ResponseProtocolState {
1193                                component_id: "Component1".to_string(),
1194                                ..Default::default()
1195                            },
1196                            component: ProtocolComponent {
1197                                id: "Component1".to_string(),
1198                                contract_ids: vec![
1199                                    Bytes::from("0x0badc0ffee"),
1200                                    Bytes::from("0xbabe42"),
1201                                ],
1202                                ..Default::default()
1203                            },
1204                            component_tvl: None,
1205                            entrypoints: traced_ep_response
1206                                .traced_entry_points
1207                                .get("Component1")
1208                                .cloned()
1209                                .unwrap_or_default(),
1210                        },
1211                    )]
1212                    .into_iter()
1213                    .collect(),
1214                    vm_storage: vm_storage_accounts
1215                        .accounts
1216                        .into_iter()
1217                        .map(|state| (state.address.clone(), state))
1218                        .collect(),
1219                })
1220            });
1221
1222        rpc.expect_get_traced_entry_points()
1223            .returning(|_| Ok(traced_entry_point_response()));
1224
1225        let mut state_sync = with_mocked_clients(false, false, Some(rpc), None);
1226        let component = ProtocolComponent {
1227            id: "Component1".to_string(),
1228            contract_ids: vec![Bytes::from("0x0badc0ffee"), Bytes::from("0xbabe42")],
1229            ..Default::default()
1230        };
1231        state_sync
1232            .component_tracker
1233            .components
1234            .insert("Component1".to_string(), component.clone());
1235        let components_arg = ["Component1".to_string()];
1236        let exp = StateSyncMessage {
1237            header: header.clone(),
1238            snapshots: Snapshot {
1239                states: [(
1240                    component.id.clone(),
1241                    ComponentWithState {
1242                        state: ResponseProtocolState {
1243                            component_id: "Component1".to_string(),
1244                            ..Default::default()
1245                        },
1246                        component: component.clone(),
1247                        component_tvl: None,
1248                        entrypoints: vec![(
1249                            EntryPointWithTracingParams {
1250                                entry_point: EntryPoint {
1251                                    external_id: "entrypoint_a".to_string(),
1252                                    target: Bytes::from("0x0badc0ffee"),
1253                                    signature: "sig()".to_string(),
1254                                },
1255                                params: TracingParams::RPCTracer(RPCTracerParams {
1256                                    caller: Some(Bytes::from("0x0badc0ffee")),
1257                                    calldata: Bytes::from("0x0badc0ffee"),
1258                                    state_overrides: None,
1259                                    prune_addresses: None,
1260                                }),
1261                            },
1262                            TracingResult {
1263                                retriggers: HashSet::from([(
1264                                    Bytes::from("0x0badc0ffee"),
1265                                    AddressStorageLocation::new(Bytes::from("0x0badc0ffee"), 12),
1266                                )]),
1267                                accessed_slots: HashMap::from([(
1268                                    Bytes::from("0x0badc0ffee"),
1269                                    HashSet::from([Bytes::from("0xbadbeef0")]),
1270                                )]),
1271                            },
1272                        )],
1273                    },
1274                )]
1275                .into_iter()
1276                .collect(),
1277                vm_storage: state_snapshot_vm()
1278                    .accounts
1279                    .into_iter()
1280                    .map(|state| (state.address.clone(), state))
1281                    .collect(),
1282            },
1283            deltas: None,
1284            removed_components: Default::default(),
1285        };
1286
1287        let snap = state_sync
1288            .get_snapshots(header, Some(&components_arg))
1289            .await
1290            .expect("Retrieving snapshot failed");
1291
1292        assert_eq!(snap, exp);
1293    }
1294
1295    #[test_log::test(tokio::test)]
1296    async fn test_get_snapshots_vm_with_tvl() {
1297        let header = BlockHeader::default();
1298        let mut rpc = make_mock_client();
1299        let component = ProtocolComponent {
1300            id: "Component1".to_string(),
1301            contract_ids: vec![Bytes::from("0x0badc0ffee"), Bytes::from("0xbabe42")],
1302            ..Default::default()
1303        };
1304
1305        let component_clone = component.clone();
1306        rpc.expect_get_snapshots()
1307            .returning(move |_request, _chunk_size, _concurrency| {
1308                let vm_storage_accounts = state_snapshot_vm();
1309                Ok(Snapshot {
1310                    states: [(
1311                        "Component1".to_string(),
1312                        ComponentWithState {
1313                            state: ResponseProtocolState {
1314                                component_id: "Component1".to_string(),
1315                                ..Default::default()
1316                            },
1317                            component: component_clone.clone(),
1318                            component_tvl: Some(100.0),
1319                            entrypoints: vec![],
1320                        },
1321                    )]
1322                    .into_iter()
1323                    .collect(),
1324                    vm_storage: vm_storage_accounts
1325                        .accounts
1326                        .into_iter()
1327                        .map(|state| (state.address.clone(), state))
1328                        .collect(),
1329                })
1330            });
1331
1332        rpc.expect_get_traced_entry_points()
1333            .returning(|_| {
1334                Ok(TracedEntryPointRequestResponse {
1335                    traced_entry_points: HashMap::new(),
1336                    pagination: PaginationResponse::new(0, 20, 0),
1337                })
1338            });
1339
1340        let mut state_sync = with_mocked_clients(false, true, Some(rpc), None);
1341        state_sync
1342            .component_tracker
1343            .components
1344            .insert("Component1".to_string(), component.clone());
1345        let components_arg = ["Component1".to_string()];
1346        let exp = StateSyncMessage {
1347            header: header.clone(),
1348            snapshots: Snapshot {
1349                states: [(
1350                    component.id.clone(),
1351                    ComponentWithState {
1352                        state: ResponseProtocolState {
1353                            component_id: "Component1".to_string(),
1354                            ..Default::default()
1355                        },
1356                        component: component.clone(),
1357                        component_tvl: Some(100.0),
1358                        entrypoints: vec![],
1359                    },
1360                )]
1361                .into_iter()
1362                .collect(),
1363                vm_storage: state_snapshot_vm()
1364                    .accounts
1365                    .into_iter()
1366                    .map(|state| (state.address.clone(), state))
1367                    .collect(),
1368            },
1369            deltas: None,
1370            removed_components: Default::default(),
1371        };
1372
1373        let snap = state_sync
1374            .get_snapshots(header, Some(&components_arg))
1375            .await
1376            .expect("Retrieving snapshot failed");
1377
1378        assert_eq!(snap, exp);
1379    }
1380
1381    /// Test that get_snapshots only fetches snapshots for requested components,
1382    /// not all tracked components. This prevents returning full snapshots repeatedly
1383    /// when only a subset of components need updates.
1384    #[test_log::test(tokio::test)]
1385    async fn test_get_snapshots_filters_to_requested_components_only() {
1386        let header = BlockHeader::default();
1387        let mut rpc = make_mock_client();
1388
1389        // Create three components
1390        let component1 = ProtocolComponent { id: "Component1".to_string(), ..Default::default() };
1391        let component2 = ProtocolComponent { id: "Component2".to_string(), ..Default::default() };
1392        let component3 = ProtocolComponent { id: "Component3".to_string(), ..Default::default() };
1393
1394        let component2_clone = component2.clone();
1395
1396        // Mock the RPC call and verify it only receives Component2
1397        rpc.expect_get_snapshots()
1398            .withf(
1399                |request: &SnapshotParameters,
1400                 _chunk_size: &Option<usize>,
1401                 _concurrency: &usize| {
1402                    // Verify that the request contains ONLY Component2, not all tracked components
1403                    request.components.len() == 1 &&
1404                        request
1405                            .components
1406                            .contains_key("Component2")
1407                },
1408            )
1409            .times(1)
1410            .returning(move |_request, _chunk_size, _concurrency| {
1411                Ok(Snapshot {
1412                    states: [(
1413                        "Component2".to_string(),
1414                        ComponentWithState {
1415                            state: ResponseProtocolState {
1416                                component_id: "Component2".to_string(),
1417                                ..Default::default()
1418                            },
1419                            component: component2_clone.clone(),
1420                            entrypoints: vec![],
1421                            component_tvl: None,
1422                        },
1423                    )]
1424                    .into_iter()
1425                    .collect(),
1426                    vm_storage: HashMap::new(),
1427                })
1428            });
1429
1430        rpc.expect_get_traced_entry_points()
1431            .returning(|_| {
1432                Ok(TracedEntryPointRequestResponse {
1433                    traced_entry_points: HashMap::new(),
1434                    pagination: PaginationResponse::new(0, 20, 0),
1435                })
1436            });
1437
1438        let mut state_sync = with_mocked_clients(true, false, Some(rpc), None);
1439
1440        // Track all three components
1441        state_sync
1442            .component_tracker
1443            .components
1444            .insert("Component1".to_string(), component1.clone());
1445        state_sync
1446            .component_tracker
1447            .components
1448            .insert("Component2".to_string(), component2.clone());
1449        state_sync
1450            .component_tracker
1451            .components
1452            .insert("Component3".to_string(), component3.clone());
1453
1454        // Request snapshot for ONLY Component2
1455        let components_arg = ["Component2".to_string()];
1456
1457        let snap = state_sync
1458            .get_snapshots(header.clone(), Some(&components_arg))
1459            .await
1460            .expect("Retrieving snapshot failed");
1461
1462        // Verify we only got Component2 back
1463        assert_eq!(snap.snapshots.states.len(), 1);
1464        assert!(snap
1465            .snapshots
1466            .states
1467            .contains_key("Component2"));
1468        assert!(!snap
1469            .snapshots
1470            .states
1471            .contains_key("Component1"));
1472        assert!(!snap
1473            .snapshots
1474            .states
1475            .contains_key("Component3"));
1476    }
1477
1478    fn mock_clients_for_state_sync() -> (MockRPCClient, MockDeltasClient, Sender<BlockChanges>) {
1479        let mut rpc_client = make_mock_client();
1480        // Mocks for the start_tracking call, these need to come first because they are more
1481        // specific, see: https://docs.rs/mockall/latest/mockall/#matching-multiple-calls
1482        rpc_client
1483            .expect_get_protocol_components()
1484            .with(mockall::predicate::function(
1485                move |request_params: &ProtocolComponentsRequestBody| {
1486                    if let Some(ids) = request_params.component_ids.as_ref() {
1487                        ids.contains(&"Component3".to_string())
1488                    } else {
1489                        false
1490                    }
1491                },
1492            ))
1493            .returning(|_| {
1494                // return Component3
1495                Ok(ProtocolComponentRequestResponse {
1496                    protocol_components: vec![
1497                        // this component shall have a tvl update above threshold
1498                        ProtocolComponent { id: "Component3".to_string(), ..Default::default() },
1499                    ],
1500                    pagination: PaginationResponse { page: 0, page_size: 20, total: 1 },
1501                })
1502            });
1503        // Mock get_snapshots for Component3
1504        rpc_client
1505            .expect_get_snapshots()
1506            .withf(
1507                |request: &SnapshotParameters,
1508                 _chunk_size: &Option<usize>,
1509                 _concurrency: &usize| {
1510                    request
1511                        .components
1512                        .contains_key("Component3")
1513                },
1514            )
1515            .returning(|_request, _chunk_size, _concurrency| {
1516                Ok(Snapshot {
1517                    states: [(
1518                        "Component3".to_string(),
1519                        ComponentWithState {
1520                            state: ResponseProtocolState {
1521                                component_id: "Component3".to_string(),
1522                                ..Default::default()
1523                            },
1524                            component: ProtocolComponent {
1525                                id: "Component3".to_string(),
1526                                ..Default::default()
1527                            },
1528                            component_tvl: Some(1000.0),
1529                            entrypoints: vec![],
1530                        },
1531                    )]
1532                    .into_iter()
1533                    .collect(),
1534                    vm_storage: HashMap::new(),
1535                })
1536            });
1537
1538        // mock calls for the initial state snapshots
1539        rpc_client
1540            .expect_get_protocol_components()
1541            .returning(|_| {
1542                // Initial sync of components
1543                Ok(ProtocolComponentRequestResponse {
1544                    protocol_components: vec![
1545                        // this component shall have a tvl update above threshold
1546                        ProtocolComponent { id: "Component1".to_string(), ..Default::default() },
1547                        // this component shall have a tvl update below threshold.
1548                        ProtocolComponent { id: "Component2".to_string(), ..Default::default() },
1549                        // a third component will have a tvl update above threshold
1550                    ],
1551                    pagination: PaginationResponse { page: 0, page_size: 20, total: 1 },
1552                })
1553            });
1554
1555        rpc_client
1556            .expect_get_snapshots()
1557            .returning(|_request, _chunk_size, _concurrency| {
1558                Ok(Snapshot {
1559                    states: [
1560                        (
1561                            "Component1".to_string(),
1562                            ComponentWithState {
1563                                state: ResponseProtocolState {
1564                                    component_id: "Component1".to_string(),
1565                                    ..Default::default()
1566                                },
1567                                component: ProtocolComponent {
1568                                    id: "Component1".to_string(),
1569                                    ..Default::default()
1570                                },
1571                                component_tvl: Some(100.0),
1572                                entrypoints: vec![],
1573                            },
1574                        ),
1575                        (
1576                            "Component2".to_string(),
1577                            ComponentWithState {
1578                                state: ResponseProtocolState {
1579                                    component_id: "Component2".to_string(),
1580                                    ..Default::default()
1581                                },
1582                                component: ProtocolComponent {
1583                                    id: "Component2".to_string(),
1584                                    ..Default::default()
1585                                },
1586                                component_tvl: Some(0.0),
1587                                entrypoints: vec![],
1588                            },
1589                        ),
1590                    ]
1591                    .into_iter()
1592                    .collect(),
1593                    vm_storage: HashMap::new(),
1594                })
1595            });
1596
1597        // Mock get_traced_entry_points for Ethereum chain
1598        rpc_client
1599            .expect_get_traced_entry_points()
1600            .returning(|_| {
1601                Ok(TracedEntryPointRequestResponse {
1602                    traced_entry_points: HashMap::new(),
1603                    pagination: PaginationResponse { page: 0, page_size: 100, total: 0 },
1604                })
1605            });
1606
1607        // Mock deltas client and messages
1608        let mut deltas_client = MockDeltasClient::new();
1609        let (tx, rx) = channel(1);
1610        deltas_client
1611            .expect_subscribe()
1612            .return_once(move |_, _| {
1613                // Return subscriber id and a channel
1614                Ok((Uuid::default(), rx))
1615            });
1616
1617        // Expect unsubscribe call during cleanup
1618        deltas_client
1619            .expect_unsubscribe()
1620            .return_once(|_| Ok(()));
1621
1622        (rpc_client, deltas_client, tx)
1623    }
1624
1625    /// Test strategy
1626    ///
1627    /// - initial snapshot retrieval returns two component1 and component2 as snapshots
1628    /// - send 2 dummy messages, containing only blocks
1629    /// - third message contains a new component with some significant tvl, one initial component
1630    ///   slips below tvl threshold, another one is above tvl but does not get re-requested.
1631    #[test_log::test(tokio::test)]
1632    async fn test_state_sync() {
1633        let (rpc_client, deltas_client, tx) = mock_clients_for_state_sync();
1634        let deltas = [
1635            BlockChanges {
1636                extractor: "uniswap-v2".to_string(),
1637                chain: Chain::Ethereum,
1638                block: Block {
1639                    number: 1,
1640                    hash: Bytes::from("0x01"),
1641                    parent_hash: Bytes::from("0x00"),
1642                    chain: Chain::Ethereum,
1643                    ts: Default::default(),
1644                },
1645                revert: false,
1646                dci_update: DCIUpdate {
1647                    new_entrypoints: HashMap::from([(
1648                        "Component1".to_string(),
1649                        HashSet::from([EntryPoint {
1650                            external_id: "entrypoint_a".to_string(),
1651                            target: Bytes::from("0x0badc0ffee"),
1652                            signature: "sig()".to_string(),
1653                        }]),
1654                    )]),
1655                    new_entrypoint_params: HashMap::from([(
1656                        "entrypoint_a".to_string(),
1657                        HashSet::from([(
1658                            TracingParams::RPCTracer(RPCTracerParams {
1659                                caller: Some(Bytes::from("0x0badc0ffee")),
1660                                calldata: Bytes::from("0x0badc0ffee"),
1661                                state_overrides: None,
1662                                prune_addresses: None,
1663                            }),
1664                            "Component1".to_string(),
1665                        )]),
1666                    )]),
1667                    trace_results: HashMap::from([(
1668                        "entrypoint_a".to_string(),
1669                        TracingResult {
1670                            retriggers: HashSet::from([(
1671                                Bytes::from("0x0badc0ffee"),
1672                                AddressStorageLocation::new(Bytes::from("0x0badc0ffee"), 12),
1673                            )]),
1674                            accessed_slots: HashMap::from([(
1675                                Bytes::from("0x0badc0ffee"),
1676                                HashSet::from([Bytes::from("0xbadbeef0")]),
1677                            )]),
1678                        },
1679                    )]),
1680                },
1681                ..Default::default()
1682            },
1683            BlockChanges {
1684                extractor: "uniswap-v2".to_string(),
1685                chain: Chain::Ethereum,
1686                block: Block {
1687                    number: 2,
1688                    hash: Bytes::from("0x02"),
1689                    parent_hash: Bytes::from("0x01"),
1690                    chain: Chain::Ethereum,
1691                    ts: Default::default(),
1692                },
1693                revert: false,
1694                component_tvl: [
1695                    ("Component1".to_string(), 100.0),
1696                    ("Component2".to_string(), 0.0),
1697                    ("Component3".to_string(), 1000.0),
1698                ]
1699                .into_iter()
1700                .collect(),
1701                ..Default::default()
1702            },
1703        ];
1704        let mut state_sync = with_mocked_clients(true, true, Some(rpc_client), Some(deltas_client));
1705        state_sync
1706            .initialize()
1707            .await
1708            .expect("Init failed");
1709
1710        // Test starts here
1711        let (handle, mut rx) = state_sync.start().await;
1712        let (jh, close_tx) = handle.split();
1713        tx.send(deltas[0].clone())
1714            .await
1715            .expect("deltas channel msg 0 closed!");
1716        let first_msg = timeout(Duration::from_millis(100), rx.recv())
1717            .await
1718            .expect("waiting for first state msg timed out!")
1719            .expect("state sync block sender closed!");
1720        tx.send(deltas[1].clone())
1721            .await
1722            .expect("deltas channel msg 1 closed!");
1723        let second_msg = timeout(Duration::from_millis(100), rx.recv())
1724            .await
1725            .expect("waiting for second state msg timed out!")
1726            .expect("state sync block sender closed!");
1727        let _ = close_tx.send(());
1728        jh.await
1729            .expect("state sync task panicked!");
1730
1731        // assertions
1732        let exp1 = StateSyncMessage {
1733            header: BlockHeader {
1734                number: 1,
1735                hash: Bytes::from("0x01"),
1736                parent_hash: Bytes::from("0x00"),
1737                revert: false,
1738                ..Default::default()
1739            },
1740            snapshots: Snapshot {
1741                states: [
1742                    (
1743                        "Component1".to_string(),
1744                        ComponentWithState {
1745                            state: ResponseProtocolState {
1746                                component_id: "Component1".to_string(),
1747                                ..Default::default()
1748                            },
1749                            component: ProtocolComponent {
1750                                id: "Component1".to_string(),
1751                                ..Default::default()
1752                            },
1753                            component_tvl: Some(100.0),
1754                            entrypoints: vec![],
1755                        },
1756                    ),
1757                    (
1758                        "Component2".to_string(),
1759                        ComponentWithState {
1760                            state: ResponseProtocolState {
1761                                component_id: "Component2".to_string(),
1762                                ..Default::default()
1763                            },
1764                            component: ProtocolComponent {
1765                                id: "Component2".to_string(),
1766                                ..Default::default()
1767                            },
1768                            component_tvl: Some(0.0),
1769                            entrypoints: vec![],
1770                        },
1771                    ),
1772                ]
1773                .into_iter()
1774                .collect(),
1775                vm_storage: HashMap::new(),
1776            },
1777            deltas: Some(deltas[0].clone()),
1778            removed_components: Default::default(),
1779        };
1780
1781        let exp2 = StateSyncMessage {
1782            header: BlockHeader {
1783                number: 2,
1784                hash: Bytes::from("0x02"),
1785                parent_hash: Bytes::from("0x01"),
1786                revert: false,
1787                ..Default::default()
1788            },
1789            snapshots: Snapshot {
1790                states: [
1791                    // This is the new component we queried once it passed the tvl threshold.
1792                    (
1793                        "Component3".to_string(),
1794                        ComponentWithState {
1795                            state: ResponseProtocolState {
1796                                component_id: "Component3".to_string(),
1797                                ..Default::default()
1798                            },
1799                            component: ProtocolComponent {
1800                                id: "Component3".to_string(),
1801                                ..Default::default()
1802                            },
1803                            component_tvl: Some(1000.0),
1804                            entrypoints: vec![],
1805                        },
1806                    ),
1807                ]
1808                .into_iter()
1809                .collect(),
1810                vm_storage: HashMap::new(),
1811            },
1812            // Our deltas are empty and since merge methods are
1813            // tested in tycho-common we don't have much to do here.
1814            deltas: Some(BlockChanges {
1815                extractor: "uniswap-v2".to_string(),
1816                chain: Chain::Ethereum,
1817                block: Block {
1818                    number: 2,
1819                    hash: Bytes::from("0x02"),
1820                    parent_hash: Bytes::from("0x01"),
1821                    chain: Chain::Ethereum,
1822                    ts: Default::default(),
1823                },
1824                revert: false,
1825                component_tvl: [
1826                    // "Component2" should not show here.
1827                    ("Component1".to_string(), 100.0),
1828                    ("Component3".to_string(), 1000.0),
1829                ]
1830                .into_iter()
1831                .collect(),
1832                ..Default::default()
1833            }),
1834            // "Component2" was removed, because its tvl changed to 0.
1835            removed_components: [(
1836                "Component2".to_string(),
1837                ProtocolComponent { id: "Component2".to_string(), ..Default::default() },
1838            )]
1839            .into_iter()
1840            .collect(),
1841        };
1842        assert_eq!(first_msg.unwrap(), exp1);
1843        assert_eq!(second_msg.unwrap(), exp2);
1844    }
1845
1846    #[test_log::test(tokio::test)]
1847    async fn test_state_sync_with_tvl_range() {
1848        // Define the range for testing
1849        let remove_tvl_threshold = 5.0;
1850        let add_tvl_threshold = 7.0;
1851
1852        let mut rpc_client = make_mock_client();
1853        let mut deltas_client = MockDeltasClient::new();
1854
1855        rpc_client
1856            .expect_get_protocol_components()
1857            .with(mockall::predicate::function(
1858                move |request_params: &ProtocolComponentsRequestBody| {
1859                    if let Some(ids) = request_params.component_ids.as_ref() {
1860                        ids.contains(&"Component3".to_string())
1861                    } else {
1862                        false
1863                    }
1864                },
1865            ))
1866            .returning(|_| {
1867                Ok(ProtocolComponentRequestResponse {
1868                    protocol_components: vec![ProtocolComponent {
1869                        id: "Component3".to_string(),
1870                        ..Default::default()
1871                    }],
1872                    pagination: PaginationResponse { page: 0, page_size: 20, total: 1 },
1873                })
1874            });
1875        // Mock get_snapshots for Component3
1876        rpc_client
1877            .expect_get_snapshots()
1878            .withf(
1879                |request: &SnapshotParameters,
1880                 _chunk_size: &Option<usize>,
1881                 _concurrency: &usize| {
1882                    request
1883                        .components
1884                        .contains_key("Component3")
1885                },
1886            )
1887            .returning(|_request, _chunk_size, _concurrency| {
1888                Ok(Snapshot {
1889                    states: [(
1890                        "Component3".to_string(),
1891                        ComponentWithState {
1892                            state: ResponseProtocolState {
1893                                component_id: "Component3".to_string(),
1894                                ..Default::default()
1895                            },
1896                            component: ProtocolComponent {
1897                                id: "Component3".to_string(),
1898                                ..Default::default()
1899                            },
1900                            component_tvl: Some(10.0),
1901                            entrypoints: vec![],
1902                        },
1903                    )]
1904                    .into_iter()
1905                    .collect(),
1906                    vm_storage: HashMap::new(),
1907                })
1908            });
1909
1910        // Mock for the initial snapshot retrieval
1911        rpc_client
1912            .expect_get_protocol_components()
1913            .returning(|_| {
1914                Ok(ProtocolComponentRequestResponse {
1915                    protocol_components: vec![
1916                        ProtocolComponent { id: "Component1".to_string(), ..Default::default() },
1917                        ProtocolComponent { id: "Component2".to_string(), ..Default::default() },
1918                    ],
1919                    pagination: PaginationResponse { page: 0, page_size: 20, total: 1 },
1920                })
1921            });
1922
1923        // Mock get_snapshots for initial snapshot
1924        rpc_client
1925            .expect_get_snapshots()
1926            .returning(|_request, _chunk_size, _concurrency| {
1927                Ok(Snapshot {
1928                    states: [
1929                        (
1930                            "Component1".to_string(),
1931                            ComponentWithState {
1932                                state: ResponseProtocolState {
1933                                    component_id: "Component1".to_string(),
1934                                    ..Default::default()
1935                                },
1936                                component: ProtocolComponent {
1937                                    id: "Component1".to_string(),
1938                                    ..Default::default()
1939                                },
1940                                component_tvl: Some(6.0),
1941                                entrypoints: vec![],
1942                            },
1943                        ),
1944                        (
1945                            "Component2".to_string(),
1946                            ComponentWithState {
1947                                state: ResponseProtocolState {
1948                                    component_id: "Component2".to_string(),
1949                                    ..Default::default()
1950                                },
1951                                component: ProtocolComponent {
1952                                    id: "Component2".to_string(),
1953                                    ..Default::default()
1954                                },
1955                                component_tvl: Some(2.0),
1956                                entrypoints: vec![],
1957                            },
1958                        ),
1959                    ]
1960                    .into_iter()
1961                    .collect(),
1962                    vm_storage: HashMap::new(),
1963                })
1964            });
1965
1966        // Mock get_traced_entry_points for Ethereum chain
1967        rpc_client
1968            .expect_get_traced_entry_points()
1969            .returning(|_| {
1970                Ok(TracedEntryPointRequestResponse {
1971                    traced_entry_points: HashMap::new(),
1972                    pagination: PaginationResponse { page: 0, page_size: 100, total: 0 },
1973                })
1974            });
1975
1976        let (tx, rx) = channel(1);
1977        deltas_client
1978            .expect_subscribe()
1979            .return_once(move |_, _| Ok((Uuid::default(), rx)));
1980
1981        // Expect unsubscribe call during cleanup
1982        deltas_client
1983            .expect_unsubscribe()
1984            .return_once(|_| Ok(()));
1985
1986        let mut state_sync = ProtocolStateSynchronizer::new(
1987            ExtractorIdentity::new(Chain::Ethereum, "uniswap-v2"),
1988            true,
1989            ComponentFilter::with_tvl_range(remove_tvl_threshold, add_tvl_threshold),
1990            1,
1991            Duration::from_secs(0),
1992            true,
1993            true,
1994            true,
1995            ArcRPCClient(Arc::new(rpc_client)),
1996            ArcDeltasClient(Arc::new(deltas_client)),
1997            10_u64,
1998        );
1999        state_sync
2000            .initialize()
2001            .await
2002            .expect("Init failed");
2003
2004        // Simulate the incoming BlockChanges
2005        let deltas = [
2006            BlockChanges {
2007                extractor: "uniswap-v2".to_string(),
2008                chain: Chain::Ethereum,
2009                block: Block {
2010                    number: 1,
2011                    hash: Bytes::from("0x01"),
2012                    parent_hash: Bytes::from("0x00"),
2013                    chain: Chain::Ethereum,
2014                    ts: Default::default(),
2015                },
2016                revert: false,
2017                ..Default::default()
2018            },
2019            BlockChanges {
2020                extractor: "uniswap-v2".to_string(),
2021                chain: Chain::Ethereum,
2022                block: Block {
2023                    number: 2,
2024                    hash: Bytes::from("0x02"),
2025                    parent_hash: Bytes::from("0x01"),
2026                    chain: Chain::Ethereum,
2027                    ts: Default::default(),
2028                },
2029                revert: false,
2030                component_tvl: [
2031                    ("Component1".to_string(), 6.0), // Within range, should not trigger changes
2032                    ("Component2".to_string(), 2.0), // Below lower threshold, should be removed
2033                    ("Component3".to_string(), 10.0), // Above upper threshold, should be added
2034                ]
2035                .into_iter()
2036                .collect(),
2037                ..Default::default()
2038            },
2039        ];
2040
2041        let (handle, mut rx) = state_sync.start().await;
2042        let (jh, close_tx) = handle.split();
2043
2044        // Simulate sending delta messages
2045        tx.send(deltas[0].clone())
2046            .await
2047            .expect("deltas channel msg 0 closed!");
2048
2049        // Expecting to receive the initial state message
2050        let _ = timeout(Duration::from_millis(100), rx.recv())
2051            .await
2052            .expect("waiting for first state msg timed out!")
2053            .expect("state sync block sender closed!");
2054
2055        // Send the third message, which should trigger TVL-based changes
2056        tx.send(deltas[1].clone())
2057            .await
2058            .expect("deltas channel msg 1 closed!");
2059        let second_msg = timeout(Duration::from_millis(100), rx.recv())
2060            .await
2061            .expect("waiting for second state msg timed out!")
2062            .expect("state sync block sender closed!")
2063            .expect("no error");
2064
2065        let _ = close_tx.send(());
2066        jh.await
2067            .expect("state sync task panicked!");
2068
2069        let expected_second_msg = StateSyncMessage {
2070            header: BlockHeader {
2071                number: 2,
2072                hash: Bytes::from("0x02"),
2073                parent_hash: Bytes::from("0x01"),
2074                revert: false,
2075                ..Default::default()
2076            },
2077            snapshots: Snapshot {
2078                states: [(
2079                    "Component3".to_string(),
2080                    ComponentWithState {
2081                        state: ResponseProtocolState {
2082                            component_id: "Component3".to_string(),
2083                            ..Default::default()
2084                        },
2085                        component: ProtocolComponent {
2086                            id: "Component3".to_string(),
2087                            ..Default::default()
2088                        },
2089                        component_tvl: Some(10.0),
2090                        entrypoints: vec![], // TODO: add entrypoints?
2091                    },
2092                )]
2093                .into_iter()
2094                .collect(),
2095                vm_storage: HashMap::new(),
2096            },
2097            deltas: Some(BlockChanges {
2098                extractor: "uniswap-v2".to_string(),
2099                chain: Chain::Ethereum,
2100                block: Block {
2101                    number: 2,
2102                    hash: Bytes::from("0x02"),
2103                    parent_hash: Bytes::from("0x01"),
2104                    chain: Chain::Ethereum,
2105                    ts: Default::default(),
2106                },
2107                revert: false,
2108                component_tvl: [
2109                    ("Component1".to_string(), 6.0), // Within range, should not trigger changes
2110                    ("Component3".to_string(), 10.0), // Above upper threshold, should be added
2111                ]
2112                .into_iter()
2113                .collect(),
2114                ..Default::default()
2115            }),
2116            removed_components: [(
2117                "Component2".to_string(),
2118                ProtocolComponent { id: "Component2".to_string(), ..Default::default() },
2119            )]
2120            .into_iter()
2121            .collect(),
2122        };
2123
2124        assert_eq!(second_msg, expected_second_msg);
2125    }
2126
2127    #[test_log::test(tokio::test)]
2128    async fn test_public_close_api_functionality() {
2129        // Tests the public close() API through the StateSynchronizer trait:
2130        // - close() fails before start() is called
2131        // - close() succeeds while synchronizer is running
2132        // - close() fails after already closed
2133        // This tests the full start/close lifecycle via the public API
2134
2135        let mut rpc_client = make_mock_client();
2136        let mut deltas_client = MockDeltasClient::new();
2137
2138        // Mock the initial components call
2139        rpc_client
2140            .expect_get_protocol_components()
2141            .returning(|_| {
2142                Ok(ProtocolComponentRequestResponse {
2143                    protocol_components: vec![],
2144                    pagination: PaginationResponse { page: 0, page_size: 20, total: 0 },
2145                })
2146            });
2147
2148        // Set up deltas client that will wait for messages (blocking in state_sync)
2149        let (_tx, rx) = channel(1);
2150        deltas_client
2151            .expect_subscribe()
2152            .return_once(move |_, _| Ok((Uuid::default(), rx)));
2153
2154        // Expect unsubscribe call during cleanup
2155        deltas_client
2156            .expect_unsubscribe()
2157            .return_once(|_| Ok(()));
2158
2159        let mut state_sync = ProtocolStateSynchronizer::new(
2160            ExtractorIdentity::new(Chain::Ethereum, "test-protocol"),
2161            true,
2162            ComponentFilter::with_tvl_range(0.0, 0.0),
2163            5, // Enough retries
2164            Duration::from_secs(0),
2165            true,
2166            false,
2167            true,
2168            ArcRPCClient(Arc::new(rpc_client)),
2169            ArcDeltasClient(Arc::new(deltas_client)),
2170            10000_u64, // Long timeout so task doesn't exit on its own
2171        );
2172
2173        state_sync
2174            .initialize()
2175            .await
2176            .expect("Init should succeed");
2177
2178        // Start the synchronizer and test the new split-based close mechanism
2179        let (handle, _rx) = state_sync.start().await;
2180        let (jh, close_tx) = handle.split();
2181
2182        // Give it time to start up and enter state_sync
2183        tokio::time::sleep(Duration::from_millis(100)).await;
2184
2185        // Send close signal should succeed
2186        close_tx
2187            .send(())
2188            .expect("Should be able to send close signal");
2189        // Task should stop cleanly
2190        jh.await.expect("Task should not panic");
2191    }
2192
2193    #[test_log::test(tokio::test)]
2194    async fn test_cleanup_runs_when_state_sync_processing_errors() {
2195        // Tests that cleanup code runs when state_sync() errors during delta processing.
2196        // Specifically tests: RPC errors during snapshot retrieval cause proper cleanup.
2197        // Verifies: shared.last_synced_block reset + subscription unsubscribe on errors
2198
2199        let mut rpc_client = make_mock_client();
2200        let mut deltas_client = MockDeltasClient::new();
2201
2202        // Mock the initial components call
2203        rpc_client
2204            .expect_get_protocol_components()
2205            .returning(|_| {
2206                Ok(ProtocolComponentRequestResponse {
2207                    protocol_components: vec![],
2208                    pagination: PaginationResponse { page: 0, page_size: 20, total: 0 },
2209                })
2210            });
2211
2212        // Mock to fail during snapshot retrieval (this will cause an error during processing)
2213        rpc_client
2214            .expect_get_protocol_states()
2215            .returning(|_| {
2216                Err(RPCError::ParseResponse("Test error during snapshot retrieval".to_string()))
2217            });
2218
2219        // Set up deltas client to send one message that will trigger snapshot retrieval
2220        let (tx, rx) = channel(10);
2221        deltas_client
2222            .expect_subscribe()
2223            .return_once(move |_, _| {
2224                // Send a delta message that will require a snapshot
2225                let delta = BlockChanges {
2226                    extractor: "test".to_string(),
2227                    chain: Chain::Ethereum,
2228                    block: Block {
2229                        hash: Bytes::from("0x0123"),
2230                        number: 1,
2231                        parent_hash: Bytes::from("0x0000"),
2232                        chain: Chain::Ethereum,
2233                        ts: chrono::DateTime::from_timestamp(1234567890, 0)
2234                            .unwrap()
2235                            .naive_utc(),
2236                    },
2237                    revert: false,
2238                    // Add a new component to trigger snapshot request
2239                    new_protocol_components: [(
2240                        "new_component".to_string(),
2241                        ProtocolComponent {
2242                            id: "new_component".to_string(),
2243                            protocol_system: "test_protocol".to_string(),
2244                            protocol_type_name: "test".to_string(),
2245                            chain: Chain::Ethereum,
2246                            tokens: vec![Bytes::from("0x0badc0ffee")],
2247                            contract_ids: vec![Bytes::from("0x0badc0ffee")],
2248                            static_attributes: Default::default(),
2249                            creation_tx: Default::default(),
2250                            created_at: Default::default(),
2251                            change: Default::default(),
2252                        },
2253                    )]
2254                    .into_iter()
2255                    .collect(),
2256                    component_tvl: [("new_component".to_string(), 100.0)]
2257                        .into_iter()
2258                        .collect(),
2259                    ..Default::default()
2260                };
2261
2262                tokio::spawn(async move {
2263                    let _ = tx.send(delta).await;
2264                    // Close the channel after sending one message
2265                });
2266
2267                Ok((Uuid::default(), rx))
2268            });
2269
2270        // Expect unsubscribe call during cleanup
2271        deltas_client
2272            .expect_unsubscribe()
2273            .return_once(|_| Ok(()));
2274
2275        let mut state_sync = ProtocolStateSynchronizer::new(
2276            ExtractorIdentity::new(Chain::Ethereum, "test-protocol"),
2277            true,
2278            ComponentFilter::with_tvl_range(0.0, 1000.0), // Include the component
2279            1,
2280            Duration::from_secs(0),
2281            true,
2282            false,
2283            true,
2284            ArcRPCClient(Arc::new(rpc_client)),
2285            ArcDeltasClient(Arc::new(deltas_client)),
2286            5000_u64,
2287        );
2288
2289        state_sync
2290            .initialize()
2291            .await
2292            .expect("Init should succeed");
2293
2294        // Before calling state_sync, set a value in last_synced_block
2295        state_sync.last_synced_block = Some(BlockHeader {
2296            hash: Bytes::from("0x0badc0ffee"),
2297            number: 42,
2298            parent_hash: Bytes::from("0xbadbeef0"),
2299            revert: false,
2300            timestamp: 123456789,
2301            partial_block_index: None,
2302        });
2303
2304        // Create a channel for state_sync to send messages to
2305        let (mut block_tx, _block_rx) = channel(10);
2306
2307        // Call state_sync directly - this should error during processing
2308        let (_end_tx, end_rx) = oneshot::channel::<()>();
2309        let result = state_sync
2310            .state_sync(&mut block_tx, end_rx)
2311            .await;
2312        // Verify that state_sync returned an error
2313        assert!(result.is_err(), "state_sync should have errored during processing");
2314
2315        // Note: We can't verify internal state cleanup since state_sync consumes self,
2316        // but the cleanup logic is still tested by the fact that the method returns properly.
2317    }
2318
2319    #[test_log::test(tokio::test)]
2320    async fn test_close_signal_while_waiting_for_first_deltas() {
2321        // Tests close signal handling during the initial "waiting for deltas" phase.
2322        // This is the earliest possible close scenario - before any deltas are received.
2323        // Verifies: close signal received while waiting for first message triggers cleanup
2324        let mut rpc_client = make_mock_client();
2325        let mut deltas_client = MockDeltasClient::new();
2326
2327        rpc_client
2328            .expect_get_protocol_components()
2329            .returning(|_| {
2330                Ok(ProtocolComponentRequestResponse {
2331                    protocol_components: vec![],
2332                    pagination: PaginationResponse { page: 0, page_size: 20, total: 0 },
2333                })
2334            });
2335
2336        let (_tx, rx) = channel(1);
2337        deltas_client
2338            .expect_subscribe()
2339            .return_once(move |_, _| Ok((Uuid::default(), rx)));
2340
2341        deltas_client
2342            .expect_unsubscribe()
2343            .return_once(|_| Ok(()));
2344
2345        let mut state_sync = ProtocolStateSynchronizer::new(
2346            ExtractorIdentity::new(Chain::Ethereum, "test-protocol"),
2347            true,
2348            ComponentFilter::with_tvl_range(0.0, 0.0),
2349            1,
2350            Duration::from_secs(0),
2351            true,
2352            true,
2353            false,
2354            ArcRPCClient(Arc::new(rpc_client)),
2355            ArcDeltasClient(Arc::new(deltas_client)),
2356            10000_u64,
2357        );
2358
2359        state_sync
2360            .initialize()
2361            .await
2362            .expect("Init should succeed");
2363
2364        let (mut block_tx, _block_rx) = channel(10);
2365        let (end_tx, end_rx) = oneshot::channel::<()>();
2366
2367        // Start state_sync in a task
2368        let state_sync_handle = tokio::spawn(async move {
2369            state_sync
2370                .state_sync(&mut block_tx, end_rx)
2371                .await
2372        });
2373
2374        // Give it a moment to start
2375        tokio::time::sleep(Duration::from_millis(100)).await;
2376
2377        // Send close signal
2378        let _ = end_tx.send(());
2379
2380        // state_sync should exit cleanly
2381        let result = state_sync_handle
2382            .await
2383            .expect("Task should not panic");
2384        assert!(result.is_ok(), "state_sync should exit cleanly when closed: {result:?}");
2385
2386        println!("SUCCESS: Close signal handled correctly while waiting for first deltas");
2387    }
2388
2389    #[test_log::test(tokio::test)]
2390    async fn test_close_signal_during_main_processing_loop() {
2391        // Tests close signal handling during the main delta processing loop.
2392        // This tests the scenario where first message is processed successfully,
2393        // then close signal is received while waiting for subsequent deltas.
2394        // Verifies: close signal in main loop (after initialization) triggers cleanup
2395
2396        let mut rpc_client = make_mock_client();
2397        let mut deltas_client = MockDeltasClient::new();
2398
2399        // Mock the initial components call
2400        rpc_client
2401            .expect_get_protocol_components()
2402            .returning(|_| {
2403                Ok(ProtocolComponentRequestResponse {
2404                    protocol_components: vec![],
2405                    pagination: PaginationResponse { page: 0, page_size: 20, total: 0 },
2406                })
2407            });
2408
2409        // Mock the snapshot retrieval that happens after first message
2410        rpc_client
2411            .expect_get_protocol_states()
2412            .returning(|_| {
2413                Ok(ProtocolStateRequestResponse {
2414                    states: vec![],
2415                    pagination: PaginationResponse { page: 0, page_size: 20, total: 0 },
2416                })
2417            });
2418
2419        rpc_client
2420            .expect_get_component_tvl()
2421            .returning(|_| {
2422                Ok(ComponentTvlRequestResponse {
2423                    tvl: HashMap::new(),
2424                    pagination: PaginationResponse { page: 0, page_size: 20, total: 0 },
2425                })
2426            });
2427
2428        rpc_client
2429            .expect_get_traced_entry_points()
2430            .returning(|_| {
2431                Ok(TracedEntryPointRequestResponse {
2432                    traced_entry_points: HashMap::new(),
2433                    pagination: PaginationResponse::new(0, 20, 0),
2434                })
2435            });
2436
2437        // Set up deltas client to send one message, then keep channel open
2438        let (tx, rx) = channel(10);
2439        deltas_client
2440            .expect_subscribe()
2441            .return_once(move |_, _| {
2442                // Send first message immediately
2443                let first_delta = BlockChanges {
2444                    extractor: "test".to_string(),
2445                    chain: Chain::Ethereum,
2446                    block: Block {
2447                        hash: Bytes::from("0x0123"),
2448                        number: 1,
2449                        parent_hash: Bytes::from("0x0000"),
2450                        chain: Chain::Ethereum,
2451                        ts: chrono::DateTime::from_timestamp(1234567890, 0)
2452                            .unwrap()
2453                            .naive_utc(),
2454                    },
2455                    revert: false,
2456                    ..Default::default()
2457                };
2458
2459                tokio::spawn(async move {
2460                    let _ = tx.send(first_delta).await;
2461                    // Keep the sender alive but don't send more messages
2462                    // This will make the recv() block waiting for the next message
2463                    tokio::time::sleep(Duration::from_secs(30)).await;
2464                });
2465
2466                Ok((Uuid::default(), rx))
2467            });
2468
2469        deltas_client
2470            .expect_unsubscribe()
2471            .return_once(|_| Ok(()));
2472
2473        let mut state_sync = ProtocolStateSynchronizer::new(
2474            ExtractorIdentity::new(Chain::Ethereum, "test-protocol"),
2475            true,
2476            ComponentFilter::with_tvl_range(0.0, 1000.0),
2477            1,
2478            Duration::from_secs(0),
2479            true,
2480            false,
2481            true,
2482            ArcRPCClient(Arc::new(rpc_client)),
2483            ArcDeltasClient(Arc::new(deltas_client)),
2484            10000_u64,
2485        );
2486
2487        state_sync
2488            .initialize()
2489            .await
2490            .expect("Init should succeed");
2491
2492        let (mut block_tx, mut block_rx) = channel(10);
2493        let (end_tx, end_rx) = oneshot::channel::<()>();
2494
2495        // Start state_sync in a task
2496        let state_sync_handle = tokio::spawn(async move {
2497            state_sync
2498                .state_sync(&mut block_tx, end_rx)
2499                .await
2500        });
2501
2502        // Wait for the first message to be processed (snapshot sent)
2503        let first_snapshot = block_rx
2504            .recv()
2505            .await
2506            .expect("Should receive first snapshot")
2507            .expect("Synchronizer error");
2508        assert!(
2509            !first_snapshot
2510                .snapshots
2511                .states
2512                .is_empty() ||
2513                first_snapshot.deltas.is_some()
2514        );
2515        // Now send close signal - this should be handled in the main processing loop
2516        let _ = end_tx.send(());
2517
2518        // state_sync should exit cleanly after receiving close signal in main loop
2519        let result = state_sync_handle
2520            .await
2521            .expect("Task should not panic");
2522        assert!(
2523            result.is_ok(),
2524            "state_sync should exit cleanly when closed after first message: {result:?}"
2525        );
2526    }
2527
2528    #[test_log::test(tokio::test)]
2529    async fn test_max_retries_exceeded_error_propagation() {
2530        // Test that when max_retries is exceeded, the final error is sent through the channel
2531        // to the receiver and the synchronizer task exits cleanly
2532
2533        let mut rpc_client = make_mock_client();
2534        let mut deltas_client = MockDeltasClient::new();
2535
2536        // Mock the initial components call to succeed
2537        rpc_client
2538            .expect_get_protocol_components()
2539            .returning(|_| {
2540                Ok(ProtocolComponentRequestResponse {
2541                    protocol_components: vec![],
2542                    pagination: PaginationResponse { page: 0, page_size: 20, total: 0 },
2543                })
2544            });
2545
2546        // Set up deltas client to consistently fail after subscription
2547        // This will cause connection errors and trigger retries
2548        deltas_client
2549            .expect_subscribe()
2550            .returning(|_, _| {
2551                // Return a connection error to trigger retries
2552                Err(DeltasError::NotConnected)
2553            });
2554
2555        // Expect multiple unsubscribe calls during retries
2556        deltas_client
2557            .expect_unsubscribe()
2558            .returning(|_| Ok(()))
2559            .times(0..=5);
2560
2561        // Create synchronizer with only 2 retries and short cooldown
2562        let mut state_sync = ProtocolStateSynchronizer::new(
2563            ExtractorIdentity::new(Chain::Ethereum, "test-protocol"),
2564            true,
2565            ComponentFilter::with_tvl_range(0.0, 1000.0),
2566            2,                         // max_retries = 2
2567            Duration::from_millis(10), // short retry cooldown
2568            true,
2569            false,
2570            true,
2571            ArcRPCClient(Arc::new(rpc_client)),
2572            ArcDeltasClient(Arc::new(deltas_client)),
2573            1000_u64,
2574        );
2575
2576        state_sync
2577            .initialize()
2578            .await
2579            .expect("Init should succeed");
2580
2581        // Start the synchronizer - it should fail to subscribe and retry
2582        let (handle, mut rx) = state_sync.start().await;
2583        let (jh, _close_tx) = handle.split();
2584
2585        let res = tokio::time::timeout(Duration::from_millis(100), rx.recv())
2586            .await
2587            .expect("responsds in time")
2588            .expect("channel open");
2589
2590        // Verify the error is a ConnectionClosed error (converted from DeltasError::NotConnected)
2591        if let Err(err) = res {
2592            assert!(
2593                matches!(err, SynchronizerError::ConnectionClosed),
2594                "Expected ConnectionClosed error, got: {:?}",
2595                err
2596            );
2597        } else {
2598            panic!("Expected an error")
2599        }
2600
2601        // The task should complete (not hang) after max retries
2602        let task_result = tokio::time::timeout(Duration::from_secs(2), jh).await;
2603        assert!(task_result.is_ok(), "Synchronizer task should complete after max retries");
2604    }
2605
2606    #[test_log::test(tokio::test)]
2607    async fn test_is_next_expected() {
2608        // Test the is_next_expected function to ensure it correctly identifies
2609        // when an incoming block is the expected next block in the chain
2610
2611        let mut state_sync = with_mocked_clients(true, false, None, None);
2612
2613        // Test 1: No previous block - should return false
2614        let incoming_header = BlockHeader {
2615            number: 100,
2616            hash: Bytes::from("0x0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"),
2617            parent_hash: Bytes::from(
2618                "0x0000000000000000000000000000000000000000000000000000000000000000",
2619            ),
2620            revert: false,
2621            timestamp: 123456789,
2622            partial_block_index: None,
2623        };
2624        assert!(
2625            !state_sync.is_next_expected(&incoming_header),
2626            "Should return false when no previous block is set"
2627        );
2628
2629        // Test 2: Set a previous block and test with matching parent hash
2630        let previous_header = BlockHeader {
2631            number: 99,
2632            hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000000"),
2633            parent_hash: Bytes::from(
2634                "0xabcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789",
2635            ),
2636            revert: false,
2637            timestamp: 123456788,
2638            partial_block_index: None,
2639        };
2640        state_sync.last_synced_block = Some(previous_header.clone());
2641
2642        assert!(
2643            state_sync.is_next_expected(&incoming_header),
2644            "Should return true when incoming parent_hash matches previous hash"
2645        );
2646
2647        // Test 3: Test with non-matching parent hash
2648        let non_matching_header = BlockHeader {
2649            number: 100,
2650            hash: Bytes::from("0x0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"),
2651            parent_hash: Bytes::from(
2652                "0x1111111111111111111111111111111111111111111111111111111111111111",
2653            ), // Wrong parent hash
2654            revert: false,
2655            timestamp: 123456789,
2656            partial_block_index: None,
2657        };
2658        assert!(
2659            !state_sync.is_next_expected(&non_matching_header),
2660            "Should return false when incoming parent_hash doesn't match previous hash"
2661        );
2662    }
2663
2664    #[test_log::test(tokio::test)]
2665    async fn test_synchronizer_restart_skip_snapshot_on_expected_block() {
2666        // Test that on synchronizer restart with the next expected block,
2667        // get_snapshot is not called and only deltas are sent
2668
2669        let mut rpc_client = make_mock_client();
2670        let mut deltas_client = MockDeltasClient::new();
2671
2672        // Mock the initial components call
2673        rpc_client
2674            .expect_get_protocol_components()
2675            .returning(|_| {
2676                Ok(ProtocolComponentRequestResponse {
2677                    protocol_components: vec![ProtocolComponent {
2678                        id: "Component1".to_string(),
2679                        ..Default::default()
2680                    }],
2681                    pagination: PaginationResponse { page: 0, page_size: 20, total: 1 },
2682                })
2683            });
2684
2685        // Set up deltas client to send a message that is the next expected block
2686        let (tx, rx) = channel(10);
2687        deltas_client
2688            .expect_subscribe()
2689            .return_once(move |_, _| {
2690                let expected_next_delta = BlockChanges {
2691                    extractor: "uniswap-v2".to_string(),
2692                    chain: Chain::Ethereum,
2693                    block: Block {
2694                        hash: Bytes::from(
2695                            "0x0000000000000000000000000000000000000000000000000000000000000002",
2696                        ), // This will be the next expected block
2697                        number: 2,
2698                        parent_hash: Bytes::from(
2699                            "0x0000000000000000000000000000000000000000000000000000000000000001",
2700                        ), // This matches our last synced block hash
2701                        chain: Chain::Ethereum,
2702                        ts: chrono::DateTime::from_timestamp(1234567890, 0)
2703                            .unwrap()
2704                            .naive_utc(),
2705                    },
2706                    revert: false,
2707                    ..Default::default()
2708                };
2709
2710                tokio::spawn(async move {
2711                    let _ = tx.send(expected_next_delta).await;
2712                });
2713
2714                Ok((Uuid::default(), rx))
2715            });
2716
2717        deltas_client
2718            .expect_unsubscribe()
2719            .return_once(|_| Ok(()));
2720
2721        let mut state_sync = ProtocolStateSynchronizer::new(
2722            ExtractorIdentity::new(Chain::Ethereum, "uniswap-v2"),
2723            true,
2724            ComponentFilter::with_tvl_range(0.0, 1000.0),
2725            1,
2726            Duration::from_secs(0),
2727            true, // include_snapshots = true
2728            false,
2729            true,
2730            ArcRPCClient(Arc::new(rpc_client)),
2731            ArcDeltasClient(Arc::new(deltas_client)),
2732            10000_u64,
2733        );
2734
2735        // Initialize and set up the last synced block to simulate a restart scenario
2736        state_sync
2737            .initialize()
2738            .await
2739            .expect("Init should succeed");
2740
2741        // Set last_synced_block to simulate that we've previously synced block 1
2742        state_sync.last_synced_block = Some(BlockHeader {
2743            number: 1,
2744            hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000001"), /* This matches the parent_hash in our delta */
2745            parent_hash: Bytes::from(
2746                "0x0000000000000000000000000000000000000000000000000000000000000000",
2747            ),
2748            revert: false,
2749            timestamp: 123456789,
2750            partial_block_index: None,
2751        });
2752
2753        let (mut block_tx, mut block_rx) = channel(10);
2754        let (end_tx, end_rx) = oneshot::channel::<()>();
2755
2756        // Start state_sync
2757        let state_sync_handle = tokio::spawn(async move {
2758            state_sync
2759                .state_sync(&mut block_tx, end_rx)
2760                .await
2761        });
2762
2763        // Wait for the message - it should be a delta-only message (no snapshots)
2764        let result_msg = timeout(Duration::from_millis(200), block_rx.recv())
2765            .await
2766            .expect("Should receive message within timeout")
2767            .expect("Channel should be open")
2768            .expect("Should not be an error");
2769
2770        // Send close signal
2771        let _ = end_tx.send(());
2772
2773        // Wait for state_sync to finish
2774        let _ = state_sync_handle
2775            .await
2776            .expect("Task should not panic");
2777
2778        // Verify the message contains deltas but no snapshots
2779        // (because we skipped snapshot retrieval)
2780        assert!(result_msg.deltas.is_some(), "Should contain deltas");
2781        assert!(
2782            result_msg.snapshots.states.is_empty(),
2783            "Should not contain snapshots when next expected block is received"
2784        );
2785
2786        // Verify the block details match our expected next block
2787        if let Some(deltas) = &result_msg.deltas {
2788            assert_eq!(deltas.block.number, 2);
2789            assert_eq!(
2790                deltas.block.hash,
2791                Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000002")
2792            );
2793            assert_eq!(
2794                deltas.block.parent_hash,
2795                Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000001")
2796            );
2797        }
2798    }
2799
2800    #[test_log::test(tokio::test)]
2801    async fn test_skip_previously_processed_messages() {
2802        // Test that the synchronizer skips messages for blocks that have already been processed
2803        // This simulates a service restart scenario where old messages are re-emitted
2804
2805        let mut rpc_client = make_mock_client();
2806        let mut deltas_client = MockDeltasClient::new();
2807
2808        // Mock the initial components call
2809        rpc_client
2810            .expect_get_protocol_components()
2811            .returning(|_| {
2812                Ok(ProtocolComponentRequestResponse {
2813                    protocol_components: vec![ProtocolComponent {
2814                        id: "Component1".to_string(),
2815                        ..Default::default()
2816                    }],
2817                    pagination: PaginationResponse { page: 0, page_size: 20, total: 1 },
2818                })
2819            });
2820
2821        // Mock snapshot calls for when we process the expected next block (block 6)
2822        rpc_client
2823            .expect_get_protocol_states()
2824            .returning(|_| {
2825                Ok(ProtocolStateRequestResponse {
2826                    states: vec![ResponseProtocolState {
2827                        component_id: "Component1".to_string(),
2828                        ..Default::default()
2829                    }],
2830                    pagination: PaginationResponse { page: 0, page_size: 20, total: 1 },
2831                })
2832            });
2833
2834        rpc_client
2835            .expect_get_component_tvl()
2836            .returning(|_| {
2837                Ok(ComponentTvlRequestResponse {
2838                    tvl: [("Component1".to_string(), 100.0)]
2839                        .into_iter()
2840                        .collect(),
2841                    pagination: PaginationResponse { page: 0, page_size: 20, total: 1 },
2842                })
2843            });
2844
2845        rpc_client
2846            .expect_get_traced_entry_points()
2847            .returning(|_| {
2848                Ok(TracedEntryPointRequestResponse {
2849                    traced_entry_points: HashMap::new(),
2850                    pagination: PaginationResponse::new(0, 20, 0),
2851                })
2852            });
2853
2854        // Set up deltas client to send old messages first, then the expected next block
2855        let (tx, rx) = channel(10);
2856        deltas_client
2857            .expect_subscribe()
2858            .return_once(move |_, _| {
2859                // Send messages for blocks 3, 4, 5 (already processed), then block 6 (expected)
2860                let old_messages = vec![
2861                    BlockChanges {
2862                        extractor: "uniswap-v2".to_string(),
2863                        chain: Chain::Ethereum,
2864                        block: Block {
2865                            hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000003"),
2866                            number: 3,
2867                            parent_hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000002"),
2868                            chain: Chain::Ethereum,
2869                            ts: chrono::DateTime::from_timestamp(1234567890, 0).unwrap().naive_utc(),
2870                        },
2871                        revert: false,
2872                        ..Default::default()
2873                    },
2874                    BlockChanges {
2875                        extractor: "uniswap-v2".to_string(),
2876                        chain: Chain::Ethereum,
2877                        block: Block {
2878                            hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000004"),
2879                            number: 4,
2880                            parent_hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000003"),
2881                            chain: Chain::Ethereum,
2882                            ts: chrono::DateTime::from_timestamp(1234567891, 0).unwrap().naive_utc(),
2883                        },
2884                        revert: false,
2885                        ..Default::default()
2886                    },
2887                    BlockChanges {
2888                        extractor: "uniswap-v2".to_string(),
2889                        chain: Chain::Ethereum,
2890                        block: Block {
2891                            hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000005"),
2892                            number: 5,
2893                            parent_hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000004"),
2894                            chain: Chain::Ethereum,
2895                            ts: chrono::DateTime::from_timestamp(1234567892, 0).unwrap().naive_utc(),
2896                        },
2897                        revert: false,
2898                        ..Default::default()
2899                    },
2900                    // This is the expected next block (block 6)
2901                    BlockChanges {
2902                        extractor: "uniswap-v2".to_string(),
2903                        chain: Chain::Ethereum,
2904                        block: Block {
2905                            hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000006"),
2906                            number: 6,
2907                            parent_hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000005"),
2908                            chain: Chain::Ethereum,
2909                            ts: chrono::DateTime::from_timestamp(1234567893, 0).unwrap().naive_utc(),
2910                        },
2911                        revert: false,
2912                        ..Default::default()
2913                    },
2914                ];
2915
2916                tokio::spawn(async move {
2917                    for message in old_messages {
2918                        let _ = tx.send(message).await;
2919                        tokio::time::sleep(Duration::from_millis(10)).await;
2920                    }
2921                });
2922
2923                Ok((Uuid::default(), rx))
2924            });
2925
2926        deltas_client
2927            .expect_unsubscribe()
2928            .return_once(|_| Ok(()));
2929
2930        let mut state_sync = ProtocolStateSynchronizer::new(
2931            ExtractorIdentity::new(Chain::Ethereum, "uniswap-v2"),
2932            true,
2933            ComponentFilter::with_tvl_range(0.0, 1000.0),
2934            1,
2935            Duration::from_secs(0),
2936            true,
2937            true,
2938            true,
2939            ArcRPCClient(Arc::new(rpc_client)),
2940            ArcDeltasClient(Arc::new(deltas_client)),
2941            10000_u64,
2942        );
2943
2944        // Initialize and set last_synced_block to simulate we've already processed block 5
2945        state_sync
2946            .initialize()
2947            .await
2948            .expect("Init should succeed");
2949
2950        state_sync.last_synced_block = Some(BlockHeader {
2951            number: 5,
2952            hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000005"),
2953            parent_hash: Bytes::from(
2954                "0x0000000000000000000000000000000000000000000000000000000000000004",
2955            ),
2956            revert: false,
2957            timestamp: 1234567892,
2958            partial_block_index: None,
2959        });
2960
2961        let (mut block_tx, mut block_rx) = channel(10);
2962        let (end_tx, end_rx) = oneshot::channel::<()>();
2963
2964        // Start state_sync
2965        let state_sync_handle = tokio::spawn(async move {
2966            state_sync
2967                .state_sync(&mut block_tx, end_rx)
2968                .await
2969        });
2970
2971        // Wait for the message - it should only be for block 6 (skipping blocks 3, 4, 5)
2972        let result_msg = timeout(Duration::from_millis(500), block_rx.recv())
2973            .await
2974            .expect("Should receive message within timeout")
2975            .expect("Channel should be open")
2976            .expect("Should not be an error");
2977
2978        // Send close signal
2979        let _ = end_tx.send(());
2980
2981        // Wait for state_sync to finish
2982        let _ = state_sync_handle
2983            .await
2984            .expect("Task should not panic");
2985
2986        // Verify we only got the message for block 6 (the expected next block)
2987        assert!(result_msg.deltas.is_some(), "Should contain deltas");
2988        if let Some(deltas) = &result_msg.deltas {
2989            assert_eq!(
2990                deltas.block.number, 6,
2991                "Should only process block 6, skipping earlier blocks"
2992            );
2993            assert_eq!(
2994                deltas.block.hash,
2995                Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000006")
2996            );
2997        }
2998
2999        // Verify that no additional messages are received immediately
3000        // (since the old blocks 3, 4, 5 were skipped and only block 6 was processed)
3001        match timeout(Duration::from_millis(50), block_rx.recv()).await {
3002            Err(_) => {
3003                // Timeout is expected - no more messages should come
3004            }
3005            Ok(Some(Err(_))) => {
3006                // Error received is also acceptable (connection closed)
3007            }
3008            Ok(Some(Ok(_))) => {
3009                panic!("Should not receive additional messages - old blocks should be skipped");
3010            }
3011            Ok(None) => {
3012                // Channel closed is also acceptable
3013            }
3014        }
3015    }
3016
3017    fn make_block_changes(block_num: u64, partial_idx: Option<u32>) -> BlockChanges {
3018        // Use vec to create Bytes from block number
3019        let hash = Bytes::from(vec![block_num as u8; 32]);
3020        let parent_hash = Bytes::from(vec![block_num.saturating_sub(1) as u8; 32]);
3021        BlockChanges {
3022            extractor: "uniswap-v2".to_string(),
3023            chain: Chain::Ethereum,
3024            block: Block {
3025                number: block_num,
3026                hash,
3027                parent_hash,
3028                chain: Chain::Ethereum,
3029                ts: Default::default(),
3030            },
3031            revert: false,
3032            partial_block_index: partial_idx,
3033            ..Default::default()
3034        }
3035    }
3036
3037    /// Test that full block as first message in partial mode is accepted
3038    #[test_log::test(tokio::test)]
3039    async fn test_partial_mode_accepts_full_block_as_first_message() {
3040        let (rpc_client, deltas_client, tx) = mock_clients_for_state_sync();
3041        let mut state_sync = with_mocked_clients(true, true, Some(rpc_client), Some(deltas_client))
3042            .with_partial_blocks(true);
3043        state_sync
3044            .initialize()
3045            .await
3046            .expect("Init failed");
3047
3048        let (handle, mut block_rx) = state_sync.start().await;
3049        let (jh, close_tx) = handle.split();
3050
3051        // Send full block as first message - should be accepted
3052        tx.send(make_block_changes(1, None))
3053            .await
3054            .unwrap();
3055
3056        // Should receive the full block immediately
3057        let msg = timeout(Duration::from_millis(100), block_rx.recv())
3058            .await
3059            .expect("Should receive message")
3060            .expect("Channel open")
3061            .expect("No error");
3062
3063        assert_eq!(msg.header.number, 1, "Should use block 1 (full block)");
3064        assert_eq!(msg.header.partial_block_index, None, "Should be a full block");
3065
3066        let _ = close_tx.send(());
3067        jh.await.expect("Task should not panic");
3068    }
3069
3070    /// Test that block number increase is detected as new block
3071    #[test_log::test(tokio::test)]
3072    async fn test_partial_mode_detects_block_number_increase() {
3073        let (rpc_client, deltas_client, tx) = mock_clients_for_state_sync();
3074        let mut state_sync = with_mocked_clients(true, true, Some(rpc_client), Some(deltas_client))
3075            .with_partial_blocks(true);
3076        state_sync
3077            .initialize()
3078            .await
3079            .expect("Init failed");
3080
3081        let (handle, mut block_rx) = state_sync.start().await;
3082        let (jh, close_tx) = handle.split();
3083
3084        // Send partial messages for block 1 (will be skipped - waiting for new block)
3085        tx.send(make_block_changes(1, Some(0)))
3086            .await
3087            .unwrap();
3088        tx.send(make_block_changes(1, Some(3)))
3089            .await
3090            .unwrap();
3091
3092        // Verify no message received yet
3093        match timeout(Duration::from_millis(50), block_rx.recv()).await {
3094            Err(_) => { /* Expected: timeout, no message yet */ }
3095            Ok(_) => panic!("Should not receive message while waiting for new block"),
3096        }
3097
3098        // Send partial for block 2 with HIGHER index (5 > 3) - should still be detected
3099        // because block number increased
3100        tx.send(make_block_changes(2, Some(5)))
3101            .await
3102            .unwrap();
3103
3104        // Should receive the message for block 2
3105        let msg = timeout(Duration::from_millis(100), block_rx.recv())
3106            .await
3107            .expect("Should receive message")
3108            .expect("Channel open")
3109            .expect("No error");
3110
3111        assert_eq!(msg.header.number, 2, "Should use block 2 (block number increased)");
3112        assert_eq!(msg.header.partial_block_index, Some(5));
3113
3114        let _ = close_tx.send(());
3115        jh.await.expect("Task should not panic");
3116    }
3117
3118    /// Test that partial mode skips new blocks that are already synced
3119    #[test_log::test(tokio::test)]
3120    async fn test_partial_mode_skips_already_synced_blocks() {
3121        let (rpc_client, deltas_client, tx) = mock_clients_for_state_sync();
3122        let mut state_sync = with_mocked_clients(true, true, Some(rpc_client), Some(deltas_client))
3123            .with_partial_blocks(true);
3124        state_sync
3125            .initialize()
3126            .await
3127            .expect("Init failed");
3128
3129        // Set last_synced_block to block 5 - we've already synced up to here
3130        state_sync.last_synced_block = Some(BlockHeader {
3131            number: 5,
3132            hash: Bytes::from("0x05"),
3133            parent_hash: Bytes::from("0x04"),
3134            revert: false,
3135            timestamp: 0,
3136            partial_block_index: None,
3137        });
3138
3139        let (handle, mut block_rx) = state_sync.start().await;
3140        let (jh, close_tx) = handle.split();
3141
3142        // Send partial for block 3 to establish baseline
3143        tx.send(make_block_changes(3, Some(2)))
3144            .await
3145            .unwrap();
3146
3147        // Send "new block" for block 4 (partial index decreased) - but block 4 < last_synced (5)
3148        tx.send(make_block_changes(4, Some(0)))
3149            .await
3150            .unwrap();
3151
3152        // Should be skipped because block 4 is already synced
3153        match timeout(Duration::from_millis(50), block_rx.recv()).await {
3154            Err(_) => { /* Expected: skipped because already synced */ }
3155            Ok(_) => panic!("Should skip block 4 because it's already synced"),
3156        }
3157
3158        // Now send new block for block 6 (after last_synced)
3159        // First establish new partial index
3160        tx.send(make_block_changes(5, Some(3)))
3161            .await
3162            .unwrap();
3163        // Then trigger new block detection
3164        tx.send(make_block_changes(6, Some(0)))
3165            .await
3166            .unwrap();
3167
3168        let msg = timeout(Duration::from_millis(100), block_rx.recv())
3169            .await
3170            .expect("Should receive message")
3171            .expect("Channel open")
3172            .expect("No error");
3173
3174        assert_eq!(msg.header.number, 6, "Should use block 6 (after last synced)");
3175
3176        let _ = close_tx.send(());
3177        jh.await.expect("Task should not panic");
3178    }
3179
3180    #[test_log::test(tokio::test)]
3181    async fn test_get_snapshots_skips_entrypoints_when_not_dci() {
3182        let header = BlockHeader::default();
3183        let mut rpc = make_mock_client();
3184        let component = ProtocolComponent { id: "Component1".to_string(), ..Default::default() };
3185
3186        let component_clone = component.clone();
3187        rpc.expect_get_snapshots()
3188            .returning(move |_request, _chunk_size, _concurrency| {
3189                Ok(Snapshot {
3190                    states: [(
3191                        "Component1".to_string(),
3192                        ComponentWithState {
3193                            state: ResponseProtocolState {
3194                                component_id: "Component1".to_string(),
3195                                ..Default::default()
3196                            },
3197                            component: component_clone.clone(),
3198                            entrypoints: vec![],
3199                            component_tvl: None,
3200                        },
3201                    )]
3202                    .into_iter()
3203                    .collect(),
3204                    vm_storage: HashMap::new(),
3205                })
3206            });
3207
3208        // get_traced_entry_points should NOT be called for a non-DCI protocol
3209        rpc.expect_get_traced_entry_points()
3210            .never();
3211
3212        let mut state_sync = with_mocked_clients(true, false, Some(rpc), None);
3213        // uses_dci defaults to false, no .with_dci() call needed
3214        state_sync
3215            .component_tracker
3216            .components
3217            .insert("Component1".to_string(), component);
3218
3219        let components_arg = ["Component1".to_string()];
3220        let snap = state_sync
3221            .get_snapshots(header, Some(&components_arg))
3222            .await
3223            .expect("Retrieving snapshot failed");
3224
3225        assert!(snap
3226            .snapshots
3227            .states
3228            .contains_key("Component1"));
3229    }
3230
3231    #[test_log::test(tokio::test)]
3232    async fn test_get_snapshots_fetches_entrypoints_when_dci() {
3233        let header = BlockHeader::default();
3234        let mut rpc = make_mock_client();
3235        let component = ProtocolComponent { id: "Component1".to_string(), ..Default::default() };
3236
3237        let component_clone = component.clone();
3238        rpc.expect_get_snapshots()
3239            .returning(move |_request, _chunk_size, _concurrency| {
3240                Ok(Snapshot {
3241                    states: [(
3242                        "Component1".to_string(),
3243                        ComponentWithState {
3244                            state: ResponseProtocolState {
3245                                component_id: "Component1".to_string(),
3246                                ..Default::default()
3247                            },
3248                            component: component_clone.clone(),
3249                            entrypoints: vec![],
3250                            component_tvl: None,
3251                        },
3252                    )]
3253                    .into_iter()
3254                    .collect(),
3255                    vm_storage: HashMap::new(),
3256                })
3257            });
3258
3259        // get_traced_entry_points SHOULD be called for a DCI protocol
3260        rpc.expect_get_traced_entry_points()
3261            .times(1)
3262            .returning(|_| {
3263                Ok(TracedEntryPointRequestResponse {
3264                    traced_entry_points: HashMap::new(),
3265                    pagination: PaginationResponse::new(0, 20, 0),
3266                })
3267            });
3268
3269        let mut state_sync = with_mocked_clients(true, false, Some(rpc), None).with_dci(true);
3270        state_sync
3271            .component_tracker
3272            .components
3273            .insert("Component1".to_string(), component);
3274
3275        let components_arg = ["Component1".to_string()];
3276        let snap = state_sync
3277            .get_snapshots(header, Some(&components_arg))
3278            .await
3279            .expect("Retrieving snapshot failed");
3280
3281        assert!(snap
3282            .snapshots
3283            .states
3284            .contains_key("Component1"));
3285    }
3286
3287    /// Test that in partial_blocks mode:
3288    /// - Preexisting components (in to_add but not in new_protocol_components) get their snapshot
3289    ///   requested immediately at the previous block and included in the same message.
3290    /// - Brand-new components (in new_protocol_components) have their snapshot deferred to the
3291    ///   first message of the next block, then requested at the completed previous block.
3292    #[test_log::test(tokio::test)]
3293    async fn test_partial_mode_defers_brand_new_component_snapshot_to_next_block() {
3294        use std::time::Duration;
3295
3296        use tokio::{sync::mpsc::channel, time::timeout};
3297
3298        let mut rpc_client = make_mock_client();
3299        // get_protocol_components for BrandNew + Preexisting when start_tracking is called (block
3300        // 2)
3301        rpc_client
3302            .expect_get_protocol_components()
3303            .with(mockall::predicate::function(|req: &ProtocolComponentsRequestBody| {
3304                req.component_ids
3305                    .as_ref()
3306                    .is_some_and(|ids| ids.contains(&"BrandNew".to_string()))
3307            }))
3308            .returning(|_| {
3309                Ok(ProtocolComponentRequestResponse {
3310                    protocol_components: vec![
3311                        ProtocolComponent { id: "BrandNew".to_string(), ..Default::default() },
3312                        ProtocolComponent { id: "Preexisting".to_string(), ..Default::default() },
3313                    ],
3314                    pagination: PaginationResponse { page: 0, page_size: 20, total: 2 },
3315                })
3316            });
3317        // get_protocol_components for initial sync (via get_protocol_components_paginated)
3318        rpc_client
3319            .expect_get_protocol_components()
3320            .returning(|_| {
3321                Ok(ProtocolComponentRequestResponse {
3322                    protocol_components: vec![
3323                        ProtocolComponent { id: "Component1".to_string(), ..Default::default() },
3324                        ProtocolComponent { id: "Component2".to_string(), ..Default::default() },
3325                    ],
3326                    pagination: PaginationResponse { page: 0, page_size: 20, total: 2 },
3327                })
3328            });
3329        // get_snapshots: more specific expectations first so init (block 0, all components)
3330        // does not match the block 1/2 ones.
3331        // get_snapshots for deferred flush: block 2, BrandNew (when processing block 3)
3332        rpc_client
3333            .expect_get_snapshots()
3334            .withf(
3335                |request: &SnapshotParameters,
3336                 _chunk_size: &Option<usize>,
3337                 _concurrency: &usize| {
3338                    request.block_number == 2 &&
3339                        request
3340                            .components
3341                            .contains_key("BrandNew")
3342                },
3343            )
3344            .returning(|_request, _chunk_size, _concurrency| {
3345                Ok(Snapshot {
3346                    states: [(
3347                        "BrandNew".to_string(),
3348                        ComponentWithState {
3349                            state: ResponseProtocolState {
3350                                component_id: "BrandNew".to_string(),
3351                                ..Default::default()
3352                            },
3353                            component: ProtocolComponent {
3354                                id: "BrandNew".to_string(),
3355                                ..Default::default()
3356                            },
3357                            component_tvl: Some(100.0),
3358                            entrypoints: vec![],
3359                        },
3360                    )]
3361                    .into_iter()
3362                    .collect(),
3363                    vm_storage: HashMap::new(),
3364                })
3365            });
3366        // get_snapshots for preexisting: block 1, Preexisting (when processing block 2)
3367        rpc_client
3368            .expect_get_snapshots()
3369            .withf(
3370                |request: &SnapshotParameters,
3371                 _chunk_size: &Option<usize>,
3372                 _concurrency: &usize| {
3373                    request.block_number == 1 &&
3374                        request
3375                            .components
3376                            .contains_key("Preexisting")
3377                },
3378            )
3379            .returning(|_request, _chunk_size, _concurrency| {
3380                Ok(Snapshot {
3381                    states: [(
3382                        "Preexisting".to_string(),
3383                        ComponentWithState {
3384                            state: ResponseProtocolState {
3385                                component_id: "Preexisting".to_string(),
3386                                ..Default::default()
3387                            },
3388                            component: ProtocolComponent {
3389                                id: "Preexisting".to_string(),
3390                                ..Default::default()
3391                            },
3392                            component_tvl: Some(75.0),
3393                            entrypoints: vec![],
3394                        },
3395                    )]
3396                    .into_iter()
3397                    .collect(),
3398                    vm_storage: HashMap::new(),
3399                })
3400            });
3401        // get_snapshots for initial sync (block 0, all components)
3402        rpc_client
3403            .expect_get_snapshots()
3404            .returning(|_request, _chunk_size, _concurrency| {
3405                Ok(Snapshot {
3406                    states: [
3407                        (
3408                            "Component1".to_string(),
3409                            ComponentWithState {
3410                                state: ResponseProtocolState {
3411                                    component_id: "Component1".to_string(),
3412                                    ..Default::default()
3413                                },
3414                                component: ProtocolComponent {
3415                                    id: "Component1".to_string(),
3416                                    ..Default::default()
3417                                },
3418                                component_tvl: Some(100.0),
3419                                entrypoints: vec![],
3420                            },
3421                        ),
3422                        (
3423                            "Component2".to_string(),
3424                            ComponentWithState {
3425                                state: ResponseProtocolState {
3426                                    component_id: "Component2".to_string(),
3427                                    ..Default::default()
3428                                },
3429                                component: ProtocolComponent {
3430                                    id: "Component2".to_string(),
3431                                    ..Default::default()
3432                                },
3433                                component_tvl: Some(0.0),
3434                                entrypoints: vec![],
3435                            },
3436                        ),
3437                    ]
3438                    .into_iter()
3439                    .collect(),
3440                    vm_storage: HashMap::new(),
3441                })
3442            });
3443        rpc_client
3444            .expect_get_traced_entry_points()
3445            .returning(|_| {
3446                Ok(TracedEntryPointRequestResponse {
3447                    traced_entry_points: HashMap::new(),
3448                    pagination: PaginationResponse::new(0, 100, 0),
3449                })
3450            });
3451
3452        let mut deltas_client = MockDeltasClient::new();
3453        let (tx, rx) = channel(1);
3454        deltas_client
3455            .expect_subscribe()
3456            .return_once(move |_, _| Ok((Uuid::default(), rx)));
3457        deltas_client
3458            .expect_unsubscribe()
3459            .return_once(|_| Ok(()));
3460
3461        let mut state_sync = with_mocked_clients(true, true, Some(rpc_client), Some(deltas_client))
3462            .with_partial_blocks(true);
3463        state_sync
3464            .initialize()
3465            .await
3466            .expect("Init failed");
3467
3468        let (handle, mut block_rx) = state_sync.start().await;
3469        let (jh, close_tx) = handle.split();
3470
3471        // Block 1 (full): used for initial sync merge
3472        tx.send(make_block_changes(1, None))
3473            .await
3474            .unwrap();
3475        let _msg1 = timeout(Duration::from_millis(200), block_rx.recv())
3476            .await
3477            .expect("Should receive initial + block 1")
3478            .expect("Channel open")
3479            .expect("No error");
3480
3481        // Block 2 partial (index 2): BrandNew (in new_protocol_components → deferred) and
3482        // Preexisting (not in new_protocol_components → snapshot requested immediately at block 1).
3483        let mut block2 = make_block_changes(2, Some(2));
3484        block2.new_protocol_components = HashMap::from([(
3485            "BrandNew".to_string(),
3486            ProtocolComponent { id: "BrandNew".to_string(), ..Default::default() },
3487        )]);
3488        block2.component_tvl = HashMap::from([
3489            ("BrandNew".to_string(), 100.0),
3490            ("Preexisting".to_string(), 75.0), // > add_tvl (50), not in new_protocol_components
3491        ]);
3492        tx.send(block2).await.unwrap();
3493        let msg2 = timeout(Duration::from_millis(200), block_rx.recv())
3494            .await
3495            .expect("Should receive block 2")
3496            .expect("Channel open")
3497            .expect("No error");
3498
3499        assert!(
3500            msg2.snapshots.states.contains_key("Preexisting"),
3501            "Preexisting component snapshot should be requested immediately in same block; got keys: {:?}",
3502            msg2.snapshots.states.keys().collect::<Vec<_>>()
3503        );
3504        assert!(
3505            !msg2
3506                .snapshots
3507                .states
3508                .contains_key("BrandNew"),
3509            "Brand-new component snapshot should be deferred, not in block 2 message"
3510        );
3511
3512        // Block 3 partial (index 1): first partial of next block triggers flush of deferred
3513        // snapshot at block 2
3514        tx.send(make_block_changes(3, Some(1)))
3515            .await
3516            .unwrap();
3517        let msg3 = timeout(Duration::from_millis(200), block_rx.recv())
3518            .await
3519            .expect("Should receive block 3")
3520            .expect("Channel open")
3521            .expect("No error");
3522
3523        assert_eq!(msg3.header.number, 3);
3524        assert_eq!(msg3.header.partial_block_index, Some(1), "First partial of block 3");
3525        assert!(
3526            msg3.snapshots.states.contains_key("BrandNew"),
3527            "Deferred brand-new component snapshot should be included in next block message; got keys: {:?}",
3528            msg3.snapshots.states.keys().collect::<Vec<_>>()
3529        );
3530
3531        let _ = close_tx.send(());
3532        jh.await.expect("Task should not panic");
3533    }
3534}