Skip to main content

tycho_client/feed/
synchronizer.rs

1use std::{collections::HashMap, time::Duration};
2
3use async_trait::async_trait;
4use serde::{Deserialize, Serialize};
5use thiserror::Error;
6use tokio::{
7    select,
8    sync::{
9        mpsc::{channel, error::SendError, Receiver, Sender},
10        oneshot,
11    },
12    task::JoinHandle,
13    time::{sleep, timeout},
14};
15use tracing::{debug, error, info, instrument, trace, warn};
16use tycho_common::{
17    dto::{
18        BlockChanges, EntryPointWithTracingParams, ExtractorIdentity, ProtocolComponent,
19        ResponseAccount, ResponseProtocolState, TracingResult,
20    },
21    Bytes,
22};
23
24use crate::{
25    deltas::{DeltasClient, SubscriptionOptions},
26    feed::{
27        component_tracker::{ComponentFilter, ComponentTracker},
28        BlockHeader, HeaderLike,
29    },
30    rpc::{RPCClient, RPCError, SnapshotParameters, RPC_CLIENT_CONCURRENCY},
31    DeltasError,
32};
33
/// Errors surfaced by the state synchronizer.
#[derive(Error, Debug)]
pub enum SynchronizerError {
    /// RPC client failures.
    #[error("RPC error: {0}")]
    RPCError(#[from] RPCError),

    /// Issues with the main channel, e.g. a message that could not be sent.
    #[error("{0}")]
    ChannelError(String),

    /// Timeout elapsed errors, e.g. the first deltas message did not arrive in time.
    #[error("Timeout error: {0}")]
    Timeout(String),

    /// Failed to close the synchronizer.
    #[error("Failed to close synchronizer: {0}")]
    CloseError(String),

    /// Server connection failures or interruptions, including a deltas channel that closed
    /// unexpectedly.
    #[error("Connection error: {0}")]
    ConnectionError(String),

    /// Connection closed. Produced when the deltas client reports it is not connected; the
    /// retry loop in `start` stops retrying when it sees this variant.
    #[error("Connection closed")]
    ConnectionClosed,

    /// Internal error that should not happen under normal operation.
    #[error("Internal error: {0}")]
    Internal(String),
}
64
/// Convenience alias for results whose error type is [`SynchronizerError`].
pub type SyncResult<T> = Result<T, SynchronizerError>;
66
67impl<T> From<SendError<T>> for SynchronizerError {
68    fn from(err: SendError<T>) -> Self {
69        SynchronizerError::ChannelError(format!("Failed to send message: {err}"))
70    }
71}
72
73impl From<DeltasError> for SynchronizerError {
74    fn from(err: DeltasError) -> Self {
75        match err {
76            DeltasError::NotConnected => SynchronizerError::ConnectionClosed,
77            _ => SynchronizerError::ConnectionError(err.to_string()),
78        }
79    }
80}
81
/// Synchronizes the state of a single extractor by combining snapshots fetched over RPC with
/// messages received from a deltas subscription.
pub struct ProtocolStateSynchronizer<R: RPCClient, D: DeltasClient> {
    /// Identity of the extractor being synchronized (its `chain` and `name` are used for
    /// requests and the subscription).
    extractor_id: ExtractorIdentity,
    /// Forwarded to the snapshot request's `include_balances` option.
    retrieve_balances: bool,
    /// Client used for traced entry point and snapshot requests.
    rpc_client: R,
    /// Client used to subscribe to and unsubscribe from delta messages.
    deltas_client: D,
    /// Upper bound on the number of synchronization attempts made by `start`.
    max_retries: u64,
    /// Pause between a failed synchronization attempt and the next one.
    retry_cooldown: Duration,
    /// When `false`, `get_snapshots` returns an empty message without issuing requests; the
    /// flag is also passed to the subscription options via `with_state`.
    include_snapshots: bool,
    /// Keeps track of the components and contracts the synchronizer is interested in.
    component_tracker: ComponentTracker<R>,
    /// Header of the last block for which a message was sent downstream, if any.
    last_synced_block: Option<BlockHeader>,
    /// Number of seconds to wait for a deltas message while looking for the first one.
    timeout: u64,
    /// Forwarded to the snapshot request's `include_tvl` option.
    include_tvl: bool,
    /// Forwarded to the subscription options via `with_compression`.
    compression: bool,
    /// Forwarded to the subscription options via `with_partial_blocks`; also changes how the
    /// first message is selected in `state_sync`.
    partial_blocks: bool,
}
97
/// A protocol component bundled with its state and related metadata.
#[derive(Clone, PartialEq, Debug, Serialize, Deserialize)]
pub struct ComponentWithState {
    /// State returned for the component.
    pub state: ResponseProtocolState,
    /// The component the state belongs to.
    pub component: ProtocolComponent,
    /// TVL of the component, if available.
    // NOTE(review): presumably only populated when TVL was requested; confirm with the RPC client.
    pub component_tvl: Option<f64>,
    /// Entry points (with their tracing parameters) paired with their tracing results.
    pub entrypoints: Vec<(EntryPointWithTracingParams, TracingResult)>,
}
105
/// Snapshot data: component states plus VM storage.
#[derive(Clone, PartialEq, Debug, Default, Serialize, Deserialize)]
pub struct Snapshot {
    /// Component states by string key.
    // NOTE(review): keys look like component ids (they are compared against removed component
    // ids in `StateSyncMessage::merge`); confirm.
    pub states: HashMap<String, ComponentWithState>,
    /// Account data by `Bytes` key.
    // NOTE(review): keys are presumably contract addresses; confirm against the RPC client.
    pub vm_storage: HashMap<Bytes, ResponseAccount>,
}
111
112impl Snapshot {
113    fn extend(&mut self, other: Snapshot) {
114        self.states.extend(other.states);
115        self.vm_storage.extend(other.vm_storage);
116    }
117
118    pub fn get_states(&self) -> &HashMap<String, ComponentWithState> {
119        &self.states
120    }
121
122    pub fn get_vm_storage(&self) -> &HashMap<Bytes, ResponseAccount> {
123        &self.vm_storage
124    }
125}
126
/// Message emitted by a synchronizer for one header: snapshots, deltas and the components
/// that were dropped from tracking.
#[derive(Clone, PartialEq, Debug, Default, Serialize, Deserialize)]
pub struct StateSyncMessage<H>
where
    H: HeaderLike,
{
    /// The block information for this update.
    pub header: H,
    /// Snapshot for new components.
    pub snapshots: Snapshot,
    /// A single delta contains state updates for all tracked components, as well as additional
    /// information about the system components e.g. newly added components (even below tvl), tvl
    /// updates, balance updates.
    ///
    /// `None` when the message carries snapshots only.
    pub deltas: Option<BlockChanges>,
    /// Components that stopped being tracked.
    pub removed_components: HashMap<String, ProtocolComponent>,
}
143
144impl<H> StateSyncMessage<H>
145where
146    H: HeaderLike,
147{
148    pub fn merge(mut self, other: Self) -> Self {
149        // be careful with removed and snapshots attributes here, these can be ambiguous.
150        self.removed_components
151            .retain(|k, _| !other.snapshots.states.contains_key(k));
152        self.snapshots
153            .states
154            .retain(|k, _| !other.removed_components.contains_key(k));
155
156        self.snapshots.extend(other.snapshots);
157        let deltas = match (self.deltas, other.deltas) {
158            (Some(l), Some(r)) => Some(l.merge(r)),
159            (None, Some(r)) => Some(r),
160            (Some(l), None) => Some(l),
161            (None, None) => None,
162        };
163        self.removed_components
164            .extend(other.removed_components);
165        Self {
166            header: other.header,
167            snapshots: self.snapshots,
168            deltas,
169            removed_components: self.removed_components,
170        }
171    }
172}
173
/// Handle for controlling a running synchronizer task.
///
/// This handle provides methods to gracefully shut down the synchronizer
/// and await its completion with a timeout.
pub struct SynchronizerTaskHandle {
    /// Join handle of the spawned synchronizer task.
    join_handle: JoinHandle<()>,
    /// Sender used to deliver the close signal to the task.
    close_tx: oneshot::Sender<()>,
}
182
impl SynchronizerTaskHandle {
    /// Creates a handle from the task's join handle and the sender of its close signal.
    pub fn new(join_handle: JoinHandle<()>, close_tx: oneshot::Sender<()>) -> Self {
        Self { join_handle, close_tx }
    }

