tycho_client/feed/
synchronizer.rs

1use std::{collections::HashMap, time::Duration};
2
3use async_trait::async_trait;
4use serde::{Deserialize, Serialize};
5use thiserror::Error;
6use tokio::{
7    select,
8    sync::{
9        mpsc::{channel, error::SendError, Receiver, Sender},
10        oneshot,
11    },
12    task::JoinHandle,
13    time::{sleep, timeout},
14};
15use tracing::{debug, error, info, instrument, trace, warn};
16use tycho_common::{
17    dto::{
18        BlockChanges, Chain, EntryPointWithTracingParams, ExtractorIdentity, ProtocolComponent,
19        ResponseAccount, ResponseProtocolState, TracingResult,
20    },
21    Bytes,
22};
23
24use crate::{
25    deltas::{DeltasClient, SubscriptionOptions},
26    feed::{
27        component_tracker::{ComponentFilter, ComponentTracker},
28        BlockHeader, HeaderLike,
29    },
30    rpc::{RPCClient, RPCError, SnapshotParameters, RPC_CLIENT_CONCURRENCY},
31    DeltasError,
32};
33
/// Errors that can be produced while synchronizing protocol state.
#[derive(Error, Debug)]
pub enum SynchronizerError {
    /// RPC client failures.
    #[error("RPC error: {0}")]
    RPCError(#[from] RPCError),

    /// Issues with the main channel, e.g. a message that could not be sent to the receiver.
    #[error("{0}")]
    ChannelError(String),

    /// Timeout elapsed errors, e.g. the first deltas message did not arrive in time.
    #[error("Timeout error: {0}")]
    Timeout(String),

    /// Failed to close the synchronizer.
    #[error("Failed to close synchronizer: {0}")]
    CloseError(String),

    /// Server connection failures or interruptions.
    #[error("Connection error: {0}")]
    ConnectionError(String),

    /// Connection closed. Produced when the deltas client reports that it is not connected.
    #[error("Connection closed")]
    ConnectionClosed,
}
60
/// Result alias for operations that fail with a [`SynchronizerError`].
pub type SyncResult<T> = Result<T, SynchronizerError>;
62
63impl<T> From<SendError<T>> for SynchronizerError {
64    fn from(err: SendError<T>) -> Self {
65        SynchronizerError::ChannelError(format!("Failed to send message: {err}"))
66    }
67}
68
69impl From<DeltasError> for SynchronizerError {
70    fn from(err: DeltasError) -> Self {
71        match err {
72            DeltasError::NotConnected => SynchronizerError::ConnectionClosed,
73            _ => SynchronizerError::ConnectionError(err.to_string()),
74        }
75    }
76}
77
/// Synchronizes the state of a single extractor by combining snapshots fetched over RPC with
/// the messages of a deltas subscription.
pub struct ProtocolStateSynchronizer<R: RPCClient, D: DeltasClient> {
    /// Identity of the extractor to synchronize; used for the subscription and RPC requests.
    extractor_id: ExtractorIdentity,
    /// Whether snapshot requests should include balances.
    retrieve_balances: bool,
    /// Client used to fetch traced entry points and snapshots.
    rpc_client: R,
    /// Client used to subscribe to (and unsubscribe from) delta messages.
    deltas_client: D,
    /// Upper bound on the number of synchronization attempts made by `start`.
    max_retries: u64,
    /// Pause between two synchronization attempts.
    retry_cooldown: Duration,
    /// When `false`, no snapshots are retrieved and snapshot messages stay empty.
    include_snapshots: bool,
    /// Keeps track of which components and contracts are currently followed.
    component_tracker: ComponentTracker<R>,
    /// Header of the last block that was forwarded to the consumer, if any.
    last_synced_block: Option<BlockHeader>,
    /// Seconds to wait for the first deltas message after subscribing.
    timeout: u64,
    /// Whether snapshot requests should include TVL.
    include_tvl: bool,
    /// Compression flag passed on to the deltas subscription options.
    compression: bool,
}
92
/// A protocol component bundled with its state and related metadata.
#[derive(Clone, PartialEq, Debug, Serialize, Deserialize)]
pub struct ComponentWithState {
    /// Protocol state reported for the component.
    pub state: ResponseProtocolState,
    /// The component the state belongs to.
    pub component: ProtocolComponent,
    /// TVL of the component, if available.
    // NOTE(review): presumably only populated when TVL was requested — confirm against the RPC client.
    pub component_tvl: Option<f64>,
    /// Entry points (with their tracing parameters) paired with the matching tracing results.
    pub entrypoints: Vec<(EntryPointWithTracingParams, TracingResult)>,
}
100
/// Snapshot data for a set of components.
#[derive(Clone, PartialEq, Debug, Default, Serialize, Deserialize)]
pub struct Snapshot {
    /// Component states keyed by component id.
    pub states: HashMap<String, ComponentWithState>,
    /// Account data keyed by `Bytes`.
    // NOTE(review): the key is presumably the contract address — confirm.
    pub vm_storage: HashMap<Bytes, ResponseAccount>,
}
106
107impl Snapshot {
108    fn extend(&mut self, other: Snapshot) {
109        self.states.extend(other.states);
110        self.vm_storage.extend(other.vm_storage);
111    }
112
113    pub fn get_states(&self) -> &HashMap<String, ComponentWithState> {
114        &self.states
115    }
116
117    pub fn get_vm_storage(&self) -> &HashMap<Bytes, ResponseAccount> {
118        &self.vm_storage
119    }
120}
121
/// Message emitted by a synchronizer for a single block.
#[derive(Clone, PartialEq, Debug, Default, Serialize, Deserialize)]
pub struct StateSyncMessage<H>
where
    H: HeaderLike,
{
    /// The block information for this update.
    pub header: H,
    /// Snapshot for new components.
    pub snapshots: Snapshot,
    /// A single delta contains state updates for all tracked components, as well as additional
    /// information about the system components e.g. newly added components (even below tvl), tvl
    /// updates, balance updates.
    pub deltas: Option<BlockChanges>,
    /// Components that stopped being tracked, keyed by component id.
    pub removed_components: HashMap<String, ProtocolComponent>,
}
138
139impl<H> StateSyncMessage<H>
140where
141    H: HeaderLike,
142{
143    pub fn merge(mut self, other: Self) -> Self {
144        // be careful with removed and snapshots attributes here, these can be ambiguous.
145        self.removed_components
146            .retain(|k, _| !other.snapshots.states.contains_key(k));
147        self.snapshots
148            .states
149            .retain(|k, _| !other.removed_components.contains_key(k));
150
151        self.snapshots.extend(other.snapshots);
152        let deltas = match (self.deltas, other.deltas) {
153            (Some(l), Some(r)) => Some(l.merge(r)),
154            (None, Some(r)) => Some(r),
155            (Some(l), None) => Some(l),
156            (None, None) => None,
157        };
158        self.removed_components
159            .extend(other.removed_components);
160        Self {
161            header: other.header,
162            snapshots: self.snapshots,
163            deltas,
164            removed_components: self.removed_components,
165        }
166    }
167}
168
/// Handle for controlling a running synchronizer task.
///
/// Bundles the join handle of the spawned task with the oneshot sender that signals the task
/// to shut down gracefully.
pub struct SynchronizerTaskHandle {
    /// Join handle of the spawned synchronizer task.
    join_handle: JoinHandle<()>,
    /// Sender of the close signal the synchronizer task listens for.
    close_tx: oneshot::Sender<()>,
}
177
impl SynchronizerTaskHandle {
    /// Creates a handle from the task's join handle and the sender of its close signal.
    pub fn new(join_handle: JoinHandle<()>, close_tx: oneshot::Sender<()>) -> Self {
        Self { join_handle, close_tx }
    }

