Skip to main content

tycho_client/feed/
synchronizer.rs

1use std::{collections::HashMap, time::Duration};
2
3use async_trait::async_trait;
4use serde::{Deserialize, Serialize};
5use thiserror::Error;
6use tokio::{
7    select,
8    sync::{
9        mpsc::{channel, error::SendError, Receiver, Sender},
10        oneshot,
11    },
12    task::JoinHandle,
13    time::{sleep, timeout},
14};
15use tracing::{debug, error, info, instrument, trace, warn};
16use tycho_common::{
17    dto::{
18        BlockChanges, EntryPointWithTracingParams, ExtractorIdentity, ProtocolComponent,
19        ResponseAccount, ResponseProtocolState, TracingResult,
20    },
21    Bytes,
22};
23
24use crate::{
25    deltas::{DeltasClient, SubscriptionOptions},
26    feed::{
27        component_tracker::{ComponentFilter, ComponentTracker},
28        BlockHeader, HeaderLike,
29    },
30    rpc::{RPCClient, RPCError, SnapshotParameters, RPC_CLIENT_CONCURRENCY},
31    DeltasError,
32};
33
/// Errors surfaced by a state synchronizer.
///
/// Values of this type are either returned directly (see `SyncResult`) or sent to the consumer
/// through the synchronizer's message channel.
#[derive(Error, Debug)]
pub enum SynchronizerError {
    /// RPC client failures.
    #[error("RPC error: {0}")]
    RPCError(#[from] RPCError),

    /// Issues with the main channel, e.g. a message that could not be sent (this is what a
    /// tokio `SendError` is converted into).
    #[error("{0}")]
    ChannelError(String),

    /// Timeout elapsed errors, e.g. the first deltas message did not arrive in time.
    #[error("Timeout error: {0}")]
    Timeout(String),

    /// Failed to close the synchronizer.
    #[error("Failed to close synchronizer: {0}")]
    CloseError(String),

    /// Server connection failures or interruptions.
    #[error("Connection error: {0}")]
    ConnectionError(String),

    /// Connection closed. Produced when the deltas client reports that it is not connected;
    /// the retry loop in `start` treats this variant as fatal and stops retrying.
    #[error("Connection closed")]
    ConnectionClosed,

    /// Internal error that should not happen under normal operation.
    #[error("Internal error: {0}")]
    Internal(String),
}
64
/// Shorthand for a `Result` whose error type is [`SynchronizerError`].
pub type SyncResult<T> = Result<T, SynchronizerError>;
66
67impl<T> From<SendError<T>> for SynchronizerError {
68    fn from(err: SendError<T>) -> Self {
69        SynchronizerError::ChannelError(format!("Failed to send message: {err}"))
70    }
71}
72
73impl From<DeltasError> for SynchronizerError {
74    fn from(err: DeltasError) -> Self {
75        match err {
76            DeltasError::NotConnected => SynchronizerError::ConnectionClosed,
77            _ => SynchronizerError::ConnectionError(err.to_string()),
78        }
79    }
80}
81
/// Synchronizes the state of a single extractor by combining RPC snapshots with a stream of
/// delta messages.
pub struct ProtocolStateSynchronizer<R: RPCClient, D: DeltasClient> {
    // Extractor this synchronizer subscribes to and queries snapshots for.
    extractor_id: ExtractorIdentity,
    // Forwarded to the snapshot request via `include_balances`.
    retrieve_balances: bool,
    // Client used for snapshot and traced-entrypoint requests.
    rpc_client: R,
    // Client used to subscribe to / unsubscribe from delta messages.
    deltas_client: D,
    // Upper bound on the number of synchronization attempts made by `start`.
    max_retries: u64,
    // Pause between two synchronization attempts.
    retry_cooldown: Duration,
    // When false, no snapshots are fetched; also passed as the `with_state` subscription option.
    include_snapshots: bool,
    // Keeps track of which components and contracts are currently followed.
    component_tracker: ComponentTracker<R>,
    // Header of the last block for which a message was sent to the consumer, if any.
    last_synced_block: Option<BlockHeader>,
    // Seconds to wait for the first deltas message after subscribing.
    timeout: u64,
    // Forwarded to the snapshot request via `include_tvl`.
    include_tvl: bool,
    // Passed as the `with_compression` subscription option.
    compression: bool,
    // Passed as the `with_partial_blocks` subscription option; also changes how the first
    // message of a session is selected.
    partial_blocks: bool,
    // When true, traced entrypoints are fetched while retrieving snapshots.
    uses_dci: bool,
}
98
/// A protocol component together with its state, as stored per entry in [`Snapshot::states`].
#[derive(Clone, PartialEq, Debug, Serialize, Deserialize)]
pub struct ComponentWithState {
    /// The component's protocol state.
    pub state: ResponseProtocolState,
    /// The component the state belongs to.
    pub component: ProtocolComponent,
    /// TVL of the component, if available.
    // NOTE(review): presumably only populated when TVL was requested - confirm with the RPC client.
    pub component_tvl: Option<f64>,
    /// Entrypoints (with their tracing parameters) paired with the corresponding tracing result.
    pub entrypoints: Vec<(EntryPointWithTracingParams, TracingResult)>,
}
106
/// Snapshot data returned by the RPC client: component states plus VM account storage.
#[derive(Clone, PartialEq, Debug, Default, Serialize, Deserialize)]
pub struct Snapshot {
    /// Component states; keys are compared against removed-component keys when messages are
    /// merged, so they are presumably component ids (confirm against the RPC client).
    pub states: HashMap<String, ComponentWithState>,
    /// Account data keyed by `Bytes` (presumably the contract address - confirm).
    pub vm_storage: HashMap<Bytes, ResponseAccount>,
}
112
113impl Snapshot {
114    fn extend(&mut self, other: Snapshot) {
115        self.states.extend(other.states);
116        self.vm_storage.extend(other.vm_storage);
117    }
118
119    pub fn get_states(&self) -> &HashMap<String, ComponentWithState> {
120        &self.states
121    }
122
123    pub fn get_vm_storage(&self) -> &HashMap<Bytes, ResponseAccount> {
124        &self.vm_storage
125    }
126}
127
/// Message emitted by a synchronizer for one block (or partial block).
///
/// It bundles snapshots for components the consumer has not seen yet, the deltas for the
/// block, and the components that are no longer tracked.
#[derive(Clone, PartialEq, Debug, Default, Serialize, Deserialize)]
pub struct StateSyncMessage<H>
where
    H: HeaderLike,
{
    /// The block information for this update.
    pub header: H,
    /// Snapshot for new components.
    pub snapshots: Snapshot,
    /// A single delta contains state updates for all tracked components, as well as additional
    /// information about the system components e.g. newly added components (even below tvl), tvl
    /// updates, balance updates.
    pub deltas: Option<BlockChanges>,
    /// Components that stopped being tracked.
    pub removed_components: HashMap<String, ProtocolComponent>,
}
144
145impl<H> StateSyncMessage<H>
146where
147    H: HeaderLike,
148{
149    pub fn merge(mut self, other: Self) -> Self {
150        // be careful with removed and snapshots attributes here, these can be ambiguous.
151        self.removed_components
152            .retain(|k, _| !other.snapshots.states.contains_key(k));
153        self.snapshots
154            .states
155            .retain(|k, _| !other.removed_components.contains_key(k));
156
157        self.snapshots.extend(other.snapshots);
158        let deltas = match (self.deltas, other.deltas) {
159            (Some(l), Some(r)) => Some(l.merge(r)),
160            (None, Some(r)) => Some(r),
161            (Some(l), None) => Some(l),
162            (None, None) => None,
163        };
164        self.removed_components
165            .extend(other.removed_components);
166        Self {
167            header: other.header,
168            snapshots: self.snapshots,
169            deltas,
170            removed_components: self.removed_components,
171        }
172    }
173}
174
/// Handle for controlling a running synchronizer task.
///
/// It pairs the join handle of the spawned task with the oneshot sender whose signal makes the
/// task stop. Use `split` to obtain both parts.
// NOTE(review): an earlier version of this doc mentioned shutdown/await-with-timeout helpers;
// none are defined next to this struct - confirm whether they live elsewhere.
pub struct SynchronizerTaskHandle {
    // Join handle of the spawned synchronization task.
    join_handle: JoinHandle<()>,
    // Sending on this channel signals the task to shut down.
    close_tx: oneshot::Sender<()>,
}
183
impl SynchronizerTaskHandle {
    /// Creates a handle from the task's join handle and the sender of its close signal.
    pub fn new(join_handle: JoinHandle<()>, close_tx: oneshot::Sender<()>) -> Self {
        Self { join_handle, close_tx }
    }