    /// Splits the handle into its join handle and close sender.
    ///
    /// This allows monitoring the task completion separately from controlling shutdown.
    /// The join handle can be used with FuturesUnordered for monitoring, while the
    /// close sender can be used to signal graceful shutdown.
    pub fn split(self) -> (JoinHandle<()>, oneshot::Sender<()>) {
        (self.join_handle, self.close_tx)
    }
}

/// StateSynchronizer
///
/// Used to synchronize the state of a single protocol. The synchronizer is responsible for
/// delivering messages to the client that let them reconstruct subsets of the protocol state.
///
/// This involves deciding which components to track according to the client's preferences,
/// retrieving & emitting snapshots of components which the client has not seen yet and
/// subsequently delivering delta messages for the components that have changed.
#[async_trait]
pub trait StateSynchronizer: Send + Sync + 'static {
    /// Performs setup on the synchronizer. The implementation in this file loads the protocol
    /// components that should be tracked.
    async fn initialize(&mut self) -> SyncResult<()>;
    /// Starts the state synchronization, consuming the synchronizer.
    /// Returns a handle for controlling the running task and a receiver for messages.
    async fn start(
        mut self,
    ) -> (SynchronizerTaskHandle, Receiver<SyncResult<StateSyncMessage<BlockHeader>>>);
}
215
216impl<R, D> ProtocolStateSynchronizer<R, D>
217where
218    // TODO: Consider moving these constraints directly to the
219    // client...
220    R: RPCClient + Clone + Send + Sync + 'static,
221    D: DeltasClient + Clone + Send + Sync + 'static,
222{
223    /// Creates a new state synchronizer.
224    #[allow(clippy::too_many_arguments)]
225    pub fn new(
226        extractor_id: ExtractorIdentity,
227        retrieve_balances: bool,
228        component_filter: ComponentFilter,
229        max_retries: u64,
230        retry_cooldown: Duration,
231        include_snapshots: bool,
232        include_tvl: bool,
233        compression: bool,
234        rpc_client: R,
235        deltas_client: D,
236        timeout: u64,
237    ) -> Self {
238        Self {
239            extractor_id: extractor_id.clone(),
240            retrieve_balances,
241            rpc_client: rpc_client.clone(),
242            include_snapshots,
243            deltas_client,
244            component_tracker: ComponentTracker::new(
245                extractor_id.chain,
246                extractor_id.name.as_str(),
247                component_filter,
248                rpc_client,
249            ),
250            max_retries,
251            retry_cooldown,
252            last_synced_block: None,
253            timeout,
254            include_tvl,
255            compression,
256            partial_blocks: false,
257        }
258    }
259
260    /// Enables receiving partial block updates.
261    pub fn with_partial_blocks(mut self, partial_blocks: bool) -> Self {
262        self.partial_blocks = partial_blocks;
263        self
264    }
265
266    /// Retrieves state snapshots of the requested components
267    async fn get_snapshots<'a, I: IntoIterator<Item = &'a String>>(
268        &mut self,
269        header: BlockHeader,
270        ids: Option<I>,
271    ) -> SyncResult<StateSyncMessage<BlockHeader>> {
272        if !self.include_snapshots {
273            return Ok(StateSyncMessage { header, ..Default::default() });
274        }
275
276        // Use given ids or use all if not passed
277        let component_ids: Vec<_> = match ids {
278            Some(ids) => ids.into_iter().cloned().collect(),
279            None => self
280                .component_tracker
281                .get_tracked_component_ids(),
282        };
283
284        if component_ids.is_empty() {
285            return Ok(StateSyncMessage { header, ..Default::default() });
286        }
287
288        // TODO: Find a smarter way to dynamically detect DCI protocols.
289        const DCI_PROTOCOLS: &[&str] = &[
290            "uniswap_v4_hooks",
291            "vm:curve",
292            "vm:balancer_v2",
293            "vm:balancer_v3",
294            "fluid_v1",
295            "erc4626",
296        ];
297        let entrypoints_result = if DCI_PROTOCOLS.contains(&self.extractor_id.name.as_str()) {
298            let result = self
299                .rpc_client
300                .get_traced_entry_points_paginated(
301                    self.extractor_id.chain,
302                    &self.extractor_id.name,
303                    &component_ids,
304                    None,
305                    RPC_CLIENT_CONCURRENCY,
306                )
307                .await?;
308            self.component_tracker
309                .process_entrypoints(&result.clone().into());
310            result.traced_entry_points.clone()
311        } else {
312            HashMap::new()
313        };
314
315        // Get contract IDs from component tracker
316        let contract_ids: Vec<Bytes> = self
317            .component_tracker
318            .get_contracts_by_component(&component_ids)
319            .into_iter()
320            .collect();
321
322        // Filter components to only include the requested ones
323        let filtered_components: HashMap<_, _> = self
324            .component_tracker
325            .components
326            .iter()
327            .filter(|(id, _)| component_ids.contains(id))
328            .map(|(k, v)| (k.clone(), v.clone()))
329            .collect();
330
331        let request = SnapshotParameters::new(
332            self.extractor_id.chain,
333            &self.extractor_id.name,
334            &filtered_components,
335            &contract_ids,
336            header.number,
337        )
338        .entrypoints(&entrypoints_result)
339        .include_balances(self.retrieve_balances)
340        .include_tvl(self.include_tvl);
341        let snapshot_response = self
342            .rpc_client
343            .get_snapshots(&request, None, RPC_CLIENT_CONCURRENCY)
344            .await?;
345
346        trace!(states=?&snapshot_response.states, "Retrieved ProtocolStates");
347        trace!(contract_states=?&snapshot_response.vm_storage, "Retrieved ContractState");
348
349        Ok(StateSyncMessage {
350            header,
351            snapshots: snapshot_response,
352            deltas: None,
353            removed_components: HashMap::new(),
354        })
355    }
356
    /// Main method that does all the work.
    ///
    /// Subscribes to the deltas feed, waits for a suitable first message, emits an initial
    /// message (including snapshots unless the first message directly extends the last synced
    /// block) and then forwards one message per received delta until an error occurs or the
    /// close signal fires. Once the subscription was established, an unsubscribe is attempted
    /// before returning, whatever the outcome.
    ///
    /// ## Return Value
    ///
    /// Returns a `Result` where:
    /// - `Ok(())` - Synchronization completed successfully (usually due to close signal)
    /// - `Err((error, None))` - Error occurred AND close signal was received (don't retry).
    ///   NOTE(review): no code path in this function currently produces this shape; every
    ///   error is returned together with `Some(end_rx)`.
    /// - `Err((error, Some(end_rx)))` - Error occurred but close signal was NOT received (can
    ///   retry)
    ///
    /// The returned `end_rx` (if any) should be reused for retry attempts since the close
    /// signal may still arrive and we want to remain cancellable across retries.
    #[instrument(skip(self, block_tx, end_rx), fields(extractor_id = %self.extractor_id))]
    async fn state_sync(
        &mut self,
        block_tx: &mut Sender<SyncResult<StateSyncMessage<BlockHeader>>>,
        mut end_rx: oneshot::Receiver<()>,
    ) -> Result<(), (SynchronizerError, Option<oneshot::Receiver<()>>)> {
        // initialisation
        let subscription_options = SubscriptionOptions::new()
            .with_state(self.include_snapshots)
            .with_compression(self.compression)
            .with_partial_blocks(self.partial_blocks);
        let (subscription_id, mut msg_rx) = match self
            .deltas_client
            .subscribe(self.extractor_id.clone(), subscription_options)
            .await
        {
            Ok(result) => result,
            // Nothing to clean up yet: hand the close receiver back so the caller can retry.
            Err(e) => return Err((e.into(), Some(end_rx))),
        };

        // All fallible work lives in this inner block so that the cleanup below runs on every
        // exit path. `end_rx` is only borrowed here and stays available afterwards.
        let result = async {
            info!("Waiting for deltas...");
            // These flags make sure each informational message is logged only once.
            let mut warned_waiting_for_new_block = false;
            let mut warned_skipping_synced = false;
            // Track the last seen block number such that we know when we get the first partial
            let mut last_block_number: Option<u64> = None;
            // Phase 1: wait for the first message we can start from.
            let mut first_msg = loop {
                let msg = select! {
                    // The timeout is created anew on every iteration, i.e. it bounds the wait
                    // for each individual message.
                    deltas_result = timeout(Duration::from_secs(self.timeout), msg_rx.recv()) => {
                        deltas_result
                            .map_err(|_| {
                                SynchronizerError::Timeout(format!(
                                    "First deltas took longer than {t}s to arrive",
                                    t = self.timeout
                                ))
                            })?
                            .ok_or_else(|| {
                                SynchronizerError::ConnectionError(
                                    "Deltas channel closed before first message".to_string(),
                                )
                            })?
                    },
                    _ = &mut end_rx => {
                        info!("Received close signal while waiting for first deltas");
                        return Ok(());
                    }
                };

                let incoming: BlockHeader = (&msg).into();

                // Determine if this message is a candidate for starting synchronization.
                // In partial mode, we wait for a new block to start (block number increases).
                // In non-partial mode, all messages are candidates.
                let is_new_block_candidate = if self.partial_blocks {
                    match msg.partial_block_index {
                        None => {
                            // If we get a full block, it is a candidate
                            last_block_number = Some(incoming.number);
                            true
                        }
                        Some(current_partial_idx) => {
                            // Without a previously seen block number there is nothing to
                            // compare against, so the message is not treated as a new block.
                            let is_new_block = last_block_number
                                .map(|prev_block| incoming.number > prev_block)
                                .unwrap_or(false);

                            if !warned_waiting_for_new_block {
                                info!(
                                    extractor=%self.extractor_id,
                                    block=incoming.number,
                                    partial_idx=current_partial_idx,
                                    "Syncing. Waiting for new block to start"
                                );
                                warned_waiting_for_new_block = true;
                            }
                            last_block_number = Some(incoming.number);
                            is_new_block
                        }
                    }
                } else {
                    true // Non-partial mode: all messages are candidates
                };

                if !is_new_block_candidate {
                    continue;
                }

                // Check if we've already synced this block (applies to both modes)
                if let Some(current) = &self.last_synced_block {
                    // A message at or below the synced height is still accepted when it builds
                    // directly on the last synced block.
                    if current.number >= incoming.number && !self.is_next_expected(&incoming) {
                        if !warned_skipping_synced {
                            info!(extractor=%self.extractor_id, from=incoming.number, to=current.number, "Syncing. Skipping already synced block");
                            warned_skipping_synced = true;
                        }
                        continue;
                    }
                }
                break msg;
            };

            self.filter_deltas(&mut first_msg);

            // initial snapshot
            info!(height = first_msg.get_block().number, "First deltas received");
            let header: BlockHeader = (&first_msg).into();
            let deltas_msg = StateSyncMessage {
                header: header.clone(),
                snapshots: Default::default(),
                deltas: Some(first_msg),
                removed_components: Default::default(),
            };

            // If possible skip retrieving snapshots
            let msg = if !self.is_next_expected(&header) {
                info!("Retrieving snapshot");
                // The snapshot is requested with the revert flag cleared.
                 let snapshot_header = BlockHeader { revert: false, ..header.clone() };
                // `None` requests snapshots for all tracked components; the first deltas are
                // merged on top of them.
                let snapshot = self
                    .get_snapshots::<Vec<&String>>(
                        snapshot_header,
                        None,
                    )
                    .await?
                    .merge(deltas_msg);
                let n_components = self.component_tracker.components.len();
                let n_snapshots = snapshot.snapshots.states.len();
                info!(n_components, n_snapshots, "Initial snapshot retrieved, starting delta message feed");
                snapshot
            } else {
                deltas_msg
            };
            block_tx.send(Ok(msg)).await?;
            // Only record the block as synced once the message was handed to the channel.
            self.last_synced_block = Some(header.clone());
            // Phase 2: forward every subsequent delta message.
            loop {
                select! {
                    deltas_opt = msg_rx.recv() => {
                        if let Some(mut deltas) = deltas_opt {
                            let header: BlockHeader = (&deltas).into();
                            debug!(block_number=?header.number, "Received delta message");

                            let (snapshots, removed_components) = {
                                // 1. Remove components based on latest changes
                                // 2. Add components based on latest changes, query those for snapshots
                                let (to_add, to_remove) = self.component_tracker.filter_updated_components(&deltas);

                                // Only components we don't track yet need a snapshot,
                                let requiring_snapshot: Vec<_> = to_add
                                    .iter()
                                    .filter(|id| {
                                        !self.component_tracker
                                            .components
                                            .contains_key(id.as_str())
                                    })
                                    .collect();
                                debug!(components=?requiring_snapshot, "SnapshotRequest");
                                self.component_tracker
                                    .start_tracking(requiring_snapshot.as_slice())
                                    .await?;

                                // Returns empty snapshots when `requiring_snapshot` is empty.
                                let snapshots = self
                                    .get_snapshots(header.clone(), Some(requiring_snapshot))
                                    .await?
                                    .snapshots;

                                let removed_components = if !to_remove.is_empty() {
                                    self.component_tracker.stop_tracking(&to_remove)
                                } else {
                                    Default::default()
                                };

                                (snapshots, removed_components)
                            };

                            // 3. Update entrypoints on the tracker (affects which contracts are tracked)
                            self.component_tracker.process_entrypoints(&deltas.dci_update);

                            // 4. Filter deltas by currently tracked components / contracts
                            self.filter_deltas(&mut deltas);
                            let n_changes = deltas.n_changes();

                            // 5. Send the message
                            let next = StateSyncMessage {
                                header: header.clone(),
                                snapshots,
                                deltas: Some(deltas),
                                removed_components,
                            };
                            block_tx.send(Ok(next)).await?;
                            self.last_synced_block = Some(header.clone());

                            debug!(block_number=?header.number, n_changes, "Finished processing delta message");
                        } else {
                            // `recv` returned `None`: the deltas channel was closed.
                            return Err(SynchronizerError::ConnectionError("Deltas channel closed".to_string()));
                        }
                    },
                    _ = &mut end_rx => {
                        info!("Received close signal during state_sync");
                        return Ok(());
                    }
                }
            }
        }.await;

        // This cleanup code now runs regardless of how the function exits (error or channel close)
        warn!(last_synced_block = ?&self.last_synced_block, "Deltas processing ended.");
        //Ignore error
        let _ = self
            .deltas_client
            .unsubscribe(subscription_id)
            .await
            .map_err(|err| {
                warn!(err=?err, "Unsubscribing from deltas on cleanup failed!");
            });

        // Handle the result: if it succeeded, we're done. If it errored, we need to determine
        // whether the end_rx was consumed (close signal received) or not
        match result {
            Ok(()) => Ok(()), // Success, likely due to close signal
            Err(e) => {
                // The error came from the inner async block. Since the async block
                // can receive close signals (which would return Ok), any error means
                // the close signal was NOT received, so we can return the end_rx for retry
                Err((e, Some(end_rx)))
            }
        }
    }
593
594    fn is_next_expected(&self, incoming: &BlockHeader) -> bool {
595        if let Some(block) = self.last_synced_block.as_ref() {
596            return incoming.parent_hash == block.hash;
597        }
598        false
599    }
600    fn filter_deltas(&self, deltas: &mut BlockChanges) {
601        deltas.filter_by_component(|id| {
602            self.component_tracker
603                .components
604                .contains_key(id)
605        });
606        deltas.filter_by_contract(|id| {
607            self.component_tracker
608                .contracts
609                .contains(id)
610        });
611    }
612}
613
614#[async_trait]
615impl<R, D> StateSynchronizer for ProtocolStateSynchronizer<R, D>
616where
617    R: RPCClient + Clone + Send + Sync + 'static,
618    D: DeltasClient + Clone + Send + Sync + 'static,
619{
620    async fn initialize(&mut self) -> SyncResult<()> {
621        info!("Retrieving relevant protocol components");
622        self.component_tracker
623            .initialise_components()
624            .await?;
625        info!(
626            n_components = self.component_tracker.components.len(),
627            n_contracts = self.component_tracker.contracts.len(),
628            "Finished retrieving components",
629        );
630
631        Ok(())
632    }
633
634    async fn start(
635        mut self,
636    ) -> (SynchronizerTaskHandle, Receiver<SyncResult<StateSyncMessage<BlockHeader>>>) {
637        let (mut tx, rx) = channel(15);
638        let (end_tx, end_rx) = oneshot::channel::<()>();
639
640        let jh = tokio::spawn(async move {
641            let mut retry_count = 0;
642            let mut current_end_rx = end_rx;
643            let mut final_error = None;
644
645            while retry_count < self.max_retries {
646                info!(extractor_id=%&self.extractor_id, retry_count, "(Re)starting synchronization loop");
647
648                let res = self
649                    .state_sync(&mut tx, current_end_rx)
650                    .await;
651                match res {
652                    Ok(()) => {
653                        info!(
654                            extractor_id=%&self.extractor_id,
655                            retry_count,
656                            "State synchronization exited cleanly"
657                        );
658                        return;
659                    }
660                    Err((e, maybe_end_rx)) => {
661                        warn!(
662                            extractor_id=%&self.extractor_id,
663                            retry_count,
664                            error=%e,
665                            "State synchronization errored!"
666                        );
667
668                        // If we have the end_rx back, we can retry
669                        if let Some(recovered_end_rx) = maybe_end_rx {
670                            current_end_rx = recovered_end_rx;
671
672                            if let SynchronizerError::ConnectionClosed = e {
673                                // break synchronization loop if websocket client is dead
674                                error!(
675                                    "Websocket connection closed. State synchronization exiting."
676                                );
677                                let _ = tx.send(Err(e)).await;
678                                return;
679                            } else {
680                                // Store error in case this is our last retry
681                                final_error = Some(e);
682                            }
683                        } else {
684                            // Close signal was received, exit cleanly
685                            info!(extractor_id=%&self.extractor_id, "Received close signal, exiting.");
686                            return;
687                        }
688                    }
689                }
690                sleep(self.retry_cooldown).await;
691                retry_count += 1;
692            }
693            if let Some(e) = final_error {
694                warn!(extractor_id=%&self.extractor_id, retry_count, error=%e, "Max retries exceeded");
695                let _ = tx.send(Err(e)).await;
696            }
697        });
698
699        let handle = SynchronizerTaskHandle::new(jh, end_tx);
700        (handle, rx)
701    }
702}
703
704#[cfg(test)]
705mod test {
706    //! Test suite for ProtocolStateSynchronizer shutdown and cleanup behavior.
707    //!
708    //! ## Test Coverage Strategy:
709    //!
710    //! ### Shutdown & Close Signal Tests:
711    //! - `test_public_close_api_functionality` - Tests public API (start/close lifecycle)
712    //! - `test_close_signal_while_waiting_for_first_deltas` - Close during initial wait
713    //! - `test_close_signal_during_main_processing_loop` - Close during main processing
714    //!
715    //! ### Cleanup & Error Handling Tests:
716    //! - `test_cleanup_runs_when_state_sync_processing_errors` - Cleanup on processing errors
717    //!
718    //! ### Coverage Summary:
719    //! These tests ensure cleanup code (shared state reset + unsubscribe) runs on ALL exit paths:
720    //! ✓ Close signal before first deltas   ✓ Close signal during processing
721    //! ✓ Processing errors                  ✓ Channel closure
722    //! ✓ Public API close operations        ✓ Normal completion
723
724    use std::{collections::HashSet, sync::Arc};
725
726    use tycho_common::dto::{
727        AddressStorageLocation, Block, Chain, ComponentTvlRequestBody, ComponentTvlRequestResponse,
728        DCIUpdate, EntryPoint, PaginationResponse, ProtocolComponentRequestResponse,
729        ProtocolComponentsRequestBody, ProtocolStateRequestBody, ProtocolStateRequestResponse,
730        ProtocolSystemsRequestBody, ProtocolSystemsRequestResponse, RPCTracerParams,
731        StateRequestBody, StateRequestResponse, TokensRequestBody, TokensRequestResponse,
732        TracedEntryPointRequestBody, TracedEntryPointRequestResponse, TracingParams,
733    };
734    use uuid::Uuid;
735
736    use super::*;
737    use crate::{deltas::MockDeltasClient, rpc::MockRPCClient, DeltasError, RPCError};
738
739    // Required for mock client to implement clone
740    struct ArcRPCClient<T>(Arc<T>);
741
742    // Default derive(Clone) does require T to be Clone as well.
743    impl<T> Clone for ArcRPCClient<T> {
744        fn clone(&self) -> Self {
745            ArcRPCClient(self.0.clone())
746        }
747    }
748
749    #[async_trait]
750    impl<T> RPCClient for ArcRPCClient<T>
751    where
752        T: RPCClient + Sync + Send + 'static,
753    {
754        async fn get_tokens(
755            &self,
756            request: &TokensRequestBody,
757        ) -> Result<TokensRequestResponse, RPCError> {
758            self.0.get_tokens(request).await
759        }
760
761        async fn get_contract_state(
762            &self,
763            request: &StateRequestBody,
764        ) -> Result<StateRequestResponse, RPCError> {
765            self.0.get_contract_state(request).await
766        }
767
768        async fn get_protocol_components(
769            &self,
770            request: &ProtocolComponentsRequestBody,
771        ) -> Result<ProtocolComponentRequestResponse, RPCError> {
772            self.0
773                .get_protocol_components(request)
774                .await
775        }
776
777        async fn get_protocol_states(
778            &self,
779            request: &ProtocolStateRequestBody,
780        ) -> Result<ProtocolStateRequestResponse, RPCError> {
781            self.0
782                .get_protocol_states(request)
783                .await
784        }
785
786        async fn get_protocol_systems(
787            &self,
788            request: &ProtocolSystemsRequestBody,
789        ) -> Result<ProtocolSystemsRequestResponse, RPCError> {
790            self.0
791                .get_protocol_systems(request)
792                .await
793        }
794
795        async fn get_component_tvl(
796            &self,
797            request: &ComponentTvlRequestBody,
798        ) -> Result<ComponentTvlRequestResponse, RPCError> {
799            self.0.get_component_tvl(request).await
800        }
801
802        async fn get_traced_entry_points(
803            &self,
804            request: &TracedEntryPointRequestBody,
805        ) -> Result<TracedEntryPointRequestResponse, RPCError> {
806            self.0
807                .get_traced_entry_points(request)
808                .await
809        }
810
811        async fn get_snapshots<'a>(
812            &self,
813            request: &SnapshotParameters<'a>,
814            chunk_size: Option<usize>,
815            concurrency: usize,
816        ) -> Result<Snapshot, RPCError> {
817            self.0
818                .get_snapshots(request, chunk_size, concurrency)
819                .await
820        }
821
822        fn compression(&self) -> bool {
823            self.0.compression()
824        }
825    }
826
/// Test-only wrapper that shares a single deltas client behind an [`Arc`].
///
/// The wrapper can be cloned even when `T` itself cannot, which is what allows a mock client
/// to be used where a cloneable client is needed.
struct ArcDeltasClient<T>(Arc<T>);