    /// Splits the handle into its join handle and close sender.
    ///
    /// This allows monitoring the task completion separately from controlling shutdown.
    /// The join handle can be used with FuturesUnordered for monitoring, while the
    /// close sender can be used to signal graceful shutdown.
    pub fn split(self) -> (JoinHandle<()>, oneshot::Sender<()>) {
        (self.join_handle, self.close_tx)
    }
}

/// StateSynchronizer
///
/// Used to synchronize the state of a single protocol. The synchronizer is responsible for
/// delivering messages to the client that let them reconstruct subsets of the protocol state.
///
/// This involves deciding which components to track according to the client's preferences,
/// retrieving & emitting snapshots of components which the client has not seen yet and
/// subsequently delivering delta messages for the components that have changed.
#[async_trait]
pub trait StateSynchronizer: Send + Sync + 'static {
    /// Prepares the synchronizer before it is started.
    async fn initialize(&mut self) -> SyncResult<()>;
    /// Starts the state synchronization, consuming the synchronizer.
    /// Returns a handle for controlling the running task and a receiver for messages.
    async fn start(
        mut self,
    ) -> (SynchronizerTaskHandle, Receiver<SyncResult<StateSyncMessage<BlockHeader>>>);
}
210
211impl<R, D> ProtocolStateSynchronizer<R, D>
212where
213    // TODO: Consider moving these constraints directly to the
214    // client...
215    R: RPCClient + Clone + Send + Sync + 'static,
216    D: DeltasClient + Clone + Send + Sync + 'static,
217{
218    /// Creates a new state synchronizer.
219    #[allow(clippy::too_many_arguments)]
220    pub fn new(
221        extractor_id: ExtractorIdentity,
222        retrieve_balances: bool,
223        component_filter: ComponentFilter,
224        max_retries: u64,
225        retry_cooldown: Duration,
226        include_snapshots: bool,
227        include_tvl: bool,
228        compression: bool,
229        rpc_client: R,
230        deltas_client: D,
231        timeout: u64,
232    ) -> Self {
233        Self {
234            extractor_id: extractor_id.clone(),
235            retrieve_balances,
236            rpc_client: rpc_client.clone(),
237            include_snapshots,
238            deltas_client,
239            component_tracker: ComponentTracker::new(
240                extractor_id.chain,
241                extractor_id.name.as_str(),
242                component_filter,
243                rpc_client,
244            ),
245            max_retries,
246            retry_cooldown,
247            last_synced_block: None,
248            timeout,
249            include_tvl,
250            compression,
251        }
252    }
253
254    /// Retrieves state snapshots of the requested components
255    async fn get_snapshots<'a, I: IntoIterator<Item = &'a String>>(
256        &mut self,
257        header: BlockHeader,
258        ids: Option<I>,
259    ) -> SyncResult<StateSyncMessage<BlockHeader>> {
260        if !self.include_snapshots {
261            return Ok(StateSyncMessage { header, ..Default::default() });
262        }
263
264        // Use given ids or use all if not passed
265        let component_ids: Vec<_> = match ids {
266            Some(ids) => ids.into_iter().cloned().collect(),
267            None => self
268                .component_tracker
269                .get_tracked_component_ids(),
270        };
271
272        if component_ids.is_empty() {
273            return Ok(StateSyncMessage { header, ..Default::default() });
274        }
275
276        //TODO: Improve this, we should not query for every component, but only for the ones that
277        // could have entrypoints. Maybe apply a filter per protocol?
278        let entrypoints_result = if self.extractor_id.chain == Chain::Ethereum {
279            let result = self
280                .rpc_client
281                .get_traced_entry_points_paginated(
282                    self.extractor_id.chain,
283                    &self.extractor_id.name,
284                    &component_ids,
285                    None,
286                    RPC_CLIENT_CONCURRENCY,
287                )
288                .await?;
289            self.component_tracker
290                .process_entrypoints(&result.clone().into());
291            result.traced_entry_points.clone()
292        } else {
293            HashMap::new()
294        };
295
296        // Get contract IDs from component tracker
297        let contract_ids: Vec<Bytes> = self
298            .component_tracker
299            .get_contracts_by_component(&component_ids)
300            .into_iter()
301            .collect();
302
303        let request = SnapshotParameters::new(
304            self.extractor_id.chain,
305            &self.extractor_id.name,
306            &self.component_tracker.components,
307            &contract_ids,
308            header.number,
309        )
310        .entrypoints(&entrypoints_result)
311        .include_balances(self.retrieve_balances)
312        .include_tvl(self.include_tvl);
313        let snapshot_response = self
314            .rpc_client
315            .get_snapshots(&request, None, RPC_CLIENT_CONCURRENCY)
316            .await?;
317
318        trace!(states=?&snapshot_response.states, "Retrieved ProtocolStates");
319        trace!(contract_states=?&snapshot_response.vm_storage, "Retrieved ContractState");
320
321        Ok(StateSyncMessage {
322            header,
323            snapshots: snapshot_response,
324            deltas: None,
325            removed_components: HashMap::new(),
326        })
327    }
328
    /// Main method that does all the work.
    ///
    /// Subscribes to the deltas of the extractor, waits for the first usable message, emits an
    /// initial message (including a snapshot of all tracked components unless the first block
    /// directly extends the last synced one) and then forwards one message per received delta
    /// until an error occurs or the close signal arrives. The subscription is always cancelled
    /// before returning once it has been established.
    ///
    /// ## Return Value
    ///
    /// Returns a `Result` where:
    /// - `Ok(())` - Synchronization completed successfully (usually due to close signal)
    /// - `Err((error, None))` - Error occurred AND close signal was received (don't retry)
    /// - `Err((error, Some(end_rx)))` - Error occurred but close signal was NOT received (can
    ///   retry)
    ///
    /// Note that the current implementation always hands `end_rx` back on error, i.e. it never
    /// produces the `Err((error, None))` case.
    ///
    /// The returned `end_rx` (if any) should be reused for retry attempts since the close
    /// signal may still arrive and we want to remain cancellable across retries.
    #[instrument(skip(self, block_tx, end_rx), fields(extractor_id = %self.extractor_id))]
    async fn state_sync(
        &mut self,
        block_tx: &mut Sender<SyncResult<StateSyncMessage<BlockHeader>>>,
        mut end_rx: oneshot::Receiver<()>,
    ) -> Result<(), (SynchronizerError, Option<oneshot::Receiver<()>>)> {
        // initialisation
        let subscription_options = SubscriptionOptions::new()
            .with_state(self.include_snapshots)
            .with_compression(self.compression);
        let (subscription_id, mut msg_rx) = match self
            .deltas_client
            .subscribe(self.extractor_id.clone(), subscription_options)
            .await
        {
            Ok(result) => result,
            // Nothing to clean up yet: no subscription was established.
            Err(e) => return Err((e.into(), Some(end_rx))),
        };

        // All fallible work happens inside this block so that the cleanup below runs no matter
        // how the block exits.
        let result = async {
            info!("Waiting for deltas...");
            let mut warned = false;
            // Wait for the first message we can build upon. Each wait is bounded by
            // `self.timeout` seconds and can be interrupted by the close signal.
            let mut first_msg = loop {
                let msg = select! {
                    deltas_result = timeout(Duration::from_secs(self.timeout), msg_rx.recv()) => {
                        deltas_result
                            .map_err(|_| {
                                SynchronizerError::Timeout(format!(
                                    "First deltas took longer than {t}s to arrive",
                                    t = self.timeout
                                ))
                            })?
                            .ok_or_else(|| {
                                SynchronizerError::ConnectionError(
                                    "Deltas channel closed before first message".to_string(),
                                )
                            })?
                    },
                    _ = &mut end_rx => {
                        info!("Received close signal while waiting for first deltas");
                        return Ok(());
                    }
                };

                // If we synced before (e.g. on a retry), skip messages for blocks at or below
                // the last synced block unless they directly extend it.
                let incoming = BlockHeader::from_block(msg.get_block(), msg.is_revert());
                if let Some(current) = &self.last_synced_block {
                    if current.number >= incoming.number && !self.is_next_expected(&incoming) {
                        // Log only once per skipping phase.
                        if !warned {
                            info!(extractor=%self.extractor_id, from=incoming.number, to=current.number, "Syncing. Skipping messages");
                            warned = true;
                        }
                        continue
                    }
                }
                break msg;
            };

            // Drop changes of components / contracts that are not tracked.
            self.filter_deltas(&mut first_msg);

            // initial snapshot
            let block = first_msg.get_block().clone();
            info!(height = &block.number, "First deltas received");
            let header = BlockHeader::from_block(first_msg.get_block(), first_msg.is_revert());
            let deltas_msg = StateSyncMessage {
                header: BlockHeader::from_block(first_msg.get_block(), first_msg.is_revert()),
                snapshots: Default::default(),
                deltas: Some(first_msg),
                removed_components: Default::default(),
            };

            // If possible skip retrieving snapshots
            let msg = if !self.is_next_expected(&header) {
                info!("Retrieving snapshot");
                // Snapshot all tracked components (ids = None) at the first block and merge the
                // first deltas on top; the merged message carries the header of `deltas_msg`.
                let snapshot = self
                    .get_snapshots::<Vec<&String>>(
                        BlockHeader::from_block(&block, false),
                        None,
                    )
                    .await?
                    .merge(deltas_msg);
                let n_components = self.component_tracker.components.len();
                let n_snapshots = snapshot.snapshots.states.len();
                info!(n_components, n_snapshots, "Initial snapshot retrieved, starting delta message feed");
                snapshot
            } else {
                // The first block directly extends the last synced block: deltas are enough.
                deltas_msg
            };
            block_tx.send(Ok(msg)).await?;
            self.last_synced_block = Some(header.clone());
            // Main loop: forward one message per delta until the deltas channel closes, an
            // error occurs or the close signal is received.
            loop {
                select! {
                    deltas_opt = msg_rx.recv() => {
                        if let Some(mut deltas) = deltas_opt {
                            let header = BlockHeader::from_block(deltas.get_block(), deltas.is_revert());
                            debug!(block_number=?header.number, "Received delta message");

                            let (snapshots, removed_components) = {
                                // 1. Remove components based on latest changes
                                // 2. Add components based on latest changes, query those for snapshots
                                let (to_add, to_remove) = self.component_tracker.filter_updated_components(&deltas);

                                // Only components we don't track yet need a snapshot,
                                let requiring_snapshot: Vec<_> = to_add
                                    .iter()
                                    .filter(|id| {
                                        !self.component_tracker
                                            .components
                                            .contains_key(id.as_str())
                                    })
                                    .collect();
                                debug!(components=?requiring_snapshot, "SnapshotRequest");
                                self.component_tracker
                                    .start_tracking(requiring_snapshot.as_slice())
                                    .await?;

                                // Returns an empty snapshot if `requiring_snapshot` is empty.
                                let snapshots = self
                                    .get_snapshots(header.clone(), Some(requiring_snapshot))
                                    .await?
                                    .snapshots;

                                let removed_components = if !to_remove.is_empty() {
                                    self.component_tracker.stop_tracking(&to_remove)
                                } else {
                                    Default::default()
                                };

                                (snapshots, removed_components)
                            };

                            // 3. Update entrypoints on the tracker (affects which contracts are tracked)
                            self.component_tracker.process_entrypoints(&deltas.dci_update);

                            // 4. Filter deltas by currently tracked components / contracts
                            self.filter_deltas(&mut deltas);
                            let n_changes = deltas.n_changes();

                            // 5. Send the message
                            let next = StateSyncMessage {
                                header: header.clone(),
                                snapshots,
                                deltas: Some(deltas),
                                removed_components,
                            };
                            block_tx.send(Ok(next)).await?;
                            self.last_synced_block = Some(header.clone());

                            debug!(block_number=?header.number, n_changes, "Finished processing delta message");
                        } else {
                            // `recv` returned `None`: the deltas channel was closed.
                            return Err(SynchronizerError::ConnectionError("Deltas channel closed".to_string()));
                        }
                    },
                    _ = &mut end_rx => {
                        info!("Received close signal during state_sync");
                        return Ok(());
                    }
                }
            }
        }.await;

        // Cleanup: runs for every outcome of the block above (error, channel close or close
        // signal).
        warn!(last_synced_block = ?&self.last_synced_block, "Deltas processing ended.");
        // Unsubscribing is best effort: a failure is logged and otherwise ignored.
        let _ = self
            .deltas_client
            .unsubscribe(subscription_id)
            .await
            .map_err(|err| {
                warn!(err=?err, "Unsubscribing from deltas on cleanup failed!");
            });

        // Handle the result: if it succeeded, we're done. If it errored, we need to determine
        // whether the end_rx was consumed (close signal received) or not
        match result {
            Ok(()) => Ok(()), // Success, likely due to close signal
            Err(e) => {
                // The error came from the inner async block. Since the async block
                // can receive close signals (which would return Ok), any error means
                // the close signal was NOT received, so we can return the end_rx for retry
                Err((e, Some(end_rx)))
            }
        }
    }
523
524    fn is_next_expected(&self, incoming: &BlockHeader) -> bool {
525        if let Some(block) = self.last_synced_block.as_ref() {
526            return incoming.parent_hash == block.hash;
527        }
528        false
529    }
530    fn filter_deltas(&self, deltas: &mut BlockChanges) {
531        deltas.filter_by_component(|id| {
532            self.component_tracker
533                .components
534                .contains_key(id)
535        });
536        deltas.filter_by_contract(|id| {
537            self.component_tracker
538                .contracts
539                .contains(id)
540        });
541    }
542}
543
544#[async_trait]
545impl<R, D> StateSynchronizer for ProtocolStateSynchronizer<R, D>
546where
547    R: RPCClient + Clone + Send + Sync + 'static,
548    D: DeltasClient + Clone + Send + Sync + 'static,
549{
550    async fn initialize(&mut self) -> SyncResult<()> {
551        info!("Retrieving relevant protocol components");
552        self.component_tracker
553            .initialise_components()
554            .await?;
555        info!(
556            n_components = self.component_tracker.components.len(),
557            n_contracts = self.component_tracker.contracts.len(),
558            "Finished retrieving components",
559        );
560
561        Ok(())
562    }
563
564    async fn start(
565        mut self,
566    ) -> (SynchronizerTaskHandle, Receiver<SyncResult<StateSyncMessage<BlockHeader>>>) {
567        let (mut tx, rx) = channel(15);
568        let (end_tx, end_rx) = oneshot::channel::<()>();
569
570        let jh = tokio::spawn(async move {
571            let mut retry_count = 0;
572            let mut current_end_rx = end_rx;
573            let mut final_error = None;
574
575            while retry_count < self.max_retries {
576                info!(extractor_id=%&self.extractor_id, retry_count, "(Re)starting synchronization loop");
577
578                let res = self
579                    .state_sync(&mut tx, current_end_rx)
580                    .await;
581                match res {
582                    Ok(()) => {
583                        info!(
584                            extractor_id=%&self.extractor_id,
585                            retry_count,
586                            "State synchronization exited cleanly"
587                        );
588                        return;
589                    }
590                    Err((e, maybe_end_rx)) => {
591                        warn!(
592                            extractor_id=%&self.extractor_id,
593                            retry_count,
594                            error=%e,
595                            "State synchronization errored!"
596                        );
597
598                        // If we have the end_rx back, we can retry
599                        if let Some(recovered_end_rx) = maybe_end_rx {
600                            current_end_rx = recovered_end_rx;
601
602                            if let SynchronizerError::ConnectionClosed = e {
603                                // break synchronization loop if websocket client is dead
604                                error!(
605                                    "Websocket connection closed. State synchronization exiting."
606                                );
607                                let _ = tx.send(Err(e)).await;
608                                return;
609                            } else {
610                                // Store error in case this is our last retry
611                                final_error = Some(e);
612                            }
613                        } else {
614                            // Close signal was received, exit cleanly
615                            info!(extractor_id=%&self.extractor_id, "Received close signal, exiting.");
616                            return;
617                        }
618                    }
619                }
620                sleep(self.retry_cooldown).await;
621                retry_count += 1;
622            }
623            if let Some(e) = final_error {
624                warn!(extractor_id=%&self.extractor_id, retry_count, error=%e, "Max retries exceeded");
625                let _ = tx.send(Err(e)).await;
626            }
627        });
628
629        let handle = SynchronizerTaskHandle::new(jh, end_tx);
630        (handle, rx)
631    }
632}
633
634#[cfg(test)]
635mod test {
636    //! Test suite for ProtocolStateSynchronizer shutdown and cleanup behavior.
637    //!
638    //! ## Test Coverage Strategy:
639    //!
640    //! ### Shutdown & Close Signal Tests:
641    //! - `test_public_close_api_functionality` - Tests public API (start/close lifecycle)
642    //! - `test_close_signal_while_waiting_for_first_deltas` - Close during initial wait
643    //! - `test_close_signal_during_main_processing_loop` - Close during main processing
644    //!
645    //! ### Cleanup & Error Handling Tests:
646    //! - `test_cleanup_runs_when_state_sync_processing_errors` - Cleanup on processing errors
647    //!
648    //! ### Coverage Summary:
649    //! These tests ensure cleanup code (shared state reset + unsubscribe) runs on ALL exit paths:
650    //! ✓ Close signal before first deltas   ✓ Close signal during processing
651    //! ✓ Processing errors                  ✓ Channel closure
652    //! ✓ Public API close operations        ✓ Normal completion
653
654    use std::{collections::HashSet, sync::Arc};
655
656    use test_log::test;
657    use tycho_common::dto::{
658        AddressStorageLocation, Block, Chain, ComponentTvlRequestBody, ComponentTvlRequestResponse,
659        DCIUpdate, EntryPoint, PaginationResponse, ProtocolComponentRequestResponse,
660        ProtocolComponentsRequestBody, ProtocolStateRequestBody, ProtocolStateRequestResponse,
661        ProtocolSystemsRequestBody, ProtocolSystemsRequestResponse, RPCTracerParams,
662        StateRequestBody, StateRequestResponse, TokensRequestBody, TokensRequestResponse,
663        TracedEntryPointRequestBody, TracedEntryPointRequestResponse, TracingParams,
664    };
665    use uuid::Uuid;
666
667    use super::*;
668    use crate::{deltas::MockDeltasClient, rpc::MockRPCClient, DeltasError, RPCError};
669
670    // Required for mock client to implement clone
671    struct ArcRPCClient<T>(Arc<T>);
672
673    // Default derive(Clone) does require T to be Clone as well.
674    impl<T> Clone for ArcRPCClient<T> {
675        fn clone(&self) -> Self {
676            ArcRPCClient(self.0.clone())
677        }
678    }
679
    /// Forwards every call to the wrapped client; no additional logic is added.
    #[async_trait]
    impl<T> RPCClient for ArcRPCClient<T>
    where
        T: RPCClient + Sync + Send + 'static,
    {
        /// Delegates to the wrapped client's `get_tokens`.
        async fn get_tokens(
            &self,
            request: &TokensRequestBody,
        ) -> Result<TokensRequestResponse, RPCError> {
            self.0.get_tokens(request).await
        }

        /// Delegates to the wrapped client's `get_contract_state`.
        async fn get_contract_state(
            &self,
            request: &StateRequestBody,
        ) -> Result<StateRequestResponse, RPCError> {
            self.0.get_contract_state(request).await
        }

        /// Delegates to the wrapped client's `get_protocol_components`.
        async fn get_protocol_components(
            &self,
            request: &ProtocolComponentsRequestBody,
        ) -> Result<ProtocolComponentRequestResponse, RPCError> {
            self.0
                .get_protocol_components(request)
                .await
        }

        /// Delegates to the wrapped client's `get_protocol_states`.
        async fn get_protocol_states(
            &self,
            request: &ProtocolStateRequestBody,
        ) -> Result<ProtocolStateRequestResponse, RPCError> {
            self.0
                .get_protocol_states(request)
                .await
        }

        /// Delegates to the wrapped client's `get_protocol_systems`.
        async fn get_protocol_systems(
            &self,
            request: &ProtocolSystemsRequestBody,
        ) -> Result<ProtocolSystemsRequestResponse, RPCError> {
            self.0
                .get_protocol_systems(request)
                .await
        }

        /// Delegates to the wrapped client's `get_component_tvl`.
        async fn get_component_tvl(
            &self,
            request: &ComponentTvlRequestBody,
        ) -> Result<ComponentTvlRequestResponse, RPCError> {
            self.0.get_component_tvl(request).await
        }

        /// Delegates to the wrapped client's `get_traced_entry_points`.
        async fn get_traced_entry_points(
            &self,
            request: &TracedEntryPointRequestBody,
        ) -> Result<TracedEntryPointRequestResponse, RPCError> {
            self.0
                .get_traced_entry_points(request)
                .await
        }

        /// Delegates to the wrapped client's `get_snapshots`, passing all arguments through.
        async fn get_snapshots<'a>(
            &self,
            request: &SnapshotParameters<'a>,
            chunk_size: Option<usize>,
            concurrency: usize,
        ) -> Result<Snapshot, RPCError> {
            self.0
                .get_snapshots(request, chunk_size, concurrency)
                .await
        }

        /// Delegates to the wrapped client's `compression`.
        fn compression(&self) -> bool {
            self.0.compression()
        }
    }