    /// Splits the handle into its join handle and close sender.
    ///
    /// This allows monitoring the task completion separately from controlling shutdown.
    /// The join handle can be used with FuturesUnordered for monitoring, while the
    /// close sender can be used to signal graceful shutdown.
    pub fn split(self) -> (JoinHandle<()>, oneshot::Sender<()>) {
        (self.join_handle, self.close_tx)
    }
}

/// StateSynchronizer
///
/// Used to synchronize the state of a single protocol. The synchronizer is responsible for
/// delivering messages to the client that let it reconstruct subsets of the protocol state.
///
/// This involves deciding which components to track according to the client's preferences,
/// retrieving & emitting snapshots of components which the client has not seen yet and
/// subsequently delivering delta messages for the components that have changed.
#[async_trait]
pub trait StateSynchronizer: Send + Sync + 'static {
    /// Prepares the synchronizer for use. The implementation in this file loads the relevant
    /// protocol components into its component tracker.
    async fn initialize(&mut self) -> SyncResult<()>;
    /// Starts the state synchronization, consuming the synchronizer.
    /// Returns a handle for controlling the running task and a receiver for messages.
    async fn start(
        mut self,
    ) -> (SynchronizerTaskHandle, Receiver<SyncResult<StateSyncMessage<BlockHeader>>>);
}
216
217impl<R, D> ProtocolStateSynchronizer<R, D>
218where
219    // TODO: Consider moving these constraints directly to the
220    // client...
221    R: RPCClient + Clone + Send + Sync + 'static,
222    D: DeltasClient + Clone + Send + Sync + 'static,
223{
224    /// Creates a new state synchronizer.
225    #[allow(clippy::too_many_arguments)]
226    pub fn new(
227        extractor_id: ExtractorIdentity,
228        retrieve_balances: bool,
229        component_filter: ComponentFilter,
230        max_retries: u64,
231        retry_cooldown: Duration,
232        include_snapshots: bool,
233        include_tvl: bool,
234        compression: bool,
235        rpc_client: R,
236        deltas_client: D,
237        timeout: u64,
238    ) -> Self {
239        Self {
240            extractor_id: extractor_id.clone(),
241            retrieve_balances,
242            rpc_client: rpc_client.clone(),
243            include_snapshots,
244            deltas_client,
245            component_tracker: ComponentTracker::new(
246                extractor_id.chain,
247                extractor_id.name.as_str(),
248                component_filter,
249                rpc_client,
250            ),
251            max_retries,
252            retry_cooldown,
253            last_synced_block: None,
254            timeout,
255            include_tvl,
256            compression,
257            partial_blocks: false,
258            uses_dci: false,
259        }
260    }
261
262    /// Sets whether this protocol uses Dynamic Contract Indexing (DCI).
263    /// When true, entrypoints will be fetched during snapshot retrieval.
264    pub fn with_dci(mut self, uses_dci: bool) -> Self {
265        self.uses_dci = uses_dci;
266        self
267    }
268
269    /// Enables receiving partial block updates.
270    pub fn with_partial_blocks(mut self, partial_blocks: bool) -> Self {
271        self.partial_blocks = partial_blocks;
272        self
273    }
274
275    /// Retrieves state snapshots of the requested components
276    async fn get_snapshots<'a, I: IntoIterator<Item = &'a String>>(
277        &mut self,
278        header: BlockHeader,
279        ids: Option<I>,
280    ) -> SyncResult<StateSyncMessage<BlockHeader>> {
281        if !self.include_snapshots {
282            return Ok(StateSyncMessage { header, ..Default::default() });
283        }
284
285        // Use given ids or use all if not passed
286        let component_ids: Vec<_> = match ids {
287            Some(ids) => ids.into_iter().cloned().collect(),
288            None => self
289                .component_tracker
290                .get_tracked_component_ids(),
291        };
292
293        if component_ids.is_empty() {
294            return Ok(StateSyncMessage { header, ..Default::default() });
295        }
296
297        let entrypoints_result = if self.uses_dci {
298            let result = self
299                .rpc_client
300                .get_traced_entry_points_paginated(
301                    self.extractor_id.chain,
302                    &self.extractor_id.name,
303                    &component_ids,
304                    None,
305                    RPC_CLIENT_CONCURRENCY,
306                )
307                .await?;
308            self.component_tracker
309                .process_entrypoints(&result.clone().into());
310            result.traced_entry_points.clone()
311        } else {
312            HashMap::new()
313        };
314
315        // Get contract IDs from component tracker
316        let contract_ids: Vec<Bytes> = self
317            .component_tracker
318            .get_contracts_by_component(&component_ids)
319            .into_iter()
320            .collect();
321
322        // Filter components to only include the requested ones
323        let filtered_components: HashMap<_, _> = self
324            .component_tracker
325            .components
326            .iter()
327            .filter(|(id, _)| component_ids.contains(id))
328            .map(|(k, v)| (k.clone(), v.clone()))
329            .collect();
330
331        let request = SnapshotParameters::new(
332            self.extractor_id.chain,
333            &self.extractor_id.name,
334            &filtered_components,
335            &contract_ids,
336            header.number,
337        )
338        .entrypoints(&entrypoints_result)
339        .include_balances(self.retrieve_balances)
340        .include_tvl(self.include_tvl);
341        let snapshot_response = self
342            .rpc_client
343            .get_snapshots(&request, None, RPC_CLIENT_CONCURRENCY)
344            .await?;
345
346        trace!(states=?&snapshot_response.states, "Retrieved ProtocolStates");
347        trace!(contract_states=?&snapshot_response.vm_storage, "Retrieved ContractState");
348
349        Ok(StateSyncMessage {
350            header,
351            snapshots: snapshot_response,
352            deltas: None,
353            removed_components: HashMap::new(),
354        })
355    }
356
    /// Main method that does all the work.
    ///
    /// One call runs a single synchronization session:
    /// 1. Subscribe to deltas for the extractor.
    /// 2. Wait (with a timeout) for the first deltas message that is a valid starting point.
    /// 3. Emit an initial message, fetching snapshots unless the first message directly follows
    ///    the last synced block.
    /// 4. Forward one message per received deltas message until an error occurs, the deltas
    ///    channel closes or a close signal arrives.
    /// 5. Unsubscribe (best effort) regardless of how the session ended.
    ///
    /// ## Return Value
    ///
    /// Returns a `Result` where:
    /// - `Ok(())` - A close signal was received on `end_rx`
    /// - `Err((error, Some(end_rx)))` - Error occurred but close signal was NOT received (can
    ///   retry)
    ///
    /// NOTE(review): the return type also allows `Err((error, None))` ("error occurred AND close
    /// signal was received, don't retry"), but every error path in this function hands `end_rx`
    /// back as `Some`.
    ///
    /// The returned `end_rx` (if any) should be reused for retry attempts since the close
    /// signal may still arrive and we want to remain cancellable across retries.
    #[instrument(skip(self, block_tx, end_rx), fields(extractor_id = %self.extractor_id))]
    async fn state_sync(
        &mut self,
        block_tx: &mut Sender<SyncResult<StateSyncMessage<BlockHeader>>>,
        mut end_rx: oneshot::Receiver<()>,
    ) -> Result<(), (SynchronizerError, Option<oneshot::Receiver<()>>)> {
        // initialisation
        let subscription_options = SubscriptionOptions::new()
            .with_state(self.include_snapshots)
            .with_compression(self.compression)
            .with_partial_blocks(self.partial_blocks);
        let (subscription_id, mut msg_rx) = match self
            .deltas_client
            .subscribe(self.extractor_id.clone(), subscription_options)
            .await
        {
            Ok(result) => result,
            // Nothing to clean up yet: no subscription exists, so return right away.
            Err(e) => return Err((e.into(), Some(end_rx))),
        };

        // The session runs inside an inner async block so that `?` and early returns end up in
        // `result` and the unsubscribe below is reached on every exit path.
        let result = async {
            info!("Waiting for deltas...");
            // Both flags make sure the corresponding log line is emitted only once per session.
            let mut warned_waiting_for_new_block = false;
            let mut warned_skipping_synced = false;
            // Track the last seen block number such that we know when we get the first partial
            let mut last_block_number: Option<u64> = None;
            let mut first_msg = loop {
                // The timeout applies to each received message individually.
                let msg = select! {
                    deltas_result = timeout(Duration::from_secs(self.timeout), msg_rx.recv()) => {
                        deltas_result
                            .map_err(|_| {
                                SynchronizerError::Timeout(format!(
                                    "First deltas took longer than {t}s to arrive",
                                    t = self.timeout
                                ))
                            })?
                            .ok_or_else(|| {
                                SynchronizerError::ConnectionError(
                                    "Deltas channel closed before first message".to_string(),
                                )
                            })?
                    },
                    _ = &mut end_rx => {
                        info!("Received close signal while waiting for first deltas");
                        return Ok(());
                    }
                };

                let incoming: BlockHeader = (&msg).into();

                // Determine if this message is a candidate for starting synchronization.
                // In partial mode, we wait for a new block to start (block number increases).
                // In non-partial mode, all messages are candidates.
                let is_new_block_candidate = if self.partial_blocks {
                    match msg.partial_block_index {
                        None => {
                            // If we get a full block, it is a candidate
                            last_block_number = Some(incoming.number);
                            true
                        }
                        Some(current_partial_idx) => {
                            // Only a partial whose block number exceeds the previously seen one
                            // starts a new block; the very first message never qualifies.
                            let is_new_block = last_block_number
                                .map(|prev_block| incoming.number > prev_block)
                                .unwrap_or(false);

                            if !warned_waiting_for_new_block {
                                info!(
                                    extractor=%self.extractor_id,
                                    block=incoming.number,
                                    partial_idx=current_partial_idx,
                                    "Syncing. Waiting for new block to start"
                                );
                                warned_waiting_for_new_block = true;
                            }
                            last_block_number = Some(incoming.number);
                            is_new_block
                        }
                    }
                } else {
                    true // Non-partial mode: all messages are candidates
                };

                if !is_new_block_candidate {
                    continue;
                }

                // Check if we've already synced this block (applies to both modes)
                if let Some(current) = &self.last_synced_block {
                    // A block at or below the synced height is still accepted when it builds on
                    // the last synced block (its parent hash matches).
                    if current.number >= incoming.number && !self.is_next_expected(&incoming) {
                        if !warned_skipping_synced {
                            info!(extractor=%self.extractor_id, from=incoming.number, to=current.number, "Syncing. Skipping already synced block");
                            warned_skipping_synced = true;
                        }
                        continue;
                    }
                }
                break msg;
            };

            // Drop changes for components / contracts that are not tracked.
            self.filter_deltas(&mut first_msg);

            // initial snapshot
            info!(height = first_msg.get_block().number, "First deltas received");
            let header: BlockHeader = (&first_msg).into();
            let deltas_msg = StateSyncMessage {
                header: header.clone(),
                snapshots: Default::default(),
                deltas: Some(first_msg),
                removed_components: Default::default(),
            };

            // If possible skip retrieving snapshots
            let msg = if !self.is_next_expected(&header) {
                info!("Retrieving snapshot");
                // The snapshot is requested with the revert flag cleared; the merged message
                // ends up with the header of `deltas_msg`.
                 let snapshot_header = BlockHeader { revert: false, ..header.clone() };
                let snapshot = self
                    .get_snapshots::<Vec<&String>>(
                        snapshot_header,
                        None,
                    )
                    .await?
                    .merge(deltas_msg);
                let n_components = self.component_tracker.components.len();
                let n_snapshots = snapshot.snapshots.states.len();
                info!(n_components, n_snapshots, "Initial snapshot retrieved, starting delta message feed");
                snapshot
            } else {
                deltas_msg
            };
            block_tx.send(Ok(msg)).await?;
            // Only mark the block as synced once the message was handed to the channel.
            self.last_synced_block = Some(header.clone());
            loop {
                select! {
                    deltas_opt = msg_rx.recv() => {
                        if let Some(mut deltas) = deltas_opt {
                            let header: BlockHeader = (&deltas).into();
                            debug!(block_number=?header.number, "Received delta message");

                            let (snapshots, removed_components) = {
                                // 1. Remove components based on latest changes
                                // 2. Add components based on latest changes, query those for snapshots
                                let (to_add, to_remove) = self.component_tracker.filter_updated_components(&deltas);

                                // Only components we don't track yet need a snapshot,
                                let requiring_snapshot: Vec<_> = to_add
                                    .iter()
                                    .filter(|id| {
                                        !self.component_tracker
                                            .components
                                            .contains_key(id.as_str())
                                    })
                                    .collect();
                                debug!(components=?requiring_snapshot, "SnapshotRequest");
                                self.component_tracker
                                    .start_tracking(requiring_snapshot.as_slice())
                                    .await?;

                                // With an empty id list this returns an empty message.
                                let snapshots = self
                                    .get_snapshots(header.clone(), Some(requiring_snapshot))
                                    .await?
                                    .snapshots;

                                let removed_components = if !to_remove.is_empty() {
                                    self.component_tracker.stop_tracking(&to_remove)
                                } else {
                                    Default::default()
                                };

                                (snapshots, removed_components)
                            };

                            // 3. Update entrypoints on the tracker (affects which contracts are tracked)
                            self.component_tracker.process_entrypoints(&deltas.dci_update);

                            // 4. Filter deltas by currently tracked components / contracts
                            self.filter_deltas(&mut deltas);
                            let n_changes = deltas.n_changes();

                            // 5. Send the message
                            let next = StateSyncMessage {
                                header: header.clone(),
                                snapshots,
                                deltas: Some(deltas),
                                removed_components,
                            };
                            block_tx.send(Ok(next)).await?;
                            self.last_synced_block = Some(header.clone());

                            debug!(block_number=?header.number, n_changes, "Finished processing delta message");
                        } else {
                            // `recv` returned `None`: the deltas channel was closed.
                            return Err(SynchronizerError::ConnectionError("Deltas channel closed".to_string()));
                        }
                    },
                    _ = &mut end_rx => {
                        info!("Received close signal during state_sync");
                        return Ok(());
                    }
                }
            }
        }.await;

        // This cleanup code runs regardless of how the inner block exited (error, channel close
        // or close signal).
        warn!(last_synced_block = ?&self.last_synced_block, "Deltas processing ended.");
        // Unsubscribing is best effort: a failure is logged and otherwise ignored.
        let _ = self
            .deltas_client
            .unsubscribe(subscription_id)
            .await
            .map_err(|err| {
                warn!(err=?err, "Unsubscribing from deltas on cleanup failed!");
            });

        // Handle the result: if it succeeded, we're done. If it errored, we need to determine
        // whether the end_rx was consumed (close signal received) or not
        match result {
            Ok(()) => Ok(()), // Success, likely due to close signal
            Err(e) => {
                // The error came from the inner async block. Since the async block
                // can receive close signals (which would return Ok), any error means
                // the close signal was NOT received, so we can return the end_rx for retry
                Err((e, Some(end_rx)))
            }
        }
    }
593
594    fn is_next_expected(&self, incoming: &BlockHeader) -> bool {
595        if let Some(block) = self.last_synced_block.as_ref() {
596            return incoming.parent_hash == block.hash;
597        }
598        false
599    }
600    fn filter_deltas(&self, deltas: &mut BlockChanges) {
601        deltas.filter_by_component(|id| {
602            self.component_tracker
603                .components
604                .contains_key(id)
605        });
606        deltas.filter_by_contract(|id| {
607            self.component_tracker
608                .contracts
609                .contains(id)
610        });
611    }
612}
613
614#[async_trait]
615impl<R, D> StateSynchronizer for ProtocolStateSynchronizer<R, D>
616where
617    R: RPCClient + Clone + Send + Sync + 'static,
618    D: DeltasClient + Clone + Send + Sync + 'static,
619{
620    async fn initialize(&mut self) -> SyncResult<()> {
621        info!("Retrieving relevant protocol components");
622        self.component_tracker
623            .initialise_components()
624            .await?;
625        info!(
626            n_components = self.component_tracker.components.len(),
627            n_contracts = self.component_tracker.contracts.len(),
628            extractor = %self.extractor_id,
629            "Finished retrieving components",
630        );
631
632        Ok(())
633    }
634
635    async fn start(
636        mut self,
637    ) -> (SynchronizerTaskHandle, Receiver<SyncResult<StateSyncMessage<BlockHeader>>>) {
638        let (mut tx, rx) = channel(15);
639        let (end_tx, end_rx) = oneshot::channel::<()>();
640
641        let jh = tokio::spawn(async move {
642            let mut retry_count = 0;
643            let mut current_end_rx = end_rx;
644            let mut final_error = None;
645
646            while retry_count < self.max_retries {
647                info!(extractor_id=%&self.extractor_id, retry_count, "(Re)starting synchronization loop");
648
649                let res = self
650                    .state_sync(&mut tx, current_end_rx)
651                    .await;
652                match res {
653                    Ok(()) => {
654                        info!(
655                            extractor_id=%&self.extractor_id,
656                            retry_count,
657                            "State synchronization exited cleanly"
658                        );
659                        return;
660                    }
661                    Err((e, maybe_end_rx)) => {
662                        warn!(
663                            extractor_id=%&self.extractor_id,
664                            retry_count,
665                            error=%e,
666                            "State synchronization errored!"
667                        );
668
669                        // If we have the end_rx back, we can retry
670                        if let Some(recovered_end_rx) = maybe_end_rx {
671                            current_end_rx = recovered_end_rx;
672
673                            if let SynchronizerError::ConnectionClosed = e {
674                                // break synchronization loop if websocket client is dead
675                                error!(
676                                    "Websocket connection closed. State synchronization exiting."
677                                );
678                                let _ = tx.send(Err(e)).await;
679                                return;
680                            } else {
681                                // Store error in case this is our last retry
682                                final_error = Some(e);
683                            }
684                        } else {
685                            // Close signal was received, exit cleanly
686                            info!(extractor_id=%&self.extractor_id, "Received close signal, exiting.");
687                            return;
688                        }
689                    }
690                }
691                sleep(self.retry_cooldown).await;
692                retry_count += 1;
693            }
694            if let Some(e) = final_error {
695                warn!(extractor_id=%&self.extractor_id, retry_count, error=%e, "Max retries exceeded");
696                let _ = tx.send(Err(e)).await;
697            }
698        });
699
700        let handle = SynchronizerTaskHandle::new(jh, end_tx);
701        (handle, rx)
702    }
703}
704
705#[cfg(test)]
706mod test {
707    //! Test suite for ProtocolStateSynchronizer shutdown and cleanup behavior.
708    //!
709    //! ## Test Coverage Strategy:
710    //!
711    //! ### Shutdown & Close Signal Tests:
712    //! - `test_public_close_api_functionality` - Tests public API (start/close lifecycle)
713    //! - `test_close_signal_while_waiting_for_first_deltas` - Close during initial wait
714    //! - `test_close_signal_during_main_processing_loop` - Close during main processing
715    //!
716    //! ### Cleanup & Error Handling Tests:
717    //! - `test_cleanup_runs_when_state_sync_processing_errors` - Cleanup on processing errors
718    //!
719    //! ### Coverage Summary:
720    //! These tests ensure cleanup code (shared state reset + unsubscribe) runs on ALL exit paths:
721    //! ✓ Close signal before first deltas   ✓ Close signal during processing
722    //! ✓ Processing errors                  ✓ Channel closure
723    //! ✓ Public API close operations        ✓ Normal completion
724
725    use std::{collections::HashSet, sync::Arc};
726
727    use tycho_common::dto::{
728        AddressStorageLocation, Block, Chain, ComponentTvlRequestBody, ComponentTvlRequestResponse,
729        DCIUpdate, EntryPoint, PaginationResponse, ProtocolComponentRequestResponse,
730        ProtocolComponentsRequestBody, ProtocolStateRequestBody, ProtocolStateRequestResponse,
731        ProtocolSystemsRequestBody, ProtocolSystemsRequestResponse, RPCTracerParams,
732        StateRequestBody, StateRequestResponse, TokensRequestBody, TokensRequestResponse,
733        TracedEntryPointRequestBody, TracedEntryPointRequestResponse, TracingParams,
734    };
735    use uuid::Uuid;
736
737    use super::*;
738    use crate::{deltas::MockDeltasClient, rpc::MockRPCClient, DeltasError, RPCError};
739
    // Required for mock client to implement clone: the client is kept behind an `Arc`, so
    // the wrapper can be cloned without the wrapped type being `Clone`.
    struct ArcRPCClient<T>(Arc<T>);
742
743    // Default derive(Clone) does require T to be Clone as well.
744    impl<T> Clone for ArcRPCClient<T> {
745        fn clone(&self) -> Self {
746            ArcRPCClient(self.0.clone())
747        }
748    }
749
    // Forwards every `RPCClient` method listed below, unchanged, to the wrapped client.
    #[async_trait]
    impl<T> RPCClient for ArcRPCClient<T>
    where
        T: RPCClient + Sync + Send + 'static,
    {
        async fn get_tokens(
            &self,
            request: &TokensRequestBody,
        ) -> Result<TokensRequestResponse, RPCError> {
            self.0.get_tokens(request).await
        }

        async fn get_contract_state(
            &self,
            request: &StateRequestBody,
        ) -> Result<StateRequestResponse, RPCError> {
            self.0.get_contract_state(request).await
        }

        async fn get_protocol_components(
            &self,
            request: &ProtocolComponentsRequestBody,
        ) -> Result<ProtocolComponentRequestResponse, RPCError> {
            self.0
                .get_protocol_components(request)
                .await
        }

        async fn get_protocol_states(
            &self,
            request: &ProtocolStateRequestBody,
        ) -> Result<ProtocolStateRequestResponse, RPCError> {
            self.0
                .get_protocol_states(request)
                .await
        }

        async fn get_protocol_systems(
            &self,
            request: &ProtocolSystemsRequestBody,
        ) -> Result<ProtocolSystemsRequestResponse, RPCError> {
            self.0
                .get_protocol_systems(request)
                .await
        }

        async fn get_component_tvl(
            &self,
            request: &ComponentTvlRequestBody,
        ) -> Result<ComponentTvlRequestResponse, RPCError> {
            self.0.get_component_tvl(request).await
        }

        async fn get_traced_entry_points(
            &self,
            request: &TracedEntryPointRequestBody,
        ) -> Result<TracedEntryPointRequestResponse, RPCError> {
            self.0
                .get_traced_entry_points(request)
                .await
        }

        async fn get_snapshots<'a>(
            &self,
            request: &SnapshotParameters<'a>,
            chunk_size: Option<usize>,
            concurrency: usize,
        ) -> Result<Snapshot, RPCError> {
            self.0
                .get_snapshots(request, chunk_size, concurrency)
                .await
        }

        fn compression(&self) -> bool {
            self.0.compression()
        }
    }