// Written by hand on purpose: `#[derive(Clone)]` would add a `T: Clone` bound, whereas cloning
// the wrapper only has to clone the `Arc` handle.
impl<T> Clone for ArcDeltasClient<T> {
    fn clone(&self) -> Self {
        Self(Arc::clone(&self.0))
    }
}
836
837    #[async_trait]
838    impl<T> DeltasClient for ArcDeltasClient<T>
839    where
840        T: DeltasClient + Sync + Send + 'static,
841    {
842        async fn subscribe(
843            &self,
844            extractor_id: ExtractorIdentity,
845            options: SubscriptionOptions,
846        ) -> Result<(Uuid, Receiver<BlockChanges>), DeltasError> {
847            self.0
848                .subscribe(extractor_id, options)
849                .await
850        }
851
852        async fn unsubscribe(&self, subscription_id: Uuid) -> Result<(), DeltasError> {
853            self.0
854                .unsubscribe(subscription_id)
855                .await
856        }
857
858        async fn connect(&self) -> Result<JoinHandle<Result<(), DeltasError>>, DeltasError> {
859            self.0.connect().await
860        }
861
862        async fn close(&self) -> Result<(), DeltasError> {
863            self.0.close().await
864        }
865    }
866
867    fn with_mocked_clients(
868        native: bool,
869        include_tvl: bool,
870        rpc_client: Option<MockRPCClient>,
871        deltas_client: Option<MockDeltasClient>,
872    ) -> ProtocolStateSynchronizer<ArcRPCClient<MockRPCClient>, ArcDeltasClient<MockDeltasClient>>
873    {
874        let rpc_client = ArcRPCClient(Arc::new(rpc_client.unwrap_or_default()));
875        let deltas_client = ArcDeltasClient(Arc::new(deltas_client.unwrap_or_default()));
876
877        ProtocolStateSynchronizer::new(
878            ExtractorIdentity::new(Chain::Ethereum, "uniswap-v2"),
879            native,
880            ComponentFilter::with_tvl_range(50.0, 50.0),
881            1,
882            Duration::from_secs(0),
883            true,
884            include_tvl,
885            true, // Does not matter as we mock the client that never compresses
886            rpc_client,
887            deltas_client,
888            10_u64,
889        )
890    }
891
892    fn state_snapshot_native() -> ProtocolStateRequestResponse {
893        ProtocolStateRequestResponse {
894            states: vec![ResponseProtocolState {
895                component_id: "Component1".to_string(),
896                ..Default::default()
897            }],
898            pagination: PaginationResponse { page: 0, page_size: 20, total: 1 },
899        }
900    }
901
902    fn make_mock_client() -> MockRPCClient {
903        let mut m = MockRPCClient::new();
904        m.expect_compression()
905            .return_const(false);
906        m
907    }
908
909    #[test_log::test(tokio::test)]
910    async fn test_get_snapshots_native() {
911        let header = BlockHeader::default();
912        let mut rpc = make_mock_client();
913        let component = ProtocolComponent { id: "Component1".to_string(), ..Default::default() };
914
915        let component_clone = component.clone();
916        rpc.expect_get_snapshots()
917            .returning(move |_request, _chunk_size, _concurrency| {
918                Ok(Snapshot {
919                    states: state_snapshot_native()
920                        .states
921                        .into_iter()
922                        .map(|state| {
923                            (
924                                state.component_id.clone(),
925                                ComponentWithState {
926                                    state,
927                                    component: component_clone.clone(),
928                                    entrypoints: vec![],
929                                    component_tvl: None,
930                                },
931                            )
932                        })
933                        .collect(),
934                    vm_storage: HashMap::new(),
935                })
936            });
937
938        rpc.expect_get_traced_entry_points()
939            .returning(|_| {
940                Ok(TracedEntryPointRequestResponse {
941                    traced_entry_points: HashMap::new(),
942                    pagination: PaginationResponse::new(0, 20, 0),
943                })
944            });
945
946        let mut state_sync = with_mocked_clients(true, false, Some(rpc), None);
947        state_sync
948            .component_tracker
949            .components
950            .insert("Component1".to_string(), component.clone());
951        let components_arg = ["Component1".to_string()];
952        let exp = StateSyncMessage {
953            header: header.clone(),
954            snapshots: Snapshot {
955                states: state_snapshot_native()
956                    .states
957                    .into_iter()
958                    .map(|state| {
959                        (
960                            state.component_id.clone(),
961                            ComponentWithState {
962                                state,
963                                component: component.clone(),
964                                entrypoints: vec![],
965                                component_tvl: None,
966                            },
967                        )
968                    })
969                    .collect(),
970                vm_storage: HashMap::new(),
971            },
972            deltas: None,
973            removed_components: Default::default(),
974        };
975
976        let snap = state_sync
977            .get_snapshots(header, Some(&components_arg))
978            .await
979            .expect("Retrieving snapshot failed");
980
981        assert_eq!(snap, exp);
982    }
983
984    #[test_log::test(tokio::test)]
985    async fn test_get_snapshots_native_with_tvl() {
986        let header = BlockHeader::default();
987        let mut rpc = make_mock_client();
988        let component = ProtocolComponent { id: "Component1".to_string(), ..Default::default() };
989
990        let component_clone = component.clone();
991        rpc.expect_get_snapshots()
992            .returning(move |_request, _chunk_size, _concurrency| {
993                Ok(Snapshot {
994                    states: state_snapshot_native()
995                        .states
996                        .into_iter()
997                        .map(|state| {
998                            (
999                                state.component_id.clone(),
1000                                ComponentWithState {
1001                                    state,
1002                                    component: component_clone.clone(),
1003                                    component_tvl: Some(100.0),
1004                                    entrypoints: vec![],
1005                                },
1006                            )
1007                        })
1008                        .collect(),
1009                    vm_storage: HashMap::new(),
1010                })
1011            });
1012
1013        rpc.expect_get_traced_entry_points()
1014            .returning(|_| {
1015                Ok(TracedEntryPointRequestResponse {
1016                    traced_entry_points: HashMap::new(),
1017                    pagination: PaginationResponse::new(0, 20, 0),
1018                })
1019            });
1020
1021        let mut state_sync = with_mocked_clients(true, true, Some(rpc), None);
1022        state_sync
1023            .component_tracker
1024            .components
1025            .insert("Component1".to_string(), component.clone());
1026        let components_arg = ["Component1".to_string()];
1027        let exp = StateSyncMessage {
1028            header: header.clone(),
1029            snapshots: Snapshot {
1030                states: state_snapshot_native()
1031                    .states
1032                    .into_iter()
1033                    .map(|state| {
1034                        (
1035                            state.component_id.clone(),
1036                            ComponentWithState {
1037                                state,
1038                                component: component.clone(),
1039                                component_tvl: Some(100.0),
1040                                entrypoints: vec![],
1041                            },
1042                        )
1043                    })
1044                    .collect(),
1045                vm_storage: HashMap::new(),
1046            },
1047            deltas: None,
1048            removed_components: Default::default(),
1049        };
1050
1051        let snap = state_sync
1052            .get_snapshots(header, Some(&components_arg))
1053            .await
1054            .expect("Retrieving snapshot failed");
1055
1056        assert_eq!(snap, exp);
1057    }
1058
1059    fn state_snapshot_vm() -> StateRequestResponse {
1060        StateRequestResponse {
1061            accounts: vec![
1062                ResponseAccount { address: Bytes::from("0x0badc0ffee"), ..Default::default() },
1063                ResponseAccount { address: Bytes::from("0xbabe42"), ..Default::default() },
1064            ],
1065            pagination: PaginationResponse { page: 0, page_size: 20, total: 1 },
1066        }
1067    }
1068
1069    fn traced_entry_point_response() -> TracedEntryPointRequestResponse {
1070        TracedEntryPointRequestResponse {
1071            traced_entry_points: HashMap::from([(
1072                "Component1".to_string(),
1073                vec![(
1074                    EntryPointWithTracingParams {
1075                        entry_point: EntryPoint {
1076                            external_id: "entrypoint_a".to_string(),
1077                            target: Bytes::from("0x0badc0ffee"),
1078                            signature: "sig()".to_string(),
1079                        },
1080                        params: TracingParams::RPCTracer(RPCTracerParams {
1081                            caller: Some(Bytes::from("0x0badc0ffee")),
1082                            calldata: Bytes::from("0x0badc0ffee"),
1083                            state_overrides: None,
1084                            prune_addresses: None,
1085                        }),
1086                    },
1087                    TracingResult {
1088                        retriggers: HashSet::from([(
1089                            Bytes::from("0x0badc0ffee"),
1090                            AddressStorageLocation::new(Bytes::from("0x0badc0ffee"), 12),
1091                        )]),
1092                        accessed_slots: HashMap::from([(
1093                            Bytes::from("0x0badc0ffee"),
1094                            HashSet::from([Bytes::from("0xbadbeef0")]),
1095                        )]),
1096                    },
1097                )],
1098            )]),
1099            pagination: PaginationResponse::new(0, 20, 0),
1100        }
1101    }
1102
1103    #[test_log::test(tokio::test)]
1104    async fn test_get_snapshots_vm() {
1105        let header = BlockHeader::default();
1106        let mut rpc = make_mock_client();
1107
1108        let traced_ep_response = traced_entry_point_response();
1109        rpc.expect_get_snapshots()
1110            .returning(move |_request, _chunk_size, _concurrency| {
1111                let vm_storage_accounts = state_snapshot_vm();
1112                Ok(Snapshot {
1113                    states: [(
1114                        "Component1".to_string(),
1115                        ComponentWithState {
1116                            state: ResponseProtocolState {
1117                                component_id: "Component1".to_string(),
1118                                ..Default::default()
1119                            },
1120                            component: ProtocolComponent {
1121                                id: "Component1".to_string(),
1122                                contract_ids: vec![
1123                                    Bytes::from("0x0badc0ffee"),
1124                                    Bytes::from("0xbabe42"),
1125                                ],
1126                                ..Default::default()
1127                            },
1128                            component_tvl: None,
1129                            entrypoints: traced_ep_response
1130                                .traced_entry_points
1131                                .get("Component1")
1132                                .cloned()
1133                                .unwrap_or_default(),
1134                        },
1135                    )]
1136                    .into_iter()
1137                    .collect(),
1138                    vm_storage: vm_storage_accounts
1139                        .accounts
1140                        .into_iter()
1141                        .map(|state| (state.address.clone(), state))
1142                        .collect(),
1143                })
1144            });
1145
1146        rpc.expect_get_traced_entry_points()
1147            .returning(|_| Ok(traced_entry_point_response()));
1148
1149        let mut state_sync = with_mocked_clients(false, false, Some(rpc), None);
1150        let component = ProtocolComponent {
1151            id: "Component1".to_string(),
1152            contract_ids: vec![Bytes::from("0x0badc0ffee"), Bytes::from("0xbabe42")],
1153            ..Default::default()
1154        };
1155        state_sync
1156            .component_tracker
1157            .components
1158            .insert("Component1".to_string(), component.clone());
1159        let components_arg = ["Component1".to_string()];
1160        let exp = StateSyncMessage {
1161            header: header.clone(),
1162            snapshots: Snapshot {
1163                states: [(
1164                    component.id.clone(),
1165                    ComponentWithState {
1166                        state: ResponseProtocolState {
1167                            component_id: "Component1".to_string(),
1168                            ..Default::default()
1169                        },
1170                        component: component.clone(),
1171                        component_tvl: None,
1172                        entrypoints: vec![(
1173                            EntryPointWithTracingParams {
1174                                entry_point: EntryPoint {
1175                                    external_id: "entrypoint_a".to_string(),
1176                                    target: Bytes::from("0x0badc0ffee"),
1177                                    signature: "sig()".to_string(),
1178                                },
1179                                params: TracingParams::RPCTracer(RPCTracerParams {
1180                                    caller: Some(Bytes::from("0x0badc0ffee")),
1181                                    calldata: Bytes::from("0x0badc0ffee"),
1182                                    state_overrides: None,
1183                                    prune_addresses: None,
1184                                }),
1185                            },
1186                            TracingResult {
1187                                retriggers: HashSet::from([(
1188                                    Bytes::from("0x0badc0ffee"),
1189                                    AddressStorageLocation::new(Bytes::from("0x0badc0ffee"), 12),
1190                                )]),
1191                                accessed_slots: HashMap::from([(
1192                                    Bytes::from("0x0badc0ffee"),
1193                                    HashSet::from([Bytes::from("0xbadbeef0")]),
1194                                )]),
1195                            },
1196                        )],
1197                    },
1198                )]
1199                .into_iter()
1200                .collect(),
1201                vm_storage: state_snapshot_vm()
1202                    .accounts
1203                    .into_iter()
1204                    .map(|state| (state.address.clone(), state))
1205                    .collect(),
1206            },
1207            deltas: None,
1208            removed_components: Default::default(),
1209        };
1210
1211        let snap = state_sync
1212            .get_snapshots(header, Some(&components_arg))
1213            .await
1214            .expect("Retrieving snapshot failed");
1215
1216        assert_eq!(snap, exp);
1217    }
1218
1219    #[test_log::test(tokio::test)]
1220    async fn test_get_snapshots_vm_with_tvl() {
1221        let header = BlockHeader::default();
1222        let mut rpc = make_mock_client();
1223        let component = ProtocolComponent {
1224            id: "Component1".to_string(),
1225            contract_ids: vec![Bytes::from("0x0badc0ffee"), Bytes::from("0xbabe42")],
1226            ..Default::default()
1227        };
1228
1229        let component_clone = component.clone();
1230        rpc.expect_get_snapshots()
1231            .returning(move |_request, _chunk_size, _concurrency| {
1232                let vm_storage_accounts = state_snapshot_vm();
1233                Ok(Snapshot {
1234                    states: [(
1235                        "Component1".to_string(),
1236                        ComponentWithState {
1237                            state: ResponseProtocolState {
1238                                component_id: "Component1".to_string(),
1239                                ..Default::default()
1240                            },
1241                            component: component_clone.clone(),
1242                            component_tvl: Some(100.0),
1243                            entrypoints: vec![],
1244                        },
1245                    )]
1246                    .into_iter()
1247                    .collect(),
1248                    vm_storage: vm_storage_accounts
1249                        .accounts
1250                        .into_iter()
1251                        .map(|state| (state.address.clone(), state))
1252                        .collect(),
1253                })
1254            });
1255
1256        rpc.expect_get_traced_entry_points()
1257            .returning(|_| {
1258                Ok(TracedEntryPointRequestResponse {
1259                    traced_entry_points: HashMap::new(),
1260                    pagination: PaginationResponse::new(0, 20, 0),
1261                })
1262            });
1263
1264        let mut state_sync = with_mocked_clients(false, true, Some(rpc), None);
1265        state_sync
1266            .component_tracker
1267            .components
1268            .insert("Component1".to_string(), component.clone());
1269        let components_arg = ["Component1".to_string()];
1270        let exp = StateSyncMessage {
1271            header: header.clone(),
1272            snapshots: Snapshot {
1273                states: [(
1274                    component.id.clone(),
1275                    ComponentWithState {
1276                        state: ResponseProtocolState {
1277                            component_id: "Component1".to_string(),
1278                            ..Default::default()
1279                        },
1280                        component: component.clone(),
1281                        component_tvl: Some(100.0),
1282                        entrypoints: vec![],
1283                    },
1284                )]
1285                .into_iter()
1286                .collect(),
1287                vm_storage: state_snapshot_vm()
1288                    .accounts
1289                    .into_iter()
1290                    .map(|state| (state.address.clone(), state))
1291                    .collect(),
1292            },
1293            deltas: None,
1294            removed_components: Default::default(),
1295        };
1296
1297        let snap = state_sync
1298            .get_snapshots(header, Some(&components_arg))
1299            .await
1300            .expect("Retrieving snapshot failed");
1301
1302        assert_eq!(snap, exp);
1303    }
1304
1305    /// Test that get_snapshots only fetches snapshots for requested components,
1306    /// not all tracked components. This prevents returning full snapshots repeatedly
1307    /// when only a subset of components need updates.
1308    #[test_log::test(tokio::test)]
1309    async fn test_get_snapshots_filters_to_requested_components_only() {
1310        let header = BlockHeader::default();
1311        let mut rpc = make_mock_client();
1312
1313        // Create three components
1314        let component1 = ProtocolComponent { id: "Component1".to_string(), ..Default::default() };
1315        let component2 = ProtocolComponent { id: "Component2".to_string(), ..Default::default() };
1316        let component3 = ProtocolComponent { id: "Component3".to_string(), ..Default::default() };
1317
1318        let component2_clone = component2.clone();
1319
1320        // Mock the RPC call and verify it only receives Component2
1321        rpc.expect_get_snapshots()
1322            .withf(
1323                |request: &SnapshotParameters,
1324                 _chunk_size: &Option<usize>,
1325                 _concurrency: &usize| {
1326                    // Verify that the request contains ONLY Component2, not all tracked components
1327                    request.components.len() == 1 &&
1328                        request
1329                            .components
1330                            .contains_key("Component2")
1331                },
1332            )
1333            .times(1)
1334            .returning(move |_request, _chunk_size, _concurrency| {
1335                Ok(Snapshot {
1336                    states: [(
1337                        "Component2".to_string(),
1338                        ComponentWithState {
1339                            state: ResponseProtocolState {
1340                                component_id: "Component2".to_string(),
1341                                ..Default::default()
1342                            },
1343                            component: component2_clone.clone(),
1344                            entrypoints: vec![],
1345                            component_tvl: None,
1346                        },
1347                    )]
1348                    .into_iter()
1349                    .collect(),
1350                    vm_storage: HashMap::new(),
1351                })
1352            });
1353
1354        rpc.expect_get_traced_entry_points()
1355            .returning(|_| {
1356                Ok(TracedEntryPointRequestResponse {
1357                    traced_entry_points: HashMap::new(),
1358                    pagination: PaginationResponse::new(0, 20, 0),
1359                })
1360            });
1361
1362        let mut state_sync = with_mocked_clients(true, false, Some(rpc), None);
1363
1364        // Track all three components
1365        state_sync
1366            .component_tracker
1367            .components
1368            .insert("Component1".to_string(), component1.clone());
1369        state_sync
1370            .component_tracker
1371            .components
1372            .insert("Component2".to_string(), component2.clone());
1373        state_sync
1374            .component_tracker
1375            .components
1376            .insert("Component3".to_string(), component3.clone());
1377
1378        // Request snapshot for ONLY Component2
1379        let components_arg = ["Component2".to_string()];
1380
1381        let snap = state_sync
1382            .get_snapshots(header.clone(), Some(&components_arg))
1383            .await
1384            .expect("Retrieving snapshot failed");
1385
1386        // Verify we only got Component2 back
1387        assert_eq!(snap.snapshots.states.len(), 1);
1388        assert!(snap
1389            .snapshots
1390            .states
1391            .contains_key("Component2"));
1392        assert!(!snap
1393            .snapshots
1394            .states
1395            .contains_key("Component1"));
1396        assert!(!snap
1397            .snapshots
1398            .states
1399            .contains_key("Component3"));
1400    }
1401
1402    fn mock_clients_for_state_sync() -> (MockRPCClient, MockDeltasClient, Sender<BlockChanges>) {
1403        let mut rpc_client = make_mock_client();
1404        // Mocks for the start_tracking call, these need to come first because they are more
1405        // specific, see: https://docs.rs/mockall/latest/mockall/#matching-multiple-calls
1406        rpc_client
1407            .expect_get_protocol_components()
1408            .with(mockall::predicate::function(
1409                move |request_params: &ProtocolComponentsRequestBody| {
1410                    if let Some(ids) = request_params.component_ids.as_ref() {
1411                        ids.contains(&"Component3".to_string())
1412                    } else {
1413                        false
1414                    }
1415                },
1416            ))
1417            .returning(|_| {
1418                // return Component3
1419                Ok(ProtocolComponentRequestResponse {
1420                    protocol_components: vec![
1421                        // this component shall have a tvl update above threshold
1422                        ProtocolComponent { id: "Component3".to_string(), ..Default::default() },
1423                    ],
1424                    pagination: PaginationResponse { page: 0, page_size: 20, total: 1 },
1425                })
1426            });
1427        // Mock get_snapshots for Component3
1428        rpc_client
1429            .expect_get_snapshots()
1430            .withf(
1431                |request: &SnapshotParameters,
1432                 _chunk_size: &Option<usize>,
1433                 _concurrency: &usize| {
1434                    request
1435                        .components
1436                        .contains_key("Component3")
1437                },
1438            )
1439            .returning(|_request, _chunk_size, _concurrency| {
1440                Ok(Snapshot {
1441                    states: [(
1442                        "Component3".to_string(),
1443                        ComponentWithState {
1444                            state: ResponseProtocolState {
1445                                component_id: "Component3".to_string(),
1446                                ..Default::default()
1447                            },
1448                            component: ProtocolComponent {
1449                                id: "Component3".to_string(),
1450                                ..Default::default()
1451                            },
1452                            component_tvl: Some(1000.0),
1453                            entrypoints: vec![],
1454                        },
1455                    )]
1456                    .into_iter()
1457                    .collect(),
1458                    vm_storage: HashMap::new(),
1459                })
1460            });
1461
1462        // mock calls for the initial state snapshots
1463        rpc_client
1464            .expect_get_protocol_components()
1465            .returning(|_| {
1466                // Initial sync of components
1467                Ok(ProtocolComponentRequestResponse {
1468                    protocol_components: vec![
1469                        // this component shall have a tvl update above threshold
1470                        ProtocolComponent { id: "Component1".to_string(), ..Default::default() },
1471                        // this component shall have a tvl update below threshold.
1472                        ProtocolComponent { id: "Component2".to_string(), ..Default::default() },
1473                        // a third component will have a tvl update above threshold
1474                    ],
1475                    pagination: PaginationResponse { page: 0, page_size: 20, total: 1 },
1476                })
1477            });
1478
1479        rpc_client
1480            .expect_get_snapshots()
1481            .returning(|_request, _chunk_size, _concurrency| {
1482                Ok(Snapshot {
1483                    states: [
1484                        (
1485                            "Component1".to_string(),
1486                            ComponentWithState {
1487                                state: ResponseProtocolState {
1488                                    component_id: "Component1".to_string(),
1489                                    ..Default::default()
1490                                },
1491                                component: ProtocolComponent {
1492                                    id: "Component1".to_string(),
1493                                    ..Default::default()
1494                                },
1495                                component_tvl: Some(100.0),
1496                                entrypoints: vec![],
1497                            },
1498                        ),
1499                        (
1500                            "Component2".to_string(),
1501                            ComponentWithState {
1502                                state: ResponseProtocolState {
1503                                    component_id: "Component2".to_string(),
1504                                    ..Default::default()
1505                                },
1506                                component: ProtocolComponent {
1507                                    id: "Component2".to_string(),
1508                                    ..Default::default()
1509                                },
1510                                component_tvl: Some(0.0),
1511                                entrypoints: vec![],
1512                            },
1513                        ),
1514                    ]
1515                    .into_iter()
1516                    .collect(),
1517                    vm_storage: HashMap::new(),
1518                })
1519            });
1520
1521        // Mock get_traced_entry_points for Ethereum chain
1522        rpc_client
1523            .expect_get_traced_entry_points()
1524            .returning(|_| {
1525                Ok(TracedEntryPointRequestResponse {
1526                    traced_entry_points: HashMap::new(),
1527                    pagination: PaginationResponse { page: 0, page_size: 100, total: 0 },
1528                })
1529            });
1530
1531        // Mock deltas client and messages
1532        let mut deltas_client = MockDeltasClient::new();
1533        let (tx, rx) = channel(1);
1534        deltas_client
1535            .expect_subscribe()
1536            .return_once(move |_, _| {
1537                // Return subscriber id and a channel
1538                Ok((Uuid::default(), rx))
1539            });
1540
1541        // Expect unsubscribe call during cleanup
1542        deltas_client
1543            .expect_unsubscribe()
1544            .return_once(|_| Ok(()));
1545
1546        (rpc_client, deltas_client, tx)
1547    }
1548
1549    /// Test strategy
1550    ///
1551    /// - initial snapshot retrieval returns two component1 and component2 as snapshots
1552    /// - send 2 dummy messages, containing only blocks
1553    /// - third message contains a new component with some significant tvl, one initial component
1554    ///   slips below tvl threshold, another one is above tvl but does not get re-requested.
1555    #[test_log::test(tokio::test)]
1556    async fn test_state_sync() {
1557        let (rpc_client, deltas_client, tx) = mock_clients_for_state_sync();
1558        let deltas = [
1559            BlockChanges {
1560                extractor: "uniswap-v2".to_string(),
1561                chain: Chain::Ethereum,
1562                block: Block {
1563                    number: 1,
1564                    hash: Bytes::from("0x01"),
1565                    parent_hash: Bytes::from("0x00"),
1566                    chain: Chain::Ethereum,
1567                    ts: Default::default(),
1568                },
1569                revert: false,
1570                dci_update: DCIUpdate {
1571                    new_entrypoints: HashMap::from([(
1572                        "Component1".to_string(),
1573                        HashSet::from([EntryPoint {
1574                            external_id: "entrypoint_a".to_string(),
1575                            target: Bytes::from("0x0badc0ffee"),
1576                            signature: "sig()".to_string(),
1577                        }]),
1578                    )]),
1579                    new_entrypoint_params: HashMap::from([(
1580                        "entrypoint_a".to_string(),
1581                        HashSet::from([(
1582                            TracingParams::RPCTracer(RPCTracerParams {
1583                                caller: Some(Bytes::from("0x0badc0ffee")),
1584                                calldata: Bytes::from("0x0badc0ffee"),
1585                                state_overrides: None,
1586                                prune_addresses: None,
1587                            }),
1588                            "Component1".to_string(),
1589                        )]),
1590                    )]),
1591                    trace_results: HashMap::from([(
1592                        "entrypoint_a".to_string(),
1593                        TracingResult {
1594                            retriggers: HashSet::from([(
1595                                Bytes::from("0x0badc0ffee"),
1596                                AddressStorageLocation::new(Bytes::from("0x0badc0ffee"), 12),
1597                            )]),
1598                            accessed_slots: HashMap::from([(
1599                                Bytes::from("0x0badc0ffee"),
1600                                HashSet::from([Bytes::from("0xbadbeef0")]),
1601                            )]),
1602                        },
1603                    )]),
1604                },
1605                ..Default::default()
1606            },
1607            BlockChanges {
1608                extractor: "uniswap-v2".to_string(),
1609                chain: Chain::Ethereum,
1610                block: Block {
1611                    number: 2,
1612                    hash: Bytes::from("0x02"),
1613                    parent_hash: Bytes::from("0x01"),
1614                    chain: Chain::Ethereum,
1615                    ts: Default::default(),
1616                },
1617                revert: false,
1618                component_tvl: [
1619                    ("Component1".to_string(), 100.0),
1620                    ("Component2".to_string(), 0.0),
1621                    ("Component3".to_string(), 1000.0),
1622                ]
1623                .into_iter()
1624                .collect(),
1625                ..Default::default()
1626            },
1627        ];
1628        let mut state_sync = with_mocked_clients(true, true, Some(rpc_client), Some(deltas_client));
1629        state_sync
1630            .initialize()
1631            .await
1632            .expect("Init failed");
1633
1634        // Test starts here
1635        let (handle, mut rx) = state_sync.start().await;
1636        let (jh, close_tx) = handle.split();
1637        tx.send(deltas[0].clone())
1638            .await
1639            .expect("deltas channel msg 0 closed!");
1640        let first_msg = timeout(Duration::from_millis(100), rx.recv())
1641            .await
1642            .expect("waiting for first state msg timed out!")
1643            .expect("state sync block sender closed!");
1644        tx.send(deltas[1].clone())
1645            .await
1646            .expect("deltas channel msg 1 closed!");
1647        let second_msg = timeout(Duration::from_millis(100), rx.recv())
1648            .await
1649            .expect("waiting for second state msg timed out!")
1650            .expect("state sync block sender closed!");
1651        let _ = close_tx.send(());
1652        jh.await
1653            .expect("state sync task panicked!");
1654
1655        // assertions
1656        let exp1 = StateSyncMessage {
1657            header: BlockHeader {
1658                number: 1,
1659                hash: Bytes::from("0x01"),
1660                parent_hash: Bytes::from("0x00"),
1661                revert: false,
1662                ..Default::default()
1663            },
1664            snapshots: Snapshot {
1665                states: [
1666                    (
1667                        "Component1".to_string(),
1668                        ComponentWithState {
1669                            state: ResponseProtocolState {
1670                                component_id: "Component1".to_string(),
1671                                ..Default::default()
1672                            },
1673                            component: ProtocolComponent {
1674                                id: "Component1".to_string(),
1675                                ..Default::default()
1676                            },
1677                            component_tvl: Some(100.0),
1678                            entrypoints: vec![],
1679                        },
1680                    ),
1681                    (
1682                        "Component2".to_string(),
1683                        ComponentWithState {
1684                            state: ResponseProtocolState {
1685                                component_id: "Component2".to_string(),
1686                                ..Default::default()
1687                            },
1688                            component: ProtocolComponent {
1689                                id: "Component2".to_string(),
1690                                ..Default::default()
1691                            },
1692                            component_tvl: Some(0.0),
1693                            entrypoints: vec![],
1694                        },
1695                    ),
1696                ]
1697                .into_iter()
1698                .collect(),
1699                vm_storage: HashMap::new(),
1700            },
1701            deltas: Some(deltas[0].clone()),
1702            removed_components: Default::default(),
1703        };
1704
1705        let exp2 = StateSyncMessage {
1706            header: BlockHeader {
1707                number: 2,
1708                hash: Bytes::from("0x02"),
1709                parent_hash: Bytes::from("0x01"),
1710                revert: false,
1711                ..Default::default()
1712            },
1713            snapshots: Snapshot {
1714                states: [
1715                    // This is the new component we queried once it passed the tvl threshold.
1716                    (
1717                        "Component3".to_string(),
1718                        ComponentWithState {
1719                            state: ResponseProtocolState {
1720                                component_id: "Component3".to_string(),
1721                                ..Default::default()
1722                            },
1723                            component: ProtocolComponent {
1724                                id: "Component3".to_string(),
1725                                ..Default::default()
1726                            },
1727                            component_tvl: Some(1000.0),
1728                            entrypoints: vec![],
1729                        },
1730                    ),
1731                ]
1732                .into_iter()
1733                .collect(),
1734                vm_storage: HashMap::new(),
1735            },
1736            // Our deltas are empty and since merge methods are
1737            // tested in tycho-common we don't have much to do here.
1738            deltas: Some(BlockChanges {
1739                extractor: "uniswap-v2".to_string(),
1740                chain: Chain::Ethereum,
1741                block: Block {
1742                    number: 2,
1743                    hash: Bytes::from("0x02"),
1744                    parent_hash: Bytes::from("0x01"),
1745                    chain: Chain::Ethereum,
1746                    ts: Default::default(),
1747                },
1748                revert: false,
1749                component_tvl: [
1750                    // "Component2" should not show here.
1751                    ("Component1".to_string(), 100.0),
1752                    ("Component3".to_string(), 1000.0),
1753                ]
1754                .into_iter()
1755                .collect(),
1756                ..Default::default()
1757            }),
1758            // "Component2" was removed, because its tvl changed to 0.
1759            removed_components: [(
1760                "Component2".to_string(),
1761                ProtocolComponent { id: "Component2".to_string(), ..Default::default() },
1762            )]
1763            .into_iter()
1764            .collect(),
1765        };
1766        assert_eq!(first_msg.unwrap(), exp1);
1767        assert_eq!(second_msg.unwrap(), exp2);
1768    }
1769
1770    #[test_log::test(tokio::test)]
1771    async fn test_state_sync_with_tvl_range() {
1772        // Define the range for testing
1773        let remove_tvl_threshold = 5.0;
1774        let add_tvl_threshold = 7.0;
1775
1776        let mut rpc_client = make_mock_client();
1777        let mut deltas_client = MockDeltasClient::new();
1778
1779        rpc_client
1780            .expect_get_protocol_components()
1781            .with(mockall::predicate::function(
1782                move |request_params: &ProtocolComponentsRequestBody| {
1783                    if let Some(ids) = request_params.component_ids.as_ref() {
1784                        ids.contains(&"Component3".to_string())
1785                    } else {
1786                        false
1787                    }
1788                },
1789            ))
1790            .returning(|_| {
1791                Ok(ProtocolComponentRequestResponse {
1792                    protocol_components: vec![ProtocolComponent {
1793                        id: "Component3".to_string(),
1794                        ..Default::default()
1795                    }],
1796                    pagination: PaginationResponse { page: 0, page_size: 20, total: 1 },
1797                })
1798            });
1799        // Mock get_snapshots for Component3
1800        rpc_client
1801            .expect_get_snapshots()
1802            .withf(
1803                |request: &SnapshotParameters,
1804                 _chunk_size: &Option<usize>,
1805                 _concurrency: &usize| {
1806                    request
1807                        .components
1808                        .contains_key("Component3")
1809                },
1810            )
1811            .returning(|_request, _chunk_size, _concurrency| {
1812                Ok(Snapshot {
1813                    states: [(
1814                        "Component3".to_string(),
1815                        ComponentWithState {
1816                            state: ResponseProtocolState {
1817                                component_id: "Component3".to_string(),
1818                                ..Default::default()
1819                            },
1820                            component: ProtocolComponent {
1821                                id: "Component3".to_string(),
1822                                ..Default::default()
1823                            },
1824                            component_tvl: Some(10.0),
1825                            entrypoints: vec![],
1826                        },
1827                    )]
1828                    .into_iter()
1829                    .collect(),
1830                    vm_storage: HashMap::new(),
1831                })
1832            });
1833
1834        // Mock for the initial snapshot retrieval
1835        rpc_client
1836            .expect_get_protocol_components()
1837            .returning(|_| {
1838                Ok(ProtocolComponentRequestResponse {
1839                    protocol_components: vec![
1840                        ProtocolComponent { id: "Component1".to_string(), ..Default::default() },
1841                        ProtocolComponent { id: "Component2".to_string(), ..Default::default() },
1842                    ],
1843                    pagination: PaginationResponse { page: 0, page_size: 20, total: 1 },
1844                })
1845            });
1846
1847        // Mock get_snapshots for initial snapshot
1848        rpc_client
1849            .expect_get_snapshots()
1850            .returning(|_request, _chunk_size, _concurrency| {
1851                Ok(Snapshot {
1852                    states: [
1853                        (
1854                            "Component1".to_string(),
1855                            ComponentWithState {
1856                                state: ResponseProtocolState {
1857                                    component_id: "Component1".to_string(),
1858                                    ..Default::default()
1859                                },
1860                                component: ProtocolComponent {
1861                                    id: "Component1".to_string(),
1862                                    ..Default::default()
1863                                },
1864                                component_tvl: Some(6.0),
1865                                entrypoints: vec![],
1866                            },
1867                        ),
1868                        (
1869                            "Component2".to_string(),
1870                            ComponentWithState {
1871                                state: ResponseProtocolState {
1872                                    component_id: "Component2".to_string(),
1873                                    ..Default::default()
1874                                },
1875                                component: ProtocolComponent {
1876                                    id: "Component2".to_string(),
1877                                    ..Default::default()
1878                                },
1879                                component_tvl: Some(2.0),
1880                                entrypoints: vec![],
1881                            },
1882                        ),
1883                    ]
1884                    .into_iter()
1885                    .collect(),
1886                    vm_storage: HashMap::new(),
1887                })
1888            });
1889
1890        // Mock get_traced_entry_points for Ethereum chain
1891        rpc_client
1892            .expect_get_traced_entry_points()
1893            .returning(|_| {
1894                Ok(TracedEntryPointRequestResponse {
1895                    traced_entry_points: HashMap::new(),
1896                    pagination: PaginationResponse { page: 0, page_size: 100, total: 0 },
1897                })
1898            });
1899
1900        let (tx, rx) = channel(1);
1901        deltas_client
1902            .expect_subscribe()
1903            .return_once(move |_, _| Ok((Uuid::default(), rx)));
1904
1905        // Expect unsubscribe call during cleanup
1906        deltas_client
1907            .expect_unsubscribe()
1908            .return_once(|_| Ok(()));
1909
1910        let mut state_sync = ProtocolStateSynchronizer::new(
1911            ExtractorIdentity::new(Chain::Ethereum, "uniswap-v2"),
1912            true,
1913            ComponentFilter::with_tvl_range(remove_tvl_threshold, add_tvl_threshold),
1914            1,
1915            Duration::from_secs(0),
1916            true,
1917            true,
1918            true,
1919            ArcRPCClient(Arc::new(rpc_client)),
1920            ArcDeltasClient(Arc::new(deltas_client)),
1921            10_u64,
1922        );
1923        state_sync
1924            .initialize()
1925            .await
1926            .expect("Init failed");
1927
1928        // Simulate the incoming BlockChanges
1929        let deltas = [
1930            BlockChanges {
1931                extractor: "uniswap-v2".to_string(),
1932                chain: Chain::Ethereum,
1933                block: Block {
1934                    number: 1,
1935                    hash: Bytes::from("0x01"),
1936                    parent_hash: Bytes::from("0x00"),
1937                    chain: Chain::Ethereum,
1938                    ts: Default::default(),
1939                },
1940                revert: false,
1941                ..Default::default()
1942            },
1943            BlockChanges {
1944                extractor: "uniswap-v2".to_string(),
1945                chain: Chain::Ethereum,
1946                block: Block {
1947                    number: 2,
1948                    hash: Bytes::from("0x02"),
1949                    parent_hash: Bytes::from("0x01"),
1950                    chain: Chain::Ethereum,
1951                    ts: Default::default(),
1952                },
1953                revert: false,
1954                component_tvl: [
1955                    ("Component1".to_string(), 6.0), // Within range, should not trigger changes
1956                    ("Component2".to_string(), 2.0), // Below lower threshold, should be removed
1957                    ("Component3".to_string(), 10.0), // Above upper threshold, should be added
1958                ]
1959                .into_iter()
1960                .collect(),
1961                ..Default::default()
1962            },
1963        ];
1964
1965        let (handle, mut rx) = state_sync.start().await;
1966        let (jh, close_tx) = handle.split();
1967
1968        // Simulate sending delta messages
1969        tx.send(deltas[0].clone())
1970            .await
1971            .expect("deltas channel msg 0 closed!");
1972
1973        // Expecting to receive the initial state message
1974        let _ = timeout(Duration::from_millis(100), rx.recv())
1975            .await
1976            .expect("waiting for first state msg timed out!")
1977            .expect("state sync block sender closed!");
1978
1979        // Send the third message, which should trigger TVL-based changes
1980        tx.send(deltas[1].clone())
1981            .await
1982            .expect("deltas channel msg 1 closed!");
1983        let second_msg = timeout(Duration::from_millis(100), rx.recv())
1984            .await
1985            .expect("waiting for second state msg timed out!")
1986            .expect("state sync block sender closed!")
1987            .expect("no error");
1988
1989        let _ = close_tx.send(());
1990        jh.await
1991            .expect("state sync task panicked!");
1992
1993        let expected_second_msg = StateSyncMessage {
1994            header: BlockHeader {
1995                number: 2,
1996                hash: Bytes::from("0x02"),
1997                parent_hash: Bytes::from("0x01"),
1998                revert: false,
1999                ..Default::default()
2000            },
2001            snapshots: Snapshot {
2002                states: [(
2003                    "Component3".to_string(),
2004                    ComponentWithState {
2005                        state: ResponseProtocolState {
2006                            component_id: "Component3".to_string(),
2007                            ..Default::default()
2008                        },
2009                        component: ProtocolComponent {
2010                            id: "Component3".to_string(),
2011                            ..Default::default()
2012                        },
2013                        component_tvl: Some(10.0),
2014                        entrypoints: vec![], // TODO: add entrypoints?
2015                    },
2016                )]
2017                .into_iter()
2018                .collect(),
2019                vm_storage: HashMap::new(),
2020            },
2021            deltas: Some(BlockChanges {
2022                extractor: "uniswap-v2".to_string(),
2023                chain: Chain::Ethereum,
2024                block: Block {
2025                    number: 2,
2026                    hash: Bytes::from("0x02"),
2027                    parent_hash: Bytes::from("0x01"),
2028                    chain: Chain::Ethereum,
2029                    ts: Default::default(),
2030                },
2031                revert: false,
2032                component_tvl: [
2033                    ("Component1".to_string(), 6.0), // Within range, should not trigger changes
2034                    ("Component3".to_string(), 10.0), // Above upper threshold, should be added
2035                ]
2036                .into_iter()
2037                .collect(),
2038                ..Default::default()
2039            }),
2040            removed_components: [(
2041                "Component2".to_string(),
2042                ProtocolComponent { id: "Component2".to_string(), ..Default::default() },
2043            )]
2044            .into_iter()
2045            .collect(),
2046        };
2047
2048        assert_eq!(second_msg, expected_second_msg);
2049    }
2050
2051    #[test_log::test(tokio::test)]
2052    async fn test_public_close_api_functionality() {
2053        // Tests the public close() API through the StateSynchronizer trait:
2054        // - close() fails before start() is called
2055        // - close() succeeds while synchronizer is running
2056        // - close() fails after already closed
2057        // This tests the full start/close lifecycle via the public API
2058
2059        let mut rpc_client = make_mock_client();
2060        let mut deltas_client = MockDeltasClient::new();
2061
2062        // Mock the initial components call
2063        rpc_client
2064            .expect_get_protocol_components()
2065            .returning(|_| {
2066                Ok(ProtocolComponentRequestResponse {
2067                    protocol_components: vec![],
2068                    pagination: PaginationResponse { page: 0, page_size: 20, total: 0 },
2069                })
2070            });
2071
2072        // Set up deltas client that will wait for messages (blocking in state_sync)
2073        let (_tx, rx) = channel(1);
2074        deltas_client
2075            .expect_subscribe()
2076            .return_once(move |_, _| Ok((Uuid::default(), rx)));
2077
2078        // Expect unsubscribe call during cleanup
2079        deltas_client
2080            .expect_unsubscribe()
2081            .return_once(|_| Ok(()));
2082
2083        let mut state_sync = ProtocolStateSynchronizer::new(
2084            ExtractorIdentity::new(Chain::Ethereum, "test-protocol"),
2085            true,
2086            ComponentFilter::with_tvl_range(0.0, 0.0),
2087            5, // Enough retries
2088            Duration::from_secs(0),
2089            true,
2090            false,
2091            true,
2092            ArcRPCClient(Arc::new(rpc_client)),
2093            ArcDeltasClient(Arc::new(deltas_client)),
2094            10000_u64, // Long timeout so task doesn't exit on its own
2095        );
2096
2097        state_sync
2098            .initialize()
2099            .await
2100            .expect("Init should succeed");
2101
2102        // Start the synchronizer and test the new split-based close mechanism
2103        let (handle, _rx) = state_sync.start().await;
2104        let (jh, close_tx) = handle.split();
2105
2106        // Give it time to start up and enter state_sync
2107        tokio::time::sleep(Duration::from_millis(100)).await;
2108
2109        // Send close signal should succeed
2110        close_tx
2111            .send(())
2112            .expect("Should be able to send close signal");
2113        // Task should stop cleanly
2114        jh.await.expect("Task should not panic");
2115    }
2116
2117    #[test_log::test(tokio::test)]
2118    async fn test_cleanup_runs_when_state_sync_processing_errors() {
2119        // Tests that cleanup code runs when state_sync() errors during delta processing.
2120        // Specifically tests: RPC errors during snapshot retrieval cause proper cleanup.
2121        // Verifies: shared.last_synced_block reset + subscription unsubscribe on errors
2122
2123        let mut rpc_client = make_mock_client();
2124        let mut deltas_client = MockDeltasClient::new();
2125
2126        // Mock the initial components call
2127        rpc_client
2128            .expect_get_protocol_components()
2129            .returning(|_| {
2130                Ok(ProtocolComponentRequestResponse {
2131                    protocol_components: vec![],
2132                    pagination: PaginationResponse { page: 0, page_size: 20, total: 0 },
2133                })
2134            });
2135
2136        // Mock to fail during snapshot retrieval (this will cause an error during processing)
2137        rpc_client
2138            .expect_get_protocol_states()
2139            .returning(|_| {
2140                Err(RPCError::ParseResponse("Test error during snapshot retrieval".to_string()))
2141            });
2142
2143        // Set up deltas client to send one message that will trigger snapshot retrieval
2144        let (tx, rx) = channel(10);
2145        deltas_client
2146            .expect_subscribe()
2147            .return_once(move |_, _| {
2148                // Send a delta message that will require a snapshot
2149                let delta = BlockChanges {
2150                    extractor: "test".to_string(),
2151                    chain: Chain::Ethereum,
2152                    block: Block {
2153                        hash: Bytes::from("0x0123"),
2154                        number: 1,
2155                        parent_hash: Bytes::from("0x0000"),
2156                        chain: Chain::Ethereum,
2157                        ts: chrono::DateTime::from_timestamp(1234567890, 0)
2158                            .unwrap()
2159                            .naive_utc(),
2160                    },
2161                    revert: false,
2162                    // Add a new component to trigger snapshot request
2163                    new_protocol_components: [(
2164                        "new_component".to_string(),
2165                        ProtocolComponent {
2166                            id: "new_component".to_string(),
2167                            protocol_system: "test_protocol".to_string(),
2168                            protocol_type_name: "test".to_string(),
2169                            chain: Chain::Ethereum,
2170                            tokens: vec![Bytes::from("0x0badc0ffee")],
2171                            contract_ids: vec![Bytes::from("0x0badc0ffee")],
2172                            static_attributes: Default::default(),
2173                            creation_tx: Default::default(),
2174                            created_at: Default::default(),
2175                            change: Default::default(),
2176                        },
2177                    )]
2178                    .into_iter()
2179                    .collect(),
2180                    component_tvl: [("new_component".to_string(), 100.0)]
2181                        .into_iter()
2182                        .collect(),
2183                    ..Default::default()
2184                };
2185
2186                tokio::spawn(async move {
2187                    let _ = tx.send(delta).await;
2188                    // Close the channel after sending one message
2189                });
2190
2191                Ok((Uuid::default(), rx))
2192            });
2193
2194        // Expect unsubscribe call during cleanup
2195        deltas_client
2196            .expect_unsubscribe()
2197            .return_once(|_| Ok(()));
2198
2199        let mut state_sync = ProtocolStateSynchronizer::new(
2200            ExtractorIdentity::new(Chain::Ethereum, "test-protocol"),
2201            true,
2202            ComponentFilter::with_tvl_range(0.0, 1000.0), // Include the component
2203            1,
2204            Duration::from_secs(0),
2205            true,
2206            false,
2207            true,
2208            ArcRPCClient(Arc::new(rpc_client)),
2209            ArcDeltasClient(Arc::new(deltas_client)),
2210            5000_u64,
2211        );
2212
2213        state_sync
2214            .initialize()
2215            .await
2216            .expect("Init should succeed");
2217
2218        // Before calling state_sync, set a value in last_synced_block
2219        state_sync.last_synced_block = Some(BlockHeader {
2220            hash: Bytes::from("0x0badc0ffee"),
2221            number: 42,
2222            parent_hash: Bytes::from("0xbadbeef0"),
2223            revert: false,
2224            timestamp: 123456789,
2225            partial_block_index: None,
2226        });
2227
2228        // Create a channel for state_sync to send messages to
2229        let (mut block_tx, _block_rx) = channel(10);
2230
2231        // Call state_sync directly - this should error during processing
2232        let (_end_tx, end_rx) = oneshot::channel::<()>();
2233        let result = state_sync
2234            .state_sync(&mut block_tx, end_rx)
2235            .await;
2236        // Verify that state_sync returned an error
2237        assert!(result.is_err(), "state_sync should have errored during processing");
2238
2239        // Note: We can't verify internal state cleanup since state_sync consumes self,
2240        // but the cleanup logic is still tested by the fact that the method returns properly.
2241    }
2242
2243    #[test_log::test(tokio::test)]
2244    async fn test_close_signal_while_waiting_for_first_deltas() {
2245        // Tests close signal handling during the initial "waiting for deltas" phase.
2246        // This is the earliest possible close scenario - before any deltas are received.
2247        // Verifies: close signal received while waiting for first message triggers cleanup
2248        let mut rpc_client = make_mock_client();
2249        let mut deltas_client = MockDeltasClient::new();
2250
2251        rpc_client
2252            .expect_get_protocol_components()
2253            .returning(|_| {
2254                Ok(ProtocolComponentRequestResponse {
2255                    protocol_components: vec![],
2256                    pagination: PaginationResponse { page: 0, page_size: 20, total: 0 },
2257                })
2258            });
2259
2260        let (_tx, rx) = channel(1);
2261        deltas_client
2262            .expect_subscribe()
2263            .return_once(move |_, _| Ok((Uuid::default(), rx)));
2264
2265        deltas_client
2266            .expect_unsubscribe()
2267            .return_once(|_| Ok(()));
2268
2269        let mut state_sync = ProtocolStateSynchronizer::new(
2270            ExtractorIdentity::new(Chain::Ethereum, "test-protocol"),
2271            true,
2272            ComponentFilter::with_tvl_range(0.0, 0.0),
2273            1,
2274            Duration::from_secs(0),
2275            true,
2276            true,
2277            false,
2278            ArcRPCClient(Arc::new(rpc_client)),
2279            ArcDeltasClient(Arc::new(deltas_client)),
2280            10000_u64,
2281        );
2282
2283        state_sync
2284            .initialize()
2285            .await
2286            .expect("Init should succeed");
2287
2288        let (mut block_tx, _block_rx) = channel(10);
2289        let (end_tx, end_rx) = oneshot::channel::<()>();
2290
2291        // Start state_sync in a task
2292        let state_sync_handle = tokio::spawn(async move {
2293            state_sync
2294                .state_sync(&mut block_tx, end_rx)
2295                .await
2296        });
2297
2298        // Give it a moment to start
2299        tokio::time::sleep(Duration::from_millis(100)).await;
2300
2301        // Send close signal
2302        let _ = end_tx.send(());
2303
2304        // state_sync should exit cleanly
2305        let result = state_sync_handle
2306            .await
2307            .expect("Task should not panic");
2308        assert!(result.is_ok(), "state_sync should exit cleanly when closed: {result:?}");
2309
2310        println!("SUCCESS: Close signal handled correctly while waiting for first deltas");
2311    }
2312
2313    #[test_log::test(tokio::test)]
2314    async fn test_close_signal_during_main_processing_loop() {
2315        // Tests close signal handling during the main delta processing loop.
2316        // This tests the scenario where first message is processed successfully,
2317        // then close signal is received while waiting for subsequent deltas.
2318        // Verifies: close signal in main loop (after initialization) triggers cleanup
2319
2320        let mut rpc_client = make_mock_client();
2321        let mut deltas_client = MockDeltasClient::new();
2322
2323        // Mock the initial components call
2324        rpc_client
2325            .expect_get_protocol_components()
2326            .returning(|_| {
2327                Ok(ProtocolComponentRequestResponse {
2328                    protocol_components: vec![],
2329                    pagination: PaginationResponse { page: 0, page_size: 20, total: 0 },
2330                })
2331            });
2332
2333        // Mock the snapshot retrieval that happens after first message
2334        rpc_client
2335            .expect_get_protocol_states()
2336            .returning(|_| {
2337                Ok(ProtocolStateRequestResponse {
2338                    states: vec![],
2339                    pagination: PaginationResponse { page: 0, page_size: 20, total: 0 },
2340                })
2341            });
2342
2343        rpc_client
2344            .expect_get_component_tvl()
2345            .returning(|_| {
2346                Ok(ComponentTvlRequestResponse {
2347                    tvl: HashMap::new(),
2348                    pagination: PaginationResponse { page: 0, page_size: 20, total: 0 },
2349                })
2350            });
2351
2352        rpc_client
2353            .expect_get_traced_entry_points()
2354            .returning(|_| {
2355                Ok(TracedEntryPointRequestResponse {
2356                    traced_entry_points: HashMap::new(),
2357                    pagination: PaginationResponse::new(0, 20, 0),
2358                })
2359            });
2360
2361        // Set up deltas client to send one message, then keep channel open
2362        let (tx, rx) = channel(10);
2363        deltas_client
2364            .expect_subscribe()
2365            .return_once(move |_, _| {
2366                // Send first message immediately
2367                let first_delta = BlockChanges {
2368                    extractor: "test".to_string(),
2369                    chain: Chain::Ethereum,
2370                    block: Block {
2371                        hash: Bytes::from("0x0123"),
2372                        number: 1,
2373                        parent_hash: Bytes::from("0x0000"),
2374                        chain: Chain::Ethereum,
2375                        ts: chrono::DateTime::from_timestamp(1234567890, 0)
2376                            .unwrap()
2377                            .naive_utc(),
2378                    },
2379                    revert: false,
2380                    ..Default::default()
2381                };
2382
2383                tokio::spawn(async move {
2384                    let _ = tx.send(first_delta).await;
2385                    // Keep the sender alive but don't send more messages
2386                    // This will make the recv() block waiting for the next message
2387                    tokio::time::sleep(Duration::from_secs(30)).await;
2388                });
2389
2390                Ok((Uuid::default(), rx))
2391            });
2392
2393        deltas_client
2394            .expect_unsubscribe()
2395            .return_once(|_| Ok(()));
2396
2397        let mut state_sync = ProtocolStateSynchronizer::new(
2398            ExtractorIdentity::new(Chain::Ethereum, "test-protocol"),
2399            true,
2400            ComponentFilter::with_tvl_range(0.0, 1000.0),
2401            1,
2402            Duration::from_secs(0),
2403            true,
2404            false,
2405            true,
2406            ArcRPCClient(Arc::new(rpc_client)),
2407            ArcDeltasClient(Arc::new(deltas_client)),
2408            10000_u64,
2409        );
2410
2411        state_sync
2412            .initialize()
2413            .await
2414            .expect("Init should succeed");
2415
2416        let (mut block_tx, mut block_rx) = channel(10);
2417        let (end_tx, end_rx) = oneshot::channel::<()>();
2418
2419        // Start state_sync in a task
2420        let state_sync_handle = tokio::spawn(async move {
2421            state_sync
2422                .state_sync(&mut block_tx, end_rx)
2423                .await
2424        });
2425
2426        // Wait for the first message to be processed (snapshot sent)
2427        let first_snapshot = block_rx
2428            .recv()
2429            .await
2430            .expect("Should receive first snapshot")
2431            .expect("Synchronizer error");
2432        assert!(
2433            !first_snapshot
2434                .snapshots
2435                .states
2436                .is_empty() ||
2437                first_snapshot.deltas.is_some()
2438        );
2439        // Now send close signal - this should be handled in the main processing loop
2440        let _ = end_tx.send(());
2441
2442        // state_sync should exit cleanly after receiving close signal in main loop
2443        let result = state_sync_handle
2444            .await
2445            .expect("Task should not panic");
2446        assert!(
2447            result.is_ok(),
2448            "state_sync should exit cleanly when closed after first message: {result:?}"
2449        );
2450    }
2451
2452    #[test_log::test(tokio::test)]
2453    async fn test_max_retries_exceeded_error_propagation() {
2454        // Test that when max_retries is exceeded, the final error is sent through the channel
2455        // to the receiver and the synchronizer task exits cleanly
2456
2457        let mut rpc_client = make_mock_client();
2458        let mut deltas_client = MockDeltasClient::new();
2459
2460        // Mock the initial components call to succeed
2461        rpc_client
2462            .expect_get_protocol_components()
2463            .returning(|_| {
2464                Ok(ProtocolComponentRequestResponse {
2465                    protocol_components: vec![],
2466                    pagination: PaginationResponse { page: 0, page_size: 20, total: 0 },
2467                })
2468            });
2469
2470        // Set up deltas client to consistently fail after subscription
2471        // This will cause connection errors and trigger retries
2472        deltas_client
2473            .expect_subscribe()
2474            .returning(|_, _| {
2475                // Return a connection error to trigger retries
2476                Err(DeltasError::NotConnected)
2477            });
2478
2479        // Expect multiple unsubscribe calls during retries
2480        deltas_client
2481            .expect_unsubscribe()
2482            .returning(|_| Ok(()))
2483            .times(0..=5);
2484
2485        // Create synchronizer with only 2 retries and short cooldown
2486        let mut state_sync = ProtocolStateSynchronizer::new(
2487            ExtractorIdentity::new(Chain::Ethereum, "test-protocol"),
2488            true,
2489            ComponentFilter::with_tvl_range(0.0, 1000.0),
2490            2,                         // max_retries = 2
2491            Duration::from_millis(10), // short retry cooldown
2492            true,
2493            false,
2494            true,
2495            ArcRPCClient(Arc::new(rpc_client)),
2496            ArcDeltasClient(Arc::new(deltas_client)),
2497            1000_u64,
2498        );
2499
2500        state_sync
2501            .initialize()
2502            .await
2503            .expect("Init should succeed");
2504
2505        // Start the synchronizer - it should fail to subscribe and retry
2506        let (handle, mut rx) = state_sync.start().await;
2507        let (jh, _close_tx) = handle.split();
2508
2509        let res = tokio::time::timeout(Duration::from_millis(100), rx.recv())
2510            .await
2511            .expect("responsds in time")
2512            .expect("channel open");
2513
2514        // Verify the error is a ConnectionClosed error (converted from DeltasError::NotConnected)
2515        if let Err(err) = res {
2516            assert!(
2517                matches!(err, SynchronizerError::ConnectionClosed),
2518                "Expected ConnectionClosed error, got: {:?}",
2519                err
2520            );
2521        } else {
2522            panic!("Expected an error")
2523        }
2524
2525        // The task should complete (not hang) after max retries
2526        let task_result = tokio::time::timeout(Duration::from_secs(2), jh).await;
2527        assert!(task_result.is_ok(), "Synchronizer task should complete after max retries");
2528    }
2529
2530    #[test_log::test(tokio::test)]
2531    async fn test_is_next_expected() {
2532        // Test the is_next_expected function to ensure it correctly identifies
2533        // when an incoming block is the expected next block in the chain
2534
2535        let mut state_sync = with_mocked_clients(true, false, None, None);
2536
2537        // Test 1: No previous block - should return false
2538        let incoming_header = BlockHeader {
2539            number: 100,
2540            hash: Bytes::from("0x0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"),
2541            parent_hash: Bytes::from(
2542                "0x0000000000000000000000000000000000000000000000000000000000000000",
2543            ),
2544            revert: false,
2545            timestamp: 123456789,
2546            partial_block_index: None,
2547        };
2548        assert!(
2549            !state_sync.is_next_expected(&incoming_header),
2550            "Should return false when no previous block is set"
2551        );
2552
2553        // Test 2: Set a previous block and test with matching parent hash
2554        let previous_header = BlockHeader {
2555            number: 99,
2556            hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000000"),
2557            parent_hash: Bytes::from(
2558                "0xabcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789",
2559            ),
2560            revert: false,
2561            timestamp: 123456788,
2562            partial_block_index: None,
2563        };
2564        state_sync.last_synced_block = Some(previous_header.clone());
2565
2566        assert!(
2567            state_sync.is_next_expected(&incoming_header),
2568            "Should return true when incoming parent_hash matches previous hash"
2569        );
2570
2571        // Test 3: Test with non-matching parent hash
2572        let non_matching_header = BlockHeader {
2573            number: 100,
2574            hash: Bytes::from("0x0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"),
2575            parent_hash: Bytes::from(
2576                "0x1111111111111111111111111111111111111111111111111111111111111111",
2577            ), // Wrong parent hash
2578            revert: false,
2579            timestamp: 123456789,
2580            partial_block_index: None,
2581        };
2582        assert!(
2583            !state_sync.is_next_expected(&non_matching_header),
2584            "Should return false when incoming parent_hash doesn't match previous hash"
2585        );
2586    }
2587
2588    #[test_log::test(tokio::test)]
2589    async fn test_synchronizer_restart_skip_snapshot_on_expected_block() {
2590        // Test that on synchronizer restart with the next expected block,
2591        // get_snapshot is not called and only deltas are sent
2592
2593        let mut rpc_client = make_mock_client();
2594        let mut deltas_client = MockDeltasClient::new();
2595
2596        // Mock the initial components call
2597        rpc_client
2598            .expect_get_protocol_components()
2599            .returning(|_| {
2600                Ok(ProtocolComponentRequestResponse {
2601                    protocol_components: vec![ProtocolComponent {
2602                        id: "Component1".to_string(),
2603                        ..Default::default()
2604                    }],
2605                    pagination: PaginationResponse { page: 0, page_size: 20, total: 1 },
2606                })
2607            });
2608
2609        // Set up deltas client to send a message that is the next expected block
2610        let (tx, rx) = channel(10);
2611        deltas_client
2612            .expect_subscribe()
2613            .return_once(move |_, _| {
2614                let expected_next_delta = BlockChanges {
2615                    extractor: "uniswap-v2".to_string(),
2616                    chain: Chain::Ethereum,
2617                    block: Block {
2618                        hash: Bytes::from(
2619                            "0x0000000000000000000000000000000000000000000000000000000000000002",
2620                        ), // This will be the next expected block
2621                        number: 2,
2622                        parent_hash: Bytes::from(
2623                            "0x0000000000000000000000000000000000000000000000000000000000000001",
2624                        ), // This matches our last synced block hash
2625                        chain: Chain::Ethereum,
2626                        ts: chrono::DateTime::from_timestamp(1234567890, 0)
2627                            .unwrap()
2628                            .naive_utc(),
2629                    },
2630                    revert: false,
2631                    ..Default::default()
2632                };
2633
2634                tokio::spawn(async move {
2635                    let _ = tx.send(expected_next_delta).await;
2636                });
2637
2638                Ok((Uuid::default(), rx))
2639            });
2640
2641        deltas_client
2642            .expect_unsubscribe()
2643            .return_once(|_| Ok(()));
2644
2645        let mut state_sync = ProtocolStateSynchronizer::new(
2646            ExtractorIdentity::new(Chain::Ethereum, "uniswap-v2"),
2647            true,
2648            ComponentFilter::with_tvl_range(0.0, 1000.0),
2649            1,
2650            Duration::from_secs(0),
2651            true, // include_snapshots = true
2652            false,
2653            true,
2654            ArcRPCClient(Arc::new(rpc_client)),
2655            ArcDeltasClient(Arc::new(deltas_client)),
2656            10000_u64,
2657        );
2658
2659        // Initialize and set up the last synced block to simulate a restart scenario
2660        state_sync
2661            .initialize()
2662            .await
2663            .expect("Init should succeed");
2664
2665        // Set last_synced_block to simulate that we've previously synced block 1
2666        state_sync.last_synced_block = Some(BlockHeader {
2667            number: 1,
2668            hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000001"), /* This matches the parent_hash in our delta */
2669            parent_hash: Bytes::from(
2670                "0x0000000000000000000000000000000000000000000000000000000000000000",
2671            ),
2672            revert: false,
2673            timestamp: 123456789,
2674            partial_block_index: None,
2675        });
2676
2677        let (mut block_tx, mut block_rx) = channel(10);
2678        let (end_tx, end_rx) = oneshot::channel::<()>();
2679
2680        // Start state_sync
2681        let state_sync_handle = tokio::spawn(async move {
2682            state_sync
2683                .state_sync(&mut block_tx, end_rx)
2684                .await
2685        });
2686
2687        // Wait for the message - it should be a delta-only message (no snapshots)
2688        let result_msg = timeout(Duration::from_millis(200), block_rx.recv())
2689            .await
2690            .expect("Should receive message within timeout")
2691            .expect("Channel should be open")
2692            .expect("Should not be an error");
2693
2694        // Send close signal
2695        let _ = end_tx.send(());
2696
2697        // Wait for state_sync to finish
2698        let _ = state_sync_handle
2699            .await
2700            .expect("Task should not panic");
2701
2702        // Verify the message contains deltas but no snapshots
2703        // (because we skipped snapshot retrieval)
2704        assert!(result_msg.deltas.is_some(), "Should contain deltas");
2705        assert!(
2706            result_msg.snapshots.states.is_empty(),
2707            "Should not contain snapshots when next expected block is received"
2708        );
2709
2710        // Verify the block details match our expected next block
2711        if let Some(deltas) = &result_msg.deltas {
2712            assert_eq!(deltas.block.number, 2);
2713            assert_eq!(
2714                deltas.block.hash,
2715                Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000002")
2716            );
2717            assert_eq!(
2718                deltas.block.parent_hash,
2719                Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000001")
2720            );
2721        }
2722    }
2723
2724    #[test_log::test(tokio::test)]
2725    async fn test_skip_previously_processed_messages() {
2726        // Test that the synchronizer skips messages for blocks that have already been processed
2727        // This simulates a service restart scenario where old messages are re-emitted
2728
2729        let mut rpc_client = make_mock_client();
2730        let mut deltas_client = MockDeltasClient::new();
2731
2732        // Mock the initial components call
2733        rpc_client
2734            .expect_get_protocol_components()
2735            .returning(|_| {
2736                Ok(ProtocolComponentRequestResponse {
2737                    protocol_components: vec![ProtocolComponent {
2738                        id: "Component1".to_string(),
2739                        ..Default::default()
2740                    }],
2741                    pagination: PaginationResponse { page: 0, page_size: 20, total: 1 },
2742                })
2743            });
2744
2745        // Mock snapshot calls for when we process the expected next block (block 6)
2746        rpc_client
2747            .expect_get_protocol_states()
2748            .returning(|_| {
2749                Ok(ProtocolStateRequestResponse {
2750                    states: vec![ResponseProtocolState {
2751                        component_id: "Component1".to_string(),
2752                        ..Default::default()
2753                    }],
2754                    pagination: PaginationResponse { page: 0, page_size: 20, total: 1 },
2755                })
2756            });
2757
2758        rpc_client
2759            .expect_get_component_tvl()
2760            .returning(|_| {
2761                Ok(ComponentTvlRequestResponse {
2762                    tvl: [("Component1".to_string(), 100.0)]
2763                        .into_iter()
2764                        .collect(),
2765                    pagination: PaginationResponse { page: 0, page_size: 20, total: 1 },
2766                })
2767            });
2768
2769        rpc_client
2770            .expect_get_traced_entry_points()
2771            .returning(|_| {
2772                Ok(TracedEntryPointRequestResponse {
2773                    traced_entry_points: HashMap::new(),
2774                    pagination: PaginationResponse::new(0, 20, 0),
2775                })
2776            });
2777
2778        // Set up deltas client to send old messages first, then the expected next block
2779        let (tx, rx) = channel(10);
2780        deltas_client
2781            .expect_subscribe()
2782            .return_once(move |_, _| {
2783                // Send messages for blocks 3, 4, 5 (already processed), then block 6 (expected)
2784                let old_messages = vec![
2785                    BlockChanges {
2786                        extractor: "uniswap-v2".to_string(),
2787                        chain: Chain::Ethereum,
2788                        block: Block {
2789                            hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000003"),
2790                            number: 3,
2791                            parent_hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000002"),
2792                            chain: Chain::Ethereum,
2793                            ts: chrono::DateTime::from_timestamp(1234567890, 0).unwrap().naive_utc(),
2794                        },
2795                        revert: false,
2796                        ..Default::default()
2797                    },
2798                    BlockChanges {
2799                        extractor: "uniswap-v2".to_string(),
2800                        chain: Chain::Ethereum,
2801                        block: Block {
2802                            hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000004"),
2803                            number: 4,
2804                            parent_hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000003"),
2805                            chain: Chain::Ethereum,
2806                            ts: chrono::DateTime::from_timestamp(1234567891, 0).unwrap().naive_utc(),
2807                        },
2808                        revert: false,
2809                        ..Default::default()
2810                    },
2811                    BlockChanges {
2812                        extractor: "uniswap-v2".to_string(),
2813                        chain: Chain::Ethereum,
2814                        block: Block {
2815                            hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000005"),
2816                            number: 5,
2817                            parent_hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000004"),
2818                            chain: Chain::Ethereum,
2819                            ts: chrono::DateTime::from_timestamp(1234567892, 0).unwrap().naive_utc(),
2820                        },
2821                        revert: false,
2822                        ..Default::default()
2823                    },
2824                    // This is the expected next block (block 6)
2825                    BlockChanges {
2826                        extractor: "uniswap-v2".to_string(),
2827                        chain: Chain::Ethereum,
2828                        block: Block {
2829                            hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000006"),
2830                            number: 6,
2831                            parent_hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000005"),
2832                            chain: Chain::Ethereum,
2833                            ts: chrono::DateTime::from_timestamp(1234567893, 0).unwrap().naive_utc(),
2834                        },
2835                        revert: false,
2836                        ..Default::default()
2837                    },
2838                ];
2839
2840                tokio::spawn(async move {
2841                    for message in old_messages {
2842                        let _ = tx.send(message).await;
2843                        tokio::time::sleep(Duration::from_millis(10)).await;
2844                    }
2845                });
2846
2847                Ok((Uuid::default(), rx))
2848            });
2849
2850        deltas_client
2851            .expect_unsubscribe()
2852            .return_once(|_| Ok(()));
2853
2854        let mut state_sync = ProtocolStateSynchronizer::new(
2855            ExtractorIdentity::new(Chain::Ethereum, "uniswap-v2"),
2856            true,
2857            ComponentFilter::with_tvl_range(0.0, 1000.0),
2858            1,
2859            Duration::from_secs(0),
2860            true,
2861            true,
2862            true,
2863            ArcRPCClient(Arc::new(rpc_client)),
2864            ArcDeltasClient(Arc::new(deltas_client)),
2865            10000_u64,
2866        );
2867
2868        // Initialize and set last_synced_block to simulate we've already processed block 5
2869        state_sync
2870            .initialize()
2871            .await
2872            .expect("Init should succeed");
2873
2874        state_sync.last_synced_block = Some(BlockHeader {
2875            number: 5,
2876            hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000005"),
2877            parent_hash: Bytes::from(
2878                "0x0000000000000000000000000000000000000000000000000000000000000004",
2879            ),
2880            revert: false,
2881            timestamp: 1234567892,
2882            partial_block_index: None,
2883        });
2884
2885        let (mut block_tx, mut block_rx) = channel(10);
2886        let (end_tx, end_rx) = oneshot::channel::<()>();
2887
2888        // Start state_sync
2889        let state_sync_handle = tokio::spawn(async move {
2890            state_sync
2891                .state_sync(&mut block_tx, end_rx)
2892                .await
2893        });
2894
2895        // Wait for the message - it should only be for block 6 (skipping blocks 3, 4, 5)
2896        let result_msg = timeout(Duration::from_millis(500), block_rx.recv())
2897            .await
2898            .expect("Should receive message within timeout")
2899            .expect("Channel should be open")
2900            .expect("Should not be an error");
2901
2902        // Send close signal
2903        let _ = end_tx.send(());
2904
2905        // Wait for state_sync to finish
2906        let _ = state_sync_handle
2907            .await
2908            .expect("Task should not panic");
2909
2910        // Verify we only got the message for block 6 (the expected next block)
2911        assert!(result_msg.deltas.is_some(), "Should contain deltas");
2912        if let Some(deltas) = &result_msg.deltas {
2913            assert_eq!(
2914                deltas.block.number, 6,
2915                "Should only process block 6, skipping earlier blocks"
2916            );
2917            assert_eq!(
2918                deltas.block.hash,
2919                Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000006")
2920            );
2921        }
2922
2923        // Verify that no additional messages are received immediately
2924        // (since the old blocks 3, 4, 5 were skipped and only block 6 was processed)
2925        match timeout(Duration::from_millis(50), block_rx.recv()).await {
2926            Err(_) => {
2927                // Timeout is expected - no more messages should come
2928            }
2929            Ok(Some(Err(_))) => {
2930                // Error received is also acceptable (connection closed)
2931            }
2932            Ok(Some(Ok(_))) => {
2933                panic!("Should not receive additional messages - old blocks should be skipped");
2934            }
2935            Ok(None) => {
2936                // Channel closed is also acceptable
2937            }
2938        }
2939    }
2940
2941    fn make_block_changes(block_num: u64, partial_idx: Option<u32>) -> BlockChanges {
2942        // Use vec to create Bytes from block number
2943        let hash = Bytes::from(vec![block_num as u8; 32]);
2944        let parent_hash = Bytes::from(vec![block_num.saturating_sub(1) as u8; 32]);
2945        BlockChanges {
2946            extractor: "uniswap-v2".to_string(),
2947            chain: Chain::Ethereum,
2948            block: Block {
2949                number: block_num,
2950                hash,
2951                parent_hash,
2952                chain: Chain::Ethereum,
2953                ts: Default::default(),
2954            },
2955            revert: false,
2956            partial_block_index: partial_idx,
2957            ..Default::default()
2958        }
2959    }
2960
2961    /// Test that full block as first message in partial mode is accepted
2962    #[test_log::test(tokio::test)]
2963    async fn test_partial_mode_accepts_full_block_as_first_message() {
2964        let (rpc_client, deltas_client, tx) = mock_clients_for_state_sync();
2965        let mut state_sync = with_mocked_clients(true, true, Some(rpc_client), Some(deltas_client))
2966            .with_partial_blocks(true);
2967        state_sync
2968            .initialize()
2969            .await
2970            .expect("Init failed");
2971
2972        let (handle, mut block_rx) = state_sync.start().await;
2973        let (jh, close_tx) = handle.split();
2974
2975        // Send full block as first message - should be accepted
2976        tx.send(make_block_changes(1, None))
2977            .await
2978            .unwrap();
2979
2980        // Should receive the full block immediately
2981        let msg = timeout(Duration::from_millis(100), block_rx.recv())
2982            .await
2983            .expect("Should receive message")
2984            .expect("Channel open")
2985            .expect("No error");
2986
2987        assert_eq!(msg.header.number, 1, "Should use block 1 (full block)");
2988        assert_eq!(msg.header.partial_block_index, None, "Should be a full block");
2989
2990        let _ = close_tx.send(());
2991        jh.await.expect("Task should not panic");
2992    }
2993
2994    /// Test that block number increase is detected as new block
2995    #[test_log::test(tokio::test)]
2996    async fn test_partial_mode_detects_block_number_increase() {
2997        let (rpc_client, deltas_client, tx) = mock_clients_for_state_sync();
2998        let mut state_sync = with_mocked_clients(true, true, Some(rpc_client), Some(deltas_client))
2999            .with_partial_blocks(true);
3000        state_sync
3001            .initialize()
3002            .await
3003            .expect("Init failed");
3004
3005        let (handle, mut block_rx) = state_sync.start().await;
3006        let (jh, close_tx) = handle.split();
3007
3008        // Send partial messages for block 1 (will be skipped - waiting for new block)
3009        tx.send(make_block_changes(1, Some(0)))
3010            .await
3011            .unwrap();
3012        tx.send(make_block_changes(1, Some(3)))
3013            .await
3014            .unwrap();
3015
3016        // Verify no message received yet
3017        match timeout(Duration::from_millis(50), block_rx.recv()).await {
3018            Err(_) => { /* Expected: timeout, no message yet */ }
3019            Ok(_) => panic!("Should not receive message while waiting for new block"),
3020        }
3021
3022        // Send partial for block 2 with HIGHER index (5 > 3) - should still be detected
3023        // because block number increased
3024        tx.send(make_block_changes(2, Some(5)))
3025            .await
3026            .unwrap();
3027
3028        // Should receive the message for block 2
3029        let msg = timeout(Duration::from_millis(100), block_rx.recv())
3030            .await
3031            .expect("Should receive message")
3032            .expect("Channel open")
3033            .expect("No error");
3034
3035        assert_eq!(msg.header.number, 2, "Should use block 2 (block number increased)");
3036        assert_eq!(msg.header.partial_block_index, Some(5));
3037
3038        let _ = close_tx.send(());
3039        jh.await.expect("Task should not panic");
3040    }
3041
3042    /// Test that partial mode skips new blocks that are already synced
3043    #[test_log::test(tokio::test)]
3044    async fn test_partial_mode_skips_already_synced_blocks() {
3045        let (rpc_client, deltas_client, tx) = mock_clients_for_state_sync();
3046        let mut state_sync = with_mocked_clients(true, true, Some(rpc_client), Some(deltas_client))
3047            .with_partial_blocks(true);
3048        state_sync
3049            .initialize()
3050            .await
3051            .expect("Init failed");
3052
3053        // Set last_synced_block to block 5 - we've already synced up to here
3054        state_sync.last_synced_block = Some(BlockHeader {
3055            number: 5,
3056            hash: Bytes::from("0x05"),
3057            parent_hash: Bytes::from("0x04"),
3058            revert: false,
3059            timestamp: 0,
3060            partial_block_index: None,
3061        });
3062
3063        let (handle, mut block_rx) = state_sync.start().await;
3064        let (jh, close_tx) = handle.split();
3065
3066        // Send partial for block 3 to establish baseline
3067        tx.send(make_block_changes(3, Some(2)))
3068            .await
3069            .unwrap();
3070
3071        // Send "new block" for block 4 (partial index decreased) - but block 4 < last_synced (5)
3072        tx.send(make_block_changes(4, Some(0)))
3073            .await
3074            .unwrap();
3075
3076        // Should be skipped because block 4 is already synced
3077        match timeout(Duration::from_millis(50), block_rx.recv()).await {
3078            Err(_) => { /* Expected: skipped because already synced */ }
3079            Ok(_) => panic!("Should skip block 4 because it's already synced"),
3080        }
3081
3082        // Now send new block for block 6 (after last_synced)
3083        // First establish new partial index
3084        tx.send(make_block_changes(5, Some(3)))
3085            .await
3086            .unwrap();
3087        // Then trigger new block detection
3088        tx.send(make_block_changes(6, Some(0)))
3089            .await
3090            .unwrap();
3091
3092        let msg = timeout(Duration::from_millis(100), block_rx.recv())
3093            .await
3094            .expect("Should receive message")
3095            .expect("Channel open")
3096            .expect("No error");
3097
3098        assert_eq!(msg.header.number, 6, "Should use block 6 (after last synced)");
3099
3100        let _ = close_tx.send(());
3101        jh.await.expect("Task should not panic");
3102    }
3103}