tycho_client/feed/
synchronizer.rs

1use std::{collections::HashMap, time::Duration};
2
3use async_trait::async_trait;
4use serde::{Deserialize, Serialize};
5use thiserror::Error;
6use tokio::{
7    select,
8    sync::{
9        mpsc::{channel, error::SendError, Receiver, Sender},
10        oneshot,
11    },
12    task::JoinHandle,
13    time::{sleep, timeout},
14};
15use tracing::{debug, error, info, instrument, trace, warn};
16use tycho_common::{
17    dto::{
18        BlockChanges, Chain, EntryPointWithTracingParams, ExtractorIdentity, ProtocolComponent,
19        ResponseAccount, ResponseProtocolState, TracingResult,
20    },
21    Bytes,
22};
23
24use crate::{
25    deltas::{DeltasClient, SubscriptionOptions},
26    feed::{
27        component_tracker::{ComponentFilter, ComponentTracker},
28        BlockHeader, HeaderLike,
29    },
30    rpc::{RPCClient, RPCError, SnapshotParameters},
31    DeltasError,
32};
33
34#[derive(Error, Debug)]
35pub enum SynchronizerError {
36    /// RPC client failures.
37    #[error("RPC error: {0}")]
38    RPCError(#[from] RPCError),
39
40    /// Issues with the main channel
41    #[error("{0}")]
42    ChannelError(String),
43
44    /// Timeout elapsed errors.
45    #[error("Timeout error: {0}")]
46    Timeout(String),
47
48    /// Failed to close the synchronizer.
49    #[error("Failed to close synchronizer: {0}")]
50    CloseError(String),
51
52    /// Server connection failures or interruptions.
53    #[error("Connection error: {0}")]
54    ConnectionError(String),
55
56    /// Connection closed
57    #[error("Connection closed")]
58    ConnectionClosed,
59}
60
61pub type SyncResult<T> = Result<T, SynchronizerError>;
62
63impl<T> From<SendError<T>> for SynchronizerError {
64    fn from(err: SendError<T>) -> Self {
65        SynchronizerError::ChannelError(format!("Failed to send message: {err}"))
66    }
67}
68
69impl From<DeltasError> for SynchronizerError {
70    fn from(err: DeltasError) -> Self {
71        match err {
72            DeltasError::NotConnected => SynchronizerError::ConnectionClosed,
73            _ => SynchronizerError::ConnectionError(err.to_string()),
74        }
75    }
76}
77
78pub struct ProtocolStateSynchronizer<R: RPCClient, D: DeltasClient> {
79    extractor_id: ExtractorIdentity,
80    retrieve_balances: bool,
81    rpc_client: R,
82    deltas_client: D,
83    max_retries: u64,
84    retry_cooldown: Duration,
85    include_snapshots: bool,
86    component_tracker: ComponentTracker<R>,
87    last_synced_block: Option<BlockHeader>,
88    timeout: u64,
89    include_tvl: bool,
90}
91
92#[derive(Clone, PartialEq, Debug, Serialize, Deserialize)]
93pub struct ComponentWithState {
94    pub state: ResponseProtocolState,
95    pub component: ProtocolComponent,
96    pub component_tvl: Option<f64>,
97    pub entrypoints: Vec<(EntryPointWithTracingParams, TracingResult)>,
98}
99
100#[derive(Clone, PartialEq, Debug, Default, Serialize, Deserialize)]
101pub struct Snapshot {
102    pub states: HashMap<String, ComponentWithState>,
103    pub vm_storage: HashMap<Bytes, ResponseAccount>,
104}
105
106impl Snapshot {
107    fn extend(&mut self, other: Snapshot) {
108        self.states.extend(other.states);
109        self.vm_storage.extend(other.vm_storage);
110    }
111
112    pub fn get_states(&self) -> &HashMap<String, ComponentWithState> {
113        &self.states
114    }
115
116    pub fn get_vm_storage(&self) -> &HashMap<Bytes, ResponseAccount> {
117        &self.vm_storage
118    }
119}
120
121#[derive(Clone, PartialEq, Debug, Default, Serialize, Deserialize)]
122pub struct StateSyncMessage<H>
123where
124    H: HeaderLike,
125{
126    /// The block information for this update.
127    pub header: H,
128    /// Snapshot for new components.
129    pub snapshots: Snapshot,
130    /// A single delta contains state updates for all tracked components, as well as additional
131    /// information about the system components e.g. newly added components (even below tvl), tvl
132    /// updates, balance updates.
133    pub deltas: Option<BlockChanges>,
134    /// Components that stopped being tracked.
135    pub removed_components: HashMap<String, ProtocolComponent>,
136}
137
138impl<H> StateSyncMessage<H>
139where
140    H: HeaderLike,
141{
142    pub fn merge(mut self, other: Self) -> Self {
143        // be careful with removed and snapshots attributes here, these can be ambiguous.
144        self.removed_components
145            .retain(|k, _| !other.snapshots.states.contains_key(k));
146        self.snapshots
147            .states
148            .retain(|k, _| !other.removed_components.contains_key(k));
149
150        self.snapshots.extend(other.snapshots);
151        let deltas = match (self.deltas, other.deltas) {
152            (Some(l), Some(r)) => Some(l.merge(r)),
153            (None, Some(r)) => Some(r),
154            (Some(l), None) => Some(l),
155            (None, None) => None,
156        };
157        self.removed_components
158            .extend(other.removed_components);
159        Self {
160            header: other.header,
161            snapshots: self.snapshots,
162            deltas,
163            removed_components: self.removed_components,
164        }
165    }
166}
167
168/// Handle for controlling a running synchronizer task.
169///
170/// This handle provides methods to gracefully shut down the synchronizer
171/// and await its completion with a timeout.
172pub struct SynchronizerTaskHandle {
173    join_handle: JoinHandle<()>,
174    close_tx: oneshot::Sender<()>,
175}
176
177/// StateSynchronizer
178///
179/// Used to synchronize the state of a single protocol. The synchronizer is responsible for
180/// delivering messages to the client that let him reconstruct subsets of the protocol state.
181///
182/// This involves deciding which components to track according to the clients preferences,
183/// retrieving & emitting snapshots of components which the client has not seen yet and subsequently
184/// delivering delta messages for the components that have changed.
185impl SynchronizerTaskHandle {
186    pub fn new(join_handle: JoinHandle<()>, close_tx: oneshot::Sender<()>) -> Self {
187        Self { join_handle, close_tx }
188    }
189
190    /// Splits the handle into its join handle and close sender.
191    ///
192    /// This allows monitoring the task completion separately from controlling shutdown.
193    /// The join handle can be used with FuturesUnordered for monitoring, while the
194    /// close sender can be used to signal graceful shutdown.
195    pub fn split(self) -> (JoinHandle<()>, oneshot::Sender<()>) {
196        (self.join_handle, self.close_tx)
197    }
198}
199
200#[async_trait]
201pub trait StateSynchronizer: Send + Sync + 'static {
202    async fn initialize(&mut self) -> SyncResult<()>;
203    /// Starts the state synchronization, consuming the synchronizer.
204    /// Returns a handle for controlling the running task and a receiver for messages.
205    async fn start(
206        mut self,
207    ) -> (SynchronizerTaskHandle, Receiver<SyncResult<StateSyncMessage<BlockHeader>>>);
208}
209
210impl<R, D> ProtocolStateSynchronizer<R, D>
211where
212    // TODO: Consider moving these constraints directly to the
213    // client...
214    R: RPCClient + Clone + Send + Sync + 'static,
215    D: DeltasClient + Clone + Send + Sync + 'static,
216{
217    /// Creates a new state synchronizer.
218    #[allow(clippy::too_many_arguments)]
219    pub fn new(
220        extractor_id: ExtractorIdentity,
221        retrieve_balances: bool,
222        component_filter: ComponentFilter,
223        max_retries: u64,
224        retry_cooldown: Duration,
225        include_snapshots: bool,
226        include_tvl: bool,
227        rpc_client: R,
228        deltas_client: D,
229        timeout: u64,
230    ) -> Self {
231        Self {
232            extractor_id: extractor_id.clone(),
233            retrieve_balances,
234            rpc_client: rpc_client.clone(),
235            include_snapshots,
236            deltas_client,
237            component_tracker: ComponentTracker::new(
238                extractor_id.chain,
239                extractor_id.name.as_str(),
240                component_filter,
241                rpc_client,
242            ),
243            max_retries,
244            retry_cooldown,
245            last_synced_block: None,
246            timeout,
247            include_tvl,
248        }
249    }
250
251    /// Retrieves state snapshots of the requested components
252    async fn get_snapshots<'a, I: IntoIterator<Item = &'a String>>(
253        &mut self,
254        header: BlockHeader,
255        ids: Option<I>,
256    ) -> SyncResult<StateSyncMessage<BlockHeader>> {
257        if !self.include_snapshots {
258            return Ok(StateSyncMessage { header, ..Default::default() });
259        }
260
261        // Use given ids or use all if not passed
262        let component_ids: Vec<_> = match ids {
263            Some(ids) => ids.into_iter().cloned().collect(),
264            None => self
265                .component_tracker
266                .get_tracked_component_ids(),
267        };
268
269        if component_ids.is_empty() {
270            return Ok(StateSyncMessage { header, ..Default::default() });
271        }
272
273        //TODO: Improve this, we should not query for every component, but only for the ones that
274        // could have entrypoints. Maybe apply a filter per protocol?
275        let entrypoints_result = if self.extractor_id.chain == Chain::Ethereum {
276            let result = self
277                .rpc_client
278                .get_traced_entry_points_paginated(
279                    self.extractor_id.chain,
280                    &self.extractor_id.name,
281                    &component_ids,
282                    100,
283                    4,
284                )
285                .await?;
286            self.component_tracker
287                .process_entrypoints(&result.clone().into());
288            result.traced_entry_points.clone()
289        } else {
290            HashMap::new()
291        };
292
293        // Get contract IDs from component tracker
294        let contract_ids: Vec<Bytes> = self
295            .component_tracker
296            .get_contracts_by_component(&component_ids)
297            .into_iter()
298            .collect();
299
300        let request = SnapshotParameters::new(
301            self.extractor_id.chain,
302            &self.extractor_id.name,
303            &self.component_tracker.components,
304            &contract_ids,
305            header.number,
306        )
307        .entrypoints(&entrypoints_result)
308        .include_balances(self.retrieve_balances)
309        .include_tvl(self.include_tvl);
310        let snapshot_response = self
311            .rpc_client
312            .get_snapshots(&request, 100, 4)
313            .await?;
314
315        trace!(states=?&snapshot_response.states, "Retrieved ProtocolStates");
316        trace!(contract_states=?&snapshot_response.vm_storage, "Retrieved ContractState");
317
318        Ok(StateSyncMessage {
319            header,
320            snapshots: snapshot_response,
321            deltas: None,
322            removed_components: HashMap::new(),
323        })
324    }
325
326    /// Main method that does all the work.
327    ///
328    /// ## Return Value
329    ///
330    /// Returns a `Result` where:
331    /// - `Ok(())` - Synchronization completed successfully (usually due to close signal)
332    /// - `Err((error, None))` - Error occurred AND close signal was received (don't retry)
333    /// - `Err((error, Some(end_rx)))` - Error occurred but close signal was NOT received (can
334    ///   retry)
335    ///
336    /// The returned `end_rx` (if any) should be reused for retry attempts since the close
337    /// signal may still arrive and we want to remain cancellable across retries.
338    #[instrument(skip(self, block_tx, end_rx), fields(extractor_id = %self.extractor_id))]
339    async fn state_sync(
340        &mut self,
341        block_tx: &mut Sender<SyncResult<StateSyncMessage<BlockHeader>>>,
342        mut end_rx: oneshot::Receiver<()>,
343    ) -> Result<(), (SynchronizerError, Option<oneshot::Receiver<()>>)> {
344        // initialisation
345        let subscription_options = SubscriptionOptions::new().with_state(self.include_snapshots);
346        let (subscription_id, mut msg_rx) = match self
347            .deltas_client
348            .subscribe(self.extractor_id.clone(), subscription_options)
349            .await
350        {
351            Ok(result) => result,
352            Err(e) => return Err((e.into(), Some(end_rx))),
353        };
354
355        let result = async {
356            info!("Waiting for deltas...");
357            let mut warned = false;
358            let mut first_msg = loop {
359                let msg = select! {
360                    deltas_result = timeout(Duration::from_secs(self.timeout), msg_rx.recv()) => {
361                        deltas_result
362                            .map_err(|_| {
363                                SynchronizerError::Timeout(format!(
364                                    "First deltas took longer than {t}s to arrive",
365                                    t = self.timeout
366                                ))
367                            })?
368                            .ok_or_else(|| {
369                                SynchronizerError::ConnectionError(
370                                    "Deltas channel closed before first message".to_string(),
371                                )
372                            })?
373                    },
374                    _ = &mut end_rx => {
375                        info!("Received close signal while waiting for first deltas");
376                        return Ok(());
377                    }
378                };
379
380                let incoming = BlockHeader::from_block(msg.get_block(), msg.is_revert());
381                if let Some(current) = &self.last_synced_block {
382                    if current.number >= incoming.number && !self.is_next_expected(&incoming) {
383                        if !warned {
384                            info!(extractor=%self.extractor_id, from=incoming.number, to=current.number, "Syncing. Skipping messages");
385                            warned = true;
386                        }
387                        continue
388                    }
389                }
390                break msg;
391            };
392
393            self.filter_deltas(&mut first_msg);
394
395            // initial snapshot
396            let block = first_msg.get_block().clone();
397            info!(height = &block.number, "First deltas received");
398            let header = BlockHeader::from_block(first_msg.get_block(), first_msg.is_revert());
399            let deltas_msg = StateSyncMessage {
400                header: BlockHeader::from_block(first_msg.get_block(), first_msg.is_revert()),
401                snapshots: Default::default(),
402                deltas: Some(first_msg),
403                removed_components: Default::default(),
404            };
405
406            // If possible skip retrieving snapshots
407            let msg = if !self.is_next_expected(&header) {
408                info!("Retrieving snapshot");
409                let snapshot = self
410                    .get_snapshots::<Vec<&String>>(
411                        BlockHeader::from_block(&block, false),
412                        None,
413                    )
414                    .await?
415                    .merge(deltas_msg);
416                let n_components = self.component_tracker.components.len();
417                let n_snapshots = snapshot.snapshots.states.len();
418                info!(n_components, n_snapshots, "Initial snapshot retrieved, starting delta message feed");
419                snapshot
420            } else {
421                deltas_msg
422            };
423            block_tx.send(Ok(msg)).await?;
424            self.last_synced_block = Some(header.clone());
425            loop {
426                select! {
427                    deltas_opt = msg_rx.recv() => {
428                        if let Some(mut deltas) = deltas_opt {
429                            let header = BlockHeader::from_block(deltas.get_block(), deltas.is_revert());
430                            debug!(block_number=?header.number, "Received delta message");
431
432                            let (snapshots, removed_components) = {
433                                // 1. Remove components based on latest changes
434                                // 2. Add components based on latest changes, query those for snapshots
435                                let (to_add, to_remove) = self.component_tracker.filter_updated_components(&deltas);
436
437                                // Only components we don't track yet need a snapshot,
438                                let requiring_snapshot: Vec<_> = to_add
439                                    .iter()
440                                    .filter(|id| {
441                                        !self.component_tracker
442                                            .components
443                                            .contains_key(id.as_str())
444                                    })
445                                    .collect();
446                                debug!(components=?requiring_snapshot, "SnapshotRequest");
447                                self.component_tracker
448                                    .start_tracking(requiring_snapshot.as_slice())
449                                    .await?;
450
451                                let snapshots = self
452                                    .get_snapshots(header.clone(), Some(requiring_snapshot))
453                                    .await?
454                                    .snapshots;
455
456                                let removed_components = if !to_remove.is_empty() {
457                                    self.component_tracker.stop_tracking(&to_remove)
458                                } else {
459                                    Default::default()
460                                };
461
462                                (snapshots, removed_components)
463                            };
464
465                            // 3. Update entrypoints on the tracker (affects which contracts are tracked)
466                            self.component_tracker.process_entrypoints(&deltas.dci_update);
467
468                            // 4. Filter deltas by currently tracked components / contracts
469                            self.filter_deltas(&mut deltas);
470                            let n_changes = deltas.n_changes();
471
472                            // 5. Send the message
473                            let next = StateSyncMessage {
474                                header: header.clone(),
475                                snapshots,
476                                deltas: Some(deltas),
477                                removed_components,
478                            };
479                            block_tx.send(Ok(next)).await?;
480                            self.last_synced_block = Some(header.clone());
481
482                            debug!(block_number=?header.number, n_changes, "Finished processing delta message");
483                        } else {
484                            return Err(SynchronizerError::ConnectionError("Deltas channel closed".to_string()));
485                        }
486                    },
487                    _ = &mut end_rx => {
488                        info!("Received close signal during state_sync");
489                        return Ok(());
490                    }
491                }
492            }
493        }.await;
494
495        // This cleanup code now runs regardless of how the function exits (error or channel close)
496        warn!(last_synced_block = ?&self.last_synced_block, "Deltas processing ended.");
497        //Ignore error
498        let _ = self
499            .deltas_client
500            .unsubscribe(subscription_id)
501            .await
502            .map_err(|err| {
503                warn!(err=?err, "Unsubscribing from deltas on cleanup failed!");
504            });
505
506        // Handle the result: if it succeeded, we're done. If it errored, we need to determine
507        // whether the end_rx was consumed (close signal received) or not
508        match result {
509            Ok(()) => Ok(()), // Success, likely due to close signal
510            Err(e) => {
511                // The error came from the inner async block. Since the async block
512                // can receive close signals (which would return Ok), any error means
513                // the close signal was NOT received, so we can return the end_rx for retry
514                Err((e, Some(end_rx)))
515            }
516        }
517    }
518
519    fn is_next_expected(&self, incoming: &BlockHeader) -> bool {
520        if let Some(block) = self.last_synced_block.as_ref() {
521            return incoming.parent_hash == block.hash;
522        }
523        false
524    }
525    fn filter_deltas(&self, deltas: &mut BlockChanges) {
526        deltas.filter_by_component(|id| {
527            self.component_tracker
528                .components
529                .contains_key(id)
530        });
531        deltas.filter_by_contract(|id| {
532            self.component_tracker
533                .contracts
534                .contains(id)
535        });
536    }
537}
538
539#[async_trait]
540impl<R, D> StateSynchronizer for ProtocolStateSynchronizer<R, D>
541where
542    R: RPCClient + Clone + Send + Sync + 'static,
543    D: DeltasClient + Clone + Send + Sync + 'static,
544{
545    async fn initialize(&mut self) -> SyncResult<()> {
546        info!("Retrieving relevant protocol components");
547        self.component_tracker
548            .initialise_components()
549            .await?;
550        info!(
551            n_components = self.component_tracker.components.len(),
552            n_contracts = self.component_tracker.contracts.len(),
553            "Finished retrieving components",
554        );
555
556        Ok(())
557    }
558
559    async fn start(
560        mut self,
561    ) -> (SynchronizerTaskHandle, Receiver<SyncResult<StateSyncMessage<BlockHeader>>>) {
562        let (mut tx, rx) = channel(15);
563        let (end_tx, end_rx) = oneshot::channel::<()>();
564
565        let jh = tokio::spawn(async move {
566            let mut retry_count = 0;
567            let mut current_end_rx = end_rx;
568            let mut final_error = None;
569
570            while retry_count < self.max_retries {
571                info!(extractor_id=%&self.extractor_id, retry_count, "(Re)starting synchronization loop");
572
573                let res = self
574                    .state_sync(&mut tx, current_end_rx)
575                    .await;
576                match res {
577                    Ok(()) => {
578                        info!(
579                            extractor_id=%&self.extractor_id,
580                            retry_count,
581                            "State synchronization exited cleanly"
582                        );
583                        return;
584                    }
585                    Err((e, maybe_end_rx)) => {
586                        warn!(
587                            extractor_id=%&self.extractor_id,
588                            retry_count,
589                            error=%e,
590                            "State synchronization errored!"
591                        );
592
593                        // If we have the end_rx back, we can retry
594                        if let Some(recovered_end_rx) = maybe_end_rx {
595                            current_end_rx = recovered_end_rx;
596
597                            if let SynchronizerError::ConnectionClosed = e {
598                                // break synchronization loop if websocket client is dead
599                                error!(
600                                    "Websocket connection closed. State synchronization exiting."
601                                );
602                                let _ = tx.send(Err(e)).await;
603                                return;
604                            } else {
605                                // Store error in case this is our last retry
606                                final_error = Some(e);
607                            }
608                        } else {
609                            // Close signal was received, exit cleanly
610                            info!(extractor_id=%&self.extractor_id, "Received close signal, exiting.");
611                            return;
612                        }
613                    }
614                }
615                sleep(self.retry_cooldown).await;
616                retry_count += 1;
617            }
618            if let Some(e) = final_error {
619                warn!(extractor_id=%&self.extractor_id, retry_count, error=%e, "Max retries exceeded");
620                let _ = tx.send(Err(e)).await;
621            }
622        });
623
624        let handle = SynchronizerTaskHandle::new(jh, end_tx);
625        (handle, rx)
626    }
627}
628
629#[cfg(test)]
630mod test {
631    //! Test suite for ProtocolStateSynchronizer shutdown and cleanup behavior.
632    //!
633    //! ## Test Coverage Strategy:
634    //!
635    //! ### Shutdown & Close Signal Tests:
636    //! - `test_public_close_api_functionality` - Tests public API (start/close lifecycle)
637    //! - `test_close_signal_while_waiting_for_first_deltas` - Close during initial wait
638    //! - `test_close_signal_during_main_processing_loop` - Close during main processing
639    //!
640    //! ### Cleanup & Error Handling Tests:
641    //! - `test_cleanup_runs_when_state_sync_processing_errors` - Cleanup on processing errors
642    //!
643    //! ### Coverage Summary:
644    //! These tests ensure cleanup code (shared state reset + unsubscribe) runs on ALL exit paths:
645    //! ✓ Close signal before first deltas   ✓ Close signal during processing
646    //! ✓ Processing errors                  ✓ Channel closure
647    //! ✓ Public API close operations        ✓ Normal completion
648
649    use std::{collections::HashSet, sync::Arc};
650
651    use test_log::test;
652    use tycho_common::dto::{
653        AddressStorageLocation, Block, Chain, ComponentTvlRequestBody, ComponentTvlRequestResponse,
654        DCIUpdate, EntryPoint, PaginationResponse, ProtocolComponentRequestResponse,
655        ProtocolComponentsRequestBody, ProtocolStateRequestBody, ProtocolStateRequestResponse,
656        ProtocolSystemsRequestBody, ProtocolSystemsRequestResponse, RPCTracerParams,
657        StateRequestBody, StateRequestResponse, TokensRequestBody, TokensRequestResponse,
658        TracedEntryPointRequestBody, TracedEntryPointRequestResponse, TracingParams,
659    };
660    use uuid::Uuid;
661
662    use super::*;
663    use crate::{deltas::MockDeltasClient, rpc::MockRPCClient, DeltasError, RPCError};
664
665    // Required for mock client to implement clone
666    struct ArcRPCClient<T>(Arc<T>);
667
668    // Default derive(Clone) does require T to be Clone as well.
669    impl<T> Clone for ArcRPCClient<T> {
670        fn clone(&self) -> Self {
671            ArcRPCClient(self.0.clone())
672        }
673    }
674
675    #[async_trait]
676    impl<T> RPCClient for ArcRPCClient<T>
677    where
678        T: RPCClient + Sync + Send + 'static,
679    {
680        async fn get_tokens(
681            &self,
682            request: &TokensRequestBody,
683        ) -> Result<TokensRequestResponse, RPCError> {
684            self.0.get_tokens(request).await
685        }
686
687        async fn get_contract_state(
688            &self,
689            request: &StateRequestBody,
690        ) -> Result<StateRequestResponse, RPCError> {
691            self.0.get_contract_state(request).await
692        }
693
694        async fn get_protocol_components(
695            &self,
696            request: &ProtocolComponentsRequestBody,
697        ) -> Result<ProtocolComponentRequestResponse, RPCError> {
698            self.0
699                .get_protocol_components(request)
700                .await
701        }
702
703        async fn get_protocol_states(
704            &self,
705            request: &ProtocolStateRequestBody,
706        ) -> Result<ProtocolStateRequestResponse, RPCError> {
707            self.0
708                .get_protocol_states(request)
709                .await
710        }
711
712        async fn get_protocol_systems(
713            &self,
714            request: &ProtocolSystemsRequestBody,
715        ) -> Result<ProtocolSystemsRequestResponse, RPCError> {
716            self.0
717                .get_protocol_systems(request)
718                .await
719        }
720
721        async fn get_component_tvl(
722            &self,
723            request: &ComponentTvlRequestBody,
724        ) -> Result<ComponentTvlRequestResponse, RPCError> {
725            self.0.get_component_tvl(request).await
726        }
727
728        async fn get_traced_entry_points(
729            &self,
730            request: &TracedEntryPointRequestBody,
731        ) -> Result<TracedEntryPointRequestResponse, RPCError> {
732            self.0
733                .get_traced_entry_points(request)
734                .await
735        }
736
737        async fn get_snapshots<'a>(
738            &self,
739            request: &SnapshotParameters<'a>,
740            chunk_size: usize,
741            concurrency: usize,
742        ) -> Result<Snapshot, RPCError> {
743            self.0
744                .get_snapshots(request, chunk_size, concurrency)
745                .await
746        }
747    }
748
749    // Required for mock client to implement clone
750    struct ArcDeltasClient<T>(Arc<T>);
751
752    // Default derive(Clone) does require T to be Clone as well.
753    impl<T> Clone for ArcDeltasClient<T> {
754        fn clone(&self) -> Self {
755            ArcDeltasClient(self.0.clone())
756        }
757    }
758
759    #[async_trait]
760    impl<T> DeltasClient for ArcDeltasClient<T>
761    where
762        T: DeltasClient + Sync + Send + 'static,
763    {
764        async fn subscribe(
765            &self,
766            extractor_id: ExtractorIdentity,
767            options: SubscriptionOptions,
768        ) -> Result<(Uuid, Receiver<BlockChanges>), DeltasError> {
769            self.0
770                .subscribe(extractor_id, options)
771                .await
772        }
773
774        async fn unsubscribe(&self, subscription_id: Uuid) -> Result<(), DeltasError> {
775            self.0
776                .unsubscribe(subscription_id)
777                .await
778        }
779
780        async fn connect(&self) -> Result<JoinHandle<Result<(), DeltasError>>, DeltasError> {
781            self.0.connect().await
782        }
783
784        async fn close(&self) -> Result<(), DeltasError> {
785            self.0.close().await
786        }
787    }
788
789    fn with_mocked_clients(
790        native: bool,
791        include_tvl: bool,
792        rpc_client: Option<MockRPCClient>,
793        deltas_client: Option<MockDeltasClient>,
794    ) -> ProtocolStateSynchronizer<ArcRPCClient<MockRPCClient>, ArcDeltasClient<MockDeltasClient>>
795    {
796        let rpc_client = ArcRPCClient(Arc::new(rpc_client.unwrap_or_default()));
797        let deltas_client = ArcDeltasClient(Arc::new(deltas_client.unwrap_or_default()));
798
799        ProtocolStateSynchronizer::new(
800            ExtractorIdentity::new(Chain::Ethereum, "uniswap-v2"),
801            native,
802            ComponentFilter::with_tvl_range(50.0, 50.0),
803            1,
804            Duration::from_secs(0),
805            true,
806            include_tvl,
807            rpc_client,
808            deltas_client,
809            10_u64,
810        )
811    }
812
813    fn state_snapshot_native() -> ProtocolStateRequestResponse {
814        ProtocolStateRequestResponse {
815            states: vec![ResponseProtocolState {
816                component_id: "Component1".to_string(),
817                ..Default::default()
818            }],
819            pagination: PaginationResponse { page: 0, page_size: 20, total: 1 },
820        }
821    }
822
823    #[test(tokio::test)]
824    async fn test_get_snapshots_native() {
825        let header = BlockHeader::default();
826        let mut rpc = MockRPCClient::new();
827        let component = ProtocolComponent { id: "Component1".to_string(), ..Default::default() };
828
829        let component_clone = component.clone();
830        rpc.expect_get_snapshots()
831            .returning(move |_request, _chunk_size, _concurrency| {
832                Ok(Snapshot {
833                    states: state_snapshot_native()
834                        .states
835                        .into_iter()
836                        .map(|state| {
837                            (
838                                state.component_id.clone(),
839                                ComponentWithState {
840                                    state,
841                                    component: component_clone.clone(),
842                                    entrypoints: vec![],
843                                    component_tvl: None,
844                                },
845                            )
846                        })
847                        .collect(),
848                    vm_storage: HashMap::new(),
849                })
850            });
851
852        rpc.expect_get_traced_entry_points()
853            .returning(|_| {
854                Ok(TracedEntryPointRequestResponse {
855                    traced_entry_points: HashMap::new(),
856                    pagination: PaginationResponse::new(0, 20, 0),
857                })
858            });
859
860        let mut state_sync = with_mocked_clients(true, false, Some(rpc), None);
861        state_sync
862            .component_tracker
863            .components
864            .insert("Component1".to_string(), component.clone());
865        let components_arg = ["Component1".to_string()];
866        let exp = StateSyncMessage {
867            header: header.clone(),
868            snapshots: Snapshot {
869                states: state_snapshot_native()
870                    .states
871                    .into_iter()
872                    .map(|state| {
873                        (
874                            state.component_id.clone(),
875                            ComponentWithState {
876                                state,
877                                component: component.clone(),
878                                entrypoints: vec![],
879                                component_tvl: None,
880                            },
881                        )
882                    })
883                    .collect(),
884                vm_storage: HashMap::new(),
885            },
886            deltas: None,
887            removed_components: Default::default(),
888        };
889
890        let snap = state_sync
891            .get_snapshots(header, Some(&components_arg))
892            .await
893            .expect("Retrieving snapshot failed");
894
895        assert_eq!(snap, exp);
896    }
897
898    #[test(tokio::test)]
899    async fn test_get_snapshots_native_with_tvl() {
900        let header = BlockHeader::default();
901        let mut rpc = MockRPCClient::new();
902        let component = ProtocolComponent { id: "Component1".to_string(), ..Default::default() };
903
904        let component_clone = component.clone();
905        rpc.expect_get_snapshots()
906            .returning(move |_request, _chunk_size, _concurrency| {
907                Ok(Snapshot {
908                    states: state_snapshot_native()
909                        .states
910                        .into_iter()
911                        .map(|state| {
912                            (
913                                state.component_id.clone(),
914                                ComponentWithState {
915                                    state,
916                                    component: component_clone.clone(),
917                                    component_tvl: Some(100.0),
918                                    entrypoints: vec![],
919                                },
920                            )
921                        })
922                        .collect(),
923                    vm_storage: HashMap::new(),
924                })
925            });
926
927        rpc.expect_get_traced_entry_points()
928            .returning(|_| {
929                Ok(TracedEntryPointRequestResponse {
930                    traced_entry_points: HashMap::new(),
931                    pagination: PaginationResponse::new(0, 20, 0),
932                })
933            });
934
935        let mut state_sync = with_mocked_clients(true, true, Some(rpc), None);
936        state_sync
937            .component_tracker
938            .components
939            .insert("Component1".to_string(), component.clone());
940        let components_arg = ["Component1".to_string()];
941        let exp = StateSyncMessage {
942            header: header.clone(),
943            snapshots: Snapshot {
944                states: state_snapshot_native()
945                    .states
946                    .into_iter()
947                    .map(|state| {
948                        (
949                            state.component_id.clone(),
950                            ComponentWithState {
951                                state,
952                                component: component.clone(),
953                                component_tvl: Some(100.0),
954                                entrypoints: vec![],
955                            },
956                        )
957                    })
958                    .collect(),
959                vm_storage: HashMap::new(),
960            },
961            deltas: None,
962            removed_components: Default::default(),
963        };
964
965        let snap = state_sync
966            .get_snapshots(header, Some(&components_arg))
967            .await
968            .expect("Retrieving snapshot failed");
969
970        assert_eq!(snap, exp);
971    }
972
973    fn state_snapshot_vm() -> StateRequestResponse {
974        StateRequestResponse {
975            accounts: vec![
976                ResponseAccount { address: Bytes::from("0x0badc0ffee"), ..Default::default() },
977                ResponseAccount { address: Bytes::from("0xbabe42"), ..Default::default() },
978            ],
979            pagination: PaginationResponse { page: 0, page_size: 20, total: 1 },
980        }
981    }
982
983    fn traced_entry_point_response() -> TracedEntryPointRequestResponse {
984        TracedEntryPointRequestResponse {
985            traced_entry_points: HashMap::from([(
986                "Component1".to_string(),
987                vec![(
988                    EntryPointWithTracingParams {
989                        entry_point: EntryPoint {
990                            external_id: "entrypoint_a".to_string(),
991                            target: Bytes::from("0x0badc0ffee"),
992                            signature: "sig()".to_string(),
993                        },
994                        params: TracingParams::RPCTracer(RPCTracerParams {
995                            caller: Some(Bytes::from("0x0badc0ffee")),
996                            calldata: Bytes::from("0x0badc0ffee"),
997                            state_overrides: None,
998                            prune_addresses: None,
999                        }),
1000                    },
1001                    TracingResult {
1002                        retriggers: HashSet::from([(
1003                            Bytes::from("0x0badc0ffee"),
1004                            AddressStorageLocation::new(Bytes::from("0x0badc0ffee"), 12),
1005                        )]),
1006                        accessed_slots: HashMap::from([(
1007                            Bytes::from("0x0badc0ffee"),
1008                            HashSet::from([Bytes::from("0xbadbeef0")]),
1009                        )]),
1010                    },
1011                )],
1012            )]),
1013            pagination: PaginationResponse::new(0, 20, 0),
1014        }
1015    }
1016
1017    #[test(tokio::test)]
1018    async fn test_get_snapshots_vm() {
1019        let header = BlockHeader::default();
1020        let mut rpc = MockRPCClient::new();
1021
1022        let traced_ep_response = traced_entry_point_response();
1023        rpc.expect_get_snapshots()
1024            .returning(move |_request, _chunk_size, _concurrency| {
1025                let vm_storage_accounts = state_snapshot_vm();
1026                Ok(Snapshot {
1027                    states: [(
1028                        "Component1".to_string(),
1029                        ComponentWithState {
1030                            state: ResponseProtocolState {
1031                                component_id: "Component1".to_string(),
1032                                ..Default::default()
1033                            },
1034                            component: ProtocolComponent {
1035                                id: "Component1".to_string(),
1036                                contract_ids: vec![
1037                                    Bytes::from("0x0badc0ffee"),
1038                                    Bytes::from("0xbabe42"),
1039                                ],
1040                                ..Default::default()
1041                            },
1042                            component_tvl: None,
1043                            entrypoints: traced_ep_response
1044                                .traced_entry_points
1045                                .get("Component1")
1046                                .cloned()
1047                                .unwrap_or_default(),
1048                        },
1049                    )]
1050                    .into_iter()
1051                    .collect(),
1052                    vm_storage: vm_storage_accounts
1053                        .accounts
1054                        .into_iter()
1055                        .map(|state| (state.address.clone(), state))
1056                        .collect(),
1057                })
1058            });
1059
1060        rpc.expect_get_traced_entry_points()
1061            .returning(|_| Ok(traced_entry_point_response()));
1062
1063        let mut state_sync = with_mocked_clients(false, false, Some(rpc), None);
1064        let component = ProtocolComponent {
1065            id: "Component1".to_string(),
1066            contract_ids: vec![Bytes::from("0x0badc0ffee"), Bytes::from("0xbabe42")],
1067            ..Default::default()
1068        };
1069        state_sync
1070            .component_tracker
1071            .components
1072            .insert("Component1".to_string(), component.clone());
1073        let components_arg = ["Component1".to_string()];
1074        let exp = StateSyncMessage {
1075            header: header.clone(),
1076            snapshots: Snapshot {
1077                states: [(
1078                    component.id.clone(),
1079                    ComponentWithState {
1080                        state: ResponseProtocolState {
1081                            component_id: "Component1".to_string(),
1082                            ..Default::default()
1083                        },
1084                        component: component.clone(),
1085                        component_tvl: None,
1086                        entrypoints: vec![(
1087                            EntryPointWithTracingParams {
1088                                entry_point: EntryPoint {
1089                                    external_id: "entrypoint_a".to_string(),
1090                                    target: Bytes::from("0x0badc0ffee"),
1091                                    signature: "sig()".to_string(),
1092                                },
1093                                params: TracingParams::RPCTracer(RPCTracerParams {
1094                                    caller: Some(Bytes::from("0x0badc0ffee")),
1095                                    calldata: Bytes::from("0x0badc0ffee"),
1096                                    state_overrides: None,
1097                                    prune_addresses: None,
1098                                }),
1099                            },
1100                            TracingResult {
1101                                retriggers: HashSet::from([(
1102                                    Bytes::from("0x0badc0ffee"),
1103                                    AddressStorageLocation::new(Bytes::from("0x0badc0ffee"), 12),
1104                                )]),
1105                                accessed_slots: HashMap::from([(
1106                                    Bytes::from("0x0badc0ffee"),
1107                                    HashSet::from([Bytes::from("0xbadbeef0")]),
1108                                )]),
1109                            },
1110                        )],
1111                    },
1112                )]
1113                .into_iter()
1114                .collect(),
1115                vm_storage: state_snapshot_vm()
1116                    .accounts
1117                    .into_iter()
1118                    .map(|state| (state.address.clone(), state))
1119                    .collect(),
1120            },
1121            deltas: None,
1122            removed_components: Default::default(),
1123        };
1124
1125        let snap = state_sync
1126            .get_snapshots(header, Some(&components_arg))
1127            .await
1128            .expect("Retrieving snapshot failed");
1129
1130        assert_eq!(snap, exp);
1131    }
1132
1133    #[test(tokio::test)]
1134    async fn test_get_snapshots_vm_with_tvl() {
1135        let header = BlockHeader::default();
1136        let mut rpc = MockRPCClient::new();
1137        let component = ProtocolComponent {
1138            id: "Component1".to_string(),
1139            contract_ids: vec![Bytes::from("0x0badc0ffee"), Bytes::from("0xbabe42")],
1140            ..Default::default()
1141        };
1142
1143        let component_clone = component.clone();
1144        rpc.expect_get_snapshots()
1145            .returning(move |_request, _chunk_size, _concurrency| {
1146                let vm_storage_accounts = state_snapshot_vm();
1147                Ok(Snapshot {
1148                    states: [(
1149                        "Component1".to_string(),
1150                        ComponentWithState {
1151                            state: ResponseProtocolState {
1152                                component_id: "Component1".to_string(),
1153                                ..Default::default()
1154                            },
1155                            component: component_clone.clone(),
1156                            component_tvl: Some(100.0),
1157                            entrypoints: vec![],
1158                        },
1159                    )]
1160                    .into_iter()
1161                    .collect(),
1162                    vm_storage: vm_storage_accounts
1163                        .accounts
1164                        .into_iter()
1165                        .map(|state| (state.address.clone(), state))
1166                        .collect(),
1167                })
1168            });
1169
1170        rpc.expect_get_traced_entry_points()
1171            .returning(|_| {
1172                Ok(TracedEntryPointRequestResponse {
1173                    traced_entry_points: HashMap::new(),
1174                    pagination: PaginationResponse::new(0, 20, 0),
1175                })
1176            });
1177
1178        let mut state_sync = with_mocked_clients(false, true, Some(rpc), None);
1179        state_sync
1180            .component_tracker
1181            .components
1182            .insert("Component1".to_string(), component.clone());
1183        let components_arg = ["Component1".to_string()];
1184        let exp = StateSyncMessage {
1185            header: header.clone(),
1186            snapshots: Snapshot {
1187                states: [(
1188                    component.id.clone(),
1189                    ComponentWithState {
1190                        state: ResponseProtocolState {
1191                            component_id: "Component1".to_string(),
1192                            ..Default::default()
1193                        },
1194                        component: component.clone(),
1195                        component_tvl: Some(100.0),
1196                        entrypoints: vec![],
1197                    },
1198                )]
1199                .into_iter()
1200                .collect(),
1201                vm_storage: state_snapshot_vm()
1202                    .accounts
1203                    .into_iter()
1204                    .map(|state| (state.address.clone(), state))
1205                    .collect(),
1206            },
1207            deltas: None,
1208            removed_components: Default::default(),
1209        };
1210
1211        let snap = state_sync
1212            .get_snapshots(header, Some(&components_arg))
1213            .await
1214            .expect("Retrieving snapshot failed");
1215
1216        assert_eq!(snap, exp);
1217    }
1218
1219    fn mock_clients_for_state_sync() -> (MockRPCClient, MockDeltasClient, Sender<BlockChanges>) {
1220        let mut rpc_client = MockRPCClient::new();
1221        // Mocks for the start_tracking call, these need to come first because they are more
1222        // specific, see: https://docs.rs/mockall/latest/mockall/#matching-multiple-calls
1223        rpc_client
1224            .expect_get_protocol_components()
1225            .with(mockall::predicate::function(
1226                move |request_params: &ProtocolComponentsRequestBody| {
1227                    if let Some(ids) = request_params.component_ids.as_ref() {
1228                        ids.contains(&"Component3".to_string())
1229                    } else {
1230                        false
1231                    }
1232                },
1233            ))
1234            .returning(|_| {
1235                // return Component3
1236                Ok(ProtocolComponentRequestResponse {
1237                    protocol_components: vec![
1238                        // this component shall have a tvl update above threshold
1239                        ProtocolComponent { id: "Component3".to_string(), ..Default::default() },
1240                    ],
1241                    pagination: PaginationResponse { page: 0, page_size: 20, total: 1 },
1242                })
1243            });
1244        // Mock get_snapshots for Component3
1245        rpc_client
1246            .expect_get_snapshots()
1247            .withf(|request: &SnapshotParameters, _chunk_size: &usize, _concurrency: &usize| {
1248                request
1249                    .components
1250                    .contains_key("Component3")
1251            })
1252            .returning(|_request, _chunk_size, _concurrency| {
1253                Ok(Snapshot {
1254                    states: [(
1255                        "Component3".to_string(),
1256                        ComponentWithState {
1257                            state: ResponseProtocolState {
1258                                component_id: "Component3".to_string(),
1259                                ..Default::default()
1260                            },
1261                            component: ProtocolComponent {
1262                                id: "Component3".to_string(),
1263                                ..Default::default()
1264                            },
1265                            component_tvl: Some(1000.0),
1266                            entrypoints: vec![],
1267                        },
1268                    )]
1269                    .into_iter()
1270                    .collect(),
1271                    vm_storage: HashMap::new(),
1272                })
1273            });
1274
1275        // mock calls for the initial state snapshots
1276        rpc_client
1277            .expect_get_protocol_components()
1278            .returning(|_| {
1279                // Initial sync of components
1280                Ok(ProtocolComponentRequestResponse {
1281                    protocol_components: vec![
1282                        // this component shall have a tvl update above threshold
1283                        ProtocolComponent { id: "Component1".to_string(), ..Default::default() },
1284                        // this component shall have a tvl update below threshold.
1285                        ProtocolComponent { id: "Component2".to_string(), ..Default::default() },
1286                        // a third component will have a tvl update above threshold
1287                    ],
1288                    pagination: PaginationResponse { page: 0, page_size: 20, total: 1 },
1289                })
1290            });
1291
1292        rpc_client
1293            .expect_get_snapshots()
1294            .returning(|_request, _chunk_size, _concurrency| {
1295                Ok(Snapshot {
1296                    states: [
1297                        (
1298                            "Component1".to_string(),
1299                            ComponentWithState {
1300                                state: ResponseProtocolState {
1301                                    component_id: "Component1".to_string(),
1302                                    ..Default::default()
1303                                },
1304                                component: ProtocolComponent {
1305                                    id: "Component1".to_string(),
1306                                    ..Default::default()
1307                                },
1308                                component_tvl: Some(100.0),
1309                                entrypoints: vec![],
1310                            },
1311                        ),
1312                        (
1313                            "Component2".to_string(),
1314                            ComponentWithState {
1315                                state: ResponseProtocolState {
1316                                    component_id: "Component2".to_string(),
1317                                    ..Default::default()
1318                                },
1319                                component: ProtocolComponent {
1320                                    id: "Component2".to_string(),
1321                                    ..Default::default()
1322                                },
1323                                component_tvl: Some(0.0),
1324                                entrypoints: vec![],
1325                            },
1326                        ),
1327                    ]
1328                    .into_iter()
1329                    .collect(),
1330                    vm_storage: HashMap::new(),
1331                })
1332            });
1333
1334        // Mock get_traced_entry_points for Ethereum chain
1335        rpc_client
1336            .expect_get_traced_entry_points()
1337            .returning(|_| {
1338                Ok(TracedEntryPointRequestResponse {
1339                    traced_entry_points: HashMap::new(),
1340                    pagination: PaginationResponse { page: 0, page_size: 100, total: 0 },
1341                })
1342            });
1343
1344        // Mock deltas client and messages
1345        let mut deltas_client = MockDeltasClient::new();
1346        let (tx, rx) = channel(1);
1347        deltas_client
1348            .expect_subscribe()
1349            .return_once(move |_, _| {
1350                // Return subscriber id and a channel
1351                Ok((Uuid::default(), rx))
1352            });
1353
1354        // Expect unsubscribe call during cleanup
1355        deltas_client
1356            .expect_unsubscribe()
1357            .return_once(|_| Ok(()));
1358
1359        (rpc_client, deltas_client, tx)
1360    }
1361
1362    /// Test strategy
1363    ///
1364    /// - initial snapshot retrieval returns two component1 and component2 as snapshots
1365    /// - send 2 dummy messages, containing only blocks
1366    /// - third message contains a new component with some significant tvl, one initial component
1367    ///   slips below tvl threshold, another one is above tvl but does not get re-requested.
1368    #[test(tokio::test)]
1369    async fn test_state_sync() {
1370        let (rpc_client, deltas_client, tx) = mock_clients_for_state_sync();
1371        let deltas = [
1372            BlockChanges {
1373                extractor: "uniswap-v2".to_string(),
1374                chain: Chain::Ethereum,
1375                block: Block {
1376                    number: 1,
1377                    hash: Bytes::from("0x01"),
1378                    parent_hash: Bytes::from("0x00"),
1379                    chain: Chain::Ethereum,
1380                    ts: Default::default(),
1381                },
1382                revert: false,
1383                dci_update: DCIUpdate {
1384                    new_entrypoints: HashMap::from([(
1385                        "Component1".to_string(),
1386                        HashSet::from([EntryPoint {
1387                            external_id: "entrypoint_a".to_string(),
1388                            target: Bytes::from("0x0badc0ffee"),
1389                            signature: "sig()".to_string(),
1390                        }]),
1391                    )]),
1392                    new_entrypoint_params: HashMap::from([(
1393                        "entrypoint_a".to_string(),
1394                        HashSet::from([(
1395                            TracingParams::RPCTracer(RPCTracerParams {
1396                                caller: Some(Bytes::from("0x0badc0ffee")),
1397                                calldata: Bytes::from("0x0badc0ffee"),
1398                                state_overrides: None,
1399                                prune_addresses: None,
1400                            }),
1401                            Some("Component1".to_string()),
1402                        )]),
1403                    )]),
1404                    trace_results: HashMap::from([(
1405                        "entrypoint_a".to_string(),
1406                        TracingResult {
1407                            retriggers: HashSet::from([(
1408                                Bytes::from("0x0badc0ffee"),
1409                                AddressStorageLocation::new(Bytes::from("0x0badc0ffee"), 12),
1410                            )]),
1411                            accessed_slots: HashMap::from([(
1412                                Bytes::from("0x0badc0ffee"),
1413                                HashSet::from([Bytes::from("0xbadbeef0")]),
1414                            )]),
1415                        },
1416                    )]),
1417                },
1418                ..Default::default()
1419            },
1420            BlockChanges {
1421                extractor: "uniswap-v2".to_string(),
1422                chain: Chain::Ethereum,
1423                block: Block {
1424                    number: 2,
1425                    hash: Bytes::from("0x02"),
1426                    parent_hash: Bytes::from("0x01"),
1427                    chain: Chain::Ethereum,
1428                    ts: Default::default(),
1429                },
1430                revert: false,
1431                component_tvl: [
1432                    ("Component1".to_string(), 100.0),
1433                    ("Component2".to_string(), 0.0),
1434                    ("Component3".to_string(), 1000.0),
1435                ]
1436                .into_iter()
1437                .collect(),
1438                ..Default::default()
1439            },
1440        ];
1441        let mut state_sync = with_mocked_clients(true, true, Some(rpc_client), Some(deltas_client));
1442        state_sync
1443            .initialize()
1444            .await
1445            .expect("Init failed");
1446
1447        // Test starts here
1448        let (handle, mut rx) = state_sync.start().await;
1449        let (jh, close_tx) = handle.split();
1450        tx.send(deltas[0].clone())
1451            .await
1452            .expect("deltas channel msg 0 closed!");
1453        let first_msg = timeout(Duration::from_millis(100), rx.recv())
1454            .await
1455            .expect("waiting for first state msg timed out!")
1456            .expect("state sync block sender closed!");
1457        tx.send(deltas[1].clone())
1458            .await
1459            .expect("deltas channel msg 1 closed!");
1460        let second_msg = timeout(Duration::from_millis(100), rx.recv())
1461            .await
1462            .expect("waiting for second state msg timed out!")
1463            .expect("state sync block sender closed!");
1464        let _ = close_tx.send(());
1465        jh.await
1466            .expect("state sync task panicked!");
1467
1468        // assertions
1469        let exp1 = StateSyncMessage {
1470            header: BlockHeader {
1471                number: 1,
1472                hash: Bytes::from("0x01"),
1473                parent_hash: Bytes::from("0x00"),
1474                revert: false,
1475                ..Default::default()
1476            },
1477            snapshots: Snapshot {
1478                states: [
1479                    (
1480                        "Component1".to_string(),
1481                        ComponentWithState {
1482                            state: ResponseProtocolState {
1483                                component_id: "Component1".to_string(),
1484                                ..Default::default()
1485                            },
1486                            component: ProtocolComponent {
1487                                id: "Component1".to_string(),
1488                                ..Default::default()
1489                            },
1490                            component_tvl: Some(100.0),
1491                            entrypoints: vec![],
1492                        },
1493                    ),
1494                    (
1495                        "Component2".to_string(),
1496                        ComponentWithState {
1497                            state: ResponseProtocolState {
1498                                component_id: "Component2".to_string(),
1499                                ..Default::default()
1500                            },
1501                            component: ProtocolComponent {
1502                                id: "Component2".to_string(),
1503                                ..Default::default()
1504                            },
1505                            component_tvl: Some(0.0),
1506                            entrypoints: vec![],
1507                        },
1508                    ),
1509                ]
1510                .into_iter()
1511                .collect(),
1512                vm_storage: HashMap::new(),
1513            },
1514            deltas: Some(deltas[0].clone()),
1515            removed_components: Default::default(),
1516        };
1517
1518        let exp2 = StateSyncMessage {
1519            header: BlockHeader {
1520                number: 2,
1521                hash: Bytes::from("0x02"),
1522                parent_hash: Bytes::from("0x01"),
1523                revert: false,
1524                ..Default::default()
1525            },
1526            snapshots: Snapshot {
1527                states: [
1528                    // This is the new component we queried once it passed the tvl threshold.
1529                    (
1530                        "Component3".to_string(),
1531                        ComponentWithState {
1532                            state: ResponseProtocolState {
1533                                component_id: "Component3".to_string(),
1534                                ..Default::default()
1535                            },
1536                            component: ProtocolComponent {
1537                                id: "Component3".to_string(),
1538                                ..Default::default()
1539                            },
1540                            component_tvl: Some(1000.0),
1541                            entrypoints: vec![],
1542                        },
1543                    ),
1544                ]
1545                .into_iter()
1546                .collect(),
1547                vm_storage: HashMap::new(),
1548            },
1549            // Our deltas are empty and since merge methods are
1550            // tested in tycho-common we don't have much to do here.
1551            deltas: Some(BlockChanges {
1552                extractor: "uniswap-v2".to_string(),
1553                chain: Chain::Ethereum,
1554                block: Block {
1555                    number: 2,
1556                    hash: Bytes::from("0x02"),
1557                    parent_hash: Bytes::from("0x01"),
1558                    chain: Chain::Ethereum,
1559                    ts: Default::default(),
1560                },
1561                revert: false,
1562                component_tvl: [
1563                    // "Component2" should not show here.
1564                    ("Component1".to_string(), 100.0),
1565                    ("Component3".to_string(), 1000.0),
1566                ]
1567                .into_iter()
1568                .collect(),
1569                ..Default::default()
1570            }),
1571            // "Component2" was removed, because its tvl changed to 0.
1572            removed_components: [(
1573                "Component2".to_string(),
1574                ProtocolComponent { id: "Component2".to_string(), ..Default::default() },
1575            )]
1576            .into_iter()
1577            .collect(),
1578        };
1579        assert_eq!(first_msg.unwrap(), exp1);
1580        assert_eq!(second_msg.unwrap(), exp2);
1581    }
1582
1583    #[test(tokio::test)]
1584    async fn test_state_sync_with_tvl_range() {
1585        // Define the range for testing
1586        let remove_tvl_threshold = 5.0;
1587        let add_tvl_threshold = 7.0;
1588
1589        let mut rpc_client = MockRPCClient::new();
1590        let mut deltas_client = MockDeltasClient::new();
1591
1592        rpc_client
1593            .expect_get_protocol_components()
1594            .with(mockall::predicate::function(
1595                move |request_params: &ProtocolComponentsRequestBody| {
1596                    if let Some(ids) = request_params.component_ids.as_ref() {
1597                        ids.contains(&"Component3".to_string())
1598                    } else {
1599                        false
1600                    }
1601                },
1602            ))
1603            .returning(|_| {
1604                Ok(ProtocolComponentRequestResponse {
1605                    protocol_components: vec![ProtocolComponent {
1606                        id: "Component3".to_string(),
1607                        ..Default::default()
1608                    }],
1609                    pagination: PaginationResponse { page: 0, page_size: 20, total: 1 },
1610                })
1611            });
1612        // Mock get_snapshots for Component3
1613        rpc_client
1614            .expect_get_snapshots()
1615            .withf(|request: &SnapshotParameters, _chunk_size: &usize, _concurrency: &usize| {
1616                request
1617                    .components
1618                    .contains_key("Component3")
1619            })
1620            .returning(|_request, _chunk_size, _concurrency| {
1621                Ok(Snapshot {
1622                    states: [(
1623                        "Component3".to_string(),
1624                        ComponentWithState {
1625                            state: ResponseProtocolState {
1626                                component_id: "Component3".to_string(),
1627                                ..Default::default()
1628                            },
1629                            component: ProtocolComponent {
1630                                id: "Component3".to_string(),
1631                                ..Default::default()
1632                            },
1633                            component_tvl: Some(10.0),
1634                            entrypoints: vec![],
1635                        },
1636                    )]
1637                    .into_iter()
1638                    .collect(),
1639                    vm_storage: HashMap::new(),
1640                })
1641            });
1642
1643        // Mock for the initial snapshot retrieval
1644        rpc_client
1645            .expect_get_protocol_components()
1646            .returning(|_| {
1647                Ok(ProtocolComponentRequestResponse {
1648                    protocol_components: vec![
1649                        ProtocolComponent { id: "Component1".to_string(), ..Default::default() },
1650                        ProtocolComponent { id: "Component2".to_string(), ..Default::default() },
1651                    ],
1652                    pagination: PaginationResponse { page: 0, page_size: 20, total: 1 },
1653                })
1654            });
1655
1656        // Mock get_snapshots for initial snapshot
1657        rpc_client
1658            .expect_get_snapshots()
1659            .returning(|_request, _chunk_size, _concurrency| {
1660                Ok(Snapshot {
1661                    states: [
1662                        (
1663                            "Component1".to_string(),
1664                            ComponentWithState {
1665                                state: ResponseProtocolState {
1666                                    component_id: "Component1".to_string(),
1667                                    ..Default::default()
1668                                },
1669                                component: ProtocolComponent {
1670                                    id: "Component1".to_string(),
1671                                    ..Default::default()
1672                                },
1673                                component_tvl: Some(6.0),
1674                                entrypoints: vec![],
1675                            },
1676                        ),
1677                        (
1678                            "Component2".to_string(),
1679                            ComponentWithState {
1680                                state: ResponseProtocolState {
1681                                    component_id: "Component2".to_string(),
1682                                    ..Default::default()
1683                                },
1684                                component: ProtocolComponent {
1685                                    id: "Component2".to_string(),
1686                                    ..Default::default()
1687                                },
1688                                component_tvl: Some(2.0),
1689                                entrypoints: vec![],
1690                            },
1691                        ),
1692                    ]
1693                    .into_iter()
1694                    .collect(),
1695                    vm_storage: HashMap::new(),
1696                })
1697            });
1698
1699        // Mock get_traced_entry_points for Ethereum chain
1700        rpc_client
1701            .expect_get_traced_entry_points()
1702            .returning(|_| {
1703                Ok(TracedEntryPointRequestResponse {
1704                    traced_entry_points: HashMap::new(),
1705                    pagination: PaginationResponse { page: 0, page_size: 100, total: 0 },
1706                })
1707            });
1708
1709        let (tx, rx) = channel(1);
1710        deltas_client
1711            .expect_subscribe()
1712            .return_once(move |_, _| Ok((Uuid::default(), rx)));
1713
1714        // Expect unsubscribe call during cleanup
1715        deltas_client
1716            .expect_unsubscribe()
1717            .return_once(|_| Ok(()));
1718
1719        let mut state_sync = ProtocolStateSynchronizer::new(
1720            ExtractorIdentity::new(Chain::Ethereum, "uniswap-v2"),
1721            true,
1722            ComponentFilter::with_tvl_range(remove_tvl_threshold, add_tvl_threshold),
1723            1,
1724            Duration::from_secs(0),
1725            true,
1726            true,
1727            ArcRPCClient(Arc::new(rpc_client)),
1728            ArcDeltasClient(Arc::new(deltas_client)),
1729            10_u64,
1730        );
1731        state_sync
1732            .initialize()
1733            .await
1734            .expect("Init failed");
1735
1736        // Simulate the incoming BlockChanges
1737        let deltas = [
1738            BlockChanges {
1739                extractor: "uniswap-v2".to_string(),
1740                chain: Chain::Ethereum,
1741                block: Block {
1742                    number: 1,
1743                    hash: Bytes::from("0x01"),
1744                    parent_hash: Bytes::from("0x00"),
1745                    chain: Chain::Ethereum,
1746                    ts: Default::default(),
1747                },
1748                revert: false,
1749                ..Default::default()
1750            },
1751            BlockChanges {
1752                extractor: "uniswap-v2".to_string(),
1753                chain: Chain::Ethereum,
1754                block: Block {
1755                    number: 2,
1756                    hash: Bytes::from("0x02"),
1757                    parent_hash: Bytes::from("0x01"),
1758                    chain: Chain::Ethereum,
1759                    ts: Default::default(),
1760                },
1761                revert: false,
1762                component_tvl: [
1763                    ("Component1".to_string(), 6.0), // Within range, should not trigger changes
1764                    ("Component2".to_string(), 2.0), // Below lower threshold, should be removed
1765                    ("Component3".to_string(), 10.0), // Above upper threshold, should be added
1766                ]
1767                .into_iter()
1768                .collect(),
1769                ..Default::default()
1770            },
1771        ];
1772
1773        let (handle, mut rx) = state_sync.start().await;
1774        let (jh, close_tx) = handle.split();
1775
1776        // Simulate sending delta messages
1777        tx.send(deltas[0].clone())
1778            .await
1779            .expect("deltas channel msg 0 closed!");
1780
1781        // Expecting to receive the initial state message
1782        let _ = timeout(Duration::from_millis(100), rx.recv())
1783            .await
1784            .expect("waiting for first state msg timed out!")
1785            .expect("state sync block sender closed!");
1786
1787        // Send the third message, which should trigger TVL-based changes
1788        tx.send(deltas[1].clone())
1789            .await
1790            .expect("deltas channel msg 1 closed!");
1791        let second_msg = timeout(Duration::from_millis(100), rx.recv())
1792            .await
1793            .expect("waiting for second state msg timed out!")
1794            .expect("state sync block sender closed!")
1795            .expect("no error");
1796
1797        let _ = close_tx.send(());
1798        jh.await
1799            .expect("state sync task panicked!");
1800
1801        let expected_second_msg = StateSyncMessage {
1802            header: BlockHeader {
1803                number: 2,
1804                hash: Bytes::from("0x02"),
1805                parent_hash: Bytes::from("0x01"),
1806                revert: false,
1807                ..Default::default()
1808            },
1809            snapshots: Snapshot {
1810                states: [(
1811                    "Component3".to_string(),
1812                    ComponentWithState {
1813                        state: ResponseProtocolState {
1814                            component_id: "Component3".to_string(),
1815                            ..Default::default()
1816                        },
1817                        component: ProtocolComponent {
1818                            id: "Component3".to_string(),
1819                            ..Default::default()
1820                        },
1821                        component_tvl: Some(10.0),
1822                        entrypoints: vec![], // TODO: add entrypoints?
1823                    },
1824                )]
1825                .into_iter()
1826                .collect(),
1827                vm_storage: HashMap::new(),
1828            },
1829            deltas: Some(BlockChanges {
1830                extractor: "uniswap-v2".to_string(),
1831                chain: Chain::Ethereum,
1832                block: Block {
1833                    number: 2,
1834                    hash: Bytes::from("0x02"),
1835                    parent_hash: Bytes::from("0x01"),
1836                    chain: Chain::Ethereum,
1837                    ts: Default::default(),
1838                },
1839                revert: false,
1840                component_tvl: [
1841                    ("Component1".to_string(), 6.0), // Within range, should not trigger changes
1842                    ("Component3".to_string(), 10.0), // Above upper threshold, should be added
1843                ]
1844                .into_iter()
1845                .collect(),
1846                ..Default::default()
1847            }),
1848            removed_components: [(
1849                "Component2".to_string(),
1850                ProtocolComponent { id: "Component2".to_string(), ..Default::default() },
1851            )]
1852            .into_iter()
1853            .collect(),
1854        };
1855
1856        assert_eq!(second_msg, expected_second_msg);
1857    }
1858
1859    #[test(tokio::test)]
1860    async fn test_public_close_api_functionality() {
1861        // Tests the public close() API through the StateSynchronizer trait:
1862        // - close() fails before start() is called
1863        // - close() succeeds while synchronizer is running
1864        // - close() fails after already closed
1865        // This tests the full start/close lifecycle via the public API
1866
1867        let mut rpc_client = MockRPCClient::new();
1868        let mut deltas_client = MockDeltasClient::new();
1869
1870        // Mock the initial components call
1871        rpc_client
1872            .expect_get_protocol_components()
1873            .returning(|_| {
1874                Ok(ProtocolComponentRequestResponse {
1875                    protocol_components: vec![],
1876                    pagination: PaginationResponse { page: 0, page_size: 20, total: 0 },
1877                })
1878            });
1879
1880        // Set up deltas client that will wait for messages (blocking in state_sync)
1881        let (_tx, rx) = channel(1);
1882        deltas_client
1883            .expect_subscribe()
1884            .return_once(move |_, _| Ok((Uuid::default(), rx)));
1885
1886        // Expect unsubscribe call during cleanup
1887        deltas_client
1888            .expect_unsubscribe()
1889            .return_once(|_| Ok(()));
1890
1891        let mut state_sync = ProtocolStateSynchronizer::new(
1892            ExtractorIdentity::new(Chain::Ethereum, "test-protocol"),
1893            true,
1894            ComponentFilter::with_tvl_range(0.0, 0.0),
1895            5, // Enough retries
1896            Duration::from_secs(0),
1897            true,
1898            false,
1899            ArcRPCClient(Arc::new(rpc_client)),
1900            ArcDeltasClient(Arc::new(deltas_client)),
1901            10000_u64, // Long timeout so task doesn't exit on its own
1902        );
1903
1904        state_sync
1905            .initialize()
1906            .await
1907            .expect("Init should succeed");
1908
1909        // Start the synchronizer and test the new split-based close mechanism
1910        let (handle, _rx) = state_sync.start().await;
1911        let (jh, close_tx) = handle.split();
1912
1913        // Give it time to start up and enter state_sync
1914        tokio::time::sleep(Duration::from_millis(100)).await;
1915
1916        // Send close signal should succeed
1917        close_tx
1918            .send(())
1919            .expect("Should be able to send close signal");
1920        // Task should stop cleanly
1921        jh.await.expect("Task should not panic");
1922    }
1923
1924    #[test(tokio::test)]
1925    async fn test_cleanup_runs_when_state_sync_processing_errors() {
1926        // Tests that cleanup code runs when state_sync() errors during delta processing.
1927        // Specifically tests: RPC errors during snapshot retrieval cause proper cleanup.
1928        // Verifies: shared.last_synced_block reset + subscription unsubscribe on errors
1929
1930        let mut rpc_client = MockRPCClient::new();
1931        let mut deltas_client = MockDeltasClient::new();
1932
1933        // Mock the initial components call
1934        rpc_client
1935            .expect_get_protocol_components()
1936            .returning(|_| {
1937                Ok(ProtocolComponentRequestResponse {
1938                    protocol_components: vec![],
1939                    pagination: PaginationResponse { page: 0, page_size: 20, total: 0 },
1940                })
1941            });
1942
1943        // Mock to fail during snapshot retrieval (this will cause an error during processing)
1944        rpc_client
1945            .expect_get_protocol_states()
1946            .returning(|_| {
1947                Err(RPCError::ParseResponse("Test error during snapshot retrieval".to_string()))
1948            });
1949
1950        // Set up deltas client to send one message that will trigger snapshot retrieval
1951        let (tx, rx) = channel(10);
1952        deltas_client
1953            .expect_subscribe()
1954            .return_once(move |_, _| {
1955                // Send a delta message that will require a snapshot
1956                let delta = BlockChanges {
1957                    extractor: "test".to_string(),
1958                    chain: Chain::Ethereum,
1959                    block: Block {
1960                        hash: Bytes::from("0x0123"),
1961                        number: 1,
1962                        parent_hash: Bytes::from("0x0000"),
1963                        chain: Chain::Ethereum,
1964                        ts: chrono::DateTime::from_timestamp(1234567890, 0)
1965                            .unwrap()
1966                            .naive_utc(),
1967                    },
1968                    revert: false,
1969                    // Add a new component to trigger snapshot request
1970                    new_protocol_components: [(
1971                        "new_component".to_string(),
1972                        ProtocolComponent {
1973                            id: "new_component".to_string(),
1974                            protocol_system: "test_protocol".to_string(),
1975                            protocol_type_name: "test".to_string(),
1976                            chain: Chain::Ethereum,
1977                            tokens: vec![Bytes::from("0x0badc0ffee")],
1978                            contract_ids: vec![Bytes::from("0x0badc0ffee")],
1979                            static_attributes: Default::default(),
1980                            creation_tx: Default::default(),
1981                            created_at: Default::default(),
1982                            change: Default::default(),
1983                        },
1984                    )]
1985                    .into_iter()
1986                    .collect(),
1987                    component_tvl: [("new_component".to_string(), 100.0)]
1988                        .into_iter()
1989                        .collect(),
1990                    ..Default::default()
1991                };
1992
1993                tokio::spawn(async move {
1994                    let _ = tx.send(delta).await;
1995                    // Close the channel after sending one message
1996                });
1997
1998                Ok((Uuid::default(), rx))
1999            });
2000
2001        // Expect unsubscribe call during cleanup
2002        deltas_client
2003            .expect_unsubscribe()
2004            .return_once(|_| Ok(()));
2005
2006        let mut state_sync = ProtocolStateSynchronizer::new(
2007            ExtractorIdentity::new(Chain::Ethereum, "test-protocol"),
2008            true,
2009            ComponentFilter::with_tvl_range(0.0, 1000.0), // Include the component
2010            1,
2011            Duration::from_secs(0),
2012            true,
2013            false,
2014            ArcRPCClient(Arc::new(rpc_client)),
2015            ArcDeltasClient(Arc::new(deltas_client)),
2016            5000_u64,
2017        );
2018
2019        state_sync
2020            .initialize()
2021            .await
2022            .expect("Init should succeed");
2023
2024        // Before calling state_sync, set a value in last_synced_block
2025        state_sync.last_synced_block = Some(BlockHeader {
2026            hash: Bytes::from("0x0badc0ffee"),
2027            number: 42,
2028            parent_hash: Bytes::from("0xbadbeef0"),
2029            revert: false,
2030            timestamp: 123456789,
2031        });
2032
2033        // Create a channel for state_sync to send messages to
2034        let (mut block_tx, _block_rx) = channel(10);
2035
2036        // Call state_sync directly - this should error during processing
2037        let (_end_tx, end_rx) = oneshot::channel::<()>();
2038        let result = state_sync
2039            .state_sync(&mut block_tx, end_rx)
2040            .await;
2041        // Verify that state_sync returned an error
2042        assert!(result.is_err(), "state_sync should have errored during processing");
2043
2044        // Note: We can't verify internal state cleanup since state_sync consumes self,
2045        // but the cleanup logic is still tested by the fact that the method returns properly.
2046    }
2047
2048    #[test(tokio::test)]
2049    async fn test_close_signal_while_waiting_for_first_deltas() {
2050        // Tests close signal handling during the initial "waiting for deltas" phase.
2051        // This is the earliest possible close scenario - before any deltas are received.
2052        // Verifies: close signal received while waiting for first message triggers cleanup
2053        let mut rpc_client = MockRPCClient::new();
2054        let mut deltas_client = MockDeltasClient::new();
2055
2056        rpc_client
2057            .expect_get_protocol_components()
2058            .returning(|_| {
2059                Ok(ProtocolComponentRequestResponse {
2060                    protocol_components: vec![],
2061                    pagination: PaginationResponse { page: 0, page_size: 20, total: 0 },
2062                })
2063            });
2064
2065        let (_tx, rx) = channel(1);
2066        deltas_client
2067            .expect_subscribe()
2068            .return_once(move |_, _| Ok((Uuid::default(), rx)));
2069
2070        deltas_client
2071            .expect_unsubscribe()
2072            .return_once(|_| Ok(()));
2073
2074        let mut state_sync = ProtocolStateSynchronizer::new(
2075            ExtractorIdentity::new(Chain::Ethereum, "test-protocol"),
2076            true,
2077            ComponentFilter::with_tvl_range(0.0, 0.0),
2078            1,
2079            Duration::from_secs(0),
2080            true,
2081            false,
2082            ArcRPCClient(Arc::new(rpc_client)),
2083            ArcDeltasClient(Arc::new(deltas_client)),
2084            10000_u64,
2085        );
2086
2087        state_sync
2088            .initialize()
2089            .await
2090            .expect("Init should succeed");
2091
2092        let (mut block_tx, _block_rx) = channel(10);
2093        let (end_tx, end_rx) = oneshot::channel::<()>();
2094
2095        // Start state_sync in a task
2096        let state_sync_handle = tokio::spawn(async move {
2097            state_sync
2098                .state_sync(&mut block_tx, end_rx)
2099                .await
2100        });
2101
2102        // Give it a moment to start
2103        tokio::time::sleep(Duration::from_millis(100)).await;
2104
2105        // Send close signal
2106        let _ = end_tx.send(());
2107
2108        // state_sync should exit cleanly
2109        let result = state_sync_handle
2110            .await
2111            .expect("Task should not panic");
2112        assert!(result.is_ok(), "state_sync should exit cleanly when closed: {result:?}");
2113
2114        println!("SUCCESS: Close signal handled correctly while waiting for first deltas");
2115    }
2116
2117    #[test(tokio::test)]
2118    async fn test_close_signal_during_main_processing_loop() {
2119        // Tests close signal handling during the main delta processing loop.
2120        // This tests the scenario where first message is processed successfully,
2121        // then close signal is received while waiting for subsequent deltas.
2122        // Verifies: close signal in main loop (after initialization) triggers cleanup
2123
2124        let mut rpc_client = MockRPCClient::new();
2125        let mut deltas_client = MockDeltasClient::new();
2126
2127        // Mock the initial components call
2128        rpc_client
2129            .expect_get_protocol_components()
2130            .returning(|_| {
2131                Ok(ProtocolComponentRequestResponse {
2132                    protocol_components: vec![],
2133                    pagination: PaginationResponse { page: 0, page_size: 20, total: 0 },
2134                })
2135            });
2136
2137        // Mock the snapshot retrieval that happens after first message
2138        rpc_client
2139            .expect_get_protocol_states()
2140            .returning(|_| {
2141                Ok(ProtocolStateRequestResponse {
2142                    states: vec![],
2143                    pagination: PaginationResponse { page: 0, page_size: 20, total: 0 },
2144                })
2145            });
2146
2147        rpc_client
2148            .expect_get_component_tvl()
2149            .returning(|_| {
2150                Ok(ComponentTvlRequestResponse {
2151                    tvl: HashMap::new(),
2152                    pagination: PaginationResponse { page: 0, page_size: 20, total: 0 },
2153                })
2154            });
2155
2156        rpc_client
2157            .expect_get_traced_entry_points()
2158            .returning(|_| {
2159                Ok(TracedEntryPointRequestResponse {
2160                    traced_entry_points: HashMap::new(),
2161                    pagination: PaginationResponse::new(0, 20, 0),
2162                })
2163            });
2164
2165        // Set up deltas client to send one message, then keep channel open
2166        let (tx, rx) = channel(10);
2167        deltas_client
2168            .expect_subscribe()
2169            .return_once(move |_, _| {
2170                // Send first message immediately
2171                let first_delta = BlockChanges {
2172                    extractor: "test".to_string(),
2173                    chain: Chain::Ethereum,
2174                    block: Block {
2175                        hash: Bytes::from("0x0123"),
2176                        number: 1,
2177                        parent_hash: Bytes::from("0x0000"),
2178                        chain: Chain::Ethereum,
2179                        ts: chrono::DateTime::from_timestamp(1234567890, 0)
2180                            .unwrap()
2181                            .naive_utc(),
2182                    },
2183                    revert: false,
2184                    ..Default::default()
2185                };
2186
2187                tokio::spawn(async move {
2188                    let _ = tx.send(first_delta).await;
2189                    // Keep the sender alive but don't send more messages
2190                    // This will make the recv() block waiting for the next message
2191                    tokio::time::sleep(Duration::from_secs(30)).await;
2192                });
2193
2194                Ok((Uuid::default(), rx))
2195            });
2196
2197        deltas_client
2198            .expect_unsubscribe()
2199            .return_once(|_| Ok(()));
2200
2201        let mut state_sync = ProtocolStateSynchronizer::new(
2202            ExtractorIdentity::new(Chain::Ethereum, "test-protocol"),
2203            true,
2204            ComponentFilter::with_tvl_range(0.0, 1000.0),
2205            1,
2206            Duration::from_secs(0),
2207            true,
2208            false,
2209            ArcRPCClient(Arc::new(rpc_client)),
2210            ArcDeltasClient(Arc::new(deltas_client)),
2211            10000_u64,
2212        );
2213
2214        state_sync
2215            .initialize()
2216            .await
2217            .expect("Init should succeed");
2218
2219        let (mut block_tx, mut block_rx) = channel(10);
2220        let (end_tx, end_rx) = oneshot::channel::<()>();
2221
2222        // Start state_sync in a task
2223        let state_sync_handle = tokio::spawn(async move {
2224            state_sync
2225                .state_sync(&mut block_tx, end_rx)
2226                .await
2227        });
2228
2229        // Wait for the first message to be processed (snapshot sent)
2230        let first_snapshot = block_rx
2231            .recv()
2232            .await
2233            .expect("Should receive first snapshot")
2234            .expect("Synchronizer error");
2235        assert!(
2236            !first_snapshot
2237                .snapshots
2238                .states
2239                .is_empty() ||
2240                first_snapshot.deltas.is_some()
2241        );
2242        // Now send close signal - this should be handled in the main processing loop
2243        let _ = end_tx.send(());
2244
2245        // state_sync should exit cleanly after receiving close signal in main loop
2246        let result = state_sync_handle
2247            .await
2248            .expect("Task should not panic");
2249        assert!(
2250            result.is_ok(),
2251            "state_sync should exit cleanly when closed after first message: {result:?}"
2252        );
2253    }
2254
2255    #[test(tokio::test)]
2256    async fn test_max_retries_exceeded_error_propagation() {
2257        // Test that when max_retries is exceeded, the final error is sent through the channel
2258        // to the receiver and the synchronizer task exits cleanly
2259
2260        let mut rpc_client = MockRPCClient::new();
2261        let mut deltas_client = MockDeltasClient::new();
2262
2263        // Mock the initial components call to succeed
2264        rpc_client
2265            .expect_get_protocol_components()
2266            .returning(|_| {
2267                Ok(ProtocolComponentRequestResponse {
2268                    protocol_components: vec![],
2269                    pagination: PaginationResponse { page: 0, page_size: 20, total: 0 },
2270                })
2271            });
2272
2273        // Set up deltas client to consistently fail after subscription
2274        // This will cause connection errors and trigger retries
2275        deltas_client
2276            .expect_subscribe()
2277            .returning(|_, _| {
2278                // Return a connection error to trigger retries
2279                Err(DeltasError::NotConnected)
2280            });
2281
2282        // Expect multiple unsubscribe calls during retries
2283        deltas_client
2284            .expect_unsubscribe()
2285            .returning(|_| Ok(()))
2286            .times(0..=5);
2287
2288        // Create synchronizer with only 2 retries and short cooldown
2289        let mut state_sync = ProtocolStateSynchronizer::new(
2290            ExtractorIdentity::new(Chain::Ethereum, "test-protocol"),
2291            true,
2292            ComponentFilter::with_tvl_range(0.0, 1000.0),
2293            2,                         // max_retries = 2
2294            Duration::from_millis(10), // short retry cooldown
2295            true,
2296            false,
2297            ArcRPCClient(Arc::new(rpc_client)),
2298            ArcDeltasClient(Arc::new(deltas_client)),
2299            1000_u64,
2300        );
2301
2302        state_sync
2303            .initialize()
2304            .await
2305            .expect("Init should succeed");
2306
2307        // Start the synchronizer - it should fail to subscribe and retry
2308        let (handle, mut rx) = state_sync.start().await;
2309        let (jh, _close_tx) = handle.split();
2310
2311        let res = tokio::time::timeout(Duration::from_millis(100), rx.recv())
2312            .await
2313            .expect("responsds in time")
2314            .expect("channel open");
2315
2316        // Verify the error is a ConnectionClosed error (converted from DeltasError::NotConnected)
2317        if let Err(err) = res {
2318            assert!(
2319                matches!(err, SynchronizerError::ConnectionClosed),
2320                "Expected ConnectionClosed error, got: {:?}",
2321                err
2322            );
2323        } else {
2324            panic!("Expected an error")
2325        }
2326
2327        // The task should complete (not hang) after max retries
2328        let task_result = tokio::time::timeout(Duration::from_secs(2), jh).await;
2329        assert!(task_result.is_ok(), "Synchronizer task should complete after max retries");
2330    }
2331
2332    #[test(tokio::test)]
2333    async fn test_is_next_expected() {
2334        // Test the is_next_expected function to ensure it correctly identifies
2335        // when an incoming block is the expected next block in the chain
2336
2337        let mut state_sync = with_mocked_clients(true, false, None, None);
2338
2339        // Test 1: No previous block - should return false
2340        let incoming_header = BlockHeader {
2341            number: 100,
2342            hash: Bytes::from("0x0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"),
2343            parent_hash: Bytes::from(
2344                "0x0000000000000000000000000000000000000000000000000000000000000000",
2345            ),
2346            revert: false,
2347            timestamp: 123456789,
2348        };
2349        assert!(
2350            !state_sync.is_next_expected(&incoming_header),
2351            "Should return false when no previous block is set"
2352        );
2353
2354        // Test 2: Set a previous block and test with matching parent hash
2355        let previous_header = BlockHeader {
2356            number: 99,
2357            hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000000"),
2358            parent_hash: Bytes::from(
2359                "0xabcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789",
2360            ),
2361            revert: false,
2362            timestamp: 123456788,
2363        };
2364        state_sync.last_synced_block = Some(previous_header.clone());
2365
2366        assert!(
2367            state_sync.is_next_expected(&incoming_header),
2368            "Should return true when incoming parent_hash matches previous hash"
2369        );
2370
2371        // Test 3: Test with non-matching parent hash
2372        let non_matching_header = BlockHeader {
2373            number: 100,
2374            hash: Bytes::from("0x0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"),
2375            parent_hash: Bytes::from(
2376                "0x1111111111111111111111111111111111111111111111111111111111111111",
2377            ), // Wrong parent hash
2378            revert: false,
2379            timestamp: 123456789,
2380        };
2381        assert!(
2382            !state_sync.is_next_expected(&non_matching_header),
2383            "Should return false when incoming parent_hash doesn't match previous hash"
2384        );
2385    }
2386
2387    #[test(tokio::test)]
2388    async fn test_synchronizer_restart_skip_snapshot_on_expected_block() {
2389        // Test that on synchronizer restart with the next expected block,
2390        // get_snapshot is not called and only deltas are sent
2391
2392        let mut rpc_client = MockRPCClient::new();
2393        let mut deltas_client = MockDeltasClient::new();
2394
2395        // Mock the initial components call
2396        rpc_client
2397            .expect_get_protocol_components()
2398            .returning(|_| {
2399                Ok(ProtocolComponentRequestResponse {
2400                    protocol_components: vec![ProtocolComponent {
2401                        id: "Component1".to_string(),
2402                        ..Default::default()
2403                    }],
2404                    pagination: PaginationResponse { page: 0, page_size: 20, total: 1 },
2405                })
2406            });
2407
2408        // Set up deltas client to send a message that is the next expected block
2409        let (tx, rx) = channel(10);
2410        deltas_client
2411            .expect_subscribe()
2412            .return_once(move |_, _| {
2413                let expected_next_delta = BlockChanges {
2414                    extractor: "uniswap-v2".to_string(),
2415                    chain: Chain::Ethereum,
2416                    block: Block {
2417                        hash: Bytes::from(
2418                            "0x0000000000000000000000000000000000000000000000000000000000000002",
2419                        ), // This will be the next expected block
2420                        number: 2,
2421                        parent_hash: Bytes::from(
2422                            "0x0000000000000000000000000000000000000000000000000000000000000001",
2423                        ), // This matches our last synced block hash
2424                        chain: Chain::Ethereum,
2425                        ts: chrono::DateTime::from_timestamp(1234567890, 0)
2426                            .unwrap()
2427                            .naive_utc(),
2428                    },
2429                    revert: false,
2430                    ..Default::default()
2431                };
2432
2433                tokio::spawn(async move {
2434                    let _ = tx.send(expected_next_delta).await;
2435                });
2436
2437                Ok((Uuid::default(), rx))
2438            });
2439
2440        deltas_client
2441            .expect_unsubscribe()
2442            .return_once(|_| Ok(()));
2443
2444        let mut state_sync = ProtocolStateSynchronizer::new(
2445            ExtractorIdentity::new(Chain::Ethereum, "uniswap-v2"),
2446            true,
2447            ComponentFilter::with_tvl_range(0.0, 1000.0),
2448            1,
2449            Duration::from_secs(0),
2450            true, // include_snapshots = true
2451            false,
2452            ArcRPCClient(Arc::new(rpc_client)),
2453            ArcDeltasClient(Arc::new(deltas_client)),
2454            10000_u64,
2455        );
2456
2457        // Initialize and set up the last synced block to simulate a restart scenario
2458        state_sync
2459            .initialize()
2460            .await
2461            .expect("Init should succeed");
2462
2463        // Set last_synced_block to simulate that we've previously synced block 1
2464        state_sync.last_synced_block = Some(BlockHeader {
2465            number: 1,
2466            hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000001"), /* This matches the parent_hash in our delta */
2467            parent_hash: Bytes::from(
2468                "0x0000000000000000000000000000000000000000000000000000000000000000",
2469            ),
2470            revert: false,
2471            timestamp: 123456789,
2472        });
2473
2474        let (mut block_tx, mut block_rx) = channel(10);
2475        let (end_tx, end_rx) = oneshot::channel::<()>();
2476
2477        // Start state_sync
2478        let state_sync_handle = tokio::spawn(async move {
2479            state_sync
2480                .state_sync(&mut block_tx, end_rx)
2481                .await
2482        });
2483
2484        // Wait for the message - it should be a delta-only message (no snapshots)
2485        let result_msg = timeout(Duration::from_millis(200), block_rx.recv())
2486            .await
2487            .expect("Should receive message within timeout")
2488            .expect("Channel should be open")
2489            .expect("Should not be an error");
2490
2491        // Send close signal
2492        let _ = end_tx.send(());
2493
2494        // Wait for state_sync to finish
2495        let _ = state_sync_handle
2496            .await
2497            .expect("Task should not panic");
2498
2499        // Verify the message contains deltas but no snapshots
2500        // (because we skipped snapshot retrieval)
2501        assert!(result_msg.deltas.is_some(), "Should contain deltas");
2502        assert!(
2503            result_msg.snapshots.states.is_empty(),
2504            "Should not contain snapshots when next expected block is received"
2505        );
2506
2507        // Verify the block details match our expected next block
2508        if let Some(deltas) = &result_msg.deltas {
2509            assert_eq!(deltas.block.number, 2);
2510            assert_eq!(
2511                deltas.block.hash,
2512                Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000002")
2513            );
2514            assert_eq!(
2515                deltas.block.parent_hash,
2516                Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000001")
2517            );
2518        }
2519    }
2520
2521    #[test(tokio::test)]
2522    async fn test_skip_previously_processed_messages() {
2523        // Test that the synchronizer skips messages for blocks that have already been processed
2524        // This simulates a service restart scenario where old messages are re-emitted
2525
2526        let mut rpc_client = MockRPCClient::new();
2527        let mut deltas_client = MockDeltasClient::new();
2528
2529        // Mock the initial components call
2530        rpc_client
2531            .expect_get_protocol_components()
2532            .returning(|_| {
2533                Ok(ProtocolComponentRequestResponse {
2534                    protocol_components: vec![ProtocolComponent {
2535                        id: "Component1".to_string(),
2536                        ..Default::default()
2537                    }],
2538                    pagination: PaginationResponse { page: 0, page_size: 20, total: 1 },
2539                })
2540            });
2541
2542        // Mock snapshot calls for when we process the expected next block (block 6)
2543        rpc_client
2544            .expect_get_protocol_states()
2545            .returning(|_| {
2546                Ok(ProtocolStateRequestResponse {
2547                    states: vec![ResponseProtocolState {
2548                        component_id: "Component1".to_string(),
2549                        ..Default::default()
2550                    }],
2551                    pagination: PaginationResponse { page: 0, page_size: 20, total: 1 },
2552                })
2553            });
2554
2555        rpc_client
2556            .expect_get_component_tvl()
2557            .returning(|_| {
2558                Ok(ComponentTvlRequestResponse {
2559                    tvl: [("Component1".to_string(), 100.0)]
2560                        .into_iter()
2561                        .collect(),
2562                    pagination: PaginationResponse { page: 0, page_size: 20, total: 1 },
2563                })
2564            });
2565
2566        rpc_client
2567            .expect_get_traced_entry_points()
2568            .returning(|_| {
2569                Ok(TracedEntryPointRequestResponse {
2570                    traced_entry_points: HashMap::new(),
2571                    pagination: PaginationResponse::new(0, 20, 0),
2572                })
2573            });
2574
2575        // Set up deltas client to send old messages first, then the expected next block
2576        let (tx, rx) = channel(10);
2577        deltas_client
2578            .expect_subscribe()
2579            .return_once(move |_, _| {
2580                // Send messages for blocks 3, 4, 5 (already processed), then block 6 (expected)
2581                let old_messages = vec![
2582                    BlockChanges {
2583                        extractor: "uniswap-v2".to_string(),
2584                        chain: Chain::Ethereum,
2585                        block: Block {
2586                            hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000003"),
2587                            number: 3,
2588                            parent_hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000002"),
2589                            chain: Chain::Ethereum,
2590                            ts: chrono::DateTime::from_timestamp(1234567890, 0).unwrap().naive_utc(),
2591                        },
2592                        revert: false,
2593                        ..Default::default()
2594                    },
2595                    BlockChanges {
2596                        extractor: "uniswap-v2".to_string(),
2597                        chain: Chain::Ethereum,
2598                        block: Block {
2599                            hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000004"),
2600                            number: 4,
2601                            parent_hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000003"),
2602                            chain: Chain::Ethereum,
2603                            ts: chrono::DateTime::from_timestamp(1234567891, 0).unwrap().naive_utc(),
2604                        },
2605                        revert: false,
2606                        ..Default::default()
2607                    },
2608                    BlockChanges {
2609                        extractor: "uniswap-v2".to_string(),
2610                        chain: Chain::Ethereum,
2611                        block: Block {
2612                            hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000005"),
2613                            number: 5,
2614                            parent_hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000004"),
2615                            chain: Chain::Ethereum,
2616                            ts: chrono::DateTime::from_timestamp(1234567892, 0).unwrap().naive_utc(),
2617                        },
2618                        revert: false,
2619                        ..Default::default()
2620                    },
2621                    // This is the expected next block (block 6)
2622                    BlockChanges {
2623                        extractor: "uniswap-v2".to_string(),
2624                        chain: Chain::Ethereum,
2625                        block: Block {
2626                            hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000006"),
2627                            number: 6,
2628                            parent_hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000005"),
2629                            chain: Chain::Ethereum,
2630                            ts: chrono::DateTime::from_timestamp(1234567893, 0).unwrap().naive_utc(),
2631                        },
2632                        revert: false,
2633                        ..Default::default()
2634                    },
2635                ];
2636
2637                tokio::spawn(async move {
2638                    for message in old_messages {
2639                        let _ = tx.send(message).await;
2640                        tokio::time::sleep(Duration::from_millis(10)).await;
2641                    }
2642                });
2643
2644                Ok((Uuid::default(), rx))
2645            });
2646
2647        deltas_client
2648            .expect_unsubscribe()
2649            .return_once(|_| Ok(()));
2650
2651        let mut state_sync = ProtocolStateSynchronizer::new(
2652            ExtractorIdentity::new(Chain::Ethereum, "uniswap-v2"),
2653            true,
2654            ComponentFilter::with_tvl_range(0.0, 1000.0),
2655            1,
2656            Duration::from_secs(0),
2657            true,
2658            true,
2659            ArcRPCClient(Arc::new(rpc_client)),
2660            ArcDeltasClient(Arc::new(deltas_client)),
2661            10000_u64,
2662        );
2663
2664        // Initialize and set last_synced_block to simulate we've already processed block 5
2665        state_sync
2666            .initialize()
2667            .await
2668            .expect("Init should succeed");
2669
2670        state_sync.last_synced_block = Some(BlockHeader {
2671            number: 5,
2672            hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000005"),
2673            parent_hash: Bytes::from(
2674                "0x0000000000000000000000000000000000000000000000000000000000000004",
2675            ),
2676            revert: false,
2677            timestamp: 1234567892,
2678        });
2679
2680        let (mut block_tx, mut block_rx) = channel(10);
2681        let (end_tx, end_rx) = oneshot::channel::<()>();
2682
2683        // Start state_sync
2684        let state_sync_handle = tokio::spawn(async move {
2685            state_sync
2686                .state_sync(&mut block_tx, end_rx)
2687                .await
2688        });
2689
2690        // Wait for the message - it should only be for block 6 (skipping blocks 3, 4, 5)
2691        let result_msg = timeout(Duration::from_millis(500), block_rx.recv())
2692            .await
2693            .expect("Should receive message within timeout")
2694            .expect("Channel should be open")
2695            .expect("Should not be an error");
2696
2697        // Send close signal
2698        let _ = end_tx.send(());
2699
2700        // Wait for state_sync to finish
2701        let _ = state_sync_handle
2702            .await
2703            .expect("Task should not panic");
2704
2705        // Verify we only got the message for block 6 (the expected next block)
2706        assert!(result_msg.deltas.is_some(), "Should contain deltas");
2707        if let Some(deltas) = &result_msg.deltas {
2708            assert_eq!(
2709                deltas.block.number, 6,
2710                "Should only process block 6, skipping earlier blocks"
2711            );
2712            assert_eq!(
2713                deltas.block.hash,
2714                Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000006")
2715            );
2716        }
2717
2718        // Verify that no additional messages are received immediately
2719        // (since the old blocks 3, 4, 5 were skipped and only block 6 was processed)
2720        match timeout(Duration::from_millis(50), block_rx.recv()).await {
2721            Err(_) => {
2722                // Timeout is expected - no more messages should come
2723            }
2724            Ok(Some(Err(_))) => {
2725                // Error received is also acceptable (connection closed)
2726            }
2727            Ok(Some(Ok(_))) => {
2728                panic!("Should not receive additional messages - old blocks should be skipped");
2729            }
2730            Ok(None) => {
2731                // Channel closed is also acceptable
2732            }
2733        }
2734    }
2735}