757
758    // Required for mock client to implement clone
759    struct ArcDeltasClient<T>(Arc<T>);
760
761    // Default derive(Clone) does require T to be Clone as well.
762    impl<T> Clone for ArcDeltasClient<T> {
763        fn clone(&self) -> Self {
764            ArcDeltasClient(self.0.clone())
765        }
766    }
767
    /// Forwards every call to the wrapped deltas client; no additional logic is added.
    #[async_trait]
    impl<T> DeltasClient for ArcDeltasClient<T>
    where
        T: DeltasClient + Sync + Send + 'static,
    {
        /// Delegates to the wrapped client's `subscribe`.
        async fn subscribe(
            &self,
            extractor_id: ExtractorIdentity,
            options: SubscriptionOptions,
        ) -> Result<(Uuid, Receiver<BlockChanges>), DeltasError> {
            self.0
                .subscribe(extractor_id, options)
                .await
        }

        /// Delegates to the wrapped client's `unsubscribe`.
        async fn unsubscribe(&self, subscription_id: Uuid) -> Result<(), DeltasError> {
            self.0
                .unsubscribe(subscription_id)
                .await
        }

        /// Delegates to the wrapped client's `connect`.
        async fn connect(&self) -> Result<JoinHandle<Result<(), DeltasError>>, DeltasError> {
            self.0.connect().await
        }

        /// Delegates to the wrapped client's `close`.
        async fn close(&self) -> Result<(), DeltasError> {
            self.0.close().await
        }
    }
