Skip to main content

tycho_client/feed/
synchronizer.rs

1use std::{collections::HashMap, time::Duration};
2
3use async_trait::async_trait;
4use serde::{Deserialize, Serialize};
5use thiserror::Error;
6use tokio::{
7    select,
8    sync::{
9        mpsc::{channel, error::SendError, Receiver, Sender},
10        oneshot,
11    },
12    task::JoinHandle,
13    time::{sleep, timeout},
14};
15use tracing::{debug, error, info, instrument, trace, warn};
16use tycho_common::{
17    dto::{
18        BlockChanges, EntryPointWithTracingParams, ExtractorIdentity, ProtocolComponent,
19        ResponseAccount, ResponseProtocolState, TracingResult,
20    },
21    Bytes,
22};
23
24use crate::{
25    deltas::{DeltasClient, SubscriptionOptions},
26    feed::{
27        component_tracker::{ComponentFilter, ComponentTracker},
28        BlockHeader, HeaderLike,
29    },
30    rpc::{RPCClient, RPCError, SnapshotParameters, RPC_CLIENT_CONCURRENCY},
31    DeltasError,
32};
33
34#[derive(Error, Debug)]
35pub enum SynchronizerError {
36    /// RPC client failures.
37    #[error("RPC error: {0}")]
38    RPCError(#[from] RPCError),
39
40    /// Issues with the main channel
41    #[error("{0}")]
42    ChannelError(String),
43
44    /// Timeout elapsed errors.
45    #[error("Timeout error: {0}")]
46    Timeout(String),
47
48    /// Failed to close the synchronizer.
49    #[error("Failed to close synchronizer: {0}")]
50    CloseError(String),
51
52    /// Server connection failures or interruptions.
53    #[error("Connection error: {0}")]
54    ConnectionError(String),
55
56    /// Connection closed
57    #[error("Connection closed")]
58    ConnectionClosed,
59
60    /// Internal error that should not happen under normal operation.
61    #[error("Internal error: {0}")]
62    Internal(String),
63}
64
65pub type SyncResult<T> = Result<T, SynchronizerError>;
66
67impl<T> From<SendError<T>> for SynchronizerError {
68    fn from(err: SendError<T>) -> Self {
69        SynchronizerError::ChannelError(format!("Failed to send message: {err}"))
70    }
71}
72
73impl From<DeltasError> for SynchronizerError {
74    fn from(err: DeltasError) -> Self {
75        match err {
76            DeltasError::NotConnected => SynchronizerError::ConnectionClosed,
77            _ => SynchronizerError::ConnectionError(err.to_string()),
78        }
79    }
80}
81
82pub struct ProtocolStateSynchronizer<R: RPCClient, D: DeltasClient> {
83    extractor_id: ExtractorIdentity,
84    retrieve_balances: bool,
85    rpc_client: R,
86    deltas_client: D,
87    max_retries: u64,
88    retry_cooldown: Duration,
89    include_snapshots: bool,
90    component_tracker: ComponentTracker<R>,
91    last_synced_block: Option<BlockHeader>,
92    timeout: u64,
93    include_tvl: bool,
94    compression: bool,
95    partial_blocks: bool,
96}
97
98#[derive(Clone, PartialEq, Debug, Serialize, Deserialize)]
99pub struct ComponentWithState {
100    pub state: ResponseProtocolState,
101    pub component: ProtocolComponent,
102    pub component_tvl: Option<f64>,
103    pub entrypoints: Vec<(EntryPointWithTracingParams, TracingResult)>,
104}
105
106#[derive(Clone, PartialEq, Debug, Default, Serialize, Deserialize)]
107pub struct Snapshot {
108    pub states: HashMap<String, ComponentWithState>,
109    pub vm_storage: HashMap<Bytes, ResponseAccount>,
110}
111
112impl Snapshot {
113    fn extend(&mut self, other: Snapshot) {
114        self.states.extend(other.states);
115        self.vm_storage.extend(other.vm_storage);
116    }
117
118    pub fn get_states(&self) -> &HashMap<String, ComponentWithState> {
119        &self.states
120    }
121
122    pub fn get_vm_storage(&self) -> &HashMap<Bytes, ResponseAccount> {
123        &self.vm_storage
124    }
125}
126
127#[derive(Clone, PartialEq, Debug, Default, Serialize, Deserialize)]
128pub struct StateSyncMessage<H>
129where
130    H: HeaderLike,
131{
132    /// The block information for this update.
133    pub header: H,
134    /// Snapshot for new components.
135    pub snapshots: Snapshot,
136    /// A single delta contains state updates for all tracked components, as well as additional
137    /// information about the system components e.g. newly added components (even below tvl), tvl
138    /// updates, balance updates.
139    pub deltas: Option<BlockChanges>,
140    /// Components that stopped being tracked.
141    pub removed_components: HashMap<String, ProtocolComponent>,
142}
143
144impl<H> StateSyncMessage<H>
145where
146    H: HeaderLike,
147{
148    pub fn merge(mut self, other: Self) -> Self {
149        // be careful with removed and snapshots attributes here, these can be ambiguous.
150        self.removed_components
151            .retain(|k, _| !other.snapshots.states.contains_key(k));
152        self.snapshots
153            .states
154            .retain(|k, _| !other.removed_components.contains_key(k));
155
156        self.snapshots.extend(other.snapshots);
157        let deltas = match (self.deltas, other.deltas) {
158            (Some(l), Some(r)) => Some(l.merge(r)),
159            (None, Some(r)) => Some(r),
160            (Some(l), None) => Some(l),
161            (None, None) => None,
162        };
163        self.removed_components
164            .extend(other.removed_components);
165        Self {
166            header: other.header,
167            snapshots: self.snapshots,
168            deltas,
169            removed_components: self.removed_components,
170        }
171    }
172}
173
174/// Handle for controlling a running synchronizer task.
175///
176/// This handle provides methods to gracefully shut down the synchronizer
177/// and await its completion with a timeout.
178pub struct SynchronizerTaskHandle {
179    join_handle: JoinHandle<()>,
180    close_tx: oneshot::Sender<()>,
181}
182
183/// StateSynchronizer
184///
185/// Used to synchronize the state of a single protocol. The synchronizer is responsible for
186/// delivering messages to the client that let him reconstruct subsets of the protocol state.
187///
188/// This involves deciding which components to track according to the clients preferences,
189/// retrieving & emitting snapshots of components which the client has not seen yet and subsequently
190/// delivering delta messages for the components that have changed.
191impl SynchronizerTaskHandle {
192    pub fn new(join_handle: JoinHandle<()>, close_tx: oneshot::Sender<()>) -> Self {
193        Self { join_handle, close_tx }
194    }
195
196    /// Splits the handle into its join handle and close sender.
197    ///
198    /// This allows monitoring the task completion separately from controlling shutdown.
199    /// The join handle can be used with FuturesUnordered for monitoring, while the
200    /// close sender can be used to signal graceful shutdown.
201    pub fn split(self) -> (JoinHandle<()>, oneshot::Sender<()>) {
202        (self.join_handle, self.close_tx)
203    }
204}
205
206#[async_trait]
207pub trait StateSynchronizer: Send + Sync + 'static {
208    async fn initialize(&mut self) -> SyncResult<()>;
209    /// Starts the state synchronization, consuming the synchronizer.
210    /// Returns a handle for controlling the running task and a receiver for messages.
211    async fn start(
212        mut self,
213    ) -> (SynchronizerTaskHandle, Receiver<SyncResult<StateSyncMessage<BlockHeader>>>);
214}
215
216impl<R, D> ProtocolStateSynchronizer<R, D>
217where
218    // TODO: Consider moving these constraints directly to the
219    // client...
220    R: RPCClient + Clone + Send + Sync + 'static,
221    D: DeltasClient + Clone + Send + Sync + 'static,
222{
223    /// Creates a new state synchronizer.
224    #[allow(clippy::too_many_arguments)]
225    pub fn new(
226        extractor_id: ExtractorIdentity,
227        retrieve_balances: bool,
228        component_filter: ComponentFilter,
229        max_retries: u64,
230        retry_cooldown: Duration,
231        include_snapshots: bool,
232        include_tvl: bool,
233        compression: bool,
234        rpc_client: R,
235        deltas_client: D,
236        timeout: u64,
237    ) -> Self {
238        Self {
239            extractor_id: extractor_id.clone(),
240            retrieve_balances,
241            rpc_client: rpc_client.clone(),
242            include_snapshots,
243            deltas_client,
244            component_tracker: ComponentTracker::new(
245                extractor_id.chain,
246                extractor_id.name.as_str(),
247                component_filter,
248                rpc_client,
249            ),
250            max_retries,
251            retry_cooldown,
252            last_synced_block: None,
253            timeout,
254            include_tvl,
255            compression,
256            partial_blocks: false,
257        }
258    }
259
260    /// Enables receiving partial block updates.
261    pub fn with_partial_blocks(mut self, partial_blocks: bool) -> Self {
262        self.partial_blocks = partial_blocks;
263        self
264    }
265
266    /// Retrieves state snapshots of the requested components
267    async fn get_snapshots<'a, I: IntoIterator<Item = &'a String>>(
268        &mut self,
269        header: BlockHeader,
270        ids: Option<I>,
271    ) -> SyncResult<StateSyncMessage<BlockHeader>> {
272        if !self.include_snapshots {
273            return Ok(StateSyncMessage { header, ..Default::default() });
274        }
275
276        // Use given ids or use all if not passed
277        let component_ids: Vec<_> = match ids {
278            Some(ids) => ids.into_iter().cloned().collect(),
279            None => self
280                .component_tracker
281                .get_tracked_component_ids(),
282        };
283
284        if component_ids.is_empty() {
285            return Ok(StateSyncMessage { header, ..Default::default() });
286        }
287
288        // TODO: Find a smarter way to dynamically detect DCI protocols.
289        const DCI_PROTOCOLS: &[&str] = &[
290            "uniswap_v4_hooks",
291            "vm:curve",
292            "vm:balancer_v2",
293            "vm:balancer_v3",
294            "fluid_v1",
295            "erc4626",
296        ];
297        let entrypoints_result = if DCI_PROTOCOLS.contains(&self.extractor_id.name.as_str()) {
298            let result = self
299                .rpc_client
300                .get_traced_entry_points_paginated(
301                    self.extractor_id.chain,
302                    &self.extractor_id.name,
303                    &component_ids,
304                    None,
305                    RPC_CLIENT_CONCURRENCY,
306                )
307                .await?;
308            self.component_tracker
309                .process_entrypoints(&result.clone().into());
310            result.traced_entry_points.clone()
311        } else {
312            HashMap::new()
313        };
314
315        // Get contract IDs from component tracker
316        let contract_ids: Vec<Bytes> = self
317            .component_tracker
318            .get_contracts_by_component(&component_ids)
319            .into_iter()
320            .collect();
321
322        let request = SnapshotParameters::new(
323            self.extractor_id.chain,
324            &self.extractor_id.name,
325            &self.component_tracker.components,
326            &contract_ids,
327            header.number,
328        )
329        .entrypoints(&entrypoints_result)
330        .include_balances(self.retrieve_balances)
331        .include_tvl(self.include_tvl);
332        let snapshot_response = self
333            .rpc_client
334            .get_snapshots(&request, None, RPC_CLIENT_CONCURRENCY)
335            .await?;
336
337        trace!(states=?&snapshot_response.states, "Retrieved ProtocolStates");
338        trace!(contract_states=?&snapshot_response.vm_storage, "Retrieved ContractState");
339
340        Ok(StateSyncMessage {
341            header,
342            snapshots: snapshot_response,
343            deltas: None,
344            removed_components: HashMap::new(),
345        })
346    }
347
348    /// Main method that does all the work.
349    ///
350    /// ## Return Value
351    ///
352    /// Returns a `Result` where:
353    /// - `Ok(())` - Synchronization completed successfully (usually due to close signal)
354    /// - `Err((error, None))` - Error occurred AND close signal was received (don't retry)
355    /// - `Err((error, Some(end_rx)))` - Error occurred but close signal was NOT received (can
356    ///   retry)
357    ///
358    /// The returned `end_rx` (if any) should be reused for retry attempts since the close
359    /// signal may still arrive and we want to remain cancellable across retries.
360    #[instrument(skip(self, block_tx, end_rx), fields(extractor_id = %self.extractor_id))]
361    async fn state_sync(
362        &mut self,
363        block_tx: &mut Sender<SyncResult<StateSyncMessage<BlockHeader>>>,
364        mut end_rx: oneshot::Receiver<()>,
365    ) -> Result<(), (SynchronizerError, Option<oneshot::Receiver<()>>)> {
366        // initialisation
367        let subscription_options = SubscriptionOptions::new()
368            .with_state(self.include_snapshots)
369            .with_compression(self.compression)
370            .with_partial_blocks(self.partial_blocks);
371        let (subscription_id, mut msg_rx) = match self
372            .deltas_client
373            .subscribe(self.extractor_id.clone(), subscription_options)
374            .await
375        {
376            Ok(result) => result,
377            Err(e) => return Err((e.into(), Some(end_rx))),
378        };
379
380        let result = async {
381            info!("Waiting for deltas...");
382            let mut warned_waiting_for_new_block = false;
383            let mut warned_skipping_synced = false;
384            // Track the last seen block number such that we know when we get the first partial
385            let mut last_block_number: Option<u64> = None;
386            let mut first_msg = loop {
387                let msg = select! {
388                    deltas_result = timeout(Duration::from_secs(self.timeout), msg_rx.recv()) => {
389                        deltas_result
390                            .map_err(|_| {
391                                SynchronizerError::Timeout(format!(
392                                    "First deltas took longer than {t}s to arrive",
393                                    t = self.timeout
394                                ))
395                            })?
396                            .ok_or_else(|| {
397                                SynchronizerError::ConnectionError(
398                                    "Deltas channel closed before first message".to_string(),
399                                )
400                            })?
401                    },
402                    _ = &mut end_rx => {
403                        info!("Received close signal while waiting for first deltas");
404                        return Ok(());
405                    }
406                };
407
408                let incoming: BlockHeader = (&msg).into();
409
410                // Determine if this message is a candidate for starting synchronization.
411                // In partial mode, we wait for a new block to start (block number increases).
412                // In non-partial mode, all messages are candidates.
413                let is_new_block_candidate = if self.partial_blocks {
414                    match msg.partial_block_index {
415                        None => {
416                            // If we get a full block, it is a candidate
417                            last_block_number = Some(incoming.number);
418                            true
419                        }
420                        Some(current_partial_idx) => {
421                            let is_new_block = last_block_number
422                                .map(|prev_block| incoming.number > prev_block)
423                                .unwrap_or(false);
424
425                            if !warned_waiting_for_new_block {
426                                info!(
427                                    extractor=%self.extractor_id,
428                                    block=incoming.number,
429                                    partial_idx=current_partial_idx,
430                                    "Syncing. Waiting for new block to start"
431                                );
432                                warned_waiting_for_new_block = true;
433                            }
434                            last_block_number = Some(incoming.number);
435                            is_new_block
436                        }
437                    }
438                } else {
439                    true // Non-partial mode: all messages are candidates
440                };
441
442                if !is_new_block_candidate {
443                    continue;
444                }
445
446                // Check if we've already synced this block (applies to both modes)
447                if let Some(current) = &self.last_synced_block {
448                    if current.number >= incoming.number && !self.is_next_expected(&incoming) {
449                        if !warned_skipping_synced {
450                            info!(extractor=%self.extractor_id, from=incoming.number, to=current.number, "Syncing. Skipping already synced block");
451                            warned_skipping_synced = true;
452                        }
453                        continue;
454                    }
455                }
456                break msg;
457            };
458
459            self.filter_deltas(&mut first_msg);
460
461            // initial snapshot
462            info!(height = first_msg.get_block().number, "First deltas received");
463            let header: BlockHeader = (&first_msg).into();
464            let deltas_msg = StateSyncMessage {
465                header: header.clone(),
466                snapshots: Default::default(),
467                deltas: Some(first_msg),
468                removed_components: Default::default(),
469            };
470
471            // If possible skip retrieving snapshots
472            let msg = if !self.is_next_expected(&header) {
473                info!("Retrieving snapshot");
474                 let snapshot_header = BlockHeader { revert: false, ..header.clone() };
475                let snapshot = self
476                    .get_snapshots::<Vec<&String>>(
477                        snapshot_header,
478                        None,
479                    )
480                    .await?
481                    .merge(deltas_msg);
482                let n_components = self.component_tracker.components.len();
483                let n_snapshots = snapshot.snapshots.states.len();
484                info!(n_components, n_snapshots, "Initial snapshot retrieved, starting delta message feed");
485                snapshot
486            } else {
487                deltas_msg
488            };
489            block_tx.send(Ok(msg)).await?;
490            self.last_synced_block = Some(header.clone());
491            loop {
492                select! {
493                    deltas_opt = msg_rx.recv() => {
494                        if let Some(mut deltas) = deltas_opt {
495                            let header: BlockHeader = (&deltas).into();
496                            debug!(block_number=?header.number, "Received delta message");
497
498                            let (snapshots, removed_components) = {
499                                // 1. Remove components based on latest changes
500                                // 2. Add components based on latest changes, query those for snapshots
501                                let (to_add, to_remove) = self.component_tracker.filter_updated_components(&deltas);
502
503                                // Only components we don't track yet need a snapshot,
504                                let requiring_snapshot: Vec<_> = to_add
505                                    .iter()
506                                    .filter(|id| {
507                                        !self.component_tracker
508                                            .components
509                                            .contains_key(id.as_str())
510                                    })
511                                    .collect();
512                                debug!(components=?requiring_snapshot, "SnapshotRequest");
513                                self.component_tracker
514                                    .start_tracking(requiring_snapshot.as_slice())
515                                    .await?;
516
517                                let snapshots = self
518                                    .get_snapshots(header.clone(), Some(requiring_snapshot))
519                                    .await?
520                                    .snapshots;
521
522                                let removed_components = if !to_remove.is_empty() {
523                                    self.component_tracker.stop_tracking(&to_remove)
524                                } else {
525                                    Default::default()
526                                };
527
528                                (snapshots, removed_components)
529                            };
530
531                            // 3. Update entrypoints on the tracker (affects which contracts are tracked)
532                            self.component_tracker.process_entrypoints(&deltas.dci_update);
533
534                            // 4. Filter deltas by currently tracked components / contracts
535                            self.filter_deltas(&mut deltas);
536                            let n_changes = deltas.n_changes();
537
538                            // 5. Send the message
539                            let next = StateSyncMessage {
540                                header: header.clone(),
541                                snapshots,
542                                deltas: Some(deltas),
543                                removed_components,
544                            };
545                            block_tx.send(Ok(next)).await?;
546                            self.last_synced_block = Some(header.clone());
547
548                            debug!(block_number=?header.number, n_changes, "Finished processing delta message");
549                        } else {
550                            return Err(SynchronizerError::ConnectionError("Deltas channel closed".to_string()));
551                        }
552                    },
553                    _ = &mut end_rx => {
554                        info!("Received close signal during state_sync");
555                        return Ok(());
556                    }
557                }
558            }
559        }.await;
560
561        // This cleanup code now runs regardless of how the function exits (error or channel close)
562        warn!(last_synced_block = ?&self.last_synced_block, "Deltas processing ended.");
563        //Ignore error
564        let _ = self
565            .deltas_client
566            .unsubscribe(subscription_id)
567            .await
568            .map_err(|err| {
569                warn!(err=?err, "Unsubscribing from deltas on cleanup failed!");
570            });
571
572        // Handle the result: if it succeeded, we're done. If it errored, we need to determine
573        // whether the end_rx was consumed (close signal received) or not
574        match result {
575            Ok(()) => Ok(()), // Success, likely due to close signal
576            Err(e) => {
577                // The error came from the inner async block. Since the async block
578                // can receive close signals (which would return Ok), any error means
579                // the close signal was NOT received, so we can return the end_rx for retry
580                Err((e, Some(end_rx)))
581            }
582        }
583    }
584
585    fn is_next_expected(&self, incoming: &BlockHeader) -> bool {
586        if let Some(block) = self.last_synced_block.as_ref() {
587            return incoming.parent_hash == block.hash;
588        }
589        false
590    }
591    fn filter_deltas(&self, deltas: &mut BlockChanges) {
592        deltas.filter_by_component(|id| {
593            self.component_tracker
594                .components
595                .contains_key(id)
596        });
597        deltas.filter_by_contract(|id| {
598            self.component_tracker
599                .contracts
600                .contains(id)
601        });
602    }
603}
604
605#[async_trait]
606impl<R, D> StateSynchronizer for ProtocolStateSynchronizer<R, D>
607where
608    R: RPCClient + Clone + Send + Sync + 'static,
609    D: DeltasClient + Clone + Send + Sync + 'static,
610{
611    async fn initialize(&mut self) -> SyncResult<()> {
612        info!("Retrieving relevant protocol components");
613        self.component_tracker
614            .initialise_components()
615            .await?;
616        info!(
617            n_components = self.component_tracker.components.len(),
618            n_contracts = self.component_tracker.contracts.len(),
619            "Finished retrieving components",
620        );
621
622        Ok(())
623    }
624
625    async fn start(
626        mut self,
627    ) -> (SynchronizerTaskHandle, Receiver<SyncResult<StateSyncMessage<BlockHeader>>>) {
628        let (mut tx, rx) = channel(15);
629        let (end_tx, end_rx) = oneshot::channel::<()>();
630
631        let jh = tokio::spawn(async move {
632            let mut retry_count = 0;
633            let mut current_end_rx = end_rx;
634            let mut final_error = None;
635
636            while retry_count < self.max_retries {
637                info!(extractor_id=%&self.extractor_id, retry_count, "(Re)starting synchronization loop");
638
639                let res = self
640                    .state_sync(&mut tx, current_end_rx)
641                    .await;
642                match res {
643                    Ok(()) => {
644                        info!(
645                            extractor_id=%&self.extractor_id,
646                            retry_count,
647                            "State synchronization exited cleanly"
648                        );
649                        return;
650                    }
651                    Err((e, maybe_end_rx)) => {
652                        warn!(
653                            extractor_id=%&self.extractor_id,
654                            retry_count,
655                            error=%e,
656                            "State synchronization errored!"
657                        );
658
659                        // If we have the end_rx back, we can retry
660                        if let Some(recovered_end_rx) = maybe_end_rx {
661                            current_end_rx = recovered_end_rx;
662
663                            if let SynchronizerError::ConnectionClosed = e {
664                                // break synchronization loop if websocket client is dead
665                                error!(
666                                    "Websocket connection closed. State synchronization exiting."
667                                );
668                                let _ = tx.send(Err(e)).await;
669                                return;
670                            } else {
671                                // Store error in case this is our last retry
672                                final_error = Some(e);
673                            }
674                        } else {
675                            // Close signal was received, exit cleanly
676                            info!(extractor_id=%&self.extractor_id, "Received close signal, exiting.");
677                            return;
678                        }
679                    }
680                }
681                sleep(self.retry_cooldown).await;
682                retry_count += 1;
683            }
684            if let Some(e) = final_error {
685                warn!(extractor_id=%&self.extractor_id, retry_count, error=%e, "Max retries exceeded");
686                let _ = tx.send(Err(e)).await;
687            }
688        });
689
690        let handle = SynchronizerTaskHandle::new(jh, end_tx);
691        (handle, rx)
692    }
693}
694
695#[cfg(test)]
696mod test {
697    //! Test suite for ProtocolStateSynchronizer shutdown and cleanup behavior.
698    //!
699    //! ## Test Coverage Strategy:
700    //!
701    //! ### Shutdown & Close Signal Tests:
702    //! - `test_public_close_api_functionality` - Tests public API (start/close lifecycle)
703    //! - `test_close_signal_while_waiting_for_first_deltas` - Close during initial wait
704    //! - `test_close_signal_during_main_processing_loop` - Close during main processing
705    //!
706    //! ### Cleanup & Error Handling Tests:
707    //! - `test_cleanup_runs_when_state_sync_processing_errors` - Cleanup on processing errors
708    //!
709    //! ### Coverage Summary:
710    //! These tests ensure cleanup code (shared state reset + unsubscribe) runs on ALL exit paths:
711    //! ✓ Close signal before first deltas   ✓ Close signal during processing
712    //! ✓ Processing errors                  ✓ Channel closure
713    //! ✓ Public API close operations        ✓ Normal completion
714
715    use std::{collections::HashSet, sync::Arc};
716
717    use tycho_common::dto::{
718        AddressStorageLocation, Block, Chain, ComponentTvlRequestBody, ComponentTvlRequestResponse,
719        DCIUpdate, EntryPoint, PaginationResponse, ProtocolComponentRequestResponse,
720        ProtocolComponentsRequestBody, ProtocolStateRequestBody, ProtocolStateRequestResponse,
721        ProtocolSystemsRequestBody, ProtocolSystemsRequestResponse, RPCTracerParams,
722        StateRequestBody, StateRequestResponse, TokensRequestBody, TokensRequestResponse,
723        TracedEntryPointRequestBody, TracedEntryPointRequestResponse, TracingParams,
724    };
725    use uuid::Uuid;
726
727    use super::*;
728    use crate::{deltas::MockDeltasClient, rpc::MockRPCClient, DeltasError, RPCError};
729
730    // Required for mock client to implement clone
731    struct ArcRPCClient<T>(Arc<T>);
732
733    // Default derive(Clone) does require T to be Clone as well.
734    impl<T> Clone for ArcRPCClient<T> {
735        fn clone(&self) -> Self {
736            ArcRPCClient(self.0.clone())
737        }
738    }
739
740    #[async_trait]
741    impl<T> RPCClient for ArcRPCClient<T>
742    where
743        T: RPCClient + Sync + Send + 'static,
744    {
745        async fn get_tokens(
746            &self,
747            request: &TokensRequestBody,
748        ) -> Result<TokensRequestResponse, RPCError> {
749            self.0.get_tokens(request).await
750        }
751
752        async fn get_contract_state(
753            &self,
754            request: &StateRequestBody,
755        ) -> Result<StateRequestResponse, RPCError> {
756            self.0.get_contract_state(request).await
757        }
758
759        async fn get_protocol_components(
760            &self,
761            request: &ProtocolComponentsRequestBody,
762        ) -> Result<ProtocolComponentRequestResponse, RPCError> {
763            self.0
764                .get_protocol_components(request)
765                .await
766        }
767
768        async fn get_protocol_states(
769            &self,
770            request: &ProtocolStateRequestBody,
771        ) -> Result<ProtocolStateRequestResponse, RPCError> {
772            self.0
773                .get_protocol_states(request)
774                .await
775        }
776
777        async fn get_protocol_systems(
778            &self,
779            request: &ProtocolSystemsRequestBody,
780        ) -> Result<ProtocolSystemsRequestResponse, RPCError> {
781            self.0
782                .get_protocol_systems(request)
783                .await
784        }
785
786        async fn get_component_tvl(
787            &self,
788            request: &ComponentTvlRequestBody,
789        ) -> Result<ComponentTvlRequestResponse, RPCError> {
790            self.0.get_component_tvl(request).await
791        }
792
793        async fn get_traced_entry_points(
794            &self,
795            request: &TracedEntryPointRequestBody,
796        ) -> Result<TracedEntryPointRequestResponse, RPCError> {
797            self.0
798                .get_traced_entry_points(request)
799                .await
800        }
801
802        async fn get_snapshots<'a>(
803            &self,
804            request: &SnapshotParameters<'a>,
805            chunk_size: Option<usize>,
806            concurrency: usize,
807        ) -> Result<Snapshot, RPCError> {
808            self.0
809                .get_snapshots(request, chunk_size, concurrency)
810                .await
811        }
812
813        fn compression(&self) -> bool {
814            self.0.compression()
815        }
816    }
817
818    // Required for mock client to implement clone
819    struct ArcDeltasClient<T>(Arc<T>);
820
821    // Default derive(Clone) does require T to be Clone as well.
822    impl<T> Clone for ArcDeltasClient<T> {
823        fn clone(&self) -> Self {
824            ArcDeltasClient(self.0.clone())
825        }
826    }
827
828    #[async_trait]
829    impl<T> DeltasClient for ArcDeltasClient<T>
830    where
831        T: DeltasClient + Sync + Send + 'static,
832    {
833        async fn subscribe(
834            &self,
835            extractor_id: ExtractorIdentity,
836            options: SubscriptionOptions,
837        ) -> Result<(Uuid, Receiver<BlockChanges>), DeltasError> {
838            self.0
839                .subscribe(extractor_id, options)
840                .await
841        }
842
843        async fn unsubscribe(&self, subscription_id: Uuid) -> Result<(), DeltasError> {
844            self.0
845                .unsubscribe(subscription_id)
846                .await
847        }
848
849        async fn connect(&self) -> Result<JoinHandle<Result<(), DeltasError>>, DeltasError> {
850            self.0.connect().await
851        }
852
853        async fn close(&self) -> Result<(), DeltasError> {
854            self.0.close().await
855        }
856    }
857
858    fn with_mocked_clients(
859        native: bool,
860        include_tvl: bool,
861        rpc_client: Option<MockRPCClient>,
862        deltas_client: Option<MockDeltasClient>,
863    ) -> ProtocolStateSynchronizer<ArcRPCClient<MockRPCClient>, ArcDeltasClient<MockDeltasClient>>
864    {
865        let rpc_client = ArcRPCClient(Arc::new(rpc_client.unwrap_or_default()));
866        let deltas_client = ArcDeltasClient(Arc::new(deltas_client.unwrap_or_default()));
867
868        ProtocolStateSynchronizer::new(
869            ExtractorIdentity::new(Chain::Ethereum, "uniswap-v2"),
870            native,
871            ComponentFilter::with_tvl_range(50.0, 50.0),
872            1,
873            Duration::from_secs(0),
874            true,
875            include_tvl,
876            true, // Does not matter as we mock the client that never compresses
877            rpc_client,
878            deltas_client,
879            10_u64,
880        )
881    }
882
883    fn state_snapshot_native() -> ProtocolStateRequestResponse {
884        ProtocolStateRequestResponse {
885            states: vec![ResponseProtocolState {
886                component_id: "Component1".to_string(),
887                ..Default::default()
888            }],
889            pagination: PaginationResponse { page: 0, page_size: 20, total: 1 },
890        }
891    }
892
893    fn make_mock_client() -> MockRPCClient {
894        let mut m = MockRPCClient::new();
895        m.expect_compression()
896            .return_const(false);
897        m
898    }
899
900    #[test_log::test(tokio::test)]
901    async fn test_get_snapshots_native() {
902        let header = BlockHeader::default();
903        let mut rpc = make_mock_client();
904        let component = ProtocolComponent { id: "Component1".to_string(), ..Default::default() };
905
906        let component_clone = component.clone();
907        rpc.expect_get_snapshots()
908            .returning(move |_request, _chunk_size, _concurrency| {
909                Ok(Snapshot {
910                    states: state_snapshot_native()
911                        .states
912                        .into_iter()
913                        .map(|state| {
914                            (
915                                state.component_id.clone(),
916                                ComponentWithState {
917                                    state,
918                                    component: component_clone.clone(),
919                                    entrypoints: vec![],
920                                    component_tvl: None,
921                                },
922                            )
923                        })
924                        .collect(),
925                    vm_storage: HashMap::new(),
926                })
927            });
928
929        rpc.expect_get_traced_entry_points()
930            .returning(|_| {
931                Ok(TracedEntryPointRequestResponse {
932                    traced_entry_points: HashMap::new(),
933                    pagination: PaginationResponse::new(0, 20, 0),
934                })
935            });
936
937        let mut state_sync = with_mocked_clients(true, false, Some(rpc), None);
938        state_sync
939            .component_tracker
940            .components
941            .insert("Component1".to_string(), component.clone());
942        let components_arg = ["Component1".to_string()];
943        let exp = StateSyncMessage {
944            header: header.clone(),
945            snapshots: Snapshot {
946                states: state_snapshot_native()
947                    .states
948                    .into_iter()
949                    .map(|state| {
950                        (
951                            state.component_id.clone(),
952                            ComponentWithState {
953                                state,
954                                component: component.clone(),
955                                entrypoints: vec![],
956                                component_tvl: None,
957                            },
958                        )
959                    })
960                    .collect(),
961                vm_storage: HashMap::new(),
962            },
963            deltas: None,
964            removed_components: Default::default(),
965        };
966
967        let snap = state_sync
968            .get_snapshots(header, Some(&components_arg))
969            .await
970            .expect("Retrieving snapshot failed");
971
972        assert_eq!(snap, exp);
973    }
974
975    #[test_log::test(tokio::test)]
976    async fn test_get_snapshots_native_with_tvl() {
977        let header = BlockHeader::default();
978        let mut rpc = make_mock_client();
979        let component = ProtocolComponent { id: "Component1".to_string(), ..Default::default() };
980
981        let component_clone = component.clone();
982        rpc.expect_get_snapshots()
983            .returning(move |_request, _chunk_size, _concurrency| {
984                Ok(Snapshot {
985                    states: state_snapshot_native()
986                        .states
987                        .into_iter()
988                        .map(|state| {
989                            (
990                                state.component_id.clone(),
991                                ComponentWithState {
992                                    state,
993                                    component: component_clone.clone(),
994                                    component_tvl: Some(100.0),
995                                    entrypoints: vec![],
996                                },
997                            )
998                        })
999                        .collect(),
1000                    vm_storage: HashMap::new(),
1001                })
1002            });
1003
1004        rpc.expect_get_traced_entry_points()
1005            .returning(|_| {
1006                Ok(TracedEntryPointRequestResponse {
1007                    traced_entry_points: HashMap::new(),
1008                    pagination: PaginationResponse::new(0, 20, 0),
1009                })
1010            });
1011
1012        let mut state_sync = with_mocked_clients(true, true, Some(rpc), None);
1013        state_sync
1014            .component_tracker
1015            .components
1016            .insert("Component1".to_string(), component.clone());
1017        let components_arg = ["Component1".to_string()];
1018        let exp = StateSyncMessage {
1019            header: header.clone(),
1020            snapshots: Snapshot {
1021                states: state_snapshot_native()
1022                    .states
1023                    .into_iter()
1024                    .map(|state| {
1025                        (
1026                            state.component_id.clone(),
1027                            ComponentWithState {
1028                                state,
1029                                component: component.clone(),
1030                                component_tvl: Some(100.0),
1031                                entrypoints: vec![],
1032                            },
1033                        )
1034                    })
1035                    .collect(),
1036                vm_storage: HashMap::new(),
1037            },
1038            deltas: None,
1039            removed_components: Default::default(),
1040        };
1041
1042        let snap = state_sync
1043            .get_snapshots(header, Some(&components_arg))
1044            .await
1045            .expect("Retrieving snapshot failed");
1046
1047        assert_eq!(snap, exp);
1048    }
1049
1050    fn state_snapshot_vm() -> StateRequestResponse {
1051        StateRequestResponse {
1052            accounts: vec![
1053                ResponseAccount { address: Bytes::from("0x0badc0ffee"), ..Default::default() },
1054                ResponseAccount { address: Bytes::from("0xbabe42"), ..Default::default() },
1055            ],
1056            pagination: PaginationResponse { page: 0, page_size: 20, total: 1 },
1057        }
1058    }
1059
1060    fn traced_entry_point_response() -> TracedEntryPointRequestResponse {
1061        TracedEntryPointRequestResponse {
1062            traced_entry_points: HashMap::from([(
1063                "Component1".to_string(),
1064                vec![(
1065                    EntryPointWithTracingParams {
1066                        entry_point: EntryPoint {
1067                            external_id: "entrypoint_a".to_string(),
1068                            target: Bytes::from("0x0badc0ffee"),
1069                            signature: "sig()".to_string(),
1070                        },
1071                        params: TracingParams::RPCTracer(RPCTracerParams {
1072                            caller: Some(Bytes::from("0x0badc0ffee")),
1073                            calldata: Bytes::from("0x0badc0ffee"),
1074                            state_overrides: None,
1075                            prune_addresses: None,
1076                        }),
1077                    },
1078                    TracingResult {
1079                        retriggers: HashSet::from([(
1080                            Bytes::from("0x0badc0ffee"),
1081                            AddressStorageLocation::new(Bytes::from("0x0badc0ffee"), 12),
1082                        )]),
1083                        accessed_slots: HashMap::from([(
1084                            Bytes::from("0x0badc0ffee"),
1085                            HashSet::from([Bytes::from("0xbadbeef0")]),
1086                        )]),
1087                    },
1088                )],
1089            )]),
1090            pagination: PaginationResponse::new(0, 20, 0),
1091        }
1092    }
1093
1094    #[test_log::test(tokio::test)]
1095    async fn test_get_snapshots_vm() {
1096        let header = BlockHeader::default();
1097        let mut rpc = make_mock_client();
1098
1099        let traced_ep_response = traced_entry_point_response();
1100        rpc.expect_get_snapshots()
1101            .returning(move |_request, _chunk_size, _concurrency| {
1102                let vm_storage_accounts = state_snapshot_vm();
1103                Ok(Snapshot {
1104                    states: [(
1105                        "Component1".to_string(),
1106                        ComponentWithState {
1107                            state: ResponseProtocolState {
1108                                component_id: "Component1".to_string(),
1109                                ..Default::default()
1110                            },
1111                            component: ProtocolComponent {
1112                                id: "Component1".to_string(),
1113                                contract_ids: vec![
1114                                    Bytes::from("0x0badc0ffee"),
1115                                    Bytes::from("0xbabe42"),
1116                                ],
1117                                ..Default::default()
1118                            },
1119                            component_tvl: None,
1120                            entrypoints: traced_ep_response
1121                                .traced_entry_points
1122                                .get("Component1")
1123                                .cloned()
1124                                .unwrap_or_default(),
1125                        },
1126                    )]
1127                    .into_iter()
1128                    .collect(),
1129                    vm_storage: vm_storage_accounts
1130                        .accounts
1131                        .into_iter()
1132                        .map(|state| (state.address.clone(), state))
1133                        .collect(),
1134                })
1135            });
1136
1137        rpc.expect_get_traced_entry_points()
1138            .returning(|_| Ok(traced_entry_point_response()));
1139
1140        let mut state_sync = with_mocked_clients(false, false, Some(rpc), None);
1141        let component = ProtocolComponent {
1142            id: "Component1".to_string(),
1143            contract_ids: vec![Bytes::from("0x0badc0ffee"), Bytes::from("0xbabe42")],
1144            ..Default::default()
1145        };
1146        state_sync
1147            .component_tracker
1148            .components
1149            .insert("Component1".to_string(), component.clone());
1150        let components_arg = ["Component1".to_string()];
1151        let exp = StateSyncMessage {
1152            header: header.clone(),
1153            snapshots: Snapshot {
1154                states: [(
1155                    component.id.clone(),
1156                    ComponentWithState {
1157                        state: ResponseProtocolState {
1158                            component_id: "Component1".to_string(),
1159                            ..Default::default()
1160                        },
1161                        component: component.clone(),
1162                        component_tvl: None,
1163                        entrypoints: vec![(
1164                            EntryPointWithTracingParams {
1165                                entry_point: EntryPoint {
1166                                    external_id: "entrypoint_a".to_string(),
1167                                    target: Bytes::from("0x0badc0ffee"),
1168                                    signature: "sig()".to_string(),
1169                                },
1170                                params: TracingParams::RPCTracer(RPCTracerParams {
1171                                    caller: Some(Bytes::from("0x0badc0ffee")),
1172                                    calldata: Bytes::from("0x0badc0ffee"),
1173                                    state_overrides: None,
1174                                    prune_addresses: None,
1175                                }),
1176                            },
1177                            TracingResult {
1178                                retriggers: HashSet::from([(
1179                                    Bytes::from("0x0badc0ffee"),
1180                                    AddressStorageLocation::new(Bytes::from("0x0badc0ffee"), 12),
1181                                )]),
1182                                accessed_slots: HashMap::from([(
1183                                    Bytes::from("0x0badc0ffee"),
1184                                    HashSet::from([Bytes::from("0xbadbeef0")]),
1185                                )]),
1186                            },
1187                        )],
1188                    },
1189                )]
1190                .into_iter()
1191                .collect(),
1192                vm_storage: state_snapshot_vm()
1193                    .accounts
1194                    .into_iter()
1195                    .map(|state| (state.address.clone(), state))
1196                    .collect(),
1197            },
1198            deltas: None,
1199            removed_components: Default::default(),
1200        };
1201
1202        let snap = state_sync
1203            .get_snapshots(header, Some(&components_arg))
1204            .await
1205            .expect("Retrieving snapshot failed");
1206
1207        assert_eq!(snap, exp);
1208    }
1209
1210    #[test_log::test(tokio::test)]
1211    async fn test_get_snapshots_vm_with_tvl() {
1212        let header = BlockHeader::default();
1213        let mut rpc = make_mock_client();
1214        let component = ProtocolComponent {
1215            id: "Component1".to_string(),
1216            contract_ids: vec![Bytes::from("0x0badc0ffee"), Bytes::from("0xbabe42")],
1217            ..Default::default()
1218        };
1219
1220        let component_clone = component.clone();
1221        rpc.expect_get_snapshots()
1222            .returning(move |_request, _chunk_size, _concurrency| {
1223                let vm_storage_accounts = state_snapshot_vm();
1224                Ok(Snapshot {
1225                    states: [(
1226                        "Component1".to_string(),
1227                        ComponentWithState {
1228                            state: ResponseProtocolState {
1229                                component_id: "Component1".to_string(),
1230                                ..Default::default()
1231                            },
1232                            component: component_clone.clone(),
1233                            component_tvl: Some(100.0),
1234                            entrypoints: vec![],
1235                        },
1236                    )]
1237                    .into_iter()
1238                    .collect(),
1239                    vm_storage: vm_storage_accounts
1240                        .accounts
1241                        .into_iter()
1242                        .map(|state| (state.address.clone(), state))
1243                        .collect(),
1244                })
1245            });
1246
1247        rpc.expect_get_traced_entry_points()
1248            .returning(|_| {
1249                Ok(TracedEntryPointRequestResponse {
1250                    traced_entry_points: HashMap::new(),
1251                    pagination: PaginationResponse::new(0, 20, 0),
1252                })
1253            });
1254
1255        let mut state_sync = with_mocked_clients(false, true, Some(rpc), None);
1256        state_sync
1257            .component_tracker
1258            .components
1259            .insert("Component1".to_string(), component.clone());
1260        let components_arg = ["Component1".to_string()];
1261        let exp = StateSyncMessage {
1262            header: header.clone(),
1263            snapshots: Snapshot {
1264                states: [(
1265                    component.id.clone(),
1266                    ComponentWithState {
1267                        state: ResponseProtocolState {
1268                            component_id: "Component1".to_string(),
1269                            ..Default::default()
1270                        },
1271                        component: component.clone(),
1272                        component_tvl: Some(100.0),
1273                        entrypoints: vec![],
1274                    },
1275                )]
1276                .into_iter()
1277                .collect(),
1278                vm_storage: state_snapshot_vm()
1279                    .accounts
1280                    .into_iter()
1281                    .map(|state| (state.address.clone(), state))
1282                    .collect(),
1283            },
1284            deltas: None,
1285            removed_components: Default::default(),
1286        };
1287
1288        let snap = state_sync
1289            .get_snapshots(header, Some(&components_arg))
1290            .await
1291            .expect("Retrieving snapshot failed");
1292
1293        assert_eq!(snap, exp);
1294    }
1295
1296    fn mock_clients_for_state_sync() -> (MockRPCClient, MockDeltasClient, Sender<BlockChanges>) {
1297        let mut rpc_client = make_mock_client();
1298        // Mocks for the start_tracking call, these need to come first because they are more
1299        // specific, see: https://docs.rs/mockall/latest/mockall/#matching-multiple-calls
1300        rpc_client
1301            .expect_get_protocol_components()
1302            .with(mockall::predicate::function(
1303                move |request_params: &ProtocolComponentsRequestBody| {
1304                    if let Some(ids) = request_params.component_ids.as_ref() {
1305                        ids.contains(&"Component3".to_string())
1306                    } else {
1307                        false
1308                    }
1309                },
1310            ))
1311            .returning(|_| {
1312                // return Component3
1313                Ok(ProtocolComponentRequestResponse {
1314                    protocol_components: vec![
1315                        // this component shall have a tvl update above threshold
1316                        ProtocolComponent { id: "Component3".to_string(), ..Default::default() },
1317                    ],
1318                    pagination: PaginationResponse { page: 0, page_size: 20, total: 1 },
1319                })
1320            });
1321        // Mock get_snapshots for Component3
1322        rpc_client
1323            .expect_get_snapshots()
1324            .withf(
1325                |request: &SnapshotParameters,
1326                 _chunk_size: &Option<usize>,
1327                 _concurrency: &usize| {
1328                    request
1329                        .components
1330                        .contains_key("Component3")
1331                },
1332            )
1333            .returning(|_request, _chunk_size, _concurrency| {
1334                Ok(Snapshot {
1335                    states: [(
1336                        "Component3".to_string(),
1337                        ComponentWithState {
1338                            state: ResponseProtocolState {
1339                                component_id: "Component3".to_string(),
1340                                ..Default::default()
1341                            },
1342                            component: ProtocolComponent {
1343                                id: "Component3".to_string(),
1344                                ..Default::default()
1345                            },
1346                            component_tvl: Some(1000.0),
1347                            entrypoints: vec![],
1348                        },
1349                    )]
1350                    .into_iter()
1351                    .collect(),
1352                    vm_storage: HashMap::new(),
1353                })
1354            });
1355
1356        // mock calls for the initial state snapshots
1357        rpc_client
1358            .expect_get_protocol_components()
1359            .returning(|_| {
1360                // Initial sync of components
1361                Ok(ProtocolComponentRequestResponse {
1362                    protocol_components: vec![
1363                        // this component shall have a tvl update above threshold
1364                        ProtocolComponent { id: "Component1".to_string(), ..Default::default() },
1365                        // this component shall have a tvl update below threshold.
1366                        ProtocolComponent { id: "Component2".to_string(), ..Default::default() },
1367                        // a third component will have a tvl update above threshold
1368                    ],
1369                    pagination: PaginationResponse { page: 0, page_size: 20, total: 1 },
1370                })
1371            });
1372
1373        rpc_client
1374            .expect_get_snapshots()
1375            .returning(|_request, _chunk_size, _concurrency| {
1376                Ok(Snapshot {
1377                    states: [
1378                        (
1379                            "Component1".to_string(),
1380                            ComponentWithState {
1381                                state: ResponseProtocolState {
1382                                    component_id: "Component1".to_string(),
1383                                    ..Default::default()
1384                                },
1385                                component: ProtocolComponent {
1386                                    id: "Component1".to_string(),
1387                                    ..Default::default()
1388                                },
1389                                component_tvl: Some(100.0),
1390                                entrypoints: vec![],
1391                            },
1392                        ),
1393                        (
1394                            "Component2".to_string(),
1395                            ComponentWithState {
1396                                state: ResponseProtocolState {
1397                                    component_id: "Component2".to_string(),
1398                                    ..Default::default()
1399                                },
1400                                component: ProtocolComponent {
1401                                    id: "Component2".to_string(),
1402                                    ..Default::default()
1403                                },
1404                                component_tvl: Some(0.0),
1405                                entrypoints: vec![],
1406                            },
1407                        ),
1408                    ]
1409                    .into_iter()
1410                    .collect(),
1411                    vm_storage: HashMap::new(),
1412                })
1413            });
1414
1415        // Mock get_traced_entry_points for Ethereum chain
1416        rpc_client
1417            .expect_get_traced_entry_points()
1418            .returning(|_| {
1419                Ok(TracedEntryPointRequestResponse {
1420                    traced_entry_points: HashMap::new(),
1421                    pagination: PaginationResponse { page: 0, page_size: 100, total: 0 },
1422                })
1423            });
1424
1425        // Mock deltas client and messages
1426        let mut deltas_client = MockDeltasClient::new();
1427        let (tx, rx) = channel(1);
1428        deltas_client
1429            .expect_subscribe()
1430            .return_once(move |_, _| {
1431                // Return subscriber id and a channel
1432                Ok((Uuid::default(), rx))
1433            });
1434
1435        // Expect unsubscribe call during cleanup
1436        deltas_client
1437            .expect_unsubscribe()
1438            .return_once(|_| Ok(()));
1439
1440        (rpc_client, deltas_client, tx)
1441    }
1442
1443    /// Test strategy
1444    ///
1445    /// - initial snapshot retrieval returns two component1 and component2 as snapshots
1446    /// - send 2 dummy messages, containing only blocks
1447    /// - third message contains a new component with some significant tvl, one initial component
1448    ///   slips below tvl threshold, another one is above tvl but does not get re-requested.
1449    #[test_log::test(tokio::test)]
1450    async fn test_state_sync() {
1451        let (rpc_client, deltas_client, tx) = mock_clients_for_state_sync();
1452        let deltas = [
1453            BlockChanges {
1454                extractor: "uniswap-v2".to_string(),
1455                chain: Chain::Ethereum,
1456                block: Block {
1457                    number: 1,
1458                    hash: Bytes::from("0x01"),
1459                    parent_hash: Bytes::from("0x00"),
1460                    chain: Chain::Ethereum,
1461                    ts: Default::default(),
1462                },
1463                revert: false,
1464                dci_update: DCIUpdate {
1465                    new_entrypoints: HashMap::from([(
1466                        "Component1".to_string(),
1467                        HashSet::from([EntryPoint {
1468                            external_id: "entrypoint_a".to_string(),
1469                            target: Bytes::from("0x0badc0ffee"),
1470                            signature: "sig()".to_string(),
1471                        }]),
1472                    )]),
1473                    new_entrypoint_params: HashMap::from([(
1474                        "entrypoint_a".to_string(),
1475                        HashSet::from([(
1476                            TracingParams::RPCTracer(RPCTracerParams {
1477                                caller: Some(Bytes::from("0x0badc0ffee")),
1478                                calldata: Bytes::from("0x0badc0ffee"),
1479                                state_overrides: None,
1480                                prune_addresses: None,
1481                            }),
1482                            "Component1".to_string(),
1483                        )]),
1484                    )]),
1485                    trace_results: HashMap::from([(
1486                        "entrypoint_a".to_string(),
1487                        TracingResult {
1488                            retriggers: HashSet::from([(
1489                                Bytes::from("0x0badc0ffee"),
1490                                AddressStorageLocation::new(Bytes::from("0x0badc0ffee"), 12),
1491                            )]),
1492                            accessed_slots: HashMap::from([(
1493                                Bytes::from("0x0badc0ffee"),
1494                                HashSet::from([Bytes::from("0xbadbeef0")]),
1495                            )]),
1496                        },
1497                    )]),
1498                },
1499                ..Default::default()
1500            },
1501            BlockChanges {
1502                extractor: "uniswap-v2".to_string(),
1503                chain: Chain::Ethereum,
1504                block: Block {
1505                    number: 2,
1506                    hash: Bytes::from("0x02"),
1507                    parent_hash: Bytes::from("0x01"),
1508                    chain: Chain::Ethereum,
1509                    ts: Default::default(),
1510                },
1511                revert: false,
1512                component_tvl: [
1513                    ("Component1".to_string(), 100.0),
1514                    ("Component2".to_string(), 0.0),
1515                    ("Component3".to_string(), 1000.0),
1516                ]
1517                .into_iter()
1518                .collect(),
1519                ..Default::default()
1520            },
1521        ];
1522        let mut state_sync = with_mocked_clients(true, true, Some(rpc_client), Some(deltas_client));
1523        state_sync
1524            .initialize()
1525            .await
1526            .expect("Init failed");
1527
1528        // Test starts here
1529        let (handle, mut rx) = state_sync.start().await;
1530        let (jh, close_tx) = handle.split();
1531        tx.send(deltas[0].clone())
1532            .await
1533            .expect("deltas channel msg 0 closed!");
1534        let first_msg = timeout(Duration::from_millis(100), rx.recv())
1535            .await
1536            .expect("waiting for first state msg timed out!")
1537            .expect("state sync block sender closed!");
1538        tx.send(deltas[1].clone())
1539            .await
1540            .expect("deltas channel msg 1 closed!");
1541        let second_msg = timeout(Duration::from_millis(100), rx.recv())
1542            .await
1543            .expect("waiting for second state msg timed out!")
1544            .expect("state sync block sender closed!");
1545        let _ = close_tx.send(());
1546        jh.await
1547            .expect("state sync task panicked!");
1548
1549        // assertions
1550        let exp1 = StateSyncMessage {
1551            header: BlockHeader {
1552                number: 1,
1553                hash: Bytes::from("0x01"),
1554                parent_hash: Bytes::from("0x00"),
1555                revert: false,
1556                ..Default::default()
1557            },
1558            snapshots: Snapshot {
1559                states: [
1560                    (
1561                        "Component1".to_string(),
1562                        ComponentWithState {
1563                            state: ResponseProtocolState {
1564                                component_id: "Component1".to_string(),
1565                                ..Default::default()
1566                            },
1567                            component: ProtocolComponent {
1568                                id: "Component1".to_string(),
1569                                ..Default::default()
1570                            },
1571                            component_tvl: Some(100.0),
1572                            entrypoints: vec![],
1573                        },
1574                    ),
1575                    (
1576                        "Component2".to_string(),
1577                        ComponentWithState {
1578                            state: ResponseProtocolState {
1579                                component_id: "Component2".to_string(),
1580                                ..Default::default()
1581                            },
1582                            component: ProtocolComponent {
1583                                id: "Component2".to_string(),
1584                                ..Default::default()
1585                            },
1586                            component_tvl: Some(0.0),
1587                            entrypoints: vec![],
1588                        },
1589                    ),
1590                ]
1591                .into_iter()
1592                .collect(),
1593                vm_storage: HashMap::new(),
1594            },
1595            deltas: Some(deltas[0].clone()),
1596            removed_components: Default::default(),
1597        };
1598
1599        let exp2 = StateSyncMessage {
1600            header: BlockHeader {
1601                number: 2,
1602                hash: Bytes::from("0x02"),
1603                parent_hash: Bytes::from("0x01"),
1604                revert: false,
1605                ..Default::default()
1606            },
1607            snapshots: Snapshot {
1608                states: [
1609                    // This is the new component we queried once it passed the tvl threshold.
1610                    (
1611                        "Component3".to_string(),
1612                        ComponentWithState {
1613                            state: ResponseProtocolState {
1614                                component_id: "Component3".to_string(),
1615                                ..Default::default()
1616                            },
1617                            component: ProtocolComponent {
1618                                id: "Component3".to_string(),
1619                                ..Default::default()
1620                            },
1621                            component_tvl: Some(1000.0),
1622                            entrypoints: vec![],
1623                        },
1624                    ),
1625                ]
1626                .into_iter()
1627                .collect(),
1628                vm_storage: HashMap::new(),
1629            },
1630            // Our deltas are empty and since merge methods are
1631            // tested in tycho-common we don't have much to do here.
1632            deltas: Some(BlockChanges {
1633                extractor: "uniswap-v2".to_string(),
1634                chain: Chain::Ethereum,
1635                block: Block {
1636                    number: 2,
1637                    hash: Bytes::from("0x02"),
1638                    parent_hash: Bytes::from("0x01"),
1639                    chain: Chain::Ethereum,
1640                    ts: Default::default(),
1641                },
1642                revert: false,
1643                component_tvl: [
1644                    // "Component2" should not show here.
1645                    ("Component1".to_string(), 100.0),
1646                    ("Component3".to_string(), 1000.0),
1647                ]
1648                .into_iter()
1649                .collect(),
1650                ..Default::default()
1651            }),
1652            // "Component2" was removed, because its tvl changed to 0.
1653            removed_components: [(
1654                "Component2".to_string(),
1655                ProtocolComponent { id: "Component2".to_string(), ..Default::default() },
1656            )]
1657            .into_iter()
1658            .collect(),
1659        };
1660        assert_eq!(first_msg.unwrap(), exp1);
1661        assert_eq!(second_msg.unwrap(), exp2);
1662    }
1663
1664    #[test_log::test(tokio::test)]
1665    async fn test_state_sync_with_tvl_range() {
1666        // Define the range for testing
1667        let remove_tvl_threshold = 5.0;
1668        let add_tvl_threshold = 7.0;
1669
1670        let mut rpc_client = make_mock_client();
1671        let mut deltas_client = MockDeltasClient::new();
1672
1673        rpc_client
1674            .expect_get_protocol_components()
1675            .with(mockall::predicate::function(
1676                move |request_params: &ProtocolComponentsRequestBody| {
1677                    if let Some(ids) = request_params.component_ids.as_ref() {
1678                        ids.contains(&"Component3".to_string())
1679                    } else {
1680                        false
1681                    }
1682                },
1683            ))
1684            .returning(|_| {
1685                Ok(ProtocolComponentRequestResponse {
1686                    protocol_components: vec![ProtocolComponent {
1687                        id: "Component3".to_string(),
1688                        ..Default::default()
1689                    }],
1690                    pagination: PaginationResponse { page: 0, page_size: 20, total: 1 },
1691                })
1692            });
1693        // Mock get_snapshots for Component3
1694        rpc_client
1695            .expect_get_snapshots()
1696            .withf(
1697                |request: &SnapshotParameters,
1698                 _chunk_size: &Option<usize>,
1699                 _concurrency: &usize| {
1700                    request
1701                        .components
1702                        .contains_key("Component3")
1703                },
1704            )
1705            .returning(|_request, _chunk_size, _concurrency| {
1706                Ok(Snapshot {
1707                    states: [(
1708                        "Component3".to_string(),
1709                        ComponentWithState {
1710                            state: ResponseProtocolState {
1711                                component_id: "Component3".to_string(),
1712                                ..Default::default()
1713                            },
1714                            component: ProtocolComponent {
1715                                id: "Component3".to_string(),
1716                                ..Default::default()
1717                            },
1718                            component_tvl: Some(10.0),
1719                            entrypoints: vec![],
1720                        },
1721                    )]
1722                    .into_iter()
1723                    .collect(),
1724                    vm_storage: HashMap::new(),
1725                })
1726            });
1727
1728        // Mock for the initial snapshot retrieval
1729        rpc_client
1730            .expect_get_protocol_components()
1731            .returning(|_| {
1732                Ok(ProtocolComponentRequestResponse {
1733                    protocol_components: vec![
1734                        ProtocolComponent { id: "Component1".to_string(), ..Default::default() },
1735                        ProtocolComponent { id: "Component2".to_string(), ..Default::default() },
1736                    ],
1737                    pagination: PaginationResponse { page: 0, page_size: 20, total: 1 },
1738                })
1739            });
1740
1741        // Mock get_snapshots for initial snapshot
1742        rpc_client
1743            .expect_get_snapshots()
1744            .returning(|_request, _chunk_size, _concurrency| {
1745                Ok(Snapshot {
1746                    states: [
1747                        (
1748                            "Component1".to_string(),
1749                            ComponentWithState {
1750                                state: ResponseProtocolState {
1751                                    component_id: "Component1".to_string(),
1752                                    ..Default::default()
1753                                },
1754                                component: ProtocolComponent {
1755                                    id: "Component1".to_string(),
1756                                    ..Default::default()
1757                                },
1758                                component_tvl: Some(6.0),
1759                                entrypoints: vec![],
1760                            },
1761                        ),
1762                        (
1763                            "Component2".to_string(),
1764                            ComponentWithState {
1765                                state: ResponseProtocolState {
1766                                    component_id: "Component2".to_string(),
1767                                    ..Default::default()
1768                                },
1769                                component: ProtocolComponent {
1770                                    id: "Component2".to_string(),
1771                                    ..Default::default()
1772                                },
1773                                component_tvl: Some(2.0),
1774                                entrypoints: vec![],
1775                            },
1776                        ),
1777                    ]
1778                    .into_iter()
1779                    .collect(),
1780                    vm_storage: HashMap::new(),
1781                })
1782            });
1783
1784        // Mock get_traced_entry_points for Ethereum chain
1785        rpc_client
1786            .expect_get_traced_entry_points()
1787            .returning(|_| {
1788                Ok(TracedEntryPointRequestResponse {
1789                    traced_entry_points: HashMap::new(),
1790                    pagination: PaginationResponse { page: 0, page_size: 100, total: 0 },
1791                })
1792            });
1793
1794        let (tx, rx) = channel(1);
1795        deltas_client
1796            .expect_subscribe()
1797            .return_once(move |_, _| Ok((Uuid::default(), rx)));
1798
1799        // Expect unsubscribe call during cleanup
1800        deltas_client
1801            .expect_unsubscribe()
1802            .return_once(|_| Ok(()));
1803
1804        let mut state_sync = ProtocolStateSynchronizer::new(
1805            ExtractorIdentity::new(Chain::Ethereum, "uniswap-v2"),
1806            true,
1807            ComponentFilter::with_tvl_range(remove_tvl_threshold, add_tvl_threshold),
1808            1,
1809            Duration::from_secs(0),
1810            true,
1811            true,
1812            true,
1813            ArcRPCClient(Arc::new(rpc_client)),
1814            ArcDeltasClient(Arc::new(deltas_client)),
1815            10_u64,
1816        );
1817        state_sync
1818            .initialize()
1819            .await
1820            .expect("Init failed");
1821
1822        // Simulate the incoming BlockChanges
1823        let deltas = [
1824            BlockChanges {
1825                extractor: "uniswap-v2".to_string(),
1826                chain: Chain::Ethereum,
1827                block: Block {
1828                    number: 1,
1829                    hash: Bytes::from("0x01"),
1830                    parent_hash: Bytes::from("0x00"),
1831                    chain: Chain::Ethereum,
1832                    ts: Default::default(),
1833                },
1834                revert: false,
1835                ..Default::default()
1836            },
1837            BlockChanges {
1838                extractor: "uniswap-v2".to_string(),
1839                chain: Chain::Ethereum,
1840                block: Block {
1841                    number: 2,
1842                    hash: Bytes::from("0x02"),
1843                    parent_hash: Bytes::from("0x01"),
1844                    chain: Chain::Ethereum,
1845                    ts: Default::default(),
1846                },
1847                revert: false,
1848                component_tvl: [
1849                    ("Component1".to_string(), 6.0), // Within range, should not trigger changes
1850                    ("Component2".to_string(), 2.0), // Below lower threshold, should be removed
1851                    ("Component3".to_string(), 10.0), // Above upper threshold, should be added
1852                ]
1853                .into_iter()
1854                .collect(),
1855                ..Default::default()
1856            },
1857        ];
1858
1859        let (handle, mut rx) = state_sync.start().await;
1860        let (jh, close_tx) = handle.split();
1861
1862        // Simulate sending delta messages
1863        tx.send(deltas[0].clone())
1864            .await
1865            .expect("deltas channel msg 0 closed!");
1866
1867        // Expecting to receive the initial state message
1868        let _ = timeout(Duration::from_millis(100), rx.recv())
1869            .await
1870            .expect("waiting for first state msg timed out!")
1871            .expect("state sync block sender closed!");
1872
1873        // Send the third message, which should trigger TVL-based changes
1874        tx.send(deltas[1].clone())
1875            .await
1876            .expect("deltas channel msg 1 closed!");
1877        let second_msg = timeout(Duration::from_millis(100), rx.recv())
1878            .await
1879            .expect("waiting for second state msg timed out!")
1880            .expect("state sync block sender closed!")
1881            .expect("no error");
1882
1883        let _ = close_tx.send(());
1884        jh.await
1885            .expect("state sync task panicked!");
1886
1887        let expected_second_msg = StateSyncMessage {
1888            header: BlockHeader {
1889                number: 2,
1890                hash: Bytes::from("0x02"),
1891                parent_hash: Bytes::from("0x01"),
1892                revert: false,
1893                ..Default::default()
1894            },
1895            snapshots: Snapshot {
1896                states: [(
1897                    "Component3".to_string(),
1898                    ComponentWithState {
1899                        state: ResponseProtocolState {
1900                            component_id: "Component3".to_string(),
1901                            ..Default::default()
1902                        },
1903                        component: ProtocolComponent {
1904                            id: "Component3".to_string(),
1905                            ..Default::default()
1906                        },
1907                        component_tvl: Some(10.0),
1908                        entrypoints: vec![], // TODO: add entrypoints?
1909                    },
1910                )]
1911                .into_iter()
1912                .collect(),
1913                vm_storage: HashMap::new(),
1914            },
1915            deltas: Some(BlockChanges {
1916                extractor: "uniswap-v2".to_string(),
1917                chain: Chain::Ethereum,
1918                block: Block {
1919                    number: 2,
1920                    hash: Bytes::from("0x02"),
1921                    parent_hash: Bytes::from("0x01"),
1922                    chain: Chain::Ethereum,
1923                    ts: Default::default(),
1924                },
1925                revert: false,
1926                component_tvl: [
1927                    ("Component1".to_string(), 6.0), // Within range, should not trigger changes
1928                    ("Component3".to_string(), 10.0), // Above upper threshold, should be added
1929                ]
1930                .into_iter()
1931                .collect(),
1932                ..Default::default()
1933            }),
1934            removed_components: [(
1935                "Component2".to_string(),
1936                ProtocolComponent { id: "Component2".to_string(), ..Default::default() },
1937            )]
1938            .into_iter()
1939            .collect(),
1940        };
1941
1942        assert_eq!(second_msg, expected_second_msg);
1943    }
1944
1945    #[test_log::test(tokio::test)]
1946    async fn test_public_close_api_functionality() {
1947        // Tests the public close() API through the StateSynchronizer trait:
1948        // - close() fails before start() is called
1949        // - close() succeeds while synchronizer is running
1950        // - close() fails after already closed
1951        // This tests the full start/close lifecycle via the public API
1952
1953        let mut rpc_client = make_mock_client();
1954        let mut deltas_client = MockDeltasClient::new();
1955
1956        // Mock the initial components call
1957        rpc_client
1958            .expect_get_protocol_components()
1959            .returning(|_| {
1960                Ok(ProtocolComponentRequestResponse {
1961                    protocol_components: vec![],
1962                    pagination: PaginationResponse { page: 0, page_size: 20, total: 0 },
1963                })
1964            });
1965
1966        // Set up deltas client that will wait for messages (blocking in state_sync)
1967        let (_tx, rx) = channel(1);
1968        deltas_client
1969            .expect_subscribe()
1970            .return_once(move |_, _| Ok((Uuid::default(), rx)));
1971
1972        // Expect unsubscribe call during cleanup
1973        deltas_client
1974            .expect_unsubscribe()
1975            .return_once(|_| Ok(()));
1976
1977        let mut state_sync = ProtocolStateSynchronizer::new(
1978            ExtractorIdentity::new(Chain::Ethereum, "test-protocol"),
1979            true,
1980            ComponentFilter::with_tvl_range(0.0, 0.0),
1981            5, // Enough retries
1982            Duration::from_secs(0),
1983            true,
1984            false,
1985            true,
1986            ArcRPCClient(Arc::new(rpc_client)),
1987            ArcDeltasClient(Arc::new(deltas_client)),
1988            10000_u64, // Long timeout so task doesn't exit on its own
1989        );
1990
1991        state_sync
1992            .initialize()
1993            .await
1994            .expect("Init should succeed");
1995
1996        // Start the synchronizer and test the new split-based close mechanism
1997        let (handle, _rx) = state_sync.start().await;
1998        let (jh, close_tx) = handle.split();
1999
2000        // Give it time to start up and enter state_sync
2001        tokio::time::sleep(Duration::from_millis(100)).await;
2002
2003        // Send close signal should succeed
2004        close_tx
2005            .send(())
2006            .expect("Should be able to send close signal");
2007        // Task should stop cleanly
2008        jh.await.expect("Task should not panic");
2009    }
2010
2011    #[test_log::test(tokio::test)]
2012    async fn test_cleanup_runs_when_state_sync_processing_errors() {
2013        // Tests that cleanup code runs when state_sync() errors during delta processing.
2014        // Specifically tests: RPC errors during snapshot retrieval cause proper cleanup.
2015        // Verifies: shared.last_synced_block reset + subscription unsubscribe on errors
2016
2017        let mut rpc_client = make_mock_client();
2018        let mut deltas_client = MockDeltasClient::new();
2019
2020        // Mock the initial components call
2021        rpc_client
2022            .expect_get_protocol_components()
2023            .returning(|_| {
2024                Ok(ProtocolComponentRequestResponse {
2025                    protocol_components: vec![],
2026                    pagination: PaginationResponse { page: 0, page_size: 20, total: 0 },
2027                })
2028            });
2029
2030        // Mock to fail during snapshot retrieval (this will cause an error during processing)
2031        rpc_client
2032            .expect_get_protocol_states()
2033            .returning(|_| {
2034                Err(RPCError::ParseResponse("Test error during snapshot retrieval".to_string()))
2035            });
2036
2037        // Set up deltas client to send one message that will trigger snapshot retrieval
2038        let (tx, rx) = channel(10);
2039        deltas_client
2040            .expect_subscribe()
2041            .return_once(move |_, _| {
2042                // Send a delta message that will require a snapshot
2043                let delta = BlockChanges {
2044                    extractor: "test".to_string(),
2045                    chain: Chain::Ethereum,
2046                    block: Block {
2047                        hash: Bytes::from("0x0123"),
2048                        number: 1,
2049                        parent_hash: Bytes::from("0x0000"),
2050                        chain: Chain::Ethereum,
2051                        ts: chrono::DateTime::from_timestamp(1234567890, 0)
2052                            .unwrap()
2053                            .naive_utc(),
2054                    },
2055                    revert: false,
2056                    // Add a new component to trigger snapshot request
2057                    new_protocol_components: [(
2058                        "new_component".to_string(),
2059                        ProtocolComponent {
2060                            id: "new_component".to_string(),
2061                            protocol_system: "test_protocol".to_string(),
2062                            protocol_type_name: "test".to_string(),
2063                            chain: Chain::Ethereum,
2064                            tokens: vec![Bytes::from("0x0badc0ffee")],
2065                            contract_ids: vec![Bytes::from("0x0badc0ffee")],
2066                            static_attributes: Default::default(),
2067                            creation_tx: Default::default(),
2068                            created_at: Default::default(),
2069                            change: Default::default(),
2070                        },
2071                    )]
2072                    .into_iter()
2073                    .collect(),
2074                    component_tvl: [("new_component".to_string(), 100.0)]
2075                        .into_iter()
2076                        .collect(),
2077                    ..Default::default()
2078                };
2079
2080                tokio::spawn(async move {
2081                    let _ = tx.send(delta).await;
2082                    // Close the channel after sending one message
2083                });
2084
2085                Ok((Uuid::default(), rx))
2086            });
2087
2088        // Expect unsubscribe call during cleanup
2089        deltas_client
2090            .expect_unsubscribe()
2091            .return_once(|_| Ok(()));
2092
2093        let mut state_sync = ProtocolStateSynchronizer::new(
2094            ExtractorIdentity::new(Chain::Ethereum, "test-protocol"),
2095            true,
2096            ComponentFilter::with_tvl_range(0.0, 1000.0), // Include the component
2097            1,
2098            Duration::from_secs(0),
2099            true,
2100            false,
2101            true,
2102            ArcRPCClient(Arc::new(rpc_client)),
2103            ArcDeltasClient(Arc::new(deltas_client)),
2104            5000_u64,
2105        );
2106
2107        state_sync
2108            .initialize()
2109            .await
2110            .expect("Init should succeed");
2111
2112        // Before calling state_sync, set a value in last_synced_block
2113        state_sync.last_synced_block = Some(BlockHeader {
2114            hash: Bytes::from("0x0badc0ffee"),
2115            number: 42,
2116            parent_hash: Bytes::from("0xbadbeef0"),
2117            revert: false,
2118            timestamp: 123456789,
2119            partial_block_index: None,
2120        });
2121
2122        // Create a channel for state_sync to send messages to
2123        let (mut block_tx, _block_rx) = channel(10);
2124
2125        // Call state_sync directly - this should error during processing
2126        let (_end_tx, end_rx) = oneshot::channel::<()>();
2127        let result = state_sync
2128            .state_sync(&mut block_tx, end_rx)
2129            .await;
2130        // Verify that state_sync returned an error
2131        assert!(result.is_err(), "state_sync should have errored during processing");
2132
2133        // Note: We can't verify internal state cleanup since state_sync consumes self,
2134        // but the cleanup logic is still tested by the fact that the method returns properly.
2135    }
2136
2137    #[test_log::test(tokio::test)]
2138    async fn test_close_signal_while_waiting_for_first_deltas() {
2139        // Tests close signal handling during the initial "waiting for deltas" phase.
2140        // This is the earliest possible close scenario - before any deltas are received.
2141        // Verifies: close signal received while waiting for first message triggers cleanup
2142        let mut rpc_client = make_mock_client();
2143        let mut deltas_client = MockDeltasClient::new();
2144
2145        rpc_client
2146            .expect_get_protocol_components()
2147            .returning(|_| {
2148                Ok(ProtocolComponentRequestResponse {
2149                    protocol_components: vec![],
2150                    pagination: PaginationResponse { page: 0, page_size: 20, total: 0 },
2151                })
2152            });
2153
2154        let (_tx, rx) = channel(1);
2155        deltas_client
2156            .expect_subscribe()
2157            .return_once(move |_, _| Ok((Uuid::default(), rx)));
2158
2159        deltas_client
2160            .expect_unsubscribe()
2161            .return_once(|_| Ok(()));
2162
2163        let mut state_sync = ProtocolStateSynchronizer::new(
2164            ExtractorIdentity::new(Chain::Ethereum, "test-protocol"),
2165            true,
2166            ComponentFilter::with_tvl_range(0.0, 0.0),
2167            1,
2168            Duration::from_secs(0),
2169            true,
2170            true,
2171            false,
2172            ArcRPCClient(Arc::new(rpc_client)),
2173            ArcDeltasClient(Arc::new(deltas_client)),
2174            10000_u64,
2175        );
2176
2177        state_sync
2178            .initialize()
2179            .await
2180            .expect("Init should succeed");
2181
2182        let (mut block_tx, _block_rx) = channel(10);
2183        let (end_tx, end_rx) = oneshot::channel::<()>();
2184
2185        // Start state_sync in a task
2186        let state_sync_handle = tokio::spawn(async move {
2187            state_sync
2188                .state_sync(&mut block_tx, end_rx)
2189                .await
2190        });
2191
2192        // Give it a moment to start
2193        tokio::time::sleep(Duration::from_millis(100)).await;
2194
2195        // Send close signal
2196        let _ = end_tx.send(());
2197
2198        // state_sync should exit cleanly
2199        let result = state_sync_handle
2200            .await
2201            .expect("Task should not panic");
2202        assert!(result.is_ok(), "state_sync should exit cleanly when closed: {result:?}");
2203
2204        println!("SUCCESS: Close signal handled correctly while waiting for first deltas");
2205    }
2206
2207    #[test_log::test(tokio::test)]
2208    async fn test_close_signal_during_main_processing_loop() {
2209        // Tests close signal handling during the main delta processing loop.
2210        // This tests the scenario where first message is processed successfully,
2211        // then close signal is received while waiting for subsequent deltas.
2212        // Verifies: close signal in main loop (after initialization) triggers cleanup
2213
2214        let mut rpc_client = make_mock_client();
2215        let mut deltas_client = MockDeltasClient::new();
2216
2217        // Mock the initial components call
2218        rpc_client
2219            .expect_get_protocol_components()
2220            .returning(|_| {
2221                Ok(ProtocolComponentRequestResponse {
2222                    protocol_components: vec![],
2223                    pagination: PaginationResponse { page: 0, page_size: 20, total: 0 },
2224                })
2225            });
2226
2227        // Mock the snapshot retrieval that happens after first message
2228        rpc_client
2229            .expect_get_protocol_states()
2230            .returning(|_| {
2231                Ok(ProtocolStateRequestResponse {
2232                    states: vec![],
2233                    pagination: PaginationResponse { page: 0, page_size: 20, total: 0 },
2234                })
2235            });
2236
2237        rpc_client
2238            .expect_get_component_tvl()
2239            .returning(|_| {
2240                Ok(ComponentTvlRequestResponse {
2241                    tvl: HashMap::new(),
2242                    pagination: PaginationResponse { page: 0, page_size: 20, total: 0 },
2243                })
2244            });
2245
2246        rpc_client
2247            .expect_get_traced_entry_points()
2248            .returning(|_| {
2249                Ok(TracedEntryPointRequestResponse {
2250                    traced_entry_points: HashMap::new(),
2251                    pagination: PaginationResponse::new(0, 20, 0),
2252                })
2253            });
2254
2255        // Set up deltas client to send one message, then keep channel open
2256        let (tx, rx) = channel(10);
2257        deltas_client
2258            .expect_subscribe()
2259            .return_once(move |_, _| {
2260                // Send first message immediately
2261                let first_delta = BlockChanges {
2262                    extractor: "test".to_string(),
2263                    chain: Chain::Ethereum,
2264                    block: Block {
2265                        hash: Bytes::from("0x0123"),
2266                        number: 1,
2267                        parent_hash: Bytes::from("0x0000"),
2268                        chain: Chain::Ethereum,
2269                        ts: chrono::DateTime::from_timestamp(1234567890, 0)
2270                            .unwrap()
2271                            .naive_utc(),
2272                    },
2273                    revert: false,
2274                    ..Default::default()
2275                };
2276
2277                tokio::spawn(async move {
2278                    let _ = tx.send(first_delta).await;
2279                    // Keep the sender alive but don't send more messages
2280                    // This will make the recv() block waiting for the next message
2281                    tokio::time::sleep(Duration::from_secs(30)).await;
2282                });
2283
2284                Ok((Uuid::default(), rx))
2285            });
2286
2287        deltas_client
2288            .expect_unsubscribe()
2289            .return_once(|_| Ok(()));
2290
2291        let mut state_sync = ProtocolStateSynchronizer::new(
2292            ExtractorIdentity::new(Chain::Ethereum, "test-protocol"),
2293            true,
2294            ComponentFilter::with_tvl_range(0.0, 1000.0),
2295            1,
2296            Duration::from_secs(0),
2297            true,
2298            false,
2299            true,
2300            ArcRPCClient(Arc::new(rpc_client)),
2301            ArcDeltasClient(Arc::new(deltas_client)),
2302            10000_u64,
2303        );
2304
2305        state_sync
2306            .initialize()
2307            .await
2308            .expect("Init should succeed");
2309
2310        let (mut block_tx, mut block_rx) = channel(10);
2311        let (end_tx, end_rx) = oneshot::channel::<()>();
2312
2313        // Start state_sync in a task
2314        let state_sync_handle = tokio::spawn(async move {
2315            state_sync
2316                .state_sync(&mut block_tx, end_rx)
2317                .await
2318        });
2319
2320        // Wait for the first message to be processed (snapshot sent)
2321        let first_snapshot = block_rx
2322            .recv()
2323            .await
2324            .expect("Should receive first snapshot")
2325            .expect("Synchronizer error");
2326        assert!(
2327            !first_snapshot
2328                .snapshots
2329                .states
2330                .is_empty() ||
2331                first_snapshot.deltas.is_some()
2332        );
2333        // Now send close signal - this should be handled in the main processing loop
2334        let _ = end_tx.send(());
2335
2336        // state_sync should exit cleanly after receiving close signal in main loop
2337        let result = state_sync_handle
2338            .await
2339            .expect("Task should not panic");
2340        assert!(
2341            result.is_ok(),
2342            "state_sync should exit cleanly when closed after first message: {result:?}"
2343        );
2344    }
2345
2346    #[test_log::test(tokio::test)]
2347    async fn test_max_retries_exceeded_error_propagation() {
2348        // Test that when max_retries is exceeded, the final error is sent through the channel
2349        // to the receiver and the synchronizer task exits cleanly
2350
2351        let mut rpc_client = make_mock_client();
2352        let mut deltas_client = MockDeltasClient::new();
2353
2354        // Mock the initial components call to succeed
2355        rpc_client
2356            .expect_get_protocol_components()
2357            .returning(|_| {
2358                Ok(ProtocolComponentRequestResponse {
2359                    protocol_components: vec![],
2360                    pagination: PaginationResponse { page: 0, page_size: 20, total: 0 },
2361                })
2362            });
2363
2364        // Set up deltas client to consistently fail after subscription
2365        // This will cause connection errors and trigger retries
2366        deltas_client
2367            .expect_subscribe()
2368            .returning(|_, _| {
2369                // Return a connection error to trigger retries
2370                Err(DeltasError::NotConnected)
2371            });
2372
2373        // Expect multiple unsubscribe calls during retries
2374        deltas_client
2375            .expect_unsubscribe()
2376            .returning(|_| Ok(()))
2377            .times(0..=5);
2378
2379        // Create synchronizer with only 2 retries and short cooldown
2380        let mut state_sync = ProtocolStateSynchronizer::new(
2381            ExtractorIdentity::new(Chain::Ethereum, "test-protocol"),
2382            true,
2383            ComponentFilter::with_tvl_range(0.0, 1000.0),
2384            2,                         // max_retries = 2
2385            Duration::from_millis(10), // short retry cooldown
2386            true,
2387            false,
2388            true,
2389            ArcRPCClient(Arc::new(rpc_client)),
2390            ArcDeltasClient(Arc::new(deltas_client)),
2391            1000_u64,
2392        );
2393
2394        state_sync
2395            .initialize()
2396            .await
2397            .expect("Init should succeed");
2398
2399        // Start the synchronizer - it should fail to subscribe and retry
2400        let (handle, mut rx) = state_sync.start().await;
2401        let (jh, _close_tx) = handle.split();
2402
2403        let res = tokio::time::timeout(Duration::from_millis(100), rx.recv())
2404            .await
2405            .expect("responsds in time")
2406            .expect("channel open");
2407
2408        // Verify the error is a ConnectionClosed error (converted from DeltasError::NotConnected)
2409        if let Err(err) = res {
2410            assert!(
2411                matches!(err, SynchronizerError::ConnectionClosed),
2412                "Expected ConnectionClosed error, got: {:?}",
2413                err
2414            );
2415        } else {
2416            panic!("Expected an error")
2417        }
2418
2419        // The task should complete (not hang) after max retries
2420        let task_result = tokio::time::timeout(Duration::from_secs(2), jh).await;
2421        assert!(task_result.is_ok(), "Synchronizer task should complete after max retries");
2422    }
2423
2424    #[test_log::test(tokio::test)]
2425    async fn test_is_next_expected() {
2426        // Test the is_next_expected function to ensure it correctly identifies
2427        // when an incoming block is the expected next block in the chain
2428
2429        let mut state_sync = with_mocked_clients(true, false, None, None);
2430
2431        // Test 1: No previous block - should return false
2432        let incoming_header = BlockHeader {
2433            number: 100,
2434            hash: Bytes::from("0x0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"),
2435            parent_hash: Bytes::from(
2436                "0x0000000000000000000000000000000000000000000000000000000000000000",
2437            ),
2438            revert: false,
2439            timestamp: 123456789,
2440            partial_block_index: None,
2441        };
2442        assert!(
2443            !state_sync.is_next_expected(&incoming_header),
2444            "Should return false when no previous block is set"
2445        );
2446
2447        // Test 2: Set a previous block and test with matching parent hash
2448        let previous_header = BlockHeader {
2449            number: 99,
2450            hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000000"),
2451            parent_hash: Bytes::from(
2452                "0xabcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789",
2453            ),
2454            revert: false,
2455            timestamp: 123456788,
2456            partial_block_index: None,
2457        };
2458        state_sync.last_synced_block = Some(previous_header.clone());
2459
2460        assert!(
2461            state_sync.is_next_expected(&incoming_header),
2462            "Should return true when incoming parent_hash matches previous hash"
2463        );
2464
2465        // Test 3: Test with non-matching parent hash
2466        let non_matching_header = BlockHeader {
2467            number: 100,
2468            hash: Bytes::from("0x0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"),
2469            parent_hash: Bytes::from(
2470                "0x1111111111111111111111111111111111111111111111111111111111111111",
2471            ), // Wrong parent hash
2472            revert: false,
2473            timestamp: 123456789,
2474            partial_block_index: None,
2475        };
2476        assert!(
2477            !state_sync.is_next_expected(&non_matching_header),
2478            "Should return false when incoming parent_hash doesn't match previous hash"
2479        );
2480    }
2481
2482    #[test_log::test(tokio::test)]
2483    async fn test_synchronizer_restart_skip_snapshot_on_expected_block() {
2484        // Test that on synchronizer restart with the next expected block,
2485        // get_snapshot is not called and only deltas are sent
2486
2487        let mut rpc_client = make_mock_client();
2488        let mut deltas_client = MockDeltasClient::new();
2489
2490        // Mock the initial components call
2491        rpc_client
2492            .expect_get_protocol_components()
2493            .returning(|_| {
2494                Ok(ProtocolComponentRequestResponse {
2495                    protocol_components: vec![ProtocolComponent {
2496                        id: "Component1".to_string(),
2497                        ..Default::default()
2498                    }],
2499                    pagination: PaginationResponse { page: 0, page_size: 20, total: 1 },
2500                })
2501            });
2502
2503        // Set up deltas client to send a message that is the next expected block
2504        let (tx, rx) = channel(10);
2505        deltas_client
2506            .expect_subscribe()
2507            .return_once(move |_, _| {
2508                let expected_next_delta = BlockChanges {
2509                    extractor: "uniswap-v2".to_string(),
2510                    chain: Chain::Ethereum,
2511                    block: Block {
2512                        hash: Bytes::from(
2513                            "0x0000000000000000000000000000000000000000000000000000000000000002",
2514                        ), // This will be the next expected block
2515                        number: 2,
2516                        parent_hash: Bytes::from(
2517                            "0x0000000000000000000000000000000000000000000000000000000000000001",
2518                        ), // This matches our last synced block hash
2519                        chain: Chain::Ethereum,
2520                        ts: chrono::DateTime::from_timestamp(1234567890, 0)
2521                            .unwrap()
2522                            .naive_utc(),
2523                    },
2524                    revert: false,
2525                    ..Default::default()
2526                };
2527
2528                tokio::spawn(async move {
2529                    let _ = tx.send(expected_next_delta).await;
2530                });
2531
2532                Ok((Uuid::default(), rx))
2533            });
2534
2535        deltas_client
2536            .expect_unsubscribe()
2537            .return_once(|_| Ok(()));
2538
2539        let mut state_sync = ProtocolStateSynchronizer::new(
2540            ExtractorIdentity::new(Chain::Ethereum, "uniswap-v2"),
2541            true,
2542            ComponentFilter::with_tvl_range(0.0, 1000.0),
2543            1,
2544            Duration::from_secs(0),
2545            true, // include_snapshots = true
2546            false,
2547            true,
2548            ArcRPCClient(Arc::new(rpc_client)),
2549            ArcDeltasClient(Arc::new(deltas_client)),
2550            10000_u64,
2551        );
2552
2553        // Initialize and set up the last synced block to simulate a restart scenario
2554        state_sync
2555            .initialize()
2556            .await
2557            .expect("Init should succeed");
2558
2559        // Set last_synced_block to simulate that we've previously synced block 1
2560        state_sync.last_synced_block = Some(BlockHeader {
2561            number: 1,
2562            hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000001"), /* This matches the parent_hash in our delta */
2563            parent_hash: Bytes::from(
2564                "0x0000000000000000000000000000000000000000000000000000000000000000",
2565            ),
2566            revert: false,
2567            timestamp: 123456789,
2568            partial_block_index: None,
2569        });
2570
2571        let (mut block_tx, mut block_rx) = channel(10);
2572        let (end_tx, end_rx) = oneshot::channel::<()>();
2573
2574        // Start state_sync
2575        let state_sync_handle = tokio::spawn(async move {
2576            state_sync
2577                .state_sync(&mut block_tx, end_rx)
2578                .await
2579        });
2580
2581        // Wait for the message - it should be a delta-only message (no snapshots)
2582        let result_msg = timeout(Duration::from_millis(200), block_rx.recv())
2583            .await
2584            .expect("Should receive message within timeout")
2585            .expect("Channel should be open")
2586            .expect("Should not be an error");
2587
2588        // Send close signal
2589        let _ = end_tx.send(());
2590
2591        // Wait for state_sync to finish
2592        let _ = state_sync_handle
2593            .await
2594            .expect("Task should not panic");
2595
2596        // Verify the message contains deltas but no snapshots
2597        // (because we skipped snapshot retrieval)
2598        assert!(result_msg.deltas.is_some(), "Should contain deltas");
2599        assert!(
2600            result_msg.snapshots.states.is_empty(),
2601            "Should not contain snapshots when next expected block is received"
2602        );
2603
2604        // Verify the block details match our expected next block
2605        if let Some(deltas) = &result_msg.deltas {
2606            assert_eq!(deltas.block.number, 2);
2607            assert_eq!(
2608                deltas.block.hash,
2609                Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000002")
2610            );
2611            assert_eq!(
2612                deltas.block.parent_hash,
2613                Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000001")
2614            );
2615        }
2616    }
2617
2618    #[test_log::test(tokio::test)]
2619    async fn test_skip_previously_processed_messages() {
2620        // Test that the synchronizer skips messages for blocks that have already been processed
2621        // This simulates a service restart scenario where old messages are re-emitted
2622
2623        let mut rpc_client = make_mock_client();
2624        let mut deltas_client = MockDeltasClient::new();
2625
2626        // Mock the initial components call
2627        rpc_client
2628            .expect_get_protocol_components()
2629            .returning(|_| {
2630                Ok(ProtocolComponentRequestResponse {
2631                    protocol_components: vec![ProtocolComponent {
2632                        id: "Component1".to_string(),
2633                        ..Default::default()
2634                    }],
2635                    pagination: PaginationResponse { page: 0, page_size: 20, total: 1 },
2636                })
2637            });
2638
2639        // Mock snapshot calls for when we process the expected next block (block 6)
2640        rpc_client
2641            .expect_get_protocol_states()
2642            .returning(|_| {
2643                Ok(ProtocolStateRequestResponse {
2644                    states: vec![ResponseProtocolState {
2645                        component_id: "Component1".to_string(),
2646                        ..Default::default()
2647                    }],
2648                    pagination: PaginationResponse { page: 0, page_size: 20, total: 1 },
2649                })
2650            });
2651
2652        rpc_client
2653            .expect_get_component_tvl()
2654            .returning(|_| {
2655                Ok(ComponentTvlRequestResponse {
2656                    tvl: [("Component1".to_string(), 100.0)]
2657                        .into_iter()
2658                        .collect(),
2659                    pagination: PaginationResponse { page: 0, page_size: 20, total: 1 },
2660                })
2661            });
2662
2663        rpc_client
2664            .expect_get_traced_entry_points()
2665            .returning(|_| {
2666                Ok(TracedEntryPointRequestResponse {
2667                    traced_entry_points: HashMap::new(),
2668                    pagination: PaginationResponse::new(0, 20, 0),
2669                })
2670            });
2671
2672        // Set up deltas client to send old messages first, then the expected next block
2673        let (tx, rx) = channel(10);
2674        deltas_client
2675            .expect_subscribe()
2676            .return_once(move |_, _| {
2677                // Send messages for blocks 3, 4, 5 (already processed), then block 6 (expected)
2678                let old_messages = vec![
2679                    BlockChanges {
2680                        extractor: "uniswap-v2".to_string(),
2681                        chain: Chain::Ethereum,
2682                        block: Block {
2683                            hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000003"),
2684                            number: 3,
2685                            parent_hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000002"),
2686                            chain: Chain::Ethereum,
2687                            ts: chrono::DateTime::from_timestamp(1234567890, 0).unwrap().naive_utc(),
2688                        },
2689                        revert: false,
2690                        ..Default::default()
2691                    },
2692                    BlockChanges {
2693                        extractor: "uniswap-v2".to_string(),
2694                        chain: Chain::Ethereum,
2695                        block: Block {
2696                            hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000004"),
2697                            number: 4,
2698                            parent_hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000003"),
2699                            chain: Chain::Ethereum,
2700                            ts: chrono::DateTime::from_timestamp(1234567891, 0).unwrap().naive_utc(),
2701                        },
2702                        revert: false,
2703                        ..Default::default()
2704                    },
2705                    BlockChanges {
2706                        extractor: "uniswap-v2".to_string(),
2707                        chain: Chain::Ethereum,
2708                        block: Block {
2709                            hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000005"),
2710                            number: 5,
2711                            parent_hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000004"),
2712                            chain: Chain::Ethereum,
2713                            ts: chrono::DateTime::from_timestamp(1234567892, 0).unwrap().naive_utc(),
2714                        },
2715                        revert: false,
2716                        ..Default::default()
2717                    },
2718                    // This is the expected next block (block 6)
2719                    BlockChanges {
2720                        extractor: "uniswap-v2".to_string(),
2721                        chain: Chain::Ethereum,
2722                        block: Block {
2723                            hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000006"),
2724                            number: 6,
2725                            parent_hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000005"),
2726                            chain: Chain::Ethereum,
2727                            ts: chrono::DateTime::from_timestamp(1234567893, 0).unwrap().naive_utc(),
2728                        },
2729                        revert: false,
2730                        ..Default::default()
2731                    },
2732                ];
2733
2734                tokio::spawn(async move {
2735                    for message in old_messages {
2736                        let _ = tx.send(message).await;
2737                        tokio::time::sleep(Duration::from_millis(10)).await;
2738                    }
2739                });
2740
2741                Ok((Uuid::default(), rx))
2742            });
2743
2744        deltas_client
2745            .expect_unsubscribe()
2746            .return_once(|_| Ok(()));
2747
2748        let mut state_sync = ProtocolStateSynchronizer::new(
2749            ExtractorIdentity::new(Chain::Ethereum, "uniswap-v2"),
2750            true,
2751            ComponentFilter::with_tvl_range(0.0, 1000.0),
2752            1,
2753            Duration::from_secs(0),
2754            true,
2755            true,
2756            true,
2757            ArcRPCClient(Arc::new(rpc_client)),
2758            ArcDeltasClient(Arc::new(deltas_client)),
2759            10000_u64,
2760        );
2761
2762        // Initialize and set last_synced_block to simulate we've already processed block 5
2763        state_sync
2764            .initialize()
2765            .await
2766            .expect("Init should succeed");
2767
2768        state_sync.last_synced_block = Some(BlockHeader {
2769            number: 5,
2770            hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000005"),
2771            parent_hash: Bytes::from(
2772                "0x0000000000000000000000000000000000000000000000000000000000000004",
2773            ),
2774            revert: false,
2775            timestamp: 1234567892,
2776            partial_block_index: None,
2777        });
2778
2779        let (mut block_tx, mut block_rx) = channel(10);
2780        let (end_tx, end_rx) = oneshot::channel::<()>();
2781
2782        // Start state_sync
2783        let state_sync_handle = tokio::spawn(async move {
2784            state_sync
2785                .state_sync(&mut block_tx, end_rx)
2786                .await
2787        });
2788
2789        // Wait for the message - it should only be for block 6 (skipping blocks 3, 4, 5)
2790        let result_msg = timeout(Duration::from_millis(500), block_rx.recv())
2791            .await
2792            .expect("Should receive message within timeout")
2793            .expect("Channel should be open")
2794            .expect("Should not be an error");
2795
2796        // Send close signal
2797        let _ = end_tx.send(());
2798
2799        // Wait for state_sync to finish
2800        let _ = state_sync_handle
2801            .await
2802            .expect("Task should not panic");
2803
2804        // Verify we only got the message for block 6 (the expected next block)
2805        assert!(result_msg.deltas.is_some(), "Should contain deltas");
2806        if let Some(deltas) = &result_msg.deltas {
2807            assert_eq!(
2808                deltas.block.number, 6,
2809                "Should only process block 6, skipping earlier blocks"
2810            );
2811            assert_eq!(
2812                deltas.block.hash,
2813                Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000006")
2814            );
2815        }
2816
2817        // Verify that no additional messages are received immediately
2818        // (since the old blocks 3, 4, 5 were skipped and only block 6 was processed)
2819        match timeout(Duration::from_millis(50), block_rx.recv()).await {
2820            Err(_) => {
2821                // Timeout is expected - no more messages should come
2822            }
2823            Ok(Some(Err(_))) => {
2824                // Error received is also acceptable (connection closed)
2825            }
2826            Ok(Some(Ok(_))) => {
2827                panic!("Should not receive additional messages - old blocks should be skipped");
2828            }
2829            Ok(None) => {
2830                // Channel closed is also acceptable
2831            }
2832        }
2833    }
2834
2835    fn make_block_changes(block_num: u64, partial_idx: Option<u32>) -> BlockChanges {
2836        // Use vec to create Bytes from block number
2837        let hash = Bytes::from(vec![block_num as u8; 32]);
2838        let parent_hash = Bytes::from(vec![block_num.saturating_sub(1) as u8; 32]);
2839        BlockChanges {
2840            extractor: "uniswap-v2".to_string(),
2841            chain: Chain::Ethereum,
2842            block: Block {
2843                number: block_num,
2844                hash,
2845                parent_hash,
2846                chain: Chain::Ethereum,
2847                ts: Default::default(),
2848            },
2849            revert: false,
2850            partial_block_index: partial_idx,
2851            ..Default::default()
2852        }
2853    }
2854
2855    /// Test that full block as first message in partial mode is accepted
2856    #[test_log::test(tokio::test)]
2857    async fn test_partial_mode_accepts_full_block_as_first_message() {
2858        let (rpc_client, deltas_client, tx) = mock_clients_for_state_sync();
2859        let mut state_sync = with_mocked_clients(true, true, Some(rpc_client), Some(deltas_client))
2860            .with_partial_blocks(true);
2861        state_sync
2862            .initialize()
2863            .await
2864            .expect("Init failed");
2865
2866        let (handle, mut block_rx) = state_sync.start().await;
2867        let (jh, close_tx) = handle.split();
2868
2869        // Send full block as first message - should be accepted
2870        tx.send(make_block_changes(1, None))
2871            .await
2872            .unwrap();
2873
2874        // Should receive the full block immediately
2875        let msg = timeout(Duration::from_millis(100), block_rx.recv())
2876            .await
2877            .expect("Should receive message")
2878            .expect("Channel open")
2879            .expect("No error");
2880
2881        assert_eq!(msg.header.number, 1, "Should use block 1 (full block)");
2882        assert_eq!(msg.header.partial_block_index, None, "Should be a full block");
2883
2884        let _ = close_tx.send(());
2885        jh.await.expect("Task should not panic");
2886    }
2887
2888    /// Test that block number increase is detected as new block
2889    #[test_log::test(tokio::test)]
2890    async fn test_partial_mode_detects_block_number_increase() {
2891        let (rpc_client, deltas_client, tx) = mock_clients_for_state_sync();
2892        let mut state_sync = with_mocked_clients(true, true, Some(rpc_client), Some(deltas_client))
2893            .with_partial_blocks(true);
2894        state_sync
2895            .initialize()
2896            .await
2897            .expect("Init failed");
2898
2899        let (handle, mut block_rx) = state_sync.start().await;
2900        let (jh, close_tx) = handle.split();
2901
2902        // Send partial messages for block 1 (will be skipped - waiting for new block)
2903        tx.send(make_block_changes(1, Some(0)))
2904            .await
2905            .unwrap();
2906        tx.send(make_block_changes(1, Some(3)))
2907            .await
2908            .unwrap();
2909
2910        // Verify no message received yet
2911        match timeout(Duration::from_millis(50), block_rx.recv()).await {
2912            Err(_) => { /* Expected: timeout, no message yet */ }
2913            Ok(_) => panic!("Should not receive message while waiting for new block"),
2914        }
2915
2916        // Send partial for block 2 with HIGHER index (5 > 3) - should still be detected
2917        // because block number increased
2918        tx.send(make_block_changes(2, Some(5)))
2919            .await
2920            .unwrap();
2921
2922        // Should receive the message for block 2
2923        let msg = timeout(Duration::from_millis(100), block_rx.recv())
2924            .await
2925            .expect("Should receive message")
2926            .expect("Channel open")
2927            .expect("No error");
2928
2929        assert_eq!(msg.header.number, 2, "Should use block 2 (block number increased)");
2930        assert_eq!(msg.header.partial_block_index, Some(5));
2931
2932        let _ = close_tx.send(());
2933        jh.await.expect("Task should not panic");
2934    }
2935
2936    /// Test that partial mode skips new blocks that are already synced
2937    #[test_log::test(tokio::test)]
2938    async fn test_partial_mode_skips_already_synced_blocks() {
2939        let (rpc_client, deltas_client, tx) = mock_clients_for_state_sync();
2940        let mut state_sync = with_mocked_clients(true, true, Some(rpc_client), Some(deltas_client))
2941            .with_partial_blocks(true);
2942        state_sync
2943            .initialize()
2944            .await
2945            .expect("Init failed");
2946
2947        // Set last_synced_block to block 5 - we've already synced up to here
2948        state_sync.last_synced_block = Some(BlockHeader {
2949            number: 5,
2950            hash: Bytes::from("0x05"),
2951            parent_hash: Bytes::from("0x04"),
2952            revert: false,
2953            timestamp: 0,
2954            partial_block_index: None,
2955        });
2956
2957        let (handle, mut block_rx) = state_sync.start().await;
2958        let (jh, close_tx) = handle.split();
2959
2960        // Send partial for block 3 to establish baseline
2961        tx.send(make_block_changes(3, Some(2)))
2962            .await
2963            .unwrap();
2964
2965        // Send "new block" for block 4 (partial index decreased) - but block 4 < last_synced (5)
2966        tx.send(make_block_changes(4, Some(0)))
2967            .await
2968            .unwrap();
2969
2970        // Should be skipped because block 4 is already synced
2971        match timeout(Duration::from_millis(50), block_rx.recv()).await {
2972            Err(_) => { /* Expected: skipped because already synced */ }
2973            Ok(_) => panic!("Should skip block 4 because it's already synced"),
2974        }
2975
2976        // Now send new block for block 6 (after last_synced)
2977        // First establish new partial index
2978        tx.send(make_block_changes(5, Some(3)))
2979            .await
2980            .unwrap();
2981        // Then trigger new block detection
2982        tx.send(make_block_changes(6, Some(0)))
2983            .await
2984            .unwrap();
2985
2986        let msg = timeout(Duration::from_millis(100), block_rx.recv())
2987            .await
2988            .expect("Should receive message")
2989            .expect("Channel open")
2990            .expect("No error");
2991
2992        assert_eq!(msg.header.number, 6, "Should use block 6 (after last synced)");
2993
2994        let _ = close_tx.send(());
2995        jh.await.expect("Task should not panic");
2996    }
2997}