827
828    // Required for mock client to implement clone
829    struct ArcDeltasClient<T>(Arc<T>);
830
831    // Default derive(Clone) does require T to be Clone as well.
832    impl<T> Clone for ArcDeltasClient<T> {
833        fn clone(&self) -> Self {
834            ArcDeltasClient(self.0.clone())
835        }
836    }
837
838    #[async_trait]
839    impl<T> DeltasClient for ArcDeltasClient<T>
840    where
841        T: DeltasClient + Sync + Send + 'static,
842    {
843        async fn subscribe(
844            &self,
845            extractor_id: ExtractorIdentity,
846            options: SubscriptionOptions,
847        ) -> Result<(Uuid, Receiver<BlockChanges>), DeltasError> {
848            self.0
849                .subscribe(extractor_id, options)
850                .await
851        }
852
853        async fn unsubscribe(&self, subscription_id: Uuid) -> Result<(), DeltasError> {
854            self.0
855                .unsubscribe(subscription_id)
856                .await
857        }
858
859        async fn connect(&self) -> Result<JoinHandle<Result<(), DeltasError>>, DeltasError> {
860            self.0.connect().await
861        }
862
863        async fn close(&self) -> Result<(), DeltasError> {
864            self.0.close().await
865        }
866    }
867
868    fn with_mocked_clients(
869        native: bool,
870        include_tvl: bool,
871        rpc_client: Option<MockRPCClient>,
872        deltas_client: Option<MockDeltasClient>,
873    ) -> ProtocolStateSynchronizer<ArcRPCClient<MockRPCClient>, ArcDeltasClient<MockDeltasClient>>
874    {
875        let rpc_client = ArcRPCClient(Arc::new(rpc_client.unwrap_or_default()));
876        let deltas_client = ArcDeltasClient(Arc::new(deltas_client.unwrap_or_default()));
877
878        ProtocolStateSynchronizer::new(
879            ExtractorIdentity::new(Chain::Ethereum, "uniswap-v2"),
880            native,
881            ComponentFilter::with_tvl_range(50.0, 50.0),
882            1,
883            Duration::from_secs(0),
884            true,
885            include_tvl,
886            true, // Does not matter as we mock the client that never compresses
887            rpc_client,
888            deltas_client,
889            10_u64,
890        )
891    }
892
893    fn state_snapshot_native() -> ProtocolStateRequestResponse {
894        ProtocolStateRequestResponse {
895            states: vec![ResponseProtocolState {
896                component_id: "Component1".to_string(),
897                ..Default::default()
898            }],
899            pagination: PaginationResponse { page: 0, page_size: 20, total: 1 },
900        }
901    }
902
903    fn make_mock_client() -> MockRPCClient {
904        let mut m = MockRPCClient::new();
905        m.expect_compression()
906            .return_const(false);
907        m
908    }
909
910    #[test_log::test(tokio::test)]
911    async fn test_get_snapshots_native() {
912        let header = BlockHeader::default();
913        let mut rpc = make_mock_client();
914        let component = ProtocolComponent { id: "Component1".to_string(), ..Default::default() };
915
916        let component_clone = component.clone();
917        rpc.expect_get_snapshots()
918            .returning(move |_request, _chunk_size, _concurrency| {
919                Ok(Snapshot {
920                    states: state_snapshot_native()
921                        .states
922                        .into_iter()
923                        .map(|state| {
924                            (
925                                state.component_id.clone(),
926                                ComponentWithState {
927                                    state,
928                                    component: component_clone.clone(),
929                                    entrypoints: vec![],
930                                    component_tvl: None,
931                                },
932                            )
933                        })
934                        .collect(),
935                    vm_storage: HashMap::new(),
936                })
937            });
938
939        rpc.expect_get_traced_entry_points()
940            .returning(|_| {
941                Ok(TracedEntryPointRequestResponse {
942                    traced_entry_points: HashMap::new(),
943                    pagination: PaginationResponse::new(0, 20, 0),
944                })
945            });
946
947        let mut state_sync = with_mocked_clients(true, false, Some(rpc), None);
948        state_sync
949            .component_tracker
950            .components
951            .insert("Component1".to_string(), component.clone());
952        let components_arg = ["Component1".to_string()];
953        let exp = StateSyncMessage {
954            header: header.clone(),
955            snapshots: Snapshot {
956                states: state_snapshot_native()
957                    .states
958                    .into_iter()
959                    .map(|state| {
960                        (
961                            state.component_id.clone(),
962                            ComponentWithState {
963                                state,
964                                component: component.clone(),
965                                entrypoints: vec![],
966                                component_tvl: None,
967                            },
968                        )
969                    })
970                    .collect(),
971                vm_storage: HashMap::new(),
972            },
973            deltas: None,
974            removed_components: Default::default(),
975        };
976
977        let snap = state_sync
978            .get_snapshots(header, Some(&components_arg))
979            .await
980            .expect("Retrieving snapshot failed");
981
982        assert_eq!(snap, exp);
983    }
984
985    #[test_log::test(tokio::test)]
986    async fn test_get_snapshots_native_with_tvl() {
987        let header = BlockHeader::default();
988        let mut rpc = make_mock_client();
989        let component = ProtocolComponent { id: "Component1".to_string(), ..Default::default() };
990
991        let component_clone = component.clone();
992        rpc.expect_get_snapshots()
993            .returning(move |_request, _chunk_size, _concurrency| {
994                Ok(Snapshot {
995                    states: state_snapshot_native()
996                        .states
997                        .into_iter()
998                        .map(|state| {
999                            (
1000                                state.component_id.clone(),
1001                                ComponentWithState {
1002                                    state,
1003                                    component: component_clone.clone(),
1004                                    component_tvl: Some(100.0),
1005                                    entrypoints: vec![],
1006                                },
1007                            )
1008                        })
1009                        .collect(),
1010                    vm_storage: HashMap::new(),
1011                })
1012            });
1013
1014        rpc.expect_get_traced_entry_points()
1015            .returning(|_| {
1016                Ok(TracedEntryPointRequestResponse {
1017                    traced_entry_points: HashMap::new(),
1018                    pagination: PaginationResponse::new(0, 20, 0),
1019                })
1020            });
1021
1022        let mut state_sync = with_mocked_clients(true, true, Some(rpc), None);
1023        state_sync
1024            .component_tracker
1025            .components
1026            .insert("Component1".to_string(), component.clone());
1027        let components_arg = ["Component1".to_string()];
1028        let exp = StateSyncMessage {
1029            header: header.clone(),
1030            snapshots: Snapshot {
1031                states: state_snapshot_native()
1032                    .states
1033                    .into_iter()
1034                    .map(|state| {
1035                        (
1036                            state.component_id.clone(),
1037                            ComponentWithState {
1038                                state,
1039                                component: component.clone(),
1040                                component_tvl: Some(100.0),
1041                                entrypoints: vec![],
1042                            },
1043                        )
1044                    })
1045                    .collect(),
1046                vm_storage: HashMap::new(),
1047            },
1048            deltas: None,
1049            removed_components: Default::default(),
1050        };
1051
1052        let snap = state_sync
1053            .get_snapshots(header, Some(&components_arg))
1054            .await
1055            .expect("Retrieving snapshot failed");
1056
1057        assert_eq!(snap, exp);
1058    }
1059
1060    fn state_snapshot_vm() -> StateRequestResponse {
1061        StateRequestResponse {
1062            accounts: vec![
1063                ResponseAccount { address: Bytes::from("0x0badc0ffee"), ..Default::default() },
1064                ResponseAccount { address: Bytes::from("0xbabe42"), ..Default::default() },
1065            ],
1066            pagination: PaginationResponse { page: 0, page_size: 20, total: 1 },
1067        }
1068    }
1069
1070    fn traced_entry_point_response() -> TracedEntryPointRequestResponse {
1071        TracedEntryPointRequestResponse {
1072            traced_entry_points: HashMap::from([(
1073                "Component1".to_string(),
1074                vec![(
1075                    EntryPointWithTracingParams {
1076                        entry_point: EntryPoint {
1077                            external_id: "entrypoint_a".to_string(),
1078                            target: Bytes::from("0x0badc0ffee"),
1079                            signature: "sig()".to_string(),
1080                        },
1081                        params: TracingParams::RPCTracer(RPCTracerParams {
1082                            caller: Some(Bytes::from("0x0badc0ffee")),
1083                            calldata: Bytes::from("0x0badc0ffee"),
1084                            state_overrides: None,
1085                            prune_addresses: None,
1086                        }),
1087                    },
1088                    TracingResult {
1089                        retriggers: HashSet::from([(
1090                            Bytes::from("0x0badc0ffee"),
1091                            AddressStorageLocation::new(Bytes::from("0x0badc0ffee"), 12),
1092                        )]),
1093                        accessed_slots: HashMap::from([(
1094                            Bytes::from("0x0badc0ffee"),
1095                            HashSet::from([Bytes::from("0xbadbeef0")]),
1096                        )]),
1097                    },
1098                )],
1099            )]),
1100            pagination: PaginationResponse::new(0, 20, 0),
1101        }
1102    }
1103
1104    #[test_log::test(tokio::test)]
1105    async fn test_get_snapshots_vm() {
1106        let header = BlockHeader::default();
1107        let mut rpc = make_mock_client();
1108
1109        let traced_ep_response = traced_entry_point_response();
1110        rpc.expect_get_snapshots()
1111            .returning(move |_request, _chunk_size, _concurrency| {
1112                let vm_storage_accounts = state_snapshot_vm();
1113                Ok(Snapshot {
1114                    states: [(
1115                        "Component1".to_string(),
1116                        ComponentWithState {
1117                            state: ResponseProtocolState {
1118                                component_id: "Component1".to_string(),
1119                                ..Default::default()
1120                            },
1121                            component: ProtocolComponent {
1122                                id: "Component1".to_string(),
1123                                contract_ids: vec![
1124                                    Bytes::from("0x0badc0ffee"),
1125                                    Bytes::from("0xbabe42"),
1126                                ],
1127                                ..Default::default()
1128                            },
1129                            component_tvl: None,
1130                            entrypoints: traced_ep_response
1131                                .traced_entry_points
1132                                .get("Component1")
1133                                .cloned()
1134                                .unwrap_or_default(),
1135                        },
1136                    )]
1137                    .into_iter()
1138                    .collect(),
1139                    vm_storage: vm_storage_accounts
1140                        .accounts
1141                        .into_iter()
1142                        .map(|state| (state.address.clone(), state))
1143                        .collect(),
1144                })
1145            });
1146
1147        rpc.expect_get_traced_entry_points()
1148            .returning(|_| Ok(traced_entry_point_response()));
1149
1150        let mut state_sync = with_mocked_clients(false, false, Some(rpc), None);
1151        let component = ProtocolComponent {
1152            id: "Component1".to_string(),
1153            contract_ids: vec![Bytes::from("0x0badc0ffee"), Bytes::from("0xbabe42")],
1154            ..Default::default()
1155        };
1156        state_sync
1157            .component_tracker
1158            .components
1159            .insert("Component1".to_string(), component.clone());
1160        let components_arg = ["Component1".to_string()];
1161        let exp = StateSyncMessage {
1162            header: header.clone(),
1163            snapshots: Snapshot {
1164                states: [(
1165                    component.id.clone(),
1166                    ComponentWithState {
1167                        state: ResponseProtocolState {
1168                            component_id: "Component1".to_string(),
1169                            ..Default::default()
1170                        },
1171                        component: component.clone(),
1172                        component_tvl: None,
1173                        entrypoints: vec![(
1174                            EntryPointWithTracingParams {
1175                                entry_point: EntryPoint {
1176                                    external_id: "entrypoint_a".to_string(),
1177                                    target: Bytes::from("0x0badc0ffee"),
1178                                    signature: "sig()".to_string(),
1179                                },
1180                                params: TracingParams::RPCTracer(RPCTracerParams {
1181                                    caller: Some(Bytes::from("0x0badc0ffee")),
1182                                    calldata: Bytes::from("0x0badc0ffee"),
1183                                    state_overrides: None,
1184                                    prune_addresses: None,
1185                                }),
1186                            },
1187                            TracingResult {
1188                                retriggers: HashSet::from([(
1189                                    Bytes::from("0x0badc0ffee"),
1190                                    AddressStorageLocation::new(Bytes::from("0x0badc0ffee"), 12),
1191                                )]),
1192                                accessed_slots: HashMap::from([(
1193                                    Bytes::from("0x0badc0ffee"),
1194                                    HashSet::from([Bytes::from("0xbadbeef0")]),
1195                                )]),
1196                            },
1197                        )],
1198                    },
1199                )]
1200                .into_iter()
1201                .collect(),
1202                vm_storage: state_snapshot_vm()
1203                    .accounts
1204                    .into_iter()
1205                    .map(|state| (state.address.clone(), state))
1206                    .collect(),
1207            },
1208            deltas: None,
1209            removed_components: Default::default(),
1210        };
1211
1212        let snap = state_sync
1213            .get_snapshots(header, Some(&components_arg))
1214            .await
1215            .expect("Retrieving snapshot failed");
1216
1217        assert_eq!(snap, exp);
1218    }
1219
1220    #[test_log::test(tokio::test)]
1221    async fn test_get_snapshots_vm_with_tvl() {
1222        let header = BlockHeader::default();
1223        let mut rpc = make_mock_client();
1224        let component = ProtocolComponent {
1225            id: "Component1".to_string(),
1226            contract_ids: vec![Bytes::from("0x0badc0ffee"), Bytes::from("0xbabe42")],
1227            ..Default::default()
1228        };
1229
1230        let component_clone = component.clone();
1231        rpc.expect_get_snapshots()
1232            .returning(move |_request, _chunk_size, _concurrency| {
1233                let vm_storage_accounts = state_snapshot_vm();
1234                Ok(Snapshot {
1235                    states: [(
1236                        "Component1".to_string(),
1237                        ComponentWithState {
1238                            state: ResponseProtocolState {
1239                                component_id: "Component1".to_string(),
1240                                ..Default::default()
1241                            },
1242                            component: component_clone.clone(),
1243                            component_tvl: Some(100.0),
1244                            entrypoints: vec![],
1245                        },
1246                    )]
1247                    .into_iter()
1248                    .collect(),
1249                    vm_storage: vm_storage_accounts
1250                        .accounts
1251                        .into_iter()
1252                        .map(|state| (state.address.clone(), state))
1253                        .collect(),
1254                })
1255            });
1256
1257        rpc.expect_get_traced_entry_points()
1258            .returning(|_| {
1259                Ok(TracedEntryPointRequestResponse {
1260                    traced_entry_points: HashMap::new(),
1261                    pagination: PaginationResponse::new(0, 20, 0),
1262                })
1263            });
1264
1265        let mut state_sync = with_mocked_clients(false, true, Some(rpc), None);
1266        state_sync
1267            .component_tracker
1268            .components
1269            .insert("Component1".to_string(), component.clone());
1270        let components_arg = ["Component1".to_string()];
1271        let exp = StateSyncMessage {
1272            header: header.clone(),
1273            snapshots: Snapshot {
1274                states: [(
1275                    component.id.clone(),
1276                    ComponentWithState {
1277                        state: ResponseProtocolState {
1278                            component_id: "Component1".to_string(),
1279                            ..Default::default()
1280                        },
1281                        component: component.clone(),
1282                        component_tvl: Some(100.0),
1283                        entrypoints: vec![],
1284                    },
1285                )]
1286                .into_iter()
1287                .collect(),
1288                vm_storage: state_snapshot_vm()
1289                    .accounts
1290                    .into_iter()
1291                    .map(|state| (state.address.clone(), state))
1292                    .collect(),
1293            },
1294            deltas: None,
1295            removed_components: Default::default(),
1296        };
1297
1298        let snap = state_sync
1299            .get_snapshots(header, Some(&components_arg))
1300            .await
1301            .expect("Retrieving snapshot failed");
1302
1303        assert_eq!(snap, exp);
1304    }
1305
1306    /// Test that get_snapshots only fetches snapshots for requested components,
1307    /// not all tracked components. This prevents returning full snapshots repeatedly
1308    /// when only a subset of components need updates.
1309    #[test_log::test(tokio::test)]
1310    async fn test_get_snapshots_filters_to_requested_components_only() {
1311        let header = BlockHeader::default();
1312        let mut rpc = make_mock_client();
1313
1314        // Create three components
1315        let component1 = ProtocolComponent { id: "Component1".to_string(), ..Default::default() };
1316        let component2 = ProtocolComponent { id: "Component2".to_string(), ..Default::default() };
1317        let component3 = ProtocolComponent { id: "Component3".to_string(), ..Default::default() };
1318
1319        let component2_clone = component2.clone();
1320
1321        // Mock the RPC call and verify it only receives Component2
1322        rpc.expect_get_snapshots()
1323            .withf(
1324                |request: &SnapshotParameters,
1325                 _chunk_size: &Option<usize>,
1326                 _concurrency: &usize| {
1327                    // Verify that the request contains ONLY Component2, not all tracked components
1328                    request.components.len() == 1 &&
1329                        request
1330                            .components
1331                            .contains_key("Component2")
1332                },
1333            )
1334            .times(1)
1335            .returning(move |_request, _chunk_size, _concurrency| {
1336                Ok(Snapshot {
1337                    states: [(
1338                        "Component2".to_string(),
1339                        ComponentWithState {
1340                            state: ResponseProtocolState {
1341                                component_id: "Component2".to_string(),
1342                                ..Default::default()
1343                            },
1344                            component: component2_clone.clone(),
1345                            entrypoints: vec![],
1346                            component_tvl: None,
1347                        },
1348                    )]
1349                    .into_iter()
1350                    .collect(),
1351                    vm_storage: HashMap::new(),
1352                })
1353            });
1354
1355        rpc.expect_get_traced_entry_points()
1356            .returning(|_| {
1357                Ok(TracedEntryPointRequestResponse {
1358                    traced_entry_points: HashMap::new(),
1359                    pagination: PaginationResponse::new(0, 20, 0),
1360                })
1361            });
1362
1363        let mut state_sync = with_mocked_clients(true, false, Some(rpc), None);
1364
1365        // Track all three components
1366        state_sync
1367            .component_tracker
1368            .components
1369            .insert("Component1".to_string(), component1.clone());
1370        state_sync
1371            .component_tracker
1372            .components
1373            .insert("Component2".to_string(), component2.clone());
1374        state_sync
1375            .component_tracker
1376            .components
1377            .insert("Component3".to_string(), component3.clone());
1378
1379        // Request snapshot for ONLY Component2
1380        let components_arg = ["Component2".to_string()];
1381
1382        let snap = state_sync
1383            .get_snapshots(header.clone(), Some(&components_arg))
1384            .await
1385            .expect("Retrieving snapshot failed");
1386
1387        // Verify we only got Component2 back
1388        assert_eq!(snap.snapshots.states.len(), 1);
1389        assert!(snap
1390            .snapshots
1391            .states
1392            .contains_key("Component2"));
1393        assert!(!snap
1394            .snapshots
1395            .states
1396            .contains_key("Component1"));
1397        assert!(!snap
1398            .snapshots
1399            .states
1400            .contains_key("Component3"));
1401    }
1402
1403    fn mock_clients_for_state_sync() -> (MockRPCClient, MockDeltasClient, Sender<BlockChanges>) {
1404        let mut rpc_client = make_mock_client();
1405        // Mocks for the start_tracking call, these need to come first because they are more
1406        // specific, see: https://docs.rs/mockall/latest/mockall/#matching-multiple-calls
1407        rpc_client
1408            .expect_get_protocol_components()
1409            .with(mockall::predicate::function(
1410                move |request_params: &ProtocolComponentsRequestBody| {
1411                    if let Some(ids) = request_params.component_ids.as_ref() {
1412                        ids.contains(&"Component3".to_string())
1413                    } else {
1414                        false
1415                    }
1416                },
1417            ))
1418            .returning(|_| {
1419                // return Component3
1420                Ok(ProtocolComponentRequestResponse {
1421                    protocol_components: vec![
1422                        // this component shall have a tvl update above threshold
1423                        ProtocolComponent { id: "Component3".to_string(), ..Default::default() },
1424                    ],
1425                    pagination: PaginationResponse { page: 0, page_size: 20, total: 1 },
1426                })
1427            });
1428        // Mock get_snapshots for Component3
1429        rpc_client
1430            .expect_get_snapshots()
1431            .withf(
1432                |request: &SnapshotParameters,
1433                 _chunk_size: &Option<usize>,
1434                 _concurrency: &usize| {
1435                    request
1436                        .components
1437                        .contains_key("Component3")
1438                },
1439            )
1440            .returning(|_request, _chunk_size, _concurrency| {
1441                Ok(Snapshot {
1442                    states: [(
1443                        "Component3".to_string(),
1444                        ComponentWithState {
1445                            state: ResponseProtocolState {
1446                                component_id: "Component3".to_string(),
1447                                ..Default::default()
1448                            },
1449                            component: ProtocolComponent {
1450                                id: "Component3".to_string(),
1451                                ..Default::default()
1452                            },
1453                            component_tvl: Some(1000.0),
1454                            entrypoints: vec![],
1455                        },
1456                    )]
1457                    .into_iter()
1458                    .collect(),
1459                    vm_storage: HashMap::new(),
1460                })
1461            });
1462
1463        // mock calls for the initial state snapshots
1464        rpc_client
1465            .expect_get_protocol_components()
1466            .returning(|_| {
1467                // Initial sync of components
1468                Ok(ProtocolComponentRequestResponse {
1469                    protocol_components: vec![
1470                        // this component shall have a tvl update above threshold
1471                        ProtocolComponent { id: "Component1".to_string(), ..Default::default() },
1472                        // this component shall have a tvl update below threshold.
1473                        ProtocolComponent { id: "Component2".to_string(), ..Default::default() },
1474                        // a third component will have a tvl update above threshold
1475                    ],
1476                    pagination: PaginationResponse { page: 0, page_size: 20, total: 1 },
1477                })
1478            });
1479
1480        rpc_client
1481            .expect_get_snapshots()
1482            .returning(|_request, _chunk_size, _concurrency| {
1483                Ok(Snapshot {
1484                    states: [
1485                        (
1486                            "Component1".to_string(),
1487                            ComponentWithState {
1488                                state: ResponseProtocolState {
1489                                    component_id: "Component1".to_string(),
1490                                    ..Default::default()
1491                                },
1492                                component: ProtocolComponent {
1493                                    id: "Component1".to_string(),
1494                                    ..Default::default()
1495                                },
1496                                component_tvl: Some(100.0),
1497                                entrypoints: vec![],
1498                            },
1499                        ),
1500                        (
1501                            "Component2".to_string(),
1502                            ComponentWithState {
1503                                state: ResponseProtocolState {
1504                                    component_id: "Component2".to_string(),
1505                                    ..Default::default()
1506                                },
1507                                component: ProtocolComponent {
1508                                    id: "Component2".to_string(),
1509                                    ..Default::default()
1510                                },
1511                                component_tvl: Some(0.0),
1512                                entrypoints: vec![],
1513                            },
1514                        ),
1515                    ]
1516                    .into_iter()
1517                    .collect(),
1518                    vm_storage: HashMap::new(),
1519                })
1520            });
1521
1522        // Mock get_traced_entry_points for Ethereum chain
1523        rpc_client
1524            .expect_get_traced_entry_points()
1525            .returning(|_| {
1526                Ok(TracedEntryPointRequestResponse {
1527                    traced_entry_points: HashMap::new(),
1528                    pagination: PaginationResponse { page: 0, page_size: 100, total: 0 },
1529                })
1530            });
1531
1532        // Mock deltas client and messages
1533        let mut deltas_client = MockDeltasClient::new();
1534        let (tx, rx) = channel(1);
1535        deltas_client
1536            .expect_subscribe()
1537            .return_once(move |_, _| {
1538                // Return subscriber id and a channel
1539                Ok((Uuid::default(), rx))
1540            });
1541
1542        // Expect unsubscribe call during cleanup
1543        deltas_client
1544            .expect_unsubscribe()
1545            .return_once(|_| Ok(()));
1546
1547        (rpc_client, deltas_client, tx)
1548    }
1549
1550    /// Test strategy
1551    ///
1552    /// - initial snapshot retrieval returns Component1 and Component2 as snapshots
1553    /// - first message (block 1) carries only a DCI update and is emitted with those snapshots
1554    /// - second message (block 2) reports Component3 with significant tvl (snapshot requested),
1555    ///   Component2 with tvl 0.0 (removed) and Component1 with tvl 100.0 (not re-requested).
1556    #[test_log::test(tokio::test)]
1557    async fn test_state_sync() {
1558        let (rpc_client, deltas_client, tx) = mock_clients_for_state_sync();
        // Block 1 carries a DCI update only; block 2 carries the component TVL updates.
1559        let deltas = [
1560            BlockChanges {
1561                extractor: "uniswap-v2".to_string(),
1562                chain: Chain::Ethereum,
1563                block: Block {
1564                    number: 1,
1565                    hash: Bytes::from("0x01"),
1566                    parent_hash: Bytes::from("0x00"),
1567                    chain: Chain::Ethereum,
1568                    ts: Default::default(),
1569                },
1570                revert: false,
1571                dci_update: DCIUpdate {
1572                    new_entrypoints: HashMap::from([(
1573                        "Component1".to_string(),
1574                        HashSet::from([EntryPoint {
1575                            external_id: "entrypoint_a".to_string(),
1576                            target: Bytes::from("0x0badc0ffee"),
1577                            signature: "sig()".to_string(),
1578                        }]),
1579                    )]),
1580                    new_entrypoint_params: HashMap::from([(
1581                        "entrypoint_a".to_string(),
1582                        HashSet::from([(
1583                            TracingParams::RPCTracer(RPCTracerParams {
1584                                caller: Some(Bytes::from("0x0badc0ffee")),
1585                                calldata: Bytes::from("0x0badc0ffee"),
1586                                state_overrides: None,
1587                                prune_addresses: None,
1588                            }),
1589                            "Component1".to_string(),
1590                        )]),
1591                    )]),
1592                    trace_results: HashMap::from([(
1593                        "entrypoint_a".to_string(),
1594                        TracingResult {
1595                            retriggers: HashSet::from([(
1596                                Bytes::from("0x0badc0ffee"),
1597                                AddressStorageLocation::new(Bytes::from("0x0badc0ffee"), 12),
1598                            )]),
1599                            accessed_slots: HashMap::from([(
1600                                Bytes::from("0x0badc0ffee"),
1601                                HashSet::from([Bytes::from("0xbadbeef0")]),
1602                            )]),
1603                        },
1604                    )]),
1605                },
1606                ..Default::default()
1607            },
1608            BlockChanges {
1609                extractor: "uniswap-v2".to_string(),
1610                chain: Chain::Ethereum,
1611                block: Block {
1612                    number: 2,
1613                    hash: Bytes::from("0x02"),
1614                    parent_hash: Bytes::from("0x01"),
1615                    chain: Chain::Ethereum,
1616                    ts: Default::default(),
1617                },
1618                revert: false,
1619                component_tvl: [
1620                    ("Component1".to_string(), 100.0),
1621                    ("Component2".to_string(), 0.0),
1622                    ("Component3".to_string(), 1000.0),
1623                ]
1624                .into_iter()
1625                .collect(),
1626                ..Default::default()
1627            },
1628        ];
1629        let mut state_sync = with_mocked_clients(true, true, Some(rpc_client), Some(deltas_client));
1630        state_sync
1631            .initialize()
1632            .await
1633            .expect("Init failed");
1634
1635        // Test starts here
1636        let (handle, mut rx) = state_sync.start().await;
1637        let (jh, close_tx) = handle.split();
1638        tx.send(deltas[0].clone())
1639            .await
1640            .expect("deltas channel msg 0 closed!");
1641        let first_msg = timeout(Duration::from_millis(100), rx.recv())
1642            .await
1643            .expect("waiting for first state msg timed out!")
1644            .expect("state sync block sender closed!");
1645        tx.send(deltas[1].clone())
1646            .await
1647            .expect("deltas channel msg 1 closed!");
1648        let second_msg = timeout(Duration::from_millis(100), rx.recv())
1649            .await
1650            .expect("waiting for second state msg timed out!")
1651            .expect("state sync block sender closed!");
        // Signal shutdown and wait for the synchronizer task to finish before asserting.
1652        let _ = close_tx.send(());
1653        jh.await
1654            .expect("state sync task panicked!");
1655
1656        // assertions
        // First message: both initial snapshots plus the unmodified first delta.
1657        let exp1 = StateSyncMessage {
1658            header: BlockHeader {
1659                number: 1,
1660                hash: Bytes::from("0x01"),
1661                parent_hash: Bytes::from("0x00"),
1662                revert: false,
1663                ..Default::default()
1664            },
1665            snapshots: Snapshot {
1666                states: [
1667                    (
1668                        "Component1".to_string(),
1669                        ComponentWithState {
1670                            state: ResponseProtocolState {
1671                                component_id: "Component1".to_string(),
1672                                ..Default::default()
1673                            },
1674                            component: ProtocolComponent {
1675                                id: "Component1".to_string(),
1676                                ..Default::default()
1677                            },
1678                            component_tvl: Some(100.0),
1679                            entrypoints: vec![],
1680                        },
1681                    ),
1682                    (
1683                        "Component2".to_string(),
1684                        ComponentWithState {
1685                            state: ResponseProtocolState {
1686                                component_id: "Component2".to_string(),
1687                                ..Default::default()
1688                            },
1689                            component: ProtocolComponent {
1690                                id: "Component2".to_string(),
1691                                ..Default::default()
1692                            },
1693                            component_tvl: Some(0.0),
1694                            entrypoints: vec![],
1695                        },
1696                    ),
1697                ]
1698                .into_iter()
1699                .collect(),
1700                vm_storage: HashMap::new(),
1701            },
1702            deltas: Some(deltas[0].clone()),
1703            removed_components: Default::default(),
1704        };
1705
        // Second message: snapshot of the newly tracked Component3, filtered TVL map and the
        // removal of Component2.
1706        let exp2 = StateSyncMessage {
1707            header: BlockHeader {
1708                number: 2,
1709                hash: Bytes::from("0x02"),
1710                parent_hash: Bytes::from("0x01"),
1711                revert: false,
1712                ..Default::default()
1713            },
1714            snapshots: Snapshot {
1715                states: [
1716                    // This is the new component we queried once it passed the tvl threshold.
1717                    (
1718                        "Component3".to_string(),
1719                        ComponentWithState {
1720                            state: ResponseProtocolState {
1721                                component_id: "Component3".to_string(),
1722                                ..Default::default()
1723                            },
1724                            component: ProtocolComponent {
1725                                id: "Component3".to_string(),
1726                                ..Default::default()
1727                            },
1728                            component_tvl: Some(1000.0),
1729                            entrypoints: vec![],
1730                        },
1731                    ),
1732                ]
1733                .into_iter()
1734                .collect(),
1735                vm_storage: HashMap::new(),
1736            },
1737            // Our deltas are empty and since merge methods are
1738            // tested in tycho-common we don't have much to do here.
1739            deltas: Some(BlockChanges {
1740                extractor: "uniswap-v2".to_string(),
1741                chain: Chain::Ethereum,
1742                block: Block {
1743                    number: 2,
1744                    hash: Bytes::from("0x02"),
1745                    parent_hash: Bytes::from("0x01"),
1746                    chain: Chain::Ethereum,
1747                    ts: Default::default(),
1748                },
1749                revert: false,
1750                component_tvl: [
1751                    // "Component2" should not show here.
1752                    ("Component1".to_string(), 100.0),
1753                    ("Component3".to_string(), 1000.0),
1754                ]
1755                .into_iter()
1756                .collect(),
1757                ..Default::default()
1758            }),
1759            // "Component2" was removed, because its tvl changed to 0.
1760            removed_components: [(
1761                "Component2".to_string(),
1762                ProtocolComponent { id: "Component2".to_string(), ..Default::default() },
1763            )]
1764            .into_iter()
1765            .collect(),
1766        };
1767        assert_eq!(first_msg.unwrap(), exp1);
1768        assert_eq!(second_msg.unwrap(), exp2);
1769    }
1770
1771    #[test_log::test(tokio::test)]
1772    async fn test_state_sync_with_tvl_range() {
        // Tests component tracking with a TVL range filter (remove threshold 5.0, add
        // threshold 7.0). After the second delta message the test expects:
        // - Component1 (TVL 6.0, between both thresholds) to stay in the TVL map
        // - Component2 (TVL 2.0, below the remove threshold) to be in removed_components
        // - Component3 (TVL 10.0, above the add threshold) to be delivered as a new snapshot
1773        // Define the range for testing
1774        let remove_tvl_threshold = 5.0;
1775        let add_tvl_threshold = 7.0;
1776
1777        let mut rpc_client = make_mock_client();
1778        let mut deltas_client = MockDeltasClient::new();
1779
        // Mock get_protocol_components for requests that explicitly ask for Component3.
        // NOTE(review): presumably registered before the catch-all mock further down so that it
        // takes precedence for Component3 requests - confirm against mockall's matching order.
1780        rpc_client
1781            .expect_get_protocol_components()
1782            .with(mockall::predicate::function(
1783                move |request_params: &ProtocolComponentsRequestBody| {
1784                    if let Some(ids) = request_params.component_ids.as_ref() {
1785                        ids.contains(&"Component3".to_string())
1786                    } else {
1787                        false
1788                    }
1789                },
1790            ))
1791            .returning(|_| {
1792                Ok(ProtocolComponentRequestResponse {
1793                    protocol_components: vec![ProtocolComponent {
1794                        id: "Component3".to_string(),
1795                        ..Default::default()
1796                    }],
1797                    pagination: PaginationResponse { page: 0, page_size: 20, total: 1 },
1798                })
1799            });
1800        // Mock get_snapshots for Component3
1801        rpc_client
1802            .expect_get_snapshots()
1803            .withf(
1804                |request: &SnapshotParameters,
1805                 _chunk_size: &Option<usize>,
1806                 _concurrency: &usize| {
1807                    request
1808                        .components
1809                        .contains_key("Component3")
1810                },
1811            )
1812            .returning(|_request, _chunk_size, _concurrency| {
1813                Ok(Snapshot {
1814                    states: [(
1815                        "Component3".to_string(),
1816                        ComponentWithState {
1817                            state: ResponseProtocolState {
1818                                component_id: "Component3".to_string(),
1819                                ..Default::default()
1820                            },
1821                            component: ProtocolComponent {
1822                                id: "Component3".to_string(),
1823                                ..Default::default()
1824                            },
1825                            component_tvl: Some(10.0),
1826                            entrypoints: vec![],
1827                        },
1828                    )]
1829                    .into_iter()
1830                    .collect(),
1831                    vm_storage: HashMap::new(),
1832                })
1833            });
1834
1835        // Mock for the initial snapshot retrieval
1836        rpc_client
1837            .expect_get_protocol_components()
1838            .returning(|_| {
1839                Ok(ProtocolComponentRequestResponse {
1840                    protocol_components: vec![
1841                        ProtocolComponent { id: "Component1".to_string(), ..Default::default() },
1842                        ProtocolComponent { id: "Component2".to_string(), ..Default::default() },
1843                    ],
                    // NOTE(review): `total` is 1 although two components are returned above -
                    // confirm this is intentional.
1844                    pagination: PaginationResponse { page: 0, page_size: 20, total: 1 },
1845                })
1846            });
1847
1848        // Mock get_snapshots for initial snapshot
1849        rpc_client
1850            .expect_get_snapshots()
1851            .returning(|_request, _chunk_size, _concurrency| {
1852                Ok(Snapshot {
1853                    states: [
1854                        (
1855                            "Component1".to_string(),
1856                            ComponentWithState {
1857                                state: ResponseProtocolState {
1858                                    component_id: "Component1".to_string(),
1859                                    ..Default::default()
1860                                },
1861                                component: ProtocolComponent {
1862                                    id: "Component1".to_string(),
1863                                    ..Default::default()
1864                                },
1865                                component_tvl: Some(6.0),
1866                                entrypoints: vec![],
1867                            },
1868                        ),
1869                        (
1870                            "Component2".to_string(),
1871                            ComponentWithState {
1872                                state: ResponseProtocolState {
1873                                    component_id: "Component2".to_string(),
1874                                    ..Default::default()
1875                                },
1876                                component: ProtocolComponent {
1877                                    id: "Component2".to_string(),
1878                                    ..Default::default()
1879                                },
1880                                component_tvl: Some(2.0),
1881                                entrypoints: vec![],
1882                            },
1883                        ),
1884                    ]
1885                    .into_iter()
1886                    .collect(),
1887                    vm_storage: HashMap::new(),
1888                })
1889            });
1890
1891        // Mock get_traced_entry_points for Ethereum chain
1892        rpc_client
1893            .expect_get_traced_entry_points()
1894            .returning(|_| {
1895                Ok(TracedEntryPointRequestResponse {
1896                    traced_entry_points: HashMap::new(),
1897                    pagination: PaginationResponse { page: 0, page_size: 100, total: 0 },
1898                })
1899            });
1900
        // The subscription hands out `rx`; the test feeds deltas through `tx`.
1901        let (tx, rx) = channel(1);
1902        deltas_client
1903            .expect_subscribe()
1904            .return_once(move |_, _| Ok((Uuid::default(), rx)));
1905
1906        // Expect unsubscribe call during cleanup
1907        deltas_client
1908            .expect_unsubscribe()
1909            .return_once(|_| Ok(()));
1910
1911        let mut state_sync = ProtocolStateSynchronizer::new(
1912            ExtractorIdentity::new(Chain::Ethereum, "uniswap-v2"),
1913            true,
1914            ComponentFilter::with_tvl_range(remove_tvl_threshold, add_tvl_threshold),
1915            1,
1916            Duration::from_secs(0),
1917            true,
1918            true,
1919            true,
1920            ArcRPCClient(Arc::new(rpc_client)),
1921            ArcDeltasClient(Arc::new(deltas_client)),
1922            10_u64,
1923        );
1924        state_sync
1925            .initialize()
1926            .await
1927            .expect("Init failed");
1928
1929        // Simulate the incoming BlockChanges
1930        let deltas = [
1931            BlockChanges {
1932                extractor: "uniswap-v2".to_string(),
1933                chain: Chain::Ethereum,
1934                block: Block {
1935                    number: 1,
1936                    hash: Bytes::from("0x01"),
1937                    parent_hash: Bytes::from("0x00"),
1938                    chain: Chain::Ethereum,
1939                    ts: Default::default(),
1940                },
1941                revert: false,
1942                ..Default::default()
1943            },
1944            BlockChanges {
1945                extractor: "uniswap-v2".to_string(),
1946                chain: Chain::Ethereum,
1947                block: Block {
1948                    number: 2,
1949                    hash: Bytes::from("0x02"),
1950                    parent_hash: Bytes::from("0x01"),
1951                    chain: Chain::Ethereum,
1952                    ts: Default::default(),
1953                },
1954                revert: false,
1955                component_tvl: [
1956                    ("Component1".to_string(), 6.0), // Within range, should not trigger changes
1957                    ("Component2".to_string(), 2.0), // Below lower threshold, should be removed
1958                    ("Component3".to_string(), 10.0), // Above upper threshold, should be added
1959                ]
1960                .into_iter()
1961                .collect(),
1962                ..Default::default()
1963            },
1964        ];
1965
1966        let (handle, mut rx) = state_sync.start().await;
1967        let (jh, close_tx) = handle.split();
1968
1969        // Simulate sending delta messages
1970        tx.send(deltas[0].clone())
1971            .await
1972            .expect("deltas channel msg 0 closed!");
1973
1974        // Expecting to receive the initial state message
1975        let _ = timeout(Duration::from_millis(100), rx.recv())
1976            .await
1977            .expect("waiting for first state msg timed out!")
1978            .expect("state sync block sender closed!");
1979
1980        // Send the second message, which should trigger TVL-based changes
1981        tx.send(deltas[1].clone())
1982            .await
1983            .expect("deltas channel msg 1 closed!");
1984        let second_msg = timeout(Duration::from_millis(100), rx.recv())
1985            .await
1986            .expect("waiting for second state msg timed out!")
1987            .expect("state sync block sender closed!")
1988            .expect("no error");
1989
        // Signal shutdown and wait for the synchronizer task to finish before asserting.
1990        let _ = close_tx.send(());
1991        jh.await
1992            .expect("state sync task panicked!");
1993
1994        let expected_second_msg = StateSyncMessage {
1995            header: BlockHeader {
1996                number: 2,
1997                hash: Bytes::from("0x02"),
1998                parent_hash: Bytes::from("0x01"),
1999                revert: false,
2000                ..Default::default()
2001            },
2002            snapshots: Snapshot {
2003                states: [(
2004                    "Component3".to_string(),
2005                    ComponentWithState {
2006                        state: ResponseProtocolState {
2007                            component_id: "Component3".to_string(),
2008                            ..Default::default()
2009                        },
2010                        component: ProtocolComponent {
2011                            id: "Component3".to_string(),
2012                            ..Default::default()
2013                        },
2014                        component_tvl: Some(10.0),
2015                        entrypoints: vec![], // TODO: add entrypoints?
2016                    },
2017                )]
2018                .into_iter()
2019                .collect(),
2020                vm_storage: HashMap::new(),
2021            },
2022            deltas: Some(BlockChanges {
2023                extractor: "uniswap-v2".to_string(),
2024                chain: Chain::Ethereum,
2025                block: Block {
2026                    number: 2,
2027                    hash: Bytes::from("0x02"),
2028                    parent_hash: Bytes::from("0x01"),
2029                    chain: Chain::Ethereum,
2030                    ts: Default::default(),
2031                },
2032                revert: false,
                // "Component2" is expected to be filtered out of the TVL map.
2033                component_tvl: [
2034                    ("Component1".to_string(), 6.0), // Within range, should not trigger changes
2035                    ("Component3".to_string(), 10.0), // Above upper threshold, should be added
2036                ]
2037                .into_iter()
2038                .collect(),
2039                ..Default::default()
2040            }),
2041            removed_components: [(
2042                "Component2".to_string(),
2043                ProtocolComponent { id: "Component2".to_string(), ..Default::default() },
2044            )]
2045            .into_iter()
2046            .collect(),
2047        };
2048
2049        assert_eq!(second_msg, expected_second_msg);
2050    }
2051
2052    #[test_log::test(tokio::test)]
2053    async fn test_public_close_api_functionality() {
2054        // Tests shutting down a running synchronizer through the handle returned by start():
2055        // - the handle is split into a join handle and a close sender
2056        // - sending on the close sender succeeds while the synchronizer is running
2057        // - the spawned task then finishes without panicking
2058        // NOTE(review): close() before start() and a second close are not exercised here.
2059
2060        let mut rpc_client = make_mock_client();
2061        let mut deltas_client = MockDeltasClient::new();
2062
2063        // Mock the initial components call
2064        rpc_client
2065            .expect_get_protocol_components()
2066            .returning(|_| {
2067                Ok(ProtocolComponentRequestResponse {
2068                    protocol_components: vec![],
2069                    pagination: PaginationResponse { page: 0, page_size: 20, total: 0 },
2070                })
2071            });
2072
2073        // Set up deltas client that will wait for messages (blocking in state_sync)
        // `_tx` stays alive until the end of the test and never sends anything.
2074        let (_tx, rx) = channel(1);
2075        deltas_client
2076            .expect_subscribe()
2077            .return_once(move |_, _| Ok((Uuid::default(), rx)));
2078
2079        // Expect unsubscribe call during cleanup
2080        deltas_client
2081            .expect_unsubscribe()
2082            .return_once(|_| Ok(()));
2083
2084        let mut state_sync = ProtocolStateSynchronizer::new(
2085            ExtractorIdentity::new(Chain::Ethereum, "test-protocol"),
2086            true,
2087            ComponentFilter::with_tvl_range(0.0, 0.0),
2088            5, // Enough retries
2089            Duration::from_secs(0),
2090            true,
2091            false,
2092            true,
2093            ArcRPCClient(Arc::new(rpc_client)),
2094            ArcDeltasClient(Arc::new(deltas_client)),
2095            10000_u64, // Long timeout so task doesn't exit on its own
2096        );
2097
2098        state_sync
2099            .initialize()
2100            .await
2101            .expect("Init should succeed");
2102
2103        // Start the synchronizer and test the new split-based close mechanism
2104        let (handle, _rx) = state_sync.start().await;
2105        let (jh, close_tx) = handle.split();
2106
2107        // Give it time to start up and enter state_sync
2108        tokio::time::sleep(Duration::from_millis(100)).await;
2109
2110        // Send close signal should succeed
2111        close_tx
2112            .send(())
2113            .expect("Should be able to send close signal");
2114        // Task should stop cleanly
2115        jh.await.expect("Task should not panic");
2116    }
2117
2118    #[test_log::test(tokio::test)]
2119    async fn test_cleanup_runs_when_state_sync_processing_errors() {
2120        // Tests that state_sync() returns an error when delta processing fails.
2121        // Specifically: a delta introducing a new component is sent while the RPC mock fails.
2122        // Asserts only that the call returns Err; the last_synced_block reset is not checked
        // directly (see the note at the end of this test).
2123
2124        let mut rpc_client = make_mock_client();
2125        let mut deltas_client = MockDeltasClient::new();
2126
2127        // Mock the initial components call
2128        rpc_client
2129            .expect_get_protocol_components()
2130            .returning(|_| {
2131                Ok(ProtocolComponentRequestResponse {
2132                    protocol_components: vec![],
2133                    pagination: PaginationResponse { page: 0, page_size: 20, total: 0 },
2134                })
2135            });
2136
2137        // Mock to fail during snapshot retrieval (this will cause an error during processing)
2138        rpc_client
2139            .expect_get_protocol_states()
2140            .returning(|_| {
2141                Err(RPCError::ParseResponse("Test error during snapshot retrieval".to_string()))
2142            });
2143
2144        // Set up deltas client to send one message that will trigger snapshot retrieval
2145        let (tx, rx) = channel(10);
2146        deltas_client
2147            .expect_subscribe()
2148            .return_once(move |_, _| {
2149                // Send a delta message that will require a snapshot
2150                let delta = BlockChanges {
2151                    extractor: "test".to_string(),
2152                    chain: Chain::Ethereum,
2153                    block: Block {
2154                        hash: Bytes::from("0x0123"),
2155                        number: 1,
2156                        parent_hash: Bytes::from("0x0000"),
2157                        chain: Chain::Ethereum,
2158                        ts: chrono::DateTime::from_timestamp(1234567890, 0)
2159                            .unwrap()
2160                            .naive_utc(),
2161                    },
2162                    revert: false,
2163                    // Add a new component to trigger snapshot request
2164                    new_protocol_components: [(
2165                        "new_component".to_string(),
2166                        ProtocolComponent {
2167                            id: "new_component".to_string(),
2168                            protocol_system: "test_protocol".to_string(),
2169                            protocol_type_name: "test".to_string(),
2170                            chain: Chain::Ethereum,
2171                            tokens: vec![Bytes::from("0x0badc0ffee")],
2172                            contract_ids: vec![Bytes::from("0x0badc0ffee")],
2173                            static_attributes: Default::default(),
2174                            creation_tx: Default::default(),
2175                            created_at: Default::default(),
2176                            change: Default::default(),
2177                        },
2178                    )]
2179                    .into_iter()
2180                    .collect(),
2181                    component_tvl: [("new_component".to_string(), 100.0)]
2182                        .into_iter()
2183                        .collect(),
2184                    ..Default::default()
2185                };
2186
                // The send runs in a spawned task, so this closure returns `rx` immediately.
2187                tokio::spawn(async move {
2188                    let _ = tx.send(delta).await;
2189                    // `tx` was moved into this task and is dropped when it ends.
2190                });
2191
2192                Ok((Uuid::default(), rx))
2193            });
2194
2195        // Expect unsubscribe call during cleanup
2196        deltas_client
2197            .expect_unsubscribe()
2198            .return_once(|_| Ok(()));
2199
2200        let mut state_sync = ProtocolStateSynchronizer::new(
2201            ExtractorIdentity::new(Chain::Ethereum, "test-protocol"),
2202            true,
2203            ComponentFilter::with_tvl_range(0.0, 1000.0), // Include the component
2204            1,
2205            Duration::from_secs(0),
2206            true,
2207            false,
2208            true,
2209            ArcRPCClient(Arc::new(rpc_client)),
2210            ArcDeltasClient(Arc::new(deltas_client)),
2211            5000_u64,
2212        );
2213
2214        state_sync
2215            .initialize()
2216            .await
2217            .expect("Init should succeed");
2218
2219        // Before calling state_sync, set a value in last_synced_block
2220        state_sync.last_synced_block = Some(BlockHeader {
2221            hash: Bytes::from("0x0badc0ffee"),
2222            number: 42,
2223            parent_hash: Bytes::from("0xbadbeef0"),
2224            revert: false,
2225            timestamp: 123456789,
2226            partial_block_index: None,
2227        });
2228
2229        // Create a channel for state_sync to send messages to
2230        let (mut block_tx, _block_rx) = channel(10);
2231
2232        // Call state_sync directly - this should error during processing
        // `_end_tx` is kept alive and never used, so no close signal is sent in this test.
2233        let (_end_tx, end_rx) = oneshot::channel::<()>();
2234        let result = state_sync
2235            .state_sync(&mut block_tx, end_rx)
2236            .await;
2237        // Verify that state_sync returned an error
2238        assert!(result.is_err(), "state_sync should have errored during processing");
2239
2240        // Note: We can't verify internal state cleanup since state_sync consumes self,
2241        // but the cleanup logic is still tested by the fact that the method returns properly.
2242    }
2243
2244    #[test_log::test(tokio::test)]
2245    async fn test_close_signal_while_waiting_for_first_deltas() {
2246        // Tests close signal handling during the initial "waiting for deltas" phase.
2247        // This is the earliest possible close scenario - before any deltas are received.
2248        // Verifies: state_sync() returns Ok when the close signal arrives before any delta
2249        let mut rpc_client = make_mock_client();
2250        let mut deltas_client = MockDeltasClient::new();
2251
        // Mock the initial components call with an empty result
2252        rpc_client
2253            .expect_get_protocol_components()
2254            .returning(|_| {
2255                Ok(ProtocolComponentRequestResponse {
2256                    protocol_components: vec![],
2257                    pagination: PaginationResponse { page: 0, page_size: 20, total: 0 },
2258                })
2259            });
2260
        // `_tx` stays alive until the end of the test and never sends anything.
2261        let (_tx, rx) = channel(1);
2262        deltas_client
2263            .expect_subscribe()
2264            .return_once(move |_, _| Ok((Uuid::default(), rx)));
2265
        // Expect unsubscribe call during cleanup
2266        deltas_client
2267            .expect_unsubscribe()
2268            .return_once(|_| Ok(()));
2269
2270        let mut state_sync = ProtocolStateSynchronizer::new(
2271            ExtractorIdentity::new(Chain::Ethereum, "test-protocol"),
2272            true,
2273            ComponentFilter::with_tvl_range(0.0, 0.0),
2274            1,
2275            Duration::from_secs(0),
2276            true,
2277            true,
2278            false,
2279            ArcRPCClient(Arc::new(rpc_client)),
2280            ArcDeltasClient(Arc::new(deltas_client)),
2281            10000_u64,
2282        );
2283
2284        state_sync
2285            .initialize()
2286            .await
2287            .expect("Init should succeed");
2288
        // Output channel for state_sync messages and the oneshot used as close signal
2289        let (mut block_tx, _block_rx) = channel(10);
2290        let (end_tx, end_rx) = oneshot::channel::<()>();
2291
2292        // Start state_sync in a task
2293        let state_sync_handle = tokio::spawn(async move {
2294            state_sync
2295                .state_sync(&mut block_tx, end_rx)
2296                .await
2297        });
2298
2299        // Give it a moment to start
2300        tokio::time::sleep(Duration::from_millis(100)).await;
2301
2302        // Send close signal
2303        let _ = end_tx.send(());
2304
2305        // state_sync should exit cleanly
2306        let result = state_sync_handle
2307            .await
2308            .expect("Task should not panic");
2309        assert!(result.is_ok(), "state_sync should exit cleanly when closed: {result:?}");
2310
2311        println!("SUCCESS: Close signal handled correctly while waiting for first deltas");
2312    }
2313
2314    #[test_log::test(tokio::test)]
2315    async fn test_close_signal_during_main_processing_loop() {
2316        // Tests close signal handling during the main delta processing loop.
2317        // This tests the scenario where first message is processed successfully,
2318        // then close signal is received while waiting for subsequent deltas.
2319        // Verifies: close signal in main loop (after initialization) triggers cleanup
2320
2321        let mut rpc_client = make_mock_client();
2322        let mut deltas_client = MockDeltasClient::new();
2323
2324        // Mock the initial components call
2325        rpc_client
2326            .expect_get_protocol_components()
2327            .returning(|_| {
2328                Ok(ProtocolComponentRequestResponse {
2329                    protocol_components: vec![],
2330                    pagination: PaginationResponse { page: 0, page_size: 20, total: 0 },
2331                })
2332            });
2333
2334        // Mock the snapshot retrieval that happens after first message
2335        rpc_client
2336            .expect_get_protocol_states()
2337            .returning(|_| {
2338                Ok(ProtocolStateRequestResponse {
2339                    states: vec![],
2340                    pagination: PaginationResponse { page: 0, page_size: 20, total: 0 },
2341                })
2342            });
2343
2344        rpc_client
2345            .expect_get_component_tvl()
2346            .returning(|_| {
2347                Ok(ComponentTvlRequestResponse {
2348                    tvl: HashMap::new(),
2349                    pagination: PaginationResponse { page: 0, page_size: 20, total: 0 },
2350                })
2351            });
2352
2353        rpc_client
2354            .expect_get_traced_entry_points()
2355            .returning(|_| {
2356                Ok(TracedEntryPointRequestResponse {
2357                    traced_entry_points: HashMap::new(),
2358                    pagination: PaginationResponse::new(0, 20, 0),
2359                })
2360            });
2361
2362        // Set up deltas client to send one message, then keep channel open
2363        let (tx, rx) = channel(10);
2364        deltas_client
2365            .expect_subscribe()
2366            .return_once(move |_, _| {
2367                // Send first message immediately
2368                let first_delta = BlockChanges {
2369                    extractor: "test".to_string(),
2370                    chain: Chain::Ethereum,
2371                    block: Block {
2372                        hash: Bytes::from("0x0123"),
2373                        number: 1,
2374                        parent_hash: Bytes::from("0x0000"),
2375                        chain: Chain::Ethereum,
2376                        ts: chrono::DateTime::from_timestamp(1234567890, 0)
2377                            .unwrap()
2378                            .naive_utc(),
2379                    },
2380                    revert: false,
2381                    ..Default::default()
2382                };
2383
2384                tokio::spawn(async move {
2385                    let _ = tx.send(first_delta).await;
2386                    // Keep the sender alive but don't send more messages
2387                    // This will make the recv() block waiting for the next message
2388                    tokio::time::sleep(Duration::from_secs(30)).await;
2389                });
2390
2391                Ok((Uuid::default(), rx))
2392            });
2393
2394        deltas_client
2395            .expect_unsubscribe()
2396            .return_once(|_| Ok(()));
2397
2398        let mut state_sync = ProtocolStateSynchronizer::new(
2399            ExtractorIdentity::new(Chain::Ethereum, "test-protocol"),
2400            true,
2401            ComponentFilter::with_tvl_range(0.0, 1000.0),
2402            1,
2403            Duration::from_secs(0),
2404            true,
2405            false,
2406            true,
2407            ArcRPCClient(Arc::new(rpc_client)),
2408            ArcDeltasClient(Arc::new(deltas_client)),
2409            10000_u64,
2410        );
2411
2412        state_sync
2413            .initialize()
2414            .await
2415            .expect("Init should succeed");
2416
2417        let (mut block_tx, mut block_rx) = channel(10);
2418        let (end_tx, end_rx) = oneshot::channel::<()>();
2419
2420        // Start state_sync in a task
2421        let state_sync_handle = tokio::spawn(async move {
2422            state_sync
2423                .state_sync(&mut block_tx, end_rx)
2424                .await
2425        });
2426
2427        // Wait for the first message to be processed (snapshot sent)
2428        let first_snapshot = block_rx
2429            .recv()
2430            .await
2431            .expect("Should receive first snapshot")
2432            .expect("Synchronizer error");
2433        assert!(
2434            !first_snapshot
2435                .snapshots
2436                .states
2437                .is_empty() ||
2438                first_snapshot.deltas.is_some()
2439        );
2440        // Now send close signal - this should be handled in the main processing loop
2441        let _ = end_tx.send(());
2442
2443        // state_sync should exit cleanly after receiving close signal in main loop
2444        let result = state_sync_handle
2445            .await
2446            .expect("Task should not panic");
2447        assert!(
2448            result.is_ok(),
2449            "state_sync should exit cleanly when closed after first message: {result:?}"
2450        );
2451    }
2452
2453    #[test_log::test(tokio::test)]
2454    async fn test_max_retries_exceeded_error_propagation() {
2455        // Test that when max_retries is exceeded, the final error is sent through the channel
2456        // to the receiver and the synchronizer task exits cleanly
2457
2458        let mut rpc_client = make_mock_client();
2459        let mut deltas_client = MockDeltasClient::new();
2460
2461        // Mock the initial components call to succeed
2462        rpc_client
2463            .expect_get_protocol_components()
2464            .returning(|_| {
2465                Ok(ProtocolComponentRequestResponse {
2466                    protocol_components: vec![],
2467                    pagination: PaginationResponse { page: 0, page_size: 20, total: 0 },
2468                })
2469            });
2470
2471        // Set up deltas client to consistently fail after subscription
2472        // This will cause connection errors and trigger retries
2473        deltas_client
2474            .expect_subscribe()
2475            .returning(|_, _| {
2476                // Return a connection error to trigger retries
2477                Err(DeltasError::NotConnected)
2478            });
2479
2480        // Expect multiple unsubscribe calls during retries
2481        deltas_client
2482            .expect_unsubscribe()
2483            .returning(|_| Ok(()))
2484            .times(0..=5);
2485
2486        // Create synchronizer with only 2 retries and short cooldown
2487        let mut state_sync = ProtocolStateSynchronizer::new(
2488            ExtractorIdentity::new(Chain::Ethereum, "test-protocol"),
2489            true,
2490            ComponentFilter::with_tvl_range(0.0, 1000.0),
2491            2,                         // max_retries = 2
2492            Duration::from_millis(10), // short retry cooldown
2493            true,
2494            false,
2495            true,
2496            ArcRPCClient(Arc::new(rpc_client)),
2497            ArcDeltasClient(Arc::new(deltas_client)),
2498            1000_u64,
2499        );
2500
2501        state_sync
2502            .initialize()
2503            .await
2504            .expect("Init should succeed");
2505
2506        // Start the synchronizer - it should fail to subscribe and retry
2507        let (handle, mut rx) = state_sync.start().await;
2508        let (jh, _close_tx) = handle.split();
2509
2510        let res = tokio::time::timeout(Duration::from_millis(100), rx.recv())
2511            .await
2512            .expect("responsds in time")
2513            .expect("channel open");
2514
2515        // Verify the error is a ConnectionClosed error (converted from DeltasError::NotConnected)
2516        if let Err(err) = res {
2517            assert!(
2518                matches!(err, SynchronizerError::ConnectionClosed),
2519                "Expected ConnectionClosed error, got: {:?}",
2520                err
2521            );
2522        } else {
2523            panic!("Expected an error")
2524        }
2525
2526        // The task should complete (not hang) after max retries
2527        let task_result = tokio::time::timeout(Duration::from_secs(2), jh).await;
2528        assert!(task_result.is_ok(), "Synchronizer task should complete after max retries");
2529    }
2530
2531    #[test_log::test(tokio::test)]
2532    async fn test_is_next_expected() {
2533        // Test the is_next_expected function to ensure it correctly identifies
2534        // when an incoming block is the expected next block in the chain
2535
2536        let mut state_sync = with_mocked_clients(true, false, None, None);
2537
2538        // Test 1: No previous block - should return false
2539        let incoming_header = BlockHeader {
2540            number: 100,
2541            hash: Bytes::from("0x0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"),
2542            parent_hash: Bytes::from(
2543                "0x0000000000000000000000000000000000000000000000000000000000000000",
2544            ),
2545            revert: false,
2546            timestamp: 123456789,
2547            partial_block_index: None,
2548        };
2549        assert!(
2550            !state_sync.is_next_expected(&incoming_header),
2551            "Should return false when no previous block is set"
2552        );
2553
2554        // Test 2: Set a previous block and test with matching parent hash
2555        let previous_header = BlockHeader {
2556            number: 99,
2557            hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000000"),
2558            parent_hash: Bytes::from(
2559                "0xabcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789",
2560            ),
2561            revert: false,
2562            timestamp: 123456788,
2563            partial_block_index: None,
2564        };
2565        state_sync.last_synced_block = Some(previous_header.clone());
2566
2567        assert!(
2568            state_sync.is_next_expected(&incoming_header),
2569            "Should return true when incoming parent_hash matches previous hash"
2570        );
2571
2572        // Test 3: Test with non-matching parent hash
2573        let non_matching_header = BlockHeader {
2574            number: 100,
2575            hash: Bytes::from("0x0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"),
2576            parent_hash: Bytes::from(
2577                "0x1111111111111111111111111111111111111111111111111111111111111111",
2578            ), // Wrong parent hash
2579            revert: false,
2580            timestamp: 123456789,
2581            partial_block_index: None,
2582        };
2583        assert!(
2584            !state_sync.is_next_expected(&non_matching_header),
2585            "Should return false when incoming parent_hash doesn't match previous hash"
2586        );
2587    }
2588
2589    #[test_log::test(tokio::test)]
2590    async fn test_synchronizer_restart_skip_snapshot_on_expected_block() {
2591        // Test that on synchronizer restart with the next expected block,
2592        // get_snapshot is not called and only deltas are sent
2593
2594        let mut rpc_client = make_mock_client();
2595        let mut deltas_client = MockDeltasClient::new();
2596
2597        // Mock the initial components call
2598        rpc_client
2599            .expect_get_protocol_components()
2600            .returning(|_| {
2601                Ok(ProtocolComponentRequestResponse {
2602                    protocol_components: vec![ProtocolComponent {
2603                        id: "Component1".to_string(),
2604                        ..Default::default()
2605                    }],
2606                    pagination: PaginationResponse { page: 0, page_size: 20, total: 1 },
2607                })
2608            });
2609
2610        // Set up deltas client to send a message that is the next expected block
2611        let (tx, rx) = channel(10);
2612        deltas_client
2613            .expect_subscribe()
2614            .return_once(move |_, _| {
2615                let expected_next_delta = BlockChanges {
2616                    extractor: "uniswap-v2".to_string(),
2617                    chain: Chain::Ethereum,
2618                    block: Block {
2619                        hash: Bytes::from(
2620                            "0x0000000000000000000000000000000000000000000000000000000000000002",
2621                        ), // This will be the next expected block
2622                        number: 2,
2623                        parent_hash: Bytes::from(
2624                            "0x0000000000000000000000000000000000000000000000000000000000000001",
2625                        ), // This matches our last synced block hash
2626                        chain: Chain::Ethereum,
2627                        ts: chrono::DateTime::from_timestamp(1234567890, 0)
2628                            .unwrap()
2629                            .naive_utc(),
2630                    },
2631                    revert: false,
2632                    ..Default::default()
2633                };
2634
2635                tokio::spawn(async move {
2636                    let _ = tx.send(expected_next_delta).await;
2637                });
2638
2639                Ok((Uuid::default(), rx))
2640            });
2641
2642        deltas_client
2643            .expect_unsubscribe()
2644            .return_once(|_| Ok(()));
2645
2646        let mut state_sync = ProtocolStateSynchronizer::new(
2647            ExtractorIdentity::new(Chain::Ethereum, "uniswap-v2"),
2648            true,
2649            ComponentFilter::with_tvl_range(0.0, 1000.0),
2650            1,
2651            Duration::from_secs(0),
2652            true, // include_snapshots = true
2653            false,
2654            true,
2655            ArcRPCClient(Arc::new(rpc_client)),
2656            ArcDeltasClient(Arc::new(deltas_client)),
2657            10000_u64,
2658        );
2659
2660        // Initialize and set up the last synced block to simulate a restart scenario
2661        state_sync
2662            .initialize()
2663            .await
2664            .expect("Init should succeed");
2665
2666        // Set last_synced_block to simulate that we've previously synced block 1
2667        state_sync.last_synced_block = Some(BlockHeader {
2668            number: 1,
2669            hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000001"), /* This matches the parent_hash in our delta */
2670            parent_hash: Bytes::from(
2671                "0x0000000000000000000000000000000000000000000000000000000000000000",
2672            ),
2673            revert: false,
2674            timestamp: 123456789,
2675            partial_block_index: None,
2676        });
2677
2678        let (mut block_tx, mut block_rx) = channel(10);
2679        let (end_tx, end_rx) = oneshot::channel::<()>();
2680
2681        // Start state_sync
2682        let state_sync_handle = tokio::spawn(async move {
2683            state_sync
2684                .state_sync(&mut block_tx, end_rx)
2685                .await
2686        });
2687
2688        // Wait for the message - it should be a delta-only message (no snapshots)
2689        let result_msg = timeout(Duration::from_millis(200), block_rx.recv())
2690            .await
2691            .expect("Should receive message within timeout")
2692            .expect("Channel should be open")
2693            .expect("Should not be an error");
2694
2695        // Send close signal
2696        let _ = end_tx.send(());
2697
2698        // Wait for state_sync to finish
2699        let _ = state_sync_handle
2700            .await
2701            .expect("Task should not panic");
2702
2703        // Verify the message contains deltas but no snapshots
2704        // (because we skipped snapshot retrieval)
2705        assert!(result_msg.deltas.is_some(), "Should contain deltas");
2706        assert!(
2707            result_msg.snapshots.states.is_empty(),
2708            "Should not contain snapshots when next expected block is received"
2709        );
2710
2711        // Verify the block details match our expected next block
2712        if let Some(deltas) = &result_msg.deltas {
2713            assert_eq!(deltas.block.number, 2);
2714            assert_eq!(
2715                deltas.block.hash,
2716                Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000002")
2717            );
2718            assert_eq!(
2719                deltas.block.parent_hash,
2720                Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000001")
2721            );
2722        }
2723    }
2724
2725    #[test_log::test(tokio::test)]
2726    async fn test_skip_previously_processed_messages() {
2727        // Test that the synchronizer skips messages for blocks that have already been processed
2728        // This simulates a service restart scenario where old messages are re-emitted
2729
2730        let mut rpc_client = make_mock_client();
2731        let mut deltas_client = MockDeltasClient::new();
2732
2733        // Mock the initial components call
2734        rpc_client
2735            .expect_get_protocol_components()
2736            .returning(|_| {
2737                Ok(ProtocolComponentRequestResponse {
2738                    protocol_components: vec![ProtocolComponent {
2739                        id: "Component1".to_string(),
2740                        ..Default::default()
2741                    }],
2742                    pagination: PaginationResponse { page: 0, page_size: 20, total: 1 },
2743                })
2744            });
2745
2746        // Mock snapshot calls for when we process the expected next block (block 6)
2747        rpc_client
2748            .expect_get_protocol_states()
2749            .returning(|_| {
2750                Ok(ProtocolStateRequestResponse {
2751                    states: vec![ResponseProtocolState {
2752                        component_id: "Component1".to_string(),
2753                        ..Default::default()
2754                    }],
2755                    pagination: PaginationResponse { page: 0, page_size: 20, total: 1 },
2756                })
2757            });
2758
2759        rpc_client
2760            .expect_get_component_tvl()
2761            .returning(|_| {
2762                Ok(ComponentTvlRequestResponse {
2763                    tvl: [("Component1".to_string(), 100.0)]
2764                        .into_iter()
2765                        .collect(),
2766                    pagination: PaginationResponse { page: 0, page_size: 20, total: 1 },
2767                })
2768            });
2769
2770        rpc_client
2771            .expect_get_traced_entry_points()
2772            .returning(|_| {
2773                Ok(TracedEntryPointRequestResponse {
2774                    traced_entry_points: HashMap::new(),
2775                    pagination: PaginationResponse::new(0, 20, 0),
2776                })
2777            });
2778
2779        // Set up deltas client to send old messages first, then the expected next block
2780        let (tx, rx) = channel(10);
2781        deltas_client
2782            .expect_subscribe()
2783            .return_once(move |_, _| {
2784                // Send messages for blocks 3, 4, 5 (already processed), then block 6 (expected)
2785                let old_messages = vec![
2786                    BlockChanges {
2787                        extractor: "uniswap-v2".to_string(),
2788                        chain: Chain::Ethereum,
2789                        block: Block {
2790                            hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000003"),
2791                            number: 3,
2792                            parent_hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000002"),
2793                            chain: Chain::Ethereum,
2794                            ts: chrono::DateTime::from_timestamp(1234567890, 0).unwrap().naive_utc(),
2795                        },
2796                        revert: false,
2797                        ..Default::default()
2798                    },
2799                    BlockChanges {
2800                        extractor: "uniswap-v2".to_string(),
2801                        chain: Chain::Ethereum,
2802                        block: Block {
2803                            hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000004"),
2804                            number: 4,
2805                            parent_hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000003"),
2806                            chain: Chain::Ethereum,
2807                            ts: chrono::DateTime::from_timestamp(1234567891, 0).unwrap().naive_utc(),
2808                        },
2809                        revert: false,
2810                        ..Default::default()
2811                    },
2812                    BlockChanges {
2813                        extractor: "uniswap-v2".to_string(),
2814                        chain: Chain::Ethereum,
2815                        block: Block {
2816                            hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000005"),
2817                            number: 5,
2818                            parent_hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000004"),
2819                            chain: Chain::Ethereum,
2820                            ts: chrono::DateTime::from_timestamp(1234567892, 0).unwrap().naive_utc(),
2821                        },
2822                        revert: false,
2823                        ..Default::default()
2824                    },
2825                    // This is the expected next block (block 6)
2826                    BlockChanges {
2827                        extractor: "uniswap-v2".to_string(),
2828                        chain: Chain::Ethereum,
2829                        block: Block {
2830                            hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000006"),
2831                            number: 6,
2832                            parent_hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000005"),
2833                            chain: Chain::Ethereum,
2834                            ts: chrono::DateTime::from_timestamp(1234567893, 0).unwrap().naive_utc(),
2835                        },
2836                        revert: false,
2837                        ..Default::default()
2838                    },
2839                ];
2840
2841                tokio::spawn(async move {
2842                    for message in old_messages {
2843                        let _ = tx.send(message).await;
2844                        tokio::time::sleep(Duration::from_millis(10)).await;
2845                    }
2846                });
2847
2848                Ok((Uuid::default(), rx))
2849            });
2850
2851        deltas_client
2852            .expect_unsubscribe()
2853            .return_once(|_| Ok(()));
2854
2855        let mut state_sync = ProtocolStateSynchronizer::new(
2856            ExtractorIdentity::new(Chain::Ethereum, "uniswap-v2"),
2857            true,
2858            ComponentFilter::with_tvl_range(0.0, 1000.0),
2859            1,
2860            Duration::from_secs(0),
2861            true,
2862            true,
2863            true,
2864            ArcRPCClient(Arc::new(rpc_client)),
2865            ArcDeltasClient(Arc::new(deltas_client)),
2866            10000_u64,
2867        );
2868
2869        // Initialize and set last_synced_block to simulate we've already processed block 5
2870        state_sync
2871            .initialize()
2872            .await
2873            .expect("Init should succeed");
2874
2875        state_sync.last_synced_block = Some(BlockHeader {
2876            number: 5,
2877            hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000005"),
2878            parent_hash: Bytes::from(
2879                "0x0000000000000000000000000000000000000000000000000000000000000004",
2880            ),
2881            revert: false,
2882            timestamp: 1234567892,
2883            partial_block_index: None,
2884        });
2885
2886        let (mut block_tx, mut block_rx) = channel(10);
2887        let (end_tx, end_rx) = oneshot::channel::<()>();
2888
2889        // Start state_sync
2890        let state_sync_handle = tokio::spawn(async move {
2891            state_sync
2892                .state_sync(&mut block_tx, end_rx)
2893                .await
2894        });
2895
2896        // Wait for the message - it should only be for block 6 (skipping blocks 3, 4, 5)
2897        let result_msg = timeout(Duration::from_millis(500), block_rx.recv())
2898            .await
2899            .expect("Should receive message within timeout")
2900            .expect("Channel should be open")
2901            .expect("Should not be an error");
2902
2903        // Send close signal
2904        let _ = end_tx.send(());
2905
2906        // Wait for state_sync to finish
2907        let _ = state_sync_handle
2908            .await
2909            .expect("Task should not panic");
2910
2911        // Verify we only got the message for block 6 (the expected next block)
2912        assert!(result_msg.deltas.is_some(), "Should contain deltas");
2913        if let Some(deltas) = &result_msg.deltas {
2914            assert_eq!(
2915                deltas.block.number, 6,
2916                "Should only process block 6, skipping earlier blocks"
2917            );
2918            assert_eq!(
2919                deltas.block.hash,
2920                Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000006")
2921            );
2922        }
2923
2924        // Verify that no additional messages are received immediately
2925        // (since the old blocks 3, 4, 5 were skipped and only block 6 was processed)
2926        match timeout(Duration::from_millis(50), block_rx.recv()).await {
2927            Err(_) => {
2928                // Timeout is expected - no more messages should come
2929            }
2930            Ok(Some(Err(_))) => {
2931                // Error received is also acceptable (connection closed)
2932            }
2933            Ok(Some(Ok(_))) => {
2934                panic!("Should not receive additional messages - old blocks should be skipped");
2935            }
2936            Ok(None) => {
2937                // Channel closed is also acceptable
2938            }
2939        }
2940    }
2941
2942    fn make_block_changes(block_num: u64, partial_idx: Option<u32>) -> BlockChanges {
2943        // Use vec to create Bytes from block number
2944        let hash = Bytes::from(vec![block_num as u8; 32]);
2945        let parent_hash = Bytes::from(vec![block_num.saturating_sub(1) as u8; 32]);
2946        BlockChanges {
2947            extractor: "uniswap-v2".to_string(),
2948            chain: Chain::Ethereum,
2949            block: Block {
2950                number: block_num,
2951                hash,
2952                parent_hash,
2953                chain: Chain::Ethereum,
2954                ts: Default::default(),
2955            },
2956            revert: false,
2957            partial_block_index: partial_idx,
2958            ..Default::default()
2959        }
2960    }
2961
2962    /// Test that full block as first message in partial mode is accepted
2963    #[test_log::test(tokio::test)]
2964    async fn test_partial_mode_accepts_full_block_as_first_message() {
2965        let (rpc_client, deltas_client, tx) = mock_clients_for_state_sync();
2966        let mut state_sync = with_mocked_clients(true, true, Some(rpc_client), Some(deltas_client))
2967            .with_partial_blocks(true);
2968        state_sync
2969            .initialize()
2970            .await
2971            .expect("Init failed");
2972
2973        let (handle, mut block_rx) = state_sync.start().await;
2974        let (jh, close_tx) = handle.split();
2975
2976        // Send full block as first message - should be accepted
2977        tx.send(make_block_changes(1, None))
2978            .await
2979            .unwrap();
2980
2981        // Should receive the full block immediately
2982        let msg = timeout(Duration::from_millis(100), block_rx.recv())
2983            .await
2984            .expect("Should receive message")
2985            .expect("Channel open")
2986            .expect("No error");
2987
2988        assert_eq!(msg.header.number, 1, "Should use block 1 (full block)");
2989        assert_eq!(msg.header.partial_block_index, None, "Should be a full block");
2990
2991        let _ = close_tx.send(());
2992        jh.await.expect("Task should not panic");
2993    }
2994
2995    /// Test that block number increase is detected as new block
2996    #[test_log::test(tokio::test)]
2997    async fn test_partial_mode_detects_block_number_increase() {
2998        let (rpc_client, deltas_client, tx) = mock_clients_for_state_sync();
2999        let mut state_sync = with_mocked_clients(true, true, Some(rpc_client), Some(deltas_client))
3000            .with_partial_blocks(true);
3001        state_sync
3002            .initialize()
3003            .await
3004            .expect("Init failed");
3005
3006        let (handle, mut block_rx) = state_sync.start().await;
3007        let (jh, close_tx) = handle.split();
3008
3009        // Send partial messages for block 1 (will be skipped - waiting for new block)
3010        tx.send(make_block_changes(1, Some(0)))
3011            .await
3012            .unwrap();
3013        tx.send(make_block_changes(1, Some(3)))
3014            .await
3015            .unwrap();
3016
3017        // Verify no message received yet
3018        match timeout(Duration::from_millis(50), block_rx.recv()).await {
3019            Err(_) => { /* Expected: timeout, no message yet */ }
3020            Ok(_) => panic!("Should not receive message while waiting for new block"),
3021        }
3022
3023        // Send partial for block 2 with HIGHER index (5 > 3) - should still be detected
3024        // because block number increased
3025        tx.send(make_block_changes(2, Some(5)))
3026            .await
3027            .unwrap();
3028
3029        // Should receive the message for block 2
3030        let msg = timeout(Duration::from_millis(100), block_rx.recv())
3031            .await
3032            .expect("Should receive message")
3033            .expect("Channel open")
3034            .expect("No error");
3035
3036        assert_eq!(msg.header.number, 2, "Should use block 2 (block number increased)");
3037        assert_eq!(msg.header.partial_block_index, Some(5));
3038
3039        let _ = close_tx.send(());
3040        jh.await.expect("Task should not panic");
3041    }
3042
3043    /// Test that partial mode skips new blocks that are already synced
3044    #[test_log::test(tokio::test)]
3045    async fn test_partial_mode_skips_already_synced_blocks() {
3046        let (rpc_client, deltas_client, tx) = mock_clients_for_state_sync();
3047        let mut state_sync = with_mocked_clients(true, true, Some(rpc_client), Some(deltas_client))
3048            .with_partial_blocks(true);
3049        state_sync
3050            .initialize()
3051            .await
3052            .expect("Init failed");
3053
3054        // Set last_synced_block to block 5 - we've already synced up to here
3055        state_sync.last_synced_block = Some(BlockHeader {
3056            number: 5,
3057            hash: Bytes::from("0x05"),
3058            parent_hash: Bytes::from("0x04"),
3059            revert: false,
3060            timestamp: 0,
3061            partial_block_index: None,
3062        });
3063
3064        let (handle, mut block_rx) = state_sync.start().await;
3065        let (jh, close_tx) = handle.split();
3066
3067        // Send partial for block 3 to establish baseline
3068        tx.send(make_block_changes(3, Some(2)))
3069            .await
3070            .unwrap();
3071
3072        // Send "new block" for block 4 (partial index decreased) - but block 4 < last_synced (5)
3073        tx.send(make_block_changes(4, Some(0)))
3074            .await
3075            .unwrap();
3076
3077        // Should be skipped because block 4 is already synced
3078        match timeout(Duration::from_millis(50), block_rx.recv()).await {
3079            Err(_) => { /* Expected: skipped because already synced */ }
3080            Ok(_) => panic!("Should skip block 4 because it's already synced"),
3081        }
3082
3083        // Now send new block for block 6 (after last_synced)
3084        // First establish new partial index
3085        tx.send(make_block_changes(5, Some(3)))
3086            .await
3087            .unwrap();
3088        // Then trigger new block detection
3089        tx.send(make_block_changes(6, Some(0)))
3090            .await
3091            .unwrap();
3092
3093        let msg = timeout(Duration::from_millis(100), block_rx.recv())
3094            .await
3095            .expect("Should receive message")
3096            .expect("Channel open")
3097            .expect("No error");
3098
3099        assert_eq!(msg.header.number, 6, "Should use block 6 (after last synced)");
3100
3101        let _ = close_tx.send(());
3102        jh.await.expect("Task should not panic");
3103    }
3104
3105    #[test_log::test(tokio::test)]
3106    async fn test_get_snapshots_skips_entrypoints_when_not_dci() {
3107        let header = BlockHeader::default();
3108        let mut rpc = make_mock_client();
3109        let component = ProtocolComponent { id: "Component1".to_string(), ..Default::default() };
3110
3111        let component_clone = component.clone();
3112        rpc.expect_get_snapshots()
3113            .returning(move |_request, _chunk_size, _concurrency| {
3114                Ok(Snapshot {
3115                    states: [(
3116                        "Component1".to_string(),
3117                        ComponentWithState {
3118                            state: ResponseProtocolState {
3119                                component_id: "Component1".to_string(),
3120                                ..Default::default()
3121                            },
3122                            component: component_clone.clone(),
3123                            entrypoints: vec![],
3124                            component_tvl: None,
3125                        },
3126                    )]
3127                    .into_iter()
3128                    .collect(),
3129                    vm_storage: HashMap::new(),
3130                })
3131            });
3132
3133        // get_traced_entry_points should NOT be called for a non-DCI protocol
3134        rpc.expect_get_traced_entry_points()
3135            .never();
3136
3137        let mut state_sync = with_mocked_clients(true, false, Some(rpc), None);
3138        // uses_dci defaults to false, no .with_dci() call needed
3139        state_sync
3140            .component_tracker
3141            .components
3142            .insert("Component1".to_string(), component);
3143
3144        let components_arg = ["Component1".to_string()];
3145        let snap = state_sync
3146            .get_snapshots(header, Some(&components_arg))
3147            .await
3148            .expect("Retrieving snapshot failed");
3149
3150        assert!(snap
3151            .snapshots
3152            .states
3153            .contains_key("Component1"));
3154    }
3155
3156    #[test_log::test(tokio::test)]
3157    async fn test_get_snapshots_fetches_entrypoints_when_dci() {
3158        let header = BlockHeader::default();
3159        let mut rpc = make_mock_client();
3160        let component = ProtocolComponent { id: "Component1".to_string(), ..Default::default() };
3161
3162        let component_clone = component.clone();
3163        rpc.expect_get_snapshots()
3164            .returning(move |_request, _chunk_size, _concurrency| {
3165                Ok(Snapshot {
3166                    states: [(
3167                        "Component1".to_string(),
3168                        ComponentWithState {
3169                            state: ResponseProtocolState {
3170                                component_id: "Component1".to_string(),
3171                                ..Default::default()
3172                            },
3173                            component: component_clone.clone(),
3174                            entrypoints: vec![],
3175                            component_tvl: None,
3176                        },
3177                    )]
3178                    .into_iter()
3179                    .collect(),
3180                    vm_storage: HashMap::new(),
3181                })
3182            });
3183
3184        // get_traced_entry_points SHOULD be called for a DCI protocol
3185        rpc.expect_get_traced_entry_points()
3186            .times(1)
3187            .returning(|_| {
3188                Ok(TracedEntryPointRequestResponse {
3189                    traced_entry_points: HashMap::new(),
3190                    pagination: PaginationResponse::new(0, 20, 0),
3191                })
3192            });
3193
3194        let mut state_sync = with_mocked_clients(true, false, Some(rpc), None).with_dci(true);
3195        state_sync
3196            .component_tracker
3197            .components
3198            .insert("Component1".to_string(), component);
3199
3200        let components_arg = ["Component1".to_string()];
3201        let snap = state_sync
3202            .get_snapshots(header, Some(&components_arg))
3203            .await
3204            .expect("Retrieving snapshot failed");
3205
3206        assert!(snap
3207            .snapshots
3208            .states
3209            .contains_key("Component1"));
3210    }
3211}