797
798    fn with_mocked_clients(
799        native: bool,
800        include_tvl: bool,
801        rpc_client: Option<MockRPCClient>,
802        deltas_client: Option<MockDeltasClient>,
803    ) -> ProtocolStateSynchronizer<ArcRPCClient<MockRPCClient>, ArcDeltasClient<MockDeltasClient>>
804    {
805        let rpc_client = ArcRPCClient(Arc::new(rpc_client.unwrap_or_default()));
806        let deltas_client = ArcDeltasClient(Arc::new(deltas_client.unwrap_or_default()));
807
808        ProtocolStateSynchronizer::new(
809            ExtractorIdentity::new(Chain::Ethereum, "uniswap-v2"),
810            native,
811            ComponentFilter::with_tvl_range(50.0, 50.0),
812            1,
813            Duration::from_secs(0),
814            true,
815            include_tvl,
816            true, // Does not matter as we mock the client that never compresses
817            rpc_client,
818            deltas_client,
819            10_u64,
820        )
821    }
822
823    fn state_snapshot_native() -> ProtocolStateRequestResponse {
824        ProtocolStateRequestResponse {
825            states: vec![ResponseProtocolState {
826                component_id: "Component1".to_string(),
827                ..Default::default()
828            }],
829            pagination: PaginationResponse { page: 0, page_size: 20, total: 1 },
830        }
831    }
832
833    fn make_mock_client() -> MockRPCClient {
834        let mut m = MockRPCClient::new();
835        m.expect_compression()
836            .return_const(false);
837        m
838    }
839
840    #[test(tokio::test)]
841    async fn test_get_snapshots_native() {
842        let header = BlockHeader::default();
843        let mut rpc = make_mock_client();
844        let component = ProtocolComponent { id: "Component1".to_string(), ..Default::default() };
845
846        let component_clone = component.clone();
847        rpc.expect_get_snapshots()
848            .returning(move |_request, _chunk_size, _concurrency| {
849                Ok(Snapshot {
850                    states: state_snapshot_native()
851                        .states
852                        .into_iter()
853                        .map(|state| {
854                            (
855                                state.component_id.clone(),
856                                ComponentWithState {
857                                    state,
858                                    component: component_clone.clone(),
859                                    entrypoints: vec![],
860                                    component_tvl: None,
861                                },
862                            )
863                        })
864                        .collect(),
865                    vm_storage: HashMap::new(),
866                })
867            });
868
869        rpc.expect_get_traced_entry_points()
870            .returning(|_| {
871                Ok(TracedEntryPointRequestResponse {
872                    traced_entry_points: HashMap::new(),
873                    pagination: PaginationResponse::new(0, 20, 0),
874                })
875            });
876
877        let mut state_sync = with_mocked_clients(true, false, Some(rpc), None);
878        state_sync
879            .component_tracker
880            .components
881            .insert("Component1".to_string(), component.clone());
882        let components_arg = ["Component1".to_string()];
883        let exp = StateSyncMessage {
884            header: header.clone(),
885            snapshots: Snapshot {
886                states: state_snapshot_native()
887                    .states
888                    .into_iter()
889                    .map(|state| {
890                        (
891                            state.component_id.clone(),
892                            ComponentWithState {
893                                state,
894                                component: component.clone(),
895                                entrypoints: vec![],
896                                component_tvl: None,
897                            },
898                        )
899                    })
900                    .collect(),
901                vm_storage: HashMap::new(),
902            },
903            deltas: None,
904            removed_components: Default::default(),
905        };
906
907        let snap = state_sync
908            .get_snapshots(header, Some(&components_arg))
909            .await
910            .expect("Retrieving snapshot failed");
911
912        assert_eq!(snap, exp);
913    }
914
915    #[test(tokio::test)]
916    async fn test_get_snapshots_native_with_tvl() {
917        let header = BlockHeader::default();
918        let mut rpc = make_mock_client();
919        let component = ProtocolComponent { id: "Component1".to_string(), ..Default::default() };
920
921        let component_clone = component.clone();
922        rpc.expect_get_snapshots()
923            .returning(move |_request, _chunk_size, _concurrency| {
924                Ok(Snapshot {
925                    states: state_snapshot_native()
926                        .states
927                        .into_iter()
928                        .map(|state| {
929                            (
930                                state.component_id.clone(),
931                                ComponentWithState {
932                                    state,
933                                    component: component_clone.clone(),
934                                    component_tvl: Some(100.0),
935                                    entrypoints: vec![],
936                                },
937                            )
938                        })
939                        .collect(),
940                    vm_storage: HashMap::new(),
941                })
942            });
943
944        rpc.expect_get_traced_entry_points()
945            .returning(|_| {
946                Ok(TracedEntryPointRequestResponse {
947                    traced_entry_points: HashMap::new(),
948                    pagination: PaginationResponse::new(0, 20, 0),
949                })
950            });
951
952        let mut state_sync = with_mocked_clients(true, true, Some(rpc), None);
953        state_sync
954            .component_tracker
955            .components
956            .insert("Component1".to_string(), component.clone());
957        let components_arg = ["Component1".to_string()];
958        let exp = StateSyncMessage {
959            header: header.clone(),
960            snapshots: Snapshot {
961                states: state_snapshot_native()
962                    .states
963                    .into_iter()
964                    .map(|state| {
965                        (
966                            state.component_id.clone(),
967                            ComponentWithState {
968                                state,
969                                component: component.clone(),
970                                component_tvl: Some(100.0),
971                                entrypoints: vec![],
972                            },
973                        )
974                    })
975                    .collect(),
976                vm_storage: HashMap::new(),
977            },
978            deltas: None,
979            removed_components: Default::default(),
980        };
981
982        let snap = state_sync
983            .get_snapshots(header, Some(&components_arg))
984            .await
985            .expect("Retrieving snapshot failed");
986
987        assert_eq!(snap, exp);
988    }
989
990    fn state_snapshot_vm() -> StateRequestResponse {
991        StateRequestResponse {
992            accounts: vec![
993                ResponseAccount { address: Bytes::from("0x0badc0ffee"), ..Default::default() },
994                ResponseAccount { address: Bytes::from("0xbabe42"), ..Default::default() },
995            ],
996            pagination: PaginationResponse { page: 0, page_size: 20, total: 1 },
997        }
998    }
999
1000    fn traced_entry_point_response() -> TracedEntryPointRequestResponse {
1001        TracedEntryPointRequestResponse {
1002            traced_entry_points: HashMap::from([(
1003                "Component1".to_string(),
1004                vec![(
1005                    EntryPointWithTracingParams {
1006                        entry_point: EntryPoint {
1007                            external_id: "entrypoint_a".to_string(),
1008                            target: Bytes::from("0x0badc0ffee"),
1009                            signature: "sig()".to_string(),
1010                        },
1011                        params: TracingParams::RPCTracer(RPCTracerParams {
1012                            caller: Some(Bytes::from("0x0badc0ffee")),
1013                            calldata: Bytes::from("0x0badc0ffee"),
1014                            state_overrides: None,
1015                            prune_addresses: None,
1016                        }),
1017                    },
1018                    TracingResult {
1019                        retriggers: HashSet::from([(
1020                            Bytes::from("0x0badc0ffee"),
1021                            AddressStorageLocation::new(Bytes::from("0x0badc0ffee"), 12),
1022                        )]),
1023                        accessed_slots: HashMap::from([(
1024                            Bytes::from("0x0badc0ffee"),
1025                            HashSet::from([Bytes::from("0xbadbeef0")]),
1026                        )]),
1027                    },
1028                )],
1029            )]),
1030            pagination: PaginationResponse::new(0, 20, 0),
1031        }
1032    }
1033
1034    #[test(tokio::test)]
1035    async fn test_get_snapshots_vm() {
1036        let header = BlockHeader::default();
1037        let mut rpc = make_mock_client();
1038
1039        let traced_ep_response = traced_entry_point_response();
1040        rpc.expect_get_snapshots()
1041            .returning(move |_request, _chunk_size, _concurrency| {
1042                let vm_storage_accounts = state_snapshot_vm();
1043                Ok(Snapshot {
1044                    states: [(
1045                        "Component1".to_string(),
1046                        ComponentWithState {
1047                            state: ResponseProtocolState {
1048                                component_id: "Component1".to_string(),
1049                                ..Default::default()
1050                            },
1051                            component: ProtocolComponent {
1052                                id: "Component1".to_string(),
1053                                contract_ids: vec![
1054                                    Bytes::from("0x0badc0ffee"),
1055                                    Bytes::from("0xbabe42"),
1056                                ],
1057                                ..Default::default()
1058                            },
1059                            component_tvl: None,
1060                            entrypoints: traced_ep_response
1061                                .traced_entry_points
1062                                .get("Component1")
1063                                .cloned()
1064                                .unwrap_or_default(),
1065                        },
1066                    )]
1067                    .into_iter()
1068                    .collect(),
1069                    vm_storage: vm_storage_accounts
1070                        .accounts
1071                        .into_iter()
1072                        .map(|state| (state.address.clone(), state))
1073                        .collect(),
1074                })
1075            });
1076
1077        rpc.expect_get_traced_entry_points()
1078            .returning(|_| Ok(traced_entry_point_response()));
1079
1080        let mut state_sync = with_mocked_clients(false, false, Some(rpc), None);
1081        let component = ProtocolComponent {
1082            id: "Component1".to_string(),
1083            contract_ids: vec![Bytes::from("0x0badc0ffee"), Bytes::from("0xbabe42")],
1084            ..Default::default()
1085        };
1086        state_sync
1087            .component_tracker
1088            .components
1089            .insert("Component1".to_string(), component.clone());
1090        let components_arg = ["Component1".to_string()];
1091        let exp = StateSyncMessage {
1092            header: header.clone(),
1093            snapshots: Snapshot {
1094                states: [(
1095                    component.id.clone(),
1096                    ComponentWithState {
1097                        state: ResponseProtocolState {
1098                            component_id: "Component1".to_string(),
1099                            ..Default::default()
1100                        },
1101                        component: component.clone(),
1102                        component_tvl: None,
1103                        entrypoints: vec![(
1104                            EntryPointWithTracingParams {
1105                                entry_point: EntryPoint {
1106                                    external_id: "entrypoint_a".to_string(),
1107                                    target: Bytes::from("0x0badc0ffee"),
1108                                    signature: "sig()".to_string(),
1109                                },
1110                                params: TracingParams::RPCTracer(RPCTracerParams {
1111                                    caller: Some(Bytes::from("0x0badc0ffee")),
1112                                    calldata: Bytes::from("0x0badc0ffee"),
1113                                    state_overrides: None,
1114                                    prune_addresses: None,
1115                                }),
1116                            },
1117                            TracingResult {
1118                                retriggers: HashSet::from([(
1119                                    Bytes::from("0x0badc0ffee"),
1120                                    AddressStorageLocation::new(Bytes::from("0x0badc0ffee"), 12),
1121                                )]),
1122                                accessed_slots: HashMap::from([(
1123                                    Bytes::from("0x0badc0ffee"),
1124                                    HashSet::from([Bytes::from("0xbadbeef0")]),
1125                                )]),
1126                            },
1127                        )],
1128                    },
1129                )]
1130                .into_iter()
1131                .collect(),
1132                vm_storage: state_snapshot_vm()
1133                    .accounts
1134                    .into_iter()
1135                    .map(|state| (state.address.clone(), state))
1136                    .collect(),
1137            },
1138            deltas: None,
1139            removed_components: Default::default(),
1140        };
1141
1142        let snap = state_sync
1143            .get_snapshots(header, Some(&components_arg))
1144            .await
1145            .expect("Retrieving snapshot failed");
1146
1147        assert_eq!(snap, exp);
1148    }
1149
1150    #[test(tokio::test)]
1151    async fn test_get_snapshots_vm_with_tvl() {
1152        let header = BlockHeader::default();
1153        let mut rpc = make_mock_client();
1154        let component = ProtocolComponent {
1155            id: "Component1".to_string(),
1156            contract_ids: vec![Bytes::from("0x0badc0ffee"), Bytes::from("0xbabe42")],
1157            ..Default::default()
1158        };
1159
1160        let component_clone = component.clone();
1161        rpc.expect_get_snapshots()
1162            .returning(move |_request, _chunk_size, _concurrency| {
1163                let vm_storage_accounts = state_snapshot_vm();
1164                Ok(Snapshot {
1165                    states: [(
1166                        "Component1".to_string(),
1167                        ComponentWithState {
1168                            state: ResponseProtocolState {
1169                                component_id: "Component1".to_string(),
1170                                ..Default::default()
1171                            },
1172                            component: component_clone.clone(),
1173                            component_tvl: Some(100.0),
1174                            entrypoints: vec![],
1175                        },
1176                    )]
1177                    .into_iter()
1178                    .collect(),
1179                    vm_storage: vm_storage_accounts
1180                        .accounts
1181                        .into_iter()
1182                        .map(|state| (state.address.clone(), state))
1183                        .collect(),
1184                })
1185            });
1186
1187        rpc.expect_get_traced_entry_points()
1188            .returning(|_| {
1189                Ok(TracedEntryPointRequestResponse {
1190                    traced_entry_points: HashMap::new(),
1191                    pagination: PaginationResponse::new(0, 20, 0),
1192                })
1193            });
1194
1195        let mut state_sync = with_mocked_clients(false, true, Some(rpc), None);
1196        state_sync
1197            .component_tracker
1198            .components
1199            .insert("Component1".to_string(), component.clone());
1200        let components_arg = ["Component1".to_string()];
1201        let exp = StateSyncMessage {
1202            header: header.clone(),
1203            snapshots: Snapshot {
1204                states: [(
1205                    component.id.clone(),
1206                    ComponentWithState {
1207                        state: ResponseProtocolState {
1208                            component_id: "Component1".to_string(),
1209                            ..Default::default()
1210                        },
1211                        component: component.clone(),
1212                        component_tvl: Some(100.0),
1213                        entrypoints: vec![],
1214                    },
1215                )]
1216                .into_iter()
1217                .collect(),
1218                vm_storage: state_snapshot_vm()
1219                    .accounts
1220                    .into_iter()
1221                    .map(|state| (state.address.clone(), state))
1222                    .collect(),
1223            },
1224            deltas: None,
1225            removed_components: Default::default(),
1226        };
1227
1228        let snap = state_sync
1229            .get_snapshots(header, Some(&components_arg))
1230            .await
1231            .expect("Retrieving snapshot failed");
1232
1233        assert_eq!(snap, exp);
1234    }
1235
1236    fn mock_clients_for_state_sync() -> (MockRPCClient, MockDeltasClient, Sender<BlockChanges>) {
1237        let mut rpc_client = make_mock_client();
1238        // Mocks for the start_tracking call, these need to come first because they are more
1239        // specific, see: https://docs.rs/mockall/latest/mockall/#matching-multiple-calls
1240        rpc_client
1241            .expect_get_protocol_components()
1242            .with(mockall::predicate::function(
1243                move |request_params: &ProtocolComponentsRequestBody| {
1244                    if let Some(ids) = request_params.component_ids.as_ref() {
1245                        ids.contains(&"Component3".to_string())
1246                    } else {
1247                        false
1248                    }
1249                },
1250            ))
1251            .returning(|_| {
1252                // return Component3
1253                Ok(ProtocolComponentRequestResponse {
1254                    protocol_components: vec![
1255                        // this component shall have a tvl update above threshold
1256                        ProtocolComponent { id: "Component3".to_string(), ..Default::default() },
1257                    ],
1258                    pagination: PaginationResponse { page: 0, page_size: 20, total: 1 },
1259                })
1260            });
1261        // Mock get_snapshots for Component3
1262        rpc_client
1263            .expect_get_snapshots()
1264            .withf(
1265                |request: &SnapshotParameters,
1266                 _chunk_size: &Option<usize>,
1267                 _concurrency: &usize| {
1268                    request
1269                        .components
1270                        .contains_key("Component3")
1271                },
1272            )
1273            .returning(|_request, _chunk_size, _concurrency| {
1274                Ok(Snapshot {
1275                    states: [(
1276                        "Component3".to_string(),
1277                        ComponentWithState {
1278                            state: ResponseProtocolState {
1279                                component_id: "Component3".to_string(),
1280                                ..Default::default()
1281                            },
1282                            component: ProtocolComponent {
1283                                id: "Component3".to_string(),
1284                                ..Default::default()
1285                            },
1286                            component_tvl: Some(1000.0),
1287                            entrypoints: vec![],
1288                        },
1289                    )]
1290                    .into_iter()
1291                    .collect(),
1292                    vm_storage: HashMap::new(),
1293                })
1294            });
1295
1296        // mock calls for the initial state snapshots
1297        rpc_client
1298            .expect_get_protocol_components()
1299            .returning(|_| {
1300                // Initial sync of components
1301                Ok(ProtocolComponentRequestResponse {
1302                    protocol_components: vec![
1303                        // this component shall have a tvl update above threshold
1304                        ProtocolComponent { id: "Component1".to_string(), ..Default::default() },
1305                        // this component shall have a tvl update below threshold.
1306                        ProtocolComponent { id: "Component2".to_string(), ..Default::default() },
1307                        // a third component will have a tvl update above threshold
1308                    ],
1309                    pagination: PaginationResponse { page: 0, page_size: 20, total: 1 },
1310                })
1311            });
1312
1313        rpc_client
1314            .expect_get_snapshots()
1315            .returning(|_request, _chunk_size, _concurrency| {
1316                Ok(Snapshot {
1317                    states: [
1318                        (
1319                            "Component1".to_string(),
1320                            ComponentWithState {
1321                                state: ResponseProtocolState {
1322                                    component_id: "Component1".to_string(),
1323                                    ..Default::default()
1324                                },
1325                                component: ProtocolComponent {
1326                                    id: "Component1".to_string(),
1327                                    ..Default::default()
1328                                },
1329                                component_tvl: Some(100.0),
1330                                entrypoints: vec![],
1331                            },
1332                        ),
1333                        (
1334                            "Component2".to_string(),
1335                            ComponentWithState {
1336                                state: ResponseProtocolState {
1337                                    component_id: "Component2".to_string(),
1338                                    ..Default::default()
1339                                },
1340                                component: ProtocolComponent {
1341                                    id: "Component2".to_string(),
1342                                    ..Default::default()
1343                                },
1344                                component_tvl: Some(0.0),
1345                                entrypoints: vec![],
1346                            },
1347                        ),
1348                    ]
1349                    .into_iter()
1350                    .collect(),
1351                    vm_storage: HashMap::new(),
1352                })
1353            });
1354
1355        // Mock get_traced_entry_points for Ethereum chain
1356        rpc_client
1357            .expect_get_traced_entry_points()
1358            .returning(|_| {
1359                Ok(TracedEntryPointRequestResponse {
1360                    traced_entry_points: HashMap::new(),
1361                    pagination: PaginationResponse { page: 0, page_size: 100, total: 0 },
1362                })
1363            });
1364
1365        // Mock deltas client and messages
1366        let mut deltas_client = MockDeltasClient::new();
1367        let (tx, rx) = channel(1);
1368        deltas_client
1369            .expect_subscribe()
1370            .return_once(move |_, _| {
1371                // Return subscriber id and a channel
1372                Ok((Uuid::default(), rx))
1373            });
1374
1375        // Expect unsubscribe call during cleanup
1376        deltas_client
1377            .expect_unsubscribe()
1378            .return_once(|_| Ok(()));
1379
1380        (rpc_client, deltas_client, tx)
1381    }
1382
1383    /// Test strategy
1384    ///
1385    /// - initial snapshot retrieval returns two component1 and component2 as snapshots
1386    /// - send 2 dummy messages, containing only blocks
1387    /// - third message contains a new component with some significant tvl, one initial component
1388    ///   slips below tvl threshold, another one is above tvl but does not get re-requested.
1389    #[test(tokio::test)]
1390    async fn test_state_sync() {
1391        let (rpc_client, deltas_client, tx) = mock_clients_for_state_sync();
1392        let deltas = [
1393            BlockChanges {
1394                extractor: "uniswap-v2".to_string(),
1395                chain: Chain::Ethereum,
1396                block: Block {
1397                    number: 1,
1398                    hash: Bytes::from("0x01"),
1399                    parent_hash: Bytes::from("0x00"),
1400                    chain: Chain::Ethereum,
1401                    ts: Default::default(),
1402                },
1403                revert: false,
1404                dci_update: DCIUpdate {
1405                    new_entrypoints: HashMap::from([(
1406                        "Component1".to_string(),
1407                        HashSet::from([EntryPoint {
1408                            external_id: "entrypoint_a".to_string(),
1409                            target: Bytes::from("0x0badc0ffee"),
1410                            signature: "sig()".to_string(),
1411                        }]),
1412                    )]),
1413                    new_entrypoint_params: HashMap::from([(
1414                        "entrypoint_a".to_string(),
1415                        HashSet::from([(
1416                            TracingParams::RPCTracer(RPCTracerParams {
1417                                caller: Some(Bytes::from("0x0badc0ffee")),
1418                                calldata: Bytes::from("0x0badc0ffee"),
1419                                state_overrides: None,
1420                                prune_addresses: None,
1421                            }),
1422                            Some("Component1".to_string()),
1423                        )]),
1424                    )]),
1425                    trace_results: HashMap::from([(
1426                        "entrypoint_a".to_string(),
1427                        TracingResult {
1428                            retriggers: HashSet::from([(
1429                                Bytes::from("0x0badc0ffee"),
1430                                AddressStorageLocation::new(Bytes::from("0x0badc0ffee"), 12),
1431                            )]),
1432                            accessed_slots: HashMap::from([(
1433                                Bytes::from("0x0badc0ffee"),
1434                                HashSet::from([Bytes::from("0xbadbeef0")]),
1435                            )]),
1436                        },
1437                    )]),
1438                },
1439                ..Default::default()
1440            },
1441            BlockChanges {
1442                extractor: "uniswap-v2".to_string(),
1443                chain: Chain::Ethereum,
1444                block: Block {
1445                    number: 2,
1446                    hash: Bytes::from("0x02"),
1447                    parent_hash: Bytes::from("0x01"),
1448                    chain: Chain::Ethereum,
1449                    ts: Default::default(),
1450                },
1451                revert: false,
1452                component_tvl: [
1453                    ("Component1".to_string(), 100.0),
1454                    ("Component2".to_string(), 0.0),
1455                    ("Component3".to_string(), 1000.0),
1456                ]
1457                .into_iter()
1458                .collect(),
1459                ..Default::default()
1460            },
1461        ];
1462        let mut state_sync = with_mocked_clients(true, true, Some(rpc_client), Some(deltas_client));
1463        state_sync
1464            .initialize()
1465            .await
1466            .expect("Init failed");
1467
1468        // Test starts here
1469        let (handle, mut rx) = state_sync.start().await;
1470        let (jh, close_tx) = handle.split();
1471        tx.send(deltas[0].clone())
1472            .await
1473            .expect("deltas channel msg 0 closed!");
1474        let first_msg = timeout(Duration::from_millis(100), rx.recv())
1475            .await
1476            .expect("waiting for first state msg timed out!")
1477            .expect("state sync block sender closed!");
1478        tx.send(deltas[1].clone())
1479            .await
1480            .expect("deltas channel msg 1 closed!");
1481        let second_msg = timeout(Duration::from_millis(100), rx.recv())
1482            .await
1483            .expect("waiting for second state msg timed out!")
1484            .expect("state sync block sender closed!");
1485        let _ = close_tx.send(());
1486        jh.await
1487            .expect("state sync task panicked!");
1488
1489        // assertions
1490        let exp1 = StateSyncMessage {
1491            header: BlockHeader {
1492                number: 1,
1493                hash: Bytes::from("0x01"),
1494                parent_hash: Bytes::from("0x00"),
1495                revert: false,
1496                ..Default::default()
1497            },
1498            snapshots: Snapshot {
1499                states: [
1500                    (
1501                        "Component1".to_string(),
1502                        ComponentWithState {
1503                            state: ResponseProtocolState {
1504                                component_id: "Component1".to_string(),
1505                                ..Default::default()
1506                            },
1507                            component: ProtocolComponent {
1508                                id: "Component1".to_string(),
1509                                ..Default::default()
1510                            },
1511                            component_tvl: Some(100.0),
1512                            entrypoints: vec![],
1513                        },
1514                    ),
1515                    (
1516                        "Component2".to_string(),
1517                        ComponentWithState {
1518                            state: ResponseProtocolState {
1519                                component_id: "Component2".to_string(),
1520                                ..Default::default()
1521                            },
1522                            component: ProtocolComponent {
1523                                id: "Component2".to_string(),
1524                                ..Default::default()
1525                            },
1526                            component_tvl: Some(0.0),
1527                            entrypoints: vec![],
1528                        },
1529                    ),
1530                ]
1531                .into_iter()
1532                .collect(),
1533                vm_storage: HashMap::new(),
1534            },
1535            deltas: Some(deltas[0].clone()),
1536            removed_components: Default::default(),
1537        };
1538
1539        let exp2 = StateSyncMessage {
1540            header: BlockHeader {
1541                number: 2,
1542                hash: Bytes::from("0x02"),
1543                parent_hash: Bytes::from("0x01"),
1544                revert: false,
1545                ..Default::default()
1546            },
1547            snapshots: Snapshot {
1548                states: [
1549                    // This is the new component we queried once it passed the tvl threshold.
1550                    (
1551                        "Component3".to_string(),
1552                        ComponentWithState {
1553                            state: ResponseProtocolState {
1554                                component_id: "Component3".to_string(),
1555                                ..Default::default()
1556                            },
1557                            component: ProtocolComponent {
1558                                id: "Component3".to_string(),
1559                                ..Default::default()
1560                            },
1561                            component_tvl: Some(1000.0),
1562                            entrypoints: vec![],
1563                        },
1564                    ),
1565                ]
1566                .into_iter()
1567                .collect(),
1568                vm_storage: HashMap::new(),
1569            },
1570            // Our deltas are empty and since merge methods are
1571            // tested in tycho-common we don't have much to do here.
1572            deltas: Some(BlockChanges {
1573                extractor: "uniswap-v2".to_string(),
1574                chain: Chain::Ethereum,
1575                block: Block {
1576                    number: 2,
1577                    hash: Bytes::from("0x02"),
1578                    parent_hash: Bytes::from("0x01"),
1579                    chain: Chain::Ethereum,
1580                    ts: Default::default(),
1581                },
1582                revert: false,
1583                component_tvl: [
1584                    // "Component2" should not show here.
1585                    ("Component1".to_string(), 100.0),
1586                    ("Component3".to_string(), 1000.0),
1587                ]
1588                .into_iter()
1589                .collect(),
1590                ..Default::default()
1591            }),
1592            // "Component2" was removed, because its tvl changed to 0.
1593            removed_components: [(
1594                "Component2".to_string(),
1595                ProtocolComponent { id: "Component2".to_string(), ..Default::default() },
1596            )]
1597            .into_iter()
1598            .collect(),
1599        };
1600        assert_eq!(first_msg.unwrap(), exp1);
1601        assert_eq!(second_msg.unwrap(), exp2);
1602    }
1603
1604    #[test(tokio::test)]
1605    async fn test_state_sync_with_tvl_range() {
1606        // Define the range for testing
1607        let remove_tvl_threshold = 5.0;
1608        let add_tvl_threshold = 7.0;
1609
1610        let mut rpc_client = make_mock_client();
1611        let mut deltas_client = MockDeltasClient::new();
1612
1613        rpc_client
1614            .expect_get_protocol_components()
1615            .with(mockall::predicate::function(
1616                move |request_params: &ProtocolComponentsRequestBody| {
1617                    if let Some(ids) = request_params.component_ids.as_ref() {
1618                        ids.contains(&"Component3".to_string())
1619                    } else {
1620                        false
1621                    }
1622                },
1623            ))
1624            .returning(|_| {
1625                Ok(ProtocolComponentRequestResponse {
1626                    protocol_components: vec![ProtocolComponent {
1627                        id: "Component3".to_string(),
1628                        ..Default::default()
1629                    }],
1630                    pagination: PaginationResponse { page: 0, page_size: 20, total: 1 },
1631                })
1632            });
1633        // Mock get_snapshots for Component3
1634        rpc_client
1635            .expect_get_snapshots()
1636            .withf(
1637                |request: &SnapshotParameters,
1638                 _chunk_size: &Option<usize>,
1639                 _concurrency: &usize| {
1640                    request
1641                        .components
1642                        .contains_key("Component3")
1643                },
1644            )
1645            .returning(|_request, _chunk_size, _concurrency| {
1646                Ok(Snapshot {
1647                    states: [(
1648                        "Component3".to_string(),
1649                        ComponentWithState {
1650                            state: ResponseProtocolState {
1651                                component_id: "Component3".to_string(),
1652                                ..Default::default()
1653                            },
1654                            component: ProtocolComponent {
1655                                id: "Component3".to_string(),
1656                                ..Default::default()
1657                            },
1658                            component_tvl: Some(10.0),
1659                            entrypoints: vec![],
1660                        },
1661                    )]
1662                    .into_iter()
1663                    .collect(),
1664                    vm_storage: HashMap::new(),
1665                })
1666            });
1667
1668        // Mock for the initial snapshot retrieval
1669        rpc_client
1670            .expect_get_protocol_components()
1671            .returning(|_| {
1672                Ok(ProtocolComponentRequestResponse {
1673                    protocol_components: vec![
1674                        ProtocolComponent { id: "Component1".to_string(), ..Default::default() },
1675                        ProtocolComponent { id: "Component2".to_string(), ..Default::default() },
1676                    ],
1677                    pagination: PaginationResponse { page: 0, page_size: 20, total: 1 },
1678                })
1679            });
1680
1681        // Mock get_snapshots for initial snapshot
1682        rpc_client
1683            .expect_get_snapshots()
1684            .returning(|_request, _chunk_size, _concurrency| {
1685                Ok(Snapshot {
1686                    states: [
1687                        (
1688                            "Component1".to_string(),
1689                            ComponentWithState {
1690                                state: ResponseProtocolState {
1691                                    component_id: "Component1".to_string(),
1692                                    ..Default::default()
1693                                },
1694                                component: ProtocolComponent {
1695                                    id: "Component1".to_string(),
1696                                    ..Default::default()
1697                                },
1698                                component_tvl: Some(6.0),
1699                                entrypoints: vec![],
1700                            },
1701                        ),
1702                        (
1703                            "Component2".to_string(),
1704                            ComponentWithState {
1705                                state: ResponseProtocolState {
1706                                    component_id: "Component2".to_string(),
1707                                    ..Default::default()
1708                                },
1709                                component: ProtocolComponent {
1710                                    id: "Component2".to_string(),
1711                                    ..Default::default()
1712                                },
1713                                component_tvl: Some(2.0),
1714                                entrypoints: vec![],
1715                            },
1716                        ),
1717                    ]
1718                    .into_iter()
1719                    .collect(),
1720                    vm_storage: HashMap::new(),
1721                })
1722            });
1723
1724        // Mock get_traced_entry_points for Ethereum chain
1725        rpc_client
1726            .expect_get_traced_entry_points()
1727            .returning(|_| {
1728                Ok(TracedEntryPointRequestResponse {
1729                    traced_entry_points: HashMap::new(),
1730                    pagination: PaginationResponse { page: 0, page_size: 100, total: 0 },
1731                })
1732            });
1733
1734        let (tx, rx) = channel(1);
1735        deltas_client
1736            .expect_subscribe()
1737            .return_once(move |_, _| Ok((Uuid::default(), rx)));
1738
1739        // Expect unsubscribe call during cleanup
1740        deltas_client
1741            .expect_unsubscribe()
1742            .return_once(|_| Ok(()));
1743
1744        let mut state_sync = ProtocolStateSynchronizer::new(
1745            ExtractorIdentity::new(Chain::Ethereum, "uniswap-v2"),
1746            true,
1747            ComponentFilter::with_tvl_range(remove_tvl_threshold, add_tvl_threshold),
1748            1,
1749            Duration::from_secs(0),
1750            true,
1751            true,
1752            true,
1753            ArcRPCClient(Arc::new(rpc_client)),
1754            ArcDeltasClient(Arc::new(deltas_client)),
1755            10_u64,
1756        );
1757        state_sync
1758            .initialize()
1759            .await
1760            .expect("Init failed");
1761
1762        // Simulate the incoming BlockChanges
1763        let deltas = [
1764            BlockChanges {
1765                extractor: "uniswap-v2".to_string(),
1766                chain: Chain::Ethereum,
1767                block: Block {
1768                    number: 1,
1769                    hash: Bytes::from("0x01"),
1770                    parent_hash: Bytes::from("0x00"),
1771                    chain: Chain::Ethereum,
1772                    ts: Default::default(),
1773                },
1774                revert: false,
1775                ..Default::default()
1776            },
1777            BlockChanges {
1778                extractor: "uniswap-v2".to_string(),
1779                chain: Chain::Ethereum,
1780                block: Block {
1781                    number: 2,
1782                    hash: Bytes::from("0x02"),
1783                    parent_hash: Bytes::from("0x01"),
1784                    chain: Chain::Ethereum,
1785                    ts: Default::default(),
1786                },
1787                revert: false,
1788                component_tvl: [
1789                    ("Component1".to_string(), 6.0), // Within range, should not trigger changes
1790                    ("Component2".to_string(), 2.0), // Below lower threshold, should be removed
1791                    ("Component3".to_string(), 10.0), // Above upper threshold, should be added
1792                ]
1793                .into_iter()
1794                .collect(),
1795                ..Default::default()
1796            },
1797        ];
1798
1799        let (handle, mut rx) = state_sync.start().await;
1800        let (jh, close_tx) = handle.split();
1801
1802        // Simulate sending delta messages
1803        tx.send(deltas[0].clone())
1804            .await
1805            .expect("deltas channel msg 0 closed!");
1806
1807        // Expecting to receive the initial state message
1808        let _ = timeout(Duration::from_millis(100), rx.recv())
1809            .await
1810            .expect("waiting for first state msg timed out!")
1811            .expect("state sync block sender closed!");
1812
1813        // Send the third message, which should trigger TVL-based changes
1814        tx.send(deltas[1].clone())
1815            .await
1816            .expect("deltas channel msg 1 closed!");
1817        let second_msg = timeout(Duration::from_millis(100), rx.recv())
1818            .await
1819            .expect("waiting for second state msg timed out!")
1820            .expect("state sync block sender closed!")
1821            .expect("no error");
1822
1823        let _ = close_tx.send(());
1824        jh.await
1825            .expect("state sync task panicked!");
1826
1827        let expected_second_msg = StateSyncMessage {
1828            header: BlockHeader {
1829                number: 2,
1830                hash: Bytes::from("0x02"),
1831                parent_hash: Bytes::from("0x01"),
1832                revert: false,
1833                ..Default::default()
1834            },
1835            snapshots: Snapshot {
1836                states: [(
1837                    "Component3".to_string(),
1838                    ComponentWithState {
1839                        state: ResponseProtocolState {
1840                            component_id: "Component3".to_string(),
1841                            ..Default::default()
1842                        },
1843                        component: ProtocolComponent {
1844                            id: "Component3".to_string(),
1845                            ..Default::default()
1846                        },
1847                        component_tvl: Some(10.0),
1848                        entrypoints: vec![], // TODO: add entrypoints?
1849                    },
1850                )]
1851                .into_iter()
1852                .collect(),
1853                vm_storage: HashMap::new(),
1854            },
1855            deltas: Some(BlockChanges {
1856                extractor: "uniswap-v2".to_string(),
1857                chain: Chain::Ethereum,
1858                block: Block {
1859                    number: 2,
1860                    hash: Bytes::from("0x02"),
1861                    parent_hash: Bytes::from("0x01"),
1862                    chain: Chain::Ethereum,
1863                    ts: Default::default(),
1864                },
1865                revert: false,
1866                component_tvl: [
1867                    ("Component1".to_string(), 6.0), // Within range, should not trigger changes
1868                    ("Component3".to_string(), 10.0), // Above upper threshold, should be added
1869                ]
1870                .into_iter()
1871                .collect(),
1872                ..Default::default()
1873            }),
1874            removed_components: [(
1875                "Component2".to_string(),
1876                ProtocolComponent { id: "Component2".to_string(), ..Default::default() },
1877            )]
1878            .into_iter()
1879            .collect(),
1880        };
1881
1882        assert_eq!(second_msg, expected_second_msg);
1883    }
1884
1885    #[test(tokio::test)]
1886    async fn test_public_close_api_functionality() {
1887        // Tests the public close() API through the StateSynchronizer trait:
1888        // - close() fails before start() is called
1889        // - close() succeeds while synchronizer is running
1890        // - close() fails after already closed
1891        // This tests the full start/close lifecycle via the public API
1892
1893        let mut rpc_client = make_mock_client();
1894        let mut deltas_client = MockDeltasClient::new();
1895
1896        // Mock the initial components call
1897        rpc_client
1898            .expect_get_protocol_components()
1899            .returning(|_| {
1900                Ok(ProtocolComponentRequestResponse {
1901                    protocol_components: vec![],
1902                    pagination: PaginationResponse { page: 0, page_size: 20, total: 0 },
1903                })
1904            });
1905
1906        // Set up deltas client that will wait for messages (blocking in state_sync)
1907        let (_tx, rx) = channel(1);
1908        deltas_client
1909            .expect_subscribe()
1910            .return_once(move |_, _| Ok((Uuid::default(), rx)));
1911
1912        // Expect unsubscribe call during cleanup
1913        deltas_client
1914            .expect_unsubscribe()
1915            .return_once(|_| Ok(()));
1916
1917        let mut state_sync = ProtocolStateSynchronizer::new(
1918            ExtractorIdentity::new(Chain::Ethereum, "test-protocol"),
1919            true,
1920            ComponentFilter::with_tvl_range(0.0, 0.0),
1921            5, // Enough retries
1922            Duration::from_secs(0),
1923            true,
1924            false,
1925            true,
1926            ArcRPCClient(Arc::new(rpc_client)),
1927            ArcDeltasClient(Arc::new(deltas_client)),
1928            10000_u64, // Long timeout so task doesn't exit on its own
1929        );
1930
1931        state_sync
1932            .initialize()
1933            .await
1934            .expect("Init should succeed");
1935
1936        // Start the synchronizer and test the new split-based close mechanism
1937        let (handle, _rx) = state_sync.start().await;
1938        let (jh, close_tx) = handle.split();
1939
1940        // Give it time to start up and enter state_sync
1941        tokio::time::sleep(Duration::from_millis(100)).await;
1942
1943        // Send close signal should succeed
1944        close_tx
1945            .send(())
1946            .expect("Should be able to send close signal");
1947        // Task should stop cleanly
1948        jh.await.expect("Task should not panic");
1949    }
1950
1951    #[test(tokio::test)]
1952    async fn test_cleanup_runs_when_state_sync_processing_errors() {
1953        // Tests that cleanup code runs when state_sync() errors during delta processing.
1954        // Specifically tests: RPC errors during snapshot retrieval cause proper cleanup.
1955        // Verifies: shared.last_synced_block reset + subscription unsubscribe on errors
1956
1957        let mut rpc_client = make_mock_client();
1958        let mut deltas_client = MockDeltasClient::new();
1959
1960        // Mock the initial components call
1961        rpc_client
1962            .expect_get_protocol_components()
1963            .returning(|_| {
1964                Ok(ProtocolComponentRequestResponse {
1965                    protocol_components: vec![],
1966                    pagination: PaginationResponse { page: 0, page_size: 20, total: 0 },
1967                })
1968            });
1969
1970        // Mock to fail during snapshot retrieval (this will cause an error during processing)
1971        rpc_client
1972            .expect_get_protocol_states()
1973            .returning(|_| {
1974                Err(RPCError::ParseResponse("Test error during snapshot retrieval".to_string()))
1975            });
1976
1977        // Set up deltas client to send one message that will trigger snapshot retrieval
1978        let (tx, rx) = channel(10);
1979        deltas_client
1980            .expect_subscribe()
1981            .return_once(move |_, _| {
1982                // Send a delta message that will require a snapshot
1983                let delta = BlockChanges {
1984                    extractor: "test".to_string(),
1985                    chain: Chain::Ethereum,
1986                    block: Block {
1987                        hash: Bytes::from("0x0123"),
1988                        number: 1,
1989                        parent_hash: Bytes::from("0x0000"),
1990                        chain: Chain::Ethereum,
1991                        ts: chrono::DateTime::from_timestamp(1234567890, 0)
1992                            .unwrap()
1993                            .naive_utc(),
1994                    },
1995                    revert: false,
1996                    // Add a new component to trigger snapshot request
1997                    new_protocol_components: [(
1998                        "new_component".to_string(),
1999                        ProtocolComponent {
2000                            id: "new_component".to_string(),
2001                            protocol_system: "test_protocol".to_string(),
2002                            protocol_type_name: "test".to_string(),
2003                            chain: Chain::Ethereum,
2004                            tokens: vec![Bytes::from("0x0badc0ffee")],
2005                            contract_ids: vec![Bytes::from("0x0badc0ffee")],
2006                            static_attributes: Default::default(),
2007                            creation_tx: Default::default(),
2008                            created_at: Default::default(),
2009                            change: Default::default(),
2010                        },
2011                    )]
2012                    .into_iter()
2013                    .collect(),
2014                    component_tvl: [("new_component".to_string(), 100.0)]
2015                        .into_iter()
2016                        .collect(),
2017                    ..Default::default()
2018                };
2019
2020                tokio::spawn(async move {
2021                    let _ = tx.send(delta).await;
2022                    // Close the channel after sending one message
2023                });
2024
2025                Ok((Uuid::default(), rx))
2026            });
2027
2028        // Expect unsubscribe call during cleanup
2029        deltas_client
2030            .expect_unsubscribe()
2031            .return_once(|_| Ok(()));
2032
2033        let mut state_sync = ProtocolStateSynchronizer::new(
2034            ExtractorIdentity::new(Chain::Ethereum, "test-protocol"),
2035            true,
2036            ComponentFilter::with_tvl_range(0.0, 1000.0), // Include the component
2037            1,
2038            Duration::from_secs(0),
2039            true,
2040            false,
2041            true,
2042            ArcRPCClient(Arc::new(rpc_client)),
2043            ArcDeltasClient(Arc::new(deltas_client)),
2044            5000_u64,
2045        );
2046
2047        state_sync
2048            .initialize()
2049            .await
2050            .expect("Init should succeed");
2051
2052        // Before calling state_sync, set a value in last_synced_block
2053        state_sync.last_synced_block = Some(BlockHeader {
2054            hash: Bytes::from("0x0badc0ffee"),
2055            number: 42,
2056            parent_hash: Bytes::from("0xbadbeef0"),
2057            revert: false,
2058            timestamp: 123456789,
2059        });
2060
2061        // Create a channel for state_sync to send messages to
2062        let (mut block_tx, _block_rx) = channel(10);
2063
2064        // Call state_sync directly - this should error during processing
2065        let (_end_tx, end_rx) = oneshot::channel::<()>();
2066        let result = state_sync
2067            .state_sync(&mut block_tx, end_rx)
2068            .await;
2069        // Verify that state_sync returned an error
2070        assert!(result.is_err(), "state_sync should have errored during processing");
2071
2072        // Note: We can't verify internal state cleanup since state_sync consumes self,
2073        // but the cleanup logic is still tested by the fact that the method returns properly.
2074    }
2075
2076    #[test(tokio::test)]
2077    async fn test_close_signal_while_waiting_for_first_deltas() {
2078        // Tests close signal handling during the initial "waiting for deltas" phase.
2079        // This is the earliest possible close scenario - before any deltas are received.
2080        // Verifies: close signal received while waiting for first message triggers cleanup
2081        let mut rpc_client = make_mock_client();
2082        let mut deltas_client = MockDeltasClient::new();
2083
2084        rpc_client
2085            .expect_get_protocol_components()
2086            .returning(|_| {
2087                Ok(ProtocolComponentRequestResponse {
2088                    protocol_components: vec![],
2089                    pagination: PaginationResponse { page: 0, page_size: 20, total: 0 },
2090                })
2091            });
2092
2093        let (_tx, rx) = channel(1);
2094        deltas_client
2095            .expect_subscribe()
2096            .return_once(move |_, _| Ok((Uuid::default(), rx)));
2097
2098        deltas_client
2099            .expect_unsubscribe()
2100            .return_once(|_| Ok(()));
2101
2102        let mut state_sync = ProtocolStateSynchronizer::new(
2103            ExtractorIdentity::new(Chain::Ethereum, "test-protocol"),
2104            true,
2105            ComponentFilter::with_tvl_range(0.0, 0.0),
2106            1,
2107            Duration::from_secs(0),
2108            true,
2109            true,
2110            false,
2111            ArcRPCClient(Arc::new(rpc_client)),
2112            ArcDeltasClient(Arc::new(deltas_client)),
2113            10000_u64,
2114        );
2115
2116        state_sync
2117            .initialize()
2118            .await
2119            .expect("Init should succeed");
2120
2121        let (mut block_tx, _block_rx) = channel(10);
2122        let (end_tx, end_rx) = oneshot::channel::<()>();
2123
2124        // Start state_sync in a task
2125        let state_sync_handle = tokio::spawn(async move {
2126            state_sync
2127                .state_sync(&mut block_tx, end_rx)
2128                .await
2129        });
2130
2131        // Give it a moment to start
2132        tokio::time::sleep(Duration::from_millis(100)).await;
2133
2134        // Send close signal
2135        let _ = end_tx.send(());
2136
2137        // state_sync should exit cleanly
2138        let result = state_sync_handle
2139            .await
2140            .expect("Task should not panic");
2141        assert!(result.is_ok(), "state_sync should exit cleanly when closed: {result:?}");
2142
2143        println!("SUCCESS: Close signal handled correctly while waiting for first deltas");
2144    }
2145
2146    #[test(tokio::test)]
2147    async fn test_close_signal_during_main_processing_loop() {
2148        // Tests close signal handling during the main delta processing loop.
2149        // This tests the scenario where first message is processed successfully,
2150        // then close signal is received while waiting for subsequent deltas.
2151        // Verifies: close signal in main loop (after initialization) triggers cleanup
2152
2153        let mut rpc_client = make_mock_client();
2154        let mut deltas_client = MockDeltasClient::new();
2155
2156        // Mock the initial components call
2157        rpc_client
2158            .expect_get_protocol_components()
2159            .returning(|_| {
2160                Ok(ProtocolComponentRequestResponse {
2161                    protocol_components: vec![],
2162                    pagination: PaginationResponse { page: 0, page_size: 20, total: 0 },
2163                })
2164            });
2165
2166        // Mock the snapshot retrieval that happens after first message
2167        rpc_client
2168            .expect_get_protocol_states()
2169            .returning(|_| {
2170                Ok(ProtocolStateRequestResponse {
2171                    states: vec![],
2172                    pagination: PaginationResponse { page: 0, page_size: 20, total: 0 },
2173                })
2174            });
2175
2176        rpc_client
2177            .expect_get_component_tvl()
2178            .returning(|_| {
2179                Ok(ComponentTvlRequestResponse {
2180                    tvl: HashMap::new(),
2181                    pagination: PaginationResponse { page: 0, page_size: 20, total: 0 },
2182                })
2183            });
2184
2185        rpc_client
2186            .expect_get_traced_entry_points()
2187            .returning(|_| {
2188                Ok(TracedEntryPointRequestResponse {
2189                    traced_entry_points: HashMap::new(),
2190                    pagination: PaginationResponse::new(0, 20, 0),
2191                })
2192            });
2193
2194        // Set up deltas client to send one message, then keep channel open
2195        let (tx, rx) = channel(10);
2196        deltas_client
2197            .expect_subscribe()
2198            .return_once(move |_, _| {
2199                // Send first message immediately
2200                let first_delta = BlockChanges {
2201                    extractor: "test".to_string(),
2202                    chain: Chain::Ethereum,
2203                    block: Block {
2204                        hash: Bytes::from("0x0123"),
2205                        number: 1,
2206                        parent_hash: Bytes::from("0x0000"),
2207                        chain: Chain::Ethereum,
2208                        ts: chrono::DateTime::from_timestamp(1234567890, 0)
2209                            .unwrap()
2210                            .naive_utc(),
2211                    },
2212                    revert: false,
2213                    ..Default::default()
2214                };
2215
2216                tokio::spawn(async move {
2217                    let _ = tx.send(first_delta).await;
2218                    // Keep the sender alive but don't send more messages
2219                    // This will make the recv() block waiting for the next message
2220                    tokio::time::sleep(Duration::from_secs(30)).await;
2221                });
2222
2223                Ok((Uuid::default(), rx))
2224            });
2225
2226        deltas_client
2227            .expect_unsubscribe()
2228            .return_once(|_| Ok(()));
2229
2230        let mut state_sync = ProtocolStateSynchronizer::new(
2231            ExtractorIdentity::new(Chain::Ethereum, "test-protocol"),
2232            true,
2233            ComponentFilter::with_tvl_range(0.0, 1000.0),
2234            1,
2235            Duration::from_secs(0),
2236            true,
2237            false,
2238            true,
2239            ArcRPCClient(Arc::new(rpc_client)),
2240            ArcDeltasClient(Arc::new(deltas_client)),
2241            10000_u64,
2242        );
2243
2244        state_sync
2245            .initialize()
2246            .await
2247            .expect("Init should succeed");
2248
2249        let (mut block_tx, mut block_rx) = channel(10);
2250        let (end_tx, end_rx) = oneshot::channel::<()>();
2251
2252        // Start state_sync in a task
2253        let state_sync_handle = tokio::spawn(async move {
2254            state_sync
2255                .state_sync(&mut block_tx, end_rx)
2256                .await
2257        });
2258
2259        // Wait for the first message to be processed (snapshot sent)
2260        let first_snapshot = block_rx
2261            .recv()
2262            .await
2263            .expect("Should receive first snapshot")
2264            .expect("Synchronizer error");
2265        assert!(
2266            !first_snapshot
2267                .snapshots
2268                .states
2269                .is_empty() ||
2270                first_snapshot.deltas.is_some()
2271        );
2272        // Now send close signal - this should be handled in the main processing loop
2273        let _ = end_tx.send(());
2274
2275        // state_sync should exit cleanly after receiving close signal in main loop
2276        let result = state_sync_handle
2277            .await
2278            .expect("Task should not panic");
2279        assert!(
2280            result.is_ok(),
2281            "state_sync should exit cleanly when closed after first message: {result:?}"
2282        );
2283    }
2284
2285    #[test(tokio::test)]
2286    async fn test_max_retries_exceeded_error_propagation() {
2287        // Test that when max_retries is exceeded, the final error is sent through the channel
2288        // to the receiver and the synchronizer task exits cleanly
2289
2290        let mut rpc_client = make_mock_client();
2291        let mut deltas_client = MockDeltasClient::new();
2292
2293        // Mock the initial components call to succeed
2294        rpc_client
2295            .expect_get_protocol_components()
2296            .returning(|_| {
2297                Ok(ProtocolComponentRequestResponse {
2298                    protocol_components: vec![],
2299                    pagination: PaginationResponse { page: 0, page_size: 20, total: 0 },
2300                })
2301            });
2302
2303        // Set up deltas client to consistently fail after subscription
2304        // This will cause connection errors and trigger retries
2305        deltas_client
2306            .expect_subscribe()
2307            .returning(|_, _| {
2308                // Return a connection error to trigger retries
2309                Err(DeltasError::NotConnected)
2310            });
2311
2312        // Expect multiple unsubscribe calls during retries
2313        deltas_client
2314            .expect_unsubscribe()
2315            .returning(|_| Ok(()))
2316            .times(0..=5);
2317
2318        // Create synchronizer with only 2 retries and short cooldown
2319        let mut state_sync = ProtocolStateSynchronizer::new(
2320            ExtractorIdentity::new(Chain::Ethereum, "test-protocol"),
2321            true,
2322            ComponentFilter::with_tvl_range(0.0, 1000.0),
2323            2,                         // max_retries = 2
2324            Duration::from_millis(10), // short retry cooldown
2325            true,
2326            false,
2327            true,
2328            ArcRPCClient(Arc::new(rpc_client)),
2329            ArcDeltasClient(Arc::new(deltas_client)),
2330            1000_u64,
2331        );
2332
2333        state_sync
2334            .initialize()
2335            .await
2336            .expect("Init should succeed");
2337
2338        // Start the synchronizer - it should fail to subscribe and retry
2339        let (handle, mut rx) = state_sync.start().await;
2340        let (jh, _close_tx) = handle.split();
2341
2342        let res = tokio::time::timeout(Duration::from_millis(100), rx.recv())
2343            .await
2344            .expect("responsds in time")
2345            .expect("channel open");
2346
2347        // Verify the error is a ConnectionClosed error (converted from DeltasError::NotConnected)
2348        if let Err(err) = res {
2349            assert!(
2350                matches!(err, SynchronizerError::ConnectionClosed),
2351                "Expected ConnectionClosed error, got: {:?}",
2352                err
2353            );
2354        } else {
2355            panic!("Expected an error")
2356        }
2357
2358        // The task should complete (not hang) after max retries
2359        let task_result = tokio::time::timeout(Duration::from_secs(2), jh).await;
2360        assert!(task_result.is_ok(), "Synchronizer task should complete after max retries");
2361    }
2362
2363    #[test(tokio::test)]
2364    async fn test_is_next_expected() {
2365        // Test the is_next_expected function to ensure it correctly identifies
2366        // when an incoming block is the expected next block in the chain
2367
2368        let mut state_sync = with_mocked_clients(true, false, None, None);
2369
2370        // Test 1: No previous block - should return false
2371        let incoming_header = BlockHeader {
2372            number: 100,
2373            hash: Bytes::from("0x0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"),
2374            parent_hash: Bytes::from(
2375                "0x0000000000000000000000000000000000000000000000000000000000000000",
2376            ),
2377            revert: false,
2378            timestamp: 123456789,
2379        };
2380        assert!(
2381            !state_sync.is_next_expected(&incoming_header),
2382            "Should return false when no previous block is set"
2383        );
2384
2385        // Test 2: Set a previous block and test with matching parent hash
2386        let previous_header = BlockHeader {
2387            number: 99,
2388            hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000000"),
2389            parent_hash: Bytes::from(
2390                "0xabcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789",
2391            ),
2392            revert: false,
2393            timestamp: 123456788,
2394        };
2395        state_sync.last_synced_block = Some(previous_header.clone());
2396
2397        assert!(
2398            state_sync.is_next_expected(&incoming_header),
2399            "Should return true when incoming parent_hash matches previous hash"
2400        );
2401
2402        // Test 3: Test with non-matching parent hash
2403        let non_matching_header = BlockHeader {
2404            number: 100,
2405            hash: Bytes::from("0x0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"),
2406            parent_hash: Bytes::from(
2407                "0x1111111111111111111111111111111111111111111111111111111111111111",
2408            ), // Wrong parent hash
2409            revert: false,
2410            timestamp: 123456789,
2411        };
2412        assert!(
2413            !state_sync.is_next_expected(&non_matching_header),
2414            "Should return false when incoming parent_hash doesn't match previous hash"
2415        );
2416    }
2417
2418    #[test(tokio::test)]
2419    async fn test_synchronizer_restart_skip_snapshot_on_expected_block() {
2420        // Test that on synchronizer restart with the next expected block,
2421        // get_snapshot is not called and only deltas are sent
2422
2423        let mut rpc_client = make_mock_client();
2424        let mut deltas_client = MockDeltasClient::new();
2425
2426        // Mock the initial components call
2427        rpc_client
2428            .expect_get_protocol_components()
2429            .returning(|_| {
2430                Ok(ProtocolComponentRequestResponse {
2431                    protocol_components: vec![ProtocolComponent {
2432                        id: "Component1".to_string(),
2433                        ..Default::default()
2434                    }],
2435                    pagination: PaginationResponse { page: 0, page_size: 20, total: 1 },
2436                })
2437            });
2438
2439        // Set up deltas client to send a message that is the next expected block
2440        let (tx, rx) = channel(10);
2441        deltas_client
2442            .expect_subscribe()
2443            .return_once(move |_, _| {
2444                let expected_next_delta = BlockChanges {
2445                    extractor: "uniswap-v2".to_string(),
2446                    chain: Chain::Ethereum,
2447                    block: Block {
2448                        hash: Bytes::from(
2449                            "0x0000000000000000000000000000000000000000000000000000000000000002",
2450                        ), // This will be the next expected block
2451                        number: 2,
2452                        parent_hash: Bytes::from(
2453                            "0x0000000000000000000000000000000000000000000000000000000000000001",
2454                        ), // This matches our last synced block hash
2455                        chain: Chain::Ethereum,
2456                        ts: chrono::DateTime::from_timestamp(1234567890, 0)
2457                            .unwrap()
2458                            .naive_utc(),
2459                    },
2460                    revert: false,
2461                    ..Default::default()
2462                };
2463
2464                tokio::spawn(async move {
2465                    let _ = tx.send(expected_next_delta).await;
2466                });
2467
2468                Ok((Uuid::default(), rx))
2469            });
2470
2471        deltas_client
2472            .expect_unsubscribe()
2473            .return_once(|_| Ok(()));
2474
2475        let mut state_sync = ProtocolStateSynchronizer::new(
2476            ExtractorIdentity::new(Chain::Ethereum, "uniswap-v2"),
2477            true,
2478            ComponentFilter::with_tvl_range(0.0, 1000.0),
2479            1,
2480            Duration::from_secs(0),
2481            true, // include_snapshots = true
2482            false,
2483            true,
2484            ArcRPCClient(Arc::new(rpc_client)),
2485            ArcDeltasClient(Arc::new(deltas_client)),
2486            10000_u64,
2487        );
2488
2489        // Initialize and set up the last synced block to simulate a restart scenario
2490        state_sync
2491            .initialize()
2492            .await
2493            .expect("Init should succeed");
2494
2495        // Set last_synced_block to simulate that we've previously synced block 1
2496        state_sync.last_synced_block = Some(BlockHeader {
2497            number: 1,
2498            hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000001"), /* This matches the parent_hash in our delta */
2499            parent_hash: Bytes::from(
2500                "0x0000000000000000000000000000000000000000000000000000000000000000",
2501            ),
2502            revert: false,
2503            timestamp: 123456789,
2504        });
2505
2506        let (mut block_tx, mut block_rx) = channel(10);
2507        let (end_tx, end_rx) = oneshot::channel::<()>();
2508
2509        // Start state_sync
2510        let state_sync_handle = tokio::spawn(async move {
2511            state_sync
2512                .state_sync(&mut block_tx, end_rx)
2513                .await
2514        });
2515
2516        // Wait for the message - it should be a delta-only message (no snapshots)
2517        let result_msg = timeout(Duration::from_millis(200), block_rx.recv())
2518            .await
2519            .expect("Should receive message within timeout")
2520            .expect("Channel should be open")
2521            .expect("Should not be an error");
2522
2523        // Send close signal
2524        let _ = end_tx.send(());
2525
2526        // Wait for state_sync to finish
2527        let _ = state_sync_handle
2528            .await
2529            .expect("Task should not panic");
2530
2531        // Verify the message contains deltas but no snapshots
2532        // (because we skipped snapshot retrieval)
2533        assert!(result_msg.deltas.is_some(), "Should contain deltas");
2534        assert!(
2535            result_msg.snapshots.states.is_empty(),
2536            "Should not contain snapshots when next expected block is received"
2537        );
2538
2539        // Verify the block details match our expected next block
2540        if let Some(deltas) = &result_msg.deltas {
2541            assert_eq!(deltas.block.number, 2);
2542            assert_eq!(
2543                deltas.block.hash,
2544                Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000002")
2545            );
2546            assert_eq!(
2547                deltas.block.parent_hash,
2548                Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000001")
2549            );
2550        }
2551    }
2552
2553    #[test(tokio::test)]
2554    async fn test_skip_previously_processed_messages() {
2555        // Test that the synchronizer skips messages for blocks that have already been processed
2556        // This simulates a service restart scenario where old messages are re-emitted
2557
2558        let mut rpc_client = make_mock_client();
2559        let mut deltas_client = MockDeltasClient::new();
2560
2561        // Mock the initial components call
2562        rpc_client
2563            .expect_get_protocol_components()
2564            .returning(|_| {
2565                Ok(ProtocolComponentRequestResponse {
2566                    protocol_components: vec![ProtocolComponent {
2567                        id: "Component1".to_string(),
2568                        ..Default::default()
2569                    }],
2570                    pagination: PaginationResponse { page: 0, page_size: 20, total: 1 },
2571                })
2572            });
2573
2574        // Mock snapshot calls for when we process the expected next block (block 6)
2575        rpc_client
2576            .expect_get_protocol_states()
2577            .returning(|_| {
2578                Ok(ProtocolStateRequestResponse {
2579                    states: vec![ResponseProtocolState {
2580                        component_id: "Component1".to_string(),
2581                        ..Default::default()
2582                    }],
2583                    pagination: PaginationResponse { page: 0, page_size: 20, total: 1 },
2584                })
2585            });
2586
2587        rpc_client
2588            .expect_get_component_tvl()
2589            .returning(|_| {
2590                Ok(ComponentTvlRequestResponse {
2591                    tvl: [("Component1".to_string(), 100.0)]
2592                        .into_iter()
2593                        .collect(),
2594                    pagination: PaginationResponse { page: 0, page_size: 20, total: 1 },
2595                })
2596            });
2597
2598        rpc_client
2599            .expect_get_traced_entry_points()
2600            .returning(|_| {
2601                Ok(TracedEntryPointRequestResponse {
2602                    traced_entry_points: HashMap::new(),
2603                    pagination: PaginationResponse::new(0, 20, 0),
2604                })
2605            });
2606
2607        // Set up deltas client to send old messages first, then the expected next block
2608        let (tx, rx) = channel(10);
2609        deltas_client
2610            .expect_subscribe()
2611            .return_once(move |_, _| {
2612                // Send messages for blocks 3, 4, 5 (already processed), then block 6 (expected)
2613                let old_messages = vec![
2614                    BlockChanges {
2615                        extractor: "uniswap-v2".to_string(),
2616                        chain: Chain::Ethereum,
2617                        block: Block {
2618                            hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000003"),
2619                            number: 3,
2620                            parent_hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000002"),
2621                            chain: Chain::Ethereum,
2622                            ts: chrono::DateTime::from_timestamp(1234567890, 0).unwrap().naive_utc(),
2623                        },
2624                        revert: false,
2625                        ..Default::default()
2626                    },
2627                    BlockChanges {
2628                        extractor: "uniswap-v2".to_string(),
2629                        chain: Chain::Ethereum,
2630                        block: Block {
2631                            hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000004"),
2632                            number: 4,
2633                            parent_hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000003"),
2634                            chain: Chain::Ethereum,
2635                            ts: chrono::DateTime::from_timestamp(1234567891, 0).unwrap().naive_utc(),
2636                        },
2637                        revert: false,
2638                        ..Default::default()
2639                    },
2640                    BlockChanges {
2641                        extractor: "uniswap-v2".to_string(),
2642                        chain: Chain::Ethereum,
2643                        block: Block {
2644                            hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000005"),
2645                            number: 5,
2646                            parent_hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000004"),
2647                            chain: Chain::Ethereum,
2648                            ts: chrono::DateTime::from_timestamp(1234567892, 0).unwrap().naive_utc(),
2649                        },
2650                        revert: false,
2651                        ..Default::default()
2652                    },
2653                    // This is the expected next block (block 6)
2654                    BlockChanges {
2655                        extractor: "uniswap-v2".to_string(),
2656                        chain: Chain::Ethereum,
2657                        block: Block {
2658                            hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000006"),
2659                            number: 6,
2660                            parent_hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000005"),
2661                            chain: Chain::Ethereum,
2662                            ts: chrono::DateTime::from_timestamp(1234567893, 0).unwrap().naive_utc(),
2663                        },
2664                        revert: false,
2665                        ..Default::default()
2666                    },
2667                ];
2668
2669                tokio::spawn(async move {
2670                    for message in old_messages {
2671                        let _ = tx.send(message).await;
2672                        tokio::time::sleep(Duration::from_millis(10)).await;
2673                    }
2674                });
2675
2676                Ok((Uuid::default(), rx))
2677            });
2678
2679        deltas_client
2680            .expect_unsubscribe()
2681            .return_once(|_| Ok(()));
2682
2683        let mut state_sync = ProtocolStateSynchronizer::new(
2684            ExtractorIdentity::new(Chain::Ethereum, "uniswap-v2"),
2685            true,
2686            ComponentFilter::with_tvl_range(0.0, 1000.0),
2687            1,
2688            Duration::from_secs(0),
2689            true,
2690            true,
2691            true,
2692            ArcRPCClient(Arc::new(rpc_client)),
2693            ArcDeltasClient(Arc::new(deltas_client)),
2694            10000_u64,
2695        );
2696
2697        // Initialize and set last_synced_block to simulate we've already processed block 5
2698        state_sync
2699            .initialize()
2700            .await
2701            .expect("Init should succeed");
2702
2703        state_sync.last_synced_block = Some(BlockHeader {
2704            number: 5,
2705            hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000005"),
2706            parent_hash: Bytes::from(
2707                "0x0000000000000000000000000000000000000000000000000000000000000004",
2708            ),
2709            revert: false,
2710            timestamp: 1234567892,
2711        });
2712
2713        let (mut block_tx, mut block_rx) = channel(10);
2714        let (end_tx, end_rx) = oneshot::channel::<()>();
2715
2716        // Start state_sync
2717        let state_sync_handle = tokio::spawn(async move {
2718            state_sync
2719                .state_sync(&mut block_tx, end_rx)
2720                .await
2721        });
2722
2723        // Wait for the message - it should only be for block 6 (skipping blocks 3, 4, 5)
2724        let result_msg = timeout(Duration::from_millis(500), block_rx.recv())
2725            .await
2726            .expect("Should receive message within timeout")
2727            .expect("Channel should be open")
2728            .expect("Should not be an error");
2729
2730        // Send close signal
2731        let _ = end_tx.send(());
2732
2733        // Wait for state_sync to finish
2734        let _ = state_sync_handle
2735            .await
2736            .expect("Task should not panic");
2737
2738        // Verify we only got the message for block 6 (the expected next block)
2739        assert!(result_msg.deltas.is_some(), "Should contain deltas");
2740        if let Some(deltas) = &result_msg.deltas {
2741            assert_eq!(
2742                deltas.block.number, 6,
2743                "Should only process block 6, skipping earlier blocks"
2744            );
2745            assert_eq!(
2746                deltas.block.hash,
2747                Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000006")
2748            );
2749        }
2750
2751        // Verify that no additional messages are received immediately
2752        // (since the old blocks 3, 4, 5 were skipped and only block 6 was processed)
2753        match timeout(Duration::from_millis(50), block_rx.recv()).await {
2754            Err(_) => {
2755                // Timeout is expected - no more messages should come
2756            }
2757            Ok(Some(Err(_))) => {
2758                // Error received is also acceptable (connection closed)
2759            }
2760            Ok(Some(Ok(_))) => {
2761                panic!("Should not receive additional messages - old blocks should be skipped");
2762            }
2763            Ok(None) => {
2764                // Channel closed is also acceptable
2765            }
2766        }
2767    }
2768}