tycho_client/feed/
synchronizer.rs

1use std::{collections::HashMap, time::Duration};
2
3use async_trait::async_trait;
4use serde::{Deserialize, Serialize};
5use thiserror::Error;
6use tokio::{
7    select,
8    sync::{
9        mpsc::{channel, error::SendError, Receiver, Sender},
10        oneshot,
11    },
12    task::JoinHandle,
13    time::{sleep, timeout},
14};
15use tracing::{debug, error, info, instrument, trace, warn};
16use tycho_common::{
17    dto::{
18        BlockChanges, EntryPointWithTracingParams, ExtractorIdentity, ProtocolComponent,
19        ResponseAccount, ResponseProtocolState, TracingResult,
20    },
21    Bytes,
22};
23
24use crate::{
25    deltas::{DeltasClient, SubscriptionOptions},
26    feed::{
27        component_tracker::{ComponentFilter, ComponentTracker},
28        BlockHeader, HeaderLike,
29    },
30    rpc::{RPCClient, RPCError, SnapshotParameters, RPC_CLIENT_CONCURRENCY},
31    DeltasError,
32};
33
34#[derive(Error, Debug)]
35pub enum SynchronizerError {
36    /// RPC client failures.
37    #[error("RPC error: {0}")]
38    RPCError(#[from] RPCError),
39
40    /// Issues with the main channel
41    #[error("{0}")]
42    ChannelError(String),
43
44    /// Timeout elapsed errors.
45    #[error("Timeout error: {0}")]
46    Timeout(String),
47
48    /// Failed to close the synchronizer.
49    #[error("Failed to close synchronizer: {0}")]
50    CloseError(String),
51
52    /// Server connection failures or interruptions.
53    #[error("Connection error: {0}")]
54    ConnectionError(String),
55
56    /// Connection closed
57    #[error("Connection closed")]
58    ConnectionClosed,
59}
60
61pub type SyncResult<T> = Result<T, SynchronizerError>;
62
63impl<T> From<SendError<T>> for SynchronizerError {
64    fn from(err: SendError<T>) -> Self {
65        SynchronizerError::ChannelError(format!("Failed to send message: {err}"))
66    }
67}
68
69impl From<DeltasError> for SynchronizerError {
70    fn from(err: DeltasError) -> Self {
71        match err {
72            DeltasError::NotConnected => SynchronizerError::ConnectionClosed,
73            _ => SynchronizerError::ConnectionError(err.to_string()),
74        }
75    }
76}
77
78pub struct ProtocolStateSynchronizer<R: RPCClient, D: DeltasClient> {
79    extractor_id: ExtractorIdentity,
80    retrieve_balances: bool,
81    rpc_client: R,
82    deltas_client: D,
83    max_retries: u64,
84    retry_cooldown: Duration,
85    include_snapshots: bool,
86    component_tracker: ComponentTracker<R>,
87    last_synced_block: Option<BlockHeader>,
88    timeout: u64,
89    include_tvl: bool,
90    compression: bool,
91}
92
93#[derive(Clone, PartialEq, Debug, Serialize, Deserialize)]
94pub struct ComponentWithState {
95    pub state: ResponseProtocolState,
96    pub component: ProtocolComponent,
97    pub component_tvl: Option<f64>,
98    pub entrypoints: Vec<(EntryPointWithTracingParams, TracingResult)>,
99}
100
101#[derive(Clone, PartialEq, Debug, Default, Serialize, Deserialize)]
102pub struct Snapshot {
103    pub states: HashMap<String, ComponentWithState>,
104    pub vm_storage: HashMap<Bytes, ResponseAccount>,
105}
106
107impl Snapshot {
108    fn extend(&mut self, other: Snapshot) {
109        self.states.extend(other.states);
110        self.vm_storage.extend(other.vm_storage);
111    }
112
113    pub fn get_states(&self) -> &HashMap<String, ComponentWithState> {
114        &self.states
115    }
116
117    pub fn get_vm_storage(&self) -> &HashMap<Bytes, ResponseAccount> {
118        &self.vm_storage
119    }
120}
121
122#[derive(Clone, PartialEq, Debug, Default, Serialize, Deserialize)]
123pub struct StateSyncMessage<H>
124where
125    H: HeaderLike,
126{
127    /// The block information for this update.
128    pub header: H,
129    /// Snapshot for new components.
130    pub snapshots: Snapshot,
131    /// A single delta contains state updates for all tracked components, as well as additional
132    /// information about the system components e.g. newly added components (even below tvl), tvl
133    /// updates, balance updates.
134    pub deltas: Option<BlockChanges>,
135    /// Components that stopped being tracked.
136    pub removed_components: HashMap<String, ProtocolComponent>,
137}
138
139impl<H> StateSyncMessage<H>
140where
141    H: HeaderLike,
142{
143    pub fn merge(mut self, other: Self) -> Self {
144        // be careful with removed and snapshots attributes here, these can be ambiguous.
145        self.removed_components
146            .retain(|k, _| !other.snapshots.states.contains_key(k));
147        self.snapshots
148            .states
149            .retain(|k, _| !other.removed_components.contains_key(k));
150
151        self.snapshots.extend(other.snapshots);
152        let deltas = match (self.deltas, other.deltas) {
153            (Some(l), Some(r)) => Some(l.merge(r)),
154            (None, Some(r)) => Some(r),
155            (Some(l), None) => Some(l),
156            (None, None) => None,
157        };
158        self.removed_components
159            .extend(other.removed_components);
160        Self {
161            header: other.header,
162            snapshots: self.snapshots,
163            deltas,
164            removed_components: self.removed_components,
165        }
166    }
167}
168
169/// Handle for controlling a running synchronizer task.
170///
171/// This handle provides methods to gracefully shut down the synchronizer
172/// and await its completion with a timeout.
173pub struct SynchronizerTaskHandle {
174    join_handle: JoinHandle<()>,
175    close_tx: oneshot::Sender<()>,
176}
177
178/// StateSynchronizer
179///
180/// Used to synchronize the state of a single protocol. The synchronizer is responsible for
181/// delivering messages to the client that let him reconstruct subsets of the protocol state.
182///
183/// This involves deciding which components to track according to the clients preferences,
184/// retrieving & emitting snapshots of components which the client has not seen yet and subsequently
185/// delivering delta messages for the components that have changed.
186impl SynchronizerTaskHandle {
187    pub fn new(join_handle: JoinHandle<()>, close_tx: oneshot::Sender<()>) -> Self {
188        Self { join_handle, close_tx }
189    }
190
191    /// Splits the handle into its join handle and close sender.
192    ///
193    /// This allows monitoring the task completion separately from controlling shutdown.
194    /// The join handle can be used with FuturesUnordered for monitoring, while the
195    /// close sender can be used to signal graceful shutdown.
196    pub fn split(self) -> (JoinHandle<()>, oneshot::Sender<()>) {
197        (self.join_handle, self.close_tx)
198    }
199}
200
201#[async_trait]
202pub trait StateSynchronizer: Send + Sync + 'static {
203    async fn initialize(&mut self) -> SyncResult<()>;
204    /// Starts the state synchronization, consuming the synchronizer.
205    /// Returns a handle for controlling the running task and a receiver for messages.
206    async fn start(
207        mut self,
208    ) -> (SynchronizerTaskHandle, Receiver<SyncResult<StateSyncMessage<BlockHeader>>>);
209}
210
211impl<R, D> ProtocolStateSynchronizer<R, D>
212where
213    // TODO: Consider moving these constraints directly to the
214    // client...
215    R: RPCClient + Clone + Send + Sync + 'static,
216    D: DeltasClient + Clone + Send + Sync + 'static,
217{
218    /// Creates a new state synchronizer.
219    #[allow(clippy::too_many_arguments)]
220    pub fn new(
221        extractor_id: ExtractorIdentity,
222        retrieve_balances: bool,
223        component_filter: ComponentFilter,
224        max_retries: u64,
225        retry_cooldown: Duration,
226        include_snapshots: bool,
227        include_tvl: bool,
228        compression: bool,
229        rpc_client: R,
230        deltas_client: D,
231        timeout: u64,
232    ) -> Self {
233        Self {
234            extractor_id: extractor_id.clone(),
235            retrieve_balances,
236            rpc_client: rpc_client.clone(),
237            include_snapshots,
238            deltas_client,
239            component_tracker: ComponentTracker::new(
240                extractor_id.chain,
241                extractor_id.name.as_str(),
242                component_filter,
243                rpc_client,
244            ),
245            max_retries,
246            retry_cooldown,
247            last_synced_block: None,
248            timeout,
249            include_tvl,
250            compression,
251        }
252    }
253
254    /// Retrieves state snapshots of the requested components
255    async fn get_snapshots<'a, I: IntoIterator<Item = &'a String>>(
256        &mut self,
257        header: BlockHeader,
258        ids: Option<I>,
259    ) -> SyncResult<StateSyncMessage<BlockHeader>> {
260        if !self.include_snapshots {
261            return Ok(StateSyncMessage { header, ..Default::default() });
262        }
263
264        // Use given ids or use all if not passed
265        let component_ids: Vec<_> = match ids {
266            Some(ids) => ids.into_iter().cloned().collect(),
267            None => self
268                .component_tracker
269                .get_tracked_component_ids(),
270        };
271
272        if component_ids.is_empty() {
273            return Ok(StateSyncMessage { header, ..Default::default() });
274        }
275
276        // TODO: Find a smarter way to dynamically detect DCI protocols.
277        const DCI_PROTOCOLS: &[&str] = &[
278            "uniswap_v4_hooks",
279            "vm:curve",
280            "vm:balancer_v2",
281            "vm:balancer_v3",
282            "fluid_v1",
283            "erc4626",
284        ];
285        let entrypoints_result = if DCI_PROTOCOLS.contains(&self.extractor_id.name.as_str()) {
286            let result = self
287                .rpc_client
288                .get_traced_entry_points_paginated(
289                    self.extractor_id.chain,
290                    &self.extractor_id.name,
291                    &component_ids,
292                    None,
293                    RPC_CLIENT_CONCURRENCY,
294                )
295                .await?;
296            self.component_tracker
297                .process_entrypoints(&result.clone().into());
298            result.traced_entry_points.clone()
299        } else {
300            HashMap::new()
301        };
302
303        // Get contract IDs from component tracker
304        let contract_ids: Vec<Bytes> = self
305            .component_tracker
306            .get_contracts_by_component(&component_ids)
307            .into_iter()
308            .collect();
309
310        let request = SnapshotParameters::new(
311            self.extractor_id.chain,
312            &self.extractor_id.name,
313            &self.component_tracker.components,
314            &contract_ids,
315            header.number,
316        )
317        .entrypoints(&entrypoints_result)
318        .include_balances(self.retrieve_balances)
319        .include_tvl(self.include_tvl);
320        let snapshot_response = self
321            .rpc_client
322            .get_snapshots(&request, None, RPC_CLIENT_CONCURRENCY)
323            .await?;
324
325        trace!(states=?&snapshot_response.states, "Retrieved ProtocolStates");
326        trace!(contract_states=?&snapshot_response.vm_storage, "Retrieved ContractState");
327
328        Ok(StateSyncMessage {
329            header,
330            snapshots: snapshot_response,
331            deltas: None,
332            removed_components: HashMap::new(),
333        })
334    }
335
336    /// Main method that does all the work.
337    ///
338    /// ## Return Value
339    ///
340    /// Returns a `Result` where:
341    /// - `Ok(())` - Synchronization completed successfully (usually due to close signal)
342    /// - `Err((error, None))` - Error occurred AND close signal was received (don't retry)
343    /// - `Err((error, Some(end_rx)))` - Error occurred but close signal was NOT received (can
344    ///   retry)
345    ///
346    /// The returned `end_rx` (if any) should be reused for retry attempts since the close
347    /// signal may still arrive and we want to remain cancellable across retries.
348    #[instrument(skip(self, block_tx, end_rx), fields(extractor_id = %self.extractor_id))]
349    async fn state_sync(
350        &mut self,
351        block_tx: &mut Sender<SyncResult<StateSyncMessage<BlockHeader>>>,
352        mut end_rx: oneshot::Receiver<()>,
353    ) -> Result<(), (SynchronizerError, Option<oneshot::Receiver<()>>)> {
354        // initialisation
355        let subscription_options = SubscriptionOptions::new()
356            .with_state(self.include_snapshots)
357            .with_compression(self.compression);
358        let (subscription_id, mut msg_rx) = match self
359            .deltas_client
360            .subscribe(self.extractor_id.clone(), subscription_options)
361            .await
362        {
363            Ok(result) => result,
364            Err(e) => return Err((e.into(), Some(end_rx))),
365        };
366
367        let result = async {
368            info!("Waiting for deltas...");
369            let mut warned = false;
370            let mut first_msg = loop {
371                let msg = select! {
372                    deltas_result = timeout(Duration::from_secs(self.timeout), msg_rx.recv()) => {
373                        deltas_result
374                            .map_err(|_| {
375                                SynchronizerError::Timeout(format!(
376                                    "First deltas took longer than {t}s to arrive",
377                                    t = self.timeout
378                                ))
379                            })?
380                            .ok_or_else(|| {
381                                SynchronizerError::ConnectionError(
382                                    "Deltas channel closed before first message".to_string(),
383                                )
384                            })?
385                    },
386                    _ = &mut end_rx => {
387                        info!("Received close signal while waiting for first deltas");
388                        return Ok(());
389                    }
390                };
391
392                let incoming = BlockHeader::from_block(msg.get_block(), msg.is_revert());
393                if let Some(current) = &self.last_synced_block {
394                    if current.number >= incoming.number && !self.is_next_expected(&incoming) {
395                        if !warned {
396                            info!(extractor=%self.extractor_id, from=incoming.number, to=current.number, "Syncing. Skipping messages");
397                            warned = true;
398                        }
399                        continue
400                    }
401                }
402                break msg;
403            };
404
405            self.filter_deltas(&mut first_msg);
406
407            // initial snapshot
408            let block = first_msg.get_block().clone();
409            info!(height = &block.number, "First deltas received");
410            let header = BlockHeader::from_block(first_msg.get_block(), first_msg.is_revert());
411            let deltas_msg = StateSyncMessage {
412                header: BlockHeader::from_block(first_msg.get_block(), first_msg.is_revert()),
413                snapshots: Default::default(),
414                deltas: Some(first_msg),
415                removed_components: Default::default(),
416            };
417
418            // If possible skip retrieving snapshots
419            let msg = if !self.is_next_expected(&header) {
420                info!("Retrieving snapshot");
421                let snapshot = self
422                    .get_snapshots::<Vec<&String>>(
423                        BlockHeader::from_block(&block, false),
424                        None,
425                    )
426                    .await?
427                    .merge(deltas_msg);
428                let n_components = self.component_tracker.components.len();
429                let n_snapshots = snapshot.snapshots.states.len();
430                info!(n_components, n_snapshots, "Initial snapshot retrieved, starting delta message feed");
431                snapshot
432            } else {
433                deltas_msg
434            };
435            block_tx.send(Ok(msg)).await?;
436            self.last_synced_block = Some(header.clone());
437            loop {
438                select! {
439                    deltas_opt = msg_rx.recv() => {
440                        if let Some(mut deltas) = deltas_opt {
441                            let header = BlockHeader::from_block(deltas.get_block(), deltas.is_revert());
442                            debug!(block_number=?header.number, "Received delta message");
443
444                            let (snapshots, removed_components) = {
445                                // 1. Remove components based on latest changes
446                                // 2. Add components based on latest changes, query those for snapshots
447                                let (to_add, to_remove) = self.component_tracker.filter_updated_components(&deltas);
448
449                                // Only components we don't track yet need a snapshot,
450                                let requiring_snapshot: Vec<_> = to_add
451                                    .iter()
452                                    .filter(|id| {
453                                        !self.component_tracker
454                                            .components
455                                            .contains_key(id.as_str())
456                                    })
457                                    .collect();
458                                debug!(components=?requiring_snapshot, "SnapshotRequest");
459                                self.component_tracker
460                                    .start_tracking(requiring_snapshot.as_slice())
461                                    .await?;
462
463                                let snapshots = self
464                                    .get_snapshots(header.clone(), Some(requiring_snapshot))
465                                    .await?
466                                    .snapshots;
467
468                                let removed_components = if !to_remove.is_empty() {
469                                    self.component_tracker.stop_tracking(&to_remove)
470                                } else {
471                                    Default::default()
472                                };
473
474                                (snapshots, removed_components)
475                            };
476
477                            // 3. Update entrypoints on the tracker (affects which contracts are tracked)
478                            self.component_tracker.process_entrypoints(&deltas.dci_update);
479
480                            // 4. Filter deltas by currently tracked components / contracts
481                            self.filter_deltas(&mut deltas);
482                            let n_changes = deltas.n_changes();
483
484                            // 5. Send the message
485                            let next = StateSyncMessage {
486                                header: header.clone(),
487                                snapshots,
488                                deltas: Some(deltas),
489                                removed_components,
490                            };
491                            block_tx.send(Ok(next)).await?;
492                            self.last_synced_block = Some(header.clone());
493
494                            debug!(block_number=?header.number, n_changes, "Finished processing delta message");
495                        } else {
496                            return Err(SynchronizerError::ConnectionError("Deltas channel closed".to_string()));
497                        }
498                    },
499                    _ = &mut end_rx => {
500                        info!("Received close signal during state_sync");
501                        return Ok(());
502                    }
503                }
504            }
505        }.await;
506
507        // This cleanup code now runs regardless of how the function exits (error or channel close)
508        warn!(last_synced_block = ?&self.last_synced_block, "Deltas processing ended.");
509        //Ignore error
510        let _ = self
511            .deltas_client
512            .unsubscribe(subscription_id)
513            .await
514            .map_err(|err| {
515                warn!(err=?err, "Unsubscribing from deltas on cleanup failed!");
516            });
517
518        // Handle the result: if it succeeded, we're done. If it errored, we need to determine
519        // whether the end_rx was consumed (close signal received) or not
520        match result {
521            Ok(()) => Ok(()), // Success, likely due to close signal
522            Err(e) => {
523                // The error came from the inner async block. Since the async block
524                // can receive close signals (which would return Ok), any error means
525                // the close signal was NOT received, so we can return the end_rx for retry
526                Err((e, Some(end_rx)))
527            }
528        }
529    }
530
531    fn is_next_expected(&self, incoming: &BlockHeader) -> bool {
532        if let Some(block) = self.last_synced_block.as_ref() {
533            return incoming.parent_hash == block.hash;
534        }
535        false
536    }
537    fn filter_deltas(&self, deltas: &mut BlockChanges) {
538        deltas.filter_by_component(|id| {
539            self.component_tracker
540                .components
541                .contains_key(id)
542        });
543        deltas.filter_by_contract(|id| {
544            self.component_tracker
545                .contracts
546                .contains(id)
547        });
548    }
549}
550
551#[async_trait]
552impl<R, D> StateSynchronizer for ProtocolStateSynchronizer<R, D>
553where
554    R: RPCClient + Clone + Send + Sync + 'static,
555    D: DeltasClient + Clone + Send + Sync + 'static,
556{
557    async fn initialize(&mut self) -> SyncResult<()> {
558        info!("Retrieving relevant protocol components");
559        self.component_tracker
560            .initialise_components()
561            .await?;
562        info!(
563            n_components = self.component_tracker.components.len(),
564            n_contracts = self.component_tracker.contracts.len(),
565            "Finished retrieving components",
566        );
567
568        Ok(())
569    }
570
571    async fn start(
572        mut self,
573    ) -> (SynchronizerTaskHandle, Receiver<SyncResult<StateSyncMessage<BlockHeader>>>) {
574        let (mut tx, rx) = channel(15);
575        let (end_tx, end_rx) = oneshot::channel::<()>();
576
577        let jh = tokio::spawn(async move {
578            let mut retry_count = 0;
579            let mut current_end_rx = end_rx;
580            let mut final_error = None;
581
582            while retry_count < self.max_retries {
583                info!(extractor_id=%&self.extractor_id, retry_count, "(Re)starting synchronization loop");
584
585                let res = self
586                    .state_sync(&mut tx, current_end_rx)
587                    .await;
588                match res {
589                    Ok(()) => {
590                        info!(
591                            extractor_id=%&self.extractor_id,
592                            retry_count,
593                            "State synchronization exited cleanly"
594                        );
595                        return;
596                    }
597                    Err((e, maybe_end_rx)) => {
598                        warn!(
599                            extractor_id=%&self.extractor_id,
600                            retry_count,
601                            error=%e,
602                            "State synchronization errored!"
603                        );
604
605                        // If we have the end_rx back, we can retry
606                        if let Some(recovered_end_rx) = maybe_end_rx {
607                            current_end_rx = recovered_end_rx;
608
609                            if let SynchronizerError::ConnectionClosed = e {
610                                // break synchronization loop if websocket client is dead
611                                error!(
612                                    "Websocket connection closed. State synchronization exiting."
613                                );
614                                let _ = tx.send(Err(e)).await;
615                                return;
616                            } else {
617                                // Store error in case this is our last retry
618                                final_error = Some(e);
619                            }
620                        } else {
621                            // Close signal was received, exit cleanly
622                            info!(extractor_id=%&self.extractor_id, "Received close signal, exiting.");
623                            return;
624                        }
625                    }
626                }
627                sleep(self.retry_cooldown).await;
628                retry_count += 1;
629            }
630            if let Some(e) = final_error {
631                warn!(extractor_id=%&self.extractor_id, retry_count, error=%e, "Max retries exceeded");
632                let _ = tx.send(Err(e)).await;
633            }
634        });
635
636        let handle = SynchronizerTaskHandle::new(jh, end_tx);
637        (handle, rx)
638    }
639}
640
641#[cfg(test)]
642mod test {
643    //! Test suite for ProtocolStateSynchronizer shutdown and cleanup behavior.
644    //!
645    //! ## Test Coverage Strategy:
646    //!
647    //! ### Shutdown & Close Signal Tests:
648    //! - `test_public_close_api_functionality` - Tests public API (start/close lifecycle)
649    //! - `test_close_signal_while_waiting_for_first_deltas` - Close during initial wait
650    //! - `test_close_signal_during_main_processing_loop` - Close during main processing
651    //!
652    //! ### Cleanup & Error Handling Tests:
653    //! - `test_cleanup_runs_when_state_sync_processing_errors` - Cleanup on processing errors
654    //!
655    //! ### Coverage Summary:
656    //! These tests ensure cleanup code (shared state reset + unsubscribe) runs on ALL exit paths:
657    //! ✓ Close signal before first deltas   ✓ Close signal during processing
658    //! ✓ Processing errors                  ✓ Channel closure
659    //! ✓ Public API close operations        ✓ Normal completion
660
661    use std::{collections::HashSet, sync::Arc};
662
663    use test_log::test;
664    use tycho_common::dto::{
665        AddressStorageLocation, Block, Chain, ComponentTvlRequestBody, ComponentTvlRequestResponse,
666        DCIUpdate, EntryPoint, PaginationResponse, ProtocolComponentRequestResponse,
667        ProtocolComponentsRequestBody, ProtocolStateRequestBody, ProtocolStateRequestResponse,
668        ProtocolSystemsRequestBody, ProtocolSystemsRequestResponse, RPCTracerParams,
669        StateRequestBody, StateRequestResponse, TokensRequestBody, TokensRequestResponse,
670        TracedEntryPointRequestBody, TracedEntryPointRequestResponse, TracingParams,
671    };
672    use uuid::Uuid;
673
674    use super::*;
675    use crate::{deltas::MockDeltasClient, rpc::MockRPCClient, DeltasError, RPCError};
676
677    // Required for mock client to implement clone
678    struct ArcRPCClient<T>(Arc<T>);
679
680    // Default derive(Clone) does require T to be Clone as well.
681    impl<T> Clone for ArcRPCClient<T> {
682        fn clone(&self) -> Self {
683            ArcRPCClient(self.0.clone())
684        }
685    }
686
687    #[async_trait]
688    impl<T> RPCClient for ArcRPCClient<T>
689    where
690        T: RPCClient + Sync + Send + 'static,
691    {
692        async fn get_tokens(
693            &self,
694            request: &TokensRequestBody,
695        ) -> Result<TokensRequestResponse, RPCError> {
696            self.0.get_tokens(request).await
697        }
698
699        async fn get_contract_state(
700            &self,
701            request: &StateRequestBody,
702        ) -> Result<StateRequestResponse, RPCError> {
703            self.0.get_contract_state(request).await
704        }
705
706        async fn get_protocol_components(
707            &self,
708            request: &ProtocolComponentsRequestBody,
709        ) -> Result<ProtocolComponentRequestResponse, RPCError> {
710            self.0
711                .get_protocol_components(request)
712                .await
713        }
714
715        async fn get_protocol_states(
716            &self,
717            request: &ProtocolStateRequestBody,
718        ) -> Result<ProtocolStateRequestResponse, RPCError> {
719            self.0
720                .get_protocol_states(request)
721                .await
722        }
723
724        async fn get_protocol_systems(
725            &self,
726            request: &ProtocolSystemsRequestBody,
727        ) -> Result<ProtocolSystemsRequestResponse, RPCError> {
728            self.0
729                .get_protocol_systems(request)
730                .await
731        }
732
733        async fn get_component_tvl(
734            &self,
735            request: &ComponentTvlRequestBody,
736        ) -> Result<ComponentTvlRequestResponse, RPCError> {
737            self.0.get_component_tvl(request).await
738        }
739
740        async fn get_traced_entry_points(
741            &self,
742            request: &TracedEntryPointRequestBody,
743        ) -> Result<TracedEntryPointRequestResponse, RPCError> {
744            self.0
745                .get_traced_entry_points(request)
746                .await
747        }
748
749        async fn get_snapshots<'a>(
750            &self,
751            request: &SnapshotParameters<'a>,
752            chunk_size: Option<usize>,
753            concurrency: usize,
754        ) -> Result<Snapshot, RPCError> {
755            self.0
756                .get_snapshots(request, chunk_size, concurrency)
757                .await
758        }
759
760        fn compression(&self) -> bool {
761            self.0.compression()
762        }
763    }
764
765    // Required for mock client to implement clone
766    struct ArcDeltasClient<T>(Arc<T>);
767
768    // Default derive(Clone) does require T to be Clone as well.
769    impl<T> Clone for ArcDeltasClient<T> {
770        fn clone(&self) -> Self {
771            ArcDeltasClient(self.0.clone())
772        }
773    }
774
775    #[async_trait]
776    impl<T> DeltasClient for ArcDeltasClient<T>
777    where
778        T: DeltasClient + Sync + Send + 'static,
779    {
780        async fn subscribe(
781            &self,
782            extractor_id: ExtractorIdentity,
783            options: SubscriptionOptions,
784        ) -> Result<(Uuid, Receiver<BlockChanges>), DeltasError> {
785            self.0
786                .subscribe(extractor_id, options)
787                .await
788        }
789
790        async fn unsubscribe(&self, subscription_id: Uuid) -> Result<(), DeltasError> {
791            self.0
792                .unsubscribe(subscription_id)
793                .await
794        }
795
796        async fn connect(&self) -> Result<JoinHandle<Result<(), DeltasError>>, DeltasError> {
797            self.0.connect().await
798        }
799
800        async fn close(&self) -> Result<(), DeltasError> {
801            self.0.close().await
802        }
803    }
804
805    fn with_mocked_clients(
806        native: bool,
807        include_tvl: bool,
808        rpc_client: Option<MockRPCClient>,
809        deltas_client: Option<MockDeltasClient>,
810    ) -> ProtocolStateSynchronizer<ArcRPCClient<MockRPCClient>, ArcDeltasClient<MockDeltasClient>>
811    {
812        let rpc_client = ArcRPCClient(Arc::new(rpc_client.unwrap_or_default()));
813        let deltas_client = ArcDeltasClient(Arc::new(deltas_client.unwrap_or_default()));
814
815        ProtocolStateSynchronizer::new(
816            ExtractorIdentity::new(Chain::Ethereum, "uniswap-v2"),
817            native,
818            ComponentFilter::with_tvl_range(50.0, 50.0),
819            1,
820            Duration::from_secs(0),
821            true,
822            include_tvl,
823            true, // Does not matter as we mock the client that never compresses
824            rpc_client,
825            deltas_client,
826            10_u64,
827        )
828    }
829
830    fn state_snapshot_native() -> ProtocolStateRequestResponse {
831        ProtocolStateRequestResponse {
832            states: vec![ResponseProtocolState {
833                component_id: "Component1".to_string(),
834                ..Default::default()
835            }],
836            pagination: PaginationResponse { page: 0, page_size: 20, total: 1 },
837        }
838    }
839
840    fn make_mock_client() -> MockRPCClient {
841        let mut m = MockRPCClient::new();
842        m.expect_compression()
843            .return_const(false);
844        m
845    }
846
847    #[test(tokio::test)]
848    async fn test_get_snapshots_native() {
849        let header = BlockHeader::default();
850        let mut rpc = make_mock_client();
851        let component = ProtocolComponent { id: "Component1".to_string(), ..Default::default() };
852
853        let component_clone = component.clone();
854        rpc.expect_get_snapshots()
855            .returning(move |_request, _chunk_size, _concurrency| {
856                Ok(Snapshot {
857                    states: state_snapshot_native()
858                        .states
859                        .into_iter()
860                        .map(|state| {
861                            (
862                                state.component_id.clone(),
863                                ComponentWithState {
864                                    state,
865                                    component: component_clone.clone(),
866                                    entrypoints: vec![],
867                                    component_tvl: None,
868                                },
869                            )
870                        })
871                        .collect(),
872                    vm_storage: HashMap::new(),
873                })
874            });
875
876        rpc.expect_get_traced_entry_points()
877            .returning(|_| {
878                Ok(TracedEntryPointRequestResponse {
879                    traced_entry_points: HashMap::new(),
880                    pagination: PaginationResponse::new(0, 20, 0),
881                })
882            });
883
884        let mut state_sync = with_mocked_clients(true, false, Some(rpc), None);
885        state_sync
886            .component_tracker
887            .components
888            .insert("Component1".to_string(), component.clone());
889        let components_arg = ["Component1".to_string()];
890        let exp = StateSyncMessage {
891            header: header.clone(),
892            snapshots: Snapshot {
893                states: state_snapshot_native()
894                    .states
895                    .into_iter()
896                    .map(|state| {
897                        (
898                            state.component_id.clone(),
899                            ComponentWithState {
900                                state,
901                                component: component.clone(),
902                                entrypoints: vec![],
903                                component_tvl: None,
904                            },
905                        )
906                    })
907                    .collect(),
908                vm_storage: HashMap::new(),
909            },
910            deltas: None,
911            removed_components: Default::default(),
912        };
913
914        let snap = state_sync
915            .get_snapshots(header, Some(&components_arg))
916            .await
917            .expect("Retrieving snapshot failed");
918
919        assert_eq!(snap, exp);
920    }
921
922    #[test(tokio::test)]
923    async fn test_get_snapshots_native_with_tvl() {
924        let header = BlockHeader::default();
925        let mut rpc = make_mock_client();
926        let component = ProtocolComponent { id: "Component1".to_string(), ..Default::default() };
927
928        let component_clone = component.clone();
929        rpc.expect_get_snapshots()
930            .returning(move |_request, _chunk_size, _concurrency| {
931                Ok(Snapshot {
932                    states: state_snapshot_native()
933                        .states
934                        .into_iter()
935                        .map(|state| {
936                            (
937                                state.component_id.clone(),
938                                ComponentWithState {
939                                    state,
940                                    component: component_clone.clone(),
941                                    component_tvl: Some(100.0),
942                                    entrypoints: vec![],
943                                },
944                            )
945                        })
946                        .collect(),
947                    vm_storage: HashMap::new(),
948                })
949            });
950
951        rpc.expect_get_traced_entry_points()
952            .returning(|_| {
953                Ok(TracedEntryPointRequestResponse {
954                    traced_entry_points: HashMap::new(),
955                    pagination: PaginationResponse::new(0, 20, 0),
956                })
957            });
958
959        let mut state_sync = with_mocked_clients(true, true, Some(rpc), None);
960        state_sync
961            .component_tracker
962            .components
963            .insert("Component1".to_string(), component.clone());
964        let components_arg = ["Component1".to_string()];
965        let exp = StateSyncMessage {
966            header: header.clone(),
967            snapshots: Snapshot {
968                states: state_snapshot_native()
969                    .states
970                    .into_iter()
971                    .map(|state| {
972                        (
973                            state.component_id.clone(),
974                            ComponentWithState {
975                                state,
976                                component: component.clone(),
977                                component_tvl: Some(100.0),
978                                entrypoints: vec![],
979                            },
980                        )
981                    })
982                    .collect(),
983                vm_storage: HashMap::new(),
984            },
985            deltas: None,
986            removed_components: Default::default(),
987        };
988
989        let snap = state_sync
990            .get_snapshots(header, Some(&components_arg))
991            .await
992            .expect("Retrieving snapshot failed");
993
994        assert_eq!(snap, exp);
995    }
996
997    fn state_snapshot_vm() -> StateRequestResponse {
998        StateRequestResponse {
999            accounts: vec![
1000                ResponseAccount { address: Bytes::from("0x0badc0ffee"), ..Default::default() },
1001                ResponseAccount { address: Bytes::from("0xbabe42"), ..Default::default() },
1002            ],
1003            pagination: PaginationResponse { page: 0, page_size: 20, total: 1 },
1004        }
1005    }
1006
1007    fn traced_entry_point_response() -> TracedEntryPointRequestResponse {
1008        TracedEntryPointRequestResponse {
1009            traced_entry_points: HashMap::from([(
1010                "Component1".to_string(),
1011                vec![(
1012                    EntryPointWithTracingParams {
1013                        entry_point: EntryPoint {
1014                            external_id: "entrypoint_a".to_string(),
1015                            target: Bytes::from("0x0badc0ffee"),
1016                            signature: "sig()".to_string(),
1017                        },
1018                        params: TracingParams::RPCTracer(RPCTracerParams {
1019                            caller: Some(Bytes::from("0x0badc0ffee")),
1020                            calldata: Bytes::from("0x0badc0ffee"),
1021                            state_overrides: None,
1022                            prune_addresses: None,
1023                        }),
1024                    },
1025                    TracingResult {
1026                        retriggers: HashSet::from([(
1027                            Bytes::from("0x0badc0ffee"),
1028                            AddressStorageLocation::new(Bytes::from("0x0badc0ffee"), 12),
1029                        )]),
1030                        accessed_slots: HashMap::from([(
1031                            Bytes::from("0x0badc0ffee"),
1032                            HashSet::from([Bytes::from("0xbadbeef0")]),
1033                        )]),
1034                    },
1035                )],
1036            )]),
1037            pagination: PaginationResponse::new(0, 20, 0),
1038        }
1039    }
1040
1041    #[test(tokio::test)]
1042    async fn test_get_snapshots_vm() {
1043        let header = BlockHeader::default();
1044        let mut rpc = make_mock_client();
1045
1046        let traced_ep_response = traced_entry_point_response();
1047        rpc.expect_get_snapshots()
1048            .returning(move |_request, _chunk_size, _concurrency| {
1049                let vm_storage_accounts = state_snapshot_vm();
1050                Ok(Snapshot {
1051                    states: [(
1052                        "Component1".to_string(),
1053                        ComponentWithState {
1054                            state: ResponseProtocolState {
1055                                component_id: "Component1".to_string(),
1056                                ..Default::default()
1057                            },
1058                            component: ProtocolComponent {
1059                                id: "Component1".to_string(),
1060                                contract_ids: vec![
1061                                    Bytes::from("0x0badc0ffee"),
1062                                    Bytes::from("0xbabe42"),
1063                                ],
1064                                ..Default::default()
1065                            },
1066                            component_tvl: None,
1067                            entrypoints: traced_ep_response
1068                                .traced_entry_points
1069                                .get("Component1")
1070                                .cloned()
1071                                .unwrap_or_default(),
1072                        },
1073                    )]
1074                    .into_iter()
1075                    .collect(),
1076                    vm_storage: vm_storage_accounts
1077                        .accounts
1078                        .into_iter()
1079                        .map(|state| (state.address.clone(), state))
1080                        .collect(),
1081                })
1082            });
1083
1084        rpc.expect_get_traced_entry_points()
1085            .returning(|_| Ok(traced_entry_point_response()));
1086
1087        let mut state_sync = with_mocked_clients(false, false, Some(rpc), None);
1088        let component = ProtocolComponent {
1089            id: "Component1".to_string(),
1090            contract_ids: vec![Bytes::from("0x0badc0ffee"), Bytes::from("0xbabe42")],
1091            ..Default::default()
1092        };
1093        state_sync
1094            .component_tracker
1095            .components
1096            .insert("Component1".to_string(), component.clone());
1097        let components_arg = ["Component1".to_string()];
1098        let exp = StateSyncMessage {
1099            header: header.clone(),
1100            snapshots: Snapshot {
1101                states: [(
1102                    component.id.clone(),
1103                    ComponentWithState {
1104                        state: ResponseProtocolState {
1105                            component_id: "Component1".to_string(),
1106                            ..Default::default()
1107                        },
1108                        component: component.clone(),
1109                        component_tvl: None,
1110                        entrypoints: vec![(
1111                            EntryPointWithTracingParams {
1112                                entry_point: EntryPoint {
1113                                    external_id: "entrypoint_a".to_string(),
1114                                    target: Bytes::from("0x0badc0ffee"),
1115                                    signature: "sig()".to_string(),
1116                                },
1117                                params: TracingParams::RPCTracer(RPCTracerParams {
1118                                    caller: Some(Bytes::from("0x0badc0ffee")),
1119                                    calldata: Bytes::from("0x0badc0ffee"),
1120                                    state_overrides: None,
1121                                    prune_addresses: None,
1122                                }),
1123                            },
1124                            TracingResult {
1125                                retriggers: HashSet::from([(
1126                                    Bytes::from("0x0badc0ffee"),
1127                                    AddressStorageLocation::new(Bytes::from("0x0badc0ffee"), 12),
1128                                )]),
1129                                accessed_slots: HashMap::from([(
1130                                    Bytes::from("0x0badc0ffee"),
1131                                    HashSet::from([Bytes::from("0xbadbeef0")]),
1132                                )]),
1133                            },
1134                        )],
1135                    },
1136                )]
1137                .into_iter()
1138                .collect(),
1139                vm_storage: state_snapshot_vm()
1140                    .accounts
1141                    .into_iter()
1142                    .map(|state| (state.address.clone(), state))
1143                    .collect(),
1144            },
1145            deltas: None,
1146            removed_components: Default::default(),
1147        };
1148
1149        let snap = state_sync
1150            .get_snapshots(header, Some(&components_arg))
1151            .await
1152            .expect("Retrieving snapshot failed");
1153
1154        assert_eq!(snap, exp);
1155    }
1156
1157    #[test(tokio::test)]
1158    async fn test_get_snapshots_vm_with_tvl() {
1159        let header = BlockHeader::default();
1160        let mut rpc = make_mock_client();
1161        let component = ProtocolComponent {
1162            id: "Component1".to_string(),
1163            contract_ids: vec![Bytes::from("0x0badc0ffee"), Bytes::from("0xbabe42")],
1164            ..Default::default()
1165        };
1166
1167        let component_clone = component.clone();
1168        rpc.expect_get_snapshots()
1169            .returning(move |_request, _chunk_size, _concurrency| {
1170                let vm_storage_accounts = state_snapshot_vm();
1171                Ok(Snapshot {
1172                    states: [(
1173                        "Component1".to_string(),
1174                        ComponentWithState {
1175                            state: ResponseProtocolState {
1176                                component_id: "Component1".to_string(),
1177                                ..Default::default()
1178                            },
1179                            component: component_clone.clone(),
1180                            component_tvl: Some(100.0),
1181                            entrypoints: vec![],
1182                        },
1183                    )]
1184                    .into_iter()
1185                    .collect(),
1186                    vm_storage: vm_storage_accounts
1187                        .accounts
1188                        .into_iter()
1189                        .map(|state| (state.address.clone(), state))
1190                        .collect(),
1191                })
1192            });
1193
1194        rpc.expect_get_traced_entry_points()
1195            .returning(|_| {
1196                Ok(TracedEntryPointRequestResponse {
1197                    traced_entry_points: HashMap::new(),
1198                    pagination: PaginationResponse::new(0, 20, 0),
1199                })
1200            });
1201
1202        let mut state_sync = with_mocked_clients(false, true, Some(rpc), None);
1203        state_sync
1204            .component_tracker
1205            .components
1206            .insert("Component1".to_string(), component.clone());
1207        let components_arg = ["Component1".to_string()];
1208        let exp = StateSyncMessage {
1209            header: header.clone(),
1210            snapshots: Snapshot {
1211                states: [(
1212                    component.id.clone(),
1213                    ComponentWithState {
1214                        state: ResponseProtocolState {
1215                            component_id: "Component1".to_string(),
1216                            ..Default::default()
1217                        },
1218                        component: component.clone(),
1219                        component_tvl: Some(100.0),
1220                        entrypoints: vec![],
1221                    },
1222                )]
1223                .into_iter()
1224                .collect(),
1225                vm_storage: state_snapshot_vm()
1226                    .accounts
1227                    .into_iter()
1228                    .map(|state| (state.address.clone(), state))
1229                    .collect(),
1230            },
1231            deltas: None,
1232            removed_components: Default::default(),
1233        };
1234
1235        let snap = state_sync
1236            .get_snapshots(header, Some(&components_arg))
1237            .await
1238            .expect("Retrieving snapshot failed");
1239
1240        assert_eq!(snap, exp);
1241    }
1242
1243    fn mock_clients_for_state_sync() -> (MockRPCClient, MockDeltasClient, Sender<BlockChanges>) {
1244        let mut rpc_client = make_mock_client();
1245        // Mocks for the start_tracking call, these need to come first because they are more
1246        // specific, see: https://docs.rs/mockall/latest/mockall/#matching-multiple-calls
1247        rpc_client
1248            .expect_get_protocol_components()
1249            .with(mockall::predicate::function(
1250                move |request_params: &ProtocolComponentsRequestBody| {
1251                    if let Some(ids) = request_params.component_ids.as_ref() {
1252                        ids.contains(&"Component3".to_string())
1253                    } else {
1254                        false
1255                    }
1256                },
1257            ))
1258            .returning(|_| {
1259                // return Component3
1260                Ok(ProtocolComponentRequestResponse {
1261                    protocol_components: vec![
1262                        // this component shall have a tvl update above threshold
1263                        ProtocolComponent { id: "Component3".to_string(), ..Default::default() },
1264                    ],
1265                    pagination: PaginationResponse { page: 0, page_size: 20, total: 1 },
1266                })
1267            });
1268        // Mock get_snapshots for Component3
1269        rpc_client
1270            .expect_get_snapshots()
1271            .withf(
1272                |request: &SnapshotParameters,
1273                 _chunk_size: &Option<usize>,
1274                 _concurrency: &usize| {
1275                    request
1276                        .components
1277                        .contains_key("Component3")
1278                },
1279            )
1280            .returning(|_request, _chunk_size, _concurrency| {
1281                Ok(Snapshot {
1282                    states: [(
1283                        "Component3".to_string(),
1284                        ComponentWithState {
1285                            state: ResponseProtocolState {
1286                                component_id: "Component3".to_string(),
1287                                ..Default::default()
1288                            },
1289                            component: ProtocolComponent {
1290                                id: "Component3".to_string(),
1291                                ..Default::default()
1292                            },
1293                            component_tvl: Some(1000.0),
1294                            entrypoints: vec![],
1295                        },
1296                    )]
1297                    .into_iter()
1298                    .collect(),
1299                    vm_storage: HashMap::new(),
1300                })
1301            });
1302
1303        // mock calls for the initial state snapshots
1304        rpc_client
1305            .expect_get_protocol_components()
1306            .returning(|_| {
1307                // Initial sync of components
1308                Ok(ProtocolComponentRequestResponse {
1309                    protocol_components: vec![
1310                        // this component shall have a tvl update above threshold
1311                        ProtocolComponent { id: "Component1".to_string(), ..Default::default() },
1312                        // this component shall have a tvl update below threshold.
1313                        ProtocolComponent { id: "Component2".to_string(), ..Default::default() },
1314                        // a third component will have a tvl update above threshold
1315                    ],
1316                    pagination: PaginationResponse { page: 0, page_size: 20, total: 1 },
1317                })
1318            });
1319
1320        rpc_client
1321            .expect_get_snapshots()
1322            .returning(|_request, _chunk_size, _concurrency| {
1323                Ok(Snapshot {
1324                    states: [
1325                        (
1326                            "Component1".to_string(),
1327                            ComponentWithState {
1328                                state: ResponseProtocolState {
1329                                    component_id: "Component1".to_string(),
1330                                    ..Default::default()
1331                                },
1332                                component: ProtocolComponent {
1333                                    id: "Component1".to_string(),
1334                                    ..Default::default()
1335                                },
1336                                component_tvl: Some(100.0),
1337                                entrypoints: vec![],
1338                            },
1339                        ),
1340                        (
1341                            "Component2".to_string(),
1342                            ComponentWithState {
1343                                state: ResponseProtocolState {
1344                                    component_id: "Component2".to_string(),
1345                                    ..Default::default()
1346                                },
1347                                component: ProtocolComponent {
1348                                    id: "Component2".to_string(),
1349                                    ..Default::default()
1350                                },
1351                                component_tvl: Some(0.0),
1352                                entrypoints: vec![],
1353                            },
1354                        ),
1355                    ]
1356                    .into_iter()
1357                    .collect(),
1358                    vm_storage: HashMap::new(),
1359                })
1360            });
1361
1362        // Mock get_traced_entry_points for Ethereum chain
1363        rpc_client
1364            .expect_get_traced_entry_points()
1365            .returning(|_| {
1366                Ok(TracedEntryPointRequestResponse {
1367                    traced_entry_points: HashMap::new(),
1368                    pagination: PaginationResponse { page: 0, page_size: 100, total: 0 },
1369                })
1370            });
1371
1372        // Mock deltas client and messages
1373        let mut deltas_client = MockDeltasClient::new();
1374        let (tx, rx) = channel(1);
1375        deltas_client
1376            .expect_subscribe()
1377            .return_once(move |_, _| {
1378                // Return subscriber id and a channel
1379                Ok((Uuid::default(), rx))
1380            });
1381
1382        // Expect unsubscribe call during cleanup
1383        deltas_client
1384            .expect_unsubscribe()
1385            .return_once(|_| Ok(()));
1386
1387        (rpc_client, deltas_client, tx)
1388    }
1389
1390    /// Test strategy
1391    ///
1392    /// - initial snapshot retrieval returns two component1 and component2 as snapshots
1393    /// - send 2 dummy messages, containing only blocks
1394    /// - third message contains a new component with some significant tvl, one initial component
1395    ///   slips below tvl threshold, another one is above tvl but does not get re-requested.
1396    #[test(tokio::test)]
1397    async fn test_state_sync() {
1398        let (rpc_client, deltas_client, tx) = mock_clients_for_state_sync();
1399        let deltas = [
1400            BlockChanges {
1401                extractor: "uniswap-v2".to_string(),
1402                chain: Chain::Ethereum,
1403                block: Block {
1404                    number: 1,
1405                    hash: Bytes::from("0x01"),
1406                    parent_hash: Bytes::from("0x00"),
1407                    chain: Chain::Ethereum,
1408                    ts: Default::default(),
1409                },
1410                revert: false,
1411                dci_update: DCIUpdate {
1412                    new_entrypoints: HashMap::from([(
1413                        "Component1".to_string(),
1414                        HashSet::from([EntryPoint {
1415                            external_id: "entrypoint_a".to_string(),
1416                            target: Bytes::from("0x0badc0ffee"),
1417                            signature: "sig()".to_string(),
1418                        }]),
1419                    )]),
1420                    new_entrypoint_params: HashMap::from([(
1421                        "entrypoint_a".to_string(),
1422                        HashSet::from([(
1423                            TracingParams::RPCTracer(RPCTracerParams {
1424                                caller: Some(Bytes::from("0x0badc0ffee")),
1425                                calldata: Bytes::from("0x0badc0ffee"),
1426                                state_overrides: None,
1427                                prune_addresses: None,
1428                            }),
1429                            "Component1".to_string(),
1430                        )]),
1431                    )]),
1432                    trace_results: HashMap::from([(
1433                        "entrypoint_a".to_string(),
1434                        TracingResult {
1435                            retriggers: HashSet::from([(
1436                                Bytes::from("0x0badc0ffee"),
1437                                AddressStorageLocation::new(Bytes::from("0x0badc0ffee"), 12),
1438                            )]),
1439                            accessed_slots: HashMap::from([(
1440                                Bytes::from("0x0badc0ffee"),
1441                                HashSet::from([Bytes::from("0xbadbeef0")]),
1442                            )]),
1443                        },
1444                    )]),
1445                },
1446                ..Default::default()
1447            },
1448            BlockChanges {
1449                extractor: "uniswap-v2".to_string(),
1450                chain: Chain::Ethereum,
1451                block: Block {
1452                    number: 2,
1453                    hash: Bytes::from("0x02"),
1454                    parent_hash: Bytes::from("0x01"),
1455                    chain: Chain::Ethereum,
1456                    ts: Default::default(),
1457                },
1458                revert: false,
1459                component_tvl: [
1460                    ("Component1".to_string(), 100.0),
1461                    ("Component2".to_string(), 0.0),
1462                    ("Component3".to_string(), 1000.0),
1463                ]
1464                .into_iter()
1465                .collect(),
1466                ..Default::default()
1467            },
1468        ];
1469        let mut state_sync = with_mocked_clients(true, true, Some(rpc_client), Some(deltas_client));
1470        state_sync
1471            .initialize()
1472            .await
1473            .expect("Init failed");
1474
1475        // Test starts here
1476        let (handle, mut rx) = state_sync.start().await;
1477        let (jh, close_tx) = handle.split();
1478        tx.send(deltas[0].clone())
1479            .await
1480            .expect("deltas channel msg 0 closed!");
1481        let first_msg = timeout(Duration::from_millis(100), rx.recv())
1482            .await
1483            .expect("waiting for first state msg timed out!")
1484            .expect("state sync block sender closed!");
1485        tx.send(deltas[1].clone())
1486            .await
1487            .expect("deltas channel msg 1 closed!");
1488        let second_msg = timeout(Duration::from_millis(100), rx.recv())
1489            .await
1490            .expect("waiting for second state msg timed out!")
1491            .expect("state sync block sender closed!");
1492        let _ = close_tx.send(());
1493        jh.await
1494            .expect("state sync task panicked!");
1495
1496        // assertions
1497        let exp1 = StateSyncMessage {
1498            header: BlockHeader {
1499                number: 1,
1500                hash: Bytes::from("0x01"),
1501                parent_hash: Bytes::from("0x00"),
1502                revert: false,
1503                ..Default::default()
1504            },
1505            snapshots: Snapshot {
1506                states: [
1507                    (
1508                        "Component1".to_string(),
1509                        ComponentWithState {
1510                            state: ResponseProtocolState {
1511                                component_id: "Component1".to_string(),
1512                                ..Default::default()
1513                            },
1514                            component: ProtocolComponent {
1515                                id: "Component1".to_string(),
1516                                ..Default::default()
1517                            },
1518                            component_tvl: Some(100.0),
1519                            entrypoints: vec![],
1520                        },
1521                    ),
1522                    (
1523                        "Component2".to_string(),
1524                        ComponentWithState {
1525                            state: ResponseProtocolState {
1526                                component_id: "Component2".to_string(),
1527                                ..Default::default()
1528                            },
1529                            component: ProtocolComponent {
1530                                id: "Component2".to_string(),
1531                                ..Default::default()
1532                            },
1533                            component_tvl: Some(0.0),
1534                            entrypoints: vec![],
1535                        },
1536                    ),
1537                ]
1538                .into_iter()
1539                .collect(),
1540                vm_storage: HashMap::new(),
1541            },
1542            deltas: Some(deltas[0].clone()),
1543            removed_components: Default::default(),
1544        };
1545
1546        let exp2 = StateSyncMessage {
1547            header: BlockHeader {
1548                number: 2,
1549                hash: Bytes::from("0x02"),
1550                parent_hash: Bytes::from("0x01"),
1551                revert: false,
1552                ..Default::default()
1553            },
1554            snapshots: Snapshot {
1555                states: [
1556                    // This is the new component we queried once it passed the tvl threshold.
1557                    (
1558                        "Component3".to_string(),
1559                        ComponentWithState {
1560                            state: ResponseProtocolState {
1561                                component_id: "Component3".to_string(),
1562                                ..Default::default()
1563                            },
1564                            component: ProtocolComponent {
1565                                id: "Component3".to_string(),
1566                                ..Default::default()
1567                            },
1568                            component_tvl: Some(1000.0),
1569                            entrypoints: vec![],
1570                        },
1571                    ),
1572                ]
1573                .into_iter()
1574                .collect(),
1575                vm_storage: HashMap::new(),
1576            },
1577            // Our deltas are empty and since merge methods are
1578            // tested in tycho-common we don't have much to do here.
1579            deltas: Some(BlockChanges {
1580                extractor: "uniswap-v2".to_string(),
1581                chain: Chain::Ethereum,
1582                block: Block {
1583                    number: 2,
1584                    hash: Bytes::from("0x02"),
1585                    parent_hash: Bytes::from("0x01"),
1586                    chain: Chain::Ethereum,
1587                    ts: Default::default(),
1588                },
1589                revert: false,
1590                component_tvl: [
1591                    // "Component2" should not show here.
1592                    ("Component1".to_string(), 100.0),
1593                    ("Component3".to_string(), 1000.0),
1594                ]
1595                .into_iter()
1596                .collect(),
1597                ..Default::default()
1598            }),
1599            // "Component2" was removed, because its tvl changed to 0.
1600            removed_components: [(
1601                "Component2".to_string(),
1602                ProtocolComponent { id: "Component2".to_string(), ..Default::default() },
1603            )]
1604            .into_iter()
1605            .collect(),
1606        };
1607        assert_eq!(first_msg.unwrap(), exp1);
1608        assert_eq!(second_msg.unwrap(), exp2);
1609    }
1610
1611    #[test(tokio::test)]
1612    async fn test_state_sync_with_tvl_range() {
1613        // Define the range for testing
1614        let remove_tvl_threshold = 5.0;
1615        let add_tvl_threshold = 7.0;
1616
1617        let mut rpc_client = make_mock_client();
1618        let mut deltas_client = MockDeltasClient::new();
1619
1620        rpc_client
1621            .expect_get_protocol_components()
1622            .with(mockall::predicate::function(
1623                move |request_params: &ProtocolComponentsRequestBody| {
1624                    if let Some(ids) = request_params.component_ids.as_ref() {
1625                        ids.contains(&"Component3".to_string())
1626                    } else {
1627                        false
1628                    }
1629                },
1630            ))
1631            .returning(|_| {
1632                Ok(ProtocolComponentRequestResponse {
1633                    protocol_components: vec![ProtocolComponent {
1634                        id: "Component3".to_string(),
1635                        ..Default::default()
1636                    }],
1637                    pagination: PaginationResponse { page: 0, page_size: 20, total: 1 },
1638                })
1639            });
1640        // Mock get_snapshots for Component3
1641        rpc_client
1642            .expect_get_snapshots()
1643            .withf(
1644                |request: &SnapshotParameters,
1645                 _chunk_size: &Option<usize>,
1646                 _concurrency: &usize| {
1647                    request
1648                        .components
1649                        .contains_key("Component3")
1650                },
1651            )
1652            .returning(|_request, _chunk_size, _concurrency| {
1653                Ok(Snapshot {
1654                    states: [(
1655                        "Component3".to_string(),
1656                        ComponentWithState {
1657                            state: ResponseProtocolState {
1658                                component_id: "Component3".to_string(),
1659                                ..Default::default()
1660                            },
1661                            component: ProtocolComponent {
1662                                id: "Component3".to_string(),
1663                                ..Default::default()
1664                            },
1665                            component_tvl: Some(10.0),
1666                            entrypoints: vec![],
1667                        },
1668                    )]
1669                    .into_iter()
1670                    .collect(),
1671                    vm_storage: HashMap::new(),
1672                })
1673            });
1674
1675        // Mock for the initial snapshot retrieval
1676        rpc_client
1677            .expect_get_protocol_components()
1678            .returning(|_| {
1679                Ok(ProtocolComponentRequestResponse {
1680                    protocol_components: vec![
1681                        ProtocolComponent { id: "Component1".to_string(), ..Default::default() },
1682                        ProtocolComponent { id: "Component2".to_string(), ..Default::default() },
1683                    ],
1684                    pagination: PaginationResponse { page: 0, page_size: 20, total: 1 },
1685                })
1686            });
1687
1688        // Mock get_snapshots for initial snapshot
1689        rpc_client
1690            .expect_get_snapshots()
1691            .returning(|_request, _chunk_size, _concurrency| {
1692                Ok(Snapshot {
1693                    states: [
1694                        (
1695                            "Component1".to_string(),
1696                            ComponentWithState {
1697                                state: ResponseProtocolState {
1698                                    component_id: "Component1".to_string(),
1699                                    ..Default::default()
1700                                },
1701                                component: ProtocolComponent {
1702                                    id: "Component1".to_string(),
1703                                    ..Default::default()
1704                                },
1705                                component_tvl: Some(6.0),
1706                                entrypoints: vec![],
1707                            },
1708                        ),
1709                        (
1710                            "Component2".to_string(),
1711                            ComponentWithState {
1712                                state: ResponseProtocolState {
1713                                    component_id: "Component2".to_string(),
1714                                    ..Default::default()
1715                                },
1716                                component: ProtocolComponent {
1717                                    id: "Component2".to_string(),
1718                                    ..Default::default()
1719                                },
1720                                component_tvl: Some(2.0),
1721                                entrypoints: vec![],
1722                            },
1723                        ),
1724                    ]
1725                    .into_iter()
1726                    .collect(),
1727                    vm_storage: HashMap::new(),
1728                })
1729            });
1730
1731        // Mock get_traced_entry_points for Ethereum chain
1732        rpc_client
1733            .expect_get_traced_entry_points()
1734            .returning(|_| {
1735                Ok(TracedEntryPointRequestResponse {
1736                    traced_entry_points: HashMap::new(),
1737                    pagination: PaginationResponse { page: 0, page_size: 100, total: 0 },
1738                })
1739            });
1740
1741        let (tx, rx) = channel(1);
1742        deltas_client
1743            .expect_subscribe()
1744            .return_once(move |_, _| Ok((Uuid::default(), rx)));
1745
1746        // Expect unsubscribe call during cleanup
1747        deltas_client
1748            .expect_unsubscribe()
1749            .return_once(|_| Ok(()));
1750
1751        let mut state_sync = ProtocolStateSynchronizer::new(
1752            ExtractorIdentity::new(Chain::Ethereum, "uniswap-v2"),
1753            true,
1754            ComponentFilter::with_tvl_range(remove_tvl_threshold, add_tvl_threshold),
1755            1,
1756            Duration::from_secs(0),
1757            true,
1758            true,
1759            true,
1760            ArcRPCClient(Arc::new(rpc_client)),
1761            ArcDeltasClient(Arc::new(deltas_client)),
1762            10_u64,
1763        );
1764        state_sync
1765            .initialize()
1766            .await
1767            .expect("Init failed");
1768
1769        // Simulate the incoming BlockChanges
1770        let deltas = [
1771            BlockChanges {
1772                extractor: "uniswap-v2".to_string(),
1773                chain: Chain::Ethereum,
1774                block: Block {
1775                    number: 1,
1776                    hash: Bytes::from("0x01"),
1777                    parent_hash: Bytes::from("0x00"),
1778                    chain: Chain::Ethereum,
1779                    ts: Default::default(),
1780                },
1781                revert: false,
1782                ..Default::default()
1783            },
1784            BlockChanges {
1785                extractor: "uniswap-v2".to_string(),
1786                chain: Chain::Ethereum,
1787                block: Block {
1788                    number: 2,
1789                    hash: Bytes::from("0x02"),
1790                    parent_hash: Bytes::from("0x01"),
1791                    chain: Chain::Ethereum,
1792                    ts: Default::default(),
1793                },
1794                revert: false,
1795                component_tvl: [
1796                    ("Component1".to_string(), 6.0), // Within range, should not trigger changes
1797                    ("Component2".to_string(), 2.0), // Below lower threshold, should be removed
1798                    ("Component3".to_string(), 10.0), // Above upper threshold, should be added
1799                ]
1800                .into_iter()
1801                .collect(),
1802                ..Default::default()
1803            },
1804        ];
1805
1806        let (handle, mut rx) = state_sync.start().await;
1807        let (jh, close_tx) = handle.split();
1808
1809        // Simulate sending delta messages
1810        tx.send(deltas[0].clone())
1811            .await
1812            .expect("deltas channel msg 0 closed!");
1813
1814        // Expecting to receive the initial state message
1815        let _ = timeout(Duration::from_millis(100), rx.recv())
1816            .await
1817            .expect("waiting for first state msg timed out!")
1818            .expect("state sync block sender closed!");
1819
1820        // Send the third message, which should trigger TVL-based changes
1821        tx.send(deltas[1].clone())
1822            .await
1823            .expect("deltas channel msg 1 closed!");
1824        let second_msg = timeout(Duration::from_millis(100), rx.recv())
1825            .await
1826            .expect("waiting for second state msg timed out!")
1827            .expect("state sync block sender closed!")
1828            .expect("no error");
1829
1830        let _ = close_tx.send(());
1831        jh.await
1832            .expect("state sync task panicked!");
1833
1834        let expected_second_msg = StateSyncMessage {
1835            header: BlockHeader {
1836                number: 2,
1837                hash: Bytes::from("0x02"),
1838                parent_hash: Bytes::from("0x01"),
1839                revert: false,
1840                ..Default::default()
1841            },
1842            snapshots: Snapshot {
1843                states: [(
1844                    "Component3".to_string(),
1845                    ComponentWithState {
1846                        state: ResponseProtocolState {
1847                            component_id: "Component3".to_string(),
1848                            ..Default::default()
1849                        },
1850                        component: ProtocolComponent {
1851                            id: "Component3".to_string(),
1852                            ..Default::default()
1853                        },
1854                        component_tvl: Some(10.0),
1855                        entrypoints: vec![], // TODO: add entrypoints?
1856                    },
1857                )]
1858                .into_iter()
1859                .collect(),
1860                vm_storage: HashMap::new(),
1861            },
1862            deltas: Some(BlockChanges {
1863                extractor: "uniswap-v2".to_string(),
1864                chain: Chain::Ethereum,
1865                block: Block {
1866                    number: 2,
1867                    hash: Bytes::from("0x02"),
1868                    parent_hash: Bytes::from("0x01"),
1869                    chain: Chain::Ethereum,
1870                    ts: Default::default(),
1871                },
1872                revert: false,
1873                component_tvl: [
1874                    ("Component1".to_string(), 6.0), // Within range, should not trigger changes
1875                    ("Component3".to_string(), 10.0), // Above upper threshold, should be added
1876                ]
1877                .into_iter()
1878                .collect(),
1879                ..Default::default()
1880            }),
1881            removed_components: [(
1882                "Component2".to_string(),
1883                ProtocolComponent { id: "Component2".to_string(), ..Default::default() },
1884            )]
1885            .into_iter()
1886            .collect(),
1887        };
1888
1889        assert_eq!(second_msg, expected_second_msg);
1890    }
1891
1892    #[test(tokio::test)]
1893    async fn test_public_close_api_functionality() {
1894        // Tests the public close() API through the StateSynchronizer trait:
1895        // - close() fails before start() is called
1896        // - close() succeeds while synchronizer is running
1897        // - close() fails after already closed
1898        // This tests the full start/close lifecycle via the public API
1899
1900        let mut rpc_client = make_mock_client();
1901        let mut deltas_client = MockDeltasClient::new();
1902
1903        // Mock the initial components call
1904        rpc_client
1905            .expect_get_protocol_components()
1906            .returning(|_| {
1907                Ok(ProtocolComponentRequestResponse {
1908                    protocol_components: vec![],
1909                    pagination: PaginationResponse { page: 0, page_size: 20, total: 0 },
1910                })
1911            });
1912
1913        // Set up deltas client that will wait for messages (blocking in state_sync)
1914        let (_tx, rx) = channel(1);
1915        deltas_client
1916            .expect_subscribe()
1917            .return_once(move |_, _| Ok((Uuid::default(), rx)));
1918
1919        // Expect unsubscribe call during cleanup
1920        deltas_client
1921            .expect_unsubscribe()
1922            .return_once(|_| Ok(()));
1923
1924        let mut state_sync = ProtocolStateSynchronizer::new(
1925            ExtractorIdentity::new(Chain::Ethereum, "test-protocol"),
1926            true,
1927            ComponentFilter::with_tvl_range(0.0, 0.0),
1928            5, // Enough retries
1929            Duration::from_secs(0),
1930            true,
1931            false,
1932            true,
1933            ArcRPCClient(Arc::new(rpc_client)),
1934            ArcDeltasClient(Arc::new(deltas_client)),
1935            10000_u64, // Long timeout so task doesn't exit on its own
1936        );
1937
1938        state_sync
1939            .initialize()
1940            .await
1941            .expect("Init should succeed");
1942
1943        // Start the synchronizer and test the new split-based close mechanism
1944        let (handle, _rx) = state_sync.start().await;
1945        let (jh, close_tx) = handle.split();
1946
1947        // Give it time to start up and enter state_sync
1948        tokio::time::sleep(Duration::from_millis(100)).await;
1949
1950        // Send close signal should succeed
1951        close_tx
1952            .send(())
1953            .expect("Should be able to send close signal");
1954        // Task should stop cleanly
1955        jh.await.expect("Task should not panic");
1956    }
1957
1958    #[test(tokio::test)]
1959    async fn test_cleanup_runs_when_state_sync_processing_errors() {
1960        // Tests that cleanup code runs when state_sync() errors during delta processing.
1961        // Specifically tests: RPC errors during snapshot retrieval cause proper cleanup.
1962        // Verifies: shared.last_synced_block reset + subscription unsubscribe on errors
1963
1964        let mut rpc_client = make_mock_client();
1965        let mut deltas_client = MockDeltasClient::new();
1966
1967        // Mock the initial components call
1968        rpc_client
1969            .expect_get_protocol_components()
1970            .returning(|_| {
1971                Ok(ProtocolComponentRequestResponse {
1972                    protocol_components: vec![],
1973                    pagination: PaginationResponse { page: 0, page_size: 20, total: 0 },
1974                })
1975            });
1976
1977        // Mock to fail during snapshot retrieval (this will cause an error during processing)
1978        rpc_client
1979            .expect_get_protocol_states()
1980            .returning(|_| {
1981                Err(RPCError::ParseResponse("Test error during snapshot retrieval".to_string()))
1982            });
1983
1984        // Set up deltas client to send one message that will trigger snapshot retrieval
1985        let (tx, rx) = channel(10);
1986        deltas_client
1987            .expect_subscribe()
1988            .return_once(move |_, _| {
1989                // Send a delta message that will require a snapshot
1990                let delta = BlockChanges {
1991                    extractor: "test".to_string(),
1992                    chain: Chain::Ethereum,
1993                    block: Block {
1994                        hash: Bytes::from("0x0123"),
1995                        number: 1,
1996                        parent_hash: Bytes::from("0x0000"),
1997                        chain: Chain::Ethereum,
1998                        ts: chrono::DateTime::from_timestamp(1234567890, 0)
1999                            .unwrap()
2000                            .naive_utc(),
2001                    },
2002                    revert: false,
2003                    // Add a new component to trigger snapshot request
2004                    new_protocol_components: [(
2005                        "new_component".to_string(),
2006                        ProtocolComponent {
2007                            id: "new_component".to_string(),
2008                            protocol_system: "test_protocol".to_string(),
2009                            protocol_type_name: "test".to_string(),
2010                            chain: Chain::Ethereum,
2011                            tokens: vec![Bytes::from("0x0badc0ffee")],
2012                            contract_ids: vec![Bytes::from("0x0badc0ffee")],
2013                            static_attributes: Default::default(),
2014                            creation_tx: Default::default(),
2015                            created_at: Default::default(),
2016                            change: Default::default(),
2017                        },
2018                    )]
2019                    .into_iter()
2020                    .collect(),
2021                    component_tvl: [("new_component".to_string(), 100.0)]
2022                        .into_iter()
2023                        .collect(),
2024                    ..Default::default()
2025                };
2026
2027                tokio::spawn(async move {
2028                    let _ = tx.send(delta).await;
2029                    // Close the channel after sending one message
2030                });
2031
2032                Ok((Uuid::default(), rx))
2033            });
2034
2035        // Expect unsubscribe call during cleanup
2036        deltas_client
2037            .expect_unsubscribe()
2038            .return_once(|_| Ok(()));
2039
2040        let mut state_sync = ProtocolStateSynchronizer::new(
2041            ExtractorIdentity::new(Chain::Ethereum, "test-protocol"),
2042            true,
2043            ComponentFilter::with_tvl_range(0.0, 1000.0), // Include the component
2044            1,
2045            Duration::from_secs(0),
2046            true,
2047            false,
2048            true,
2049            ArcRPCClient(Arc::new(rpc_client)),
2050            ArcDeltasClient(Arc::new(deltas_client)),
2051            5000_u64,
2052        );
2053
2054        state_sync
2055            .initialize()
2056            .await
2057            .expect("Init should succeed");
2058
2059        // Before calling state_sync, set a value in last_synced_block
2060        state_sync.last_synced_block = Some(BlockHeader {
2061            hash: Bytes::from("0x0badc0ffee"),
2062            number: 42,
2063            parent_hash: Bytes::from("0xbadbeef0"),
2064            revert: false,
2065            timestamp: 123456789,
2066        });
2067
2068        // Create a channel for state_sync to send messages to
2069        let (mut block_tx, _block_rx) = channel(10);
2070
2071        // Call state_sync directly - this should error during processing
2072        let (_end_tx, end_rx) = oneshot::channel::<()>();
2073        let result = state_sync
2074            .state_sync(&mut block_tx, end_rx)
2075            .await;
2076        // Verify that state_sync returned an error
2077        assert!(result.is_err(), "state_sync should have errored during processing");
2078
2079        // Note: We can't verify internal state cleanup since state_sync consumes self,
2080        // but the cleanup logic is still tested by the fact that the method returns properly.
2081    }
2082
2083    #[test(tokio::test)]
2084    async fn test_close_signal_while_waiting_for_first_deltas() {
2085        // Tests close signal handling during the initial "waiting for deltas" phase.
2086        // This is the earliest possible close scenario - before any deltas are received.
2087        // Verifies: close signal received while waiting for first message triggers cleanup
2088        let mut rpc_client = make_mock_client();
2089        let mut deltas_client = MockDeltasClient::new();
2090
2091        rpc_client
2092            .expect_get_protocol_components()
2093            .returning(|_| {
2094                Ok(ProtocolComponentRequestResponse {
2095                    protocol_components: vec![],
2096                    pagination: PaginationResponse { page: 0, page_size: 20, total: 0 },
2097                })
2098            });
2099
2100        let (_tx, rx) = channel(1);
2101        deltas_client
2102            .expect_subscribe()
2103            .return_once(move |_, _| Ok((Uuid::default(), rx)));
2104
2105        deltas_client
2106            .expect_unsubscribe()
2107            .return_once(|_| Ok(()));
2108
2109        let mut state_sync = ProtocolStateSynchronizer::new(
2110            ExtractorIdentity::new(Chain::Ethereum, "test-protocol"),
2111            true,
2112            ComponentFilter::with_tvl_range(0.0, 0.0),
2113            1,
2114            Duration::from_secs(0),
2115            true,
2116            true,
2117            false,
2118            ArcRPCClient(Arc::new(rpc_client)),
2119            ArcDeltasClient(Arc::new(deltas_client)),
2120            10000_u64,
2121        );
2122
2123        state_sync
2124            .initialize()
2125            .await
2126            .expect("Init should succeed");
2127
2128        let (mut block_tx, _block_rx) = channel(10);
2129        let (end_tx, end_rx) = oneshot::channel::<()>();
2130
2131        // Start state_sync in a task
2132        let state_sync_handle = tokio::spawn(async move {
2133            state_sync
2134                .state_sync(&mut block_tx, end_rx)
2135                .await
2136        });
2137
2138        // Give it a moment to start
2139        tokio::time::sleep(Duration::from_millis(100)).await;
2140
2141        // Send close signal
2142        let _ = end_tx.send(());
2143
2144        // state_sync should exit cleanly
2145        let result = state_sync_handle
2146            .await
2147            .expect("Task should not panic");
2148        assert!(result.is_ok(), "state_sync should exit cleanly when closed: {result:?}");
2149
2150        println!("SUCCESS: Close signal handled correctly while waiting for first deltas");
2151    }
2152
2153    #[test(tokio::test)]
2154    async fn test_close_signal_during_main_processing_loop() {
2155        // Tests close signal handling during the main delta processing loop.
2156        // This tests the scenario where first message is processed successfully,
2157        // then close signal is received while waiting for subsequent deltas.
2158        // Verifies: close signal in main loop (after initialization) triggers cleanup
2159
2160        let mut rpc_client = make_mock_client();
2161        let mut deltas_client = MockDeltasClient::new();
2162
2163        // Mock the initial components call
2164        rpc_client
2165            .expect_get_protocol_components()
2166            .returning(|_| {
2167                Ok(ProtocolComponentRequestResponse {
2168                    protocol_components: vec![],
2169                    pagination: PaginationResponse { page: 0, page_size: 20, total: 0 },
2170                })
2171            });
2172
2173        // Mock the snapshot retrieval that happens after first message
2174        rpc_client
2175            .expect_get_protocol_states()
2176            .returning(|_| {
2177                Ok(ProtocolStateRequestResponse {
2178                    states: vec![],
2179                    pagination: PaginationResponse { page: 0, page_size: 20, total: 0 },
2180                })
2181            });
2182
2183        rpc_client
2184            .expect_get_component_tvl()
2185            .returning(|_| {
2186                Ok(ComponentTvlRequestResponse {
2187                    tvl: HashMap::new(),
2188                    pagination: PaginationResponse { page: 0, page_size: 20, total: 0 },
2189                })
2190            });
2191
2192        rpc_client
2193            .expect_get_traced_entry_points()
2194            .returning(|_| {
2195                Ok(TracedEntryPointRequestResponse {
2196                    traced_entry_points: HashMap::new(),
2197                    pagination: PaginationResponse::new(0, 20, 0),
2198                })
2199            });
2200
2201        // Set up deltas client to send one message, then keep channel open
2202        let (tx, rx) = channel(10);
2203        deltas_client
2204            .expect_subscribe()
2205            .return_once(move |_, _| {
2206                // Send first message immediately
2207                let first_delta = BlockChanges {
2208                    extractor: "test".to_string(),
2209                    chain: Chain::Ethereum,
2210                    block: Block {
2211                        hash: Bytes::from("0x0123"),
2212                        number: 1,
2213                        parent_hash: Bytes::from("0x0000"),
2214                        chain: Chain::Ethereum,
2215                        ts: chrono::DateTime::from_timestamp(1234567890, 0)
2216                            .unwrap()
2217                            .naive_utc(),
2218                    },
2219                    revert: false,
2220                    ..Default::default()
2221                };
2222
2223                tokio::spawn(async move {
2224                    let _ = tx.send(first_delta).await;
2225                    // Keep the sender alive but don't send more messages
2226                    // This will make the recv() block waiting for the next message
2227                    tokio::time::sleep(Duration::from_secs(30)).await;
2228                });
2229
2230                Ok((Uuid::default(), rx))
2231            });
2232
2233        deltas_client
2234            .expect_unsubscribe()
2235            .return_once(|_| Ok(()));
2236
2237        let mut state_sync = ProtocolStateSynchronizer::new(
2238            ExtractorIdentity::new(Chain::Ethereum, "test-protocol"),
2239            true,
2240            ComponentFilter::with_tvl_range(0.0, 1000.0),
2241            1,
2242            Duration::from_secs(0),
2243            true,
2244            false,
2245            true,
2246            ArcRPCClient(Arc::new(rpc_client)),
2247            ArcDeltasClient(Arc::new(deltas_client)),
2248            10000_u64,
2249        );
2250
2251        state_sync
2252            .initialize()
2253            .await
2254            .expect("Init should succeed");
2255
2256        let (mut block_tx, mut block_rx) = channel(10);
2257        let (end_tx, end_rx) = oneshot::channel::<()>();
2258
2259        // Start state_sync in a task
2260        let state_sync_handle = tokio::spawn(async move {
2261            state_sync
2262                .state_sync(&mut block_tx, end_rx)
2263                .await
2264        });
2265
2266        // Wait for the first message to be processed (snapshot sent)
2267        let first_snapshot = block_rx
2268            .recv()
2269            .await
2270            .expect("Should receive first snapshot")
2271            .expect("Synchronizer error");
2272        assert!(
2273            !first_snapshot
2274                .snapshots
2275                .states
2276                .is_empty() ||
2277                first_snapshot.deltas.is_some()
2278        );
2279        // Now send close signal - this should be handled in the main processing loop
2280        let _ = end_tx.send(());
2281
2282        // state_sync should exit cleanly after receiving close signal in main loop
2283        let result = state_sync_handle
2284            .await
2285            .expect("Task should not panic");
2286        assert!(
2287            result.is_ok(),
2288            "state_sync should exit cleanly when closed after first message: {result:?}"
2289        );
2290    }
2291
2292    #[test(tokio::test)]
2293    async fn test_max_retries_exceeded_error_propagation() {
2294        // Test that when max_retries is exceeded, the final error is sent through the channel
2295        // to the receiver and the synchronizer task exits cleanly
2296
2297        let mut rpc_client = make_mock_client();
2298        let mut deltas_client = MockDeltasClient::new();
2299
2300        // Mock the initial components call to succeed
2301        rpc_client
2302            .expect_get_protocol_components()
2303            .returning(|_| {
2304                Ok(ProtocolComponentRequestResponse {
2305                    protocol_components: vec![],
2306                    pagination: PaginationResponse { page: 0, page_size: 20, total: 0 },
2307                })
2308            });
2309
2310        // Set up deltas client to consistently fail after subscription
2311        // This will cause connection errors and trigger retries
2312        deltas_client
2313            .expect_subscribe()
2314            .returning(|_, _| {
2315                // Return a connection error to trigger retries
2316                Err(DeltasError::NotConnected)
2317            });
2318
2319        // Expect multiple unsubscribe calls during retries
2320        deltas_client
2321            .expect_unsubscribe()
2322            .returning(|_| Ok(()))
2323            .times(0..=5);
2324
2325        // Create synchronizer with only 2 retries and short cooldown
2326        let mut state_sync = ProtocolStateSynchronizer::new(
2327            ExtractorIdentity::new(Chain::Ethereum, "test-protocol"),
2328            true,
2329            ComponentFilter::with_tvl_range(0.0, 1000.0),
2330            2,                         // max_retries = 2
2331            Duration::from_millis(10), // short retry cooldown
2332            true,
2333            false,
2334            true,
2335            ArcRPCClient(Arc::new(rpc_client)),
2336            ArcDeltasClient(Arc::new(deltas_client)),
2337            1000_u64,
2338        );
2339
2340        state_sync
2341            .initialize()
2342            .await
2343            .expect("Init should succeed");
2344
2345        // Start the synchronizer - it should fail to subscribe and retry
2346        let (handle, mut rx) = state_sync.start().await;
2347        let (jh, _close_tx) = handle.split();
2348
2349        let res = tokio::time::timeout(Duration::from_millis(100), rx.recv())
2350            .await
2351            .expect("responsds in time")
2352            .expect("channel open");
2353
2354        // Verify the error is a ConnectionClosed error (converted from DeltasError::NotConnected)
2355        if let Err(err) = res {
2356            assert!(
2357                matches!(err, SynchronizerError::ConnectionClosed),
2358                "Expected ConnectionClosed error, got: {:?}",
2359                err
2360            );
2361        } else {
2362            panic!("Expected an error")
2363        }
2364
2365        // The task should complete (not hang) after max retries
2366        let task_result = tokio::time::timeout(Duration::from_secs(2), jh).await;
2367        assert!(task_result.is_ok(), "Synchronizer task should complete after max retries");
2368    }
2369
2370    #[test(tokio::test)]
2371    async fn test_is_next_expected() {
2372        // Test the is_next_expected function to ensure it correctly identifies
2373        // when an incoming block is the expected next block in the chain
2374
2375        let mut state_sync = with_mocked_clients(true, false, None, None);
2376
2377        // Test 1: No previous block - should return false
2378        let incoming_header = BlockHeader {
2379            number: 100,
2380            hash: Bytes::from("0x0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"),
2381            parent_hash: Bytes::from(
2382                "0x0000000000000000000000000000000000000000000000000000000000000000",
2383            ),
2384            revert: false,
2385            timestamp: 123456789,
2386        };
2387        assert!(
2388            !state_sync.is_next_expected(&incoming_header),
2389            "Should return false when no previous block is set"
2390        );
2391
2392        // Test 2: Set a previous block and test with matching parent hash
2393        let previous_header = BlockHeader {
2394            number: 99,
2395            hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000000"),
2396            parent_hash: Bytes::from(
2397                "0xabcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789",
2398            ),
2399            revert: false,
2400            timestamp: 123456788,
2401        };
2402        state_sync.last_synced_block = Some(previous_header.clone());
2403
2404        assert!(
2405            state_sync.is_next_expected(&incoming_header),
2406            "Should return true when incoming parent_hash matches previous hash"
2407        );
2408
2409        // Test 3: Test with non-matching parent hash
2410        let non_matching_header = BlockHeader {
2411            number: 100,
2412            hash: Bytes::from("0x0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"),
2413            parent_hash: Bytes::from(
2414                "0x1111111111111111111111111111111111111111111111111111111111111111",
2415            ), // Wrong parent hash
2416            revert: false,
2417            timestamp: 123456789,
2418        };
2419        assert!(
2420            !state_sync.is_next_expected(&non_matching_header),
2421            "Should return false when incoming parent_hash doesn't match previous hash"
2422        );
2423    }
2424
2425    #[test(tokio::test)]
2426    async fn test_synchronizer_restart_skip_snapshot_on_expected_block() {
2427        // Test that on synchronizer restart with the next expected block,
2428        // get_snapshot is not called and only deltas are sent
2429
2430        let mut rpc_client = make_mock_client();
2431        let mut deltas_client = MockDeltasClient::new();
2432
2433        // Mock the initial components call
2434        rpc_client
2435            .expect_get_protocol_components()
2436            .returning(|_| {
2437                Ok(ProtocolComponentRequestResponse {
2438                    protocol_components: vec![ProtocolComponent {
2439                        id: "Component1".to_string(),
2440                        ..Default::default()
2441                    }],
2442                    pagination: PaginationResponse { page: 0, page_size: 20, total: 1 },
2443                })
2444            });
2445
2446        // Set up deltas client to send a message that is the next expected block
2447        let (tx, rx) = channel(10);
2448        deltas_client
2449            .expect_subscribe()
2450            .return_once(move |_, _| {
2451                let expected_next_delta = BlockChanges {
2452                    extractor: "uniswap-v2".to_string(),
2453                    chain: Chain::Ethereum,
2454                    block: Block {
2455                        hash: Bytes::from(
2456                            "0x0000000000000000000000000000000000000000000000000000000000000002",
2457                        ), // This will be the next expected block
2458                        number: 2,
2459                        parent_hash: Bytes::from(
2460                            "0x0000000000000000000000000000000000000000000000000000000000000001",
2461                        ), // This matches our last synced block hash
2462                        chain: Chain::Ethereum,
2463                        ts: chrono::DateTime::from_timestamp(1234567890, 0)
2464                            .unwrap()
2465                            .naive_utc(),
2466                    },
2467                    revert: false,
2468                    ..Default::default()
2469                };
2470
2471                tokio::spawn(async move {
2472                    let _ = tx.send(expected_next_delta).await;
2473                });
2474
2475                Ok((Uuid::default(), rx))
2476            });
2477
2478        deltas_client
2479            .expect_unsubscribe()
2480            .return_once(|_| Ok(()));
2481
2482        let mut state_sync = ProtocolStateSynchronizer::new(
2483            ExtractorIdentity::new(Chain::Ethereum, "uniswap-v2"),
2484            true,
2485            ComponentFilter::with_tvl_range(0.0, 1000.0),
2486            1,
2487            Duration::from_secs(0),
2488            true, // include_snapshots = true
2489            false,
2490            true,
2491            ArcRPCClient(Arc::new(rpc_client)),
2492            ArcDeltasClient(Arc::new(deltas_client)),
2493            10000_u64,
2494        );
2495
2496        // Initialize and set up the last synced block to simulate a restart scenario
2497        state_sync
2498            .initialize()
2499            .await
2500            .expect("Init should succeed");
2501
2502        // Set last_synced_block to simulate that we've previously synced block 1
2503        state_sync.last_synced_block = Some(BlockHeader {
2504            number: 1,
2505            hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000001"), /* This matches the parent_hash in our delta */
2506            parent_hash: Bytes::from(
2507                "0x0000000000000000000000000000000000000000000000000000000000000000",
2508            ),
2509            revert: false,
2510            timestamp: 123456789,
2511        });
2512
2513        let (mut block_tx, mut block_rx) = channel(10);
2514        let (end_tx, end_rx) = oneshot::channel::<()>();
2515
2516        // Start state_sync
2517        let state_sync_handle = tokio::spawn(async move {
2518            state_sync
2519                .state_sync(&mut block_tx, end_rx)
2520                .await
2521        });
2522
2523        // Wait for the message - it should be a delta-only message (no snapshots)
2524        let result_msg = timeout(Duration::from_millis(200), block_rx.recv())
2525            .await
2526            .expect("Should receive message within timeout")
2527            .expect("Channel should be open")
2528            .expect("Should not be an error");
2529
2530        // Send close signal
2531        let _ = end_tx.send(());
2532
2533        // Wait for state_sync to finish
2534        let _ = state_sync_handle
2535            .await
2536            .expect("Task should not panic");
2537
2538        // Verify the message contains deltas but no snapshots
2539        // (because we skipped snapshot retrieval)
2540        assert!(result_msg.deltas.is_some(), "Should contain deltas");
2541        assert!(
2542            result_msg.snapshots.states.is_empty(),
2543            "Should not contain snapshots when next expected block is received"
2544        );
2545
2546        // Verify the block details match our expected next block
2547        if let Some(deltas) = &result_msg.deltas {
2548            assert_eq!(deltas.block.number, 2);
2549            assert_eq!(
2550                deltas.block.hash,
2551                Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000002")
2552            );
2553            assert_eq!(
2554                deltas.block.parent_hash,
2555                Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000001")
2556            );
2557        }
2558    }
2559
2560    #[test(tokio::test)]
2561    async fn test_skip_previously_processed_messages() {
2562        // Test that the synchronizer skips messages for blocks that have already been processed
2563        // This simulates a service restart scenario where old messages are re-emitted
2564
2565        let mut rpc_client = make_mock_client();
2566        let mut deltas_client = MockDeltasClient::new();
2567
2568        // Mock the initial components call
2569        rpc_client
2570            .expect_get_protocol_components()
2571            .returning(|_| {
2572                Ok(ProtocolComponentRequestResponse {
2573                    protocol_components: vec![ProtocolComponent {
2574                        id: "Component1".to_string(),
2575                        ..Default::default()
2576                    }],
2577                    pagination: PaginationResponse { page: 0, page_size: 20, total: 1 },
2578                })
2579            });
2580
2581        // Mock snapshot calls for when we process the expected next block (block 6)
2582        rpc_client
2583            .expect_get_protocol_states()
2584            .returning(|_| {
2585                Ok(ProtocolStateRequestResponse {
2586                    states: vec![ResponseProtocolState {
2587                        component_id: "Component1".to_string(),
2588                        ..Default::default()
2589                    }],
2590                    pagination: PaginationResponse { page: 0, page_size: 20, total: 1 },
2591                })
2592            });
2593
2594        rpc_client
2595            .expect_get_component_tvl()
2596            .returning(|_| {
2597                Ok(ComponentTvlRequestResponse {
2598                    tvl: [("Component1".to_string(), 100.0)]
2599                        .into_iter()
2600                        .collect(),
2601                    pagination: PaginationResponse { page: 0, page_size: 20, total: 1 },
2602                })
2603            });
2604
2605        rpc_client
2606            .expect_get_traced_entry_points()
2607            .returning(|_| {
2608                Ok(TracedEntryPointRequestResponse {
2609                    traced_entry_points: HashMap::new(),
2610                    pagination: PaginationResponse::new(0, 20, 0),
2611                })
2612            });
2613
2614        // Set up deltas client to send old messages first, then the expected next block
2615        let (tx, rx) = channel(10);
2616        deltas_client
2617            .expect_subscribe()
2618            .return_once(move |_, _| {
2619                // Send messages for blocks 3, 4, 5 (already processed), then block 6 (expected)
2620                let old_messages = vec![
2621                    BlockChanges {
2622                        extractor: "uniswap-v2".to_string(),
2623                        chain: Chain::Ethereum,
2624                        block: Block {
2625                            hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000003"),
2626                            number: 3,
2627                            parent_hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000002"),
2628                            chain: Chain::Ethereum,
2629                            ts: chrono::DateTime::from_timestamp(1234567890, 0).unwrap().naive_utc(),
2630                        },
2631                        revert: false,
2632                        ..Default::default()
2633                    },
2634                    BlockChanges {
2635                        extractor: "uniswap-v2".to_string(),
2636                        chain: Chain::Ethereum,
2637                        block: Block {
2638                            hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000004"),
2639                            number: 4,
2640                            parent_hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000003"),
2641                            chain: Chain::Ethereum,
2642                            ts: chrono::DateTime::from_timestamp(1234567891, 0).unwrap().naive_utc(),
2643                        },
2644                        revert: false,
2645                        ..Default::default()
2646                    },
2647                    BlockChanges {
2648                        extractor: "uniswap-v2".to_string(),
2649                        chain: Chain::Ethereum,
2650                        block: Block {
2651                            hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000005"),
2652                            number: 5,
2653                            parent_hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000004"),
2654                            chain: Chain::Ethereum,
2655                            ts: chrono::DateTime::from_timestamp(1234567892, 0).unwrap().naive_utc(),
2656                        },
2657                        revert: false,
2658                        ..Default::default()
2659                    },
2660                    // This is the expected next block (block 6)
2661                    BlockChanges {
2662                        extractor: "uniswap-v2".to_string(),
2663                        chain: Chain::Ethereum,
2664                        block: Block {
2665                            hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000006"),
2666                            number: 6,
2667                            parent_hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000005"),
2668                            chain: Chain::Ethereum,
2669                            ts: chrono::DateTime::from_timestamp(1234567893, 0).unwrap().naive_utc(),
2670                        },
2671                        revert: false,
2672                        ..Default::default()
2673                    },
2674                ];
2675
2676                tokio::spawn(async move {
2677                    for message in old_messages {
2678                        let _ = tx.send(message).await;
2679                        tokio::time::sleep(Duration::from_millis(10)).await;
2680                    }
2681                });
2682
2683                Ok((Uuid::default(), rx))
2684            });
2685
2686        deltas_client
2687            .expect_unsubscribe()
2688            .return_once(|_| Ok(()));
2689
2690        let mut state_sync = ProtocolStateSynchronizer::new(
2691            ExtractorIdentity::new(Chain::Ethereum, "uniswap-v2"),
2692            true,
2693            ComponentFilter::with_tvl_range(0.0, 1000.0),
2694            1,
2695            Duration::from_secs(0),
2696            true,
2697            true,
2698            true,
2699            ArcRPCClient(Arc::new(rpc_client)),
2700            ArcDeltasClient(Arc::new(deltas_client)),
2701            10000_u64,
2702        );
2703
2704        // Initialize and set last_synced_block to simulate we've already processed block 5
2705        state_sync
2706            .initialize()
2707            .await
2708            .expect("Init should succeed");
2709
2710        state_sync.last_synced_block = Some(BlockHeader {
2711            number: 5,
2712            hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000005"),
2713            parent_hash: Bytes::from(
2714                "0x0000000000000000000000000000000000000000000000000000000000000004",
2715            ),
2716            revert: false,
2717            timestamp: 1234567892,
2718        });
2719
2720        let (mut block_tx, mut block_rx) = channel(10);
2721        let (end_tx, end_rx) = oneshot::channel::<()>();
2722
2723        // Start state_sync
2724        let state_sync_handle = tokio::spawn(async move {
2725            state_sync
2726                .state_sync(&mut block_tx, end_rx)
2727                .await
2728        });
2729
2730        // Wait for the message - it should only be for block 6 (skipping blocks 3, 4, 5)
2731        let result_msg = timeout(Duration::from_millis(500), block_rx.recv())
2732            .await
2733            .expect("Should receive message within timeout")
2734            .expect("Channel should be open")
2735            .expect("Should not be an error");
2736
2737        // Send close signal
2738        let _ = end_tx.send(());
2739
2740        // Wait for state_sync to finish
2741        let _ = state_sync_handle
2742            .await
2743            .expect("Task should not panic");
2744
2745        // Verify we only got the message for block 6 (the expected next block)
2746        assert!(result_msg.deltas.is_some(), "Should contain deltas");
2747        if let Some(deltas) = &result_msg.deltas {
2748            assert_eq!(
2749                deltas.block.number, 6,
2750                "Should only process block 6, skipping earlier blocks"
2751            );
2752            assert_eq!(
2753                deltas.block.hash,
2754                Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000006")
2755            );
2756        }
2757
2758        // Verify that no additional messages are received immediately
2759        // (since the old blocks 3, 4, 5 were skipped and only block 6 was processed)
2760        match timeout(Duration::from_millis(50), block_rx.recv()).await {
2761            Err(_) => {
2762                // Timeout is expected - no more messages should come
2763            }
2764            Ok(Some(Err(_))) => {
2765                // Error received is also acceptable (connection closed)
2766            }
2767            Ok(Some(Ok(_))) => {
2768                panic!("Should not receive additional messages - old blocks should be skipped");
2769            }
2770            Ok(None) => {
2771                // Channel closed is also acceptable
2772            }
2773        }
2774    }
2775}